Implementation:Deepspeedai DeepSpeed DeepCompile Header
| Knowledge Sources | |
|---|---|
| Domains | Graph_Compilation, System_Architecture, Header_Interface |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
The DeepCompile header declares the shared infrastructure of DeepSpeed's graph compilation framework: the parameter registry, the custom-op executors, the gradient reduce buckets, and the base executor class that all ZeRO stages build on.
Description
The deepcompile.h header file provides the foundational declarations for DeepSpeed's graph compilation system. It defines:
- Core Classes: DSParam (parameter descriptor), DSParamRegistry (parameter management), ReduceTask (gradient reduction unit), ReduceBucket (gradient coalescing), DoubleBufferedReduceBucket (pipelined buckets), and CustomOpExecutor (base executor class)
- Global State: External declarations for process group, NCCL communicator, parameter registry, executor map, and configuration flags
- Utility Functions: Template helpers for map lookup (hasKey), string formatting (to_string, join_as_str), dimension products (productDim), and tensor inspection (tensorToString, tensorDimToString)
- Type Conversion: NCCL data type mapping (get_nccl_data_type), PyTorch size conversion (sizes_to_int_vector)
- Lifecycle Management: Initialization (init), cleanup (cleanup, reset), and forward/backward lifecycle hooks
- Communication Operations: Gradient reduction (reduce_grad), activation freeing (free_tensors), and their meta variants
- Memory Management: Symmetric memory workspace creation (getSymmMemWorkspace, lazy_init_symm_memory)
The header uses modern C++ features including smart pointers (std::shared_ptr, c10::intrusive_ptr), templates, and RAII patterns. It integrates with PyTorch's C++ API (ATen, c10d) and NCCL for distributed communication.
Usage
This header is included by all DeepCompile implementation files (deepcompile.cpp, z1.cpp, z2.cpp, z3.cpp, init.cpp) and serves as the central interface for the compilation framework.
Code Reference
Source Location
- Repository: DeepSpeed
- File: csrc/includes/deepcompile.h
Key Classes
```cpp
namespace dc {

// Parameter descriptor with offload/reload support
class DSParam {
public:
    DSParam(long id, std::vector<int64_t> ds_shape, at::Tensor ds_tensor,
            at::Tensor grad_buffer, bool partitioned, int64_t offset, bool persistent);

    long getId() const;
    std::vector<int64_t> getShape() const;
    at::ScalarType getDtype() const;
    at::Tensor getDSTensor() const;  // Handles offload/reload automatically
    at::Tensor getGradBuffer() const;
    bool isPartitioned() const;
    int64_t getOffset() const;       // For Z1/Z2
    bool isPersistent() const;       // For Z3
    void setPersistent(bool persistent);
    void offload();                  // Move to CPU
    void reload();                   // Move to GPU
};

// Central parameter registry
class DSParamRegistry {
public:
    void registerParam(long ds_id, const std::vector<int64_t>& ds_shape,
                       at::Tensor ds_tensor, at::Tensor grad_buffer,
                       bool partitioned, int64_t offset, bool persistent);
    void registerGatheredParam(long ds_id, at::Tensor ds_tensor);
    void unregisterGatheredParam(long ds_id);
    const DSParam& getParam(long ds_id) const;
    const at::Tensor& getGatheredParam(long ds_id) const;
    bool hasGatheredParam(long ds_id) const;
    void setPersistent(long ds_id, bool persistent);
    void offload(long ds_id);
    void reload(long ds_id);
    void setValid(long ds_id, bool valid);  // For Z3 validity tracking
    bool isValid(long ds_id) const;
};

// Gradient reduction task
class ReduceTask {
public:
    ReduceTask(long ds_id, at::Tensor grad, at::Tensor send_buf);
    long getDSId() const;
    at::Tensor getSendBuf() const;
};

// Single reduction bucket
class ReduceBucket {
public:
    ReduceBucket(int64_t size, at::ScalarType scalar_type);
    int64_t getSize() const;
    int64_t getOffset() const;
    at::Tensor getBuffer() const;
    at::ScalarType getScalarType() const;
    void reserve(int64_t size);           // Extend buffer
    at::Tensor allocate(int64_t numel);   // Allocate slice
    bool shouldFlush(int64_t numel);      // Check if full
    void reset();                         // Reset offset
};

// Double-buffered buckets for overlap
class DoubleBufferedReduceBucket {
public:
    DoubleBufferedReduceBucket(int64_t initial_bucket_size, bool enable_double_buffer);
    void swap(at::ScalarType scalar_type,
              at::cuda::CUDAStream rs_stream,
              at::cuda::CUDAStream copy_stream);
    std::shared_ptr<ReduceBucket> getBuffer(at::ScalarType scalar_type);
    std::shared_ptr<at::cuda::CUDAEvent> getEvent(at::ScalarType scalar_type);
    void clear();
};

// Base executor class for all ZeRO stages
class CustomOpExecutor {
public:
    CustomOpExecutor(c10::intrusive_ptr<c10d::ProcessGroup> process_group,
                     std::shared_ptr<DSParamRegistry> param_registry,
                     std::shared_ptr<DoubleBufferedReduceBucket> reduce_buckets,
                     std::vector<long> ds_ids,
                     ncclComm_t nccl_comm,
                     at::cuda::CUDAStream rs_stream,
                     at::cuda::CUDAStream copy_stream,
                     bool pre_div_reduce);

    virtual void startForward();
    virtual void endForward();
    virtual void startBackward(bool update);
    virtual void endBackward();
    virtual at::Tensor reduceGrad(at::Tensor grad_tensor, long ds_id);
    bool hasParam(long ds_id) const;

protected:
    virtual void flushReduceBucket(at::ScalarType scalar_type) = 0;
    void flushAllReduceBuckets();
    void blockCopyEvents(at::ScalarType scalar_type);
    void applyPreDivision(at::ScalarType scalar_type);
    ncclRedOp_t getReductionOp() const;
    void performCleanup(at::ScalarType scalar_type);

    c10::intrusive_ptr<c10d::ProcessGroup> process_group_;
    std::shared_ptr<DSParamRegistry> param_registry_;
    std::shared_ptr<DoubleBufferedReduceBucket> reduce_buckets_;
    std::vector<long> ds_ids_;
    ncclComm_t nccl_comm_;
    at::cuda::CUDAStream rs_stream_;
    at::cuda::CUDAStream copy_stream_;
    std::unordered_map<at::ScalarType, std::vector<ReduceTask>> reduce_tasks_;
    std::unordered_map<long, bool> has_acc_grad_;
    bool param_updated_;
    bool pre_div_reduce_;
};

}  // namespace dc
```
Global State
```cpp
namespace dc {

// Global resources
extern std::shared_ptr<DSParamRegistry> param_registry;
extern std::unordered_map<long, std::shared_ptr<CustomOpExecutor>> executors;
extern std::shared_ptr<DoubleBufferedReduceBucket> reduce_buckets;
extern c10::intrusive_ptr<c10d::ProcessGroup> process_group;
extern c10::intrusive_ptr<c10d::symmetric_memory::SymmetricMemory> symm_mem;
extern ncclComm_t nccl_comm;

// Configuration flags
extern bool use_symm_mem;
extern bool profile;
extern bool pre_div_reduce;
extern int64_t free_activation_threshold;

// Debug synchronization flags
extern bool sync_before_reduce;
extern bool sync_after_reduce;
extern bool sync_before_allgather;
extern bool sync_after_allgather;

}  // namespace dc
```
Utility Functions
```cpp
namespace dc {

// Map utilities
template <typename K, typename V>
static bool hasKey(const std::unordered_map<K, V>& map, const K& key);

// String formatting
template <typename T>
inline std::string to_string(const T& v);

template <typename T>
std::string join_as_str(const T& v, const char* delim = ",", const size_t maxlen = 0);

template <typename T>
std::string tensorPtrToString(T* ptr, size_t size, size_t str_len = 100);

std::string tensorPtrToString(void* ptr, size_t size, c10::ScalarType datatype,
                              size_t max_elem = 20, size_t max_str_len = 100);
std::string tensorToString(const at::Tensor& t, size_t max_elem = 20, size_t max_str_len = 100);
std::string tensorDimToString(const at::Tensor& t);

// Dimension utilities
template <typename L>
size_t productDim(const L& dim);

// Type conversion
ncclDataType_t get_nccl_data_type(at::ScalarType scalar_type);
std::vector<int64_t> sizes_to_int_vector(at::IntArrayRef sizes);

// Executor helpers
template <typename T, typename U>
std::shared_ptr<T> getExecutor(long graph_id,
                               const std::unordered_map<long, std::shared_ptr<U>>& executors);

}  // namespace dc
```
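The header only declares productDim and join_as_str. As a standalone sketch of what the two helpers are expected to compute, inferred from their names and signatures rather than copied from DeepSpeed, they might look like this. The `sketch` namespace is hypothetical, and the `maxlen` truncation parameter of the real join_as_str is left out because its exact behavior is not described on this page.

```cpp
#include <cassert>
#include <cstddef>
#include <sstream>
#include <string>
#include <vector>

namespace sketch {

// Product of all extents in a shape-like container, e.g. {1024, 768} -> 786432.
// An empty shape yields 1 (the empty product), matching a scalar's element count.
template <typename L>
size_t productDim(const L& dim)
{
    size_t prod = 1;
    for (auto d : dim) {
        assert(d >= 0 && "negative extents are not valid shapes");
        prod *= static_cast<size_t>(d);
    }
    return prod;
}

// Joins container elements with a delimiter, e.g. {1, 2, 3} -> "1,2,3".
// The maxlen parameter of the real helper is omitted in this sketch.
template <typename T>
std::string join_as_str(const T& v, const char* delim = ",")
{
    std::ostringstream ss;
    bool first = true;
    for (const auto& e : v) {
        if (!first) { ss << delim; }
        ss << e;
        first = false;
    }
    return ss.str();
}

}  // namespace sketch
```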
Import
```cpp
// Include in implementation files
#include "deepcompile.h"

// Access global state
dc::param_registry->registerParam(/*...*/);
dc::executors[graph_id] = std::make_shared<dc::Z3CustomOpExecutor>(/*...*/);

// Use utilities (graph_id is assumed to be a long, tensor an at::Tensor)
if (dc::hasKey(dc::executors, graph_id)) {
    auto executor = dc::getExecutor<dc::Z3CustomOpExecutor>(graph_id, dc::executors);
}
ncclDataType_t nccl_type = dc::get_nccl_data_type(tensor.scalar_type());
```
I/O Contract
DSParam
| Method | Returns | Description |
|---|---|---|
| getDSTensor() | at::Tensor | Returns ds_tensor_ or ds_reload_tensor_ with automatic stream sync |
| offload() | void | Copies ds_reload_tensor_ back to ds_tensor_ on CPU (async) |
| reload() | void | Copies ds_tensor_ from CPU to ds_reload_tensor_ on GPU (async) |
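To make the offload/reload contract concrete, here is a deliberately simplified, synchronous model in standard C++. It assumes a `std::vector<float>` stands in for both the host tensor (ds_tensor_) and the device copy (ds_reload_tensor_), and that offload() releases the device copy after writing it back. It ignores the CUDA streams and events that make the real copies asynchronous. The class name `ParamSketch` is hypothetical.

```cpp
#include <cassert>
#include <optional>
#include <utility>
#include <vector>

// Hypothetical stand-in for DSParam: host data plus an optional "device" copy.
class ParamSketch {
public:
    explicit ParamSketch(std::vector<float> host) : host_(std::move(host)) {}

    // Copy host data into the device copy (an async copy in the real class).
    // No-op if already reloaded, so device-side updates are not overwritten (assumed).
    void reload()
    {
        if (!device_) { device_ = host_; }
    }

    // Write the device copy back to the host and release it (assumed behavior).
    void offload()
    {
        if (device_) {
            host_ = *device_;
            device_.reset();
        }
    }

    bool isReloaded() const { return device_.has_value(); }

    // Returns the device copy when one exists, otherwise the host data,
    // mirroring how getDSTensor() chooses ds_reload_tensor_ over ds_tensor_.
    std::vector<float>& getDSTensor() { return device_ ? *device_ : host_; }

private:
    std::vector<float> host_;
    std::optional<std::vector<float>> device_;
};
```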
DSParamRegistry
| Method | Description |
|---|---|
| registerParam() | Registers parameter, zeros gradient buffer |
| registerGatheredParam() | Associates gathered tensor with parameter ID |
| unregisterGatheredParam() | Removes gathered tensor, sets valid=false |
| setValid() / isValid() | Tracks whether Z3 gathered parameter is current |
| offload() / reload() | Delegates to DSParam offload/reload methods |
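The ZeRO-3 bookkeeping in this table can be modeled with two maps keyed by ds_id. The sketch below is a simplified stand-in (the hypothetical `RegistrySketch`, with `std::vector<float>` in place of at::Tensor). It follows the table in clearing the valid flag inside unregisterGatheredParam(). Reporting unknown IDs as invalid and throwing `std::out_of_range` (via `.at()`) for a missing gathered parameter are choices made here, not documented DeepSpeed behavior.

```cpp
#include <cassert>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical stand-in for the gathered-parameter part of DSParamRegistry.
class RegistrySketch {
public:
    void registerGatheredParam(long ds_id, std::vector<float> gathered)
    {
        gathered_[ds_id] = std::move(gathered);
    }

    // Drops the gathered copy and marks the parameter as no longer current.
    void unregisterGatheredParam(long ds_id)
    {
        gathered_.erase(ds_id);
        valid_[ds_id] = false;
    }

    bool hasGatheredParam(long ds_id) const { return gathered_.count(ds_id) > 0; }

    // .at() throws std::out_of_range for an unknown ID.
    const std::vector<float>& getGatheredParam(long ds_id) const { return gathered_.at(ds_id); }

    void setValid(long ds_id, bool valid) { valid_[ds_id] = valid; }

    // Unknown IDs are reported as invalid (an assumption of this sketch).
    bool isValid(long ds_id) const
    {
        auto it = valid_.find(ds_id);
        return it != valid_.end() && it->second;
    }

private:
    std::unordered_map<long, std::vector<float>> gathered_;
    std::unordered_map<long, bool> valid_;
};
```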
ReduceBucket
| Method | Description |
|---|---|
| allocate() | Returns slice of buffer, advances offset |
| shouldFlush() | Returns true if allocating numel would exceed size |
| reset() | Sets offset to 0 |
| reserve() | Grows the buffer if the requested size exceeds the current size |
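Taken together, these four methods describe a bump allocator over one flat buffer. The sketch below models that in standard C++: the hypothetical `BucketSketch` returns an offset into a `std::vector<float>` where the real class returns an at::Tensor slice. The flush condition `offset + numel > size` is an assumption consistent with the table, not code taken from DeepSpeed.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical bump-allocator model of ReduceBucket.
class BucketSketch {
public:
    explicit BucketSketch(int64_t size) : buffer_(static_cast<size_t>(size)) {}

    int64_t getSize() const { return static_cast<int64_t>(buffer_.size()); }
    int64_t getOffset() const { return offset_; }

    // True if numel more elements would not fit in the remaining space.
    bool shouldFlush(int64_t numel) const { return offset_ + numel > getSize(); }

    // Returns the start offset of a numel-element slice and advances the offset.
    int64_t allocate(int64_t numel)
    {
        assert(!shouldFlush(numel) && "caller must flush or reserve first");
        int64_t start = offset_;
        offset_ += numel;
        return start;
    }

    // Grows the buffer when a request exceeds the current size.
    void reserve(int64_t size)
    {
        if (size > getSize()) { buffer_.resize(static_cast<size_t>(size)); }
    }

    // Makes the whole buffer available again after a flush.
    void reset() { offset_ = 0; }

    float* data() { return buffer_.data(); }

private:
    std::vector<float> buffer_;
    int64_t offset_ = 0;
};
```

In this model, a reduceGrad()-style caller checks shouldFlush(), flushes and calls reset() when needed, then allocates a slice and copies the gradient into it.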
DoubleBufferedReduceBucket
| Method | Description |
|---|---|
| getBuffer() | Returns current buffer (creates if not exists) |
| swap() | Records event on rs_stream, swaps current/shadow buffers |
| getEvent() | Returns event for current buffer |
CustomOpExecutor
| Method | Description |
|---|---|
| startForward() | Virtual hook called at start of forward pass |
| endForward() | Virtual hook called at end of forward pass |
| startBackward(update) | Sets param_updated_ flag for gradient accumulation |
| endBackward() | Flushes all reduce buckets, synchronizes rs_stream |
| reduceGrad() | Copies gradient to bucket, flushes if full |
| flushReduceBucket() | Pure virtual; implemented by the Z1/Z2/Z3 executors |
| blockCopyEvents() | Blocks rs_stream on copy completion events |
| applyPreDivision() | Divides gradients by world_size when pre_div_reduce_ is set |
| performCleanup() | Swaps buffers, blocks comp_stream on copy events |
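The effect of pre_div_reduce_ can be checked with plain arithmetic. Dividing each rank's gradient by the world size before a sum-reduction gives the same mean as summing first and dividing afterwards; the usual motivation for pre-division is that it keeps partial sums smaller, which lowers the risk of overflow in fp16. The sketch below simulates an all-reduce over in-memory vectors. The simulated ranks and the function names are hypothetical, and the sketch makes no claim about which NCCL reduction op getReductionOp() actually returns.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

using Grad = std::vector<float>;

// Simulated sum all-reduce: element-wise sum of every rank's gradient.
// Assumes at least one rank and equal gradient sizes.
Grad allReduceSum(const std::vector<Grad>& per_rank)
{
    Grad out(per_rank.at(0).size(), 0.0f);
    for (const Grad& g : per_rank) {
        for (size_t i = 0; i < g.size(); ++i) { out[i] += g[i]; }
    }
    return out;
}

// Averaged gradient, dividing either before the reduction (pre-division)
// or after it (post-division).
Grad averagedGrad(std::vector<Grad> per_rank, bool pre_div)
{
    const float world_size = static_cast<float>(per_rank.size());
    if (pre_div) {
        for (Grad& g : per_rank) {
            for (float& x : g) { x /= world_size; }
        }
    }
    Grad out = allReduceSum(per_rank);
    if (!pre_div) {
        for (float& x : out) { x /= world_size; }
    }
    return out;
}
```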
Usage Examples
```cpp
// Using DSParam with offload/reload
// (ds_tensor and grad_buffer are assumed to be existing at::Tensor objects)
dc::DSParam param(0, {1024, 768}, ds_tensor, grad_buffer,
                  /*partitioned=*/true, /*offset=*/0, /*persistent=*/false);
param.offload();                     // Async copy to CPU
// Later, reload to GPU
param.reload();                      // Async copy to GPU
// Access automatically handles streams
at::Tensor t = param.getDSTensor();  // Waits for reload if needed

// Using DSParamRegistry
auto registry = std::make_shared<dc::DSParamRegistry>();
registry->registerParam(0, {1024, 768}, ds_tensor, grad_buffer,
                        /*partitioned=*/true, /*offset=*/0, /*persistent=*/false);
const dc::DSParam& registered = registry->getParam(0);
registry->setValid(0, true);  // Mark Z3 parameter as valid

// Using ReduceBucket (grad is assumed to be an existing float32 tensor)
dc::ReduceBucket bucket(100'000'000, at::kFloat);
if (bucket.shouldFlush(grad.numel())) {
    // Flush logic: reduce the filled part of the buffer, then reuse it
    bucket.reset();
}
at::Tensor slice = bucket.allocate(grad.numel());
slice.copy_(grad);

// Using DoubleBufferedReduceBucket
auto buckets = std::make_shared<dc::DoubleBufferedReduceBucket>(100'000'000, true);
auto fp32_bucket = buckets->getBuffer(at::kFloat);
at::Tensor fp32_slice = fp32_bucket->allocate(1024);
// After launching communication on rs_stream
buckets->swap(at::kFloat, rs_stream, copy_stream);

// Using CustomOpExecutor (subclass implementation)
class MyExecutor : public dc::CustomOpExecutor {
public:
    using dc::CustomOpExecutor::CustomOpExecutor;  // Inherit the base constructor

protected:
    void flushReduceBucket(at::ScalarType scalar_type) override
    {
        blockCopyEvents(scalar_type);   // rs_stream_ waits for gradient copies
        applyPreDivision(scalar_type);  // Divide by world_size if pre_div_reduce_
        // Perform NCCL operation
        ncclAllReduce(/*...*/);
        performCleanup(scalar_type);
    }
};

// Using utility functions (graph_id is assumed to be a long, tensor an at::Tensor)
if (dc::hasKey(dc::executors, graph_id)) {
    auto executor = dc::getExecutor<dc::Z3CustomOpExecutor>(graph_id, dc::executors);
}
ncclDataType_t nccl_dtype = dc::get_nccl_data_type(at::kHalf);
std::string tensor_str = dc::tensorToString(tensor, 10, 100);
std::cout << "Tensor: " << tensor_str << std::endl;
```
Design Patterns
RAII and Smart Pointers
The header extensively uses smart pointers:
- std::shared_ptr for reference-counted resources (DSParamRegistry, executors, buckets)
- c10::intrusive_ptr for PyTorch's reference-counted types (ProcessGroup, SymmetricMemory)
- Objects are released automatically when their last owner drops its reference, so no manual delete calls are needed
Template-Based Utilities
Generic helpers are templates, so the same code works across key, value, and executor types:
```cpp
template <typename K, typename V>
static bool hasKey(const std::unordered_map<K, V>& map, const K& key);

template <typename T, typename U>
std::shared_ptr<T> getExecutor(long graph_id,
                               const std::unordered_map<long, std::shared_ptr<U>>& executors);
```
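A plausible body for these two helpers, written as a standalone sketch: hasKey is a plain lookup, and getExecutor is shown using `std::dynamic_pointer_cast` to recover the stage-specific executor type from the base pointer stored in the map. Whether DeepSpeed uses a dynamic or a static cast, and how it reports a missing graph_id, is not stated on this page; throwing `std::runtime_error` is an assumption here, and `BaseExec`/`Z3Exec` are hypothetical types.

```cpp
#include <cassert>
#include <memory>
#include <stdexcept>
#include <string>
#include <unordered_map>

namespace sketch {

template <typename K, typename V>
static bool hasKey(const std::unordered_map<K, V>& map, const K& key)
{
    return map.find(key) != map.end();
}

// Looks up the executor for graph_id and downcasts it to the requested type.
template <typename T, typename U>
std::shared_ptr<T> getExecutor(long graph_id,
                               const std::unordered_map<long, std::shared_ptr<U>>& executors)
{
    auto it = executors.find(graph_id);
    if (it == executors.end()) {
        throw std::runtime_error("no executor for graph " + std::to_string(graph_id));
    }
    auto typed = std::dynamic_pointer_cast<T>(it->second);
    if (!typed) { throw std::runtime_error("executor has unexpected type"); }
    return typed;
}

// Hypothetical executor hierarchy used to exercise the helpers.
struct BaseExec {
    virtual ~BaseExec() = default;
};
struct Z3Exec : BaseExec {
    int stage = 3;
};

}  // namespace sketch
```

Because hasKey deduces K from both arguments, the key argument must have exactly the map's key type (for example `1L` rather than `1` for a `long`-keyed map); getExecutor takes `graph_id` as a plain `long`, so it accepts an `int` without trouble.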
Virtual Functions and Inheritance
CustomOpExecutor uses virtual functions for polymorphism:
- Pure virtual flushReduceBucket() requires subclass implementation
- Virtual startForward/endForward/startBackward/endBackward allow override
- Enables Z1/Z2/Z3-specific behavior while sharing common logic
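The split between shared logic and stage-specific flushing can be illustrated with a stripped-down hierarchy. In this sketch (hypothetical names `ExecutorSketch` and `AllReduceSketch`, a `std::vector<float>` bucket, no streams or NCCL), the base class implements reduceGrad() and endBackward() and calls the pure virtual flushReduceBucket() when the next gradient would not fit, which is the pattern the I/O contract describes. The concrete stage only records the size of each bucket it would have reduced.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

class ExecutorSketch {
public:
    explicit ExecutorSketch(size_t bucket_size) : bucket_size_(bucket_size) {}
    virtual ~ExecutorSketch() = default;

    // Shared logic: flush if the gradient does not fit, then append it.
    // (An oversized gradient is simply appended here; the real ReduceBucket
    // has reserve() for that case.)
    void reduceGrad(const std::vector<float>& grad)
    {
        if (!bucket_.empty() && bucket_.size() + grad.size() > bucket_size_) { flush(); }
        bucket_.insert(bucket_.end(), grad.begin(), grad.end());
    }

    // Shared logic: push out whatever is still buffered.
    virtual void endBackward()
    {
        if (!bucket_.empty()) { flush(); }
    }

protected:
    // Stage-specific communication (all-reduce, reduce-scatter, ...).
    virtual void flushReduceBucket(const std::vector<float>& bucket) = 0;

private:
    void flush()
    {
        flushReduceBucket(bucket_);
        bucket_.clear();
    }

    size_t bucket_size_;
    std::vector<float> bucket_;
};

// Stand-in for one ZeRO stage: records the size of each flushed bucket.
class AllReduceSketch : public ExecutorSketch {
public:
    using ExecutorSketch::ExecutorSketch;
    std::vector<size_t> flushed_sizes;

protected:
    void flushReduceBucket(const std::vector<float>& bucket) override
    {
        flushed_sizes.push_back(bucket.size());
    }
};
```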
Double Buffering Pattern
DoubleBufferedReduceBucket implements double buffering:
- Current buffer accumulates new gradients
- Shadow buffer used by ongoing communication
- Swap after flush enables computation/communication overlap
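A minimal model of the current/shadow swap, keyed per dtype as in DoubleBufferedReduceBucket. It is a sketch under simplifying assumptions: a plain enum stands in for at::ScalarType, buffers are `std::vector<float>`, and the CUDA event that the real swap() records on rs_stream is omitted, so nothing here waits for in-flight communication before a buffer is reused. Clearing the new current buffer on swap is also assumed. All type names are hypothetical.

```cpp
#include <cassert>
#include <memory>
#include <unordered_map>
#include <utility>
#include <vector>

enum class DType { kFloat, kHalf };  // Stand-in for at::ScalarType

struct Buffer {
    std::vector<float> data;
};

class DoubleBufferSketch {
public:
    // Current buffer for a dtype, created on first use.
    std::shared_ptr<Buffer> getBuffer(DType t)
    {
        auto& buf = current_[t];
        if (!buf) { buf = std::make_shared<Buffer>(); }
        return buf;
    }

    // Hand the filled buffer to the "communication" side (shadow) and start
    // accumulating into the other one. The real class would first wait on
    // the event recorded for the old shadow buffer.
    void swap(DType t)
    {
        auto& cur = current_[t];
        auto& shadow = shadow_[t];
        if (!shadow) { shadow = std::make_shared<Buffer>(); }
        std::swap(cur, shadow);
        cur->data.clear();  // Reset the new current buffer (assumed behavior)
    }

    std::shared_ptr<Buffer> getShadow(DType t) { return shadow_[t]; }

private:
    std::unordered_map<DType, std::shared_ptr<Buffer>> current_;
    std::unordered_map<DType, std::shared_ptr<Buffer>> shadow_;
};
```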
Lazy Initialization
Symmetric memory is lazily initialized:
```cpp
void lazy_init_symm_memory();  // Called on first forward pass
```
This avoids the allocation overhead when symmetric memory is disabled.
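A flag-guarded sketch of the "initialize on first use, and only if enabled" idea. The page states only that lazy_init_symm_memory() is called on the first forward pass, not how repeated calls are guarded, so the guard flag, the allocation counter, and the `sketch` namespace are hypothetical. A multithreaded caller would need `std::call_once` or an atomic guard instead of a plain bool.

```cpp
#include <cassert>

namespace sketch {

bool use_symm_mem = true;      // Plays the role of dc::use_symm_mem
bool symm_mem_ready = false;   // Set once the workspace exists
int symm_mem_allocations = 0;  // Counts workspace creations

// Safe to call at the start of every forward pass: allocates at most once
// and never while the feature is disabled. Not thread-safe.
void lazy_init_symm_memory()
{
    if (!use_symm_mem || symm_mem_ready) { return; }
    ++symm_mem_allocations;  // Stand-in for creating the symmetric-memory workspace
    symm_mem_ready = true;
}

}  // namespace sketch
```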
Stream-Based Asynchrony
Communication and gradient copies run on dedicated CUDA streams:
- rs_stream for reduce operations
- copy_stream for copying gradients into the reduce buckets
- CUDA events express cross-stream dependencies without blocking the default (compute) stream
Class Relationships
CustomOpExecutor (abstract base)
├── Z1CustomOpExecutor (ZeRO-1: allreduce, gradient accumulation in map)
├── Z2CustomOpExecutor (ZeRO-2: allreduce, has_acc_grad_ tracking)
└── Z3CustomOpExecutor (ZeRO-3: allgather + reduce-scatter, offload/reload)
DSParamRegistry
└── contains: std::unordered_map<long, DSParam>
DoubleBufferedReduceBucket
├── current_buffer_: std::unordered_map<ScalarType, shared_ptr<ReduceBucket>>
└── shadow_buffer_: std::unordered_map<ScalarType, shared_ptr<ReduceBucket>>
Global State (extern in dc namespace)
├── param_registry: shared_ptr<DSParamRegistry>
├── executors: unordered_map<long, shared_ptr<CustomOpExecutor>>
└── reduce_buckets: shared_ptr<DoubleBufferedReduceBucket>
Related Pages
- Environment:Deepspeedai_DeepSpeed_CUDA_GPU_Environment
- Implementation:Deepspeedai_DeepSpeed_DeepCompile_Runtime
- Implementation:Deepspeedai_DeepSpeed_ZeRO3_DeepCompile
- Implementation:Deepspeedai_DeepSpeed_ZeRO1_DeepCompile
- Implementation:Deepspeedai_DeepSpeed_ZeRO2_DeepCompile
- Implementation:Deepspeedai_DeepSpeed_ZeRO3_API_Header
- Concept:Header_Files
- Concept:RAII
- Concept:Template_Metaprogramming