Implementation:Deepspeedai DeepSpeed ZeRO1 DeepCompile
| Knowledge Sources | |
|---|---|
| Domains | Graph_Compilation, ZeRO_Optimization, Optimizer_State_Partitioning |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
ZeRO Stage 1 DeepCompile implements torch.compile-compatible gradient reduction for optimizer state partitioning using NCCL allreduce with gradient accumulation support.
Description
The Z1CustomOpExecutor extends CustomOpExecutor to provide ZeRO Stage 1 functionality within DeepSpeed's graph compilation framework. ZeRO-1 partitions only optimizer states (not parameters), so gradients must be allreduced across all ranks. Key features include:
- AllReduce Communication: Uses NCCL ncclAllReduce to compute gradient sums across all ranks
- Gradient Accumulation: Maintains gradient tensor map to accumulate multiple backward passes before reduction
- Bucketed Reduction: Inherits bucket-based gradient coalescing from CustomOpExecutor for efficient communication
- Offset-Based Gradient Mapping: Each parameter has an offset into the shared gradient buffer for efficient indexing
- Pre-Division Support: When pre_div_reduce is enabled, gradients are divided by the number of ranks before an allreduce with ncclSum; otherwise the allreduce uses ncclAvg
- Stream Coordination: Uses a dedicated reduction stream (rs_stream, which carries the allreduce in ZeRO-1) and a separate copy stream for asynchronous operations
The implementation accumulates gradients locally during multiple micro-batches (gradient accumulation), then performs allreduce when param_updated_ is true. After allreduce, gradients are copied back to individual parameter gradient buffers at their designated offsets.
Usage
Z1CustomOpExecutor is instantiated per compiled graph during registration and receives gradient reduction requests through torch.ops.dc.reduce_grad, automatically batching and flushing communication buckets.
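The batching-and-flushing behavior mentioned above can be pictured with a small pure-Python model. This is only a sketch: `ToyReduceBucket`, its `capacity` argument, and the flush policy are hypothetical stand-ins and do not reproduce DeepSpeed's bucket implementation.

```python
class ToyReduceBucket:
    """Hypothetical stand-in for a size-limited gradient bucket."""

    def __init__(self, capacity, flush_fn):
        self.capacity = capacity  # max number of elements held before a flush
        self.flush_fn = flush_fn  # receives the coalesced list of (ds_id, grad)
        self.pending = []
        self.used = 0

    def add(self, ds_id, grad):
        # Flush first if this gradient would not fit in the remaining space.
        if self.pending and self.used + len(grad) > self.capacity:
            self.flush()
        self.pending.append((ds_id, list(grad)))
        self.used += len(grad)

    def flush(self):
        if self.pending:
            self.flush_fn(self.pending)
        self.pending = []
        self.used = 0


flushed_batches = []
bucket = ToyReduceBucket(
    capacity=4,
    flush_fn=lambda batch: flushed_batches.append([ds_id for ds_id, _ in batch]),
)
bucket.add(0, [1.0, 2.0])
bucket.add(1, [3.0, 4.0])
bucket.add(2, [5.0])  # does not fit, so the bucket holding ds_ids 0 and 1 is flushed first
bucket.flush()        # final flush, analogous to flushing at the end of backward
```

Coalescing several small gradients into one communication call is the usual motivation for bucketing; the capacity of 4 elements here is chosen only to make the flush visible.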
Code Reference
Source Location
- Repository: DeepSpeed
- File: csrc/compile/z1.cpp
Signature
class Z1CustomOpExecutor : public CustomOpExecutor {
public:
    Z1CustomOpExecutor(c10::intrusive_ptr<c10d::ProcessGroup> process_group,
                       std::shared_ptr<DSParamRegistry> param_registry,
                       std::shared_ptr<DoubleBufferedReduceBucket> reduce_buckets,
                       std::vector<long> ds_ids,
                       ncclComm_t nccl_comm,
                       at::cuda::CUDAStream rs_stream,
                       at::cuda::CUDAStream copy_stream,
                       bool pre_div_reduce);
    at::Tensor reduceGrad(at::Tensor grad_tensor, long ds_id) override;
    void flushReduceBucket(at::ScalarType scalar_type) override;
private:
    std::unordered_map<long, at::Tensor> grad_tensors_;  // Accumulation map
};
// Public API functions
void register_graph_z1(long graph_id, const std::vector<long>& ds_ids);
void register_param(long ds_id,
                    const std::vector<int64_t>& ds_shape,
                    at::Tensor ds_tensor,
                    at::Tensor grad_buffer,
                    int64_t offset);
Import
import torch
from deepspeed.ops import dc
# Register ZeRO-1 parameter with offset into gradient buffer
dc.register_param(
    ds_id=param_id,
    ds_shape=param.shape,
    ds_tensor=param,          # Full parameter (not partitioned)
    grad_buffer=grad_buffer,  # Shared gradient buffer segment
    offset=offset_in_buffer   # Offset for this parameter's gradients
)
# Register graph with parameter IDs
dc.register_graph_z1(graph_id=0, ds_ids=[0, 1, 2])
# Gradient reduction happens via custom op (inserted by compiler)
torch.ops.dc.reduce_grad(grad_tensor, graph_id=0, ds_id=0)
I/O Contract
Z1CustomOpExecutor Constructor
| Parameter | Type | Description |
|---|---|---|
| process_group | c10::intrusive_ptr<c10d::ProcessGroup> | Distributed process group |
| param_registry | std::shared_ptr<DSParamRegistry> | Registry of registered parameters |
| reduce_buckets | std::shared_ptr<DoubleBufferedReduceBucket> | Bucket manager for gradient coalescing |
| ds_ids | std::vector<long> | Parameter IDs managed by this executor |
| nccl_comm | ncclComm_t | NCCL communicator for direct NCCL calls |
| rs_stream | at::cuda::CUDAStream | Stream for allreduce operations |
| copy_stream | at::cuda::CUDAStream | Stream for gradient copies |
| pre_div_reduce | bool | Whether to pre-divide gradients (use ncclSum vs ncclAvg) |
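
The two settings of pre_div_reduce should yield the same averaged gradient, differing only in where the division happens. The sketch below simulates this with plain Python lists standing in for per-rank buffers; `all_reduce_sum`, `all_reduce_avg`, and `reduce_with_pre_div` are illustrative helpers, not DeepSpeed or NCCL functions.

```python
def all_reduce_sum(per_rank):
    """Simulate a sum allreduce: every rank ends up with the elementwise sum."""
    total = [sum(vals) for vals in zip(*per_rank)]
    return [list(total) for _ in per_rank]


def all_reduce_avg(per_rank):
    """Simulate an averaging allreduce (sum followed by division by world size)."""
    world = len(per_rank)
    return [[v / world for v in rank] for rank in all_reduce_sum(per_rank)]


def reduce_with_pre_div(per_rank, pre_div_reduce):
    world = len(per_rank)
    if pre_div_reduce:
        # Divide locally first, then sum across ranks (the ncclSum path).
        scaled = [[g / world for g in rank] for rank in per_rank]
        return all_reduce_sum(scaled)
    # Otherwise let the reduction operator average (the ncclAvg path).
    return all_reduce_avg(per_rank)


grads = [[2.0, 4.0], [6.0, 8.0]]  # two simulated ranks
pre_div = reduce_with_pre_div(grads, pre_div_reduce=True)
avg = reduce_with_pre_div(grads, pre_div_reduce=False)
```

With real floating-point gradients the two paths can differ by rounding, since the division is applied at a different point in the computation.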
reduceGrad
| Parameter | Type | Description |
|---|---|---|
| grad_tensor | at::Tensor | Gradient tensor to reduce |
| ds_id | long | Parameter identifier |
| Returns | at::Tensor | Empty tensor (gradient stored in buffer) |
Behavior:
- If parameter not in grad_tensors_, stores gradient
- If parameter already in grad_tensors_, accumulates via add_()
- If param_updated_ is true, passes the accumulated gradient to the base-class reduction and erases that parameter's entry from the accumulation map
flushReduceBucket
| Parameter | Type | Description |
|---|---|---|
| scalar_type | at::ScalarType | Data type of gradients to reduce |
| Effect | - | Performs NCCL allreduce and copies results to gradient buffers |
Steps:
1. Blocks on copy stream events
2. Applies pre-division if enabled
3. Performs ncclAllReduce (in-place) on send buffers
4. Copies reduced gradients to parameter gradient buffers at correct offsets
5. Swaps buffers and cleans up
register_graph_z1
| Parameter | Type | Description |
|---|---|---|
| graph_id | long | Unique graph identifier |
| ds_ids | std::vector<long> | Parameter IDs in this graph |
| Effect | - | Creates Z1CustomOpExecutor and stores in global executor map |
register_param
| Parameter | Type | Description |
|---|---|---|
| ds_id | long | Parameter identifier |
| ds_shape | std::vector<int64_t> | Parameter shape |
| ds_tensor | at::Tensor | Full parameter tensor (not partitioned) |
| grad_buffer | at::Tensor | Gradient buffer segment for this parameter |
| offset | int64_t | Offset into shared optimizer gradient buffer |
| Effect | - | Registers parameter in global param_registry |
Usage Examples
import torch
import torch.distributed as dist
from deepspeed.ops import dc
# Initialize distributed
dist.init_process_group("nccl")
pg = dist.new_group()
# Initialize DeepCompile runtime
config = {
    "symmetric_memory": False,
    "double_buffer": True,
    "free_activation_threshold": 1_000_000,
    "sync_before_reduce": False,
    "sync_after_reduce": False,
    "sync_before_allgather": False,
    "sync_after_allgather": False,
}
dc.init(pg, config, initial_reduce_bucket_size=100_000_000)
# Create shared gradient buffer for all parameters
# (model, optimizer, criterion, batch, and labels are assumed to be defined by the caller)
total_grad_size = sum(p.numel() for p in model.parameters())
shared_grad_buffer = torch.zeros(total_grad_size, device='cuda', dtype=torch.float32)
# Register ZeRO-1 parameters with offsets
offset = 0
param_ids = []
for i, param in enumerate(model.parameters()):
    grad_buffer_slice = shared_grad_buffer[offset:offset + param.numel()].view(param.shape)
    dc.register_param(
        ds_id=i,
        ds_shape=list(param.shape),
        ds_tensor=param,                # Full parameter (ZeRO-1 doesn't partition params)
        grad_buffer=grad_buffer_slice,  # View into shared buffer
        offset=offset                   # Offset for indexing
    )
    param_ids.append(i)
    offset += param.numel()
# Register compiled graph
dc.register_graph_z1(graph_id=0, ds_ids=param_ids)
# Training with gradient accumulation
accumulation_steps = 4
optimizer.zero_grad()
for step in range(accumulation_steps):
    dc.start_forward()
    output = model(batch[step])
    dc.end_forward()
    loss = criterion(output, labels[step])
    # Last step triggers actual reduction
    is_last_step = (step == accumulation_steps - 1)
    dc.start_backward(update=is_last_step)
    loss.backward()
    # Gradients accumulated via reduceGrad calls
    # Only reduced when update=True
    if is_last_step:
        dc.end_backward(graph_id=0)  # Flushes all buckets
# Optimizer step uses shared gradient buffer
optimizer.step()
# Cleanup
dc.cleanup()
Implementation Details
Gradient Accumulation Logic
The reduceGrad method implements accumulation:
```cpp
at::Tensor reduceGrad(at::Tensor grad_tensor, long ds_id) override {
    if (!hasKey(grad_tensors_, ds_id)) {
        grad_tensors_[ds_id] = grad_tensor;  // First gradient
    } else {
        grad_tensors_[ds_id].add_(grad_tensor);  // Accumulate
    }
    if (param_updated_) {
        CustomOpExecutor::reduceGrad(grad_tensors_[ds_id], ds_id);
        grad_tensors_.erase(ds_id);  // Clear accumulation
    }
    return at::Tensor();
}
```
When param_updated_ is false (gradient accumulation), gradients are stored locally. When param_updated_ is true (optimizer step), each accumulated gradient is passed to the base-class reduction and its entry is erased from the map.
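The same control flow can be traced in a small pure-Python model. `ToyZ1Executor` and its `reduced` dictionary are hypothetical names used only for illustration; the real executor hands the tensor to `CustomOpExecutor::reduceGrad` rather than recording it.

```python
class ToyZ1Executor:
    """Pure-Python model of the accumulate-then-reduce control flow."""

    def __init__(self):
        self.grad_tensors = {}      # ds_id -> locally accumulated gradient
        self.param_updated = False  # True on the micro-batch before the optimizer step
        self.reduced = {}           # records what was handed to the simulated reduction

    def reduce_grad(self, grad, ds_id):
        if ds_id not in self.grad_tensors:
            self.grad_tensors[ds_id] = list(grad)  # first gradient for this parameter
        else:
            acc = self.grad_tensors[ds_id]
            for i, g in enumerate(grad):
                acc[i] += g                        # accumulate in place
        if self.param_updated:
            # Hand off the accumulated gradient and drop its map entry.
            self.reduced[ds_id] = self.grad_tensors.pop(ds_id)


executor = ToyZ1Executor()
for step in range(3):
    executor.param_updated = (step == 2)  # only the last micro-batch triggers reduction
    executor.reduce_grad([1.0, 2.0], ds_id=0)
    executor.reduce_grad([0.5], ds_id=1)
```

After three micro-batches, each parameter's handed-off gradient is the sum of its three contributions and the accumulation map is empty again.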
AllReduce Implementation
flushReduceBucket performs in-place allreduce:
```cpp
ncclGroupStart();
for (const ReduceTask& t : reduce_tasks_.at(scalar_type)) {
    ncclAllReduce(
        t.getSendBuf().data_ptr(),  // Input
        t.getSendBuf().data_ptr(),  // Output (in-place)
        t.getSendBuf().numel(),
        get_nccl_data_type(scalar_type),
        getReductionOp(),  // ncclSum or ncclAvg
        nccl_comm_,
        rs_stream_
    );
}
ncclGroupEnd();
```
After allreduce, results are scattered back to individual gradient buffers using the offset field.
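To make the in-place semantics concrete, the following sketch simulates the loop over reduce tasks with Python lists standing in for each rank's send buffer. `all_reduce_tasks_in_place` is an illustrative helper, not an NCCL binding, and it runs sequentially rather than as a grouped launch.

```python
def all_reduce_tasks_in_place(tasks, op="sum"):
    """tasks: list of reduce tasks; each task is a list of per-rank send buffers."""
    for rank_bufs in tasks:
        world = len(rank_bufs)
        reduced = [sum(vals) for vals in zip(*rank_bufs)]
        if op == "avg":
            reduced = [v / world for v in reduced]
        for buf in rank_bufs:
            buf[:] = reduced  # overwrite contents, keep the same buffer object


task_a = [[1.0, 2.0], [3.0, 4.0]]  # two simulated ranks, two elements each
task_b = [[10.0], [30.0]]          # a second, smaller task
buffer_ids = [id(buf) for task in (task_a, task_b) for buf in task]
all_reduce_tasks_in_place([task_a, task_b], op="avg")
```

Because input and output share one buffer, no separate receive allocation is needed; every rank's send buffer simply holds the reduced values afterwards.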
Offset-Based Gradient Indexing
Each parameter's gradient is stored in a shared buffer:
```cpp
int64_t offset = param.getOffset();
auto recv_buf = t.getSendBuf().flatten().index(
    {torch::indexing::Slice(offset, offset + grad_buf.numel())}
);
grad_buf.copy_(recv_buf);
```
This enables efficient memory layout for optimizer states, which are contiguous across parameters.
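The offset arithmetic can be sketched without any tensor library. In the example below, `build_offsets` and `copy_back` are hypothetical helpers, and a flat Python list plays the role of the reduced buffer; the layout (parameters packed back to back in ds_id order) is an assumption made for illustration.

```python
def build_offsets(numels):
    """Assign each ds_id a starting offset in a flat, back-to-back layout."""
    offsets, cursor = {}, 0
    for ds_id, numel in numels.items():
        offsets[ds_id] = cursor
        cursor += numel
    return offsets, cursor


def copy_back(reduced_flat, offsets, numels):
    """Slice each parameter's gradient out of the flat reduced buffer."""
    return {
        ds_id: reduced_flat[off:off + numels[ds_id]]
        for ds_id, off in offsets.items()
    }


numels = {0: 2, 1: 3, 2: 1}                   # elements per parameter
offsets, total = build_offsets(numels)
flat = [float(i) for i in range(total)]       # stand-in for the reduced buffer
per_param = copy_back(flat, offsets, numels)
```

Each slice `[offset, offset + numel)` mirrors the `Slice(offset, offset + grad_buf.numel())` expression in the C++ excerpt.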
Stream Synchronization
Two streams coordinate operations:
- rs_stream: Executes NCCL allreduce operations
- copy_stream: Handles gradient tensor copies
Events ensure copy completes before allreduce begins, and allreduce completes before results are used.
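The ordering guarantee can be illustrated with an analogy in plain Python, where threads stand in for streams and `threading.Event` stands in for a CUDA event. This is not the CUDA API and not DeepSpeed code; it only shows how waiting on events enforces copy, then allreduce, then use, regardless of launch order.

```python
import threading

log = []
copy_done = threading.Event()
reduce_done = threading.Event()


def copy_worker():
    log.append("copy")       # gradient copied into the send buffer
    copy_done.set()          # analogous to recording an event on the copy stream


def reduce_worker():
    copy_done.wait()         # the reduction stream waits for the copy event
    log.append("allreduce")
    reduce_done.set()


def consumer():
    reduce_done.wait()       # results are only read after the allreduce finishes
    log.append("use")


# Start the threads in reverse order to show that events, not launch order, decide.
threads = [threading.Thread(target=fn) for fn in (consumer, reduce_worker, copy_worker)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```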
Differences from ZeRO-2 and ZeRO-3
| Feature | ZeRO-1 | ZeRO-2 | ZeRO-3 |
|---|---|---|---|
| Parameters | Full replicas | Full replicas | Partitioned |
| Gradients | AllReduce | AllReduce | ReduceScatter |
| Optimizer States | Partitioned | Partitioned | Partitioned |
| Communication | ncclAllReduce | ncclAllReduce | ncclReduceScatter + ncclAllGather |
| Memory Savings | Optimizer only | Optimizer + Gradients | Optimizer + Gradients + Parameters |
| Gradient Buffer | Shared with offsets | Shared with offsets | Per-partition |
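
The Memory Savings row can be made quantitative under one common accounting, which is an assumption here rather than something stated by this page: mixed-precision Adam with 2 bytes per parameter for fp16 weights, 2 bytes for fp16 gradients, and 12 bytes for fp32 optimizer state. `model_state_bytes` is a hypothetical helper and ignores activations, buffers, and fragmentation.

```python
def model_state_bytes(num_params, world_size, stage):
    """Per-GPU model-state bytes under the assumed mixed-precision Adam accounting."""
    params = 2 * num_params  # fp16 parameters
    grads = 2 * num_params   # fp16 gradients
    optim = 12 * num_params  # fp32 master weights, momentum, and variance
    if stage >= 1:
        optim /= world_size  # ZeRO-1: optimizer states partitioned
    if stage >= 2:
        grads /= world_size  # ZeRO-2: gradients partitioned as well
    if stage >= 3:
        params /= world_size  # ZeRO-3: parameters partitioned as well
    return params + grads + optim


# Small illustrative numbers: 64 parameters across 4 ranks.
per_stage = {stage: model_state_bytes(64, 4, stage) for stage in range(4)}
```

Under this accounting ZeRO-1 already removes most of the optimizer-state cost, which is the largest of the three terms.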