Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:LMCache LMCache PySocket Channel

From Leeroopedia
Revision as of 15:25, 16 February 2026 by Admin (talk | contribs) (Auto-imported from implementations/LMCache_LMCache_PySocket_Channel.md)
(diff) ← Older revision | Latest revision (diff) | Newer revision → (diff)


Knowledge Sources
Domains Data Transfer, Network Communication
Last Updated 2026-02-09 00:00 GMT

Overview

PySocketChannel implements the control plane for socket-based transfer channels using ZMQ for handshake and peer initialization.

Description

The PySocketChannel class extends BaseTransferChannel to provide a ZMQ-based control plane for peer-to-peer connections. It manages initialization handshakes through ZMQ REQ/REP sockets, maintaining a registry of remote connections keyed by peer URL. The class supports both synchronous and asynchronous operation modes: in sync mode, an initialization loop runs in a daemon thread; in async mode, it runs as a coroutine on the event loop. Message serialization uses msgspec.msgpack with tagged structs (PySocketInitRequest, PySocketInitResponse). Construction requires role, buffer pointer, buffer size, alignment, tensor parallelism rank, and peer initialization URL parameters. The data plane is left for subclasses to implement.

Usage

Use PySocketChannel as a base class for transfer channels that need ZMQ-based peer discovery and initialization. Subclasses should implement the actual data transfer methods (batched_send, batched_recv, batched_write, batched_read, and their async variants). Pass async_mode=True for event-loop-based operation or False for thread-based initialization.

Code Reference

Source Location

Signature

class PySocketMsgBase(msgspec.Struct, tag=True): ...

class PySocketInitRequest(PySocketMsgBase):
    peer_init_url: str

class PySocketInitResponse(PySocketMsgBase):
    status: str

class PySocketChannel(BaseTransferChannel):
    def __init__(self, async_mode: bool = False, **kwargs) -> None: ...
    def lazy_init_peer_connection(self, local_id: str, peer_id: str, peer_init_url: str, init_side_msg: Optional[InitSideMsgBase] = None) -> Optional[InitSideRetMsgBase]: ...
    async def async_lazy_init_peer_connection(self, local_id: str, peer_id: str, peer_init_url: str, init_side_msg: Optional[InitSideMsgBase] = None) -> Optional[InitSideRetMsgBase]: ...
    def close(self) -> None: ...

Import

from lmcache.v1.transfer_channel.py_socket_channel import PySocketChannel

I/O Contract

Inputs

Name Type Required Description
async_mode bool No (default False) Whether to use async ZMQ context and event loop for initialization
role str Yes (via kwargs) Role identifier for this channel endpoint
buffer_ptr int Yes (via kwargs) Pointer to the shared transfer buffer
buffer_size int Yes (via kwargs) Size of the shared transfer buffer in bytes
align_bytes int Yes (via kwargs) Byte alignment for buffer operations
tp_rank int Yes (via kwargs) Tensor parallelism rank
peer_init_url str Yes (via kwargs) URL for peer initialization (ZMQ bind address)
peer_lookup_url str No (via kwargs) Optional URL for peer lookup in P2P mode
event_loop asyncio.AbstractEventLoop No (via kwargs) Required when async_mode is True
local_id str Yes (for init) Local instance identifier
peer_id str Yes (for init) Remote peer identifier
peer_init_url str Yes (for init) ZMQ URL to connect to for handshake

Outputs

Name Type Description
async_lazy_init_peer_connection return Optional[InitSideRetMsgBase] Side message from peer during initialization, or None if no side message was sent

Usage Examples

from lmcache.v1.transfer_channel.py_socket_channel import PySocketChannel

# Create a channel in async mode
channel = PySocketChannel(
    async_mode=True,
    role="sender",
    buffer_ptr=buffer_pointer,
    buffer_size=1024 * 1024,
    align_bytes=64,
    tp_rank=0,
    peer_init_url="5555",
    event_loop=asyncio.get_event_loop(),
)

# Initialize a peer connection asynchronously
ret_msg = await channel.async_lazy_init_peer_connection(
    local_id="node_0",
    peer_id="node_1",
    peer_init_url="5556",
)

# Close when done
channel.close()

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment