//! Delivery domain logic — enqueue, fetch, peek, ack. //! //! Pure business logic operating on `Store` trait and domain types. //! //! ## Multi-device delivery //! //! When a message is enqueued for a recipient identity, the service resolves //! all registered device IDs for that identity and enqueues a copy of the //! payload to each device-scoped queue. The queue key is a composite of //! `identity_key + device_id`, so each device maintains its own sequence //! counter and ack state. //! //! If the recipient has no registered devices, the message is delivered to //! the bare `identity_key` queue (backwards compatible with single-device //! clients). use std::sync::Arc; use dashmap::DashMap; use tokio::sync::Notify; use crate::storage::Store; use super::types::*; /// Build a device-scoped recipient key: `identity_key || device_id`. /// When `device_id` is empty, returns a clone of `identity_key` (single-device compat). fn device_recipient_key(identity_key: &[u8], device_id: &[u8]) -> Vec { if device_id.is_empty() { return identity_key.to_vec(); } let mut key = Vec::with_capacity(identity_key.len() + device_id.len()); key.extend_from_slice(identity_key); key.extend_from_slice(device_id); key } /// Shared state needed by delivery operations. pub struct DeliveryService { pub store: Arc, pub waiters: Arc, Arc>>, } impl DeliveryService { /// Resolve the device-scoped recipient keys for an identity. /// Returns a list of composite keys (identity_key + device_id) for each /// registered device. If no devices are registered, returns a single-element /// list with the bare identity_key for backwards compatibility. fn resolve_device_keys(&self, identity_key: &[u8]) -> Vec> { let devices = self.store.list_devices(identity_key).unwrap_or_default(); if devices.is_empty() { vec![identity_key.to_vec()] } else { devices .into_iter() .map(|(device_id, _, _)| device_recipient_key(identity_key, &device_id)) .collect() } } /// Wake any long-polling waiter for the given recipient key. fn wake_waiter(&self, recipient_key: &[u8]) { if let Some(notify) = self.waiters.get(recipient_key) { notify.notify_one(); } } /// Enqueue a payload for delivery to all devices of the recipient. /// /// Returns the sequence number from the *first* device queue (for backwards /// compatibility with single-device callers). pub fn enqueue(&self, req: EnqueueReq) -> Result { let ttl = if req.ttl_secs > 0 { Some(req.ttl_secs) } else { None }; let device_keys = self.resolve_device_keys(&req.recipient_key); let mut first_seq = 0; for (i, dk) in device_keys.iter().enumerate() { let start = std::time::Instant::now(); let seq = self.store.enqueue( dk, &req.channel_id, req.payload.clone(), ttl, )?; crate::metrics::record_storage_latency("enqueue", start.elapsed()); if i == 0 { first_seq = seq; } self.wake_waiter(dk); } // Also wake the bare identity_key waiter (legacy clients). self.wake_waiter(&req.recipient_key); Ok(EnqueueResp { seq: first_seq, delivery_proof: Vec::new(), // Proof generated at RPC handler layer (see v2_handlers/delivery.rs) }) } /// Fetch and drain queued messages for a specific device. /// /// The `recipient_key` should be the device-scoped composite key /// (`identity_key + device_id`) or bare `identity_key` for single-device. pub fn fetch(&self, req: FetchReq) -> Result { let start = std::time::Instant::now(); let messages = if req.limit > 0 { self.store .fetch_limited(&req.recipient_key, &req.channel_id, req.limit as usize)? } else { self.store.fetch(&req.recipient_key, &req.channel_id)? }; crate::metrics::record_storage_latency("fetch", start.elapsed()); Ok(FetchResp { payloads: messages .into_iter() .map(|(seq, data)| Envelope { seq, data }) .collect(), }) } /// Peek at messages without removing them. pub fn peek(&self, req: PeekReq) -> Result { let messages = self.store.peek( &req.recipient_key, &req.channel_id, if req.limit > 0 { req.limit as usize } else { 0 }, )?; Ok(PeekResp { payloads: messages .into_iter() .map(|(seq, data)| Envelope { seq, data }) .collect(), }) } /// Acknowledge messages up to a sequence number. pub fn ack(&self, req: AckReq) -> Result<(), crate::storage::StorageError> { self.store .ack(&req.recipient_key, &req.channel_id, req.seq_up_to)?; Ok(()) } /// Batch enqueue to multiple recipients (with multi-device fan-out for each). /// /// Returns one sequence number per recipient identity (from the first device queue). pub fn batch_enqueue( &self, req: BatchEnqueueReq, ) -> Result { let ttl = if req.ttl_secs > 0 { Some(req.ttl_secs) } else { None }; let mut seqs = Vec::with_capacity(req.recipient_keys.len()); for rk in &req.recipient_keys { let device_keys = self.resolve_device_keys(rk); let mut first_seq = 0; for (i, dk) in device_keys.iter().enumerate() { let seq = self.store.enqueue(dk, &req.channel_id, req.payload.clone(), ttl)?; if i == 0 { first_seq = seq; } self.wake_waiter(dk); } self.wake_waiter(rk); seqs.push(first_seq); } Ok(BatchEnqueueResp { seqs }) } /// Build a device-scoped recipient key from identity_key and device_id. /// Public helper for RPC handlers to build the correct fetch/ack key. pub fn device_recipient_key(identity_key: &[u8], device_id: &[u8]) -> Vec { device_recipient_key(identity_key, device_id) } } #[cfg(test)] #[allow(clippy::unwrap_used)] mod tests { use super::*; use crate::storage::FileBackedStore; fn test_service() -> (tempfile::TempDir, DeliveryService) { let dir = tempfile::tempdir().unwrap(); let store = Arc::new(FileBackedStore::open(dir.path()).unwrap()); let svc = DeliveryService { store, waiters: Arc::new(DashMap::new()), }; (dir, svc) } #[test] fn enqueue_single_device_backwards_compat() { let (_dir, svc) = test_service(); let ik = vec![1u8; 32]; let ch = vec![0u8; 16]; // No devices registered — should enqueue to bare identity_key. let resp = svc .enqueue(EnqueueReq { recipient_key: ik.clone(), payload: b"hello".to_vec(), channel_id: ch.clone(), ttl_secs: 0, }) .unwrap(); assert_eq!(resp.seq, 0); // Fetch from bare identity_key. let fetched = svc .fetch(FetchReq { recipient_key: ik, channel_id: ch, limit: 10, }) .unwrap(); assert_eq!(fetched.payloads.len(), 1); assert_eq!(fetched.payloads[0].data, b"hello"); } #[test] fn enqueue_multi_device_fanout() { let (_dir, svc) = test_service(); let ik = vec![2u8; 32]; let ch = vec![0u8; 16]; let dev_a = b"device-a".to_vec(); let dev_b = b"device-b".to_vec(); // Register two devices. svc.store .register_device(&ik, &dev_a, "Phone") .unwrap(); svc.store .register_device(&ik, &dev_b, "Laptop") .unwrap(); // Enqueue a message. svc.enqueue(EnqueueReq { recipient_key: ik.clone(), payload: b"fanout-msg".to_vec(), channel_id: ch.clone(), ttl_secs: 0, }) .unwrap(); // Each device should receive the message on its own queue. let key_a = device_recipient_key(&ik, &dev_a); let key_b = device_recipient_key(&ik, &dev_b); let msgs_a = svc .fetch(FetchReq { recipient_key: key_a, channel_id: ch.clone(), limit: 10, }) .unwrap(); assert_eq!(msgs_a.payloads.len(), 1); assert_eq!(msgs_a.payloads[0].data, b"fanout-msg"); let msgs_b = svc .fetch(FetchReq { recipient_key: key_b, channel_id: ch.clone(), limit: 10, }) .unwrap(); assert_eq!(msgs_b.payloads.len(), 1); assert_eq!(msgs_b.payloads[0].data, b"fanout-msg"); // Bare identity_key queue should be empty (not used when devices exist). let msgs_bare = svc .fetch(FetchReq { recipient_key: ik, channel_id: ch, limit: 10, }) .unwrap(); assert!(msgs_bare.payloads.is_empty()); } #[test] fn batch_enqueue_multi_device() { let (_dir, svc) = test_service(); let ik1 = vec![3u8; 32]; let ik2 = vec![4u8; 32]; let ch = vec![0u8; 16]; let dev = b"dev1".to_vec(); // ik1 has a device, ik2 has none. svc.store .register_device(&ik1, &dev, "Phone") .unwrap(); let resp = svc .batch_enqueue(BatchEnqueueReq { recipient_keys: vec![ik1.clone(), ik2.clone()], payload: b"batch-msg".to_vec(), channel_id: ch.clone(), ttl_secs: 0, }) .unwrap(); assert_eq!(resp.seqs.len(), 2); // ik1 device should have the message. let key_1 = device_recipient_key(&ik1, &dev); let msgs_1 = svc .fetch(FetchReq { recipient_key: key_1, channel_id: ch.clone(), limit: 10, }) .unwrap(); assert_eq!(msgs_1.payloads.len(), 1); // ik2 (no devices) should have it on bare key. let msgs_2 = svc .fetch(FetchReq { recipient_key: ik2, channel_id: ch, limit: 10, }) .unwrap(); assert_eq!(msgs_2.payloads.len(), 1); } #[test] fn device_recipient_key_construction() { let ik = vec![1u8; 32]; let dev = b"my-device".to_vec(); // With device_id. let key = device_recipient_key(&ik, &dev); assert_eq!(key.len(), 32 + dev.len()); assert_eq!(&key[..32], &ik[..]); assert_eq!(&key[32..], dev.as_slice()); // Empty device_id returns bare identity_key. let bare = device_recipient_key(&ik, &[]); assert_eq!(bare, ik); } }