Files
ietf-draft-analyzer/workspace/act/act/delegation.py
Christian Nennemann 2506b6325a
Some checks failed
CI / test (3.11) (push) Failing after 1m37s
CI / test (3.12) (push) Failing after 57s
feat: add draft data, gap analysis report, and workspace config
2026-04-06 18:47:15 +02:00

334 lines
11 KiB
Python

"""ACT delegation chain construction and verification.
Handles peer-to-peer delegation where Agent A authorizes Agent B
with reduced privileges, building a cryptographic chain of authority.
Reference: ACT §6 (Delegation Chain).
"""
from __future__ import annotations
import base64
from typing import Any
from .crypto import (
PrivateKey,
PublicKey,
compute_sha256,
sign as crypto_sign,
verify as crypto_verify,
)
from .errors import (
ACTDelegationError,
ACTPrivilegeEscalationError,
ACTValidationError,
)
from .token import (
ACTMandate,
Capability,
Delegation,
DelegationEntry,
_b64url_encode,
_b64url_decode,
encode_jws,
)
def create_delegated_mandate(
    parent_mandate: ACTMandate,
    parent_compact: str,
    delegator_private_key: PrivateKey,
    *,
    sub: str,
    kid: str,
    iss: str,
    aud: str | list[str],
    iat: int,
    exp: int,
    jti: str,
    cap: list[Capability],
    task: Any,
    alg: str = "EdDSA",
    wid: str | None = None,
    max_depth: int | None = None,
    oversight: Any | None = None,
) -> tuple[ACTMandate, str]:
    """Create a delegated ACT mandate from a parent mandate.

    Agent A (delegator) creates a new mandate for Agent B (sub) with
    reduced privileges. The delegation chain is extended with a new
    entry linking back to the parent ACT.

    Reference: ACT §6.1 (Peer-to-Peer Delegation).

    Args:
        parent_mandate: The parent ACT that authorizes delegation. It must
            carry a delegation ('del') claim.
        parent_compact: JWS compact serialization of the parent ACT. Its
            UTF-8 bytes are hashed and the digest is signed for the new
            chain entry.
        delegator_private_key: The delegator's private key for chain sig.
        sub: Target agent identifier.
        kid: Key identifier for the new mandate's signing key.
        iss: Issuer identifier (the delegator); also recorded as the
            ``delegator`` of the new chain entry.
        aud: Audience for the new mandate.
        iat: Issuance time.
        exp: Expiration time.
        jti: Unique identifier for the new mandate.
        cap: Capabilities (must be subset of parent).
        task: TaskClaim for the new mandate.
        alg: Algorithm (default EdDSA).
        wid: Workflow identifier (optional). When ``None`` the parent's
            ``wid`` is inherited.
        max_depth: Max delegation depth for the new mandate. Must satisfy
            ``new depth <= max_depth <= parent max_depth``. When ``None``
            the parent's ``max_depth`` is inherited. Pass the new depth
            itself to forbid any further delegation.
        oversight: Oversight claim (optional).

    Returns:
        Tuple of ``(mandate, compact)``. ``compact`` is always the empty
        string: this function neither serializes nor signs the new
        mandate; it still needs to be signed by the delegator.

    Raises:
        ACTDelegationError: If the parent has no 'del' claim, if the new
            depth would exceed the parent's max_depth, or if the requested
            max_depth is above the parent's max_depth or below the new
            depth.
        ACTPrivilegeEscalationError: If cap exceeds parent capabilities.
    """
    # Root mandate without del claim — delegation not permitted
    if parent_mandate.delegation is None:
        raise ACTDelegationError(
            "Parent mandate has no 'del' claim; delegation is not permitted"
        )

    parent_depth = parent_mandate.delegation.depth
    parent_max_depth = parent_mandate.delegation.max_depth
    # Copy so the parent's chain list is never mutated or shared.
    parent_chain = list(parent_mandate.delegation.chain)

    new_depth = parent_depth + 1

    # Validate depth constraints — ACT §6.3 step 3
    if new_depth > parent_max_depth:
        raise ACTDelegationError(
            f"Delegation depth {new_depth} exceeds max_depth {parent_max_depth}"
        )

    # Validate max_depth — ACT §6.1 step 4
    if max_depth is None:
        effective_max_depth = parent_max_depth
    else:
        if max_depth > parent_max_depth:
            raise ACTDelegationError(
                f"Requested max_depth {max_depth} exceeds parent max_depth "
                f"{parent_max_depth}"
            )
        if max_depth < new_depth:
            # A mandate with depth > max_depth is rejected by
            # verify_delegation_chain, so refuse to build one at all
            # instead of handing back a token that can never verify.
            raise ACTDelegationError(
                f"Requested max_depth {max_depth} is below the new "
                f"delegation depth {new_depth}"
            )
        effective_max_depth = max_depth

    # Validate capability subset — ACT §6.2
    verify_capability_subset(parent_mandate.cap, cap)

    # Compute chain entry signature — ACT §6.1 step 5
    parent_hash = compute_sha256(parent_compact.encode("utf-8"))
    chain_sig = crypto_sign(delegator_private_key, parent_hash)
    chain_sig_b64 = _b64url_encode(chain_sig)

    # Build new chain entry; jti refers to the *parent* ACT being delegated.
    new_entry = DelegationEntry(
        delegator=iss,
        jti=parent_mandate.jti,
        sig=chain_sig_b64,
    )

    # Extend chain — ordered root → immediate parent
    new_chain = parent_chain + [new_entry]

    delegation = Delegation(
        depth=new_depth,
        max_depth=effective_max_depth,
        chain=new_chain,
    )

    mandate = ACTMandate(
        alg=alg,
        kid=kid,
        iss=iss,
        sub=sub,
        aud=aud,
        iat=iat,
        exp=exp,
        jti=jti,
        wid=wid if wid is not None else parent_mandate.wid,
        task=task,
        cap=cap,
        delegation=delegation,
        oversight=oversight,
    )

    # The compact form is intentionally left empty; signing is the
    # caller's responsibility.
    return mandate, ""
