Compare commits

11 Commits
ect-00 ... main

Author SHA1 Message Date
ba38569319 fix: update hash format validation to -01 spec (plain base64url, no prefix)
Go ValidateHashFormat was still validating the old -00 format
(algorithm:base64url with sha-256/sha-384/sha-512 prefix). Updated to
validate plain base64url without prefix per -01 spec and RFC 9449.
Python was already updated but uncommitted. Both refimpls now match.
2026-04-11 17:51:29 +02:00
884d2dc836 feat: migrate refimpls from draft-00 to draft-01 claim names
- Rename `par` to `pred` (predecessor) in types, serialization, tests
- Remove `pol`, `pol_decision` from core payload; move to `ect_ext`
- Remove `sub` from payload (not part of ECT spec)
- Update `typ` from `wimse-exec+jwt` to `exec+jwt` (accept both)
- Rename MaxParLength to MaxPredLength everywhere
- Update testdata, demos, READMEs with migration table
- All Go tests pass, all 56 Python tests pass (90% coverage)
2026-04-03 10:55:58 +02:00
ba044f6626 Merge branch 'main' of git.xorwell.de:c/ietf-wimse-ect 2026-04-03 07:53:47 +02:00
8cf0d8aade feat: polish draft-01 for submission — claim renames, review fixes, refimpl docs
Draft improvements:
- Rename ext -> ect_ext, clarify iss/aud requirements per level
- Add algorithm agility guidance and RFC 8725 reference
- Add HTTP header size constraints and body transport fallback
- Add cross-level parent reference semantics
- Add emerging agent protocols (A2A, MCP) to Related Work
- Fix HTTP error handling (403 not 401), IANA +jwt suffix note
- Add workflow consistency check to DAG validation
- Add defense-in-depth note for acyclicity check

Supporting files:
- Fix blog post outdated claim names (par -> pred, ext -> ect_ext)
- Update refimpl README with -00 vs -01 migration mapping
- Add refimpl IMPROVEMENTS.md section 6 with -01 migration tasks
2026-04-03 07:49:36 +02:00
16a1973d02 Fix: prevent duplicate triggers and bot self-triggering 2026-03-09 22:21:04 +00:00
342ca5257c Add Claude workflow with cost controls and model selection 2026-03-09 22:17:26 +00:00
bd2a5f819a Fix build errors: duplicate anchors, missing ref, back-matter numbering
- Add unique anchors for security subsections that collided with
  protocol section anchors (level-1/2/3, x509-binding, wimse-binding)
- Fix broken {{jwt-claims}} reference to {{exec-claims}}
- Fix I-D.oauth-transaction-tokens-for-agents reference (add manual
  entry since bibxml3 lookup fails)
- Remove RFC2119/8174 from YAML normative block (bcp14-tagged
  boilerplate adds them automatically)
- Add {:numbered="false"} to all back-matter subsections

Build now succeeds: XML, TXT, and HTML generated cleanly.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-06 20:07:11 +01:00
eb79c6998a Apply comprehensive review fixes to draft-nennemann-wimse-ect-01
Critical fixes:
- Add RFC 2119/8174 to normative refs, move RFC 9449 to normative
- Rewrite level detection algorithm with precise parsing order
  (JWS first, then base64url-decode for L1)
- Add downgrade attack analysis and minimum-level policy requirement
- Complete application/wimse-exec+jwt IANA registration template
- Fix bare draft-00 citations, fix I-D reference anchor format
- Rewrite abstract to remove changelog language

Medium fixes:
- Add jti replay check to L1 verification procedure
- Add L3 async failure handling (notify downstream, treat as L2)
- Add L3 sync timeout retry/fallback guidance
- Add identity binding security subsection (JWK caching, OCSP
  failure policy, trust bundle refresh)
- Add audit ledger threats subsection (availability, split-view,
  receipt authenticity, async gap)
- Collapse redundant Section 9 into HTTP Error Handling
- Remove redundant L3 verification steps for iss/aud
- Add L2 use case (multi-vendor SaaS document pipeline)

Low fixes:
- Strengthen ext object limits from SHOULD NOT to MUST NOT
- Add level negotiation future work note
- Document L1 DAG validation limitation without ledger
- Add alg=none defense-in-depth note
- Strengthen self-assertion limitation for L1
- Add workflow topology leakage to privacy considerations
- Add cross-workflow correlation to privacy considerations
- Add RATS (RFC 9334) to related work
- Expand SCITT comparison with L3 audit ledger parallel
- Pin SPIFFE reference to specific version URL
- Clean up redundant {:numbered="false"} in back matter

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-06 19:58:04 +01:00
139a4e85e2 Rename 'par' claim to 'pred' and fix area to SEC
Rename the 'par' (Parent Task Identifiers) JWT claim to 'pred'
(Predecessor Task Identifiers) to avoid collision with RFC 9126
(Pushed Authorization Requests) which already registers 'par' in
the IANA JWT Claims registry. Fix IETF area from ART to SEC since
WIMSE is in the Security area.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-06 19:35:12 +01:00
2d3af57923 Restructure repo: single source file with git tags for versioning
Drop versioned directories and archive/ in favor of git tags (draft-00,
draft-01) for frozen submissions. Rename source to
draft-nennemann-wimse-ect.md (version comes from docname in front matter).
Update build.sh to extract docname automatically. Ignore generated outputs.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-06 19:20:38 +01:00
998a7f2eb8 Add draft-nennemann-wimse-ect-01 with assurance levels and identity-framework agnostic design
Introduces three assurance levels (L1 unsigned JSON, L2 JOSE signing,
L3 JOSE signing with audit ledger) so deployments can choose the
appropriate trade-off between simplicity and regulatory compliance.

Decouples ECTs from WIMSE/SPIFFE hard dependencies by introducing an
abstract identity binding model with concrete profiles for WIMSE,
X.509, and JWK sets. The typ header moves from wimse-exec+jwt to
exec+jwt (with backward compatibility).

Includes blog article (blog-ect-assurance-levels.md) explaining the
assurance levels change and identity-framework agnostic design.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-01 23:04:12 +01:00
44 changed files with 2633 additions and 8522 deletions

.gitea/workflows/claude.yml Normal file

@@ -0,0 +1,134 @@
name: Claude Code Assistant
on:
  issues:
    types: [opened, labeled]
  issue_comment:
    types: [created]
concurrency:
  group: claude-${{ github.event.issue.number }}
  cancel-in-progress: true
jobs:
  claude-code:
    if: >-
      (github.event_name == 'issues' &&
      contains(toJSON(github.event.issue.labels), 'claude')) ||
      (github.event_name == 'issue_comment' &&
      contains(github.event.comment.body, '@claude') &&
      github.event.comment.user.login != 'admin')
    runs-on: ubuntu-latest
    timeout-minutes: 15
    steps:
      - name: Checkout
        uses: actions/checkout@v4
      - name: Run Claude on Issue
        env:
          ANTHROPIC_API_KEY: ${{ secrets.ANTHROPIC_API_KEY }}
          GIT_TOKEN: ${{ secrets.GIT_TOKEN }}
        run: |
          set +e
          # Configure git
          git config user.name "Claude Bot"
          git config user.email "claude@localhost"
          git remote set-url origin "http://admin:${GIT_TOKEN}@localhost:3000/${{ github.repository }}.git"
          ISSUE_NUMBER="${{ github.event.issue.number }}"
          ISSUE_TITLE="${{ github.event.issue.title }}"
          REPO="${{ github.repository }}"
          LABELS_JSON='${{ toJSON(github.event.issue.labels) }}'
          # Determine model + cost limits from issue labels
          # Default: haiku (cheap). Add claude:sonnet or claude:opus for harder tasks.
          CLAUDE_MODEL="haiku"
          MAX_TURNS=15
          MAX_BUDGET="0.50"
          EFFORT="low"
          if echo "$LABELS_JSON" | grep -q '"claude:opus"'; then
            CLAUDE_MODEL="claude-opus-4-6"
            MAX_TURNS=40
            MAX_BUDGET="5.00"
            EFFORT="high"
          elif echo "$LABELS_JSON" | grep -q '"claude:sonnet"'; then
            CLAUDE_MODEL="claude-sonnet-4-6"
            MAX_TURNS=25
            MAX_BUDGET="2.00"
            EFFORT="medium"
          fi
          ISSUE_BODY=$(curl -s "http://localhost:3000/api/v1/repos/${REPO}/issues/${ISSUE_NUMBER}" \
            -H "Authorization: token ${GIT_TOKEN}" | python3 -c "import sys,json; print(json.load(sys.stdin).get('body',''))")
          COMMENT_BODY=""
          if [ "${{ github.event_name }}" = "issue_comment" ]; then
            COMMENT_ID="${{ github.event.comment.id }}"
            COMMENT_BODY=$(curl -s "http://localhost:3000/api/v1/repos/${REPO}/issues/comments/${COMMENT_ID}" \
              -H "Authorization: token ${GIT_TOKEN}" | python3 -c "import sys,json; print(json.load(sys.stdin).get('body',''))")
          fi
          BRANCH="claude/issue-${ISSUE_NUMBER}"
          git checkout -b "${BRANCH}"
          # Run Claude Code with cost controls
          claude -p "You are working on the repository ${REPO} (Gitea instance at http://localhost:3000).
          A Gitea issue needs your attention:
          Issue #${ISSUE_NUMBER}: ${ISSUE_TITLE}
          Description: ${ISSUE_BODY}
          Additional context: ${COMMENT_BODY}
          IMPORTANT RULES:
          - Do NOT retry failed commands more than once. If something fails twice, stop and report the error.
          - Do NOT loop on failing tests. Fix the obvious issue or report it. Never run the same failing command 3+ times.
          - If you cannot complete the task, push what you have, create the PR as draft, and explain what is blocked.
          - Be efficient: read only files you need, make targeted edits, avoid unnecessary exploration.
          Steps:
          1. Read and understand the relevant parts of the codebase
          2. Implement the requested changes
          3. Commit your changes with a descriptive message
          4. Push branch ${BRANCH} to origin
          5. Create a pull request targeting main that references issue #${ISSUE_NUMBER}
          6. Post a comment on issue #${ISSUE_NUMBER} summarizing what you did
          Git is configured. You are on branch ${BRANCH}. Work in the current directory.
          Use git commands to push, and curl to the Gitea API for PR creation and comments.
          Gitea API token is available as env var GIT_TOKEN." \
            --allowedTools "Bash,Read,Edit,Write,Glob,Grep" \
            --model "${CLAUDE_MODEL}" \
            --max-turns "${MAX_TURNS}" \
            --max-budget-usd "${MAX_BUDGET}" \
            --effort "${EFFORT}" \
            --permission-mode bypassPermissions \
            --output-format json 2>&1 > /tmp/claude-result.json
          CLAUDE_EXIT=$?
          # Extract cost from JSON output
          COST=$(python3 -c "
          import json
          with open('/tmp/claude-result.json') as f:
              data = json.load(f)
          cost = data.get('total_cost_usd', 0)
          print(f'\${cost:.4f}')
          " 2>/dev/null || echo "unknown")
          # Amend the last commit to include cost and model
          if git log --oneline main..HEAD 2>/dev/null | head -1 | grep -q .; then
            LAST_MSG=$(git log -1 --format=%B)
            git commit --amend -m "${LAST_MSG}
          Claude model: ${CLAUDE_MODEL} | API cost: ${COST}" --no-verify
            git push origin "${BRANCH}" --force
          fi
          # Post cost as comment
          curl -s -X POST "http://localhost:3000/api/v1/repos/${REPO}/issues/${ISSUE_NUMBER}/comments" \
            -H "Authorization: token ${GIT_TOKEN}" \
            -H "Content-Type: application/json" \
            -d "{\"body\": \"Done (model: **${CLAUDE_MODEL}**, effort: ${EFFORT}, budget cap: \$${MAX_BUDGET}). API cost: **${COST}**\"}" > /dev/null
          exit ${CLAUDE_EXIT}

.gitignore vendored

@@ -1 +1,5 @@
 .refcache/
+# Generated build outputs (XML, TXT, HTML)
+draft-nennemann-wimse-ect-*.xml
+draft-nennemann-wimse-ect-*.txt
+draft-nennemann-wimse-ect-*.html

blog-ect-assurance-levels.md Normal file

@@ -0,0 +1,163 @@
# The False Choice: Why Agentic Audit Trails Need Assurance Levels
*How draft-nennemann-wimse-ect-01 introduces L1/L2/L3 to bridge the gap between "no overhead" and "full regulatory compliance."*
---
When you're building a system where AI agents act on behalf of users — approving transactions, recommending treatments, executing trades — you face a question that doesn't have a clean answer: how much proof do you need that each step actually happened?
The first version of the Execution Context Token specification (draft-nennemann-wimse-ect-00) answered this with "all of it." Every ECT required a full JWS asymmetric signature, tightly coupled to WIMSE Workload Identity Tokens. That's the right answer for cross-organizational deployments where two companies need cryptographic non-repudiation. But it's wildly excessive for a team running an internal AI pipeline behind a service mesh. And it's not enough for a healthcare system that needs to prove, years later, that every step in a clinical decision was tamper-evidently recorded.
The -01 revision introduces three assurance levels: L1, L2, and L3. Same payload. Same DAG structure. Same claims. Different envelopes, different verification, different guarantees. Choose the one that fits your threat model.
## What Are ECTs?
Before diving into assurance levels, a quick recap. Execution Context Tokens are JWT-based records of what an agent actually did in a distributed workflow. Each ECT captures a single task — "recommend_treatment", "verify_compliance", "execute_trade" — and links it to its predecessor tasks through a directed acyclic graph (DAG).
ECTs originated as an extension to the IETF WIMSE (Workload Identity in Multi-System Environments) framework, but the -01 revision makes them identity-framework agnostic. Your identity layer handles "who is this agent?" ECTs handle "what did this agent do, and in what order?"
The key properties:
- **Per-task granularity.** One ECT per task, not one per session or per request chain.
- **DAG ordering.** Parent references (`pred` claim) create a verifiable execution graph. Fan-out, fan-in, parallel branches — all representable.
- **Data integrity without data exposure.** Input and output hashes (`inp_hash`, `out_hash`) prove what was processed without revealing the data itself.
- **Identity-framework agnostic.** ECTs work with WIMSE WIT/WPT, X.509 certificates, OAuth credentials, or plain JWK sets. The spec defines abstract identity binding requirements and concrete profiles for each framework.
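The `inp_hash`/`out_hash` format is concrete enough to sketch. Per the -01 spec (and the hash-format fix at the top of this compare), the value is the plain base64url-encoded SHA-256 digest of the raw octets, unpadded, with no algorithm prefix. A minimal Python sketch — function names here are illustrative, not from the refimpls:

```python
import base64
import hashlib
import re

def content_hash(data: bytes) -> str:
    """SHA-256 over the raw input/output octets, base64url without padding.
    Per the -01 format, the value carries no 'sha-256:' prefix."""
    digest = hashlib.sha256(data).digest()
    return base64.urlsafe_b64encode(digest).rstrip(b"=").decode("ascii")

# A SHA-256 digest is 32 bytes, so the unpadded base64url value is
# always exactly 43 characters from the base64url alphabet.
_HASH_RE = re.compile(r"[A-Za-z0-9_-]{43}")

def valid_hash_format(value: str) -> bool:
    """Shape check a verifier might enforce on inp_hash/out_hash."""
    return _HASH_RE.fullmatch(value) is not None
```

A verifier recomputes the hash over the data it received and compares strings; the data itself never appears in the token.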
## The Problem with draft-00
The -00 draft defined one mode of operation: JWS Compact Serialization with asymmetric signing. Every ECT, always. This created two problems:
**Too heavy for internal deployments.** If all your agents run in the same Kubernetes cluster behind Istio with mTLS, requiring every agent to sign every ECT with ES256 is pure overhead. You already trust the transport. You already trust the agents (you operate them). The signature gives you non-repudiation, but you don't need non-repudiation — you need structured execution records for debugging and observability.
**Not heavy enough for regulated deployments.** A JWS signature proves who created the ECT and that it wasn't tampered with. But it doesn't prove the ECT was *recorded*. A malicious or buggy agent could create a valid signed ECT and then quietly drop it. For environments subject to FDA 21 CFR Part 11, MiFID II, or the EU AI Act, you need more: a tamper-evident ledger with cryptographic commitment, hash chains, and verifiable receipts proving that every ECT was committed to permanent storage at a specific time.
One size doesn't fit all. The -01 revision acknowledges this by introducing three assurance levels.
## Three Assurance Levels
### Level 1: Unsigned JSON
At L1, an ECT is a plain JSON object. No signature. No JWS envelope. Integrity comes from TLS.
**What you get:**
- Structured execution records with the full ECT payload (all the same claims)
- DAG validation (parent references, acyclicity, temporal ordering)
- Transport integrity via TLS/mTLS
**What you don't get:**
- Non-repudiation (no signature binds the ECT to the issuer)
- Tamper detection after delivery (a compromised intermediary could modify the ECT)
- Issuer authentication at the ECT layer
**Where it fits:** Internal microservice meshes, development environments, AI platforms where all agents are operated by the same team. Think of it as "structured tracing" — you get the DAG and the claims without the PKI overhead.
**Hard rule:** L1 MUST NOT cross trust domain boundaries. If agents from different organizations interact, L1 is not appropriate.
### Level 2: JOSE Asymmetric Signing
L2 is the -00 behavior, generalized. Each ECT is a signed JWT in JWS Compact Serialization format. The agent signs with a private key bound to its identity credential — whether that's a WIMSE WIT, an X.509 certificate, or a key from a JWK set. The verifier resolves the `kid` through the deployment's identity framework.
**What you get (beyond L1):**
- Non-repudiation (the cryptographic signature proves the issuer)
- Tamper detection (any modification invalidates the signature)
- Issuer authentication (the `kid` links to the agent's identity credential)
- Audience restriction (`aud` limits who can verify)
**Where it fits:** Cross-organization deployments, peer-to-peer agent communication, any environment requiring cryptographic proof of origin. L2 is the recommended default for production.
**Backward compatibility:** Implementations of draft-nennemann-wimse-ect-00 are L2-compatible. No changes needed.
### Level 3: JOSE Signing with Audit Ledger
L3 is L2 plus a mandatory audit ledger with cryptographic commitment. The JWS signing is identical to L2. The difference is what happens after signing: the ECT must be recorded in a ledger that provides hash chains, Merkle proofs (or equivalent), and verifiable receipts.
**What you get (beyond L2):**
- Tamper-evident history (hash chains detect modification, insertion, or deletion of ledger entries)
- Cryptographic commitment (Merkle proofs allow independent verification of inclusion)
- Non-repudiation of recording (the receipt proves the ECT was committed at a specific time and position)
**The ledger requirements:**
- Each entry includes a cryptographic hash of the previous entry (hash chain)
- The ledger provides verifiable commitment proofs
- Upon append, the ledger returns a receipt with the sequence number, entry hash, and commitment proof
- The ledger should be operated independently of the workflow agents
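The chaining requirement above is the core mechanism, and it fits in a few lines. This is a toy sketch, not the draft's interface: a real L3 ledger adds commitment proofs (e.g. Merkle inclusion) and authenticated receipts, and runs independently of the agents.

```python
import hashlib
import time
from dataclasses import dataclass

@dataclass
class Receipt:
    seq: int          # position of the entry in the ledger
    entry_hash: str   # hash committing to this ECT and the previous entry
    recorded_at: float

class HashChainLedger:
    """Each entry's hash covers the previous entry's hash, so modifying,
    inserting, or deleting any entry breaks every later link."""

    GENESIS = "0" * 64

    def __init__(self) -> None:
        self._entries: list[tuple[str, str]] = []  # (ect, entry_hash)

    def append(self, ect: str) -> Receipt:
        prev = self._entries[-1][1] if self._entries else self.GENESIS
        entry_hash = hashlib.sha256((prev + ect).encode()).hexdigest()
        self._entries.append((ect, entry_hash))
        return Receipt(len(self._entries) - 1, entry_hash, time.time())

    def verify_chain(self) -> bool:
        prev = self.GENESIS
        for ect, entry_hash in self._entries:
            if hashlib.sha256((prev + ect).encode()).hexdigest() != entry_hash:
                return False
            prev = entry_hash
        return True
```

An auditor who holds a receipt can later demand the entry at `seq` and re-walk the chain; any after-the-fact edit is detectable.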
**Recording modes:** L3 supports both synchronous recording (wait for receipt before sending the ECT downstream) and asynchronous recording (send first, verify receipt later). Synchronous is stronger; asynchronous is faster. Deployments choose based on latency requirements.
**Where it fits:** Regulated environments — healthcare (FDA audit trails), financial services (MiFID II transaction reporting), any system subject to the EU AI Act's requirements for activity logs in high-risk AI systems.
## Decision Framework
Choosing the right level is straightforward. Ask three questions:
**1. Do agents cross trust domain boundaries?**
If yes, you need at least L2. Non-repudiation is essential when different organizations operate different agents. If no, L1 is an option.
**2. Do you need cryptographic non-repudiation?**
If you need to prove, after the fact, that a specific agent produced a specific ECT, you need L2 or L3. If TLS-level trust is sufficient, L1 works.
**3. Do you operate under regulatory requirements for tamper-evident audit trails?**
If yes, you need L3. The hash-chained ledger with cryptographic commitment provides the tamper evidence that regulators require. If no, L2 is sufficient for production deployments with non-repudiation.
The decision tree in practice:
```
Internal only, no non-repudiation needed? → L1
Cross-org or non-repudiation needed? → L2
Regulatory tamper-evident audit trail required? → L3
```
A single deployment can use different levels for different workflows. Your internal data preprocessing pipeline might be L1, your cross-org partner integrations L2, and your regulated clinical decision workflows L3.
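The three questions collapse into a few lines of policy code. A sketch (the function and its parameters are illustrative, not part of the spec):

```python
def required_level(crosses_trust_domains: bool,
                   needs_non_repudiation: bool,
                   regulated_audit_trail: bool) -> str:
    """Map the three decision questions onto a minimum assurance level."""
    if regulated_audit_trail:
        return "L3"  # tamper-evident ledger required
    if crosses_trust_domains or needs_non_repudiation:
        return "L2"  # signed ECTs required
    return "L1"      # plain JSON over TLS is enough
```

A deployment running mixed workflows would call this per workflow, not once globally.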
## Real-World Applications
**Healthcare — Clinical Decision Support (L3).** A clinical AI system recommends treatments, checks drug interactions, and routes cases for specialist review. Every step is an ECT. The DAG proves that the safety check happened before the treatment recommendation reached the physician. The L3 ledger provides the tamper-evident audit trail that FDA 21 CFR Part 11 requires for electronic records.
**Financial Services — Algorithmic Trading (L3).** An investment bank's agents coordinate with an external credit rating agency. Portfolio risk analysis, credit assessment, compliance verification, and trade execution form a cross-organizational DAG. L3 ensures every step is signed and committed to a tamper-evident ledger for MiFID II compliance.
**Internal AI Platform — Model Serving (L1).** A company's internal AI platform chains preprocessors, model inference, and postprocessors. All services run in the same Kubernetes cluster with mTLS. L1 ECTs provide structured execution tracing for debugging without PKI overhead.
**Multi-Organization API Gateway (L2).** A SaaS platform's agents interact with customer-operated agents. Both sides need to prove what happened, but there's no regulatory requirement for a tamper-evident ledger. L2 gives both organizations signed, verifiable execution records.
## The Upgrade Path
This is arguably the most important design property of the assurance levels: **the payload is the same at every level.** The same `jti`, `iss`, `aud`, `iat`, `exp`, `exec_act`, `pred`, `inp_hash`, `out_hash`, and `ect_ext` claims appear in every ECT, whether it's unsigned JSON or a ledger-committed JWS token.
What changes is the envelope and the verification procedure. Upgrading from L1 to L2 means adding a JWS wrapper around the same payload; upgrading from L2 to L3 means deploying an audit ledger and adding the ledger recording step after JWS verification.
Your DAG structure doesn't change. Your claim semantics don't change. Your workflow logic doesn't change. The agents still produce the same payload — they just wrap it differently and verify it more thoroughly.
This is intentional. A startup might begin with L1 during development, move to L2 when they start working with external partners, and add L3 when they enter regulated markets. At each step, the payload format is familiar, the DAG is the same, and only the security envelope changes.
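The "same payload, different envelope" property can be shown directly. In this sketch all claim values are illustrative, the optional `inp_hash`/`out_hash`/`ect_ext` claims are omitted, and the signature is a placeholder — in a real L2/L3 deployment a JOSE library computes an ES256 signature over `signing_input`:

```python
import base64
import json

def b64url(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()

# One payload for every level (values are illustrative).
payload = {
    "jti": "f81d4fae-7dec-11d0-a765-00a0c91e6bf6",
    "iss": "spiffe://example.org/agent-a",
    "aud": "spiffe://example.org/agent-b",
    "iat": 1767225600,
    "exp": 1767226200,
    "exec_act": "validate_safety",
    "pred": [],
}

# L1: the payload itself, sent as plain JSON over TLS.
l1_ect = json.dumps(payload)

# L2/L3: the identical payload inside a JWS Compact envelope.
header = {"alg": "ES256", "typ": "exec+jwt", "kid": "agent-a-key-1"}
signing_input = b64url(json.dumps(header).encode()) + "." + b64url(json.dumps(payload).encode())
fake_signature = b"ES256-signature-goes-here"  # placeholder, NOT a real signature
l2_ect = signing_input + "." + b64url(fake_signature)
```

L3 uses the exact same `l2_ect`; only the post-signing ledger step differs.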
## What Changed from -00 to -01
A summary of the structural changes:
**New Section 1.2: Assurance Levels and Applicability.** Defines L1, L2, L3 with applicability guidance.
**Restructured Section 3: ECT Format.** The payload structure is now level-independent (Section 3.1), with separate subsections for L1 (3.3), L2 (3.4), and L3 (3.5). Each level has its own transport, verification, and security properties subsections.
**iss/aud requirement relaxation.** The `iss` and `aud` claims move from REQUIRED to RECOMMENDED at the payload level. They remain REQUIRED for L2 and L3 (the verification procedure mandates them), but L1 deployments can omit them when identity is handled entirely at the transport layer.
**Updated verification.** Section 6 adds level detection as the first step. L1 gets a simplified procedure (no signature steps). L2 retains the -00 procedure. L3 adds ledger recording and receipt verification.
**Expanded audit ledger.** Section 7 distinguishes baseline ledger requirements (all levels) from L3-specific requirements (hash chains, commitment proofs, receipts). New subsections cover ledger independence and L3 verification procedures.
**Level-specific security analysis.** Section 8 adds a new subsection analyzing each level's security properties, including L1 limitations and L3 ledger properties.
**Identity-framework agnostic design.** The -00 draft hardcoded WIMSE WIT/WPT as the identity layer. The -01 draft introduces an abstract "Identity Binding" section defining what any identity framework must provide (key association, issuer correspondence, algorithm consistency, revocation checking) and then defines concrete profiles for WIMSE, X.509, and JWK sets. The `typ` header moves from `wimse-exec+jwt` to `exec+jwt` (with backward compatibility for the old value). This means you can use ECTs without deploying WIMSE — bring your own identity framework.
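The four abstract duties listed above map naturally onto an interface that each profile (WIMSE, X.509, JWK set) implements. A sketch using a Python `Protocol` — the method names are illustrative, not from the draft:

```python
from typing import Any, Protocol, runtime_checkable

@runtime_checkable
class IdentityBinding(Protocol):
    """What any identity framework must provide to back L2/L3 verification."""

    def resolve_public_key(self, kid: str) -> Any:
        """Key association: map the JOSE 'kid' to a verification key."""
        ...

    def issuer_matches(self, iss: str, credential: Any) -> bool:
        """Issuer correspondence: does the ECT 'iss' belong to the credential?"""
        ...

    def algorithm_allowed(self, alg: str, credential: Any) -> bool:
        """Algorithm consistency: is the JWS 'alg' valid for the credential?"""
        ...

    def is_revoked(self, credential: Any) -> bool:
        """Revocation checking: has the credential been withdrawn?"""
        ...
```

A WIMSE profile would resolve keys through the WIT, an X.509 profile through the certificate chain plus OCSP, a JWK-set profile through a key directory; the verifier code stays the same.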
## What's Next
The -01 draft is open for working group feedback. Key questions for the community:
- Is the three-level structure the right granularity, or should there be intermediate levels?
- Should L3 ledger requirements reference SCITT (Supply Chain Integrity, Transparency, and Trust) more directly?
- Are the L1 restrictions (MUST NOT cross trust domain boundaries) sufficient, or should L1 be further constrained?
- Are the identity binding profiles (WIMSE, X.509, JWK set) sufficient, or should additional profiles be defined (e.g., OAuth client credentials, DID-based)?
The reference implementations (Go and Python) will be updated to support all three levels and the pluggable identity binding model. Community contributions and feedback are welcome — the specification is developed in the open, and the goal is a standard that works for real deployments, from internal microservices to regulated clinical AI systems.
---
*Christian Nennemann is an independent researcher focused on workload identity and execution accountability in agentic AI systems. The ECT specification originated within the IETF WIMSE working group and is designed for use with any identity framework.*

build.sh

@@ -1,22 +1,39 @@
 #!/bin/bash
 set -e
-DRAFT="draft-nennemann-wimse-ect-00"
 DIR="$(cd "$(dirname "$0")" && pwd)"
+SRC="$DIR/draft-nennemann-wimse-ect.md"
+# Extract docname from YAML front matter
+DRAFT=$(grep '^docname:' "$SRC" | head -1 | awk '{print $2}')
+if [ -z "$DRAFT" ]; then
+  echo "Error: could not extract docname from $SRC"
+  exit 1
+fi
 # Tool paths
-KRAMDOWN="/usr/local/lib/ruby/gems/3.4.0/bin/kramdown-rfc2629"
-XML2RFC="/Users/christian/Library/Python/3.9/bin/xml2rfc"
+KRAMDOWN="$(which kramdown-rfc2629 2>/dev/null)"
+XML2RFC="$(which xml2rfc 2>/dev/null)"
+if [ -z "$KRAMDOWN" ]; then
+  echo "Error: kramdown-rfc2629 not found. Install with: gem install kramdown-rfc2629"
+  exit 1
+fi
+if [ -z "$XML2RFC" ]; then
+  echo "Error: xml2rfc not found. Install with: pip install xml2rfc"
+  exit 1
+fi
+export PYTHONWARNINGS="ignore::UserWarning"
 echo "Building: $DRAFT"
+echo "Using kramdown-rfc2629: $KRAMDOWN"
+echo "Using xml2rfc: $XML2RFC"
+echo ""
 # Step 1: Markdown -> XML
 echo "Converting markdown to XML..."
-"$KRAMDOWN" "$DIR/$DRAFT.md" > "$DIR/$DRAFT.xml"
+"$KRAMDOWN" "$SRC" > "$DIR/$DRAFT.xml"
 # Step 2: XML -> TXT
 echo "Generating text output..."

File diff suppressed because it is too large


@@ -1,926 +0,0 @@
---
title: "Execution Context Tokens for Distributed Agentic Workflows"
abbrev: "WIMSE Execution Context"
category: std
docname: draft-nennemann-wimse-ect-00
submissiontype: IETF
number:
date:
v: 3
area: "ART"
workgroup: "WIMSE"
keyword:
- execution context
- workload identity
- agentic workflows
- audit trail
author:
  -
    fullname: Christian Nennemann
    organization: Independent Researcher
    email: ietf@nennemann.de
normative:
  RFC7515:
  RFC7517:
  RFC7519:
  RFC7518:
  RFC9562:
  RFC9110:
  I-D.ietf-wimse-arch:
  I-D.ietf-wimse-s2s-protocol:
informative:
  RFC8693:
  SPIFFE:
    title: "Secure Production Identity Framework for Everyone (SPIFFE)"
    target: https://spiffe.io/docs/latest/spiffe-about/overview/
    date: false
  OPENTELEMETRY:
    title: "OpenTelemetry Specification"
    target: https://opentelemetry.io/docs/specs/otel/
    date: false
    author:
      - org: Cloud Native Computing Foundation
  I-D.ietf-scitt-architecture:
  RFC9449:
  I-D.ietf-oauth-transaction-tokens:
  I-D.oauth-transaction-tokens-for-agents:
--- abstract
This document defines Execution Context Tokens (ECTs), a JWT-based
extension to the WIMSE architecture that records task execution
across distributed agentic workflows. Each ECT is a signed record
of a single task, linked to predecessor tasks through a directed
acyclic graph (DAG). ECTs reuse the WIMSE signing model and are
transported in a new Execution-Context HTTP header field alongside
existing WIMSE identity headers.
--- middle
# Introduction
The WIMSE framework {{I-D.ietf-wimse-arch}} and its service-to-
service protocol {{I-D.ietf-wimse-s2s-protocol}} authenticate
workloads across call chains but do not record what those
workloads actually did. This document defines Execution Context
Tokens (ECTs), a JWT-based extension that fills the gap between
workload identity and execution accountability. Each ECT is a
signed record of a single task, linked to predecessor tasks
through a directed acyclic graph (DAG).
## Scope and Applicability
This document defines:
- The Execution Context Token (ECT) format ({{ect-format}})
- DAG structure for task dependency ordering ({{dag-validation}})
- An HTTP header for ECT transport ({{http-header}})
- Audit ledger interface requirements ({{ledger-interface}})
The following are out of scope and are handled by WIMSE:
- Workload authentication and identity provisioning
- Key distribution and management
- Trust domain establishment and management
- Credential lifecycle management
# Conventions and Definitions
{::boilerplate bcp14-tagged}
The following terms are used in this document:
Agent:
: An autonomous workload, as defined by WIMSE
{{I-D.ietf-wimse-arch}}, that executes tasks within a workflow.
Task:
: A discrete unit of agent work that consumes inputs and produces
outputs.
Directed Acyclic Graph (DAG):
: A graph structure representing task dependency ordering where
edges are directed and no cycles exist.
Execution Context Token (ECT):
: A JSON Web Token {{RFC7519}} defined by this specification that
records task execution details.
Audit Ledger:
: An append-only, immutable log of all ECTs within a workflow or
set of workflows, used for audit and verification.
Workload Identity Token (WIT):
: A WIMSE credential proving a workload's identity within a trust
domain.
Workload Proof Token (WPT):
: A WIMSE proof-of-possession token used for request-level
authentication.
Trust Domain:
: A WIMSE concept representing an organizational boundary with a
shared identity issuer, corresponding to a SPIFFE {{SPIFFE}}
trust domain.
# Execution Context Token Format {#ect-format}
An Execution Context Token is a JSON Web Token (JWT) {{RFC7519}}
signed as a JSON Web Signature (JWS) {{RFC7515}}. ECTs MUST use
JWS Compact Serialization (the base64url-encoded
`header.payload.signature` format) so that they can be carried in
a single HTTP header value.
ECTs reuse the WIMSE signing model. The ECT MUST be signed with
the same private key associated with the agent's WIT. The JOSE
header "kid" parameter MUST reference the public key identifier
from the agent's WIT, and the "alg" parameter MUST match the
algorithm used in the corresponding WIT. In WIMSE deployments,
the ECT "iss" claim SHOULD use the WIMSE workload identifier
format (a SPIFFE ID {{SPIFFE}}).
## JOSE Header {#jose-header}
The ECT JOSE header MUST contain the following parameters:
~~~json
{
  "alg": "ES256",
  "typ": "wimse-exec+jwt",
  "kid": "agent-a-key-id-123"
}
~~~
{: #fig-header title="ECT JOSE Header Example"}
alg:
: REQUIRED. The digital signature algorithm used to sign the ECT.
MUST match the algorithm in the corresponding WIT.
Implementations MUST support ES256 {{RFC7518}}. The "alg"
value MUST NOT be "none". Symmetric algorithms (e.g., HS256,
HS384, HS512) MUST NOT be used, as ECTs require asymmetric
signatures for non-repudiation.
typ:
: REQUIRED. MUST be set to "wimse-exec+jwt" to distinguish ECTs
from other JWT types, consistent with the WIMSE convention for
type parameter values.
kid:
: REQUIRED. The key identifier referencing the public key from
the agent's WIT {{RFC7517}}. Used by verifiers to look up the
correct public key for signature verification.
## JWT Claims {#jwt-claims}
### Standard JWT Claims
An ECT MUST contain the following standard JWT claims {{RFC7519}}:
iss:
: REQUIRED. StringOrURI. A URI identifying the issuer of the
ECT. In WIMSE deployments, this SHOULD be the workload's
SPIFFE ID in the format `spiffe://<trust-domain>/<path>`,
matching the "sub" claim of the agent's WIT. Non-WIMSE
deployments MAY use other URI schemes (e.g., HTTPS URLs or
URN:UUID identifiers).
aud:
: REQUIRED. StringOrURI or array of StringOrURI. The intended
recipient(s) of the ECT. The "aud" claim SHOULD contain the
identifiers of all entities that will verify the ECT. When
an ECT must be verified by both the next agent and the audit
ledger independently, "aud" MUST be an array containing both
identifiers. Each verifier checks that its own identity
appears in "aud".
iat:
: REQUIRED. NumericDate. The time at which the ECT was issued.
exp:
: REQUIRED. NumericDate. The expiration time of the ECT.
Implementations SHOULD set this to 5 to 15 minutes after "iat".
jti:
: REQUIRED. String. A unique identifier for both the ECT and
the task it records, in UUID format {{RFC9562}}. The "jti"
serves as both the token identifier (for replay detection) and
the task identifier (for DAG parent references in "par").
Receivers MUST reject ECTs whose "jti" has already been seen
within the expiration window. When "wid" is present,
uniqueness is scoped to the workflow; when "wid" is absent,
uniqueness MUST be enforced globally across the ECT store.
### Execution Context {#exec-claims}
The following claims are defined by this specification:
wid:
: OPTIONAL. String. A workflow identifier that groups related
ECTs into a single workflow. When present, MUST be a UUID
{{RFC9562}}.
exec_act:
: REQUIRED. String. The action or task type identifier describing
what the agent performed (e.g., "process_payment",
"validate_safety"). This claim name avoids collision with the
"act" (Actor) claim registered by {{RFC8693}}.
par:
: REQUIRED. Array of strings. Parent task identifiers
representing DAG dependencies. Each element MUST be the "jti"
value of a previously verified ECT. An empty array indicates
a root task with no dependencies. A workflow MAY contain
multiple root tasks.
### Data Integrity {#data-integrity-claims}
The following claims provide integrity verification for task
inputs and outputs without revealing the data itself:
inp_hash:
: OPTIONAL. String. The base64url encoding (without padding) of
the SHA-256 hash of the input data, computed over the raw octets
of the input. SHA-256 is the mandatory hash algorithm; the value
carries no algorithm prefix, consistent with {{RFC9449}} and
{{I-D.ietf-wimse-s2s-protocol}}.
out_hash:
: OPTIONAL. String. The base64url encoding (without padding) of
the SHA-256 hash of the output data, using the same format as
"inp_hash".
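As a non-normative illustration, a conforming hash value needs nothing
beyond SHA-256 and unpadded base64url encoding (the helper name below is
illustrative, not part of this specification):

```python
import base64
import hashlib

def ect_data_hash(data: bytes) -> str:
    """Compute an inp_hash/out_hash value: unpadded base64url of SHA-256(data)."""
    digest = hashlib.sha256(data).digest()
    return base64.urlsafe_b64encode(digest).rstrip(b"=").decode("ascii")

# The inp_hash value in the complete payload example of this document
# is the hash of the ASCII octets "test":
print(ect_data_hash(b"test"))  # n4bQgYhMfWWaL-qgxVrQFaO_TxsrC4Is0V1sFbDwCgg
```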
### Extensions {#extension-claims}
ext:
: OPTIONAL. Object. A general-purpose extension object for
domain-specific claims not defined by this specification.
Implementations that do not understand extension claims MUST
ignore them. Extension key names SHOULD use reverse domain
notation (e.g., "com.example.custom_field") to avoid
collisions. The serialized "ext" object SHOULD NOT exceed
4096 bytes and SHOULD NOT exceed a nesting depth of 5 levels.
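The size and depth limits can be enforced with a small check. The Python
sketch below is non-normative; its depth-counting convention (a flat
object counts as one level) is an assumption of the sketch:

```python
import json

MAX_EXT_BYTES = 4096   # serialized "ext" SHOULD NOT exceed this
MAX_EXT_DEPTH = 5      # nesting SHOULD NOT exceed this many levels

def json_depth(value) -> int:
    """Nesting depth of a JSON value; a flat object or array counts as 1."""
    if isinstance(value, dict):
        return 1 + max((json_depth(v) for v in value.values()), default=0)
    if isinstance(value, list):
        return 1 + max((json_depth(v) for v in value), default=0)
    return 0

def ext_within_limits(ext: dict) -> bool:
    serialized = json.dumps(ext, separators=(",", ":")).encode("utf-8")
    return len(serialized) <= MAX_EXT_BYTES and json_depth(ext) <= MAX_EXT_DEPTH
```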
## Complete ECT Example
The following is a complete ECT payload example:
~~~json
{
"iss": "spiffe://example.com/agent/clinical",
"aud": "spiffe://example.com/agent/safety",
"iat": 1772064150,
"exp": 1772064750,
"jti": "550e8400-e29b-41d4-a716-446655440001",
"wid": "a0b1c2d3-e4f5-6789-abcd-ef0123456789",
"exec_act": "recommend_treatment",
"par": [],
"inp_hash": "n4bQgYhMfWWaL-qgxVrQFaO_TxsrC4Is0V1sFbDwCgg",
"out_hash": "LCa0a2j_xo_5m0U8HTBBNBNCLXBkg7-g-YpeiGJm564",
"ext": {
"com.example.trace_id": "abc123"
}
}
~~~
{: #fig-full-ect title="Complete ECT Payload Example"}
# HTTP Header Transport {#http-header}
## Execution-Context Header Field
This specification defines the Execution-Context HTTP header field
{{RFC9110}} for transporting ECTs between agents.
The header field value is the ECT in JWS Compact Serialization
format {{RFC7515}}. The value consists of three Base64url-encoded
parts separated by period (".") characters.
An agent sending a request to another agent includes the
Execution-Context header alongside the WIMSE Workload-Identity
header. When a Workload Proof Token (WPT) is available per
{{I-D.ietf-wimse-s2s-protocol}}, agents SHOULD include it
alongside the WIT and ECT.
~~~
GET /api/safety-check HTTP/1.1
Host: safety-agent.example.com
Workload-Identity: eyJhbGci...WIT...
Execution-Context: eyJhbGci...ECT...
~~~
{: #fig-http-example title="HTTP Request with ECT Header"}
When multiple parent tasks contribute context to a single request,
multiple Execution-Context header field lines MAY be included, each
carrying a separate ECT in JWS Compact Serialization format.
When a receiver processes multiple Execution-Context headers, it
MUST individually verify each ECT per the procedure in
{{verification}}. If any single ECT fails verification, the
receiver MUST reject the entire request. The set of verified
parent task IDs across all received ECTs represents the complete
set of parent dependencies available for the receiving agent's
subsequent ECT.
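Fail-closed handling of multiple header lines can be sketched as
follows; `verify_ect` is a hypothetical stand-in for the full
verification procedure of {{verification}}:

```python
def verify_execution_context_headers(header_values, verify_ect):
    """Verify every Execution-Context value; any failure rejects the request.

    verify_ect: callable returning the verified payload dict, or raising
    on any verification failure.
    """
    payloads = []
    for compact in header_values:
        # A single invalid ECT aborts processing of the whole request.
        payloads.append(verify_ect(compact))
    # The verified jtis form the parent set for the receiver's own ECT.
    return {p["jti"] for p in payloads}
```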
# DAG Validation {#dag-validation}
ECTs form a Directed Acyclic Graph (DAG) where each task
references its parent tasks via the "par" claim. DAG validation
is performed against the ECT store — either an audit ledger or
the set of parent ECTs received inline.
When receiving and verifying an ECT, implementations MUST perform
the following DAG validation steps:
1. Task ID Uniqueness: The "jti" claim MUST be unique within the
applicable scope (the workflow identified by "wid", or the
entire ECT store if "wid" is absent). If an ECT with the same
"jti" already exists, the ECT MUST be rejected.
2. Parent Existence: Every task identifier listed in the "par"
array MUST correspond to a task that is available in the ECT
store (either previously recorded in the ledger or received
inline as a verified parent ECT). If any parent task is not
found, the ECT MUST be rejected.
3. Temporal Ordering: The "iat" value of every parent task MUST
NOT be greater than the "iat" value of the current task plus a
configurable clock skew tolerance (RECOMMENDED: 30 seconds).
That is, for each parent: `parent.iat <= child.iat +
clock_skew_tolerance`. The tolerance accounts for clock skew
between agents; it does not guarantee strict causal ordering
from timestamps alone. Causal ordering is primarily enforced
by the DAG structure (parent existence in the ECT store), not by
timestamps. If any parent task violates this constraint, the
ECT MUST be rejected.
4. Acyclicity: Following the chain of parent references MUST NOT
lead back to the current ECT's "jti". If a cycle is detected,
the ECT MUST be rejected.
5. Trust Domain Consistency: Parent tasks SHOULD belong to the
same trust domain or to a trust domain with which a federation
relationship has been established.
To prevent denial-of-service via extremely deep or wide DAGs,
implementations SHOULD enforce a maximum ancestor traversal limit
(RECOMMENDED: 10000 nodes). If the limit is reached before cycle
detection completes, the ECT SHOULD be rejected.
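The five steps, together with the traversal limit, condense into a short
routine. This Python sketch is non-normative; `store` is assumed to map
"jti" values to previously verified payload dicts, and the
deployment-specific trust domain check (step 5) is omitted:

```python
CLOCK_SKEW_TOLERANCE = 30   # seconds (RECOMMENDED)
MAX_TRAVERSAL = 10000       # ancestor nodes (RECOMMENDED)

def validate_dag(ect: dict, store: dict) -> None:
    if ect["jti"] in store:                                    # step 1
        raise ValueError("duplicate jti")
    for parent_id in ect["par"]:
        parent = store.get(parent_id)
        if parent is None:                                     # step 2
            raise ValueError("unknown parent")
        if parent["iat"] > ect["iat"] + CLOCK_SKEW_TOLERANCE:  # step 3
            raise ValueError("parent issued after child")
    seen, frontier = set(), list(ect["par"])                   # step 4
    while frontier:
        if len(seen) > MAX_TRAVERSAL:
            raise ValueError("traversal limit exceeded")
        node = frontier.pop()
        if node == ect["jti"]:
            raise ValueError("cycle detected")
        if node in seen:
            continue
        seen.add(node)
        frontier.extend(store.get(node, {}).get("par", []))
```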
In distributed deployments, a parent ECT may not yet be available
locally due to replication lag. Implementations MAY defer
validation to allow parent ECTs to arrive, but MUST NOT treat
the ECT as verified until all parent references are resolved.
# Signature and Token Verification {#verification}
## Verification Procedure
When an agent receives an ECT, it MUST perform the following
verification steps in order:
1. Parse the JWS Compact Serialization to extract the JOSE header,
payload, and signature components per {{RFC7515}}.
2. Verify that the "typ" header parameter is "wimse-exec+jwt".
3. Verify that the "alg" header parameter appears in the
verifier's configured allowlist of accepted signing algorithms.
The allowlist MUST NOT include "none" or any symmetric
algorithm (e.g., HS256, HS384, HS512). Implementations MUST
include ES256 in the allowlist; additional asymmetric algorithms
MAY be included per deployment policy.
4. Verify the "kid" header parameter references a known, valid
public key from a WIT within the trust domain.
5. Retrieve the public key identified by "kid" and verify the JWS
signature per {{RFC7515}} Section 5.2.
6. Verify that the signing key identified by "kid" has not been
revoked within the trust domain. Implementations MUST check
the key's revocation status using the trust domain's key
lifecycle mechanism (e.g., certificate revocation list, OCSP,
or SPIFFE trust bundle updates).
7. Verify the "alg" header parameter matches the algorithm in the
corresponding WIT.
8. Verify the "iss" claim matches the "sub" claim of the WIT
associated with the "kid" public key.
9. Verify the "aud" claim contains the verifier's own workload
identity. When "aud" is an array, it is sufficient that the
verifier's identity appears as one element; the presence of
other audience values does not cause verification failure.
When the verifier is the audit ledger, the ledger's own
identity MUST appear in "aud".
10. Verify the "exp" claim indicates the ECT has not expired.
11. Verify the "iat" claim is not unreasonably far in the past
(implementation-specific threshold, RECOMMENDED maximum of
15 minutes) and is not unreasonably far in the future
(RECOMMENDED: no more than 30 seconds ahead of the
verifier's current time, to account for clock skew).
12. Verify all required claims ("jti", "exec_act", "par") are
present and well-formed.
13. Perform DAG validation per {{dag-validation}}.
14. If all checks pass and an audit ledger is deployed, the ECT
SHOULD be appended to the ledger.
If any verification step fails, the ECT MUST be rejected and the
failure MUST be logged for audit purposes. Error messages
SHOULD NOT reveal whether specific parent task IDs exist in the
ECT store, to prevent information disclosure.
When ECT verification fails during HTTP request processing, the
receiving agent SHOULD respond with HTTP 403 (Forbidden) when the
WIT is valid but the ECT fails any verification step (including
ECT signature verification), and HTTP 401 (Unauthorized) when the
WIT itself is missing or invalid. The
response body SHOULD include a generic error indicator without
revealing which specific verification step failed. The receiving
agent MUST NOT process the requested action when ECT verification
fails.
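As a non-normative illustration, the audience, lifetime, and
required-claim checks (steps 9 through 12) reduce to a few comparisons;
the constant names and the injectable `now` parameter are artifacts of
the sketch, not of this specification:

```python
import time

MAX_IAT_AGE = 15 * 60   # RECOMMENDED maximum iat age (seconds)
MAX_IAT_SKEW = 30       # RECOMMENDED future tolerance (seconds)

def check_claims(payload: dict, own_identity: str, now=None) -> None:
    now = time.time() if now is None else now
    aud = payload["aud"]
    audiences = aud if isinstance(aud, list) else [aud]
    if own_identity not in audiences:                 # step 9
        raise ValueError("verifier not in aud")
    if now >= payload["exp"]:                         # step 10
        raise ValueError("ECT expired")
    if not (now - MAX_IAT_AGE <= payload["iat"] <= now + MAX_IAT_SKEW):
        raise ValueError("iat out of range")          # step 11
    for claim in ("jti", "exec_act", "par"):          # step 12
        if claim not in payload:
            raise ValueError("missing claim: " + claim)
```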
# Audit Ledger Interface {#ledger-interface}
ECTs MAY be recorded in an immutable audit ledger for compliance
verification and post-hoc analysis. A ledger is RECOMMENDED for
regulated environments but is not required for point-to-point
operation. This specification does not mandate a specific storage
technology. Implementations MAY use append-only logs, databases
with cryptographic commitment schemes, distributed ledgers, or
any storage mechanism that provides the required properties.
When an audit ledger is deployed, the implementation MUST provide:
1. Append-only semantics: Once an ECT is recorded, it MUST NOT be
modified or deleted.
2. Ordering: The ledger MUST maintain a total ordering of ECT
entries via a monotonically increasing sequence number.
3. Lookup by ECT ID: The ledger MUST support efficient retrieval
of ECT entries by "jti" value.
4. Integrity verification: The ledger SHOULD provide a mechanism
to verify that no entries have been tampered with (e.g.,
hash chains or Merkle trees).
The ledger SHOULD be maintained by an entity independent of the
workflow agents to reduce the risk of collusion.
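A minimal in-memory sketch of the four properties — append-only
semantics, total ordering, jti lookup, and hash-chain integrity — might
look as follows (class and method names are illustrative only):

```python
import hashlib
import json

class InMemoryLedger:
    """Append-only ledger sketch with a SHA-256 hash chain."""

    def __init__(self):
        self._entries = []   # ordered by sequence number
        self._by_jti = {}

    def append(self, ect_payload: dict) -> int:
        jti = ect_payload["jti"]
        if jti in self._by_jti:
            raise ValueError("jti already recorded")   # no overwrite
        prev_hash = self._entries[-1]["entry_hash"] if self._entries else "0" * 64
        body = json.dumps(ect_payload, sort_keys=True, separators=(",", ":"))
        entry_hash = hashlib.sha256((prev_hash + body).encode()).hexdigest()
        seq = len(self._entries)
        self._entries.append({"seq": seq, "payload": ect_payload,
                              "prev_hash": prev_hash, "entry_hash": entry_hash})
        self._by_jti[jti] = seq
        return seq

    def get(self, jti: str):
        seq = self._by_jti.get(jti)
        return None if seq is None else self._entries[seq]["payload"]

    def verify_chain(self) -> bool:
        prev = "0" * 64
        for e in self._entries:
            body = json.dumps(e["payload"], sort_keys=True, separators=(",", ":"))
            good = hashlib.sha256((prev + body).encode()).hexdigest()
            if e["prev_hash"] != prev or e["entry_hash"] != good:
                return False
            prev = e["entry_hash"]
        return True
```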
# Security Considerations
## Threat Model
The threat model considers: (1) a malicious agent that creates
false ECT claims, (2) an agent whose private key has been
compromised, (3) a ledger tamperer attempting to modify recorded
entries, and (4) a time manipulator altering timestamps to affect
perceived ordering.
## Self-Assertion Limitation {#self-assertion-limitation}
ECTs are self-asserted by the executing agent. The agent claims
what it did, and this claim is signed with its private key. A
compromised or malicious agent could create ECTs with false claims
(e.g., claiming an action was performed when it was not).
ECTs do not independently verify that:
- The claimed execution actually occurred as described
- The input/output hashes correspond to the actual data processed
- The agent faithfully performed the stated action
The trustworthiness of ECT claims depends on the trustworthiness
of the signing agent and the integrity of the broader deployment
environment. ECTs provide a technical mechanism for execution
recording; they do not by themselves satisfy any specific
regulatory compliance requirement.
## Signature Verification
ECTs MUST be signed with the agent's private key using JWS
{{RFC7515}}. The signature algorithm MUST match the algorithm
specified in the agent's WIT. Receivers MUST verify the ECT
signature against the WIT public key before processing any
claims. Receivers MUST verify that the signing key has not been
revoked within the trust domain (see step 6 in
{{verification}}).
If signature verification fails or if the signing key has been
revoked, the ECT MUST be rejected entirely and the failure MUST
be logged.
Implementations MUST use established JWS libraries and MUST NOT
implement custom signature verification.
## Replay Attack Prevention
ECTs include short expiration times (RECOMMENDED: 5-15 minutes)
and audience restriction via "aud" to limit replay attacks.
Implementations MUST maintain a cache of recently-seen "jti"
values and MUST reject ECTs with duplicate "jti" values. Each
ECT is cryptographically bound to the issuing agent via "kid";
verifiers MUST confirm that "kid" resolves to the "iss" agent's
key (step 8 in {{verification}}).
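A jti cache with expiry-bounded retention can be sketched as below; the
class name is illustrative and the injectable `now` exists only for
testing:

```python
import time

class JtiCache:
    """Remember each jti until its ECT's exp passes; reject duplicates."""

    def __init__(self):
        self._seen = {}   # jti -> exp (NumericDate)

    def check_and_store(self, jti: str, exp: float, now=None) -> None:
        now = time.time() if now is None else now
        for old, old_exp in list(self._seen.items()):   # evict expired entries
            if old_exp <= now:
                del self._seen[old]
        if jti in self._seen:
            raise ValueError("replayed jti")
        self._seen[jti] = exp
```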
## Man-in-the-Middle Protection
ECTs MUST be transmitted over TLS or mTLS connections. When used
with {{I-D.ietf-wimse-s2s-protocol}}, transport security is
already established.
## Key Compromise
If an agent's private key is compromised, an attacker can forge
ECTs that appear to originate from that agent. Mitigations:
- Implementations SHOULD use short-lived keys and rotate them
frequently.
- Private keys SHOULD be stored in hardware security modules or
equivalent secure key storage.
- Trust domains MUST support rapid key revocation.
ECTs recorded before key revocation remain valid historical
records but SHOULD be flagged for audit purposes. New ECTs
MUST NOT reference a parent ECT whose signing key is known to
be revoked at creation time.
## Collusion and DAG Integrity {#collusion-and-false-claims}
A single malicious agent cannot forge parent task references
because DAG validation requires parent tasks to exist in the ECT
store. However, multiple colluding agents could create a false
execution history. Additionally, a malicious agent may omit
actual parent dependencies from "par" to hide influences on its
output; because ECTs are self-asserted
({{self-assertion-limitation}}), no mechanism can force complete
dependency declaration.
Mitigations include:
- The ledger SHOULD be maintained by an entity independent of the
workflow agents.
- Multiple independent ledger replicas can be compared for
consistency.
- External auditors can compare the declared DAG against expected
workflow patterns.
Verifiers SHOULD validate that the declared "wid" of parent ECTs
matches the "wid" of the child ECT, rejecting cross-workflow
parent references unless explicitly permitted by deployment
policy.
## Privilege Escalation via ECTs
ECTs record execution history; they do not convey authorization.
Verifiers MUST NOT interpret the presence of an ECT, or a
particular set of parent references in "par", as an authorization
grant. Authorization decisions MUST remain with the identity and
authorization layer (WIT, WPT, and deployment policy).
## Denial of Service
Implementations SHOULD apply rate limiting to prevent excessive
ECT submissions. DAG validation SHOULD be performed after
signature verification to avoid wasting resources on unsigned
tokens.
## Timestamp Accuracy
Implementations SHOULD use synchronized time sources (e.g., NTP)
and SHOULD allow a configurable clock skew tolerance (RECOMMENDED:
30 seconds). Cross-organizational deployments MAY require a
higher tolerance and SHOULD document the configured value.
## ECT Size Constraints
Implementations SHOULD limit the "par" array to a maximum of
256 entries. See {{extension-claims}} for "ext" size limits.
# Privacy Considerations
## Data Exposure in ECTs
ECTs necessarily reveal:
- Agent identities ("iss", "aud") for accountability purposes
- Action descriptions ("exec_act") for audit trail completeness
- Timestamps ("iat", "exp") for temporal ordering
ECTs are designed to NOT reveal:
- Actual input or output data values (replaced with cryptographic
hashes via "inp_hash" and "out_hash")
- Internal computation details or intermediate steps
- Proprietary algorithms or intellectual property
- Personally identifiable information (PII)
## Data Minimization {#data-minimization}
Implementations SHOULD minimize the information included in ECTs.
The "exec_act" claim SHOULD use structured identifiers (e.g.,
"process_payment") rather than natural language descriptions.
Extension keys in "ext" ({{extension-claims}}) deserve particular
attention: human-readable values risk exposing sensitive
operational details, so structured identifiers SHOULD be used
there as well.
## Storage and Access Control
ECTs stored in audit ledgers SHOULD be access-controlled so that
only authorized auditors can read them. Implementations SHOULD
consider encryption at rest for ledger storage. ECTs provide
structural records of execution ordering; they are not intended
for public disclosure.
Full input and output data (corresponding to the hashes in ECTs)
SHOULD be stored separately from the ledger with additional access
controls, since auditors may need to verify hash correctness but
general access to the data values is not needed.
# IANA Considerations
## Media Type Registration
This document requests registration of the following media type
in the "Media Types" registry maintained by IANA:
Type name:
: application
Subtype name:
: wimse-exec+jwt
Required parameters:
: none
Optional parameters:
: none
Encoding considerations:
: 8bit; an ECT is a JWT that is a JWS using the Compact
Serialization, which is a sequence of Base64url-encoded values
separated by period characters.
Security considerations:
: See the Security Considerations section of this document.
Interoperability considerations:
: none
Published specification:
: This document
Applications that use this media type:
: Applications that implement agentic workflows requiring execution
context tracing and audit trails.
Additional information:
: Magic number(s): none
File extension(s): none
Macintosh file type code(s): none
Person and email address to contact for further information:
: Christian Nennemann, ietf@nennemann.de
Intended usage:
: COMMON
Restrictions on usage:
: none
Author:
: Christian Nennemann
Change controller:
: IETF
## HTTP Header Field Registration {#header-registration}
This document requests registration of the following header field
in the "Hypertext Transfer Protocol (HTTP) Field Name Registry"
maintained by IANA:
Field name:
: Execution-Context
Status:
: permanent
Specification document:
: This document, {{http-header}}
## JWT Claims Registration {#claims-registration}
This document requests registration of the following claims in
the "JSON Web Token Claims" registry maintained by IANA:
| Claim Name | Claim Description | Change Controller | Reference |
|:---:|:---|:---:|:---:|
| wid | Workflow Identifier | IETF | {{exec-claims}} |
| exec_act | Action/Task Type | IETF | {{exec-claims}} |
| par | Parent Task Identifiers | IETF | {{exec-claims}} |
| inp_hash | Input Data Hash | IETF | {{data-integrity-claims}} |
| out_hash | Output Data Hash | IETF | {{data-integrity-claims}} |
| ext | Extension Object | IETF | {{extension-claims}} |
{: #table-claims title="JWT Claims Registrations"}
--- back
# Use Cases {#use-cases}
{:numbered="false"}
This section describes a representative use case demonstrating how
ECTs provide structured execution records.
Note: task identifiers in this section are abbreviated for
readability. In production, all "jti" values are required to be
UUIDs per {{exec-claims}}.
## Cross-Organization Financial Trading
{:numbered="false"}
In a cross-organization trading workflow, an investment bank's
agents coordinate with an external credit rating agency. The
agents operate in separate trust domains with a federation
relationship. The DAG records that independent assessments from
both organizations were completed before trade execution.
~~~
Trust Domain: bank.example

  Agent A1 (Portfolio Risk):
    jti: task-001   par: []
    iss: spiffe://bank.example/agent/risk
    exec_act: analyze_portfolio_risk

Trust Domain: ratings.example (external)

  Agent B1 (Credit Rating):
    jti: task-002   par: []
    iss: spiffe://ratings.example/agent/credit
    exec_act: assess_credit_rating

Trust Domain: bank.example

  Agent A2 (Compliance):
    jti: task-003   par: [task-001, task-002]
    iss: spiffe://bank.example/agent/compliance
    exec_act: verify_trade_compliance

  Agent A3 (Execution):
    jti: task-004   par: [task-003]
    iss: spiffe://bank.example/agent/execution
    exec_act: execute_trade
~~~
{: #fig-finance title="Cross-Organization Trading Workflow"}
The resulting DAG:
~~~
task-001 (analyze_portfolio_risk)    task-002 (assess_credit_rating)
        [bank.example]                     [ratings.example]
                  \                           /
                   v                         v
             task-003 (verify_trade_compliance)
                       [bank.example]
                             |
                             v
                  task-004 (execute_trade)
                       [bank.example]
~~~
{: #fig-finance-dag title="Cross-Organization DAG"}
Task 003 has two parents from different trust domains,
demonstrating cross-organizational fan-in. The compliance agent
verifies both parent ECTs — one signed by a local key and one by
a federated key from the rating agency's trust domain.
# Related Work
{:numbered="false"}
## WIMSE Workload Identity
{:numbered="false"}
The WIMSE architecture {{I-D.ietf-wimse-arch}} and service-to-
service protocol {{I-D.ietf-wimse-s2s-protocol}} provide the
identity foundation upon which ECTs are built. WIT/WPT answer
"who is this agent?" and "does it control the claimed key?" while
ECTs record "what did this agent do?" Together they form an
identity-plus-accountability framework for regulated agentic
systems.
## OAuth 2.0 Token Exchange and the "act" Claim
{:numbered="false"}
{{RFC8693}} defines the OAuth 2.0 Token Exchange protocol and
registers the "act" (Actor) claim in the JWT Claims registry.
The "act" claim creates nested JSON objects representing a
delegation chain: "who is acting on behalf of whom." While
the nesting superficially resembles a chain, it is strictly
linear (each "act" object contains at most one nested "act"),
represents authorization delegation rather than task execution,
and carries no task identifiers or input/output integrity
data. The "act" chain cannot represent
branching (fan-out) or convergence (fan-in) and therefore
cannot form a DAG.
ECTs intentionally use the distinct claim name "exec_act" for the
action/task type to avoid collision with the "act" claim. The
two concepts are orthogonal: "act" records "who authorized whom,"
ECTs record "what was done, in what order."
## Transaction Tokens
{:numbered="false"}
OAuth Transaction Tokens {{I-D.ietf-oauth-transaction-tokens}}
propagate authorization context across workload call chains.
The Txn-Token "req_wl" claim accumulates a comma-separated list
of workloads that requested replacement tokens, which is the
closest existing mechanism to call-chain recording.
However, "req_wl" cannot form a DAG because:
- It is linear: a comma-separated string with no branching or
merging representation. When a workload fans out to multiple
downstream services, each receives the same "req_wl" value and
the branching is invisible.
- It is incomplete: only workloads that request a replacement
token from the Transaction Token Service appear in "req_wl";
workloads that forward the token unchanged are not recorded.
- It carries no task-level granularity, no parent references,
and no execution content.
- It cannot represent convergence (fan-in): when two independent
paths must both complete before a dependent task proceeds, a
linear "req_wl" string cannot express that relationship.
Extensions for agentic use cases
({{I-D.oauth-transaction-tokens-for-agents}}) add agent
identity and constraints ("agentic_ctx") but no execution
ordering or DAG structure.
ECTs and Transaction Tokens are complementary: a Txn-Token
propagates authorization context ("this request is authorized
for scope X on behalf of user Y"), while an ECT records
execution accountability ("task T was performed, depending on
tasks P1 and P2"). An
agent request could carry both a Txn-Token for authorization
and an ECT for execution recording. The WPT "tth" claim
defined in {{I-D.ietf-wimse-s2s-protocol}} can hash-bind a
WPT to a co-present Txn-Token; a similar binding mechanism
for ECTs is a potential future extension.
## Distributed Tracing (OpenTelemetry)
{:numbered="false"}
OpenTelemetry {{OPENTELEMETRY}} and similar distributed tracing
systems provide observability for debugging and monitoring. ECTs
differ in several important ways: ECTs are cryptographically
signed per-task with the agent's private key; ECTs are
tamper-evident through JWS signatures; ECTs enforce DAG validation
rules; and ECTs are designed for regulatory audit rather than
operational monitoring. OpenTelemetry data is typically controlled
by the platform operator and can be modified or deleted without
detection. ECTs and distributed traces are complementary: traces
provide observability while ECTs provide signed execution records.
ECTs may reference OpenTelemetry trace identifiers in the "ext"
claim for correlation.
## W3C Provenance Data Model (PROV)
{:numbered="false"}
The W3C PROV Data Model defines an Entity-Activity-Agent ontology
for representing provenance information. PROV's concepts map
closely to ECT structures: PROV Activities correspond to ECT
tasks, PROV Agents correspond to WIMSE workloads, and PROV's
"wasInformedBy" relation corresponds to ECT "par" references.
However, PROV uses RDF/OWL ontologies designed for post-hoc
documentation, while ECTs are runtime-embeddable JWT tokens with
cryptographic signatures. ECT audit data could be exported to
PROV format for interoperability with provenance-aware systems.
## SCITT (Supply Chain Integrity, Transparency, and Trust)
{:numbered="false"}
The SCITT architecture {{I-D.ietf-scitt-architecture}} defines a
framework for transparent and auditable supply chain records.
ECTs and SCITT are complementary: the ECT "wid" claim can serve
as a correlation identifier in SCITT Signed Statements, linking
an ECT audit trail to a supply chain transparency record.
# Acknowledgments
{:numbered="false"}
The author thanks the WIMSE working group for their foundational
work on workload identity in multi-system environments. The
concepts of Workload Identity Tokens and Workload Proof Tokens
provide the identity foundation upon which execution context
tracing is built.

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

1839
draft-nennemann-wimse-ect.md Normal file

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -7,7 +7,7 @@ Suggestions that could make the implementations more robust, spec-strict, or pro
## 1. **Spec alignment** ✅
- **ext size/depth (Section 4.2.7)**
**Done.** Both refimpls reject when serialized `ext` exceeds 4096 bytes or JSON depth exceeds 5 (`ValidateExt` / `validate_ext`). Used in create and verify.
**Done.** Both refimpls reject when serialized `ect_ext` exceeds 4096 bytes or JSON depth exceeds 5 (`ValidateExt` / `validate_ext`). Used in create and verify.
- **jti / wid format**
**Done.** Optional UUID (RFC 9562) validation: `CreateOptions.ValidateUUIDs` / `VerifyOptions.ValidateUUIDs` (Go), `validate_uuids` (Python). Helpers: `ValidUUID` / `valid_uuid`.
@@ -50,7 +50,7 @@ Suggestions that could make the implementations more robust, spec-strict, or pro
## 5. **Nice-to-have** ✅
- **inp_hash / out_hash format**
**Done.** Optional check in create and verify: `algorithm:base64url` with algorithm in allowlist (sha-256, sha-384, sha-512). Helpers: `ValidateHashFormat` / `validate_hash_format`.
**Done.** Optional check in create and verify: plain base64url without algorithm prefix, per -01 spec and RFC 9449. Helpers: `ValidateHashFormat` / `validate_hash_format`.
- **Constant-time comparison**
**Done.** **Go:** `crypto/subtle.ConstantTimeCompare` for `typ` in verify. **Python:** `hmac.compare_digest` for `typ`.
@@ -58,3 +58,18 @@ Suggestions that could make the implementations more robust, spec-strict, or pro
---
**Summary:** All listed improvements are implemented. For production, also consider: key rotation, WIT integration, and metrics around verify/create latency and error kinds.
---
## 6. **draft-01 migration** (PARTIALLY IMPLEMENTED)
The refimpl was built against draft-nennemann-wimse-ect-00. The -01 draft introduced breaking changes:
- **Rename `par` to `pred`**: ✅ **Done.** Struct fields, JSON tags, serialization/deserialization, tests, testdata, READMEs updated in both Go and Python.
- **Remove `pol` and `pol_decision`**: ✅ **Done.** Policy claims removed from core Payload. DAG policy checks now read from `ect_ext`. Tests and demos updated to use ext.
- **Remove `sub`**: ✅ **Done.** Removed from Payload struct (Go) and dataclass (Python). Create no longer defaults sub=iss.
- **Update `typ` default**: ✅ **Done.** `exec+jwt` is now preferred; `wimse-exec+jwt` accepted for backward compat. Verify checks both (constant-time).
- **Update `MaxParLength` naming**: ✅ **Done.** Renamed to `MaxPredLength` / `max_pred_length` everywhere.
- **Add L1 support**: The -01 draft introduces unsigned JSON ECTs (Level 1). The refimpl currently only supports L2 (signed JWS).
- **Add L3 support**: The -01 draft introduces audit ledger requirements for Level 3. The existing in-memory ledger needs hash chain and receipt support.
- **Update hash format**: ✅ **Done.** Both Go and Python validate plain base64url without algorithm prefix, consistent with -01 spec and RFC 9449.

View File

@@ -1,6 +1,18 @@
# WIMSE Execution Context Tokens — Reference Implementations
This directory contains **reference implementations** of [Execution Context Tokens (ECTs)](../draft-nennemann-wimse-execution-context-00.txt) for the WIMSE (Workload Identity in Multi System Environments) draft. Each refimpl provides ECT creation, verification, DAG validation, and an in-memory audit ledger.
> These reference implementations are aligned with **draft-nennemann-wimse-ect-01**.
>
> The following claim name changes from -00 have been applied:
>
> | -00 (previous) | -01 (current) | Notes |
> |----------------|---------------|-------|
> | `par` | `pred` | Predecessor task IDs |
> | `pol`, `pol_decision` | removed (use `ect_ext`) | Policy claims moved to extension object |
> | `sub` | not defined | Standard JWT claim, not part of ECT spec |
> | `typ: wimse-exec+jwt` | `typ: exec+jwt` (preferred) | Both accepted for backward compat |
> | `MaxParLength` | `MaxPredLength` | Renamed to match `pred` claim |
This directory contains **reference implementations** of Execution Context Tokens (ECTs) for the WIMSE (Workload Identity in Multi System Environments) draft. Each refimpl provides ECT creation, verification, DAG validation, and an in-memory audit ledger.
## Implementations
@@ -11,11 +23,11 @@ This directory contains **reference implementations** of [Execution Context Toke
## Scope (all refimpls)
- **ECT format**: JWT (JWS Compact Serialization) with required/optional claims per the spec (Section 4).
- **Creation**: Build and sign ECTs with ES256; `kid` and `typ: wimse-exec+jwt` in the JOSE header.
- **Verification**: Full Section 7 procedure (parse, typ/alg, key resolution, signature, claims, optional DAG).
- **DAG validation**: Section 6 (uniqueness, parent existence, temporal ordering, acyclicity, parent policy).
- **Ledger**: Interface plus in-memory append-only store (Section 9).
- **ECT format**: JWT (JWS Compact Serialization) with required/optional claims per the spec.
- **Creation**: Build and sign ECTs with ES256; `kid` and `typ` in the JOSE header.
- **Verification**: Full verification procedure (parse, typ/alg, key resolution, signature, claims, optional DAG).
- **DAG validation**: Uniqueness, predecessor existence, temporal ordering, acyclicity, predecessor policy.
- **Ledger**: Interface plus in-memory append-only store.
No WIT/WPT issuance or full WIMSE stack; refimpls use key resolution only. Suitable for conformance testing and as a template for production integrations.
@@ -41,8 +53,8 @@ python3 -m pytest tests/ -v
## Specification
- **Draft**: `draft-nennemann-wimse-execution-context-00`
- **Sections**: 4 (format), 5 (HTTP header), 6 (DAG), 7 (verification), 9 (ledger interface).
- **Current draft**: `draft-nennemann-wimse-ect-01`
- **Refimpl implements**: `-01` claim names
## License


@@ -1,6 +1,6 @@
# WIMSE ECT — Go Reference Implementation
Go reference implementation of [Execution Context Tokens (ECTs)](../../draft-nennemann-wimse-execution-context-00.txt) for WIMSE. Implements ECT creation (ES256), verification (Section 7), DAG validation (Section 6), and an in-memory audit ledger (Section 9).
Go reference implementation of [Execution Context Tokens (ECTs)](../../draft-nennemann-wimse-execution-context-01.txt) for WIMSE. Implements ECT creation (ES256), verification (Section 7), DAG validation (Section 6), and an in-memory audit ledger (Section 9).
## Layout
@@ -43,9 +43,11 @@ payload := &ect.Payload{
Exp: time.Now().Add(10*time.Minute).Unix(),
Jti: "550e8400-e29b-41d4-a716-446655440000",
ExecAct: "review_spec",
Par: []string{},
Pol: "policy_v1",
PolDecision: ect.PolDecisionApproved,
Pred: []string{},
Ext: map[string]interface{}{
"pol": "policy_v1",
"pol_decision": "approved",
},
}
compact, err := ect.Create(payload, key, cfg.CreateOptions("agent-a-key"))
@@ -84,6 +86,16 @@ cd refimpl/go-lang && go test ./ect/... -cover
Unit tests are in `ect/*_test.go`. Coverage target: **~90%** (run `go test ./ect/... -coverprofile=cover.out && go tool cover -func=cover.out`). Remaining uncovered lines are mostly Parse/Verify error paths that require custom JWS or multi-sig tokens.
## draft-01 claim changes
| -00 (previous) | -01 (current) | Notes |
|----------------|---------------|-------|
| `par` | `pred` | Predecessor task IDs |
| `pol`, `pol_decision` | removed (use `ect_ext`) | Policy claims moved to extension object |
| `sub` | not defined | Standard JWT claim, not part of ECT spec |
| `typ: wimse-exec+jwt` | `typ: exec+jwt` (preferred) | Both accepted for backward compat |
| `MaxParLength` | `MaxPredLength` | Renamed to match `pred` claim |
## Production configuration (environment)
| Variable | Default | Description |
@@ -96,7 +108,7 @@ Unit tests are in `ect/*_test.go`. Coverage target: **~90%** (run `go test ./ect
### Replay cache (multi-instance)
`JTICache` is in-memory only. For multiple verifier instances (e.g. behind a load balancer), use a shared store (Redis, database) so every instance sees the same seen JTIs. Implement `JTISeen` as a function that checks (and optionally records) the JTI in that store (e.g. with TTL). Pass it in `VerifyOptions.JTISeen`. See refimpl/README for an overview.
`JTICache` is in-memory only. For multiple verifier instances (e.g. behind a load balancer), use a shared store (Redis, database) so every instance sees the same "seen" JTIs. Implement `JTISeen` as a function that checks (and optionally records) the JTI in that store (e.g. with TTL). Pass it in `VerifyOptions.JTISeen`. See refimpl/README for an overview.
## Dependencies


@@ -34,9 +34,11 @@ func main() {
Jti: "550e8400-e29b-41d4-a716-446655440001",
Wid: "wf-demo-001",
ExecAct: "review_requirements_spec",
Par: []string{},
Pol: "spec_review_policy_v2",
PolDecision: ect.PolDecisionApproved,
Pred: []string{},
Ext: map[string]interface{}{
"pol": "spec_review_policy_v2",
"pol_decision": "approved",
},
}
ectA, err := ect.Create(payloadA, keyA, ect.CreateOptions{KeyID: kidA})
if err != nil {
@@ -69,7 +71,7 @@ func main() {
}
fmt.Println("Agent B verified root ECT and appended to ledger")
// 3) Agent B creates child ECT (par contains parent jti values per spec)
// 3) Agent B creates child ECT (pred contains predecessor jti values per spec)
keyB, _ := ect.GenerateKey()
kidB := "agent-b-key"
payloadB := &ect.Payload{
@@ -80,15 +82,17 @@ func main() {
Jti: "550e8400-e29b-41d4-a716-446655440002",
Wid: "wf-demo-001",
ExecAct: "implement_module",
Par: []string{"550e8400-e29b-41d4-a716-446655440001"},
Pol: "coding_standards_v3",
PolDecision: ect.PolDecisionApproved,
Pred: []string{"550e8400-e29b-41d4-a716-446655440001"},
Ext: map[string]interface{}{
"pol": "coding_standards_v3",
"pol_decision": "approved",
},
}
ectB, err := ect.Create(payloadB, keyB, ect.CreateOptions{KeyID: kidB})
if err != nil {
log.Fatal(err)
}
fmt.Println("Agent B created child ECT (jti=550e8400-...002, implement_module, par=[parent jti])")
fmt.Println("Agent B created child ECT (jti=550e8400-...002, implement_module, pred=[predecessor jti])")
// 4) Verify child ECT with DAG (ledger has task-001)
resolverB := ect.KeyResolver(func(kid string) (*ecdsa.PublicKey, error) {


@@ -21,8 +21,8 @@ type CreateOptions struct {
DefaultExpiry time.Duration
// ValidateUUIDs when true requires jti and wid (if set) to be UUID format (RFC 9562).
ValidateUUIDs bool
// MaxParLength is the max number of parent references (0 = no limit; recommended 100).
MaxParLength int
// MaxPredLength is the max number of predecessor references (0 = no limit; recommended 100).
MaxPredLength int
}
// DefaultCreateOptions returns recommended defaults.
@@ -53,11 +53,8 @@ func Create(payload *Payload, privateKey *ecdsa.PrivateKey, opts CreateOptions)
}
payload.Exp = now.Add(opts.DefaultExpiry).Unix()
}
if payload.Sub == "" {
payload.Sub = payload.Iss
}
if payload.Par == nil {
payload.Par = []string{}
if payload.Pred == nil {
payload.Pred = []string{}
}
if err := validatePayloadForCreate(payload, opts); err != nil {
@@ -110,8 +107,8 @@ func validatePayloadForCreate(p *Payload, opts CreateOptions) error {
return ErrInvalidWID
}
}
if opts.MaxParLength > 0 && len(p.Par) > opts.MaxParLength {
return ErrParLength
if opts.MaxPredLength > 0 && len(p.Pred) > opts.MaxPredLength {
return ErrPredLength
}
if p.InpHash != "" {
if err := ValidateHashFormat(p.InpHash); err != nil {
@@ -126,15 +123,6 @@ func validatePayloadForCreate(p *Payload, opts CreateOptions) error {
if err := ValidateExt(p.Ext); err != nil {
return err
}
// pol/pol_decision are OPTIONAL; if either is set, both must be present and valid
if p.Pol != "" || p.PolDecision != "" {
if p.Pol == "" || p.PolDecision == "" {
return ErrPolPolDecisionPair
}
if !ValidPolDecision(p.PolDecision) {
return ErrInvalidPolDecision
}
}
// compensation_* live in ext per spec
if p.Ext != nil {
if _, hasReason := p.Ext["compensation_reason"]; hasReason {


@@ -21,9 +21,7 @@ func TestCreateRoundtrip(t *testing.T) {
Exp: now.Add(10 * time.Minute).Unix(),
Jti: "e4f5a6b7-c8d9-0123-ef01-234567890abc",
ExecAct: "review_spec",
Par: []string{},
Pol: "spec_review_policy_v2",
PolDecision: PolDecisionApproved,
Pred: []string{},
}
compact, err := Create(payload, key, CreateOptions{KeyID: "agent-a-key-1"})
if err != nil {
@@ -68,7 +66,7 @@ func TestDefaultCreateOptions(t *testing.T) {
func TestCreate_Errors(t *testing.T) {
key, _ := GenerateKey()
payload := &Payload{Iss: "i", Aud: []string{"a"}, Jti: "j", ExecAct: "e", Par: []string{}, Pol: "p", PolDecision: PolDecisionApproved, Iat: 1, Exp: 2}
payload := &Payload{Iss: "i", Aud: []string{"a"}, Jti: "j", ExecAct: "e", Pred: []string{}, Iat: 1, Exp: 2}
if _, err := Create(nil, key, CreateOptions{KeyID: "k"}); err == nil {
t.Error("expected error for nil payload")
}
@@ -85,7 +83,7 @@ func TestCreate_OptionalPol(t *testing.T) {
now := time.Now()
payload := &Payload{
Iss: "iss", Aud: []string{"aud"}, Iat: now.Unix(), Exp: now.Add(time.Hour).Unix(),
Jti: "jti-nopol", ExecAct: "act", Par: []string{},
Jti: "jti-nopol", ExecAct: "act", Pred: []string{},
}
compact, err := Create(payload, key, CreateOptions{KeyID: "kid"})
if err != nil {
@@ -100,7 +98,7 @@ func TestCreate_ZeroExpiryUsesDefault(t *testing.T) {
key, _ := GenerateKey()
payload := &Payload{
Iss: "i", Aud: []string{"a"}, Iat: 0, Exp: 0,
Jti: "jti-z", ExecAct: "e", Par: []string{},
Jti: "jti-z", ExecAct: "e", Pred: []string{},
}
_, err := Create(payload, key, CreateOptions{KeyID: "kid", DefaultExpiry: 5 * time.Minute})
if err != nil {
@@ -115,7 +113,7 @@ func TestCreate_ExtCompensationReasonRequiresRequired(t *testing.T) {
key, _ := GenerateKey()
payload := &Payload{
Iss: "i", Aud: []string{"a"}, Iat: 1, Exp: 2,
Jti: "j", ExecAct: "e", Par: []string{},
Jti: "j", ExecAct: "e", Pred: []string{},
Ext: map[string]interface{}{"compensation_reason": "rollback", "compensation_required": false},
}
_, err := Create(payload, key, CreateOptions{KeyID: "k"})
@@ -130,12 +128,10 @@ func TestCreate_ValidationErrors(t *testing.T) {
name string
p *Payload
}{
{"missing iss", &Payload{Iss: "", Aud: []string{"a"}, Jti: "j", ExecAct: "e", Par: []string{}, Iat: 1, Exp: 2}},
{"missing aud", &Payload{Iss: "i", Aud: nil, Jti: "j", ExecAct: "e", Par: []string{}, Iat: 1, Exp: 2}},
{"missing jti", &Payload{Iss: "i", Aud: []string{"a"}, Jti: "", ExecAct: "e", Par: []string{}, Iat: 1, Exp: 2}},
{"missing exec_act", &Payload{Iss: "i", Aud: []string{"a"}, Jti: "j", ExecAct: "", Par: []string{}, Iat: 1, Exp: 2}},
{"pol without pol_decision", &Payload{Iss: "i", Aud: []string{"a"}, Jti: "j", ExecAct: "e", Par: []string{}, Pol: "p", PolDecision: "", Iat: 1, Exp: 2}},
{"invalid pol_decision", &Payload{Iss: "i", Aud: []string{"a"}, Jti: "j", ExecAct: "e", Par: []string{}, Pol: "p", PolDecision: "bad", Iat: 1, Exp: 2}},
{"missing iss", &Payload{Iss: "", Aud: []string{"a"}, Jti: "j", ExecAct: "e", Pred: []string{}, Iat: 1, Exp: 2}},
{"missing aud", &Payload{Iss: "i", Aud: nil, Jti: "j", ExecAct: "e", Pred: []string{}, Iat: 1, Exp: 2}},
{"missing jti", &Payload{Iss: "i", Aud: []string{"a"}, Jti: "", ExecAct: "e", Pred: []string{}, Iat: 1, Exp: 2}},
{"missing exec_act", &Payload{Iss: "i", Aud: []string{"a"}, Jti: "j", ExecAct: "", Pred: []string{}, Iat: 1, Exp: 2}},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {


@@ -24,7 +24,7 @@ type ECTStore interface {
type DAGConfig struct {
ClockSkewTolerance int // seconds; recommended 30
MaxAncestorLimit int // recommended 10000
MaxParLength int // max par length (0 = no limit; recommended 100)
MaxPredLength int // max pred length (0 = no limit; recommended 100)
}
// DefaultDAGConfig returns recommended defaults.
@@ -32,7 +32,7 @@ func DefaultDAGConfig() DAGConfig {
return DAGConfig{
ClockSkewTolerance: DefaultClockSkewTolerance,
MaxAncestorLimit: DefaultMaxAncestorLimit,
MaxParLength: DefaultMaxParLength,
MaxPredLength: DefaultMaxPredLength,
}
}
@@ -48,8 +48,8 @@ func ValidateDAG(ect *Payload, store ECTStore, cfg DAGConfig) error {
if cfg.MaxAncestorLimit <= 0 {
cfg.MaxAncestorLimit = DefaultMaxAncestorLimit
}
if cfg.MaxParLength > 0 && len(ect.Par) > cfg.MaxParLength {
return ErrParLength
if cfg.MaxPredLength > 0 && len(ect.Pred) > cfg.MaxPredLength {
return ErrPredLength
}
// 1. Task ID Uniqueness (task id = jti per spec)
@@ -57,31 +57,33 @@ func ValidateDAG(ect *Payload, store ECTStore, cfg DAGConfig) error {
return fmt.Errorf("ect: task ID (jti) already exists: %s", ect.Jti)
}
// 2. Parent Existence and 3. Temporal Ordering
for _, parentID := range ect.Par {
parent := store.GetByTid(parentID)
if parent == nil {
return fmt.Errorf("ect: parent task not found: %s", parentID)
// 2. Predecessor Existence and 3. Temporal Ordering
for _, predID := range ect.Pred {
pred := store.GetByTid(predID)
if pred == nil {
return fmt.Errorf("ect: predecessor task not found: %s", predID)
}
// parent.iat < child.iat + clock_skew_tolerance => parent.iat - ect.iat <= clock_skew_tolerance
if parent.Iat >= ect.Iat+int64(cfg.ClockSkewTolerance) {
return fmt.Errorf("ect: parent task not earlier than current: %s", parentID)
// pred.iat < child.iat + clock_skew_tolerance
if pred.Iat >= ect.Iat+int64(cfg.ClockSkewTolerance) {
return fmt.Errorf("ect: predecessor task not earlier than current: %s", predID)
}
}
// 4. Acyclicity (and depth limit)
visited := make(map[string]struct{})
if hasCycle(ect.Jti, ect.Par, store, visited, cfg.MaxAncestorLimit) {
if hasCycle(ect.Jti, ect.Pred, store, visited, cfg.MaxAncestorLimit) {
return errors.New("ect: circular dependency or depth limit exceeded")
}
// 5. Parent Policy Decision (only when parent has policy claims per spec)
for _, parentID := range ect.Par {
parent := store.GetByTid(parentID)
if parent != nil && parent.HasPolicyClaims() &&
(parent.PolDecision == PolDecisionRejected || parent.PolDecision == PolDecisionPendingHumanReview) {
// 5. Predecessor Policy Decision (only when predecessor has policy claims in ext per -01)
for _, predID := range ect.Pred {
pred := store.GetByTid(predID)
if pred != nil && pred.HasPolicyClaims() {
polDec := pred.PolDecision()
if polDec == "rejected" || polDec == "pending_human_review" {
if !ect.CompensationRequired() {
return errors.New("ect: parent has non-approved pol_decision; current ECT must be compensation/remediation or have ext.compensation_required true")
return errors.New("ect: predecessor has non-approved pol_decision; current ECT must be compensation/remediation or have ext.compensation_required true")
}
}
}
}
@@ -89,23 +91,23 @@ func ValidateDAG(ect *Payload, store ECTStore, cfg DAGConfig) error {
return nil
}
// hasCycle returns true if following par from the given parent IDs leads back to targetTid
// hasCycle returns true if following pred from the given predecessor IDs leads back to targetTid
// or if traversal exceeds maxDepth. visited is mutated.
func hasCycle(targetTid string, parentIDs []string, store ECTStore, visited map[string]struct{}, maxDepth int) bool {
func hasCycle(targetTid string, predIDs []string, store ECTStore, visited map[string]struct{}, maxDepth int) bool {
if len(visited) >= maxDepth {
return true
}
for _, parentID := range parentIDs {
if parentID == targetTid {
for _, predID := range predIDs {
if predID == targetTid {
return true
}
if _, ok := visited[parentID]; ok {
if _, ok := visited[predID]; ok {
continue
}
visited[parentID] = struct{}{}
parent := store.GetByTid(parentID)
if parent != nil {
if hasCycle(targetTid, parent.Par, store, visited, maxDepth) {
visited[predID] = struct{}{}
pred := store.GetByTid(predID)
if pred != nil {
if hasCycle(targetTid, pred.Pred, store, visited, maxDepth) {
return true
}
}


@@ -10,8 +10,7 @@ func TestValidateDAG_Root(t *testing.T) {
payload := &Payload{
Jti: "jti-001",
Wid: "wf-1",
Par: []string{},
PolDecision: PolDecisionApproved,
Pred: []string{},
}
err := ValidateDAG(payload, store, DefaultDAGConfig())
if err != nil {
@@ -21,22 +20,21 @@ func TestValidateDAG_Root(t *testing.T) {
func TestValidateDAG_DuplicateJti(t *testing.T) {
store := NewMemoryLedger()
_, _ = store.Append("dummy-jws", &Payload{Jti: "jti-001", Wid: "wf-1", Par: []string{}, PolDecision: PolDecisionApproved})
payload := &Payload{Jti: "jti-001", Wid: "wf-1", Par: []string{}, PolDecision: PolDecisionApproved}
_, _ = store.Append("dummy-jws", &Payload{Jti: "jti-001", Wid: "wf-1", Pred: []string{}})
payload := &Payload{Jti: "jti-001", Wid: "wf-1", Pred: []string{}}
err := ValidateDAG(payload, store, DefaultDAGConfig())
if err == nil {
t.Fatal("expected error for duplicate jti")
}
}
func TestValidateDAG_ParentExists(t *testing.T) {
func TestValidateDAG_PredExists(t *testing.T) {
store := NewMemoryLedger()
_, _ = store.Append("jws1", &Payload{Jti: "jti-001", Wid: "wf-1", Par: []string{}, PolDecision: PolDecisionApproved, Iat: time.Now().Unix() - 60})
_, _ = store.Append("jws1", &Payload{Jti: "jti-001", Wid: "wf-1", Pred: []string{}, Iat: time.Now().Unix() - 60})
payload := &Payload{
Jti: "jti-002",
Wid: "wf-1",
Par: []string{"jti-001"},
PolDecision: PolDecisionApproved,
Pred: []string{"jti-001"},
Iat: time.Now().Unix(),
}
err := ValidateDAG(payload, store, DefaultDAGConfig())
@@ -45,17 +43,16 @@ func TestValidateDAG_ParentExists(t *testing.T) {
}
}
func TestValidateDAG_ParentNotFound(t *testing.T) {
func TestValidateDAG_PredNotFound(t *testing.T) {
store := NewMemoryLedger()
payload := &Payload{
Jti: "jti-002",
Par: []string{"jti-missing"},
PolDecision: PolDecisionApproved,
Pred: []string{"jti-missing"},
Iat: time.Now().Unix(),
}
err := ValidateDAG(payload, store, DefaultDAGConfig())
if err == nil {
t.Fatal("expected error when parent not found")
t.Fatal("expected error when predecessor not found")
}
}
@@ -63,10 +60,10 @@ func TestValidateDAG_DepthLimit(t *testing.T) {
store := NewMemoryLedger()
now := time.Now().Unix()
// Chain: jti-1 -> jti-2 -> jti-3 -> ...; validate with maxAncestorLimit=2 so we exceed it
_, _ = store.Append("jws1", &Payload{Jti: "jti-1", Wid: "wf", Par: []string{}, PolDecision: PolDecisionApproved, Iat: now - 100})
_, _ = store.Append("jws2", &Payload{Jti: "jti-2", Wid: "wf", Par: []string{"jti-1"}, PolDecision: PolDecisionApproved, Iat: now - 50})
_, _ = store.Append("jws1", &Payload{Jti: "jti-1", Wid: "wf", Pred: []string{}, Iat: now - 100})
_, _ = store.Append("jws2", &Payload{Jti: "jti-2", Wid: "wf", Pred: []string{"jti-1"}, Iat: now - 50})
cfg := DAGConfig{ClockSkewTolerance: DefaultClockSkewTolerance, MaxAncestorLimit: 2}
payload := &Payload{Jti: "jti-3", Wid: "wf", Par: []string{"jti-2"}, PolDecision: PolDecisionApproved, Iat: now}
payload := &Payload{Jti: "jti-3", Wid: "wf", Pred: []string{"jti-2"}, Iat: now}
err := ValidateDAG(payload, store, cfg)
if err == nil {
t.Fatal("expected error when ancestor limit exceeded")
@@ -74,7 +71,7 @@ func TestValidateDAG_DepthLimit(t *testing.T) {
}
func TestValidateDAG_StoreNil(t *testing.T) {
payload := &Payload{Jti: "j1", Par: []string{}, PolDecision: PolDecisionApproved, Iat: time.Now().Unix()}
payload := &Payload{Jti: "j1", Pred: []string{}, Iat: time.Now().Unix()}
err := ValidateDAG(payload, nil, DefaultDAGConfig())
if err == nil {
t.Fatal("expected error when store is nil")
@@ -84,53 +81,56 @@ func TestValidateDAG_StoreNil(t *testing.T) {
func TestValidateDAG_TemporalOrdering(t *testing.T) {
store := NewMemoryLedger()
now := time.Now().Unix()
_, _ = store.Append("jws1", &Payload{Jti: "jti-1", Wid: "wf", Par: []string{}, PolDecision: PolDecisionApproved, Iat: now})
// child has iat before parent + skew: parent.iat (now) >= child.iat (now+100) + 30 => invalid
payload := &Payload{Jti: "jti-2", Wid: "wf", Par: []string{"jti-1"}, PolDecision: PolDecisionApproved, Iat: now + 100}
_, _ = store.Append("jws1", &Payload{Jti: "jti-1", Wid: "wf", Pred: []string{}, Iat: now})
// child has iat after pred: valid
payload := &Payload{Jti: "jti-2", Wid: "wf", Pred: []string{"jti-1"}, Iat: now + 100}
err := ValidateDAG(payload, store, DAGConfig{ClockSkewTolerance: 30, MaxAncestorLimit: 10000})
if err != nil {
t.Fatal(err)
}
// parent.iat >= child.iat + skew: parent at now+50, child at now+10, skew 30 => 50 >= 40 => invalid
_, _ = store.Append("jws2", &Payload{Jti: "jti-1b", Wid: "wf", Par: []string{}, PolDecision: PolDecisionApproved, Iat: now + 50})
payload2 := &Payload{Jti: "jti-2b", Wid: "wf", Par: []string{"jti-1b"}, PolDecision: PolDecisionApproved, Iat: now + 10}
// pred.iat >= child.iat + skew: pred at now+50, child at now+10, skew 30 => 50 >= 40 => invalid
_, _ = store.Append("jws2", &Payload{Jti: "jti-1b", Wid: "wf", Pred: []string{}, Iat: now + 50})
payload2 := &Payload{Jti: "jti-2b", Wid: "wf", Pred: []string{"jti-1b"}, Iat: now + 10}
err = ValidateDAG(payload2, store, DAGConfig{ClockSkewTolerance: 30, MaxAncestorLimit: 10000})
if err == nil {
t.Fatal("expected error when parent not earlier than child")
t.Fatal("expected error when predecessor not earlier than child")
}
}
func TestValidateDAG_DirectCycle(t *testing.T) {
// par contains own jti (direct self-reference) -> parent not found
// pred contains own jti (direct self-reference) -> predecessor not found
store := NewMemoryLedger()
now := time.Now().Unix()
payload := &Payload{Jti: "jti-self", Wid: "wf", Par: []string{"jti-self"}, PolDecision: PolDecisionApproved, Iat: now}
payload := &Payload{Jti: "jti-self", Wid: "wf", Pred: []string{"jti-self"}, Iat: now}
err := ValidateDAG(payload, store, DefaultDAGConfig())
if err == nil {
t.Fatal("expected error for direct cycle (par contains self)")
t.Fatal("expected error for direct cycle (pred contains self)")
}
}
func TestValidateDAG_hasCycle_visitedContinue(t *testing.T) {
// par has duplicate parent ID so we hit "if _, ok := visited[parentID]; ok { continue }"
// pred has duplicate predecessor ID so we hit "if _, ok := visited[predID]; ok { continue }"
store := NewMemoryLedger()
now := time.Now().Unix()
_, _ = store.Append("jws1", &Payload{Jti: "jti-a", Wid: "wf", Par: []string{}, PolDecision: PolDecisionApproved, Iat: now - 10})
payload := &Payload{Jti: "jti-b", Wid: "wf", Par: []string{"jti-a", "jti-a"}, PolDecision: PolDecisionApproved, Iat: now}
_, _ = store.Append("jws1", &Payload{Jti: "jti-a", Wid: "wf", Pred: []string{}, Iat: now - 10})
payload := &Payload{Jti: "jti-b", Wid: "wf", Pred: []string{"jti-a", "jti-a"}, Iat: now}
err := ValidateDAG(payload, store, DefaultDAGConfig())
if err != nil {
t.Fatal(err)
}
}
func TestValidateDAG_ParentPolicyRejected_RequiresCompensation(t *testing.T) {
func TestValidateDAG_PredPolicyRejected_RequiresCompensation(t *testing.T) {
store := NewMemoryLedger()
now := time.Now().Unix()
_, _ = store.Append("jws1", &Payload{Jti: "jti-rej", Wid: "wf", Par: []string{}, Pol: "p", PolDecision: PolDecisionRejected, Iat: now - 60})
payload := &Payload{Jti: "jti-child", Wid: "wf", Par: []string{"jti-rej"}, PolDecision: PolDecisionApproved, Iat: now}
_, _ = store.Append("jws1", &Payload{
Jti: "jti-rej", Wid: "wf", Pred: []string{}, Iat: now - 60,
Ext: map[string]interface{}{"pol": "p", "pol_decision": "rejected"},
})
payload := &Payload{Jti: "jti-child", Wid: "wf", Pred: []string{"jti-rej"}, Iat: now}
err := ValidateDAG(payload, store, DefaultDAGConfig())
if err == nil {
t.Fatal("expected error when parent rejected and no compensation")
t.Fatal("expected error when predecessor rejected and no compensation")
}
payload.Ext = map[string]interface{}{"compensation_required": true}
err = ValidateDAG(payload, store, DefaultDAGConfig())


@@ -15,15 +15,13 @@ var (
ErrExpired = errors.New("ect: token expired")
ErrIATTooOld = errors.New("ect: iat too far in the past")
ErrIATInFuture = errors.New("ect: iat in the future")
ErrMissingClaims = errors.New("ect: missing required claims (jti, exec_act, par)")
ErrPolPolDecisionPair = errors.New("ect: pol and pol_decision must both be present when either is set")
ErrInvalidPolDecision = errors.New("ect: invalid pol_decision value")
ErrMissingClaims = errors.New("ect: missing required claims (jti, exec_act, pred)")
ErrReplay = errors.New("ect: jti already seen (replay)")
ErrResolveKeyRequired = errors.New("ect: ResolveKey required")
ErrExtSize = errors.New("ect: ext exceeds max size (4096 bytes)")
ErrExtDepth = errors.New("ect: ext exceeds max nesting depth (5)")
ErrInvalidJTI = errors.New("ect: jti must be UUID format")
ErrInvalidWID = errors.New("ect: wid must be UUID format when set")
ErrParLength = errors.New("ect: par exceeds max length")
ErrHashFormat = errors.New("ect: inp_hash/out_hash must be algorithm:base64url (e.g. sha-256:...)")
ErrPredLength = errors.New("ect: pred exceeds max length")
ErrHashFormat = errors.New("ect: inp_hash/out_hash must be plain base64url (no prefix)")
)


@@ -12,7 +12,7 @@ type LedgerEntry struct {
TaskID string `json:"task_id"`
AgentID string `json:"agent_id"`
Action string `json:"action"`
Parents []string `json:"parents"`
Predecessors []string `json:"predecessors"`
ECTJWS string `json:"ect_jws"`
SignatureVerified bool `json:"signature_verified"`
VerificationTime time.Time `json:"verification_timestamp"`
@@ -66,7 +66,7 @@ func (m *MemoryLedger) Append(ectJWS string, payload *Payload) (int64, error) {
TaskID: payload.Jti, // task id = jti per spec
AgentID: payload.Iss,
Action: payload.ExecAct,
Parents: append([]string(nil), payload.Par...),
Predecessors: append([]string(nil), payload.Pred...),
ECTJWS: ectJWS,
SignatureVerified: true,
VerificationTime: time.Now().UTC(),


@@ -7,7 +7,7 @@ import (
func TestMemoryLedger_AppendAndGet(t *testing.T) {
m := NewMemoryLedger()
p := &Payload{Jti: "jti-1", Iss: "iss", ExecAct: "act", Par: []string{}, Iat: time.Now().Unix(), Exp: time.Now().Add(time.Hour).Unix()}
p := &Payload{Jti: "jti-1", Iss: "iss", ExecAct: "act", Pred: []string{}, Iat: time.Now().Unix(), Exp: time.Now().Add(time.Hour).Unix()}
seq, err := m.Append("jws1", p)
if err != nil {
t.Fatal(err)
@@ -23,7 +23,7 @@ func TestMemoryLedger_AppendAndGet(t *testing.T) {
func TestMemoryLedger_ErrTaskIDExists(t *testing.T) {
m := NewMemoryLedger()
p := &Payload{Jti: "jti-dup", Iss: "i", ExecAct: "e", Par: []string{}, Iat: 1, Exp: 2}
p := &Payload{Jti: "jti-dup", Iss: "i", ExecAct: "e", Pred: []string{}, Iat: 1, Exp: 2}
_, _ = m.Append("jws1", p)
_, err := m.Append("jws2", p)
if err != ErrTaskIDExists {
@@ -33,7 +33,7 @@ func TestMemoryLedger_ErrTaskIDExists(t *testing.T) {
func TestMemoryLedger_ContainsWid(t *testing.T) {
m := NewMemoryLedger()
p := &Payload{Jti: "j1", Wid: "wf1", Iss: "i", ExecAct: "e", Par: []string{}, Iat: 1, Exp: 2}
p := &Payload{Jti: "j1", Wid: "wf1", Iss: "i", ExecAct: "e", Pred: []string{}, Iat: 1, Exp: 2}
_, _ = m.Append("jws", p)
if !m.Contains("j1", "") {
t.Error("Contains(j1, \"\") should be true")


@@ -1,24 +1,20 @@
// Package ect implements Execution Context Tokens (ECTs) per
// draft-nennemann-wimse-execution-context-00.
// draft-nennemann-wimse-execution-context-01.
package ect
import "time"
// ECTType is the JOSE typ value for ECTs.
const ECTType = "wimse-exec+jwt"
// PolDecision values per Section 4.2.3.
// ECTType is the preferred JOSE typ value for ECTs per -01.
// ECTTypeLegacy is accepted for backward compatibility with -00.
const (
PolDecisionApproved = "approved"
PolDecisionRejected = "rejected"
PolDecisionPendingHumanReview = "pending_human_review"
ECTType = "exec+jwt"
ECTTypeLegacy = "wimse-exec+jwt"
)
// Payload holds ECT JWT claims per Section 4.2.
type Payload struct {
// Standard JWT claims (required unless noted)
Iss string `json:"iss"` // REQUIRED: issuer, SPIFFE ID
Sub string `json:"sub,omitempty"`
Aud Audience `json:"aud"` // REQUIRED
Iat int64 `json:"iat"` // REQUIRED: NumericDate
Exp int64 `json:"exp"` // REQUIRED
@@ -28,13 +24,7 @@ type Payload struct {
// Task identity is jti only; no separate "tid" claim per spec.
Wid string `json:"wid,omitempty"` // OPTIONAL: workflow ID, UUID
ExecAct string `json:"exec_act"` // REQUIRED
Par []string `json:"par"` // REQUIRED: parent jti values
// Policy evaluation (Section 4.2.3 / policy-claims) — OPTIONAL
Pol string `json:"pol,omitempty"`
PolDecision string `json:"pol_decision,omitempty"`
PolEnforcer string `json:"pol_enforcer,omitempty"`
PolTimestamp int64 `json:"pol_timestamp,omitempty"`
Pred []string `json:"pred"` // REQUIRED: predecessor jti values (renamed from par in -01)
// Data integrity (Section 4.2.4)
InpHash string `json:"inp_hash,omitempty"`
@@ -42,8 +32,9 @@ type Payload struct {
InpClassification string `json:"inp_classification,omitempty"`
// Extensions (Section 4.2.7): exec_time_ms, regulated_domain, model_version,
// witnessed_by, inp_classification, pol_timestamp, compensation_required, compensation_reason
Ext map[string]interface{} `json:"ext,omitempty"`
// witnessed_by, inp_classification, compensation_required, compensation_reason,
// and domain-specific claims like pol, pol_decision (moved from core in -01).
Ext map[string]interface{} `json:"ect_ext,omitempty"`
}
// Audience is aud claim: string or array of strings.
@@ -62,11 +53,6 @@ func (a *Audience) UnmarshalJSON(data []byte) error {
return unmarshalAudience(data, a)
}
// ValidPolDecision returns true if s is a registered pol_decision value.
func ValidPolDecision(s string) bool {
return s == PolDecisionApproved || s == PolDecisionRejected || s == PolDecisionPendingHumanReview
}
// ContainsAudience returns true if verifierID is in the audience.
func (p *Payload) ContainsAudience(verifierID string) bool {
for _, id := range p.Aud {
@@ -96,7 +82,21 @@ func (p *Payload) CompensationRequired() bool {
return v
}
// HasPolicyClaims returns true if both pol and pol_decision are present (optional pair per spec).
// HasPolicyClaims returns true if both pol and pol_decision are present in ext (per -01, moved to extension).
func (p *Payload) HasPolicyClaims() bool {
return p.Pol != "" && p.PolDecision != ""
if p.Ext == nil {
return false
}
pol, _ := p.Ext["pol"].(string)
polDec, _ := p.Ext["pol_decision"].(string)
return pol != "" && polDec != ""
}
// PolDecision returns the pol_decision value from ext, or empty string.
func (p *Payload) PolDecision() string {
if p.Ext == nil {
return ""
}
v, _ := p.Ext["pol_decision"].(string)
return v
}


@@ -59,18 +59,6 @@ func TestAudience_UnmarshalJSON_invalid(t *testing.T) {
}
}
func TestValidPolDecision(t *testing.T) {
if !ValidPolDecision(PolDecisionApproved) {
t.Error("approved should be valid")
}
if !ValidPolDecision(PolDecisionRejected) {
t.Error("rejected should be valid")
}
if ValidPolDecision("invalid") {
t.Error("invalid should not be valid")
}
}
func TestPayload_ContainsAudience(t *testing.T) {
p := &Payload{Aud: []string{"a", "b"}}
if !p.ContainsAudience("a") {
@@ -104,12 +92,27 @@ func TestPayload_CompensationRequired(t *testing.T) {
}
func TestPayload_HasPolicyClaims(t *testing.T) {
p := &Payload{Pol: "p", PolDecision: PolDecisionApproved}
p := &Payload{Ext: map[string]interface{}{"pol": "p", "pol_decision": "approved"}}
if !p.HasPolicyClaims() {
t.Error("both pol and pol_decision set should have policy claims")
t.Error("both pol and pol_decision in ext should have policy claims")
}
p.Pol = ""
p.Ext = map[string]interface{}{"pol_decision": "approved"}
if p.HasPolicyClaims() {
t.Error("missing pol should not have policy claims")
t.Error("missing pol in ext should not have policy claims")
}
p.Ext = nil
if p.HasPolicyClaims() {
t.Error("nil ext should not have policy claims")
}
}
func TestPayload_PolDecision(t *testing.T) {
p := &Payload{Ext: map[string]interface{}{"pol_decision": "rejected"}}
if p.PolDecision() != "rejected" {
t.Errorf("expected rejected, got %q", p.PolDecision())
}
p.Ext = nil
if p.PolDecision() != "" {
t.Errorf("expected empty for nil ext, got %q", p.PolDecision())
}
}


@@ -4,7 +4,6 @@ import (
"encoding/base64"
"encoding/json"
"regexp"
"strings"
)
// ExtMaxSize is the recommended max serialized size of ext (Section 4.2.7).
@@ -13,14 +12,14 @@ const ExtMaxSize = 4096
// ExtMaxDepth is the recommended max JSON nesting depth in ext.
const ExtMaxDepth = 5
// DefaultMaxParLength is the recommended max number of parent references.
const DefaultMaxParLength = 100
// DefaultMaxPredLength is the recommended max number of predecessor references.
const DefaultMaxPredLength = 100
// uuidRegex matches RFC 9562 UUID: 8-4-4-4-12 hex.
var uuidRegex = regexp.MustCompile(`^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$`)
// allowedHashAlgs are the spec-recommended hash algorithm prefixes for inp_hash/out_hash.
var allowedHashAlgs = map[string]bool{"sha-256": true, "sha-384": true, "sha-512": true}
// base64urlRegex matches a non-empty base64url string without padding.
var base64urlRegex = regexp.MustCompile(`^[A-Za-z0-9_-]+$`)
// ValidateExt returns an error if ext exceeds ExtMaxSize or ExtMaxDepth.
func ValidateExt(ext map[string]interface{}) error {
@@ -61,23 +60,18 @@ func ValidUUID(s string) bool {
return uuidRegex.MatchString(s)
}
// ValidateHashFormat returns nil if s is empty or matches "algorithm:base64url" (sha-256, sha-384, sha-512).
// ValidateHashFormat returns nil if s is empty or is plain base64url (no padding)
// per draft-nennemann-wimse-ect-01 and RFC 9449 (no algorithm prefix).
func ValidateHashFormat(s string) error {
if s == "" {
return nil
}
idx := strings.Index(s, ":")
if idx <= 0 {
if !base64urlRegex.MatchString(s) {
return ErrHashFormat
}
alg := strings.ToLower(s[:idx])
if !allowedHashAlgs[alg] {
_, err := base64.RawURLEncoding.DecodeString(s)
if err != nil {
return ErrHashFormat
}
encoded := s[idx+1:]
if encoded == "" {
return ErrHashFormat
}
_, err := base64.RawURLEncoding.DecodeString(encoded)
return err
return nil
}


@@ -44,8 +44,8 @@ type VerifyOptions struct {
WITSubject string
// ValidateUUIDs when true requires jti and wid (if set) to be UUID format.
ValidateUUIDs bool
-	// MaxParLength caps par length (0 = no limit). Applied before DAG; DAG may also enforce via DAG.MaxParLength.
-	MaxParLength int
+	// MaxPredLength caps pred length (0 = no limit). Applied before DAG; DAG may also enforce via DAG.MaxPredLength.
+	MaxPredLength int
// LogVerify if set is called after verification with jti and any error (for observability).
LogVerify func(jti string, err error)
}
@@ -104,12 +104,13 @@ func Verify(compact string, opts VerifyOptions) (parsed *ParsedECT, err error) {
sig := jws.Signatures[0]
header := &sig.Header
-	// 2. typ must be wimse-exec+jwt (constant-time compare)
+	// 2. typ must be exec+jwt (preferred) or wimse-exec+jwt (legacy); constant-time compare
typ, _ := header.ExtraHeaders["typ"].(string)
if typ == "" {
typ, _ = header.ExtraHeaders[jose.HeaderType].(string)
}
-	if subtle.ConstantTimeCompare([]byte(typ), []byte(ECTType)) != 1 {
+	if subtle.ConstantTimeCompare([]byte(typ), []byte(ECTType)) != 1 &&
+		subtle.ConstantTimeCompare([]byte(typ), []byte(ECTTypeLegacy)) != 1 {
return nil, ErrInvalidTyp
}
@@ -182,15 +183,15 @@ func Verify(compact string, opts VerifyOptions) (parsed *ParsedECT, err error) {
return nil, ErrIATInFuture
}
-	// 12. Required claims present (jti, exec_act, par)
+	// 12. Required claims present (jti, exec_act, pred)
if p.Jti == "" || p.ExecAct == "" {
return nil, ErrMissingClaims
}
-	if p.Par == nil {
-		p.Par = []string{}
+	if p.Pred == nil {
+		p.Pred = []string{}
 	}
-	if opts.MaxParLength > 0 && len(p.Par) > opts.MaxParLength {
-		return nil, ErrParLength
+	if opts.MaxPredLength > 0 && len(p.Pred) > opts.MaxPredLength {
+		return nil, ErrPredLength
}
if opts.ValidateUUIDs {
if !ValidUUID(p.Jti) {
@@ -211,24 +212,14 @@ func Verify(compact string, opts VerifyOptions) (parsed *ParsedECT, err error) {
}
}
-	// 13. If pol or pol_decision present, both must be present and pol_decision in registry
-	if p.Pol != "" || p.PolDecision != "" {
-		if p.Pol == "" || p.PolDecision == "" {
-			return nil, ErrPolPolDecisionPair
-		}
-		if !ValidPolDecision(p.PolDecision) {
-			return nil, ErrInvalidPolDecision
-		}
-	}
-	// 14. DAG validation
+	// 13. DAG validation
if opts.Store != nil {
if err := ValidateDAG(&p, opts.Store, opts.DAG); err != nil {
return nil, err
}
}
-	// 15. Replay (jti seen)
+	// 14. Replay (jti seen)
if opts.JTISeen != nil && opts.JTISeen(p.Jti) {
return nil, ErrReplay
}

View File
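The dual `typ` check above can be mirrored with `hmac.compare_digest`, which the Python refimpl uses for the same purpose; this sketch assumes the two constants named in the diff:

```python
import hmac

ECT_TYPE = "exec+jwt"               # preferred typ per -01
ECT_TYPE_LEGACY = "wimse-exec+jwt"  # accepted for backward compatibility

def typ_ok(typ: str) -> bool:
    """Accept the preferred or the legacy typ, each compared in constant time."""
    return hmac.compare_digest(typ, ECT_TYPE) or hmac.compare_digest(typ, ECT_TYPE_LEGACY)
```

As in the Go code, any other value (including a bare `JWT`) is rejected.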

@@ -17,9 +17,7 @@ func TestParse(t *testing.T) {
Exp: now.Add(time.Hour).Unix(),
Jti: "jti-parse",
ExecAct: "act",
-		Par:         []string{},
-		Pol:         "pol",
-		PolDecision: PolDecisionApproved,
+		Pred:        []string{},
}
compact, err := Create(payload, key, CreateOptions{KeyID: "kid"})
if err != nil {
@@ -56,9 +54,7 @@ func TestVerify_Expired(t *testing.T) {
Exp: now.Add(-1 * time.Minute).Unix(),
Jti: "jti-exp",
ExecAct: "act",
-		Par:         []string{},
-		Pol:         "pol",
-		PolDecision: PolDecisionApproved,
+		Pred:        []string{},
}
compact, _ := Create(payload, key, CreateOptions{KeyID: "kid"})
resolver := func(kid string) (*ecdsa.PublicKey, error) {
@@ -87,9 +83,7 @@ func TestVerify_Replay(t *testing.T) {
Exp: now.Add(time.Hour).Unix(),
Jti: "jti-replay",
ExecAct: "act",
-		Par:         []string{},
-		Pol:         "p",
-		PolDecision: PolDecisionApproved,
+		Pred:        []string{},
}
compact, _ := Create(payload, key, CreateOptions{KeyID: "kid"})
resolver := func(kid string) (*ecdsa.PublicKey, error) {
@@ -125,7 +119,7 @@ func TestVerify_WITSubjectMismatch(t *testing.T) {
now := time.Now()
payload := &Payload{
Iss: "iss", Aud: []string{"v"}, Iat: now.Unix(), Exp: now.Add(time.Hour).Unix(),
-		Jti: "jti-wit", ExecAct: "act", Par: []string{}, Pol: "p", PolDecision: PolDecisionApproved,
+		Jti: "jti-wit", ExecAct: "act", Pred: []string{},
}
compact, _ := Create(payload, key, CreateOptions{KeyID: "kid"})
resolver := func(kid string) (*ecdsa.PublicKey, error) {
@@ -147,7 +141,7 @@ func TestVerify_IATTooFarPast(t *testing.T) {
now := time.Now()
payload := &Payload{
Iss: "iss", Aud: []string{"v"}, Iat: now.Add(-1 * time.Hour).Unix(), Exp: now.Add(time.Hour).Unix(),
-		Jti: "jti-iat", ExecAct: "act", Par: []string{}, Pol: "p", PolDecision: PolDecisionApproved,
+		Jti: "jti-iat", ExecAct: "act", Pred: []string{},
}
compact, _ := Create(payload, key, CreateOptions{KeyID: "kid"})
resolver := func(kid string) (*ecdsa.PublicKey, error) {
@@ -169,7 +163,7 @@ func TestVerify_IATInFuture(t *testing.T) {
now := time.Now()
payload := &Payload{
Iss: "iss", Aud: []string{"v"}, Iat: now.Add(60 * time.Second).Unix(), Exp: now.Add(2 * time.Hour).Unix(),
-		Jti: "jti-fut", ExecAct: "act", Par: []string{}, Pol: "p", PolDecision: PolDecisionApproved,
+		Jti: "jti-fut", ExecAct: "act", Pred: []string{},
}
compact, _ := Create(payload, key, CreateOptions{KeyID: "kid"})
resolver := func(kid string) (*ecdsa.PublicKey, error) {
@@ -191,7 +185,7 @@ func TestVerify_ResolveKeyError(t *testing.T) {
now := time.Now()
payload := &Payload{
Iss: "iss", Aud: []string{"v"}, Iat: now.Unix(), Exp: now.Add(time.Hour).Unix(),
-		Jti: "jti-err", ExecAct: "act", Par: []string{}, Pol: "p", PolDecision: PolDecisionApproved,
+		Jti: "jti-err", ExecAct: "act", Pred: []string{},
}
compact, _ := Create(payload, key, CreateOptions{KeyID: "kid"})
resolver := func(kid string) (*ecdsa.PublicKey, error) {
@@ -211,7 +205,7 @@ func TestVerify_WithDAG(t *testing.T) {
now := time.Now()
root := &Payload{
Iss: "iss", Aud: []string{"v"}, Iat: now.Unix(), Exp: now.Add(time.Hour).Unix(),
-		Jti: "jti-root", ExecAct: "act", Par: []string{}, Pol: "p", PolDecision: PolDecisionApproved,
+		Jti: "jti-root", ExecAct: "act", Pred: []string{},
}
compactRoot, _ := Create(root, key, CreateOptions{KeyID: "kid"})
resolver := func(kid string) (*ecdsa.PublicKey, error) {
@@ -230,7 +224,7 @@ func TestVerify_WithDAG(t *testing.T) {
_, _ = ledger.Append(compactRoot, parsed.Payload)
child := &Payload{
Iss: "iss", Aud: []string{"v"}, Iat: now.Unix() + 1, Exp: now.Add(time.Hour).Unix(),
-		Jti: "jti-child", ExecAct: "act2", Par: []string{"jti-root"}, Pol: "p", PolDecision: PolDecisionApproved,
+		Jti: "jti-child", ExecAct: "act2", Pred: []string{"jti-root"},
}
compactChild, _ := Create(child, key, CreateOptions{KeyID: "kid"})
parsed2, err := Verify(compactChild, opts)

View File

@@ -1 +1 @@
-{"iss":"spiffe://example.com/agent/clinical","sub":"spiffe://example.com/agent/clinical","aud":"spiffe://example.com/agent/safety","iat":1772064150,"exp":1772064750,"jti":"7f3a8b2c-d1e4-4f56-9a0b-c3d4e5f6a7b8","wid":"a0b1c2d3-e4f5-6789-abcd-ef0123456789","exec_act":"recommend_treatment","par":[],"pol":"clinical_reasoning_policy_v2","pol_decision":"approved"}
+{"iss":"spiffe://example.com/agent/clinical","aud":"spiffe://example.com/agent/safety","iat":1772064150,"exp":1772064750,"jti":"7f3a8b2c-d1e4-4f56-9a0b-c3d4e5f6a7b8","wid":"a0b1c2d3-e4f5-6789-abcd-ef0123456789","exec_act":"recommend_treatment","pred":[],"ect_ext":{"pol":"clinical_reasoning_policy_v2","pol_decision":"approved"}}

View File

@@ -1,6 +1,6 @@
# WIMSE ECT — Python Reference Implementation
-Python reference implementation of [Execution Context Tokens (ECTs)](../../draft-nennemann-wimse-execution-context-00.txt) for WIMSE. Implements ECT creation (ES256), verification (Section 7), DAG validation (Section 6), and an in-memory audit ledger (Section 9).
+Python reference implementation of [Execution Context Tokens (ECTs)](../../draft-nennemann-wimse-execution-context-01.txt) for WIMSE. Implements ECT creation (ES256), verification (Section 7), DAG validation (Section 6), and an in-memory audit ledger (Section 9).
## Layout
@@ -42,7 +42,6 @@ from ect import (
verify,
VerifyOptions,
MemoryLedger,
-    POL_DECISION_APPROVED,
)
cfg = load_config_from_env()
@@ -54,9 +53,11 @@ payload = Payload(
exp=int(time.time()) + 600,
jti="550e8400-e29b-41d4-a716-446655440000",
exec_act="review_spec",
-    par=[],
-    pol="policy_v1",
-    pol_decision=POL_DECISION_APPROVED,
+    pred=[],
+    ext={
+        "pol": "policy_v1",
+        "pol_decision": "approved",
+    },
)
compact = create(payload, key, cfg.create_options("agent-a-key"))
@@ -83,6 +84,16 @@ cd refimpl/python && python3 -m pytest tests/ -v
Unit tests require **90% coverage** minimum (`pytest` is configured with `--cov-fail-under=90` in `pyproject.toml`). Install dev deps: `pip install -e ".[dev]"`. Uncovered lines are mainly abstract base methods and a few verify branches that need manually built tokens.
## draft-01 claim changes
| -00 (previous) | -01 (current) | Notes |
|----------------|---------------|-------|
| `par` | `pred` | Predecessor task IDs |
| `pol`, `pol_decision` | removed (use `ect_ext`) | Policy claims moved to extension object |
| `sub` | not defined | Standard JWT claim, not part of ECT spec |
| `typ: wimse-exec+jwt` | `typ: exec+jwt` (preferred) | Both accepted for backward compat |
| `max_par_length` | `max_pred_length` | Renamed to match `pred` claim |
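For tokens minted against -00, the table maps mechanically onto a claims dict. A hedged sketch of that mapping (the `migrate_claims_00_to_01` helper is illustrative only, not part of the package):

```python
def migrate_claims_00_to_01(claims: dict) -> dict:
    """Apply the -00 -> -01 claim renames from the migration table (illustrative)."""
    out = dict(claims)
    out["pred"] = out.pop("par", [])   # par -> pred
    out.pop("sub", None)               # sub is not defined by the ECT spec
    ext = dict(out.get("ect_ext") or {})
    for k in ("pol", "pol_decision"):  # policy claims move into ect_ext
        if k in out:
            ext[k] = out.pop(k)
    if ext:
        out["ect_ext"] = ext
    return out
```

Applied to a -00 payload this yields a `pred` list and an `ect_ext` object carrying the former top-level policy claims.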
## Production configuration (environment)
Same env vars as the Go refimpl: `ECT_IAT_MAX_AGE_MINUTES`, `ECT_IAT_MAX_FUTURE_SEC`, `ECT_DEFAULT_EXPIRY_MIN`, `ECT_JTI_REPLAY_CACHE_SIZE`, `ECT_JTI_REPLAY_TTL_MIN`.

View File

@@ -11,7 +11,6 @@ from ect import (
verify,
VerifyOptions,
MemoryLedger,
-    POL_DECISION_APPROVED,
)
def main():
@@ -33,9 +32,11 @@ def main():
jti=root_jti,
wid="wf-demo-001",
exec_act="review_requirements_spec",
-        par=[],
-        pol="spec_review_policy_v2",
-        pol_decision=POL_DECISION_APPROVED,
+        pred=[],
+        ext={
+            "pol": "spec_review_policy_v2",
+            "pol_decision": "approved",
+        },
)
ect_a = create(payload_a, key_a, CreateOptions(key_id=kid_a))
print("Agent A created root ECT (jti=550e8400-..., review_requirements_spec)")
@@ -56,7 +57,7 @@ def main():
ledger.append(ect_a, parsed.payload)
print("Agent B verified root ECT and appended to ledger")
-    # 3) Agent B creates child ECT (par contains parent jti values per spec)
+    # 3) Agent B creates child ECT (pred contains predecessor jti values per spec)
key_b = generate_key()
kid_b = "agent-b-key"
child_jti = "550e8400-e29b-41d4-a716-446655440002"
@@ -68,12 +69,14 @@ def main():
jti=child_jti,
wid="wf-demo-001",
exec_act="implement_module",
-        par=[root_jti],
-        pol="coding_standards_v3",
-        pol_decision=POL_DECISION_APPROVED,
+        pred=[root_jti],
+        ext={
+            "pol": "coding_standards_v3",
+            "pol_decision": "approved",
+        },
)
ect_b = create(payload_b, key_b, CreateOptions(key_id=kid_b))
-    print("Agent B created child ECT (jti=550e8400-...002, implement_module, par=[parent jti])")
+    print("Agent B created child ECT (jti=550e8400-...002, implement_module, pred=[predecessor jti])")
# 4) Verify child ECT with DAG
def resolver_b(kid):

View File

@@ -1,13 +1,10 @@
# WIMSE Execution Context Tokens (ECT) — Python reference implementation
-# draft-nennemann-wimse-execution-context-00
+# draft-nennemann-wimse-execution-context-01
from ect.types import (
ECT_TYPE,
-    POL_DECISION_APPROVED,
-    POL_DECISION_REJECTED,
-    POL_DECISION_PENDING_HUMAN_REVIEW,
+    ECT_TYPE_LEGACY,
     Payload,
-    valid_pol_decision,
)
from ect.create import create, generate_key, CreateOptions, default_create_options
from ect.verify import (
@@ -30,11 +27,8 @@ from ect.jti_cache import JTICache, new_jti_cache
__all__ = [
"ECT_TYPE",
-    "POL_DECISION_APPROVED",
-    "POL_DECISION_REJECTED",
-    "POL_DECISION_PENDING_HUMAN_REVIEW",
+    "ECT_TYPE_LEGACY",
     "Payload",
-    "valid_pol_decision",
"create",
"generate_key",
"CreateOptions",

View File

@@ -10,9 +10,9 @@ from typing import Optional
import jwt
from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePrivateKey
-from ect.types import Payload, valid_pol_decision
+from ect.types import ECT_TYPE, Payload
from ect.validate import (
-    DEFAULT_MAX_PAR_LENGTH,
+    DEFAULT_MAX_PRED_LENGTH,
validate_ext,
validate_hash_format,
valid_uuid,
@@ -25,7 +25,7 @@ class CreateOptions:
iat_max_age_sec: int = 900 # 15 min
default_expiry_sec: int = 600 # 10 min
validate_uuids: bool = False
-    max_par_length: int = 0  # 0 = no limit; use DEFAULT_MAX_PAR_LENGTH for 100
+    max_pred_length: int = 0  # 0 = no limit; use DEFAULT_MAX_PRED_LENGTH for 100
def default_create_options() -> CreateOptions:
@@ -46,22 +46,14 @@ def _validate_payload(p: Payload, opts: CreateOptions) -> None:
raise ValueError("ect: jti must be UUID format")
if p.wid and not valid_uuid(p.wid):
raise ValueError("ect: wid must be UUID format when set")
-    max_par = opts.max_par_length or 0
-    if max_par > 0 and len(p.par) > max_par:
-        raise ValueError("ect: par exceeds max length")
+    max_pred = opts.max_pred_length or 0
+    if max_pred > 0 and len(p.pred) > max_pred:
+        raise ValueError("ect: pred exceeds max length")
if p.inp_hash:
validate_hash_format(p.inp_hash)
if p.out_hash:
validate_hash_format(p.out_hash)
validate_ext(p.ext)
-    # pol/pol_decision OPTIONAL; if either set, both must be present and valid
-    if p.pol or p.pol_decision:
-        if not p.pol or not p.pol_decision:
-            raise ValueError("ect: pol and pol_decision must both be present when either is set")
-        if not valid_pol_decision(p.pol_decision):
-            raise ValueError(
-                "ect: pol_decision must be approved, rejected, or pending_human_review"
-            )
# compensation in ext per spec
if p.ext and p.ext.get("compensation_reason") and not p.ext.get("compensation_required"):
raise ValueError("ect: ext.compensation_reason requires ext.compensation_required true")
@@ -73,8 +65,7 @@ def create(
opts: CreateOptions,
) -> str:
"""Build and sign an ECT. Payload must have required claims; iat/exp can be 0 for defaults.
-    create() may modify the payload in place (iat, exp, sub, par) when filling defaults;
-    pass a copy if the original must stay unchanged.
+    create() works on a deep copy so the caller's payload is not modified.
"""
if not opts.key_id:
raise ValueError("ect: KeyID required")
@@ -87,16 +78,14 @@ def create(
payload.iat = now
if payload.exp == 0:
payload.exp = now + (opts.default_expiry_sec or 600)
-    if not payload.sub:
-        payload.sub = payload.iss
-    if payload.par is None:
-        payload.par = []
+    if payload.pred is None:
+        payload.pred = []
_validate_payload(payload, opts)
claims = payload.to_claims()
headers = {
-        "typ": "wimse-exec+jwt",
+        "typ": ECT_TYPE,
"alg": "ES256",
"kid": opts.key_id,
}

View File
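The updated docstring says `create()` fills defaults on a deep copy rather than mutating the caller's payload. A dict-based sketch of that pattern (illustrative only; the real `create()` works on `Payload` objects and also signs the result):

```python
import copy

def fill_defaults(payload: dict, now: int) -> dict:
    """Deep-copy then fill iat/exp/pred defaults; caller's dict stays untouched (sketch)."""
    p = copy.deepcopy(payload)
    if p.get("iat", 0) == 0:
        p["iat"] = now
    if p.get("exp", 0) == 0:
        p["exp"] = now + 600  # 10-minute default expiry, as in CreateOptions
    p.setdefault("pred", [])
    return p

original = {"iss": "i", "iat": 0, "exp": 0}
filled = fill_defaults(original, 1000)
# original is unchanged; filled carries iat=1000, exp=1600, pred=[]
```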

@@ -8,7 +8,7 @@ from typing import TYPE_CHECKING
if TYPE_CHECKING:
from ect.types import Payload
-from ect.validate import DEFAULT_MAX_PAR_LENGTH
+from ect.validate import DEFAULT_MAX_PRED_LENGTH
DEFAULT_CLOCK_SKEW_TOLERANCE = 30
DEFAULT_MAX_ANCESTOR_LIMIT = 10000
@@ -31,11 +31,11 @@ class DAGConfig:
self,
clock_skew_tolerance: int = DEFAULT_CLOCK_SKEW_TOLERANCE,
max_ancestor_limit: int = DEFAULT_MAX_ANCESTOR_LIMIT,
-        max_par_length: int = 0,
+        max_pred_length: int = 0,
):
self.clock_skew_tolerance = clock_skew_tolerance or DEFAULT_CLOCK_SKEW_TOLERANCE
self.max_ancestor_limit = max_ancestor_limit or DEFAULT_MAX_ANCESTOR_LIMIT
-        self.max_par_length = max_par_length or 0
+        self.max_pred_length = max_pred_length or 0
def default_dag_config() -> DAGConfig:
@@ -44,22 +44,22 @@ def default_dag_config() -> DAGConfig:
def _has_cycle(
target_tid: str,
-    parent_ids: list[str],
+    pred_ids: list[str],
store: ECTStore,
visited: set[str],
max_depth: int,
) -> bool:
if len(visited) >= max_depth:
return True
-    for parent_id in parent_ids:
-        if parent_id == target_tid:
+    for pred_id in pred_ids:
+        if pred_id == target_tid:
             return True
-        if parent_id in visited:
+        if pred_id in visited:
             continue
-        visited.add(parent_id)
-        parent = store.get_by_tid(parent_id)
-        if parent is not None:
-            if _has_cycle(target_tid, parent.par, store, visited, max_depth):
+        visited.add(pred_id)
+        pred = store.get_by_tid(pred_id)
+        if pred is not None:
+            if _has_cycle(target_tid, pred.pred, store, visited, max_depth):
return True
return False
@@ -69,29 +69,28 @@ def validate_dag(
store: ECTStore,
cfg: DAGConfig,
) -> None:
-    """Section 6.2: uniqueness (by jti), parent existence, temporal ordering, acyclicity, parent policy."""
-    if cfg.max_par_length > 0 and len(payload.par) > cfg.max_par_length:
-        raise ValueError("ect: par exceeds max length")
+    """Section 6.2: uniqueness (by jti), predecessor existence, temporal ordering, acyclicity, predecessor policy."""
+    if cfg.max_pred_length > 0 and len(payload.pred) > cfg.max_pred_length:
+        raise ValueError("ect: pred exceeds max length")
     if store.contains(payload.jti, payload.wid or ""):
         raise ValueError(f"ect: task ID (jti) already exists: {payload.jti}")
-    from ect.types import POL_DECISION_REJECTED, POL_DECISION_PENDING_HUMAN_REVIEW
-    for parent_id in payload.par:
-        parent = store.get_by_tid(parent_id)
-        if parent is None:
-            raise ValueError(f"ect: parent task not found: {parent_id}")
-        if parent.iat >= payload.iat + cfg.clock_skew_tolerance:
-            raise ValueError(f"ect: parent task not earlier than current: {parent_id}")
+    for pred_id in payload.pred:
+        pred = store.get_by_tid(pred_id)
+        if pred is None:
+            raise ValueError(f"ect: predecessor task not found: {pred_id}")
+        if pred.iat >= payload.iat + cfg.clock_skew_tolerance:
+            raise ValueError(f"ect: predecessor task not earlier than current: {pred_id}")
     visited: set[str] = set()
-    if _has_cycle(payload.jti, payload.par, store, visited, cfg.max_ancestor_limit):
+    if _has_cycle(payload.jti, payload.pred, store, visited, cfg.max_ancestor_limit):
         raise ValueError("ect: circular dependency or depth limit exceeded")
-    # Parent policy decision: only when parent has policy claims per spec
-    for parent_id in payload.par:
-        parent = store.get_by_tid(parent_id)
-        if parent and parent.has_policy_claims() and parent.pol_decision in (POL_DECISION_REJECTED, POL_DECISION_PENDING_HUMAN_REVIEW):
+    # Predecessor policy decision: only when predecessor has policy claims in ext per -01
+    for pred_id in payload.pred:
+        pred = store.get_by_tid(pred_id)
+        if pred and pred.has_policy_claims() and pred.pol_decision() in ("rejected", "pending_human_review"):
             if not payload.compensation_required():
                 raise ValueError(
-                    "ect: parent has non-approved pol_decision; current ECT must be compensation/remediation or have ext.compensation_required true"
+                    "ect: predecessor has non-approved pol_decision; current ECT must be compensation/remediation or have ext.compensation_required true"
)

View File
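The `_has_cycle` walk in `dag.py` is a depth-limited DFS over predecessor edges. A self-contained sketch of the same idea, using a plain `{jti: [predecessor jtis]}` dict in place of the `ECTStore` interface:

```python
def has_cycle(target: str, pred_ids: list, store: dict, visited=None, max_depth=10000) -> bool:
    """True if `target` is reachable via predecessor edges (a cycle) or the depth limit is hit.
    `store` is a plain dict standing in for the refimpl's ECTStore (illustrative)."""
    if visited is None:
        visited = set()
    if len(visited) >= max_depth:
        return True  # defense-in-depth: treat runaway depth as a failure
    for pid in pred_ids:
        if pid == target:
            return True
        if pid in visited:
            continue
        visited.add(pid)
        if pid in store and has_cycle(target, store[pid], store, visited, max_depth):
            return True
    return False

dag = {"a": [], "b": ["a"], "c": ["b"]}
has_cycle("c", dag["c"], dag)               # False: a <- b <- c is acyclic
has_cycle("a", ["c"], {**dag, "a": ["c"]})  # True: a -> c -> b -> a loops back
```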

@@ -23,7 +23,7 @@ class LedgerEntry:
task_id: str
agent_id: str
action: str
-    parents: list[str]
+    predecessors: list[str]
ect_jws: str
signature_verified: bool
verification_timestamp: float
@@ -70,7 +70,7 @@ class MemoryLedger(Ledger):
task_id=payload.jti,
agent_id=payload.iss,
action=payload.exec_act,
-            parents=list(payload.par) if payload.par else [],
+            predecessors=list(payload.pred) if payload.pred else [],
ect_jws=ect_jws,
signature_verified=True,
verification_timestamp=now,

View File

@@ -1,4 +1,4 @@
-"""ECT payload and claim types per draft Section 4."""
+"""ECT payload and claim types per draft-nennemann-wimse-ect-01 Section 4."""
from __future__ import annotations
@@ -6,19 +6,9 @@ import json
from dataclasses import dataclass, field
from typing import Any
-ECT_TYPE = "wimse-exec+jwt"
-POL_DECISION_APPROVED = "approved"
-POL_DECISION_REJECTED = "rejected"
-POL_DECISION_PENDING_HUMAN_REVIEW = "pending_human_review"
-def valid_pol_decision(s: str) -> bool:
-    return s in (
-        POL_DECISION_APPROVED,
-        POL_DECISION_REJECTED,
-        POL_DECISION_PENDING_HUMAN_REVIEW,
-    )
+# Preferred typ per -01; legacy accepted for backward compatibility.
+ECT_TYPE = "exec+jwt"
+ECT_TYPE_LEGACY = "wimse-exec+jwt"
def _audience_serialize(aud: list[str]) -> str | list[str]:
@@ -45,20 +35,15 @@ class Payload:
exp: int
jti: str
exec_act: str
-    par: list[str]
-    pol: str = ""
-    pol_decision: str = ""
-    sub: str = ""
+    pred: list[str]  # predecessor jti values (renamed from par in -01)
     wid: str = ""
-    pol_enforcer: str = ""
-    pol_timestamp: int = 0
inp_hash: str = ""
out_hash: str = ""
inp_classification: str = ""
ext: dict[str, Any] = field(default_factory=dict)
def to_claims(self) -> dict[str, Any]:
-        """Export as JWT claims. Compensation in ext per spec."""
+        """Export as JWT claims. Policy and compensation in ext per -01 spec."""
out: dict[str, Any] = {
"iss": self.iss,
"aud": _audience_serialize(self.aud),
@@ -66,20 +51,10 @@ class Payload:
"exp": self.exp,
"jti": self.jti,
"exec_act": self.exec_act,
-            "par": self.par,
+            "pred": self.pred,
}
-        if self.sub:
-            out["sub"] = self.sub
         if self.wid:
             out["wid"] = self.wid
-        if self.pol:
-            out["pol"] = self.pol
-        if self.pol_decision:
-            out["pol_decision"] = self.pol_decision
-        if self.pol_enforcer:
-            out["pol_enforcer"] = self.pol_enforcer
-        if self.pol_timestamp:
-            out["pol_timestamp"] = self.pol_timestamp
if self.inp_hash:
out["inp_hash"] = self.inp_hash
if self.out_hash:
@@ -87,13 +62,13 @@ class Payload:
if self.inp_classification:
out["inp_classification"] = self.inp_classification
if self.ext:
-            out["ext"] = dict(self.ext)
+            out["ect_ext"] = dict(self.ext)
return out
@classmethod
def from_claims(cls, claims: dict[str, Any]) -> Payload:
-        """Build Payload from JWT claims. Compensation read from ext per spec."""
-        ext = claims.get("ext") or {}
+        """Build Payload from JWT claims. Policy claims read from ext per -01 spec."""
+        ext = claims.get("ect_ext") or {}
return cls(
iss=claims["iss"],
aud=_audience_deserialize(claims["aud"]),
@@ -101,13 +76,8 @@ class Payload:
exp=int(claims["exp"]),
jti=claims["jti"],
exec_act=claims["exec_act"],
-            par=claims.get("par") or [],
-            pol=claims.get("pol", ""),
-            pol_decision=claims.get("pol_decision", ""),
-            sub=claims.get("sub", ""),
+            pred=claims.get("pred") or [],
             wid=claims.get("wid", ""),
-            pol_enforcer=claims.get("pol_enforcer", ""),
-            pol_timestamp=int(claims.get("pol_timestamp") or 0),
inp_hash=claims.get("inp_hash", ""),
out_hash=claims.get("out_hash", ""),
inp_classification=claims.get("inp_classification", ""),
@@ -124,5 +94,13 @@ class Payload:
return bool(self.ext.get("compensation_required"))
def has_policy_claims(self) -> bool:
-        """True if both pol and pol_decision are present (optional pair per spec)."""
-        return bool(self.pol and self.pol_decision)
+        """True if both pol and pol_decision are present in ext (per -01, moved to extension)."""
+        if not self.ext:
+            return False
+        return bool(self.ext.get("pol")) and bool(self.ext.get("pol_decision"))
+
+    def pol_decision(self) -> str:
+        """Return pol_decision from ext, or empty string."""
+        if not self.ext:
+            return ""
+        return str(self.ext.get("pol_decision", ""))

View File

@@ -9,14 +9,11 @@ from typing import Any
EXT_MAX_SIZE = 4096
EXT_MAX_DEPTH = 5
-DEFAULT_MAX_PAR_LENGTH = 100
+DEFAULT_MAX_PRED_LENGTH = 100
_UUID_RE = re.compile(
r"^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$"
)
-_ALLOWED_HASH_ALGS = frozenset(("sha-256", "sha-384", "sha-512"))
def _json_depth(obj: Any, depth: int = 0) -> int:
if depth > EXT_MAX_DEPTH:
return depth
@@ -44,22 +41,22 @@ def valid_uuid(s: str) -> bool:
def validate_hash_format(s: str) -> None:
-    """Raise ValueError if s is non-empty and not algorithm:base64url (sha-256, sha-384, sha-512)."""
+    """Raise ValueError if s is non-empty and not plain base64url per RFC 9449 / ECT spec.
+
+    The ECT spec (draft-nennemann-wimse-ect-01) and RFC 9449 specify
+    ``base64url(SHA-256(data))`` — a plain base64url string without any
+    algorithm prefix. This matches how ACT handles hashes.
+    """
     if not s:
         return
-    idx = s.find(":")
-    if idx <= 0:
-        raise ValueError("ect: inp_hash/out_hash must be algorithm:base64url (e.g. sha-256:...)")
-    alg = s[:idx].lower()
-    if alg not in _ALLOWED_HASH_ALGS:
-        raise ValueError("ect: inp_hash/out_hash must be algorithm:base64url (e.g. sha-256:...)")
-    encoded = s[idx + 1:]
-    if not encoded:
-        raise ValueError("ect: inp_hash/out_hash must be algorithm:base64url (e.g. sha-256:...)")
-    pad = 4 - len(encoded) % 4
-    if pad != 4:
-        encoded += "=" * pad
+    # Reject strings containing non-base64url characters.
+    # base64url alphabet: A-Z a-z 0-9 - _ (no padding '=' expected)
+    if not re.fullmatch(r"[A-Za-z0-9_-]+", s):
+        raise ValueError("ect: inp_hash/out_hash must be plain base64url (no prefix)")
+    # Verify it actually decodes.
+    pad = 4 - len(s) % 4
+    padded = s + "=" * pad if pad != 4 else s
     try:
-        base64.urlsafe_b64decode(encoded)
+        base64.urlsafe_b64decode(padded)
     except Exception:
-        raise ValueError("ect: inp_hash/out_hash must be algorithm:base64url (e.g. sha-256:...)") from None
+        raise ValueError("ect: inp_hash/out_hash must be plain base64url (no prefix)") from None

View File
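The padding arithmetic in `validate_hash_format` (`pad = 4 - len(s) % 4`) restores the `=` characters that unpadded base64url omits, since `urlsafe_b64decode` requires a length that is a multiple of four. As a standalone sketch:

```python
import base64

def decode_base64url(s: str) -> bytes:
    """Restore '=' padding before decoding, mirroring the pad arithmetic in validate_hash_format."""
    pad = 4 - len(s) % 4
    padded = s + "=" * pad if pad != 4 else s
    return base64.urlsafe_b64decode(padded)

decode_base64url("aGVsbG8")  # b'hello': 7 chars, so one '=' is appended before decoding
```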

@@ -10,7 +10,7 @@ from typing import Callable, Optional
import jwt
from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePublicKey
-from ect.types import ECT_TYPE, Payload, valid_pol_decision
+from ect.types import ECT_TYPE, ECT_TYPE_LEGACY, Payload
from ect.dag import ECTStore, DAGConfig, validate_dag
from ect.validate import validate_ext, validate_hash_format, valid_uuid
@@ -37,7 +37,7 @@ class VerifyOptions:
jti_seen: Optional[Callable[[str], bool]] = None
wit_subject: str = ""
validate_uuids: bool = False
-    max_par_length: int = 0  # 0 = no limit
+    max_pred_length: int = 0  # 0 = no limit
on_verify_attempt: Optional[Callable[[str, Optional[Exception]], None]] = None # (jti, err) for observability
@@ -83,8 +83,8 @@ def verify(compact: str, opts: VerifyOptions) -> ParsedECT:
def _verify_impl(compact: str, opts: VerifyOptions, set_log_jti: Callable[[str], None]) -> ParsedECT:
header = jwt.get_unverified_header(compact)
typ = header.get("typ") or ""
-    # Constant-time comparison for typ
-    if not hmac.compare_digest(typ, ECT_TYPE):
+    # Constant-time comparison for typ; accept both preferred and legacy values
+    if not hmac.compare_digest(typ, ECT_TYPE) and not hmac.compare_digest(typ, ECT_TYPE_LEGACY):
raise ValueError("ect: invalid typ parameter")
alg = header.get("alg")
if alg in ("none", "HS256", "HS384", "HS512"):
@@ -114,8 +114,8 @@ def _verify_impl(compact: str, opts: VerifyOptions, set_log_jti: Callable[[str],
set_log_jti(payload.jti)
validate_ext(payload.ext)
-    if opts.max_par_length > 0 and len(payload.par) > opts.max_par_length:
-        raise ValueError("ect: par exceeds max length")
+    if opts.max_pred_length > 0 and len(payload.pred) > opts.max_pred_length:
+        raise ValueError("ect: pred exceeds max length")
if opts.validate_uuids:
if not valid_uuid(payload.jti):
raise ValueError("ect: jti must be UUID format")
@@ -139,17 +139,11 @@ def _verify_impl(compact: str, opts: VerifyOptions, set_log_jti: Callable[[str],
if payload.iat > now + opts.iat_max_future_sec:
raise ValueError("ect: iat in the future")
-    # Required claims per spec: jti, exec_act, par. par may be set to [] when missing (from_claims already uses []).
+    # Required claims per spec: jti, exec_act, pred. pred may be set to [] when missing (from_claims already uses []).
     if not payload.jti or not payload.exec_act:
-        raise ValueError("ect: missing required claims (jti, exec_act, par)")
-    if payload.par is None:
-        payload.par = []
-    # If pol or pol_decision present, both must be present and valid
-    if payload.pol or payload.pol_decision:
-        if not payload.pol or not payload.pol_decision:
-            raise ValueError("ect: pol and pol_decision must both be present when either is set")
-        if not valid_pol_decision(payload.pol_decision):
-            raise ValueError("ect: invalid pol_decision value")
+        raise ValueError("ect: missing required claims (jti, exec_act, pred)")
+    if payload.pred is None:
+        payload.pred = []
if opts.store is not None and opts.dag is not None:
validate_dag(payload, opts.store, opts.dag)

View File

@@ -1 +1 @@
-{"iss":"spiffe://example.com/agent/clinical","sub":"spiffe://example.com/agent/clinical","aud":"spiffe://example.com/agent/safety","iat":1772064150,"exp":1772064750,"jti":"7f3a8b2c-d1e4-4f56-9a0b-c3d4e5f6a7b8","wid":"a0b1c2d3-e4f5-6789-abcd-ef0123456789","exec_act":"recommend_treatment","par":[],"pol":"clinical_reasoning_policy_v2","pol_decision":"approved"}
+{"iss":"spiffe://example.com/agent/clinical","aud":"spiffe://example.com/agent/safety","iat":1772064150,"exp":1772064750,"jti":"7f3a8b2c-d1e4-4f56-9a0b-c3d4e5f6a7b8","wid":"a0b1c2d3-e4f5-6789-abcd-ef0123456789","exec_act":"recommend_treatment","pred":[],"ect_ext":{"pol":"clinical_reasoning_policy_v2","pol_decision":"approved"}}

View File

@@ -13,7 +13,6 @@ from ect import (
CreateOptions,
verify,
VerifyOptions,
-    POL_DECISION_APPROVED,
)
@@ -27,9 +26,7 @@ def test_create_roundtrip():
exp=now + 600,
jti="e4f5a6b7-c8d9-0123-ef01-234567890abc",
exec_act="review_spec",
-        par=[],
-        pol="spec_review_policy_v2",
-        pol_decision=POL_DECISION_APPROVED,
+        pred=[],
)
compact = create(payload, key, CreateOptions(key_id="agent-a-key-1"))
assert compact

View File

@@ -4,7 +4,7 @@ import time
import pytest
-from ect import Payload, create, generate_key, CreateOptions, default_create_options, POL_DECISION_APPROVED
+from ect import Payload, create, generate_key, CreateOptions, default_create_options
def test_default_create_options():
@@ -14,7 +14,7 @@ def test_default_create_options():
def test_create_errors():
key = generate_key()
-    p = Payload(iss="i", aud=["a"], iat=1, exp=2, jti="j", exec_act="e", par=[], pol="p", pol_decision=POL_DECISION_APPROVED)
+    p = Payload(iss="i", aud=["a"], iat=1, exp=2, jti="j", exec_act="e", pred=[])
with pytest.raises(ValueError, match="KeyID|required"):
create(p, key, CreateOptions(key_id=""))
with pytest.raises((ValueError, TypeError, AttributeError)):
@@ -26,7 +26,7 @@ def test_create_optional_pol():
now = int(time.time())
p = Payload(
iss="iss", aud=["a"], iat=now, exp=now + 3600,
-        jti="jti-nopol", exec_act="act", par=[],
+        jti="jti-nopol", exec_act="act", pred=[],
)
compact = create(p, key, CreateOptions(key_id="kid"))
assert compact
@@ -34,7 +34,7 @@ def test_create_optional_pol():
def test_create_validation_errors():
key = generate_key()
-    base = dict(iss="i", aud=["a"], iat=1, exp=2, jti="j", exec_act="e", par=[])
+    base = dict(iss="i", aud=["a"], iat=1, exp=2, jti="j", exec_act="e", pred=[])
with pytest.raises(ValueError, match="iss"):
create(Payload(**{**base, "iss": ""}), key, CreateOptions(key_id="k"))
with pytest.raises(ValueError, match="aud"):
@@ -43,16 +43,12 @@ def test_create_validation_errors():
create(Payload(**{**base, "jti": ""}), key, CreateOptions(key_id="k"))
with pytest.raises(ValueError, match="exec_act"):
create(Payload(**{**base, "exec_act": ""}), key, CreateOptions(key_id="k"))
-    with pytest.raises(ValueError, match="pol and pol_decision"):
-        create(Payload(**{**base, "pol": "p", "pol_decision": ""}), key, CreateOptions(key_id="k"))
-    with pytest.raises(ValueError, match="pol_decision"):
-        create(Payload(**{**base, "pol": "p", "pol_decision": "bad"}), key, CreateOptions(key_id="k"))
def test_create_ext_compensation_reason_requires_required():
key = generate_key()
p = Payload(
-        iss="i", aud=["a"], iat=1, exp=2, jti="j", exec_act="e", par=[],
+        iss="i", aud=["a"], iat=1, exp=2, jti="j", exec_act="e", pred=[],
ext={"compensation_reason": "rollback", "compensation_required": False},
)
with pytest.raises(ValueError, match="compensation_required"):
@@ -61,7 +57,7 @@ def test_create_ext_compensation_reason_requires_required():
def test_create_zero_expiry_uses_default():
key = generate_key()
-    p = Payload(iss="i", aud=["a"], iat=0, exp=0, jti="j", exec_act="e", par=[])
+    p = Payload(iss="i", aud=["a"], iat=0, exp=0, jti="j", exec_act="e", pred=[])
compact = create(p, key, CreateOptions(key_id="k", default_expiry_sec=300))
assert compact
# create() works on a copy; decode the token to verify defaults were applied
@@ -73,17 +69,17 @@ def test_create_zero_expiry_uses_default():
def test_create_validate_uuids_rejects_non_uuid_jti():
key = generate_key()
now = int(time.time())
-    p = Payload(iss="i", aud=["a"], iat=now, exp=now + 3600, jti="not-a-uuid", exec_act="e", par=[])
+    p = Payload(iss="i", aud=["a"], iat=now, exp=now + 3600, jti="not-a-uuid", exec_act="e", pred=[])
with pytest.raises(ValueError, match="jti must be UUID"):
create(p, key, CreateOptions(key_id="k", validate_uuids=True))
-def test_create_max_par_length():
+def test_create_max_pred_length():
key = generate_key()
now = int(time.time())
-    p = Payload(iss="i", aud=["a"], iat=now, exp=now + 3600, jti="550e8400-e29b-41d4-a716-446655440000", exec_act="e", par=["p1", "p2"])
-    with pytest.raises(ValueError, match="par exceeds max length"):
-        create(p, key, CreateOptions(key_id="k", max_par_length=1))
+    p = Payload(iss="i", aud=["a"], iat=now, exp=now + 3600, jti="550e8400-e29b-41d4-a716-446655440000", exec_act="e", pred=["p1", "p2"])
+    with pytest.raises(ValueError, match="pred exceeds max length"):
+        create(p, key, CreateOptions(key_id="k", max_pred_length=1))
def test_create_ext_size_rejected():
@@ -91,7 +87,7 @@ def test_create_ext_size_rejected():
key = generate_key()
now = int(time.time())
p = Payload(
-        iss="i", aud=["a"], iat=now, exp=now + 3600, jti="550e8400-e29b-41d4-a716-446655440000", exec_act="e", par=[],
+        iss="i", aud=["a"], iat=now, exp=now + 3600, jti="550e8400-e29b-41d4-a716-446655440000", exec_act="e", pred=[],
ext={"x": "y" * (EXT_MAX_SIZE - 5)},
)
with pytest.raises(ValueError, match="ext exceeds max size"):

View File

@@ -4,7 +4,7 @@ import time
 import pytest

-from ect import Payload, MemoryLedger, validate_dag, default_dag_config, POL_DECISION_APPROVED
+from ect import Payload, MemoryLedger, validate_dag, default_dag_config


 def test_validate_dag_root():
@@ -16,9 +16,7 @@ def test_validate_dag_root():
         exp=0,
         jti="jti-001",
         exec_act="",
-        par=[],
-        pol="",
-        pol_decision=POL_DECISION_APPROVED,
+        pred=[],
         wid="wf-1",
     )
     validate_dag(payload, store, default_dag_config())
@@ -33,9 +31,7 @@ def test_validate_dag_duplicate_jti():
         exp=0,
         jti="jti-001",
         exec_act="a",
-        par=[],
-        pol="p",
-        pol_decision=POL_DECISION_APPROVED,
+        pred=[],
         wid="wf-1",
     )
     store.append("dummy-jws", p)
@@ -46,16 +42,14 @@ def test_validate_dag_duplicate_jti():
         exp=0,
         jti="jti-001",
         exec_act="",
-        par=[],
-        pol="",
-        pol_decision=POL_DECISION_APPROVED,
+        pred=[],
         wid="wf-1",
     )
     with pytest.raises(ValueError, match="task ID.*already exists"):
         validate_dag(payload, store, default_dag_config())


-def test_validate_dag_parent_exists():
+def test_validate_dag_pred_exists():
     store = MemoryLedger()
     now = int(time.time())
     p = Payload(
@@ -65,9 +59,7 @@ def test_validate_dag_parent_exists():
         exp=now + 600,
         jti="jti-001",
         exec_act="a",
-        par=[],
-        pol="p",
-        pol_decision=POL_DECISION_APPROVED,
+        pred=[],
         wid="wf-1",
     )
     store.append("jws1", p)
@@ -78,15 +70,13 @@ def test_validate_dag_parent_exists():
         exp=now + 600,
         jti="jti-002",
         exec_act="b",
-        par=["jti-001"],
-        pol="p",
-        pol_decision=POL_DECISION_APPROVED,
+        pred=["jti-001"],
         wid="wf-1",
     )
     validate_dag(payload, store, default_dag_config())


-def test_validate_dag_parent_not_found():
+def test_validate_dag_pred_not_found():
     store = MemoryLedger()
     now = int(time.time())
     payload = Payload(
@@ -96,26 +86,24 @@ def test_validate_dag_parent_not_found():
         exp=now + 600,
         jti="jti-002",
         exec_act="",
-        par=["jti-missing"],
-        pol="",
-        pol_decision=POL_DECISION_APPROVED,
+        pred=["jti-missing"],
     )
-    with pytest.raises(ValueError, match="parent task not found"):
+    with pytest.raises(ValueError, match="predecessor task not found"):
         validate_dag(payload, store, default_dag_config())


-def test_validate_dag_parent_policy_rejected_requires_compensation():
-    from ect import POL_DECISION_REJECTED
+def test_validate_dag_pred_policy_rejected_requires_compensation():
     store = MemoryLedger()
     now = int(time.time())
     p = Payload(
         iss="x", aud=["y"], iat=now - 60, exp=now + 600,
-        jti="jti-rej", exec_act="a", par=[], pol="p", pol_decision=POL_DECISION_REJECTED, wid="wf-1",
+        jti="jti-rej", exec_act="a", pred=[], wid="wf-1",
+        ext={"pol": "p", "pol_decision": "rejected"},
     )
     store.append("jws1", p)
     payload = Payload(
         iss="", aud=[], iat=now, exp=now + 600,
-        jti="jti-child", exec_act="b", par=["jti-rej"], pol="p", pol_decision=POL_DECISION_APPROVED, wid="wf-1",
+        jti="jti-child", exec_act="b", pred=["jti-rej"], wid="wf-1",
     )
     with pytest.raises(ValueError, match="compensation"):
         validate_dag(payload, store, default_dag_config())

View File

@@ -4,12 +4,12 @@ import time
 import pytest

-from ect import Payload, MemoryLedger, ErrTaskIDExists, POL_DECISION_APPROVED
+from ect import Payload, MemoryLedger, ErrTaskIDExists


 def test_ledger_append_and_get():
     m = MemoryLedger()
-    p = Payload(iss="i", aud=["a"], iat=1, exp=2, jti="j1", exec_act="act", par=[])
+    p = Payload(iss="i", aud=["a"], iat=1, exp=2, jti="j1", exec_act="act", pred=[])
     seq = m.append("jws1", p)
     assert seq == 1
     assert m.get_by_tid("j1").jti == "j1"
@@ -17,7 +17,7 @@ def test_ledger_append_and_get():
 def test_ledger_err_task_id_exists():
     m = MemoryLedger()
-    p = Payload(iss="i", aud=["a"], iat=1, exp=2, jti="j-dup", exec_act="e", par=[])
+    p = Payload(iss="i", aud=["a"], iat=1, exp=2, jti="j-dup", exec_act="e", pred=[])
     m.append("jws1", p)
     with pytest.raises(ErrTaskIDExists):
         m.append("jws2", p)
@@ -25,7 +25,7 @@ def test_ledger_err_task_id_exists():
 def test_ledger_contains_wid():
     m = MemoryLedger()
-    p = Payload(iss="i", aud=["a"], iat=1, exp=2, jti="j1", exec_act="e", par=[], wid="wf1")
+    p = Payload(iss="i", aud=["a"], iat=1, exp=2, jti="j1", exec_act="e", pred=[], wid="wf1")
     m.append("jws", p)
     assert m.contains("j1", "") is True
     assert m.contains("j1", "wf1") is True

View File

@@ -2,62 +2,63 @@
 import pytest

-from ect import Payload, POL_DECISION_APPROVED
-from ect.types import valid_pol_decision
-
-
-def test_valid_pol_decision():
-    assert valid_pol_decision("approved") is True
-    assert valid_pol_decision("rejected") is True
-    assert valid_pol_decision("pending_human_review") is True
-    assert valid_pol_decision("invalid") is False
+from ect import Payload


 def test_payload_contains_audience():
-    p = Payload(iss="", aud=["a", "b"], iat=0, exp=0, jti="", exec_act="", par=[])
+    p = Payload(iss="", aud=["a", "b"], iat=0, exp=0, jti="", exec_act="", pred=[])
     assert p.contains_audience("a") is True
     assert p.contains_audience("c") is False


 def test_payload_compensation_required():
-    p = Payload(iss="", aud=[], iat=0, exp=0, jti="", exec_act="", par=[])
+    p = Payload(iss="", aud=[], iat=0, exp=0, jti="", exec_act="", pred=[])
     assert p.compensation_required() is False
     p.ext = {"compensation_required": True}
     assert p.compensation_required() is True


 def test_payload_has_policy_claims():
-    p = Payload(iss="", aud=[], iat=0, exp=0, jti="", exec_act="", par=[], pol="p", pol_decision=POL_DECISION_APPROVED)
+    p = Payload(iss="", aud=[], iat=0, exp=0, jti="", exec_act="", pred=[],
+                ext={"pol": "p", "pol_decision": "approved"})
     assert p.has_policy_claims() is True
-    p.pol = ""
+    p.ext = {"pol_decision": "approved"}
     assert p.has_policy_claims() is False
+    p.ext = None
+    assert p.has_policy_claims() is False
+
+
+def test_payload_pol_decision():
+    p = Payload(iss="", aud=[], iat=0, exp=0, jti="", exec_act="", pred=[],
+                ext={"pol_decision": "rejected"})
+    assert p.pol_decision() == "rejected"
+    p.ext = None
+    assert p.pol_decision() == ""


 def test_payload_to_claims_optional():
-    p = Payload(iss="i", aud=["a"], iat=1, exp=2, jti="j", exec_act="e", par=[], wid="wf")
+    p = Payload(iss="i", aud=["a"], iat=1, exp=2, jti="j", exec_act="e", pred=[], wid="wf")
     claims = p.to_claims()
     assert claims["wid"] == "wf"
-    assert "ext" not in claims or not claims.get("ext")
+    assert "ect_ext" not in claims or not claims.get("ect_ext")


 def test_payload_from_claims_aud_string():
-    claims = {"iss": "i", "aud": "single", "iat": 1, "exp": 2, "jti": "j", "exec_act": "e", "par": []}
+    claims = {"iss": "i", "aud": "single", "iat": 1, "exp": 2, "jti": "j", "exec_act": "e", "pred": []}
     p = Payload.from_claims(claims)
     assert p.aud == ["single"]


 def test_payload_to_claims_all_optional():
     p = Payload(
-        iss="i", aud=["a"], iat=1, exp=2, jti="j", exec_act="e", par=[],
-        sub="s", wid="w", pol="p", pol_decision="approved", pol_enforcer="e",
-        pol_timestamp=1, inp_hash="h", out_hash="o", inp_classification="c",
+        iss="i", aud=["a"], iat=1, exp=2, jti="j", exec_act="e", pred=[],
+        wid="w", inp_hash="h", out_hash="o", inp_classification="c",
+        ext={"pol": "p", "pol_decision": "approved"},
     )
     claims = p.to_claims()
-    assert claims["sub"] == "s"
     assert claims["wid"] == "w"
-    assert claims["pol"] == "p"
-    assert claims["pol_enforcer"] == "e"
-    assert claims["pol_timestamp"] == 1
     assert claims["inp_hash"] == "h"
     assert claims["out_hash"] == "o"
     assert claims["inp_classification"] == "c"
+    assert claims["ect_ext"]["pol"] == "p"
+    assert claims["ect_ext"]["pol_decision"] == "approved"

View File

@@ -47,17 +47,18 @@ def test_validate_hash_format_empty():
 def test_validate_hash_format_ok():
-    # sha-256:base64url (minimal valid)
-    validate_hash_format("sha-256:YQ")
-    validate_hash_format("sha-384:YQ")
-    validate_hash_format("sha-512:YQ")
+    # Plain base64url per RFC 9449 / ECT spec (no algorithm prefix)
+    validate_hash_format("YQ")
+    validate_hash_format("dBjftJeZ4CVP-mB92K27uhbUJU1p1r_wW1gFWFOEjXk")
+    validate_hash_format("abc123-_XYZ")


 def test_validate_hash_format_bad():
-    with pytest.raises(ValueError, match="algorithm:base64url|inp_hash"):
-        validate_hash_format("md5:abc")
-    with pytest.raises(ValueError, match="algorithm:base64url|inp_hash"):
-        validate_hash_format("no-colon")
-    # Invalid base64 that triggers decode error (e.g. binary)
-    with pytest.raises(ValueError, match="algorithm:base64url|inp_hash"):
-        validate_hash_format("sha-256:YQ\x00")  # null in payload
+    # Colon is not valid base64url — rejects old prefixed format
+    with pytest.raises(ValueError, match="plain base64url"):
+        validate_hash_format("sha-256:YQ")
+    with pytest.raises(ValueError, match="plain base64url"):
+        validate_hash_format("not valid!!")
+    # Null byte in payload
+    with pytest.raises(ValueError, match="plain base64url"):
+        validate_hash_format("YQ\x00")

View File

@@ -13,7 +13,6 @@ from ect import (
     verify,
     VerifyOptions,
     default_verify_options,
-    POL_DECISION_APPROVED,
 )
@@ -22,7 +21,7 @@ def test_parse():
     now = int(time.time())
     p = Payload(
         iss="iss", aud=["a"], iat=now, exp=now + 3600,
-        jti="jti-parse", exec_act="act", par=[], pol="p", pol_decision=POL_DECISION_APPROVED,
+        jti="jti-parse", exec_act="act", pred=[],
     )
     compact = create(p, key, CreateOptions(key_id="kid"))
     parsed = parse(compact)
@@ -41,7 +40,7 @@ def test_verify_expired():
     now = int(time.time())
     p = Payload(
         iss="iss", aud=["v"], iat=now - 3600, exp=now - 60,
-        jti="jti-exp", exec_act="act", par=[], pol="p", pol_decision=POL_DECISION_APPROVED,
+        jti="jti-exp", exec_act="act", pred=[],
     )
     compact = create(p, key, CreateOptions(key_id="kid"))
     resolver = lambda kid: key.public_key() if kid == "kid" else None
@@ -54,7 +53,7 @@ def test_verify_replay():
     now = int(time.time())
     p = Payload(
         iss="iss", aud=["v"], iat=now, exp=now + 3600,
-        jti="jti-replay", exec_act="act", par=[], pol="p", pol_decision=POL_DECISION_APPROVED,
+        jti="jti-replay", exec_act="act", pred=[],
     )
     compact = create(p, key, CreateOptions(key_id="kid"))
     resolver = lambda kid: key.public_key() if kid == "kid" else None
@@ -76,7 +75,7 @@ def test_verify_audience_mismatch():
     now = int(time.time())
     p = Payload(
         iss="iss", aud=["other"], iat=now, exp=now + 3600,
-        jti="jti-a", exec_act="act", par=[], pol="p", pol_decision=POL_DECISION_APPROVED,
+        jti="jti-a", exec_act="act", pred=[],
     )
     compact = create(p, key, CreateOptions(key_id="kid"))
     resolver = lambda kid: key.public_key() if kid == "kid" else None
@@ -89,7 +88,7 @@ def test_verify_wit_subject_mismatch():
     now = int(time.time())
     p = Payload(
         iss="wrong-iss", aud=["v"], iat=now, exp=now + 3600,
-        jti="jti-w", exec_act="act", par=[], pol="p", pol_decision=POL_DECISION_APPROVED,
+        jti="jti-w", exec_act="act", pred=[],
     )
     compact = create(p, key, CreateOptions(key_id="kid"))
     resolver = lambda kid: key.public_key() if kid == "kid" else None
@@ -104,7 +103,7 @@ def test_verify_iat_too_old():
     now = int(time.time())
     p = Payload(
         iss="iss", aud=["v"], iat=now - 2000, exp=now + 3600,
-        jti="jti-old", exec_act="act", par=[], pol="p", pol_decision=POL_DECISION_APPROVED,
+        jti="jti-old", exec_act="act", pred=[],
     )
     compact = create(p, key, CreateOptions(key_id="kid"))
     resolver = lambda kid: key.public_key() if kid == "kid" else None
@@ -119,7 +118,7 @@ def test_verify_unknown_key():
     now = int(time.time())
     p = Payload(
         iss="iss", aud=["v"], iat=now, exp=now + 3600,
-        jti="jti-k", exec_act="act", par=[], pol="p", pol_decision=POL_DECISION_APPROVED,
+        jti="jti-k", exec_act="act", pred=[],
     )
     compact = create(p, key, CreateOptions(key_id="kid"))
     resolver = lambda kid: None  # unknown key
@@ -132,7 +131,7 @@ def test_verify_resolve_key_required():
     now = int(time.time())
     p = Payload(
         iss="iss", aud=["v"], iat=now, exp=now + 3600,
-        jti="jti-r", exec_act="act", par=[], pol="p", pol_decision=POL_DECISION_APPROVED,
+        jti="jti-r", exec_act="act", pred=[],
     )
     compact = create(p, key, CreateOptions(key_id="kid"))
     with pytest.raises(ValueError, match="ResolveKey"):
@@ -146,7 +145,7 @@ def test_verify_with_dag():
     now = int(time.time())
     root = Payload(
         iss="iss", aud=["v"], iat=now, exp=now + 3600,
-        jti="jti-root", exec_act="act", par=[], pol="p", pol_decision=POL_DECISION_APPROVED,
+        jti="jti-root", exec_act="act", pred=[],
     )
     compact_root = create(root, key, CreateOptions(key_id="kid"))
     resolver = lambda kid: key.public_key() if kid == "kid" else None
@@ -155,7 +154,7 @@ def test_verify_with_dag():
     ledger.append(compact_root, parsed.payload)
     child = Payload(
         iss="iss", aud=["v"], iat=now + 1, exp=now + 3600,
-        jti="jti-child", exec_act="act2", par=["jti-root"], pol="p", pol_decision=POL_DECISION_APPROVED,
+        jti="jti-child", exec_act="act2", pred=["jti-root"],
     )
     compact_child = create(child, key, CreateOptions(key_id="kid"))
     parsed2 = verify(compact_child, opts)
@@ -166,7 +165,7 @@ def test_on_verify_attempt_callback():
     """Observability: on_verify_attempt is called with jti and error (or None)."""
     key = generate_key()
     now = int(time.time())
-    p = Payload(iss="i", aud=["v"], iat=now, exp=now + 3600, jti="jti-obs", exec_act="a", par=[])
+    p = Payload(iss="i", aud=["v"], iat=now, exp=now + 3600, jti="jti-obs", exec_act="a", pred=[])
     compact = create(p, key, CreateOptions(key_id="kid"))
     resolver = lambda k: key.public_key() if k == "kid" else None
     seen = []
@@ -183,7 +182,7 @@ def test_on_verify_attempt_callback():
 def test_on_verify_attempt_called_on_failure():
     key = generate_key()
     now = int(time.time())
-    p = Payload(iss="i", aud=["v"], iat=now, exp=now - 1, jti="jti-fail", exec_act="a", par=[])
+    p = Payload(iss="i", aud=["v"], iat=now, exp=now - 1, jti="jti-fail", exec_act="a", pred=[])
     compact = create(p, key, CreateOptions(key_id="kid"))
     resolver = lambda k: key.public_key() if k == "kid" else None
     seen = []
@@ -193,5 +192,3 @@ def test_on_verify_attempt_called_on_failure():
     assert len(seen) == 1
     assert seen[0][0] == "jti-fail"
     assert seen[0][1] is not None