
Implementation:LMCache LMCache Messaging Future

From Leeroopedia


Knowledge Sources
Domains Asynchronous Programming, Inter-Process Communication
Last Updated 2026-02-09 00:00 GMT

Overview

MessagingFuture and CUDAMessagingFuture are thread-safe future classes used for asynchronous result handling in LMCache's inter-process messaging system.

Description

MessagingFuture[T] is a generic future backed by a threading.Event. Consumers can query it, wait for it, or retrieve the result of an asynchronous operation. The set_result method is intended to be called only by the messaging system, when the result becomes available.

CUDAMessagingFuture[T] extends the base future so that it also synchronizes on a CUDA IPC event. It wraps a raw future whose result is a tuple of (serialized CUDA event bytes, actual result). On first access it deserializes the event, and it calls event.synchronize() before returning the result. This guarantees that CUDA operations on the producing side have completed before the consumer reads the data.
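To make the threading.Event-backed design concrete, here is a minimal sketch of the base future behaviour described above. It is not LMCache's source. The class name SketchFuture is hypothetical, and storing the value before calling Event.set() is this sketch's own choice. A producer thread (simulated with threading.Timer) fulfils the future while the consumer blocks on it.

```python
import threading
from typing import Generic, Optional, TypeVar

T = TypeVar("T")


class SketchFuture(Generic[T]):
    """Hypothetical stand-in for MessagingFuture: a threading.Event-backed future."""

    def __init__(self) -> None:
        self._event = threading.Event()
        self._result: Optional[T] = None

    def query(self) -> bool:
        # Non-blocking: True once set_result() has been called
        return self._event.is_set()

    def wait(self, timeout: Optional[float] = None) -> bool:
        # Blocks up to `timeout` seconds (forever if None); returns False on timeout
        return self._event.wait(timeout)

    def result(self, timeout: Optional[float] = None) -> T:
        # Blocks like wait(), but turns a timeout into an exception
        if not self._event.wait(timeout):
            raise TimeoutError("result not available before timeout")
        return self._result  # type: ignore[return-value]

    def set_result(self, result: T) -> None:
        # Store the value before setting the event so a woken waiter reads it
        self._result = result
        self._event.set()


# Usage: a producer thread fulfils the future while the consumer blocks on it
fut: SketchFuture[int] = SketchFuture()
assert not fut.query()
producer = threading.Timer(0.05, fut.set_result, args=(42,))
producer.start()
print(fut.result(timeout=5.0))  # 42
producer.join()
```

Because threading.Event.wait already returns a bool, wait() can pass it through unchanged, and result() only has to map False to an exception.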

Usage

Use MessagingFuture for general asynchronous RPC result handling in the multiprocess protocol. Use CUDAMessagingFuture when the result involves GPU data transfers that require CUDA event synchronization before the data is safe to read.
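The lazy "deserialize the event once, then synchronize before returning" behaviour can be sketched without a GPU. In this sketch, concurrent.futures.Future stands in for MessagingFuture[tuple[bytes, T]], and the event decoder is injected. LazySyncFuture, decode_event and FakeEvent are hypothetical names. With PyTorch, the decoder would presumably rebuild a torch.cuda.Event from its IPC handle; that is an assumption, not LMCache's confirmed code.

```python
import threading
from concurrent.futures import Future
from typing import Callable, Generic, Optional, Protocol, Tuple, TypeVar

T = TypeVar("T")


class SyncEvent(Protocol):
    # The two event methods this sketch relies on (torch.cuda.Event has both)
    def query(self) -> bool: ...
    def synchronize(self) -> None: ...


class LazySyncFuture(Generic[T]):
    """Hypothetical sketch of the CUDAMessagingFuture pattern.

    `raw` resolves to (event_bytes, result); `decode_event` rebuilds an event
    object from event_bytes (assumed; in real use, a CUDA IPC event).
    """

    def __init__(self, raw: "Future[Tuple[bytes, T]]",
                 decode_event: Callable[[bytes], SyncEvent]) -> None:
        self._raw = raw
        self._decode_event = decode_event
        self._event: Optional[SyncEvent] = None
        self._lock = threading.Lock()

    def _get_event(self) -> SyncEvent:
        # Deserialize the event only once, on first access; only called
        # after the raw future is done, so raw.result() does not block
        with self._lock:
            if self._event is None:
                event_bytes, _ = self._raw.result()
                self._event = self._decode_event(event_bytes)
            return self._event

    def query(self) -> bool:
        # Done only when the reply has arrived AND the producer's work has finished
        return self._raw.done() and self._get_event().query()

    def result(self, timeout: Optional[float] = None) -> T:
        _, value = self._raw.result(timeout)  # blocks; raises on timeout
        self._get_event().synchronize()       # wait for producer-side work
        return value


class FakeEvent:
    # Test double standing in for a deserialized CUDA event
    def __init__(self) -> None:
        self.synchronized = False

    def query(self) -> bool:
        return True

    def synchronize(self) -> None:
        self.synchronized = True


decoded = []


def decode(event_bytes: bytes) -> FakeEvent:
    decoded.append(event_bytes)
    return FakeEvent()


raw: "Future[Tuple[bytes, str]]" = Future()
fut = LazySyncFuture(raw, decode)
assert not fut.query()  # reply not yet arrived; nothing decoded
raw.set_result((b"ipc-handle", "kv-block"))
print(fut.result(timeout=1.0))  # kv-block
```

Deferring deserialization keeps query() cheap before the reply arrives. The lock ensures that concurrent callers decode the event only once.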

Code Reference

Source Location

Signature

from typing import Generic, Optional, TypeVar
import torch

T = TypeVar("T")

class MessagingFuture(Generic[T]):
    def __init__(self): ...
    def query(self) -> bool: ...
    def wait(self, timeout: Optional[float] = None) -> bool: ...
    def result(self, timeout: Optional[float] = None) -> T: ...
    def set_result(self, result: T) -> None: ...
    def to_cuda_future(self, device: torch.cuda.device | None = None) -> "CUDAMessagingFuture": ...

class CUDAMessagingFuture(MessagingFuture[T]):
    def __init__(self, raw_future: MessagingFuture[tuple[bytes, T]], device: torch.cuda.device | None = None): ...
    def query(self) -> bool: ...
    def wait(self, timeout: Optional[float] = None) -> bool: ...
    def result(self, timeout: Optional[float] = None) -> T: ...

    @staticmethod
    def FromMessagingFuture(
        raw_future: MessagingFuture[tuple[bytes, T]],
        device: torch.cuda.device | None = None,
    ) -> "CUDAMessagingFuture[T]": ...

Import

from lmcache.v1.multiprocess.futures import MessagingFuture, CUDAMessagingFuture

I/O Contract

Inputs

Name | Type | Required | Description
timeout | Optional[float] | No | Maximum number of seconds to wait, used by wait() and result(); None means wait indefinitely
result | T | Yes | Value passed to set_result(); called by the messaging system only
raw_future | MessagingFuture[tuple[bytes, T]] | Yes | Underlying future resolving to (serialized CUDA event bytes, result); CUDAMessagingFuture only
device | torch.cuda.device or None | No | Target CUDA device for event deserialization; defaults to the current device

Outputs

Name | Type | Description
query | bool | True if the future is done (and, for CUDAMessagingFuture, the CUDA event has also completed)
wait | bool | True if the future completed, False if the timeout was reached
result | T | The result value; blocks until it is available, or raises TimeoutError if the timeout elapses first

Usage Examples

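import torch
from lmcache.v1.multiprocess.futures import MessagingFuture

# Create a future; query() is non-blocking and returns False until a result is set
future = MessagingFuture()
assert not future.query()

# Normally called by the messaging system when the reply arrives;
# called directly here only for illustration
future.set_result(42)

# Consumer side: block for at most 5 seconds
if future.wait(timeout=5.0):
    value = future.result()
    print(f"Got result: {value}")

# For GPU-synchronized results, convert a raw future whose result is a
# (serialized CUDA event bytes, result) tuple, as produced by the messaging
# system. `future` above is not suitable: its result is a plain int.
raw_future = MessagingFuture()  # placeholder; filled in by the messaging system
cuda_future = raw_future.to_cuda_future(device=torch.device("cuda:0"))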
