Implementation:OpenHands OpenHands ClusteredConversationManager Redis Subscribe
| Knowledge Sources | |
|---|---|
| Domains | Distributed_Systems, Conversation_Management |
| Last Updated | 2026-02-11 21:00 GMT |
Overview
Concrete tool for subscribing to Redis pub/sub channels to synchronize conversation state across cluster nodes, provided by the OpenHands enterprise conversation management layer.
Description
The _redis_subscribe method is the main subscription loop in the ClusteredConversationManager class, which extends StandaloneConversationManager to add multi-node coordination. The method subscribes to a Redis pub/sub channel and continuously listens for state update messages from other cluster nodes.
When a message arrives, it is dispatched to _process_message, which parses the message payload and takes appropriate action based on the event type:
- Status updates -- The local in-memory conversation state cache is updated and connected WebSocket clients are notified via Socket.IO.
- Conversation stops -- The conversation is removed from the local state cache, and any connected clients are disconnected.
- New conversations -- The conversation is added to the local state cache so that clients connecting to this node can discover it.
The class also provides _update_state_in_redis, which is the publishing counterpart: when a locally-owned conversation's state changes, this method writes the updated state to a Redis hash and publishes a notification to the channel so other nodes can update their caches.
The ClusteredConversationManager is a dataclass with inherited fields from StandaloneConversationManager plus Redis connection and pub/sub configuration.
Usage
The _redis_subscribe method is started as a background asyncio task when the ClusteredConversationManager initializes. It runs for the lifetime of the server process. The companion methods _process_message and _update_state_in_redis are called internally as part of the subscription and publishing workflows.
Code Reference
Source Location
- Repository: OpenHands
- File:
enterprise/server/clustered_conversation_manager.py - Lines: L107-133 (_redis_subscribe), L135-211 (_process_message), L435-483 (_update_state_in_redis)
Signature
async def _redis_subscribe(self) -> None
async def _process_message(self, message: dict) -> None
async def _update_state_in_redis(self) -> None
Import
from enterprise.server.clustered_conversation_manager import ClusteredConversationManager
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| (self) | ClusteredConversationManager | Yes | The manager instance, which holds the Redis connection, the pub/sub channel name, the local state cache, and the Socket.IO server reference. |
For _process_message:
| Name | Type | Required | Description |
|---|---|---|---|
| message | dict | Yes | A deserialized Redis pub/sub message containing keys such as type (event type), conversation_id, source_node, and event-specific payload fields like new_status.
|
Outputs
| Name | Type | Description |
|---|---|---|
| (side effect) | None | _redis_subscribe runs indefinitely as a background task. _process_message updates local state and emits Socket.IO events. _update_state_in_redis writes to the Redis hash and publishes to the channel. None of these methods return a value. |
Usage Examples
Basic Usage
import asyncio
from enterprise.server.clustered_conversation_manager import ClusteredConversationManager
# The subscribe loop is typically started as a background task during initialization
manager = ClusteredConversationManager(
sio=sio_server,
config=app_config,
server_config=server_config,
file_store=file_store,
# ... additional fields
)
# Start the subscription loop as a background task
subscribe_task = asyncio.create_task(manager._redis_subscribe())
# The task runs indefinitely, processing messages from other nodes
# To stop it during shutdown:
subscribe_task.cancel()
Publishing State Updates
# When a locally-owned conversation changes state,
# _update_state_in_redis is called to propagate the change
# This happens automatically within the manager's internal workflows,
# but conceptually it works like:
await manager._update_state_in_redis()
# This writes the current state of all local conversations to the Redis hash
# and publishes change notifications to the pub/sub channel