Implementation:Deepspeedai DeepSpeed DeepCompile Runtime

From Leeroopedia
Revision as of 14:45, 16 February 2026 by Admin (talk | contribs) (Auto-imported from implementations/Deepspeedai_DeepSpeed_DeepCompile_Runtime.md)

Knowledge Sources
Domains: Graph_Compilation, Distributed_Training, Runtime_System
Last Updated: 2026-02-09 00:00 GMT

Overview

DeepCompile Runtime provides the core initialization, lifecycle management, and shared infrastructure for torch.compile integration with DeepSpeed ZeRO optimizations.

Description

The DeepCompile runtime (deepcompile.cpp) serves as the central coordinator for all ZeRO stages within the graph compilation framework. It manages:

  • Initialization: Establishes an NCCL communicator from the ProcessGroup, initializes the parameter registry, and configures reduce buckets with optional double buffering
  • Global State: Maintains shared resources, including the process group, NCCL communicator, parameter registry, executor map, and optional symmetric memory workspace
  • Lifecycle Hooks: Provides start/end callbacks for the forward and backward passes that coordinate all registered graph executors
  • Configuration Management: Extracts settings from the Python config object, including symmetric memory, double buffering, the activation-freeing threshold, and debug synchronization flags
  • Memory Management: Frees activation memory for tensors larger than the configured threshold by swapping in an empty tensor via set_data()
  • Gradient Reduction: Routes gradient reduction requests to the graph executor identified by the graph ID
  • Symmetric Memory: When enabled, lazily initializes a symmetric memory workspace sized to the largest registered parameter
  • Cleanup: Provides reset (clear executors) and cleanup (destroy all resources) operations

The runtime uses a namespace-based design (dc::) and maintains global state accessible to all ZeRO stage implementations (Z1, Z2, Z3).

Usage

The runtime is initialized once per process before any model compilation, registers graphs per ZeRO stage, and coordinates execution through lifecycle callbacks invoked by the Python training loop.

Code Reference

Source Location

Signature

namespace dc {

// Global state
extern std::shared_ptr<DSParamRegistry> param_registry;
extern std::unordered_map<long, std::shared_ptr<CustomOpExecutor>> executors;
extern std::shared_ptr<DoubleBufferedReduceBucket> reduce_buckets;
extern c10::intrusive_ptr<c10d::ProcessGroup> process_group;
extern c10::intrusive_ptr<c10d::symmetric_memory::SymmetricMemory> symm_mem;
extern ncclComm_t nccl_comm;
extern bool profile;
extern bool pre_div_reduce;

// Initialization and lifecycle
void init(c10::intrusive_ptr<c10d::ProcessGroup> pg,
          pybind11::object& config,
          int64_t initial_reduce_bucket_size);

void start_forward();
void end_forward();
void start_backward(bool update);
void end_backward(long graph_id);

void reset();      // Clear executors, keep buckets
void cleanup();    // Destroy all resources

// Utility functions
void enable_profiling(bool enable);
bool is_profiling();
void lazy_init_symm_memory();
ncclDataType_t get_nccl_data_type(at::ScalarType scalar_type);

// Gradient operations
at::Tensor reduce_grad(at::Tensor grad_tensor, long graph_id, long ds_id);
void free_tensors(std::vector<at::Tensor> tensors);

}  // namespace dc

Import

import torch
import torch.distributed as dist
from deepspeed.ops import dc

# Configuration object
config = {
    "symmetric_memory": False,
    "double_buffer": True,
    "free_activation_threshold": 1024 * 1024,
    "sync_before_reduce": False,
    "sync_after_reduce": False,
    "sync_before_allgather": False,
    "sync_after_allgather": False,
}

# Initialize runtime (the default process group must exist before new_group())
dist.init_process_group("nccl")
process_group = dist.new_group()
dc.init(process_group, config, initial_reduce_bucket_size=100_000_000)

# Optional: enable profiling mode (actual communication is skipped until it is disabled again)
dc.enable_profiling(True)

# Lifecycle management in training loop
dc.start_forward()
# ... forward pass ...
dc.end_forward()

dc.start_backward(update=True)  # update=True means optimizer will step
# ... backward pass ...
dc.end_backward(graph_id=0)

# Cleanup
dc.reset()    # Clear executors but keep memory allocations
dc.cleanup()  # Full cleanup including NCCL

I/O Contract

init

| Parameter | Type | Description |
|---|---|---|
| pg | c10::intrusive_ptr<c10d::ProcessGroup> | PyTorch distributed process group |
| config | pybind11::object | Python config object with settings |
| initial_reduce_bucket_size | int64_t | Initial size for gradient reduction buckets |
| Effect | - | Creates NCCL communicator, initializes registry and buckets |

Config Fields:

| Field | Type | Description |
|---|---|---|
| symmetric_memory | bool | Enable symmetric memory for allgather |
| double_buffer | bool | Enable double buffering for reduce buckets |
| free_activation_threshold | int64_t | Minimum size (elements) to free activations |
| sync_before_reduce | bool | Synchronize device before reduce (debug) |
| sync_after_reduce | bool | Synchronize device after reduce (debug) |
| sync_before_allgather | bool | Synchronize device before allgather (debug) |
| sync_after_allgather | bool | Synchronize device after allgather (debug) |
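
The fields above can be grouped into a small typed container before they are handed to the runtime. The following is a minimal sketch, not part of DeepSpeed: the class name `DeepCompileRuntimeConfig` is hypothetical, the defaults are copied from the Import example on this page rather than from the library, and it assumes the runtime reads each field as an attribute of the config object.

```python
from dataclasses import dataclass, asdict


@dataclass(frozen=True)
class DeepCompileRuntimeConfig:
    """Hypothetical container mirroring the config fields listed above."""

    symmetric_memory: bool = False
    double_buffer: bool = True
    free_activation_threshold: int = 1024 * 1024  # measured in elements
    sync_before_reduce: bool = False
    sync_after_reduce: bool = False
    sync_before_allgather: bool = False
    sync_after_allgather: bool = False

    def __post_init__(self):
        # Reject values that cannot be a meaningful size threshold.
        if self.free_activation_threshold < 0:
            raise ValueError("free_activation_threshold must be non-negative")


cfg = DeepCompileRuntimeConfig(symmetric_memory=True)

# True if any of the four debug synchronization flags is set.
debug_sync_enabled = any(
    value for name, value in asdict(cfg).items() if name.startswith("sync_")
)
```

A frozen dataclass keeps the settings immutable after construction, which fits a runtime that is initialized once per process.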

Lifecycle Functions

| Function | Description |
|---|---|
| start_forward() | Lazy-initializes symmetric memory, calls startForward() on all executors |
| end_forward() | Calls endForward() on all executors |
| start_backward(update) | Passes the update flag to all executors; update=true means the optimizer will step after this backward pass, update=false marks a gradient-accumulation step |
| end_backward(graph_id) | Flushes reduce buckets, synchronizes reduce stream |
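
The fan-out behaviour of the lifecycle functions can be modelled in a few lines. This is a toy sketch in plain Python, not the real `dc` module: `RecordingExecutor` and `RuntimeSketch` are hypothetical names, and it assumes that `end_backward(graph_id)` targets only the executor registered under that ID.

```python
class RecordingExecutor:
    """Hypothetical stand-in for a per-graph executor; it only records calls."""

    def __init__(self):
        self.calls = []

    def start_forward(self):
        self.calls.append("start_forward")

    def end_forward(self):
        self.calls.append("end_forward")

    def start_backward(self, update):
        self.calls.append(("start_backward", update))

    def end_backward(self):
        self.calls.append("end_backward")


class RuntimeSketch:
    """Toy model of the hook fan-out over an executor map."""

    def __init__(self):
        self.executors = {}  # graph_id -> executor

    def start_forward(self):
        for executor in self.executors.values():
            executor.start_forward()

    def end_forward(self):
        for executor in self.executors.values():
            executor.end_forward()

    def start_backward(self, update):
        for executor in self.executors.values():
            executor.start_backward(update)

    def end_backward(self, graph_id):
        # Assumption: only the executor of the finished graph is notified.
        self.executors[graph_id].end_backward()


runtime = RuntimeSketch()
runtime.executors[0] = RecordingExecutor()
runtime.executors[1] = RecordingExecutor()

# Two micro-batches with gradient accumulation: only the last one updates.
accumulation_steps = 2
for micro_step in range(accumulation_steps):
    is_last = micro_step == accumulation_steps - 1
    runtime.start_forward()
    runtime.end_forward()
    runtime.start_backward(update=is_last)
    runtime.end_backward(graph_id=0)
```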

reduce_grad

| Parameter | Type | Description |
|---|---|---|
| grad_tensor | at::Tensor | Gradient tensor to reduce |
| graph_id | long | Graph executor identifier |
| ds_id | long | Parameter identifier |
| Returns | at::Tensor | Empty tensor (gradient stored in buffer) |

free_tensors

| Parameter | Type | Description |
|---|---|---|
| tensors | std::vector<at::Tensor> | Activation tensors to free |
| Effect | - | Calls record_stream() then set_data(empty) for CUDA tensors larger than free_activation_threshold |

Utility Functions

| Function | Description |
|---|---|
| enable_profiling(enable) | Sets profile flag; when true, skips actual communication |
| is_profiling() | Returns current profiling state |
| lazy_init_symm_memory() | Creates symmetric memory workspace on first forward pass |
| reset() | Clears executor map, preserves bucket allocations |
| cleanup() | Destroys NCCL communicator, clears all state |

Usage Examples

import torch
import torch.distributed as dist
from deepspeed.ops import dc
from deepspeed.compile import register_graph_z3, register_z3_param

# Initialize distributed
dist.init_process_group("nccl")

# Configure DeepCompile
config = {
    "symmetric_memory": True,  # Use symmetric memory for allgather
    "double_buffer": True,     # Pipeline reduce communication
    "free_activation_threshold": 10_000_000,  # Free activations > 10M elements
    "sync_before_reduce": False,
    "sync_after_reduce": False,
    "sync_before_allgather": False,
    "sync_after_allgather": False,
}

# Initialize runtime with process group
pg = dist.new_group()
dc.init(pg, config, initial_reduce_bucket_size=100_000_000)

# Register model parameters (ZeRO-3 example)
model_params = []
for i, (param, grad_buf) in enumerate(zip(model.parameters(), grad_buffers)):
    register_z3_param(
        ds_id=i,
        ds_shape=param.shape,
        ds_tensor=param_shards[i],
        grad_buffer=grad_buf,
        persistent=False
    )
    model_params.append(i)

# Register compiled graph
register_graph_z3(graph_id=0, ds_ids=model_params)

# Training loop with lifecycle management
for epoch in range(num_epochs):
    for batch in dataloader:
        # Forward pass lifecycle
        dc.start_forward()
        outputs = model(batch)
        dc.end_forward()

        # Backward pass lifecycle
        loss = criterion(outputs, batch.labels)
        dc.start_backward(update=True)  # update=True for optimizer step
        loss.backward()
        dc.end_backward(graph_id=0)

        # Optimizer step
        optimizer.step()
        optimizer.zero_grad()

# Profiling mode (dry run without communication)
dc.enable_profiling(True)
dc.start_forward()
_ = model(sample_batch)  # Profile without actual allgather
dc.end_forward()
dc.enable_profiling(False)

# Free large activations manually
activations = [act1, act2, act3]  # Large intermediate tensors
dc.free_tensors(activations)  # Frees if size > threshold

# Cleanup options
dc.reset()    # Clear graph executors (e.g., before recompilation)
dc.cleanup()  # Full teardown (end of training)

Implementation Details

NCCL Communicator Initialization

The init function creates an NCCL communicator from the ProcessGroup:

  1. Rank 0 generates an ncclUniqueId via ncclGetUniqueId()
  2. The ID is broadcast to all ranks using ProcessGroup::broadcast()
  3. Each rank calls ncclCommInitRank() with the shared ID

This approach allows direct NCCL calls on specific streams while remaining compatible with the ProcessGroup.
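
The three-step handshake can be simulated without NCCL to show the data flow. The sketch below runs all ranks in a single process. The helpers `generate_unique_id`, `broadcast_from_root`, and `init_rank` are hypothetical stand-ins for ncclGetUniqueId(), ProcessGroup::broadcast(), and ncclCommInitRank(), and they do not call those APIs.

```python
import os


def generate_unique_id() -> bytes:
    """Stand-in for ncclGetUniqueId(): an opaque token created on one rank."""
    return os.urandom(128)


def broadcast_from_root(values, root=0):
    """Stand-in for a ProcessGroup broadcast: every rank receives root's value."""
    return [values[root] for _ in values]


def init_rank(unique_id, world_size, rank):
    """Stand-in for ncclCommInitRank(): returns a record describing the communicator."""
    return {"id": unique_id, "world_size": world_size, "rank": rank}


world_size = 4

# Step 1: only rank 0 holds a real ID; the other ranks start with a placeholder.
per_rank_id = [generate_unique_id() if rank == 0 else b"" for rank in range(world_size)]

# Step 2: the ID travels over the existing process group.
per_rank_id = broadcast_from_root(per_rank_id, root=0)

# Step 3: every rank joins the new communicator using the same ID.
comms = [init_rank(per_rank_id[rank], world_size, rank) for rank in range(world_size)]
```

The existing process group is needed only to distribute the ID. After step 3 the communicator can be used independently of it.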

Symmetric Memory Management

Symmetric memory is lazily initialized on the first forward pass:

  • Scans all registered parameters to find the maximum size
  • Allocates the symmetric memory workspace via c10d::symmetric_memory::empty_strided_p2p()
  • Performs a rendezvous across all ranks
  • Used by Z3 allgather operations as an alternative to NCCL
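
The "allocate once, sized to the largest parameter" pattern can be sketched as follows. `LazyWorkspace` is a hypothetical plain-Python model. It uses a bytearray with one byte per element in place of a real symmetric memory buffer and omits the cross-rank rendezvous entirely.

```python
from math import prod


class LazyWorkspace:
    """Toy model of lazy, size-to-largest workspace allocation."""

    def __init__(self, enabled):
        self.enabled = enabled
        self.param_shapes = {}  # ds_id -> full parameter shape
        self.buffer = None
        self.alloc_count = 0

    def register_param(self, ds_id, shape):
        self.param_shapes[ds_id] = tuple(shape)

    def lazy_init(self):
        # No-op when disabled, already initialized, or nothing is registered.
        if not self.enabled or self.buffer is not None or not self.param_shapes:
            return
        max_numel = max(prod(shape) for shape in self.param_shapes.values())
        self.buffer = bytearray(max_numel)  # illustration only: 1 byte per element
        self.alloc_count += 1


ws = LazyWorkspace(enabled=True)
ws.register_param(0, (1024, 512))
ws.register_param(1, (4096,))
ws.register_param(2, (256, 256))

ws.lazy_init()  # first forward pass: allocates
ws.lazy_init()  # later forward passes: no-op
```

Deferring the allocation until the first forward pass means every parameter has already been registered, so the workspace never has to grow.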

Double Buffering

When enabled, DoubleBufferedReduceBucket maintains two buffers per data type:

  • Current buffer accumulates new gradients
  • Shadow buffer is used by ongoing NCCL operation
  • Buffers swap after each flush, enabling overlap of computation and communication
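
The buffer swap can be illustrated with a small model that tracks gradient sizes instead of real tensors. `DoubleBufferSketch` is a hypothetical name, and a real implementation would wait for the previous reduction to finish before reusing its buffer, which this sketch only notes in a comment.

```python
class DoubleBufferSketch:
    """Toy double-buffered bucket that stores gradient sizes, not tensors."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.current = []  # accumulates new gradients
        self.shadow = []   # owned by the in-flight reduction
        self.flushed = []  # record of each batch handed to communication

    def add(self, grad_numel):
        # Flush first if the incoming gradient would overflow the bucket.
        if sum(self.current) + grad_numel > self.capacity:
            self.flush()
        self.current.append(grad_numel)

    def flush(self):
        if not self.current:
            return
        # Swap roles: the filled buffer goes to communication, and the old
        # shadow buffer (whose reduction is assumed complete) is reused.
        self.current, self.shadow = self.shadow, self.current
        self.flushed.append(list(self.shadow))
        self.current.clear()


bucket = DoubleBufferSketch(capacity=100)
for numel in [40, 50, 30, 80]:
    bucket.add(numel)
bucket.flush()  # final flush at the end of the backward pass
```

With a single buffer, `add` would have to block until the previous reduction released it. The second buffer lets accumulation continue while that reduction is still in flight.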

Profiling Mode

Setting profile=true causes executors to skip actual communication:

  • Allgather returns empty tensors with correct metadata
  • Reduce operations are no-ops
  • Useful for graph capture, memory profiling, and dry runs

Activation Memory Management

free_tensors implements the "set_data trick":

  1. Calls record_stream() so that the caching allocator does not reuse the memory until work queued on the recorded stream has finished
  2. Replaces the tensor's data with an empty tensor via set_data()
  3. The original storage is returned to PyTorch's caching allocator once no other references to it remain
  4. Applies only to CUDA tensors exceeding free_activation_threshold
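
The selection rule (CUDA tensors only, strictly larger than the threshold) can be sketched without PyTorch. `FakeTensor` and `free_tensors_sketch` are hypothetical. Setting `numel` to zero stands in for the record_stream() plus set_data() sequence, and treating "exceeding" as a strict comparison is an assumption.

```python
from dataclasses import dataclass


@dataclass
class FakeTensor:
    """Hypothetical stand-in for a tensor: element count and device only."""

    numel: int
    device: str
    freed: bool = False


def free_tensors_sketch(tensors, threshold):
    """Release every CUDA tensor strictly larger than threshold; return the count."""
    freed = 0
    for tensor in tensors:
        if tensor.device == "cuda" and tensor.numel > threshold:
            # Stands in for record_stream() followed by set_data(empty).
            tensor.numel = 0
            tensor.freed = True
            freed += 1
    return freed


activations = [
    FakeTensor(numel=2_000_000, device="cuda"),  # large CUDA tensor: freed
    FakeTensor(numel=10, device="cuda"),         # below threshold: kept
    FakeTensor(numel=5_000_000, device="cpu"),   # not on CUDA: kept
]
freed_count = free_tensors_sketch(activations, threshold=1024 * 1024)
```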
