//! noiseml-server — Delivery Service + Authentication Service binary. //! //! # M3 scope //! //! The server exposes two Noise_XX-protected Cap'n Proto RPC endpoints: //! //! * **AS** (`--listen`, default `0.0.0.0:7000`) — `AuthenticationService`: //! upload and fetch single-use MLS KeyPackages. //! * **DS** (`--ds-listen`, default `0.0.0.0:7001`) — `DeliveryService`: //! enqueue and fetch opaque payloads (Welcome messages, Commits, Application //! messages) keyed by recipient Ed25519 public key. //! //! # Architecture //! //! ```text //! TcpListener (AS, 7000) TcpListener (DS, 7001) //! └─ Noise_XX handshake └─ Noise_XX handshake //! └─ capnp-rpc VatNetwork (LocalSet, !Send) //! ├─ AuthServiceImpl (shares KeyPackageStore via Arc) //! └─ DeliveryServiceImpl (shares DeliveryStore via Arc) //! ``` //! //! Because `capnp-rpc` uses `Rc>` internally it is `!Send`. //! The entire RPC stack lives on a `tokio::task::LocalSet` spawned per //! connection. //! //! # Configuration //! //! | Env var | CLI flag | Default | //! |---------------------|----------------|-----------------| //! | `NOISEML_LISTEN` | `--listen` | `0.0.0.0:7000` | //! | `NOISEML_DS_LISTEN` | `--ds-listen` | `0.0.0.0:7001` | //! | `RUST_LOG` | — | `info` | use std::{collections::VecDeque, sync::Arc}; use anyhow::Context; use capnp::capability::Promise; use capnp_rpc::{RpcSystem, rpc_twoparty_capnp::Side, twoparty}; use clap::Parser; use dashmap::DashMap; use noiseml_core::{NoiseKeypair, handshake_responder}; use noiseml_proto::{ auth_capnp::authentication_service, delivery_capnp::delivery_service, }; use sha2::{Digest, Sha256}; use tokio::net::{TcpListener, TcpStream}; use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt}; use tracing::Instrument; // ── CLI ─────────────────────────────────────────────────────────────────────── #[derive(Debug, Parser)] #[command( name = "noiseml-server", about = "noiseml Delivery Service + Authentication Service", version )] struct Args { /// TCP address for the Authentication Service. #[arg(long, default_value = "0.0.0.0:7000", env = "NOISEML_LISTEN")] listen: String, /// TCP address for the Delivery Service. #[arg(long, default_value = "0.0.0.0:7001", env = "NOISEML_DS_LISTEN")] ds_listen: String, } // ── Shared store types ──────────────────────────────────────────────────────── /// Thread-safe map from Ed25519 identity public key bytes (32 B) to a queue /// of serialised MLS KeyPackage blobs. /// /// Each KeyPackage is single-use per RFC 9420: `fetch_key_package` removes /// and returns exactly one entry. type KeyPackageStore = Arc, VecDeque>>>; /// Thread-safe message queue for the Delivery Service. /// /// Maps recipient Ed25519 public key (32 bytes) to a FIFO queue of opaque /// payload bytes (TLS-encoded MLS messages or other framed data). type DeliveryStore = Arc, VecDeque>>>; // ── Authentication Service implementation ───────────────────────────────────── /// Cap'n Proto RPC server implementation for `AuthenticationService`. struct AuthServiceImpl { store: KeyPackageStore, } impl authentication_service::Server for AuthServiceImpl { /// Upload a single-use KeyPackage and return its SHA-256 fingerprint. fn upload_key_package( &mut self, params: authentication_service::UploadKeyPackageParams, mut results: authentication_service::UploadKeyPackageResults, ) -> Promise<(), capnp::Error> { let params = params.get().map_err(|e| { capnp::Error::failed(format!("upload_key_package: bad params: {e}")) }); let (identity_key, package) = match params { Ok(p) => { let ik = match p.get_identity_key() { Ok(v) => v.to_vec(), Err(e) => return Promise::err(capnp::Error::failed(format!("{e}"))), }; let pkg = match p.get_package() { Ok(v) => v.to_vec(), Err(e) => return Promise::err(capnp::Error::failed(format!("{e}"))), }; (ik, pkg) } Err(e) => return Promise::err(e), }; if identity_key.len() != 32 { return Promise::err(capnp::Error::failed(format!( "identityKey must be exactly 32 bytes, got {}", identity_key.len() ))); } if package.is_empty() { return Promise::err(capnp::Error::failed( "package must not be empty".to_string(), )); } let fingerprint: Vec = Sha256::digest(&package).to_vec(); self.store .entry(identity_key) .or_default() .push_back(package); results .get() .set_fingerprint(&fingerprint); tracing::debug!( fingerprint = %fmt_hex(&fingerprint[..4]), "KeyPackage uploaded" ); Promise::ok(()) } /// Atomically remove and return one KeyPackage for the given identity key. fn fetch_key_package( &mut self, params: authentication_service::FetchKeyPackageParams, mut results: authentication_service::FetchKeyPackageResults, ) -> Promise<(), capnp::Error> { let identity_key = match params.get() { Ok(p) => match p.get_identity_key() { Ok(v) => v.to_vec(), Err(e) => return Promise::err(capnp::Error::failed(format!("{e}"))), }, Err(e) => return Promise::err(capnp::Error::failed(format!("{e}"))), }; if identity_key.len() != 32 { return Promise::err(capnp::Error::failed(format!( "identityKey must be exactly 32 bytes, got {}", identity_key.len() ))); } // Atomically pop one package from the front of the queue. let package = self .store .get_mut(&identity_key) .and_then(|mut q| q.pop_front()); match package { Some(pkg) => { tracing::debug!( identity = %fmt_hex(&identity_key[..4]), "KeyPackage fetched" ); results.get().set_package(&pkg); } None => { tracing::debug!( identity = %fmt_hex(&identity_key[..4]), "no KeyPackage available for identity" ); // Return empty Data — schema specifies this as the "no package" sentinel. results.get().set_package(&[]); } } Promise::ok(()) } } // ── Delivery Service implementation ─────────────────────────────────────────── /// Cap'n Proto RPC server implementation for `DeliveryService`. /// /// Provides a simple store-and-forward relay for MLS messages: /// * `enqueue` appends an opaque payload to the recipient's FIFO queue. /// * `fetch` atomically drains and returns the entire queue. struct DeliveryServiceImpl { store: DeliveryStore, } impl delivery_service::Server for DeliveryServiceImpl { /// Append `payload` to the queue for `recipient_key`. fn enqueue( &mut self, params: delivery_service::EnqueueParams, _results: delivery_service::EnqueueResults, ) -> Promise<(), capnp::Error> { let p = match params.get() { Ok(p) => p, Err(e) => return Promise::err(capnp::Error::failed(format!("{e}"))), }; let recipient_key = match p.get_recipient_key() { Ok(v) => v.to_vec(), Err(e) => return Promise::err(capnp::Error::failed(format!("{e}"))), }; let payload = match p.get_payload() { Ok(v) => v.to_vec(), Err(e) => return Promise::err(capnp::Error::failed(format!("{e}"))), }; if recipient_key.len() != 32 { return Promise::err(capnp::Error::failed(format!( "recipientKey must be exactly 32 bytes, got {}", recipient_key.len() ))); } if payload.is_empty() { return Promise::err(capnp::Error::failed( "payload must not be empty".to_string(), )); } self.store .entry(recipient_key.clone()) .or_default() .push_back(payload); tracing::debug!( recipient = %fmt_hex(&recipient_key[..4]), "message enqueued" ); Promise::ok(()) } /// Atomically drain and return all queued payloads for `recipient_key`. fn fetch( &mut self, params: delivery_service::FetchParams, mut results: delivery_service::FetchResults, ) -> Promise<(), capnp::Error> { let recipient_key = match params.get() { Ok(p) => match p.get_recipient_key() { Ok(v) => v.to_vec(), Err(e) => return Promise::err(capnp::Error::failed(format!("{e}"))), }, Err(e) => return Promise::err(capnp::Error::failed(format!("{e}"))), }; if recipient_key.len() != 32 { return Promise::err(capnp::Error::failed(format!( "recipientKey must be exactly 32 bytes, got {}", recipient_key.len() ))); } // Atomically drain the entire queue. let messages: Vec> = self .store .get_mut(&recipient_key) .map(|mut q| q.drain(..).collect()) .unwrap_or_default(); tracing::debug!( recipient = %fmt_hex(&recipient_key[..4]), count = messages.len(), "messages fetched" ); let mut list = results.get().init_payloads(messages.len() as u32); for (i, msg) in messages.iter().enumerate() { list.set(i as u32, msg); } Promise::ok(()) } } // ── Entry point ─────────────────────────────────────────────────────────────── #[tokio::main] async fn main() -> anyhow::Result<()> { tracing_subscriber::fmt() .with_env_filter( tracing_subscriber::EnvFilter::try_from_default_env() .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")), ) .init(); let args = Args::parse(); // Generate a fresh static Noise keypair for this server instance. // M6 replaces this with persistent key loading from SQLite. let keypair = Arc::new(NoiseKeypair::generate()); { let pub_bytes = keypair.public_bytes(); tracing::info!( listen = %args.listen, ds_listen = %args.ds_listen, public_key = %fmt_hex(&pub_bytes[..4]), "noiseml-server starting (M3) — keypair is ephemeral" ); } // Shared stores — all connections share the same in-memory maps. let kp_store: KeyPackageStore = Arc::new(DashMap::new()); let ds_store: DeliveryStore = Arc::new(DashMap::new()); let as_listener = TcpListener::bind(&args.listen) .await .with_context(|| format!("failed to bind AS to {}", args.listen))?; let ds_listener = TcpListener::bind(&args.ds_listen) .await .with_context(|| format!("failed to bind DS to {}", args.ds_listen))?; tracing::info!( as_addr = %args.listen, ds_addr = %args.ds_listen, "accepting connections" ); // capnp-rpc is !Send (Rc internals), so all RPC tasks must stay on a // LocalSet. Both accept loops share one LocalSet. let local = tokio::task::LocalSet::new(); local .run_until(async move { loop { tokio::select! { result = as_listener.accept() => { let (stream, peer_addr) = result.context("AS accept failed")?; let keypair = Arc::clone(&keypair); let store = Arc::clone(&kp_store); tokio::task::spawn_local( async move { match handle_as_connection(stream, keypair, store).await { Ok(()) => tracing::debug!("AS connection closed"), Err(e) => tracing::warn!(error = %e, "AS connection error"), } } .instrument(tracing::info_span!("as_conn", peer = %peer_addr)), ); } result = ds_listener.accept() => { let (stream, peer_addr) = result.context("DS accept failed")?; let keypair = Arc::clone(&keypair); let store = Arc::clone(&ds_store); tokio::task::spawn_local( async move { match handle_ds_connection(stream, keypair, store).await { Ok(()) => tracing::debug!("DS connection closed"), Err(e) => tracing::warn!(error = %e, "DS connection error"), } } .instrument(tracing::info_span!("ds_conn", peer = %peer_addr)), ); } } } #[allow(unreachable_code)] Ok::<(), anyhow::Error>(()) }) .await } // ── Per-connection handlers ─────────────────────────────────────────────────── /// Handle one Authentication Service connection. async fn handle_as_connection( stream: TcpStream, keypair: Arc, store: KeyPackageStore, ) -> Result<(), anyhow::Error> { let transport = noise_handshake(stream, &keypair, "AS").await?; let (reader, writer) = transport.into_capnp_io(); let network = twoparty::VatNetwork::new( reader.compat(), writer.compat_write(), Side::Server, Default::default(), ); let service: authentication_service::Client = capnp_rpc::new_client(AuthServiceImpl { store }); RpcSystem::new(Box::new(network), Some(service.client)) .await .map_err(|e| anyhow::anyhow!("AS RPC error: {e}")) } /// Handle one Delivery Service connection. async fn handle_ds_connection( stream: TcpStream, keypair: Arc, store: DeliveryStore, ) -> Result<(), anyhow::Error> { let transport = noise_handshake(stream, &keypair, "DS").await?; let (reader, writer) = transport.into_capnp_io(); let network = twoparty::VatNetwork::new( reader.compat(), writer.compat_write(), Side::Server, Default::default(), ); let service: delivery_service::Client = capnp_rpc::new_client(DeliveryServiceImpl { store }); RpcSystem::new(Box::new(network), Some(service.client)) .await .map_err(|e| anyhow::anyhow!("DS RPC error: {e}")) } /// Perform the Noise_XX handshake and log the remote key. async fn noise_handshake( stream: TcpStream, keypair: &NoiseKeypair, label: &str, ) -> anyhow::Result { let transport = handshake_responder(stream, keypair) .await .map_err(|e| anyhow::anyhow!("{label} Noise handshake failed: {e}"))?; let remote = transport .remote_static_public_key() .map(|k| fmt_hex(&k[..4])) .unwrap_or_else(|| "unknown".into()); tracing::info!(remote_key = %remote, "{label} Noise_XX handshake complete"); Ok(transport) } // ── Helpers ─────────────────────────────────────────────────────────────────── /// Format the first `n` bytes of a slice as lowercase hex with a trailing `…`. fn fmt_hex(bytes: &[u8]) -> String { let hex: String = bytes.iter().map(|b| format!("{b:02x}")).collect(); format!("{hex}…") }