Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:Microsoft Agent framework Executor Base Class

From Leeroopedia
Property Value
Implementation Name Executor Base Class
SDK Microsoft Agent Framework
Repository Microsoft Agent Framework
Source File python/packages/core/agent_framework/_workflows/_executor.py
Line Range L29-169
Import from agent_framework import Executor
Type Base class

Overview

The Executor Base Class is the foundational building block for defining workflow nodes in the Microsoft Agent Framework. The Executor class inherits from RequestInfoMixin and DictConvertible, providing each workflow node with request metadata access and dictionary serialization support. Users subclass Executor, implement one or more @handler methods that receive typed input and a WorkflowContext, and use ctx.send_message() to route results to downstream nodes or ctx.yield_output() to surface results to the workflow caller.

Code Reference

Source Location

Property Value
File python/packages/core/agent_framework/_workflows/_executor.py
Class Executor
Base Classes RequestInfoMixin, DictConvertible
Lines 29-169

Signature

class Executor(RequestInfoMixin, DictConvertible):
    def __init__(self, id: str):
        super().__init__(id=id)

Import Statement

from agent_framework import Executor

I/O Contract

Handler Inputs

Parameter Type Description
self Executor (subclass) The Executor instance. Instance-level state is available across handler invocations within a workflow run.
typed input User-defined (e.g., str, dict, custom class) The message routed to this handler by the workflow engine. The type is determined by the handler method's type annotation and used by the engine for type-based dispatch.
ctx WorkflowContext[T] The workflow execution context. Provides methods for sending messages downstream and yielding output to the workflow caller. The generic parameter T matches the handler's output type.

Handler Outputs

Method Type Description
ctx.send_message(result) Any (typed by downstream handler) Sends a result message to connected downstream Executor nodes in the workflow graph. The message type must match what the downstream handler expects.
ctx.yield_output(result) Any (typed by workflow output) Yields a result as workflow-level output, making it available to the caller of workflow.run(). Use this for terminal nodes that produce final results.
return None Handler methods return None. All output is routed through the context object rather than return values.

Key Methods and Attributes

Member Source Description
id Constructor parameter Unique identifier for this Executor within the workflow graph. Used by the engine for routing and debugging.
@handler Decorator Marks an async method as a message handler. The engine discovers all @handler methods at registration time and routes messages based on the handler's input type annotation.
to_dict() DictConvertible Serializes the Executor's state to a dictionary representation for persistence, transport, or debugging.
from_dict() DictConvertible Reconstructs an Executor instance from a dictionary representation.

Usage Examples

Basic Executor with Single Handler

from agent_framework import Executor, handler, WorkflowContext

class UpperCase(Executor):
    def __init__(self, id: str):
        super().__init__(id=id)

    @handler
    async def to_upper(self, text: str, ctx: WorkflowContext[str]) -> None:
        result = text.upper()
        await ctx.send_message(result)

Executor Yielding Workflow Output

from agent_framework import Executor, handler, WorkflowContext

class FinalFormatter(Executor):
    def __init__(self, id: str):
        super().__init__(id=id)

    @handler
    async def format_output(self, text: str, ctx: WorkflowContext[str]) -> None:
        formatted = f"=== Result ===\n{text}"
        await ctx.yield_output(formatted)

Executor with Stateful Accumulation

from agent_framework import Executor, handler, WorkflowContext

class Counter(Executor):
    def __init__(self, id: str):
        super().__init__(id=id)
        self._count = 0

    @handler
    async def count_message(self, text: str, ctx: WorkflowContext[int]) -> None:
        self._count += 1
        await ctx.send_message(self._count)

Wiring an Executor into a Workflow

from agent_framework import Executor, handler, WorkflowContext
from agent_framework.orchestrations import SequentialBuilder

class Greet(Executor):
    def __init__(self, id: str):
        super().__init__(id=id)

    @handler
    async def greet(self, name: str, ctx: WorkflowContext[str]) -> None:
        await ctx.send_message(f"Hello, {name}!")

class Shout(Executor):
    def __init__(self, id: str):
        super().__init__(id=id)

    @handler
    async def shout(self, text: str, ctx: WorkflowContext[str]) -> None:
        await ctx.yield_output(text.upper())

workflow = SequentialBuilder(participants=[
    Greet(id="greeter"),
    Shout(id="shouter"),
]).build()

result = await workflow.run("World")
# Output: "HELLO, WORLD!"

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment