
Implementation:Deepspeedai DeepSpeed CCL Backend

From Leeroopedia


Knowledge Sources
Domains Communication, Distributed_Computing, CPU_Optimization
Last Updated 2026-02-09 00:00 GMT

Overview

A OneCCL-based communication backend for CPU distributed training, with an optimized shared-memory allreduce for ranks co-located on a single node.

Description

The CCL Backend provides a complete communication layer built on Intel's OneCCL (Collective Communications Library) for CPU-based distributed training in DeepSpeed. It implements standard collective operations (broadcast, all_reduce, barrier) while integrating with a specialized shared memory (SHM) based allreduce implementation for low-latency communication when all ranks reside on the same physical machine. The backend handles KVS (Key-Value Store) initialization for rank coordination, supports sub-communicator creation for process groups, and provides operation caching to optimize repeated communications. For inference workloads, it automatically selects between OneCCL and SHM implementations based on data size and topology, with SHM being used for BFloat16 and Float32 tensors when LOCAL_SIZE equals world size.

The implementation includes both standard CCL operations and an optimized inference path that falls back to the SHM-based allreduce for low-latency scenarios. It supports datatype mapping for int32, int64, float32, float64, bfloat16, and float16, along with reduction operations (sum, min, max, product). The backend is exposed to Python via PyBind11 and integrates seamlessly with DeepSpeed's communication abstraction layer.

Usage

Use the CCL Backend when performing CPU-based distributed training with Intel hardware, especially when ranks are co-located on the same physical machine for maximum performance through shared memory optimization. It is automatically selected when DeepSpeed is configured for CPU operations with OneCCL installed.

Code Reference

Source Location

Signature

// Initialization
void initialize(int size, int rank, torch::Tensor& kvs_data);
std::vector<uint8_t> get_kvs_addr(int rank);

// Communicator management
void initialize_sub_comm(int size, int rank, torch::Tensor& kvs_data, std::vector<int> ranks);

// Collective operations
void broadcast(torch::Tensor& data, int src, std::vector<int> group, bool async_op);
void all_reduce(torch::Tensor& data, py::object op, std::vector<int> group, bool async_op);
void all_reduce_caching(torch::Tensor& data, py::object op, std::string match_id,
                        std::vector<int> group, bool async_op);
void inference_all_reduce(torch::Tensor& data, py::object op);
void barrier(std::vector<int> group, bool async_op);

// Utility functions
int get_rank(int group = 0);
int get_world_size(int group = 0);
ccl::datatype get_ccl_datatype(c10::ScalarType type);
ccl::reduction get_ccl_reduce_op(py::object op, at::Tensor& input);

Import

# The CCL backend is loaded as a Python extension module
import deepspeed.comm

# Initialize the CCL backend (typically done internally by DeepSpeed)
# Requires OneCCL to be installed and configured
deepspeed.init_distributed(dist_backend='ccl')

# Use standard DeepSpeed communication operations
# which will use the CCL backend on CPU
deepspeed.comm.broadcast(tensor, src=0)
deepspeed.comm.all_reduce(tensor)

I/O Contract

initialize(size, rank, kvs_data)

  • size (int): Total number of ranks in the communicator
  • rank (int): Current rank identifier (0 to size-1)
  • kvs_data (torch::Tensor): KVS address data from rank 0

broadcast(data, src, group, async_op)

  • data (torch::Tensor): Tensor to broadcast (in-place)
  • src (int): Source rank for the broadcast
  • group (std::vector<int>): Process group ranks
  • async_op (bool): Asynchronous operation flag (currently unused)

all_reduce(data, op, group, async_op)

  • data (torch::Tensor): Input/output tensor for the reduction (in-place)
  • op (py::object): Reduction operation (SUM, MIN, MAX, PRODUCT)
  • group (std::vector<int>): Process group ranks
  • async_op (bool): Asynchronous operation flag (currently unused)

inference_all_reduce(data, op)

  • data (torch::Tensor): Input/output tensor (BFloat16/Float32)
  • op (py::object): Reduction operation (must be SUM)
  • Note: uses the SHM optimization when all ranks are local
Supported Data Types

  C10 Type    CCL Type    Element Size
  Int         int32       4 bytes
  Long        int64       8 bytes
  Float       float32     4 bytes
  Double      float64     8 bytes
  BFloat16    bfloat16    2 bytes
  Half        float16     2 bytes
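The datatype mapping above can be mirrored in plain Python. This is a hypothetical helper for illustration only; in the real backend the lookup happens in C++ inside get_ccl_datatype.

```python
# Hypothetical plain-Python mirror of the backend's get_ccl_datatype mapping:
# C10 scalar type name -> (CCL datatype name, element size in bytes).
CCL_DTYPE_MAP = {
    "Int":      ("int32", 4),
    "Long":     ("int64", 8),
    "Float":    ("float32", 4),
    "Double":   ("float64", 8),
    "BFloat16": ("bfloat16", 2),
    "Half":     ("float16", 2),
}

def ccl_datatype(c10_type: str):
    """Return (ccl_name, element_size) or raise for unsupported types."""
    try:
        return CCL_DTYPE_MAP[c10_type]
    except KeyError:
        raise TypeError(f"Unsupported dtype for CCL backend: {c10_type}")
```

Unsupported types (e.g. Bool) raise, matching the backend's behavior of rejecting tensors it has no CCL datatype for.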

Usage Examples

import torch
import deepspeed

# Initialize DeepSpeed with CCL backend
deepspeed.init_distributed(dist_backend='ccl')

# Create a tensor on CPU
tensor = torch.randn(1024, 1024, dtype=torch.float32)

# Broadcast from rank 0 to all ranks
deepspeed.comm.broadcast(tensor, src=0)

# All-reduce with sum operation (in-place)
deepspeed.comm.all_reduce(tensor, op=deepspeed.comm.ReduceOp.SUM)

# For inference workloads, optimized allreduce is used automatically
# when all ranks are on the same machine (LOCAL_SIZE == world_size)
# This uses shared memory for BFloat16/Float32 tensors
model_output = torch.randn(512, 512, dtype=torch.bfloat16)
deepspeed.comm.inference_all_reduce(model_output)

# All-reduce with caching for repeated operations
# Use match_id to cache communication patterns
for step in range(100):
    deepspeed.comm.all_reduce_caching(
        tensor,
        op=deepspeed.comm.ReduceOp.SUM,
        match_id="my_gradient_tensor"
    )

# Synchronization barrier
deepspeed.comm.barrier()

Implementation Details

Automatic SHM Selection

The CCL backend automatically uses shared memory optimization when:

  • LOCAL_SIZE environment variable equals world_size (all ranks co-located)
  • Data type is BFloat16 or Float32
  • Called via inference_all_reduce()
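The conditions above can be sketched as a small predicate. This is a hypothetical illustration; the actual check lives in the C++ inference_all_reduce path.

```python
import os

def use_shm_allreduce(dtype_name: str, world_size: int) -> bool:
    """Sketch of the SHM fast-path check described above: all ranks must
    be co-located (LOCAL_SIZE == world_size) and the dtype must have an
    SHM kernel (bfloat16 or float32)."""
    local_size = int(os.environ.get("LOCAL_SIZE", "1"))
    return local_size == world_size and dtype_name in ("bfloat16", "float32")
```

When the predicate is false, the backend falls back to the standard OneCCL allreduce.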

KVS Initialization

The Key-Value Store (KVS) coordination mechanism:

  • Rank 0 creates main KVS and broadcasts address
  • Other ranks receive KVS address and connect
  • Used for communicator creation and rank coordination
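The handshake above can be illustrated with a dict standing in for the out-of-band store. This is hypothetical: in the real backend, rank 0's main KVS address travels in a torch tensor that the launcher broadcasts to the other ranks.

```python
def kvs_handshake(rank: int, store: dict) -> bytes:
    """Sketch of KVS coordination: rank 0 creates the main KVS and
    publishes its address; every rank then connects using that address.
    (With a plain dict, rank 0 must run first; the real broadcast
    enforces this ordering for us.)"""
    if rank == 0:
        store["kvs_addr"] = b"127.0.0.1:9999"  # placeholder address
    return store["kvs_addr"]
```

All ranks end up holding the same address, which OneCCL then uses to build the communicator.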

Operation Caching

The all_reduce_caching operation optimizes repeated communications:

  • The backend sets the operation's to_cache attribute to true
  • Callers provide a unique match_id string for each distinct operation
  • The same match_id must be used across all ranks
  • Different operations on the same tensor need different match_ids
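The match_id contract above can be illustrated with a toy cache. This is hypothetical: the real backend caches prepared collective state keyed by match_id, not a call counter.

```python
class CachingAllreduceStub:
    """Toy model of all_reduce_caching's match_id semantics."""

    def __init__(self):
        self._cache = {}

    def all_reduce_caching(self, values, match_id: str):
        # First call with a given match_id pays the setup cost;
        # later calls with the same match_id reuse the cached entry.
        entry = self._cache.setdefault(match_id, {"calls": 0})
        entry["calls"] += 1
        return sum(values)  # stand-in for the actual SUM reduction
```

Reusing one match_id per tensor/operation pair keeps the cache small; reusing the same match_id for two different operations would incorrectly share cached state.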
