

Implementation:NVIDIA NeMo Curator RayClient

From Leeroopedia
Revision as of 13:21, 16 February 2026 by Admin (auto-imported from implementations/NVIDIA_NeMo_Curator_RayClient.md)
Knowledge Sources
Domains: Core Infrastructure, Ray Integration, Cluster Management, Monitoring
Last Updated: 2026-02-14 00:00 GMT

Overview

Provides the RayClient dataclass that manages the lifecycle of a local Ray cluster, including starting, stopping, port allocation, and optional Prometheus/Grafana metrics integration.

Description

RayClient is a Python dataclass that abstracts Ray cluster setup so users can start and stop a properly configured Ray cluster with a single high-level interface. It handles port conflicts, resource configuration, and cleanup automatically.

On initialization, RayClient validates ray_stdouterr_capture_file and raises FileExistsError if that file already exists. It accepts the following configurable parameters:

  • ray_port, ray_dashboard_port, ray_client_server_port, ray_metrics_port -- Network ports for the Ray GCS, dashboard, client server, and metrics export, with defaults defined in nemo_curator.core.constants.
  • ray_temp_dir -- Temporary directory for Ray runtime files.
  • ray_dashboard_host -- Dashboard binding host.
  • num_gpus, num_cpus -- Resource counts to advertise to Ray.
  • object_store_memory -- Memory allocation for the Ray object store.
  • enable_object_spilling -- Whether to enable spilling objects to disk when memory is full.
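  • include_dashboard -- Whether to set up Prometheus/Grafana metrics integration.
  • ray_stdouterr_capture_file -- Optional file that captures the Ray process's stdout and stderr; it must not already exist.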
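The capture-file check can be pictured with the minimal sketch below. CaptureFileSettings is a hypothetical stand-in, not part of NeMo Curator, and placing the check in a dataclass __post_init__ hook is an assumption about how RayClient implements it; only the FileExistsError behavior comes from the description above.

```python
from __future__ import annotations

import os
from dataclasses import dataclass


@dataclass
class CaptureFileSettings:
    """Hypothetical stand-in mirroring RayClient's capture-file validation."""

    ray_stdouterr_capture_file: str | None = None

    def __post_init__(self) -> None:
        # Assumed placement of the check: refuse an existing file so output
        # from a previous run is never overwritten or mixed with a new run.
        path = self.ray_stdouterr_capture_file
        if path is not None and os.path.exists(path):
            raise FileExistsError(f"Capture file {path} already exists.")
```

Passing a fresh path (or None) succeeds; passing a path that already exists fails at construction time, before any Ray process is started.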

start() performs the following:

  1. If include_dashboard is True, attempts to add Ray metrics service discovery to an existing Prometheus configuration (requires Prometheus and Grafana to already be running).
  2. Checks RAY_ADDRESS environment variable -- if set, assumes a cluster is already running and skips startup.
  3. Otherwise, finds free ports (auto-incrementing from defaults if the default port is in use), resolves the host IP address, and calls init_cluster() to spawn a ray start --head subprocess.
  4. Sets RAY_ADDRESS and registers an atexit handler for cleanup.
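Steps 2 and 3 can be sketched with the standard library alone. This is a minimal illustration, not RayClient's actual code: port_is_free, find_free_port, and cluster_already_running are hypothetical helper names, and probing a port by binding a socket on 127.0.0.1 is one common technique that RayClient may or may not use.

```python
import os
import socket


def port_is_free(port: int, host: str = "127.0.0.1") -> bool:
    """Return True if a TCP socket can currently bind to (host, port)."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        try:
            sock.bind((host, port))
        except OSError:
            return False  # something else already holds this port
        return True


def find_free_port(start_port: int, max_attempts: int = 100) -> int:
    """Scan upward from start_port and return the first free port (step 3)."""
    for port in range(start_port, start_port + max_attempts):
        if port_is_free(port):
            return port
    raise RuntimeError(
        f"No free port in [{start_port}, {start_port + max_attempts})"
    )


def cluster_already_running() -> bool:
    """Step 2: a non-empty RAY_ADDRESS means reuse the cluster, do not start one."""
    return bool(os.environ.get("RAY_ADDRESS"))
```

For example, find_free_port(6379) returns 6379 when that port is free and the next free port above it when it is taken, which matches the auto-increment behavior described in step 3.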

stop() terminates the Ray cluster process group using SIGTERM (with SIGKILL fallback after a 5-second timeout), cleans up RAY_ADDRESS, and logs a warning about potential lingering Ray processes.
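The SIGTERM-then-SIGKILL shutdown can be sketched as follows. The sketch is POSIX-only and assumes the head process was spawned with start_new_session=True so that it leads its own process group; stop_process_group is a hypothetical helper, and its 5-second default simply mirrors the timeout stated above.

```python
import os
import signal
import subprocess


def stop_process_group(proc: subprocess.Popen, timeout: float = 5.0) -> None:
    """Terminate proc's process group: SIGTERM first, SIGKILL if it lingers.

    Assumes proc was started with start_new_session=True (POSIX only).
    """
    if proc.poll() is None:
        pgid = os.getpgid(proc.pid)
        os.killpg(pgid, signal.SIGTERM)  # ask every process in the group to exit
        try:
            proc.wait(timeout=timeout)
        except subprocess.TimeoutExpired:
            # The group did not exit in time; force it down.
            os.killpg(pgid, signal.SIGKILL)
            proc.wait()
    # Mirror the documented cleanup of the cluster address.
    os.environ.pop("RAY_ADDRESS", None)
```

Signalling the whole group rather than only proc.pid matters because ray start --head forks several helper processes; killing just the parent could leave them running, which is the kind of lingering process the warning refers to.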

RayClient supports context manager usage via __enter__ and __exit__, making it convenient for use with Python's with statement.
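A minimal sketch of the context-manager pattern, using a hypothetical ManagedCluster class in place of RayClient. It assumes __enter__ calls start() and returns the instance (the `as client` form in the usage examples suggests this) and that __exit__ calls stop() unconditionally.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class ManagedCluster:
    """Hypothetical stand-in showing how __enter__/__exit__ wrap start/stop."""

    events: list[str] = field(default_factory=list)

    def start(self) -> None:
        self.events.append("start")

    def stop(self) -> None:
        self.events.append("stop")

    def __enter__(self) -> ManagedCluster:
        self.start()
        return self  # bound to the name after "as"

    def __exit__(self, *exc) -> None:
        # Runs on normal exit and when the with-body raises; returning None
        # (falsy) lets any exception propagate to the caller.
        self.stop()
```

Because stop() runs even when the body raises, a failing pipeline inside the with block still tears the cluster down.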

Usage

Use RayClient to programmatically start a local Ray cluster for NeMo Curator pipeline execution. It is particularly useful in scripts, notebooks, and testing environments where you need a managed Ray cluster lifecycle.

Code Reference

Source Location

  • Repository: NeMo-Curator
  • File: nemo_curator/core/client.py
  • Lines: 1-190

Signature

@dataclass
class RayClient:
    ray_port: int = DEFAULT_RAY_PORT
    ray_dashboard_port: int = DEFAULT_RAY_DASHBOARD_PORT
    ray_client_server_port: int = DEFAULT_RAY_CLIENT_SERVER_PORT
    ray_temp_dir: str = DEFAULT_RAY_TEMP_DIR
    include_dashboard: bool = True
    ray_metrics_port: int = DEFAULT_RAY_METRICS_PORT
    ray_dashboard_host: str = DEFAULT_RAY_DASHBOARD_HOST
    num_gpus: int | None = None
    num_cpus: int | None = None
    object_store_memory: int | None = None
    enable_object_spilling: bool = False
    ray_stdouterr_capture_file: str | None = None

    def start(self) -> None: ...
    def stop(self) -> None: ...
    def __enter__(self): ...
    def __exit__(self, *exc): ...

Import

from nemo_curator.core.client import RayClient

I/O Contract

Inputs

Name | Type | Required | Description
ray_port | int | No | Ray GCS port (default: DEFAULT_RAY_PORT)
ray_dashboard_port | int | No | Ray dashboard port (default: DEFAULT_RAY_DASHBOARD_PORT)
ray_client_server_port | int | No | Ray client server port (default: DEFAULT_RAY_CLIENT_SERVER_PORT)
ray_temp_dir | str | No | Ray temporary directory (default: DEFAULT_RAY_TEMP_DIR)
include_dashboard | bool | No | Whether to set up metrics integration (default: True)
ray_metrics_port | int | No | Ray metrics export port (default: DEFAULT_RAY_METRICS_PORT)
ray_dashboard_host | str | No | Dashboard binding host (default: DEFAULT_RAY_DASHBOARD_HOST)
num_gpus | int or None | No | Number of GPUs to advertise (default: None, auto-detect)
num_cpus | int or None | No | Number of CPUs to advertise (default: None, auto-detect)
object_store_memory | int or None | No | Object store memory in bytes (default: None, Ray default)
enable_object_spilling | bool | No | Enable spilling to disk (default: False)
ray_stdouterr_capture_file | str or None | No | File to capture stdout/stderr; must not already exist (default: None)
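Because object_store_memory is given in bytes, it is easy to pass a value that is off by a factor of 1024**3. The hypothetical helper below converts gibibytes to bytes; passing the result to RayClient is shown only as a comment.

```python
def gib_to_bytes(gib: float) -> int:
    """Convert gibibytes to the integer byte count object_store_memory expects."""
    return int(gib * 1024**3)


object_store_memory = gib_to_bytes(10)  # 10 GiB expressed in bytes
# e.g. RayClient(object_store_memory=object_store_memory)
```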

Outputs

Name | Type | Description
ray_process | subprocess.Popen or None | The Ray head node process (None if an existing cluster was used)

Side Effects

  • Sets RAY_ADDRESS environment variable on start
  • Clears RAY_ADDRESS environment variable on stop
  • Registers an atexit handler to stop the cluster on interpreter exit
  • Spawns a ray start --head subprocess (if no existing cluster detected)
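The environment-variable and atexit side effects follow a common pattern, sketched here with a hypothetical AddressLifecycle class rather than RayClient itself. The guard that makes stop() safe to call twice (once explicitly, once from atexit) is an assumption of this sketch, not documented RayClient behavior. Setting RAY_ADDRESS matters because ray.init() called without an address falls back to that variable.

```python
import atexit
import os


class AddressLifecycle:
    """Hypothetical sketch of the documented side effects (not RayClient)."""

    def __init__(self, address: str) -> None:
        self.address = address
        self._stopped = False

    def start(self) -> None:
        # Visible to this process, its children, and a bare ray.init().
        os.environ["RAY_ADDRESS"] = self.address
        # Safety net in case the caller never calls stop().
        atexit.register(self.stop)

    def stop(self) -> None:
        if self._stopped:
            return  # assumed idempotency: a second call is a no-op
        self._stopped = True
        os.environ.pop("RAY_ADDRESS", None)
        atexit.unregister(self.stop)  # nothing left to clean up at exit
```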

Usage Examples

Using as a Context Manager

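from nemo_curator.backends.xenna import XennaExecutor
from nemo_curator.core.client import RayClient

# my_stages and my_tasks are placeholders for your own pipeline stages and input tasks.
with RayClient(num_gpus=4, num_cpus=32) as client:
    # The Ray cluster is running; execute pipelines here.
    executor = XennaExecutor()
    results = executor.execute(stages=my_stages, initial_tasks=my_tasks)
# On leaving the with block, the Ray cluster is stopped automatically.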

Manual Start and Stop

from nemo_curator.core.client import RayClient

client = RayClient(
    ray_port=6379,
    ray_dashboard_port=8265,
    include_dashboard=True,
    enable_object_spilling=True,
)
client.start()

try:
    # Use the cluster
    pass
finally:
    client.stop()

Capturing Ray Output

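from nemo_curator.core.client import RayClient

# The capture file must not exist yet; RayClient raises FileExistsError otherwise.
client = RayClient(ray_stdouterr_capture_file="/tmp/ray_output.log")
client.start()
try:
    # Ray stdout/stderr is captured to /tmp/ray_output.log
    pass
finally:
    client.stop()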
