Implementation:NVIDIA NeMo Curator RayActorPoolExecutor
| Knowledge Sources | |
|---|---|
| Domains | Execution Backend, Ray, Pipeline Orchestration |
| Last Updated | 2026-02-14 00:00 GMT |
Overview
Experimental Ray-based pipeline executor that creates pools of Ray actors per processing stage, supporting standard, RAFT, and shuffle/LSH stage types with load-balanced task processing and progress tracking.
Description
The RayActorPoolExecutor class extends BaseExecutor and provides a Ray ActorPool-based execution backend for NeMo Curator processing pipelines. It is one of the experimental executor backends offering fine-grained resource control and GPU-accelerated stage support.
Core Execution Flow: The execute() method:
- Initializes Ray with a parsed runtime environment (overriding RAY_EXPERIMENTAL_NOSET_CUDA_VISIBLE_DEVICES)
- Runs setup_on_node for all stages before processing begins
- Iterates through stages sequentially, creating an appropriate actor pool for each
- Processes task batches through the actor pool using ActorPool.map_unordered()
- Cleans up actor pools after each stage and shuts down Ray upon completion
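The per-stage loop above can be modeled in plain Python. This is an analogy under stated assumptions, not the executor's code: it uses `concurrent.futures` threads in place of Ray actors, a hypothetical `(name, process_batch, batch_size)` tuple in place of `ProcessingStage`, and `as_completed()` to gather results in completion order, which is how `ActorPool.map_unordered()` yields them.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable

# Hypothetical stand-in for a ProcessingStage: (name, per-batch function, batch size).
Stage = tuple[str, Callable[[list[int]], list[int]], int]


def run_pipeline(stages: list[Stage], initial_tasks: list[int], num_workers: int = 4) -> list[int]:
    """Run stages one after another, fanning each stage's batches out to a worker pool."""
    tasks = initial_tasks
    for _name, process_batch, batch_size in stages:
        batches = [tasks[i : i + batch_size] for i in range(0, len(tasks), batch_size)]
        outputs: list[int] = []
        # One pool per stage; leaving the `with` block shuts it down,
        # loosely mirroring "create pool, process, clean up" for each stage.
        with ThreadPoolExecutor(max_workers=num_workers) as pool:
            futures = [pool.submit(process_batch, batch) for batch in batches]
            # Gather results in completion order, like ActorPool.map_unordered().
            for future in as_completed(futures):
                outputs.extend(future.result())
        tasks = outputs  # this stage's outputs become the next stage's inputs
    return tasks


stages: list[Stage] = [
    ("double", lambda batch: [x * 2 for x in batch], 2),
    ("keep_large", lambda batch: [x for x in batch if x > 4], 3),
]
print(sorted(run_pipeline(stages, [1, 2, 3, 4, 5])))  # [6, 8, 10]
```

Because results arrive in completion order, the order of outputs within a stage is not guaranteed, which is why the example sorts before printing.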
Actor Pool Types: Three types of actor pools are created depending on the stage specification:
- Standard ActorPool: Uses RayActorPoolStageAdapter actors with configurable CPU/GPU resources per actor. Tasks are batched based on the stage's batch size setting.
- RAFT ActorPool: Uses RayActorPoolRAFTAdapter actors for GPU-accelerated stages requiring NCCL communication. The root actor broadcasts its unique ID, and all actors perform a setup handshake. Tasks are distributed evenly across all actors.
- Shuffle/RapidsMPF Actors: Uses ShuffleStageAdapter actors for distributed shuffle operations. Actors set up UCXX communication, then process data in three phases: insert tasks into the shuffler, signal that insertion is complete, and extract the written results.
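The three-phase shuffle protocol can be illustrated with a toy, single-process shuffler. Everything here is hypothetical: `ToyShuffler` and its `insert` / `insert_finished` / `extract` methods are illustrative names, not the RapidsMPF or UCXX API, and hash partitioning on a key column is assumed as the shuffle rule.

```python
from collections import defaultdict
from typing import Any


class ToyShuffler:
    """Hypothetical in-memory shuffler with insert / insert_finished / extract phases."""

    def __init__(self, num_partitions: int) -> None:
        self.num_partitions = num_partitions
        self._partitions: dict[int, list[dict[str, Any]]] = defaultdict(list)
        self._finished = False

    def insert(self, rows: list[dict[str, Any]], key: str) -> None:
        # Phase 1: route each row to a partition by hashing its key.
        if self._finished:
            raise RuntimeError("insert() called after insert_finished()")
        for row in rows:
            self._partitions[hash(row[key]) % self.num_partitions].append(row)

    def insert_finished(self) -> None:
        # Phase 2: no more input; partitions are now complete.
        self._finished = True

    def extract(self, partition_id: int) -> list[dict[str, Any]]:
        # Phase 3: read back a finished partition.
        if not self._finished:
            raise RuntimeError("extract() called before insert_finished()")
        return list(self._partitions.get(partition_id, []))


shuffler = ToyShuffler(num_partitions=2)
shuffler.insert([{"id": "a", "v": 1}, {"id": "b", "v": 2}], key="id")  # from one "actor"
shuffler.insert([{"id": "a", "v": 3}], key="id")  # from another "actor"
shuffler.insert_finished()
parts = [shuffler.extract(p) for p in range(shuffler.num_partitions)]
print(sum(len(p) for p in parts))  # 3
```

Rows sharing a key always land in the same partition, which is the property a shuffle exists to provide; the phase guards show why the "insertion complete" signal must precede extraction.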
LSH Stage Handling: LSH (Locality-Sensitive Hashing) stages receive special treatment via _execute_lsh_stage(), which iterates over band ranges, creating fresh shuffle actor pools for each band iteration and accumulating outputs across all iterations.
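The band-iteration loop can be sketched as follows. The `bands_per_iteration` parameter, the half-open `(start, end)` band-range convention, and the `process_band_range` callback are assumptions for illustration; in the executor, each iteration creates and cleans up a fresh shuffle actor pool rather than calling a plain function.

```python
from typing import Callable, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def band_ranges(num_bands: int, bands_per_iteration: int) -> list[tuple[int, int]]:
    """Split bands [0, num_bands) into consecutive half-open (start, end) ranges."""
    if num_bands <= 0 or bands_per_iteration <= 0:
        raise ValueError("num_bands and bands_per_iteration must be positive")
    return [
        (start, min(start + bands_per_iteration, num_bands))
        for start in range(0, num_bands, bands_per_iteration)
    ]


def execute_lsh_like(
    input_tasks: list[T],
    num_bands: int,
    bands_per_iteration: int,
    process_band_range: Callable[[list[T], tuple[int, int]], list[R]],
) -> list[R]:
    outputs: list[R] = []
    for band_range in band_ranges(num_bands, bands_per_iteration):
        # The executor would create a fresh shuffle actor pool here and clean it
        # up after the iteration; this sketch just calls a function per range.
        outputs.extend(process_band_range(input_tasks, band_range))
    return outputs  # accumulated across all band iterations


print(band_ranges(10, 4))  # [(0, 4), (4, 8), (8, 10)]
print(execute_lsh_like(["t1"], 10, 4, lambda tasks, br: [f"{t}@{br[0]}-{br[1]}" for t in tasks]))
# ['t1@0-4', 't1@4-8', 't1@8-10']
```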
Task Batching: The _generate_task_batches() method supports two modes: batching by a fixed batch_size (for standard stages) or splitting into a specified num_output_tasks number of batches (for RAFT stages that need even distribution across actors).
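A minimal sketch of the two batching modes, written as a standalone function that mirrors the documented `_generate_task_batches(tasks, batch_size=None, num_output_tasks=None)` parameters. The error handling, the rule that leading batches absorb the remainder, and returning empty batches when there are fewer tasks than `num_output_tasks` are assumptions, not confirmed behavior.

```python
from typing import Optional, Sequence, TypeVar

T = TypeVar("T")


def generate_task_batches(
    tasks: Sequence[T],
    batch_size: Optional[int] = None,
    num_output_tasks: Optional[int] = None,
) -> list[list[T]]:
    """Batch by fixed size, or split into exactly num_output_tasks near-equal batches."""
    if (batch_size is None) == (num_output_tasks is None):
        raise ValueError("pass exactly one of batch_size or num_output_tasks")
    if batch_size is not None:
        if batch_size <= 0:
            raise ValueError("batch_size must be positive")
        # Fixed-size mode (standard stages): the last batch may be shorter.
        return [list(tasks[i : i + batch_size]) for i in range(0, len(tasks), batch_size)]
    if num_output_tasks <= 0:
        raise ValueError("num_output_tasks must be positive")
    # Even-split mode (RAFT stages): sizes differ by at most one, and the
    # first `remainder` batches get one extra task each.
    base, remainder = divmod(len(tasks), num_output_tasks)
    batches: list[list[T]] = []
    start = 0
    for i in range(num_output_tasks):
        size = base + (1 if i < remainder else 0)
        batches.append(list(tasks[start : start + size]))
        start += size
    return batches


print(generate_task_batches(list(range(7)), batch_size=3))        # [[0, 1, 2], [3, 4, 5], [6]]
print(generate_task_batches(list(range(7)), num_output_tasks=3))  # [[0, 1, 2], [3, 4], [5, 6]]
```

Fixing the number of batches rather than their size is what lets every RAFT actor receive a share of the work in a single round.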
Resource Calculation: The optimal number of actors per stage is calculated by calculate_optimal_actors_for_stage() based on available cluster resources, per-stage CPU/GPU requirements, and reserved resources.
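One plausible way to compute the actor count is to determine, for each resource the stage requests, how many actors fit in the unreserved capacity, then keep the smallest value. This formula and its parameter names are assumptions for illustration; the real `calculate_optimal_actors_for_stage()` may apply additional limits (for example, a per-stage maximum or minimum).

```python
import math


def calculate_optimal_actors(
    available_cpus: float,
    available_gpus: float,
    cpus_per_actor: float,
    gpus_per_actor: float,
    reserved_cpus: float = 0.0,
    reserved_gpus: float = 0.0,
) -> int:
    """Assumed rule: the most actors that fit in unreserved capacity for every requested resource."""
    usable_cpus = max(available_cpus - reserved_cpus, 0.0)
    usable_gpus = max(available_gpus - reserved_gpus, 0.0)
    limits = []
    if cpus_per_actor > 0:
        limits.append(math.floor(usable_cpus / cpus_per_actor))
    if gpus_per_actor > 0:
        limits.append(math.floor(usable_gpus / gpus_per_actor))
    if not limits:
        raise ValueError("a stage must request CPUs or GPUs per actor")
    # The scarcest resource decides; 0 means the stage cannot be scheduled.
    return min(limits)


# 16 CPUs with 2 reserved -> 7 actors at 2 CPUs each; 4 GPUs -> 8 actors at 0.5 GPU each.
print(calculate_optimal_actors(16, 4, 2, 0.5, reserved_cpus=2))  # 7
```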
Usage
Use this executor when you need fine-grained control over resource allocation, support for GPU-accelerated RAFT stages, distributed shuffle operations via RapidsMPF, or LSH band iteration processing that is not available in the standard Ray Data backend.
Code Reference
Source Location
- Repository: NeMo-Curator
- File: nemo_curator/backends/experimental/ray_actor_pool/executor.py
- Lines: 1-424
Signature
```python
class RayActorPoolExecutor(BaseExecutor):
    def __init__(
        self,
        config: dict | None = None,
        ignore_head_node: bool = False,
        show_progress: bool = True,
        progress_interval: float = 10.0,
    ): ...

    def execute(
        self,
        stages: list["ProcessingStage"],
        initial_tasks: list[Task] | None = None,
    ) -> list[Task]: ...

    def _create_actor_pool(self, stage, num_actors) -> ActorPool: ...
    def _create_raft_actor_pool(self, stage, num_actors, session_id) -> ActorPool: ...
    def _create_rapidsmpf_actors(self, stage, num_actors, num_tasks) -> list: ...
    def _generate_task_batches(self, tasks, batch_size=None, num_output_tasks=None) -> list: ...
    def _process_stage_with_pool(self, actor_pool, _stage, tasks) -> list[Task]: ...
    def _process_shuffle_stage_with_rapidsmpf_actors(self, actors, tasks, band_range=None) -> list[Task]: ...
    def _execute_lsh_stage(self, stage, input_tasks) -> list[Task]: ...
    def _cleanup_actors(self, actors) -> None: ...
    def _cleanup_actor_pool(self, actor_pool) -> None: ...
```
Import
```python
from nemo_curator.backends.experimental.ray_actor_pool.executor import RayActorPoolExecutor
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| config | dict or None | No | Configuration dictionary for the executor (e.g., runtime_env, reserved_cpus, reserved_gpus) |
| ignore_head_node | bool | No | If True, do not schedule tasks on the Ray head node (default: False) |
| show_progress | bool | No | If True, display tqdm progress bars during execution (default: True) |
| progress_interval | float | No | Minimum interval in seconds between progress bar updates (default: 10.0) |
| stages | list[ProcessingStage] | Yes | List of processing stages to execute in the pipeline |
| initial_tasks | list[Task] or None | No | Initial tasks to process; defaults to [EmptyTask] if None |
Outputs
| Name | Type | Description |
|---|---|---|
| final_results | list[Task] | List of processed Task objects after all stages complete |
Usage Examples
Basic Usage
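```python
from nemo_curator.backends.experimental.ray_actor_pool.executor import RayActorPoolExecutor

executor = RayActorPoolExecutor(
    config={
        "runtime_env": {},     # Ray runtime environment, parsed before Ray is initialized
        "reserved_cpus": 2.0,  # CPUs excluded when sizing actor pools
        "reserved_gpus": 0.0,  # GPUs excluded when sizing actor pools
    },
    ignore_head_node=False,
    show_progress=True,
    progress_interval=10.0,
)

# my_pipeline_stages: a list of ProcessingStage instances you have built.
# my_tasks: the initial list of Task objects (None starts from [EmptyTask]).
results = executor.execute(stages=my_pipeline_stages, initial_tasks=my_tasks)
```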
With RAFT Stage Support
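```python
from nemo_curator.backends.experimental.ray_actor_pool.executor import RayActorPoolExecutor

# When a stage has IS_RAFT_ACTOR=True in its ray_stage_spec,
# the executor automatically:
#   1. Creates RAFT actors with NCCL communication setup
#   2. Distributes tasks evenly across all actors
#   3. Handles root unique ID broadcast and setup handshake
executor = RayActorPoolExecutor(
    config={"reserved_gpus": 1.0},
    show_progress=True,
)

# raft_enabled_stage: a ProcessingStage whose ray_stage_spec sets IS_RAFT_ACTOR=True.
# tasks: the input list of Task objects.
results = executor.execute(stages=[raft_enabled_stage], initial_tasks=tasks)
```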