Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:Microsoft Agent framework WorkflowBuilder Init

From Leeroopedia
Property Value
Implementation Name WorkflowBuilder Init
Framework Microsoft Agent Framework
Repository microsoft/agent-framework
Source File python/packages/core/agent_framework/_workflows/_workflow_builder.py
Line Range L41-689
Import from agent_framework import WorkflowBuilder
Type Builder class
Domains Workflow_Engine

Overview

The WorkflowBuilder is the builder class that constructs directed graphs of executors into validated, immutable Workflow instances in the Microsoft Agent Framework. It provides a fluent API for incrementally wiring executors with sequential edges, fan-out edges (one source to many targets), and fan-in edges (many sources to one target). The builder validates type compatibility between connected nodes at wiring time and produces a sealed Workflow object via build().

Code Reference

Source Location

Property Value
File python/packages/core/agent_framework/_workflows/_workflow_builder.py
Class WorkflowBuilder
Lines 41-689

Signature

class WorkflowBuilder:
    def __init__(
        self,
        max_iterations: int = DEFAULT_MAX_ITERATIONS,
        name: str | None = None,
        description: str | None = None,
        *,
        start_executor: Executor | SupportsAgentRun,
        checkpoint_storage: CheckpointStorage | None = None,
        output_executors: list[Executor | SupportsAgentRun] | None = None,
    ) -> None:
        ...

    def add_edge(
        self,
        source: Executor | SupportsAgentRun,
        target: Executor | SupportsAgentRun,
        condition: Callable | None = None,
    ) -> Self:
        ...

    def add_fan_out_edges(
        self,
        source: Executor | SupportsAgentRun,
        targets: Sequence[Executor | SupportsAgentRun],
    ) -> Self:
        ...

    def add_fan_in_edges(
        self,
        sources: Sequence[Executor | SupportsAgentRun],
        target: Executor | SupportsAgentRun,
    ) -> Self:
        ...

    def build(self) -> Workflow:
        ...

Import Statement

from agent_framework import WorkflowBuilder

Internal Wiring

The build() method assembles the following execution graph:

start_executor -> [edge wiring via add_edge / add_fan_out / add_fan_in] -> output_executors
Stage Component Description
Entry start_executor The designated entry point of the workflow graph. Receives the initial input when the workflow is run.
Sequential edges add_edge(source, target) Connects one executor to another with an optional condition for conditional branching.
Fan-out edges add_fan_out_edges(source, targets) Connects one source to multiple targets that execute concurrently.
Fan-in edges add_fan_in_edges(sources, target) Connects multiple sources to a single target that waits for all sources to complete.
Output output_executors The designated terminal executors whose results form the workflow output.
Compilation build() Validates the graph topology and type compatibility, then produces an immutable Workflow.

I/O Contract

Constructor Parameters

Parameter Type Default Description
max_iterations int DEFAULT_MAX_ITERATIONS Maximum number of iterations the workflow is allowed to execute. Acts as a safety bound to prevent infinite loops in cyclic graphs.
name None None Optional human-readable name for the workflow, used in tracing and logging.
description None None Optional description of the workflow's purpose and behavior.
start_executor SupportsAgentRun (required) The entry-point executor that receives the initial input when the workflow is executed. Must be an Executor or an object conforming to SupportsAgentRun.
checkpoint_storage None None Optional storage backend for persisting workflow checkpoints. Enables resumption of interrupted workflows.
output_executors SupportsAgentRun] | None None Optional list of terminal executors whose outputs form the final workflow result. If not specified, the framework infers output executors from the graph topology.

add_edge Parameters

Parameter Type Default Description
source SupportsAgentRun (required) The upstream executor whose output feeds into the target.
target SupportsAgentRun (required) The downstream executor that receives the source's output.
condition None None Optional callable that receives the source output and returns a boolean. The edge is only traversed when the condition returns True.

add_fan_out_edges Parameters

Parameter Type Default Description
source SupportsAgentRun (required) The single upstream executor whose output is broadcast to all targets.
targets SupportsAgentRun] (required) The downstream executors that each receive the source's output and execute concurrently.

add_fan_in_edges Parameters

Parameter Type Default Description
sources SupportsAgentRun] (required) The upstream executors whose outputs are collected and forwarded to the target.
target SupportsAgentRun (required) The single downstream executor that waits for all sources to complete before executing.

Output

Method Return Type Description
__init__() WorkflowBuilder A new builder instance ready for edge wiring.
add_edge() Self Returns self for fluent method chaining.
add_fan_out_edges() Self Returns self for fluent method chaining.
add_fan_in_edges() Self Returns self for fluent method chaining.
build() Workflow An immutable, validated workflow instance ready for execution via workflow.run().

Usage Examples

Linear Pipeline

from agent_framework import WorkflowBuilder

workflow = (
    WorkflowBuilder(start_executor=step1)
    .add_edge(step1, step2)
    .add_edge(step2, step3)
    .build()
)

result = await workflow.run("Process this input")

Fan-Out / Fan-In Diamond

from agent_framework import WorkflowBuilder

workflow = (
    WorkflowBuilder(start_executor=dispatcher)
    .add_fan_out_edges(dispatcher, [analyst_a, analyst_b, analyst_c])
    .add_fan_in_edges([analyst_a, analyst_b, analyst_c], aggregator)
    .build()
)

result = await workflow.run("Analyze quarterly results")

Conditional Branching

from agent_framework import WorkflowBuilder

workflow = (
    WorkflowBuilder(start_executor=classifier)
    .add_edge(classifier, positive_handler, condition=lambda out: out.label == "positive")
    .add_edge(classifier, negative_handler, condition=lambda out: out.label == "negative")
    .build()
)

result = await workflow.run("Classify this sentiment")

Named Workflow with Checkpoint Storage

from agent_framework import WorkflowBuilder

workflow = (
    WorkflowBuilder(
        start_executor=intake,
        name="document-processing-pipeline",
        description="Processes documents through extraction, classification, and archival.",
        checkpoint_storage=my_checkpoint_store,
        output_executors=[archiver],
    )
    .add_edge(intake, extractor)
    .add_fan_out_edges(extractor, [classifier, summarizer])
    .add_fan_in_edges([classifier, summarizer], archiver)
    .build()
)

result = await workflow.run("Process uploaded document")

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment