"""ECT verification per Section 7.""" from __future__ import annotations import hmac import time from dataclasses import dataclass from typing import Callable, Optional import jwt from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePublicKey from ect.types import ECT_TYPE, Payload, valid_pol_decision from ect.dag import ECTStore, DAGConfig, validate_dag from ect.validate import validate_ext, validate_hash_format, valid_uuid @dataclass class ParsedECT: header: dict payload: Payload raw: str KeyResolver = Callable[[str], Optional[EllipticCurvePublicKey]] @dataclass class VerifyOptions: verifier_id: str = "" resolve_key: Optional[KeyResolver] = None store: Optional[ECTStore] = None dag: Optional[DAGConfig] = None now: Optional[int] = None # unix seconds; None = time.time() iat_max_age_sec: int = 900 iat_max_future_sec: int = 30 jti_seen: Optional[Callable[[str], bool]] = None wit_subject: str = "" validate_uuids: bool = False max_par_length: int = 0 # 0 = no limit on_verify_attempt: Optional[Callable[[str, Optional[Exception]], None]] = None # (jti, err) for observability def default_verify_options() -> VerifyOptions: from ect.dag import default_dag_config return VerifyOptions(dag=default_dag_config()) def parse(compact: str) -> ParsedECT: """Parse compact JWS and return header + payload without verification.""" try: unverified = jwt.decode( compact, options={"verify_signature": False, "verify_exp": False}, ) except Exception as e: raise ValueError(f"ect: parse failed: {e}") from e header = jwt.get_unverified_header(compact) if header.get("alg") != "ES256": raise ValueError("ect: expected ES256") payload = Payload.from_claims(unverified) return ParsedECT(header=header, payload=payload, raw=compact) def verify(compact: str, opts: VerifyOptions) -> ParsedECT: """Full Section 7 verification and optional DAG validation.""" log_jti: list[str] = [""] # use list so callback sees updated jti def set_log_jti(jti: str) -> None: log_jti[0] = jti err: Optional[Exception] = None try: return _verify_impl(compact, opts, set_log_jti) except Exception as e: err = e raise finally: if opts.on_verify_attempt is not None: opts.on_verify_attempt(log_jti[0], err) def _verify_impl(compact: str, opts: VerifyOptions, set_log_jti: Callable[[str], None]) -> ParsedECT: header = jwt.get_unverified_header(compact) typ = header.get("typ") or "" # Constant-time comparison for typ if not hmac.compare_digest(typ, ECT_TYPE): raise ValueError("ect: invalid typ parameter") alg = header.get("alg") if alg in ("none", "HS256", "HS384", "HS512"): raise ValueError("ect: prohibited algorithm") kid = header.get("kid") if not kid: raise ValueError("ect: missing kid") if not opts.resolve_key: raise ValueError("ect: ResolveKey required") pub = opts.resolve_key(kid) if pub is None: raise ValueError("ect: unknown key identifier") try: claims = jwt.decode( compact, pub, algorithms=["ES256"], options={"verify_exp": False, "verify_aud": False, "verify_iat": False}, ) except jwt.InvalidSignatureError as e: raise ValueError(f"ect: invalid signature: {e}") from e except Exception as e: raise ValueError(f"ect: verify failed: {e}") from e payload = Payload.from_claims(claims) set_log_jti(payload.jti) validate_ext(payload.ext) if opts.max_par_length > 0 and len(payload.par) > opts.max_par_length: raise ValueError("ect: par exceeds max length") if opts.validate_uuids: if not valid_uuid(payload.jti): raise ValueError("ect: jti must be UUID format") if payload.wid and not valid_uuid(payload.wid): raise ValueError("ect: wid must be UUID format when set") if payload.inp_hash: validate_hash_format(payload.inp_hash) if payload.out_hash: validate_hash_format(payload.out_hash) if opts.wit_subject and payload.iss != opts.wit_subject: raise ValueError("ect: issuer does not match WIT subject") if opts.verifier_id and not payload.contains_audience(opts.verifier_id): raise ValueError("ect: audience does not include verifier") now = opts.now if opts.now is not None else int(time.time()) if now > payload.exp: raise ValueError("ect: token expired") if now - payload.iat > opts.iat_max_age_sec: raise ValueError("ect: iat too far in the past") if payload.iat > now + opts.iat_max_future_sec: raise ValueError("ect: iat in the future") # Required claims per spec: jti, exec_act, par. par may be set to [] when missing (from_claims already uses []). if not payload.jti or not payload.exec_act: raise ValueError("ect: missing required claims (jti, exec_act, par)") if payload.par is None: payload.par = [] # If pol or pol_decision present, both must be present and valid if payload.pol or payload.pol_decision: if not payload.pol or not payload.pol_decision: raise ValueError("ect: pol and pol_decision must both be present when either is set") if not valid_pol_decision(payload.pol_decision): raise ValueError("ect: invalid pol_decision value") if opts.store is not None and opts.dag is not None: validate_dag(payload, opts.store, opts.dag) if opts.jti_seen is not None and opts.jti_seen(payload.jti): raise ValueError("ect: jti already seen (replay)") return ParsedECT(header=header, payload=payload, raw=compact)