

Implementation:Apache Beam ExecutorServiceParallelExecutor

From Leeroopedia


Field | Value
Implementation Name | ExecutorServiceParallelExecutor
Overview | Concrete tool for parallel pipeline execution with watermark-driven scheduling, provided by the Direct Runner module.
Source | runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
Implements | Principle:Apache_Beam_Parallel_Execution
last_updated | 2026-02-09 04:00 GMT

Code Reference

Source Location

File | Lines | Description
runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java | L66-68 | Class declaration: implements PipelineExecutor and BundleProcessor
runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java | L88-96 | create(): static factory method
runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java | L156-210 | start(): initializes root bundles and launches the QuiescenceDriver
runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java | L213-244 | process() and evaluateBundle(): bundle dispatch with keyed/parallel routing
runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java | L251-295 | waitUntilFinish(): blocking wait for pipeline completion
runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java | L307-310 | stop(): cancels execution
runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java | L312-364 | shutdownIfNecessary(): graceful shutdown of all executor services
runners/direct-java/src/main/java/org/apache/beam/runners/direct/QuiescenceDriver.java | Full file | Quiescence-based execution driver that schedules root bundles and timers
runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java | Full file | Manages execution state, bundle commitment, and watermark updates
runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java | Full file | Registry mapping transform URNs to evaluator factories

Signature

final class ExecutorServiceParallelExecutor
    implements PipelineExecutor,
        BundleProcessor<PCollection<?>, CommittedBundle<?>, AppliedPTransform<?, ?, ?>> {

    // Factory method
    public static ExecutorServiceParallelExecutor create(
        int targetParallelism,
        TransformEvaluatorRegistry registry,
        Map<String, Collection<ModelEnforcementFactory>> transformEnforcements,
        EvaluationContext context,
        ExecutorService metricsExecutor)

    // Start pipeline execution: initialize root bundles and launch driver
    @Override
    public void start(DirectGraph graph, RootProviderRegistry rootProviderRegistry)

    // Process a bundle for a given consumer transform
    @Override
    public void process(
        CommittedBundle<?> bundle,
        AppliedPTransform<?, ?, ?> consumer,
        CompletionCallback onComplete)

    // Block until pipeline completes or duration elapses
    @Override
    public State waitUntilFinish(Duration duration) throws Exception

    // Get current pipeline state
    @Override
    public State getPipelineState()

    // Stop pipeline execution (cancellation)
    @Override
    public void stop()
}

Import

import org.apache.beam.runners.direct.ExecutorServiceParallelExecutor;
import org.apache.beam.runners.direct.EvaluationContext;
import org.apache.beam.runners.direct.TransformEvaluatorRegistry;
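import org.apache.beam.runners.local.ExecutionDriver;

Note that ExecutorServiceParallelExecutor is declared without the public modifier (see Signature), so it is package-private: only code in the org.apache.beam.runners.direct package, such as DirectRunner, can reference it.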

I/O Contract

Inputs

Parameter | Type | Description
targetParallelism | int | Thread pool size for parallel bundle processing, taken from DirectOptions.getTargetParallelism().
registry | TransformEvaluatorRegistry | Maps transform URNs to evaluator factories that know how to execute each type of transform.
transformEnforcements | Map<String, Collection<ModelEnforcementFactory>> | Maps transform URNs to model-enforcement factories (e.g., PAR_DO_TRANSFORM_URN to ImmutabilityEnforcementFactory).
context | EvaluationContext | Manages execution state, including bundle factories, the watermark manager, metrics, and keyed PValue tracking.
metricsExecutor | ExecutorService | Dedicated executor for asynchronous metrics aggregation.
graph | DirectGraph | The execution graph containing root transforms and producer-consumer relationships. Passed to start().
rootProviderRegistry | RootProviderRegistry | Provides initial input bundles for root transforms by splitting sources. Passed to start().

Outputs

Output | Type | Description
Running pipeline execution | Asynchronous execution | Bundles are processed in parallel by the thread pool. The QuiescenceDriver schedules work until the pipeline reaches quiescence or fails.
Pipeline state | State | Accessible via getPipelineState(). Transitions from RUNNING to DONE (success), FAILED (error), or CANCELLED (user cancellation).
Visible updates | VisibleExecutorUpdate | Queued updates consumed by waitUntilFinish() to detect a terminal state or propagate exceptions.
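The RUNNING-to-terminal transition described in the Outputs table can be modeled as an atomic reference that leaves RUNNING exactly once, so that, for example, a cancellation racing with normal completion is not overwritten. The sketch below is a simplified model under that assumption, not Beam's code: State and PipelineStateHolder are hypothetical stand-ins (Beam's actual type, PipelineResult.State, has additional values such as UNKNOWN).

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for a pipeline state enum; only the states named on this page. */
enum State {
  RUNNING,
  DONE,
  FAILED,
  CANCELLED;

  boolean isTerminal() {
    return this != RUNNING;
  }
}

/** Holds the pipeline state; the first terminal transition wins. */
final class PipelineStateHolder {
  private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);

  /** Attempts RUNNING -> newState; returns true only if this call made the transition. */
  boolean terminate(State newState) {
    if (!newState.isTerminal()) {
      throw new IllegalArgumentException(newState + " is not a terminal state");
    }
    return state.compareAndSet(State.RUNNING, newState);
  }

  State get() {
    return state.get();
  }
}
```

Using compareAndSet(RUNNING, newState) rather than set(newState) is what makes the first terminal state sticky: a later call from another thread finds the reference no longer equal to RUNNING and leaves it unchanged.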

Usage Examples

Executor Creation and Startup in DirectRunner.run()

// Inside DirectRunner.run() at L205-214
TransformEvaluatorRegistry registry =
    TransformEvaluatorRegistry.javaSdkNativeRegistry(context, options);

PipelineExecutor executor =
    ExecutorServiceParallelExecutor.create(
        options.getTargetParallelism(),   // thread pool size
        registry,                          // transform evaluators
        Enforcement.defaultModelEnforcements(enabledEnforcements),  // immutability checks
        context,                           // evaluation context
        metricsPool);                      // metrics executor

// Launch execution
executor.start(graph, RootProviderRegistry.javaNativeRegistry(context, options));

Bundle Dispatch: Keyed vs. Parallel

// In evaluateBundle() at L221-244
private <T> void evaluateBundle(
    final AppliedPTransform<?, ?, ?> transform,
    final CommittedBundle<T> bundle,
    final CompletionCallback onComplete) {

    TransformExecutorService transformExecutor;

    if (isKeyed(bundle.getPCollection())) {
        // Keyed data: serialize execution per (step, key) pair
        final StepAndKey stepAndKey = StepAndKey.of(transform, bundle.getKey());
        transformExecutor = serialExecutorServices.getUnchecked(stepAndKey);
    } else {
        // Unkeyed data: parallel execution
        transformExecutor = parallelExecutorService;
    }

    TransformExecutor callable =
        executorFactory.create(bundle, transform, onComplete, transformExecutor);
    if (!pipelineState.get().isTerminal()) {
        transformExecutor.schedule(callable);
    }
}
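The keyed/unkeyed routing in evaluateBundle() can be sketched with plain java.util.concurrent types. This is an illustrative simplification, assuming that a "serial" executor service means one that runs at most one work item at a time, in submission order, on top of the shared pool. SerialExecutor and BundleDispatcher are hypothetical names, and a Map.entry(step, key) pair stands in for Beam's StepAndKey.

```java
import java.util.ArrayDeque;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;

/** Runs scheduled work one item at a time, in order, on a shared pool (hypothetical). */
final class SerialExecutor {
  private final ExecutorService pool;
  private final Queue<Runnable> pending = new ArrayDeque<>(); // guarded by this
  private boolean running = false; // guarded by this; true while a drain task is queued or running

  SerialExecutor(ExecutorService pool) {
    this.pool = pool;
  }

  synchronized void schedule(Runnable work) {
    pending.add(work);
    if (!running) {
      running = true;
      pool.execute(this::runNext);
    }
  }

  private void runNext() {
    Runnable next;
    synchronized (this) {
      next = pending.poll(); // never null: runNext is only submitted while work is pending
    }
    try {
      next.run();
    } finally {
      synchronized (this) {
        if (pending.isEmpty()) {
          running = false; // idle; the next schedule() call resubmits
        } else {
          pool.execute(this::runNext); // give the thread back, then continue with the next item
        }
      }
    }
  }
}

/** Routes keyed work to a per-(step, key) serial executor and unkeyed work to the pool. */
final class BundleDispatcher {
  private final ExecutorService pool;
  private final Map<Map.Entry<String, Object>, SerialExecutor> serialExecutors =
      new ConcurrentHashMap<>();

  BundleDispatcher(ExecutorService pool) {
    this.pool = pool;
  }

  void dispatch(String step, Object key, boolean keyed, Runnable work) {
    if (keyed) {
      // Map.entry(step, key) plays the role of StepAndKey (both must be non-null)
      serialExecutors
          .computeIfAbsent(Map.entry(step, key), k -> new SerialExecutor(pool))
          .schedule(work);
    } else {
      pool.execute(work); // unkeyed: any free pool thread may run it, in any order
    }
  }
}
```

Because every serial executor borrows threads from the same fixed pool, the total number of worker threads stays at the pool size no matter how many keys are active. Unlike the weakly cached serial executors in the architecture diagram, this sketch's map never evicts entries.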

Start Method: Root Bundle Initialization

// In start() at L156-210
@Override
public void start(DirectGraph graph, RootProviderRegistry rootProviderRegistry) {
    // Target at least 3 initial splits per root (more if targetParallelism is larger)
    int numTargetSplits = Math.max(3, targetParallelism);
    ImmutableMap.Builder<AppliedPTransform<?, ?, ?>, Queue<CommittedBundle<?>>>
        pendingRootBundles = ImmutableMap.builder();

    // Generate initial bundles for each root transform
    for (AppliedPTransform<?, ?, ?> root : graph.getRootTransforms()) {
        Queue<CommittedBundle<?>> pending = Queues.newArrayDeque();
        Collection<CommittedBundle<?>> initialInputs =
            rootProviderRegistry.getInitialInputs(root, numTargetSplits);
        pending.addAll(initialInputs);
        pendingRootBundles.put(root, pending);
    }

    // Initialize evaluation context with pending root bundles
    evaluationContext.initialize(pendingRootBundles.build());

    // Create the QuiescenceDriver
    final ExecutionDriver executionDriver =
        QuiescenceDriver.create(
            evaluationContext, graph, this, visibleUpdates, pendingRootBundles.build());

    // Submit the driver loop to the thread pool
    executorService.submit(new Runnable() {
        @Override
        public void run() {
            DriverState drive = executionDriver.drive();
            if (drive.isTerminal()) {
                // Map the terminal driver state: FAILED -> FAILED, SHUTDOWN -> DONE
                State newPipelineState =
                    drive == DriverState.FAILED ? State.FAILED : State.DONE;
                shutdownIfNecessary(newPipelineState);
            } else {
                // CONTINUE -> re-submit this Runnable for the next iteration
                executorService.submit(this);
            }
        }
    });
}
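The self-resubmitting driver Runnable in start() follows a common pattern: run one iteration, then either finish or put itself back on the pool. The sketch below isolates that pattern under simplifying assumptions: DriverLoop is a hypothetical class, a Supplier<DriverState> stands in for ExecutionDriver.drive(), the terminal pipeline state is reported as a string, and the DriverState enum simply mirrors the three values named on this page.

```java
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Mirrors the three driver states named on this page (hypothetical copy). */
enum DriverState {
  CONTINUE,
  SHUTDOWN,
  FAILED;

  boolean isTerminal() {
    return this != CONTINUE;
  }
}

/** Runs one driver iteration per submission and re-submits itself until a terminal state. */
final class DriverLoop implements Runnable {
  private final ExecutorService pool;
  private final Supplier<DriverState> drive; // stands in for ExecutionDriver.drive()
  private final Consumer<String> onTerminal; // receives "DONE" or "FAILED"

  DriverLoop(ExecutorService pool, Supplier<DriverState> drive, Consumer<String> onTerminal) {
    this.pool = pool;
    this.drive = drive;
    this.onTerminal = onTerminal;
  }

  @Override
  public void run() {
    DriverState state = drive.get();
    if (state.isTerminal()) {
      // SHUTDOWN -> DONE, FAILED -> FAILED
      onTerminal.accept(state == DriverState.FAILED ? "FAILED" : "DONE");
    } else {
      // CONTINUE: queue the next iteration behind any work already on the pool
      pool.execute(this);
    }
  }
}
```

Because each iteration is a separate task, the driver returns its thread to the pool between iterations, so bundle work already queued on a small pool can run before the next driver iteration.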

Waiting for Pipeline Completion

// In waitUntilFinish() at L251-295
@Override
public State waitUntilFinish(Duration duration) throws Exception {
    Instant completionTime = duration.equals(Duration.ZERO)
        ? new Instant(Long.MAX_VALUE)
        : Instant.now().plus(duration);

    while (Instant.now().isBefore(completionTime)) {
        // Poll for visible updates (25ms timeout)
        VisibleExecutorUpdate update = visibleUpdates.tryNext(Duration.millis(25L));

        if (update == null && pipelineState.get().isTerminal()) {
            // Pipeline finished; check for any final update
            update = visibleUpdates.tryNext(Duration.millis(1L));
            if (update == null) {
                return pipelineState.get();
            }
        }

        if (update != null) {
            if (isTerminalStateUpdate(update)) {
                return pipelineState.get();
            }
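            if (update.thrown.isPresent()) {
                // thrown holds a Throwable: rethrow Errors directly instead of casting to Exception
                Throwable thrown = update.thrown.get();
                if (thrown instanceof Exception) {
                    throw (Exception) thrown;
                } else if (thrown instanceof Error) {
                    throw (Error) thrown;
                }
                throw new Exception("Unknown type of Throwable", thrown);
            }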
        }
    }
    return null;  // timeout elapsed
}
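The polling strategy in waitUntilFinish() (a short, bounded wait per iteration, a deadline check, and a null result on timeout) can be reproduced with standard library types. This is a sketch under assumptions: java.time replaces Joda-Time, a BlockingQueue replaces the executor's update queue, the pipeline state is a string held in an AtomicReference, and Update and Waiter are hypothetical names.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical update: either a terminal-state notice or a failure. */
final class Update {
  final boolean terminal;
  final Throwable thrown; // null unless the update reports a failure

  private Update(boolean terminal, Throwable thrown) {
    this.terminal = terminal;
    this.thrown = thrown;
  }

  static Update terminalState() {
    return new Update(true, null);
  }

  static Update error(Throwable thrown) {
    return new Update(false, thrown);
  }
}

final class Waiter {
  /** Returns the final state, or null if the deadline passes first; ZERO means wait forever. */
  static String waitUntilFinish(
      BlockingQueue<Update> updates, AtomicReference<String> pipelineState, Duration duration)
      throws Exception {
    Instant deadline = duration.isZero() ? Instant.MAX : Instant.now().plus(duration);
    while (Instant.now().isBefore(deadline)) {
      // Bounded wait so the deadline and the pipeline state are re-checked regularly
      Update update = updates.poll(25, TimeUnit.MILLISECONDS);
      if (update == null) {
        if (!"RUNNING".equals(pipelineState.get())) {
          return pipelineState.get(); // terminal and nothing left to report
        }
        continue;
      }
      if (update.thrown != null) {
        if (update.thrown instanceof Exception) {
          throw (Exception) update.thrown;
        } else if (update.thrown instanceof Error) {
          throw (Error) update.thrown;
        }
        throw new Exception("Unknown type of Throwable", update.thrown);
      }
      if (update.terminal) {
        return pipelineState.get();
      }
    }
    return null; // deadline passed while the pipeline was still running
  }
}
```

Rethrowing Exception and Error separately avoids the ClassCastException that an unconditional (Exception) cast would raise when the reported failure is an Error.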

Key Supporting Classes

Class | Role | Key Methods
QuiescenceDriver | Drives the execution loop; schedules root bundles and fired timers until quiescence | create(); drive(), which returns a DriverState (CONTINUE, SHUTDOWN, FAILED)
EvaluationContext | Manages execution state; commits bundle results; updates watermarks and metrics | create(), handleResult(), initialize(), getMetrics()
TransformEvaluatorRegistry | Maps transform URNs to evaluator factories for the Java SDK native transforms | javaSdkNativeRegistry(), getEvaluator()
DirectTransformExecutor | Callable that processes a single bundle through a transform evaluator, applying model enforcements | Factory.create(), call()
TransformExecutorServices | Wraps the shared thread pool as either a parallel executor service or a serial one that runs at most one work item at a time | parallel(), serial()

Thread Pool Architecture

+---------------------------------------------+
|  ExecutorServiceParallelExecutor            |
|                                             |
|  +-----------+  +------------------------+  |
|  | Parallel  |  | Serial Executor Cache  |  |
|  | Executor  |  | (per StepAndKey,       |  |
|  | Service   |  |  weakly cached)        |  |
|  +-----+-----+  +-----------+------------+  |
|        |                    |               |
|  +-----v--------------------v------------+  |
|  | Fixed Thread Pool (targetParallelism) |  |
|  | (non-daemon "direct-runner-worker")   |  |
|  +---------------------------------------+  |
|                                             |
|  +---------------------------------------+  |
|  | Metrics Executor (cached pool)        |  |
|  | (non-daemon "direct-metrics-...")     |  |
|  +---------------------------------------+  |
+---------------------------------------------+
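The non-daemon, named worker threads in the diagram can be produced with a custom ThreadFactory passed to the standard Executors factory methods. This is a JDK-only sketch; the Beam code may build its factory differently (for example with Guava's ThreadFactoryBuilder). WorkerPools is a hypothetical class, and the -N counter suffix and the metrics pool's "metrics" prefix are assumptions made for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

final class WorkerPools {
  /** Creates non-daemon threads named prefix-0, prefix-1, ... */
  static ThreadFactory named(String prefix) {
    AtomicInteger counter = new AtomicInteger();
    return runnable -> {
      Thread thread = new Thread(runnable, prefix + "-" + counter.getAndIncrement());
      thread.setDaemon(false); // keeps the JVM alive until the pool is shut down
      return thread;
    };
  }

  /** Fixed pool of targetParallelism workers shared by parallel and serial work. */
  static ExecutorService workerPool(int targetParallelism) {
    return Executors.newFixedThreadPool(targetParallelism, named("direct-runner-worker"));
  }

  /** Cached pool for metrics work; the "metrics" prefix is a hypothetical name. */
  static ExecutorService metricsPool() {
    return Executors.newCachedThreadPool(named("metrics"));
  }
}
```

Non-daemon threads keep the JVM alive until they terminate, which is why the pools must be shut down explicitly once the pipeline reaches a terminal state; in this class that job belongs to shutdownIfNecessary().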

Related Pages

Sources

  • Repo -- Apache Beam -- Source repository at runners/direct-java/.
