Jump to content

Connect Leeroopedia MCP: Equip your AI agents to search best practices, build plans, verify code, diagnose failures, and look up hyperparameter defaults.

Implementation:Microsoft Autogen Swarm Run Stream

From Leeroopedia
Knowledge Sources
Domains Multi-Agent Systems, Streaming Execution, Event-Driven Architecture, Swarm Workflows
Last Updated 2026-02-11 00:00 GMT

Overview

Concrete tool for executing a swarm conversation as a streaming async generator that yields agent events, chat messages, and HandoffMessage transitions, provided by Microsoft AutoGen.

Description

The run_stream method on Swarm (inherited from BaseGroupChat) initiates a swarm conversation and returns an AsyncGenerator that yields events in real time. The method accepts a task (string, message, or sequence of messages), converts it to the internal message format, and drives the swarm's execution loop.

The stream produces three categories of items:

  • BaseAgentEvent instances: tool calls, model streaming chunks, and other agent-internal events.
  • BaseChatMessage instances: TextMessage for regular responses, HandoffMessage for agent transitions, ToolCallSummaryMessage for tool results.
  • TaskResult: the final item, containing the complete list of messages (excluding streaming chunks) and the stop reason.

The method is stateful: the team remembers its conversation history across calls. Calling run_stream() again without a task continues from where the previous execution stopped. This is critical for the human-in-the-loop pattern where a swarm is paused via HandoffTermination, the user provides input, and the swarm is resumed with a new HandoffMessage task.

The output_task_messages parameter controls whether the initial task messages are included in the output stream. When set to True (default), task messages appear at the beginning of the stream.

Usage

Import Swarm and call run_stream() to execute swarm workflows with real-time event streaming. Use async for to consume events. Combine with Console for formatted terminal output. Use HandoffMessage detection in the stream for custom UI updates or logging.

Code Reference

Source Location

  • Repository: Microsoft AutoGen
  • File: python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_base_group_chat.py (Lines 351-453)

Signature

async def run_stream(
    self,
    *,
    task: str | BaseChatMessage | Sequence[BaseChatMessage] | None = None,
    cancellation_token: CancellationToken | None = None,
    output_task_messages: bool = True,
) -> AsyncGenerator[BaseAgentEvent | BaseChatMessage | TaskResult, None]

Import

from autogen_agentchat.teams import Swarm

I/O Contract

Inputs

Name Type Required Description
task BaseChatMessage | Sequence[BaseChatMessage] | None No The task to execute. Strings are converted to TextMessage(source="user"). A HandoffMessage can be passed to resume after a handoff pause. None continues from the previous state.
cancellation_token None No Token to abort execution immediately. May leave the team in an inconsistent state. Use ExternalTermination for graceful stopping.
output_task_messages bool No Whether to include the initial task messages in the output stream. Defaults to True.

Outputs

Name Type Description
stream BaseChatMessage | TaskResult, None] An async generator yielding events in real time. BaseAgentEvent for agent-internal events, BaseChatMessage (including HandoffMessage) for conversation messages, and TaskResult as the final item.

Usage Examples

Basic Example

import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import Swarm
from autogen_agentchat.conditions import HandoffTermination, MaxMessageTermination
from autogen_agentchat.messages import HandoffMessage
from autogen_agentchat.base import TaskResult
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient


async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    agent = AssistantAgent(
        "Alice",
        model_client=model_client,
        handoffs=["user"],
        system_message=(
            "You are Alice. Help with questions. "
            "Ask the user for clarification if needed."
        ),
    )

    termination = HandoffTermination(target="user") | MaxMessageTermination(5)
    team = Swarm([agent], termination_condition=termination)

    # Stream events and detect handoffs
    stream = team.run_stream(task="What is the weather today?")
    async for event in stream:
        if isinstance(event, HandoffMessage):
            print(f"[HANDOFF] {event.source} -> {event.target}: {event.content}")
        elif isinstance(event, TaskResult):
            print(f"[DONE] Stop reason: {event.stop_reason}")
            print(f"[DONE] Total messages: {len(event.messages)}")
        else:
            print(f"[{event.source}] {event}")

    # Resume after user provides input
    stream = team.run_stream(
        task=HandoffMessage(
            source="user",
            target="Alice",
            content="I am in San Francisco.",
        )
    )
    async for event in stream:
        if isinstance(event, TaskResult):
            print(f"[DONE] Stop reason: {event.stop_reason}")
        else:
            print(f"[{event.source}] {event}")


asyncio.run(main())

Related Pages

Implements Principle

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment