Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:LMCache LMCache Controller Message

From Leeroopedia


Knowledge Sources
Domains Distributed Caching, Message Protocol
Last Updated 2026-02-09 00:00 GMT

Overview

This module defines the complete message protocol for communication between LMCache workers, the cache controller, and the orchestrator.

Description

The message.py module contains all message types used in the LMCache cache controller system, organized into four communication layers: (1) WorkerMsg for push-based messages from workers to the controller, (2) WorkerReqMsg / WorkerReqRetMsg for request-reply messages between workers and the controller, (3) ControlMsg / ControlRetMsg for controller-to-worker commands, and (4) OrchMsg / OrchRetMsg for orchestrator-to-controller interactions. All message classes inherit from MsgBase which extends msgspec.Struct with tagged union support, enabling efficient serialization via msgpack. The module also defines supporting types like OpType, KVOpEvent, WorkerInfo, and the comprehensive Msg union type.

Usage

Import specific message types when implementing handlers in the controller, executor, or worker components. The Msg union type is used for generic deserialization of incoming messages.

Code Reference

Source Location

Signature

class MsgBase(msgspec.Struct, tag=True): ...

# Worker -> Controller (push)
class WorkerMsg(MsgBase): ...
class KVAdmitMsg(KVOperationMsg): ...
class KVEvictMsg(KVOperationMsg): ...
class BatchedKVOperationMsg(WorkerMsg): ...
class FullSyncBatchMsg(WorkerMsg): ...
class FullSyncEndMsg(WorkerMsg): ...
class DeRegisterMsg(WorkerMsg): ...

# Worker <-> Controller (request-reply)
class WorkerReqMsg(MsgBase): ...
class RegisterMsg(WorkerReqMsg): ...
class HeartbeatMsg(WorkerReqMsg): ...
class BatchedP2PLookupMsg(WorkerReqMsg): ...
class FullSyncStartMsg(WorkerReqMsg): ...
class FullSyncStatusMsg(WorkerReqMsg): ...

class WorkerReqRetMsg(MsgBase): ...
class RegisterRetMsg(WorkerReqRetMsg): ...
class HeartbeatRetMsg(WorkerReqRetMsg): ...
class BatchedP2PLookupRetMsg(WorkerReqRetMsg): ...
class FullSyncStartRetMsg(WorkerReqRetMsg): ...
class FullSyncStatusRetMsg(WorkerReqRetMsg): ...

# Controller -> Worker (commands)
class ControlMsg(MsgBase): ...
class ClearWorkerMsg(ControlMsg): ...
class PinWorkerMsg(ControlMsg): ...
class CompressWorkerMsg(ControlMsg): ...
class DecompressWorkerMsg(ControlMsg): ...
class MoveWorkerMsg(ControlMsg): ...
class HealthWorkerMsg(ControlMsg): ...

class ControlRetMsg(MsgBase): ...
class ClearWorkerRetMsg(ControlRetMsg): ...
class PinWorkerRetMsg(ControlRetMsg): ...

# Orchestrator <-> Controller
class OrchMsg(MsgBase): ...
class LookupMsg(OrchMsg): ...
class ClearMsg(OrchMsg): ...
class PinMsg(OrchMsg): ...
class CompressMsg(OrchMsg): ...
class DecompressMsg(OrchMsg): ...
class MoveMsg(OrchMsg): ...
class HealthMsg(OrchMsg): ...
class QueryWorkerInfoMsg(OrchMsg): ...

class OrchRetMsg(MsgBase): ...
class LookupRetMsg(OrchRetMsg): ...
class ErrorMsg(WorkerReqRetMsg): ...

# Supporting types
class OpType(Enum): ...
class KVOpEvent(msgspec.Struct): ...

@dataclass
class WorkerInfo: ...

# Union of all message types for deserialization
Msg = Union[RegisterMsg, RegisterRetMsg, ..., FullSyncStatusRetMsg]

Import

from lmcache.v1.cache_controller.message import (
    Msg, MsgBase, RegisterMsg, RegisterRetMsg,
    LookupMsg, LookupRetMsg, ClearMsg, ClearRetMsg,
    BatchedKVOperationMsg, KVOpEvent, OpType,
    WorkerInfo, ErrorMsg,
)

I/O Contract

Inputs

Name Type Required Description
instance_id str Yes (most messages) Identifier of the LMCache instance
worker_id int Yes (worker messages) Identifier of the worker within an instance
event_id str Yes (orchestrator messages) Unique event identifier for tracking operations
tokens list[int] Varies Token IDs for lookup, pin, compress, or move operations
location str Varies Storage location identifier (e.g., backend name)
hashes list[int] Yes (P2P lookup) Chunk hashes for peer-to-peer lookup

Outputs

Name Type Description
All message classes MsgBase subclasses Structured messages serializable via msgspec for inter-component communication
Msg Union type Union of all message types for generic deserialization
WorkerInfo dataclass Worker metadata including IP, port, peer URL, and timestamps

Usage Examples

import msgspec
from lmcache.v1.cache_controller.message import (
    Msg, RegisterMsg, LookupMsg, BatchedKVOperationMsg,
    KVOpEvent, OpType,
)

# Create a registration message
reg_msg = RegisterMsg(
    instance_id="inst_0",
    worker_id=0,
    ip="192.168.1.10",
    port=5000,
    peer_init_url="tcp://192.168.1.10:5001",
)

# Serialize and deserialize
encoded = msgspec.msgpack.encode(reg_msg)
decoded = msgspec.msgpack.decode(encoded, type=Msg)

# Create a batched KV operation
batch_msg = BatchedKVOperationMsg(
    instance_id="inst_0",
    worker_id=0,
    location="hot_cache",
    operations=[
        KVOpEvent(op_type=OpType.ADMIT, key=12345, seq_num=0),
        KVOpEvent(op_type=OpType.ADMIT, key=67890, seq_num=1),
    ],
)

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment