Implementation:Apache Beam Twister2Runner Run
| Attribute | Value |
|---|---|
| Implementation Name | Twister2Runner Run |
| Domain | Pipeline_Orchestration, HPC |
| Overview | Concrete tool for applying transform overrides and initiating pipeline execution on Twister2 |
| Deprecation Notice | The Twister2 Runner is deprecated (annotated @Deprecated) and scheduled for removal in Apache Beam 3.0 |
| last_updated | 2026-02-09 04:00 GMT |
Overview
Twister2Runner.run() is the central entry point for executing a Beam pipeline on the Twister2 HPC runtime. It orchestrates the entire pipeline execution lifecycle: applying transform overrides, translating the pipeline to a TSet DAG, packaging classpath dependencies, building the Twister2 job descriptor, and submitting the job for execution. The runner supports both local mode (no Twister2 installation required) and cluster mode (standalone, nomad, kubernetes, mesos).
Note: The Twister2 Runner is deprecated and is scheduled for removal in Apache Beam 3.0. Users should plan migration to an actively maintained runner.
Code Reference
Source Location
| File | Lines | Repository |
|---|---|---|
| runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2Runner.java | L72-358 | GitHub |
Signature
@SuppressWarnings({"rawtypes", "nullness"})
@Deprecated
public class Twister2Runner extends PipelineRunner<PipelineResult> {

  private final Twister2PipelineOptions options;

  protected Twister2Runner(Twister2PipelineOptions options) {
    this.options = options;
  }

  public static Twister2Runner fromOptions(PipelineOptions options) {
    return new Twister2Runner(
        PipelineOptionsValidator.validate(Twister2PipelineOptions.class, options));
  }

  @Override
  public PipelineResult run(Pipeline pipeline) {
    // 1. Create execution environment
    Twister2PipelineExecutionEnvironment env =
        new Twister2PipelineExecutionEnvironment(options);

    // 2. Apply transform overrides
    pipeline.replaceAll(getDefaultOverrides());

    // 3. Convert SDF reads if not using beam_fn_api
    if (!ExperimentalOptions.hasExperiment(
        pipeline.getOptions(), "beam_fn_api")) {
      SplittableParDo
          .convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(pipeline);
    }

    // 4. Translate pipeline to TSet DAG
    env.translate(pipeline);

    // 5. Setup system (classpath packaging, config validation)
    setupSystem(options);

    // 6. Build job configuration
    JobConfig jobConfig = new JobConfig();
    // ... populates sideInputs, leaves, graph ...

    // 7. Build Twister2Job
    Twister2Job twister2Job = Twister2Job.newBuilder()
        .setJobName(options.getJobName())
        .setWorkerClass(BeamBatchWorker.class)
        .addComputeResource(
            options.getWorkerCPUs(),
            options.getRamMegaBytes(), workers)
        .setConfig(jobConfig)
        .build();

    // 8. Submit job
    Twister2JobState jobState;
    if (isLocalMode(options)) {
      jobState = LocalSubmitter.submitJob(twister2Job, config);
    } else {
      jobState = Twister2Submitter.submitJob(twister2Job, config);
    }
    return new Twister2PipelineResult(jobState);
  }
}
Import Statement
import org.apache.beam.runners.twister2.Twister2Runner;
I/O Contract
Inputs
| Input | Type | Description |
|---|---|---|
| pipeline | Pipeline | User-constructed Beam pipeline with original SDK transforms |
| options | Twister2PipelineOptions | Validated configuration including parallelism, cluster type, Twister2 home, worker resources |
Outputs
| Output | Type | Description |
|---|---|---|
| PipelineResult | Twister2PipelineResult | Wraps the Twister2JobState with Beam state mapping (RUNNING, DONE, FAILED) |
Execution Flow
The run() method executes the following steps in sequence:
- Create execution environment -- Instantiates Twister2PipelineExecutionEnvironment with the validated options. This also creates a BeamBatchTSetEnvironment and sets it on the options.
- Apply transform overrides -- Calls pipeline.replaceAll(getDefaultOverrides()) to replace SplittableParDo transforms with runner-compatible implementations.
- Convert SDF reads -- Unless the beam_fn_api experiment is enabled, converts read-based splittable DoFns to primitive reads.
- Translate pipeline -- Calls env.translate(pipeline) which creates the appropriate translator (Twister2BatchPipelineTranslator) and traverses the pipeline topologically.
- Setup system -- Calls setupSystem(options) which packages classpath dependencies into a ZIP, sets system properties for cluster type, job file, and Twister2 home. In cluster mode, validates that required config files (core.yaml, network.yaml, data.yaml, resource.yaml, task.yaml) exist.
- Build job config -- Populates a JobConfig (or config map for local mode) with serialized side inputs, leaf node IDs, and the TSet graph.
- Build Twister2Job -- Creates a Twister2Job descriptor with the job name, BeamBatchWorker as the worker class, and compute resource specifications.
- Submit job -- Submits via LocalSubmitter (local mode) or Twister2Submitter (cluster mode).
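The cluster-mode check in the setup step can be illustrated with a small, self-contained sketch. It only shows the idea of verifying that the five listed config files are present. The class name ConfigCheckSketch, the method missingConfigFiles, and the directory passed in are hypothetical and are not taken from Twister2Runner. Where the real runner looks for these files and how it reports a missing one are not covered here.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Apache Beam: checks a config directory
// for the files that the setup step requires in cluster mode.
final class ConfigCheckSketch {

  // File names as listed in the setup step above.
  static final List<String> REQUIRED =
      Arrays.asList("core.yaml", "network.yaml", "data.yaml", "resource.yaml", "task.yaml");

  /** Returns the required file names that are absent from confDir (empty means all present). */
  static List<String> missingConfigFiles(Path confDir) {
    List<String> missing = new ArrayList<>();
    for (String name : REQUIRED) {
      // isRegularFile returns false for absent paths and for directories.
      if (!Files.isRegularFile(confDir.resolve(name))) {
        missing.add(name);
      }
    }
    return missing;
  }

  public static void main(String[] args) throws Exception {
    // Assumed layout for the demo: a temporary directory holding only core.yaml.
    Path confDir = Files.createTempDirectory("twister2-conf");
    Files.createFile(confDir.resolve("core.yaml"));
    System.out.println("Missing: " + missingConfigFiles(confDir));
  }
}
```

Returning the list of missing names, rather than a boolean, lets the caller report every absent file in a single error message.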
Local Mode Detection
private boolean isLocalMode(Twister2PipelineOptions options) {
  if (options.getTwister2Home() == null
      || "".equals(options.getTwister2Home())) {
    return true;
  } else {
    return false;
  }
}
When twister2Home is null or empty, the runner operates in local mode, forces parallelism to 1, and uses LocalSubmitter. A warning is logged that local mode only supports a single worker.
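The rule above can be tried in isolation with a minimal sketch. It uses a plain String in place of Twister2PipelineOptions so that it runs without Beam on the classpath. LocalModeSketch and effectiveParallelism are hypothetical names introduced for illustration and are not part of the runner.

```java
// Hypothetical stand-in, not part of Apache Beam: mirrors the null-or-empty
// check on twister2Home and the parallelism override described above.
final class LocalModeSketch {

  /** Returns true when no Twister2 installation directory is configured. */
  static boolean isLocalMode(String twister2Home) {
    return twister2Home == null || twister2Home.isEmpty();
  }

  /** Local mode forces parallelism to 1; cluster mode keeps the requested value. */
  static int effectiveParallelism(String twister2Home, int requested) {
    return isLocalMode(twister2Home) ? 1 : requested;
  }

  public static void main(String[] args) {
    System.out.println(isLocalMode(null));                        // prints true
    System.out.println(isLocalMode(""));                          // prints true
    System.out.println(isLocalMode("/opt/twister2"));             // prints false
    System.out.println(effectiveParallelism("", 4));              // prints 1
    System.out.println(effectiveParallelism("/opt/twister2", 4)); // prints 4
  }
}
```

In practice this means that a parallelism value such as 4 is ignored unless twister2Home is set.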
Usage Examples
Basic Pipeline Execution
// Configure options
Twister2PipelineOptions options = PipelineOptionsFactory
    .fromArgs(args)
    .withValidation()
    .as(Twister2PipelineOptions.class);
options.setParallelism(4);
options.setClusterType("standalone");
options.setTwister2Home("/opt/twister2");

// Build and run pipeline
Pipeline pipeline = Pipeline.create(options);
pipeline.apply("ReadInput", TextIO.read().from("input.txt"))
    .apply("Transform", ParDo.of(new MyDoFn()))
    .apply("WriteOutput", TextIO.write().to("output"));
PipelineResult result = pipeline.run();
// result.getState() returns DONE, RUNNING, or FAILED
Programmatic Runner Instantiation
// Direct runner instantiation (alternative to --runner flag)
Twister2Runner runner = Twister2Runner.fromOptions(options);
PipelineResult result = runner.run(pipeline);
Related Pages
- Principle:Apache_Beam_Transform_Override_Application_Twister2 -- The transform override principle this runner implements
- Implementation:Apache_Beam_Twister2PipelineOptions -- Configuration options consumed by this runner
- Implementation:Apache_Beam_Twister2BatchPipelineTranslator -- The translator invoked during env.translate()
- Implementation:Apache_Beam_Twister2Runner_ZipDependencies -- Classpath packaging performed during setupSystem()
- Implementation:Apache_Beam_BeamBatchWorker_Execute -- Worker class that executes the submitted job
- Heuristic:Apache_Beam_Warning_Deprecated_Twister2_Runner