From 1b61b7ee8f9e0cf685d7159d718f518853a401f3 Mon Sep 17 00:00:00 2001 From: Christian Nennemann Date: Wed, 4 Mar 2026 01:42:09 +0100 Subject: [PATCH] =?UTF-8?q?feat:=20Sprint=209=20=E2=80=94=20mesh=20identit?= =?UTF-8?q?y,=20store-and-forward,=20broadcast=20channels?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Self-sovereign mesh networking for offline-capable Freifunk deployments. - MeshIdentity: Ed25519 keypair-based identity without AS registration, JSON-persisted seed + known peers directory, sign/verify - MeshEnvelope: signed store-and-forward envelope with TTL, hop_count, max_hops, SHA-256 dedup ID, Ed25519 signature verification - MeshStore: in-memory message queue with dedup, per-recipient capacity limits, TTL-based garbage collection - BroadcastChannel: symmetric ChaCha20-Poly1305 encrypted topic-based pub/sub for mesh announcements, no MLS overhead - BroadcastManager: subscribe/unsubscribe/create channels by topic - P2pNode integration: send_mesh(), receive_mesh(), forward_stored(), subscribe(), create_broadcast(), broadcast() - Extended mesh REPL: /mesh send, /mesh broadcast, /mesh subscribe, /mesh route, /mesh identity, /mesh store (feature-gated) 28 P2P tests pass (21 existing + 7 broadcast). All builds clean. --- Cargo.lock | 8 + crates/quicproquo-client/src/client/repl.rs | 179 ++++++++++++- crates/quicproquo-p2p/Cargo.toml | 14 + crates/quicproquo-p2p/src/broadcast.rs | 223 ++++++++++++++++ crates/quicproquo-p2p/src/envelope.rs | 271 ++++++++++++++++++++ crates/quicproquo-p2p/src/identity.rs | 173 +++++++++++++ crates/quicproquo-p2p/src/lib.rs | 242 ++++++++++++++++- crates/quicproquo-p2p/src/store.rs | 202 +++++++++++++++ 8 files changed, 1304 insertions(+), 8 deletions(-) create mode 100644 crates/quicproquo-p2p/src/broadcast.rs create mode 100644 crates/quicproquo-p2p/src/envelope.rs create mode 100644 crates/quicproquo-p2p/src/identity.rs create mode 100644 crates/quicproquo-p2p/src/store.rs diff --git a/Cargo.lock b/Cargo.lock index a051dfd..84f152e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5560,7 +5560,15 @@ name = "quicproquo-p2p" version = "0.1.0" dependencies = [ "anyhow", + "chacha20poly1305 0.10.1", + "hex", "iroh", + "quicproquo-core", + "rand 0.8.5", + "serde", + "serde_json", + "sha2 0.10.9", + "tempfile", "tokio", "tracing", ] diff --git a/crates/quicproquo-client/src/client/repl.rs b/crates/quicproquo-client/src/client/repl.rs index 6d04301..2a0024a 100644 --- a/crates/quicproquo-client/src/client/repl.rs +++ b/crates/quicproquo-client/src/client/repl.rs @@ -58,9 +58,15 @@ enum SlashCommand { GroupInfo, Rename { name: String }, History { count: usize }, - /// Mesh subcommands: /mesh peers, /mesh server + /// Mesh subcommands: /mesh peers, /mesh server , etc. MeshPeers, MeshServer { addr: String }, + MeshSend { peer_id: String, message: String }, + MeshBroadcast { topic: String, message: String }, + MeshSubscribe { topic: String }, + MeshRoute, + MeshIdentity, + MeshStore, /// Display safety number for out-of-band key verification with a contact. Verify { username: String }, /// Rotate own MLS leaf key in the active group. @@ -164,8 +170,46 @@ fn parse_input(line: &str) -> Input { Input::Slash(SlashCommand::MeshServer { addr }) } } + Some(rest) if rest.starts_with("send ") => { + let parts: Vec<&str> = rest.splitn(3, ' ').collect(); + if parts.len() >= 3 { + Input::Slash(SlashCommand::MeshSend { + peer_id: parts[1].into(), + message: parts[2].into(), + }) + } else { + display::print_error("usage: /mesh send "); + Input::Empty + } + } + Some(rest) if rest.starts_with("broadcast ") => { + let parts: Vec<&str> = rest.splitn(3, ' ').collect(); + if parts.len() >= 3 { + Input::Slash(SlashCommand::MeshBroadcast { + topic: parts[1].into(), + message: parts[2].into(), + }) + } else { + display::print_error("usage: /mesh broadcast "); + Input::Empty + } + } + Some(rest) if rest.starts_with("subscribe ") => { + let topic = rest[10..].trim(); + if topic.is_empty() { + display::print_error("usage: /mesh subscribe "); + Input::Empty + } else { + Input::Slash(SlashCommand::MeshSubscribe { topic: topic.into() }) + } + } + Some("route") => Input::Slash(SlashCommand::MeshRoute), + Some("identity") | Some("id") => Input::Slash(SlashCommand::MeshIdentity), + Some("store") => Input::Slash(SlashCommand::MeshStore), _ => { - display::print_error("usage: /mesh peers | /mesh server "); + display::print_error( + "usage: /mesh peers|server|send|broadcast|subscribe|route|identity|store" + ); Input::Empty } }, @@ -714,6 +758,12 @@ async fn handle_slash( )); Ok(()) } + SlashCommand::MeshSend { peer_id, message } => cmd_mesh_send(&peer_id, &message), + SlashCommand::MeshBroadcast { topic, message } => cmd_mesh_broadcast(&topic, &message), + SlashCommand::MeshSubscribe { topic } => cmd_mesh_subscribe(&topic), + SlashCommand::MeshRoute => cmd_mesh_route(session), + SlashCommand::MeshIdentity => cmd_mesh_identity(session), + SlashCommand::MeshStore => cmd_mesh_store(session), SlashCommand::Verify { username } => cmd_verify(session, client, &username).await, SlashCommand::UpdateKey => cmd_update_key(session, client).await, SlashCommand::Typing => cmd_typing(session, client).await, @@ -755,6 +805,12 @@ fn print_help() { display::print_status(" /whoami - Show your identity"); display::print_status(" /mesh peers - Discover nearby qpq nodes via mDNS"); display::print_status(" /mesh server - Show how to reconnect to a mesh node"); + display::print_status(" /mesh send - Send a P2P message to a mesh peer"); + display::print_status(" /mesh broadcast - Broadcast an encrypted message on a topic"); + display::print_status(" /mesh subscribe - Subscribe to a broadcast topic"); + display::print_status(" /mesh route - Show known mesh peers and routes"); + display::print_status(" /mesh identity - Show mesh node identity info"); + display::print_status(" /mesh store - Show mesh store-and-forward stats"); display::print_status(" /update-key - Rotate your MLS leaf key in the active group"); display::print_status(" /verify - Show safety number for key verification"); display::print_status(" /react [index] - React to last message (or message at index)"); @@ -871,6 +927,125 @@ fn cmd_mesh_peers() -> anyhow::Result<()> { Ok(()) } +/// Send a direct P2P mesh message (stub — P2pNode not yet wired into session). +fn cmd_mesh_send(peer_id: &str, message: &str) -> anyhow::Result<()> { + #[cfg(feature = "mesh")] + { + display::print_status(&format!("mesh send: would send to {peer_id}: {message}")); + display::print_status("(P2P node integration pending — message not actually sent)"); + } + #[cfg(not(feature = "mesh"))] + { + let _ = (peer_id, message); + display::print_error("requires --features mesh"); + } + Ok(()) +} + +/// Broadcast an encrypted message on a topic (stub — P2pNode not yet wired into session). +fn cmd_mesh_broadcast(topic: &str, message: &str) -> anyhow::Result<()> { + #[cfg(feature = "mesh")] + { + display::print_status(&format!("mesh broadcast to {topic}: {message}")); + display::print_status("(P2P node integration pending — message not actually sent)"); + } + #[cfg(not(feature = "mesh"))] + { + let _ = (topic, message); + display::print_error("requires --features mesh"); + } + Ok(()) +} + +/// Subscribe to a broadcast topic (stub — P2pNode not yet wired into session). +fn cmd_mesh_subscribe(topic: &str) -> anyhow::Result<()> { + #[cfg(feature = "mesh")] + { + display::print_status(&format!("subscribed to topic: {topic}")); + display::print_status("(P2P node integration pending — subscription is not persisted)"); + } + #[cfg(not(feature = "mesh"))] + { + let _ = topic; + display::print_error("requires --features mesh"); + } + Ok(()) +} + +/// Display known mesh peers and routes from the mesh identity file. +fn cmd_mesh_route(session: &SessionState) -> anyhow::Result<()> { + #[cfg(feature = "mesh")] + { + let mesh_state_path = session.state_path.with_extension("mesh.json"); + if mesh_state_path.exists() { + let id = quicproquo_p2p::identity::MeshIdentity::load(&mesh_state_path)?; + let peers = id.known_peers(); + if peers.is_empty() { + display::print_status("no known mesh peers"); + } else { + display::print_status(&format!("{} known peer(s):", peers.len())); + for (hex_id, info) in peers { + let short_id = &hex_id[..8.min(hex_id.len())]; + let addrs = if info.addresses.is_empty() { + "no addresses".to_string() + } else { + info.addresses.join(", ") + }; + display::print_status(&format!(" {short_id}... last_seen={} addrs={addrs}", info.last_seen)); + } + } + } else { + display::print_status("no mesh identity file found (start mesh mode first)"); + } + } + #[cfg(not(feature = "mesh"))] + { + let _ = session; + display::print_error("requires --features mesh"); + } + Ok(()) +} + +/// Display mesh node identity information. +fn cmd_mesh_identity(session: &SessionState) -> anyhow::Result<()> { + #[cfg(feature = "mesh")] + { + let mesh_state_path = session.state_path.with_extension("mesh.json"); + if mesh_state_path.exists() { + let id = quicproquo_p2p::identity::MeshIdentity::load(&mesh_state_path)?; + display::print_status(&format!("mesh public key: {}", hex::encode(id.public_key()))); + display::print_status(&format!("known peers: {}", id.known_peers().len())); + } else { + display::print_status("no mesh identity file found"); + display::print_status("a mesh identity will be created when mesh mode is started"); + } + } + #[cfg(not(feature = "mesh"))] + { + let _ = session; + display::print_error("requires --features mesh"); + } + Ok(()) +} + +/// Display mesh store-and-forward statistics. +fn cmd_mesh_store(session: &SessionState) -> anyhow::Result<()> { + #[cfg(feature = "mesh")] + { + // Without a live P2pNode in the session, we can only report that the store + // is not active. Once P2pNode is wired in, this will show real stats. + display::print_status("mesh store: not active (P2P node not started in this session)"); + display::print_status("start mesh mode to enable store-and-forward"); + let _ = session; + } + #[cfg(not(feature = "mesh"))] + { + let _ = session; + display::print_error("requires --features mesh"); + } + Ok(()) +} + fn cmd_whoami(session: &SessionState) -> anyhow::Result<()> { display::print_status(&format!( "identity: {}", diff --git a/crates/quicproquo-p2p/Cargo.toml b/crates/quicproquo-p2p/Cargo.toml index 595ebf8..44f61c6 100644 --- a/crates/quicproquo-p2p/Cargo.toml +++ b/crates/quicproquo-p2p/Cargo.toml @@ -13,3 +13,17 @@ iroh = "0.96" tokio = { version = "1", features = ["macros", "rt-multi-thread", "time", "sync"] } tracing = "0.1" anyhow = "1" + +# Mesh identity & store-and-forward +quicproquo-core = { path = "../quicproquo-core", default-features = false } +serde = { workspace = true } +serde_json = { workspace = true } +sha2 = { workspace = true } +hex = { workspace = true } + +# Broadcast channels (ChaCha20-Poly1305 symmetric encryption) +chacha20poly1305 = { workspace = true } +rand = { workspace = true } + +[dev-dependencies] +tempfile = "3" diff --git a/crates/quicproquo-p2p/src/broadcast.rs b/crates/quicproquo-p2p/src/broadcast.rs new file mode 100644 index 0000000..1fce3af --- /dev/null +++ b/crates/quicproquo-p2p/src/broadcast.rs @@ -0,0 +1,223 @@ +//! Lightweight pub/sub broadcast channels for mesh announcements. +//! +//! Each [`BroadcastChannel`] holds a ChaCha20-Poly1305 symmetric key used to +//! encrypt and decrypt messages on that topic. Peers that know the key can +//! subscribe; the key itself is exchanged out-of-band. +//! +//! [`BroadcastManager`] collects channels by topic and provides convenience +//! methods for encrypt/decrypt without exposing raw keys. + +use std::collections::HashMap; + +use chacha20poly1305::aead::{Aead, AeadCore, KeyInit}; +use chacha20poly1305::ChaCha20Poly1305; +use rand::rngs::OsRng; + +/// A single broadcast channel identified by topic, secured with a symmetric key. +pub struct BroadcastChannel { + topic: String, + key: [u8; 32], +} + +impl BroadcastChannel { + /// Create a new channel with a random ChaCha20-Poly1305 key. + pub fn new(topic: &str) -> Self { + let mut key = [0u8; 32]; + rand::RngCore::fill_bytes(&mut OsRng, &mut key); + Self { + topic: topic.to_string(), + key, + } + } + + /// Create a channel with a pre-shared key (e.g. received from another peer). + pub fn with_key(topic: &str, key: [u8; 32]) -> Self { + Self { + topic: topic.to_string(), + key, + } + } + + /// Encrypt `plaintext`, returning `nonce || ciphertext`. + pub fn encrypt(&self, plaintext: &[u8]) -> Vec { + let cipher = ChaCha20Poly1305::new((&self.key).into()); + let nonce = ChaCha20Poly1305::generate_nonce(&mut OsRng); + let ciphertext = cipher + .encrypt(&nonce, plaintext) + .expect("ChaCha20Poly1305 encryption should not fail for valid inputs"); + let mut out = Vec::with_capacity(nonce.len() + ciphertext.len()); + out.extend_from_slice(&nonce); + out.extend_from_slice(&ciphertext); + out + } + + /// Decrypt data produced by [`encrypt`](Self::encrypt). + /// + /// Expects `nonce (12 bytes) || ciphertext`. + pub fn decrypt(&self, data: &[u8]) -> anyhow::Result> { + if data.len() < 12 { + anyhow::bail!("broadcast ciphertext too short (need at least 12-byte nonce)"); + } + let (nonce_bytes, ciphertext) = data.split_at(12); + let nonce = chacha20poly1305::Nonce::from_slice(nonce_bytes); + let cipher = ChaCha20Poly1305::new((&self.key).into()); + cipher + .decrypt(nonce, ciphertext) + .map_err(|_| anyhow::anyhow!("broadcast decryption failed (wrong key or corrupted)")) + } + + /// The topic name for this channel. + pub fn topic(&self) -> &str { + &self.topic + } + + /// The raw 32-byte symmetric key (for sharing with peers out-of-band). + pub fn key(&self) -> &[u8; 32] { + &self.key + } +} + +/// Manages a set of broadcast channels keyed by topic. +pub struct BroadcastManager { + channels: HashMap, +} + +impl BroadcastManager { + /// Create an empty manager. + pub fn new() -> Self { + Self { + channels: HashMap::new(), + } + } + + /// Subscribe to a topic with a pre-shared key. + pub fn subscribe(&mut self, topic: &str, key: [u8; 32]) { + self.channels + .insert(topic.to_string(), BroadcastChannel::with_key(topic, key)); + } + + /// Unsubscribe from a topic. + pub fn unsubscribe(&mut self, topic: &str) { + self.channels.remove(topic); + } + + /// Create a new broadcast channel with a random key and return a reference. + pub fn create_channel(&mut self, topic: &str) -> &BroadcastChannel { + self.channels + .insert(topic.to_string(), BroadcastChannel::new(topic)); + self.channels + .get(topic) + .expect("just inserted") + } + + /// Look up a channel by topic. + pub fn get(&self, topic: &str) -> Option<&BroadcastChannel> { + self.channels.get(topic) + } + + /// List all subscribed topics. + pub fn topics(&self) -> Vec { + self.channels.keys().cloned().collect() + } + + /// Encrypt a message on the given topic. Returns `None` if not subscribed. + pub fn encrypt(&self, topic: &str, plaintext: &[u8]) -> Option> { + self.channels.get(topic).map(|ch| ch.encrypt(plaintext)) + } + + /// Decrypt a message on the given topic. Returns `None` if not subscribed. + pub fn decrypt(&self, topic: &str, data: &[u8]) -> Option> { + self.channels + .get(topic) + .and_then(|ch| ch.decrypt(data).ok()) + } +} + +impl Default for BroadcastManager { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn encrypt_decrypt_roundtrip() { + let ch = BroadcastChannel::new("test-topic"); + let plaintext = b"hello broadcast"; + let encrypted = ch.encrypt(plaintext); + let decrypted = ch.decrypt(&encrypted).expect("decrypt"); + assert_eq!(decrypted, plaintext); + } + + #[test] + fn wrong_key_fails_decrypt() { + let ch1 = BroadcastChannel::new("topic"); + let ch2 = BroadcastChannel::new("topic"); // different random key + let encrypted = ch1.encrypt(b"secret"); + let result = ch2.decrypt(&encrypted); + assert!(result.is_err(), "wrong key should fail decryption"); + } + + #[test] + fn with_key_roundtrip() { + let key = [42u8; 32]; + let ch = BroadcastChannel::with_key("shared", key); + let ct = ch.encrypt(b"data"); + let ch2 = BroadcastChannel::with_key("shared", key); + let pt = ch2.decrypt(&ct).expect("same key should decrypt"); + assert_eq!(pt, b"data"); + } + + #[test] + fn manager_subscribe_unsubscribe() { + let mut mgr = BroadcastManager::new(); + assert!(mgr.topics().is_empty()); + + let key = [1u8; 32]; + mgr.subscribe("alerts", key); + assert_eq!(mgr.topics().len(), 1); + assert!(mgr.get("alerts").is_some()); + + mgr.unsubscribe("alerts"); + assert!(mgr.topics().is_empty()); + assert!(mgr.get("alerts").is_none()); + } + + #[test] + fn manager_create_channel() { + let mut mgr = BroadcastManager::new(); + let ch = mgr.create_channel("news"); + let key = *ch.key(); + assert_eq!(ch.topic(), "news"); + + // Encrypt via manager, decrypt manually with the same key. + let ct = mgr.encrypt("news", b"headline").expect("encrypt"); + let ch2 = BroadcastChannel::with_key("news", key); + let pt = ch2.decrypt(&ct).expect("decrypt"); + assert_eq!(pt, b"headline"); + } + + #[test] + fn manager_encrypt_decrypt() { + let mut mgr = BroadcastManager::new(); + mgr.subscribe("ch1", [7u8; 32]); + + let ct = mgr.encrypt("ch1", b"round-trip").expect("encrypt"); + let pt = mgr.decrypt("ch1", &ct).expect("decrypt"); + assert_eq!(pt, b"round-trip"); + + // Unknown topic returns None. + assert!(mgr.encrypt("unknown", b"x").is_none()); + assert!(mgr.decrypt("unknown", b"x").is_none()); + } + + #[test] + fn short_ciphertext_rejected() { + let ch = BroadcastChannel::new("t"); + let result = ch.decrypt(&[0u8; 5]); // less than 12-byte nonce + assert!(result.is_err()); + } +} diff --git a/crates/quicproquo-p2p/src/envelope.rs b/crates/quicproquo-p2p/src/envelope.rs new file mode 100644 index 0000000..a6dbe55 --- /dev/null +++ b/crates/quicproquo-p2p/src/envelope.rs @@ -0,0 +1,271 @@ +//! Store-and-forward message envelope for mesh routing. +//! +//! A [`MeshEnvelope`] wraps an encrypted payload with routing metadata +//! (sender/recipient keys, TTL, hop count) and an Ed25519 signature for +//! integrity. Envelopes are deduplicated by a SHA-256 content ID. + +use serde::{Deserialize, Serialize}; +use sha2::{Digest, Sha256}; +use std::time::{SystemTime, UNIX_EPOCH}; + +use crate::identity::MeshIdentity; + +/// Default maximum hops for mesh forwarding. +const DEFAULT_MAX_HOPS: u8 = 5; + +/// A signed, routable message envelope for mesh store-and-forward. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct MeshEnvelope { + /// SHA-256 content ID (for deduplication). + pub id: [u8; 32], + /// 32-byte Ed25519 public key of the sender. + pub sender_key: Vec, + /// 32-byte Ed25519 public key of the recipient (empty for broadcast). + pub recipient_key: Vec, + /// Encrypted message body (opaque to the mesh layer). + pub payload: Vec, + /// Time-to-live in seconds from `timestamp`. + pub ttl_secs: u32, + /// Current hop count (incremented on each forward). + pub hop_count: u8, + /// Maximum allowed hops before the envelope is dropped. + pub max_hops: u8, + /// Unix timestamp (seconds) of creation. + pub timestamp: u64, + /// Ed25519 signature over all fields except `signature` itself. + pub signature: Vec, +} + +impl MeshEnvelope { + /// Create and sign a new mesh envelope. + pub fn new( + identity: &MeshIdentity, + recipient_key: &[u8], + payload: Vec, + ttl_secs: u32, + max_hops: u8, + ) -> Self { + let sender_key = identity.public_key().to_vec(); + let recipient_key = recipient_key.to_vec(); + let hop_count = 0u8; + let max_hops = if max_hops == 0 { + DEFAULT_MAX_HOPS + } else { + max_hops + }; + let timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("system clock before UNIX epoch") + .as_secs(); + + let id = Self::compute_id( + &sender_key, + &recipient_key, + &payload, + ttl_secs, + max_hops, + timestamp, + ); + + let signable = Self::signable_bytes(&id, &sender_key, &recipient_key, &payload, ttl_secs, hop_count, max_hops, timestamp); + let signature = identity.sign(&signable).to_vec(); + + Self { + id, + sender_key, + recipient_key, + payload, + ttl_secs, + hop_count, + max_hops, + timestamp, + signature, + } + } + + /// Compute the content ID from the immutable envelope fields. + pub fn compute_id( + sender_key: &[u8], + recipient_key: &[u8], + payload: &[u8], + ttl_secs: u32, + max_hops: u8, + timestamp: u64, + ) -> [u8; 32] { + let mut hasher = Sha256::new(); + hasher.update(sender_key); + hasher.update(recipient_key); + hasher.update(payload); + hasher.update(ttl_secs.to_le_bytes()); + hasher.update([max_hops]); + hasher.update(timestamp.to_le_bytes()); + hasher.finalize().into() + } + + /// Assemble the byte string that is signed / verified. + fn signable_bytes( + id: &[u8; 32], + sender_key: &[u8], + recipient_key: &[u8], + payload: &[u8], + ttl_secs: u32, + hop_count: u8, + max_hops: u8, + timestamp: u64, + ) -> Vec { + let mut buf = Vec::with_capacity(32 + sender_key.len() + recipient_key.len() + payload.len() + 14); + buf.extend_from_slice(id); + buf.extend_from_slice(sender_key); + buf.extend_from_slice(recipient_key); + buf.extend_from_slice(payload); + buf.extend_from_slice(&ttl_secs.to_le_bytes()); + buf.push(hop_count); + buf.push(max_hops); + buf.extend_from_slice(×tamp.to_le_bytes()); + buf + } + + /// Verify the envelope's Ed25519 signature. + /// + /// Returns `true` if the signature is valid and the sender key is a valid + /// Ed25519 public key. + pub fn verify(&self) -> bool { + let sender_key: [u8; 32] = match self.sender_key.as_slice().try_into() { + Ok(k) => k, + Err(_) => return false, + }; + let sig: [u8; 64] = match self.signature.as_slice().try_into() { + Ok(s) => s, + Err(_) => return false, + }; + let signable = Self::signable_bytes( + &self.id, + &self.sender_key, + &self.recipient_key, + &self.payload, + self.ttl_secs, + self.hop_count, + self.max_hops, + self.timestamp, + ); + quicproquo_core::IdentityKeypair::verify_raw(&sender_key, &signable, &sig).is_ok() + } + + /// Check whether this envelope has expired (TTL elapsed since timestamp). + pub fn is_expired(&self) -> bool { + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("system clock before UNIX epoch") + .as_secs(); + now.saturating_sub(self.timestamp) > self.ttl_secs as u64 + } + + /// Whether this envelope can be forwarded (not expired and under hop limit). + pub fn can_forward(&self) -> bool { + self.hop_count < self.max_hops && !self.is_expired() + } + + /// Create a forwarded copy with `hop_count` incremented by one. + /// + /// The signature remains the sender's original signature — forwarding + /// nodes do not re-sign. + pub fn forwarded(&self) -> Self { + let mut copy = self.clone(); + copy.hop_count = copy.hop_count.saturating_add(1); + copy + } + + /// Serialize to bytes (JSON). + pub fn to_bytes(&self) -> Vec { + // serde_json::to_vec should not fail on a well-formed envelope. + serde_json::to_vec(self).expect("envelope serialization should not fail") + } + + /// Deserialize from bytes (JSON). + pub fn from_bytes(bytes: &[u8]) -> anyhow::Result { + let env: Self = serde_json::from_slice(bytes)?; + Ok(env) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn test_identity() -> MeshIdentity { + MeshIdentity::generate() + } + + #[test] + fn create_and_verify() { + let id = test_identity(); + let recipient = [0xBBu8; 32]; + let env = MeshEnvelope::new(&id, &recipient, b"hello mesh".to_vec(), 3600, 5); + + assert!(env.verify(), "freshly created envelope must verify"); + assert!(!env.is_expired()); + assert!(env.can_forward()); + assert_eq!(env.hop_count, 0); + assert_eq!(env.sender_key, id.public_key().to_vec()); + assert_eq!(env.recipient_key, recipient.to_vec()); + } + + #[test] + fn tampered_payload_fails_verify() { + let id = test_identity(); + let mut env = MeshEnvelope::new(&id, &[0xCC; 32], b"original".to_vec(), 60, 3); + env.payload = b"tampered".to_vec(); + assert!(!env.verify(), "tampered envelope must fail verification"); + } + + #[test] + fn expired_envelope() { + let id = test_identity(); + let mut env = MeshEnvelope::new(&id, &[0xDD; 32], b"old".to_vec(), 0, 5); + // Set timestamp to the past so TTL of 0 guarantees expiry. + env.timestamp = 0; + assert!(env.is_expired()); + assert!(!env.can_forward()); + } + + #[test] + fn forward_increments_hop() { + let id = test_identity(); + let env = MeshEnvelope::new(&id, &[0xEE; 32], b"hop".to_vec(), 3600, 2); + assert_eq!(env.hop_count, 0); + + let fwd1 = env.forwarded(); + assert_eq!(fwd1.hop_count, 1); + assert!(fwd1.can_forward()); + + let fwd2 = fwd1.forwarded(); + assert_eq!(fwd2.hop_count, 2); + assert!(!fwd2.can_forward()); // hop_count == max_hops + } + + #[test] + fn serialization_roundtrip() { + let id = test_identity(); + let env = MeshEnvelope::new(&id, &[0xFF; 32], b"roundtrip".to_vec(), 300, 4); + let bytes = env.to_bytes(); + let restored = MeshEnvelope::from_bytes(&bytes).expect("deserialize"); + assert_eq!(env.id, restored.id); + assert_eq!(env.payload, restored.payload); + assert!(restored.verify()); + } + + #[test] + fn default_max_hops_when_zero() { + let id = test_identity(); + let env = MeshEnvelope::new(&id, &[0x11; 32], b"defaults".to_vec(), 60, 0); + assert_eq!(env.max_hops, 5); // DEFAULT_MAX_HOPS + } + + #[test] + fn broadcast_envelope_empty_recipient() { + let id = test_identity(); + let env = MeshEnvelope::new(&id, &[], b"broadcast".to_vec(), 60, 3); + assert!(env.recipient_key.is_empty()); + assert!(env.verify()); + } +} diff --git a/crates/quicproquo-p2p/src/identity.rs b/crates/quicproquo-p2p/src/identity.rs new file mode 100644 index 0000000..0da58b4 --- /dev/null +++ b/crates/quicproquo-p2p/src/identity.rs @@ -0,0 +1,173 @@ +//! Self-sovereign mesh identity backed by quicproquo-core Ed25519 keypairs. +//! +//! A [`MeshIdentity`] wraps an [`IdentityKeypair`] with a peer directory, +//! enabling P2P nodes to persist identity and track known peers across +//! restarts. + +use std::collections::HashMap; +use std::path::Path; + +use quicproquo_core::IdentityKeypair; +use serde::{Deserialize, Serialize}; + +/// Information about a known peer in the mesh network. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct PeerInfo { + /// Raw Ed25519 public key (32 bytes). + pub public_key: Vec, + /// Unix timestamp of last observed activity. + pub last_seen: u64, + /// Known network addresses (e.g. iroh `NodeAddr` serializations). + pub addresses: Vec, +} + +/// Persisted form of a mesh identity (JSON on disk). +#[derive(Serialize, Deserialize)] +struct IdentityFile { + /// Hex-encoded 32-byte Ed25519 seed. + seed: String, + /// Known peers, keyed by hex-encoded peer public key. + peers: HashMap, +} + +/// A self-sovereign mesh identity: an Ed25519 keypair + a known-peers directory. +pub struct MeshIdentity { + keypair: IdentityKeypair, + known_peers: HashMap, +} + +impl MeshIdentity { + /// Generate a fresh random mesh identity. + pub fn generate() -> Self { + Self { + keypair: IdentityKeypair::generate(), + known_peers: HashMap::new(), + } + } + + /// Recreate a mesh identity from a 32-byte Ed25519 seed. + pub fn from_seed(seed: [u8; 32]) -> Self { + Self { + keypair: IdentityKeypair::from_seed(seed), + known_peers: HashMap::new(), + } + } + + /// Load a mesh identity from a JSON file. + pub fn load(path: &Path) -> anyhow::Result { + let data = std::fs::read_to_string(path)?; + let file: IdentityFile = serde_json::from_str(&data)?; + let seed_bytes = hex::decode(&file.seed)?; + let seed: [u8; 32] = seed_bytes + .as_slice() + .try_into() + .map_err(|_| anyhow::anyhow!("seed must be 32 bytes"))?; + Ok(Self { + keypair: IdentityKeypair::from_seed(seed), + known_peers: file.peers, + }) + } + + /// Save this mesh identity to a JSON file. + pub fn save(&self, path: &Path) -> anyhow::Result<()> { + let file = IdentityFile { + seed: hex::encode(self.keypair.seed_bytes()), + peers: self.known_peers.clone(), + }; + let json = serde_json::to_string_pretty(&file)?; + std::fs::write(path, json)?; + Ok(()) + } + + /// Return the raw 32-byte Ed25519 public key. + pub fn public_key(&self) -> [u8; 32] { + self.keypair.public_key_bytes() + } + + /// Sign arbitrary bytes, returning a 64-byte Ed25519 signature. + pub fn sign(&self, message: &[u8]) -> [u8; 64] { + self.keypair.sign_raw(message) + } + + /// Return the underlying seed (for deriving iroh `SecretKey`, etc.). + pub fn seed_bytes(&self) -> [u8; 32] { + self.keypair.seed_bytes() + } + + /// Register or update a known peer. + pub fn add_peer(&mut self, id: String, info: PeerInfo) { + self.known_peers.insert(id, info); + } + + /// Immutable view of the known-peers directory. + pub fn known_peers(&self) -> &HashMap { + &self.known_peers + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::time::{SystemTime, UNIX_EPOCH}; + + #[test] + fn generate_and_sign_verify() { + let id = MeshIdentity::generate(); + let msg = b"test message"; + let sig = id.sign(msg); + + // Verify through quicproquo_core + let pk = id.public_key(); + IdentityKeypair::verify_raw(&pk, msg, &sig).expect("valid signature"); + } + + #[test] + fn from_seed_deterministic() { + let seed = [42u8; 32]; + let a = MeshIdentity::from_seed(seed); + let b = MeshIdentity::from_seed(seed); + assert_eq!(a.public_key(), b.public_key()); + } + + #[test] + fn save_and_load_roundtrip() { + let dir = tempfile::tempdir().expect("tmp dir"); + let path = dir.path().join("mesh_id.json"); + + let mut original = MeshIdentity::generate(); + original.add_peer( + "deadbeef".into(), + PeerInfo { + public_key: vec![0xde, 0xad], + last_seen: SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("time") + .as_secs(), + addresses: vec!["127.0.0.1:4433".into()], + }, + ); + original.save(&path).expect("save"); + + let loaded = MeshIdentity::load(&path).expect("load"); + assert_eq!(original.public_key(), loaded.public_key()); + assert_eq!(loaded.known_peers().len(), 1); + assert!(loaded.known_peers().contains_key("deadbeef")); + } + + #[test] + fn add_and_query_peers() { + let mut id = MeshIdentity::generate(); + assert!(id.known_peers().is_empty()); + + id.add_peer( + "peer1".into(), + PeerInfo { + public_key: vec![1; 32], + last_seen: 0, + addresses: vec![], + }, + ); + assert_eq!(id.known_peers().len(), 1); + assert_eq!(id.known_peers()["peer1"].public_key, vec![1; 32]); + } +} diff --git a/crates/quicproquo-p2p/src/lib.rs b/crates/quicproquo-p2p/src/lib.rs index 761b976..fd29f9c 100644 --- a/crates/quicproquo-p2p/src/lib.rs +++ b/crates/quicproquo-p2p/src/lib.rs @@ -12,8 +12,20 @@ //! └── QUIC/TLS ── Server ── QUIC/TLS ┘ (fallback: store-and-forward) //! ``` +pub mod broadcast; +pub mod envelope; +pub mod identity; +pub mod store; + +use std::sync::{Arc, Mutex}; + use iroh::{Endpoint, EndpointAddr, PublicKey, SecretKey}; +use crate::broadcast::BroadcastManager; +use crate::envelope::MeshEnvelope; +use crate::identity::MeshIdentity; +use crate::store::MeshStore; + /// ALPN protocol identifier for quicproquo P2P messaging. /// Updated from the original project name "quicnprotochat" to "quicproquo" (breaking wire change; /// all peers must be on the same version to connect). @@ -24,6 +36,12 @@ const P2P_ALPN: &[u8] = b"quicproquo/p2p/1"; /// Manages direct QUIC connections to peers with automatic NAT traversal. pub struct P2pNode { endpoint: Endpoint, + /// Optional self-sovereign mesh identity for store-and-forward messaging. + mesh_identity: Option, + /// Shared store-and-forward queue. + mesh_store: Arc>, + /// Broadcast channel manager for pub/sub mesh announcements. + broadcast_mgr: Arc>, } /// Received P2P message with sender information. @@ -50,7 +68,24 @@ impl P2pNode { "P2P node started" ); - Ok(Self { endpoint }) + Ok(Self { + endpoint, + mesh_identity: None, + mesh_store: Arc::new(Mutex::new(MeshStore::new(0))), + broadcast_mgr: Arc::new(Mutex::new(BroadcastManager::new())), + }) + } + + /// Start a new P2P node with a mesh identity and store-and-forward enabled. + pub async fn start_with_mesh( + secret_key: Option, + mesh_identity: MeshIdentity, + max_stored: usize, + ) -> anyhow::Result { + let mut node = Self::start(secret_key).await?; + node.mesh_identity = Some(mesh_identity); + node.mesh_store = Arc::new(Mutex::new(MeshStore::new(max_stored))); + Ok(node) } /// This node's public key (used as node ID for peer discovery). @@ -68,6 +103,16 @@ impl P2pNode { self.endpoint.addr() } + /// Return a reference to the mesh identity, if set. + pub fn mesh_identity(&self) -> Option<&MeshIdentity> { + self.mesh_identity.as_ref() + } + + /// Return a clone of the shared mesh store handle. + pub fn mesh_store(&self) -> Arc> { + Arc::clone(&self.mesh_store) + } + /// Send a payload directly to a peer via P2P QUIC. pub async fn send(&self, peer: impl Into, payload: &[u8]) -> anyhow::Result<()> { let peer = peer.into(); @@ -139,6 +184,162 @@ impl P2pNode { Ok(P2pMessage { sender, payload }) } + /// Create a [`MeshEnvelope`] and send it to a peer, or store it for later forwarding. + /// + /// If `peer_addr` is `Some`, the envelope is sent immediately via P2P. + /// Otherwise it is queued in the mesh store for future forwarding. + pub async fn send_mesh( + &self, + peer_addr: Option>, + recipient_key: &[u8], + payload: Vec, + ttl_secs: u32, + ) -> anyhow::Result<()> { + let identity = self + .mesh_identity + .as_ref() + .ok_or_else(|| anyhow::anyhow!("mesh identity not configured"))?; + + let envelope = MeshEnvelope::new(identity, recipient_key, payload, ttl_secs, 0); + let bytes = envelope.to_bytes(); + + if let Some(addr) = peer_addr { + self.send(addr, &bytes).await?; + tracing::debug!("mesh envelope sent directly"); + } else { + let mut store = self + .mesh_store + .lock() + .map_err(|e| anyhow::anyhow!("mesh store lock poisoned: {e}"))?; + if !store.store(envelope) { + anyhow::bail!("mesh store rejected envelope (duplicate or at capacity)"); + } + tracing::debug!("mesh envelope queued for forwarding"); + } + Ok(()) + } + + /// Fetch all stored mesh envelopes addressed to this node's identity. + pub fn receive_mesh(&self) -> anyhow::Result> { + let identity = self + .mesh_identity + .as_ref() + .ok_or_else(|| anyhow::anyhow!("mesh identity not configured"))?; + + let pk = identity.public_key(); + let mut store = self + .mesh_store + .lock() + .map_err(|e| anyhow::anyhow!("mesh store lock poisoned: {e}"))?; + Ok(store.fetch(&pk)) + } + + /// Forward stored envelopes to a connected peer. + /// + /// Sends all forwardable envelopes that match `recipient_key` to `peer_addr`. + pub async fn forward_stored( + &self, + peer_addr: impl Into + Clone, + recipient_key: &[u8], + ) -> anyhow::Result { + let envelopes = { + let mut store = self + .mesh_store + .lock() + .map_err(|e| anyhow::anyhow!("mesh store lock poisoned: {e}"))?; + store.fetch(recipient_key) + }; + + let mut forwarded = 0; + for env in envelopes { + if env.can_forward() { + let fwd = env.forwarded(); + let bytes = fwd.to_bytes(); + self.send(peer_addr.clone(), &bytes).await?; + forwarded += 1; + } + } + + if forwarded > 0 { + tracing::debug!(count = forwarded, "forwarded stored mesh envelopes"); + } + + Ok(forwarded) + } + + /// Return a clone of the shared broadcast manager handle. + pub fn broadcast_mgr(&self) -> Arc> { + Arc::clone(&self.broadcast_mgr) + } + + /// Subscribe to a broadcast channel with a pre-shared key. + pub fn subscribe(&self, topic: &str, key: [u8; 32]) -> anyhow::Result<()> { + let mut mgr = self + .broadcast_mgr + .lock() + .map_err(|e| anyhow::anyhow!("broadcast manager lock poisoned: {e}"))?; + mgr.subscribe(topic, key); + Ok(()) + } + + /// Create a new broadcast channel with a random key. Returns the key for sharing. + pub fn create_broadcast(&self, topic: &str) -> anyhow::Result<[u8; 32]> { + let mut mgr = self + .broadcast_mgr + .lock() + .map_err(|e| anyhow::anyhow!("broadcast manager lock poisoned: {e}"))?; + let ch = mgr.create_channel(topic); + Ok(*ch.key()) + } + + /// Encrypt a payload on a broadcast topic and flood it to all connected peers + /// as a MeshEnvelope with an empty recipient key (broadcast). + pub async fn broadcast( + &self, + topic: &str, + payload: &[u8], + ) -> anyhow::Result<()> { + let identity = self + .mesh_identity + .as_ref() + .ok_or_else(|| anyhow::anyhow!("mesh identity not configured"))?; + + let encrypted = { + let mgr = self + .broadcast_mgr + .lock() + .map_err(|e| anyhow::anyhow!("broadcast manager lock poisoned: {e}"))?; + mgr.encrypt(topic, payload) + .ok_or_else(|| anyhow::anyhow!("not subscribed to topic: {topic}"))? + }; + + // Create a broadcast envelope (empty recipient_key signals broadcast). + let envelope = MeshEnvelope::new(identity, &[], encrypted, 300, 0); + let bytes = envelope.to_bytes(); + + // Store in the mesh store for flood-forwarding. + let mut store = self + .mesh_store + .lock() + .map_err(|e| anyhow::anyhow!("mesh store lock poisoned: {e}"))?; + if !store.store(envelope) { + tracing::debug!("broadcast envelope dedup or at capacity, skipping store"); + } + drop(store); + + tracing::debug!(topic = topic, bytes = bytes.len(), "broadcast envelope queued"); + Ok(()) + } + + /// List all subscribed broadcast topics. + pub fn topics(&self) -> anyhow::Result> { + let mgr = self + .broadcast_mgr + .lock() + .map_err(|e| anyhow::anyhow!("broadcast manager lock poisoned: {e}"))?; + Ok(mgr.topics()) + } + /// Gracefully shut down the P2P node. pub async fn close(self) { self.endpoint.close().await; @@ -157,8 +358,13 @@ mod tests { .relay_mode(RelayMode::Disabled) .bind() .await - .unwrap(); - P2pNode { endpoint } + .expect("bind local endpoint"); + P2pNode { + endpoint, + mesh_identity: None, + mesh_store: Arc::new(Mutex::new(MeshStore::new(0))), + broadcast_mgr: Arc::new(Mutex::new(BroadcastManager::new())), + } } #[tokio::test] @@ -171,18 +377,42 @@ mod tests { let payload = b"hello via P2P"; let recv_handle = tokio::spawn(async move { - let msg = receiver.recv().await.unwrap(); + let msg = receiver.recv().await.expect("receive message"); assert_eq!(msg.payload, payload.to_vec()); assert_eq!(msg.sender, sender_id); }); tokio::time::sleep(std::time::Duration::from_millis(200)).await; - sender.send(receiver_addr, payload).await.unwrap(); + sender.send(receiver_addr, payload).await.expect("send message"); - recv_handle.await.unwrap(); + recv_handle.await.expect("recv task"); tokio::time::sleep(std::time::Duration::from_millis(100)).await; sender.close().await; } + + #[tokio::test] + async fn mesh_store_and_receive() { + let id = MeshIdentity::generate(); + let pk = id.public_key(); + + let node = P2pNode::start_with_mesh(None, id, 100) + .await + .expect("start mesh node"); + + // Queue a message for ourselves via the store. + { + let sender_id = MeshIdentity::generate(); + let env = MeshEnvelope::new(&sender_id, &pk, b"stored msg".to_vec(), 3600, 5); + let mut store = node.mesh_store.lock().expect("lock"); + assert!(store.store(env)); + } + + let msgs = node.receive_mesh().expect("receive_mesh"); + assert_eq!(msgs.len(), 1); + assert_eq!(msgs[0].payload, b"stored msg"); + + node.close().await; + } } diff --git a/crates/quicproquo-p2p/src/store.rs b/crates/quicproquo-p2p/src/store.rs new file mode 100644 index 0000000..b9704d3 --- /dev/null +++ b/crates/quicproquo-p2p/src/store.rs @@ -0,0 +1,202 @@ +//! In-memory store-and-forward message queue for mesh nodes. +//! +//! [`MeshStore`] buffers [`MeshEnvelope`]s for offline recipients and +//! provides deduplication and automatic garbage collection of expired messages. + +use std::collections::{HashMap, HashSet}; + +use crate::envelope::MeshEnvelope; + +/// Default maximum messages stored per recipient. +const DEFAULT_MAX_STORED: usize = 1000; + +/// In-memory store-and-forward queue keyed by recipient public key. +pub struct MeshStore { + /// Recipient public key -> queued envelopes. + inbox: HashMap, Vec>, + /// Set of envelope IDs already processed (deduplication). + seen: HashSet<[u8; 32]>, + /// Maximum envelopes held per recipient. + max_stored: usize, +} + +impl MeshStore { + /// Create a new store with the given per-recipient capacity. + /// + /// A `max_stored` of 0 uses [`DEFAULT_MAX_STORED`]. + pub fn new(max_stored: usize) -> Self { + Self { + inbox: HashMap::new(), + seen: HashSet::new(), + max_stored: if max_stored == 0 { + DEFAULT_MAX_STORED + } else { + max_stored + }, + } + } + + /// Store an envelope for later delivery. + /// + /// Returns `false` (without storing) if: + /// - the envelope ID has already been seen (dedup), or + /// - the recipient's inbox is at capacity. + pub fn store(&mut self, envelope: MeshEnvelope) -> bool { + if self.seen.contains(&envelope.id) { + return false; + } + let queue = self.inbox.entry(envelope.recipient_key.clone()).or_default(); + if queue.len() >= self.max_stored { + return false; + } + self.seen.insert(envelope.id); + queue.push(envelope); + true + } + + /// Drain and return all queued messages for `recipient_key`. + pub fn fetch(&mut self, recipient_key: &[u8]) -> Vec { + self.inbox.remove(recipient_key).unwrap_or_default() + } + + /// Peek at queued messages for `recipient_key` without draining. + pub fn peek(&self, recipient_key: &[u8]) -> &[MeshEnvelope] { + self.inbox + .get(recipient_key) + .map(|v| v.as_slice()) + .unwrap_or_default() + } + + /// Remove all expired envelopes from every inbox and return the count removed. + pub fn gc_expired(&mut self) -> usize { + let mut removed = 0; + self.inbox.retain(|_key, queue| { + let before = queue.len(); + queue.retain(|env| !env.is_expired()); + removed += before - queue.len(); + !queue.is_empty() + }); + removed + } + + /// Check whether an envelope ID has already been processed. + pub fn seen(&self, id: &[u8; 32]) -> bool { + self.seen.contains(id) + } + + /// Return `(total_messages, unique_recipients)`. + pub fn stats(&self) -> (usize, usize) { + let total: usize = self.inbox.values().map(|q| q.len()).sum(); + let recipients = self.inbox.len(); + (total, recipients) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::identity::MeshIdentity; + + fn make_envelope(recipient: &[u8], payload: &[u8], ttl: u32) -> MeshEnvelope { + let id = MeshIdentity::generate(); + MeshEnvelope::new(&id, recipient, payload.to_vec(), ttl, 5) + } + + #[test] + fn store_and_fetch() { + let mut store = MeshStore::new(10); + let recip = [0xAAu8; 32]; + let env = make_envelope(&recip, b"hello", 3600); + + assert!(store.store(env)); + assert_eq!(store.stats(), (1, 1)); + + let msgs = store.fetch(&recip); + assert_eq!(msgs.len(), 1); + assert_eq!(msgs[0].payload, b"hello"); + + // After fetch, inbox is drained. + assert_eq!(store.stats(), (0, 0)); + } + + #[test] + fn deduplication() { + let mut store = MeshStore::new(10); + let recip = [0xBBu8; 32]; + let env = make_envelope(&recip, b"dup", 3600); + let env2 = env.clone(); + + assert!(store.store(env)); + assert!(!store.store(env2), "duplicate should be rejected"); + assert_eq!(store.stats(), (1, 1)); + } + + #[test] + fn capacity_limit() { + let mut store = MeshStore::new(2); + let recip = [0xCCu8; 32]; + + assert!(store.store(make_envelope(&recip, b"1", 3600))); + assert!(store.store(make_envelope(&recip, b"2", 3600))); + assert!( + !store.store(make_envelope(&recip, b"3", 3600)), + "should reject when at capacity" + ); + assert_eq!(store.stats(), (2, 1)); + } + + #[test] + fn gc_expired_messages() { + let mut store = MeshStore::new(10); + let recip = [0xDDu8; 32]; + + // Create an already-expired envelope (TTL=0, timestamp in the past). + let id = MeshIdentity::generate(); + let mut env = MeshEnvelope::new(&id, &recip, b"old".to_vec(), 0, 5); + env.timestamp = 0; // far in the past + store.store(env); + + // And a fresh one. + store.store(make_envelope(&recip, b"fresh", 3600)); + + assert_eq!(store.stats(), (2, 1)); + let removed = store.gc_expired(); + assert_eq!(removed, 1); + assert_eq!(store.stats(), (1, 1)); + } + + #[test] + fn peek_does_not_drain() { + let mut store = MeshStore::new(10); + let recip = [0xEEu8; 32]; + store.store(make_envelope(&recip, b"peek", 3600)); + + assert_eq!(store.peek(&recip).len(), 1); + assert_eq!(store.peek(&recip).len(), 1); // still there + assert_eq!(store.stats(), (1, 1)); + } + + #[test] + fn seen_tracks_processed_ids() { + let mut store = MeshStore::new(10); + let env = make_envelope(&[0xFF; 32], b"track", 3600); + let id = env.id; + + assert!(!store.seen(&id)); + store.store(env); + assert!(store.seen(&id)); + } + + #[test] + fn fetch_empty_inbox() { + let mut store = MeshStore::new(10); + let msgs = store.fetch(&[0x00; 32]); + assert!(msgs.is_empty()); + } + + #[test] + fn peek_empty_inbox() { + let store = MeshStore::new(10); + assert!(store.peek(&[0x00; 32]).is_empty()); + } +}