Implementation:LMCache LMCache Messaging Future
| Knowledge Sources | |
|---|---|
| Domains | Asynchronous Programming, Inter-Process Communication |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
MessagingFuture and CUDAMessagingFuture are thread-safe future classes used for asynchronous result handling in LMCache's inter-process messaging system.
Description
MessagingFuture[T] is a generic future backed by a threading.Event. Consumers can poll it (query), block on it (wait), or retrieve its value (result). The set_result method is intended to be called only by the messaging system, once the result of the asynchronous operation becomes available.
CUDAMessagingFuture[T] extends the base future with synchronization on a CUDA IPC event. It wraps a raw future whose result is a tuple of (serialized CUDA event bytes, actual result). On first access it deserializes the event, then calls event.synchronize() before returning the result. This guarantees that the CUDA operations issued on the producing side have completed before the consumer reads the data.
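A future with this query/wait/result/set_result contract can be built on threading.Event in a few lines. The sketch below is modeled on the description above and is not copied from lmcache/v1/multiprocess/futures.py; the class name EventBackedFuture is hypothetical, and the real MessagingFuture may differ in details.

```python
import threading
from typing import Generic, Optional, TypeVar

T = TypeVar("T")


class EventBackedFuture(Generic[T]):
    """Hypothetical sketch modeled on MessagingFuture; not the LMCache source."""

    def __init__(self) -> None:
        self._event = threading.Event()
        self._result: Optional[T] = None

    def query(self) -> bool:
        # Non-blocking: True once set_result() has been called
        return self._event.is_set()

    def wait(self, timeout: Optional[float] = None) -> bool:
        # Block until the result is set or the timeout elapses;
        # timeout=None waits indefinitely, a timeout returns False
        return self._event.wait(timeout)

    def result(self, timeout: Optional[float] = None) -> T:
        if not self._event.wait(timeout):
            raise TimeoutError("result not available within timeout")
        return self._result  # type: ignore[return-value]

    def set_result(self, result: T) -> None:
        # Store the value first, then signal, so that any thread woken
        # by the event observes the stored result
        self._result = result
        self._event.set()


fut: EventBackedFuture[str] = EventBackedFuture()
print(fut.query())              # False
print(fut.wait(timeout=0.01))   # False: nothing has been set yet
threading.Timer(0.05, fut.set_result, args=("done",)).start()
print(fut.result(timeout=1.0))  # done
```

Storing the value before calling Event.set() is the one ordering that matters: set() and wait() synchronize through the event's internal lock, so a consumer that returns from wait() sees the stored result.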
Usage
Use MessagingFuture for general asynchronous RPC result handling in the multiprocess protocol. Use CUDAMessagingFuture when the result involves GPU data transfers that require CUDA event synchronization before the data is safe to read.
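The CUDAMessagingFuture pattern (lazy event deserialization, then synchronize, then return the value) can be sketched without a GPU. This is a hypothetical illustration, not the LMCache source: LazySyncFuture, EventLike and the load_event callback are invented names, and concurrent.futures.Future stands in for the raw MessagingFuture. In a real CUDA setting, load_event would plausibly rebuild the event with PyTorch's torch.cuda.Event.from_ipc_handle(device, handle); whether LMCache uses exactly that call is an assumption.

```python
import threading
from concurrent.futures import Future
from typing import Callable, Generic, Optional, Protocol, TypeVar

T = TypeVar("T")


class EventLike(Protocol):
    """Anything with CUDA-event-style query()/synchronize() methods."""

    def query(self) -> bool: ...

    def synchronize(self) -> None: ...


class LazySyncFuture(Generic[T]):
    """Hypothetical wrapper mirroring the described CUDAMessagingFuture behavior.

    raw_future resolves to (serialized_event, value). The event is rebuilt
    once, on first access, and synchronized before the value is returned.
    """

    def __init__(
        self,
        raw_future: "Future[tuple[bytes, T]]",
        load_event: Callable[[bytes], EventLike],
    ) -> None:
        self._raw = raw_future
        # Assumed hook, e.g. lambda h: torch.cuda.Event.from_ipc_handle(dev, h)
        self._load_event = load_event
        self._event: Optional[EventLike] = None
        self._lock = threading.Lock()

    def _get_event(self) -> EventLike:
        # Deserialize the event only once, even if several threads call in;
        # only reached after the raw future is done, so result() returns at once
        with self._lock:
            if self._event is None:
                handle, _ = self._raw.result()
                self._event = self._load_event(handle)
            return self._event

    def query(self) -> bool:
        # Done only when the raw result has arrived AND the event has completed
        return self._raw.done() and self._get_event().query()

    def result(self, timeout: Optional[float] = None) -> T:
        # Raises concurrent.futures.TimeoutError if the raw result is late
        _, value = self._raw.result(timeout)
        # Wait for the producer-side GPU work before handing out the data
        self._get_event().synchronize()
        return value
```

Deferring deserialization to first access means a future that is never read never touches the CUDA runtime, and the lock keeps concurrent readers from rebuilding the event twice.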
Code Reference
Source Location
- Repository: LMCache
- File: lmcache/v1/multiprocess/futures.py
- Lines: 1-194
Signature
```python
from typing import Generic, Optional, TypeVar

import torch

T = TypeVar("T")


class MessagingFuture(Generic[T]):
    def __init__(self): ...
    def query(self) -> bool: ...
    def wait(self, timeout: Optional[float] = None) -> bool: ...
    def result(self, timeout: Optional[float] = None) -> T: ...
    def set_result(self, result: T) -> None: ...
    def to_cuda_future(self, device: torch.cuda.device | None = None) -> "CUDAMessagingFuture": ...


class CUDAMessagingFuture(MessagingFuture[T]):
    def __init__(
        self,
        raw_future: MessagingFuture[tuple[bytes, T]],
        device: torch.cuda.device | None = None,
    ): ...
    def query(self) -> bool: ...
    def wait(self, timeout: Optional[float] = None) -> bool: ...
    def result(self, timeout: Optional[float] = None) -> T: ...

    @staticmethod
    def FromMessagingFuture(
        raw_future: MessagingFuture[tuple[bytes, T]],
        device: torch.cuda.device | None = None,
    ) -> "CUDAMessagingFuture[T]": ...
```
Import
```python
from lmcache.v1.multiprocess.futures import MessagingFuture, CUDAMessagingFuture
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| timeout | Optional[float] | No | Maximum number of seconds that wait() or result() blocks; None (the default) waits indefinitely |
| result | T | Yes | Value passed to set_result(); only the messaging system should call this method |
| raw_future | MessagingFuture[tuple[bytes, T]] | Yes | CUDAMessagingFuture only: underlying future whose result is (serialized CUDA event bytes, actual result) |
| device | torch.cuda.device or None | No | CUDA device on which the event is deserialized; None uses the current device |
Outputs
| Method | Return type | Description |
|---|---|---|
| query | bool | Non-blocking check: True if the result is available (for CUDAMessagingFuture, the CUDA event must also have completed) |
| wait | bool | True if the future completed within timeout, False if the timeout elapsed first |
| result | T | The result value; blocks until it is available and raises TimeoutError if timeout elapses first |
Usage Examples
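```python
import threading
import torch

from lmcache.v1.multiprocess.futures import MessagingFuture

# Create a future and check its status: nothing has been delivered yet
future: MessagingFuture[int] = MessagingFuture()
assert not future.query()

# The messaging system delivers the result; a thread simulates that here
threading.Thread(target=future.set_result, args=(42,)).start()

# Consumer side: block for up to 5 seconds
if future.wait(timeout=5.0):
    value = future.result()
    print(f"Got result: {value}")

# For GPU-synchronized results the raw future must resolve to (event bytes, result);
# the wrapper's result() synchronizes on that event before returning the data
raw_future: MessagingFuture[tuple[bytes, torch.Tensor]] = MessagingFuture()
cuda_future = raw_future.to_cuda_future()  # device=None: current CUDA device
```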