
MAF v1 — Workflow Events and Builder (Python + .NET)

Nitin Kumar Singh
Author
I build enterprise AI solutions and cloud-native systems. I write about architecture patterns, AI agents, Azure, and modern development practices — with full source code.
MAF v1: Python and .NET - This article is part of a series.
Part 10: This Article

Series note — Part of MAF v1: Python and .NET. Builds on Chapter 09 — Workflow Executors and Edges by adding observability into the workflow’s internals.

Repo — Full runnable code for this chapter is at https://github.com/nitin27may/e-commerce-agents/tree/main/tutorials/10-workflow-events-and-builder. Clone the repo, cd tutorials/10-workflow-events-and-builder, and follow the README.

Why this chapter

A workflow that finishes in 30 seconds needs to tell you what it’s doing for those 30 seconds. Without events, you squint at logs. With events, the caller subscribes to a single stream and sees every step the workflow takes — plus whatever custom signals you decide to emit.

MAF models this as one ordered stream with two kinds of events:

  • Lifecycle events — what MAF emits for you. WorkflowStartedEvent, ExecutorInvokedEvent, ExecutorCompletedEvent, SuperStepStartedEvent, SuperStepCompletedEvent, WorkflowOutputEvent. One per phase, automatic, same shape in both languages.
  • Custom events — what your executor emits. Any payload you care about: progress percentages, metrics, intermediate results, diagnostics. You decide the type; you decide when to call emit.

Both flow through the same stream in the order they occurred.
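
To make the single-stream idea concrete, here is a minimal stdlib-only sketch. The `Event` dataclass and `run_step` generator are stand-ins invented for illustration, not MAF classes; the point is only that lifecycle and custom events come out of one iterator in the order they were produced.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, AsyncIterator


@dataclass(frozen=True)
class Event:
    """Stand-in for a workflow event (hypothetical, not the MAF class)."""
    type: str
    data: Any = None


async def run_step() -> AsyncIterator[Event]:
    # The "framework" brackets the step with lifecycle events...
    yield Event("executor_invoked", "uppercase")
    # ...and the "executor" emits its own custom signal in between.
    yield Event("progress", 33)
    yield Event("executor_completed", "uppercase")


async def collect() -> list[str]:
    # One consumer, one stream: both kinds arrive interleaved.
    return [event.type async for event in run_step()]


seen = asyncio.run(collect())
print(seen)
```

The consumer never asks for "lifecycle" or "custom" events separately; it reads one stream and decides per event what to do with it.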

Lifecycle vs custom — the concept Ch09 glossed over

Ch09 introduced WorkflowContext.add_event(...) in passing but never unpacked what the runtime was already putting on that same stream. The rule is simple once you see it:

  • MAF owns lifecycle events. The framework emits one when an executor is about to run, one when it finishes, one when a superstep starts and ends, one when the workflow starts, and one every time YieldOutputAsync / yield_output fires. You cannot suppress them and you shouldn’t try to forge them. Use them for bookkeeping: render progress bars, draw the DAG, debug “did my executor even run?”.
  • You own custom events. They carry domain information the framework has no way to know about — “I validated 42 of 100 rows”, “cache hit rate was 83%”, “agent chose tool X”. Pick a type, pick a payload, emit whenever you have something to say.

A useful test: if a consumer in another team could usefully handle your event without reading your executor’s source, it’s a good custom event. If the event just duplicates what ExecutorCompletedEvent already says, drop it.

Prerequisites

The concept

| Term | Definition |
| --- | --- |
| WorkflowEvent | Base type for every event the workflow stream emits, lifecycle or custom. Carries an ExecutorId (when relevant) and a payload on Data. |
| Emit | The act of pushing an event onto the run’s event stream from inside an executor handler. In .NET: context.AddEventAsync(new MyEvent(...)). In Python: ctx.add_event(WorkflowEvent(type="my_type", data=...)). |
| event.type discriminator (Python) | The string tag on every WorkflowEvent: "executor_invoked", "output", "data", or whatever you pick for a custom event. |
| Lifecycle events | Emitted automatically by the framework: executor_invoked, executor_completed, executor_failed, superstep_started, superstep_completed, output, started, error. |
| Custom event payload | Any object you want. In .NET, subclass WorkflowEvent; in Python, pass a type string + data payload. |
| Event ordering | Events arrive in emit order. Lifecycle events for executor A appear before executor B’s if A ran first. Within one executor, events arrive in the order you called emit. |
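
The "Emit" and "Event ordering" rows can be sketched without the framework. `FakeContext` below is a hypothetical stand-in for the handler context (its `add_event` signature is simplified and is not the MAF one); it only shows that emitting is an ordered append, so two emits from one handler come out in call order.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Optional


@dataclass(frozen=True)
class Event:
    """Stand-in event: a type tag, the emitting executor, and a payload."""
    type: str
    executor_id: Optional[str] = None
    data: Any = None


class FakeContext:
    """Hypothetical handler context; not the MAF WorkflowContext."""

    def __init__(self, executor_id: str, stream: "asyncio.Queue[Event]") -> None:
        self._executor_id = executor_id
        self._stream = stream

    async def add_event(self, type: str, data: Any) -> None:
        # Emitting is an ordered append onto the run's stream.
        await self._stream.put(Event(type, self._executor_id, data))


async def handler(ctx: FakeContext) -> None:
    await ctx.add_event("progress", 10)
    await ctx.add_event("progress", 20)


async def main() -> list[Event]:
    stream: "asyncio.Queue[Event]" = asyncio.Queue()
    await handler(FakeContext("validate", stream))
    drained = []
    while not stream.empty():
        drained.append(stream.get_nowait())
    return drained


events = asyncio.run(main())
print([(e.executor_id, e.data) for e in events])
```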

Diagram — event timeline for a 3-executor pipeline

%%{init: {'theme':'base', 'themeVariables': { 'primaryColor': '#2563eb','primaryTextColor': '#ffffff','primaryBorderColor': '#1e40af', 'lineColor': '#64748b','secondaryColor': '#f59e0b','tertiaryColor': '#10b981', 'background': 'transparent'}}}%%
sequenceDiagram
    participant C as Caller
    participant W as Workflow
    participant U as uppercase
    participant V as validate
    participant L as log
    C->>W: RunStreamingAsync(input)
    W-->>C: workflow_started
    W-->>C: superstep_started
    W->>U: invoke
    W-->>C: executor_invoked(uppercase)
    U-->>C: ProgressPayload(uppercase, 33)
    U->>W: SendMessageAsync(UPPER)
    W-->>C: executor_completed(uppercase)
    W-->>C: superstep_completed
    W-->>C: superstep_started
    W->>V: invoke
    W-->>C: executor_invoked(validate)
    V-->>C: ProgressPayload(validate, 66)
    V->>W: SendMessageAsync(msg)
    W-->>C: executor_completed(validate)
    W-->>C: superstep_completed
    W-->>C: superstep_started
    W->>L: invoke
    W-->>C: executor_invoked(log)
    L-->>C: ProgressPayload(log, 100)
    L->>W: YieldOutputAsync(LOGGED:...)
    W-->>C: WorkflowOutputEvent
    W-->>C: executor_completed(log)
    W-->>C: superstep_completed
    Note over C,L: blue = MAF lifecycle events (automatic)<br/>orange = custom events emitted by your executor

Every arrow going back to Caller is one yield from the event stream. The caller sees lifecycle and custom events interleaved. The final WorkflowOutputEvent is itself a lifecycle event — the framework raises it the moment your executor calls YieldOutputAsync.
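
As a rough cross-check of the diagram, the sketch below lists the labels the caller would see for a linear pipeline. `pipeline_timeline` is a hypothetical helper, the labels simply mirror the diagram, and it assumes one executor per superstep with the last executor yielding the output.

```python
def pipeline_timeline(executor_ids: list[str]) -> list[str]:
    """List the event labels the diagram shows for a linear pipeline.

    Illustrative only: one executor per superstep, and the final executor
    is assumed to yield the workflow output before it completes.
    """
    total = len(executor_ids)
    events = ["workflow_started"]
    for position, executor_id in enumerate(executor_ids, start=1):
        percent = 100 * position // total  # 33, 66, 100 for three steps
        events.append("superstep_started")
        events.append(f"executor_invoked({executor_id})")
        events.append(f"progress({executor_id}, {percent})")
        if position == total:
            events.append("output")
        events.append(f"executor_completed({executor_id})")
        events.append("superstep_completed")
    return events


timeline = pipeline_timeline(["uppercase", "validate", "log"])
for label in timeline:
    print(label)
```

For three executors this produces 17 labels, one per arrow back to the caller in the diagram.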

Event taxonomy reference

The full set of events MAF emits (per the official docs):

| Event | When it fires | Carried data |
| --- | --- | --- |
| WorkflowStartedEvent / type="started" | Run begins, before the start executor is invoked | |
| SuperStepStartedEvent / type="superstep_started" | A superstep (one Pregel round) is about to dispatch | Superstep index / pending message count |
| ExecutorInvokedEvent / type="executor_invoked" | An executor is about to handle a message | ExecutorId |
| AgentResponseEvent / type="data" | An agent-backed executor produced a response | AgentResponse payload |
| AgentResponseUpdateEvent / type="data" | Streaming token update from an agent executor | AgentResponseUpdate payload |
| Your custom events | Any time you call AddEventAsync / add_event | Whatever you put there |
| ExecutorCompletedEvent / type="executor_completed" | An executor returned without throwing | ExecutorId, handler result (if any) |
| ExecutorFailedEvent / type="executor_failed" | An executor threw | ExecutorId, exception |
| SuperStepCompletedEvent / type="superstep_completed" | All executors in the superstep have returned | Superstep completion info |
| WorkflowOutputEvent / type="output" | Any executor called YieldOutputAsync / yield_output | The yielded payload |
| RequestInfoEvent / type="request_info" | Workflow paused for human input (Ch17) | RequestId, request payload |
| WorkflowWarningEvent / type="warning" | Non-fatal framework warning | Message |
| WorkflowErrorEvent / type="error" / "failed" | Workflow terminated with error | Exception details |
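
On the Python side, a consumer could bucket events by their type tag. The sketch below is an assumption-laden helper: `LIFECYCLE_TAGS` is copied from the table above, `classify` is a hypothetical name, and "data" gets its own bucket because the table shows it carrying both agent responses and custom payloads.

```python
# Type tags copied from the taxonomy table; "data" is deliberately excluded
# because agent responses and custom payloads both use it.
LIFECYCLE_TAGS = frozenset({
    "started",
    "superstep_started",
    "executor_invoked",
    "executor_completed",
    "executor_failed",
    "superstep_completed",
    "output",
    "request_info",
    "warning",
    "error",
    "failed",
})


def classify(event_type: str) -> str:
    """Bucket a type tag (hypothetical helper, not part of MAF)."""
    if event_type in LIFECYCLE_TAGS:
        return "lifecycle"
    if event_type == "data":
        # Ambiguous by tag alone: inspect the payload type to decide.
        return "payload-dependent"
    return "custom"


for tag in ("executor_invoked", "data", "progress"):
    print(tag, "->", classify(tag))
```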

Code walkthrough

Source: dotnet/Program.cs.

Define a custom event — subclass WorkflowEvent with your payload type:

internal sealed class ProgressEvent(string step, int percent)
    : WorkflowEvent(new ProgressPayload(step, percent))
{
    public string Step => ((ProgressPayload)Data!).Step;
    public int Percent => ((ProgressPayload)Data!).Percent;
}

internal sealed record ProgressPayload(string Step, int Percent);

Emit from inside a handler. context.AddEventAsync is order-preserving:

[SendsMessage(typeof(string))]
internal sealed partial class UppercaseExecutor() : Executor("uppercase")
{
    [MessageHandler]
    public async ValueTask HandleAsync(
        string message,
        IWorkflowContext context,
        CancellationToken cancellationToken = default)
    {
        await context.AddEventAsync(new ProgressEvent("uppercase", 33), cancellationToken);
        await context.SendMessageAsync(message.ToUpperInvariant(), cancellationToken);
    }
}

Consume the stream — pattern-match on the concrete subtype first, lifecycle types after:

await using StreamingRun run = await InProcessExecution.RunStreamingAsync(workflow, input);

await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    switch (evt)
    {
        case ProgressEvent p:
            Console.WriteLine($"  [progress]  {p.Step} -> {p.Percent}%");
            break;
        case ExecutorInvokedEvent invoke:
            Console.WriteLine($"[lifecycle] executor_invoked    {invoke.ExecutorId}");
            break;
        case ExecutorCompletedEvent done:
            Console.WriteLine($"[lifecycle] executor_completed  {done.ExecutorId}");
            break;
        case WorkflowOutputEvent output:
            Console.WriteLine($"  [output]    {output.Data}");
            break;
    }
}

Running it:

$ dotnet run -- "hello world"
input:  'hello world'

[lifecycle] workflow_started
[lifecycle] superstep_started   (step starts)
[lifecycle] executor_invoked    uppercase
  [progress]  uppercase  ->  33%
[lifecycle] executor_completed  uppercase
[lifecycle] superstep_completed (step ends)
[lifecycle] superstep_started   (step starts)
[lifecycle] executor_invoked    validate
  [progress]  validate   ->  66%
[lifecycle] executor_completed  validate
[lifecycle] superstep_completed (step ends)
[lifecycle] superstep_started   (step starts)
[lifecycle] executor_invoked    log
  [progress]  log        -> 100%
  [output]    LOGGED: HELLO WORLD
[lifecycle] executor_completed  log
[lifecycle] superstep_completed (step ends)

Lifecycle events bracket every executor; custom events sit right where the code called AddEventAsync. The project uses the same source-generator flow as Ch09 — Microsoft.Agents.AI.Workflows.Generators wires up each [MessageHandler] method at compile time.
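
The bracketing property is easy to assert on a captured run. The sketch below works on plain `(type_tag, executor_id)` tuples rather than real MAF events, and `unbalanced_executors` is a hypothetical helper name; it reports executors that were invoked but never completed or failed.

```python
def unbalanced_executors(events: list[tuple[str, str]]) -> list[str]:
    """Return executor ids whose invoked event has no closing event.

    Each item is (type_tag, executor_id); a stand-in for real events.
    """
    open_ids: list[str] = []
    for tag, executor_id in events:
        if tag == "executor_invoked":
            open_ids.append(executor_id)
        elif tag in ("executor_completed", "executor_failed"):
            if executor_id in open_ids:
                open_ids.remove(executor_id)
    return open_ids


complete_run = [
    ("executor_invoked", "uppercase"),
    ("executor_completed", "uppercase"),
    ("executor_invoked", "validate"),
    ("executor_completed", "validate"),
]
truncated_run = complete_run[:-1]  # capture cut off before validate finished

print(unbalanced_executors(complete_run))
print(unbalanced_executors(truncated_run))
```

A non-empty result on a finished run is a quick hint that an executor was still running when the capture stopped.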

Source: python/main.py.

Define a payload:

from dataclasses import dataclass
from agent_framework._workflows._events import WorkflowEvent


@dataclass(frozen=True)
class ProgressPayload:
    step: str
    percent: int

Emit from inside a handler:

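from agent_framework import Executor, WorkflowContext, handler


class UppercaseExecutor(Executor):
    def __init__(self) -> None:
        super().__init__(id="uppercase")

    @handler
    async def run(self, message: str, ctx: WorkflowContext[str]) -> None:
        # Emit the custom event first so it lands between this executor's
        # executor_invoked and executor_completed lifecycle events.
        await ctx.add_event(WorkflowEvent.emit("uppercase", ProgressPayload("uppercase", 33)))
        # Forward the transformed message to the next executor.
        await ctx.send_message(message.upper())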

Consume and filter — discriminate on type + isinstance on the payload:

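progress: list[ProgressPayload] = []  # custom progress payloads, in emit order
outputs: list[str] = []               # payloads passed to yield_output

async for event in workflow.run(text, stream=True):
    etype = getattr(event, "type", None)
    # "data" is shared with agent responses, so also check the payload type.
    if etype == "data" and isinstance(getattr(event, "data", None), ProgressPayload):
        progress.append(event.data)
    elif etype == "output":
        outputs.append(event.data)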

The isinstance guard matters — agent responses and your progress payload both surface as type="data", so filter on the payload type, not just the string tag.
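
A small stdlib-only sketch shows what goes wrong without the guard. `FakeEvent` is a stand-in for the real event class, and the string payload is a made-up placeholder for an agent response; both share the "data" tag with the progress payload.

```python
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class ProgressPayload:
    step: str
    percent: int


@dataclass(frozen=True)
class FakeEvent:
    """Stand-in for a streamed event; not the MAF WorkflowEvent."""
    type: str
    data: Any


stream = [
    FakeEvent("data", "partial agent text"),             # placeholder agent update
    FakeEvent("data", ProgressPayload("uppercase", 33)),  # our custom payload
    FakeEvent("output", "LOGGED: HI"),
]

# Filtering on the tag alone also picks up the agent text.
by_tag_only = [e.data for e in stream if e.type == "data"]

# Adding the isinstance check keeps only the progress payloads.
by_tag_and_payload = [
    e.data for e in stream
    if e.type == "data" and isinstance(e.data, ProgressPayload)
]

print(len(by_tag_only), len(by_tag_and_payload))
```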

Running it:

$ python main.py "hello world"
input: 'hello world'
  progress: uppercase → 33%
  progress: validate → 66%
  progress: log → 100%
output: 'LOGGED: HELLO WORLD'

Streaming vs non-streaming

MAF gives you two ways to run a workflow. The distinction matters for events:

| Mode | .NET API | Python API | Events behaviour |
| --- | --- | --- | --- |
| Streaming | InProcessExecution.RunStreamingAsync(...) returning StreamingRun + WatchStreamAsync() | workflow.run(input, stream=True) → async iterator | Events arrive as the workflow runs. |
| Non-streaming | InProcessExecution.RunAsync(...) returning Run | workflow.run(input) → awaitable returning a run summary | All events collected on the returned run; you iterate them after the workflow finishes. |

For anything user-facing (progress bars, SSE endpoints, CLI output), use streaming. Non-streaming is fine for tests that don’t care about ordering.
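
The difference in timing can be sketched with a plain async generator. `fake_workflow` is a stand-in, not a MAF workflow; the two consumers only illustrate when the caller gets to react, during the run or after it.

```python
import asyncio
from typing import AsyncIterator


async def fake_workflow() -> AsyncIterator[str]:
    """Stand-in run that produces three event tags over time."""
    for tag in ("executor_invoked", "progress", "executor_completed"):
        await asyncio.sleep(0)  # hand control back, as a working step would
        yield tag


async def streaming() -> list[str]:
    log = []
    async for tag in fake_workflow():
        log.append(f"seen {tag}")  # react while the run is still in flight
    log.append("run finished")
    return log


async def non_streaming() -> list[str]:
    collected = [tag async for tag in fake_workflow()]  # wait for everything
    log = ["run finished"]
    log.extend(f"seen {tag}" for tag in collected)  # inspect afterwards
    return log


streaming_log = asyncio.run(streaming())
batch_log = asyncio.run(non_streaming())
print(streaming_log)
print(batch_log)
```

Both logs contain the same entries; only the position of "run finished" changes, which is exactly what a progress bar cares about.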

Side-by-side differences

| Aspect | Python | .NET |
| --- | --- | --- |
| Event model | One WorkflowEvent class with a .type string discriminator | One WorkflowEvent base class, concrete subtypes per event kind |
| Define custom event | WorkflowEvent(type="progress", data=...) or WorkflowEvent.emit(source_id, data) | class ProgressEvent(...) : WorkflowEvent(data) |
| Emit inside handler | await ctx.add_event(evt) | await context.AddEventAsync(evt, cancellationToken) |
| Start the run | workflow.run(input, stream=True) → async iterator of events | await InProcessExecution.RunStreamingAsync(workflow, input) → StreamingRun |
| Stream the events | async for event in workflow.run(...) | await foreach (var evt in run.WatchStreamAsync()) |
| Filter by kind | if event.type == "...": ... + isinstance(event.data, ...) | switch (evt) { case ProgressEvent p: ... } |
| Lifecycle event names | strings: "executor_invoked", "output", … | types: ExecutorInvokedEvent, WorkflowOutputEvent, … |
| Reserved tags | "started", "status", "failed" are reserved; custom emits of those are dropped | n/a (custom events are a different subtype) |

Gotchas

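  • Match the concrete subtype before any base-type case in .NET. ProgressEvent inherits from WorkflowEvent, so a catch-all case WorkflowEvent placed above it would swallow it. Put custom cases above any such catch-all; their order relative to lifecycle subtypes such as ExecutorInvokedEvent doesn’t matter, because neither type derives from the other.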
  • Custom events in short-circuited branches never fire. If an upstream executor calls YieldOutputAsync / yield_output and sends no message onward, downstream executors don’t run, so their emits never land on the stream.
  • Don’t share mutable state across executors to “sync” events. Emits are per-executor. Cross-executor data goes through SendMessageAsync / send_message.
  • Reserved Python type tags. The strings "started", "status", "failed" are reserved by the framework. Emit with one of those and MAF drops the event and logs a warning.
  • await using the StreamingRun in .NET. Not disposing it leaks the internal channel reader.
  • ExecutorCompletedEvent.Data is the handler’s return value, not the forwarded message. If your handler returns ValueTask<T>, T lands on Data.
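
For the reserved-tag gotcha, one option is to fail loudly before emitting rather than discover a dropped event later. The guard below is a hypothetical helper, not part of MAF; the reserved set is taken from the bullet above.

```python
# Reserved Python type tags, as listed in the gotcha above.
RESERVED_TYPES = frozenset({"started", "status", "failed"})


def checked_event_type(event_type: str) -> str:
    """Hypothetical pre-emit guard: raise instead of letting the event be dropped."""
    if event_type in RESERVED_TYPES:
        raise ValueError(
            f"{event_type!r} is reserved by the framework; pick another tag"
        )
    return event_type


print(checked_event_type("progress"))

try:
    checked_event_type("status")
except ValueError as exc:
    print("rejected:", exc)
```

Calling the guard where the event is constructed turns a silent drop plus a log warning into an immediate, testable error.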

Tests

Both test suites assert on the event stream — no LLM, fully deterministic.

# Python — 5 tests (order, payload type, short-circuit, output pairing, incremental stream)
python -m pytest tutorials/10-workflow-events-and-builder/python/tests/ -v
# 5 passed

# .NET — 6 tests (+ lifecycle/custom interleave test)
cd tutorials/10-workflow-events-and-builder/dotnet/tests
dotnet test --nologo
# Passed! - Failed: 0, Passed: 6

How this shows up in the capstone
#

  • Phase 7’s Concurrent pre-purchase workflow emits ProgressPayload-shaped events so the frontend can render a live progress bar while reviews + stock + price-history run in parallel.
  • The Aspire Dashboard visualizes both lifecycle and custom events in the same trace — the custom payload becomes a span attribute.
  • The .NET orchestrator port reuses the WorkflowEvent subclass pattern from this chapter.
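
As a rough idea of how a consumer might turn a ProgressPayload-shaped event into a progress bar, here is a text-only sketch. `render_bar` is a hypothetical helper and says nothing about how the capstone frontend actually renders progress.

```python
def render_bar(step: str, percent: int, width: int = 20) -> str:
    """Render one progress line for a (step, percent) payload."""
    clamped = max(0, min(100, percent))  # tolerate out-of-range values
    filled = width * clamped // 100
    return f"[{'#' * filled}{'.' * (width - filled)}] {clamped:3d}% {step}"


for step, percent in (("uppercase", 33), ("validate", 66), ("log", 100)):
    print(render_bar(step, percent))
```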

What’s next
