Move Go reference implementation to refimpl/go-lang/ and add new Python reference implementation in refimpl/python/. Update build.sh with renamed draft and simplified tool paths. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
161 lines
5.7 KiB
Python
161 lines
5.7 KiB
Python
"""ECT verification per Section 7."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import hmac
|
|
import time
|
|
from dataclasses import dataclass
|
|
from typing import Callable, Optional
|
|
|
|
import jwt
|
|
from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePublicKey
|
|
|
|
from ect.types import ECT_TYPE, Payload, valid_pol_decision
|
|
from ect.dag import ECTStore, DAGConfig, validate_dag
|
|
from ect.validate import validate_ext, validate_hash_format, valid_uuid
|
|
|
|
|
|
@dataclass
|
|
class ParsedECT:
|
|
header: dict
|
|
payload: Payload
|
|
raw: str
|
|
|
|
|
|
KeyResolver = Callable[[str], Optional[EllipticCurvePublicKey]]
|
|
|
|
|
|
@dataclass
|
|
class VerifyOptions:
|
|
verifier_id: str = ""
|
|
resolve_key: Optional[KeyResolver] = None
|
|
store: Optional[ECTStore] = None
|
|
dag: Optional[DAGConfig] = None
|
|
now: Optional[int] = None # unix seconds; None = time.time()
|
|
iat_max_age_sec: int = 900
|
|
iat_max_future_sec: int = 30
|
|
jti_seen: Optional[Callable[[str], bool]] = None
|
|
wit_subject: str = ""
|
|
validate_uuids: bool = False
|
|
max_par_length: int = 0 # 0 = no limit
|
|
on_verify_attempt: Optional[Callable[[str, Optional[Exception]], None]] = None # (jti, err) for observability
|
|
|
|
|
|
def default_verify_options() -> VerifyOptions:
|
|
from ect.dag import default_dag_config
|
|
return VerifyOptions(dag=default_dag_config())
|
|
|
|
|
|
def parse(compact: str) -> ParsedECT:
|
|
"""Parse compact JWS and return header + payload without verification."""
|
|
try:
|
|
unverified = jwt.decode(
|
|
compact,
|
|
options={"verify_signature": False, "verify_exp": False},
|
|
)
|
|
except Exception as e:
|
|
raise ValueError(f"ect: parse failed: {e}") from e
|
|
header = jwt.get_unverified_header(compact)
|
|
if header.get("alg") != "ES256":
|
|
raise ValueError("ect: expected ES256")
|
|
payload = Payload.from_claims(unverified)
|
|
return ParsedECT(header=header, payload=payload, raw=compact)
|
|
|
|
|
|
def verify(compact: str, opts: VerifyOptions) -> ParsedECT:
|
|
"""Full Section 7 verification and optional DAG validation."""
|
|
log_jti: list[str] = [""] # use list so callback sees updated jti
|
|
|
|
def set_log_jti(jti: str) -> None:
|
|
log_jti[0] = jti
|
|
|
|
err: Optional[Exception] = None
|
|
try:
|
|
return _verify_impl(compact, opts, set_log_jti)
|
|
except Exception as e:
|
|
err = e
|
|
raise
|
|
finally:
|
|
if opts.on_verify_attempt is not None:
|
|
opts.on_verify_attempt(log_jti[0], err)
|
|
|
|
|
|
def _verify_impl(compact: str, opts: VerifyOptions, set_log_jti: Callable[[str], None]) -> ParsedECT:
|
|
header = jwt.get_unverified_header(compact)
|
|
typ = header.get("typ") or ""
|
|
# Constant-time comparison for typ
|
|
if not hmac.compare_digest(typ, ECT_TYPE):
|
|
raise ValueError("ect: invalid typ parameter")
|
|
alg = header.get("alg")
|
|
if alg in ("none", "HS256", "HS384", "HS512"):
|
|
raise ValueError("ect: prohibited algorithm")
|
|
kid = header.get("kid")
|
|
if not kid:
|
|
raise ValueError("ect: missing kid")
|
|
if not opts.resolve_key:
|
|
raise ValueError("ect: ResolveKey required")
|
|
pub = opts.resolve_key(kid)
|
|
if pub is None:
|
|
raise ValueError("ect: unknown key identifier")
|
|
|
|
try:
|
|
claims = jwt.decode(
|
|
compact,
|
|
pub,
|
|
algorithms=["ES256"],
|
|
options={"verify_exp": False, "verify_aud": False, "verify_iat": False},
|
|
)
|
|
except jwt.InvalidSignatureError as e:
|
|
raise ValueError(f"ect: invalid signature: {e}") from e
|
|
except Exception as e:
|
|
raise ValueError(f"ect: verify failed: {e}") from e
|
|
|
|
payload = Payload.from_claims(claims)
|
|
set_log_jti(payload.jti)
|
|
|
|
validate_ext(payload.ext)
|
|
if opts.max_par_length > 0 and len(payload.par) > opts.max_par_length:
|
|
raise ValueError("ect: par exceeds max length")
|
|
if opts.validate_uuids:
|
|
if not valid_uuid(payload.jti):
|
|
raise ValueError("ect: jti must be UUID format")
|
|
if payload.wid and not valid_uuid(payload.wid):
|
|
raise ValueError("ect: wid must be UUID format when set")
|
|
if payload.inp_hash:
|
|
validate_hash_format(payload.inp_hash)
|
|
if payload.out_hash:
|
|
validate_hash_format(payload.out_hash)
|
|
|
|
if opts.wit_subject and payload.iss != opts.wit_subject:
|
|
raise ValueError("ect: issuer does not match WIT subject")
|
|
if opts.verifier_id and not payload.contains_audience(opts.verifier_id):
|
|
raise ValueError("ect: audience does not include verifier")
|
|
|
|
now = opts.now if opts.now is not None else int(time.time())
|
|
if now > payload.exp:
|
|
raise ValueError("ect: token expired")
|
|
if now - payload.iat > opts.iat_max_age_sec:
|
|
raise ValueError("ect: iat too far in the past")
|
|
if payload.iat > now + opts.iat_max_future_sec:
|
|
raise ValueError("ect: iat in the future")
|
|
|
|
# Required claims per spec: jti, exec_act, par. par may be set to [] when missing (from_claims already uses []).
|
|
if not payload.jti or not payload.exec_act:
|
|
raise ValueError("ect: missing required claims (jti, exec_act, par)")
|
|
if payload.par is None:
|
|
payload.par = []
|
|
# If pol or pol_decision present, both must be present and valid
|
|
if payload.pol or payload.pol_decision:
|
|
if not payload.pol or not payload.pol_decision:
|
|
raise ValueError("ect: pol and pol_decision must both be present when either is set")
|
|
if not valid_pol_decision(payload.pol_decision):
|
|
raise ValueError("ect: invalid pol_decision value")
|
|
|
|
if opts.store is not None and opts.dag is not None:
|
|
validate_dag(payload, opts.store, opts.dag)
|
|
|
|
if opts.jti_seen is not None and opts.jti_seen(payload.jti):
|
|
raise ValueError("ect: jti already seen (replay)")
|
|
|
|
return ParsedECT(header=header, payload=payload, raw=compact)
|