
MAF v1 — State and Checkpoints (Python + .NET)

Author: Nitin Kumar Singh
Part 18 of the series MAF v1: Python and .NET.

Series note — Part of MAF v1: Python and .NET. Second of four advanced chapters — after Human-in-the-Loop, before Declarative Workflows.

Repo — Full runnable code for this chapter: tutorials/18-state-and-checkpoints. Clone, cd tutorials/18-state-and-checkpoints, follow the per-language instructions below.

Why this chapter

A workflow that forgets everything when the process restarts is a demo, not a system. Four situations turn “forgets” into a real problem:

  • Long-running workflows. A pre-purchase price negotiation that fans out to six suppliers and waits for every response can run for minutes. An overnight research report takes hours. A deployment verifier polls for a day. Losing that progress to a pod restart is unacceptable.
  • Expensive operations. If superstep 3 cost $4 in LLM calls and the container got OOM-killed during superstep 4, you want superstep 5 to start from the saved facts ledger, not pay the $4 again.
  • HITL pauses that span hours or days. Chapter 17 showed a workflow pausing for a human approval. If the reviewer is in another timezone and clicks “Approve” the next morning, the workflow object you kept in memory is long dead. Checkpoint the pause; rehydrate it; resume.
  • Crash recovery and migration. Blue-green deploys, k8s rolling restarts, moving a tenant between regions — all require serializing the current state of every in-flight workflow and restoring it somewhere else.

MAF answers all four with the same primitive: a checkpoint — a serialized snapshot of the whole workflow (every executor’s state, every in-flight message, every pending HITL request) taken at every superstep boundary. You pick a CheckpointStorage backend (in-memory, file, Postgres, or Cosmos); the framework does the rest.

The rest of this chapter is the mechanics: what a checkpoint contains, when it’s written, the two executor hooks that control serialization, and the four storage backends that make this work from unit tests up to production multi-tenant SaaS.

Jargon defined inline: CheckpointStorage, on_checkpoint_save / OnCheckpointingAsync, on_checkpoint_restore / OnCheckpointRestoredAsync, FileCheckpointStorage, CosmosCheckpointStorage, CheckpointManager (.NET), FileSystemJsonCheckpointStore (.NET), checkpoint_id, get_latest(workflow_name=...).

Prerequisites

  • Completed Chapter 17 — Human-in-the-Loop. The HITL pause/resume model is the easiest-to-understand motivator for durable checkpoints.
  • uv for Python; .NET 10 SDK for the .NET sample.
  • No LLM required. Checkpointing is framework plumbing. The counter example is pure int arithmetic, which is why the tests are fully deterministic.

The concept: the checkpoint lifecycle

There are two moments that matter in checkpointing. Save fires every time a superstep completes — the scheduler calls every executor’s save hook, bundles the returned state with the in-flight messages and pending requests, and hands the blob to the storage backend. Restore fires when the caller asks to resume from a checkpoint_id — the framework loads the blob from storage, walks every executor calling its restore hook with the matching state slice, replays any pending messages, and the scheduler picks up from the next superstep.

Everything between save and restore — a crash, a deploy, a week-long HITL pause, a migration to another region — is a pure I/O problem. The framework doesn’t care.
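Stripped of the framework, the whole save/restore loop fits in a few lines. Everything below is a toy illustration: only the two hook names are borrowed from MAF; the scheduler, class, and envelope are invented for the sketch.

```python
import json


class TinyExecutor:
    """Toy executor that accumulates ints. The hook names mirror MAF's
    Python hooks, but this is a framework-free illustration."""

    def __init__(self, seed: int) -> None:
        self.total = seed

    def handle(self, amount: int) -> None:
        self.total += amount

    def on_checkpoint_save(self) -> dict:
        return {"total": self.total}

    def on_checkpoint_restore(self, state: dict) -> None:
        self.total = state["total"]


def run_supersteps(executor: TinyExecutor, inputs: list[int]) -> list[str]:
    """One superstep per input; a snapshot is written after each one,
    matching MAF's every-superstep save cadence."""
    checkpoints = []
    for amount in inputs:
        executor.handle(amount)
        checkpoints.append(json.dumps({"executor_state": executor.on_checkpoint_save()}))
    return checkpoints


# Phase 1: run, checkpointing at every superstep boundary.
checkpoints = run_supersteps(TinyExecutor(seed=10), [5, 7])

# "Crash": rebuild with a deliberately wrong seed, then restore clobbers it.
rebuilt = TinyExecutor(seed=999)
rebuilt.on_checkpoint_restore(json.loads(checkpoints[-1])["executor_state"])
print(rebuilt.total)  # 22, not 999
```

The point of the sketch is the ordering: the rebuild runs first with whatever default the constructor gets, and the restore hook wins afterwards.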

```mermaid
%%{init: {'theme':'base', 'themeVariables': { 'primaryColor': '#2563eb','primaryTextColor': '#ffffff','primaryBorderColor': '#1e40af', 'lineColor': '#64748b','secondaryColor': '#f59e0b','tertiaryColor': '#10b981', 'background': 'transparent'}}}%%
flowchart LR
    classDef core fill:#2563eb,stroke:#1e40af,color:#ffffff
    classDef external fill:#f59e0b,stroke:#b45309,color:#000000
    classDef success fill:#10b981,stroke:#047857,color:#ffffff
    classDef error fill:#ef4444,stroke:#b91c1c,color:#ffffff
    classDef infra fill:#64748b,stroke:#334155,color:#ffffff
    execute(["Executor runs<br/>send_message / yield_output"])
    superstep["Superstep ends"]
    save[["on_checkpoint_save<br/>OnCheckpointingAsync"]]
    subgraph backends[CheckpointStorage backends]
        mem[("InMemory<br/>tests")]
        file[("File<br/>single host")]
        pg[("Postgres<br/>capstone")]
        cosmos[("Cosmos DB<br/>distributed")]
    end
    boundary{{"crash / restart / migrate"}}
    load[["load / RetrieveIndex"]]
    restore[["on_checkpoint_restore<br/>OnCheckpointRestoredAsync"]]
    resume(["Workflow resumes<br/>pending messages replay"])
    execute --> superstep --> save --> backends
    backends -. process boundary .-> boundary
    boundary --> load --> restore --> resume
    class execute core
    class superstep core
    class save success
    class load success
    class restore success
    class resume core
    class mem infra
    class file infra
    class pg infra
    class cosmos external
    class boundary error
```

The dotted line is the process boundary. Your code owns the storage backend and the retention policy. MAF owns the snapshot shape, the save cadence (every superstep, non-negotiable), and the replay semantics.

A checkpoint bundles four things (MS docs):

  1. Every executor’s state — the dicts your on_checkpoint_save hooks return.
  2. Pending messages for the next superstep — routes from send_message / SendMessageAsync that haven’t been dispatched yet.
  3. Pending requests and responses — the HITL pauses from Ch17 (request_info / RequestPort).
  4. Shared state — anything you pushed via ctx.QueueStateUpdateAsync (.NET) or per-executor state dicts (Python).
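As a mental model, the bundle can be pictured as a plain JSON envelope. The field names below are invented for the sketch; MAF's actual on-disk schema is an internal detail.

```python
import json

# Illustrative only: the four slices a checkpoint bundles, as plain JSON.
checkpoint = {
    "checkpoint_id": "0000-illustrative-id",
    "workflow_name": "accumulator-workflow",
    "executor_states": {"accumulator": {"total": 15}},          # 1. per-executor state
    "pending_messages": [{"target": "finalizer", "data": 15}],  # 2. undispatched routes
    "pending_requests": [],                                     # 3. open HITL pauses
    "shared_state": {},                                         # 4. cross-executor state
}

blob = json.dumps(checkpoint)   # what the storage backend actually persists
restored = json.loads(blob)
print(restored["executor_states"]["accumulator"]["total"])  # 15
```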

Jargon recap

  • CheckpointStorage (Python) / CheckpointManager (.NET) — the interface for persisting checkpoints. Four built-in backends below.
  • on_checkpoint_save(self) -> dict[str, Any] (Python) / OnCheckpointingAsync(IWorkflowContext context, CancellationToken ct) (.NET) — the hook that runs at the end of every superstep and returns what to persist. Python returns a plain dict; .NET pushes state into the workflow context via context.QueueStateUpdateAsync(key, value).
  • on_checkpoint_restore(self, state) (Python) / OnCheckpointRestoredAsync(IWorkflowContext context, CancellationToken ct) (.NET) — the hook that runs on resume, before any handler fires. Python receives the dict you returned from save; .NET reads it back with context.ReadStateAsync<T>(key).
  • FileCheckpointStorage (Python) / FileSystemJsonCheckpointStore (.NET) — disk-backed storage. Single-host durability; no cross-machine coordination.
  • CosmosCheckpointStorage (Python, in agent-framework-azure-cosmos) — Azure Cosmos DB NoSQL backend. Durable, distributed, partitioned by workflow_name.
  • checkpoint_id — a framework-assigned unique id on every checkpoint. Opaque; treat it like the HITL request_id from Ch17 — round-trip it verbatim, don’t parse it.
  • get_latest(workflow_name=...) (Python) / RetrieveIndexAsync(sessionId) + ordering (.NET) — query the store for the most recent checkpoint of a named workflow. Unnamed workflows can’t be queried; name yours at build time.

Storage backends — pick the one that matches your deployment

All four backends implement the same CheckpointStorage contract. Swapping them is a one-line change — workflow and executor code stay identical. Pick by durability, locality, and operational footprint:

| Backend | Package | Durability | When to use |
| --- | --- | --- | --- |
| InMemoryCheckpointStorage (Py) / CheckpointManager.CreateInMemory() (.NET) | agent-framework / Microsoft.Agents.AI.Workflows | Process memory | Unit tests, deterministic demos, short-lived workflows that never cross a process boundary. |
| FileCheckpointStorage (Py) / FileSystemJsonCheckpointStore wrapped in CheckpointManager.CreateJson(store) (.NET) | Built-in | Local disk | Single-machine workflows that need to survive a crash or a restart. Dev machines, VMs, edge devices. Not cross-pod safe. |
| CosmosCheckpointStorage (Py) / equivalent in .NET | agent-framework-azure-cosmos | Azure Cosmos DB NoSQL | Distributed production. Cosmos partitions by workflow_name automatically; managed identity + RBAC is the recommended auth. Use this when multiple pods/regions share a workflow fleet. |
| PostgresCheckpointStorage (Py, capstone) / equivalent in .NET | Custom, in shared/checkpoint_storage.py | Postgres JSONB column | Production systems already running Postgres. The capstone ships one because the rest of the stack (orders, inventory) already lives in pgvector-enabled Postgres — reusing it keeps the dependency surface small. |

The MS docs describe three as “built in”; the Postgres one is a custom implementation this capstone contributes. The shape of any backend is five methods: save, load, list_checkpoints, get_latest, delete — so rolling your own for DynamoDB, SQL Server, or S3 is a one-class exercise.

Code walkthrough

Source: dotnet/Program.cs. Two executors, one edge. The demo seeds an AccumulatorExecutor with 10, adds 5, runs end-to-end, then rehydrates into a fresh workflow with a deliberately wrong seed (999) — proving OnCheckpointRestoredAsync clobbers the seed and restores the real total from disk.

```csharp
using Microsoft.Agents.AI.Workflows;
using Microsoft.Agents.AI.Workflows.Checkpointing;

// Store + manager. CheckpointManager.CreateJson wraps the store with a
// JSON marshaller so executor state round-trips through System.Text.Json.
var store = new FileSystemJsonCheckpointStore(new DirectoryInfo("./.checkpoints"));
CheckpointManager checkpointManager = CheckpointManager.CreateJson(store);

// Phase 1: run end-to-end. Every superstep boundary writes a checkpoint.
string sessionId = Guid.NewGuid().ToString("N");
Workflow workflow1 = BuildWorkflow(seed: 10);

await using StreamingRun run = await InProcessExecution
    .RunStreamingAsync(workflow1, input: 5, checkpointManager, sessionId);

await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    if (evt is SuperStepCompletedEvent step && step.CompletionInfo?.Checkpoint is { } cp)
        Console.WriteLine($"  superstep done — checkpoint {cp.CheckpointId[..8]}");
    if (evt is WorkflowOutputEvent output)
        Console.WriteLine($"Phase 1 result: total = {output.Data}");
}

// Phase 2: rehydrate into a fresh Workflow with the WRONG seed.
var checkpoints = (await store.RetrieveIndexAsync(sessionId)).ToList();
CheckpointInfo firstCheckpoint = checkpoints[0];

Workflow workflow2 = BuildWorkflow(seed: 999);
await using StreamingRun resumed = await InProcessExecution
    .ResumeStreamingAsync(workflow2, firstCheckpoint, checkpointManager);

await foreach (WorkflowEvent evt in resumed.WatchStreamAsync())
    if (evt is WorkflowOutputEvent output)
        Console.WriteLine($"Phase 2 result: total = {output.Data}");  // still 15
```

The accumulator itself:

```csharp
[SendsMessage(typeof(int))]
internal sealed partial class AccumulatorExecutor : Executor
{
    private const string StateKey = "total";
    private int _total;

    public AccumulatorExecutor(int seed) : base("accumulator") => _total = seed;

    [MessageHandler]
    public async ValueTask HandleAsync(int amount, IWorkflowContext context, CancellationToken ct = default)
    {
        _total += amount;
        await context.SendMessageAsync(_total, cancellationToken: ct);
    }

    protected override ValueTask OnCheckpointingAsync(IWorkflowContext context, CancellationToken ct = default) =>
        context.QueueStateUpdateAsync(StateKey, _total, cancellationToken: ct);

    protected override async ValueTask OnCheckpointRestoredAsync(IWorkflowContext context, CancellationToken ct = default)
    {
        _total = await context.ReadStateAsync<int>(StateKey, cancellationToken: ct);
    }
}
```

Three things that trip people up moving from Python to .NET:

  • State goes through the IWorkflowContext state bag. Python returns a dict; .NET calls context.QueueStateUpdateAsync(key, value) during save and context.ReadStateAsync<T>(key) during restore. The System.Text.Json marshaller handles primitives, records, and anything with [JsonSerializable] out of the box.
  • sessionId is the .NET partition key. FileSystemJsonCheckpointStore writes files named {sessionId}_{checkpointId}.json. If you resume a different session, ask the store for its index via RetrieveIndexAsync(sessionId), not RetrieveIndexAsync(workflowName). Python partitions by workflow_name instead — same idea, different key.
  • ResumeStreamingAsync rebuilds the run. resumed is a fresh StreamingRun — the old one is disposed, and streaming events start from scratch. The MS docs call this “rehydration” and offer RestoreCheckpointAsync on an existing run as the alternative for same-process resume.

Run it:

```shell
cd tutorials/18-state-and-checkpoints/dotnet
dotnet run                  # defaults: seed=10, add=5, restored total=15
dotnet run -- 100 7         # seed=100, add=7, restored total=107
```

Source: python/main.py. Same two-executor shape.

```python
from typing import Any

from agent_framework import (
    Executor,
    FileCheckpointStorage,
    WorkflowBuilder,
    WorkflowContext,
    handler,
)


class AccumulatorExecutor(Executor):
    def __init__(self, seed: int) -> None:
        super().__init__(id="accumulator")
        self.total = seed

    @handler
    async def handle(self, amount: int, ctx: WorkflowContext[int, None]) -> None:
        self.total += amount
        await ctx.send_message(self.total)

    async def on_checkpoint_save(self) -> dict[str, Any]:
        return {"total": self.total}

    async def on_checkpoint_restore(self, state: dict[str, Any]) -> None:
        self.total = int(state.get("total", 0))


class FinalizerExecutor(Executor):
    def __init__(self) -> None:
        super().__init__(id="finalizer")

    @handler
    async def handle(self, total: int, ctx: WorkflowContext[None, int]) -> None:
        await ctx.yield_output(total)


storage = FileCheckpointStorage("./.checkpoints")
accumulator = AccumulatorExecutor(seed=10)
workflow = (
    WorkflowBuilder(
        start_executor=accumulator,
        name="accumulator-workflow",
        checkpoint_storage=storage,
    )
    .add_edge(accumulator, FinalizerExecutor())
    .build()
)

# Phase 1: end-to-end run writes checkpoints at every superstep boundary.
result = await run_once(storage, seed=10, amount=5)   # returns 15

# Phase 2: resume from the FIRST checkpoint, not the latest.
# The latest checkpoint is after Finalizer ran; its pending-message queue is
# empty. The first checkpoint captures the accumulator's post-add state.
checkpoints = await storage.list_checkpoints(workflow_name="accumulator-workflow")
checkpoints.sort(key=lambda cp: cp.timestamp)
replayed = await resume_from_checkpoint(storage, checkpoints[0].checkpoint_id, resume_seed=999)
assert replayed == result                             # 15, not 999
```

Four things to pin down:

  • Save must return a dict. Not a dataclass, not a Pydantic model — a plain dict[str, Any]. MAF pickles it, base64s the result, and embeds it inside the outer JSON envelope.
  • Restore is called before any handler on resume. The framework rebuilds the executor (your __init__ runs with whatever you pass the builder — here seed=999), then immediately calls on_checkpoint_restore to overwrite whatever __init__ set. Treat the hook as authoritative.
  • name= is mandatory for queries. storage.list_checkpoints(workflow_name=...) and storage.get_latest(workflow_name=...) are the only way to ask the store “what snapshots exist for this workflow?” — a workflow built without name= can’t be looked up at all.
  • Resume accepts no new input message. workflow.run(checkpoint_id=..., checkpoint_storage=..., stream=True) replays the saved superstep’s pending messages. Passing a fresh input alongside raises ValueError.
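The first bullet's pickle-then-base64-then-JSON round-trip can be reproduced with the standard library alone. The envelope fields below (executor_id, state_b64) are invented for the sketch; MAF's real schema is internal.

```python
import base64
import json
import pickle

# What "MAF pickles it, base64s the result, and embeds it inside the
# outer JSON envelope" looks like mechanically.
state = {"total": 15}   # what on_checkpoint_save returns: a plain dict

encoded = base64.b64encode(pickle.dumps(state)).decode("ascii")
envelope = json.dumps({"executor_id": "accumulator", "state_b64": encoded})

# ...later, on restore:
decoded = json.loads(envelope)
restored = pickle.loads(base64.b64decode(decoded["state_b64"]))
print(restored)  # {'total': 15}
```

This is also why the dict requirement exists: pickle happily serializes a Pydantic model, but the restricted unpickler on the other side has no reason to trust it.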

Output from python main.py 10 5:

```text
Phase 1: seed=10, add=5
Phase 1 result: total = 15

2 checkpoint file(s) on disk.
Resuming from ecc3f7ff… with seed=999
Phase 2 result: total = 15 (expected 15)
```

The expected 15 prints 15, not 999. That’s the whole contract in a single line: the checkpoint is the source of truth; __init__ is just a default.

Side-by-side differences

| Aspect | Python | .NET |
| --- | --- | --- |
| Save hook | async on_checkpoint_save(self) -> dict[str, Any] | OnCheckpointingAsync(IWorkflowContext ctx, CancellationToken ct) — pushes via ctx.QueueStateUpdateAsync(key, value) |
| Restore hook | async on_checkpoint_restore(self, state: dict) | OnCheckpointRestoredAsync(IWorkflowContext ctx, CancellationToken ct) — pulls via ctx.ReadStateAsync<T>(key) |
| Storage abstraction | CheckpointStorage passed as checkpoint_storage= on WorkflowBuilder | CheckpointManager wrapping an ICheckpointStore; passed to InProcessExecution.RunStreamingAsync(..., checkpointManager, sessionId) |
| Built-in backends | InMemoryCheckpointStorage, FileCheckpointStorage, CosmosCheckpointStorage (via agent-framework-azure-cosmos) | CheckpointManager.CreateInMemory(), FileSystemJsonCheckpointStore wrapped in CreateJson(store), Cosmos via the .Azure.Cosmos package |
| Partition key | workflow_name (from WorkflowBuilder(..., name=...)) | sessionId passed to RunStreamingAsync |
| Query latest | await storage.get_latest(workflow_name=...) | (await store.RetrieveIndexAsync(sessionId)).Last() or run.Checkpoints.Last() |
| Resume — same run | N/A (Python always rebuilds) | await run.RestoreCheckpointAsync(savedCheckpoint) |
| Resume — fresh run | workflow.run(checkpoint_id=id, checkpoint_storage=storage, stream=True) | InProcessExecution.ResumeStreamingAsync(newWorkflow, savedCheckpoint, checkpointManager) |
| Serialization | Pickle-in-JSON (base64); restricted unpickler by default | System.Text.Json via CheckpointManager.CreateJson |

Same state machine, different names. The surprising one is partitioning: Python groups by workflow name, .NET groups by session id. In the capstone we normalize to workflow name on both sides so pre-purchase and return-replace workflows can be queried identically across languages.
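The capstone's normalization code isn't shown in this chapter. One hypothetical way to line the two partition keys up is a deterministic mapping from workflow name to .NET sessionId; the namespace UUID and function below are illustrative, not capstone code.

```python
import uuid

# Hypothetical: derive a stable .NET sessionId from the Python-side
# workflow name, so both runtimes can be queried by the same key.
# The namespace UUID is arbitrary but must stay fixed across deploys.
CHECKPOINT_NS = uuid.UUID("6f1c2a54-9b7e-4c83-b1d0-3f5e8a2c9d10")

def session_id_for(workflow_name: str) -> str:
    return uuid.uuid5(CHECKPOINT_NS, workflow_name).hex

# Deterministic: the same name always maps to the same session id,
# and distinct names map to distinct ids.
print(session_id_for("accumulator-workflow") == session_id_for("accumulator-workflow"))  # True
print(session_id_for("pre-purchase") != session_id_for("return-replace"))                # True
```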

Gotchas
#

  • Resume accepts no new input. Python raises ValueError if you pass both input= and checkpoint_id=. .NET’s ResumeStreamingAsync doesn’t take an input parameter at all. The saved checkpoint is the input.
  • get_latest isn’t always what you want. In a workflow that ran to completion, the latest checkpoint has an empty pending-message queue. Resuming from it produces no output. Use list_checkpoints + your own timestamp sort when you want a specific superstep.
  • State must round-trip through the serializer. Python uses pickle-in-JSON; .NET uses System.Text.Json. Plain dicts, lists, primitives, dataclasses, records — all fine. Custom objects with runtime references (open file handles, sockets, asyncio.Lock instances) will either fail on save or deserialize into ghosts.
  • The workflow needs a name to be queryable. list_checkpoints(workflow_name=...) and get_latest(workflow_name=...) filter by the name you pass to WorkflowBuilder(..., name=...) in Python. .NET uses sessionId instead — decide your partition key up front, it’s in every API call.
  • Checkpoints pile up. No backend auto-deletes. Production Postgres/Cosmos deployments need a retention job (daily cron, DELETE FROM workflow_checkpoints WHERE created_at < now() - '7 days'::interval). The capstone ships a cleanup script.
  • Pickle is a trust boundary. FileCheckpointStorage and CosmosCheckpointStorage use pickle-in-JSON for non-native Python types. MAF’s restricted unpickler blocks arbitrary classes by default — whitelist your custom types with allowed_checkpoint_types=["my_app.models:SafeState"]. Never load a checkpoint from an untrusted source.
  • The __init__ args matter for edges and ids, not for state. When a new process rebuilds the workflow, the executor ids and edge topology must match what was saved. Changing the id of a checkpointed executor between versions orphans its saved state.
  • HITL requests survive checkpoints. Ch17’s RequestPort pauses are captured in the checkpoint. On resume, the framework re-emits the RequestInfoEvent so the caller can collect a response and call SendResponseAsync or pass responses= on the next run. You get crash-safe HITL for free once you add checkpoint storage.
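The restricted-unpickler idea in the pickle gotcha can be shown with the standard library. This sketch is not MAF's implementation; it just demonstrates the mechanism a whitelist like allowed_checkpoint_types relies on.

```python
import io
import pickle


class RestrictedUnpickler(pickle.Unpickler):
    """Refuse to construct any class not on an explicit whitelist.
    (Illustration of the trust boundary, not MAF's actual code.)"""

    ALLOWED = {("builtins", "dict"), ("builtins", "list")}

    def find_class(self, module: str, name: str):
        if (module, name) in self.ALLOWED:
            return super().find_class(module, name)
        raise pickle.UnpicklingError(f"blocked: {module}.{name}")


def safe_loads(blob: bytes):
    return RestrictedUnpickler(io.BytesIO(blob)).load()


# Primitives and plain containers never hit find_class: fine.
print(safe_loads(pickle.dumps({"total": 15})))  # {'total': 15}

# Any custom class reference triggers find_class and gets rejected.
import datetime
try:
    safe_loads(pickle.dumps(datetime.date(2025, 1, 1)))
except pickle.UnpicklingError as e:
    print(e)  # blocked: datetime.date
```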

Tests

Python ships 8 deterministic tests — no LLM, no network, no sleeps, no flakiness:

```shell
cd agents/python
uv run python -m pytest ../../tutorials/18-state-and-checkpoints/python/tests/ -v
# 8 passed in 0.46s
```

.NET builds cleanly under TreatWarningsAsErrors=true and runs end-to-end:

```shell
cd tutorials/18-state-and-checkpoints/dotnet
dotnet build                  # 0 warnings, 0 errors
dotnet run -- 10 5            # Phase 2 result: total = 15 (expected 15)
```

How this shows up in the capstone

Phase 7 plans/refactor/11-checkpointing.md ships a custom Postgres-backed CheckpointStorage (agents/python/shared/checkpoint_storage.py) that uses MAF’s encode_checkpoint_value / decode_checkpoint_value helpers to round-trip state through a workflow_checkpoints JSONB table. The factory (agents/python/shared/factory.py → get_checkpoint_storage) switches on MAF_CHECKPOINT_BACKEND env var so tests use memory, local dev uses file, and production uses postgres. Two workflows opt in:

  • workflows/pre_purchase.py — the slow fan-out to multiple suppliers. A pod restart mid-negotiation picks up where it left off instead of re-quoting every supplier.
  • workflows/return_replace.py — paired with Ch17 HITL. When a high-value return paused for approval and the customer doesn’t confirm for a day, the orchestrator rehydrates the checkpoint when the “Approve” webhook fires and resumes the discount-apply executor.

.NET has the mirror implementation at agents/dotnet/src/ECommerceAgents.Shared/Checkpoints/PostgresCheckpointStorage.cs plus a full CheckpointStorageFactory that switches on the same env var. The integration tests run against a real Postgres testcontainer — never mock the DB.
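A sketch of that factory's shape, with stub classes standing in for the real backends: only MAF_CHECKPOINT_BACKEND and get_checkpoint_storage come from the text, while MAF_CHECKPOINT_DIR and the constructor arguments are assumptions for the sketch.

```python
import os

# Stub stand-ins so the sketch is self-contained; in the capstone these
# are the real MAF / custom storage classes.
class InMemoryCheckpointStorage: ...

class FileCheckpointStorage:
    def __init__(self, path: str) -> None:
        self.path = path

class PostgresCheckpointStorage:
    def __init__(self, dsn: str) -> None:
        self.dsn = dsn


def get_checkpoint_storage():
    """Switch on MAF_CHECKPOINT_BACKEND: memory | file | postgres."""
    backend = os.environ.get("MAF_CHECKPOINT_BACKEND", "memory")
    if backend == "memory":
        return InMemoryCheckpointStorage()
    if backend == "file":
        return FileCheckpointStorage(os.environ.get("MAF_CHECKPOINT_DIR", "./.checkpoints"))
    if backend == "postgres":
        return PostgresCheckpointStorage(os.environ["DATABASE_URL"])
    raise ValueError(f"unknown MAF_CHECKPOINT_BACKEND: {backend!r}")


os.environ["MAF_CHECKPOINT_BACKEND"] = "file"
print(type(get_checkpoint_storage()).__name__)  # FileCheckpointStorage
```

The payoff of the pattern: tests, local dev, and production pick a backend with one environment variable, and nothing else in the workflow code changes.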
