Files
Christian Nennemann 884d2dc836 feat: migrate refimpls from draft-00 to draft-01 claim names
- Rename `par` to `pred` (predecessor) in types, serialization, tests
- Remove `pol`, `pol_decision` from core payload; move to `ect_ext`
- Remove `sub` from payload (not part of ECT spec)
- Update `typ` from `wimse-exec+jwt` to `exec+jwt` (accept both)
- Rename MaxParLength to MaxPredLength everywhere
- Update testdata, demos, READMEs with migration table
- All Go tests pass, all 56 Python tests pass (90% coverage)
2026-04-03 10:55:58 +02:00

155 lines
5.4 KiB
Python

"""ECT verification per Section 7."""
from __future__ import annotations
import hmac
import time
from dataclasses import dataclass
from typing import Callable, Optional
import jwt
from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePublicKey
from ect.types import ECT_TYPE, ECT_TYPE_LEGACY, Payload
from ect.dag import ECTStore, DAGConfig, validate_dag
from ect.validate import validate_ext, validate_hash_format, valid_uuid
@dataclass
class ParsedECT:
    """An ECT split into its JOSE header, its claims and the original token.

    Built both by ``parse`` (no signature check) and by the verification
    path, so holding an instance does not by itself imply the token was
    verified.
    """

    # JOSE header dict as returned by jwt.get_unverified_header.
    header: dict
    # Claims converted through Payload.from_claims.
    payload: Payload
    # The compact serialization exactly as it was passed in.
    raw: str
# Callback that maps a JWS "kid" header value to the EC public key used for
# signature verification. Returning None makes verification fail with
# "ect: unknown key identifier".
KeyResolver = Callable[[str], Optional[EllipticCurvePublicKey]]
@dataclass
class VerifyOptions:
    """Settings consumed by ``verify``.

    Every field has a default, so ``VerifyOptions()`` is constructible, but
    verification itself fails unless ``resolve_key`` is supplied.
    """

    # When non-empty, the token's audience must contain this identifier.
    verifier_id: str = ""
    # kid -> public key lookup; verification raises if this is not set.
    resolve_key: Optional[KeyResolver] = None
    # DAG validation runs only when both ``store`` and ``dag`` are set.
    store: Optional[ECTStore] = None
    dag: Optional[DAGConfig] = None
    # Reference time in unix seconds; None = time.time().
    now: Optional[int] = None
    # Oldest accepted ``iat``, in seconds before ``now``.
    iat_max_age_sec: int = 900
    # How far ``iat`` may lie ahead of ``now``, in seconds.
    iat_max_future_sec: int = 30
    # Replay detector: called with the jti, a truthy result rejects the token.
    jti_seen: Optional[Callable[[str], bool]] = None
    # When non-empty, the token's ``iss`` must equal this value.
    wit_subject: str = ""
    # Require ``jti`` (and ``wid`` when present) to be UUID-formatted.
    validate_uuids: bool = False
    # Upper bound on the number of ``pred`` entries; 0 = no limit.
    max_pred_length: int = 0
    # Observability hook invoked as (jti, err) after every verify attempt;
    # err is None when verification succeeded.
    on_verify_attempt: Optional[Callable[[str, Optional[Exception]], None]] = None
def default_verify_options() -> VerifyOptions:
    """Return a ``VerifyOptions`` whose ``dag`` is the default DAG config.

    All other fields keep their dataclass defaults.
    """
    # Resolved at call time rather than at module import.
    from ect.dag import default_dag_config

    dag_defaults = default_dag_config()
    return VerifyOptions(dag=dag_defaults)
def parse(compact: str) -> ParsedECT:
    """Parse compact JWS and return header + payload without verification.

    The signature is NOT checked, so nothing in the result may be trusted;
    use ``verify`` for authenticated data.

    Args:
        compact: Compact-serialized JWS.

    Returns:
        ParsedECT holding the header dict, the claims converted through
        ``Payload.from_claims``, and the raw input string.

    Raises:
        ValueError: If the token cannot be decoded, if its header is
            rejected by the JWT library, or if ``alg`` is not ``ES256``.
    """
    try:
        claims = jwt.decode(
            compact,
            options={"verify_signature": False, "verify_exp": False},
        )
        # get_unverified_header applies its own header validation and can
        # raise even after decode() succeeded, so it stays inside the try
        # to surface as the same "parse failed" ValueError.
        header = jwt.get_unverified_header(compact)
    except Exception as exc:
        raise ValueError(f"ect: parse failed: {exc}") from exc

    if header.get("alg") != "ES256":
        raise ValueError("ect: expected ES256")

    return ParsedECT(header=header, payload=Payload.from_claims(claims), raw=compact)
def verify(compact: str, opts: VerifyOptions) -> ParsedECT:
    """Full Section 7 verification and optional DAG validation.

    Delegates the checks to ``_verify_impl`` and, when
    ``opts.on_verify_attempt`` is set, reports the outcome to it as
    ``(jti, err)``: ``jti`` is the empty string if the attempt failed before
    the token's jti was known, and ``err`` is None on success. Any exception
    from the checks is re-raised unchanged after the hook has run.
    """
    observed_jti = ""
    failure: Optional[Exception] = None

    def remember_jti(jti: str) -> None:
        # Lets the implementation publish the jti before later checks can fail.
        nonlocal observed_jti
        observed_jti = jti

    try:
        return _verify_impl(compact, opts, remember_jti)
    except Exception as exc:
        failure = exc
        raise
    finally:
        hook = opts.on_verify_attempt
        if hook is not None:
            hook(observed_jti, failure)
