diff --git a/crates/quicproquo-client/src/client/conversation.rs b/crates/quicproquo-client/src/client/conversation.rs index 650cd88..cf55842 100644 --- a/crates/quicproquo-client/src/client/conversation.rs +++ b/crates/quicproquo-client/src/client/conversation.rs @@ -749,6 +749,15 @@ impl ConversationStore { )?; Ok(()) } + + /// Delete messages older than `cutoff_ms` (epoch milliseconds) across all conversations. + pub fn delete_messages_before(&self, cutoff_ms: u64) -> anyhow::Result { + let rows = self.conn.execute( + "DELETE FROM messages WHERE timestamp_ms < ?1", + params![cutoff_ms as i64], + )?; + Ok(rows) + } } /// An entry in the offline outbox queue. diff --git a/crates/quicproquo-client/src/client/repl.rs b/crates/quicproquo-client/src/client/repl.rs index 2a0024a..86c7fee 100644 --- a/crates/quicproquo-client/src/client/repl.rs +++ b/crates/quicproquo-client/src/client/repl.rs @@ -12,7 +12,7 @@ use anyhow::Context; use quicproquo_core::{ AppMessage, DiskKeyStore, GroupMember, IdentityKeypair, ReceivedMessage, compute_safety_number, hybrid_encrypt, parse as parse_app_msg, serialize_chat, - serialize_delete, serialize_edit, serialize_file_ref, serialize_reaction, + serialize_delete, serialize_dummy, serialize_edit, serialize_file_ref, serialize_reaction, serialize_read_receipt, serialize_typing, }; use quicproquo_proto::node_capnp::node_service; @@ -27,8 +27,9 @@ use super::conversation::{ use super::display; use super::rpc::{ connect_node, create_channel, delete_account, download_blob_chunk, enqueue, - enqueue_with_ttl, fetch_hybrid_key, fetch_key_package, fetch_wait, resolve_identity, - resolve_user, try_hybrid_decrypt, upload_blob_chunk, upload_hybrid_key, upload_key_package, + enqueue_with_ttl, fetch_hybrid_key, fetch_key_package, fetch_wait, list_devices, + register_device, resolve_identity, resolve_user, revoke_device, try_hybrid_decrypt, + upload_blob_chunk, upload_hybrid_key, upload_key_package, }; use super::session::SessionState; use super::state::{decode_identity_key, load_or_init_state}; @@ -89,6 +90,18 @@ enum SlashCommand { DeleteAccount, /// Set or query disappearing message TTL for the active conversation. Disappear { arg: Option }, + /// Privacy controls: redact-keys, auto-clear, padding. + Privacy { arg: Option }, + /// Verify that MLS epoch has advanced since last send (forward secrecy check). + VerifyFs, + /// Rotate MLS leaf key AND regenerate + upload hybrid KEM keypair. + RotateAllKeys, + /// List all registered devices. + Devices, + /// Register this device with a name. + RegisterDevice { name: String }, + /// Revoke a device by hex ID prefix. + RevokeDevice { id_prefix: String }, } fn parse_input(line: &str) -> Input { @@ -284,6 +297,24 @@ fn parse_input(line: &str) -> Input { }, "/delete-account" => Input::Slash(SlashCommand::DeleteAccount), "/disappear" => Input::Slash(SlashCommand::Disappear { arg }), + "/privacy" => Input::Slash(SlashCommand::Privacy { arg }), + "/verify-fs" => Input::Slash(SlashCommand::VerifyFs), + "/rotate-all-keys" => Input::Slash(SlashCommand::RotateAllKeys), + "/devices" => Input::Slash(SlashCommand::Devices), + "/register-device" => match arg { + Some(name) => Input::Slash(SlashCommand::RegisterDevice { name }), + None => { + display::print_error("usage: /register-device "); + Input::Empty + } + }, + "/revoke-device" => match arg { + Some(id_prefix) => Input::Slash(SlashCommand::RevokeDevice { id_prefix }), + None => { + display::print_error("usage: /revoke-device "); + Input::Empty + } + }, _ => { display::print_error(&format!("unknown command: {cmd}. Try /help")); Input::Empty @@ -566,6 +597,28 @@ pub async fn run_repl( let now = std::time::Instant::now(); session.typing_indicators.retain(|_, ts| now.duration_since(*ts).as_secs() < 10); + // Auto-clear: delete messages older than the configured duration. + if let Some(max_age_secs) = session.auto_clear_secs { + let cutoff_ms = now_ms().saturating_sub(max_age_secs as u64 * 1000); + if let Err(e) = session.conv_store.delete_messages_before(cutoff_ms) { + tracing::debug!(error = %e, "auto-clear failed"); + } + } + + // Traffic padding: send a dummy message every ~30 seconds. + if session.padding_enabled { + static LAST_PADDING: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0); + let now_secs = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + let last = LAST_PADDING.load(std::sync::atomic::Ordering::Relaxed); + if now_secs.saturating_sub(last) >= 30 { + LAST_PADDING.store(now_secs, std::sync::atomic::Ordering::Relaxed); + send_dummy_message(&mut session, &client).await; + } + } + match poll_messages(&mut session, &client).await { Ok(()) => { consecutive_errors = 0; @@ -782,6 +835,12 @@ async fn handle_slash( SlashCommand::Download { index } => cmd_download(session, client, index).await, SlashCommand::DeleteAccount => cmd_delete_account(session, client).await, SlashCommand::Disappear { arg } => cmd_disappear(session, arg.as_deref()), + SlashCommand::Privacy { arg } => cmd_privacy(session, arg.as_deref()), + SlashCommand::VerifyFs => cmd_verify_fs(session), + SlashCommand::RotateAllKeys => cmd_rotate_all_keys(session, client).await, + SlashCommand::Devices => cmd_devices(client).await, + SlashCommand::RegisterDevice { name } => cmd_register_device(client, &name).await, + SlashCommand::RevokeDevice { id_prefix } => cmd_revoke_device(client, &id_prefix).await, }; if let Err(e) = result { display::print_error(&format!("{e:#}")); @@ -824,6 +883,15 @@ fn print_help() { display::print_status(" /disappear - Set disappearing messages (1h, 30m, 1d, 300)"); display::print_status(" /disappear off - Disable disappearing messages"); display::print_status(" /disappear - Show current setting"); + display::print_status(" /privacy - Show current privacy settings"); + display::print_status(" /privacy redact-keys on|off - Redact identity keys in /members, /group-info"); + display::print_status(" /privacy auto-clear - Auto-clear local messages older than "); + display::print_status(" /privacy padding on|off - Toggle dummy traffic padding"); + display::print_status(" /verify-fs - Verify MLS forward secrecy (epoch advancement)"); + display::print_status(" /rotate-all-keys - Rotate MLS key + regenerate hybrid KEM keypair"); + display::print_status(" /devices - List all registered devices"); + display::print_status(" /register-device - Register this device with a name"); + display::print_status(" /revoke-device - Remove a device by hex ID prefix"); display::print_status(" /quit - Exit"); } @@ -898,6 +966,148 @@ fn cmd_disappear( Ok(()) } +fn cmd_privacy( + session: &mut SessionState, + arg: Option<&str>, +) -> anyhow::Result<()> { + match arg { + None => { + display::print_status(&format!( + "redact-keys: {}", + if session.redact_keys { "on" } else { "off" } + )); + display::print_status(&format!( + "auto-clear: {}", + match session.auto_clear_secs { + Some(secs) => format_ttl(secs), + None => "off".to_string(), + } + )); + display::print_status(&format!( + "padding: {}", + if session.padding_enabled { "on" } else { "off" } + )); + } + Some(s) if s.starts_with("redact-keys ") => { + let val = s.trim_start_matches("redact-keys ").trim(); + match val { + "on" => { + session.redact_keys = true; + display::print_status("key redaction enabled"); + } + "off" => { + session.redact_keys = false; + display::print_status("key redaction disabled"); + } + _ => display::print_error("usage: /privacy redact-keys on|off"), + } + } + Some(s) if s.starts_with("auto-clear ") => { + let val = s.trim_start_matches("auto-clear ").trim(); + if val.eq_ignore_ascii_case("off") { + session.auto_clear_secs = None; + display::print_status("auto-clear disabled"); + } else { + let secs = parse_duration_secs(val) + .context("invalid duration; use e.g. 30m, 1h, 1d, or 300")?; + if secs == 0 { + anyhow::bail!("auto-clear duration must be greater than 0"); + } + session.auto_clear_secs = Some(secs); + display::print_status(&format!( + "auto-clear enabled: messages older than {} will be deleted", + format_ttl(secs) + )); + } + } + Some(s) if s.starts_with("padding ") => { + let val = s.trim_start_matches("padding ").trim(); + match val { + "on" => { + session.padding_enabled = true; + display::print_status("traffic padding enabled (dummy messages every 30s)"); + } + "off" => { + session.padding_enabled = false; + display::print_status("traffic padding disabled"); + } + "status" => { + display::print_status(&format!( + "padding: {}", + if session.padding_enabled { "on" } else { "off" } + )); + } + _ => display::print_error("usage: /privacy padding on|off|status"), + } + } + Some(_) => { + display::print_error("usage: /privacy [redact-keys on|off | auto-clear |off | padding on|off|status]"); + } + } + Ok(()) +} + +fn cmd_verify_fs(session: &SessionState) -> anyhow::Result<()> { + let conv_id = session + .active_conversation + .as_ref() + .context("no active conversation")?; + + let member = session + .members + .get(conv_id) + .context("no group member for active conversation")?; + + let current_epoch = member.epoch().context("no MLS group in active conversation")?; + + match session.last_send_epoch { + Some(last) if current_epoch > last => { + display::print_status(&format!( + "forward secrecy OK: epoch advanced from {} to {}", + last, current_epoch + )); + } + Some(last) if current_epoch == last => { + display::print_status(&format!( + "warning: MLS epoch has NOT advanced since last send (epoch {}). \ + Use /rotate-all-keys or /update-key to rotate keys.", + current_epoch + )); + } + Some(last) => { + display::print_status(&format!( + "unexpected: current epoch {} < last send epoch {}", + current_epoch, last + )); + } + None => { + display::print_status(&format!( + "no previous send recorded. Current epoch: {}", + current_epoch + )); + } + } + + Ok(()) +} + +async fn cmd_rotate_all_keys( + session: &mut SessionState, + client: &node_service::Client, +) -> anyhow::Result<()> { + // Step 1: MLS leaf key rotation (same as /update-key). + cmd_update_key(session, client).await?; + + // Step 2: Generate new hybrid KEM keypair and upload. + let new_kp = quicproquo_core::HybridKeypair::generate(); + let id_key = session.identity.public_key_bytes(); + upload_hybrid_key(client, &id_key, &new_kp.public_key()).await?; + session.hybrid_kp = Some(new_kp); + + display::print_status("all keys rotated: MLS leaf key + hybrid KEM keypair"); + Ok(()) +} + /// Discover nearby qpq servers via mDNS (requires `--features mesh` build). fn cmd_mesh_peers() -> anyhow::Result<()> { use super::mesh_discovery::MeshDiscovery; @@ -1626,11 +1836,20 @@ async fn cmd_members( let ids = member.member_identities(); let mut names = Vec::with_capacity(ids.len()); for id in &ids { - let mut name = resolve_or_hex(client, id).await; - if id.as_slice() == my_key.as_slice() { - name.push_str(" (you)"); + if session.redact_keys { + let short = &hex::encode(&id[..4.min(id.len())]); + let mut name = format!("[redacted-{short}]"); + if id.as_slice() == my_key.as_slice() { + name.push_str(" (you)"); + } + names.push(name); + } else { + let mut name = resolve_or_hex(client, id).await; + if id.as_slice() == my_key.as_slice() { + name.push_str(" (you)"); + } + names.push(name); } - names.push(name); } display::print_status(&format!("Members ({}): {}", names.len(), names.join(", "))); Ok(()) @@ -1665,11 +1884,20 @@ async fn cmd_group_info( let mut names = Vec::with_capacity(ids.len()); for id in &ids { - let mut name = resolve_or_hex(client, id).await; - if id.as_slice() == my_key.as_slice() { - name.push_str(" (you)"); + if session.redact_keys { + let short = &hex::encode(&id[..4.min(id.len())]); + let mut name = format!("[redacted-{short}]"); + if id.as_slice() == my_key.as_slice() { + name.push_str(" (you)"); + } + names.push(name); + } else { + let mut name = resolve_or_hex(client, id).await; + if id.as_slice() == my_key.as_slice() { + name.push_str(" (you)"); + } + names.push(name); } - names.push(name); } display::print_status(&format!(" {}", names.join(", "))); @@ -2451,12 +2679,19 @@ async fn do_send( .send_message(&padded) .context("MLS send_message failed")?; + // Collect epoch and recipients before releasing the mutable borrow on session. + let epoch = member.epoch(); let recipients: Vec> = member .member_identities() .into_iter() .filter(|id| id.as_slice() != my_key.as_slice()) .collect(); + // Track epoch for /verify-fs (must be after member borrow is released). + if let Some(epoch) = epoch { + session.last_send_epoch = Some(epoch); + } + let ttl = session.disappear_ttl.get(&conv_id).copied(); for recipient_key in &recipients { @@ -2496,6 +2731,60 @@ async fn do_send( Ok(()) } +/// Send a dummy message for traffic analysis resistance. +async fn send_dummy_message( + session: &mut SessionState, + client: &node_service::Client, +) { + let conv_id = match session.active_conversation.as_ref() { + Some(id) => id.clone(), + None => return, + }; + let my_key = session.identity_bytes(); + let identity = std::sync::Arc::clone(&session.identity); + + let member = match session.get_member_mut(&conv_id) { + Some(m) => m, + None => return, + }; + if member.group_ref().is_none() { + return; + } + + let recipients: Vec> = member + .member_identities() + .into_iter() + .filter(|id| id.as_slice() != my_key.as_slice()) + .collect(); + + if recipients.is_empty() { + return; + } + + let dummy_payload = serialize_dummy(); + let sealed = quicproquo_core::sealed_sender::seal(&identity, &dummy_payload); + let padded = quicproquo_core::padding::pad(&sealed); + + let ct = match member.send_message(&padded) { + Ok(ct) => ct, + Err(_) => return, + }; + + let idx = (now_ms() as usize) % recipients.len(); + let rk = &recipients[idx]; + + let payload = match fetch_hybrid_key(client, rk).await { + Ok(Some(ref pk)) => match hybrid_encrypt(pk, &ct, b"", b"") { + Ok(p) => p, + Err(_) => ct, + }, + _ => ct, + }; + + let _ = enqueue(client, rk, &payload).await; + let _ = session.save_member(&conv_id); +} + // ── Outbox drain ───────────────────────────────────────────────────────────── async fn drain_outbox( @@ -2600,6 +2889,13 @@ async fn poll_messages( break; } + // Dummy messages: silently discard (traffic padding). + if let Ok((_, AppMessage::Dummy)) = &parsed { + any_changed = true; + handled = true; + break; + } + // Read receipts: ephemeral, show subtle notification. if let Ok((_, AppMessage::ReadReceipt { .. })) = &parsed { let is_active = session @@ -2941,3 +3237,81 @@ async fn replenish_pending_key( } } } + +// ── Device management commands ────────────────────────────────────────────── + +async fn cmd_devices(client: &node_service::Client) -> anyhow::Result<()> { + let devices = list_devices(client).await?; + if devices.is_empty() { + display::print_status("No devices registered."); + return Ok(()); + } + display::print_status(&format!("{} device(s):", devices.len())); + for (device_id, name, registered_at) in &devices { + let id_hex = hex::encode(device_id); + let name_display = if name.is_empty() { "(unnamed)" } else { name.as_str() }; + display::print_status(&format!( + " {} - {} (registered: {})", + &id_hex[..16.min(id_hex.len())], + name_display, + registered_at + )); + } + Ok(()) +} + +async fn cmd_register_device( + client: &node_service::Client, + name: &str, +) -> anyhow::Result<()> { + let mut device_id = [0u8; 16]; + rand::RngCore::fill_bytes(&mut rand::rngs::OsRng, &mut device_id); + let success = register_device(client, &device_id, name).await?; + if success { + display::print_status(&format!( + "Device registered: {} ({})", + hex::encode(device_id), + name + )); + } else { + display::print_error("Device already exists with that ID."); + } + Ok(()) +} + +async fn cmd_revoke_device( + client: &node_service::Client, + id_prefix: &str, +) -> anyhow::Result<()> { + let devices = list_devices(client).await?; + let prefix_lower = id_prefix.to_lowercase(); + let matches: Vec<_> = devices + .iter() + .filter(|(id, _, _)| hex::encode(id).starts_with(&prefix_lower)) + .collect(); + + match matches.len() { + 0 => { + display::print_error(&format!("No device matching prefix '{id_prefix}'")); + } + 1 => { + let (device_id, name, _) = matches[0]; + let success = revoke_device(client, device_id).await?; + if success { + display::print_status(&format!( + "Device revoked: {} ({})", + hex::encode(device_id), + if name.is_empty() { "(unnamed)" } else { name.as_str() } + )); + } else { + display::print_error("Device not found on server."); + } + } + n => { + display::print_error(&format!( + "Ambiguous prefix '{id_prefix}' matches {n} devices. Be more specific." + )); + } + } + Ok(()) +} diff --git a/crates/quicproquo-client/src/client/rpc.rs b/crates/quicproquo-client/src/client/rpc.rs index 6e540da..04d2a3f 100644 --- a/crates/quicproquo-client/src/client/rpc.rs +++ b/crates/quicproquo-client/src/client/rpc.rs @@ -870,6 +870,105 @@ pub async fn delete_account( Ok(success) } +/// Register a device for the authenticated identity. +pub async fn register_device( + client: &node_service::Client, + device_id: &[u8], + device_name: &str, +) -> anyhow::Result { + let mut req = client.register_device_request(); + { + let mut p = req.get(); + p.set_device_id(device_id); + p.set_device_name(device_name); + let mut auth = p.reborrow().init_auth(); + set_auth(&mut auth)?; + } + + let resp = req + .send() + .promise + .await + .context("register_device RPC failed")?; + + let success = resp + .get() + .context("register_device: bad response")? + .get_success(); + + Ok(success) +} + +/// List all registered devices for the authenticated identity. +pub async fn list_devices( + client: &node_service::Client, +) -> anyhow::Result, String, u64)>> { + let mut req = client.list_devices_request(); + { + let mut p = req.get(); + let mut auth = p.reborrow().init_auth(); + set_auth(&mut auth)?; + } + + let resp = req + .send() + .promise + .await + .context("list_devices RPC failed")?; + + let devices = resp + .get() + .context("list_devices: bad response")? + .get_devices() + .context("list_devices: missing devices field")?; + + let mut result = Vec::with_capacity(devices.len() as usize); + for i in 0..devices.len() { + let entry = devices.get(i); + let device_id = entry + .get_device_id() + .context("list_devices: missing device_id")? + .to_vec(); + let device_name = entry + .get_device_name() + .context("list_devices: missing device_name")? + .to_str() + .unwrap_or("") + .to_string(); + let registered_at = entry.get_registered_at(); + result.push((device_id, device_name, registered_at)); + } + + Ok(result) +} + +/// Revoke (remove) a registered device. +pub async fn revoke_device( + client: &node_service::Client, + device_id: &[u8], +) -> anyhow::Result { + let mut req = client.revoke_device_request(); + { + let mut p = req.get(); + p.set_device_id(device_id); + let mut auth = p.reborrow().init_auth(); + set_auth(&mut auth)?; + } + + let resp = req + .send() + .promise + .await + .context("revoke_device RPC failed")?; + + let success = resp + .get() + .context("revoke_device: bad response")? + .get_success(); + + Ok(success) +} + /// Return the current Unix timestamp in milliseconds. pub fn current_timestamp_ms() -> u64 { std::time::SystemTime::now() diff --git a/crates/quicproquo-client/src/client/session.rs b/crates/quicproquo-client/src/client/session.rs index b170947..346ac6a 100644 --- a/crates/quicproquo-client/src/client/session.rs +++ b/crates/quicproquo-client/src/client/session.rs @@ -44,6 +44,14 @@ pub struct SessionState { pub typing_indicators: HashMap, /// Per-conversation disappearing message TTL in seconds. None = messages persist. pub disappear_ttl: HashMap, + /// When true, /members and /group-info redact identity keys as `[redacted-XXXX]`. + pub redact_keys: bool, + /// When Some(secs), auto-clear local messages older than this duration. + pub auto_clear_secs: Option, + /// When true, send periodic dummy messages for traffic analysis resistance. + pub padding_enabled: bool, + /// Last epoch at which we sent a message (for /verify-fs). + pub last_send_epoch: Option, } impl SessionState { @@ -80,6 +88,10 @@ impl SessionState { typing_notify_enabled: true, typing_indicators: HashMap::new(), disappear_ttl: HashMap::new(), + redact_keys: false, + auto_clear_secs: None, + padding_enabled: false, + last_send_epoch: None, }; // Migrate legacy single-group into conversations if present and not yet migrated. diff --git a/crates/quicproquo-core/src/app_message.rs b/crates/quicproquo-core/src/app_message.rs index e24c150..247db1b 100644 --- a/crates/quicproquo-core/src/app_message.rs +++ b/crates/quicproquo-core/src/app_message.rs @@ -27,6 +27,7 @@ pub enum MessageType { Edit = 0x06, Delete = 0x07, FileRef = 0x08, + Dummy = 0x09, } impl MessageType { @@ -40,6 +41,7 @@ impl MessageType { 0x06 => Some(MessageType::Edit), 0x07 => Some(MessageType::Delete), 0x08 => Some(MessageType::FileRef), + 0x09 => Some(MessageType::Dummy), _ => None, } } @@ -84,6 +86,8 @@ pub enum AppMessage { file_size: u64, mime_type: Vec, }, + /// Dummy message for traffic analysis resistance (no user-visible content). + Dummy, } /// Generate a new 16-byte message ID (e.g. for Chat/Reply so recipients can reference it). @@ -203,6 +207,11 @@ pub fn serialize_file_ref( Ok(serialize(MessageType::FileRef, &payload)) } +/// Serialize a Dummy message (traffic padding — no user content). +pub fn serialize_dummy() -> Vec { + serialize(MessageType::Dummy, &[]) +} + /// Parse bytes into (MessageType, AppMessage). Fails if version/type unknown or payload too short. pub fn parse(bytes: &[u8]) -> Result<(MessageType, AppMessage), CoreError> { if bytes.len() < 2 { @@ -225,6 +234,7 @@ pub fn parse(bytes: &[u8]) -> Result<(MessageType, AppMessage), CoreError> { MessageType::Edit => parse_edit(payload)?, MessageType::Delete => parse_delete(payload)?, MessageType::FileRef => parse_file_ref(payload)?, + MessageType::Dummy => AppMessage::Dummy, }; Ok((msg_type, app)) } @@ -502,4 +512,12 @@ mod tests { _ => panic!("expected FileRef"), } } + + #[test] + fn roundtrip_dummy() { + let encoded = serialize_dummy(); + let (t, msg) = parse(&encoded).unwrap(); + assert_eq!(t, MessageType::Dummy); + assert_eq!(msg, AppMessage::Dummy); + } } diff --git a/crates/quicproquo-core/src/lib.rs b/crates/quicproquo-core/src/lib.rs index 63fe909..27d67e9 100644 --- a/crates/quicproquo-core/src/lib.rs +++ b/crates/quicproquo-core/src/lib.rs @@ -59,9 +59,9 @@ pub mod opaque_auth; // ── Public API (always available) ─────────────────────────────────────────── pub use app_message::{ - serialize, serialize_chat, serialize_delete, serialize_edit, serialize_file_ref, - serialize_reaction, serialize_read_receipt, serialize_reply, serialize_typing, - parse, generate_message_id, + serialize, serialize_chat, serialize_delete, serialize_dummy, serialize_edit, + serialize_file_ref, serialize_reaction, serialize_read_receipt, serialize_reply, + serialize_typing, parse, generate_message_id, AppMessage, MessageType, VERSION as APP_MESSAGE_VERSION, }; pub use error::CoreError; diff --git a/crates/quicproquo-server/migrations/008_devices.sql b/crates/quicproquo-server/migrations/008_devices.sql new file mode 100644 index 0000000..e5db6ae --- /dev/null +++ b/crates/quicproquo-server/migrations/008_devices.sql @@ -0,0 +1,8 @@ +CREATE TABLE IF NOT EXISTS devices ( + identity_key BLOB NOT NULL, + device_id BLOB NOT NULL, + device_name TEXT NOT NULL DEFAULT '', + registered_at INTEGER NOT NULL DEFAULT (strftime('%s','now')), + PRIMARY KEY (identity_key, device_id) +); +CREATE INDEX IF NOT EXISTS idx_devices_identity ON devices(identity_key); diff --git a/crates/quicproquo-server/src/config.rs b/crates/quicproquo-server/src/config.rs index 2ea0c0f..e64117f 100644 --- a/crates/quicproquo-server/src/config.rs +++ b/crates/quicproquo-server/src/config.rs @@ -33,6 +33,9 @@ pub struct FileConfig { pub federation: Option, /// Directory containing plugin `.so` / `.dylib` files to load at startup. pub plugin_dir: Option, + /// When true, audit logs hash identity key prefixes and omit payload sizes. + #[serde(default)] + pub redact_logs: Option, } #[derive(Debug)] @@ -55,6 +58,8 @@ pub struct EffectiveConfig { pub federation: Option, /// Directory to scan for plugin `.so` / `.dylib` files at startup. None = no plugins. pub plugin_dir: Option, + /// When true, audit logs hash identity key prefixes and omit payload sizes. + pub redact_logs: bool, } #[derive(Debug, Default, Deserialize)] @@ -219,6 +224,7 @@ pub fn merge_config(args: &crate::Args, file: &FileConfig) -> EffectiveConfig { }; let plugin_dir = args.plugin_dir.clone().or_else(|| file.plugin_dir.clone()); + let redact_logs = args.redact_logs || file.redact_logs.unwrap_or(false); EffectiveConfig { listen, @@ -235,6 +241,7 @@ pub fn merge_config(args: &crate::Args, file: &FileConfig) -> EffectiveConfig { metrics_enabled, federation, plugin_dir, + redact_logs, } } diff --git a/crates/quicproquo-server/src/error_codes.rs b/crates/quicproquo-server/src/error_codes.rs index d74c669..86ff3a7 100644 --- a/crates/quicproquo-server/src/error_codes.rs +++ b/crates/quicproquo-server/src/error_codes.rs @@ -31,6 +31,8 @@ pub const E025_BLOB_HASH_LENGTH: &str = "E025"; pub const E026_BLOB_HASH_MISMATCH: &str = "E026"; pub const E027_BLOB_NOT_FOUND: &str = "E027"; pub const E028_ACCOUNT_DELETION_FAILED: &str = "E028"; +pub const E029_DEVICE_LIMIT: &str = "E029"; +pub const E030_DEVICE_NOT_FOUND: &str = "E030"; /// Build a `capnp::Error::failed()` with the structured code prefix. pub fn coded_error(code: &str, msg: impl std::fmt::Display) -> capnp::Error { diff --git a/crates/quicproquo-server/src/main.rs b/crates/quicproquo-server/src/main.rs index a8f4a4e..497cfe3 100644 --- a/crates/quicproquo-server/src/main.rs +++ b/crates/quicproquo-server/src/main.rs @@ -115,6 +115,10 @@ struct Args { /// Each library must export `extern "C" fn qpq_plugin_init(vtable: *mut HookVTable) -> i32`. #[arg(long, env = "QPQ_PLUGIN_DIR")] plugin_dir: Option, + + /// Redact identity key prefixes and payload sizes in audit logs for metadata minimization. + #[arg(long, env = "QPQ_REDACT_LOGS", default_value_t = false)] + redact_logs: bool, } // ── Entry point ─────────────────────────────────────────────────────────────── @@ -599,6 +603,7 @@ async fn main() -> anyhow::Result<()> { let conn_hooks = Arc::clone(&hooks); let conn_kt_log = Arc::clone(&kt_log); let conn_data_dir = PathBuf::from(&effective.data_dir); + let conn_redact_logs = effective.redact_logs; tokio::task::spawn_local(async move { if let Err(e) = handle_node_connection( @@ -617,6 +622,7 @@ async fn main() -> anyhow::Result<()> { conn_hooks, conn_kt_log, conn_data_dir, + conn_redact_logs, ) .await { diff --git a/crates/quicproquo-server/src/node_service/delivery.rs b/crates/quicproquo-server/src/node_service/delivery.rs index 34856f1..fec2bc6 100644 --- a/crates/quicproquo-server/src/node_service/delivery.rs +++ b/crates/quicproquo-server/src/node_service/delivery.rs @@ -21,6 +21,12 @@ use crate::hooks::{HookAction, MessageEvent, FetchEvent}; // Audit events here must not include secrets: no payload content, no full recipient/token bytes (prefix only). +/// Hash first 4 bytes of the key's SHA-256 as a hex string (for redacted audit logs). +fn redacted_prefix(key: &[u8]) -> String { + let hash = Sha256::digest(key); + fmt_hex(&hash[..4]) +} + const MAX_PAYLOAD_BYTES: usize = 5 * 1024 * 1024; // 5 MB cap per message const MAX_QUEUE_DEPTH: usize = 1000; @@ -219,7 +225,7 @@ impl NodeServiceImpl { // Hook: on_message_enqueue — fires after validation, before storage. let hook_event = MessageEvent { - sender_identity, + sender_identity: sender_identity.clone(), recipient_key: recipient_key.clone(), channel_id: channel_id.clone(), payload_len, @@ -254,13 +260,26 @@ impl NodeServiceImpl { if let Ok(depth) = self.store.queue_depth(&recipient_key, &channel_id) { metrics::record_delivery_queue_depth(depth); } - tracing::info!( - sender_prefix = sender_prefix.as_deref().unwrap_or("sealed"), - recipient_prefix = %fmt_hex(&recipient_key[..4]), - payload_len = payload_len, - seq = seq, - "audit: enqueue" - ); + if self.redact_logs { + let redacted_sender = sender_identity + .as_deref() + .map(|id| redacted_prefix(id)) + .unwrap_or_else(|| "sealed".to_string()); + tracing::info!( + sender_prefix = %redacted_sender, + recipient_prefix = %redacted_prefix(&recipient_key), + seq = seq, + "audit: enqueue" + ); + } else { + tracing::info!( + sender_prefix = sender_prefix.as_deref().unwrap_or("sealed"), + recipient_prefix = %fmt_hex(&recipient_key[..4]), + payload_len = payload_len, + seq = seq, + "audit: enqueue" + ); + } crate::auth::waiter(&self.waiters, &recipient_key).notify_waiters(); @@ -380,11 +399,19 @@ impl NodeServiceImpl { // Audit: fetch — do not log payload or full keys. metrics::record_fetch_total(); - tracing::info!( - recipient_prefix = %fmt_hex(&recipient_key[..4]), - count = messages.len(), - "audit: fetch" - ); + if self.redact_logs { + tracing::info!( + recipient_prefix = %redacted_prefix(&recipient_key), + count = messages.len(), + "audit: fetch" + ); + } else { + tracing::info!( + recipient_prefix = %fmt_hex(&recipient_key[..4]), + count = messages.len(), + "audit: fetch" + ); + } let mut list = results.get().init_payloads(messages.len() as u32); for (i, (seq, data)) in messages.iter().enumerate() { @@ -546,11 +573,19 @@ impl NodeServiceImpl { Err(e) => return Promise::err(e), }; - tracing::info!( - recipient_prefix = %fmt_hex(&recipient_key[..4]), - count = messages.len(), - "audit: peek" - ); + if self.redact_logs { + tracing::info!( + recipient_prefix = %redacted_prefix(&recipient_key), + count = messages.len(), + "audit: peek" + ); + } else { + tracing::info!( + recipient_prefix = %fmt_hex(&recipient_key[..4]), + count = messages.len(), + "audit: peek" + ); + } let mut list = results.get().init_payloads(messages.len() as u32); for (i, (seq, data)) in messages.iter().enumerate() { @@ -610,12 +645,21 @@ impl NodeServiceImpl { .map_err(storage_err) { Ok(removed) => { - tracing::info!( - recipient_prefix = %fmt_hex(&recipient_key[..4]), - seq_up_to = seq_up_to, - removed = removed, - "audit: ack" - ); + if self.redact_logs { + tracing::info!( + recipient_prefix = %redacted_prefix(&recipient_key), + seq_up_to = seq_up_to, + removed = removed, + "audit: ack" + ); + } else { + tracing::info!( + recipient_prefix = %fmt_hex(&recipient_key[..4]), + seq_up_to = seq_up_to, + removed = removed, + "audit: ack" + ); + } } Err(e) => return Promise::err(e), } @@ -778,6 +822,7 @@ impl NodeServiceImpl { let fed_client = self.federation_client.clone(); let local_domain = self.local_domain.clone(); let hooks = Arc::clone(&self.hooks); + let redact_logs = self.redact_logs; // Use an async future to support federation relay alongside local enqueue. // All storage operations are synchronous; only federation relay calls are await-ed. @@ -839,12 +884,19 @@ impl NodeServiceImpl { // Hook: on_batch_enqueue — fires after all messages are stored. hooks.on_batch_enqueue(&hook_events); - tracing::info!( - sender_prefix = sender_prefix.as_deref().unwrap_or("sealed"), - recipient_count = n, - payload_len = payload.len(), - "audit: batch_enqueue" - ); + if redact_logs { + tracing::info!( + recipient_count = n, + "audit: batch_enqueue" + ); + } else { + tracing::info!( + sender_prefix = sender_prefix.as_deref().unwrap_or("sealed"), + recipient_count = n, + payload_len = payload.len(), + "audit: batch_enqueue" + ); + } Ok(()) }) diff --git a/crates/quicproquo-server/src/node_service/device_ops.rs b/crates/quicproquo-server/src/node_service/device_ops.rs new file mode 100644 index 0000000..b0e00d9 --- /dev/null +++ b/crates/quicproquo-server/src/node_service/device_ops.rs @@ -0,0 +1,151 @@ +//! Device registry RPC handlers: registerDevice, listDevices, revokeDevice. + +use capnp::capability::Promise; +use quicproquo_proto::node_capnp::node_service; + +use crate::auth::{coded_error, require_identity, validate_auth_context}; +use crate::error_codes::*; +use crate::storage::StorageError; + +use super::NodeServiceImpl; + +const MAX_DEVICES_PER_IDENTITY: usize = 5; + +fn storage_err(err: StorageError) -> capnp::Error { + coded_error(E009_STORAGE_ERROR, err) +} + +impl NodeServiceImpl { + pub fn handle_register_device( + &mut self, + params: node_service::RegisterDeviceParams, + mut results: node_service::RegisterDeviceResults, + ) -> Promise<(), capnp::Error> { + let p = match params.get() { + Ok(p) => p, + Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)), + }; + + let auth_ctx = match validate_auth_context(&self.auth_cfg, &self.sessions, p.get_auth()) { + Ok(ctx) => ctx, + Err(e) => return Promise::err(e), + }; + + let identity_key = match require_identity(&auth_ctx) { + Ok(ik) => ik.to_vec(), + Err(e) => return Promise::err(e), + }; + + let device_id = match p.get_device_id() { + Ok(v) => v.to_vec(), + Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)), + }; + + if device_id.is_empty() { + return Promise::err(coded_error(E020_BAD_PARAMS, "deviceId must not be empty")); + } + + let device_name = match p.get_device_name() { + Ok(n) => n.to_str().unwrap_or("").to_string(), + Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)), + }; + + // Check device limit. + match self.store.device_count(&identity_key) { + Ok(count) if count >= MAX_DEVICES_PER_IDENTITY => { + return Promise::err(coded_error( + E029_DEVICE_LIMIT, + format!("maximum {MAX_DEVICES_PER_IDENTITY} devices per identity"), + )); + } + Err(e) => return Promise::err(storage_err(e)), + _ => {} + } + + match self.store.register_device(&identity_key, &device_id, &device_name) { + Ok(success) => { + results.get().set_success(success); + Promise::ok(()) + } + Err(e) => Promise::err(storage_err(e)), + } + } + + pub fn handle_list_devices( + &mut self, + params: node_service::ListDevicesParams, + mut results: node_service::ListDevicesResults, + ) -> Promise<(), capnp::Error> { + let p = match params.get() { + Ok(p) => p, + Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)), + }; + + let auth_ctx = match validate_auth_context(&self.auth_cfg, &self.sessions, p.get_auth()) { + Ok(ctx) => ctx, + Err(e) => return Promise::err(e), + }; + + let identity_key = match require_identity(&auth_ctx) { + Ok(ik) => ik.to_vec(), + Err(e) => return Promise::err(e), + }; + + let devices = match self.store.list_devices(&identity_key) { + Ok(d) => d, + Err(e) => return Promise::err(storage_err(e)), + }; + + let r = results.get(); + let mut list = r.init_devices(devices.len() as u32); + for (i, (device_id, name, registered_at)) in devices.iter().enumerate() { + let mut entry = list.reborrow().get(i as u32); + entry.set_device_id(device_id); + entry.set_device_name(name); + entry.set_registered_at(*registered_at); + } + + Promise::ok(()) + } + + pub fn handle_revoke_device( + &mut self, + params: node_service::RevokeDeviceParams, + mut results: node_service::RevokeDeviceResults, + ) -> Promise<(), capnp::Error> { + let p = match params.get() { + Ok(p) => p, + Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)), + }; + + let auth_ctx = match validate_auth_context(&self.auth_cfg, &self.sessions, p.get_auth()) { + Ok(ctx) => ctx, + Err(e) => return Promise::err(e), + }; + + let identity_key = match require_identity(&auth_ctx) { + Ok(ik) => ik.to_vec(), + Err(e) => return Promise::err(e), + }; + + let device_id = match p.get_device_id() { + Ok(v) => v.to_vec(), + Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)), + }; + + if device_id.is_empty() { + return Promise::err(coded_error(E020_BAD_PARAMS, "deviceId must not be empty")); + } + + match self.store.revoke_device(&identity_key, &device_id) { + Ok(true) => { + results.get().set_success(true); + Promise::ok(()) + } + Ok(false) => { + Promise::err(coded_error(E030_DEVICE_NOT_FOUND, "device not found")) + } + Err(e) => Promise::err(storage_err(e)), + } + } +} diff --git a/crates/quicproquo-server/src/node_service/mod.rs b/crates/quicproquo-server/src/node_service/mod.rs index 63a168f..2a55f02 100644 --- a/crates/quicproquo-server/src/node_service/mod.rs +++ b/crates/quicproquo-server/src/node_service/mod.rs @@ -25,6 +25,7 @@ mod auth_ops; mod blob_ops; mod channel_ops; mod delivery; +mod device_ops; mod key_ops; mod p2p_ops; mod user_ops; @@ -221,6 +222,30 @@ impl node_service::Server for NodeServiceImpl { ) -> capnp::capability::Promise<(), capnp::Error> { self.handle_delete_account(params, results) } + + fn register_device( + &mut self, + params: node_service::RegisterDeviceParams, + results: node_service::RegisterDeviceResults, + ) -> capnp::capability::Promise<(), capnp::Error> { + self.handle_register_device(params, results) + } + + fn list_devices( + &mut self, + params: node_service::ListDevicesParams, + results: node_service::ListDevicesResults, + ) -> capnp::capability::Promise<(), capnp::Error> { + self.handle_list_devices(params, results) + } + + fn revoke_device( + &mut self, + params: node_service::RevokeDeviceParams, + results: node_service::RevokeDeviceResults, + ) -> capnp::capability::Promise<(), capnp::Error> { + self.handle_revoke_device(params, results) + } } pub const CURRENT_WIRE_VERSION: u16 = 1; @@ -247,6 +272,8 @@ pub struct NodeServiceImpl { pub kt_log: Arc>, /// Server data directory (used for blob storage). pub data_dir: PathBuf, + /// When true, hash identity key prefixes and omit payload sizes in audit logs. + pub redact_logs: bool, } impl NodeServiceImpl { @@ -266,6 +293,7 @@ impl NodeServiceImpl { hooks: Arc, kt_log: Arc>, data_dir: PathBuf, + redact_logs: bool, ) -> Self { Self { store, @@ -282,6 +310,7 @@ impl NodeServiceImpl { signing_key, kt_log, data_dir, + redact_logs, } } } @@ -303,6 +332,7 @@ pub async fn handle_node_connection( hooks: Arc, kt_log: Arc>, data_dir: PathBuf, + redact_logs: bool, ) -> Result<(), anyhow::Error> { let connection = connecting.await?; @@ -338,6 +368,7 @@ pub async fn handle_node_connection( hooks, kt_log, data_dir, + redact_logs, )); RpcSystem::new(Box::new(network), Some(service.client)) diff --git a/crates/quicproquo-server/src/sql_store.rs b/crates/quicproquo-server/src/sql_store.rs index af37629..0295c8e 100644 --- a/crates/quicproquo-server/src/sql_store.rs +++ b/crates/quicproquo-server/src/sql_store.rs @@ -9,7 +9,7 @@ use rusqlite::{params, Connection}; use crate::storage::{StorageError, Store}; /// Schema version after introducing the migration runner (existing DBs had 1). -const SCHEMA_VERSION: i32 = 8; +const SCHEMA_VERSION: i32 = 9; /// Migrations: (migration_number, SQL). Files named NNN_name.sql, applied in order when N > user_version. const MIGRATIONS: &[(i32, &str)] = &[ @@ -20,6 +20,7 @@ const MIGRATIONS: &[(i32, &str)] = &[ (6, include_str!("../migrations/005_signing_key.sql")), (7, include_str!("../migrations/006_kt_log.sql")), (8, include_str!("../migrations/007_add_expiry.sql")), + (9, include_str!("../migrations/008_devices.sql")), ]; /// Runs pending migrations on an open connection: applies any migration whose number is greater @@ -693,6 +694,12 @@ impl Store for SqlStore { params![identity_key], ); + // 8. Delete devices. + let _ = conn.execute( + "DELETE FROM devices WHERE identity_key = ?1", + params![identity_key], + ); + // Do NOT delete KT log entries — append-only for auditability. Ok(()) @@ -710,6 +717,69 @@ impl Store for SqlStore { } } } + + fn register_device(&self, identity_key: &[u8], device_id: &[u8], device_name: &str) -> Result { + let conn = self.lock_conn()?; + // Check if device already exists. + let exists: bool = conn + .query_row( + "SELECT EXISTS(SELECT 1 FROM devices WHERE identity_key = ?1 AND device_id = ?2)", + params![identity_key, device_id], + |row| row.get(0), + ) + .map_err(|e| StorageError::Db(e.to_string()))?; + if exists { + return Ok(false); + } + conn.execute( + "INSERT INTO devices (identity_key, device_id, device_name) VALUES (?1, ?2, ?3)", + params![identity_key, device_id, device_name], + ) + .map_err(|e| StorageError::Db(e.to_string()))?; + Ok(true) + } + + fn list_devices(&self, identity_key: &[u8]) -> Result, String, u64)>, StorageError> { + let conn = self.lock_conn()?; + let mut stmt = conn + .prepare("SELECT device_id, device_name, registered_at FROM devices WHERE identity_key = ?1 ORDER BY registered_at ASC") + .map_err(|e| StorageError::Db(e.to_string()))?; + let rows = stmt + .query_map(params![identity_key], |row| { + Ok(( + row.get::<_, Vec>(0)?, + row.get::<_, String>(1)?, + row.get::<_, i64>(2)? as u64, + )) + }) + .map_err(|e| StorageError::Db(e.to_string()))? + .collect::, _>>() + .map_err(|e| StorageError::Db(e.to_string()))?; + Ok(rows) + } + + fn revoke_device(&self, identity_key: &[u8], device_id: &[u8]) -> Result { + let conn = self.lock_conn()?; + let deleted = conn + .execute( + "DELETE FROM devices WHERE identity_key = ?1 AND device_id = ?2", + params![identity_key, device_id], + ) + .map_err(|e| StorageError::Db(e.to_string()))?; + Ok(deleted > 0) + } + + fn device_count(&self, identity_key: &[u8]) -> Result { + let conn = self.lock_conn()?; + let count: i64 = conn + .query_row( + "SELECT COUNT(*) FROM devices WHERE identity_key = ?1", + params![identity_key], + |row| row.get(0), + ) + .map_err(|e| StorageError::Db(e.to_string()))?; + Ok(count as usize) + } } /// Convenience extension for `rusqlite::OptionalExtension`. diff --git a/crates/quicproquo-server/src/storage.rs b/crates/quicproquo-server/src/storage.rs index f78c85b..76a28b5 100644 --- a/crates/quicproquo-server/src/storage.rs +++ b/crates/quicproquo-server/src/storage.rs @@ -184,6 +184,21 @@ pub trait Store: Send + Sync { /// user identity key mapping, and the user record itself. /// Does NOT delete KT log entries (append-only for auditability). fn delete_account(&self, identity_key: &[u8]) -> Result<(), StorageError>; + + // ── Device registry ───────────────────────────────────────────────────── + + /// Register a device for an identity. Returns false if the device already exists. + /// Caller must check device_count < 5 before calling. + fn register_device(&self, identity_key: &[u8], device_id: &[u8], device_name: &str) -> Result; + + /// List all registered devices for an identity: (device_id, name, registered_at). + fn list_devices(&self, identity_key: &[u8]) -> Result, String, u64)>, StorageError>; + + /// Revoke (remove) a registered device. Returns false if not found. + fn revoke_device(&self, identity_key: &[u8], device_id: &[u8]) -> Result; + + /// Return the number of registered devices for an identity. + fn device_count(&self, identity_key: &[u8]) -> Result; } // ── ChannelKey ─────────────────────────────────────────────────────────────── @@ -247,6 +262,8 @@ pub struct FileBackedStore { users: Mutex>>, identity_keys: Mutex>>, endpoints: Mutex, Vec>>, + /// Device registry: identity_key -> Vec<(device_id, device_name, registered_at)> + devices: Mutex, Vec<(Vec, String, u64)>>>, } impl FileBackedStore { @@ -289,6 +306,7 @@ impl FileBackedStore { users, identity_keys, endpoints: Mutex::new(HashMap::new()), + devices: Mutex::new(HashMap::new()), }) } @@ -837,8 +855,49 @@ impl Store for FileBackedStore { ep.remove(identity_key); } + // Remove devices. + { + let mut dev = lock(&self.devices)?; + dev.remove(identity_key); + } + Ok(()) } + + fn register_device(&self, identity_key: &[u8], device_id: &[u8], device_name: &str) -> Result { + let mut map = lock(&self.devices)?; + let devices = map.entry(identity_key.to_vec()).or_default(); + if devices.iter().any(|(id, _, _)| id == device_id) { + return Ok(false); + } + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + devices.push((device_id.to_vec(), device_name.to_string(), now)); + Ok(true) + } + + fn list_devices(&self, identity_key: &[u8]) -> Result, String, u64)>, StorageError> { + let map = lock(&self.devices)?; + Ok(map.get(identity_key).cloned().unwrap_or_default()) + } + + fn revoke_device(&self, identity_key: &[u8], device_id: &[u8]) -> Result { + let mut map = lock(&self.devices)?; + if let Some(devices) = map.get_mut(identity_key) { + let before = devices.len(); + devices.retain(|(id, _, _)| id != device_id); + Ok(devices.len() < before) + } else { + Ok(false) + } + } + + fn device_count(&self, identity_key: &[u8]) -> Result { + let map = lock(&self.devices)?; + Ok(map.get(identity_key).map(|v| v.len()).unwrap_or(0)) + } } #[cfg(test)] diff --git a/schemas/node.capnp b/schemas/node.capnp index e09e9a3..c0763e9 100644 --- a/schemas/node.capnp +++ b/schemas/node.capnp @@ -123,6 +123,15 @@ interface NodeService { # A tombstone entry is added to the KT log for auditability. # All active sessions for the identity are invalidated. deleteAccount @23 (auth :Auth) -> (success :Bool); + + # Register a device for the authenticated identity. Max 5 devices per identity. + registerDevice @24 (auth :Auth, deviceId :Data, deviceName :Text) -> (success :Bool); + + # List all registered devices for the authenticated identity. + listDevices @25 (auth :Auth) -> (devices :List(Device)); + + # Revoke (remove) a registered device. + revokeDevice @26 (auth :Auth, deviceId :Data) -> (success :Bool); } struct Auth { @@ -131,6 +140,12 @@ struct Auth { deviceId @2 :Data; # optional UUID bytes for auditing/rate limiting } +struct Device { + deviceId @0 :Data; + deviceName @1 :Text; + registeredAt @2 :UInt64; +} + # A delivery envelope pairing a per-inbox sequence number with an opaque payload. # Clients sort by `seq` before processing to guarantee MLS commit ordering. struct Envelope {