Implementation:Hpcaitech ColossalAI Launch Zero Bubble

From Leeroopedia


Knowledge Sources
Domains Distributed_Training, RLHF, Zero_Bubble_Pipeline
Last Updated 2026-02-09 00:00 GMT

Overview

launch_zero_bubble.py orchestrates the launch of the Zero Bubble distributed RL training pipeline, creating and coordinating Ray-based producers, consumers, and distributors for asynchronous GRPO/DAPO training.

Description

This module provides the launch_distributed function, the entry point for the zero-bubble asynchronous RL pipeline. It creates three kinds of Ray actors: SimpleProducer actors for inference/rollout, GRPOConsumer (or DAPOConsumer) actors for policy training, and Distributor actors that mediate weight synchronization. Communication groups are initialized separately by purpose: gloo-based groups for consumer-distributor model sync and for producer-distributor model sync (per pipeline-parallel stage), plus separate collectives for consumers, producers, and distributors. The module also includes two helper functions: get_jsonl_size_fast, which counts the lines of a JSONL dataset, and get_dp_size_fast, which computes the data-parallel world size from the plugin configuration.
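The choice between GRPOConsumer and DAPOConsumer is driven by the core_algo argument. The following is a minimal sketch of such a dispatch, not the module's actual code: the lookup table and the function resolve_consumer_name are hypothetical, and the consumer classes are represented by their names only so the snippet runs without coati installed.

```python
from typing import Dict

# Hypothetical lookup table: maps the core_algo string to the consumer class
# name mentioned in the description. Real code would hold class objects.
CONSUMER_BY_ALGO: Dict[str, str] = {
    "GRPO": "GRPOConsumer",
    "DAPO": "DAPOConsumer",
}


def resolve_consumer_name(core_algo: str = "GRPO") -> str:
    """Return the consumer class name for core_algo, or raise on unknown values."""
    try:
        return CONSUMER_BY_ALGO[core_algo]
    except KeyError:
        supported = ", ".join(sorted(CONSUMER_BY_ALGO))
        raise ValueError(
            f"Unsupported core_algo {core_algo!r}; expected one of: {supported}"
        ) from None
```

Failing early on an unknown algorithm name avoids spawning actors that would only error out after the cluster resources have been reserved.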

Usage

Use this module to launch a fully distributed GRPO/DAPO training pipeline using Ray with zero-bubble scheduling. It is called from training scripts that need to coordinate multiple GPU workers across inference and training with asynchronous model weight distribution.

Code Reference

Source Location

Signature

def launch_distributed(
    num_producers: int,
    num_proc_per_producer: int,
    num_consumer_procs: int,
    num_episodes: int,
    inference_batch_size: int,
    inference_microbatch_size: int,
    train_batch_size: int,
    train_minibatch_size: int,
    train_dataset_config: Dict[str, Any],
    inference_model_config: Dict[str, Any],
    generate_config: Dict[str, Any],
    train_model_config: Dict[str, Any],
    grpo_config: Dict[str, Any],
    plugin_config: Dict[str, Any],
    tokenizer_config: Optional[Dict[str, Any]] = None,
    inference_backend: str = "transformers",
    num_generations: int = 8,
    master_addr: str = "localhost",
    master_port: int = 29500,
    core_algo: str = "GRPO",
    project_name: Optional[str] = None,
    save_interval: int = 100,
    save_dir: str = "./model",
    eval_dataset_config: Optional[Dict[str, Any]] = None,
    eval_interval: int = 100,
    eval_save_dir: Optional[str] = None,
    eval_generation_config: Optional[Dict[str, Any]] = None,
    log_rollout_interval: int = 20,
    rollout_save_dir: str = "./rollout",
    enable_profiling: bool = False,
    data_actor_buffer_size_limit: int = 0,
) -> None

Helper Functions

def get_jsonl_size_fast(path: str) -> int

def get_dp_size_fast(n_procs: int, plugin_config: Dict[str, Any]) -> int
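To illustrate what these two helpers compute, here is a rough sketch under stated assumptions. The names count_jsonl_lines and estimate_dp_size are hypothetical stand-ins, not the library functions. The sketch assumes that one JSONL line is one sample and that the data-parallel size is the process count divided by tp_size * pp_size; the real helper may account for further parallelism dimensions.

```python
from typing import Any, Dict


def count_jsonl_lines(path: str, chunk_size: int = 1 << 20) -> int:
    """Count lines by scanning the file in binary chunks, without parsing JSON."""
    count = 0
    last_byte = b""
    with open(path, "rb") as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            count += chunk.count(b"\n")
            last_byte = chunk[-1:]
    # A final record without a trailing newline still counts as one line.
    if last_byte and last_byte != b"\n":
        count += 1
    return count


def estimate_dp_size(n_procs: int, plugin_config: Dict[str, Any]) -> int:
    """Assumed formula: dp_size = n_procs / (tp_size * pp_size)."""
    tp_size = plugin_config.get("tp_size", 1)
    pp_size = plugin_config.get("pp_size", 1)
    model_parallel = tp_size * pp_size
    if n_procs % model_parallel != 0:
        raise ValueError(
            f"n_procs={n_procs} is not divisible by tp_size * pp_size={model_parallel}"
        )
    return n_procs // model_parallel
```

Counting newline bytes avoids decoding every record, which is presumably what the "fast" in the helper name refers to. Blank lines are counted as lines in this sketch.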

Import

from coati.distributed.launch_zero_bubble import launch_distributed

I/O Contract

Inputs

| Name | Type | Required | Description |
|---|---|---|---|
| num_producers | int | Yes | Number of producer (inference) workers to spawn |
| num_proc_per_producer | int | Yes | Number of GPUs allocated per producer |
| num_consumer_procs | int | Yes | Number of consumer (training) processes |
| num_episodes | int | Yes | Number of training episodes to run |
| inference_batch_size | int | Yes | Batch size for inference on each producer |
| inference_microbatch_size | int | Yes | Microbatch size for inference |
| train_batch_size | int | Yes | Batch size for training on each consumer |
| train_minibatch_size | int | Yes | Minibatch size for training gradient accumulation |
| train_dataset_config | Dict[str, Any] | Yes | Configuration dict containing dataset path and options |
| inference_model_config | Dict[str, Any] | Yes | Model configuration for inference producers |
| generate_config | Dict[str, Any] | Yes | Generation parameters (temperature, top_p, etc.) |
| train_model_config | Dict[str, Any] | Yes | Model configuration for training consumers |
| grpo_config | Dict[str, Any] | Yes | GRPO algorithm configuration |
| plugin_config | Dict[str, Any] | Yes | ColossalAI HybridParallelPlugin configuration (tp_size, pp_size, etc.) |
| core_algo | str | No | Algorithm selection, one of "GRPO" or "DAPO" (default: "GRPO") |
| data_actor_buffer_size_limit | int | No | Buffer size limit for the data exchange actor (signature default: 0, which presumably selects 2 * train_minibatch_size * dp_size) |
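The default for data_actor_buffer_size_limit depends on other arguments, so a small worked example may help. This is a sketch under assumptions, not the module's code: resolve_buffer_limit is a hypothetical helper, it assumes a non-positive limit means "use the default", and it assumes dp_size is the consumer process count divided by tp_size * pp_size.

```python
from typing import Any, Dict


def resolve_buffer_limit(
    limit: int,
    train_minibatch_size: int,
    num_consumer_procs: int,
    plugin_config: Dict[str, Any],
) -> int:
    """Return the explicit limit, or 2 * train_minibatch_size * dp_size if unset."""
    if limit > 0:
        return limit
    # Assumed dp_size formula; the real helper may include more dimensions.
    tp_size = plugin_config.get("tp_size", 1)
    pp_size = plugin_config.get("pp_size", 1)
    dp_size = num_consumer_procs // (tp_size * pp_size)
    return 2 * train_minibatch_size * dp_size
```

With 8 consumer processes, tp_size=1, pp_size=1, and train_minibatch_size=2, this sketch yields a limit of 2 * 2 * 8 = 32.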

Outputs

| Name | Type | Description |
|---|---|---|
| (none) | None | Function blocks until all producer and consumer loops complete via ray.get() |

Usage Examples

from coati.distributed.launch_zero_bubble import launch_distributed

launch_distributed(
    num_producers=2,
    num_proc_per_producer=4,
    num_consumer_procs=8,
    num_episodes=3,
    inference_batch_size=16,
    inference_microbatch_size=4,
    train_batch_size=8,
    train_minibatch_size=2,
    train_dataset_config={"path": "/data/train.jsonl"},
    inference_model_config={"path": "Qwen/Qwen2.5-7B"},
    generate_config={"temperature": 1.0, "top_p": 0.95, "max_tokens": 2048},
    train_model_config={"path": "Qwen/Qwen2.5-7B"},
    grpo_config={"lr": 1e-6, "reward_fn_type": "think_answer_tags"},
    plugin_config={"tp_size": 1, "pp_size": 1, "zero_stage": 2},
    core_algo="GRPO",
)
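Before launching, it can be useful to sanity-check the arguments against the cluster. The sketch below is illustrative only: required_gpus and check_batch_sizes are hypothetical helpers, and they assume that each consumer process occupies one GPU, that distributors need no GPU of their own, and that each batch size should be a multiple of its micro/minibatch size. None of these assumptions is confirmed by the module.

```python
def required_gpus(
    num_producers: int, num_proc_per_producer: int, num_consumer_procs: int
) -> int:
    """GPUs needed if every producer process and consumer process takes one GPU."""
    return num_producers * num_proc_per_producer + num_consumer_procs


def check_batch_sizes(
    inference_batch_size: int,
    inference_microbatch_size: int,
    train_batch_size: int,
    train_minibatch_size: int,
) -> None:
    """Raise ValueError if a batch size is not a multiple of its sub-batch size."""
    if inference_batch_size % inference_microbatch_size != 0:
        raise ValueError(
            "inference_batch_size must be a multiple of inference_microbatch_size"
        )
    if train_batch_size % train_minibatch_size != 0:
        raise ValueError("train_batch_size must be a multiple of train_minibatch_size")
```

For the call above, required_gpus(2, 4, 8) gives 2 * 4 + 8 = 16 GPUs, and check_batch_sizes(16, 4, 8, 2) passes without raising.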
