//! v2 RPC handler dispatch — protobuf in, domain logic, protobuf out. use std::path::PathBuf; use std::sync::atomic::AtomicBool; use std::sync::Arc; use dashmap::DashMap; use opaque_ke::ServerSetup; use quicproquo_core::opaque_auth::OpaqueSuite; use quicproquo_proto::method_ids; use quicproquo_rpc::error::RpcStatus; use quicproquo_rpc::method::{HandlerResult, MethodRegistry, RequestContext}; use tokio::sync::Notify; use crate::audit::AuditLogger; use crate::auth::{AuthConfig, PendingLogin, RateEntry, SessionInfo}; use crate::hooks::ServerHooks; use crate::storage::Store; pub mod account; pub mod auth; pub mod blob; pub mod channel; pub mod delivery; pub mod device; pub mod federation; pub mod group; pub mod keys; pub mod moderation; pub mod p2p; pub mod recovery; pub mod user; /// Shared server state accessible by all v2 RPC handlers. pub struct ServerState { pub store: Arc, pub waiters: Arc, Arc>>, pub auth_cfg: Arc, pub opaque_setup: Arc>, pub pending_logins: Arc>, pub sessions: Arc, SessionInfo>>, pub rate_limits: Arc, RateEntry>>, pub sealed_sender: bool, pub hooks: Arc, pub signing_key: Arc, pub kt_log: Arc>, pub revocation_log: Arc>, pub data_dir: PathBuf, pub redact_logs: bool, /// Structured audit logger for security-relevant events. pub audit_logger: Arc, /// When true, the server is draining and will reject new work. /// Health endpoint returns "draining" status so load balancers stop routing. pub draining: Arc, /// Idempotency dedup: message_id -> (seq, timestamp). TTL-cleaned by cleanup task. pub seen_message_ids: Arc, (u64, u64)>>, /// Banned users: identity_key -> BanRecord. pub banned_users: Arc, BanRecord>>, /// Moderation reports (append-only). pub moderation_reports: Arc>>, /// Unique node identifier for multi-node health reporting. pub node_id: String, /// Process start time for uptime calculation. pub start_time: std::time::Instant, /// Storage backend name (e.g. "sql", "file"). pub storage_backend: String, } /// A ban record for a user. 
#[derive(Debug, Clone)] pub struct BanRecord { pub reason: String, pub banned_at: u64, /// 0 = permanent. pub expires_at: u64, } /// A stored moderation report. #[derive(Debug, Clone)] pub struct ModerationReport { pub id: u64, pub encrypted_report: Vec, pub conversation_id: Vec, pub reporter_identity: Vec, pub timestamp: u64, } /// Validate the session token from the request context and return the /// authenticated caller's identity key. Returns an Unauthorized HandlerResult /// on failure. pub fn require_auth(state: &ServerState, ctx: &RequestContext) -> Result, HandlerResult> { let token = ctx .session_token .as_deref() .or(ctx.identity_key.as_deref()) .unwrap_or(&[]); if token.is_empty() { return Err(HandlerResult::err( RpcStatus::Unauthorized, "missing session token", )); } // Check session store. if let Some(session) = state.sessions.get(token) { let now = crate::auth::current_timestamp(); if session.expires_at > now && !session.identity_key.is_empty() { // Check ban status. if let Some(ban) = state.banned_users.get(&session.identity_key) { if ban.expires_at == 0 || ban.expires_at > now { return Err(HandlerResult::err( RpcStatus::Forbidden, "account banned", )); } // Ban expired — remove it. drop(ban); state.banned_users.remove(&session.identity_key); } return Ok(session.identity_key.clone()); } } // Fall back to static bearer token (dev mode). if state.auth_cfg.allow_insecure_identity_from_request { if let Some(ik) = ctx.identity_key.as_deref() { if !ik.is_empty() { return Ok(ik.to_vec()); } } } Err(HandlerResult::err( RpcStatus::Unauthorized, "invalid or expired session token", )) } /// Map a domain error to an RPC HandlerResult error. 
///
/// Validation failures (bad keys, empty or oversized payloads, hash problems,
/// bad parameters) become `BadRequest`, missing entities become `NotFound`, a
/// device-limit violation becomes `Forbidden`, and I/O or storage failures
/// become `Internal`. The error's `Display` text is used as the message.
pub fn domain_err(e: crate::domain::types::DomainError) -> HandlerResult {
    use crate::domain::types::DomainError as E;

    // Pick the status first; the message is built the same way for every variant.
    // No catch-all arm, so adding a variant forces this mapping to be revisited.
    let status = match &e {
        E::InvalidIdentityKey(_)
        | E::EmptyPackage
        | E::EmptyHybridKey
        | E::EmptyUsername
        | E::BlobHashLength(_)
        | E::BadParams(_)
        | E::PackageTooLarge(_)
        | E::BlobTooLarge(_)
        | E::BlobHashMismatch => RpcStatus::BadRequest,
        E::BlobNotFound | E::DeviceNotFound | E::GroupNotFound => RpcStatus::NotFound,
        E::DeviceLimit(_) => RpcStatus::Forbidden,
        E::Io(_) | E::Storage(_) => RpcStatus::Internal,
    };

    HandlerResult::err(status, &e.to_string())
}