def _verify_impl(compact: str, opts: VerifyOptions, set_log_jti: Callable[[str], None]) -> ParsedECT:
    """Run every verification step for one compact ECT and return it parsed.

    Checks run in this order: header (``typ``, ``alg``, ``kid``), key
    resolution, ES256 signature, extension / size / format validation,
    issuer and audience binding, time window, required claims, optional DAG
    validation, and finally replay detection.

    Args:
        compact: Compact-serialized JWS.
        opts: Verification settings; ``opts.resolve_key`` is mandatory.
        set_log_jti: Called with the token's ``jti`` right after the
            signature has been verified, so the caller can report it even
            when a later check fails.

    Returns:
        ParsedECT with the header dict, the verified payload and the raw
        input string.

    Raises:
        ValueError: For every failure detected directly by this function.
            Errors raised by the header parser, by the ``ect.validate`` /
            ``ect.dag`` helpers, or by caller-supplied callbacks propagate
            unchanged.
    """
    header = jwt.get_unverified_header(compact)

    # The header is untrusted JSON: typ may be missing, a non-string, or a
    # non-ASCII string. hmac.compare_digest raises TypeError for the latter
    # two, so reject them up front with the regular validation error.
    typ = header.get("typ")
    if not isinstance(typ, str) or not typ.isascii():
        raise ValueError("ect: invalid typ parameter")
    # Constant-time comparison for typ; accept both preferred and legacy values
    if not hmac.compare_digest(typ, ECT_TYPE) and not hmac.compare_digest(typ, ECT_TYPE_LEGACY):
        raise ValueError("ect: invalid typ parameter")

    # Early, explicit rejection of unsigned / symmetric algorithms; the
    # decode call below additionally restricts verification to ES256.
    alg = header.get("alg")
    if alg in ("none", "HS256", "HS384", "HS512"):
        raise ValueError("ect: prohibited algorithm")

    kid = header.get("kid")
    if not kid:
        raise ValueError("ect: missing kid")
    if not opts.resolve_key:
        raise ValueError("ect: ResolveKey required")
    pub = opts.resolve_key(kid)
    if pub is None:
        raise ValueError("ect: unknown key identifier")

    # Only the signature is delegated to the JWT library here; exp, aud and
    # iat are evaluated further down against opts.
    try:
        claims = jwt.decode(
            compact,
            pub,
            algorithms=["ES256"],
            options={"verify_exp": False, "verify_aud": False, "verify_iat": False},
        )
    except jwt.InvalidSignatureError as e:
        raise ValueError(f"ect: invalid signature: {e}") from e
    except Exception as e:
        raise ValueError(f"ect: verify failed: {e}") from e

    payload = Payload.from_claims(claims)
    set_log_jti(payload.jti)
    validate_ext(payload.ext)

    # Normalise a missing pred to [] before anything measures it, so the
    # length check below cannot fail with a TypeError on None.
    if payload.pred is None:
        payload.pred = []
    if opts.max_pred_length > 0 and len(payload.pred) > opts.max_pred_length:
        raise ValueError("ect: pred exceeds max length")

    if opts.validate_uuids:
        if not valid_uuid(payload.jti):
            raise ValueError("ect: jti must be UUID format")
        if payload.wid and not valid_uuid(payload.wid):
            raise ValueError("ect: wid must be UUID format when set")
    if payload.inp_hash:
        validate_hash_format(payload.inp_hash)
    if payload.out_hash:
        validate_hash_format(payload.out_hash)

    # Issuer / audience binding, each enforced only when configured.
    if opts.wit_subject and payload.iss != opts.wit_subject:
        raise ValueError("ect: issuer does not match WIT subject")
    if opts.verifier_id and not payload.contains_audience(opts.verifier_id):
        raise ValueError("ect: audience does not include verifier")

    # Time window, evaluated against opts.now when the caller pinned a clock.
    now = opts.now if opts.now is not None else int(time.time())
    if now > payload.exp:
        raise ValueError("ect: token expired")
    if now - payload.iat > opts.iat_max_age_sec:
        raise ValueError("ect: iat too far in the past")
    if payload.iat > now + opts.iat_max_future_sec:
        raise ValueError("ect: iat in the future")

    # Required claims per spec: jti, exec_act, pred. A missing pred has
    # already been normalised to [] above, so only the other two can fail.
    if not payload.jti or not payload.exec_act:
        raise ValueError("ect: missing required claims (jti, exec_act, pred)")

    if opts.store is not None and opts.dag is not None:
        validate_dag(payload, opts.store, opts.dag)

    # Replay detection runs last, after every other check has passed.
    if opts.jti_seen is not None and opts.jti_seen(payload.jti):
        raise ValueError("ect: jti already seen (replay)")

    return ParsedECT(header=header, payload=payload, raw=compact)