Implementation:Datajuicer Data juicer PipelineDAG

From Leeroopedia
Knowledge Sources
Domains Data_Processing, Core
Last Updated 2026-02-14 16:00 GMT

Overview

Concrete tool, provided by Data-Juicer, for tracking pipeline execution state with a directed acyclic graph.

Description

PipelineDAG provides the core DAG (Directed Acyclic Graph) data structure for representing pipeline execution state. It covers node management, dependency tracking, serialization, and visualization. DAGNodeStatus is an enum that defines the states of the node state machine (PENDING, RUNNING, COMPLETED, FAILED). The DAG stores nodes as dictionaries (matching the strategy output format) and provides methods to mark node status transitions (started/completed/failed), query ready nodes (nodes whose dependencies have all completed), save and load execution plans as JSON, and generate text-based visualizations of the DAG structure. The DAG is persisted to the work directory so that an interrupted run can be resumed.
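The ready-node query described above can be illustrated with a small standalone sketch. It does not import Data-Juicer and is not the library's implementation: the `Status` enum and the `ready_nodes` helper are hypothetical stand-ins, and the rule that only pending nodes count as ready is an assumption.

```python
from enum import Enum
from typing import Any, Dict, List


class Status(Enum):
    """Hypothetical stand-in for DAGNodeStatus (values copied from this page)."""
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"


def ready_nodes(nodes: Dict[str, Dict[str, Any]]) -> List[str]:
    """Return IDs of pending nodes whose dependencies are all completed.

    Assumption: a node is 'ready' only while it is still pending.
    """
    ready = []
    for node_id, node in nodes.items():
        if node["status"] != Status.PENDING.value:
            continue
        deps = node.get("dependencies", [])
        if all(nodes[dep]["status"] == Status.COMPLETED.value for dep in deps):
            ready.append(node_id)
    return ready


# Nodes are plain dictionaries, as in the usage example on this page.
nodes = {
    "op_0": {"node_id": "op_0", "dependencies": [], "status": "pending"},
    "op_1": {"node_id": "op_1", "dependencies": ["op_0"], "status": "pending"},
}

first = ready_nodes(nodes)            # only op_0 has no unmet dependencies
nodes["op_0"]["status"] = Status.COMPLETED.value
second = ready_nodes(nodes)           # op_1 becomes ready once op_0 completes
```

Keeping the status as a plain string inside each dictionary, as this sketch does, makes the nodes directly JSON-serializable, which fits the save/load behaviour described above.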

Usage

Use when building or managing pipeline execution plans that require dependency-aware execution ordering, state tracking, and the ability to resume from a previously saved execution plan.
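As a rough illustration of resuming from a saved plan, the sketch below writes node dictionaries to a JSON file, reloads them, and runs only the nodes that are not yet completed, in dependency order. It is standalone and hedged: `save_plan`, `load_plan`, `run_remaining`, and the `{"nodes": ...}` file layout are assumptions made for this example, not the format PipelineDAG actually writes.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Dict, List

Nodes = Dict[str, Dict[str, Any]]


def save_plan(nodes: Nodes, path: Path) -> str:
    """Write the nodes to JSON (hypothetical layout) and return the file path."""
    path.write_text(json.dumps({"nodes": nodes}, indent=2))
    return str(path)


def load_plan(path: Path) -> Nodes:
    """Read the nodes back from the JSON file written by save_plan."""
    return json.loads(path.read_text())["nodes"]


def run_remaining(nodes: Nodes) -> List[str]:
    """Complete every unfinished node once its dependencies are completed."""
    order: List[str] = []
    progressed = True
    while progressed:
        progressed = False
        for node_id, node in nodes.items():
            if node["status"] == "completed":
                continue  # already done in a previous run
            if all(nodes[d]["status"] == "completed" for d in node["dependencies"]):
                # A real executor would run the operator here.
                node["status"] = "completed"
                order.append(node_id)
                progressed = True
    return order


with tempfile.TemporaryDirectory() as work_dir:
    plan_path = Path(work_dir) / "dag_execution_plan.json"
    nodes = {
        "op_0": {"dependencies": [], "status": "completed"},
        "op_1": {"dependencies": ["op_0"], "status": "pending"},
        "op_2": {"dependencies": ["op_1"], "status": "pending"},
    }
    save_plan(nodes, plan_path)
    resumed = load_plan(plan_path)       # simulate a fresh process reloading the plan
    executed = run_remaining(resumed)    # op_0 is skipped because it already completed
```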

Code Reference

Source Location

Signature

class DAGNodeStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"

class PipelineDAG:
    def __init__(self, work_dir: str):

    def save_execution_plan(self, filename: str = "dag_execution_plan.json") -> str:

    def load_execution_plan(self, filename: str = "dag_execution_plan.json") -> bool:

    def mark_node_started(self, node_id: str) -> None:

    def mark_node_completed(self, node_id: str, duration: float = None) -> None:

    def mark_node_failed(self, node_id: str, error_message: str) -> None:

    def get_node_status(self, node_id: str) -> DAGNodeStatus:

    def get_ready_nodes(self) -> List[str]:

    def get_execution_summary(self) -> Dict[str, Any]:

    def visualize(self) -> str:

Import

from data_juicer.core.executor.pipeline_dag import PipelineDAG, DAGNodeStatus

I/O Contract

Inputs

Name | Type | Required | Description
work_dir | str | Yes | Working directory for storing DAG execution plans
node_id | str | Yes (for node ops) | Unique identifier for a DAG node
filename | str | No | Name of the JSON file for saving/loading execution plans. Default: "dag_execution_plan.json"
duration | float | No | Actual duration of node execution in seconds
error_message | str | Yes (for mark_node_failed) | Error message describing the failure

Outputs

Name | Type | Description
execution_plan_path | str | Path to the saved execution plan JSON file (from save_execution_plan)
ready_nodes | List[str] | List of node IDs whose dependencies are all completed (from get_ready_nodes)
summary | Dict[str, Any] | Execution summary with counts of completed, failed, running, and pending nodes, plus total duration (from get_execution_summary)
visualization | str | Text-based visualization of the DAG structure (from visualize)
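To make the shape of the execution summary more concrete, here is a minimal sketch that derives per-status counts and a total duration from node dictionaries. The key names (`total_nodes`, `total_duration`, and so on) and the per-node `duration` field are hypothetical; the keys actually returned by get_execution_summary may differ.

```python
from collections import Counter
from typing import Any, Dict


def summarize(nodes: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
    """Count nodes per status and sum recorded durations (hypothetical keys)."""
    counts = Counter(node["status"] for node in nodes.values())
    # Nodes without a recorded duration contribute 0 seconds.
    total_duration = sum(node.get("duration") or 0.0 for node in nodes.values())
    return {
        "total_nodes": len(nodes),
        "completed": counts["completed"],
        "failed": counts["failed"],
        "running": counts["running"],
        "pending": counts["pending"],
        "total_duration": total_duration,
    }


nodes = {
    "op_0": {"status": "completed", "duration": 5.25},
    "op_1": {"status": "completed", "duration": 1.25},
    "op_2": {"status": "failed", "duration": None},
    "op_3": {"status": "pending"},
}
summary = summarize(nodes)
```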

Usage Examples

from data_juicer.core.executor.pipeline_dag import PipelineDAG, DAGNodeStatus

# Create a pipeline DAG
dag = PipelineDAG(work_dir="/path/to/work_dir")

# Add nodes (typically done by execution strategies)
dag.nodes["op_0"] = {
    "node_id": "op_0",
    "operation_name": "text_length_filter",
    "dependencies": [],
    "execution_order": 0,
    "status": "pending"
}

# Track execution state
dag.mark_node_started("op_0")
dag.mark_node_completed("op_0", duration=5.2)

# Save the execution plan, then reload it (load_execution_plan returns a bool)
dag.save_execution_plan()
dag.load_execution_plan()

# Visualize
print(dag.visualize())
