Implementation:Apache Beam PortableRunner Run

From Leeroopedia


Property | Value
Implementation Name | PortableRunner Run
Category | Pipeline_Orchestration, Serialization
Related Principle | Principle:Apache_Beam_Pipeline_Translation
Source File | runners/portability/java/src/main/java/org/apache/beam/runners/portability/PortableRunner.java
Language | Java
last_updated | 2026-02-09 04:00 GMT

Overview

PortableRunner is the concrete implementation that translates a portable pipeline and submits it to a remote job service. It extends PipelineRunner<PipelineResult> and orchestrates the full submission workflow: translation to protobuf, gRPC connection setup, job preparation, artifact staging, and job execution.

Code Reference

Source Location

Property | Value
File | runners/portability/java/src/main/java/org/apache/beam/runners/portability/PortableRunner.java
Package | org.apache.beam.runners.portability
Lines | L64-232
Class Declaration | L64: public class PortableRunner extends PipelineRunner<PipelineResult>

Class Structure

public class PortableRunner extends PipelineRunner<PipelineResult> {

    private final PipelineOptions options;
    private final String endpoint;
    private final ManagedChannelFactory channelFactory;

    // Factory methods
    public static PortableRunner fromOptions(PipelineOptions options);          // L81-83
    static PortableRunner create(PipelineOptions options,
                                  ManagedChannelFactory channelFactory);        // L86-93

    // Private constructor
    private PortableRunner(PipelineOptions options, String endpoint,
                           ManagedChannelFactory channelFactory);              // L95-100

    // Core execution
    @Override
    public PipelineResult run(Pipeline pipeline);                              // L103-226
}

Key Signatures

// Factory method: creates runner from PipelineOptions
public static PortableRunner fromOptions(PipelineOptions options) {
    return create(options, ManagedChannelFactory.createDefault());
}

// Test-visible factory with injectable channel factory
@VisibleForTesting
static PortableRunner create(PipelineOptions options, ManagedChannelFactory channelFactory) {
    PortablePipelineOptions portableOptions =
        PipelineOptionsValidator.validate(PortablePipelineOptions.class, options);
    String endpoint = portableOptions.getJobEndpoint();
    return new PortableRunner(options, endpoint, channelFactory);
}

// Core method: translates, stages, and submits the pipeline
@Override
public PipelineResult run(Pipeline pipeline)   // L103-226
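
The factory layout above (a public fromOptions that delegates to a narrower create accepting an injectable channel factory, plus a private constructor) can be illustrated without any Beam dependency. The following is a minimal sketch under stated assumptions: SketchRunner, ChannelFactory, and the "jobEndpoint" map key are hypothetical names chosen for illustration and are not part of the Beam API.

```java
import java.util.Map;
import java.util.Objects;

/** Hypothetical stand-in for a channel factory; not Beam's ManagedChannelFactory. */
interface ChannelFactory {
    String open(String endpoint);
}

/** Hypothetical analogue of the PortableRunner factory layout. */
final class SketchRunner {
    private final String endpoint;
    private final ChannelFactory channelFactory;

    // Private constructor: instances are only created through the factories below.
    private SketchRunner(String endpoint, ChannelFactory channelFactory) {
        this.endpoint = endpoint;
        this.channelFactory = channelFactory;
    }

    // Production entry point: supplies a default channel factory.
    static SketchRunner fromOptions(Map<String, String> options) {
        return create(options, ep -> "default-channel:" + ep);
    }

    // Test-friendly entry point: the caller injects the channel factory.
    static SketchRunner create(Map<String, String> options, ChannelFactory channelFactory) {
        String endpoint =
            Objects.requireNonNull(options.get("jobEndpoint"), "jobEndpoint is required");
        return new SketchRunner(endpoint, channelFactory);
    }

    // Opens a "channel" to the configured endpoint using the injected factory.
    String connect() {
        return channelFactory.open(endpoint);
    }
}
```

Injecting the factory lets a test substitute an in-memory channel, so the submission logic can be exercised without a real job service.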

Import Statement

import org.apache.beam.runners.portability.PortableRunner;

I/O Contract

Inputs

Input | Type | Description
pipeline | Pipeline | User-constructed Beam pipeline with transforms, coders, and windowing strategies
options | PipelineOptions | Configuration including PortablePipelineOptions fields
options.jobEndpoint | String | gRPC endpoint of the job service (e.g., localhost:8099)
options.defaultEnvironmentType | String | Worker environment type: DOCKER, PROCESS, EXTERNAL, LOOPBACK
options.filesToStage | List<String> | Explicit files to stage; if null, classpath is auto-detected
options.jobServerTimeout | int | Timeout in seconds for job service RPCs

Outputs

Output | Type | Description
Return value | PipelineResult | A JobServicePipelineResult instance for monitoring the submitted job
Side effect | RunnerApi.Pipeline protobuf | Serialized pipeline sent to job service via prepare RPC
Side effect | Staged artifacts | Pipeline dependencies uploaded to staging service
Side effect | Running job | Job started on the remote runner backend

Internal Workflow

The run() method executes the following steps in sequence:

Step 1: Loopback Worker Setup (L105-128)

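If the environment type is LOOPBACK, a local ExternalWorkerService is started so that the SDK harness runs inside the submitting process:

// Abridged excerpt: cleanup is declared earlier in run(), and any exception
// handling around start() and close() is omitted here.
if (Environments.ENVIRONMENT_LOOPBACK.equals(
        options.as(PortablePipelineOptions.class).getDefaultEnvironmentType())) {
    // Start a worker service in the submitting process.
    GrpcFnServer<ExternalWorkerService> workerService =
        new ExternalWorkerService(options).start();
    // Point the default environment config at the local worker's address.
    options.as(PortablePipelineOptions.class)
           .setDefaultEnvironmentConfig(workerService.getApiServiceDescriptor().getUrl());
    // Passed to the returned result (Step 7) so the worker can be shut down later.
    cleanup = () -> { workerService.close(); };
}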
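
The pattern in this step (start a resource only in one mode, rewrite the configuration to point at it, and hand back a cleanup action) can be sketched with plain Java. This is an illustrative sketch, not the Beam implementation: LoopbackSketch, LocalWorker, the placeholder address, and the "defaultEnvironmentConfig" map key are all hypothetical.

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

final class LoopbackSketch {
    /** Hypothetical local worker; stands in for an already started worker server. */
    static final class LocalWorker implements AutoCloseable {
        final AtomicBoolean closed = new AtomicBoolean(false);

        String url() {
            return "localhost:0"; // placeholder address, an assumption for this sketch
        }

        @Override
        public void close() {
            closed.set(true);
        }
    }

    /**
     * In LOOPBACK mode, records the worker address in the config and returns an
     * action that closes the worker. In any other mode, returns a no-op.
     */
    static Runnable setUp(String environmentType, LocalWorker worker, Map<String, String> config) {
        if ("LOOPBACK".equals(environmentType)) {
            config.put("defaultEnvironmentConfig", worker.url());
            return worker::close;
        }
        return () -> {};
    }
}
```

Returning the cleanup action instead of running it immediately lets the caller defer shutdown until the job has finished with the worker.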

Step 2: Files-to-Stage Collection (L130-160)

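Collects every file that must be staged on the runner. An explicit filesToStage list is used as given; only when it is null does the runner fall back to classpath auto-detection:

// Abridged excerpt: filesToStageBuilder is a list builder declared earlier in run() (not shown).
List<String> stagingFiles = options.as(PortablePipelineOptions.class).getFilesToStage();
if (stagingFiles == null) {
    // No explicit list: detect resources from the classpath.
    List<String> classpathResources =
        detectClassPathResourcesToStage(Environments.class.getClassLoader(), options);
    filesToStageBuilder.addAll(classpathResources);
} else {
    // Explicit list: stage exactly what the user configured.
    filesToStageBuilder.addAll(stagingFiles);
}
// The full method also parses jar_packages from the experimental flags (not shown).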
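
The explicit-list-or-classpath decision can be shown in isolation. The sketch below is a simplified stand-in and an assumption on our part: it merely splits a classpath string on the platform path separator, whereas Beam's detectClassPathResourcesToStage does its own detection. StagingFilesSketch and filesToStage are hypothetical names.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

final class StagingFilesSketch {
    /**
     * Returns the explicit list when one is given (even if empty); otherwise
     * derives the entries from the supplied classpath string.
     */
    static List<String> filesToStage(List<String> explicit, String classpath) {
        if (explicit != null) {
            return new ArrayList<>(explicit);
        }
        List<String> detected = new ArrayList<>();
        // Quote the separator so it is treated literally by split().
        for (String entry : classpath.split(Pattern.quote(File.pathSeparator))) {
            if (!entry.isEmpty()) {
                detected.add(entry);
            }
        }
        return detected;
    }
}
```

A caller could pass System.getProperty("java.class.path") as the second argument. Note that an empty explicit list suppresses detection entirely, mirroring the null check in the excerpt.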

Step 3: Pipeline Translation (L162-164)

Converts the in-memory Pipeline to its RunnerApi.Pipeline protobuf representation, then resolves the artifact references it contains:

RunnerApi.Pipeline pipelineProto =
    PipelineTranslation.toProto(pipeline, SdkComponents.create(options));
pipelineProto = DefaultArtifactResolver.INSTANCE.resolveArtifacts(pipelineProto);

Step 4: Prepare Request Construction (L166-171)

PrepareJobRequest prepareJobRequest =
    PrepareJobRequest.newBuilder()
        .setJobName(options.getJobName())
        .setPipeline(pipelineProto)
        .setPipelineOptions(PipelineOptionsTranslation.toProto(options))
        .build();

Step 5: gRPC Connection and Prepare (L174-187)

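// Open a channel to the job service endpoint and create a blocking stub on it.
ManagedChannel jobServiceChannel =
    channelFactory.forDescriptor(ApiServiceDescriptor.newBuilder().setUrl(endpoint).build());
JobServiceBlockingStub jobService = JobServiceGrpc.newBlockingStub(jobServiceChannel);

// jobServerTimeout is the jobServerTimeout option listed under Inputs, in seconds.
// The call waits for the channel to become ready, bounded by the deadline.
PrepareJobResponse prepareJobResponse =
    jobService.withDeadlineAfter(jobServerTimeout, TimeUnit.SECONDS)
        .withWaitForReady()
        .prepare(prepareJobRequest);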
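
The effect of bounding a blocking call with a deadline can be demonstrated with the standard library alone. The sketch below is only an analogy and does not reflect how gRPC implements deadlines: DeadlineSketch and callWithDeadline are hypothetical names, and the call runs on a throwaway single-thread executor.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class DeadlineSketch {
    /** Runs the call and fails with IllegalStateException if it exceeds the deadline. */
    static <T> T callWithDeadline(Callable<T> call, long timeout, TimeUnit unit) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<T> future = executor.submit(call);
            // Blocks for at most the given time, then throws TimeoutException.
            return future.get(timeout, unit);
        } catch (TimeoutException e) {
            throw new IllegalStateException("deadline exceeded", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("call failed", e.getCause());
        } finally {
            // Interrupts the worker thread if the call is still running.
            executor.shutdownNow();
        }
    }
}
```

Without a bound of this kind, a submission against an unreachable job service could block indefinitely, which is the situation the jobServerTimeout option guards against.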

Step 6: Artifact Staging (L193-206)

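// Abridged excerpt: artifactChannel and stagingSessionToken are set up earlier
// in run() (not shown).
ArtifactStagingService.offer(
    new ArtifactRetrievalService(),
    ArtifactStagingServiceGrpc.newStub(artifactChannel.get()),
    stagingSessionToken);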

Step 7: Run Job (L208-222)

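// Reference the job prepared in Step 5 by its preparation ID.
RunJobRequest runJobRequest =
    RunJobRequest.newBuilder()
        .setPreparationId(prepareJobResponse.getPreparationId())
        .build();
RunJobResponse runJobResponse = jobService.run(runJobRequest);
ByteString jobId = runJobResponse.getJobIdBytes();
// Abridged excerpt: wrappedJobService is created earlier in run() (not shown).
// The result also receives the cleanup action from Step 1.
return new JobServicePipelineResult(jobId, jobServerTimeout, wrappedJobService.transfer(), cleanup);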
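
Steps 4 through 7 amount to a two-phase handshake: prepare returns a preparation ID, and run exchanges that ID for a job ID. The in-memory sketch below illustrates only that exchange. FakeJobService, its ID formats, and the string-typed pipeline are hypothetical and do not correspond to the real job service protocol messages.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory job service illustrating the prepare-then-run handshake. */
final class FakeJobService {
    private final Map<String, String> preparedPipelines = new HashMap<>();
    private int counter = 0;

    /** Phase 1: registers the pipeline and returns a preparation ID. */
    String prepare(String jobName, String serializedPipeline) {
        counter++;
        String preparationId = jobName + "-prep-" + counter;
        preparedPipelines.put(preparationId, serializedPipeline);
        return preparationId;
    }

    /** Phase 2: starts a previously prepared job and returns its job ID. */
    String run(String preparationId) {
        if (!preparedPipelines.containsKey(preparationId)) {
            throw new IllegalArgumentException("unknown preparation id: " + preparationId);
        }
        return "job-" + preparationId;
    }
}
```

Splitting submission into two calls leaves a window between them in which dependencies can be staged (Step 6) before execution starts.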

Usage Examples

Basic Portable Pipeline Submission

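import org.apache.beam.runners.portability.PortableRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PortablePipelineOptions;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;

// Configure the job service endpoint and select the portable runner.
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
options.as(PortablePipelineOptions.class).setJobEndpoint("localhost:8099");
options.setRunner(PortableRunner.class);

// Read lines, upper-case each one, and write the results.
Pipeline pipeline = Pipeline.create(options);
pipeline.apply("ReadLines", TextIO.read().from("input.txt"))
        .apply("ToUpper", MapElements.via(new SimpleFunction<String, String>() {
            @Override
            public String apply(String input) {
                return input.toUpperCase();
            }
        }))
        .apply("WriteLines", TextIO.write().to("output"));

// Submit through PortableRunner.run() and block until the job finishes.
PipelineResult result = pipeline.run();
result.waitUntilFinish();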

Loopback Environment for Testing

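import org.apache.beam.runners.portability.PortableRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PortablePipelineOptions;

PipelineOptions options = PipelineOptionsFactory.create();
options.as(PortablePipelineOptions.class).setJobEndpoint("localhost:8099");
// LOOPBACK triggers the local worker setup described in Step 1.
options.as(PortablePipelineOptions.class).setDefaultEnvironmentType("LOOPBACK");
options.setRunner(PortableRunner.class);

Pipeline pipeline = Pipeline.create(options);
// ... define pipeline ...
PipelineResult result = pipeline.run();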
