Implementation:Hpcaitech ColossalAI DetachedReplayBuffer
| Knowledge Sources | |
|---|---|
| Domains | Reinforcement Learning, Distributed Training, Experience Replay |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
A detached replay buffer that uses a Ray queue to share experience batches across workers on the same node in the distributed RLHF training pipeline.
Description
DetachedReplayBuffer provides a shared experience buffer for the Ray-based ColossalChat RLHF system. It accumulates individual experience items from remote ExperienceMakerHolder workers via the append and extend methods, batches them to the configured sample_batch_size, and enqueues complete batches into a Ray Queue. The trainer samples from this queue in a blocking fashion, ensuring experiences are consumed only when full batches are available.
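The append method splits each incoming Experience into individual BufferItem instances, while extend accepts items that are already split; both paths re-batch the items before enqueuing. The buffer also supports an optional size limit: when the queue already holds limit batches, producers block until the trainer consumes one.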
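The accumulate-and-re-batch behavior described above can be sketched without Ray. This is a minimal illustration under stated assumptions, not the coati source: `RebatchingBuffer` is a hypothetical name, plain Python values stand in for BufferItem instances, lists stand in for batched Experience objects, and the standard-library `queue.Queue` replaces the Ray Queue.

```python
import queue
from typing import Any, List


class RebatchingBuffer:
    """Hypothetical stand-in that mimics the batching logic only."""

    def __init__(self, sample_batch_size: int, limit: int = 0) -> None:
        self.sample_batch_size = sample_batch_size
        # queue.Queue treats maxsize <= 0 as unbounded.
        self.items = queue.Queue(maxsize=limit)
        # Items that have not yet filled a complete batch.
        self.batch_collector: List[Any] = []

    def extend(self, items: List[Any]) -> None:
        self.batch_collector.extend(items)
        # Enqueue as many complete batches as the collector can supply.
        while len(self.batch_collector) >= self.sample_batch_size:
            batch = self.batch_collector[: self.sample_batch_size]
            self.items.put(batch, block=True)  # blocks if the queue is full
            self.batch_collector = self.batch_collector[self.sample_batch_size :]

    def sample(self) -> List[Any]:
        # Blocks until a complete batch is available.
        return self.items.get(block=True)

    def get_length(self) -> int:
        return self.items.qsize()


buf = RebatchingBuffer(sample_batch_size=4)
buf.extend(list(range(6)))     # one batch enqueued, two items left over
buf.extend(list(range(6, 9)))  # leftovers plus three new items fill a second batch
print(buf.get_length(), buf.batch_collector)
```

In this sketch, items that do not fill a batch stay in the collector until later calls supply the remainder, so the consumer never sees a partial batch.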
Usage
Use DetachedReplayBuffer within a DetachedTrainer Ray actor to receive and buffer experience data from remote ExperienceMakerHolder actors. Each trainer node is expected to have one instance. The buffer is called remotely by experience makers and sampled locally by the trainer.
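The producer/consumer relationship between experience makers and the trainer, including the back-pressure that a size limit creates, can be illustrated with threads standing in for Ray actors. Everything here is an assumption made for the sketch: `run_demo`, `experience_maker`, and the string items are hypothetical, and the standard-library `queue.Queue` replaces the Ray Queue.

```python
import queue
import threading


def run_demo(num_batches: int = 3, limit: int = 1, batch_size: int = 2):
    # Bounded queue of complete batches; plays the role of the shared buffer.
    batches = queue.Queue(maxsize=limit)

    def experience_maker() -> None:
        # Producer: pushes complete batches and blocks while the queue is full.
        for i in range(num_batches):
            batch = [f"exp-{i}-{j}" for j in range(batch_size)]
            batches.put(batch, block=True)

    maker = threading.Thread(target=experience_maker, daemon=True)
    maker.start()

    # Give the producer time to fill the queue; it should then be stuck in put().
    maker.join(timeout=0.2)
    producer_blocked = maker.is_alive()
    pending = batches.qsize()

    # Consumer (the trainer's role): each get() blocks until a batch exists
    # and frees a slot, which unblocks the producer.
    consumed = [batches.get(block=True) for _ in range(num_batches)]
    maker.join()
    return producer_blocked, pending, consumed


producer_blocked, pending, consumed = run_demo()
print(producer_blocked, pending, len(consumed))
```

With a limit of one, the producer stalls after its first batch until the consumer starts sampling, which is the same throttling effect a nonzero limit is described as having on remote experience makers.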
Code Reference
Source Location
- Repository: Hpcaitech_ColossalAI
- File: applications/ColossalChat/coati/ray/detached_replay_buffer.py
- Lines: 1-70
Signature
class DetachedReplayBuffer:
    def __init__(self, sample_batch_size: int, limit: int = 0) -> None: ...
    def append(self, experience: Experience) -> None: ...
    def extend(self, items: List[BufferItem]) -> None: ...
    def clear(self) -> None: ...
    def sample(self, worker_rank=0, to_device="cpu") -> Experience: ...
    def get_length(self) -> int: ...
Import
from coati.ray.detached_replay_buffer import DetachedReplayBuffer
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| sample_batch_size | int | Yes | Batch size for grouping individual items before enqueuing |
| limit | int | No | Maximum number of batches in the queue; 0 means unlimited (default 0) |
Outputs
| Name | Type | Description |
|---|---|---|
| sample() return | Experience | A batched Experience object moved to the specified device |
| get_length() return | int | The current number of batches in the queue |
Usage Examples
from coati.ray.detached_replay_buffer import DetachedReplayBuffer
# Create a buffer that groups items into batches of 8 and holds at most 16 pending batches
buffer = DetachedReplayBuffer(sample_batch_size=8, limit=16)
# Called remotely by an ExperienceMakerHolder; `experience` is an Experience it produced
buffer.append(experience)
# Called locally by the trainer; blocks until a full batch is available
batch = buffer.sample(to_device="cuda:0")