Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:Hpcaitech ColossalAI DetachedTrainer

From Leeroopedia


Knowledge Sources
Domains Reinforcement Learning, Distributed Training, RLHF
Last Updated 2026-02-09 00:00 GMT

Overview

Abstract base class for detached RLHF trainers in the Ray-based ColossalChat pipeline, where the experience maker runs as a separate Ray actor from the trainer.

Description

DetachedTrainer is an abstract base class that implements the core training loop for RLHF when the experience maker is detached (runs on separate Ray actors). It manages a DetachedReplayBuffer for receiving experiences from remote ExperienceMakerHolder actors, executes the training loop with warmup and multiple epochs, and provides Ray-method-decorated buffer operations (buffer_append, buffer_extend, buffer_get_length, _buffer_sample) with concurrency groups to safely handle concurrent access.

The trainer resolves remote experience maker holders by name via ray.get_actor and dispatches model weight updates to them. It provides a full callback lifecycle including fit, episode, epoch, batch, and update hooks. Subclasses must implement training_step and _update_remote_makers.

Usage

Subclass DetachedTrainer to implement a specific RL algorithm (e.g., PPO). The subclass should be decorated with @ray.remote and configured with concurrency groups. Deploy as a Ray actor with a name so that ExperienceMakerHolder instances can find it and push experience data into its buffer.

Code Reference

Source Location

Signature

class DetachedTrainer(ABC):
    def __init__(
        self,
        experience_maker_holder_name_list: List[str],
        train_batch_size: int = 8,
        buffer_limit: int = 0,
        dataloader_pin_memory: bool = True,
        callbacks: List[TrainerCallback] = [],
        debug: bool = False,
    ) -> None: ...

    @abstractmethod
    def _update_remote_makers(self, fully_update: bool = False, **kwargs): ...

    @abstractmethod
    def training_step(self, experience: Experience) -> Dict[str, Any]: ...

    def fit(self, total_steps: int, update_steps: int, train_epochs: int = 1) -> None: ...
    def sync_models_to_remote_makers(self, **kwargs): ...
    def buffer_get_length(self): ...
    def buffer_append(self, experience: Experience): ...
    def buffer_extend(self, items: List[BufferItem]): ...

Import

from coati.ray.detached_trainer_base import DetachedTrainer

I/O Contract

Inputs

Name Type Required Description
experience_maker_holder_name_list List[str] Yes Names of remote ExperienceMakerHolder Ray actors
train_batch_size int No Batch size for training (default 8)
buffer_limit int No Maximum replay buffer size; 0 for unlimited (default 0)
dataloader_pin_memory bool No Whether to pin memory for the data loader (default True)
callbacks List[TrainerCallback] No List of callback instances (default [])
debug bool No Enable debug logging (default False)

Outputs

Name Type Description
training_step return Dict[str, Any] Dictionary of training metrics (e.g., loss values)
buffer_get_length return int Current number of batches in the replay buffer

Usage Examples

import ray
from coati.ray.detached_trainer_base import DetachedTrainer
from coati.experience_maker import Experience

@ray.remote(concurrency_groups={
    "buffer_length": 1, "buffer_append": 1,
    "buffer_sample": 1, "compute": 1
})
class MyTrainer(DetachedTrainer):
    def training_step(self, experience: Experience):
        # Implement training logic
        return {"loss": 0.0}

    def _update_remote_makers(self, fully_update=False, **kwargs):
        # Implement model sync to remote makers
        pass

trainer = MyTrainer.options(name="trainer_0").remote(
    experience_maker_holder_name_list=["maker_0"],
    train_batch_size=8,
)
ray.get(trainer.fit.remote(total_steps=1000, update_steps=100, train_epochs=2))

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment