Implementation: Sgl_project_Sglang CPU Shared Memory
| Knowledge Sources | |
|---|---|
| Domains | Distributed Computing, CPU Kernels |
| Last Updated | 2026-02-10 00:00 GMT |
Overview
Implements shared memory (SHM) based allreduce and allgather collective operations for low-latency intra-node communication in CPU tensor parallelism.
Description
This file uses POSIX shared memory (shm_open, mmap) to create shared data segments accessible by all ranks on the same machine. The implementation consists of several layers:
SHM Building Blocks:
- SharedData struct wraps SHM file descriptors and mapped memory pointers.
- shared_open opens an existing SHM segment with O_RDWR permissions.
- shared_create creates a new SHM segment and writes initial data to it.
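The create/open pair follows the standard POSIX shared-memory pattern. The sketch below is illustrative only (the segment name, helper names, and simplified error handling are assumptions; the real helpers store the descriptor and mapped pointer in a SharedData struct):

```cpp
// Sketch of the POSIX create/open pattern wrapped by shared_create /
// shared_open. Simplified: no error handling, and the fd/pointer pair
// is returned directly instead of being stored in a SharedData struct.
#include <fcntl.h>
#include <sys/mman.h>
#include <unistd.h>
#include <cassert>
#include <cstring>

// "shared_create" side: create the segment, size it, map it, write
// the initial bytes into it.
inline void* demo_create(const char* name, const void* bytes, size_t nbytes) {
  int fd = shm_open(name, O_CREAT | O_RDWR, 0600);
  ftruncate(fd, (off_t)nbytes);
  void* p = mmap(nullptr, nbytes, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
  close(fd);  // the mapping stays valid after the fd is closed
  std::memcpy(p, bytes, nbytes);
  return p;
}

// "shared_open" side: map an existing segment, as a peer rank would.
inline void* demo_open(const char* name, size_t nbytes) {
  int fd = shm_open(name, O_RDWR, 0600);
  void* p = mmap(nullptr, nbytes, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
  close(fd);
  return p;
}
```

Because both mappings refer to the same kernel object, bytes written through the creator's mapping are immediately visible through the opener's mapping, which is what makes the zero-copy rendezvous between ranks possible.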
Allreduce Workspace:
- The allreduce_workspace struct uses a double-buffered design: two state fields plus a large buffer area split into regions for symmetric (small-message) and distributed (large-message) allreduce.
- The threshold between symmetric and distributed strategies is NAIVE_ALLREDUCE_THRESHOLD (1MB).
- Maximum buffer size is MAX_BUF_SIZE (32MB).
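A possible shape for this workspace is sketched below. The field layout is hypothetical (the real struct in shm.cpp may order and pad fields differently); only the two constants and the two-buffer/two-state scheme come from the description above:

```cpp
// Hypothetical layout sketch of the double-buffered workspace described
// above. Constants match the documented thresholds (1MB naive threshold,
// 32MB max buffer); the struct layout itself is an illustration, not the
// exact definition from shm.cpp.
#include <cstddef>
#include <cstdint>

constexpr size_t NAIVE_ALLREDUCE_THRESHOLD = 1 << 20;  // 1MB
constexpr size_t MAX_BUF_SIZE = 1 << 25;               // 32MB
constexpr size_t BUFFER0_OFFSET = 0;
constexpr size_t BUFFER1_OFFSET = MAX_BUF_SIZE;        // second buffer follows the first

struct allreduce_workspace_sketch {
  // One state word per buffer, so rounds using buffer0 and buffer1
  // synchronize independently of each other.
  volatile uint32_t state[2];
  char buffer[2 * MAX_BUF_SIZE];  // region selected per collective round
};

// Alternating-buffer selection: even rounds use buffer0, odd rounds buffer1.
inline size_t buffer_offset_for_round(size_t round) {
  return (round % 2 == 0) ? BUFFER0_OFFSET : BUFFER1_OFFSET;
}
```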
Collective Operations:
- symmetric_naive_all_reduce -- All ranks copy data to shared buffer, one rank reduces, all read result. Used for messages under 1MB.
- distributed_naive_reduce -- Each rank reduces a different slice, then all ranks read all slices. Used for larger messages.
- all_reduce_outer_loop -- Dispatches to the symmetric or distributed path based on chunk size, and handles messages larger than 32MB via an outer loop over 32MB chunks.
- naive_all_gather -- Each rank copies its data to shared buffer, all ranks read all buffers.
- all_gather -- Public API wrapping naive_all_gather with dimension-aware striding.
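The chunking-and-dispatch behavior of all_reduce_outer_loop can be pictured as the planner below. This is a sketch under the documented thresholds, not the real control flow; the returned strategy strings stand in for calls to the symmetric and distributed kernels:

```cpp
// Sketch of the chunked dispatch described above: a message larger than
// MAX_BUF_SIZE is processed in 32MB chunks, and each chunk takes the
// symmetric path if it is under 1MB, otherwise the distributed path.
// plan_allreduce is a hypothetical helper for illustration only.
#include <algorithm>
#include <cstddef>
#include <string>
#include <vector>

constexpr size_t kNaiveThreshold = 1 << 20;  // per NAIVE_ALLREDUCE_THRESHOLD (1MB)
constexpr size_t kMaxBufSize = 1 << 25;      // per MAX_BUF_SIZE (32MB)

// Returns the strategy chosen for each chunk of a message of total_bytes.
inline std::vector<std::string> plan_allreduce(size_t total_bytes) {
  std::vector<std::string> plan;
  for (size_t off = 0; off < total_bytes; off += kMaxBufSize) {
    size_t chunk = std::min(kMaxBufSize, total_bytes - off);
    plan.push_back(chunk < kNaiveThreshold ? "symmetric" : "distributed");
  }
  return plan;
}
```

For example, a 40MB message would be split into a 32MB chunk and an 8MB chunk, both taking the distributed path, while a trailing sub-1MB chunk would fall back to the symmetric path.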
State synchronization uses wait_buffer_state_until_2 for polling on coll_state enum values that track progress through copy-in and reduce phases. The double-buffer scheme uses alternating buffers (BUFFER0_OFFSET and BUFFER1_OFFSET) to avoid synchronization between consecutive collective rounds. Architecture-specific implementations are included via conditional headers for x86_64 and aarch64 platforms.
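The polling-wait idea behind this state machine can be demonstrated in-process with std::atomic standing in for the shared-memory state words. Everything here is a toy analogy (the enum values and helper names are illustrative, not the actual coll_state values or wait_buffer_state_until_2 signature):

```cpp
// Toy demonstration of the polling-wait pattern described above, using a
// std::atomic in-process instead of a state word in shared memory. The
// state names are illustrative stand-ins for the real coll_state values.
#include <atomic>
#include <cassert>
#include <thread>

enum coll_state_demo : int { coll_begin = 0, coll_copyin_done = 1, coll_reduce_done = 2 };

// Spin until the state word reaches at least `target` - the same shape
// as waiting for a peer rank's copy-in or reduce phase to complete.
inline void wait_state_until(std::atomic<int>& st, int target) {
  while (st.load(std::memory_order_acquire) < target) {
    std::this_thread::yield();  // the real code may spin or pause instead
  }
}

// Simulate two phases: a "peer" thread advances the state while the
// caller blocks on each transition, as a rank does during an allreduce.
inline int run_two_phase_demo() {
  std::atomic<int> state{coll_begin};
  std::thread peer([&] {
    state.store(coll_copyin_done, std::memory_order_release);
    state.store(coll_reduce_done, std::memory_order_release);
  });
  wait_state_until(state, coll_copyin_done);
  wait_state_until(state, coll_reduce_done);
  peer.join();
  return state.load();
}
```

The acquire/release pairing matters: the acquire load in the waiter ensures that data written before the peer's release store (e.g. the copied-in buffer contents) is visible once the state transition is observed.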
Usage
Use this module for CPU tensor parallelism when multiple ranks need to perform allreduce or allgather operations within a single machine. It is initialized via shm_initialize with world size, rank, and address/port strings for coordination. The SHM approach achieves microsecond-level latencies for small allreduce operations, critical for the frequent allreduce calls in tensor-parallel transformer inference.
Code Reference
Source Location
- Repository: Sgl_project_Sglang
- File: sgl-kernel/csrc/cpu/shm.cpp
- Lines: 1-389
Signature
// SHM lifecycle
void shared_open(SharedData* data, const char* name, size_t nbytes);
void shared_create(SharedData* data, const char* name, void* bytes, size_t nbytes);
// Initialization
void shm_initialize(int size, int rank, const char* addr_string, const char* port_string);
// Collective operations
void all_reduce_outer_loop(torch::Tensor& data, size_t numel, int data_size);
torch::Tensor& all_gather(
torch::Tensor& result, torch::Tensor& data,
int dim, size_t numel, int data_size);
// Internal primitives
void symmetric_naive_all_reduce(
char* data_ptr, c10::ScalarType scalar_type,
size_t chunk_size, size_t chunk_el);
void distributed_naive_reduce(
char* data_ptr, c10::ScalarType scalar_type,
size_t chunk_size, size_t chunk_el);
void naive_all_gather(
char* result_ptr, char* data_ptr,
size_t res_stride, size_t chunk_size, size_t chunk_el);
Import
#include "shm.h"
#if defined(__x86_64__)
#include "x86_64/shm.h"
#elif defined(__aarch64__)
#include "aarch64/shm.h"
#endif
#include <ATen/ATen.h>
#include <sys/mman.h>
#include <fcntl.h>
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| size | int | Yes | World size (total number of ranks) for shm_initialize |
| rank | int | Yes | Current rank index for shm_initialize |
| addr_string | const char* | Yes | Address string for SHM segment naming coordination |
| port_string | const char* | Yes | Port string for SHM segment naming coordination |
| data | torch::Tensor | Yes | Input tensor for allreduce or allgather operations |
| result | torch::Tensor | Depends | Output tensor for allgather (pre-allocated) |
| dim | int | Depends | Dimension along which to gather for allgather |
| numel | size_t | Yes | Number of elements in the tensor |
| data_size | int | Yes | Total size in bytes of the tensor data |
Outputs
| Name | Type | Description |
|---|---|---|
| data (modified) | torch::Tensor | For allreduce: input tensor is modified in-place with the reduced result |
| result | torch::Tensor& | For allgather: result tensor filled with gathered data from all ranks |
Usage Examples
// Initialize SHM communication for 4-rank tensor parallelism
shm_initialize(/*size=*/4, /*rank=*/my_rank, "127.0.0.1", "29500");
// Perform allreduce on a tensor (modifies data in-place)
torch::Tensor hidden_states = /* ... */;
size_t numel = hidden_states.numel();
int data_size = numel * hidden_states.element_size();
all_reduce_outer_loop(hidden_states, numel, data_size);
// Perform allgather along dimension 0
torch::Tensor local_data = /* [local_size, hidden_dim] */;
torch::Tensor gathered = torch::empty({world_size * local_size, hidden_dim}, local_data.options());
size_t gather_numel = local_data.numel();  // recompute sizes for the local tensor
int gather_bytes = gather_numel * local_data.element_size();
all_gather(gathered, local_data, /*dim=*/0, gather_numel, gather_bytes);