Implementation:LMCache LMCache Custom IPC Types
| Knowledge Sources | |
|---|---|
| Domains | Inter-Process Communication, Serialization |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
This module defines custom types and serialization utilities for inter-process communication (IPC) between LMCache processes, including CUDA tensor sharing and cache engine key management.
Description
The module provides two primary types. CudaIPCWrapper shares CUDA tensors between processes via IPC handles: it wraps a tensor's storage handle, dtype, shape, and GPU device UUID, and the receiving process reconstructs the tensor on the correct GPU through device UUID-based mapping. IPCCacheEngineKey identifies cache entries across processes; it is a frozen, ordered dataclass with four fields: model name, world size, worker ID, and chunk hash bytes. The module also provides customized msgspec encoder and decoder factories that use msgpack extension types, so these custom objects can be embedded in ordinary msgpack messages.
Usage
Use these types when implementing IPC messaging between LMCache worker processes and the core server. CudaIPCWrapper is used to share GPU tensor references without copying data, while IPCCacheEngineKey identifies specific cache chunks in the distributed system.
Code Reference
Source Location
- Repository: LMCache
- File: lmcache/v1/multiprocess/custom_types.py
- Lines: 1-197
Signature
class CudaIPCWrapper:
    def __init__(self, tensor: torch.Tensor): ...
    def to_tensor(self) -> torch.Tensor: ...

    @staticmethod
    def Serialize(obj: "CudaIPCWrapper") -> bytes: ...

    @staticmethod
    def Deserialize(data: bytes) -> "CudaIPCWrapper": ...


@dataclass(order=True, frozen=True)
class IPCCacheEngineKey:
    model_name: str
    world_size: int
    worker_id: int | None
    chunk_hash: bytes

    def no_worker_id_version(self) -> "IPCCacheEngineKey": ...

    @staticmethod
    def Serialize(obj: "IPCCacheEngineKey") -> bytes: ...

    @staticmethod
    def Deserialize(data: bytes) -> "IPCCacheEngineKey": ...


KVCache = list[CudaIPCWrapper]

def get_customized_encoder(type: Any) -> msgspec.msgpack.Encoder: ...
def get_customized_decoder(type: Any) -> msgspec.msgpack.Decoder: ...
Import
from lmcache.v1.multiprocess.custom_types import (
    CudaIPCWrapper,
    IPCCacheEngineKey,
    KVCache,
    get_customized_encoder,
    get_customized_decoder,
)
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| tensor | torch.Tensor | Yes | A contiguous CUDA tensor with zero storage offset to wrap for IPC sharing |
| model_name | str | Yes | Name of the LLM model for cache key construction |
| world_size | int | Yes | Total number of distributed workers |
| worker_id | int or None | Yes | Worker identifier (None for scheduler/lookup keys) |
| chunk_hash | bytes | Yes | Hash bytes uniquely identifying a cache chunk |
Outputs
| Name | Type | Description |
|---|---|---|
| to_tensor | torch.Tensor | Reconstructed CUDA tensor from IPC handle on the receiving process |
| Serialize | bytes | Serialized byte representation of the wrapper or key |
| Deserialize | CudaIPCWrapper or IPCCacheEngineKey | Deserialized object from byte data |
| get_customized_encoder | msgspec.msgpack.Encoder | Encoder with custom extension hooks for IPC types |
| get_customized_decoder | msgspec.msgpack.Decoder | Decoder with custom extension hooks for IPC types |
Usage Examples
import torch

from lmcache.v1.multiprocess.custom_types import CudaIPCWrapper, IPCCacheEngineKey

# Wrap a CUDA tensor for IPC sharing (contiguous, zero storage offset)
tensor = torch.randn(256, 128, device="cuda:0")
wrapper = CudaIPCWrapper(tensor)

# Serialize the wrapper on the sending process
data = CudaIPCWrapper.Serialize(wrapper)

# Deserialize and reconstruct the tensor; to_tensor() is meant to run on
# the receiving process, after `data` has been transferred to it
restored_wrapper = CudaIPCWrapper.Deserialize(data)
restored_tensor = restored_wrapper.to_tensor()

# Create an IPC cache engine key
key = IPCCacheEngineKey(
    model_name="llama-7b",
    world_size=4,
    worker_id=0,
    chunk_hash=b"\x01\x02\x03\x04",
)

# Derive the key variant without a worker ID (used for scheduler/lookup keys)
lookup_key = key.no_worker_id_version()
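To show why a frozen dataclass suits this role without needing a GPU or LMCache installed, here is a small standard-library sketch. `DemoCacheKey` is a hypothetical stand-in that mirrors the documented fields, and the assumption that no_worker_id_version returns a copy with worker_id set to None is inferred from the inputs table, not confirmed against the source file.

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(order=True, frozen=True)
class DemoCacheKey:
    """Hypothetical stand-in mirroring the documented key fields."""

    model_name: str
    world_size: int
    worker_id: Optional[int]
    chunk_hash: bytes

    def no_worker_id_version(self) -> "DemoCacheKey":
        # Assumption: the lookup variant is a copy with worker_id=None.
        return replace(self, worker_id=None)


# Two workers hold the same chunk, so their keys differ only in worker_id.
key_w0 = DemoCacheKey("demo-model", 4, 0, b"\x01\x02\x03\x04")
key_w1 = DemoCacheKey("demo-model", 4, 1, b"\x01\x02\x03\x04")

# Dropping the worker ID collapses both onto one worker-agnostic key.
lookup_w0 = key_w0.no_worker_id_version()
lookup_w1 = key_w1.no_worker_id_version()

# frozen=True (with the default eq=True) makes instances hashable,
# so they can serve directly as dictionary keys.
index = {lookup_w0: "chunk-present"}
found = index.get(lookup_w1)
```

Because the instances are immutable, a key cannot change after it has been placed in a dictionary or set, which keeps hash-based lookups consistent.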