Principle:Apache Beam Execution Monitoring
| Property | Value |
|---|---|
| Principle Name | Execution Monitoring |
| Category | Monitoring, Job_Management |
| Related Implementation | Implementation:Apache_Beam_JobServicePipelineResult |
| Source Reference | runners/portability/.../JobServicePipelineResult.java L43-212 |
| last_updated | 2026-02-09 04:00 GMT |
Overview
Execution Monitoring is the process of polling a remote job service for pipeline state transitions and collecting execution metrics. After job submission, the SDK client uses this process to track pipeline progress, detect completion or failure, and gather performance data.
Description
After job submission, the client monitors execution by periodically polling the job service for state updates. The client calls getState() every 3 seconds until the job reaches a terminal state (DONE, FAILED, CANCELLED, or UPDATED). Metrics are collected from MonitoringInfo protobuf messages and converted to SDK MetricResult objects.
The monitoring loop is implemented in JobServicePipelineResult:
```java
// JobServicePipelineResult.waitForTerminalState() at L143-160
private void waitForTerminalState() {
  JobServiceBlockingStub stub =
      jobService.get().withDeadlineAfter(jobServerTimeout, TimeUnit.SECONDS);
  GetJobStateRequest request = GetJobStateRequest.newBuilder().setJobIdBytes(jobId).build();
  JobStateEvent response = stub.getState(request);
  State lastState = getJavaState(response.getState());
  while (!lastState.isTerminal()) {
    try {
      Thread.sleep(POLL_INTERVAL_MS); // 3000 ms between polls
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // restore interrupt status and stop polling
      return;
    }
    response = stub.withDeadlineAfter(jobServerTimeout, TimeUnit.SECONDS).getState(request);
    lastState = getJavaState(response.getState());
  }
  terminalState = lastState;
}
```
Polling Architecture
| Parameter | Value | Description |
|---|---|---|
| Poll interval | 3,000 ms | Time between consecutive getState() calls |
| RPC timeout | Configurable via jobServerTimeout | Deadline for each individual getState() RPC |
| Terminal states | DONE, FAILED, CANCELLED, UPDATED | States that end the polling loop |
| Timed wait | Supported via waitUntilFinish(Duration) | Allows the client to specify a maximum wait time |
State Mapping
The monitoring layer maps protobuf job states to Java SDK PipelineResult.State values:
| Proto State | Java State | Terminal? |
|---|---|---|
| UNSPECIFIED | UNKNOWN | No |
| STOPPED | STOPPED | No |
| RUNNING | RUNNING | No |
| STARTING | RUNNING | No |
| DONE | DONE | Yes |
| FAILED | FAILED | Yes |
| CANCELLED | CANCELLED | Yes |
| CANCELLING | CANCELLED | No |
| UPDATED | UPDATED | Yes |
| DRAINING | UNKNOWN | No |
| DRAINED | UNKNOWN | No |
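The mapping in the table above can be sketched as a plain switch. The ProtoState and JavaState enums below are simplified stand-ins for the generated JobApi.JobState.Enum and the SDK's PipelineResult.State, not the real classes:

```java
// Hypothetical stand-in for JobApi.JobState.Enum.
enum ProtoState {
  UNSPECIFIED, STOPPED, RUNNING, STARTING, DONE, FAILED,
  CANCELLED, CANCELLING, UPDATED, DRAINING, DRAINED
}

// Hypothetical stand-in for PipelineResult.State; terminal flags follow the table.
enum JavaState {
  UNKNOWN(false), STOPPED(false), RUNNING(false),
  DONE(true), FAILED(true), CANCELLED(true), UPDATED(true);

  private final boolean terminal;
  JavaState(boolean terminal) { this.terminal = terminal; }
  boolean isTerminal() { return terminal; }
}

class StateMapping {
  // Maps a proto job state to the SDK-facing state, row by row from the table above.
  static JavaState getJavaState(ProtoState s) {
    switch (s) {
      case STOPPED:    return JavaState.STOPPED;
      case RUNNING:
      case STARTING:   return JavaState.RUNNING;
      case DONE:       return JavaState.DONE;
      case FAILED:     return JavaState.FAILED;
      case CANCELLED:
      case CANCELLING: return JavaState.CANCELLED;
      case UPDATED:    return JavaState.UPDATED;
      default:         return JavaState.UNKNOWN; // UNSPECIFIED, DRAINING, DRAINED
    }
  }
}
```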
Metrics Collection
Upon reaching a terminal state, the client fetches metrics from the job service and converts them using PortableMetrics:
```java
// JobServicePipelineResult.close() at L130-141
public void close() {
  try (CloseableResource<JobServiceBlockingStub> jobService = this.jobService) {
    JobApi.GetJobMetricsRequest metricsRequest =
        JobApi.GetJobMetricsRequest.newBuilder().setJobIdBytes(jobId).build();
    jobMetrics = jobService.get().getJobMetrics(metricsRequest).getMetrics();
    if (cleanup != null) {
      cleanup.run();
    }
  }
}
```
The PortableMetrics class converts MonitoringInfo protobuf messages into five metric types:
| Metric Type | Proto Type URN | SDK Result Type |
|---|---|---|
| Counters | SUM_INT64_TYPE | `MetricResult<Long>` |
| Distributions | DISTRIBUTION_INT64_TYPE | `MetricResult<DistributionResult>` |
| Gauges | LATEST_INT64_TYPE | `MetricResult<GaugeResult>` |
| String Sets | SET_STRING_TYPE | `MetricResult<StringSetResult>` |
| Bounded Tries | BOUNDED_TRIE_TYPE | `MetricResult<BoundedTrieResult>` |
Metrics are deduplicated by combining attempted and committed values. When both exist for the same metric identity (step + namespace + name), the committed value takes precedence.
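The committed-wins rule can be sketched as a map merge keyed on metric identity. The MetricKey record below is a hypothetical stand-in for the (step, namespace, name) triple, not Beam's actual MetricKey class:

```java
import java.util.HashMap;
import java.util.Map;

class MetricDedup {
  // Hypothetical metric identity: step + namespace + name.
  record MetricKey(String step, String namespace, String name) {}

  // Merges attempted and committed counter values; where both maps contain
  // the same identity, the committed value overwrites the attempted one.
  static Map<MetricKey, Long> merge(
      Map<MetricKey, Long> attempted, Map<MetricKey, Long> committed) {
    Map<MetricKey, Long> merged = new HashMap<>(attempted);
    merged.putAll(committed); // committed wins on key collision
    return merged;
  }
}
```

Applying the merge to a metric present in both maps yields the committed value, while metrics present only in one map pass through unchanged.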
Error Propagation
When the job reaches a non-DONE terminal state, the client retrieves the message stream and propagates error messages:
```java
// JobServicePipelineResult.propagateErrors() at L162-180
private void propagateErrors() {
  if (terminalState != State.DONE) {
    Iterator<JobMessagesResponse> messageStreamIterator =
        jobService.get()
            .withDeadlineAfter(jobServerTimeout, TimeUnit.SECONDS)
            .getMessageStream(messageStreamRequest);
    while (messageStreamIterator.hasNext()) {
      JobMessage messageResponse = messageStreamIterator.next().getMessageResponse();
      if (messageResponse.getImportance() == JobMessage.MessageImportance.JOB_MESSAGE_ERROR) {
        throw new RuntimeException(
            "The Runner experienced the following error during execution:\n"
                + messageResponse.getMessageText());
      }
    }
  }
}
```
Usage
Execution monitoring is used after job submission to track pipeline progress, collect metrics, and detect completion or failure.
Typical Client Pattern
```java
// User code
PipelineResult result = pipeline.run();
// Blocks until the job reaches a terminal state
State finalState = result.waitUntilFinish();
// Collect metrics after completion
MetricResults metrics = result.metrics();
MetricQueryResults queryResults = metrics.queryMetrics(MetricsFilter.builder().build());
```
Timed Waiting
For pipelines that may run indefinitely (streaming), clients can specify a timeout:
```java
// Returns null on timeout, terminal State on completion
State state = result.waitUntilFinish(Duration.standardMinutes(30));
if (state == null) {
  // Timed out; the pipeline is still running
  result.cancel();
}
```
Theoretical Basis
Execution Monitoring is based on the polling pattern for asynchronous job monitoring:
- Pull-Based Monitoring -- The client acts as an observer, periodically checking state until completion. Pull-based monitoring (polling) is simpler and more reliable than push-based (callbacks) across network boundaries. It naturally handles network partitions: a missed poll simply delays detection rather than losing events.
- Fixed-Interval Polling -- The 3-second interval balances responsiveness with efficiency. More frequent polling would waste network resources; less frequent polling would delay completion detection.
- Terminal State Detection -- The isTerminal() predicate provides a clear stopping condition for the polling loop, preventing infinite loops on unexpected states.
- Metric Deduplication -- The committed-wins-over-attempted strategy implements last-writer-wins conflict resolution, ensuring that the most accurate metric values are presented to the user.
- Resource Cleanup -- The AutoCloseable pattern ensures that the gRPC channel and any loopback worker services are properly shut down when monitoring completes, regardless of whether the job succeeded or failed.
Related Pages
- Implementation:Apache_Beam_JobServicePipelineResult -- The concrete polling and metrics collection implementation
- Principle:Apache_Beam_Job_Execution -- Execution must start before monitoring can begin
- Principle:Apache_Beam_Job_Service_Connection -- The gRPC connection used for state polling
- Principle:Apache_Beam_Pipeline_Translation -- Metrics reference transform names from the translated pipeline