Implementation:Apache Beam Twister2BatchPipelineTranslator

From Leeroopedia


Implementation Name: Twister2BatchPipelineTranslator
Domain: Pipeline_Orchestration, Compilation, HPC
Overview: Concrete tool for translating a Beam pipeline into a Twister2 TSet batch execution graph
Deprecation Notice: The Twister2 Runner is deprecated and scheduled for removal in Apache Beam 3.0
last_updated: 2026-02-09 04:00 GMT

Overview

Twister2BatchPipelineTranslator is the batch pipeline translator that converts Beam primitive transforms into Twister2 TSet operations. It extends Twister2PipelineTranslator (which implements Pipeline.PipelineVisitor) and uses a static registry of transform translators, keyed by URN, to dispatch each primitive transform to the logic that creates its TSets. It works together with Twister2PipelineExecutionEnvironment, which detects the execution mode and creates the appropriate translator.

Note: The Twister2 Runner is deprecated and is scheduled for removal in Apache Beam 3.0. Users should plan migration to an actively maintained runner.

Code Reference

Source Location

runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/Twister2BatchPipelineTranslator.java (lines 33-94, GitHub)
runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2PipelineExecutionEnvironment.java (lines 40-118, GitHub)

Signature: Twister2BatchPipelineTranslator

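public class Twister2BatchPipelineTranslator extends Twister2PipelineTranslator {

  // java.util.logging logger used by visitPrimitiveTransform below
  private static final Logger LOG =
      Logger.getLogger(Twister2BatchPipelineTranslator.class.getName());

  // Static registry: primitive transform URN -> batch translator for that URN
  private static final Map<String, BatchTransformTranslator> TRANSFORM_TRANSLATORS =
      new HashMap<>();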

  private final Twister2BatchTranslationContext translationContext;

  static {
    TRANSFORM_TRANSLATORS.put(
        PTransformTranslation.IMPULSE_TRANSFORM_URN,
        new ImpulseTranslatorBatch());
    TRANSFORM_TRANSLATORS.put(
        PTransformTranslation.READ_TRANSFORM_URN,
        new ReadSourceTranslatorBatch());
    TRANSFORM_TRANSLATORS.put(
        PTransformTranslation.PAR_DO_TRANSFORM_URN,
        new ParDoMultiOutputTranslatorBatch());
    TRANSFORM_TRANSLATORS.put(
        PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN,
        new GroupByKeyTranslatorBatch());
    TRANSFORM_TRANSLATORS.put(
        PTransformTranslation.FLATTEN_TRANSFORM_URN,
        new FlattenTranslatorBatch());
    TRANSFORM_TRANSLATORS.put(
        PTransformTranslation.CREATE_VIEW_TRANSFORM_URN,
        new PCollectionViewTranslatorBatch());
    TRANSFORM_TRANSLATORS.put(
        PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN,
        new AssignWindowTranslatorBatch());
  }

  public Twister2BatchPipelineTranslator(
      Twister2PipelineOptions options,
      Twister2BatchTranslationContext twister2TranslationContext) {
    this.translationContext = twister2TranslationContext;
  }

  // Invoked once for each primitive (leaf) transform as the pipeline is traversed.
  @Override
  public void visitPrimitiveTransform(TransformHierarchy.Node node) {
    LOG.fine(String.format("visiting transform %s", node.getTransform()));
    PTransform transform = node.getTransform();
    BatchTransformTranslator translator = getTransformTranslator(transform);
    if (null == translator) {
      throw new IllegalStateException(
          "no translator registered for " + transform);
    }
    translationContext.setCurrentTransform(
        node.toAppliedPTransform(getPipeline()));
    translator.translateNode(transform, translationContext);
  }

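  // Resolves the transform's URN; returns null when the transform has no URN
  // or no translator is registered for it.
  private BatchTransformTranslator<?> getTransformTranslator(
      PTransform transform) {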
    @Nullable String urn =
        PTransformTranslation.urnForTransformOrNull(transform);
    return urn == null ? null : TRANSFORM_TRANSLATORS.get(urn);
  }
}
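The dispatch pattern above (a static URN-keyed map consulted from visitPrimitiveTransform) can be illustrated without any Beam or Twister2 dependencies. The following is a minimal sketch under simplifying assumptions: SimpleTransform, SimpleTranslator and SimpleBatchTranslator are hypothetical names, the short URN strings are placeholders rather than Beam's real URN values, and a list of strings stands in for the TSet graph held by the translation context.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a primitive transform: only its name and URN matter here.
final class SimpleTransform {
  final String name;
  final String urn; // may be null, mirroring urnForTransformOrNull

  SimpleTransform(String name, String urn) {
    this.name = name;
    this.urn = urn;
  }

  @Override
  public String toString() {
    return name;
  }
}

// Hypothetical stand-in for BatchTransformTranslator.
interface SimpleTranslator {
  void translateNode(SimpleTransform transform, List<String> graph);
}

final class SimpleBatchTranslator {
  // Static registry keyed by (placeholder) URN, populated once when the class loads.
  private static final Map<String, SimpleTranslator> TRANSLATORS = new HashMap<>();

  static {
    TRANSLATORS.put("impulse", (t, g) -> g.add("source:" + t));
    TRANSLATORS.put("pardo", (t, g) -> g.add("compute:" + t));
  }

  // Accumulates translated nodes, standing in for the translation context.
  final List<String> graph = new ArrayList<>();

  // Called once per primitive transform; fails fast when no translator matches.
  void visitPrimitiveTransform(SimpleTransform transform) {
    SimpleTranslator translator =
        transform.urn == null ? null : TRANSLATORS.get(transform.urn);
    if (translator == null) {
      throw new IllegalStateException("no translator registered for " + transform);
    }
    translator.translateNode(transform, graph);
  }
}
```

Building the table in a static initializer means it is created once per JVM and shared by every translator instance, and an unregistered URN yields an error message in the same format as the real translator's.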

Signature: Twister2PipelineExecutionEnvironment

public class Twister2PipelineExecutionEnvironment {

  private final Twister2PipelineOptions options;
  private Twister2TranslationContext twister2TranslationContext;

  public Twister2PipelineExecutionEnvironment(
      Twister2PipelineOptions options) {
    this.options = options;
    options.setTSetEnvironment(new BeamBatchTSetEnvironment());
  }

  public void translate(Pipeline pipeline) {
    TranslationModeDetector detector = new TranslationModeDetector();
    pipeline.traverseTopologically(detector);

    if (detector.isStreaming()) {
      throw new UnsupportedOperationException(
          "Streaming is not supported currently in the Twister2 Runner");
    }

    Twister2PipelineTranslator translator;
    if (options.isStreaming()) {
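      // Streaming path (unsupported): reached only when streaming is requested
      // explicitly in the options, since pipelines detected as streaming were
      // already rejected above.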
      twister2TranslationContext =
          new Twister2StreamTranslationContext(options);
      translator = new Twister2StreamPipelineTranslator();
    } else {
      twister2TranslationContext =
          new Twister2BatchTranslationContext(options);
      translator = new Twister2BatchPipelineTranslator(
          options,
          (Twister2BatchTranslationContext) twister2TranslationContext);
    }
    translator.translate(pipeline);
  }

  public Map<String, BatchTSet<?>> getSideInputs() {
    return twister2TranslationContext.getSideInputDataSets();
  }

  public Set<TSet> getLeaves() {
    return twister2TranslationContext.getLeaves();
  }

  protected TBaseGraph getTSetGraph() {
    return twister2TranslationContext.getEnvironment().getGraph();
  }
}
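The mode check at the start of translate() can be sketched in isolation. This is a hypothetical simplification: it assumes, as in comparable Beam runners, that the detector flags a pipeline as streaming when any PCollection is unbounded, and it models the pipeline's PCollections as a plain list of Boundedness values instead of traversing a real Pipeline. ModeDetector and ExecutionEnvironmentSketch are illustrative names, not runner classes.

```java
import java.util.List;

// Hypothetical simplified model of a PCollection's boundedness.
enum Boundedness { BOUNDED, UNBOUNDED }

// Hypothetical detector: the pipeline is "streaming" if any PCollection is unbounded.
final class ModeDetector {
  private boolean streaming = false;

  void visitValue(Boundedness boundedness) {
    if (boundedness == Boundedness.UNBOUNDED) {
      streaming = true;
    }
  }

  boolean isStreaming() {
    return streaming;
  }
}

final class ExecutionEnvironmentSketch {
  // Returns which translator would be chosen: "batch" or "stream".
  static String chooseTranslator(List<Boundedness> pcollections, boolean streamingOption) {
    ModeDetector detector = new ModeDetector();
    pcollections.forEach(detector::visitValue); // stands in for traverseTopologically
    if (detector.isStreaming()) {
      throw new UnsupportedOperationException(
          "Streaming is not supported currently in the Twister2 Runner");
    }
    return streamingOption ? "stream" : "batch";
  }
}
```

As in the environment code above, unbounded input is rejected before any translator is created, while an explicit streaming option on a bounded pipeline still selects the streaming branch.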

Import Statements

import org.apache.beam.runners.twister2.translators.Twister2BatchPipelineTranslator;
import org.apache.beam.runners.twister2.Twister2PipelineExecutionEnvironment;

I/O Contract

Inputs

pipeline (Pipeline): Beam pipeline after the runner's default overrides have been applied with pipeline.replaceAll()
options (Twister2PipelineOptions): Runner configuration options
twister2TranslationContext (Twister2BatchTranslationContext): Mutable context that accumulates the TSet DAG

Outputs

TSet DAG (TBaseGraph, held in the context): Directed acyclic graph of Twister2 TSet operations
Side inputs (Map<String, BatchTSet<?>>): Side input TSets keyed by PCollectionView identifier
Leaves (Set<TSet>): Leaf (output) TSet nodes in the DAG
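One plausible way a translation context could maintain the leaf set returned by getLeaves() is to mark every produced output as a leaf and unmark it as soon as a later transform reads it as input. The sketch below is an assumption for illustration only: TSets are modelled as string names, LeafTracker is a hypothetical class, and this is not the Twister2 context's actual code.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical sketch: track DAG leaves as nodes are produced and consumed.
final class LeafTracker {
  private final Set<String> leaves = new LinkedHashSet<>();

  // A newly produced output has no consumers yet, so it starts out as a leaf.
  void setOutput(String node) {
    leaves.add(node);
  }

  // Once a node is read as an input it has a consumer, so it is no longer a leaf.
  String getInput(String node) {
    leaves.remove(node);
    return node;
  }

  Set<String> getLeaves() {
    return leaves;
  }
}
```

For a chain Impulse -> ParDo -> GroupByKey, only the GroupByKey output remains a leaf once all three translations have run.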

Transform Translator Registry

The following batch transform translators are statically registered:

IMPULSE_TRANSFORM_URN -> ImpulseTranslatorBatch: Creates a source TSet for impulse (root) transforms
READ_TRANSFORM_URN -> ReadSourceTranslatorBatch: Creates a source TSet from a BoundedSource
PAR_DO_TRANSFORM_URN -> ParDoMultiOutputTranslatorBatch: Creates a ComputeCollector TSet wrapping a DoFnFunction
GROUP_BY_KEY_TRANSFORM_URN -> GroupByKeyTranslatorBatch: Creates keyed TSet operations with GroupByWindowFunction
FLATTEN_TRANSFORM_URN -> FlattenTranslatorBatch: Creates a union TSet from multiple input TSets
CREATE_VIEW_TRANSFORM_URN -> PCollectionViewTranslatorBatch: Creates a cached TSet for side input materialization
ASSIGN_WINDOWS_TRANSFORM_URN -> AssignWindowTranslatorBatch: Creates a TSet that assigns windows to elements
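Beam's GroupByKey groups values per key and per window; the table notes that the batch translator implements this with a GroupByWindowFunction. The following sketch shows only those grouping semantics on in-memory Java collections, assuming non-merging windows represented as string labels. WindowedKV and GroupByKeyAndWindowSketch are hypothetical names, and the code does not use the TSet API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory element: a key/value pair assigned to one window.
final class WindowedKV {
  final String key;
  final String window;
  final int value;

  WindowedKV(String key, String window, int value) {
    this.key = key;
    this.window = window;
    this.value = value;
  }
}

final class GroupByKeyAndWindowSketch {
  // Groups values by the composite (key, window) pair, preserving first-seen order.
  static Map<List<String>, List<Integer>> group(List<WindowedKV> input) {
    Map<List<String>, List<Integer>> groups = new LinkedHashMap<>();
    for (WindowedKV element : input) {
      // List equality is content-based, so it works as a composite map key.
      List<String> groupKey = Arrays.asList(element.key, element.window);
      groups.computeIfAbsent(groupKey, k -> new ArrayList<>()).add(element.value);
    }
    return groups;
  }
}
```

The same key in two different windows produces two separate groups, which is the behavior a window-aware GroupByKey must preserve.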

Usage Examples

Translation Is Invoked Internally

Translation is not called directly by users. It is invoked within Twister2Runner.run():

// Inside Twister2Runner.run()
Twister2PipelineExecutionEnvironment env =
    new Twister2PipelineExecutionEnvironment(options);
pipeline.replaceAll(getDefaultOverrides());
env.translate(pipeline); // <-- triggers Twister2BatchPipelineTranslator

Understanding Translation Errors

If a transform has no registered translator, translation fails with:

// IllegalStateException is thrown:
// "no translator registered for <TransformName>"

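This typically means the pipeline still contains a primitive (leaf) transform whose URN is either missing (urnForTransformOrNull returned null) or absent from the registry above: for example, a transform that the runner's default overrides were expected to replace, or one the Twister2 runner does not support.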
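Because visitPrimitiveTransform stops at the first unregistered transform, it can help to list every offending transform at once when diagnosing this error. The helper below is a hypothetical pre-flight check, not part of the runner: Primitive and TranslatorPreflight are illustrative names, the URN strings are placeholders, and the set of registered URNs would have to be kept in sync with the registry by hand.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical description of a primitive transform found in the pipeline.
final class Primitive {
  final String name;
  final String urn; // null when the transform has no known URN

  Primitive(String name, String urn) {
    this.name = name;
    this.urn = urn;
  }
}

final class TranslatorPreflight {
  // Collects every transform that would trigger "no translator registered for ...".
  static List<String> findUnsupported(List<Primitive> primitives, Set<String> registeredUrns) {
    List<String> unsupported = new ArrayList<>();
    for (Primitive p : primitives) {
      if (p.urn == null || !registeredUrns.contains(p.urn)) {
        unsupported.add(p.name + " (urn=" + p.urn + ")");
      }
    }
    return unsupported;
  }
}
```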