def verify_capability_subset(
    parent_caps: list[Capability],
    child_caps: list[Capability],
) -> None:
    """Check that every child capability is covered by the parent.

    For each entry in ``child_caps`` the action must also be granted by
    the parent, and the child's constraints must be at least as
    restrictive as those of the parent capability with the same action.

    Reference: ACT §6.2 (Privilege Reduction Requirements).

    Raises:
        ACTPrivilegeEscalationError: If child cap exceeds parent cap.
    """
    # Index parent capabilities by action; a later duplicate action
    # replaces an earlier one.
    granted_by_action = {}
    for granted in parent_caps:
        granted_by_action[granted.action] = granted

    for requested in child_caps:
        requested_action = requested.action
        if requested_action not in granted_by_action:
            raise ACTPrivilegeEscalationError(
                f"Capability action {child_cap_action_repr(requested_action)} not present in "
                f"parent capabilities: {sorted(granted_by_action.keys())}"
            ) if False else ACTPrivilegeEscalationError(
                f"Capability action {requested_action!r} not present in "
                f"parent capabilities: {sorted(granted_by_action.keys())}"
            )
        granted = granted_by_action[requested_action]
        _verify_constraints_subset(
            granted.constraints, requested.constraints, requested_action
        )
def _verify_constraints_subset(
    parent_constraints: dict[str, Any] | None,
    child_constraints: dict[str, Any] | None,
    action: str,
) -> None:
    """Verify child constraints are at least as restrictive as parent.

    Reference: ACT §6.2 (Privilege Reduction Requirements).

    Rules, applied to every key present in the parent constraints:
        - The key must also be present in the child.
        - ``data_sensitivity`` / ``data_classification_max`` whose parent
          and child values are both known levels: child must be >= parent
          in the ordering public < internal < confidential < restricted.
          Values that are not known levels fall through to the rules
          below instead of being accepted.
        - Numeric values (int/float, but not bool): child must be
          <= parent (lower = more restrictive).
        - Everything else, including booleans: child must compare equal
          to parent.

    Keys that appear only in the child are not inspected.

    Args:
        parent_constraints: Constraints of the parent capability, or
            ``None`` when the parent is unconstrained.
        child_constraints: Constraints of the child capability, or ``None``.
        action: Capability action name, used only in error messages.

    Raises:
        ACTPrivilegeEscalationError: If child constraint is less restrictive.
    """
    if parent_constraints is None:
        # Parent has no constraints — child may add constraints (more restrictive)
        return
    if child_constraints is None:
        # Parent has constraints but child does not — escalation
        raise ACTPrivilegeEscalationError(
            f"Capability {action!r}: parent has constraints but child does not"
        )

    # Sensitivity ordering per ACT §6.2
    _SENSITIVITY_ORDER = {
        "public": 0,
        "internal": 1,
        "confidential": 2,
        "restricted": 3,
    }

    def _is_number(value: Any) -> bool:
        # bool is a subclass of int; treating True/False as 1/0 would let a
        # child flip a boolean requirement from True to False unnoticed.
        return isinstance(value, (int, float)) and not isinstance(value, bool)

    for key, parent_val in parent_constraints.items():
        if key not in child_constraints:
            # Missing constraint in child = less restrictive
            raise ACTPrivilegeEscalationError(
                f"Capability {action!r}: constraint {key!r} present in "
                f"parent but missing in child"
            )
        child_val = child_constraints[key]

        if key == "data_sensitivity" or key == "data_classification_max":
            # Enum comparison — higher = more restrictive.
            # Only strings are looked up: an unhashable value (list/dict)
            # would otherwise raise TypeError from the dict lookup.
            if isinstance(parent_val, str) and isinstance(child_val, str):
                p_ord = _SENSITIVITY_ORDER.get(parent_val)
                c_ord = _SENSITIVITY_ORDER.get(child_val)
                if p_ord is not None and c_ord is not None:
                    if c_ord < p_ord:
                        raise ACTPrivilegeEscalationError(
                            f"Capability {action!r}: constraint {key!r} "
                            f"value {child_val!r} is less restrictive than "
                            f"parent value {parent_val!r}"
                        )
                    continue

        if _is_number(parent_val) and _is_number(child_val):
            # Numeric: lower/equal = more restrictive. Written as
            # "not <=" so that NaN (for which every comparison is False)
            # is rejected rather than silently accepted.
            if not child_val <= parent_val:
                raise ACTPrivilegeEscalationError(
                    f"Capability {action!r}: numeric constraint {key!r} "
                    f"value {child_val} exceeds parent value {parent_val}"
                )
            continue

        # Unknown/domain-specific: must be identical — ACT §6.2
        if child_val != parent_val:
            raise ACTPrivilegeEscalationError(
                f"Capability {action!r}: constraint {key!r} value "
                f"{child_val!r} differs from parent value {parent_val!r} "
                f"(non-comparable constraints must be identical)"
            )
