Implementation:Microsoft Autogen ChatAgent Protocol

From Leeroopedia
Key Value
id Microsoft_Autogen_ChatAgent_Protocol
source Microsoft_Autogen
category Protocol

Overview

Description

The ChatAgent protocol defines the abstract interface that all chat agents must implement in the Autogen framework. It serves as the fundamental contract for any agent participating in chat-based conversations, whether individual agents or teams.

This protocol extends ABC (Abstract Base Class), TaskRunner, and ComponentBase[BaseModel]. Together these establish the interface for agent lifecycle management, message handling, state persistence, and component serialization.

Key responsibilities defined by the protocol:

  • Message Handling: Processing incoming messages and generating responses
  • Streaming Support: Providing streaming responses with intermediate events
  • State Management: Saving and loading agent state for persistence
  • Lifecycle Management: Handling reset, pause, resume, and close operations
  • Metadata: Exposing agent name, description, and produced message types
  • Resource Management: Proper cleanup of agent resources
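
Because the responsibilities above are declared as abstract members, a subclass that omits any of them cannot be instantiated. The sketch below illustrates that mechanism with the standard library only. MiniChatAgent, Incomplete, Complete, and try_instantiate are hypothetical stand-ins, not part of Autogen, and the stand-in declares just two of the protocol's members.

```python
from abc import ABC, abstractmethod


class MiniChatAgent(ABC):
    """Hypothetical stand-in for ChatAgent with only two abstract members."""

    @property
    @abstractmethod
    def name(self) -> str: ...

    @abstractmethod
    async def close(self) -> None: ...


class Incomplete(MiniChatAgent):
    """Implements name but forgets close()."""

    @property
    def name(self) -> str:
        return "incomplete"


class Complete(Incomplete):
    """Adds the missing method, so instantiation succeeds."""

    async def close(self) -> None:
        return None


def try_instantiate(cls: type) -> str:
    """Return 'ok' if cls() succeeds, otherwise the TypeError message."""
    try:
        cls()
        return "ok"
    except TypeError as exc:
        return str(exc)


incomplete_result = try_instantiate(Incomplete)  # message names the missing method
complete_result = try_instantiate(Complete)
```

The same check applies to the real protocol: Python raises TypeError at construction time, not when the missing method is first called.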

Usage

The ChatAgent protocol is used as the base interface for implementing custom agents. All agent implementations in Autogen (AssistantAgent, UserProxyAgent, etc.) must implement this protocol to participate in teams and conversations.

The protocol enforces:

  • Consistent agent behavior across different implementations
  • Support for both non-streaming (on_messages) and streaming (on_messages_stream) message processing
  • State serialization for checkpoint/restore scenarios
  • Proper resource lifecycle management

Code Reference

Source Location

Signature

class ChatAgent(ABC, TaskRunner, ComponentBase[BaseModel]):
    """Protocol for a chat agent."""

    component_type = "agent"

    @property
    @abstractmethod
    def name(self) -> str: ...

    @property
    @abstractmethod
    def description(self) -> str: ...

    @property
    @abstractmethod
    def produced_message_types(self) -> Sequence[type[BaseChatMessage]]: ...

    @abstractmethod
    async def on_messages(
        self, messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken
    ) -> Response: ...

    @abstractmethod
    def on_messages_stream(
        self, messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken
    ) -> AsyncGenerator[BaseAgentEvent | BaseChatMessage | Response, None]: ...

    @abstractmethod
    async def on_reset(self, cancellation_token: CancellationToken) -> None: ...

    @abstractmethod
    async def on_pause(self, cancellation_token: CancellationToken) -> None: ...

    @abstractmethod
    async def on_resume(self, cancellation_token: CancellationToken) -> None: ...

    @abstractmethod
    async def save_state(self) -> Mapping[str, Any]: ...

    @abstractmethod
    async def load_state(self, state: Mapping[str, Any]) -> None: ...

    @abstractmethod
    async def close(self) -> None: ...

Import

from autogen_agentchat.base import ChatAgent, Response

I/O Contract

Required Properties

Property Type Description
name str Unique identifier for the agent within a team
description str Description of agent capabilities and interaction patterns
produced_message_types Sequence[type[BaseChatMessage]] Types of messages the agent can produce in Response.chat_message

Required Methods

Method Parameters Returns Description
on_messages messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken Response Process incoming messages and return a single final Response (non-streaming)
on_messages_stream messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken AsyncGenerator[BaseAgentEvent | BaseChatMessage | Response, None] Process messages and stream intermediate events; the final item is the Response
on_reset cancellation_token: CancellationToken None Reset agent to initialization state
on_pause cancellation_token: CancellationToken None Pause agent operations (may be called during on_messages)
on_resume cancellation_token: CancellationToken None Resume paused agent operations
save_state None Mapping[str, Any] Save current agent state for later restoration
load_state state: Mapping[str, Any] None Restore agent from saved state
close None None Release any resources held by the agent
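
The on_messages_stream contract (intermediate items first, the Response last) suggests a small helper that separates the two. The following is a minimal sketch using only the standard library. FakeResponse, fake_stream, and collect are hypothetical names introduced for illustration; with a real agent, the isinstance check would presumably target Response, and the stream would come from agent.on_messages_stream(...).

```python
import asyncio
from dataclasses import dataclass
from typing import Any, AsyncGenerator, List, Tuple


@dataclass
class FakeResponse:
    """Hypothetical stand-in for Response; only the text matters here."""

    text: str


async def fake_stream() -> AsyncGenerator[Any, None]:
    """Stand-in for a stream: intermediate events first, response last."""
    yield "event: tool call requested"
    yield "event: tool call finished"
    yield FakeResponse(text="done")


async def collect(
    stream: AsyncGenerator[Any, None],
) -> Tuple[List[Any], FakeResponse]:
    """Split a stream into (intermediate items, final response)."""
    events: List[Any] = []
    final = None
    async for item in stream:
        if isinstance(item, FakeResponse):
            final = item
        else:
            events.append(item)
    if final is None:
        # A stream that never yields a response violates the contract
        raise RuntimeError("stream ended without a final response")
    return events, final


events, final = asyncio.run(collect(fake_stream()))
```

Raising when no final response arrives turns a contract violation into a visible error instead of returning None to the caller.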

Response Dataclass

Field Type Description
chat_message SerializeAsAny[BaseChatMessage] The main chat message produced by the agent
inner_messages Sequence[SerializeAsAny[BaseAgentEvent | BaseChatMessage]] | None Optional inner messages (events or messages) produced during processing

Usage Examples

Implementing a Custom Agent

from typing import Any, AsyncGenerator, Mapping, Sequence
from autogen_core import CancellationToken
from autogen_agentchat.base import ChatAgent, Response
from autogen_agentchat.messages import BaseAgentEvent, BaseChatMessage, TextMessage


class CustomAgent(ChatAgent):
    """Minimal agent that reports how many messages it received."""

    def __init__(self, name: str, description: str) -> None:
        self._name = name
        self._description = description
        self._state: dict[str, Any] = {}

    @property
    def name(self) -> str:
        return self._name

    @property
    def description(self) -> str:
        return self._description

    @property
    def produced_message_types(self) -> Sequence[type[BaseChatMessage]]:
        return (TextMessage,)

    async def on_messages(
        self, messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken
    ) -> Response:
        # Process messages and generate response
        response_text = f"Processed {len(messages)} messages"
        return Response(chat_message=TextMessage(content=response_text, source=self.name))

    async def on_messages_stream(
        self, messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken
    ) -> AsyncGenerator[BaseAgentEvent | BaseChatMessage | Response, None]:
        # This agent has no intermediate events, so it yields only the final Response
        response = await self.on_messages(messages, cancellation_token)
        yield response

    async def on_reset(self, cancellation_token: CancellationToken) -> None:
        self._state = {}

    async def on_pause(self, cancellation_token: CancellationToken) -> None:
        pass  # Implement pause logic

    async def on_resume(self, cancellation_token: CancellationToken) -> None:
        pass  # Implement resume logic

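    async def save_state(self) -> Mapping[str, Any]:
        # Return a copy so later mutations do not alter a saved checkpoint
        return {"state": dict(self._state)}

    async def load_state(self, state: Mapping[str, Any]) -> None:
        # Copy on load for the same reason
        self._state = dict(state.get("state", {}))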

    async def close(self) -> None:
        pass  # Cleanup resources

Using an Agent

import asyncio
from autogen_core import CancellationToken
from autogen_core.models import ChatCompletionClient
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.base import Response
from autogen_agentchat.messages import TextMessage


async def use_agent(model_client: ChatCompletionClient) -> None:
    # model_client is any configured chat-completion client supplied by the caller
    agent = AssistantAgent("assistant", model_client=model_client)

    # Process messages
    messages = [TextMessage(content="Hello", source="user")]
    response = await agent.on_messages(messages, CancellationToken())

    # to_text() is defined on every chat message type, not only TextMessage
    print(f"Response: {response.chat_message.to_text()}")

    # Stream messages
    async for event in agent.on_messages_stream(messages, CancellationToken()):
        if isinstance(event, Response):
            print(f"Final response: {event.chat_message.to_text()}")
        else:
            print(f"Event: {event}")

    # Save and restore state
    state = await agent.save_state()
    await agent.on_reset(CancellationToken())
    await agent.load_state(state)

    # Cleanup
    await agent.close()


# Run with: asyncio.run(use_agent(model_client))

State Management

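from autogen_core import CancellationToken
from autogen_agentchat.base import ChatAgent, Response
from autogen_agentchat.messages import TextMessage


async def checkpoint_restore_example(agent: ChatAgent) -> Response:
    # Save checkpoint before processing
    checkpoint = await agent.save_state()

    try:
        # Process messages
        messages = [TextMessage(content="Task", source="user")]
        response = await agent.on_messages(messages, CancellationToken())
    except Exception:
        # Restore from checkpoint on error, then re-raise for the caller
        await agent.load_state(checkpoint)
        raise
    return response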
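
Since save_state returns a Mapping[str, Any], a checkpoint can also be written to disk and reloaded into a fresh agent instance. The sketch below shows that round trip with the standard library, assuming the state is JSON-serializable, which the protocol itself does not guarantee. CounterAgent, roundtrip, and the file name agent_state.json are hypothetical and stand in for a real agent and storage location.

```python
import asyncio
import json
import tempfile
from pathlib import Path
from typing import Any, Mapping


class CounterAgent:
    """Hypothetical stand-in exposing only save_state/load_state."""

    def __init__(self) -> None:
        self.turns = 0

    async def save_state(self) -> Mapping[str, Any]:
        return {"turns": self.turns}

    async def load_state(self, state: Mapping[str, Any]) -> None:
        self.turns = int(state.get("turns", 0))


async def roundtrip(path: Path) -> int:
    """Save one agent's state to a JSON file and load it into another."""
    first = CounterAgent()
    first.turns = 3
    path.write_text(json.dumps(await first.save_state()))

    second = CounterAgent()  # e.g. a new process after a restart
    await second.load_state(json.loads(path.read_text()))
    return second.turns


with tempfile.TemporaryDirectory() as tmp:
    restored_turns = asyncio.run(roundtrip(Path(tmp) / "agent_state.json"))
```

If an agent's state contains values that json cannot encode, a different serializer would be needed; that choice is outside what the protocol specifies.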

Lifecycle Management

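from autogen_core import CancellationToken
from autogen_agentchat.base import ChatAgent
from autogen_agentchat.messages import TextMessage


async def agent_lifecycle(agent: ChatAgent):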
    try:
        # Use agent
        messages = [TextMessage(content="Query", source="user")]
        response = await agent.on_messages(messages, CancellationToken())

        # Pause if needed
        await agent.on_pause(CancellationToken())

        # Resume later
        await agent.on_resume(CancellationToken())

        # Reset to initial state
        await agent.on_reset(CancellationToken())
    finally:
        # Always cleanup
        await agent.close()
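
The try/finally pattern above can be packaged as an async context manager so that close() runs even when message handling raises. This is a sketch under stated assumptions: closing_agent and TrackedAgent are hypothetical helpers, not part of Autogen, and the stand-in agent only records which lifecycle calls were made.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator, List


class TrackedAgent:
    """Hypothetical stand-in that records lifecycle calls."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    async def on_messages(self) -> str:
        self.calls.append("on_messages")
        raise RuntimeError("simulated failure")

    async def close(self) -> None:
        self.calls.append("close")


@asynccontextmanager
async def closing_agent(agent: TrackedAgent) -> AsyncIterator[TrackedAgent]:
    """Yield the agent and always await close() on exit."""
    try:
        yield agent
    finally:
        await agent.close()


async def main() -> List[str]:
    agent = TrackedAgent()
    try:
        async with closing_agent(agent) as active:
            await active.on_messages()
    except RuntimeError:
        pass  # the failure still propagates; close() has already run
    return agent.calls


lifecycle_calls = asyncio.run(main())
```

A custom wrapper is used here because contextlib.aclosing awaits aclose(), whereas the protocol names its cleanup method close().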
