Implementation: Alibaba ROLL TrajEnvManager
| Knowledge Sources | |
|---|---|
| Domains | Reinforcement_Learning, Agentic_AI, Environment_Interaction |
| Last Updated | 2026-02-07 20:00 GMT |
Overview
A concrete, trajectory-based environment manager for multi-turn LLM agent training, provided by the Alibaba ROLL library.
Description
The TrajEnvManager class manages the interaction loop between an LLM and an environment. It orchestrates episode collection by coordinating environment resets, LLM generation requests, environment steps, and training sample formulation. The class runs as a Ray actor, uses an output queue (GroupQueueManager) for trajectory batching, and supports concurrent episode execution with thread-safe locks.
The StepEnvManager extends TrajEnvManager to create step-level training samples instead of trajectory-level ones, enabling GiGPO-style training.
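The reset/decide/step/formulate cycle described above can be sketched with toy stand-ins. Everything below (ToyEnv, toy_policy, collect_episode, the cache dictionary) is illustrative and invented for this sketch, not ROLL's actual API:

```python
# Illustrative sketch of the reset -> decide -> step -> formulate cycle
# that a trajectory-level env manager runs. All names below are toy
# stand-ins, not ROLL's actual classes.

class ToyEnv:
    """Minimal episodic environment: reward 1.0 when the agent says 'stop'."""
    def reset(self):
        return "start"

    def step(self, action):
        done = action == "stop"
        return ("end" if done else "mid"), (1.0 if done else 0.0), done

def toy_policy(observation):
    # Stand-in for make_decision(): a real manager would send the prompt
    # to the LLM via the generation scheduler and decode the response.
    return "stop" if observation == "mid" else "go"

def collect_episode(env, policy, max_steps=8):
    # Mirrors the inner episode of a rollout loop: reset, then alternate
    # decision/step until termination, caching every transition.
    cache = {"observations": [], "actions": [], "rewards": []}
    obs = env.reset()
    for _ in range(max_steps):
        action = policy(obs)
        next_obs, reward, done = env.step(action)
        cache["observations"].append(obs)
        cache["actions"].append(action)
        cache["rewards"].append(reward)
        obs = next_obs
        if done:
            break
    # formulate_rollouts would turn this cache into tokenized training
    # samples; here we just attach the episode return as the score.
    cache["episode_score"] = sum(cache["rewards"])
    return cache

traj = collect_episode(ToyEnv(), toy_policy)
print(traj["actions"])        # ['go', 'stop']
print(traj["episode_score"])  # 1.0
```

In the real class, the decision step is an asynchronous request to the generation scheduler and the finished trajectory is pushed to the output queue rather than returned.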
Usage
Instantiated automatically by the agentic pipeline. Each environment type (Sokoban, FrozenLake, WebShop) gets its own TrajEnvManager or StepEnvManager instance.
Code Reference
Source Location
- Repository: Alibaba ROLL
- File: roll/pipeline/agentic/env_manager/traj_env_manager.py
- Lines: L30-374
Signature
class TrajEnvManager(BaseEnvManager):
    def __init__(
        self,
        worker_config: EnvManagerConfig,
        pipeline_config: AgenticConfig,
        env_config: DictConfig,
        tokenizer: PreTrainedTokenizer,
        generate_scheduler,
        output_queue: GroupQueueManager,
        thread_lock: Lock,
        mode: str = "train",
    ) -> None:
        """
        Args:
            worker_config: Environment manager configuration
            pipeline_config: Pipeline configuration
            env_config: Environment-specific config (Sokoban/FrozenLake/etc.)
            tokenizer: Tokenizer for encoding/decoding
            generate_scheduler: LLM generation request scheduler
            output_queue: Ray queue for output trajectories
            thread_lock: Thread synchronization lock
            mode: "train" or "val"
        """

    def run_rollout_loop(self, data: DataProto) -> None:
        """Main rollout loop: collect episodes until completion."""

    def reset(self) -> RolloutCache:
        """Reset the environment and create a new episode cache."""

    def step(self, llm_output: DataProto) -> RolloutCache:
        """Execute one environment step with the LLM response."""

    def make_decision(self, rollout_cache: RolloutCache) -> DataProto:
        """Generate an LLM decision via the language model proxy."""

    def formulate_rollouts(self, rollout_cache: RolloutCache) -> DataProto:
        """Construct training samples from the collected trajectory."""
Import
from roll.pipeline.agentic.env_manager.traj_env_manager import TrajEnvManager
from roll.pipeline.agentic.env_manager.step_env_manager import StepEnvManager
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| env_config | DictConfig | Yes | Environment-specific configuration |
| tokenizer | PreTrainedTokenizer | Yes | Tokenizer for text encoding |
| generate_scheduler | object | Yes | LLM generation request scheduler |
| output_queue | GroupQueueManager | Yes | Ray queue for output trajectories |
Outputs
| Name | Type | Description |
|---|---|---|
| DataProto trajectories | DataProto | Training samples with input_ids, attention_mask, response_mask, scores, traj_group_id, traj_id, step_scores, episode_scores |
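Among the output fields, response_mask marks which positions in the concatenated input_ids were generated by the LLM (and therefore receive loss), as opposed to prompt and environment-observation tokens. A minimal sketch of that flattening, with invented token IDs and a hypothetical flatten_turns helper:

```python
# Sketch of flattening a multi-turn trajectory into input_ids with a
# response_mask over LLM-generated tokens only. Token IDs and the
# two-turn episode are invented for illustration.

def flatten_turns(turns):
    """Each turn is (observation_token_ids, response_token_ids)."""
    input_ids, response_mask = [], []
    for obs_ids, resp_ids in turns:
        input_ids += obs_ids
        response_mask += [0] * len(obs_ids)   # env/prompt tokens: no loss
        input_ids += resp_ids
        response_mask += [1] * len(resp_ids)  # LLM tokens: train on these
    attention_mask = [1] * len(input_ids)     # no padding in this sketch
    return input_ids, attention_mask, response_mask

turns = [([101, 102], [201]), ([103], [202, 203])]
ids, attn, mask = flatten_turns(turns)
print(ids)   # [101, 102, 201, 103, 202, 203]
print(mask)  # [0, 0, 1, 0, 1, 1]
```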
Usage Examples
Trajectory Collection
# TrajEnvManager is typically instantiated by the agentic pipeline.
# run_rollout_loop then continuously collects episodes:
env_manager = TrajEnvManager(
    worker_config=env_manager_config,
    pipeline_config=agentic_config,
    env_config=sokoban_env_config,
    tokenizer=tokenizer,
    generate_scheduler=scheduler,
    output_queue=group_queue,
    thread_lock=lock,
    mode="train",
)
# The rollout loop runs asynchronously, producing trajectories
env_manager.run_rollout_loop(data=DataProto(meta_info={"global_step": 0}))
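The difference between TrajEnvManager and StepEnvManager is the granularity of the training samples: one scored sample per episode versus one scored sample per step. The toy below contrasts the two; the discounted return-to-go used for step scores is an illustrative choice, not necessarily how ROLL computes step_scores:

```python
# Toy contrast between trajectory-level and step-level sample scoring.
# Step-level (GiGPO-style) training gets one sample per step; the
# discounted return-to-go below is illustrative, not ROLL's exact rule.

def trajectory_sample(rewards):
    # One sample per episode, scored by the total episode return.
    return {"episode_score": sum(rewards)}

def step_samples(rewards, gamma=0.9):
    # One sample per step, scored by the discounted return-to-go.
    returns, g = [], 0.0
    for r in reversed(rewards):
        g = r + gamma * g
        returns.append(g)
    returns.reverse()
    return [{"step_score": g} for g in returns]

rewards = [0.0, 0.0, 1.0]
steps = step_samples(rewards)
print(trajectory_sample(rewards))            # {'episode_score': 1.0}
print([s["step_score"] for s in steps])      # approximately [0.81, 0.9, 1.0]
```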
Related Pages
Implements Principle
Requires Environment
Environment Dependencies
This implementation requires the following environment constraints:
Heuristics Applied
No specific heuristics apply to this implementation.