| Property |
Value
|
| Implementation Name |
ConcurrentBuilder Init
|
| Framework |
Microsoft Agent Framework
|
| Repository |
microsoft/agent-framework
|
| Source File |
python/packages/orchestrations/agent_framework_orchestrations/_concurrent.py
|
| Line Range |
L183-421
|
| Import |
from agent_framework.orchestrations import ConcurrentBuilder
|
| Type |
Builder class
|
Overview
The ConcurrentBuilder is the builder class that constructs concurrent multi-agent workflows in the Microsoft Agent Framework. It implements the fan-out/fan-in orchestration pattern: all registered participants receive the same input prompt simultaneously, execute in parallel, and their results are aggregated through a configurable aggregator. The builder exposes a fluent API for configuring participants, optional checkpoint storage, intermediate output capture, and the aggregation strategy before producing a finalized Workflow instance.
Code Reference
Source Location
| Property |
Value
|
| File |
python/packages/orchestrations/agent_framework_orchestrations/_concurrent.py
|
| Class |
ConcurrentBuilder
|
| Lines |
183-421
|
Signature
class ConcurrentBuilder:
def __init__(
self,
*,
participants: Sequence[SupportsAgentRun | Executor],
checkpoint_storage: CheckpointStorage | None = None,
intermediate_outputs: bool = False,
) -> None:
...
def with_aggregator(
self,
aggregator: Executor
| Callable[[list[AgentExecutorResponse]], Any]
| Callable[[list[AgentExecutorResponse], WorkflowContext], Any],
) -> "ConcurrentBuilder":
...
def build(self) -> Workflow:
...
Import Statement
from agent_framework.orchestrations import ConcurrentBuilder
Internal Wiring
The build() method assembles the following execution pipeline:
input -> Dispatcher -> [participant1, ..., participantN] -> Aggregator -> output
| Stage |
Component |
Description
|
| Input |
Prompt string or message |
The user-provided input that initiates the workflow
|
| Dispatcher |
Internal fan-out node |
Broadcasts the input to all registered participants simultaneously
|
| Participants |
Executor (parallel) |
Each participant processes the input independently and concurrently
|
| Aggregator |
Callable |
Combines all participant responses into a single output
|
| Output |
Aggregated result |
The final merged result from the aggregation step
|
I/O Contract
Constructor Parameters
| Parameter |
Type |
Default |
Description
|
participants |
Executor] |
(required) |
The agents or executors that will process the input in parallel. Each participant receives the full input prompt and runs independently.
|
checkpoint_storage |
None |
None |
Optional storage backend for persisting workflow checkpoints. Enables resumption of interrupted workflows.
|
intermediate_outputs |
bool |
False |
When True, individual participant responses are included in the workflow output alongside the aggregated result.
|
with_aggregator Parameters
| Parameter |
Type |
Description
|
aggregator |
Callable[[list[AgentExecutorResponse]], Any] | Callable[[list[AgentExecutorResponse], WorkflowContext], Any] |
The aggregation strategy. An Executor can be an LLM-based agent that synthesizes results. A simple callable receives the list of responses. A context-aware callable also receives the WorkflowContext.
|
Output
| Method |
Return Type |
Description
|
with_aggregator() |
ConcurrentBuilder |
Returns self for fluent chaining.
|
build() |
Workflow |
A compiled workflow instance ready for execution via workflow.run().
|
Usage Examples
Minimal Concurrent Workflow
from agent_framework.orchestrations import ConcurrentBuilder
# Three specialized agents analyze the same prompt in parallel
workflow = ConcurrentBuilder(participants=[researcher, marketer, legal]).build()
result = await workflow.run("Launch plan for new product")
Concurrent Workflow with LLM Aggregator
from agent_framework.orchestrations import ConcurrentBuilder
# Use an LLM-based aggregator to synthesize perspectives
workflow = (
ConcurrentBuilder(participants=[researcher, marketer, legal])
.with_aggregator(synthesis_agent)
.build()
)
result = await workflow.run("Launch plan for new product")
Concurrent Workflow with Custom Aggregation Function
from agent_framework.orchestrations import ConcurrentBuilder
def merge_responses(responses: list) -> dict:
"""Combine all participant outputs into a structured report."""
return {
"perspectives": [r.output for r in responses],
"participant_count": len(responses),
}
workflow = (
ConcurrentBuilder(
participants=[researcher, marketer, legal],
intermediate_outputs=True,
)
.with_aggregator(merge_responses)
.build()
)
result = await workflow.run("Launch plan for new product")
Concurrent Workflow with Checkpoint Storage
from agent_framework.orchestrations import ConcurrentBuilder
workflow = (
ConcurrentBuilder(
participants=[researcher, marketer, legal],
checkpoint_storage=my_checkpoint_store,
)
.with_aggregator(synthesis_agent)
.build()
)
# Workflow can be resumed if interrupted
result = await workflow.run("Launch plan for new product")
Related Pages