Implementation:NVIDIA NeMo Curator Backend Base Classes

From Leeroopedia
Knowledge Sources
Domains: Backend Architecture, Pipeline Execution, Performance Monitoring
Last Updated: 2026-02-14 00:00 GMT

Overview

Defines the abstract base classes and data structures that form the execution backend framework for NeMo Curator, providing BaseExecutor for running pipelines and BaseStageAdapter for wrapping processing stages with performance tracking.

Description

This module establishes the core abstraction layer for NeMo Curator's backend system. It contains four key components:

  • NodeInfo -- A lightweight dataclass holding a node_id string, used to pass node-level context during setup calls across backends.
  • WorkerMetadata -- A lightweight dataclass holding a worker_id string and a generic allocation field for backend-specific allocation information.
  • BaseExecutor -- An abstract base class (ABC) for pipeline executors. It accepts an optional configuration dictionary and an ignore_head_node flag (which can also be set via the CURATOR_IGNORE_RAY_HEAD_NODE environment variable). Subclasses must implement the abstract execute() method to run a list of processing stages.
  • BaseStageAdapter -- A concrete base class that wraps a ProcessingStage and adds automatic performance tracking. Its process_batch() method times the wrapped stage with a StageTimer, logs statistics, consumes any custom metrics the stage recorded, and attaches performance stats to each output task. It also delegates the setup_on_node(), setup(), and teardown() lifecycle calls to the underlying stage.
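The wrap-and-time pattern described for process_batch() can be tried without NeMo Curator installed. This is a simplified, self-contained sketch and not the library's implementation. ToyTask, ToyStage, TimedAdapter, and the stage_perf and custom_metrics fields are hypothetical names, time.perf_counter() stands in for StageTimer, and logging is omitted.

```python
import time
from dataclasses import dataclass, field
from typing import Any


@dataclass
class ToyTask:
    """Hypothetical stand-in for a NeMo Curator Task."""
    data: Any
    # Per-stage performance records appended by the adapter (hypothetical field).
    stage_perf: list[dict[str, Any]] = field(default_factory=list)


class ToyStage:
    """Hypothetical stage that doubles each task's data and records a custom metric."""
    name = "double"

    def __init__(self) -> None:
        self.custom_metrics: dict[str, float] = {}

    def process_batch(self, tasks: list[ToyTask]) -> list[ToyTask]:
        self.custom_metrics["items_seen"] = float(len(tasks))
        # Carry earlier stages' records forward onto the new output tasks.
        return [ToyTask(data=t.data * 2, stage_perf=list(t.stage_perf)) for t in tasks]


class TimedAdapter:
    """Wraps a stage and attaches timing plus drained custom metrics to each output task."""

    def __init__(self, stage: ToyStage) -> None:
        self.stage = stage

    def process_batch(self, tasks: list[ToyTask]) -> list[ToyTask]:
        start = time.perf_counter()
        results = self.stage.process_batch(tasks)
        elapsed = time.perf_counter() - start
        # Consume (read, then reset) the metrics the stage recorded during this batch.
        metrics, self.stage.custom_metrics = self.stage.custom_metrics, {}
        stats = {
            "stage": self.stage.name,
            "process_time_s": elapsed,
            "num_items": len(tasks),
            "custom_metrics": metrics,
        }
        for task in results:
            task.stage_perf.append(dict(stats))
        return results


adapter = TimedAdapter(ToyStage())
out = adapter.process_batch([ToyTask(1), ToyTask(2)])
print([t.data for t in out])  # [2, 4]
```

In this sketch, every output task of a batch receives the same batch-level timing record. Anything that sums these records across tasks would therefore count each batch's time more than once.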

All backend implementations (Xenna, Ray ActorPool, Ray Data) inherit from these base classes to share a common interface for stage execution and performance monitoring.

Usage

These classes are not used directly by end users. They are extended by concrete backend implementations such as XennaExecutor, RayDataExecutor, and RayActorPoolStageAdapter. When building a custom backend, subclass BaseExecutor and implement execute(). When adapting a stage to a new backend, subclass BaseStageAdapter and override methods as needed.
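The subclassing contract can be illustrated with a standalone sketch that needs no NeMo Curator install. ToyBaseExecutor is a hypothetical stand-in that mirrors BaseExecutor's shape: an optional config plus an abstract execute(). Python's abc module refuses to instantiate it until a subclass implements execute(). Modeling stages as plain batch functions is an assumption made only for this sketch. It is not how ProcessingStage works.

```python
from __future__ import annotations

from abc import ABC, abstractmethod
from typing import Any, Callable

# Assumption for this sketch: a "stage" is just a function from a batch to a batch.
Stage = Callable[[list[int]], list[int]]


class ToyBaseExecutor(ABC):
    """Hypothetical stand-in mirroring BaseExecutor: optional config plus abstract execute()."""

    def __init__(self, config: dict[str, Any] | None = None) -> None:
        self.config = config or {}

    @abstractmethod
    def execute(self, stages: list[Stage], initial_tasks: list[int] | None = None) -> list[int]:
        """Run the stages in order over the initial tasks."""


class SequentialExecutor(ToyBaseExecutor):
    """A toy 'custom backend' that runs every stage in the current process."""

    def execute(self, stages: list[Stage], initial_tasks: list[int] | None = None) -> list[int]:
        tasks = list(initial_tasks or [])
        for stage in stages:
            tasks = stage(tasks)
        return tasks


# The abstract base refuses instantiation because execute() is not implemented.
try:
    ToyBaseExecutor()
    abstract_enforced = False
except TypeError:
    abstract_enforced = True

executor = SequentialExecutor(config={"name": "local"})
result = executor.execute(
    [lambda ts: [t + 1 for t in ts], lambda ts: [t for t in ts if t % 2 == 0]],
    initial_tasks=[1, 2, 3],
)
print(abstract_enforced, result)  # True [2, 4]
```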

Code Reference

Source Location

  • Repository: NeMo-Curator
  • File: nemo_curator/backends/base.py
  • Lines: 1-122

Signature

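from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any

from nemo_curator.tasks import Task

@dataclass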
class NodeInfo:
    node_id: str = ""

@dataclass
class WorkerMetadata:
    worker_id: str = ""
    allocation: Any = None

class BaseExecutor(ABC):
    def __init__(self, config: dict[str, Any] | None = None, ignore_head_node: bool = False): ...
    @abstractmethod
    def execute(self, stages: list["ProcessingStage"], initial_tasks: list[Task] | None = None) -> None: ...

class BaseStageAdapter:
    def __init__(self, stage: "ProcessingStage"): ...
    def process_batch(self, tasks: list[Task]) -> list[Task]: ...
    def setup_on_node(self, node_info: NodeInfo | None = None, worker_metadata: WorkerMetadata | None = None) -> None: ...
    def setup(self, worker_metadata: WorkerMetadata | None = None) -> None: ...
    def teardown(self) -> None: ...

Import

from nemo_curator.backends.base import BaseExecutor, BaseStageAdapter, NodeInfo, WorkerMetadata

I/O Contract

BaseExecutor

Inputs

| Name | Type | Required | Description |
|---|---|---|---|
| config | dict[str, Any] or None | No | Optional configuration dictionary for the executor |
| ignore_head_node | bool | No | Whether to ignore the Ray head node when scheduling (default False; can also be set via the CURATOR_IGNORE_RAY_HEAD_NODE environment variable) |
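The table says ignore_head_node can come from either the constructor or the CURATOR_IGNORE_RAY_HEAD_NODE environment variable. This page does not state how the variable is parsed. The sketch below assumes the flag is on when the argument is True or when the variable holds a common truthy string ("1", "true", "yes"). resolve_ignore_head_node is a hypothetical helper, not part of NeMo Curator.

```python
from __future__ import annotations

import os
from collections.abc import Mapping

# Assumption: these strings count as "on"; the real parsing rule is not documented here.
_TRUTHY = {"1", "true", "yes"}


def resolve_ignore_head_node(ignore_head_node: bool = False, env: Mapping[str, str] | None = None) -> bool:
    """Hypothetical helper: True if either the argument or the env var enables the flag."""
    environ = os.environ if env is None else env
    raw = environ.get("CURATOR_IGNORE_RAY_HEAD_NODE", "")
    return ignore_head_node or raw.strip().lower() in _TRUTHY


print(resolve_ignore_head_node(False, {"CURATOR_IGNORE_RAY_HEAD_NODE": "true"}))  # True
```

Passing an explicit env mapping keeps the helper easy to test. Omitting it falls back to the real process environment.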

execute() Inputs

| Name | Type | Required | Description |
|---|---|---|---|
| stages | list[ProcessingStage] | Yes | Processing stages to execute, in order |
| initial_tasks | list[Task] or None | No | Optional initial tasks to feed into the first stage |

BaseStageAdapter

Inputs

| Name | Type | Required | Description |
|---|---|---|---|
| stage | ProcessingStage | Yes | The processing stage to wrap and adapt |

process_batch() Inputs

| Name | Type | Required | Description |
|---|---|---|---|
| tasks | list[Task] | Yes | Tasks to process through the wrapped stage |

Outputs

| Name | Type | Description |
|---|---|---|
| process_batch() return value | list[Task] | Processed tasks, each with the stage's performance stats and custom metrics attached |

Usage Examples

Subclassing BaseExecutor

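from nemo_curator.backends.base import BaseExecutor, BaseStageAdapter
from nemo_curator.stages.base import ProcessingStage
from nemo_curator.tasks import Task

class MyCustomExecutor(BaseExecutor):
    """Runs stages one after another in the current process (illustrative only)."""

    # Returning the final tasks is this example's choice; the abstract execute() is annotated -> None
    def execute(self, stages: list[ProcessingStage], initial_tasks: list[Task] | None = None) -> list[Task]:
        tasks = initial_tasks or []
        for stage in stages:
            # Wrap each stage so timing and metrics are attached to its outputs
            adapter = BaseStageAdapter(stage)
            adapter.setup_on_node()
            adapter.setup()
            try:
                tasks = adapter.process_batch(tasks)
            finally:
                adapter.teardown()
        return tasks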

Using BaseStageAdapter for Performance Tracking

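from nemo_curator.backends.base import BaseStageAdapter, NodeInfo, WorkerMetadata

# my_stage (a ProcessingStage) and tasks (a list[Task]) are assumed to exist already
# The node and worker IDs below are illustrative placeholder values
node_info = NodeInfo(node_id="node-0")
worker_metadata = WorkerMetadata(worker_id="worker-0")

# Wrap a processing stage to add performance tracking
adapter = BaseStageAdapter(my_stage)

# Setup lifecycle
adapter.setup_on_node(node_info, worker_metadata)
adapter.setup(worker_metadata)

# Process with automatic timing and metrics
results = adapter.process_batch(tasks)

# Teardown
adapter.teardown()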
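In the lifecycle above, teardown() runs only if process_batch() returns normally. One way to make the sequence exception-safe is a small context manager built on contextlib.contextmanager, sketched here under stated assumptions. adapter_lifecycle and RecordingAdapter are hypothetical names. NodeInfo and WorkerMetadata are local stand-ins that mirror the documented dataclasses, and the IDs are made up. Any object that exposes the four lifecycle methods, such as a BaseStageAdapter, could be passed in.

```python
from __future__ import annotations

from contextlib import contextmanager
from dataclasses import dataclass
from typing import Any, Iterator


@dataclass
class NodeInfo:  # local stand-in mirroring the documented dataclass
    node_id: str = ""


@dataclass
class WorkerMetadata:  # local stand-in mirroring the documented dataclass
    worker_id: str = ""
    allocation: Any = None


class RecordingAdapter:
    """Hypothetical adapter that records lifecycle calls instead of doing real work."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    def setup_on_node(self, node_info: NodeInfo | None = None, worker_metadata: WorkerMetadata | None = None) -> None:
        self.calls.append("setup_on_node")

    def setup(self, worker_metadata: WorkerMetadata | None = None) -> None:
        self.calls.append("setup")

    def process_batch(self, tasks: list[Any]) -> list[Any]:
        self.calls.append("process_batch")
        if any(t is None for t in tasks):
            raise ValueError("invalid task")
        return tasks

    def teardown(self) -> None:
        self.calls.append("teardown")


@contextmanager
def adapter_lifecycle(adapter: Any, node_info: NodeInfo, worker_metadata: WorkerMetadata) -> Iterator[Any]:
    """Hypothetical helper: set up, yield the adapter, and always tear down after a successful setup."""
    # A failure here propagates before the try block, so teardown() is not attempted.
    adapter.setup_on_node(node_info, worker_metadata)
    adapter.setup(worker_metadata)
    try:
        yield adapter
    finally:
        # Runs on normal exit and also when the with-body raises.
        adapter.teardown()


node = NodeInfo(node_id="node-0")
worker = WorkerMetadata(worker_id="worker-0")

ok = RecordingAdapter()
with adapter_lifecycle(ok, node, worker) as a:
    results = a.process_batch([1, 2])

failing = RecordingAdapter()
try:
    with adapter_lifecycle(failing, node, worker) as a:
        a.process_batch([None])
except ValueError:
    pass

print(ok.calls)       # ['setup_on_node', 'setup', 'process_batch', 'teardown']
print(failing.calls)  # ['setup_on_node', 'setup', 'process_batch', 'teardown']
```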
