Implementation: Microsoft AutoGen BaseGroupChat Run
| Knowledge Sources | |
|---|---|
| Domains | Multi-Agent Systems, Async Execution, Stream Processing, AI Agents, Result Aggregation |
| Last Updated | 2026-02-11 00:00 GMT |
Overview
Concrete tools provided by Microsoft AutoGen for running multi-agent teams and rendering their output.
Description
BaseGroupChat provides two execution methods shared by all team types (RoundRobinGroupChat, SelectorGroupChat, etc.):
run(): Executes the team asynchronously and returns a TaskResult containing the full conversation history and stop reason. Internally, it consumes the run_stream() generator and returns the final item.
run_stream(): Executes the team and yields messages as an async generator. Each yielded item is either a BaseAgentEvent, a BaseChatMessage, or (as the final item) a TaskResult. This enables real-time streaming of agent responses to UIs, logs, or other consumers.
Both methods accept a task parameter (string, message, or sequence of messages) and an optional cancellation_token. When task is None, the team resumes from its previous conversation state.
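The relationship between the two methods can be illustrated with a short, library-free sketch. The generator and the items it yields below are stand-ins, not the AutoGen API: the point is only that a run()-style method can be built by draining a run_stream()-style generator and keeping the final item.

```python
import asyncio
from typing import Any, AsyncGenerator

async def fake_run_stream() -> AsyncGenerator[Any, None]:
    # Stand-in for team.run_stream(): messages first, result last.
    yield {"source": "Agent1", "content": "1"}
    yield {"source": "Agent2", "content": "2"}
    yield {"stop_reason": "Maximum number of messages reached."}

async def fake_run() -> Any:
    # Mirrors the run()/run_stream() contract: drain the stream,
    # return only the final item (the result).
    result = None
    async for item in fake_run_stream():
        result = item
    return result

result = asyncio.run(fake_run())
print(result)  # only the final item survives
```

Consumers that need intermediate messages use the generator directly; consumers that only need the outcome use the draining wrapper.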
The companion Console() function consumes a message stream from run_stream() and renders it to the terminal with formatted agent names and message content. It optionally displays inline images (in iTerm2) and token usage statistics.
Key implementation details:
- The runtime is lazily initialized on first run and reused for subsequent runs.
- The termination condition is automatically reset after each run completes, allowing the team to be run again.
- ModelClientStreamingChunkEvent messages are yielded in the stream for real-time display but are not included in the final TaskResult.messages.
- Cancellation via CancellationToken is immediate but may leave the team in an inconsistent state. For graceful stopping, use ExternalTermination instead.
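The immediate-versus-graceful distinction can be sketched without the library (ExternalTermination's real API is not used here; the event and function names are illustrative): a graceful stop request is only honored between turns, so the turn already in progress completes cleanly instead of being interrupted mid-work.

```python
import asyncio

# Illustrative only: an external stop flag checked between "turns",
# analogous in spirit to ExternalTermination (not the AutoGen API).
stop_requested = asyncio.Event()
completed_turns = []

async def run_turns(n: int) -> str:
    for i in range(n):
        await asyncio.sleep(0)        # a "turn" of agent work
        completed_turns.append(i)     # the in-progress turn finishes
        if stop_requested.is_set():   # graceful check between turns
            return "external stop"
    return "max turns reached"

async def main() -> str:
    task = asyncio.create_task(run_turns(100))
    await asyncio.sleep(0)            # let the loop start a turn
    stop_requested.set()              # request a graceful stop
    return await task

print(asyncio.run(main()))
```

Hard cancellation (task.cancel() or a CancellationToken) would instead interrupt wherever the coroutine happens to be suspended, which is why the team state may be inconsistent afterwards.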
Usage
After assembling a team (e.g., RoundRobinGroupChat or SelectorGroupChat), call await team.run(task=...) for batch results or async for msg in team.run_stream(task=...) for streaming. Wrap run_stream() with await Console(...) for terminal rendering during development.
Code Reference
Source Location
- Repository: Microsoft AutoGen
- File (run/run_stream): python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_base_group_chat.py (lines 247-453)
- File (Console): python/packages/autogen-agentchat/src/autogen_agentchat/ui/_console.py (lines 82-88)
Signature
class BaseGroupChat:
    async def run(
        self,
        *,
        task: str | BaseChatMessage | Sequence[BaseChatMessage] | None = None,
        cancellation_token: CancellationToken | None = None,
        output_task_messages: bool = True,
    ) -> TaskResult:
        ...

    async def run_stream(
        self,
        *,
        task: str | BaseChatMessage | Sequence[BaseChatMessage] | None = None,
        cancellation_token: CancellationToken | None = None,
        output_task_messages: bool = True,
    ) -> AsyncGenerator[BaseAgentEvent | BaseChatMessage | TaskResult, None]:
        ...

async def Console(
    stream: AsyncGenerator[BaseAgentEvent | BaseChatMessage | T, None],
    *,
    no_inline_images: bool = False,
    output_stats: bool = False,
    user_input_manager: UserInputManager | None = None,
) -> T:
    ...
Import
from autogen_agentchat.ui import Console
Note: run() and run_stream() are instance methods on team classes (e.g., RoundRobinGroupChat, SelectorGroupChat), which inherit from BaseGroupChat.
I/O Contract
Inputs
run() and run_stream():
| Name | Type | Required | Description |
|---|---|---|---|
| task | str or BaseChatMessage or Sequence[BaseChatMessage] or None | No | The task to run. Strings are converted to TextMessage. None resumes the previous conversation. |
| cancellation_token | CancellationToken or None | No | Token for immediate cancellation. May leave team in inconsistent state. Use ExternalTermination for graceful stopping. |
| output_task_messages | bool | No | Whether to include the initial task messages in the output stream. Defaults to True. |
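The accepted task forms can be summarized with an illustrative normalization sketch. This is not the AutoGen source; the dict below stands in for the real message types, and normalize_task is a hypothetical helper:

```python
from typing import Optional, Sequence, Union

def normalize_task(
    task: Union[str, dict, Sequence[dict], None],
) -> Optional[list]:
    # Hypothetical helper mirroring the documented contract.
    if task is None:
        return None              # resume previous conversation state
    if isinstance(task, str):
        # Strings are wrapped as a text message.
        return [{"type": "TextMessage", "content": task}]
    if isinstance(task, dict):
        return [task]            # a single message becomes a one-item list
    return list(task)            # already a sequence of messages

print(normalize_task("hello"))
# → [{'type': 'TextMessage', 'content': 'hello'}]
```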
Console():
| Name | Type | Required | Description |
|---|---|---|---|
| stream | AsyncGenerator | Yes | Message stream from run_stream() or on_messages_stream(). |
| no_inline_images | bool | No | Disable inline image rendering in iTerm2. Defaults to False. |
| output_stats | bool | No | Display token usage statistics after completion. Experimental. Defaults to False. |
| user_input_manager | UserInputManager or None | No | Manager for handling user input requests during conversation. |
Outputs
run():
| Name | Type | Description |
|---|---|---|
| result | TaskResult | Contains messages (List[BaseAgentEvent or BaseChatMessage]) with the full conversation history, and stop_reason (str or None) describing why the conversation ended. |
run_stream():
| Name | Type | Description |
|---|---|---|
| yielded items | BaseAgentEvent or BaseChatMessage | Individual messages produced by agents during the conversation. |
| final item | TaskResult | The last item yielded, containing the full conversation history and stop reason. |
Console():
| Name | Type | Description |
|---|---|---|
| result | TaskResult or Response | The final result from the consumed stream, passed through after rendering all messages to the terminal. |
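This render-then-pass-through behavior can be mimicked with a minimal, library-free consumer (the names and rendering format are illustrative, not the AutoGen implementation): each message is printed under its agent name, and whatever the stream yields last is returned to the caller.

```python
import asyncio
from typing import Any, AsyncGenerator

async def demo_stream() -> AsyncGenerator[Any, None]:
    # Stand-in for team.run_stream(): messages first, result last.
    yield {"source": "Writer", "content": "Ocean draft."}
    yield {"source": "Critic", "content": "Looks good. TERMINATE"}
    yield {"stop_reason": "Text 'TERMINATE' mentioned"}

async def mini_console(stream: AsyncGenerator[Any, None]) -> Any:
    last = None
    async for item in stream:
        if isinstance(item, dict) and "source" in item:
            print(f"---------- {item['source']} ----------")
            print(item["content"])
        last = item
    return last  # pass the final result through after rendering

result = asyncio.run(mini_console(demo_stream()))
print("Stop reason:", result["stop_reason"])
```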
Usage Examples
Basic Example
import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_ext.models.openai import OpenAIChatCompletionClient

async def main():
    model_client = OpenAIChatCompletionClient(model="gpt-4o")
    agent1 = AssistantAgent("Assistant1", model_client=model_client)
    agent2 = AssistantAgent("Assistant2", model_client=model_client)
    termination = MaxMessageTermination(3)
    team = RoundRobinGroupChat([agent1, agent2], termination_condition=termination)

    # Batch execution: get the final result
    result = await team.run(task="Count from 1 to 10, respond one at a time.")
    print(result)

    # Run again to continue the previous conversation
    result = await team.run()
    print(result)

asyncio.run(main())
Streaming with Console
import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import TextMentionTermination
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient

async def main():
    model_client = OpenAIChatCompletionClient(model="gpt-4o")
    agent1 = AssistantAgent("Writer", model_client=model_client)
    agent2 = AssistantAgent("Critic", model_client=model_client)
    team = RoundRobinGroupChat(
        [agent1, agent2],
        termination_condition=TextMentionTermination("TERMINATE"),
    )

    # Stream to console with token usage stats
    result = await Console(
        team.run_stream(task="Write a short poem about the ocean."),
        output_stats=True,
    )
    print(f"\nStop reason: {result.stop_reason}")

asyncio.run(main())
Manual Stream Processing
import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.base import TaskResult
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_ext.models.openai import OpenAIChatCompletionClient

async def main():
    model_client = OpenAIChatCompletionClient(model="gpt-4o")
    agent1 = AssistantAgent("Agent1", model_client=model_client)
    agent2 = AssistantAgent("Agent2", model_client=model_client)
    team = RoundRobinGroupChat(
        [agent1, agent2],
        termination_condition=MaxMessageTermination(4),
    )

    # Process each message individually
    stream = team.run_stream(task="Debate the merits of static vs dynamic typing.")
    async for message in stream:
        if isinstance(message, TaskResult):
            print(f"\n--- Conversation ended: {message.stop_reason} ---")
            print(f"Total messages: {len(message.messages)}")
        else:
            print(f"[{message.source}]: {message.to_text()[:100]}")

asyncio.run(main())
With Cancellation Token
import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.ui import Console
from autogen_core import CancellationToken
from autogen_ext.models.openai import OpenAIChatCompletionClient

async def main():
    model_client = OpenAIChatCompletionClient(model="gpt-4o")
    agent1 = AssistantAgent("Agent1", model_client=model_client)
    agent2 = AssistantAgent("Agent2", model_client=model_client)
    team = RoundRobinGroupChat(
        [agent1, agent2],
        termination_condition=MaxMessageTermination(100),
    )
    cancellation_token = CancellationToken()

    # Run in background and cancel after 5 seconds
    run_task = asyncio.create_task(
        Console(
            team.run_stream(
                task="Discuss the future of AI.",
                cancellation_token=cancellation_token,
            )
        )
    )
    await asyncio.sleep(5)
    cancellation_token.cancel()

    try:
        await run_task
    except asyncio.CancelledError:
        print("Conversation was cancelled.")

asyncio.run(main())