diff --git a/crates/quicproquo-server/src/domain/delivery.rs b/crates/quicproquo-server/src/domain/delivery.rs index cefdc86..6883d89 100644 --- a/crates/quicproquo-server/src/domain/delivery.rs +++ b/crates/quicproquo-server/src/domain/delivery.rs @@ -1,6 +1,18 @@ //! Delivery domain logic — enqueue, fetch, peek, ack. //! //! Pure business logic operating on `Store` trait and domain types. +//! +//! ## Multi-device delivery +//! +//! When a message is enqueued for a recipient identity, the service resolves +//! all registered device IDs for that identity and enqueues a copy of the +//! payload to each device-scoped queue. The queue key is a composite of +//! `identity_key + device_id`, so each device maintains its own sequence +//! counter and ack state. +//! +//! If the recipient has no registered devices, the message is delivered to +//! the bare `identity_key` queue (backwards compatible with single-device +//! clients). use std::sync::Arc; @@ -11,6 +23,18 @@ use crate::storage::Store; use super::types::*; +/// Build a device-scoped recipient key: `identity_key || device_id`. +/// When `device_id` is empty, returns a clone of `identity_key` (single-device compat). +fn device_recipient_key(identity_key: &[u8], device_id: &[u8]) -> Vec { + if device_id.is_empty() { + return identity_key.to_vec(); + } + let mut key = Vec::with_capacity(identity_key.len() + device_id.len()); + key.extend_from_slice(identity_key); + key.extend_from_slice(device_id); + key +} + /// Shared state needed by delivery operations. pub struct DeliveryService { pub store: Arc, @@ -18,7 +42,33 @@ pub struct DeliveryService { } impl DeliveryService { - /// Enqueue a payload for delivery. + /// Resolve the device-scoped recipient keys for an identity. + /// Returns a list of composite keys (identity_key + device_id) for each + /// registered device. If no devices are registered, returns a single-element + /// list with the bare identity_key for backwards compatibility. + fn resolve_device_keys(&self, identity_key: &[u8]) -> Vec> { + let devices = self.store.list_devices(identity_key).unwrap_or_default(); + if devices.is_empty() { + vec![identity_key.to_vec()] + } else { + devices + .into_iter() + .map(|(device_id, _, _)| device_recipient_key(identity_key, &device_id)) + .collect() + } + } + + /// Wake any long-polling waiter for the given recipient key. + fn wake_waiter(&self, recipient_key: &[u8]) { + if let Some(notify) = self.waiters.get(recipient_key) { + notify.notify_one(); + } + } + + /// Enqueue a payload for delivery to all devices of the recipient. + /// + /// Returns the sequence number from the *first* device queue (for backwards + /// compatibility with single-device callers). pub fn enqueue(&self, req: EnqueueReq) -> Result { let ttl = if req.ttl_secs > 0 { Some(req.ttl_secs) @@ -26,25 +76,35 @@ impl DeliveryService { None }; - let seq = self.store.enqueue( - &req.recipient_key, - &req.channel_id, - req.payload, - ttl, - )?; + let device_keys = self.resolve_device_keys(&req.recipient_key); + let mut first_seq = 0; - // Wake any long-polling waiter for this recipient. - if let Some(notify) = self.waiters.get(&req.recipient_key) { - notify.notify_one(); + for (i, dk) in device_keys.iter().enumerate() { + let seq = self.store.enqueue( + dk, + &req.channel_id, + req.payload.clone(), + ttl, + )?; + if i == 0 { + first_seq = seq; + } + self.wake_waiter(dk); } + // Also wake the bare identity_key waiter (legacy clients). + self.wake_waiter(&req.recipient_key); + Ok(EnqueueResp { - seq, + seq: first_seq, delivery_proof: Vec::new(), // TODO: sign in Phase 2 }) } - /// Fetch and drain queued messages. + /// Fetch and drain queued messages for a specific device. + /// + /// The `recipient_key` should be the device-scoped composite key + /// (`identity_key + device_id`) or bare `identity_key` for single-device. pub fn fetch(&self, req: FetchReq) -> Result { let messages = if req.limit > 0 { self.store @@ -84,7 +144,9 @@ impl DeliveryService { Ok(()) } - /// Batch enqueue to multiple recipients. + /// Batch enqueue to multiple recipients (with multi-device fan-out for each). + /// + /// Returns one sequence number per recipient identity (from the first device queue). pub fn batch_enqueue( &self, req: BatchEnqueueReq, @@ -97,14 +159,194 @@ impl DeliveryService { let mut seqs = Vec::with_capacity(req.recipient_keys.len()); for rk in &req.recipient_keys { - let seq = self.store.enqueue(rk, &req.channel_id, req.payload.clone(), ttl)?; - seqs.push(seq); + let device_keys = self.resolve_device_keys(rk); + let mut first_seq = 0; - if let Some(notify) = self.waiters.get(rk) { - notify.notify_one(); + for (i, dk) in device_keys.iter().enumerate() { + let seq = self.store.enqueue(dk, &req.channel_id, req.payload.clone(), ttl)?; + if i == 0 { + first_seq = seq; + } + self.wake_waiter(dk); } + + self.wake_waiter(rk); + seqs.push(first_seq); } Ok(BatchEnqueueResp { seqs }) } + + /// Build a device-scoped recipient key from identity_key and device_id. + /// Public helper for RPC handlers to build the correct fetch/ack key. + pub fn device_recipient_key(identity_key: &[u8], device_id: &[u8]) -> Vec { + device_recipient_key(identity_key, device_id) + } +} + +#[cfg(test)] +#[allow(clippy::unwrap_used)] +mod tests { + use super::*; + use crate::storage::FileBackedStore; + + fn test_service() -> (tempfile::TempDir, DeliveryService) { + let dir = tempfile::tempdir().unwrap(); + let store = Arc::new(FileBackedStore::open(dir.path()).unwrap()); + let svc = DeliveryService { + store, + waiters: Arc::new(DashMap::new()), + }; + (dir, svc) + } + + #[test] + fn enqueue_single_device_backwards_compat() { + let (_dir, svc) = test_service(); + let ik = vec![1u8; 32]; + let ch = vec![0u8; 16]; + + // No devices registered — should enqueue to bare identity_key. + let resp = svc + .enqueue(EnqueueReq { + recipient_key: ik.clone(), + payload: b"hello".to_vec(), + channel_id: ch.clone(), + ttl_secs: 0, + }) + .unwrap(); + assert_eq!(resp.seq, 0); + + // Fetch from bare identity_key. + let fetched = svc + .fetch(FetchReq { + recipient_key: ik, + channel_id: ch, + limit: 10, + }) + .unwrap(); + assert_eq!(fetched.payloads.len(), 1); + assert_eq!(fetched.payloads[0].data, b"hello"); + } + + #[test] + fn enqueue_multi_device_fanout() { + let (_dir, svc) = test_service(); + let ik = vec![2u8; 32]; + let ch = vec![0u8; 16]; + let dev_a = b"device-a".to_vec(); + let dev_b = b"device-b".to_vec(); + + // Register two devices. + svc.store + .register_device(&ik, &dev_a, "Phone") + .unwrap(); + svc.store + .register_device(&ik, &dev_b, "Laptop") + .unwrap(); + + // Enqueue a message. + svc.enqueue(EnqueueReq { + recipient_key: ik.clone(), + payload: b"fanout-msg".to_vec(), + channel_id: ch.clone(), + ttl_secs: 0, + }) + .unwrap(); + + // Each device should receive the message on its own queue. + let key_a = device_recipient_key(&ik, &dev_a); + let key_b = device_recipient_key(&ik, &dev_b); + + let msgs_a = svc + .fetch(FetchReq { + recipient_key: key_a, + channel_id: ch.clone(), + limit: 10, + }) + .unwrap(); + assert_eq!(msgs_a.payloads.len(), 1); + assert_eq!(msgs_a.payloads[0].data, b"fanout-msg"); + + let msgs_b = svc + .fetch(FetchReq { + recipient_key: key_b, + channel_id: ch.clone(), + limit: 10, + }) + .unwrap(); + assert_eq!(msgs_b.payloads.len(), 1); + assert_eq!(msgs_b.payloads[0].data, b"fanout-msg"); + + // Bare identity_key queue should be empty (not used when devices exist). + let msgs_bare = svc + .fetch(FetchReq { + recipient_key: ik, + channel_id: ch, + limit: 10, + }) + .unwrap(); + assert!(msgs_bare.payloads.is_empty()); + } + + #[test] + fn batch_enqueue_multi_device() { + let (_dir, svc) = test_service(); + let ik1 = vec![3u8; 32]; + let ik2 = vec![4u8; 32]; + let ch = vec![0u8; 16]; + let dev = b"dev1".to_vec(); + + // ik1 has a device, ik2 has none. + svc.store + .register_device(&ik1, &dev, "Phone") + .unwrap(); + + let resp = svc + .batch_enqueue(BatchEnqueueReq { + recipient_keys: vec![ik1.clone(), ik2.clone()], + payload: b"batch-msg".to_vec(), + channel_id: ch.clone(), + ttl_secs: 0, + }) + .unwrap(); + assert_eq!(resp.seqs.len(), 2); + + // ik1 device should have the message. + let key_1 = device_recipient_key(&ik1, &dev); + let msgs_1 = svc + .fetch(FetchReq { + recipient_key: key_1, + channel_id: ch.clone(), + limit: 10, + }) + .unwrap(); + assert_eq!(msgs_1.payloads.len(), 1); + + // ik2 (no devices) should have it on bare key. + let msgs_2 = svc + .fetch(FetchReq { + recipient_key: ik2, + channel_id: ch, + limit: 10, + }) + .unwrap(); + assert_eq!(msgs_2.payloads.len(), 1); + } + + #[test] + fn device_recipient_key_construction() { + let ik = vec![1u8; 32]; + let dev = b"my-device".to_vec(); + + // With device_id. + let key = device_recipient_key(&ik, &dev); + assert_eq!(key.len(), 32 + dev.len()); + assert_eq!(&key[..32], &ik[..]); + assert_eq!(&key[32..], dev.as_slice()); + + // Empty device_id returns bare identity_key. + let bare = device_recipient_key(&ik, &[]); + assert_eq!(bare, ik); + } } diff --git a/crates/quicproquo-server/src/storage.rs b/crates/quicproquo-server/src/storage.rs index 303baaa..1b5b071 100644 --- a/crates/quicproquo-server/src/storage.rs +++ b/crates/quicproquo-server/src/storage.rs @@ -429,6 +429,18 @@ pub struct FileBackedStore { } impl FileBackedStore { + /// Create an ephemeral in-memory store backed by a tempdir. + /// Intended for unit tests only — the tempdir is leaked (cleaned up by OS). + #[cfg(test)] + #[allow(dead_code)] + pub fn ephemeral() -> Self { + let dir = tempfile::tempdir().expect("failed to create tempdir"); + let store = Self::open(dir.path()).expect("failed to open ephemeral store"); + // Leak the tempdir so it isn't deleted while the store is alive. + std::mem::forget(dir); + store + } + pub fn open(dir: impl AsRef) -> Result { let dir = dir.as_ref(); if !dir.exists() {