Strategic work for IETF submission of draft-nennemann-act-01 and
draft-nennemann-wimse-ect-02:
Package restructure:
- move ACT and ECT refimpls to workspace/packages/{act,ect}/
- ietf-act and ietf-ect distribution names (sibling packages)
- cross-spec interop test plan (INTEROP-TEST-PLAN.md)
ACT draft -01 revisions:
- rename 'par' claim to 'pred' (align with ECT)
- rename 'Agent Compact Token' to 'Agent Context Token' (semantic
alignment with ECT family)
- add Applicability section (MCP, OpenAI, LangGraph, A2A, CrewAI)
- add DAG vs Linear Delegation Chains section (differentiator vs
txn-tokens-for-agents actchain, Agentic JWT, AIP/IBCTs)
- add Related Work: AIP, SentinelAgent, Agentic JWT, txn-tokens-for-agents,
HDP, SCITT-AI-agent-execution
- pin SCITT arch to -22, note AUTH48 status
Outreach drafts:
- Emirdag liaison email (SCITT-AI coordination)
- OAuth ML response on txn-tokens-for-agents-06
Strategy document:
- STRATEGY.md with phased action plan, risk register, timeline
Submodule:
- update workspace/drafts/ietf-wimse-ect pointer to -02 commit
245 lines
7.3 KiB
Python
"""Tests for act.token module."""
|
|
|
|
import json
|
|
import time
|
|
import uuid
|
|
|
|
import pytest
|
|
|
|
from act.token import (
|
|
ACTMandate,
|
|
ACTRecord,
|
|
Capability,
|
|
Delegation,
|
|
DelegationEntry,
|
|
ErrorClaim,
|
|
Oversight,
|
|
TaskClaim,
|
|
_b64url_decode,
|
|
_b64url_encode,
|
|
decode_jws,
|
|
encode_jws,
|
|
parse_token,
|
|
validate_action_name,
|
|
)
|
|
from act.errors import ACTPhaseError, ACTValidationError
|
|
|
|
|
|
@pytest.fixture
def base_time():
    """Provide the fixed epoch timestamp that the tests use as ``iat``."""
    # 1772064000 seconds after the Unix epoch is 2026-02-26T00:00:00Z.
    return 1_772_064_000
|
|
|
|
|
|
@pytest.fixture
def mandate(base_time):
    """Build an ACTMandate issued by ``agent-a`` for ``agent-b``.

    The mandate is issued at ``base_time``, expires 900 seconds later,
    carries a freshly generated UUID4 string as ``jti`` and grants a single
    ``read.data`` capability.
    """
    issued_at = base_time
    expires_at = issued_at + 900
    token_id = str(uuid.uuid4())
    purpose = TaskClaim(purpose="test_task")
    granted = [Capability(action="read.data")]
    return ACTMandate(
        alg="EdDSA",
        kid="test-key",
        iss="agent-a",
        sub="agent-b",
        aud="agent-b",
        iat=issued_at,
        exp=expires_at,
        jti=token_id,
        task=purpose,
        cap=granted,
    )
|
|
|
|
|
|
class TestBase64url:
    """Checks for the ``_b64url_encode`` / ``_b64url_decode`` helpers."""

    def test_roundtrip(self):
        """Decoding an encoded payload must give back the original bytes."""
        payload = b"hello world"
        encoded = _b64url_encode(payload)
        assert _b64url_decode(encoded) == payload

    def test_no_padding(self):
        """The encoded form must not contain any ``=`` character."""
        # b"test" is 4 bytes, a length that standard base64 pads with "==".
        assert "=" not in _b64url_encode(b"test")
|
|
|
|
|
|
class TestActionNameValidation:
    """Accepted and rejected inputs for ``validate_action_name``."""

    @staticmethod
    def _expect_rejected(name):
        """Assert that validating *name* raises ``ACTValidationError``."""
        with pytest.raises(ACTValidationError):
            validate_action_name(name)

    # --- names that must pass without raising ---

    def test_valid_simple(self):
        """A single bare word is accepted."""
        validate_action_name("read")

    def test_valid_dotted(self):
        """Two dot-separated segments are accepted."""
        validate_action_name("read.data")

    def test_valid_with_hyphens(self):
        """Hyphens and underscores inside segments are accepted."""
        validate_action_name("read-write.data_item")

    # --- names that must be rejected ---

    def test_invalid_starts_with_digit(self):
        """A name beginning with a digit is rejected."""
        self._expect_rejected("1read")

    def test_invalid_empty(self):
        """The empty string is rejected."""
        self._expect_rejected("")

    def test_invalid_double_dot(self):
        """Two consecutive dots are rejected."""
        self._expect_rejected("read..data")
