Implementation:Iterative Dvc Stage Run
| Knowledge Sources | |
|---|---|
| Domains | Pipeline_Management, Process_Execution |
| Last Updated | 2026-02-10 00:00 GMT |
Overview
Concrete tool for executing pipeline stage commands as isolated subprocesses with run-cache deduplication, shell environment management, and signal handling, provided by the DVC library.
Description
The stage execution system in DVC spans two modules: Stage.run() in dvc/stage/__init__.py orchestrates the high-level execution flow (output removal, import handling, command execution, output saving, and committing), while run_stage() and cmd_run() in dvc/stage/run.py handle the low-level details of run-cache lookup and subprocess management.
Stage.run() first removes the existing outputs of command and import stages (unless the stage is frozen or this is a dry run). It then dispatches on stage type: import stages trigger sync_import(), command stages trigger _run_stage() (which delegates to run_stage()), and data sources or frozen stages simply verify that their outputs exist. After a non-dry execution, Stage.save() records the current checksums of all dependencies and outputs, computes the stage's MD5, and, unless no_commit is set, writes an entry to the run-cache. If no_commit is not set, Stage.commit() then transfers output data into the DVC cache.
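The dispatch and save/commit sequence described above can be traced with a small, self-contained sketch. StageSketch is a hypothetical stand-in, not DVC's Stage class. It only records which step would run. It leaves out partial imports, allow_missing/no_download handling, the rwlocked decorator, and any further conditions DVC may apply before writing a run-cache entry.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class StageSketch:
    """Hypothetical stand-in for a stage: records the steps run() would take."""

    cmd: str = ""
    is_import: bool = False
    frozen: bool = False
    steps: List[str] = field(default_factory=list)

    def run(self, dry: bool = False, no_commit: bool = False, force: bool = False) -> List[str]:
        # 1. Outputs of unfrozen command and import stages are removed first.
        if (self.cmd or self.is_import) and not self.frozen and not dry:
            self.steps.append("remove_outs")

        # 2. Dispatch on stage type.
        if self.is_import and not self.frozen:
            self.steps.append("sync_import")
        elif self.cmd and not self.frozen:
            self.steps.append(f"run_stage(force={force})")
        elif not dry:
            self.steps.append("verify_outputs")  # data source or frozen stage

        # 3. A dry run stops here: nothing is saved or committed.
        if dry:
            return self.steps

        # 4. Record checksums; no run-cache entry is written when no_commit is set.
        self.steps.append(f"save(run_cache={not no_commit})")
        # 5. Move output data into the cache unless no_commit is set.
        if not no_commit:
            self.steps.append("commit")
        return self.steps
```

For a plain command stage, StageSketch(cmd="python train.py").run() yields remove_outs, run_stage, save, and commit in that order. A frozen stage only verifies its outputs before saving.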
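Unless force is set, run_stage() first attempts to restore the stage from the run-cache via stage.repo.stage_cache.restore(). If a matching entry is found and its outputs are available, the command is not executed at all. The subprocess runs only on a cache miss or when force=True.
cmd_run() prepares the subprocess environment: the working directory (the stage's wdir) and the environment variables, including DVC_ROOT and DVC_STAGE plus anything passed in run_env. On POSIX systems it takes the shell executable from the SHELL environment variable (falling back to /bin/sh) and adds shell-specific flags (for example --noprofile --norc for bash) that stop the shell from loading its startup files. It then runs each command line in turn via subprocess.Popen; a non-zero exit code raises StageCmdFailedError. While the child process runs, the parent ignores SIGINT (only when called from the main thread). A Ctrl+C is therefore handled by the child while the parent keeps waiting for it to exit, and the previous handler is restored afterwards.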
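A minimal sketch of the low-level command execution, assuming a POSIX system. get_executable, make_cmd, run_one, and the SHELL_OPTS table are hypothetical names. The flag table mirrors the bash/zsh behaviour described above but should be treated as an assumption, and RuntimeError stands in for DVC's own StageCmdFailedError.

```python
import os
import signal
import subprocess
import threading

# Assumed per-shell flags that stop the shell from reading rc/profile files.
SHELL_OPTS = {"bash": ["--noprofile", "--norc"], "zsh": ["--no-rcs"]}


def get_executable():
    """Pick the user's shell from $SHELL on POSIX; None means 'use shell=True'."""
    if os.name == "nt":
        return None
    return os.getenv("SHELL") or "/bin/sh"


def make_cmd(executable, cmd):
    """Build the argv for one command line, adding flags for known shells."""
    if executable is None:
        return cmd  # Windows: let Popen(shell=True) interpret the string
    name = os.path.basename(executable).lower()
    return [executable, *SHELL_OPTS.get(name, []), "-c", cmd]


def run_one(executable, cmd, **popen_kwargs):
    """Run one command; ignore SIGINT in the parent while the child runs."""
    on_main_thread = threading.current_thread() is threading.main_thread()
    old_handler = None
    proc = subprocess.Popen(
        make_cmd(executable, cmd), shell=executable is None, **popen_kwargs
    )
    try:
        # signal.signal() may only be called from the main thread.
        if on_main_thread:
            old_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
        proc.communicate()  # wait; Ctrl+C is delivered to the child
    finally:
        if old_handler is not None:
            signal.signal(signal.SIGINT, old_handler)  # restore previous handler
    if proc.returncode != 0:
        raise RuntimeError(f"command {cmd!r} exited with {proc.returncode}")
```

Building the argv explicitly, instead of passing shell=True, runs the command under the user's own shell rather than the /bin/sh that Popen uses by default on POSIX.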
The unlocked_repo decorator is applied to cmd_run during non-dry execution, temporarily releasing the DVC repo lock so that user scripts can invoke DVC commands (such as dvc pull) without deadlocking.
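The lock-release pattern can be sketched as a decorator that unlocks before the call and re-locks in a finally block, so the lock is re-acquired even when the command fails. RepoLockSketch, unlocked, and user_script are hypothetical. DVC's real decorator is understood to take the stage and work with stage.repo.lock; this sketch simplifies that to passing the lock directly.

```python
import functools
import threading


class RepoLockSketch:
    """Hypothetical repo lock with explicit lock()/unlock() calls."""

    def __init__(self):
        self._lock = threading.Lock()
        self.is_locked = False

    def lock(self):
        self._lock.acquire()
        self.is_locked = True

    def unlock(self):
        if not self.is_locked:
            raise RuntimeError("unlock called on an unlocked lock")
        self.is_locked = False
        self._lock.release()

    def is_free(self):
        """Return True if another caller could take the lock right now."""
        if self._lock.acquire(blocking=False):
            self._lock.release()
            return True
        return False


def unlocked(func):
    """Release the lock around the call and re-acquire it afterwards."""
    @functools.wraps(func)
    def wrapper(lock, *args, **kwargs):
        lock.unlock()
        try:
            return func(lock, *args, **kwargs)
        finally:
            lock.lock()  # re-acquired even if the command raised
    return wrapper


@unlocked
def user_script(lock):
    # Stands in for a stage command that itself runs e.g. `dvc pull`,
    # which would need to take the same lock.
    return lock.is_free()
```

Calling user_script on a held lock returns True, because the lock is free while the command runs, and the lock is held again once the call returns.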
Usage
Use Stage.run() and run_stage() when you need to:
- Execute a pipeline stage's command with full environment setup and error handling.
- Leverage run-cache to skip redundant executions.
- Handle subprocess lifecycle including signal management on the main thread.
- Support dry-run mode for previewing commands without execution.
- Manage the output save/commit cycle after successful execution.
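The run-cache behaviour mentioned above can be illustrated with a hypothetical in-memory cache keyed on the command plus dependency checksums. RunCacheSketch and run_stage_sketch are invented for illustration. DVC's run-cache is stored on disk, uses its own hashing, and writes entries from Stage.save() rather than from the execution function. Treat this only as a model of the skip-on-hit and force behaviour.

```python
import hashlib
import json


class RunCacheSketch:
    """Hypothetical in-memory run-cache: stage signature -> recorded outputs."""

    def __init__(self):
        self._entries = {}

    @staticmethod
    def key(cmd, dep_hashes):
        # Signature = command + dependency checksums (sorted for determinism).
        payload = json.dumps({"cmd": cmd, "deps": sorted(dep_hashes.items())})
        return hashlib.sha256(payload.encode()).hexdigest()

    def restore(self, cmd, dep_hashes):
        return self._entries.get(self.key(cmd, dep_hashes))

    def save(self, cmd, dep_hashes, outs):
        self._entries[self.key(cmd, dep_hashes)] = outs


def run_stage_sketch(cache, cmd, dep_hashes, execute, force=False):
    """Return outputs, calling execute(cmd) only on a cache miss or when force=True."""
    if not force:
        cached = cache.restore(cmd, dep_hashes)
        if cached is not None:
            return cached  # cache hit: the command is never run
    outs = execute(cmd)
    cache.save(cmd, dep_hashes, outs)  # simplified: DVC saves from Stage.save()
    return outs
```

Running the same command twice with unchanged dependency checksums executes it once. Changing any checksum, or passing force=True, triggers a fresh execution.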
Code Reference
Source Location
- Repository: DVC
- File: dvc/stage/__init__.py, Lines: L585-624 (Stage.run)
- File: dvc/stage/run.py, Lines: L166-182 (run_stage), L139-153 (cmd_run), L116-136 (_run subprocess helper)
Signature
```python
# dvc/stage/__init__.py
class Stage:
    @rwlocked(read=["deps", "outs"])
    def run(
        self,
        dry: bool = False,
        no_commit: bool = False,
        force: bool = False,
        allow_missing: bool = False,
        no_download: bool = False,
        **kwargs,
    ) -> None:
        ...


# dvc/stage/run.py
def run_stage(
    stage: "Stage",
    dry: bool = False,
    force: bool = False,
    run_env: Optional[dict] = None,
    **kwargs,
) -> None:
    ...


def cmd_run(
    stage: "Stage",
    dry: bool = False,
    run_env: Optional[dict] = None,
) -> None:
    ...
```
Import
```python
from dvc.stage import Stage
from dvc.stage.run import run_stage, cmd_run
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| stage | Stage | Yes | The pipeline stage to execute, containing cmd, deps, outs, wdir, and repo reference |
| dry | bool | No | If True, only display commands without executing them (default: False) |
| force | bool | No | If True, skip run-cache lookup and force re-execution (default: False) |
| no_commit | bool | No | If True, skip committing outputs to DVC cache after execution (default: False) |
| allow_missing | bool | No | If True, tolerate missing outputs during save (default: False) |
| no_download | bool | No | If True, skip downloading data for import stages and only record output metadata; implies allow_missing (default: False) |
| run_env | Optional[dict] | No | Additional environment variables to set during subprocess execution |
Outputs
| Name | Type | Description |
|---|---|---|
| (side effect) | None | Stage command executed via subprocess (or restored from run-cache); output files produced on disk; Output.save() records checksums; Output.commit() transfers data to the cache and a run-cache entry is created (both skipped when no_commit is set) |
Usage Examples
Basic Usage
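```python
from dvc.repo import Repo

repo = Repo(".")
# Get a stage by name
stage = repo.stage.collect("train")[0]

# Stage.run() expects the repo lock to be held: during a real run the
# unlocked_repo decorator releases and re-acquires it around the command
with repo.lock:
    # Dry run: preview what would happen
    stage.run(dry=True)
    # Output: Running stage 'train':
    # > python train.py --lr 0.001

    # Actual execution
    stage.run(force=True)
    # Removes old outputs, runs the command, saves deps/outs, commits to cache
```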
Run with Custom Environment
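```python
from dvc.stage.run import run_stage

# `repo` and `stage` obtained as in Basic Usage. run_stage() only executes the
# command (or restores it from run-cache); it does not save or commit outputs.
with repo.lock:
    run_stage(
        stage,
        dry=False,
        force=True,
        run_env={"CUDA_VISIBLE_DEVICES": "0", "MLFLOW_TRACKING_URI": "http://localhost:5000"},
    )
```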
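How run_env combines with the inherited environment can be sketched as follows. prepare_popen_kwargs is a hypothetical helper. Three of its behaviours are illustrative assumptions rather than details taken from DVC's source: run_env overrides inherited variables, DVC_ROOT is only filled in when absent, and DVC_STAGE holds the stage name.

```python
import os


def prepare_popen_kwargs(wdir, repo_root, stage_name, run_env=None, base_env=None):
    """Hypothetical helper: build subprocess.Popen kwargs for one stage command."""
    # Copy the environment so os.environ itself is never mutated.
    env = dict(os.environ if base_env is None else base_env)
    if run_env:
        env.update(run_env)  # assumed: caller-supplied values win
    env.setdefault("DVC_ROOT", repo_root)  # assumed: only set when absent
    env["DVC_STAGE"] = stage_name  # assumed value: the stage's name
    return {"cwd": wdir, "env": env, "close_fds": True}
```

Values in run_env should be strings, since subprocess rejects non-string environment values such as integers.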
Skip Cache and Commit
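```python
# Force re-execution (no run-cache lookup) and skip committing outputs to the
# cache; with no_commit, no run-cache entry is written either
with repo.lock:
    stage.run(force=True, no_commit=True)
# Useful during development when iterating quickly
```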