Implementation:Apache Beam DirectRunner DefaultTransformOverrides
| Field | Value |
|---|---|
| Implementation Name | DirectRunner DefaultTransformOverrides |
| Overview | Concrete tool for applying Direct Runner specific transform overrides to a Beam pipeline, provided by the Direct Runner module. |
| Source | runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java |
| Implements | Principle:Apache_Beam_Transform_Override_Application |
| last_updated | 2026-02-09 04:00 GMT |
Code Reference
Source Location
| File | Lines | Description |
|---|---|---|
| runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java | L244-309 | performRewrites(), sideInputUsingTransformOverrides(), groupByKeyOverrides() |
| runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java | Full file | Handles SplittableDoFn and stateful/timer ParDo decomposition |
| runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java | Full file | Handles runner-determined shard count for WriteFiles |
| runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java | Full file | Handles GroupByKey decomposition into DirectGroupByKey |
| runners/direct-java/src/main/java/org/apache/beam/runners/direct/MultiStepCombine.java | Full file | Multi-step decomposition of Combine transforms |
Signature
// Main entry point: called from DirectRunner.run() at L176
@VisibleForTesting
void performRewrites(Pipeline pipeline) {
  // Phase 1: Apply side-input-using transform overrides
  pipeline.replaceAll(sideInputUsingTransformOverrides());
  // Phase 2: Add WriteView primitives for active side inputs
  pipeline.traverseTopologically(new DirectWriteViewVisitor());
  // Phase 3: Apply GroupByKey and SplittableDoFn overrides
  pipeline.replaceAll(groupByKeyOverrides());
  // Phase 4: Convert SDF-based reads to primitive reads if necessary
  SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(pipeline);
}

// Phase 1 overrides (side-input-introducing); method body elided
@SuppressWarnings("rawtypes")
private List<PTransformOverride> sideInputUsingTransformOverrides() {
  // Returns overrides for:
  // - WriteWithShardingFactory (if runnerDeterminedSharding is enabled)
  // - MultiStepCombine.Factory
  // - DirectTestStreamFactory
}

// Phase 3 overrides (GBK and SplittableDoFn); method body elided
@SuppressWarnings("rawtypes")
private List<PTransformOverride> groupByKeyOverrides() {
  // Returns overrides for:
  // - ParDoMultiOverrideFactory (splittable ParDo)
  // - ParDoMultiOverrideFactory (state/timer ParDo)
  // - SplittableParDoViaKeyedWorkItems.OverrideFactory
  // - DirectGBKIntoKeyedWorkItemsOverrideFactory
  // - DirectGroupByKeyOverrideFactory
}
Import
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.runners.PTransformOverride;
import org.apache.beam.sdk.util.construction.PTransformMatchers;
import org.apache.beam.sdk.util.construction.PTransformTranslation;
I/O Contract
Inputs
| Parameter | Type | Description |
|---|---|---|
| pipeline | Pipeline | The user's Pipeline object containing the original, user-defined transforms. The pipeline is modified in place. |
Outputs
| Output | Type | Description |
|---|---|---|
| Modified pipeline | Pipeline (mutated in place) | The same Pipeline object with transforms replaced by Direct Runner specific implementations. The transform hierarchy now contains runner-optimized primitives. |
Usage Examples
How Overrides Are Applied During Pipeline Execution
// This happens automatically inside DirectRunner.run().
// Users do not call performRewrites() directly.
@Override
public DirectPipelineResult run(Pipeline pipeline) {
  // Serialize/deserialize options for isolation.
  // readValue() throws the checked IOException; the handling around this
  // statement is elided from the excerpt.
  options = MAPPER.readValue(MAPPER.writeValueAsBytes(options), PipelineOptions.class)
      .as(DirectOptions.class);
  // Apply all transform overrides
  performRewrites(pipeline); // <-- This is where overrides happen
  // Continue with graph construction, enforcement setup, and execution...
  MetricsEnvironment.setMetricsSupported(true);
  DirectGraphVisitor graphVisitor = new DirectGraphVisitor();
  pipeline.traverseTopologically(graphVisitor);
  // ...
}
Override Ordering Example
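The ordering of overrides is critical. The Direct Runner applies them in three ordered phases; performRewrites() then finishes with a fourth step that converts SDF-based reads to primitive reads where necessary. The three override phases are: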
// Phase 1: Side-input-using overrides (introduces new side inputs)
// - WriteWithShardingFactory: replaces WriteFiles with log-based shard count
// - MultiStepCombine.Factory: decomposes Combine into multiple steps
// - DirectTestStreamFactory: replaces TestStream with direct implementation
// Phase 2: WriteView visitor (adds WriteView primitives for side inputs)
// Must run after Phase 1 (which may introduce new side inputs)
// Must run before Phase 3 (which handles GBK used by WriteView)
// Phase 3: GBK and SplittableDoFn overrides
// - ParDoMultiOverrideFactory: decomposes splittable/stateful ParDo
// - SplittableParDoViaKeyedWorkItems.OverrideFactory
// - DirectGBKIntoKeyedWorkItemsOverrideFactory
// - DirectGroupByKeyOverrideFactory: decomposes GBK into DirectGroupByKey
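To make the ordering constraint concrete, the following is a toy model only, not Beam's API. It represents a pipeline as a list of transform names and an override as a name-to-sequence replacement. The class NameOverride, the helper methods, and names such as CountRecords and ShardedWriteFiles are hypothetical and chosen purely for illustration; the assumption that adding a WriteView also introduces a GroupByKey follows the Phase 2/Phase 3 comments above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of the phase ordering; transform names are plain strings, not Beam objects. */
class OverridePhasesSketch {

  /** Hypothetical stand-in for a PTransformOverride: replace one name with a sequence. */
  static final class NameOverride {
    final String match;
    final List<String> replacement;

    NameOverride(String match, String... replacement) {
      this.match = match;
      this.replacement = Arrays.asList(replacement);
    }
  }

  // Phase 1 stand-in: the sharded write introduces a side input (a view).
  static final List<NameOverride> SIDE_INPUT_OVERRIDES =
      Arrays.asList(
          new NameOverride("WriteFiles", "CountRecords", "CreateView", "ShardedWriteFiles"));

  // Phase 3 stand-in: every GroupByKey becomes the runner-specific version.
  static final List<NameOverride> GBK_OVERRIDES =
      Arrays.asList(new NameOverride("GroupByKey", "DirectGroupByKey"));

  /** Simplified replaceAll: applies each override, in list order, to the whole pipeline. */
  static List<String> replaceAll(List<String> pipeline, List<NameOverride> overrides) {
    List<String> current = pipeline;
    for (NameOverride override : overrides) {
      List<String> next = new ArrayList<>();
      for (String transform : current) {
        if (transform.equals(override.match)) {
          next.addAll(override.replacement);
        } else {
          next.add(transform);
        }
      }
      current = next;
    }
    return current;
  }

  /** Phase 2 stand-in: every view gains a GroupByKey followed by a WriteView. */
  static List<String> addWriteViews(List<String> pipeline) {
    return replaceAll(
        pipeline,
        Arrays.asList(new NameOverride("CreateView", "CreateView", "GroupByKey", "WriteView")));
  }

  /** Phases in the documented order: side inputs, then WriteView, then GBK overrides. */
  static List<String> performRewrites(List<String> pipeline) {
    List<String> afterPhase1 = replaceAll(pipeline, SIDE_INPUT_OVERRIDES);
    List<String> afterPhase2 = addWriteViews(afterPhase1);
    return replaceAll(afterPhase2, GBK_OVERRIDES);
  }

  /** Same steps with the GBK overrides first, to show what the ordering prevents. */
  static List<String> rewritesInWrongOrder(List<String> pipeline) {
    List<String> afterGbk = replaceAll(pipeline, GBK_OVERRIDES);
    List<String> afterPhase1 = replaceAll(afterGbk, SIDE_INPUT_OVERRIDES);
    return addWriteViews(afterPhase1);
  }
}
```

In this model, performRewrites on a pipeline of Read followed by WriteFiles leaves no plain GroupByKey behind, whereas rewritesInWrongOrder leaves the GroupByKey introduced by the WriteView step unreplaced.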
Supporting Factory Details
ParDoMultiOverrideFactory
Replaces ParDo.MultiOutput transforms that involve splittable DoFns or stateful/timer DoFns. The factory detects whether the DoFn uses @ProcessElement with a RestrictionTracker parameter (splittable) or @StateId/@TimerId annotations (stateful), and produces the appropriate decomposed replacement.
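The detection described above can be sketched with plain reflection. This is an illustrative approximation, not the factory's actual code: the annotations and the RestrictionTracker interface below are local stand-ins declared inside the sketch (they are not Beam's types), and the classify method is hypothetical.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.lang.reflect.Method;

/** Toy classifier; the annotations and types below are local stand-ins, not Beam's. */
class DoFnKindSketch {

  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.METHOD)
  @interface ProcessElement {}

  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.FIELD)
  @interface StateId {
    String value();
  }

  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.FIELD)
  @interface TimerId {
    String value();
  }

  /** Stand-in marker type for a restriction tracker parameter. */
  interface RestrictionTracker {}

  enum Kind { SPLITTABLE, STATEFUL, PLAIN }

  /** Classifies a DoFn-like class by its process method parameters and annotated fields. */
  static Kind classify(Class<?> doFnClass) {
    for (Method method : doFnClass.getDeclaredMethods()) {
      if (!method.isAnnotationPresent(ProcessElement.class)) {
        continue;
      }
      for (Class<?> parameterType : method.getParameterTypes()) {
        if (RestrictionTracker.class.isAssignableFrom(parameterType)) {
          return Kind.SPLITTABLE;
        }
      }
    }
    for (Field field : doFnClass.getDeclaredFields()) {
      if (field.isAnnotationPresent(StateId.class) || field.isAnnotationPresent(TimerId.class)) {
        return Kind.STATEFUL;
      }
    }
    return Kind.PLAIN;
  }

  /** Example: the process method takes a tracker, so it is treated as splittable. */
  static class SplittableFn {
    @ProcessElement
    public void process(String element, RestrictionTracker tracker) {}
  }

  /** Example: declares state, so it is treated as stateful. */
  static class StatefulFn {
    @StateId("count")
    private final Object countSpec = new Object();

    @ProcessElement
    public void process(String element) {}
  }

  /** Example: neither splittable nor stateful, so no override would apply. */
  static class PlainFn {
    @ProcessElement
    public void process(String element) {}
  }
}
```

A class that falls into the PLAIN bucket corresponds to a ParDo that this factory would leave untouched.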
WriteWithShardingFactory
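Replaces WriteFiles transforms that use runner-determined sharding. The shard count grows logarithmically with the number of records: in the Beam source it is roughly the base-10 logarithm of the record count, subject to a small minimum, plus a few random extra shards. This keeps parallelism reasonable without producing an excessive number of small files.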
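A minimal sketch of a logarithmic shard calculation follows. The constants are assumptions based on a reading of WriteWithShardingFactory and should be checked against that file: a floor of 3 shards, a base-10 logarithm, and an explicit extraShards argument standing in for the random offset the factory adds. The class and method names are hypothetical.

```java
/** Sketch of a logarithmic shard count; the constants are assumptions, not verified values. */
class ShardCountSketch {

  /** Assumed minimum shard count once the input is large enough to use the logarithm. */
  static final int MIN_SHARDS_FOR_LOG = 3;

  /**
   * Returns a shard count that grows with log10 of the record count.
   *
   * @param totalRecords number of records to write
   * @param extraShards stand-in for the random extra shards, passed explicitly so the
   *     result is deterministic
   */
  static int calculateShards(long totalRecords, int extraShards) {
    if (totalRecords == 0) {
      // Always write at least one shard, even for empty input.
      return 1;
    }
    if (totalRecords < MIN_SHARDS_FOR_LOG + extraShards) {
      // Very small inputs: at most one record per shard.
      return (int) totalRecords;
    }
    int floorLogRecords = (int) Math.log10(totalRecords);
    return Math.max(floorLogRecords, MIN_SHARDS_FOR_LOG) + extraShards;
  }
}
```

Under these assumptions, one thousand records and one hundred million records with no extra shards yield 3 and 8 shards respectively, which shows how slowly the shard count grows relative to the input size.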
DirectGroupByKeyOverrideFactory
Replaces GroupByKey with DirectGroupByKey, which decomposes the operation into two Direct Runner steps: a group-by-key-only step (DirectGroupByKeyOnly) followed by a group-also-by-window step (DirectGroupAlsoByWindow). Splitting the operation this way lets the Direct Runner evaluate keying and per-window grouping as separate primitives.
Related Pages
- Principle:Apache_Beam_Transform_Override_Application -- The principle of runner-specific transform replacement.
- Environment:Apache_Beam_Java_Build_Environment -- Java build environment with JDK 8+, Gradle, and multi-language toolchain.
Sources
- Repo -- Apache Beam -- Source repository at runners/direct-java/.