Principle:Microsoft Agent framework Workflow Event Streaming
Summary
The WorkflowEvent type system provides a structured, typed mechanism for streaming real-time progress information during workflow execution. Each WorkflowEvent object carries a typed payload identified by an event type string, enabling consumers to observe, react to, and log every stage of a workflow's lifecycle as it happens.
Principle Statement
All workflow execution progress MUST be communicated through strongly-typed WorkflowEvent objects emitted as an asynchronous stream, where each event declares its category via a WorkflowEventType literal and carries a corresponding typed payload.
This ensures that:
- Every stage of workflow execution is observable in real time.
- Consumers can filter, route, and handle events based on well-known type discriminators.
- The event stream serves as the single source of truth for execution progress, replacing ad-hoc logging or polling.
Event Type Taxonomy
The WorkflowEventType literal union defines the full set of event categories emitted during workflow execution:
| Event Type | Category | Description |
|---|---|---|
| `"started"` | Lifecycle | The workflow has begun execution. |
| `"status"` | Lifecycle | A general status update on the workflow's progress. |
| `"failed"` | Lifecycle | The workflow has terminated due to an unrecoverable error. |
| `"output"` | Data | The workflow has produced a final or intermediate output value. |
| `"data"` | Data | An arbitrary data payload emitted during execution. |
| `"request_info"` | Data | Metadata about a request being processed within the workflow. |
| `"warning"` | Diagnostic | A non-fatal warning encountered during execution. |
| `"error"` | Diagnostic | An error encountered during execution (may or may not be fatal). |
| `"superstep_started"` | Orchestration | A superstep (batch of parallel executor invocations) has begun. |
| `"superstep_completed"` | Orchestration | A superstep has completed. |
| `"executor_invoked"` | Executor | A specific executor has been invoked within the workflow. |
| `"executor_completed"` | Executor | A specific executor has finished successfully. |
| `"executor_failed"` | Executor | A specific executor has failed. |
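
The taxonomy above can be written down as a literal union plus a category lookup. The following is a minimal sketch that mirrors the table only; the framework's own `WorkflowEventType` definition may differ, and `EVENT_CATEGORY` and `category_of` are hypothetical helper names introduced here for illustration.

```python
from typing import Literal, get_args

# Illustrative mirror of the table; not copied from the framework source.
WorkflowEventType = Literal[
    "started", "status", "failed",
    "output", "data", "request_info",
    "warning", "error",
    "superstep_started", "superstep_completed",
    "executor_invoked", "executor_completed", "executor_failed",
]

# Hypothetical lookup from event type to the category column of the table.
EVENT_CATEGORY: dict[str, str] = {
    "started": "Lifecycle",
    "status": "Lifecycle",
    "failed": "Lifecycle",
    "output": "Data",
    "data": "Data",
    "request_info": "Data",
    "warning": "Diagnostic",
    "error": "Diagnostic",
    "superstep_started": "Orchestration",
    "superstep_completed": "Orchestration",
    "executor_invoked": "Executor",
    "executor_completed": "Executor",
    "executor_failed": "Executor",
}


def category_of(event_type: str) -> str:
    """Return the table category for an event type; raise on unknown types."""
    try:
        return EVENT_CATEGORY[event_type]
    except KeyError:
        raise ValueError(f"unknown workflow event type: {event_type!r}") from None


# Sanity check: every member of the literal union has a category entry.
assert set(get_args(WorkflowEventType)) == set(EVENT_CATEGORY)
```

Keeping the lookup next to the literal union lets the final assertion catch a newly added event type that has no category yet.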
Design Rationale
Why a Typed Event Stream?
Traditional workflow engines often rely on opaque log messages or polling-based status checks. The WorkflowEvent approach provides several advantages:
- Type safety -- Each event type has a corresponding payload type (`DataT`), enforced through the `Generic[DataT]` parameterization of the `WorkflowEvent` class. Consumers can narrow the type of `event.data` based on `event.type`.
- Composability -- Because events are yielded through Python's `async for` protocol, consumers can compose standard async iteration patterns (filtering, batching, timeout) without special framework support.
- Decoupling -- The event producer (the workflow engine) and event consumer (UI, logger, orchestrator) are fully decoupled. The producer does not need to know what the consumer will do with events.
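
As an illustration of the composability point, the sketch below batches events from an async stream using nothing but plain async generators. `SketchEvent`, `fake_stream`, and `batched` are hypothetical names; `SketchEvent` is a simplified stand-in for `WorkflowEvent` and is not the framework class.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, Generic, List, Optional, TypeVar

DataT = TypeVar("DataT")


@dataclass
class SketchEvent(Generic[DataT]):
    """Hypothetical stand-in for WorkflowEvent; field names follow the text."""
    type: str
    data: Optional[DataT] = None
    executor_id: Optional[str] = None


async def fake_stream() -> AsyncIterator[SketchEvent]:
    """Stand-in producer; a real stream would come from the workflow engine."""
    yield SketchEvent("started")
    for i in range(3):
        yield SketchEvent("output", data=i)
    yield SketchEvent("status", data="idle")


async def batched(stream: AsyncIterator[SketchEvent], size: int):
    """Group events from any async stream into lists of at most `size`."""
    batch: List[SketchEvent] = []
    async for event in stream:
        batch.append(event)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:  # flush the final, possibly shorter, batch
        yield batch


async def main() -> List[List[str]]:
    return [[e.type for e in batch] async for batch in batched(fake_stream(), 2)]


batches = asyncio.run(main())
```

The `batched` helper knows nothing about workflows; it works on any async iterator, which is the sense in which consumers need no special framework support.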
Event Granularity
The type system distinguishes between five categories of events, matching the taxonomy table:
- Lifecycle events (`started`, `status`, `failed`) -- Track the overall workflow state machine.
- Data events (`output`, `data`, `request_info`) -- Carry payload values produced during execution.
- Diagnostic events (`warning`, `error`) -- Communicate problems without necessarily halting execution.
- Orchestration events (`superstep_started`, `superstep_completed`) -- Mark the boundaries of each superstep scheduled by the engine.
- Executor events (`executor_invoked`, `executor_completed`, `executor_failed`) -- Provide fine-grained visibility into individual executor runs.
This granularity allows consumers to subscribe at the level of detail they need. A simple progress bar only needs lifecycle events; a debugging dashboard needs all of them.
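
Subscribing at a chosen level of detail can be expressed as a small filter over the stream. This is a sketch under stated assumptions: `subscribe`, `fake_stream`, and the `LIFECYCLE` set are hypothetical, and the events are simple namespaces carrying only a `type` attribute rather than real `WorkflowEvent` objects.

```python
import asyncio
from types import SimpleNamespace

# Event types a simple progress bar would care about (lifecycle only).
LIFECYCLE = {"started", "status", "failed"}


async def fake_stream():
    """Stand-in for a workflow event stream; yields objects with a .type."""
    for event_type in (
        "started",
        "superstep_started",
        "executor_invoked",
        "executor_completed",
        "superstep_completed",
        "status",
        "output",
    ):
        yield SimpleNamespace(type=event_type)


async def subscribe(stream, wanted_types):
    """Yield only the events whose type is in `wanted_types`."""
    async for event in stream:
        if event.type in wanted_types:
            yield event


async def main():
    return [e.type async for e in subscribe(fake_stream(), LIFECYCLE)]


seen = asyncio.run(main())
```

A debugging dashboard would consume the stream without this filter, while the progress bar sees only the lifecycle subset.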
Observability Patterns
Real-Time Logging
async for event in workflow.run("input", stream=True):
    logger.info(f"[{event.type}] executor={event.executor_id} request={event.request_id}")
Executor-Level Tracing
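# Track open spans per executor: executors in the same superstep may overlap,
# so a single shared `span` variable could end the wrong span or be unbound.
spans = {}
async for event in workflow.run("input", stream=True):
    if event.type == "executor_invoked":
        spans[event.executor_id] = tracer.start_span(event.executor_id)
    elif event.type in ("executor_completed", "executor_failed"):
        span = spans.pop(event.executor_id, None)
        if span is None:
            continue  # no matching invocation was observed
        if event.type == "executor_failed":
            span.set_status(StatusCode.ERROR)
        span.end()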
Filtered Output Collection
outputs = []
async for event in workflow.run("input", stream=True):
    if event.type == "output":
        outputs.append(event.data)
Constraints and Guarantees
- A `"started"` event is always the first event emitted.
- A `"failed"` event, if emitted, is always the last event in the stream.
- Every `"executor_invoked"` event is followed by exactly one `"executor_completed"` or `"executor_failed"` event for the same `executor_id`.
- Every `"superstep_started"` event is followed by exactly one `"superstep_completed"` event.
- The `request_id` field on events correlates events that belong to the same logical request within the workflow.
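
The ordering guarantees above lend themselves to a simple conformance check over a recorded event sequence, for example in a test suite. The sketch below is one possible reading of those guarantees, not framework code: `Event` and `check_stream` are hypothetical names, a second `"superstep_started"` before the previous superstep completes is treated as a violation by assumption, and `request_id` correlation is not checked.

```python
from collections import Counter, namedtuple

# Hypothetical minimal event record; executor_id defaults to None.
Event = namedtuple("Event", ["type", "executor_id"], defaults=[None])


def check_stream(events):
    """Return a list of violated guarantees (empty if the sequence conforms)."""
    problems = []
    if not events or events[0].type != "started":
        problems.append("first event is not 'started'")
    for index, event in enumerate(events):
        if event.type == "failed" and index != len(events) - 1:
            problems.append("'failed' is not the last event")

    pending = Counter()      # invocations still awaiting completion or failure
    superstep_open = False
    for event in events:
        if event.type == "executor_invoked":
            pending[event.executor_id] += 1
        elif event.type in ("executor_completed", "executor_failed"):
            if pending[event.executor_id] <= 0:
                problems.append(f"{event.type} without invocation: {event.executor_id}")
            else:
                pending[event.executor_id] -= 1
        elif event.type == "superstep_started":
            if superstep_open:
                problems.append("superstep started before previous one completed")
            superstep_open = True
        elif event.type == "superstep_completed":
            if not superstep_open:
                problems.append("superstep_completed without superstep_started")
            superstep_open = False

    unfinished = sorted(name for name, count in pending.items() if count > 0)
    if unfinished:
        problems.append(f"executors never finished: {unfinished}")
    if superstep_open:
        problems.append("superstep never completed")
    return problems
```

Returning a list of problems rather than raising on the first one makes it easier to see every violated guarantee in a single test run.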
Related
- Implementation:Microsoft_Agent_framework_WorkflowEvent_Class -- The concrete `WorkflowEvent` generic class and `WorkflowEventType` literal definition.