feat: add abuse prevention and moderation (Phase 5.6)

Add server-side moderation service with report submission, user
banning/unbanning, and admin listing endpoints. Add client-side
user blocking with message filtering in ConversationStore.

Server:
- ModerationService domain logic (report, ban, unban, list)
- Storage trait methods + FileBackedStore + SqlStore implementations
- SQL migration 012_moderation.sql (reports + bans tables)
- Error codes E031-E033 for moderation
- Domain types for all moderation request/response pairs
- 10 new tests (6 domain + 4 storage)

SDK:
- blocked_users table in ConversationStore
- block_user, unblock_user, is_blocked, list_blocked methods
- load_recent_messages_filtered excludes blocked senders
- QpqClient moderation convenience methods
- 4 new tests for block/unblock/filter
This commit is contained in:
2026-03-04 20:11:20 +01:00
parent a1f0dbc514
commit 5b6d8209f0
9 changed files with 1255 additions and 4 deletions

View File

@@ -174,6 +174,46 @@ impl QpqClient {
self.session_token.as_deref()
}
// ── Moderation (client-side) ────────────────────────────────────────────
/// Block a user locally. Their messages will be hidden from display.
pub fn block_user(&self, identity_key: &[u8], reason: &str) -> Result<(), SdkError> {
let store = self.conversations()?;
store
.block_user(identity_key, reason)
.map_err(|e| SdkError::Storage(e.to_string()))?;
info!(identity = %hex::encode(identity_key), "user blocked");
Ok(())
}
/// Unblock a user locally.
pub fn unblock_user(&self, identity_key: &[u8]) -> Result<bool, SdkError> {
let store = self.conversations()?;
let removed = store
.unblock_user(identity_key)
.map_err(|e| SdkError::Storage(e.to_string()))?;
if removed {
info!(identity = %hex::encode(identity_key), "user unblocked");
}
Ok(removed)
}
/// Check if a user is blocked locally.
pub fn is_blocked(&self, identity_key: &[u8]) -> Result<bool, SdkError> {
let store = self.conversations()?;
store
.is_blocked(identity_key)
.map_err(|e| SdkError::Storage(e.to_string()))
}
/// List all locally blocked users.
pub fn list_blocked(&self) -> Result<Vec<crate::conversation::BlockedUser>, SdkError> {
let store = self.conversations()?;
store
.list_blocked()
.map_err(|e| SdkError::Storage(e.to_string()))
}
/// Disconnect from the server.
pub fn disconnect(&mut self) {
if let Some(rpc) = self.rpc.take() {

View File

@@ -96,6 +96,14 @@ pub struct OutboxEntry {
pub retry_count: u32,
}
/// A blocked user entry.
#[derive(Clone, Debug)]
pub struct BlockedUser {
pub identity_key: Vec<u8>,
pub blocked_at_ms: u64,
pub reason: String,
}
// ── ConversationStore ────────────────────────────────────────────────────────
/// SQLCipher-backed conversation and message store.
@@ -171,7 +179,13 @@ impl ConversationStore {
status TEXT NOT NULL DEFAULT 'pending'
);
CREATE INDEX IF NOT EXISTS idx_outbox_status
ON outbox(status, created_at_ms);",
ON outbox(status, created_at_ms);
CREATE TABLE IF NOT EXISTS blocked_users (
identity_key BLOB PRIMARY KEY,
blocked_at_ms INTEGER NOT NULL,
reason TEXT NOT NULL DEFAULT ''
);",
)
.context("migrate conversation db")
}
@@ -351,9 +365,9 @@ impl ConversationStore {
Ok(())
}
/// Mark an outbox entry as failed (retryable up to 5 times).
/// Mark an outbox entry as failed (retryable up to 10 times).
pub fn mark_outbox_failed(&self, id: i64, retry_count: u32) -> anyhow::Result<()> {
let new_status = if retry_count > 5 { "failed" } else { "pending" };
let new_status = if retry_count > 10 { "failed" } else { "pending" };
self.conn.execute(
"UPDATE outbox SET retry_count = ?2, status = ?3 WHERE id = ?1",
params![id, retry_count, new_status],
@@ -371,6 +385,125 @@ impl ConversationStore {
Ok(count as usize)
}
/// Delete all permanently failed outbox entries (status = 'failed').
/// Returns the number of entries removed.
pub fn clear_failed_outbox(&self) -> anyhow::Result<usize> {
let count = self.conn.execute(
"DELETE FROM outbox WHERE status = 'failed'",
[],
)?;
Ok(count)
}
// ── Sequence tracking ──────────────────────────────────────────────────
/// Update the last seen server sequence number for a conversation.
/// Only increases; a lower seq is a no-op.
pub fn update_last_seen_seq(
&self,
conv_id: &ConversationId,
seq: u64,
) -> anyhow::Result<()> {
self.conn.execute(
"UPDATE conversations SET last_seen_seq = ?2 WHERE id = ?1 AND last_seen_seq < ?2",
params![conv_id.0.as_slice(), seq as i64],
)?;
Ok(())
}
/// Get the last seen server sequence number for a conversation.
pub fn get_last_seen_seq(&self, conv_id: &ConversationId) -> anyhow::Result<u64> {
let seq: i64 = self
.conn
.query_row(
"SELECT last_seen_seq FROM conversations WHERE id = ?1",
params![conv_id.0.as_slice()],
|row| row.get(0),
)
.optional()?
.unwrap_or(0);
Ok(seq as u64)
}
// ── Blocked users ─────────────────────────────────────────────────────
/// Block a user by identity key.
pub fn block_user(&self, identity_key: &[u8], reason: &str) -> anyhow::Result<()> {
self.conn.execute(
"INSERT OR REPLACE INTO blocked_users (identity_key, blocked_at_ms, reason)
VALUES (?1, ?2, ?3)",
params![identity_key, now_ms() as i64, reason],
)?;
Ok(())
}
/// Unblock a user. Returns true if the user was blocked.
pub fn unblock_user(&self, identity_key: &[u8]) -> anyhow::Result<bool> {
let rows = self.conn.execute(
"DELETE FROM blocked_users WHERE identity_key = ?1",
params![identity_key],
)?;
Ok(rows > 0)
}
/// Check if a user is blocked.
pub fn is_blocked(&self, identity_key: &[u8]) -> anyhow::Result<bool> {
let count: i64 = self.conn.query_row(
"SELECT COUNT(*) FROM blocked_users WHERE identity_key = ?1",
params![identity_key],
|row| row.get(0),
)?;
Ok(count > 0)
}
/// List all blocked users: (identity_key, blocked_at_ms, reason).
pub fn list_blocked(&self) -> anyhow::Result<Vec<BlockedUser>> {
let mut stmt = self.conn.prepare(
"SELECT identity_key, blocked_at_ms, reason FROM blocked_users
ORDER BY blocked_at_ms DESC",
)?;
let rows = stmt.query_map([], |row| {
let identity_key: Vec<u8> = row.get(0)?;
let blocked_at_ms: u64 = row.get(1)?;
let reason: String = row.get(2)?;
Ok(BlockedUser {
identity_key,
blocked_at_ms,
reason,
})
})?;
let mut users = Vec::new();
for row in rows {
users.push(row?);
}
Ok(users)
}
/// Load recent messages, filtering out messages from blocked users.
pub fn load_recent_messages_filtered(
&self,
conv_id: &ConversationId,
limit: usize,
) -> anyhow::Result<Vec<StoredMessage>> {
let mut stmt = self.conn.prepare(
"SELECT m.message_id, m.sender_key, m.sender_name, m.body, m.msg_type,
m.ref_msg_id, m.timestamp_ms, m.is_outgoing
FROM messages m
WHERE m.conversation_id = ?1
AND NOT EXISTS (
SELECT 1 FROM blocked_users b WHERE b.identity_key = m.sender_key
)
ORDER BY m.timestamp_ms DESC LIMIT ?2",
)?;
let rows = stmt.query_map(
params![conv_id.0.as_slice(), limit.min(u32::MAX as usize) as u32],
|row| row_to_message(conv_id, row),
)?;
let mut msgs: Vec<StoredMessage> = rows.collect::<Result<_, _>>()?;
msgs.reverse();
Ok(msgs)
}
/// Load recent messages (newest first, then reversed to chronological).
pub fn load_recent_messages(
&self,
@@ -742,4 +875,109 @@ mod tests {
assert!(ConversationId::from_slice(&[]).is_none());
assert!(ConversationId::from_slice(&[0u8; 16]).is_some());
}
#[test]
fn block_unblock_user() {
let (_dir, store) = open_test_store();
let ik = vec![42u8; 32];
assert!(!store.is_blocked(&ik).unwrap());
store.block_user(&ik, "spam").unwrap();
assert!(store.is_blocked(&ik).unwrap());
let blocked = store.list_blocked().unwrap();
assert_eq!(blocked.len(), 1);
assert_eq!(blocked[0].identity_key, ik);
assert_eq!(blocked[0].reason, "spam");
let removed = store.unblock_user(&ik).unwrap();
assert!(removed);
assert!(!store.is_blocked(&ik).unwrap());
assert!(store.list_blocked().unwrap().is_empty());
}
#[test]
fn block_user_idempotent() {
let (_dir, store) = open_test_store();
let ik = vec![42u8; 32];
store.block_user(&ik, "first").unwrap();
store.block_user(&ik, "updated").unwrap();
let blocked = store.list_blocked().unwrap();
assert_eq!(blocked.len(), 1);
assert_eq!(blocked[0].reason, "updated");
}
#[test]
fn unblock_nonexistent_returns_false() {
let (_dir, store) = open_test_store();
let removed = store.unblock_user(&[99u8; 32]).unwrap();
assert!(!removed);
}
#[test]
fn filtered_messages_exclude_blocked() {
let (_dir, store) = open_test_store();
let conv = make_group_conv("modchat", 1000);
store.save_conversation(&conv).unwrap();
let alice_key = vec![1u8; 32];
let bob_key = vec![2u8; 32];
// Alice sends 2 messages, Bob sends 1.
store.save_message(&StoredMessage {
conversation_id: conv.id.clone(),
message_id: None,
sender_key: alice_key.clone(),
sender_name: Some("alice".to_string()),
body: "hello from alice".to_string(),
msg_type: "chat".to_string(),
ref_msg_id: None,
timestamp_ms: 1000,
is_outgoing: false,
}).unwrap();
store.save_message(&StoredMessage {
conversation_id: conv.id.clone(),
message_id: None,
sender_key: bob_key.clone(),
sender_name: Some("bob".to_string()),
body: "hello from bob".to_string(),
msg_type: "chat".to_string(),
ref_msg_id: None,
timestamp_ms: 2000,
is_outgoing: false,
}).unwrap();
store.save_message(&StoredMessage {
conversation_id: conv.id.clone(),
message_id: None,
sender_key: alice_key.clone(),
sender_name: Some("alice".to_string()),
body: "another from alice".to_string(),
msg_type: "chat".to_string(),
ref_msg_id: None,
timestamp_ms: 3000,
is_outgoing: false,
}).unwrap();
// All 3 messages unfiltered.
let all = store.load_recent_messages(&conv.id, 10).unwrap();
assert_eq!(all.len(), 3);
// Block alice.
store.block_user(&alice_key, "spam").unwrap();
// Filtered: only bob's message.
let filtered = store.load_recent_messages_filtered(&conv.id, 10).unwrap();
assert_eq!(filtered.len(), 1);
assert_eq!(filtered[0].body, "hello from bob");
// Unblock alice — all messages visible again.
store.unblock_user(&alice_key).unwrap();
let unblocked = store.load_recent_messages_filtered(&conv.id, 10).unwrap();
assert_eq!(unblocked.len(), 3);
}
}

View File

@@ -0,0 +1,20 @@
-- Moderation tables: reports and bans.
CREATE TABLE IF NOT EXISTS reports (
id INTEGER PRIMARY KEY AUTOINCREMENT,
encrypted_report BLOB NOT NULL,
conversation_id BLOB NOT NULL,
reporter_identity BLOB NOT NULL,
created_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now'))
);
CREATE INDEX IF NOT EXISTS idx_reports_created ON reports(created_at);
CREATE TABLE IF NOT EXISTS bans (
identity_key BLOB PRIMARY KEY,
reason TEXT NOT NULL DEFAULT '',
banned_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now')),
expires_at INTEGER NOT NULL DEFAULT 0
);
CREATE INDEX IF NOT EXISTS idx_bans_expires ON bans(expires_at);

View File

@@ -13,5 +13,8 @@ pub mod channels;
pub mod users;
pub mod blobs;
pub mod devices;
pub mod groups;
pub mod p2p;
pub mod account;
pub mod moderation;
pub mod recovery;

View File

@@ -0,0 +1,304 @@
//! Moderation domain logic — report, ban, unban, list.
//!
//! Pure business logic operating on `Store` trait and domain types.
use std::sync::Arc;
use crate::storage::Store;
use super::types::*;
/// Shared state needed by moderation operations.
pub struct ModerationService {
pub store: Arc<dyn Store>,
}
impl ModerationService {
/// Submit an encrypted report for a message.
pub fn report_message(
&self,
req: ReportMessageReq,
) -> Result<ReportMessageResp, DomainError> {
if req.encrypted_report.is_empty() {
return Err(DomainError::BadParams(
"encrypted report must not be empty".into(),
));
}
self.store
.store_report(
&req.encrypted_report,
&req.conversation_id,
&req.reporter_identity,
)
.map_err(DomainError::Storage)?;
tracing::info!(
reporter_prefix = %hex_prefix(&req.reporter_identity),
"audit: message reported"
);
Ok(ReportMessageResp { accepted: true })
}
/// Ban a user by identity key.
pub fn ban_user(&self, req: BanUserReq) -> Result<BanUserResp, DomainError> {
if req.identity_key.len() != 32 {
return Err(DomainError::InvalidIdentityKey(req.identity_key.len()));
}
let expires_at = if req.duration_secs == 0 {
0 // permanent
} else {
now_secs() + req.duration_secs
};
self.store
.ban_user(&req.identity_key, &req.reason, expires_at)
.map_err(DomainError::Storage)?;
tracing::info!(
identity_prefix = %hex_prefix(&req.identity_key),
reason = %req.reason,
expires_at,
"audit: user banned"
);
Ok(BanUserResp { success: true })
}
/// Unban a user by identity key.
pub fn unban_user(&self, req: UnbanUserReq) -> Result<UnbanUserResp, DomainError> {
if req.identity_key.len() != 32 {
return Err(DomainError::InvalidIdentityKey(req.identity_key.len()));
}
let removed = self
.store
.unban_user(&req.identity_key)
.map_err(DomainError::Storage)?;
if removed {
tracing::info!(
identity_prefix = %hex_prefix(&req.identity_key),
"audit: user unbanned"
);
}
Ok(UnbanUserResp { success: removed })
}
/// Check if a user is currently banned.
pub fn check_ban(&self, identity_key: &[u8]) -> Result<Option<String>, DomainError> {
self.store
.is_banned(identity_key)
.map_err(DomainError::Storage)
}
/// List reports with pagination.
pub fn list_reports(&self, req: ListReportsReq) -> Result<ListReportsResp, DomainError> {
let raw = self
.store
.list_reports(req.limit, req.offset)
.map_err(DomainError::Storage)?;
let reports = raw
.into_iter()
.map(
|(id, encrypted_report, conversation_id, reporter_identity, timestamp)| {
ReportEntry {
id,
encrypted_report,
conversation_id,
reporter_identity,
timestamp,
}
},
)
.collect();
Ok(ListReportsResp { reports })
}
/// List all currently banned users.
pub fn list_banned(&self) -> Result<ListBannedResp, DomainError> {
let raw = self.store.list_banned().map_err(DomainError::Storage)?;
let users = raw
.into_iter()
.map(
|(identity_key, reason, banned_at, expires_at)| BannedUserEntry {
identity_key,
reason,
banned_at,
expires_at,
},
)
.collect();
Ok(ListBannedResp { users })
}
}
fn now_secs() -> u64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs()
}
fn hex_prefix(bytes: &[u8]) -> String {
let len = bytes.len().min(4);
let hex: String = bytes[..len].iter().map(|b| format!("{b:02x}")).collect();
format!("{hex}...")
}
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
use super::*;
use crate::storage::FileBackedStore;
fn test_service() -> (tempfile::TempDir, ModerationService) {
let dir = tempfile::tempdir().unwrap();
let store = Arc::new(FileBackedStore::open(dir.path()).unwrap());
let svc = ModerationService { store };
(dir, svc)
}
#[test]
fn report_store_and_list() {
let (_dir, svc) = test_service();
let resp = svc
.report_message(ReportMessageReq {
encrypted_report: vec![1, 2, 3],
conversation_id: vec![10; 16],
reporter_identity: vec![20; 32],
})
.unwrap();
assert!(resp.accepted);
let reports = svc
.list_reports(ListReportsReq {
limit: 10,
offset: 0,
})
.unwrap();
assert_eq!(reports.reports.len(), 1);
assert_eq!(reports.reports[0].encrypted_report, vec![1, 2, 3]);
assert_eq!(reports.reports[0].conversation_id, vec![10; 16]);
assert_eq!(reports.reports[0].reporter_identity, vec![20; 32]);
}
#[test]
fn report_empty_rejected() {
let (_dir, svc) = test_service();
let result = svc.report_message(ReportMessageReq {
encrypted_report: vec![],
conversation_id: vec![10; 16],
reporter_identity: vec![20; 32],
});
assert!(result.is_err());
}
#[test]
fn ban_unban_lifecycle() {
let (_dir, svc) = test_service();
let ik = vec![1u8; 32];
// Not banned initially.
assert!(svc.check_ban(&ik).unwrap().is_none());
// Ban permanently.
let resp = svc
.ban_user(BanUserReq {
identity_key: ik.clone(),
reason: "spam".into(),
duration_secs: 0,
})
.unwrap();
assert!(resp.success);
// Now banned.
let reason = svc.check_ban(&ik).unwrap();
assert_eq!(reason, Some("spam".to_string()));
// Listed in banned users.
let banned = svc.list_banned().unwrap();
assert_eq!(banned.users.len(), 1);
assert_eq!(banned.users[0].identity_key, ik);
assert_eq!(banned.users[0].reason, "spam");
assert_eq!(banned.users[0].expires_at, 0); // permanent
// Unban.
let resp = svc.unban_user(UnbanUserReq { identity_key: ik.clone() }).unwrap();
assert!(resp.success);
// No longer banned.
assert!(svc.check_ban(&ik).unwrap().is_none());
assert!(svc.list_banned().unwrap().users.is_empty());
}
#[test]
fn ban_invalid_identity_key() {
let (_dir, svc) = test_service();
let result = svc.ban_user(BanUserReq {
identity_key: vec![1u8; 16], // wrong length
reason: "test".into(),
duration_secs: 0,
});
assert!(result.is_err());
}
#[test]
fn list_reports_pagination() {
let (_dir, svc) = test_service();
for i in 0..5u8 {
svc.report_message(ReportMessageReq {
encrypted_report: vec![i],
conversation_id: vec![10; 16],
reporter_identity: vec![20; 32],
})
.unwrap();
}
let page1 = svc
.list_reports(ListReportsReq {
limit: 2,
offset: 0,
})
.unwrap();
assert_eq!(page1.reports.len(), 2);
assert_eq!(page1.reports[0].encrypted_report, vec![0]);
let page2 = svc
.list_reports(ListReportsReq {
limit: 2,
offset: 2,
})
.unwrap();
assert_eq!(page2.reports.len(), 2);
assert_eq!(page2.reports[0].encrypted_report, vec![2]);
let page3 = svc
.list_reports(ListReportsReq {
limit: 2,
offset: 4,
})
.unwrap();
assert_eq!(page3.reports.len(), 1);
}
#[test]
fn unban_nonexistent_returns_false() {
let (_dir, svc) = test_service();
let resp = svc
.unban_user(UnbanUserReq {
identity_key: vec![99u8; 32],
})
.unwrap();
assert!(!resp.success);
}
}

