Implementation:OpenHands OpenHands ProactiveConversationStore
| Knowledge Sources | |
|---|---|
| Domains | Storage, CI_CD, Proactive_Automation |
| Last Updated | 2026-02-11 21:00 GMT |
Overview
Storage layer for CI/CD workflow status tracking that determines when to automatically trigger OpenHands conversations, provided by the OpenHands enterprise storage layer.
Description
ProactiveConversationStore manages the persistence and decision logic for the proactive conversation feature. This feature monitors CI/CD workflow runs and automatically creates OpenHands conversations when certain conditions are met (e.g., a workflow fails repeatedly).
The central method is store_workflow_information(), which records a workflow run and uses SELECT...FOR UPDATE to atomically evaluate whether the accumulated workflow data should trigger a new conversation. If the trigger conditions are satisfied, it returns a WorkflowRunGroup object that the caller uses to initiate the conversation. The row-level lock prevents race conditions when multiple webhook events arrive simultaneously for the same repository.
clean_old_convos() removes stale workflow records that are past their retention window, preventing unbounded table growth. get_repo_id() resolves a repository to its internal identifier for scoping queries.
The proactive conversation pattern is a key differentiator: rather than waiting for users to ask for help, OpenHands detects CI failures and proactively offers to investigate and fix them.
Usage
Use ProactiveConversationStore in webhook handlers that receive CI/CD workflow status events. Call store_workflow_information() for each incoming event and check the return value to decide whether to trigger a conversation. Use clean_old_convos() in periodic cleanup jobs.
Code Reference
Source Location
- Repository: OpenHands
- File: enterprise/storage/proactive_conversation_store.py
- Lines: 1-166
Signature
class ProactiveConversationStore:
def __init__(self, session: Session):
...
def store_workflow_information(
self,
repo_id: str,
workflow_run_id: str,
workflow_name: str,
status: str,
**kwargs
) -> Optional[WorkflowRunGroup]:
"""Uses SELECT...FOR UPDATE. Returns WorkflowRunGroup if conversation should be triggered."""
...
def clean_old_convos(self, retention_days: int = 30) -> int:
"""Removes workflow records older than the retention window. Returns count of deleted rows."""
...
def get_repo_id(self, owner: str, repo: str) -> Optional[str]:
"""Resolves a repository owner/name pair to its internal identifier."""
...
Import
from enterprise.storage.proactive_conversation_store import ProactiveConversationStore
I/O Contract
Inputs
Constructor
| Name | Type | Required | Description |
|---|---|---|---|
| session | Session | Yes | SQLAlchemy database session for executing queries |
store_workflow_information()
| Name | Type | Required | Description |
|---|---|---|---|
| repo_id | str | Yes | Internal repository identifier |
| workflow_run_id | str | Yes | The CI/CD platform's unique identifier for this workflow run |
| workflow_name | str | Yes | Name of the workflow (e.g., "CI Tests", "Build") |
| status | str | Yes | Outcome status of the workflow run (e.g., "failure", "success") |
| **kwargs | dict | No | Additional workflow metadata (e.g., branch, commit_sha, triggered_by) |
clean_old_convos()
| Name | Type | Required | Description |
|---|---|---|---|
| retention_days | int | No | Number of days to retain records (default: 30) |
get_repo_id()
| Name | Type | Required | Description |
|---|---|---|---|
| owner | str | Yes | Repository owner (e.g., "All-Hands-AI") |
| repo | str | Yes | Repository name (e.g., "OpenHands") |
Outputs
| Method | Return Type | Description |
|---|---|---|
| store_workflow_information() | Optional[WorkflowRunGroup] | Returns a WorkflowRunGroup if trigger conditions are met, None otherwise |
| clean_old_convos() | int | Number of stale records deleted |
| get_repo_id() | Optional[str] | The internal repository ID, or None if not found |
Usage Examples
Handling a CI/CD Webhook Event
from enterprise.storage.proactive_conversation_store import ProactiveConversationStore
store = ProactiveConversationStore(session=db_session)
# Resolve the repository
repo_id = store.get_repo_id(owner="All-Hands-AI", repo="OpenHands")
if repo_id:
# Record the workflow run and check if we should trigger a conversation
trigger = store.store_workflow_information(
repo_id=repo_id,
workflow_run_id="run-12345",
workflow_name="CI Tests",
status="failure",
branch="feature/new-thing",
commit_sha="abc123"
)
if trigger is not None:
# Conditions met -- create a proactive conversation
create_proactive_conversation(trigger)
Periodic Cleanup
# In a scheduled job:
deleted_count = store.clean_old_convos(retention_days=60)
logger.info(f"Cleaned up {deleted_count} stale workflow records")