Implementation:Apache Beam DirectGraphVisitor
| Field | Value |
|---|---|
| Implementation Name | DirectGraphVisitor |
| Overview | Concrete tool for building a DirectGraph execution graph from pipeline traversal, provided by the Direct Runner module. |
| Source | runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java |
| Implements | Principle:Apache_Beam_Pipeline_Graph_Construction |
| last_updated | 2026-02-09 04:00 GMT |
Code Reference
Source Location
| File | Lines | Description |
|---|---|---|
| runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java | L53-161 | Full class: pipeline visitor that builds the execution graph |
| runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java | L70-78 | enterCompositeTransform(): enters composites, checks not finalized |
| runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java | L82-98 | leaveCompositeTransform(): finalizes graph on root node exit |
| runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java | L101-129 | visitPrimitiveTransform(): records transforms, step names, consumers |
| runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java | L132-137 | visitValue(): records PCollection producers |
| runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java | L156-160 | getGraph(): builds immutable DirectGraph |
| runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraph.java | L38-124 | DirectGraph: immutable execution graph with lookup methods |
Signature
```java
// DirectGraphVisitor: PipelineVisitor that builds the execution graph
class DirectGraphVisitor extends PipelineVisitor.Defaults {
  // Enter composite transforms (validates not already finalized)
  @Override
  public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node)

  // Leave composite transforms (finalizes on root node exit)
  @Override
  public void leaveCompositeTransform(TransformHierarchy.Node node)

  // Visit primitive transforms: records step names, root transforms,
  // per-element consumers, all consumers, consumed views, view writers
  @Override
  public void visitPrimitiveTransform(TransformHierarchy.Node node)

  // Visit values: records PCollection producers
  @Override
  public void visitValue(PValue value, TransformHierarchy.Node producer)

  // Build the immutable DirectGraph (must be called after traversal)
  public DirectGraph getGraph()
}

// DirectGraph: immutable execution graph
class DirectGraph implements ExecutableGraph<AppliedPTransform<?, ?, ?>, PValue> {
  // Factory method
  static DirectGraph create(
      Map<PCollection<?>, AppliedPTransform<?, ?, ?>> producers,
      Map<PCollectionView<?>, AppliedPTransform<?, ?, ?>> viewWriters,
      ListMultimap<PInput, AppliedPTransform<?, ?, ?>> perElementConsumers,
      Set<AppliedPTransform<?, ?, ?>> rootTransforms,
      Map<AppliedPTransform<?, ?, ?>, String> stepNames)

  // Lookup methods
  AppliedPTransform<?, ?, ?> getProducer(PValue produced)
  Collection<PValue> getProduced(AppliedPTransform<?, ?, ?> producer)
  Collection<PValue> getPerElementInputs(AppliedPTransform<?, ?, ?> transform)
  List<AppliedPTransform<?, ?, ?>> getPerElementConsumers(PValue consumed)
  Set<AppliedPTransform<?, ?, ?>> getRootTransforms()
  Collection<AppliedPTransform<?, ?, ?>> getExecutables()
  String getStepName(AppliedPTransform<?, ?, ?> step)
  Set<PCollectionView<?>> getViews()
}
```
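The order of visitor callbacks matters because the graph is finalized in leaveCompositeTransform() for the root node, which is the last callback of a traversal. The sketch below models that ordering with hypothetical SketchNode and CallbackOrder classes, not Beam's TransformHierarchy. It is simplified under stated assumptions: children are visited in declaration order (a real topological traversal also visits the producers of a node's inputs first), and each output value is reported only once, to the first node that reports it, so a composite's output ends up attributed to the primitive inside it.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for TransformHierarchy.Node: a node with children is a composite.
final class SketchNode {
  final String name;
  final List<SketchNode> children = new ArrayList<>();
  final List<String> outputs = new ArrayList<>();

  SketchNode(String name) {
    this.name = name;
  }

  SketchNode add(SketchNode child) {
    children.add(child);
    return this;
  }

  SketchNode output(String pcollection) {
    outputs.add(pcollection);
    return this;
  }

  boolean isComposite() {
    return !children.isEmpty();
  }
}

// Records the callback sequence a PipelineVisitor-style visitor would observe.
final class CallbackOrder {
  final List<String> events = new ArrayList<>();
  private final Set<String> visitedValues = new HashSet<>();

  void traverse(SketchNode node) {
    if (node.isComposite()) {
      events.add("enterComposite:" + node.name);
      for (SketchNode child : node.children) {
        traverse(child);
      }
      events.add("leaveComposite:" + node.name);
    } else {
      events.add("visitPrimitive:" + node.name);
    }
    // Each value is reported once, attributed to the first node that reports it.
    for (String value : node.outputs) {
      if (visitedValues.add(value)) {
        events.add("visitValue:" + value + "<-" + node.name);
      }
    }
  }
}
```

With a root containing a Read primitive and a composite wrapping a ParDo, the last recorded event is leaveComposite for the root, which is exactly the point at which DirectGraphVisitor sets finalized.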
Import
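Both classes are package-private (neither is declared public, as the signatures show), so they are accessible only from code in the org.apache.beam.runners.direct package, where these imports are legal but redundant:
```java
import org.apache.beam.runners.direct.DirectGraphVisitor;
import org.apache.beam.runners.direct.DirectGraph;
```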
I/O Contract
Inputs
| Parameter | Type | Description |
|---|---|---|
| Pipeline with overridden transforms | Pipeline | A pipeline that has already had transform overrides applied via performRewrites(). The visitor is passed to pipeline.traverseTopologically(). |
Outputs
| Output | Type | Description |
|---|---|---|
| Execution graph | DirectGraph | An immutable graph containing: root transforms (transforms with no inputs), step names (unique IDs such as s0, s1), per-element consumers (the transforms consuming each PCollection as a main input), producers (the transform producing each PCollection), and view writers (the transforms materializing each PCollectionView). |
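One common way to make a graph like this immutable in plain Java is to copy the visitor's mutable collections into unmodifiable ones when the graph is constructed. The hypothetical ImmutableGraphSketch below illustrates that approach with strings standing in for AppliedPTransform and PValue; it is a sketch, not DirectGraph's actual implementation. Returning an empty list for an unconsumed PCollection matches how a Guava ListMultimap's get() behaves for a missing key.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, Beam-free model of an immutable execution graph.
final class ImmutableGraphSketch {
  private final Map<String, String> producers;                 // PCollection -> producing transform
  private final Map<String, List<String>> perElementConsumers; // PCollection -> main-input consumers
  private final Set<String> rootTransforms;                    // transforms with no inputs
  private final Map<String, String> stepNames;                 // transform -> "s0", "s1", ...

  ImmutableGraphSketch(
      Map<String, String> producers,
      Map<String, List<String>> perElementConsumers,
      Set<String> rootTransforms,
      Map<String, String> stepNames) {
    // Defensive copies wrapped as unmodifiable views: later changes to the
    // caller's collections cannot leak into the graph.
    this.producers = Collections.unmodifiableMap(new HashMap<>(producers));
    Map<String, List<String>> consumers = new HashMap<>();
    for (Map.Entry<String, List<String>> e : perElementConsumers.entrySet()) {
      consumers.put(e.getKey(), Collections.unmodifiableList(new ArrayList<>(e.getValue())));
    }
    this.perElementConsumers = Collections.unmodifiableMap(consumers);
    this.rootTransforms = Collections.unmodifiableSet(new HashSet<>(rootTransforms));
    this.stepNames = Collections.unmodifiableMap(new HashMap<>(stepNames));
  }

  String getProducer(String pcollection) {
    return producers.get(pcollection);
  }

  // Unknown or unconsumed PCollections yield an empty list rather than null.
  List<String> getPerElementConsumers(String pcollection) {
    return perElementConsumers.getOrDefault(pcollection, Collections.emptyList());
  }

  Set<String> getRootTransforms() {
    return rootTransforms;
  }

  String getStepName(String transform) {
    return stepNames.get(transform);
  }
}
```

Copying once at construction keeps every lookup method free of synchronization concerns, since nothing can mutate the graph after it is built.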
Usage Examples
Graph Construction in DirectRunner.run()
```java
// Inside DirectRunner.run() at L179-196
DirectGraphVisitor graphVisitor = new DirectGraphVisitor();
pipeline.traverseTopologically(graphVisitor);
DirectGraph graph = graphVisitor.getGraph();

// The graph is now used to set up the execution context.
// clockSupplier, enabledEnforcements, keyedPValueVisitor, and metricsPool
// are set up elsewhere in DirectRunner and are not shown here.
EvaluationContext context = EvaluationContext.create(
    clockSupplier.get(),
    Enforcement.bundleFactoryFor(enabledEnforcements, graph),
    graph,
    keyedPValueVisitor.getKeyedPValues(),
    metricsPool);
```
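Once built, the lookups let a caller walk the graph from its starting points. The sketch below is a hypothetical breadth-first walk over string identifiers that follows transform -> produced PCollection -> per-element consumer edges. It only illustrates what getRootTransforms(), getProduced(), and getPerElementConsumers() make possible; it is not how the DirectRunner actually schedules work.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, Beam-free model: strings stand in for AppliedPTransform and PValue.
final class DownstreamWalkSketch {
  private final Map<String, List<String>> produced = new HashMap<>();            // transform -> outputs
  private final Map<String, List<String>> perElementConsumers = new HashMap<>(); // PCollection -> consumers

  void produces(String transform, String pcollection) {
    produced.computeIfAbsent(transform, k -> new ArrayList<>()).add(pcollection);
  }

  void consumesAsMainInput(String pcollection, String transform) {
    perElementConsumers.computeIfAbsent(pcollection, k -> new ArrayList<>()).add(transform);
  }

  // Breadth-first order starting from the roots; each transform is listed once.
  List<String> reachableFrom(Collection<String> roots) {
    Set<String> seen = new LinkedHashSet<>(roots);
    Deque<String> queue = new ArrayDeque<>(roots);
    while (!queue.isEmpty()) {
      String transform = queue.removeFirst();
      for (String pc : produced.getOrDefault(transform, Collections.emptyList())) {
        for (String consumer : perElementConsumers.getOrDefault(pc, Collections.emptyList())) {
          if (seen.add(consumer)) {
            queue.addLast(consumer);
          }
        }
      }
    }
    return new ArrayList<>(seen);
  }
}
```

Side-input edges are deliberately not followed, mirroring the main-input-only per-element consumer map.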
How visitPrimitiveTransform Works
```java
// For each primitive transform in the pipeline:
@Override
public void visitPrimitiveTransform(TransformHierarchy.Node node) {
  AppliedPTransform<?, ?, ?> appliedTransform = getAppliedTransform(node);
  // Assign a unique step name (s0, s1, s2, ...)
  stepNames.put(appliedTransform, genStepName());
  if (node.getInputs().isEmpty()) {
    // No inputs -> this is a root transform (e.g., Read, Create)
    rootTransforms.add(appliedTransform);
  } else {
    // Record per-element consumers (main inputs, excluding side inputs)
    Collection<PValue> mainInputs =
        TransformInputs.nonAdditionalInputs(node.toAppliedPTransform(getPipeline()));
    for (PValue value : mainInputs) {
      perElementConsumers.put(value, appliedTransform);
    }
    // Record all consumers (including side inputs)
    for (PValue value : node.getInputs().values()) {
      allConsumers.put(value, appliedTransform);
    }
  }
  // Track side input views and view writers
  if (node.getTransform() instanceof ParDo.MultiOutput) {
    consumedViews.addAll(
        ((ParDo.MultiOutput<?, ?>) node.getTransform()).getSideInputs().values());
  } else if (node.getTransform() instanceof WriteView) {
    viewWriters.put(
        ((WriteView) node.getTransform()).getView(),
        node.toAppliedPTransform(getPipeline()));
  }
}
```
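The bookkeeping above can be isolated from Beam's types. The sketch below uses hypothetical PrimitiveSketch and PrimitiveRecorder classes with strings for transforms and PCollections, and omits the view tracking. It shows the three rules the method applies: step names follow visit order, a transform with no inputs at all is a root, and side inputs count toward all consumers but not toward per-element consumers.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for a primitive transform: a name plus main and side inputs.
final class PrimitiveSketch {
  final String name;
  final List<String> mainInputs;
  final List<String> sideInputs;

  PrimitiveSketch(String name, List<String> mainInputs, List<String> sideInputs) {
    this.name = name;
    this.mainInputs = mainInputs;
    this.sideInputs = sideInputs;
  }
}

// Mirrors the consumer and root bookkeeping of visitPrimitiveTransform(), minus views.
final class PrimitiveRecorder {
  final Map<String, String> stepNames = new LinkedHashMap<>();
  final Set<String> rootTransforms = new LinkedHashSet<>();
  final Map<String, List<String>> perElementConsumers = new HashMap<>();
  final Map<String, List<String>> allConsumers = new HashMap<>();
  private int numTransforms = 0;

  void visitPrimitive(PrimitiveSketch t) {
    // Step names are assigned in visit order: s0, s1, s2, ...
    stepNames.put(t.name, "s" + numTransforms++);
    if (t.mainInputs.isEmpty() && t.sideInputs.isEmpty()) {
      rootTransforms.add(t.name); // no inputs at all: an execution starting point
      return;
    }
    // Main inputs only: these drive per-element processing.
    for (String pc : t.mainInputs) {
      perElementConsumers.computeIfAbsent(pc, k -> new ArrayList<>()).add(t.name);
    }
    // Main and side inputs together.
    List<String> every = new ArrayList<>(t.mainInputs);
    every.addAll(t.sideInputs);
    for (String pc : every) {
      allConsumers.computeIfAbsent(pc, k -> new ArrayList<>()).add(t.name);
    }
  }
}
```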
Graph Validation on Finalization
When the visitor leaves the root composite node, it validates that all consumed PCollectionView objects have a corresponding WriteView transform:
```java
@Override
public void leaveCompositeTransform(TransformHierarchy.Node node) {
  if (node.isRootNode()) {
    finalized = true;
    checkState(
        viewWriters.keySet().containsAll(consumedViews),
        "All PCollectionViews that are consumed must be written by some WriteView PTransform");
  }
}
```
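The check is a subset test: the set of consumed views must be contained in the key set of viewWriters. A minimal sketch of the same test with strings standing in for PCollectionView, using a hypothetical ViewCheckSketch class; reporting the missing views via a set difference is a choice of this sketch, added to make failures easier to diagnose.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the finalization check: every consumed view must have a writer.
final class ViewCheckSketch {
  static void checkAllViewsWritten(Set<String> writtenViews, Set<String> consumedViews) {
    if (!writtenViews.containsAll(consumedViews)) {
      Set<String> missing = new HashSet<>(consumedViews);
      missing.removeAll(writtenViews); // set difference: consumed but never written
      throw new IllegalStateException(
          "All PCollectionViews that are consumed must be written by some WriteView PTransform;"
              + " missing " + missing);
    }
  }
}
```

An empty consumedViews set always passes, so pipelines without side inputs are unaffected by the check.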
Internal Data Structures
| Field | Type | Purpose |
|---|---|---|
| producers | Map<PCollection, AppliedPTransform> | Maps each PCollection to its producing transform |
| viewWriters | Map<PCollectionView, AppliedPTransform> | Maps each side input view to its WriteView transform |
| perElementConsumers | ListMultimap<PInput, AppliedPTransform> | Maps each PValue to the transforms consuming it as a main input |
| allConsumers | ListMultimap<PValue, AppliedPTransform> | Maps each PValue to every transform consuming it, including as a side input; not passed to DirectGraph.create() |
| rootTransforms | Set<AppliedPTransform> | Transforms with no inputs (execution starting points) |
| stepNames | Map<AppliedPTransform, String> | Unique step name per primitive transform |
| consumedViews | Set<PCollectionView> | Views referenced as side inputs by ParDo transforms |
| finalized | boolean | Set when traversal leaves the root node; guards against traversing a second pipeline with the same visitor, and getGraph() may only be called once it is set |
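The finalized flag makes the visitor single-use. A minimal sketch of that lifecycle, using a hypothetical OneShotVisitorSketch class whose getGraph() returns a primitive count as a stand-in for building a real graph:

```java
// Hypothetical sketch of a single-use visitor guarded by a `finalized` flag.
final class OneShotVisitorSketch {
  private boolean finalized = false;
  private int primitives = 0;

  private void checkNotFinalized() {
    if (finalized) {
      throw new IllegalStateException("visitor already finalized; use a new visitor per traversal");
    }
  }

  void enterComposite() {
    checkNotFinalized();
  }

  void visitPrimitive() {
    checkNotFinalized();
    primitives++;
  }

  void leaveComposite(boolean isRootNode) {
    checkNotFinalized();
    if (isRootNode) {
      finalized = true; // leaving the root is the last callback of a traversal
    }
  }

  // Stand-in for getGraph(): only valid once traversal has completed.
  int getGraph() {
    if (!finalized) {
      throw new IllegalStateException("traversal not complete; cannot build the graph yet");
    }
    return primitives;
  }
}
```

Requiring a fresh visitor per pipeline keeps state recorded during one traversal from leaking into the graph of another.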
Related Pages
- Principle:Apache_Beam_Pipeline_Graph_Construction -- The principle of building an immutable execution graph from pipeline traversal.
- Environment:Apache_Beam_Java_Build_Environment -- Java build environment with JDK 8+, Gradle, and multi-language toolchain.
Sources
- Repo -- Apache Beam -- Source repository at runners/direct-java/.