Implementation:Apache Beam ExecutorServiceParallelExecutor
| Field | Value |
|---|---|
| Implementation Name | ExecutorServiceParallelExecutor |
| Overview | Concrete PipelineExecutor implementation in the Direct Runner module that executes pipeline bundles in parallel with watermark-driven scheduling. |
| Source | runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java |
| Implements | Principle:Apache_Beam_Parallel_Execution |
| last_updated | 2026-02-09 04:00 GMT |
Code Reference
Source Location
| File | Lines | Description |
|---|---|---|
| runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java | L66-68 | Class declaration: implements PipelineExecutor and BundleProcessor |
| runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java | L88-96 | create(): static factory method |
| runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java | L156-210 | start(): initializes root bundles and launches the QuiescenceDriver |
| runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java | L213-244 | process() and evaluateBundle(): bundle dispatch with keyed/parallel routing |
| runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java | L251-295 | waitUntilFinish(): blocking wait for pipeline completion |
| runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java | L307-310 | stop(): cancels execution |
| runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java | L312-364 | shutdownIfNecessary(): graceful shutdown of all executor services |
| runners/direct-java/src/main/java/org/apache/beam/runners/direct/QuiescenceDriver.java | Full file | Quiescence-based execution driver that schedules root bundles and timers |
| runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java | Full file | Manages execution state, bundle commitment, and watermark updates |
| runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java | Full file | Registry mapping transform URNs to evaluator factories |
Signature
final class ExecutorServiceParallelExecutor
    implements PipelineExecutor,
        BundleProcessor<PCollection<?>, CommittedBundle<?>, AppliedPTransform<?, ?, ?>> {
  // Factory method
  public static ExecutorServiceParallelExecutor create(
      int targetParallelism,
      TransformEvaluatorRegistry registry,
      Map<String, Collection<ModelEnforcementFactory>> transformEnforcements,
      EvaluationContext context,
      ExecutorService metricsExecutor)
  // Start pipeline execution: initialize root bundles and launch driver
  @Override
  public void start(DirectGraph graph, RootProviderRegistry rootProviderRegistry)
  // Process a bundle for a given consumer transform
  @Override
  public void process(
      CommittedBundle<?> bundle,
      AppliedPTransform<?, ?, ?> consumer,
      CompletionCallback onComplete)
  // Block until pipeline completes or duration elapses
  @Override
  public State waitUntilFinish(Duration duration) throws Exception
  // Get current pipeline state
  @Override
  public State getPipelineState()
  // Stop pipeline execution (cancellation)
  @Override
  public void stop()
}
Import
import org.apache.beam.runners.direct.ExecutorServiceParallelExecutor;
import org.apache.beam.runners.direct.EvaluationContext;
import org.apache.beam.runners.direct.TransformEvaluatorRegistry;
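import org.apache.beam.runners.local.ExecutionDriver;
Note that ExecutorServiceParallelExecutor is declared as final class without a public modifier, so it is package-private: it can only be referenced from code in org.apache.beam.runners.direct (such as DirectRunner). User pipelines reach it indirectly by running on the DirectRunner.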
I/O Contract
Inputs
| Parameter | Type | Description |
|---|---|---|
| targetParallelism | int | Size of the fixed worker thread pool used for bundle processing; also determines the number of initial splits requested per root transform in start(), max(3, targetParallelism). Configured via DirectOptions.getTargetParallelism(). |
| registry | TransformEvaluatorRegistry | Maps transform URNs to evaluator factories that know how to execute each type of transform. |
| transformEnforcements | Map<String, Collection<ModelEnforcementFactory>> | Maps transform URNs to enforcement factories (e.g., PAR_DO_TRANSFORM_URN to ImmutabilityEnforcementFactory). |
| context | EvaluationContext | Manages execution state, including bundle factories, the watermark manager, metrics, and keyed PValue tracking. |
| metricsExecutor | ExecutorService | Dedicated executor for asynchronous metrics aggregation. |
| graph | DirectGraph | Execution graph containing the root transforms and producer-consumer relationships. Passed to start(). |
| rootProviderRegistry | RootProviderRegistry | Provides initial input bundles for root transforms by splitting sources. Passed to start(). |
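The two sizing rules involving targetParallelism can be restated in plain Java. This is a sketch, not Beam code: ParallelismSizing, MIN_PARALLELISM, defaultTargetParallelism() and numTargetSplits() are hypothetical names. The max(3, targetParallelism) split count is taken from start(), and the max(cores, 3) default is taken from the thread-pool sizing heuristic listed under Related Pages.

```java
// Hypothetical helper restating the sizing rules described on this page (not Beam code).
final class ParallelismSizing {
  // Assumed floor of 3, per the "max(cores, 3)" sizing heuristic.
  static final int MIN_PARALLELISM = 3;

  private ParallelismSizing() {}

  /** Assumed default pool size when no explicit targetParallelism is configured. */
  static int defaultTargetParallelism() {
    return Math.max(Runtime.getRuntime().availableProcessors(), MIN_PARALLELISM);
  }

  /** Splits requested from each root transform, as computed at the top of start(). */
  static int numTargetSplits(int targetParallelism) {
    return Math.max(3, targetParallelism);
  }
}
```

Under these rules a 2-core machine would get a default of 3 worker threads and ask each root for 3 splits, while targetParallelism = 8 asks each root for 8 splits.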
Outputs
| Output | Type | Description |
|---|---|---|
| Running pipeline execution | Asynchronous execution | Bundles are processed in parallel by the thread pool. The QuiescenceDriver schedules work until the pipeline reaches quiescence or fails. |
| Pipeline state | State | Accessible via getPipelineState(). Transitions from RUNNING to DONE (success), FAILED (error), or CANCELLED (user cancellation via stop()). |
| Visible updates | VisibleExecutorUpdate | Queued updates consumed by waitUntilFinish() to detect a terminal state or propagate exceptions. |
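A terminal transition can be pictured as one method that stops handing out per-key executors, shuts down the shared pools, and then moves the state from RUNNING to a terminal value exactly once. The following is a hypothetical, JDK-only sketch of that ordering (cache invalidation before pool shutdown, as in the Executor Shutdown Ordering heuristic under Related Pages). SketchState, TerminalTransitionSketch, and the ConcurrentHashMap standing in for the serial-executor cache are assumptions, not Beam types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the pipeline State enum.
enum SketchState {
  RUNNING, DONE, FAILED, CANCELLED;

  boolean isTerminal() {
    return this != RUNNING;
  }
}

// Hypothetical sketch of an ordered, idempotent terminal transition.
final class TerminalTransitionSketch {
  final AtomicReference<SketchState> pipelineState = new AtomicReference<>(SketchState.RUNNING);
  // Stand-in for the per-(step, key) serial executor cache.
  final ConcurrentHashMap<String, Object> serialExecutors = new ConcurrentHashMap<>();
  final ExecutorService workerPool = Executors.newFixedThreadPool(2);
  final ExecutorService metricsPool = Executors.newCachedThreadPool();

  void shutdownIfNecessary(SketchState newState) {
    if (!newState.isTerminal()) {
      return; // non-terminal states never trigger shutdown
    }
    List<RuntimeException> errors = new ArrayList<>();
    try {
      // 1. Drop per-key executors first so nothing schedules onto a dying pool.
      serialExecutors.clear();
    } catch (RuntimeException e) {
      errors.add(e);
    }
    try {
      // 2. Stop the shared worker pool.
      workerPool.shutdown();
    } catch (RuntimeException e) {
      errors.add(e);
    }
    try {
      // 3. Stop the metrics pool.
      metricsPool.shutdown();
    } catch (RuntimeException e) {
      errors.add(e);
    }
    // 4. Only the first terminal state sticks; later calls do not overwrite it.
    pipelineState.compareAndSet(SketchState.RUNNING, newState);
    if (!errors.isEmpty()) {
      IllegalStateException failure = new IllegalStateException("Errors during shutdown");
      errors.forEach(failure::addSuppressed);
      throw failure;
    }
  }
}
```

Using compareAndSet(RUNNING, newState) means a late FAILED or CANCELLED cannot overwrite a DONE that was already recorded, and each shutdown step is attempted even if an earlier one throws.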
Usage Examples
Executor Creation and Startup in DirectRunner.run()
// Inside DirectRunner.run() at L205-214
// context, graph, options, enabledEnforcements and metricsPool are locals set up earlier in run()
TransformEvaluatorRegistry registry =
    TransformEvaluatorRegistry.javaSdkNativeRegistry(context, options);
PipelineExecutor executor =
    ExecutorServiceParallelExecutor.create(
        options.getTargetParallelism(), // thread pool size
        registry, // transform evaluators
        Enforcement.defaultModelEnforcements(enabledEnforcements), // immutability checks
        context, // evaluation context
        metricsPool); // metrics executor
// Launch execution
executor.start(graph, RootProviderRegistry.javaNativeRegistry(context, options));
Bundle Dispatch: Keyed vs. Parallel
// In evaluateBundle() at L221-244
private <T> void evaluateBundle(
    final AppliedPTransform<?, ?, ?> transform,
    final CommittedBundle<T> bundle,
    final CompletionCallback onComplete) {
  TransformExecutorService transformExecutor;
  if (isKeyed(bundle.getPCollection())) {
    // Keyed data: serialize execution per (step, key) pair
    final StepAndKey stepAndKey = StepAndKey.of(transform, bundle.getKey());
    transformExecutor = serialExecutorServices.getUnchecked(stepAndKey);
  } else {
    // Unkeyed data: parallel execution
    transformExecutor = parallelExecutorService;
  }
  TransformExecutor callable =
      executorFactory.create(bundle, transform, onComplete, transformExecutor);
  // Work is only scheduled while the pipeline has not reached a terminal state
  if (!pipelineState.get().isTerminal()) {
    transformExecutor.schedule(callable);
  }
}
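The keyed/unkeyed split amounts to running each (step, key) pair's bundles one at a time, in submission order, on top of the same shared pool that unkeyed bundles use directly. Below is a minimal JDK-only sketch of that idea. It assumes a "step/key" string in place of StepAndKey and a null key to mean "unkeyed"; SerialOnSharedPool and KeyedDispatcher are hypothetical names. For simplicity it drains a key's queue on one pool thread and never evicts idle entries, so it is not how TransformExecutorServices.serial() or the weakly cached serial executors are written.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;

// Hypothetical serial executor: runs its tasks one at a time, in submission
// order, borrowing threads from a shared pool.
final class SerialOnSharedPool implements Executor {
  private final Executor shared;
  private final Queue<Runnable> pending = new ArrayDeque<>(); // guarded by this
  private boolean draining = false; // guarded by this

  SerialOnSharedPool(Executor shared) {
    this.shared = shared;
  }

  @Override
  public void execute(Runnable task) {
    synchronized (this) {
      pending.add(task);
      if (draining) {
        return; // the active drainer will pick this task up
      }
      draining = true;
    }
    shared.execute(this::drain);
  }

  private void drain() {
    Runnable next;
    while ((next = poll()) != null) {
      try {
        next.run();
      } catch (RuntimeException e) {
        shared.execute(this::drain); // keep the remaining tasks moving
        throw e;
      }
    }
  }

  private synchronized Runnable poll() {
    Runnable next = pending.poll();
    if (next == null) {
      draining = false; // queue empty: the next execute() starts a new drain
    }
    return next;
  }
}

// Hypothetical dispatcher: keyed bundles go to a per-"step/key" serial
// executor, unkeyed bundles (key == null) go straight to the shared pool.
final class KeyedDispatcher {
  private final Executor parallel;
  private final ConcurrentHashMap<String, SerialOnSharedPool> serialByStepAndKey =
      new ConcurrentHashMap<>();

  KeyedDispatcher(Executor sharedPool) {
    this.parallel = sharedPool;
  }

  void dispatch(String step, Object key, Runnable bundleWork) {
    Executor target =
        key == null
            ? parallel
            : serialByStepAndKey.computeIfAbsent(
                step + "/" + key, k -> new SerialOnSharedPool(parallel));
    target.execute(bundleWork);
  }
}
```

Because every (step, key) pair gets its own queue, bundles for different keys still run concurrently across the pool; only bundles sharing a step and key are serialized, so per-key processing never sees concurrent access.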
Start Method: Root Bundle Initialization
// In start() at L156-210
@Override
public void start(DirectGraph graph, RootProviderRegistry rootProviderRegistry) {
  int numTargetSplits = Math.max(3, targetParallelism);
  // Per-root queues of initial bundles
  ImmutableMap.Builder<AppliedPTransform<?, ?, ?>, Queue<CommittedBundle<?>>> pendingRootBundles =
      ImmutableMap.builder();
  // Generate initial bundles for each root transform
  for (AppliedPTransform<?, ?, ?> root : graph.getRootTransforms()) {
    Queue<CommittedBundle<?>> pending = Queues.newArrayDeque();
    try {
      // getInitialInputs declares a checked Exception (splitting user sources can fail)
      Collection<CommittedBundle<?>> initialInputs =
          rootProviderRegistry.getInitialInputs(root, numTargetSplits);
      pending.addAll(initialInputs);
    } catch (Exception e) {
      throw UserCodeException.wrap(e);
    }
    pendingRootBundles.put(root, pending);
  }
  // Initialize evaluation context with pending root bundles
  evaluationContext.initialize(pendingRootBundles.build());
  // Create and launch the QuiescenceDriver
  final ExecutionDriver executionDriver =
      QuiescenceDriver.create(
          evaluationContext, graph, this, visibleUpdates, pendingRootBundles.build());
  // Submit the driver loop to the thread pool; each task runs drive() once
  executorService.submit(new Runnable() {
    @Override
    public void run() {
      DriverState drive = executionDriver.drive();
      if (drive.isTerminal()) {
        // Map the terminal driver state: SHUTDOWN -> DONE, FAILED -> FAILED
        State newPipelineState = drive == DriverState.FAILED ? State.FAILED : State.DONE;
        shutdownIfNecessary(newPipelineState);
      } else {
        // CONTINUE -> re-submit for next iteration
        executorService.submit(this);
      }
    }
  });
}
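The driver loop in start() never holds a worker thread between iterations: each task calls drive() once and either resubmits itself or records a terminal state. The following JDK-only sketch reproduces that pattern. LoopDriverState, LoopPipelineState and DriverLoopSketch are hypothetical stand-ins for DriverState, State and the executor, and the Supplier plays the role of executionDriver.drive().

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical mirror of the driver states named on this page.
enum LoopDriverState {
  CONTINUE, SHUTDOWN, FAILED;

  boolean isTerminal() {
    return this != CONTINUE;
  }
}

// Hypothetical mirror of the pipeline states the loop can produce.
enum LoopPipelineState { RUNNING, DONE, FAILED }

final class DriverLoopSketch {
  final AtomicReference<LoopPipelineState> pipelineState =
      new AtomicReference<>(LoopPipelineState.RUNNING);
  final CountDownLatch finished = new CountDownLatch(1);
  private final ExecutorService pool;

  DriverLoopSketch(ExecutorService pool) {
    this.pool = pool;
  }

  // Each task runs one drive() step, then either resubmits itself (CONTINUE)
  // or maps the terminal driver state to a pipeline state and stops.
  void start(Supplier<LoopDriverState> drive) {
    pool.execute(new Runnable() {
      @Override
      public void run() {
        LoopDriverState state = drive.get();
        if (state.isTerminal()) {
          LoopPipelineState terminal =
              state == LoopDriverState.FAILED ? LoopPipelineState.FAILED : LoopPipelineState.DONE;
          pipelineState.compareAndSet(LoopPipelineState.RUNNING, terminal);
          finished.countDown();
        } else {
          pool.execute(this); // free the thread, queue the next iteration
        }
      }
    });
  }
}
```

Resubmitting rather than looping inside one task means that even a small pool can interleave driver iterations with the bundle work the driver schedules on that same pool.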
Waiting for Pipeline Completion
// In waitUntilFinish() at L251-295
@Override
public State waitUntilFinish(Duration duration) throws Exception {
  // Duration.ZERO means "wait indefinitely"
  Instant completionTime = duration.equals(Duration.ZERO)
      ? new Instant(Long.MAX_VALUE)
      : Instant.now().plus(duration);
  while (Instant.now().isBefore(completionTime)) {
    // Poll for visible updates (25ms timeout)
    VisibleExecutorUpdate update = visibleUpdates.tryNext(Duration.millis(25L));
    if (update == null && pipelineState.get().isTerminal()) {
      // Pipeline finished; check for any final update
      update = visibleUpdates.tryNext(Duration.millis(1L));
      if (update == null) {
        return pipelineState.get();
      }
    }
    if (update != null) {
      if (isTerminalStateUpdate(update)) {
        return pipelineState.get();
      }
      if (update.thrown.isPresent()) {
        // Rethrow Errors as Errors; casting them to Exception would fail
        Throwable thrown = update.thrown.get();
        if (thrown instanceof Exception) {
          throw (Exception) thrown;
        } else if (thrown instanceof Error) {
          throw (Error) thrown;
        }
        throw new Exception("Unknown type of Throwable", thrown);
      }
    }
  }
  return null; // timeout elapsed
}
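The polling loop in waitUntilFinish() can be exercised in isolation. This sketch swaps Joda-Time for java.time and a LinkedBlockingQueue for the executor's update queue; WaitState, Update and WaitSketch are hypothetical names. It keeps the behaviors the excerpt relies on: a zero Duration waits with no deadline, a terminal update or an already-terminal state returns immediately, failures are rethrown without casting an Error to Exception, and null signals that the deadline passed.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical pipeline states for the sketch.
enum WaitState {
  RUNNING, DONE, FAILED, CANCELLED;

  boolean isTerminal() {
    return this != RUNNING;
  }
}

// Hypothetical stand-in for VisibleExecutorUpdate: either a failure or a new state.
final class Update {
  final Optional<Throwable> thrown;
  final WaitState newState; // null for failure updates

  private Update(Throwable thrown, WaitState newState) {
    this.thrown = Optional.ofNullable(thrown);
    this.newState = newState;
  }

  static Update ofFailure(Throwable t) {
    return new Update(t, null);
  }

  static Update ofState(WaitState s) {
    return new Update(null, s);
  }
}

final class WaitSketch {
  final BlockingQueue<Update> updates = new LinkedBlockingQueue<>();
  final AtomicReference<WaitState> state = new AtomicReference<>(WaitState.RUNNING);

  /** Returns a terminal state, or null if the deadline passes first. */
  WaitState waitUntilFinish(Duration duration) throws Exception {
    // A zero duration means "wait with no deadline".
    Instant deadline = duration.isZero() ? Instant.MAX : Instant.now().plus(duration);
    while (Instant.now().isBefore(deadline)) {
      Update update = updates.poll(25, TimeUnit.MILLISECONDS);
      if (update == null) {
        if (state.get().isTerminal()) {
          return state.get(); // finished and nothing left to report
        }
        continue;
      }
      if (update.thrown.isPresent()) {
        Throwable t = update.thrown.get();
        if (t instanceof Exception) {
          throw (Exception) t;
        } else if (t instanceof Error) {
          throw (Error) t; // avoid a ClassCastException for Errors
        }
        throw new Exception("Unknown type of Throwable", t);
      }
      if (update.newState != null && update.newState.isTerminal()) {
        return update.newState;
      }
    }
    return null; // deadline elapsed while still running
  }
}
```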
Key Supporting Classes
| Class | Role | Key Methods |
|---|---|---|
| QuiescenceDriver | Drives the execution loop; schedules root bundles and fired timers until quiescence | create(), drive() returns DriverState (CONTINUE, SHUTDOWN, FAILED) |
| EvaluationContext | Manages execution state; commits bundle results; updates watermarks and metrics | create(), handleResult(), initialize(), getMetrics() |
| TransformEvaluatorRegistry | Maps transform URNs to evaluator factories for the Java SDK native transforms | javaSdkNativeRegistry(), getEvaluator() |
| DirectTransformExecutor | Callable that processes a single bundle through a transform evaluator with enforcement | Factory.create(), call() |
| TransformExecutorServices | Provides parallel (work submitted directly to the shared thread pool) and serial (one bundle at a time, still running on the shared pool) executor service wrappers | parallel(), serial() |
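DirectTransformExecutor applies model enforcements around evaluation, for example the ImmutabilityEnforcementFactory registered for ParDo in the Inputs table. The general shape of such a check can be sketched with a hypothetical before/after hook that snapshots each byte[] input and fails if processing changed it. ElementEnforcement, ByteArrayImmutability and EnforcedEvaluator are illustrative names only; Beam's own enforcement API differs.

```java
import java.util.Arrays;
import java.util.function.Consumer;

// Hypothetical enforcement hook invoked around each element a transform processes.
interface ElementEnforcement<T> {
  void beforeElement(T element);

  void afterElement(T element);
}

// Hypothetical immutability check for byte[] inputs: snapshot before, compare after.
final class ByteArrayImmutability implements ElementEnforcement<byte[]> {
  private byte[] snapshot;

  @Override
  public void beforeElement(byte[] element) {
    snapshot = Arrays.copyOf(element, element.length);
  }

  @Override
  public void afterElement(byte[] element) {
    if (!Arrays.equals(snapshot, element)) {
      throw new IllegalStateException("Input element was mutated during processing");
    }
  }
}

// Hypothetical evaluator wrapper that applies one enforcement to every element of a bundle.
final class EnforcedEvaluator<T> {
  private final Consumer<T> processElement;
  private final ElementEnforcement<T> enforcement;

  EnforcedEvaluator(Consumer<T> processElement, ElementEnforcement<T> enforcement) {
    this.processElement = processElement;
    this.enforcement = enforcement;
  }

  void processBundle(Iterable<T> bundle) {
    for (T element : bundle) {
      enforcement.beforeElement(element);
      processElement.accept(element);
      enforcement.afterElement(element);
    }
  }
}
```

Keeping the check outside the user function means the same evaluator can run with or without enforcement, which is how a runner can make such checks configurable.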
Thread Pool Architecture
+---------------------------------------------+
| ExecutorServiceParallelExecutor             |
|                                             |
|  +-----------+  +------------------------+  |
|  | Parallel  |  | Serial Executor        |  |
|  | Executor  |  | Cache (per StepAndKey) |  |
|  | Service   |  | [weakly cached]        |  |
|  +-----+-----+  +-----------+------------+  |
|        |                    |               |
|  +-----v--------------------v------------+  |
|  | Fixed Thread Pool (targetParallelism) |  |
|  | (non-daemon "direct-runner-worker")   |  |
|  +---------------------------------------+  |
|                                             |
|  +---------------------------------------+  |
|  | Metrics Executor (cached pool)        |  |
|  | (non-daemon "direct-metrics-...")     |  |
|  +---------------------------------------+  |
+---------------------------------------------+
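The pool layout in the diagram maps onto standard java.util.concurrent factories. This JDK-only sketch builds a fixed pool of targetParallelism non-daemon threads named after "direct-runner-worker", plus a cached pool for metrics work. The WorkerPools class, the numeric thread-name suffix and the "sketch-metrics" prefix are assumptions (the diagram elides the real metrics thread name). Non-daemon threads keep the JVM alive while bundles are still executing, even if no other thread is waiting on the pipeline.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical JDK-only construction of the two pools in the diagram.
final class WorkerPools {
  private WorkerPools() {}

  // Names threads "<prefix>-0", "<prefix>-1", ... and keeps them non-daemon.
  static ThreadFactory named(String prefix) {
    AtomicInteger counter = new AtomicInteger();
    return runnable -> {
      Thread thread = new Thread(runnable, prefix + "-" + counter.getAndIncrement());
      thread.setDaemon(false); // pending bundles keep the JVM alive
      return thread;
    };
  }

  // Fixed pool shared by parallel and per-key serial work.
  static ExecutorService workerPool(int targetParallelism) {
    return Executors.newFixedThreadPool(targetParallelism, named("direct-runner-worker"));
  }

  // Cached pool for asynchronous metrics work; "sketch-metrics" is a placeholder name.
  static ExecutorService metricsPool() {
    return Executors.newCachedThreadPool(named("sketch-metrics"));
  }
}
```

Because these threads are non-daemon, whoever creates the pools must shut them down once the pipeline reaches a terminal state; otherwise the JVM will not exit.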
Related Pages
- Principle:Apache_Beam_Parallel_Execution -- The principle of watermark-driven parallel bundle execution.
- Environment:Apache_Beam_Java_Build_Environment -- Java build environment with JDK 8+, Gradle, and multi-language toolchain.
- Heuristic:Apache_Beam_Thread_Pool_Parallelism_Sizing -- Thread pool sizing via max(cores, 3) and 4× cache concurrency.
- Heuristic:Apache_Beam_Watermark_Update_Throttling -- Watermark update throttling to MAX_INCREMENTAL_UPDATES=10.
- Heuristic:Apache_Beam_Executor_Shutdown_Ordering -- Cache invalidation before thread pool shutdown pattern.
Sources
- Repo -- Apache Beam -- Source repository at runners/direct-java/.