Restructure refimpl into go-lang and python subdirectories
Move Go reference implementation to refimpl/go-lang/ and add new Python reference implementation in refimpl/python/. Update build.sh with renamed draft and simplified tool paths. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
197
refimpl/python/tests/test_verify.py
Normal file
197
refimpl/python/tests/test_verify.py
Normal file
@@ -0,0 +1,197 @@
|
||||
"""Tests for verify module."""
|
||||
|
||||
import time
|
||||
|
||||
import pytest
|
||||
|
||||
from ect import (
|
||||
Payload,
|
||||
create,
|
||||
generate_key,
|
||||
CreateOptions,
|
||||
parse,
|
||||
verify,
|
||||
VerifyOptions,
|
||||
default_verify_options,
|
||||
POL_DECISION_APPROVED,
|
||||
)
|
||||
|
||||
|
||||
def test_parse():
|
||||
key = generate_key()
|
||||
now = int(time.time())
|
||||
p = Payload(
|
||||
iss="iss", aud=["a"], iat=now, exp=now + 3600,
|
||||
jti="jti-parse", exec_act="act", par=[], pol="p", pol_decision=POL_DECISION_APPROVED,
|
||||
)
|
||||
compact = create(p, key, CreateOptions(key_id="kid"))
|
||||
parsed = parse(compact)
|
||||
assert parsed.payload.jti == "jti-parse"
|
||||
assert parsed.raw == compact
|
||||
|
||||
|
||||
def test_default_verify_options():
|
||||
opts = default_verify_options()
|
||||
assert opts.dag is not None
|
||||
assert opts.iat_max_age_sec == 900
|
||||
|
||||
|
||||
def test_verify_expired():
|
||||
key = generate_key()
|
||||
now = int(time.time())
|
||||
p = Payload(
|
||||
iss="iss", aud=["v"], iat=now - 3600, exp=now - 60,
|
||||
jti="jti-exp", exec_act="act", par=[], pol="p", pol_decision=POL_DECISION_APPROVED,
|
||||
)
|
||||
compact = create(p, key, CreateOptions(key_id="kid"))
|
||||
resolver = lambda kid: key.public_key() if kid == "kid" else None
|
||||
with pytest.raises(ValueError, match="expired"):
|
||||
verify(compact, VerifyOptions(verifier_id="v", resolve_key=resolver, now=now))
|
||||
|
||||
|
||||
def test_verify_replay():
|
||||
key = generate_key()
|
||||
now = int(time.time())
|
||||
p = Payload(
|
||||
iss="iss", aud=["v"], iat=now, exp=now + 3600,
|
||||
jti="jti-replay", exec_act="act", par=[], pol="p", pol_decision=POL_DECISION_APPROVED,
|
||||
)
|
||||
compact = create(p, key, CreateOptions(key_id="kid"))
|
||||
resolver = lambda kid: key.public_key() if kid == "kid" else None
|
||||
with pytest.raises(ValueError, match="replay"):
|
||||
verify(compact, VerifyOptions(
|
||||
verifier_id="v", resolve_key=resolver, now=now,
|
||||
jti_seen=lambda j: j == "jti-replay",
|
||||
))
|
||||
|
||||
|
||||
def test_verify_invalid_typ():
|
||||
import jwt as jwt_lib
|
||||
with pytest.raises((ValueError, jwt_lib.exceptions.DecodeError)):
|
||||
verify("not-a-jws", VerifyOptions())
|
||||
|
||||
|
||||
def test_verify_audience_mismatch():
|
||||
key = generate_key()
|
||||
now = int(time.time())
|
||||
p = Payload(
|
||||
iss="iss", aud=["other"], iat=now, exp=now + 3600,
|
||||
jti="jti-a", exec_act="act", par=[], pol="p", pol_decision=POL_DECISION_APPROVED,
|
||||
)
|
||||
compact = create(p, key, CreateOptions(key_id="kid"))
|
||||
resolver = lambda kid: key.public_key() if kid == "kid" else None
|
||||
with pytest.raises(ValueError, match="audience"):
|
||||
verify(compact, VerifyOptions(verifier_id="verifier", resolve_key=resolver, now=now))
|
||||
|
||||
|
||||
def test_verify_wit_subject_mismatch():
|
||||
key = generate_key()
|
||||
now = int(time.time())
|
||||
p = Payload(
|
||||
iss="wrong-iss", aud=["v"], iat=now, exp=now + 3600,
|
||||
jti="jti-w", exec_act="act", par=[], pol="p", pol_decision=POL_DECISION_APPROVED,
|
||||
)
|
||||
compact = create(p, key, CreateOptions(key_id="kid"))
|
||||
resolver = lambda kid: key.public_key() if kid == "kid" else None
|
||||
with pytest.raises(ValueError, match="WIT subject"):
|
||||
verify(compact, VerifyOptions(
|
||||
verifier_id="v", resolve_key=resolver, now=now, wit_subject="correct-iss",
|
||||
))
|
||||
|
||||
|
||||
def test_verify_iat_too_old():
|
||||
key = generate_key()
|
||||
now = int(time.time())
|
||||
p = Payload(
|
||||
iss="iss", aud=["v"], iat=now - 2000, exp=now + 3600,
|
||||
jti="jti-old", exec_act="act", par=[], pol="p", pol_decision=POL_DECISION_APPROVED,
|
||||
)
|
||||
compact = create(p, key, CreateOptions(key_id="kid"))
|
||||
resolver = lambda kid: key.public_key() if kid == "kid" else None
|
||||
with pytest.raises(ValueError, match="iat"):
|
||||
verify(compact, VerifyOptions(
|
||||
verifier_id="v", resolve_key=resolver, now=now, iat_max_age_sec=900,
|
||||
))
|
||||
|
||||
|
||||
def test_verify_unknown_key():
|
||||
key = generate_key()
|
||||
now = int(time.time())
|
||||
p = Payload(
|
||||
iss="iss", aud=["v"], iat=now, exp=now + 3600,
|
||||
jti="jti-k", exec_act="act", par=[], pol="p", pol_decision=POL_DECISION_APPROVED,
|
||||
)
|
||||
compact = create(p, key, CreateOptions(key_id="kid"))
|
||||
resolver = lambda kid: None # unknown key
|
||||
with pytest.raises(ValueError, match="unknown key"):
|
||||
verify(compact, VerifyOptions(verifier_id="v", resolve_key=resolver, now=now))
|
||||
|
||||
|
||||
def test_verify_resolve_key_required():
|
||||
key = generate_key()
|
||||
now = int(time.time())
|
||||
p = Payload(
|
||||
iss="iss", aud=["v"], iat=now, exp=now + 3600,
|
||||
jti="jti-r", exec_act="act", par=[], pol="p", pol_decision=POL_DECISION_APPROVED,
|
||||
)
|
||||
compact = create(p, key, CreateOptions(key_id="kid"))
|
||||
with pytest.raises(ValueError, match="ResolveKey"):
|
||||
verify(compact, VerifyOptions(verifier_id="v", resolve_key=None))
|
||||
|
||||
|
||||
def test_verify_with_dag():
|
||||
from ect import MemoryLedger
|
||||
key = generate_key()
|
||||
ledger = MemoryLedger()
|
||||
now = int(time.time())
|
||||
root = Payload(
|
||||
iss="iss", aud=["v"], iat=now, exp=now + 3600,
|
||||
jti="jti-root", exec_act="act", par=[], pol="p", pol_decision=POL_DECISION_APPROVED,
|
||||
)
|
||||
compact_root = create(root, key, CreateOptions(key_id="kid"))
|
||||
resolver = lambda kid: key.public_key() if kid == "kid" else None
|
||||
opts = VerifyOptions(verifier_id="v", resolve_key=resolver, store=ledger, now=now)
|
||||
parsed = verify(compact_root, opts)
|
||||
ledger.append(compact_root, parsed.payload)
|
||||
child = Payload(
|
||||
iss="iss", aud=["v"], iat=now + 1, exp=now + 3600,
|
||||
jti="jti-child", exec_act="act2", par=["jti-root"], pol="p", pol_decision=POL_DECISION_APPROVED,
|
||||
)
|
||||
compact_child = create(child, key, CreateOptions(key_id="kid"))
|
||||
parsed2 = verify(compact_child, opts)
|
||||
assert parsed2.payload.jti == "jti-child"
|
||||
|
||||
|
||||
def test_on_verify_attempt_callback():
|
||||
"""Observability: on_verify_attempt is called with jti and error (or None)."""
|
||||
key = generate_key()
|
||||
now = int(time.time())
|
||||
p = Payload(iss="i", aud=["v"], iat=now, exp=now + 3600, jti="jti-obs", exec_act="a", par=[])
|
||||
compact = create(p, key, CreateOptions(key_id="kid"))
|
||||
resolver = lambda k: key.public_key() if k == "kid" else None
|
||||
seen = []
|
||||
def hook(jti, err):
|
||||
seen.append((jti, err))
|
||||
opts = VerifyOptions(verifier_id="v", resolve_key=resolver, on_verify_attempt=hook)
|
||||
result = verify(compact, opts)
|
||||
assert result.payload.jti == "jti-obs"
|
||||
assert len(seen) == 1
|
||||
assert seen[0][0] == "jti-obs"
|
||||
assert seen[0][1] is None
|
||||
|
||||
|
||||
def test_on_verify_attempt_called_on_failure():
|
||||
key = generate_key()
|
||||
now = int(time.time())
|
||||
p = Payload(iss="i", aud=["v"], iat=now, exp=now - 1, jti="jti-fail", exec_act="a", par=[])
|
||||
compact = create(p, key, CreateOptions(key_id="kid"))
|
||||
resolver = lambda k: key.public_key() if k == "kid" else None
|
||||
seen = []
|
||||
opts = VerifyOptions(verifier_id="v", resolve_key=resolver, now=now, on_verify_attempt=lambda jti, err: seen.append((jti, err)))
|
||||
with pytest.raises(ValueError, match="expired"):
|
||||
verify(compact, opts)
|
||||
assert len(seen) == 1
|
||||
assert seen[0][0] == "jti-fail"
|
||||
assert seen[0][1] is not None
|
||||
|
||||
|
||||
Reference in New Issue
Block a user