- Rename `par` to `pred` (predecessor) in types, serialization, tests - Remove `pol`, `pol_decision` from core payload; move to `ect_ext` - Remove `sub` from payload (not part of ECT spec) - Update `typ` from `wimse-exec+jwt` to `exec+jwt` (accept both) - Rename MaxParLength to MaxPredLength everywhere - Update testdata, demos, READMEs with migration table - All Go tests pass, all 56 Python tests pass (90% coverage)
105 lines
3.0 KiB
Python
105 lines
3.0 KiB
Python
"""ECT creation: build and sign JWT with ES256."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import copy
|
|
import time
|
|
from dataclasses import dataclass
|
|
from typing import Optional
|
|
|
|
import jwt
|
|
from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePrivateKey
|
|
|
|
from ect.types import ECT_TYPE, Payload
|
|
from ect.validate import (
|
|
DEFAULT_MAX_PRED_LENGTH,
|
|
validate_ext,
|
|
validate_hash_format,
|
|
valid_uuid,
|
|
)
|
|
|
|
|
|
@dataclass
|
|
class CreateOptions:
|
|
key_id: str
|
|
iat_max_age_sec: int = 900 # 15 min
|
|
default_expiry_sec: int = 600 # 10 min
|
|
validate_uuids: bool = False
|
|
max_pred_length: int = 0 # 0 = no limit; use DEFAULT_MAX_PRED_LENGTH for 100
|
|
|
|
|
|
def default_create_options() -> CreateOptions:
|
|
return CreateOptions(key_id="")
|
|
|
|
|
|
def _validate_payload(p: Payload, opts: CreateOptions) -> None:
|
|
if not p.iss:
|
|
raise ValueError("ect: iss required")
|
|
if not p.aud:
|
|
raise ValueError("ect: aud required")
|
|
if not p.jti:
|
|
raise ValueError("ect: jti required")
|
|
if not p.exec_act:
|
|
raise ValueError("ect: exec_act required")
|
|
if opts.validate_uuids:
|
|
if not valid_uuid(p.jti):
|
|
raise ValueError("ect: jti must be UUID format")
|
|
if p.wid and not valid_uuid(p.wid):
|
|
raise ValueError("ect: wid must be UUID format when set")
|
|
max_pred = opts.max_pred_length or 0
|
|
if max_pred > 0 and len(p.pred) > max_pred:
|
|
raise ValueError("ect: pred exceeds max length")
|
|
if p.inp_hash:
|
|
validate_hash_format(p.inp_hash)
|
|
if p.out_hash:
|
|
validate_hash_format(p.out_hash)
|
|
validate_ext(p.ext)
|
|
# compensation in ext per spec
|
|
if p.ext and p.ext.get("compensation_reason") and not p.ext.get("compensation_required"):
|
|
raise ValueError("ect: ext.compensation_reason requires ext.compensation_required true")
|
|
|
|
|
|
def create(
|
|
payload: Payload,
|
|
private_key: EllipticCurvePrivateKey,
|
|
opts: CreateOptions,
|
|
) -> str:
|
|
"""Build and sign an ECT. Payload must have required claims; iat/exp can be 0 for defaults.
|
|
create() works on a deep copy so the caller's payload is not modified.
|
|
"""
|
|
if not opts.key_id:
|
|
raise ValueError("ect: KeyID required")
|
|
|
|
# Work on a copy so we do not mutate the caller's payload.
|
|
payload = copy.deepcopy(payload)
|
|
|
|
now = int(time.time())
|
|
if payload.iat == 0:
|
|
payload.iat = now
|
|
if payload.exp == 0:
|
|
payload.exp = now + (opts.default_expiry_sec or 600)
|
|
if payload.pred is None:
|
|
payload.pred = []
|
|
|
|
_validate_payload(payload, opts)
|
|
|
|
claims = payload.to_claims()
|
|
headers = {
|
|
"typ": ECT_TYPE,
|
|
"alg": "ES256",
|
|
"kid": opts.key_id,
|
|
}
|
|
return jwt.encode(
|
|
claims,
|
|
private_key,
|
|
algorithm="ES256",
|
|
headers=headers,
|
|
)
|
|
|
|
|
|
def generate_key() -> EllipticCurvePrivateKey:
|
|
"""Create an ECDSA P-256 key for ES256 (testing/demo)."""
|
|
from cryptography.hazmat.primitives.asymmetric import ec
|
|
|
|
return ec.generate_private_key(ec.SECP256R1())
|