//! Integration test: full MLS group flow via Authentication Service + Delivery Service. //! //! Steps: //! - Start in-process AS and DS (Noise_XX + capnp-rpc) on a LocalSet. //! - Alice and Bob generate KeyPackages and upload to AS. //! - Alice fetches Bob's KeyPackage, creates a group, and invites Bob. //! - Welcome + application messages traverse the Delivery Service. //! - Both sides decrypt and confirm plaintext payloads. use std::{collections::VecDeque, sync::Arc, time::Duration}; use anyhow::Context; use capnp::capability::Promise; use capnp_rpc::{rpc_twoparty_capnp::Side, twoparty, RpcSystem}; use dashmap::DashMap; use quicnprotochat_core::{ handshake_initiator, handshake_responder, GroupMember, IdentityKeypair, NoiseKeypair, }; use quicnprotochat_proto::{auth_capnp::authentication_service, delivery_capnp::delivery_service}; use sha2::{Digest, Sha256}; use tokio::net::{TcpListener, TcpStream}; use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt}; type KeyPackageStore = Arc, VecDeque>>>; type DeliveryStore = Arc, VecDeque>>>; /// Full Alice↔Bob MLS round-trip against live AS + DS. #[tokio::test] async fn mls_group_end_to_end_round_trip() -> anyhow::Result<()> { let local = tokio::task::LocalSet::new(); local .run_until(async move { let server_keypair = Arc::new(NoiseKeypair::generate()); let kp_store: KeyPackageStore = Arc::new(DashMap::new()); let ds_store: DeliveryStore = Arc::new(DashMap::new()); let as_addr = spawn_as_server(2, Arc::clone(&server_keypair), Arc::clone(&kp_store)).await; let ds_addr = spawn_ds_server(2, Arc::clone(&server_keypair), Arc::clone(&ds_store)).await; tokio::time::sleep(Duration::from_millis(10)).await; let alice_id = Arc::new(IdentityKeypair::generate()); let bob_id = Arc::new(IdentityKeypair::generate()); let mut alice = GroupMember::new(Arc::clone(&alice_id)); let mut bob = GroupMember::new(Arc::clone(&bob_id)); let alice_kp = alice.generate_key_package()?; let bob_kp = bob.generate_key_package()?; let alice_as = connect_as(as_addr, &NoiseKeypair::generate()).await?; let bob_as = connect_as(as_addr, &NoiseKeypair::generate()).await?; upload_key_package(&alice_as, &alice_id.public_key_bytes(), &alice_kp).await?; upload_key_package(&bob_as, &bob_id.public_key_bytes(), &bob_kp).await?; let fetched_bob_kp = fetch_key_package(&alice_as, &bob_id.public_key_bytes()).await?; anyhow::ensure!( !fetched_bob_kp.is_empty(), "AS must return Bob's KeyPackage" ); alice.create_group(b"m3-integration")?; let (_commit, welcome) = alice.add_member(&fetched_bob_kp)?; let alice_ds = connect_ds(ds_addr, &NoiseKeypair::generate()).await?; let bob_ds = connect_ds(ds_addr, &NoiseKeypair::generate()).await?; enqueue(&alice_ds, &bob_id.public_key_bytes(), &welcome).await?; let welcome_payloads = fetch_all(&bob_ds, &bob_id.public_key_bytes()).await?; let welcome_bytes = welcome_payloads .first() .cloned() .context("welcome must be present")?; bob.join_group(&welcome_bytes)?; let ct_ab = alice.send_message(b"hello bob")?; enqueue(&alice_ds, &bob_id.public_key_bytes(), &ct_ab).await?; let bob_msgs = fetch_all(&bob_ds, &bob_id.public_key_bytes()).await?; let ab_plaintext = bob .receive_message(bob_msgs.first().context("missing alice→bob payload")?)? .context("alice→bob must be application message")?; assert_eq!(ab_plaintext, b"hello bob"); let ct_ba = bob.send_message(b"hello alice")?; enqueue(&bob_ds, &alice_id.public_key_bytes(), &ct_ba).await?; let alice_msgs = fetch_all(&alice_ds, &alice_id.public_key_bytes()).await?; let ba_plaintext = alice .receive_message(alice_msgs.first().context("missing bob→alice payload")?)? .context("bob→alice must be application message")?; assert_eq!(ba_plaintext, b"hello alice"); Ok(()) }) .await } // ── Test helpers ──────────────────────────────────────────────────────────── async fn spawn_as_server( n_connections: usize, keypair: Arc, store: KeyPackageStore, ) -> std::net::SocketAddr { let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); let addr = listener.local_addr().unwrap(); tokio::task::spawn_local(async move { for _ in 0..n_connections { let (stream, _) = listener.accept().await.unwrap(); let kp = Arc::clone(&keypair); let st = Arc::clone(&store); tokio::task::spawn_local(async move { serve_as_connection(stream, kp, st).await; }); } }); addr } async fn serve_as_connection( stream: TcpStream, keypair: Arc, store: KeyPackageStore, ) { let transport = handshake_responder(stream, &keypair).await.unwrap(); let (reader, writer) = transport.into_capnp_io(); let network = twoparty::VatNetwork::new( reader.compat(), writer.compat_write(), Side::Server, Default::default(), ); let service: authentication_service::Client = capnp_rpc::new_client(AuthService { store }); RpcSystem::new(Box::new(network), Some(service.client)) .await .ok(); } async fn spawn_ds_server( n_connections: usize, keypair: Arc, store: DeliveryStore, ) -> std::net::SocketAddr { let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); let addr = listener.local_addr().unwrap(); tokio::task::spawn_local(async move { for _ in 0..n_connections { let (stream, _) = listener.accept().await.unwrap(); let kp = Arc::clone(&keypair); let st = Arc::clone(&store); tokio::task::spawn_local(async move { serve_ds_connection(stream, kp, st).await; }); } }); addr } async fn serve_ds_connection(stream: TcpStream, keypair: Arc, store: DeliveryStore) { let transport = handshake_responder(stream, &keypair).await.unwrap(); let (reader, writer) = transport.into_capnp_io(); let network = twoparty::VatNetwork::new( reader.compat(), writer.compat_write(), Side::Server, Default::default(), ); let service: delivery_service::Client = capnp_rpc::new_client(DeliveryService { store }); RpcSystem::new(Box::new(network), Some(service.client)) .await .ok(); } async fn connect_as( addr: std::net::SocketAddr, noise_keypair: &NoiseKeypair, ) -> anyhow::Result { let stream = TcpStream::connect(addr) .await .with_context(|| format!("could not connect to AS at {addr}"))?; let transport = handshake_initiator(stream, noise_keypair) .await .context("Noise handshake to AS failed")?; let (reader, writer) = transport.into_capnp_io(); let network = twoparty::VatNetwork::new( reader.compat(), writer.compat_write(), Side::Client, Default::default(), ); let mut rpc = RpcSystem::new(Box::new(network), None); let client: authentication_service::Client = rpc.bootstrap(Side::Server); tokio::task::spawn_local(rpc); Ok(client) } async fn connect_ds( addr: std::net::SocketAddr, noise_keypair: &NoiseKeypair, ) -> anyhow::Result { let stream = TcpStream::connect(addr) .await .with_context(|| format!("could not connect to DS at {addr}"))?; let transport = handshake_initiator(stream, noise_keypair) .await .context("Noise handshake to DS failed")?; let (reader, writer) = transport.into_capnp_io(); let network = twoparty::VatNetwork::new( reader.compat(), writer.compat_write(), Side::Client, Default::default(), ); let mut rpc = RpcSystem::new(Box::new(network), None); let client: delivery_service::Client = rpc.bootstrap(Side::Server); tokio::task::spawn_local(rpc); Ok(client) } async fn upload_key_package( as_client: &authentication_service::Client, identity_key: &[u8], package: &[u8], ) -> anyhow::Result<()> { let mut req = as_client.upload_key_package_request(); req.get().set_identity_key(identity_key); req.get().set_package(package); let resp = req .send() .promise .await .context("upload_key_package RPC failed")?; let server_fp = resp .get() .context("upload_key_package: bad response")? .get_fingerprint() .context("upload_key_package: missing fingerprint")? .to_vec(); let local_fp: Vec = Sha256::digest(package).to_vec(); anyhow::ensure!(server_fp == local_fp, "fingerprint mismatch"); Ok(()) } async fn fetch_key_package( as_client: &authentication_service::Client, identity_key: &[u8], ) -> anyhow::Result> { let mut req = as_client.fetch_key_package_request(); req.get().set_identity_key(identity_key); let resp = req .send() .promise .await .context("fetch_key_package RPC failed")?; let pkg = resp .get() .context("fetch_key_package: bad response")? .get_package() .context("fetch_key_package: missing package")? .to_vec(); Ok(pkg) } async fn enqueue( ds_client: &delivery_service::Client, recipient_key: &[u8], payload: &[u8], ) -> anyhow::Result<()> { let mut req = ds_client.enqueue_request(); req.get().set_recipient_key(recipient_key); req.get().set_payload(payload); req.send().promise.await.context("enqueue RPC failed")?; Ok(()) } async fn fetch_all( ds_client: &delivery_service::Client, recipient_key: &[u8], ) -> anyhow::Result>> { let mut req = ds_client.fetch_request(); req.get().set_recipient_key(recipient_key); let resp = req.send().promise.await.context("fetch RPC failed")?; let list = resp .get() .context("fetch: bad response")? .get_payloads() .context("fetch: missing payloads")?; let mut payloads = Vec::with_capacity(list.len() as usize); for i in 0..list.len() { payloads.push(list.get(i).context("fetch: payload read failed")?.to_vec()); } Ok(payloads) } // ── Inline service implementations ─────────────────────────────────────────- struct AuthService { store: KeyPackageStore, } impl authentication_service::Server for AuthService { fn upload_key_package( &mut self, params: authentication_service::UploadKeyPackageParams, mut results: authentication_service::UploadKeyPackageResults, ) -> Promise<(), capnp::Error> { let params = match params.get() { Ok(p) => p, Err(e) => return Promise::err(e), }; let ik = match params.get_identity_key() { Ok(v) => v.to_vec(), Err(e) => return Promise::err(capnp::Error::failed(format!("{e}"))), }; let pkg = match params.get_package() { Ok(v) => v.to_vec(), Err(e) => return Promise::err(capnp::Error::failed(format!("{e}"))), }; let fp: Vec = Sha256::digest(&pkg).to_vec(); self.store.entry(ik).or_default().push_back(pkg); results.get().set_fingerprint(&fp); Promise::ok(()) } fn fetch_key_package( &mut self, params: authentication_service::FetchKeyPackageParams, mut results: authentication_service::FetchKeyPackageResults, ) -> Promise<(), capnp::Error> { let ik = match params.get() { Ok(p) => match p.get_identity_key() { Ok(v) => v.to_vec(), Err(e) => return Promise::err(capnp::Error::failed(format!("{e}"))), }, Err(e) => return Promise::err(capnp::Error::failed(format!("{e}"))), }; let pkg = self .store .get_mut(&ik) .and_then(|mut q| q.pop_front()) .unwrap_or_default(); results.get().set_package(&pkg); Promise::ok(()) } } struct DeliveryService { store: DeliveryStore, } impl delivery_service::Server for DeliveryService { fn enqueue( &mut self, params: delivery_service::EnqueueParams, _results: delivery_service::EnqueueResults, ) -> Promise<(), capnp::Error> { let params = match params.get() { Ok(p) => p, Err(e) => return Promise::err(e), }; let recipient = match params.get_recipient_key() { Ok(v) => v.to_vec(), Err(e) => return Promise::err(capnp::Error::failed(format!("{e}"))), }; let payload = match params.get_payload() { Ok(v) => v.to_vec(), Err(e) => return Promise::err(capnp::Error::failed(format!("{e}"))), }; self.store.entry(recipient).or_default().push_back(payload); Promise::ok(()) } fn fetch( &mut self, params: delivery_service::FetchParams, mut results: delivery_service::FetchResults, ) -> Promise<(), capnp::Error> { let recipient = match params.get() { Ok(p) => match p.get_recipient_key() { Ok(v) => v.to_vec(), Err(e) => return Promise::err(capnp::Error::failed(format!("{e}"))), }, Err(e) => return Promise::err(capnp::Error::failed(format!("{e}"))), }; let messages: Vec> = self .store .get_mut(&recipient) .map(|mut q| q.drain(..).collect()) .unwrap_or_default(); let mut list = results.get().init_payloads(messages.len() as u32); for (i, msg) in messages.iter().enumerate() { list.set(i as u32, msg); } Promise::ok(()) } }