Implementation:Apache Beam DirectGraphVisitor

From Leeroopedia


| Field | Value |
| --- | --- |
| Implementation Name | DirectGraphVisitor |
| Overview | Concrete tool for building a DirectGraph execution graph from pipeline traversal, provided by the Direct Runner module. |
| Source | runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java |
| Implements | Principle:Apache_Beam_Pipeline_Graph_Construction |
| last_updated | 2026-02-09 04:00 GMT |

Code Reference

Source Location

| File | Lines | Description |
| --- | --- | --- |
| runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java | L53-161 | Full class: pipeline visitor that builds the execution graph |
| runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java | L70-78 | enterCompositeTransform(): enters composites, checks not finalized |
| runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java | L82-98 | leaveCompositeTransform(): finalizes graph on root node exit |
| runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java | L101-129 | visitPrimitiveTransform(): records transforms, step names, consumers |
| runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java | L132-137 | visitValue(): records PCollection producers |
| runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java | L156-160 | getGraph(): builds immutable DirectGraph |
| runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraph.java | L38-124 | DirectGraph: immutable execution graph with lookup methods |

Signature

// DirectGraphVisitor: PipelineVisitor that builds the execution graph
class DirectGraphVisitor extends PipelineVisitor.Defaults {

    // Enter composite transforms (validates not already finalized)
    @Override
    public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node)

    // Leave composite transforms (finalizes on root node exit)
    @Override
    public void leaveCompositeTransform(TransformHierarchy.Node node)

    // Visit primitive transforms: records step names, root transforms,
    // per-element consumers, all consumers, consumed views, view writers
    @Override
    public void visitPrimitiveTransform(TransformHierarchy.Node node)

    // Visit values: records PCollection producers
    @Override
    public void visitValue(PValue value, TransformHierarchy.Node producer)

    // Build the immutable DirectGraph (must be called after traversal)
    public DirectGraph getGraph()
}

// DirectGraph: immutable execution graph
class DirectGraph implements ExecutableGraph<AppliedPTransform<?, ?, ?>, PValue> {

    // Factory method
    static DirectGraph create(
        Map<PCollection<?>, AppliedPTransform<?, ?, ?>> producers,
        Map<PCollectionView<?>, AppliedPTransform<?, ?, ?>> viewWriters,
        ListMultimap<PInput, AppliedPTransform<?, ?, ?>> perElementConsumers,
        Set<AppliedPTransform<?, ?, ?>> rootTransforms,
        Map<AppliedPTransform<?, ?, ?>, String> stepNames)

    // Lookup methods
    AppliedPTransform<?, ?, ?> getProducer(PValue produced)
    Collection<PValue> getProduced(AppliedPTransform<?, ?, ?> producer)
    Collection<PValue> getPerElementInputs(AppliedPTransform<?, ?, ?> transform)
    List<AppliedPTransform<?, ?, ?>> getPerElementConsumers(PValue consumed)
    Set<AppliedPTransform<?, ?, ?>> getRootTransforms()
    Collection<AppliedPTransform<?, ?, ?>> getExecutables()
    String getStepName(AppliedPTransform<?, ?, ?> step)
    Set<PCollectionView<?>> getViews()
}

Import

import org.apache.beam.runners.direct.DirectGraphVisitor;
import org.apache.beam.runners.direct.DirectGraph;

I/O Contract

Inputs

| Parameter | Type | Description |
| --- | --- | --- |
| Pipeline with overridden transforms | Pipeline | A pipeline that has already had transform overrides applied via performRewrites(). The visitor is passed to pipeline.traverseTopologically(). |

Outputs

| Output | Type | Description |
| --- | --- | --- |
| Execution graph | DirectGraph | An immutable graph containing: root transforms (transforms with no inputs), step names (unique IDs like s0, s1), per-element consumers (transforms consuming each PCollection as main input), producers (transform producing each PCollection), and view writers (transforms materializing PCollectionViews). |
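
To make the shape of the output concrete, the following is a minimal sketch of an immutable graph with the same kinds of lookups. It is not Beam code: `ToyGraph` and its `example()` factory are hypothetical, transforms and PCollections are modeled as plain strings, and view writers are omitted for brevity.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for an immutable execution graph; every name is a plain string. */
final class ToyGraph {
    private final Map<String, String> producers;                 // collection -> producing transform
    private final Map<String, List<String>> perElementConsumers; // collection -> main-input consumers
    private final Set<String> rootTransforms;                    // transforms with no inputs
    private final Map<String, String> stepNames;                 // transform -> "s0", "s1", ...

    ToyGraph(
            Map<String, String> producers,
            Map<String, List<String>> perElementConsumers,
            Set<String> rootTransforms,
            Map<String, String> stepNames) {
        // Defensive copies: later changes to the caller's maps cannot alter the graph.
        this.producers = Map.copyOf(producers);
        Map<String, List<String>> copy = new HashMap<>();
        perElementConsumers.forEach((k, v) -> copy.put(k, List.copyOf(v)));
        this.perElementConsumers = Map.copyOf(copy);
        this.rootTransforms = Set.copyOf(rootTransforms);
        this.stepNames = Map.copyOf(stepNames);
    }

    String getProducer(String collection) {
        return producers.get(collection);
    }

    List<String> getPerElementConsumers(String collection) {
        return perElementConsumers.getOrDefault(collection, List.of());
    }

    Set<String> getRootTransforms() {
        return rootTransforms;
    }

    String getStepName(String transform) {
        return stepNames.get(transform);
    }

    /** Two-step example: Read produces "lines", ParDo consumes "lines" and produces "words". */
    static ToyGraph example() {
        return new ToyGraph(
                Map.of("lines", "Read", "words", "ParDo"),
                Map.of("lines", List.of("ParDo")),
                Set.of("Read"),
                Map.of("Read", "s0", "ParDo", "s1"));
    }
}
```

In this sketch, `ToyGraph.example().getPerElementConsumers("lines")` yields the single consumer `ParDo`, while a collection nobody consumes, such as `"words"`, yields an empty list rather than null.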

Usage Examples

Graph Construction in DirectRunner.run()

// Inside DirectRunner.run() at L179-196
DirectGraphVisitor graphVisitor = new DirectGraphVisitor();
pipeline.traverseTopologically(graphVisitor);

DirectGraph graph = graphVisitor.getGraph();

// The graph is now used to set up the execution context
EvaluationContext context = EvaluationContext.create(
    clockSupplier.get(),
    Enforcement.bundleFactoryFor(enabledEnforcements, graph),
    graph,
    keyedPValueVisitor.getKeyedPValues(),
    metricsPool);

How visitPrimitiveTransform Works

// For each primitive transform in the pipeline:
@Override
public void visitPrimitiveTransform(TransformHierarchy.Node node) {
    AppliedPTransform<?, ?, ?> appliedTransform = getAppliedTransform(node);

    // Assign a unique step name (s0, s1, s2, ...)
    stepNames.put(appliedTransform, genStepName());

    if (node.getInputs().isEmpty()) {
        // No inputs -> this is a root transform (e.g., Read, Create)
        rootTransforms.add(appliedTransform);
    } else {
        // Record per-element consumers (main inputs, excluding side inputs)
        Collection<PValue> mainInputs =
            TransformInputs.nonAdditionalInputs(node.toAppliedPTransform(getPipeline()));
        for (PValue value : mainInputs) {
            perElementConsumers.put(value, appliedTransform);
        }
        // Record all consumers (including side inputs)
        for (PValue value : node.getInputs().values()) {
            allConsumers.put(value, appliedTransform);
        }
    }

    // Track side input views and view writers
    if (node.getTransform() instanceof ParDo.MultiOutput) {
        consumedViews.addAll(
            ((ParDo.MultiOutput<?, ?>) node.getTransform()).getSideInputs().values());
    } else if (node.getTransform() instanceof WriteView) {
        viewWriters.put(
            ((WriteView) node.getTransform()).getView(),
            node.toAppliedPTransform(getPipeline()));
    }
}
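
The bookkeeping above can be tried out without a Beam dependency. The following is a simplified sketch under stated assumptions: `ToyPrimitiveRecorder` is a hypothetical class, transforms and inputs are plain strings, and main inputs and side inputs are passed in as two separate lists instead of being derived by `TransformInputs.nonAdditionalInputs()`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical recorder that mirrors the order of bookkeeping in the excerpt above. */
final class ToyPrimitiveRecorder {
    final Map<String, String> stepNames = new LinkedHashMap<>();
    final Set<String> rootTransforms = new LinkedHashSet<>();
    final Map<String, List<String>> perElementConsumers = new HashMap<>();
    final Map<String, List<String>> allConsumers = new HashMap<>();
    private int numTransforms = 0;

    /** Records one primitive transform together with its main and side inputs. */
    void visit(String transform, List<String> mainInputs, List<String> sideInputs) {
        // Step names are assigned in visit order: s0, s1, s2, ...
        stepNames.put(transform, "s" + numTransforms++);

        if (mainInputs.isEmpty() && sideInputs.isEmpty()) {
            // No inputs at all: the transform is a starting point of execution.
            rootTransforms.add(transform);
            return;
        }
        for (String input : mainInputs) {
            // Main inputs count as both per-element consumption and general consumption.
            perElementConsumers.computeIfAbsent(input, k -> new ArrayList<>()).add(transform);
            allConsumers.computeIfAbsent(input, k -> new ArrayList<>()).add(transform);
        }
        for (String input : sideInputs) {
            // Side inputs are consumed, but not element by element.
            allConsumers.computeIfAbsent(input, k -> new ArrayList<>()).add(transform);
        }
    }
}
```

Visiting `"Read"` with no inputs and then `"ParDo"` with main input `"lines"` and side input `"lookup"` leaves `"lookup"` out of `perElementConsumers` but present in `allConsumers`, which is the distinction the two maps exist to capture.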

Graph Validation on Finalization

When the visitor leaves the root composite node, it marks itself as finalized and checks that every consumed PCollectionView is written by some WriteView transform:

@Override
public void leaveCompositeTransform(TransformHierarchy.Node node) {
    if (node.isRootNode()) {
        finalized = true;
        checkState(
            viewWriters.keySet().containsAll(consumedViews),
            "All PCollectionViews that are consumed must be written by some WriteView PTransform");
    }
}
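
The `containsAll` check reduces to a set difference: any consumed view that has no writer is an error. The following sketch illustrates that idea with plain strings. `ToyViewCheck` and both method names are hypothetical, and it throws `IllegalStateException` directly instead of going through a `checkState` helper.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper showing the consumed-views versus view-writers consistency check. */
final class ToyViewCheck {
    /** Returns the consumed views that no writer produces; an empty result means consistent. */
    static Set<String> missingWriters(Set<String> consumedViews, Map<String, String> viewWriters) {
        Set<String> missing = new TreeSet<>(consumedViews);
        missing.removeAll(viewWriters.keySet());
        return missing;
    }

    /** Throws IllegalStateException if any consumed view has no writer. */
    static void validate(Set<String> consumedViews, Map<String, String> viewWriters) {
        Set<String> missing = missingWriters(consumedViews, viewWriters);
        if (!missing.isEmpty()) {
            throw new IllegalStateException("Consumed views without a writer: " + missing);
        }
    }
}
```

Reporting the missing views in the message, rather than only stating that the check failed, makes it easier to find which side input was never materialized.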

Internal Data Structures

| Field | Type | Purpose |
| --- | --- | --- |
| producers | Map<PCollection, AppliedPTransform> | Maps each PCollection to its producing transform |
| viewWriters | Map<PCollectionView, AppliedPTransform> | Maps each side input view to its WriteView transform |
| perElementConsumers | ListMultimap<PInput, AppliedPTransform> | Maps each PValue to transforms consuming it as main input |
| allConsumers | ListMultimap<PValue, AppliedPTransform> | Maps each PValue to every transform consuming it, including as a side input |
| rootTransforms | Set<AppliedPTransform> | Transforms with no inputs (execution starting points) |
| stepNames | Map<AppliedPTransform, String> | Unique step name per primitive transform |
| consumedViews | Set<PCollectionView> | Views referenced as side inputs by ParDo transforms |
| finalized | boolean | Guard: prevents traversal of a second pipeline with the same visitor |
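
The role of the finalized flag can be sketched as a single-use lifecycle. This is an illustration only: `ToySingleUseVisitor` and its methods are hypothetical, and the behavior of `getResult()` before traversal (throwing an exception) is an assumption modeled on the signature comment that getGraph() must be called after traversal.

```java
/** Hypothetical single-use visitor: one traversal, then read-only access to the result. */
final class ToySingleUseVisitor {
    private boolean finalized = false;
    private int visitedPrimitives = 0;

    /** Rejects any traversal that starts after the visitor has been finalized. */
    void enterComposite(String name) {
        if (finalized) {
            throw new IllegalStateException("Visitor already finalized; cannot enter " + name);
        }
    }

    void visitPrimitive(String name) {
        visitedPrimitives++;
    }

    /** Leaving the root node ends the traversal and freezes the visitor. */
    void leaveComposite(String name, boolean isRoot) {
        if (isRoot) {
            finalized = true;
        }
    }

    /** Assumption: the result is only available once traversal has completed. */
    int getResult() {
        if (!finalized) {
            throw new IllegalStateException("Traversal has not completed");
        }
        return visitedPrimitives;
    }
}
```

A visitor built this way cannot mix state from two pipelines, at the cost of needing a fresh instance per traversal.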

Related Pages

Sources

  • Repo -- Apache Beam -- Source repository at runners/direct-java/.