|
|
|
|
|
|
class TestTaskClaim:
    """Serialization checks for ``TaskClaim``."""

    def test_roundtrip(self):
        """``to_dict`` followed by ``from_dict`` yields an equal claim."""
        original = TaskClaim(purpose="test", data_sensitivity="restricted")
        restored = TaskClaim.from_dict(original.to_dict())
        assert original == restored

    def test_missing_purpose(self):
        """A dict with no keys at all must fail validation."""
        empty_claims = {}
        with pytest.raises(ACTValidationError):
            TaskClaim.from_dict(empty_claims)
|
|
|
|
|
|
class TestCapability:
    """Serialization and construction checks for ``Capability``."""

    def test_roundtrip(self):
        """``to_dict`` followed by ``from_dict`` yields an equal capability."""
        original = Capability(action="read.data", constraints={"max": 10})
        restored = Capability.from_dict(original.to_dict())
        assert original == restored

    def test_validates_action(self):
        """Constructing a capability with an empty action must raise."""
        with pytest.raises(ACTValidationError):
            Capability(action="")
|
|
|
|
|
|
class TestDelegation:
    """Serialization checks for ``Delegation``."""

    def test_roundtrip(self):
        """Depth and chain length survive ``to_dict`` / ``from_dict``."""
        hop = DelegationEntry(delegator="a", jti="j1", sig="sig1")
        original = Delegation(depth=1, max_depth=3, chain=[hop])

        restored = Delegation.from_dict(original.to_dict())

        assert original.depth == restored.depth
        assert len(restored.chain) == 1
|
|
|
|
|
|
class TestACTMandate:
    """Validation, claim export and phase checks for ``ACTMandate``."""

    @staticmethod
    def _build(base_time, **overrides):
        """Construct an ``ACTMandate`` from shared defaults plus *overrides*.

        The defaults are the keyword arguments the tests in this class have
        in common; any keyword passed in *overrides* replaces or extends them.
        """
        fields = dict(
            alg="EdDSA",
            kid="k",
            iss="a",
            sub="b",
            aud="b",
            iat=base_time,
            exp=base_time + 900,
            task=TaskClaim(purpose="t"),
            cap=[Capability(action="x.y")],
        )
        fields.update(overrides)
        return ACTMandate(**fields)

    def test_validate_success(self, mandate):
        """The fixture mandate validates without raising."""
        mandate.validate()

    def test_validate_missing_iss(self, base_time):
        """An empty ``iss`` is reported with an error mentioning ``iss``."""
        broken = self._build(base_time, iss="")
        with pytest.raises(ACTValidationError, match="iss"):
            broken.validate()

    def test_validate_forbidden_alg(self, base_time):
        """``HS256`` as the algorithm fails validation."""
        broken = self._build(base_time, alg="HS256")
        with pytest.raises(ACTValidationError):
            broken.validate()

    def test_validate_alg_none(self, base_time):
        """``none`` as the algorithm fails validation."""
        broken = self._build(base_time, alg="none")
        with pytest.raises(ACTValidationError):
            broken.validate()

    def test_to_claims_includes_optional(self, base_time):
        """``wid`` and ``oversight`` appear in the exported claims when set."""
        approval = Oversight(requires_approval_for=["x.y"])
        exported = self._build(
            base_time, wid="w-1", oversight=approval
        ).to_claims()
        assert exported["wid"] == "w-1"
        assert "oversight" in exported

    def test_is_phase2(self, mandate):
        """A mandate reports that it is not a phase-2 token."""
        assert mandate.is_phase2() is False

    def test_from_claims_rejects_phase2(self):
        """Claims that contain ``exec_act`` cannot be loaded as a mandate."""
        header = {"alg": "EdDSA", "typ": "act+jwt", "kid": "k"}
        claims = {
            "exec_act": "x",
            "iss": "a",
            "sub": "b",
            "aud": "b",
            "iat": 1,
            "exp": 2,
            "jti": "j",
            "task": {"purpose": "t"},
            "cap": [{"action": "x"}],
        }
        with pytest.raises(ACTPhaseError):
            ACTMandate.from_claims(header, claims)
