Implementation: Microsoft AutoGen Swarm Run Stream
| Knowledge Sources | |
|---|---|
| Domains | Multi-Agent Systems, Streaming Execution, Event-Driven Architecture, Swarm Workflows |
| Last Updated | 2026-02-11 00:00 GMT |
Overview
Concrete tool for executing a swarm conversation as a streaming async generator that yields agent events, chat messages, and HandoffMessage transitions, provided by Microsoft AutoGen.
Description
The run_stream method on Swarm (inherited from BaseGroupChat) initiates a swarm conversation and returns an AsyncGenerator that yields events in real time. The method accepts a task (string, message, or sequence of messages), converts it to the internal message format, and drives the swarm's execution loop.
The stream produces three categories of items:
- BaseAgentEvent instances: tool calls, model streaming chunks, and other agent-internal events.
- BaseChatMessage instances: TextMessage for regular responses, HandoffMessage for agent transitions, ToolCallSummaryMessage for tool results.
- TaskResult: the final item, containing the complete list of messages (excluding streaming chunks) and the stop reason.
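In practice the stream is consumed with isinstance checks across these three categories. A minimal sketch of that dispatch, using stand-in dataclasses in place of the real autogen_agentchat types (the `label_item` helper is hypothetical, for illustration only):

```python
from dataclasses import dataclass, field

# Stand-ins that mirror the shapes of the real stream items; the actual
# classes live in autogen_agentchat.messages and autogen_agentchat.base.
@dataclass
class TextMessage:          # plays the role of a BaseChatMessage
    source: str
    content: str

@dataclass
class HandoffMessage:       # chat message signalling an agent transition
    source: str
    target: str
    content: str

@dataclass
class TaskResult:           # final item: full message list plus stop reason
    messages: list = field(default_factory=list)
    stop_reason: str = ""

def label_item(item) -> str:
    """Hypothetical helper: classify one streamed item for display."""
    if isinstance(item, TaskResult):
        return f"done: {item.stop_reason}"
    if isinstance(item, HandoffMessage):
        return f"handoff: {item.source} -> {item.target}"
    return f"{item.source}: {item.content}"
```

The same isinstance-based dispatch works unchanged on the real stream, since HandoffMessage and TaskResult are concrete classes there as well.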
The method is stateful: the team remembers its conversation history across calls. Calling run_stream() again without a task continues from where the previous execution stopped. This is critical for the human-in-the-loop pattern where a swarm is paused via HandoffTermination, the user provides input, and the swarm is resumed with a new HandoffMessage task.
The output_task_messages parameter controls whether the initial task messages are included in the output stream. When set to True (default), task messages appear at the beginning of the stream.
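The effect of the flag can be sketched with a toy generator (not the real implementation): when it is False, the task messages are simply not echoed before the generated events.

```python
import asyncio

# Toy sketch of the output_task_messages flag (not the real implementation):
# True echoes the initial task messages first, False suppresses them.
async def toy_stream(task_msgs, events, output_task_messages: bool = True):
    if output_task_messages:
        for m in task_msgs:
            yield m          # task messages appear at the start of the stream
    for e in events:
        yield e              # then the events generated during the run

async def collect(flag: bool) -> list[str]:
    return [m async for m in toy_stream(["task"], ["reply"],
                                        output_task_messages=flag)]

echoed = asyncio.run(collect(True))       # ["task", "reply"]
suppressed = asyncio.run(collect(False))  # ["reply"]
```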
Usage
Import Swarm and call run_stream() to execute swarm workflows with real-time event streaming. Use async for to consume events. Combine with Console for formatted terminal output. Use HandoffMessage detection in the stream for custom UI updates or logging.
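Console essentially drains the generator and renders each item to the terminal. A stand-in sketch of that consume-and-format pattern (the real Console lives in autogen_agentchat.ui; `toy_console` and `fake_stream` are illustrative only, and use plain dicts where the real stream yields message objects):

```python
import asyncio
import io

# Stand-in for the Console pattern: drain the stream, render each item.
async def toy_console(stream, out: io.StringIO) -> None:
    async for item in stream:
        out.write(f"---------- {item['source']} ----------\n")
        out.write(f"{item['content']}\n")

# fake_stream stands in for team.run_stream(task=...).
async def fake_stream():
    yield {"source": "user", "content": "What is the weather today?"}
    yield {"source": "Alice", "content": "Let me check that for you."}

buf = io.StringIO()
asyncio.run(toy_console(fake_stream(), buf))
print(buf.getvalue())
```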
Code Reference
Source Location
- Repository: Microsoft AutoGen
- File:
python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_base_group_chat.py (Lines 351-453)
Signature
async def run_stream(
self,
*,
task: str | BaseChatMessage | Sequence[BaseChatMessage] | None = None,
cancellation_token: CancellationToken | None = None,
output_task_messages: bool = True,
) -> AsyncGenerator[BaseAgentEvent | BaseChatMessage | TaskResult, None]
Import
from autogen_agentchat.teams import Swarm
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| task | str \| BaseChatMessage \| Sequence[BaseChatMessage] \| None | No | The task to execute. Strings are converted to TextMessage(source="user"). A HandoffMessage can be passed to resume after a handoff pause. None continues from the previous state. |
| cancellation_token | CancellationToken \| None | No | Token to abort execution immediately. May leave the team in an inconsistent state. Use ExternalTermination for graceful stopping. |
| output_task_messages | bool | No | Whether to include the initial task messages in the output stream. Defaults to True. |
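The abrupt nature of cancellation can be illustrated with a toy token (`ToyCancellationToken` is not the real autogen_core class): once cancelled, the stream stops immediately and never yields a final TaskResult, which is why ExternalTermination is the recommended graceful alternative.

```python
import asyncio

# Toy illustration of the cancellation_token contract
# (ToyCancellationToken is NOT the real autogen_core.CancellationToken).
class ToyCancellationToken:
    def __init__(self) -> None:
        self._cancelled = False

    def cancel(self) -> None:
        self._cancelled = True

    @property
    def is_cancelled(self) -> bool:
        return self._cancelled

async def toy_run_stream(token: ToyCancellationToken):
    for i in range(5):
        if token.is_cancelled:
            return           # abrupt abort: no final TaskResult is yielded
        yield f"message-{i}"

async def demo() -> list[str]:
    token = ToyCancellationToken()
    received: list[str] = []
    async for msg in toy_run_stream(token):
        received.append(msg)
        if len(received) == 2:
            token.cancel()   # abort after two messages
    return received

received = asyncio.run(demo())  # ["message-0", "message-1"]
```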
Outputs
| Name | Type | Description |
|---|---|---|
| stream | AsyncGenerator[BaseAgentEvent \| BaseChatMessage \| TaskResult, None] | An async generator yielding events in real time. BaseAgentEvent for agent-internal events, BaseChatMessage (including HandoffMessage) for conversation messages, and TaskResult as the final item. |
Usage Examples
Basic Example
import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import Swarm
from autogen_agentchat.conditions import HandoffTermination, MaxMessageTermination
from autogen_agentchat.messages import HandoffMessage
from autogen_agentchat.base import TaskResult
from autogen_ext.models.openai import OpenAIChatCompletionClient
async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")
    agent = AssistantAgent(
        "Alice",
        model_client=model_client,
        handoffs=["user"],
        system_message=(
            "You are Alice. Help with questions. "
            "Ask the user for clarification if needed."
        ),
    )
    termination = HandoffTermination(target="user") | MaxMessageTermination(5)
    team = Swarm([agent], termination_condition=termination)

    # Stream events and detect handoffs
    stream = team.run_stream(task="What is the weather today?")
    async for event in stream:
        if isinstance(event, HandoffMessage):
            print(f"[HANDOFF] {event.source} -> {event.target}: {event.content}")
        elif isinstance(event, TaskResult):
            print(f"[DONE] Stop reason: {event.stop_reason}")
            print(f"[DONE] Total messages: {len(event.messages)}")
        else:
            print(f"[{event.source}] {event}")

    # Resume after user provides input
    stream = team.run_stream(
        task=HandoffMessage(
            source="user",
            target="Alice",
            content="I am in San Francisco.",
        )
    )
    async for event in stream:
        if isinstance(event, TaskResult):
            print(f"[DONE] Stop reason: {event.stop_reason}")
        else:
            print(f"[{event.source}] {event}")
asyncio.run(main())