"""Tests for act.delegation module.""" import time import uuid import pytest from act.crypto import generate_ed25519_keypair, sign, verify, compute_sha256 from act.delegation import ( create_delegated_mandate, verify_capability_subset, verify_delegation_chain, ) from act.errors import ( ACTDelegationError, ACTPrivilegeEscalationError, ) from act.token import ( ACTMandate, Capability, Delegation, DelegationEntry, TaskClaim, _b64url_decode, encode_jws, ) @pytest.fixture def parent_setup(): iss_priv, iss_pub = generate_ed25519_keypair() mandate = ACTMandate( alg="EdDSA", kid="iss-key", iss="agent-a", sub="agent-b", aud="agent-b", iat=1772064000, exp=1772064900, jti="parent-jti-1", task=TaskClaim(purpose="parent_task"), cap=[ Capability(action="read.data", constraints={"max_records": 10}), Capability(action="write.result"), ], delegation=Delegation(depth=0, max_depth=3, chain=[]), ) sig = sign(iss_priv, mandate.signing_input()) compact = encode_jws(mandate, sig) return mandate, compact, iss_priv, iss_pub class TestCreateDelegatedMandate: def test_basic_delegation(self, parent_setup): mandate, compact, priv, _ = parent_setup delegated, _ = create_delegated_mandate( parent_mandate=mandate, parent_compact=compact, delegator_private_key=priv, sub="agent-c", kid="key-b", iss="agent-a", aud="agent-c", iat=1772064010, exp=1772064600, jti="child-jti-1", cap=[Capability(action="read.data", constraints={"max_records": 5})], task=TaskClaim(purpose="child_task"), ) assert delegated.delegation.depth == 1 assert len(delegated.delegation.chain) == 1 assert delegated.delegation.chain[0].delegator == "agent-a" def test_depth_exceeded(self, parent_setup): mandate, compact, priv, _ = parent_setup # Set parent to max depth mandate.delegation = Delegation(depth=3, max_depth=3, chain=[ DelegationEntry(delegator="x", jti="j", sig="s") for _ in range(3) ]) with pytest.raises(ACTDelegationError, match="exceeds max_depth"): create_delegated_mandate( parent_mandate=mandate, parent_compact=compact, delegator_private_key=priv, sub="c", kid="k", iss="a", aud="c", iat=1, exp=2, jti="j", cap=[Capability(action="read.data", constraints={"max_records": 5})], task=TaskClaim(purpose="t"), ) def test_no_del_claim(self): priv, _ = generate_ed25519_keypair() mandate = ACTMandate( alg="EdDSA", kid="k", iss="a", sub="b", aud="b", iat=1, exp=2, task=TaskClaim(purpose="t"), cap=[Capability(action="x.y")], delegation=None, # no del claim ) with pytest.raises(ACTDelegationError, match="not permitted"): create_delegated_mandate( parent_mandate=mandate, parent_compact="compact", delegator_private_key=priv, sub="c", kid="k", iss="a", aud="c", iat=1, exp=2, jti="j", cap=[Capability(action="x.y")], task=TaskClaim(purpose="t"), ) def test_max_depth_reduction(self, parent_setup): mandate, compact, priv, _ = parent_setup delegated, _ = create_delegated_mandate( parent_mandate=mandate, parent_compact=compact, delegator_private_key=priv, sub="c", kid="k", iss="a", aud="c", iat=1, exp=2, jti="j", cap=[Capability(action="read.data", constraints={"max_records": 5})], task=TaskClaim(purpose="t"), max_depth=2, ) assert delegated.delegation.max_depth == 2 def test_max_depth_escalation(self, parent_setup): mandate, compact, priv, _ = parent_setup with pytest.raises(ACTDelegationError, match="exceeds parent max_depth"): create_delegated_mandate( parent_mandate=mandate, parent_compact=compact, delegator_private_key=priv, sub="c", kid="k", iss="a", aud="c", iat=1, exp=2, jti="j", cap=[Capability(action="read.data", constraints={"max_records": 5})], task=TaskClaim(purpose="t"), max_depth=10, ) class TestCapabilitySubset: def test_valid_subset(self): parent = [Capability(action="read.data", constraints={"max_records": 10})] child = [Capability(action="read.data", constraints={"max_records": 5})] verify_capability_subset(parent, child) def test_extra_action(self): parent = [Capability(action="read.data")] child = [Capability(action="delete.data")] with pytest.raises(ACTPrivilegeEscalationError): verify_capability_subset(parent, child) def test_numeric_escalation(self): parent = [Capability(action="read.data", constraints={"max_records": 10})] child = [Capability(action="read.data", constraints={"max_records": 100})] with pytest.raises(ACTPrivilegeEscalationError): verify_capability_subset(parent, child) def test_sensitivity_escalation(self): parent = [Capability(action="read.data", constraints={"data_sensitivity": "confidential"})] child = [Capability(action="read.data", constraints={"data_sensitivity": "internal"})] with pytest.raises(ACTPrivilegeEscalationError): verify_capability_subset(parent, child) def test_sensitivity_more_restrictive(self): parent = [Capability(action="read.data", constraints={"data_sensitivity": "internal"})] child = [Capability(action="read.data", constraints={"data_sensitivity": "restricted"})] verify_capability_subset(parent, child) # should pass def test_missing_constraint(self): parent = [Capability(action="read.data", constraints={"max_records": 10, "scope": "local"})] child = [Capability(action="read.data", constraints={"max_records": 5})] with pytest.raises(ACTPrivilegeEscalationError, match="missing"): verify_capability_subset(parent, child) def test_domain_specific_identical(self): parent = [Capability(action="read.data", constraints={"custom": "value_a"})] child = [Capability(action="read.data", constraints={"custom": "value_a"})] verify_capability_subset(parent, child) def test_domain_specific_different(self): parent = [Capability(action="read.data", constraints={"custom": "value_a"})] child = [Capability(action="read.data", constraints={"custom": "value_b"})] with pytest.raises(ACTPrivilegeEscalationError, match="identical"): verify_capability_subset(parent, child) class TestVerifyDelegationChain: def test_chain_sig_verification(self, parent_setup): mandate, compact, priv, pub = parent_setup delegated, _ = create_delegated_mandate( parent_mandate=mandate, parent_compact=compact, delegator_private_key=priv, sub="c", kid="k", iss="agent-a", aud="c", iat=1, exp=2, jti="j", cap=[Capability(action="read.data", constraints={"max_records": 5})], task=TaskClaim(purpose="t"), ) # Verify the chain def resolve_key(delegator_id): return pub def resolve_compact(jti): if jti == "parent-jti-1": return compact return None verify_delegation_chain(delegated, resolve_key, resolve_compact) def test_no_delegation(self): mandate = ACTMandate( alg="EdDSA", kid="k", iss="a", sub="b", aud="b", iat=1, exp=2, task=TaskClaim(purpose="t"), cap=[Capability(action="x.y")], ) verify_delegation_chain(mandate, lambda x: None) # no-op def test_depth_exceeds_max(self): mandate = ACTMandate( alg="EdDSA", kid="k", iss="a", sub="b", aud="b", iat=1, exp=2, task=TaskClaim(purpose="t"), cap=[Capability(action="x.y")], delegation=Delegation(depth=5, max_depth=3, chain=[ DelegationEntry(delegator="x", jti="j", sig="s") for _ in range(5) ]), ) with pytest.raises(ACTDelegationError, match="exceeds"): verify_delegation_chain(mandate, lambda x: None)