/// Build the v2 method registry with all handlers registered.
///
/// `default_rpc_timeout` sets the server-wide per-RPC timeout. Individual methods
/// (e.g. blob upload, health) may override this with shorter or longer values.
pub fn build_registry(default_rpc_timeout: std::time::Duration) -> MethodRegistry { let mut reg = MethodRegistry::new(); reg.set_default_timeout(default_rpc_timeout); // Auth (100-103) reg.register( method_ids::OPAQUE_REGISTER_START, "OpaqueRegisterStart", auth::handle_opaque_register_start, ); reg.register( method_ids::OPAQUE_REGISTER_FINISH, "OpaqueRegisterFinish", auth::handle_opaque_register_finish, ); reg.register( method_ids::OPAQUE_LOGIN_START, "OpaqueLoginStart", auth::handle_opaque_login_start, ); reg.register( method_ids::OPAQUE_LOGIN_FINISH, "OpaqueLoginFinish", auth::handle_opaque_login_finish, ); // Delivery (200-205) reg.register(method_ids::ENQUEUE, "Enqueue", delivery::handle_enqueue); reg.register(method_ids::FETCH, "Fetch", delivery::handle_fetch); reg.register( method_ids::FETCH_WAIT, "FetchWait", delivery::handle_fetch_wait, ); reg.register(method_ids::PEEK, "Peek", delivery::handle_peek); reg.register(method_ids::ACK, "Ack", delivery::handle_ack); reg.register( method_ids::BATCH_ENQUEUE, "BatchEnqueue", delivery::handle_batch_enqueue, ); // Keys (300-304) reg.register( method_ids::UPLOAD_KEY_PACKAGE, "UploadKeyPackage", keys::handle_upload_key_package, ); reg.register( method_ids::FETCH_KEY_PACKAGE, "FetchKeyPackage", keys::handle_fetch_key_package, ); reg.register( method_ids::UPLOAD_HYBRID_KEY, "UploadHybridKey", keys::handle_upload_hybrid_key, ); reg.register( method_ids::FETCH_HYBRID_KEY, "FetchHybridKey", keys::handle_fetch_hybrid_key, ); reg.register( method_ids::FETCH_HYBRID_KEYS, "FetchHybridKeys", keys::handle_fetch_hybrid_keys, ); // Channel (400) reg.register( method_ids::CREATE_CHANNEL, "CreateChannel", channel::handle_create_channel, ); // Group management (410-413) reg.register( method_ids::REMOVE_MEMBER, "RemoveMember", group::handle_remove_member, ); reg.register( method_ids::UPDATE_GROUP_METADATA, "UpdateGroupMetadata", group::handle_update_group_metadata, ); reg.register( method_ids::LIST_GROUP_MEMBERS, "ListGroupMembers", 
group::handle_list_group_members, ); reg.register( method_ids::ROTATE_KEYS, "RotateKeys", group::handle_rotate_keys, ); // User (500-501) reg.register( method_ids::RESOLVE_USER, "ResolveUser", user::handle_resolve_user, ); reg.register( method_ids::RESOLVE_IDENTITY, "ResolveIdentity", user::handle_resolve_identity, ); // Key Transparency (510-520) reg.register( method_ids::REVOKE_KEY, "RevokeKey", user::handle_revoke_key, ); reg.register( method_ids::CHECK_REVOCATION, "CheckRevocation", user::handle_check_revocation, ); reg.register( method_ids::AUDIT_KEY_TRANSPARENCY, "AuditKeyTransparency", user::handle_audit_key_transparency, ); // Blob (600-601) — longer timeout for file transfers. reg.register_with_timeout( method_ids::UPLOAD_BLOB, "UploadBlob", std::time::Duration::from_secs(120), blob::handle_upload_blob, ); reg.register_with_timeout( method_ids::DOWNLOAD_BLOB, "DownloadBlob", std::time::Duration::from_secs(120), blob::handle_download_blob, ); // Device (700-702) reg.register( method_ids::REGISTER_DEVICE, "RegisterDevice", device::handle_register_device, ); reg.register( method_ids::LIST_DEVICES, "ListDevices", device::handle_list_devices, ); reg.register( method_ids::REVOKE_DEVICE, "RevokeDevice", device::handle_revoke_device, ); // P2P (800-802) reg.register( method_ids::PUBLISH_ENDPOINT, "PublishEndpoint", p2p::handle_publish_endpoint, ); reg.register( method_ids::RESOLVE_ENDPOINT, "ResolveEndpoint", p2p::handle_resolve_endpoint, ); reg.register_with_timeout( method_ids::HEALTH, "Health", std::time::Duration::from_secs(5), p2p::handle_health, ); // Federation (900-905) reg.register( method_ids::RELAY_ENQUEUE, "RelayEnqueue", federation::handle_relay_enqueue, ); reg.register( method_ids::RELAY_BATCH_ENQUEUE, "RelayBatchEnqueue", federation::handle_relay_batch_enqueue, ); reg.register( method_ids::PROXY_FETCH_KEY_PACKAGE, "ProxyFetchKeyPackage", federation::handle_proxy_fetch_key_package, ); reg.register( method_ids::PROXY_FETCH_HYBRID_KEY, 
"ProxyFetchHybridKey", federation::handle_proxy_fetch_hybrid_key, ); reg.register( method_ids::PROXY_RESOLVE_USER, "ProxyResolveUser", federation::handle_proxy_resolve_user, ); reg.register( method_ids::FEDERATION_HEALTH, "FederationHealth", federation::handle_federation_health, ); // Moderation (420-424) reg.register( method_ids::REPORT_MESSAGE, "ReportMessage", moderation::handle_report_message, ); reg.register( method_ids::BAN_USER, "BanUser", moderation::handle_ban_user, ); reg.register( method_ids::UNBAN_USER, "UnbanUser", moderation::handle_unban_user, ); reg.register( method_ids::LIST_REPORTS, "ListReports", moderation::handle_list_reports, ); reg.register( method_ids::LIST_BANNED, "ListBanned", moderation::handle_list_banned, ); // Recovery (750-752) reg.register( method_ids::STORE_RECOVERY_BUNDLE, "StoreRecoveryBundle", recovery::handle_store_recovery_bundle, ); reg.register( method_ids::FETCH_RECOVERY_BUNDLE, "FetchRecoveryBundle", recovery::handle_fetch_recovery_bundle, ); reg.register( method_ids::DELETE_RECOVERY_BUNDLE, "DeleteRecoveryBundle", recovery::handle_delete_recovery_bundle, ); // Account (950) reg.register( method_ids::DELETE_ACCOUNT, "DeleteAccount", account::handle_delete_account, ); reg }