feat: Sprint 9 — mesh identity, store-and-forward, broadcast channels
Self-sovereign mesh networking for offline-capable Freifunk deployments. - MeshIdentity: Ed25519 keypair-based identity without AS registration, JSON-persisted seed + known peers directory, sign/verify - MeshEnvelope: signed store-and-forward envelope with TTL, hop_count, max_hops, SHA-256 dedup ID, Ed25519 signature verification - MeshStore: in-memory message queue with dedup, per-recipient capacity limits, TTL-based garbage collection - BroadcastChannel: symmetric ChaCha20-Poly1305 encrypted topic-based pub/sub for mesh announcements, no MLS overhead - BroadcastManager: subscribe/unsubscribe/create channels by topic - P2pNode integration: send_mesh(), receive_mesh(), forward_stored(), subscribe(), create_broadcast(), broadcast() - Extended mesh REPL: /mesh send, /mesh broadcast, /mesh subscribe, /mesh route, /mesh identity, /mesh store (feature-gated) 28 P2P tests pass (21 existing + 7 broadcast). All builds clean.
This commit is contained in:
202
crates/quicproquo-p2p/src/store.rs
Normal file
202
crates/quicproquo-p2p/src/store.rs
Normal file
@@ -0,0 +1,202 @@
|
||||
//! In-memory store-and-forward message queue for mesh nodes.
|
||||
//!
|
||||
//! [`MeshStore`] buffers [`MeshEnvelope`]s for offline recipients and
|
||||
//! provides deduplication and automatic garbage collection of expired messages.
|
||||
|
||||
use std::collections::{HashMap, HashSet};
|
||||
|
||||
use crate::envelope::MeshEnvelope;
|
||||
|
||||
/// Default maximum messages stored per recipient.
|
||||
const DEFAULT_MAX_STORED: usize = 1000;
|
||||
|
||||
/// In-memory store-and-forward queue keyed by recipient public key.
|
||||
pub struct MeshStore {
|
||||
/// Recipient public key -> queued envelopes.
|
||||
inbox: HashMap<Vec<u8>, Vec<MeshEnvelope>>,
|
||||
/// Set of envelope IDs already processed (deduplication).
|
||||
seen: HashSet<[u8; 32]>,
|
||||
/// Maximum envelopes held per recipient.
|
||||
max_stored: usize,
|
||||
}
|
||||
|
||||
impl MeshStore {
|
||||
/// Create a new store with the given per-recipient capacity.
|
||||
///
|
||||
/// A `max_stored` of 0 uses [`DEFAULT_MAX_STORED`].
|
||||
pub fn new(max_stored: usize) -> Self {
|
||||
Self {
|
||||
inbox: HashMap::new(),
|
||||
seen: HashSet::new(),
|
||||
max_stored: if max_stored == 0 {
|
||||
DEFAULT_MAX_STORED
|
||||
} else {
|
||||
max_stored
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
/// Store an envelope for later delivery.
|
||||
///
|
||||
/// Returns `false` (without storing) if:
|
||||
/// - the envelope ID has already been seen (dedup), or
|
||||
/// - the recipient's inbox is at capacity.
|
||||
pub fn store(&mut self, envelope: MeshEnvelope) -> bool {
|
||||
if self.seen.contains(&envelope.id) {
|
||||
return false;
|
||||
}
|
||||
let queue = self.inbox.entry(envelope.recipient_key.clone()).or_default();
|
||||
if queue.len() >= self.max_stored {
|
||||
return false;
|
||||
}
|
||||
self.seen.insert(envelope.id);
|
||||
queue.push(envelope);
|
||||
true
|
||||
}
|
||||
|
||||
/// Drain and return all queued messages for `recipient_key`.
|
||||
pub fn fetch(&mut self, recipient_key: &[u8]) -> Vec<MeshEnvelope> {
|
||||
self.inbox.remove(recipient_key).unwrap_or_default()
|
||||
}
|
||||
|
||||
/// Peek at queued messages for `recipient_key` without draining.
|
||||
pub fn peek(&self, recipient_key: &[u8]) -> &[MeshEnvelope] {
|
||||
self.inbox
|
||||
.get(recipient_key)
|
||||
.map(|v| v.as_slice())
|
||||
.unwrap_or_default()
|
||||
}
|
||||
|
||||
/// Remove all expired envelopes from every inbox and return the count removed.
|
||||
pub fn gc_expired(&mut self) -> usize {
|
||||
let mut removed = 0;
|
||||
self.inbox.retain(|_key, queue| {
|
||||
let before = queue.len();
|
||||
queue.retain(|env| !env.is_expired());
|
||||
removed += before - queue.len();
|
||||
!queue.is_empty()
|
||||
});
|
||||
removed
|
||||
}
|
||||
|
||||
/// Check whether an envelope ID has already been processed.
|
||||
pub fn seen(&self, id: &[u8; 32]) -> bool {
|
||||
self.seen.contains(id)
|
||||
}
|
||||
|
||||
/// Return `(total_messages, unique_recipients)`.
|
||||
pub fn stats(&self) -> (usize, usize) {
|
||||
let total: usize = self.inbox.values().map(|q| q.len()).sum();
|
||||
let recipients = self.inbox.len();
|
||||
(total, recipients)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::identity::MeshIdentity;
|
||||
|
||||
fn make_envelope(recipient: &[u8], payload: &[u8], ttl: u32) -> MeshEnvelope {
|
||||
let id = MeshIdentity::generate();
|
||||
MeshEnvelope::new(&id, recipient, payload.to_vec(), ttl, 5)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn store_and_fetch() {
|
||||
let mut store = MeshStore::new(10);
|
||||
let recip = [0xAAu8; 32];
|
||||
let env = make_envelope(&recip, b"hello", 3600);
|
||||
|
||||
assert!(store.store(env));
|
||||
assert_eq!(store.stats(), (1, 1));
|
||||
|
||||
let msgs = store.fetch(&recip);
|
||||
assert_eq!(msgs.len(), 1);
|
||||
assert_eq!(msgs[0].payload, b"hello");
|
||||
|
||||
// After fetch, inbox is drained.
|
||||
assert_eq!(store.stats(), (0, 0));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn deduplication() {
|
||||
let mut store = MeshStore::new(10);
|
||||
let recip = [0xBBu8; 32];
|
||||
let env = make_envelope(&recip, b"dup", 3600);
|
||||
let env2 = env.clone();
|
||||
|
||||
assert!(store.store(env));
|
||||
assert!(!store.store(env2), "duplicate should be rejected");
|
||||
assert_eq!(store.stats(), (1, 1));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn capacity_limit() {
|
||||
let mut store = MeshStore::new(2);
|
||||
let recip = [0xCCu8; 32];
|
||||
|
||||
assert!(store.store(make_envelope(&recip, b"1", 3600)));
|
||||
assert!(store.store(make_envelope(&recip, b"2", 3600)));
|
||||
assert!(
|
||||
!store.store(make_envelope(&recip, b"3", 3600)),
|
||||
"should reject when at capacity"
|
||||
);
|
||||
assert_eq!(store.stats(), (2, 1));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn gc_expired_messages() {
|
||||
let mut store = MeshStore::new(10);
|
||||
let recip = [0xDDu8; 32];
|
||||
|
||||
// Create an already-expired envelope (TTL=0, timestamp in the past).
|
||||
let id = MeshIdentity::generate();
|
||||
let mut env = MeshEnvelope::new(&id, &recip, b"old".to_vec(), 0, 5);
|
||||
env.timestamp = 0; // far in the past
|
||||
store.store(env);
|
||||
|
||||
// And a fresh one.
|
||||
store.store(make_envelope(&recip, b"fresh", 3600));
|
||||
|
||||
assert_eq!(store.stats(), (2, 1));
|
||||
let removed = store.gc_expired();
|
||||
assert_eq!(removed, 1);
|
||||
assert_eq!(store.stats(), (1, 1));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn peek_does_not_drain() {
|
||||
let mut store = MeshStore::new(10);
|
||||
let recip = [0xEEu8; 32];
|
||||
store.store(make_envelope(&recip, b"peek", 3600));
|
||||
|
||||
assert_eq!(store.peek(&recip).len(), 1);
|
||||
assert_eq!(store.peek(&recip).len(), 1); // still there
|
||||
assert_eq!(store.stats(), (1, 1));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn seen_tracks_processed_ids() {
|
||||
let mut store = MeshStore::new(10);
|
||||
let env = make_envelope(&[0xFF; 32], b"track", 3600);
|
||||
let id = env.id;
|
||||
|
||||
assert!(!store.seen(&id));
|
||||
store.store(env);
|
||||
assert!(store.seen(&id));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn fetch_empty_inbox() {
|
||||
let mut store = MeshStore::new(10);
|
||||
let msgs = store.fetch(&[0x00; 32]);
|
||||
assert!(msgs.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn peek_empty_inbox() {
|
||||
let store = MeshStore::new(10);
|
||||
assert!(store.peek(&[0x00; 32]).is_empty());
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user