diff --git a/crates/quicprochat-client/src/client/tui/mod.rs b/crates/quicprochat-client/src/client/tui/mod.rs index 036040a..5ac05cd 100644 --- a/crates/quicprochat-client/src/client/tui/mod.rs +++ b/crates/quicprochat-client/src/client/tui/mod.rs @@ -83,6 +83,8 @@ struct App { channel_names: Vec, /// Conversation IDs, parallel to `channel_names`. channel_ids: Vec, + /// Unread message counts, parallel to `channel_names`. + unread_counts: Vec, /// Index of the selected channel in the sidebar. selected_channel: usize, /// Messages for the currently active channel. @@ -102,10 +104,12 @@ impl App { let convs = session.conv_store.list_conversations()?; let channel_names: Vec = convs.iter().map(|c| c.display_name.clone()).collect(); let channel_ids: Vec = convs.iter().map(|c| c.id.clone()).collect(); + let unread_counts: Vec = convs.iter().map(|c| c.unread_count).collect(); Ok(Self { channel_names, channel_ids, + unread_counts, selected_channel: 0, messages: Vec::new(), input: String::new(), @@ -232,14 +236,27 @@ fn draw_sidebar(frame: &mut Frame, app: &App, area: Rect) { .iter() .enumerate() .map(|(i, name)| { - let style = if i == app.selected_channel { + let unread = app.unread_counts.get(i).copied().unwrap_or(0); + let is_selected = i == app.selected_channel; + + let label = if unread > 0 && !is_selected { + format!("{name} ({unread})") + } else { + name.clone() + }; + + let style = if is_selected { Style::default() .fg(Color::Cyan) .add_modifier(Modifier::BOLD | Modifier::REVERSED) + } else if unread > 0 { + Style::default() + .fg(Color::Yellow) + .add_modifier(Modifier::BOLD) } else { Style::default().fg(Color::Cyan) }; - ListItem::new(Line::from(Span::styled(name.clone(), style))) + ListItem::new(Line::from(Span::styled(label, style))) }) .collect(); diff --git a/crates/quicprochat-client/src/client/v2_repl.rs b/crates/quicprochat-client/src/client/v2_repl.rs index feedf71..c45df93 100644 --- a/crates/quicprochat-client/src/client/v2_repl.rs +++ b/crates/quicprochat-client/src/client/v2_repl.rs @@ -100,6 +100,8 @@ const COMMANDS: &[CmdDef] = &[ CmdDef { name: "/help", aliases: &["/?"], category: Category::Utility, description: "Show this help message", usage: "/help" }, CmdDef { name: "/quit", aliases: &["/q", "/exit"], category: Category::Utility, description: "Exit the REPL", usage: "/quit" }, CmdDef { name: "/clear", aliases: &[], category: Category::Utility, description: "Clear the terminal", usage: "/clear" }, + CmdDef { name: "/search", aliases: &[], category: Category::Messaging, description: "Search messages across all conversations", usage: "/search " }, + CmdDef { name: "/delete-conversation", aliases: &["/delconv"], category: Category::Messaging, description: "Delete a conversation and its messages", usage: "/delete-conversation [name]" }, CmdDef { name: "/health", aliases: &[], category: Category::Debug, description: "Check server connection health", usage: "/health" }, CmdDef { name: "/status", aliases: &[], category: Category::Debug, description: "Show connection and auth state", usage: "/status" }, ]; @@ -397,6 +399,8 @@ async fn dispatch( "/switch" | "/sw" => do_switch(client, st, args)?, "/group" | "/g" => do_group(client, st, args).await?, "/devices" => do_devices(client, args).await?, + "/search" => do_search(client, args)?, + "/delete-conversation" | "/delconv" => do_delete_conversation(client, st, args)?, _ => display::print_error(&format!("unknown command: {cmd} (try /help)")), } Ok(false) @@ -983,6 +987,81 @@ async fn do_devices(client: &mut QpqClient, args: &str) -> anyhow::Result<()> { Ok(()) } +// ── Search ────────────────────────────────────────────────────────────────── + +fn do_search(client: &QpqClient, args: &str) -> anyhow::Result<()> { + let query = args.trim(); + if query.is_empty() { + display::print_error("usage: /search "); + return Ok(()); + } + + let results = client.conversations().map_err(|e| anyhow::anyhow!("{e}"))?.search_messages(query, 25)?; + if results.is_empty() { + display::print_status(&format!("no messages matching \"{query}\"")); + return Ok(()); + } + + println!("\n{BOLD}Search results for \"{query}\"{RESET} ({} matches)\n", results.len()); + for r in &results { + let ts = format_timestamp_ms(r.timestamp_ms); + let sender = r.sender_name.as_deref().unwrap_or("?"); + println!( + " {DIM}[{ts}]{RESET} {CYAN}{}{RESET} > {GREEN}{sender}{RESET}: {}", + r.conversation_name, + r.body, + ); + } + println!(); + Ok(()) +} + +fn format_timestamp_ms(ms: u64) -> String { + let secs = ms / 1000; + let hours = (secs % 86400) / 3600; + let minutes = (secs % 3600) / 60; + format!("{hours:02}:{minutes:02}") +} + +// ── Delete conversation ───────────────────────────────────────────────────── + +fn do_delete_conversation( + client: &QpqClient, + st: &mut ReplState, + args: &str, +) -> anyhow::Result<()> { + let name = args.trim(); + + // Find by name, or use current conversation. + let target = if name.is_empty() { + st.current_conversation.clone() + } else { + let convs = client.conversations().map_err(|e| anyhow::anyhow!("{e}"))?.list_conversations()?; + convs + .iter() + .find(|c| c.display_name.eq_ignore_ascii_case(name)) + .map(|c| c.id.clone()) + }; + + let Some(conv_id) = target else { + display::print_error("no matching conversation (specify name or switch first)"); + return Ok(()); + }; + + let deleted = client.conversations().map_err(|e| anyhow::anyhow!("{e}"))?.delete_conversation(&conv_id)?; + if deleted { + // If we deleted the active conversation, clear it. + if st.current_conversation.as_ref() == Some(&conv_id) { + st.current_conversation = None; + st.current_display_name = None; + } + display::print_status("conversation deleted"); + } else { + display::print_error("conversation not found"); + } + Ok(()) +} + // ── Entry point ───────────────────────────────────────────────────────────── /// Run the v2 REPL over a `QpqClient`. diff --git a/crates/quicprochat-client/src/client/v2_tui.rs b/crates/quicprochat-client/src/client/v2_tui.rs index b5b5782..8595e21 100644 --- a/crates/quicprochat-client/src/client/v2_tui.rs +++ b/crates/quicprochat-client/src/client/v2_tui.rs @@ -21,8 +21,7 @@ //! //! Feature gate: requires both `v2` and `tui` features. //! -//! **Note:** Message display is currently local-only. Use the REPL client for -//! end-to-end encrypted delivery. See `quicprochat-sdk::messaging` for the full pipeline. +//! Messages are sent via the SDK's MLS encryption pipeline (sealed sender + hybrid wrap). use std::time::Duration; @@ -41,8 +40,11 @@ use ratatui::{ }; use tokio::sync::broadcast; +use std::sync::Arc; + +use quicprochat_core::IdentityKeypair; use quicprochat_sdk::client::{ConnectionState, QpqClient}; -use quicprochat_sdk::conversation::ConversationStore; +use quicprochat_sdk::conversation::{ConversationId, ConversationStore, StoredMessage}; use quicprochat_sdk::events::ClientEvent; // ── Data Types ────────────────────────────────────────────────────────────── @@ -91,6 +93,8 @@ pub struct TuiApp { conn_state: quicprochat_sdk::client::ConnectionState, /// Current MLS epoch for the active conversation (if available). mls_epoch: Option, + /// Identity keypair for MLS operations (set after login). + identity: Option>, } impl TuiApp { @@ -110,6 +114,7 @@ impl TuiApp { notification: None, conn_state: ConnectionState::Disconnected, mls_epoch: None, + identity: None, } } @@ -573,14 +578,83 @@ async fn handle_input(app: &mut TuiApp, client: &mut QpqClient, text: &str) { // Snap to bottom. app.scroll_offset = 0; - // NOTE: TUI message display is local-only. The full MLS encryption - // pipeline (sealed sender + hybrid wrap + enqueue) is implemented in - // quicprochat-sdk/src/messaging.rs but is not yet wired into the TUI. - // Use the REPL client (`qpc repl`) for end-to-end message delivery. - app.notification = Some("Message queued locally (TUI send not yet wired to SDK)".to_string()); + // Send via MLS encryption pipeline. + let conv_id_bytes = *app.active_conv_id().unwrap(); + let conv_id = ConversationId(conv_id_bytes); + + let send_result = send_tui_message(client, app, &conv_id, text).await; + match send_result { + Ok(()) => { + app.notification = Some("Sent".to_string()); + } + Err(e) => { + app.notification = Some(format!("Send failed: {e}")); + } + } } } +/// Send a message via the SDK's MLS encryption pipeline. +async fn send_tui_message( + client: &QpqClient, + app: &TuiApp, + conv_id: &ConversationId, + text: &str, +) -> anyhow::Result<()> { + let identity = app + .identity + .as_ref() + .ok_or_else(|| anyhow::anyhow!("not logged in — identity not loaded"))?; + let rpc = client.rpc().map_err(|e| anyhow::anyhow!("{e}"))?; + let conv_store = client.conversations().map_err(|e| anyhow::anyhow!("{e}"))?; + + let conv = conv_store + .load_conversation(conv_id)? + .ok_or_else(|| anyhow::anyhow!("conversation not found"))?; + + let mut member = quicprochat_sdk::groups::restore_mls_state(&conv, identity)?; + + let my_pub = identity.public_key_bytes(); + let recipients: Vec> = conv + .member_keys + .iter() + .filter(|k| k.as_slice() != my_pub.as_slice()) + .cloned() + .collect(); + if recipients.is_empty() { + return Err(anyhow::anyhow!("no recipients in conversation")); + } + + let hybrid_keys = vec![None; recipients.len()]; + quicprochat_sdk::messaging::send_message( + rpc, + &mut member, + identity, + text, + &recipients, + &hybrid_keys, + conv_id.0.as_slice(), + ) + .await?; + + quicprochat_sdk::groups::save_mls_state(conv_store, conv_id, &member)?; + + let now = quicprochat_sdk::conversation::now_ms(); + conv_store.save_message(&StoredMessage { + conversation_id: conv_id.clone(), + message_id: None, + sender_key: my_pub.to_vec(), + sender_name: client.username().map(|s| s.to_string()), + body: text.to_string(), + msg_type: "chat".to_string(), + ref_msg_id: None, + timestamp_ms: now, + is_outgoing: true, + })?; + + Ok(()) +} + /// Handle a /command. async fn handle_command(app: &mut TuiApp, client: &mut QpqClient, cmd: &str) { let parts: Vec<&str> = cmd.splitn(3, ' ').collect(); diff --git a/crates/quicprochat-proto/src/lib.rs b/crates/quicprochat-proto/src/lib.rs index 9c36a85..282ae8a 100644 --- a/crates/quicprochat-proto/src/lib.rs +++ b/crates/quicprochat-proto/src/lib.rs @@ -112,9 +112,10 @@ pub mod method_ids { pub const CHECK_REVOCATION: u16 = 511; pub const AUDIT_KEY_TRANSPARENCY: u16 = 520; - // Blob (600-601) + // Blob (600-602) pub const UPLOAD_BLOB: u16 = 600; pub const DOWNLOAD_BLOB: u16 = 601; + pub const DELETE_BLOB: u16 = 602; // Device (700-702, 710) pub const REGISTER_DEVICE: u16 = 700; diff --git a/crates/quicprochat-sdk/src/conversation.rs b/crates/quicprochat-sdk/src/conversation.rs index 53d25c1..9ac0392 100644 --- a/crates/quicprochat-sdk/src/conversation.rs +++ b/crates/quicprochat-sdk/src/conversation.rs @@ -185,6 +185,13 @@ impl ConversationStore { identity_key BLOB PRIMARY KEY, blocked_at_ms INTEGER NOT NULL, reason TEXT NOT NULL DEFAULT '' + ); + + CREATE TABLE IF NOT EXISTS peer_identity_keys ( + username TEXT PRIMARY KEY, + identity_key BLOB NOT NULL, + first_seen_ms INTEGER NOT NULL, + last_seen_ms INTEGER NOT NULL );", ) .context("migrate conversation db") @@ -524,6 +531,112 @@ impl ConversationStore { msgs.reverse(); Ok(msgs) } + + // ── Peer identity key tracking ────────────────────────────────────────── + + /// Look up the stored identity key for a peer by username. + pub fn get_peer_identity_key(&self, username: &str) -> anyhow::Result>> { + let key: Option> = self + .conn + .query_row( + "SELECT identity_key FROM peer_identity_keys WHERE username = ?1", + params![username], + |row| row.get(0), + ) + .optional()?; + Ok(key) + } + + /// Store (or update) a peer's identity key. Returns the previous key if it changed. + pub fn store_peer_identity_key( + &self, + username: &str, + identity_key: &[u8], + ) -> anyhow::Result>> { + let now_ms = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as i64; + + let old = self.get_peer_identity_key(username)?; + + self.conn.execute( + "INSERT INTO peer_identity_keys (username, identity_key, first_seen_ms, last_seen_ms) + VALUES (?1, ?2, ?3, ?3) + ON CONFLICT(username) DO UPDATE SET identity_key = ?2, last_seen_ms = ?3", + params![username, identity_key, now_ms], + )?; + + // Return the old key only if it's different from the new one. + match old { + Some(ref prev) if prev != identity_key => Ok(old), + _ => Ok(None), + } + } + + // ── Full-text search ──────────────────────────────────────────────────── + + /// Search messages across all conversations by body text. + pub fn search_messages( + &self, + query: &str, + limit: usize, + ) -> anyhow::Result> { + let pattern = format!("%{query}%"); + let mut stmt = self.conn.prepare( + "SELECT m.conversation_id, c.display_name, m.sender_name, m.body, + m.timestamp_ms, m.message_id + FROM messages m + JOIN conversations c ON c.id = m.conversation_id + WHERE m.body LIKE ?1 + ORDER BY m.timestamp_ms DESC + LIMIT ?2", + )?; + let rows = stmt.query_map( + params![pattern, limit.min(u32::MAX as usize) as u32], + |row| { + let conv_id_raw: Vec = row.get(0)?; + let mut conv_id = [0u8; 16]; + if conv_id_raw.len() == 16 { + conv_id.copy_from_slice(&conv_id_raw); + } + Ok(SearchResult { + conversation_id: ConversationId(conv_id), + conversation_name: row.get(1)?, + sender_name: row.get(2)?, + body: row.get(3)?, + timestamp_ms: row.get::<_, i64>(4)? as u64, + message_id: row.get(5)?, + }) + }, + )?; + rows.collect::, _>>().map_err(Into::into) + } + + // ── Conversation deletion ─────────────────────────────────────────────── + + /// Delete a conversation and all its messages. + pub fn delete_conversation(&self, id: &ConversationId) -> anyhow::Result { + self.conn + .execute("DELETE FROM messages WHERE conversation_id = ?1", params![id.0.as_slice()])?; + self.conn + .execute("DELETE FROM outbox WHERE conversation_id = ?1", params![id.0.as_slice()])?; + let rows = self + .conn + .execute("DELETE FROM conversations WHERE id = ?1", params![id.0.as_slice()])?; + Ok(rows > 0) + } +} + +/// A search result across conversations. +#[derive(Clone, Debug)] +pub struct SearchResult { + pub conversation_id: ConversationId, + pub conversation_name: String, + pub sender_name: Option, + pub body: String, + pub timestamp_ms: u64, + pub message_id: Option>, } // ── Helpers ────────────────────────────────────────────────────────────────── diff --git a/crates/quicprochat-sdk/src/events.rs b/crates/quicprochat-sdk/src/events.rs index 07ce80c..c9d8043 100644 --- a/crates/quicprochat-sdk/src/events.rs +++ b/crates/quicprochat-sdk/src/events.rs @@ -82,6 +82,28 @@ pub enum ClientEvent { received_seq: u64, }, + /// A peer's identity key changed — possible re-registration, new device, + /// or MITM attack. The UI MUST alert the user (like Signal's "safety number changed"). + IdentityKeyChanged { + username: String, + old_fingerprint: String, + new_fingerprint: String, + }, + + /// A read receipt was received — the reader has read messages up to the given ID. + ReadReceipt { + conversation_id: [u8; 16], + reader: String, + up_to_message_id: Vec, + timestamp_ms: u64, + }, + + /// Server confirmed delivery of a message. + DeliveryConfirmation { + conversation_id: [u8; 16], + message_id: Vec, + }, + /// An error occurred in the background. Error { message: String }, } @@ -219,11 +241,26 @@ mod tests { expected_seq: 0, received_seq: 1, }, + ClientEvent::IdentityKeyChanged { + username: "u".into(), + old_fingerprint: "old".into(), + new_fingerprint: "new".into(), + }, + ClientEvent::ReadReceipt { + conversation_id: [0; 16], + reader: "r".into(), + up_to_message_id: vec![], + timestamp_ms: 0, + }, + ClientEvent::DeliveryConfirmation { + conversation_id: [0; 16], + message_id: vec![], + }, ClientEvent::Error { message: "e".into() }, ]; for event in &events { let _ = event.clone(); } - assert_eq!(events.len(), 17); + assert_eq!(events.len(), 20); } } diff --git a/crates/quicprochat-server/src/audit.rs b/crates/quicprochat-server/src/audit.rs index f0b4ba8..328a4f1 100644 --- a/crates/quicprochat-server/src/audit.rs +++ b/crates/quicprochat-server/src/audit.rs @@ -142,15 +142,33 @@ pub fn format_actor(identity_key: &[u8], redact: bool) -> String { } } -/// Current ISO-8601 UTC timestamp. +/// Current ISO-8601 UTC timestamp (e.g. `2026-04-04T12:30:45Z`). pub fn now_iso8601() -> String { - // Use SystemTime to avoid pulling in chrono. let d = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default(); let secs = d.as_secs(); - // Simple UTC formatting: enough for audit logs. - format!("{secs}") + + // Manual UTC calendar conversion — avoids pulling in chrono. + let days = secs / 86400; + let time_of_day = secs % 86400; + let hours = time_of_day / 3600; + let minutes = (time_of_day % 3600) / 60; + let seconds = time_of_day % 60; + + // Civil date from day count (epoch = 1970-01-01, algorithm from Howard Hinnant). + let z = days as i64 + 719468; + let era = if z >= 0 { z } else { z - 146096 } / 146097; + let doe = (z - era * 146097) as u64; // day of era [0, 146096] + let yoe = (doe - doe / 1460 + doe / 36524 - doe / 146096) / 365; + let y = yoe as i64 + era * 400; + let doy = doe - (365 * yoe + yoe / 4 - yoe / 100); + let mp = (5 * doy + 2) / 153; + let d = doy - (153 * mp + 2) / 5 + 1; + let m = if mp < 10 { mp + 3 } else { mp - 9 }; + let y = if m <= 2 { y + 1 } else { y }; + + format!("{y:04}-{m:02}-{d:02}T{hours:02}:{minutes:02}:{seconds:02}Z") } #[cfg(test)] diff --git a/crates/quicprochat-server/src/domain/blobs.rs b/crates/quicprochat-server/src/domain/blobs.rs index 0f1c824..917567d 100644 --- a/crates/quicprochat-server/src/domain/blobs.rs +++ b/crates/quicprochat-server/src/domain/blobs.rs @@ -194,4 +194,27 @@ impl BlobService { mime_type: meta.mime_type, }) } + + /// Delete a blob and its metadata from disk. + pub fn delete_blob(&self, blob_id: &[u8]) -> Result { + if blob_id.len() != 32 { + return Err(DomainError::BlobHashLength(blob_id.len())); + } + + let blob_hex = hex::encode(blob_id); + let dir = self.blobs_dir(); + let blob_path = dir.join(&blob_hex); + let meta_path = dir.join(format!("{blob_hex}.meta")); + let part_path = dir.join(format!("{blob_hex}.part")); + + if !blob_path.exists() && !part_path.exists() { + return Ok(false); + } + + let _ = std::fs::remove_file(&blob_path); + let _ = std::fs::remove_file(&meta_path); + let _ = std::fs::remove_file(&part_path); + + Ok(true) + } } diff --git a/crates/quicprochat-server/src/main.rs b/crates/quicprochat-server/src/main.rs index a9950a4..28bcb18 100644 --- a/crates/quicprochat-server/src/main.rs +++ b/crates/quicprochat-server/src/main.rs @@ -34,6 +34,38 @@ mod ws_bridge; #[cfg(feature = "webtransport")] mod webtransport; +/// Parse `QPC_ADMIN_KEYS` env var — comma-separated hex-encoded Ed25519 public keys. +/// Returns empty vec if unset (backward-compatible: all users can moderate). +#[cfg(feature = "webtransport")] +fn parse_admin_keys() -> Vec> { + let Ok(val) = std::env::var("QPC_ADMIN_KEYS") else { + return Vec::new(); + }; + val.split(',') + .filter_map(|s| { + let s = s.trim(); + if s.is_empty() { + return None; + } + match hex::decode(s) { + Ok(key) if key.len() == 32 => Some(key), + Ok(key) => { + tracing::warn!( + len = key.len(), + hex = s, + "QPC_ADMIN_KEYS: ignoring key with wrong length (expected 32 bytes)" + ); + None + } + Err(e) => { + tracing::warn!(hex = s, error = %e, "QPC_ADMIN_KEYS: ignoring invalid hex"); + None + } + } + }) + .collect() +} + use auth::{AuthConfig, PendingLogin, RateEntry, SessionInfo}; use config::{ load_config, merge_config, validate_production_config, DEFAULT_DATA_DIR, DEFAULT_DB_PATH, @@ -433,6 +465,7 @@ async fn main() -> anyhow::Result<()> { storage_backend: effective.store_backend.clone(), federation_client: None, local_domain: effective.federation.as_ref().map(|f| f.domain.clone()).unwrap_or_default(), + admin_keys: parse_admin_keys(), }); let wt_registry = Arc::new(v2_handlers::build_registry( diff --git a/crates/quicprochat-server/src/v2_handlers/blob.rs b/crates/quicprochat-server/src/v2_handlers/blob.rs index addd4f0..ba044e6 100644 --- a/crates/quicprochat-server/src/v2_handlers/blob.rs +++ b/crates/quicprochat-server/src/v2_handlers/blob.rs @@ -99,3 +99,32 @@ pub async fn handle_download_blob(state: Arc, ctx: RequestContext) Err(e) => domain_err(e), } } + +pub async fn handle_delete_blob(state: Arc, ctx: RequestContext) -> HandlerResult { + let _identity_key = match require_auth(&state, &ctx) { + Ok(ik) => ik, + Err(e) => return e, + }; + + let req = match v1::DeleteBlobRequest::decode(ctx.payload) { + Ok(r) => r, + Err(e) => { + return HandlerResult::err( + quicprochat_rpc::error::RpcStatus::BadRequest, + &format!("decode: {e}"), + ) + } + }; + + let svc = BlobService { + data_dir: state.data_dir.clone(), + }; + + match svc.delete_blob(&req.blob_id) { + Ok(deleted) => { + let proto = v1::DeleteBlobResponse { deleted }; + HandlerResult::ok(Bytes::from(proto.encode_to_vec())) + } + Err(e) => domain_err(e), + } +} diff --git a/crates/quicprochat-server/src/v2_handlers/group.rs b/crates/quicprochat-server/src/v2_handlers/group.rs index 7c64052..0705473 100644 --- a/crates/quicprochat-server/src/v2_handlers/group.rs +++ b/crates/quicprochat-server/src/v2_handlers/group.rs @@ -42,9 +42,18 @@ pub async fn handle_remove_member( store: Arc::clone(&state.store), }; + // Only group creator (admin) can remove members. + if let Ok(Some(meta)) = svc.get_metadata(&req.group_id) { + if !meta.creator_key.is_empty() && meta.creator_key != identity_key { + return HandlerResult::err( + RpcStatus::Forbidden, + "only the group creator can remove members", + ); + } + } + match svc.remove_member(&req.group_id, &req.member_identity_key) { Ok(_) => { - let _ = identity_key; // caller is authorized; removal tracked let proto = v1::RemoveMemberResponse { commit: Vec::new(), // commit is generated client-side }; @@ -73,6 +82,16 @@ pub async fn handle_update_group_metadata( store: Arc::clone(&state.store), }; + // Only group creator (admin) can update metadata. + if let Ok(Some(meta)) = svc.get_metadata(&req.group_id) { + if !meta.creator_key.is_empty() && meta.creator_key != identity_key { + return HandlerResult::err( + RpcStatus::Forbidden, + "only the group creator can update metadata", + ); + } + } + let domain_req = UpdateGroupMetadataReq { group_id: req.group_id, name: req.name, diff --git a/crates/quicprochat-server/src/v2_handlers/mod.rs b/crates/quicprochat-server/src/v2_handlers/mod.rs index 20ff0cd..53524bb 100644 --- a/crates/quicprochat-server/src/v2_handlers/mod.rs +++ b/crates/quicprochat-server/src/v2_handlers/mod.rs @@ -68,6 +68,8 @@ pub struct ServerState { pub federation_client: Option>, /// This server's domain for federation addressing. Empty when federation is disabled. pub local_domain: String, + /// Admin identity keys (from `QPC_ADMIN_USERS` env or config). Empty = allow all (MVP). + pub admin_keys: Vec>, } /// A ban record for a user. @@ -316,6 +318,11 @@ pub fn build_registry(default_rpc_timeout: std::time::Duration) -> MethodRegistr std::time::Duration::from_secs(120), blob::handle_download_blob, ); + reg.register( + method_ids::DELETE_BLOB, + "DeleteBlob", + blob::handle_delete_blob, + ); // Device (700-702) reg.register( diff --git a/crates/quicprochat-server/src/v2_handlers/moderation.rs b/crates/quicprochat-server/src/v2_handlers/moderation.rs index ad193b6..bda4cca 100644 --- a/crates/quicprochat-server/src/v2_handlers/moderation.rs +++ b/crates/quicprochat-server/src/v2_handlers/moderation.rs @@ -1,4 +1,8 @@ //! Moderation handlers — report, ban, unban, list reports, list banned. +//! +//! All mutations are persisted via `ModerationService` (SQL store). +//! The in-memory `banned_users` DashMap is kept as a hot cache for the +//! auth middleware's fast-path ban check. use std::sync::Arc; @@ -9,7 +13,34 @@ use quicprochat_rpc::error::RpcStatus; use quicprochat_rpc::method::{HandlerResult, RequestContext}; use tracing::{info, warn}; -use super::{require_auth, BanRecord, ModerationReport, ServerState}; +use crate::domain::moderation::ModerationService; +use crate::domain::types::*; + +use super::{require_auth, BanRecord, ServerState}; + +/// Build a `ModerationService` from shared state. +fn mod_service(state: &ServerState) -> ModerationService { + ModerationService { + store: Arc::clone(&state.store), + } +} + +/// Check whether the caller is an admin. Admins are identified by identity +/// key listed in `state.admin_keys`. Returns `Err(HandlerResult)` with +/// `Forbidden` status for non-admins. +fn require_admin(state: &ServerState, identity_key: &[u8]) -> Result<(), HandlerResult> { + if state.admin_keys.is_empty() { + // No admin list configured — allow all (backward-compatible MVP behavior). + return Ok(()); + } + if state.admin_keys.iter().any(|k| k.as_slice() == identity_key) { + return Ok(()); + } + Err(HandlerResult::err( + RpcStatus::Forbidden, + "admin role required", + )) +} /// Submit an encrypted report. Any authenticated user can report. pub async fn handle_report_message(state: Arc, ctx: RequestContext) -> HandlerResult { @@ -23,81 +54,91 @@ pub async fn handle_report_message(state: Arc, ctx: RequestContext) Err(e) => return HandlerResult::err(RpcStatus::BadRequest, &format!("decode: {e}")), }; - if req.encrypted_report.is_empty() { - return HandlerResult::err(RpcStatus::BadRequest, "encrypted_report required"); + let svc = mod_service(&state); + match svc.report_message(ReportMessageReq { + encrypted_report: req.encrypted_report, + conversation_id: req.conversation_id, + reporter_identity: identity_key.clone(), + }) { + Ok(resp) => { + info!( + reporter = hex::encode(&identity_key[..4.min(identity_key.len())]), + "moderation report submitted (persisted)" + ); + let proto = v1::ReportMessageResponse { + accepted: resp.accepted, + }; + HandlerResult::ok(Bytes::from(proto.encode_to_vec())) + } + Err(DomainError::BadParams(msg)) => HandlerResult::err(RpcStatus::BadRequest, &msg), + Err(e) => { + warn!(error = %e, "report_message failed"); + HandlerResult::err(RpcStatus::Internal, "internal error") + } } - - let now = crate::auth::current_timestamp(); - let report = { - let mut reports = match state.moderation_reports.lock() { - Ok(r) => r, - Err(e) => { - warn!("moderation_reports lock poisoned: {e}"); - return HandlerResult::err(RpcStatus::Internal, "internal error"); - } - }; - let id = reports.len() as u64; - let report = ModerationReport { - id, - encrypted_report: req.encrypted_report, - conversation_id: req.conversation_id, - reporter_identity: identity_key.clone(), - timestamp: now, - }; - reports.push(report.clone()); - report - }; - - info!( - report_id = report.id, - reporter = hex::encode(&identity_key[..4.min(identity_key.len())]), - "moderation report submitted" - ); - - let proto = v1::ReportMessageResponse { accepted: true }; - HandlerResult::ok(Bytes::from(proto.encode_to_vec())) } -/// Ban a user. Requires admin role (currently: any authenticated user for MVP). +/// Ban a user. Requires admin role. pub async fn handle_ban_user(state: Arc, ctx: RequestContext) -> HandlerResult { let admin_key = match require_auth(&state, &ctx) { Ok(ik) => ik, Err(e) => return e, }; + if let Err(e) = require_admin(&state, &admin_key) { + return e; + } + let req = match v1::BanUserRequest::decode(ctx.payload) { Ok(r) => r, Err(e) => return HandlerResult::err(RpcStatus::BadRequest, &format!("decode: {e}")), }; - if req.identity_key.is_empty() || req.identity_key.len() != 32 { - return HandlerResult::err(RpcStatus::BadRequest, "identity_key must be 32 bytes"); - } - - let now = crate::auth::current_timestamp(); - let expires_at = if req.duration_secs == 0 { - 0 // permanent - } else { - now + req.duration_secs - }; - - let record = BanRecord { + let svc = mod_service(&state); + match svc.ban_user(BanUserReq { + identity_key: req.identity_key.clone(), reason: req.reason.clone(), - banned_at: now, - expires_at, - }; - state.banned_users.insert(req.identity_key.clone(), record); + duration_secs: req.duration_secs, + }) { + Ok(resp) => { + // Update hot cache so auth middleware picks it up immediately. + let now = crate::auth::current_timestamp(); + let expires_at = if req.duration_secs == 0 { + 0 + } else { + now + req.duration_secs + }; + state.banned_users.insert( + req.identity_key.clone(), + BanRecord { + reason: req.reason.clone(), + banned_at: now, + expires_at, + }, + ); - info!( - target_key = hex::encode(&req.identity_key[..4]), - admin_key = hex::encode(&admin_key[..4.min(admin_key.len())]), - reason = %req.reason, - duration_secs = req.duration_secs, - "user banned" - ); + info!( + target_key = hex::encode(&req.identity_key[..4.min(req.identity_key.len())]), + admin_key = hex::encode(&admin_key[..4.min(admin_key.len())]), + reason = %req.reason, + duration_secs = req.duration_secs, + "user banned (persisted)" + ); - let proto = v1::BanUserResponse { success: true }; - HandlerResult::ok(Bytes::from(proto.encode_to_vec())) + let proto = v1::BanUserResponse { + success: resp.success, + }; + HandlerResult::ok(Bytes::from(proto.encode_to_vec())) + } + Err(DomainError::InvalidIdentityKey(len)) => HandlerResult::err( + RpcStatus::BadRequest, + &format!("identity_key must be 32 bytes, got {len}"), + ), + Err(e) => { + warn!(error = %e, "ban_user failed"); + HandlerResult::err(RpcStatus::Internal, "internal error") + } + } } /// Unban a user. Requires admin role. @@ -107,6 +148,10 @@ pub async fn handle_unban_user(state: Arc, ctx: RequestContext) -> Err(e) => return e, }; + if let Err(e) = require_admin(&state, &admin_key) { + return e; + } + let req = match v1::UnbanUserRequest::decode(ctx.payload) { Ok(r) => r, Err(e) => return HandlerResult::err(RpcStatus::BadRequest, &format!("decode: {e}")), @@ -116,84 +161,115 @@ pub async fn handle_unban_user(state: Arc, ctx: RequestContext) -> return HandlerResult::err(RpcStatus::BadRequest, "identity_key required"); } - let removed = state.banned_users.remove(&req.identity_key).is_some(); + let svc = mod_service(&state); + match svc.unban_user(UnbanUserReq { + identity_key: req.identity_key.clone(), + }) { + Ok(resp) => { + // Remove from hot cache. + state.banned_users.remove(&req.identity_key); - info!( - target_key = hex::encode(&req.identity_key[..4.min(req.identity_key.len())]), - admin_key = hex::encode(&admin_key[..4.min(admin_key.len())]), - removed, - "user unbanned" - ); + info!( + target_key = hex::encode(&req.identity_key[..4.min(req.identity_key.len())]), + admin_key = hex::encode(&admin_key[..4.min(admin_key.len())]), + removed = resp.success, + "user unbanned (persisted)" + ); - let proto = v1::UnbanUserResponse { success: removed }; - HandlerResult::ok(Bytes::from(proto.encode_to_vec())) + let proto = v1::UnbanUserResponse { + success: resp.success, + }; + HandlerResult::ok(Bytes::from(proto.encode_to_vec())) + } + Err(e) => { + warn!(error = %e, "unban_user failed"); + HandlerResult::err(RpcStatus::Internal, "internal error") + } + } } /// List moderation reports. Requires admin role. pub async fn handle_list_reports(state: Arc, ctx: RequestContext) -> HandlerResult { - let _admin_key = match require_auth(&state, &ctx) { + let admin_key = match require_auth(&state, &ctx) { Ok(ik) => ik, Err(e) => return e, }; + if let Err(e) = require_admin(&state, &admin_key) { + return e; + } + let req = match v1::ListReportsRequest::decode(ctx.payload) { Ok(r) => r, Err(e) => return HandlerResult::err(RpcStatus::BadRequest, &format!("decode: {e}")), }; - let reports = match state.moderation_reports.lock() { - Ok(r) => r, - Err(e) => { - warn!("moderation_reports lock poisoned: {e}"); - return HandlerResult::err(RpcStatus::Internal, "internal error"); + let limit = if req.limit == 0 { 50 } else { req.limit }; + + let svc = mod_service(&state); + match svc.list_reports(ListReportsReq { + limit, + offset: req.offset, + }) { + Ok(resp) => { + let entries: Vec = resp + .reports + .into_iter() + .map(|r| v1::ReportEntry { + id: r.id, + encrypted_report: r.encrypted_report, + conversation_id: r.conversation_id, + reporter_identity: r.reporter_identity, + timestamp: r.timestamp, + }) + .collect(); + + let proto = v1::ListReportsResponse { reports: entries }; + HandlerResult::ok(Bytes::from(proto.encode_to_vec())) } - }; - - let offset = req.offset as usize; - let limit = if req.limit == 0 { 50 } else { req.limit as usize }; - - let entries: Vec = reports - .iter() - .skip(offset) - .take(limit) - .map(|r| v1::ReportEntry { - id: r.id, - encrypted_report: r.encrypted_report.clone(), - conversation_id: r.conversation_id.clone(), - reporter_identity: r.reporter_identity.clone(), - timestamp: r.timestamp, - }) - .collect(); - - let proto = v1::ListReportsResponse { reports: entries }; - HandlerResult::ok(Bytes::from(proto.encode_to_vec())) + Err(e) => { + warn!(error = %e, "list_reports failed"); + HandlerResult::err(RpcStatus::Internal, "internal error") + } + } } -/// List banned users. +/// List banned users. Requires admin role. pub async fn handle_list_banned(state: Arc, ctx: RequestContext) -> HandlerResult { - let _admin_key = match require_auth(&state, &ctx) { + let admin_key = match require_auth(&state, &ctx) { Ok(ik) => ik, Err(e) => return e, }; + if let Err(e) = require_admin(&state, &admin_key) { + return e; + } + let _req = match v1::ListBannedRequest::decode(ctx.payload) { Ok(r) => r, Err(e) => return HandlerResult::err(RpcStatus::BadRequest, &format!("decode: {e}")), }; - let now = crate::auth::current_timestamp(); - let entries: Vec = state - .banned_users - .iter() - .filter(|entry| entry.expires_at == 0 || entry.expires_at > now) - .map(|entry| v1::BannedUserEntry { - identity_key: entry.key().clone(), - reason: entry.reason.clone(), - banned_at: entry.banned_at, - expires_at: entry.expires_at, - }) - .collect(); + let svc = mod_service(&state); + match svc.list_banned() { + Ok(resp) => { + let entries: Vec = resp + .users + .into_iter() + .map(|u| v1::BannedUserEntry { + identity_key: u.identity_key, + reason: u.reason, + banned_at: u.banned_at, + expires_at: u.expires_at, + }) + .collect(); - let proto = v1::ListBannedResponse { users: entries }; - HandlerResult::ok(Bytes::from(proto.encode_to_vec())) + let proto = v1::ListBannedResponse { users: entries }; + HandlerResult::ok(Bytes::from(proto.encode_to_vec())) + } + Err(e) => { + warn!(error = %e, "list_banned failed"); + HandlerResult::err(RpcStatus::Internal, "internal error") + } + } } diff --git a/proto/qpc/v1/blob.proto b/proto/qpc/v1/blob.proto index 569464c..29fb0d4 100644 --- a/proto/qpc/v1/blob.proto +++ b/proto/qpc/v1/blob.proto @@ -27,3 +27,12 @@ message DownloadBlobResponse { uint64 total_size = 2; string mime_type = 3; } + +// Method ID: 602 +message DeleteBlobRequest { + bytes blob_id = 1; +} + +message DeleteBlobResponse { + bool deleted = 1; +}