

Implementation:NVIDIA NeMo Curator RayActorPoolExecutor

Domains: Execution Backend, Ray, Pipeline Orchestration
Last Updated: 2026-02-14 00:00 GMT

Overview

An experimental Ray-based pipeline executor that creates a pool of Ray actors for each processing stage. It supports standard, RAFT, and shuffle/LSH stage types, with load-balanced task processing and progress tracking.

Description

The RayActorPoolExecutor class extends BaseExecutor and provides a Ray ActorPool-based execution backend for NeMo Curator processing pipelines. It is one of the experimental executor backends offering fine-grained resource control and GPU-accelerated stage support.

Core Execution Flow: The execute() method:

  1. Initializes Ray with a parsed runtime environment (overriding RAY_EXPERIMENTAL_NOSET_CUDA_VISIBLE_DEVICES)
  2. Runs setup_on_node for all stages before processing begins
  3. Iterates through stages sequentially, creating an appropriate actor pool for each
  4. Processes task batches through the actor pool using ActorPool.map_unordered()
  5. Cleans up actor pools after each stage and shuts down Ray upon completion
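The five steps above can be sketched in plain Python. This is a minimal, hypothetical sketch and not the NeMo Curator code. `Stage`, `execute`, and `num_workers` are illustrative names, a `concurrent.futures.ThreadPoolExecutor` stands in for Ray's `ActorPool`, and Ray initialization and shutdown are omitted. Collecting results with `as_completed` mimics the completion-order delivery of `map_unordered()`.

```python
from __future__ import annotations

from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class Stage:
    """Hypothetical stand-in for a ProcessingStage."""

    name: str
    process_batch: Callable[[list[Any]], list[Any]]
    batch_size: int = 1
    setup_calls: list[str] = field(default_factory=list)

    def setup_on_node(self) -> None:
        # Placeholder for node-level setup; here we only record that it ran.
        self.setup_calls.append(self.name)


def execute(stages: list[Stage], initial_tasks: list[Any] | None = None, num_workers: int = 2) -> list[Any]:
    # Stand-in for [EmptyTask] when no initial tasks are given.
    tasks: list[Any] = initial_tasks if initial_tasks is not None else ["<empty task>"]
    # Step 2: setup runs for every stage before any stage processes data.
    for stage in stages:
        stage.setup_on_node()
    # Step 3: stages run sequentially, each with its own freshly created pool.
    for stage in stages:
        batches = [tasks[i:i + stage.batch_size] for i in range(0, len(tasks), stage.batch_size)]
        outputs: list[Any] = []
        with ThreadPoolExecutor(max_workers=num_workers) as pool:  # stand-in for an ActorPool
            futures = [pool.submit(stage.process_batch, batch) for batch in batches]
            # Step 4: results arrive in completion order, like map_unordered().
            for future in as_completed(futures):
                outputs.extend(future.result())
        # Step 5: exiting the `with` block shuts the pool down before the next stage.
        tasks = outputs
    return tasks
```

Because results are gathered in completion order, the output order of a stage is not guaranteed. Downstream code should not rely on task order.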

Actor Pool Types: Three types of actor pools are created depending on the stage specification:

  • Standard ActorPool: Uses RayActorPoolStageAdapter actors with configurable CPU/GPU resources per actor. Tasks are batched based on the stage's batch size setting.
  • RAFT ActorPool: Uses RayActorPoolRAFTAdapter actors for GPU-accelerated stages requiring NCCL communication. The root actor broadcasts its unique ID, and all actors perform a setup handshake. Tasks are distributed evenly across all actors.
  • Shuffle/RapidsMPF Actors: Uses ShuffleStageAdapter actors for distributed shuffle operations. Actors set up UCXX communication, then process data in three phases: insert tasks into the shuffler, signal insertion complete, and extract written results.
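The three-phase shuffle protocol (insert, signal insertion complete, extract) can be illustrated with a toy single-process model. Everything here is hypothetical. `ToyShuffleActor`, `run_shuffle`, and the hash-partitioning rule are assumptions made for illustration. There is no UCXX communication, no RapidsMPF shuffler, and no GPU, and each actor simply owns one output partition.

```python
from __future__ import annotations

from collections import defaultdict


class ToyShuffleActor:
    """Hypothetical in-process stand-in for one shuffle actor (no UCXX, no GPUs)."""

    def __init__(self, rank: int, num_actors: int) -> None:
        self.rank = rank
        self.num_actors = num_actors
        self.peers: list[ToyShuffleActor] = []  # filled in once all actors exist
        self.received: dict[int, list[tuple[str, int]]] = defaultdict(list)
        self.finished_inserting = False

    def insert(self, rows: list[tuple[str, int]]) -> None:
        # Phase 1: send each (key, value) row to the actor that owns hash(key).
        for key, value in rows:
            owner = self.peers[hash(key) % self.num_actors]
            owner.received[self.rank].append((key, value))

    def insert_finished(self) -> None:
        # Phase 2: promise that this actor will insert no more rows.
        self.finished_inserting = True

    def extract(self) -> list[tuple[str, int]]:
        # Phase 3: a partition is only complete once every actor has finished inserting.
        if not all(peer.finished_inserting for peer in self.peers):
            raise RuntimeError("extract() called before all actors finished inserting")
        return sorted(row for rows in self.received.values() for row in rows)


def run_shuffle(task_batches: list[list[tuple[str, int]]], num_actors: int) -> list[list[tuple[str, int]]]:
    actors = [ToyShuffleActor(rank, num_actors) for rank in range(num_actors)]
    for actor in actors:
        actor.peers = actors                      # stands in for communicator setup
    for i, batch in enumerate(task_batches):
        actors[i % num_actors].insert(batch)      # phase 1, batches spread round-robin
    for actor in actors:
        actor.insert_finished()                   # phase 2
    return [actor.extract() for actor in actors]  # phase 3: one output partition per actor
```

The barrier between phases 2 and 3 matters. An actor that extracted early could miss rows that a slower peer had not yet sent.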

LSH Stage Handling: LSH (Locality-Sensitive Hashing) stages receive special treatment via _execute_lsh_stage(), which iterates over band ranges, creating fresh shuffle actor pools for each band iteration and accumulating outputs across all iterations.
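The band-iteration loop can be sketched as follows, under stated assumptions. `band_ranges`, `num_bands`, `bands_per_iteration`, and `make_pool` are hypothetical names, and a band range is assumed to be a half-open `(start, end)` tuple. In this sketch, every iteration receives the full input together with its band range. The real `_execute_lsh_stage()` may split the work differently.

```python
from __future__ import annotations

from typing import Callable


def band_ranges(num_bands: int, bands_per_iteration: int) -> list[tuple[int, int]]:
    """Split bands [0, num_bands) into consecutive half-open (start, end) ranges."""
    if num_bands <= 0 or bands_per_iteration <= 0:
        raise ValueError("num_bands and bands_per_iteration must be positive")
    return [
        (start, min(start + bands_per_iteration, num_bands))
        for start in range(0, num_bands, bands_per_iteration)
    ]


def execute_lsh_stage(
    input_tasks: list,
    num_bands: int,
    bands_per_iteration: int,
    make_pool: Callable[[], Callable[[list, tuple[int, int]], list]],
) -> list:
    outputs: list = []
    for band_range in band_ranges(num_bands, bands_per_iteration):
        # A fresh "pool" per iteration, mirroring the fresh shuffle actors per band.
        process = make_pool()
        # Accumulate this iteration's outputs with those of earlier iterations.
        outputs.extend(process(input_tasks, band_range))
        # (The real executor would clean up this iteration's actors at this point.)
    return outputs
```

Processing a subset of bands per iteration bounds how much shuffle state is alive at once. The trade-off is that a new set of actors is created for every iteration.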

Task Batching: The _generate_task_batches() method supports two modes: batching by a fixed batch_size (for standard stages) or splitting into a specified num_output_tasks number of batches (for RAFT stages that need even distribution across actors).
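The two batching modes can be sketched as a standalone function. This is a hypothetical re-implementation, not the library method. Requiring exactly one of the two arguments, and splitting into contiguous slices whose sizes differ by at most one, are assumptions for illustration.

```python
from __future__ import annotations

from typing import TypeVar

T = TypeVar("T")


def generate_task_batches(
    tasks: list[T],
    batch_size: int | None = None,
    num_output_tasks: int | None = None,
) -> list[list[T]]:
    if (batch_size is None) == (num_output_tasks is None):
        raise ValueError("pass exactly one of batch_size or num_output_tasks")
    if batch_size is not None:
        # Mode 1 (standard stages): fixed-size batches; the last may be shorter.
        if batch_size <= 0:
            raise ValueError("batch_size must be positive")
        return [tasks[i:i + batch_size] for i in range(0, len(tasks), batch_size)]
    # Mode 2 (RAFT stages): exactly num_output_tasks contiguous batches whose
    # sizes differ by at most one, so every actor gets a similar share.
    if num_output_tasks <= 0:
        raise ValueError("num_output_tasks must be positive")
    base, extra = divmod(len(tasks), num_output_tasks)
    batches, start = [], 0
    for i in range(num_output_tasks):
        size = base + (1 if i < extra else 0)
        batches.append(tasks[start:start + size])
        start += size
    return batches
```

For example, five tasks with `batch_size=2` give `[[0, 1], [2, 3], [4]]`. With `num_output_tasks=2`, the same five tasks give `[[0, 1, 2], [3, 4]]`. In the second mode, this sketch can return empty batches when there are fewer tasks than requested batches.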

Resource Calculation: The optimal number of actors per stage is calculated by calculate_optimal_actors_for_stage() based on available cluster resources, per-stage CPU/GPU requirements, and reserved resources.
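One plausible sizing rule is that each requested resource type caps the actor count, and the scarcest resource decides. The sketch below assumes that rule. The function name, its parameters, and the floor-division logic are hypothetical, and the real `calculate_optimal_actors_for_stage()` may apply additional limits.

```python
from __future__ import annotations

import math


def optimal_actors_for_stage(
    total_cpus: float,
    total_gpus: float,
    cpus_per_actor: float,
    gpus_per_actor: float = 0.0,
    reserved_cpus: float = 0.0,
    reserved_gpus: float = 0.0,
) -> int:
    """Hypothetical sizing rule: how many actors fit in the unreserved resources."""
    available_cpus = max(total_cpus - reserved_cpus, 0.0)
    available_gpus = max(total_gpus - reserved_gpus, 0.0)
    limits = []
    # Each requested resource type caps the actor count independently.
    if cpus_per_actor > 0:
        limits.append(math.floor(available_cpus / cpus_per_actor))
    if gpus_per_actor > 0:
        limits.append(math.floor(available_gpus / gpus_per_actor))
    if not limits:
        raise ValueError("a stage must request CPUs or GPUs per actor")
    # The scarcest resource decides; 0 means the stage cannot be scheduled.
    return min(limits)
```

For example, on a cluster with 16 CPUs and 4 GPUs, reserving 2 CPUs and 1 GPU leaves room for at most 3 actors that each need 2 CPUs and 1 GPU. GPUs are the binding constraint in that case.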

Usage

Use this executor when you need capabilities that are not available in the standard Ray Data backend: fine-grained control over resource allocation, GPU-accelerated RAFT stages, distributed shuffle operations via RapidsMPF, or LSH band-iteration processing.

Code Reference

Source Location

  • Repository: NeMo-Curator
  • File: nemo_curator/backends/experimental/ray_actor_pool/executor.py
  • Lines: 1-424

Signature

class RayActorPoolExecutor(BaseExecutor):
    def __init__(
        self,
        config: dict | None = None,
        ignore_head_node: bool = False,
        show_progress: bool = True,
        progress_interval: float = 10.0,
    ): ...

    def execute(
        self,
        stages: list["ProcessingStage"],
        initial_tasks: list[Task] | None = None,
    ) -> list[Task]: ...

    def _create_actor_pool(self, stage, num_actors) -> ActorPool: ...
    def _create_raft_actor_pool(self, stage, num_actors, session_id) -> ActorPool: ...
    def _create_rapidsmpf_actors(self, stage, num_actors, num_tasks) -> list: ...
    def _generate_task_batches(self, tasks, batch_size=None, num_output_tasks=None) -> list: ...
    def _process_stage_with_pool(self, actor_pool, _stage, tasks) -> list[Task]: ...
    def _process_shuffle_stage_with_rapidsmpf_actors(self, actors, tasks, band_range=None) -> list[Task]: ...
    def _execute_lsh_stage(self, stage, input_tasks) -> list[Task]: ...
    def _cleanup_actors(self, actors) -> None: ...
    def _cleanup_actor_pool(self, actor_pool) -> None: ...

Import

from nemo_curator.backends.experimental.ray_actor_pool.executor import RayActorPoolExecutor

I/O Contract

Inputs

Name | Type | Required | Parameter of | Description
config | dict or None | No | __init__() | Configuration dictionary for the executor (e.g., runtime_env, reserved_cpus, reserved_gpus)
ignore_head_node | bool | No | __init__() | If True, do not schedule tasks on the Ray head node (default: False)
show_progress | bool | No | __init__() | If True, display tqdm progress bars during execution (default: True)
progress_interval | float | No | __init__() | Minimum interval in seconds between progress bar updates (default: 10.0)
stages | list[ProcessingStage] | Yes | execute() | List of processing stages to execute in the pipeline
initial_tasks | list[Task] or None | No | execute() | Initial tasks to process; defaults to [EmptyTask] if None

Outputs

Name | Type | Description
final_results | list[Task] | List of processed Task objects after all stages complete

Usage Examples

Basic Usage

from nemo_curator.backends.experimental.ray_actor_pool.executor import RayActorPoolExecutor

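# reserved_cpus / reserved_gpus are taken into account when sizing actor pools
executor = RayActorPoolExecutor(
    config={
        "runtime_env": {},
        "reserved_cpus": 2.0,
        "reserved_gpus": 0.0,
    },
    ignore_head_node=False,  # allow tasks to be scheduled on the Ray head node
    show_progress=True,      # display tqdm progress bars
    progress_interval=10.0,  # minimum seconds between progress bar updates
)

# Execute a list of processing stages.
# my_pipeline_stages (list[ProcessingStage]) and my_tasks (list[Task]) are placeholders
# for objects built elsewhere; omit initial_tasks to start from [EmptyTask].
results = executor.execute(stages=my_pipeline_stages, initial_tasks=my_tasks)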

With RAFT Stage Support

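from nemo_curator.backends.experimental.ray_actor_pool.executor import RayActorPoolExecutor

# When a stage has IS_RAFT_ACTOR=True in its ray_stage_spec,
# the executor automatically:
# 1. Creates RAFT actors with NCCL communication setup
# 2. Distributes tasks evenly across all actors
# 3. Handles root unique ID broadcast and setup handshake

executor = RayActorPoolExecutor(
    config={"reserved_gpus": 1.0},  # reserve 1 GPU when sizing actor pools
    show_progress=True,
)
# raft_enabled_stage and tasks are placeholders for a RAFT-enabled ProcessingStage
# and its input Task objects, defined elsewhere
results = executor.execute(stages=[raft_enabled_stage], initial_tasks=tasks)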
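The RAFT setup sequence (the root creates a unique ID, the ID is broadcast, and every actor completes a handshake), followed by even task distribution, can be modeled without GPUs. This is a hypothetical sketch. `ToyRaftActor`, `create_raft_actors`, and `distribute_evenly` are illustrative names, `uuid.uuid4().bytes` stands in for an NCCL unique ID, and round-robin assignment is an assumed way of spreading tasks evenly.

```python
from __future__ import annotations

import uuid


class ToyRaftActor:
    """Hypothetical stand-in for a RAFT actor; no NCCL or GPUs are involved."""

    def __init__(self, rank: int) -> None:
        self.rank = rank
        self.world_size = 0
        self.unique_id: bytes | None = None
        self.ready = False

    def create_root_unique_id(self) -> bytes:
        # Only the root actor (rank 0) creates the communicator ID.
        if self.rank != 0:
            raise ValueError("only the root actor creates the unique ID")
        self.unique_id = uuid.uuid4().bytes  # stands in for an NCCL unique ID
        return self.unique_id

    def setup(self, root_unique_id: bytes, world_size: int) -> None:
        # Handshake: every actor joins the same communicator using the root's ID.
        self.unique_id = root_unique_id
        self.world_size = world_size
        self.ready = True


def create_raft_actors(num_actors: int) -> list[ToyRaftActor]:
    actors = [ToyRaftActor(rank) for rank in range(num_actors)]
    root_id = actors[0].create_root_unique_id()
    for actor in actors:  # "broadcast" the root ID to every actor, root included
        actor.setup(root_id, num_actors)
    return actors


def distribute_evenly(actors: list[ToyRaftActor], tasks: list) -> dict[int, list]:
    # Round-robin assignment: per-actor task counts differ by at most one.
    return {actor.rank: tasks[actor.rank::len(actors)] for actor in actors}
```

Giving every actor a share matters for collective communication. In a collective operation, all ranks typically have to participate, so an idle actor could stall the others.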
