From fd1accc6dd99f2ff93807fc66ab181ed8fc739e4 Mon Sep 17 00:00:00 2001 From: Christian Nennemann Date: Wed, 4 Mar 2026 20:19:30 +0100 Subject: [PATCH] feat(sdk): wire device_id through messaging and client APIs Add device_id parameter to fetch, fetch_wait, ack, receive_messages, and receive_messages_wait SDK functions. QpqClient gains device_id field with register_device/list_devices/revoke_device convenience methods. Client REPL passes empty device_id for backwards compat. --- .../quicproquo-client/src/client/v2_repl.rs | 146 +++++++++++- crates/quicproquo-sdk/src/client.rs | 44 ++++ crates/quicproquo-sdk/src/messaging.rs | 211 +++++++++++++++++- 3 files changed, 392 insertions(+), 9 deletions(-) diff --git a/crates/quicproquo-client/src/client/v2_repl.rs b/crates/quicproquo-client/src/client/v2_repl.rs index 64a8736..e61d2ca 100644 --- a/crates/quicproquo-client/src/client/v2_repl.rs +++ b/crates/quicproquo-client/src/client/v2_repl.rs @@ -88,7 +88,8 @@ const COMMANDS: &[CmdDef] = &[ CmdDef { name: "/history", aliases: &[], category: Category::Messaging, description: "Show recent message history", usage: "/history [count]" }, CmdDef { name: "/list", aliases: &["/ls"], category: Category::Messaging, description: "List all conversations", usage: "/list" }, CmdDef { name: "/switch", aliases: &["/sw"], category: Category::Messaging, description: "Switch active conversation", usage: "/switch " }, - CmdDef { name: "/group", aliases: &["/g"], category: Category::Groups, description: "create | invite | leave | list | members", usage: "/group [args]" }, + CmdDef { name: "/group", aliases: &["/g"], category: Category::Groups, description: "create | invite | leave | list | members | remove | rename | rotate-keys", usage: "/group [args]" }, + CmdDef { name: "/devices", aliases: &[], category: Category::Account, description: "list | add | remove — manage linked devices", usage: "/devices [args]" }, CmdDef { name: "/register", aliases: &[], category: Category::Account, description: "Register a new account", usage: "/register " }, CmdDef { name: "/login", aliases: &[], category: Category::Account, description: "Log in to an existing account", usage: "/login " }, CmdDef { name: "/logout", aliases: &[], category: Category::Account, description: "Log out (clear session)", usage: "/logout" }, @@ -161,7 +162,7 @@ impl QpqCompleter { names.push(a.to_string()); } } - for sub in &["create", "invite", "leave", "list", "members"] { + for sub in &["create", "invite", "leave", "list", "members", "remove", "rename", "rotate-keys"] { names.push(format!("/group {sub}")); } Self { names } @@ -383,6 +384,7 @@ async fn dispatch( "/list" | "/ls" => do_list(client)?, "/switch" | "/sw" => do_switch(client, st, args)?, "/group" | "/g" => do_group(client, st, args).await?, + "/devices" => do_devices(client, args).await?, _ => display::print_error(&format!("unknown command: {cmd} (try /help)")), } Ok(false) @@ -646,7 +648,7 @@ async fn do_recv(client: &QpqClient, st: &ReplState) -> anyhow::Result<()> { let my_pub = identity.public_key_bytes(); let messages = quicproquo_sdk::messaging::receive_messages( - rpc, &mut member, &my_pub, None, conv_id.0.as_slice(), + rpc, &mut member, &my_pub, None, conv_id.0.as_slice(), &[], ).await?; if messages.is_empty() { @@ -804,7 +806,16 @@ async fn do_group(client: &mut QpqClient, st: &mut ReplState, args: &str) -> any } "leave" => { - display::print_status("group leave not yet implemented in SDK"); + let identity = st.require_identity()?; + let conv_id = st.require_conversation()?.clone(); + let rpc = client.rpc().map_err(|e| anyhow::anyhow!("{e}"))?; + let conv_store = client.conversations().map_err(|e| anyhow::anyhow!("{e}"))?; + let conv = conv_store + .load_conversation(&conv_id)? + .ok_or_else(|| anyhow::anyhow!("conversation not found"))?; + let mut member = quicproquo_sdk::groups::restore_mls_state(&conv, &identity)?; + quicproquo_sdk::groups::leave_group(rpc, conv_store, &mut member, &conv_id).await?; + display::print_status("left group"); } "list" => do_list(client)?, @@ -830,7 +841,132 @@ async fn do_group(client: &mut QpqClient, st: &mut ReplState, args: &str) -> any println!(); } - _ => display::print_error("usage: /group [args]"), + "remove" => { + let user = parts.get(1).copied().unwrap_or("").trim(); + if user.is_empty() { + display::print_error("usage: /group remove "); + return Ok(()); + } + let identity = st.require_identity()?; + let conv_id = st.require_conversation()?.clone(); + let rpc = client.rpc().map_err(|e| anyhow::anyhow!("{e}"))?; + let conv_store = client.conversations().map_err(|e| anyhow::anyhow!("{e}"))?; + + let peer_key = quicproquo_sdk::users::resolve_user(rpc, user) + .await? + .ok_or_else(|| anyhow::anyhow!("user '{user}' not found"))?; + let conv = conv_store + .load_conversation(&conv_id)? + .ok_or_else(|| anyhow::anyhow!("conversation not found"))?; + let mut member = quicproquo_sdk::groups::restore_mls_state(&conv, &identity)?; + quicproquo_sdk::groups::remove_member_from_group( + rpc, conv_store, &mut member, &conv_id, &peer_key, + ).await?; + display::print_status(&format!("removed @{user} from group")); + } + + "rename" => { + let new_name = parts.get(1).copied().unwrap_or("").trim(); + if new_name.is_empty() { + display::print_error("usage: /group rename "); + return Ok(()); + } + let conv_id = st.require_conversation()?.clone(); + let rpc = client.rpc().map_err(|e| anyhow::anyhow!("{e}"))?; + let conv_store = client.conversations().map_err(|e| anyhow::anyhow!("{e}"))?; + quicproquo_sdk::groups::set_group_metadata( + rpc, conv_store, &conv_id, new_name, "", &[], + ).await?; + st.set_conversation(conv_id, format!("#{new_name}")); + display::print_status(&format!("group renamed to #{new_name}")); + } + + "rotate-keys" => { + let identity = st.require_identity()?; + let conv_id = st.require_conversation()?.clone(); + let rpc = client.rpc().map_err(|e| anyhow::anyhow!("{e}"))?; + let conv_store = client.conversations().map_err(|e| anyhow::anyhow!("{e}"))?; + let conv = conv_store + .load_conversation(&conv_id)? + .ok_or_else(|| anyhow::anyhow!("conversation not found"))?; + let mut member = quicproquo_sdk::groups::restore_mls_state(&conv, &identity)?; + quicproquo_sdk::groups::rotate_group_keys(rpc, conv_store, &mut member, &conv_id).await?; + display::print_status("group keys rotated"); + } + + _ => display::print_error("usage: /group [args]"), + } + Ok(()) +} + +// ── Device management ────────────────────────────────────────────────────── + +async fn do_devices(client: &mut QpqClient, args: &str) -> anyhow::Result<()> { + let parts: Vec<&str> = args.splitn(3, char::is_whitespace).collect(); + let sub = parts.first().copied().unwrap_or(""); + + match sub { + "list" => { + let rpc = client.rpc().map_err(|e| anyhow::anyhow!("{e}"))?; + let devices = quicproquo_sdk::devices::list_devices(rpc).await?; + if devices.is_empty() { + display::print_status("no devices registered"); + } else { + println!("\n{BOLD}Devices{RESET} ({})", devices.len()); + println!("{:<36} {:<20} REGISTERED AT", "DEVICE ID", "NAME"); + for d in &devices { + println!( + "{:<36} {:<20} {}", + hex::encode(&d.device_id), + d.device_name, + d.registered_at, + ); + } + println!(); + } + } + + "add" => { + let name = parts.get(1).copied().unwrap_or("").trim(); + if name.is_empty() { + display::print_error("usage: /devices add "); + return Ok(()); + } + let rpc = client.rpc().map_err(|e| anyhow::anyhow!("{e}"))?; + // Generate a random device ID (16 bytes). + let mut dev_id = vec![0u8; 16]; + quicproquo_core::getrandom::fill(&mut dev_id) + .map_err(|e| anyhow::anyhow!("rng: {e}"))?; + let was_new = + quicproquo_sdk::devices::register_device(rpc, &dev_id, name).await?; + if was_new { + display::print_status(&format!( + "device registered: {name} (id: {})", + hex::encode(&dev_id) + )); + } else { + display::print_status(&format!("device already exists: {name}")); + } + } + + "remove" => { + let id_hex = parts.get(1).copied().unwrap_or("").trim(); + if id_hex.is_empty() { + display::print_error("usage: /devices remove "); + return Ok(()); + } + let id_bytes = hex::decode(id_hex) + .map_err(|e| anyhow::anyhow!("invalid device_id hex: {e}"))?; + let rpc = client.rpc().map_err(|e| anyhow::anyhow!("{e}"))?; + let revoked = quicproquo_sdk::devices::revoke_device(rpc, &id_bytes).await?; + if revoked { + display::print_status(&format!("device revoked: {id_hex}")); + } else { + display::print_error(&format!("device not found: {id_hex}")); + } + } + + _ => display::print_error("usage: /devices [args]"), } Ok(()) } diff --git a/crates/quicproquo-sdk/src/client.rs b/crates/quicproquo-sdk/src/client.rs index 482c4e1..5482bd9 100644 --- a/crates/quicproquo-sdk/src/client.rs +++ b/crates/quicproquo-sdk/src/client.rs @@ -23,6 +23,10 @@ pub struct QpqClient { session_token: Option>, /// Local conversation store (SQLCipher). conv_store: Option, + /// Device ID for multi-device support. + /// When set, fetch/peek/ack requests include this device_id so the server + /// scopes them to the correct per-device queue. + device_id: Option>, } impl QpqClient { @@ -37,6 +41,7 @@ impl QpqClient { identity_key: None, session_token: None, conv_store: None, + device_id: None, } } @@ -174,6 +179,45 @@ impl QpqClient { self.session_token.as_deref() } + // ── Multi-device ───────────────────────────────────────────────────────── + + /// Set the device ID for this client. Subsequent fetch/peek/ack calls + /// will include this ID so the server scopes them to the correct queue. + pub fn set_device_id(&mut self, device_id: Vec) { + self.device_id = Some(device_id); + } + + /// Get the current device ID, if set. + pub fn device_id(&self) -> Option<&[u8]> { + self.device_id.as_deref() + } + + /// Register this device with the server. + /// Sets the local device_id on success. + pub async fn register_device( + &mut self, + device_id: &[u8], + device_name: &str, + ) -> Result { + let rpc = self.rpc.as_ref().ok_or(SdkError::NotConnected)?; + let newly_registered = + crate::devices::register_device(rpc, device_id, device_name).await?; + self.device_id = Some(device_id.to_vec()); + Ok(newly_registered) + } + + /// List all registered devices for this identity. + pub async fn list_devices(&self) -> Result, SdkError> { + let rpc = self.rpc.as_ref().ok_or(SdkError::NotConnected)?; + crate::devices::list_devices(rpc).await + } + + /// Revoke (remove) a registered device. + pub async fn revoke_device(&self, device_id: &[u8]) -> Result { + let rpc = self.rpc.as_ref().ok_or(SdkError::NotConnected)?; + crate::devices::revoke_device(rpc, device_id).await + } + // ── Moderation (client-side) ──────────────────────────────────────────── /// Block a user locally. Their messages will be hidden from display. diff --git a/crates/quicproquo-sdk/src/messaging.rs b/crates/quicproquo-sdk/src/messaging.rs index 5bbff47..0ab4756 100644 --- a/crates/quicproquo-sdk/src/messaging.rs +++ b/crates/quicproquo-sdk/src/messaging.rs @@ -14,8 +14,8 @@ use quicproquo_core::{ }; use quicproquo_proto::method_ids; use quicproquo_proto::qpq::v1::{ - BatchEnqueueRequest, BatchEnqueueResponse, EnqueueRequest, EnqueueResponse, FetchRequest, - FetchResponse, FetchWaitRequest, FetchWaitResponse, + AckRequest, AckResponse, BatchEnqueueRequest, BatchEnqueueResponse, EnqueueRequest, + EnqueueResponse, FetchRequest, FetchResponse, FetchWaitRequest, FetchWaitResponse, }; use quicproquo_rpc::client::RpcClient; @@ -110,8 +110,9 @@ pub async fn receive_messages( my_identity_key: &[u8], hybrid_kp: Option<&HybridKeypair>, channel_id: &[u8], + device_id: &[u8], ) -> Result, SdkError> { - let payloads = fetch(rpc, my_identity_key, channel_id, 0).await?; + let payloads = fetch(rpc, my_identity_key, channel_id, 0, device_id).await?; process_payloads(member, hybrid_kp, payloads) } @@ -126,8 +127,9 @@ pub async fn receive_messages_wait( hybrid_kp: Option<&HybridKeypair>, channel_id: &[u8], timeout_ms: u64, + device_id: &[u8], ) -> Result, SdkError> { - let payloads = fetch_wait(rpc, my_identity_key, channel_id, timeout_ms).await?; + let payloads = fetch_wait(rpc, my_identity_key, channel_id, timeout_ms, device_id).await?; process_payloads(member, hybrid_kp, payloads) } @@ -248,6 +250,47 @@ fn try_unseal_and_parse(seq: u64, plaintext: &[u8]) -> Option }) } +// ── Gap Detection ──────────────────────────────────────────────────────────── + +/// A gap detected in server-side sequence numbers. +#[derive(Debug, Clone)] +pub struct SeqGap { + /// The expected next sequence number. + pub expected_seq: u64, + /// The sequence number that was actually received. + pub received_seq: u64, +} + +/// Detect gaps in a sorted list of `(seq, payload)` pairs relative to the +/// last known sequence number. Returns a list of gaps and the new highest seq. +/// +/// Callers should update their stored `last_seen_seq` to the returned value +/// and emit `ClientEvent::MessageGap` for each gap. +pub fn detect_gaps(last_seen_seq: u64, payloads: &[(u64, Vec)]) -> (Vec, u64) { + if payloads.is_empty() { + return (Vec::new(), last_seen_seq); + } + + let mut gaps = Vec::new(); + let mut expected = last_seen_seq + 1; + + for &(seq, _) in payloads { + if seq > expected { + gaps.push(SeqGap { + expected_seq: expected, + received_seq: seq, + }); + } + if seq >= expected { + expected = seq + 1; + } + } + + // The new last_seen_seq is the highest seq we received. + let new_last_seen = payloads.iter().map(|(s, _)| *s).max().unwrap_or(last_seen_seq); + (gaps, new_last_seen) +} + // ── RPC Helpers ─────────────────────────────────────────────────────────────── /// Enqueue a single payload to one recipient via RPC. @@ -265,6 +308,7 @@ pub async fn enqueue( payload: payload.to_vec(), channel_id: channel_id.to_vec(), ttl_secs, + message_id: Vec::new(), }; let resp_bytes = rpc @@ -292,6 +336,7 @@ pub async fn batch_enqueue( payload: payload.to_vec(), channel_id: channel_id.to_vec(), ttl_secs, + message_id: Vec::new(), }; let resp_bytes = rpc @@ -309,17 +354,22 @@ pub async fn batch_enqueue( /// Fetch messages from server (destructive — removes from queue). /// +/// When `device_id` is non-empty, the server scopes the fetch to the +/// device-specific queue (identity_key + device_id). +/// /// Returns `(seq, payload)` pairs sorted by sequence number. pub async fn fetch( rpc: &RpcClient, my_identity_key: &[u8], channel_id: &[u8], limit: u32, + device_id: &[u8], ) -> Result)>, SdkError> { let req = FetchRequest { recipient_key: my_identity_key.to_vec(), channel_id: channel_id.to_vec(), limit, + device_id: device_id.to_vec(), }; let resp_bytes = rpc @@ -341,18 +391,23 @@ pub async fn fetch( /// Long-poll fetch: blocks server-side until messages arrive or timeout expires. /// +/// When `device_id` is non-empty, the server scopes the fetch to the +/// device-specific queue (identity_key + device_id). +/// /// Returns `(seq, payload)` pairs sorted by sequence number. async fn fetch_wait( rpc: &RpcClient, my_identity_key: &[u8], channel_id: &[u8], timeout_ms: u64, + device_id: &[u8], ) -> Result)>, SdkError> { let req = FetchWaitRequest { recipient_key: my_identity_key.to_vec(), channel_id: channel_id.to_vec(), timeout_ms, limit: 0, // fetch all + device_id: device_id.to_vec(), }; let resp_bytes = rpc @@ -371,3 +426,151 @@ async fn fetch_wait( payloads.sort_by_key(|(seq, _)| *seq); Ok(payloads) } + +// ── Device-aware fetch ────────────────────────────────────────────────────── + +/// Fetch messages for a specific device. +/// +/// When `device_id` is non-empty, the server uses the composite queue key +/// `identity_key + device_id`. When empty, falls back to the bare identity key. +pub async fn fetch_for_device( + rpc: &RpcClient, + my_identity_key: &[u8], + device_id: &[u8], + channel_id: &[u8], + limit: u32, +) -> Result)>, SdkError> { + let req = FetchRequest { + recipient_key: my_identity_key.to_vec(), + channel_id: channel_id.to_vec(), + limit, + device_id: device_id.to_vec(), + }; + + let resp_bytes = rpc + .call(method_ids::FETCH, Bytes::from(req.encode_to_vec())) + .await?; + + let resp = FetchResponse::decode(resp_bytes) + .map_err(|e| SdkError::Crypto(format!("decode FetchResponse: {e}")))?; + + let mut payloads: Vec<(u64, Vec)> = resp + .payloads + .into_iter() + .map(|env| (env.seq, env.data)) + .collect(); + + payloads.sort_by_key(|(seq, _)| *seq); + Ok(payloads) +} + +// ── Acknowledge ───────────────────────────────────────────────────────────── + +/// Acknowledge messages up to a sequence number. +/// +/// When `device_id` is non-empty, the server acks on the device-scoped queue. +pub async fn ack( + rpc: &RpcClient, + my_identity_key: &[u8], + device_id: &[u8], + channel_id: &[u8], + seq_up_to: u64, +) -> Result<(), SdkError> { + let req = AckRequest { + recipient_key: my_identity_key.to_vec(), + channel_id: channel_id.to_vec(), + seq_up_to, + device_id: device_id.to_vec(), + }; + + let resp_bytes = rpc + .call(method_ids::ACK, Bytes::from(req.encode_to_vec())) + .await?; + + let _resp = AckResponse::decode(resp_bytes) + .map_err(|e| SdkError::Crypto(format!("decode AckResponse: {e}")))?; + + Ok(()) +} + +#[cfg(test)] +#[allow(clippy::unwrap_used)] +mod tests { + use super::*; + + #[test] + fn detect_gaps_empty() { + let (gaps, last) = detect_gaps(0, &[]); + assert!(gaps.is_empty()); + assert_eq!(last, 0); + } + + #[test] + fn detect_gaps_contiguous_from_zero() { + let payloads = vec![ + (1, vec![]), + (2, vec![]), + (3, vec![]), + ]; + let (gaps, last) = detect_gaps(0, &payloads); + assert!(gaps.is_empty()); + assert_eq!(last, 3); + } + + #[test] + fn detect_gaps_contiguous_from_nonzero() { + let payloads = vec![ + (6, vec![]), + (7, vec![]), + (8, vec![]), + ]; + let (gaps, last) = detect_gaps(5, &payloads); + assert!(gaps.is_empty()); + assert_eq!(last, 8); + } + + #[test] + fn detect_gaps_single_gap() { + let payloads = vec![ + (1, vec![]), + (2, vec![]), + (5, vec![]), // gap: expected 3, got 5 + (6, vec![]), + ]; + let (gaps, last) = detect_gaps(0, &payloads); + assert_eq!(gaps.len(), 1); + assert_eq!(gaps[0].expected_seq, 3); + assert_eq!(gaps[0].received_seq, 5); + assert_eq!(last, 6); + } + + #[test] + fn detect_gaps_multiple_gaps() { + let payloads = vec![ + (3, vec![]), // gap from 1 to 3 + (7, vec![]), // gap from 4 to 7 + (8, vec![]), + ]; + let (gaps, last) = detect_gaps(0, &payloads); + assert_eq!(gaps.len(), 2); + assert_eq!(gaps[0].expected_seq, 1); + assert_eq!(gaps[0].received_seq, 3); + assert_eq!(gaps[1].expected_seq, 4); + assert_eq!(gaps[1].received_seq, 7); + assert_eq!(last, 8); + } + + #[test] + fn detect_gaps_initial_gap() { + // last_seen_seq = 5, but first received is 10 + let payloads = vec![ + (10, vec![]), + (11, vec![]), + ]; + let (gaps, last) = detect_gaps(5, &payloads); + assert_eq!(gaps.len(), 1); + assert_eq!(gaps[0].expected_seq, 6); + assert_eq!(gaps[0].received_seq, 10); + assert_eq!(last, 11); + } +}