Implementation: Microsoft AutoGen GraphFlow Run Stream
| Knowledge Sources | Details |
|---|---|
| Domains | Multi-Agent Systems, Graph Execution, Concurrent Processing, Workflow Runtime, Streaming |
| Last Updated | 2026-02-11 00:00 GMT |
Overview
Concrete tool for executing a graph-directed agent workflow as an asynchronous stream of events and messages, provided by Microsoft AutoGen.
Description
GraphFlow.run_stream() is an asynchronous generator method inherited from BaseGroupChat that executes the graph-directed agent workflow and yields events as they occur. Internally, the GraphFlowManager orchestrates execution by maintaining a ready queue of nodes, evaluating edge conditions against agent outputs, and managing activation groups for fan-in and cyclic patterns.
The execution proceeds as follows:
- The task is converted to a TextMessage (if a string) and published to all participants via the group topic.
- The GraphFlowManager initializes its ready queue with the graph's start nodes.
- In each turn, the manager drains all ready nodes and returns them as the next speakers.
- All selected speakers execute concurrently. Their output messages are published to the group topic and yielded in the stream.
- After each agent completes, the manager evaluates outgoing edges from that agent's node. Satisfied edges decrement the remaining count for the target node's activation group.
- When a target node's activation requirements are met ("all" edges satisfied, or "any" single edge satisfied), it is added to the ready queue.
- For cyclic graphs, triggered activation groups are reset when a node is dequeued, allowing re-entry.
- Execution terminates when the ready queue is empty (natural graph completion), a termination condition is met, or max_turns is reached.
- The stream yields a TaskResult as its final item, containing the complete message history and stop reason.
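The "all"/"any" activation bookkeeping in the steps above can be sketched as follows. This is a minimal illustration only; the class and function names are hypothetical and do not mirror AutoGen's internal GraphFlowManager code:

```python
# Hypothetical sketch of activation-group bookkeeping; names are illustrative.
from dataclasses import dataclass, field


@dataclass
class ActivationGroup:
    policy: str                                        # "all" or "any"
    remaining: set = field(default_factory=set)        # unsatisfied incoming edges


def edge_satisfied(group: ActivationGroup, edge_id: str) -> bool:
    """Mark one incoming edge as satisfied; return True when the node is ready."""
    group.remaining.discard(edge_id)
    if group.policy == "any":
        return True                    # a single satisfied edge activates the node
    return not group.remaining         # "all": ready once every edge has fired


# Node C fans in from A and B and requires both ("all").
c = ActivationGroup(policy="all", remaining={"A->C", "B->C"})
assert edge_satisfied(c, "A->C") is False   # still waiting on B
assert edge_satisfied(c, "B->C") is True    # both satisfied: C enters the ready queue
```

For cyclic graphs, resetting `remaining` when a node is dequeued (step 7 above) is what allows the same node to be activated again on a later pass.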
The run() method is a convenience wrapper that consumes the stream and returns only the final TaskResult.
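The wrapper pattern can be illustrated with a minimal stand-in. Here `fake_run_stream` and the dict standing in for TaskResult are hypothetical, purely to show that such a wrapper returns the stream's last item:

```python
# Sketch of a convenience wrapper that consumes a stream and returns only the
# final item; this mirrors the described behavior, not AutoGen's exact source.
import asyncio
from typing import AsyncGenerator


async def fake_run_stream() -> AsyncGenerator[object, None]:
    yield "event-1"
    yield "message-1"
    yield {"stop_reason": "queue empty"}   # stands in for the final TaskResult


async def run() -> object:
    result = None
    async for item in fake_run_stream():
        result = item                      # the last yielded item is the TaskResult
    return result


print(asyncio.run(run()))  # → {'stop_reason': 'queue empty'}
```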
Usage
Use run_stream() when you need real-time streaming of agent events during graph execution, for example to render progress in a UI or to log events as they occur. Use run() when you only need the final result.
Code Reference
Source Location
- Repository: Microsoft AutoGen
- File: python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_base_group_chat.py (lines 351-453) for run_stream
- File: python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_graph/_digraph_group_chat.py (lines 309-538) for GraphFlowManager
Signature
```python
# Inherited from BaseGroupChat
async def run_stream(
    self,
    *,
    task: str | BaseChatMessage | Sequence[BaseChatMessage] | None = None,
    cancellation_token: CancellationToken | None = None,
    output_task_messages: bool = True,
) -> AsyncGenerator[BaseAgentEvent | BaseChatMessage | TaskResult, None]: ...

# Convenience wrapper
async def run(
    self,
    *,
    task: str | BaseChatMessage | Sequence[BaseChatMessage] | None = None,
    cancellation_token: CancellationToken | None = None,
    output_task_messages: bool = True,
) -> TaskResult: ...
```
Import
```python
from autogen_agentchat.teams import GraphFlow
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| task | str, BaseChatMessage, Sequence[BaseChatMessage], or None | No | The task to run. If a string, it is wrapped in a TextMessage with source "user". If None, continues from the previous task state. |
| cancellation_token | CancellationToken or None | No | Token to cancel execution immediately. May leave the team in an inconsistent state. For graceful stopping, use ExternalTermination instead. |
| output_task_messages | bool | No | Whether to include task messages in the output stream and TaskResult. Defaults to True. |
Outputs (run_stream)
| Name | Type | Description |
|---|---|---|
| stream | AsyncGenerator[BaseAgentEvent or BaseChatMessage or TaskResult, None] | An asynchronous generator that yields agent events and messages as they occur. The final item is always a TaskResult. |
Outputs (run)
| Name | Type | Description |
|---|---|---|
| result | TaskResult | The final result containing all messages produced during execution and the stop reason. |
Stream Item Types
| Type | Description |
|---|---|
| BaseAgentEvent | Internal agent events such as tool call results and intermediate processing steps. Yielded in the stream and included in TaskResult.messages, with the exception of ModelClientStreamingChunkEvent, which is excluded from the result. |
| BaseChatMessage | Agent output messages that form the conversation history. Yielded in the stream and included in TaskResult.messages. |
| TaskResult | The final result, always the last item in the stream. Contains the complete message sequence and stop reason. |
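A consumer typically branches on these item types. The sketch below uses hypothetical stand-in classes (`StubEvent`, `StubMessage`, `StubResult`) in place of the real AutoGen types so it stays self-contained; with a real team you would test against BaseAgentEvent, BaseChatMessage, and TaskResult instead:

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass
from typing import AsyncGenerator


# Hypothetical stand-ins for BaseAgentEvent / BaseChatMessage / TaskResult.
@dataclass
class StubEvent:
    name: str


@dataclass
class StubMessage:
    source: str
    content: str


@dataclass
class StubResult:
    stop_reason: str


async def stub_stream() -> AsyncGenerator[StubEvent | StubMessage | StubResult, None]:
    yield StubEvent("tool_call")
    yield StubMessage("A", "Once upon a time...")
    yield StubResult("Maximum number of messages reached")   # always the last item


async def consume() -> str:
    async for item in stub_stream():
        if isinstance(item, StubResult):
            return item.stop_reason                    # final item: the task result
        elif isinstance(item, StubMessage):
            print(f"[{item.source}] {item.content}")   # conversation history
        else:
            print(f"event: {item.name}")               # internal agent event
    return "stream ended without a result"


print(asyncio.run(consume()))  # → Maximum number of messages reached
```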
Usage Examples
Basic Example: Streaming a Sequential Flow
```python
import asyncio

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow
from autogen_ext.models.openai import OpenAIChatCompletionClient


async def main():
    model_client = OpenAIChatCompletionClient(model="gpt-4.1-nano")
    agent_a = AssistantAgent("A", model_client=model_client,
                             system_message="You are a helpful assistant.")
    agent_b = AssistantAgent("B", model_client=model_client,
                             system_message="Translate input to Chinese.")
    agent_c = AssistantAgent("C", model_client=model_client,
                             system_message="Translate input to English.")

    # Sequential chain: A -> B -> C
    builder = DiGraphBuilder()
    builder.add_node(agent_a).add_node(agent_b).add_node(agent_c)
    builder.add_edge(agent_a, agent_b).add_edge(agent_b, agent_c)
    graph = builder.build()

    team = GraphFlow(
        participants=builder.get_participants(),
        graph=graph,
        termination_condition=MaxMessageTermination(5),
    )

    # Stream execution events in real time
    async for event in team.run_stream(task="Write a short story about a cat."):
        print(event)


asyncio.run(main())
```
Fan-Out Parallel Execution
```python
import asyncio

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow
from autogen_ext.models.openai import OpenAIChatCompletionClient


async def main():
    model_client = OpenAIChatCompletionClient(model="gpt-4.1-nano")
    summarizer = AssistantAgent("summarizer", model_client=model_client,
                                system_message="Summarize the input.")
    translator_cn = AssistantAgent("translator_cn", model_client=model_client,
                                   system_message="Translate input to Chinese.")
    translator_jp = AssistantAgent("translator_jp", model_client=model_client,
                                   system_message="Translate input to Japanese.")

    # Fan-out: summarizer -> (translator_cn, translator_jp) execute in parallel
    builder = DiGraphBuilder()
    builder.add_node(summarizer).add_node(translator_cn).add_node(translator_jp)
    builder.add_edge(summarizer, translator_cn)
    builder.add_edge(summarizer, translator_jp)
    graph = builder.build()

    team = GraphFlow(
        participants=builder.get_participants(),
        graph=graph,
        termination_condition=MaxMessageTermination(5),
    )

    # Both translators execute concurrently after summarizer completes
    async for event in team.run_stream(task="Explain quantum computing in simple terms."):
        print(event)


asyncio.run(main())
```
Conditional Branching with Callable Conditions
```python
import asyncio

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow
from autogen_ext.models.openai import OpenAIChatCompletionClient


async def main():
    model_client = OpenAIChatCompletionClient(model="gpt-4.1-nano")
    classifier = AssistantAgent(
        "classifier", model_client=model_client,
        system_message="Classify input language. Say 'chinese' if Chinese, else 'other'.",
    )
    cn_translator = AssistantAgent("cn_translator", model_client=model_client,
                                   system_message="Translate Chinese to English.")
    other_handler = AssistantAgent("other_handler", model_client=model_client,
                                   system_message="Translate to Chinese.")

    builder = DiGraphBuilder()
    builder.add_node(classifier).add_node(cn_translator).add_node(other_handler)
    builder.add_edge(classifier, cn_translator,
                     condition=lambda msg: "chinese" in msg.to_model_text())
    builder.add_edge(classifier, other_handler,
                     condition=lambda msg: "chinese" not in msg.to_model_text())
    graph = builder.build()

    team = GraphFlow(
        participants=builder.get_participants(),
        graph=graph,
        termination_condition=MaxMessageTermination(5),
    )

    # Only one branch executes based on classifier output
    async for event in team.run_stream(task="Hello, how are you?"):
        print(event)


asyncio.run(main())
```