Implementation:Apache Beam PortablePipelineJarCreator
| Property | Value |
|---|---|
| Implementation Name | PortablePipelineJarCreator |
| Category | Pipeline_Orchestration, Packaging |
| Related Principle | Principle:Apache_Beam_Artifact_Staging |
| Source File | runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/PortablePipelineJarCreator.java |
| Also | PortablePipelineJarUtils.java |
| Language | Java |
| last_updated | 2026-02-09 04:00 GMT |
Overview
PortablePipelineJarCreator bundles a portable pipeline and its dependencies into an executable JAR file. It implements the PortablePipelineRunner interface but does not run the pipeline; instead, its run() method packages everything needed for later execution into a self-contained JAR.
Code Reference
Source Location
| Property | Value |
|---|---|
| Creator File | runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/PortablePipelineJarCreator.java |
| Creator Package | org.apache.beam.runners.jobsubmission |
| Creator Class | L70: public class PortablePipelineJarCreator implements PortablePipelineRunner |
| Creator Lines | L70-254 |
| Utils File | runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/PortablePipelineJarUtils.java |
| Utils Class | L68: public abstract class PortablePipelineJarUtils |
| Utils Lines | L68-140 |
PortablePipelineJarCreator Signatures
public class PortablePipelineJarCreator implements PortablePipelineRunner {
@VisibleForTesting JarOutputStream outputStream;
@VisibleForTesting WritableByteChannel outputChannel;
// Constructor (L79-81)
public PortablePipelineJarCreator(Class mainClass) {
this.mainClass = mainClass;
}
// Core method: bundles pipeline into JAR (L89-113)
@Override
public PortablePipelineResult run(Pipeline pipeline, JobInfo jobInfo) throws Exception;
// Helpers
@VisibleForTesting
Manifest createManifest(Class mainClass, String defaultJobName); // L116-137
@VisibleForTesting
protected void copyResourcesFromJar(JarFile inputJar) throws IOException; // L141-161
@VisibleForTesting
protected Pipeline writeArtifacts(Pipeline pipeline, String jobName)
throws IOException; // L168-177
}
PortablePipelineJarUtils Signatures
public abstract class PortablePipelineJarUtils {
private static final String ARTIFACT_FOLDER = "artifacts"; // L69
private static final String PIPELINE_FOLDER = "BEAM-PIPELINE"; // L70
private static final String PIPELINE = "pipeline.json"; // L71
private static final String PIPELINE_OPTIONS = "pipeline-options.json"; // L72
// Read pipeline from classpath (L101-105)
public static Pipeline getPipelineFromClasspath(String jobName) throws IOException;
// Read options from classpath (L107-111)
public static Struct getPipelineOptionsFromClasspath(String jobName) throws IOException;
// Get the default job name from manifest (L125-131)
public static String getDefaultJobName() throws IOException;
// Write the default job name to manifest (L133-139)
public static void writeDefaultJobName(JarOutputStream outputStream, String jobName)
throws IOException;
// URI builders
static String getPipelineUri(String jobName); // L113-115
static String getPipelineOptionsUri(String jobName); // L117-119
static String getArtifactUri(String jobName, String artifactId); // L121-123
}
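The URI builders are only listed by signature above. A minimal sketch of how such paths could be assembled, assuming plain string joins that match the layout documented on this page (BEAM-PIPELINE/{jobName}/...), is shown below. The class name JarPathSketch and its method names are hypothetical and are not part of Beam.

```java
// Hypothetical stand-in for the URI builders; not the Beam implementation.
final class JarPathSketch {
  // Folder and file names copied from the constants listed above.
  private static final String ARTIFACT_FOLDER = "artifacts";
  private static final String PIPELINE_FOLDER = "BEAM-PIPELINE";
  private static final String PIPELINE = "pipeline.json";
  private static final String PIPELINE_OPTIONS = "pipeline-options.json";

  // JAR entry names always use '/' as the separator, independent of the host OS.
  static String pipelineUri(String jobName) {
    return String.join("/", PIPELINE_FOLDER, jobName, PIPELINE);
  }

  static String pipelineOptionsUri(String jobName) {
    return String.join("/", PIPELINE_FOLDER, jobName, PIPELINE_OPTIONS);
  }

  static String artifactUri(String jobName, String artifactId) {
    return String.join("/", PIPELINE_FOLDER, jobName, ARTIFACT_FOLDER, artifactId);
  }

  public static void main(String[] args) {
    System.out.println(pipelineUri("my-job")); // BEAM-PIPELINE/my-job/pipeline.json
    System.out.println(pipelineOptionsUri("my-job")); // BEAM-PIPELINE/my-job/pipeline-options.json
    System.out.println(artifactUri("my-job", "a1b2c3d4")); // BEAM-PIPELINE/my-job/artifacts/a1b2c3d4
  }
}
```

Keeping every path under a per-job folder is what allows a reader to locate the pipeline, its options, and its artifacts from the job name alone.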
Import Statements
import org.apache.beam.runners.jobsubmission.PortablePipelineJarCreator;
import org.apache.beam.runners.jobsubmission.PortablePipelineJarUtils;
I/O Contract
Inputs
| Input | Type | Description |
|---|---|---|
| pipeline | RunnerApi.Pipeline | The translated pipeline protobuf |
| jobInfo | JobInfo | Job metadata including name, ID, and pipeline options |
| jobInfo.pipelineOptions() | Struct | Serialized pipeline options (must include outputExecutablePath) |
| mainClass | Class | The main class whose JAR provides base resources and entry point |
Outputs
| Output | Type | Description |
|---|---|---|
| JAR file on disk | File | Self-contained JAR at PortablePipelineOptions.getOutputExecutablePath() |
| Return value | PortablePipelineResult | A JarCreatorPipelineResult with state DONE |
Run Method Walkthrough
The run() method at L89-113 performs the following steps:
Step 1: Extract Options and Output Path (L90-95)
PortablePipelineOptions pipelineOptions =
PipelineOptionsTranslation.fromProto(jobInfo.pipelineOptions())
.as(PortablePipelineOptions.class);
final String jobName = jobInfo.jobName();
File outputFile = new File(checkArgumentNotNull(pipelineOptions.getOutputExecutablePath()));
Step 2: Create JAR Output Stream with Manifest (L97-99)
outputStream =
new JarOutputStream(new FileOutputStream(outputFile), createManifest(mainClass, jobName));
outputChannel = Channels.newChannel(outputStream);
The manifest includes Main-Class (if the main class has a valid main(String[]) method) and Manifest-Version: 1.0.
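The conditional Main-Class behavior can be illustrated with the standard java.util.jar API. The following is a minimal sketch under the assumption that "valid" means a public main(String[]) that is static and returns void; the class ManifestSketch and the helper hasMainMethod are hypothetical names, and the real createManifest also takes a defaultJobName parameter that is left out here.

```java
import java.io.IOException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.jar.Attributes;
import java.util.jar.Manifest;

// Hypothetical illustration of the manifest step; not the Beam implementation.
final class ManifestSketch {

  static Manifest createManifest(Class<?> mainClass) {
    Manifest manifest = new Manifest();
    Attributes attributes = manifest.getMainAttributes();
    // Manifest-Version must be present, otherwise Manifest.write() omits the main section.
    attributes.put(Attributes.Name.MANIFEST_VERSION, "1.0");
    if (hasMainMethod(mainClass)) {
      attributes.put(Attributes.Name.MAIN_CLASS, mainClass.getName());
    }
    return manifest;
  }

  // Assumed validity check: public static void main(String[]).
  static boolean hasMainMethod(Class<?> candidate) {
    try {
      Method main = candidate.getMethod("main", String[].class);
      return Modifier.isStatic(main.getModifiers()) && main.getReturnType() == void.class;
    } catch (NoSuchMethodException e) {
      return false;
    }
  }

  public static void main(String[] args) throws IOException {
    // Prints the manifest that would be written for this class.
    createManifest(ManifestSketch.class).write(System.out);
  }
}
```

A JAR built from a class without such a method still contains the pipeline, but `java -jar` cannot launch it because no entry point is recorded.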
Step 3: Write Pipeline Manifest (L100)
PortablePipelineJarUtils.writeDefaultJobName(outputStream, jobName);
Writes BEAM-PIPELINE/pipeline-manifest.json containing the default job name.
Step 4: Copy Resources from Source JAR (L101-102)
copyResourcesFromJar(
new JarFile(mainClass.getProtectionDomain().getCodeSource().getLocation().getPath()));
Copies all entries from the main class's JAR, deduplicating entries and skipping the old manifest.
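The copy-with-deduplication idea can be sketched with java.util.jar alone. In the sketch below, a set of already-written names is seeded with the manifest path so that the source JAR's manifest is skipped, and any repeated name is ignored. The class JarCopySketch and its helpers are hypothetical; the demo merges two small temporary JARs only to show a duplicate name being dropped, which is an assumption made for illustration rather than something the creator itself does.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.jar.Attributes;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
import java.util.jar.JarOutputStream;
import java.util.jar.Manifest;

// Hypothetical illustration of the copy step; not the Beam implementation.
final class JarCopySketch {

  /** Copies entries whose names are not yet in {@code written}; returns the number copied. */
  static int copyEntries(JarFile input, JarOutputStream output, Set<String> written)
      throws IOException {
    int copied = 0;
    Enumeration<JarEntry> entries = input.entries();
    while (entries.hasMoreElements()) {
      JarEntry entry = entries.nextElement();
      // Set.add returns false for names seen before, including the seeded manifest name.
      if (!written.add(entry.getName())) {
        continue;
      }
      output.putNextEntry(new JarEntry(entry.getName()));
      try (InputStream in = input.getInputStream(entry)) {
        in.transferTo(output);
      }
      output.closeEntry();
      copied++;
    }
    return copied;
  }

  private static Manifest minimalManifest() {
    Manifest manifest = new Manifest();
    manifest.getMainAttributes().put(Attributes.Name.MANIFEST_VERSION, "1.0");
    return manifest;
  }

  // Writes a temporary JAR whose entries contain their own names as content.
  private static Path writeJar(String... names) throws IOException {
    Path jar = Files.createTempFile("source", ".jar");
    try (JarOutputStream out = new JarOutputStream(Files.newOutputStream(jar), minimalManifest())) {
      for (String name : names) {
        out.putNextEntry(new JarEntry(name));
        out.write(name.getBytes(StandardCharsets.UTF_8));
        out.closeEntry();
      }
    }
    return jar;
  }

  /** Merges two JARs that share "a.txt" and returns the entry names of the result. */
  static List<String> demoEntryNames() {
    try {
      Path first = writeJar("a.txt", "b.txt");
      Path second = writeJar("a.txt", "c.txt");
      Path merged = Files.createTempFile("merged", ".jar");
      Set<String> written = new HashSet<>();
      written.add(JarFile.MANIFEST_NAME); // skip the old manifests; a fresh one is written
      try (JarOutputStream out =
              new JarOutputStream(Files.newOutputStream(merged), minimalManifest());
          JarFile in1 = new JarFile(first.toFile());
          JarFile in2 = new JarFile(second.toFile())) {
        copyEntries(in1, out, written);
        copyEntries(in2, out, written);
      }
      List<String> names = new ArrayList<>();
      try (JarFile result = new JarFile(merged.toFile())) {
        result.stream().forEach(e -> names.add(e.getName()));
      }
      return names;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(demoEntryNames());
  }
}
```

Tracking written names matters because JarOutputStream rejects a second entry with the same name, so a duplicate would otherwise abort the whole copy.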
Step 5: Write Pipeline Options as JSON (L103-105)
writeAsJson(
PipelineOptionsTranslation.toProto(pipelineOptions),
PortablePipelineJarUtils.getPipelineOptionsUri(jobName));
Writes to BEAM-PIPELINE/{jobName}/pipeline-options.json.
Step 6: Write Artifacts and Update Pipeline (L106)
Pipeline pipelineWithClasspathArtifacts = writeArtifacts(pipeline, jobName);
Each artifact from each environment's dependency list is:
- Written into the JAR at BEAM-PIPELINE/{jobName}/artifacts/{uuid}
- Replaced with a ClassLoaderFileSystem reference pointing to its JAR location
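The relocation performed in Step 6 can be approximated with the standard library. In this sketch a plain Map of names to bytes stands in for an environment's dependency list, and the returned map stands in for the rewritten artifact references; the real method works on Beam protobuf messages, which are omitted. The class ArtifactStagingSketch and its methods are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
import java.util.jar.JarEntry;
import java.util.jar.JarOutputStream;

// Hypothetical illustration of artifact relocation; not the Beam implementation.
final class ArtifactStagingSketch {

  /** Writes each artifact under a fresh UUID and returns original name -> JAR entry name. */
  static Map<String, String> writeArtifacts(
      JarOutputStream jar, String jobName, Map<String, byte[]> artifacts) throws IOException {
    Map<String, String> relocated = new LinkedHashMap<>();
    for (Map.Entry<String, byte[]> artifact : artifacts.entrySet()) {
      // A random UUID avoids collisions between artifacts that share a file name.
      String entryName = "BEAM-PIPELINE/" + jobName + "/artifacts/" + UUID.randomUUID();
      jar.putNextEntry(new JarEntry(entryName));
      jar.write(artifact.getValue());
      jar.closeEntry();
      relocated.put(artifact.getKey(), entryName);
    }
    return relocated;
  }

  /** Stages two in-memory artifacts into an in-memory JAR. */
  static Map<String, String> demo() {
    Map<String, byte[]> artifacts = new LinkedHashMap<>();
    artifacts.put("dep-one.jar", "one".getBytes(StandardCharsets.UTF_8));
    artifacts.put("dep-two.jar", "two".getBytes(StandardCharsets.UTF_8));
    try (JarOutputStream jar = new JarOutputStream(new ByteArrayOutputStream())) {
      return writeArtifacts(jar, "my-job", artifacts);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    demo().forEach((original, entry) -> System.out.println(original + " -> " + entry));
  }
}
```

The returned mapping is the piece that lets the pipeline definition be updated so that each dependency points at its new location inside the JAR.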
Step 7: Write Pipeline Proto as JSON (L107)
writeAsJson(pipelineWithClasspathArtifacts, PortablePipelineJarUtils.getPipelineUri(jobName));
Writes the modified pipeline (with updated artifact references) to BEAM-PIPELINE/{jobName}/pipeline.json.
Step 8: Close and Return (L109-112)
outputChannel.close();
LOG.info("Jar {} created successfully.", outputFile.getAbsolutePath());
return new JarCreatorPipelineResult();
JAR Layout
The resulting JAR has the following structure:
META-INF/
MANIFEST.MF
Main-Class: com.example.MyPipelineMain
Manifest-Version: 1.0
BEAM-PIPELINE/
pipeline-manifest.json
{"defaultJobName": "my-job"}
my-job/
pipeline.json
pipeline-options.json
artifacts/
a1b2c3d4-e5f6-...
f7g8h9i0-j1k2-...
...Java classes from main JAR...
Reading Back from JAR
The PortablePipelineJarUtils class provides methods to read back the packaged pipeline at execution time:
// Read the default job name from the manifest
String jobName = PortablePipelineJarUtils.getDefaultJobName();
// Read the pipeline proto from the classpath
Pipeline pipeline = PortablePipelineJarUtils.getPipelineFromClasspath(jobName);
// Read the pipeline options from the classpath
Struct options = PortablePipelineJarUtils.getPipelineOptionsFromClasspath(jobName);
These methods use Class.getClassLoader().getResourceAsStream() to read resources from the JAR's classpath, then parse the JSON content into protobuf messages using JsonFormat.parser().
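The classpath lookup can be illustrated without Beam or protobuf on the classpath. The sketch below reads a resource as UTF-8 text through a ClassLoader and fails loudly when the resource is missing; parsing the JSON into a protobuf message is deliberately left out. The class ClasspathReadSketch, the helper readResource, and the temporary directory used as a classpath root are all assumptions made for the demo.

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical illustration of reading a packaged resource; not the Beam implementation.
final class ClasspathReadSketch {

  /** Reads a classpath resource as UTF-8 text, or throws if it is not present. */
  static String readResource(ClassLoader loader, String name) throws IOException {
    try (InputStream in = loader.getResourceAsStream(name)) {
      if (in == null) {
        // getResourceAsStream signals a missing resource with null, not an exception.
        throw new FileNotFoundException("Resource not found on classpath: " + name);
      }
      return new String(in.readAllBytes(), StandardCharsets.UTF_8);
    }
  }

  /** Builds a temporary classpath root containing a pipeline manifest and reads it back. */
  static String demo() {
    try {
      Path root = Files.createTempDirectory("classpath-root");
      Path manifest = root.resolve("BEAM-PIPELINE").resolve("pipeline-manifest.json");
      Files.createDirectories(manifest.getParent());
      Files.writeString(manifest, "{\"defaultJobName\": \"my-job\"}");
      try (URLClassLoader loader = new URLClassLoader(new URL[] {root.toUri().toURL()})) {
        return readResource(loader, "BEAM-PIPELINE/pipeline-manifest.json");
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

Because resources are resolved through the class loader, the same lookup works whether the files sit in a directory or inside the packaged JAR.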
Usage Examples
Creating a Pipeline JAR
// The JarCreator is typically invoked by the job service, not directly
PortablePipelineJarCreator jarCreator = new PortablePipelineJarCreator(MyPipelineMain.class);
JobInfo jobInfo = JobInfo.create(
"my-job-id",
"my-job-name",
"retrieval-token",
PipelineOptionsTranslation.toProto(options));
PortablePipelineResult result = jarCreator.run(pipelineProto, jobInfo);
// result.getState() == State.DONE
// JAR is written to options.getOutputExecutablePath()
Executing the Packaged JAR
# The JAR is self-contained; it is directly executable when its manifest has a Main-Class entry
java -jar my-pipeline.jar
JarCreatorPipelineResult
The private static nested class JarCreatorPipelineResult at L222-253 implements PortablePipelineResult with trivial behavior, since the JAR has already been written by the time it is returned:
private static class JarCreatorPipelineResult implements PortablePipelineResult {
public State getState() { return State.DONE; }
public State cancel() { return State.DONE; }
public State waitUntilFinish(Duration d) { return State.DONE; }
public State waitUntilFinish() { return State.DONE; }
public MetricResults metrics() { throw new UnsupportedOperationException(...); }
public JobApi.MetricResults portableMetrics() {
return JobApi.MetricResults.getDefaultInstance();
}
}
Related Pages
- Principle:Apache_Beam_Artifact_Staging -- The principle this implementation realizes
- Implementation:Apache_Beam_InMemoryJobService_Prepare -- Preparation phase that precedes staging
- Implementation:Apache_Beam_PortableRunner_Run -- The gRPC-based staging alternative in PortableRunner
- Implementation:Apache_Beam_InMemoryJobService_Run -- Execution phase that consumes staged artifacts
- Environment:Apache_Beam_Portable_Runner_Environment -- Portability framework runtime with gRPC Job Service and Docker/loopback SDK harness.