Jump to content

Connect Leeroopedia MCP: Equip your AI agents to search best practices, build plans, verify code, diagnose failures, and look up hyperparameter defaults.

Implementation:Microsoft Autogen AssistantAgent On Messages Stream

From Leeroopedia
Knowledge Sources
Domains Tool Use, Agent Execution, LLM Agents, Iterative Reasoning
Last Updated 2026-02-11 00:00 GMT

Overview

Concrete implementation of the tool execution loop within AssistantAgent provided by Microsoft AutoGen, processing messages through iterative LLM inference and tool calling via the on_messages_stream method and the internal _process_model_result method.

Description

The on_messages_stream method is the core entry point for processing messages in an AssistantAgent. It implements the full tool execution loop as an async generator that yields events and messages as they occur.

The method proceeds through five major steps:

  1. Context setup: Add incoming messages to the model context and update context with any relevant memory.
  2. Initial inference: Call the LLM with the current context and tool schemas. The LLM produces either a text response or tool call requests.
  3. Tool execution loop: If the LLM requested tool calls, enter the _process_model_result loop that iterates up to max_tool_iterations times:
    • Emit a ToolCallRequestEvent with the requested function calls.
    • Execute all tool calls concurrently via asyncio.gather, streaming any sub-events from tool execution.
    • Emit a ToolCallExecutionEvent with the results.
    • Check for handoffs (if a tool call triggers a handoff, terminate immediately).
    • If not the last iteration, call the LLM again with the updated context (including tool results) to see if more tool calls are needed.
  4. Post-loop processing: After the loop ends, either reflect on tool results (additional LLM call without tools) or format a summary using the configured template.
  5. Response assembly: Yield a Response object containing the final chat message and all inner messages (the full audit trail of events).

The method supports both streaming and non-streaming LLM calls, controlled by the model_client_stream configuration. During streaming, ModelClientStreamingChunkEvent events are yielded as the LLM generates tokens.

Usage

This method is called internally when the agent participates in a team or when run()/run_stream() is invoked. Consume the async generator to receive real-time events during tool execution. Use this for building UIs that display tool call progress, or for debugging agent behavior by inspecting the yielded events.

Code Reference

Source Location

  • Repository: Microsoft AutoGen
  • File: python/packages/autogen-agentchat/src/autogen_agentchat/agents/_assistant_agent.py (lines 901-1012 for on_messages_stream, lines 1118-1315 for _process_model_result)

Signature

class AssistantAgent:
    async def on_messages_stream(
        self,
        messages: Sequence[BaseChatMessage],
        cancellation_token: CancellationToken,
    ) -> AsyncGenerator[Union[BaseAgentEvent, BaseChatMessage, Response], None]:
        ...

Import

from autogen_agentchat.agents import AssistantAgent

I/O Contract

Inputs

Name Type Required Description
messages Sequence[BaseChatMessage] Yes The incoming messages to process. Typically includes user messages, handoff messages, or other chat messages that form the conversation context.
cancellation_token CancellationToken Yes A token that can be used to cancel the operation. Passed through to tool executions and LLM calls for cooperative cancellation.

Outputs (Yielded)

Name Type Description
ToolCallRequestEvent BaseAgentEvent Emitted when the LLM requests one or more tool calls. Contains the list of FunctionCall objects with function names and arguments.
ToolCallExecutionEvent BaseAgentEvent Emitted after tool calls are executed. Contains the list of FunctionExecutionResult objects with results.
ThoughtEvent BaseAgentEvent Emitted when the LLM produces a hidden thought (for reasoning models). Contains the thought text.
ModelClientStreamingChunkEvent BaseAgentEvent Emitted during streaming LLM calls as tokens are generated.
Sub-agent events BaseAgentEvent or BaseChatMessage Events emitted by sub-agent tools (AgentTool/TeamTool) during their execution.
Response Response The final yielded item, containing the chat message (TextMessage, ToolCallSummaryMessage, or HandoffMessage) and the full list of inner messages (audit trail).

Usage Examples

Basic Example

import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.base import Response
from autogen_agentchat.messages import TextMessage, ToolCallRequestEvent, ToolCallExecutionEvent
from autogen_core import CancellationToken
from autogen_ext.models.openai import OpenAIChatCompletionClient


async def get_weather(city: str) -> str:
    """Get the current weather."""
    return f"Sunny, 72F in {city}"


async def main():
    model_client = OpenAIChatCompletionClient(model="gpt-4o")
    agent = AssistantAgent(
        name="weather_agent",
        model_client=model_client,
        tools=[get_weather],
        reflect_on_tool_use=True,
    )

    messages = [TextMessage(content="What is the weather in Seattle?", source="user")]
    cancellation_token = CancellationToken()

    async for event in agent.on_messages_stream(messages, cancellation_token):
        if isinstance(event, ToolCallRequestEvent):
            print(f"Tool call requested: {event.content}")
        elif isinstance(event, ToolCallExecutionEvent):
            print(f"Tool result: {event.content}")
        elif isinstance(event, Response):
            print(f"Final response: {event.chat_message.content}")
            print(f"Inner messages count: {len(event.inner_messages or [])}")


asyncio.run(main())

Multi-Iteration Loop Example

import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.base import Response
from autogen_agentchat.messages import TextMessage, ToolCallRequestEvent
from autogen_core import CancellationToken
from autogen_ext.models.openai import OpenAIChatCompletionClient


async def search(query: str) -> str:
    """Search for information."""
    return f"Found results for: {query}"


async def analyze(data: str) -> str:
    """Analyze search results."""
    return f"Analysis of: {data}"


async def main():
    model_client = OpenAIChatCompletionClient(model="gpt-4o")
    agent = AssistantAgent(
        name="research_agent",
        model_client=model_client,
        tools=[search, analyze],
        max_tool_iterations=3,
        reflect_on_tool_use=True,
    )

    messages = [TextMessage(content="Research and analyze AI trends", source="user")]
    iteration_count = 0

    async for event in agent.on_messages_stream(messages, CancellationToken()):
        if isinstance(event, ToolCallRequestEvent):
            iteration_count += 1
            print(f"Iteration {iteration_count}: {[c.name for c in event.content]}")
        elif isinstance(event, Response):
            print(f"Completed after {iteration_count} tool iterations")
            print(f"Final: {event.chat_message.content}")


asyncio.run(main())

Related Pages

Implements Principle

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment