Implementation:LMCache LMCache Base Transfer Channel
| Knowledge Sources | |
|---|---|
| Domains | Data Transfer, Distributed Systems |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
BaseTransferChannel defines the abstract interface for peer-to-peer data transfer channels used in distributed LMCache deployments.
Description
BaseTransferChannel is an abstract base class (declared with metaclass=abc.ABCMeta) that specifies the contract for all transfer channel implementations. It defines three categories of operations: initialization (peer connection setup, with an optional side message exchanged over ZMQ), paired send/recv operations, and one-sided read/write operations. The send/recv and read/write families each come in synchronous and asynchronous batched variants. The base class also provides concrete implementations for side-message handling during initialization: handle_init_side_msg handles the P2P peer lookup URL exchange, while send_init_side_msg and async_send_init_side_msg send side messages over ZMQ sockets using msgspec serialization.
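The side-message exchange described above follows a request/reply shape, which the following minimal sketch illustrates. It is not the LMCache implementation: it swaps in json and a direct function call for msgspec and ZMQ so that it runs with the standard library only. The message classes LookupUrlMsg and LookupUrlRetMsg, their fields, and the SideMsgHandler class are hypothetical, as is the assumption that both sides announce a lookup URL.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class LookupUrlMsg:
    """Hypothetical request: the sender announces its peer lookup URL."""
    sender_id: str
    lookup_url: str


@dataclass
class LookupUrlRetMsg:
    """Hypothetical reply: the receiver answers with its own lookup URL."""
    receiver_id: str
    lookup_url: str


class SideMsgHandler:
    """Stand-in for the receiving side of the init handshake."""

    def __init__(self, local_id: str, lookup_url: str) -> None:
        self.local_id = local_id
        self.lookup_url = lookup_url
        self.known_peers = {}  # peer id -> lookup URL

    def handle_init_side_msg(self, raw: bytes) -> bytes:
        # Decode the request, remember the peer's URL, reply with ours.
        req = LookupUrlMsg(**json.loads(raw))
        self.known_peers[req.sender_id] = req.lookup_url
        ret = LookupUrlRetMsg(self.local_id, self.lookup_url)
        return json.dumps(asdict(ret)).encode()


def send_init_side_msg(handler: SideMsgHandler, msg: LookupUrlMsg) -> LookupUrlRetMsg:
    # A direct call stands in for the ZMQ request/reply round trip.
    raw_reply = handler.handle_init_side_msg(json.dumps(asdict(msg)).encode())
    return LookupUrlRetMsg(**json.loads(raw_reply))


receiver = SideMsgHandler("node_1", "tcp://192.168.1.2:6000")
reply = send_init_side_msg(receiver, LookupUrlMsg("node_0", "tcp://192.168.1.1:6000"))
print(reply.receiver_id, reply.lookup_url)
```

In a real deployment the encode and decode steps would go through msgspec and the call would cross a ZMQ socket; only the request, handler, and reply structure carries over from this sketch.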
Usage
Subclass BaseTransferChannel to implement a specific data transfer mechanism (e.g., socket-based, RDMA-based) for distributed KV cache sharing between LMCache instances. All concrete implementations must provide the full set of abstract methods for initialization, send/recv, read/write, and cleanup.
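The subclassing pattern can be sketched without importing LMCache. In the example below, MiniTransferChannel is a reduced stand-in for BaseTransferChannel that declares only three of its abstract methods, and LoopbackChannel is a hypothetical in-process implementation. The receive buffers are bytearray objects here, an assumption made so that they are writable; the real interface accepts list[bytes] or list[MemoryObj].

```python
import abc
from typing import Optional


class MiniTransferChannel(metaclass=abc.ABCMeta):
    """Reduced stand-in for BaseTransferChannel (three abstract methods only)."""

    @abc.abstractmethod
    def batched_send(self, objects: list, transfer_spec: Optional[dict] = None) -> int: ...

    @abc.abstractmethod
    def batched_recv(self, buffers: list, transfer_spec: Optional[dict] = None) -> int: ...

    @abc.abstractmethod
    def close(self) -> None: ...


class LoopbackChannel(MiniTransferChannel):
    """Hypothetical in-process channel: sent objects are queued, then copied into buffers."""

    def __init__(self) -> None:
        self._queue = []
        self.closed = False

    def batched_send(self, objects, transfer_spec=None) -> int:
        self._queue.extend(objects)
        return len(objects)

    def batched_recv(self, buffers, transfer_spec=None) -> int:
        received = 0
        for buf in buffers:
            if not self._queue:
                break  # nothing left to deliver
            data = self._queue.pop(0)
            buf[: len(data)] = data  # copy payload into the caller's buffer
            received += 1
        return received

    def close(self) -> None:
        self._queue.clear()
        self.closed = True


channel = LoopbackChannel()
sent = channel.batched_send([b"kv0", b"kv1"])
buffers = [bytearray(3), bytearray(3)]
received = channel.batched_recv(buffers)
channel.close()
print(sent, received, [bytes(b) for b in buffers])
```

Because the base class uses abc.ABCMeta, instantiating MiniTransferChannel directly, or a subclass that leaves any abstract method unimplemented, raises TypeError. The same rule applies to a real BaseTransferChannel subclass, which must cover the full method set listed in the signature.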
Code Reference
Source Location
- Repository: LMCache
- File: lmcache/v1/transfer_channel/abstract.py
- Lines: 1-285
Signature
class BaseTransferChannel(metaclass=abc.ABCMeta):
    # Initialization
    def lazy_init_peer_connection(self, local_id: str, peer_id: str, peer_init_url: str, init_side_msg: Optional[InitSideMsgBase] = None) -> Optional[InitSideRetMsgBase]: ...
    async def async_lazy_init_peer_connection(self, local_id: str, peer_id: str, peer_init_url: str, init_side_msg: Optional[InitSideMsgBase] = None) -> Optional[InitSideRetMsgBase]: ...
    def remote_xfer_handler_exists(self, receiver_or_sender_id: str) -> bool: ...
    def handle_init_side_msg(self, req: InitSideMsgBase) -> InitSideRetMsgBase: ...
    def send_init_side_msg(self, init_tmp_socket: zmq.Socket, init_side_msg: InitSideMsgBase) -> InitSideRetMsgBase: ...
    async def async_send_init_side_msg(self, init_tmp_socket: zmq.Socket, init_side_msg: InitSideMsgBase) -> InitSideRetMsgBase: ...

    # Utility
    def get_local_mem_indices(self, objects: Union[list[bytes], list[MemoryObj]]) -> list[int]: ...

    # Paired send/recv
    def batched_send(self, objects: Union[list[bytes], list[MemoryObj]], transfer_spec: Optional[dict] = None) -> int: ...
    def batched_recv(self, buffers: Union[list[bytes], list[MemoryObj]], transfer_spec: Optional[dict] = None) -> int: ...
    async def async_batched_send(self, objects: Union[list[bytes], list[MemoryObj]], transfer_spec: Optional[dict] = None) -> int: ...
    async def async_batched_recv(self, buffers: Union[list[bytes], list[MemoryObj]], transfer_spec: Optional[dict] = None) -> int: ...

    # One-sided read/write
    def batched_write(self, objects: Union[list[bytes], list[MemoryObj]], transfer_spec: Optional[dict] = None) -> int: ...
    def batched_read(self, buffers: Union[list[bytes], list[MemoryObj]], transfer_spec: Optional[dict] = None) -> int: ...
    async def async_batched_write(self, objects: Union[list[bytes], list[MemoryObj]], transfer_spec: Optional[dict] = None) -> int: ...
    async def async_batched_read(self, buffers: Union[list[bytes], list[MemoryObj]], transfer_spec: Optional[dict] = None) -> int: ...

    # Cleanup
    def close(self) -> None: ...
Import
from lmcache.v1.transfer_channel.abstract import BaseTransferChannel
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| local_id | str | Yes (for init) | Identifier of the local instance |
| peer_id | str | Yes (for init) | Identifier of the remote peer |
| peer_init_url | str | Yes (for init) | URL used to initialize the connection to the peer |
| init_side_msg | Optional[InitSideMsgBase] | No | Optional side message exchanged during initialization; defaults to None |
| objects | Union[list[bytes], list[MemoryObj]] | Yes (for send/write) | Batch of data objects to transfer |
| buffers | Union[list[bytes], list[MemoryObj]] | Yes (for recv/read) | Pre-allocated buffers to receive data into |
| transfer_spec | Optional[dict] | No | Additional transfer specifications |
Outputs
| Name | Type | Description |
|---|---|---|
| lazy_init_peer_connection return (sync and async) | Optional[InitSideRetMsgBase] | Side message received from peer during initialization, or None |
| batched_send/recv/write/read return (sync and async) | int | Number of successfully transferred objects |
| remote_xfer_handler_exists return | bool | Whether a remote transfer handler exists for the given ID |
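Since the batched methods report the number of successfully transferred objects, a caller can compare that count with the batch size. The helper below is a minimal sketch of such a check. send_all_or_raise and FlakyChannel are hypothetical names, and whether a real implementation signals a partial transfer through the count or by raising is not stated in the source.

```python
from typing import Optional, Protocol, Sequence


class SupportsBatchedSend(Protocol):
    """Structural type for anything exposing batched_send as documented above."""

    def batched_send(self, objects: Sequence[bytes], transfer_spec: Optional[dict] = None) -> int: ...


def send_all_or_raise(channel: SupportsBatchedSend, objects: Sequence[bytes]) -> int:
    """Send a batch and fail loudly if the reported count falls short."""
    sent = channel.batched_send(objects)
    if sent != len(objects):
        raise RuntimeError(f"only {sent} of {len(objects)} objects were transferred")
    return sent


class FlakyChannel:
    """Hypothetical stand-in that transfers at most `limit` objects per call."""

    def __init__(self, limit: int) -> None:
        self.limit = limit

    def batched_send(self, objects, transfer_spec=None) -> int:
        return min(len(objects), self.limit)


print(send_all_or_raise(FlakyChannel(limit=5), [b"a", b"b"]))
```

The same comparison applies to batched_recv, batched_write, and batched_read, which share the int return type.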
Usage Examples
from lmcache.v1.transfer_channel.abstract import BaseTransferChannel

# BaseTransferChannel is abstract. `channel` is assumed to be an instance
# of a concrete subclass that was constructed elsewhere.

# Establish a peer connection
ret_msg = channel.lazy_init_peer_connection(
    local_id="node_0",
    peer_id="node_1",
    peer_init_url="tcp://192.168.1.2:5555",
)

# Send a batch of memory objects (`memory_objects` is prepared by the caller)
num_sent = channel.batched_send(memory_objects)

# Receive into pre-allocated buffers (`receive_buffers` is prepared by the caller)
num_received = channel.batched_recv(receive_buffers)

# Close the channel
channel.close()
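The asynchronous variants follow the same call shape and are awaited from a coroutine. The sketch below uses FakeAsyncChannel, a hypothetical in-memory stand-in that exposes only async_batched_send and async_batched_recv. It uses bytearray receive buffers as an assumption so they can be written to, and it does not reflect how a real channel moves data.

```python
import asyncio


class FakeAsyncChannel:
    """Hypothetical stand-in exposing two of the async batched methods."""

    def __init__(self) -> None:
        self._pending = []

    async def async_batched_send(self, objects, transfer_spec=None) -> int:
        await asyncio.sleep(0)  # yield to the event loop, as real I/O would
        self._pending.extend(objects)
        return len(objects)

    async def async_batched_recv(self, buffers, transfer_spec=None) -> int:
        await asyncio.sleep(0)
        count = min(len(buffers), len(self._pending))
        for buf, data in zip(buffers, self._pending[:count]):
            buf[: len(data)] = data  # copy payload into the caller's buffer
        del self._pending[:count]
        return count


async def round_trip():
    channel = FakeAsyncChannel()
    num_sent = await channel.async_batched_send([b"kv0", b"kv1"])
    buffers = [bytearray(3), bytearray(3)]
    num_received = await channel.async_batched_recv(buffers)
    return num_sent, num_received, [bytes(b) for b in buffers]


result = asyncio.run(round_trip())
print(result)
```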