

Implementation:Apache Beam DirectRunner DefaultTransformOverrides

From Leeroopedia


Implementation Name: DirectRunner DefaultTransformOverrides
Overview: Concrete tool for applying Direct Runner-specific transform overrides to a Beam pipeline, provided by the Direct Runner module.
Source: runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
Implements: Principle:Apache_Beam_Transform_Override_Application
Last updated: 2026-02-09 04:00 GMT

Code Reference

Source Location

  • runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java (L244-309): performRewrites(), sideInputUsingTransformOverrides(), groupByKeyOverrides()
  • runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java (full file): Handles SplittableDoFn and stateful/timer ParDo decomposition
  • runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java (full file): Handles runner-determined shard count for WriteFiles
  • runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java (full file): Handles GroupByKey decomposition into DirectGroupByKey
  • runners/direct-java/src/main/java/org/apache/beam/runners/direct/MultiStepCombine.java (full file): Multi-step decomposition of Combine transforms

Signature

// Main entry point: called from DirectRunner.run() at L176
@VisibleForTesting
void performRewrites(Pipeline pipeline) {
    // Phase 1: Apply side-input-using transform overrides
    pipeline.replaceAll(sideInputUsingTransformOverrides());

    // Phase 2: Add WriteView primitives for active side inputs
    pipeline.traverseTopologically(new DirectWriteViewVisitor());

    // Phase 3: Apply GroupByKey and SplittableDoFn overrides
    pipeline.replaceAll(groupByKeyOverrides());

    // Phase 4: Convert SDF-based reads to primitive reads if necessary
    SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(pipeline);
}

// Phase 1 overrides (side-input-introducing)
@SuppressWarnings("rawtypes")
private List<PTransformOverride> sideInputUsingTransformOverrides() {
    // Body elided; returns a List<PTransformOverride> with overrides for:
    // - WriteWithShardingFactory (if runnerDeterminedSharding is enabled)
    // - MultiStepCombine.Factory
    // - DirectTestStreamFactory
}

// Phase 3 overrides (GBK and SplittableDoFn)
@SuppressWarnings("rawtypes")
private List<PTransformOverride> groupByKeyOverrides() {
    // Body elided; returns a List<PTransformOverride> with overrides for:
    // - ParDoMultiOverrideFactory (splittable ParDo)
    // - ParDoMultiOverrideFactory (state/timer ParDo)
    // - SplittableParDoViaKeyedWorkItems.OverrideFactory
    // - DirectGBKIntoKeyedWorkItemsOverrideFactory
    // - DirectGroupByKeyOverrideFactory
}
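Each replaceAll call above receives a whole list of overrides, so order matters inside a phase as well as between phases. The toy model below is a minimal sketch under stated assumptions: a pipeline is a flat list of transform names (Beam rewrites a nested transform hierarchy), and replaceAll applies the overrides one at a time in list order, so a later override can match transforms that an earlier one produced. ToyOverride, ToyPipeline and all transform names are hypothetical, not Beam's API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical toy model, not Beam's API: an override pairs a matcher with a
// function that expands one matching transform into replacement transforms.
class ToyOverride {
    final Predicate<String> matcher;
    final Function<String, List<String>> replacement;

    ToyOverride(Predicate<String> matcher, Function<String, List<String>> replacement) {
        this.matcher = matcher;
        this.replacement = replacement;
    }
}

class ToyPipeline {
    // Applies the overrides one at a time, in list order, over the whole
    // (flat) pipeline; each override sees what earlier overrides produced.
    static List<String> replaceAll(List<String> transforms, List<ToyOverride> overrides) {
        List<String> current = new ArrayList<>(transforms);
        for (ToyOverride override : overrides) {
            List<String> next = new ArrayList<>();
            for (String t : current) {
                if (override.matcher.test(t)) {
                    next.addAll(override.replacement.apply(t));
                } else {
                    next.add(t);
                }
            }
            current = next;
        }
        return current;
    }

    // The first override emits transforms that only the later overrides in
    // the same list know how to expand.
    static List<String> demo() {
        List<ToyOverride> overrides = List.of(
            new ToyOverride(t -> t.equals("SplittableParDo"),
                t -> List.of("GBKIntoKeyedWorkItems", "ProcessKeyedElements")),
            new ToyOverride(t -> t.equals("ProcessKeyedElements"),
                t -> List.of("ProcessElementsPrimitive")),
            new ToyOverride(t -> t.equals("GBKIntoKeyedWorkItems"),
                t -> List.of("GroupByKeyOnly")));
        return replaceAll(List.of("Read", "SplittableParDo", "Write"), overrides);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [Read, GroupByKeyOnly, ProcessElementsPrimitive, Write]
    }
}
```

In this model, moving the first override to the end of the list would leave GBKIntoKeyedWorkItems and ProcessKeyedElements unexpanded, because the overrides that handle them would already have run.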

Import

import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.runners.PTransformOverride;
import org.apache.beam.sdk.util.construction.PTransformMatchers;
import org.apache.beam.sdk.util.construction.PTransformTranslation;

I/O Contract

Inputs

  • pipeline (Pipeline): The user's Pipeline object containing the original, user-defined transforms. The pipeline is modified in place.

Outputs

  • Modified pipeline (Pipeline, mutated in place): The same Pipeline object, with matching transforms replaced by Direct Runner-specific implementations. The transform hierarchy now contains primitives that the Direct Runner can execute.

Usage Examples

How Overrides Are Applied During Pipeline Execution

// This happens automatically inside DirectRunner.run();
// users do not call performRewrites() directly.

@Override
public DirectPipelineResult run(Pipeline pipeline) {
    // Serialize/deserialize options for isolation. The JSON round-trip can
    // throw IOException, so it must be caught (or declared) to compile.
    try {
        options = MAPPER.readValue(MAPPER.writeValueAsBytes(options), PipelineOptions.class)
                        .as(DirectOptions.class);
    } catch (IOException e) {
        throw new IllegalArgumentException("PipelineOptions failed to serialize to JSON.", e);
    }

    // Apply all transform overrides
    performRewrites(pipeline);   // <-- This is where overrides happen

    // Continue with graph construction, enforcement setup, and execution...
    MetricsEnvironment.setMetricsSupported(true);
    DirectGraphVisitor graphVisitor = new DirectGraphVisitor();
    pipeline.traverseTopologically(graphVisitor);
    // ...
}
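The comment in run() says the options are serialized and deserialized for isolation. One effect of such a round-trip is a deep copy: later changes to the caller's object do not reach the run. The sketch below shows that effect under stated assumptions: it swaps the JSON round-trip performed with MAPPER for java.io serialization so it runs on the JDK alone, and ToyOptions, OptionsIsolation and the "exampleFlag" key are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical options holder standing in for PipelineOptions.
class ToyOptions implements Serializable {
    private static final long serialVersionUID = 1L;
    final Map<String, String> values = new HashMap<>();
}

class OptionsIsolation {
    // Round-trips the options through bytes, producing an independent deep
    // copy; java.io serialization replaces the JSON mapper used by Beam.
    static ToyOptions isolate(ToyOptions options) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(options);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (ToyOptions) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalArgumentException("Options failed to round-trip", e);
        }
    }

    public static void main(String[] args) {
        ToyOptions user = new ToyOptions();
        user.values.put("exampleFlag", "4");
        ToyOptions runCopy = isolate(user);
        user.values.put("exampleFlag", "16"); // caller mutates after the copy
        System.out.println(runCopy.values.get("exampleFlag")); // 4
    }
}
```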

Override Ordering Example

The ordering of the rewrites is critical. Before the final SDF-read conversion (Phase 4 in performRewrites()), the Direct Runner applies three phases in order:

// Phase 1: Side-input-using overrides (introduces new side inputs)
// - WriteWithShardingFactory: replaces WriteFiles that uses runner-determined sharding with a version that picks a logarithmic shard count (only when that option is enabled)
// - MultiStepCombine.Factory: decomposes Combine into multiple steps
// - DirectTestStreamFactory: replaces TestStream with direct implementation

// Phase 2: WriteView visitor (adds WriteView primitives for side inputs)
// Must run after Phase 1 (which may introduce new side inputs)
// Must run before Phase 3 (which handles GBK used by WriteView)

// Phase 3: GBK and SplittableDoFn overrides
// - ParDoMultiOverrideFactory: decomposes splittable/stateful ParDo
// - SplittableParDoViaKeyedWorkItems.OverrideFactory
// - DirectGBKIntoKeyedWorkItemsOverrideFactory
// - DirectGroupByKeyOverrideFactory: replaces GBK with DirectGroupByKey
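The phase ordering above can be checked with a toy model. It is a minimal sketch, assuming a flat list of transform names, a phase 1 that (like MultiStepCombine.Factory) decomposes a Combine into steps that include a GroupByKey, and a phase 3 that expands every GroupByKey. PhaseOrdering and the names PartialCombine, MergeAccumulators, GroupByKeyOnly and GroupAlsoByWindow are hypothetical labels, not Beam classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical toy model: each phase rewrites a flat list of transform names.
class PhaseOrdering {
    // Builds a phase that replaces every occurrence of target with replacement.
    static UnaryOperator<List<String>> expand(String target, List<String> replacement) {
        return transforms -> {
            List<String> out = new ArrayList<>();
            for (String t : transforms) {
                if (t.equals(target)) {
                    out.addAll(replacement);
                } else {
                    out.add(t);
                }
            }
            return out;
        };
    }

    // Stand-in for phase 1: the decomposed Combine introduces a new GroupByKey.
    static final UnaryOperator<List<String>> COMBINE_PHASE =
        expand("Combine", List.of("PartialCombine", "GroupByKey", "MergeAccumulators"));
    // Stand-in for phase 3: every GroupByKey becomes lower-level steps.
    static final UnaryOperator<List<String>> GBK_PHASE =
        expand("GroupByKey", List.of("GroupByKeyOnly", "GroupAlsoByWindow"));

    static List<String> run(List<String> pipeline, List<UnaryOperator<List<String>>> phases) {
        List<String> current = pipeline;
        for (UnaryOperator<List<String>> phase : phases) {
            current = phase.apply(current);
        }
        return current;
    }

    public static void main(String[] args) {
        List<String> pipeline = List.of("Read", "Combine", "Write");
        // Correct order: the GroupByKey introduced by phase 1 gets expanded.
        System.out.println(run(pipeline, List.of(COMBINE_PHASE, GBK_PHASE)));
        // Reversed order: a raw GroupByKey survives the rewrites.
        System.out.println(run(pipeline, List.of(GBK_PHASE, COMBINE_PHASE)));
    }
}
```

In the correct order the GroupByKey that phase 1 introduces is expanded; in the reversed order it survives untouched, which in this model leaves a transform with no primitive to execute it.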

Supporting Factory Details

ParDoMultiOverrideFactory

Replaces ParDo.MultiOutput transforms that involve splittable DoFns or stateful/timer DoFns. The factory detects whether the DoFn uses @ProcessElement with a RestrictionTracker parameter (splittable) or @StateId/@TimerId annotations (stateful), and produces the appropriate decomposed replacement.
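The detection described above can be sketched with plain Java reflection. The annotations and tracker interface below are hypothetical stand-ins for Beam's @ProcessElement, @StateId, @TimerId and RestrictionTracker, defined locally so the example compiles without Beam; Beam itself analyzes DoFns through DoFnSignatures rather than code like this. As an assumption of the sketch, the splittable check runs first.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.lang.reflect.Method;

// Hypothetical stand-ins, defined only so this sketch is self-contained.
@Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) @interface ToyProcessElement {}
@Retention(RetentionPolicy.RUNTIME) @Target(ElementType.FIELD) @interface ToyStateId {}
@Retention(RetentionPolicy.RUNTIME) @Target(ElementType.FIELD) @interface ToyTimerId {}
interface ToyRestrictionTracker {}

class ToyParDoClassifier {
    enum Kind { SPLITTABLE, STATEFUL, PLAIN }

    static Kind classify(Class<?> doFn) {
        // Splittable: the process-element method takes a restriction tracker.
        for (Method m : doFn.getDeclaredMethods()) {
            if (m.isAnnotationPresent(ToyProcessElement.class)) {
                for (Class<?> p : m.getParameterTypes()) {
                    if (ToyRestrictionTracker.class.isAssignableFrom(p)) {
                        return Kind.SPLITTABLE;
                    }
                }
            }
        }
        // Stateful: some field declares a state cell or a timer.
        for (Field f : doFn.getDeclaredFields()) {
            if (f.isAnnotationPresent(ToyStateId.class) || f.isAnnotationPresent(ToyTimerId.class)) {
                return Kind.STATEFUL;
            }
        }
        return Kind.PLAIN;
    }

    public static void main(String[] args) {
        System.out.println(classify(SplitFn.class)); // SPLITTABLE
        System.out.println(classify(CountFn.class)); // STATEFUL
        System.out.println(classify(MapFn.class));   // PLAIN
    }
}

class SplitFn {
    @ToyProcessElement void process(String element, ToyRestrictionTracker tracker) {}
}

class CountFn {
    @ToyStateId final String count = "count";
    @ToyProcessElement void process(String element) {}
}

class MapFn {
    @ToyProcessElement void process(String element) {}
}
```

A PLAIN result corresponds to a ParDo that needs no decomposition; the other two kinds would each map to their own replacement.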

WriteWithShardingFactory

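Replaces WriteFiles transforms that use runner-determined sharding (only when that option is enabled). It derives the number of shards from the total record count with a logarithmic formula (see WriteWithShardingFactory for the exact constants), so the shard count grows slowly with input size: enough parallelism for large inputs without producing many small files.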
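To show how a logarithmic shard count behaves, here is a hypothetical shard function. The floor-of-log10 formula and the MIN_SHARDS constant are assumptions chosen for illustration, not the constants WriteWithShardingFactory uses.

```java
// Hypothetical illustration of a logarithmic shard count; the formula and
// MIN_SHARDS are assumptions, not WriteWithShardingFactory's values.
class ToySharding {
    static final int MIN_SHARDS = 3;

    static int shards(long totalRecords) {
        if (totalRecords == 0) {
            return 1; // still write one (empty) shard
        }
        if (totalRecords < MIN_SHARDS) {
            return (int) totalRecords; // never more shards than records
        }
        // Grows by one shard per tenfold increase in records.
        int floorLog10 = (int) Math.floor(Math.log10(totalRecords));
        return Math.max(floorLog10, MIN_SHARDS);
    }

    public static void main(String[] args) {
        for (long n : new long[] {0, 2, 500, 12_345_678L, 12_345_678_000L}) {
            System.out.println(n + " records -> " + shards(n) + " shards");
        }
    }
}
```

Under these assumptions, 12,345,678 records yield 7 shards, and a thousandfold increase in records adds only 3 more.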

DirectGroupByKeyOverrideFactory

Replaces GroupByKey with DirectGroupByKey, which expands into two chained primitives that the Direct Runner evaluates directly: a group-by-key-only step (DirectGroupByKeyOnly) followed by a group-also-by-window step (DirectGroupAlsoByWindow).
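Decomposing GroupByKey into chained lower-level steps can be illustrated with a toy model: the first step groups elements by key only, keeping each value's window, and the second splits each key's values by window. Element, TwoStepGroupByKey and the long window ids are hypothetical simplifications (no triggers, no window merging), not Beam's classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical toy model: each element carries a key, a value and a window id.
class TwoStepGroupByKey {
    static final class Element {
        final String key;
        final int value;
        final long window;

        Element(String key, int value, long window) {
            this.key = key;
            this.value = value;
            this.window = window;
        }
    }

    // Step 1: group by key only; each value keeps its window.
    static Map<String, List<Element>> groupByKeyOnly(List<Element> input) {
        Map<String, List<Element>> byKey = new TreeMap<>();
        for (Element e : input) {
            byKey.computeIfAbsent(e.key, k -> new ArrayList<>()).add(e);
        }
        return byKey;
    }

    // Step 2: within each key, split the grouped values by window.
    static Map<String, Map<Long, List<Integer>>> groupAlsoByWindow(
            Map<String, List<Element>> byKey) {
        Map<String, Map<Long, List<Integer>>> result = new TreeMap<>();
        for (Map.Entry<String, List<Element>> entry : byKey.entrySet()) {
            Map<Long, List<Integer>> byWindow = new TreeMap<>();
            for (Element e : entry.getValue()) {
                byWindow.computeIfAbsent(e.window, w -> new ArrayList<>()).add(e.value);
            }
            result.put(entry.getKey(), byWindow);
        }
        return result;
    }

    // The full grouping is the two steps chained.
    static Map<String, Map<Long, List<Integer>>> groupByKey(List<Element> input) {
        return groupAlsoByWindow(groupByKeyOnly(input));
    }

    public static void main(String[] args) {
        List<Element> input = List.of(
            new Element("a", 1, 0), new Element("b", 2, 0),
            new Element("a", 3, 1), new Element("a", 4, 0));
        System.out.println(groupByKey(input)); // {a={0=[1, 4], 1=[3]}, b={0=[2]}}
    }
}
```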

Related Pages

Sources

  • Repo -- Apache Beam -- Source repository at runners/direct-java/.
