From 011ff541bbc95b0a889d08cb717d28a791f67f50 Mon Sep 17 00:00:00 2001 From: Christian Nennemann Date: Wed, 4 Mar 2026 12:39:15 +0100 Subject: [PATCH] =?UTF-8?q?feat(sdk):=20messaging=20pipeline=20=E2=80=94?= =?UTF-8?q?=20send/receive=20with=20MLS,=20sealed=20sender,=20hybrid=20KEM?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Full send pipeline: serialize_chat → MLS encrypt → sealed sender → per-recipient hybrid wrap → batch/individual enqueue via v2 RPC. Full receive pipeline: fetch/fetch_wait → sort by seq → hybrid unwrap → MLS decrypt → unseal → parse AppMessage. Includes retry loop for multi-epoch batches. --- crates/quicproquo-sdk/src/lib.rs | 5 +- crates/quicproquo-sdk/src/messaging.rs | 373 +++++++++++++++++++++++++ 2 files changed, 377 insertions(+), 1 deletion(-) create mode 100644 crates/quicproquo-sdk/src/messaging.rs diff --git a/crates/quicproquo-sdk/src/lib.rs b/crates/quicproquo-sdk/src/lib.rs index 87d36d6..3c82d45 100644 --- a/crates/quicproquo-sdk/src/lib.rs +++ b/crates/quicproquo-sdk/src/lib.rs @@ -3,11 +3,14 @@ //! Provides `QpqClient` — a single entry point for connecting, authenticating, //! sending/receiving messages, and subscribing to real-time events. +pub mod auth; pub mod client; pub mod config; pub mod conversation; pub mod devices; -pub mod events; pub mod error; +pub mod events; pub mod keys; +pub mod messaging; +pub mod state; pub mod users; diff --git a/crates/quicproquo-sdk/src/messaging.rs b/crates/quicproquo-sdk/src/messaging.rs new file mode 100644 index 0000000..5bbff47 --- /dev/null +++ b/crates/quicproquo-sdk/src/messaging.rs @@ -0,0 +1,373 @@ +//! Messaging pipeline: send and receive messages through the MLS + sealed sender +//! + hybrid KEM stack. +//! +//! This module wraps the full encryption pipeline: +//! 1. **Send**: serialize → MLS encrypt → sealed sender → hybrid wrap → enqueue +//! 2. **Receive**: fetch → hybrid unwrap → MLS decrypt → unseal → parse + +use bytes::Bytes; +use prost::Message; +use tracing::debug; + +use quicproquo_core::{ + AppMessage, GroupMember, HybridKeypair, HybridPublicKey, IdentityKeypair, ReceivedMessage, +}; +use quicproquo_proto::method_ids; +use quicproquo_proto::qpq::v1::{ + BatchEnqueueRequest, BatchEnqueueResponse, EnqueueRequest, EnqueueResponse, FetchRequest, + FetchResponse, FetchWaitRequest, FetchWaitResponse, +}; +use quicproquo_rpc::client::RpcClient; + +use crate::error::SdkError; + +// ── Types ───────────────────────────────────────────────────────────────────── + +/// A successfully decrypted application message with sender info. +#[derive(Debug)] +pub struct ReceivedPlaintext { + /// Sender's Ed25519 identity key (from sealed sender envelope). + pub sender_key: [u8; 32], + /// The parsed application message (Chat, Reply, Reaction, etc.). + pub message: AppMessage, + /// Server-assigned sequence number. + pub seq: u64, +} + +/// Default TTL for enqueued messages (24 hours). +const DEFAULT_TTL_SECS: u32 = 86400; + +// ── Send Pipeline ───────────────────────────────────────────────────────────── + +/// Encrypt and send a message to a conversation. +/// +/// Pipeline: generate_message_id → serialize → MLS encrypt → seal → per-recipient +/// hybrid wrap → batch enqueue. +/// +/// Returns the server-assigned sequence numbers (one per recipient). +pub async fn send_message( + rpc: &RpcClient, + member: &mut GroupMember, + identity: &IdentityKeypair, + body: &str, + recipient_keys: &[Vec], + hybrid_keys: &[Option], + channel_id: &[u8], +) -> Result, SdkError> { + // 1. Generate message ID. + let message_id = quicproquo_core::generate_message_id(); + + // 2. Serialize application payload. + let serialized = quicproquo_core::serialize_chat(body.as_bytes(), Some(message_id)) + .map_err(|e| SdkError::Crypto(format!("serialize_chat: {e}")))?; + + // 3. MLS encrypt. + let mls_ciphertext = member + .send_message(&serialized) + .map_err(|e| SdkError::Crypto(format!("MLS encrypt: {e}")))?; + + // 4. Sealed sender wrap. + let sealed = quicproquo_core::sealed_sender::seal(identity, &mls_ciphertext); + + // 5. Per-recipient hybrid wrap + enqueue. + // If all recipients can share the same payload (no hybrid keys), use batch enqueue. + // Otherwise, enqueue individually with per-recipient hybrid wrapping. + let all_no_hybrid = hybrid_keys.iter().all(|k| k.is_none()); + + if all_no_hybrid { + // Batch enqueue — same payload for all recipients. + let seqs = batch_enqueue(rpc, recipient_keys, channel_id, &sealed, DEFAULT_TTL_SECS).await?; + debug!(count = seqs.len(), "batch enqueue complete"); + Ok(seqs) + } else { + // Per-recipient enqueue with optional hybrid wrapping. + let mut seqs = Vec::with_capacity(recipient_keys.len()); + for (i, recipient_key) in recipient_keys.iter().enumerate() { + let payload = if let Some(Some(ref pk)) = hybrid_keys.get(i) { + quicproquo_core::hybrid_encrypt(pk, &sealed, b"", b"") + .map_err(|e| SdkError::Crypto(format!("hybrid encrypt: {e}")))? + } else { + sealed.clone() + }; + let seq = enqueue(rpc, recipient_key, channel_id, &payload, DEFAULT_TTL_SECS).await?; + seqs.push(seq); + } + debug!(count = seqs.len(), "per-recipient enqueue complete"); + Ok(seqs) + } +} + +// ── Receive Pipeline ────────────────────────────────────────────────────────── + +/// Receive and decrypt pending messages from the server. +/// +/// Pipeline: fetch → sort by seq → for each: hybrid unwrap → MLS decrypt → +/// unseal → parse. Includes retry loop for multi-epoch batches where commits +/// must apply before application messages can be decrypted. +pub async fn receive_messages( + rpc: &RpcClient, + member: &mut GroupMember, + my_identity_key: &[u8], + hybrid_kp: Option<&HybridKeypair>, + channel_id: &[u8], +) -> Result, SdkError> { + let payloads = fetch(rpc, my_identity_key, channel_id, 0).await?; + process_payloads(member, hybrid_kp, payloads) +} + +/// Long-poll for new messages with timeout. +/// +/// Same pipeline as [`receive_messages`] but uses the FETCH_WAIT RPC which +/// blocks server-side until messages arrive or the timeout expires. +pub async fn receive_messages_wait( + rpc: &RpcClient, + member: &mut GroupMember, + my_identity_key: &[u8], + hybrid_kp: Option<&HybridKeypair>, + channel_id: &[u8], + timeout_ms: u64, +) -> Result, SdkError> { + let payloads = fetch_wait(rpc, my_identity_key, channel_id, timeout_ms).await?; + process_payloads(member, hybrid_kp, payloads) +} + +/// Shared processing logic for received payloads. +/// +/// Sorts by sequence number, then processes each payload through the decryption +/// pipeline. Uses a retry loop to handle multi-epoch batches where MLS commits +/// must be applied before subsequent application messages can be decrypted. +fn process_payloads( + member: &mut GroupMember, + hybrid_kp: Option<&HybridKeypair>, + mut payloads: Vec<(u64, Vec)>, +) -> Result, SdkError> { + if payloads.is_empty() { + return Ok(Vec::new()); + } + + // Sort by server-assigned sequence number — commits must arrive before + // application messages that depend on the resulting epoch. + payloads.sort_by_key(|(seq, _)| *seq); + + let mut results = Vec::new(); + let mut pending: Vec<(u64, Vec)> = Vec::new(); + + for (seq, raw_payload) in &payloads { + // (a) Try hybrid decrypt; fall back to raw bytes if not hybrid-wrapped. + let mls_bytes = try_hybrid_unwrap(hybrid_kp, raw_payload); + + // (b) MLS decrypt. + match member.receive_message(&mls_bytes) { + Ok(ReceivedMessage::Application(plaintext)) => { + if let Some(rp) = try_unseal_and_parse(*seq, &plaintext) { + results.push(rp); + } + } + Ok(ReceivedMessage::StateChanged | ReceivedMessage::SelfRemoved) => { + debug!(seq, "commit/state-change applied"); + } + Err(_) => { + // MLS decryption failed — likely an epoch mismatch. + // Stash for retry after commits are applied. + pending.push((*seq, mls_bytes)); + } + } + } + + // Retry loop: keep retrying pending messages until no more progress. + // This handles multi-epoch batches where commits must apply first. + loop { + let before = pending.len(); + pending.retain_mut(|(seq, mls_bytes)| { + match member.receive_message(mls_bytes) { + Ok(ReceivedMessage::Application(plaintext)) => { + if let Some(rp) = try_unseal_and_parse(*seq, &plaintext) { + results.push(rp); + } + false // processed + } + Ok(ReceivedMessage::StateChanged | ReceivedMessage::SelfRemoved) => { + debug!(seq, "commit applied (retry)"); + false // processed + } + Err(_) => true, // still pending + } + }); + if pending.len() == before { + break; // no progress — remaining messages are unprocessable + } + } + + if !pending.is_empty() { + debug!( + remaining = pending.len(), + "unprocessable messages after all retries" + ); + } + + Ok(results) +} + +/// Try to hybrid-decrypt a payload. If the caller has a hybrid keypair, attempt +/// decryption. If it fails (payload might not be hybrid-wrapped), return the +/// raw bytes as-is. +fn try_hybrid_unwrap(hybrid_kp: Option<&HybridKeypair>, payload: &[u8]) -> Vec { + if let Some(kp) = hybrid_kp { + match quicproquo_core::hybrid_decrypt(kp, payload, b"", b"") { + Ok(inner) => inner, + Err(_) => payload.to_vec(), // not hybrid-wrapped, use raw + } + } else { + payload.to_vec() + } +} + +/// Unseal (verify sender identity + Ed25519 signature) then parse the inner +/// application message. Returns None on failure (logged as debug). +fn try_unseal_and_parse(seq: u64, plaintext: &[u8]) -> Option { + let (sender_key, inner) = match quicproquo_core::sealed_sender::unseal(plaintext) { + Ok(pair) => pair, + Err(e) => { + debug!(seq, error = %e, "unseal failed"); + return None; + } + }; + + let (_msg_type, message) = match quicproquo_core::parse(&inner) { + Ok(pair) => pair, + Err(e) => { + debug!(seq, error = %e, "app_message parse failed"); + return None; + } + }; + + Some(ReceivedPlaintext { + sender_key, + message, + seq, + }) +} + +// ── RPC Helpers ─────────────────────────────────────────────────────────────── + +/// Enqueue a single payload to one recipient via RPC. +/// +/// Returns the server-assigned sequence number. +pub async fn enqueue( + rpc: &RpcClient, + recipient_key: &[u8], + channel_id: &[u8], + payload: &[u8], + ttl_secs: u32, +) -> Result { + let req = EnqueueRequest { + recipient_key: recipient_key.to_vec(), + payload: payload.to_vec(), + channel_id: channel_id.to_vec(), + ttl_secs, + }; + + let resp_bytes = rpc + .call(method_ids::ENQUEUE, Bytes::from(req.encode_to_vec())) + .await?; + + let resp = EnqueueResponse::decode(resp_bytes) + .map_err(|e| SdkError::Crypto(format!("decode EnqueueResponse: {e}")))?; + + Ok(resp.seq) +} + +/// Batch enqueue the same payload to multiple recipients via RPC. +/// +/// Returns per-recipient sequence numbers. +pub async fn batch_enqueue( + rpc: &RpcClient, + recipient_keys: &[Vec], + channel_id: &[u8], + payload: &[u8], + ttl_secs: u32, +) -> Result, SdkError> { + let req = BatchEnqueueRequest { + recipient_keys: recipient_keys.to_vec(), + payload: payload.to_vec(), + channel_id: channel_id.to_vec(), + ttl_secs, + }; + + let resp_bytes = rpc + .call( + method_ids::BATCH_ENQUEUE, + Bytes::from(req.encode_to_vec()), + ) + .await?; + + let resp = BatchEnqueueResponse::decode(resp_bytes) + .map_err(|e| SdkError::Crypto(format!("decode BatchEnqueueResponse: {e}")))?; + + Ok(resp.seqs) +} + +/// Fetch messages from server (destructive — removes from queue). +/// +/// Returns `(seq, payload)` pairs sorted by sequence number. +pub async fn fetch( + rpc: &RpcClient, + my_identity_key: &[u8], + channel_id: &[u8], + limit: u32, +) -> Result)>, SdkError> { + let req = FetchRequest { + recipient_key: my_identity_key.to_vec(), + channel_id: channel_id.to_vec(), + limit, + }; + + let resp_bytes = rpc + .call(method_ids::FETCH, Bytes::from(req.encode_to_vec())) + .await?; + + let resp = FetchResponse::decode(resp_bytes) + .map_err(|e| SdkError::Crypto(format!("decode FetchResponse: {e}")))?; + + let mut payloads: Vec<(u64, Vec)> = resp + .payloads + .into_iter() + .map(|env| (env.seq, env.data)) + .collect(); + + payloads.sort_by_key(|(seq, _)| *seq); + Ok(payloads) +} + +/// Long-poll fetch: blocks server-side until messages arrive or timeout expires. +/// +/// Returns `(seq, payload)` pairs sorted by sequence number. +async fn fetch_wait( + rpc: &RpcClient, + my_identity_key: &[u8], + channel_id: &[u8], + timeout_ms: u64, +) -> Result)>, SdkError> { + let req = FetchWaitRequest { + recipient_key: my_identity_key.to_vec(), + channel_id: channel_id.to_vec(), + timeout_ms, + limit: 0, // fetch all + }; + + let resp_bytes = rpc + .call(method_ids::FETCH_WAIT, Bytes::from(req.encode_to_vec())) + .await?; + + let resp = FetchWaitResponse::decode(resp_bytes) + .map_err(|e| SdkError::Crypto(format!("decode FetchWaitResponse: {e}")))?; + + let mut payloads: Vec<(u64, Vec)> = resp + .payloads + .into_iter() + .map(|env| (env.seq, env.data)) + .collect(); + + payloads.sort_by_key(|(seq, _)| *seq); + Ok(payloads) +}