Files
quicproquo/crates/quicprochat-p2p/src/store.rs
Christian Nennemann a710037dde chore: rename quicproquo → quicprochat in Rust workspace
Rename all crate directories, package names, binary names, proto
package/module paths, ALPN strings, env var prefixes, config filenames,
mDNS service names, and plugin ABI symbols from quicproquo/qpq to
quicprochat/qpc.
2026-03-21 19:14:06 +01:00

219 lines
6.7 KiB
Rust

//! In-memory store-and-forward message queue for mesh nodes.
//!
//! [`MeshStore`] buffers [`MeshEnvelope`]s for offline recipients and
//! provides deduplication and automatic garbage collection of expired messages.
use std::collections::{HashMap, HashSet, VecDeque};
use crate::envelope::MeshEnvelope;
/// Default maximum messages stored per recipient.
///
/// [`MeshStore::new`] substitutes this value when it is given a capacity of 0.
const DEFAULT_MAX_STORED: usize = 1000;
/// Maximum number of envelope IDs retained in the seen set for deduplication.
/// Once exceeded, the oldest IDs are evicted to bound memory growth.
///
/// An evicted ID is no longer recognised as a duplicate, so an envelope that
/// shows up again after its ID was evicted is accepted by [`MeshStore::store`].
const MAX_SEEN_IDS: usize = 100_000;
/// In-memory store-and-forward queue keyed by recipient public key.
///
/// Every recipient gets its own inbox, capped at `max_stored` envelopes.
/// Envelope IDs are tracked separately from the inboxes: fetching or
/// garbage-collecting an envelope does not remove its ID from the seen set, so
/// the ID keeps being rejected as a duplicate until it is evicted from the
/// bounded seen set.
pub struct MeshStore {
/// Recipient public key -> queued envelopes, in the order they were stored.
inbox: HashMap<Vec<u8>, Vec<MeshEnvelope>>,
/// Set of envelope IDs already accepted by `store` (deduplication).
seen: HashSet<[u8; 32]>,
/// Insertion-ordered queue of seen IDs for bounded eviction (oldest at the front).
seen_order: VecDeque<[u8; 32]>,
/// Maximum envelopes held per recipient. `new` never sets this to 0.
max_stored: usize,
}
impl MeshStore {
    /// Build an empty store whose inboxes each hold at most `max_stored` envelopes.
    ///
    /// Passing `0` selects [`DEFAULT_MAX_STORED`] instead.
    pub fn new(max_stored: usize) -> Self {
        let capacity = match max_stored {
            0 => DEFAULT_MAX_STORED,
            limit => limit,
        };

        Self {
            inbox: HashMap::new(),
            seen: HashSet::new(),
            seen_order: VecDeque::new(),
            max_stored: capacity,
        }
    }

    /// Queue an envelope for its recipient.
    ///
    /// Returns `true` when the envelope was queued. Returns `false` and leaves
    /// the store untouched when either
    /// - the envelope ID is already in the seen set (duplicate), or
    /// - the recipient's inbox already holds the maximum number of envelopes.
    ///
    /// An envelope rejected for capacity is *not* recorded as seen, so it can
    /// be offered again later.
    pub fn store(&mut self, envelope: MeshEnvelope) -> bool {
        let id = envelope.id;
        if self.seen.contains(&id) {
            return false;
        }

        let pending = self.inbox.entry(envelope.recipient_key.clone()).or_default();
        let has_room = pending.len() < self.max_stored;
        if !has_room {
            return false;
        }

        // Remember the ID, then trim the oldest IDs so the dedup set never
        // grows past MAX_SEEN_IDS.
        self.seen.insert(id);
        self.seen_order.push_back(id);
        while self.seen_order.len() > MAX_SEEN_IDS {
            match self.seen_order.pop_front() {
                Some(evicted) => {
                    self.seen.remove(&evicted);
                }
                None => break,
            }
        }

        pending.push(envelope);
        true
    }

    /// Take every queued envelope for `recipient_key`, leaving that inbox empty.
    ///
    /// Returns an empty vector when nothing is queued for the recipient.
    pub fn fetch(&mut self, recipient_key: &[u8]) -> Vec<MeshEnvelope> {
        match self.inbox.remove(recipient_key) {
            Some(queued) => queued,
            None => Vec::new(),
        }
    }

    /// Borrow the envelopes queued for `recipient_key` without removing them.
    ///
    /// Returns an empty slice when nothing is queued for the recipient.
    pub fn peek(&self, recipient_key: &[u8]) -> &[MeshEnvelope] {
        match self.inbox.get(recipient_key) {
            Some(queued) => queued.as_slice(),
            None => &[],
        }
    }

    /// Drop every expired envelope from all inboxes and report how many were dropped.
    ///
    /// Inboxes left empty by the sweep are removed from the map as well.
    pub fn gc_expired(&mut self) -> usize {
        let mut purged = 0usize;

        // Phase 1: filter each inbox in place and tally what was dropped.
        for queue in self.inbox.values_mut() {
            let len_before = queue.len();
            queue.retain(|envelope| !envelope.is_expired());
            purged += len_before - queue.len();
        }

        // Phase 2: forget recipients that no longer have anything queued.
        self.inbox.retain(|_, queue| !queue.is_empty());

        purged
    }

