Overview
Concrete tool for streaming multi-agent team execution results over WebSocket connections provided by the autogenstudio package, wrapping FastAPI WebSocket with team execution.
Description
The WebSocketManager class manages WebSocket connections and message streaming for team task execution. It tracks active connections, cancellation tokens, and per-run input queues. The start_stream() method orchestrates team execution by creating a TeamManager, streaming messages to the connected WebSocket, saving messages to the database, and handling run status updates. The run_websocket() route function handles the WebSocket lifecycle including connection establishment, authentication, message routing (start, stop, ping, input_response), and cleanup on disconnect.
Usage
Use WebSocketManager and run_websocket when you need real-time streaming of agent team execution to a browser-based playground. The WebSocketManager is instantiated as a singleton in the FastAPI application and shared across all WebSocket connections.
Code Reference
Source Location
- Repository: Microsoft AutoGen
- File:
python/packages/autogen-studio/autogenstudio/web/managers/connection.py (L85-L172 for start_stream)
- File:
python/packages/autogen-studio/autogenstudio/web/routes/ws.py (L21-L133 for run_websocket)
Signature
# WebSocketManager.start_stream (connection.py L85-L172)
class WebSocketManager:
def __init__(self, db_manager: DatabaseManager) -> None: ...
async def connect(self, websocket: WebSocket, run_id: int) -> bool: ...
async def start_stream(
self,
run_id: int,
task: str | ChatMessage | Sequence[ChatMessage] | None,
team_config: str | Path | Dict[str, Any] | ComponentModel,
) -> None: ...
async def stop_run(self, run_id: int, reason: str) -> None: ...
async def disconnect(self, run_id: int) -> None: ...
def create_input_func(self, run_id: int) -> Callable: ...
async def handle_input_response(self, run_id: int, response: str) -> None: ...
# run_websocket endpoint (ws.py L21-L133)
@router.websocket("/runs/{run_id}")
async def run_websocket(
websocket: WebSocket,
run_id: int,
ws_manager: WebSocketManager = Depends(get_websocket_manager),
db=Depends(get_db),
auth_manager=Depends(get_ws_auth_manager),
) -> None: ...
Import
from autogenstudio.web.managers.connection import WebSocketManager
from autogenstudio.web.routes.ws import router # FastAPI router with WebSocket endpoint
I/O Contract
Inputs (start_stream)
| Name |
Type |
Required |
Description
|
| run_id |
int |
Yes |
Database ID of the run to execute; must have an active WebSocket connection
|
| task |
ChatMessage | Sequence[ChatMessage] | None |
Yes |
The task to execute (text prompt or structured chat message(s))
|
| team_config |
Path | Dict[str, Any] | ComponentModel |
Yes |
Team configuration as a file path, dictionary, or ComponentModel
|
Inputs (run_websocket)
| Name |
Type |
Required |
Description
|
| websocket |
WebSocket |
Yes |
FastAPI WebSocket connection object
|
| run_id |
int |
Yes |
Database ID of the run (from URL path)
|
| ws_manager |
WebSocketManager |
Yes (injected) |
WebSocket manager singleton (FastAPI dependency injection)
|
| db |
DatabaseManager |
Yes (injected) |
Database manager (FastAPI dependency injection)
|
| auth_manager |
AuthManager or None |
Yes (injected) |
Authentication manager or None if auth is disabled
|
Outputs
| Name |
Type |
Description
|
| (WebSocket messages) |
JSON |
Agent messages, results, errors, and system events streamed to the client via WebSocket
|
| (database side effects) |
None |
Run status, messages, and team results are persisted to the database during execution
|
WebSocket Message Types (Server to Client)
| Type Field |
Description
|
| system |
Connection status notification (e.g., connected)
|
| message |
Agent message event (TextMessage, ToolCallRequestEvent, etc.)
|
| message_chunk |
Streaming token chunk from model client
|
| result |
Final TeamResult with task_result, usage, and duration
|
| input_request |
Request for human-in-the-loop input with prompt
|
| completion |
Run completed or cancelled notification
|
| error |
Error notification
|
WebSocket Message Types (Client to Server)
| Type Field |
Description
|
| start |
Begin team execution with task and team_config
|
| stop |
Cancel the running execution with optional reason
|
| ping |
Keep-alive ping (server responds with pong)
|
| input_response |
Human-in-the-loop response to an input_request
|
Usage Examples
Basic Example
# Client-side JavaScript example for connecting to the playground WebSocket
import json
import asyncio
import websockets
async def test_team():
uri = "ws://localhost:8081/api/ws/runs/42"
async with websockets.connect(uri) as ws:
# Wait for connection confirmation
msg = await ws.recv()
print(f"Connected: {msg}")
# Start the team execution
await ws.send(json.dumps({
"type": "start",
"task": "What is 25 * 17?",
"team_config": {
"provider": "autogen_agentchat.teams.RoundRobinGroupChat",
"component_type": "team",
"config": {
"participants": [...]
}
}
}))
# Stream messages
while True:
msg = json.loads(await ws.recv())
if msg["type"] == "message":
data = msg["data"]
print(f"[{data.get('source', 'unknown')}]: {data.get('content', '')}")
elif msg["type"] == "result":
print(f"Completed: {msg['data']}")
break
elif msg["type"] == "input_request":
# Handle human-in-the-loop input
user_input = input(msg.get("prompt", "> "))
await ws.send(json.dumps({
"type": "input_response",
"response": user_input,
}))
asyncio.run(test_team())
Related Pages
Implements Principle