Implementation:NVIDIA NeMo Curator RayDataStageAdapter
| Knowledge Sources | |
|---|---|
| Domains | Backend Architecture, Ray Integration, Ray Data, Streaming Execution |
| Last Updated | 2026-02-14 00:00 GMT |
Overview
Adapts processing stages to work with Ray Data's map_batches API, supporting both stateless task-based and stateful actor-based execution modes with automatic concurrency calculation.
Description
RayDataStageAdapter extends BaseStageAdapter to serve as the bridge between NeMo Curator's stage abstraction and Ray Data's dataset transformation API. It provides two execution modes:
- Actor mode -- Used for GPU stages, stages with custom setup logic, or stages that explicitly declare IS_ACTOR_STAGE in their ray_stage_spec(). create_actor_from_stage() builds a dynamically named actor class whose constructor runs node-level setup and then stage setup, so that work happens once per actor rather than once per batch.
- Task mode -- Used for CPU-only stateless stages. A named function is created via create_task_from_stage() that wraps the adapter's processing logic.
The process_dataset() method is the primary entry point. It:
- Determines whether to use actor or task mode based on the stage's ray_stage_spec() and resource requirements.
- Sets concurrency based on available cluster resources (for actors) or leaves it unbounded (for tasks).
- Applies dataset.map_batches() with the appropriate function/class, batch size, and resource allocation (num_cpus, num_gpus).
- If the stage is a fanout stage (IS_FANOUT_STAGE), repartitions the output dataset to one row per block.
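The decision steps above can be sketched in plain Python without Ray. This is a minimal illustration, not the adapter's actual code: `StageInfo`, `use_actor_mode`, `actor_concurrency`, `map_batches_kwargs`, and the `"is_actor_stage"` key string are hypothetical names, and the concurrency rule (as many actors as the scarcest requested resource allows) is an assumption about what "based on available cluster resources" might mean.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class StageInfo:
    """Hypothetical stand-in for the stage attributes the adapter inspects."""
    num_cpus: float = 1.0
    num_gpus: float = 0.0
    has_custom_setup: bool = False
    ray_stage_spec: dict[str, Any] = field(default_factory=dict)


def use_actor_mode(stage: StageInfo) -> bool:
    """Actor mode for GPU stages, custom setup logic, or an explicit spec flag."""
    return (
        stage.num_gpus > 0
        or stage.has_custom_setup
        or bool(stage.ray_stage_spec.get("is_actor_stage", False))  # assumed key string
    )


def actor_concurrency(stage: StageInfo, cluster_cpus: float, cluster_gpus: float) -> int:
    """Assumed rule: the scarcest requested resource bounds the number of actors."""
    limits = []
    if stage.num_cpus > 0:
        limits.append(int(cluster_cpus // stage.num_cpus))
    if stage.num_gpus > 0:
        limits.append(int(cluster_gpus // stage.num_gpus))
    return max(1, min(limits)) if limits else 1


def map_batches_kwargs(stage: StageInfo, batch_size, cluster_cpus: float, cluster_gpus: float) -> dict:
    """Collect the keyword arguments that would be passed to dataset.map_batches()."""
    kwargs = {"batch_size": batch_size, "num_cpus": stage.num_cpus, "num_gpus": stage.num_gpus}
    if use_actor_mode(stage):
        # Actors get an explicit concurrency; tasks leave it unset (unbounded).
        kwargs["concurrency"] = actor_concurrency(stage, cluster_cpus, cluster_gpus)
    return kwargs
```

In this sketch a CPU-only stateless stage produces no `concurrency` entry at all, which mirrors the "unbounded for tasks" behavior described above.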
Both modes handle the Ray Data batch dictionary format, where tasks are stored under the "item" key.
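As a rough illustration of that batch format, the following sketch unpacks tasks from the "item" key, processes each one, and repacks the results under the same key. The names `process_batch` and `split_in_two` are hypothetical, tasks are represented as plain strings, and plain lists are used for the column values; Ray Data typically delivers columns as arrays instead.

```python
from typing import Any, Callable


def process_batch(
    batch: dict[str, list[Any]],
    process_task: Callable[[Any], list[Any]],
) -> dict[str, list[Any]]:
    """Unpack tasks from the "item" column, process them, and repack the results."""
    outputs: list[Any] = []
    for task in batch["item"]:
        # A stage may return several output tasks for a single input task.
        outputs.extend(process_task(task))
    return {"item": outputs}


def split_in_two(task: str) -> list[str]:
    """Hypothetical fanout-style stage: one input task yields two output tasks."""
    return [f"{task}-a", f"{task}-b"]


result = process_batch({"item": ["t0", "t1"]}, split_in_two)
```

Because the output dictionary can hold more rows than the input, a fanout stage changes the number of rows per block, which is why repartitioning afterwards can be useful.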
During construction, the adapter validates that all keys in the stage's ray_stage_spec() are recognized RayStageSpecKeys values, and warns if a GPU stage has no batch size set (defaulting to 1).
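A constructor-time check of this kind might look like the sketch below. `SpecKeys` is a hypothetical stand-in for RayStageSpecKeys with assumed string values, `validate_spec` and `resolve_batch_size` are illustrative helper names, and the standard `warnings` module is used here only because the source does not say how the warning is emitted.

```python
from __future__ import annotations

import warnings
from enum import Enum


class SpecKeys(Enum):
    """Hypothetical stand-in for RayStageSpecKeys; the string values are assumed."""
    IS_ACTOR_STAGE = "is_actor_stage"
    IS_FANOUT_STAGE = "is_fanout_stage"


def validate_spec(spec: dict) -> None:
    """Raise ValueError if any key is not a recognized spec key."""
    for key in spec:
        try:
            SpecKeys(key)  # accepts either a member or its string value
        except ValueError:
            raise ValueError(f"Unrecognized ray_stage_spec key: {key!r}") from None


def resolve_batch_size(num_gpus: float, batch_size: int | None) -> int | None:
    """Warn and fall back to 1 when a GPU stage has no batch size."""
    if num_gpus > 0 and batch_size is None:
        warnings.warn("GPU stage has no batch size set; defaulting to 1", stacklevel=2)
        return 1
    return batch_size
```

Failing fast on unknown keys surfaces typos in a stage's spec when the adapter is built rather than partway through a pipeline run.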
Usage
This adapter is used internally by RayDataExecutor. For each stage in a pipeline, the executor creates a RayDataStageAdapter and calls process_dataset() to apply the stage as a Ray Data transformation. Users do not typically instantiate this adapter directly.
Code Reference
Source Location
- Repository: NeMo-Curator
- File: nemo_curator/backends/experimental/ray_data/adapter.py
- Lines: 1-166
Signature
class RayDataStageAdapter(BaseStageAdapter):
    def __init__(self, stage: ProcessingStage): ...
    @property
    def batch_size(self) -> int | None: ...
    def process_dataset(self, dataset: Dataset, ignore_head_node: bool = False) -> Dataset: ...

def create_actor_from_stage(stage: ProcessingStage) -> type[RayDataStageAdapter]: ...
def create_task_from_stage(stage: ProcessingStage) -> Callable[[dict[str, Any]], dict[str, Any]]: ...
Import
from nemo_curator.backends.experimental.ray_data.adapter import RayDataStageAdapter
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| stage | ProcessingStage | Yes | The processing stage to adapt for Ray Data |
process_dataset() Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| dataset | ray.data.Dataset | Yes | Ray Data dataset containing Task objects in the "item" column |
| ignore_head_node | bool | No | Whether to exclude the head node from concurrency calculations (default False) |
Outputs
| Name | Type | Description |
|---|---|---|
| process_dataset() | ray.data.Dataset | Transformed Ray Data dataset with processed Task objects |
Helper Functions
create_actor_from_stage
Creates an actor class for stateful processing. The class is defined as RayDataStageActorAdapter, and its name is then set to the original stage class name plus "Actor" (e.g., MyStageActor) so that it is easy to identify in Ray dashboards. The actor's __init__ calls setup_on_node() and setup(), and its __call__ method processes batches.
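The renaming technique can be shown with a small, Ray-free sketch. `DemoStage` and `make_actor_class` are hypothetical, and the setup hooks are called here without arguments for brevity; the real hooks may take parameters.

```python
class DemoStage:
    """Hypothetical stage exposing the two setup hooks named in the text."""

    def __init__(self):
        self.calls = []

    def setup_on_node(self):
        self.calls.append("setup_on_node")

    def setup(self):
        self.calls.append("setup")

    def process(self, task):
        return task.upper()


def make_actor_class(stage):
    """Build a callable class bound to `stage` and rename it after the stage."""

    class _Actor:
        def __init__(self):
            # One-time initialization happens when the actor is constructed.
            self.stage = stage
            self.stage.setup_on_node()
            self.stage.setup()

        def __call__(self, batch):
            return {"item": [self.stage.process(t) for t in batch["item"]]}

    name = type(stage).__name__ + "Actor"
    _Actor.__name__ = name
    _Actor.__qualname__ = name
    return _Actor


actor_cls = make_actor_class(DemoStage())
actor = actor_cls()
```

Setting both `__name__` and `__qualname__` is what makes the class appear under the stage-specific name in tools that display either attribute.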
create_task_from_stage
Creates a named standalone function for stateless processing. The function name is set to the original stage class name plus "Task" (e.g., MyStageTask). It wraps an adapter instance's _process_batch_internal() method.
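The same idea applies to functions. In the sketch below, `MyStage`, its `process_batch` method, and `make_task_function` are hypothetical stand-ins; the real function wraps the adapter's _process_batch_internal() rather than a method on the stage.

```python
class MyStage:
    """Hypothetical stateless stage used only for this illustration."""

    def process_batch(self, batch):
        return {"item": [task[::-1] for task in batch["item"]]}


def make_task_function(stage):
    """Wrap the stage's batch logic in a function named after the stage."""

    def _task(batch):
        return stage.process_batch(batch)

    name = type(stage).__name__ + "Task"
    _task.__name__ = name
    _task.__qualname__ = name
    return _task


task_fn = make_task_function(MyStage())
output = task_fn({"item": ["abc", "xy"]})
```

A plain function carries no per-worker state between calls, which fits the stateless CPU-only stages that task mode targets.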
Usage Examples
Processing a Dataset Through a Stage
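import ray
from nemo_curator.backends.experimental.ray_data.adapter import RayDataStageAdapter

# `my_stage` (a ProcessingStage instance) and `tasks` (a list of Task objects)
# are assumed to be defined elsewhere.

# Create the adapter for the stage
adapter = RayDataStageAdapter(my_stage)

# Create a Ray Data dataset with one task per block; from_items() stores
# non-dict items under the "item" column
dataset = ray.data.from_items(tasks, override_num_blocks=len(tasks))

# Apply the stage; this builds the transformation lazily
processed_dataset = adapter.process_dataset(dataset, ignore_head_node=False)

# Execute the pipeline and collect all rows
results = processed_dataset.take_all()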
Related Pages
- Environment:NVIDIA_NeMo_Curator_Python_Linux_Base
- NVIDIA_NeMo_Curator_Backend_Base_Classes -- Parent base classes
- NVIDIA_NeMo_Curator_RayDataExecutor -- Executor that uses this adapter