
MAF v1 — Workflow Executors and Edges (Python + .NET)

Nitin Kumar Singh
MAF v1: Python and .NET - This article is part of a series.
Part 9: This Article

Series note — Part of MAF v1: Python and .NET. Partially supersedes Part 11 — Graph-Based Workflows, which used custom asyncio code. This chapter is the MAF-native version, and the first of six chapters that build on the same primitives.

Repo — Full runnable code for this chapter is at https://github.com/nitin27may/e-commerce-agents/tree/main/tutorials/09-workflow-executors-and-edges. Clone the repo, cd tutorials/09-workflow-executors-and-edges, and follow the per-language instructions below.

Why this chapter
#

Chapters 01–08 treated every agent call as a single atomic request. Real flows have steps: validate input, fan out to three data sources in parallel, wait for all of them, then synthesize. You can paste together agent.run() calls by hand, and for two steps that’s fine. For five, with a branch, a retry, and a fan-in, it stops fitting in your head.

MAF’s answer is the Workflow — a deterministic DAG where each node is an Executor (a typed processing unit) and each connection is an Edge (a typed route for messages). The runtime is a Pregel-style superstep scheduler: executors ready to run this round all run concurrently, the framework flushes their output messages at a barrier, then the next round begins with whoever received a message. Iterate until the message queue is empty; yield_output emits final results along the way.

That’s it. The rest of this chapter unpacks what each of those words means, why the superstep model is the one Microsoft picked, and how the same primitives look in Python and .NET.

What this chapter is not. It’s not about agents running as executors — that’s the next chapter. It’s not about declarative YAML workflows — that’s Chapter 19. It’s not about events, OTel traces, or checkpointing — those are Chapters 10, 07, and 18 respectively. This chapter is the load-bearing “what is an executor, what is an edge” lesson; every later workflow chapter assumes you’ve internalised it.

We’re going to build the simplest workflow that still demonstrates the three interesting behaviours: typed message passing, terminal outputs via yield_output, and graceful short-circuiting. Three executors — Uppercase, Validate, Log — wired in a straight line. Pass an empty string and Validate kills the run; pass anything else and the message flows through all three. No LLM, no database, no network — just the scheduler and a couple of typed messages.


The concept
#

What a workflow is
#

A Workflow is three things glued together:

  1. A set of Executors — objects with one or more typed handler methods.
  2. A set of Edges — directed connections between executors. Plain edges forward every message; conditional edges forward only when a predicate returns true.
  3. A runtime that schedules executors in supersteps and routes messages along edges.

The Python SDK exposes them as Executor, WorkflowBuilder.add_edge(...), and workflow.run(...). The .NET SDK exposes them as Executor / Executor<TIn> / Executor<TIn, TOut>, WorkflowBuilder.AddEdge(...), and InProcessExecution.RunAsync(...). Same concepts, PascalCase vs snake_case.

The DAG we’re building
#

Three executors in a straight line. The middle one can terminate the run early.

%%{init: {'theme':'base', 'themeVariables': { 'primaryColor': '#2563eb','primaryTextColor': '#ffffff','primaryBorderColor': '#1e40af', 'lineColor': '#64748b','secondaryColor': '#f59e0b','tertiaryColor': '#10b981', 'background': 'transparent'}}}%%
flowchart LR
    classDef core fill:#2563eb,stroke:#1e40af,color:#ffffff
    classDef external fill:#f59e0b,stroke:#b45309,color:#000000
    classDef success fill:#10b981,stroke:#047857,color:#ffffff
    classDef error fill:#ef4444,stroke:#b91c1c,color:#ffffff
    classDef infra fill:#64748b,stroke:#334155,color:#ffffff

    input(["User input<br/>string"])
    up["Uppercase<br/>Executor"]
    validate["Validate<br/>Executor"]
    log["Log<br/>Executor"]
    shortCircuit(["Terminal output<br/>skipped empty input"])
    result(["Terminal output<br/>LOGGED: ..."])

    input --> up
    up -- "edge: str" --> validate
    validate -- "edge: non-empty str" --> log
    validate -. "yield_output (empty)" .-> shortCircuit
    log -- "yield_output" --> result

    class up,validate,log core
    class input infra
    class result success
    class shortCircuit error

Uppercase produces a string; Validate either forwards it or terminates with a canned skip output; Log decorates it and terminates with the final result. Two terminal paths, one DAG.

Pregel and supersteps — why this scheduler
#

MAF’s workflow runtime is not a request-response loop that calls the next executor inline. It’s a Pregel-style scheduler borrowed from Google’s Pregel graph-processing paper (2010). Each round is a superstep. Within a superstep, every executor that has an inbound message runs concurrently. When they all finish, a barrier flushes their outbound messages to the receiving executors’ queues. The next superstep begins with whoever got mail.

Imagine five executors, one of which calls a slow API and two of which hold a lock. If the runtime inlined await next_executor.run(msg) the moment an upstream finished, you’d get whatever ordering the scheduler happened to pick — sometimes ExecutorA before ExecutorB, sometimes the reverse, sometimes one starves while the other holds the lock. The same graph with the same inputs would produce different traces, different timing, and occasionally different outputs. Debugging that is misery.

Pregel removes the class of bug entirely. Within a superstep, executors don’t see each other’s work. At the barrier, every in-flight message is delivered atomically. The next superstep runs on a frozen snapshot. You can stop the run between any two supersteps, serialize the state, restart a week later on a different machine, and get the same outcome.

Why a barrier and not just await next_executor.run(msg)?

  • Determinism. Within a superstep, order of execution between executors doesn’t affect the result, because no executor can see another’s output until the barrier. The workflow’s behaviour depends only on the DAG and the inputs, never on timing.
  • Checkpointability. A barrier is a natural serialization point. You can snapshot the full workflow state between supersteps and restore it later. That’s how Chapter 18 — Checkpointing and HITL pauses a run for a human and resumes hours later.
  • Debuggability. You get a clean event timeline — superstep N started, these executors ran, these messages were sent, superstep N+1 started. Much easier to reason about than nested callbacks.
  • Fan-out for free. If an executor sends to three downstream executors, the next superstep runs all three in parallel without you writing asyncio.gather. Same for fan-in — the framework waits at the barrier.
%%{init: {'theme':'base', 'themeVariables': { 'primaryColor': '#2563eb','primaryTextColor': '#ffffff','primaryBorderColor': '#1e40af', 'lineColor': '#64748b','secondaryColor': '#f59e0b','tertiaryColor': '#10b981', 'background': 'transparent'}}}%% flowchart TB classDef core fill:#2563eb,stroke:#1e40af,color:#ffffff classDef success fill:#10b981,stroke:#047857,color:#ffffff classDef error fill:#ef4444,stroke:#b91c1c,color:#ffffff classDef infra fill:#64748b,stroke:#334155,color:#ffffff subgraph T0["Superstep T0"] direction LR up0[Uppercase runs
hello to HELLO] b0{{Barrier: flush
messages + checkpoint}} up0 --> b0 end subgraph T1["Superstep T1"] direction LR v1[Validate runs
non-empty forwards] b1{{Barrier: flush
messages + checkpoint}} v1 --> b1 end subgraph T2["Superstep T2"] direction LR log2[Log runs
yield_output LOGGED: HELLO] done2([Run completes]) log2 --> done2 end subgraph T1alt["Superstep T1 (empty input)"] direction LR v1alt[Validate runs
empty to yield_output] cut([Downstream skipped
skipped empty input]) v1alt -- "yield_output is terminal" --> cut end T0 --> T1 T1 --> T2 T0 -. "empty input path" .-> T1alt class up0,v1,log2,v1alt core class b0,b1 infra class done2 success class cut error

Three supersteps, three barriers for the happy path. The empty-input branch short-circuits at T1 via yield_output — the Log executor never enters the schedule, because no message was ever queued for it.
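That loop is small enough to sketch in plain Python. This is a toy illustration of the superstep-and-barrier idea only; run_supersteps and the (kind, value) handler convention are invented for this sketch and are not MAF APIs:

```python
# Toy superstep scheduler: run every node that has mail, buffer
# outbound messages, and flush them only at the barrier.
def run_supersteps(handlers, edges, start, message):
    inboxes = {start: [message]}
    outputs = []
    while inboxes:                                # loop until the queue drains
        next_inboxes = {}
        for node, msgs in inboxes.items():        # all ready nodes run this round
            for msg in msgs:
                kind, value = handlers[node](msg)
                if kind == "yield":               # yield_output: terminal, no edges fire
                    outputs.append(value)
                else:                             # send: buffered until the barrier
                    for target in edges.get(node, []):
                        next_inboxes.setdefault(target, []).append(value)
        inboxes = next_inboxes                    # the barrier: flush all at once
    return outputs

handlers = {
    "uppercase": lambda m: ("send", m.upper()),
    "validate":  lambda m: ("send", m) if m.strip() else ("yield", "[skipped: empty input]"),
    "log":       lambda m: ("yield", f"LOGGED: {m}"),
}
edges = {"uppercase": ["validate"], "validate": ["log"]}

print(run_supersteps(handlers, edges, "uppercase", "hello"))  # ['LOGGED: HELLO']
print(run_supersteps(handlers, edges, "uppercase", ""))       # ['[skipped: empty input]']
```

Each pass over inboxes is one superstep; swapping inboxes for next_inboxes is the barrier. The real runtime layers concurrency, typing, events, and checkpoints on top of this skeleton.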

Jargon recap
#

  • Executor — a class with one or more typed message handlers. Processes inbound messages, optionally sends more, optionally yields a workflow output. The unit of work. Can be stateless (the common case) or stateful; stateful executors that live across runs must implement IResettableExecutor in .NET.
  • @handler (Python) — decorator that marks a method on an Executor subclass as a handler for the inbound message type inferred from the first parameter’s annotation.
  • [MessageHandler] (.NET) — attribute on a method inside a partial class derived from Executor. A source generator from the Microsoft.Agents.AI.Workflows.Generators NuGet package reads the attribute at build time and emits the required ConfigureProtocol override — no reflection at runtime.
  • Edge — a directed connection from one executor to another. Plain edges forward every message; conditional edges forward only when the predicate is true.
  • WorkflowBuilder — the fluent API for assembling a workflow.
  • WorkflowContext (Python) / IWorkflowContext (.NET) — passed into every handler. Exposes send_message to fire edges, and yield_output to emit a final workflow result.
  • Pregel — the 2010 Google paper that introduced the superstep model for graph processing. MAF borrows the scheduling idea, not the distributed-computing machinery.
  • Superstep — one round of the workflow scheduler. All pending messages are dispatched concurrently; the next superstep fires only after every executor in this one has finished.
  • Barrier — the synchronization point between supersteps. Outbound messages produced in superstep N are flushed to receivers’ queues only at the barrier.
  • yield_output / YieldOutputAsync — the terminal output signal. Emits a final result for the workflow and does not fire any outbound edges from that executor.
  • Conditional edge — an edge with a predicate. The runtime only forwards a message along the edge if the predicate returns true.

Code walkthrough
#

.NET — Source: dotnet/Program.cs. Same three executors as the Python version below, with source-generator-driven registration via [MessageHandler].

// dotnet/Program.cs (excerpt)
using Microsoft.Agents.AI.Workflows;

internal static class WorkflowFactory
{
    public static Workflow Build()
    {
        var uppercase = new UppercaseExecutor();
        var validate  = new ValidateExecutor();
        var log       = new LogExecutor();

        return new WorkflowBuilder(uppercase)
            .AddEdge(uppercase, validate)
            .AddEdge(validate, log)
            .WithOutputFrom(validate, log) // either can emit the final output
            .Build();
    }
}

[SendsMessage(typeof(string))]
internal sealed partial class UppercaseExecutor() : Executor("uppercase")
{
    [MessageHandler]
    public async ValueTask HandleAsync(string message, IWorkflowContext context, CancellationToken ct = default)
        => await context.SendMessageAsync(message.ToUpperInvariant(), ct);
}

[SendsMessage(typeof(string))]
[YieldsOutput(typeof(string))]
internal sealed partial class ValidateExecutor() : Executor("validate")
{
    [MessageHandler]
    public async ValueTask HandleAsync(string message, IWorkflowContext context, CancellationToken ct = default)
    {
        if (string.IsNullOrWhiteSpace(message))
        {
            await context.YieldOutputAsync("[skipped: empty input]", ct);
            return;
        }
        await context.SendMessageAsync(message, ct);
    }
}

[YieldsOutput(typeof(string))]
internal sealed partial class LogExecutor() : Executor("log")
{
    [MessageHandler]
    public async ValueTask HandleAsync(string message, IWorkflowContext context, CancellationToken ct = default)
        => await context.YieldOutputAsync($"LOGGED: {message}", ct);
}

Running is a two-step dance: build the Workflow, then hand it to InProcessExecution. Streaming gives you the full event timeline:

await using StreamingRun run = await InProcessExecution.RunStreamingAsync(workflow, "hello world");
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    if (evt is WorkflowOutputEvent output && output.Data is string s)
        Console.WriteLine($"output: {s}");
}

Five pieces worth highlighting:

  • partial keyword is mandatory. The source generator in Microsoft.Agents.AI.Workflows.Generators emits a sibling partial file with the ConfigureProtocol(ProtocolBuilder) override MAF needs. Drop partial and the build fails with CS0534.
  • [SendsMessage] / [YieldsOutput] declare the executor’s outbound surface. They’re used for type-checking edges and for graph visualization.
  • Constructor-parameter id. : Executor("uppercase") — that string is the stable id used in events, checkpoints, and visualizations. Unique per workflow.
  • IWorkflowContext, not WorkflowContext<TIn,TOut>. .NET doesn’t type the context on handler input/output; the attributes on the class do.
  • InProcessExecution.RunStreamingAsync returns a StreamingRun. await foreach WatchStreamAsync() yields WorkflowEvent — pattern-match for WorkflowOutputEvent, ExecutorInvokedEvent, ExecutorFailedEvent. For one-shot runs, InProcessExecution.RunAsync(workflow, input) returns a Run whose run.NewEvents you iterate synchronously.

Wiring the generator into the csproj is four lines:

<ItemGroup>
  <PackageReference Include="Microsoft.Agents.AI.Workflows" Version="1.1.0" />
  <PackageReference Include="Microsoft.Agents.AI.Workflows.Generators" Version="1.1.0">
    <PrivateAssets>all</PrivateAssets>
    <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
  </PackageReference>
</ItemGroup>

PrivateAssets=all prevents the generator from leaking as a transitive dependency to anyone who references your project; IncludeAssets with analyzers is what actually makes Roslyn load the generator during dotnet build. Drop the analyzers asset and the generator silently never runs.

cd tutorials/09-workflow-executors-and-edges/dotnet
dotnet run -- "hello world"
# input:  'hello world'
# output: 'LOGGED: HELLO WORLD'

dotnet run -- ""
# input:  ''
# output: '[skipped: empty input]'

Python — Source: python/main.py. Three executors, one WorkflowBuilder, pure asyncio, no LLM.

# python/main.py (excerpt)
# Import from the public package surface, not the private _workflows modules.
from agent_framework import Executor, WorkflowBuilder, WorkflowContext, handler


class UppercaseExecutor(Executor):
    def __init__(self) -> None:
        super().__init__(id="uppercase")

    @handler
    async def run(self, message: str, ctx: WorkflowContext[str]) -> None:
        await ctx.send_message(message.upper())


class ValidateExecutor(Executor):
    def __init__(self) -> None:
        super().__init__(id="validate")

    @handler
    async def run(self, message: str, ctx: WorkflowContext[str, str]) -> None:
        if not message.strip():
            await ctx.yield_output("[skipped: empty input]")  # terminal; no downstream fires
            return
        await ctx.send_message(message)


class LogExecutor(Executor):
    def __init__(self) -> None:
        super().__init__(id="log")  # explicit id, consistent with the other two

    @handler
    async def run(self, message: str, ctx: WorkflowContext[None, str]) -> None:
        await ctx.yield_output(f"LOGGED: {message}")


def build_workflow():
    up, validate, log = UppercaseExecutor(), ValidateExecutor(), LogExecutor()
    return (
        WorkflowBuilder(start_executor=up)
        .add_edge(up, validate)
        .add_edge(validate, log)
        .build()
    )

Three things worth staring at:

  • WorkflowContext[SendT, YieldT] (names here are informal) — the first generic parameter is the type the handler sends downstream, the second the type it yields as a workflow output. WorkflowContext[str] means “sends a str downstream”; WorkflowContext[str, str] means “sends str and yields str”; WorkflowContext[None, str] means “sends nothing, yields str”. The handler’s input type comes from the message parameter’s annotation, not from the context.
  • yield_output is terminal for this executor’s branch. Validate can either send_message (forwards to log) or yield_output (emits a final workflow output and does not fire the edge).
  • Execution order comes from the graph, not the builder. WorkflowBuilder just registers edges; the runtime derives superstep ordering from the DAG.

cd tutorials/09-workflow-executors-and-edges/python
uv run python main.py "hello world"
# output: 'LOGGED: HELLO WORLD'

uv run python main.py ""
# output: '[skipped: empty input]'

Side-by-side — Python vs .NET
#

| Aspect | Python | .NET |
| --- | --- | --- |
| Handler declaration | @handler decorator | [MessageHandler] attribute on a method inside a partial class |
| Handler dispatch | Runtime reflection on parameter type | Build-time source generator emits ConfigureProtocol |
| Context type | WorkflowContext[SendT, YieldT] | IWorkflowContext + [SendsMessage] / [YieldsOutput] attributes |
| Executor id | super().__init__(id="uppercase") | : Executor("uppercase") primary constructor |
| Conditional edge | add_edge(a, b, condition=lambda msg: ...) | .AddEdge(a, b, condition: msg => ...) |
| Final output surface | Any executor that calls yield_output(...) | .WithOutputFrom(...) enumerates the yielders |
| Run | workflow.run("input", stream=True) | InProcessExecution.RunStreamingAsync(workflow, "input") |
| Build-pipeline step | None | Source generator runs during dotnet build |

Structurally identical. The biggest practical difference is that .NET does its type-checking at build time (source-gen + attribute validation) where Python does it at workflow.build() time.

Beyond linear — fan-out, fan-in, and conditional edges
#

The three-executor pipeline is deliberately minimal. Real workflows layer on three edge patterns the framework supports directly:

  • Fan-out — one executor sends to several downstream executors that run concurrently. Python: builder.add_fan_out_edge(start, [a, b, c]). .NET: builder.AddFanOutEdge(start, [a, b, c]).
  • Fan-in barrier — multiple upstream executors feed a single downstream one; the downstream handler sees a list containing one entry per source. Python: builder.add_fan_in_edge([a, b, c], aggregator). .NET: builder.AddFanInBarrierEdge([a, b, c], aggregator).
  • Conditional edges — plain edges plus a predicate. Python: add_edge(a, b, condition=lambda msg: ...). .NET: AddEdge(a, b, condition: msg => ...).
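These patterns are easy to picture if you model an edge as a (source, target, predicate) triple; a plain edge is then just a conditional edge whose predicate always returns True. A hypothetical plain-Python sketch, not the MAF edge representation:

```python
# Hypothetical routing sketch: a plain edge is a conditional edge
# whose predicate always returns True.
def barrier_flush(produced, edges):
    """Deliver every (source, message) pair along each matching edge."""
    inboxes = {}
    for source, msg in produced:
        for src, target, pred in edges:
            if src == source and pred(msg):
                inboxes.setdefault(target, []).append(msg)
    return inboxes

always = lambda _m: True
edges = [
    ("start", "reviews", always),   # fan-out: three plain edges from one source
    ("start", "stock", always),
    ("start", "prices", always),
    ("stock", "shipping", lambda m: m.get("in_stock", False)),  # conditional edge
]

print(barrier_flush([("start", {"sku": 42})], edges))
# {'reviews': [{'sku': 42}], 'stock': [{'sku': 42}], 'prices': [{'sku': 42}]}
print(barrier_flush([("stock", {"in_stock": False})], edges))   # {}
```

When the predicate returns False the message is simply never queued, so the downstream executor never enters the next superstep, which is exactly the short-circuit behaviour Validate showed earlier.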

The Pre-purchase workflow in the capstone uses all three: fan-out to three gatherers, fan-in barrier into a conditional EstimateShipping step, then a single synthesis executor.

Gotchas
#

  • Unique executor ids per workflow. Reusing an id in the same WorkflowBuilder throws at build. The id is what shows up in ExecutorInvokedEvent.ExecutorId, checkpoints, and Mermaid exports.
  • Type matching between handlers and messages. The downstream handler’s input type must be assignable from the upstream’s output, or routing silently doesn’t fire.
  • yield_output is terminal for that executor’s branch, not for the whole workflow. It stops the edges out of that executor from firing. The workflow terminates when the message queue is empty.
  • partial is mandatory in .NET. [MessageHandler] on a non-partial class fails with CS0534 because the source generator can’t emit its half of the class.
  • Generator package version must match runtime package. Microsoft.Agents.AI.Workflows and Microsoft.Agents.AI.Workflows.Generators are released in lockstep.
  • InProcessExecution.RunStreamingAsync doesn’t dispose itself. Use await using StreamingRun run = ....
  • Python — don’t forget start_executor=. WorkflowBuilder() with no start will fail at .build().
  • Don’t confuse [MessageHandler] (workflows) with [Description] (tools). Different layer of the framework.
  • Executors with mutable state across runs must implement IResettableExecutor in .NET.

Tests
#

Python — 5 unit tests; .NET — 6 xUnit tests. Both suites exercise happy path, empty-input short-circuit, whitespace short-circuit, successful build, and event-ordering across supersteps.

# Python
uv run pytest tutorials/09-workflow-executors-and-edges/python/tests/ -v

# .NET
cd tutorials/09-workflow-executors-and-edges/dotnet/tests
dotnet test

Both suites run in under a second each because there’s no LLM in the loop — deterministic text transforms only.
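One reason handler-level tests can stay this fast: a handler only needs something shaped like its context. A hypothetical fake (not a MAF class) that records sends and yields is enough to exercise the validation logic directly:

```python
import asyncio

class FakeContext:
    """Hypothetical test double: records what a handler sends and yields."""
    def __init__(self) -> None:
        self.sent: list = []
        self.yielded: list = []
    async def send_message(self, msg) -> None:
        self.sent.append(msg)
    async def yield_output(self, out) -> None:
        self.yielded.append(out)

async def validate_handler(message: str, ctx) -> None:
    # Same shape as ValidateExecutor's handler, inlined for the sketch.
    if not message.strip():
        await ctx.yield_output("[skipped: empty input]")
        return
    await ctx.send_message(message)

ctx = FakeContext()
asyncio.run(validate_handler("   ", ctx))
print(ctx.yielded, ctx.sent)  # ['[skipped: empty input]'] []
```

Anything that duck-types the context works, which is what keeps handler-level assertions deterministic and LLM-free.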

How this shows up in the capstone
#

The capstone’s agents/python/workflows/pre_purchase.py is a 180+ line production-shaped workflow built on exactly these primitives:

  • Five executors — three parallel gatherers (reviews, stock, price history), a conditional shipping step, and a synthesis aggregator.
  • Fan-out edge from the start executor to all three gatherers, running them in parallel in a single superstep.
  • Fan-in barrier edge into EstimateShipping, conditionally fired based on stock availability.
  • Final aggregator that yield_outputs the structured recommendation.

Being able to read that file cold is the reason this chapter exists — every shape in it (Executor, @handler, add_edge, add_fan_in_edge, yield_output) is one you now recognise. The .NET parity port still uses plain Task.WhenAll for the fan-out; Phase 7 rewrites it to the source-gen [MessageHandler] shape from this chapter.


What’s next
#

Chapter 10 — Workflow Events and Builder walks the event stream in detail: ExecutorInvokedEvent, ExecutorCompletedEvent, WorkflowOutputEvent, WorkflowErrorEvent, and how to emit your own via ctx.add_event(...). That’s the foundation for every UI you’ll build on top of a workflow — progress bars, live graphs, audit logs — and for the OTel trace surface Chapter 07 wires in.

