Implementation:Deepspeedai DeepSpeed ZeRO1 DeepCompile

From Leeroopedia


Knowledge Sources
Domains: Graph_Compilation, ZeRO_Optimization, Optimizer_State_Partitioning
Last Updated: 2026-02-09 00:00 GMT

Overview

The ZeRO Stage 1 DeepCompile implementation provides torch.compile-compatible gradient reduction for optimizer state partitioning. It reduces gradients with NCCL allreduce and supports gradient accumulation.

Description

The Z1CustomOpExecutor extends CustomOpExecutor to provide ZeRO Stage 1 functionality within DeepSpeed's graph compilation framework. ZeRO-1 partitions only optimizer states (not parameters), so gradients must be allreduced across all ranks. Key features include:

  • AllReduce Communication: Uses NCCL ncclAllReduce to sum (or average) gradients across all ranks
  • Gradient Accumulation: Maintains a map of gradient tensors so that several backward passes can be accumulated before reduction
  • Bucketed Reduction: Inherits bucket-based gradient coalescing from CustomOpExecutor for efficient communication
  • Offset-Based Gradient Mapping: Each parameter has an offset into the shared gradient buffer for efficient indexing
  • Pre-Division Support: Optionally pre-divides gradients before the allreduce and reduces with ncclSum; otherwise the reduction uses ncclAvg
  • Stream Coordination: Uses a dedicated reduction stream (rs_stream, named after reduce-scatter but used here for allreduce) and a copy stream for asynchronous operations

The implementation accumulates gradients locally during multiple micro-batches (gradient accumulation), then performs allreduce when param_updated_ is true. After allreduce, gradients are copied back to individual parameter gradient buffers at their designated offsets.

Usage

Z1CustomOpExecutor is instantiated per compiled graph during registration and receives gradient reduction requests through torch.ops.dc.reduce_grad, automatically batching and flushing communication buckets.

Code Reference

Source Location

Signature

class Z1CustomOpExecutor : public CustomOpExecutor {
public:
    Z1CustomOpExecutor(c10::intrusive_ptr<c10d::ProcessGroup> process_group,
                       std::shared_ptr<DSParamRegistry> param_registry,
                       std::shared_ptr<DoubleBufferedReduceBucket> reduce_buckets,
                       std::vector<long> ds_ids,
                       ncclComm_t nccl_comm,
                       at::cuda::CUDAStream rs_stream,
                       at::cuda::CUDAStream copy_stream,
                       bool pre_div_reduce);

    at::Tensor reduceGrad(at::Tensor grad_tensor, long ds_id) override;
    void flushReduceBucket(at::ScalarType scalar_type) override;

private:
    std::unordered_map<long, at::Tensor> grad_tensors_;  // Accumulation map
};

// Public API functions
void register_graph_z1(long graph_id, const std::vector<long>& ds_ids);
void register_param(long ds_id,
                    const std::vector<int64_t>& ds_shape,
                    at::Tensor ds_tensor,
                    at::Tensor grad_buffer,
                    int64_t offset);

Import

import torch
from deepspeed.ops import dc

# Register ZeRO-1 parameter with offset into gradient buffer
dc.register_param(
    ds_id=param_id,
    ds_shape=param.shape,
    ds_tensor=param,           # Full parameter (not partitioned)
    grad_buffer=grad_buffer,   # Shared gradient buffer segment
    offset=offset_in_buffer    # Offset for this parameter's gradients
)

# Register graph with parameter IDs
dc.register_graph_z1(graph_id=0, ds_ids=[0, 1, 2])

# Gradient reduction happens via custom op (inserted by compiler)
torch.ops.dc.reduce_grad(grad_tensor, graph_id=0, ds_id=0)

I/O Contract

Z1CustomOpExecutor Constructor

| Parameter | Type | Description |
|---|---|---|
| process_group | c10::intrusive_ptr<c10d::ProcessGroup> | Distributed process group |
| param_registry | std::shared_ptr<DSParamRegistry> | Registry of registered parameters |
| reduce_buckets | std::shared_ptr<DoubleBufferedReduceBucket> | Bucket manager for gradient coalescing |
| ds_ids | std::vector<long> | Parameter IDs managed by this executor |
| nccl_comm | ncclComm_t | NCCL communicator for direct NCCL calls |
| rs_stream | at::cuda::CUDAStream | Stream for allreduce operations |
| copy_stream | at::cuda::CUDAStream | Stream for gradient copies |
| pre_div_reduce | bool | If true, gradients are pre-divided and reduced with ncclSum; if false, the reduction uses ncclAvg |

reduceGrad

| Parameter | Type | Description |
|---|---|---|
| grad_tensor | at::Tensor | Gradient tensor to reduce |
| ds_id | long | Parameter identifier |
| Returns | at::Tensor | Empty tensor (gradient stored in buffer) |

Behavior:

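  • If the parameter is not yet in grad_tensors_, stores the gradient
  • If the parameter is already in grad_tensors_, accumulates into the stored gradient via add_()
  • If param_updated_ is true, passes the accumulated gradient to the base-class reduction and erases that parameter's entry from the accumulation map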

flushReduceBucket

| Parameter | Type | Description |
|---|---|---|
| scalar_type | at::ScalarType | Data type of gradients to reduce |
| Effect | - | Performs NCCL allreduce and copies results to gradient buffers |

Steps:

  1. Blocks on copy stream events
  2. Applies pre-division if enabled
  3. Performs ncclAllReduce (in-place) on send buffers
  4. Copies reduced gradients to parameter gradient buffers at correct offsets
  5. Swaps buffers and cleans up
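The data movement in steps 2 to 4 can be illustrated with a small pure-Python simulation. This is only a sketch under stated assumptions: ranks are modelled as plain lists, `flush_bucket`, `all_reduce_sum`, and `layout` are hypothetical names that do not exist in DeepSpeed, pre-division is assumed to divide by the world size, and stream handling (step 1) and buffer swapping (step 5) are left out.

```python
from typing import Dict, List, Tuple


def all_reduce_sum(send_bufs: List[List[float]]) -> None:
    """Simulated in-place allreduce: every rank ends with the elementwise sum."""
    summed = [sum(vals) for vals in zip(*send_bufs)]
    for buf in send_bufs:
        buf[:] = summed


def flush_bucket(
    send_bufs: List[List[float]],
    layout: Dict[int, Tuple[int, int]],
    pre_div_reduce: bool,
) -> List[Dict[int, List[float]]]:
    """Hypothetical model of flush steps 2-4 for one bucket.

    send_bufs: one flat send buffer per simulated rank.
    layout:    ds_id -> (offset, numel) inside the send buffer.
    Returns per-rank, per-parameter gradient buffers after the flush.
    """
    world_size = len(send_bufs)
    if pre_div_reduce:
        # Step 2 (assumed divisor: world size), so a plain sum yields the mean.
        for buf in send_bufs:
            buf[:] = [g / world_size for g in buf]
    # Step 3: in-place allreduce on the send buffers.
    all_reduce_sum(send_bufs)
    if not pre_div_reduce:
        # Stand-in for an averaging reduction op such as ncclAvg.
        for buf in send_bufs:
            buf[:] = [g / world_size for g in buf]
    # Step 4: copy each parameter's slice out of the buffer at its offset.
    return [
        {ds_id: buf[off:off + n] for ds_id, (off, n) in layout.items()}
        for buf in send_bufs
    ]


# Two ranks, two parameters packed as [param 0 (2 elements) | param 1 (1 element)].
bufs = [[2.0, 4.0, 6.0], [4.0, 8.0, 10.0]]
grads = flush_bucket(bufs, {0: (0, 2), 1: (2, 1)}, pre_div_reduce=True)
```

After the call every simulated rank holds the same averaged gradients, which is the property ZeRO-1 relies on: each rank sees the full reduced gradient even though it only updates its own partition of the optimizer state.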

register_graph_z1

| Parameter | Type | Description |
|---|---|---|
| graph_id | long | Unique graph identifier |
| ds_ids | std::vector<long> | Parameter IDs in this graph |
| Effect | - | Creates Z1CustomOpExecutor and stores in global executor map |

register_param

| Parameter | Type | Description |
|---|---|---|
| ds_id | long | Parameter identifier |
| ds_shape | std::vector<int64_t> | Parameter shape |
| ds_tensor | at::Tensor | Full parameter tensor (not partitioned) |
| grad_buffer | at::Tensor | Gradient buffer segment for this parameter |
| offset | int64_t | Offset into shared optimizer gradient buffer |
| Effect | - | Registers parameter in global param_registry |
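To make the offset argument concrete, the following sketch derives per-parameter offsets from a list of shapes. It assumes offsets are counted in elements and that parameters are packed back to back in registration order, as the usage example on this page does; `compute_offsets` is a hypothetical helper, not part of DeepSpeed.

```python
from math import prod
from typing import Dict, Sequence, Tuple


def compute_offsets(shapes: Sequence[Sequence[int]]) -> Tuple[Dict[int, int], int]:
    """Return (ds_id -> element offset, total element count) for a packed layout."""
    offsets: Dict[int, int] = {}
    cursor = 0
    for ds_id, shape in enumerate(shapes):
        offsets[ds_id] = cursor   # this parameter starts where the previous one ended
        cursor += prod(shape)     # advance by the parameter's element count
    return offsets, cursor


# Three illustrative parameters: a 4x3 weight, a 3-element bias, a 2x2 weight.
offsets, total = compute_offsets([(4, 3), (3,), (2, 2)])
```

Here `total` is the size a shared gradient buffer would need, and each entry of `offsets` is the value that would be passed as `offset` when registering the corresponding parameter.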

Usage Examples

import torch
import torch.distributed as dist
from deepspeed.ops import dc

# Initialize distributed
dist.init_process_group("nccl")
pg = dist.new_group()

# Initialize DeepCompile runtime
config = {
    "symmetric_memory": False,
    "double_buffer": True,
    "free_activation_threshold": 1_000_000,
    "sync_before_reduce": False,
    "sync_after_reduce": False,
    "sync_before_allgather": False,
    "sync_after_allgather": False,
}
dc.init(pg, config, initial_reduce_bucket_size=100_000_000)

# Create shared gradient buffer for all parameters
# (model, optimizer, criterion, batch, and labels are assumed to be defined by the caller)
total_grad_size = sum(p.numel() for p in model.parameters())
shared_grad_buffer = torch.zeros(total_grad_size, device='cuda', dtype=torch.float32)

# Register ZeRO-1 parameters with offsets
offset = 0
param_ids = []
for i, param in enumerate(model.parameters()):
    grad_buffer_slice = shared_grad_buffer[offset:offset + param.numel()].view(param.shape)

    dc.register_param(
        ds_id=i,
        ds_shape=list(param.shape),
        ds_tensor=param,               # Full parameter (ZeRO-1 doesn't partition params)
        grad_buffer=grad_buffer_slice, # View into shared buffer
        offset=offset                  # Offset for indexing
    )
    param_ids.append(i)
    offset += param.numel()

# Register compiled graph
dc.register_graph_z1(graph_id=0, ds_ids=param_ids)

# Training with gradient accumulation
accumulation_steps = 4
optimizer.zero_grad()

for step in range(accumulation_steps):
    dc.start_forward()
    output = model(batch[step])
    dc.end_forward()

    loss = criterion(output, labels[step])

    # Last step triggers actual reduction
    is_last_step = (step == accumulation_steps - 1)
    dc.start_backward(update=is_last_step)
    loss.backward()

    # Gradients accumulated via reduceGrad calls
    # Only reduced when update=True

    if is_last_step:
        dc.end_backward(graph_id=0)  # Flushes all buckets

# Optimizer step uses shared gradient buffer
optimizer.step()

# Cleanup
dc.cleanup()

Implementation Details

Gradient Accumulation Logic

The reduceGrad method implements accumulation:

```cpp
at::Tensor reduceGrad(at::Tensor grad_tensor, long ds_id) override {
    if (!hasKey(grad_tensors_, ds_id)) {
        grad_tensors_[ds_id] = grad_tensor;  // First gradient
    } else {
        grad_tensors_[ds_id].add_(grad_tensor);  // Accumulate
    }
    if (param_updated_) {
        CustomOpExecutor::reduceGrad(grad_tensors_[ds_id], ds_id);
        grad_tensors_.erase(ds_id);  // Clear accumulation
    }
    return at::Tensor();
}
```

When param_updated_ is false (gradient accumulation), gradients are stored locally. When param_updated_ is true (optimizer step), accumulated gradients are reduced and the map is cleared.
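The same control flow can be modelled in a few lines of Python to see how micro-batch gradients combine before a single reduction. This is an illustrative sketch only: `AccumulatingReducer` and `reduce_fn` are hypothetical names, gradients are plain lists rather than tensors, and the callback stands in for the base-class reduction.

```python
from typing import Callable, Dict, List


class AccumulatingReducer:
    """Toy model of the accumulate-then-reduce behaviour described above."""

    def __init__(self, reduce_fn: Callable[[List[float], int], None]) -> None:
        self.grad_tensors: Dict[int, List[float]] = {}  # accumulation map
        self.param_updated = False                      # True on the optimizer-step pass
        self.reduce_fn = reduce_fn                      # stand-in for the real reduction

    def reduce_grad(self, grad: List[float], ds_id: int) -> None:
        if ds_id not in self.grad_tensors:
            self.grad_tensors[ds_id] = list(grad)       # first gradient: store a copy
        else:
            acc = self.grad_tensors[ds_id]
            for i, g in enumerate(grad):                # later gradients: add in place
                acc[i] += g
        if self.param_updated:
            # Hand the accumulated gradient to the reduction and drop the entry.
            self.reduce_fn(self.grad_tensors.pop(ds_id), ds_id)


reduced = []
reducer = AccumulatingReducer(lambda g, ds_id: reduced.append((ds_id, g)))
reducer.reduce_grad([1.0, 2.0], ds_id=0)   # micro-batch 1: stored
reducer.reduce_grad([0.5, 0.5], ds_id=0)   # micro-batch 2: accumulated
reducer.param_updated = True
reducer.reduce_grad([1.0, 1.0], ds_id=0)   # final micro-batch: accumulated, then reduced
```

Only the final call triggers the reduction, so communication happens once per optimizer step rather than once per micro-batch.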

AllReduce Implementation

flushReduceBucket performs in-place allreduce:

```cpp
ncclGroupStart();
for (const ReduceTask& t : reduce_tasks_.at(scalar_type)) {
    ncclAllReduce(
        t.getSendBuf().data_ptr(),  // Input
        t.getSendBuf().data_ptr(),  // Output (in-place)
        t.getSendBuf().numel(),
        get_nccl_data_type(scalar_type),
        getReductionOp(),           // ncclSum or ncclAvg
        nccl_comm_,
        rs_stream_
    );
}
ncclGroupEnd();
```

After allreduce, each parameter's slice of the reduced send buffer is copied back to its individual gradient buffer, located using the offset field.

Offset-Based Gradient Indexing

Each parameter's gradient is stored in a shared buffer:

```cpp
int64_t offset = param.getOffset();
auto recv_buf = t.getSendBuf().flatten().index(
    {torch::indexing::Slice(offset, offset + grad_buf.numel())}
);
grad_buf.copy_(recv_buf);
```

This keeps gradients in the same contiguous, offset-addressed layout as the optimizer states, which are stored contiguously across parameters.

Stream Synchronization

Two streams coordinate operations:

  • rs_stream: Executes NCCL allreduce operations
  • copy_stream: Handles gradient tensor copies

Events ensure copy completes before allreduce begins, and allreduce completes before results are used.

Differences from ZeRO-2 and ZeRO-3

| Feature | ZeRO-1 | ZeRO-2 | ZeRO-3 |
|---|---|---|---|
| Parameters | Full replicas | Full replicas | Partitioned |
| Gradients | AllReduce | AllReduce | ReduceScatter |
| Optimizer States | Partitioned | Partitioned | Partitioned |
| Communication | ncclAllReduce | ncclAllReduce | ncclReduceScatter + ncclAllGather |
| Memory Savings | Optimizer only | Optimizer + Gradients | Optimizer + Gradients + Parameters |
| Gradient Buffer | Shared with offsets | Shared with offsets | Per-partition |
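The Memory Savings row can be given rough numbers with a short calculation. The sketch below assumes the mixed-precision Adam accounting used in the ZeRO paper (2 bytes per parameter for fp16 weights, 2 bytes for fp16 gradients, and K = 12 bytes of optimizer state); these constants are not taken from this page, and `zero_memory_bytes` is a hypothetical helper. It models model-state memory only and ignores activations, buffers, and fragmentation.

```python
def zero_memory_bytes(num_params: int, world_size: int, stage: int, k: int = 12) -> float:
    """Approximate per-device model-state memory in bytes for a given ZeRO stage."""
    params = 2.0 * num_params   # fp16 parameters
    grads = 2.0 * num_params    # fp16 gradients
    opt = float(k) * num_params # optimizer states (fp32 copy, momentum, variance)
    if stage >= 1:
        opt /= world_size       # stage 1: partition optimizer states
    if stage >= 2:
        grads /= world_size     # stage 2: also partition gradients
    if stage >= 3:
        params /= world_size    # stage 3: also partition parameters
    return params + grads + opt


# Illustrative setting: 7.5B parameters on 64 devices, reported in GB (1e9 bytes).
psi, n = 7_500_000_000, 64
per_stage_gb = {stage: zero_memory_bytes(psi, n, stage) / 1e9 for stage in range(4)}
```

Under these assumptions stage 1 already removes most of the optimizer-state cost, while parameters and gradients remain fully replicated, which matches the "Optimizer only" entry for ZeRO-1.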
