feat: add 11 features and bug fixes across server, SDK, and client

Server fixes:
- Wire v2 moderation handlers to ModerationService (SQL persistence) —
  bans now survive restarts instead of living in-memory DashMap
- Add admin role enforcement via QPC_ADMIN_KEYS env var for ban/unban
- Fix audit.rs now_iso8601() to emit actual ISO-8601 timestamps
- Add group admin authorization — only creator can remove members or
  update metadata

Server features:
- Add DeleteBlob RPC (method 602) with filesystem cleanup
- Register delete_blob in v2 handler method registry

SDK features:
- Add ClientEvent::IdentityKeyChanged for safety number change alerts
- Add ClientEvent::ReadReceipt and DeliveryConfirmation variants
- Add peer_identity_keys table with store/get methods for key tracking
- Add search_messages() full-text search across all conversations
- Add delete_conversation() with cascading message/outbox cleanup

Client features:
- Wire v2 TUI message sending to SDK MLS encryption pipeline
- Add /search command to v2 REPL with cross-conversation results
- Add /delete-conversation command to v2 REPL
- Add unread count badges in v1 TUI sidebar (yellow+bold styling)
This commit is contained in:
2026-04-04 23:31:37 +02:00
parent 4dadd01c6b
commit f58ce2529d
14 changed files with 662 additions and 127 deletions

View File

