Principle:Apache Beam Execution Monitoring

From Leeroopedia


Property | Value
Principle Name | Execution Monitoring
Category | Monitoring, Job_Management
Related Implementation | Implementation:Apache_Beam_JobServicePipelineResult
Source Reference | runners/portability/.../JobServicePipelineResult.java L43-212
last_updated | 2026-02-09 04:00 GMT

Overview

Execution Monitoring is the process of polling a remote job service for pipeline state transitions and collecting execution metrics. After job submission, the SDK client uses this process to track pipeline progress, detect completion or failure, and gather performance data.

Description

Once the job is submitted, the client calls getState() on the job service every 3 seconds until the job reaches a terminal state (DONE, FAILED, CANCELLED, or UPDATED). Metrics arrive as MonitoringInfo protobuf messages and are converted to SDK MetricResult objects.

The monitoring loop is implemented in JobServicePipelineResult:

// JobServicePipelineResult.waitForTerminalState() at L143-160
private void waitForTerminalState() {
    JobServiceBlockingStub stub =
        jobService.get().withDeadlineAfter(jobServerTimeout, TimeUnit.SECONDS);
    GetJobStateRequest request = GetJobStateRequest.newBuilder().setJobIdBytes(jobId).build();
    // Initial poll before entering the loop
    JobStateEvent response = stub.getState(request);
    State lastState = getJavaState(response.getState());
    while (!lastState.isTerminal()) {
        try {
            Thread.sleep(POLL_INTERVAL_MS);  // 3000ms
        } catch (InterruptedException e) {
            // Thread.sleep throws a checked exception: restore the interrupt flag and abort
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        // Each poll gets a fresh RPC deadline
        response = stub.withDeadlineAfter(jobServerTimeout, TimeUnit.SECONDS).getState(request);
        lastState = getJavaState(response.getState());
    }
    terminalState = lastState;
}

Polling Architecture

Parameter | Value | Description
Poll interval | 3,000 ms | Time between consecutive getState() calls
RPC timeout | Configurable via jobServerTimeout (seconds) | Deadline for each individual getState() RPC
Terminal states | DONE, FAILED, CANCELLED, UPDATED | States that end the polling loop
Timed wait | Supported via waitUntilFinish(Duration) | Allows the client to specify a maximum wait time
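The loop and its parameters can be exercised without a job service. The following is a minimal sketch, not Beam's code: SketchState, PollingSketch, and pollUntilTerminal are hypothetical names, and a Supplier stands in for the getState() RPC.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.function.Supplier;

/** Hypothetical stand-in for PipelineResult.State; only the terminal flag matters here. */
enum SketchState {
    RUNNING(false), DONE(true), FAILED(true), CANCELLED(true), UPDATED(true);

    private final boolean terminal;

    SketchState(boolean terminal) {
        this.terminal = terminal;
    }

    boolean isTerminal() {
        return terminal;
    }
}

final class PollingSketch {
    /** Polls stateSource until it reports a terminal state, sleeping between polls. */
    static SketchState pollUntilTerminal(Supplier<SketchState> stateSource, long pollIntervalMs) {
        SketchState last = stateSource.get();
        while (!last.isTerminal()) {
            try {
                Thread.sleep(pollIntervalMs);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers can observe the interruption.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while polling", e);
            }
            last = stateSource.get();
        }
        return last;
    }

    public static void main(String[] args) {
        // Scripted states stand in for successive job service responses.
        Deque<SketchState> scripted = new ArrayDeque<>(
            Arrays.asList(SketchState.RUNNING, SketchState.RUNNING, SketchState.DONE));
        System.out.println(pollUntilTerminal(scripted::poll, 10L)); // prints DONE
    }
}
```

Passing the interval as a parameter keeps the sketch fast to run; the page's value would be 3000.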

State Mapping

The monitoring layer maps protobuf job states to Java SDK PipelineResult.State values:

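Proto State | Java State | Terminal?
UNSPECIFIED | UNKNOWN | No
STOPPED | STOPPED | No
RUNNING | RUNNING | No
STARTING | RUNNING | No
DONE | DONE | Yes
FAILED | FAILED | Yes
CANCELLED | CANCELLED | Yes
CANCELLING | CANCELLED | No
UPDATED | UPDATED | Yes
DRAINING | UNKNOWN | No
DRAINED | UNKNOWN | No

The Terminal? column describes the proto state. The polling loop tests isTerminal() on the mapped Java state, so with this mapping a job reported as CANCELLING (mapped to CANCELLED) would also end the loop.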
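The mapping in the table can be written as a single switch. This is a sketch under assumptions: ProtoJobState, JavaJobState, and StateMappingSketch are hypothetical stand-ins for the generated protobuf enum and PipelineResult.State, and the default branch is an illustrative choice rather than a statement about Beam's behavior.

```java
/** Hypothetical stand-in for the protobuf job state enum. */
enum ProtoJobState {
    UNSPECIFIED, STOPPED, STARTING, RUNNING, DONE, FAILED,
    CANCELLING, CANCELLED, UPDATED, DRAINING, DRAINED
}

/** Hypothetical stand-in for PipelineResult.State. */
enum JavaJobState {
    UNKNOWN(false), STOPPED(false), RUNNING(false),
    DONE(true), FAILED(true), CANCELLED(true), UPDATED(true);

    private final boolean terminal;

    JavaJobState(boolean terminal) {
        this.terminal = terminal;
    }

    boolean isTerminal() {
        return terminal;
    }
}

final class StateMappingSketch {
    /** Maps a proto state to a Java state following the table on this page. */
    static JavaJobState toJavaState(ProtoJobState proto) {
        switch (proto) {
            case STOPPED:
                return JavaJobState.STOPPED;
            case STARTING:
            case RUNNING:
                return JavaJobState.RUNNING;
            case DONE:
                return JavaJobState.DONE;
            case FAILED:
                return JavaJobState.FAILED;
            case CANCELLING:
            case CANCELLED:
                return JavaJobState.CANCELLED;
            case UPDATED:
                return JavaJobState.UPDATED;
            default:
                // UNSPECIFIED, DRAINING, and DRAINED have no direct counterpart in the table.
                return JavaJobState.UNKNOWN;
        }
    }
}
```

For example, StateMappingSketch.toJavaState(ProtoJobState.STARTING) yields JavaJobState.RUNNING, which is non-terminal, so a polling loop built on this mapping would keep waiting.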

Metrics Collection

Upon reaching a terminal state, the client fetches metrics from the job service and converts them using PortableMetrics:

// JobServicePipelineResult.close() at L130-141
public void close() {
    // try-with-resources releases the job service handle when the block exits
    try (CloseableResource<JobServiceBlockingStub> jobService = this.jobService) {
        JobApi.GetJobMetricsRequest metricsRequest =
            JobApi.GetJobMetricsRequest.newBuilder().setJobIdBytes(jobId).build();
        // Fetch the final metrics before the handle is released
        jobMetrics = jobService.get().getJobMetrics(metricsRequest).getMetrics();
        // Run the optional cleanup hook, if one was supplied
        if (cleanup != null) {
            cleanup.run();
        }
    }
}

The PortableMetrics class converts MonitoringInfo protobuf messages into five metric types:

Metric Type | Proto Type URN | SDK Result Type
Counters | SUM_INT64_TYPE | MetricResult<Long>
Distributions | DISTRIBUTION_INT64_TYPE | MetricResult<DistributionResult>
Gauges | LATEST_INT64_TYPE | MetricResult<GaugeResult>
String Sets | SET_STRING_TYPE | MetricResult<StringSetResult>
Bounded Tries | BOUNDED_TRIE_TYPE | MetricResult<BoundedTrieResult>

Metrics are deduplicated by combining attempted and committed values. When both exist for the same metric identity (step + namespace + name), the committed value takes precedence.
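The precedence rule can be illustrated with plain maps. This is a minimal sketch, assuming counter-style Long values: MetricKey and MetricDedupSketch are hypothetical types, not part of PortableMetrics or the Beam SDK.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical metric identity: step + namespace + name. */
final class MetricKey {
    private final String step;
    private final String namespace;
    private final String name;

    MetricKey(String step, String namespace, String name) {
        this.step = step;
        this.namespace = namespace;
        this.name = name;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof MetricKey)) {
            return false;
        }
        MetricKey other = (MetricKey) o;
        return step.equals(other.step)
            && namespace.equals(other.namespace)
            && name.equals(other.name);
    }

    @Override
    public int hashCode() {
        return Objects.hash(step, namespace, name);
    }
}

final class MetricDedupSketch {
    /** Returns one value per metric identity; a committed value replaces an attempted one. */
    static Map<MetricKey, Long> dedupe(
            Map<MetricKey, Long> attempted, Map<MetricKey, Long> committed) {
        Map<MetricKey, Long> merged = new LinkedHashMap<>(attempted);
        // putAll overwrites entries with equal keys, so committed values win.
        merged.putAll(committed);
        return merged;
    }
}
```

A metric that appears only as attempted survives the merge unchanged; only identities present in both maps are affected by the precedence rule.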

Error Propagation

When the job ends in a terminal state other than DONE, the client reads the job's message stream and throws a RuntimeException carrying the text of the first message whose importance is JOB_MESSAGE_ERROR:

// JobServicePipelineResult.propagateErrors() at L162-180
private void propagateErrors() {
    if (terminalState != State.DONE) {
        Iterator<JobMessagesResponse> messageStreamIterator =
            jobService.get()
                .withDeadlineAfter(jobServerTimeout, TimeUnit.SECONDS)
                .getMessageStream(messageStreamRequest);
        while (messageStreamIterator.hasNext()) {
            JobMessage messageResponse = messageStreamIterator.next().getMessageResponse();
            if (messageResponse.getImportance() == JobMessage.MessageImportance.JOB_MESSAGE_ERROR) {
                throw new RuntimeException(
                    "The Runner experienced the following error during execution:\n"
                        + messageResponse.getMessageText());
            }
        }
    }
}
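The same first-error-wins scan can be tried against an in-memory list. The sketch below uses hypothetical types (MessageImportance, SketchMessage, ErrorPropagationSketch) in place of the gRPC message stream; only the control flow mirrors the excerpt above.

```java
import java.util.Iterator;

/** Hypothetical importance levels; only ERROR triggers propagation. */
enum MessageImportance { DEBUG, INFO, WARNING, ERROR }

/** Hypothetical stand-in for a job message. */
final class SketchMessage {
    final MessageImportance importance;
    final String text;

    SketchMessage(MessageImportance importance, String text) {
        this.importance = importance;
        this.text = text;
    }
}

final class ErrorPropagationSketch {
    /** Throws on the first ERROR message, but only when the job did not succeed. */
    static void propagateErrors(boolean jobSucceeded, Iterator<SketchMessage> messages) {
        if (jobSucceeded) {
            return; // A successful job's messages are not scanned at all.
        }
        while (messages.hasNext()) {
            SketchMessage message = messages.next();
            if (message.importance == MessageImportance.ERROR) {
                throw new RuntimeException(
                    "The Runner experienced the following error during execution:\n"
                        + message.text);
            }
        }
        // No ERROR message found: the method returns without throwing.
    }
}
```

Note that a failed job whose stream contains no error-level message produces no exception in this sketch, so callers still need to inspect the terminal state itself.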

Usage

Execution monitoring is used after job submission to track pipeline progress, collect metrics, and detect completion or failure.

Typical Client Pattern

// User code
PipelineResult result = pipeline.run();

// Blocks until the job reaches a terminal state
State finalState = result.waitUntilFinish();

// Collect metrics after completion
MetricResults metrics = result.metrics();
MetricQueryResults queryResults = metrics.queryMetrics(MetricsFilter.builder().build());

Timed Waiting

For pipelines that may run indefinitely (streaming), clients can specify a timeout:

// Returns null on timeout, terminal State on completion
State state = result.waitUntilFinish(Duration.standardMinutes(30));
if (state == null) {
    // Timed out, pipeline is still running
    result.cancel();
}
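One way a null-on-timeout wait can be layered over a blocking wait is to run the blocking call asynchronously and bound the wait on its future. The sketch below assumes this approach purely for illustration and does not claim to reproduce the runner's code. TimedWaitSketch and waitAtMost are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

final class TimedWaitSketch {
    /**
     * Runs blockingWait on another thread and waits at most timeoutMs for its result.
     * Returns null if the timeout elapses first.
     */
    static <T> T waitAtMost(Supplier<T> blockingWait, long timeoutMs) {
        CompletableFuture<T> future = CompletableFuture.supplyAsync(blockingWait);
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // A null result signals a timeout, matching the contract described above.
            return null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Blocking wait failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        // Completes well within the timeout, so the supplier's value is returned.
        System.out.println(waitAtMost(() -> "DONE", 1000L)); // prints DONE
    }
}
```

In this sketch the background task keeps running after a timeout. That matches the usage pattern above, where the caller decides whether to cancel the job once null is returned.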

Theoretical Basis

Execution Monitoring is based on the polling pattern for asynchronous job monitoring:

  • Pull-Based Monitoring -- The client acts as an observer, periodically checking state until completion. Across network boundaries, polling is usually simpler to make robust than push-based callbacks. A failed or delayed poll only delays detection, because the next successful poll still returns the current state, whereas a dropped callback can lose the event entirely.
  • Fixed-Interval Polling -- The 3-second interval balances responsiveness with efficiency. More frequent polling would waste network resources; less frequent polling would delay completion detection.
  • Terminal State Detection -- The isTerminal() predicate gives the polling loop a single, explicit stopping condition. States that map to a non-terminal value such as UNKNOWN keep the loop running, so callers that need a bounded wait should use waitUntilFinish(Duration).
  • Metric Deduplication -- The committed-wins-over-attempted strategy is a fixed-precedence rule rather than last-writer-wins: whenever both values exist for a metric identity, the committed value is the one presented to the user.
  • Resource Cleanup -- The AutoCloseable pattern ensures that the gRPC channel and any loopback worker services are properly shut down when monitoring completes, regardless of whether the job succeeded or failed.
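The cleanup guarantee in the last point comes from try-with-resources, which closes the resource on both the normal and the exceptional path. A minimal sketch with hypothetical names (TrackedResource, CleanupSketch) and no gRPC involved:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical resource that records when it is closed. */
final class TrackedResource implements AutoCloseable {
    private final String name;
    private final List<String> log;

    TrackedResource(String name, List<String> log) {
        this.name = name;
        this.log = log;
    }

    @Override
    public void close() {
        log.add("closed " + name);
    }
}

final class CleanupSketch {
    /** Simulates monitoring a job and returns the ordered event log. */
    static List<String> runJob(boolean fail) {
        List<String> log = new ArrayList<>();
        try (TrackedResource channel = new TrackedResource("channel", log)) {
            log.add("monitoring");
            if (fail) {
                throw new IllegalStateException("job failed");
            }
        } catch (IllegalStateException e) {
            // The resource has already been closed by the time this handler runs.
            log.add("caught " + e.getMessage());
        }
        return log;
    }
}
```

In the failing case the log reads monitoring, closed channel, caught job failed, which shows that the resource is released before the exception reaches the handler.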
