Implementation:LMCache LMCache KVController
| Knowledge Sources | |
|---|---|
| Domains | Distributed Caching, KV Cache Management |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
KVController manages the lifecycle of KV cache chunks across cluster instances and workers, handling lookups, operations dispatch, and full sync coordination.
Description
The KVController class is a central component in the LMCache cache controller system. It maintains a registry tree that maps (instance_id, worker_id) pairs to their stored KV chunk hashes organized by location. The controller delegates cache operations (clear, pin, compress, decompress, move) to a cluster executor and handles the full sync protocol for re-synchronizing worker state after controller restarts. It also supports token-based prefix lookups and batched peer-to-peer lookups for distributed KV cache sharing.
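The registry layout described above, where `(instance_id, worker_id)` pairs map to chunk hashes grouped by location, can be pictured as a nested mapping. The following is a minimal sketch built on plain dictionaries. `ToyRegistry`, its `admit`/`evict`/`find` methods, and the location strings `"cpu"` and `"disk"` are hypothetical illustrations, not the actual `RegistryTree` API.

```python
from collections import defaultdict
from typing import Dict, Set, Tuple

# Hypothetical stand-in for RegistryTree:
# (instance_id, worker_id) -> location -> set of chunk hashes
WorkerKey = Tuple[str, int]


class ToyRegistry:
    def __init__(self) -> None:
        # Inner sets hold the chunk hashes a worker stores at each location
        self._tree: Dict[WorkerKey, Dict[str, Set[int]]] = defaultdict(
            lambda: defaultdict(set)
        )

    def admit(self, instance_id: str, worker_id: int, location: str, chunk_hash: int) -> None:
        """Record that a worker stored a chunk at a given location."""
        self._tree[(instance_id, worker_id)][location].add(chunk_hash)

    def evict(self, instance_id: str, worker_id: int, location: str, chunk_hash: int) -> None:
        """Forget a chunk; unknown workers or locations are ignored without creating entries."""
        locations = self._tree.get((instance_id, worker_id))
        if locations is not None and location in locations:
            locations[location].discard(chunk_hash)

    def find(self, chunk_hash: int) -> Set[Tuple[str, int, str]]:
        """Return every (instance_id, worker_id, location) that holds the chunk."""
        return {
            (inst, worker, loc)
            for (inst, worker), locations in self._tree.items()
            for loc, hashes in locations.items()
            if chunk_hash in hashes
        }
```

Keying the outer level by worker makes it cheap to drop or rebuild one worker's entries wholesale, which is the kind of operation a re-sync after a restart needs.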
Usage
This class is instantiated by the cache controller server to manage KV cache metadata. It is an internal component: end users are not expected to import it directly, and other controller components interact with it through message-based interfaces.
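The message-based interaction mentioned above can be pictured as a dispatch table keyed by message type. This is a minimal sketch: the message classes `ToyLookupMsg` and `ToyClearMsg`, the `ToyKVController` stand-in, and the `Dispatcher` class are all hypothetical, and the routing code the real controller server uses is not shown on this page.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Dict, List, Type


# Hypothetical message types standing in for the real message classes
@dataclass
class ToyLookupMsg:
    event_id: str
    tokens: List[int]


@dataclass
class ToyClearMsg:
    event_id: str
    instance_id: str


class ToyKVController:
    async def lookup(self, msg: ToyLookupMsg) -> str:
        return f"lookup {msg.event_id}: {len(msg.tokens)} tokens"

    async def clear(self, msg: ToyClearMsg) -> str:
        return f"clear {msg.event_id} on {msg.instance_id}"


class Dispatcher:
    """Route each incoming message to the controller coroutine registered for its type."""

    def __init__(self, ctrl: ToyKVController) -> None:
        self._routes: Dict[Type[Any], Callable[[Any], Awaitable[Any]]] = {
            ToyLookupMsg: ctrl.lookup,
            ToyClearMsg: ctrl.clear,
        }

    async def handle(self, msg: Any) -> Any:
        handler = self._routes.get(type(msg))
        if handler is None:
            raise TypeError(f"no handler for {type(msg).__name__}")
        return await handler(msg)
```

For example, `asyncio.run(Dispatcher(ToyKVController()).handle(ToyClearMsg("e2", "inst-a")))` returns `"clear e2 on inst-a"`.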
Code Reference
Source Location
- Repository: LMCache
- File: lmcache/v1/cache_controller/controllers/kv_controller.py
- Lines: 1-439
Signature
```python
class KVController:
    def __init__(
        self,
        registry: RegistryTree,
        full_sync_completion_threshold: float = 0.8,
        full_sync_timeout_s: float = 300.0,
    ) -> None: ...

    def post_init(
        self, reg_controller: "RegistrationController", cluster_executor: Any
    ) -> None: ...

    async def clear(self, msg: ClearMsg) -> ClearRetMsg: ...
    async def pin(self, msg: PinMsg) -> PinRetMsg: ...
    async def compress(self, msg: CompressMsg) -> CompressRetMsg: ...
    async def decompress(self, msg: DecompressMsg) -> DecompressRetMsg: ...
    async def move(self, msg: MoveMsg) -> MoveRetMsg: ...
    async def check_finish(self, msg: CheckFinishMsg) -> CheckFinishRetMsg: ...
    async def handle_batched_kv_operations(self, msg: BatchedKVOperationMsg) -> None: ...
    async def handle_full_sync_start(self, msg: FullSyncStartMsg) -> FullSyncStartRetMsg: ...
    async def handle_full_sync_batch(self, msg: FullSyncBatchMsg) -> None: ...
    async def handle_full_sync_end(self, msg: FullSyncEndMsg) -> None: ...
    async def handle_full_sync_status(self, msg: FullSyncStatusMsg) -> FullSyncStatusRetMsg: ...
    async def lookup(self, msg: LookupMsg) -> LookupRetMsg: ...
    async def batched_p2p_lookup(self, msg: BatchedP2PLookupMsg) -> BatchedP2PLookupRetMsg: ...
```
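The signature splits setup into `__init__` and `post_init`, and operations such as clear and pin are delegated to a cluster executor. A minimal sketch of that delegation pattern follows, assuming a hypothetical executor exposing an `execute(op, msg)` coroutine and dict-based messages. `ToyClusterExecutor`, `ToyRetMsg`, and `_delegate` are illustrative names, not LMCache APIs, and the return fields only mirror the "event ID and token count" description in the Outputs table.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple


@dataclass
class ToyRetMsg:
    # Assumed fields, modeled on "event IDs and token counts"
    event_id: str
    num_tokens: int


class ToyClusterExecutor:
    """Hypothetical executor that pretends to fan an operation out to workers."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, str]] = []

    async def execute(self, op: str, msg: Dict[str, Any]) -> ToyRetMsg:
        self.calls.append((op, msg["event_id"]))
        await asyncio.sleep(0)  # stand-in for network I/O to the workers
        return ToyRetMsg(event_id=msg["event_id"], num_tokens=len(msg.get("tokens", [])))


class ToyKVController:
    def __init__(self) -> None:
        # Left empty until post_init, mirroring the two-phase setup above
        self.cluster_executor: Optional[ToyClusterExecutor] = None

    def post_init(self, cluster_executor: ToyClusterExecutor) -> None:
        self.cluster_executor = cluster_executor

    async def _delegate(self, op: str, msg: Dict[str, Any]) -> ToyRetMsg:
        if self.cluster_executor is None:
            raise RuntimeError("post_init() must be called before issuing operations")
        return await self.cluster_executor.execute(op, msg)

    async def clear(self, msg: Dict[str, Any]) -> ToyRetMsg:
        return await self._delegate("clear", msg)

    async def pin(self, msg: Dict[str, Any]) -> ToyRetMsg:
        return await self._delegate("pin", msg)
```

Deferring the executor to `post_init` is a common way to break a construction-order cycle between collaborating components; whether that is the motivation in LMCache is an assumption.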
Import
```python
from lmcache.v1.cache_controller.controllers.kv_controller import KVController
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| registry | RegistryTree | Yes | The central registry tree tracking instances, workers, and their KV chunks |
| full_sync_completion_threshold | float | No | Fraction of workers that must complete sync before freeze mode can exit (default 0.8) |
| full_sync_timeout_s | float | No | Timeout in seconds for full sync operations (default 300.0) |
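The two full-sync parameters can be illustrated with a small model: if the threshold comparison is inclusive, then with the default of 0.8 and 10 workers, freeze mode could end once 8 workers have finished. The sketch below models the start/batch/end/status flow under these assumptions: each worker announces how many batches it will send, and a worker counts as complete only after `end` with no missing batches. `ToyFullSyncTracker` and its methods are hypothetical. What the real controller does when `full_sync_timeout_s` expires is not described here, so the sketch only reports whether the timeout has elapsed.

```python
import time
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Set, Tuple

WorkerKey = Tuple[str, int]  # (instance_id, worker_id)


@dataclass
class _WorkerSync:
    expected_batches: int
    received: Set[int] = field(default_factory=set)
    ended: bool = False


class ToyFullSyncTracker:
    """Hypothetical model of the full-sync start/batch/end/status flow."""

    def __init__(self, num_workers: int, threshold: float = 0.8, timeout_s: float = 300.0) -> None:
        self.num_workers = num_workers
        self.threshold = threshold
        self.timeout_s = timeout_s
        self._syncs: Dict[WorkerKey, _WorkerSync] = {}
        self._started_at: Optional[float] = None

    def start(self, worker: WorkerKey, expected_batches: int) -> None:
        # The first start message opens the window used for the timeout check
        if self._started_at is None:
            self._started_at = time.monotonic()
        self._syncs[worker] = _WorkerSync(expected_batches)

    def batch(self, worker: WorkerKey, batch_id: int) -> None:
        self._syncs[worker].received.add(batch_id)

    def end(self, worker: WorkerKey) -> None:
        self._syncs[worker].ended = True

    def missing_batches(self, worker: WorkerKey) -> List[int]:
        s = self._syncs[worker]
        return sorted(set(range(s.expected_batches)) - s.received)

    def completed(self, worker: WorkerKey) -> bool:
        s = self._syncs.get(worker)
        return s is not None and s.ended and not self.missing_batches(worker)

    def global_progress(self) -> float:
        # Fraction of all workers whose sync is complete
        done = sum(1 for w in self._syncs if self.completed(w))
        return done / self.num_workers if self.num_workers else 1.0

    def can_exit_freeze(self) -> bool:
        return self.global_progress() >= self.threshold

    def timed_out(self, now: Optional[float] = None) -> bool:
        now = time.monotonic() if now is None else now
        return self._started_at is not None and now - self._started_at >= self.timeout_s
```

A worker that ends with gaps reports them through `missing_batches`, which corresponds to the missing batch list in `FullSyncStatusRetMsg`.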
Outputs
| Name | Type | Description |
|---|---|---|
| LookupRetMsg | LookupRetMsg | Layout info mapping each instance ID to its matched location and the token position where the match ends |
| BatchedP2PLookupRetMsg | BatchedP2PLookupRetMsg | Peer-to-peer lookup results with instance ID, location, hit count, and peer init URL |
| ClearRetMsg / PinRetMsg / etc. | OrchRetMsg subclasses | Operation return messages with event IDs and token counts |
| FullSyncStartRetMsg | FullSyncStartRetMsg | Acceptance or rejection of a full sync start request |
| FullSyncStatusRetMsg | FullSyncStatusRetMsg | Sync completion status, global progress, freeze exit flag, and missing batch list |
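The Outputs table lists what a batched P2P lookup returns per request: an instance ID, a location, a hit count, and a peer init URL. One plausible way to produce that shape is sketched below, assuming that each request arrives as chunk hashes in prefix order, that only contiguous leading hits count, and that the requesting instance is excluded from its own results. `batched_p2p_lookup` (a free function here, not the controller method), `P2PHit`, and the `tcp://` URL format are hypothetical. Ties go to the first peer encountered.

```python
from typing import Dict, List, NamedTuple, Optional, Sequence, Set, Tuple


class P2PHit(NamedTuple):
    # Field names are assumptions modeled on the Outputs table
    instance_id: str
    location: str
    hit_chunks: int
    peer_init_url: str


def batched_p2p_lookup(
    requests: Sequence[Sequence[int]],
    peer_chunks: Dict[Tuple[str, str], Set[int]],
    peer_urls: Dict[str, str],
    requester: str,
) -> List[Optional[P2PHit]]:
    """For each request, pick the peer with the longest contiguous prefix hit."""
    results: List[Optional[P2PHit]] = []
    for chunk_hashes in requests:
        best: Optional[P2PHit] = None
        for (instance_id, location), stored in peer_chunks.items():
            if instance_id == requester:
                continue  # never point a request back at its own instance
            hits = 0
            for h in chunk_hashes:
                if h not in stored:
                    break  # a cached prefix is only reusable up to the first miss
                hits += 1
            if hits > 0 and (best is None or hits > best.hit_chunks):
                best = P2PHit(instance_id, location, hits, peer_urls[instance_id])
        results.append(best)
    return results
```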
Usage Examples
```python
from lmcache.v1.cache_controller.utils import RegistryTree
from lmcache.v1.cache_controller.controllers.kv_controller import KVController
from lmcache.v1.cache_controller.message import LookupMsg


async def main(reg_controller, cluster_executor) -> None:
    # Create registry and controller
    registry = RegistryTree()
    kv_ctrl = KVController(registry, full_sync_completion_threshold=0.8)

    # Post-initialization with the registration controller and cluster executor
    # (assumed to be constructed elsewhere, e.g. by the cache controller server)
    kv_ctrl.post_init(reg_controller, cluster_executor)

    # Perform a token-based lookup; `await` requires a running event loop
    lookup_msg = LookupMsg(event_id="evt_001", tokens=[101, 202, 303])
    result = await kv_ctrl.lookup(lookup_msg)
    # result.layout_info maps instance_id -> (location, matched_end)
    print(result.layout_info)


# Run with: asyncio.run(main(reg_controller, cluster_executor))
```
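In the example, `result.layout_info` maps each instance ID to a location and a matched end position. One way such a prefix lookup can work is sketched below, assuming tokens are split into fixed-size chunks and each chunk hash is chained on the previous one so that it identifies the whole prefix up to that point. `chunk_prefix_hashes`, `lookup_layout`, the SHA-256 chaining, and the 256-token default chunk size are assumptions for illustration, not LMCache's actual hashing scheme.

```python
import hashlib
from typing import Dict, List, Sequence, Set, Tuple


def chunk_prefix_hashes(tokens: Sequence[int], chunk_size: int) -> List[Tuple[int, str]]:
    """Return (chunk_end, prefix_hash) pairs; each hash covers all tokens up to chunk_end."""
    out: List[Tuple[int, str]] = []
    prev = ""
    for start in range(0, len(tokens), chunk_size):
        end = min(start + chunk_size, len(tokens))
        # Chaining on the previous hash makes identical chunks at different offsets differ
        payload = f"{prev}|{','.join(str(t) for t in tokens[start:end])}"
        prev = hashlib.sha256(payload.encode()).hexdigest()
        out.append((end, prev))
    return out


def lookup_layout(
    tokens: Sequence[int],
    stored: Dict[Tuple[str, str], Set[str]],
    chunk_size: int = 256,
) -> Dict[str, Tuple[str, int]]:
    """Map instance_id -> (location, matched_end) for the longest cached prefix."""
    prefixes = chunk_prefix_hashes(tokens, chunk_size)
    layout: Dict[str, Tuple[str, int]] = {}
    for (instance_id, location), hashes in stored.items():
        matched_end = 0
        for end, h in prefixes:
            if h not in hashes:
                break  # stop at the first chunk this location does not hold
            matched_end = end
        best = layout.get(instance_id)
        if matched_end > 0 and (best is None or matched_end > best[1]):
            layout[instance_id] = (location, matched_end)
    return layout
```

Because every hash depends on all earlier chunks, a location that holds a later chunk but not the first one yields no match, which is the behavior a prefix cache needs.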