Implementation:Deepspeedai DeepSpeed ZeRO2 DeepCompile
| Knowledge Sources | |
|---|---|
| Domains | Graph_Compilation, ZeRO_Optimization, Gradient_Partitioning |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
ZeRO Stage 2 DeepCompile implements torch.compile-compatible gradient reduction for ZeRO-2 (optimizer state and gradient partitioning). It reduces gradients with NCCL allreduce and tracks, per parameter, whether a gradient has already been accumulated.
Description
The Z2CustomOpExecutor extends CustomOpExecutor to provide ZeRO Stage 2 functionality within DeepSpeed's graph compilation framework. ZeRO-2 partitions optimizer states and gradients, requiring allreduce with accumulation-aware gradient handling. Key features include:
- AllReduce Communication: Uses NCCL ncclAllReduce to compute gradient sums across all ranks (same as ZeRO-1)
- Gradient Accumulation Tracking: Maintains has_acc_grad_ map to distinguish first gradient from accumulated gradients per parameter
- Conditional Copy vs. Add: First reduced gradient is copied to buffer; subsequent gradients are accumulated via add_()
- Bucketed Reduction: Inherits bucket-based gradient coalescing from CustomOpExecutor for efficient communication
- Offset-Based Gradient Mapping: Each parameter has an offset into the shared gradient buffer for efficient indexing
- Pre-Division Support: Optional gradient pre-division before allreduce (when using ncclSum) vs. using ncclAvg
- Stream Coordination: Uses the dedicated rs_stream (named for reduce-scatter, but carrying the allreduce in this executor) and a separate copy stream for asynchronous operations
The critical difference from ZeRO-1 is accumulation handling: on the first reduction, gradients are copied to the buffer; on subsequent reductions (when has_acc_grad_ is true), gradients are added to existing values. The accumulation flags are reset at the end of backward when param_updated_ is true.
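The copy-versus-add rule described above can be modeled in a few lines of plain Python. This is only a minimal sketch: `AccumulationTracker`, `store_reduced`, and the use of lists in place of tensors are hypothetical names and simplifications chosen for illustration, not part of DeepSpeed.

```python
class AccumulationTracker:
    """Toy model of the has_acc_grad_ rule; plain lists stand in for tensors."""

    def __init__(self, ds_ids, sizes):
        # One gradient buffer and one accumulation flag per parameter ID.
        self.grad_buf = {i: [0.0] * n for i, n in zip(ds_ids, sizes)}
        self.has_acc_grad = {i: False for i in ds_ids}

    def store_reduced(self, ds_id, reduced):
        buf = self.grad_buf[ds_id]
        if self.has_acc_grad[ds_id]:
            # A gradient is already stored for this cycle: accumulate (add_).
            for k, v in enumerate(reduced):
                buf[k] += v
        else:
            # First gradient of the cycle: overwrite whatever is there (copy_).
            buf[:] = reduced
        self.has_acc_grad[ds_id] = True

    def end_backward(self, param_updated):
        # Flags are cleared only when the optimizer is about to step.
        if param_updated:
            for ds_id in self.has_acc_grad:
                self.has_acc_grad[ds_id] = False


tracker = AccumulationTracker(ds_ids=[0], sizes=[2])
tracker.grad_buf[0][:] = [9.0, 9.0]        # stale values from an earlier cycle

tracker.store_reduced(0, [1.0, 2.0])       # first reduction: copy
after_first = list(tracker.grad_buf[0])
tracker.end_backward(param_updated=False)  # accumulation step: flags persist

tracker.store_reduced(0, [3.0, 4.0])       # second reduction: add
after_second = list(tracker.grad_buf[0])
tracker.end_backward(param_updated=True)   # optimizer steps: flags reset

tracker.store_reduced(0, [5.0, 5.0])       # next cycle starts with a copy
after_reset = list(tracker.grad_buf[0])
```

Because the first write of each cycle is a copy, the buffer does not need to be zeroed between cycles in this model; stale contents are simply overwritten.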
Usage
Z2CustomOpExecutor is instantiated per compiled graph during registration and receives gradient reduction requests through torch.ops.dc.reduce_grad, automatically managing accumulation state and batching communication.
Code Reference
Source Location
- Repository: DeepSpeed
- File: csrc/compile/z2.cpp
Signature
class Z2CustomOpExecutor : public CustomOpExecutor {
public:
    Z2CustomOpExecutor(c10::intrusive_ptr<c10d::ProcessGroup> process_group,
                       std::shared_ptr<DSParamRegistry> param_registry,
                       std::shared_ptr<DoubleBufferedReduceBucket> reduce_buckets,
                       std::vector<long> ds_ids,
                       ncclComm_t nccl_comm,
                       at::cuda::CUDAStream rs_stream,
                       at::cuda::CUDAStream copy_stream,
                       bool pre_div_reduce);
    void endBackward() override;
    void flushReduceBucket(at::ScalarType scalar_type) override;
};
// Public API functions
void register_graph_z2(long graph_id, const std::vector<long>& ds_ids);
Import
import torch
from deepspeed.ops import dc
# Register ZeRO-2 parameters (same as ZeRO-1)
dc.register_param(
    ds_id=param_id,
    ds_shape=param.shape,
    ds_tensor=param,            # Full parameter (not partitioned)
    grad_buffer=grad_buffer,    # Shared gradient buffer segment
    offset=offset_in_buffer     # Offset for this parameter's gradients
)
# Register graph with parameter IDs
dc.register_graph_z2(graph_id=0, ds_ids=[0, 1, 2])
# Gradient reduction happens via custom op (inserted by compiler)
torch.ops.dc.reduce_grad(grad_tensor, graph_id=0, ds_id=0)
I/O Contract
Z2CustomOpExecutor Constructor
| Parameter | Type | Description |
|---|---|---|
| process_group | c10::intrusive_ptr<c10d::ProcessGroup> | Distributed process group |
| param_registry | std::shared_ptr<DSParamRegistry> | Registry of registered parameters |
| reduce_buckets | std::shared_ptr<DoubleBufferedReduceBucket> | Bucket manager for gradient coalescing |
| ds_ids | std::vector<long> | Parameter IDs managed by this executor |
| nccl_comm | ncclComm_t | NCCL communicator for direct NCCL calls |
| rs_stream | at::cuda::CUDAStream | Stream for allreduce operations |
| copy_stream | at::cuda::CUDAStream | Stream for gradient copies |
| pre_div_reduce | bool | Whether to pre-divide gradients (use ncclSum vs ncclAvg) |
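To illustrate what the pre_div_reduce flag selects between, the sketch below simulates both paths with plain Python lists standing in for per-rank send buffers. The helper names (`allreduce_sum`, `allreduce_avg`, `reduce_with_pre_div`) are hypothetical and only mimic the arithmetic of a sum or average reduction; they are not NCCL or DeepSpeed calls.

```python
def allreduce_sum(per_rank):
    """Element-wise sum across ranks (arithmetic of a sum reduction)."""
    return [sum(vals) for vals in zip(*per_rank)]


def allreduce_avg(per_rank):
    """Element-wise mean across ranks (arithmetic of an average reduction)."""
    world_size = len(per_rank)
    return [sum(vals) / world_size for vals in zip(*per_rank)]


def reduce_with_pre_div(per_rank):
    """Divide each rank's gradient by the world size, then sum across ranks."""
    world_size = len(per_rank)
    scaled = [[v / world_size for v in buf] for buf in per_rank]
    return allreduce_sum(scaled)


# Four simulated ranks, two gradient elements each.
grads = [[4.0, 8.0], [12.0, 16.0], [20.0, 24.0], [28.0, 32.0]]

pre_div_result = reduce_with_pre_div(grads)
avg_result = allreduce_avg(grads)
```

Both paths produce the mean gradient. The values here are chosen so the arithmetic is exact; with arbitrary floating-point inputs the two orders of operation can differ by rounding, and dividing first keeps intermediate sums smaller, which may matter in low-precision types.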
endBackward
| Parameter | Type | Description |
|---|---|---|
| Effect | - | Resets accumulation flags (has_acc_grad_) when param_updated_ is true |
Behavior:
- Calls CustomOpExecutor::endBackward() to flush all buckets
- If param_updated_ is true, sets all has_acc_grad_ flags to false
- Prepares executor for next gradient accumulation cycle
flushReduceBucket
| Parameter | Type | Description |
|---|---|---|
| scalar_type | at::ScalarType | Data type of gradients to reduce |
| Effect | - | Performs NCCL allreduce and copies/accumulates results to gradient buffers |
Steps:
1. Blocks on copy stream events
2. Applies pre-division if enabled
3. Performs ncclAllReduce (in-place) on send buffers
4. For each parameter:
- If has_acc_grad_ is false: copies reduced gradient to buffer
- If has_acc_grad_ is true: adds reduced gradient to existing buffer
- Sets has_acc_grad_ to true
5. Swaps buffers and cleans up
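The steps above can be sketched as a single-process simulation. This is a hedged illustration, not the actual implementation: the function name `flush_reduce_bucket`, the `(ds_id, offset, numel)` task tuples, and the list-based buffers are assumptions made for the example, and the stream synchronization (step 1) and buffer swap (step 5) are omitted because they have no pure-Python analogue.

```python
def flush_reduce_bucket(send_bufs, tasks, grad_bufs, has_acc_grad, pre_div):
    """Simulate steps 2-4 of a bucket flush.

    send_bufs:    one flat list per simulated rank (the bucket contents)
    tasks:        list of (ds_id, offset, numel) describing each gradient
    grad_bufs:    dict ds_id -> list, the persistent gradient buffers
    has_acc_grad: dict ds_id -> bool, the accumulation flags
    pre_div:      True to pre-divide then sum, False to average directly
    """
    world_size = len(send_bufs)

    # Step 2: optional pre-division by the world size.
    if pre_div:
        send_bufs = [[v / world_size for v in buf] for buf in send_bufs]

    # Step 3: allreduce, modeled as an element-wise sum across ranks.
    reduced = [sum(vals) for vals in zip(*send_bufs)]
    if not pre_div:
        # Without pre-division the average is taken by the reduction itself.
        reduced = [v / world_size for v in reduced]

    # Step 4: copy or accumulate each parameter's slice.
    for ds_id, offset, numel in tasks:
        recv = reduced[offset:offset + numel]
        if has_acc_grad[ds_id]:
            grad_bufs[ds_id] = [a + b for a, b in zip(grad_bufs[ds_id], recv)]
        else:
            grad_bufs[ds_id] = list(recv)
        has_acc_grad[ds_id] = True
    return reduced


# Two simulated ranks; parameter 0 has two elements, parameter 1 has one.
send_bufs = [[2.0, 4.0, 6.0], [4.0, 8.0, 10.0]]
tasks = [(0, 0, 2), (1, 2, 1)]
grad_bufs = {0: [0.0, 0.0], 1: [1.0]}
has_acc_grad = {0: False, 1: True}   # parameter 1 already holds a gradient

flush_reduce_bucket(send_bufs, tasks, grad_bufs, has_acc_grad, pre_div=False)
```

In this run parameter 0 receives a copy of the averaged gradient, while parameter 1 adds the averaged value to the gradient it already holds.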
register_graph_z2
| Parameter | Type | Description |
|---|---|---|
| graph_id | long | Unique graph identifier |
| ds_ids | std::vector<long> | Parameter IDs in this graph |
| Effect | - | Creates Z2CustomOpExecutor and stores in global executor map |
Usage Examples
import torch
import torch.distributed as dist
from deepspeed.ops import dc
# Initialize distributed
dist.init_process_group("nccl")
pg = dist.new_group()
# Initialize DeepCompile runtime
config = {
    "symmetric_memory": False,
    "double_buffer": True,
    "free_activation_threshold": 1_000_000,
    "sync_before_reduce": False,
    "sync_after_reduce": False,
    "sync_before_allgather": False,
    "sync_after_allgather": False,
}
dc.init(pg, config, initial_reduce_bucket_size=100_000_000)
# Create shared gradient buffer for all parameters
total_grad_size = sum(p.numel() for p in model.parameters())
shared_grad_buffer = torch.zeros(total_grad_size, device='cuda', dtype=torch.float32)
# Register ZeRO-2 parameters with offsets (same as ZeRO-1)
offset = 0
param_ids = []
for i, param in enumerate(model.parameters()):
    grad_buffer_slice = shared_grad_buffer[offset:offset + param.numel()].view(param.shape)
    dc.register_param(
        ds_id=i,
        ds_shape=list(param.shape),
        ds_tensor=param,
        grad_buffer=grad_buffer_slice,
        offset=offset
    )
    param_ids.append(i)
    offset += param.numel()
# Register compiled graph
dc.register_graph_z2(graph_id=0, ds_ids=param_ids)
# Training with gradient accumulation
accumulation_steps = 4
optimizer.zero_grad()
for step in range(accumulation_steps):
    dc.start_forward()
    output = model(batch[step])
    dc.end_forward()
    loss = criterion(output, labels[step])
    # All steps perform reduction, but accumulation is tracked
    is_last_step = (step == accumulation_steps - 1)
    dc.start_backward(update=is_last_step)
    loss.backward()
    # First reduction: gradient copied to buffer
    # Subsequent reductions: gradient added to buffer
    # has_acc_grad_ flags track this per parameter
    if is_last_step:
        dc.end_backward(graph_id=0)  # Flushes buckets, resets accumulation flags
# Optimizer step uses shared gradient buffer
optimizer.step()
optimizer.zero_grad()
# Next iteration starts fresh (has_acc_grad_ all false)
dc.start_forward()
# ...
# Cleanup
dc.cleanup()
Implementation Details
Gradient Accumulation Tracking
The has_acc_grad_ map tracks whether each parameter has accumulated gradients:
```cpp
void flushReduceBucket(at::ScalarType scalar_type) override {
    // ... allreduce communication ...
    at::cuda::CUDAStreamGuard guard(rs_stream_);
    for (const ReduceTask& t : reduce_tasks_.at(scalar_type)) {
        bool acc_grad = has_acc_grad_.at(t.getDSId());
        auto grad_buf = param_registry_->getParam(t.getDSId()).getGradBuffer();
        // ... extract recv_buf at correct offset ...
        if (acc_grad) {
            grad_buf.add_(recv_buf);   // Accumulate
        } else {
            grad_buf.copy_(recv_buf);  // First gradient
        }
        has_acc_grad_[t.getDSId()] = true;
    }
}
```
Accumulation Flag Reset
Flags are reset at the end of backward when the optimizer will step:
```cpp
void endBackward() override {
    CustomOpExecutor::endBackward();  // Flush all buckets
    if (param_updated_) {
        for (auto& it : has_acc_grad_) {
            it.second = false;  // Reset for next cycle
        }
    }
}
```
When param_updated_ is false (gradient accumulation mode), flags persist across backward passes.
AllReduce Implementation
Identical to ZeRO-1, performs in-place allreduce:
```cpp
ncclGroupStart();
for (const ReduceTask& t : reduce_tasks_.at(scalar_type)) {
    ncclAllReduce(
        t.getSendBuf().data_ptr(),
        t.getSendBuf().data_ptr(),  // In-place
        t.getSendBuf().numel(),
        get_nccl_data_type(scalar_type),
        getReductionOp(),
        nccl_comm_,
        rs_stream_
    );
}
ncclGroupEnd();
```
Offset-Based Gradient Indexing
Same as ZeRO-1, uses offsets to extract gradient slices:
```cpp
int64_t offset = param.getOffset();
auto recv_buf = t.getSendBuf().flatten().index(
    {torch::indexing::Slice(offset, offset + grad_buf.numel())}
);
```
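For readers who want to see how such offsets might be laid out, the following sketch computes a contiguous offset per parameter from its shape and slices a flat buffer accordingly. The helpers `layout_offsets` and `grad_slice` are hypothetical and assume parameters are packed back to back with no padding, mirroring the registration loop in the usage example rather than any confirmed internal layout.

```python
from math import prod


def layout_offsets(shapes):
    """Return (offsets, total) for parameters packed contiguously."""
    offsets = []
    offset = 0
    for shape in shapes:
        offsets.append(offset)
        offset += prod(shape)  # number of elements in this parameter
    return offsets, offset


def grad_slice(flat, offset, numel):
    """Extract one parameter's gradient from the flat buffer."""
    return flat[offset:offset + numel]


shapes = [(2, 3), (4,), (1, 5)]
offsets, total = layout_offsets(shapes)

flat = [float(i) for i in range(total)]
second_param_grad = grad_slice(flat, offsets[1], prod(shapes[1]))
```

Each parameter's slice starts where the previous one ends, so a single integer per parameter is enough to locate its gradient.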
Differences from ZeRO-1 and ZeRO-3
| Feature | ZeRO-1 | ZeRO-2 | ZeRO-3 |
|---|---|---|---|
| reduceGrad Override | Yes (accumulates gradients) | No (uses base class) | No (uses base class) |
| Accumulation Handling | In reduceGrad method | In flushReduceBucket | In flushReduceBucket with temp buffer |
| has_acc_grad_ Usage | Not used | Tracks first vs accumulated | Tracks first vs accumulated |
| Accumulation Reset | Immediate (in reduceGrad) | Deferred (in endBackward) | Deferred (in endBackward) |
| Communication Pattern | AllReduce | AllReduce | ReduceScatter |
| Parameter Partitioning | No | No | Yes |
Key Difference: Copy vs. Add
The critical implementation difference between Z1 and Z2 is gradient accumulation handling:
ZeRO-1: Accumulates gradients in grad_tensors_ map before any reduction. Only reduces when param_updated_ is true.
ZeRO-2: Reduces every gradient immediately, but uses has_acc_grad_ to choose between copy (first) and add (subsequent).
This allows ZeRO-2 to support more flexible gradient accumulation patterns where individual parameters may have different accumulation states.
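The two strategies described above can be compared in a small simulation. Because summation is linear, accumulating locally and reducing once gives the same result as reducing every micro-step and accumulating the reduced values; what differs is how many collectives are issued. The function names and the list-based stand-ins below are assumptions for illustration only, not DeepSpeed code.

```python
def allreduce_sum(per_rank):
    """Element-wise sum across simulated ranks."""
    return [sum(vals) for vals in zip(*per_rank)]


def accumulate_then_reduce(micro_grads):
    """ZeRO-1 style as described: accumulate locally, reduce once at the end."""
    n_ranks = len(micro_grads[0])
    local = []
    for rank in range(n_ranks):
        acc = [0.0] * len(micro_grads[0][rank])
        for step_grads in micro_grads:
            acc = [a + g for a, g in zip(acc, step_grads[rank])]
        local.append(acc)
    return allreduce_sum(local), 1  # one collective


def reduce_then_accumulate(micro_grads):
    """ZeRO-2 style as described: reduce each step, then copy or add."""
    buf = None
    collectives = 0
    for step_grads in micro_grads:
        reduced = allreduce_sum(step_grads)
        collectives += 1
        if buf is None:
            buf = reduced                                   # first: copy
        else:
            buf = [a + b for a, b in zip(buf, reduced)]     # later: add
    return buf, collectives


# micro_grads[step][rank] is that rank's gradient for that micro-step.
micro_grads = [
    [[1.0, 2.0], [3.0, 4.0]],
    [[5.0, 6.0], [7.0, 8.0]],
]

z1_result, z1_collectives = accumulate_then_reduce(micro_grads)
z2_result, z2_collectives = reduce_then_accumulate(micro_grads)
```

In this toy run both strategies end with the same gradient, while the reduce-every-step variant issues one collective per micro-step.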