
Implementation:NVIDIA NeMo Curator Core Utils

From Leeroopedia
Revision as of 13:20, 16 February 2026 by Admin (talk | contribs) (Auto-imported from implementations/NVIDIA_NeMo_Curator_Core_Utils.md)
Knowledge Sources
Domains: Core Infrastructure, Ray Integration, Cluster Management, Networking
Last Updated: 2026-02-14 00:00 GMT

Overview

Provides utility functions for finding free network ports and initializing a local Ray head node cluster via subprocess, serving as the low-level cluster bootstrapping logic used by RayClient.

Description

This module contains two primary functions and two internal serialization helpers:

get_free_port(start_port, get_next_free_port)

Finds an available network port starting from start_port. It iterates through ports from start_port up to 65535, skipping the reserved Ray worker port range (DEFAULT_RAY_MIN_WORKER_PORT to DEFAULT_RAY_MAX_WORKER_PORT). For each candidate port, it attempts to bind a socket with SO_REUSEADDR to test availability.

If get_next_free_port is True (the signature default, and the value used with default ports), the search continues until a free port is found. If it is False and start_port is already in use, the function raises a RuntimeError instead of trying the next port.
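The probing behavior described above can be sketched with the standard library alone. This is a minimal illustration, not the code in nemo_curator/core/utils.py: the function name find_free_port_sketch, the two ASSUMED_* constants (stand-ins for DEFAULT_RAY_MIN_WORKER_PORT and DEFAULT_RAY_MAX_WORKER_PORT), and the choice to bind on 127.0.0.1 are all assumptions made for the example.

```python
import socket

# Assumed stand-ins for DEFAULT_RAY_MIN_WORKER_PORT / DEFAULT_RAY_MAX_WORKER_PORT.
# The real values are defined by Ray / NeMo Curator and may differ.
ASSUMED_MIN_WORKER_PORT = 10002
ASSUMED_MAX_WORKER_PORT = 19999


def find_free_port_sketch(start_port: int, get_next_free_port: bool = True) -> int:
    """Return the first bindable port at or above start_port, skipping the worker range."""
    for port in range(start_port, 65536):
        # Ports reserved for Ray workers are never handed out.
        if ASSUMED_MIN_WORKER_PORT <= port <= ASSUMED_MAX_WORKER_PORT:
            continue
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            try:
                # A successful bind means nothing else is listening on this port.
                sock.bind(("127.0.0.1", port))
            except OSError:
                if not get_next_free_port:
                    raise RuntimeError(f"Port {port} is already in use") from None
                continue
            return port
    raise RuntimeError(f"No free port found at or above {start_port}")
```

Note that a port reported as free can still be taken by another process before the caller binds it, so callers should treat the result as a best-effort hint.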

init_cluster(...)

Initializes a new local Ray head node cluster by:

  1. Registering a custom loguru serializer/deserializer for Ray (since loguru is not natively serializable).
  2. Building a ray start --head command with all specified options: node IP address, GCS port, metrics export port, dashboard host/port, client server port, temp directory, GPU/CPU counts, object store memory, object spilling configuration, and block mode.
  3. Setting environment variables for metrics ports (DASHBOARD_METRIC_PORT, AUTOSCALER_METRIC_PORT) and Xenna integration (XENNA_RAY_METRICS_PORT, XENNA_RESPECT_CUDA_VISIBLE_DEVICES).
  4. Launching the Ray process via subprocess.Popen with start_new_session=True (creating a new process group for clean shutdown). Optionally captures stdout/stderr to a file.
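Steps 2 and 4 above can be sketched as follows. This is an illustrative approximation under stated assumptions, not the module's actual code: the helper names build_ray_start_command_sketch and launch_detached_sketch are hypothetical, ip_address is required here rather than auto-detected, and the object spilling configuration and serializer registration are omitted because this page does not describe their exact form. The flags used are standard ray start options.

```python
from __future__ import annotations

import subprocess


def build_ray_start_command_sketch(
    ray_port: int,
    ray_temp_dir: str,
    ray_dashboard_port: int,
    ray_metrics_port: int,
    ray_client_server_port: int,
    ray_dashboard_host: str,
    ip_address: str,
    num_gpus: int | None = None,
    num_cpus: int | None = None,
    object_store_memory: int | None = None,
    block: bool = True,
) -> list[str]:
    """Assemble a `ray start --head` argument list from the given options."""
    cmd = [
        "ray", "start", "--head",
        "--node-ip-address", ip_address,
        "--port", str(ray_port),
        "--metrics-export-port", str(ray_metrics_port),
        "--dashboard-host", ray_dashboard_host,
        "--dashboard-port", str(ray_dashboard_port),
        "--ray-client-server-port", str(ray_client_server_port),
        "--temp-dir", ray_temp_dir,
    ]
    # Optional resources are only passed when the caller sets them.
    if num_gpus is not None:
        cmd += ["--num-gpus", str(num_gpus)]
    if num_cpus is not None:
        cmd += ["--num-cpus", str(num_cpus)]
    if object_store_memory is not None:
        cmd += ["--object-store-memory", str(object_store_memory)]
    if block:
        cmd.append("--block")
    return cmd


def launch_detached_sketch(cmd: list[str], capture_file: str | None = None) -> subprocess.Popen:
    """Start cmd in its own session, optionally redirecting stdout/stderr to a file."""
    if capture_file is None:
        return subprocess.Popen(cmd, start_new_session=True)
    with open(capture_file, "w") as out:
        # The child keeps its own copy of the file descriptor after this block exits.
        return subprocess.Popen(
            cmd, stdout=out, stderr=subprocess.STDOUT, start_new_session=True
        )
```

Passing the command as a list (rather than a shell string) avoids quoting problems in paths such as the temp directory.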

Internal Helpers

  • _logger_custom_serializer -- Returns None (loguru logger cannot be meaningfully serialized).
  • _logger_custom_deserializer -- Returns the default loguru logger instance on deserialization.

Usage

These utilities are called internally by RayClient.start(). They can also be used directly for programmatic Ray cluster initialization in custom scripts that need fine-grained control over the startup process.

Code Reference

Source Location

  • Repository: NeMo-Curator
  • File: nemo_curator/core/utils.py
  • Lines: 1-136

Signature

def get_free_port(start_port: int, get_next_free_port: bool = True) -> int: ...

def init_cluster(
    ray_port: int,
    ray_temp_dir: str,
    ray_dashboard_port: int,
    ray_metrics_port: int,
    ray_client_server_port: int,
    ray_dashboard_host: str,
    num_gpus: int | None = None,
    num_cpus: int | None = None,
    object_store_memory: int | None = None,
    enable_object_spilling: bool = False,
    block: bool = True,
    ip_address: str | None = None,
    stdouterr_capture_file: str | None = None,
) -> subprocess.Popen: ...

Import

from nemo_curator.core.utils import get_free_port, init_cluster

I/O Contract

get_free_port Inputs

Name | Type | Required | Description
start_port | int | Yes | The port number to start searching from
get_next_free_port | bool | No | If True, search for the next free port; if False, raise RuntimeError if start_port is in use (default: True)

get_free_port Output

Name | Type | Description
return | int | An available port number

init_cluster Inputs

Name | Type | Required | Description
ray_port | int | Yes | Ray GCS port
ray_temp_dir | str | Yes | Temporary directory for Ray runtime files
ray_dashboard_port | int | Yes | Ray dashboard port
ray_metrics_port | int | Yes | Ray metrics export port
ray_client_server_port | int | Yes | Ray client server port
ray_dashboard_host | str | Yes | Dashboard binding host
num_gpus | int or None | No | Number of GPUs to advertise (default: None)
num_cpus | int or None | No | Number of CPUs to advertise (default: None)
object_store_memory | int or None | No | Object store memory in bytes (default: None)
enable_object_spilling | bool | No | Enable object spilling to disk (default: False)
block | bool | No | Whether to start Ray in blocking mode (default: True)
ip_address | str or None | No | Node IP address (default: None, in which case it is auto-detected via hostname)
stdouterr_capture_file | str or None | No | File path to capture stdout/stderr (default: None)

init_cluster Output

Name | Type | Description
return | subprocess.Popen | The Ray head node process

Side Effects

The init_cluster function sets the following environment variables:

Variable | Description
DASHBOARD_METRIC_PORT | Port for Ray dashboard metrics (auto-assigned free port)
AUTOSCALER_METRIC_PORT | Port for Ray autoscaler metrics (auto-assigned free port)
XENNA_RAY_METRICS_PORT | Ray metrics port for Xenna integration
XENNA_RESPECT_CUDA_VISIBLE_DEVICES | Set to "1" for proper Xenna GPU management

Usage Examples

Finding a Free Port

from nemo_curator.core.utils import get_free_port

# Find the next free port starting from 6379
port = get_free_port(6379, get_next_free_port=True)
print(f"Using port: {port}")

# Require a specific port (raises RuntimeError if in use)
port = get_free_port(8080, get_next_free_port=False)

Initializing a Ray Cluster Directly

from nemo_curator.core.utils import init_cluster

proc = init_cluster(
    ray_port=6379,
    ray_temp_dir="/tmp/ray",
    ray_dashboard_port=8265,
    ray_metrics_port=8080,
    ray_client_server_port=10001,
    ray_dashboard_host="0.0.0.0",
    num_gpus=4,
    num_cpus=32,
    block=True,
)

# proc is a subprocess.Popen object for the Ray head node
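Because the process is launched with start_new_session=True, it leads its own process group, which lets a caller signal the head node and its children together. The sketch below shows one way this could be done on POSIX systems; stop_process_group_sketch is a hypothetical helper, and this page does not state whether RayClient shuts the cluster down this way.

```python
import os
import signal
import subprocess


def stop_process_group_sketch(proc: subprocess.Popen, timeout: float = 10.0) -> int:
    """Send SIGTERM to the child's whole process group, escalating to SIGKILL on timeout."""
    # With start_new_session=True the child is its own group leader,
    # so this group contains only the child and its descendants.
    pgid = os.getpgid(proc.pid)
    os.killpg(pgid, signal.SIGTERM)
    try:
        return proc.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        # The group ignored SIGTERM; force termination.
        os.killpg(pgid, signal.SIGKILL)
        return proc.wait()
```

Calling stop_process_group_sketch(proc) on the object returned by init_cluster would then terminate the whole group rather than only the top-level process.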
