Implementation:NVIDIA NeMo Curator Core Utils
| Knowledge Sources | |
|---|---|
| Domains | Core Infrastructure, Ray Integration, Cluster Management, Networking |
| Last Updated | 2026-02-14 00:00 GMT |
Overview
Provides utility functions for finding free network ports and initializing a local Ray head node cluster via subprocess, serving as the low-level cluster bootstrapping logic used by RayClient.
Description
This module contains two primary functions and two internal serialization helpers:
get_free_port(start_port, get_next_free_port)
Finds an available network port starting from start_port. It iterates through ports from start_port up to 65535, skipping the reserved Ray worker port range (DEFAULT_RAY_MIN_WORKER_PORT to DEFAULT_RAY_MAX_WORKER_PORT). For each candidate port, it attempts to bind a socket with SO_REUSEADDR to test availability.
If get_next_free_port is True (the default, and the natural choice when relying on default port numbers), the search continues past occupied ports until a free one is found. If it is False and start_port is already in use, get_free_port raises a RuntimeError instead of picking another port.
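The search can be sketched in plain Python. This is a hypothetical re-implementation for illustration, not the NeMo Curator source. The worker-range bounds below are assumed values (Ray's documented `ray start` defaults for --min-worker-port and --max-worker-port); the real DEFAULT_RAY_MIN_WORKER_PORT and DEFAULT_RAY_MAX_WORKER_PORT constants may differ. In strict mode, the sketch checks only start_port.

```python
import socket

# Assumed values for illustration only: Ray's `ray start` defaults for the worker
# port range. NeMo Curator's DEFAULT_RAY_MIN_WORKER_PORT / DEFAULT_RAY_MAX_WORKER_PORT
# constants may hold different numbers.
ASSUMED_MIN_WORKER_PORT = 10002
ASSUMED_MAX_WORKER_PORT = 19999


def _port_is_free(port: int) -> bool:
    """Return True if a TCP socket can bind to `port` on all interfaces."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        # SO_REUSEADDR lets the probe succeed on ports lingering in TIME_WAIT.
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        try:
            sock.bind(("", port))
        except OSError:
            return False
        return True


def sketch_get_free_port(start_port: int, get_next_free_port: bool = True) -> int:
    """Hypothetical sketch of the port search described above."""
    if not get_next_free_port:
        # Strict mode: the caller wants exactly this port or an error.
        if not _port_is_free(start_port):
            raise RuntimeError(f"Port {start_port} is already in use")
        return start_port
    for port in range(start_port, 65536):
        # Skip Ray's worker port range so workers can still claim those ports.
        if ASSUMED_MIN_WORKER_PORT <= port <= ASSUMED_MAX_WORKER_PORT:
            continue
        if _port_is_free(port):
            return port
    raise RuntimeError(f"No free port found at or above {start_port}")
```

The socket is closed as soon as the probe finishes. Another process could therefore grab the port between this check and Ray's own bind, so the result is best-effort.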
init_cluster(...)
Initializes a new local Ray head node cluster by:
- Registering a custom loguru serializer/deserializer for Ray (since loguru is not natively serializable).
- Building a ray start --head command with all specified options: node IP address, GCS port, metrics export port, dashboard host/port, client server port, temp directory, GPU/CPU counts, object store memory, object spilling configuration, and block mode.
- Setting environment variables for metrics ports (DASHBOARD_METRIC_PORT, AUTOSCALER_METRIC_PORT) and Xenna integration (XENNA_RAY_METRICS_PORT, XENNA_RESPECT_CUDA_VISIBLE_DEVICES).
- Launching the Ray process via subprocess.Popen with start_new_session=True. This places it in a new session and process group, so the whole group can be shut down cleanly. If stdouterr_capture_file is set, stdout and stderr are redirected to that file.
Internal Helpers
- _logger_custom_serializer -- Returns None (loguru logger cannot be meaningfully serialized).
- _logger_custom_deserializer -- Returns the default loguru logger instance on deserialization.
Usage
These utilities are called internally by RayClient.start(). They can also be used directly for programmatic Ray cluster initialization in custom scripts that need fine-grained control over the startup process.
Code Reference
Source Location
- Repository: NeMo-Curator
- File: nemo_curator/core/utils.py
- Lines: 1-136
Signature
def get_free_port(start_port: int, get_next_free_port: bool = True) -> int: ...
def init_cluster(
    ray_port: int,
    ray_temp_dir: str,
    ray_dashboard_port: int,
    ray_metrics_port: int,
    ray_client_server_port: int,
    ray_dashboard_host: str,
    num_gpus: int | None = None,
    num_cpus: int | None = None,
    object_store_memory: int | None = None,
    enable_object_spilling: bool = False,
    block: bool = True,
    ip_address: str | None = None,
    stdouterr_capture_file: str | None = None,
) -> subprocess.Popen: ...
Import
from nemo_curator.core.utils import get_free_port, init_cluster
I/O Contract
get_free_port Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| start_port | int | Yes | The port number to start searching from |
| get_next_free_port | bool | No | If True, search for the next free port; if False, raise RuntimeError if start_port is in use (default: True) |
get_free_port Output
| Name | Type | Description |
|---|---|---|
| return | int | An available port number |
init_cluster Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| ray_port | int | Yes | Ray GCS port |
| ray_temp_dir | str | Yes | Temporary directory for Ray runtime files |
| ray_dashboard_port | int | Yes | Ray dashboard port |
| ray_metrics_port | int | Yes | Ray metrics export port |
| ray_client_server_port | int | Yes | Ray client server port |
| ray_dashboard_host | str | Yes | Dashboard binding host |
| num_gpus | int or None | No | Number of GPUs to advertise (default: None) |
| num_cpus | int or None | No | Number of CPUs to advertise (default: None) |
| object_store_memory | int or None | No | Object store memory in bytes (default: None) |
| enable_object_spilling | bool | No | Enable object spilling to disk (default: False) |
| block | bool | No | Whether to start Ray in blocking mode (default: True) |
| ip_address | str or None | No | Node IP address (default: auto-detected via hostname) |
| stdouterr_capture_file | str or None | No | File path to capture stdout/stderr (default: None) |
init_cluster Output
| Name | Type | Description |
|---|---|---|
| return | subprocess.Popen | The Ray head node process |
Side Effects
The init_cluster function sets the following environment variables:
| Variable | Description |
|---|---|
| DASHBOARD_METRIC_PORT | Port for Ray dashboard metrics (auto-assigned free port) |
| AUTOSCALER_METRIC_PORT | Port for Ray autoscaler metrics (auto-assigned free port) |
| XENNA_RAY_METRICS_PORT | Ray metrics port for Xenna integration |
| XENNA_RESPECT_CUDA_VISIBLE_DEVICES | Set to "1" so that Xenna honors CUDA_VISIBLE_DEVICES when managing GPUs |
Usage Examples
Finding a Free Port
from nemo_curator.core.utils import get_free_port
# Find the next free port at or above 6379 (Ray's default GCS port)
port = get_free_port(6379, get_next_free_port=True)
print(f"Using port: {port}")
# Require a specific port; get_free_port raises RuntimeError if it is already in use
try:
    port = get_free_port(8080, get_next_free_port=False)
except RuntimeError as err:
    print(f"Port 8080 is unavailable: {err}")
Initializing a Ray Cluster Directly
from nemo_curator.core.utils import init_cluster
proc = init_cluster(
    ray_port=6379,
    ray_temp_dir="/tmp/ray",
    ray_dashboard_port=8265,
    ray_metrics_port=8080,
    ray_client_server_port=10001,
    ray_dashboard_host="0.0.0.0",
    num_gpus=4,
    num_cpus=32,
    block=True,
)
# proc is a subprocess.Popen object for the Ray head node.
# Popen returns immediately; with block=True the `ray start` process keeps running in its own session.
Related Pages
- Environment:NVIDIA_NeMo_Curator_Python_Linux_Base
- NVIDIA_NeMo_Curator_RayClient -- High-level client that uses these utilities