Implementation: Microsoft AutoGen AssistantAgent on_messages_stream
| Knowledge Sources | |
|---|---|
| Domains | Tool Use, Agent Execution, LLM Agents, Iterative Reasoning |
| Last Updated | 2026-02-11 00:00 GMT |
Overview
A concrete implementation of the tool-execution loop inside Microsoft AutoGen's AssistantAgent: messages are processed through iterative LLM inference and tool calling via the on_messages_stream method and the internal _process_model_result helper.
Description
The on_messages_stream method is the core entry point for processing messages in an AssistantAgent. It implements the full tool execution loop as an async generator that yields events and messages as they occur.
The method proceeds through five major steps:
- Context setup: Add incoming messages to the model context and update context with any relevant memory.
- Initial inference: Call the LLM with the current context and tool schemas. The LLM produces either a text response or tool call requests.
- Tool execution loop: If the LLM requested tool calls, enter the `_process_model_result` loop, which iterates up to `max_tool_iterations` times:
  - Emit a `ToolCallRequestEvent` with the requested function calls.
  - Execute all tool calls concurrently via `asyncio.gather`, streaming any sub-events from tool execution.
  - Emit a `ToolCallExecutionEvent` with the results.
  - Check for handoffs (if a tool call triggers a handoff, terminate immediately).
  - If not the last iteration, call the LLM again with the updated context (including tool results) to see if more tool calls are needed.
- Post-loop processing: After the loop ends, either reflect on the tool results (an additional LLM call without tools) or format a summary using the configured template.
- Response assembly: Yield a `Response` object containing the final chat message and all inner messages (the full audit trail of events).
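The loop described above can be sketched as a self-contained async generator. Everything here is a simplified, hypothetical stand-in (the event classes, `tool_loop`, and the stub model are not AutoGen's real types); the actual `_process_model_result` additionally handles handoffs, reflection, and streaming:

```python
# Simplified sketch of the tool-execution loop; names mirror AutoGen's
# events but this is a hypothetical stand-in, not the real implementation.
import asyncio
from dataclasses import dataclass
from typing import AsyncGenerator, Callable, Union


@dataclass
class ToolCallRequestEvent:
    calls: list          # (tool_name, argument) pairs requested by the model


@dataclass
class ToolCallExecutionEvent:
    results: list        # results returned by the executed tools


@dataclass
class Response:
    content: str         # final chat message content


async def tool_loop(
    model: Callable[[list], Union[list, str]],   # stub model: context -> tool calls or final text
    tools: dict,
    context: list,
    max_tool_iterations: int = 3,
) -> AsyncGenerator[Union[ToolCallRequestEvent, ToolCallExecutionEvent, Response], None]:
    result = model(context)                      # initial inference
    for _ in range(max_tool_iterations):
        if isinstance(result, str):              # text response: loop ends
            break
        yield ToolCallRequestEvent(result)
        # Execute all requested tool calls concurrently, as the real loop does.
        outputs = list(await asyncio.gather(*(tools[name](arg) for name, arg in result)))
        yield ToolCallExecutionEvent(outputs)
        context = context + outputs              # feed tool results back into the context
        result = model(context)                  # ask whether more tool calls are needed
    yield Response(result if isinstance(result, str) else "max tool iterations reached")


# Demo: a stub model that requests one tool call, then answers with text.
async def echo(arg: str) -> str:
    return f"echo:{arg}"


def stub_model(context: list):
    seen_tool_result = any(str(m).startswith("echo:") for m in context)
    return "done" if seen_tool_result else [("echo", "hi")]


async def demo() -> list:
    events = []
    async for ev in tool_loop(stub_model, {"echo": echo}, ["user: hi"]):
        events.append(ev)
    return events
```

Running `demo()` yields a `ToolCallRequestEvent`, then a `ToolCallExecutionEvent`, then a final `Response`, mirroring the event ordering the real method produces.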
The method supports both streaming and non-streaming LLM calls, controlled by the `model_client_stream` configuration. During streaming, `ModelClientStreamingChunkEvent` events are yielded as the LLM generates tokens.
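A consumer of the streaming mode typically concatenates chunk contents as they arrive and treats the final `Response` as authoritative. The sketch below uses hypothetical stand-in classes and a faked token stream rather than a real model client:

```python
# Hypothetical sketch: accumulating streamed chunks into the final text.
import asyncio
from dataclasses import dataclass
from typing import AsyncGenerator, Union


@dataclass
class ModelClientStreamingChunkEvent:
    content: str   # one token's worth of generated text


@dataclass
class Response:
    content: str   # the complete final message


async def fake_stream() -> AsyncGenerator[Union[ModelClientStreamingChunkEvent, Response], None]:
    tokens = ["The ", "weather ", "is ", "sunny."]
    for tok in tokens:
        yield ModelClientStreamingChunkEvent(tok)  # emitted as each token arrives
    yield Response("".join(tokens))                # final message carries the full text


async def collect() -> tuple:
    chunks, final = [], None
    async for ev in fake_stream():
        if isinstance(ev, ModelClientStreamingChunkEvent):
            chunks.append(ev.content)
        elif isinstance(ev, Response):
            final = ev.content
    return "".join(chunks), final
```

The joined chunks and the final message content should be identical, which makes the final `Response` safe to display even if chunk handling is skipped.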
Usage
This method is called internally when the agent participates in a team or when `run()`/`run_stream()` is invoked. Consume the async generator to receive real-time events during tool execution. Use this for building UIs that display tool call progress, or for debugging agent behavior by inspecting the yielded events.
Code Reference
Source Location
- Repository: Microsoft AutoGen
- File: `python/packages/autogen-agentchat/src/autogen_agentchat/agents/_assistant_agent.py` (lines 901-1012 for `on_messages_stream`, lines 1118-1315 for `_process_model_result`)
Signature
```python
class AssistantAgent:
    async def on_messages_stream(
        self,
        messages: Sequence[BaseChatMessage],
        cancellation_token: CancellationToken,
    ) -> AsyncGenerator[Union[BaseAgentEvent, BaseChatMessage, Response], None]:
        ...
```
Import
```python
from autogen_agentchat.agents import AssistantAgent
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| messages | Sequence[BaseChatMessage] | Yes | The incoming messages to process. Typically includes user messages, handoff messages, or other chat messages that form the conversation context. |
| cancellation_token | CancellationToken | Yes | A token that can be used to cancel the operation. Passed through to tool executions and LLM calls for cooperative cancellation. |
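Cooperative cancellation can be illustrated with a minimal stand-in token (a hypothetical class loosely modeled on `autogen_core.CancellationToken`, not the real implementation): cancelling the token propagates to any in-flight work linked to it.

```python
# Minimal illustration of cooperative cancellation. SimpleCancellationToken
# is a hypothetical stand-in, not autogen_core.CancellationToken.
import asyncio


class SimpleCancellationToken:
    def __init__(self) -> None:
        self._tasks: list = []

    def link_future(self, task: "asyncio.Task") -> None:
        # Register a task so cancel() can propagate to it.
        self._tasks.append(task)

    def cancel(self) -> None:
        for t in self._tasks:
            t.cancel()


async def slow_tool() -> str:
    await asyncio.sleep(10)  # pretend this is a long-running tool call
    return "never reached"


async def demo_cancel() -> str:
    token = SimpleCancellationToken()
    task = asyncio.ensure_future(slow_tool())
    token.link_future(task)
    token.cancel()  # caller aborts; cancellation propagates to the linked task
    try:
        await task
    except asyncio.CancelledError:
        return "cancelled"
    return "completed"
```

This is why the token is threaded through to both tool executions and LLM calls: any of them can be interrupted from the outside without polling.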
Outputs (Yielded)
| Name | Type | Description |
|---|---|---|
| ToolCallRequestEvent | BaseAgentEvent | Emitted when the LLM requests one or more tool calls. Contains the list of FunctionCall objects with function names and arguments. |
| ToolCallExecutionEvent | BaseAgentEvent | Emitted after tool calls are executed. Contains the list of FunctionExecutionResult objects with results. |
| ThoughtEvent | BaseAgentEvent | Emitted when the LLM produces a hidden thought (for reasoning models). Contains the thought text. |
| ModelClientStreamingChunkEvent | BaseAgentEvent | Emitted during streaming LLM calls as tokens are generated. |
| Sub-agent events | BaseAgentEvent or BaseChatMessage | Events emitted by sub-agent tools (AgentTool/TeamTool) during their execution. |
| Response | Response | The final yielded item, containing the chat message (TextMessage, ToolCallSummaryMessage, or HandoffMessage) and the full list of inner messages (audit trail). |
Usage Examples
Basic Example
```python
import asyncio

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.base import Response
from autogen_agentchat.messages import TextMessage, ToolCallRequestEvent, ToolCallExecutionEvent
from autogen_core import CancellationToken
from autogen_ext.models.openai import OpenAIChatCompletionClient


async def get_weather(city: str) -> str:
    """Get the current weather."""
    return f"Sunny, 72F in {city}"


async def main():
    model_client = OpenAIChatCompletionClient(model="gpt-4o")
    agent = AssistantAgent(
        name="weather_agent",
        model_client=model_client,
        tools=[get_weather],
        reflect_on_tool_use=True,
    )
    messages = [TextMessage(content="What is the weather in Seattle?", source="user")]
    cancellation_token = CancellationToken()
    async for event in agent.on_messages_stream(messages, cancellation_token):
        if isinstance(event, ToolCallRequestEvent):
            print(f"Tool call requested: {event.content}")
        elif isinstance(event, ToolCallExecutionEvent):
            print(f"Tool result: {event.content}")
        elif isinstance(event, Response):
            print(f"Final response: {event.chat_message.content}")
            print(f"Inner messages count: {len(event.inner_messages or [])}")


asyncio.run(main())
```
Multi-Iteration Loop Example
```python
import asyncio

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.base import Response
from autogen_agentchat.messages import TextMessage, ToolCallRequestEvent
from autogen_core import CancellationToken
from autogen_ext.models.openai import OpenAIChatCompletionClient


async def search(query: str) -> str:
    """Search for information."""
    return f"Found results for: {query}"


async def analyze(data: str) -> str:
    """Analyze search results."""
    return f"Analysis of: {data}"


async def main():
    model_client = OpenAIChatCompletionClient(model="gpt-4o")
    agent = AssistantAgent(
        name="research_agent",
        model_client=model_client,
        tools=[search, analyze],
        max_tool_iterations=3,
        reflect_on_tool_use=True,
    )
    messages = [TextMessage(content="Research and analyze AI trends", source="user")]
    iteration_count = 0
    async for event in agent.on_messages_stream(messages, CancellationToken()):
        if isinstance(event, ToolCallRequestEvent):
            iteration_count += 1
            print(f"Iteration {iteration_count}: {[c.name for c in event.content]}")
        elif isinstance(event, Response):
            print(f"Completed after {iteration_count} tool iterations")
            print(f"Final: {event.chat_message.content}")


asyncio.run(main())
```