"""ACT cryptographic primitives and key management. Provides sign/verify operations and key resolution across all three ACT trust tiers: - Tier 1: Pre-shared Ed25519 and P-256 keys - Tier 2: PKI / X.509 certificate chains - Tier 3: DID (did:key self-contained, did:web via resolver callback) Reference: ACT §5 (Trust Model), §8 (Verification Procedure). """ from __future__ import annotations import base64 import hashlib import re from typing import Any, Callable, Protocol from cryptography.exceptions import InvalidSignature from cryptography.hazmat.primitives.asymmetric.ec import ( ECDSA, SECP256R1, EllipticCurvePrivateKey, EllipticCurvePublicKey, generate_private_key as ec_generate_private_key, ) from cryptography.hazmat.primitives.asymmetric.ed25519 import ( Ed25519PrivateKey, Ed25519PublicKey, ) from cryptography.hazmat.primitives.hashes import SHA256 from cryptography.x509 import ( Certificate, load_der_x509_certificate, ) from .errors import ( ACTKeyResolutionError, ACTSignatureError, ACTValidationError, ) # Type aliases for public/private keys supported by ACT. PublicKey = Ed25519PublicKey | EllipticCurvePublicKey PrivateKey = Ed25519PrivateKey | EllipticCurvePrivateKey # Callback type for DID:web resolution. DIDResolver = Callable[[str], PublicKey | None] def generate_ed25519_keypair() -> tuple[Ed25519PrivateKey, Ed25519PublicKey]: """Generate an Ed25519 key pair for ACT signing. Returns a (private_key, public_key) tuple. The private key object carries its associated public key per ACT security requirements. Reference: ACT §5.2 (Tier 1 pre-shared keys). """ private_key = Ed25519PrivateKey.generate() return private_key, private_key.public_key() def generate_p256_keypair() -> tuple[EllipticCurvePrivateKey, EllipticCurvePublicKey]: """Generate a P-256 (ES256) key pair for ACT signing. Returns a (private_key, public_key) tuple. Reference: ACT §5.2 (Tier 1 pre-shared keys). """ private_key = ec_generate_private_key(SECP256R1()) return private_key, private_key.public_key() def sign(private_key: PrivateKey, data: bytes) -> bytes: """Sign data using the appropriate algorithm for the key type. Uses Ed25519 for Ed25519PrivateKey, ECDSA with SHA-256 for P-256. Returns raw signature bytes (for Ed25519: 64 bytes; for ES256: raw r||s format per RFC 7518 §3.4). Reference: ACT §5, RFC 7515 §5.1. Raises: ACTValidationError: If the key type is not supported. """ if isinstance(private_key, Ed25519PrivateKey): return private_key.sign(data) elif isinstance(private_key, EllipticCurvePrivateKey): from cryptography.hazmat.primitives.asymmetric.utils import ( decode_dss_signature, ) # Sign with DER-encoded signature, then convert to raw r||s der_sig = private_key.sign(data, ECDSA(SHA256())) r, s = decode_dss_signature(der_sig) # P-256 uses 32-byte integers return r.to_bytes(32, "big") + s.to_bytes(32, "big") else: raise ACTValidationError(f"Unsupported key type: {type(private_key)}") def verify(public_key: PublicKey, signature: bytes, data: bytes) -> None: """Verify a signature against the given public key and data. Reference: ACT §8.1 step 5. Raises: ACTSignatureError: If the signature is invalid. ACTValidationError: If the key type is not supported. """ try: if isinstance(public_key, Ed25519PublicKey): public_key.verify(signature, data) elif isinstance(public_key, EllipticCurvePublicKey): from cryptography.hazmat.primitives.asymmetric.utils import ( encode_dss_signature, ) # Convert raw r||s back to DER r = int.from_bytes(signature[:32], "big") s = int.from_bytes(signature[32:], "big") der_sig = encode_dss_signature(r, s) public_key.verify(der_sig, data, ECDSA(SHA256())) else: raise ACTValidationError( f"Unsupported key type: {type(public_key)}" ) except InvalidSignature as e: raise ACTSignatureError("Signature verification failed") from e def compute_sha256(data: bytes) -> bytes: """Compute SHA-256 hash of data. Used for delegation chain signatures and inp_hash/out_hash claims. Reference: ACT §6.1 (delegation sig), §4.3 (inp_hash, out_hash). """ return hashlib.sha256(data).digest() def b64url_sha256(data: bytes) -> str: """Compute base64url(SHA-256(data)) without padding. Used for inp_hash and out_hash claims. Reference: ACT §4.3. """ digest = compute_sha256(data) return base64.urlsafe_b64encode(digest).rstrip(b"=").decode("ascii") def x509_kid(cert_der: bytes) -> str: """Compute the Tier 2 kid: SHA-256 thumbprint of DER certificate. Reference: ACT §5.3 (Tier 2 kid format). """ return hashlib.sha256(cert_der).hexdigest() class KeyRegistry: """Tier 1 pre-shared key registry. Maps kid strings to public keys. Configured at initialization time with no external resolution needed. Reference: ACT §5.2 (Tier 1 Pre-Shared Keys). """ def __init__(self) -> None: self._keys: dict[str, PublicKey] = {} def register(self, kid: str, public_key: PublicKey) -> None: """Register a public key under the given kid. Reference: ACT §5.2. """ self._keys[kid] = public_key def get(self, kid: str) -> PublicKey | None: """Retrieve the public key for a kid, or None if not found.""" return self._keys.get(kid) def __contains__(self, kid: str) -> bool: return kid in self._keys def __len__(self) -> int: return len(self._keys) class X509TrustStore: """Tier 2 PKI/X.509 trust store. Holds trusted CA certificates and resolves kid (certificate thumbprint) to public keys. Supports x5c header chain validation. Reference: ACT §5.3 (Tier 2 PKI). """ def __init__(self) -> None: self._trusted_certs: dict[str, Certificate] = {} def add_trusted_cert(self, cert: Certificate) -> str: """Add a trusted certificate to the store. Returns the kid (SHA-256 thumbprint of DER encoding). Reference: ACT §5.3. """ from cryptography.hazmat.primitives.serialization import Encoding der_bytes = cert.public_bytes(Encoding.DER) kid = x509_kid(der_bytes) self._trusted_certs[kid] = cert return kid def resolve(self, kid: str) -> PublicKey | None: """Resolve kid to a public key from a trusted certificate. Reference: ACT §5.3, §8.1 step 4. """ cert = self._trusted_certs.get(kid) if cert is None: return None pub = cert.public_key() if isinstance(pub, (Ed25519PublicKey, EllipticCurvePublicKey)): return pub return None def resolve_x5c(self, x5c: list[str]) -> PublicKey | None: """Resolve public key from x5c certificate chain. The first entry in x5c is the end-entity certificate. Validates that the chain terminates in a trusted CA. Reference: ACT §4.1 (x5c header), §5.3. """ if not x5c: return None try: # Decode certificates from base64 DER certs = [ load_der_x509_certificate(base64.b64decode(c)) for c in x5c ] except Exception: return None # Check if any cert in the chain is in our trust store from cryptography.hazmat.primitives.serialization import Encoding for cert in certs: der_bytes = cert.public_bytes(Encoding.DER) kid = x509_kid(der_bytes) if kid in self._trusted_certs: # End-entity cert is the first one ee_pub = certs[0].public_key() if isinstance(ee_pub, (Ed25519PublicKey, EllipticCurvePublicKey)): return ee_pub return None return None # --- Tier 3: DID Support --- # Multicodec prefixes for did:key _ED25519_MULTICODEC = b"\xed\x01" _P256_MULTICODEC = b"\x80\x24" def _multibase_decode(encoded: str) -> bytes: """Decode a multibase-encoded string (base58btc 'z' prefix). Reference: ACT §5.4 (Tier 3 DID:key). """ if not encoded.startswith("z"): raise ACTKeyResolutionError( f"Unsupported multibase encoding prefix: {encoded[0]!r}" ) return _base58btc_decode(encoded[1:]) def _base58btc_decode(s: str) -> bytes: """Decode a base58btc string.""" alphabet = "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz" n = 0 for ch in s: idx = alphabet.index(ch) n = n * 58 + idx # Compute byte length byte_length = (n.bit_length() + 7) // 8 result = n.to_bytes(byte_length, "big") if byte_length > 0 else b"" # Preserve leading zeros leading_zeros = len(s) - len(s.lstrip("1")) return b"\x00" * leading_zeros + result def resolve_did_key(did: str) -> PublicKey: """Resolve a did:key identifier to a public key. Supports Ed25519 and P-256 key types. The did:key method is self-contained — no external resolution is needed. Reference: ACT §5.4 (Tier 3 DID:key). Raises: ACTKeyResolutionError: If the DID cannot be resolved. """ # Strip fragment if present (e.g., did:key:z6Mk...#z6Mk...) did_base = did.split("#")[0] if not did_base.startswith("did:key:"): raise ACTKeyResolutionError( f"Not a did:key identifier: {did!r}" ) multibase_value = did_base[len("did:key:"):] try: decoded = _multibase_decode(multibase_value) except Exception as e: raise ACTKeyResolutionError( f"Failed to decode did:key multibase value: {e}" ) from e if decoded[:2] == _ED25519_MULTICODEC: raw_key = decoded[2:] if len(raw_key) != 32: raise ACTKeyResolutionError( f"Ed25519 public key must be 32 bytes, got {len(raw_key)}" ) return Ed25519PublicKey.from_public_bytes(raw_key) elif decoded[:2] == _P256_MULTICODEC: raw_key = decoded[2:] from cryptography.hazmat.primitives.asymmetric.ec import ( EllipticCurvePublicKey as ECPub, ) from cryptography.hazmat.primitives.serialization import ( load_der_public_key, ) # P-256 compressed point (33 bytes) or uncompressed (65 bytes) # Wrap in SubjectPublicKeyInfo for loading try: return EllipticCurvePublicKey.from_encoded_point( SECP256R1(), raw_key ) except Exception as e: raise ACTKeyResolutionError( f"Failed to load P-256 key from did:key: {e}" ) from e else: raise ACTKeyResolutionError( f"Unsupported multicodec prefix in did:key: {decoded[:2]!r}" ) def did_key_from_ed25519(public_key: Ed25519PublicKey) -> str: """Create a did:key identifier from an Ed25519 public key. Reference: ACT §5.4 (Tier 3 DID:key). """ from cryptography.hazmat.primitives.serialization import ( Encoding, PublicFormat, ) raw = public_key.public_bytes(Encoding.Raw, PublicFormat.Raw) multicodec = _ED25519_MULTICODEC + raw encoded = "z" + _base58btc_encode(multicodec) return f"did:key:{encoded}" def _base58btc_encode(data: bytes) -> str: """Encode bytes as base58btc.""" alphabet = "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz" # Count leading zeros leading_zeros = 0 for b in data: if b == 0: leading_zeros += 1 else: break n = int.from_bytes(data, "big") if n == 0: return "1" * leading_zeros chars: list[str] = [] while n > 0: n, remainder = divmod(n, 58) chars.append(alphabet[remainder]) return "1" * leading_zeros + "".join(reversed(chars)) class ACTKeyResolver: """Unified key resolver across all trust tiers. Tries Tier 1 (pre-shared), then Tier 2 (X.509), then Tier 3 (DID) to resolve a kid to a public key. Reference: ACT §5 (Trust Model), §8.1 step 4. """ def __init__( self, registry: KeyRegistry | None = None, x509_store: X509TrustStore | None = None, did_web_resolver: DIDResolver | None = None, ) -> None: self._registry = registry or KeyRegistry() self._x509_store = x509_store or X509TrustStore() self._did_web_resolver = did_web_resolver @property def registry(self) -> KeyRegistry: """Access the Tier 1 key registry.""" return self._registry @property def x509_store(self) -> X509TrustStore: """Access the Tier 2 X.509 trust store.""" return self._x509_store def resolve( self, kid: str, header: dict[str, Any] | None = None, ) -> PublicKey: """Resolve a kid to a public key, trying all configured tiers. Resolution order: 1. Tier 1: Pre-shared key registry lookup by kid 2. Tier 2: X.509 certificate lookup by kid (thumbprint) or x5c header chain validation 3. Tier 3: DID resolution (did:key or did:web) Reference: ACT §5 (Trust Model), §8.1 step 4. Raises: ACTKeyResolutionError: If no key can be resolved for the kid. """ header = header or {} # Tier 1: Pre-shared keys key = self._registry.get(kid) if key is not None: return key # Tier 2: X.509 key = self._x509_store.resolve(kid) if key is not None: return key # Tier 2: x5c chain in header x5c = header.get("x5c") if x5c: key = self._x509_store.resolve_x5c(x5c) if key is not None: return key # Tier 3: DID did_value = header.get("did") or kid if did_value.startswith("did:key:"): try: return resolve_did_key(did_value) except ACTKeyResolutionError: pass if did_value.startswith("did:web:") and self._did_web_resolver: resolved = self._did_web_resolver(did_value) if resolved is not None: return resolved raise ACTKeyResolutionError( f"Cannot resolve kid {kid!r} to a public key via any trust tier" )