Implementation:Hpcaitech ColossalAI ExperienceMakerHolder
| Knowledge Sources | |
|---|---|
| Domains | Reinforcement Learning, Distributed Training, RLHF, Experience Generation |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
A Ray remote actor that manages experience generation in the detached RLHF pipeline. It runs the actor, critic, reward, and initial models for inference and distributes the generated experiences to trainer actors.
Description
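ExperienceMakerHolder is a Ray remote actor with three concurrency groups (experience_io, model_io, compute), each limited to one concurrent call. It builds the strategy and the four models (actor, critic, reward model, initial model) from the supplied factory functions and wraps the models in a NaiveExperienceMaker. The workingloop method iterates over the dataloader returned by dataloader_fn to generate experiences. By default it makes num_epochs full passes. When num_steps > 0 it instead processes exactly num_steps batches, ignoring num_epochs and restarting the dataloader whenever it is exhausted. Each generated experience batch is split into individual items, which are distributed across the trainer actors in round-robin order.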
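The iteration and round-robin distribution described above can be sketched without Ray. This is a minimal, hypothetical illustration, not the coati implementation: RoundRobinDispatcher, working_loop, and make_items are invented names. Each trainer is modelled as a plain list, and the readiness wait, callbacks, and CPU offload of the real holder are omitted.

```python
from typing import Any, Callable, Iterable, List, Sequence


class RoundRobinDispatcher:
    """Hypothetical stand-in for the holder's item distribution.

    Each trainer is modelled as a plain list acting as its replay buffer.
    The cursor persists across batches, so the rotation continues where
    the previous batch left off.
    """

    def __init__(self, num_trainers: int) -> None:
        assert num_trainers > 0, "need at least one trainer"
        self.buffers: List[List[Any]] = [[] for _ in range(num_trainers)]
        self._target_idx = 0

    def send_items(self, items: Sequence[Any]) -> None:
        per_trainer: List[List[Any]] = [[] for _ in self.buffers]
        for item in items:
            per_trainer[self._target_idx].append(item)
            self._target_idx = (self._target_idx + 1) % len(self.buffers)
        for buffer, chunk in zip(self.buffers, per_trainer):
            if chunk:  # skip trainers that received nothing this round
                buffer.extend(chunk)


def working_loop(
    dataloader_fn: Callable[[], Iterable[Any]],
    make_items: Callable[[Any], Sequence[Any]],
    dispatcher: RoundRobinDispatcher,
    num_epochs: int = 1,
    num_steps: int = 0,
) -> None:
    dataloader = dataloader_fn()
    if num_steps > 0:
        # Step mode: num_epochs is ignored; restart the iterator when it runs dry.
        it = iter(dataloader)
        for _ in range(num_steps):
            try:
                batch = next(it)
            except StopIteration:
                it = iter(dataloader)
                batch = next(it)
            dispatcher.send_items(make_items(batch))
    else:
        # Epoch mode: full passes over the dataloader.
        for _ in range(num_epochs):
            for batch in dataloader:
                dispatcher.send_items(make_items(batch))
```

For example, with two trainers and batches [1, 2, 3] and [4, 5, 6], one epoch leaves [1, 3, 5] in the first buffer and [2, 4, 6] in the second. The cursor carries over between batches, so an odd-sized batch does not always favour the first trainer.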
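The update_experience_maker method lets trainers push updated actor and critic weights to this holder, optionally split across several calls. The call with chunk_start=True acquires a model visit lock, and the call with chunk_end=True releases it. Experience generation takes the same lock, so it never reads a partially updated model. Weights are loaded either from a (possibly partial) state dict or, when update_lora_weights is enabled and fully_update is False, as incremental LoRA updates reconstructed by LoRAConstructor. When sync_models_from_trainers is enabled, the holder starts out not fully initialized. In that case workingloop waits until a trainer has completed an update round with fully_update=True.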
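The lock protocol described above can be illustrated with a small thread-based sketch. ChunkedWeightHolder and its update, read, and wait_until_ready methods are hypothetical names. A dict stands in for the model weights, and merging a chunk into it loosely mirrors a non-strict partial load. The sketch assumes, as the description states, that generation and updates share one lock.

```python
import threading
import time
from typing import Any, Dict, Optional


class ChunkedWeightHolder:
    """Hypothetical stand-in for the lock protocol around weight updates."""

    def __init__(self, sync_models_from_trainers: bool = False) -> None:
        self.weights: Dict[str, Any] = {}
        self._model_visit_lock = threading.Lock()
        # Without syncing from trainers the holder is usable immediately.
        self._is_fully_initialized = not sync_models_from_trainers

    def update(
        self,
        state_dict: Optional[Dict[str, Any]] = None,
        fully_update: bool = False,
        chunk_start: bool = False,
        chunk_end: bool = False,
    ) -> None:
        if chunk_start:
            # Block readers until the final chunk has been applied.
            self._model_visit_lock.acquire()
        if state_dict is not None:
            # Merge the chunk, loosely mirroring a non-strict partial load.
            self.weights.update(state_dict)
        if chunk_end:
            self._model_visit_lock.release()
            if fully_update:
                self._is_fully_initialized = True

    def wait_until_ready(self, poll_s: float = 0.01) -> None:
        # Poll until a fully_update round has completed.
        while not self._is_fully_initialized:
            time.sleep(poll_s)

    def read(self) -> Dict[str, Any]:
        # Generation takes the same lock, so it never sees half an update.
        with self._model_visit_lock:
            return dict(self.weights)
```

A reader that calls read() between the chunk_start and chunk_end calls blocks until the last chunk has been merged. It therefore observes either the old weights or the complete new set, never a mix of the two.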
Usage
Deploy as a named Ray actor in the distributed RLHF pipeline. Provide factory functions for the strategy and models, along with names of the DetachedTrainer actors that will receive generated experience data. The holder's workingloop method drives the experience generation loop.
Code Reference
Source Location
- Repository: Hpcaitech_ColossalAI
- File: applications/ColossalChat/coati/ray/experience_maker_holder.py
- Lines: 1-274
Signature
```python
@ray.remote(concurrency_groups={
    "experience_io": 1, "model_io": 1, "compute": 1
})
class ExperienceMakerHolder:
    def __init__(
        self,
        detached_trainer_name_list: List[str],
        strategy_fn: Callable[[], Strategy],
        model_fn: Callable[[], Tuple[Actor, Critic, RewardModel, Actor]],
        env_info: Dict[str, str] = None,
        sync_models_from_trainers: bool = False,
        buffer_cpu_offload: bool = True,
        kl_coef: float = 0.1,
        callbacks: List[MakerCallback] = [],
        eval_performance: bool = False,
        debug: bool = False,
        update_lora_weights: bool = False,
        **generate_kwargs,
    ): ...

    def workingloop(
        self,
        dataloader_fn: Callable[[], Iterable],
        num_epochs: int = 1,
        num_steps: int = 0,
    ): ...

    def update_experience_maker(
        self,
        new_actor_state_dict: Dict[str, Any] = None,
        new_actor_lora_config_dict: Dict[str, Any] = None,
        new_critic_state_dict: Dict[str, Any] = None,
        new_critic_lora_config_dict: Dict[str, Any] = None,
        fully_update: bool = False,
        chunk_start: bool = None,
        chunk_end: bool = None,
    ): ...
```
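The concurrency_groups argument in the decorator gives each named group a limit of one in-flight call. Ray documents concurrency groups as having their own threads. The sketch below is a rough stdlib analogy of that behaviour, not Ray's implementation. ConcurrencyGroups is a hypothetical helper that gives every group a bounded ThreadPoolExecutor. It shows why, assuming weight updates are assigned to the model_io group, a trainer's update call can be serviced while a long-running call in another group is still executing, whereas two calls in the same group run strictly one after the other.

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable, Dict


class ConcurrencyGroups:
    """Rough stdlib analogy of Ray actor concurrency groups (not Ray's code).

    Each named group owns its own bounded thread pool, so calls in one group
    are serialized when its limit is 1, while calls in different groups can overlap.
    """

    def __init__(self, limits: Dict[str, int]) -> None:
        self._pools = {
            name: ThreadPoolExecutor(max_workers=limit, thread_name_prefix=name)
            for name, limit in limits.items()
        }

    def submit(self, group: str, fn: Callable[..., Any], *args: Any) -> Future:
        return self._pools[group].submit(fn, *args)

    def shutdown(self) -> None:
        for pool in self._pools.values():
            pool.shutdown(wait=True)


groups = ConcurrencyGroups({"experience_io": 1, "model_io": 1, "compute": 1})
weights_ready = threading.Event()
# A "compute" call can wait on a "model_io" call without deadlocking,
# because the two groups do not share a worker thread.
waiting = groups.submit("compute", weights_ready.wait, 5.0)
groups.submit("model_io", weights_ready.set)
print(waiting.result())  # True
groups.shutdown()
```

If all three groups shared a single one-thread pool, the compute call would occupy the only worker and the model_io call could never run to set the event.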
Import
```python
from coati.ray.experience_maker_holder import ExperienceMakerHolder
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| detached_trainer_name_list | List[str] | Yes | Names of remote DetachedTrainer Ray actors to send experience to |
| strategy_fn | Callable[[], Strategy] | Yes | Factory function returning the training strategy |
| model_fn | Callable[[], Tuple[Actor, Critic, RewardModel, Actor]] | Yes | Factory function returning actor, critic, reward model, initial model |
| env_info | Dict[str, str] | No | Environment variables for distributed setup (default None) |
| sync_models_from_trainers | bool | No | If True, workingloop waits until a trainer pushes initial weights with fully_update=True (default False) |
| buffer_cpu_offload | bool | No | Move each experience batch to CPU before sending it to trainers (default True) |
| kl_coef | float | No | Coefficient of the KL penalty between the actor and the initial model (default 0.1) |
| callbacks | List[MakerCallback] | No | Callback instances (default []) |
| eval_performance | bool | No | Enable performance evaluation callback (default False) |
| debug | bool | No | Enable debug logging (default False) |
| update_lora_weights | bool | No | Use incremental LoRA weight updates (default False) |
| **generate_kwargs | dict | No | Additional keyword arguments for text generation |
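kl_coef weights a KL term between the actor and the frozen initial model. In standard RLHF setups this term is subtracted from the reward-model score so the policy does not drift too far from its starting point. The sketch below shows that reward shaping under stated assumptions: kl_penalized_reward is a hypothetical name, and estimating KL as the masked mean of per-token log-probability ratios is a simplification. The exact estimator inside NaiveExperienceMaker may differ.

```python
from typing import Sequence


def kl_penalized_reward(
    reward: float,
    action_log_probs: Sequence[float],
    base_log_probs: Sequence[float],
    action_mask: Sequence[int],
    kl_coef: float = 0.1,
) -> float:
    """Return reward - kl_coef * KL, with KL estimated as the masked mean log-ratio.

    action_log_probs come from the actor, base_log_probs from the frozen initial
    model, both evaluated on the generated tokens; masked-out positions (padding)
    are ignored.
    """
    if kl_coef <= 0.0:
        return reward
    log_ratios = [
        lp - bp
        for lp, bp, keep in zip(action_log_probs, base_log_probs, action_mask)
        if keep
    ]
    kl_estimate = sum(log_ratios) / max(len(log_ratios), 1)
    return reward - kl_coef * kl_estimate
```

For instance, a reward of 1.0 with per-token log-ratios of 0.5 and 0.5 (and a third, masked token) yields 1.0 - 0.1 * 0.5 = 0.95. A larger kl_coef pulls the shaped reward down more strongly as the actor diverges.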
Outputs
| Name | Type | Description |
|---|---|---|
| workingloop return | None | Runs the experience generation loop to completion |
| update_experience_maker return | None | Updates model weights in place |
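With update_lora_weights enabled, the in-place update only needs the low-rank LoRA factors rather than a full state dict. The receiver reconstructs a dense weight increment and adds it to the existing weight. The sketch below assumes the standard LoRA formulation W += (lora_alpha / r) * (B @ A) with no fan-in/fan-out transpose. apply_lora_increase and matmul are hypothetical helpers on nested lists, not LoRAConstructor's API.

```python
from typing import List

Matrix = List[List[float]]


def matmul(a: Matrix, b: Matrix) -> Matrix:
    """Plain nested-list matrix product: (m x k) @ (k x n) -> (m x n)."""
    return [
        [sum(a[i][k] * b[k][j] for k in range(len(b))) for j in range(len(b[0]))]
        for i in range(len(a))
    ]


def apply_lora_increase(
    weight: Matrix, lora_A: Matrix, lora_B: Matrix, lora_alpha: float, r: int
) -> None:
    """Add the LoRA increment to weight in place: W += (lora_alpha / r) * (B @ A).

    weight is (out x in), lora_B is (out x r), lora_A is (r x in).
    """
    if r <= 0:
        return  # rank 0 means no adapter, hence no increment
    scaling = lora_alpha / r
    delta = matmul(lora_B, lora_A)
    for i, row in enumerate(delta):
        for j, d in enumerate(row):
            weight[i][j] += scaling * d
```

Sending A (r x in) and B (out x r) costs r * (in + out) values instead of in * out, which is the saving that motivates incremental LoRA updates when r is small.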
Usage Examples
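```python
import ray
from coati.ray.experience_maker_holder import ExperienceMakerHolder

ray.init()

# my_strategy, my_actor, my_critic, my_reward_model, my_initial_model and
# my_dataloader are user-supplied placeholders. A DetachedTrainer actor named
# "trainer_0" must also be running so the holder can send it experience.
maker = ExperienceMakerHolder.options(
    name="maker_0", num_gpus=1
).remote(
    detached_trainer_name_list=["trainer_0"],
    strategy_fn=lambda: my_strategy(),
    model_fn=lambda: (my_actor(), my_critic(), my_reward_model(), my_initial_model()),
    kl_coef=0.1,
    buffer_cpu_offload=True,
    eval_performance=True,
    # Remaining keyword arguments are forwarded to text generation
    max_length=512,
    top_k=50,
)

# Start the experience generation loop and block until it finishes
ray.get(maker.workingloop.remote(
    dataloader_fn=lambda: my_dataloader(),
    num_epochs=5,
))
```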
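On the trainer side, weights reach the holder through a sequence of update_experience_maker calls: one with chunk_start=True, one per state-dict chunk, and one with chunk_end=True. The sketch below drives that sequence against any callable. iter_state_dict_chunks and push_weights are hypothetical helpers, and chunking by key count is an illustrative rule. Real trainers presumably shard by tensor size.

```python
from typing import Any, Callable, Dict, Iterator


def iter_state_dict_chunks(
    state_dict: Dict[str, Any], max_keys: int
) -> Iterator[Dict[str, Any]]:
    """Yield successive sub-dicts with at most max_keys entries (hypothetical rule)."""
    chunk: Dict[str, Any] = {}
    for key, value in state_dict.items():
        chunk[key] = value
        if len(chunk) == max_keys:
            yield chunk
            chunk = {}
    if chunk:
        yield chunk  # trailing partial chunk


def push_weights(
    update_fn: Callable[..., Any],
    state_dict: Dict[str, Any],
    max_keys: int = 2,
    fully_update: bool = False,
) -> None:
    """Drive the chunk_start / chunk_end protocol against one holder."""
    # First call: the holder acquires its model visit lock.
    update_fn(chunk_start=True, fully_update=fully_update)
    for chunk in iter_state_dict_chunks(state_dict, max_keys):
        update_fn(new_actor_state_dict=chunk, fully_update=fully_update)
    # Last call: the holder releases the lock; with fully_update=True it also
    # marks itself fully initialized.
    update_fn(chunk_end=True, fully_update=fully_update)
```

To drive a real holder, update_fn could be `lambda **kw: ray.get(maker.update_experience_maker.remote(**kw))`, which waits for each call to finish before sending the next.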