@@ -83,6 +83,8 @@ struct App {
channel_names: Vec<String>,
/// Conversation IDs, parallel to `channel_names`.
channel_ids: Vec<ConversationId>,
/// Unread message counts, parallel to `channel_names`.
unread_counts: Vec<u32>,
/// Index of the selected channel in the sidebar.
selected_channel: usize,
/// Messages for the currently active channel.
@@ -102,10 +104,12 @@ impl App {
let convs = session.conv_store.list_conversations()?;
let channel_names: Vec<String> = convs.iter().map(|c| c.display_name.clone()).collect();
let channel_ids: Vec<ConversationId> = convs.iter().map(|c| c.id.clone()).collect();
let unread_counts: Vec<u32> = convs.iter().map(|c| c.unread_count).collect();
Ok(Self {
channel_names,
channel_ids,
unread_counts,
selected_channel: 0,
messages: Vec::new(),
input: String::new(),
@@ -232,14 +236,27 @@ fn draw_sidebar(frame: &mut Frame, app: &App, area: Rect) {
.iter()
.enumerate()
.map(|(i, name)| {
let style = if i == app.selected_channel {
let unread = app.unread_counts.get(i).copied().unwrap_or(0);
let is_selected = i == app.selected_channel;
let label = if unread > 0 && !is_selected {
format!("{name} ({unread})")
} else {
name.clone()
};
let style = if is_selected {
Style::default()
.fg(Color::Cyan)
.add_modifier(Modifier::BOLD | Modifier::REVERSED)
} else if unread > 0 {
Style::default()
.fg(Color::Yellow)
.add_modifier(Modifier::BOLD)
} else {
Style::default().fg(Color::Cyan)
};
ListItem::new(Line::from(Span::styled(name.clone(), style)))
ListItem::new(Line::from(Span::styled(label, style)))
})
.collect();

View File

@@ -100,6 +100,8 @@ const COMMANDS: &[CmdDef] = &[
CmdDef { name: "/help", aliases: &["/?"], category: Category::Utility, description: "Show this help message", usage: "/help" },
CmdDef { name: "/quit", aliases: &["/q", "/exit"], category: Category::Utility, description: "Exit the REPL", usage: "/quit" },
CmdDef { name: "/clear", aliases: &[], category: Category::Utility, description: "Clear the terminal", usage: "/clear" },
CmdDef { name: "/search", aliases: &[], category: Category::Messaging, description: "Search messages across all conversations", usage: "/search <query>" },
CmdDef { name: "/delete-conversation", aliases: &["/delconv"], category: Category::Messaging, description: "Delete a conversation and its messages", usage: "/delete-conversation [name]" },
CmdDef { name: "/health", aliases: &[], category: Category::Debug, description: "Check server connection health", usage: "/health" },
CmdDef { name: "/status", aliases: &[], category: Category::Debug, description: "Show connection and auth state", usage: "/status" },
];
@@ -397,6 +399,8 @@ async fn dispatch(
"/switch" | "/sw" => do_switch(client, st, args)?,
"/group" | "/g" => do_group(client, st, args).await?,
"/devices" => do_devices(client, args).await?,
"/search" => do_search(client, args)?,
"/delete-conversation" | "/delconv" => do_delete_conversation(client, st, args)?,
_ => display::print_error(&format!("unknown command: {cmd} (try /help)")),
}
Ok(false)
@@ -983,6 +987,81 @@ async fn do_devices(client: &mut QpqClient, args: &str) -> anyhow::Result<()> {
Ok(())
}
// ── Search ──────────────────────────────────────────────────────────────────
fn do_search(client: &QpqClient, args: &str) -> anyhow::Result<()> {
let query = args.trim();
if query.is_empty() {
display::print_error("usage: /search <query>");
return Ok(());
}
let results = client.conversations().map_err(|e| anyhow::anyhow!("{e}"))?.search_messages(query, 25)?;
if results.is_empty() {
display::print_status(&format!("no messages matching \"{query}\""));
return Ok(());
}
println!("\n{BOLD}Search results for \"{query}\"{RESET} ({} matches)\n", results.len());
for r in &results {
let ts = format_timestamp_ms(r.timestamp_ms);
let sender = r.sender_name.as_deref().unwrap_or("?");
println!(
" {DIM}[{ts}]{RESET} {CYAN}{}{RESET} > {GREEN}{sender}{RESET}: {}",
r.conversation_name,
r.body,
);
}
println!();
Ok(())
}
fn format_timestamp_ms(ms: u64) -> String {
let secs = ms / 1000;
let hours = (secs % 86400) / 3600;
let minutes = (secs % 3600) / 60;
format!("{hours:02}:{minutes:02}")
}
// ── Delete conversation ─────────────────────────────────────────────────────
fn do_delete_conversation(
client: &QpqClient,
st: &mut ReplState,
args: &str,
) -> anyhow::Result<()> {
let name = args.trim();
// Find by name, or use current conversation.
let target = if name.is_empty() {
st.current_conversation.clone()
} else {
let convs = client.conversations().map_err(|e| anyhow::anyhow!("{e}"))?.list_conversations()?;
convs
.iter()
.find(|c| c.display_name.eq_ignore_ascii_case(name))
.map(|c| c.id.clone())
};
let Some(conv_id) = target else {
display::print_error("no matching conversation (specify name or switch first)");
return Ok(());
};
let deleted = client.conversations().map_err(|e| anyhow::anyhow!("{e}"))?.delete_conversation(&conv_id)?;
if deleted {
// If we deleted the active conversation, clear it.
if st.current_conversation.as_ref() == Some(&conv_id) {
st.current_conversation = None;
st.current_display_name = None;
}
display::print_status("conversation deleted");
} else {
display::print_error("conversation not found");
}
Ok(())
}
// ── Entry point ─────────────────────────────────────────────────────────────
/// Run the v2 REPL over a `QpqClient`.

View File

@@ -21,8 +21,7 @@
//!
//! Feature gate: requires both `v2` and `tui` features.
//!
//! **Note:** Message display is currently local-only. Use the REPL client for
//! end-to-end encrypted delivery. See `quicprochat-sdk::messaging` for the full pipeline.
//! Messages are sent via the SDK's MLS encryption pipeline (sealed sender + hybrid wrap).
use std::time::Duration;
@@ -41,8 +40,11 @@ use ratatui::{
};
use tokio::sync::broadcast;
use std::sync::Arc;
use quicprochat_core::IdentityKeypair;
use quicprochat_sdk::client::{ConnectionState, QpqClient};
use quicprochat_sdk::conversation::ConversationStore;
use quicprochat_sdk::conversation::{ConversationId, ConversationStore, StoredMessage};
use quicprochat_sdk::events::ClientEvent;
// ── Data Types ──────────────────────────────────────────────────────────────
@@ -91,6 +93,8 @@ pub struct TuiApp {
conn_state: quicprochat_sdk::client::ConnectionState,
/// Current MLS epoch for the active conversation (if available).
mls_epoch: Option<u64>,
/// Identity keypair for MLS operations (set after login).
identity: Option<Arc<IdentityKeypair>>,
}
impl TuiApp {
@@ -110,6 +114,7 @@ impl TuiApp {
notification: None,
conn_state: ConnectionState::Disconnected,
mls_epoch: None,
identity: None,
}
}
@@ -573,14 +578,83 @@ async fn handle_input(app: &mut TuiApp, client: &mut QpqClient, text: &str) {
// Snap to bottom.
app.scroll_offset = 0;
// NOTE: TUI message display is local-only. The full MLS encryption
// pipeline (sealed sender + hybrid wrap + enqueue) is implemented in
// quicprochat-sdk/src/messaging.rs but is not yet wired into the TUI.
// Use the REPL client (`qpc repl`) for end-to-end message delivery.
app.notification = Some("Message queued locally (TUI send not yet wired to SDK)".to_string());
// Send via MLS encryption pipeline.
let conv_id_bytes = *app.active_conv_id().unwrap();
let conv_id = ConversationId(conv_id_bytes);
let send_result = send_tui_message(client, app, &conv_id, text).await;
match send_result {
Ok(()) => {
app.notification = Some("Sent".to_string());
}
Err(e) => {
app.notification = Some(format!("Send failed: {e}"));
}
}
}
}
/// Send a message via the SDK's MLS encryption pipeline.
async fn send_tui_message(
client: &QpqClient,
app: &TuiApp,
conv_id: &ConversationId,
text: &str,
) -> anyhow::Result<()> {
let identity = app
.identity
.as_ref()
.ok_or_else(|| anyhow::anyhow!("not logged in — identity not loaded"))?;
let rpc = client.rpc().map_err(|e| anyhow::anyhow!("{e}"))?;
let conv_store = client.conversations().map_err(|e| anyhow::anyhow!("{e}"))?;
let conv = conv_store
.load_conversation(conv_id)?
.ok_or_else(|| anyhow::anyhow!("conversation not found"))?;
let mut member = quicprochat_sdk::groups::restore_mls_state(&conv, identity)?;
let my_pub = identity.public_key_bytes();
let recipients: Vec<Vec<u8>> = conv
.member_keys
.iter()
.filter(|k| k.as_slice() != my_pub.as_slice())
.cloned()
.collect();
if recipients.is_empty() {
return Err(anyhow::anyhow!("no recipients in conversation"));
}
let hybrid_keys = vec![None; recipients.len()];
quicprochat_sdk::messaging::send_message(
rpc,
&mut member,
identity,
text,
&recipients,
&hybrid_keys,
conv_id.0.as_slice(),
)
.await?;
quicprochat_sdk::groups::save_mls_state(conv_store, conv_id, &member)?;
let now = quicprochat_sdk::conversation::now_ms();
conv_store.save_message(&StoredMessage {
conversation_id: conv_id.clone(),
message_id: None,
sender_key: my_pub.to_vec(),
sender_name: client.username().map(|s| s.to_string()),
body: text.to_string(),
msg_type: "chat".to_string(),
ref_msg_id: None,
timestamp_ms: now,
is_outgoing: true,
})?;
Ok(())
}
/// Handle a /command.
async fn handle_command(app: &mut TuiApp, client: &mut QpqClient, cmd: &str) {
let parts: Vec<&str> = cmd.splitn(3, ' ').collect();

View File

@@ -112,9 +112,10 @@ pub mod method_ids {
pub const CHECK_REVOCATION: u16 = 511;
pub const AUDIT_KEY_TRANSPARENCY: u16 = 520;
// Blob (600-601)
// Blob (600-602)
pub const UPLOAD_BLOB: u16 = 600;
pub const DOWNLOAD_BLOB: u16 = 601;
pub const DELETE_BLOB: u16 = 602;
// Device (700-702, 710)
pub const REGISTER_DEVICE: u16 = 700;

View File

@@ -185,6 +185,13 @@ impl ConversationStore {
identity_key BLOB PRIMARY KEY,
blocked_at_ms INTEGER NOT NULL,
reason TEXT NOT NULL DEFAULT ''
);
CREATE TABLE IF NOT EXISTS peer_identity_keys (
username TEXT PRIMARY KEY,
identity_key BLOB NOT NULL,
first_seen_ms INTEGER NOT NULL,
last_seen_ms INTEGER NOT NULL
);",
)
.context("migrate conversation db")
@@ -524,6 +531,112 @@ impl ConversationStore {
msgs.reverse();
Ok(msgs)
}
// ── Peer identity key tracking ──────────────────────────────────────────
/// Look up the stored identity key for a peer by username.
pub fn get_peer_identity_key(&self, username: &str) -> anyhow::Result<Option<Vec<u8>>> {
let key: Option<Vec<u8>> = self
.conn
.query_row(
"SELECT identity_key FROM peer_identity_keys WHERE username = ?1",
params![username],
|row| row.get(0),
)
.optional()?;
Ok(key)
}
/// Store (or update) a peer's identity key. Returns the previous key if it changed.
pub fn store_peer_identity_key(
&self,
username: &str,
identity_key: &[u8],
) -> anyhow::Result<Option<Vec<u8>>> {
let now_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as i64;
let old = self.get_peer_identity_key(username)?;
self.conn.execute(
"INSERT INTO peer_identity_keys (username, identity_key, first_seen_ms, last_seen_ms)
VALUES (?1, ?2, ?3, ?3)
ON CONFLICT(username) DO UPDATE SET identity_key = ?2, last_seen_ms = ?3",
params![username, identity_key, now_ms],
)?;
// Return the old key only if it's different from the new one.
match old {
Some(ref prev) if prev != identity_key => Ok(old),
_ => Ok(None),
}
}
// ── Full-text search ────────────────────────────────────────────────────
/// Search messages across all conversations by body text.
pub fn search_messages(
&self,
query: &str,
limit: usize,
) -> anyhow::Result<Vec<SearchResult>> {
let pattern = format!("%{query}%");
let mut stmt = self.conn.prepare(
"SELECT m.conversation_id, c.display_name, m.sender_name, m.body,
m.timestamp_ms, m.message_id
FROM messages m
JOIN conversations c ON c.id = m.conversation_id
WHERE m.body LIKE ?1
ORDER BY m.timestamp_ms DESC
LIMIT ?2",
)?;
let rows = stmt.query_map(
params![pattern, limit.min(u32::MAX as usize) as u32],
|row| {
let conv_id_raw: Vec<u8> = row.get(0)?;
let mut conv_id = [0u8; 16];
if conv_id_raw.len() == 16 {
conv_id.copy_from_slice(&conv_id_raw);
}
Ok(SearchResult {
conversation_id: ConversationId(conv_id),
conversation_name: row.get(1)?,
sender_name: row.get(2)?,
body: row.get(3)?,
timestamp_ms: row.get::<_, i64>(4)? as u64,
message_id: row.get(5)?,
})
},
)?;
rows.collect::<Result<Vec<_>, _>>().map_err(Into::into)
}
// ── Conversation deletion ───────────────────────────────────────────────
/// Delete a conversation and all its messages.
pub fn delete_conversation(&self, id: &ConversationId) -> anyhow::Result<bool> {
self.conn
.execute("DELETE FROM messages WHERE conversation_id = ?1", params![id.0.as_slice()])?;
self.conn
.execute("DELETE FROM outbox WHERE conversation_id = ?1", params![id.0.as_slice()])?;
let rows = self
.conn
.execute("DELETE FROM conversations WHERE id = ?1", params![id.0.as_slice()])?;
Ok(rows > 0)
}
}
/// A search result across conversations.
#[derive(Clone, Debug)]
pub struct SearchResult {
pub conversation_id: ConversationId,
pub conversation_name: String,
pub sender_name: Option<String>,
pub body: String,
pub timestamp_ms: u64,
pub message_id: Option<Vec<u8>>,
}
// ── Helpers ──────────────────────────────────────────────────────────────────

View File

@@ -82,6 +82,28 @@ pub enum ClientEvent {
received_seq: u64,
},
/// A peer's identity key changed — possible re-registration, new device,
/// or MITM attack. The UI MUST alert the user (like Signal's "safety number changed").
IdentityKeyChanged {
username: String,
old_fingerprint: String,
new_fingerprint: String,
},
/// A read receipt was received — the reader has read messages up to the given ID.
ReadReceipt {
conversation_id: [u8; 16],
reader: String,
up_to_message_id: Vec<u8>,
timestamp_ms: u64,
},
/// Server confirmed delivery of a message.
DeliveryConfirmation {
conversation_id: [u8; 16],
message_id: Vec<u8>,
},
/// An error occurred in the background.
Error { message: String },
}
@@ -219,11 +241,26 @@ mod tests {
expected_seq: 0,
received_seq: 1,
},
ClientEvent::IdentityKeyChanged {
username: "u".into(),
old_fingerprint: "old".into(),
new_fingerprint: "new".into(),
},
ClientEvent::ReadReceipt {
conversation_id: [0; 16],
reader: "r".into(),
up_to_message_id: vec![],
timestamp_ms: 0,
},
ClientEvent::DeliveryConfirmation {
conversation_id: [0; 16],
message_id: vec![],
},
ClientEvent::Error { message: "e".into() },
];
for event in &events {
let _ = event.clone();
}
assert_eq!(events.len(), 17);
assert_eq!(events.len(), 20);
}
}

View File

@@ -142,15 +142,33 @@ pub fn format_actor(identity_key: &[u8], redact: bool) -> String {
}
}
/// Current ISO-8601 UTC timestamp.
/// Current ISO-8601 UTC timestamp (e.g. `2026-04-04T12:30:45Z`).
pub fn now_iso8601() -> String {
// Use SystemTime to avoid pulling in chrono.
let d = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default();
let secs = d.as_secs();
// Simple UTC formatting: enough for audit logs.
format!("{secs}")
// Manual UTC calendar conversion — avoids pulling in chrono.
let days = secs / 86400;
let time_of_day = secs % 86400;
let hours = time_of_day / 3600;
let minutes = (time_of_day % 3600) / 60;
let seconds = time_of_day % 60;
// Civil date from day count (epoch = 1970-01-01, algorithm from Howard Hinnant).
let z = days as i64 + 719468;
let era = if z >= 0 { z } else { z - 146096 } / 146097;
let doe = (z - era * 146097) as u64; // day of era [0, 146096]
let yoe = (doe - doe / 1460 + doe / 36524 - doe / 146096) / 365;
let y = yoe as i64 + era * 400;
let doy = doe - (365 * yoe + yoe / 4 - yoe / 100);
let mp = (5 * doy + 2) / 153;
let d = doy - (153 * mp + 2) / 5 + 1;
let m = if mp < 10 { mp + 3 } else { mp - 9 };
let y = if m <= 2 { y + 1 } else { y };
format!("{y:04}-{m:02}-{d:02}T{hours:02}:{minutes:02}:{seconds:02}Z")
}
#[cfg(test)]

