
MAF v1 — Concurrent Orchestration (Python + .NET)

Nitin Kumar Singh

Series note — Part of MAF v1: Python and .NET. Second orchestration pattern after Sequential. Next: Handoff.

Repo — Full runnable code for this chapter is at https://github.com/nitin27may/e-commerce-agents/tree/main/tutorials/13-concurrent-orchestration. Clone the repo, cd tutorials/13-concurrent-orchestration, and follow the per-language instructions below.

Why this chapter

Sequential orchestration (Chapter 12) is an assembly line: each agent adds its turn to a shared conversation, the next one reads the whole thing, output latency is the sum of every call. That’s the right shape when later agents need to see earlier work.

Concurrent orchestration is the opposite shape. Three independent reviewers give their verdicts on the same input; none of them needs to see the others’ drafts first. The only thing you wait for is whichever of them runs slowest — then an aggregator (a function that reduces the outputs of several parallel agents into a single result) combines their verdicts into one output.

This chapter builds that exact flow end-to-end: three agents — Researcher, Marketer, Legal — review one product idea in parallel, a custom aggregator synthesises their three one-sentence verdicts into a cross-functional summary, and we measure wall-clock timing to prove the LLM calls actually ran concurrently.

Jargon defined inline below: ConcurrentBuilder (Python builder), AgentWorkflowBuilder.BuildConcurrent (.NET builder), aggregator, fan-in, wall-clock timing, executor_completed event.

Prerequisites

  • Completed Chapter 12 — Sequential Orchestration.
  • Repo-root .env with working OpenAI or Azure OpenAI credentials (OPENAI_API_KEY or the AZURE_OPENAI_* set).
  • uv for Python; .NET 10 SDK for the .NET sample.

The concept

Wall-clock timing: sum vs max

Wall-clock timing is the elapsed real-world time between a request entering the workflow and the final output leaving it. It is distinct from both CPU time and cumulative LLM time: three 1.5-second LLM calls running in parallel still consume ~4.5 seconds of model compute in aggregate, but the caller only waits for the slowest one.

%%{init: {'theme':'base', 'themeVariables': {'primaryColor':'#2563eb','primaryTextColor':'#ffffff','primaryBorderColor':'#1e40af','lineColor':'#64748b','secondaryColor':'#f59e0b','tertiaryColor':'#10b981','background':'transparent'}}}%%
flowchart LR
    classDef core fill:#2563eb,stroke:#1e40af,color:#ffffff
    classDef external fill:#f59e0b,stroke:#b45309,color:#000000
    classDef success fill:#10b981,stroke:#047857,color:#ffffff
    classDef infra fill:#64748b,stroke:#334155,color:#ffffff

    subgraph SEQ["Sequential — wall-clock T1 + T2 + T3"]
        direction LR
        s0([Input]) --> s1["Researcher<br/>T1 = 1.4s"] --> s2["Marketer<br/>T2 = 1.3s"] --> s3["Legal<br/>T3 = 1.2s"] --> sOut([Total ~3.9s])
    end

    subgraph CON["Concurrent — wall-clock max(T1, T2, T3)"]
        direction LR
        c0([Input]) --> c1["Researcher<br/>T1 = 1.4s"]
        c0 --> c2["Marketer<br/>T2 = 1.3s"]
        c0 --> c3["Legal<br/>T3 = 1.2s"]
        c1 --> cAgg[[Aggregator]]
        c2 --> cAgg
        c3 --> cAgg
        cAgg --> cOut([Total ~1.5s])
    end

    class s1,s2,s3,c1,c2,c3 core
    class cAgg external
    class s0,c0 infra
    class sOut,cOut success

Same three agents, same three LLM round-trips. Sequential pays for the sum; concurrent pays for the max plus a trivial aggregator overhead.

The shape matters for product latency budgets. If each specialist takes 1.5 seconds and you have three of them, a sequential shape gives you a 4.5-second user-facing wait; a concurrent shape gives you 1.5 seconds. That’s not a micro-optimisation — it’s the difference between “snappy” and “users start retrying while the first request is still pending”.
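You can verify the sum-vs-max arithmetic without any framework at all. A minimal asyncio sketch (no MAF; the sleeps stand in for LLM round-trips):

import asyncio
import time


async def fake_agent(name: str, latency: float) -> str:
    await asyncio.sleep(latency)  # stand-in for one LLM round-trip
    return f"{name}: done"


async def main() -> None:
    agents = [("researcher", 1.4), ("marketer", 1.3), ("legal", 1.2)]

    start = time.perf_counter()
    for name, latency in agents:  # sequential: pay the sum
        await fake_agent(name, latency)
    print(f"sequential ~{time.perf_counter() - start:.1f}s")  # ~3.9s

    start = time.perf_counter()
    await asyncio.gather(*(fake_agent(n, t) for n, t in agents))  # concurrent: pay the max
    print(f"concurrent ~{time.perf_counter() - start:.1f}s")  # ~1.4s


asyncio.run(main())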

Fan-out, fan-in, and the aggregator

Concurrent orchestration is structurally a fan-out (same input sent to multiple downstream executors) followed by a fan-in (a barrier that waits for every downstream to finish, then hands their combined outputs to one collector). The collector is the aggregator — one function that reduces N agent outputs into the workflow’s single terminal output.

%%{init: {'theme':'base', 'themeVariables': {'primaryColor':'#2563eb','primaryTextColor':'#ffffff','primaryBorderColor':'#1e40af','lineColor':'#64748b','secondaryColor':'#f59e0b','tertiaryColor':'#10b981','background':'transparent'}}}%%
flowchart LR
    classDef core fill:#2563eb,stroke:#1e40af,color:#ffffff
    classDef external fill:#f59e0b,stroke:#b45309,color:#000000
    classDef success fill:#10b981,stroke:#047857,color:#ffffff
    classDef infra fill:#64748b,stroke:#334155,color:#ffffff

    input(["User idea<br/>one string"]) --> fanout{{"Fan-out<br/>same input to N agents"}}
    fanout --> researcher[Researcher]
    fanout --> marketer[Marketer]
    fanout --> legal[Legal]
    researcher --> barrier{{"Barrier<br/>wait for all N"}}
    marketer --> barrier
    legal --> barrier
    barrier --> agg[["Aggregator<br/>N lists to 1 list"]]
    agg --> output(["Workflow output<br/>synthesised summary"])

    class researcher,marketer,legal core
    class agg external
    class fanout,barrier infra
    class output success
    class input infra

The barrier is what makes fan-in safe. No agent can see another’s output; the aggregator only runs after every agent has finished or failed. Same Pregel superstep model from Chapter 09, wrapped by a convenience builder.
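The same shape in plain asyncio makes the ordering guarantee visible: asyncio.gather is the barrier, and it returns results in the order the branches were supplied, not in completion order (a sketch, not MAF code):

import asyncio


async def branch(name: str, latency: float, prompt: str) -> str:
    await asyncio.sleep(latency)  # stand-in for one LLM call
    return f"{name} verdict on {prompt!r}"


def aggregate(outputs: list[str]) -> str:
    # Fan-in collector: reduce N branch outputs to one terminal result.
    return "Cross-functional review:\n" + "\n".join(f"- {o}" for o in outputs)


async def main() -> None:
    prompt = "ultrasonic pet collar"
    # Fan-out: the same prompt goes to every branch. Barrier: gather waits
    # for all of them, and yields outputs in supply order even though the
    # fastest branch (legal) finishes first.
    outputs = await asyncio.gather(
        branch("researcher", 1.4, prompt),
        branch("marketer", 1.3, prompt),
        branch("legal", 1.2, prompt),
    )
    print(aggregate(list(outputs)))


asyncio.run(main())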

Jargon recap

  • ConcurrentBuilder (Python) — the fluent Python builder: ConcurrentBuilder(participants=[a, b, c]).with_aggregator(fn).build(). Lives in agent_framework.orchestrations.
  • AgentWorkflowBuilder.BuildConcurrent (.NET) — the equivalent static helper: AgentWorkflowBuilder.BuildConcurrent(agents, aggregator: fn). Lives in Microsoft.Agents.AI.Workflows.
  • Aggregator — a function the framework calls once every concurrent branch has produced its output. Python signature: async def(list[AgentExecutorResponse]) -> str | list[Message]. .NET signature: Func<IList<List<ChatMessage>>, List<ChatMessage>>. The return value surfaces as the workflow’s terminal output event.
  • Fan-in — the structural shape where N upstream executors feed one downstream collector. In MAF, the concurrent builder wires this for you; in Chapter 09 you built it by hand with add_fan_in_edge.
  • Wall-clock timing — elapsed real-world time, measured from outside the workflow. This chapter’s key assertion: wall-clock ≈ max of per-agent latency, not the sum.
  • executor_completed event — the Python event type emitted when an AgentExecutor inside the concurrent workflow has produced its response. Its data payload is a list[AgentExecutorResponse] containing one entry per agent that completed in this superstep. In .NET the equivalent is AgentResponseEvent (one per agent) plus the final WorkflowOutputEvent carrying the aggregator’s return value.

Code walkthrough

Source: dotnet/Program.cs. Runnable end-to-end — three real LLM calls, a custom aggregator, and wall-clock timing.

using Microsoft.Agents.AI;
using Microsoft.Agents.AI.Workflows;
using Microsoft.Extensions.AI;
using System.Text; // StringBuilder, used by the aggregator below

ChatClient chatClient = BuildChatClient(); // OpenAI or Azure OpenAI

AIAgent researcher = chatClient.AsAIAgent(
    instructions: "You are a Market Researcher. In ONE sentence, assess market fit...",
    name: "researcher");
AIAgent marketer = chatClient.AsAIAgent(
    instructions: "You are a Marketer. In ONE sentence, propose a positioning angle...",
    name: "marketer");
AIAgent legal = chatClient.AsAIAgent(
    instructions: "You are a Legal advisor. In ONE sentence, flag ONE regulatory or IP concern...",
    name: "legal");

Workflow workflow = AgentWorkflowBuilder.BuildConcurrent(
    new[] { researcher, marketer, legal },
    aggregator: SynthesizeReview);

BuildConcurrent returns a plain Workflow object — the same type WorkflowBuilder.Build() returns in Chapter 09. The difference is the DAG inside: an input adapter, N parallel AgentExecutor branches, a barrier, and your aggregator, all prewired.

The aggregator signature

.NET keeps the aggregator synchronous (Func<IList<List<ChatMessage>>, List<ChatMessage>>) — the outer list has one entry per agent, in the same order you passed them to BuildConcurrent; the inner list is that agent’s emitted messages:

/// <summary>
/// Fan-in aggregator. Runs after every concurrent branch has completed.
/// Receives one List&lt;ChatMessage&gt; per agent; returns the workflow's
/// terminal output.
/// </summary>
private static List<ChatMessage> SynthesizeReview(IList<List<ChatMessage>> perAgentMessages)
{
    var builder = new StringBuilder();
    builder.AppendLine("Cross-functional review:");

    foreach (List<ChatMessage> agentOutput in perAgentMessages)
    {
        if (agentOutput.Count == 0) continue;

        // Last assistant message = that agent's verdict.
        ChatMessage final = agentOutput[^1];
        string label = final.AuthorName ?? "agent";
        builder.Append("- ").Append(label).Append(": ").AppendLine(final.Text.Trim());
    }

    return new List<ChatMessage>
    {
        new(ChatRole.Assistant, builder.ToString().TrimEnd())
        {
            AuthorName = "concurrent-aggregator",
        },
    };
}

Running it

var messages = new List<ChatMessage> { new(ChatRole.User, "ultrasonic pet collar") };

await using StreamingRun run = await InProcessExecution.RunStreamingAsync(workflow, messages);
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));

List<ChatMessage>? aggregated = null;
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    switch (evt)
    {
        case AgentResponseEvent response:
            // One per agent as it finishes. AgentResponseUpdateEvent fires
            // for streaming tokens; pick whichever suits your UI.
            Console.WriteLine($"[{response.ExecutorId}] {response.Response.Text.Trim()}");
            break;

        case WorkflowOutputEvent output when output.Data is List<ChatMessage> list:
            aggregated = list; // the aggregator's return value
            break;
    }
}

Four things worth highlighting:

  • TurnToken(emitEvents: true) is mandatory. The concurrent workflow waits for a TurnToken before firing the branches — that’s how MAF separates “workflow is wired and listening” from “now take a turn”. Forgetting this leaves the run hanging forever with no events.
  • Two event types for agent output. AgentResponseEvent fires once per agent when it finishes; AgentResponseUpdateEvent fires per streaming token. For the aggregator demo we use the former; Chapter 03’s streaming patterns apply directly if you want the latter.
  • WorkflowOutputEvent.Data is the aggregator’s return value. The default aggregator returns a flat List<ChatMessage> with one last-message-per-agent. Our SynthesizeReview returns a one-message list containing the synthesised summary.
  • ChatMessage is Microsoft.Extensions.AI.ChatMessage, not OpenAI.Chat.ChatMessage. The two types collide in using scope when you reference both Microsoft.Extensions.AI and OpenAI.Chat; the runnable Program.cs disambiguates with a using ChatMessage = Microsoft.Extensions.AI.ChatMessage; alias.

Running all together:

cd tutorials/13-concurrent-orchestration/dotnet
dotnet run -- "AI-powered meal planner"
# Idea: AI-powered meal planner
#
# [researcher] An AI-powered meal planner addresses the needs of busy individuals...
# [marketer]   Take the guesswork out of healthy eating with our AI-powered meal planner...
# [legal]      There may be an intellectual property concern if the AI-powered meal planner...
#
# ===== Aggregated summary =====
# Cross-functional review:
# - researcher: An AI-powered meal planner addresses the needs of busy individuals...
# - marketer: Take the guesswork out of healthy eating with our AI-powered meal planner...
# - legal: There may be an intellectual property concern if the AI-powered meal planner...
#
# Wall-clock: 1.48s (three LLM calls ran in parallel)

Source: python/main.py. Three agents, one builder call, one aggregator hook.

from agent_framework import Agent
from agent_framework.orchestrations import ConcurrentBuilder

# `client` below is the OpenAI / Azure OpenAI chat client that main.py
# builds from the repo-root .env credentials.


def researcher(client):
    return Agent(client, instructions=(
        "You are a Market Researcher. In one sentence, assess the market fit "
        "of the product idea the user provides."
    ), name="researcher")


def marketer(client):
    return Agent(client, instructions=(
        "You are a Marketer. In one sentence, propose a positioning angle "
        "for the product idea the user provides."
    ), name="marketer")


def legal(client):
    return Agent(client, instructions=(
        "You are a Legal advisor. In one sentence, flag one regulatory or IP "
        "concern about the product idea the user provides."
    ), name="legal")


workflow = ConcurrentBuilder(
    participants=[researcher(client), marketer(client), legal(client)]
).build()

That’s the whole wiring step. ConcurrentBuilder accepts a list of participants (either Agent instances or Executors — anything that implements SupportsAgentRun), creates an input-adapter executor at the front, a barrier fan-in at the back, and routes each participant as its own parallel branch.

Reading the event stream

workflow.run(..., stream=True) yields events as each branch finishes. For Concurrent, the shape to watch for is executor_completed with a list[AgentExecutorResponse] payload:

import time

per_agent: dict[str, str] = {}
start = time.perf_counter()
async for event in workflow.run("ultrasonic pet collar", stream=True):
    if event.type != "executor_completed":
        continue
    payload = event.data
    if not isinstance(payload, list):
        continue
    for item in payload:
        if item.agent_response and item.executor_id in {"researcher", "marketer", "legal"}:
            per_agent[item.executor_id] = item.agent_response.text
elapsed = time.perf_counter() - start
print(f"Wall-clock: {elapsed:.2f}s")

Output (one run against real OpenAI gpt-4.1):

researcher: The ultrasonic pet collar has moderate market fit...
marketer:   Give your pet a voice and peace of mind with our...
legal:      FCC emission standards for ultrasonic devices may...

Wall-clock: 1.60s

Three LLM calls, roughly the time of one. That’s not a coincidence or noise — the workflow.run(...) coroutine schedules all three AgentExecutors into the same superstep, which the runtime dispatches with asyncio.gather under the hood.

A real aggregator, not a placeholder

By default, ConcurrentBuilder(...).build() collects every agent’s response into a list[Message] and makes that the workflow output. Fine for display, useless for downstream automation that needs a single string. Replace the default with a real aggregator:

from agent_framework import AgentExecutorResponse

async def synthesise_review(results: list[AgentExecutorResponse]) -> str:
    """Fan-in: reduce three per-agent responses into one cross-functional summary."""
    sections: list[str] = []
    for r in results:
        messages = getattr(r.agent_response, "messages", [])
        final = messages[-1].text if messages else "(no response)"
        sections.append(f"- {r.executor_id}: {final.strip()}")
    return "Cross-functional review:\n" + "\n".join(sections)


workflow = (
    ConcurrentBuilder(participants=[researcher(client), marketer(client), legal(client)])
    .with_aggregator(synthesise_review)
    .build()
)

async for event in workflow.run("ultrasonic pet collar", stream=True):
    if event.type == "output":
        print(event.data)

Output:

Cross-functional review:
- researcher: The ultrasonic pet collar has moderate market fit...
- marketer: Give your pet a voice and peace of mind with our...
- legal: FCC emission standards for ultrasonic devices may apply...

Two things worth staring at:

  • The aggregator is async. If you want an LLM-synthesised summary instead of string concatenation, call another agent inside this function — the framework will await it. The MS docs custom-aggregator example does exactly that with a “summariser” agent; a sketch of that shape follows this list.
  • The aggregator sees only final responses. r.agent_response.messages is the per-agent message list; r.full_conversation is the full history if you need the user turn too. No partial streaming tokens; the framework waits for each branch to complete before invoking the aggregator. That’s the fan-in barrier doing its job.
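A minimal sketch of that LLM-backed aggregator, reusing the `client` and `Agent` constructor from the wiring step above. The summariser instructions and the function name are our own illustration, not the MS docs' exact code, and the `.run(...)` / `.text` access assumes the same response shape the chapter's agents return:

from agent_framework import Agent, AgentExecutorResponse

summariser = Agent(client, instructions=(
    "You are a chief product officer. Combine the reviewers' one-line "
    "verdicts into a single go/no-go recommendation with one named risk."
), name="summariser")


async def llm_synthesise(results: list[AgentExecutorResponse]) -> str:
    """Fan-in: synthesise with a fourth agent instead of concatenating strings."""
    verdicts = "\n".join(
        f"- {r.executor_id}: {r.agent_response.messages[-1].text.strip()}"
        for r in results
    )
    # The framework awaits the aggregator, so this extra LLM call simply adds
    # one more round-trip to the wall-clock after the barrier releases.
    response = await summariser.run(verdicts)
    return response.text

Wire it exactly as before: .with_aggregator(llm_synthesise).build().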

Side-by-side — Python vs .NET

| Aspect | Python | .NET |
| --- | --- | --- |
| Builder entry point | `ConcurrentBuilder(participants=[...]).build()` | `AgentWorkflowBuilder.BuildConcurrent(agents, aggregator)` |
| Aggregator signature | `async def(list[AgentExecutorResponse]) -> str \| list[Message]` | `Func<IList<List<ChatMessage>>, List<ChatMessage>>` (sync) |
| Aggregator wiring | `.with_aggregator(fn).build()` | Second argument to `BuildConcurrent` |
| Per-agent output event | `executor_completed`, data is `list[AgentExecutorResponse]` | `AgentResponseEvent`, one per agent |
| Per-token streaming event | Agent's own delta events | `AgentResponseUpdateEvent` |
| Final output surface | `event.type == "output"`, `event.data` is aggregator return | `WorkflowOutputEvent.Data` is aggregator return |
| Kicking off the run | `workflow.run(input, stream=True)` | `InProcessExecution.RunStreamingAsync` + `TrySendMessageAsync(new TurnToken(emitEvents: true))` |
| Disambiguation gotcha | (none) | `ChatMessage` collides with `OpenAI.Chat.ChatMessage`; use a `using` alias |

Structurally identical. The biggest surface difference is that Python’s aggregator can be async (call another LLM inside), while .NET’s is a synchronous Func<...> — in practice, if you need an LLM inside the .NET aggregator you wrap the call in Task.Run and .GetAwaiter().GetResult(), or more cleanly you wire the aggregator as its own executor (skip the convenience builder, drop back to WorkflowBuilder from Ch09 with an explicit fan-in edge).

Gotchas

  • Parallelism is real. The three LLM calls fire concurrently. If your provider enforces concurrency limits (Azure OpenAI TPM/RPM quotas, for example), size your quota and deployment accordingly. You can concurrency-starve yourself locally just by running this demo against a small Azure deployment.
  • Aggregator argument ordering is stable. Both Python and .NET pass per-agent outputs in the same order the agents were supplied to the builder. Don’t rely on them arriving in completion order — the framework buffers at the barrier and hands them to you in insertion order.
  • Agents don’t see each other’s output. Concurrent is isolated by design. If agent B genuinely needs agent A’s output, you want Sequential (Ch12) or a hand-rolled workflow with an explicit edge.
  • Default aggregator is a list, not a string. Callers that expect a single string have to iterate WorkflowOutputEvent.Data themselves or provide a custom aggregator. The aggregator shown in this chapter is the common case for a user-facing “summary of the panel”.
  • TurnToken forgotten = hang forever (.NET). AgentWorkflowBuilder.BuildConcurrent produces a workflow that waits for an explicit turn token before dispatching. Python’s ConcurrentBuilder dispatches as soon as workflow.run(...) is called — no equivalent step.
  • Aggregator exceptions kill the whole output. If your aggregator throws, the workflow emits WorkflowErrorEvent and no WorkflowOutputEvent. Agent-level failures are reported as executor_failed / ExecutorFailedEvent but don't stop the other branches from completing; the aggregator just receives a shorter list (see the defensive sketch after this list).
  • Wall-clock measurement must wrap the whole stream. If you start the timer inside the async for/await foreach loop, you miss the workflow’s setup time; if you stop it on the first executor_completed you miss the aggregator. Start before workflow.run(...) / RunStreamingAsync(...), stop after the stream fully drains.
  • AuthorName on ChatMessage is not the agent’s name by default in every code path — inspect message.AuthorName but fall back to the executor id from the event you’re iterating. The .NET runnable in this chapter defends against that with a ?? "agent".
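A defensive variant of the chapter's aggregator that tolerates that shorter list; the expected-id bookkeeping here is our own convention, not a framework feature:

from agent_framework import AgentExecutorResponse

EXPECTED = {"researcher", "marketer", "legal"}


async def defensive_synthesise(results: list[AgentExecutorResponse]) -> str:
    """Fan-in that reports missing branches instead of crashing on them."""
    sections: list[str] = []
    for r in results:
        messages = getattr(r.agent_response, "messages", [])
        final = messages[-1].text.strip() if messages else "(no response)"
        sections.append(f"- {r.executor_id}: {final}")
    # Branches that failed never reach the aggregator; name them explicitly.
    for missing in sorted(EXPECTED - {r.executor_id for r in results}):
        sections.append(f"- {missing}: (branch failed; see executor_failed event)")
    return "Cross-functional review:\n" + "\n".join(sections)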

Tests

Python tests exercise the concurrency claim directly — one wiring test plus three real-LLM integration tests covering responses, parallel timing, and distinct perspectives:

# Python (4 tests)
source agents/.venv/bin/activate
python -m pytest tutorials/13-concurrent-orchestration/python/tests/ -v
# 4 passed

The parallel-timing test is the load-bearing one: it asserts wall-clock < 6s across three agents whose serial baseline would comfortably exceed that. If something regressed the scheduler into dispatching branches serially, this test would flip red before users noticed the roughly 3× latency regression.
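A hedged sketch of what that timing test looks like; the real one under python/tests/ may differ in fixtures, model, and threshold, and it assumes pytest-asyncio plus a `workflow` fixture built as in main.py:

import time

import pytest


@pytest.mark.asyncio
async def test_concurrent_branches_run_in_parallel(workflow):  # hypothetical fixture
    start = time.perf_counter()
    async for _ in workflow.run("ultrasonic pet collar", stream=True):
        pass  # drain the full stream; the timer must wrap setup and aggregation
    elapsed = time.perf_counter() - start
    # Three serial LLM calls would comfortably exceed 6s; parallel dispatch must not.
    assert elapsed < 6.0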

.NET:

cd tutorials/13-concurrent-orchestration/dotnet
dotnet build            # type-check and compile
dotnet run -- "<idea>"  # end-to-end run with real LLM

The .NET build pins Microsoft.Agents.AI.Workflows 1.1.0, Microsoft.Agents.AI 1.1.0, Microsoft.Agents.AI.OpenAI 1.1.0, and Azure.AI.OpenAI 2.1.0. No source generator needed — the convenience builder handles executor registration internally, so no [MessageHandler] / partial-class machinery shows up in this chapter. You get that back in the capstone (Phase 7) when we drop below BuildConcurrent to wire a bespoke aggregator executor.

How this shows up in the capstone

agents/python/workflows/pre_purchase.py today uses asyncio.gather plus a hand-rolled state dataclass to fan out to three specialists — ProductDiscovery (reviews), PricingPromotions (price history), InventoryFulfillment (stock). The Phase 7 plan (plans/refactor/08-pre-purchase-concurrent.md) replaces that by-hand parallelism with ConcurrentBuilder and a synthesising aggregator agent. The .NET parity port in agents/dotnet/src/ECommerceAgents.Shared/Workflows/PrePurchaseWorkflow.cs follows the same path with AgentWorkflowBuilder.BuildConcurrent. Both refactors delete more code than they add; the aggregator becomes the single explicit place where three sources of truth merge.
