Implementation:LMCache LMCache Controller Utils
| Knowledge Sources | |
|---|---|
| Domains | Distributed Caching, Data Structures |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
This module provides the core data structures for the cache controller registry: the hierarchical RegistryTree and its constituent WorkerNode and InstanceNode classes.
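The hierarchy can be pictured as nested maps: instance ID to InstanceNode, worker ID to WorkerNode, and storage location to a set of chunk hash keys. The following minimal Python sketch models only that shape and the cross-instance lookup. The Toy* classes, the kv_store layout, the "cpu" location name, and the first-hit search order are assumptions for illustration, and locking is omitted. Unlike the documented find_key, which returns a tuple with extra fields, the toy lookups return only a KVChunkInfo.

```python
from dataclasses import dataclass, field
from typing import NamedTuple, Optional


class KVChunkInfo(NamedTuple):
    # Same fields as the documented KVChunkInfo
    instance_id: str
    worker_id: int
    location: str


@dataclass
class ToyWorkerNode:
    worker_id: int
    # location name -> chunk hash keys stored there (assumed layout)
    kv_store: dict[str, set[int]] = field(default_factory=dict)

    def find_key(self, instance_id: str, key: int) -> Optional[KVChunkInfo]:
        for location, keys in self.kv_store.items():
            if key in keys:
                return KVChunkInfo(instance_id, self.worker_id, location)
        return None


@dataclass
class ToyInstanceNode:
    instance_id: str
    workers: dict[int, ToyWorkerNode] = field(default_factory=dict)

    def find_key(self, key: int) -> Optional[KVChunkInfo]:
        # Ask each worker of this instance in turn; the first hit wins
        for worker in self.workers.values():
            hit = worker.find_key(self.instance_id, key)
            if hit is not None:
                return hit
        return None


@dataclass
class ToyRegistryTree:
    instances: dict[str, ToyInstanceNode] = field(default_factory=dict)

    def find_kv(
        self, key: int, exclude_instance_id: Optional[str] = None
    ) -> Optional[KVChunkInfo]:
        # Search every instance except the excluded one
        for instance_id, instance in self.instances.items():
            if instance_id == exclude_instance_id:
                continue
            hit = instance.find_key(key)
            if hit is not None:
                return hit
        return None


# Usage: one instance with one worker holding key 123456 at a "cpu" location
tree = ToyRegistryTree()
tree.instances["inst_0"] = ToyInstanceNode(
    "inst_0", {0: ToyWorkerNode(0, {"cpu": {123456}})}
)
print(tree.find_kv(123456))  # KVChunkInfo(instance_id='inst_0', worker_id=0, location='cpu')
print(tree.find_kv(123456, exclude_instance_id="inst_0"))  # None
```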
Description
The utils.py module implements a tree-structured registry for managing KV cache metadata across a distributed cluster. The hierarchy is RegistryTree -> InstanceNode -> WorkerNode:
- WorkerNode stores a set of KV chunk keys per storage location, tracks message sequence numbers, applies batched operations thread-safely under its own fine-grained lock, and tracks full sync state.
- InstanceNode groups the workers of one instance and provides instance-level key lookups.
- RegistryTree is the top-level container; it uses read-write locking, supports optimistic reads, and exposes cached worker info accessors.

The module also defines supporting types: KVChunkInfo (a NamedTuple describing where a chunk is stored), FullSyncState (an enum of sync states), and WorkerSyncInfo (a dataclass for per-worker sync tracking).

Locks follow a fixed hierarchy, RegistryTree lock > InstanceNode lock > WorkerNode lock: when more than one is needed, they are acquired from the registry level down. Keeping KV updates on the per-worker lock lets operations on different workers proceed concurrently.
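A minimal Python sketch of this lock ordering follows, assuming standard threading locks. A plain threading.Lock stands in for the registry's read-write lock, optimistic reads are not modeled, and the Locked* class and method names are hypothetical. Structural changes nest the locks from registry down to worker; KV updates hold each outer lock only long enough to find the worker, then mutate under the worker's own lock.

```python
import threading
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class LockedWorker:
    worker_id: int
    lock: threading.Lock = field(default_factory=threading.Lock)
    keys: set[int] = field(default_factory=set)


@dataclass
class LockedInstance:
    instance_id: str
    lock: threading.Lock = field(default_factory=threading.Lock)
    workers: dict[int, LockedWorker] = field(default_factory=dict)


class LockedRegistry:
    """Lock order: registry -> instance -> worker, never the reverse."""

    def __init__(self) -> None:
        # Plain mutex standing in for the documented read-write lock
        self.lock = threading.Lock()
        self.instances: dict[str, LockedInstance] = {}

    def register(self, instance_id: str, worker_id: int) -> LockedWorker:
        # Structural change: take the registry lock, then the instance lock
        with self.lock:
            inst = self.instances.get(instance_id)
            if inst is None:
                inst = self.instances[instance_id] = LockedInstance(instance_id)
            with inst.lock:
                worker = inst.workers.get(worker_id)
                if worker is None:
                    worker = inst.workers[worker_id] = LockedWorker(worker_id)
                return worker

    def _resolve(self, instance_id: str, worker_id: int) -> Optional[LockedWorker]:
        # Each outer lock is held only long enough to find the child node
        with self.lock:
            inst = self.instances.get(instance_id)
        if inst is None:
            return None
        with inst.lock:
            return inst.workers.get(worker_id)

    def admit(self, instance_id: str, worker_id: int, key: int) -> bool:
        worker = self._resolve(instance_id, worker_id)
        if worker is None:
            return False
        # Only the per-worker lock guards the KV set, so the mutation itself
        # does not contend with updates to other workers
        with worker.lock:
            worker.keys.add(key)
        return True


# Usage: 100 threads admitting distinct keys into the same worker
reg = LockedRegistry()
w0 = reg.register("inst_0", 0)
threads = [
    threading.Thread(target=reg.admit, args=("inst_0", 0, k)) for k in range(100)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(len(w0.keys))  # 100
```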
Usage
This module is imported by the cache controller components (KVController, RegistrationController, FullSyncTracker) to manage and query the cluster's KV cache state. It is not intended for direct end-user usage.
Code Reference
Source Location
- Repository: LMCache
- File: lmcache/v1/cache_controller/utils.py
- Lines: 1-679
Signature
```python
class KVChunkInfo(NamedTuple):
    instance_id: str
    worker_id: int
    location: str


class FullSyncState(Enum):
    IDLE = "idle"
    SYNCING = "syncing"
    COMPLETED = "completed"
    FAILED = "failed"


@dataclass
class WorkerSyncInfo:
    sync_id: str
    state: FullSyncState
    start_time: float
    expected_total_keys: int
    expected_batch_count: int
    received_batches: Set[int]
    received_keys_count: int
    last_activity_time: float


@dataclass
class WorkerNode:
    worker_id: int
    ip: str
    port: int
    peer_init_url: Optional[str]
    socket: Optional[zmq.asyncio.Socket]
    registration_time: float
    last_heartbeat_time: float

    def handle_batched_kv_operations(self, msg, on_seq_num_out_of_order=None, is_full_sync=False) -> bool: ...
    def has_kv(self, location: str, key: int) -> bool: ...
    def get_kv_keys(self, location: str) -> set[int]: ...
    def find_key(self, key: int) -> Optional[tuple[KVChunkInfo, Optional[str], set[int]]]: ...
    def to_worker_info(self, instance_id: str) -> WorkerInfo: ...


@dataclass
class InstanceNode:
    instance_id: str
    workers: dict[int, WorkerNode]

    def add_worker(self, worker_node: WorkerNode) -> None: ...
    def remove_worker(self, worker_id: int) -> Optional[WorkerNode]: ...
    def get_worker(self, worker_id: int) -> Optional[WorkerNode]: ...
    def find_key(self, key: int) -> Optional[tuple[KVChunkInfo, Optional[str], set[int]]]: ...


class RegistryTree:
    def __init__(self) -> None: ...
    def register_worker(self, instance_id, worker_id, ip, port, peer_init_url, socket, registration_time) -> WorkerNode: ...
    def deregister_worker(self, instance_id, worker_id) -> Optional[WorkerNode]: ...
    def handle_batched_kv_operations(self, msg, is_full_sync=False) -> bool: ...
    def find_kv(self, key, exclude_instance_id=None) -> Optional[KVChunkInfo]: ...
    def find_kv_with_worker_info(self, key, exclude_instance_id=None) -> Optional[tuple]: ...
    def get_total_kv_count(self) -> int: ...
    def get_worker_kv_keys(self, instance_id, worker_id, location) -> set[int]: ...
    def clear_worker_kv(self, instance_id, worker_id, location=None) -> bool: ...
```
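The WorkerSyncInfo fields suggest how a full sync can be tracked batch by batch. The sketch below reuses the documented FullSyncState values and WorkerSyncInfo fields. The helpers record_batch and check_timeout, the rule that a sync completes once all expected_batch_count batches have arrived, and the idle timeout are assumptions for illustration, not the logic of LMCache's FullSyncTracker.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Set


class FullSyncState(Enum):
    IDLE = "idle"
    SYNCING = "syncing"
    COMPLETED = "completed"
    FAILED = "failed"


@dataclass
class WorkerSyncInfo:
    # Same fields as the documented WorkerSyncInfo
    sync_id: str
    state: FullSyncState
    start_time: float
    expected_total_keys: int
    expected_batch_count: int
    received_batches: Set[int]
    received_keys_count: int
    last_activity_time: float


def record_batch(info: WorkerSyncInfo, batch_id: int, num_keys: int, now: float) -> None:
    """Hypothetical helper: account for one full-sync batch."""
    if info.state is not FullSyncState.SYNCING:
        return  # ignore batches for a finished or failed sync
    info.last_activity_time = now
    if batch_id in info.received_batches:
        return  # duplicate delivery: count each batch once
    info.received_batches.add(batch_id)
    info.received_keys_count += num_keys
    # Assumed completion rule: every expected batch has arrived
    if len(info.received_batches) == info.expected_batch_count:
        info.state = FullSyncState.COMPLETED


def check_timeout(info: WorkerSyncInfo, now: float, timeout_s: float) -> None:
    """Hypothetical helper: fail a sync that has been idle too long."""
    if info.state is FullSyncState.SYNCING and now - info.last_activity_time > timeout_s:
        info.state = FullSyncState.FAILED


info = WorkerSyncInfo("sync-1", FullSyncState.SYNCING, 0.0, 3, 2, set(), 0, 0.0)
record_batch(info, batch_id=0, num_keys=2, now=1.0)
record_batch(info, batch_id=0, num_keys=2, now=1.5)  # duplicate, not recounted
record_batch(info, batch_id=1, num_keys=1, now=2.0)
print(info.state, info.received_keys_count)  # FullSyncState.COMPLETED 3
```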
Import
```python
from lmcache.v1.cache_controller.utils import (
    RegistryTree, WorkerNode, InstanceNode,
    KVChunkInfo, FullSyncState, WorkerSyncInfo,
)
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| instance_id | str | Yes | Unique identifier for an LMCache instance |
| worker_id | int | Yes | Worker identifier within an instance (typically TP rank) |
| ip | str | Yes | IP address of the worker |
| port | int | Yes | Port of the worker |
| peer_init_url | Optional[str] | No | URL for peer-to-peer KV transfer (only when P2P is enabled) |
| socket | zmq.asyncio.Socket | Yes | ZMQ async socket for controller-to-worker communication |
| msg | BatchedKVOperationMsg | Yes (for KV ops) | Batched KV operation message with admit/evict operations |
| key | int | Yes (for lookups) | The chunk hash key to search for |
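The msg input carries admit and evict operations that a WorkerNode applies to its per-location key sets while tracking sequence numbers. The sketch below shows that idea in miniature. The ToyOp/ToyBatchMsg message shape, the (expected, received) callback arguments, and the policy of applying an out-of-order batch while reporting the gap are assumptions; they are not the actual BatchedKVOperationMsg format or the behavior of handle_batched_kv_operations.

```python
from dataclasses import dataclass, field
from typing import Callable, Literal, Optional


@dataclass(frozen=True)
class ToyOp:
    kind: Literal["admit", "evict"]
    key: int
    location: str


@dataclass(frozen=True)
class ToyBatchMsg:
    seq_num: int
    ops: tuple[ToyOp, ...]


@dataclass
class ToyWorkerKV:
    kv_store: dict[str, set[int]] = field(default_factory=dict)
    last_seq_num: int = -1  # -1: no batch applied yet

    def handle_batch(
        self,
        msg: ToyBatchMsg,
        on_seq_num_out_of_order: Optional[Callable[[int, int], None]] = None,
    ) -> bool:
        expected = self.last_seq_num + 1
        in_order = msg.seq_num == expected
        if not in_order and on_seq_num_out_of_order is not None:
            # Report the gap (expected, received); recovery is up to the caller
            on_seq_num_out_of_order(expected, msg.seq_num)
        # Apply operations in message order to the per-location key sets
        for op in msg.ops:
            keys = self.kv_store.setdefault(op.location, set())
            if op.kind == "admit":
                keys.add(op.key)
            else:
                keys.discard(op.key)
        self.last_seq_num = max(self.last_seq_num, msg.seq_num)
        return in_order


gaps: list[tuple[int, int]] = []
worker = ToyWorkerKV()
worker.handle_batch(ToyBatchMsg(0, (ToyOp("admit", 1, "cpu"), ToyOp("admit", 2, "cpu"))))
# seq_num 1 is skipped, so the callback fires with (expected=1, received=2)
ok = worker.handle_batch(
    ToyBatchMsg(2, (ToyOp("evict", 1, "cpu"),)),
    on_seq_num_out_of_order=lambda exp, got: gaps.append((exp, got)),
)
print(worker.kv_store, gaps, ok)  # {'cpu': {2}} [(1, 2)] False
```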
Outputs
| Name | Type | Description |
|---|---|---|
| WorkerNode | WorkerNode | Registered worker node with metadata and KV store |
| KVChunkInfo | KVChunkInfo | Named tuple with instance_id, worker_id, and location of a found KV chunk |
| set[int] | set[int] | Set of chunk hash keys at a given worker location |
| list[WorkerInfo] | list[WorkerInfo] | Cached list of all worker metadata across the cluster |
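One common way to keep a cached worker list correct is to drop it whenever workers join or leave and rebuild it lazily on the next read. A minimal sketch of that invalidate-on-write pattern follows. ToyWorkerInfo (a cut-down stand-in for WorkerInfo), ToyWorkerDirectory, and its methods are hypothetical, locking is omitted, and a tuple is returned so callers cannot mutate the shared snapshot.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ToyWorkerInfo:
    instance_id: str
    worker_id: int
    ip: str
    port: int


class ToyWorkerDirectory:
    def __init__(self) -> None:
        self._workers: dict[tuple[str, int], ToyWorkerInfo] = {}
        # None marks the snapshot as stale
        self._snapshot: Optional[tuple[ToyWorkerInfo, ...]] = None
        self.rebuilds = 0  # counts snapshot rebuilds, for demonstration

    def register(self, info: ToyWorkerInfo) -> None:
        self._workers[(info.instance_id, info.worker_id)] = info
        self._snapshot = None  # membership changed: invalidate

    def deregister(self, instance_id: str, worker_id: int) -> None:
        if self._workers.pop((instance_id, worker_id), None) is not None:
            self._snapshot = None

    def all_worker_info(self) -> tuple[ToyWorkerInfo, ...]:
        # Rebuild only after a membership change; otherwise reuse the snapshot
        if self._snapshot is None:
            self._snapshot = tuple(
                sorted(self._workers.values(), key=lambda w: (w.instance_id, w.worker_id))
            )
            self.rebuilds += 1
        return self._snapshot


directory = ToyWorkerDirectory()
directory.register(ToyWorkerInfo("inst_0", 0, "10.0.0.1", 5000))
first = directory.all_worker_info()
second = directory.all_worker_info()  # served from the snapshot
directory.register(ToyWorkerInfo("inst_0", 1, "10.0.0.2", 5000))
print(len(directory.all_worker_info()), directory.rebuilds)  # 2 2
```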
Usage Examples
```python
import zmq
import zmq.asyncio

from lmcache.v1.cache_controller.utils import RegistryTree

# Controller-to-worker ZMQ socket (socket type and address are illustrative)
ctx = zmq.asyncio.Context()
zmq_socket = ctx.socket(zmq.DEALER)
zmq_socket.connect("tcp://10.0.0.1:5000")

# Create the registry
registry = RegistryTree()

# Register a worker
worker_node = registry.register_worker(
    instance_id="inst_0",
    worker_id=0,
    ip="10.0.0.1",
    port=5000,
    peer_init_url="tcp://10.0.0.1:5001",
    socket=zmq_socket,
    registration_time=1700000000.0,
)

# Look up a KV chunk by hash across all instances
result = registry.find_kv(key=123456)
if result is not None:
    print(f"Found at instance={result.instance_id}, worker={result.worker_id}, location={result.location}")

# Get total KV count for monitoring
total_keys = registry.get_total_kv_count()
```