"""Minimal RFC 9421 HTTP Message Signatures for the PoC. Covers just enough of RFC 9421 + draft-ietf-wimse-http-signature-03 to bind an ECT-bearing request to its method, target URI, body digest, and the Wimse-ECT header itself. Not a general-purpose implementation — the signed-component serialization follows RFC 9421 §2.3 for this fixed set of components only. Signature metadata parameters per draft-ietf-wimse-http-signature-03: keyid — kid of the signing workload alg — "ecdsa-p256-sha256" created — NumericDate seconds wimse-aud — target audience workload identity (new in -03, replaces the removed Wimse-Audience HTTP header) Format: Signature-Input: sig1=(\"@method\" \"@target-uri\" \"content-digest\" \ \"wimse-ect\");created=...;keyid=\"...\";\ alg=\"ecdsa-p256-sha256\";wimse-aud=\"...\" Signature: sig1=:: """ from __future__ import annotations import base64 import hashlib import time from dataclasses import dataclass from typing import Iterable from urllib.parse import urlsplit from cryptography.hazmat.primitives.asymmetric.ec import ( EllipticCurvePrivateKey, EllipticCurvePublicKey, ) from act.crypto import sign as act_sign, verify as act_verify from act.errors import ACTSignatureError COVERED_COMPONENTS: tuple[str, ...] = ( "@method", "@target-uri", "content-digest", "wimse-ect", ) SIG_ALG = "ecdsa-p256-sha256" def content_digest(body: bytes) -> str: """RFC 9530 Content-Digest header value using sha-256.""" digest = hashlib.sha256(body).digest() return "sha-256=:" + base64.b64encode(digest).decode("ascii") + ":" def _serialize_components( *, method: str, target_uri: str, content_digest_hdr: str, wimse_ect_hdr: str, params: str, ) -> bytes: """RFC 9421 §2.3 signature base for the fixed PoC component set.""" lines = [ f'"@method": {method.upper()}', f'"@target-uri": {target_uri}', f'"content-digest": {content_digest_hdr}', f'"wimse-ect": {wimse_ect_hdr}', f'"@signature-params": {params}', ] return "\n".join(lines).encode("ascii") def _signature_params( *, created: int, keyid: str, wimse_aud: str, ) -> str: """Render the signature-params string (quoted inner-list + params).""" components = " ".join(f'"{c}"' for c in COVERED_COMPONENTS) return ( f"({components})" f";created={created}" f';keyid="{keyid}"' f';alg="{SIG_ALG}"' f';wimse-aud="{wimse_aud}"' ) @dataclass class SignedRequest: content_digest: str signature_input: str signature: str def sign_request( *, method: str, target_uri: str, body: bytes, wimse_ect: str, wimse_aud: str, keyid: str, private_key: EllipticCurvePrivateKey, created: int | None = None, ) -> SignedRequest: """Produce the three headers needed to send a signed PoC request.""" created = int(created if created is not None else time.time()) cd = content_digest(body) params = _signature_params(created=created, keyid=keyid, wimse_aud=wimse_aud) base = _serialize_components( method=method, target_uri=target_uri, content_digest_hdr=cd, wimse_ect_hdr=wimse_ect, params=params, ) sig = act_sign(private_key, base) sig_b64 = base64.b64encode(sig).decode("ascii") return SignedRequest( content_digest=cd, signature_input=f"sig1={params}", signature=f"sig1=:{sig_b64}:", ) @dataclass class ParsedSignature: covered: tuple[str, ...] created: int keyid: str alg: str wimse_aud: str raw_params: str signature_b64: str def _parse_signature_input(value: str) -> ParsedSignature: """Parse 'sig1=(...);created=...;keyid="...";alg="...";wimse-aud="..."'.""" if "=" not in value: raise ValueError("signature-input: missing label") _label, _, inner = value.partition("=") # inner: '("a" "b" ...);created=...;keyid="...";...' if not inner.startswith("("): raise ValueError("signature-input: missing covered components list") close = inner.index(")") components_raw = inner[1:close] covered = tuple( part.strip().strip('"') for part in components_raw.split() if part.strip() ) rest = inner[close + 1 :] params: dict[str, str] = {} for part in rest.split(";"): part = part.strip() if not part or "=" not in part: continue k, _, v = part.partition("=") params[k.strip()] = v.strip().strip('"') return ParsedSignature( covered=covered, created=int(params["created"]), keyid=params["keyid"], alg=params["alg"], wimse_aud=params["wimse-aud"], raw_params=inner, # full params string (for sig base reconstruction) signature_b64="", ) def _parse_signature(value: str) -> str: """Parse 'sig1=::' → base64 string.""" if "=" not in value: raise ValueError("signature: missing label") _label, _, inner = value.partition("=") inner = inner.strip() if not (inner.startswith(":") and inner.endswith(":")): raise ValueError("signature: expected byte-sequence form :...:" ) return inner[1:-1] def verify_request( *, method: str, target_uri: str, body: bytes, wimse_ect_header: str, content_digest_header: str, signature_input_header: str, signature_header: str, expected_audience: str, public_key: EllipticCurvePublicKey, max_age_seconds: int = 300, now: int | None = None, ) -> ParsedSignature: """Verify the signature covers the expected components and matches. Returns the parsed signature metadata (keyid, alg, wimse-aud, created) so the caller can cross-check against ECT/ACT claims. """ parsed = _parse_signature_input(signature_input_header) if parsed.covered != COVERED_COMPONENTS: raise ACTSignatureError( f"signed components {parsed.covered!r} differ from expected " f"{COVERED_COMPONENTS!r}" ) if parsed.alg != SIG_ALG: raise ACTSignatureError(f"unexpected alg {parsed.alg!r}") if parsed.wimse_aud != expected_audience: raise ACTSignatureError( f"wimse-aud {parsed.wimse_aud!r} != expected {expected_audience!r}" ) current = int(now if now is not None else time.time()) if current - parsed.created > max_age_seconds: raise ACTSignatureError( f"signature too old: created={parsed.created}, now={current}" ) expected_digest = content_digest(body) if content_digest_header != expected_digest: raise ACTSignatureError("content-digest does not match body") sig_b64 = _parse_signature(signature_header) sig = base64.b64decode(sig_b64) base = _serialize_components( method=method, target_uri=target_uri, content_digest_hdr=content_digest_header, wimse_ect_hdr=wimse_ect_header, params=parsed.raw_params, ) act_verify(public_key, sig, base) # raises ACTSignatureError on bad sig parsed.signature_b64 = sig_b64 return parsed