
Principle:Apache Beam Artifact Staging

From Leeroopedia


Property                Value
Principle Name          Artifact Staging
Category                Pipeline_Orchestration, Packaging
Related Implementation  Implementation:Apache_Beam_PortablePipelineJarCreator
Source Reference        runners/java-job-service/.../PortablePipelineJarCreator.java
Last Updated            2026-02-09 04:00 GMT

Overview

Artifact Staging is the process of packaging pipeline dependencies (artifacts) and the pipeline definition itself into a deployable format for remote execution. It bridges the gap between the client-side development environment and the runner-side execution environment.

Description

Artifact staging transfers pipeline dependencies from the client to the runner environment. Two primary approaches exist in the Beam framework:

Approach 1: gRPC-Based Staging

The standard approach uploads artifacts to a staging service endpoint using a session token obtained during job preparation. The ArtifactStagingService coordinates the transfer:

// From PortableRunner.run() at L193-206
try (CloseableResource<ManagedChannel> artifactChannel =
    CloseableResource.of(
        channelFactory.forDescriptor(artifactStagingEndpoint), ManagedChannel::shutdown)) {

    ArtifactStagingService.offer(
        new ArtifactRetrievalService(),
        ArtifactStagingServiceGrpc.newStub(artifactChannel.get()),
        stagingSessionToken);
}

In this approach:

  • The client opens a gRPC channel to the artifact staging endpoint returned by the prepare RPC.
  • The ArtifactRetrievalService reads artifacts from the client's local environment.
  • The ArtifactStagingService.offer() method streams each artifact to the staging server.
  • The staging session token authorizes the upload and associates artifacts with the correct preparation.

Approach 2: JAR-Based Staging

The JAR-based approach bundles the pipeline proto, options, and all classpath dependencies into a self-contained executable JAR. This enables offline pipeline packaging for later execution:

// PortablePipelineJarCreator.run() at L89-113
public PortablePipelineResult run(Pipeline pipeline, JobInfo jobInfo) throws Exception {
    // Creates a JAR containing:
    //   BEAM-PIPELINE/pipeline-manifest.json
    //   BEAM-PIPELINE/{jobName}/pipeline.json
    //   BEAM-PIPELINE/{jobName}/pipeline-options.json
    //   BEAM-PIPELINE/{jobName}/artifacts/...
    //   ...Java classes from main JAR...
}

Artifact Types

Artifact Type            Description                                                 Staging Mechanism
SDK worker harness       The SDK runtime that executes user transforms               Environment dependency in the pipeline proto
User code JARs           Application code compiled into JAR files                    Classpath detection via detectClassPathResourcesToStage()
Additional JARs          Extra dependencies from the jar_packages experiment         Experimental flag parsing
Explicitly staged files  Files listed via PortablePipelineOptions.getFilesToStage()  Direct user configuration
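The "Additional JARs" row relies on parsing an experiment string out of the pipeline options. As a rough illustration, the sketch below assumes a `jar_packages=a.jar,b.jar` syntax (the exact format is an assumption, not taken from the source above):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of collecting extra JARs from an experiments list.
// The "jar_packages=a.jar,b.jar" syntax is assumed here for illustration.
public class JarPackages {
    static List<String> extraJars(List<String> experiments) {
        List<String> jars = new ArrayList<>();
        for (String experiment : experiments) {
            if (experiment.startsWith("jar_packages=")) {
                // Everything after '=' is a comma-separated list of JAR paths.
                for (String jar : experiment.substring("jar_packages=".length()).split(",")) {
                    if (!jar.isEmpty()) {
                        jars.add(jar);
                    }
                }
            }
        }
        return jars;
    }
}
```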

File Detection and Collection

Before staging, the runner collects all files that need to be staged:

// From PortableRunner.run() at L130-160
ImmutableList.Builder<String> filesToStageBuilder = ImmutableList.builder();
List<String> stagingFiles = options.as(PortablePipelineOptions.class).getFilesToStage();
if (stagingFiles == null) {
    List<String> classpathResources =
        detectClassPathResourcesToStage(Environments.class.getClassLoader(), options);
    filesToStageBuilder.addAll(classpathResources);
} else {
    filesToStageBuilder.addAll(stagingFiles);
}
// Also adds jar_packages from experimental flags
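The fallback logic in this snippet reduces to a simple rule: explicit user configuration wins; otherwise the runner detects the classpath itself. A self-contained sketch of that rule, with detectClasspath() as a hypothetical stand-in for detectClassPathResourcesToStage():

```java
import java.util.List;

// Sketch of the files-to-stage fallback: explicit user configuration takes
// precedence; otherwise fall back to classpath detection. detectClasspath()
// is a hypothetical stand-in for detectClassPathResourcesToStage().
public class FilesToStage {
    static List<String> detectClasspath() {
        return List.of("/app/classes.jar", "/app/deps.jar"); // placeholder result
    }

    static List<String> resolve(List<String> explicitFiles) {
        return explicitFiles == null ? detectClasspath() : explicitFiles;
    }
}
```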

Usage

Artifact staging happens automatically during portable pipeline submission. The JAR creator path is used when creating offline pipeline packages.

gRPC Staging Workflow

  1. Job preparation returns an artifactStagingEndpoint and stagingSessionToken
  2. Client opens a gRPC channel to the staging endpoint
  3. Client streams each artifact to the staging service via ArtifactStagingService.offer()
  4. Staging service stores artifacts keyed by session token
  5. During job run, the server resolves dependencies from the staged artifacts
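The token-keyed bookkeeping in steps 1, 4, and 5 can be modeled with a plain map. This is a toy model, not the ArtifactStagingService API; all names here are hypothetical:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Toy model of the gRPC staging workflow's server-side bookkeeping:
// preparation mints a token, uploads are stored under it, and the
// runner resolves them at run time.
public class StagingServiceModel {
    final Map<String, List<byte[]>> staged = new HashMap<>();

    // Step 1: job preparation mints a fresh staging session token.
    String prepare() {
        String token = UUID.randomUUID().toString();
        staged.put(token, new ArrayList<>());
        return token;
    }

    // Steps 3-4: each uploaded artifact is keyed by the session token.
    void stage(String token, byte[] artifact) {
        List<byte[]> artifacts = staged.get(token);
        if (artifacts == null) {
            throw new IllegalStateException("unknown staging session: " + token);
        }
        artifacts.add(artifact);
    }

    // Step 5: during the run, dependencies resolve to the staged artifacts.
    List<byte[]> resolve(String token) {
        return staged.getOrDefault(token, List.of());
    }
}
```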

JAR Packaging Workflow

  1. PortablePipelineJarCreator.run() is invoked with the pipeline and job info
  2. A manifest is created with the main class for later execution
  3. Resources from the original JAR are copied to the output JAR
  4. Pipeline options are serialized as JSON into the JAR
  5. Each artifact is written into the JAR's BEAM-PIPELINE/{jobName}/artifacts/ directory
  6. The pipeline proto (with updated artifact references) is serialized as JSON into the JAR
  7. The resulting JAR can be executed independently via java -jar
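Steps 2 through 6 can be sketched with nothing but java.util.jar: a manifest carrying Main-Class, plus entries under BEAM-PIPELINE/. This is an illustrative sketch of the packaging mechanics, not the PortablePipelineJarCreator implementation; the JSON payloads are placeholders:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.jar.Attributes;
import java.util.jar.JarEntry;
import java.util.jar.JarInputStream;
import java.util.jar.JarOutputStream;
import java.util.jar.Manifest;

// Sketch of the JAR packaging workflow using only java.util.jar. Entry paths
// follow the documented BEAM-PIPELINE/ layout; the contents are placeholders.
public class PipelineJarSketch {
    static byte[] writeJar(String jobName, String mainClass) throws IOException {
        Manifest manifest = new Manifest();
        // Manifest-Version is mandatory, or no main attributes get written.
        manifest.getMainAttributes().put(Attributes.Name.MANIFEST_VERSION, "1.0");
        manifest.getMainAttributes().put(Attributes.Name.MAIN_CLASS, mainClass);
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (JarOutputStream jar = new JarOutputStream(bytes, manifest)) {
            jar.putNextEntry(new JarEntry("BEAM-PIPELINE/pipeline-manifest.json"));
            jar.write(("{\"defaultJobName\":\"" + jobName + "\"}").getBytes());
            jar.closeEntry();
            jar.putNextEntry(new JarEntry("BEAM-PIPELINE/" + jobName + "/pipeline.json"));
            jar.write("{}".getBytes()); // placeholder for the serialized pipeline proto
            jar.closeEntry();
        }
        return bytes.toByteArray();
    }

    // Reads back the Main-Class that `java -jar` would launch.
    static String readMainClass(byte[] jarBytes) throws IOException {
        try (JarInputStream jar = new JarInputStream(new ByteArrayInputStream(jarBytes))) {
            return jar.getManifest().getMainAttributes().getValue(Attributes.Name.MAIN_CLASS);
        }
    }
}
```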

Theoretical Basis

Artifact Staging is based on dependency injection and packaging patterns fundamental to distributed execution:

  • Dependency Closure -- All code and data needed for execution must be available on the runner. Artifact staging ensures the execution environment contains a complete closure of dependencies, analogous to how container images package all runtime dependencies.
  • Content-Addressable Storage -- In the gRPC staging path, artifacts are keyed by staging session tokens and environment IDs, enabling the server to associate the correct artifacts with each pipeline environment.
  • Self-Contained Packaging -- The JAR-based approach creates a self-contained executable unit, following the "fat JAR" or "uber JAR" pattern common in Java deployments. This ensures the pipeline can be executed without external dependency resolution.
  • Late Binding -- The two-step process (prepare then stage) enables late binding of artifact locations. The pipeline proto references abstract artifact descriptors, which are resolved to concrete staged locations during the run phase via resolveDependencies().

JAR Layout

The JAR-based staging produces the following directory structure:

META-INF/
  MANIFEST.MF                          (Main-Class, Manifest-Version)
BEAM-PIPELINE/
  pipeline-manifest.json               (defaultJobName)
  {jobName}/
    pipeline.json                      (RunnerApi.Pipeline as JSON)
    pipeline-options.json              (PipelineOptions as JSON Struct)
    artifacts/
      {uuid-1}                         (First artifact binary)
      {uuid-2}                         (Second artifact binary)
      ...
...Java classes from main JAR...
