Series note — Part of MAF v1: Python and .NET. Second orchestration pattern after Sequential. Next: Handoff.
Repo: Full runnable code for this chapter is at https://github.com/nitin27may/e-commerce-agents/tree/main/tutorials/13-concurrent-orchestration. Clone the repo, `cd tutorials/13-concurrent-orchestration`, and follow the per-language instructions below.
## Why this chapter
Sequential orchestration (Chapter 12) is an assembly line: each agent adds its turn to a shared conversation, the next one reads the whole thing, and output latency is the sum of every call. That's the right shape when later agents need to see earlier work.
Concurrent orchestration is the opposite shape. Three independent reviewers give their verdicts on the same input; none of them needs to see the others’ drafts first. The only thing you wait for is whichever of them runs slowest — then an aggregator (a function that reduces the outputs of several parallel agents into a single result) combines their verdicts into one output.
This chapter builds that exact flow end-to-end: three agents — Researcher, Marketer, Legal — review one product idea in parallel, a custom aggregator synthesises their three one-sentence verdicts into a cross-functional summary, and we measure wall-clock timing to prove the LLM calls actually ran concurrently.
Jargon defined inline below: `ConcurrentBuilder` (Python builder), `AgentWorkflowBuilder.BuildConcurrent` (.NET builder), aggregator, fan-in, wall-clock timing, `executor_completed` event.
## Prerequisites
- Completed Chapter 12 — Sequential Orchestration.
- Repo-root `.env` with working OpenAI or Azure OpenAI credentials (`OPENAI_API_KEY` or the `AZURE_OPENAI_*` set).
- `uv` for Python; .NET 10 SDK for the .NET sample.
## The concept
### Wall-clock timing: sum vs max
Wall-clock timing is the elapsed real-world time between a request entering the workflow and the final output leaving it. It is distinct from CPU time or cumulative LLM time: three 1.5-second LLM calls running in parallel still consume ~4.5 seconds of model compute in aggregate, but the caller only waits for the slowest one.
[Figure: Sequential vs Concurrent. Sequential: Input, then Researcher (T1 = 1.4s), Marketer (T2 = 1.3s) and Legal (T3 = 1.2s) one after another; total ~3.9s. Concurrent, wall-clock max(T1, T2, T3): Input fans out to Researcher (1.4s), Marketer (1.3s) and Legal (1.2s) in parallel, then an Aggregator; total ~1.5s.]
Same three agents, same three LLM round-trips. Sequential pays for the sum; concurrent pays for the max plus a trivial aggregator overhead.
The shape matters for product latency budgets. If each specialist takes 1.5 seconds and you have three of them, a sequential shape gives you a 4.5-second user-facing wait; a concurrent shape gives you 1.5 seconds. That’s not a micro-optimisation — it’s the difference between “snappy” and “users start retrying while the first request is still pending”.
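A quick way to see sum versus max without calling any LLM is to replace each agent with `asyncio.sleep`. The sketch below is plain asyncio, not MAF: `fake_agent`, `run_sequential` and `run_concurrent` are hypothetical names, and the latencies are the figure's 1.4s / 1.3s / 1.2s scaled down by 10x so it runs quickly.

```python
import asyncio
import time

# Per-agent latencies in seconds: the figure's 1.4s / 1.3s / 1.2s scaled down 10x.
LATENCIES = {"researcher": 0.14, "marketer": 0.13, "legal": 0.12}


async def fake_agent(name: str, delay: float) -> str:
    """Hypothetical stand-in for one LLM round-trip."""
    await asyncio.sleep(delay)
    return f"{name}: verdict"


async def run_sequential() -> float:
    start = time.perf_counter()
    for name, delay in LATENCIES.items():
        await fake_agent(name, delay)  # each call starts only after the previous one returns
    return time.perf_counter() - start


async def run_concurrent() -> float:
    start = time.perf_counter()
    # All three calls are in flight at once; gather returns when the slowest finishes.
    await asyncio.gather(*(fake_agent(name, delay) for name, delay in LATENCIES.items()))
    return time.perf_counter() - start


if __name__ == "__main__":
    sequential = asyncio.run(run_sequential())
    concurrent = asyncio.run(run_concurrent())
    print(f"sequential {sequential:.2f}s (~sum), concurrent {concurrent:.2f}s (~max)")
```

Expect the sequential run to land near 0.39s (the sum) and the concurrent one near 0.14s (the max), plus a little scheduler noise.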
### Fan-out, fan-in, and the aggregator
Concurrent orchestration is structurally a fan-out (same input sent to multiple downstream executors) followed by a fan-in (a barrier that waits for every downstream to finish, then hands their combined outputs to one collector). The collector is the aggregator — one function that reduces N agent outputs into the workflow’s single terminal output.
[Figure: Input (one string) → Fan-out (same input to N agents) → Researcher, Marketer and Legal in parallel → Barrier (wait for all N) → Aggregator (N lists to 1 list) → Workflow output (synthesised summary).]
The barrier is what makes fan-in safe. No agent can see another’s output; the aggregator only runs after every agent has finished or failed. Same Pregel superstep model from Chapter 09, wrapped by a convenience builder.
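To make the barrier concrete, here is a toy model of superstep execution in plain asyncio. It is a sketch under stated assumptions, not MAF's runner: each hypothetical executor returns `(target, message)` pairs, messages produced in one superstep are delivered only in the next, and `asyncio.gather` is the barrier that closes each superstep. Because all three agents run in the same superstep, the aggregator receives every verdict at once, in dispatch order rather than finish order.

```python
import asyncio
from typing import Awaitable, Callable

# Hypothetical executor shape: takes this superstep's inbox, returns (target, message)
# pairs that will be delivered in the NEXT superstep.
Executor = Callable[[list[str]], Awaitable[list[tuple[str, str]]]]
AGENTS = ["researcher", "marketer", "legal"]


async def dispatcher(inbox: list[str]) -> list[tuple[str, str]]:
    return [(agent, inbox[0]) for agent in AGENTS]  # fan-out: same input to every agent


def make_agent(name: str, delay: float) -> Executor:
    async def agent(inbox: list[str]) -> list[tuple[str, str]]:
        await asyncio.sleep(delay)  # stand-in for an LLM call
        return [("aggregator", f"{name}: verdict on {inbox[0]}")]
    return agent


async def aggregator(inbox: list[str]) -> list[tuple[str, str]]:
    # Fan-in: runs once, after every branch's output has been delivered.
    return [("output", "Cross-functional review:\n" + "\n".join(inbox))]


async def run_supersteps(executors: dict[str, Executor], start: str, payload: str) -> tuple[str, list[list[str]]]:
    pending: dict[str, list[str]] = {start: [payload]}
    log: list[list[str]] = []  # which executors ran in each superstep
    while pending and "output" not in pending:
        step = list(pending.items())
        log.append([name for name, _ in step])
        # Everything with mail runs concurrently; gather is the end-of-superstep barrier.
        results = await asyncio.gather(*(executors[name](inbox) for name, inbox in step))
        pending = {}
        for outbound in results:
            for target, message in outbound:
                pending.setdefault(target, []).append(message)
    return pending.get("output", [""])[0], log


EXECUTORS: dict[str, Executor] = {
    "input": dispatcher,
    "researcher": make_agent("researcher", 0.03),  # slowest
    "marketer": make_agent("marketer", 0.01),      # fastest
    "legal": make_agent("legal", 0.02),
    "aggregator": aggregator,
}

if __name__ == "__main__":
    output, log = asyncio.run(run_supersteps(EXECUTORS, "input", "ultrasonic pet collar"))
    print(log)
    print(output)
```

The log shows three supersteps: `input`, then the three agents together, then `aggregator`. The marketer finishes first, yet its verdict is still listed second, because results are collected in dispatch order.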
### Jargon recap
- `ConcurrentBuilder` (Python): the fluent Python builder, `ConcurrentBuilder(participants=[a, b, c]).with_aggregator(fn).build()`. Lives in `agent_framework.orchestrations`.
- `AgentWorkflowBuilder.BuildConcurrent` (.NET): the equivalent static helper, `AgentWorkflowBuilder.BuildConcurrent(agents, aggregator: fn)`. Lives in `Microsoft.Agents.AI.Workflows`.
- Aggregator: a function the framework calls once every concurrent branch has produced its output. Python signature: `async def(list[AgentExecutorResponse]) -> str | list[Message]`. .NET signature: `Func<IList<List<ChatMessage>>, List<ChatMessage>>`. The return value surfaces as the workflow's terminal output event.
- Fan-in: the structural shape where N upstream executors feed one downstream collector. In MAF, the concurrent builder wires this for you; in Chapter 09 you built it by hand with `add_fan_in_edge`.
- Wall-clock timing: elapsed real-world time, measured from outside the workflow. This chapter's key assertion: wall-clock ≈ max of per-agent latency, not the sum.
- `executor_completed` event: the Python event type emitted when an `AgentExecutor` inside the concurrent workflow has produced its response. Its `data` payload is a `list[AgentExecutorResponse]` containing one entry per agent that completed in this superstep. In .NET the equivalent is `AgentResponseEvent` (one per agent) plus the final `WorkflowOutputEvent` carrying the aggregator's return value.
## Code walkthrough
Source: `dotnet/Program.cs`. Runnable end-to-end: three real LLM calls, a custom aggregator, and wall-clock timing.
```csharp
using System.Text;                                        // StringBuilder, used by SynthesizeReview
using Microsoft.Agents.AI;
using Microsoft.Agents.AI.Workflows;
using Microsoft.Extensions.AI;
using OpenAI.Chat;                                        // ChatClient
using ChatMessage = Microsoft.Extensions.AI.ChatMessage;  // avoid the clash with OpenAI.Chat.ChatMessage

ChatClient chatClient = BuildChatClient(); // OpenAI or Azure OpenAI (helper in the full Program.cs)

AIAgent researcher = chatClient.AsAIAgent(
    instructions: "You are a Market Researcher. In ONE sentence, assess market fit...",
    name: "researcher");
AIAgent marketer = chatClient.AsAIAgent(
    instructions: "You are a Marketer. In ONE sentence, propose a positioning angle...",
    name: "marketer");
AIAgent legal = chatClient.AsAIAgent(
    instructions: "You are a Legal advisor. In ONE sentence, flag ONE regulatory or IP concern...",
    name: "legal");

Workflow workflow = AgentWorkflowBuilder.BuildConcurrent(
    new[] { researcher, marketer, legal },
    aggregator: SynthesizeReview);
```

`BuildConcurrent` returns a plain `Workflow` object, the same type `WorkflowBuilder.Build()` returns in Chapter 09. The difference is the DAG inside: an input adapter, N parallel `AgentExecutor` branches, a barrier, and your aggregator, all prewired.
### The aggregator signature
.NET keeps the aggregator synchronous (`Func<IList<List<ChatMessage>>, List<ChatMessage>>`). The outer list has one entry per agent, in the same order you passed them to `BuildConcurrent`; the inner list is that agent's emitted messages:
```csharp
/// <summary>
/// Fan-in aggregator. Runs after every concurrent branch has completed.
/// Receives one List<ChatMessage> per agent; returns the workflow's
/// terminal output.
/// </summary>
private static List<ChatMessage> SynthesizeReview(IList<List<ChatMessage>> perAgentMessages)
{
    var builder = new StringBuilder();
    builder.AppendLine("Cross-functional review:");
    foreach (List<ChatMessage> agentOutput in perAgentMessages)
    {
        if (agentOutput.Count == 0) continue;
        // Last assistant message = that agent's verdict.
        ChatMessage final = agentOutput[^1];
        string label = final.AuthorName ?? "agent";
        builder.Append("- ").Append(label).Append(": ").AppendLine(final.Text.Trim());
    }
    return new List<ChatMessage>
    {
        new(ChatRole.Assistant, builder.ToString().TrimEnd())
        {
            AuthorName = "concurrent-aggregator",
        },
    };
}
```

### Running it
```csharp
var messages = new List<ChatMessage> { new(ChatRole.User, "ultrasonic pet collar") };
await using StreamingRun run = await InProcessExecution.RunStreamingAsync(workflow, messages);
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));

List<ChatMessage>? aggregated = null;
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    switch (evt)
    {
        case AgentResponseEvent response:
            // One per agent as it finishes. AgentResponseUpdateEvent fires
            // for streaming tokens; pick whichever suits your UI.
            Console.WriteLine($"[{response.ExecutorId}] {response.Response.Text.Trim()}");
            break;
        case WorkflowOutputEvent output when output.Data is List<ChatMessage> list:
            aggregated = list; // the aggregator's return value
            break;
    }
}
```

Four things worth highlighting:
- `TurnToken(emitEvents: true)` is mandatory. The concurrent workflow waits for a `TurnToken` before firing the branches; that's how MAF separates "workflow is wired and listening" from "now take a turn". Forgetting it leaves the run hanging forever with no events.
- Two event types for agent output. `AgentResponseEvent` fires once per agent when it finishes; `AgentResponseUpdateEvent` fires for each streaming update. For the aggregator demo we use the former; Chapter 03's streaming patterns apply directly if you want the latter.
- `WorkflowOutputEvent.Data` is the aggregator's return value. The default aggregator returns a flat `List<ChatMessage>` with one last message per agent. Our `SynthesizeReview` returns a one-message list containing the synthesised summary.
- `ChatMessage` is `Microsoft.Extensions.AI.ChatMessage`, not `OpenAI.Chat.ChatMessage`. The two types collide in `using` scope when you reference both `Microsoft.Extensions.AI` and `OpenAI.Chat`; the runnable `Program.cs` disambiguates with a `using ChatMessage = Microsoft.Extensions.AI.ChatMessage;` alias.
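The turn-token gate, and why forgetting it hangs, can be modeled in a few lines. This is a hypothetical Python toy, not the .NET API: `gated_concurrent_run` wires its branches immediately but awaits an `asyncio.Event` before dispatching, so setting the event plays the role of `TrySendMessageAsync(new TurnToken(...))`. The forgotten-token case is bounded with `asyncio.wait_for` so it reports the hang instead of blocking forever.

```python
import asyncio
from typing import Awaitable, Callable, Optional

Branch = Callable[[str], Awaitable[str]]


async def gated_concurrent_run(branches: list[Branch], payload: str, turn: asyncio.Event) -> list[str]:
    """Toy model: 'wired and listening' until `turn` is set, then fans out."""
    await turn.wait()  # nothing is dispatched until someone takes a turn
    results = await asyncio.gather(*(branch(payload) for branch in branches))
    return list(results)


async def echo(payload: str) -> str:
    await asyncio.sleep(0.01)  # stand-in for an LLM call
    return f"echo: {payload}"


async def demo(send_turn: bool) -> Optional[list[str]]:
    turn = asyncio.Event()
    run = asyncio.create_task(gated_concurrent_run([echo, echo], "idea", turn))
    if send_turn:
        turn.set()  # the analogue of sending the TurnToken
    try:
        return await asyncio.wait_for(run, timeout=0.2)
    except asyncio.TimeoutError:
        return None  # no turn was taken: the run never started


if __name__ == "__main__":
    print(asyncio.run(demo(send_turn=True)))
    print(asyncio.run(demo(send_turn=False)))
```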
Running it all together:
```bash
cd tutorials/13-concurrent-orchestration/dotnet
dotnet run -- "AI-powered meal planner"
# Idea: AI-powered meal planner
#
# [researcher] An AI-powered meal planner addresses the needs of busy individuals...
# [marketer] Take the guesswork out of healthy eating with our AI-powered meal planner...
# [legal] There may be an intellectual property concern if the AI-powered meal planner...
#
# ===== Aggregated summary =====
# Cross-functional review:
# - researcher: An AI-powered meal planner addresses the needs of busy individuals...
# - marketer: Take the guesswork out of healthy eating with our AI-powered meal planner...
# - legal: There may be an intellectual property concern if the AI-powered meal planner...
#
# Wall-clock: 1.48s (three LLM calls ran in parallel)
```

Source: `python/main.py`. Three agents, one builder call, one aggregator hook.
```python
from agent_framework import Agent
from agent_framework.orchestrations import ConcurrentBuilder

# `client` is the OpenAI / Azure OpenAI chat client that main.py builds from the repo-root .env.


def researcher(client):
    return Agent(client, instructions=(
        "You are a Market Researcher. In one sentence, assess the market fit "
        "of the product idea the user provides."
    ), name="researcher")


def marketer(client):
    return Agent(client, instructions=(
        "You are a Marketer. In one sentence, propose a positioning angle "
        "for the product idea the user provides."
    ), name="marketer")


def legal(client):
    return Agent(client, instructions=(
        "You are a Legal advisor. In one sentence, flag one regulatory or IP "
        "concern about the product idea the user provides."
    ), name="legal")


workflow = ConcurrentBuilder(
    participants=[researcher(client), marketer(client), legal(client)]
).build()
```

That's the whole wiring step. `ConcurrentBuilder` accepts a list of participants (agents, meaning anything that implements `SupportsAgentRun`, or custom `Executor`s), creates an input-adapter executor at the front and a barrier fan-in at the back, and routes each participant as its own parallel branch.
### Reading the event stream
`workflow.run(..., stream=True)` yields events as each branch finishes. For Concurrent, the shape to watch for is `executor_completed` with a `list[AgentExecutorResponse]` payload:
```python
import asyncio
import time


async def main() -> None:
    per_agent: dict[str, str] = {}
    start = time.perf_counter()  # start before workflow.run(...) so setup time is counted
    async for event in workflow.run("ultrasonic pet collar", stream=True):
        if event.type != "executor_completed":
            continue
        payload = event.data
        if not isinstance(payload, list):
            continue
        for item in payload:
            if item.agent_response and item.executor_id in {"researcher", "marketer", "legal"}:
                per_agent[item.executor_id] = item.agent_response.text
    elapsed = time.perf_counter() - start  # stop only after the stream has fully drained
    for name, text in per_agent.items():
        print(f"{name}: {text}")
    print(f"Wall-clock: {elapsed:.2f}s")


asyncio.run(main())
```

Output (one run against real OpenAI `gpt-4.1`):
```text
researcher: The ultrasonic pet collar has moderate market fit...
marketer: Give your pet a voice and peace of mind with our...
legal: FCC emission standards for ultrasonic devices may...
Wall-clock: 1.60s
```

Three LLM calls, roughly the time of one. That's not a coincidence or noise: the `workflow.run(...)` coroutine schedules all three `AgentExecutor`s into the same superstep, which the runtime dispatches with `asyncio.gather` under the hood.
### A real aggregator, not a placeholder
By default, `ConcurrentBuilder(...).build()` collects every agent's response into a `list[Message]` and makes that the workflow output. That's fine for display but useless for downstream automation that needs a single string. Replace the default with a real aggregator:
```python
import asyncio

from agent_framework import AgentExecutorResponse

# Reuses ConcurrentBuilder, the agent factories, and `client` from the wiring snippet above.


async def synthesise_review(results: list[AgentExecutorResponse]) -> str:
    """Fan-in: reduce three per-agent responses into one cross-functional summary."""
    sections: list[str] = []
    for r in results:
        messages = getattr(r.agent_response, "messages", [])
        final = messages[-1].text if messages else "(no response)"
        sections.append(f"- {r.executor_id}: {final.strip()}")
    return "Cross-functional review:\n" + "\n".join(sections)


workflow = (
    ConcurrentBuilder(participants=[researcher(client), marketer(client), legal(client)])
    .with_aggregator(synthesise_review)
    .build()
)


async def main() -> None:
    async for event in workflow.run("ultrasonic pet collar", stream=True):
        if event.type == "output":
            print(event.data)  # the aggregator's return value


asyncio.run(main())
```

Output:
```text
Cross-functional review:
- researcher: The ultrasonic pet collar has moderate market fit...
- marketer: Give your pet a voice and peace of mind with our...
- legal: FCC emission standards for ultrasonic devices may apply...
```

Two things worth staring at:
- The aggregator is async. If you want an LLM-synthesised summary instead of string concatenation, call another agent inside this function; the framework will `await` it. The MS docs custom-aggregator example does exactly that with a "summariser" agent.
- The aggregator sees only final responses. `r.agent_response.messages` is the per-agent message list; `r.full_conversation` is the full history if you need the user turn too. There are no partial streaming tokens: the framework waits for each branch to complete before invoking the aggregator. That's the fan-in barrier doing its job.
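A sketch of the LLM-backed variant, under stated assumptions: `FakeMessage`, `FakeAgentResponse` and `FakeExecutorResponse` are hypothetical stand-ins shaped like the fields `synthesise_review` reads from `AgentExecutorResponse`, and `fake_summariser` stands in for awaiting a real summariser agent. The only point it makes is that an `async def` aggregator can `await` another call before returning.

```python
import asyncio
from dataclasses import dataclass, field


# Hypothetical stand-ins, NOT agent_framework types.
@dataclass
class FakeMessage:
    text: str


@dataclass
class FakeAgentResponse:
    messages: list[FakeMessage] = field(default_factory=list)


@dataclass
class FakeExecutorResponse:
    executor_id: str
    agent_response: FakeAgentResponse


async def fake_summariser(prompt: str) -> str:
    """Hypothetical stand-in for awaiting a summariser agent."""
    await asyncio.sleep(0.01)
    verdicts = sum(1 for line in prompt.splitlines() if line.startswith("- "))
    return f"Summary of {verdicts} verdicts."


async def llm_aggregator(results: list[FakeExecutorResponse]) -> str:
    # Same reduction as synthesise_review: last message per branch; empty branches skipped.
    lines = [
        f"- {r.executor_id}: {r.agent_response.messages[-1].text.strip()}"
        for r in results
        if r.agent_response.messages
    ]
    prompt = "Combine these verdicts into one recommendation:\n" + "\n".join(lines)
    summary = await fake_summariser(prompt)  # the aggregator is a coroutine, so it can await
    return summary + "\n" + "\n".join(lines)


if __name__ == "__main__":
    demo = [
        FakeExecutorResponse("researcher", FakeAgentResponse([FakeMessage("Moderate fit.")])),
        FakeExecutorResponse("legal", FakeAgentResponse([FakeMessage("Check FCC rules.")])),
    ]
    print(asyncio.run(llm_aggregator(demo)))
```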
## Side-by-side: Python vs .NET
| Aspect | Python | .NET |
|---|---|---|
| Builder entry point | `ConcurrentBuilder(participants=[...]).build()` | `AgentWorkflowBuilder.BuildConcurrent(agents, aggregator)` |
| Aggregator signature | `async def(list[AgentExecutorResponse]) -> str \| list[Message]` | `Func<IList<List<ChatMessage>>, List<ChatMessage>>` (sync) |
| Aggregator wiring | `.with_aggregator(fn).build()` | Second argument to `BuildConcurrent` |
| Per-agent output event | `executor_completed`; `data` is `list[AgentExecutorResponse]` | `AgentResponseEvent`, one per agent |
| Per-token streaming event | The agent's own delta events | `AgentResponseUpdateEvent` |
| Final output surface | `event.type == "output"`; `event.data` is the aggregator's return value | `WorkflowOutputEvent.Data` is the aggregator's return value |
| Kicking off the run | `workflow.run(input, stream=True)` | `InProcessExecution.RunStreamingAsync` + `TrySendMessageAsync(new TurnToken(emitEvents: true))` |
| Disambiguation gotcha | n/a | `ChatMessage` ambiguity with `OpenAI.Chat.ChatMessage`; use a `using` alias |
Structurally, the two are identical. The biggest surface difference is that Python's aggregator can be async (it can call another LLM inside), while .NET's is a synchronous `Func<...>`. In practice, if you need an LLM inside the .NET aggregator, you can wrap the call in `Task.Run` and block on it with `.GetAwaiter().GetResult()` (sync-over-async, which ties up a thread while the call is in flight), or, more cleanly, wire the aggregator as its own executor: skip the convenience builder and drop back to `WorkflowBuilder` from Ch09 with an explicit fan-in edge.
## Gotchas
- Parallelism is real. The three LLM calls fire concurrently. If your provider enforces concurrency limits (Azure OpenAI TPM/RPM quotas, for example), size the deployment's quota for N simultaneous requests, not one. Running this demo against a small Azure deployment is enough to throttle yourself.
- Aggregator argument ordering is stable. Both Python and .NET pass per-agent outputs in the same order the agents were supplied to the builder. Don't expect completion order: the framework buffers at the barrier and hands the outputs to you in insertion order.
- Agents don't see each other's output. Concurrent is isolated by design. If agent B genuinely needs agent A's output, you want Sequential (Ch12) or a hand-rolled workflow with an explicit edge.
- The default aggregator returns a list, not a string. Callers that expect a single string have to iterate `WorkflowOutputEvent.Data` themselves or provide a custom aggregator. The aggregator shown in this chapter is the common case for a user-facing "summary of the panel".
- A forgotten `TurnToken` means a hang (.NET). `AgentWorkflowBuilder.BuildConcurrent` produces a workflow that waits for an explicit turn token before dispatching. Python's `ConcurrentBuilder` dispatches as soon as `workflow.run(...)` is called; there is no equivalent step.
- Aggregator exceptions kill the whole output. If your aggregator throws, the workflow emits `WorkflowErrorEvent` and no `WorkflowOutputEvent`. Agent-level failures are reported as `executor_failed` / `ExecutorFailedEvent` but don't stop the other branches from completing; the aggregator just receives a shorter list.
- Wall-clock measurement must wrap the whole stream. If you start the timer inside the `async for` / `await foreach` loop, you miss the workflow's setup time; if you stop it on the first `executor_completed`, you miss the aggregator. Start before `workflow.run(...)` / `RunStreamingAsync(...)`, and stop after the stream fully drains.
- `AuthorName` on `ChatMessage` is not set to the agent's `name` in every code path. Inspect `message.AuthorName`, but fall back to the executor id from the event you're iterating. The .NET runnable in this chapter guards against a missing value with a `?? "agent"` fallback.
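The ordering and failure gotchas can be reproduced with plain asyncio. This is a hypothetical model, not MAF internals: `asyncio.gather(..., return_exceptions=True)` returns results in the order the branches were passed (not completion order) and lets a failing branch sit alongside successful ones, so the aggregator sees a shorter list, while an exception raised by the aggregator itself propagates and no output is produced. `ok`, `broken` and `fan_out_fan_in` are illustrative names.

```python
import asyncio
from typing import Awaitable, Callable

Branch = Callable[[], Awaitable[str]]


async def ok(name: str, delay: float) -> str:
    await asyncio.sleep(delay)  # stand-in for a successful LLM call
    return f"{name}: verdict"


async def broken(name: str, delay: float) -> str:
    await asyncio.sleep(delay)
    raise RuntimeError(f"{name} failed")  # stand-in for a failing agent


async def fan_out_fan_in(branches: list[Branch], aggregator: Callable[[list[str]], str]) -> str:
    # return_exceptions=True: one failing branch does not cancel its siblings, and the
    # results list follows the order of `branches`, not the order they finished in.
    results = await asyncio.gather(*(branch() for branch in branches), return_exceptions=True)
    survivors = [r for r in results if not isinstance(r, BaseException)]
    # The aggregator runs after the barrier; if it raises, the caller gets no output at all.
    return aggregator(survivors)


if __name__ == "__main__":
    summary = asyncio.run(fan_out_fan_in(
        [
            lambda: ok("researcher", 0.03),  # finishes last
            lambda: broken("marketer", 0.01),
            lambda: ok("legal", 0.0),        # finishes first
        ],
        lambda verdicts: " | ".join(verdicts),
    ))
    print(summary)
```

Here the researcher finishes last but still comes first, and the failed marketer branch simply drops out, leaving `researcher: verdict | legal: verdict`.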
## Tests
Python tests exercise the concurrency claim directly — one wiring test plus three real-LLM integration tests covering responses, parallel timing, and distinct perspectives:
```bash
# Python (4 tests)
source agents/.venv/bin/activate
python -m pytest tutorials/13-concurrent-orchestration/python/tests/ -v
# 4 passed
```

The parallel-timing test is the load-bearing one: it asserts wall-clock < 6s on three agents, a bound meant to sit below their serial baseline. If something regressed the scheduler to dispatch branches serially, this test would flip red before users noticed the latency jump. That only holds while average per-call latency stays above ~2s (6s / 3 calls): at the ~1.5s per call seen in the runs above, three serial calls take ~4.5s and would still pass.
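One way to make a timing test independent of provider latency is to assert against a known serial baseline instead of a fixed number of seconds. The sketch below is a hypothetical pytest-style test, not the repo's test: it swaps the LLM calls for `asyncio.sleep`, so the serial baseline is exactly `3 * DELAY`, and it fails if the parallel run drifts toward that sum. `fake_agent`, `run_panel` and `DELAY` are illustrative names.

```python
import asyncio
import time

DELAY = 0.1  # seconds per fake agent call
AGENTS = ("researcher", "marketer", "legal")


async def fake_agent(name: str) -> str:
    await asyncio.sleep(DELAY)  # stand-in for one LLM round-trip
    return name


async def run_panel() -> list[str]:
    # Code under test: should dispatch all branches concurrently.
    return list(await asyncio.gather(*(fake_agent(n) for n in AGENTS)))


def test_branches_run_in_parallel() -> None:
    serial_baseline = len(AGENTS) * DELAY  # what a regression to serial dispatch would cost
    start = time.perf_counter()
    names = asyncio.run(run_panel())
    elapsed = time.perf_counter() - start
    assert names == list(AGENTS)
    # Parallel should land near one DELAY; 0.6x the serial sum leaves room for scheduler noise.
    assert elapsed < 0.6 * serial_baseline, f"{elapsed:.2f}s looks serial (baseline {serial_baseline:.2f}s)"
```

This complements rather than replaces the real-LLM test: only the real test proves that the provider calls themselves overlap.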
.NET:

```bash
cd tutorials/13-concurrent-orchestration/dotnet
dotnet build             # type-check and compile
dotnet run -- "<idea>"   # end-to-end run with real LLM
```

The .NET build pins `Microsoft.Agents.AI.Workflows` 1.1.0, `Microsoft.Agents.AI` 1.1.0, `Microsoft.Agents.AI.OpenAI` 1.1.0, and `Azure.AI.OpenAI` 2.1.0. No source generator is needed: the convenience builder handles executor registration internally, so no `[MessageHandler]` / partial-class machinery shows up in this chapter. You get that back in the capstone (Phase 7) when we drop below `BuildConcurrent` to wire a bespoke aggregator executor.
## How this shows up in the capstone
`agents/python/workflows/pre_purchase.py` today uses `asyncio.gather` plus a hand-rolled state dataclass to fan out to three specialists: ProductDiscovery (reviews), PricingPromotions (price history), and InventoryFulfillment (stock). The Phase 7 plan, `plans/refactor/08-pre-purchase-concurrent.md`, replaces that by-hand parallelism with `ConcurrentBuilder` and a synthesising aggregator agent. The .NET parity port in `agents/dotnet/src/ECommerceAgents.Shared/Workflows/PrePurchaseWorkflow.cs` follows the same path with `AgentWorkflowBuilder.BuildConcurrent`. Both refactors delete more code than they add; the aggregator becomes the single explicit place where three sources of truth merge.
## Further reading
- MAF docs: Concurrent Orchestration (C#). The canonical .NET reference, including `AgentResponseUpdateEvent` streaming and the custom-aggregator pattern this chapter mirrors.
- MAF docs: Concurrent Orchestration (Python). Covers `ConcurrentBuilder`, the advanced custom-executor pattern, and the async summariser aggregator.
- Previous chapter: Chapter 12: Sequential Orchestration.
- Next chapter: Chapter 14 — Handoff Orchestration.
- Canonical article: nitinksingh.com/posts/maf-v1-13-concurrent-orchestration/
- Source on GitHub: tutorials/13-concurrent-orchestration

