Implementation:Apache Beam PortablePipelineJarCreator

From Leeroopedia


| Property | Value |
|---|---|
| Implementation Name | PortablePipelineJarCreator |
| Category | Pipeline_Orchestration, Packaging |
| Related Principle | Principle:Apache_Beam_Artifact_Staging |
| Source File | runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/PortablePipelineJarCreator.java |
| Also | PortablePipelineJarUtils.java |
| Language | Java |
| last_updated | 2026-02-09 04:00 GMT |

Overview

PortablePipelineJarCreator bundles a portable pipeline and its dependencies into an executable JAR file. It implements the PortablePipelineRunner interface but does not run the pipeline; instead, its run() method packages everything needed for later execution into a self-contained JAR.

Code Reference

Source Location

| Property | Value |
|---|---|
| Creator File | runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/PortablePipelineJarCreator.java |
| Creator Package | org.apache.beam.runners.jobsubmission |
| Creator Class | L70: public class PortablePipelineJarCreator implements PortablePipelineRunner |
| Creator Lines | L70-254 |
| Utils File | runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/PortablePipelineJarUtils.java |
| Utils Class | L68: public abstract class PortablePipelineJarUtils |
| Utils Lines | L68-140 |

PortablePipelineJarCreator Signatures

public class PortablePipelineJarCreator implements PortablePipelineRunner {

    @VisibleForTesting JarOutputStream outputStream;
    @VisibleForTesting WritableByteChannel outputChannel;

    // Constructor (L79-81)
    public PortablePipelineJarCreator(Class mainClass) {
        this.mainClass = mainClass;
    }

    // Core method: bundles pipeline into JAR (L89-113)
    @Override
    public PortablePipelineResult run(Pipeline pipeline, JobInfo jobInfo) throws Exception;

    // Helpers
    @VisibleForTesting
    Manifest createManifest(Class mainClass, String defaultJobName);              // L116-137
    @VisibleForTesting
    protected void copyResourcesFromJar(JarFile inputJar) throws IOException;    // L141-161
    @VisibleForTesting
    protected Pipeline writeArtifacts(Pipeline pipeline, String jobName)
        throws IOException;                                                       // L168-177
}

PortablePipelineJarUtils Signatures

public abstract class PortablePipelineJarUtils {

    private static final String ARTIFACT_FOLDER = "artifacts";                    // L69
    private static final String PIPELINE_FOLDER = "BEAM-PIPELINE";               // L70
    private static final String PIPELINE = "pipeline.json";                       // L71
    private static final String PIPELINE_OPTIONS = "pipeline-options.json";       // L72

    // Read pipeline from classpath (L101-105)
    public static Pipeline getPipelineFromClasspath(String jobName) throws IOException;

    // Read options from classpath (L107-111)
    public static Struct getPipelineOptionsFromClasspath(String jobName) throws IOException;

    // Get the default job name from manifest (L125-131)
    public static String getDefaultJobName() throws IOException;

    // Write the default job name to manifest (L133-139)
    public static void writeDefaultJobName(JarOutputStream outputStream, String jobName)
        throws IOException;

    // URI builders
    static String getPipelineUri(String jobName);                                 // L113-115
    static String getPipelineOptionsUri(String jobName);                          // L117-119
    static String getArtifactUri(String jobName, String artifactId);              // L121-123
}

Import Statements

import org.apache.beam.runners.jobsubmission.PortablePipelineJarCreator;
import org.apache.beam.runners.jobsubmission.PortablePipelineJarUtils;

I/O Contract

Inputs

| Input | Type | Description |
|---|---|---|
| pipeline | RunnerApi.Pipeline | The translated pipeline protobuf |
| jobInfo | JobInfo | Job metadata including name, ID, and pipeline options |
| jobInfo.pipelineOptions() | Struct | Serialized pipeline options (must include outputExecutablePath) |
| mainClass | Class | The main class whose JAR provides base resources and entry point |

Outputs

| Output | Type | Description |
|---|---|---|
| JAR file on disk | File | Self-contained JAR at PortablePipelineOptions.getOutputExecutablePath() |
| Return value | PortablePipelineResult | A JarCreatorPipelineResult with state DONE |

Run Method Walkthrough

The run() method at L89-113 performs the following steps:

Step 1: Extract Options and Output Path (L90-95)

PortablePipelineOptions pipelineOptions =
    PipelineOptionsTranslation.fromProto(jobInfo.pipelineOptions())
        .as(PortablePipelineOptions.class);

final String jobName = jobInfo.jobName();
File outputFile = new File(checkArgumentNotNull(pipelineOptions.getOutputExecutablePath()));

Step 2: Create JAR Output Stream with Manifest (L97-99)

outputStream =
    new JarOutputStream(new FileOutputStream(outputFile), createManifest(mainClass, jobName));
outputChannel = Channels.newChannel(outputStream);

The manifest includes Main-Class (if the main class has a valid main(String[]) method) and Manifest-Version: 1.0.
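The conditional Main-Class behavior can be illustrated with the JDK alone. The following is a minimal sketch, not the actual createManifest implementation: the class ManifestSketch, its helpers hasMainMethod and buildManifest, and the sample classes WithMain and NoMain are hypothetical names introduced for illustration, and the exact validity check used by Beam may differ.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.jar.Attributes;
import java.util.jar.Manifest;

final class ManifestSketch {

    // Hypothetical sample classes used only for this illustration.
    static class WithMain {
        public static void main(String[] args) {}
    }

    static class NoMain {}

    /** Returns true if the class exposes a public static void main(String[]). */
    static boolean hasMainMethod(Class<?> candidate) {
        try {
            // getMethod only finds public methods, including inherited ones.
            Method main = candidate.getMethod("main", String[].class);
            return Modifier.isStatic(main.getModifiers())
                && main.getReturnType() == void.class;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    /** Builds a manifest that names an entry point only when one exists. */
    static Manifest buildManifest(Class<?> mainClass) {
        Manifest manifest = new Manifest();
        Attributes attributes = manifest.getMainAttributes();
        attributes.put(Attributes.Name.MANIFEST_VERSION, "1.0");
        if (hasMainMethod(mainClass)) {
            attributes.put(Attributes.Name.MAIN_CLASS, mainClass.getName());
        }
        return manifest;
    }

    public static void main(String[] args) {
        System.out.println(
            buildManifest(WithMain.class).getMainAttributes().getValue("Main-Class"));
        System.out.println(
            buildManifest(NoMain.class).getMainAttributes().getValue("Main-Class"));
    }
}
```

A JAR built from a manifest without Main-Class can still be placed on a classpath, but `java -jar` has no entry point to launch.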

Step 3: Write Pipeline Manifest (L100)

PortablePipelineJarUtils.writeDefaultJobName(outputStream, jobName);

Writes BEAM-PIPELINE/pipeline-manifest.json containing the default job name.
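As a rough illustration of this step, the sketch below writes a JSON entry at that path into an in-memory JAR and reads it back. It is not the Beam implementation: PipelineManifestSketch, writeJar, and readEntry are hypothetical names, the JSON is assembled by hand (the real code may serialize it differently), and the job name is assumed to contain no characters that need JSON escaping.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.jar.JarEntry;
import java.util.jar.JarInputStream;
import java.util.jar.JarOutputStream;

final class PipelineManifestSketch {

    static final String MANIFEST_PATH = "BEAM-PIPELINE/pipeline-manifest.json";

    /** Writes a JAR containing only the pipeline manifest entry. */
    static byte[] writeJar(String jobName) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (JarOutputStream jar = new JarOutputStream(bytes)) {
            jar.putNextEntry(new JarEntry(MANIFEST_PATH));
            // Assumption: jobName needs no JSON escaping.
            String json = "{\"defaultJobName\": \"" + jobName + "\"}";
            jar.write(json.getBytes(StandardCharsets.UTF_8));
            jar.closeEntry();
        }
        return bytes.toByteArray();
    }

    /** Returns the UTF-8 content of the named entry, or null if it is absent. */
    static String readEntry(byte[] jarBytes, String path) throws IOException {
        try (JarInputStream in = new JarInputStream(new ByteArrayInputStream(jarBytes))) {
            JarEntry entry;
            while ((entry = in.getNextJarEntry()) != null) {
                if (entry.getName().equals(path)) {
                    // readAllBytes stops at the end of the current entry.
                    return new String(in.readAllBytes(), StandardCharsets.UTF_8);
                }
            }
        }
        return null;
    }

    public static void main(String[] args) throws IOException {
        System.out.println(readEntry(writeJar("my-job"), MANIFEST_PATH));
    }
}
```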

Step 4: Copy Resources from Source JAR (L101-102)

copyResourcesFromJar(
    new JarFile(mainClass.getProtectionDomain().getCodeSource().getLocation().getPath()));

Copies all entries from the main class's JAR, deduplicating entries and skipping the old manifest.
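The copy step can be sketched with java.util.jar as follows. This is an illustration under assumptions, not the copyResourcesFromJar source: CopyJarSketch, copyEntries, and demo are hypothetical names, and duplicates are tracked here with a Set of entry names, which may differ from how the real method detects them.

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
import java.util.jar.JarOutputStream;
import java.util.jar.Manifest;

final class CopyJarSketch {

    /** Copies entries, skipping the old manifest and names already written. */
    static List<String> copyEntries(JarFile input, JarOutputStream output, Set<String> written)
            throws IOException {
        List<String> copied = new ArrayList<>();
        Enumeration<JarEntry> entries = input.entries();
        while (entries.hasMoreElements()) {
            JarEntry entry = entries.nextElement();
            String name = entry.getName();
            // Set.add returns false when the name is already present.
            if (name.equals(JarFile.MANIFEST_NAME) || !written.add(name)) {
                continue;
            }
            output.putNextEntry(new JarEntry(name));
            try (InputStream in = input.getInputStream(entry)) {
                in.transferTo(output);
            }
            output.closeEntry();
            copied.add(name);
        }
        return copied;
    }

    /** Builds a small source JAR, then copies it into a target JAR. */
    static List<String> demo() throws IOException {
        File source = File.createTempFile("source", ".jar");
        File target = File.createTempFile("target", ".jar");
        source.deleteOnExit();
        target.deleteOnExit();
        String manifestJson = "BEAM-PIPELINE/pipeline-manifest.json";

        try (JarOutputStream out =
                new JarOutputStream(new FileOutputStream(source), new Manifest())) {
            for (String name : new String[] {"com/example/App.class", manifestJson}) {
                out.putNextEntry(new JarEntry(name));
                out.write(new byte[] {1, 2, 3});
                out.closeEntry();
            }
        }

        Set<String> written = new HashSet<>();
        try (JarFile in = new JarFile(source);
                JarOutputStream out =
                    new JarOutputStream(new FileOutputStream(target), new Manifest())) {
            // The target already holds a fresh pipeline manifest entry.
            out.putNextEntry(new JarEntry(manifestJson));
            out.closeEntry();
            written.add(manifestJson);
            return copyEntries(in, out, written);
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(demo());
    }
}
```

Skipping the source manifest matters because the output JAR already received its own META-INF/MANIFEST.MF in Step 2, and a JarOutputStream rejects a second entry with the same name.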

Step 5: Write Pipeline Options as JSON (L103-105)

writeAsJson(
    PipelineOptionsTranslation.toProto(pipelineOptions),
    PortablePipelineJarUtils.getPipelineOptionsUri(jobName));

Writes to BEAM-PIPELINE/{jobName}/pipeline-options.json.

Step 6: Write Artifacts and Update Pipeline (L106)

Pipeline pipelineWithClasspathArtifacts = writeArtifacts(pipeline, jobName);

Each artifact from each environment's dependency list is:

  1. Written into the JAR at BEAM-PIPELINE/{jobName}/artifacts/{uuid}
  2. Replaced with a ClassLoaderFileSystem reference pointing to its JAR location
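The two artifact steps above can be sketched in plain Java as follows. This is a simplified illustration, not the writeArtifacts source: ArtifactStagingSketch and its methods are hypothetical, artifacts are modeled as a map from an original location to raw bytes rather than as environment dependency protos, and the "classpath://" prefix is an assumption about how a ClassLoaderFileSystem reference is spelled.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
import java.util.jar.JarEntry;
import java.util.jar.JarOutputStream;

final class ArtifactStagingSketch {

    /** Builds the in-JAR path for one artifact of a job. */
    static String artifactPath(String jobName, String artifactId) {
        return "BEAM-PIPELINE/" + jobName + "/artifacts/" + artifactId;
    }

    /**
     * Writes each artifact under a fresh UUID and returns a map from the
     * original location to the new classpath reference.
     */
    static Map<String, String> writeArtifacts(
            JarOutputStream jar, String jobName, Map<String, byte[]> artifacts)
            throws IOException {
        Map<String, String> rewritten = new LinkedHashMap<>();
        for (Map.Entry<String, byte[]> artifact : artifacts.entrySet()) {
            String path = artifactPath(jobName, UUID.randomUUID().toString());
            jar.putNextEntry(new JarEntry(path));
            jar.write(artifact.getValue());
            jar.closeEntry();
            // Assumed reference format; the real scheme may differ.
            rewritten.put(artifact.getKey(), "classpath://" + path);
        }
        return rewritten;
    }

    /** Stages two made-up artifacts into an in-memory JAR. */
    static Map<String, String> demo() throws IOException {
        Map<String, byte[]> artifacts = new LinkedHashMap<>();
        artifacts.put("/tmp/staged/dep-a.jar", new byte[] {1});
        artifacts.put("/tmp/staged/dep-b.jar", new byte[] {2});
        try (JarOutputStream jar = new JarOutputStream(new ByteArrayOutputStream())) {
            return writeArtifacts(jar, "my-job", artifacts);
        }
    }

    public static void main(String[] args) throws IOException {
        demo().forEach((from, to) -> System.out.println(from + " -> " + to));
    }
}
```

Naming entries by random UUID avoids collisions when two dependencies share a file name, at the cost of making the entry names opaque.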

Step 7: Write Pipeline Proto as JSON (L107)

writeAsJson(pipelineWithClasspathArtifacts, PortablePipelineJarUtils.getPipelineUri(jobName));

Writes the modified pipeline (with updated artifact references) to BEAM-PIPELINE/{jobName}/pipeline.json.

Step 8: Close and Return (L109-112)

outputChannel.close();
LOG.info("Jar {} created successfully.", outputFile.getAbsolutePath());
return new JarCreatorPipelineResult();

JAR Layout

The resulting JAR has the following structure:

META-INF/
  MANIFEST.MF
    Main-Class: com.example.MyPipelineMain
    Manifest-Version: 1.0
BEAM-PIPELINE/
  pipeline-manifest.json
    {"defaultJobName": "my-job"}
  my-job/
    pipeline.json
    pipeline-options.json
    artifacts/
      a1b2c3d4-e5f6-...
      0f9e8d7c-6b5a-...
...Java classes from main JAR...

Reading Back from JAR

The PortablePipelineJarUtils class provides methods to read back the packaged pipeline at execution time:

// Read the default job name from the manifest
String jobName = PortablePipelineJarUtils.getDefaultJobName();

// Read the pipeline proto from the classpath
Pipeline pipeline = PortablePipelineJarUtils.getPipelineFromClasspath(jobName);

// Read the pipeline options from the classpath
Struct options = PortablePipelineJarUtils.getPipelineOptionsFromClasspath(jobName);

These methods call getResourceAsStream() on the class loader to read resources from the JAR's classpath, then parse the JSON content into protobuf messages using JsonFormat.parser().
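The classpath lookup can be demonstrated without Beam or protobuf. The sketch below is an assumption-laden illustration: ClasspathReadSketch, readResource, and demo are hypothetical names, a URLClassLoader over a temporary JAR stands in for the running JAR's own class loader, and the content is returned as a raw string instead of being parsed with JsonFormat.

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.util.jar.JarEntry;
import java.util.jar.JarOutputStream;

final class ClasspathReadSketch {

    /** Reads a classpath resource as UTF-8 text, failing if it is missing. */
    static String readResource(ClassLoader loader, String path) throws IOException {
        try (InputStream in = loader.getResourceAsStream(path)) {
            if (in == null) {
                throw new IOException("Resource not found on classpath: " + path);
            }
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        }
    }

    /** Writes a one-entry JAR, then reads the entry back through a class loader. */
    static String demo() throws IOException {
        String path = "BEAM-PIPELINE/my-job/pipeline.json";
        File jar = File.createTempFile("pipeline", ".jar");
        jar.deleteOnExit();
        try (JarOutputStream out = new JarOutputStream(new FileOutputStream(jar))) {
            out.putNextEntry(new JarEntry(path));
            out.write("{}".getBytes(StandardCharsets.UTF_8));
            out.closeEntry();
        }
        // A null parent keeps the lookup limited to the temporary JAR.
        try (URLClassLoader loader =
                new URLClassLoader(new URL[] {jar.toURI().toURL()}, null)) {
            return readResource(loader, path);
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(demo());
    }
}
```

Because getResourceAsStream returns null rather than throwing for a missing resource, an explicit null check gives a clearer error than a later NullPointerException.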

Usage Examples

Creating a Pipeline JAR

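// The JarCreator is typically invoked by the job service, not directly.
// Assumes `options` is a PortablePipelineOptions with outputExecutablePath set
// and `pipelineProto` is the translated RunnerApi.Pipeline.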
PortablePipelineJarCreator jarCreator = new PortablePipelineJarCreator(MyPipelineMain.class);

JobInfo jobInfo = JobInfo.create(
    "my-job-id",
    "my-job-name",
    "retrieval-token",
    PipelineOptionsTranslation.toProto(options));

PortablePipelineResult result = jarCreator.run(pipelineProto, jobInfo);
// result.getState() == State.DONE
// JAR is written to options.getOutputExecutablePath()

Executing the Packaged JAR

# The JAR is self-contained; it is executable when its manifest names a Main-Class
java -jar my-pipeline.jar

JarCreatorPipelineResult

The inner class JarCreatorPipelineResult at L222-253 implements PortablePipelineResult with trivial behavior:

private static class JarCreatorPipelineResult implements PortablePipelineResult {
    public State getState()                    { return State.DONE; }
    public State cancel()                      { return State.DONE; }
    public State waitUntilFinish(Duration d)   { return State.DONE; }
    public State waitUntilFinish()             { return State.DONE; }
    public MetricResults metrics()             { throw new UnsupportedOperationException(...); }
    public JobApi.MetricResults portableMetrics() {
        return JobApi.MetricResults.getDefaultInstance();
    }
}
