
Implementation:Microsoft Autogen GraphFlow Run Stream

Domains Multi-Agent Systems, Graph Execution, Concurrent Processing, Workflow Runtime, Streaming
Last Updated 2026-02-11 00:00 GMT

Overview

Concrete tool for executing a graph-directed agent workflow as an asynchronous stream of events and messages, provided by Microsoft AutoGen.

Description

GraphFlow.run_stream() is an asynchronous generator method inherited from BaseGroupChat that executes the graph-directed agent workflow and yields events as they occur. Internally, the GraphFlowManager orchestrates execution by maintaining a ready queue of nodes, evaluating edge conditions against agent outputs, and managing activation groups for fan-in and cyclic patterns.

The execution proceeds as follows:

  1. The task is converted to a TextMessage (if a string) and published to all participants via the group topic.
  2. The GraphFlowManager initializes its ready queue with the graph's start nodes.
  3. In each turn, the manager drains all ready nodes and returns them as the next speakers.
  4. All selected speakers execute concurrently. Their output messages are published to the group topic and yielded in the stream.
  5. After each agent completes, the manager evaluates outgoing edges from that agent's node. Satisfied edges decrement the remaining count for the target node's activation group.
  6. When a target node's activation requirements are met ("all" edges satisfied, or "any" single edge satisfied), it is added to the ready queue.
  7. For cyclic graphs, triggered activation groups are reset when a node is dequeued, allowing re-entry.
  8. Execution terminates when the ready queue is empty (natural graph completion), a termination condition is met, or max_turns is reached.
  9. The stream yields a TaskResult as its final item, containing the complete message history and stop reason.
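The ready-queue and activation-group bookkeeping in steps 3 through 8 can be sketched with a small simulation. This is an illustrative model, not AutoGen's actual internals: the node names, the `remaining` counter, and the restriction to "all"-activation edges on an acyclic graph are all simplifying assumptions.

```python
from collections import deque

# Hypothetical fan-out/fan-in graph: A -> (B, C) -> D.
# edges maps each node to its outgoing targets; every target here
# uses "all" activation, i.e. it waits for every incoming edge.
edges = {
    "A": ["B", "C"],
    "B": ["D"],
    "C": ["D"],
    "D": [],
}
# Number of satisfied incoming edges each node still needs before
# its activation group is met (step 6).
remaining = {"B": 1, "C": 1, "D": 2}

ready = deque(["A"])  # step 2: seeded with the graph's start nodes
order = []            # records each turn's concurrent speakers

while ready:          # step 8: stop when the ready queue drains
    # Step 3: drain all currently ready nodes; step 4: they run
    # concurrently as this turn's speakers.
    speakers = [ready.popleft() for _ in range(len(ready))]
    order.append(speakers)
    for node in speakers:
        # Step 5: each satisfied outgoing edge decrements the
        # target's remaining count.
        for target in edges[node]:
            remaining[target] -= 1
            if remaining[target] == 0:
                ready.append(target)  # step 6: activation met

print(order)  # [['A'], ['B', 'C'], ['D']]
```

The fan-in node D only becomes ready after both B and C have completed, which is exactly the "all" activation behavior described above; an "any" group would instead enqueue D on the first satisfied edge.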

The run() method is a convenience wrapper that consumes the stream and returns only the final TaskResult.

Usage

Use run_stream() when you need real-time streaming of agent events during graph execution, for example to render progress in a UI or to log events as they occur. Use run() when you only need the final result.

Code Reference

Source Location

  • Repository: Microsoft AutoGen
  • File: python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_base_group_chat.py (Lines 351-453) for run_stream
  • File: python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_graph/_digraph_group_chat.py (Lines 309-538) for GraphFlowManager

Signature

# Inherited from BaseGroupChat
async def run_stream(
    self,
    *,
    task: str | BaseChatMessage | Sequence[BaseChatMessage] | None = None,
    cancellation_token: CancellationToken | None = None,
    output_task_messages: bool = True,
) -> AsyncGenerator[BaseAgentEvent | BaseChatMessage | TaskResult, None]:

# Convenience wrapper
async def run(
    self,
    *,
    task: str | BaseChatMessage | Sequence[BaseChatMessage] | None = None,
    cancellation_token: CancellationToken | None = None,
    output_task_messages: bool = True,
) -> TaskResult:

Import

from autogen_agentchat.teams import GraphFlow

I/O Contract

Inputs

Name Type Required Description
task str, BaseChatMessage, Sequence[BaseChatMessage], or None No The task to run. If a string, it is wrapped in a TextMessage with source "user". If None, continues from the previous task state.
cancellation_token CancellationToken or None No Token to cancel execution immediately. May leave the team in an inconsistent state. For graceful stopping, use ExternalTermination instead.
output_task_messages bool No Whether to include task messages in the output stream and TaskResult. Defaults to True.
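To see why immediate cancellation can leave state inconsistent, consider this sketch, which uses a plain asyncio.Event in place of a real CancellationToken (an assumption for illustration; the actual token is passed to run_stream and propagated internally).

```python
import asyncio


async def slow_stream(cancelled: asyncio.Event):
    """Stand-in for a streaming run that checks for cancellation."""
    for i in range(100):
        if cancelled.is_set():
            # Immediate stop: remaining items are never produced and
            # no cleanup runs, which is why state may be inconsistent.
            return
        yield i
        await asyncio.sleep(0)


async def main():
    cancelled = asyncio.Event()
    seen = []
    async for item in slow_stream(cancelled):
        seen.append(item)
        if item == 2:
            cancelled.set()  # analogous to cancellation_token.cancel()
    return seen


seen = asyncio.run(main())
print(seen)  # [0, 1, 2]
```

A graceful stop via ExternalTermination instead lets the current turn finish and produces a normal TaskResult with a stop reason.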

Outputs (run_stream)

Name Type Description
stream AsyncGenerator[BaseAgentEvent or BaseChatMessage or TaskResult, None] An asynchronous generator that yields agent events and messages as they occur. The final item is always a TaskResult.

Outputs (run)

Name Type Description
result TaskResult The final result containing all messages produced during execution and the stop reason.

Stream Item Types

Type Description
BaseAgentEvent Internal agent events such as tool call results and intermediate processing steps. Yielded in the stream and included in TaskResult.messages, with the exception of ModelClientStreamingChunkEvent, which appears only in the stream.
BaseChatMessage Agent output messages that form the conversation history. Yielded in the stream and included in TaskResult.messages.
TaskResult The final result, always the last item in the stream. Contains the complete message sequence and stop reason.
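A common consumption pattern is to dispatch on these item types with isinstance checks. The classes below are illustrative stand-ins (AutoGen's real types are BaseAgentEvent, BaseChatMessage, and TaskResult); the dispatch structure is the part that carries over.

```python
import asyncio


class AgentEvent:          # stands in for BaseAgentEvent
    pass


class ChatMessage:         # stands in for BaseChatMessage
    def __init__(self, text):
        self.text = text


class TaskResult:          # stands in for TaskResult
    def __init__(self, messages):
        self.messages = messages


async def fake_stream():
    yield AgentEvent()
    yield ChatMessage("hello")
    yield ChatMessage("world")
    yield TaskResult(messages=["hello", "world"])


async def consume():
    chat_texts, result = [], None
    async for item in fake_stream():
        if isinstance(item, TaskResult):
            result = item              # always the last item
        elif isinstance(item, ChatMessage):
            chat_texts.append(item.text)
        # else: an internal event; render progress or log it
    return chat_texts, result


chat_texts, result = asyncio.run(consume())
print(chat_texts)  # ['hello', 'world']
```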

Usage Examples

Basic Example: Streaming a Sequential Flow

import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow
from autogen_ext.models.openai import OpenAIChatCompletionClient


async def main():
    model_client = OpenAIChatCompletionClient(model="gpt-4.1-nano")

    agent_a = AssistantAgent("A", model_client=model_client,
                              system_message="You are a helpful assistant.")
    agent_b = AssistantAgent("B", model_client=model_client,
                              system_message="Translate input to Chinese.")
    agent_c = AssistantAgent("C", model_client=model_client,
                              system_message="Translate input to English.")

    builder = DiGraphBuilder()
    builder.add_node(agent_a).add_node(agent_b).add_node(agent_c)
    builder.add_edge(agent_a, agent_b).add_edge(agent_b, agent_c)
    graph = builder.build()

    team = GraphFlow(
        participants=builder.get_participants(),
        graph=graph,
        termination_condition=MaxMessageTermination(5),
    )

    # Stream execution events in real time
    async for event in team.run_stream(task="Write a short story about a cat."):
        print(event)


asyncio.run(main())

Fan-Out Parallel Execution

import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow
from autogen_ext.models.openai import OpenAIChatCompletionClient


async def main():
    model_client = OpenAIChatCompletionClient(model="gpt-4.1-nano")

    summarizer = AssistantAgent("summarizer", model_client=model_client,
                                 system_message="Summarize the input.")
    translator_cn = AssistantAgent("translator_cn", model_client=model_client,
                                    system_message="Translate input to Chinese.")
    translator_jp = AssistantAgent("translator_jp", model_client=model_client,
                                    system_message="Translate input to Japanese.")

    # Fan-out: summarizer -> (translator_cn, translator_jp) execute in parallel
    builder = DiGraphBuilder()
    builder.add_node(summarizer).add_node(translator_cn).add_node(translator_jp)
    builder.add_edge(summarizer, translator_cn)
    builder.add_edge(summarizer, translator_jp)
    graph = builder.build()

    team = GraphFlow(
        participants=builder.get_participants(),
        graph=graph,
        termination_condition=MaxMessageTermination(5),
    )

    # Both translators execute concurrently after summarizer completes
    async for event in team.run_stream(task="Explain quantum computing in simple terms."):
        print(event)


asyncio.run(main())

Conditional Branching with Callable Conditions

import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow
from autogen_ext.models.openai import OpenAIChatCompletionClient


async def main():
    model_client = OpenAIChatCompletionClient(model="gpt-4.1-nano")

    classifier = AssistantAgent(
        "classifier", model_client=model_client,
        system_message="Classify input language. Say 'chinese' if Chinese, else 'other'.",
    )
    cn_translator = AssistantAgent("cn_translator", model_client=model_client,
                                    system_message="Translate Chinese to English.")
    other_handler = AssistantAgent("other_handler", model_client=model_client,
                                    system_message="Translate to Chinese.")

    builder = DiGraphBuilder()
    builder.add_node(classifier).add_node(cn_translator).add_node(other_handler)
    builder.add_edge(classifier, cn_translator,
                     condition=lambda msg: "chinese" in msg.to_model_text())
    builder.add_edge(classifier, other_handler,
                     condition=lambda msg: "chinese" not in msg.to_model_text())
    graph = builder.build()

    team = GraphFlow(
        participants=builder.get_participants(),
        graph=graph,
        termination_condition=MaxMessageTermination(5),
    )

    # Only one branch executes based on classifier output
    async for event in team.run_stream(task="Hello, how are you?"):
        print(event)


asyncio.run(main())

Related Pages

Implements Principle
