Implementation: Alibaba ROLL RolloutScheduler Get Batch
| Knowledge Sources | Details |
|---|---|
| Domains | Reinforcement_Learning, Agentic_AI |
| Last Updated | 2026-02-07 20:00 GMT |
Overview
The concrete batch-assembly method of the RolloutScheduler class in the Alibaba ROLL library, used to gather completed rollout trajectories into a single training batch.
Description
The RolloutScheduler.get_batch method collects completed trajectory groups from the GroupQueueManager and concatenates them into a single DataProto batch for training. It handles group completeness checking, metric collection, and optional data collation. The method operates asynchronously and blocks until the requested batch size is filled.
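The collection loop described above can be sketched roughly as follows. This is a simplified stand-in, not the library's code: a plain `asyncio.Queue` of completed groups replaces the real GroupQueueManager, dicts of lists replace DataProto, and all names except `get_batch` are illustrative.

```python
import asyncio

async def get_batch_sketch(group_queue: asyncio.Queue, batch_size: int):
    """Simplified stand-in for RolloutScheduler.get_batch.

    Drains completed trajectory groups (each a list of trajectory
    dicts) from the queue until `batch_size` trajectories have been
    collected, then concatenates them into one batch dict.
    """
    trajectories = []
    while batch_size < 0 or len(trajectories) < batch_size:
        if batch_size < 0 and group_queue.empty():
            break  # -1 means "take whatever is currently available"
        group = await group_queue.get()  # blocks until a group completes
        trajectories.extend(group)
    if not trajectories:
        return None  # mirrors get_batch returning None when empty
    # Concatenate per-field values, loosely mimicking DataProto concat
    keys = trajectories[0].keys()
    return {k: [t[k] for t in trajectories] for k in keys}
```

Note how a positive `batch_size` blocks on the queue until enough trajectories arrive, matching the blocking behavior described above, while `-1` returns only what is already complete.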
Usage
Called by the agentic pipeline at each training step to collect a batch of completed trajectories for policy optimization.
Code Reference
Source Location
- Repository: Alibaba ROLL
- File: roll/distributed/scheduler/rollout_scheduler.py
- Lines: L628-672
Signature
async def get_batch(
self,
data: DataProto,
batch_size: int
) -> Optional[DataProto]:
"""
Get batch of rollout trajectories.
Args:
data: Input DataProto with global_step in meta_info
batch_size: Number of trajectories to collect (-1 for unlimited)
Returns:
Concatenated DataProto batch with input_ids, attention_mask,
position_ids, response_mask, prompt_mask, scores, traj_group_id,
traj_id, env_ids, step_scores, episode_scores.
Returns None if no trajectories available.
"""
Import
from roll.distributed.scheduler.rollout_scheduler import RolloutScheduler
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| data | DataProto | Yes | Contains global_step in meta_info for group tracking |
| batch_size | int | Yes | Number of trajectories to collect (-1 for all available) |
Outputs
| Name | Type | Description |
|---|---|---|
| batch | Optional[DataProto] | Concatenated training batch with trajectory tensors and metadata |
| input_ids | torch.Tensor | Padded input token sequences |
| response_mask | torch.Tensor | Mask for response tokens |
| scores | torch.Tensor | Episode-level scores |
| step_scores | torch.Tensor | Per-step reward scores |
| traj_group_id | torch.Tensor | Group identifiers for variance reduction |
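The traj_group_id field groups trajectories that share a prompt, which is what enables group-relative baselines (GRPO-style variance reduction). A minimal illustration of that use, with plain Python lists standing in for the tensors; the helper name is ours, not part of ROLL:

```python
from collections import defaultdict

def group_relative_advantages(scores, traj_group_id):
    """Center each trajectory's score on its group mean, yielding a
    per-group relative advantage signal (illustrative helper)."""
    by_group = defaultdict(list)
    for s, g in zip(scores, traj_group_id):
        by_group[g].append(s)
    means = {g: sum(v) / len(v) for g, v in by_group.items()}
    return [s - means[g] for s, g in zip(scores, traj_group_id)]
```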
Usage Examples
Collecting Training Batch
from roll.distributed.scheduler.protocol import DataProto
# Inside an async training coroutine; `rollout_scheduler` is a
# previously constructed RolloutScheduler instance.
batch = await rollout_scheduler.get_batch(
    data=DataProto(meta_info={"global_step": current_step}),
    batch_size=32,  # collect 32 completed trajectories
)
if batch is not None:
    # Access trajectory tensors from the batch
    input_ids = batch.batch["input_ids"]
    scores = batch.batch["scores"]
    traj_group_id = batch.batch["traj_group_id"]
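Because get_batch is a coroutine, a synchronous training script needs an event loop to drive it. A minimal sketch of that calling pattern, using a hypothetical FakeScheduler in place of a real RolloutScheduler so the shape of the call is visible on its own:

```python
import asyncio

class FakeScheduler:
    """Illustrative stand-in exposing the same get_batch coroutine
    shape; the real object is a RolloutScheduler built elsewhere."""
    async def get_batch(self, data, batch_size):
        return {"scores": [0.0] * batch_size}

async def train_step(scheduler, global_step, batch_size=32):
    # DataProto stand-in: only meta_info["global_step"] matters here
    data = {"meta_info": {"global_step": global_step}}
    batch = await scheduler.get_batch(data, batch_size)
    if batch is None:
        return None  # no completed trajectories yet
    return batch

# A synchronous script drives the coroutine with asyncio.run
batch = asyncio.run(train_step(FakeScheduler(), global_step=0))
```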
Related Pages
Implements Principle
Requires Environment