Implementation:LMCache LMCache Controller Utils

From Leeroopedia
Revision as of 15:24, 16 February 2026 by Admin (auto-imported from implementations/LMCache_LMCache_Controller_Utils.md)


Knowledge Sources
Domains: Distributed Caching, Data Structures
Last Updated: 2026-02-09 00:00 GMT

Overview

This module provides the core data structures for the cache controller registry: the hierarchical RegistryTree and its constituent WorkerNode and InstanceNode classes.
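To make the three-level hierarchy concrete, here is a minimal, self-contained sketch. It is not LMCache's code. The `*Sketch` class names are hypothetical, each worker keeps a plain `dict[str, set[int]]` of chunk keys per location, location strings such as `"cpu"` are made up, and all locking is omitted. The assumption that `find_kv` returns the first match while skipping `exclude_instance_id` is also not confirmed by this page, which does not document the real lookup order or tie-breaking. Only `KVChunkInfo` mirrors the documented signature.

```python
from typing import NamedTuple, Optional


class KVChunkInfo(NamedTuple):
    instance_id: str
    worker_id: int
    location: str


class WorkerNodeSketch:
    """Leaf level: one worker's chunk keys, grouped by storage location."""

    def __init__(self, worker_id: int) -> None:
        self.worker_id = worker_id
        self.kv_store: dict[str, set[int]] = {}

    def admit(self, location: str, key: int) -> None:
        self.kv_store.setdefault(location, set()).add(key)

    def find_location(self, key: int) -> Optional[str]:
        for location, keys in self.kv_store.items():
            if key in keys:
                return location
        return None


class InstanceNodeSketch:
    """Middle level: the workers (e.g. TP ranks) of one instance."""

    def __init__(self, instance_id: str) -> None:
        self.instance_id = instance_id
        self.workers: dict[int, WorkerNodeSketch] = {}


class RegistryTreeSketch:
    """Top level: every instance known to the controller."""

    def __init__(self) -> None:
        self.instances: dict[str, InstanceNodeSketch] = {}

    def register_worker(self, instance_id: str, worker_id: int) -> WorkerNodeSketch:
        if instance_id not in self.instances:
            self.instances[instance_id] = InstanceNodeSketch(instance_id)
        workers = self.instances[instance_id].workers
        if worker_id not in workers:
            workers[worker_id] = WorkerNodeSketch(worker_id)
        return workers[worker_id]

    def find_kv(self, key: int, exclude_instance_id: Optional[str] = None) -> Optional[KVChunkInfo]:
        # Walk instance -> worker -> location and return the first hit.
        for instance_id, instance in self.instances.items():
            if instance_id == exclude_instance_id:
                continue
            for worker_id, worker in instance.workers.items():
                location = worker.find_location(key)
                if location is not None:
                    return KVChunkInfo(instance_id, worker_id, location)
        return None


registry = RegistryTreeSketch()
registry.register_worker("inst_0", 0).admit("cpu", 42)
print(registry.find_kv(42))  # KVChunkInfo(instance_id='inst_0', worker_id=0, location='cpu')
print(registry.find_kv(42, exclude_instance_id="inst_0"))  # None
```

Because lookups descend the tree, the cost of a miss grows with the number of instances, workers, and locations.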

Description

The utils.py module implements a tree-structured registry that tracks KV cache metadata across a distributed cluster. The hierarchy is RegistryTree -> InstanceNode -> WorkerNode.

Each WorkerNode stores per-location sets of KV chunk keys with sequence tracking, applies batched operations thread-safely under its own fine-grained lock, and tracks full-sync state. InstanceNode groups the workers of one instance and provides instance-level key lookups. RegistryTree is the top-level container. It uses read-write locking, supports optimistic reads, and exposes cached worker-info accessors.

The module also defines three supporting types: KVChunkInfo (a NamedTuple describing where a chunk is stored), FullSyncState (an enum of full-sync states), and WorkerSyncInfo (a dataclass tracking per-worker sync progress). To support concurrent throughput, locks follow a fixed hierarchy, from outermost to innermost: RegistryTree lock > InstanceNode lock > WorkerNode lock.
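The lock hierarchy and the optimistic, cached worker-info read can be sketched as follows. All names here are hypothetical and the sketch makes several simplifying assumptions. A plain `threading.Lock` stands in for the tree's read-write lock, because Python's standard library has no reader-writer lock. The InstanceNode level is collapsed away. A version counter, bumped on every structural change, decides whether the cached snapshot is still valid. In the real module the lookup in `admit` would presumably take only the shared side of the read-write lock, so updates to different workers could proceed concurrently. This sketch serializes them.

```python
import threading
from typing import NamedTuple, Optional


class WorkerInfoSketch(NamedTuple):
    """Hypothetical stand-in for LMCache's WorkerInfo."""
    instance_id: str
    worker_id: int


class WorkerSketch:
    def __init__(self, instance_id: str, worker_id: int) -> None:
        self.instance_id = instance_id
        self.worker_id = worker_id
        self.lock = threading.Lock()  # innermost lock in the hierarchy
        self.keys: set[int] = set()


class LockedRegistrySketch:
    def __init__(self) -> None:
        # Outermost lock (stand-in for a read-write lock).
        self._tree_lock = threading.Lock()
        self._workers: dict[tuple[str, int], WorkerSketch] = {}
        self._version = 0  # bumped on every structural change
        # (version, worker infos) snapshot, always replaced as one object.
        self._cache: Optional[tuple[int, tuple[WorkerInfoSketch, ...]]] = None

    def register_worker(self, instance_id: str, worker_id: int) -> WorkerSketch:
        with self._tree_lock:
            worker = WorkerSketch(instance_id, worker_id)
            self._workers[(instance_id, worker_id)] = worker
            self._version += 1
            return worker

    def admit(self, instance_id: str, worker_id: int, key: int) -> None:
        # Lock order: tree lock first (to resolve the node), worker lock second.
        with self._tree_lock:
            worker = self._workers[(instance_id, worker_id)]
            with worker.lock:
                worker.keys.add(key)

    def get_worker_infos(self) -> tuple[WorkerInfoSketch, ...]:
        # Optimistic read: skip the lock if the snapshot matches the version.
        snapshot = self._cache
        if snapshot is not None and snapshot[0] == self._version:
            return snapshot[1]
        # Slow path: rebuild the snapshot under the tree lock.
        with self._tree_lock:
            infos = tuple(WorkerInfoSketch(i, w) for (i, w) in self._workers)
            self._cache = (self._version, infos)
            return infos
```

Storing the version and the snapshot in a single tuple means a lock-free reader can never pair a new version number with an old snapshot. Returning an immutable tuple keeps callers from mutating the shared cache.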

Usage

This module is imported by the cache controller components (KVController, RegistrationController, FullSyncTracker) to manage and query the cluster's KV cache state. It is not intended for direct end-user usage.

Code Reference

Source Location

lmcache/v1/cache_controller/utils.py

Signature

from dataclasses import dataclass
from enum import Enum
from typing import NamedTuple, Optional, Set

import zmq.asyncio

# WorkerInfo is defined elsewhere in LMCache; its import is not shown here.


# Location of one KV chunk in the cluster.
class KVChunkInfo(NamedTuple):
    instance_id: str
    worker_id: int
    location: str

# States of a worker's full sync.
class FullSyncState(Enum):
    IDLE = "idle"
    SYNCING = "syncing"
    COMPLETED = "completed"
    FAILED = "failed"

# Per-worker progress of a full sync.
@dataclass
class WorkerSyncInfo:
    sync_id: str
    state: FullSyncState
    start_time: float
    expected_total_keys: int
    expected_batch_count: int
    received_batches: Set[int]
    received_keys_count: int
    last_activity_time: float

# One worker of an instance. It also holds per-location KV chunk sets,
# sequence tracking and a lock (those fields are not listed here).
@dataclass
class WorkerNode:
    worker_id: int
    ip: str
    port: int
    peer_init_url: Optional[str]
    socket: Optional[zmq.asyncio.Socket]
    registration_time: float
    last_heartbeat_time: float
    def handle_batched_kv_operations(self, msg, on_seq_num_out_of_order=None, is_full_sync=False) -> bool: ...
    def has_kv(self, location: str, key: int) -> bool: ...
    def get_kv_keys(self, location: str) -> set[int]: ...
    def find_key(self, key: int) -> Optional[tuple[KVChunkInfo, Optional[str], set[int]]]: ...
    def to_worker_info(self, instance_id: str) -> WorkerInfo: ...

# Groups the workers of one instance, keyed by worker_id.
@dataclass
class InstanceNode:
    instance_id: str
    workers: dict[int, WorkerNode]
    def add_worker(self, worker_node: WorkerNode) -> None: ...
    def remove_worker(self, worker_id: int) -> Optional[WorkerNode]: ...
    def get_worker(self, worker_id: int) -> Optional[WorkerNode]: ...
    def find_key(self, key: int) -> Optional[tuple[KVChunkInfo, Optional[str], set[int]]]: ...

# Top-level container: read-write locking, optimistic reads, cached worker info.
class RegistryTree:
    def __init__(self) -> None: ...
    def register_worker(self, instance_id, worker_id, ip, port, peer_init_url, socket, registration_time) -> WorkerNode: ...
    def deregister_worker(self, instance_id, worker_id) -> Optional[WorkerNode]: ...
    def handle_batched_kv_operations(self, msg, is_full_sync=False) -> bool: ...
    def find_kv(self, key, exclude_instance_id=None) -> Optional[KVChunkInfo]: ...
    def find_kv_with_worker_info(self, key, exclude_instance_id=None) -> Optional[tuple]: ...
    def get_total_kv_count(self) -> int: ...
    def get_worker_kv_keys(self, instance_id, worker_id, location) -> set[int]: ...
    def clear_worker_kv(self, instance_id, worker_id, location=None) -> bool: ...
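The WorkerSyncInfo fields suggest how full-sync progress could be tracked. The sketch below reuses the documented `FullSyncState` and `WorkerSyncInfo` definitions. The helpers `record_batch` and `check_timeout`, and the rules they apply, are hypothetical assumptions rather than the behavior of LMCache's FullSyncTracker: duplicate batches are counted once, the sync completes when every expected batch has arrived, and it fails after a period of inactivity.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Set


class FullSyncState(Enum):
    IDLE = "idle"
    SYNCING = "syncing"
    COMPLETED = "completed"
    FAILED = "failed"


@dataclass
class WorkerSyncInfo:
    sync_id: str
    state: FullSyncState
    start_time: float
    expected_total_keys: int
    expected_batch_count: int
    received_batches: Set[int]
    received_keys_count: int
    last_activity_time: float


def record_batch(info: WorkerSyncInfo, batch_id: int, num_keys: int, now: float) -> None:
    """Hypothetical helper: fold one full-sync batch into the tracker."""
    if info.state is not FullSyncState.SYNCING:
        return
    if batch_id in info.received_batches:
        return  # duplicate delivery: count each batch only once
    info.received_batches.add(batch_id)
    info.received_keys_count += num_keys
    info.last_activity_time = now
    if len(info.received_batches) == info.expected_batch_count:
        info.state = FullSyncState.COMPLETED


def check_timeout(info: WorkerSyncInfo, now: float, timeout_s: float) -> None:
    """Hypothetical helper: fail a sync that has gone quiet for too long."""
    if info.state is FullSyncState.SYNCING and now - info.last_activity_time > timeout_s:
        info.state = FullSyncState.FAILED
```

Tracking batch IDs in a set, instead of only a counter, makes the bookkeeping safe against redelivered batches.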

Import

from lmcache.v1.cache_controller.utils import (
    RegistryTree, WorkerNode, InstanceNode,
    KVChunkInfo, FullSyncState, WorkerSyncInfo,
)

I/O Contract

Inputs

| Name | Type | Required | Description |
|---|---|---|---|
| instance_id | str | Yes | Unique identifier for an LMCache instance |
| worker_id | int | Yes | Worker identifier within an instance (typically the TP rank) |
| ip | str | Yes | IP address of the worker |
| port | int | Yes | Port of the worker |
| peer_init_url | Optional[str] | No | URL for peer-to-peer KV transfer (only when P2P is enabled) |
| socket | zmq.asyncio.Socket | Yes | ZMQ async socket for controller-to-worker communication |
| msg | BatchedKVOperationMsg | Yes (for KV operations) | Batched KV operation message carrying admit/evict operations |
| key | int | Yes (for lookups) | Chunk hash key to search for |
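The `msg` input carries admit/evict operations, and WorkerNode tracks sequence numbers while applying them. Below is a minimal sketch of that idea under a lock. The message layout (`BatchedOpsSketch`, `KVOp` with a string `op` field), the single per-worker counter, and the choice to report a gap through the callback and then still apply the batch are all assumptions. They are not the real `BatchedKVOperationMsg` format or LMCache's out-of-order policy.

```python
import threading
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass(frozen=True)
class KVOp:
    op: str        # "admit" or "evict" (assumed encoding)
    location: str
    key: int


@dataclass(frozen=True)
class BatchedOpsSketch:
    seq_num: int
    ops: tuple[KVOp, ...]


class WorkerKVStoreSketch:
    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._kv: dict[str, set[int]] = {}
        self._next_seq = 0  # sequence number expected for the next batch

    def handle_batch(
        self,
        msg: BatchedOpsSketch,
        on_out_of_order: Optional[Callable[[int, int], None]] = None,
    ) -> bool:
        """Apply one batch; return True if it arrived in sequence."""
        with self._lock:
            in_order = msg.seq_num == self._next_seq
            if not in_order and on_out_of_order is not None:
                on_out_of_order(self._next_seq, msg.seq_num)  # (expected, got)
            for op in msg.ops:
                keys = self._kv.setdefault(op.location, set())
                if op.op == "admit":
                    keys.add(op.key)
                elif op.op == "evict":
                    keys.discard(op.key)
                else:
                    raise ValueError(f"unknown op {op.op!r}")
            self._next_seq = msg.seq_num + 1
            return in_order

    def has_kv(self, location: str, key: int) -> bool:
        with self._lock:
            return key in self._kv.get(location, set())
```

Applying a whole batch under one lock acquisition keeps readers such as `has_kv` from seeing a half-applied batch.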

Outputs

| Name | Type | Description |
|---|---|---|
| WorkerNode | WorkerNode | Registered worker node with its metadata and KV store |
| KVChunkInfo | KVChunkInfo | Named tuple with the instance_id, worker_id, and location of a found KV chunk |
| set[int] | set[int] | Set of chunk hash keys at a given worker location |
| list[WorkerInfo] | list[WorkerInfo] | Cached list of all worker metadata across the cluster |

Usage Examples

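import time

import zmq
import zmq.asyncio

from lmcache.v1.cache_controller.utils import RegistryTree

# Socket the controller uses to talk to the worker. The socket type chosen
# here is an illustrative assumption, not necessarily what LMCache uses.
ctx = zmq.asyncio.Context()
zmq_socket = ctx.socket(zmq.DEALER)

# Create the registry
registry = RegistryTree()

# Register worker 0 (e.g. TP rank 0) of instance "inst_0"
worker_node = registry.register_worker(
    instance_id="inst_0",
    worker_id=0,
    ip="10.0.0.1",
    port=5000,
    peer_init_url="tcp://10.0.0.1:5001",  # only needed when P2P is enabled
    socket=zmq_socket,
    registration_time=time.time(),
)

# Look up a KV chunk by hash across all instances
result = registry.find_kv(key=123456)
if result is not None:
    print(f"Found at instance={result.instance_id}, worker={result.worker_id}, location={result.location}")

# Get the total KV count for monitoring
total_keys = registry.get_total_kv_count()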
