Workflow:Microsoft Agent framework Graph Based Workflow Execution
| Knowledge Sources | |
|---|---|
| Domains | AI_Agents, Workflow_Orchestration, LLM_Ops, Python |
| Last Updated | 2026-02-11 17:00 GMT |
Overview
End-to-end process for building custom graph-based workflows using executors, typed edges, conditional routing, parallel fan-out/fan-in, human-in-the-loop pauses, and streaming event observation.
Description
This workflow covers the low-level graph-based workflow system at the core of the Microsoft Agent Framework. Unlike the high-level builder APIs (SequentialBuilder, ConcurrentBuilder), the graph-based approach gives full control over the execution topology. Developers define custom Executor nodes (or wrap agents as AgentExecutor nodes), connect them with typed edges using WorkflowBuilder, add conditional routing for decision branches, configure fan-out/fan-in for parallel processing, and implement human-in-the-loop pauses for interactive approval or input collection. The system supports streaming observation of executor inputs/outputs and checkpoint-based state persistence for long-running workflows.
Usage
Execute this workflow when the high-level orchestration builders do not fit your topology requirements. Use it for workflows that require custom conditional branching (spam detection, classification routing), iterative feedback loops (quality review cycles), mixed agent and deterministic function nodes, parallel expert analysis with custom aggregation, or interactive human-in-the-loop steps. You need familiarity with directed acyclic graph (DAG) concepts and async Python programming.
Execution Steps
Step 1: Define executor nodes
Create the processing nodes for your workflow graph. Executors can be defined as classes extending the Executor base class with @handler decorated methods, or as standalone async functions using the @executor decorator. Each executor declares its input type, output type, and optional workflow output type through type annotations. Agent nodes are created by wrapping an Agent with AgentExecutor.
Key considerations:
- Class-based executors extend Executor and use @handler on async methods
- Function-based executors use the @executor decorator with type annotations
- Type annotations drive the framework's automatic type routing between nodes
- AgentExecutor wraps an Agent to participate in the graph as a node
- Each executor has a unique id for edge wiring
Step 2: Wire executors with edges using WorkflowBuilder
Use WorkflowBuilder to connect executors into a directed graph. Add edges between nodes with .add_edge(source, target) to define the data flow. For parallel branches, use .add_fan_out_edges(source, [targets]) to broadcast output to multiple nodes and .add_fan_in_edges([sources], target) to collect results from parallel branches into a single aggregator.
Key considerations:
- Edges define the data flow path through the graph
- Type compatibility between connected nodes is validated at build time
- Fan-out edges broadcast one output to multiple downstream nodes
- Fan-in edges collect outputs from multiple sources into a list for the aggregator
- The builder returns a Workflow object ready for execution
Step 3: Add conditional routing
For decision-based branching, add conditions to edges using the condition parameter. A condition function receives the source executor's output and returns True or False to determine whether the edge is traversed. This enables patterns like classifier-driven routing where a structured output determines which downstream branch executes.
Key considerations:
- Condition functions receive the source output and return a boolean
- Use Pydantic models as response_format for structured classification outputs
- Multiple conditional edges from the same source create a switch-case pattern
- Exactly one condition should evaluate to True per execution for deterministic routing
Step 4: Implement human-in-the-loop pauses
For interactive workflows, use ctx.request_info() within an executor handler to pause execution and surface a request to the application. The application presents the request to the user, collects their response, and resumes the workflow by passing the response through workflow.run(responses={request_id: user_response}). A @response_handler decorated method on the executor processes the response and continues execution.
Key considerations:
- ctx.request_info(request_data=payload, response_type=ResponseType) pauses the executor
- The workflow emits a "request_info" event with the request_id
- Resume with workflow.run(responses={request_id: answer}) to provide user input
- The @response_handler method receives the typed response and resumes processing
- Multiple HITL interactions can occur in a single workflow run
Step 5: Run the workflow with streaming observation
Execute the workflow with workflow.run(input, stream=True) to receive an async stream of WorkflowEvent objects. Events include executor invocations, executor completions, output updates, and request_info events. The streaming interface provides full observability into the workflow's execution progress without modifying executor code.
Key considerations:
- Events with type "output" contain AgentResponseUpdate or final workflow output
- Events with type "executor_invoked" and "executor_completed" provide observability
- Events with type "request_info" signal human-in-the-loop pause points
- Checkpoint storage can be configured for durable state persistence
- The workflow can be wrapped as an agent using .as_agent() for reuse