Implementation:Microsoft Autogen BaseGroupChat Run

From Leeroopedia
Knowledge Sources
Domains: Multi-Agent Systems, Async Execution, Stream Processing, AI Agents, Result Aggregation
Last Updated: 2026-02-11 00:00 GMT

Overview

Microsoft AutoGen's concrete tools for running multi-agent teams and rendering their output: the run() and run_stream() methods of BaseGroupChat, and the Console() function.

Description

BaseGroupChat provides two execution methods shared by all team types (RoundRobinGroupChat, SelectorGroupChat, etc.):

  • run(): Executes the team asynchronously and returns a TaskResult containing the full conversation history and stop reason. Internally, it consumes the run_stream() generator and returns the final item.
  • run_stream(): Executes the team and yields messages as an async generator. Each yielded item is either a BaseAgentEvent, a BaseChatMessage, or (as the final item) a TaskResult. This enables real-time streaming of agent responses to UIs, logs, or other consumers.
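
The relationship between the two methods can be sketched with the standard library alone. ToyMessage, ToyResult, and ToyTeam below are hypothetical stand-ins, not AutoGen types. The sketch assumes only what is described above: the stream ends with a result object, and run() returns that object.

```python
import asyncio
from dataclasses import dataclass, field
from typing import AsyncGenerator, List, Optional, Union


@dataclass
class ToyMessage:
    """Hypothetical stand-in for a chat message or agent event."""
    source: str
    content: str


@dataclass
class ToyResult:
    """Hypothetical stand-in for TaskResult."""
    messages: List[ToyMessage] = field(default_factory=list)
    stop_reason: Optional[str] = None


class ToyTeam:
    """Hypothetical team; only the run()/run_stream() relationship is modelled."""

    async def run_stream(
        self, *, task: str
    ) -> AsyncGenerator[Union[ToyMessage, ToyResult], None]:
        history = [ToyMessage("user", task), ToyMessage("agent", task.upper())]
        for message in history:
            yield message  # intermediate items, one at a time
        yield ToyResult(messages=history, stop_reason="done")  # final item

    async def run(self, *, task: str) -> ToyResult:
        result: Optional[ToyResult] = None
        async for item in self.run_stream(task=task):
            if isinstance(item, ToyResult):
                result = item  # discard everything except the final item
        if result is None:
            raise RuntimeError("stream ended without a result")
        return result


result = asyncio.run(ToyTeam().run(task="hello"))
print(result.stop_reason, len(result.messages))
```

Because run() is a thin wrapper in this sketch, a caller that needs only the final result pays nothing extra, while a caller that needs live updates iterates run_stream() directly.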

Both methods take keyword-only arguments: a task (a string, a single message, or a sequence of messages), an optional cancellation_token, and an output_task_messages flag that defaults to True. When task is None, the team resumes from its previous conversation state.

The companion Console() function consumes a message stream from run_stream() and renders it to the terminal with formatted agent names and message content. It optionally displays inline images (in iTerm2) and token usage statistics.
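
The consume-render-return pattern described for Console() can be illustrated with a small stand-in. This is a sketch, not the library's implementation: toy_stream and toy_console are hypothetical names, and the items are plain dictionaries rather than AutoGen message objects.

```python
import asyncio
from typing import Any, AsyncGenerator, Dict


async def toy_stream() -> AsyncGenerator[Dict[str, Any], None]:
    """Hypothetical stream: two messages, then a final result item."""
    yield {"source": "Writer", "content": "Waves fold over sand."}
    yield {"source": "Critic", "content": "Vivid. TERMINATE"}
    yield {"final": True, "stop_reason": "TERMINATE mentioned"}


async def toy_console(
    stream: AsyncGenerator[Dict[str, Any], None]
) -> Dict[str, Any]:
    """Render every message, then hand back the final item unchanged."""
    final: Dict[str, Any] = {}
    async for item in stream:
        if item.get("final"):
            final = item  # remember the result instead of printing it
        else:
            print(f"---------- {item['source']} ----------")
            print(item["content"])
    return final


final = asyncio.run(toy_console(toy_stream()))
print(final["stop_reason"])
```

Returning the final item lets the caller await the renderer and still inspect the result afterwards, which is how the Streaming with Console example on this page uses it.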

Key implementation details:

  • The runtime is lazily initialized on first run and reused for subsequent runs.
  • The termination condition is automatically reset after each run completes, allowing the team to be run again.
  • ModelClientStreamingChunkEvent messages are yielded in the stream for real-time display but are not included in the final TaskResult.messages.
  • Cancellation via CancellationToken is immediate but may leave the team in an inconsistent state. For graceful stopping, use ExternalTermination instead.
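
The third point above, that streaming chunks appear in the stream but not in the final message list, can be sketched as follows. ToyChunk, ToyText, and ToyResult are hypothetical stand-ins for ModelClientStreamingChunkEvent, a complete chat message, and TaskResult; the code shows the bookkeeping idea only and makes no claim about how AutoGen implements it.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, AsyncGenerator, List


@dataclass
class ToyChunk:
    """Hypothetical partial output, shown live but not stored."""
    content: str


@dataclass
class ToyText:
    """Hypothetical complete message, stored in the history."""
    content: str


@dataclass
class ToyResult:
    """Hypothetical final result holding only complete messages."""
    messages: List[ToyText]


async def toy_run_stream(words: List[str]) -> AsyncGenerator[Any, None]:
    recorded: List[ToyText] = []
    for word in words:
        yield ToyChunk(word)  # streamed for display, never recorded
    complete = ToyText(" ".join(words))
    recorded.append(complete)  # only the assembled message is kept
    yield complete
    yield ToyResult(messages=recorded)


async def collect(words: List[str]) -> List[Any]:
    return [item async for item in toy_run_stream(words)]


items = asyncio.run(collect(["deep", "blue", "sea"]))
chunks = [item for item in items if isinstance(item, ToyChunk)]
final = items[-1]
print(len(chunks), len(final.messages), final.messages[0].content)
```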

Usage

After assembling a team (e.g., RoundRobinGroupChat or SelectorGroupChat), call await team.run(task=...) for batch results or async for msg in team.run_stream(task=...) for streaming. Wrap run_stream() with await Console(...) for terminal rendering during development.

Code Reference

Source Location

  • Repository: Microsoft AutoGen
  • File (run/run_stream): python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_base_group_chat.py (lines 247-453)
  • File (Console): python/packages/autogen-agentchat/src/autogen_agentchat/ui/_console.py (lines 82-88)

Signature

class BaseGroupChat:
    async def run(
        self,
        *,
        task: str | BaseChatMessage | Sequence[BaseChatMessage] | None = None,
        cancellation_token: CancellationToken | None = None,
        output_task_messages: bool = True,
    ) -> TaskResult:
        ...

    async def run_stream(
        self,
        *,
        task: str | BaseChatMessage | Sequence[BaseChatMessage] | None = None,
        cancellation_token: CancellationToken | None = None,
        output_task_messages: bool = True,
    ) -> AsyncGenerator[BaseAgentEvent | BaseChatMessage | TaskResult, None]:
        ...

async def Console(
    stream: AsyncGenerator[BaseAgentEvent | BaseChatMessage | T, None],
    *,
    no_inline_images: bool = False,
    output_stats: bool = False,
    user_input_manager: UserInputManager | None = None,
) -> T:
    ...

Import

from autogen_agentchat.ui import Console

Note: run() and run_stream() are instance methods on team classes (e.g., RoundRobinGroupChat, SelectorGroupChat), which inherit from BaseGroupChat.

I/O Contract

Inputs

run() and run_stream():

Name | Type | Required | Description
--- | --- | --- | ---
task | str or BaseChatMessage or Sequence[BaseChatMessage] or None | No | The task to run. Strings are converted to TextMessage. None resumes the previous conversation.
cancellation_token | CancellationToken or None | No | Token for immediate cancellation. May leave the team in an inconsistent state. Use ExternalTermination for graceful stopping.
output_task_messages | bool | No | Whether to include the initial task messages in the output stream. Defaults to True.
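
The accepted shapes of task can be illustrated with a small normalization helper. This is a sketch under stated assumptions: ToyTextMessage and normalize_task are hypothetical names, and the source="user" label given to string tasks is an assumption for illustration, not something this page specifies.

```python
from dataclasses import dataclass
from typing import List, Optional, Sequence, Union


@dataclass
class ToyTextMessage:
    """Hypothetical stand-in for TextMessage."""
    content: str
    source: str


Task = Union[str, ToyTextMessage, Sequence[ToyTextMessage], None]


def normalize_task(task: Task) -> Optional[List[ToyTextMessage]]:
    """Map every accepted task shape to a list of messages, or None to resume."""
    if task is None:
        return None  # no new input: continue the previous conversation
    if isinstance(task, str):
        # Assumption: string tasks are attributed to a "user" source.
        return [ToyTextMessage(content=task, source="user")]
    if isinstance(task, ToyTextMessage):
        return [task]
    return list(task)  # already a sequence of messages


print(normalize_task("Count to 3"))
```

The string check comes before the sequence branch because a str is itself a sequence in Python and would otherwise be split into characters.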

Console():

Name | Type | Required | Description
--- | --- | --- | ---
stream | AsyncGenerator | Yes | Message stream from run_stream() or on_messages_stream().
no_inline_images | bool | No | Disable inline image rendering in iTerm2. Defaults to False.
output_stats | bool | No | Display token usage statistics after completion. Experimental. Defaults to False.
user_input_manager | UserInputManager or None | No | Manager for handling user input requests during conversation.

Outputs

run():

Name | Type | Description
--- | --- | ---
result | TaskResult | Contains messages (List[BaseAgentEvent or BaseChatMessage]) with the full conversation history, and stop_reason (str or None) describing why the conversation ended.

run_stream():

Name | Type | Description
--- | --- | ---
yielded items | BaseAgentEvent or BaseChatMessage | Individual messages produced by agents during the conversation.
final item | TaskResult | The last item yielded, containing the full conversation history and stop reason.

Console():

Name | Type | Description
--- | --- | ---
result | TaskResult or Response | The final result from the consumed stream, passed through after rendering all messages to the terminal.

Usage Examples

Basic Example

import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_ext.models.openai import OpenAIChatCompletionClient

async def main():
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    agent1 = AssistantAgent("Assistant1", model_client=model_client)
    agent2 = AssistantAgent("Assistant2", model_client=model_client)
    termination = MaxMessageTermination(3)
    team = RoundRobinGroupChat([agent1, agent2], termination_condition=termination)

    # Batch execution: get the final result
    result = await team.run(task="Count from 1 to 10, respond one at a time.")
    print(result)

    # Run again to continue the previous conversation
    result = await team.run()
    print(result)

asyncio.run(main())
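
The second run() call above has no task, so it continues the earlier conversation, and the termination condition starts fresh. A toy model of that behaviour is sketched below. ToyCounterTeam is hypothetical and synchronous for brevity, and it assumes that the message limit counts the task message; check the MaxMessageTermination documentation before relying on that detail.

```python
from typing import List, Optional


class ToyCounterTeam:
    """Hypothetical team: state persists across runs, the limit resets per run."""

    def __init__(self, max_messages: int) -> None:
        self._max_messages = max_messages
        self._next_number = 1  # conversation state kept between runs

    def run(self, task: Optional[str] = None) -> List[str]:
        produced: List[str] = []
        count = 0  # termination counter starts at zero on every run
        if task is not None:
            produced.append(f"user: {task}")
            count += 1  # assumption: the task message counts toward the limit
        while count < self._max_messages:
            produced.append(f"agent: {self._next_number}")
            self._next_number += 1
            count += 1
        return produced


team = ToyCounterTeam(max_messages=3)
first = team.run("Count from 1 to 10")
second = team.run()  # no task: resume where the first run stopped
print(first)
print(second)
```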

Streaming with Console

import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import TextMentionTermination
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient

async def main():
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    agent1 = AssistantAgent("Writer", model_client=model_client)
    agent2 = AssistantAgent("Critic", model_client=model_client)
    team = RoundRobinGroupChat(
        [agent1, agent2],
        termination_condition=TextMentionTermination("TERMINATE"),
    )

    # Stream to console with token usage stats
    result = await Console(
        team.run_stream(task="Write a short poem about the ocean."),
        output_stats=True,
    )
    print(f"\nStop reason: {result.stop_reason}")

asyncio.run(main())

Manual Stream Processing

import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.base import TaskResult
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_ext.models.openai import OpenAIChatCompletionClient

async def main():
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    agent1 = AssistantAgent("Agent1", model_client=model_client)
    agent2 = AssistantAgent("Agent2", model_client=model_client)
    team = RoundRobinGroupChat(
        [agent1, agent2],
        termination_condition=MaxMessageTermination(4),
    )

    # Process each message individually
    stream = team.run_stream(task="Debate the merits of static vs dynamic typing.")
    async for message in stream:
        if isinstance(message, TaskResult):
            print(f"\n--- Conversation ended: {message.stop_reason} ---")
            print(f"Total messages: {len(message.messages)}")
        else:
            print(f"[{message.source}]: {message.to_text()[:100]}")

asyncio.run(main())

With Cancellation Token

import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.ui import Console
from autogen_core import CancellationToken
from autogen_ext.models.openai import OpenAIChatCompletionClient

async def main():
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    agent1 = AssistantAgent("Agent1", model_client=model_client)
    agent2 = AssistantAgent("Agent2", model_client=model_client)
    team = RoundRobinGroupChat(
        [agent1, agent2],
        termination_condition=MaxMessageTermination(100),
    )

    cancellation_token = CancellationToken()

    # Run in background and cancel after 5 seconds
    run_task = asyncio.create_task(
        Console(
            team.run_stream(
                task="Discuss the future of AI.",
                cancellation_token=cancellation_token,
            )
        )
    )

    await asyncio.sleep(5)
    cancellation_token.cancel()

    try:
        await run_task
    except asyncio.CancelledError:
        print("Conversation was cancelled.")

asyncio.run(main())
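
The example above stops the run abruptly. The graceful alternative mentioned earlier, ExternalTermination, follows a cooperative pattern in which the stop signal is honoured only between turns. The sketch below illustrates that pattern with the standard library; StopFlag and run_turns are hypothetical names and do not reproduce the AutoGen classes.

```python
import asyncio
from typing import List, Tuple


class StopFlag:
    """Hypothetical stand-in for an externally set termination condition."""

    def __init__(self) -> None:
        self._is_set = False

    def set(self) -> None:
        self._is_set = True

    @property
    def terminated(self) -> bool:
        return self._is_set


async def run_turns(flag: StopFlag, log: List[int], max_turns: int = 100) -> str:
    for turn in range(max_turns):
        if flag.terminated:
            return "external stop"  # checked only between turns
        await asyncio.sleep(0.01)  # simulate one agent turn
        log.append(turn)  # a turn that has started always completes
    return "max turns reached"


async def main() -> Tuple[str, List[int]]:
    flag = StopFlag()
    log: List[int] = []
    run_task = asyncio.create_task(run_turns(flag, log))
    await asyncio.sleep(0.035)
    flag.set()  # request a stop; the turn in progress still finishes
    reason = await run_task
    return reason, log


reason, log = asyncio.run(main())
print(reason, log == list(range(len(log))))
```

Unlike cancelling the task, setting the flag never interrupts a turn halfway, so the recorded history stays contiguous and the run ends with a normal stop reason instead of an exception.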

Related Pages

Implements Principle
