Implementation:Iterative Dvc Plan Repro
| Knowledge Sources | |
|---|---|
| Domains | Pipeline_Management, Graph_Algorithms |
| Last Updated | 2026-02-10 00:00 GMT |
Overview
Concrete tool, provided by the DVC library, for deriving the execution plan of a pipeline reproduction by performing a DFS post-order traversal on a filtered dependency subgraph.
Description
The plan_repro() function in DVC's dvc.repo.reproduce module computes the ordered list of stages that need to be evaluated during pipeline reproduction. It takes the dependency graph (with frozen stages already disconnected by get_active_graph()), extracts the relevant subgraph based on target stages and mode flags (pipeline/downstream), and returns a DFS post-order traversal of that subgraph.
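plan_repro() does two things: it restricts the graph to the nodes relevant to the targets, then orders those nodes in post-order. Both steps can be sketched without DVC or networkx.

This is a minimal, upstream-only sketch under two assumptions:
- The graph is a plain dict mapping each stage name to the stages it depends on, so edges point from a stage to its dependencies.
- `sketch_plan_repro` and `postorder` are hypothetical names, not DVC's API.

```python
from typing import Optional

# Hypothetical stand-in for a networkx.DiGraph: stage -> stages it depends on.
Graph = dict[str, list[str]]


def postorder(graph: Graph, sources: list[str]) -> list[str]:
    """DFS post-order: a node is emitted only after all of its dependencies."""
    visited: set[str] = set()
    order: list[str] = []

    def visit(node: str) -> None:
        visited.add(node)
        for dep in graph[node]:
            if dep not in visited:
                visit(dep)
        order.append(node)

    for src in sources:
        if src not in visited:
            visit(src)
    return order


def sketch_plan_repro(graph: Graph, stages: Optional[list[str]] = None) -> list[str]:
    """Upstream-only sketch of plan_repro(): select a subgraph, then post-order it."""
    if not stages:
        return postorder(graph, list(graph))  # no targets: traverse the whole graph
    keep = set(postorder(graph, stages))  # targets plus everything they depend on
    sub = {n: [d for d in deps if d in keep] for n, deps in graph.items() if n in keep}
    return postorder(sub, list(sub))


# The example pipeline from the plan_repro() docstring.
graph = {"E": ["D", "F"], "D": ["B", "C"], "F": ["G"],
         "B": ["A"], "C": ["A"], "G": [], "A": []}
print(sketch_plan_repro(graph, ["D"]))  # ['A', 'B', 'C', 'D']
print(sketch_plan_repro(graph))         # ['A', 'B', 'C', 'D', 'G', 'F', 'E']
```

Because edges point at dependencies, the post-order places every stage after the stages it needs.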
Three helpers are involved:
- get_active_graph() creates a copy of the dependency graph in which frozen stages have their outgoing edges removed, disconnecting them from their dependencies. The caller applies it before invoking plan_repro().
- get_subgraph() extracts the portion of the graph relevant to the requested targets. It supports upstream (default), downstream, and whole-pipeline selection.
- networkx.dfs_postorder_nodes() produces the final execution ordering over that subgraph.
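The effect of get_active_graph() on planning can be shown with a toy graph. The sketch makes these assumptions:
- It uses the same hypothetical dict representation (stage -> dependencies).
- A hypothetical `frozen` set stands in for DVC's per-stage frozen flag.
- `sketch_active_graph` and `upstream` are illustrative names, not DVC functions.

```python
# Hypothetical dict-based graph: stage -> list of stages it depends on.
Graph = dict[str, list[str]]


def sketch_active_graph(graph: Graph, frozen: set[str]) -> Graph:
    """Copy `graph`, dropping the outgoing (stage -> dependency) edges of frozen stages."""
    return {node: ([] if node in frozen else list(deps)) for node, deps in graph.items()}


def upstream(graph: Graph, target: str) -> set[str]:
    """All stages reachable from `target` by following dependency edges."""
    seen, stack = {target}, [target]
    while stack:
        for dep in graph[stack.pop()]:
            if dep not in seen:
                seen.add(dep)
                stack.append(dep)
    return seen


pipeline = {"train": ["featurize"], "featurize": ["prepare"], "prepare": []}
active = sketch_active_graph(pipeline, frozen={"featurize"})
# Freezing "featurize" keeps the stage itself but cuts it off from "prepare".
print(sorted(upstream(pipeline, "train")))  # ['featurize', 'prepare', 'train']
print(sorted(upstream(active, "train")))    # ['featurize', 'train']
```

The frozen stage and its former dependency both stay in the copied graph as nodes. Only the connecting edge disappears, so upstream planning stops at the frozen stage.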
In the main reproduce() entry point, these pieces are combined in three steps:
1. get_active_graph() is applied to self.index.graph.
2. plan_repro() derives the ordered list of stages from the resulting active graph.
3. That list is passed to the _reproduce() loop, which executes the stages in sequence.
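The reproduce() flow (active graph, then plan, then a sequential loop) can be sketched end to end. This is a simplified illustration, not DVC's implementation:
- It uses the hypothetical dict graph and a hypothetical `frozen` set.
- It traverses directly from the targets instead of building a subgraph first.
- A caller-supplied `run_stage` callback replaces the real per-stage logic of _reproduce().

```python
from typing import Callable, Optional

Graph = dict[str, list[str]]  # hypothetical: stage -> stages it depends on


def active_graph(graph: Graph, frozen: set[str]) -> Graph:
    # Frozen stages lose their edges to their dependencies.
    return {n: ([] if n in frozen else list(deps)) for n, deps in graph.items()}


def plan(graph: Graph, targets: Optional[list[str]] = None) -> list[str]:
    # DFS post-order from the targets (or from every node if there are no targets).
    visited: set[str] = set()
    order: list[str] = []

    def visit(node: str) -> None:
        visited.add(node)
        for dep in graph[node]:
            if dep not in visited:
                visit(dep)
        order.append(node)

    for node in targets or list(graph):
        if node not in visited:
            visit(node)
    return order


def reproduce(graph: Graph, targets: list[str], frozen: set[str],
              run_stage: Callable[[str], None]) -> list[str]:
    """Active graph -> plan -> run each planned stage in order."""
    steps = plan(active_graph(graph, frozen), targets)
    for stage in steps:
        run_stage(stage)  # hypothetical stand-in for the per-stage work
    return steps


pipeline = {"evaluate": ["train"], "train": ["featurize"],
            "featurize": ["prepare"], "prepare": []}
ran: list[str] = []
steps = reproduce(pipeline, ["evaluate"], frozen={"featurize"}, run_stage=ran.append)
print(steps)  # ['featurize', 'train', 'evaluate']
```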
Usage
Use plan_repro() when you need to:
- Determine which stages need to run and in what order for a given set of targets.
- Support partial reproduction (only targets and their upstream dependencies).
- Support downstream reproduction (a target plus all stages that depend on it).
- Support full pipeline reproduction (all stages in the same weakly connected component as the target).
- Respect frozen stages by pre-filtering the graph with get_active_graph().
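The three selection modes listed above can be compared on a toy graph. This sketch only reports which stages each mode keeps. get_subgraph() itself returns a DiGraph that plan_repro() then post-orders.

The dict representation and the names `reachable`, `reverse`, and `select_nodes` are hypothetical. In this sketch, `pipeline` takes precedence over `downstream`.

```python
from typing import Optional

Graph = dict[str, list[str]]  # hypothetical: stage -> stages it depends on


def reachable(graph: Graph, sources: list[str]) -> set[str]:
    """The sources plus every node reachable from them along graph edges."""
    seen, stack = set(sources), list(sources)
    while stack:
        for nxt in graph[stack.pop()]:
            if nxt not in seen:
                seen.add(nxt)
                stack.append(nxt)
    return seen


def reverse(graph: Graph) -> Graph:
    """Flip every edge, so each stage maps to the stages that depend on it."""
    rev: Graph = {node: [] for node in graph}
    for node, deps in graph.items():
        for dep in deps:
            rev[dep].append(node)
    return rev


def select_nodes(graph: Graph, nodes: Optional[list[str]] = None,
                 pipeline: bool = False, downstream: bool = False) -> set[str]:
    """Stages kept by each mode (a sketch, not DVC's get_subgraph())."""
    if not nodes:
        return set(graph)  # no targets: everything
    if pipeline:
        # Weakly connected component: follow edges in both directions.
        rev = reverse(graph)
        both = {n: graph[n] + rev[n] for n in graph}
        return reachable(both, nodes)
    # Upstream follows dependency edges; downstream follows them backwards.
    return reachable(reverse(graph) if downstream else graph, nodes)


g = {"prepare": [], "featurize": ["prepare"], "train": ["featurize"],
     "evaluate": ["train"], "plots": ["featurize"], "lint": []}
print(sorted(select_nodes(g, ["train"])))                   # ['featurize', 'prepare', 'train']
print(sorted(select_nodes(g, ["train"], downstream=True)))  # ['evaluate', 'train']
print(sorted(select_nodes(g, ["train"], pipeline=True)))
# ['evaluate', 'featurize', 'plots', 'prepare', 'train']
```

`plots` is kept only in pipeline mode. It shares `featurize` with `train` but is neither upstream nor downstream of it. `lint` has no connection to `train`, so no mode selects it.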
Code Reference
Source Location
- Repository: DVC
- File: dvc/repo/reproduce.py
- Lines: L65-109 (plan_repro), L56-62 (get_active_graph), L38-53 (get_subgraph)
Signature
```python
from typing import TYPE_CHECKING, Optional, TypeVar

if TYPE_CHECKING:
    from networkx import DiGraph

T = TypeVar("T")  # stage type; used only in annotations


def plan_repro(
    graph: "DiGraph",
    stages: Optional[list["T"]] = None,
    pipeline: bool = False,
    downstream: bool = False,
) -> list["T"]:
    r"""Derive the evaluation of the given node for the given graph.

    When you _reproduce a stage_, you want to _evaluate the descendants_
    to know if it makes sense to _recompute_ it. A post-ordered search
    will give us an ordered list of the nodes we want.

    For example, let's say that we have the following pipeline:

               E
              / \
             D   F
            / \   \
           B   C   G
            \ /
             A

    The derived evaluation of D would be: [A, B, C, D]
    """
    ...


def get_active_graph(graph: "DiGraph") -> "DiGraph":
    """Return graph copy with frozen stage edges removed."""
    ...


def get_subgraph(
    graph: "DiGraph",
    nodes: Optional[list] = None,
    pipeline: bool = False,
    downstream: bool = False,
) -> "DiGraph":
    """Extract relevant subgraph based on target nodes and mode."""
    ...
```
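The docstring asks for a post-ordered search rather than a pre-ordered one. The sketch below shows why: it runs a plain depth-first search over the docstring's example pipeline and records both visit orders. The dict representation and the `dfs` helper are illustrative assumptions; DVC itself uses networkx.dfs_postorder_nodes().

```python
# The docstring's example pipeline; edges point from a stage to its dependencies.
graph = {"E": ["D", "F"], "D": ["B", "C"], "F": ["G"],
         "B": ["A"], "C": ["A"], "G": [], "A": []}


def dfs(graph: dict[str, list[str]], source: str) -> tuple[list[str], list[str]]:
    """Return (pre-order, post-order) of a depth-first search from `source`."""
    pre: list[str] = []
    post: list[str] = []
    visited: set[str] = set()

    def visit(node: str) -> None:
        visited.add(node)
        pre.append(node)   # recorded on entry, before any dependency
        for dep in graph[node]:
            if dep not in visited:
                visit(dep)
        post.append(node)  # recorded on exit, after all dependencies

    visit(source)
    return pre, post


pre, post = dfs(graph, "D")
print(pre)   # ['D', 'B', 'A', 'C']: D would come before its inputs
print(post)  # ['A', 'B', 'C', 'D']: matches the docstring's derived evaluation
```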
Import
```python
from dvc.repo.reproduce import plan_repro, get_active_graph, get_subgraph
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| graph | networkx.DiGraph | Yes | The pipeline dependency graph (typically from get_active_graph(repo.index.graph)) |
| stages | Optional[list[Stage]] | No | Target stages to reproduce; if None, the entire graph is traversed |
| pipeline | bool | No | If True (default False), include the entire weakly connected component(s) containing the target stages |
| downstream | bool | No | If True (default False), include the targets plus the stages that depend on them (downstream propagation) instead of their dependencies (upstream) |
Outputs
| Name | Type | Description |
|---|---|---|
| execution_plan | list[Stage] | Ordered list of stages in DFS post-order: dependencies appear before dependents, ready for sequential execution |
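The output contract requires dependencies to appear before dependents, and this can be checked mechanically. The sketch assumes the hypothetical dict representation used for illustration (stage -> dependencies). `check_plan` is not part of DVC.

```python
Graph = dict[str, list[str]]  # hypothetical: stage -> stages it depends on


def check_plan(graph: Graph, plan: list[str]) -> bool:
    """True if every stage's in-plan dependencies appear earlier in the plan."""
    position = {stage: i for i, stage in enumerate(plan)}
    return all(
        position[dep] < position[stage]
        for stage in plan
        for dep in graph[stage]
        if dep in position  # dependencies outside the plan (e.g. cut off) are ignored
    )


pipeline = {"prepare": [], "featurize": ["prepare"], "train": ["featurize"]}
print(check_plan(pipeline, ["prepare", "featurize", "train"]))  # True
print(check_plan(pipeline, ["train", "featurize", "prepare"]))  # False
```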
Usage Examples
Basic Usage
```python
from dvc.repo import Repo
from dvc.repo.reproduce import collect_stages, get_active_graph, plan_repro

repo = Repo(".")

# Collect target stages
targets = collect_stages(repo, ["train"], recursive=False, glob=False)

# Get the active graph (frozen stages disconnected)
graph = get_active_graph(repo.index.graph)

# Plan reproduction: "train" plus its upstream dependencies
plan = plan_repro(graph, stages=targets)
for stage in plan:
    print(f"Will evaluate: {stage.addressing}")
# Example order for a prepare -> featurize -> train pipeline:
# prepare, featurize, train
```
Downstream Propagation
```python
# Continues the basic example (reuses `repo` and the imports).
# Plan downstream: "prepare" plus every stage that depends on it
targets = collect_stages(repo, ["prepare"], recursive=False, glob=False)
graph = get_active_graph(repo.index.graph)
plan = plan_repro(graph, stages=targets, downstream=True)
for stage in plan:
    print(f"Affected stage: {stage.addressing}")
# Example order for a prepare -> featurize -> train -> evaluate pipeline:
# prepare, featurize, train, evaluate
```
Full Pipeline Mode
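```python
# Plan the entire pipeline containing the target stage
# (reuses `graph` and `targets` from the previous example)
plan = plan_repro(graph, stages=targets, pipeline=True)
# Returns all stages in the target's weakly connected component, in DFS post-order
```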