
Implementation:Apache Beam DirectPipelineResult

From Leeroopedia


Field | Value
---|---
Implementation Name | DirectPipelineResult
Overview | Concrete tool for collecting Direct Runner pipeline execution results and metrics.
Source | runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java (inner class)
Implements | Principle:Apache_Beam_Result_Collection
last_updated | 2026-02-09 04:00 GMT

Code Reference

Source Location

File | Lines | Description
---|---|---
runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java | L312-393 | DirectPipelineResult inner class: implements PipelineResult
runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java | L317-322 | Constructor: initializes state to RUNNING
runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java | L324-330 | getState(): returns the cached state, refreshing it from the executor while it is still RUNNING
runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java | L332-335 | metrics(): returns the MetricResults held by the EvaluationContext
runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java | L345-347 | waitUntilFinish(): delegates to waitUntilFinish(Duration.ZERO), i.e. waits with no timeout
runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java | L350-357 | cancel(): stops the executor if the pipeline is not yet terminal, then returns the executor's state
runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java | L367-392 | waitUntilFinish(Duration): blocks until completion or timeout; rethrows user code exceptions as PipelineExecutionException
runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java | L56-end | DirectMetrics: MetricResults implementation for the Direct Runner
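The waitUntilFinish() row above delegates to waitUntilFinish(Duration.ZERO). The PipelineResult contract treats a duration shorter than 1 ms as "wait indefinitely", so zero does not mean an immediate timeout. The sketch below illustrates that deadline convention with java.time; WaitDeadline and its methods are hypothetical and not part of Beam (Beam's signature takes an org.joda.time.Duration).

```java
import java.time.Duration;
import java.time.Instant;

/**
 * Hypothetical helper (not a Beam API) illustrating the PipelineResult convention that a
 * wait duration shorter than 1 ms means "no timeout".
 */
final class WaitDeadline {
    private WaitDeadline() {}

    /** Returns when a bounded wait should give up, or Instant.MAX for an unbounded wait. */
    static Instant deadlineFor(Instant now, Duration timeout) {
        if (timeout.toMillis() < 1) {
            return Instant.MAX; // Duration.ZERO (or anything under 1 ms) => wait indefinitely
        }
        return now.plus(timeout);
    }

    /** A waiter past its deadline would stop waiting and report a timeout. */
    static boolean expired(Instant now, Instant deadline) {
        return now.isAfter(deadline);
    }

    public static void main(String[] args) {
        Instant now = Instant.now();
        System.out.println(deadlineFor(now, Duration.ZERO).equals(Instant.MAX)); // true
        System.out.println(deadlineFor(now, Duration.ofMinutes(5)).equals(now.plusSeconds(300))); // true
    }
}
```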

Signature

// DirectPipelineResult: inner class of DirectRunner
public static class DirectPipelineResult implements PipelineResult {
    private final PipelineExecutor executor;
    private final EvaluationContext evaluationContext;
    private State state;

    // Constructor (private; instances are created only by DirectRunner.run())
    private DirectPipelineResult(
        PipelineExecutor executor, EvaluationContext evaluationContext)

    // Get current pipeline state
    @Override
    public State getState()

    // Get pipeline metrics
    @Override
    public MetricResults metrics()

    // Block until pipeline completes (no timeout)
    @Override
    public State waitUntilFinish()

    // Block until pipeline completes or duration elapses
    @Override
    public State waitUntilFinish(Duration duration)

    // Cancel pipeline execution
    @Override
    public State cancel()
}

// DirectMetrics: MetricResults implementation
class DirectMetrics extends MetricResults {

    // Query metrics with a filter
    @Override
    public MetricQueryResults queryMetrics(MetricsFilter filter)
}

Import

import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.metrics.MetricQueryResults;

I/O Contract

Inputs

Parameter | Type | Description
---|---|---
executor | PipelineExecutor | The ExecutorServiceParallelExecutor instance driving pipeline execution. Provides getPipelineState(), waitUntilFinish(Duration), and stop().
evaluationContext | EvaluationContext | The execution context containing metrics, bundle factories, and watermark state. Provides getMetrics(), which returns a DirectMetrics instance.

Outputs

Output | Type | Description
---|---|---
Pipeline state | State | One of RUNNING, DONE, FAILED, or CANCELLED. Returned by getState(), waitUntilFinish(), and cancel(). waitUntilFinish(Duration) returns null instead if the timeout elapses first.
Metric results | MetricResults | Queryable metrics via DirectMetrics. Supports filtering by metric name, namespace, and step. Returns MetricQueryResults containing counters, distributions, gauges, string sets, and bounded tries.
On failure | PipelineExecutionException | Thrown by waitUntilFinish() when user code throws an exception, and by run() itself when blockOnRun is true (the default). The original exception is available via getCause().

Usage Examples

Standard Blocking Execution

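import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// With no runner configured, Beam uses the DirectRunner (it must be on the classpath)
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
// ... define pipeline transforms ...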

// run() blocks until completion because blockOnRun defaults to true
PipelineResult result = p.run();

// Check state (already terminal at this point)
PipelineResult.State state = result.getState();
System.out.println("Pipeline finished with state: " + state);

Non-Blocking Execution with Explicit Wait

import java.io.IOException;
import org.apache.beam.runners.direct.DirectOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.joda.time.Duration;

DirectOptions options = PipelineOptionsFactory.create().as(DirectOptions.class);
options.setBlockOnRun(false);  // run() returns immediately instead of waiting

Pipeline p = Pipeline.create(options);
// ... define pipeline transforms ...

PipelineResult result = p.run();

// Do other work while the pipeline runs...
System.out.println("Pipeline state: " + result.getState());  // likely RUNNING

// Wait for completion, giving up after 5 minutes
PipelineResult.State finalState = result.waitUntilFinish(Duration.standardMinutes(5));
if (finalState == null) {
    // null means the timeout elapsed before the pipeline reached a terminal state
    System.out.println("Pipeline did not finish within 5 minutes");
    try {
        result.cancel();  // PipelineResult.cancel() is declared to throw IOException
    } catch (IOException e) {
        System.err.println("Cancel failed: " + e.getMessage());
    }
} else {
    System.out.println("Pipeline finished: " + finalState);
}

Collecting Metrics After Execution

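import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.DistributionResult;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.metrics.MetricsFilter;

PipelineResult result = pipeline.run();
result.waitUntilFinish();  // returns immediately if run() already blocked (blockOnRun=true)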

// Query all metrics
MetricResults metrics = result.metrics();
MetricQueryResults queryResults = metrics.queryMetrics(MetricsFilter.builder().build());

// Iterate over counters
for (MetricResult<Long> counter : queryResults.getCounters()) {
    System.out.printf("Counter %s: attempted=%d, committed=%d%n",
        counter.getName(),
        counter.getAttempted(),
        counter.getCommitted());
}

// Iterate over distributions
for (MetricResult<DistributionResult> dist : queryResults.getDistributions()) {
    DistributionResult committed = dist.getCommitted();
    System.out.printf("Distribution %s: count=%d, sum=%d, min=%d, max=%d, mean=%.2f%n",
        dist.getName(),
        committed.getCount(), committed.getSum(),
        committed.getMin(), committed.getMax(), committed.getMean());
}

// Query specific metrics by name
MetricQueryResults filtered = metrics.queryMetrics(
    MetricsFilter.builder()
        .addNameFilter(MetricNameFilter.named("myNamespace", "myCounter"))
        .build());

Error Handling

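import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
import org.apache.beam.sdk.PipelineResult;

Pipeline p = Pipeline.create(options);
// ... define pipeline with potentially failing user code ...

try {
    // With blockOnRun=true (the default), run() itself throws on failure;
    // with blockOnRun=false, the exception surfaces from waitUntilFinish()
    PipelineResult result = p.run();
    result.waitUntilFinish();
} catch (PipelineExecutionException e) {
    // The original user code exception is available via getCause()
    System.err.println("Pipeline failed: " + e.getCause().getMessage());
    e.getCause().printStackTrace();
}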

DirectPipelineResult State Machine

                 +----------+
                 | RUNNING  |
                 +----+-----+
                      |
          +-----------+-----------+
          |           |           |
          v           v           v
     +--------+  +--------+  +-----------+
     |  DONE  |  | FAILED |  | CANCELLED |
     +--------+  +--------+  +-----------+
     (terminal)  (terminal)   (terminal)

State transitions:

  • RUNNING to DONE -- The QuiescenceDriver returns DriverState.SHUTDOWN (all work complete, all watermarks at positive infinity).
  • RUNNING to FAILED -- The QuiescenceDriver returns DriverState.FAILED (an unhandled exception occurred during bundle processing).
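  • RUNNING to CANCELLED -- The user calls cancel() while the pipeline is still running; cancel() calls executor.stop(), which sets the state via shutdownIfNecessary(State.CANCELLED). If the pipeline is already terminal, cancel() leaves that state unchanged.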
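The transition rules above fit in a tiny state model. This is a hypothetical sketch (SketchState and LifecycleSketch are not Beam classes): only RUNNING may move, every move lands in a terminal state, and cancel() on an already-terminal pipeline keeps the existing state, mirroring how DirectPipelineResult.cancel() stops the executor only when the state is not yet terminal.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical mirror of the four states a Direct Runner pipeline can report. */
enum SketchState {
    RUNNING, DONE, FAILED, CANCELLED;

    boolean isTerminal() {
        return this != RUNNING;
    }

    /** RUNNING may move to any terminal state; terminal states never move. */
    Set<SketchState> allowedNext() {
        return this == RUNNING
            ? EnumSet.of(DONE, FAILED, CANCELLED)
            : EnumSet.noneOf(SketchState.class);
    }
}

/** Hypothetical lifecycle holder enforcing the transitions in the diagram. */
final class LifecycleSketch {
    private SketchState state = SketchState.RUNNING;

    SketchState state() {
        return state;
    }

    /** Applies a transition, rejecting anything the diagram does not allow. */
    void transition(SketchState next) {
        if (!state.allowedNext().contains(next)) {
            throw new IllegalStateException(state + " -> " + next + " is not allowed");
        }
        state = next;
    }

    /** Like DirectPipelineResult.cancel(): only a non-terminal pipeline is stopped. */
    SketchState cancel() {
        if (!state.isTerminal()) {
            transition(SketchState.CANCELLED);
        }
        return state;
    }

    public static void main(String[] args) {
        LifecycleSketch finished = new LifecycleSketch();
        finished.transition(SketchState.DONE);
        System.out.println(finished.cancel()); // DONE: cancelling a finished pipeline changes nothing

        LifecycleSketch running = new LifecycleSketch();
        System.out.println(running.cancel()); // CANCELLED
    }
}
```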

Key Implementation Details

getState() Delegation

@Override
public State getState() {
    if (this.state == State.RUNNING) {
        // Delegate to executor for latest state
        this.state = executor.getPipelineState();
    }
    return state;
}

Once the state becomes terminal, getState() returns the cached value without querying the executor again, so repeated calls after completion always report the same state.
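The cache-once-terminal pattern does not depend on Beam and can be sketched with a Supplier standing in for executor.getPipelineState(). CachingStateView and its nested State enum are hypothetical; the point is that the delegate is queried only while the cached state is RUNNING.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/**
 * Hypothetical sketch (not a Beam class) of "delegate while RUNNING, cache once terminal".
 * The Supplier stands in for executor.getPipelineState().
 */
final class CachingStateView {
    enum State { RUNNING, DONE, FAILED, CANCELLED }

    private final Supplier<State> executorState;
    private State state = State.RUNNING;

    CachingStateView(Supplier<State> executorState) {
        this.executorState = executorState;
    }

    State getState() {
        if (state == State.RUNNING) {
            // Not finished yet: ask the executor for the latest state.
            state = executorState.get();
        }
        // Terminal states are served from the cache without querying the executor.
        return state;
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        State[] script = {State.RUNNING, State.DONE, State.FAILED};
        // Each executor query returns the next scripted state (the last one repeats).
        CachingStateView view = new CachingStateView(
            () -> script[Math.min(calls.getAndIncrement(), script.length - 1)]);
        System.out.println(view.getState()); // RUNNING
        System.out.println(view.getState()); // DONE
        System.out.println(view.getState()); // DONE (cached; FAILED is never observed)
        System.out.println(calls.get());     // 2
    }
}
```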

waitUntilFinish() Exception Handling

@Override
public State waitUntilFinish(Duration duration) {
    if (this.state.isTerminal()) {
        return this.state;  // Already finished
    }
    final State endState;
    try {
        endState = executor.waitUntilFinish(duration);
    } catch (UserCodeException uce) {
        // Rethrow the user's exception (the cause) wrapped in PipelineExecutionException,
        // so the stack trace starts at this call rather than inside the executor
        throw new Pipeline.PipelineExecutionException(uce.getCause());
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            Thread.currentThread().interrupt();  // Preserve interrupt status
        }
        if (e instanceof RuntimeException) {
            throw (RuntimeException) e;  // Propagate unchecked exceptions unchanged
        }
        throw new RuntimeException(e);  // Wrap checked exceptions
    }
    if (endState != null) {
        this.state = endState;
    }
    return endState;  // null if timeout elapsed
}
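The same control flow can be exercised without Beam on the classpath. In this hypothetical sketch, UserCodeFailure stands in for UserCodeException, PipelineFailure for Pipeline.PipelineExecutionException, and SketchExecutor for PipelineExecutor; none of them are real Beam types. It shows the three outcomes described above: a terminal state is cached, a null return signals a timeout and leaves the state RUNNING, and a wrapped user exception is rethrown with the user's original exception as its cause.

```java
import java.time.Duration;

/** Hypothetical stand-in for Beam's UserCodeException: wraps an exception thrown by user code. */
class UserCodeFailure extends RuntimeException {
    UserCodeFailure(Throwable cause) {
        super(cause);
    }
}

/** Hypothetical stand-in for Pipeline.PipelineExecutionException. */
class PipelineFailure extends RuntimeException {
    PipelineFailure(Throwable cause) {
        super(cause);
    }
}

/** Hypothetical stand-in for PipelineExecutor.waitUntilFinish(Duration). */
interface SketchExecutor {
    /** Returns a terminal state, or null if the timeout elapsed first. */
    WaitSketch.State waitUntilFinish(Duration timeout) throws Exception;
}

final class WaitSketch {
    enum State { RUNNING, DONE, FAILED, CANCELLED }

    private final SketchExecutor executor;
    private State state = State.RUNNING;

    WaitSketch(SketchExecutor executor) {
        this.executor = executor;
    }

    State waitUntilFinish(Duration timeout) {
        if (state != State.RUNNING) {
            return state; // already terminal: answer from the cache
        }
        final State endState;
        try {
            endState = executor.waitUntilFinish(timeout);
        } catch (UserCodeFailure ucf) {
            // Surface the user's own exception as the cause, not the internal wrapper.
            throw new PipelineFailure(ucf.getCause());
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt(); // keep the caller's interrupt flag set
            }
            if (e instanceof RuntimeException) {
                throw (RuntimeException) e;
            }
            throw new RuntimeException(e);
        }
        if (endState != null) {
            state = endState; // remember the terminal state for later calls
        }
        return endState; // null: timed out, pipeline still RUNNING
    }

    public static void main(String[] args) {
        WaitSketch timedOut = new WaitSketch(timeout -> null);
        System.out.println(timedOut.waitUntilFinish(Duration.ofMillis(1))); // null

        IllegalArgumentException userError = new IllegalArgumentException("bad record");
        WaitSketch failing = new WaitSketch(timeout -> { throw new UserCodeFailure(userError); });
        try {
            failing.waitUntilFinish(Duration.ZERO);
        } catch (PipelineFailure e) {
            System.out.println(e.getCause().getMessage()); // bad record
        }
    }
}
```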

DirectMetrics Details

The DirectMetrics class (in DirectMetrics.java) manages metric aggregation:

Method | Description
---|---
queryMetrics(MetricsFilter filter) | Queries accumulated metrics, returning MetricQueryResults with counters, distributions, gauges, string sets, and bounded tries. Supports filtering by name, namespace, and step.
commitPhysical(CommittedBundle, MetricUpdates) | Commits physical (attempted) metric updates from a processed bundle.
commitLogical(CommittedBundle, MetricUpdates) | Commits logical (committed) metric updates after a bundle is durably committed.

Metrics are aggregated using a DirectMetric inner class that maintains separate attempted and committed accumulators. The aggregation runs asynchronously on the metrics executor service to avoid blocking the main execution path.
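To make the attempted/committed split concrete, here is a minimal sketch of a counter with two accumulators. TwoPhaseCounter is hypothetical: it tracks only a long sum and omits the per-bundle bookkeeping and other metric types that DirectMetrics handles. It illustrates only the idea that physical updates are folded in on a separate executor while logical updates are recorded for committed bundles.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical counter (not Beam's DirectMetric) keeping separate attempted and committed
 * totals. Attempted updates are applied on a background executor so the caller never blocks.
 */
final class TwoPhaseCounter {
    private final AtomicLong attempted = new AtomicLong();
    private final AtomicLong committed = new AtomicLong();
    private final ExecutorService metricsExecutor;

    TwoPhaseCounter(ExecutorService metricsExecutor) {
        this.metricsExecutor = metricsExecutor;
    }

    /** Physical (attempted) update from a processed bundle, folded in asynchronously. */
    void commitPhysical(long delta) {
        metricsExecutor.execute(() -> attempted.addAndGet(delta));
    }

    /** Logical (committed) update, applied only once the bundle's output is committed. */
    void commitLogical(long delta) {
        committed.addAndGet(delta);
    }

    long attempted() {
        return attempted.get();
    }

    long committed() {
        return committed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        TwoPhaseCounter counter = new TwoPhaseCounter(pool);
        counter.commitPhysical(5); // bundle A processed
        counter.commitPhysical(5); // bundle B processed
        counter.commitLogical(5);  // in this sketch, only bundle A's output is committed
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS); // let pending attempted updates finish
        System.out.println(counter.attempted() + " attempted, " + counter.committed() + " committed");
    }
}
```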

Related Pages

Sources

  • Repo -- Apache Beam -- Source repository at runners/direct-java/.
