Implementation:Deepspeedai DeepSpeed ZeRO2 DeepCompile

From Leeroopedia


Knowledge Sources
Domains Graph_Compilation, ZeRO_Optimization, Gradient_Partitioning
Last Updated 2026-02-09 00:00 GMT

Overview

ZeRO Stage 2 DeepCompile implements torch.compile-compatible gradient reduction for ZeRO Stage 2 (optimizer state and gradient partitioning). It reduces gradients with NCCL allreduce and tracks gradient accumulation per parameter.

Description

The Z2CustomOpExecutor extends CustomOpExecutor to provide ZeRO Stage 2 functionality within DeepSpeed's graph compilation framework. ZeRO-2 partitions optimizer states and gradients, requiring allreduce with accumulation-aware gradient handling. Key features include:

  • AllReduce Communication: Uses NCCL ncclAllReduce to compute gradient sums across all ranks (same as ZeRO-1)
  • Gradient Accumulation Tracking: Maintains has_acc_grad_ map to distinguish first gradient from accumulated gradients per parameter
  • Conditional Copy vs. Add: First reduced gradient is copied to buffer; subsequent gradients are accumulated via add_()
  • Bucketed Reduction: Inherits bucket-based gradient coalescing from CustomOpExecutor for efficient communication
  • Offset-Based Gradient Mapping: Each parameter has an offset into the shared gradient buffer for efficient indexing
  • Pre-Division Support: Optionally pre-divides gradients and reduces them with ncclSum; otherwise the reduction uses ncclAvg
  • Stream Coordination: Runs communication on a dedicated stream (rs_stream, named for reduce-scatter but used here for allreduce) and gradient copies on a separate copy stream, so both proceed asynchronously

The critical difference from ZeRO-1 is accumulation handling: on the first reduction, gradients are copied to the buffer; on subsequent reductions (when has_acc_grad_ is true), gradients are added to existing values. The accumulation flags are reset at the end of backward when param_updated_ is true.
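The flag life cycle described above can be illustrated with a small pure-Python model. This is a minimal sketch, not the C++ implementation: the class name `GradAccumulationState` and its method names are hypothetical, plain lists stand in for CUDA tensors, and the reduced gradient is assumed to have already been produced by the allreduce.

```python
class GradAccumulationState:
    """Toy stand-in for the has_acc_grad_ bookkeeping (hypothetical names)."""

    def __init__(self, ds_ids, sizes):
        # One gradient buffer and one accumulation flag per parameter ID.
        self.grad_buffers = {i: [0.0] * n for i, n in zip(ds_ids, sizes)}
        self.has_acc_grad = {i: False for i in ds_ids}

    def store_reduced(self, ds_id, reduced):
        buf = self.grad_buffers[ds_id]
        if self.has_acc_grad[ds_id]:
            # Later reductions in the same cycle: accumulate (add_ in the source).
            for k, v in enumerate(reduced):
                buf[k] += v
        else:
            # First reduction in the cycle: overwrite whatever is in the buffer.
            buf[:] = reduced
        self.has_acc_grad[ds_id] = True

    def end_backward(self, param_updated):
        # Flags are cleared only when the optimizer is about to step.
        if param_updated:
            for ds_id in self.has_acc_grad:
                self.has_acc_grad[ds_id] = False


state = GradAccumulationState(ds_ids=[0], sizes=[2])
state.grad_buffers[0][:] = [99.0, 99.0]      # stale values from an earlier cycle
state.store_reduced(0, [1.0, 2.0])           # copy: stale values are overwritten
state.end_backward(param_updated=False)      # flag persists
state.store_reduced(0, [3.0, 4.0])           # add: buffer becomes [4.0, 6.0]
final = list(state.grad_buffers[0])
state.end_backward(param_updated=True)       # flag reset for the next cycle
```

Because the first reduction copies instead of adding, the buffer does not need to be zeroed between accumulation cycles in this model.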

Usage

Z2CustomOpExecutor is instantiated per compiled graph during registration and receives gradient reduction requests through torch.ops.dc.reduce_grad, automatically managing accumulation state and batching communication.

Code Reference

Source Location

Signature

class Z2CustomOpExecutor : public CustomOpExecutor {
public:
    Z2CustomOpExecutor(c10::intrusive_ptr<c10d::ProcessGroup> process_group,
                       std::shared_ptr<DSParamRegistry> param_registry,
                       std::shared_ptr<DoubleBufferedReduceBucket> reduce_buckets,
                       std::vector<long> ds_ids,
                       ncclComm_t nccl_comm,
                       at::cuda::CUDAStream rs_stream,
                       at::cuda::CUDAStream copy_stream,
                       bool pre_div_reduce);

    void endBackward() override;
    void flushReduceBucket(at::ScalarType scalar_type) override;
};

// Public API functions
void register_graph_z2(long graph_id, const std::vector<long>& ds_ids);

Import

import torch
from deepspeed.ops import dc

# Register ZeRO-2 parameters (same as ZeRO-1)
dc.register_param(
    ds_id=param_id,
    ds_shape=param.shape,
    ds_tensor=param,           # Full parameter (not partitioned)
    grad_buffer=grad_buffer,   # Shared gradient buffer segment
    offset=offset_in_buffer    # Offset for this parameter's gradients
)

# Register graph with parameter IDs
dc.register_graph_z2(graph_id=0, ds_ids=[0, 1, 2])

# Gradient reduction happens via custom op (inserted by compiler)
torch.ops.dc.reduce_grad(grad_tensor, graph_id=0, ds_id=0)

I/O Contract

Z2CustomOpExecutor Constructor

| Parameter | Type | Description |
|---|---|---|
| process_group | c10::intrusive_ptr<c10d::ProcessGroup> | Distributed process group |
| param_registry | std::shared_ptr<DSParamRegistry> | Registry of registered parameters |
| reduce_buckets | std::shared_ptr<DoubleBufferedReduceBucket> | Bucket manager for gradient coalescing |
| ds_ids | std::vector<long> | Parameter IDs managed by this executor |
| nccl_comm | ncclComm_t | NCCL communicator for direct NCCL calls |
| rs_stream | at::cuda::CUDAStream | Stream for allreduce operations |
| copy_stream | at::cuda::CUDAStream | Stream for gradient copies |
| pre_div_reduce | bool | If true, gradients are pre-divided and reduced with ncclSum; otherwise ncclAvg is used |
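To make the pre_div_reduce option concrete, the sketch below compares the two reduction paths on simulated ranks. It assumes the pre-division divides by the number of ranks (the page does not state the divisor), and the helper names are illustrative only; no NCCL call is made.

```python
def allreduce_sum(per_rank):
    """Element-wise sum across ranks (stand-in for ncclSum)."""
    return [sum(vals) for vals in zip(*per_rank)]


def allreduce_avg(per_rank):
    """Element-wise mean across ranks (stand-in for ncclAvg)."""
    n = len(per_rank)
    return [sum(vals) / n for vals in zip(*per_rank)]


def prediv_then_sum(per_rank):
    """Assumed pre-division path: scale by 1/world_size, then sum."""
    n = len(per_rank)
    scaled = [[v / n for v in grads] for grads in per_rank]
    return allreduce_sum(scaled)


# Four simulated ranks, two gradient elements each.
per_rank = [[4.0, 8.0], [12.0, 16.0], [20.0, 24.0], [28.0, 32.0]]
via_prediv = prediv_then_sum(per_rank)
via_avg = allreduce_avg(per_rank)
```

With these inputs both paths give the same mean. On real hardware the two paths can round differently, since the division happens before the sum in one case and after it in the other.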

endBackward

| Parameter | Type | Description |
|---|---|---|
| Effect | - | Resets accumulation flags (has_acc_grad_) when param_updated_ is true |

Behavior:

  • Calls CustomOpExecutor::endBackward() to flush all buckets
  • If param_updated_ is true, sets all has_acc_grad_ flags to false
  • Prepares executor for next gradient accumulation cycle

flushReduceBucket

| Parameter | Type | Description |
|---|---|---|
| scalar_type | at::ScalarType | Data type of gradients to reduce |
| Effect | - | Performs NCCL allreduce and copies/accumulates results to gradient buffers |

Steps:

  1. Blocks on copy stream events
  2. Applies pre-division if enabled
  3. Performs ncclAllReduce (in-place) on send buffers
  4. For each parameter:
       - If has_acc_grad_ is false: copies reduced gradient to buffer
       - If has_acc_grad_ is true: adds reduced gradient to existing buffer
       - Sets has_acc_grad_ to true
  5. Swaps buffers and cleans up
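The steps above can be walked through with simulated ranks. The following is a rough sketch under several assumptions: `flush_reduce_bucket` and `make_send_bufs` are hypothetical names, each parameter has its own send buffer per rank, the pre-division divides by the number of ranks, and the stream synchronization and buffer swap (steps 1 and 5) are left out because they have no counterpart in plain Python.

```python
def flush_reduce_bucket(send_bufs_per_rank, grad_buffers, has_acc_grad, pre_div):
    """send_bufs_per_rank: one {ds_id: list[float]} dict per simulated rank."""
    world_size = len(send_bufs_per_rank)

    # Step 2: optional pre-division of every send buffer.
    if pre_div:
        for bufs in send_bufs_per_rank:
            for ds_id in bufs:
                bufs[ds_id] = [v / world_size for v in bufs[ds_id]]

    # Step 3: simulated allreduce (sum if pre-divided, mean otherwise).
    reduced = {}
    for ds_id in send_bufs_per_rank[0]:
        columns = zip(*(bufs[ds_id] for bufs in send_bufs_per_rank))
        summed = [sum(col) for col in columns]
        reduced[ds_id] = summed if pre_div else [v / world_size for v in summed]

    # Step 4: copy on the first reduction, add on later ones.
    for ds_id, recv in reduced.items():
        if has_acc_grad[ds_id]:
            grad_buffers[ds_id] = [a + b for a, b in zip(grad_buffers[ds_id], recv)]
        else:
            grad_buffers[ds_id] = list(recv)
        has_acc_grad[ds_id] = True


def make_send_bufs():
    # Two ranks, two parameters (IDs 0 and 1).
    return [{0: [2.0, 4.0], 1: [6.0]}, {0: [4.0, 8.0], 1: [10.0]}]


grad_buffers = {0: [0.0, 0.0], 1: [0.0]}
has_acc_grad = {0: False, 1: False}

flush_reduce_bucket(make_send_bufs(), grad_buffers, has_acc_grad, pre_div=True)
after_first = {k: list(v) for k, v in grad_buffers.items()}

flush_reduce_bucket(make_send_bufs(), grad_buffers, has_acc_grad, pre_div=False)
after_second = {k: list(v) for k, v in grad_buffers.items()}
```

The first flush copies the averaged gradients into the buffers; the second flush adds to them because the flags are already set.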

register_graph_z2

| Parameter | Type | Description |
|---|---|---|
| graph_id | long | Unique graph identifier |
| ds_ids | std::vector<long> | Parameter IDs in this graph |
| Effect | - | Creates Z2CustomOpExecutor and stores in global executor map |

Usage Examples

import torch
import torch.distributed as dist
from deepspeed.ops import dc

# Assumes model, optimizer, criterion, batch, and labels are defined elsewhere

# Initialize distributed
dist.init_process_group("nccl")
pg = dist.new_group()

# Initialize DeepCompile runtime
config = {
    "symmetric_memory": False,
    "double_buffer": True,
    "free_activation_threshold": 1_000_000,
    "sync_before_reduce": False,
    "sync_after_reduce": False,
    "sync_before_allgather": False,
    "sync_after_allgather": False,
}
dc.init(pg, config, initial_reduce_bucket_size=100_000_000)

# Create shared gradient buffer for all parameters
total_grad_size = sum(p.numel() for p in model.parameters())
shared_grad_buffer = torch.zeros(total_grad_size, device='cuda', dtype=torch.float32)

# Register ZeRO-2 parameters with offsets (same as ZeRO-1)
offset = 0
param_ids = []
for i, param in enumerate(model.parameters()):
    grad_buffer_slice = shared_grad_buffer[offset:offset + param.numel()].view(param.shape)

    dc.register_param(
        ds_id=i,
        ds_shape=list(param.shape),
        ds_tensor=param,
        grad_buffer=grad_buffer_slice,
        offset=offset
    )
    param_ids.append(i)
    offset += param.numel()

# Register compiled graph
dc.register_graph_z2(graph_id=0, ds_ids=param_ids)

# Training with gradient accumulation
accumulation_steps = 4
optimizer.zero_grad()

for step in range(accumulation_steps):
    dc.start_forward()
    output = model(batch[step])
    dc.end_forward()

    loss = criterion(output, labels[step])

    # Every micro-step reduces gradients; accumulation is tracked per parameter
    is_last_step = (step == accumulation_steps - 1)
    dc.start_backward(update=is_last_step)
    loss.backward()

    # First reduction: gradient copied to buffer
    # Subsequent reductions: gradient added to buffer
    # has_acc_grad_ flags track this per parameter

    # Flush remaining buckets after every backward pass; the accumulation
    # flags are reset only when update=True (the last micro-step)
    dc.end_backward(graph_id=0)

# Optimizer step uses shared gradient buffer
optimizer.step()
optimizer.zero_grad()

# Next iteration starts fresh (has_acc_grad_ all false)
dc.start_forward()
# ...

# Cleanup
dc.cleanup()

Implementation Details

Gradient Accumulation Tracking

The has_acc_grad_ map tracks whether each parameter has accumulated gradients:

```cpp
void flushReduceBucket(at::ScalarType scalar_type) override {
    // ... allreduce communication ...
    at::cuda::CUDAStreamGuard guard(rs_stream_);
    for (const ReduceTask& t : reduce_tasks_.at(scalar_type)) {
        bool acc_grad = has_acc_grad_.at(t.getDSId());
        auto grad_buf = param_registry_->getParam(t.getDSId()).getGradBuffer();
        // ... extract recv_buf at correct offset ...
        if (acc_grad) {
            grad_buf.add_(recv_buf);  // Accumulate
        } else {
            grad_buf.copy_(recv_buf);  // First gradient
        }
        has_acc_grad_[t.getDSId()] = true;
    }
}
```

Accumulation Flag Reset

Flags are reset at the end of backward when the optimizer is about to step:

```cpp
void endBackward() override {
    CustomOpExecutor::endBackward();  // Flush all buckets
    if (param_updated_) {
        for (auto& it : has_acc_grad_) {
            it.second = false;  // Reset for next cycle
        }
    }
}
```

When param_updated_ is false (gradient accumulation mode), flags persist across backward passes.

AllReduce Implementation

As in ZeRO-1, the allreduce is performed in place on each task's send buffer:

```cpp
ncclGroupStart();
for (const ReduceTask& t : reduce_tasks_.at(scalar_type)) {
    ncclAllReduce(
        t.getSendBuf().data_ptr(),
        t.getSendBuf().data_ptr(),  // In-place
        t.getSendBuf().numel(),
        get_nccl_data_type(scalar_type),
        getReductionOp(),
        nccl_comm_,
        rs_stream_
    );
}
ncclGroupEnd();
```

Offset-Based Gradient Indexing

Same as ZeRO-1, uses offsets to extract gradient slices:

```cpp
int64_t offset = param.getOffset();
auto recv_buf = t.getSendBuf().flatten().index(
    {torch::indexing::Slice(offset, offset + grad_buf.numel())}
);
```
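The offset arithmetic can be shown without tensors. The sketch below is illustrative only: `build_offsets` and `slice_for` are hypothetical helpers, parameters are assumed to be packed back to back in registration order, and a Python list stands in for the flattened buffer.

```python
from math import prod


def build_offsets(shapes):
    """Return each parameter's start offset and the total element count."""
    offsets, cursor = [], 0
    for shape in shapes:
        offsets.append(cursor)
        cursor += prod(shape)  # numel of this parameter
    return offsets, cursor


def slice_for(flat, offset, numel):
    """Equivalent of Slice(offset, offset + numel) on a flattened buffer."""
    return flat[offset:offset + numel]


shapes = [(2, 3), (4,), (1, 2)]
offsets, total = build_offsets(shapes)
flat = [float(i) for i in range(total)]
second_param = slice_for(flat, offsets[1], prod(shapes[1]))
```

Since each slice starts where the previous one ends, the slices never overlap and together cover the whole buffer.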

Differences from ZeRO-1 and ZeRO-3

| Feature | ZeRO-1 | ZeRO-2 | ZeRO-3 |
|---|---|---|---|
| reduceGrad Override | Yes (accumulates gradients) | No (uses base class) | No (uses base class) |
| Accumulation Handling | In reduceGrad method | In flushReduceBucket | In flushReduceBucket with temp buffer |
| has_acc_grad_ Usage | Not used | Tracks first vs accumulated | Tracks first vs accumulated |
| Accumulation Reset | Immediate (in reduceGrad) | Deferred (in endBackward) | Deferred (in endBackward) |
| Communication Pattern | AllReduce | AllReduce | ReduceScatter |
| Parameter Partitioning | No | No | Yes |

Key Difference: Copy vs. Add

The critical implementation difference between ZeRO-1 and ZeRO-2 is where gradient accumulation happens:

ZeRO-1: Accumulates gradients in grad_tensors_ map before any reduction. Only reduces when param_updated_ is true.

ZeRO-2: Reduces every gradient immediately, but uses has_acc_grad_ to choose between copy (first) and add (subsequent).

This allows ZeRO-2 to support more flexible gradient accumulation patterns where individual parameters may have different accumulation states.
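The two strategies can be compared on simulated data. This is a simplified model based only on the descriptions above, not on the executors' source: `zero1_style` and `zero2_style` are hypothetical names, the allreduce is modeled as an element-wise mean across ranks, and a single parameter is used.

```python
def allreduce_avg(per_rank):
    """Element-wise mean across simulated ranks."""
    n = len(per_rank)
    return [sum(vals) / n for vals in zip(*per_rank)]


def zero1_style(micro_grads):
    """Accumulate locally on each rank, then reduce once at the end."""
    world_size = len(micro_grads[0])
    numel = len(micro_grads[0][0])
    local = [[0.0] * numel for _ in range(world_size)]
    for step in micro_grads:
        for rank, grads in enumerate(step):
            local[rank] = [a + b for a, b in zip(local[rank], grads)]
    return allreduce_avg(local), 1  # result, number of reductions


def zero2_style(micro_grads):
    """Reduce every micro-step; copy the first result, add the rest."""
    buf, has_acc_grad, calls = None, False, 0
    for step in micro_grads:
        reduced = allreduce_avg(step)
        calls += 1
        if has_acc_grad:
            buf = [a + b for a, b in zip(buf, reduced)]
        else:
            buf = list(reduced)
        has_acc_grad = True
    return buf, calls


# micro_grads[step][rank] -> gradient values for one parameter
micro_grads = [
    [[2.0, 4.0], [6.0, 8.0]],      # micro-step 0, ranks 0 and 1
    [[10.0, 12.0], [14.0, 16.0]],  # micro-step 1, ranks 0 and 1
]
z1_grad, z1_calls = zero1_style(micro_grads)
z2_grad, z2_calls = zero2_style(micro_grads)
```

In this model both strategies end with the same gradient because the reduction is linear; they differ in how many reductions run and in where the partial sums are held between micro-steps.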
