Implementation:LMCache LMCache Base Transfer Channel

From Leeroopedia


Knowledge Sources
Domains Data Transfer, Distributed Systems
Last Updated 2026-02-09 00:00 GMT

Overview

BaseTransferChannel defines the abstract interface for peer-to-peer data transfer channels used in distributed LMCache deployments.

Description

BaseTransferChannel is an abstract base class (declared with metaclass=abc.ABCMeta) that specifies the contract for all transfer channel implementations. It defines three categories of operations: initialization (peer connection setup, optionally exchanging side messages over ZMQ), paired send/recv operations, and one-sided read/write operations. Both transfer categories come in synchronous and asynchronous batched variants. The base class also provides concrete implementations of the side-message handling used during initialization: handle_init_side_msg handles the P2P peer lookup URL exchange, while send_init_side_msg and async_send_init_side_msg send side messages over ZMQ sockets using msgspec serialization.
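To illustrate how an abc.ABCMeta contract of this kind behaves, the following is a minimal sketch using only the standard library. SketchChannel, IncompleteChannel, CountingChannel, describe, and can_instantiate are hypothetical names invented for this example; they are not part of LMCache, and the real class declares many more methods.

```python
import abc


class SketchChannel(metaclass=abc.ABCMeta):
    """Hypothetical, reduced stand-in for an abstract channel contract."""

    @abc.abstractmethod
    def batched_send(self, objects: list) -> int:
        """Send a batch and return how many objects were transferred."""

    @abc.abstractmethod
    def close(self) -> None:
        """Release any resources held by the channel."""

    def describe(self) -> str:
        # Concrete helper: every subclass inherits it unchanged, much as the
        # real base class supplies its side-message helpers.
        return f"{type(self).__name__} channel"


class IncompleteChannel(SketchChannel):
    # Implements only one of the two abstract methods, so it stays abstract.
    def batched_send(self, objects: list) -> int:
        return len(objects)


class CountingChannel(SketchChannel):
    # Implements the full (reduced) contract, so it can be instantiated.
    def batched_send(self, objects: list) -> int:
        return len(objects)

    def close(self) -> None:
        pass


def can_instantiate(cls: type) -> bool:
    """Return True if cls() succeeds, False if abc rejects it with TypeError."""
    try:
        cls()
    except TypeError:
        return False
    return True
```

In this sketch, only CountingChannel can be constructed; the abstract base and the incomplete subclass are both rejected at instantiation time, which is how an abstract base class enforces that implementations cover the whole contract.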

Usage

Subclass BaseTransferChannel to implement a specific data transfer mechanism (e.g., socket-based, RDMA-based) for distributed KV cache sharing between LMCache instances. All concrete implementations must provide the full set of abstract methods for initialization, send/recv, read/write, and cleanup.
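As a rough illustration of the subclassing pattern, the sketch below implements a toy in-process "loopback" channel. It deliberately subclasses a reduced, hypothetical stand-in (SketchTransferChannel) rather than the real BaseTransferChannel, so that it runs without LMCache installed; a real subclass would also have to implement the initialization, read/write, and async methods listed in the signature. LoopbackChannel, its internal queue, and the use of bytearray as a writable receive buffer are all assumptions made for this example.

```python
import abc
from collections import deque
from typing import Optional


class SketchTransferChannel(metaclass=abc.ABCMeta):
    """Hypothetical, reduced stand-in for BaseTransferChannel."""

    @abc.abstractmethod
    def batched_send(self, objects, transfer_spec: Optional[dict] = None) -> int: ...

    @abc.abstractmethod
    def batched_recv(self, buffers, transfer_spec: Optional[dict] = None) -> int: ...

    @abc.abstractmethod
    def close(self) -> None: ...


class LoopbackChannel(SketchTransferChannel):
    """Toy channel that moves data through an in-process queue."""

    def __init__(self) -> None:
        self._queue = deque()
        self._closed = False

    def _check_open(self) -> None:
        if self._closed:
            raise RuntimeError("channel is closed")

    def batched_send(self, objects, transfer_spec=None) -> int:
        self._check_open()
        self._queue.extend(bytes(obj) for obj in objects)
        return len(objects)

    def batched_recv(self, buffers, transfer_spec=None) -> int:
        self._check_open()
        received = 0
        for buf in buffers:
            if not self._queue:
                break  # nothing left to receive; remaining buffers stay untouched
            payload = self._queue[0]
            if len(payload) > len(buf):
                raise ValueError("receive buffer is smaller than the payload")
            buf[: len(payload)] = payload  # copy into the pre-allocated buffer
            self._queue.popleft()
            received += 1
        return received

    def close(self) -> None:
        self._queue.clear()
        self._closed = True


# Usage: send two objects, then receive into three pre-allocated buffers.
channel = LoopbackChannel()
num_sent = channel.batched_send([b"kv-0", b"kv-1"])
buffers = [bytearray(4), bytearray(4), bytearray(4)]
num_received = channel.batched_recv(buffers)
channel.close()
```

Both calls return the number of objects actually moved, so the third buffer is left untouched here because only two objects were queued.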

Code Reference

Source Location

Signature

class BaseTransferChannel(metaclass=abc.ABCMeta):
    # Initialization
    def lazy_init_peer_connection(self, local_id: str, peer_id: str, peer_init_url: str, init_side_msg: Optional[InitSideMsgBase] = None) -> Optional[InitSideRetMsgBase]: ...
    async def async_lazy_init_peer_connection(self, local_id: str, peer_id: str, peer_init_url: str, init_side_msg: Optional[InitSideMsgBase] = None) -> Optional[InitSideRetMsgBase]: ...
    def remote_xfer_handler_exists(self, receiver_or_sender_id: str) -> bool: ...
    def handle_init_side_msg(self, req: InitSideMsgBase) -> InitSideRetMsgBase: ...
    def send_init_side_msg(self, init_tmp_socket: zmq.Socket, init_side_msg: InitSideMsgBase) -> InitSideRetMsgBase: ...
    async def async_send_init_side_msg(self, init_tmp_socket: zmq.Socket, init_side_msg: InitSideMsgBase) -> InitSideRetMsgBase: ...

    # Utility
    def get_local_mem_indices(self, objects: Union[list[bytes], list[MemoryObj]]) -> list[int]: ...

    # Paired send/recv
    def batched_send(self, objects: Union[list[bytes], list[MemoryObj]], transfer_spec: Optional[dict] = None) -> int: ...
    def batched_recv(self, buffers: Union[list[bytes], list[MemoryObj]], transfer_spec: Optional[dict] = None) -> int: ...
    async def async_batched_send(self, objects: Union[list[bytes], list[MemoryObj]], transfer_spec: Optional[dict] = None) -> int: ...
    async def async_batched_recv(self, buffers: Union[list[bytes], list[MemoryObj]], transfer_spec: Optional[dict] = None) -> int: ...

    # One-sided read/write
    def batched_write(self, objects: Union[list[bytes], list[MemoryObj]], transfer_spec: Optional[dict] = None) -> int: ...
    def batched_read(self, buffers: Union[list[bytes], list[MemoryObj]], transfer_spec: Optional[dict] = None) -> int: ...
    async def async_batched_write(self, objects: Union[list[bytes], list[MemoryObj]], transfer_spec: Optional[dict] = None) -> int: ...
    async def async_batched_read(self, buffers: Union[list[bytes], list[MemoryObj]], transfer_spec: Optional[dict] = None) -> int: ...

    # Cleanup
    def close(self) -> None: ...

Import

from lmcache.v1.transfer_channel.abstract import BaseTransferChannel

I/O Contract

Inputs

Name | Type | Required | Description
local_id | str | Yes (for init) | Identifier of the local instance
peer_id | str | Yes (for init) | Identifier of the remote peer
peer_init_url | str | Yes (for init) | URL used to initialize the connection to the peer
init_side_msg | InitSideMsgBase | No | Optional side message exchanged during initialization
objects | Union[list[bytes], list[MemoryObj]] | Yes (for send/write) | Batch of data objects to transfer
buffers | Union[list[bytes], list[MemoryObj]] | Yes (for recv/read) | Pre-allocated buffers to receive data into
transfer_spec | Optional[dict] | No | Additional transfer specifications

Outputs

Name | Type | Description
lazy_init_peer_connection return | Optional[InitSideRetMsgBase] | Side message received from peer during initialization, or None
batched_send/recv/write/read return | int | Number of successfully transferred objects
remote_xfer_handler_exists return | bool | Whether a remote transfer handler exists for the given ID
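Because the batched operations return the number of successfully transferred objects, a caller may want to compare that count with the batch size. The helper below is one possible sketch of such a check. send_all_or_raise, PartialTransferError, and the FlakyChannel test double are hypothetical names for this example only; how LMCache callers actually handle short counts is not specified here.

```python
class PartialTransferError(RuntimeError):
    """Hypothetical error raised when a batch is only partly transferred."""


def send_all_or_raise(channel, objects) -> int:
    """Call channel.batched_send and insist that the whole batch went through."""
    num_sent = channel.batched_send(objects)
    if num_sent != len(objects):
        raise PartialTransferError(
            f"sent {num_sent} of {len(objects)} objects"
        )
    return num_sent


class FlakyChannel:
    """Test double that reports at most one object as transferred."""

    def batched_send(self, objects, transfer_spec=None) -> int:
        return min(1, len(objects))
```

With the test double above, a single-object batch passes the check, while a two-object batch raises PartialTransferError.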

Usage Examples

from lmcache.v1.transfer_channel.abstract import BaseTransferChannel

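# BaseTransferChannel is abstract, so `channel` must be an instance of a concrete subclass.
# `memory_objects` and `receive_buffers` are lists of bytes or MemoryObj prepared by the caller.
# Establish a peer connection; the return value is the peer's side message, or None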
ret_msg = channel.lazy_init_peer_connection(
    local_id="node_0",
    peer_id="node_1",
    peer_init_url="tcp://192.168.1.2:5555",
)

# Send a batch of memory objects
num_sent = channel.batched_send(memory_objects)

# Receive into pre-allocated buffers
num_received = channel.batched_recv(receive_buffers)

# Close the channel
channel.close()
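The asynchronous variants in the signature (async_batched_send, async_batched_recv) would be awaited from within an event loop. The sketch below shows that calling pattern with a hypothetical in-memory stub, StubAsyncChannel, standing in for a concrete subclass; the stub's internals, the transfer coroutine, and run_demo are assumptions for illustration and say nothing about how a real channel performs I/O.

```python
import asyncio


class StubAsyncChannel:
    """Hypothetical stub; a real channel would subclass BaseTransferChannel."""

    def __init__(self) -> None:
        self._pending = []

    async def async_batched_send(self, objects, transfer_spec=None) -> int:
        await asyncio.sleep(0)  # yield to the event loop, as real I/O would
        self._pending.extend(objects)
        return len(objects)

    async def async_batched_recv(self, buffers, transfer_spec=None) -> int:
        await asyncio.sleep(0)
        count = min(len(buffers), len(self._pending))
        for i in range(count):
            payload = self._pending.pop(0)
            # Assumes each buffer is at least as large as its payload.
            buffers[i][: len(payload)] = payload
        return count


async def transfer(channel, objects, buffers):
    """Send a batch, then receive it back, returning both counts."""
    num_sent = await channel.async_batched_send(objects)
    num_received = await channel.async_batched_recv(buffers)
    return num_sent, num_received


def run_demo():
    channel = StubAsyncChannel()
    buffers = [bytearray(2), bytearray(2)]
    counts = asyncio.run(transfer(channel, [b"ab", b"cd"], buffers))
    return counts, buffers
```

The calling code is the same shape as the synchronous example, except that each batched call is awaited and the whole exchange is driven by asyncio.run.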
