End-to-end PoC demonstrating Agent Context Token authorization and Execution Context Token accountability over MCP tool calls, using a LangGraph agent with ES256-signed JWT tokens and DAG verification.
85 lines
2.8 KiB
Python
85 lines
2.8 KiB
Python
"""RFC-9421-shaped HTTP signature round-trip and tamper-detection."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import pytest
|
|
|
|
from act.errors import ACTSignatureError
|
|
|
|
from poc.http_sig import sign_request, verify_request
|
|
|
|
|
|
def _sign_verify_ok(identities, body: bytes):
    """Sign ``body`` as the agent and immediately verify the result.

    The request is a ``POST`` to a fixed local MCP endpoint, signed with
    the ``"agent"`` identity's private key and addressed to the name of
    the ``"mcp-server"`` identity.  Verification uses the same method,
    target, body and ECT header value together with the agent's public
    key, so it is expected to succeed; any exception from
    ``verify_request`` propagates to the caller.

    Args:
        identities: Mapping that provides at least the ``"agent"`` entry
            (with ``kid``, ``private_key`` and ``public_key``) and the
            ``"mcp-server"`` entry (with ``name``).
        body: Raw request body to sign.

    Returns:
        A ``(signed, parsed)`` tuple: the value returned by
        ``sign_request`` followed by the value returned by
        ``verify_request``.
    """
    signer = identities["agent"]
    ect_value = "ect.placeholder.compact"
    # Fields that must be identical on the signing and verifying side.
    request_fields = {
        "method": "POST",
        "target_uri": "http://127.0.0.1:8765/mcp",
        "body": body,
    }

    envelope = sign_request(
        **request_fields,
        wimse_ect=ect_value,
        wimse_aud=identities["mcp-server"].name,
        keyid=signer.kid,
        private_key=signer.private_key,
    )
    verified = verify_request(
        **request_fields,
        wimse_ect_header=ect_value,
        content_digest_header=envelope.content_digest,
        signature_input_header=envelope.signature_input,
        signature_header=envelope.signature,
        expected_audience=identities["mcp-server"].name,
        public_key=signer.public_key,
    )
    return envelope, verified
|
|
|
|
|
|
def test_signature_round_trips(identities):
    """A freshly signed request verifies and exposes the expected parameters.

    Checks that the parsed verification result reports the agent's key
    id, the ``"mcp-server"`` audience and the ``ecdsa-p256-sha256``
    algorithm label.
    """
    # Only the parsed verification result is inspected; the signed
    # headers are discarded.
    _, parsed = _sign_verify_ok(identities, body=b'{"method":"tools/call"}')
    assert parsed.keyid == identities["agent"].kid
    assert parsed.wimse_aud == "mcp-server"
    assert parsed.alg == "ecdsa-p256-sha256"
|
|
|
|
|
|
def test_signature_fails_on_tampered_body(identities):
    """Verification rejects a request whose body differs from the signed one.

    The headers come from a request signed over ``b"original"`` via
    ``_sign_verify_ok``; they are then replayed against ``b"tampered"``.
    Every other verification input mirrors what the helper used, so the
    body is the only thing that changed and ``ACTSignatureError`` must be
    raised.
    """
    agent = identities["agent"]
    signed, _ = _sign_verify_ok(identities, body=b"original")
    with pytest.raises(ACTSignatureError):
        verify_request(
            method="POST",
            # Must match the target URI that _sign_verify_ok signed.
            target_uri="http://127.0.0.1:8765/mcp",
            body=b"tampered",  # different body -> different digest -> no match
            wimse_ect_header="ect.placeholder.compact",
            content_digest_header=signed.content_digest,
            signature_input_header=signed.signature_input,
            signature_header=signed.signature,
            # Same audience expression the helper signed with, so an
            # audience mismatch can never be the reason this test passes.
            expected_audience=identities["mcp-server"].name,
            public_key=agent.public_key,
        )
|
|
|
|
|
|
def test_signature_fails_on_wrong_audience(identities):
    """Verification rejects a request signed for a different audience.

    The agent signs for ``"the-wrong-workload"`` while the verifier
    expects ``"mcp-server"``; method, target, body and ECT header value
    are identical on both sides, and ``ACTSignatureError`` must be
    raised.
    """
    signer = identities["agent"]
    # Fields kept identical between signing and verification.
    request_fields = {
        "method": "POST",
        "target_uri": "http://example/mcp",
        "body": b"{}",
    }

    envelope = sign_request(
        **request_fields,
        wimse_ect="ect.placeholder",
        wimse_aud="the-wrong-workload",  # deliberately not the verifier's audience
        keyid=signer.kid,
        private_key=signer.private_key,
    )

    with pytest.raises(ACTSignatureError):
        verify_request(
            **request_fields,
            wimse_ect_header="ect.placeholder",
            content_digest_header=envelope.content_digest,
            signature_input_header=envelope.signature_input,
            signature_header=envelope.signature,
            expected_audience="mcp-server",
            public_key=signer.public_key,
        )
|