Series note — Part of MAF v1: Python and .NET. Second of four advanced chapters — after Human-in-the-Loop, before Declarative Workflows.
Repo — Full runnable code for this chapter: `tutorials/18-state-and-checkpoints`. Clone, `cd tutorials/18-state-and-checkpoints`, and follow the per-language instructions below.
## Why this chapter
A workflow that forgets everything when the process restarts is a demo, not a system. Four situations turn “forgets” into a real problem:
- Long-running workflows. A pre-purchase price negotiation that fans out to six suppliers and waits for every response can run for minutes. An overnight research report takes hours. A deployment verifier polls for a day. Losing that progress to a pod restart is unacceptable.
- Expensive operations. If superstep 3 cost $4 in LLM calls and the container got OOM-killed during superstep 4, you want the retry to start from the saved facts ledger, not pay the $4 again.
- HITL pauses that span hours or days. Chapter 17 showed a workflow pausing for a human approval. If the reviewer is in another timezone and clicks “Approve” the next morning, the workflow object you kept in memory is long dead. Checkpoint the pause; rehydrate it; resume.
- Crash recovery and migration. Blue-green deploys, k8s rolling restarts, moving a tenant between regions — all require serializing the current state of every in-flight workflow and restoring it somewhere else.
MAF answers all four with the same primitive: a checkpoint — a serialized snapshot of the whole workflow (every executor’s state, every in-flight message, every pending HITL request) taken at every superstep boundary. You pick a `CheckpointStorage` backend (in-memory, file, Postgres, or Cosmos); the framework does the rest.
The rest of this chapter is the mechanics: what a checkpoint contains, when it’s written, the two executor hooks that control serialization, and the four storage backends that make this work from unit tests up to production multi-tenant SaaS.
Jargon defined inline: `CheckpointStorage`, `on_checkpoint_save` / `OnCheckpointingAsync`, `on_checkpoint_restore` / `OnCheckpointRestoredAsync`, `FileCheckpointStorage`, `CosmosCheckpointStorage`, `CheckpointManager` (.NET), `FileSystemJsonCheckpointStore` (.NET), `checkpoint_id`, `get_latest(workflow_name=...)`.
## Prerequisites
- Completed Chapter 17 — Human-in-the-Loop. The HITL pause/resume model is the easiest-to-understand motivator for durable checkpoints.
- `uv` for Python; .NET 10 SDK for the .NET sample.
- No LLM required. Checkpointing is framework plumbing. The counter example is pure int arithmetic, which is why the tests are fully deterministic.
## The concept: the checkpoint lifecycle
There are two moments that matter in checkpointing. Save fires every time a superstep completes — the scheduler calls every executor’s save hook, bundles the returned state with the in-flight messages and pending requests, and hands the blob to the storage backend. Restore fires when the caller asks to resume from a checkpoint_id — the framework loads the blob from storage, walks every executor calling its restore hook with the matching state slice, replays any pending messages, and the scheduler picks up from the next superstep.
Everything between save and restore — a crash, a deploy, a week-long HITL pause, a migration to another region — is a pure I/O problem. The framework doesn’t care.
```mermaid
flowchart LR
    execute([Executors run<br/>send_message / yield_output])
    superstep[Superstep ends]
    save[[on_checkpoint_save<br/>OnCheckpointingAsync]]
    subgraph backends[CheckpointStorage backends]
        mem[(InMemory<br/>tests)]
        file[(File<br/>single host)]
        pg[(Postgres<br/>capstone)]
        cosmos[(Cosmos DB<br/>distributed)]
    end
    boundary{{crash / restart / migrate}}
    load[[load / RetrieveIndex]]
    restore[[on_checkpoint_restore<br/>OnCheckpointRestoredAsync]]
    resume([Workflow resumes<br/>pending messages replay])
    execute --> superstep --> save --> backends
    backends -. process boundary .-> boundary
    boundary --> load --> restore --> resume
    class execute core
    class superstep core
    class save success
    class load success
    class restore success
    class resume core
    class mem infra
    class file infra
    class pg infra
    class cosmos external
    class boundary error
```
The dotted line is the process boundary. Your code owns the storage backend and the retention policy. MAF owns the snapshot shape, the save cadence (every superstep, non-negotiable), and the replay semantics.
A checkpoint bundles four things (MS docs):
- Every executor’s state — the dicts your `on_checkpoint_save` hooks return.
- Pending messages for the next superstep — routes from `send_message`/`SendMessageAsync` that haven’t been dispatched yet.
- Pending requests and responses — the HITL pauses from Ch17 (`request_info`/`RequestPort`).
- Shared state — anything you pushed via `ctx.QueueStateUpdateAsync` (.NET) or per-executor state dicts (Python).
## Jargon recap
- `CheckpointStorage` (Python) / `CheckpointManager` (.NET) — the interface for persisting checkpoints. Four built-in backends below.
- `on_checkpoint_save(self) -> dict[str, Any]` (Python) / `OnCheckpointingAsync(IWorkflowContext context, CancellationToken ct)` (.NET) — the hook that runs at the end of every superstep and returns what to persist. Python returns a plain dict; .NET pushes state into the workflow context via `context.QueueStateUpdateAsync(key, value)`.
- `on_checkpoint_restore(self, state)` (Python) / `OnCheckpointRestoredAsync(IWorkflowContext context, CancellationToken ct)` (.NET) — the hook that runs on resume, before any handler fires. Python receives the dict you returned from save; .NET reads it back with `context.ReadStateAsync<T>(key)`.
- `FileCheckpointStorage` (Python) / `FileSystemJsonCheckpointStore` (.NET) — disk-backed storage. Single-host durability; no cross-machine coordination.
- `CosmosCheckpointStorage` (Python, in `agent-framework-azure-cosmos`) — Azure Cosmos DB NoSQL backend. Durable, distributed, partitioned by `workflow_name`.
- `checkpoint_id` — a framework-assigned unique id on every checkpoint. Opaque; treat it like the HITL `request_id` from Ch17 — round-trip it verbatim, don’t parse it.
- `get_latest(workflow_name=...)` (Python) / `RetrieveIndexAsync(sessionId)` + ordering (.NET) — query the store for the most recent checkpoint of a named workflow. Unnamed workflows can’t be queried; name yours at build time.
## Storage backends — pick the one that matches your deployment
All four backends implement the same `CheckpointStorage` contract. Swapping them is a one-line change — workflow and executor code stay identical. Pick by durability, locality, and operational footprint:
| Backend | Package | Durability | When to use |
|---|---|---|---|
| `InMemoryCheckpointStorage` (Py) / `CheckpointManager.CreateInMemory()` (.NET) | `agent-framework` / `Microsoft.Agents.AI.Workflows` | Process memory | Unit tests, deterministic demos, short-lived workflows that never cross a process boundary. |
| `FileCheckpointStorage` (Py) / `FileSystemJsonCheckpointStore` wrapped in `CheckpointManager.CreateJson(store)` (.NET) | Built-in | Local disk | Single-machine workflows that need to survive a crash or a restart. Dev machines, VMs, edge devices. Not cross-pod safe. |
| `CosmosCheckpointStorage` (Py) / equivalent in .NET | `agent-framework-azure-cosmos` | Azure Cosmos DB NoSQL | Distributed production. Cosmos partitions by `workflow_name` automatically; managed identity + RBAC is the recommended auth. Use this when multiple pods/regions share a workflow fleet. |
| `PostgresCheckpointStorage` (Py, capstone) / equivalent in .NET | Custom, in `shared/checkpoint_storage.py` | Postgres JSONB column | Production systems already running Postgres. The capstone ships one because the rest of the stack (orders, inventory) already lives in pgvector-enabled Postgres — reusing it keeps the dependency surface small. |
The MS docs describe three as “built in”; the Postgres one is a custom implementation this capstone contributes. The shape of any backend is five methods — `save`, `load`, `list_checkpoints`, `get_latest`, `delete` — so rolling your own for DynamoDB, SQL Server, or S3 is a one-class exercise (see the sketch below).
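For flavor, here is a hedged skeleton of that one-class exercise — a hypothetical SQLite backend in the five-method shape. The method names come from the contract above, but the exact protocol signatures and the checkpoint object’s attributes (`checkpoint_id`, `workflow_name`, `timestamp`, and a `to_dict()` serializer) are assumptions; check the real `CheckpointStorage` protocol in `agent_framework` before shipping anything like this.

```python
# Hypothetical SQLite backend — a sketch of the five-method shape, not MAF's API.
import json
import sqlite3
from typing import Any

class SqliteCheckpointStorage:
    def __init__(self, path: str = "checkpoints.db") -> None:
        self._db = sqlite3.connect(path)
        self._db.execute(
            "CREATE TABLE IF NOT EXISTS checkpoints ("
            "checkpoint_id TEXT PRIMARY KEY, workflow_name TEXT, "
            "created_at REAL, payload TEXT)"
        )

    async def save(self, checkpoint: Any) -> None:
        # Assumes the checkpoint exposes checkpoint_id/workflow_name/timestamp
        # and a JSON-serializable to_dict() — verify against the real protocol.
        self._db.execute(
            "INSERT OR REPLACE INTO checkpoints VALUES (?, ?, ?, ?)",
            (checkpoint.checkpoint_id, checkpoint.workflow_name,
             checkpoint.timestamp, json.dumps(checkpoint.to_dict())),
        )
        self._db.commit()

    async def load(self, checkpoint_id: str) -> dict[str, Any] | None:
        row = self._db.execute(
            "SELECT payload FROM checkpoints WHERE checkpoint_id = ?",
            (checkpoint_id,),
        ).fetchone()
        return json.loads(row[0]) if row else None

    async def list_checkpoints(self, workflow_name: str) -> list[dict[str, Any]]:
        rows = self._db.execute(
            "SELECT payload FROM checkpoints WHERE workflow_name = ? "
            "ORDER BY created_at",
            (workflow_name,),
        ).fetchall()
        return [json.loads(r[0]) for r in rows]

    async def get_latest(self, workflow_name: str) -> dict[str, Any] | None:
        rows = await self.list_checkpoints(workflow_name)
        return rows[-1] if rows else None

    async def delete(self, checkpoint_id: str) -> None:
        self._db.execute(
            "DELETE FROM checkpoints WHERE checkpoint_id = ?", (checkpoint_id,)
        )
        self._db.commit()
```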
## Code walkthrough
Source: `dotnet/Program.cs`. Two executors, one edge. The demo seeds an `AccumulatorExecutor` with 10, adds 5, runs end-to-end, then rehydrates into a fresh workflow with a deliberately wrong seed (999) — proving `OnCheckpointRestoredAsync` clobbers the seed and restores the real total from disk.
```csharp
using Microsoft.Agents.AI.Workflows;
using Microsoft.Agents.AI.Workflows.Checkpointing;

// Store + manager. CheckpointManager.CreateJson wraps the store with a
// JSON marshaller so executor state round-trips through System.Text.Json.
var store = new FileSystemJsonCheckpointStore(new DirectoryInfo("./.checkpoints"));
CheckpointManager checkpointManager = CheckpointManager.CreateJson(store);

// Phase 1: run end-to-end. Every superstep boundary writes a checkpoint.
string sessionId = Guid.NewGuid().ToString("N");
Workflow workflow1 = BuildWorkflow(seed: 10);
await using StreamingRun run = await InProcessExecution
    .RunStreamingAsync(workflow1, input: 5, checkpointManager, sessionId);

await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    if (evt is SuperStepCompletedEvent step && step.CompletionInfo?.Checkpoint is { } cp)
        Console.WriteLine($"  superstep done — checkpoint {cp.CheckpointId[..8]}");
    if (evt is WorkflowOutputEvent output)
        Console.WriteLine($"Phase 1 result: total = {output.Data}");
}

// Phase 2: rehydrate into a fresh Workflow with the WRONG seed.
var checkpoints = (await store.RetrieveIndexAsync(sessionId)).ToList();
CheckpointInfo firstCheckpoint = checkpoints[0];
Workflow workflow2 = BuildWorkflow(seed: 999);
await using StreamingRun resumed = await InProcessExecution
    .ResumeStreamingAsync(workflow2, firstCheckpoint, checkpointManager);

await foreach (WorkflowEvent evt in resumed.WatchStreamAsync())
    if (evt is WorkflowOutputEvent output)
        Console.WriteLine($"Phase 2 result: total = {output.Data}"); // still 15
```

The accumulator itself:
```csharp
[SendsMessage(typeof(int))]
internal sealed partial class AccumulatorExecutor : Executor
{
    private const string StateKey = "total";
    private int _total;

    public AccumulatorExecutor(int seed) : base("accumulator") => _total = seed;

    [MessageHandler]
    public async ValueTask HandleAsync(int amount, IWorkflowContext context, CancellationToken ct = default)
    {
        _total += amount;
        await context.SendMessageAsync(_total, cancellationToken: ct);
    }

    protected override ValueTask OnCheckpointingAsync(IWorkflowContext context, CancellationToken ct = default) =>
        context.QueueStateUpdateAsync(StateKey, _total, cancellationToken: ct);

    protected override async ValueTask OnCheckpointRestoredAsync(IWorkflowContext context, CancellationToken ct = default)
    {
        _total = await context.ReadStateAsync<int>(StateKey, cancellationToken: ct);
    }
}
```

Three things that trip people up moving from Python to .NET:
- State goes through the `IWorkflowContext` state bag. Python returns a dict; .NET calls `context.QueueStateUpdateAsync(key, value)` during save and `context.ReadStateAsync<T>(key)` during restore. The `System.Text.Json` marshaller handles primitives, records, and anything with `[JsonSerializable]` out of the box.
- `sessionId` is the .NET partition key. `FileSystemJsonCheckpointStore` writes files named `{sessionId}_{checkpointId}.json`. If you resume a different session, ask the store for its index via `RetrieveIndexAsync(sessionId)`, not `RetrieveIndexAsync(workflowName)`. Python partitions by `workflow_name` instead — same idea, different key.
- `ResumeStreamingAsync` rebuilds the run. The resumed run is a fresh `StreamingRun` — the old one is disposed. Streaming events start fresh. The MS docs call this “rehydration” and offer `RestoreCheckpointAsync` on an existing run as the alternative for same-process resume.
Run it:

```bash
cd tutorials/18-state-and-checkpoints/dotnet
dotnet run            # defaults: seed=10, add=5, restored total=15
dotnet run -- 100 7   # seed=100, add=7, restored total=107
```

Source: `python/main.py`. Same two-executor shape.
```python
from typing import Any

from agent_framework._workflows._checkpoint import FileCheckpointStorage
from agent_framework._workflows._executor import Executor, handler
from agent_framework._workflows._workflow_builder import WorkflowBuilder
from agent_framework._workflows._workflow_context import WorkflowContext


class AccumulatorExecutor(Executor):
    def __init__(self, seed: int) -> None:
        super().__init__(id="accumulator")
        self.total = seed

    @handler
    async def handle(self, amount: int, ctx: WorkflowContext[int, None]) -> None:
        self.total += amount
        await ctx.send_message(self.total)

    async def on_checkpoint_save(self) -> dict[str, Any]:
        return {"total": self.total}

    async def on_checkpoint_restore(self, state: dict[str, Any]) -> None:
        self.total = int(state.get("total", 0))


class FinalizerExecutor(Executor):
    def __init__(self) -> None:
        super().__init__(id="finalizer")

    @handler
    async def handle(self, total: int, ctx: WorkflowContext[None, int]) -> None:
        await ctx.yield_output(total)


storage = FileCheckpointStorage("./.checkpoints")
accumulator = AccumulatorExecutor(seed=10)
workflow = (
    WorkflowBuilder(
        start_executor=accumulator,
        name="accumulator-workflow",
        checkpoint_storage=storage,
    )
    .add_edge(accumulator, FinalizerExecutor())
    .build()
)

# Phase 1: end-to-end run writes checkpoints at every superstep boundary.
result = await run_once(storage, seed=10, amount=5)  # returns 15

# Phase 2: resume from the FIRST checkpoint, not the latest.
# The latest checkpoint is after Finalizer ran; its pending-message queue is
# empty. The first checkpoint captures the accumulator's post-add state.
checkpoints = await storage.list_checkpoints(workflow_name="accumulator-workflow")
checkpoints.sort(key=lambda cp: cp.timestamp)
replayed = await resume_from_checkpoint(storage, checkpoints[0].checkpoint_id, resume_seed=999)
assert replayed == result  # 15, not 999
```

Four things to pin down:
- Save must return a dict. Not a dataclass, not a Pydantic model — a plain `dict[str, Any]`. MAF pickles it, base64s the result, and embeds it inside the outer JSON envelope.
- Restore is called before any handler on resume. The framework rebuilds the executor (your `__init__` runs with whatever you pass the builder — here `seed=999`), then immediately calls `on_checkpoint_restore` to overwrite whatever `__init__` set. Treat the hook as authoritative.
- `name=` is mandatory for `get_latest`. The query `storage.list_checkpoints(workflow_name=...)` is the only way to ask the store “what’s the newest snapshot of this workflow?”.
- Resume accepts no new input message. `workflow.run(checkpoint_id=..., checkpoint_storage=..., stream=True)` replays the saved superstep’s pending messages. Passing a fresh input alongside raises `ValueError` (see the sketch after this list).
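A minimal sketch of that last rule, reusing the `workflow` and `storage` objects from the walkthrough above (the positional-input call shape is an assumption — verify against your MAF version):

```python
# Hypothetical demonstration: resuming replays the checkpoint's pending
# messages, so supplying a fresh input alongside checkpoint_id is rejected.
latest = await storage.get_latest(workflow_name="accumulator-workflow")
try:
    await workflow.run(5, checkpoint_id=latest.checkpoint_id, checkpoint_storage=storage)
except ValueError as err:
    print(f"rejected as expected: {err}")
```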
Output from `python main.py 10 5`:

```text
Phase 1: seed=10, add=5
Phase 1 result: total = 15
2 checkpoint file(s) on disk.
Resuming from ecc3f7ff… with seed=999
Phase 2 result: total = 15 (expected 15)
```

The `expected 15` prints 15, not 999. That’s the whole contract in a single line: the checkpoint is the source of truth; `__init__` is just a default.
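The walkthrough calls two helpers, `run_once` and `resume_from_checkpoint`, that live in the repo’s `python/main.py`. One plausible shape — illustrative only, assuming `workflow.run()` returns a result whose `get_outputs()` surfaces the values passed to `yield_output` — looks like this:

```python
# Sketch of the demo helpers; the repo's python/main.py is authoritative.
def build_workflow(seed: int, storage: FileCheckpointStorage):
    accumulator = AccumulatorExecutor(seed=seed)
    return (
        WorkflowBuilder(
            start_executor=accumulator,
            name="accumulator-workflow",
            checkpoint_storage=storage,
        )
        .add_edge(accumulator, FinalizerExecutor())
        .build()
    )

async def run_once(storage: FileCheckpointStorage, seed: int, amount: int) -> int:
    workflow = build_workflow(seed=seed, storage=storage)
    result = await workflow.run(amount)
    return result.get_outputs()[0]  # the total yielded by FinalizerExecutor

async def resume_from_checkpoint(
    storage: FileCheckpointStorage, checkpoint_id: str, resume_seed: int
) -> int:
    # Deliberately wrong seed: on_checkpoint_restore must overwrite it.
    workflow = build_workflow(seed=resume_seed, storage=storage)
    result = await workflow.run(checkpoint_id=checkpoint_id, checkpoint_storage=storage)
    return result.get_outputs()[0]
```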
## Side-by-side differences
| Aspect | Python | .NET |
|---|---|---|
| Save hook | `async on_checkpoint_save(self) -> dict[str, Any]` | `OnCheckpointingAsync(IWorkflowContext ctx, CancellationToken ct)` — pushes via `ctx.QueueStateUpdateAsync(key, value)` |
| Restore hook | `async on_checkpoint_restore(self, state: dict)` | `OnCheckpointRestoredAsync(IWorkflowContext ctx, CancellationToken ct)` — pulls via `ctx.ReadStateAsync<T>(key)` |
| Storage abstraction | `CheckpointStorage` passed as `checkpoint_storage=` on `WorkflowBuilder` | `CheckpointManager` wrapping an `ICheckpointStore`; passed to `InProcessExecution.RunStreamingAsync(..., checkpointManager, sessionId)` |
| Built-in backends | `InMemoryCheckpointStorage`, `FileCheckpointStorage`, `CosmosCheckpointStorage` (via `agent-framework-azure-cosmos`) | `CheckpointManager.CreateInMemory()`, `FileSystemJsonCheckpointStore` wrapped in `CreateJson(store)`, Cosmos via the `.Azure.Cosmos` package |
| Partition key | `workflow_name` (from `WorkflowBuilder(..., name=...)`) | `sessionId` passed to `RunStreamingAsync` |
| Query latest | `await storage.get_latest(workflow_name=...)` | `(await store.RetrieveIndexAsync(sessionId)).Last()` or `run.Checkpoints.Last()` |
| Resume — same run | N/A (Python always rebuilds) | `await run.RestoreCheckpointAsync(savedCheckpoint)` |
| Resume — fresh run | `workflow.run(checkpoint_id=id, checkpoint_storage=storage, stream=True)` | `InProcessExecution.ResumeStreamingAsync(newWorkflow, savedCheckpoint, checkpointManager)` |
| Serialization | Pickle-in-JSON (base64); restricted unpickler by default | `System.Text.Json` via `CheckpointManager.CreateJson` |
Same state machine, different names. The surprising one is partitioning: Python groups by workflow name, .NET groups by session id. In the capstone we normalize to workflow name on both sides so pre-purchase and return-replace workflows can be queried identically across languages.
## Gotchas
- Resume accepts no new input. Python raises `ValueError` if you pass both `input=` and `checkpoint_id=`. .NET’s `ResumeStreamingAsync` doesn’t take an input parameter at all. The saved checkpoint is the input.
- `get_latest` isn’t always what you want. In a workflow that ran to completion, the latest checkpoint has an empty pending-message queue. Resuming from it produces no output. Use `list_checkpoints` + your own timestamp sort when you want a specific superstep.
- State must round-trip through the serializer. Python uses pickle-in-JSON; .NET uses `System.Text.Json`. Plain dicts, lists, primitives, dataclasses, records — all fine. Custom objects with runtime references (open file handles, sockets, `asyncio.Lock` instances) will either fail on save or deserialize into ghosts.
- The workflow needs a `name` to be queryable. `list_checkpoints(workflow_name=...)` and `get_latest(workflow_name=...)` filter by the name you pass to `WorkflowBuilder(..., name=...)` in Python. .NET uses `sessionId` instead — decide your partition key up front; it’s in every API call.
- Checkpoints pile up. No backend auto-deletes. Production Postgres/Cosmos deployments need a retention job (daily cron, `DELETE FROM workflow_checkpoints WHERE created_at < now() - '7 days'::interval`). The capstone ships a cleanup script; a sketch follows this list.
- Pickle is a trust boundary. `FileCheckpointStorage` and `CosmosCheckpointStorage` use pickle-in-JSON for non-native Python types. MAF’s restricted unpickler blocks arbitrary classes by default — whitelist your custom types with `allowed_checkpoint_types=["my_app.models:SafeState"]`. Never load a checkpoint from an untrusted source.
- The `__init__` args matter for edges and ids, not for state. When a new process rebuilds the workflow, the executor ids and edge topology must match what was saved. Changing the id of a checkpointed executor between versions orphans its saved state.
- HITL requests survive checkpoints. Ch17’s `RequestPort` pauses are captured in the checkpoint. On resume, the framework re-emits the `RequestInfoEvent` so the caller can collect a response and call `SendResponseAsync` or pass `responses=` on the next run. You get crash-safe HITL for free once you add checkpoint storage.
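The retention sweep itself is a few lines. A minimal sketch against the capstone’s `workflow_checkpoints` table, assuming `asyncpg` for Postgres access and the `created_at` column named in the gotcha above:

```python
# Hedged sketch of a daily retention job; adjust table/column names to your schema.
import asyncio
import asyncpg  # assumption: asyncpg is the Postgres driver in use

async def prune_checkpoints(dsn: str, keep_days: int = 7) -> int:
    conn = await asyncpg.connect(dsn)
    try:
        # asyncpg returns a status string like "DELETE 42".
        status = await conn.execute(
            "DELETE FROM workflow_checkpoints "
            "WHERE created_at < now() - ($1 || ' days')::interval",
            str(keep_days),
        )
        return int(status.split()[-1])
    finally:
        await conn.close()

if __name__ == "__main__":
    deleted = asyncio.run(prune_checkpoints("postgresql://localhost/capstone"))
    print(f"pruned {deleted} checkpoint(s)")
```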
## Tests
Python ships 8 deterministic tests — no LLM, no network, no sleeps, no flakiness:

```bash
cd agents/python
uv run python -m pytest ../../tutorials/18-state-and-checkpoints/python/tests/ -v
# 8 passed in 0.46s
```
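What one of those deterministic tests plausibly looks like, reusing the hypothetical helpers sketched earlier (`pytest-asyncio` and the `InMemoryCheckpointStorage` import path are assumptions; the repo’s actual tests in `python/tests/` are authoritative):

```python
# Illustrative only — mirrors the walkthrough's phase-1/phase-2 assertions.
import pytest
from agent_framework._workflows._checkpoint import InMemoryCheckpointStorage

@pytest.mark.asyncio
async def test_restore_overrides_init_seed():
    storage = InMemoryCheckpointStorage()
    result = await run_once(storage, seed=10, amount=5)
    checkpoints = await storage.list_checkpoints(workflow_name="accumulator-workflow")
    checkpoints.sort(key=lambda cp: cp.timestamp)
    replayed = await resume_from_checkpoint(
        storage, checkpoints[0].checkpoint_id, resume_seed=999
    )
    assert result == replayed == 15  # checkpoint wins over the 999 seed
```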
.NET builds cleanly under `TreatWarningsAsErrors=true` and runs end-to-end:

```bash
cd tutorials/18-state-and-checkpoints/dotnet
dotnet build        # 0 warnings, 0 errors
dotnet run -- 10 5  # Phase 2 result: total = 15 (expected 15)
```

## How this shows up in the capstone
Phase 7 `plans/refactor/11-checkpointing.md` ships a custom Postgres-backed `CheckpointStorage` (`agents/python/shared/checkpoint_storage.py`) that uses MAF’s `encode_checkpoint_value` / `decode_checkpoint_value` helpers to round-trip state through a `workflow_checkpoints` JSONB table. The factory (`agents/python/shared/factory.py` → `get_checkpoint_storage`) switches on the `MAF_CHECKPOINT_BACKEND` env var so tests use memory, local dev uses file, and production uses postgres (a sketch of the switch follows below). Two workflows opt in:
- `workflows/pre_purchase.py` — the slow fan-out to multiple suppliers. A pod restart mid-negotiation picks up where it left off instead of re-quoting every supplier.
- `workflows/return_replace.py` — paired with Ch17 HITL. When a high-value return pauses for approval and the customer doesn’t confirm for a day, the orchestrator rehydrates the checkpoint when the “Approve” webhook fires and resumes the discount-apply executor.

.NET has the mirror implementation at `agents/dotnet/src/ECommerceAgents.Shared/Checkpoints/PostgresCheckpointStorage.cs` plus a full `CheckpointStorageFactory` that switches on the same env var. The integration tests run against a real Postgres testcontainer — never mock the DB.
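A sketch of that factory switch. The env-var name and the three backend strings come from the paragraph above; the import paths and the `DATABASE_URL` DSN variable are assumptions — the repo’s `get_checkpoint_storage` is authoritative:

```python
import os

def get_checkpoint_storage():
    """Pick a CheckpointStorage backend from MAF_CHECKPOINT_BACKEND (sketch)."""
    backend = os.getenv("MAF_CHECKPOINT_BACKEND", "memory")
    if backend == "memory":
        from agent_framework._workflows._checkpoint import InMemoryCheckpointStorage
        return InMemoryCheckpointStorage()
    if backend == "file":
        from agent_framework._workflows._checkpoint import FileCheckpointStorage
        return FileCheckpointStorage("./.checkpoints")
    if backend == "postgres":
        # The capstone's class lives in shared/checkpoint_storage.py.
        from shared.checkpoint_storage import PostgresCheckpointStorage
        return PostgresCheckpointStorage(os.environ["DATABASE_URL"])  # DSN name assumed
    raise ValueError(f"unknown MAF_CHECKPOINT_BACKEND: {backend!r}")
```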
## Further reading
- MAF docs — Checkpointing (C#) — canonical `CheckpointManager`, `SuperStepCompletedEvent`, `RestoreCheckpointAsync`, `ResumeStreamingAsync` reference.
- MAF docs — Checkpointing (Python) — the `CheckpointStorage` protocol and the three built-in backends.
- MAF docs — Core workflow concepts — the superstep scheduler that defines when checkpoints fire.
- Previous chapter: Chapter 17 — Human-in-the-Loop. HITL pauses are the most compelling reason to checkpoint.
- Next chapter: Chapter 19 — Declarative Workflows. YAML-defined workflows use the same checkpoint backend.
- Full source: `python/` · `dotnet/`.
- Series shared resources: Mermaid style guide · Jargon glossary.

