"""ECT creation: build and sign JWT with ES256.""" from __future__ import annotations import copy import time from dataclasses import dataclass from typing import Optional import jwt from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePrivateKey from ect.types import Payload, valid_pol_decision from ect.validate import ( DEFAULT_MAX_PAR_LENGTH, validate_ext, validate_hash_format, valid_uuid, ) @dataclass class CreateOptions: key_id: str iat_max_age_sec: int = 900 # 15 min default_expiry_sec: int = 600 # 10 min validate_uuids: bool = False max_par_length: int = 0 # 0 = no limit; use DEFAULT_MAX_PAR_LENGTH for 100 def default_create_options() -> CreateOptions: return CreateOptions(key_id="") def _validate_payload(p: Payload, opts: CreateOptions) -> None: if not p.iss: raise ValueError("ect: iss required") if not p.aud: raise ValueError("ect: aud required") if not p.jti: raise ValueError("ect: jti required") if not p.exec_act: raise ValueError("ect: exec_act required") if opts.validate_uuids: if not valid_uuid(p.jti): raise ValueError("ect: jti must be UUID format") if p.wid and not valid_uuid(p.wid): raise ValueError("ect: wid must be UUID format when set") max_par = opts.max_par_length or 0 if max_par > 0 and len(p.par) > max_par: raise ValueError("ect: par exceeds max length") if p.inp_hash: validate_hash_format(p.inp_hash) if p.out_hash: validate_hash_format(p.out_hash) validate_ext(p.ext) # pol/pol_decision OPTIONAL; if either set, both must be present and valid if p.pol or p.pol_decision: if not p.pol or not p.pol_decision: raise ValueError("ect: pol and pol_decision must both be present when either is set") if not valid_pol_decision(p.pol_decision): raise ValueError( "ect: pol_decision must be approved, rejected, or pending_human_review" ) # compensation in ext per spec if p.ext and p.ext.get("compensation_reason") and not p.ext.get("compensation_required"): raise ValueError("ect: ext.compensation_reason requires ext.compensation_required true") def create( payload: Payload, private_key: EllipticCurvePrivateKey, opts: CreateOptions, ) -> str: """Build and sign an ECT. Payload must have required claims; iat/exp can be 0 for defaults. create() may modify the payload in place (iat, exp, sub, par) when filling defaults; pass a copy if the original must stay unchanged. """ if not opts.key_id: raise ValueError("ect: KeyID required") # Work on a copy so we do not mutate the caller's payload. payload = copy.deepcopy(payload) now = int(time.time()) if payload.iat == 0: payload.iat = now if payload.exp == 0: payload.exp = now + (opts.default_expiry_sec or 600) if not payload.sub: payload.sub = payload.iss if payload.par is None: payload.par = [] _validate_payload(payload, opts) claims = payload.to_claims() headers = { "typ": "wimse-exec+jwt", "alg": "ES256", "kid": opts.key_id, } return jwt.encode( claims, private_key, algorithm="ES256", headers=headers, ) def generate_key() -> EllipticCurvePrivateKey: """Create an ECDSA P-256 key for ES256 (testing/demo).""" from cryptography.hazmat.primitives.asymmetric import ec return ec.generate_private_key(ec.SECP256R1())