View File

@@ -194,4 +194,27 @@ impl BlobService {
mime_type: meta.mime_type,
})
}
/// Delete a blob and its metadata from disk.
pub fn delete_blob(&self, blob_id: &[u8]) -> Result<bool, DomainError> {
if blob_id.len() != 32 {
return Err(DomainError::BlobHashLength(blob_id.len()));
}
let blob_hex = hex::encode(blob_id);
let dir = self.blobs_dir();
let blob_path = dir.join(&blob_hex);
let meta_path = dir.join(format!("{blob_hex}.meta"));
let part_path = dir.join(format!("{blob_hex}.part"));
if !blob_path.exists() && !part_path.exists() {
return Ok(false);
}
let _ = std::fs::remove_file(&blob_path);
let _ = std::fs::remove_file(&meta_path);
let _ = std::fs::remove_file(&part_path);
Ok(true)
}
}

View File

@@ -34,6 +34,38 @@ mod ws_bridge;
#[cfg(feature = "webtransport")]
mod webtransport;
/// Parse `QPC_ADMIN_KEYS` env var — comma-separated hex-encoded Ed25519 public keys.
/// Returns empty vec if unset (backward-compatible: all users can moderate).
#[cfg(feature = "webtransport")]
fn parse_admin_keys() -> Vec<Vec<u8>> {
let Ok(val) = std::env::var("QPC_ADMIN_KEYS") else {
return Vec::new();
};
val.split(',')
.filter_map(|s| {
let s = s.trim();
if s.is_empty() {
return None;
}
match hex::decode(s) {
Ok(key) if key.len() == 32 => Some(key),
Ok(key) => {
tracing::warn!(
len = key.len(),
hex = s,
"QPC_ADMIN_KEYS: ignoring key with wrong length (expected 32 bytes)"
);
None
}
Err(e) => {
tracing::warn!(hex = s, error = %e, "QPC_ADMIN_KEYS: ignoring invalid hex");
None
}
}
})
.collect()
}
use auth::{AuthConfig, PendingLogin, RateEntry, SessionInfo};
use config::{
load_config, merge_config, validate_production_config, DEFAULT_DATA_DIR, DEFAULT_DB_PATH,
@@ -433,6 +465,7 @@ async fn main() -> anyhow::Result<()> {
storage_backend: effective.store_backend.clone(),
federation_client: None,
local_domain: effective.federation.as_ref().map(|f| f.domain.clone()).unwrap_or_default(),
admin_keys: parse_admin_keys(),
});
let wt_registry = Arc::new(v2_handlers::build_registry(

View File

@@ -99,3 +99,32 @@ pub async fn handle_download_blob(state: Arc<ServerState>, ctx: RequestContext)
Err(e) => domain_err(e),
}
}
pub async fn handle_delete_blob(state: Arc<ServerState>, ctx: RequestContext) -> HandlerResult {
let _identity_key = match require_auth(&state, &ctx) {
Ok(ik) => ik,
Err(e) => return e,
};
let req = match v1::DeleteBlobRequest::decode(ctx.payload) {
Ok(r) => r,
Err(e) => {
return HandlerResult::err(
quicprochat_rpc::error::RpcStatus::BadRequest,
&format!("decode: {e}"),
)
}
};
let svc = BlobService {
data_dir: state.data_dir.clone(),
};
match svc.delete_blob(&req.blob_id) {
Ok(deleted) => {
let proto = v1::DeleteBlobResponse { deleted };
HandlerResult::ok(Bytes::from(proto.encode_to_vec()))
}
Err(e) => domain_err(e),
}
}

