Series note — Part of MAF v1: Python and .NET. Builds on Chapter 09 — Workflow Executors and Edges by adding observability into the workflow’s internals.
Repo — Full runnable code for this chapter is at https://github.com/nitin27may/e-commerce-agents/tree/main/tutorials/10-workflow-events-and-builder. Clone the repo,
cd tutorials/10-workflow-events-and-builder, and follow the README.
Why this chapter#
A workflow that finishes in 30 seconds needs to tell you what it’s doing for those 30 seconds. Without events, you squint at logs. With events, the caller subscribes to a single stream and sees every step the workflow takes — plus whatever custom signals you decide to emit.
MAF models this as one ordered stream with two kinds of events:
- Lifecycle events — what MAF emits for you.
WorkflowStartedEvent,ExecutorInvokedEvent,ExecutorCompletedEvent,SuperStepStartedEvent,SuperStepCompletedEvent,WorkflowOutputEvent. One per phase, automatic, same shape in both languages. - Custom events — what your executor emits. Any payload you care about: progress percentages, metrics, intermediate results, diagnostics. You decide the type; you decide when to call emit.
Both flow through the same stream in the order they occurred.
Lifecycle vs custom — the concept Ch09 glossed over#
Ch09 introduced WorkflowContext.add_event(...) in passing but never unpacked what the runtime was already putting on that same stream. The rule is simple once you see it:
- MAF owns lifecycle events. The framework emits one when an executor is about to run, one when it finishes, one when a superstep starts and ends, one when the workflow starts, and one every time
YieldOutputAsync/yield_outputfires. You cannot suppress them and you shouldn’t try to forge them. Use them for bookkeeping: render progress bars, draw the DAG, debug “did my executor even run?”. - You own custom events. They carry domain information the framework has no way to know about — “I validated 42 of 100 rows”, “cache hit rate was 83%”, “agent chose tool X”. Pick a type, pick a payload, emit whenever you have something to say.
A useful test: if a consumer in another team could usefully handle your event without reading your executor’s source, it’s a good custom event. If the event just duplicates what ExecutorCompletedEvent already says, drop it.
Prerequisites#
- Completed Chapter 09 — Workflow Executors and Edges
- Python 3.12+ via
uv; .NET 10 SDK
The concept#
| Term | Definition |
|---|---|
WorkflowEvent | Base type for every event the workflow stream emits — lifecycle or custom. Carries an ExecutorId (when relevant) and a payload on Data. |
| Emit | The act of pushing an event onto the run’s event stream from inside an executor handler. In .NET: context.AddEventAsync(new MyEvent(...)). In Python: ctx.add_event(WorkflowEvent(type="my_type", data=...)). |
event.type discriminator (Python) | The string tag on every WorkflowEvent — "executor_invoked", "output", "data", or whatever you pick for a custom event. |
| Lifecycle events | Emitted automatically by the framework: executor_invoked, executor_completed, executor_failed, superstep_started, superstep_completed, output, workflow_started, workflow_error. |
| Custom event payload | Any object you want. In .NET, subclass WorkflowEvent; in Python, pass a type string + data payload. |
| Event ordering | Events arrive in emit order. Lifecycle events for executor A appear before executor B’s if A ran first. Within one executor, events arrive in the order you called emit. |
Diagram — event timeline for a 3-executor pipeline#
orange = custom events emitted by your executor
Every arrow going back to Caller is one yield from the event stream. The caller sees lifecycle and custom events interleaved. The final WorkflowOutputEvent is itself a lifecycle event — the framework raises it the moment your executor calls YieldOutputAsync.
Event taxonomy reference#
The full set of events MAF emits (per the official docs):
| Event | When it fires | Carried data |
|---|---|---|
WorkflowStartedEvent / type="started" | Run begins, before the start executor is invoked | — |
SuperStepStartedEvent / type="superstep_started" | A superstep (one Pregel round) is about to dispatch | Superstep index / pending message count |
ExecutorInvokedEvent / type="executor_invoked" | An executor is about to handle a message | ExecutorId |
AgentResponseEvent / type="data" | An agent-backed executor produced a response | AgentResponse payload |
AgentResponseUpdateEvent / type="data" | Streaming token update from an agent executor | AgentResponseUpdate payload |
| Your custom events | Any time you call AddEventAsync / add_event | Whatever you put there |
ExecutorCompletedEvent / type="executor_completed" | An executor returned without throwing | ExecutorId, handler result (if any) |
ExecutorFailedEvent / type="executor_failed" | An executor threw | ExecutorId, exception |
SuperStepCompletedEvent / type="superstep_completed" | All executors in the superstep have returned | Superstep completion info |
WorkflowOutputEvent / type="output" | Any executor called YieldOutputAsync / yield_output | The yielded payload |
RequestInfoEvent / type="request_info" | Workflow paused for human input (Ch17) | RequestId, request payload |
WorkflowWarningEvent / type="warning" | Non-fatal framework warning | Message |
WorkflowErrorEvent / type="error" / "failed" | Workflow terminated with error | Exception details |
Code walkthrough#
Source: dotnet/Program.cs.
Define a custom event — subclass WorkflowEvent with your payload type:
internal sealed class ProgressEvent(string step, int percent)
: WorkflowEvent(new ProgressPayload(step, percent))
{
public string Step => ((ProgressPayload)Data!).Step;
public int Percent => ((ProgressPayload)Data!).Percent;
}
internal sealed record ProgressPayload(string Step, int Percent);Emit from inside a handler — context.AddEventAsync is order-preserving:
[SendsMessage(typeof(string))]
internal sealed partial class UppercaseExecutor() : Executor("uppercase")
{
[MessageHandler]
public async ValueTask HandleAsync(
string message,
IWorkflowContext context,
CancellationToken cancellationToken = default)
{
await context.AddEventAsync(new ProgressEvent("uppercase", 33), cancellationToken);
await context.SendMessageAsync(message.ToUpperInvariant(), cancellationToken);
}
}Consume the stream — pattern-match on the concrete subtype first, lifecycle types after:
await using StreamingRun run = await InProcessExecution.RunStreamingAsync(workflow, input);
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
switch (evt)
{
case ProgressEvent p:
Console.WriteLine($" [progress] {p.Step} -> {p.Percent}%");
break;
case ExecutorInvokedEvent invoke:
Console.WriteLine($"[lifecycle] executor_invoked {invoke.ExecutorId}");
break;
case ExecutorCompletedEvent done:
Console.WriteLine($"[lifecycle] executor_completed {done.ExecutorId}");
break;
case WorkflowOutputEvent output:
Console.WriteLine($" [output] {output.Data}");
break;
}
}Running it:
$ dotnet run -- "hello world"
input: 'hello world'
[lifecycle] workflow_started
[lifecycle] superstep_started (step starts)
[lifecycle] executor_invoked uppercase
[progress] uppercase -> 33%
[lifecycle] executor_completed uppercase
[lifecycle] superstep_completed (step ends)
[lifecycle] superstep_started (step starts)
[lifecycle] executor_invoked validate
[progress] validate -> 66%
[lifecycle] executor_completed validate
[lifecycle] superstep_completed (step ends)
[lifecycle] superstep_started (step starts)
[lifecycle] executor_invoked log
[progress] log -> 100%
[output] LOGGED: HELLO WORLD
[lifecycle] executor_completed log
[lifecycle] superstep_completed (step ends)Lifecycle events bracket every executor; custom events sit right where the code called AddEventAsync. The project uses the same source-generator flow as Ch09 — Microsoft.Agents.AI.Workflows.Generators wires up each [MessageHandler] method at compile time.
Source: python/main.py.
Define a payload:
from dataclasses import dataclass
from agent_framework._workflows._events import WorkflowEvent
@dataclass(frozen=True)
class ProgressPayload:
step: str
percent: intEmit from inside a handler:
class UppercaseExecutor(Executor):
def __init__(self) -> None:
super().__init__(id="uppercase")
@handler
async def run(self, message: str, ctx: WorkflowContext[str]) -> None:
await ctx.add_event(WorkflowEvent.emit("uppercase", ProgressPayload("uppercase", 33)))
await ctx.send_message(message.upper())Consume and filter — discriminate on type + isinstance on the payload:
async for event in workflow.run(text, stream=True):
etype = getattr(event, "type", None)
if etype == "data" and isinstance(getattr(event, "data", None), ProgressPayload):
progress.append(event.data)
elif etype == "output":
outputs.append(event.data)The isinstance guard matters — agent responses and your progress payload both surface as type="data", so filter on the payload type, not just the string tag.
Running it:
$ python main.py "hello world"
input: 'hello world'
progress: uppercase → 33%
progress: validate → 66%
progress: log → 100%
output: 'LOGGED: HELLO WORLD'Streaming vs non-streaming#
MAF gives you two ways to run a workflow. The distinction matters for events:
| Mode | .NET API | Python API | Events behaviour |
|---|---|---|---|
| Streaming | InProcessExecution.RunStreamingAsync(...) returning StreamingRun + WatchStreamAsync() | workflow.run(input, stream=True) → async iterator | Events arrive as the workflow runs. |
| Non-streaming | InProcessExecution.RunAsync(...) returning Run | workflow.run(input) → awaitable returning a run summary | All events collected on the returned run; you iterate them after the workflow finishes. |
For anything user-facing (progress bars, SSE endpoints, CLI output), use streaming. Non-streaming is fine for tests that don’t care about ordering.
Side-by-side differences#
| Aspect | Python | .NET |
|---|---|---|
| Event model | One WorkflowEvent class with a .type string discriminator | One WorkflowEvent base class, concrete subtypes per event kind |
| Define custom event | WorkflowEvent(type="progress", data=...) or WorkflowEvent.emit(source_id, data) | class ProgressEvent(...) : WorkflowEvent(data) |
| Emit inside handler | await ctx.add_event(evt) | await context.AddEventAsync(evt, cancellationToken) |
| Start the run | workflow.run(input, stream=True) → async iterator of events | await InProcessExecution.RunStreamingAsync(workflow, input) → StreamingRun |
| Stream the events | async for event in workflow.run(...) | await foreach (var evt in run.WatchStreamAsync()) |
| Filter by kind | if event.type == "...": ... + isinstance(event.data, ...) | switch (evt) { case ProgressEvent p: ... } |
| Lifecycle event names | strings: "executor_invoked", "output", … | types: ExecutorInvokedEvent, WorkflowOutputEvent, … |
| Reserved tags | "started", "status", "failed" are reserved; custom emits of those are dropped | n/a — custom events are a different subtype |
Gotchas#
- Match the concrete subtype first in .NET.
ProgressEventinherits fromWorkflowEvent, socase WorkflowEventwould swallow it. Always put custom cases above lifecycle cases in aswitch. - Custom events in short-circuited branches never fire. If an upstream executor calls
YieldOutputAsync/yield_output, downstream executors don’t run, so their emits never land on the stream. - Don’t share mutable state across executors to “sync” events. Emits are per-executor. Cross-executor data goes through
SendMessageAsync/send_message. - Reserved Python type tags. The strings
"started","status","failed"are reserved by the framework. Emit with one of those and MAF drops the event and logs a warning. await usingtheStreamingRunin .NET. Not disposing it leaks the internal channel reader.ExecutorCompletedEvent.Datais the handler’s return value, not the forwarded message. If your handler returnsValueTask<T>,Tlands onData.
Tests#
Both test suites assert on the event stream — no LLM, fully deterministic.
# Python — 5 tests (order, payload type, short-circuit, output pairing, incremental stream)
python -m pytest tutorials/10-workflow-events-and-builder/python/tests/ -v
# 5 passed
# .NET — 6 tests (+ lifecycle/custom interleave test)
cd tutorials/10-workflow-events-and-builder/dotnet/tests
dotnet test --nologo
# Passed! - Failed: 0, Passed: 6How this shows up in the capstone#
- Phase 7’s Concurrent pre-purchase workflow emits
ProgressPayload-shaped events so the frontend can render a live progress bar while reviews + stock + price-history run in parallel. - The Aspire Dashboard visualizes both lifecycle and custom events in the same trace — the custom payload becomes a span attribute.
- The .NET orchestrator port reuses the
WorkflowEventsubclass pattern from this chapter.

