Implementation: deepspeedai/DeepSpeed CCL Backend
| Knowledge Sources | |
|---|---|
| Domains | Communication, Distributed_Computing, CPU_Optimization |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
OneCCL-based communication backend for CPU-based distributed training, with an optimized shared-memory allreduce for the case where all ranks run on the same node.
Description
The CCL Backend provides a complete communication layer built on Intel's OneCCL (Collective Communications Library) for CPU-based distributed training in DeepSpeed. It implements standard collective operations (broadcast, all_reduce, barrier) while integrating with a specialized shared memory (SHM) based allreduce implementation for low-latency communication when all ranks reside on the same physical machine. The backend handles KVS (Key-Value Store) initialization for rank coordination, supports sub-communicator creation for process groups, and provides operation caching to optimize repeated communications. For inference workloads, it automatically selects between OneCCL and SHM implementations based on data size and topology, with SHM being used for BFloat16 and Float32 tensors when LOCAL_SIZE equals world size.
The implementation includes both standard CCL operations and an optimized inference path that falls back to the SHM-based allreduce for low-latency scenarios. It supports datatype mapping for int32, int64, float32, float64, bfloat16, and float16, along with reduction operations (sum, min, max, product). The backend is exposed to Python via PyBind11 and integrates seamlessly with DeepSpeed's communication abstraction layer.
Usage
Use the CCL Backend when performing CPU-based distributed training with Intel hardware, especially when ranks are co-located on the same physical machine for maximum performance through shared memory optimization. It is automatically selected when DeepSpeed is configured for CPU operations with OneCCL installed.
Code Reference
Source Location
- Repository: DeepSpeed
- File: csrc/cpu/comm/ccl.cpp
Signature
// Initialization
void initialize(int size, int rank, torch::Tensor& kvs_data);
std::vector<uint8_t> get_kvs_addr(int rank);
// Communicator management
void initialize_sub_comm(int size, int rank, torch::Tensor& kvs_data, std::vector<int> ranks);
// Collective operations
void broadcast(torch::Tensor& data, int src, std::vector<int> group, bool async_op);
void all_reduce(torch::Tensor& data, py::object op, std::vector<int> group, bool async_op);
void all_reduce_caching(torch::Tensor& data, py::object op, std::string match_id,
std::vector<int> group, bool async_op);
void inference_all_reduce(torch::Tensor& data, py::object op);
void barrier(std::vector<int> group, bool async_op);
// Utility functions
int get_rank(int group = 0);
int get_world_size(int group = 0);
ccl::datatype get_ccl_datatype(c10::ScalarType type);
ccl::reduction get_ccl_reduce_op(py::object op, at::Tensor& input);
Import
# The CCL backend is loaded as a Python extension module
import deepspeed.comm
# Initialize the CCL backend (typically done internally by DeepSpeed)
# Requires OneCCL to be installed and configured
deepspeed.init_distributed(dist_backend='ccl')
# Use standard DeepSpeed communication operations
# which will use the CCL backend on CPU
deepspeed.comm.broadcast(tensor, src=0)
deepspeed.comm.all_reduce(tensor)
I/O Contract
initialize / initialize_sub_comm
| Parameter | Type | Description |
|---|---|---|
| size | int | Total number of ranks in the communicator |
| rank | int | Current rank identifier (0 to size-1) |
| kvs_data | torch::Tensor | KVS address data from rank 0 |
broadcast
| Parameter | Type | Description |
|---|---|---|
| data | torch::Tensor | Tensor to broadcast (in-place) |
| src | int | Source rank for broadcast |
| group | std::vector<int> | Process group ranks |
| async_op | bool | Asynchronous operation flag (currently unused) |
all_reduce / all_reduce_caching
| Parameter | Type | Description |
|---|---|---|
| data | torch::Tensor | Input/output tensor for reduction (in-place) |
| op | py::object | Reduction operation (SUM, MIN, MAX, PRODUCT) |
| group | std::vector<int> | Process group ranks |
| async_op | bool | Asynchronous operation flag (currently unused) |
inference_all_reduce
| Parameter | Type | Description |
|---|---|---|
| data | torch::Tensor | Input/output tensor (BFloat16/Float32) |
| op | py::object | Reduction operation (must be SUM) |
| Returns | void | Uses the SHM optimization when all ranks are local |
Datatype Mapping
| C10 Type | CCL Type | Element Size |
|---|---|---|
| Int | int32 | 4 bytes |
| Long | int64 | 8 bytes |
| Float | float32 | 4 bytes |
| Double | float64 | 8 bytes |
| BFloat16 | bfloat16 | 2 bytes |
| Half | float16 | 2 bytes |
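The mapping above can be mirrored in plain Python; this is an illustrative sketch (the names `C10_TO_CCL` and `message_bytes` are hypothetical, not symbols from ccl.cpp) that makes it easy to compute the wire size of a collective.

```python
# Pure-Python mirror of the dtype table above.
# Keys are c10 scalar-type names; values are (CCL datatype, element size in bytes).
C10_TO_CCL = {
    "Int": ("int32", 4),
    "Long": ("int64", 8),
    "Float": ("float32", 4),
    "Double": ("float64", 8),
    "BFloat16": ("bfloat16", 2),
    "Half": ("float16", 2),
}

def message_bytes(c10_type: str, numel: int) -> int:
    """Size in bytes of a collective payload with `numel` elements of the given type."""
    _, elem_size = C10_TO_CCL[c10_type]
    return numel * elem_size
```

For example, a 1024-element BFloat16 tensor produces a 2 KiB payload, half the size of the same tensor in Float32.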
Usage Examples
import torch
import deepspeed
# Initialize DeepSpeed with CCL backend
deepspeed.init_distributed(dist_backend='ccl')
# Create a tensor on CPU
tensor = torch.randn(1024, 1024, dtype=torch.float32)
# Broadcast from rank 0 to all ranks
deepspeed.comm.broadcast(tensor, src=0)
# All-reduce with sum operation (in-place)
deepspeed.comm.all_reduce(tensor, op=deepspeed.comm.ReduceOp.SUM)
# For inference workloads, optimized allreduce is used automatically
# when all ranks are on the same machine (LOCAL_SIZE == world_size)
# This uses shared memory for BFloat16/Float32 tensors
model_output = torch.randn(512, 512, dtype=torch.bfloat16)
deepspeed.comm.inference_all_reduce(model_output)
# All-reduce with caching for repeated operations
# Use match_id to cache communication patterns
for step in range(100):
    deepspeed.comm.all_reduce_caching(
        tensor,
        op=deepspeed.comm.ReduceOp.SUM,
        match_id="my_gradient_tensor"
    )
# Synchronization barrier
deepspeed.comm.barrier()
Implementation Details
Automatic SHM Selection
The CCL backend automatically uses shared memory optimization when:
- LOCAL_SIZE environment variable equals world_size (all ranks co-located)
- Data type is BFloat16 or Float32
- Called via inference_all_reduce()
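The selection rule above can be expressed as a small predicate. This is a hedged sketch, not the actual C++ dispatch code: `select_allreduce_path` is a hypothetical name, and the real backend also considers data size when choosing a path.

```python
import os

def select_allreduce_path(dtype: str, world_size: int) -> str:
    """Pick the SHM path when all ranks are co-located and the dtype is supported.

    Mirrors the conditions listed above: LOCAL_SIZE == world_size and
    dtype in {bfloat16, float32}; otherwise fall back to OneCCL.
    """
    local_size = int(os.environ.get("LOCAL_SIZE", "0"))
    all_ranks_local = local_size == world_size
    shm_supported_dtype = dtype in ("bfloat16", "float32")
    return "shm" if (all_ranks_local and shm_supported_dtype) else "oneccl"
```

Note that a Float16 tensor or a multi-node launch (LOCAL_SIZE < world_size) both route to the regular OneCCL path.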
KVS Initialization
The Key-Value Store (KVS) coordination mechanism:
- Rank 0 creates main KVS and broadcasts address
- Other ranks receive KVS address and connect
- Used for communicator creation and rank coordination
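The handshake above can be sketched with a plain dict standing in for the out-of-band exchange (in the real backend the address travels through the `kvs_data` tensor passed to `initialize`). `kvs_handshake` and the address bytes are illustrative, not actual symbols.

```python
def kvs_handshake(rank: int, store: dict) -> bytes:
    """Rank 0 publishes the main KVS address; other ranks read and attach to it."""
    if rank == 0:
        addr = b"\x7f\x00\x00\x01"   # placeholder for the real KVS address bytes
        store["kvs_addr"] = addr     # in DeepSpeed, carried by the kvs_data tensor
        return addr
    return store["kvs_addr"]         # non-zero ranks connect to rank 0's KVS
```

All ranks end up holding the same address, which OneCCL then uses to build the communicator.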
Operation Caching
The all_reduce_caching operation optimizes repeated communications:
- Internally sets the OneCCL to_cache attribute so the operation state can be reused
- Provide unique match_id string for each operation
- Same match_id must be used across all ranks
- Different operations on same tensor need different match_ids
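The caching behavior described above amounts to a keyed lookup: the first call with a given match_id builds the operation state, and later calls reuse it. A minimal sketch, assuming a dict-backed cache (the real backend caches OneCCL attribute/communicator objects, and `OpCache` is a hypothetical name):

```python
class OpCache:
    """Toy model of match_id-based operation caching."""

    def __init__(self):
        self._cache = {}
        self.misses = 0  # number of times a new cached op had to be built

    def get(self, match_id: str):
        """Return the cached operation handle, constructing it on first use."""
        if match_id not in self._cache:
            self.misses += 1
            self._cache[match_id] = object()  # stand-in for a cached OneCCL op
        return self._cache[match_id]
```

This illustrates why the same match_id must be used across ranks (they must hit the same cached entry) and why distinct operations on the same tensor need distinct match_ids (sharing one entry would conflate their cached state).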