Implementation:Microsoft Agent framework WorkflowBuilder Init
| Property | Value |
|---|---|
| Implementation Name | WorkflowBuilder Init |
| Framework | Microsoft Agent Framework |
| Repository | microsoft/agent-framework |
| Source File | python/packages/core/agent_framework/_workflows/_workflow_builder.py |
| Line Range | L41-689 |
| Import | from agent_framework import WorkflowBuilder |
| Type | Builder class |
| Domains | Workflow_Engine |
Overview
WorkflowBuilder is the builder class in the Microsoft Agent Framework that assembles executors into a directed graph and produces a validated, immutable Workflow instance. Its fluent API wires executors incrementally using sequential edges (one source to one target, optionally conditional), fan-out edges (one source to many targets), and fan-in edges (many sources to one target). Calling build() validates the graph topology and the type compatibility between connected nodes, then returns the sealed Workflow object.
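The builder pattern described above can be illustrated without the framework itself. The following is a minimal sketch, not the framework's implementation: `ToyWorkflowBuilder` and `ToyWorkflow` are hypothetical names, nodes are plain strings, and the reachability check merely stands in for whatever topology validation the real build() performs.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class ToyWorkflow:
    """Hypothetical sealed result: a start node plus an immutable edge tuple."""

    start: str
    edges: tuple


class ToyWorkflowBuilder:
    """Hypothetical stand-in for the builder pattern; not the framework class."""

    def __init__(self, start: str) -> None:
        self._start = start
        self._edges: list = []

    def add_edge(self, source: str, target: str) -> ToyWorkflowBuilder:
        self._edges.append((source, target))
        return self  # returning self is what enables fluent chaining

    def add_fan_out_edges(self, source: str, targets: list) -> ToyWorkflowBuilder:
        for target in targets:
            self._edges.append((source, target))
        return self

    def add_fan_in_edges(self, sources: list, target: str) -> ToyWorkflowBuilder:
        for source in sources:
            self._edges.append((source, target))
        return self

    def build(self) -> ToyWorkflow:
        # Validation is deferred to build(): every node must be reachable
        # from the start node (an assumed, simplified topology check).
        nodes = {node for edge in self._edges for node in edge} | {self._start}
        reachable = {self._start}
        frontier = [self._start]
        while frontier:
            current = frontier.pop()
            for source, target in self._edges:
                if source == current and target not in reachable:
                    reachable.add(target)
                    frontier.append(target)
        unreachable = nodes - reachable
        if unreachable:
            raise ValueError(f"unreachable nodes: {sorted(unreachable)}")
        # Freezing the edge list into a tuple seals the result.
        return ToyWorkflow(self._start, tuple(self._edges))


workflow = (
    ToyWorkflowBuilder("dispatcher")
    .add_fan_out_edges("dispatcher", ["analyst_a", "analyst_b"])
    .add_fan_in_edges(["analyst_a", "analyst_b"], "aggregator")
    .build()
)
```

Keeping mutation on the builder and returning a frozen object from build() means a finished workflow cannot be rewired by accident.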
Code Reference
Source Location
| Property | Value |
|---|---|
| File | python/packages/core/agent_framework/_workflows/_workflow_builder.py |
| Class | WorkflowBuilder |
| Lines | 41-689 |
Signature
class WorkflowBuilder:
    def __init__(
        self,
        max_iterations: int = DEFAULT_MAX_ITERATIONS,
        name: str | None = None,
        description: str | None = None,
        *,
        start_executor: Executor | SupportsAgentRun,
        checkpoint_storage: CheckpointStorage | None = None,
        output_executors: list[Executor | SupportsAgentRun] | None = None,
    ) -> None:
        ...

    def add_edge(
        self,
        source: Executor | SupportsAgentRun,
        target: Executor | SupportsAgentRun,
        condition: Callable | None = None,
    ) -> Self:
        ...

    def add_fan_out_edges(
        self,
        source: Executor | SupportsAgentRun,
        targets: Sequence[Executor | SupportsAgentRun],
    ) -> Self:
        ...

    def add_fan_in_edges(
        self,
        sources: Sequence[Executor | SupportsAgentRun],
        target: Executor | SupportsAgentRun,
    ) -> Self:
        ...

    def build(self) -> Workflow:
        ...
Import Statement
from agent_framework import WorkflowBuilder
Internal Wiring
The build() method assembles the following execution graph:
start_executor -> [edge wiring via add_edge / add_fan_out_edges / add_fan_in_edges] -> output_executors
| Stage | Component | Description |
|---|---|---|
| Entry | start_executor | The designated entry point of the workflow graph. Receives the initial input when the workflow is run. |
| Sequential edges | add_edge(source, target) | Connects one executor to another with an optional condition for conditional branching. |
| Fan-out edges | add_fan_out_edges(source, targets) | Connects one source to multiple targets that execute concurrently. |
| Fan-in edges | add_fan_in_edges(sources, target) | Connects multiple sources to a single target that waits for all sources to complete. |
| Output | output_executors | The designated terminal executors whose results form the workflow output. |
| Compilation | build() | Validates the graph topology and type compatibility, then produces an immutable Workflow. |
I/O Contract
Constructor Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| max_iterations | int | DEFAULT_MAX_ITERATIONS | Maximum number of iterations the workflow is allowed to execute. Acts as a safety bound to prevent infinite loops in cyclic graphs. |
| name | str \| None | None | Optional human-readable name for the workflow, used in tracing and logging. |
| description | str \| None | None | Optional description of the workflow's purpose and behavior. |
| start_executor | Executor \| SupportsAgentRun | (required) | The entry-point executor that receives the initial input when the workflow is executed. Must be an Executor or an object conforming to SupportsAgentRun. Keyword-only. |
| checkpoint_storage | CheckpointStorage \| None | None | Optional storage backend for persisting workflow checkpoints. Enables resumption of interrupted workflows. Keyword-only. |
| output_executors | list[Executor \| SupportsAgentRun] \| None | None | Optional list of terminal executors whose outputs form the final workflow result. If not specified, the framework infers output executors from the graph topology. Keyword-only. |
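The role of max_iterations as a safety bound can be sketched with a toy scheduler. This is an illustration under stated assumptions, not the framework's execution engine: `run_bounded` is a hypothetical function, nodes are plain strings, and one "iteration" is assumed to mean one round in which every currently active node runs.

```python
from __future__ import annotations


def run_bounded(edges: dict, start: str, max_iterations: int) -> list:
    """Run nodes round by round; fail once the iteration bound is reached."""
    visited_order = []
    frontier = [start]
    iterations = 0
    while frontier:
        if iterations >= max_iterations:
            # Without this bound, a cyclic graph would loop forever.
            raise RuntimeError(f"exceeded max_iterations={max_iterations}")
        visited_order.extend(frontier)
        # Activate every successor of the nodes that just ran.
        frontier = [t for s in frontier for t in edges.get(s, [])]
        iterations += 1
    return visited_order


# An acyclic graph finishes well within the bound.
acyclic_order = run_bounded({"a": ["b"], "b": ["c"]}, "a", max_iterations=10)

# A two-node cycle never empties the frontier, so the bound trips.
try:
    run_bounded({"a": ["b"], "b": ["a"]}, "a", max_iterations=5)
    cycle_stopped = False
except RuntimeError:
    cycle_stopped = True
```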
add_edge Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| source | Executor \| SupportsAgentRun | (required) | The upstream executor whose output feeds into the target. |
| target | Executor \| SupportsAgentRun | (required) | The downstream executor that receives the source's output. |
| condition | Callable \| None | None | Optional callable that receives the source output and returns a boolean. The edge is only traversed when the condition returns True. |
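The condition semantics described above (an edge is traversed only when its predicate returns True, and an edge without a predicate is always traversed) can be sketched as follows. `ToyEdge`, `next_targets`, and `Classification` are hypothetical names used for illustration only; the framework's internal edge representation may differ.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class ToyEdge:
    """Hypothetical edge record: source, target, optional predicate."""

    source: str
    target: str
    condition: Optional[Callable[[Any], bool]] = None


@dataclass
class Classification:
    """Hypothetical output type produced by the classifier node."""

    label: str


def next_targets(edges: list, source: str, output: Any) -> list:
    """Return the targets whose edge from `source` accepts `output`."""
    return [
        edge.target
        for edge in edges
        if edge.source == source
        and (edge.condition is None or edge.condition(output))
    ]


edges = [
    ToyEdge("classifier", "positive_handler", lambda out: out.label == "positive"),
    ToyEdge("classifier", "negative_handler", lambda out: out.label == "negative"),
    ToyEdge("classifier", "audit_log"),  # unconditional: always traversed
]

selected = next_targets(edges, "classifier", Classification("positive"))
```

Note that an output matching no condition (for example a "neutral" label here) reaches only the unconditional target, so conditional branches should cover every case that needs handling.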
add_fan_out_edges Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| source | Executor \| SupportsAgentRun | (required) | The single upstream executor whose output is broadcast to all targets. |
| targets | Sequence[Executor \| SupportsAgentRun] | (required) | The downstream executors that each receive the source's output and execute concurrently. |
add_fan_in_edges Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| sources | Sequence[Executor \| SupportsAgentRun] | (required) | The upstream executors whose outputs are collected and forwarded to the target. |
| target | Executor \| SupportsAgentRun | (required) | The single downstream executor that waits for all sources to complete before executing. |
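The fan-out and fan-in behavior (broadcast to concurrent targets, then wait for all sources before the target runs) resembles the standard asyncio gather pattern. The sketch below uses only the standard library; `analyst` and `fan_out_fan_in` are hypothetical coroutines, and delivering the collected outputs as a list is an assumption rather than a documented detail of the framework.

```python
import asyncio


async def analyst(name: str, message: str) -> str:
    """Hypothetical worker: yields control once, then tags the message."""
    await asyncio.sleep(0)
    return f"{name}:{message}"


async def fan_out_fan_in(message: str, analyst_names: list) -> list:
    # Fan-out: every analyst receives the same message and runs concurrently.
    results = await asyncio.gather(
        *(analyst(name, message) for name in analyst_names)
    )
    # Fan-in: gather returns only after all analysts finish, and the
    # result order matches the order of the inputs.
    return list(results)


collected = asyncio.run(fan_out_fan_in("Q3", ["analyst_a", "analyst_b"]))
```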
Output
| Method | Return Type | Description |
|---|---|---|
| WorkflowBuilder(...) | WorkflowBuilder | A new builder instance ready for edge wiring. (`__init__` itself returns None.) |
| add_edge() | Self | Returns self for fluent method chaining. |
| add_fan_out_edges() | Self | Returns self for fluent method chaining. |
| add_fan_in_edges() | Self | Returns self for fluent method chaining. |
| build() | Workflow | An immutable, validated workflow instance ready for execution via workflow.run(). |
Usage Examples
Linear Pipeline
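from agent_framework import WorkflowBuilder

# step1, step2, and step3 are assumed to be executors (or agents) defined elsewhere.
workflow = (
    WorkflowBuilder(start_executor=step1)
    .add_edge(step1, step2)
    .add_edge(step2, step3)
    .build()
)

# Must be awaited from inside an async function.
result = await workflow.run("Process this input")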
Fan-Out / Fan-In Diamond
from agent_framework import WorkflowBuilder

# dispatcher, the three analysts, and aggregator are assumed to be defined elsewhere.
workflow = (
    WorkflowBuilder(start_executor=dispatcher)
    .add_fan_out_edges(dispatcher, [analyst_a, analyst_b, analyst_c])
    .add_fan_in_edges([analyst_a, analyst_b, analyst_c], aggregator)
    .build()
)

# Must be awaited from inside an async function.
result = await workflow.run("Analyze quarterly results")
Conditional Branching
from agent_framework import WorkflowBuilder

# classifier is assumed to emit an object with a `label` attribute.
workflow = (
    WorkflowBuilder(start_executor=classifier)
    .add_edge(classifier, positive_handler, condition=lambda out: out.label == "positive")
    .add_edge(classifier, negative_handler, condition=lambda out: out.label == "negative")
    .build()
)

# Must be awaited from inside an async function.
result = await workflow.run("Classify this sentiment")
Named Workflow with Checkpoint Storage
from agent_framework import WorkflowBuilder

# The executors and my_checkpoint_store are assumed to be defined elsewhere.
workflow = (
    WorkflowBuilder(
        start_executor=intake,
        name="document-processing-pipeline",
        description="Processes documents through extraction, classification, and archival.",
        checkpoint_storage=my_checkpoint_store,
        output_executors=[archiver],
    )
    .add_edge(intake, extractor)
    .add_fan_out_edges(extractor, [classifier, summarizer])
    .add_fan_in_edges([classifier, summarizer], archiver)
    .build()
)

# Must be awaited from inside an async function.
result = await workflow.run("Process uploaded document")