
MAF v1 — Production hardening (Python + .NET)

Nitin Kumar Singh
I build enterprise AI solutions and cloud-native systems. I write about architecture patterns, AI agents, Azure, and modern development practices — with full source code.
MAF v1: Python and .NET - This article is part of a series.
Part 20.2: This Article

Series note — Companion chapter sitting between Ch20b — DevUI and Ch21 — Putting it all together. This is HTTP-layer hardening — not framework API surface — but it’s the gap most readers hit when they take the capstone past localhost. The original series introduced JWT auth in Part 7 — Production Readiness. That post covered the happy path. This one covers what production demanded six months in: password reset, refresh-token rotation with reuse detection, and graceful secret rotation.

Repo — Runnable code: tutorials/20c-production-hardening.

Why this chapter
#

JWT auth and RBAC, as covered in the original series, get you a working login flow. They don’t get you through a real ops cycle. Three concrete moments will eventually force the issue:

  1. A user forgets their password.
  2. A refresh token leaks (a stolen device, a logged URL, a backup snapshot).
  3. You need to roll the JWT signing key — or the x-agent-secret shared between agents — without a deploy window.

Each of these has a textbook answer; each of those answers has a footgun. This chapter walks the patterns the capstone settled on, in Python first, with the .NET twin called out where the idioms actually diverge.

Prerequisites
#

  • Familiarity with Part 7 — Production Readiness: Auth, RBAC, and Deployment — this picks up where that left off.
  • Python 3.12+ with fastapi, pyjwt, bcrypt, asyncpg (or the .NET equivalents).
  • Postgres 16 (the capstone’s database).
  • An SMTP relay or transactional-email provider for password reset. The code below assumes a send_email(to, subject, body) helper; wire whatever you use.

Part A: Password reset
#

Threat model first
#

A password-reset endpoint is a credential issuer. A leak in this flow is worse than a leak in login — the attacker writes the new credential. Three rules cover most of what can go wrong:

  • The token in the email is a one-shot bearer credential. Treat it like a password. Short TTL. Single-use. Never logged.
  • Store the hash, not the token. A database snapshot must not let an attacker complete pending resets.
  • Don’t leak account existence. POST /auth/forgot-password returns the same shape whether the email is registered or not. Otherwise the endpoint becomes an account-enumeration oracle.

Schema
#

CREATE TABLE password_reset_tokens (
    token_hash      TEXT PRIMARY KEY,           -- sha256(plaintext token)
    user_id         UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
    expires_at      TIMESTAMPTZ NOT NULL,
    consumed_at     TIMESTAMPTZ,
    created_at      TIMESTAMPTZ NOT NULL DEFAULT now(),
    requesting_ip   INET,
    user_agent      TEXT
);

CREATE INDEX idx_prt_user_unconsumed
    ON password_reset_tokens (user_id)
    WHERE consumed_at IS NULL;

The partial index on unconsumed rows lets us quickly invalidate prior pending tokens for the same user when a new one is issued (one outstanding token per user — see below).

Issuing a token
#

import asyncio
import hashlib
import secrets
from datetime import datetime, timedelta, timezone

RESET_TOKEN_TTL = timedelta(minutes=15)

def _hash_token(token: str) -> str:
    return hashlib.sha256(token.encode()).hexdigest()

async def request_password_reset(email: str, request_ip: str, user_agent: str) -> None:
    """Generate and email a reset link. Never raises for unknown emails."""
    user = await db.fetch_one("SELECT id FROM users WHERE email = $1", email)
    if user is None:
        # Timing pad: roughly approximates the success path. A fixed sleep is
        # not truly constant-time; tune it to your measured DB + email latency.
        await asyncio.sleep(0.05)
        return

    # Invalidate any outstanding pending token for this user
    await db.execute(
        "UPDATE password_reset_tokens SET consumed_at = now() "
        "WHERE user_id = $1 AND consumed_at IS NULL",
        user["id"],
    )

    plaintext = secrets.token_urlsafe(32)               # 256 bits
    await db.execute(
        "INSERT INTO password_reset_tokens "
        "(token_hash, user_id, expires_at, requesting_ip, user_agent) "
        "VALUES ($1, $2, $3, $4, $5)",
        _hash_token(plaintext),
        user["id"],
        datetime.now(timezone.utc) + RESET_TOKEN_TTL,
        request_ip,
        user_agent,
    )

    reset_url = f"{settings.frontend_base_url}/reset-password?token={plaintext}"
    await send_email(
        to=email,
        subject="Reset your ECommerce Agents password",
        body=f"Click to reset (expires in 15 minutes): {reset_url}",
    )

Three details that matter:

  • secrets.token_urlsafe(32) generates 256 bits of entropy, URL-safe. random.choice and uuid4 are both wrong here — random is not cryptographic, and uuid4 only carries 122 bits.
  • Plaintext goes only to the email. The DB never sees it. If the DB leaks, the attacker has hashes — which they can’t use to construct the URL.
  • Old pending tokens are invalidated when a new one is issued. Otherwise an attacker who triggered an earlier reset (and intercepted that email) can race the legitimate user.
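The first two details can be seen in a small stdlib-only sketch. The `issue_reset_token` helper and the in-memory `pending` dict are illustrative stand-ins for the `password_reset_tokens` table, not code from the capstone.

```python
import hashlib
import secrets


def hash_token(token: str) -> str:
    """SHA-256 hex digest: the only form of the token that is stored."""
    return hashlib.sha256(token.encode()).hexdigest()


def issue_reset_token(pending: dict[str, str], user_id: str) -> str:
    """Store the hash keyed to the user; return the plaintext for the email."""
    plaintext = secrets.token_urlsafe(32)  # 32 random bytes = 256 bits
    pending[hash_token(plaintext)] = user_id
    return plaintext


pending: dict[str, str] = {}  # hypothetical stand-in for password_reset_tokens
token = issue_reset_token(pending, "user-1")

# 32 bytes base64url-encode to 43 characters once padding is stripped.
print(len(token))
# The stored key is a 64-character hex digest, never the plaintext.
print(token in pending, hash_token(token) in pending)
```

Anyone holding only the `pending` dict (the analogue of a database snapshot) sees digests; the URL needs the plaintext, which exists only in the email.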

Consuming the token
#

async def reset_password(token: str, new_password: str) -> bool:
    token_hash = _hash_token(token)
    async with db.transaction():
        row = await db.fetch_one(
            "SELECT user_id, expires_at, consumed_at "
            "FROM password_reset_tokens WHERE token_hash = $1 FOR UPDATE",
            token_hash,
        )
        if row is None:
            return False
        if row["consumed_at"] is not None:
            return False
        if row["expires_at"] < datetime.now(timezone.utc):
            return False

        await db.execute(
            "UPDATE password_reset_tokens SET consumed_at = now() "
            "WHERE token_hash = $1",
            token_hash,
        )
        await db.execute(
            "UPDATE users SET password_hash = $1 WHERE id = $2",
            hash_password(new_password),
            row["user_id"],
        )
        # Critical: revoke all existing refresh tokens for this user
        await db.execute(
            "UPDATE refresh_tokens SET revoked_at = now() "
            "WHERE user_id = $1 AND revoked_at IS NULL",
            row["user_id"],
        )
    return True

The FOR UPDATE row lock on the reset-token row is what makes single-use atomic across concurrent requests. The transaction also covers the users update and the refresh-token revocation — if anything fails, nothing changes.

The refresh-token revocation is the easy-to-forget step. Without it, a password reset doesn’t sign anyone out — an attacker who already had a refresh token (the reason for the reset) keeps minting access tokens. This is the single most common production bug in this flow.
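A runnable way to see single-use consumption and session revocation together is sketched below. It makes two loud assumptions: `sqlite3` stands in for Postgres so the example has no dependencies, and because SQLite has no row locks, a conditional `UPDATE` (checking the affected row count) replaces `SELECT ... FOR UPDATE`. The table shapes are trimmed to the columns that matter here.

```python
import hashlib
import sqlite3
import time


def hash_token(token: str) -> str:
    return hashlib.sha256(token.encode()).hexdigest()


def reset_password(conn: sqlite3.Connection, token: str, new_hash: str) -> bool:
    """Consume a reset token, set the password, revoke refresh tokens."""
    now = time.time()
    with conn:  # one transaction: commits on success, rolls back on error
        # Conditional UPDATE: only an unconsumed, unexpired row matches,
        # so at most one caller can ever claim a given token.
        cur = conn.execute(
            "UPDATE password_reset_tokens SET consumed_at = ? "
            "WHERE token_hash = ? AND consumed_at IS NULL AND expires_at > ?",
            (now, hash_token(token), now),
        )
        if cur.rowcount != 1:
            return False
        (user_id,) = conn.execute(
            "SELECT user_id FROM password_reset_tokens WHERE token_hash = ?",
            (hash_token(token),),
        ).fetchone()
        conn.execute(
            "UPDATE users SET password_hash = ? WHERE id = ?", (new_hash, user_id)
        )
        # The easy-to-forget step: sign out every existing session.
        conn.execute(
            "UPDATE refresh_tokens SET revoked_at = ? "
            "WHERE user_id = ? AND revoked_at IS NULL",
            (now, user_id),
        )
    return True


conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE users (id TEXT PRIMARY KEY, password_hash TEXT);
    CREATE TABLE password_reset_tokens (
        token_hash TEXT PRIMARY KEY, user_id TEXT, expires_at REAL, consumed_at REAL);
    CREATE TABLE refresh_tokens (id TEXT PRIMARY KEY, user_id TEXT, revoked_at REAL);
""")
conn.execute("INSERT INTO users VALUES ('u1', 'old-hash')")
conn.execute("INSERT INTO refresh_tokens VALUES ('rt1', 'u1', NULL)")
conn.execute(
    "INSERT INTO password_reset_tokens VALUES (?, 'u1', ?, NULL)",
    (hash_token("demo-token"), time.time() + 900),
)
conn.commit()

first = reset_password(conn, "demo-token", "new-hash")
second = reset_password(conn, "demo-token", "other-hash")  # replay of a used token
print(first, second)
```

The second call fails because the row no longer satisfies `consumed_at IS NULL`, and the pre-existing refresh token is revoked by the first call.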

Endpoint
#

@router.post("/auth/forgot-password", status_code=204)
async def forgot_password(req: ForgotPasswordRequest, request: Request):
    await request_password_reset(
        email=req.email,
        request_ip=request.client.host,
        user_agent=request.headers.get("user-agent", ""),
    )
    return Response(status_code=204)


@router.post("/auth/reset-password")
async def reset_password_endpoint(req: ResetPasswordRequest):
    ok = await reset_password(req.token, req.new_password)
    if not ok:
        # Same response on expired/consumed/unknown — no oracle
        raise HTTPException(400, detail="Invalid or expired reset token")
    return {"ok": True}

forgot-password always returns 204. The endpoint is unconditionally rate-limited (10 per IP per hour and 3 per email per day are reasonable starting points; the capstone uses Redis for both counters). Rate limits exist to throttle the email send-budget — not to prove anything to an attacker.

.NET note
#

ASP.NET’s IDataProtector plus a small ResetTokenStore table covers the same ground. The DataProtectionTokenProvider<TUser> that ships with Microsoft.AspNetCore.Identity is useful only if you’re already using Identity; if not, the secrets.token_urlsafe(32) + SHA-256 pattern translates one-for-one to RandomNumberGenerator.GetBytes(32) + Base64Url + SHA256.HashData. Don’t reach for Guid.NewGuid() here.


Part B: Refresh-token rotation with reuse detection
#

The original series ships short-lived access tokens (60 min) and long-lived refresh tokens (7 days), both as JWTs. That’s enough to log in once and stay logged in. It’s not enough to detect a stolen refresh.

The hardening: every refresh issues a new refresh token and invalidates the previous one. If a refresh token is presented after it has been used, the entire token family is revoked and the user is forced to re-authenticate.

This catches the canonical attack: an attacker steals a refresh token from a backup, the legitimate user later refreshes (rotating it), and then the attacker tries to refresh with the stale value. The replay attempt triggers a family-wide revocation — the legitimate user is signed out, but so is the attacker, and the security team gets a paged incident.

Token family lineage
#

%%{init: {'theme':'base', 'themeVariables': { 'primaryColor': '#2563eb','primaryTextColor': '#ffffff','primaryBorderColor': '#1e40af', 'lineColor': '#64748b','secondaryColor': '#f59e0b','tertiaryColor': '#10b981', 'background': 'transparent'}}}%%
flowchart LR
    classDef issued fill:#10b981,stroke:#047857,color:#ffffff
    classDef rotated fill:#64748b,stroke:#334155,color:#ffffff
    classDef revoked fill:#ef4444,stroke:#991b1b,color:#ffffff

    login(["Login"])
    rt0["RT0 — fresh"]
    rt1["RT1 — fresh"]
    rt2["RT2 — current"]
    attacker(["Attacker replays RT0"])
    family["Entire family revoked"]

    login --> rt0
    rt0 -- "refresh" --> rt1
    rt1 -- "refresh" --> rt2
    rt0 -. replay .-> attacker
    attacker --> family
    rt2 -- "revoked too" --> family

    class login,rt0,rt1,rt2 rotated
    class attacker,family revoked

The family_id ties RT0, RT1, RT2 together. A reuse attempt on any rotated token revokes every token in the family, including the current one.

Schema
#

CREATE TABLE refresh_tokens (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    family_id       UUID NOT NULL,                 -- shared across the lineage
    parent_id       UUID REFERENCES refresh_tokens(id),
    user_id         UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
    token_hash      TEXT NOT NULL UNIQUE,          -- sha256 of the JWT
    issued_at       TIMESTAMPTZ NOT NULL DEFAULT now(),
    expires_at      TIMESTAMPTZ NOT NULL,
    rotated_at      TIMESTAMPTZ,                   -- set when used to mint a child
    revoked_at      TIMESTAMPTZ                    -- set when family is killed
);

CREATE INDEX idx_rt_family ON refresh_tokens (family_id);
CREATE INDEX idx_rt_user_active
    ON refresh_tokens (user_id)
    WHERE revoked_at IS NULL;

token_hash (not the raw JWT) is what we look up. JWTs are bearer credentials — we should not store them at rest in plaintext.

Refresh handler
#

async def refresh(presented_token: str) -> tuple[str, str]:
    """Returns (new_access_token, new_refresh_token) or raises."""
    payload = jwt.decode(presented_token, settings.jwt_public_key, algorithms=["RS256"])
    if payload.get("type") != "refresh":
        raise InvalidTokenError("Wrong token type")

    token_hash = _hash_token(presented_token)

    async with db.transaction():
        row = await db.fetch_one(
            "SELECT id, family_id, user_id, rotated_at, revoked_at, expires_at "
            "FROM refresh_tokens WHERE token_hash = $1 FOR UPDATE",
            token_hash,
        )
        if row is None:
            raise InvalidTokenError("Unknown token")
        if row["revoked_at"] is not None:
            raise InvalidTokenError("Token revoked")
        if row["expires_at"] < datetime.now(timezone.utc):
            raise InvalidTokenError("Token expired")

        # Reuse detection: this token has already been rotated.
        reuse_detected = row["rotated_at"] is not None
        if reuse_detected:
            # Revoke the family but do not raise yet: an exception raised
            # inside the transaction block would roll this UPDATE back.
            await db.execute(
                "UPDATE refresh_tokens SET revoked_at = now() "
                "WHERE family_id = $1 AND revoked_at IS NULL",
                row["family_id"],
            )
            log.warning("refresh_reuse_detected", user_id=str(row["user_id"]),
                        family_id=str(row["family_id"]))
        else:
            # Mint child
            new_jwt = _issue_refresh_jwt(row["user_id"])
            new_id = uuid.uuid4()
            await db.execute(
                "INSERT INTO refresh_tokens "
                "(id, family_id, parent_id, user_id, token_hash, expires_at) "
                "VALUES ($1, $2, $3, $4, $5, $6)",
                new_id, row["family_id"], row["id"], row["user_id"],
                _hash_token(new_jwt),
                datetime.now(timezone.utc) + REFRESH_TTL,
            )
            await db.execute(
                "UPDATE refresh_tokens SET rotated_at = now() WHERE id = $1",
                row["id"],
            )

    # The transaction has committed, so the revocation is durable.
    if reuse_detected:
        raise InvalidTokenError("Token reuse detected: family revoked")

    return _issue_access_jwt(row["user_id"]), new_jwt

The whole sequence sits inside one transaction with a row lock on the presented token. That guarantees:

  • Two concurrent refreshes with the same token can’t both succeed (one will block on FOR UPDATE, then see rotated_at set, then trigger family revocation — which is the right behaviour: simultaneous use of the same refresh is exactly the attack signature).
  • The “mint child + mark parent rotated” pair is atomic. A crash mid-flight leaves the parent unmodified.
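The lineage and the reuse rule can be exercised without a database. The sketch below is an in-memory model under stated assumptions: `RefreshStore` is a hypothetical stand-in for the `refresh_tokens` table, opaque random strings stand in for refresh JWTs, and raw tokens are used as dict keys purely for brevity (the real table stores only hashes).

```python
import secrets
import uuid
from dataclasses import dataclass


class InvalidTokenError(Exception):
    pass


@dataclass
class Row:
    family_id: str
    rotated: bool = False
    revoked: bool = False


class RefreshStore:
    """In-memory stand-in for the refresh_tokens table (hypothetical)."""

    def __init__(self) -> None:
        self.rows: dict[str, Row] = {}

    def login(self) -> str:
        token = secrets.token_urlsafe(32)
        self.rows[token] = Row(family_id=str(uuid.uuid4()))  # new family
        return token

    def refresh(self, presented: str) -> str:
        row = self.rows.get(presented)
        if row is None or row.revoked:
            raise InvalidTokenError("unknown or revoked token")
        if row.rotated:
            # Reuse: kill every token that shares the family_id.
            for other in self.rows.values():
                if other.family_id == row.family_id:
                    other.revoked = True
            raise InvalidTokenError("reuse detected, family revoked")
        child = secrets.token_urlsafe(32)
        self.rows[child] = Row(family_id=row.family_id)
        row.rotated = True
        return child


store = RefreshStore()
rt0 = store.login()
rt1 = store.refresh(rt0)   # legitimate rotation
rt2 = store.refresh(rt1)   # legitimate rotation

try:
    store.refresh(rt0)     # attacker replays the stale RT0
except InvalidTokenError as exc:
    print(exc)

print(store.rows[rt2].revoked)  # the current token is dead too
```

After the replay, the legitimate holder of RT2 is also rejected on the next refresh, which is the forced re-authentication described above.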

Logout
#

async def logout(refresh_jwt: str) -> None:
    token_hash = _hash_token(refresh_jwt)
    await db.execute(
        "UPDATE refresh_tokens SET revoked_at = now() "
        "WHERE family_id = (SELECT family_id FROM refresh_tokens WHERE token_hash = $1) "
        "AND revoked_at IS NULL",
        token_hash,
    )

Logout revokes the entire family, not just the presented token. This matters because it makes “Sign out everywhere” trivial — the same query, run for every active family belonging to the user.
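To show how close the two operations are, here is a small sketch that runs both against an in-memory table. Assumptions: `sqlite3` replaces Postgres, the table is cut down to four columns, and `logout_everywhere` is a hypothetical helper that revokes by `user_id` in a single statement rather than looping over families.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE refresh_tokens ("
    "token_hash TEXT PRIMARY KEY, family_id TEXT, user_id TEXT, revoked_at TEXT)"
)
conn.executemany(
    "INSERT INTO refresh_tokens VALUES (?, ?, ?, NULL)",
    [
        ("h1", "fam-laptop", "u1"),
        ("h2", "fam-laptop", "u1"),
        ("h3", "fam-phone", "u1"),
        ("h4", "fam-other", "u2"),
    ],
)


def logout(conn: sqlite3.Connection, token_hash: str) -> None:
    """Revoke the family that the presented token belongs to."""
    conn.execute(
        "UPDATE refresh_tokens SET revoked_at = datetime('now') "
        "WHERE family_id = (SELECT family_id FROM refresh_tokens WHERE token_hash = ?) "
        "AND revoked_at IS NULL",
        (token_hash,),
    )


def logout_everywhere(conn: sqlite3.Connection, user_id: str) -> None:
    """Revoke every active family for the user in one statement."""
    conn.execute(
        "UPDATE refresh_tokens SET revoked_at = datetime('now') "
        "WHERE user_id = ? AND revoked_at IS NULL",
        (user_id,),
    )


def active(conn: sqlite3.Connection) -> list[str]:
    rows = conn.execute(
        "SELECT token_hash FROM refresh_tokens "
        "WHERE revoked_at IS NULL ORDER BY token_hash"
    )
    return [r[0] for r in rows]


logout(conn, "h1")
after_logout = active(conn)        # laptop family gone, phone still active
logout_everywhere(conn, "u1")
after_everywhere = active(conn)    # only the other user's token remains
print(after_logout, after_everywhere)
```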

What this doesn’t catch
#

Refresh-token rotation catches replay of stolen refresh tokens. It does not catch:

  • Theft of the access token before it expires. Access tokens are stateless; the only mitigation is short TTL. 60 minutes is the upper bound; 15 is more honest.
  • Compromise of the JWT signing key. That’s Part C.
  • A logged-in attacker abusing the legitimate session. Audit your tool calls (Ch06 middleware), don’t try to fix this in auth.

.NET note
#

The pattern is identical with Microsoft.IdentityModel.Tokens for JWT validation and EF Core for the table. The one trap: JwtBearerOptions.SaveToken = true will stash the access JWT on HttpContext; that’s harmless but unnecessary if you’re already extracting it explicitly.


Part C: Graceful secret rotation
#

Two kinds of secrets need rotation in the capstone:

  1. JWT signing keys — the asymmetric key pair used to sign access and refresh tokens. RS256 means there’s a public key (used by every agent for verification) and a private key (used only by the orchestrator’s auth service to sign).
  2. Inter-agent shared secrets — the x-agent-secret header that a specialist accepts as proof the request came from an authorized orchestrator.

Rotation that requires a deploy window or simultaneous restart is the same as no rotation — it never happens. The pattern below makes both kinds rotatable on a live system.

The two-key window
#

The trick is the same in both cases: maintain a list of valid keys, sign with the newest, accept any. A new key is “promoted in” hours or days before the old one is “demoted out.” During the overlap, both work.

# settings.py
JWT_KEYS: dict[str, RSAKey] = {
    "2026-04-15": load_rsa_key("keys/jwt-2026-04-15.pem"),  # current signer
    "2026-03-01": load_rsa_key("keys/jwt-2026-03-01.pem"),  # accepted, retiring
}
JWT_CURRENT_KID = "2026-04-15"

Signing (always uses current)
#

def issue_access_jwt(user_id: UUID) -> str:
    return jwt.encode(
        {
            "sub": str(user_id),
            "type": "access",
            "exp": datetime.now(timezone.utc) + ACCESS_TTL,
        },
        settings.JWT_KEYS[settings.JWT_CURRENT_KID].private,
        algorithm="RS256",
        headers={"kid": settings.JWT_CURRENT_KID},
    )

The kid (key ID) header is what makes the verifier’s job trivial.

Verifying (looks up by kid, rejects unknown)
#

def decode_access_jwt(token: str) -> dict:
    header = jwt.get_unverified_header(token)
    kid = header.get("kid")
    if kid not in settings.JWT_KEYS:
        raise InvalidTokenError(f"Unknown key id: {kid}")
    return jwt.decode(
        token,
        settings.JWT_KEYS[kid].public,
        algorithms=["RS256"],
    )

Rotation lifecycle
#

%%{init: {'theme':'base', 'themeVariables': { 'primaryColor': '#2563eb','primaryTextColor': '#ffffff','primaryBorderColor': '#1e40af', 'lineColor': '#64748b','secondaryColor': '#f59e0b','tertiaryColor': '#10b981', 'background': 'transparent'}}}%%
flowchart LR
    classDef sign fill:#10b981,stroke:#047857,color:#ffffff
    classDef accept fill:#2563eb,stroke:#1e40af,color:#ffffff
    classDef gone fill:#64748b,stroke:#334155,color:#ffffff

    k0["Key A<br/>signs"]:::sign
    k0a["Key A<br/>accepts only"]:::accept
    k0g["Key A<br/>removed"]:::gone
    k1i["Key B<br/>added,<br/>accepts only"]:::accept
    k1s["Key B<br/>signs"]:::sign

    k0 -- "T0+0d:<br/>add Key B" --> k1i
    k1i -- "T0+1d:<br/>promote Key B" --> k1s
    k0 -- "T0+1d:<br/>demote Key A" --> k0a
    k0a -- "T0+8d (>access TTL):<br/>remove Key A" --> k0g

Day 0: add Key B as accepted-only. Day 1: switch the signer to Key B; Key A keeps verifying tokens already in flight. Day 8 (or whenever every Key A access token has expired): remove Key A from the keyring.
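The three days can be walked through in a dependency-free sketch. One substitution is made on purpose and should not be copied into production: HMAC-SHA256 with byte-string keys stands in for RS256, because the standard library has no RSA. The point is only the `kid` lookup; `sign` and `verify` are illustrative helpers, not PyJWT functions.

```python
import base64
import hashlib
import hmac
import json


class InvalidTokenError(Exception):
    pass


def _b64(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()


def _unb64(text: str) -> bytes:
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def sign(keys: dict[str, bytes], current_kid: str, claims: dict) -> str:
    """Sign with the current key and record its kid in the header."""
    header = _b64(json.dumps({"alg": "HS256", "kid": current_kid}).encode())
    body = _b64(json.dumps(claims).encode())
    mac = hmac.new(keys[current_kid], f"{header}.{body}".encode(), hashlib.sha256)
    return f"{header}.{body}.{_b64(mac.digest())}"


def verify(keys: dict[str, bytes], token: str) -> dict:
    """Pick the key named by kid; reject unknown kids and bad signatures."""
    try:
        header, body, sig = token.split(".")
        kid = json.loads(_unb64(header))["kid"]
        sig_bytes = _unb64(sig)
    except (ValueError, KeyError, TypeError) as exc:
        raise InvalidTokenError("Malformed token") from exc
    if not isinstance(kid, str) or kid not in keys:
        raise InvalidTokenError("Unknown key id")
    expected = hmac.new(keys[kid], f"{header}.{body}".encode(), hashlib.sha256).digest()
    if not hmac.compare_digest(expected, sig_bytes):
        raise InvalidTokenError("Bad signature")
    return json.loads(_unb64(body))


# Day 0: Key B is added as accepted-only; Key A still signs.
keys = {"A": b"key-a-demo-secret", "B": b"key-b-demo-secret"}
token_a = sign(keys, "A", {"sub": "user-1"})

# Day 1: the signer is promoted to Key B; Key A tokens keep verifying.
token_b = sign(keys, "B", {"sub": "user-1"})
print(verify(keys, token_a)["sub"], verify(keys, token_b)["sub"])

# Day 8: Key A is removed; its tokens are now rejected.
del keys["A"]
try:
    verify(keys, token_a)
except InvalidTokenError as exc:
    print(exc)
```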

The “Day 8” gap matters — it must be longer than the longest access-token TTL plus any refresh slack. With a 60-minute access TTL, 24 hours is plenty. With a 7-day refresh, you wait a week before pulling the old key (or you accept that a small number of users will get one forced re-login).
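The arithmetic behind that gap is short enough to write down. This is a sketch: `earliest_safe_removal` is a hypothetical helper, the promotion timestamp is made up, and the one-hour `slack` is an assumed margin for clock skew and slow rollouts rather than a figure from the capstone.

```python
from datetime import datetime, timedelta, timezone

ACCESS_TTL = timedelta(minutes=60)
REFRESH_TTL = timedelta(days=7)


def earliest_safe_removal(
    promoted_at: datetime,
    ttls: list[timedelta],
    slack: timedelta = timedelta(hours=1),
) -> datetime:
    """Earliest time the old key can be dropped without rejecting live tokens.

    The last token signed by the old key is minted just before promotion,
    so it expires at promoted_at + max(ttls).
    """
    return promoted_at + max(ttls) + slack


# Illustrative moment at which the signer switched to the new key.
promoted = datetime(2026, 4, 15, 9, 0, tzinfo=timezone.utc)

access_only = earliest_safe_removal(promoted, [ACCESS_TTL])
with_refresh = earliest_safe_removal(promoted, [ACCESS_TTL, REFRESH_TTL])

print(access_only.isoformat())   # 2026-04-15T11:00:00+00:00
print(with_refresh.isoformat())  # 2026-04-22T10:00:00+00:00
```

If refresh tokens are signed with the same keyring, the second figure is the one that applies; pulling the key earlier trades a shorter window for some forced re-logins.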

Operationalizing
#

The JWT_KEYS dict above is loaded once at startup, so changing it means a restart. In production you want it to come from a key vault or a config service that supports atomic updates:

  • Azure Key Vault — store keys as secrets keyed by kid. Cache them locally with a 60-second TTL; the cache miss is acceptable because rotation is a planned event.
  • Filesystem with a watcher: inotify/FileSystemWatcher on the keys directory; the agent reloads JWT_KEYS when a .pem is added or removed. Cheaper than a vault for self-hosted deployments.
  • A /admin/reload-keys endpoint — gated behind admin RBAC, useful as a manual fallback when the watcher misses an event.

Whichever way, the application code stays the same: it reads from settings.JWT_KEYS and trusts that someone (orchestration, watcher, manual call) keeps the dict current.
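One possible shape for the "cache locally with a 60-second TTL" option is sketched below. `CachedKeyring` is a hypothetical class, the `fetch` callable is a placeholder for whatever vault client you use, and the injectable `clock` exists only so the example can be run deterministically.

```python
import time
from typing import Callable


class CachedKeyring:
    """Serve keys from memory; re-fetch once the cache is older than the TTL."""

    def __init__(
        self,
        fetch: Callable[[], dict[str, bytes]],
        ttl_seconds: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._fetch = fetch
        self._ttl = ttl_seconds
        self._clock = clock
        self._keys: dict[str, bytes] = {}
        self._loaded_at: float | None = None

    def keys(self) -> dict[str, bytes]:
        now = self._clock()
        if self._loaded_at is None or now - self._loaded_at >= self._ttl:
            # Build the new dict fully, then swap the reference in one step,
            # so readers never observe a half-updated keyring.
            self._keys = dict(self._fetch())
            self._loaded_at = now
        return self._keys


# Hypothetical backing store: in production this would call the vault.
vault = {"2026-03-01": b"old-key"}
fake_now = [0.0]
ring = CachedKeyring(fetch=lambda: vault, ttl_seconds=60, clock=lambda: fake_now[0])

first = sorted(ring.keys())
vault["2026-04-15"] = b"new-key"   # rotation adds a key in the vault
fake_now[0] = 30.0
cached = sorted(ring.keys())       # still within TTL: new key not visible yet
fake_now[0] = 61.0
refreshed = sorted(ring.keys())    # TTL elapsed: new key picked up
print(first, cached, refreshed)
```

The up-to-60-second delay before a new key becomes visible is why the lifecycle adds a key as accepted-only well before any token is signed with it.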

x-agent-secret rotation
#

The agent-to-agent shared secret is even simpler — there’s no kid because the header value is the secret. Solution: accept a list of secrets.

# settings.py
AGENT_SECRETS: set[str] = {
    "current_value_set_at_2026_04_15",   # promoted today
    "previous_value_set_at_2026_03_01",  # being retired
}
AGENT_SECRET_CURRENT = "current_value_set_at_2026_04_15"


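def verify_agent_secret(presented: str) -> bool:
    # Constant-time compare against every allowed value. Compare bytes, not
    # str: hmac.compare_digest raises TypeError on non-ASCII str input, and
    # the header value is attacker-controlled.
    presented_bytes = presented.encode("utf-8")
    matched = False
    for allowed in settings.AGENT_SECRETS:
        matched |= hmac.compare_digest(presented_bytes, allowed.encode("utf-8"))
    return matched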


def outgoing_agent_secret() -> str:
    return settings.AGENT_SECRET_CURRENT

The orchestrator always sends the current secret; specialists accept either. The rotation script:

  1. Generate new secret. Add it to AGENT_SECRETS on every specialist (rolling restart or live config reload). Don’t change AGENT_SECRET_CURRENT yet.
  2. Once every specialist accepts the new secret, switch AGENT_SECRET_CURRENT on the orchestrator. The orchestrator now sends the new value.
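  3. Once no orchestrator instance is still sending the old secret (the shared secret is sent fresh on every request, so there is no token TTL to wait out), remove the old secret from AGENT_SECRETS on the specialists.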

hmac.compare_digest instead of == matters: the latter short-circuits on the first different byte, which leaks length and content via timing.
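The three rotation steps can be simulated in a few lines to check that no request is rejected at any point. This is a sketch with placeholder secret values; the `accepted` parameter and the module-level variables are illustrative, standing in for `AGENT_SECRETS` on a specialist and `AGENT_SECRET_CURRENT` on the orchestrator.

```python
import hmac


def verify_agent_secret(presented: str, accepted: set[str]) -> bool:
    """Constant-time comparison against every accepted secret."""
    presented_bytes = presented.encode("utf-8")
    matched = False
    for allowed in accepted:
        # No early return: every candidate is compared on every call.
        matched |= hmac.compare_digest(presented_bytes, allowed.encode("utf-8"))
    return matched


OLD, NEW = "old-secret-demo", "new-secret-demo"  # placeholder values

specialist_accepts = {OLD}
orchestrator_sends = OLD
checks = []

# Step 1: specialists learn the new secret; the orchestrator still sends the old one.
specialist_accepts.add(NEW)
checks.append(verify_agent_secret(orchestrator_sends, specialist_accepts))

# Step 2: the orchestrator switches to the new secret.
orchestrator_sends = NEW
checks.append(verify_agent_secret(orchestrator_sends, specialist_accepts))

# Step 3: the old secret is retired; only the new one is accepted.
specialist_accepts.discard(OLD)
checks.append(verify_agent_secret(orchestrator_sends, specialist_accepts))

print(checks)                                        # [True, True, True]
print(verify_agent_secret(OLD, specialist_accepts))  # False
```

Reordering steps 1 and 2 is what breaks a live system: the orchestrator would send a value no specialist accepts yet.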

.NET note
#

IConfiguration reload-on-change combined with IOptionsMonitor<JwtKeyringOptions> does the equivalent without a watcher. For Key Vault, the Azure.Extensions.AspNetCore.Configuration.Secrets package wires it up directly. The constant-time compare is CryptographicOperations.FixedTimeEquals — never string.Equals.


Putting it together
#

Three checklists, copy-paste:

On every login flow:

  • Access token is RS256 with a kid header.
  • Refresh token is also a JWT with kid, hashed in the DB, and tied to a family_id.
  • Logout revokes the entire family.

On every refresh:

  • Verify with whatever key the kid selects.
  • Look up the token by sha256 hash, in a transaction, with FOR UPDATE.
  • If rotated_at is set on the presented token, revoke the entire family.

On every secret-rotation event:

  • Add the new key to the verifier set first.
  • Promote the signer once every verifier accepts the new key.
  • Remove the old key only after the longest live token has expired.

What changes for the capstone
#

The capstone today (agents/python/shared/auth.py) ships the original Part 7 design — single JWT signing key, simple refresh, no reset flow. Phase 8 of the refactor plan tracks the migration:

  • agents/python/shared/auth/keyring.py: JWT_KEYS map, decode_access_jwt, outgoing_agent_secret.
  • migrations/0042_password_reset_and_refresh_families.sql — the two new tables and the partial indexes.
  • agents/python/orchestrator/auth_routes.py: /auth/forgot-password, /auth/reset-password, refactored /auth/refresh.

None of this changes the agent code. Specialists keep using verify_agent_secret; tools keep reading current_user_email. The hardening lives entirely in the auth boundary.

What’s next
#

That closes the production-hardening loop the original Part 7 left open. Ch21 — Putting it all together is the final chapter: a guided tour of the live e-commerce stack with file:line citations for every concept covered across the series.
