/// QUIC/Cap'n Proto connection helpers — same logic as quicnprotochat-client/src/client/rpc.rs /// but with explicit per-call auth instead of a process-global OnceLock. use std::net::SocketAddr; use std::path::Path; use std::sync::Arc; use anyhow::Context; use capnp_rpc::{rpc_twoparty_capnp::Side, twoparty, RpcSystem}; use quinn::{ClientConfig, Endpoint}; use quinn_proto::crypto::rustls::QuicClientConfig; use rustls::{pki_types::CertificateDer, ClientConfig as RustlsClientConfig, RootCertStore}; use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt}; use quicnprotochat_core::HybridPublicKey; use quicnprotochat_proto::node_capnp::{auth, node_service}; /// Establish a QUIC/TLS connection and return a `NodeService` RPC client. /// /// Must be called inside a `LocalSet` because `capnp-rpc` is `!Send`. pub async fn connect_node( server: &str, ca_cert: &Path, server_name: &str, ) -> anyhow::Result { let addr: SocketAddr = server .parse() .with_context(|| format!("server must be host:port, got {server}"))?; let cert_bytes = std::fs::read(ca_cert) .with_context(|| format!("read ca_cert {ca_cert:?}"))?; let mut roots = RootCertStore::empty(); roots .add(CertificateDer::from(cert_bytes)) .context("add root cert")?; let mut tls = RustlsClientConfig::builder() .with_root_certificates(roots) .with_no_client_auth(); tls.alpn_protocols = vec![b"capnp".to_vec()]; let crypto = QuicClientConfig::try_from(tls) .map_err(|e| anyhow::anyhow!("invalid client TLS config: {e}"))?; let bind_addr: SocketAddr = "0.0.0.0:0".parse().context("parse client bind address")?; let mut endpoint = Endpoint::client(bind_addr)?; endpoint.set_default_client_config(ClientConfig::new(Arc::new(crypto))); let connection = endpoint .connect(addr, server_name) .context("quic connect init")? .await .context("quic connect failed")?; let (send, recv) = connection.open_bi().await.context("open bi stream")?; let network = twoparty::VatNetwork::new( recv.compat(), send.compat_write(), Side::Client, Default::default(), ); let mut rpc_system = RpcSystem::new(Box::new(network), None); let client: node_service::Client = rpc_system.bootstrap(Side::Server); tokio::task::spawn_local(rpc_system); Ok(client) } /// Populate an auth field from explicit token/device bytes (no global state). pub fn set_auth(auth: &mut auth::Builder<'_>, token: &[u8], device: &[u8]) { auth.set_version(1); auth.set_access_token(token); auth.set_device_id(device); } pub async fn upload_key_package( client: &node_service::Client, identity_key: &[u8], package: &[u8], token: &[u8], device: &[u8], ) -> anyhow::Result<()> { let mut req = client.upload_key_package_request(); { let mut p = req.get(); p.set_identity_key(identity_key); p.set_package(package); let mut auth = p.reborrow().init_auth(); set_auth(&mut auth, token, device); } let resp = req .send() .promise .await .context("upload_key_package RPC failed")?; let server_fp = resp .get() .context("upload_key_package: bad response")? .get_fingerprint() .context("upload_key_package: missing fingerprint")? .to_vec(); let local_fp = super::state::sha256(package); anyhow::ensure!(server_fp == local_fp, "fingerprint mismatch"); Ok(()) } pub async fn fetch_key_package( client: &node_service::Client, identity_key: &[u8], token: &[u8], device: &[u8], ) -> anyhow::Result> { let mut req = client.fetch_key_package_request(); { let mut p = req.get(); p.set_identity_key(identity_key); let mut auth = p.reborrow().init_auth(); set_auth(&mut auth, token, device); } let resp = req .send() .promise .await .context("fetch_key_package RPC failed")?; Ok(resp .get() .context("fetch_key_package: bad response")? .get_package() .context("fetch_key_package: missing package")? .to_vec()) } pub async fn enqueue( client: &node_service::Client, recipient_key: &[u8], payload: &[u8], token: &[u8], device: &[u8], ) -> anyhow::Result<()> { let mut req = client.enqueue_request(); { let mut p = req.get(); p.set_recipient_key(recipient_key); p.set_payload(payload); p.set_channel_id(&[]); p.set_version(1); let mut auth = p.reborrow().init_auth(); set_auth(&mut auth, token, device); } req.send().promise.await.context("enqueue RPC failed")?; Ok(()) } pub async fn fetch_all( client: &node_service::Client, recipient_key: &[u8], token: &[u8], device: &[u8], ) -> anyhow::Result>> { let mut req = client.fetch_request(); { let mut p = req.get(); p.set_recipient_key(recipient_key); p.set_channel_id(&[]); p.set_version(1); p.set_limit(0); let mut auth = p.reborrow().init_auth(); set_auth(&mut auth, token, device); } let resp = req.send().promise.await.context("fetch RPC failed")?; let list = resp .get() .context("fetch: bad response")? .get_payloads() .context("fetch: missing payloads")?; let mut out = Vec::with_capacity(list.len() as usize); for i in 0..list.len() { out.push(list.get(i).context("fetch: payload read error")?.to_vec()); } Ok(out) } pub async fn fetch_wait( client: &node_service::Client, recipient_key: &[u8], timeout_ms: u64, token: &[u8], device: &[u8], ) -> anyhow::Result>> { let mut req = client.fetch_wait_request(); { let mut p = req.get(); p.set_recipient_key(recipient_key); p.set_timeout_ms(timeout_ms); p.set_channel_id(&[]); p.set_version(1); p.set_limit(0); let mut auth = p.reborrow().init_auth(); set_auth(&mut auth, token, device); } let resp = req.send().promise.await.context("fetch_wait RPC failed")?; let list = resp .get() .context("fetch_wait: bad response")? .get_payloads() .context("fetch_wait: missing payloads")?; let mut out = Vec::with_capacity(list.len() as usize); for i in 0..list.len() { out.push(list.get(i).context("fetch_wait: payload read error")?.to_vec()); } Ok(out) } pub async fn upload_hybrid_key( client: &node_service::Client, identity_key: &[u8], hybrid_pk: &HybridPublicKey, token: &[u8], device: &[u8], ) -> anyhow::Result<()> { let mut req = client.upload_hybrid_key_request(); { let mut p = req.get(); p.set_identity_key(identity_key); p.set_hybrid_public_key(&hybrid_pk.to_bytes()); let mut auth = p.reborrow().init_auth(); set_auth(&mut auth, token, device); } req.send() .promise .await .context("upload_hybrid_key RPC failed")?; Ok(()) } pub async fn fetch_hybrid_key( client: &node_service::Client, identity_key: &[u8], token: &[u8], device: &[u8], ) -> anyhow::Result> { let mut req = client.fetch_hybrid_key_request(); { let mut p = req.get(); p.set_identity_key(identity_key); let mut auth = p.reborrow().init_auth(); set_auth(&mut auth, token, device); } let resp = req .send() .promise .await .context("fetch_hybrid_key RPC failed")?; let pk_bytes = resp .get() .context("fetch_hybrid_key: bad response")? .get_hybrid_public_key() .context("fetch_hybrid_key: missing field")? .to_vec(); if pk_bytes.is_empty() { return Ok(None); } let pk = HybridPublicKey::from_bytes(&pk_bytes).context("invalid hybrid public key")?; Ok(Some(pk)) } /// Attempt hybrid decryption; returns the inner payload on success. pub fn try_hybrid_decrypt( hybrid_kp: Option<&quicnprotochat_core::HybridKeypair>, payload: &[u8], ) -> anyhow::Result> { let kp = hybrid_kp.ok_or_else(|| anyhow::anyhow!("hybrid key required for decryption"))?; quicnprotochat_core::hybrid_decrypt(kp, payload).map_err(|e| anyhow::anyhow!("{e}")) } pub fn current_timestamp_ms() -> u64 { std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() .as_millis() as u64 }