Implementation:NVIDIA NeMo Curator RayActorPoolAdapter
| Knowledge Sources | |
|---|---|
| Domains | Backend Architecture, Ray Integration, Actor Pool |
| Last Updated | 2026-02-14 00:00 GMT |
Overview
Adapts a ProcessingStage to run as a Ray actor within a Ray ActorPool, handling worker-level initialization and batch size configuration.
Description
RayActorPoolStageAdapter extends BaseStageAdapter to bridge the NeMo Curator processing stage interface with Ray's actor model. When an actor is created, the adapter:
- Retrieves the Ray runtime context (node ID, worker ID) via get_worker_metadata_and_node_id().
- Stores the resulting node_info and worker_metadata as instance attributes.
- Calls stage.setup() with the worker metadata, performing per-actor initialization.
- Reads the stage's batch_size, defaulting to 1 with a warning if it is not set.
The adapter exposes get_batch_size() for the executor to query the configured batch size, and delegates setup_on_node() to the base adapter with the stored node and worker context.
This adapter is the base class for more specialized actor pool adapters such as RayActorPoolRAFTAdapter and ShuffleStageAdapter.
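The initialization sequence described above can be sketched with plain Python stand-ins. This is an illustrative outline, not the library's source: `FakeStage`, `fake_get_worker_metadata_and_node_id`, and `SketchActorPoolAdapter` are hypothetical names, the return order of the metadata helper is assumed, and the real adapter additionally inherits from BaseStageAdapter and runs inside a Ray actor.

```python
import logging
from dataclasses import dataclass
from typing import Any, Optional

logger = logging.getLogger(__name__)


@dataclass
class FakeStage:
    """Hypothetical stand-in for a ProcessingStage."""

    name: str = "demo_stage"
    batch_size: Optional[int] = None  # None models "batch size not set"
    setup_called_with: Any = None

    def setup(self, worker_metadata: Any = None) -> None:
        # Record the argument so the per-actor setup call can be inspected.
        self.setup_called_with = worker_metadata


def fake_get_worker_metadata_and_node_id() -> tuple:
    """Hypothetical stand-in for the helper that reads the Ray runtime context.

    The (node_info, worker_metadata) return order is an assumption.
    """
    return {"node_id": "node-0"}, {"worker_id": "worker-0"}


class SketchActorPoolAdapter:
    """Mirrors the four construction steps listed in the description."""

    def __init__(self, stage: FakeStage) -> None:
        self.stage = stage
        # Steps 1 and 2: fetch the runtime context and keep it on the instance.
        self.node_info, self.worker_metadata = fake_get_worker_metadata_and_node_id()
        # Step 3: per-actor initialization of the wrapped stage.
        self.stage.setup(self.worker_metadata)
        # Step 4: read the batch size, falling back to 1 with a warning.
        self._batch_size = self.stage.batch_size
        if self._batch_size is None:
            logger.warning("Stage %s has no batch_size; using 1", stage.name)
            self._batch_size = 1

    def get_batch_size(self) -> int:
        return self._batch_size


adapter = SketchActorPoolAdapter(FakeStage())
print(adapter.get_batch_size())
```

Storing the node and worker context at construction time is what lets a later setup_on_node() call reuse it without querying the runtime again.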
Usage
This adapter is used internally by the Ray ActorPool executor. It is instantiated once per actor in the pool. Users do not typically create this adapter directly; instead, the executor wraps stages with this adapter automatically when running a pipeline using the ActorPool backend.
Code Reference
Source Location
- Repository: NeMo-Curator
- File: nemo_curator/backends/experimental/ray_actor_pool/adapter.py
- Lines: 1-57
Signature
class RayActorPoolStageAdapter(BaseStageAdapter):
    def __init__(self, stage: ProcessingStage): ...
    def get_batch_size(self) -> int: ...
    def setup_on_node(self) -> None: ...
Import
from nemo_curator.backends.experimental.ray_actor_pool.adapter import RayActorPoolStageAdapter
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| stage | ProcessingStage | Yes | The processing stage to wrap and run within a Ray actor |
Outputs
| Name | Type | Description |
|---|---|---|
| get_batch_size() | int | The batch size for this stage (defaults to 1 if not set on the stage) |
| process_batch() | list[Task] | Processed tasks with performance stats (inherited from BaseStageAdapter) |
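As a rough illustration of how a caller might combine the two outputs, the sketch below splits a task list into groups no larger than the value returned by get_batch_size(); each group is what would be passed to process_batch(). The helper `chunk_tasks` is hypothetical and is not part of NeMo Curator, and the executor's actual scheduling logic may differ.

```python
from typing import Iterator, List, Sequence, TypeVar

T = TypeVar("T")


def chunk_tasks(tasks: Sequence[T], batch_size: int) -> Iterator[List[T]]:
    """Yield consecutive slices of `tasks`, each at most `batch_size` long."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for start in range(0, len(tasks), batch_size):
        yield list(tasks[start:start + batch_size])


# Strings stand in for Task objects; 2 stands in for adapter.get_batch_size().
batches = list(chunk_tasks(["t0", "t1", "t2", "t3", "t4"], 2))
print(batches)
```

With the default batch size of 1, every task would form its own single-element batch.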
Usage Examples
Creating an Actor Pool Adapter
from nemo_curator.backends.experimental.ray_actor_pool.adapter import RayActorPoolStageAdapter

# `my_processing_stage` (a ProcessingStage instance) and `tasks` (a list of
# Task objects) are placeholders assumed to be defined elsewhere.

# Typically constructed inside a Ray actor; the constructor retrieves the
# runtime context and calls stage.setup() automatically.
adapter = RayActorPoolStageAdapter(my_processing_stage)

# Query the batch size for scheduling (1 if the stage does not set one).
batch_size = adapter.get_batch_size()

# Process a batch of tasks (inherited from BaseStageAdapter).
results = adapter.process_batch(tasks)
Related Pages
- Environment:NVIDIA_NeMo_Curator_Python_Linux_Base
- NVIDIA_NeMo_Curator_Backend_Base_Classes -- Parent base classes
- NVIDIA_NeMo_Curator_RaftAdapter -- RAFT-enabled extension of this adapter
- NVIDIA_NeMo_Curator_ShuffleAdapter -- Shuffle-enabled extension for data redistribution