Implementation:Apache Beam DirectPipelineResult
| Field | Value |
|---|---|
| Implementation Name | DirectPipelineResult |
| Overview | Concrete tool for collecting Direct Runner pipeline execution results and metrics. |
| Source | runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java (inner class) |
| Implements | Principle:Apache_Beam_Result_Collection |
| last_updated | 2026-02-09 04:00 GMT |
Code Reference
Source Location
| File | Lines | Description |
|---|---|---|
| runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java | L312-393 | DirectPipelineResult inner class: implements PipelineResult |
| runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java | L317-322 | Constructor: initializes state to RUNNING |
| runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java | L324-330 | getState(): returns current state, delegating to executor if still running |
| runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java | L332-335 | metrics(): returns MetricResults from EvaluationContext |
| runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java | L345-347 | waitUntilFinish(): delegates to waitUntilFinish(Duration.ZERO) |
| runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java | L350-357 | cancel(): stops executor and returns terminal state |
| runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java | L367-392 | waitUntilFinish(Duration): blocks until completion or timeout, rethrows user exceptions |
| runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java | L56-end | DirectMetrics: MetricResults implementation for the Direct Runner |
Signature
```java
// DirectPipelineResult: inner class of DirectRunner
public static class DirectPipelineResult implements PipelineResult {
  private final PipelineExecutor executor;
  private final EvaluationContext evaluationContext;
  private State state;

  // Constructor (private; instances are created only by DirectRunner.run()).
  // Initializes state to RUNNING.
  private DirectPipelineResult(
      PipelineExecutor executor, EvaluationContext evaluationContext)

  // Get current pipeline state
  @Override
  public State getState()

  // Get pipeline metrics
  @Override
  public MetricResults metrics()

  // Block until pipeline completes (no timeout)
  @Override
  public State waitUntilFinish()

  // Block until pipeline completes or duration (org.joda.time.Duration) elapses
  @Override
  public State waitUntilFinish(Duration duration)

  // Cancel pipeline execution
  @Override
  public State cancel()
}

// DirectMetrics: MetricResults implementation
class DirectMetrics extends MetricResults {
  // Query metrics with a filter
  @Override
  public MetricQueryResults queryMetrics(MetricsFilter filter)
}
```
Import
```java
import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.metrics.MetricsFilter;
import org.joda.time.Duration;
```
I/O Contract
Inputs
| Parameter | Type | Description |
|---|---|---|
| executor | PipelineExecutor | The ExecutorServiceParallelExecutor instance driving pipeline execution. Provides getPipelineState(), waitUntilFinish(Duration), and stop(). |
| evaluationContext | EvaluationContext | The execution context containing metrics, bundle factories, and watermark state. Provides getMetrics(), which returns a DirectMetrics instance. |
Outputs
| Output | Type | Description |
|---|---|---|
| Pipeline state | State | One of RUNNING, DONE, FAILED, or CANCELLED. Returned by getState(), waitUntilFinish(), and cancel(). waitUntilFinish(Duration) returns null if the timeout elapses before the pipeline finishes. |
| Metric results | MetricResults | Queryable metrics via DirectMetrics. Supports filtering by metric name, namespace, and step. queryMetrics() returns a MetricQueryResults containing counters, distributions, gauges, string sets, and bounded tries. |
| On failure | PipelineExecutionException | Thrown by waitUntilFinish() when the pipeline fails due to a user code exception; wraps the original cause. When blockOnRun is true (the default), run() waits internally, so the exception can surface from run() itself. |
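The filtering behavior in the table above can be pictured with a small stdlib-only sketch. MetricQuerySketch, MetricKey, and SimpleFilter are hypothetical names, not Beam's MetricKey or MetricsFilter. Every field is compared by exact string equality, which is an assumption of this sketch and may not match Beam's own step-matching rules. A null namespace or name, or an empty step set, matches anything.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical, stdlib-only sketch of filtering metrics by namespace, name, and step.
// These types are illustrative stand-ins, not Beam's MetricKey or MetricsFilter.
final class MetricQuerySketch {
  static final class MetricKey {
    final String step;
    final String namespace;
    final String name;

    MetricKey(String step, String namespace, String name) {
      this.step = step;
      this.namespace = namespace;
      this.name = name;
    }
  }

  static final class SimpleFilter {
    final String namespace;  // null matches any namespace
    final String name;       // null matches any name
    final Set<String> steps; // empty matches any step

    SimpleFilter(String namespace, String name, Set<String> steps) {
      this.namespace = namespace;
      this.name = name;
      this.steps = steps;
    }

    // Exact-match simplification; Beam's real matching rules may be more permissive.
    boolean matches(MetricKey key) {
      return (namespace == null || namespace.equals(key.namespace))
          && (name == null || name.equals(key.name))
          && (steps.isEmpty() || steps.contains(key.step));
    }
  }

  // Returns every key accepted by the filter, preserving input order.
  static List<MetricKey> query(List<MetricKey> all, SimpleFilter filter) {
    List<MetricKey> hits = new ArrayList<>();
    for (MetricKey key : all) {
      if (filter.matches(key)) {
        hits.add(key);
      }
    }
    return hits;
  }
}
```

With hypothetical steps "ParseLines" and "WriteOut", a filter on namespace "myNamespace" and name "myCounter" with no steps returns that counter from both steps; adding the step "WriteOut" narrows the result to one entry.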
Usage Examples
Standard Blocking Execution
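```java
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

PipelineOptions options = PipelineOptionsFactory.create();
options.setRunner(DirectRunner.class);
Pipeline p = Pipeline.create(options);
// ... define pipeline transforms ...
// run() blocks until completion because blockOnRun defaults to true
PipelineResult result = p.run();
// Check state (already terminal at this point)
PipelineResult.State state = result.getState();
System.out.println("Pipeline finished with state: " + state);
```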
Non-Blocking Execution with Explicit Wait
```java
import java.io.IOException;
import org.apache.beam.runners.direct.DirectOptions;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.joda.time.Duration;

DirectOptions options = PipelineOptionsFactory.create().as(DirectOptions.class);
options.setRunner(DirectRunner.class);
options.setBlockOnRun(false); // run() returns immediately instead of blocking
Pipeline p = Pipeline.create(options);
// ... define pipeline transforms ...
PipelineResult result = p.run();
// Do other work while the pipeline runs...
System.out.println("Pipeline state: " + result.getState()); // likely RUNNING
// Wait up to 5 minutes; a null (or non-terminal) result means the timeout elapsed first
PipelineResult.State finalState = result.waitUntilFinish(Duration.standardMinutes(5));
if (finalState == null || !finalState.isTerminal()) {
  System.out.println("Pipeline did not finish within 5 minutes");
  try {
    result.cancel(); // PipelineResult.cancel() is declared to throw IOException
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
} else {
  System.out.println("Pipeline finished: " + finalState);
}
```
Collecting Metrics After Execution
```java
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.DistributionResult;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.metrics.MetricsFilter;

// `pipeline` is a Pipeline configured for the Direct Runner
PipelineResult result = pipeline.run();
result.waitUntilFinish();

// Query all metrics (an empty filter matches everything)
MetricResults metrics = result.metrics();
MetricQueryResults queryResults = metrics.queryMetrics(MetricsFilter.builder().build());

// Iterate over counters
for (MetricResult<Long> counter : queryResults.getCounters()) {
  System.out.printf("Counter %s: attempted=%d, committed=%d%n",
      counter.getName(),
      counter.getAttempted(),
      counter.getCommitted());
}

// Iterate over distributions
for (MetricResult<DistributionResult> dist : queryResults.getDistributions()) {
  DistributionResult committed = dist.getCommitted();
  System.out.printf("Distribution %s: count=%d, sum=%d, min=%d, max=%d, mean=%.2f%n",
      dist.getName(),
      committed.getCount(), committed.getSum(),
      committed.getMin(), committed.getMax(), committed.getMean());
}

// Query specific metrics by namespace and name
MetricQueryResults filtered = metrics.queryMetrics(
    MetricsFilter.builder()
        .addNameFilter(MetricNameFilter.named("myNamespace", "myCounter"))
        .build());
for (MetricResult<Long> counter : filtered.getCounters()) {
  System.out.println(counter.getName() + " = " + counter.getAttempted());
}
```
Error Handling
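```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
import org.apache.beam.sdk.PipelineResult;

Pipeline p = Pipeline.create(options);
// ... define pipeline with potentially failing user code ...
try {
  // With blockOnRun=true (the default), run() itself waits and may throw;
  // otherwise the exception surfaces from waitUntilFinish().
  PipelineResult result = p.run();
  result.waitUntilFinish();
} catch (PipelineExecutionException e) {
  // The original user code exception is available via getCause()
  System.err.println("Pipeline failed: " + e.getCause().getMessage());
  e.getCause().printStackTrace();
}
```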
DirectPipelineResult State Machine
```
            +----------+
            | RUNNING  |
            +----+-----+
                 |
     +-----------+------------+
     |           |            |
     v           v            v
+--------+  +--------+  +-----------+
|  DONE  |  | FAILED |  | CANCELLED |
+--------+  +--------+  +-----------+
(terminal)  (terminal)   (terminal)
```
State transitions:
- RUNNING to DONE -- The QuiescenceDriver returns DriverState.SHUTDOWN (all work is complete and all watermarks have advanced to positive infinity).
- RUNNING to FAILED -- The QuiescenceDriver returns DriverState.FAILED (an unhandled exception occurred during bundle processing).
- RUNNING to CANCELLED -- The user calls cancel(), which calls executor.stop(); the executor then sets the state via shutdownIfNecessary(State.CANCELLED).
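The transitions above can be captured in a small stdlib-only model. DirectStateModel is a hypothetical name, and its four-value enum is a simplification: Beam's PipelineResult.State defines additional values (for example UNKNOWN). The cancel method models the documented behavior only, assuming a terminal state is returned unchanged, and ignores races with a run that finishes concurrently.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical, stdlib-only model of the Direct Runner state machine.
// Not Beam's PipelineResult.State, which has more values.
final class DirectStateModel {
  enum State {
    RUNNING, DONE, FAILED, CANCELLED;

    boolean isTerminal() {
      return this != RUNNING;
    }
  }

  // Only RUNNING has outgoing edges, and every edge ends in a terminal state.
  static boolean canTransition(State from, State to) {
    return from == State.RUNNING && to.isTerminal();
  }

  // All states directly reachable from `from`.
  static Set<State> successors(State from) {
    Set<State> next = EnumSet.noneOf(State.class);
    for (State to : State.values()) {
      if (canTransition(from, to)) {
        next.add(to);
      }
    }
    return next;
  }

  // Simplified cancel(): terminal states are returned unchanged; otherwise the
  // run is stopped and becomes CANCELLED.
  static State cancel(State current) {
    return current.isTerminal() ? current : State.CANCELLED;
  }
}
```

Because terminal states have no successors, calling cancel() on a finished run is a no-op that simply reports the existing terminal state.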
Key Implementation Details
getState() Delegation
```java
@Override
public State getState() {
  if (this.state == State.RUNNING) {
    // Delegate to executor for latest state
    this.state = executor.getPipelineState();
  }
  return state;
}
```
Once the state becomes terminal, getState() returns the cached terminal state without consulting the executor again, so the reported state cannot change after the pipeline finishes.
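The caching pattern can be isolated in a stdlib-only sketch. CachingStateHolder is a hypothetical class, and the Supplier stands in for executor.getPipelineState(); the delegation counter exists only to make the "no further delegation" property observable.

```java
import java.util.function.Supplier;

// Hypothetical, stdlib-only sketch of the getState() caching pattern.
// The Supplier stands in for executor.getPipelineState(); this is not a Beam class.
final class CachingStateHolder {
  enum State { RUNNING, DONE, FAILED, CANCELLED }

  private final Supplier<State> executorState;
  private State state = State.RUNNING; // starts RUNNING, like the real constructor
  private int delegations = 0;         // how many times the executor was consulted

  CachingStateHolder(Supplier<State> executorState) {
    this.executorState = executorState;
  }

  State getState() {
    if (state == State.RUNNING) {
      // Still running: ask the executor for the latest state.
      delegations++;
      state = executorState.get();
    }
    // Terminal states are cached and returned without further delegation.
    return state;
  }

  int delegations() {
    return delegations;
  }
}
```

If the stand-in executor reports RUNNING, then DONE, then FAILED, three getState() calls return RUNNING, DONE, DONE: the executor is consulted only twice, and the later FAILED is never observed.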
waitUntilFinish() Exception Handling
```java
@Override
public State waitUntilFinish(Duration duration) {
  if (this.state.isTerminal()) {
    return this.state; // Already finished
  }
  final State endState;
  try {
    endState = executor.waitUntilFinish(duration);
  } catch (UserCodeException uce) {
    // Truncate stack trace and wrap in PipelineExecutionException
    throw new Pipeline.PipelineExecutionException(uce.getCause());
  } catch (Exception e) {
    if (e instanceof InterruptedException) {
      Thread.currentThread().interrupt(); // Preserve interrupt status
    }
    throw new RuntimeException(e);
  }
  if (endState != null) {
    this.state = endState;
  }
  return endState; // null if timeout elapsed
}
```
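The timeout and exception-translation rules of waitUntilFinish(Duration) can be modeled with java.util.concurrent alone. In this sketch a Future stands in for the executor, a zero timeout means "wait indefinitely" (mirroring waitUntilFinish() delegating with Duration.ZERO), and UserCodeFailure and ExecutionFailure are hypothetical stand-ins for Beam's UserCodeException and Pipeline.PipelineExecutionException. It models the control flow only, not Beam's executor.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical, stdlib-only model of waitUntilFinish(Duration) control flow.
final class WaitModel {
  // Stand-in for Beam's UserCodeException: wraps an exception thrown by user code.
  static final class UserCodeFailure extends RuntimeException {
    UserCodeFailure(Throwable cause) {
      super(cause);
    }
  }

  // Stand-in for Pipeline.PipelineExecutionException.
  static final class ExecutionFailure extends RuntimeException {
    ExecutionFailure(Throwable cause) {
      super(cause);
    }
  }

  // Returns the final state, or null if the timeout elapses first.
  // timeoutMillis == 0 waits indefinitely.
  static String waitUntilFinish(Future<String> run, long timeoutMillis) {
    try {
      return timeoutMillis == 0
          ? run.get()
          : run.get(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      return null;
    } catch (ExecutionException e) {
      Throwable cause = e.getCause();
      if (cause instanceof UserCodeFailure) {
        // Drop the wrapper so callers see the user's original exception as the cause.
        throw new ExecutionFailure(cause.getCause());
      }
      throw new RuntimeException(cause);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt status
      throw new RuntimeException(e);
    }
  }
}
```

A CompletableFuture is convenient for exercising the model: a completed future yields its state, a never-completed one yields null after the timeout, and one completed exceptionally with a UserCodeFailure surfaces as an ExecutionFailure whose cause is the original user exception.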
DirectMetrics Details
The DirectMetrics class (in DirectMetrics.java) manages metric aggregation:
| Method | Description |
|---|---|
| queryMetrics(MetricsFilter filter) | Queries accumulated metrics, returning MetricQueryResults with counters, distributions, gauges, string sets, and bounded tries. Supports filtering by name, namespace, and step. |
| commitPhysical(CommittedBundle, MetricUpdates) | Commits physical (attempted) metric updates from a processed bundle. |
| commitLogical(CommittedBundle, MetricUpdates) | Commits logical (committed) metric updates after a bundle is durably committed. |
Metrics are aggregated using a DirectMetric inner class that maintains separate attempted and committed accumulators. The aggregation runs asynchronously on the metrics executor service to avoid blocking the main execution path.
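The split between attempted and committed values can be illustrated with a deliberately simplified counter. AttemptedCommittedCounter is a hypothetical name, a plain long delta stands in for Beam's MetricUpdates, and updates are applied synchronously rather than on a metrics executor service as described above.

```java
// Hypothetical, stdlib-only model of one counter with separate attempted and committed
// totals. Beam's DirectMetric applies updates asynchronously; this sketch is synchronous.
final class AttemptedCommittedCounter {
  private long attempted;
  private long committed;

  // Physical commit: a processed bundle's update counts toward "attempted".
  synchronized void commitPhysical(long delta) {
    attempted += delta;
  }

  // Logical commit: once the bundle is durably committed, its update counts toward "committed".
  synchronized void commitLogical(long delta) {
    committed += delta;
  }

  synchronized long attempted() {
    return attempted;
  }

  synchronized long committed() {
    return committed;
  }
}
```

For example, after commitPhysical(3), commitLogical(3), and commitPhysical(4), attempted() is 7 while committed() stays 3 until the second bundle's update is logically committed. This is why a counter can report different attempted and committed values while a pipeline is still running.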
Related Pages
- Principle:Apache_Beam_Result_Collection -- The principle of collecting pipeline execution results and metrics.
- Environment:Apache_Beam_Java_Build_Environment -- Java build environment with JDK 8+, Gradle, and multi-language toolchain.
- Heuristic:Apache_Beam_Executor_Shutdown_Ordering -- Graceful shutdown ordering for pipeline result finalization.
Sources
- Repo -- Apache Beam -- Source repository at runners/direct-java/.