diff --git a/crates/quicproquo-server/src/v2_handlers/delivery.rs b/crates/quicproquo-server/src/v2_handlers/delivery.rs index c9323d6..fc266d6 100644 --- a/crates/quicproquo-server/src/v2_handlers/delivery.rs +++ b/crates/quicproquo-server/src/v2_handlers/delivery.rs @@ -35,6 +35,19 @@ pub async fn handle_enqueue(state: Arc, ctx: RequestContext) -> Han return HandlerResult::err(RpcStatus::RateLimited, "rate limit exceeded"); } + // Idempotency dedup: if message_id is provided and already seen, return the cached seq. + if !req.message_id.is_empty() { + if let Some(entry) = state.seen_message_ids.get(&req.message_id) { + let (cached_seq, _ts) = *entry; + let proto = v1::EnqueueResponse { + seq: cached_seq, + delivery_proof: Vec::new(), + duplicate: true, + }; + return HandlerResult::ok(Bytes::from(proto.encode_to_vec())); + } + } + let svc = DeliveryService { store: Arc::clone(&state.store), waiters: Arc::clone(&state.waiters), @@ -49,6 +62,12 @@ pub async fn handle_enqueue(state: Arc, ctx: RequestContext) -> Han match svc.enqueue(domain_req) { Ok(resp) => { + // Record message_id for dedup. + if !req.message_id.is_empty() { + let now = crate::auth::current_timestamp(); + state.seen_message_ids.insert(req.message_id, (resp.seq, now)); + } + // Fire hook. let action = state.hooks.on_message_enqueue(&MessageEvent { sender_identity: Some(identity_key), @@ -64,6 +83,7 @@ pub async fn handle_enqueue(state: Arc, ctx: RequestContext) -> Han let proto = v1::EnqueueResponse { seq: resp.seq, delivery_proof: resp.delivery_proof, + duplicate: false, }; HandlerResult::ok(Bytes::from(proto.encode_to_vec())) } @@ -87,12 +107,19 @@ pub async fn handle_fetch(state: Arc, ctx: RequestContext) -> Handl waiters: Arc::clone(&state.waiters), }; + let base_key = if req.recipient_key.is_empty() { + identity_key + } else { + req.recipient_key + }; + let recipient_key = if req.device_id.is_empty() { + base_key + } else { + DeliveryService::device_recipient_key(&base_key, &req.device_id) + }; + let domain_req = FetchReq { - recipient_key: if req.recipient_key.is_empty() { - identity_key - } else { - req.recipient_key - }, + recipient_key, channel_id: req.channel_id, limit: req.limit, }; @@ -126,11 +153,16 @@ pub async fn handle_fetch_wait(state: Arc, ctx: RequestContext) -> Err(e) => return HandlerResult::err(RpcStatus::BadRequest, &format!("decode: {e}")), }; - let recipient_key = if req.recipient_key.is_empty() { + let base_key = if req.recipient_key.is_empty() { identity_key } else { req.recipient_key }; + let recipient_key = if req.device_id.is_empty() { + base_key + } else { + DeliveryService::device_recipient_key(&base_key, &req.device_id) + }; let timeout_ms = if req.timeout_ms == 0 { 30_000 @@ -221,12 +253,19 @@ pub async fn handle_peek(state: Arc, ctx: RequestContext) -> Handle waiters: Arc::clone(&state.waiters), }; + let base_key = if req.recipient_key.is_empty() { + identity_key + } else { + req.recipient_key + }; + let recipient_key = if req.device_id.is_empty() { + base_key + } else { + DeliveryService::device_recipient_key(&base_key, &req.device_id) + }; + let domain_req = PeekReq { - recipient_key: if req.recipient_key.is_empty() { - identity_key - } else { - req.recipient_key - }, + recipient_key, channel_id: req.channel_id, limit: req.limit, }; @@ -265,12 +304,19 @@ pub async fn handle_ack(state: Arc, ctx: RequestContext) -> Handler waiters: Arc::clone(&state.waiters), }; + let base_key = if req.recipient_key.is_empty() { + identity_key + } else { + req.recipient_key + }; + let recipient_key = if req.device_id.is_empty() { + base_key + } else { + DeliveryService::device_recipient_key(&base_key, &req.device_id) + }; + let domain_req = AckReq { - recipient_key: if req.recipient_key.is_empty() { - identity_key - } else { - req.recipient_key - }, + recipient_key, channel_id: req.channel_id, seq_up_to: req.seq_up_to, }; diff --git a/proto/qpq/v1/delivery.proto b/proto/qpq/v1/delivery.proto index 66d4b88..630564e 100644 --- a/proto/qpq/v1/delivery.proto +++ b/proto/qpq/v1/delivery.proto @@ -14,17 +14,25 @@ message EnqueueRequest { bytes payload = 2; bytes channel_id = 3; uint32 ttl_secs = 4; + // Client-generated idempotency key (16 bytes, UUID v7). + // Server deduplicates enqueue requests with the same message_id within a TTL window. + bytes message_id = 5; } message EnqueueResponse { uint64 seq = 1; bytes delivery_proof = 2; + // True if this was a duplicate enqueue (message_id already seen). + bool duplicate = 3; } message FetchRequest { bytes recipient_key = 1; bytes channel_id = 2; uint32 limit = 3; + // Device ID for multi-device scoping. When set, the server builds + // a composite queue key (identity_key + device_id). Empty = bare key. + bytes device_id = 4; } message FetchResponse { @@ -36,6 +44,7 @@ message FetchWaitRequest { bytes channel_id = 2; uint64 timeout_ms = 3; uint32 limit = 4; + bytes device_id = 5; } message FetchWaitResponse { @@ -46,6 +55,7 @@ message PeekRequest { bytes recipient_key = 1; bytes channel_id = 2; uint32 limit = 3; + bytes device_id = 4; } message PeekResponse { @@ -56,6 +66,7 @@ message AckRequest { bytes recipient_key = 1; bytes channel_id = 2; uint64 seq_up_to = 3; + bytes device_id = 4; } message AckResponse {} @@ -65,6 +76,8 @@ message BatchEnqueueRequest { bytes payload = 2; bytes channel_id = 3; uint32 ttl_secs = 4; + // Client-generated idempotency key (16 bytes, UUID v7). + bytes message_id = 5; } message BatchEnqueueResponse {