Implementation:Microsoft Agent framework Custom Aggregator Callback
Overview
The Custom Aggregator Callback is a user-defined interface for implementing aggregation logic in concurrent multi-agent workflows within the Microsoft Agent framework. When multiple agents run in parallel, the framework collects their individual AgentExecutorResponse objects and passes them to a user-supplied async callback function. The callback synthesizes these responses into a single return value.
This implementation document describes the callback interface, the data structures involved, and provides concrete examples of how to author custom aggregators.
Interface
The aggregator callback is an async function conforming to one of two supported signatures:
# Option 1: Simple callback
async def aggregator(results: list[AgentExecutorResponse]) -> Any:
...
# Option 2: With context access
async def aggregator(results: list[AgentExecutorResponse], ctx: WorkflowContext) -> Any:
...
| Signature | Parameters | When to Use |
|---|---|---|
| Simple callback | results: list[AgentExecutorResponse] |
The aggregation logic depends only on the agent responses. No workflow-level state is needed. |
| Context-aware callback | results: list[AgentExecutorResponse], ctx: WorkflowContext |
The aggregation logic requires access to workflow metadata, configuration values, or shared state stored in the WorkflowContext.
|
The return type is unconstrained (Any). Users are encouraged to annotate a concrete return type for downstream type safety.
Data Structures
AgentExecutorResponse
Each element in the results list is an AgentExecutorResponse object with the following fields:
| Field | Type | Description |
|---|---|---|
.executor_id |
str |
A unique string identifying the agent executor that produced this response. Useful for labeling, filtering, or routing logic within the aggregator. |
.agent_response |
AgentResponse |
The structured response returned by the agent. Contains the primary output (accessible via .text) as well as any additional metadata the agent attached.
|
.full_conversation |
list[Message] |
The complete ordered list of messages exchanged between the workflow engine and the agent during this execution. Includes both system/user prompts and agent replies. |
Field Access Patterns
# Accessing individual response fields
for r in results:
executor_name: str = r.executor_id
output_text: str = r.agent_response.text
messages: list[Message] = r.full_conversation
Implementation Examples
Basic Labeled Summary
Concatenates each agent's output into a labeled, newline-separated summary string:
async def custom_aggregator(results: list[AgentExecutorResponse]) -> str:
summary_parts = []
for r in results:
agent_name = r.executor_id
response_text = r.agent_response.text
summary_parts.append(f"[{agent_name}]: {response_text}")
return "\n\n".join(summary_parts)
Majority-Vote Aggregator
Selects the most common response text across all agents (useful for consensus-based workflows):
from collections import Counter
async def majority_vote_aggregator(results: list[AgentExecutorResponse]) -> str:
votes = Counter(r.agent_response.text.strip() for r in results)
winner, count = votes.most_common(1)[0]
return winner
Context-Aware Weighted Aggregator
Uses the WorkflowContext to retrieve per-agent weight configuration and computes a weighted selection:
async def weighted_aggregator(
results: list[AgentExecutorResponse],
ctx: WorkflowContext,
) -> str:
weights = ctx.config.get("agent_weights", {})
scored = []
for r in results:
weight = weights.get(r.executor_id, 1.0)
scored.append((weight, r.agent_response.text))
scored.sort(key=lambda pair: pair[0], reverse=True)
return scored[0][1]
Structured Output Aggregator
Returns a dictionary mapping each executor to its output, preserving the full structure for downstream processing:
async def structured_aggregator(
results: list[AgentExecutorResponse],
) -> dict[str, dict]:
return {
r.executor_id: {
"text": r.agent_response.text,
"message_count": len(r.full_conversation),
}
for r in results
}
Error Handling
Aggregator callbacks should account for the possibility that individual agents may have failed or returned incomplete data:
async def safe_aggregator(results: list[AgentExecutorResponse]) -> str:
parts = []
for r in results:
try:
text = r.agent_response.text
if text:
parts.append(f"[{r.executor_id}]: {text}")
except AttributeError:
parts.append(f"[{r.executor_id}]: <no response>")
return "\n\n".join(parts) if parts else "No agent produced a valid response."
Integration Notes
| Aspect | Detail |
|---|---|
| Async requirement | All aggregator callbacks must be async functions (async def). The framework awaits the callback within its concurrent execution pipeline.
|
| Signature detection | The framework inspects the callback's signature at registration time to determine whether to pass the WorkflowContext as a second argument.
|
| Return type | Unrestricted. The aggregator may return strings, dictionaries, typed dataclass instances, or any other Python object. |
| Execution order | The order of AgentExecutorResponse objects in the results list is not guaranteed and may vary between runs. Do not rely on positional ordering.
|
| Thread safety | The callback runs in the async event loop. Avoid blocking I/O; use await for any asynchronous operations within the aggregator.
|
Related
- Principle:Microsoft_Agent_framework_Custom_Aggregation_Pattern -- The principle-level pattern describing the Custom Aggregation Pattern.