Implementation:Microsoft DeepSpeedExamples Pipeline Parallelism Training
| Knowledge Sources | |
|---|---|
| Domains | Pipeline Parallelism, Distributed Training |
| Last Updated | 2026-02-07 12:00 GMT |
Overview
Implements pipeline parallelism training for CIFAR-10 classification using DeepSpeed's PipelineModule with AlexNet, supporting both standard and pipeline-parallel training modes.
Description
This script provides two training modes for CIFAR-10 image classification with an AlexNet model. The train_base function implements standard data-parallel training using a DeepSpeed engine with manual forward-backward-step loops, a RepeatingLoader for infinite data iteration, and cross-entropy loss. It runs for a configurable number of gradient accumulation-aware steps.
The train_pipe function implements pipeline-parallel training by decomposing the AlexNet model into sequential layers via join_layers, which flattens the model's features, average pooling, a flatten lambda, and classifier layers into a single list. These layers are wrapped in a PipelineModule with configurable stage count and a loss function, and training proceeds through engine.train_batch() calls that handle micro-batch scheduling automatically.
The cifar_trainset function handles distributed dataset loading with a barrier-based semaphore to ensure only rank 0 downloads the dataset. Images are preprocessed with resize to 256, center crop to 224, and ImageNet normalization. The script supports configurable pipeline parallelism size, random seed, distributed backend, and step count via command-line arguments.
Usage
Use this script as a reference implementation for setting up pipeline parallelism with DeepSpeed. Setting --pipeline-parallel-size 0 runs standard data-parallel training, while any positive value activates pipeline-parallel mode with the specified number of stages.
Code Reference
Source Location
- Repository: Microsoft_DeepSpeedExamples
- File: training/pipeline_parallelism/train.py
- Lines: 1-159
Signature
def cifar_trainset(local_rank, dl_path='/tmp/cifar10-data'):
...
def get_args():
...
def train_base(args):
...
def join_layers(vision_model):
...
def train_pipe(args, part='parameters'):
...
Import
from train import cifar_trainset, train_base, train_pipe, join_layers
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| --local_rank | int | No | Local rank from distributed launcher (default: -1) |
| --steps | int | No | Number of training steps before quitting (default: 100) |
| --pipeline-parallel-size | int | No | Number of pipeline stages; 0 for standard training (default: 2) |
| --backend | str | No | Distributed backend, e.g., 'nccl' (default: 'nccl') |
| --seed | int | No | Random seed for reproducibility (default: 1138) |
| --deepspeed_config | str | No | Path to DeepSpeed configuration JSON file |
Outputs
| Name | Type | Description |
|---|---|---|
| stdout | text | Training progress with step number and loss values (every 10 steps in base mode) |
| model | DeepSpeedEngine | Trained DeepSpeed engine with AlexNet weights |
Usage Examples
# Launch with DeepSpeed for pipeline parallelism (2 stages)
# deepspeed train.py --deepspeed_config ds_config.json --pipeline-parallel-size 2 --steps 100
# Launch standard data-parallel training
# deepspeed train.py --deepspeed_config ds_config.json --pipeline-parallel-size 0 --steps 100
# Programmatic layer decomposition
from torchvision.models import AlexNet
from train import join_layers
net = AlexNet(num_classes=10)
layers = join_layers(net)
# layers is a flat list of nn.Module and lambda components for PipelineModule