Implementation:LMCache LMCache Redis Connector
| Knowledge Sources | |
|---|---|
| Domains | Caching, Storage Connectors, Redis |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
The Redis connector module provides three RemoteConnector implementations for storing KV cache data in Redis: standalone (RedisConnector), Sentinel-based (RedisSentinelConnector), and cluster-based (RedisClusterConnector).
Description
This module implements three Redis-backed storage connectors using the redis-py async library. RedisConnector connects to a single Redis instance via URL with a connection pool (default 150 max connections) and an asyncio semaphore for concurrency control. RedisSentinelConnector uses Redis Sentinel for automatic master/slave failover, reading from slaves and writing to the master, configured through environment variables (REDIS_SERVICE_NAME, REDIS_TIMEOUT). RedisClusterConnector uses RedisCluster from redis-py to connect to a Redis cluster with multiple startup nodes. All three connectors store metadata and KV bytes as separate Redis keys (suffixed with metadata and kv_bytes respectively), setting kv_bytes first to avoid race conditions. Both RedisConnector and RedisClusterConnector route operations through an AsyncPQExecutor priority queue and support batched put, batched async contains, and non-blocking batched get operations.
Usage
Use RedisConnector for single-instance Redis deployments (URL starts with redis://). Use RedisSentinelConnector for high-availability Redis setups with Sentinel failover (URL starts with redis-sentinel://). Use RedisClusterConnector for horizontally-scaled Redis cluster deployments (URL starts with redis-cluster://).
Code Reference
Source Location
- Repository: LMCache
- File: lmcache/v1/storage_backend/connector/redis_connector.py
- Lines: 1-627
Signature
class Priorities(IntEnum):
PEEK = auto()
PREFETCH = auto()
GET = auto()
PUT = auto()
class RedisConnector(RemoteConnector):
def __init__(self, url: str, loop: asyncio.AbstractEventLoop,
local_cpu_backend: LocalCPUBackend): ...
async def exists(self, key: CacheEngineKey) -> bool: ...
def exists_sync(self, key: CacheEngineKey) -> bool: ...
async def get(self, key: CacheEngineKey) -> Optional[MemoryObj]: ...
async def put(self, key: CacheEngineKey, memory_obj: MemoryObj): ...
async def batched_put(self, keys: List[CacheEngineKey], memory_objs: List[MemoryObj]): ...
async def batched_async_contains(self, lookup_id: str, keys: List[CacheEngineKey], pin: bool = False) -> int: ...
async def batched_get_non_blocking(self, lookup_id: str, keys: List[CacheEngineKey]) -> List[MemoryObj]: ...
async def close(self): ...
class RedisSentinelConnector(RemoteConnector):
def __init__(self, hosts_and_ports: List[Tuple[str, int]], username: str,
password: str, loop: asyncio.AbstractEventLoop,
local_cpu_backend: LocalCPUBackend): ...
async def exists(self, key: CacheEngineKey) -> bool: ...
async def get(self, key: CacheEngineKey) -> Optional[MemoryObj]: ...
async def put(self, key: CacheEngineKey, memory_obj: MemoryObj): ...
async def close(self): ...
class RedisClusterConnector(RemoteConnector):
def __init__(self, hosts_and_ports: List[Tuple[str, int]], username: str,
password: str, loop: asyncio.AbstractEventLoop,
local_cpu_backend: LocalCPUBackend): ...
async def exists(self, key: CacheEngineKey) -> bool: ...
async def get(self, key: CacheEngineKey) -> Optional[MemoryObj]: ...
async def put(self, key: CacheEngineKey, memory_obj: MemoryObj): ...
async def batched_put(self, keys: List[CacheEngineKey], memory_objs: List[MemoryObj]): ...
async def batched_async_contains(self, lookup_id: str, keys: List[CacheEngineKey], pin: bool = False) -> int: ...
async def batched_get_non_blocking(self, lookup_id: str, keys: List[CacheEngineKey]) -> List[MemoryObj]: ...
async def close(self): ...
Import
from lmcache.v1.storage_backend.connector.redis_connector import (
RedisConnector,
RedisSentinelConnector,
RedisClusterConnector,
)
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| url | str | Yes (RedisConnector) | Redis connection URL (e.g., "redis://localhost:6379") |
| hosts_and_ports | List[Tuple[str, int]] | Yes (Sentinel/Cluster) | List of (host, port) tuples for Sentinel or Cluster nodes |
| username | str | Yes (Sentinel/Cluster) | Redis authentication username |
| password | str | Yes (Sentinel/Cluster) | Redis authentication password |
| loop | asyncio.AbstractEventLoop | Yes | Asyncio event loop for async operations |
| local_cpu_backend | LocalCPUBackend | Yes | CPU backend for memory allocation |
Outputs
| Name | Type | Description |
|---|---|---|
| RedisConnector | RemoteConnector | Connector to a single Redis instance with connection pooling and priority queue execution |
| RedisSentinelConnector | RemoteConnector | Connector using Redis Sentinel for master/slave failover with read-from-slave pattern |
| RedisClusterConnector | RemoteConnector | Connector to a Redis Cluster with connection pooling and priority queue execution |
Usage Examples
from lmcache.v1.storage_backend.connector.redis_connector import RedisConnector
# Single Redis instance
connector = RedisConnector(
url="redis://localhost:6379",
loop=asyncio.get_event_loop(),
local_cpu_backend=local_cpu_backend,
)
# Store and retrieve KV cache
await connector.put(key, memory_obj)
result = await connector.get(key)
# Batched operations
await connector.batched_put(keys, memory_objs)
hit_count = await connector.batched_async_contains("lookup-1", keys)
# Close connection
await connector.close()