Implementation:LMCache LMCache Periodic Thread
| Knowledge Sources | |
|---|---|
| Domains | Thread Management, Background Processing, Monitoring |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
The periodic thread module provides a unified abstraction for creating, managing, and monitoring periodic background threads with configurable intervals, importance levels, and a global singleton registry.
Description
The module defines PeriodicThread, an abstract base class that standardizes periodic background task execution with named daemon threads, configurable execution intervals, initial wait times, and interruptible sleep via threading.Event. Each execution cycle produces a ThreadRunSummary dataclass capturing timestamp, duration, success status, and optional extra info. The class tracks cumulative statistics (total runs, failed runs, success rate) and supports liveness checks via is_running and is_active properties. A ThreadLevel enum classifies threads as CRITICAL, HIGH, MEDIUM, or LOW importance. The PeriodicThreadRegistry singleton provides centralized tracking with registration, lookup by name or level, running/active counts, and comprehensive summaries. A create_periodic_thread factory function allows creating simple periodic threads from plain callables without subclassing. The framework handles IrrecoverableException by stopping the loop, while regular exceptions are logged and the loop continues.
Usage
Use this module to create any periodic background task in LMCache (health monitoring, stats logging, memory expansion, etc.) with consistent lifecycle management and monitoring. Subclass PeriodicThread and implement _execute, or use create_periodic_thread for simple cases.
Code Reference
Source Location
- Repository: LMCache
- File: lmcache/v1/periodic_thread.py
- Lines: 1-580
Signature
class ThreadLevel(Enum):
CRITICAL = "critical"
HIGH = "high"
MEDIUM = "medium"
LOW = "low"
@dataclass
class ThreadRunSummary:
timestamp: float = 0.0
duration_ms: float = 0.0
success: bool = True
message: str = ""
extra_info: Dict[str, str] = field(default_factory=dict)
class PeriodicThread(ABC):
def __init__(self, name: str, interval: float,
level: ThreadLevel = ThreadLevel.MEDIUM,
init_wait: float = 0.0) -> None: ...
def start(self) -> Optional[threading.Thread]: ...
def stop(self, timeout: float = 5.0) -> None: ...
@abstractmethod
def _execute(self) -> ThreadRunSummary: ...
def get_status(self) -> Dict: ...
class PeriodicThreadRegistry:
@classmethod
def get_instance(cls) -> "PeriodicThreadRegistry": ...
def register(self, thread: PeriodicThread) -> None: ...
def unregister(self, name: str) -> Optional[PeriodicThread]: ...
def get_summary(self) -> Dict: ...
def create_periodic_thread(
name: str, interval: float,
execute_fn: Callable[[], ThreadRunSummary],
level: ThreadLevel = ThreadLevel.MEDIUM,
init_wait: float = 0.0,
auto_register: bool = True,
) -> PeriodicThread: ...
Import
from lmcache.v1.periodic_thread import (
PeriodicThread,
PeriodicThreadRegistry,
ThreadLevel,
ThreadRunSummary,
create_periodic_thread,
)
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| name | str | Yes | Human-readable name for thread identification and registry lookup |
| interval | float | Yes | Time interval between executions in seconds |
| level | ThreadLevel | No | Thread importance level (default: MEDIUM) |
| init_wait | float | No | Initial wait time before first execution in seconds (default: 0.0) |
| execute_fn | Callable[[], ThreadRunSummary] | Yes (for factory) | Function to execute on each cycle when using create_periodic_thread |
| auto_register | bool | No | Whether to automatically register with the global registry (default: True) |
Outputs
| Name | Type | Description |
|---|---|---|
| start() | Optional[threading.Thread] | The started daemon thread, or None if already running |
| _execute() | ThreadRunSummary | Summary of the execution cycle including success status and duration |
| get_status() | Dict | Complete thread status: name, level, interval, is_running, is_active, total_runs, failed_runs, success_rate, last_summary |
| get_summary() | Dict | Registry-wide summary: total_count, running_count, active_count, by_level breakdown, per-thread statuses |
Usage Examples
from lmcache.v1.periodic_thread import (
PeriodicThread, ThreadLevel, ThreadRunSummary,
PeriodicThreadRegistry, create_periodic_thread,
)
import time
# Subclass approach
class HealthMonitorThread(PeriodicThread):
def __init__(self):
super().__init__(
name="health-monitor-thread",
interval=30.0,
level=ThreadLevel.CRITICAL,
init_wait=5.0,
)
def _execute(self) -> ThreadRunSummary:
# Perform health check
return ThreadRunSummary(
timestamp=time.time(),
success=True,
message="All backends healthy",
)
thread = HealthMonitorThread()
thread.start()
# Factory approach
simple_thread = create_periodic_thread(
name="stats-logger",
interval=60.0,
execute_fn=lambda: ThreadRunSummary(success=True),
level=ThreadLevel.MEDIUM,
)
simple_thread.start()
# Registry inspection
registry = PeriodicThreadRegistry.get_instance()
summary = registry.get_summary()
print(f"Running threads: {summary['running_count']}")