Implementation:Hpcaitech ColossalAI DetachedReplayBuffer

Knowledge Sources
Domains: Reinforcement Learning, Distributed Training, Experience Replay
Last Updated: 2026-02-09 00:00 GMT

Overview

A detached replay buffer that uses a Ray queue to share experience batches across workers on the same node in the distributed RLHF training pipeline.

Description

DetachedReplayBuffer provides a shared experience buffer for the Ray-based ColossalChat RLHF system. It accumulates individual experience items from remote ExperienceMakerHolder workers via the append and extend methods, batches them to the configured sample_batch_size, and enqueues complete batches into a Ray Queue. The trainer samples from this queue in a blocking fashion, ensuring experiences are consumed only when full batches are available.
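
The accumulate-then-enqueue behaviour described above can be sketched with the standard library alone. This is a minimal illustration, not the coati implementation: `ToyBatchingBuffer` and its `pending` list are hypothetical names, and `queue.Queue` stands in for the Ray Queue on the assumption that both expose blocking `put`/`get` and a size query.

```python
import queue
from typing import Any, List


class ToyBatchingBuffer:
    """Hypothetical stand-in for illustration; not the coati class."""

    def __init__(self, sample_batch_size: int, limit: int = 0) -> None:
        self.sample_batch_size = sample_batch_size
        # queue.Queue treats maxsize <= 0 as unbounded.
        self.items = queue.Queue(maxsize=limit)
        self.pending: List[Any] = []  # items not yet forming a full batch

    def extend(self, items: List[Any]) -> None:
        self.pending.extend(items)
        # Enqueue only complete batches; the remainder waits for more items.
        while len(self.pending) >= self.sample_batch_size:
            batch = self.pending[: self.sample_batch_size]
            self.pending = self.pending[self.sample_batch_size :]
            self.items.put(batch, block=True)

    def sample(self) -> List[Any]:
        # Blocks until at least one full batch has been enqueued.
        return self.items.get(block=True)

    def get_length(self) -> int:
        return self.items.qsize()


toy = ToyBatchingBuffer(sample_batch_size=2)
toy.extend(["a", "b", "c", "d", "e"])
queued_batches = toy.get_length()   # complete batches waiting in the queue
leftover_items = list(toy.pending)  # items still waiting for a partner
first_batch = toy.sample()
```

Five items with a batch size of two yield two queued batches and one leftover item, which stays out of the queue until another item arrives.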

The append method splits each incoming Experience object into individual BufferItem instances, which are then re-batched before enqueuing. The buffer also supports an optional limit on the number of queued batches; when the queue is full, producers block until the trainer consumes a batch.
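
The split and re-batch step can be illustrated with plain Python containers. The sketch below assumes a batched record simply holds one list entry per sample; `ToyExperience`, `ToyItem`, `split_batch`, and `make_batch` are hypothetical names for illustration, and the real Experience and BufferItem types hold tensors rather than lists.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class ToyExperience:
    """Hypothetical batched record: one list entry per sample."""
    sequences: List[List[int]]
    rewards: List[float]


@dataclass
class ToyItem:
    """Hypothetical single-sample record."""
    sequence: List[int]
    reward: float


def split_batch(exp: ToyExperience) -> List[ToyItem]:
    # One item per sample, pairing up the per-sample fields.
    return [ToyItem(s, r) for s, r in zip(exp.sequences, exp.rewards)]


def make_batch(items: List[ToyItem]) -> ToyExperience:
    # Inverse of split_batch: stack per-sample fields back into lists.
    return ToyExperience(
        sequences=[item.sequence for item in items],
        rewards=[item.reward for item in items],
    )


incoming = ToyExperience(sequences=[[1, 2], [3, 4], [5, 6]], rewards=[0.1, 0.2, 0.3])
items = split_batch(incoming)
rebatched = make_batch(items[:2])  # re-batch to a size that differs from the input
```

Splitting first means the size of the incoming batch does not have to match the batch size used when sampling.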

Usage

Use DetachedReplayBuffer within a DetachedTrainer Ray actor to receive and buffer experience data from remote ExperienceMakerHolder actors. Each trainer node is expected to have one instance. The buffer is called remotely by experience makers and sampled locally by the trainer.
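
The producer and consumer roles can be sketched as follows, under the assumption that a thread and a bounded `queue.Queue` are acceptable stand-ins for a remote experience maker and the Ray Queue. All names here (`shared`, `producer`, `SAMPLE_BATCH_SIZE`) are hypothetical and chosen only for the illustration.

```python
import queue
import threading

SAMPLE_BATCH_SIZE = 2
# A limit of 1 means the producer blocks while one batch is still unconsumed.
shared = queue.Queue(maxsize=1)


def producer(num_items: int) -> None:
    """Stands in for an experience maker pushing items into the buffer."""
    pending = []
    for item in range(num_items):
        pending.append(item)
        if len(pending) == SAMPLE_BATCH_SIZE:
            shared.put(pending, block=True)  # blocks while the queue is full
            pending = []


maker = threading.Thread(target=producer, args=(8,))
maker.start()

# Stands in for the trainer: each get blocks until a full batch is available.
consumed = [shared.get(block=True) for _ in range(4)]
maker.join()
```

With a limit in place, a fast producer is paced by the consumer instead of growing the queue without bound.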

Code Reference

Source Location

Signature

class DetachedReplayBuffer:
    def __init__(self, sample_batch_size: int, limit: int = 0) -> None: ...
    def append(self, experience: Experience) -> None: ...
    def extend(self, items: List[BufferItem]) -> None: ...
    def clear(self) -> None: ...
    def sample(self, worker_rank=0, to_device="cpu") -> Experience: ...
    def get_length(self) -> int: ...

Import

from coati.ray.detached_replay_buffer import DetachedReplayBuffer

I/O Contract

Inputs

Name | Type | Required | Description
sample_batch_size | int | Yes | Batch size for grouping individual items before enqueuing
limit | int | No | Maximum number of batches in the queue; 0 means unlimited (default 0)

Outputs

Name | Type | Description
sample() return | Experience | A batched Experience object moved to the specified device
get_length() return | int | The current number of batches in the queue

Usage Examples

from coati.ray.detached_replay_buffer import DetachedReplayBuffer

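# Create a buffer that groups items into batches of 8 and holds at most 16 batches
buffer = DetachedReplayBuffer(sample_batch_size=8, limit=16)

# Called remotely by an ExperienceMakerHolder.
# `experience` is an Experience object produced by the experience maker (not constructed here).
buffer.append(experience)

# Called locally by the trainer; blocks until a full batch is available
batch = buffer.sample(to_device="cuda:0")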
