Implementation:Apache Beam BeamBatchWorker Execute

From Leeroopedia


Attribute Value
Implementation Name BeamBatchWorker Execute
Domain Job_Management, HPC
Overview Concrete tool for executing a Beam batch pipeline on Twister2 worker nodes
Deprecation Notice The Twister2 Runner is deprecated and scheduled for removal in Apache Beam 3.0
last_updated 2026-02-09 04:00 GMT

Overview

BeamBatchWorker is the Twister2 worker class that executes a Beam batch pipeline on each worker node. It implements BatchTSetIWorker and Serializable. When a Twister2 job is submitted, the framework instantiates this worker on each allocated node and calls its execute() method with a BatchTSetEnvironment containing the serialized pipeline graph and configuration.

Note: The Twister2 Runner is deprecated and is scheduled for removal in Apache Beam 3.0. Users should plan migration to an actively maintained runner.

Code Reference

Source Location

File Lines Repository
runners/twister2/src/main/java/org/apache/beam/runners/twister2/BeamBatchWorker.java L52-166 GitHub

Signature: execute

public class BeamBatchWorker
    implements Serializable, BatchTSetIWorker {

  private static final String SIDEINPUTS = "sideInputs";
  private static final String LEAVES = "leaves";
  private static final String GRAPH = "graph";
  private HashMap<String, BatchTSet<?>> sideInputDataSets;
  private Set<TSet> leaves;

  @Override
  public void execute(BatchTSetEnvironment env) {
    Config config = env.getConfig();
    Map<String, String> sideInputIds =
        (LinkedHashMap<String, String>) config.get(SIDEINPUTS);
    Set<String> leaveIds =
        (Set<String>) config.get(LEAVES);
    TBaseGraph graph =
        (TBaseGraph) config.get(GRAPH);
    env.settBaseGraph(graph);
    setupTSets(env, sideInputIds, leaveIds);
    resetEnv(env, graph);
    executePipeline(env);
  }
}

Signature: setupTSets

private void setupTSets(
    BatchTSetEnvironment env,
    Map<String, String> sideInputIds,
    Set<String> leaveIds) {
  sideInputDataSets = new LinkedHashMap<>();
  leaves = new HashSet<>();

  // Reset sources to avoid duplicate source objects
  // from deserialization
  Set<BuildableTSet> newSources = new HashSet<>();
  for (BuildableTSet source : env.getGraph().getSources()) {
    newSources.add(
        (BuildableTSet) env.getGraph().getNodeById(source.getId()));
  }
  env.getGraph().setSources(newSources);

  for (Map.Entry<String, String> entry : sideInputIds.entrySet()) {
    BatchTSet curr =
        (BatchTSet) env.getGraph().getNodeById(entry.getValue());
    sideInputDataSets.put(entry.getKey(), curr);
  }
  for (String leaveId : leaveIds) {
    leaves.add((TSet) env.getGraph().getNodeById(leaveId));
  }
}

Signature: resetEnv

private void resetEnv(BatchTSetEnvironment env, TBaseGraph graph) {
  Set<TBase> nodes = graph.getNodes();
  for (TBase node : nodes) {
    if (node instanceof BaseTSet) {
      ((BaseTSet) node).setTSetEnv(env);
    } else if (node instanceof BaseTLink) {
      ((BaseTLink) node).setTSetEnv(env);
    } else {
      throw new IllegalStateException(
          "node must be either of type BaseTSet or BaseTLink");
    }
  }
}

Signature: executePipeline

public void executePipeline(BatchTSetEnvironment env) {
  Map<String, CachedTSet> sideInputTSets = new HashMap<>();
  for (Map.Entry<String, BatchTSet<?>> sides
      : sideInputDataSets.entrySet()) {
    BatchTSet<?> sideTSet = sides.getValue();
    addInputs((BaseTSet) sideTSet, sideInputTSets);
    CachedTSet tempCache = (CachedTSet) sideTSet.cache();
    sideInputTSets.put(sides.getKey(), tempCache);
  }
  for (TSet leaf : leaves) {
    SinkTSet sinkTSet =
        (SinkTSet) leaf.direct().sink(new Twister2SinkFunction());
    addInputs(sinkTSet, sideInputTSets);
    eval(env, sinkTSet);
  }
}

Signature: addInputs

private void addInputs(
    BaseTSet sinkTSet,
    Map<String, CachedTSet> sideInputTSets) {
  if (sideInputTSets.isEmpty()) {
    return;
  }

  TBaseGraph graph = sinkTSet.getTBaseGraph();
  TBase currNode = null;
  Deque<TBase> deque = new ArrayDeque<>();
  deque.add(sinkTSet);
  while (!deque.isEmpty()) {
    currNode = deque.remove();
    deque.addAll(graph.getPredecessors(currNode));
    if (currNode instanceof ComputeTSet) {
      if (((ComputeTSet) currNode).getComputeFunc()
          instanceof DoFnFunction) {
        Set<String> sideInputKeys =
            ((DoFnFunction) ((ComputeTSet) currNode)
                .getComputeFunc()).getSideInputKeys();
        for (String sideInputKey : sideInputKeys) {
          if (!sideInputTSets.containsKey(sideInputKey)) {
            throw new IllegalStateException(
                "Side input not found for key "
                + sideInputKey);
          }
          ((ComputeTSet) currNode).addInput(
              sideInputKey,
              sideInputTSets.get(sideInputKey));
        }
      }
    }
  }
}

Import Statement

import org.apache.beam.runners.twister2.BeamBatchWorker;

I/O Contract

Inputs

Input Type Description
env BatchTSetEnvironment Twister2 batch execution environment provided by the framework
config["sideInputs"] LinkedHashMap<String, String> Map of PCollectionView name to TSet node ID for side inputs
config["leaves"] Set<String> Set of TSet node IDs representing pipeline outputs (leaf nodes)
config["graph"] TBaseGraph The complete serialized TSet DAG from pipeline translation

Outputs

Output Type Description
Executed pipeline Side effects All leaf TSets are evaluated, producing pipeline output (writes, side effects)
Cached side inputs Map<String, CachedTSet> Side inputs are cached and made available to downstream DoFn operations
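
The input contract above can be sketched with plain java.util containers: a hypothetical driver packs three entries under the keys "sideInputs", "leaves", and "graph", and the worker reads them back by the same keys. Twister2's Config and TBaseGraph types are replaced with simple stand-ins here; only the key names come from the source above.

```java
import java.util.*;

// Hypothetical sketch of the config contract: the driver side packs the
// pipeline artifacts, and the worker side unpacks them by the same keys,
// mirroring the casts at the top of execute(). Not the real Twister2 API.
public class ConfigContractSketch {
  static final String SIDEINPUTS = "sideInputs";
  static final String LEAVES = "leaves";
  static final String GRAPH = "graph";

  // Driver side: pack the three entries into a config map.
  public static Map<String, Object> pack(Map<String, String> sideInputIds,
                                         Set<String> leafIds,
                                         Object graph) {
    Map<String, Object> config = new HashMap<>();
    config.put(SIDEINPUTS, sideInputIds); // view name -> TSet node id
    config.put(LEAVES, leafIds);          // ids of leaf (output) TSets
    config.put(GRAPH, graph);             // serialized TSet DAG stand-in
    return config;
  }

  // Worker side: unpack by key, as execute() does with its casts.
  @SuppressWarnings("unchecked")
  public static Map<String, String> unpackSideInputs(Map<String, Object> config) {
    return (Map<String, String>) config.get(SIDEINPUTS);
  }

  public static void main(String[] args) {
    Map<String, String> sides = new LinkedHashMap<>();
    sides.put("view-1", "tset-7");
    Map<String, Object> config = pack(sides, Set.of("tset-9"), "dag");
    System.out.println(unpackSideInputs(config).get("view-1"));
  }
}
```

The LinkedHashMap on the driver side matters: the worker's cast in execute() assumes insertion-ordered map semantics for the side input ids.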

Execution Flow

The BeamBatchWorker.execute() method performs the following steps:

  1. Deserialize config -- Extracts the side input IDs, leaf IDs, and TSet graph from the BatchTSetEnvironment's config
  2. Set graph -- Calls env.settBaseGraph(graph) to install the deserialized graph as the active graph
  3. Setup TSets -- Resolves side input and leaf TSet nodes by ID from the graph, fixing deserialization-induced source duplication
  4. Reset environment -- Iterates all nodes in the graph and resets their TSetEnv reference to the current worker's environment (necessary because the serialized graph had a different environment)
  5. Execute pipeline:
    • Cache side inputs -- For each side input TSet, traverses its subgraph to wire up any dependent side inputs, then caches the result
    • Evaluate leaves -- For each leaf TSet, creates a SinkTSet with a Twister2SinkFunction, wires up side inputs by traversing predecessors, and calls env.run(sinkTSet)
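
The ordering in step 5 can be condensed into a minimal, hypothetical sketch: every side input is cached before any leaf is evaluated, so a leaf's DoFns never run ahead of the side inputs they depend on. All names below are illustrative stand-ins, not Twister2 API.

```java
import java.util.*;

// Hypothetical sketch of the two-phase ordering in executePipeline():
// phase 1 caches every side input, phase 2 sinks and evaluates every leaf.
public class ExecuteOrderSketch {

  // Returns an event log so the phase ordering can be inspected.
  public static List<String> run(List<String> sideInputs, List<String> leaves) {
    List<String> log = new ArrayList<>();
    for (String side : sideInputs) {
      log.add("cache:" + side); // phase 1: materialize side inputs
    }
    for (String leaf : leaves) {
      log.add("eval:" + leaf);  // phase 2: sink + evaluate each leaf
    }
    return log;
  }

  public static void main(String[] args) {
    System.out.println(run(List.of("view-1"), List.of("sink-A", "sink-B")));
    // -> [cache:view-1, eval:sink-A, eval:sink-B]
  }
}
```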

Side Input Wiring

The addInputs() method performs a breadth-first traversal backward through the TSet graph (from sink to sources). When it encounters a ComputeTSet whose compute function is a DoFnFunction, it retrieves the DoFn's declared side input keys and adds the corresponding cached side input TSets as inputs. This ensures that DoFn operations have access to their required side inputs during execution.
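
The traversal above can be sketched with a plain string-keyed graph: start at the sink, drain a FIFO deque of predecessors, and collect the side input keys declared along the way. The Node/graph representation here is a simplified stand-in for Twister2's TBase/TBaseGraph, not the real API; like the real addInputs(), no visited set is kept, so a diamond-shaped graph would revisit nodes.

```java
import java.util.*;

// Hypothetical sketch of the backward breadth-first traversal in
// addInputs(): from a sink, walk predecessor edges and collect every
// side input key declared by the nodes encountered.
public class SideInputWiringSketch {

  public static List<String> collectSideInputKeys(
      String sink,
      Map<String, List<String>> predecessors,   // node -> its predecessors
      Map<String, List<String>> declaredKeys) { // node -> declared keys
    List<String> found = new ArrayList<>();
    Deque<String> deque = new ArrayDeque<>();
    deque.add(sink);
    while (!deque.isEmpty()) {
      String curr = deque.remove();  // FIFO removal => breadth-first order
      deque.addAll(predecessors.getOrDefault(curr, List.of()));
      found.addAll(declaredKeys.getOrDefault(curr, List.of()));
    }
    return found;
  }

  public static void main(String[] args) {
    // sink <- dofn <- source; only "dofn" declares a side input key.
    Map<String, List<String>> preds = Map.of(
        "sink", List.of("dofn"),
        "dofn", List.of("source"));
    Map<String, List<String>> keys = Map.of("dofn", List.of("view-1"));
    System.out.println(collectSideInputKeys("sink", preds, keys));
    // -> [view-1]
  }
}
```

In the real method, each key found this way is looked up in the cached side input map, and a missing key raises the "Side input not found" IllegalStateException listed under Debugging below.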

Usage Examples

Worker Execution (Framework-Driven)

BeamBatchWorker is not instantiated directly by users. It is specified as the worker class in the Twister2Job descriptor:

// Inside Twister2Runner.run()
Twister2Job twister2Job = Twister2Job.newBuilder()
    .setJobName(options.getJobName())
    .setWorkerClass(BeamBatchWorker.class)  // <-- worker class
    .addComputeResource(
        options.getWorkerCPUs(),
        options.getRamMegaBytes(), workers)
    .setConfig(jobConfig)
    .build();

The Twister2 framework then instantiates BeamBatchWorker on each worker node and invokes execute(env).

Debugging Worker-Side Issues

Common worker-side issues:

IllegalStateException: "node must be either of type BaseTSet or BaseTLink"
  -> Graph contains an unexpected node type after deserialization

IllegalStateException: "Side input not found for key <key>"
  -> A DoFn declares a side input that was not included
     in the job's sideInputs configuration
