Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:Microsoft Autogen WebSocketManager Stream

From Leeroopedia
Revision as of 11:34, 16 February 2026 by Admin (talk | contribs) (Auto-imported from implementations/Microsoft_Autogen_WebSocketManager_Stream.md)
(diff) ← Older revision | Latest revision (diff) | Newer revision → (diff)
Knowledge Sources
Domains Agent Frameworks, Real-time Streaming, WebSocket Communication, Interactive Testing
Last Updated 2026-02-11 00:00 GMT

Overview

Concrete tool for streaming multi-agent team execution results over WebSocket connections provided by the autogenstudio package, wrapping FastAPI WebSocket with team execution.

Description

The WebSocketManager class manages WebSocket connections and message streaming for team task execution. It tracks active connections, cancellation tokens, and per-run input queues. The start_stream() method orchestrates team execution by creating a TeamManager, streaming messages to the connected WebSocket, saving messages to the database, and handling run status updates. The run_websocket() route function handles the WebSocket lifecycle including connection establishment, authentication, message routing (start, stop, ping, input_response), and cleanup on disconnect.

Usage

Use WebSocketManager and run_websocket when you need real-time streaming of agent team execution to a browser-based playground. The WebSocketManager is instantiated as a singleton in the FastAPI application and shared across all WebSocket connections.

Code Reference

Source Location

  • Repository: Microsoft AutoGen
  • File: python/packages/autogen-studio/autogenstudio/web/managers/connection.py (L85-L172 for start_stream)
  • File: python/packages/autogen-studio/autogenstudio/web/routes/ws.py (L21-L133 for run_websocket)

Signature

# WebSocketManager.start_stream (connection.py L85-L172)
class WebSocketManager:
    def __init__(self, db_manager: DatabaseManager) -> None: ...

    async def connect(self, websocket: WebSocket, run_id: int) -> bool: ...

    async def start_stream(
        self,
        run_id: int,
        task: str | ChatMessage | Sequence[ChatMessage] | None,
        team_config: str | Path | Dict[str, Any] | ComponentModel,
    ) -> None: ...

    async def stop_run(self, run_id: int, reason: str) -> None: ...

    async def disconnect(self, run_id: int) -> None: ...

    def create_input_func(self, run_id: int) -> Callable: ...

    async def handle_input_response(self, run_id: int, response: str) -> None: ...

# run_websocket endpoint (ws.py L21-L133)
@router.websocket("/runs/{run_id}")
async def run_websocket(
    websocket: WebSocket,
    run_id: int,
    ws_manager: WebSocketManager = Depends(get_websocket_manager),
    db=Depends(get_db),
    auth_manager=Depends(get_ws_auth_manager),
) -> None: ...

Import

from autogenstudio.web.managers.connection import WebSocketManager
from autogenstudio.web.routes.ws import router  # FastAPI router with WebSocket endpoint

I/O Contract

Inputs (start_stream)

Name Type Required Description
run_id int Yes Database ID of the run to execute; must have an active WebSocket connection
task ChatMessage | Sequence[ChatMessage] | None Yes The task to execute (text prompt or structured chat message(s))
team_config Path | Dict[str, Any] | ComponentModel Yes Team configuration as a file path, dictionary, or ComponentModel

Inputs (run_websocket)

Name Type Required Description
websocket WebSocket Yes FastAPI WebSocket connection object
run_id int Yes Database ID of the run (from URL path)
ws_manager WebSocketManager Yes (injected) WebSocket manager singleton (FastAPI dependency injection)
db DatabaseManager Yes (injected) Database manager (FastAPI dependency injection)
auth_manager AuthManager or None Yes (injected) Authentication manager or None if auth is disabled

Outputs

Name Type Description
(WebSocket messages) JSON Agent messages, results, errors, and system events streamed to the client via WebSocket
(database side effects) None Run status, messages, and team results are persisted to the database during execution

WebSocket Message Types (Server to Client)

Type Field Description
system Connection status notification (e.g., connected)
message Agent message event (TextMessage, ToolCallRequestEvent, etc.)
message_chunk Streaming token chunk from model client
result Final TeamResult with task_result, usage, and duration
input_request Request for human-in-the-loop input with prompt
completion Run completed or cancelled notification
error Error notification

WebSocket Message Types (Client to Server)

Type Field Description
start Begin team execution with task and team_config
stop Cancel the running execution with optional reason
ping Keep-alive ping (server responds with pong)
input_response Human-in-the-loop response to an input_request

Usage Examples

Basic Example

# Client-side JavaScript example for connecting to the playground WebSocket
import json
import asyncio
import websockets

async def test_team():
    uri = "ws://localhost:8081/api/ws/runs/42"

    async with websockets.connect(uri) as ws:
        # Wait for connection confirmation
        msg = await ws.recv()
        print(f"Connected: {msg}")

        # Start the team execution
        await ws.send(json.dumps({
            "type": "start",
            "task": "What is 25 * 17?",
            "team_config": {
                "provider": "autogen_agentchat.teams.RoundRobinGroupChat",
                "component_type": "team",
                "config": {
                    "participants": [...]
                }
            }
        }))

        # Stream messages
        while True:
            msg = json.loads(await ws.recv())
            if msg["type"] == "message":
                data = msg["data"]
                print(f"[{data.get('source', 'unknown')}]: {data.get('content', '')}")
            elif msg["type"] == "result":
                print(f"Completed: {msg['data']}")
                break
            elif msg["type"] == "input_request":
                # Handle human-in-the-loop input
                user_input = input(msg.get("prompt", "> "))
                await ws.send(json.dumps({
                    "type": "input_response",
                    "response": user_input,
                }))

asyncio.run(test_team())

Related Pages

Implements Principle

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment