Implementation:Apache Beam PortableRunner Run
| Property | Value |
|---|---|
| Implementation Name | PortableRunner Run |
| Category | Pipeline_Orchestration, Serialization |
| Related Principle | Principle:Apache_Beam_Pipeline_Translation |
| Source File | runners/portability/java/src/main/java/org/apache/beam/runners/portability/PortableRunner.java |
| Language | Java |
| last_updated | 2026-02-09 04:00 GMT |
Overview
PortableRunner translates a Beam pipeline into its portable protobuf form and submits it to a remote job service. It extends PipelineRunner<PipelineResult> and orchestrates the full submission workflow: translation to protobuf, gRPC connection establishment, job preparation, artifact staging, and job execution.
Code Reference
Source Location
| Property | Value |
|---|---|
| File | runners/portability/java/src/main/java/org/apache/beam/runners/portability/PortableRunner.java |
| Package | org.apache.beam.runners.portability |
| Lines | L64-232 |
| Class Declaration | L64: public class PortableRunner extends PipelineRunner<PipelineResult> |
Class Structure
public class PortableRunner extends PipelineRunner<PipelineResult> {
  private final PipelineOptions options;
  private final String endpoint;
  private final ManagedChannelFactory channelFactory;

  // Factory methods
  public static PortableRunner fromOptions(PipelineOptions options); // L81-83
  static PortableRunner create(PipelineOptions options,
      ManagedChannelFactory channelFactory); // L86-93

  // Private constructor
  private PortableRunner(PipelineOptions options, String endpoint,
      ManagedChannelFactory channelFactory); // L95-100

  // Core execution
  @Override
  public PipelineResult run(Pipeline pipeline); // L103-226
}
Key Signatures
// Factory method: creates runner from PipelineOptions
public static PortableRunner fromOptions(PipelineOptions options) {
  return create(options, ManagedChannelFactory.createDefault());
}

// Test-visible factory with injectable channel factory
@VisibleForTesting
static PortableRunner create(PipelineOptions options, ManagedChannelFactory channelFactory) {
  PortablePipelineOptions portableOptions =
      PipelineOptionsValidator.validate(PortablePipelineOptions.class, options);
  String endpoint = portableOptions.getJobEndpoint();
  return new PortableRunner(options, endpoint, channelFactory);
}

// Core method: translates, stages, and submits the pipeline
@Override
public PipelineResult run(Pipeline pipeline) // L103-226
Import Statement
import org.apache.beam.runners.portability.PortableRunner;
I/O Contract
Inputs
| Input | Type | Description |
|---|---|---|
| pipeline | Pipeline | User-constructed Beam pipeline with transforms, coders, and windowing strategies |
| options | PipelineOptions | Configuration including PortablePipelineOptions fields |
| options.jobEndpoint | String | gRPC endpoint of the job service (e.g., localhost:8099) |
| options.defaultEnvironmentType | String | Worker environment type: DOCKER, PROCESS, EXTERNAL, LOOPBACK |
| options.filesToStage | List<String> | Explicit files to stage; if null, classpath is auto-detected |
| options.jobServerTimeout | int | Timeout in seconds for job service RPCs |
Outputs
| Output | Type | Description |
|---|---|---|
| Return value | PipelineResult | A JobServicePipelineResult instance for monitoring the submitted job |
| Side effect | RunnerApi.Pipeline protobuf | Serialized pipeline sent to job service via prepare RPC |
| Side effect | Staged artifacts | Pipeline dependencies uploaded to staging service |
| Side effect | Running job | Job started on the remote runner backend |
Internal Workflow
The run() method executes the following steps in sequence:
Step 1: Loopback Worker Setup (L105-128)
If the environment type is LOOPBACK, a local ExternalWorkerService is started for in-process SDK harness execution:
if (Environments.ENVIRONMENT_LOOPBACK.equals(
    options.as(PortablePipelineOptions.class).getDefaultEnvironmentType())) {
  GrpcFnServer<ExternalWorkerService> workerService;
  workerService = new ExternalWorkerService(options).start();
  options.as(PortablePipelineOptions.class)
      .setDefaultEnvironmentConfig(workerService.getApiServiceDescriptor().getUrl());
  cleanup = () -> { workerService.close(); };
}
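The loopback branch pairs a started resource with a deferred cleanup callback that is later handed to the result object. The following is a minimal sketch of that pattern in plain Java. `LoopbackCleanupSketch`, `FakeWorkerService`, `Setup`, and `prepare` are hypothetical names introduced for illustration and are not Beam APIs; the real method starts a gRPC server rather than receiving a pre-built object.

```java
/** Illustrative sketch only: none of these types are Beam classes. */
final class LoopbackCleanupSketch {
  static final String LOOPBACK = "LOOPBACK";

  /** Hypothetical stand-in for a locally started worker service. */
  static final class FakeWorkerService implements AutoCloseable {
    private final String url;
    private boolean closed;

    FakeWorkerService(String url) {
      this.url = url;
    }

    String getUrl() {
      return url;
    }

    boolean isClosed() {
      return closed;
    }

    @Override
    public void close() {
      closed = true;
    }
  }

  /** Outcome of the setup step: the config to advertise plus a deferred cleanup. */
  static final class Setup {
    final String environmentConfig; // null when no local worker is used
    final Runnable cleanup;

    Setup(String environmentConfig, Runnable cleanup) {
      this.environmentConfig = environmentConfig;
      this.cleanup = cleanup;
    }
  }

  /** Wires the worker's URL and close() into the setup only for LOOPBACK. */
  static Setup prepare(String environmentType, FakeWorkerService worker) {
    if (LOOPBACK.equals(environmentType)) {
      return new Setup(worker.getUrl(), worker::close);
    }
    // Assumption: other environment types need no local cleanup in this sketch.
    return new Setup(null, () -> {});
  }

  public static void main(String[] args) {
    FakeWorkerService worker = new FakeWorkerService("localhost:50000");
    Setup setup = prepare(LOOPBACK, worker);
    System.out.println(setup.environmentConfig);
    setup.cleanup.run(); // in the runner, this would fire when the job result is done
    System.out.println(worker.isClosed());
  }
}
```

Returning the cleanup as a `Runnable` keeps the worker alive for the duration of the job instead of closing it when the setup step returns.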
Step 2: Files-to-Stage Collection (L130-160)
Collects all files that need to be staged on the runner, from explicit configuration or classpath auto-detection:
List<String> stagingFiles = options.as(PortablePipelineOptions.class).getFilesToStage();
if (stagingFiles == null) {
  List<String> classpathResources =
      detectClassPathResourcesToStage(Environments.class.getClassLoader(), options);
  filesToStageBuilder.addAll(classpathResources);
} else {
  filesToStageBuilder.addAll(stagingFiles);
}
// Also parses jar_packages from experimental flags
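The null check above means an explicit list always wins, and auto-detection runs only when nothing was configured. Below is a minimal sketch of that fallback using only the standard library. `FilesToStageSketch`, `resolve`, and `detectFromClasspath` are hypothetical names, and splitting a classpath string is a simplified stand-in for Beam's own classpath detection, which this sketch does not reproduce.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

/** Illustrative sketch only: a simplified model of the files-to-stage fallback. */
final class FilesToStageSketch {

  /** Hypothetical stand-in for classpath detection: splits a classpath string. */
  static List<String> detectFromClasspath(String classpath) {
    List<String> entries = new ArrayList<>();
    for (String entry : classpath.split(Pattern.quote(File.pathSeparator))) {
      if (!entry.isEmpty()) {
        entries.add(entry);
      }
    }
    return entries;
  }

  /** Uses the explicit list when present (even if empty); otherwise falls back. */
  static List<String> resolve(List<String> explicitFiles, String classpath) {
    if (explicitFiles == null) {
      return detectFromClasspath(classpath);
    }
    return new ArrayList<>(explicitFiles);
  }

  public static void main(String[] args) {
    String classpath = System.getProperty("java.class.path", "");
    System.out.println(resolve(null, classpath));
    System.out.println(resolve(Arrays.asList("extra.jar"), classpath));
  }
}
```

In this model an empty list is distinct from null: it is treated as an explicit request to stage nothing, so no auto-detection takes place.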
Step 3: Pipeline Translation (L162-164)
Converts the in-memory Pipeline to a protobuf representation:
RunnerApi.Pipeline pipelineProto =
    PipelineTranslation.toProto(pipeline, SdkComponents.create(options));
pipelineProto = DefaultArtifactResolver.INSTANCE.resolveArtifacts(pipelineProto);
Step 4: Prepare Request Construction (L166-171)
PrepareJobRequest prepareJobRequest =
    PrepareJobRequest.newBuilder()
        .setJobName(options.getJobName())
        .setPipeline(pipelineProto)
        .setPipelineOptions(PipelineOptionsTranslation.toProto(options))
        .build();
Step 5: gRPC Connection and Prepare (L174-187)
ManagedChannel jobServiceChannel =
    channelFactory.forDescriptor(ApiServiceDescriptor.newBuilder().setUrl(endpoint).build());
JobServiceBlockingStub jobService = JobServiceGrpc.newBlockingStub(jobServiceChannel);
PrepareJobResponse prepareJobResponse =
    jobService.withDeadlineAfter(jobServerTimeout, TimeUnit.SECONDS)
        .withWaitForReady()
        .prepare(prepareJobRequest);
Step 6: Artifact Staging (L193-206)
ArtifactStagingService.offer(
    new ArtifactRetrievalService(),
    ArtifactStagingServiceGrpc.newStub(artifactChannel.get()),
    stagingSessionToken);
Step 7: Run Job (L208-222)
RunJobRequest runJobRequest =
    RunJobRequest.newBuilder()
        .setPreparationId(prepareJobResponse.getPreparationId())
        .build();
RunJobResponse runJobResponse = jobService.run(runJobRequest);
ByteString jobId = runJobResponse.getJobIdBytes();
return new JobServicePipelineResult(
    jobId, jobServerTimeout, wrappedJobService.transfer(), cleanup);
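Steps 5 through 7 form a handshake: prepare returns a preparation ID and a staging token, artifacts are staged against the token, and run exchanges the preparation ID for a job ID. The sketch below models only that ordering with an in-memory fake. `JobHandshakeSketch`, `FakeJobService`, `Preparation`, and `submit` are hypothetical names, the ID formats are invented for the example, and no gRPC or Beam classes are involved.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative sketch only: an in-memory model of the prepare/stage/run ordering. */
final class JobHandshakeSketch {

  /** Hypothetical result of prepare: an ID for run() and a token for staging. */
  static final class Preparation {
    final String preparationId;
    final String stagingToken;

    Preparation(String preparationId, String stagingToken) {
      this.preparationId = preparationId;
      this.stagingToken = stagingToken;
    }
  }

  /** Hypothetical job service that remembers prepared pipelines by ID. */
  static final class FakeJobService {
    private final Map<String, String> pipelinesByPreparationId = new HashMap<>();
    private int nextId = 1;

    Preparation prepare(String jobName, String serializedPipeline) {
      String preparationId = jobName + "-prep-" + nextId++;
      pipelinesByPreparationId.put(preparationId, serializedPipeline);
      return new Preparation(preparationId, "staging-" + preparationId);
    }

    String run(String preparationId) {
      if (!pipelinesByPreparationId.containsKey(preparationId)) {
        throw new IllegalArgumentException("Unknown preparation id: " + preparationId);
      }
      return "job-" + preparationId;
    }
  }

  /** Runs the handshake in order and returns the job ID. */
  static String submit(FakeJobService service, String jobName, String serializedPipeline) {
    Preparation preparation = service.prepare(jobName, serializedPipeline);
    // Artifact staging would happen here, keyed by preparation.stagingToken.
    return service.run(preparation.preparationId);
  }

  public static void main(String[] args) {
    FakeJobService service = new FakeJobService();
    System.out.println(submit(service, "wordcount", "serialized-pipeline"));
  }
}
```

Separating prepare from run gives the client a window to upload dependencies before any work is scheduled, which is why run refers to the pipeline only by its preparation ID.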
Usage Examples
Basic Portable Pipeline Submission
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
options.as(PortablePipelineOptions.class).setJobEndpoint("localhost:8099");
options.setRunner(PortableRunner.class);
Pipeline pipeline = Pipeline.create(options);
pipeline.apply("ReadLines", TextIO.read().from("input.txt"))
    .apply("ToUpper", MapElements.via(new SimpleFunction<String, String>() {
      @Override
      public String apply(String input) {
        return input.toUpperCase();
      }
    }))
    .apply("WriteLines", TextIO.write().to("output"));
PipelineResult result = pipeline.run();
result.waitUntilFinish();
Loopback Environment for Testing
PipelineOptions options = PipelineOptionsFactory.create();
options.as(PortablePipelineOptions.class).setJobEndpoint("localhost:8099");
options.as(PortablePipelineOptions.class).setDefaultEnvironmentType("LOOPBACK");
options.setRunner(PortableRunner.class);
Pipeline pipeline = Pipeline.create(options);
// ... define pipeline ...
PipelineResult result = pipeline.run();
Related Pages
- Principle:Apache_Beam_Pipeline_Translation -- The principle this implementation realizes
- Implementation:Apache_Beam_JobServiceGrpc_Connection -- gRPC connection established at L174-177
- Implementation:Apache_Beam_InMemoryJobService_Prepare -- Server-side prepare handler invoked at L186
- Implementation:Apache_Beam_InMemoryJobService_Run -- Server-side run handler invoked at L216
- Implementation:Apache_Beam_JobServicePipelineResult -- Result object returned at L221-222
- Environment:Apache_Beam_Portable_Runner_Environment -- Portability framework runtime with gRPC Job Service and Docker/loopback SDK harness.