Implementation:Microsoft Autogen ChatAgent Protocol
| Key | Value |
|---|---|
| id | Microsoft_Autogen_ChatAgent_Protocol |
| source | Microsoft_Autogen |
| category | Protocol |
Overview
Description
The ChatAgent protocol defines the abstract interface that all chat agents must implement in the Autogen framework. It serves as the fundamental contract for any agent participating in chat-based conversations, whether individual agents or teams.
This protocol extends ABC (Abstract Base Class), TaskRunner, and ComponentBase[BaseModel], establishing a single interface for agent lifecycle management, message handling, state persistence, and component serialization.
Key responsibilities defined by the protocol:
- Message Handling: Processing incoming messages and generating responses
- Streaming Support: Providing streaming responses with intermediate events
- State Management: Saving and loading agent state for persistence
- Lifecycle Management: Handling reset, pause, resume, and close operations
- Metadata: Exposing agent name, description, and produced message types
- Resource Management: Proper cleanup of agent resources
Usage
The ChatAgent protocol is used as the base interface for implementing custom agents. All agent implementations in Autogen (AssistantAgent, UserProxyAgent, etc.) must implement this protocol to participate in teams and conversations.
The protocol enforces:
- Consistent agent behavior across different implementations
- Support for both blocking and streaming message processing
- State serialization for checkpoint/restore scenarios
- Proper resource lifecycle management
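The consistency guarantee comes from ChatAgent deriving from ABC and marking its members with @abstractmethod: Python refuses to instantiate a subclass that leaves any of them unimplemented. The sketch below models that mechanism with the standard library only. `MiniChatAgent`, `IncompleteAgent`, `CompleteAgent`, and `try_instantiate` are hypothetical stand-ins for illustration, not Autogen classes.

```python
from abc import ABC, abstractmethod


class MiniChatAgent(ABC):
    """Hypothetical stand-in for ChatAgent with two abstract members."""

    @property
    @abstractmethod
    def name(self) -> str: ...

    @abstractmethod
    async def close(self) -> None: ...


class IncompleteAgent(MiniChatAgent):
    """Implements `name` but forgets `close`."""

    @property
    def name(self) -> str:
        return "incomplete"


class CompleteAgent(MiniChatAgent):
    """Implements every abstract member, so it can be instantiated."""

    @property
    def name(self) -> str:
        return "complete"

    async def close(self) -> None:
        pass


def try_instantiate(cls: type) -> str:
    """Return 'ok' if cls() succeeds, otherwise the exception type name."""
    try:
        cls()
    except TypeError as exc:
        return type(exc).__name__
    return "ok"
```

Calling `try_instantiate(IncompleteAgent)` reports a TypeError, while `try_instantiate(CompleteAgent)` succeeds. The same check applies to any subclass of the real protocol, so a missing method surfaces at construction time rather than in the middle of a conversation.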
Code Reference
Source Location
- Repository: https://github.com/microsoft/autogen
- File Path: python/packages/autogen-agentchat/src/autogen_agentchat/base/_chat_agent.py
- Lines: 1-95
Signature
class ChatAgent(ABC, TaskRunner, ComponentBase[BaseModel]):
    """Protocol for a chat agent."""

    component_type = "agent"

    @property
    @abstractmethod
    def name(self) -> str: ...

    @property
    @abstractmethod
    def description(self) -> str: ...

    @property
    @abstractmethod
    def produced_message_types(self) -> Sequence[type[BaseChatMessage]]: ...

    @abstractmethod
    async def on_messages(
        self, messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken
    ) -> Response: ...

    @abstractmethod
    def on_messages_stream(
        self, messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken
    ) -> AsyncGenerator[BaseAgentEvent | BaseChatMessage | Response, None]: ...

    @abstractmethod
    async def on_reset(self, cancellation_token: CancellationToken) -> None: ...

    @abstractmethod
    async def on_pause(self, cancellation_token: CancellationToken) -> None: ...

    @abstractmethod
    async def on_resume(self, cancellation_token: CancellationToken) -> None: ...

    @abstractmethod
    async def save_state(self) -> Mapping[str, Any]: ...

    @abstractmethod
    async def load_state(self, state: Mapping[str, Any]) -> None: ...

    @abstractmethod
    async def close(self) -> None: ...
Import
from autogen_agentchat.base import ChatAgent, Response
I/O Contract
Required Properties
| Property | Type | Description |
|---|---|---|
| name | str | Unique identifier for the agent within a team |
| description | str | Description of agent capabilities and interaction patterns |
| produced_message_types | Sequence[type[BaseChatMessage]] | Types of messages the agent can produce in Response.chat_message |
Required Methods
| Method | Parameters | Returns | Description |
|---|---|---|---|
| on_messages | messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken | Response | Process incoming messages and return a response (blocking) |
| on_messages_stream | messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken | AsyncGenerator[BaseAgentEvent \| BaseChatMessage \| Response, None] | Process messages and stream intermediate events; the final item is the Response |
| on_reset | cancellation_token: CancellationToken | None | Reset agent to initialization state |
| on_pause | cancellation_token: CancellationToken | None | Pause agent operations (may be called during on_messages) |
| on_resume | cancellation_token: CancellationToken | None | Resume paused agent operations |
| save_state | None | Mapping[str, Any] | Save current agent state for later restoration |
| load_state | state: Mapping[str, Any] | None | Restore agent from saved state |
| close | None | None | Release any resources held by the agent |
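The on_messages_stream row implies an ordering contract: zero or more intermediate events or messages, followed by exactly one final Response. The sketch below shows one way a consumer might separate the two and detect a stream that violates the ordering. It uses only the standard library. `FakeEvent`, `FakeResponse`, `fake_stream`, and `collect` are hypothetical names standing in for the Autogen types, not part of the library.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncGenerator, List, Optional, Tuple, Union


@dataclass
class FakeEvent:
    """Hypothetical stand-in for an intermediate event or message."""
    text: str


@dataclass
class FakeResponse:
    """Hypothetical stand-in for Response."""
    text: str


StreamItem = Union[FakeEvent, FakeResponse]


async def fake_stream() -> AsyncGenerator[StreamItem, None]:
    """Mimics on_messages_stream: events first, the response last."""
    yield FakeEvent("thinking")
    yield FakeEvent("calling tool")
    yield FakeResponse("done")


async def collect(
    stream: AsyncGenerator[StreamItem, None],
) -> Tuple[List[FakeEvent], FakeResponse]:
    """Split a stream into its intermediate events and its final response."""
    events: List[FakeEvent] = []
    response: Optional[FakeResponse] = None
    async for item in stream:
        if response is not None:
            raise RuntimeError("stream yielded an item after the final response")
        if isinstance(item, FakeResponse):
            response = item
        else:
            events.append(item)
    if response is None:
        raise RuntimeError("stream ended without a final response")
    return events, response


events, response = asyncio.run(collect(fake_stream()))
```

With a real agent, the isinstance check would target Response and the generator would be `agent.on_messages_stream(messages, cancellation_token)`.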
Response Dataclass
| Field | Type | Description |
|---|---|---|
| chat_message | SerializeAsAny[BaseChatMessage] | The main chat message produced by the agent |
| inner_messages | Sequence[SerializeAsAny[BaseAgentEvent \| BaseChatMessage]] \| None | Optional inner messages (events or messages) produced during processing |
Usage Examples
Implementing a Custom Agent
from typing import Any, AsyncGenerator, Mapping, Sequence

from autogen_core import CancellationToken
from autogen_agentchat.base import ChatAgent, Response
from autogen_agentchat.messages import BaseAgentEvent, BaseChatMessage, TextMessage


class CustomAgent(ChatAgent):
    def __init__(self, name: str, description: str):
        self._name = name
        self._description = description
        self._state = {}

    @property
    def name(self) -> str:
        return self._name

    @property
    def description(self) -> str:
        return self._description

    @property
    def produced_message_types(self) -> Sequence[type[BaseChatMessage]]:
        return (TextMessage,)

    async def on_messages(
        self, messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken
    ) -> Response:
        # Process messages and generate response
        response_text = f"Processed {len(messages)} messages"
        return Response(chat_message=TextMessage(content=response_text, source=self.name))

    async def on_messages_stream(
        self, messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken
    ) -> AsyncGenerator[BaseAgentEvent | BaseChatMessage | Response, None]:
        # Yield intermediate events/messages, then final Response
        response = await self.on_messages(messages, cancellation_token)
        yield response

    async def on_reset(self, cancellation_token: CancellationToken) -> None:
        self._state = {}

    async def on_pause(self, cancellation_token: CancellationToken) -> None:
        pass  # Implement pause logic

    async def on_resume(self, cancellation_token: CancellationToken) -> None:
        pass  # Implement resume logic

    async def save_state(self) -> Mapping[str, Any]:
        return {"state": self._state}

    async def load_state(self, state: Mapping[str, Any]) -> None:
        self._state = state.get("state", {})

    async def close(self) -> None:
        pass  # Cleanup resources
Using an Agent
import asyncio

from autogen_core import CancellationToken
from autogen_core.models import ChatCompletionClient
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.base import Response
from autogen_agentchat.messages import TextMessage


async def use_agent(model_client: ChatCompletionClient) -> None:
    # The caller supplies an already configured model client
    agent = AssistantAgent("assistant", model_client=model_client)

    # Process messages
    messages = [TextMessage(content="Hello", source="user")]
    response = await agent.on_messages(messages, CancellationToken())
    print(f"Response: {response.chat_message.content}")

    # Stream messages
    async for event in agent.on_messages_stream(messages, CancellationToken()):
        if isinstance(event, Response):
            print(f"Final response: {event.chat_message.content}")
        else:
            print(f"Event: {event}")

    # Save and restore state
    state = await agent.save_state()
    await agent.on_reset(CancellationToken())
    await agent.load_state(state)

    # Cleanup
    await agent.close()


# Run with: asyncio.run(use_agent(model_client))
State Management
from autogen_core import CancellationToken
from autogen_agentchat.base import ChatAgent
from autogen_agentchat.messages import TextMessage


async def checkpoint_restore_example(agent: ChatAgent):
    # Save checkpoint before processing
    checkpoint = await agent.save_state()
    try:
        # Process messages
        messages = [TextMessage(content="Task", source="user")]
        response = await agent.on_messages(messages, CancellationToken())
    except Exception:
        # Restore from checkpoint on error, then re-raise for the caller
        await agent.load_state(checkpoint)
        raise
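The checkpoint above lives only in memory. To survive a process restart, the mapping returned by save_state has to be written somewhere durable. The following is a minimal sketch, assuming the state contains only JSON-serializable values (the protocol itself promises no more than Mapping[str, Any], so this is an assumption to verify per agent). `write_checkpoint` and `read_checkpoint` are hypothetical helper names, not Autogen functions.

```python
import json
import os
import tempfile
from pathlib import Path
from typing import Any, Dict, Mapping


def write_checkpoint(state: Mapping[str, Any], path: Path) -> None:
    """Write state to path atomically: write a temp file, then rename it."""
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as handle:
            json.dump(dict(state), handle)
        os.replace(tmp_name, path)
    except BaseException:
        # Do not leave a partial temp file behind if anything goes wrong.
        if os.path.exists(tmp_name):
            os.remove(tmp_name)
        raise


def read_checkpoint(path: Path) -> Dict[str, Any]:
    """Load a previously written checkpoint back into a dict."""
    with path.open("r", encoding="utf-8") as handle:
        return json.load(handle)


# Round trip with a sample state shaped like a save_state() result.
with tempfile.TemporaryDirectory() as folder:
    target = Path(folder) / "agent_state.json"
    write_checkpoint({"state": {"turns": 3}}, target)
    restored = read_checkpoint(target)
```

In an agent workflow this would look like `write_checkpoint(await agent.save_state(), path)` before a risky step and `await agent.load_state(read_checkpoint(path))` after a restart. Writing to a temporary file and renaming it keeps a crash mid-write from corrupting the previous checkpoint.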
Lifecycle Management
from autogen_core import CancellationToken
from autogen_agentchat.base import ChatAgent
from autogen_agentchat.messages import TextMessage


async def agent_lifecycle(agent: ChatAgent):
    try:
        # Use agent
        messages = [TextMessage(content="Query", source="user")]
        response = await agent.on_messages(messages, CancellationToken())

        # Pause if needed
        await agent.on_pause(CancellationToken())

        # Resume later
        await agent.on_resume(CancellationToken())

        # Reset to initial state
        await agent.on_reset(CancellationToken())
    finally:
        # Always cleanup
        await agent.close()
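The try/finally pattern can be packaged as an async context manager so that callers cannot forget the cleanup step. The sketch below is one possible shape, written against a standard-library stand-in. `FakeAgent`, `closing_agent`, and `demo` are hypothetical names, and the wrapper is an illustration rather than a helper that Autogen provides.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator, List


class FakeAgent:
    """Hypothetical stand-in exposing only the close() coroutine."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    async def close(self) -> None:
        self.calls.append("close")


@asynccontextmanager
async def closing_agent(agent: FakeAgent) -> AsyncIterator[FakeAgent]:
    """Yield the agent and call close() on exit, even if the body raises."""
    try:
        yield agent
    finally:
        await agent.close()


async def demo() -> List[str]:
    agent = FakeAgent()
    try:
        async with closing_agent(agent) as active:
            active.calls.append("work")
            raise ValueError("simulated failure")
    except ValueError:
        pass  # The failure is expected; close() must still have run.
    return agent.calls


calls = asyncio.run(demo())
```

Here `calls` ends up recording the work step followed by the close step, showing that cleanup runs even when the body raises. Any object with an awaitable close(), including a ChatAgent implementation, could be passed in place of the stand-in.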
Related Pages
- Microsoft_Autogen_UserProxyAgent - Implementation of ChatAgent for human input
- Microsoft_Autogen_AssistantAgent - Implementation of ChatAgent using LLM
- Microsoft_Autogen_Response - Response dataclass returned by on_messages
- Microsoft_Autogen_BaseChatMessage - Base class for chat messages
- Microsoft_Autogen_BaseAgentEvent - Base class for agent events
- Microsoft_Autogen_TaskRunner - Base protocol for task execution
- Microsoft_Autogen_ComponentBase - Base class for serializable components
- Microsoft_Autogen_CancellationToken - Token for cancellation support