//! Group lifecycle — create DMs, groups, invite members, join from Welcome. //! //! All functions are free-standing (not methods on `QpqClient`) so they can be //! tested and composed independently. use std::sync::Arc; use bytes::Bytes; use prost::Message; use tracing::debug; use quicproquo_core::{ hybrid_encrypt, GroupMember, HybridKeypair, HybridPublicKey, IdentityKeypair, }; use quicproquo_proto::method_ids; use quicproquo_proto::qpq::v1::{ CreateChannelRequest, CreateChannelResponse, EnqueueRequest, EnqueueResponse, }; use quicproquo_rpc::client::RpcClient; use crate::conversation::{ now_ms, Conversation, ConversationId, ConversationKind, ConversationStore, }; use crate::error::SdkError; // ── DM (1:1) ──────────────────────────────────────────────────────────────── /// Create or join a 1:1 DM channel with a peer. /// /// Returns `(conversation_id, was_new)`. /// - `was_new = true` — caller created the MLS group and sent the Welcome. /// - `was_new = false` — peer is the MLS initiator; caller should wait for Welcome. pub async fn create_dm( rpc: &RpcClient, conv_store: &ConversationStore, member: &mut GroupMember, my_identity: &IdentityKeypair, peer_key: &[u8], peer_key_package: &[u8], hybrid_kp: Option<&HybridKeypair>, peer_hybrid_pk: Option<&HybridPublicKey>, ) -> Result<(ConversationId, bool), SdkError> { // 1. Call CREATE_CHANNEL RPC. let req = CreateChannelRequest { peer_key: peer_key.to_vec(), }; let resp_bytes = rpc .call(method_ids::CREATE_CHANNEL, Bytes::from(req.encode_to_vec())) .await?; let resp = CreateChannelResponse::decode(resp_bytes) .map_err(|e| SdkError::Crypto(format!("decode CreateChannelResponse: {e}")))?; let conv_id = ConversationId::from_slice(&resp.channel_id) .ok_or_else(|| SdkError::Other(anyhow::anyhow!("server returned invalid channel_id")))?; let was_new = resp.was_new; if was_new { // 2a. We are the MLS initiator. member .create_group(&resp.channel_id) .map_err(|e| SdkError::Crypto(format!("create_group: {e}")))?; let (_commit, welcome) = member .add_member(peer_key_package) .map_err(|e| SdkError::Crypto(format!("add_member: {e}")))?; // Optionally hybrid-wrap the welcome. let payload = wrap_hybrid(hybrid_kp, peer_hybrid_pk, &welcome, member.is_hybrid())?; // Enqueue welcome to peer. enqueue_to_peer(rpc, peer_key, &payload).await?; // Save conversation with MLS state. let member_keys = member.member_identities(); let mls_blob = member .group_ref() .map(bincode::serialize) .transpose() .map_err(|e| SdkError::Storage(format!("serialize MLS group: {e}")))?; let conv = Conversation { id: conv_id.clone(), kind: ConversationKind::Dm { peer_key: peer_key.to_vec(), peer_username: None, }, display_name: format!("DM:{}", hex::encode(&peer_key[..4])), mls_group_blob: mls_blob, keystore_blob: None, member_keys, unread_count: 0, last_activity_ms: now_ms(), created_at_ms: now_ms(), is_hybrid: member.is_hybrid(), last_seen_seq: 0, }; conv_store .save_conversation(&conv) .map_err(|e| SdkError::Storage(e.to_string()))?; debug!(conv = %conv_id.hex(), "DM created (initiator)"); } else { // 2b. Peer is the MLS initiator — save a stub. let conv = Conversation { id: conv_id.clone(), kind: ConversationKind::Dm { peer_key: peer_key.to_vec(), peer_username: None, }, display_name: format!("DM:{}", hex::encode(&peer_key[..4])), mls_group_blob: None, keystore_blob: None, member_keys: vec![my_identity.public_key_bytes().to_vec(), peer_key.to_vec()], unread_count: 0, last_activity_ms: now_ms(), created_at_ms: now_ms(), is_hybrid: false, last_seen_seq: 0, }; conv_store .save_conversation(&conv) .map_err(|e| SdkError::Storage(e.to_string()))?; debug!(conv = %conv_id.hex(), "DM stub saved (waiting for Welcome)"); } Ok((conv_id, was_new)) } // ── Group creation ────────────────────────────────────────────────────────── /// Create a new group conversation (local only, no RPC). pub fn create_group( conv_store: &ConversationStore, member: &mut GroupMember, group_name: &str, ) -> Result { let conv_id = ConversationId::from_group_name(group_name); member .create_group(conv_id.0.as_slice()) .map_err(|e| SdkError::Crypto(format!("create_group: {e}")))?; let member_keys = member.member_identities(); let mls_blob = member .group_ref() .map(bincode::serialize) .transpose() .map_err(|e| SdkError::Storage(format!("serialize MLS group: {e}")))?; let conv = Conversation { id: conv_id.clone(), kind: ConversationKind::Group { name: group_name.to_string(), }, display_name: format!("#{group_name}"), mls_group_blob: mls_blob, keystore_blob: None, member_keys, unread_count: 0, last_activity_ms: now_ms(), created_at_ms: now_ms(), is_hybrid: member.is_hybrid(), last_seen_seq: 0, }; conv_store .save_conversation(&conv) .map_err(|e| SdkError::Storage(e.to_string()))?; debug!(conv = %conv_id.hex(), group = group_name, "group created"); Ok(conv_id) } // ── Group invite ──────────────────────────────────────────────────────────── /// Invite a peer to an existing group. /// /// Sends the Welcome to the new peer and the Commit to all existing members. pub async fn invite_to_group( rpc: &RpcClient, conv_store: &ConversationStore, member: &mut GroupMember, my_identity: &IdentityKeypair, conv_id: &ConversationId, peer_key: &[u8], peer_key_package: &[u8], hybrid_kp: Option<&HybridKeypair>, peer_hybrid_pk: Option<&HybridPublicKey>, ) -> Result<(), SdkError> { let my_key = my_identity.public_key_bytes(); let (_commit, welcome) = member .add_member(peer_key_package) .map_err(|e| SdkError::Crypto(format!("add_member: {e}")))?; // Send Welcome to new peer. let payload = wrap_hybrid(hybrid_kp, peer_hybrid_pk, &welcome, member.is_hybrid())?; enqueue_to_peer(rpc, peer_key, &payload).await?; // Persist updated MLS state. save_mls_state(conv_store, conv_id, member)?; debug!( conv = %conv_id.hex(), peer = %hex::encode(&peer_key[..4]), "invited peer to group" ); let _ = my_key; // used for filtering in future commit broadcast Ok(()) } // ── Join from Welcome ─────────────────────────────────────────────────────── /// Join a group from a Welcome message. /// /// Returns the conversation ID derived from the MLS group ID. pub fn join_from_welcome( conv_store: &ConversationStore, member: &mut GroupMember, welcome_bytes: &[u8], hybrid_kp: Option<&HybridKeypair>, ) -> Result { // Try hybrid decryption if we have a hybrid keypair. let decrypted; let welcome_data = if let Some(hkp) = hybrid_kp { match quicproquo_core::hybrid_decrypt(hkp, welcome_bytes, b"", b"") { Ok(plain) => { decrypted = plain; &decrypted[..] } Err(_) => welcome_bytes, // not hybrid-encrypted, use as-is } } else { welcome_bytes }; member .join_group(welcome_data) .map_err(|e| SdkError::Crypto(format!("join_group: {e}")))?; let group_id = member .group_id() .ok_or_else(|| SdkError::Crypto("no group after join".into()))?; let conv_id = ConversationId::from_slice(&group_id) .ok_or_else(|| SdkError::Crypto("group_id is not 16 bytes".into()))?; let member_keys = member.member_identities(); let mls_blob = member .group_ref() .map(bincode::serialize) .transpose() .map_err(|e| SdkError::Storage(format!("serialize MLS group: {e}")))?; // Upsert conversation — the stub may already exist from create_dm. let existing = conv_store .load_conversation(&conv_id) .map_err(|e| SdkError::Storage(e.to_string()))?; let conv = if let Some(mut ex) = existing { ex.mls_group_blob = mls_blob; ex.member_keys = member_keys; ex.is_hybrid = member.is_hybrid(); ex.last_activity_ms = now_ms(); ex } else { Conversation { id: conv_id.clone(), kind: ConversationKind::Group { name: format!("group-{}", conv_id.hex()), }, display_name: format!("group-{}", &conv_id.hex()[..8]), mls_group_blob: mls_blob, keystore_blob: None, member_keys, unread_count: 0, last_activity_ms: now_ms(), created_at_ms: now_ms(), is_hybrid: member.is_hybrid(), last_seen_seq: 0, } }; conv_store .save_conversation(&conv) .map_err(|e| SdkError::Storage(e.to_string()))?; debug!(conv = %conv_id.hex(), "joined group from Welcome"); Ok(conv_id) } // ── MLS state persistence ─────────────────────────────────────────────────── /// Save MLS group state into a conversation record. pub fn save_mls_state( conv_store: &ConversationStore, conv_id: &ConversationId, member: &GroupMember, ) -> Result<(), SdkError> { let mut conv = conv_store .load_conversation(conv_id) .map_err(|e| SdkError::Storage(e.to_string()))? .ok_or_else(|| SdkError::ConversationNotFound(conv_id.hex()))?; conv.mls_group_blob = member .group_ref() .map(bincode::serialize) .transpose() .map_err(|e| SdkError::Storage(format!("serialize MLS group: {e}")))?; conv.member_keys = member.member_identities(); conv.is_hybrid = member.is_hybrid(); conv_store .save_conversation(&conv) .map_err(|e| SdkError::Storage(e.to_string()))?; Ok(()) } /// Restore a `GroupMember` from a conversation record. /// /// Returns `Err` if the conversation has no MLS group blob. pub fn restore_mls_state( conv: &Conversation, identity: &Arc, ) -> Result { let group_blob = conv .mls_group_blob .as_ref() .ok_or_else(|| SdkError::Crypto("no MLS group blob in conversation".into()))?; let mls_group = bincode::deserialize(group_blob) .map_err(|e| SdkError::Crypto(format!("deserialize MLS group: {e}")))?; let ks = quicproquo_core::DiskKeyStore::ephemeral(); let member = GroupMember::new_with_state(Arc::clone(identity), ks, Some(mls_group), conv.is_hybrid); Ok(member) } // ── Helpers ───────────────────────────────────────────────────────────────── /// Optionally wrap data in hybrid encryption. /// /// If `is_hybrid_mls` is true, MLS already provides PQ protection and we skip /// the outer envelope. fn wrap_hybrid( my_kp: Option<&HybridKeypair>, peer_pk: Option<&HybridPublicKey>, data: &[u8], is_hybrid_mls: bool, ) -> Result, SdkError> { if is_hybrid_mls { return Ok(data.to_vec()); } match (my_kp, peer_pk) { (Some(_), Some(pk)) => { hybrid_encrypt(pk, data, b"", b"") .map_err(|e| SdkError::Crypto(format!("hybrid encrypt: {e}"))) } _ => Ok(data.to_vec()), } } /// Enqueue a payload to a peer via the ENQUEUE RPC. async fn enqueue_to_peer( rpc: &RpcClient, recipient_key: &[u8], payload: &[u8], ) -> Result<(), SdkError> { let req = EnqueueRequest { recipient_key: recipient_key.to_vec(), payload: payload.to_vec(), channel_id: Vec::new(), ttl_secs: 0, }; let resp_bytes = rpc .call(method_ids::ENQUEUE, Bytes::from(req.encode_to_vec())) .await?; let _resp = EnqueueResponse::decode(resp_bytes) .map_err(|e| SdkError::Crypto(format!("decode EnqueueResponse: {e}")))?; Ok(()) }