
Principle:Microsoft Agent framework Workflow Event Streaming

From Leeroopedia
Revision as of 17:54, 16 February 2026 by Admin (talk | contribs) (Auto-imported from principles/Microsoft_Agent_framework_Workflow_Event_Streaming.md)

Summary

The WorkflowEvent type system provides a structured, typed mechanism for streaming real-time progress information during workflow execution. Each WorkflowEvent object carries a typed payload identified by an event type string, enabling consumers to observe, react to, and log every stage of a workflow's lifecycle as it happens.

Principle Statement

All workflow execution progress MUST be communicated through strongly-typed WorkflowEvent objects emitted as an asynchronous stream, where each event declares its category via a WorkflowEventType literal and carries a corresponding typed payload.

This ensures that:

  1. Every stage of workflow execution is observable in real time.
  2. Consumers can filter, route, and handle events based on well-known type discriminators.
  3. The event stream serves as the single source of truth for execution progress, replacing ad-hoc logging or polling.

Event Type Taxonomy

The WorkflowEventType literal union defines the full set of event categories emitted during workflow execution:

Event Type              Category        Description
"started"               Lifecycle       The workflow has begun execution.
"status"                Lifecycle       A general status update on the workflow's progress.
"failed"                Lifecycle       The workflow has terminated due to an unrecoverable error.
"output"                Data            The workflow has produced a final or intermediate output value.
"data"                  Data            An arbitrary data payload emitted during execution.
"request_info"          Data            Metadata about a request being processed within the workflow.
"warning"               Diagnostic      A non-fatal warning encountered during execution.
"error"                 Diagnostic      An error encountered during execution (may or may not be fatal).
"superstep_started"     Orchestration   A superstep (batch of parallel executor invocations) has begun.
"superstep_completed"   Orchestration   A superstep has completed.
"executor_invoked"      Executor        A specific executor has been invoked within the workflow.
"executor_completed"    Executor        A specific executor has finished successfully.
"executor_failed"       Executor        A specific executor has failed.

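The taxonomy above could be modeled roughly as follows. This is a minimal sketch under stated assumptions, not the framework's actual definition: the dataclass layout, the field defaults, and the helper `is_known_event_type` are illustrative. Only the event type strings and the names `WorkflowEventType`, `WorkflowEvent`, `DataT`, `executor_id`, and `request_id` come from this page.

```python
from dataclasses import dataclass
from typing import Generic, Literal, Optional, TypeVar, get_args

# Event type strings copied from the taxonomy table.
WorkflowEventType = Literal[
    "started", "status", "failed",
    "output", "data", "request_info",
    "warning", "error",
    "superstep_started", "superstep_completed",
    "executor_invoked", "executor_completed", "executor_failed",
]

DataT = TypeVar("DataT")


@dataclass(frozen=True)
class WorkflowEvent(Generic[DataT]):
    """Illustrative stand-in for the framework's event class (layout assumed)."""

    type: WorkflowEventType
    data: Optional[DataT] = None
    executor_id: Optional[str] = None
    request_id: Optional[str] = None


def is_known_event_type(value: str) -> bool:
    """Hypothetical helper: check a string against the literal union at runtime."""
    return value in get_args(WorkflowEventType)
```

A static type checker can use the `Literal` union to reject misspelled event types, while `is_known_event_type` covers strings that arrive at runtime, for example from a serialized event log.
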
Design Rationale

Why a Typed Event Stream?

Traditional workflow engines often rely on opaque log messages or polling-based status checks. The WorkflowEvent approach provides several advantages:

  • Type safety -- Each event type has a corresponding payload type (DataT), enforced through the Generic[DataT] parameterization of the WorkflowEvent class. Consumers can narrow the type of event.data based on event.type.
  • Composability -- Because events are yielded through Python's async for protocol, consumers can compose standard async iteration patterns (filtering, batching, timeout) without special framework support.
  • Decoupling -- The event producer (the workflow engine) and event consumer (UI, logger, orchestrator) are fully decoupled. The producer does not need to know what the consumer will do with events.
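
The composability point can be illustrated with plain async generators. The sketch below is hypothetical: `fake_stream` stands in for `workflow.run("input", stream=True)`, and `only` and `with_timeout` are helper names invented here, not framework functions. It assumes only that events expose `.type` and `.data`.

```python
import asyncio
from types import SimpleNamespace
from typing import Any, AsyncIterator


async def fake_stream() -> AsyncIterator[Any]:
    """Hypothetical stand-in for workflow.run("input", stream=True)."""
    for event_type, data in [("started", None), ("status", "running"),
                             ("output", 1), ("output", 2)]:
        await asyncio.sleep(0)  # give control back to the event loop
        yield SimpleNamespace(type=event_type, data=data)


async def only(stream: AsyncIterator[Any], *event_types: str) -> AsyncIterator[Any]:
    """Yield only the events whose type is one of event_types."""
    async for event in stream:
        if event.type in event_types:
            yield event


async def with_timeout(stream: AsyncIterator[Any], seconds: float) -> AsyncIterator[Any]:
    """Re-yield events; raise asyncio.TimeoutError if the next one is late."""
    iterator = stream.__aiter__()
    while True:
        try:
            event = await asyncio.wait_for(iterator.__anext__(), timeout=seconds)
        except StopAsyncIteration:
            return  # the wrapped stream is exhausted
        yield event


async def collect_outputs() -> list:
    """Compose both helpers: keep "output" events, fail if one takes over 1 s."""
    filtered = only(fake_stream(), "output")
    return [event.data async for event in with_timeout(filtered, seconds=1.0)]
```

Because `with_timeout` wraps the filtered stream here, the deadline applies to the gap between matching events. Wrapping the raw stream first would instead bound the gap between any two events.
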

Event Granularity

The type system distinguishes between five categories of events, matching the Category column of the taxonomy:

  1. Lifecycle events (started, status, failed) -- Track the overall workflow state machine.
  2. Data events (output, data, request_info) -- Carry payload values produced during execution.
  3. Diagnostic events (warning, error) -- Communicate problems without necessarily halting execution.
  4. Orchestration events (superstep_started, superstep_completed) -- Mark the boundaries of each superstep scheduled by the engine.
  5. Executor events (executor_invoked, executor_completed, executor_failed) -- Provide fine-grained visibility into individual executor runs.

This granularity allows consumers to subscribe at the level of detail they need. A simple progress bar only needs lifecycle events; a debugging dashboard needs all of them.
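
One way a consumer might express such subscription levels is a lookup from event type to category. This is a sketch, not part of the framework: `CATEGORY_OF`, `types_in`, `select`, and the two subscription constants are hypothetical names, and the category labels follow the Category column of the taxonomy table.

```python
from typing import Dict, FrozenSet, Iterable, List

# Category labels taken from the taxonomy table; the mapping itself is illustrative.
CATEGORY_OF: Dict[str, str] = {
    "started": "Lifecycle", "status": "Lifecycle", "failed": "Lifecycle",
    "output": "Data", "data": "Data", "request_info": "Data",
    "warning": "Diagnostic", "error": "Diagnostic",
    "superstep_started": "Orchestration", "superstep_completed": "Orchestration",
    "executor_invoked": "Executor", "executor_completed": "Executor",
    "executor_failed": "Executor",
}


def types_in(*categories: str) -> FrozenSet[str]:
    """Return every event type that belongs to one of the given categories."""
    return frozenset(t for t, c in CATEGORY_OF.items() if c in categories)


# Hypothetical subscription levels for the two consumers mentioned above.
PROGRESS_BAR = types_in("Lifecycle")
DEBUG_DASHBOARD = frozenset(CATEGORY_OF)  # every known event type


def select(event_types: Iterable[str], subscription: FrozenSet[str]) -> List[str]:
    """Keep only the event types the consumer subscribed to, preserving order."""
    return [t for t in event_types if t in subscription]
```

In a real consumer the same membership test would sit inside the `async for` loop, checking `event.type in PROGRESS_BAR` before handling each event.
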

Observability Patterns

Real-Time Logging

import logging
logger = logging.getLogger(__name__)
async for event in workflow.run("input", stream=True):
    logger.info("[%s] executor=%s request=%s", event.type, event.executor_id, event.request_id)

Executor-Level Tracing

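from opentelemetry import trace
from opentelemetry.trace import StatusCode

tracer = trace.get_tracer(__name__)
spans = {}  # open spans keyed by executor_id; executors in one superstep may overlap

async for event in workflow.run("input", stream=True):
    if event.type == "executor_invoked":
        spans[event.executor_id] = tracer.start_span(event.executor_id)
    elif event.type in ("executor_completed", "executor_failed"):
        span = spans.pop(event.executor_id, None)
        if span is None:
            continue  # no matching "executor_invoked" was seen for this executor
        if event.type == "executor_failed":
            span.set_status(StatusCode.ERROR)
        span.end()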

Filtered Output Collection

outputs = []
async for event in workflow.run("input", stream=True):
    if event.type == "output":
        outputs.append(event.data)

Constraints and Guarantees

  • A "started" event is always the first event emitted.
  • A "failed" event, if emitted, is always the last event in the stream.
  • Every "executor_invoked" event is followed by exactly one "executor_completed" or "executor_failed" event for the same executor_id.
  • Every "superstep_started" event is followed by exactly one "superstep_completed" event.
  • The request_id field on events correlates events that belong to the same logical request within the workflow.
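
The ordering guarantees above lend themselves to a test-time check over a recorded event list. The sketch below is hypothetical: `check_stream` and `make_event` are names invented here, and the function reads the guarantees literally. For instance, it assumes an executor still running when the stream ends counts as a violation even after a "failed" event, which the page does not specify.

```python
from collections import Counter
from types import SimpleNamespace
from typing import Any, Iterable, List, Optional


def make_event(event_type: str, executor_id: Optional[str] = None) -> Any:
    """Hypothetical helper: a bare object with .type and .executor_id."""
    return SimpleNamespace(type=event_type, executor_id=executor_id)


def check_stream(events: Iterable[Any]) -> List[str]:
    """Return human-readable violations of the ordering guarantees."""
    events = list(events)
    if not events:
        return ["stream is empty, so no 'started' event was seen"]
    violations: List[str] = []
    if events[0].type != "started":
        violations.append(f"first event is {events[0].type!r}, expected 'started'")
    running: Counter = Counter()  # outstanding invocations per executor_id
    open_supersteps = 0
    last = len(events) - 1
    for index, event in enumerate(events):
        if event.type == "failed" and index != last:
            violations.append(f"'failed' at position {index} is not the last event")
        elif event.type == "executor_invoked":
            running[event.executor_id] += 1
        elif event.type in ("executor_completed", "executor_failed"):
            if running[event.executor_id] == 0:
                violations.append(f"{event.type!r} for {event.executor_id!r} without invocation")
            else:
                running[event.executor_id] -= 1
        elif event.type == "superstep_started":
            open_supersteps += 1
        elif event.type == "superstep_completed":
            if open_supersteps == 0:
                violations.append("'superstep_completed' without 'superstep_started'")
            else:
                open_supersteps -= 1
    for executor_id, count in running.items():
        if count:
            violations.append(f"executor {executor_id!r} has {count} unfinished invocation(s)")
    if open_supersteps:
        violations.append(f"{open_supersteps} superstep(s) never completed")
    return violations
```

An empty result means the recorded stream is consistent with the guarantees as interpreted here. Each returned string names one broken guarantee, which makes the function usable as an assertion in integration tests.
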
