

Workflow:Microsoft Agent framework Graph Based Workflow Execution

From Leeroopedia
Revision as of 11:01, 16 February 2026 by Admin (talk | contribs) (Auto-imported from workflows/Microsoft_Agent_framework_Graph_Based_Workflow_Execution.md)
Knowledge Sources
Domains: AI_Agents, Workflow_Orchestration, LLM_Ops, Python
Last Updated: 2026-02-11 17:00 GMT

Overview

End-to-end process for building custom graph-based workflows using executors, typed edges, conditional routing, parallel fan-out/fan-in, human-in-the-loop pauses, and streaming event observation.

Description

This workflow covers the low-level graph-based workflow system at the core of the Microsoft Agent Framework. Unlike the high-level builder APIs (SequentialBuilder, ConcurrentBuilder), the graph-based approach gives full control over the execution topology. Developers define custom Executor nodes (or wrap agents as AgentExecutor nodes), connect them with typed edges using WorkflowBuilder, add conditional routing for decision branches, configure fan-out/fan-in for parallel processing, and implement human-in-the-loop pauses for interactive approval or input collection. The system supports streaming observation of executor inputs/outputs and checkpoint-based state persistence for long-running workflows.

Usage

Use this workflow when the high-level orchestration builders do not fit your topology requirements. It suits workflows that require custom conditional branching (spam detection, classification routing), iterative feedback loops (quality review cycles), mixed agent and deterministic function nodes, parallel expert analysis with custom aggregation, or interactive human-in-the-loop steps. You need familiarity with directed graph concepts and async Python programming. Note that feedback loops make the graph cyclic, so it is not strictly a directed acyclic graph (DAG).

Execution Steps

Step 1: Define executor nodes

Create the processing nodes for your workflow graph. An executor can be defined either as a class that extends the Executor base class and marks its async methods with @handler, or as a standalone async function wrapped with the @executor decorator. Each executor declares its input type, its output type, and optionally a workflow output type through type annotations. To use an agent as a node, wrap the Agent in an AgentExecutor.

Key considerations:

  • Class-based executors extend Executor and use @handler on async methods
  • Function-based executors use the @executor decorator with type annotations
  • Type annotations drive the framework's automatic type routing between nodes
  • AgentExecutor wraps an Agent to participate in the graph as a node
  • Each executor has a unique id for edge wiring
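
The idea that type annotations drive routing can be illustrated with a small standard-library sketch. This is not the framework's implementation: ToyExecutor, the handler decorator defined here, and UpperCase are hypothetical stand-ins that only mimic the pattern of dispatching a message to the handler whose annotated parameter type matches.

```python
import asyncio
import inspect
from typing import get_type_hints


def handler(func):
    """Toy marker decorator (hypothetical; not the framework's @handler)."""
    func._is_handler = True
    return func


class ToyExecutor:
    """Hypothetical base class that routes messages by annotated input type."""

    def __init__(self, id: str):
        self.id = id  # unique id, used when wiring edges
        self._handlers = {}
        for _, method in inspect.getmembers(self, predicate=inspect.ismethod):
            if getattr(method, "_is_handler", False):
                hints = get_type_hints(method.__func__)
                hints.pop("return", None)
                # Exactly one annotated parameter is expected: the message.
                (message_type,) = hints.values()
                self._handlers[message_type] = method

    async def execute(self, message):
        for message_type, method in self._handlers.items():
            if isinstance(message, message_type):
                return await method(message)
        raise TypeError(f"{self.id} has no handler for {type(message).__name__}")


class UpperCase(ToyExecutor):
    @handler
    async def on_text(self, text: str) -> str:
        return text.upper()

    @handler
    async def on_number(self, number: int) -> str:
        return str(number * 2)


node = UpperCase(id="upper_case")
text_result = asyncio.run(node.execute("hello"))
number_result = asyncio.run(node.execute(21))
```

In this sketch the same node accepts two message types, and the annotation on each handler decides which one runs. The real Executor class may resolve handlers differently.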

Step 2: Wire executors with edges using WorkflowBuilder

Use WorkflowBuilder to connect executors into a directed graph. Add edges between nodes with .add_edge(source, target) to define the data flow. For parallel branches, use .add_fan_out_edges(source, [targets]) to broadcast output to multiple nodes and .add_fan_in_edges([sources], target) to collect results from parallel branches into a single aggregator.

Key considerations:

  • Edges define the data flow path through the graph
  • Type compatibility between connected nodes is validated at build time
  • Fan-out edges broadcast one output to multiple downstream nodes
  • Fan-in edges collect outputs from multiple sources into a list for the aggregator
  • The builder returns a Workflow object ready for execution
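
The wiring described above can be sketched with a toy builder that uses only the standard library. ToyWorkflowBuilder and run_graph are hypothetical names. The method names mirror the ones mentioned in this step, but the bodies are an assumption about the general technique, not the framework's code.

```python
from collections import defaultdict


class ToyWorkflowBuilder:
    """Toy stand-in for a graph builder; NOT the framework's WorkflowBuilder."""

    def __init__(self, start):
        self.start = start
        self.edges = defaultdict(list)  # source id -> list of target ids
        self.fan_in = {}                # aggregator id -> ordered source ids

    def add_edge(self, source, target):
        self.edges[source].append(target)
        return self  # returning self allows fluent chaining

    def add_fan_out_edges(self, source, targets):
        self.edges[source].extend(targets)
        return self

    def add_fan_in_edges(self, sources, target):
        self.fan_in[target] = list(sources)
        return self


def run_graph(builder, nodes, message):
    """Walk the graph breadth-first; return outputs of nodes with no outgoing edge."""
    queue = [(builder.start, message)]
    buffered = defaultdict(dict)  # aggregator id -> {source id: output}
    finals = []
    while queue:
        node_id, payload = queue.pop(0)
        result = nodes[node_id](payload)
        targets = builder.edges.get(node_id, [])
        aggregators = [t for t, srcs in builder.fan_in.items() if node_id in srcs]
        if not targets and not aggregators:
            finals.append(result)
        for target in targets:
            queue.append((target, result))
        for target in aggregators:
            buffered[target][node_id] = result
            if len(buffered[target]) == len(builder.fan_in[target]):
                # All branches have reported: deliver their outputs as one list.
                ordered = [buffered[target][s] for s in builder.fan_in[target]]
                queue.append((target, ordered))
    return finals


nodes = {
    "clean": lambda text: text.strip(),
    "words": lambda text: len(text.split()),
    "chars": lambda text: len(text),
    "report": lambda parts: {"words": parts[0], "chars": parts[1]},
    "format": lambda r: f"{r['words']} words, {r['chars']} chars",
}

builder = (
    ToyWorkflowBuilder(start="clean")
    .add_fan_out_edges("clean", ["words", "chars"])
    .add_fan_in_edges(["words", "chars"], "report")
    .add_edge("report", "format")
)
result = run_graph(builder, nodes, "  graph based workflow  ")
```

The aggregator runs only after every fan-in source has produced an output, and it receives those outputs as a list in the order the sources were declared.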

Step 3: Add conditional routing

For decision-based branching, add conditions to edges using the condition parameter. A condition function receives the source executor's output and returns True or False to determine whether the edge is traversed. This enables patterns like classifier-driven routing where a structured output determines which downstream branch executes.

Key considerations:

  • Condition functions receive the source output and return a boolean
  • Use Pydantic models as response_format for structured classification outputs
  • Multiple conditional edges from the same source create a switch-case pattern
  • Keep the conditions on edges from the same source mutually exclusive and exhaustive, so that exactly one evaluates to True per execution and routing stays deterministic
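
A minimal sketch of the switch-case pattern follows, assuming a classifier output with a label field. Classification, the edge list, and select_targets are hypothetical illustrations. A dataclass stands in for the Pydantic model so the example runs with the standard library alone.

```python
from dataclasses import dataclass


@dataclass
class Classification:
    """Hypothetical structured output of a classifier node."""
    label: str
    confidence: float


# Each entry pairs a target node id with a condition on the source output.
# The conditions are mutually exclusive, so at most one edge is traversed.
conditional_edges = [
    ("quarantine", lambda c: c.label == "spam"),
    ("draft_reply", lambda c: c.label == "not_spam"),
    ("manual_review", lambda c: c.label == "uncertain"),
]


def select_targets(output, edges):
    """Return the ids of every target whose condition accepts the output."""
    return [target for target, condition in edges if condition(output)]


spam_route = select_targets(Classification("spam", 0.97), conditional_edges)
unsure_route = select_targets(Classification("uncertain", 0.51), conditional_edges)
unknown_route = select_targets(Classification("other", 0.10), conditional_edges)
```

The last call shows why exhaustive conditions matter: a label that no condition covers selects no branch at all in this sketch.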

Step 4: Implement human-in-the-loop pauses

For interactive workflows, use ctx.request_info() within an executor handler to pause execution and surface a request to the application. The application presents the request to the user, collects their response, and resumes the workflow by passing the response through workflow.run(responses={request_id: user_response}). A @response_handler decorated method on the executor processes the response and continues execution.

Key considerations:

  • ctx.request_info(request_data=payload, response_type=ResponseType) pauses the executor
  • The workflow emits a "request_info" event with the request_id
  • Resume with workflow.run(responses={request_id: answer}) to provide user input
  • The @response_handler method receives the typed response and resumes processing
  • Multiple HITL interactions can occur in a single workflow run
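
The pause-and-resume cycle can be sketched as follows. ToyApprovalWorkflow is a hypothetical stand-in that only imitates the shape of the interaction (a "request_info" event carrying a request_id, then a second run call with a responses mapping). It does not use or reproduce the framework's classes.

```python
import uuid
from dataclasses import dataclass, field


@dataclass
class ToyApprovalWorkflow:
    """Hypothetical workflow that pauses for approval before emitting output."""

    pending: dict = field(default_factory=dict)  # request_id -> draft awaiting approval

    def run(self, message=None, responses=None):
        events = []
        if message is not None:
            # First phase: produce a draft, then pause by surfacing a request.
            draft = f"Draft reply to: {message}"
            request_id = str(uuid.uuid4())
            self.pending[request_id] = draft
            events.append(
                {"type": "request_info", "request_id": request_id, "data": draft}
            )
        for request_id, answer in (responses or {}).items():
            # Second phase: the response handler resumes with the user's answer.
            draft = self.pending.pop(request_id)
            output = draft if answer == "approve" else f"Rejected: {draft}"
            events.append({"type": "output", "data": output})
        return events


workflow = ToyApprovalWorkflow()
first_events = workflow.run("Where is my order?")
request_id = first_events[0]["request_id"]

# The application shows the draft to a user, collects a decision, and resumes.
second_events = workflow.run(responses={request_id: "approve"})
```

Keying the pending state by request_id is what lets several human-in-the-loop requests be outstanding at once and answered in any order.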

Step 5: Run the workflow with streaming observation

Execute the workflow with workflow.run(input, stream=True) to receive an async stream of WorkflowEvent objects. Events include executor invocations, executor completions, output updates, and request_info events. The streaming interface provides full observability into the workflow's execution progress without modifying executor code.

Key considerations:

  • Events with type "output" contain either an AgentResponseUpdate or the final workflow output
  • Events with type "executor_invoked" and "executor_completed" provide observability
  • Events with type "request_info" signal human-in-the-loop pause points
  • Checkpoint storage can be configured for durable state persistence
  • The workflow can be wrapped as an agent using .as_agent() for reuse
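
Consuming a stream of typed events can be sketched with an async generator. ToyEvent and run_stream are hypothetical. The event type strings are taken from the list above, while the field names (executor_id, data) are assumptions made for this illustration.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, AsyncIterator


@dataclass
class ToyEvent:
    """Hypothetical event record; field names are assumptions."""
    type: str
    executor_id: str
    data: Any = None


async def run_stream(steps, message) -> AsyncIterator[ToyEvent]:
    """Run steps in sequence, yielding an event before and after each one."""
    for executor_id, func in steps:
        yield ToyEvent("executor_invoked", executor_id, message)
        message = func(message)
        await asyncio.sleep(0)  # give other tasks a chance to run
        yield ToyEvent("executor_completed", executor_id, message)
    yield ToyEvent("output", steps[-1][0], message)


async def observe():
    steps = [("trim", str.strip), ("shout", str.upper)]
    seen, final = [], None
    async for event in run_stream(steps, "  hello  "):
        seen.append((event.type, event.executor_id))
        if event.type == "output":
            final = event.data
    return seen, final


event_log, final_output = asyncio.run(observe())
```

The observer filters on the event type, so logging or progress reporting can be added without touching the step functions themselves.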
