Jump to content

Connect Leeroopedia MCP: Equip your AI agents to search best practices, build plans, verify code, diagnose failures, and look up hyperparameter defaults.

Implementation:OpenHands OpenHands ClusteredConversationManager Redis Subscribe

From Leeroopedia
Knowledge Sources
Domains Distributed_Systems, Conversation_Management
Last Updated 2026-02-11 21:00 GMT

Overview

Concrete tool for subscribing to Redis pub/sub channels to synchronize conversation state across cluster nodes, provided by the OpenHands enterprise conversation management layer.

Description

The _redis_subscribe method is the main subscription loop in the ClusteredConversationManager class, which extends StandaloneConversationManager to add multi-node coordination. The method subscribes to a Redis pub/sub channel and continuously listens for state update messages from other cluster nodes.

When a message arrives, it is dispatched to _process_message, which parses the message payload and takes appropriate action based on the event type:

  • Status updates -- The local in-memory conversation state cache is updated and connected WebSocket clients are notified via Socket.IO.
  • Conversation stops -- The conversation is removed from the local state cache, and any connected clients are disconnected.
  • New conversations -- The conversation is added to the local state cache so that clients connecting to this node can discover it.

The class also provides _update_state_in_redis, which is the publishing counterpart: when a locally-owned conversation's state changes, this method writes the updated state to a Redis hash and publishes a notification to the channel so other nodes can update their caches.

The ClusteredConversationManager is a dataclass with inherited fields from StandaloneConversationManager plus Redis connection and pub/sub configuration.

Usage

The _redis_subscribe method is started as a background asyncio task when the ClusteredConversationManager initializes. It runs for the lifetime of the server process. The companion methods _process_message and _update_state_in_redis are called internally as part of the subscription and publishing workflows.

Code Reference

Source Location

  • Repository: OpenHands
  • File: enterprise/server/clustered_conversation_manager.py
  • Lines: L107-133 (_redis_subscribe), L135-211 (_process_message), L435-483 (_update_state_in_redis)

Signature

async def _redis_subscribe(self) -> None
async def _process_message(self, message: dict) -> None
async def _update_state_in_redis(self) -> None

Import

from enterprise.server.clustered_conversation_manager import ClusteredConversationManager

I/O Contract

Inputs

Name Type Required Description
(self) ClusteredConversationManager Yes The manager instance, which holds the Redis connection, the pub/sub channel name, the local state cache, and the Socket.IO server reference.

For _process_message:

Name Type Required Description
message dict Yes A deserialized Redis pub/sub message containing keys such as type (event type), conversation_id, source_node, and event-specific payload fields like new_status.

Outputs

Name Type Description
(side effect) None _redis_subscribe runs indefinitely as a background task. _process_message updates local state and emits Socket.IO events. _update_state_in_redis writes to the Redis hash and publishes to the channel. None of these methods return a value.

Usage Examples

Basic Usage

import asyncio
from enterprise.server.clustered_conversation_manager import ClusteredConversationManager

# The subscribe loop is typically started as a background task during initialization
manager = ClusteredConversationManager(
    sio=sio_server,
    config=app_config,
    server_config=server_config,
    file_store=file_store,
    # ... additional fields
)

# Start the subscription loop as a background task
subscribe_task = asyncio.create_task(manager._redis_subscribe())

# The task runs indefinitely, processing messages from other nodes
# To stop it during shutdown:
subscribe_task.cancel()

Publishing State Updates

# When a locally-owned conversation changes state,
# _update_state_in_redis is called to propagate the change

# This happens automatically within the manager's internal workflows,
# but conceptually it works like:
await manager._update_state_in_redis()
# This writes the current state of all local conversations to the Redis hash
# and publishes change notifications to the pub/sub channel

Related Pages

Implements Principle

Environment

Heuristics

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment