Implementation: deepspeedai/DeepSpeed SHM Allreduce
| Knowledge Sources | |
|---|---|
| Domains | Communication, Distributed_Computing, CPU_Optimization, SIMD |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
A low-latency, shared-memory-based allreduce implementation, optimized with SIMD instructions for CPU-based distributed inference.
Description
The SHM Allreduce provides a high-performance, low-latency alternative to traditional network-based collective communication by using POSIX shared memory (shm_open/mmap) for inter-process communication when all ranks reside on the same physical machine. It implements two complementary allreduce algorithms: symmetric naive allreduce for small tensors (< 1MB) and distributed naive reduce-scatter/allgather for large tensors (>= 1MB). Both algorithms are heavily optimized with SIMD vectorization using either AVX-512 instructions (x86_64) or RISC-V Vector extensions (riscv64).
The symmetric naive algorithm uses a copy-reduce pattern with double buffering and a three-state FSM (finite state machine) to handle overlapping iterations. Each rank copies data to its own SHM buffer, waits for all ranks to complete copying, then performs local reduction across all buffers. The distributed naive algorithm implements a reduce-scatter followed by allgather pattern, where each rank reduces its assigned slice of the data, then all ranks copy the reduced slices back to their local memory.
The implementation supports BFloat16, Float16, and Float32 data types with specialized reduction kernels that convert to FP32 for accumulation to maintain numerical accuracy. It uses OpenMP parallelization for large reductions and handles non-aligned remainder elements with scalar fallback. Double buffering allows leading ranks to proceed to the next iteration while lagging ranks complete the current one, maximizing throughput.
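The accuracy-preserving accumulation described above can be sketched in pure Python (an illustrative model of the convert-to-FP32 pattern, not the actual AVX-512 kernel): each BF16 element is widened to FP32, summed across all rank buffers, and narrowed back to BF16 only once at the end.

```python
import struct

def bf16_to_fp32(bits: int) -> float:
    # BF16 is the upper 16 bits of an IEEE-754 float32
    return struct.unpack('<f', struct.pack('<I', bits << 16))[0]

def fp32_to_bf16(value: float) -> int:
    # Truncate the float32 mantissa (the real kernel may round differently)
    return struct.unpack('<I', struct.pack('<f', value))[0] >> 16

def reduce_bf16(buffers):
    """Sum corresponding BF16 elements across all rank buffers,
    accumulating in FP32 as the SHM reduction kernels do."""
    out = []
    for elems in zip(*buffers):
        acc = 0.0
        for bits in elems:
            acc += bf16_to_fp32(bits)   # widen before accumulating
        out.append(fp32_to_bf16(acc))   # narrow once at the end
    return out

one = fp32_to_bf16(1.0)    # 0x3F80
half = fp32_to_bf16(0.5)   # 0x3F00
reduced = reduce_bf16([[one, half], [one, half]])
print([bf16_to_fp32(b) for b in reduced])  # [2.0, 1.0]
```

Accumulating in FP32 avoids the precision loss that would build up if intermediate sums were repeatedly rounded back to the 8-bit BF16 mantissa.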
Usage
Use SHM Allreduce for low-latency inference workloads when all distributed ranks are co-located on the same physical node (LOCAL_SIZE == world_size). It is automatically selected by the CCL backend's inference_all_reduce when these conditions are met, requiring no explicit API calls.
Code Reference
Source Location
- Repository: DeepSpeed
- File: csrc/cpu/comm/shm.cpp
Signature
```cpp
// Initialization
void shm_initialize(int size, int rank, char* addr_string, char* port_string);

// Main entry point
void all_reduce_outer_loop(torch::Tensor& data, size_t numel, int data_size);

// Algorithm implementations
void symmetric_naive_all_reduce(char* data_ptr, c10::ScalarType scalar_type,
                                size_t chunk_size, size_t chunk_el);
void distributed_naive_reduce(char* data_ptr, c10::ScalarType scalar_type,
                              size_t chunk_size, size_t chunk_el);

// SIMD-optimized reduction kernels
void reduce_bf16_buffers(int start_elements, int num_elements,
                         char* to_buffer, char** buffers);
void reduce_fp16_buffers(int start_elements, int num_elements,
                         char* to_buffer, char** buffers);
void reduce_fp32_buffers(int start_elements, int num_elements,
                         char* to_buffer, char** buffers);

// Utility functions
void parallel_memcpy(void* to, void* from, size_t n_bytes);
void reduce_all_buffers(int start_elements, int num_elements,
                        c10::ScalarType scalar_type, int to_buffer_idx,
                        char* to_buffer, char** buffers);
```
Import
```python
# SHM allreduce is used automatically by the CCL backend
# when the LOCAL_SIZE environment variable equals the world size
import os
import torch
import deepspeed

world_size = 8  # number of ranks; all must be on the same machine
os.environ['LOCAL_SIZE'] = str(world_size)
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '29500'

# Initialize DeepSpeed with the CCL backend
deepspeed.init_distributed(dist_backend='ccl')

# SHM allreduce is automatically selected for inference
tensor = torch.randn(1024, 1024, dtype=torch.bfloat16)
deepspeed.comm.inference_all_reduce(tensor)  # uses SHM under the hood
```
I/O Contract
shm_initialize:

| Parameter | Type | Description |
|---|---|---|
| size | int | Total number of local ranks |
| rank | int | Current rank identifier |
| addr_string | char* | Master address (from MASTER_ADDR) |
| port_string | char* | Master port (from MASTER_PORT) |
| Effect | — | Creates shared memory buffers for all ranks |
all_reduce_outer_loop:

| Parameter | Type | Description |
|---|---|---|
| data | torch::Tensor | Input/output tensor (reduced in place) |
| numel | size_t | Number of elements in the tensor |
| data_size | int | Total size in bytes |
| Effect | — | Processes the tensor in chunks of up to 32MB |
Algorithm selection:

| Chunk Size | Algorithm | Buffer |
|---|---|---|
| < 1MB | symmetric_naive_all_reduce | 2 x 1MB buffers |
| >= 1MB | distributed_naive_reduce | 2 x 32MB buffers |
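The chunked dispatch in the table above can be modeled as follows (the returned plan and its labels are illustrative; the real loop lives in `all_reduce_outer_loop` in shm.cpp):

```python
MAX_CHUNK = 32 * 1024 * 1024   # 32MB distributed buffer size
THRESHOLD = 1 * 1024 * 1024    # 1MB algorithm switch point

def plan_outer_loop(data_size: int):
    """Walk the tensor in chunks of at most 32MB and record which
    algorithm each chunk would use (a model of the dispatch logic,
    not the actual C++ implementation)."""
    plan = []
    offset = 0
    while offset < data_size:
        chunk = min(MAX_CHUNK, data_size - offset)
        algo = ('symmetric_naive_all_reduce' if chunk < THRESHOLD
                else 'distributed_naive_reduce')
        plan.append((offset, chunk, algo))
        offset += chunk
    return plan

# A 32.5MB payload: one full 32MB chunk, then a 512KB remainder
plan = plan_outer_loop(32 * 1024 * 1024 + 512 * 1024)
for offset, chunk, algo in plan:
    print(offset, chunk, algo)
```

Note that a single call can mix both algorithms: the full-size chunks go through the distributed path while a sub-1MB tail falls back to the symmetric path.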
Shared memory buffer layout:

| Offset | Size | Purpose |
|---|---|---|
| 0 | 2 x 1MB | Double buffer for symmetric allreduce |
| 2MB | 2 x 32MB | Double buffer for distributed allreduce |
| after buffers | 2 ints | FSM states for synchronization |
Supported data types:

| Type | Element Size | Conversion |
|---|---|---|
| BFloat16 | 2 bytes | Converted to FP32 for reduction |
| Half (FP16) | 2 bytes | Converted to FP32 for reduction |
| Float | 4 bytes | Native FP32 reduction |
Usage Examples
```python
import os
import torch
import deepspeed

# Setup for a local multi-process run (all ranks on one machine)
world_size = 8
rank = int(os.environ.get('RANK', 0))  # per-process rank, e.g. set by the launcher
os.environ['LOCAL_SIZE'] = str(world_size)
os.environ['MASTER_ADDR'] = '127.0.0.1'
os.environ['MASTER_PORT'] = '29500'

# Initialize DeepSpeed
deepspeed.init_distributed(dist_backend='ccl', rank=rank, world_size=world_size)

# Example 1: small tensor (uses symmetric_naive_all_reduce)
small_tensor = torch.randn(256, 256, dtype=torch.bfloat16)  # 128KB
deepspeed.comm.inference_all_reduce(small_tensor)

# Example 2: large tensor (uses distributed_naive_reduce)
large_tensor = torch.randn(2048, 2048, dtype=torch.float32)  # 16MB
deepspeed.comm.inference_all_reduce(large_tensor)

# Example 3: repeated calls benefit from double buffering
for step in range(100):
    gradient = torch.randn(1024, 1024, dtype=torch.bfloat16)
    # While lagging ranks finish reducing this tensor, leading ranks
    # can already start copying the next one into the other buffer
    deepspeed.comm.inference_all_reduce(gradient)

# Example 4: mixed precision
bf16_activations = torch.randn(512, 512, dtype=torch.bfloat16)
fp32_weights = torch.randn(512, 512, dtype=torch.float32)
deepspeed.comm.inference_all_reduce(bf16_activations)  # BF16 reduction
deepspeed.comm.inference_all_reduce(fp32_weights)      # FP32 reduction
```
Implementation Details
Symmetric Naive Algorithm (< 1MB)
- Copy Phase: Each rank copies input data to its SHM buffer
- Synchronization: Wait for all ranks to complete copying (FSM state check)
- Reduce Phase: Each rank independently reduces across all SHM buffers
- Double Buffer: Alternate between two buffers to allow overlap
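The copy/sync/reduce steps above can be sketched as a single-process model (synchronization and double buffering elided; plain lists stand in for the SHM buffers):

```python
def symmetric_naive_all_reduce(rank_inputs):
    """Model of the symmetric algorithm: every rank publishes its data
    to its own buffer, then each rank independently sums across all
    published buffers, so all ranks end with the same result."""
    nranks = len(rank_inputs)
    # Copy phase: each rank writes its tensor into its own SHM buffer
    shm = [list(data) for data in rank_inputs]
    # (In the real code a barrier via FSM states goes here.)
    # Reduce phase: every rank computes the full sum locally
    return [[sum(vals) for vals in zip(*shm)] for _ in range(nranks)]

out = symmetric_naive_all_reduce([[1, 2], [3, 4], [5, 6]])
print(out[0])  # [9, 12] on every rank
```

Every rank redundantly reduces the whole tensor, which is wasteful for large data but minimizes synchronization rounds, hence the sub-1MB cutoff.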
Distributed Naive Algorithm (>= 1MB)
- Copy Phase: Each rank copies entire input to its SHM buffer
- Reduce-Scatter: Each rank reduces its assigned slice across all buffers
- Synchronization: Wait for all ranks to complete reduction
- AllGather: Each rank copies all reduced slices back to local memory
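The reduce-scatter/allgather steps above can be sketched the same way (slice assignment is illustrative; the real slicing in shm.cpp may differ in rounding):

```python
def distributed_naive_reduce(rank_inputs):
    """Model of reduce-scatter + allgather: rank r reduces only slice r
    across all buffers, then every rank gathers all reduced slices."""
    nranks = len(rank_inputs)
    n = len(rank_inputs[0])
    # Slice boundaries: rank r owns elements [bounds[r], bounds[r+1])
    bounds = [r * n // nranks for r in range(nranks + 1)]
    # Reduce-scatter: each rank sums only its own slice
    slices = []
    for r in range(nranks):
        lo, hi = bounds[r], bounds[r + 1]
        slices.append([sum(buf[i] for buf in rank_inputs)
                       for i in range(lo, hi)])
    # Allgather: every rank concatenates all reduced slices
    full = [x for s in slices for x in s]
    return [list(full) for _ in range(nranks)]

out = distributed_naive_reduce([[1, 2, 3, 4], [10, 20, 30, 40]])
print(out[0])  # [11, 22, 33, 44] on every rank
```

Unlike the symmetric algorithm, each element is reduced exactly once, by the rank that owns its slice, which is what makes this path scale to large tensors.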
SIMD Optimization
- AVX-512 (x86_64): Processes 32 bytes per iteration (16 BF16/FP16, 8 FP32)
- RISC-V Vector: Dynamic vector length set via vsetvl instruction
- Unrolled Loops: Switch statement unrolls up to 16 ranks
- Remainder Handling: Scalar fallback for non-aligned elements
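The main-loop/remainder split can be modeled in Python (groups of `VEC_WIDTH` elements stand in for SIMD lanes; the width shown matches the 16 2-byte elements per AVX-512 iteration cited above):

```python
VEC_WIDTH = 16  # elements per "vector" iteration for 2-byte types

def reduce_with_remainder(a, b):
    """Model of the vectorized body plus scalar tail: full groups of
    VEC_WIDTH elements are processed together, and the non-aligned
    remainder is handled element by element, as the scalar fallback does."""
    n = len(a)
    main = n - n % VEC_WIDTH
    out = [0] * n
    for i in range(0, main, VEC_WIDTH):           # "vectorized" main loop
        out[i:i + VEC_WIDTH] = [x + y for x, y in
                                zip(a[i:i + VEC_WIDTH], b[i:i + VEC_WIDTH])]
    for i in range(main, n):                      # scalar remainder
        out[i] = a[i] + b[i]
    return out

print(reduce_with_remainder(list(range(20)), [1] * 20))
```

Here 20 elements split into one 16-wide group plus a 4-element scalar tail, mirroring how the kernels avoid reading past the buffer end.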
Synchronization Mechanism
- Three-State FSM: Distinguishes between lagging, syncing, and leading ranks
- Lock-Free: Uses atomic memory fences (std::memory_order_release)
- Spin Waiting: Busy-wait on volatile state pointers for low latency
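The spin-wait handshake can be sketched with threads standing in for ranks (state values and the single-buffer flow are illustrative; the real code uses a three-state FSM over two buffers with memory fences):

```python
import threading
import time

NRANKS = 4
states = [0] * NRANKS           # stand-in for the per-rank SHM state word
shm = [None] * NRANKS           # stand-in for the per-rank SHM buffers
results = [None] * NRANKS

def rank_worker(rank, data):
    # Copy phase: publish data, then advance this rank's state
    shm[rank] = list(data)
    states[rank] = 1            # "copy done" (state encoding is hypothetical)
    # Spin-wait until every rank has published, like the busy-wait
    # on volatile state pointers in the real implementation
    while any(s < 1 for s in states):
        time.sleep(0)           # yield instead of a hard spin
    # Reduce phase: sum across all published buffers
    results[rank] = [sum(v) for v in zip(*shm)]

threads = [threading.Thread(target=rank_worker, args=(r, [r + 1] * 2))
           for r in range(NRANKS)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(results[0])  # [10, 10]  (1+2+3+4 per element)
```

Spinning instead of sleeping on a condition variable trades CPU cycles for latency, which is the right trade for the microsecond-scale waits this path targets.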
Memory Layout
Shared Memory Region per Rank:
```text
+---------------------------+
| states[2]                 |  FSM states (8 bytes)
+---------------------------+
| buffer[0] (1MB)           |  Symmetric buffer 0
+---------------------------+
| buffer[1] (1MB)           |  Symmetric buffer 1
+---------------------------+
| buffer[2] (32MB)          |  Distributed buffer 0
+---------------------------+
| buffer[3] (32MB)          |  Distributed buffer 1
+---------------------------+
```

Total: ~66MB per rank
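The per-rank footprint can be checked by accumulating the field sizes (field order follows the diagram above; exact offsets and padding in shm.cpp may differ):

```python
# Sizes from the memory layout diagram
STATE_BYTES = 8                    # states[2], two 4-byte ints
SYM_BUF = 1 * 1024 * 1024          # 1MB symmetric buffer
DIST_BUF = 32 * 1024 * 1024        # 32MB distributed buffer

offsets = {}
cursor = 0
for name, size in [('states', STATE_BYTES),
                   ('sym0', SYM_BUF), ('sym1', SYM_BUF),
                   ('dist0', DIST_BUF), ('dist1', DIST_BUF)]:
    offsets[name] = cursor
    cursor += size

total_mb = cursor / (1024 * 1024)
print(offsets['dist0'], round(total_mb, 2))  # byte offset of the first 32MB buffer, ~66MB total
```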
Performance Considerations
- Threshold: 1MB threshold balances copy overhead vs. distributed reduction
- Parallelization: OpenMP parallel for loops in reduction kernels
- Cache Efficiency: Each rank works on its local slice in distributed mode
- NUMA Awareness: Shared memory allocated on local NUMA node