

Implementation:Huggingface Datatrove RayInferenceCluster

From Leeroopedia
Knowledge Sources
Domains Distributed Computing, Machine Learning Inference
Last Updated 2026-02-14 17:00 GMT

Overview

The Ray inference cluster module provides functions, most of them async, for initializing, monitoring, and cleaning up a Ray cluster used for multi-node distributed inference workloads.

Description

This module manages the full lifecycle of a Ray cluster used for distributed inference. It provides the following key functions:

calculate_object_store_memory computes the object store memory to allocate to Ray, in bytes, as the minimum of three limits: 30% of available system memory, the size of /dev/shm (shared memory), and a hard cap of 200 GB.
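The three-way minimum described above can be sketched as a pure function. This is a minimal illustration, not the module's code: the name sketch_object_store_memory and its two parameters are hypothetical, and the cap is assumed to mean 200 GiB (the description only says "200GB").

```python
# Assumption: the cap is read as 200 GiB; the source text only says "200GB".
MAX_OBJECT_STORE_BYTES = 200 * 1024**3
MEMORY_PERCENT = 30  # share of available system memory, per the description


def sketch_object_store_memory(available_bytes: int, shm_bytes: int) -> int:
    """Return the smallest of the three limits, in bytes.

    Hypothetical helper: the caller supplies the available system memory
    and the size of /dev/shm instead of the function probing the host.
    """
    memory_limit = available_bytes * MEMORY_PERCENT // 100  # integer math, no float rounding
    return min(memory_limit, shm_bytes, MAX_OBJECT_STORE_BYTES)
```

On a host with 100 GiB available and a 64 GiB /dev/shm, the memory share (30 GiB) is the binding limit; on a very large host the fixed cap takes over.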

init_ray_master starts a Ray head node via the ray start --head CLI command with calculated resource allocations (CPUs, GPUs, memory), connects to it via ray.init(address="auto"), and polls ray.nodes() until all expected worker nodes have joined the cluster.
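The polling step can be illustrated without a running cluster by injecting the node-listing call. In the sketch below, wait_for_alive_nodes, poll_interval, and timeout are hypothetical names, not part of the module; list_nodes stands in for ray.nodes(), whose entries carry an "Alive" flag. Whether the head node counts toward the expected total is an assumption left to the caller.

```python
import asyncio
import time
from typing import Callable


async def wait_for_alive_nodes(
    list_nodes: Callable[[], list],
    expected_nodes: int,
    poll_interval: float = 5.0,
    timeout: float = 600.0,
) -> int:
    """Poll `list_nodes` until at least `expected_nodes` entries are alive.

    `list_nodes` is a stand-in for `ray.nodes()`: it returns a list of dicts,
    each with an "Alive" key. Returns the alive count once the target is met.
    """
    deadline = time.monotonic() + timeout
    while True:
        alive = sum(1 for node in list_nodes() if node.get("Alive"))
        if alive >= expected_nodes:
            return alive
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"only {alive}/{expected_nodes} nodes alive after {timeout}s"
            )
        # Yield to the event loop between polls instead of busy-waiting.
        await asyncio.sleep(poll_interval)
```

With Ray installed and connected, passing ray.nodes as list_nodes would poll the real cluster; the timeout is an addition of this sketch, since the description does not say whether the module bounds the wait.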

init_ray_worker connects a worker node to the head node with retry logic (up to 5 attempts with 10-second delays between retries), handling timeouts and connection failures gracefully.
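The retry behaviour can be sketched as a small wrapper around any async connect step. The function connect_with_retries and its connect argument are hypothetical; only the defaults (5 attempts, 10-second delay) come from the description above, and the set of exceptions treated as retryable is an assumption.

```python
import asyncio
from typing import Awaitable, Callable


async def connect_with_retries(
    connect: Callable[[], Awaitable[None]],
    max_retries: int = 5,
    retry_delay: float = 10.0,
) -> int:
    """Await `connect()` until it succeeds; return the attempt number that worked.

    Assumption: connection failures and timeouts surface as OSError
    (which covers ConnectionError and TimeoutError) or asyncio.TimeoutError.
    """
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")
    for attempt in range(1, max_retries + 1):
        try:
            await connect()
            return attempt
        except (OSError, asyncio.TimeoutError) as exc:
            if attempt == max_retries:
                raise RuntimeError(
                    f"failed to connect after {max_retries} attempts"
                ) from exc
            # Wait before the next attempt so the head node has time to come up.
            await asyncio.sleep(retry_delay)
    raise AssertionError("unreachable")
```

A fixed delay keeps startup time predictable when all workers launch together; exponential backoff would be an alternative, but it is not what the description states.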

monitor_ray_cluster_health runs a periodic health check loop using the ray health-check --skip-version-check command, returning when the cluster becomes unhealthy.

monitor_ray_workers monitors the alive node count and raises an exception if workers drop below the expected count.
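A periodic health check of this kind can be sketched as a loop that runs a command and returns on the first failure. The name watch_until_unhealthy and its command parameter are hypothetical, and treating a non-zero exit code as "unhealthy" is an assumption of this sketch rather than something the description states.

```python
import asyncio
from typing import Sequence

# The CLI invocation named in the description.
RAY_HEALTH_CHECK = ("ray", "health-check", "--skip-version-check")


async def watch_until_unhealthy(
    command: Sequence[str] = RAY_HEALTH_CHECK,
    check_interval: float = 30.0,
) -> int:
    """Run `command` every `check_interval` seconds.

    Returns the first non-zero exit code (assumed here to mean the
    cluster is unhealthy). Runs indefinitely while the command succeeds.
    """
    while True:
        proc = await asyncio.create_subprocess_exec(
            *command,
            stdout=asyncio.subprocess.DEVNULL,
            stderr=asyncio.subprocess.DEVNULL,
        )
        returncode = await proc.wait()
        if returncode != 0:
            return returncode
        await asyncio.sleep(check_interval)
```

Because the loop only returns on failure, a caller would typically run it as a background task next to the inference workload and react when it completes.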

cleanup_ray performs graceful teardown by calling ray.shutdown() followed by ray stop.

Usage

Use these functions when setting up multi-node inference clusters, particularly for distributed model serving with vLLM or other inference frameworks that rely on Ray for multi-GPU or multi-node coordination.

Code Reference

Source Location

Signature

def calculate_object_store_memory() -> int:

async def init_ray_master(master_port: int, expected_workers: int) -> None:

async def init_ray_worker(
    master_ip: str,
    master_port: int,
    max_retries: int = 5,
    retry_delay: float = 10.0,
) -> None:

async def monitor_ray_cluster_health(check_interval: float = 30.0) -> None:

async def monitor_ray_workers(expected_workers: int) -> None:

def cleanup_ray() -> None:

Import

from datatrove.pipeline.inference.distributed.ray import (
    init_ray_master,
    init_ray_worker,
    monitor_ray_cluster_health,
    monitor_ray_workers,
    cleanup_ray,
    calculate_object_store_memory,
)

I/O Contract

Inputs

Name | Type | Required | Description
master_port | int | Yes (master/worker) | Port for the Ray cluster head node
expected_workers | int | Yes (master) | Number of worker nodes expected to join the cluster
master_ip | str | Yes (worker) | IP address of the master node to connect to
max_retries | int | No | Maximum connection attempts for workers (default: 5)
retry_delay | float | No | Seconds between worker retry attempts (default: 10.0)
check_interval | float | No | Seconds between health checks (default: 30.0)

Outputs

Name | Type | Description
None | None | Functions initialize/monitor Ray cluster state as side effects
object_store_memory | int | Calculated object store memory in bytes (from calculate_object_store_memory)

Usage Examples

Basic Usage

import asyncio
from datatrove.pipeline.inference.distributed.ray import (
    init_ray_master,
    init_ray_worker,
    cleanup_ray,
)

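MASTER_PORT = 6379

# On the master node: start the head and wait for 3 workers to join
async def start_master():
    await init_ray_master(master_port=MASTER_PORT, expected_workers=3)

# On each worker node: connect to the head at master_ip
async def start_worker(master_ip):
    await init_ray_worker(master_ip=master_ip, master_port=MASTER_PORT)

# Run the coroutine for this node's role, then always tear Ray down.
# On a worker node, call asyncio.run(start_worker(master_ip)) instead.
try:
    asyncio.run(start_master())
finally:
    cleanup_ray()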
