
Implementation:Alibaba ROLL RolloutScheduler

From Leeroopedia


Knowledge Sources
Domains Distributed_Systems, Reinforcement_Learning, Agentic_AI
Last Updated 2026-02-07 20:00 GMT

Overview

A concrete distributed rollout scheduler with group-based trajectory batching, provided by the Alibaba ROLL library.

Description

The RolloutScheduler class coordinates asynchronous trajectory collection. It manages the GroupQueueManager (a Ray actor that buffers completed episodes and assembles them into groups), routes LLM inference requests via the generation scheduler, and dynamically allocates GPUs so that generation and training can overlap. The scheduler can be suspended during model updates and assembles batches of configurable size.
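The group-based buffering done by GroupQueueManager can be sketched as a buffer that holds per-group episodes and releases a group only once all of its episodes have completed. This is an illustrative stand-in, not ROLL's actual implementation; the class and method names here are assumptions:

```python
from collections import defaultdict

class GroupBuffer:
    """Toy stand-in for GroupQueueManager: buffers completed episodes
    and releases them only as fully assembled groups."""

    def __init__(self, group_size):
        self.group_size = group_size      # episodes per group
        self.pending = defaultdict(list)  # group_id -> episodes so far
        self.ready = []                   # fully assembled groups

    def put_episode(self, group_id, episode):
        """Record a completed episode; promote its group once full."""
        self.pending[group_id].append(episode)
        if len(self.pending[group_id]) == self.group_size:
            self.ready.append(self.pending.pop(group_id))

    def get_groups(self, n):
        """Pop up to n completed groups for batch assembly."""
        out, self.ready = self.ready[:n], self.ready[n:]
        return out

buf = GroupBuffer(group_size=2)
buf.put_episode("g0", "ep0")
buf.put_episode("g1", "ep1")   # g1 stays pending: only 1 of 2 episodes
buf.put_episode("g0", "ep2")   # g0 is now complete
print(buf.get_groups(2))       # → [['ep0', 'ep2']]
```

Releasing only complete groups is what lets downstream consumers (e.g. group-relative advantage computation) assume every batch contains whole groups.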

Usage

Instantiated by the agentic pipeline for both training and validation rollout collection. Created as a Ray actor for distributed coordination.

Code Reference

Source Location

  • Repository: Alibaba ROLL
  • File: roll/distributed/scheduler/rollout_scheduler.py
  • Lines: L542-760

Signature

class RolloutScheduler:
    def __init__(
        self,
        config,
        env_manager_config: EnvManagerConfig,
        resource_manager,
        infer_cluster,
        mode: str,
        collator=None,
    ) -> None:
        """
        Args:
            config: Pipeline configuration
            env_manager_config: Environment manager configuration
            resource_manager: Resource management actor
            infer_cluster: LLM inference cluster
            mode: "train" or "val"
            collator: Optional data collator
        """

    async def get_batch(self, data: DataProto, batch_size: int) -> Optional[DataProto]:
        """Get batch of rollout trajectories."""

    async def suspend(self) -> None:
        """Suspend generation scheduler during model update."""

    async def shutdown(self) -> None:
        """Gracefully shutdown all components."""

    async def shrink_sampler(self, target_gpus: List[int]) -> Dict:
        """Shrink generation to free GPUs for training."""

    async def expand_sampler(self, target_gpus: List[int], skip_load: bool = False) -> Dict:
        """Expand generation to use more GPUs."""

Import

from roll.distributed.scheduler.rollout_scheduler import RolloutScheduler, GroupQueueManager

I/O Contract

Inputs

Name Type Required Description
data DataProto Yes Contains global_step in meta_info
batch_size int Yes Number of trajectories to collect (-1 for unlimited)

Outputs

Name Type Description
batch Optional[DataProto] Concatenated trajectory batch or None if empty
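The return contract above (concatenate up to batch_size trajectories, -1 meaning unlimited, None when nothing was collected) can be sketched as follows. This is an illustrative function, assuming plain lists in place of DataProto:

```python
def assemble_batch(trajectories, batch_size):
    """Mimic get_batch's return contract: take up to batch_size
    trajectories, or return None when nothing was collected.
    (Illustrative; the real get_batch concatenates DataProto objects.)"""
    if not trajectories:
        return None                        # empty collection -> None
    if batch_size == -1:                   # -1 means take everything
        return list(trajectories)
    return list(trajectories[:batch_size])

print(assemble_batch([], 32))         # → None
print(assemble_batch([1, 2, 3], 2))   # → [1, 2]
print(assemble_batch([1, 2, 3], -1))  # → [1, 2, 3]
```

Callers should therefore check for None before training on the returned batch.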

Usage Examples

Rollout Collection in Pipeline

import ray
# train_rollout_scheduler is a RolloutScheduler Ray actor handle
# created earlier by the pipeline

# Suspend during model update
ray.get(train_rollout_scheduler.suspend.remote())

# Perform model update
model_update()

# Collect training batch
batch = ray.get(train_rollout_scheduler.get_batch.remote(
    data=DataProto(meta_info={"global_step": step}),
    batch_size=32
))

# Train on collected batch
train(batch)
