
Implementation: Sglang CPU Shared Memory (Sgl project)

From Leeroopedia


Knowledge Sources
Domains Distributed Computing, CPU Kernels
Last Updated 2026-02-10 00:00 GMT

Overview

Implements shared memory (SHM) based allreduce and allgather collective operations for low-latency intra-node communication in CPU tensor parallelism.

Description

This file uses POSIX shared memory (shm_open, mmap) to create shared data segments accessible by all ranks on the same machine. The implementation consists of several layers:

SHM Building Blocks:

  • SharedData struct wraps SHM file descriptors and mapped memory pointers.
  • shared_open opens an existing SHM segment with the O_RDWR access mode and maps it into the process.
  • shared_create creates a new SHM segment and writes initial data to it.

Allreduce Workspace:

  • The allreduce_workspace struct uses a double-buffered design: two state fields plus a large buffer area split into regions for symmetric (small-message) and distributed (large-message) allreduce.
  • The threshold between symmetric and distributed strategies is NAIVE_ALLREDUCE_THRESHOLD (1MB).
  • Maximum buffer size is MAX_BUF_SIZE (32MB).

Collective Operations:

  • symmetric_naive_all_reduce -- All ranks copy data to shared buffer, one rank reduces, all read result. Used for messages under 1MB.
  • distributed_naive_reduce -- Each rank reduces a different slice, then all ranks read all slices. Used for larger messages.
  • all_reduce_outer_loop -- Dispatches to symmetric or distributed based on chunk size, handles messages larger than 32MB via outer loop.
  • naive_all_gather -- Each rank copies its data to shared buffer, all ranks read all buffers.
  • all_gather -- Public API wrapping naive_all_gather with dimension-aware striding.

State synchronization uses wait_buffer_state_until_2 for polling on coll_state enum values that track progress through copy-in and reduce phases. The double-buffer scheme uses alternating buffers (BUFFER0_OFFSET and BUFFER1_OFFSET) to avoid synchronization between consecutive collective rounds. Architecture-specific implementations are included via conditional headers for x86_64 and aarch64 platforms.

Usage

Use this module for CPU tensor parallelism when multiple ranks need to perform allreduce or allgather operations within a single machine. It is initialized via shm_initialize with world size, rank, and address/port strings for coordination. The SHM approach achieves microsecond-level latencies for small allreduce operations, critical for the frequent allreduce calls in tensor-parallel transformer inference.

Code Reference

Source Location

Signature

// SHM lifecycle
void shared_open(SharedData* data, const char* name, size_t nbytes);
void shared_create(SharedData* data, const char* name, void* bytes, size_t nbytes);

// Initialization
void shm_initialize(int size, int rank, const char* addr_string, const char* port_string);

// Collective operations
void all_reduce_outer_loop(torch::Tensor& data, size_t numel, int data_size);
torch::Tensor& all_gather(
    torch::Tensor& result, torch::Tensor& data,
    int dim, size_t numel, int data_size);

// Internal primitives
void symmetric_naive_all_reduce(
    char* data_ptr, c10::ScalarType scalar_type,
    size_t chunk_size, size_t chunk_el);
void distributed_naive_reduce(
    char* data_ptr, c10::ScalarType scalar_type,
    size_t chunk_size, size_t chunk_el);
void naive_all_gather(
    char* result_ptr, char* data_ptr,
    size_t res_stride, size_t chunk_size, size_t chunk_el);

Import

#include "shm.h"
#if defined(__x86_64__)
#include "x86_64/shm.h"
#elif defined(__aarch64__)
#include "aarch64/shm.h"
#endif

#include <ATen/ATen.h>
#include <sys/mman.h>
#include <fcntl.h>

I/O Contract

Inputs

Name Type Required Description
size int Yes World size (total number of ranks) for shm_initialize
rank int Yes Current rank index for shm_initialize
addr_string const char* Yes Address string for SHM segment naming coordination
port_string const char* Yes Port string for SHM segment naming coordination
data torch::Tensor Yes Input tensor for allreduce or allgather operations
result torch::Tensor Depends Output tensor for allgather (pre-allocated)
dim int Depends Dimension along which to gather for allgather
numel size_t Yes Number of elements in the tensor
data_size int Yes Total size in bytes of the tensor data

Outputs

Name Type Description
data (modified) torch::Tensor For allreduce: input tensor is modified in-place with the reduced result
result torch::Tensor& For allgather: result tensor filled with gathered data from all ranks

Usage Examples

// Initialize SHM communication for 4-rank tensor parallelism
shm_initialize(/*size=*/4, /*rank=*/my_rank, "127.0.0.1", "29500");

// Perform allreduce on a tensor (modifies data in-place)
torch::Tensor hidden_states = /* ... */;
size_t numel = hidden_states.numel();
int data_size = numel * hidden_states.element_size();
all_reduce_outer_loop(hidden_states, numel, data_size);

// Perform allgather along dimension 0 (result tensor is pre-allocated)
torch::Tensor local_data = /* [local_size, hidden_dim] */;
torch::Tensor gathered = torch::empty({world_size * local_size, hidden_dim}, local_data.options());
size_t gather_numel = local_data.numel();
int gather_size = gather_numel * local_data.element_size();
all_gather(gathered, local_data, /*dim=*/0, gather_numel, gather_size);
