Principle:Microsoft Agent framework Workflow Result Processing
| Property | Value |
|---|---|
| Principle Name | Workflow Result Processing |
| SDK | Microsoft Agent Framework |
| Repository | Microsoft Agent Framework |
| Source Reference | python/packages/core/agent_framework/_workflows/_workflow.py:L40-96 |
| Import | Returned from workflow.run() |
| Domains | Agent_Architecture, Workflow_Engine |
Overview
Workflow Result Processing is a pattern for extracting outputs from completed workflow executions in the Microsoft Agent Framework. The WorkflowRunResult.get_outputs() method collects all data yielded via ctx.yield_output() during workflow execution, providing a unified interface for retrieving the artifacts produced by a workflow run. The default aggregator produces list[Message], while custom aggregators can produce any type.
Description
When a workflow completes execution, its results are encapsulated in a WorkflowRunResult object. This object stores the complete sequence of WorkflowEvent instances emitted during the run and provides accessor methods to extract meaningful information from them.
The central method is get_outputs(), which scans the event list for all outputs that were yielded by workflow steps via ctx.yield_output(). This decouples the production of outputs (which happens incrementally during workflow execution) from the consumption of outputs (which happens after the workflow completes).
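The scan described above can be sketched with plain Python stand-ins. SketchEvent and SketchRunResult are hypothetical names invented for this illustration; they are not the framework's WorkflowEvent or WorkflowRunResult, and the "kind" field is an assumption about how output events might be told apart from other events.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class SketchEvent:
    """Hypothetical stand-in for a workflow event (not the framework class)."""
    kind: str          # assumed discriminator, e.g. "output" or "status"
    data: Any = None   # payload carried by the event


class SketchRunResult:
    """Hypothetical stand-in for a run result that stores the full event sequence."""

    def __init__(self, events: list[SketchEvent]) -> None:
        self._events = list(events)

    def get_outputs(self) -> list[Any]:
        # Scan events in emission order and keep only the yielded outputs.
        return [event.data for event in self._events if event.kind == "output"]


# Events as they might be recorded during a run: outputs interleaved with status events.
events = [
    SketchEvent("status", "started"),
    SketchEvent("output", "draft"),
    SketchEvent("status", "step finished"),
    SketchEvent("output", "final report"),
]
sketch_result = SketchRunResult(events)
sketch_outputs = sketch_result.get_outputs()
print(sketch_outputs)
```

Because the outputs are filtered from a stored event list, the caller can read them at any time after the run, independently of when each step produced them.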
Output Aggregation
The framework provides two aggregation strategies for collecting workflow outputs:
- Default aggregator: When no custom aggregator is configured, get_outputs() returns a list[Message]. Each message in the list corresponds to an output yielded during workflow execution, preserving the order in which outputs were produced.
- Custom aggregator: Developers can supply a custom aggregator function that transforms the raw output events into any desired type. This enables domain-specific result shapes such as structured reports, data frames, or summary objects.
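As a rough illustration of the two strategies, the sketch below treats an aggregator as an ordinary function over the collected messages. SketchMessage, Report, default_aggregator, and report_aggregator are hypothetical names for this example only; how an aggregator is actually registered with a workflow is not shown here and is not assumed.

```python
from dataclasses import dataclass


@dataclass
class SketchMessage:
    """Hypothetical stand-in for a message with an author and text."""
    author_name: str
    text: str


@dataclass
class Report:
    """Hypothetical domain-specific result shape."""
    authors: list[str]
    body: str


def default_aggregator(messages: list[SketchMessage]) -> list[SketchMessage]:
    # Default behaviour in this sketch: pass the messages through in order.
    return list(messages)


def report_aggregator(messages: list[SketchMessage]) -> Report:
    # Custom behaviour: collapse the messages into one structured report.
    authors = sorted({m.author_name for m in messages})
    body = "\n".join(m.text for m in messages)
    return Report(authors=authors, body=body)


messages = [
    SketchMessage("researcher", "Findings: sales rose in Q3."),
    SketchMessage("writer", "Summary: growth was driven by new customers."),
]
as_list = default_aggregator(messages)
as_report = report_aggregator(messages)
print(as_report.authors)
print(as_report.body)
```

The point of the custom form is that callers receive one object shaped for their domain instead of iterating over messages themselves.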
Event Inspection
Beyond output extraction, the WorkflowRunResult provides additional introspection capabilities:
- get_request_info_events(): Returns the subset of events that carry request-level metadata, useful for debugging and observability.
- get_final_state(): Returns the terminal WorkflowRunState of the execution, indicating whether the workflow completed successfully, was cancelled, or encountered an error.
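Both accessors can be thought of as queries over the same stored event list. The following is a minimal sketch under that assumption; SketchRunState, SketchEvent, the "kind" values, and the two helper functions are hypothetical and do not reproduce the framework's WorkflowRunState or event types.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Any


class SketchRunState(Enum):
    """Hypothetical terminal states, mirroring the three cases named above."""
    COMPLETED = "completed"
    CANCELLED = "cancelled"
    FAILED = "failed"


@dataclass
class SketchEvent:
    """Hypothetical stand-in for a workflow event."""
    kind: str          # assumed discriminator: "output", "request_info", or "state"
    data: Any = None


def request_info_events(events: list[SketchEvent]) -> list[SketchEvent]:
    # Keep only the events that carry request-level metadata.
    return [e for e in events if e.kind == "request_info"]


def final_state(events: list[SketchEvent]) -> SketchRunState:
    # The terminal state is taken from the last state event recorded.
    for event in reversed(events):
        if event.kind == "state":
            return event.data
    raise ValueError("no state event was recorded for this run")


events = [
    SketchEvent("request_info", {"request_id": "r-1"}),
    SketchEvent("output", "partial answer"),
    SketchEvent("state", SketchRunState.COMPLETED),
]
info = request_info_events(events)
state = final_state(events)
print(len(info), state.value)
```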
Theoretical Basis
The Workflow Result Processing pattern implements a Collector strategy. During execution, each workflow step may yield zero or more outputs via ctx.yield_output(). These outputs are recorded as events in the workflow's event stream. After execution completes, the WorkflowRunResult acts as a materialized view over this event stream, allowing callers to query outputs, metadata, and terminal state without needing to process the raw event sequence manually.
This separation of concerns means that:
- Workflow authors focus on producing outputs at the appropriate points in their logic via ctx.yield_output().
- Workflow callers focus on consuming the aggregated results via get_outputs(), without knowledge of the internal event structure.
- Custom aggregators serve as a bridge, transforming the generic event stream into domain-specific result types.
I/O Contract
Inputs
| Input | Type | Description |
|---|---|---|
| Workflow execution | Completed workflow.run() call | The workflow must have finished execution. The WorkflowRunResult is returned from await workflow.run(). |
| Yielded outputs | ctx.yield_output(value) calls | Zero or more values yielded during workflow step execution. Each yielded value is captured as a WorkflowEvent. |
Outputs
| Method | Return Type | Description |
|---|---|---|
| get_outputs() | list[Any] (default: list[Message]) | All outputs yielded during workflow execution, collected in order. The concrete type depends on the configured aggregator. |
| get_request_info_events() | list[WorkflowEvent] | The subset of workflow events carrying request-level metadata. |
| get_final_state() | WorkflowRunState | The terminal state of the workflow execution (e.g., completed, cancelled, error). |
Usage
# Run a workflow and extract outputs using the default aggregator.
# Assumes `workflow` is already built and this code runs inside an async function.
result = await workflow.run("Create a report")
outputs = result.get_outputs()
for output in outputs:
    # With the default aggregator, an output is a list of messages.
    if isinstance(output, list):
        for msg in output:
            print(f"[{msg.author_name}]: {msg.text}")

# Inspect the terminal state
state = result.get_final_state()
print(f"Final state: {state}")