Implementation:Apache Beam Twister2Runner Run


| Attribute | Value |
|---|---|
| Implementation Name | Twister2Runner Run |
| Domain | Pipeline_Orchestration, HPC |
| Overview | Concrete tool for applying transform overrides and initiating pipeline execution on Twister2 |
| Deprecation Notice | The Twister2 Runner is deprecated (annotated @Deprecated) and scheduled for removal in Apache Beam 3.0 |
| last_updated | 2026-02-09 04:00 GMT |

Overview

Twister2Runner.run() is the central entry point for executing a Beam pipeline on the Twister2 HPC runtime. It orchestrates the entire pipeline execution lifecycle: applying transform overrides, translating the pipeline to a TSet DAG, packaging classpath dependencies, building the Twister2 job descriptor, and submitting the job for execution. The runner supports both local mode (no Twister2 installation required) and cluster mode (standalone, nomad, kubernetes, mesos).

Note: The Twister2 Runner is deprecated and is scheduled for removal in Apache Beam 3.0. Users should plan migration to an actively maintained runner.

Code Reference

Source Location

| File | Lines | Repository |
|---|---|---|
| runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2Runner.java | L72-358 | GitHub |

Signature

@SuppressWarnings({"rawtypes", "nullness"})
@Deprecated
public class Twister2Runner extends PipelineRunner<PipelineResult> {

  private final Twister2PipelineOptions options;

  protected Twister2Runner(Twister2PipelineOptions options) {
    this.options = options;
  }

  public static Twister2Runner fromOptions(PipelineOptions options) {
    return new Twister2Runner(
        PipelineOptionsValidator.validate(Twister2PipelineOptions.class, options));
  }

  @Override
  public PipelineResult run(Pipeline pipeline) {
    // 1. Create execution environment
    Twister2PipelineExecutionEnvironment env =
        new Twister2PipelineExecutionEnvironment(options);

    // 2. Apply transform overrides
    pipeline.replaceAll(getDefaultOverrides());

    // 3. Convert SDF reads if not using beam_fn_api
    if (!ExperimentalOptions.hasExperiment(
            pipeline.getOptions(), "beam_fn_api")) {
      SplittableParDo
          .convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(pipeline);
    }

    // 4. Translate pipeline to TSet DAG
    env.translate(pipeline);

    // 5. Setup system (classpath packaging, config validation)
    setupSystem(options);

    // 6. Build job configuration
    JobConfig jobConfig = new JobConfig();
    // ... populates sideInputs, leaves, graph; the elided code also defines
    // the `workers` and `config` variables used in steps 7 and 8 ...

    // 7. Build Twister2Job
    Twister2Job twister2Job = Twister2Job.newBuilder()
        .setJobName(options.getJobName())
        .setWorkerClass(BeamBatchWorker.class)
        .addComputeResource(
            options.getWorkerCPUs(),
            options.getRamMegaBytes(), workers)
        .setConfig(jobConfig)
        .build();

    // 8. Submit job
    Twister2JobState jobState;
    if (isLocalMode(options)) {
      jobState = LocalSubmitter.submitJob(twister2Job, config);
    } else {
      jobState = Twister2Submitter.submitJob(twister2Job, config);
    }

    return new Twister2PipelineResult(jobState);
  }
}

Import Statement

import org.apache.beam.runners.twister2.Twister2Runner;

I/O Contract

Inputs

| Input | Type | Description |
|---|---|---|
| pipeline | Pipeline | User-constructed Beam pipeline with original SDK transforms |
| options | Twister2PipelineOptions | Validated configuration including parallelism, cluster type, Twister2 home, worker resources |

Outputs

| Output | Type | Description |
|---|---|---|
| PipelineResult | Twister2PipelineResult | Wraps the Twister2JobState with Beam state mapping (RUNNING, DONE, FAILED) |
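
The state mapping mentioned in the output description can be illustrated with a minimal, library-free sketch. The enums below are hypothetical stand-ins, not the real Twister2 or Beam types. The job-side state names (in particular COMPLETED) and the choice to treat unknown or missing states as FAILED are assumptions made for illustration only.

```java
// Hypothetical stand-ins: these enums only model the real Twister2 and Beam types.
final class StateMappingSketch {

  /** Assumed job states reported by the Twister2 side (names are an assumption). */
  enum JobState { RUNNING, COMPLETED, FAILED }

  /** The three Beam-facing states named in the Outputs table. */
  enum BeamState { RUNNING, DONE, FAILED }

  /** Maps a job state to a Beam-facing state; null falls back to FAILED (an assumption). */
  static BeamState toBeamState(JobState jobState) {
    if (jobState == null) {
      return BeamState.FAILED;
    }
    switch (jobState) {
      case RUNNING:
        return BeamState.RUNNING;
      case COMPLETED:
        return BeamState.DONE;
      default:
        return BeamState.FAILED;
    }
  }

  public static void main(String[] args) {
    // Print the mapping for every modeled job state.
    for (JobState state : JobState.values()) {
      System.out.println(state + " -> " + toBeamState(state));
    }
  }
}
```

Falling back to FAILED for anything unrecognized is a conservative choice: a caller polling the result would stop waiting rather than treat an unknown state as still running.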

Execution Flow

The run() method executes the following steps in sequence:

  1. Create execution environment -- Instantiates Twister2PipelineExecutionEnvironment with the validated options. This also creates a BeamBatchTSetEnvironment and sets it on the options.
  2. Apply transform overrides -- Calls pipeline.replaceAll(getDefaultOverrides()) to replace SplittableParDo transforms with runner-compatible implementations.
  3. Convert SDF reads -- Unless the beam_fn_api experiment is enabled, converts read-based splittable DoFns to primitive reads.
  4. Translate pipeline -- Calls env.translate(pipeline), which creates the appropriate translator (Twister2BatchPipelineTranslator) and traverses the pipeline in topological order.
  5. Setup system -- Calls setupSystem(options), which packages classpath dependencies into a ZIP and sets system properties for the cluster type, job file, and Twister2 home. In cluster mode, it also validates that the required config files (core.yaml, network.yaml, data.yaml, resource.yaml, task.yaml) exist.
  6. Build job config -- Populates a JobConfig (or config map for local mode) with serialized side inputs, leaf node IDs, and the TSet graph.
  7. Build Twister2Job -- Creates a Twister2Job descriptor with the job name, BeamBatchWorker as the worker class, and compute resource specifications.
  8. Submit job -- Submits via LocalSubmitter (local mode) or Twister2Submitter (cluster mode).
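
The cluster-mode config check in step 5 can be sketched in plain Java. This is not the runner's actual code: the class name, the method name, and the idea of passing a single config directory are assumptions for illustration. Only the five file names come from the step description above.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper; the real validation lives inside setupSystem(options).
final class ConfigFileCheckSketch {

  /** File names listed in step 5 of the execution flow. */
  static final List<String> REQUIRED_FILES =
      Collections.unmodifiableList(
          Arrays.asList("core.yaml", "network.yaml", "data.yaml", "resource.yaml", "task.yaml"));

  /** Returns the required file names missing from confDir, in declaration order. */
  static List<String> missingConfigFiles(Path confDir) {
    List<String> missing = new ArrayList<>();
    for (String name : REQUIRED_FILES) {
      if (!Files.isRegularFile(confDir.resolve(name))) {
        missing.add(name);
      }
    }
    return missing;
  }

  public static void main(String[] args) throws IOException {
    // Demo against a throwaway directory that contains only core.yaml.
    Path confDir = Files.createTempDirectory("twister2-conf");
    Files.createFile(confDir.resolve("core.yaml"));
    System.out.println("Missing config files: " + missingConfigFiles(confDir));
  }
}
```

Returning the list of missing names, rather than failing on the first one, lets a caller report every problem in a single error message before submitting the job.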

Local Mode Detection

private boolean isLocalMode(Twister2PipelineOptions options) {
    if (options.getTwister2Home() == null
        || "".equals(options.getTwister2Home())) {
      return true;
    } else {
      return false;
    }
}

When twister2Home is null or empty, the runner operates in local mode, forces parallelism to 1, and uses LocalSubmitter. A warning is logged that local mode only supports a single worker.
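
The consequences of this check can be summarized in a small, library-free sketch. The Decision class and the decide method are hypothetical names introduced for illustration. The submitter is represented by its class name as a string so the example needs no Twister2 dependency.

```java
// Hypothetical model of the mode decision; not part of the runner's API.
final class LocalModeSketch {

  /** Immutable result of the mode decision. */
  static final class Decision {
    final boolean localMode;
    final int parallelism;
    final String submitter;

    Decision(boolean localMode, int parallelism, String submitter) {
      this.localMode = localMode;
      this.parallelism = parallelism;
      this.submitter = submitter;
    }
  }

  /** Same rule as isLocalMode above: a null or empty Twister2 home means local mode. */
  static boolean isLocalMode(String twister2Home) {
    return twister2Home == null || twister2Home.isEmpty();
  }

  /** Local mode forces parallelism to 1; cluster mode keeps the requested value. */
  static Decision decide(String twister2Home, int requestedParallelism) {
    if (isLocalMode(twister2Home)) {
      return new Decision(true, 1, "LocalSubmitter");
    }
    return new Decision(false, requestedParallelism, "Twister2Submitter");
  }

  public static void main(String[] args) {
    Decision local = decide("", 4);
    Decision cluster = decide("/opt/twister2", 4);
    System.out.println(local.submitter + " with parallelism " + local.parallelism);
    System.out.println(cluster.submitter + " with parallelism " + cluster.parallelism);
  }
}
```

One practical consequence: a parallelism value set on the options is silently overridden when twister2Home is unset, so a pipeline tuned for four workers runs on one worker in local mode.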

Usage Examples

Basic Pipeline Execution

// Configure options. The runner is not set in code here, so this assumes
// args includes --runner=Twister2Runner.
Twister2PipelineOptions options = PipelineOptionsFactory
    .fromArgs(args)
    .withValidation()
    .as(Twister2PipelineOptions.class);
options.setParallelism(4);
options.setClusterType("standalone");
options.setTwister2Home("/opt/twister2");

// Build and run pipeline (MyDoFn is a user-defined DoFn<String, String>, not shown)
Pipeline pipeline = Pipeline.create(options);
pipeline.apply("ReadInput", TextIO.read().from("input.txt"))
    .apply("Transform", ParDo.of(new MyDoFn()))
    .apply("WriteOutput", TextIO.write().to("output"));

PipelineResult result = pipeline.run();
// result.getState() returns DONE, RUNNING, or FAILED

Programmatic Runner Instantiation

// Direct runner instantiation (alternative to --runner flag)
Twister2Runner runner = Twister2Runner.fromOptions(options);
PipelineResult result = runner.run(pipeline);
