

Implementation:NVIDIA NeMo Curator RayDataStageAdapter

From Leeroopedia
Knowledge Sources
Domains: Backend Architecture, Ray Integration, Ray Data, Streaming Execution
Last Updated: 2026-02-14 00:00 GMT

Overview

Adapts NeMo Curator processing stages to Ray Data's map_batches() API, supporting both stateless task-based and stateful actor-based execution, with concurrency for actor-based stages calculated automatically from cluster resources.

Description

RayDataStageAdapter extends BaseStageAdapter to serve as the bridge between NeMo Curator's stage abstraction and Ray Data's dataset transformation API. It provides two execution modes:

  • Actor mode -- Used for GPU stages, stages with custom setup logic, or stages that explicitly declare IS_ACTOR_STAGE in their ray_stage_spec(). create_actor_from_stage() builds a dynamically named actor class whose constructor runs the node-level setup (setup_on_node()) and the stage setup (setup()).
  • Task mode -- Used for CPU-only stateless stages. A named function is created via create_task_from_stage() that wraps the adapter's processing logic.
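The mode rules above can be sketched as a small decision function. This is a minimal illustration, not Curator's code: StageInfo, choose_execution_mode and the plain-string "is_actor_stage" key are hypothetical stand-ins for the stage's resource requirements, its setup override and its ray_stage_spec() entries (the real spec uses RayStageSpecKeys members).

```python
from dataclasses import dataclass, field


@dataclass
class StageInfo:
    """Hypothetical summary of the stage properties that drive the mode choice."""

    name: str
    num_gpus: float = 0.0
    has_custom_setup: bool = False
    # Hypothetical stand-in for ray_stage_spec(); real keys are RayStageSpecKeys members
    ray_stage_spec: dict = field(default_factory=dict)


def choose_execution_mode(stage: StageInfo) -> str:
    """Return "actor" or "task" using the rules listed above (a sketch)."""
    if stage.ray_stage_spec.get("is_actor_stage", False):
        return "actor"  # explicit opt-in through the stage spec
    if stage.num_gpus > 0:
        return "actor"  # GPU stages keep models and other state resident
    if stage.has_custom_setup:
        return "actor"  # run setup() once per worker instead of once per batch
    return "task"  # CPU-only and stateless: a plain function suffices


print(choose_execution_mode(StageInfo("Classifier", num_gpus=1.0)))  # actor
print(choose_execution_mode(StageInfo("Lowercase")))  # task
```

Checking the explicit spec flag first means a stage can force actor mode even when it is CPU-only and has no setup logic.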

The process_dataset() method is the primary entry point. It:

  1. Determines whether to use actor or task mode based on the stage's ray_stage_spec() and resource requirements.
  2. Sets concurrency based on available cluster resources (for actors) or leaves it unbounded (for tasks).
  3. Applies dataset.map_batches() with the appropriate function/class, batch size, and resource allocation (num_cpus, num_gpus).
  4. If the stage is a fanout stage (IS_FANOUT_STAGE), repartitions the output dataset to one row per block.
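Step 2 only needs a count for actor mode. One plausible way to derive it, assumed here rather than taken from adapter.py, is to count how many actors fit on each node given the stage's per-actor CPU and GPU request, optionally skipping the head node. actor_concurrency and the node dictionary layout are hypothetical.

```python
def actor_concurrency(nodes, cpus_per_actor, gpus_per_actor=0.0, ignore_head_node=False):
    """Estimate how many actors of one stage fit on the cluster (hypothetical helper).

    `nodes` is a list of dicts such as {"CPU": 16, "GPU": 2, "is_head": False}.
    """
    total = 0
    for node in nodes:
        if ignore_head_node and node.get("is_head", False):
            continue  # keep the head node free, e.g. for the driver
        limits = []
        if cpus_per_actor > 0:
            limits.append(int(node.get("CPU", 0) // cpus_per_actor))
        if gpus_per_actor > 0:
            limits.append(int(node.get("GPU", 0) // gpus_per_actor))
        # An actor cannot span nodes, so count per node and take the scarcest resource
        total += min(limits) if limits else 0
    return max(total, 1)  # always allow at least one actor


cluster = [
    {"CPU": 8, "GPU": 0, "is_head": True},
    {"CPU": 16, "GPU": 2},
    {"CPU": 16, "GPU": 2},
]
print(actor_concurrency(cluster, cpus_per_actor=4, gpus_per_actor=1))  # 4
print(actor_concurrency(cluster, cpus_per_actor=4))  # 10
print(actor_concurrency(cluster, cpus_per_actor=4, ignore_head_node=True))  # 8
```

In Ray Data, an integer like this is the kind of value passed as the concurrency argument of map_batches() when the transformation is a callable class; task mode simply leaves it unset.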

Both modes handle the Ray Data batch dictionary format, where tasks are stored under the "item" key.
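A minimal sketch of handling that batch format: read tasks from the "item" key, run each through the stage, and return them under the same key. process_item_batch is hypothetical, and treating a None result as "filtered out" and a list result as fanout are assumptions about the stage's return convention, not confirmed behavior of the adapter.

```python
from typing import Any, Callable


def process_item_batch(batch: dict[str, Any], process: Callable[[Any], Any]) -> dict[str, list]:
    """Apply `process` to each task under the "item" key and re-wrap the results (sketch)."""
    outputs: list = []
    for task in batch["item"]:  # also works if Ray hands over a NumPy object array
        result = process(task)
        if result is None:
            continue  # assumption: None means the task was filtered out
        if isinstance(result, list):
            outputs.extend(result)  # fanout: one input task yields several output tasks
        else:
            outputs.append(result)
    return {"item": outputs}


def demo(x):
    if x == 2:
        return None
    if x == 3:
        return [x, x * 10]
    return x + 100


print(process_item_batch({"item": [1, 2, 3]}, demo))  # {'item': [101, 3, 30]}
```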

During construction, the adapter checks that every key in the stage's ray_stage_spec() is a recognized RayStageSpecKeys value, and it logs a warning when a GPU stage has no batch size set, in which case a batch size of 1 is used.
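The construction-time checks can be sketched as follows. This is an illustration under assumptions: validate_stage is hypothetical, the enum lists only the two keys named on this page with made-up string values, and raising ValueError for an unknown key is this sketch's choice, since the page only says the keys are validated.

```python
from __future__ import annotations

import warnings
from enum import Enum


class RayStageSpecKeys(str, Enum):
    # Only the two keys mentioned on this page; the real enum may define more
    IS_ACTOR_STAGE = "is_actor_stage"
    IS_FANOUT_STAGE = "is_fanout_stage"


def validate_stage(spec: dict, num_gpus: float, batch_size: int | None) -> int | None:
    """Check spec keys and resolve the batch size (hypothetical helper)."""
    valid = {key.value for key in RayStageSpecKeys}
    for key in spec:
        name = key.value if isinstance(key, RayStageSpecKeys) else key
        if name not in valid:
            # Assumption: an unrecognized key is treated as an error
            raise ValueError(f"Unrecognized ray_stage_spec key: {key!r}")
    if num_gpus > 0 and batch_size is None:
        warnings.warn("GPU stage has no batch size set; defaulting to 1")
        return 1
    return batch_size


print(validate_stage({RayStageSpecKeys.IS_FANOUT_STAGE: True}, num_gpus=0, batch_size=8))  # 8
```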

Usage

This adapter is used internally by RayDataExecutor. For each stage in a pipeline, the executor creates a RayDataStageAdapter and calls process_dataset() to apply the stage as a Ray Data transformation. Users do not typically instantiate this adapter directly.
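The executor loop described above amounts to a left fold over the pipeline's stages. The sketch assumes nothing about RayDataExecutor beyond this paragraph: run_stages and ListAdapter are hypothetical, and a plain list stands in for ray.data.Dataset so the example runs without a Ray cluster.

```python
def run_stages(dataset, stages, make_adapter, ignore_head_node=False):
    """Apply each stage in order, as the executor is described to do (sketch)."""
    for stage in stages:
        adapter = make_adapter(stage)  # e.g. RayDataStageAdapter in the real backend
        dataset = adapter.process_dataset(dataset, ignore_head_node=ignore_head_node)
    return dataset


class ListAdapter:
    """Hypothetical stand-in adapter that maps a stage function over a plain list."""

    def __init__(self, stage):
        self.stage = stage

    def process_dataset(self, dataset, ignore_head_node=False):
        return [self.stage(x) for x in dataset]


print(run_stages([1, 2, 3], [lambda x: x + 1, lambda x: x * 10], ListAdapter))  # [20, 30, 40]
```

With real Ray Data datasets, such a loop only builds a lazy execution plan; the stages execute when the final dataset is consumed, for example via take_all().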

Code Reference

Source Location

  • Repository: NeMo-Curator
  • File: nemo_curator/backends/experimental/ray_data/adapter.py
  • Lines: 1-166

Signature

class RayDataStageAdapter(BaseStageAdapter):
    def __init__(self, stage: ProcessingStage): ...
    @property
    def batch_size(self) -> int | None: ...
    def process_dataset(self, dataset: Dataset, ignore_head_node: bool = False) -> Dataset: ...

def create_actor_from_stage(stage: ProcessingStage) -> type[RayDataStageAdapter]: ...
def create_task_from_stage(stage: ProcessingStage) -> Callable[[dict[str, Any]], dict[str, Any]]: ...

Import

from nemo_curator.backends.experimental.ray_data.adapter import RayDataStageAdapter

I/O Contract

Inputs

Name | Type | Required | Description
stage | ProcessingStage | Yes | The processing stage to adapt for Ray Data

process_dataset() Inputs

Name | Type | Required | Description
dataset | ray.data.Dataset | Yes | Ray Data dataset containing Task objects in the "item" column
ignore_head_node | bool | No | Whether to exclude the head node from concurrency calculations (default: False)

Outputs

Name | Type | Description
process_dataset() return value | ray.data.Dataset | Transformed dataset containing the processed Task objects in the "item" column

Helper Functions

create_actor_from_stage

Creates a dynamically-named actor class (RayDataStageActorAdapter) for stateful processing. The class name is set to the original stage class name plus "Actor" (e.g., MyStageActor) for clear display in Ray dashboards. The actor's __init__ calls setup_on_node() and setup(), and its __call__ method processes batches.
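A minimal sketch of the naming and setup pattern described above, using only the standard library. DemoStage and create_actor_class are hypothetical; unlike the real helper, this class does not subclass RayDataStageAdapter, and the setup hooks take no arguments here, which may not match Curator's actual signatures.

```python
from typing import Any


class DemoStage:
    """Hypothetical stage exposing the setup hooks named on this page."""

    def __init__(self):
        self.calls = []

    def setup_on_node(self):
        self.calls.append("setup_on_node")

    def setup(self):
        self.calls.append("setup")

    def process(self, task: str) -> str:
        return task.upper()


def create_actor_class(stage):
    """Build a callable class named after the stage, e.g. DemoStageActor (sketch)."""

    class RayDataStageActorAdapter:
        def __init__(self):
            self.stage = stage
            self.stage.setup_on_node()  # node-level preparation
            self.stage.setup()  # per-actor preparation, e.g. loading a model

        def __call__(self, batch: dict[str, Any]) -> dict[str, Any]:
            return {"item": [self.stage.process(task) for task in batch["item"]]}

    name = f"{type(stage).__name__}Actor"
    RayDataStageActorAdapter.__name__ = name
    RayDataStageActorAdapter.__qualname__ = name
    return RayDataStageActorAdapter


ActorCls = create_actor_class(DemoStage())
actor = ActorCls()
print(ActorCls.__name__)  # DemoStageActor
print(actor.stage.calls)  # ['setup_on_node', 'setup']
print(actor({"item": ["a", "b"]}))  # {'item': ['A', 'B']}
```

Here every instance shares one in-process stage object; under Ray, each actor would instead receive its own serialized copy, so setup runs once per actor.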

create_task_from_stage

Creates a named standalone function for stateless processing. The function name is set to the original stage class name plus "Task" (e.g., MyStageTask). It wraps an adapter instance's _process_batch_internal() method.
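The task-mode counterpart can be sketched as a closure whose __name__ is rewritten. UppercaseStage and create_task_function are hypothetical, and the sketch calls stage.process directly as a simplification of wrapping the adapter's _process_batch_internal(). No setup hook is called, matching the stateless case.

```python
from typing import Any, Callable


class UppercaseStage:
    """Hypothetical stateless stage."""

    def process(self, task: str) -> str:
        return task.upper()


def create_task_function(stage) -> Callable[[dict[str, Any]], dict[str, Any]]:
    """Return a plain batch function named after the stage, e.g. UppercaseStageTask."""

    def process_batch(batch: dict[str, Any]) -> dict[str, Any]:
        # Simplification: calls stage.process directly instead of the adapter's
        # _process_batch_internal()
        return {"item": [stage.process(task) for task in batch["item"]]}

    name = f"{type(stage).__name__}Task"
    process_batch.__name__ = name
    process_batch.__qualname__ = name
    return process_batch


fn = create_task_function(UppercaseStage())
print(fn.__name__)  # UppercaseStageTask
print(fn({"item": ["a", "b"]}))  # {'item': ['A', 'B']}
```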

Usage Examples

Processing a Dataset Through a Stage

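import ray
from nemo_curator.backends.experimental.ray_data.adapter import RayDataStageAdapter

# Placeholders assumed to be defined elsewhere:
#   my_stage: a ProcessingStage instance
#   tasks: a list of Task objects

# Wrap the stage in an adapter
adapter = RayDataStageAdapter(my_stage)

# Build a dataset with one task per block; from_items() stores each
# non-dict element under the "item" column
dataset = ray.data.from_items(tasks, override_num_blocks=len(tasks))

# Apply the stage as a map_batches() transformation (lazy until consumed)
processed_dataset = adapter.process_dataset(dataset, ignore_head_node=False)

# Materialize: take_all() returns row dicts, so unwrap the "item" column
results = [row["item"] for row in processed_dataset.take_all()]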
