Implementation:Apache Beam JobServicePipelineResult

From Leeroopedia


Property             Value
Implementation Name  JobServicePipelineResult
Category             Monitoring, Job_Management
Related Principle    Principle:Apache_Beam_Execution_Monitoring
Source File          runners/portability/java/src/main/java/org/apache/beam/runners/portability/JobServicePipelineResult.java
Also                 PortableMetrics.java
Language             Java
last_updated         2026-02-09 04:00 GMT

Overview

JobServicePipelineResult is the client-side handle for a portable pipeline job after submission. It tracks job progress by polling the job service for state, and it collects metrics once the job has finished. The class implements both PipelineResult and AutoCloseable.

Code Reference

Source Location

Property        Value
Result File     runners/portability/java/src/main/java/org/apache/beam/runners/portability/JobServicePipelineResult.java
Result Package  org.apache.beam.runners.portability
Result Class    L43: class JobServicePipelineResult implements PipelineResult, AutoCloseable
Result Lines    L43-212
Metrics File    runners/portability/java/src/main/java/org/apache/beam/runners/portability/PortableMetrics.java
Metrics Class   L59: public class PortableMetrics extends MetricResults
Metrics Lines   L59-276

JobServicePipelineResult Signatures

class JobServicePipelineResult implements PipelineResult, AutoCloseable {

    private static final long POLL_INTERVAL_MS = 3_000;                       // L45

    // Constructor (L56-66)
    JobServicePipelineResult(
        ByteString jobId,
        int jobServerTimeout,
        CloseableResource<JobServiceBlockingStub> jobService,
        Runnable cleanup);

    // PipelineResult interface
    @Override public State getState();                                         // L69-78
    @Override public State cancel();                                           // L81-86
    @Override public @Nullable State waitUntilFinish(Duration duration);       // L90-108
    @Override public State waitUntilFinish();                                  // L111-122
    @Override public MetricResults metrics();                                  // L125-127

    // AutoCloseable
    @Override public void close();                                             // L130-141

    // Internal
    private void waitForTerminalState();                                       // L143-160
    private void propagateErrors();                                            // L162-180
    private static State getJavaState(JobApi.JobState.Enum protoState);        // L182-211
}

PortableMetrics Signatures

public class PortableMetrics extends MetricResults {

    // Factory method (L82-84)
    public static PortableMetrics of(JobApi.MetricResults jobMetrics);

    // MetricResults interface (L87-101)
    @Override
    public MetricQueryResults queryMetrics(MetricsFilter filter);
}

Import Statements

import org.apache.beam.runners.portability.JobServicePipelineResult;
import org.apache.beam.runners.portability.PortableMetrics;

I/O Contract

Inputs

Input             Type                                       Source
jobId             ByteString                                 From RunJobResponse.getJobIdBytes() after job submission
jobServerTimeout  int                                        From PortablePipelineOptions.getJobServerTimeout(); applied in seconds as the deadline of each job service call
jobService        CloseableResource<JobServiceBlockingStub>  Transferred from PortableRunner.run() via .transfer()
cleanup           Runnable                                   Optional cleanup callback (e.g., stopping loopback worker service)

Outputs

Output                  Type                  Description
Terminal state          PipelineResult.State  DONE, FAILED, CANCELLED, or UPDATED
Metrics                 MetricResults         Counters, distributions, gauges, string sets, bounded tries
Exception (on failure)  RuntimeException      Contains error message from job service message stream

State Polling

waitUntilFinish() (L111-122)

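The primary blocking method. If a terminal state is already cached it returns immediately; otherwise it polls until the job reaches a terminal state, raises any error the job reported, and always calls close() on the way out: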

@Override
public State waitUntilFinish() {
    if (terminalState != null) {
        return terminalState;
    }
    try {
        waitForTerminalState();
        propagateErrors();
        return terminalState;
    } finally {
        close();
    }
}

waitForTerminalState() (L143-160)

The core polling loop:

private void waitForTerminalState() {
    JobServiceBlockingStub stub =
        jobService.get().withDeadlineAfter(jobServerTimeout, TimeUnit.SECONDS);
    GetJobStateRequest request =
        GetJobStateRequest.newBuilder().setJobIdBytes(jobId).build();
    JobStateEvent response = stub.getState(request);
    State lastState = getJavaState(response.getState());
    while (!lastState.isTerminal()) {
        try {
            Thread.sleep(POLL_INTERVAL_MS);  // 3000ms
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        response = stub.withDeadlineAfter(jobServerTimeout, TimeUnit.SECONDS)
                       .getState(request);
        lastState = getJavaState(response.getState());
    }
    terminalState = lastState;
}
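
The polling pattern above can be tried without a job service. The following is a minimal sketch, not Beam code: the class `PollingSketch`, the enum `JobState` (a simplified stand-in for `PipelineResult.State`), and the `Supplier<JobState>` that replaces `stub.getState(request)` are all hypothetical names introduced for illustration.

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical stand-in for the polling loop; not part of Beam. */
class PollingSketch {

    /** Simplified stand-in for PipelineResult.State (assumed terminal flags). */
    enum JobState {
        RUNNING(false), DONE(true), FAILED(true);

        private final boolean terminal;

        JobState(boolean terminal) {
            this.terminal = terminal;
        }

        boolean isTerminal() {
            return terminal;
        }
    }

    /** Polls stateSource until it reports a terminal state, sleeping between polls. */
    static JobState pollUntilTerminal(Supplier<JobState> stateSource, long pollIntervalMs) {
        JobState last = stateSource.get();
        while (!last.isTerminal()) {
            try {
                Thread.sleep(pollIntervalMs);
            } catch (InterruptedException e) {
                // Restore the interrupt flag before surfacing the failure.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            last = stateSource.get();
        }
        return last;
    }

    public static void main(String[] args) {
        // Simulated job service: two RUNNING responses, then DONE.
        Iterator<JobState> responses =
            List.of(JobState.RUNNING, JobState.RUNNING, JobState.DONE).iterator();
        JobState result = pollUntilTerminal(responses::next, 10);
        System.out.println("Terminal state: " + result);
    }
}
```

As in the original loop, the state is checked once before the first sleep, so a job that is already terminal returns without waiting.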

waitUntilFinish(Duration) (L90-108)

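Timed variant that returns null on timeout. A duration of 1 ms or less is treated as an infinite timeout and delegates to waitUntilFinish(). On timeout, the asynchronous waitUntilFinish() call is not cancelled, so polling continues in the background: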

@Override
public @Nullable State waitUntilFinish(Duration duration) {
    if (duration.compareTo(Duration.millis(1)) <= 0) {
        return waitUntilFinish();  // Infinite timeout
    } else {
        CompletableFuture<State> result =
            CompletableFuture.supplyAsync(this::waitUntilFinish);
        try {
            return result.get(duration.getMillis(), TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return null;  // Null indicates timeout
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }
}
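
The timeout handling above is an instance of the general `CompletableFuture.supplyAsync` plus `get(timeout, unit)` pattern. The following is a minimal sketch under stated assumptions: `TimedWaitSketch`, `waitWithTimeout`, and `slowCall` are hypothetical names, and `java.time.Duration` is used in place of the Joda-Time `Duration` that the original method appears to take.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

/** Hypothetical illustration of a timed wait around a blocking call; not Beam code. */
class TimedWaitSketch {

    /** Returns the supplier's result, or null if it does not finish within the timeout. */
    static <T> T waitWithTimeout(Supplier<T> blockingCall, Duration timeout) {
        if (timeout.compareTo(Duration.ofMillis(1)) <= 0) {
            // Mirrors the original: 1 ms or less means "wait indefinitely".
            return blockingCall.get();
        }
        CompletableFuture<T> future = CompletableFuture.supplyAsync(blockingCall);
        try {
            return future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // The background task is not cancelled here and keeps running.
            return null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    /** Simulates a blocking call that takes roughly the given number of milliseconds. */
    static String slowCall(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "DONE";
    }

    public static void main(String[] args) {
        String fast = waitWithTimeout(() -> slowCall(10), Duration.ofSeconds(2));
        String slow = waitWithTimeout(() -> slowCall(2_000), Duration.ofMillis(50));
        System.out.println("fast call: " + fast);
        System.out.println("slow call: " + slow);
    }
}
```

One consequence of this design is that a null return says nothing about the job itself: the caller only learns that the wait expired, and the job may still be running.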

Error Propagation

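When the job ends in a terminal state other than DONE, propagateErrors() (L162-180) reads the job's message stream and throws a RuntimeException for the first message whose importance is JOB_MESSAGE_ERROR. If the stream contains no such message, the method returns normally: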

private void propagateErrors() {
    if (terminalState != State.DONE) {
        JobMessagesRequest messageStreamRequest =
            JobMessagesRequest.newBuilder().setJobIdBytes(jobId).build();
        Iterator<JobMessagesResponse> messageStreamIterator =
            jobService.get()
                .withDeadlineAfter(jobServerTimeout, TimeUnit.SECONDS)
                .getMessageStream(messageStreamRequest);
        while (messageStreamIterator.hasNext()) {
            JobMessage messageResponse =
                messageStreamIterator.next().getMessageResponse();
            if (messageResponse.getImportance() ==
                    JobMessage.MessageImportance.JOB_MESSAGE_ERROR) {
                throw new RuntimeException(
                    "The Runner experienced the following error during execution:\n"
                        + messageResponse.getMessageText());
            }
        }
    }
}
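
The scan-and-throw behaviour can be isolated from gRPC. The following is a minimal sketch, assuming a hypothetical `Message` class and `Importance` enum in place of `JobMessage` and `JobMessage.MessageImportance`; only the first error-level message is surfaced, matching the loop above.

```java
import java.util.Iterator;
import java.util.List;

/** Hypothetical illustration of first-error propagation; not Beam code. */
class ErrorPropagationSketch {

    /** Stand-in for JobMessage.MessageImportance. */
    enum Importance { INFO, WARNING, ERROR }

    /** Stand-in for JobMessage. */
    static final class Message {
        final Importance importance;
        final String text;

        Message(Importance importance, String text) {
            this.importance = importance;
            this.text = text;
        }
    }

    /** Throws on the first ERROR message; returns normally if there is none. */
    static void throwOnFirstError(Iterator<Message> messages) {
        while (messages.hasNext()) {
            Message message = messages.next();
            if (message.importance == Importance.ERROR) {
                throw new RuntimeException(
                    "The Runner experienced the following error during execution:\n"
                        + message.text);
            }
        }
    }

    public static void main(String[] args) {
        List<Message> stream = List.of(
            new Message(Importance.INFO, "job started"),
            new Message(Importance.ERROR, "worker crashed"),
            new Message(Importance.ERROR, "never reported"));
        try {
            throwOnFirstError(stream.iterator());
        } catch (RuntimeException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Because the method returns normally when no error-level message is present, a caller should still inspect the returned state: a job can end as FAILED or CANCELLED without an exception being raised.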

Metrics Collection

Fetching Metrics (L130-141)

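Metrics are fetched during close(), which waitUntilFinish() invokes from its finally block once the job has reached a terminal state. Any exception raised while fetching metrics or running the cleanup callback is logged as a warning rather than rethrown: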

@Override
public void close() {
    try (CloseableResource<JobServiceBlockingStub> jobService = this.jobService) {
        JobApi.GetJobMetricsRequest metricsRequest =
            JobApi.GetJobMetricsRequest.newBuilder().setJobIdBytes(jobId).build();
        jobMetrics = jobService.get().getJobMetrics(metricsRequest).getMetrics();
        if (cleanup != null) {
            cleanup.run();
        }
    } catch (Exception e) {
        LOG.warn("Error cleaning up job service", e);
    }
}
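
The ordering inside close() is worth spelling out: with try-with-resources, the resource is closed before the catch block runs, and a failure in the body skips everything after it, including the cleanup callback. The following is a minimal sketch of that control flow using only the standard library; `CloseOrderSketch` and the event strings are hypothetical and merely record what happens in which order.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of try-with-resources ordering; not Beam code. */
class CloseOrderSketch {

    /** Records the order of events for a successful or failing metrics fetch. */
    static List<String> run(boolean fetchFails) {
        List<String> events = new ArrayList<>();
        // The lambda stands in for the job service resource being released.
        try (AutoCloseable resource = () -> events.add("resource closed")) {
            if (fetchFails) {
                throw new IllegalStateException("metrics unavailable");
            }
            events.add("metrics fetched");
            events.add("cleanup ran");
        } catch (Exception e) {
            // Stand-in for LOG.warn(...): the failure is recorded, not rethrown.
            events.add("logged: " + e.getMessage());
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(false));
        System.out.println(run(true));
    }
}
```

In the failing case the list contains no "cleanup ran" entry, which suggests that in the method above a failed metrics request would also bypass the cleanup callback.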

PortableMetrics Conversion

The PortableMetrics.of() method at L82-84 converts protobuf MonitoringInfo messages into SDK metric types:

public static PortableMetrics of(JobApi.MetricResults jobMetrics) {
    return convertMonitoringInfosToMetricResults(jobMetrics);
}

The conversion process at L103-135:

  1. Collects all attempted MonitoringInfo messages into a deduplication map
  2. Overlays all committed MonitoringInfo messages (committed wins over attempted)
  3. Extracts counters (type SUM_INT64_TYPE)
  4. Extracts distributions (type DISTRIBUTION_INT64_TYPE)
  5. Extracts gauges (type LATEST_INT64_TYPE)
  6. Extracts string sets (type SET_STRING_TYPE)
  7. Extracts bounded tries (type BOUNDED_TRIE_TYPE)

Each metric is identified by three labels: PTRANSFORM (step name), NAMESPACE, and NAME.
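
Steps 1 and 2 amount to building a map keyed by the three labels and letting committed entries overwrite attempted ones. The following is a simplified sketch of that overlay only, assuming a hypothetical `Info` class with a single long value; the real conversion works on protobuf MonitoringInfo messages and also dispatches on metric type, which is omitted here.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration of the attempted/committed overlay; not Beam code. */
class MetricOverlaySketch {

    /** Stand-in for a MonitoringInfo carrying one long value. */
    static final class Info {
        final String ptransform;
        final String namespace;
        final String name;
        final long value;

        Info(String ptransform, String namespace, String name, long value) {
            this.ptransform = ptransform;
            this.namespace = namespace;
            this.name = name;
            this.value = value;
        }

        /** Identity of the metric: (PTRANSFORM, NAMESPACE, NAME). */
        List<String> key() {
            return List.of(ptransform, namespace, name);
        }
    }

    /** Inserts attempted values first, then lets committed values replace them. */
    static Map<List<String>, Long> overlay(List<Info> attempted, List<Info> committed) {
        Map<List<String>, Long> merged = new LinkedHashMap<>();
        for (Info info : attempted) {
            merged.put(info.key(), info.value);
        }
        for (Info info : committed) {
            merged.put(info.key(), info.value); // committed wins over attempted
        }
        return merged;
    }

    public static void main(String[] args) {
        List<Info> attempted = List.of(
            new Info("step1", "ns", "count", 5),
            new Info("step2", "ns", "count", 7));
        List<Info> committed = List.of(new Info("step1", "ns", "count", 4));
        System.out.println(overlay(attempted, committed));
    }
}
```

A metric that only has an attempted value (step2 here) survives the overlay unchanged, so the merged map contains one entry per distinct label triple.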

Querying Metrics

// PortableMetrics.queryMetrics() at L87-101
@Override
public MetricQueryResults queryMetrics(MetricsFilter filter) {
    return MetricQueryResults.create(
        Iterables.filter(this.counters,
            (counter) -> MetricFiltering.matches(filter, counter.getKey())),
        Iterables.filter(this.distributions,
            (distribution) -> MetricFiltering.matches(filter, distribution.getKey())),
        Iterables.filter(this.gauges,
            (gauge) -> MetricFiltering.matches(filter, gauge.getKey())),
        Iterables.filter(this.stringSets,
            (stringSet) -> MetricFiltering.matches(filter, stringSet.getKey())),
        Iterables.filter(this.boundedTries,
            (boundedTries) -> MetricFiltering.matches(filter, boundedTries.getKey())),
        Collections.emptyList());
}

State Mapping

The getJavaState() method at L182-211 maps protobuf states to SDK states:

Proto State  Java State  Notes
UNSPECIFIED  UNKNOWN
STOPPED      STOPPED
RUNNING      RUNNING
STARTING     RUNNING     Mapped to RUNNING (non-terminal)
DONE         DONE        Terminal
FAILED       FAILED      Terminal
CANCELLED    CANCELLED   Terminal
CANCELLING   CANCELLED   Non-terminal in proto, but mapped to the terminal Java state, so polling stops once CANCELLING is reported
UPDATED      UPDATED     Terminal
DRAINING     UNKNOWN     TODO: correct mapping pending
DRAINED      UNKNOWN     TODO: correct mapping pending
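
The table can be read as a single switch statement. The following is a minimal sketch with hypothetical enums `ProtoState` and `JavaState` standing in for `JobApi.JobState.Enum` and `PipelineResult.State`; the terminal flags are an assumption taken from the Notes column, not copied from the SDK.

```java
/** Hypothetical illustration of the proto-to-SDK state mapping; not Beam code. */
class StateMappingSketch {

    /** Stand-in for JobApi.JobState.Enum. */
    enum ProtoState {
        UNSPECIFIED, STOPPED, STARTING, RUNNING, DONE, FAILED,
        CANCELLING, CANCELLED, UPDATED, DRAINING, DRAINED
    }

    /** Stand-in for PipelineResult.State with assumed terminal flags. */
    enum JavaState {
        UNKNOWN(false), STOPPED(false), RUNNING(false),
        DONE(true), FAILED(true), CANCELLED(true), UPDATED(true);

        private final boolean terminal;

        JavaState(boolean terminal) {
            this.terminal = terminal;
        }

        boolean isTerminal() {
            return terminal;
        }
    }

    /** Applies the mapping from the table; unmapped states fall back to UNKNOWN. */
    static JavaState toJavaState(ProtoState protoState) {
        switch (protoState) {
            case STOPPED:
                return JavaState.STOPPED;
            case STARTING:
            case RUNNING:
                return JavaState.RUNNING;
            case DONE:
                return JavaState.DONE;
            case FAILED:
                return JavaState.FAILED;
            case CANCELLING:
            case CANCELLED:
                return JavaState.CANCELLED;
            case UPDATED:
                return JavaState.UPDATED;
            default:
                // UNSPECIFIED, DRAINING, DRAINED: no settled mapping yet.
                return JavaState.UNKNOWN;
        }
    }

    public static void main(String[] args) {
        for (ProtoState protoState : ProtoState.values()) {
            JavaState javaState = toJavaState(protoState);
            System.out.println(protoState + " -> " + javaState
                + (javaState.isTerminal() ? " (terminal)" : ""));
        }
    }
}
```

Since DRAINING and DRAINED map to the non-terminal UNKNOWN in this table, a polling loop built on the mapping would keep waiting for a drained job until the job service reports some other state.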

Usage Examples

Basic Monitoring

// Returned by PortableRunner.run()
PipelineResult result = pipeline.run();

// Block until job completes
State finalState = result.waitUntilFinish();
System.out.println("Pipeline finished with state: " + finalState);

// Collect metrics
MetricResults metrics = result.metrics();
MetricQueryResults queryResults = metrics.queryMetrics(
    MetricsFilter.builder()
        .addNameFilter(MetricNameFilter.named("myNamespace", "myCounter"))
        .build());

for (MetricResult<Long> counter : queryResults.getCounters()) {
    System.out.println(counter.getName() + ": " + counter.getAttempted());
}

Timed Monitoring with Cancellation

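import java.io.IOException;
import java.io.UncheckedIOException;

PipelineResult result = pipeline.run();

// Wait up to 30 minutes (Duration is org.joda.time.Duration)
State state = result.waitUntilFinish(Duration.standardMinutes(30));
if (state == null) {
    System.out.println("Pipeline still running after 30 minutes, cancelling...");
    try {
        result.cancel();  // PipelineResult.cancel() declares IOException
    } catch (IOException e) {
        throw new UncheckedIOException(e);
    }
}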

Polling State Without Blocking

PipelineResult result = pipeline.run();

// Non-blocking state check
State currentState = result.getState();
System.out.println("Current state: " + currentState);
