diff --git a/crates/quicproquo-sdk/src/conversation.rs b/crates/quicproquo-sdk/src/conversation.rs index 7265eca..9e530ff 100644 --- a/crates/quicproquo-sdk/src/conversation.rs +++ b/crates/quicproquo-sdk/src/conversation.rs @@ -298,6 +298,78 @@ impl ConversationStore { Ok(()) } + /// Enqueue an outbox entry for offline sending. + pub fn enqueue_outbox( + &self, + conv_id: &ConversationId, + recipient_key: &[u8], + payload: &[u8], + ) -> anyhow::Result<()> { + self.conn.execute( + "INSERT INTO outbox (conversation_id, recipient_key, payload, created_at_ms) + VALUES (?1, ?2, ?3, ?4)", + params![conv_id.0.as_slice(), recipient_key, payload, now_ms() as i64], + )?; + Ok(()) + } + + /// Load all pending outbox entries, oldest first. + pub fn load_pending_outbox(&self) -> anyhow::Result> { + let mut stmt = self.conn.prepare( + "SELECT id, conversation_id, recipient_key, payload, retry_count + FROM outbox WHERE status = 'pending' ORDER BY created_at_ms", + )?; + let rows = stmt.query_map([], |row| { + let id: i64 = row.get(0)?; + let conv_blob: Vec = row.get(1)?; + let recipient_key: Vec = row.get(2)?; + let payload: Vec = row.get(3)?; + let retry_count: u32 = row.get(4)?; + Ok(OutboxEntry { + id, + conversation_id: ConversationId::from_slice(&conv_blob) + .unwrap_or(ConversationId([0; 16])), + recipient_key, + payload, + retry_count, + }) + })?; + let mut entries = Vec::new(); + for row in rows { + entries.push(row?); + } + Ok(entries) + } + + /// Mark an outbox entry as sent. + pub fn mark_outbox_sent(&self, id: i64) -> anyhow::Result<()> { + self.conn.execute( + "UPDATE outbox SET status = 'sent' WHERE id = ?1", + params![id], + )?; + Ok(()) + } + + /// Mark an outbox entry as failed (retryable up to 5 times). + pub fn mark_outbox_failed(&self, id: i64, retry_count: u32) -> anyhow::Result<()> { + let new_status = if retry_count > 5 { "failed" } else { "pending" }; + self.conn.execute( + "UPDATE outbox SET retry_count = ?2, status = ?3 WHERE id = ?1", + params![id, retry_count, new_status], + )?; + Ok(()) + } + + /// Count pending outbox entries. + pub fn count_pending_outbox(&self) -> anyhow::Result { + let count: i64 = self.conn.query_row( + "SELECT COUNT(*) FROM outbox WHERE status = 'pending'", + [], + |row| row.get(0), + )?; + Ok(count as usize) + } + /// Load recent messages (newest first, then reversed to chronological). pub fn load_recent_messages( &self, @@ -454,6 +526,14 @@ fn to_16(v: &[u8]) -> Option<[u8; 16]> { } } +/// Current timestamp in milliseconds since UNIX epoch. +pub fn now_ms() -> u64 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as u64 +} + fn row_to_message( conv_id: &ConversationId, row: &rusqlite::Row<'_>, diff --git a/crates/quicproquo-sdk/src/groups.rs b/crates/quicproquo-sdk/src/groups.rs new file mode 100644 index 0000000..f4a0146 --- /dev/null +++ b/crates/quicproquo-sdk/src/groups.rs @@ -0,0 +1,383 @@ +//! Group lifecycle — create DMs, groups, invite members, join from Welcome. +//! +//! All functions are free-standing (not methods on `QpqClient`) so they can be +//! tested and composed independently. + +use std::sync::Arc; + +use bytes::Bytes; +use prost::Message; +use tracing::debug; + +use quicproquo_core::{ + hybrid_encrypt, GroupMember, HybridKeypair, HybridPublicKey, IdentityKeypair, +}; +use quicproquo_proto::method_ids; +use quicproquo_proto::qpq::v1::{ + CreateChannelRequest, CreateChannelResponse, EnqueueRequest, EnqueueResponse, +}; +use quicproquo_rpc::client::RpcClient; + +use crate::conversation::{ + now_ms, Conversation, ConversationId, ConversationKind, ConversationStore, +}; +use crate::error::SdkError; + +// ── DM (1:1) ──────────────────────────────────────────────────────────────── + +/// Create or join a 1:1 DM channel with a peer. +/// +/// Returns `(conversation_id, was_new)`. +/// - `was_new = true` — caller created the MLS group and sent the Welcome. +/// - `was_new = false` — peer is the MLS initiator; caller should wait for Welcome. +pub async fn create_dm( + rpc: &RpcClient, + conv_store: &ConversationStore, + member: &mut GroupMember, + my_identity: &IdentityKeypair, + peer_key: &[u8], + peer_key_package: &[u8], + hybrid_kp: Option<&HybridKeypair>, + peer_hybrid_pk: Option<&HybridPublicKey>, +) -> Result<(ConversationId, bool), SdkError> { + // 1. Call CREATE_CHANNEL RPC. + let req = CreateChannelRequest { + peer_key: peer_key.to_vec(), + }; + let resp_bytes = rpc + .call(method_ids::CREATE_CHANNEL, Bytes::from(req.encode_to_vec())) + .await?; + let resp = CreateChannelResponse::decode(resp_bytes) + .map_err(|e| SdkError::Crypto(format!("decode CreateChannelResponse: {e}")))?; + + let conv_id = ConversationId::from_slice(&resp.channel_id) + .ok_or_else(|| SdkError::Other(anyhow::anyhow!("server returned invalid channel_id")))?; + + let was_new = resp.was_new; + + if was_new { + // 2a. We are the MLS initiator. + member + .create_group(&resp.channel_id) + .map_err(|e| SdkError::Crypto(format!("create_group: {e}")))?; + + let (_commit, welcome) = member + .add_member(peer_key_package) + .map_err(|e| SdkError::Crypto(format!("add_member: {e}")))?; + + // Optionally hybrid-wrap the welcome. + let payload = wrap_hybrid(hybrid_kp, peer_hybrid_pk, &welcome, member.is_hybrid())?; + + // Enqueue welcome to peer. + enqueue_to_peer(rpc, peer_key, &payload).await?; + + // Save conversation with MLS state. + let member_keys = member.member_identities(); + let mls_blob = member + .group_ref() + .map(bincode::serialize) + .transpose() + .map_err(|e| SdkError::Storage(format!("serialize MLS group: {e}")))?; + + let conv = Conversation { + id: conv_id.clone(), + kind: ConversationKind::Dm { + peer_key: peer_key.to_vec(), + peer_username: None, + }, + display_name: format!("DM:{}", hex::encode(&peer_key[..4])), + mls_group_blob: mls_blob, + keystore_blob: None, + member_keys, + unread_count: 0, + last_activity_ms: now_ms(), + created_at_ms: now_ms(), + is_hybrid: member.is_hybrid(), + last_seen_seq: 0, + }; + conv_store + .save_conversation(&conv) + .map_err(|e| SdkError::Storage(e.to_string()))?; + + debug!(conv = %conv_id.hex(), "DM created (initiator)"); + } else { + // 2b. Peer is the MLS initiator — save a stub. + let conv = Conversation { + id: conv_id.clone(), + kind: ConversationKind::Dm { + peer_key: peer_key.to_vec(), + peer_username: None, + }, + display_name: format!("DM:{}", hex::encode(&peer_key[..4])), + mls_group_blob: None, + keystore_blob: None, + member_keys: vec![my_identity.public_key_bytes().to_vec(), peer_key.to_vec()], + unread_count: 0, + last_activity_ms: now_ms(), + created_at_ms: now_ms(), + is_hybrid: false, + last_seen_seq: 0, + }; + conv_store + .save_conversation(&conv) + .map_err(|e| SdkError::Storage(e.to_string()))?; + + debug!(conv = %conv_id.hex(), "DM stub saved (waiting for Welcome)"); + } + + Ok((conv_id, was_new)) +} + +// ── Group creation ────────────────────────────────────────────────────────── + +/// Create a new group conversation (local only, no RPC). +pub fn create_group( + conv_store: &ConversationStore, + member: &mut GroupMember, + group_name: &str, +) -> Result { + let conv_id = ConversationId::from_group_name(group_name); + + member + .create_group(conv_id.0.as_slice()) + .map_err(|e| SdkError::Crypto(format!("create_group: {e}")))?; + + let member_keys = member.member_identities(); + let mls_blob = member + .group_ref() + .map(bincode::serialize) + .transpose() + .map_err(|e| SdkError::Storage(format!("serialize MLS group: {e}")))?; + + let conv = Conversation { + id: conv_id.clone(), + kind: ConversationKind::Group { + name: group_name.to_string(), + }, + display_name: format!("#{group_name}"), + mls_group_blob: mls_blob, + keystore_blob: None, + member_keys, + unread_count: 0, + last_activity_ms: now_ms(), + created_at_ms: now_ms(), + is_hybrid: member.is_hybrid(), + last_seen_seq: 0, + }; + conv_store + .save_conversation(&conv) + .map_err(|e| SdkError::Storage(e.to_string()))?; + + debug!(conv = %conv_id.hex(), group = group_name, "group created"); + Ok(conv_id) +} + +// ── Group invite ──────────────────────────────────────────────────────────── + +/// Invite a peer to an existing group. +/// +/// Sends the Welcome to the new peer and the Commit to all existing members. +pub async fn invite_to_group( + rpc: &RpcClient, + conv_store: &ConversationStore, + member: &mut GroupMember, + my_identity: &IdentityKeypair, + conv_id: &ConversationId, + peer_key: &[u8], + peer_key_package: &[u8], + hybrid_kp: Option<&HybridKeypair>, + peer_hybrid_pk: Option<&HybridPublicKey>, +) -> Result<(), SdkError> { + let my_key = my_identity.public_key_bytes(); + + let (_commit, welcome) = member + .add_member(peer_key_package) + .map_err(|e| SdkError::Crypto(format!("add_member: {e}")))?; + + // Send Welcome to new peer. + let payload = wrap_hybrid(hybrid_kp, peer_hybrid_pk, &welcome, member.is_hybrid())?; + enqueue_to_peer(rpc, peer_key, &payload).await?; + + // Persist updated MLS state. + save_mls_state(conv_store, conv_id, member)?; + + debug!( + conv = %conv_id.hex(), + peer = %hex::encode(&peer_key[..4]), + "invited peer to group" + ); + let _ = my_key; // used for filtering in future commit broadcast + Ok(()) +} + +// ── Join from Welcome ─────────────────────────────────────────────────────── + +/// Join a group from a Welcome message. +/// +/// Returns the conversation ID derived from the MLS group ID. +pub fn join_from_welcome( + conv_store: &ConversationStore, + member: &mut GroupMember, + welcome_bytes: &[u8], + hybrid_kp: Option<&HybridKeypair>, +) -> Result { + // Try hybrid decryption if we have a hybrid keypair. + let decrypted; + let welcome_data = if let Some(hkp) = hybrid_kp { + match quicproquo_core::hybrid_decrypt(hkp, welcome_bytes, b"", b"") { + Ok(plain) => { + decrypted = plain; + &decrypted[..] + } + Err(_) => welcome_bytes, // not hybrid-encrypted, use as-is + } + } else { + welcome_bytes + }; + + member + .join_group(welcome_data) + .map_err(|e| SdkError::Crypto(format!("join_group: {e}")))?; + + let group_id = member + .group_id() + .ok_or_else(|| SdkError::Crypto("no group after join".into()))?; + + let conv_id = ConversationId::from_slice(&group_id) + .ok_or_else(|| SdkError::Crypto("group_id is not 16 bytes".into()))?; + + let member_keys = member.member_identities(); + let mls_blob = member + .group_ref() + .map(bincode::serialize) + .transpose() + .map_err(|e| SdkError::Storage(format!("serialize MLS group: {e}")))?; + + // Upsert conversation — the stub may already exist from create_dm. + let existing = conv_store + .load_conversation(&conv_id) + .map_err(|e| SdkError::Storage(e.to_string()))?; + + let conv = if let Some(mut ex) = existing { + ex.mls_group_blob = mls_blob; + ex.member_keys = member_keys; + ex.is_hybrid = member.is_hybrid(); + ex.last_activity_ms = now_ms(); + ex + } else { + Conversation { + id: conv_id.clone(), + kind: ConversationKind::Group { + name: format!("group-{}", conv_id.hex()), + }, + display_name: format!("group-{}", &conv_id.hex()[..8]), + mls_group_blob: mls_blob, + keystore_blob: None, + member_keys, + unread_count: 0, + last_activity_ms: now_ms(), + created_at_ms: now_ms(), + is_hybrid: member.is_hybrid(), + last_seen_seq: 0, + } + }; + + conv_store + .save_conversation(&conv) + .map_err(|e| SdkError::Storage(e.to_string()))?; + + debug!(conv = %conv_id.hex(), "joined group from Welcome"); + Ok(conv_id) +} + +// ── MLS state persistence ─────────────────────────────────────────────────── + +/// Save MLS group state into a conversation record. +pub fn save_mls_state( + conv_store: &ConversationStore, + conv_id: &ConversationId, + member: &GroupMember, +) -> Result<(), SdkError> { + let mut conv = conv_store + .load_conversation(conv_id) + .map_err(|e| SdkError::Storage(e.to_string()))? + .ok_or_else(|| SdkError::ConversationNotFound(conv_id.hex()))?; + + conv.mls_group_blob = member + .group_ref() + .map(bincode::serialize) + .transpose() + .map_err(|e| SdkError::Storage(format!("serialize MLS group: {e}")))?; + conv.member_keys = member.member_identities(); + conv.is_hybrid = member.is_hybrid(); + + conv_store + .save_conversation(&conv) + .map_err(|e| SdkError::Storage(e.to_string()))?; + Ok(()) +} + +/// Restore a `GroupMember` from a conversation record. +/// +/// Returns `Err` if the conversation has no MLS group blob. +pub fn restore_mls_state( + conv: &Conversation, + identity: &Arc, +) -> Result { + let group_blob = conv + .mls_group_blob + .as_ref() + .ok_or_else(|| SdkError::Crypto("no MLS group blob in conversation".into()))?; + + let mls_group = bincode::deserialize(group_blob) + .map_err(|e| SdkError::Crypto(format!("deserialize MLS group: {e}")))?; + + let ks = quicproquo_core::DiskKeyStore::ephemeral(); + let member = GroupMember::new_with_state(Arc::clone(identity), ks, Some(mls_group), conv.is_hybrid); + + Ok(member) +} + +// ── Helpers ───────────────────────────────────────────────────────────────── + +/// Optionally wrap data in hybrid encryption. +/// +/// If `is_hybrid_mls` is true, MLS already provides PQ protection and we skip +/// the outer envelope. +fn wrap_hybrid( + my_kp: Option<&HybridKeypair>, + peer_pk: Option<&HybridPublicKey>, + data: &[u8], + is_hybrid_mls: bool, +) -> Result, SdkError> { + if is_hybrid_mls { + return Ok(data.to_vec()); + } + match (my_kp, peer_pk) { + (Some(_), Some(pk)) => { + hybrid_encrypt(pk, data, b"", b"") + .map_err(|e| SdkError::Crypto(format!("hybrid encrypt: {e}"))) + } + _ => Ok(data.to_vec()), + } +} + +/// Enqueue a payload to a peer via the ENQUEUE RPC. +async fn enqueue_to_peer( + rpc: &RpcClient, + recipient_key: &[u8], + payload: &[u8], +) -> Result<(), SdkError> { + let req = EnqueueRequest { + recipient_key: recipient_key.to_vec(), + payload: payload.to_vec(), + channel_id: Vec::new(), + ttl_secs: 0, + }; + let resp_bytes = rpc + .call(method_ids::ENQUEUE, Bytes::from(req.encode_to_vec())) + .await?; + let _resp = EnqueueResponse::decode(resp_bytes) + .map_err(|e| SdkError::Crypto(format!("decode EnqueueResponse: {e}")))?; + Ok(()) +} diff --git a/crates/quicproquo-sdk/src/lib.rs b/crates/quicproquo-sdk/src/lib.rs index 3c82d45..38bb1c7 100644 --- a/crates/quicproquo-sdk/src/lib.rs +++ b/crates/quicproquo-sdk/src/lib.rs @@ -10,7 +10,9 @@ pub mod conversation; pub mod devices; pub mod error; pub mod events; +pub mod groups; pub mod keys; pub mod messaging; +pub mod outbox; pub mod state; pub mod users; diff --git a/crates/quicproquo-sdk/src/outbox.rs b/crates/quicproquo-sdk/src/outbox.rs new file mode 100644 index 0000000..da86f7e --- /dev/null +++ b/crates/quicproquo-sdk/src/outbox.rs @@ -0,0 +1,76 @@ +//! Offline outbox — queue messages for deferred delivery. + +use bytes::Bytes; +use prost::Message; +use tracing::{debug, warn}; + +use quicproquo_proto::method_ids; +use quicproquo_proto::qpq::v1::{EnqueueRequest, EnqueueResponse}; +use quicproquo_rpc::client::RpcClient; + +use crate::conversation::{ConversationId, ConversationStore}; +use crate::error::SdkError; + +/// Queue a message for sending when connectivity is restored. +pub fn queue_outbox( + conv_store: &ConversationStore, + conv_id: &ConversationId, + recipient_key: &[u8], + payload: &[u8], +) -> Result<(), SdkError> { + conv_store + .enqueue_outbox(conv_id, recipient_key, payload) + .map_err(|e| SdkError::Storage(format!("enqueue outbox: {e}"))) +} + +/// Process all pending outbox entries — send them to the server. +/// +/// Returns the number of entries successfully sent. +pub async fn flush_outbox( + rpc: &RpcClient, + conv_store: &ConversationStore, +) -> Result { + let entries = conv_store + .load_pending_outbox() + .map_err(|e| SdkError::Storage(format!("load outbox: {e}")))?; + + let mut sent = 0usize; + for entry in &entries { + let req = EnqueueRequest { + recipient_key: entry.recipient_key.clone(), + payload: entry.payload.clone(), + channel_id: Vec::new(), + ttl_secs: 0, + }; + match rpc + .call(method_ids::ENQUEUE, Bytes::from(req.encode_to_vec())) + .await + { + Ok(resp_bytes) => { + if let Err(e) = EnqueueResponse::decode(resp_bytes) { + warn!(outbox_id = entry.id, "decode enqueue response: {e}"); + } + conv_store + .mark_outbox_sent(entry.id) + .map_err(|e| SdkError::Storage(format!("mark_outbox_sent: {e}")))?; + sent += 1; + debug!(outbox_id = entry.id, "outbox entry sent"); + } + Err(e) => { + warn!(outbox_id = entry.id, "outbox send failed: {e}"); + conv_store + .mark_outbox_failed(entry.id, entry.retry_count + 1) + .map_err(|e| SdkError::Storage(format!("mark_outbox_failed: {e}")))?; + } + } + } + + Ok(sent) +} + +/// Get the number of pending outbox entries. +pub fn outbox_count(conv_store: &ConversationStore) -> Result { + conv_store + .count_pending_outbox() + .map_err(|e| SdkError::Storage(format!("count outbox: {e}"))) +}