Overview
Defines the abstract base classes and data structures behind NeMo Curator's execution backends: BaseExecutor runs pipelines, and BaseStageAdapter wraps processing stages with performance tracking.
Description
This module establishes the core abstraction layer for NeMo Curator's backend system. It contains four key components:
- NodeInfo -- A lightweight dataclass holding a node_id string, used to pass node-level context during setup calls across backends.
- WorkerMetadata -- A lightweight dataclass holding a worker_id string and a generic allocation field for backend-specific allocation information.
- BaseExecutor -- An abstract base class (ABC) for pipeline executors. It accepts an optional configuration dictionary and an ignore_head_node flag (which can also be set via the CURATOR_IGNORE_RAY_HEAD_NODE environment variable). Subclasses must implement the abstract execute() method to run a list of processing stages.
- BaseStageAdapter -- A concrete base class that wraps a ProcessingStage and adds automatic performance tracking. Its process_batch() method uses a StageTimer to measure processing time, log statistics, consume custom metrics recorded by the stage, and attach performance stats to each output task. It also delegates setup_on_node(), setup(), and teardown() lifecycle calls to the underlying stage.
All backend implementations (Xenna, Ray ActorPool, Ray Data) inherit from these base classes to share a common interface for stage execution and performance monitoring.
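The timing behaviour described for process_batch() can be illustrated with a minimal, self-contained sketch. It does not use the real BaseStageAdapter or StageTimer, whose internals are not shown on this page. SketchTask, UppercaseStage, SketchStageAdapter, and the perf_stats field are hypothetical stand-ins, and the sketch assumes that wall-clock time is measured around a single batch call and then attached to every output task.

```python
import time
from dataclasses import dataclass, field
from typing import Any


@dataclass
class SketchTask:
    """Hypothetical stand-in for a Curator Task; not the real class."""

    data: Any
    perf_stats: list = field(default_factory=list)


class UppercaseStage:
    """Hypothetical stage that upper-cases the payload of each task."""

    name = "uppercase"

    def process_batch(self, tasks):
        # Carry over any stats recorded by earlier stages.
        return [SketchTask(t.data.upper(), list(t.perf_stats)) for t in tasks]


class SketchStageAdapter:
    """Wraps a stage and records wall-clock time per batch (illustrative only)."""

    def __init__(self, stage):
        self.stage = stage

    def process_batch(self, tasks):
        start = time.perf_counter()
        results = self.stage.process_batch(tasks)
        elapsed = time.perf_counter() - start
        stats = {
            "stage": self.stage.name,
            "process_time_s": elapsed,
            "num_items": len(tasks),
        }
        # Attach a copy of the batch-level stats to every output task.
        for task in results:
            task.perf_stats.append(dict(stats))
        return results


adapter = SketchStageAdapter(UppercaseStage())
out = adapter.process_batch([SketchTask("a"), SketchTask("b")])
```

Keeping the timing in the adapter rather than in each stage means stage authors only write processing logic, which matches the division of responsibilities described above.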
Usage
These classes are not used directly by end users. They are extended by concrete backend implementations such as XennaExecutor, RayDataExecutor, and RayActorPoolStageAdapter. When building a custom backend, subclass BaseExecutor and implement execute(). When adapting a stage to a new backend, subclass BaseStageAdapter and override methods as needed.
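Because BaseExecutor is an ABC with an abstract execute(), Python refuses to instantiate a subclass that does not implement it. The sketch below shows that mechanism with a hypothetical stand-in, SketchBaseExecutor, that mirrors only the shape of BaseExecutor. IncompleteExecutor and LoggingExecutor are illustrative names and not part of NeMo Curator.

```python
from abc import ABC, abstractmethod
from typing import Any, Optional


class SketchBaseExecutor(ABC):
    """Hypothetical stand-in mirroring the shape of BaseExecutor."""

    def __init__(self, config: Optional[dict] = None) -> None:
        self.config = config or {}

    @abstractmethod
    def execute(self, stages: list, initial_tasks: Optional[list] = None) -> None:
        """Run the given stages; subclasses must implement this."""


class IncompleteExecutor(SketchBaseExecutor):
    """Forgets to implement execute(), so it cannot be instantiated."""


class LoggingExecutor(SketchBaseExecutor):
    """Implements execute() by recording the stages it was asked to run."""

    def __init__(self, config: Optional[dict] = None) -> None:
        super().__init__(config)
        self.seen: list = []

    def execute(self, stages: list, initial_tasks: Optional[list] = None) -> None:
        for stage in stages:
            self.seen.append(stage)


# Instantiating a subclass with an unimplemented abstract method raises TypeError.
try:
    IncompleteExecutor()
    instantiation_error: Any = None
except TypeError as exc:
    instantiation_error = exc

executor = LoggingExecutor({"mode": "debug"})
executor.execute(["stage_a", "stage_b"])
```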
Code Reference
Source Location
- Repository: NeMo-Curator
- File: nemo_curator/backends/base.py
- Lines: 1-122
Signature
@dataclass
class NodeInfo:
    node_id: str = ""

@dataclass
class WorkerMetadata:
    worker_id: str = ""
    allocation: Any = None

class BaseExecutor(ABC):
    def __init__(self, config: dict[str, Any] | None = None, ignore_head_node: bool = False): ...

    @abstractmethod
    def execute(self, stages: list["ProcessingStage"], initial_tasks: list[Task] | None = None) -> None: ...

class BaseStageAdapter:
    def __init__(self, stage: "ProcessingStage"): ...
    def process_batch(self, tasks: list[Task]) -> list[Task]: ...
    def setup_on_node(self, node_info: NodeInfo | None = None, worker_metadata: WorkerMetadata | None = None) -> None: ...
    def setup(self, worker_metadata: WorkerMetadata | None = None) -> None: ...
    def teardown(self) -> None: ...
Import
from nemo_curator.backends.base import BaseExecutor, BaseStageAdapter, NodeInfo, WorkerMetadata
I/O Contract
BaseExecutor
Inputs
| Name | Type | Required | Description |
| config | dict[str, Any] or None | No | Optional configuration dictionary for the executor |
| ignore_head_node | bool | No | Whether to ignore the Ray head node for scheduling (default False, also settable via CURATOR_IGNORE_RAY_HEAD_NODE env var) |
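One way a constructor flag and an environment variable can be combined is sketched below. This is an assumption-laden illustration and not the library's implementation: the helper name resolve_ignore_head_node, the set of accepted truthy spellings, and the rule that either source can enable the option are all hypothetical. Only the variable name CURATOR_IGNORE_RAY_HEAD_NODE comes from the table above.

```python
import os
from typing import Mapping, Optional

# Assumed set of spellings treated as "enabled"; the real parsing rules may differ.
_TRUTHY = {"1", "true", "yes", "on"}


def resolve_ignore_head_node(
    flag: bool = False, env: Optional[Mapping[str, str]] = None
) -> bool:
    """Return True if either the explicit flag or the env var enables the option."""
    env = os.environ if env is None else env
    raw = env.get("CURATOR_IGNORE_RAY_HEAD_NODE", "")
    return flag or raw.strip().lower() in _TRUTHY


# The env mapping is injectable so the behaviour can be checked without
# mutating the real process environment.
from_default = resolve_ignore_head_node(False, {})
from_flag = resolve_ignore_head_node(True, {})
from_env = resolve_ignore_head_node(False, {"CURATOR_IGNORE_RAY_HEAD_NODE": "1"})
from_env_off = resolve_ignore_head_node(False, {"CURATOR_IGNORE_RAY_HEAD_NODE": "0"})
```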
execute() Inputs
| Name | Type | Required | Description |
| stages | list[ProcessingStage] | Yes | List of processing stages to execute in order |
| initial_tasks | list[Task] or None | No | Optional initial tasks to feed into the first stage |
BaseStageAdapter
Inputs
| Name | Type | Required | Description |
| stage | ProcessingStage | Yes | The processing stage to wrap and adapt |
process_batch() Inputs
| Name | Type | Required | Description |
| tasks | list[Task] | Yes | List of tasks to process through the wrapped stage |
Outputs
| Name | Type | Description |
| process_batch return | list[Task] | Processed tasks with attached stage performance stats and custom metrics |
Usage Examples
Subclassing BaseExecutor
from nemo_curator.backends.base import BaseExecutor, BaseStageAdapter
from nemo_curator.stages.base import ProcessingStage
from nemo_curator.tasks import Task

class MyCustomExecutor(BaseExecutor):
    # Note: the base signature is annotated -> None; this example returns the final tasks.
    def execute(self, stages: list[ProcessingStage], initial_tasks: list[Task] | None = None) -> list[Task]:
        tasks = initial_tasks or []
        for stage in stages:
            # Wrap each stage so process_batch() records timing and metrics
            adapter = BaseStageAdapter(stage)
            tasks = adapter.process_batch(tasks)
        return tasks
Using BaseStageAdapter for Performance Tracking
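from nemo_curator.backends.base import BaseStageAdapter, NodeInfo, WorkerMetadata

# my_stage (a ProcessingStage) and tasks (a list[Task]) are assumed to be defined elsewhere.
# The IDs below are illustrative placeholders.
node_info = NodeInfo(node_id="node-0")
worker_metadata = WorkerMetadata(worker_id="worker-0")

# Wrap a processing stage to add performance tracking
adapter = BaseStageAdapter(my_stage)

# Setup lifecycle
adapter.setup_on_node(node_info, worker_metadata)
adapter.setup(worker_metadata)

# Process with automatic timing and metrics
results = adapter.process_batch(tasks)

# Teardown
adapter.teardown()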
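When driving the lifecycle by hand, as in the example above, a custom backend may want teardown to run even if processing fails. The sketch below shows one way to do that with try/finally. RecordingStage and run_with_lifecycle are hypothetical names used only for illustration, and this ordering guarantee is a design suggestion, not a documented behaviour of the built-in backends.

```python
from typing import Any, List


class RecordingStage:
    """Hypothetical adapter-like object that records lifecycle calls in order."""

    def __init__(self, fail: bool = False) -> None:
        self.calls: List[str] = []
        self.fail = fail

    def setup_on_node(self, node_info: Any = None, worker_metadata: Any = None) -> None:
        self.calls.append("setup_on_node")

    def setup(self, worker_metadata: Any = None) -> None:
        self.calls.append("setup")

    def process_batch(self, tasks: List[Any]) -> List[Any]:
        self.calls.append("process_batch")
        if self.fail:
            raise RuntimeError("simulated processing failure")
        return tasks

    def teardown(self) -> None:
        self.calls.append("teardown")


def run_with_lifecycle(adapter: Any, tasks: List[Any]) -> List[Any]:
    """Run setup, processing, and teardown; teardown runs even on failure."""
    adapter.setup_on_node()
    adapter.setup()
    try:
        return adapter.process_batch(tasks)
    finally:
        adapter.teardown()


ok_stage = RecordingStage()
ok_result = run_with_lifecycle(ok_stage, [1, 2, 3])

bad_stage = RecordingStage(fail=True)
try:
    run_with_lifecycle(bad_stage, [1])
    failure_seen = False
except RuntimeError:
    failure_seen = True
```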
Related Pages