test(e2e): add auth failure, message ordering, OPAQUE flow, key exhaustion, and rate limit tests

This commit is contained in:
2026-03-04 13:33:21 +01:00
parent 3a42130518
commit 4013b223ff

View File

@@ -1540,3 +1540,401 @@ async fn e2e_blob_hash_mismatch() -> anyhow::Result<()> {
Ok(()) Ok(())
} }
// ─── Phase 2.1 tests: auth failure, message ordering, OPAQUE, key exhaustion, rate limit ─────
/// Spawns a server with fully custom args (no default --auth-token / --allow-insecure-auth).
fn spawn_server_custom(base: &std::path::Path, args: &[&str]) -> (String, PathBuf, ChildGuard) {
let port = pick_unused_port().expect("free port");
let listen = format!("127.0.0.1:{port}");
let ca_cert = base.join("server-cert.der");
let tls_key = base.join("server-key.der");
let data_dir = base.join("data");
let server_bin = cargo_bin("qpq-server");
let mut cmd = Command::new(server_bin);
cmd.arg("--listen")
.arg(&listen)
.arg("--data-dir")
.arg(&data_dir)
.arg("--tls-cert")
.arg(&ca_cert)
.arg("--tls-key")
.arg(&tls_key);
for arg in args {
cmd.arg(arg);
}
let child = cmd.spawn().expect("spawn server");
(listen, ca_cert, ChildGuard(child))
}
/// Using a wrong auth token should cause operations to fail with an auth error.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn e2e_auth_failure_wrong_token() -> anyhow::Result<()> {
ensure_rustls_provider();
let temp = TempDir::new()?;
let base = temp.path();
let (server, ca_cert, _child) = spawn_server(base, &["--sealed-sender"]);
wait_for_health(&server, &ca_cert, "localhost").await?;
// Set auth to a WRONG token (server expects "devtoken").
init_auth(ClientAuth::from_parts("wrongtoken".to_string(), None));
let local = tokio::task::LocalSet::new();
let state_path = base.join("user.bin");
// Try to register state with the wrong token — should fail with auth error.
let result = local
.run_until(cmd_register_state(
&state_path,
&server,
&ca_cert,
"localhost",
None,
))
.await;
match result {
Ok(_) => anyhow::bail!("register_state with wrong token should have been rejected"),
Err(e) => {
let msg = format!("{e:#}");
anyhow::ensure!(
msg.contains("auth")
|| msg.contains("E001")
|| msg.contains("E003")
|| msg.contains("token")
|| msg.contains("Token")
|| msg.contains("denied")
|| msg.contains("unauthorized")
|| msg.contains("Unauthorized"),
"expected auth-related error, got: {msg}"
);
}
}
Ok(())
}
/// Send 10 ordered messages and verify they arrive in the correct order.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn e2e_message_ordering_preserved() -> anyhow::Result<()> {
ensure_rustls_provider();
let temp = TempDir::new()?;
let base = temp.path();
let auth_token = "devtoken";
let (server, ca_cert, _child) = spawn_server(base, &["--sealed-sender"]);
wait_for_health(&server, &ca_cert, "localhost").await?;
init_auth(ClientAuth::from_parts(auth_token.to_string(), None));
let local = tokio::task::LocalSet::new();
let a_state = base.join("a.bin");
let b_state = base.join("b.bin");
local
.run_until(cmd_register_state(&a_state, &server, &ca_cert, "localhost", None))
.await?;
local
.run_until(cmd_register_state(&b_state, &server, &ca_cert, "localhost", None))
.await?;
let b_bytes = std::fs::read(&b_state)?;
let b_compat: StoredStateCompat = bincode::deserialize(&b_bytes)?;
let b_pk_hex = hex_encode(&IdentityKeypair::from_seed(b_compat.identity_seed).public_key_bytes());
// A creates group, invites B, B joins.
local
.run_until(cmd_create_group(&a_state, &server, "order-test", None))
.await?;
local
.run_until(cmd_invite(&a_state, &server, &ca_cert, "localhost", &b_pk_hex, None))
.await?;
local
.run_until(cmd_join(&b_state, &server, &ca_cert, "localhost", None))
.await?;
// A sends 10 messages with bodies "msg-0" through "msg-9".
for i in 0..10u32 {
let body = format!("msg-{i}");
local
.run_until(cmd_send(
&a_state,
&server,
&ca_cert,
"localhost",
Some(&b_pk_hex),
false,
&body,
None,
))
.await?;
}
sleep(Duration::from_millis(300)).await;
// B fetches all messages and verifies ordering.
let plaintexts = local
.run_until(receive_pending_plaintexts(
&b_state,
&server,
&ca_cert,
"localhost",
2000,
None,
))
.await?;
// Filter to only the msg-N messages (there may be MLS group state messages).
let ordered: Vec<String> = plaintexts
.iter()
.filter_map(|p| {
let s = String::from_utf8_lossy(p);
if s.starts_with("msg-") {
Some(s.to_string())
} else {
None
}
})
.collect();
anyhow::ensure!(
ordered.len() == 10,
"expected 10 msg-N messages, got {} : {:?}",
ordered.len(),
ordered
);
for (i, msg) in ordered.iter().enumerate() {
let expected = format!("msg-{i}");
anyhow::ensure!(
*msg == expected,
"message at position {i} is '{msg}', expected '{expected}'; full sequence: {:?}",
ordered
);
}
Ok(())
}
/// Full OPAQUE register + login flow on an OPAQUE-only server (no --allow-insecure-auth).
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn e2e_opaque_register_login_full_flow() -> anyhow::Result<()> {
ensure_rustls_provider();
let _auth = AUTH_LOCK.lock().unwrap();
let temp = TempDir::new()?;
let base = temp.path();
// Server with auth token but WITHOUT --allow-insecure-auth (OPAQUE required for identity).
let (server, ca_cert, _child) =
spawn_server_custom(base, &["--auth-token", "devtoken", "--sealed-sender"]);
wait_for_health(&server, &ca_cert, "localhost").await?;
let local = tokio::task::LocalSet::new();
// Register user via OPAQUE (register RPCs are unauthenticated).
let alice_identity = IdentityKeypair::generate();
let alice_pk = alice_identity.public_key_bytes().to_vec();
let alice_pk_hex = hex_encode(&alice_pk);
local
.run_until(cmd_register_user(
&server,
&ca_cert,
"localhost",
"alice",
"correct-password",
Some(&alice_pk_hex),
))
.await?;
// Login with correct credentials.
let client = local.run_until(connect_node(&server, &ca_cert, "localhost")).await?;
let session_token = local
.run_until(opaque_login(&client, "alice", "correct-password", &alice_pk))
.await?;
anyhow::ensure!(
!session_token.is_empty(),
"OPAQUE login must return a non-empty session token"
);
// Verify the session works: set it as auth and call resolve_user on ourselves.
init_auth(ClientAuth::from_raw(session_token, None));
let resolved = local.run_until(resolve_user(&client, "alice")).await?;
anyhow::ensure!(
resolved == Some(alice_pk),
"resolve_user('alice') must return alice's identity key after OPAQUE login"
);
Ok(())
}
/// OPAQUE login with wrong password must fail.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn e2e_opaque_login_wrong_password() -> anyhow::Result<()> {
ensure_rustls_provider();
let _auth = AUTH_LOCK.lock().unwrap();
let temp = TempDir::new()?;
let base = temp.path();
let (server, ca_cert, _child) =
spawn_server_custom(base, &["--auth-token", "devtoken", "--sealed-sender"]);
wait_for_health(&server, &ca_cert, "localhost").await?;
let local = tokio::task::LocalSet::new();
let alice_identity = IdentityKeypair::generate();
let alice_pk = alice_identity.public_key_bytes().to_vec();
let alice_pk_hex = hex_encode(&alice_pk);
// Register with correct password.
local
.run_until(cmd_register_user(
&server,
&ca_cert,
"localhost",
"alice",
"correct-password",
Some(&alice_pk_hex),
))
.await?;
// Try login with WRONG password.
let client = local.run_until(connect_node(&server, &ca_cert, "localhost")).await?;
let result = local
.run_until(opaque_login(&client, "alice", "wrong-password", &alice_pk))
.await;
match result {
Ok(_) => anyhow::bail!("OPAQUE login with wrong password should have failed"),
Err(e) => {
let msg = format!("{e:#}");
anyhow::ensure!(
msg.contains("password")
|| msg.contains("OPAQUE")
|| msg.contains("login")
|| msg.contains("credential")
|| msg.contains("InvalidLogin")
|| msg.contains("finish"),
"expected OPAQUE login failure, got: {msg}"
);
}
}
Ok(())
}
/// After A's single KeyPackage is consumed by an invite, fetching it again must return empty
/// (graceful degradation, not a panic).
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn e2e_keypackage_exhaustion_graceful() -> anyhow::Result<()> {
ensure_rustls_provider();
let temp = TempDir::new()?;
let base = temp.path();
let (server, ca_cert, _child) = spawn_server(base, &["--sealed-sender"]);
wait_for_health(&server, &ca_cert, "localhost").await?;
init_auth(ClientAuth::from_parts("devtoken".to_string(), None));
let local = tokio::task::LocalSet::new();
// Register A (uploads 1 KeyPackage).
let a_state = base.join("a.bin");
local
.run_until(cmd_register_state(&a_state, &server, &ca_cert, "localhost", None))
.await?;
let a_bytes = std::fs::read(&a_state)?;
let a_compat: StoredStateCompat = bincode::deserialize(&a_bytes)?;
let a_pk = IdentityKeypair::from_seed(a_compat.identity_seed).public_key_bytes();
// Register B, create group, invite A (consumes A's KeyPackage).
let b_state = base.join("b.bin");
local
.run_until(cmd_register_state(&b_state, &server, &ca_cert, "localhost", None))
.await?;
local
.run_until(cmd_create_group(&b_state, &server, "exhaust-test", None))
.await?;
let a_pk_hex = hex_encode(&a_pk);
local
.run_until(cmd_invite(&b_state, &server, &ca_cert, "localhost", &a_pk_hex, None))
.await?;
// Now try to fetch A's KeyPackage again — it should be exhausted.
let client = local.run_until(connect_node(&server, &ca_cert, "localhost")).await?;
let pkg = local
.run_until(quicproquo_client::client::rpc::fetch_key_package(&client, &a_pk))
.await?;
// Graceful: either empty (no package available) or an error — but NOT a panic.
anyhow::ensure!(
pkg.is_empty(),
"expected empty KeyPackage after exhaustion, got {} bytes",
pkg.len()
);
Ok(())
}
/// Sending more than 100 enqueues in the rate-limit window must eventually be rejected.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn e2e_rate_limiting_rejects_excess() -> anyhow::Result<()> {
ensure_rustls_provider();
let temp = TempDir::new()?;
let base = temp.path();
let (server, ca_cert, _child) = spawn_server(base, &["--sealed-sender"]);
wait_for_health(&server, &ca_cert, "localhost").await?;
init_auth(ClientAuth::from_parts("devtoken".to_string(), None));
let local = tokio::task::LocalSet::new();
// Register a recipient.
let state_path = base.join("recipient.bin");
local
.run_until(cmd_register_state(&state_path, &server, &ca_cert, "localhost", None))
.await?;
let state_bytes = std::fs::read(&state_path)?;
let stored: StoredStateCompat = bincode::deserialize(&state_bytes)?;
let recipient_key = IdentityKeypair::from_seed(stored.identity_seed).public_key_bytes();
let client = local.run_until(connect_node(&server, &ca_cert, "localhost")).await?;
// Send 110 enqueues rapidly — the server limit is 100 per 60s window.
let mut hit_rate_limit = false;
for i in 0u32..110 {
let payload = format!("spam-{i}");
let result = local
.run_until(enqueue(&client, &recipient_key, payload.as_bytes()))
.await;
if let Err(e) = result {
let msg = format!("{e:#}");
if msg.contains("rate limit") || msg.contains("E014") || msg.contains("Rate") {
hit_rate_limit = true;
break;
}
// Other errors (e.g. connection reset) might happen after rate limit — accept them.
hit_rate_limit = true;
break;
}
}
anyhow::ensure!(
hit_rate_limit,
"expected rate limit error after 100+ enqueues, but all 110 succeeded"
);
Ok(())
}