Jump to content

Connect Leeroopedia MCP: Equip your AI agents to search best practices, build plans, verify code, diagnose failures, and look up hyperparameter defaults.

Implementation:NVIDIA NeMo Curator WorkflowBase

From Leeroopedia
Knowledge Sources
Domains Data Curation, Pipeline Orchestration
Last Updated 2026-02-14 00:00 GMT

Overview

Defines the abstract WorkflowBase class and the WorkflowRunResult dataclass for structuring high-level multi-pipeline workflows and their outputs in NeMo Curator.

Description

This module provides the base abstraction for higher-level workflow orchestration that coordinates multiple pipelines and aggregates their results. It contains two key entities:

  • WorkflowRunResult -- A dataclass container returned by workflows to expose pipeline outputs. It stores a human-readable workflow name, a mapping of pipeline names to their output Task lists, and a free-form metadata dictionary. Helper methods include add_pipeline_tasks() for recording tasks emitted by a pipeline run, extend_metadata() and add_metadata() for updating the metadata dictionary, and get_metadata() for retrieval.
  • WorkflowBase -- An abstract base class (ABC) requiring subclasses to implement a run() method that returns a WorkflowRunResult. This provides a uniform interface for complex workflows such as fuzzy deduplication that orchestrate multiple pipeline stages.

Usage

Subclass WorkflowBase when building a high-level workflow that coordinates multiple pipelines. Implement the run() method to execute the pipelines and return a WorkflowRunResult containing the aggregated outputs and metadata. This pattern is used, for example, in the FuzzyDeduplicationWorkflow.

Code Reference

Source Location

  • Repository: NeMo-Curator
  • File: nemo_curator/pipeline/workflow.py
  • Lines: 1-57

Signature

@dataclass
class WorkflowRunResult:
    workflow_name: str
    pipeline_tasks: dict[str, list[Task]] = field(default_factory=dict)
    metadata: dict[str, Any] = field(default_factory=dict)

    def add_pipeline_tasks(self, pipeline_name: str, tasks: list[Task] | None) -> None: ...
    def extend_metadata(self, updates: dict[str, Any] | None = None) -> None: ...
    def add_metadata(self, key: str, value: Any) -> None: ...
    def get_metadata(self, key: str) -> Any: ...


class WorkflowBase(ABC):
    @abstractmethod
    def run(self, *args, **kwargs) -> WorkflowRunResult: ...

Import

from nemo_curator.pipeline.workflow import WorkflowBase, WorkflowRunResult

I/O Contract

Inputs

Name Type Required Description
*args Any No Positional arguments passed to the workflow's run() method (subclass-defined)
**kwargs Any No Keyword arguments passed to the workflow's run() method (subclass-defined)

Outputs

Name Type Description
result WorkflowRunResult Container with workflow_name, pipeline_tasks mapping, and metadata dictionary

WorkflowRunResult Fields

Name Type Description
workflow_name str Human-readable workflow identifier (e.g., "fuzzy_dedup")
pipeline_tasks dict[str, list[Task]] Mapping of pipeline names to the Task objects they produced
metadata dict[str, Any] Free-form dictionary for workflow-specific timing or counters

Usage Examples

Basic Usage

from nemo_curator.pipeline.workflow import WorkflowBase, WorkflowRunResult

class MyDeduplicationWorkflow(WorkflowBase):
    def run(self, input_path: str, output_path: str) -> WorkflowRunResult:
        result = WorkflowRunResult(workflow_name="my_dedup")

        # Execute pipelines and collect tasks
        tasks = self._run_minhash_pipeline(input_path)
        result.add_pipeline_tasks("minhash", tasks)

        # Store metadata
        result.add_metadata("input_path", input_path)
        result.add_metadata("output_path", output_path)

        return result

workflow = MyDeduplicationWorkflow()
result = workflow.run("/data/input", "/data/output")
print(result.workflow_name)       # "my_dedup"
print(result.get_metadata("input_path"))  # "/data/input"

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment