Implementation:Hpcaitech ColossalAI Launch Zero Bubble
| Knowledge Sources | |
|---|---|
| Domains | Distributed_Training, RLHF, Zero_Bubble_Pipeline |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
launch_zero_bubble.py orchestrates the launch of the Zero Bubble distributed RL training pipeline, creating and coordinating Ray-based producers, consumers, and distributors for asynchronous GRPO/DAPO training.
Description
This module provides launch_distributed, the entry point for the zero-bubble asynchronous RL pipeline. It creates three kinds of Ray actors:
- SimpleProducer actors, which run inference and generate rollouts;
- GRPOConsumer (or DAPOConsumer) actors, which train the policy;
- Distributor actors, which mediate weight synchronization.

Communication groups are kept separate by role: gloo-based groups for consumer-distributor model sync, gloo-based groups for producer-distributor model sync (one per pipeline-parallel stage), and separate collective groups among the consumers, among the producers, and among the distributors. The module also includes two helpers: get_jsonl_size_fast, which counts the lines of a JSONL dataset, and get_dp_size_fast, which computes the data-parallel world size from the plugin configuration.
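The decoupling between rollout and training described above can be illustrated with a toy, single-process analogue. It is not the ColossalAI implementation and uses no Ray. One Python thread stands in for a producer and another for a consumer, a bounded `queue.Queue` stands in for the data exchange buffer, and a hypothetical `WeightStore` class plays the distributor's role by holding the latest policy version. The names `WeightStore`, `producer`, `consumer`, and `run` are illustrative only.

```python
import queue
import threading
from dataclasses import dataclass, field
from typing import List


@dataclass
class WeightStore:
    """Toy stand-in for a distributor: holds the latest policy version."""
    version: int = 0
    lock: threading.Lock = field(default_factory=threading.Lock)

    def publish(self, version: int) -> None:
        with self.lock:
            self.version = version

    def latest(self) -> int:
        with self.lock:
            return self.version


def producer(n_batches: int, buf: "queue.Queue", weights: WeightStore) -> None:
    for i in range(n_batches):
        # Tag each rollout batch with the policy version used to generate it.
        # put() blocks while the bounded buffer is full.
        buf.put({"batch_id": i, "policy_version": weights.latest()})
    buf.put(None)  # sentinel: no more rollouts


def consumer(buf: "queue.Queue", weights: WeightStore, seen: List[dict]) -> None:
    step = 0
    while True:
        item = buf.get()
        if item is None:
            break
        seen.append(item)  # a real consumer would run a training step here
        step += 1
        weights.publish(step)  # hand the "updated weights" back to producers


def run(n_batches: int = 10, buffer_limit: int = 4) -> List[dict]:
    buf: "queue.Queue" = queue.Queue(maxsize=buffer_limit)
    weights = WeightStore()
    seen: List[dict] = []
    threads = [
        threading.Thread(target=producer, args=(n_batches, buf, weights)),
        threading.Thread(target=consumer, args=(buf, weights, seen)),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return seen
```

Calling `run(n_batches=10, buffer_limit=4)` returns the batches in order, each tagged with the policy version that was current when it was generated. Because the producer blocks once the buffer is full, it cannot run arbitrarily far ahead of training, so in this toy the buffer size bounds how stale a rollout's policy version can get.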
Usage
Use this module to launch a fully distributed GRPO/DAPO training pipeline using Ray with zero-bubble scheduling. It is called from training scripts that need to coordinate multiple GPU workers across inference and training with asynchronous model weight distribution.
Code Reference
Source Location
- Repository: Hpcaitech_ColossalAI
- File: applications/ColossalChat/coati/distributed/launch_zero_bubble.py
- Lines: 1-305
Signature
```python
from typing import Any, Dict, Optional


def launch_distributed(
    num_producers: int,
    num_proc_per_producer: int,
    num_consumer_procs: int,
    num_episodes: int,
    inference_batch_size: int,
    inference_microbatch_size: int,
    train_batch_size: int,
    train_minibatch_size: int,
    train_dataset_config: Dict[str, Any],
    inference_model_config: Dict[str, Any],
    generate_config: Dict[str, Any],
    train_model_config: Dict[str, Any],
    grpo_config: Dict[str, Any],
    plugin_config: Dict[str, Any],
    tokenizer_config: Optional[Dict[str, Any]] = None,
    inference_backend: str = "transformers",
    num_generations: int = 8,
    master_addr: str = "localhost",
    master_port: int = 29500,
    core_algo: str = "GRPO",
    project_name: Optional[str] = None,
    save_interval: int = 100,
    save_dir: str = "./model",
    eval_dataset_config: Optional[Dict[str, Any]] = None,
    eval_interval: int = 100,
    eval_save_dir: Optional[str] = None,
    eval_generation_config: Optional[Dict[str, Any]] = None,
    log_rollout_interval: int = 20,
    rollout_save_dir: str = "./rollout",
    enable_profiling: bool = False,
    data_actor_buffer_size_limit: int = 0,
) -> None: ...
```
Helper Functions
```python
from typing import Any, Dict


def get_jsonl_size_fast(path: str) -> int: ...

def get_dp_size_fast(n_procs: int, plugin_config: Dict[str, Any]) -> int: ...
```
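The two helpers can be sketched as below. The bodies are assumptions consistent with their one-line descriptions, not the repository code. Line counting reads the file in 1 MiB binary chunks and counts newline bytes (so blank lines are counted too), and the data-parallel size divides the process count by `tp_size * pp_size`, ignoring any other parallel dimensions the real helper may account for.

```python
from typing import Any, Dict


def get_jsonl_size_fast(path: str) -> int:
    """Sketch: count lines in a JSONL file by scanning raw bytes in large chunks."""
    count = 0
    last_byte = b""
    with open(path, "rb") as f:
        while True:
            chunk = f.read(1 << 20)  # 1 MiB per read; avoids decoding or parsing JSON
            if not chunk:
                break
            count += chunk.count(b"\n")
            last_byte = chunk[-1:]
    # A final record without a trailing newline still counts as a line.
    if last_byte and last_byte != b"\n":
        count += 1
    return count


def get_dp_size_fast(n_procs: int, plugin_config: Dict[str, Any]) -> int:
    """Sketch: assumes dp = n_procs / (tp * pp); other parallel dims are ignored."""
    tp_size = plugin_config.get("tp_size", 1)
    pp_size = plugin_config.get("pp_size", 1)
    model_parallel = tp_size * pp_size
    if n_procs % model_parallel != 0:
        raise ValueError(f"{n_procs} processes are not divisible by tp_size * pp_size = {model_parallel}")
    return n_procs // model_parallel
```

With the plugin configuration from the usage example (`tp_size=1`, `pp_size=1`) and 8 consumer processes, this sketch gives a data-parallel size of 8.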
Import
```python
from coati.distributed.launch_zero_bubble import launch_distributed
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| num_producers | int | Yes | Number of producer (inference) workers to spawn |
| num_proc_per_producer | int | Yes | Number of GPUs allocated per producer |
| num_consumer_procs | int | Yes | Number of consumer (training) processes |
| num_episodes | int | Yes | Number of training episodes to run |
| inference_batch_size | int | Yes | Batch size for inference on each producer |
| inference_microbatch_size | int | Yes | Microbatch size for inference |
| train_batch_size | int | Yes | Batch size for training on each consumer |
| train_minibatch_size | int | Yes | Minibatch size for training gradient accumulation |
| train_dataset_config | Dict[str, Any] | Yes | Configuration dict containing dataset path and options |
| inference_model_config | Dict[str, Any] | Yes | Model configuration for inference producers |
| generate_config | Dict[str, Any] | Yes | Generation parameters (temperature, top_p, etc.) |
| train_model_config | Dict[str, Any] | Yes | Model configuration for training consumers |
| grpo_config | Dict[str, Any] | Yes | GRPO algorithm configuration |
| plugin_config | Dict[str, Any] | Yes | ColossalAI HybridParallelPlugin configuration (tp_size, pp_size, etc.) |
| core_algo | str | No | Algorithm selection, one of "GRPO" or "DAPO" (default: "GRPO") |
| data_actor_buffer_size_limit | int | No | Buffer size limit for the data exchange actor (signature default 0, which is replaced by 2 * train_minibatch_size * dp_size) |
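The default for data_actor_buffer_size_limit can be made concrete with a small sketch. `resolve_buffer_limit` is a hypothetical helper; it assumes that a non-positive value selects the default and that `dp_size` is the training-side data-parallel size derived from `num_consumer_procs`, `tp_size`, and `pp_size`.

```python
from typing import Any, Dict


def resolve_buffer_limit(
    limit: int,
    train_minibatch_size: int,
    num_consumer_procs: int,
    plugin_config: Dict[str, Any],
) -> int:
    # Hypothetical helper; assumes limit <= 0 selects the documented default.
    if limit > 0:
        return limit
    # Assumption: dp_size = consumer processes / (tp_size * pp_size).
    model_parallel = plugin_config.get("tp_size", 1) * plugin_config.get("pp_size", 1)
    dp_size = num_consumer_procs // model_parallel
    return 2 * train_minibatch_size * dp_size
```

With the usage example's values (`train_minibatch_size=2`, 8 consumer processes, `tp_size=pp_size=1`), this gives 2 * 2 * 8 = 32.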
Outputs
| Name | Type | Description |
|---|---|---|
| (none) | None | Function blocks until all producer and consumer loops complete via ray.get() |
Usage Examples
```python
from coati.distributed.launch_zero_bubble import launch_distributed

launch_distributed(
    num_producers=2,
    num_proc_per_producer=4,
    num_consumer_procs=8,
    num_episodes=3,
    inference_batch_size=16,
    inference_microbatch_size=4,
    train_batch_size=8,
    train_minibatch_size=2,
    train_dataset_config={"path": "/data/train.jsonl"},
    inference_model_config={"path": "Qwen/Qwen2.5-7B"},
    generate_config={"temperature": 1.0, "top_p": 0.95, "max_tokens": 2048},
    train_model_config={"path": "Qwen/Qwen2.5-7B"},
    grpo_config={"lr": 1e-6, "reward_fn_type": "think_answer_tags"},
    plugin_config={"tp_size": 1, "pp_size": 1, "zero_stage": 2},
    core_algo="GRPO",
)
```
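For resource planning, a rough count of the GPUs that the example above occupies can be sketched as follows. It assumes (this page does not confirm it) that each producer uses `num_proc_per_producer` GPUs, that each consumer process uses one GPU, and that distributor actors hold no dedicated GPU. `count_gpus` is a hypothetical helper.

```python
def count_gpus(num_producers: int, num_proc_per_producer: int, num_consumer_procs: int) -> int:
    # Assumption: producers use num_proc_per_producer GPUs each, consumers use one GPU
    # per process, and distributors are not counted.
    inference_gpus = num_producers * num_proc_per_producer
    training_gpus = num_consumer_procs
    return inference_gpus + training_gpus
```

With the example values, this gives 2 * 4 + 8 = 16 GPUs.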