    /// Report whether `id` is currently in the deduplication set.
    pub fn seen(&self, id: &[u8; 32]) -> bool {
        self.seen.get(id).is_some()
    }

    /// Return `(total_messages, unique_recipients)` across all inboxes.
    pub fn stats(&self) -> (usize, usize) {
        let recipients = self.inbox.len();
        let total = self
            .inbox
            .values()
            .fold(0usize, |count, queue| count + queue.len());
        (total, recipients)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::identity::MeshIdentity;

    /// Build an envelope for `recipient` signed by a freshly generated identity.
    fn make_envelope(recipient: &[u8], payload: &[u8], ttl: u32) -> MeshEnvelope {
        let id = MeshIdentity::generate();
        MeshEnvelope::new(&id, recipient, payload.to_vec(), ttl, 5)
    }

    /// Build an already-expired envelope (TTL=0, timestamp in the past).
    fn make_expired_envelope(recipient: &[u8], payload: &[u8]) -> MeshEnvelope {
        let mut env = make_envelope(recipient, payload, 0);
        env.timestamp = 0; // far in the past
        env
    }

    #[test]
    fn store_and_fetch() {
        let mut store = MeshStore::new(10);
        let recip = [0xAAu8; 32];
        let env = make_envelope(&recip, b"hello", 3600);
        assert!(store.store(env));
        assert_eq!(store.stats(), (1, 1));
        let msgs = store.fetch(&recip);
        assert_eq!(msgs.len(), 1);
        assert_eq!(msgs[0].payload, b"hello");
        // After fetch, inbox is drained.
        assert_eq!(store.stats(), (0, 0));
    }

    #[test]
    fn deduplication() {
        let mut store = MeshStore::new(10);
        let recip = [0xBBu8; 32];
        let env = make_envelope(&recip, b"dup", 3600);
        let env2 = env.clone();
        assert!(store.store(env));
        assert!(!store.store(env2), "duplicate should be rejected");
        assert_eq!(store.stats(), (1, 1));
    }

    #[test]
    fn deduplication_survives_fetch() {
        let mut store = MeshStore::new(10);
        let recip = [0xBCu8; 32];
        let env = make_envelope(&recip, b"dup", 3600);
        let replay = env.clone();
        assert!(store.store(env));
        assert_eq!(store.fetch(&recip).len(), 1);
        // Fetching drains the inbox but does not forget the envelope ID.
        assert!(
            !store.store(replay),
            "already-delivered envelope should still be rejected"
        );
        assert_eq!(store.stats(), (0, 0));
    }

    #[test]
    fn capacity_limit() {
        let mut store = MeshStore::new(2);
        let recip = [0xCCu8; 32];
        assert!(store.store(make_envelope(&recip, b"1", 3600)));
        assert!(store.store(make_envelope(&recip, b"2", 3600)));
        assert!(
            !store.store(make_envelope(&recip, b"3", 3600)),
            "should reject when at capacity"
        );
        assert_eq!(store.stats(), (2, 1));
    }

    #[test]
    fn gc_expired_messages() {
        let mut store = MeshStore::new(10);
        let recip = [0xDDu8; 32];
        // Setup stores are asserted so a rejected store fails here rather
        // than in a later, less specific assertion.
        assert!(store.store(make_expired_envelope(&recip, b"old")));
        // And a fresh one.
        assert!(store.store(make_envelope(&recip, b"fresh", 3600)));
        assert_eq!(store.stats(), (2, 1));
        let removed = store.gc_expired();
        assert_eq!(removed, 1);
        assert_eq!(store.stats(), (1, 1));
    }

    #[test]
    fn gc_drops_emptied_inbox() {
        let mut store = MeshStore::new(10);
        let recip = [0xDEu8; 32];
        assert!(store.store(make_expired_envelope(&recip, b"old")));
        assert_eq!(store.stats(), (1, 1));
        assert_eq!(store.gc_expired(), 1);
        // The recipient had nothing left, so its inbox is removed entirely.
        assert_eq!(store.stats(), (0, 0));
    }

    #[test]
    fn peek_does_not_drain() {
        let mut store = MeshStore::new(10);
        let recip = [0xEEu8; 32];
        assert!(store.store(make_envelope(&recip, b"peek", 3600)));
        assert_eq!(store.peek(&recip).len(), 1);
        assert_eq!(store.peek(&recip).len(), 1); // still there
        assert_eq!(store.stats(), (1, 1));
    }

    #[test]
    fn seen_tracks_processed_ids() {
        let mut store = MeshStore::new(10);
        let env = make_envelope(&[0xFF; 32], b"track", 3600);
        let id = env.id;
        assert!(!store.seen(&id));
        assert!(store.store(env));
        assert!(store.seen(&id));
    }

    #[test]
    fn fetch_empty_inbox() {
        let mut store = MeshStore::new(10);
        let msgs = store.fetch(&[0x00; 32]);
        assert!(msgs.is_empty());
    }

    #[test]
    fn peek_empty_inbox() {
        let store = MeshStore::new(10);
        assert!(store.peek(&[0x00; 32]).is_empty());
    }
}