Implementation:Datajuicer Data juicer PipelineDAG
| Knowledge Sources | |
|---|---|
| Domains | Data_Processing, Core |
| Last Updated | 2026-02-14 16:00 GMT |
Overview
Concrete tool, provided by Data-Juicer, for tracking pipeline execution state with a directed acyclic graph (DAG).
Description
PipelineDAG is the core directed acyclic graph (DAG) data structure for representing pipeline execution state. It covers node management, dependency tracking, serialization, and visualization. DAGNodeStatus is an enum defining the four node states: PENDING, RUNNING, COMPLETED, and FAILED. The DAG stores nodes as dictionaries (matching the strategy output format) and provides methods to mark status transitions (started, completed, failed), query ready nodes (those whose dependencies have all completed), save and load execution plans as JSON, and generate a text-based visualization of the DAG structure. The execution plan is persisted to the work directory so that an interrupted run can be resumed.
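The "ready node" rule described above can be sketched independently of the library. This is a minimal illustration, not PipelineDAG's actual implementation: the helper name `ready_nodes` is hypothetical, and it assumes each node dictionary carries `status` and `dependencies` keys as in the usage example on this page.

```python
from typing import Any, Dict, List


def ready_nodes(nodes: Dict[str, Dict[str, Any]]) -> List[str]:
    """Return IDs of pending nodes whose dependencies are all completed.

    Hypothetical helper: it mirrors the rule described in the text,
    not the library's own code.
    """
    ready = []
    for node_id, node in nodes.items():
        if node.get("status", "pending") != "pending":
            continue  # already running, completed, or failed
        deps = node.get("dependencies", [])
        # A missing dependency is treated as "not completed".
        if all(nodes.get(d, {}).get("status") == "completed" for d in deps):
            ready.append(node_id)
    return ready


nodes = {
    "op_0": {"node_id": "op_0", "dependencies": [], "status": "completed"},
    "op_1": {"node_id": "op_1", "dependencies": ["op_0"], "status": "pending"},
    "op_2": {"node_id": "op_2", "dependencies": ["op_1"], "status": "pending"},
}
print(ready_nodes(nodes))  # ['op_1']
```

Only `op_1` is reported: `op_0` has already finished, and `op_2` still waits on `op_1`.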
Usage
Use when building or managing pipeline execution plans that require dependency-aware execution ordering, state tracking, and the ability to resume from a previously saved execution plan.
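To illustrate the resume scenario, the following sketch persists a plan as JSON and picks up where a previous run stopped. It is written under stated assumptions: `save_plan`, `load_plan`, and `run_pending` are hypothetical helpers, and the `{"nodes": ...}` file layout is invented for the example. The actual on-disk format written by `save_execution_plan` may differ.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Dict, List

Nodes = Dict[str, Dict[str, Any]]
PLAN_FILE = "dag_execution_plan.json"  # same default name as in the signature


def save_plan(nodes: Nodes, work_dir: str, filename: str = PLAN_FILE) -> str:
    """Write the node dictionaries to JSON (assumed layout) and return the path."""
    path = Path(work_dir) / filename
    path.write_text(json.dumps({"nodes": nodes}, indent=2))
    return str(path)


def load_plan(work_dir: str, filename: str = PLAN_FILE) -> Nodes:
    """Read the plan back; return an empty dict if no plan was saved."""
    path = Path(work_dir) / filename
    if not path.exists():
        return {}
    return json.loads(path.read_text())["nodes"]


def run_pending(nodes: Nodes) -> List[str]:
    """Run nodes in execution order, skipping those already completed."""
    executed = []
    for node_id in sorted(nodes, key=lambda n: nodes[n]["execution_order"]):
        if nodes[node_id]["status"] == "completed":
            continue
        nodes[node_id]["status"] = "completed"  # stand-in for the real work
        executed.append(node_id)
    return executed


with tempfile.TemporaryDirectory() as work_dir:
    # State left behind by an interrupted run: op_0 finished, op_1 did not.
    interrupted = {
        "op_0": {"dependencies": [], "execution_order": 0, "status": "completed"},
        "op_1": {"dependencies": ["op_0"], "execution_order": 1, "status": "pending"},
    }
    save_plan(interrupted, work_dir)
    restored = load_plan(work_dir)
    resumed = run_pending(restored)

print(resumed)  # ['op_1']
```

On resume only `op_1` is executed, because the saved plan already records `op_0` as completed.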
Code Reference
Source Location
- Repository: Datajuicer_Data_juicer
- File: data_juicer/core/executor/pipeline_dag.py
Signature
class DAGNodeStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"

class PipelineDAG:
    def __init__(self, work_dir: str):
    def save_execution_plan(self, filename: str = "dag_execution_plan.json") -> str:
    def load_execution_plan(self, filename: str = "dag_execution_plan.json") -> bool:
    def mark_node_started(self, node_id: str) -> None:
    def mark_node_completed(self, node_id: str, duration: float = None) -> None:
    def mark_node_failed(self, node_id: str, error_message: str) -> None:
    def get_node_status(self, node_id: str) -> DAGNodeStatus:
    def get_ready_nodes(self) -> List[str]:
    def get_execution_summary(self) -> Dict[str, Any]:
    def visualize(self) -> str:
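The four statuses suggest a simple state machine: PENDING to RUNNING, then RUNNING to either COMPLETED or FAILED. The sketch below makes that reading explicit. It is an assumption for illustration only: `Status`, `ALLOWED`, and `transition` are hypothetical names, and this page does not state whether PipelineDAG itself rejects invalid transitions.

```python
from enum import Enum
from typing import Any, Dict


class Status(Enum):
    """Hypothetical mirror of DAGNodeStatus, redefined to keep the sketch standalone."""
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"


# Assumed transition table; COMPLETED and FAILED are treated as terminal.
ALLOWED = {
    Status.PENDING: {Status.RUNNING},
    Status.RUNNING: {Status.COMPLETED, Status.FAILED},
    Status.COMPLETED: set(),
    Status.FAILED: set(),
}


def transition(node: Dict[str, Any], new: Status) -> None:
    """Move a node dictionary to a new status, rejecting disallowed moves."""
    current = Status(node["status"])
    if new not in ALLOWED[current]:
        raise ValueError(f"cannot go from {current.value} to {new.value}")
    node["status"] = new.value  # stored as a plain string, as in the node dicts


node = {"node_id": "op_0", "status": "pending"}
transition(node, Status.RUNNING)
transition(node, Status.COMPLETED)
print(node["status"])  # completed

try:
    transition(node, Status.RUNNING)  # a completed node cannot restart here
except ValueError as exc:
    print(f"rejected: {exc}")
```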
Import
from data_juicer.core.executor.pipeline_dag import PipelineDAG, DAGNodeStatus
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| work_dir | str | Yes | Working directory for storing DAG execution plans |
| node_id | str | Yes (for node ops) | Unique identifier for a DAG node |
| filename | str | No | Name of the JSON file for saving/loading execution plans. Default: "dag_execution_plan.json" |
| duration | float | No | Actual duration of node execution in seconds |
| error_message | str | Yes (for mark_node_failed) | Error message describing the failure |
Outputs
| Name | Type | Description |
|---|---|---|
| execution_plan_path | str | Path to the saved execution plan JSON file (from save_execution_plan) |
| ready_nodes | List[str] | List of node IDs whose dependencies are all completed (from get_ready_nodes) |
| summary | Dict[str, Any] | Execution summary with counts of completed, failed, running, pending nodes and total duration |
| visualization | str | Text-based visualization of the DAG structure (from visualize) |
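As a rough picture of what the summary output contains, the sketch below aggregates status counts and total duration from node dictionaries. The function `summarize`, the `actual_duration` field, and the result key names are assumptions made for this example. The keys actually returned by `get_execution_summary` are not listed on this page.

```python
from collections import Counter
from typing import Any, Dict


def summarize(nodes: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
    """Count nodes per status and add up recorded durations (hypothetical helper)."""
    counts = Counter(node.get("status", "pending") for node in nodes.values())
    # Nodes without a recorded duration contribute 0 seconds.
    total = sum(node.get("actual_duration") or 0.0 for node in nodes.values())
    return {
        "total_nodes": len(nodes),
        "completed": counts["completed"],
        "failed": counts["failed"],
        "running": counts["running"],
        "pending": counts["pending"],
        "total_duration": total,
    }


nodes = {
    "op_0": {"status": "completed", "actual_duration": 5.5},
    "op_1": {"status": "completed", "actual_duration": 1.25},
    "op_2": {"status": "failed", "actual_duration": None},
    "op_3": {"status": "pending"},
}
summary = summarize(nodes)
print(summary["completed"], summary["failed"], summary["total_duration"])  # 2 1 6.75
```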
Usage Examples
from data_juicer.core.executor.pipeline_dag import PipelineDAG, DAGNodeStatus
# Create a pipeline DAG
dag = PipelineDAG(work_dir="/path/to/work_dir")
# Add nodes (typically done by execution strategies)
dag.nodes["op_0"] = {
    "node_id": "op_0",
    "operation_name": "text_length_filter",
    "dependencies": [],
    "execution_order": 0,
    "status": "pending",
}
# Track execution state
dag.mark_node_started("op_0")
dag.mark_node_completed("op_0", duration=5.2)
# Save the plan to work_dir, then reload it
dag.save_execution_plan()
dag.load_execution_plan()
# Visualize
print(dag.visualize())
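The methods from the signature can be combined into a simple driver loop. The sketch below is one possible arrangement rather than Data-Juicer's executor: `run_dag` and `run_node` are hypothetical, and `FakeDAG` is an in-memory stand-in that only borrows the method names so the example runs without the library. It assumes `get_ready_nodes` returns only nodes that have not yet started.

```python
import time
from typing import Any, Callable, Dict, List


def run_dag(dag: Any, run_node: Callable[[str], None]) -> None:
    """Run ready nodes until none remain, recording each outcome on the DAG."""
    while True:
        ready = dag.get_ready_nodes()
        if not ready:
            break  # everything finished, or remaining nodes are blocked
        for node_id in ready:
            dag.mark_node_started(node_id)
            start = time.perf_counter()
            try:
                run_node(node_id)
            except Exception as exc:
                dag.mark_node_failed(node_id, str(exc))
            else:
                dag.mark_node_completed(node_id, duration=time.perf_counter() - start)


class FakeDAG:
    """Hypothetical stand-in exposing the same method names as PipelineDAG."""

    def __init__(self, nodes: Dict[str, Dict[str, Any]]):
        self.nodes = nodes

    def get_ready_nodes(self) -> List[str]:
        return [
            nid for nid, n in self.nodes.items()
            if n["status"] == "pending"
            and all(self.nodes[d]["status"] == "completed" for d in n["dependencies"])
        ]

    def mark_node_started(self, node_id: str) -> None:
        self.nodes[node_id]["status"] = "running"

    def mark_node_completed(self, node_id: str, duration: float = None) -> None:
        self.nodes[node_id]["status"] = "completed"

    def mark_node_failed(self, node_id: str, error_message: str) -> None:
        self.nodes[node_id]["status"] = "failed"


def run_node(node_id: str) -> None:
    """Pretend operator: op_1 fails, every other node succeeds."""
    if node_id == "op_1":
        raise RuntimeError("simulated failure")


dag = FakeDAG({
    "op_0": {"dependencies": [], "status": "pending"},
    "op_1": {"dependencies": ["op_0"], "status": "pending"},
    "op_2": {"dependencies": ["op_1"], "status": "pending"},
})
run_dag(dag, run_node)
statuses = {nid: n["status"] for nid, n in dag.nodes.items()}
print(statuses)  # {'op_0': 'completed', 'op_1': 'failed', 'op_2': 'pending'}
```

Because `op_1` fails, `op_2` never becomes ready and stays pending, so the loop ends without running it.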