Implementation:Microsoft Agent framework Executor Base Class
| Property | Value |
|---|---|
| Implementation Name | Executor Base Class |
| SDK | Microsoft Agent Framework |
| Repository | Microsoft Agent Framework |
| Source File | python/packages/core/agent_framework/_workflows/_executor.py |
| Line Range | L29-169 |
| Import | from agent_framework import Executor |
| Type | Base class |
Overview
The Executor Base Class is the foundational building block for defining workflow nodes in the Microsoft Agent Framework. The Executor class inherits from RequestInfoMixin and DictConvertible, providing each workflow node with request metadata access and dictionary serialization support. Users subclass Executor, implement one or more @handler methods that receive typed input and a WorkflowContext, and use ctx.send_message() to route results to downstream nodes or ctx.yield_output() to surface results to the workflow caller.
Code Reference
Source Location
| Property | Value |
|---|---|
| File | python/packages/core/agent_framework/_workflows/_executor.py |
| Class | Executor |
| Base Classes | RequestInfoMixin, DictConvertible |
| Lines | 29-169 |
Signature
class Executor(RequestInfoMixin, DictConvertible):
    def __init__(self, id: str):
        super().__init__(id=id)
Import Statement
from agent_framework import Executor
I/O Contract
Handler Inputs
| Parameter | Type | Description |
|---|---|---|
| self | Executor (subclass) | The Executor instance. Instance-level state is available across handler invocations within a workflow run. |
| typed input | User-defined (e.g., str, dict, custom class) | The message routed to this handler by the workflow engine. The type is determined by the handler method's type annotation and used by the engine for type-based dispatch. |
| ctx | WorkflowContext[T] | The workflow execution context. Provides methods for sending messages downstream and yielding output to the workflow caller. The generic parameter T matches the handler's output type. |
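The type-based dispatch described in the table can be illustrated with a standard-library-only sketch. Everything here is a hypothetical stand-in rather than the framework's code: `MiniExecutor`, `RecordingContext`, the local `handler` decorator, and the `dispatch` method are invented for illustration, and the real engine's routing rules may differ (this sketch matches on the exact message type only).

```python
import asyncio
import inspect
from typing import Any, Callable


def handler(func: Callable) -> Callable:
    """Hypothetical stand-in for @handler: record the input type annotation."""
    params = list(inspect.signature(func).parameters.values())
    # params[0] is self, params[1] is the typed input, params[2] is ctx.
    func._handles = params[1].annotation
    return func


class RecordingContext:
    """Hypothetical context that records messages instead of routing them."""

    def __init__(self) -> None:
        self.sent: list[Any] = []

    async def send_message(self, message: Any) -> None:
        self.sent.append(message)


class MiniExecutor:
    """Hypothetical base class that indexes handlers by their input type."""

    def __init__(self, id: str) -> None:
        self.id = id
        self._handlers: dict[type, Callable] = {}
        for name, member in inspect.getmembers(type(self), inspect.isfunction):
            if hasattr(member, "_handles"):
                self._handlers[member._handles] = getattr(self, name)

    async def dispatch(self, message: Any, ctx: RecordingContext) -> None:
        # Exact-type lookup; subclasses of a handled type are not matched here.
        method = self._handlers.get(type(message))
        if method is None:
            raise TypeError(f"{self.id}: no handler for {type(message).__name__}")
        await method(message, ctx)


class Describe(MiniExecutor):
    @handler
    async def on_text(self, text: str, ctx: RecordingContext) -> None:
        await ctx.send_message(f"text:{text}")

    @handler
    async def on_number(self, number: int, ctx: RecordingContext) -> None:
        await ctx.send_message(f"number:{number}")


def run_demo() -> list[Any]:
    ctx = RecordingContext()
    node = Describe(id="describe")

    async def go() -> None:
        await node.dispatch("hi", ctx)  # routed to on_text
        await node.dispatch(3, ctx)     # routed to on_number

    asyncio.run(go())
    return ctx.sent
```

Calling `run_demo()` sends one string and one integer to the same node, and each lands in the handler whose annotation matches its type.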
Handler Outputs
| Method | Type | Description |
|---|---|---|
| ctx.send_message(result) | Any (typed by downstream handler) | Sends a result message to connected downstream Executor nodes in the workflow graph. The message type must match what the downstream handler expects. |
| ctx.yield_output(result) | Any (typed by workflow output) | Yields a result as workflow-level output, making it available to the caller of workflow.run(). Use this for terminal nodes that produce final results. |
| return | None | Handler methods return None. All output is routed through the context object rather than return values. |
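The difference between the two output channels can be sketched without the framework installed. `SketchContext` and the two free functions below are hypothetical stand-ins, assumed only for illustration: they mimic the shape of `send_message` and `yield_output` but do not reproduce how the real engine delivers messages.

```python
import asyncio
from typing import Any


class SketchContext:
    """Hypothetical context that keeps the two output channels separate."""

    def __init__(self) -> None:
        self.messages: list[Any] = []  # would go to downstream nodes
        self.outputs: list[Any] = []   # would go to the workflow caller

    async def send_message(self, message: Any) -> None:
        self.messages.append(message)

    async def yield_output(self, output: Any) -> None:
        self.outputs.append(output)


async def normalize(text: str, ctx: SketchContext) -> None:
    # Intermediate step: pass the cleaned text downstream.
    await ctx.send_message(text.strip().lower())


async def finish(text: str, ctx: SketchContext) -> None:
    # Terminal step: surface the final result to the caller.
    await ctx.yield_output(f"done: {text}")


async def run_pipeline(raw: str) -> list[Any]:
    ctx = SketchContext()
    returned = await normalize(raw, ctx)
    assert returned is None  # handlers return None; output travels via ctx
    for message in ctx.messages:
        await finish(message, ctx)
    return ctx.outputs


def run_demo() -> list[Any]:
    return asyncio.run(run_pipeline("  Hello  "))
```

The point of the sketch is that a handler's return value carries nothing: only what is passed to the context reaches another node or the caller.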
Key Methods and Attributes
| Member | Source | Description |
|---|---|---|
| id | Constructor parameter | Unique identifier for this Executor within the workflow graph. Used by the engine for routing and debugging. |
| @handler | Decorator | Marks an async method as a message handler. The engine discovers all @handler methods at registration time and routes messages based on the handler's input type annotation. |
| to_dict() | DictConvertible | Serializes the Executor's state to a dictionary representation for persistence, transport, or debugging. |
| from_dict() | DictConvertible | Reconstructs an Executor instance from a dictionary representation. |
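A dictionary round trip of the kind `to_dict()` and `from_dict()` provide might look like the sketch below. The page does not document which keys `DictConvertible` emits, so the `SketchNode` class and the `type`, `id`, and `count` keys are assumptions made purely for illustration.

```python
import json
from typing import Any


class SketchNode:
    """Hypothetical node with a dictionary round trip (not DictConvertible)."""

    def __init__(self, id: str, count: int = 0) -> None:
        self.id = id
        self.count = count

    def to_dict(self) -> dict[str, Any]:
        # Key names are assumed for this sketch.
        return {"type": type(self).__name__, "id": self.id, "count": self.count}

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "SketchNode":
        return cls(id=data["id"], count=data.get("count", 0))


original = SketchNode(id="counter", count=3)

# Serialize to JSON text (e.g. for persistence or transport), then rebuild.
payload = json.dumps(original.to_dict())
restored = SketchNode.from_dict(json.loads(payload))
```

Keeping the dictionary limited to JSON-compatible values is what makes the representation usable for persistence and transport as well as debugging.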
Usage Examples
Basic Executor with Single Handler
from agent_framework import Executor, handler, WorkflowContext


class UpperCase(Executor):
    def __init__(self, id: str):
        super().__init__(id=id)

    @handler
    async def to_upper(self, text: str, ctx: WorkflowContext[str]) -> None:
        result = text.upper()
        await ctx.send_message(result)
Executor Yielding Workflow Output
from agent_framework import Executor, handler, WorkflowContext


class FinalFormatter(Executor):
    def __init__(self, id: str):
        super().__init__(id=id)

    @handler
    async def format_output(self, text: str, ctx: WorkflowContext[str]) -> None:
        formatted = f"=== Result ===\n{text}"
        await ctx.yield_output(formatted)
Executor with Stateful Accumulation
from agent_framework import Executor, handler, WorkflowContext


class Counter(Executor):
    def __init__(self, id: str):
        super().__init__(id=id)
        self._count = 0

    @handler
    async def count_message(self, text: str, ctx: WorkflowContext[int]) -> None:
        self._count += 1
        await ctx.send_message(self._count)
Wiring an Executor into a Workflow
import asyncio

from agent_framework import Executor, handler, WorkflowContext
from agent_framework.orchestrations import SequentialBuilder


class Greet(Executor):
    def __init__(self, id: str):
        super().__init__(id=id)

    @handler
    async def greet(self, name: str, ctx: WorkflowContext[str]) -> None:
        # Forward the greeting to the next participant.
        await ctx.send_message(f"Hello, {name}!")


class Shout(Executor):
    def __init__(self, id: str):
        super().__init__(id=id)

    @handler
    async def shout(self, text: str, ctx: WorkflowContext[str]) -> None:
        # Terminal node: surface the result to the workflow caller.
        await ctx.yield_output(text.upper())


async def main() -> None:
    workflow = SequentialBuilder(participants=[
        Greet(id="greeter"),
        Shout(id="shouter"),
    ]).build()
    # workflow.run() is a coroutine, so it must be awaited inside an async function.
    result = await workflow.run("World")
    # Output: "HELLO, WORLD!"


asyncio.run(main())