//! Delivery handlers — enqueue, fetch, fetch_wait, peek, ack, batch_enqueue. use std::sync::Arc; use bytes::Bytes; use prost::Message; use quicprochat_proto::qpc::v1; use quicprochat_rpc::error::RpcStatus; use quicprochat_rpc::method::{HandlerResult, RequestContext}; use sha2::{Digest, Sha256}; use tokio::sync::Notify; use crate::domain::delivery::DeliveryService; use crate::domain::types::{AckReq, BatchEnqueueReq, EnqueueReq, FetchReq, PeekReq}; use crate::hooks::{HookAction, MessageEvent}; use super::{require_auth, ServerState}; /// Build a 96-byte delivery proof: `SHA-256(seq || recipient_key || timestamp_ms) || Ed25519(hash)`. /// /// The sender stores this as cryptographic evidence that the server enqueued the message. fn build_delivery_proof( signing_key: &quicprochat_core::IdentityKeypair, seq: u64, recipient_key: &[u8], timestamp_ms: u64, ) -> Vec { let mut hasher = Sha256::new(); hasher.update(seq.to_le_bytes()); hasher.update(recipient_key); hasher.update(timestamp_ms.to_le_bytes()); let hash: [u8; 32] = hasher.finalize().into(); let sig = signing_key.sign_raw(&hash); let mut proof = vec![0u8; 96]; proof[..32].copy_from_slice(&hash); proof[32..].copy_from_slice(&sig); proof } pub async fn handle_enqueue(state: Arc, ctx: RequestContext) -> HandlerResult { let identity_key = match require_auth(&state, &ctx) { Ok(ik) => ik, Err(e) => return e, }; let req = match v1::EnqueueRequest::decode(ctx.payload) { Ok(r) => r, Err(e) => return HandlerResult::err(RpcStatus::BadRequest, &format!("decode: {e}")), }; if req.recipient_key.is_empty() || req.payload.is_empty() { return HandlerResult::err(RpcStatus::BadRequest, "recipient_key and payload required"); } // Rate limiting. if let Err(_e) = crate::auth::check_rate_limit(&state.rate_limits, &identity_key) { return HandlerResult::err(RpcStatus::RateLimited, "rate limit exceeded"); } // Idempotency dedup: if message_id is provided and already seen, return the cached seq. if !req.message_id.is_empty() { if let Some(entry) = state.seen_message_ids.get(&req.message_id) { let (cached_seq, _ts) = *entry; let proto = v1::EnqueueResponse { seq: cached_seq, delivery_proof: Vec::new(), duplicate: true, }; return HandlerResult::ok(Bytes::from(proto.encode_to_vec())); } } let svc = DeliveryService { store: Arc::clone(&state.store), waiters: Arc::clone(&state.waiters), }; let domain_req = EnqueueReq { recipient_key: req.recipient_key.clone(), payload: req.payload.clone(), channel_id: req.channel_id.clone(), ttl_secs: req.ttl_secs, }; match svc.enqueue(domain_req) { Ok(resp) => { // Record message_id for dedup. if !req.message_id.is_empty() { let now = crate::auth::current_timestamp(); state.seen_message_ids.insert(req.message_id, (resp.seq, now)); } // Fire hook. let action = state.hooks.on_message_enqueue(&MessageEvent { sender_identity: Some(identity_key), recipient_key: req.recipient_key.clone(), channel_id: req.channel_id, payload_len: req.payload.len(), seq: resp.seq, }); if let HookAction::Reject(reason) = action { return HandlerResult::err(RpcStatus::Forbidden, &reason); } // Build server-signed delivery proof. let timestamp_ms = crate::auth::current_timestamp(); let delivery_proof = build_delivery_proof( &state.signing_key, resp.seq, &req.recipient_key, timestamp_ms, ); let proto = v1::EnqueueResponse { seq: resp.seq, delivery_proof, duplicate: false, }; HandlerResult::ok(Bytes::from(proto.encode_to_vec())) } Err(e) => HandlerResult::err(RpcStatus::Internal, &format!("enqueue: {e}")), } } pub async fn handle_fetch(state: Arc, ctx: RequestContext) -> HandlerResult { let identity_key = match require_auth(&state, &ctx) { Ok(ik) => ik, Err(e) => return e, }; let req = match v1::FetchRequest::decode(ctx.payload) { Ok(r) => r, Err(e) => return HandlerResult::err(RpcStatus::BadRequest, &format!("decode: {e}")), }; let svc = DeliveryService { store: Arc::clone(&state.store), waiters: Arc::clone(&state.waiters), }; let base_key = if req.recipient_key.is_empty() { identity_key } else { req.recipient_key }; let recipient_key = if req.device_id.is_empty() { base_key } else { DeliveryService::device_recipient_key(&base_key, &req.device_id) }; let domain_req = FetchReq { recipient_key, channel_id: req.channel_id, limit: req.limit, }; match svc.fetch(domain_req) { Ok(resp) => { let proto = v1::FetchResponse { payloads: resp .payloads .into_iter() .map(|e| v1::Envelope { seq: e.seq, data: e.data, }) .collect(), }; HandlerResult::ok(Bytes::from(proto.encode_to_vec())) } Err(e) => HandlerResult::err(RpcStatus::Internal, &format!("fetch: {e}")), } } pub async fn handle_fetch_wait(state: Arc, ctx: RequestContext) -> HandlerResult { let identity_key = match require_auth(&state, &ctx) { Ok(ik) => ik, Err(e) => return e, }; let req = match v1::FetchWaitRequest::decode(ctx.payload) { Ok(r) => r, Err(e) => return HandlerResult::err(RpcStatus::BadRequest, &format!("decode: {e}")), }; let base_key = if req.recipient_key.is_empty() { identity_key } else { req.recipient_key }; let recipient_key = if req.device_id.is_empty() { base_key } else { DeliveryService::device_recipient_key(&base_key, &req.device_id) }; let timeout_ms = if req.timeout_ms == 0 { 30_000 } else { req.timeout_ms.min(60_000) }; let svc = DeliveryService { store: Arc::clone(&state.store), waiters: Arc::clone(&state.waiters), }; // Try immediate fetch first. let fetch_req = FetchReq { recipient_key: recipient_key.clone(), channel_id: req.channel_id.clone(), limit: req.limit, }; match svc.fetch(fetch_req) { Ok(resp) if !resp.payloads.is_empty() => { let proto = v1::FetchWaitResponse { payloads: resp .payloads .into_iter() .map(|e| v1::Envelope { seq: e.seq, data: e.data, }) .collect(), }; return HandlerResult::ok(Bytes::from(proto.encode_to_vec())); } Err(e) => { return HandlerResult::err(RpcStatus::Internal, &format!("fetch: {e}")); } _ => {} } // Long-poll: wait for notification or timeout. let notify = state .waiters .entry(recipient_key.clone()) .or_insert_with(|| Arc::new(Notify::new())) .clone(); let timeout = tokio::time::Duration::from_millis(timeout_ms); let _ = tokio::time::timeout(timeout, notify.notified()).await; // Re-fetch after wake or timeout. let fetch_req = FetchReq { recipient_key, channel_id: req.channel_id, limit: req.limit, }; match svc.fetch(fetch_req) { Ok(resp) => { let proto = v1::FetchWaitResponse { payloads: resp .payloads .into_iter() .map(|e| v1::Envelope { seq: e.seq, data: e.data, }) .collect(), }; HandlerResult::ok(Bytes::from(proto.encode_to_vec())) } Err(e) => HandlerResult::err(RpcStatus::Internal, &format!("fetch: {e}")), } } pub async fn handle_peek(state: Arc, ctx: RequestContext) -> HandlerResult { let identity_key = match require_auth(&state, &ctx) { Ok(ik) => ik, Err(e) => return e, }; let req = match v1::PeekRequest::decode(ctx.payload) { Ok(r) => r, Err(e) => return HandlerResult::err(RpcStatus::BadRequest, &format!("decode: {e}")), }; let svc = DeliveryService { store: Arc::clone(&state.store), waiters: Arc::clone(&state.waiters), }; let base_key = if req.recipient_key.is_empty() { identity_key } else { req.recipient_key }; let recipient_key = if req.device_id.is_empty() { base_key } else { DeliveryService::device_recipient_key(&base_key, &req.device_id) }; let domain_req = PeekReq { recipient_key, channel_id: req.channel_id, limit: req.limit, }; match svc.peek(domain_req) { Ok(resp) => { let proto = v1::PeekResponse { payloads: resp .payloads .into_iter() .map(|e| v1::Envelope { seq: e.seq, data: e.data, }) .collect(), }; HandlerResult::ok(Bytes::from(proto.encode_to_vec())) } Err(e) => HandlerResult::err(RpcStatus::Internal, &format!("peek: {e}")), } } pub async fn handle_ack(state: Arc, ctx: RequestContext) -> HandlerResult { let identity_key = match require_auth(&state, &ctx) { Ok(ik) => ik, Err(e) => return e, }; let req = match v1::AckRequest::decode(ctx.payload) { Ok(r) => r, Err(e) => return HandlerResult::err(RpcStatus::BadRequest, &format!("decode: {e}")), }; let svc = DeliveryService { store: Arc::clone(&state.store), waiters: Arc::clone(&state.waiters), }; let base_key = if req.recipient_key.is_empty() { identity_key } else { req.recipient_key }; let recipient_key = if req.device_id.is_empty() { base_key } else { DeliveryService::device_recipient_key(&base_key, &req.device_id) }; let domain_req = AckReq { recipient_key, channel_id: req.channel_id, seq_up_to: req.seq_up_to, }; match svc.ack(domain_req) { Ok(()) => { let proto = v1::AckResponse {}; HandlerResult::ok(Bytes::from(proto.encode_to_vec())) } Err(e) => HandlerResult::err(RpcStatus::Internal, &format!("ack: {e}")), } } pub async fn handle_batch_enqueue(state: Arc, ctx: RequestContext) -> HandlerResult { let identity_key = match require_auth(&state, &ctx) { Ok(ik) => ik, Err(e) => return e, }; let req = match v1::BatchEnqueueRequest::decode(ctx.payload) { Ok(r) => r, Err(e) => return HandlerResult::err(RpcStatus::BadRequest, &format!("decode: {e}")), }; if req.recipient_keys.is_empty() || req.payload.is_empty() { return HandlerResult::err( RpcStatus::BadRequest, "recipient_keys and payload required", ); } // Rate limiting. if let Err(_e) = crate::auth::check_rate_limit(&state.rate_limits, &identity_key) { return HandlerResult::err(RpcStatus::RateLimited, "rate limit exceeded"); } let svc = DeliveryService { store: Arc::clone(&state.store), waiters: Arc::clone(&state.waiters), }; let domain_req = BatchEnqueueReq { recipient_keys: req.recipient_keys, payload: req.payload, channel_id: req.channel_id, ttl_secs: req.ttl_secs, }; match svc.batch_enqueue(domain_req) { Ok(resp) => { let proto = v1::BatchEnqueueResponse { seqs: resp.seqs }; HandlerResult::ok(Bytes::from(proto.encode_to_vec())) } Err(e) => HandlerResult::err(RpcStatus::Internal, &format!("batch_enqueue: {e}")), } }