feat(server): add multi-device delivery fan-out

Enqueue now resolves all registered devices for a recipient identity
and fans out the message to each device-scoped queue. Single-device
clients remain backwards compatible (bare identity_key queue).

Also adds FileBackedStore::ephemeral() test helper.
This commit is contained in:
2026-03-04 20:15:26 +01:00
parent 12b19b6931
commit eaca24397b
2 changed files with 271 additions and 17 deletions

View File

@@ -1,6 +1,18 @@
//! Delivery domain logic — enqueue, fetch, peek, ack. //! Delivery domain logic — enqueue, fetch, peek, ack.
//! //!
//! Pure business logic operating on `Store` trait and domain types. //! Pure business logic operating on `Store` trait and domain types.
//!
//! ## Multi-device delivery
//!
//! When a message is enqueued for a recipient identity, the service resolves
//! all registered device IDs for that identity and enqueues a copy of the
//! payload to each device-scoped queue. The queue key is a composite of
//! `identity_key + device_id`, so each device maintains its own sequence
//! counter and ack state.
//!
//! If the recipient has no registered devices, the message is delivered to
//! the bare `identity_key` queue (backwards compatible with single-device
//! clients).
use std::sync::Arc; use std::sync::Arc;
@@ -11,6 +23,18 @@ use crate::storage::Store;
use super::types::*; use super::types::*;
/// Build a device-scoped recipient key: `identity_key || device_id`.
/// When `device_id` is empty, returns a clone of `identity_key` (single-device compat).
fn device_recipient_key(identity_key: &[u8], device_id: &[u8]) -> Vec<u8> {
if device_id.is_empty() {
return identity_key.to_vec();
}
let mut key = Vec::with_capacity(identity_key.len() + device_id.len());
key.extend_from_slice(identity_key);
key.extend_from_slice(device_id);
key
}
/// Shared state needed by delivery operations. /// Shared state needed by delivery operations.
pub struct DeliveryService { pub struct DeliveryService {
pub store: Arc<dyn Store>, pub store: Arc<dyn Store>,
@@ -18,7 +42,33 @@ pub struct DeliveryService {
} }
impl DeliveryService { impl DeliveryService {
/// Enqueue a payload for delivery. /// Resolve the device-scoped recipient keys for an identity.
/// Returns a list of composite keys (identity_key + device_id) for each
/// registered device. If no devices are registered, returns a single-element
/// list with the bare identity_key for backwards compatibility.
fn resolve_device_keys(&self, identity_key: &[u8]) -> Vec<Vec<u8>> {
let devices = self.store.list_devices(identity_key).unwrap_or_default();
if devices.is_empty() {
vec![identity_key.to_vec()]
} else {
devices
.into_iter()
.map(|(device_id, _, _)| device_recipient_key(identity_key, &device_id))
.collect()
}
}
/// Wake any long-polling waiter for the given recipient key.
fn wake_waiter(&self, recipient_key: &[u8]) {
if let Some(notify) = self.waiters.get(recipient_key) {
notify.notify_one();
}
}
/// Enqueue a payload for delivery to all devices of the recipient.
///
/// Returns the sequence number from the *first* device queue (for backwards
/// compatibility with single-device callers).
pub fn enqueue(&self, req: EnqueueReq) -> Result<EnqueueResp, crate::storage::StorageError> { pub fn enqueue(&self, req: EnqueueReq) -> Result<EnqueueResp, crate::storage::StorageError> {
let ttl = if req.ttl_secs > 0 { let ttl = if req.ttl_secs > 0 {
Some(req.ttl_secs) Some(req.ttl_secs)
@@ -26,25 +76,35 @@ impl DeliveryService {
None None
}; };
let seq = self.store.enqueue( let device_keys = self.resolve_device_keys(&req.recipient_key);
&req.recipient_key, let mut first_seq = 0;
&req.channel_id,
req.payload,
ttl,
)?;
// Wake any long-polling waiter for this recipient. for (i, dk) in device_keys.iter().enumerate() {
if let Some(notify) = self.waiters.get(&req.recipient_key) { let seq = self.store.enqueue(
notify.notify_one(); dk,
&req.channel_id,
req.payload.clone(),
ttl,
)?;
if i == 0 {
first_seq = seq;
}
self.wake_waiter(dk);
} }
// Also wake the bare identity_key waiter (legacy clients).
self.wake_waiter(&req.recipient_key);
Ok(EnqueueResp { Ok(EnqueueResp {
seq, seq: first_seq,
delivery_proof: Vec::new(), // TODO: sign in Phase 2 delivery_proof: Vec::new(), // TODO: sign in Phase 2
}) })
} }
/// Fetch and drain queued messages. /// Fetch and drain queued messages for a specific device.
///
/// The `recipient_key` should be the device-scoped composite key
/// (`identity_key + device_id`) or bare `identity_key` for single-device.
pub fn fetch(&self, req: FetchReq) -> Result<FetchResp, crate::storage::StorageError> { pub fn fetch(&self, req: FetchReq) -> Result<FetchResp, crate::storage::StorageError> {
let messages = if req.limit > 0 { let messages = if req.limit > 0 {
self.store self.store
@@ -84,7 +144,9 @@ impl DeliveryService {
Ok(()) Ok(())
} }
/// Batch enqueue to multiple recipients. /// Batch enqueue to multiple recipients (with multi-device fan-out for each).
///
/// Returns one sequence number per recipient identity (from the first device queue).
pub fn batch_enqueue( pub fn batch_enqueue(
&self, &self,
req: BatchEnqueueReq, req: BatchEnqueueReq,
@@ -97,14 +159,194 @@ impl DeliveryService {
let mut seqs = Vec::with_capacity(req.recipient_keys.len()); let mut seqs = Vec::with_capacity(req.recipient_keys.len());
for rk in &req.recipient_keys { for rk in &req.recipient_keys {
let seq = self.store.enqueue(rk, &req.channel_id, req.payload.clone(), ttl)?; let device_keys = self.resolve_device_keys(rk);
seqs.push(seq); let mut first_seq = 0;
if let Some(notify) = self.waiters.get(rk) { for (i, dk) in device_keys.iter().enumerate() {
notify.notify_one(); let seq = self.store.enqueue(dk, &req.channel_id, req.payload.clone(), ttl)?;
if i == 0 {
first_seq = seq;
}
self.wake_waiter(dk);
} }
self.wake_waiter(rk);
seqs.push(first_seq);
} }
Ok(BatchEnqueueResp { seqs }) Ok(BatchEnqueueResp { seqs })
} }
/// Build a device-scoped recipient key from identity_key and device_id.
/// Public helper for RPC handlers to build the correct fetch/ack key.
pub fn device_recipient_key(identity_key: &[u8], device_id: &[u8]) -> Vec<u8> {
device_recipient_key(identity_key, device_id)
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
use super::*;
use crate::storage::FileBackedStore;
fn test_service() -> (tempfile::TempDir, DeliveryService) {
let dir = tempfile::tempdir().unwrap();
let store = Arc::new(FileBackedStore::open(dir.path()).unwrap());
let svc = DeliveryService {
store,
waiters: Arc::new(DashMap::new()),
};
(dir, svc)
}
#[test]
fn enqueue_single_device_backwards_compat() {
let (_dir, svc) = test_service();
let ik = vec![1u8; 32];
let ch = vec![0u8; 16];
// No devices registered — should enqueue to bare identity_key.
let resp = svc
.enqueue(EnqueueReq {
recipient_key: ik.clone(),
payload: b"hello".to_vec(),
channel_id: ch.clone(),
ttl_secs: 0,
})
.unwrap();
assert_eq!(resp.seq, 0);
// Fetch from bare identity_key.
let fetched = svc
.fetch(FetchReq {
recipient_key: ik,
channel_id: ch,
limit: 10,
})
.unwrap();
assert_eq!(fetched.payloads.len(), 1);
assert_eq!(fetched.payloads[0].data, b"hello");
}
#[test]
fn enqueue_multi_device_fanout() {
let (_dir, svc) = test_service();
let ik = vec![2u8; 32];
let ch = vec![0u8; 16];
let dev_a = b"device-a".to_vec();
let dev_b = b"device-b".to_vec();
// Register two devices.
svc.store
.register_device(&ik, &dev_a, "Phone")
.unwrap();
svc.store
.register_device(&ik, &dev_b, "Laptop")
.unwrap();
// Enqueue a message.
svc.enqueue(EnqueueReq {
recipient_key: ik.clone(),
payload: b"fanout-msg".to_vec(),
channel_id: ch.clone(),
ttl_secs: 0,
})
.unwrap();
// Each device should receive the message on its own queue.
let key_a = device_recipient_key(&ik, &dev_a);
let key_b = device_recipient_key(&ik, &dev_b);
let msgs_a = svc
.fetch(FetchReq {
recipient_key: key_a,
channel_id: ch.clone(),
limit: 10,
})
.unwrap();
assert_eq!(msgs_a.payloads.len(), 1);
assert_eq!(msgs_a.payloads[0].data, b"fanout-msg");
let msgs_b = svc
.fetch(FetchReq {
recipient_key: key_b,
channel_id: ch.clone(),
limit: 10,
})
.unwrap();
assert_eq!(msgs_b.payloads.len(), 1);
assert_eq!(msgs_b.payloads[0].data, b"fanout-msg");
// Bare identity_key queue should be empty (not used when devices exist).
let msgs_bare = svc
.fetch(FetchReq {
recipient_key: ik,
channel_id: ch,
limit: 10,
})
.unwrap();
assert!(msgs_bare.payloads.is_empty());
}
#[test]
fn batch_enqueue_multi_device() {
let (_dir, svc) = test_service();
let ik1 = vec![3u8; 32];
let ik2 = vec![4u8; 32];
let ch = vec![0u8; 16];
let dev = b"dev1".to_vec();
// ik1 has a device, ik2 has none.
svc.store
.register_device(&ik1, &dev, "Phone")
.unwrap();
let resp = svc
.batch_enqueue(BatchEnqueueReq {
recipient_keys: vec![ik1.clone(), ik2.clone()],
payload: b"batch-msg".to_vec(),
channel_id: ch.clone(),
ttl_secs: 0,
})
.unwrap();
assert_eq!(resp.seqs.len(), 2);
// ik1 device should have the message.
let key_1 = device_recipient_key(&ik1, &dev);
let msgs_1 = svc
.fetch(FetchReq {
recipient_key: key_1,
channel_id: ch.clone(),
limit: 10,
})
.unwrap();
assert_eq!(msgs_1.payloads.len(), 1);
// ik2 (no devices) should have it on bare key.
let msgs_2 = svc
.fetch(FetchReq {
recipient_key: ik2,
channel_id: ch,
limit: 10,
})
.unwrap();
assert_eq!(msgs_2.payloads.len(), 1);
}
#[test]
fn device_recipient_key_construction() {
let ik = vec![1u8; 32];
let dev = b"my-device".to_vec();
// With device_id.
let key = device_recipient_key(&ik, &dev);
assert_eq!(key.len(), 32 + dev.len());
assert_eq!(&key[..32], &ik[..]);
assert_eq!(&key[32..], dev.as_slice());
// Empty device_id returns bare identity_key.
let bare = device_recipient_key(&ik, &[]);
assert_eq!(bare, ik);
}
} }

View File

@@ -429,6 +429,18 @@ pub struct FileBackedStore {
} }
impl FileBackedStore { impl FileBackedStore {
/// Create an ephemeral in-memory store backed by a tempdir.
/// Intended for unit tests only — the tempdir is leaked (cleaned up by OS).
#[cfg(test)]
#[allow(dead_code)]
pub fn ephemeral() -> Self {
let dir = tempfile::tempdir().expect("failed to create tempdir");
let store = Self::open(dir.path()).expect("failed to open ephemeral store");
// Leak the tempdir so it isn't deleted while the store is alive.
std::mem::forget(dir);
store
}
pub fn open(dir: impl AsRef<Path>) -> Result<Self, StorageError> { pub fn open(dir: impl AsRef<Path>) -> Result<Self, StorageError> {
let dir = dir.as_ref(); let dir = dir.as_ref();
if !dir.exists() { if !dir.exists() {