Implementation:Deepspeedai DeepSpeed PipelineEngine Init
Overview
Concrete tool for initializing the pipeline training engine provided by the DeepSpeed library. PipelineEngine is created automatically by deepspeed.initialize() when the model is a PipelineModule.
Description
PipelineEngine extends DeepSpeedEngine with pipeline-specific functionality: P2P communication groups, micro-batch buffers, activation shape tracking, and the 1F1B training schedule. It is not instantiated directly — instead, deepspeed.initialize() detects that the model is a PipelineModule and creates a PipelineEngine automatically.
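The 1F1B schedule mentioned above interleaves forward and backward passes over micro-batches per stage. As a rough illustration (a simplified sketch of the scheduling idea, not DeepSpeed's actual schedule generator; all names here are hypothetical):

```python
def one_f_one_b(stage_id: int, num_stages: int, num_micro_batches: int):
    """Sketch of a 1F1B step order for one pipeline stage.

    Earlier stages run a few warmup forwards, then alternate one forward
    with one backward, then drain the remaining backwards.
    """
    steps = []
    fwd = bwd = 0
    # Warmup: earlier stages queue forwards before the first backward arrives.
    warmup = min(num_micro_batches, num_stages - stage_id - 1)
    for _ in range(warmup):
        steps.append(("F", fwd))
        fwd += 1
    # Steady state: one forward, one backward.
    while fwd < num_micro_batches:
        steps.append(("F", fwd))
        fwd += 1
        steps.append(("B", bwd))
        bwd += 1
    # Cooldown: drain remaining backwards.
    while bwd < num_micro_batches:
        steps.append(("B", bwd))
        bwd += 1
    return steps

# Last stage (stage 3 of 4) alternates strictly: F0, B0, F1, B1, ...
print(one_f_one_b(stage_id=3, num_stages=4, num_micro_batches=4))
```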
Key initialization actions:
- Validates the model is a `PipelineModule` and ZeRO stage < 2.
- Extracts grid, stage information, and data parallel topology from the `PipelineModule`.
- Initializes P2P process groups for inter-stage communication via `p2p.init_process_groups()`.
- Allocates pipeline buffer structures (inputs, labels, outputs, output_tensors).
- Configures activation checkpointing interval and reentrant/non-reentrant mode from the pipeline config.
- Sets up loss tracking tensors (`loss`, `total_loss`, `agg_loss`, `dp_group_loss`).
- Performs a communication handshake between adjacent stages to verify P2P channels.
- Disables `forward()`, `backward()`, and `step()` (all raise `PipelineError`).
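The first bullet's compatibility check can be sketched as follows (a hypothetical illustration of the described behavior, not DeepSpeed's actual internals; the function name is invented):

```python
class PipelineError(Exception):
    """Raised when the pipeline engine is used incorrectly."""


def check_pipeline_compat(model_is_pipeline_module: bool, zero_stage: int) -> None:
    # PipelineEngine requires a PipelineModule and ZeRO stage 0 or 1;
    # ZeRO-2/3 partition gradients/parameters in a way that conflicts
    # with the pipeline schedule.
    if not model_is_pipeline_module:
        raise PipelineError("model must be a PipelineModule")
    if zero_stage >= 2:
        raise PipelineError("ZeRO stage must be < 2 with pipeline parallelism")
```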
Code Reference
- Repository: https://github.com/deepspeedai/DeepSpeed
- File: `deepspeed/runtime/pipe/engine.py`
- Lines: L60-258
Signature:
class PipelineEngine(DeepSpeedEngine):
def __init__(self, has_bool_tensors=False, *super_args, **super_kwargs)
Import:
# PipelineEngine is created automatically by deepspeed.initialize()
# when model is a PipelineModule. No direct import needed.
import deepspeed
from deepspeed.pipe import PipelineModule
I/O Contract
Inputs
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| model | PipelineModule | Yes | — | Passed via deepspeed.initialize(model=...); must be a PipelineModule instance |
| config | dict | Yes | — | DeepSpeed configuration; ZeRO stage must be 0 or 1 |
| has_bool_tensors | bool | No | False | Whether the pipeline passes boolean tensors (e.g., attention masks); these are cast to half for NCCL compatibility |
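The `has_bool_tensors` handling can be pictured as a round-trip cast: boolean tensors are converted to half precision before the P2P send (NCCL has no bool dtype) and restored on the receiving stage. A minimal conceptual sketch using plain Python lists in place of tensors (helper names are invented for illustration):

```python
def cast_bool_for_send(values):
    """Mimic casting a bool tensor to half precision before a P2P send."""
    return [1.0 if v else 0.0 for v in values]


def restore_bool_after_recv(values):
    """Mimic restoring the boolean tensor on the receiving stage."""
    return [v != 0.0 for v in values]


# An attention mask survives the round trip unchanged.
mask = [True, False, True]
assert restore_bool_after_recv(cast_bool_for_send(mask)) == mask
```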
Outputs
| Output | Type | Description |
|---|---|---|
| engine | PipelineEngine | Engine with P2P communication groups, micro-batch buffers, and pipeline schedule configured |
| optimizer | Optimizer | The wrapped optimizer |
| dataloader | DataLoader | The training data loader (if provided) |
| lr_scheduler | LRScheduler | The learning rate scheduler (if provided) |
Usage Example
import deepspeed
from deepspeed.pipe import PipelineModule, LayerSpec
import torch.nn as nn
# Define pipeline model
layers = [LayerSpec(nn.Linear, 1024, 1024) for _ in range(24)]
model = PipelineModule(
layers=layers,
num_stages=4,
loss_fn=nn.CrossEntropyLoss()
)
# Initialize - PipelineEngine is created automatically
engine, optimizer, _, lr_scheduler = deepspeed.initialize(
model=model,
model_parameters=model.parameters(),
config={
"train_batch_size": 32,
"train_micro_batch_size_per_gpu": 4,
"steps_per_print": 100,
"optimizer": {
"type": "Adam",
"params": {"lr": 1e-4}
},
"zero_optimization": {
"stage": 1
}
}
)
# engine is now a PipelineEngine instance
print(f"Stage: {engine.stage_id}, Num stages: {engine.num_stages}")
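The batch sizes in the config above are related by a fixed identity: `train_batch_size = train_micro_batch_size_per_gpu × gradient_accumulation_steps × data_parallel_size`. A worked example, assuming 4 GPUs arranged as 4 pipeline stages (so data parallel size is 1):

```python
# Batch-size arithmetic implied by the config above.
train_batch_size = 32
micro_batch_per_gpu = 4
data_parallel_size = 1  # 4 GPUs / 4 pipeline stages

# Number of micro-batches flowing through the pipeline per optimizer step.
grad_accum_steps = train_batch_size // (micro_batch_per_gpu * data_parallel_size)
print(grad_accum_steps)  # 8
```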
Related Pages
- Principle:Deepspeedai_DeepSpeed_Pipeline_Engine_Init
- Implementation:Deepspeedai_DeepSpeed_PipelineModule_Init
- Implementation:Deepspeedai_DeepSpeed_PipelineEngine_Train_Batch
- Heuristic:Deepspeedai_DeepSpeed_ZeRO_Pipeline_Incompatibility
Knowledge Sources
Last updated: 2026-02-09 00:00 GMT