"""Tests for act.verify module.""" import time import uuid import pytest from act.crypto import ( ACTKeyResolver, KeyRegistry, generate_ed25519_keypair, sign, ) from act.errors import ( ACTAudienceMismatchError, ACTCapabilityError, ACTExpiredError, ACTPhaseError, ACTSignatureError, ACTValidationError, ) from act.ledger import ACTLedger from act.lifecycle import transition_to_record from act.token import ( ACTMandate, ACTRecord, Capability, Delegation, TaskClaim, encode_jws, ) from act.verify import ACTVerifier @pytest.fixture def setup(): iss_priv, iss_pub = generate_ed25519_keypair() sub_priv, sub_pub = generate_ed25519_keypair() registry = KeyRegistry() registry.register("iss-key", iss_pub) registry.register("sub-key", sub_pub) resolver = ACTKeyResolver(registry=registry) base_time = 1772064000 return { "iss_priv": iss_priv, "iss_pub": iss_pub, "sub_priv": sub_priv, "sub_pub": sub_pub, "registry": registry, "resolver": resolver, "base_time": base_time, } def make_mandate(setup, **overrides): bt = setup["base_time"] defaults = dict( alg="EdDSA", kid="iss-key", iss="agent-issuer", sub="agent-subject", aud="agent-subject", iat=bt, exp=bt + 900, jti=str(uuid.uuid4()), task=TaskClaim(purpose="test"), cap=[Capability(action="read.data")], ) defaults.update(overrides) return ACTMandate(**defaults) def sign_mandate(mandate, priv_key): sig = sign(priv_key, mandate.signing_input()) return encode_jws(mandate, sig) class TestVerifyMandate: def test_valid_mandate(self, setup): verifier = ACTVerifier( setup["resolver"], verifier_id="agent-subject", trusted_issuers={"agent-issuer"}, ) mandate = make_mandate(setup) compact = sign_mandate(mandate, setup["iss_priv"]) result = verifier.verify_mandate(compact, now=setup["base_time"] + 100) assert result.iss == "agent-issuer" def test_expired(self, setup): verifier = ACTVerifier(setup["resolver"], verifier_id="agent-subject") mandate = make_mandate(setup) compact = sign_mandate(mandate, setup["iss_priv"]) with pytest.raises(ACTExpiredError): verifier.verify_mandate(compact, now=setup["base_time"] + 2000) def test_wrong_audience(self, setup): verifier = ACTVerifier( setup["resolver"], verifier_id="other-agent", trusted_issuers={"agent-issuer"}, ) mandate = make_mandate(setup) compact = sign_mandate(mandate, setup["iss_priv"]) with pytest.raises(ACTAudienceMismatchError): verifier.verify_mandate( compact, now=setup["base_time"] + 100, check_sub=False, ) def test_untrusted_issuer(self, setup): verifier = ACTVerifier( setup["resolver"], verifier_id="agent-subject", trusted_issuers={"trusted-only"}, ) mandate = make_mandate(setup) compact = sign_mandate(mandate, setup["iss_priv"]) with pytest.raises(ACTValidationError, match="not trusted"): verifier.verify_mandate(compact, now=setup["base_time"] + 100) def test_signature_failure(self, setup): verifier = ACTVerifier(setup["resolver"], verifier_id="agent-subject") mandate = make_mandate(setup) compact = sign_mandate(mandate, setup["iss_priv"]) # Tamper with signature parts = compact.split(".") parts[2] = parts[2][:-4] + "XXXX" tampered = ".".join(parts) with pytest.raises(ACTSignatureError): verifier.verify_mandate(tampered, now=setup["base_time"] + 100) def test_phase2_as_mandate(self, setup): verifier = ACTVerifier(setup["resolver"]) mandate = make_mandate(setup) record, compact = transition_to_record( mandate, sub_kid="sub-key", sub_private_key=setup["sub_priv"], exec_act="read.data", par=[], status="completed", exec_ts=setup["base_time"] + 100, ) with pytest.raises(ACTPhaseError): verifier.verify_mandate(compact, now=setup["base_time"] + 100) def test_future_iat(self, setup): verifier = ACTVerifier(setup["resolver"], verifier_id="agent-subject") bt = setup["base_time"] mandate = make_mandate(setup, iat=bt + 1000, exp=bt + 2000) compact = sign_mandate(mandate, setup["iss_priv"]) with pytest.raises(ACTValidationError, match="future"): verifier.verify_mandate(compact, now=bt) class TestVerifyRecord: def test_valid_record(self, setup): verifier = ACTVerifier( setup["resolver"], verifier_id="agent-subject", trusted_issuers={"agent-issuer"}, ) mandate = make_mandate(setup) record, compact = transition_to_record( mandate, sub_kid="sub-key", sub_private_key=setup["sub_priv"], exec_act="read.data", par=[], exec_ts=setup["base_time"] + 100, status="completed", ) result = verifier.verify_record( compact, now=setup["base_time"] + 200, check_aud=False, ) assert result.exec_act == "read.data" def test_wrong_signer(self, setup): verifier = ACTVerifier(setup["resolver"]) mandate = make_mandate(setup) record = ACTRecord.from_mandate( mandate, kid="sub-key", exec_act="read.data", par=[], exec_ts=setup["base_time"] + 100, status="completed", ) # Sign with iss key instead of sub key sig = sign(setup["iss_priv"], record.signing_input()) compact = encode_jws(record, sig) with pytest.raises(ACTSignatureError): verifier.verify_record(compact, now=setup["base_time"] + 200) def test_with_dag_validation(self, setup): verifier = ACTVerifier( setup["resolver"], verifier_id="agent-subject", trusted_issuers={"agent-issuer"}, ) ledger = ACTLedger() mandate = make_mandate(setup) record, compact = transition_to_record( mandate, sub_kid="sub-key", sub_private_key=setup["sub_priv"], exec_act="read.data", par=[], exec_ts=setup["base_time"] + 100, status="completed", ) result = verifier.verify_record( compact, store=ledger, now=setup["base_time"] + 200, check_aud=False, ) assert result.status == "completed"