# Implementation: Alibaba ROLL RolloutScheduler
| Knowledge Sources | Details |
|---|---|
| Domains | Distributed_Systems, Reinforcement_Learning, Agentic_AI |
| Last Updated | 2026-02-07 20:00 GMT |
## Overview

Concrete distributed rollout scheduler with group-based trajectory batching, provided by the Alibaba ROLL library.

## Description

The RolloutScheduler class coordinates asynchronous trajectory collection. It manages the GroupQueueManager (a Ray actor that buffers completed episodes and assembles them into groups), routes LLM inference requests via the generation scheduler, and handles dynamic GPU allocation so that generation and training can overlap. The scheduler supports suspension during model updates and batch assembly with configurable sizes.
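The group-based buffering role of GroupQueueManager can be sketched with a small asyncio toy (names and structure here are illustrative assumptions, not ROLL's actual API): completed episodes accumulate under a group id, and a group is released only once it is full, so downstream batch assembly always sees whole groups.

```python
import asyncio
from collections import defaultdict

class MiniGroupQueue:
    """Toy stand-in for the episode-group buffering idea (not ROLL's API).

    Completed episodes are keyed by group id; a group is pushed to the
    ready queue only once `group_size` episodes have arrived.
    """

    def __init__(self, group_size: int):
        self.group_size = group_size
        self.pending = defaultdict(list)             # group_id -> partial episodes
        self.ready: asyncio.Queue = asyncio.Queue()  # fully assembled groups

    async def put_episode(self, group_id: int, episode: dict) -> None:
        self.pending[group_id].append(episode)
        if len(self.pending[group_id]) == self.group_size:
            await self.ready.put(self.pending.pop(group_id))

    async def get_group(self) -> list:
        return await self.ready.get()

async def demo():
    q = MiniGroupQueue(group_size=2)
    await q.put_episode(0, {"reward": 1.0})
    await q.put_episode(1, {"reward": 0.5})  # group 1 still incomplete
    await q.put_episode(0, {"reward": 0.0})  # completes group 0
    return await q.get_group()

group = asyncio.run(demo())
```

A consumer awaiting `get_group` never observes a partially filled group, which keeps trajectory batches group-aligned.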
## Usage

Instantiated by the agentic pipeline for both training and validation rollout collection, and created as a Ray actor for distributed coordination.
## Code Reference

### Source Location

- Repository: Alibaba ROLL
- File: `roll/distributed/scheduler/rollout_scheduler.py`
- Lines: L542-760
### Signature

```python
class RolloutScheduler:
    def __init__(
        self,
        config,
        env_manager_config: EnvManagerConfig,
        resource_manager,
        infer_cluster,
        mode: str,
        collator=None,
    ) -> None:
        """
        Args:
            config: Pipeline configuration
            env_manager_config: Environment manager configuration
            resource_manager: Resource management actor
            infer_cluster: LLM inference cluster
            mode: "train" or "val"
            collator: Optional data collator
        """

    async def get_batch(self, data: DataProto, batch_size: int) -> Optional[DataProto]:
        """Get a batch of rollout trajectories."""

    async def suspend(self) -> None:
        """Suspend the generation scheduler during a model update."""

    async def shutdown(self) -> None:
        """Gracefully shut down all components."""

    async def shrink_sampler(self, target_gpus: List[int]) -> Dict:
        """Shrink generation to free GPUs for training."""

    async def expand_sampler(self, target_gpus: List[int], skip_load: bool = False) -> Dict:
        """Expand generation to use more GPUs."""
```
### Import

```python
from roll.distributed.scheduler.rollout_scheduler import RolloutScheduler, GroupQueueManager
```
## I/O Contract

### Inputs

| Name | Type | Required | Description |
|---|---|---|---|
| data | DataProto | Yes | Carries global_step in meta_info |
| batch_size | int | Yes | Number of trajectories to collect (-1 for unlimited) |
### Outputs

| Name | Type | Description |
|---|---|---|
| batch | Optional[DataProto] | Concatenated trajectory batch, or None if empty |
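The Optional return value and the -1 sentinel can be illustrated with a toy function over plain lists (this mirrors only the documented contract above, not ROLL's internal batching code):

```python
from typing import List, Optional

def assemble_batch(buffered: List[dict], batch_size: int) -> Optional[List[dict]]:
    """Illustrative contract: None when nothing is buffered, otherwise up to
    batch_size trajectories (all buffered trajectories when batch_size == -1)."""
    if not buffered:
        return None          # caller must handle the empty case explicitly
    if batch_size == -1:
        return list(buffered)  # -1 means "take everything available"
    return buffered[:batch_size]
```

Callers should therefore check for None before training rather than assuming a batch is always returned.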
## Usage Examples

### Rollout Collection in Pipeline

```python
import ray

# Suspend the generation scheduler during the model update
ray.get(train_rollout_scheduler.suspend.remote())

# Perform the model update while generation is paused
model_update()

# Collect a training batch of 32 trajectories
batch = ray.get(train_rollout_scheduler.get_batch.remote(
    data=DataProto(meta_info={"global_step": step}),
    batch_size=32,
))

# Train on the collected batch
train(batch)
```
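The shrink_sampler/expand_sampler pair supports the overlap described earlier: before a training step the pipeline reclaims GPUs from generation, then hands them back afterwards. A minimal bookkeeping sketch of that handoff pattern (hypothetical class and return shapes; not the library's implementation):

```python
class GpuLedger:
    """Toy tracker for the shrink/expand GPU handoff (not ROLL's implementation)."""

    def __init__(self, sampler_gpus):
        self.sampler_gpus = set(sampler_gpus)  # GPUs currently doing generation
        self.train_gpus = set()                # GPUs reclaimed for training

    def shrink_sampler(self, target_gpus):
        """Move the requested GPUs from generation to training."""
        freed = self.sampler_gpus & set(target_gpus)
        self.sampler_gpus -= freed
        self.train_gpus |= freed
        return {"freed": sorted(freed)}

    def expand_sampler(self, target_gpus):
        """Return GPUs to generation once the training step finishes."""
        returned = self.train_gpus & set(target_gpus)
        self.train_gpus -= returned
        self.sampler_gpus |= returned
        return {"restored": sorted(returned)}

ledger = GpuLedger(sampler_gpus=[0, 1, 2, 3])
ledger.shrink_sampler([2, 3])   # reclaim two GPUs for the training step
# ... training step runs on GPUs 2 and 3 ...
ledger.expand_sampler([2, 3])   # generation gets them back afterwards
```

The key design point is that the same physical GPUs time-share between inference and training, rather than each role holding a fixed static allocation.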
## Related Pages

- Implements Principle
- Requires Environment

## Environment Dependencies

This implementation requires the following environment constraints:

## Heuristics Applied

This implementation uses the following heuristics: