Implementation:LMCache LMCache Controller Message
| Knowledge Sources | |
|---|---|
| Domains | Distributed Caching, Message Protocol |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
This module defines the complete message protocol for communication between LMCache workers, the cache controller, and the orchestrator.
Description
The message.py module contains all message types used in the LMCache cache controller system, organized into four communication layers: (1) WorkerMsg for push-based messages from workers to the controller, (2) WorkerReqMsg / WorkerReqRetMsg for request-reply messages between workers and the controller, (3) ControlMsg / ControlRetMsg for controller-to-worker commands, and (4) OrchMsg / OrchRetMsg for orchestrator-to-controller interactions. All message classes inherit from MsgBase which extends msgspec.Struct with tagged union support, enabling efficient serialization via msgpack. The module also defines supporting types like OpType, KVOpEvent, WorkerInfo, and the comprehensive Msg union type.
Usage
Import specific message types when implementing handlers in the controller, executor, or worker components. The Msg union type is used for generic deserialization of incoming messages.
Code Reference
Source Location
- Repository: LMCache
- File: lmcache/v1/cache_controller/message.py
- Lines: 1-828
Signature
class MsgBase(msgspec.Struct, tag=True): ...
# Worker -> Controller (push)
class WorkerMsg(MsgBase): ...
class KVAdmitMsg(KVOperationMsg): ...
class KVEvictMsg(KVOperationMsg): ...
class BatchedKVOperationMsg(WorkerMsg): ...
class FullSyncBatchMsg(WorkerMsg): ...
class FullSyncEndMsg(WorkerMsg): ...
class DeRegisterMsg(WorkerMsg): ...
# Worker <-> Controller (request-reply)
class WorkerReqMsg(MsgBase): ...
class RegisterMsg(WorkerReqMsg): ...
class HeartbeatMsg(WorkerReqMsg): ...
class BatchedP2PLookupMsg(WorkerReqMsg): ...
class FullSyncStartMsg(WorkerReqMsg): ...
class FullSyncStatusMsg(WorkerReqMsg): ...
class WorkerReqRetMsg(MsgBase): ...
class RegisterRetMsg(WorkerReqRetMsg): ...
class HeartbeatRetMsg(WorkerReqRetMsg): ...
class BatchedP2PLookupRetMsg(WorkerReqRetMsg): ...
class FullSyncStartRetMsg(WorkerReqRetMsg): ...
class FullSyncStatusRetMsg(WorkerReqRetMsg): ...
# Controller -> Worker (commands)
class ControlMsg(MsgBase): ...
class ClearWorkerMsg(ControlMsg): ...
class PinWorkerMsg(ControlMsg): ...
class CompressWorkerMsg(ControlMsg): ...
class DecompressWorkerMsg(ControlMsg): ...
class MoveWorkerMsg(ControlMsg): ...
class HealthWorkerMsg(ControlMsg): ...
class ControlRetMsg(MsgBase): ...
class ClearWorkerRetMsg(ControlRetMsg): ...
class PinWorkerRetMsg(ControlRetMsg): ...
# Orchestrator <-> Controller
class OrchMsg(MsgBase): ...
class LookupMsg(OrchMsg): ...
class ClearMsg(OrchMsg): ...
class PinMsg(OrchMsg): ...
class CompressMsg(OrchMsg): ...
class DecompressMsg(OrchMsg): ...
class MoveMsg(OrchMsg): ...
class HealthMsg(OrchMsg): ...
class QueryWorkerInfoMsg(OrchMsg): ...
class OrchRetMsg(MsgBase): ...
class LookupRetMsg(OrchRetMsg): ...
class ErrorMsg(WorkerReqRetMsg): ...
# Supporting types
class OpType(Enum): ...
class KVOpEvent(msgspec.Struct): ...
@dataclass
class WorkerInfo: ...
# Union of all message types for deserialization
Msg = Union[RegisterMsg, RegisterRetMsg, ..., FullSyncStatusRetMsg]
Import
from lmcache.v1.cache_controller.message import (
Msg, MsgBase, RegisterMsg, RegisterRetMsg,
LookupMsg, LookupRetMsg, ClearMsg, ClearRetMsg,
BatchedKVOperationMsg, KVOpEvent, OpType,
WorkerInfo, ErrorMsg,
)
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| instance_id | str | Yes (most messages) | Identifier of the LMCache instance |
| worker_id | int | Yes (worker messages) | Identifier of the worker within an instance |
| event_id | str | Yes (orchestrator messages) | Unique event identifier for tracking operations |
| tokens | list[int] | Varies | Token IDs for lookup, pin, compress, or move operations |
| location | str | Varies | Storage location identifier (e.g., backend name) |
| hashes | list[int] | Yes (P2P lookup) | Chunk hashes for peer-to-peer lookup |
Outputs
| Name | Type | Description |
|---|---|---|
| All message classes | MsgBase subclasses | Structured messages serializable via msgspec for inter-component communication |
| Msg | Union type | Union of all message types for generic deserialization |
| WorkerInfo | dataclass | Worker metadata including IP, port, peer URL, and timestamps |
Usage Examples
import msgspec
from lmcache.v1.cache_controller.message import (
Msg, RegisterMsg, LookupMsg, BatchedKVOperationMsg,
KVOpEvent, OpType,
)
# Create a registration message
reg_msg = RegisterMsg(
instance_id="inst_0",
worker_id=0,
ip="192.168.1.10",
port=5000,
peer_init_url="tcp://192.168.1.10:5001",
)
# Serialize and deserialize
encoded = msgspec.msgpack.encode(reg_msg)
decoded = msgspec.msgpack.decode(encoded, type=Msg)
# Create a batched KV operation
batch_msg = BatchedKVOperationMsg(
instance_id="inst_0",
worker_id=0,
location="hot_cache",
operations=[
KVOpEvent(op_type=OpType.ADMIT, key=12345, seq_num=0),
KVOpEvent(op_type=OpType.ADMIT, key=67890, seq_num=1),
],
)