//! In-memory store-and-forward message queue for mesh nodes. //! //! [`MeshStore`] buffers [`MeshEnvelope`]s for offline recipients and //! provides deduplication and automatic garbage collection of expired messages. use std::collections::{HashMap, HashSet, VecDeque}; use crate::envelope::MeshEnvelope; /// Default maximum messages stored per recipient. const DEFAULT_MAX_STORED: usize = 1000; /// Maximum number of envelope IDs retained in the seen set for deduplication. /// Once exceeded, the oldest IDs are evicted to bound memory growth. const MAX_SEEN_IDS: usize = 100_000; /// In-memory store-and-forward queue keyed by recipient public key. pub struct MeshStore { /// Recipient public key -> queued envelopes. inbox: HashMap, Vec>, /// Set of envelope IDs already processed (deduplication). seen: HashSet<[u8; 32]>, /// Insertion-ordered queue of seen IDs for bounded eviction. seen_order: VecDeque<[u8; 32]>, /// Maximum envelopes held per recipient. max_stored: usize, } impl MeshStore { /// Create a new store with the given per-recipient capacity. /// /// A `max_stored` of 0 uses [`DEFAULT_MAX_STORED`]. pub fn new(max_stored: usize) -> Self { Self { inbox: HashMap::new(), seen: HashSet::new(), seen_order: VecDeque::new(), max_stored: if max_stored == 0 { DEFAULT_MAX_STORED } else { max_stored }, } } /// Store an envelope for later delivery. /// /// Returns `false` (without storing) if: /// - the envelope ID has already been seen (dedup), or /// - the recipient's inbox is at capacity. pub fn store(&mut self, envelope: MeshEnvelope) -> bool { if self.seen.contains(&envelope.id) { return false; } let queue = self.inbox.entry(envelope.recipient_key.clone()).or_default(); if queue.len() >= self.max_stored { return false; } self.seen.insert(envelope.id); self.seen_order.push_back(envelope.id); // Evict oldest seen IDs if the set exceeds the bound. while self.seen_order.len() > MAX_SEEN_IDS { if let Some(old_id) = self.seen_order.pop_front() { self.seen.remove(&old_id); } } queue.push(envelope); true } /// Drain and return all queued messages for `recipient_key`. pub fn fetch(&mut self, recipient_key: &[u8]) -> Vec { self.inbox.remove(recipient_key).unwrap_or_default() } /// Peek at queued messages for `recipient_key` without draining. pub fn peek(&self, recipient_key: &[u8]) -> &[MeshEnvelope] { self.inbox .get(recipient_key) .map(|v| v.as_slice()) .unwrap_or_default() } /// Remove all expired envelopes from every inbox and return the count removed. pub fn gc_expired(&mut self) -> usize { let mut removed = 0; self.inbox.retain(|_key, queue| { let before = queue.len(); queue.retain(|env| !env.is_expired()); removed += before - queue.len(); !queue.is_empty() }); removed } /// Check whether an envelope ID has already been processed. pub fn seen(&self, id: &[u8; 32]) -> bool { self.seen.contains(id) } /// Return `(total_messages, unique_recipients)`. pub fn stats(&self) -> (usize, usize) { let total: usize = self.inbox.values().map(|q| q.len()).sum(); let recipients = self.inbox.len(); (total, recipients) } } #[cfg(test)] mod tests { use super::*; use crate::identity::MeshIdentity; fn make_envelope(recipient: &[u8], payload: &[u8], ttl: u32) -> MeshEnvelope { let id = MeshIdentity::generate(); MeshEnvelope::new(&id, recipient, payload.to_vec(), ttl, 5) } #[test] fn store_and_fetch() { let mut store = MeshStore::new(10); let recip = [0xAAu8; 32]; let env = make_envelope(&recip, b"hello", 3600); assert!(store.store(env)); assert_eq!(store.stats(), (1, 1)); let msgs = store.fetch(&recip); assert_eq!(msgs.len(), 1); assert_eq!(msgs[0].payload, b"hello"); // After fetch, inbox is drained. assert_eq!(store.stats(), (0, 0)); } #[test] fn deduplication() { let mut store = MeshStore::new(10); let recip = [0xBBu8; 32]; let env = make_envelope(&recip, b"dup", 3600); let env2 = env.clone(); assert!(store.store(env)); assert!(!store.store(env2), "duplicate should be rejected"); assert_eq!(store.stats(), (1, 1)); } #[test] fn capacity_limit() { let mut store = MeshStore::new(2); let recip = [0xCCu8; 32]; assert!(store.store(make_envelope(&recip, b"1", 3600))); assert!(store.store(make_envelope(&recip, b"2", 3600))); assert!( !store.store(make_envelope(&recip, b"3", 3600)), "should reject when at capacity" ); assert_eq!(store.stats(), (2, 1)); } #[test] fn gc_expired_messages() { let mut store = MeshStore::new(10); let recip = [0xDDu8; 32]; // Create an already-expired envelope (TTL=0, timestamp in the past). let id = MeshIdentity::generate(); let mut env = MeshEnvelope::new(&id, &recip, b"old".to_vec(), 0, 5); env.timestamp = 0; // far in the past store.store(env); // And a fresh one. store.store(make_envelope(&recip, b"fresh", 3600)); assert_eq!(store.stats(), (2, 1)); let removed = store.gc_expired(); assert_eq!(removed, 1); assert_eq!(store.stats(), (1, 1)); } #[test] fn peek_does_not_drain() { let mut store = MeshStore::new(10); let recip = [0xEEu8; 32]; store.store(make_envelope(&recip, b"peek", 3600)); assert_eq!(store.peek(&recip).len(), 1); assert_eq!(store.peek(&recip).len(), 1); // still there assert_eq!(store.stats(), (1, 1)); } #[test] fn seen_tracks_processed_ids() { let mut store = MeshStore::new(10); let env = make_envelope(&[0xFF; 32], b"track", 3600); let id = env.id; assert!(!store.seen(&id)); store.store(env); assert!(store.seen(&id)); } #[test] fn fetch_empty_inbox() { let mut store = MeshStore::new(10); let msgs = store.fetch(&[0x00; 32]); assert!(msgs.is_empty()); } #[test] fn peek_empty_inbox() { let store = MeshStore::new(10); assert!(store.peek(&[0x00; 32]).is_empty()); } }