
Principle:Apache Beam Parallel Execution

From Leeroopedia


Field | Value
Principle Name | Parallel Execution
Overview | Execution strategy that processes pipeline bundles in parallel using a thread pool, driving the pipeline to quiescence through watermark-driven scheduling.
Domains | Pipeline_Orchestration, Concurrency
Related Implementation | Implementation:Apache_Beam_ExecutorServiceParallelExecutor
last_updated | 2026-02-09 04:00 GMT

Description

Parallel execution is the core runtime strategy of the Direct Runner. It processes bundles -- groups of elements targeted at a specific transform -- using a fixed-size thread pool. The execution engine is driven by a quiescence-based loop that continues scheduling work until no more bundles remain to be processed and all watermarks have advanced to their terminal values.

Execution Model

The parallel execution model operates through the following phases:

Phase 1: Initialization

  • Root transforms (those with no input PCollections, such as Read or Create) are identified from the DirectGraph.
  • For each root transform, the RootProviderRegistry generates initial input bundles by splitting the source into approximately max(3, targetParallelism) splits.
  • These initial bundles are placed into pending-root-bundle queues, one per root transform.
  • The EvaluationContext is initialized with these pending bundles.
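The initialization phase can be illustrated with a small toy model. This is a sketch under stated assumptions, not the Direct Runner's code: `split_source` and `init_pending_root_bundles` are hypothetical helpers, a root transform is modelled as a plain list of elements, and the contiguous chunking is an illustrative choice. Only the max(3, targetParallelism) target comes from the description above.

```python
from collections import deque


def split_source(elements, target_parallelism):
    """Split a bounded source into roughly max(3, target_parallelism) bundles."""
    desired = max(3, target_parallelism)
    # Never create more bundles than elements, and always create at least one.
    n = max(1, min(desired, len(elements)))
    size, extra = divmod(len(elements), n)
    bundles, start = [], 0
    for i in range(n):
        # The first `extra` bundles take one additional element each.
        end = start + size + (1 if i < extra else 0)
        bundles.append(elements[start:end])
        start = end
    return bundles


def init_pending_root_bundles(roots, target_parallelism):
    """Build one pending-bundle queue per root transform (hypothetical helper)."""
    return {
        name: deque(split_source(elements, target_parallelism))
        for name, elements in roots.items()
    }


pending = init_pending_root_bundles({"Create": list(range(10))}, target_parallelism=2)
```

With a target parallelism of 2, the toy model still produces three bundles, because the lower bound of 3 applies.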

Phase 2: Quiescence-Driven Execution

The QuiescenceDriver runs in a loop, performing the following steps on each iteration:

  1. Schedule available root bundles -- Dequeue pending root bundles and submit them for processing.
  2. Fire ready timers -- Query the WatermarkManager for timers whose trigger time has been reached by the current watermark, and schedule timer-firing bundles.
  3. Process bundles -- The thread pool executes TransformExecutor callables, each of which:
    • Applies the transform's evaluator to each element in the bundle
    • Applies model enforcements (immutability checking) before and after each element
    • Collects output elements into output bundles
    • Reports the TransformResult back to the executor
  4. Handle results -- The EvaluationContext commits results:
    • Output bundles are made available to downstream consumers
    • Watermarks are updated based on the committed work
    • Metrics are updated
  5. Detect quiescence -- If no pending root bundles remain, no timers are ready to fire, and no bundles are in-flight, the pipeline has reached quiescence and execution is complete.
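The loop above can be sketched as a simplified model built on Python's standard thread pool. It is an illustration only: `run_to_quiescence` is a hypothetical function, the pipeline is assumed to be a linear chain of per-element functions, and timers, watermarks, and model enforcement are left out. What it does show is the scheduling of pending bundles, sequential processing within a bundle, result handling on the driver thread, and termination when nothing is pending or in flight.

```python
from collections import deque
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def run_to_quiescence(root_bundles, stages, target_parallelism=3):
    """Drive a linear chain of stages until no work remains.

    root_bundles: list of bundles (lists of elements) for stage 0.
    stages: list of functions, each mapping one element to a list of outputs.
    """
    pending = deque((0, bundle) for bundle in root_bundles)  # (stage index, bundle)
    in_flight = set()
    results = []

    def process(stage_idx, bundle):
        out = []
        for element in bundle:  # elements within a bundle run sequentially
            out.extend(stages[stage_idx](element))
        return stage_idx, out

    with ThreadPoolExecutor(max_workers=target_parallelism) as pool:
        # Quiescence: nothing pending and nothing in flight.
        while pending or in_flight:
            while pending:  # schedule every bundle that is ready
                idx, bundle = pending.popleft()
                in_flight.add(pool.submit(process, idx, bundle))
            done, in_flight = wait(in_flight, return_when=FIRST_COMPLETED)
            for future in done:  # handle results on the driver thread
                idx, out = future.result()  # re-raises if the bundle failed
                if not out:
                    continue
                if idx + 1 < len(stages):
                    pending.append((idx + 1, out))  # hand off to the next stage
                else:
                    results.extend(out)
    return results


outputs = run_to_quiescence(
    [[1, 2], [3]],
    [lambda x: [x * 2], lambda x: [x + 1]],
)
```

Because bundles complete in a nondeterministic order, the order of `outputs` is not guaranteed; only its contents are.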

Phase 3: Shutdown

  • When the QuiescenceDriver signals SHUTDOWN (all work complete) or FAILED (an error occurred), the executor shuts down the thread pool and signals the pipeline result.

Bundle Processing Strategies

The executor uses two bundle processing strategies depending on the data characteristics:

  • Parallel processing -- For unkeyed PCollections, bundles are submitted to the shared thread pool for fully concurrent execution. Multiple bundles for the same transform may execute simultaneously.
  • Serial processing -- For keyed PCollections, bundles with the same StepAndKey (transform + key) are processed serially using dedicated single-thread executor services. This maintains per-key ordering guarantees required by stateful processing and timers. The serial executor services are weakly cached, allowing garbage collection when no pending work exists for a given key.
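A minimal sketch of the two strategies, assuming a hypothetical `ToyBundleRouter` class: unkeyed work goes to a shared pool, while keyed work is routed to a single-thread executor chosen by (step, key). Unlike the weakly cached executor services described above, this toy keeps its serial executors in a plain dictionary until shutdown.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class ToyBundleRouter:
    """Route work to a shared pool (unkeyed) or a per-(step, key) serial executor."""

    def __init__(self, target_parallelism=3):
        self._shared = ThreadPoolExecutor(max_workers=target_parallelism)
        self._serial = {}  # (step, key) -> single-thread executor
        self._lock = threading.Lock()

    def submit(self, step, key, fn, *args):
        if key is None:
            # Unkeyed: bundles for the same step may run concurrently.
            return self._shared.submit(fn, *args)
        with self._lock:
            executor = self._serial.get((step, key))
            if executor is None:
                # One worker thread means submissions run one at a time, in order.
                executor = ThreadPoolExecutor(max_workers=1)
                self._serial[(step, key)] = executor
        return executor.submit(fn, *args)

    def shutdown(self):
        self._shared.shutdown(wait=True)
        for executor in self._serial.values():
            executor.shutdown(wait=True)


router = ToyBundleRouter()
seen = []
futures = [router.submit("sum", "user-1", seen.append, i) for i in range(5)]
for future in futures:
    future.result()
router.shutdown()
```

All five calls share the key "user-1", so they run on the same single-thread executor and `seen` is filled in submission order.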

Watermark Management

The WatermarkManager tracks per-transform watermarks that bound the progress of the pipeline:

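  • Input watermark -- For each transform, the input watermark is a lower bound on the timestamps of elements the transform has yet to process. It is derived from the minimum of the output watermarks of all upstream transforms and the timestamps of elements still pending at the transform.
  • Output watermark -- For each transform, the output watermark is a lower bound on the timestamps of elements the transform may still emit. It never exceeds the input watermark, can be held back by watermark holds, and advances as the transform's in-flight bundles are committed.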
  • Timer firing -- When a transform's input watermark advances past a timer's trigger time, the timer fires and produces a timer-firing bundle for processing.
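The relationship between upstream watermarks and timer firing can be sketched as follows. This is a simplified model with hypothetical names (`input_watermark`, `ToyTimerQueue`): timestamps are plain integers, the input watermark is taken only as the minimum over upstream output watermarks (pending elements and holds are ignored), and a timer is treated as ready once the watermark reaches its trigger time. That boundary choice is an assumption of the sketch.

```python
import heapq


def input_watermark(upstream_output_watermarks):
    """Minimum output watermark across upstream transforms (at least one required)."""
    return min(upstream_output_watermarks)


class ToyTimerQueue:
    """Event-time timers ordered by trigger time."""

    def __init__(self):
        self._heap = []

    def set_timer(self, fire_time, timer_id):
        heapq.heappush(self._heap, (fire_time, timer_id))

    def extract_ready(self, watermark):
        """Pop and return the ids of timers whose trigger time the watermark has reached."""
        ready = []
        while self._heap and self._heap[0][0] <= watermark:
            ready.append(heapq.heappop(self._heap)[1])
        return ready


timers = ToyTimerQueue()
timers.set_timer(10, "flush-a")
timers.set_timer(20, "flush-b")

wm = input_watermark([15, 30])          # the slowest upstream transform bounds progress
first_batch = timers.extract_ready(wm)  # only the timer set for time 10 is ready
```

Each extracted timer would then become a timer-firing bundle in the execution loop.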

Usage

Understanding the parallel execution model is essential for:

  • Debugging pipeline hangs -- A pipeline that hangs on the Direct Runner likely has a watermark that cannot advance, often because a timer never becomes eligible to fire or a bundle never completes, so the quiescence condition is never met.
  • Understanding element ordering -- Elements within a bundle are processed sequentially, but bundles are processed concurrently. This means element ordering across bundles is non-deterministic.
  • Reasoning about timer behavior -- Timers fire when watermarks advance, and watermarks advance when bundles complete. The interleaving of these events determines the exact timing of timer callbacks.
  • Performance tuning -- The targetParallelism option controls the thread pool size. Setting it appropriately balances throughput against memory usage for local execution.
  • Testing stateful pipelines -- The serial processing guarantee for keyed data ensures that state updates and timer callbacks for a given key are never concurrent, matching the behavior of production runners.

Theoretical Basis

The parallel execution strategy is grounded in the watermark-driven execution model from the Dataflow Model (Akidau et al., VLDB 2015):

  • Watermarks as progress indicators -- Watermarks bound the completeness of input data. An input watermark at time t means that no more elements with event time earlier than t will arrive. This enables the system to determine when windows can be closed and when timers should fire.
  • Quiescence as termination condition -- For bounded pipelines, execution terminates when all watermarks reach positive infinity (the end of time) and no work remains in-flight. This is the quiescence condition -- the system has consumed all input and produced all output.
  • Bundle-based parallelism -- The bundle is the unit of parallel execution, analogous to a task in a traditional parallel computing system. Bundles are independent units of work that can be processed concurrently, subject to per-key serialization constraints for stateful processing.
  • Exactly-once semantics -- Each element is processed exactly once within each transform, ensured by the bundle commitment protocol. A bundle's results are atomically committed or discarded, preventing partial processing.
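The commit-or-discard idea behind bundle commitment can be illustrated with a toy function. `process_bundle` and the `committed` list are hypothetical, and swallowing the exception is a simplification made for the illustration. The point is only that outputs are staged privately and become visible in a single step once the whole bundle has succeeded.

```python
def process_bundle(bundle, fn, committed):
    """Apply fn to every element; publish outputs only if the whole bundle succeeds."""
    staged = []
    try:
        for element in bundle:
            staged.extend(fn(element))
    except Exception:
        # Discard the staged outputs: nothing from this bundle becomes visible.
        return False
    committed.extend(staged)  # commit point: all outputs appear together
    return True


def flaky(x):
    """Hypothetical element function that fails on one input."""
    if x == 2:
        raise ValueError("bad element")
    return [x]


committed = []
ok_first = process_bundle([1, 2], lambda x: [x * 10], committed)
ok_second = process_bundle([1, 2, 3], flaky, committed)
```

The second bundle fails on its second element, so the output already staged for its first element is never added to `committed`.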

The Direct Runner's implementation of this model prioritizes correctness over throughput. It processes elements through the full enforcement pipeline and maintains complete watermark state, even though this is more expensive than the optimized implementations in production runners.

Sources

  • Paper -- The Dataflow Model -- Akidau et al., VLDB 2015. Defines the watermark-driven execution model.
