Overview
Defines the abstract WorkflowBase class and the WorkflowRunResult dataclass for structuring high-level multi-pipeline workflows and their outputs in NeMo Curator.
Description
This module provides the base abstraction for higher-level workflow orchestration that coordinates multiple pipelines and aggregates their results. It contains two key entities:
- WorkflowRunResult -- A dataclass container returned by workflows to expose pipeline outputs. It stores a human-readable workflow name, a mapping of pipeline names to their output Task lists, and a free-form metadata dictionary. Helper methods include add_pipeline_tasks() for recording tasks emitted by a pipeline run, extend_metadata() and add_metadata() for updating the metadata dictionary, and get_metadata() for retrieval.
- WorkflowBase -- An abstract base class (ABC) requiring subclasses to implement a
run() method that returns a WorkflowRunResult. This provides a uniform interface for complex workflows such as fuzzy deduplication that orchestrate multiple pipeline stages.
Usage
Subclass WorkflowBase when building a high-level workflow that coordinates multiple pipelines. Implement the run() method to execute the pipelines and return a WorkflowRunResult containing the aggregated outputs and metadata. This pattern is used, for example, in the FuzzyDeduplicationWorkflow.
Code Reference
Source Location
- Repository: NeMo-Curator
- File: nemo_curator/pipeline/workflow.py
- Lines: 1-57
Signature
@dataclass
class WorkflowRunResult:
    workflow_name: str
    pipeline_tasks: dict[str, list[Task]] = field(default_factory=dict)
    metadata: dict[str, Any] = field(default_factory=dict)

    def add_pipeline_tasks(self, pipeline_name: str, tasks: list[Task] | None) -> None: ...
    def extend_metadata(self, updates: dict[str, Any] | None = None) -> None: ...
    def add_metadata(self, key: str, value: Any) -> None: ...
    def get_metadata(self, key: str) -> Any: ...

class WorkflowBase(ABC):
    @abstractmethod
    def run(self, *args, **kwargs) -> WorkflowRunResult: ...
Import
from nemo_curator.pipeline.workflow import WorkflowBase, WorkflowRunResult
I/O Contract
Inputs
| Name | Type | Required | Description |
|------|------|----------|-------------|
| *args | Any | No | Positional arguments passed to the workflow's run() method (subclass-defined) |
| **kwargs | Any | No | Keyword arguments passed to the workflow's run() method (subclass-defined) |
Outputs
| Name | Type | Description |
|------|------|-------------|
| result | WorkflowRunResult | Container with workflow_name, pipeline_tasks mapping, and metadata dictionary |
WorkflowRunResult Fields
| Name | Type | Description |
|------|------|-------------|
| workflow_name | str | Human-readable workflow identifier (e.g., "fuzzy_dedup") |
| pipeline_tasks | dict[str, list[Task]] | Mapping of pipeline names to the Task objects they produced |
| metadata | dict[str, Any] | Free-form dictionary for workflow-specific timing or counters |
Usage Examples
Basic Usage
from nemo_curator.pipeline.workflow import WorkflowBase, WorkflowRunResult

class MyDeduplicationWorkflow(WorkflowBase):
    def run(self, input_path: str, output_path: str) -> WorkflowRunResult:
        result = WorkflowRunResult(workflow_name="my_dedup")

        # Execute pipelines and collect tasks
        tasks = self._run_minhash_pipeline(input_path)
        result.add_pipeline_tasks("minhash", tasks)

        # Store metadata
        result.add_metadata("input_path", input_path)
        result.add_metadata("output_path", output_path)
        return result

workflow = MyDeduplicationWorkflow()
result = workflow.run("/data/input", "/data/output")
print(result.workflow_name)              # "my_dedup"
print(result.get_metadata("input_path")) # "/data/input"
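Because WorkflowBase is an ABC with run() marked abstract, Python itself enforces the contract: a subclass that does not implement run() cannot be instantiated. The sketch below uses a local stand-in for WorkflowBase (the real class lives in nemo_curator.pipeline.workflow, and its run() returns a WorkflowRunResult rather than the plain dict used here for brevity).

```python
from abc import ABC, abstractmethod

# Local stand-in mirroring WorkflowBase's abstract interface.
class WorkflowBase(ABC):
    @abstractmethod
    def run(self, *args, **kwargs): ...

# Instantiating the ABC directly fails at construction time.
try:
    WorkflowBase()
except TypeError as exc:
    print(f"TypeError: {exc}")

# A subclass that implements run() can be instantiated and called.
class NoOpWorkflow(WorkflowBase):
    def run(self, *args, **kwargs):
        # Real workflows would return a WorkflowRunResult here.
        return {"workflow_name": "noop"}

print(NoOpWorkflow().run())  # {'workflow_name': 'noop'}
```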