Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:Microsoft Agent framework ConcurrentBuilder Init

From Leeroopedia
Revision as of 11:31, 16 February 2026 by Admin (talk | contribs) (Auto-imported from implementations/Microsoft_Agent_framework_ConcurrentBuilder_Init.md)
(diff) ← Older revision | Latest revision (diff) | Newer revision → (diff)
Property Value
Implementation Name ConcurrentBuilder Init
Framework Microsoft Agent Framework
Repository microsoft/agent-framework
Source File python/packages/orchestrations/agent_framework_orchestrations/_concurrent.py
Line Range L183-421
Import from agent_framework.orchestrations import ConcurrentBuilder
Type Builder class

Overview

The ConcurrentBuilder is the builder class that constructs concurrent multi-agent workflows in the Microsoft Agent Framework. It implements the fan-out/fan-in orchestration pattern: all registered participants receive the same input prompt simultaneously, execute in parallel, and their results are aggregated through a configurable aggregator. The builder exposes a fluent API for configuring participants, optional checkpoint storage, intermediate output capture, and the aggregation strategy before producing a finalized Workflow instance.

Code Reference

Source Location

Property Value
File python/packages/orchestrations/agent_framework_orchestrations/_concurrent.py
Class ConcurrentBuilder
Lines 183-421

Signature

class ConcurrentBuilder:
    def __init__(
        self,
        *,
        participants: Sequence[SupportsAgentRun | Executor],
        checkpoint_storage: CheckpointStorage | None = None,
        intermediate_outputs: bool = False,
    ) -> None:
        ...

    def with_aggregator(
        self,
        aggregator: Executor
        | Callable[[list[AgentExecutorResponse]], Any]
        | Callable[[list[AgentExecutorResponse], WorkflowContext], Any],
    ) -> "ConcurrentBuilder":
        ...

    def build(self) -> Workflow:
        ...

Import Statement

from agent_framework.orchestrations import ConcurrentBuilder

Internal Wiring

The build() method assembles the following execution pipeline:

input -> Dispatcher -> [participant1, ..., participantN] -> Aggregator -> output
Stage Component Description
Input Prompt string or message The user-provided input that initiates the workflow
Dispatcher Internal fan-out node Broadcasts the input to all registered participants simultaneously
Participants Executor (parallel) Each participant processes the input independently and concurrently
Aggregator Callable Combines all participant responses into a single output
Output Aggregated result The final merged result from the aggregation step

I/O Contract

Constructor Parameters

Parameter Type Default Description
participants Executor] (required) The agents or executors that will process the input in parallel. Each participant receives the full input prompt and runs independently.
checkpoint_storage None None Optional storage backend for persisting workflow checkpoints. Enables resumption of interrupted workflows.
intermediate_outputs bool False When True, individual participant responses are included in the workflow output alongside the aggregated result.

with_aggregator Parameters

Parameter Type Description
aggregator Callable[[list[AgentExecutorResponse]], Any] | Callable[[list[AgentExecutorResponse], WorkflowContext], Any] The aggregation strategy. An Executor can be an LLM-based agent that synthesizes results. A simple callable receives the list of responses. A context-aware callable also receives the WorkflowContext.

Output

Method Return Type Description
with_aggregator() ConcurrentBuilder Returns self for fluent chaining.
build() Workflow A compiled workflow instance ready for execution via workflow.run().

Usage Examples

Minimal Concurrent Workflow

from agent_framework.orchestrations import ConcurrentBuilder

# Three specialized agents analyze the same prompt in parallel
workflow = ConcurrentBuilder(participants=[researcher, marketer, legal]).build()
result = await workflow.run("Launch plan for new product")

Concurrent Workflow with LLM Aggregator

from agent_framework.orchestrations import ConcurrentBuilder

# Use an LLM-based aggregator to synthesize perspectives
workflow = (
    ConcurrentBuilder(participants=[researcher, marketer, legal])
    .with_aggregator(synthesis_agent)
    .build()
)

result = await workflow.run("Launch plan for new product")

Concurrent Workflow with Custom Aggregation Function

from agent_framework.orchestrations import ConcurrentBuilder

def merge_responses(responses: list) -> dict:
    """Combine all participant outputs into a structured report."""
    return {
        "perspectives": [r.output for r in responses],
        "participant_count": len(responses),
    }

workflow = (
    ConcurrentBuilder(
        participants=[researcher, marketer, legal],
        intermediate_outputs=True,
    )
    .with_aggregator(merge_responses)
    .build()
)

result = await workflow.run("Launch plan for new product")

Concurrent Workflow with Checkpoint Storage

from agent_framework.orchestrations import ConcurrentBuilder

workflow = (
    ConcurrentBuilder(
        participants=[researcher, marketer, legal],
        checkpoint_storage=my_checkpoint_store,
    )
    .with_aggregator(synthesis_agent)
    .build()
)

# Workflow can be resumed if interrupted
result = await workflow.run("Launch plan for new product")

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment