- Add uploadBlob (@21) and downloadBlob (@22) RPCs to the Cap'n Proto schema, with SHA-256 content addressing and chunked transfer
- Server blob handler: 256KB chunks, SHA-256 verification on finalize, .meta JSON sidecar, 50MB size limit, content-addressable storage
- Add FileRef (0x08) AppMessage variant with blob_id, filename, file_size, mime_type
- /send-file command: read file, compute hash, upload in chunks with progress display, send FileRef via MLS, MIME auto-detection
- /download command: fetch blob in chunks with progress, verify hash, save to disk with collision avoidance
- 2 new E2E tests: upload/download round-trip with partial reads, hash mismatch rejection (14 E2E tests total)
- New error codes: E024-E027 for blob operations
1543 lines
52 KiB
Rust
1543 lines
52 KiB
Rust
// cargo_bin! only works for current package's binary; we spawn qpq-server from another package.
|
|
#![allow(deprecated)]
|
|
|
|
use std::{path::PathBuf, process::Command, sync::Mutex, time::Duration};
|
|
|
|
use assert_cmd::cargo::cargo_bin;
|
|
use portpicker::pick_unused_port;
|
|
use rand::RngCore;
|
|
use tempfile::TempDir;
|
|
use tokio::time::sleep;
|
|
use hex;
|
|
|
|
// Required by rustls 0.23 when QUIC/TLS is used from this process (e.g. client in test).
|
|
fn ensure_rustls_provider() {
|
|
let _ = rustls::crypto::ring::default_provider().install_default();
|
|
}
|
|
|
|
use sha2::{Sha256, Digest};
|
|
|
|
use quicproquo_client::{
|
|
cmd_create_group, cmd_invite, cmd_join, cmd_login, cmd_ping, cmd_register_state,
|
|
cmd_register_user, cmd_send, connect_node, create_channel, enqueue, fetch_wait, init_auth,
|
|
opaque_login, receive_pending_plaintexts, resolve_user, ClientAuth,
|
|
};
|
|
use quicproquo_core::{GroupMember, HybridKeypair, IdentityKeypair, ReceivedMessage};
|
|
|
|
/// Test-wide lock around the global `AUTH_CONTEXT` that `init_auth` writes.
///
/// Tests that install a non-devtoken session via `init_auth` hold this lock for their whole
/// duration so that a concurrently running test cannot overwrite their auth context.
static AUTH_LOCK: Mutex<()> = Mutex::new(());
|
|
|
|
/// Encodes `bytes` as a lowercase hexadecimal string, two digits per byte.
///
/// Returns an empty string for empty input. The output buffer is allocated once up front
/// instead of building a temporary `String` for every byte.
fn hex_encode(bytes: &[u8]) -> String {
    const HEX_DIGITS: &[u8; 16] = b"0123456789abcdef";

    let mut out = String::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        // High nibble first, then low nibble — same digit order as `{:02x}`.
        out.push(char::from(HEX_DIGITS[usize::from(byte >> 4)]));
        out.push(char::from(HEX_DIGITS[usize::from(byte & 0x0f)]));
    }
    out
}
|
|
|
|
#[derive(serde::Deserialize)]
|
|
struct StoredStateCompat {
|
|
identity_seed: [u8; 32],
|
|
#[allow(dead_code)]
|
|
group: Option<Vec<u8>>,
|
|
}
|
|
|
|
/// Owns a spawned child process and terminates it when the guard goes out of scope.
struct ChildGuard(std::process::Child);

impl Drop for ChildGuard {
    /// Kills the child and then waits for it to exit.
    fn drop(&mut self) {
        // Best-effort: an error here (e.g. the child has already exited) is not actionable.
        let _ = self.0.kill();
        // `kill` does not reap the process. Waiting releases the OS resources (no zombie left
        // behind) and ensures the child has fully exited before later drops run.
        let _ = self.0.wait();
    }
}
|
|
|
|
async fn wait_for_health(server: &str, ca_cert: &PathBuf, server_name: &str) -> anyhow::Result<()> {
|
|
let local = tokio::task::LocalSet::new();
|
|
for _ in 0..30 {
|
|
if local
|
|
.run_until(cmd_ping(server, ca_cert, server_name))
|
|
.await
|
|
.is_ok()
|
|
{
|
|
return Ok(());
|
|
}
|
|
sleep(Duration::from_millis(200)).await;
|
|
}
|
|
anyhow::bail!("server health never became ready")
|
|
}
|
|
|
|
/// Spawns a server with the given extra args and returns (listen_addr, ca_cert_path, ChildGuard).
|
|
fn spawn_server(base: &std::path::Path, extra_args: &[&str]) -> (String, PathBuf, ChildGuard) {
|
|
let port = pick_unused_port().expect("free port");
|
|
let listen = format!("127.0.0.1:{port}");
|
|
let ca_cert = base.join("server-cert.der");
|
|
let tls_key = base.join("server-key.der");
|
|
let data_dir = base.join("data");
|
|
|
|
let server_bin = cargo_bin("qpq-server");
|
|
let mut cmd = Command::new(server_bin);
|
|
cmd.arg("--listen")
|
|
.arg(&listen)
|
|
.arg("--data-dir")
|
|
.arg(&data_dir)
|
|
.arg("--tls-cert")
|
|
.arg(&ca_cert)
|
|
.arg("--tls-key")
|
|
.arg(&tls_key)
|
|
.arg("--auth-token")
|
|
.arg("devtoken")
|
|
.arg("--allow-insecure-auth");
|
|
for arg in extra_args {
|
|
cmd.arg(arg);
|
|
}
|
|
let child = cmd.spawn().expect("spawn server");
|
|
(listen, ca_cert, ChildGuard(child))
|
|
}
|
|
|
|
// ─── existing tests (fixed: add --sealed-sender so enqueue works with bearer token) ─────────────
|
|
|
|
/// Creator and joiner register; creator creates group and invites joiner; joiner joins;
|
|
/// creator sends a message; assert joiner's mailbox receives it.
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn e2e_happy_path_register_invite_join_send_recv() -> anyhow::Result<()> {
|
|
ensure_rustls_provider();
|
|
|
|
let temp = TempDir::new()?;
|
|
let base = temp.path();
|
|
let auth_token = "devtoken";
|
|
|
|
let (server, ca_cert, _child) = spawn_server(base, &["--sealed-sender"]);
|
|
|
|
wait_for_health(&server, &ca_cert, "localhost").await?;
|
|
init_auth(ClientAuth::from_parts(auth_token.to_string(), None));
|
|
|
|
let local = tokio::task::LocalSet::new();
|
|
|
|
let creator_state = base.join("creator.bin");
|
|
let joiner_state = base.join("joiner.bin");
|
|
|
|
local
|
|
.run_until(cmd_register_state(
|
|
&creator_state,
|
|
&server,
|
|
&ca_cert,
|
|
"localhost",
|
|
None,
|
|
))
|
|
.await?;
|
|
|
|
local
|
|
.run_until(cmd_register_state(
|
|
&joiner_state,
|
|
&server,
|
|
&ca_cert,
|
|
"localhost",
|
|
None,
|
|
))
|
|
.await?;
|
|
|
|
local
|
|
.run_until(cmd_create_group(&creator_state, &server, "test-group", None))
|
|
.await?;
|
|
|
|
let joiner_bytes = std::fs::read(&joiner_state)?;
|
|
let joiner_state_compat: StoredStateCompat = bincode::deserialize(&joiner_bytes)?;
|
|
let joiner_identity = IdentityKeypair::from_seed(joiner_state_compat.identity_seed);
|
|
let joiner_pk_hex = hex_encode(&joiner_identity.public_key_bytes());
|
|
|
|
local
|
|
.run_until(cmd_invite(
|
|
&creator_state,
|
|
&server,
|
|
&ca_cert,
|
|
"localhost",
|
|
&joiner_pk_hex,
|
|
None,
|
|
))
|
|
.await?;
|
|
|
|
local
|
|
.run_until(cmd_join(&joiner_state, &server, &ca_cert, "localhost", None))
|
|
.await?;
|
|
|
|
local
|
|
.run_until(cmd_send(
|
|
&creator_state,
|
|
&server,
|
|
&ca_cert,
|
|
"localhost",
|
|
Some(&joiner_pk_hex),
|
|
false,
|
|
"hello",
|
|
None,
|
|
))
|
|
.await?;
|
|
|
|
local
|
|
.run_until(async {
|
|
let client = connect_node(&server, &ca_cert, "localhost").await?;
|
|
let payloads = fetch_wait(&client, &joiner_identity.public_key_bytes(), 1000).await?;
|
|
anyhow::ensure!(!payloads.is_empty(), "no payloads delivered to joiner");
|
|
Ok::<(), anyhow::Error>(())
|
|
})
|
|
.await?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Three-party group: A creates group, invites B then C; B and C join; A sends, B and C receive;
|
|
/// B sends, A and C receive.
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn e2e_three_party_group_invite_join_send_recv() -> anyhow::Result<()> {
|
|
ensure_rustls_provider();
|
|
|
|
let temp = TempDir::new()?;
|
|
let base = temp.path();
|
|
let auth_token = "devtoken";
|
|
|
|
let (server, ca_cert, _child) = spawn_server(base, &["--sealed-sender"]);
|
|
|
|
wait_for_health(&server, &ca_cert, "localhost").await?;
|
|
init_auth(ClientAuth::from_parts(auth_token.to_string(), None));
|
|
|
|
let local = tokio::task::LocalSet::new();
|
|
|
|
let creator_state = base.join("creator.bin");
|
|
let b_state = base.join("b.bin");
|
|
let c_state = base.join("c.bin");
|
|
|
|
local
|
|
.run_until(cmd_register_state(
|
|
&creator_state,
|
|
&server,
|
|
&ca_cert,
|
|
"localhost",
|
|
None,
|
|
))
|
|
.await?;
|
|
local
|
|
.run_until(cmd_register_state(
|
|
&b_state,
|
|
&server,
|
|
&ca_cert,
|
|
"localhost",
|
|
None,
|
|
))
|
|
.await?;
|
|
local
|
|
.run_until(cmd_register_state(
|
|
&c_state,
|
|
&server,
|
|
&ca_cert,
|
|
"localhost",
|
|
None,
|
|
))
|
|
.await?;
|
|
|
|
let b_bytes = std::fs::read(&b_state)?;
|
|
let b_compat: StoredStateCompat = bincode::deserialize(&b_bytes)?;
|
|
let b_pk_hex = hex_encode(&IdentityKeypair::from_seed(b_compat.identity_seed).public_key_bytes());
|
|
|
|
let c_bytes = std::fs::read(&c_state)?;
|
|
let c_compat: StoredStateCompat = bincode::deserialize(&c_bytes)?;
|
|
let c_pk_hex = hex_encode(&IdentityKeypair::from_seed(c_compat.identity_seed).public_key_bytes());
|
|
|
|
local
|
|
.run_until(cmd_create_group(&creator_state, &server, "test-group", None))
|
|
.await?;
|
|
|
|
local
|
|
.run_until(cmd_invite(
|
|
&creator_state,
|
|
&server,
|
|
&ca_cert,
|
|
"localhost",
|
|
&b_pk_hex,
|
|
None,
|
|
))
|
|
.await?;
|
|
|
|
local
|
|
.run_until(cmd_invite(
|
|
&creator_state,
|
|
&server,
|
|
&ca_cert,
|
|
"localhost",
|
|
&c_pk_hex,
|
|
None,
|
|
))
|
|
.await?;
|
|
|
|
local
|
|
.run_until(cmd_join(&b_state, &server, &ca_cert, "localhost", None))
|
|
.await?;
|
|
local
|
|
.run_until(cmd_join(&c_state, &server, &ca_cert, "localhost", None))
|
|
.await?;
|
|
|
|
local
|
|
.run_until(cmd_send(
|
|
&creator_state,
|
|
&server,
|
|
&ca_cert,
|
|
"localhost",
|
|
None,
|
|
true,
|
|
"hello",
|
|
None,
|
|
))
|
|
.await?;
|
|
|
|
sleep(Duration::from_millis(150)).await;
|
|
|
|
let b_plaintexts = local
|
|
.run_until(receive_pending_plaintexts(
|
|
&b_state,
|
|
&server,
|
|
&ca_cert,
|
|
"localhost",
|
|
1500,
|
|
None,
|
|
))
|
|
.await?;
|
|
let c_plaintexts = local
|
|
.run_until(receive_pending_plaintexts(
|
|
&c_state,
|
|
&server,
|
|
&ca_cert,
|
|
"localhost",
|
|
1500,
|
|
None,
|
|
))
|
|
.await?;
|
|
anyhow::ensure!(
|
|
b_plaintexts.iter().any(|p| p.as_slice() == b"hello"),
|
|
"B did not receive 'hello', got {:?}",
|
|
b_plaintexts
|
|
);
|
|
anyhow::ensure!(
|
|
c_plaintexts.iter().any(|p| p.as_slice() == b"hello"),
|
|
"C did not receive 'hello', got {:?}",
|
|
c_plaintexts
|
|
);
|
|
|
|
local
|
|
.run_until(cmd_send(
|
|
&b_state,
|
|
&server,
|
|
&ca_cert,
|
|
"localhost",
|
|
None,
|
|
true,
|
|
"hi",
|
|
None,
|
|
))
|
|
.await?;
|
|
|
|
sleep(Duration::from_millis(200)).await;
|
|
|
|
let a_plaintexts = local
|
|
.run_until(receive_pending_plaintexts(
|
|
&creator_state,
|
|
&server,
|
|
&ca_cert,
|
|
"localhost",
|
|
1500,
|
|
None,
|
|
))
|
|
.await?;
|
|
let c_plaintexts2 = local
|
|
.run_until(receive_pending_plaintexts(
|
|
&c_state,
|
|
&server,
|
|
&ca_cert,
|
|
"localhost",
|
|
1500,
|
|
None,
|
|
))
|
|
.await?;
|
|
anyhow::ensure!(
|
|
a_plaintexts.iter().any(|p| p.as_slice() == b"hi"),
|
|
"A did not receive 'hi', got {:?}",
|
|
a_plaintexts
|
|
);
|
|
anyhow::ensure!(
|
|
c_plaintexts2.iter().any(|p| p.as_slice() == b"hi"),
|
|
"C did not receive 'hi', got {:?}",
|
|
c_plaintexts2
|
|
);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Login should refuse if the presented identity key does not match the registered key.
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn e2e_login_rejects_mismatched_identity() -> anyhow::Result<()> {
|
|
ensure_rustls_provider();
|
|
let _auth = AUTH_LOCK.lock().unwrap();
|
|
|
|
let temp = TempDir::new()?;
|
|
let base = temp.path();
|
|
|
|
let (server, ca_cert, _child) = spawn_server(base, &[]);
|
|
|
|
wait_for_health(&server, &ca_cert, "localhost").await?;
|
|
|
|
init_auth(ClientAuth::from_parts("devtoken".to_string(), None));
|
|
|
|
let local = tokio::task::LocalSet::new();
|
|
let state_path = base.join("user.bin");
|
|
|
|
// Register and persist state (includes identity key binding).
|
|
local
|
|
.run_until(cmd_register_state(
|
|
&state_path,
|
|
&server,
|
|
&ca_cert,
|
|
"localhost",
|
|
None,
|
|
))
|
|
.await?;
|
|
|
|
// Register the user with the bound identity so login can enforce mismatches.
|
|
let state_bytes = std::fs::read(&state_path)?;
|
|
let stored_state: StoredStateCompat = bincode::deserialize(&state_bytes)?;
|
|
let identity_hex = hex::encode(
|
|
IdentityKeypair::from_seed(stored_state.identity_seed).public_key_bytes(),
|
|
);
|
|
|
|
local
|
|
.run_until(cmd_register_user(
|
|
&server,
|
|
&ca_cert,
|
|
"localhost",
|
|
"user1",
|
|
"pass",
|
|
Some(&identity_hex),
|
|
))
|
|
.await?;
|
|
|
|
// Craft an unrelated identity key and attempt login with it.
|
|
let mut bogus_identity = [0u8; 32];
|
|
rand::thread_rng().fill_bytes(&mut bogus_identity);
|
|
let bogus_hex = hex::encode(bogus_identity);
|
|
|
|
let result = local
|
|
.run_until(cmd_login(
|
|
&server,
|
|
&ca_cert,
|
|
"localhost",
|
|
"user1",
|
|
"pass",
|
|
Some(&bogus_hex),
|
|
None,
|
|
None,
|
|
))
|
|
.await;
|
|
|
|
match result {
|
|
Ok(_) => anyhow::bail!("login unexpectedly succeeded with mismatched identity"),
|
|
Err(e) => {
|
|
let msg = format!("{e:#}");
|
|
anyhow::ensure!(
|
|
msg.contains("identity") || msg.contains("E016"),
|
|
"login failed but not for identity mismatch: {msg}"
|
|
);
|
|
}
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Sealed Sender: enqueue with valid token (no identity binding) succeeds; recipient can fetch.
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn e2e_sealed_sender_enqueue_then_fetch() -> anyhow::Result<()> {
|
|
ensure_rustls_provider();
|
|
|
|
let temp = TempDir::new()?;
|
|
let base = temp.path();
|
|
|
|
let (server, ca_cert, _child) = spawn_server(base, &["--sealed-sender"]);
|
|
|
|
wait_for_health(&server, &ca_cert, "localhost").await?;
|
|
init_auth(ClientAuth::from_parts("devtoken".to_string(), None));
|
|
|
|
let local = tokio::task::LocalSet::new();
|
|
let state_path = base.join("recipient.bin");
|
|
|
|
local
|
|
.run_until(cmd_register_state(
|
|
&state_path,
|
|
&server,
|
|
&ca_cert,
|
|
"localhost",
|
|
None,
|
|
))
|
|
.await?;
|
|
|
|
let state_bytes = std::fs::read(&state_path)?;
|
|
let stored: StoredStateCompat = bincode::deserialize(&state_bytes)?;
|
|
let recipient_key = IdentityKeypair::from_seed(stored.identity_seed).public_key_bytes();
|
|
let identity_hex = hex_encode(&recipient_key);
|
|
|
|
local
|
|
.run_until(cmd_register_user(
|
|
&server,
|
|
&ca_cert,
|
|
"localhost",
|
|
"recipient",
|
|
"pass",
|
|
Some(&identity_hex),
|
|
))
|
|
.await?;
|
|
|
|
local
|
|
.run_until(cmd_login(
|
|
&server,
|
|
&ca_cert,
|
|
"localhost",
|
|
"recipient",
|
|
"pass",
|
|
Some(&identity_hex),
|
|
None,
|
|
None,
|
|
))
|
|
.await?;
|
|
|
|
let client = local.run_until(connect_node(&server, &ca_cert, "localhost")).await?;
|
|
local
|
|
.run_until(enqueue(&client, &recipient_key, b"sealed-payload"))
|
|
.await?;
|
|
|
|
let payloads = local
|
|
.run_until(fetch_wait(&client, &recipient_key, 500))
|
|
.await?;
|
|
anyhow::ensure!(
|
|
payloads.len() == 1 && payloads[0].1.as_slice() == b"sealed-payload",
|
|
"expected one payload 'sealed-payload', got {:?}",
|
|
payloads
|
|
);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
// ─── new tests: was_new semantics, resolve_user, DM MLS flow ─────────────────────────────────
|
|
|
|
/// `create_channel` returns `was_new=true` for the first caller and `was_new=false` for the
|
|
/// second, and both callers receive the same stable `channel_id` regardless of argument order.
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn e2e_create_channel_was_new_semantics() -> anyhow::Result<()> {
|
|
ensure_rustls_provider();
|
|
// Holds AUTH_CONTEXT for the duration of this test.
|
|
let _auth = AUTH_LOCK.lock().unwrap();
|
|
|
|
let temp = TempDir::new()?;
|
|
let base = temp.path();
|
|
|
|
// No --sealed-sender: create_channel requires identity-bound session.
|
|
let (server, ca_cert, _child) = spawn_server(base, &[]);
|
|
|
|
wait_for_health(&server, &ca_cert, "localhost").await?;
|
|
|
|
// Register identity states (uses devtoken / allow-insecure for upload_key_package).
|
|
init_auth(ClientAuth::from_parts("devtoken".to_string(), None));
|
|
let local = tokio::task::LocalSet::new();
|
|
|
|
let alice_state = base.join("alice.bin");
|
|
let bob_state = base.join("bob.bin");
|
|
|
|
local
|
|
.run_until(cmd_register_state(&alice_state, &server, &ca_cert, "localhost", None))
|
|
.await?;
|
|
local
|
|
.run_until(cmd_register_state(&bob_state, &server, &ca_cert, "localhost", None))
|
|
.await?;
|
|
|
|
let alice_seed: [u8; 32] = bincode::deserialize::<StoredStateCompat>(&std::fs::read(&alice_state)?)?.identity_seed;
|
|
let bob_seed: [u8; 32] = bincode::deserialize::<StoredStateCompat>(&std::fs::read(&bob_state)?)?.identity_seed;
|
|
let alice_pk = IdentityKeypair::from_seed(alice_seed).public_key_bytes().to_vec();
|
|
let bob_pk = IdentityKeypair::from_seed(bob_seed).public_key_bytes().to_vec();
|
|
let alice_pk_hex = hex_encode(&alice_pk);
|
|
let bob_pk_hex = hex_encode(&bob_pk);
|
|
|
|
// OPAQUE register (unauthenticated — no AUTH_CONTEXT needed).
|
|
local
|
|
.run_until(cmd_register_user(&server, &ca_cert, "localhost", "alice", "pass", Some(&alice_pk_hex)))
|
|
.await?;
|
|
local
|
|
.run_until(cmd_register_user(&server, &ca_cert, "localhost", "bob", "pass", Some(&bob_pk_hex)))
|
|
.await?;
|
|
|
|
// Alice OPAQUE login → identity-bound session.
|
|
let client = local.run_until(connect_node(&server, &ca_cert, "localhost")).await?;
|
|
let session_alice = local
|
|
.run_until(opaque_login(&client, "alice", "pass", &alice_pk))
|
|
.await?;
|
|
init_auth(ClientAuth::from_raw(session_alice, None));
|
|
|
|
let (ch_alice, was_new_alice) = local
|
|
.run_until(create_channel(&client, &bob_pk))
|
|
.await?;
|
|
|
|
anyhow::ensure!(was_new_alice, "Alice's create_channel must return was_new=true");
|
|
anyhow::ensure!(ch_alice.len() == 16, "channel_id must be 16 bytes");
|
|
|
|
// Bob OPAQUE login → identity-bound session.
|
|
let session_bob = local
|
|
.run_until(opaque_login(&client, "bob", "pass", &bob_pk))
|
|
.await?;
|
|
init_auth(ClientAuth::from_raw(session_bob, None));
|
|
|
|
let (ch_bob, was_new_bob) = local
|
|
.run_until(create_channel(&client, &alice_pk))
|
|
.await?;
|
|
|
|
anyhow::ensure!(!was_new_bob, "Bob's create_channel must return was_new=false (channel already exists)");
|
|
anyhow::ensure!(
|
|
ch_alice == ch_bob,
|
|
"Both callers must receive the same channel_id (got alice={} bob={})",
|
|
hex_encode(&ch_alice),
|
|
hex_encode(&ch_bob)
|
|
);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// `resolve_user` returns the identity key when the user registered WITH one,
|
|
/// and returns `None` when the user registered WITHOUT an identity key.
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn e2e_resolve_user_requires_identity_key_binding() -> anyhow::Result<()> {
|
|
ensure_rustls_provider();
|
|
let _auth = AUTH_LOCK.lock().unwrap();
|
|
|
|
let temp = TempDir::new()?;
|
|
let base = temp.path();
|
|
|
|
let (server, ca_cert, _child) = spawn_server(base, &[]);
|
|
|
|
wait_for_health(&server, &ca_cert, "localhost").await?;
|
|
init_auth(ClientAuth::from_parts("devtoken".to_string(), None));
|
|
|
|
let local = tokio::task::LocalSet::new();
|
|
|
|
// Generate Alice's identity (bound) and Bob's identity (unbound).
|
|
let alice_state = base.join("alice.bin");
|
|
local
|
|
.run_until(cmd_register_state(&alice_state, &server, &ca_cert, "localhost", None))
|
|
.await?;
|
|
|
|
let alice_seed = bincode::deserialize::<StoredStateCompat>(&std::fs::read(&alice_state)?)?.identity_seed;
|
|
let alice_pk = IdentityKeypair::from_seed(alice_seed).public_key_bytes().to_vec();
|
|
let alice_pk_hex = hex_encode(&alice_pk);
|
|
|
|
// Alice registers WITH identity key.
|
|
local
|
|
.run_until(cmd_register_user(&server, &ca_cert, "localhost", "alice", "pass", Some(&alice_pk_hex)))
|
|
.await?;
|
|
|
|
// Bob registers WITHOUT identity key.
|
|
local
|
|
.run_until(cmd_register_user(&server, &ca_cert, "localhost", "bob", "pass", None))
|
|
.await?;
|
|
|
|
// resolve_user needs a valid auth context (devtoken is sufficient — just needs bearer token).
|
|
let client = local.run_until(connect_node(&server, &ca_cert, "localhost")).await?;
|
|
|
|
let alice_resolved = local
|
|
.run_until(resolve_user(&client, "alice"))
|
|
.await?;
|
|
|
|
anyhow::ensure!(
|
|
alice_resolved == Some(alice_pk.clone()),
|
|
"resolve_user('alice') must return alice's identity key, got {:?}",
|
|
alice_resolved.as_ref().map(|k| hex_encode(k))
|
|
);
|
|
|
|
let bob_resolved = local
|
|
.run_until(resolve_user(&client, "bob"))
|
|
.await?;
|
|
|
|
anyhow::ensure!(
|
|
bob_resolved.is_none(),
|
|
"resolve_user('bob') must return None (no identity key bound), got {:?}",
|
|
bob_resolved.as_ref().map(|k| hex_encode(k))
|
|
);
|
|
|
|
let ghost_resolved = local
|
|
.run_until(resolve_user(&client, "nonexistent"))
|
|
.await?;
|
|
|
|
anyhow::ensure!(
|
|
ghost_resolved.is_none(),
|
|
"resolve_user('nonexistent') must return None"
|
|
);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Both Alice and Bob call `/dm` on each other (simultaneous DM initiation).
|
|
/// Only the first caller (was_new=true) creates the MLS group and sends a Welcome.
|
|
/// The second caller (was_new=false) joins via the Welcome.
|
|
/// After joining, Alice sends a message and Bob decrypts it with no epoch error.
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn e2e_bidirectional_dm_mls_no_epoch_conflict() -> anyhow::Result<()> {
|
|
ensure_rustls_provider();
|
|
let _auth = AUTH_LOCK.lock().unwrap();
|
|
|
|
let temp = TempDir::new()?;
|
|
let base = temp.path();
|
|
|
|
// No --sealed-sender: tests the production path where enqueue requires identity session.
|
|
let (server, ca_cert, _child) = spawn_server(base, &[]);
|
|
|
|
wait_for_health(&server, &ca_cert, "localhost").await?;
|
|
|
|
// Register state files (uploads KeyPackages + hybrid keys) using devtoken.
|
|
init_auth(ClientAuth::from_parts("devtoken".to_string(), None));
|
|
let local = tokio::task::LocalSet::new();
|
|
|
|
let alice_state = base.join("alice.bin");
|
|
let bob_state = base.join("bob.bin");
|
|
|
|
local
|
|
.run_until(cmd_register_state(&alice_state, &server, &ca_cert, "localhost", None))
|
|
.await?;
|
|
local
|
|
.run_until(cmd_register_state(&bob_state, &server, &ca_cert, "localhost", None))
|
|
.await?;
|
|
|
|
let alice_seed = bincode::deserialize::<StoredStateCompat>(&std::fs::read(&alice_state)?)?.identity_seed;
|
|
let bob_seed = bincode::deserialize::<StoredStateCompat>(&std::fs::read(&bob_state)?)?.identity_seed;
|
|
let alice_pk = IdentityKeypair::from_seed(alice_seed).public_key_bytes().to_vec();
|
|
let bob_pk = IdentityKeypair::from_seed(bob_seed).public_key_bytes().to_vec();
|
|
let alice_pk_hex = hex_encode(&alice_pk);
|
|
let bob_pk_hex = hex_encode(&bob_pk);
|
|
|
|
// OPAQUE register both users.
|
|
local
|
|
.run_until(cmd_register_user(&server, &ca_cert, "localhost", "alice", "pass", Some(&alice_pk_hex)))
|
|
.await?;
|
|
local
|
|
.run_until(cmd_register_user(&server, &ca_cert, "localhost", "bob", "pass", Some(&bob_pk_hex)))
|
|
.await?;
|
|
|
|
// Alice logs in and calls create_channel → must get was_new=true.
|
|
let client = local.run_until(connect_node(&server, &ca_cert, "localhost")).await?;
|
|
let session_alice = local
|
|
.run_until(opaque_login(&client, "alice", "pass", &alice_pk))
|
|
.await?;
|
|
init_auth(ClientAuth::from_raw(session_alice.clone(), None));
|
|
|
|
let (channel_id, was_new_alice) = local
|
|
.run_until(create_channel(&client, &bob_pk))
|
|
.await?;
|
|
anyhow::ensure!(was_new_alice, "Alice must get was_new=true");
|
|
|
|
// Alice creates MLS group (channel_id as group name) and invites Bob.
|
|
local
|
|
.run_until(cmd_create_group(&alice_state, &server, &hex_encode(&channel_id), None))
|
|
.await?;
|
|
local
|
|
.run_until(cmd_invite(&alice_state, &server, &ca_cert, "localhost", &bob_pk_hex, None))
|
|
.await?;
|
|
|
|
// Bob logs in and calls create_channel → must get was_new=false with same channel_id.
|
|
let session_bob = local
|
|
.run_until(opaque_login(&client, "bob", "pass", &bob_pk))
|
|
.await?;
|
|
init_auth(ClientAuth::from_raw(session_bob.clone(), None));
|
|
|
|
let (channel_id_bob, was_new_bob) = local
|
|
.run_until(create_channel(&client, &alice_pk))
|
|
.await?;
|
|
anyhow::ensure!(!was_new_bob, "Bob must get was_new=false (Alice created first)");
|
|
anyhow::ensure!(
|
|
channel_id == channel_id_bob,
|
|
"Both sides must see the same channel_id"
|
|
);
|
|
|
|
// Bob joins via Welcome that Alice sent (was_new=false path: no group creation, just join).
|
|
local
|
|
.run_until(cmd_join(&bob_state, &server, &ca_cert, "localhost", None))
|
|
.await?;
|
|
|
|
// Alice sends "hello" to Bob.
|
|
init_auth(ClientAuth::from_raw(session_alice, None));
|
|
local
|
|
.run_until(cmd_send(
|
|
&alice_state,
|
|
&server,
|
|
&ca_cert,
|
|
"localhost",
|
|
Some(&bob_pk_hex),
|
|
false,
|
|
"hello from alice",
|
|
None,
|
|
))
|
|
.await?;
|
|
|
|
// Bob receives and decrypts — no epoch conflict.
|
|
init_auth(ClientAuth::from_raw(session_bob, None));
|
|
let plaintexts = local
|
|
.run_until(receive_pending_plaintexts(
|
|
&bob_state,
|
|
&server,
|
|
&ca_cert,
|
|
"localhost",
|
|
1000,
|
|
None,
|
|
))
|
|
.await?;
|
|
|
|
anyhow::ensure!(
|
|
plaintexts.iter().any(|p| p.as_slice() == b"hello from alice"),
|
|
"Bob must decrypt Alice's message without epoch error; got {:?}",
|
|
plaintexts.iter().map(|p| String::from_utf8_lossy(p).to_string()).collect::<Vec<_>>()
|
|
);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Send 10 messages alternating Alice→Bob and Bob→Alice through an MLS DM channel.
|
|
/// All messages must decrypt successfully, proving epoch stays in sync.
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn e2e_dm_multi_message_epoch_synchronized() -> anyhow::Result<()> {
|
|
ensure_rustls_provider();
|
|
let _auth = AUTH_LOCK.lock().unwrap();
|
|
|
|
let temp = TempDir::new()?;
|
|
let base = temp.path();
|
|
|
|
let (server, ca_cert, _child) = spawn_server(base, &[]);
|
|
|
|
wait_for_health(&server, &ca_cert, "localhost").await?;
|
|
init_auth(ClientAuth::from_parts("devtoken".to_string(), None));
|
|
|
|
let local = tokio::task::LocalSet::new();
|
|
|
|
let alice_state = base.join("alice.bin");
|
|
let bob_state = base.join("bob.bin");
|
|
|
|
local
|
|
.run_until(cmd_register_state(&alice_state, &server, &ca_cert, "localhost", None))
|
|
.await?;
|
|
local
|
|
.run_until(cmd_register_state(&bob_state, &server, &ca_cert, "localhost", None))
|
|
.await?;
|
|
|
|
let alice_seed = bincode::deserialize::<StoredStateCompat>(&std::fs::read(&alice_state)?)?.identity_seed;
|
|
let bob_seed = bincode::deserialize::<StoredStateCompat>(&std::fs::read(&bob_state)?)?.identity_seed;
|
|
let alice_pk = IdentityKeypair::from_seed(alice_seed).public_key_bytes().to_vec();
|
|
let bob_pk = IdentityKeypair::from_seed(bob_seed).public_key_bytes().to_vec();
|
|
let alice_pk_hex = hex_encode(&alice_pk);
|
|
let bob_pk_hex = hex_encode(&bob_pk);
|
|
|
|
local
|
|
.run_until(cmd_register_user(&server, &ca_cert, "localhost", "alice", "pass", Some(&alice_pk_hex)))
|
|
.await?;
|
|
local
|
|
.run_until(cmd_register_user(&server, &ca_cert, "localhost", "bob", "pass", Some(&bob_pk_hex)))
|
|
.await?;
|
|
|
|
let client = local.run_until(connect_node(&server, &ca_cert, "localhost")).await?;
|
|
|
|
// Alice creates the DM channel and invites Bob.
|
|
let session_alice = local
|
|
.run_until(opaque_login(&client, "alice", "pass", &alice_pk))
|
|
.await?;
|
|
init_auth(ClientAuth::from_raw(session_alice.clone(), None));
|
|
|
|
let (channel_id, was_new) = local
|
|
.run_until(create_channel(&client, &bob_pk))
|
|
.await?;
|
|
anyhow::ensure!(was_new, "first create_channel must be was_new=true");
|
|
|
|
local
|
|
.run_until(cmd_create_group(&alice_state, &server, &hex_encode(&channel_id), None))
|
|
.await?;
|
|
local
|
|
.run_until(cmd_invite(&alice_state, &server, &ca_cert, "localhost", &bob_pk_hex, None))
|
|
.await?;
|
|
|
|
// Bob joins.
|
|
let session_bob = local
|
|
.run_until(opaque_login(&client, "bob", "pass", &bob_pk))
|
|
.await?;
|
|
init_auth(ClientAuth::from_raw(session_bob.clone(), None));
|
|
local
|
|
.run_until(cmd_join(&bob_state, &server, &ca_cert, "localhost", None))
|
|
.await?;
|
|
|
|
// 10 messages: Alice→Bob on even, Bob→Alice on odd.
|
|
for i in 0u32..10 {
|
|
let msg = format!("msg_{i}");
|
|
if i % 2 == 0 {
|
|
// Alice sends to Bob.
|
|
init_auth(ClientAuth::from_raw(session_alice.clone(), None));
|
|
local
|
|
.run_until(cmd_send(
|
|
&alice_state,
|
|
&server,
|
|
&ca_cert,
|
|
"localhost",
|
|
Some(&bob_pk_hex),
|
|
false,
|
|
&msg,
|
|
None,
|
|
))
|
|
.await?;
|
|
|
|
// Bob receives.
|
|
init_auth(ClientAuth::from_raw(session_bob.clone(), None));
|
|
let plaintexts = local
|
|
.run_until(receive_pending_plaintexts(
|
|
&bob_state,
|
|
&server,
|
|
&ca_cert,
|
|
"localhost",
|
|
1000,
|
|
None,
|
|
))
|
|
.await?;
|
|
anyhow::ensure!(
|
|
plaintexts.iter().any(|p| p.as_slice() == msg.as_bytes()),
|
|
"Bob did not receive '{msg}' at iteration {i}; got {:?}",
|
|
plaintexts.iter().map(|p| String::from_utf8_lossy(p).to_string()).collect::<Vec<_>>()
|
|
);
|
|
} else {
|
|
// Bob sends to Alice.
|
|
init_auth(ClientAuth::from_raw(session_bob.clone(), None));
|
|
local
|
|
.run_until(cmd_send(
|
|
&bob_state,
|
|
&server,
|
|
&ca_cert,
|
|
"localhost",
|
|
Some(&alice_pk_hex),
|
|
false,
|
|
&msg,
|
|
None,
|
|
))
|
|
.await?;
|
|
|
|
// Alice receives.
|
|
init_auth(ClientAuth::from_raw(session_alice.clone(), None));
|
|
let plaintexts = local
|
|
.run_until(receive_pending_plaintexts(
|
|
&alice_state,
|
|
&server,
|
|
&ca_cert,
|
|
"localhost",
|
|
1000,
|
|
None,
|
|
))
|
|
.await?;
|
|
anyhow::ensure!(
|
|
plaintexts.iter().any(|p| p.as_slice() == msg.as_bytes()),
|
|
"Alice did not receive '{msg}' at iteration {i}; got {:?}",
|
|
plaintexts.iter().map(|p| String::from_utf8_lossy(p).to_string()).collect::<Vec<_>>()
|
|
);
|
|
}
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
// ─── new tests: round-trip message delivery, key rotation, oversized payload, multi-party ─────
|
|
|
|
/// Helper: load a state file and reconstruct a GroupMember with its keystore.
|
|
fn load_member(state_path: &std::path::Path) -> (GroupMember, Option<HybridKeypair>) {
|
|
let bytes = std::fs::read(state_path).expect("read state");
|
|
let state: quicproquo_client::client::state::StoredState =
|
|
bincode::deserialize(&bytes).expect("decode state");
|
|
state.into_parts(state_path).expect("into_parts")
|
|
}
|
|
|
|
/// Helper: save a GroupMember back to its state file.
|
|
fn save_member(state_path: &std::path::Path, member: &GroupMember, hybrid: Option<&HybridKeypair>) {
|
|
quicproquo_client::client::state::save_state(state_path, member, hybrid, None)
|
|
.expect("save state");
|
|
}
|
|
|
|
/// Basic happy-path: Alice registers, Bob registers, Alice creates a DM channel + MLS group,
|
|
/// invites Bob, sends "ping", Bob fetches and decrypts "ping".
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn e2e_message_delivery_round_trip() -> anyhow::Result<()> {
|
|
ensure_rustls_provider();
|
|
let _auth = AUTH_LOCK.lock().unwrap();
|
|
|
|
let temp = TempDir::new()?;
|
|
let base = temp.path();
|
|
|
|
let (server, ca_cert, _child) = spawn_server(base, &[]);
|
|
wait_for_health(&server, &ca_cert, "localhost").await?;
|
|
init_auth(ClientAuth::from_parts("devtoken".to_string(), None));
|
|
|
|
let local = tokio::task::LocalSet::new();
|
|
|
|
let alice_state = base.join("alice.bin");
|
|
let bob_state = base.join("bob.bin");
|
|
|
|
// Register identity states (KeyPackage + hybrid key upload).
|
|
local
|
|
.run_until(cmd_register_state(&alice_state, &server, &ca_cert, "localhost", None))
|
|
.await?;
|
|
local
|
|
.run_until(cmd_register_state(&bob_state, &server, &ca_cert, "localhost", None))
|
|
.await?;
|
|
|
|
let alice_seed = bincode::deserialize::<StoredStateCompat>(&std::fs::read(&alice_state)?)?.identity_seed;
|
|
let bob_seed = bincode::deserialize::<StoredStateCompat>(&std::fs::read(&bob_state)?)?.identity_seed;
|
|
let alice_pk = IdentityKeypair::from_seed(alice_seed).public_key_bytes().to_vec();
|
|
let bob_pk = IdentityKeypair::from_seed(bob_seed).public_key_bytes().to_vec();
|
|
let alice_pk_hex = hex_encode(&alice_pk);
|
|
let bob_pk_hex = hex_encode(&bob_pk);
|
|
|
|
// OPAQUE register both.
|
|
local
|
|
.run_until(cmd_register_user(&server, &ca_cert, "localhost", "alice", "pass", Some(&alice_pk_hex)))
|
|
.await?;
|
|
local
|
|
.run_until(cmd_register_user(&server, &ca_cert, "localhost", "bob", "pass", Some(&bob_pk_hex)))
|
|
.await?;
|
|
|
|
// Alice logs in, creates DM channel, MLS group, invites Bob.
|
|
let client = local.run_until(connect_node(&server, &ca_cert, "localhost")).await?;
|
|
let session_alice = local
|
|
.run_until(opaque_login(&client, "alice", "pass", &alice_pk))
|
|
.await?;
|
|
init_auth(ClientAuth::from_raw(session_alice.clone(), None));
|
|
|
|
let (channel_id, was_new) = local
|
|
.run_until(create_channel(&client, &bob_pk))
|
|
.await?;
|
|
anyhow::ensure!(was_new, "Alice must get was_new=true");
|
|
|
|
local
|
|
.run_until(cmd_create_group(&alice_state, &server, &hex_encode(&channel_id), None))
|
|
.await?;
|
|
local
|
|
.run_until(cmd_invite(&alice_state, &server, &ca_cert, "localhost", &bob_pk_hex, None))
|
|
.await?;
|
|
|
|
// Bob logs in and joins.
|
|
let session_bob = local
|
|
.run_until(opaque_login(&client, "bob", "pass", &bob_pk))
|
|
.await?;
|
|
init_auth(ClientAuth::from_raw(session_bob.clone(), None));
|
|
local
|
|
.run_until(cmd_join(&bob_state, &server, &ca_cert, "localhost", None))
|
|
.await?;
|
|
|
|
// Alice sends "ping".
|
|
init_auth(ClientAuth::from_raw(session_alice, None));
|
|
local
|
|
.run_until(cmd_send(
|
|
&alice_state,
|
|
&server,
|
|
&ca_cert,
|
|
"localhost",
|
|
Some(&bob_pk_hex),
|
|
false,
|
|
"ping",
|
|
None,
|
|
))
|
|
.await?;
|
|
|
|
// Bob receives and decrypts.
|
|
init_auth(ClientAuth::from_raw(session_bob, None));
|
|
let plaintexts = local
|
|
.run_until(receive_pending_plaintexts(
|
|
&bob_state,
|
|
&server,
|
|
&ca_cert,
|
|
"localhost",
|
|
1500,
|
|
None,
|
|
))
|
|
.await?;
|
|
|
|
anyhow::ensure!(
|
|
plaintexts.iter().any(|p| p.as_slice() == b"ping"),
|
|
"Bob did not receive 'ping'; got {:?}",
|
|
plaintexts.iter().map(|p| String::from_utf8_lossy(p).to_string()).collect::<Vec<_>>()
|
|
);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Alice proposes a self-update (MLS key rotation) in a DM group with Bob.
|
|
/// Alice commits the pending proposal and fans out the proposal + commit.
|
|
/// Bob processes them. After rotation, Alice sends a message and Bob decrypts it.
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn e2e_key_rotation_update_path() -> anyhow::Result<()> {
|
|
ensure_rustls_provider();
|
|
let _auth = AUTH_LOCK.lock().unwrap();
|
|
|
|
let temp = TempDir::new()?;
|
|
let base = temp.path();
|
|
|
|
let (server, ca_cert, _child) = spawn_server(base, &["--sealed-sender"]);
|
|
wait_for_health(&server, &ca_cert, "localhost").await?;
|
|
init_auth(ClientAuth::from_parts("devtoken".to_string(), None));
|
|
|
|
let local = tokio::task::LocalSet::new();
|
|
|
|
let alice_state = base.join("alice.bin");
|
|
let bob_state = base.join("bob.bin");
|
|
|
|
local
|
|
.run_until(cmd_register_state(&alice_state, &server, &ca_cert, "localhost", None))
|
|
.await?;
|
|
local
|
|
.run_until(cmd_register_state(&bob_state, &server, &ca_cert, "localhost", None))
|
|
.await?;
|
|
|
|
let alice_seed = bincode::deserialize::<StoredStateCompat>(&std::fs::read(&alice_state)?)?.identity_seed;
|
|
let bob_seed = bincode::deserialize::<StoredStateCompat>(&std::fs::read(&bob_state)?)?.identity_seed;
|
|
let alice_pk = IdentityKeypair::from_seed(alice_seed).public_key_bytes().to_vec();
|
|
let bob_pk = IdentityKeypair::from_seed(bob_seed).public_key_bytes().to_vec();
|
|
let bob_pk_hex = hex_encode(&bob_pk);
|
|
|
|
// Set up the MLS group: Alice creates, invites Bob, Bob joins.
|
|
local
|
|
.run_until(cmd_create_group(&alice_state, &server, "rotation-test", None))
|
|
.await?;
|
|
local
|
|
.run_until(cmd_invite(&alice_state, &server, &ca_cert, "localhost", &bob_pk_hex, None))
|
|
.await?;
|
|
local
|
|
.run_until(cmd_join(&bob_state, &server, &ca_cert, "localhost", None))
|
|
.await?;
|
|
|
|
// --- Key rotation via core MLS API ---
|
|
// Load Alice's GroupMember, propose self-update, commit, save.
|
|
let (mut alice_member, alice_hybrid) = load_member(&alice_state);
|
|
let proposal = alice_member.propose_self_update()?;
|
|
let (commit, _welcome) = alice_member.commit_pending_proposals()?;
|
|
save_member(&alice_state, &alice_member, alice_hybrid.as_ref());
|
|
|
|
// Fan out proposal + commit to Bob via enqueue.
|
|
let client = local.run_until(connect_node(&server, &ca_cert, "localhost")).await?;
|
|
local.run_until(enqueue(&client, &bob_pk, &proposal)).await?;
|
|
local.run_until(enqueue(&client, &bob_pk, &commit)).await?;
|
|
|
|
// Bob fetches and processes the proposal + commit.
|
|
let (mut bob_member, bob_hybrid) = load_member(&bob_state);
|
|
let mut raw_payloads =
|
|
local.run_until(fetch_wait(&client, &bob_pk, 1000)).await?;
|
|
raw_payloads.sort_by_key(|(seq, _)| *seq);
|
|
|
|
for (_, payload) in &raw_payloads {
|
|
match bob_member.receive_message(payload) {
|
|
Ok(ReceivedMessage::StateChanged) => {}
|
|
Ok(other) => anyhow::bail!("expected StateChanged, got {other:?}"),
|
|
Err(e) => anyhow::bail!("Bob failed to process rotation message: {e}"),
|
|
}
|
|
}
|
|
save_member(&bob_state, &bob_member, bob_hybrid.as_ref());
|
|
|
|
// After rotation, Alice sends a message and Bob decrypts it.
|
|
local
|
|
.run_until(cmd_send(
|
|
&alice_state,
|
|
&server,
|
|
&ca_cert,
|
|
"localhost",
|
|
Some(&bob_pk_hex),
|
|
false,
|
|
"post-rotation",
|
|
None,
|
|
))
|
|
.await?;
|
|
|
|
let plaintexts = local
|
|
.run_until(receive_pending_plaintexts(
|
|
&bob_state,
|
|
&server,
|
|
&ca_cert,
|
|
"localhost",
|
|
1500,
|
|
None,
|
|
))
|
|
.await?;
|
|
|
|
anyhow::ensure!(
|
|
plaintexts.iter().any(|p| p.as_slice() == b"post-rotation"),
|
|
"Bob did not receive 'post-rotation' after key rotation; got {:?}",
|
|
plaintexts.iter().map(|p| String::from_utf8_lossy(p).to_string()).collect::<Vec<_>>()
|
|
);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Sending a payload larger than 5 MB must be rejected by the server with E006.
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn e2e_hook_rejects_oversized_payload() -> anyhow::Result<()> {
|
|
ensure_rustls_provider();
|
|
|
|
let temp = TempDir::new()?;
|
|
let base = temp.path();
|
|
|
|
let (server, ca_cert, _child) = spawn_server(base, &["--sealed-sender"]);
|
|
wait_for_health(&server, &ca_cert, "localhost").await?;
|
|
init_auth(ClientAuth::from_parts("devtoken".to_string(), None));
|
|
|
|
let local = tokio::task::LocalSet::new();
|
|
|
|
// Register a recipient so enqueue has a valid target.
|
|
let state_path = base.join("recipient.bin");
|
|
local
|
|
.run_until(cmd_register_state(&state_path, &server, &ca_cert, "localhost", None))
|
|
.await?;
|
|
let state_bytes = std::fs::read(&state_path)?;
|
|
let stored: StoredStateCompat = bincode::deserialize(&state_bytes)?;
|
|
let recipient_key = IdentityKeypair::from_seed(stored.identity_seed).public_key_bytes();
|
|
|
|
let client = local.run_until(connect_node(&server, &ca_cert, "localhost")).await?;
|
|
|
|
// Payload just over the 5 MB limit (5 * 1024 * 1024 + 1 bytes).
|
|
let oversized = vec![0xAAu8; 5 * 1024 * 1024 + 1];
|
|
let result = local.run_until(enqueue(&client, &recipient_key, &oversized)).await;
|
|
|
|
match result {
|
|
Ok(_) => anyhow::bail!("enqueue with oversized payload should have been rejected"),
|
|
Err(e) => {
|
|
let msg = format!("{e:#}");
|
|
anyhow::ensure!(
|
|
msg.contains("payload exceeds max size") || msg.contains("E006"),
|
|
"expected E006 / payload size error, got: {msg}"
|
|
);
|
|
}
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Three-party group: Alice creates, invites Bob, then Carol.
|
|
/// All three exchange messages and verify cross-member delivery:
|
|
/// Alice -> group, Bob -> group, Carol -> group.
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn e2e_multi_party_group() -> anyhow::Result<()> {
|
|
ensure_rustls_provider();
|
|
|
|
let temp = TempDir::new()?;
|
|
let base = temp.path();
|
|
|
|
let (server, ca_cert, _child) = spawn_server(base, &["--sealed-sender"]);
|
|
wait_for_health(&server, &ca_cert, "localhost").await?;
|
|
init_auth(ClientAuth::from_parts("devtoken".to_string(), None));
|
|
|
|
let local = tokio::task::LocalSet::new();
|
|
|
|
let alice_state = base.join("alice.bin");
|
|
let bob_state = base.join("bob.bin");
|
|
let carol_state = base.join("carol.bin");
|
|
|
|
// Register all three.
|
|
local
|
|
.run_until(cmd_register_state(&alice_state, &server, &ca_cert, "localhost", None))
|
|
.await?;
|
|
local
|
|
.run_until(cmd_register_state(&bob_state, &server, &ca_cert, "localhost", None))
|
|
.await?;
|
|
local
|
|
.run_until(cmd_register_state(&carol_state, &server, &ca_cert, "localhost", None))
|
|
.await?;
|
|
|
|
let bob_bytes = std::fs::read(&bob_state)?;
|
|
let bob_compat: StoredStateCompat = bincode::deserialize(&bob_bytes)?;
|
|
let bob_pk_hex = hex_encode(&IdentityKeypair::from_seed(bob_compat.identity_seed).public_key_bytes());
|
|
|
|
let carol_bytes = std::fs::read(&carol_state)?;
|
|
let carol_compat: StoredStateCompat = bincode::deserialize(&carol_bytes)?;
|
|
let carol_pk_hex = hex_encode(&IdentityKeypair::from_seed(carol_compat.identity_seed).public_key_bytes());
|
|
|
|
// Alice creates group, invites Bob then Carol.
|
|
local
|
|
.run_until(cmd_create_group(&alice_state, &server, "trio", None))
|
|
.await?;
|
|
local
|
|
.run_until(cmd_invite(&alice_state, &server, &ca_cert, "localhost", &bob_pk_hex, None))
|
|
.await?;
|
|
local
|
|
.run_until(cmd_invite(&alice_state, &server, &ca_cert, "localhost", &carol_pk_hex, None))
|
|
.await?;
|
|
|
|
// Bob and Carol join.
|
|
local
|
|
.run_until(cmd_join(&bob_state, &server, &ca_cert, "localhost", None))
|
|
.await?;
|
|
local
|
|
.run_until(cmd_join(&carol_state, &server, &ca_cert, "localhost", None))
|
|
.await?;
|
|
|
|
// Alice sends to all members.
|
|
local
|
|
.run_until(cmd_send(&alice_state, &server, &ca_cert, "localhost", None, true, "from-alice", None))
|
|
.await?;
|
|
|
|
sleep(Duration::from_millis(200)).await;
|
|
|
|
let bob_pt = local
|
|
.run_until(receive_pending_plaintexts(&bob_state, &server, &ca_cert, "localhost", 1500, None))
|
|
.await?;
|
|
let carol_pt = local
|
|
.run_until(receive_pending_plaintexts(&carol_state, &server, &ca_cert, "localhost", 1500, None))
|
|
.await?;
|
|
|
|
anyhow::ensure!(
|
|
bob_pt.iter().any(|p| p.as_slice() == b"from-alice"),
|
|
"Bob did not receive 'from-alice'; got {:?}",
|
|
bob_pt.iter().map(|p| String::from_utf8_lossy(p).to_string()).collect::<Vec<_>>()
|
|
);
|
|
anyhow::ensure!(
|
|
carol_pt.iter().any(|p| p.as_slice() == b"from-alice"),
|
|
"Carol did not receive 'from-alice'; got {:?}",
|
|
carol_pt.iter().map(|p| String::from_utf8_lossy(p).to_string()).collect::<Vec<_>>()
|
|
);
|
|
|
|
// Bob sends to all.
|
|
local
|
|
.run_until(cmd_send(&bob_state, &server, &ca_cert, "localhost", None, true, "from-bob", None))
|
|
.await?;
|
|
|
|
sleep(Duration::from_millis(200)).await;
|
|
|
|
let alice_pt = local
|
|
.run_until(receive_pending_plaintexts(&alice_state, &server, &ca_cert, "localhost", 1500, None))
|
|
.await?;
|
|
let carol_pt2 = local
|
|
.run_until(receive_pending_plaintexts(&carol_state, &server, &ca_cert, "localhost", 1500, None))
|
|
.await?;
|
|
|
|
anyhow::ensure!(
|
|
alice_pt.iter().any(|p| p.as_slice() == b"from-bob"),
|
|
"Alice did not receive 'from-bob'; got {:?}",
|
|
alice_pt.iter().map(|p| String::from_utf8_lossy(p).to_string()).collect::<Vec<_>>()
|
|
);
|
|
anyhow::ensure!(
|
|
carol_pt2.iter().any(|p| p.as_slice() == b"from-bob"),
|
|
"Carol did not receive 'from-bob'; got {:?}",
|
|
carol_pt2.iter().map(|p| String::from_utf8_lossy(p).to_string()).collect::<Vec<_>>()
|
|
);
|
|
|
|
// Carol sends to all.
|
|
local
|
|
.run_until(cmd_send(&carol_state, &server, &ca_cert, "localhost", None, true, "from-carol", None))
|
|
.await?;
|
|
|
|
sleep(Duration::from_millis(200)).await;
|
|
|
|
let alice_pt2 = local
|
|
.run_until(receive_pending_plaintexts(&alice_state, &server, &ca_cert, "localhost", 1500, None))
|
|
.await?;
|
|
let bob_pt2 = local
|
|
.run_until(receive_pending_plaintexts(&bob_state, &server, &ca_cert, "localhost", 1500, None))
|
|
.await?;
|
|
|
|
anyhow::ensure!(
|
|
alice_pt2.iter().any(|p| p.as_slice() == b"from-carol"),
|
|
"Alice did not receive 'from-carol'; got {:?}",
|
|
alice_pt2.iter().map(|p| String::from_utf8_lossy(p).to_string()).collect::<Vec<_>>()
|
|
);
|
|
anyhow::ensure!(
|
|
bob_pt2.iter().any(|p| p.as_slice() == b"from-carol"),
|
|
"Bob did not receive 'from-carol'; got {:?}",
|
|
bob_pt2.iter().map(|p| String::from_utf8_lossy(p).to_string()).collect::<Vec<_>>()
|
|
);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
// ─── blob upload / download tests ────────────────────────────────────────────
|
|
|
|
/// Upload a 2 KB blob, download it in full, then download a partial slice.
|
|
/// Verifies SHA-256 integrity, blobId, and partial-range semantics.
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn e2e_file_upload_download() -> anyhow::Result<()> {
|
|
ensure_rustls_provider();
|
|
|
|
let temp = TempDir::new()?;
|
|
let base = temp.path();
|
|
|
|
let (server, ca_cert, _child) = spawn_server(base, &["--sealed-sender"]);
|
|
wait_for_health(&server, &ca_cert, "localhost").await?;
|
|
init_auth(ClientAuth::from_parts("devtoken".to_string(), None));
|
|
|
|
let local = tokio::task::LocalSet::new();
|
|
|
|
// Register Alice (needed so the auth context is valid).
|
|
let alice_state = base.join("alice.bin");
|
|
local
|
|
.run_until(cmd_register_state(&alice_state, &server, &ca_cert, "localhost", None))
|
|
.await?;
|
|
|
|
let client = local.run_until(connect_node(&server, &ca_cert, "localhost")).await?;
|
|
|
|
// Build 2 KB of known data.
|
|
let pattern = b"hello-world-file-test\n";
|
|
let repeat_count = (2048 + pattern.len() - 1) / pattern.len();
|
|
let file_data: Vec<u8> = pattern.iter().copied().cycle().take(repeat_count * pattern.len()).collect();
|
|
let file_data = &file_data[..2048]; // exactly 2 KB
|
|
|
|
// Compute SHA-256.
|
|
let hash: [u8; 32] = Sha256::digest(file_data).into();
|
|
|
|
// ── Upload ──
|
|
let blob_id = local
|
|
.run_until(async {
|
|
let mut req = client.upload_blob_request();
|
|
{
|
|
let mut p = req.get();
|
|
let mut auth = p.reborrow().init_auth();
|
|
quicproquo_client::client::rpc::set_auth(&mut auth)?;
|
|
p.set_blob_hash(&hash);
|
|
p.set_chunk(file_data);
|
|
p.set_offset(0);
|
|
p.set_total_size(file_data.len() as u64);
|
|
p.set_mime_type("application/octet-stream");
|
|
}
|
|
let resp = req.send().promise.await
|
|
.map_err(|e| anyhow::anyhow!("uploadBlob RPC failed: {e}"))?;
|
|
let blob_id = resp.get()
|
|
.map_err(|e| anyhow::anyhow!("uploadBlob bad response: {e}"))?
|
|
.get_blob_id()
|
|
.map_err(|e| anyhow::anyhow!("uploadBlob missing blobId: {e}"))?
|
|
.to_vec();
|
|
Ok::<Vec<u8>, anyhow::Error>(blob_id)
|
|
})
|
|
.await?;
|
|
|
|
anyhow::ensure!(
|
|
blob_id == hash,
|
|
"blobId must equal SHA-256 hash; got {} vs {}",
|
|
hex_encode(&blob_id),
|
|
hex_encode(&hash)
|
|
);
|
|
|
|
// ── Full download ──
|
|
let (chunk, total_size) = local
|
|
.run_until(async {
|
|
let mut req = client.download_blob_request();
|
|
{
|
|
let mut p = req.get();
|
|
let mut auth = p.reborrow().init_auth();
|
|
quicproquo_client::client::rpc::set_auth(&mut auth)?;
|
|
p.set_blob_id(&blob_id);
|
|
p.set_offset(0);
|
|
p.set_length(file_data.len() as u32);
|
|
}
|
|
let resp = req.send().promise.await
|
|
.map_err(|e| anyhow::anyhow!("downloadBlob RPC failed: {e}"))?;
|
|
let r = resp.get()
|
|
.map_err(|e| anyhow::anyhow!("downloadBlob bad response: {e}"))?;
|
|
let chunk = r.get_chunk()
|
|
.map_err(|e| anyhow::anyhow!("downloadBlob missing chunk: {e}"))?
|
|
.to_vec();
|
|
let total = r.get_total_size();
|
|
Ok::<(Vec<u8>, u64), anyhow::Error>((chunk, total))
|
|
})
|
|
.await?;
|
|
|
|
anyhow::ensure!(
|
|
total_size == file_data.len() as u64,
|
|
"totalSize mismatch: {} vs {}",
|
|
total_size,
|
|
file_data.len()
|
|
);
|
|
anyhow::ensure!(
|
|
chunk == file_data,
|
|
"downloaded data does not match uploaded data (len {} vs {})",
|
|
chunk.len(),
|
|
file_data.len()
|
|
);
|
|
|
|
// ── Partial download: offset=100, length=200 ──
|
|
let partial = local
|
|
.run_until(async {
|
|
let mut req = client.download_blob_request();
|
|
{
|
|
let mut p = req.get();
|
|
let mut auth = p.reborrow().init_auth();
|
|
quicproquo_client::client::rpc::set_auth(&mut auth)?;
|
|
p.set_blob_id(&blob_id);
|
|
p.set_offset(100);
|
|
p.set_length(200);
|
|
}
|
|
let resp = req.send().promise.await
|
|
.map_err(|e| anyhow::anyhow!("downloadBlob partial RPC failed: {e}"))?;
|
|
let r = resp.get()
|
|
.map_err(|e| anyhow::anyhow!("downloadBlob partial bad response: {e}"))?;
|
|
let chunk = r.get_chunk()
|
|
.map_err(|e| anyhow::anyhow!("downloadBlob partial missing chunk: {e}"))?
|
|
.to_vec();
|
|
Ok::<Vec<u8>, anyhow::Error>(chunk)
|
|
})
|
|
.await?;
|
|
|
|
anyhow::ensure!(
|
|
partial == &file_data[100..300],
|
|
"partial download [100..300] does not match expected slice"
|
|
);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Uploading with a blobHash that does not match the chunk data must return E026.
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn e2e_blob_hash_mismatch() -> anyhow::Result<()> {
|
|
ensure_rustls_provider();
|
|
|
|
let temp = TempDir::new()?;
|
|
let base = temp.path();
|
|
|
|
let (server, ca_cert, _child) = spawn_server(base, &["--sealed-sender"]);
|
|
wait_for_health(&server, &ca_cert, "localhost").await?;
|
|
init_auth(ClientAuth::from_parts("devtoken".to_string(), None));
|
|
|
|
let local = tokio::task::LocalSet::new();
|
|
|
|
let alice_state = base.join("alice.bin");
|
|
local
|
|
.run_until(cmd_register_state(&alice_state, &server, &ca_cert, "localhost", None))
|
|
.await?;
|
|
|
|
let client = local.run_until(connect_node(&server, &ca_cert, "localhost")).await?;
|
|
|
|
// Chunk data.
|
|
let chunk_data = b"some file content for mismatch test";
|
|
|
|
// Wrong hash (all zeros — will not match any real data).
|
|
let wrong_hash = [0u8; 32];
|
|
|
|
let result = local
|
|
.run_until(async {
|
|
let mut req = client.upload_blob_request();
|
|
{
|
|
let mut p = req.get();
|
|
let mut auth = p.reborrow().init_auth();
|
|
quicproquo_client::client::rpc::set_auth(&mut auth)?;
|
|
p.set_blob_hash(&wrong_hash);
|
|
p.set_chunk(&chunk_data[..]);
|
|
p.set_offset(0);
|
|
p.set_total_size(chunk_data.len() as u64);
|
|
p.set_mime_type("text/plain");
|
|
}
|
|
let resp = req.send().promise.await
|
|
.map_err(|e| anyhow::anyhow!("uploadBlob RPC: {e}"))?;
|
|
resp.get()
|
|
.map_err(|e| anyhow::anyhow!("uploadBlob response: {e}"))?;
|
|
Ok::<(), anyhow::Error>(())
|
|
})
|
|
.await;
|
|
|
|
match result {
|
|
Ok(_) => anyhow::bail!("uploadBlob with wrong hash should have been rejected"),
|
|
Err(e) => {
|
|
let msg = format!("{e:#}");
|
|
anyhow::ensure!(
|
|
msg.contains("E026") || msg.contains("hash") || msg.contains("mismatch"),
|
|
"expected E026 / hash mismatch error, got: {msg}"
|
|
);
|
|
}
|
|
}
|
|
|
|
Ok(())
|
|
}
|