Implementation:LMCache LMCache PySocket Channel
| Knowledge Sources | |
|---|---|
| Domains | Data Transfer, Network Communication |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
PySocketChannel implements the control plane for socket-based transfer channels using ZMQ for handshake and peer initialization.
Description
The PySocketChannel class extends BaseTransferChannel to provide a ZMQ-based control plane for peer-to-peer connections. It manages initialization handshakes through ZMQ REQ/REP sockets, maintaining a registry of remote connections keyed by peer URL. The class supports both synchronous and asynchronous operation modes: in sync mode, an initialization loop runs in a daemon thread; in async mode, it runs as a coroutine on the event loop. Message serialization uses msgspec.msgpack with tagged structs (PySocketInitRequest, PySocketInitResponse). Construction requires role, buffer pointer, buffer size, alignment, tensor parallelism rank, and peer initialization URL parameters. The data plane is left for subclasses to implement.
Usage
Use PySocketChannel as a base class for transfer channels that need ZMQ-based peer discovery and initialization. Subclasses should implement the actual data transfer methods (batched_send, batched_recv, batched_write, batched_read, and their async variants). Pass async_mode=True for event-loop-based operation or False for thread-based initialization.
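The difference between the two modes can be illustrated with a standard-library sketch. `InitLoopSketch` is a hypothetical stand-in, not the LMCache class: it performs no ZMQ work and only counts loop iterations, and the use of `asyncio.run_coroutine_threadsafe` to schedule the coroutine is an assumption about one reasonable way to do it. What it shows is the split itself: a daemon thread when `async_mode` is False, a coroutine on a caller-supplied event loop when it is True.

```python
import asyncio
import threading
import time
from typing import Optional


class InitLoopSketch:
    """Hypothetical stand-in that mirrors only the sync/async mode split."""

    def __init__(
        self,
        async_mode: bool = False,
        event_loop: Optional[asyncio.AbstractEventLoop] = None,
    ) -> None:
        self.running = True
        self.ticks = 0  # number of loop iterations, standing in for handshakes
        if async_mode:
            if event_loop is None:
                raise ValueError("event_loop is required when async_mode is True")
            # Async mode: schedule the loop as a coroutine on the given event loop.
            self._future = asyncio.run_coroutine_threadsafe(
                self._async_init_loop(), event_loop
            )
        else:
            # Sync mode: run the loop in a daemon thread so it never blocks exit.
            self._thread = threading.Thread(target=self._init_loop, daemon=True)
            self._thread.start()

    def _init_loop(self) -> None:
        while self.running:
            self.ticks += 1  # a real channel would serve one handshake here
            time.sleep(0.01)

    async def _async_init_loop(self) -> None:
        while self.running:
            self.ticks += 1  # a real channel would await one handshake here
            await asyncio.sleep(0.01)

    def close(self) -> None:
        # Both loops poll this flag and exit on their next iteration.
        self.running = False
```

In this sketch the caller of the async variant must keep the supplied event loop running; otherwise the scheduled coroutine never makes progress.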
Code Reference
Source Location
- Repository: LMCache
- File: lmcache/v1/transfer_channel/py_socket_channel.py
- Lines: 1-260
Signature
class PySocketMsgBase(msgspec.Struct, tag=True): ...

class PySocketInitRequest(PySocketMsgBase):
    peer_init_url: str

class PySocketInitResponse(PySocketMsgBase):
    status: str

class PySocketChannel(BaseTransferChannel):
    def __init__(self, async_mode: bool = False, **kwargs) -> None: ...
    def lazy_init_peer_connection(self, local_id: str, peer_id: str, peer_init_url: str, init_side_msg: Optional[InitSideMsgBase] = None) -> Optional[InitSideRetMsgBase]: ...
    async def async_lazy_init_peer_connection(self, local_id: str, peer_id: str, peer_init_url: str, init_side_msg: Optional[InitSideMsgBase] = None) -> Optional[InitSideRetMsgBase]: ...
    def close(self) -> None: ...
Import
from lmcache.v1.transfer_channel.py_socket_channel import PySocketChannel
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| async_mode | bool | No (default False) | Whether to use async ZMQ context and event loop for initialization |
| role | str | Yes (via kwargs) | Role identifier for this channel endpoint |
| buffer_ptr | int | Yes (via kwargs) | Pointer to the shared transfer buffer |
| buffer_size | int | Yes (via kwargs) | Size of the shared transfer buffer in bytes |
| align_bytes | int | Yes (via kwargs) | Byte alignment for buffer operations |
| tp_rank | int | Yes (via kwargs) | Tensor parallelism rank |
| peer_init_url | str | Yes (via kwargs) | URL for peer initialization (ZMQ bind address) |
| peer_lookup_url | str | No (via kwargs) | Optional URL for peer lookup in P2P mode |
| event_loop | asyncio.AbstractEventLoop | Only when async_mode is True (via kwargs) | Event loop on which the initialization coroutine runs |
| local_id | str | Yes (for init) | Local instance identifier |
| peer_id | str | Yes (for init) | Remote peer identifier |
| peer_init_url | str | Yes (for init) | ZMQ URL to connect to for handshake |
Outputs
| Name | Type | Description |
|---|---|---|
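| lazy_init_peer_connection return | Optional[InitSideRetMsgBase] | Side message from peer during initialization, or None if no side message was sent |
| async_lazy_init_peer_connection return | Optional[InitSideRetMsgBase] | Side message from peer during initialization, or None if no side message was sent |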
Usage Examples
import asyncio

from lmcache.v1.transfer_channel.py_socket_channel import PySocketChannel


async def main(buffer_pointer: int) -> None:
    # buffer_pointer is the address of a pre-allocated transfer buffer
    # (allocation is not shown here).

    # Create a channel in async mode
    channel = PySocketChannel(
        async_mode=True,
        role="sender",
        buffer_ptr=buffer_pointer,
        buffer_size=1024 * 1024,
        align_bytes=64,
        tp_rank=0,
        peer_init_url="5555",
        event_loop=asyncio.get_running_loop(),
    )
    try:
        # Initialize a peer connection asynchronously
        ret_msg = await channel.async_lazy_init_peer_connection(
            local_id="node_0",
            peer_id="node_1",
            peer_init_url="5556",
        )
    finally:
        # Close when done, even if the handshake fails
        channel.close()