Implementation:NVIDIA NeMo Curator RayClient
| Knowledge Sources | |
|---|---|
| Domains | Core Infrastructure, Ray Integration, Cluster Management, Monitoring |
| Last Updated | 2026-02-14 00:00 GMT |
Overview
Provides the RayClient dataclass that manages the lifecycle of a local Ray cluster, including starting, stopping, port allocation, and optional Prometheus/Grafana metrics integration.
Description
RayClient is a Python dataclass that abstracts Ray cluster setup so users can start and stop a properly configured Ray cluster with a single high-level interface. It handles port conflicts, resource configuration, and cleanup automatically.
Initialization validates ray_stdouterr_capture_file, raising FileExistsError if that file already exists, and accepts the following configurable parameters:
- ray_port, ray_dashboard_port, ray_client_server_port, ray_metrics_port -- Network ports for Ray services; defaults are defined in nemo_curator.core.constants.
- ray_temp_dir -- Temporary directory for Ray runtime files.
- ray_dashboard_host -- Dashboard binding host.
- num_gpus, num_cpus -- Resource counts to advertise to Ray.
- object_store_memory -- Memory allocation for the Ray object store.
- enable_object_spilling -- Whether to enable spilling objects to disk when memory is full.
- include_dashboard -- Whether to set up Prometheus/Grafana metrics integration.
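The capture-file check described above can be sketched with a dataclass __post_init__ hook. This is a minimal illustration, not the NeMo Curator source: CaptureSettings is a hypothetical class that keeps only the ray_stdouterr_capture_file field, and the error message wording is an assumption.

```python
from __future__ import annotations

import os
from dataclasses import dataclass


@dataclass
class CaptureSettings:
    """Hypothetical stand-in for the RayClient field involved in output capture."""

    ray_stdouterr_capture_file: str | None = None

    def __post_init__(self) -> None:
        # Refuse to reuse an existing file so earlier Ray logs are never overwritten.
        path = self.ray_stdouterr_capture_file
        if path is not None and os.path.exists(path):
            raise FileExistsError(f"Capture file {path} already exists.")
```

Running the check at construction time means a bad path fails fast, before any Ray process is spawned.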
start() performs the following:
- If include_dashboard is True, attempts to add Ray metrics service discovery to an existing Prometheus configuration (requires Prometheus and Grafana to already be running).
- Checks the RAY_ADDRESS environment variable; if it is set, assumes a cluster is already running and skips startup.
- Otherwise, finds free ports (incrementing from each default port until an unused one is found), resolves the host IP address, and calls init_cluster() to spawn a ray start --head subprocess.
- Sets RAY_ADDRESS and registers an atexit handler for cleanup.
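The RAY_ADDRESS check and the port search in start() can be sketched with the standard library alone. find_free_port, host_ip, and resolve_ray_address are hypothetical helpers; the 100-attempt limit and the 127.0.0.1 fallback are assumptions; and the sketch only sets RAY_ADDRESS rather than actually launching ray start --head.

```python
from __future__ import annotations

import os
import socket

MAX_PORT = 65535


def find_free_port(start_port: int, max_attempts: int = 100) -> int:
    """Return the first port >= start_port that can currently be bound."""
    end = min(start_port + max_attempts, MAX_PORT + 1)
    for port in range(start_port, end):
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            try:
                sock.bind(("", port))
            except OSError:
                continue  # Port already in use: try the next one.
            return port
    raise RuntimeError(f"No free port found in [{start_port}, {end})")


def host_ip() -> str:
    """Best-effort host IP lookup, falling back to loopback if the hostname does not resolve."""
    try:
        return socket.gethostbyname(socket.gethostname())
    except OSError:
        return "127.0.0.1"


def resolve_ray_address(default_port: int) -> tuple[str, bool]:
    """Return (address, started_new), reusing RAY_ADDRESS when it is already set."""
    existing = os.environ.get("RAY_ADDRESS")
    if existing:
        return existing, False  # Assume a cluster is already running; skip startup.
    address = f"{host_ip()}:{find_free_port(default_port)}"
    # A real implementation would launch `ray start --head` on this port here.
    os.environ["RAY_ADDRESS"] = address
    return address, True
```

Probing by binding a socket is inherently racy (another process can grab the port between the probe and Ray's own bind), which is one reason a fallback search across several ports is useful.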
stop() terminates the Ray head process group with SIGTERM (escalating to SIGKILL if the group has not exited within 5 seconds), unsets RAY_ADDRESS, and logs a warning that some Ray processes may still be lingering.
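The SIGTERM-then-SIGKILL shutdown can be sketched for POSIX systems as follows. start_head_like_process launches a sleeping Python child as a stand-in for the ray start --head process; both helper names are hypothetical, and the sketch assumes the child was started in its own session so that its process ID is also its process group ID.

```python
import os
import signal
import subprocess
import sys


def start_head_like_process() -> subprocess.Popen:
    """Launch a long-running child in its own process group (stand-in for `ray start --head`)."""
    return subprocess.Popen(
        [sys.executable, "-c", "import time; time.sleep(60)"],
        start_new_session=True,  # New session => the child leads a new process group.
    )


def stop_process_group(proc: subprocess.Popen, timeout: float = 5.0) -> None:
    """Send SIGTERM to the whole group; escalate to SIGKILL if it outlives the timeout."""
    if proc.poll() is None:
        # The child is a group leader, so its PID doubles as the process group ID.
        os.killpg(proc.pid, signal.SIGTERM)
        try:
            proc.wait(timeout=timeout)
        except subprocess.TimeoutExpired:
            os.killpg(proc.pid, signal.SIGKILL)  # SIGTERM was ignored or too slow.
            proc.wait()
    # Mirror the documented behavior of clearing RAY_ADDRESS on stop.
    os.environ.pop("RAY_ADDRESS", None)
```

Signalling the group rather than the single PID reaches any helpers the head process forked into the same group; processes that detach into their own sessions would escape it, which is consistent with the warning about lingering Ray processes.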
RayClient supports context manager usage via __enter__ and __exit__, making it convenient for use with Python's with statement.
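The context-manager contract can be illustrated with a toy class. ManagedCluster is hypothetical; that RayClient.__enter__ returns the client itself is inferred from the "with RayClient(...) as client" usage in the Usage Examples section, and returning False from __exit__ (so exceptions propagate) is this sketch's choice, not confirmed library behavior.

```python
class ManagedCluster:
    """Hypothetical minimal class showing the start/stop context-manager contract."""

    def __init__(self) -> None:
        self.running = False

    def start(self) -> None:
        self.running = True  # A real client would launch the cluster here.

    def stop(self) -> None:
        self.running = False  # A real client would tear the cluster down here.

    def __enter__(self) -> "ManagedCluster":
        self.start()
        return self

    def __exit__(self, *exc) -> bool:
        self.stop()
        return False  # Do not suppress exceptions raised inside the with block.
```

Because __exit__ runs even when the block raises, the cluster is stopped on both normal and error paths.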
Usage
Use RayClient to programmatically start a local Ray cluster for NeMo Curator pipeline execution. It is particularly useful in scripts, notebooks, and testing environments where you need a managed Ray cluster lifecycle.
Code Reference
Source Location
- Repository: NeMo-Curator
- File: nemo_curator/core/client.py
- Lines: 1-190
Signature
```python
@dataclass
class RayClient:
    ray_port: int = DEFAULT_RAY_PORT
    ray_dashboard_port: int = DEFAULT_RAY_DASHBOARD_PORT
    ray_client_server_port: int = DEFAULT_RAY_CLIENT_SERVER_PORT
    ray_temp_dir: str = DEFAULT_RAY_TEMP_DIR
    include_dashboard: bool = True
    ray_metrics_port: int = DEFAULT_RAY_METRICS_PORT
    ray_dashboard_host: str = DEFAULT_RAY_DASHBOARD_HOST
    num_gpus: int | None = None
    num_cpus: int | None = None
    object_store_memory: int | None = None
    enable_object_spilling: bool = False
    ray_stdouterr_capture_file: str | None = None

    def start(self) -> None: ...
    def stop(self) -> None: ...
    def __enter__(self): ...
    def __exit__(self, *exc): ...
```
Import
```python
from nemo_curator.core.client import RayClient
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| ray_port | int | No | Ray GCS port (default: DEFAULT_RAY_PORT) |
| ray_dashboard_port | int | No | Ray dashboard port (default: DEFAULT_RAY_DASHBOARD_PORT) |
| ray_client_server_port | int | No | Ray client server port (default: DEFAULT_RAY_CLIENT_SERVER_PORT) |
| ray_temp_dir | str | No | Ray temporary directory (default: DEFAULT_RAY_TEMP_DIR) |
| include_dashboard | bool | No | Whether to set up metrics integration (default: True) |
| ray_metrics_port | int | No | Ray metrics export port (default: DEFAULT_RAY_METRICS_PORT) |
| ray_dashboard_host | str | No | Dashboard binding host (default: DEFAULT_RAY_DASHBOARD_HOST) |
| num_gpus | int or None | No | Number of GPUs to advertise (default: None, auto-detect) |
| num_cpus | int or None | No | Number of CPUs to advertise (default: None, auto-detect) |
| object_store_memory | int or None | No | Object store memory in bytes (default: None, Ray default) |
| enable_object_spilling | bool | No | Enable spilling to disk (default: False) |
| ray_stdouterr_capture_file | str or None | No | File to capture stdout/stderr (default: None) |
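Because object_store_memory is expressed in bytes, sizes are usually easier to write in GiB and convert. gib_to_bytes is a hypothetical helper, not part of NeMo Curator; Ray only ever sees the resulting integer.

```python
GIB = 1024**3  # Bytes in one gibibyte.


def gib_to_bytes(gib: float) -> int:
    """Convert a size in GiB to the integer byte count that object_store_memory expects."""
    return int(gib * GIB)
```

For example, RayClient(object_store_memory=gib_to_bytes(8)) would request 8589934592 bytes for the object store.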
Outputs
| Name | Type | Description |
|---|---|---|
| ray_process | subprocess.Popen or None | The Ray head node process (None if an existing cluster was used) |
Side Effects
- Sets RAY_ADDRESS environment variable on start
- Clears RAY_ADDRESS environment variable on stop
- Registers an atexit handler to stop the cluster on interpreter exit
- Spawns a ray start --head subprocess (if no existing cluster detected)
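The RAY_ADDRESS and atexit side effects can be sketched as below. EnvScopedCluster is hypothetical, and making stop() idempotent (so an explicit stop() followed by the interpreter-exit handler is harmless) is an assumption about good practice rather than a documented RayClient guarantee.

```python
import atexit
import os


class EnvScopedCluster:
    """Hypothetical sketch of the RAY_ADDRESS and atexit side effects."""

    def __init__(self, address: str) -> None:
        self.address = address
        self._stopped = True

    def start(self) -> None:
        os.environ["RAY_ADDRESS"] = self.address
        self._stopped = False
        # Ensure cleanup even if the caller never calls stop() explicitly.
        atexit.register(self.stop)

    def stop(self) -> None:
        if self._stopped:
            return  # Idempotent: safe to call from user code and from atexit.
        os.environ.pop("RAY_ADDRESS", None)
        atexit.unregister(self.stop)  # No need to run again at interpreter exit.
        self._stopped = True
```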
Usage Examples
Using as a Context Manager
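```python
from nemo_curator.backends.xenna import XennaExecutor
from nemo_curator.core.client import RayClient

# my_stages and my_tasks are placeholders for your own pipeline stages and input tasks.
with RayClient(num_gpus=4, num_cpus=32) as client:
    # The Ray cluster is running; execute pipelines here.
    executor = XennaExecutor()
    results = executor.execute(stages=my_stages, initial_tasks=my_tasks)
# On leaving the block, the Ray cluster is stopped automatically.
```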
Manual Start and Stop
```python
from nemo_curator.core.client import RayClient

client = RayClient(
    ray_port=6379,
    ray_dashboard_port=8265,
    include_dashboard=True,
    enable_object_spilling=True,
)
client.start()
try:
    # Use the cluster
    pass
finally:
    client.stop()
```
Capturing Ray Output
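```python
from nemo_curator.core.client import RayClient
# The capture file must not already exist, or RayClient raises FileExistsError.
client = RayClient(ray_stdouterr_capture_file="/tmp/ray_output.log")
client.start()  # Ray stdout/stderr is now captured to /tmp/ray_output.log
client.stop()
```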
Related Pages
- Environment:NVIDIA_NeMo_Curator_Python_Linux_Base
- NVIDIA_NeMo_Curator_Core_Utils -- Low-level cluster bootstrapping utilities used by RayClient
- NVIDIA_NeMo_Curator_XennaExecutor -- Production executor that runs on the Ray cluster
- NVIDIA_NeMo_Curator_RayDataExecutor -- Experimental executor that runs on the Ray cluster