From fd21ea625c8d9a88835d0f630e6cf5305922766e Mon Sep 17 00:00:00 2001 From: Christian Nennemann Date: Wed, 4 Mar 2026 00:39:05 +0100 Subject: [PATCH] =?UTF-8?q?feat:=20Sprint=206=20=E2=80=94=20disappearing?= =?UTF-8?q?=20messages,=20group=20info,=20account=20deletion?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Disappearing messages: ttlSecs param on enqueue/batchEnqueue RPCs, expires_at column (migration 007), server GC deletes expired messages, /disappear command with human-friendly duration parsing (30m, 1h, 1d) - Group info: /group-info shows type, members, MLS epoch; /rename renames conversations; /members resolves usernames via resolveIdentity - Account deletion: deleteAccount @23 RPC with transactional purge of all user data (deliveries, keys, channels), session invalidation, KT log preserved for auditability; /delete-account with confirmation - Added epoch() accessor to GroupMember, enqueue_with_ttl client helper All 35 server + 71 core + 14 E2E tests pass. --- crates/quicproquo-client/src/client/repl.rs | 234 +++++++++++++++++- crates/quicproquo-client/src/client/rpc.rs | 39 +++ .../quicproquo-client/src/client/session.rs | 3 + crates/quicproquo-core/src/group.rs | 5 + .../migrations/007_add_expiry.sql | 2 + crates/quicproquo-server/src/error_codes.rs | 1 + .../src/federation/service.rs | 4 +- .../src/node_service/account_ops.rs | 63 +++++ .../src/node_service/delivery.rs | 8 +- .../quicproquo-server/src/node_service/mod.rs | 9 + crates/quicproquo-server/src/sql_store.rs | 142 +++++++++-- crates/quicproquo-server/src/storage.rs | 80 +++++- schemas/node.capnp | 11 +- 13 files changed, 563 insertions(+), 38 deletions(-) create mode 100644 crates/quicproquo-server/migrations/007_add_expiry.sql create mode 100644 crates/quicproquo-server/src/node_service/account_ops.rs diff --git a/crates/quicproquo-client/src/client/repl.rs b/crates/quicproquo-client/src/client/repl.rs index 622fce5..6d04301 100644 --- a/crates/quicproquo-client/src/client/repl.rs +++ b/crates/quicproquo-client/src/client/repl.rs @@ -26,9 +26,9 @@ use super::conversation::{ }; use super::display; use super::rpc::{ - connect_node, create_channel, download_blob_chunk, enqueue, fetch_hybrid_key, - fetch_key_package, fetch_wait, resolve_identity, resolve_user, try_hybrid_decrypt, - upload_blob_chunk, upload_hybrid_key, upload_key_package, + connect_node, create_channel, delete_account, download_blob_chunk, enqueue, + enqueue_with_ttl, fetch_hybrid_key, fetch_key_package, fetch_wait, resolve_identity, + resolve_user, try_hybrid_decrypt, upload_blob_chunk, upload_hybrid_key, upload_key_package, }; use super::session::SessionState; use super::state::{decode_identity_key, load_or_init_state}; @@ -55,6 +55,8 @@ enum SlashCommand { Leave, Join, Members, + GroupInfo, + Rename { name: String }, History { count: usize }, /// Mesh subcommands: /mesh peers, /mesh server MeshPeers, @@ -77,6 +79,10 @@ enum SlashCommand { SendFile { path: String }, /// Download a file attachment by message index. Download { index: usize }, + /// Permanently delete the user's account on the server. + DeleteAccount, + /// Set or query disappearing message TTL for the active conversation. + Disappear { arg: Option }, } fn parse_input(line: &str) -> Input { @@ -135,6 +141,14 @@ fn parse_input(line: &str) -> Input { "/leave" => Input::Slash(SlashCommand::Leave), "/join" => Input::Slash(SlashCommand::Join), "/members" => Input::Slash(SlashCommand::Members), + "/group-info" | "/gi" => Input::Slash(SlashCommand::GroupInfo), + "/rename" => match arg { + Some(name) => Input::Slash(SlashCommand::Rename { name }), + None => { + display::print_error("usage: /rename "); + Input::Empty + } + }, "/history" | "/hist" => { let count = arg.and_then(|s| s.parse().ok()).unwrap_or(20); Input::Slash(SlashCommand::History { count }) @@ -224,6 +238,8 @@ fn parse_input(line: &str) -> Input { Input::Empty } }, + "/delete-account" => Input::Slash(SlashCommand::DeleteAccount), + "/disappear" => Input::Slash(SlashCommand::Disappear { arg }), _ => { display::print_error(&format!("unknown command: {cmd}. Try /help")); Input::Empty @@ -687,7 +703,9 @@ async fn handle_slash( SlashCommand::Remove { target } => cmd_remove(session, client, &target).await, SlashCommand::Leave => cmd_leave(session, client).await, SlashCommand::Join => cmd_join(session, client).await, - SlashCommand::Members => cmd_members(session), + SlashCommand::Members => cmd_members(session, client).await, + SlashCommand::GroupInfo => cmd_group_info(session, client).await, + SlashCommand::Rename { name } => cmd_rename(session, &name), SlashCommand::History { count } => cmd_history(session, count), SlashCommand::MeshPeers => cmd_mesh_peers(), SlashCommand::MeshServer { addr } => { @@ -712,6 +730,8 @@ async fn handle_slash( SlashCommand::Delete { index } => cmd_delete(session, client, index).await, SlashCommand::SendFile { path } => cmd_send_file(session, client, &path).await, SlashCommand::Download { index } => cmd_download(session, client, index).await, + SlashCommand::DeleteAccount => cmd_delete_account(session, client).await, + SlashCommand::Disappear { arg } => cmd_disappear(session, arg.as_deref()), }; if let Err(e) = result { display::print_error(&format!("{e:#}")); @@ -729,6 +749,8 @@ fn print_help() { display::print_status(" /switch <@user|#group> - Switch conversation"); display::print_status(" /list - List all conversations"); display::print_status(" /members - Show members of current conversation"); + display::print_status(" /group-info - Show detailed info about the active conversation"); + display::print_status(" /rename - Rename the current conversation"); display::print_status(" /history [N] - Show last N messages (default: 20)"); display::print_status(" /whoami - Show your identity"); display::print_status(" /mesh peers - Discover nearby qpq nodes via mDNS"); @@ -742,9 +764,84 @@ fn print_help() { display::print_status(" /delete - Delete a sent message"); display::print_status(" /send-file - Upload and send a file (max 50 MB)"); display::print_status(" /download - Download a received file attachment"); + display::print_status(" /delete-account - Permanently delete your account"); + display::print_status(" /disappear - Set disappearing messages (1h, 30m, 1d, 300)"); + display::print_status(" /disappear off - Disable disappearing messages"); + display::print_status(" /disappear - Show current setting"); display::print_status(" /quit - Exit"); } +/// Parse a human-friendly duration string into seconds. +/// Supports: "30s", "5m", "1h", "1d", "300" (plain seconds). +fn parse_duration_secs(s: &str) -> Option { + let s = s.trim().to_lowercase(); + if s.ends_with('d') { + s[..s.len() - 1].parse::().ok().map(|d| d * 86400) + } else if s.ends_with('h') { + s[..s.len() - 1].parse::().ok().map(|h| h * 3600) + } else if s.ends_with('m') { + s[..s.len() - 1].parse::().ok().map(|m| m * 60) + } else if s.ends_with('s') { + s[..s.len() - 1].parse::().ok() + } else { + s.parse::().ok() + } +} + +/// Format a TTL in seconds into a human-friendly string. +fn format_ttl(secs: u32) -> String { + if secs >= 86400 && secs % 86400 == 0 { + format!("{} day(s)", secs / 86400) + } else if secs >= 3600 && secs % 3600 == 0 { + format!("{} hour(s)", secs / 3600) + } else if secs >= 60 && secs % 60 == 0 { + format!("{} minute(s)", secs / 60) + } else { + format!("{} second(s)", secs) + } +} + +fn cmd_disappear( + session: &mut SessionState, + arg: Option<&str>, +) -> anyhow::Result<()> { + let conv_id = session + .active_conversation + .as_ref() + .context("no active conversation")? + .clone(); + + match arg { + None => { + // Show current setting. + match session.disappear_ttl.get(&conv_id) { + Some(ttl) => display::print_status(&format!( + "messages will disappear after {}", + format_ttl(*ttl) + )), + None => display::print_status("disappearing messages are off"), + } + } + Some(s) if s.eq_ignore_ascii_case("off") => { + session.disappear_ttl.remove(&conv_id); + display::print_status("disappearing messages disabled"); + } + Some(s) => { + let secs = parse_duration_secs(s) + .context("invalid duration; use e.g. 30m, 1h, 1d, or 300")?; + if secs == 0 { + anyhow::bail!("TTL must be greater than 0"); + } + session.disappear_ttl.insert(conv_id, secs); + display::print_status(&format!( + "messages will disappear after {}", + format_ttl(secs) + )); + } + } + Ok(()) +} + /// Discover nearby qpq servers via mDNS (requires `--features mesh` build). fn cmd_mesh_peers() -> anyhow::Result<()> { use super::mesh_discovery::MeshDiscovery; @@ -1325,7 +1422,21 @@ async fn cmd_join( Ok(()) } -fn cmd_members(session: &SessionState) -> anyhow::Result<()> { +/// Resolve an identity key to a username, falling back to a hex prefix on failure. +async fn resolve_or_hex( + client: &node_service::Client, + identity_key: &[u8], +) -> String { + match resolve_identity(client, identity_key).await { + Ok(Some(name)) => name, + _ => hex::encode(&identity_key[..8.min(identity_key.len())]), + } +} + +async fn cmd_members( + session: &SessionState, + client: &node_service::Client, +) -> anyhow::Result<()> { let conv_id = session .active_conversation .as_ref() @@ -1338,11 +1449,77 @@ fn cmd_members(session: &SessionState) -> anyhow::Result<()> { let my_key = session.identity_bytes(); let ids = member.member_identities(); - display::print_status(&format!("{} members:", ids.len())); + let mut names = Vec::with_capacity(ids.len()); for id in &ids { - let tag = if id.as_slice() == my_key.as_slice() { " (you)" } else { "" }; - display::print_status(&format!(" {}{tag}", hex::encode(&id[..8]))); + let mut name = resolve_or_hex(client, id).await; + if id.as_slice() == my_key.as_slice() { + name.push_str(" (you)"); + } + names.push(name); } + display::print_status(&format!("Members ({}): {}", names.len(), names.join(", "))); + Ok(()) +} + +async fn cmd_group_info( + session: &SessionState, + client: &node_service::Client, +) -> anyhow::Result<()> { + let conv_id = session + .active_conversation + .as_ref() + .context("no active conversation")?; + + let conv = session + .conv_store + .load_conversation(conv_id)? + .context("conversation not found in store")?; + + let member = session + .members + .get(conv_id) + .context("no group member for active conversation")?; + + let my_key = session.identity_bytes(); + let ids = member.member_identities(); + let conv_type = if ids.len() <= 2 { "DM" } else { "Group" }; + + display::print_status(&format!("Conversation: {}", conv.display_name)); + display::print_status(&format!("Type: {}", conv_type)); + display::print_status(&format!("Members: {}", ids.len())); + + let mut names = Vec::with_capacity(ids.len()); + for id in &ids { + let mut name = resolve_or_hex(client, id).await; + if id.as_slice() == my_key.as_slice() { + name.push_str(" (you)"); + } + names.push(name); + } + display::print_status(&format!(" {}", names.join(", "))); + + if let Some(epoch) = member.epoch() { + display::print_status(&format!("MLS epoch: {}", epoch)); + } + + Ok(()) +} + +fn cmd_rename(session: &mut SessionState, new_name: &str) -> anyhow::Result<()> { + let conv_id = session + .active_conversation + .as_ref() + .context("no active conversation")? + .clone(); + + let mut conv = session + .conv_store + .load_conversation(&conv_id)? + .context("conversation not found in store")?; + + conv.display_name = new_name.to_string(); + session.conv_store.save_conversation(&conv)?; + display::print_status(&format!("Conversation renamed to: {new_name}")); Ok(()) } @@ -2002,6 +2179,43 @@ fn extract_filename_from_body(body: &str) -> Option { } } +async fn cmd_delete_account( + session: &mut SessionState, + client: &node_service::Client, +) -> anyhow::Result<()> { + display::print_error("WARNING: This will permanently delete your account and all data on the server."); + display::print_status("Type 'DELETE' to confirm:"); + + // Read confirmation from stdin. + let mut input = String::new(); + { + use std::io::Write; + std::io::stderr().flush().ok(); + } + std::io::stdin() + .read_line(&mut input) + .context("failed to read confirmation")?; + + if input.trim() != "DELETE" { + display::print_status("Account deletion cancelled."); + return Ok(()); + } + + delete_account(client).await?; + + // Clear local state file. + if session.state_path.exists() { + std::fs::remove_file(&session.state_path) + .with_context(|| format!("remove state file: {}", session.state_path.display()))?; + } + + // Clear cached session token. + clear_cached_session(&session.state_path); + + display::print_status("Account deleted successfully."); + std::process::exit(0); +} + // ── Sending ────────────────────────────────────────────────────────────────── async fn handle_send( @@ -2068,6 +2282,8 @@ async fn do_send( .filter(|id| id.as_slice() != my_key.as_slice()) .collect(); + let ttl = session.disappear_ttl.get(&conv_id).copied(); + for recipient_key in &recipients { let peer_hybrid_pk = fetch_hybrid_key(client, recipient_key).await?; let payload = if let Some(ref pk) = peer_hybrid_pk { @@ -2075,7 +2291,7 @@ async fn do_send( } else { ct.clone() }; - enqueue(client, recipient_key, &payload).await?; + enqueue_with_ttl(client, recipient_key, &payload, ttl).await?; } // Extract message_id from what we just serialized. diff --git a/crates/quicproquo-client/src/client/rpc.rs b/crates/quicproquo-client/src/client/rpc.rs index 8d24b63..6e540da 100644 --- a/crates/quicproquo-client/src/client/rpc.rs +++ b/crates/quicproquo-client/src/client/rpc.rs @@ -226,6 +226,16 @@ pub async fn enqueue( client: &node_service::Client, recipient_key: &[u8], payload: &[u8], +) -> anyhow::Result { + enqueue_with_ttl(client, recipient_key, payload, None).await +} + +/// Enqueue with an optional TTL (seconds). 0 or None means no expiry. +pub async fn enqueue_with_ttl( + client: &node_service::Client, + recipient_key: &[u8], + payload: &[u8], + ttl_secs: Option, ) -> anyhow::Result { let client = client.clone(); let recipient_key = recipient_key.to_vec(); @@ -243,6 +253,9 @@ pub async fn enqueue( p.set_payload(&payload); p.set_channel_id(&[]); p.set_version(1); + if let Some(ttl) = ttl_secs { + p.set_ttl_secs(ttl); + } let mut auth = p.reborrow().init_auth(); set_auth(&mut auth)?; } @@ -831,6 +844,32 @@ pub async fn download_blob_chunk( Ok((chunk, total_size, mime_type)) } +/// Delete the authenticated user's account on the server. +/// Requires an identity-bound session (OPAQUE login). +pub async fn delete_account( + client: &node_service::Client, +) -> anyhow::Result { + let mut req = client.delete_account_request(); + { + let mut p = req.get(); + let mut auth = p.reborrow().init_auth(); + set_auth(&mut auth)?; + } + + let resp = req + .send() + .promise + .await + .context("delete_account RPC failed")?; + + let success = resp + .get() + .context("delete_account: bad response")? + .get_success(); + + Ok(success) +} + /// Return the current Unix timestamp in milliseconds. pub fn current_timestamp_ms() -> u64 { std::time::SystemTime::now() diff --git a/crates/quicproquo-client/src/client/session.rs b/crates/quicproquo-client/src/client/session.rs index 59235b8..b170947 100644 --- a/crates/quicproquo-client/src/client/session.rs +++ b/crates/quicproquo-client/src/client/session.rs @@ -42,6 +42,8 @@ pub struct SessionState { /// Tracks who is currently typing and when the indicator was last received. /// Entries older than 10 seconds are considered expired. pub typing_indicators: HashMap, + /// Per-conversation disappearing message TTL in seconds. None = messages persist. + pub disappear_ttl: HashMap, } impl SessionState { @@ -77,6 +79,7 @@ impl SessionState { pending_member: None, typing_notify_enabled: true, typing_indicators: HashMap::new(), + disappear_ttl: HashMap::new(), }; // Migrate legacy single-group into conversations if present and not yet migrated. diff --git a/crates/quicproquo-core/src/group.rs b/crates/quicproquo-core/src/group.rs index 8247b29..e06cd1b 100644 --- a/crates/quicproquo-core/src/group.rs +++ b/crates/quicproquo-core/src/group.rs @@ -611,6 +611,11 @@ impl GroupMember { self.hybrid } + /// Return the current MLS epoch, or `None` if no group is active. + pub fn epoch(&self) -> Option { + self.group.as_ref().map(|g| g.epoch().as_u64()) + } + /// Return a reference to the MLS group, if active. pub fn group_ref(&self) -> Option<&MlsGroup> { self.group.as_ref() diff --git a/crates/quicproquo-server/migrations/007_add_expiry.sql b/crates/quicproquo-server/migrations/007_add_expiry.sql new file mode 100644 index 0000000..cbe7e09 --- /dev/null +++ b/crates/quicproquo-server/migrations/007_add_expiry.sql @@ -0,0 +1,2 @@ +ALTER TABLE deliveries ADD COLUMN expires_at INTEGER; +CREATE INDEX idx_deliveries_expires ON deliveries(expires_at) WHERE expires_at IS NOT NULL; diff --git a/crates/quicproquo-server/src/error_codes.rs b/crates/quicproquo-server/src/error_codes.rs index 4139b71..d74c669 100644 --- a/crates/quicproquo-server/src/error_codes.rs +++ b/crates/quicproquo-server/src/error_codes.rs @@ -30,6 +30,7 @@ pub const E024_BLOB_TOO_LARGE: &str = "E024"; pub const E025_BLOB_HASH_LENGTH: &str = "E025"; pub const E026_BLOB_HASH_MISMATCH: &str = "E026"; pub const E027_BLOB_NOT_FOUND: &str = "E027"; +pub const E028_ACCOUNT_DELETION_FAILED: &str = "E028"; /// Build a `capnp::Error::failed()` with the structured code prefix. pub fn coded_error(code: &str, msg: impl std::fmt::Display) -> capnp::Error { diff --git a/crates/quicproquo-server/src/federation/service.rs b/crates/quicproquo-server/src/federation/service.rs index a041565..4edd8e4 100644 --- a/crates/quicproquo-server/src/federation/service.rs +++ b/crates/quicproquo-server/src/federation/service.rs @@ -54,7 +54,7 @@ impl federation_service::Server for FederationServiceImpl { return Promise::err(capnp::Error::failed("payload must not be empty".into())); } - let seq = match self.store.enqueue(&recipient_key, &channel_id, payload) { + let seq = match self.store.enqueue(&recipient_key, &channel_id, payload, None) { Ok(s) => s, Err(e) => return Promise::err(capnp::Error::failed(format!("store error: {e}"))), }; @@ -106,7 +106,7 @@ impl federation_service::Server for FederationServiceImpl { format!("recipient_key[{i}] must be 32 bytes"), )); } - let seq = match self.store.enqueue(&rk, &channel_id, payload.clone()) { + let seq = match self.store.enqueue(&rk, &channel_id, payload.clone(), None) { Ok(s) => s, Err(e) => return Promise::err(capnp::Error::failed(format!("store error: {e}"))), }; diff --git a/crates/quicproquo-server/src/node_service/account_ops.rs b/crates/quicproquo-server/src/node_service/account_ops.rs new file mode 100644 index 0000000..61bcbd5 --- /dev/null +++ b/crates/quicproquo-server/src/node_service/account_ops.rs @@ -0,0 +1,63 @@ +use capnp::capability::Promise; +use quicproquo_proto::node_capnp::node_service; + +use crate::auth::{coded_error, require_identity, validate_auth_context}; +use crate::error_codes::*; + +use super::NodeServiceImpl; + +impl NodeServiceImpl { + pub fn handle_delete_account( + &mut self, + params: node_service::DeleteAccountParams, + mut results: node_service::DeleteAccountResults, + ) -> Promise<(), capnp::Error> { + let p = match params.get() { + Ok(p) => p, + Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)), + }; + + // Validate auth and require an identity-bound session. + let auth_ctx = match validate_auth_context(&self.auth_cfg, &self.sessions, p.get_auth()) { + Ok(ctx) => ctx, + Err(e) => return Promise::err(e), + }; + + let identity_key = match require_identity(&auth_ctx) { + Ok(ik) => ik.to_vec(), + Err(e) => return Promise::err(e), + }; + + let identity_prefix = crate::auth::fmt_hex(&identity_key[..8.min(identity_key.len())]); + + // Delete account data from the store. + if let Err(e) = self.store.delete_account(&identity_key) { + tracing::error!(identity = %identity_prefix, error = %e, "account deletion failed"); + return Promise::err(coded_error( + E028_ACCOUNT_DELETION_FAILED, + format!("account deletion failed: {e}"), + )); + } + + // Invalidate all sessions for this identity. + let tokens_to_remove: Vec> = self + .sessions + .iter() + .filter(|entry| entry.value().identity_key == identity_key) + .map(|entry| entry.key().clone()) + .collect(); + + for token in &tokens_to_remove { + self.sessions.remove(token); + } + + tracing::info!( + identity = %identity_prefix, + sessions_invalidated = tokens_to_remove.len(), + "audit: account deleted" + ); + + results.get().set_success(true); + Promise::ok(()) + } +} diff --git a/crates/quicproquo-server/src/node_service/delivery.rs b/crates/quicproquo-server/src/node_service/delivery.rs index 77fd717..34856f1 100644 --- a/crates/quicproquo-server/src/node_service/delivery.rs +++ b/crates/quicproquo-server/src/node_service/delivery.rs @@ -85,6 +85,8 @@ impl NodeServiceImpl { }; let channel_id = p.get_channel_id().unwrap_or_default().to_vec(); let version = p.get_version(); + let ttl_secs_raw = p.get_ttl_secs(); + let ttl_secs = if ttl_secs_raw > 0 { Some(ttl_secs_raw) } else { None }; let auth_ctx = match validate_auth_context(&self.auth_cfg, &self.sessions, p.get_auth()) { Ok(ctx) => ctx, Err(e) => return Promise::err(e), @@ -229,7 +231,7 @@ impl NodeServiceImpl { let seq = match self .store - .enqueue(&recipient_key, &channel_id, payload) + .enqueue(&recipient_key, &channel_id, payload, ttl_secs) .map_err(storage_err) { Ok(seq) => seq, @@ -640,6 +642,8 @@ impl NodeServiceImpl { }; let channel_id = p.get_channel_id().unwrap_or_default().to_vec(); let version = p.get_version(); + let ttl_secs_raw = p.get_ttl_secs(); + let ttl_secs = if ttl_secs_raw > 0 { Some(ttl_secs_raw) } else { None }; let auth_ctx = match validate_auth_context(&self.auth_cfg, &self.sessions, p.get_auth()) { Ok(ctx) => ctx, Err(e) => return Promise::err(e), @@ -816,7 +820,7 @@ impl NodeServiceImpl { _ => {} } store - .enqueue(rk, &channel_id, payload.clone()) + .enqueue(rk, &channel_id, payload.clone(), ttl_secs) .map_err(storage_err)? } }; diff --git a/crates/quicproquo-server/src/node_service/mod.rs b/crates/quicproquo-server/src/node_service/mod.rs index 2de60db..63a168f 100644 --- a/crates/quicproquo-server/src/node_service/mod.rs +++ b/crates/quicproquo-server/src/node_service/mod.rs @@ -20,6 +20,7 @@ use crate::storage::Store; /// Cap'n Proto traversal limit (words). 4 Mi words = 32 MiB; bounds DoS from deeply nested or large messages. const CAPNP_TRAVERSAL_LIMIT_WORDS: usize = 4 * 1024 * 1024; +mod account_ops; mod auth_ops; mod blob_ops; mod channel_ops; @@ -212,6 +213,14 @@ impl node_service::Server for NodeServiceImpl { ) -> capnp::capability::Promise<(), capnp::Error> { self.handle_download_blob(params, results) } + + fn delete_account( + &mut self, + params: node_service::DeleteAccountParams, + results: node_service::DeleteAccountResults, + ) -> capnp::capability::Promise<(), capnp::Error> { + self.handle_delete_account(params, results) + } } pub const CURRENT_WIRE_VERSION: u16 = 1; diff --git a/crates/quicproquo-server/src/sql_store.rs b/crates/quicproquo-server/src/sql_store.rs index e9941f7..af37629 100644 --- a/crates/quicproquo-server/src/sql_store.rs +++ b/crates/quicproquo-server/src/sql_store.rs @@ -9,7 +9,7 @@ use rusqlite::{params, Connection}; use crate::storage::{StorageError, Store}; /// Schema version after introducing the migration runner (existing DBs had 1). -const SCHEMA_VERSION: i32 = 7; +const SCHEMA_VERSION: i32 = 8; /// Migrations: (migration_number, SQL). Files named NNN_name.sql, applied in order when N > user_version. const MIGRATIONS: &[(i32, &str)] = &[ @@ -19,6 +19,7 @@ const MIGRATIONS: &[(i32, &str)] = &[ (5, include_str!("../migrations/004_federation.sql")), (6, include_str!("../migrations/005_signing_key.sql")), (7, include_str!("../migrations/006_kt_log.sql")), + (8, include_str!("../migrations/007_add_expiry.sql")), ]; /// Runs pending migrations on an open connection: applies any migration whose number is greater @@ -133,6 +134,7 @@ impl Store for SqlStore { recipient_key: &[u8], channel_id: &[u8], payload: Vec, + ttl_secs: Option, ) -> Result { let conn = self.lock_conn()?; // Atomically get-and-increment the per-inbox sequence counter. @@ -147,9 +149,16 @@ impl Store for SqlStore { |row| row.get(0), ) .map_err(|e| StorageError::Db(e.to_string()))?; + let expires_at: Option = ttl_secs.map(|ttl| { + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs() as i64; + now + ttl as i64 + }); conn.execute( - "INSERT INTO deliveries (recipient_key, channel_id, seq, payload) VALUES (?1, ?2, ?3, ?4)", - params![recipient_key, channel_id, seq, payload], + "INSERT INTO deliveries (recipient_key, channel_id, seq, payload, expires_at) VALUES (?1, ?2, ?3, ?4, ?5)", + params![recipient_key, channel_id, seq, payload, expires_at], ) .map_err(|e| StorageError::Db(e.to_string()))?; Ok(seq as u64) @@ -166,6 +175,7 @@ impl Store for SqlStore { .prepare( "SELECT id, seq, payload FROM deliveries WHERE recipient_key = ?1 AND channel_id = ?2 + AND (expires_at IS NULL OR expires_at > strftime('%s','now')) ORDER BY seq ASC", ) .map_err(|e| StorageError::Db(e.to_string()))?; @@ -205,6 +215,7 @@ impl Store for SqlStore { .prepare( "SELECT id, seq, payload FROM deliveries WHERE recipient_key = ?1 AND channel_id = ?2 + AND (expires_at IS NULL OR expires_at > strftime('%s','now')) ORDER BY seq ASC LIMIT ?3", ) @@ -237,7 +248,7 @@ impl Store for SqlStore { let conn = self.lock_conn()?; let count: i64 = conn .query_row( - "SELECT COUNT(*) FROM deliveries WHERE recipient_key = ?1 AND channel_id = ?2", + "SELECT COUNT(*) FROM deliveries WHERE recipient_key = ?1 AND channel_id = ?2 AND (expires_at IS NULL OR expires_at > strftime('%s','now'))", params![recipient_key, channel_id], |row| row.get(0), ) @@ -247,18 +258,26 @@ impl Store for SqlStore { fn gc_expired_messages(&self, max_age_secs: u64) -> Result { let conn = self.lock_conn()?; - let cutoff = std::time::SystemTime::now() + let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() - .as_secs() - .saturating_sub(max_age_secs); - let deleted = conn + .as_secs(); + let cutoff = now.saturating_sub(max_age_secs); + // Delete messages older than max_age_secs based on created_at. + let deleted_age = conn .execute( "DELETE FROM deliveries WHERE created_at < ?1", params![cutoff as i64], ) .map_err(|e| StorageError::Db(e.to_string()))?; - Ok(deleted) + // Delete messages that have passed their per-message TTL expiry. + let deleted_ttl = conn + .execute( + "DELETE FROM deliveries WHERE expires_at IS NOT NULL AND expires_at <= ?1", + params![now as i64], + ) + .map_err(|e| StorageError::Db(e.to_string()))?; + Ok(deleted_age + deleted_ttl) } fn upload_hybrid_key( @@ -436,11 +455,13 @@ impl Store for SqlStore { let sql = if limit == 0 { "SELECT seq, payload FROM deliveries WHERE recipient_key = ?1 AND channel_id = ?2 + AND (expires_at IS NULL OR expires_at > strftime('%s','now')) ORDER BY seq ASC".to_string() } else { format!( "SELECT seq, payload FROM deliveries WHERE recipient_key = ?1 AND channel_id = ?2 + AND (expires_at IS NULL OR expires_at > strftime('%s','now')) ORDER BY seq ASC LIMIT {}", limit @@ -604,6 +625,91 @@ impl Store for SqlStore { .map_err(|e| StorageError::Db(e.to_string()))?; Ok(rows) } + + fn delete_account(&self, identity_key: &[u8]) -> Result<(), StorageError> { + let conn = self.lock_conn()?; + + // Resolve the username for this identity key. + let username: Option = conn + .query_row( + "SELECT username FROM user_identity_keys WHERE identity_key = ?1", + params![identity_key], + |row| row.get(0), + ) + .optional() + .map_err(|e| StorageError::Db(e.to_string()))?; + + // Use a transaction for atomicity. + conn.execute_batch("BEGIN IMMEDIATE") + .map_err(|e| StorageError::Db(e.to_string()))?; + + let result = (|| -> Result<(), StorageError> { + // 1. Delete queued deliveries. + conn.execute( + "DELETE FROM deliveries WHERE recipient_key = ?1", + params![identity_key], + ).map_err(|e| StorageError::Db(e.to_string()))?; + + conn.execute( + "DELETE FROM delivery_seq_counters WHERE recipient_key = ?1", + params![identity_key], + ).map_err(|e| StorageError::Db(e.to_string()))?; + + // 2. Delete key packages. + conn.execute( + "DELETE FROM key_packages WHERE identity_key = ?1", + params![identity_key], + ).map_err(|e| StorageError::Db(e.to_string()))?; + + // 3. Delete hybrid keys. + conn.execute( + "DELETE FROM hybrid_keys WHERE identity_key = ?1", + params![identity_key], + ).map_err(|e| StorageError::Db(e.to_string()))?; + + // 4. Delete channel memberships. + conn.execute( + "DELETE FROM channels WHERE member_a = ?1 OR member_b = ?1", + params![identity_key], + ).map_err(|e| StorageError::Db(e.to_string()))?; + + // 5. Delete identity key mapping. + conn.execute( + "DELETE FROM user_identity_keys WHERE identity_key = ?1", + params![identity_key], + ).map_err(|e| StorageError::Db(e.to_string()))?; + + // 6. Delete user record (by username). + if let Some(ref uname) = username { + conn.execute( + "DELETE FROM users WHERE username = ?1", + params![uname], + ).map_err(|e| StorageError::Db(e.to_string()))?; + } + + // 7. Delete endpoints (table may not exist on older schemas). + let _ = conn.execute( + "DELETE FROM endpoints WHERE identity_key = ?1", + params![identity_key], + ); + + // Do NOT delete KT log entries — append-only for auditability. + + Ok(()) + })(); + + match result { + Ok(()) => { + conn.execute_batch("COMMIT") + .map_err(|e| StorageError::Db(e.to_string()))?; + Ok(()) + } + Err(e) => { + let _ = conn.execute_batch("ROLLBACK"); + Err(e) + } + } + } } /// Convenience extension for `rusqlite::OptionalExtension`. @@ -677,8 +783,8 @@ mod tests { let rk = [1u8; 32]; let ch = b"channel-1"; - let seq0 = store.enqueue(&rk, ch, b"msg1".to_vec()).unwrap(); - let seq1 = store.enqueue(&rk, ch, b"msg2".to_vec()).unwrap(); + let seq0 = store.enqueue(&rk, ch, b"msg1".to_vec(), None).unwrap(); + let seq1 = store.enqueue(&rk, ch, b"msg2".to_vec(), None).unwrap(); assert_eq!(seq0, 0); assert_eq!(seq1, 1); @@ -694,9 +800,9 @@ mod tests { let rk = [5u8; 32]; let ch = b"ch"; - store.enqueue(&rk, ch, b"a".to_vec()).unwrap(); - store.enqueue(&rk, ch, b"b".to_vec()).unwrap(); - store.enqueue(&rk, ch, b"c".to_vec()).unwrap(); + store.enqueue(&rk, ch, b"a".to_vec(), None).unwrap(); + store.enqueue(&rk, ch, b"b".to_vec(), None).unwrap(); + store.enqueue(&rk, ch, b"c".to_vec(), None).unwrap(); let msgs = store.fetch_limited(&rk, ch, 2).unwrap(); assert_eq!(msgs, vec![(0u64, b"a".to_vec()), (1u64, b"b".to_vec())]); @@ -712,8 +818,8 @@ mod tests { let ch = b"ch"; assert_eq!(store.queue_depth(&rk, ch).unwrap(), 0); - store.enqueue(&rk, ch, b"x".to_vec()).unwrap(); - store.enqueue(&rk, ch, b"y".to_vec()).unwrap(); + store.enqueue(&rk, ch, b"x".to_vec(), None).unwrap(); + store.enqueue(&rk, ch, b"y".to_vec(), None).unwrap(); assert_eq!(store.queue_depth(&rk, ch).unwrap(), 2); } @@ -756,8 +862,8 @@ mod tests { let store = open_in_memory(); let rk = [4u8; 32]; - store.enqueue(&rk, b"ch-a", b"a1".to_vec()).unwrap(); - store.enqueue(&rk, b"ch-b", b"b1".to_vec()).unwrap(); + store.enqueue(&rk, b"ch-a", b"a1".to_vec(), None).unwrap(); + store.enqueue(&rk, b"ch-b", b"b1".to_vec(), None).unwrap(); let a_msgs = store.fetch(&rk, b"ch-a").unwrap(); assert_eq!(a_msgs, vec![(0u64, b"a1".to_vec())]); diff --git a/crates/quicproquo-server/src/storage.rs b/crates/quicproquo-server/src/storage.rs index 6cfa0f0..f78c85b 100644 --- a/crates/quicproquo-server/src/storage.rs +++ b/crates/quicproquo-server/src/storage.rs @@ -38,11 +38,13 @@ pub trait Store: Send + Sync { /// Enqueue a payload and return the monotonically increasing per-inbox sequence number /// assigned to this message. Clients sort by seq before MLS processing. + /// When `ttl_secs` is `Some(n)`, the message expires n seconds from now. fn enqueue( &self, recipient_key: &[u8], channel_id: &[u8], payload: Vec, + ttl_secs: Option, ) -> Result; /// Fetch and drain all queued messages, returning `(seq, payload)` pairs ordered by seq. @@ -176,6 +178,12 @@ pub trait Store: Send + Sync { /// List all active federation peers. #[allow(dead_code)] // federation not yet wired up fn list_federation_peers(&self) -> Result, StorageError>; + + /// Permanently delete all data associated with an identity key. + /// Removes deliveries, key packages, hybrid keys, channel memberships, + /// user identity key mapping, and the user record itself. + /// Does NOT delete KT log entries (append-only for auditability). + fn delete_account(&self, identity_key: &[u8]) -> Result<(), StorageError>; } // ── ChannelKey ─────────────────────────────────────────────────────────────── @@ -453,6 +461,7 @@ impl Store for FileBackedStore { recipient_key: &[u8], channel_id: &[u8], payload: Vec, + _ttl_secs: Option, ) -> Result { let mut inner = lock(&self.deliveries)?; let key = ChannelKey { @@ -769,6 +778,67 @@ impl Store for FileBackedStore { fn list_federation_peers(&self) -> Result, StorageError> { Ok(vec![]) } + + fn delete_account(&self, identity_key: &[u8]) -> Result<(), StorageError> { + // Resolve username from identity key for user record deletion. + let username = { + let ik_map = lock(&self.identity_keys)?; + ik_map.iter() + .find(|(_, v)| v.as_slice() == identity_key) + .map(|(k, _)| k.clone()) + }; + + // Remove deliveries where this identity is the recipient. + { + let mut deliveries = lock(&self.deliveries)?; + deliveries.map.retain(|k, _| k.recipient_key != identity_key); + deliveries.next_seq.retain(|k, _| k.recipient_key != identity_key); + self.flush_delivery_map(&self.ds_path, &deliveries)?; + } + + // Remove key packages. + { + let mut kp = lock(&self.key_packages)?; + kp.remove(identity_key); + self.flush_kp_map(&self.kp_path, &kp)?; + } + + // Remove hybrid keys. + { + let mut hk = lock(&self.hybrid_keys)?; + hk.remove(identity_key); + self.flush_hybrid_keys(&self.hk_path, &hk)?; + } + + // Remove channels where this identity is a member. + { + let mut ch = lock(&self.channels)?; + ch.retain(|_, (a, b)| a.as_slice() != identity_key && b.as_slice() != identity_key); + self.flush_channels(&self.channels_path, &ch)?; + } + + // Remove identity key mapping and user record. + if let Some(uname) = username { + { + let mut ik_map = lock(&self.identity_keys)?; + ik_map.remove(&uname); + self.flush_map_string_bytes(&self.identity_keys_path, &ik_map)?; + } + { + let mut users = lock(&self.users)?; + users.remove(&uname); + self.flush_users(&self.users_path, &users)?; + } + } + + // Remove endpoint. + { + let mut ep = lock(&self.endpoints)?; + ep.remove(identity_key); + } + + Ok(()) + } } #[cfg(test)] @@ -799,8 +869,8 @@ mod tests { let (_dir, store) = temp_store(); let rk = vec![2u8; 32]; let ch = vec![]; - let seq0 = store.enqueue(&rk, &ch, vec![1]).unwrap(); - let seq1 = store.enqueue(&rk, &ch, vec![2]).unwrap(); + let seq0 = store.enqueue(&rk, &ch, vec![1], None).unwrap(); + let seq1 = store.enqueue(&rk, &ch, vec![2], None).unwrap(); assert_eq!(seq0, 0); assert_eq!(seq1, 1); let msgs = store.fetch(&rk, &ch).unwrap(); @@ -818,7 +888,7 @@ mod tests { let rk = vec![3u8; 32]; let ch = vec![]; for i in 0..5 { - store.enqueue(&rk, &ch, vec![i]).unwrap(); + store.enqueue(&rk, &ch, vec![i], None).unwrap(); } let msgs = store.fetch_limited(&rk, &ch, 2).unwrap(); assert_eq!(msgs.len(), 2); @@ -835,9 +905,9 @@ mod tests { let rk = vec![4u8; 32]; let ch = vec![]; assert_eq!(store.queue_depth(&rk, &ch).unwrap(), 0); - store.enqueue(&rk, &ch, vec![1]).unwrap(); + store.enqueue(&rk, &ch, vec![1], None).unwrap(); assert_eq!(store.queue_depth(&rk, &ch).unwrap(), 1); - store.enqueue(&rk, &ch, vec![2]).unwrap(); + store.enqueue(&rk, &ch, vec![2], None).unwrap(); assert_eq!(store.queue_depth(&rk, &ch).unwrap(), 2); store.fetch(&rk, &ch).unwrap(); assert_eq!(store.queue_depth(&rk, &ch).unwrap(), 0); diff --git a/schemas/node.capnp b/schemas/node.capnp index 45c4b86..e09e9a3 100644 --- a/schemas/node.capnp +++ b/schemas/node.capnp @@ -23,7 +23,7 @@ interface NodeService { # Returns the monotonically increasing per-inbox sequence number assigned to this message, # plus a cryptographic delivery proof (96 bytes: 32-byte SHA-256 preimage || 64-byte Ed25519 # signature). Old clients that do not read deliveryProof are unaffected (Cap'n Proto optional). - enqueue @2 (recipientKey :Data, payload :Data, channelId :Data, version :UInt16, auth :Auth) -> (seq :UInt64, deliveryProof :Data); + enqueue @2 (recipientKey :Data, payload :Data, channelId :Data, version :UInt16, auth :Auth, ttlSecs :UInt32) -> (seq :UInt64, deliveryProof :Data); # Fetch and drain all queued payloads for the recipient. # limit: max number of messages to return (0 = fetch all). @@ -80,7 +80,7 @@ interface NodeService { fetchHybridKeys @16 (identityKeys :List(Data), auth :Auth) -> (keys :List(Data)); # Enqueue the same payload to multiple recipients in a single round-trip. - batchEnqueue @17 (recipientKeys :List(Data), payload :Data, channelId :Data, version :UInt16, auth :Auth) -> (seqs :List(UInt64)); + batchEnqueue @17 (recipientKeys :List(Data), payload :Data, channelId :Data, version :UInt16, auth :Auth, ttlSecs :UInt32) -> (seqs :List(UInt64)); # Create a 1:1 channel between the caller and the given peer. Returns a 16-byte channelId (UUID). # Both members can enqueue/fetch for this channel; recipientKey must be the other member. @@ -116,6 +116,13 @@ interface NodeService { # length : maximum number of bytes to return (capped at 256 KB). # Returns the requested chunk, the total blob size, and its MIME type. downloadBlob @22 (auth :Auth, blobId :Data, offset :UInt64, length :UInt32) -> (chunk :Data, totalSize :UInt64, mimeType :Text); + + # Permanently delete the authenticated user's account and all associated data. + # Requires an identity-bound session (OPAQUE login). Removes user record, + # identity keys, key packages, hybrid keys, queued messages, and channel memberships. + # A tombstone entry is added to the KT log for auditability. + # All active sessions for the identity are invalidated. + deleteAccount @23 (auth :Auth) -> (success :Bool); } struct Auth {