
Implementation:Deepspeedai DeepSpeed PipelineEngine Init

From Leeroopedia


Overview

PipelineEngine is the pipeline training engine provided by the DeepSpeed library. It is created automatically by deepspeed.initialize() when the model passed in is a PipelineModule.

Description

PipelineEngine extends DeepSpeedEngine with pipeline-specific functionality: P2P communication groups, micro-batch buffers, activation shape tracking, and the 1F1B training schedule. It is not instantiated directly; deepspeed.initialize() detects that the model is a PipelineModule and constructs a PipelineEngine automatically.
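The dispatch described above can be sketched as follows. This is a minimal illustration with hypothetical stand-in classes, not DeepSpeed's internal code; only the type-based dispatch mirrors the documented behavior.

```python
# Stand-in classes; the real ones live in deepspeed / deepspeed.pipe.
class Module: ...
class PipelineModule(Module): ...

class DeepSpeedEngine:
    def __init__(self, model):
        self.module = model

class PipelineEngine(DeepSpeedEngine): ...

def initialize(model):
    # deepspeed.initialize() inspects the model type: a PipelineModule
    # gets wrapped in a PipelineEngine, anything else in a plain engine.
    if isinstance(model, PipelineModule):
        return PipelineEngine(model)
    return DeepSpeedEngine(model)
```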

Key initialization actions:

  • Validates that the model is a PipelineModule and that the ZeRO stage is 0 or 1 (stage < 2).
  • Extracts grid, stage information, and data parallel topology from the PipelineModule.
  • Initializes P2P process groups for inter-stage communication via p2p.init_process_groups().
  • Allocates pipeline buffer structures (inputs, labels, outputs, output_tensors).
  • Configures activation checkpointing interval and reentrant/non-reentrant mode from the pipeline config.
  • Sets up loss tracking tensors (loss, total_loss, agg_loss, dp_group_loss).
  • Performs a communication handshake between adjacent stages to verify P2P channels.
  • Disables forward(), backward(), and step() (all raise PipelineError).
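The first validation step above can be sketched as below. The function name check_pipeline_config is illustrative, not DeepSpeed's internal API; only the documented constraints (model must be a PipelineModule, ZeRO stage must be 0 or 1) are taken from the source.

```python
class PipelineError(Exception):
    """Raised when pipeline constraints are violated (mirrors deepspeed's PipelineError)."""

def check_pipeline_config(model_is_pipeline_module: bool, zero_stage: int) -> None:
    # Pipeline parallelism requires a PipelineModule model...
    if not model_is_pipeline_module:
        raise PipelineError("model must be a PipelineModule")
    # ...and is incompatible with ZeRO stages 2 and 3.
    if zero_stage >= 2:
        raise PipelineError(
            f"ZeRO stage {zero_stage} is incompatible with pipeline parallelism"
        )
```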

Code Reference

Signature:

class PipelineEngine(DeepSpeedEngine):
    def __init__(self, has_bool_tensors=False, *super_args, **super_kwargs)

Import:

# PipelineEngine is created automatically by deepspeed.initialize()
# when model is a PipelineModule. No direct import needed.
import deepspeed
from deepspeed.pipe import PipelineModule

I/O Contract

Inputs

Parameter        | Type           | Required | Default | Description
model            | PipelineModule | Yes      | —       | Passed via deepspeed.initialize(model=...); must be a PipelineModule instance
config           | dict           | Yes      | —       | DeepSpeed configuration; ZeRO stage must be 0 or 1
has_bool_tensors | bool           | No       | False   | Whether the pipeline passes boolean tensors (e.g., attention masks); these are cast to half precision for NCCL compatibility

Outputs

Output       | Type           | Description
engine       | PipelineEngine | Engine with P2P communication groups, micro-batch buffers, and pipeline schedule configured
optimizer    | Optimizer      | The wrapped optimizer
dataloader   | DataLoader     | The training data loader (if provided)
lr_scheduler | LRScheduler    | The learning rate scheduler (if provided)

Usage Example

import deepspeed
from deepspeed.pipe import PipelineModule, LayerSpec
import torch.nn as nn

# Define pipeline model
layers = [LayerSpec(nn.Linear, 1024, 1024) for _ in range(24)]
model = PipelineModule(
    layers=layers,
    num_stages=4,
    loss_fn=nn.CrossEntropyLoss()
)

# Initialize - PipelineEngine is created automatically
engine, optimizer, _, lr_scheduler = deepspeed.initialize(
    model=model,
    model_parameters=model.parameters(),
    config={
        "train_batch_size": 32,
        "train_micro_batch_size_per_gpu": 4,
        "steps_per_print": 100,
        "optimizer": {
            "type": "Adam",
            "params": {"lr": 1e-4}
        },
        "zero_optimization": {
            "stage": 1
        }
    }
)

# engine is now a PipelineEngine instance
print(f"Stage: {engine.stage_id}, Num stages: {engine.num_stages}")
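Since forward(), backward(), and step() all raise PipelineError on a PipelineEngine, training proceeds through the batch-level entry point instead. The sketch below illustrates that contract with a stand-in class; it is not DeepSpeed's implementation, and the dummy loss value is purely illustrative.

```python
class PipelineError(Exception): ...

class PipelineEngineSketch:
    """Stand-in illustrating the PipelineEngine calling contract."""

    def forward(self, *args, **kwargs):
        # The real engine disables these entry points because a single
        # stage cannot run a full forward/backward pass on its own.
        raise PipelineError("use train_batch() / eval_batch() instead")

    backward = step = forward

    def train_batch(self, data_iter):
        # The real engine runs the 1F1B schedule over micro-batches here;
        # this stand-in just consumes one item and returns a dummy loss.
        next(data_iter)
        return 0.0
```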

Related Pages

Knowledge Sources

Last updated: 2026-02-09 00:00 GMT