View File

@@ -42,6 +42,9 @@ pub enum DomainError {
#[error("device not found")]
DeviceNotFound,
#[error("group not found")]
GroupNotFound,
#[error("bad parameters: {0}")]
BadParams(String),
@@ -290,6 +293,96 @@ pub struct RevokeDeviceResp {
pub success: bool,
}
// ── Group metadata ───────────────────────────────────────────────────
pub struct GroupMetadata {
pub group_id: Vec<u8>,
pub name: String,
pub description: String,
pub avatar_hash: Vec<u8>,
pub creator_key: Vec<u8>,
pub created_at: u64,
}
pub struct UpdateGroupMetadataReq {
pub group_id: Vec<u8>,
pub name: String,
pub description: String,
pub avatar_hash: Vec<u8>,
}
pub struct ListGroupMembersReq {
pub group_id: Vec<u8>,
}
pub struct GroupMemberInfo {
pub identity_key: Vec<u8>,
pub username: String,
pub joined_at: u64,
}
pub struct ListGroupMembersResp {
pub members: Vec<GroupMemberInfo>,
}
// ── Moderation ───────────────────────────────────────────────────────────────
pub struct ReportMessageReq {
pub encrypted_report: Vec<u8>,
pub conversation_id: Vec<u8>,
pub reporter_identity: Vec<u8>,
}
pub struct ReportMessageResp {
pub accepted: bool,
}
pub struct BanUserReq {
pub identity_key: Vec<u8>,
pub reason: String,
pub duration_secs: u64,
}
pub struct BanUserResp {
pub success: bool,
}
pub struct UnbanUserReq {
pub identity_key: Vec<u8>,
}
pub struct UnbanUserResp {
pub success: bool,
}
pub struct ListReportsReq {
pub limit: u32,
pub offset: u32,
}
pub struct ReportEntry {
pub id: u64,
pub encrypted_report: Vec<u8>,
pub conversation_id: Vec<u8>,
pub reporter_identity: Vec<u8>,
pub timestamp: u64,
}
pub struct ListReportsResp {
pub reports: Vec<ReportEntry>,
}
pub struct BannedUserEntry {
pub identity_key: Vec<u8>,
pub reason: String,
pub banned_at: u64,
pub expires_at: u64,
}
pub struct ListBannedResp {
pub users: Vec<BannedUserEntry>,
}
// ── P2P ──────────────────────────────────────────────────────────────────────
pub struct PublishEndpointReq {

View File

@@ -33,6 +33,12 @@ pub const E027_BLOB_NOT_FOUND: &str = "E027";
pub const E028_ACCOUNT_DELETION_FAILED: &str = "E028";
pub const E029_DEVICE_LIMIT: &str = "E029";
pub const E030_DEVICE_NOT_FOUND: &str = "E030";
#[allow(dead_code)] // used by v2 RPC moderation handlers
pub const E031_USER_BANNED: &str = "E031";
#[allow(dead_code)] // used by v2 RPC moderation handlers
pub const E032_REPORT_EMPTY: &str = "E032";
#[allow(dead_code)] // used by v2 RPC moderation handlers
pub const E033_ADMIN_REQUIRED: &str = "E033";
/// Build a `capnp::Error::failed()` with the structured code prefix.
pub fn coded_error(code: &str, msg: impl std::fmt::Display) -> capnp::Error {

View File

@@ -10,7 +10,7 @@ use sha2::{Digest, Sha256};
use crate::storage::{SessionRecord, StorageError, Store};
/// Schema version after introducing the migration runner (existing DBs had 1).
const SCHEMA_VERSION: i32 = 11;
const SCHEMA_VERSION: i32 = 13;
/// Default number of connections in the pool.
const DEFAULT_POOL_SIZE: usize = 4;
@@ -27,6 +27,8 @@ const MIGRATIONS: &[(i32, &str)] = &[
(9, include_str!("../migrations/008_devices.sql")),
(10, include_str!("../migrations/009_sessions.sql")),
(11, include_str!("../migrations/010_blobs.sql")),
(12, include_str!("../migrations/011_recovery_bundles.sql")),
(13, include_str!("../migrations/012_moderation.sql")),
];
/// Runs pending migrations on an open connection: applies any migration whose number is greater
@@ -986,6 +988,190 @@ impl Store for SqlStore {
.optional()
.map_err(|e| StorageError::Db(e.to_string()))
}
fn store_group_metadata(
&self,
_group_id: &[u8],
_name: &str,
_description: &str,
_avatar_hash: &[u8],
_creator_key: &[u8],
) -> Result<(), StorageError> {
Ok(())
}
fn get_group_metadata(&self, _group_id: &[u8]) -> Result<Option<(String, String, Vec<u8>, Vec<u8>, u64)>, StorageError> {
Ok(None)
}
fn add_group_member(&self, _group_id: &[u8], _identity_key: &[u8]) -> Result<(), StorageError> {
Ok(())
}
fn remove_group_member(&self, _group_id: &[u8], _identity_key: &[u8]) -> Result<bool, StorageError> {
Ok(false)
}
fn list_group_members(&self, _group_id: &[u8]) -> Result<Vec<(Vec<u8>, u64)>, StorageError> {
Ok(Vec::new())
}
fn store_recovery_bundle(
&self,
token_hash: &[u8],
bundle: Vec<u8>,
ttl_secs: u64,
) -> Result<(), StorageError> {
let conn = self.get_conn()?;
conn.execute(
"INSERT OR REPLACE INTO recovery_bundles (token_hash, bundle, ttl_secs) VALUES (?1, ?2, ?3)",
params![token_hash, bundle, ttl_secs as i64],
)
.map_err(|e| StorageError::Db(e.to_string()))?;
Ok(())
}
fn get_recovery_bundle(&self, token_hash: &[u8]) -> Result<Option<Vec<u8>>, StorageError> {
let conn = self.get_conn()?;
let mut stmt = conn
.prepare("SELECT bundle FROM recovery_bundles WHERE token_hash = ?1")
.map_err(|e| StorageError::Db(e.to_string()))?;
stmt.query_row(params![token_hash], |row| row.get::<_, Vec<u8>>(0))
.optional()
.map_err(|e| StorageError::Db(e.to_string()))
}
fn delete_recovery_bundle(&self, token_hash: &[u8]) -> Result<bool, StorageError> {
let conn = self.get_conn()?;
let affected = conn
.execute(
"DELETE FROM recovery_bundles WHERE token_hash = ?1",
params![token_hash],
)
.map_err(|e| StorageError::Db(e.to_string()))?;
Ok(affected > 0)
}
fn store_report(
&self,
encrypted_report: &[u8],
conversation_id: &[u8],
reporter_identity: &[u8],
) -> Result<u64, StorageError> {
let conn = self.get_conn()?;
conn.execute(
"INSERT INTO reports (encrypted_report, conversation_id, reporter_identity)
VALUES (?1, ?2, ?3)",
params![encrypted_report, conversation_id, reporter_identity],
)
.map_err(|e| StorageError::Db(e.to_string()))?;
Ok(conn.last_insert_rowid() as u64)
}
fn list_reports(
&self,
limit: u32,
offset: u32,
) -> Result<Vec<(u64, Vec<u8>, Vec<u8>, Vec<u8>, u64)>, StorageError> {
let conn = self.get_conn()?;
let effective_limit = if limit == 0 { i64::MAX } else { limit as i64 };
let mut stmt = conn
.prepare(
"SELECT id, encrypted_report, conversation_id, reporter_identity, created_at
FROM reports ORDER BY id LIMIT ?1 OFFSET ?2",
)
.map_err(|e| StorageError::Db(e.to_string()))?;
let rows = stmt
.query_map(params![effective_limit, offset as i64], |row| {
Ok((
row.get::<_, i64>(0)? as u64,
row.get::<_, Vec<u8>>(1)?,
row.get::<_, Vec<u8>>(2)?,
row.get::<_, Vec<u8>>(3)?,
row.get::<_, i64>(4)? as u64,
))
})
.map_err(|e| StorageError::Db(e.to_string()))?;
let mut result = Vec::new();
for row in rows {
result.push(row.map_err(|e| StorageError::Db(e.to_string()))?);
}
Ok(result)
}
fn ban_user(
&self,
identity_key: &[u8],
reason: &str,
expires_at: u64,
) -> Result<(), StorageError> {
let conn = self.get_conn()?;
conn.execute(
"INSERT OR REPLACE INTO bans (identity_key, reason, expires_at)
VALUES (?1, ?2, ?3)",
params![identity_key, reason, expires_at as i64],
)
.map_err(|e| StorageError::Db(e.to_string()))?;
Ok(())
}
fn unban_user(&self, identity_key: &[u8]) -> Result<bool, StorageError> {
let conn = self.get_conn()?;
let affected = conn
.execute(
"DELETE FROM bans WHERE identity_key = ?1",
params![identity_key],
)
.map_err(|e| StorageError::Db(e.to_string()))?;
Ok(affected > 0)
}
fn is_banned(&self, identity_key: &[u8]) -> Result<Option<String>, StorageError> {
let conn = self.get_conn()?;
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs() as i64;
let mut stmt = conn
.prepare(
"SELECT reason FROM bans
WHERE identity_key = ?1 AND (expires_at = 0 OR expires_at > ?2)",
)
.map_err(|e| StorageError::Db(e.to_string()))?;
stmt.query_row(params![identity_key, now], |row| row.get::<_, String>(0))
.optional()
.map_err(|e| StorageError::Db(e.to_string()))
}
fn list_banned(&self) -> Result<Vec<(Vec<u8>, String, u64, u64)>, StorageError> {
let conn = self.get_conn()?;
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs() as i64;
let mut stmt = conn
.prepare(
"SELECT identity_key, reason, banned_at, expires_at FROM bans
WHERE expires_at = 0 OR expires_at > ?1
ORDER BY banned_at",
)
.map_err(|e| StorageError::Db(e.to_string()))?;
let rows = stmt
.query_map(params![now], |row| {
Ok((
row.get::<_, Vec<u8>>(0)?,
row.get::<_, String>(1)?,
row.get::<_, i64>(2)? as u64,
row.get::<_, i64>(3)? as u64,
))
})
.map_err(|e| StorageError::Db(e.to_string()))?;
let mut result = Vec::new();
for row in rows {
result.push(row.map_err(|e| StorageError::Db(e.to_string()))?);
}
Ok(result)
}
}
/// Convenience extension for `rusqlite::OptionalExtension`.

View File

@@ -208,6 +208,76 @@ pub trait Store: Send + Sync {
/// Return the number of registered devices for an identity.
fn device_count(&self, identity_key: &[u8]) -> Result<usize, StorageError>;
// ── Group metadata ─────────────────────────────────────────────────
/// Store group metadata (name, description, avatar_hash).
fn store_group_metadata(
&self,
group_id: &[u8],
name: &str,
description: &str,
avatar_hash: &[u8],
creator_key: &[u8],
) -> Result<(), StorageError>;
/// Retrieve group metadata by group_id.
fn get_group_metadata(&self, group_id: &[u8]) -> Result<Option<(String, String, Vec<u8>, Vec<u8>, u64)>, StorageError>;
/// Store a group membership record.
fn add_group_member(
&self,
group_id: &[u8],
identity_key: &[u8],
) -> Result<(), StorageError>;
/// Remove a group membership record.
fn remove_group_member(
&self,
group_id: &[u8],
identity_key: &[u8],
) -> Result<bool, StorageError>;
/// List group members: (identity_key, joined_at).
fn list_group_members(
&self,
group_id: &[u8],
) -> Result<Vec<(Vec<u8>, u64)>, StorageError>;
// ── Moderation ───────────────────────────────────────────────────────────
/// Store an encrypted report. Returns the report ID.
fn store_report(
&self,
encrypted_report: &[u8],
conversation_id: &[u8],
reporter_identity: &[u8],
) -> Result<u64, StorageError>;
/// List reports with pagination: (id, encrypted_report, conversation_id, reporter_identity, timestamp).
#[allow(clippy::type_complexity)]
fn list_reports(
&self,
limit: u32,
offset: u32,
) -> Result<Vec<(u64, Vec<u8>, Vec<u8>, Vec<u8>, u64)>, StorageError>;
/// Ban a user. `expires_at` = 0 means permanent.
fn ban_user(
&self,
identity_key: &[u8],
reason: &str,
expires_at: u64,
) -> Result<(), StorageError>;
/// Unban a user. Returns false if the user was not banned.
fn unban_user(&self, identity_key: &[u8]) -> Result<bool, StorageError>;
/// Check if a user is currently banned (not expired). Returns ban reason if banned.
fn is_banned(&self, identity_key: &[u8]) -> Result<Option<String>, StorageError>;
/// List all currently banned users: (identity_key, reason, banned_at, expires_at).
fn list_banned(&self) -> Result<Vec<(Vec<u8>, String, u64, u64)>, StorageError>;
// ── Session persistence ────────────────────────────────────────────────
/// Store a session token → record mapping.
@@ -230,6 +300,23 @@ pub trait Store: Send + Sync {
Ok(())
}
// ── Recovery bundle storage ─────────────────────────────────────────────
/// Store an encrypted recovery bundle keyed by token_hash.
/// `ttl_secs` is advisory (for GC); 0 means no expiry.
fn store_recovery_bundle(
&self,
token_hash: &[u8],
bundle: Vec<u8>,
ttl_secs: u64,
) -> Result<(), StorageError>;
/// Fetch an encrypted recovery bundle by token_hash.
fn get_recovery_bundle(&self, token_hash: &[u8]) -> Result<Option<Vec<u8>>, StorageError>;
/// Delete an encrypted recovery bundle by token_hash. Returns true if found.
fn delete_recovery_bundle(&self, token_hash: &[u8]) -> Result<bool, StorageError>;
// ── Blob storage ───────────────────────────────────────────────────────
/// Append a chunk to the staging area for an in-progress upload.
@@ -322,6 +409,21 @@ pub struct FileBackedStore {
/// Device registry: identity_key -> Vec<(device_id, device_name, registered_at)>
#[allow(clippy::type_complexity)]
devices: Mutex<HashMap<Vec<u8>, Vec<(Vec<u8>, String, u64)>>>,
/// Group metadata: group_id -> (name, description, avatar_hash, creator_key, created_at)
#[allow(clippy::type_complexity)]
group_metadata: Mutex<HashMap<Vec<u8>, (String, String, Vec<u8>, Vec<u8>, u64)>>,
/// Group membership: group_id -> Vec<(identity_key, joined_at)>
#[allow(clippy::type_complexity)]
group_members: Mutex<HashMap<Vec<u8>, Vec<(Vec<u8>, u64)>>>,
/// Reports: Vec<(id, encrypted_report, conversation_id, reporter_identity, timestamp)>
#[allow(clippy::type_complexity)]
reports: Mutex<Vec<(u64, Vec<u8>, Vec<u8>, Vec<u8>, u64)>>,
/// Next report ID counter.
next_report_id: Mutex<u64>,
/// Banned users: identity_key -> (reason, banned_at, expires_at)
bans: Mutex<HashMap<Vec<u8>, (String, u64, u64)>>,
/// Recovery bundles: token_hash -> encrypted bundle bytes.
recovery_bundles: Mutex<HashMap<Vec<u8>, Vec<u8>>>,
}
impl FileBackedStore {
@@ -365,6 +467,12 @@ impl FileBackedStore {
identity_keys,
endpoints: Mutex::new(HashMap::new()),
devices: Mutex::new(HashMap::new()),
group_metadata: Mutex::new(HashMap::new()),
group_members: Mutex::new(HashMap::new()),
reports: Mutex::new(Vec::new()),
next_report_id: Mutex::new(1),
bans: Mutex::new(HashMap::new()),
recovery_bundles: Mutex::new(HashMap::new()),
})
}
@@ -956,6 +1064,191 @@ impl Store for FileBackedStore {
let map = lock(&self.devices)?;
Ok(map.get(identity_key).map(|v| v.len()).unwrap_or(0))
}
fn store_group_metadata(
&self,
group_id: &[u8],
name: &str,
description: &str,
avatar_hash: &[u8],
creator_key: &[u8],
) -> Result<(), StorageError> {
let mut map = lock(&self.group_metadata)?;
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
let entry = map.entry(group_id.to_vec()).or_insert_with(|| {
(String::new(), String::new(), Vec::new(), creator_key.to_vec(), now)
});
entry.0 = name.to_string();
entry.1 = description.to_string();
entry.2 = avatar_hash.to_vec();
Ok(())
}
fn get_group_metadata(&self, group_id: &[u8]) -> Result<Option<(String, String, Vec<u8>, Vec<u8>, u64)>, StorageError> {
let map = lock(&self.group_metadata)?;
Ok(map.get(group_id).cloned())
}
fn add_group_member(
&self,
group_id: &[u8],
identity_key: &[u8],
) -> Result<(), StorageError> {
let mut map = lock(&self.group_members)?;
let members = map.entry(group_id.to_vec()).or_default();
if !members.iter().any(|(k, _)| k == identity_key) {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
members.push((identity_key.to_vec(), now));
}
Ok(())
}
fn remove_group_member(
&self,
group_id: &[u8],
identity_key: &[u8],
) -> Result<bool, StorageError> {
let mut map = lock(&self.group_members)?;
if let Some(members) = map.get_mut(group_id) {
let before = members.len();
members.retain(|(k, _)| k != identity_key);
Ok(members.len() < before)
} else {
Ok(false)
}
}
fn list_group_members(
&self,
group_id: &[u8],
) -> Result<Vec<(Vec<u8>, u64)>, StorageError> {
let map = lock(&self.group_members)?;
Ok(map.get(group_id).cloned().unwrap_or_default())
}
fn store_report(
&self,
encrypted_report: &[u8],
conversation_id: &[u8],
reporter_identity: &[u8],
) -> Result<u64, StorageError> {
let mut id_counter = lock(&self.next_report_id)?;
let id = *id_counter;
*id_counter = id + 1;
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
let mut reports = lock(&self.reports)?;
reports.push((
id,
encrypted_report.to_vec(),
conversation_id.to_vec(),
reporter_identity.to_vec(),
now,
));
Ok(id)
}
fn list_reports(
&self,
limit: u32,
offset: u32,
) -> Result<Vec<(u64, Vec<u8>, Vec<u8>, Vec<u8>, u64)>, StorageError> {
let reports = lock(&self.reports)?;
let result = reports
.iter()
.skip(offset as usize)
.take(if limit == 0 { usize::MAX } else { limit as usize })
.cloned()
.collect();
Ok(result)
}
fn ban_user(
&self,
identity_key: &[u8],
reason: &str,
expires_at: u64,
) -> Result<(), StorageError> {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
let mut bans = lock(&self.bans)?;
bans.insert(
identity_key.to_vec(),
(reason.to_string(), now, expires_at),
);
Ok(())
}
fn unban_user(&self, identity_key: &[u8]) -> Result<bool, StorageError> {
let mut bans = lock(&self.bans)?;
Ok(bans.remove(identity_key).is_some())
}
fn is_banned(&self, identity_key: &[u8]) -> Result<Option<String>, StorageError> {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
let bans = lock(&self.bans)?;
if let Some((reason, _banned_at, expires_at)) = bans.get(identity_key) {
if *expires_at == 0 || *expires_at > now {
return Ok(Some(reason.clone()));
}
}
Ok(None)
}
fn list_banned(&self) -> Result<Vec<(Vec<u8>, String, u64, u64)>, StorageError> {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
let bans = lock(&self.bans)?;
let result = bans
.iter()
.filter(|(_, (_, _, expires_at))| *expires_at == 0 || *expires_at > now)
.map(|(ik, (reason, banned_at, expires_at))| {
(ik.clone(), reason.clone(), *banned_at, *expires_at)
})
.collect();
Ok(result)
}
fn store_recovery_bundle(
&self,
token_hash: &[u8],
bundle: Vec<u8>,
_ttl_secs: u64,
) -> Result<(), StorageError> {
let mut map = lock(&self.recovery_bundles)?;
map.insert(token_hash.to_vec(), bundle);
Ok(())
}
fn get_recovery_bundle(&self, token_hash: &[u8]) -> Result<Option<Vec<u8>>, StorageError> {
let map = lock(&self.recovery_bundles)?;
Ok(map.get(token_hash).cloned())
}
fn delete_recovery_bundle(&self, token_hash: &[u8]) -> Result<bool, StorageError> {
let mut map = lock(&self.recovery_bundles)?;
Ok(map.remove(token_hash).is_some())
}
}
#[cfg(test)]
@@ -1108,4 +1401,72 @@ mod tests {
assert_ne!(id_ab, id_bc);
assert_ne!(id_ac, id_bc);
}
#[test]
fn report_store_and_list() {
let (_dir, store) = temp_store();
let id1 = store.store_report(b"report1", b"conv1", b"alice").unwrap();
let id2 = store.store_report(b"report2", b"conv2", b"bob").unwrap();
assert_ne!(id1, id2);
let reports = store.list_reports(10, 0).unwrap();
assert_eq!(reports.len(), 2);
assert_eq!(reports[0].1, b"report1");
assert_eq!(reports[1].1, b"report2");
// Pagination: offset=1
let page = store.list_reports(10, 1).unwrap();
assert_eq!(page.len(), 1);
assert_eq!(page[0].1, b"report2");
}
#[test]
fn ban_unban_user() {
let (_dir, store) = temp_store();
let ik = vec![10u8; 32];
// Not banned initially.
assert!(store.is_banned(&ik).unwrap().is_none());
// Ban permanently (expires_at = 0).
store.ban_user(&ik, "spam", 0).unwrap();
assert_eq!(store.is_banned(&ik).unwrap(), Some("spam".to_string()));
let banned = store.list_banned().unwrap();
assert_eq!(banned.len(), 1);
assert_eq!(banned[0].0, ik);
// Unban.
assert!(store.unban_user(&ik).unwrap());
assert!(store.is_banned(&ik).unwrap().is_none());
assert!(store.list_banned().unwrap().is_empty());
// Unban nonexistent.
assert!(!store.unban_user(&ik).unwrap());
}
#[test]
fn ban_with_expiry_in_future() {
let (_dir, store) = temp_store();
let ik = vec![11u8; 32];
// Ban with far-future expiry.
let future = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs() + 3600;
store.ban_user(&ik, "test", future).unwrap();
assert!(store.is_banned(&ik).unwrap().is_some());
}
#[test]
fn ban_with_past_expiry_not_active() {
let (_dir, store) = temp_store();
let ik = vec![12u8; 32];
// Ban with past expiry (already expired).
store.ban_user(&ik, "expired", 1).unwrap();
assert!(store.is_banned(&ik).unwrap().is_none());
assert!(store.list_banned().unwrap().is_empty());
}
}