Implementation:Apache_Beam_BeamBatchWorker_Execute
| Attribute | Value |
|---|---|
| Implementation Name | BeamBatchWorker Execute |
| Domain | Job_Management, HPC |
| Overview | Concrete tool for executing a Beam batch pipeline on Twister2 worker nodes |
| Deprecation Notice | The Twister2 Runner is deprecated and scheduled for removal in Apache Beam 3.0 |
| Last Updated | 2026-02-09 04:00 GMT |
Overview
BeamBatchWorker is the Twister2 worker class that executes a Beam batch pipeline on each worker node. It implements BatchTSetIWorker and Serializable. When a Twister2 job is submitted, the framework instantiates this worker on each allocated node and calls its execute() method with a BatchTSetEnvironment containing the serialized pipeline graph and configuration.
Note: The Twister2 Runner is deprecated and is scheduled for removal in Apache Beam 3.0. Users should plan migration to an actively maintained runner.
Code Reference
Source Location
| File | Lines | Repository |
|---|---|---|
| runners/twister2/src/main/java/org/apache/beam/runners/twister2/BeamBatchWorker.java | L52-166 | GitHub |
Signature: execute
public class BeamBatchWorker
implements Serializable, BatchTSetIWorker {
private static final String SIDEINPUTS = "sideInputs";
private static final String LEAVES = "leaves";
private static final String GRAPH = "graph";
private HashMap<String, BatchTSet<?>> sideInputDataSets;
private Set<TSet> leaves;
@Override
public void execute(BatchTSetEnvironment env) {
Config config = env.getConfig();
Map<String, String> sideInputIds =
(LinkedHashMap<String, String>) config.get(SIDEINPUTS);
Set<String> leaveIds =
(Set<String>) config.get(LEAVES);
TBaseGraph graph =
(TBaseGraph) config.get(GRAPH);
env.settBaseGraph(graph);
setupTSets(env, sideInputIds, leaveIds);
resetEnv(env, graph);
executePipeline(env);
}
}
Signature: setupTSets
private void setupTSets(
BatchTSetEnvironment env,
Map<String, String> sideInputIds,
Set<String> leaveIds) {
sideInputDataSets = new LinkedHashMap<>();
leaves = new HashSet<>();
// Reset sources to avoid duplicate source objects
// from deserialization
Set<BuildableTSet> newSources = new HashSet<>();
for (BuildableTSet source : env.getGraph().getSources()) {
newSources.add(
(BuildableTSet) env.getGraph().getNodeById(source.getId()));
}
env.getGraph().setSources(newSources);
for (Map.Entry<String, String> entry : sideInputIds.entrySet()) {
BatchTSet curr =
(BatchTSet) env.getGraph().getNodeById(entry.getValue());
sideInputDataSets.put(entry.getKey(), curr);
}
for (String leaveId : leaveIds) {
leaves.add((TSet) env.getGraph().getNodeById(leaveId));
}
}
Signature: resetEnv
private void resetEnv(BatchTSetEnvironment env, TBaseGraph graph) {
Set<TBase> nodes = graph.getNodes();
for (TBase node : nodes) {
if (node instanceof BaseTSet) {
((BaseTSet) node).setTSetEnv(env);
} else if (node instanceof BaseTLink) {
((BaseTLink) node).setTSetEnv(env);
} else {
throw new IllegalStateException(
"node must be either of type BaseTSet or BaseTLink");
}
}
}
Signature: executePipeline
public void executePipeline(BatchTSetEnvironment env) {
Map<String, CachedTSet> sideInputTSets = new HashMap<>();
for (Map.Entry<String, BatchTSet<?>> sides
: sideInputDataSets.entrySet()) {
BatchTSet<?> sideTSet = sides.getValue();
addInputs((BaseTSet) sideTSet, sideInputTSets);
CachedTSet tempCache = (CachedTSet) sideTSet.cache();
sideInputTSets.put(sides.getKey(), tempCache);
}
for (TSet leaf : leaves) {
SinkTSet sinkTSet =
(SinkTSet) leaf.direct().sink(new Twister2SinkFunction());
addInputs(sinkTSet, sideInputTSets);
eval(env, sinkTSet);
}
}
Signature: addInputs
private void addInputs(
BaseTSet sinkTSet,
Map<String, CachedTSet> sideInputTSets) {
if (sideInputTSets.isEmpty()) {
return;
}
TBaseGraph graph = sinkTSet.getTBaseGraph();
TBase currNode = null;
Deque<TBase> deque = new ArrayDeque<>();
deque.add(sinkTSet);
while (!deque.isEmpty()) {
currNode = deque.remove();
deque.addAll(graph.getPredecessors(currNode));
if (currNode instanceof ComputeTSet) {
if (((ComputeTSet) currNode).getComputeFunc()
instanceof DoFnFunction) {
Set<String> sideInputKeys =
((DoFnFunction) ((ComputeTSet) currNode)
.getComputeFunc()).getSideInputKeys();
for (String sideInputKey : sideInputKeys) {
if (!sideInputTSets.containsKey(sideInputKey)) {
throw new IllegalStateException(
"Side input not found for key "
+ sideInputKey);
}
((ComputeTSet) currNode).addInput(
sideInputKey,
sideInputTSets.get(sideInputKey));
}
}
}
}
}
Import Statement
import org.apache.beam.runners.twister2.BeamBatchWorker;
I/O Contract
Inputs
| Input | Type | Description |
|---|---|---|
| env | BatchTSetEnvironment | Twister2 batch execution environment provided by the framework |
| config["sideInputs"] | LinkedHashMap<String, String> | Map of PCollectionView name to TSet node ID for side inputs |
| config["leaves"] | Set<String> | Set of TSet node IDs representing pipeline outputs (leaf nodes) |
| config["graph"] | TBaseGraph | The complete serialized TSet DAG from pipeline translation |
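The three config entries form a simple map-based contract between the driver and the worker. The sketch below illustrates that contract with plain Java collections; the node IDs, the FakeGraph stand-in, and the helper class are hypothetical (this is not the real Twister2 JobConfig API), but the unpacking casts mirror the ones in execute():

```java
import java.util.*;

public class ConfigContractSketch {
    // Keys mirror the constants declared in BeamBatchWorker
    static final String SIDEINPUTS = "sideInputs";
    static final String LEAVES = "leaves";
    static final String GRAPH = "graph";

    // Hypothetical stand-in for the serialized TSet graph
    static final class FakeGraph {}

    // Driver side: pack the three entries the worker expects
    public static Map<String, Object> packConfig() {
        LinkedHashMap<String, String> sideInputIds = new LinkedHashMap<>();
        sideInputIds.put("view-0", "tset-7"); // PCollectionView name -> TSet node ID
        Set<String> leafIds = new HashSet<>(Collections.singletonList("tset-12"));
        Map<String, Object> config = new HashMap<>();
        config.put(SIDEINPUTS, sideInputIds);
        config.put(LEAVES, leafIds);
        config.put(GRAPH, new FakeGraph());
        return config;
    }

    public static void main(String[] args) {
        Map<String, Object> config = packConfig();
        // Worker side: unchecked casts, as in BeamBatchWorker.execute()
        @SuppressWarnings("unchecked")
        Map<String, String> sideInputIds =
            (LinkedHashMap<String, String>) config.get(SIDEINPUTS);
        @SuppressWarnings("unchecked")
        Set<String> leafIds = (Set<String>) config.get(LEAVES);
        System.out.println(sideInputIds.get("view-0") + " " + leafIds.contains("tset-12"));
    }
}
```

Because the worker casts blindly, the driver must use exactly these key names and value types; a mismatch surfaces as a ClassCastException on the worker rather than a validation error at submission time.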
Outputs
| Output | Type | Description |
|---|---|---|
| Executed pipeline | Side effects | All leaf TSets are evaluated, producing pipeline output (writes, side effects) |
| Cached side inputs | Map<String, CachedTSet> | Side inputs are cached and made available to downstream DoFn operations |
Execution Flow
The BeamBatchWorker.execute() method performs the following steps:
- Deserialize config -- Extracts the side input IDs, leaf IDs, and TSet graph from the BatchTSetEnvironment's config
- Set graph -- Calls env.settBaseGraph(graph) to install the deserialized graph as the active graph
- Setup TSets -- Resolves side input and leaf TSet nodes by ID from the graph, fixing deserialization-induced source duplication
- Reset environment -- Iterates all nodes in the graph and resets their TSetEnv reference to the current worker's environment (necessary because the serialized graph was built against a different environment)
- Execute pipeline:
  - Cache side inputs -- For each side input TSet, traverses its subgraph to wire up any dependent side inputs, then caches the result
  - Evaluate leaves -- For each leaf TSet, creates a SinkTSet with a Twister2SinkFunction, wires up side inputs by traversing predecessors, and calls env.run(sinkTSet)
Side Input Wiring
The addInputs() method performs a breadth-first traversal backward through the TSet graph (from sink to sources). When it encounters a ComputeTSet whose compute function is a DoFnFunction, it retrieves the DoFn's declared side input keys and adds the corresponding cached side input TSets as inputs. This ensures that DoFn operations have access to their required side inputs during execution.
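The backward traversal can be illustrated on a toy DAG. The sketch below is plain Java with hypothetical node names (not the Beam/Twister2 API); it uses the same Deque-as-FIFO pattern as addInputs(), walking from the sink toward the sources via a predecessor map:

```java
import java.util.*;

public class BackwardBfsSketch {
    // Walk a DAG backward from a sink, visiting nodes in breadth-first
    // order, as addInputs() does when looking for DoFnFunction nodes.
    public static List<String> visitFromSink(
            String sink, Map<String, List<String>> predecessors) {
        List<String> visited = new ArrayList<>();
        Deque<String> deque = new ArrayDeque<>();
        deque.add(sink);
        while (!deque.isEmpty()) {
            String curr = deque.remove(); // removes from the head: FIFO order
            visited.add(curr);
            // Enqueue everything feeding into the current node
            deque.addAll(predecessors.getOrDefault(curr, Collections.emptyList()));
        }
        return visited;
    }

    public static void main(String[] args) {
        // source -> compute -> sink, expressed as predecessor edges
        Map<String, List<String>> preds = new HashMap<>();
        preds.put("sink", Collections.singletonList("compute"));
        preds.put("compute", Collections.singletonList("source"));
        System.out.println(visitFromSink("sink", preds)); // [sink, compute, source]
    }
}
```

In the real method, each visited node is additionally checked with instanceof ComputeTSet, and matching nodes have their cached side inputs attached via addInput().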
Usage Examples
Worker Execution (Framework-Driven)
BeamBatchWorker is not instantiated directly by users. It is specified as the worker class in the Twister2Job descriptor:
// Inside Twister2Runner.run()
Twister2Job twister2Job = Twister2Job.newBuilder()
.setJobName(options.getJobName())
.setWorkerClass(BeamBatchWorker.class) // <-- worker class
.addComputeResource(
options.getWorkerCPUs(),
options.getRamMegaBytes(), workers)
.setConfig(jobConfig)
.build();
The Twister2 framework then instantiates BeamBatchWorker on each worker node and invokes execute(env).
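The framework-driven pattern can be sketched in miniature: the user hands over a worker class, and the framework reflectively instantiates it and drives execute(). Everything below is a hypothetical mock (the Worker interface, EchoWorker, and launch() are illustrative names, not Twister2 API):

```java
import java.util.*;

public class WorkerLaunchSketch {
    // Minimal stand-in for the framework's worker contract
    interface Worker {
        String execute(Map<String, Object> env);
    }

    public static class EchoWorker implements Worker {
        @Override
        public String execute(Map<String, Object> env) {
            return "ran with " + env.size() + " config entries";
        }
    }

    // Mirrors setWorkerClass(...): the framework, not the user,
    // instantiates the worker and invokes its entry point.
    public static String launch(Class<? extends Worker> workerClass,
                                Map<String, Object> env) throws Exception {
        Worker worker = workerClass.getDeclaredConstructor().newInstance();
        return worker.execute(env);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(launch(EchoWorker.class, Map.of("graph", new Object())));
    }
}
```

This is why BeamBatchWorker needs a no-argument constructor and carries all of its state through the environment's config rather than through constructor parameters.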
Debugging Worker-Side Issues
Common worker-side issues:
- IllegalStateException: "node must be either of type BaseTSet or BaseTLink" -> the graph contains an unexpected node type after deserialization
- IllegalStateException: "Side input not found for key <key>" -> a DoFn declares a side input that was not included in the job's sideInputs configuration
Related Pages
- Principle:Apache_Beam_Job_Submission_Twister2 -- The job submission principle this worker implements
- Implementation:Apache_Beam_Twister2Runner_Run -- The runner that constructs and submits the job with this worker class
- Implementation:Apache_Beam_DoFnFunction_And_GroupByWindowFunction -- Compute functions executed within the TSet DAG on the worker
- Implementation:Apache_Beam_Twister2BatchPipelineTranslator -- Translator that originally built the TSet DAG
- Heuristic:Apache_Beam_Warning_Deprecated_Twister2_Runner