def verify_delegation_chain(
    mandate: ACTMandate,
    resolve_key: Any,
    resolve_parent_compact: Any | None = None,
) -> None:
    """Verify the delegation chain of a mandate.

    A mandate without a delegation claim is treated as a root mandate and
    accepted without further checks. Otherwise the depth bounds and chain
    length are validated, every chain entry's delegator key must be
    resolvable, and — when the parent compact form can be retrieved —
    the entry signature is verified against the hash of that compact form.

    Reference: ACT §6.3 (Delegation Verification).

    Args:
        mandate: The ACT mandate to verify.
        resolve_key: Callable(delegator_id: str) -> PublicKey to resolve
            the public key of a delegator. Any exception it raises is
            reported as ACTDelegationError.
        resolve_parent_compact: Optional callable(jti: str) -> str|None
            to retrieve the parent ACT compact form.
            Required for full chain sig verification: when it is ``None``,
            or returns ``None`` for an entry, that entry's signature is
            NOT verified.

    Raises:
        ACTDelegationError: If the chain is structurally invalid, a
            delegator key cannot be resolved, an entry signature cannot
            be decoded, or an entry signature fails verification.
        ACTPrivilegeEscalationError: If capabilities were escalated.
            NOTE(review): no capability comparison is performed in this
            function body — confirm where that check is expected to run.
    """
    if mandate.delegation is None:
        # No delegation — root mandate, nothing to verify
        return

    delegation = mandate.delegation

    # Step 3: depth <= max_depth
    if delegation.depth > delegation.max_depth:
        raise ACTDelegationError(
            f"Delegation depth {delegation.depth} exceeds "
            f"max_depth {delegation.max_depth}"
        )

    # Step 4: chain length == depth
    if len(delegation.chain) != delegation.depth:
        raise ACTDelegationError(
            f"Delegation chain length {len(delegation.chain)} does not "
            f"match depth {delegation.depth}"
        )

    # Step 2: verify each chain entry
    for i, entry in enumerate(delegation.chain):
        # Step 2a: resolve delegator's public key
        try:
            pub_key = resolve_key(entry.delegator)
        except Exception as e:
            raise ACTDelegationError(
                f"Cannot resolve key for delegator {entry.delegator!r} "
                f"at chain index {i}: {e}"
            ) from e

        # Step 2b: verify signature if parent compact is available
        if resolve_parent_compact is not None:
            parent_compact = resolve_parent_compact(entry.jti)
            if parent_compact is not None:
                parent_hash = compute_sha256(
                    parent_compact.encode("utf-8")
                )
                # The signature comes from the token being verified, so a
                # malformed value must surface as a delegation error rather
                # than as a raw decoding exception.
                try:
                    sig_bytes = _b64url_decode(entry.sig)
                except Exception as e:
                    raise ACTDelegationError(
                        f"Chain entry signature is malformed at "
                        f"index {i} (delegator={entry.delegator!r}): {e}"
                    ) from e
                try:
                    crypto_verify(pub_key, sig_bytes, parent_hash)
                except Exception as e:
                    raise ACTDelegationError(
                        f"Chain entry signature verification failed at "
                        f"index {i} (delegator={entry.delegator!r}): {e}"
                    ) from e