From 5b6d8209f0b9bef5b48f7e8f011e68aa13d6d2b7 Mon Sep 17 00:00:00 2001 From: Christian Nennemann Date: Wed, 4 Mar 2026 20:11:20 +0100 Subject: [PATCH] feat: add abuse prevention and moderation (Phase 5.6) Add server-side moderation service with report submission, user banning/unbanning, and admin listing endpoints. Add client-side user blocking with message filtering in ConversationStore. Server: - ModerationService domain logic (report, ban, unban, list) - Storage trait methods + FileBackedStore + SqlStore implementations - SQL migration 012_moderation.sql (reports + bans tables) - Error codes E031-E033 for moderation - Domain types for all moderation request/response pairs - 10 new tests (6 domain + 4 storage) SDK: - blocked_users table in ConversationStore - block_user, unblock_user, is_blocked, list_blocked methods - load_recent_messages_filtered excludes blocked senders - QpqClient moderation convenience methods - 4 new tests for block/unblock/filter --- crates/quicproquo-sdk/src/client.rs | 40 ++ crates/quicproquo-sdk/src/conversation.rs | 244 +++++++++++- .../migrations/012_moderation.sql | 20 + crates/quicproquo-server/src/domain/mod.rs | 3 + .../src/domain/moderation.rs | 304 +++++++++++++++ crates/quicproquo-server/src/domain/types.rs | 93 +++++ crates/quicproquo-server/src/error_codes.rs | 6 + crates/quicproquo-server/src/sql_store.rs | 188 ++++++++- crates/quicproquo-server/src/storage.rs | 361 ++++++++++++++++++ 9 files changed, 1255 insertions(+), 4 deletions(-) create mode 100644 crates/quicproquo-server/migrations/012_moderation.sql create mode 100644 crates/quicproquo-server/src/domain/moderation.rs diff --git a/crates/quicproquo-sdk/src/client.rs b/crates/quicproquo-sdk/src/client.rs index 3698bba..482c4e1 100644 --- a/crates/quicproquo-sdk/src/client.rs +++ b/crates/quicproquo-sdk/src/client.rs @@ -174,6 +174,46 @@ impl QpqClient { self.session_token.as_deref() } + // ── Moderation (client-side) ──────────────────────────────────────────── + + /// Block a user locally. Their messages will be hidden from display. + pub fn block_user(&self, identity_key: &[u8], reason: &str) -> Result<(), SdkError> { + let store = self.conversations()?; + store + .block_user(identity_key, reason) + .map_err(|e| SdkError::Storage(e.to_string()))?; + info!(identity = %hex::encode(identity_key), "user blocked"); + Ok(()) + } + + /// Unblock a user locally. + pub fn unblock_user(&self, identity_key: &[u8]) -> Result { + let store = self.conversations()?; + let removed = store + .unblock_user(identity_key) + .map_err(|e| SdkError::Storage(e.to_string()))?; + if removed { + info!(identity = %hex::encode(identity_key), "user unblocked"); + } + Ok(removed) + } + + /// Check if a user is blocked locally. + pub fn is_blocked(&self, identity_key: &[u8]) -> Result { + let store = self.conversations()?; + store + .is_blocked(identity_key) + .map_err(|e| SdkError::Storage(e.to_string())) + } + + /// List all locally blocked users. + pub fn list_blocked(&self) -> Result, SdkError> { + let store = self.conversations()?; + store + .list_blocked() + .map_err(|e| SdkError::Storage(e.to_string())) + } + /// Disconnect from the server. pub fn disconnect(&mut self) { if let Some(rpc) = self.rpc.take() { diff --git a/crates/quicproquo-sdk/src/conversation.rs b/crates/quicproquo-sdk/src/conversation.rs index 8810193..9ca0f94 100644 --- a/crates/quicproquo-sdk/src/conversation.rs +++ b/crates/quicproquo-sdk/src/conversation.rs @@ -96,6 +96,14 @@ pub struct OutboxEntry { pub retry_count: u32, } +/// A blocked user entry. +#[derive(Clone, Debug)] +pub struct BlockedUser { + pub identity_key: Vec, + pub blocked_at_ms: u64, + pub reason: String, +} + // ── ConversationStore ──────────────────────────────────────────────────────── /// SQLCipher-backed conversation and message store. @@ -171,7 +179,13 @@ impl ConversationStore { status TEXT NOT NULL DEFAULT 'pending' ); CREATE INDEX IF NOT EXISTS idx_outbox_status - ON outbox(status, created_at_ms);", + ON outbox(status, created_at_ms); + + CREATE TABLE IF NOT EXISTS blocked_users ( + identity_key BLOB PRIMARY KEY, + blocked_at_ms INTEGER NOT NULL, + reason TEXT NOT NULL DEFAULT '' + );", ) .context("migrate conversation db") } @@ -351,9 +365,9 @@ impl ConversationStore { Ok(()) } - /// Mark an outbox entry as failed (retryable up to 5 times). + /// Mark an outbox entry as failed (retryable up to 10 times). pub fn mark_outbox_failed(&self, id: i64, retry_count: u32) -> anyhow::Result<()> { - let new_status = if retry_count > 5 { "failed" } else { "pending" }; + let new_status = if retry_count > 10 { "failed" } else { "pending" }; self.conn.execute( "UPDATE outbox SET retry_count = ?2, status = ?3 WHERE id = ?1", params![id, retry_count, new_status], @@ -371,6 +385,125 @@ impl ConversationStore { Ok(count as usize) } + /// Delete all permanently failed outbox entries (status = 'failed'). + /// Returns the number of entries removed. + pub fn clear_failed_outbox(&self) -> anyhow::Result { + let count = self.conn.execute( + "DELETE FROM outbox WHERE status = 'failed'", + [], + )?; + Ok(count) + } + + // ── Sequence tracking ────────────────────────────────────────────────── + + /// Update the last seen server sequence number for a conversation. + /// Only increases; a lower seq is a no-op. + pub fn update_last_seen_seq( + &self, + conv_id: &ConversationId, + seq: u64, + ) -> anyhow::Result<()> { + self.conn.execute( + "UPDATE conversations SET last_seen_seq = ?2 WHERE id = ?1 AND last_seen_seq < ?2", + params![conv_id.0.as_slice(), seq as i64], + )?; + Ok(()) + } + + /// Get the last seen server sequence number for a conversation. + pub fn get_last_seen_seq(&self, conv_id: &ConversationId) -> anyhow::Result { + let seq: i64 = self + .conn + .query_row( + "SELECT last_seen_seq FROM conversations WHERE id = ?1", + params![conv_id.0.as_slice()], + |row| row.get(0), + ) + .optional()? + .unwrap_or(0); + Ok(seq as u64) + } + + // ── Blocked users ───────────────────────────────────────────────────── + + /// Block a user by identity key. + pub fn block_user(&self, identity_key: &[u8], reason: &str) -> anyhow::Result<()> { + self.conn.execute( + "INSERT OR REPLACE INTO blocked_users (identity_key, blocked_at_ms, reason) + VALUES (?1, ?2, ?3)", + params![identity_key, now_ms() as i64, reason], + )?; + Ok(()) + } + + /// Unblock a user. Returns true if the user was blocked. + pub fn unblock_user(&self, identity_key: &[u8]) -> anyhow::Result { + let rows = self.conn.execute( + "DELETE FROM blocked_users WHERE identity_key = ?1", + params![identity_key], + )?; + Ok(rows > 0) + } + + /// Check if a user is blocked. + pub fn is_blocked(&self, identity_key: &[u8]) -> anyhow::Result { + let count: i64 = self.conn.query_row( + "SELECT COUNT(*) FROM blocked_users WHERE identity_key = ?1", + params![identity_key], + |row| row.get(0), + )?; + Ok(count > 0) + } + + /// List all blocked users: (identity_key, blocked_at_ms, reason). + pub fn list_blocked(&self) -> anyhow::Result> { + let mut stmt = self.conn.prepare( + "SELECT identity_key, blocked_at_ms, reason FROM blocked_users + ORDER BY blocked_at_ms DESC", + )?; + let rows = stmt.query_map([], |row| { + let identity_key: Vec = row.get(0)?; + let blocked_at_ms: u64 = row.get(1)?; + let reason: String = row.get(2)?; + Ok(BlockedUser { + identity_key, + blocked_at_ms, + reason, + }) + })?; + let mut users = Vec::new(); + for row in rows { + users.push(row?); + } + Ok(users) + } + + /// Load recent messages, filtering out messages from blocked users. + pub fn load_recent_messages_filtered( + &self, + conv_id: &ConversationId, + limit: usize, + ) -> anyhow::Result> { + let mut stmt = self.conn.prepare( + "SELECT m.message_id, m.sender_key, m.sender_name, m.body, m.msg_type, + m.ref_msg_id, m.timestamp_ms, m.is_outgoing + FROM messages m + WHERE m.conversation_id = ?1 + AND NOT EXISTS ( + SELECT 1 FROM blocked_users b WHERE b.identity_key = m.sender_key + ) + ORDER BY m.timestamp_ms DESC LIMIT ?2", + )?; + let rows = stmt.query_map( + params![conv_id.0.as_slice(), limit.min(u32::MAX as usize) as u32], + |row| row_to_message(conv_id, row), + )?; + let mut msgs: Vec = rows.collect::>()?; + msgs.reverse(); + Ok(msgs) + } + /// Load recent messages (newest first, then reversed to chronological). pub fn load_recent_messages( &self, @@ -742,4 +875,109 @@ mod tests { assert!(ConversationId::from_slice(&[]).is_none()); assert!(ConversationId::from_slice(&[0u8; 16]).is_some()); } + + #[test] + fn block_unblock_user() { + let (_dir, store) = open_test_store(); + let ik = vec![42u8; 32]; + + assert!(!store.is_blocked(&ik).unwrap()); + + store.block_user(&ik, "spam").unwrap(); + assert!(store.is_blocked(&ik).unwrap()); + + let blocked = store.list_blocked().unwrap(); + assert_eq!(blocked.len(), 1); + assert_eq!(blocked[0].identity_key, ik); + assert_eq!(blocked[0].reason, "spam"); + + let removed = store.unblock_user(&ik).unwrap(); + assert!(removed); + assert!(!store.is_blocked(&ik).unwrap()); + assert!(store.list_blocked().unwrap().is_empty()); + } + + #[test] + fn block_user_idempotent() { + let (_dir, store) = open_test_store(); + let ik = vec![42u8; 32]; + + store.block_user(&ik, "first").unwrap(); + store.block_user(&ik, "updated").unwrap(); + + let blocked = store.list_blocked().unwrap(); + assert_eq!(blocked.len(), 1); + assert_eq!(blocked[0].reason, "updated"); + } + + #[test] + fn unblock_nonexistent_returns_false() { + let (_dir, store) = open_test_store(); + let removed = store.unblock_user(&[99u8; 32]).unwrap(); + assert!(!removed); + } + + #[test] + fn filtered_messages_exclude_blocked() { + let (_dir, store) = open_test_store(); + let conv = make_group_conv("modchat", 1000); + store.save_conversation(&conv).unwrap(); + + let alice_key = vec![1u8; 32]; + let bob_key = vec![2u8; 32]; + + // Alice sends 2 messages, Bob sends 1. + store.save_message(&StoredMessage { + conversation_id: conv.id.clone(), + message_id: None, + sender_key: alice_key.clone(), + sender_name: Some("alice".to_string()), + body: "hello from alice".to_string(), + msg_type: "chat".to_string(), + ref_msg_id: None, + timestamp_ms: 1000, + is_outgoing: false, + }).unwrap(); + + store.save_message(&StoredMessage { + conversation_id: conv.id.clone(), + message_id: None, + sender_key: bob_key.clone(), + sender_name: Some("bob".to_string()), + body: "hello from bob".to_string(), + msg_type: "chat".to_string(), + ref_msg_id: None, + timestamp_ms: 2000, + is_outgoing: false, + }).unwrap(); + + store.save_message(&StoredMessage { + conversation_id: conv.id.clone(), + message_id: None, + sender_key: alice_key.clone(), + sender_name: Some("alice".to_string()), + body: "another from alice".to_string(), + msg_type: "chat".to_string(), + ref_msg_id: None, + timestamp_ms: 3000, + is_outgoing: false, + }).unwrap(); + + // All 3 messages unfiltered. + let all = store.load_recent_messages(&conv.id, 10).unwrap(); + assert_eq!(all.len(), 3); + + // Block alice. + store.block_user(&alice_key, "spam").unwrap(); + + // Filtered: only bob's message. + let filtered = store.load_recent_messages_filtered(&conv.id, 10).unwrap(); + assert_eq!(filtered.len(), 1); + assert_eq!(filtered[0].body, "hello from bob"); + + // Unblock alice — all messages visible again. + store.unblock_user(&alice_key).unwrap(); + let unblocked = store.load_recent_messages_filtered(&conv.id, 10).unwrap(); + assert_eq!(unblocked.len(), 3); + } } diff --git a/crates/quicproquo-server/migrations/012_moderation.sql b/crates/quicproquo-server/migrations/012_moderation.sql new file mode 100644 index 0000000..128f0d8 --- /dev/null +++ b/crates/quicproquo-server/migrations/012_moderation.sql @@ -0,0 +1,20 @@ +-- Moderation tables: reports and bans. + +CREATE TABLE IF NOT EXISTS reports ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + encrypted_report BLOB NOT NULL, + conversation_id BLOB NOT NULL, + reporter_identity BLOB NOT NULL, + created_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now')) +); + +CREATE INDEX IF NOT EXISTS idx_reports_created ON reports(created_at); + +CREATE TABLE IF NOT EXISTS bans ( + identity_key BLOB PRIMARY KEY, + reason TEXT NOT NULL DEFAULT '', + banned_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now')), + expires_at INTEGER NOT NULL DEFAULT 0 +); + +CREATE INDEX IF NOT EXISTS idx_bans_expires ON bans(expires_at); diff --git a/crates/quicproquo-server/src/domain/mod.rs b/crates/quicproquo-server/src/domain/mod.rs index dfd0b81..406fd81 100644 --- a/crates/quicproquo-server/src/domain/mod.rs +++ b/crates/quicproquo-server/src/domain/mod.rs @@ -13,5 +13,8 @@ pub mod channels; pub mod users; pub mod blobs; pub mod devices; +pub mod groups; pub mod p2p; pub mod account; +pub mod moderation; +pub mod recovery; diff --git a/crates/quicproquo-server/src/domain/moderation.rs b/crates/quicproquo-server/src/domain/moderation.rs new file mode 100644 index 0000000..31cc599 --- /dev/null +++ b/crates/quicproquo-server/src/domain/moderation.rs @@ -0,0 +1,304 @@ +//! Moderation domain logic — report, ban, unban, list. +//! +//! Pure business logic operating on `Store` trait and domain types. + +use std::sync::Arc; + +use crate::storage::Store; + +use super::types::*; + +/// Shared state needed by moderation operations. +pub struct ModerationService { + pub store: Arc, +} + +impl ModerationService { + /// Submit an encrypted report for a message. + pub fn report_message( + &self, + req: ReportMessageReq, + ) -> Result { + if req.encrypted_report.is_empty() { + return Err(DomainError::BadParams( + "encrypted report must not be empty".into(), + )); + } + + self.store + .store_report( + &req.encrypted_report, + &req.conversation_id, + &req.reporter_identity, + ) + .map_err(DomainError::Storage)?; + + tracing::info!( + reporter_prefix = %hex_prefix(&req.reporter_identity), + "audit: message reported" + ); + + Ok(ReportMessageResp { accepted: true }) + } + + /// Ban a user by identity key. + pub fn ban_user(&self, req: BanUserReq) -> Result { + if req.identity_key.len() != 32 { + return Err(DomainError::InvalidIdentityKey(req.identity_key.len())); + } + + let expires_at = if req.duration_secs == 0 { + 0 // permanent + } else { + now_secs() + req.duration_secs + }; + + self.store + .ban_user(&req.identity_key, &req.reason, expires_at) + .map_err(DomainError::Storage)?; + + tracing::info!( + identity_prefix = %hex_prefix(&req.identity_key), + reason = %req.reason, + expires_at, + "audit: user banned" + ); + + Ok(BanUserResp { success: true }) + } + + /// Unban a user by identity key. + pub fn unban_user(&self, req: UnbanUserReq) -> Result { + if req.identity_key.len() != 32 { + return Err(DomainError::InvalidIdentityKey(req.identity_key.len())); + } + + let removed = self + .store + .unban_user(&req.identity_key) + .map_err(DomainError::Storage)?; + + if removed { + tracing::info!( + identity_prefix = %hex_prefix(&req.identity_key), + "audit: user unbanned" + ); + } + + Ok(UnbanUserResp { success: removed }) + } + + /// Check if a user is currently banned. + pub fn check_ban(&self, identity_key: &[u8]) -> Result, DomainError> { + self.store + .is_banned(identity_key) + .map_err(DomainError::Storage) + } + + /// List reports with pagination. + pub fn list_reports(&self, req: ListReportsReq) -> Result { + let raw = self + .store + .list_reports(req.limit, req.offset) + .map_err(DomainError::Storage)?; + + let reports = raw + .into_iter() + .map( + |(id, encrypted_report, conversation_id, reporter_identity, timestamp)| { + ReportEntry { + id, + encrypted_report, + conversation_id, + reporter_identity, + timestamp, + } + }, + ) + .collect(); + + Ok(ListReportsResp { reports }) + } + + /// List all currently banned users. + pub fn list_banned(&self) -> Result { + let raw = self.store.list_banned().map_err(DomainError::Storage)?; + + let users = raw + .into_iter() + .map( + |(identity_key, reason, banned_at, expires_at)| BannedUserEntry { + identity_key, + reason, + banned_at, + expires_at, + }, + ) + .collect(); + + Ok(ListBannedResp { users }) + } +} + +fn now_secs() -> u64 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs() +} + +fn hex_prefix(bytes: &[u8]) -> String { + let len = bytes.len().min(4); + let hex: String = bytes[..len].iter().map(|b| format!("{b:02x}")).collect(); + format!("{hex}...") +} + +#[cfg(test)] +#[allow(clippy::unwrap_used)] +mod tests { + use super::*; + use crate::storage::FileBackedStore; + + fn test_service() -> (tempfile::TempDir, ModerationService) { + let dir = tempfile::tempdir().unwrap(); + let store = Arc::new(FileBackedStore::open(dir.path()).unwrap()); + let svc = ModerationService { store }; + (dir, svc) + } + + #[test] + fn report_store_and_list() { + let (_dir, svc) = test_service(); + + let resp = svc + .report_message(ReportMessageReq { + encrypted_report: vec![1, 2, 3], + conversation_id: vec![10; 16], + reporter_identity: vec![20; 32], + }) + .unwrap(); + assert!(resp.accepted); + + let reports = svc + .list_reports(ListReportsReq { + limit: 10, + offset: 0, + }) + .unwrap(); + assert_eq!(reports.reports.len(), 1); + assert_eq!(reports.reports[0].encrypted_report, vec![1, 2, 3]); + assert_eq!(reports.reports[0].conversation_id, vec![10; 16]); + assert_eq!(reports.reports[0].reporter_identity, vec![20; 32]); + } + + #[test] + fn report_empty_rejected() { + let (_dir, svc) = test_service(); + let result = svc.report_message(ReportMessageReq { + encrypted_report: vec![], + conversation_id: vec![10; 16], + reporter_identity: vec![20; 32], + }); + assert!(result.is_err()); + } + + #[test] + fn ban_unban_lifecycle() { + let (_dir, svc) = test_service(); + let ik = vec![1u8; 32]; + + // Not banned initially. + assert!(svc.check_ban(&ik).unwrap().is_none()); + + // Ban permanently. + let resp = svc + .ban_user(BanUserReq { + identity_key: ik.clone(), + reason: "spam".into(), + duration_secs: 0, + }) + .unwrap(); + assert!(resp.success); + + // Now banned. + let reason = svc.check_ban(&ik).unwrap(); + assert_eq!(reason, Some("spam".to_string())); + + // Listed in banned users. + let banned = svc.list_banned().unwrap(); + assert_eq!(banned.users.len(), 1); + assert_eq!(banned.users[0].identity_key, ik); + assert_eq!(banned.users[0].reason, "spam"); + assert_eq!(banned.users[0].expires_at, 0); // permanent + + // Unban. + let resp = svc.unban_user(UnbanUserReq { identity_key: ik.clone() }).unwrap(); + assert!(resp.success); + + // No longer banned. + assert!(svc.check_ban(&ik).unwrap().is_none()); + assert!(svc.list_banned().unwrap().users.is_empty()); + } + + #[test] + fn ban_invalid_identity_key() { + let (_dir, svc) = test_service(); + let result = svc.ban_user(BanUserReq { + identity_key: vec![1u8; 16], // wrong length + reason: "test".into(), + duration_secs: 0, + }); + assert!(result.is_err()); + } + + #[test] + fn list_reports_pagination() { + let (_dir, svc) = test_service(); + + for i in 0..5u8 { + svc.report_message(ReportMessageReq { + encrypted_report: vec![i], + conversation_id: vec![10; 16], + reporter_identity: vec![20; 32], + }) + .unwrap(); + } + + let page1 = svc + .list_reports(ListReportsReq { + limit: 2, + offset: 0, + }) + .unwrap(); + assert_eq!(page1.reports.len(), 2); + assert_eq!(page1.reports[0].encrypted_report, vec![0]); + + let page2 = svc + .list_reports(ListReportsReq { + limit: 2, + offset: 2, + }) + .unwrap(); + assert_eq!(page2.reports.len(), 2); + assert_eq!(page2.reports[0].encrypted_report, vec![2]); + + let page3 = svc + .list_reports(ListReportsReq { + limit: 2, + offset: 4, + }) + .unwrap(); + assert_eq!(page3.reports.len(), 1); + } + + #[test] + fn unban_nonexistent_returns_false() { + let (_dir, svc) = test_service(); + let resp = svc + .unban_user(UnbanUserReq { + identity_key: vec![99u8; 32], + }) + .unwrap(); + assert!(!resp.success); + } +} diff --git a/crates/quicproquo-server/src/domain/types.rs b/crates/quicproquo-server/src/domain/types.rs index cc87d5b..c315377 100644 --- a/crates/quicproquo-server/src/domain/types.rs +++ b/crates/quicproquo-server/src/domain/types.rs @@ -42,6 +42,9 @@ pub enum DomainError { #[error("device not found")] DeviceNotFound, + #[error("group not found")] + GroupNotFound, + #[error("bad parameters: {0}")] BadParams(String), @@ -290,6 +293,96 @@ pub struct RevokeDeviceResp { pub success: bool, } +// ── Group metadata ─────────────────────────────────────────────────── + +pub struct GroupMetadata { + pub group_id: Vec, + pub name: String, + pub description: String, + pub avatar_hash: Vec, + pub creator_key: Vec, + pub created_at: u64, +} + +pub struct UpdateGroupMetadataReq { + pub group_id: Vec, + pub name: String, + pub description: String, + pub avatar_hash: Vec, +} + +pub struct ListGroupMembersReq { + pub group_id: Vec, +} + +pub struct GroupMemberInfo { + pub identity_key: Vec, + pub username: String, + pub joined_at: u64, +} + +pub struct ListGroupMembersResp { + pub members: Vec, +} + +// ── Moderation ─────────────────────────────────────────────────────────────── + +pub struct ReportMessageReq { + pub encrypted_report: Vec, + pub conversation_id: Vec, + pub reporter_identity: Vec, +} + +pub struct ReportMessageResp { + pub accepted: bool, +} + +pub struct BanUserReq { + pub identity_key: Vec, + pub reason: String, + pub duration_secs: u64, +} + +pub struct BanUserResp { + pub success: bool, +} + +pub struct UnbanUserReq { + pub identity_key: Vec, +} + +pub struct UnbanUserResp { + pub success: bool, +} + +pub struct ListReportsReq { + pub limit: u32, + pub offset: u32, +} + +pub struct ReportEntry { + pub id: u64, + pub encrypted_report: Vec, + pub conversation_id: Vec, + pub reporter_identity: Vec, + pub timestamp: u64, +} + +pub struct ListReportsResp { + pub reports: Vec, +} + +pub struct BannedUserEntry { + pub identity_key: Vec, + pub reason: String, + pub banned_at: u64, + pub expires_at: u64, +} + +pub struct ListBannedResp { + pub users: Vec, +} + // ── P2P ────────────────────────────────────────────────────────────────────── pub struct PublishEndpointReq { diff --git a/crates/quicproquo-server/src/error_codes.rs b/crates/quicproquo-server/src/error_codes.rs index 86ff3a7..37c54a2 100644 --- a/crates/quicproquo-server/src/error_codes.rs +++ b/crates/quicproquo-server/src/error_codes.rs @@ -33,6 +33,12 @@ pub const E027_BLOB_NOT_FOUND: &str = "E027"; pub const E028_ACCOUNT_DELETION_FAILED: &str = "E028"; pub const E029_DEVICE_LIMIT: &str = "E029"; pub const E030_DEVICE_NOT_FOUND: &str = "E030"; +#[allow(dead_code)] // used by v2 RPC moderation handlers +pub const E031_USER_BANNED: &str = "E031"; +#[allow(dead_code)] // used by v2 RPC moderation handlers +pub const E032_REPORT_EMPTY: &str = "E032"; +#[allow(dead_code)] // used by v2 RPC moderation handlers +pub const E033_ADMIN_REQUIRED: &str = "E033"; /// Build a `capnp::Error::failed()` with the structured code prefix. pub fn coded_error(code: &str, msg: impl std::fmt::Display) -> capnp::Error { diff --git a/crates/quicproquo-server/src/sql_store.rs b/crates/quicproquo-server/src/sql_store.rs index efbda90..f0b7bf5 100644 --- a/crates/quicproquo-server/src/sql_store.rs +++ b/crates/quicproquo-server/src/sql_store.rs @@ -10,7 +10,7 @@ use sha2::{Digest, Sha256}; use crate::storage::{SessionRecord, StorageError, Store}; /// Schema version after introducing the migration runner (existing DBs had 1). -const SCHEMA_VERSION: i32 = 11; +const SCHEMA_VERSION: i32 = 13; /// Default number of connections in the pool. const DEFAULT_POOL_SIZE: usize = 4; @@ -27,6 +27,8 @@ const MIGRATIONS: &[(i32, &str)] = &[ (9, include_str!("../migrations/008_devices.sql")), (10, include_str!("../migrations/009_sessions.sql")), (11, include_str!("../migrations/010_blobs.sql")), + (12, include_str!("../migrations/011_recovery_bundles.sql")), + (13, include_str!("../migrations/012_moderation.sql")), ]; /// Runs pending migrations on an open connection: applies any migration whose number is greater @@ -986,6 +988,190 @@ impl Store for SqlStore { .optional() .map_err(|e| StorageError::Db(e.to_string())) } + + fn store_group_metadata( + &self, + _group_id: &[u8], + _name: &str, + _description: &str, + _avatar_hash: &[u8], + _creator_key: &[u8], + ) -> Result<(), StorageError> { + Ok(()) + } + + fn get_group_metadata(&self, _group_id: &[u8]) -> Result, Vec, u64)>, StorageError> { + Ok(None) + } + + fn add_group_member(&self, _group_id: &[u8], _identity_key: &[u8]) -> Result<(), StorageError> { + Ok(()) + } + + fn remove_group_member(&self, _group_id: &[u8], _identity_key: &[u8]) -> Result { + Ok(false) + } + + fn list_group_members(&self, _group_id: &[u8]) -> Result, u64)>, StorageError> { + Ok(Vec::new()) + } + + fn store_recovery_bundle( + &self, + token_hash: &[u8], + bundle: Vec, + ttl_secs: u64, + ) -> Result<(), StorageError> { + let conn = self.get_conn()?; + conn.execute( + "INSERT OR REPLACE INTO recovery_bundles (token_hash, bundle, ttl_secs) VALUES (?1, ?2, ?3)", + params![token_hash, bundle, ttl_secs as i64], + ) + .map_err(|e| StorageError::Db(e.to_string()))?; + Ok(()) + } + + fn get_recovery_bundle(&self, token_hash: &[u8]) -> Result>, StorageError> { + let conn = self.get_conn()?; + let mut stmt = conn + .prepare("SELECT bundle FROM recovery_bundles WHERE token_hash = ?1") + .map_err(|e| StorageError::Db(e.to_string()))?; + stmt.query_row(params![token_hash], |row| row.get::<_, Vec>(0)) + .optional() + .map_err(|e| StorageError::Db(e.to_string())) + } + + fn delete_recovery_bundle(&self, token_hash: &[u8]) -> Result { + let conn = self.get_conn()?; + let affected = conn + .execute( + "DELETE FROM recovery_bundles WHERE token_hash = ?1", + params![token_hash], + ) + .map_err(|e| StorageError::Db(e.to_string()))?; + Ok(affected > 0) + } + + fn store_report( + &self, + encrypted_report: &[u8], + conversation_id: &[u8], + reporter_identity: &[u8], + ) -> Result { + let conn = self.get_conn()?; + conn.execute( + "INSERT INTO reports (encrypted_report, conversation_id, reporter_identity) + VALUES (?1, ?2, ?3)", + params![encrypted_report, conversation_id, reporter_identity], + ) + .map_err(|e| StorageError::Db(e.to_string()))?; + Ok(conn.last_insert_rowid() as u64) + } + + fn list_reports( + &self, + limit: u32, + offset: u32, + ) -> Result, Vec, Vec, u64)>, StorageError> { + let conn = self.get_conn()?; + let effective_limit = if limit == 0 { i64::MAX } else { limit as i64 }; + let mut stmt = conn + .prepare( + "SELECT id, encrypted_report, conversation_id, reporter_identity, created_at + FROM reports ORDER BY id LIMIT ?1 OFFSET ?2", + ) + .map_err(|e| StorageError::Db(e.to_string()))?; + let rows = stmt + .query_map(params![effective_limit, offset as i64], |row| { + Ok(( + row.get::<_, i64>(0)? as u64, + row.get::<_, Vec>(1)?, + row.get::<_, Vec>(2)?, + row.get::<_, Vec>(3)?, + row.get::<_, i64>(4)? as u64, + )) + }) + .map_err(|e| StorageError::Db(e.to_string()))?; + let mut result = Vec::new(); + for row in rows { + result.push(row.map_err(|e| StorageError::Db(e.to_string()))?); + } + Ok(result) + } + + fn ban_user( + &self, + identity_key: &[u8], + reason: &str, + expires_at: u64, + ) -> Result<(), StorageError> { + let conn = self.get_conn()?; + conn.execute( + "INSERT OR REPLACE INTO bans (identity_key, reason, expires_at) + VALUES (?1, ?2, ?3)", + params![identity_key, reason, expires_at as i64], + ) + .map_err(|e| StorageError::Db(e.to_string()))?; + Ok(()) + } + + fn unban_user(&self, identity_key: &[u8]) -> Result { + let conn = self.get_conn()?; + let affected = conn + .execute( + "DELETE FROM bans WHERE identity_key = ?1", + params![identity_key], + ) + .map_err(|e| StorageError::Db(e.to_string()))?; + Ok(affected > 0) + } + + fn is_banned(&self, identity_key: &[u8]) -> Result, StorageError> { + let conn = self.get_conn()?; + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs() as i64; + let mut stmt = conn + .prepare( + "SELECT reason FROM bans + WHERE identity_key = ?1 AND (expires_at = 0 OR expires_at > ?2)", + ) + .map_err(|e| StorageError::Db(e.to_string()))?; + stmt.query_row(params![identity_key, now], |row| row.get::<_, String>(0)) + .optional() + .map_err(|e| StorageError::Db(e.to_string())) + } + + fn list_banned(&self) -> Result, String, u64, u64)>, StorageError> { + let conn = self.get_conn()?; + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs() as i64; + let mut stmt = conn + .prepare( + "SELECT identity_key, reason, banned_at, expires_at FROM bans + WHERE expires_at = 0 OR expires_at > ?1 + ORDER BY banned_at", + ) + .map_err(|e| StorageError::Db(e.to_string()))?; + let rows = stmt + .query_map(params![now], |row| { + Ok(( + row.get::<_, Vec>(0)?, + row.get::<_, String>(1)?, + row.get::<_, i64>(2)? as u64, + row.get::<_, i64>(3)? as u64, + )) + }) + .map_err(|e| StorageError::Db(e.to_string()))?; + let mut result = Vec::new(); + for row in rows { + result.push(row.map_err(|e| StorageError::Db(e.to_string()))?); + } + Ok(result) + } } /// Convenience extension for `rusqlite::OptionalExtension`. diff --git a/crates/quicproquo-server/src/storage.rs b/crates/quicproquo-server/src/storage.rs index 6d2563a..35fa8fb 100644 --- a/crates/quicproquo-server/src/storage.rs +++ b/crates/quicproquo-server/src/storage.rs @@ -208,6 +208,76 @@ pub trait Store: Send + Sync { /// Return the number of registered devices for an identity. fn device_count(&self, identity_key: &[u8]) -> Result; + // ── Group metadata ───────────────────────────────────────────────── + + /// Store group metadata (name, description, avatar_hash). + fn store_group_metadata( + &self, + group_id: &[u8], + name: &str, + description: &str, + avatar_hash: &[u8], + creator_key: &[u8], + ) -> Result<(), StorageError>; + + /// Retrieve group metadata by group_id. + fn get_group_metadata(&self, group_id: &[u8]) -> Result, Vec, u64)>, StorageError>; + + /// Store a group membership record. + fn add_group_member( + &self, + group_id: &[u8], + identity_key: &[u8], + ) -> Result<(), StorageError>; + + /// Remove a group membership record. + fn remove_group_member( + &self, + group_id: &[u8], + identity_key: &[u8], + ) -> Result; + + /// List group members: (identity_key, joined_at). + fn list_group_members( + &self, + group_id: &[u8], + ) -> Result, u64)>, StorageError>; + + // ── Moderation ─────────────────────────────────────────────────────────── + + /// Store an encrypted report. Returns the report ID. + fn store_report( + &self, + encrypted_report: &[u8], + conversation_id: &[u8], + reporter_identity: &[u8], + ) -> Result; + + /// List reports with pagination: (id, encrypted_report, conversation_id, reporter_identity, timestamp). + #[allow(clippy::type_complexity)] + fn list_reports( + &self, + limit: u32, + offset: u32, + ) -> Result, Vec, Vec, u64)>, StorageError>; + + /// Ban a user. `expires_at` = 0 means permanent. + fn ban_user( + &self, + identity_key: &[u8], + reason: &str, + expires_at: u64, + ) -> Result<(), StorageError>; + + /// Unban a user. Returns false if the user was not banned. + fn unban_user(&self, identity_key: &[u8]) -> Result; + + /// Check if a user is currently banned (not expired). Returns ban reason if banned. + fn is_banned(&self, identity_key: &[u8]) -> Result, StorageError>; + + /// List all currently banned users: (identity_key, reason, banned_at, expires_at). + fn list_banned(&self) -> Result, String, u64, u64)>, StorageError>; + // ── Session persistence ──────────────────────────────────────────────── /// Store a session token → record mapping. @@ -230,6 +300,23 @@ pub trait Store: Send + Sync { Ok(()) } + // ── Recovery bundle storage ───────────────────────────────────────────── + + /// Store an encrypted recovery bundle keyed by token_hash. + /// `ttl_secs` is advisory (for GC); 0 means no expiry. + fn store_recovery_bundle( + &self, + token_hash: &[u8], + bundle: Vec, + ttl_secs: u64, + ) -> Result<(), StorageError>; + + /// Fetch an encrypted recovery bundle by token_hash. + fn get_recovery_bundle(&self, token_hash: &[u8]) -> Result>, StorageError>; + + /// Delete an encrypted recovery bundle by token_hash. Returns true if found. + fn delete_recovery_bundle(&self, token_hash: &[u8]) -> Result; + // ── Blob storage ─────────────────────────────────────────────────────── /// Append a chunk to the staging area for an in-progress upload. @@ -322,6 +409,21 @@ pub struct FileBackedStore { /// Device registry: identity_key -> Vec<(device_id, device_name, registered_at)> #[allow(clippy::type_complexity)] devices: Mutex, Vec<(Vec, String, u64)>>>, + /// Group metadata: group_id -> (name, description, avatar_hash, creator_key, created_at) + #[allow(clippy::type_complexity)] + group_metadata: Mutex, (String, String, Vec, Vec, u64)>>, + /// Group membership: group_id -> Vec<(identity_key, joined_at)> + #[allow(clippy::type_complexity)] + group_members: Mutex, Vec<(Vec, u64)>>>, + /// Reports: Vec<(id, encrypted_report, conversation_id, reporter_identity, timestamp)> + #[allow(clippy::type_complexity)] + reports: Mutex, Vec, Vec, u64)>>, + /// Next report ID counter. + next_report_id: Mutex, + /// Banned users: identity_key -> (reason, banned_at, expires_at) + bans: Mutex, (String, u64, u64)>>, + /// Recovery bundles: token_hash -> encrypted bundle bytes. + recovery_bundles: Mutex, Vec>>, } impl FileBackedStore { @@ -365,6 +467,12 @@ impl FileBackedStore { identity_keys, endpoints: Mutex::new(HashMap::new()), devices: Mutex::new(HashMap::new()), + group_metadata: Mutex::new(HashMap::new()), + group_members: Mutex::new(HashMap::new()), + reports: Mutex::new(Vec::new()), + next_report_id: Mutex::new(1), + bans: Mutex::new(HashMap::new()), + recovery_bundles: Mutex::new(HashMap::new()), }) } @@ -956,6 +1064,191 @@ impl Store for FileBackedStore { let map = lock(&self.devices)?; Ok(map.get(identity_key).map(|v| v.len()).unwrap_or(0)) } + + fn store_group_metadata( + &self, + group_id: &[u8], + name: &str, + description: &str, + avatar_hash: &[u8], + creator_key: &[u8], + ) -> Result<(), StorageError> { + let mut map = lock(&self.group_metadata)?; + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + let entry = map.entry(group_id.to_vec()).or_insert_with(|| { + (String::new(), String::new(), Vec::new(), creator_key.to_vec(), now) + }); + entry.0 = name.to_string(); + entry.1 = description.to_string(); + entry.2 = avatar_hash.to_vec(); + Ok(()) + } + + fn get_group_metadata(&self, group_id: &[u8]) -> Result, Vec, u64)>, StorageError> { + let map = lock(&self.group_metadata)?; + Ok(map.get(group_id).cloned()) + } + + fn add_group_member( + &self, + group_id: &[u8], + identity_key: &[u8], + ) -> Result<(), StorageError> { + let mut map = lock(&self.group_members)?; + let members = map.entry(group_id.to_vec()).or_default(); + if !members.iter().any(|(k, _)| k == identity_key) { + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + members.push((identity_key.to_vec(), now)); + } + Ok(()) + } + + fn remove_group_member( + &self, + group_id: &[u8], + identity_key: &[u8], + ) -> Result { + let mut map = lock(&self.group_members)?; + if let Some(members) = map.get_mut(group_id) { + let before = members.len(); + members.retain(|(k, _)| k != identity_key); + Ok(members.len() < before) + } else { + Ok(false) + } + } + + fn list_group_members( + &self, + group_id: &[u8], + ) -> Result, u64)>, StorageError> { + let map = lock(&self.group_members)?; + Ok(map.get(group_id).cloned().unwrap_or_default()) + } + + fn store_report( + &self, + encrypted_report: &[u8], + conversation_id: &[u8], + reporter_identity: &[u8], + ) -> Result { + let mut id_counter = lock(&self.next_report_id)?; + let id = *id_counter; + *id_counter = id + 1; + + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + + let mut reports = lock(&self.reports)?; + reports.push(( + id, + encrypted_report.to_vec(), + conversation_id.to_vec(), + reporter_identity.to_vec(), + now, + )); + Ok(id) + } + + fn list_reports( + &self, + limit: u32, + offset: u32, + ) -> Result, Vec, Vec, u64)>, StorageError> { + let reports = lock(&self.reports)?; + let result = reports + .iter() + .skip(offset as usize) + .take(if limit == 0 { usize::MAX } else { limit as usize }) + .cloned() + .collect(); + Ok(result) + } + + fn ban_user( + &self, + identity_key: &[u8], + reason: &str, + expires_at: u64, + ) -> Result<(), StorageError> { + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + + let mut bans = lock(&self.bans)?; + bans.insert( + identity_key.to_vec(), + (reason.to_string(), now, expires_at), + ); + Ok(()) + } + + fn unban_user(&self, identity_key: &[u8]) -> Result { + let mut bans = lock(&self.bans)?; + Ok(bans.remove(identity_key).is_some()) + } + + fn is_banned(&self, identity_key: &[u8]) -> Result, StorageError> { + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + + let bans = lock(&self.bans)?; + if let Some((reason, _banned_at, expires_at)) = bans.get(identity_key) { + if *expires_at == 0 || *expires_at > now { + return Ok(Some(reason.clone())); + } + } + Ok(None) + } + + fn list_banned(&self) -> Result, String, u64, u64)>, StorageError> { + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + + let bans = lock(&self.bans)?; + let result = bans + .iter() + .filter(|(_, (_, _, expires_at))| *expires_at == 0 || *expires_at > now) + .map(|(ik, (reason, banned_at, expires_at))| { + (ik.clone(), reason.clone(), *banned_at, *expires_at) + }) + .collect(); + Ok(result) + } + + fn store_recovery_bundle( + &self, + token_hash: &[u8], + bundle: Vec, + _ttl_secs: u64, + ) -> Result<(), StorageError> { + let mut map = lock(&self.recovery_bundles)?; + map.insert(token_hash.to_vec(), bundle); + Ok(()) + } + + fn get_recovery_bundle(&self, token_hash: &[u8]) -> Result>, StorageError> { + let map = lock(&self.recovery_bundles)?; + Ok(map.get(token_hash).cloned()) + } + + fn delete_recovery_bundle(&self, token_hash: &[u8]) -> Result { + let mut map = lock(&self.recovery_bundles)?; + Ok(map.remove(token_hash).is_some()) + } } #[cfg(test)] @@ -1108,4 +1401,72 @@ mod tests { assert_ne!(id_ab, id_bc); assert_ne!(id_ac, id_bc); } + + #[test] + fn report_store_and_list() { + let (_dir, store) = temp_store(); + let id1 = store.store_report(b"report1", b"conv1", b"alice").unwrap(); + let id2 = store.store_report(b"report2", b"conv2", b"bob").unwrap(); + assert_ne!(id1, id2); + + let reports = store.list_reports(10, 0).unwrap(); + assert_eq!(reports.len(), 2); + assert_eq!(reports[0].1, b"report1"); + assert_eq!(reports[1].1, b"report2"); + + // Pagination: offset=1 + let page = store.list_reports(10, 1).unwrap(); + assert_eq!(page.len(), 1); + assert_eq!(page[0].1, b"report2"); + } + + #[test] + fn ban_unban_user() { + let (_dir, store) = temp_store(); + let ik = vec![10u8; 32]; + + // Not banned initially. + assert!(store.is_banned(&ik).unwrap().is_none()); + + // Ban permanently (expires_at = 0). + store.ban_user(&ik, "spam", 0).unwrap(); + assert_eq!(store.is_banned(&ik).unwrap(), Some("spam".to_string())); + + let banned = store.list_banned().unwrap(); + assert_eq!(banned.len(), 1); + assert_eq!(banned[0].0, ik); + + // Unban. + assert!(store.unban_user(&ik).unwrap()); + assert!(store.is_banned(&ik).unwrap().is_none()); + assert!(store.list_banned().unwrap().is_empty()); + + // Unban nonexistent. + assert!(!store.unban_user(&ik).unwrap()); + } + + #[test] + fn ban_with_expiry_in_future() { + let (_dir, store) = temp_store(); + let ik = vec![11u8; 32]; + + // Ban with far-future expiry. + let future = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs() + 3600; + store.ban_user(&ik, "test", future).unwrap(); + assert!(store.is_banned(&ik).unwrap().is_some()); + } + + #[test] + fn ban_with_past_expiry_not_active() { + let (_dir, store) = temp_store(); + let ik = vec![12u8; 32]; + + // Ban with past expiry (already expired). + store.ban_user(&ik, "expired", 1).unwrap(); + assert!(store.is_banned(&ik).unwrap().is_none()); + assert!(store.list_banned().unwrap().is_empty()); + } }