View File

@@ -42,9 +42,18 @@ pub async fn handle_remove_member(
store: Arc::clone(&state.store),
};
// Only group creator (admin) can remove members.
if let Ok(Some(meta)) = svc.get_metadata(&req.group_id) {
if !meta.creator_key.is_empty() && meta.creator_key != identity_key {
return HandlerResult::err(
RpcStatus::Forbidden,
"only the group creator can remove members",
);
}
}
match svc.remove_member(&req.group_id, &req.member_identity_key) {
Ok(_) => {
let _ = identity_key; // caller is authorized; removal tracked
let proto = v1::RemoveMemberResponse {
commit: Vec::new(), // commit is generated client-side
};
@@ -73,6 +82,16 @@ pub async fn handle_update_group_metadata(
store: Arc::clone(&state.store),
};
// Only group creator (admin) can update metadata.
if let Ok(Some(meta)) = svc.get_metadata(&req.group_id) {
if !meta.creator_key.is_empty() && meta.creator_key != identity_key {
return HandlerResult::err(
RpcStatus::Forbidden,
"only the group creator can update metadata",
);
}
}
let domain_req = UpdateGroupMetadataReq {
group_id: req.group_id,
name: req.name,

View File

@@ -68,6 +68,8 @@ pub struct ServerState {
pub federation_client: Option<Arc<crate::federation::FederationClient>>,
/// This server's domain for federation addressing. Empty when federation is disabled.
pub local_domain: String,
/// Admin identity keys (from `QPC_ADMIN_USERS` env or config). Empty = allow all (MVP).
pub admin_keys: Vec<Vec<u8>>,
}
/// A ban record for a user.
@@ -316,6 +318,11 @@ pub fn build_registry(default_rpc_timeout: std::time::Duration) -> MethodRegistr
std::time::Duration::from_secs(120),
blob::handle_download_blob,
);
reg.register(
method_ids::DELETE_BLOB,
"DeleteBlob",
blob::handle_delete_blob,
);
// Device (700-702)
reg.register(

View File

@@ -1,4 +1,8 @@
//! Moderation handlers — report, ban, unban, list reports, list banned.
//!
//! All mutations are persisted via `ModerationService` (SQL store).
//! The in-memory `banned_users` DashMap is kept as a hot cache for the
//! auth middleware's fast-path ban check.
use std::sync::Arc;
@@ -9,7 +13,34 @@ use quicprochat_rpc::error::RpcStatus;
use quicprochat_rpc::method::{HandlerResult, RequestContext};
use tracing::{info, warn};
use super::{require_auth, BanRecord, ModerationReport, ServerState};
use crate::domain::moderation::ModerationService;
use crate::domain::types::*;
use super::{require_auth, BanRecord, ServerState};
/// Build a `ModerationService` from shared state.
fn mod_service(state: &ServerState) -> ModerationService {
ModerationService {
store: Arc::clone(&state.store),
}
}
/// Check whether the caller is an admin. Admins are identified by identity
/// key listed in `state.admin_keys`. Returns `Err(HandlerResult)` with
/// `Forbidden` status for non-admins.
fn require_admin(state: &ServerState, identity_key: &[u8]) -> Result<(), HandlerResult> {
if state.admin_keys.is_empty() {
// No admin list configured — allow all (backward-compatible MVP behavior).
return Ok(());
}
if state.admin_keys.iter().any(|k| k.as_slice() == identity_key) {
return Ok(());
}
Err(HandlerResult::err(
RpcStatus::Forbidden,
"admin role required",
))
}
/// Submit an encrypted report. Any authenticated user can report.
pub async fn handle_report_message(state: Arc<ServerState>, ctx: RequestContext) -> HandlerResult {
@@ -23,81 +54,91 @@ pub async fn handle_report_message(state: Arc<ServerState>, ctx: RequestContext)
Err(e) => return HandlerResult::err(RpcStatus::BadRequest, &format!("decode: {e}")),
};
if req.encrypted_report.is_empty() {
return HandlerResult::err(RpcStatus::BadRequest, "encrypted_report required");
let svc = mod_service(&state);
match svc.report_message(ReportMessageReq {
encrypted_report: req.encrypted_report,
conversation_id: req.conversation_id,
reporter_identity: identity_key.clone(),
}) {
Ok(resp) => {
info!(
reporter = hex::encode(&identity_key[..4.min(identity_key.len())]),
"moderation report submitted (persisted)"
);
let proto = v1::ReportMessageResponse {
accepted: resp.accepted,
};
HandlerResult::ok(Bytes::from(proto.encode_to_vec()))
}
Err(DomainError::BadParams(msg)) => HandlerResult::err(RpcStatus::BadRequest, &msg),
Err(e) => {
warn!(error = %e, "report_message failed");
HandlerResult::err(RpcStatus::Internal, "internal error")
}
}
let now = crate::auth::current_timestamp();
let report = {
let mut reports = match state.moderation_reports.lock() {
Ok(r) => r,
Err(e) => {
warn!("moderation_reports lock poisoned: {e}");
return HandlerResult::err(RpcStatus::Internal, "internal error");
}
};
let id = reports.len() as u64;
let report = ModerationReport {
id,
encrypted_report: req.encrypted_report,
conversation_id: req.conversation_id,
reporter_identity: identity_key.clone(),
timestamp: now,
};
reports.push(report.clone());
report
};
info!(
report_id = report.id,
reporter = hex::encode(&identity_key[..4.min(identity_key.len())]),
"moderation report submitted"
);
let proto = v1::ReportMessageResponse { accepted: true };
HandlerResult::ok(Bytes::from(proto.encode_to_vec()))
}
/// Ban a user. Requires admin role (currently: any authenticated user for MVP).
/// Ban a user. Requires admin role.
pub async fn handle_ban_user(state: Arc<ServerState>, ctx: RequestContext) -> HandlerResult {
let admin_key = match require_auth(&state, &ctx) {
Ok(ik) => ik,
Err(e) => return e,
};
if let Err(e) = require_admin(&state, &admin_key) {
return e;
}
let req = match v1::BanUserRequest::decode(ctx.payload) {
Ok(r) => r,
Err(e) => return HandlerResult::err(RpcStatus::BadRequest, &format!("decode: {e}")),
};
if req.identity_key.is_empty() || req.identity_key.len() != 32 {
return HandlerResult::err(RpcStatus::BadRequest, "identity_key must be 32 bytes");
}
let now = crate::auth::current_timestamp();
let expires_at = if req.duration_secs == 0 {
0 // permanent
} else {
now + req.duration_secs
};
let record = BanRecord {
let svc = mod_service(&state);
match svc.ban_user(BanUserReq {
identity_key: req.identity_key.clone(),
reason: req.reason.clone(),
banned_at: now,
expires_at,
};
state.banned_users.insert(req.identity_key.clone(), record);
duration_secs: req.duration_secs,
}) {
Ok(resp) => {
// Update hot cache so auth middleware picks it up immediately.
let now = crate::auth::current_timestamp();
let expires_at = if req.duration_secs == 0 {
0
} else {
now + req.duration_secs
};
state.banned_users.insert(
req.identity_key.clone(),
BanRecord {
reason: req.reason.clone(),
banned_at: now,
expires_at,
},
);
info!(
target_key = hex::encode(&req.identity_key[..4]),
admin_key = hex::encode(&admin_key[..4.min(admin_key.len())]),
reason = %req.reason,
duration_secs = req.duration_secs,
"user banned"
);
info!(
target_key = hex::encode(&req.identity_key[..4.min(req.identity_key.len())]),
admin_key = hex::encode(&admin_key[..4.min(admin_key.len())]),
reason = %req.reason,
duration_secs = req.duration_secs,
"user banned (persisted)"
);
let proto = v1::BanUserResponse { success: true };
HandlerResult::ok(Bytes::from(proto.encode_to_vec()))
let proto = v1::BanUserResponse {
success: resp.success,
};
HandlerResult::ok(Bytes::from(proto.encode_to_vec()))
}
Err(DomainError::InvalidIdentityKey(len)) => HandlerResult::err(
RpcStatus::BadRequest,
&format!("identity_key must be 32 bytes, got {len}"),
),
Err(e) => {
warn!(error = %e, "ban_user failed");
HandlerResult::err(RpcStatus::Internal, "internal error")
}
}
}
/// Unban a user. Requires admin role.
@@ -107,6 +148,10 @@ pub async fn handle_unban_user(state: Arc<ServerState>, ctx: RequestContext) ->
Err(e) => return e,
};
if let Err(e) = require_admin(&state, &admin_key) {
return e;
}
let req = match v1::UnbanUserRequest::decode(ctx.payload) {
Ok(r) => r,
Err(e) => return HandlerResult::err(RpcStatus::BadRequest, &format!("decode: {e}")),
@@ -116,84 +161,115 @@ pub async fn handle_unban_user(state: Arc<ServerState>, ctx: RequestContext) ->
return HandlerResult::err(RpcStatus::BadRequest, "identity_key required");
}
let removed = state.banned_users.remove(&req.identity_key).is_some();
let svc = mod_service(&state);
match svc.unban_user(UnbanUserReq {
identity_key: req.identity_key.clone(),
}) {
Ok(resp) => {
// Remove from hot cache.
state.banned_users.remove(&req.identity_key);
info!(
target_key = hex::encode(&req.identity_key[..4.min(req.identity_key.len())]),
admin_key = hex::encode(&admin_key[..4.min(admin_key.len())]),
removed,
"user unbanned"
);
info!(
target_key = hex::encode(&req.identity_key[..4.min(req.identity_key.len())]),
admin_key = hex::encode(&admin_key[..4.min(admin_key.len())]),
removed = resp.success,
"user unbanned (persisted)"
);
let proto = v1::UnbanUserResponse { success: removed };
HandlerResult::ok(Bytes::from(proto.encode_to_vec()))
let proto = v1::UnbanUserResponse {
success: resp.success,
};
HandlerResult::ok(Bytes::from(proto.encode_to_vec()))
}
Err(e) => {
warn!(error = %e, "unban_user failed");
HandlerResult::err(RpcStatus::Internal, "internal error")
}
}
}
/// List moderation reports. Requires admin role.
pub async fn handle_list_reports(state: Arc<ServerState>, ctx: RequestContext) -> HandlerResult {
let _admin_key = match require_auth(&state, &ctx) {
let admin_key = match require_auth(&state, &ctx) {
Ok(ik) => ik,
Err(e) => return e,
};
if let Err(e) = require_admin(&state, &admin_key) {
return e;
}
let req = match v1::ListReportsRequest::decode(ctx.payload) {
Ok(r) => r,
Err(e) => return HandlerResult::err(RpcStatus::BadRequest, &format!("decode: {e}")),
};
let reports = match state.moderation_reports.lock() {
Ok(r) => r,
Err(e) => {
warn!("moderation_reports lock poisoned: {e}");
return HandlerResult::err(RpcStatus::Internal, "internal error");
let limit = if req.limit == 0 { 50 } else { req.limit };
let svc = mod_service(&state);
match svc.list_reports(ListReportsReq {
limit,
offset: req.offset,
}) {
Ok(resp) => {
let entries: Vec<v1::ReportEntry> = resp
.reports
.into_iter()
.map(|r| v1::ReportEntry {
id: r.id,
encrypted_report: r.encrypted_report,
conversation_id: r.conversation_id,
reporter_identity: r.reporter_identity,
timestamp: r.timestamp,
})
.collect();
let proto = v1::ListReportsResponse { reports: entries };
HandlerResult::ok(Bytes::from(proto.encode_to_vec()))
}
};
let offset = req.offset as usize;
let limit = if req.limit == 0 { 50 } else { req.limit as usize };
let entries: Vec<v1::ReportEntry> = reports
.iter()
.skip(offset)
.take(limit)
.map(|r| v1::ReportEntry {
id: r.id,
encrypted_report: r.encrypted_report.clone(),
conversation_id: r.conversation_id.clone(),
reporter_identity: r.reporter_identity.clone(),
timestamp: r.timestamp,
})
.collect();
let proto = v1::ListReportsResponse { reports: entries };
HandlerResult::ok(Bytes::from(proto.encode_to_vec()))
Err(e) => {
warn!(error = %e, "list_reports failed");
HandlerResult::err(RpcStatus::Internal, "internal error")
}
}
}
/// List banned users.
/// List banned users. Requires admin role.
pub async fn handle_list_banned(state: Arc<ServerState>, ctx: RequestContext) -> HandlerResult {
let _admin_key = match require_auth(&state, &ctx) {
let admin_key = match require_auth(&state, &ctx) {
Ok(ik) => ik,
Err(e) => return e,
};
if let Err(e) = require_admin(&state, &admin_key) {
return e;
}
let _req = match v1::ListBannedRequest::decode(ctx.payload) {
Ok(r) => r,
Err(e) => return HandlerResult::err(RpcStatus::BadRequest, &format!("decode: {e}")),
};
let now = crate::auth::current_timestamp();
let entries: Vec<v1::BannedUserEntry> = state
.banned_users
.iter()
.filter(|entry| entry.expires_at == 0 || entry.expires_at > now)
.map(|entry| v1::BannedUserEntry {
identity_key: entry.key().clone(),
reason: entry.reason.clone(),
banned_at: entry.banned_at,
expires_at: entry.expires_at,
})
.collect();
let svc = mod_service(&state);
match svc.list_banned() {
Ok(resp) => {
let entries: Vec<v1::BannedUserEntry> = resp
.users
.into_iter()
.map(|u| v1::BannedUserEntry {
identity_key: u.identity_key,
reason: u.reason,
banned_at: u.banned_at,
expires_at: u.expires_at,
})
.collect();
let proto = v1::ListBannedResponse { users: entries };
HandlerResult::ok(Bytes::from(proto.encode_to_vec()))
let proto = v1::ListBannedResponse { users: entries };
HandlerResult::ok(Bytes::from(proto.encode_to_vec()))
}
Err(e) => {
warn!(error = %e, "list_banned failed");
HandlerResult::err(RpcStatus::Internal, "internal error")
}
}
}