Implementation:Apache Beam DoFnFunction And GroupByWindowFunction

From Leeroopedia


Attribute | Value
Implementation Name | DoFnFunction And GroupByWindowFunction
Domain | Data_Processing, HPC, Windowing
Overview | Concrete tools for executing Beam DoFn and GroupByWindow operations within Twister2 TSet batch execution
Deprecation Notice | The Twister2 Runner is deprecated and scheduled for removal in Apache Beam 3.0
last_updated | 2026-02-09 04:00 GMT

Overview

This page documents three interrelated classes that form the core execution layer of the Twister2 Beam runner: DoFnFunction (wraps Beam DoFns as Twister2 compute functions), GroupByWindowFunction (implements window-aware GroupByKey), and Twister2PipelineResult (wraps job execution state). Together, these classes bridge Beam's processing semantics with Twister2's TSet execution model.

Note: The Twister2 Runner is deprecated and is scheduled for removal in Apache Beam 3.0. Users should plan migration to an actively maintained runner.

Code Reference

Source Location

File | Lines | Repository
runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/functions/DoFnFunction.java | L70-362 | GitHub
runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/functions/GroupByWindowFunction.java | L58-213 | GitHub
runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2PipelineResult.java | L31-89 | GitHub

Signature: DoFnFunction

public class DoFnFunction<OutputT, InputT>
    implements ComputeCollectorFunc<RawUnionValue,
                                    Iterator<WindowedValue<InputT>>> {

  // No-arg constructor for Kryo serialization
  public DoFnFunction() {
    this.isInitialized = false;
  }

  public DoFnFunction(
      Twister2TranslationContext context,
      DoFn<InputT, OutputT> doFn,
      Coder<InputT> inputCoder,
      Map<TupleTag<?>, Coder<?>> outputCoders,
      List<TupleTag<?>> sideOutputs,
      WindowingStrategy<?, ?> windowingStrategy,
      Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
      TupleTag<OutputT> mainOutput,
      DoFnSchemaInformation doFnSchemaInformation,
      Map<TupleTag<?>, Integer> outputMap,
      Map<String, PCollectionView<?>> sideInputMapping) {
    // stores parameters and calls prepareSerialization()
  }

  @Override
  public void prepare(TSetContext context) {
    initTransient();
    sideInputReader =
        new Twister2SideInputReader(sideInputs, context);
    outputManager.setup(mainOutput, sideOutputs);
    doFnInvoker =
        DoFnInvokers.tryInvokeSetupFor(doFn, pipelineOptions);
    doFnRunner = DoFnRunners.simpleRunner(
        pipelineOptions, doFn, sideInputReader, outputManager,
        mainOutput, sideOutputs, stepcontext, inputCoder,
        outputCoders, windowingStrategy, doFnSchemaInformation,
        sideInputMapping);
  }

  @Override
  public void compute(
      Iterator<WindowedValue<InputT>> input,
      RecordCollector<RawUnionValue> output) {
    try {
      outputManager.clear();
      doFnRunner.startBundle();
      while (input.hasNext()) {
        doFnRunner.processElement(input.next());
      }
      doFnRunner.finishBundle();
      Iterator<RawUnionValue> outputs =
          outputManager.getOutputs();
      while (outputs.hasNext()) {
        output.collect(outputs.next());
      }
    } catch (final RuntimeException re) {
      DoFnInvokers.invokerFor(doFn).invokeTeardown();
      throw re;
    }
  }

  @Override
  public void close() {
    Optional.ofNullable(doFnInvoker)
        .ifPresent(DoFnInvoker::invokeTeardown);
  }

  public Set<String> getSideInputKeys() {
    initTransient();
    Set<String> keys = new HashSet<>();
    for (PCollectionView<?> view : sideInputs.keySet()) {
      keys.add(view.getTagInternal().getId());
    }
    return keys;
  }
}
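The control flow of compute() and close() can be illustrated without Beam or Twister2 on the classpath. The sketch below is a hypothetical plain-Java model, not runner code: SimpleDoFn stands in for a DoFn, a List stands in for RecordCollector<RawUnionValue>, and BundleRunner reuses one output buffer across calls the way DoFnFunction reuses its outputManager. It shows the property that matters here: nothing reaches the collector until finishBundle() has returned, and a RuntimeException triggers teardown before it is rethrown.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical plain-Java model of DoFnFunction's bundle handling (not runner code). */
final class BundleLifecycleSketch {

  private BundleLifecycleSketch() {}

  /** Stand-in for a DoFn: optional bundle hooks plus per-element processing. */
  interface SimpleDoFn<I, O> {
    default void startBundle() {}

    void processElement(I element, Consumer<O> emit);

    default void finishBundle(Consumer<O> emit) {}

    default void teardown() {}
  }

  /** Mirrors the shape of DoFnFunction.compute() and close(). */
  static final class BundleRunner<I, O> {
    private final SimpleDoFn<I, O> fn;
    // Reused across compute() calls, like DoFnFunction's outputManager.
    private final List<O> bufferedOutputs = new ArrayList<>();

    BundleRunner(SimpleDoFn<I, O> fn) {
      this.fn = fn;
    }

    void compute(Iterator<I> input, List<O> collector) {
      try {
        bufferedOutputs.clear(); // discard anything left from a previous partition
        fn.startBundle();
        while (input.hasNext()) {
          fn.processElement(input.next(), bufferedOutputs::add);
        }
        fn.finishBundle(bufferedOutputs::add);
        collector.addAll(bufferedOutputs); // released only after finishBundle succeeds
      } catch (RuntimeException re) {
        fn.teardown(); // clean up, then let the caller see the failure
        throw re;
      }
    }

    void close() {
      fn.teardown();
    }
  }
}
```

In this model, a failure partway through a partition emits nothing for that partition, and the whole partition's output is held in memory until the bundle finishes.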

Signature: GroupByWindowFunction

public class GroupByWindowFunction<K, V, W extends BoundedWindow>
    implements FlatMapFunc<WindowedValue<KV<K, Iterable<V>>>,
                           KV<K, Iterable<WindowedValue<V>>>> {

  // No-arg constructor for Kryo serialization
  public GroupByWindowFunction() {
    this.isInitialized = false;
  }

  public GroupByWindowFunction(
      WindowingStrategy<?, W> windowingStrategy,
      SystemReduceFn<K, V, Iterable<V>, Iterable<V>, W> reduceFn,
      PipelineOptions options) {
    this.windowingStrategy = windowingStrategy;
    // Serializes windowingStrategy to protobuf bytes
    this.reduceFn = reduceFn;
  }

  @Override
  public void prepare(TSetContext context) {
    initTransient();
  }

  @Override
  public void flatMap(
      KV<K, Iterable<WindowedValue<V>>> kIteratorKV,
      RecordCollector<WindowedValue<KV<K, Iterable<V>>>> collector) {
    try {
      K key = kIteratorKV.getKey();
      Iterable<WindowedValue<V>> values = kIteratorKV.getValue();

      InMemoryTimerInternals timerInternals =
          new InMemoryTimerInternals();
      timerInternals.advanceProcessingTime(Instant.now());
      timerInternals.advanceSynchronizedProcessingTime(
          Instant.now());
      StateInternals stateInternals =
          InMemoryStateInternals.forKey(key);

      ReduceFnRunner<K, V, Iterable<V>, W> reduceFnRunner =
          new ReduceFnRunner<>(
              key, windowingStrategy,
              ExecutableTriggerStateMachine.create(
                  TriggerStateMachines.stateMachineForTrigger(
                      TriggerTranslation.toProto(
                          windowingStrategy.getTrigger()))),
              stateInternals, timerInternals,
              outputter,
              new UnsupportedSideInputReader(
                  "GroupAlsoByWindow"),
              reduceFn, null);

      reduceFnRunner.processElements(values);

      // Advance watermarks to close all windows
      timerInternals.advanceInputWatermark(
          BoundedWindow.TIMESTAMP_MAX_VALUE);
      timerInternals.advanceProcessingTime(
          BoundedWindow.TIMESTAMP_MAX_VALUE);
      timerInternals.advanceSynchronizedProcessingTime(
          BoundedWindow.TIMESTAMP_MAX_VALUE);

      fireEligibleTimers(timerInternals, reduceFnRunner);
      reduceFnRunner.persist();

      Iterator<WindowedValue<KV<K, Iterable<V>>>> outputs =
          outputter.getOutputs().iterator();
      while (outputs.hasNext()) {
        collector.collect(outputs.next());
      }
    } catch (Exception e) {
      // Logged at INFO and swallowed: output not yet collected for this key is lost
      LOG.info(e.getMessage());
    }
  }
}
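In batch mode there is no live watermark, so flatMap() processes all values for one key and then advances every clock to BoundedWindow.TIMESTAMP_MAX_VALUE, which makes each end-of-window timer eligible to fire. The sketch below models that effect for fixed windows only, in plain Java. FixedWindowGrouping, Timestamped, and the window size are hypothetical, and assigning windows from timestamps is a simplification: the real function receives values whose windows are already assigned and delegates buffering, merging, and pane emission to ReduceFnRunner, which may behave differently for non-default triggers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical model of "buffer per window, flush when the watermark passes the end". */
final class FixedWindowGrouping<V> {

  /** Stand-in for WindowedValue<V>: a value and its event timestamp in milliseconds. */
  static final class Timestamped<T> {
    final T value;
    final long timestampMillis;

    Timestamped(T value, long timestampMillis) {
      this.value = value;
      this.timestampMillis = timestampMillis;
    }
  }

  private final long windowSizeMillis;
  // Window start -> buffered values; sorted, so windows close in end-time order.
  private final TreeMap<Long, List<V>> buffered = new TreeMap<>();
  private long watermarkMillis = Long.MIN_VALUE;

  FixedWindowGrouping(long windowSizeMillis) {
    this.windowSizeMillis = windowSizeMillis;
  }

  /** Buffers each value in the fixed window [start, start + size) containing its timestamp. */
  void processElements(Iterable<Timestamped<V>> values) {
    for (Timestamped<V> v : values) {
      long start = Math.floorDiv(v.timestampMillis, windowSizeMillis) * windowSizeMillis;
      buffered.computeIfAbsent(start, s -> new ArrayList<>()).add(v.value);
    }
  }

  /** Advances the watermark and emits one pane per window whose end it has reached. */
  Map<Long, List<V>> advanceWatermark(long newWatermarkMillis) {
    watermarkMillis = Math.max(watermarkMillis, newWatermarkMillis);
    Map<Long, List<V>> fired = new TreeMap<>();
    while (!buffered.isEmpty() && buffered.firstKey() + windowSizeMillis <= watermarkMillis) {
      Map.Entry<Long, List<V>> window = buffered.pollFirstEntry();
      fired.put(window.getKey(), window.getValue());
    }
    return fired;
  }
}
```

Calling processElements() once and then advanceWatermark(Long.MAX_VALUE) mirrors the batch path: every buffered window is emitted in a single flush.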

Signature: Twister2PipelineResult

public class Twister2PipelineResult implements PipelineResult {

  PipelineResult.State state;

  public Twister2PipelineResult(Twister2JobState jobState) {
    state = mapToState(jobState);
  }

  @Override
  public State getState() {
    return state;
  }

  @Override
  public State cancel() throws IOException {
    throw new UnsupportedOperationException(
        "Operation not supported");
  }

  @Override
  public State waitUntilFinish(Duration duration) {
    // Duration parameter is ignored
    return waitUntilFinish();
  }

  @Override
  public State waitUntilFinish() {
    // Returns the state captured in the constructor; the job is not polled
    return state;
  }

  @Override
  public MetricResults metrics() {
    throw new UnsupportedOperationException(
        "Operation not supported");
  }

  private State mapToState(Twister2JobState jobState) {
    switch (jobState.getJobstate()) {
      case RUNNING:  return State.RUNNING;
      case COMPLETED: return State.DONE;
      case FAILED:   return State.FAILED;
      default:       return State.FAILED;
    }
  }
}

Import Statements

import org.apache.beam.runners.twister2.translators.functions.DoFnFunction;
import org.apache.beam.runners.twister2.translators.functions.GroupByWindowFunction;
import org.apache.beam.runners.twister2.Twister2PipelineResult;

I/O Contract

DoFnFunction I/O

Direction | Type | Description
Input | Iterator<WindowedValue<InputT>> | Windowed elements from the upstream TSet operation
Output | RecordCollector<RawUnionValue> | Processed elements wrapped as RawUnionValue with tag indices for main and side outputs
Side Inputs | Via Twister2SideInputReader | Reads from cached side input TSets injected via addInput() on the worker
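Main and side outputs share a single output stream: each element is wrapped in a RawUnionValue carrying the integer that outputMap assigns to its TupleTag. The hypothetical sketch below models that tagging, and a downstream selection by index, in plain Java; UnionValue, TaggingOutputManager, and select() are illustrative names, and plain string IDs replace TupleTag.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of union-tagged outputs; not the runner's output manager. */
final class UnionTagSketch {

  private UnionTagSketch() {}

  /** Stand-in for RawUnionValue: the output's index plus the element itself. */
  static final class UnionValue {
    final int unionTag;
    final Object value;

    UnionValue(int unionTag, Object value) {
      this.unionTag = unionTag;
      this.value = value;
    }
  }

  /** Tags every emitted element with the index outputMap assigns to its output ID. */
  static final class TaggingOutputManager {
    private final Map<String, Integer> outputMap;
    private final List<UnionValue> buffered = new ArrayList<>();

    TaggingOutputManager(Map<String, Integer> outputMap) {
      this.outputMap = new HashMap<>(outputMap);
    }

    void output(String tagId, Object value) {
      Integer index = outputMap.get(tagId);
      if (index == null) {
        throw new IllegalArgumentException("Unknown output tag: " + tagId);
      }
      buffered.add(new UnionValue(index, value));
    }

    List<UnionValue> getOutputs() {
      return Collections.unmodifiableList(buffered);
    }
  }

  /** Downstream routing: keeps only the elements that belong to one output index. */
  static List<Object> select(List<UnionValue> all, int unionTag) {
    List<Object> selected = new ArrayList<>();
    for (UnionValue u : all) {
      if (u.unionTag == unionTag) {
        selected.add(u.value);
      }
    }
    return selected;
  }
}
```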

GroupByWindowFunction I/O

Direction | Type | Description
Input | KV<K, Iterable<WindowedValue<V>>> | Grouped key-value pairs where values retain their window information
Output | RecordCollector<WindowedValue<KV<K, Iterable<V>>>> | Window-aggregated key-value results with correct window assignments

Twister2PipelineResult I/O

Direction | Type | Description
Input | Twister2JobState | Job state from Twister2 submission (e.g. RUNNING, COMPLETED, FAILED)
Output | PipelineResult.State | Mapped Beam state: RUNNING maps to RUNNING, COMPLETED to DONE, and FAILED or any other state to FAILED

Serialization Details

DoFnFunction Serialization

Many of DoFnFunction's fields (the DoFn, coders, windowing strategies) are transient, so they must be serialized and deserialized manually. The prepareSerialization() method converts them to byte arrays or strings:

Field | Serialization Method | Stored In
doFn | ParDoTranslation.translateDoFn() to protobuf, then .getPayload().toByteArray() | doFnwithExBytes
inputCoder | SerializableUtils.serializeToByteArray() | coderBytes
outputCoders | SerializableUtils.serializeToByteArray() per entry | outputCodersBytes
windowingStrategy | WindowingStrategyTranslation.toMessageProto() | windowBytes
sideInputs | WindowingStrategyTranslation.toMessageProto() per entry | sideInputBytes
sideOutputs | TupleTag.getId() per tag | serializedSideOutputs
outputMap | TupleTag.getId() per key | serializedOutputMap
pipelineOptions | SerializablePipelineOptions.toString() | serializedOptions

The initTransient() method reverses this process on the worker side, reconstructing all transient fields from their byte representations.
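The prepareSerialization()/initTransient() pair follows a common pattern: objects are held in transient fields, an encoded form travels with the function, and the worker rebuilds the transient fields lazily behind an isInitialized guard. The sketch below reproduces the pattern with standard Java serialization. Config and its "name|parallelism" string encoding are hypothetical stand-ins for the protobuf and coder bytes listed in the table, and roundTrip() simulates shipping the function to a worker.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

/** Hypothetical model of the prepareSerialization()/initTransient() pattern. */
final class TransientFieldSketch implements Serializable {
  private static final long serialVersionUID = 1L;

  /** Stand-in for a field the function does not ship directly (e.g. a WindowingStrategy). */
  static final class Config {
    final String name;
    final int parallelism;

    Config(String name, int parallelism) {
      this.name = name;
      this.parallelism = parallelism;
    }
  }

  private transient Config config;         // rebuilt on the worker
  private transient boolean isInitialized; // reset to false by deserialization
  private byte[] configBytes;              // the encoded form that is actually shipped

  /** No-arg constructor, mirroring the ones kept for Kryo. */
  TransientFieldSketch() {
    this.isInitialized = false;
  }

  TransientFieldSketch(Config config) {
    this.config = config;
    this.isInitialized = true;
    prepareSerialization();
  }

  /** Encodes the transient field as bytes ("name|parallelism" is an arbitrary format). */
  private void prepareSerialization() {
    configBytes = (config.name + "|" + config.parallelism).getBytes(StandardCharsets.UTF_8);
  }

  /** Rebuilds transient fields from their encoded form, at most once per instance. */
  private void initTransient() {
    if (isInitialized) {
      return;
    }
    String encoded = new String(configBytes, StandardCharsets.UTF_8);
    int sep = encoded.lastIndexOf('|');
    config = new Config(encoded.substring(0, sep), Integer.parseInt(encoded.substring(sep + 1)));
    isInitialized = true;
  }

  /** Worker-side entry point, analogous to prepare(TSetContext). */
  Config prepare() {
    initTransient();
    return config;
  }

  boolean initialized() {
    return isInitialized;
  }

  /** Serializes and deserializes a value, simulating shipment to a worker. */
  static <T extends Serializable> T roundTrip(T value) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(value);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        @SuppressWarnings("unchecked")
        T copy = (T) in.readObject();
        return copy;
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("Serialization round trip failed", e);
    }
  }
}
```

Because transient fields come back with default values, the deserialized copy reports initialized() == false until prepare() runs, which is why both DoFnFunction and GroupByWindowFunction call initTransient() at the start of prepare().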

GroupByWindowFunction Serialization

GroupByWindowFunction needs manual serialization only for its windowing strategy:

  • windowingStrategy is converted to a protobuf MessageWithComponents and stored as windowBytes
  • reduceFn is a SystemReduceFn, which implements Serializable directly
  • Both DoFnFunction and GroupByWindowFunction provide no-arg constructors for Kryo deserialization and a readResolve() method

Usage Examples

DoFnFunction Is Created During Translation

DoFnFunction instances are created by ParDoMultiOutputTranslatorBatch during pipeline translation:

// Inside ParDoMultiOutputTranslatorBatch.translateNode()
// Note the <OutputT, InputT> type parameter order
DoFnFunction<OutputT, InputT> doFnFunction = new DoFnFunction<>(
    context,       // Twister2TranslationContext
    doFn,          // user's DoFn
    inputCoder,    // Coder<InputT>
    outputCoders,  // Map<TupleTag<?>, Coder<?>>
    sideOutputs,   // List<TupleTag<?>>
    windowingStrategy,
    sideInputs,    // Map<PCollectionView<?>, WindowingStrategy<?, ?>>
    mainOutput,    // TupleTag<OutputT>
    doFnSchemaInformation,
    outputMap,     // Map<TupleTag<?>, Integer>
    sideInputMapping
);
// doFnFunction is then passed to a ComputeCollector TSet

GroupByWindowFunction Is Created During Translation

// Inside GroupByKeyTranslatorBatch.translateNode()
GroupByWindowFunction<K, V, W> gbwFunction = new GroupByWindowFunction<>(
    windowingStrategy,
    SystemReduceFn.buffering(valueCoder),
    options
);
// gbwFunction is then passed to a FlatMap TSet

Checking Pipeline Result

PipelineResult result = pipeline.run();
PipelineResult.State state = result.getState();
if (state == PipelineResult.State.FAILED) {
    System.err.println("Pipeline execution failed");
}
// Note: result.metrics() is not supported
// Note: result.cancel() is not supported
// Note: result.waitUntilFinish(Duration) ignores the duration
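Since metrics() throws UnsupportedOperationException on this runner, code meant to run on several runners can guard the call rather than assume it succeeds. The helper below is a hypothetical plain-Java sketch, independent of Beam: it returns Optional.empty() when the wrapped operation throws UnsupportedOperationException and lets any other exception propagate. With a Beam PipelineResult, the supplier could be result::metrics; cancel() declares IOException and would need its own wrapper.

```java
import java.util.Optional;
import java.util.function.Supplier;

/** Hypothetical helper for optional runner features; independent of Beam. */
final class OptionalRunnerFeatures {

  private OptionalRunnerFeatures() {}

  /**
   * Runs an operation that a runner may not implement. Returns its result, or empty if the
   * runner throws UnsupportedOperationException; any other exception propagates.
   */
  static <T> Optional<T> ifSupported(Supplier<T> operation) {
    try {
      return Optional.ofNullable(operation.get());
    } catch (UnsupportedOperationException e) {
      return Optional.empty();
    }
  }
}
```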
