Implementation:Deepspeedai DeepSpeed DeepCompile Runtime
| Knowledge Sources | |
|---|---|
| Domains | Graph_Compilation, Distributed_Training, Runtime_System |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
DeepCompile Runtime provides the core initialization, lifecycle management, and shared infrastructure for torch.compile integration with DeepSpeed ZeRO optimizations.
Description
The DeepCompile runtime (deepcompile.cpp) serves as the central coordinator for all ZeRO stages within the graph compilation framework. It manages:
- Initialization: Establishes NCCL communicator from ProcessGroup, initializes parameter registry, and configures reduce buckets with optional double buffering
- Global State: Maintains shared resources including process group, NCCL communicator, parameter registry, executor map, and optional symmetric memory workspace
- Lifecycle Hooks: Provides start/end callbacks for forward and backward passes that coordinate all registered graph executors
- Configuration Management: Extracts settings from Python config objects including symmetric memory, double buffering, activation threshold, and debug synchronization flags
- Memory Management: Implements activation memory freeing for tensors exceeding size threshold via set_data() trick
- Gradient Reduction: Routes gradient reduction requests to appropriate graph executors
- Symmetric Memory: Lazily initializes symmetric memory workspace sized to largest registered parameter when enabled
- Cleanup: Provides reset (clear executors) and cleanup (destroy all resources) operations
The runtime uses a namespace-based design (dc::) and maintains global state accessible to all ZeRO stage implementations (Z1, Z2, Z3).
Usage
The runtime is initialized once per process before any model compilation, registers graphs per ZeRO stage, and coordinates execution through lifecycle callbacks invoked by the Python training loop.
Code Reference
Source Location
- Repository: DeepSpeed
- File: csrc/compile/deepcompile.cpp
Signature
namespace dc {
// Global state
extern std::shared_ptr<DSParamRegistry> param_registry;
extern std::unordered_map<long, std::shared_ptr<CustomOpExecutor>> executors;
extern std::shared_ptr<DoubleBufferedReduceBucket> reduce_buckets;
extern c10::intrusive_ptr<c10d::ProcessGroup> process_group;
extern c10::intrusive_ptr<c10d::symmetric_memory::SymmetricMemory> symm_mem;
extern ncclComm_t nccl_comm;
extern bool profile;
extern bool pre_div_reduce;
// Initialization and lifecycle
void init(c10::intrusive_ptr<c10d::ProcessGroup> pg,
pybind11::object& config,
int64_t initial_reduce_bucket_size);
void start_forward();
void end_forward();
void start_backward(bool update);
void end_backward(long graph_id);
void reset(); // Clear executors, keep buckets
void cleanup(); // Destroy all resources
// Utility functions
void enable_profiling(bool enable);
bool is_profiling();
void lazy_init_symm_memory();
ncclDataType_t get_nccl_data_type(at::ScalarType scalar_type);
// Gradient operations
at::Tensor reduce_grad(at::Tensor grad_tensor, long graph_id, long ds_id);
void free_tensors(std::vector<at::Tensor> tensors);
} // namespace dc
Import
import torch
import torch.distributed as dist
from deepspeed.ops import dc
# Configuration object
config = {
    "symmetric_memory": False,
    "double_buffer": True,
    "free_activation_threshold": 1024 * 1024,
    "sync_before_reduce": False,
    "sync_after_reduce": False,
    "sync_before_allgather": False,
    "sync_after_allgather": False,
}
# Initialize runtime
process_group = dist.new_group()
dc.init(process_group, config, initial_reduce_bucket_size=100_000_000)
# Enable profiling mode (skips actual communication)
dc.enable_profiling(True)
# Lifecycle management in training loop
dc.start_forward()
# ... forward pass ...
dc.end_forward()
dc.start_backward(update=True) # update=True means optimizer will step
# ... backward pass ...
dc.end_backward(graph_id=0)
# Cleanup
dc.reset() # Clear executors but keep memory allocations
dc.cleanup() # Full cleanup including NCCL
I/O Contract
init
| Parameter | Type | Description |
|---|---|---|
| pg | c10::intrusive_ptr<c10d::ProcessGroup> | PyTorch distributed process group |
| config | pybind11::object | Python config object with settings |
| initial_reduce_bucket_size | int64_t | Initial size for gradient reduction buckets |
| Effect | - | Creates NCCL communicator, initializes registry and buckets |
Config Fields:
| Field | Type | Description |
|---|---|---|
| symmetric_memory | bool | Enable symmetric memory for allgather |
| double_buffer | bool | Enable double buffering for reduce buckets |
| free_activation_threshold | int64_t | Minimum size (elements) to free activations |
| sync_before_reduce | bool | Synchronize device before reduce (debug) |
| sync_after_reduce | bool | Synchronize device after reduce (debug) |
| sync_before_allgather | bool | Synchronize device before allgather (debug) |
| sync_after_allgather | bool | Synchronize device after allgather (debug) |
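The field list above can be mirrored on the Python side by a small typed container that rejects misspelled keys before they reach the runtime. This is only a sketch: `RuntimeConfig` and `from_dict` are hypothetical names that are not part of DeepSpeed, and the defaults are copied from the example values on this page rather than from the library.

```python
from dataclasses import dataclass, fields
from typing import Any, Dict


@dataclass(frozen=True)
class RuntimeConfig:
    """Hypothetical container for the config fields listed in the table."""

    symmetric_memory: bool = False
    double_buffer: bool = True
    free_activation_threshold: int = 1024 * 1024  # in elements
    sync_before_reduce: bool = False
    sync_after_reduce: bool = False
    sync_before_allgather: bool = False
    sync_after_allgather: bool = False

    @classmethod
    def from_dict(cls, raw: Dict[str, Any]) -> "RuntimeConfig":
        # Reject keys that are not declared fields, so typos fail early.
        known = {f.name for f in fields(cls)}
        unknown = set(raw) - known
        if unknown:
            raise KeyError(f"unknown config fields: {sorted(unknown)}")
        return cls(**raw)


cfg = RuntimeConfig.from_dict({"symmetric_memory": True})
```

Fields that are not supplied keep their defaults, so `cfg.double_buffer` stays enabled in this example.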
Lifecycle Functions
| Function | Description |
|---|---|
| start_forward() | Lazy-initializes symmetric memory, calls startForward() on all executors |
| end_forward() | Calls endForward() on all executors |
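| start_backward(update) | Passes the update flag to all executors; update is true when the optimizer will step after this backward pass and false for intermediate gradient-accumulation steps |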
| end_backward(graph_id) | Flushes reduce buckets, synchronizes reduce stream |
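The dispatch pattern in the table can be modelled in a few lines of plain Python. The sketch below is a toy, not the C++ implementation: `RecordingExecutor` is a hypothetical stand-in that only logs calls, and the choice to finalize only the executor that owns `graph_id` in `end_backward` is an assumption made for illustration.

```python
from typing import Dict, List


class RecordingExecutor:
    """Hypothetical stand-in for a per-graph executor; it only logs calls."""

    def __init__(self, graph_id: int) -> None:
        self.graph_id = graph_id
        self.calls: List[str] = []

    def start_forward(self) -> None:
        self.calls.append("start_forward")

    def end_forward(self) -> None:
        self.calls.append("end_forward")

    def start_backward(self, update: bool) -> None:
        self.calls.append(f"start_backward(update={update})")

    def end_backward(self) -> None:
        self.calls.append("end_backward")


# Map keyed by graph id, playing the role of dc::executors.
lifecycle_executors: Dict[int, RecordingExecutor] = {}


def start_forward() -> None:
    for ex in lifecycle_executors.values():
        ex.start_forward()


def end_forward() -> None:
    for ex in lifecycle_executors.values():
        ex.end_forward()


def start_backward(update: bool) -> None:
    for ex in lifecycle_executors.values():
        ex.start_backward(update)


def end_backward(graph_id: int) -> None:
    # Assumption: only the executor that owns graph_id is finalized here.
    lifecycle_executors[graph_id].end_backward()


lifecycle_executors[0] = RecordingExecutor(0)
lifecycle_executors[1] = RecordingExecutor(1)
start_forward()
end_forward()
start_backward(update=False)  # e.g. an intermediate accumulation step
end_backward(graph_id=0)
```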
reduce_grad
| Parameter | Type | Description |
|---|---|---|
| grad_tensor | at::Tensor | Gradient tensor to reduce |
| graph_id | long | Graph executor identifier |
| ds_id | long | Parameter identifier |
| Returns | at::Tensor | Empty tensor (gradient stored in buffer) |
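The routing contract above (look up the executor by graph_id, hand it the gradient, return an empty placeholder) can be sketched as follows. `GradRecorder` and `route_reduce_grad` are hypothetical names, and Python lists stand in for tensors.

```python
from typing import Dict, List, Tuple


class GradRecorder:
    """Hypothetical executor that records the gradients it is asked to reduce."""

    def __init__(self) -> None:
        self.pending: List[Tuple[int, List[float]]] = []

    def reduce_grad(self, grad: List[float], ds_id: int) -> None:
        # A real executor would copy the gradient into a reduce bucket here.
        self.pending.append((ds_id, list(grad)))


grad_executors: Dict[int, GradRecorder] = {0: GradRecorder(), 1: GradRecorder()}


def route_reduce_grad(grad: List[float], graph_id: int, ds_id: int) -> List[float]:
    # Select the executor registered for this graph and delegate to it.
    grad_executors[graph_id].reduce_grad(grad, ds_id)
    # The caller gets an empty placeholder; the data now lives in the buffer.
    return []


placeholder = route_reduce_grad([0.5, 1.5], graph_id=1, ds_id=7)
```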
free_tensors
| Parameter | Type | Description |
|---|---|---|
| tensors | std::vector<at::Tensor> | Activation tensors to free |
| Effect | - | Calls record_stream() then set_data(empty) for large CUDA tensors |
Utility Functions
| Function | Description |
|---|---|
| enable_profiling(enable) | Sets profile flag; when true, skips actual communication |
| is_profiling() | Returns current profiling state |
| lazy_init_symm_memory() | Creates symmetric memory workspace on first forward pass |
| reset() | Clears executor map, preserves bucket allocations |
| cleanup() | Destroys NCCL communicator, clears all state |
Usage Examples
import torch
import torch.distributed as dist
from deepspeed.ops import dc
from deepspeed.compile import register_graph_z3, register_z3_param
# Initialize distributed
dist.init_process_group("nccl")
# Configure DeepCompile
config = {
    "symmetric_memory": True,  # Use symmetric memory for allgather
    "double_buffer": True,  # Pipeline reduce communication
    "free_activation_threshold": 10_000_000,  # Free activations > 10M elements
    "sync_before_reduce": False,
    "sync_after_reduce": False,
    "sync_before_allgather": False,
    "sync_after_allgather": False,
}
# Initialize runtime with process group
pg = dist.new_group()
dc.init(pg, config, initial_reduce_bucket_size=100_000_000)
# Register model parameters (ZeRO-3 example)
model_params = []
for i, (param, grad_buf) in enumerate(zip(model.parameters(), grad_buffers)):
    register_z3_param(
        ds_id=i,
        ds_shape=param.shape,
        ds_tensor=param_shards[i],
        grad_buffer=grad_buf,
        persistent=False
    )
    model_params.append(i)
# Register compiled graph
register_graph_z3(graph_id=0, ds_ids=model_params)
# Training loop with lifecycle management
for epoch in range(num_epochs):
    for batch in dataloader:
        # Forward pass lifecycle
        dc.start_forward()
        outputs = model(batch)
        dc.end_forward()
        # Backward pass lifecycle
        loss = criterion(outputs, batch.labels)
        dc.start_backward(update=True)  # update=True for optimizer step
        loss.backward()
        dc.end_backward(graph_id=0)
        # Optimizer step
        optimizer.step()
        optimizer.zero_grad()
# Profiling mode (dry run without communication)
dc.enable_profiling(True)
dc.start_forward()
_ = model(sample_batch) # Profile without actual allgather
dc.end_forward()
dc.enable_profiling(False)
# Free large activations manually
activations = [act1, act2, act3] # Large intermediate tensors
dc.free_tensors(activations) # Frees if size > threshold
# Cleanup options
dc.reset() # Clear graph executors (e.g., before recompilation)
dc.cleanup() # Full teardown (end of training)
Implementation Details
NCCL Communicator Initialization
The init function creates a NCCL communicator from the ProcessGroup:
1. Rank 0 generates an ncclUniqueId via ncclGetUniqueId()
2. The ID is broadcast to all ranks using ProcessGroup::broadcast()
3. Each rank calls ncclCommInitRank() with the shared ID
This approach allows direct NCCL calls on specific streams while remaining compatible with the ProcessGroup.
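The three-step handshake can be illustrated with a single-process simulation. Nothing here calls NCCL: `get_unique_id`, `broadcast`, and `bootstrap` are hypothetical stand-ins for ncclGetUniqueId(), ProcessGroup::broadcast(), and the per-rank ncclCommInitRank() call, and a list index plays the role of a rank.

```python
import secrets
from typing import Dict, List, Optional, Union


def get_unique_id() -> bytes:
    """Stand-in for ncclGetUniqueId(): returns an opaque random token."""
    return secrets.token_bytes(16)


def broadcast(values: List[Optional[bytes]], src: int) -> List[bytes]:
    """Stand-in for a process-group broadcast: every rank receives src's value."""
    payload = values[src]
    if payload is None:
        raise ValueError("source rank must hold the value to broadcast")
    return [payload for _ in values]


def bootstrap(world_size: int) -> List[Dict[str, Union[bytes, int]]]:
    # Step 1: only rank 0 creates the id; the other ranks start with nothing.
    per_rank: List[Optional[bytes]] = [None] * world_size
    per_rank[0] = get_unique_id()
    # Step 2: the id travels over the already-initialized process group.
    shared = broadcast(per_rank, src=0)
    # Step 3: each rank builds its handle from (shared id, world size, own rank).
    return [
        {"id": shared[rank], "nranks": world_size, "rank": rank}
        for rank in range(world_size)
    ]


comms = bootstrap(world_size=4)
```

The point of the pattern is that the existing process group is used only to distribute the opaque ID; after that, each rank holds its own communicator handle.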
Symmetric Memory Management
Symmetric memory is lazily initialized on first forward pass:
- Scans all registered parameters to find maximum size
- Allocates symmetric memory workspace via c10d::symmetric_memory::empty_strided_p2p()
- Performs rendezvous across all ranks
- Used by Z3 allgather operations as alternative to NCCL
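The lazy sizing rule (scan the registry once, size the workspace to the largest parameter, then reuse it) can be sketched without any GPU code. `SymmWorkspace` and the shape dictionary are hypothetical; the real runtime allocates device memory and performs a rendezvous, which this toy omits.

```python
import math
from typing import Dict, Optional, Tuple


class SymmWorkspace:
    """Toy model of a lazily sized workspace; it tracks only an element count."""

    def __init__(self, enabled: bool) -> None:
        self.enabled = enabled
        self.numel: Optional[int] = None
        self.init_count = 0

    def ensure(self, param_shapes: Dict[int, Tuple[int, ...]]) -> Optional[int]:
        # Do nothing when the feature is off or the workspace already exists.
        if not self.enabled or self.numel is not None:
            return self.numel
        # Size the workspace to the largest registered parameter.
        self.numel = max(
            (math.prod(shape) for shape in param_shapes.values()), default=0
        )
        self.init_count += 1
        return self.numel


# Hypothetical registry contents: ds_id -> full parameter shape.
shapes = {0: (1024, 1024), 1: (4096,), 2: (2048, 1024)}

workspace = SymmWorkspace(enabled=True)
first = workspace.ensure(shapes)   # allocates on the first call
second = workspace.ensure(shapes)  # later calls reuse the same size

disabled = SymmWorkspace(enabled=False)
skipped = disabled.ensure(shapes)  # stays None when the feature is off
```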
Double Buffering
When enabled, DoubleBufferedReduceBucket maintains two buffers per data type:
- Current buffer accumulates new gradients
- Shadow buffer is used by ongoing NCCL operation
- Buffers swap after each flush, enabling overlap of computation and communication
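The swap-on-flush behaviour can be shown with a toy bucket that uses Python lists in place of device buffers. `DoubleBuffer` is a hypothetical class, the "reduction" is just a recorded copy, and the sketch assumes the previous reduction on the reclaimed buffer has already completed by the time it is reused.

```python
from typing import List


class DoubleBuffer:
    """Toy double-buffered bucket for a single data type."""

    def __init__(self) -> None:
        self.current: List[float] = []  # accumulates new gradients
        self.shadow: List[float] = []   # owned by the in-flight reduction
        self.reduced: List[List[float]] = []  # log of launched reductions

    def add(self, grad: List[float]) -> None:
        self.current.extend(grad)

    def flush(self) -> None:
        # Swap roles: the filled buffer becomes the shadow, and the old shadow
        # becomes the buffer that receives the next gradients.
        self.current, self.shadow = self.shadow, self.current
        # Assumption: the earlier reduction on this storage has finished.
        self.current.clear()
        # Stand-in for launching an asynchronous reduction on the shadow.
        self.reduced.append(list(self.shadow))


bucket = DoubleBuffer()
bucket.add([1.0, 2.0])
bucket.flush()      # [1.0, 2.0] is now in flight
bucket.add([3.0])   # accumulation continues in the other buffer
```

After the first flush, new gradients land in `current` while `shadow` still holds the data being reduced, which is what allows computation and communication to overlap.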
Profiling Mode
Setting profile=true causes executors to skip actual communication:
- Allgather returns empty tensors with correct metadata
- Reduce operations are no-ops
- Useful for graph capture, memory profiling, and dry runs
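A minimal sketch of how a profile flag can short-circuit communication while preserving output metadata is given below. `ToyExecutor` and `TensorMeta` are hypothetical; the real executors operate on tensors and NCCL streams rather than on a string log.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass
class TensorMeta:
    """Shape and dtype of a result, plus whether real data was produced."""

    shape: Tuple[int, ...]
    dtype: str
    materialized: bool


class ToyExecutor:
    def __init__(self, profile: bool) -> None:
        self.profile = profile
        self.comm_log: List[str] = []  # records communication that was issued

    def allgather(self, ds_id: int, full_shape: Tuple[int, ...], dtype: str) -> TensorMeta:
        if self.profile:
            # Skip communication but keep the metadata downstream code relies on.
            return TensorMeta(full_shape, dtype, materialized=False)
        self.comm_log.append(f"allgather:{ds_id}")
        return TensorMeta(full_shape, dtype, materialized=True)

    def reduce(self, ds_id: int) -> None:
        if self.profile:
            return  # no-op in profiling mode
        self.comm_log.append(f"reduce:{ds_id}")


dry = ToyExecutor(profile=True)
dry_out = dry.allgather(ds_id=3, full_shape=(4, 8), dtype="float16")
dry.reduce(ds_id=3)

live = ToyExecutor(profile=False)
live_out = live.allgather(ds_id=3, full_shape=(4, 8), dtype="float16")
live.reduce(ds_id=3)
```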
Activation Memory Management
free_tensors implements the "set_data trick":
1. Calls record_stream() so the caching allocator does not reuse the memory until work already queued on that stream has finished
2. Replaces the tensor's data with an empty tensor via set_data()
3. The original storage is returned to PyTorch's caching allocator once no other references to it remain
4. Only applies to CUDA tensors exceeding free_activation_threshold
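The selection logic (act only on CUDA tensors whose element count exceeds the threshold, record the stream, then drop the data) can be sketched with a stand-in tensor type. `ToyTensor` and `free_large_tensors` are hypothetical, the strict `>` comparison is an assumption, and no real memory is released here.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class ToyTensor:
    """Stand-in for a tensor: tracks element count, device, and recorded streams."""

    numel: int
    is_cuda: bool
    recorded_streams: List[str] = field(default_factory=list)

    def record_stream(self, stream: str) -> None:
        self.recorded_streams.append(stream)

    def set_data_empty(self) -> None:
        # Models swapping the tensor's data for an empty tensor.
        self.numel = 0


def free_large_tensors(tensors: List[ToyTensor], threshold: int, stream: str = "current") -> int:
    """Frees qualifying tensors in place and returns how many were freed."""
    freed = 0
    for t in tensors:
        # Assumption: strictly greater than the threshold, CUDA tensors only.
        if t.is_cuda and t.numel > threshold:
            t.record_stream(stream)  # protect the memory from early reuse
            t.set_data_empty()       # drop the reference to the old storage
            freed += 1
    return freed


small_cuda = ToyTensor(numel=10, is_cuda=True)
large_cpu = ToyTensor(numel=5_000_000, is_cuda=False)
large_cuda = ToyTensor(numel=5_000_000, is_cuda=True)
freed_count = free_large_tensors([small_cuda, large_cpu, large_cuda], threshold=1_000_000)
```

Only `large_cuda` meets both conditions in this example, so the other two tensors are left untouched.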
Related Pages
- Environment:Deepspeedai_DeepSpeed_CUDA_GPU_Environment
- Implementation:Deepspeedai_DeepSpeed_DeepCompile_Header
- Implementation:Deepspeedai_DeepSpeed_DeepCompile_Init
- Implementation:Deepspeedai_DeepSpeed_ZeRO3_DeepCompile
- Implementation:Deepspeedai_DeepSpeed_ZeRO1_DeepCompile
- Implementation:Deepspeedai_DeepSpeed_ZeRO2_DeepCompile