Implementation:Huggingface Datatrove RayInferenceCluster
| Knowledge Sources | |
|---|---|
| Domains | Distributed Computing, Machine Learning Inference |
| Last Updated | 2026-02-14 17:00 GMT |
Overview
The Ray inference cluster module provides functions, most of them async, for initializing, monitoring, and cleaning up a Ray cluster used for multi-node distributed inference workloads.
Description
This module manages the full lifecycle of a Ray cluster used for distributed inference. It provides the following key functions:
calculate_object_store_memory computes the object store memory for Ray as the minimum of three limits: 30% of available system memory, the size of /dev/shm (shared memory), and a 200 GB cap.
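The three-way minimum can be sketched as a pure function. This is an illustrative sketch rather than the module's code: the names pick_object_store_memory and shm_size_bytes are hypothetical, and the cap is assumed to be 200 * 10**9 bytes (the source may use a binary unit instead).

```python
import shutil

# Assumption: the 200 GB cap is expressed in decimal bytes.
MAX_OBJECT_STORE_BYTES = 200 * 10**9


def pick_object_store_memory(available_bytes: int, shm_bytes: int) -> int:
    """Return the smallest of: 30% of available memory, /dev/shm size, the cap."""
    # Integer arithmetic avoids floating-point rounding on large byte counts.
    thirty_percent = available_bytes * 30 // 100
    return min(thirty_percent, shm_bytes, MAX_OBJECT_STORE_BYTES)


def shm_size_bytes(path: str = "/dev/shm") -> int:
    """Total size of the shared-memory filesystem (Linux hosts)."""
    return shutil.disk_usage(path).total
```

Under these assumptions, a node with 100 GB available and a 64 GB /dev/shm is limited by the 30% rule, while a node with 1 TB available and a 500 GB /dev/shm is limited by the cap.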
init_ray_master starts a Ray head node via the ray start --head CLI command with calculated resource allocations (CPUs, GPUs, memory), connects to it via ray.init(address="auto"), and polls ray.nodes() until all expected worker nodes have joined the cluster.
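The polling step might look roughly like the sketch below. The helper names, the poll_interval and timeout defaults, and the timeout behaviour itself are assumptions made for illustration; the node source is injected as a callable so that ray.nodes can be passed in on a real cluster. Whether the expected total includes the head node is left to the caller.

```python
import asyncio
from typing import Callable


def count_alive(nodes: list[dict]) -> int:
    """Count entries whose "Alive" flag is true, as reported by ray.nodes()."""
    return sum(1 for node in nodes if node.get("Alive"))


async def wait_for_nodes(
    get_nodes: Callable[[], list[dict]],
    expected_total: int,
    poll_interval: float = 5.0,  # assumption: not taken from the source
    timeout: float = 600.0,  # assumption: not taken from the source
) -> int:
    """Poll get_nodes() until expected_total nodes are alive or timeout expires."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while True:
        alive = count_alive(get_nodes())
        if alive >= expected_total:
            return alive
        if loop.time() >= deadline:
            raise TimeoutError(f"only {alive}/{expected_total} nodes joined")
        await asyncio.sleep(poll_interval)
```

On a running cluster this could be called as await wait_for_nodes(ray.nodes, expected_total), after ray.init(address="auto") has connected.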
init_ray_worker connects a worker node to the head node with retry logic (up to 5 attempts with 10-second delays between retries), handling timeouts and connection failures gracefully.
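A bounded retry loop of this kind can be sketched as follows. The function connect_with_retry and its connect callable are hypothetical stand-ins, and the choice of which exceptions count as retryable is an assumption; the defaults mirror the max_retries and retry_delay values of init_ray_worker.

```python
import asyncio
from typing import Awaitable, Callable


async def connect_with_retry(
    connect: Callable[[], Awaitable[None]],
    max_retries: int = 5,
    retry_delay: float = 10.0,
) -> int:
    """Call connect() until it succeeds; return the attempt number that worked."""
    for attempt in range(1, max_retries + 1):
        try:
            await connect()
            return attempt
        except (ConnectionError, asyncio.TimeoutError) as exc:
            # Assumption: only connection failures and timeouts are retried.
            if attempt == max_retries:
                raise RuntimeError(f"gave up after {max_retries} attempts") from exc
            await asyncio.sleep(retry_delay)
    raise ValueError("max_retries must be at least 1")
```

With the defaults, a worker that never reaches the head node fails after five attempts and roughly 40 seconds of waiting between them, plus the time spent in each attempt.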
monitor_ray_cluster_health runs a periodic health check loop using the ray health-check --skip-version-check command, returning when the cluster becomes unhealthy.
monitor_ray_workers monitors the alive node count and raises an exception if the number of alive workers drops below the expected count.
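The periodic health check can be approximated with an asyncio subprocess loop. This is a sketch under assumptions: watch_until_unhealthy is a hypothetical name, and treating any non-zero exit code as "unhealthy" is an assumption about how the result is interpreted. The command is a parameter so the loop can be exercised without a Ray installation.

```python
import asyncio
from typing import Sequence

# Health check command named in the description; requires the ray CLI on PATH.
HEALTH_CHECK_CMD = ("ray", "health-check", "--skip-version-check")


async def watch_until_unhealthy(
    cmd: Sequence[str] = HEALTH_CHECK_CMD,
    check_interval: float = 30.0,
) -> int:
    """Run cmd every check_interval seconds; return its first non-zero exit code."""
    while True:
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.DEVNULL,
            stderr=asyncio.subprocess.DEVNULL,
        )
        returncode = await proc.wait()
        if returncode != 0:
            # Assumption: a non-zero exit code means the cluster is unhealthy.
            return returncode
        await asyncio.sleep(check_interval)
```

Because the loop only returns on failure, a caller would typically run it as a background task alongside the inference workload and react when the task completes.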
cleanup_ray performs graceful teardown by calling ray.shutdown() followed by ray stop.
Usage
Use these functions when setting up multi-node inference clusters, particularly for distributed model serving with vLLM or other inference frameworks that rely on Ray for multi-GPU or multi-node coordination.
Code Reference
Source Location
- Repository: Huggingface_Datatrove
- File: src/datatrove/pipeline/inference/distributed/ray.py
- Lines: 1-376
Signature
def calculate_object_store_memory() -> int:
async def init_ray_master(master_port: int, expected_workers: int) -> None:
async def init_ray_worker(
    master_ip: str,
    master_port: int,
    max_retries: int = 5,
    retry_delay: float = 10.0,
) -> None:
async def monitor_ray_cluster_health(check_interval: float = 30.0) -> None:
async def monitor_ray_workers(expected_workers: int) -> None:
def cleanup_ray() -> None:
Import
from datatrove.pipeline.inference.distributed.ray import (
    init_ray_master,
    init_ray_worker,
    monitor_ray_cluster_health,
    monitor_ray_workers,
    cleanup_ray,
    calculate_object_store_memory,
)
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| master_port | int | Yes (master/worker) | Port for the Ray cluster head node |
| expected_workers | int | Yes (master) | Number of worker nodes expected to join the cluster |
| master_ip | str | Yes (worker) | IP address of the master node to connect to |
| max_retries | int | No | Maximum connection attempts for workers (default: 5) |
| retry_delay | float | No | Seconds between worker retry attempts (default: 10.0) |
| check_interval | float | No | Seconds between health checks (default: 30.0) |
Outputs
| Name | Type | Description |
|---|---|---|
| (none) | None | All functions except calculate_object_store_memory return None; they initialize, monitor, or tear down Ray cluster state as side effects |
| object_store_memory | int | Object store memory in bytes, returned by calculate_object_store_memory |
Usage Examples
Basic Usage
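import asyncio

from datatrove.pipeline.inference.distributed.ray import (
    init_ray_master,
    init_ray_worker,
    cleanup_ray,
)

# On the master node: start the head node and wait for 3 workers to join.
async def start_master():
    await init_ray_master(master_port=6379, expected_workers=3)

# On each worker node: connect to the head node at master_ip.
async def start_worker(master_ip):
    await init_ray_worker(master_ip=master_ip, master_port=6379)

async def main():
    await start_master()  # on worker nodes: await start_worker(master_ip)
    # ... run the inference workload here ...

# Cleanup: always tear Ray down, even if startup or the workload fails.
try:
    asyncio.run(main())
finally:
    cleanup_ray()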