Implementation:LMCache LMCache Redis Connector

From Leeroopedia


Knowledge Sources
Domains Caching, Storage Connectors, Redis
Last Updated 2026-02-09 00:00 GMT

Overview

The Redis connector module provides three RemoteConnector implementations for storing KV cache data in Redis: standalone (RedisConnector), Sentinel-based (RedisSentinelConnector), and cluster-based (RedisClusterConnector).

Description

This module implements three Redis-backed storage connectors on top of the redis-py async library.

RedisConnector connects to a single Redis instance by URL. It uses a connection pool (150 max connections by default) and an asyncio semaphore for concurrency control.

RedisSentinelConnector uses Redis Sentinel for automatic master/slave failover, reading from slaves and writing to the master. It is configured through environment variables (REDIS_SERVICE_NAME, REDIS_TIMEOUT).

RedisClusterConnector uses RedisCluster from redis-py to connect to a Redis cluster through multiple startup nodes.

All three connectors store metadata and KV bytes under separate Redis keys (suffixed with metadata and kv_bytes respectively) and set kv_bytes first to avoid race conditions. RedisConnector and RedisClusterConnector additionally route operations through an AsyncPQExecutor priority queue and support batched put, batched async contains, and non-blocking batched get operations.
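The write ordering and the semaphore described above can be illustrated with a minimal sketch. It does not reproduce the connector's code: FakeAsyncStore is a hypothetical in-memory stand-in for a Redis client, TwoKeyWriter is a hypothetical name, the plain string concatenation used to build the two key names is an assumption, and checking the metadata key to decide whether an entry exists is also an assumption about how a reader might benefit from the ordering.

```python
import asyncio
from typing import Dict, Optional, Tuple


class FakeAsyncStore:
    """Hypothetical in-memory stand-in for an async Redis client."""

    def __init__(self) -> None:
        self._data: Dict[str, bytes] = {}

    async def set(self, name: str, value: bytes) -> None:
        self._data[name] = value

    async def get(self, name: str) -> Optional[bytes]:
        return self._data.get(name)

    async def exists(self, name: str) -> bool:
        return name in self._data


class TwoKeyWriter:
    """Hypothetical helper: one logical entry stored under two keys."""

    def __init__(self, store: FakeAsyncStore, max_in_flight: int = 150) -> None:
        self._store = store
        # Caps the number of requests in flight at once, in the spirit of
        # pairing a bounded connection pool with an asyncio semaphore.
        self._sem = asyncio.Semaphore(max_in_flight)

    async def put(self, key: str, metadata: bytes, kv_bytes: bytes) -> None:
        async with self._sem:
            # Payload first, metadata last: a reader that sees the metadata
            # key can then expect the payload key to be present already.
            await self._store.set(key + "kv_bytes", kv_bytes)
            await self._store.set(key + "metadata", metadata)

    async def exists(self, key: str) -> bool:
        async with self._sem:
            # Assumption: the metadata key acts as the "entry is complete" marker.
            return await self._store.exists(key + "metadata")

    async def get(self, key: str) -> Optional[Tuple[bytes, bytes]]:
        async with self._sem:
            metadata = await self._store.get(key + "metadata")
            if metadata is None:
                return None
            kv_bytes = await self._store.get(key + "kv_bytes")
            if kv_bytes is None:
                return None
            return metadata, kv_bytes


async def demo() -> Optional[Tuple[bytes, bytes]]:
    # Created inside the coroutine so the semaphore belongs to the running loop.
    writer = TwoKeyWriter(FakeAsyncStore(), max_in_flight=2)
    await writer.put("chunk-0", b"meta", b"payload")
    return await writer.get("chunk-0")
```

If the two writes were issued in the opposite order, a concurrent reader could observe the metadata key before the payload key exists, which is the kind of race the ordering is meant to avoid.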

Usage

Use RedisConnector for single-instance Redis deployments (URL starts with redis://). Use RedisSentinelConnector for high-availability Redis setups with Sentinel failover (URL starts with redis-sentinel://). Use RedisClusterConnector for horizontally-scaled Redis cluster deployments (URL starts with redis-cluster://).
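As a rough illustration of the scheme-based choice above, the following sketch maps a URL scheme to the name of the connector class. The helper connector_name_for and the lookup table are hypothetical and exist only for this example; the way LMCache itself dispatches on the URL is not shown in this page.

```python
from urllib.parse import urlparse

# Scheme-to-class mapping taken from the usage guidance above.
_SCHEME_TO_CONNECTOR = {
    "redis": "RedisConnector",
    "redis-sentinel": "RedisSentinelConnector",
    "redis-cluster": "RedisClusterConnector",
}


def connector_name_for(url: str) -> str:
    """Return the connector class name that matches the URL scheme.

    Hypothetical helper for illustration; raises ValueError for
    schemes that none of the three connectors handle.
    """
    scheme = urlparse(url).scheme
    try:
        return _SCHEME_TO_CONNECTOR[scheme]
    except KeyError:
        raise ValueError(f"Unsupported Redis URL scheme: {scheme!r}") from None
```

For instance, connector_name_for("redis://localhost:6379") selects RedisConnector, while a URL beginning with redis-sentinel:// selects RedisSentinelConnector.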

Code Reference

Source Location

Signature

class Priorities(IntEnum):
    PEEK = auto()
    PREFETCH = auto()
    GET = auto()
    PUT = auto()

class RedisConnector(RemoteConnector):
    def __init__(self, url: str, loop: asyncio.AbstractEventLoop,
                 local_cpu_backend: LocalCPUBackend): ...
    async def exists(self, key: CacheEngineKey) -> bool: ...
    def exists_sync(self, key: CacheEngineKey) -> bool: ...
    async def get(self, key: CacheEngineKey) -> Optional[MemoryObj]: ...
    async def put(self, key: CacheEngineKey, memory_obj: MemoryObj): ...
    async def batched_put(self, keys: List[CacheEngineKey], memory_objs: List[MemoryObj]): ...
    async def batched_async_contains(self, lookup_id: str, keys: List[CacheEngineKey], pin: bool = False) -> int: ...
    async def batched_get_non_blocking(self, lookup_id: str, keys: List[CacheEngineKey]) -> List[MemoryObj]: ...
    async def close(self): ...

class RedisSentinelConnector(RemoteConnector):
    def __init__(self, hosts_and_ports: List[Tuple[str, int]], username: str,
                 password: str, loop: asyncio.AbstractEventLoop,
                 local_cpu_backend: LocalCPUBackend): ...
    async def exists(self, key: CacheEngineKey) -> bool: ...
    async def get(self, key: CacheEngineKey) -> Optional[MemoryObj]: ...
    async def put(self, key: CacheEngineKey, memory_obj: MemoryObj): ...
    async def close(self): ...

class RedisClusterConnector(RemoteConnector):
    def __init__(self, hosts_and_ports: List[Tuple[str, int]], username: str,
                 password: str, loop: asyncio.AbstractEventLoop,
                 local_cpu_backend: LocalCPUBackend): ...
    async def exists(self, key: CacheEngineKey) -> bool: ...
    async def get(self, key: CacheEngineKey) -> Optional[MemoryObj]: ...
    async def put(self, key: CacheEngineKey, memory_obj: MemoryObj): ...
    async def batched_put(self, keys: List[CacheEngineKey], memory_objs: List[MemoryObj]): ...
    async def batched_async_contains(self, lookup_id: str, keys: List[CacheEngineKey], pin: bool = False) -> int: ...
    async def batched_get_non_blocking(self, lookup_id: str, keys: List[CacheEngineKey]) -> List[MemoryObj]: ...
    async def close(self): ...
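To show how an IntEnum such as Priorities can drive a priority queue, here is a minimal sketch built on asyncio.PriorityQueue. TinyPQExecutor is a hypothetical stand-in and is not the AsyncPQExecutor used by the connectors; in particular, treating the lowest numeric value as the most urgent is an assumption of this sketch.

```python
import asyncio
import itertools
from enum import IntEnum, auto
from typing import Any, Awaitable, Callable, List


class Priorities(IntEnum):
    # auto() numbers the members 1, 2, 3, 4 in declaration order.
    PEEK = auto()
    PREFETCH = auto()
    GET = auto()
    PUT = auto()


class TinyPQExecutor:
    """Hypothetical single-worker executor that runs queued jobs by priority."""

    def __init__(self) -> None:
        self._queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
        # Tie-breaker so equal priorities run in submission order and the
        # callables themselves are never compared.
        self._counter = itertools.count()

    def submit(
        self, priority: Priorities, fn: Callable[[], Awaitable[Any]]
    ) -> asyncio.Future:
        fut = asyncio.get_running_loop().create_future()
        self._queue.put_nowait((int(priority), next(self._counter), fn, fut))
        return fut

    async def drain(self) -> None:
        # Pops the smallest (priority, sequence) pair first.
        while not self._queue.empty():
            _, _, fn, fut = self._queue.get_nowait()
            try:
                fut.set_result(await fn())
            except Exception as exc:
                fut.set_exception(exc)


async def demo() -> List[str]:
    executor = TinyPQExecutor()
    order: List[str] = []

    def make(name: str) -> Callable[[], Awaitable[str]]:
        async def op() -> str:
            order.append(name)
            return name
        return op

    # Submitted in "wrong" order on purpose.
    executor.submit(Priorities.PUT, make("put"))
    executor.submit(Priorities.GET, make("get"))
    executor.submit(Priorities.PEEK, make("peek"))
    executor.submit(Priorities.PREFETCH, make("prefetch"))
    await executor.drain()
    return order
```

Under the lowest-value-first assumption, the jobs in demo() run in the order PEEK, PREFETCH, GET, PUT regardless of submission order.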

Import

from lmcache.v1.storage_backend.connector.redis_connector import (
    RedisConnector,
    RedisSentinelConnector,
    RedisClusterConnector,
)

I/O Contract

Inputs

Name | Type | Required | Description
url | str | Yes (RedisConnector) | Redis connection URL (e.g., "redis://localhost:6379")
hosts_and_ports | List[Tuple[str, int]] | Yes (Sentinel/Cluster) | List of (host, port) tuples for Sentinel or Cluster nodes
username | str | Yes (Sentinel/Cluster) | Redis authentication username
password | str | Yes (Sentinel/Cluster) | Redis authentication password
loop | asyncio.AbstractEventLoop | Yes | Asyncio event loop for async operations
local_cpu_backend | LocalCPUBackend | Yes | CPU backend for memory allocation
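The hosts_and_ports argument is a plain list of (host, port) tuples. As a small sketch of how such a list might be built from a comma-separated string, the helper below is hypothetical: the "host:port,host:port" input format and the default port are assumptions made for this example, and IPv6 literals are not handled.

```python
from typing import List, Tuple


def parse_hosts_and_ports(spec: str, default_port: int = 6379) -> List[Tuple[str, int]]:
    """Turn "h1:7000,h2:7001" into [("h1", 7000), ("h2", 7001)].

    Hypothetical helper; entries without a port fall back to default_port.
    """
    result: List[Tuple[str, int]] = []
    for item in spec.split(","):
        item = item.strip()
        if not item:
            continue  # tolerate trailing commas and stray whitespace
        host, sep, port = item.rpartition(":")
        if sep:
            result.append((host, int(port)))
        else:
            result.append((item, default_port))
    return result
```

The resulting list has the List[Tuple[str, int]] shape that the Sentinel and Cluster constructors take.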

Outputs

Name | Type | Description
RedisConnector | RemoteConnector | Connector to a single Redis instance with connection pooling and priority queue execution
RedisSentinelConnector | RemoteConnector | Connector using Redis Sentinel for master/slave failover with read-from-slave pattern
RedisClusterConnector | RemoteConnector | Connector to a Redis Cluster with connection pooling and priority queue execution

Usage Examples

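import asyncio

from lmcache.v1.storage_backend.connector.redis_connector import RedisConnector


async def main(local_cpu_backend, key, memory_obj, keys, memory_objs):
    # local_cpu_backend, key/keys, and memory_obj/memory_objs are assumed to
    # be created elsewhere (LocalCPUBackend, CacheEngineKey, MemoryObj).

    # Single Redis instance
    connector = RedisConnector(
        url="redis://localhost:6379",
        loop=asyncio.get_running_loop(),
        local_cpu_backend=local_cpu_backend,
    )

    try:
        # Store and retrieve KV cache
        await connector.put(key, memory_obj)
        result = await connector.get(key)  # Optional[MemoryObj]

        # Batched operations
        await connector.batched_put(keys, memory_objs)
        hit_count = await connector.batched_async_contains("lookup-1", keys)
    finally:
        # Close connection, even if one of the calls above fails
        await connector.close()
    return result, hit_count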
