Principle:Apache Beam Twister2 Execution and Result Collection

From Leeroopedia


Attribute | Value
Principle Name | Twister2 Execution and Result Collection
Domain | Data_Processing, HPC, Windowing
Description | Process of executing Beam DoFn logic and window-aware GroupByKey operations within the Twister2 TSet execution framework and collecting results
Deprecation Notice | The Twister2 Runner is deprecated and scheduled for removal in Apache Beam 3.0
last_updated | 2026-02-09 04:00 GMT

Overview

Twister2 Execution and Result Collection describes how Beam's core processing abstractions -- DoFns and window-aware GroupByKey operations -- are executed within the Twister2 TSet framework, and how execution results are collected and reported back to the client. This principle covers the runtime behavior of the two key compute functions that bridge Beam semantics with Twister2, as well as the pipeline result wrapper.

Note: The Twister2 Runner is deprecated and is scheduled for removal in Apache Beam 3.0. Users should plan migration to an actively maintained runner.

Description

During Twister2 batch execution, three key components bridge Beam semantics with the Twister2 TSet execution model:

1. DoFn Execution (DoFnFunction)

DoFnFunction wraps a user's DoFn as a Twister2 ComputeCollectorFunc. This adapter handles:

  • Serialization -- DoFn instances, coders, windowing strategies, and side input mappings are serialized to byte arrays and protobuf for transport to worker nodes. On the worker side, initTransient() deserializes them back.
  • DoFnRunner initialization -- During prepare(), a DoFnRunners.simpleRunner() is created with the deserialized DoFn, side input reader, output manager, coders, and windowing strategy.
  • Bundle lifecycle -- During compute(), the DoFnRunner processes a complete bundle:
    1. startBundle()
    2. processElement() for each input element
    3. finishBundle()
    4. Collect all outputs via the output manager
  • Output routing -- The DoFnOutputManager routes outputs to main and side output tags, wrapping them as RawUnionValue instances with tag indices.
  • Teardown -- close() invokes DoFnInvoker.invokeTeardown() for resource cleanup.
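The bundle lifecycle and output routing described above can be pictured with a small, self-contained sketch. It uses plain Java stand-ins rather than the real Beam or Twister2 types: SketchFn, SketchCollector, TaggedOutput, and BundleAdapterSketch are hypothetical names invented for illustration, and the tag-index record only approximates the role the text assigns to RawUnionValue.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for a user function; NOT the Beam DoFn API.
interface SketchFn<I, O> {
    void startBundle();
    void processElement(I element, SketchCollector<O> out);
    void finishBundle(SketchCollector<O> out);
}

// Hypothetical stand-in for an output manager that receives tagged outputs.
interface SketchCollector<O> {
    void output(int tagIndex, O value);
}

// Pairs an output value with the index of the tag it was emitted to.
final class TaggedOutput<O> {
    final int tagIndex;
    final O value;

    TaggedOutput(int tagIndex, O value) {
        this.tagIndex = tagIndex;
        this.value = value;
    }

    @Override
    public String toString() {
        return tagIndex + ":" + value;
    }
}

final class BundleAdapterSketch {
    // Runs one bundle: start, process every element, finish, then return the buffered outputs.
    static <I, O> List<TaggedOutput<O>> compute(SketchFn<I, O> fn, Iterator<I> input) {
        List<TaggedOutput<O>> buffered = new ArrayList<>();
        SketchCollector<O> out = (tag, value) -> buffered.add(new TaggedOutput<>(tag, value));
        fn.startBundle();
        while (input.hasNext()) {
            fn.processElement(input.next(), out);
        }
        fn.finishBundle(out);
        return buffered;
    }

    // Example function: even numbers go to tag 0 (main), odd numbers to tag 1,
    // and the element count is emitted to tag 0 when the bundle finishes.
    static SketchFn<Integer, String> evenOddFn() {
        return new SketchFn<Integer, String>() {
            private int seen;

            @Override
            public void startBundle() {
                seen = 0;
            }

            @Override
            public void processElement(Integer element, SketchCollector<String> out) {
                seen++;
                out.output(element % 2 == 0 ? 0 : 1, "n=" + element);
            }

            @Override
            public void finishBundle(SketchCollector<String> out) {
                out.output(0, "count=" + seen);
            }
        };
    }
}
```

Calling BundleAdapterSketch.compute(BundleAdapterSketch.evenOddFn(), Arrays.asList(1, 2, 3).iterator()) yields the list [1:n=1, 0:n=2, 1:n=3, 0:count=3]. Buffering outputs until the bundle finishes mirrors the "collect all outputs" step; whether the real adapter buffers in exactly this way is not stated in this page.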

2. Window-Aware GroupByKey (GroupByWindowFunction)

GroupByWindowFunction implements window-aware GroupByKey as a Twister2 FlatMapFunc. For each key-values group:

  • State and timer setup -- Creates InMemoryStateInternals and InMemoryTimerInternals for the key
  • ReduceFnRunner creation -- Creates a ReduceFnRunner with the windowing strategy, trigger state machine, state internals, timer internals, and the system reduce function
  • Element processing -- Passes all values through reduceFnRunner.processElements()
  • Watermark advancement -- Advances the input watermark to TIMESTAMP_MAX_VALUE to close all pending windows
  • Timer firing -- Advances processing time to infinity and fires all eligible timers (event-time timers, processing-time timers, and synchronized processing-time timers)
  • Output collection -- Collects all windowed key-value outputs via a RecordCollector

3. Pipeline Result (Twister2PipelineResult)

Twister2PipelineResult implements PipelineResult and wraps the Twister2 job state:

  • getState() returns the mapped Beam state
  • cancel() throws UnsupportedOperationException
  • waitUntilFinish(Duration) delegates to waitUntilFinish() (duration is ignored)
  • waitUntilFinish() returns the current state immediately (no actual waiting)
  • metrics() throws UnsupportedOperationException
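Because cancel() and metrics() throw and waitUntilFinish() does not block, client code may want to guard those calls. The following is a minimal sketch of such guards, assuming plain Java stand-ins: SketchState, SketchResult, and ResultHandlingSketch are hypothetical and are not the Beam PipelineResult API. The polling helper also assumes that the reported state can change between calls, which this page does not confirm for the real class.

```java
// Hypothetical stand-ins; NOT the Beam PipelineResult API.
enum SketchState {
    RUNNING(false), DONE(true), FAILED(true);

    final boolean terminal;

    SketchState(boolean terminal) {
        this.terminal = terminal;
    }
}

interface SketchResult {
    SketchState getState();
    SketchState waitUntilFinish();
    void cancel();
}

final class ResultHandlingSketch {
    // Attempts to cancel; reports false when the result does not support cancellation.
    static boolean tryCancel(SketchResult result) {
        try {
            result.cancel();
            return true;
        } catch (UnsupportedOperationException e) {
            return false;
        }
    }

    // Polls getState() instead of relying on waitUntilFinish() to block.
    // Checks the state at most maxPolls times and returns the last state seen.
    static SketchState pollUntilTerminal(SketchResult result, int maxPolls, long sleepMillis) {
        SketchState state = result.getState();
        for (int i = 1; i < maxPolls && !state.terminal; i++) {
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return state;
            }
            state = result.getState();
        }
        return state;
    }

    // Stub mimicking the behavior listed above: cancel() is unsupported and
    // waitUntilFinish() returns the current state without waiting.
    static SketchResult stubFinishingAfter(int runningPolls) {
        return new SketchResult() {
            private int polls;

            @Override
            public SketchState getState() {
                return polls++ < runningPolls ? SketchState.RUNNING : SketchState.DONE;
            }

            @Override
            public SketchState waitUntilFinish() {
                return getState();
            }

            @Override
            public void cancel() {
                throw new UnsupportedOperationException();
            }
        };
    }
}
```

With the stub, tryCancel returns false rather than propagating the exception, and pollUntilTerminal keeps checking until a terminal state appears or the poll budget is spent.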

Usage

These are internal execution primitives that users do not invoke directly. Understanding them is valuable when:

  • Debugging element processing errors -- If a DoFn throws exceptions during processing, the stack trace will show DoFnFunction.compute()
  • Troubleshooting windowing issues -- Incorrect window assignment or trigger behavior on Twister2 can be traced through GroupByWindowFunction.flatMap()
  • Understanding serialization failures -- If DoFn or coder serialization fails, errors will appear during DoFnFunction.prepareSerialization() or initTransient()
  • Evaluating result state -- The Twister2PipelineResult has limited functionality (no cancel, no metrics, and no blocking wait, with or without a duration), which affects how monitoring code should interact with the result

Theoretical Basis

The execution model is based on several design patterns and theoretical concepts:

Adapter Pattern

Both DoFnFunction and GroupByWindowFunction are adapters that bridge Beam's processing abstractions to Twister2's compute function interfaces:

Beam Abstraction | Twister2 Interface | Adapter Class
DoFn<InputT, OutputT> | ComputeCollectorFunc<RawUnionValue, Iterator<WindowedValue<InputT>>> | DoFnFunction
Window-aware GroupByKey | FlatMapFunc<WindowedValue<KV<K, Iterable<V>>>, KV<K, Iterable<WindowedValue<V>>>> | GroupByWindowFunction

DoFn Lifecycle Mapping

The DoFn lifecycle must be correctly mapped to Twister2's prepare/compute cycle:

DoFn Lifecycle Phase | Twister2 Phase | Method
@Setup | prepare(TSetContext) | DoFnInvokers.tryInvokeSetupFor()
@StartBundle | compute() start | doFnRunner.startBundle()
@ProcessElement | compute() loop | doFnRunner.processElement()
@FinishBundle | compute() end | doFnRunner.finishBundle()
@Teardown | close() or on error | DoFnInvoker.invokeTeardown()
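The mapping above, including teardown on the error path, can be sketched as a prepare/compute/close adapter. This is an illustration under stated assumptions: LifecycleFn and LifecycleAdapterSketch are hypothetical plain-Java stand-ins, not Beam or Twister2 classes, and the exact point at which the real adapter invokes teardown after a failure is not specified in this page.

```java
import java.util.List;

// Hypothetical stand-in for a user function with five lifecycle hooks; NOT the Beam DoFn API.
interface LifecycleFn {
    void setup();
    void startBundle();
    void processElement(String element);
    void finishBundle();
    void teardown();
}

// Hypothetical adapter exposing a prepare/compute/close cycle.
final class LifecycleAdapterSketch {
    private final LifecycleFn fn;

    LifecycleAdapterSketch(LifecycleFn fn) {
        this.fn = fn;
    }

    // One-time setup when the worker prepares the function.
    void prepare() {
        fn.setup();
    }

    // One bundle per call; a failure triggers teardown before the error propagates.
    void compute(List<String> elements) {
        try {
            fn.startBundle();
            for (String element : elements) {
                fn.processElement(element);
            }
            fn.finishBundle();
        } catch (RuntimeException e) {
            fn.teardown();
            throw e;
        }
    }

    // Normal-path cleanup.
    void close() {
        fn.teardown();
    }

    // Example function that records each phase in the given log and fails on the element "bad".
    static LifecycleFn recording(List<String> log) {
        return new LifecycleFn() {
            @Override
            public void setup() {
                log.add("setup");
            }

            @Override
            public void startBundle() {
                log.add("startBundle");
            }

            @Override
            public void processElement(String element) {
                if ("bad".equals(element)) {
                    throw new IllegalStateException("cannot process " + element);
                }
                log.add("process:" + element);
            }

            @Override
            public void finishBundle() {
                log.add("finishBundle");
            }

            @Override
            public void teardown() {
                log.add("teardown");
            }
        };
    }
}
```

On the normal path the recorded phases are setup, startBundle, one process entry per element, finishBundle, and teardown. When an element fails, finishBundle is skipped and teardown still runs, which matches the "close() or on error" row of the table.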

Batch Window Completion

The GroupByWindowFunction uses a batch completion strategy: after processing all elements for a key, it advances the input watermark to TIMESTAMP_MAX_VALUE, advances processing time to infinity, and fires every eligible timer. This forces all windows to close and emit their results, which is correct for batch processing because all data is available upfront and no late elements can arrive.
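The batch completion idea can be illustrated with a toy fixed-window buffer. This is a simplified sketch, not the ReduceFnRunner logic: BatchWindowFlushSketch is a hypothetical class, windows are plain fixed-size intervals, triggers and timers are omitted, and Long.MAX_VALUE stands in for the maximum timestamp.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical toy model of buffering values per fixed window and flushing on watermark advance.
final class BatchWindowFlushSketch {
    private final long windowSizeMillis;
    // Pending values keyed by window start, kept sorted so earlier windows fire first.
    private final TreeMap<Long, List<String>> pending = new TreeMap<>();
    private long watermark = Long.MIN_VALUE;

    BatchWindowFlushSketch(long windowSizeMillis) {
        if (windowSizeMillis <= 0) {
            throw new IllegalArgumentException("window size must be positive");
        }
        this.windowSizeMillis = windowSizeMillis;
    }

    // Assigns the value to the fixed window containing its timestamp and buffers it.
    void processElement(long timestampMillis, String value) {
        long start = Math.floorDiv(timestampMillis, windowSizeMillis) * windowSizeMillis;
        pending.computeIfAbsent(start, k -> new ArrayList<>()).add(value);
    }

    // Moves the watermark forward and emits every window whose end is at or before it.
    List<String> advanceWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
        List<String> fired = new ArrayList<>();
        Iterator<Map.Entry<Long, List<String>>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<String>> entry = it.next();
            long end = entry.getKey() + windowSizeMillis;
            if (end > watermark) {
                break; // later windows end even later, so none of them can fire yet
            }
            fired.add("[" + entry.getKey() + "," + end + ")=" + entry.getValue());
            it.remove();
        }
        return fired;
    }
}
```

With a window size of 10 and elements at timestamps 1, 12, and 3, advancing the watermark to 5 emits nothing, while advancing it to Long.MAX_VALUE emits both windows at once. That single final advance is the batch shortcut: nothing remains buffered once the maximum is reached.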

Serialization Bridge

Both function classes handle the challenge of transporting Java objects across the cluster by converting transient objects (DoFn, coders, windowing strategies) to byte arrays and protobuf on the client side, then reconstructing them on the worker side. This is similar in spirit to the serialization proxy pattern, in which a simple serializable stand-in travels in place of the live object.
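A minimal sketch of this bridge, assuming plain Java serialization for every field (the real classes also use protobuf, which is not modeled here). UpperCaseLogic and SerializationBridgeSketch are hypothetical; the method names prepareSerialization() and initTransient() are borrowed from the text, but their bodies are illustrative only.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Locale;

// Hypothetical user logic that must reach the worker.
final class UpperCaseLogic implements Serializable {
    private static final long serialVersionUID = 1L;

    String apply(String s) {
        return s.toUpperCase(Locale.ROOT);
    }
}

// Hypothetical wrapper: the live object is transient, and its byte form is what travels.
final class SerializationBridgeSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    private transient UpperCaseLogic logic;
    private byte[] logicBytes;

    SerializationBridgeSketch(UpperCaseLogic logic) {
        this.logic = logic;
    }

    // Client side: capture the transient object as bytes before shipping.
    void prepareSerialization() {
        logicBytes = toBytes(logic);
    }

    // Worker side: rebuild the transient object if it has not been restored yet.
    void initTransient() {
        if (logic != null) {
            return;
        }
        if (logicBytes == null) {
            throw new IllegalStateException("prepareSerialization() was not called");
        }
        logic = (UpperCaseLogic) fromBytes(logicBytes);
    }

    String compute(String input) {
        initTransient();
        return logic.apply(input);
    }

    static byte[] toBytes(Object value) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new IllegalStateException("serialization failed", e);
        }
    }

    static Object fromBytes(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("deserialization failed", e);
        }
    }

    // Simulates the client-to-worker hop inside a single JVM.
    static SerializationBridgeSketch shipToWorker(SerializationBridgeSketch client) {
        client.prepareSerialization();
        return (SerializationBridgeSketch) fromBytes(toBytes(client));
    }
}
```

After shipToWorker, the copy arrives with its transient field empty and restores it lazily on the first compute() call. A failure in either helper surfaces at the same two points the Usage section names: preparing the byte form or rebuilding the transient object.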
