"""ECT creation: build and sign JWT with ES256.""" from __future__ import annotations import copy import time from dataclasses import dataclass from typing import Optional import jwt from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePrivateKey from ect.types import ECT_TYPE, Payload from ect.validate import ( DEFAULT_MAX_PRED_LENGTH, validate_ext, validate_hash_format, valid_uuid, ) @dataclass class CreateOptions: key_id: str iat_max_age_sec: int = 900 # 15 min default_expiry_sec: int = 600 # 10 min validate_uuids: bool = False max_pred_length: int = 0 # 0 = no limit; use DEFAULT_MAX_PRED_LENGTH for 100 def default_create_options() -> CreateOptions: return CreateOptions(key_id="") def _validate_payload(p: Payload, opts: CreateOptions) -> None: if not p.iss: raise ValueError("ect: iss required") if not p.aud: raise ValueError("ect: aud required") if not p.jti: raise ValueError("ect: jti required") if not p.exec_act: raise ValueError("ect: exec_act required") if opts.validate_uuids: if not valid_uuid(p.jti): raise ValueError("ect: jti must be UUID format") if p.wid and not valid_uuid(p.wid): raise ValueError("ect: wid must be UUID format when set") max_pred = opts.max_pred_length or 0 if max_pred > 0 and len(p.pred) > max_pred: raise ValueError("ect: pred exceeds max length") if p.inp_hash: validate_hash_format(p.inp_hash) if p.out_hash: validate_hash_format(p.out_hash) validate_ext(p.ext) # compensation in ext per spec if p.ext and p.ext.get("compensation_reason") and not p.ext.get("compensation_required"): raise ValueError("ect: ext.compensation_reason requires ext.compensation_required true") def create( payload: Payload, private_key: EllipticCurvePrivateKey, opts: CreateOptions, ) -> str: """Build and sign an ECT. Payload must have required claims; iat/exp can be 0 for defaults. create() works on a deep copy so the caller's payload is not modified. """ if not opts.key_id: raise ValueError("ect: KeyID required") # Work on a copy so we do not mutate the caller's payload. payload = copy.deepcopy(payload) now = int(time.time()) if payload.iat == 0: payload.iat = now if payload.exp == 0: payload.exp = now + (opts.default_expiry_sec or 600) if payload.pred is None: payload.pred = [] _validate_payload(payload, opts) claims = payload.to_claims() headers = { "typ": ECT_TYPE, "alg": "ES256", "kid": opts.key_id, } return jwt.encode( claims, private_key, algorithm="ES256", headers=headers, ) def generate_key() -> EllipticCurvePrivateKey: """Create an ECDSA P-256 key for ES256 (testing/demo).""" from cryptography.hazmat.primitives.asymmetric import ec return ec.generate_private_key(ec.SECP256R1())