Implementation:Microsoft Agent framework WorkflowEvent Class

From Leeroopedia

Summary

The WorkflowEvent class is a generic, immutable data carrier that represents a single event emitted during workflow execution. It is parameterized over its payload type via Generic[DataT] and tagged with a WorkflowEventType literal discriminator, enabling type-safe consumption of heterogeneous event streams.

Source Location

Property | Value
-------- | -----
File | python/packages/core/agent_framework/_workflows/_events.py
Lines | L131-426
Class | class WorkflowEvent(Generic[DataT]):
Import | from agent_framework import WorkflowEvent

I/O Contract

Input

WorkflowEvent instances are constructed internally by the workflow engine. They are not created directly by user code. The engine populates each event with:

Parameter | Type | Description
--------- | ---- | -----------
type | WorkflowEventType | The event category discriminator (see the WorkflowEventType table).
data | DataT | The typed payload carried by the event. The concrete type depends on type.
executor_id | str | Identifier of the executor that produced or is associated with this event.
request_id | str | Identifier correlating events to a specific request within the workflow.
request_type | type | The type of the request object associated with this event.
response_type | type | The type of the response object associated with this event.

Output

WorkflowEvent objects are yielded as an asynchronous stream from workflow.run(..., stream=True). Each object is read-only and exposes the properties listed above. The stream terminates when the workflow completes or fails.

Output | Type | Description
------ | ---- | -----------
Event stream | AsyncIterator[WorkflowEvent[DataT]] | Asynchronous iterator of typed workflow events, consumed via async for.

WorkflowEventType

The WorkflowEventType is a Literal union of the following string values:

Type Literal | Category | Payload (DataT) | Description
------------ | -------- | --------------- | -----------
"started" | Lifecycle | Workflow metadata | Workflow execution has begun.
"status" | Lifecycle | Status message or object | General progress update.
"failed" | Lifecycle | Error or exception info | Workflow terminated with failure.
"output" | Data | Workflow output value | A final or intermediate output has been produced.
"data" | Data | Arbitrary data | An arbitrary data payload emitted during execution.
"request_info" | Data | Request metadata | Information about a request being processed.
"warning" | Diagnostic | Warning message | Non-fatal warning during execution.
"error" | Diagnostic | Error message | Error during execution (may or may not be fatal).
"superstep_started" | Orchestration | Superstep metadata | A superstep (batch of parallel executors) has begun.
"superstep_completed" | Orchestration | Superstep result | A superstep has completed.
"executor_invoked" | Executor | Executor invocation info | A specific executor has been invoked.
"executor_completed" | Executor | Executor result | A specific executor has completed successfully.
"executor_failed" | Executor | Executor error info | A specific executor has failed.

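As a rough illustration of what a Literal union of these values looks like, the sketch below rebuilds the alias from the table and derives a membership check from it. The alias is reconstructed from this page only; the actual definition in _events.py may differ. The names ALL_EVENT_TYPES and is_known_event_type are hypothetical helpers, not part of agent_framework.

```python
from typing import Literal, get_args

# Assumed reconstruction from the table above; the real alias lives in
# _events.py and may differ in ordering or membership.
WorkflowEventType = Literal[
    "started", "status", "failed",
    "output", "data", "request_info",
    "warning", "error",
    "superstep_started", "superstep_completed",
    "executor_invoked", "executor_completed", "executor_failed",
]

# get_args() returns the literal values of the union as a tuple.
ALL_EVENT_TYPES: frozenset[str] = frozenset(get_args(WorkflowEventType))


def is_known_event_type(value: str) -> bool:
    """Return True if `value` is one of the literals listed in the table."""
    return value in ALL_EVENT_TYPES
```

A check like this can be useful when event types arrive as plain strings, for example from logs, and need validating before dispatch.
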
Class Definition

class WorkflowEvent(Generic[DataT]):
    """A single typed event emitted during workflow execution.

    WorkflowEvent carries a discriminated payload identified by a
    WorkflowEventType literal. Consumers iterate over the async
    stream of events to observe execution progress in real time.

    Attributes:
        type: The event category (e.g. "started", "executor_invoked").
        data: The typed payload whose concrete type corresponds to `type`.
        executor_id: Identifier of the associated executor.
        request_id: Correlation identifier for the originating request.
        request_type: The type of the request object.
        response_type: The type of the response object.
    """

    type: WorkflowEventType
    data: DataT
    executor_id: str
    request_id: str
    request_type: type
    response_type: type

Key Properties

.type (WorkflowEventType)

The type property is the discriminator tag that identifies which category of event this object represents. It is one of the 13 string literals defined in WorkflowEventType. Consumers should use this property in conditional branches to narrow the payload type:

if event.type == "executor_invoked":
    # event.data is executor invocation info
    handle_invocation(event.data)
elif event.type == "output":
    # event.data is the workflow output value
    handle_output(event.data)

.data (DataT)

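The data property holds the typed payload. Its concrete type is determined by the value of event.type. Because WorkflowEvent is Generic[DataT], an event annotated with a concrete parameter, for example WorkflowEvent[str], gives static type checkers the payload type, so uses of event.data can be checked once the event type is known.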
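To make the role of the type parameter concrete, here is a minimal sketch using a hypothetical stand-in class, EventSketch, rather than the real WorkflowEvent. It shows one consumer that relies on a concrete annotation and another that narrows an unknown payload at runtime. The payload types chosen here are assumptions for illustration only.

```python
from dataclasses import dataclass
from typing import Generic, Optional, TypeVar

DataT = TypeVar("DataT")


@dataclass(frozen=True)
class EventSketch(Generic[DataT]):
    """Hypothetical stand-in carrying only the two fields needed here."""
    type: str
    data: DataT


def shout(event: EventSketch[str]) -> str:
    # With the parameter fixed to str, a type checker accepts .upper() on data.
    return event.data.upper()


def output_text(event: EventSketch[object]) -> Optional[str]:
    # When the payload type is not known statically, branch on the
    # discriminator and confirm the payload at runtime before using it.
    if event.type == "output" and isinstance(event.data, str):
        return event.data
    return None
```
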

.executor_id (str)

The executor_id property identifies which executor within the workflow produced or is associated with this event. This is particularly relevant for "executor_invoked", "executor_completed", and "executor_failed" events, but may be populated on other event types when the context is executor-specific.

.request_id (str)

The request_id property is a correlation identifier that links events to a specific request within the workflow. Multiple events may share the same request_id, enabling consumers to reconstruct the full lifecycle of a single request.
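One way a consumer might reconstruct per-request lifecycles is to bucket events by request_id as they are collected. The sketch below uses a hypothetical EventSketch stand-in and assumes that events unrelated to a request carry None in request_id; the real class may represent that case differently.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Iterable, Optional


@dataclass(frozen=True)
class EventSketch:
    """Hypothetical stand-in; only the fields used for correlation."""
    type: str
    data: Any = None
    request_id: Optional[str] = None  # assumption: None when not request-bound


def group_by_request(events: Iterable[EventSketch]) -> dict[str, list[str]]:
    """Map each request_id to the ordered list of event types seen for it."""
    grouped: defaultdict[str, list[str]] = defaultdict(list)
    for event in events:
        if event.request_id is not None:
            grouped[event.request_id].append(event.type)
    return dict(grouped)
```
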

.request_type and .response_type

The request_type and response_type properties expose the Python types of the request and response objects associated with the event. These are useful for introspection, serialization, and schema-driven tooling.
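As an illustration of the introspection use case, the following sketch checks a candidate reply against response_type and builds a short description from both type objects. RequestInfoSketch, matches_response_type, and describe are hypothetical names, and the use of dict and str as request and response types is an assumption made only for the example.

```python
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class RequestInfoSketch:
    """Hypothetical stand-in exposing the three request-related fields."""
    request_id: str
    request_type: type
    response_type: type


def matches_response_type(event: RequestInfoSketch, candidate: Any) -> bool:
    """Return True if `candidate` is an instance of the expected response type."""
    return isinstance(candidate, event.response_type)


def describe(event: RequestInfoSketch) -> str:
    """Summarize the request/response contract using the type names."""
    return (
        f"{event.request_id}: "
        f"{event.request_type.__name__} -> {event.response_type.__name__}"
    )
```
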

Usage Example

The following example demonstrates consuming the event stream to observe executor lifecycle and collect outputs:

from agent_framework import WorkflowEvent


async def print_events(workflow) -> None:
    """Print executor lifecycle, request, and output events as they arrive."""
    # `workflow` is assumed to be an already-constructed workflow object.
    event: WorkflowEvent
    async for event in workflow.run("hello", stream=True):
        if event.type == "executor_invoked":
            print(f"Executor {event.executor_id} started")
        elif event.type == "executor_completed":
            print(f"Executor {event.executor_id} finished")
        elif event.type == "output":
            print(f"Output: {event.data}")
        elif event.type == "request_info":
            print(f"Request: {event.data} (id={event.request_id})")

Example Output

Executor planner started
Executor planner finished
Executor summarizer started
Request: SummarizeRequest(text='...') (id=req-abc-123)
Executor summarizer finished
Output: Summary of the workflow result
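The consumption pattern can be tried without the framework installed by replaying a fixed sequence through an async generator. The sketch below is a simulation only: FakeWorkflow and EventSketch are hypothetical stand-ins, the event order is invented for the demo, and nothing here reflects how the real engine schedules executors.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, AsyncIterator, List, Optional


@dataclass(frozen=True)
class EventSketch:
    """Hypothetical stand-in for the fields used by the loop below."""
    type: str
    data: Any = None
    executor_id: Optional[str] = None


class FakeWorkflow:
    """Hypothetical workflow that replays a fixed event sequence."""

    async def run(self, message: str, stream: bool = True) -> AsyncIterator[EventSketch]:
        for executor_id in ("planner", "summarizer"):
            yield EventSketch("executor_invoked", executor_id=executor_id)
            await asyncio.sleep(0)  # hand control back to the event loop
            yield EventSketch("executor_completed", executor_id=executor_id)
        yield EventSketch("output", data=f"echo: {message}")


async def collect_lines(workflow: FakeWorkflow) -> List[str]:
    """Consume the stream with async for and format one line per event."""
    lines: List[str] = []
    async for event in workflow.run("hello", stream=True):
        if event.type == "executor_invoked":
            lines.append(f"Executor {event.executor_id} started")
        elif event.type == "executor_completed":
            lines.append(f"Executor {event.executor_id} finished")
        elif event.type == "output":
            lines.append(f"Output: {event.data}")
    return lines


if __name__ == "__main__":
    for line in asyncio.run(collect_lines(FakeWorkflow())):
        print(line)
```

Because run is an async generator, each event reaches the consumer as soon as it is yielded, which mirrors the streaming behaviour described for the event stream.
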

Implementation Notes

  • Immutability -- WorkflowEvent instances are effectively immutable once created. All properties are set at construction time by the workflow engine and are not intended to be modified by consumers.
  • Generic parameterization -- The Generic[DataT] base allows the event's data property to be statically typed. When the workflow engine constructs events, it provides the concrete type for DataT based on the event type being emitted.
  • Async iteration protocol -- Events are yielded through Python's AsyncIterator protocol, meaning consumers use async for to receive events as they are produced, with no buffering or batching required.
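The immutability note above can be illustrated with a frozen dataclass, which is one common way to get read-only instances in Python. This is an assumption for demonstration: the page does not say how WorkflowEvent enforces immutability, or whether it enforces it at all. FrozenEventSketch and try_mutate are hypothetical names.

```python
from dataclasses import FrozenInstanceError, dataclass, replace
from typing import Any, Generic, TypeVar

DataT = TypeVar("DataT")


@dataclass(frozen=True)
class FrozenEventSketch(Generic[DataT]):
    """Hypothetical stand-in whose fields cannot be reassigned."""
    type: str
    data: DataT


def try_mutate(event: Any) -> bool:
    """Attempt to overwrite `data`; return True only if the write succeeded."""
    try:
        event.data = "changed"
    except FrozenInstanceError:
        return False
    return True


original = FrozenEventSketch(type="output", data="draft")
# Consumers that need a modified view build a new object instead.
revised = replace(original, data="final")
```

Under this design, a consumer that wants to annotate or transform an event derives a copy and leaves the original untouched, so other consumers of the same stream see unmodified events.
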