|
|
|
|
|
|
class TestACTRecord:
    """Construction, validation and phase checks for ``ACTRecord``."""

    def test_from_mandate(self, mandate):
        """A record derived from a mandate keeps ``iss`` and takes the new fields."""
        record = ACTRecord.from_mandate(
            mandate,
            kid="sub-key",
            exec_act="read.data",
            pred=[],
            status="completed",
        )
        assert record.iss == mandate.iss
        assert record.exec_act == "read.data"
        assert record.kid == "sub-key"

    def test_validate_bad_status(self, mandate):
        """An unknown status is reported with an error mentioning ``status``."""
        executed_at = mandate.iat + 100
        record = ACTRecord.from_mandate(
            mandate,
            kid="k",
            exec_act="read.data",
            pred=[],
            exec_ts=executed_at,
            status="invalid",
        )
        with pytest.raises(ACTValidationError, match="status"):
            record.validate()

    def test_is_phase2(self, mandate):
        """A record reports that it is a phase-2 token."""
        record = ACTRecord.from_mandate(
            mandate,
            kid="k",
            exec_act="read.data",
            pred=[],
            status="completed",
        )
        assert record.is_phase2() is True

    def test_from_claims_rejects_phase1(self):
        """Claims without ``exec_act`` cannot be loaded as a record."""
        header = {"alg": "EdDSA", "typ": "act+jwt", "kid": "k"}
        claims = {
            "iss": "a",
            "sub": "b",
            "aud": "b",
            "iat": 1,
            "exp": 2,
            "jti": "j",
            "task": {"purpose": "t"},
            "cap": [{"action": "x"}],
        }
        with pytest.raises(ACTPhaseError):
            ACTRecord.from_claims(header, claims)
|
|
|
|
|
|
class TestJWSSerialization:
    """Compact JWS decoding errors and token-type detection in ``parse_token``."""

    @staticmethod
    def _sign_and_encode(token):
        """Sign *token* with a throwaway Ed25519 key and return the compact JWS.

        Args:
            token: An object exposing ``signing_input()`` that ``encode_jws``
                accepts (the tests pass an ``ACTMandate`` or ``ACTRecord``).

        Returns:
            Whatever ``encode_jws`` returns for the token and its signature.
        """
        # Imported here so act.crypto is only loaded by tests that sign.
        from act.crypto import generate_ed25519_keypair, sign

        # The public half is discarded: these tests parse, they never verify.
        private_key, _ = generate_ed25519_keypair()
        signature = sign(private_key, token.signing_input())
        return encode_jws(token, signature)

    def test_decode_invalid_parts(self):
        """A string with only two dot-separated parts is rejected."""
        with pytest.raises(ACTValidationError):
            decode_jws("only.two")

    def test_decode_invalid_header(self):
        """A header segment of ``!!!`` is rejected."""
        with pytest.raises(ACTValidationError):
            decode_jws("!!!.cGF5bG9hZA.c2ln")

    def test_decode_wrong_typ(self):
        """A header whose ``typ`` is ``jwt`` is rejected, mentioning ``typ``."""
        header = _b64url_encode(
            json.dumps({"alg": "EdDSA", "typ": "jwt", "kid": "k"}).encode()
        )
        payload = _b64url_encode(json.dumps({"iss": "a"}).encode())
        sig = _b64url_encode(b"sig")
        with pytest.raises(ACTValidationError, match="typ"):
            decode_jws(f"{header}.{payload}.{sig}")

    def test_parse_token_phase1(self, mandate):
        """A signed mandate parses back as an ``ACTMandate``."""
        compact = self._sign_and_encode(mandate)
        assert isinstance(parse_token(compact), ACTMandate)

    def test_parse_token_phase2(self, mandate):
        """A signed record parses back as an ``ACTRecord``."""
        record = ACTRecord.from_mandate(
            mandate,
            kid="k",
            exec_act="read.data",
            pred=[],
            status="completed",
        )
        compact = self._sign_and_encode(record)
        assert isinstance(parse_token(compact), ACTRecord)
|