//! Multi-hop mesh router using the distributed routing table. //! //! The [`MeshRouter`] delivers messages using the best available path: //! direct transport -> multi-hop via intermediate nodes -> store-and-forward. //! //! # Routing Algorithm //! //! ```text //! send(destination, payload): //! 1. Look up destination in routing table //! 2. If direct transport available -> send via transport //! 3. If next-hop known -> wrap in MeshEnvelope, send to next-hop //! 4. If no route -> queue in store-and-forward //! ``` use std::collections::HashMap; use std::sync::{Arc, Mutex, RwLock}; use std::time::{Duration, Instant}; use anyhow::{bail, Result}; use crate::announce::compute_address; use crate::envelope::MeshEnvelope; use crate::identity::MeshIdentity; use crate::routing_table::RoutingTable; use crate::store::MeshStore; use crate::transport::TransportAddr; use crate::transport_manager::TransportManager; /// How a message was delivered. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum DeliveryResult { /// Sent directly to destination via a transport. Direct, /// Forwarded to next-hop node for relay. Forwarded, /// Queued in store-and-forward (destination unreachable). Stored, /// Delivered via server relay (legacy fallback). ServerRelay, } /// What to do with an incoming envelope. #[derive(Debug)] pub enum IncomingAction { /// Message is for us — deliver to application. Deliver(MeshEnvelope), /// Message is for someone else — forward it. Forward { envelope: MeshEnvelope, next_hop: TransportAddr, }, /// Message should be stored for later forwarding. Store(MeshEnvelope), /// Message was dropped (expired, max hops, invalid). Dropped(String), } /// Per-destination delivery statistics. #[derive(Debug, Clone, Default)] pub struct DeliveryStats { pub direct_count: u64, pub forwarded_count: u64, pub stored_count: u64, pub relay_count: u64, pub last_delivery: Option, pub avg_latency: Option, } impl DeliveryStats { fn record(&mut self, method: DeliveryResult, latency: Duration) { match method { DeliveryResult::Direct => self.direct_count += 1, DeliveryResult::Forwarded => self.forwarded_count += 1, DeliveryResult::Stored => self.stored_count += 1, DeliveryResult::ServerRelay => self.relay_count += 1, } self.last_delivery = Some(Instant::now()); self.avg_latency = Some(match self.avg_latency { Some(prev) => (prev + latency) / 2, None => latency, }); } /// Total number of deliveries across all methods. pub fn total(&self) -> u64 { self.direct_count + self.forwarded_count + self.stored_count + self.relay_count } } /// Multi-hop mesh message router. pub struct MeshRouter { /// This node's mesh identity. identity: MeshIdentity, /// This node's 16-byte truncated address. local_address: [u8; 16], /// Distributed routing table. routes: Arc>, /// Transport manager for sending packets. transports: Arc, /// Store-and-forward queue for unreachable destinations. store: Arc>, /// Per-destination delivery stats. stats: Mutex>, } impl MeshRouter { /// Create a new mesh router. pub fn new( identity: MeshIdentity, routes: Arc>, transports: Arc, store: Arc>, ) -> Self { let local_address = compute_address(&identity.public_key()); Self { identity, local_address, routes, transports, store, stats: Mutex::new(HashMap::new()), } } /// Send a payload to a destination identified by its 16-byte mesh address. /// /// Routing priority: /// 1. Route found in routing table -> wrap in envelope and send via transport /// 2. No route -> store for later forwarding pub async fn send(&self, dest_address: &[u8; 16], payload: &[u8]) -> Result { let start = Instant::now(); // Look up destination in routing table. let route_info = { let table = self .routes .read() .map_err(|e| anyhow::anyhow!("routing table lock poisoned: {e}"))?; table.lookup(dest_address).map(|entry| { ( entry.identity_key, entry.next_hop_addr.clone(), entry.hops, ) }) }; if let Some((dest_key, next_hop_addr, hops)) = route_info { // Build an envelope addressed to the destination. let envelope = MeshEnvelope::new(&self.identity, &dest_key, payload.to_vec(), 300, 0); let wire = envelope.to_wire(); self.transports.send(&next_hop_addr, &wire).await?; // Classify: if destination is directly reachable (hop count <= 1), // consider it Direct; otherwise it's Forwarded through intermediaries. let result = if hops <= 1 { DeliveryResult::Direct } else { DeliveryResult::Forwarded }; let latency = start.elapsed(); self.record_stats(dest_address, result, latency); Ok(result) } else { // No route — store for later forwarding. // We need a recipient key for the store. Since we only have the address // and no key, store with the address zero-padded to 32 bytes as a key // placeholder. The drain_store_for method matches on this convention. let mut recipient_key = [0u8; 32]; recipient_key[..16].copy_from_slice(dest_address); let envelope = MeshEnvelope::new( &self.identity, &recipient_key, payload.to_vec(), 300, 0, ); let stored = { let mut store = self .store .lock() .map_err(|e| anyhow::anyhow!("store lock poisoned: {e}"))?; store.store(envelope) }; if !stored { bail!("store rejected envelope (duplicate or at capacity)"); } let latency = start.elapsed(); let result = DeliveryResult::Stored; self.record_stats(dest_address, result, latency); Ok(result) } } /// Convenience: compute the 16-byte address from a 32-byte key, then send. pub async fn send_to_key( &self, dest_key: &[u8; 32], payload: &[u8], ) -> Result { let addr = compute_address(dest_key); self.send(&addr, payload).await } /// Process a received envelope and decide what to do with it. pub fn handle_incoming(&self, envelope: MeshEnvelope) -> Result { // Verify envelope signature. if !envelope.verify() { return Ok(IncomingAction::Dropped( "invalid signature".to_string(), )); } // Check if it's for us (recipient_key matches our identity). let our_key = self.identity.public_key(); if envelope.recipient_key.len() == 32 { let recipient: [u8; 32] = envelope .recipient_key .as_slice() .try_into() .map_err(|_| anyhow::anyhow!("invalid recipient key length"))?; if recipient == our_key { return Ok(IncomingAction::Deliver(envelope)); } } // Broadcast (empty recipient) — always deliver locally. if envelope.recipient_key.is_empty() { return Ok(IncomingAction::Deliver(envelope)); } // Not for us — check if we can forward. if !envelope.can_forward() { let reason = if envelope.is_expired() { "envelope expired" } else { "max hops reached" }; return Ok(IncomingAction::Dropped(reason.to_string())); } // Look up the recipient in the routing table. let dest_address = compute_address(&envelope.recipient_key); let next_hop = { let table = self .routes .read() .map_err(|e| anyhow::anyhow!("routing table lock poisoned: {e}"))?; table .lookup(&dest_address) .map(|entry| entry.next_hop_addr.clone()) }; match next_hop { Some(addr) => { let forwarded = envelope.forwarded(); Ok(IncomingAction::Forward { envelope: forwarded, next_hop: addr, }) } None => Ok(IncomingAction::Store(envelope)), } } /// Forward an envelope to its next hop based on the routing table. /// /// The envelope is sent as-is (callers such as [`handle_incoming`](Self::handle_incoming) /// are expected to have already incremented the hop count via [`MeshEnvelope::forwarded`]). pub async fn forward(&self, envelope: MeshEnvelope) -> Result { let start = Instant::now(); let dest_address = compute_address(&envelope.recipient_key); let next_hop_addr = { let table = self .routes .read() .map_err(|e| anyhow::anyhow!("routing table lock poisoned: {e}"))?; table .lookup(&dest_address) .map(|entry| entry.next_hop_addr.clone()) .ok_or_else(|| anyhow::anyhow!("no route for forwarding target"))? }; let wire = envelope.to_wire(); self.transports.send(&next_hop_addr, &wire).await?; let latency = start.elapsed(); let result = DeliveryResult::Forwarded; self.record_stats(&dest_address, result, latency); Ok(result) } /// Drain stored messages for a destination and attempt to forward them. /// /// Call this when a new route appears (e.g., from an announce) to flush /// queued messages. Returns the count of successfully forwarded messages. pub async fn drain_store_for(&self, dest_address: &[u8; 16]) -> Result { // Look up the route to get identity key and next-hop. let (identity_key, next_hop_addr) = { let table = self .routes .read() .map_err(|e| anyhow::anyhow!("routing table lock poisoned: {e}"))?; match table.lookup(dest_address) { Some(entry) => (entry.identity_key, entry.next_hop_addr.clone()), None => return Ok(0), } }; // Fetch stored envelopes keyed by the full identity key. let envelopes = { let mut store = self .store .lock() .map_err(|e| anyhow::anyhow!("store lock poisoned: {e}"))?; let mut result = store.fetch(&identity_key); // Also try the zero-padded address convention used by send(). let mut padded_key = [0u8; 32]; padded_key[..16].copy_from_slice(dest_address); result.extend(store.fetch(&padded_key)); result }; let mut forwarded_count = 0; for env in envelopes { if env.can_forward() { let fwd = env.forwarded(); let wire = fwd.to_wire(); if self.transports.send(&next_hop_addr, &wire).await.is_ok() { forwarded_count += 1; } } } Ok(forwarded_count) } /// Get delivery statistics for a specific destination. pub fn stats(&self, address: &[u8; 16]) -> Option { self.stats .lock() .ok() .and_then(|s| s.get(address).cloned()) } /// Get delivery statistics for all known destinations. pub fn all_stats(&self) -> HashMap<[u8; 16], DeliveryStats> { self.stats .lock() .map(|s| s.clone()) .unwrap_or_default() } /// This node's 16-byte truncated mesh address. pub fn local_address(&self) -> &[u8; 16] { &self.local_address } /// Record a delivery in the per-destination stats. fn record_stats(&self, address: &[u8; 16], method: DeliveryResult, latency: Duration) { if let Ok(mut stats) = self.stats.lock() { stats .entry(*address) .or_default() .record(method, latency); } } } #[cfg(test)] mod tests { use super::*; #[test] fn delivery_stats_tracking() { let mut stats = DeliveryStats::default(); assert_eq!(stats.total(), 0); stats.record(DeliveryResult::Direct, Duration::from_millis(10)); assert_eq!(stats.direct_count, 1); assert_eq!(stats.total(), 1); assert!(stats.last_delivery.is_some()); assert!(stats.avg_latency.is_some()); stats.record(DeliveryResult::Forwarded, Duration::from_millis(20)); assert_eq!(stats.forwarded_count, 1); assert_eq!(stats.total(), 2); stats.record(DeliveryResult::Stored, Duration::from_millis(5)); assert_eq!(stats.stored_count, 1); assert_eq!(stats.total(), 3); stats.record(DeliveryResult::ServerRelay, Duration::from_millis(50)); assert_eq!(stats.relay_count, 1); assert_eq!(stats.total(), 4); // avg_latency should be present and reasonable. let avg = stats.avg_latency.unwrap(); assert!(avg.as_millis() > 0); } #[test] fn incoming_action_deliver_to_self() { let identity = MeshIdentity::generate(); let our_key = identity.public_key(); let routes = Arc::new(RwLock::new(RoutingTable::new(Duration::from_secs(300)))); let transports = Arc::new(TransportManager::new()); let store = Arc::new(Mutex::new(MeshStore::new(100))); let router = MeshRouter::new(identity, routes, transports, store); // Create an envelope addressed to our key. let sender = MeshIdentity::generate(); let envelope = MeshEnvelope::new(&sender, &our_key, b"hello self".to_vec(), 3600, 5); let action = router.handle_incoming(envelope).expect("handle_incoming"); match action { IncomingAction::Deliver(env) => { assert_eq!(env.payload, b"hello self"); } other => panic!("expected Deliver, got {:?}", std::mem::discriminant(&other)), } } #[test] fn incoming_action_broadcast_delivers() { let identity = MeshIdentity::generate(); let routes = Arc::new(RwLock::new(RoutingTable::new(Duration::from_secs(300)))); let transports = Arc::new(TransportManager::new()); let store = Arc::new(Mutex::new(MeshStore::new(100))); let router = MeshRouter::new(identity, routes, transports, store); // Create a broadcast envelope (empty recipient key). let sender = MeshIdentity::generate(); let envelope = MeshEnvelope::new(&sender, &[], b"broadcast msg".to_vec(), 3600, 5); let action = router.handle_incoming(envelope).expect("handle_incoming"); match action { IncomingAction::Deliver(env) => { assert_eq!(env.payload, b"broadcast msg"); assert!(env.recipient_key.is_empty()); } other => panic!("expected Deliver, got {:?}", std::mem::discriminant(&other)), } } #[test] fn incoming_action_dropped_expired() { let identity = MeshIdentity::generate(); let routes = Arc::new(RwLock::new(RoutingTable::new(Duration::from_secs(300)))); let transports = Arc::new(TransportManager::new()); let store = Arc::new(Mutex::new(MeshStore::new(100))); let router = MeshRouter::new(identity, routes, transports, store); // Create an envelope addressed to someone else with TTL=0. // is_expired() checks: now - timestamp > ttl_secs. // With ttl=0 and timestamp=now, we need to wait >0 seconds for expiry. let sender = MeshIdentity::generate(); let other_key = [0xBB; 32]; let envelope = MeshEnvelope::new(&sender, &other_key, b"expired".to_vec(), 0, 5); // Sleep briefly so that now - timestamp > 0 (the TTL). std::thread::sleep(Duration::from_millis(1100)); let action = router.handle_incoming(envelope).expect("handle_incoming"); match action { IncomingAction::Dropped(reason) => { assert!( reason.contains("expired"), "expected expired reason, got: {reason}" ); } other => panic!("expected Dropped, got {:?}", std::mem::discriminant(&other)), } } #[test] fn incoming_action_dropped_invalid_sig() { let identity = MeshIdentity::generate(); let routes = Arc::new(RwLock::new(RoutingTable::new(Duration::from_secs(300)))); let transports = Arc::new(TransportManager::new()); let store = Arc::new(Mutex::new(MeshStore::new(100))); let router = MeshRouter::new(identity, routes, transports, store); // Create a valid envelope then tamper with the payload. let sender = MeshIdentity::generate(); let other_key = [0xCC; 32]; let mut envelope = MeshEnvelope::new(&sender, &other_key, b"original".to_vec(), 3600, 5); envelope.payload = b"tampered".to_vec(); let action = router.handle_incoming(envelope).expect("handle_incoming"); match action { IncomingAction::Dropped(reason) => { assert!( reason.contains("invalid signature"), "expected invalid signature reason, got: {reason}" ); } other => panic!("expected Dropped, got {:?}", std::mem::discriminant(&other)), } } }