Implementation: Alibaba ROLL Cluster
| Knowledge Sources | Details |
|---|---|
| Domains | Distributed_Systems, GPU_Computing |
| Last Updated | 2026-02-07 20:00 GMT |
Overview
Concrete distributed cluster manager for Ray-based GPU worker groups provided by the Alibaba ROLL library.
Description
The Cluster class manages a group of Ray actors representing workers of the same role. It handles worker creation with placement group scheduling, method binding for distributed dispatch, and provides synchronous/asynchronous execution interfaces. Combined with ResourceManager for GPU allocation, it forms the distributed execution backbone of all ROLL pipelines.
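The role-group dispatch pattern described above can be sketched in plain Python. This is an illustrative stand-in only (the names `WorkerStub` and `MiniCluster` are hypothetical, and the real Cluster wraps Ray remote actors with placement-group scheduling rather than local objects):

```python
# Minimal sketch of same-role worker-group dispatch (illustrative only;
# the actual Cluster manages Ray actors, not in-process objects).
from typing import Any, List

class WorkerStub:
    """Hypothetical stand-in for a Ray actor worker."""
    def __init__(self, rank: int):
        self.rank = rank

    def ping(self, tag: str) -> str:
        return f"{tag}:{self.rank}"

class MiniCluster:
    def __init__(self, name: str, world_size: int):
        self.name = name
        self.workers: List[WorkerStub] = [WorkerStub(r) for r in range(world_size)]

    def execute_all_sync(self, method_name: str, *args, **kwargs) -> List[Any]:
        # Call the named method on every worker, gathering results in rank order.
        return [getattr(w, method_name)(*args, **kwargs) for w in self.workers]

    def execute_rank_zero_sync(self, method_name: str, *args, **kwargs) -> Any:
        # Call the named method on the rank-0 worker only.
        return getattr(self.workers[0], method_name)(*args, **kwargs)

cluster = MiniCluster("actor_train", world_size=4)
print(cluster.execute_all_sync("ping", "hb"))       # ['hb:0', 'hb:1', 'hb:2', 'hb:3']
print(cluster.execute_rank_zero_sync("ping", "hb")) # hb:0
```

Method binding in the real class follows the same idea: a method name is resolved per worker and dispatched either to all ranks or to rank 0 alone.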
Usage
Import this class when initializing distributed workers for any ROLL pipeline. Each pipeline creates multiple Cluster instances for different roles (actor_train, actor_infer, reference, critic, reward).
Code Reference
Source Location
- Repository: Alibaba ROLL
- File: roll/distributed/executor/cluster.py
- Lines: L32-265
Signature
```python
class Cluster:
    def __init__(
        self,
        name: str,
        worker_cls: Union[Type[Worker], str],
        resource_manager: ResourceManager,
        worker_config: WorkerConfig,
    ) -> None:
        """
        Initialize Cluster with Ray actor workers.

        Args:
            name: Cluster name (e.g., 'actor_train', 'reward-math')
            worker_cls: Worker class or string path for Ray remote actors
            resource_manager: GPU/CPU resource allocation manager
            worker_config: Configuration for all workers in this cluster
        """

    def execute_rank_zero_sync(self, method_name: str, *args, **kwargs) -> Any:
        """Execute method on rank 0 worker synchronously."""

    def execute_rank_zero_async(self, method_name: str, *args, **kwargs) -> Any:
        """Execute method on rank 0 worker asynchronously."""

    def execute_all_sync(self, method_name: str, *args, **kwargs) -> List[Any]:
        """Execute method on all workers synchronously."""

    def execute_all_async(self, method_name: str, *args, **kwargs) -> List[Any]:
        """Execute method on all workers asynchronously."""
```
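The sync/async split mirrors the futures model Ray uses: an asynchronous call dispatches work and returns handles immediately, while a synchronous call blocks until results arrive. A rough standard-library analogy (using `concurrent.futures` rather than Ray ObjectRefs, so function names and mechanics here are illustrative, not the library's implementation):

```python
# Sketch of sync vs. async dispatch semantics using thread futures.
# The real Cluster returns Ray ObjectRefs from its async methods.
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable, List

def slow_square(x: int) -> int:
    return x * x

pool = ThreadPoolExecutor(max_workers=4)

def execute_all_async(fn: Callable[[int], int], ranks: List[int]) -> List[Future]:
    # "Async": submit work for every rank and return futures without blocking.
    return [pool.submit(fn, r) for r in ranks]

def execute_all_sync(fn: Callable[[int], int], ranks: List[int]) -> List[int]:
    # "Sync": submit, then block until every per-rank result is available.
    return [f.result() for f in execute_all_async(fn, ranks)]

futures = execute_all_async(slow_square, [0, 1, 2, 3])  # returns immediately
print(execute_all_sync(slow_square, [0, 1, 2, 3]))      # [0, 1, 4, 9]
```

The async variants let a pipeline overlap work across roles (e.g., start inference while training finishes) and gather results later.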
Import
```python
from roll.distributed.executor.cluster import Cluster
from roll.distributed.scheduler.resource_manager import ResourceManager
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| name | str | Yes | Cluster name identifier |
| worker_cls | Union[Type[Worker], str] | Yes | Worker class or fully qualified string path |
| resource_manager | ResourceManager | Yes | GPU resource allocation manager |
| worker_config | WorkerConfig | Yes | Worker configuration with device_mapping, strategy_args |
Outputs
| Name | Type | Description |
|---|---|---|
| Cluster instance | Cluster | Initialized cluster with Ray actor workers deployed on assigned GPUs |
| dp_size | int | Data parallel size of the cluster |
| tp_size | int | Tensor parallel size of the cluster |
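The `dp_size` and `tp_size` outputs typically partition the cluster's workers, with world_size = dp_size × tp_size. A sketch of that arithmetic (the helper name and the assumption that `device_mapping` lists one entry per GPU are mine, not confirmed by the source):

```python
from typing import List, Tuple

def parallel_sizes(device_mapping: List[int], tp_size: int) -> Tuple[int, int]:
    """Derive the data-parallel size from total GPUs and tensor-parallel degree.

    Assumes (hypothetically) that the worker group's world size equals
    len(device_mapping) and factors as world_size = dp_size * tp_size.
    """
    world_size = len(device_mapping)
    assert world_size % tp_size == 0, "tp_size must divide world_size"
    return world_size // tp_size, tp_size

dp, tp = parallel_sizes(device_mapping=list(range(8)), tp_size=2)
print(dp, tp)  # 4 2
```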
Usage Examples
Creating Actor Training Cluster
```python
from roll.distributed.executor.cluster import Cluster
from roll.distributed.scheduler.resource_manager import ResourceManager

# 1. Create resource manager for the Ray cluster
resource_manager = ResourceManager(num_gpus_per_node=8, num_nodes=4)

# 2. Create actor training cluster
actor_train = Cluster(
    name="actor_train",
    worker_cls="roll.pipeline.rlvr.actor_worker.ActorWorker",
    resource_manager=resource_manager,
    worker_config=rlvr_config.actor_train,
)

# 3. Initialize all workers
actor_train.execute_all_sync("initialize", rlvr_config)

# 4. Execute training step on all workers
results = actor_train.execute_all_sync("train_step", batch)
```
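Before a call like `execute_all_sync("train_step", batch)`, pipelines commonly split the global batch across data-parallel ranks. The helper below is a hypothetical companion sketch, not part of the Cluster API:

```python
from typing import List, Sequence

def shard_batch(batch: Sequence, dp_size: int) -> List[Sequence]:
    """Split a batch into dp_size near-equal shards, one per data-parallel rank.

    Earlier ranks receive one extra sample when len(batch) is not divisible
    by dp_size, so shard sizes differ by at most one.
    """
    base, rem = divmod(len(batch), dp_size)
    shards, start = [], 0
    for rank in range(dp_size):
        size = base + (1 if rank < rem else 0)
        shards.append(batch[start:start + size])
        start += size
    return shards

print(shard_batch(list(range(10)), 4))  # [[0, 1, 2], [3, 4, 5], [6, 7], [8, 9]]
```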
Related Pages
Implements Principle
Requires Environment
Environment Dependencies
This implementation requires the following environment constraints:
- Environment:Alibaba_ROLL_CUDA_GPU_Environment
- Environment:Alibaba_ROLL_ROCm_GPU_Environment
- Environment:Alibaba_ROLL_Ascend_NPU_Environment
- Environment:Alibaba_ROLL_Python_Runtime_Environment
Heuristics Applied
This implementation uses the following heuristics: