Implementation: vLLM CPU SHM
| Knowledge Sources | |
|---|---|
| Domains | Inter-Process Communication, Tensor Parallelism |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
Implements shared memory-based inter-process communication for multi-process tensor operations on CPU, enabling efficient tensor parallelism without network overhead.
Description
This file provides a complete shared memory (SHM) communication layer for CPU tensor parallel inference. The ThreadSHMContext class manages per-thread shared memory buffers with producer/consumer stamps for lock-free synchronization, supporting both x86 (memory fence) and AArch64 (atomic acquire/release) memory models. The SHMManager class orchestrates shared memory allocation via POSIX shm_open/mmap and provides collective operations including allreduce, gather, all-gather, and point-to-point tensor send/receive. Double-buffering with stamp-based synchronization avoids buffer conflicts across concurrent operations.
Usage
This code is compiled as part of the vLLM CPU extension. It is used when running multi-process tensor parallel inference on CPU, where multiple processes communicate via shared memory instead of network-based collectives.
Code Reference
Source Location
- Repository: vllm
- File: csrc/cpu/shm.cpp
- Lines: 1-867
Signature
struct ThreadSHMContext {
  ThreadSHMContext(const int thread_id, const int thread_num, const int rank,
                   const int group_size, void* thread_shm_ptr);
  void set_context(int rank, ThreadSHMContext* ptr, void* thread_shm_ptr);
  template <typename T>
  T* get_thread_shm_ptr(int rank);
  void next_stamp();
  void commit_ready_stamp();
  template <typename Cond>
  void wait_for_all(Cond&& cond);
  template <typename Cond>
  void wait_for_one(int rank, Cond&& cond);
};
class SHMManager {
 public:
  explicit SHMManager(const std::string& name, const int rank,
                      const int group_size, const int thread_num);
  void join(const std::string& name);
  static int64_t create_singleton_instance(const std::string& name,
                                           const int group_size, const int rank,
                                           const int thread_num);
  static SHMManager* get_singleton_instance(int64_t handle);
};
// Public API functions
int64_t init_shm_manager(const std::string& name, const int64_t group_size,
                         const int64_t rank, const int64_t thread_num);
std::string join_shm_manager(int64_t handle, const std::string& name);
void shm_allreduce(int64_t handle, torch::Tensor& data);
void shm_gather(int64_t handle, torch::Tensor& data,
                const std::optional<std::vector<torch::Tensor>>& outputs,
                int64_t dst);
void shm_all_gather(int64_t handle, const torch::Tensor& data,
                    torch::Tensor& output);
void shm_send_tensor_list(int64_t handle,
                          const std::vector<torch::Tensor>& tensor_list,
                          int64_t dst);
std::vector<torch::Tensor> shm_recv_tensor_list(int64_t handle, int64_t src);
Import
#include "cpu/cpu_types.hpp"
#include <sys/mman.h>
#include <fcntl.h>
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| name | std::string | Yes | Base name for the POSIX shared memory region |
| group_size | int64_t | Yes | Number of processes in the communication group (max 8) |
| rank | int64_t | Yes | Rank of the current process within the group |
| thread_num | int64_t | Yes | Number of threads per process for parallel communication |
| handle | int64_t | Yes | Opaque handle to a SHMManager singleton instance |
| data | torch::Tensor | Yes | Tensor to communicate (allreduce, gather, etc.) |
| dst | int64_t | No | Destination rank for gather/send operations |
| src | int64_t | No | Source rank for shm_recv_tensor_list |
| outputs | std::optional<std::vector<torch::Tensor>> | No | Per-rank output tensors for shm_gather at the destination rank |
| tensor_list | std::vector<torch::Tensor> | No | Tensors to send via shm_send_tensor_list |
Outputs
| Name | Type | Description |
|---|---|---|
| handle | int64_t | Opaque handle returned by init_shm_manager for subsequent operations |
| data | torch::Tensor | Modified in-place with allreduce result |
| output | torch::Tensor | Result of all-gather operation |
| (return) | std::vector<torch::Tensor> | Received tensor list from shm_recv_tensor_list |
Usage Examples
// Create the shared memory manager and obtain a handle
int64_t handle = init_shm_manager("vllm_shm", group_size, rank, thread_num);
// Join the named shared memory region (returns the region name)
std::string name = join_shm_manager(handle, "vllm_shm");
// All-reduce the tensor in place across the group
torch::Tensor data = torch::randn({hidden_size});
shm_allreduce(handle, data);
// All-gather into a preallocated output tensor
torch::Tensor output = torch::empty({group_size * hidden_size});
shm_all_gather(handle, data, output);