Implementation:Apache Beam DoFnFunction And GroupByWindowFunction
| Attribute | Value |
|---|---|
| Implementation Name | DoFnFunction And GroupByWindowFunction |
| Domain | Data_Processing, HPC, Windowing |
| Overview | Concrete tools for executing Beam DoFn and GroupByWindow operations within Twister2 TSet batch execution |
| Deprecation Notice | The Twister2 Runner is deprecated and scheduled for removal in Apache Beam 3.0 |
| last_updated | 2026-02-09 04:00 GMT |
Overview
This page documents three interrelated classes that form the core execution layer of the Twister2 Beam runner: DoFnFunction (wraps Beam DoFns as Twister2 compute functions), GroupByWindowFunction (implements window-aware GroupByKey), and Twister2PipelineResult (wraps job execution state). Together, these classes bridge Beam's processing semantics with Twister2's TSet execution model.
Note: The Twister2 Runner is deprecated and is scheduled for removal in Apache Beam 3.0. Users should plan migration to an actively maintained runner.
Code Reference
Source Location
| File | Lines | Repository |
|---|---|---|
| runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/functions/DoFnFunction.java | L70-362 | GitHub |
| runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/functions/GroupByWindowFunction.java | L58-213 | GitHub |
| runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2PipelineResult.java | L31-89 | GitHub |
Signature: DoFnFunction
public class DoFnFunction<OutputT, InputT>
implements ComputeCollectorFunc<RawUnionValue,
Iterator<WindowedValue<InputT>>> {
// No-arg constructor for Kryo serialization
public DoFnFunction() {
this.isInitialized = false;
}
public DoFnFunction(
Twister2TranslationContext context,
DoFn<InputT, OutputT> doFn,
Coder<InputT> inputCoder,
Map<TupleTag<?>, Coder<?>> outputCoders,
List<TupleTag<?>> sideOutputs,
WindowingStrategy<?, ?> windowingStrategy,
Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
TupleTag<OutputT> mainOutput,
DoFnSchemaInformation doFnSchemaInformation,
Map<TupleTag<?>, Integer> outputMap,
Map<String, PCollectionView<?>> sideInputMapping) {
// stores parameters and calls prepareSerialization()
}
@Override
public void prepare(TSetContext context) {
initTransient();
sideInputReader =
new Twister2SideInputReader(sideInputs, context);
outputManager.setup(mainOutput, sideOutputs);
doFnInvoker =
DoFnInvokers.tryInvokeSetupFor(doFn, pipelineOptions);
doFnRunner = DoFnRunners.simpleRunner(
pipelineOptions, doFn, sideInputReader, outputManager,
mainOutput, sideOutputs, stepcontext, inputCoder,
outputCoders, windowingStrategy, doFnSchemaInformation,
sideInputMapping);
}
@Override
public void compute(
Iterator<WindowedValue<InputT>> input,
RecordCollector<RawUnionValue> output) {
try {
outputManager.clear();
doFnRunner.startBundle();
while (input.hasNext()) {
doFnRunner.processElement(input.next());
}
doFnRunner.finishBundle();
Iterator<RawUnionValue> outputs =
outputManager.getOutputs();
while (outputs.hasNext()) {
output.collect(outputs.next());
}
} catch (final RuntimeException re) {
DoFnInvokers.invokerFor(doFn).invokeTeardown();
throw re;
}
}
@Override
public void close() {
Optional.ofNullable(doFnInvoker)
.ifPresent(DoFnInvoker::invokeTeardown);
}
public Set<String> getSideInputKeys() {
initTransient();
Set<String> keys = new HashSet<>();
for (TupleTag<?> view : sideInputs.keySet()) {
keys.add(view.getId());
}
return keys;
}
}
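The lifecycle above follows a fixed pattern: `prepare()` rebuilds transient state on the worker, each `compute()` call processes one bundle (clear buffered outputs, process every element, drain the buffer to the collector), and `close()` tears the function down. A minimal sketch of that pattern, using hypothetical simplified stand-ins (`SimplifiedDoFnFunction`, `RecordSink`) rather than the real Beam/Twister2 types:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Illustrative stand-in for Twister2's RecordCollector
interface RecordSink<T> { void collect(T record); }

// Hypothetical, simplified analogue of DoFnFunction's lifecycle
class SimplifiedDoFnFunction<I, O> {
  private final Function<I, O> fn;  // stands in for the user's DoFn
  private List<O> bundleOutputs;    // stands in for the output manager (transient)

  SimplifiedDoFnFunction(Function<I, O> fn) { this.fn = fn; }

  // analogous to prepare(TSetContext): rebuild transient state on the worker
  void prepare() { bundleOutputs = new ArrayList<>(); }

  // analogous to compute(): one bundle per call
  void compute(Iterator<I> input, RecordSink<O> output) {
    bundleOutputs.clear();                // outputManager.clear()
    while (input.hasNext()) {             // processElement per record
      bundleOutputs.add(fn.apply(input.next()));
    }
    for (O o : bundleOutputs) {           // drain buffered outputs
      output.collect(o);
    }
  }

  // analogous to close(): teardown hook (DoFnInvoker::invokeTeardown)
  void close() { bundleOutputs = null; }
}
```

Buffering outputs per bundle and draining them at the end mirrors how the real class routes everything through its output manager before handing records to the Twister2 collector.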
Signature: GroupByWindowFunction
public class GroupByWindowFunction<K, V, W extends BoundedWindow>
implements FlatMapFunc<WindowedValue<KV<K, Iterable<V>>>,
KV<K, Iterable<WindowedValue<V>>>> {
// No-arg constructor for Kryo serialization
public GroupByWindowFunction() {
this.isInitialized = false;
}
public GroupByWindowFunction(
WindowingStrategy<?, W> windowingStrategy,
SystemReduceFn<K, V, Iterable<V>, Iterable<V>, W> reduceFn,
PipelineOptions options) {
this.windowingStrategy = windowingStrategy;
// Serializes windowingStrategy to protobuf bytes
this.reduceFn = reduceFn;
}
@Override
public void prepare(TSetContext context) {
initTransient();
}
@Override
public void flatMap(
KV<K, Iterable<WindowedValue<V>>> kIteratorKV,
RecordCollector<WindowedValue<KV<K, Iterable<V>>>> collector) {
try {
K key = kIteratorKV.getKey();
Iterable<WindowedValue<V>> values = kIteratorKV.getValue();
InMemoryTimerInternals timerInternals =
new InMemoryTimerInternals();
timerInternals.advanceProcessingTime(Instant.now());
timerInternals.advanceSynchronizedProcessingTime(
Instant.now());
StateInternals stateInternals =
InMemoryStateInternals.forKey(key);
ReduceFnRunner<K, V, Iterable<V>, W> reduceFnRunner =
new ReduceFnRunner<>(
key, windowingStrategy,
ExecutableTriggerStateMachine.create(
TriggerStateMachines.stateMachineForTrigger(
TriggerTranslation.toProto(
windowingStrategy.getTrigger()))),
stateInternals, timerInternals,
outputter,
new UnsupportedSideInputReader(
"GroupAlsoByWindow"),
reduceFn, null);
reduceFnRunner.processElements(values);
// Advance watermarks to close all windows
timerInternals.advanceInputWatermark(
BoundedWindow.TIMESTAMP_MAX_VALUE);
timerInternals.advanceProcessingTime(
BoundedWindow.TIMESTAMP_MAX_VALUE);
timerInternals.advanceSynchronizedProcessingTime(
BoundedWindow.TIMESTAMP_MAX_VALUE);
fireEligibleTimers(timerInternals, reduceFnRunner);
reduceFnRunner.persist();
Iterator<WindowedValue<KV<K, Iterable<V>>>> outputs =
outputter.getOutputs().iterator();
while (outputs.hasNext()) {
collector.collect(outputs.next());
}
    } catch (Exception e) {
      // Exceptions are logged at INFO level and swallowed here,
      // so a failure in window processing does not fail the task
      LOG.info(e.getMessage());
    }
}
}
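Conceptually, the work inside `flatMap` is a re-grouping step: one key's windowed values are bucketed by window so that each window can emit its own `KV<key, iterable-of-values>`. A hypothetical sketch of just that bucketing step for non-merging windows, with window identity reduced to a plain string id (the real class delegates this, plus trigger and timer handling, to `ReduceFnRunner`):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: WindowRegrouper is not a Beam class, and a String id
// stands in for BoundedWindow.
class WindowRegrouper {
  // Input: one key's values, each paired with the id of the window it fell into.
  // Output: per-window value lists, preserving encounter order of windows.
  static <V> Map<String, List<V>> groupByWindow(
      List<Map.Entry<String, V>> windowedValues) {
    Map<String, List<V>> perWindow = new LinkedHashMap<>();
    for (Map.Entry<String, V> wv : windowedValues) {
      perWindow.computeIfAbsent(wv.getKey(), w -> new ArrayList<>())
               .add(wv.getValue());
    }
    return perWindow;
  }
}
```

In the real implementation the subsequent watermark advance to `TIMESTAMP_MAX_VALUE` is what guarantees every such per-window bucket fires exactly once in batch mode.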
Signature: Twister2PipelineResult
public class Twister2PipelineResult implements PipelineResult {
PipelineResult.State state;
public Twister2PipelineResult(Twister2JobState jobState) {
state = mapToState(jobState);
}
@Override
public State getState() {
return state;
}
@Override
public State cancel() throws IOException {
throw new UnsupportedOperationException(
"Operation not supported");
}
@Override
public State waitUntilFinish(Duration duration) {
// Duration parameter is ignored
return waitUntilFinish();
}
@Override
public State waitUntilFinish() {
return state;
}
@Override
public MetricResults metrics() {
throw new UnsupportedOperationException(
"Operation not supported");
}
private State mapToState(Twister2JobState jobState) {
switch (jobState.getJobstate()) {
case RUNNING: return State.RUNNING;
case COMPLETED: return State.DONE;
case FAILED: return State.FAILED;
default: return State.FAILED;
}
}
}
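The state mapping in `mapToState` is a simple collapse of Twister2 job states onto Beam's `PipelineResult.State`, with anything unrecognized treated conservatively as a failure. A self-contained rendering of that logic, using local enums (`JobState`, `BeamState`) as hypothetical stand-ins for the real Twister2 and Beam types:

```java
// Illustrative stand-alone version of Twister2PipelineResult.mapToState()
class StateMapping {
  // STARTING is a hypothetical extra state to exercise the default branch
  enum JobState { STARTING, RUNNING, COMPLETED, FAILED }
  enum BeamState { RUNNING, DONE, FAILED }

  static BeamState mapToState(JobState jobState) {
    switch (jobState) {
      case RUNNING:   return BeamState.RUNNING;
      case COMPLETED: return BeamState.DONE;
      case FAILED:    return BeamState.FAILED;
      default:        return BeamState.FAILED; // unknown states map to FAILED
    }
  }
}
```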
Import Statements
import org.apache.beam.runners.twister2.translators.functions.DoFnFunction;
import org.apache.beam.runners.twister2.translators.functions.GroupByWindowFunction;
import org.apache.beam.runners.twister2.Twister2PipelineResult;
I/O Contract
DoFnFunction I/O
| Direction | Type | Description |
|---|---|---|
| Input | Iterator<WindowedValue<InputT>> | Windowed elements from the upstream TSet operation |
| Output | RecordCollector<RawUnionValue> | Processed elements wrapped as RawUnionValue with tag indices for main and side outputs |
| Side Inputs | Via Twister2SideInputReader | Reads from cached side input TSets injected via addInput() on the worker |
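The RawUnionValue output format exists so that one physical output stream can carry main and side outputs together: each element is tagged with the integer index assigned to its TupleTag in outputMap, and a downstream operator splits the stream back apart by that index. A hypothetical sketch of the scheme (`TaggedValue` and `OutputSplitter` are illustrative names, not Beam classes):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stand-in for RawUnionValue: a value plus the integer index of its TupleTag
class TaggedValue {
  final int unionTag;
  final Object value;
  TaggedValue(int unionTag, Object value) {
    this.unionTag = unionTag;
    this.value = value;
  }
}

// Splits one tagged stream back into per-tag output lists
class OutputSplitter {
  static Map<Integer, List<Object>> split(List<TaggedValue> stream) {
    Map<Integer, List<Object>> byTag = new HashMap<>();
    for (TaggedValue tv : stream) {
      byTag.computeIfAbsent(tv.unionTag, t -> new ArrayList<>()).add(tv.value);
    }
    return byTag;
  }
}
```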
GroupByWindowFunction I/O
| Direction | Type | Description |
|---|---|---|
| Input | KV<K, Iterable<WindowedValue<V>>> | Grouped key-value pairs where values retain their window information |
| Output | RecordCollector<WindowedValue<KV<K, Iterable<V>>>> | Window-aggregated key-value results with correct window assignments |
Twister2PipelineResult I/O
| Direction | Type | Description |
|---|---|---|
| Input | Twister2JobState | Job state from Twister2 submission (RUNNING, COMPLETED, FAILED) |
| Output | PipelineResult.State | Mapped Beam state (RUNNING, DONE, FAILED) |
Serialization Details
DoFnFunction Serialization
The DoFnFunction class handles a complex serialization challenge: many of its fields (DoFn, coders, windowing strategies) are transient and must be manually serialized/deserialized. The prepareSerialization() method converts these to byte arrays:
| Field | Serialization Method | Byte Storage |
|---|---|---|
| doFn | ParDoTranslation.translateDoFn() to protobuf, then .getPayload().toByteArray() | doFnwithExBytes |
| inputCoder | SerializableUtils.serializeToByteArray() | coderBytes |
| outputCoders | SerializableUtils.serializeToByteArray() per entry | outputCodersBytes |
| windowingStrategy | WindowingStrategyTranslation.toMessageProto() | windowBytes |
| sideInputs | WindowingStrategyTranslation.toMessageProto() per entry | sideInputBytes |
| sideOutputs | TupleTag.getId() per tag | serializedSideOutputs |
| outputMap | TupleTag.getId() per key | serializedOutputMap |
| pipelineOptions | SerializablePipelineOptions.toString() | serializedOptions |
The initTransient() method reverses this process on the worker side, reconstructing all transient fields from their byte representations.
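The prepareSerialization()/initTransient() pair can be sketched as a round trip: a transient field is flattened to a byte array before the function ships to the cluster, and rebuilt from those bytes on the worker. In this minimal illustration, plain Java serialization and a hypothetical `TransientFieldHolder` class stand in for SerializableUtils and the protobuf translations used by the real DoFnFunction:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Illustrative only: mirrors the pattern, not the actual Beam class
class TransientFieldHolder implements Serializable {
  transient String config;   // not carried across serialization automatically
  byte[] configBytes;        // byte form that does survive serialization

  // analogous to prepareSerialization(): flatten the transient field to bytes
  void prepareSerialization() throws IOException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
      oos.writeObject(config);
    }
    configBytes = bos.toByteArray();
  }

  // analogous to initTransient(): rebuild the field on the worker side
  void initTransient() throws IOException, ClassNotFoundException {
    try (ObjectInputStream ois =
        new ObjectInputStream(new ByteArrayInputStream(configBytes))) {
      config = (String) ois.readObject();
    }
  }
}
```

The byte-array fields are what Kryo actually serializes; the no-arg constructor plus initTransient() then restore a working function on each worker.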
GroupByWindowFunction Serialization
GroupByWindowFunction serializes only the windowing strategy:
- windowingStrategy is converted to a protobuf MessageWithComponents and stored as windowBytes
- reduceFn is a SystemReduceFn, which implements Serializable directly
- Both classes provide no-arg constructors for Kryo deserialization and a readResolve() method
Usage Examples
DoFnFunction Is Created During Translation
DoFnFunction instances are created by ParDoMultiOutputTranslatorBatch during pipeline translation:
// Inside ParDoMultiOutputTranslatorBatch.translateNode()
DoFnFunction doFnFunction = new DoFnFunction(
context, // Twister2TranslationContext
doFn, // user's DoFn
inputCoder, // Coder<InputT>
outputCoders, // Map<TupleTag<?>, Coder<?>>
sideOutputs, // List<TupleTag<?>>
windowingStrategy,
sideInputs, // Map<PCollectionView<?>, WindowingStrategy<?, ?>>
mainOutput, // TupleTag<OutputT>
doFnSchemaInformation,
outputMap, // Map<TupleTag<?>, Integer>
sideInputMapping
);
// doFnFunction is then passed to a ComputeCollector TSet
GroupByWindowFunction Is Created During Translation
// Inside GroupByKeyTranslatorBatch.translateNode()
GroupByWindowFunction gbwFunction = new GroupByWindowFunction(
windowingStrategy,
SystemReduceFn.buffering(valueCoder),
options
);
// gbwFunction is then passed to a FlatMap TSet
Checking Pipeline Result
PipelineResult result = pipeline.run();
PipelineResult.State state = result.getState();
if (state == PipelineResult.State.FAILED) {
System.err.println("Pipeline execution failed");
}
// Note: result.metrics() is not supported
// Note: result.cancel() is not supported
// Note: result.waitUntilFinish(Duration) ignores the duration
Related Pages
- Principle:Apache_Beam_Twister2_Execution_and_Result_Collection -- The execution and result collection principle these classes implement
- Implementation:Apache_Beam_BeamBatchWorker_Execute -- Worker that executes the TSet DAG containing these functions
- Implementation:Apache_Beam_Twister2BatchPipelineTranslator -- Translator that creates DoFnFunction and GroupByWindowFunction instances
- Implementation:Apache_Beam_Twister2Runner_Run -- Runner that creates the Twister2PipelineResult from the job state
- Heuristic:Apache_Beam_Warning_Deprecated_Twister2_Runner