Implementation:Apache Beam JobServicePipelineResult
| Property | Value |
|---|---|
| Implementation Name | JobServicePipelineResult |
| Category | Monitoring, Job_Management |
| Related Principle | Principle:Apache_Beam_Execution_Monitoring |
| Source File | runners/portability/java/src/main/java/org/apache/beam/runners/portability/JobServicePipelineResult.java |
| Also | PortableMetrics.java |
| Language | Java |
| last_updated | 2026-02-09 04:00 GMT |
Overview
JobServicePipelineResult is the concrete tool for monitoring portable pipeline execution via job service state polling and metrics collection. It implements both PipelineResult and AutoCloseable, providing the client-side interface for tracking job progress after submission.
Code Reference
Source Location
| Property | Value |
|---|---|
| Result File | runners/portability/java/src/main/java/org/apache/beam/runners/portability/JobServicePipelineResult.java
|
| Result Package | org.apache.beam.runners.portability
|
| Result Class | L43: class JobServicePipelineResult implements PipelineResult, AutoCloseable
|
| Result Lines | L43-212 |
| Metrics File | runners/portability/java/src/main/java/org/apache/beam/runners/portability/PortableMetrics.java
|
| Metrics Class | L59: public class PortableMetrics extends MetricResults
|
| Metrics Lines | L59-276 |
JobServicePipelineResult Signatures
class JobServicePipelineResult implements PipelineResult, AutoCloseable {
private static final long POLL_INTERVAL_MS = 3_000; // L45
// Constructor (L56-66)
JobServicePipelineResult(
ByteString jobId,
int jobServerTimeout,
CloseableResource<JobServiceBlockingStub> jobService,
Runnable cleanup);
// PipelineResult interface
@Override public State getState(); // L69-78
@Override public State cancel(); // L81-86
@Override public @Nullable State waitUntilFinish(Duration duration); // L90-108
@Override public State waitUntilFinish(); // L111-122
@Override public MetricResults metrics(); // L125-127
// AutoCloseable
@Override public void close(); // L130-141
// Internal
private void waitForTerminalState(); // L143-160
private void propagateErrors(); // L162-180
private static State getJavaState(JobApi.JobState.Enum protoState); // L182-211
}
PortableMetrics Signatures
public class PortableMetrics extends MetricResults {
// Factory method (L82-84)
public static PortableMetrics of(JobApi.MetricResults jobMetrics);
// MetricResults interface (L87-101)
@Override
public MetricQueryResults queryMetrics(MetricsFilter filter);
}
Import Statements
import org.apache.beam.runners.portability.JobServicePipelineResult;
import org.apache.beam.runners.portability.PortableMetrics;
I/O Contract
Inputs
| Input | Type | Source |
|---|---|---|
jobId |
ByteString |
From RunJobResponse.getJobIdBytes() after job submission
|
jobServerTimeout |
int |
From PortablePipelineOptions.getJobServerTimeout()
|
jobService |
CloseableResource<JobServiceBlockingStub> |
Transferred from PortableRunner.run() via .transfer()
|
cleanup |
Runnable |
Optional cleanup callback (e.g., stopping loopback worker service) |
Outputs
| Output | Type | Description |
|---|---|---|
| Terminal state | PipelineResult.State |
DONE, FAILED, CANCELLED, or UPDATED |
| Metrics | MetricResults |
Counters, distributions, gauges, string sets, bounded tries |
| Exception (on failure) | RuntimeException |
Contains error message from job service message stream |
State Polling
waitUntilFinish() (L111-122)
The primary blocking method that polls until the job reaches a terminal state:
@Override
public State waitUntilFinish() {
if (terminalState != null) {
return terminalState;
}
try {
waitForTerminalState();
propagateErrors();
return terminalState;
} finally {
close();
}
}
waitForTerminalState() (L143-160)
The core polling loop:
private void waitForTerminalState() {
JobServiceBlockingStub stub =
jobService.get().withDeadlineAfter(jobServerTimeout, TimeUnit.SECONDS);
GetJobStateRequest request =
GetJobStateRequest.newBuilder().setJobIdBytes(jobId).build();
JobStateEvent response = stub.getState(request);
State lastState = getJavaState(response.getState());
while (!lastState.isTerminal()) {
try {
Thread.sleep(POLL_INTERVAL_MS); // 3000ms
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
response = stub.withDeadlineAfter(jobServerTimeout, TimeUnit.SECONDS)
.getState(request);
lastState = getJavaState(response.getState());
}
terminalState = lastState;
}
waitUntilFinish(Duration) (L90-108)
Timed variant that returns null on timeout:
@Override
public @Nullable State waitUntilFinish(Duration duration) {
if (duration.compareTo(Duration.millis(1)) <= 0) {
return waitUntilFinish(); // Infinite timeout
} else {
CompletableFuture<State> result =
CompletableFuture.supplyAsync(this::waitUntilFinish);
try {
return result.get(duration.getMillis(), TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
return null; // Null indicates timeout
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
}
}
Error Propagation
When the job reaches a non-DONE terminal state, propagateErrors() at L162-180 retrieves error messages:
private void propagateErrors() {
if (terminalState != State.DONE) {
JobMessagesRequest messageStreamRequest =
JobMessagesRequest.newBuilder().setJobIdBytes(jobId).build();
Iterator<JobMessagesResponse> messageStreamIterator =
jobService.get()
.withDeadlineAfter(jobServerTimeout, TimeUnit.SECONDS)
.getMessageStream(messageStreamRequest);
while (messageStreamIterator.hasNext()) {
JobMessage messageResponse =
messageStreamIterator.next().getMessageResponse();
if (messageResponse.getImportance() ==
JobMessage.MessageImportance.JOB_MESSAGE_ERROR) {
throw new RuntimeException(
"The Runner experienced the following error during execution:\n"
+ messageResponse.getMessageText());
}
}
}
}
Metrics Collection
Fetching Metrics (L130-141)
Metrics are fetched during close() after the job reaches a terminal state:
@Override
public void close() {
try (CloseableResource<JobServiceBlockingStub> jobService = this.jobService) {
JobApi.GetJobMetricsRequest metricsRequest =
JobApi.GetJobMetricsRequest.newBuilder().setJobIdBytes(jobId).build();
jobMetrics = jobService.get().getJobMetrics(metricsRequest).getMetrics();
if (cleanup != null) {
cleanup.run();
}
} catch (Exception e) {
LOG.warn("Error cleaning up job service", e);
}
}
PortableMetrics Conversion
The PortableMetrics.of() method at L82-84 converts protobuf MonitoringInfo messages into SDK metric types:
public static PortableMetrics of(JobApi.MetricResults jobMetrics) {
return convertMonitoringInfosToMetricResults(jobMetrics);
}
The conversion process at L103-135:
- Collects all attempted
MonitoringInfomessages into a deduplication map - Overlays all committed
MonitoringInfomessages (committed wins over attempted) - Extracts counters (type
SUM_INT64_TYPE) - Extracts distributions (type
DISTRIBUTION_INT64_TYPE) - Extracts gauges (type
LATEST_INT64_TYPE) - Extracts string sets (type
SET_STRING_TYPE) - Extracts bounded tries (type
BOUNDED_TRIE_TYPE)
Each metric is identified by three labels: PTRANSFORM (step name), NAMESPACE, and NAME.
Querying Metrics
// PortableMetrics.queryMetrics() at L87-101
@Override
public MetricQueryResults queryMetrics(MetricsFilter filter) {
return MetricQueryResults.create(
Iterables.filter(this.counters,
(counter) -> MetricFiltering.matches(filter, counter.getKey())),
Iterables.filter(this.distributions,
(distribution) -> MetricFiltering.matches(filter, distribution.getKey())),
Iterables.filter(this.gauges,
(gauge) -> MetricFiltering.matches(filter, gauge.getKey())),
Iterables.filter(this.stringSets,
(stringSet) -> MetricFiltering.matches(filter, stringSet.getKey())),
Iterables.filter(this.boundedTries,
(boundedTries) -> MetricFiltering.matches(filter, boundedTries.getKey())),
Collections.emptyList());
}
State Mapping
The getJavaState() method at L182-211 maps protobuf states to SDK states:
| Proto State | Java State | Notes |
|---|---|---|
UNSPECIFIED |
UNKNOWN |
|
STOPPED |
STOPPED |
|
RUNNING |
RUNNING |
|
STARTING |
RUNNING |
Mapped to RUNNING (non-terminal) |
DONE |
DONE |
Terminal |
FAILED |
FAILED |
Terminal |
CANCELLED |
CANCELLED |
Terminal |
CANCELLING |
CANCELLED |
Mapped to CANCELLED (non-terminal in proto) |
UPDATED |
UPDATED |
Terminal |
DRAINING |
UNKNOWN |
TODO: correct mapping pending |
DRAINED |
UNKNOWN |
TODO: correct mapping pending |
Usage Examples
Basic Monitoring
// Returned by PortableRunner.run()
PipelineResult result = pipeline.run();
// Block until job completes
State finalState = result.waitUntilFinish();
System.out.println("Pipeline finished with state: " + finalState);
// Collect metrics
MetricResults metrics = result.metrics();
MetricQueryResults queryResults = metrics.queryMetrics(
MetricsFilter.builder()
.addNameFilter(MetricNameFilter.named("myNamespace", "myCounter"))
.build());
for (MetricResult<Long> counter : queryResults.getCounters()) {
System.out.println(counter.getName() + ": " + counter.getAttempted());
}
Timed Monitoring with Cancellation
PipelineResult result = pipeline.run();
// Wait up to 30 minutes
State state = result.waitUntilFinish(Duration.standardMinutes(30));
if (state == null) {
System.out.println("Pipeline still running after 30 minutes, cancelling...");
result.cancel();
}
Polling State Without Blocking
PipelineResult result = pipeline.run();
// Non-blocking state check
State currentState = result.getState();
System.out.println("Current state: " + currentState);
Related Pages
- Principle:Apache_Beam_Execution_Monitoring -- The principle this implementation realizes
- Implementation:Apache_Beam_PortableRunner_Run -- Creates and returns this result object at L221-222
- Implementation:Apache_Beam_JobServiceGrpc_Connection -- The gRPC connection used for state polling
- Implementation:Apache_Beam_InMemoryJobService_Run -- The server-side job whose state is being polled
- Environment:Apache_Beam_Portable_Runner_Environment -- Portability framework runtime with gRPC Job Service and Docker/loopback SDK harness.