Files
quicproquo/crates/quicprochat-p2p/src/mesh_router.rs
Christian Nennemann f9ac921a0c feat(p2p): mesh stack, LoRa mock transport, and relay demo
Implement transport abstraction (TCP/iroh), announce and routing table,
multi-hop mesh router, truncated-address link layer, and LoRa mock
medium with fragmentation plus EU868-style duty-cycle accounting.
Add mesh_lora_relay_demo and scripts/mesh-demo.sh. Relax CBOR vs JSON
size assertion to match fixed-size cryptographic overhead. Extend
.gitignore for nested targets and node_modules.

Made-with: Cursor
2026-03-30 21:19:12 +02:00

517 lines
18 KiB
Rust

//! Multi-hop mesh router using the distributed routing table.
//!
//! The [`MeshRouter`] delivers messages using the best available path:
//! direct transport -> multi-hop via intermediate nodes -> store-and-forward.
//!
//! # Routing Algorithm
//!
//! ```text
//! send(destination, payload):
//! 1. Look up destination in routing table
//! 2. If the route is direct (hops <= 1) -> wrap in MeshEnvelope, send via transport (Direct)
//! 3. If the route is multi-hop -> wrap in MeshEnvelope, send to next-hop (Forwarded)
//! 4. If no route -> queue in store-and-forward
//! ```
use std::collections::HashMap;
use std::sync::{Arc, Mutex, RwLock};
use std::time::{Duration, Instant};
use anyhow::{bail, Result};
use crate::announce::compute_address;
use crate::envelope::MeshEnvelope;
use crate::identity::MeshIdentity;
use crate::routing_table::RoutingTable;
use crate::store::MeshStore;
use crate::transport::TransportAddr;
use crate::transport_manager::TransportManager;
/// The path a message took when the router handled it.
///
/// Returned by the router's send/forward operations and used as the key for
/// the per-method counters in [`DeliveryStats`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DeliveryResult {
    /// The message went out over a transport on a route whose hop count is
    /// at most one, i.e. the destination is treated as directly reachable.
    Direct,
    /// The message was handed to a next-hop node that is expected to relay it.
    Forwarded,
    /// No route was known, so the message was queued in store-and-forward.
    Stored,
    /// The message was delivered through a server relay (legacy fallback).
    /// No code path visible in this module produces this variant.
    ServerRelay,
}
/// Decision produced for a received envelope.
///
/// The router only *decides*; acting on the decision (delivering, sending,
/// storing) is left to the caller.
#[derive(Debug)]
pub enum IncomingAction {
    /// The envelope is addressed to this node (or is a broadcast) and should
    /// be handed to the application.
    Deliver(MeshEnvelope),
    /// The envelope is addressed to another node for which a route exists.
    Forward {
        /// The envelope to send on.
        envelope: MeshEnvelope,
        /// Transport address of the next hop taken from the routing table.
        next_hop: TransportAddr,
    },
    /// The envelope is addressed to another node but no route is known yet;
    /// it should be kept for later forwarding.
    Store(MeshEnvelope),
    /// The envelope was rejected; the string carries the reason
    /// (invalid signature, expired, or hop limit reached).
    Dropped(String),
}
/// Delivery counters and timing kept for a single destination.
///
/// All counters start at zero and both optional fields start as `None`
/// (see the derived `Default`).
#[derive(Debug, Clone, Default)]
pub struct DeliveryStats {
    /// Number of deliveries classified as [`DeliveryResult::Direct`].
    pub direct_count: u64,
    /// Number of deliveries classified as [`DeliveryResult::Forwarded`].
    pub forwarded_count: u64,
    /// Number of messages classified as [`DeliveryResult::Stored`].
    pub stored_count: u64,
    /// Number of deliveries classified as [`DeliveryResult::ServerRelay`].
    pub relay_count: u64,
    /// Moment the most recent sample was recorded, if any.
    pub last_delivery: Option<Instant>,
    /// Smoothed latency: every update averages the previous value with the
    /// newest sample, so recent samples carry more weight than old ones.
    pub avg_latency: Option<Duration>,
}
impl DeliveryStats {
    /// Account for one handled message.
    ///
    /// Bumps the counter matching `method`, stamps `last_delivery` with the
    /// current instant and folds `latency` into `avg_latency`. The first
    /// sample is stored as-is; every later sample replaces the value with the
    /// midpoint of the previous value and the new sample (a halving moving
    /// average, not an arithmetic mean over all samples).
    fn record(&mut self, method: DeliveryResult, latency: Duration) {
        let counter = match method {
            DeliveryResult::Direct => &mut self.direct_count,
            DeliveryResult::Forwarded => &mut self.forwarded_count,
            DeliveryResult::Stored => &mut self.stored_count,
            DeliveryResult::ServerRelay => &mut self.relay_count,
        };
        *counter += 1;

        self.last_delivery = Some(Instant::now());

        let smoothed = match self.avg_latency {
            None => latency,
            Some(previous) => (previous + latency) / 2,
        };
        self.avg_latency = Some(smoothed);
    }

    /// Sum of all four per-method counters.
    pub fn total(&self) -> u64 {
        let routed = self.direct_count + self.forwarded_count;
        routed + self.stored_count + self.relay_count
    }
}
/// Router that picks a delivery path for mesh messages.
///
/// It combines a shared routing table, a transport manager and a
/// store-and-forward queue, and keeps per-destination delivery statistics.
pub struct MeshRouter {
    /// Identity of the local node; used to sign outgoing envelopes and to
    /// recognise envelopes addressed to this node.
    identity: MeshIdentity,
    /// 16-byte address of the local node, derived from the identity's public
    /// key when the router is constructed.
    local_address: [u8; 16],
    /// Shared routing table consulted for every send and forward decision.
    routes: Arc<RwLock<RoutingTable>>,
    /// Shared transport manager used to put packets on the wire.
    transports: Arc<TransportManager>,
    /// Shared queue holding envelopes whose destination has no known route.
    store: Arc<Mutex<MeshStore>>,
    /// Delivery statistics keyed by 16-byte destination address.
    stats: Mutex<HashMap<[u8; 16], DeliveryStats>>,
}
impl MeshRouter {
/// Create a new mesh router.
pub fn new(
identity: MeshIdentity,
routes: Arc<RwLock<RoutingTable>>,
transports: Arc<TransportManager>,
store: Arc<Mutex<MeshStore>>,
) -> Self {
let local_address = compute_address(&identity.public_key());
Self {
identity,
local_address,
routes,
transports,
store,
stats: Mutex::new(HashMap::new()),
}
}
/// Send a payload to a destination identified by its 16-byte mesh address.
///
/// Routing priority:
/// 1. Route found in routing table -> wrap in envelope and send via transport
/// 2. No route -> store for later forwarding
pub async fn send(&self, dest_address: &[u8; 16], payload: &[u8]) -> Result<DeliveryResult> {
let start = Instant::now();
// Look up destination in routing table.
let route_info = {
let table = self
.routes
.read()
.map_err(|e| anyhow::anyhow!("routing table lock poisoned: {e}"))?;
table.lookup(dest_address).map(|entry| {
(
entry.identity_key,
entry.next_hop_addr.clone(),
entry.hops,
)
})
};
if let Some((dest_key, next_hop_addr, hops)) = route_info {
// Build an envelope addressed to the destination.
let envelope =
MeshEnvelope::new(&self.identity, &dest_key, payload.to_vec(), 300, 0);
let wire = envelope.to_wire();
self.transports.send(&next_hop_addr, &wire).await?;
// Classify: if destination is directly reachable (hop count <= 1),
// consider it Direct; otherwise it's Forwarded through intermediaries.
let result = if hops <= 1 {
DeliveryResult::Direct
} else {
DeliveryResult::Forwarded
};
let latency = start.elapsed();
self.record_stats(dest_address, result, latency);
Ok(result)
} else {
// No route — store for later forwarding.
// We need a recipient key for the store. Since we only have the address
// and no key, store with the address zero-padded to 32 bytes as a key
// placeholder. The drain_store_for method matches on this convention.
let mut recipient_key = [0u8; 32];
recipient_key[..16].copy_from_slice(dest_address);
let envelope = MeshEnvelope::new(
&self.identity,
&recipient_key,
payload.to_vec(),
300,
0,
);
let stored = {
let mut store = self
.store
.lock()
.map_err(|e| anyhow::anyhow!("store lock poisoned: {e}"))?;
store.store(envelope)
};
if !stored {
bail!("store rejected envelope (duplicate or at capacity)");
}
let latency = start.elapsed();
let result = DeliveryResult::Stored;
self.record_stats(dest_address, result, latency);
Ok(result)
}
}
/// Convenience: compute the 16-byte address from a 32-byte key, then send.
pub async fn send_to_key(
&self,
dest_key: &[u8; 32],
payload: &[u8],
) -> Result<DeliveryResult> {
let addr = compute_address(dest_key);
self.send(&addr, payload).await
}
/// Process a received envelope and decide what to do with it.
pub fn handle_incoming(&self, envelope: MeshEnvelope) -> Result<IncomingAction> {
// Verify envelope signature.
if !envelope.verify() {
return Ok(IncomingAction::Dropped(
"invalid signature".to_string(),
));
}
// Check if it's for us (recipient_key matches our identity).
let our_key = self.identity.public_key();
if envelope.recipient_key.len() == 32 {
let recipient: [u8; 32] = envelope
.recipient_key
.as_slice()
.try_into()
.map_err(|_| anyhow::anyhow!("invalid recipient key length"))?;
if recipient == our_key {
return Ok(IncomingAction::Deliver(envelope));
}
}
// Broadcast (empty recipient) — always deliver locally.
if envelope.recipient_key.is_empty() {
return Ok(IncomingAction::Deliver(envelope));
}
// Not for us — check if we can forward.
if !envelope.can_forward() {
let reason = if envelope.is_expired() {
"envelope expired"
} else {
"max hops reached"
};
return Ok(IncomingAction::Dropped(reason.to_string()));
}
// Look up the recipient in the routing table.
let dest_address = compute_address(&envelope.recipient_key);
let next_hop = {
let table = self
.routes
.read()
.map_err(|e| anyhow::anyhow!("routing table lock poisoned: {e}"))?;
table
.lookup(&dest_address)
.map(|entry| entry.next_hop_addr.clone())
};
match next_hop {
Some(addr) => {
let forwarded = envelope.forwarded();
Ok(IncomingAction::Forward {
envelope: forwarded,
next_hop: addr,
})
}
None => Ok(IncomingAction::Store(envelope)),
}
}
/// Forward an envelope to its next hop based on the routing table.
///
/// The envelope is sent as-is (callers such as [`handle_incoming`](Self::handle_incoming)
/// are expected to have already incremented the hop count via [`MeshEnvelope::forwarded`]).
pub async fn forward(&self, envelope: MeshEnvelope) -> Result<DeliveryResult> {
let start = Instant::now();
let dest_address = compute_address(&envelope.recipient_key);
let next_hop_addr = {
let table = self
.routes
.read()
.map_err(|e| anyhow::anyhow!("routing table lock poisoned: {e}"))?;
table
.lookup(&dest_address)
.map(|entry| entry.next_hop_addr.clone())
.ok_or_else(|| anyhow::anyhow!("no route for forwarding target"))?
};
let wire = envelope.to_wire();
self.transports.send(&next_hop_addr, &wire).await?;
let latency = start.elapsed();
let result = DeliveryResult::Forwarded;
self.record_stats(&dest_address, result, latency);
Ok(result)
}
/// Drain stored messages for a destination and attempt to forward them.
///
/// Call this when a new route appears (e.g., from an announce) to flush
/// queued messages. Returns the count of successfully forwarded messages.
pub async fn drain_store_for(&self, dest_address: &[u8; 16]) -> Result<usize> {
// Look up the route to get identity key and next-hop.
let (identity_key, next_hop_addr) = {
let table = self
.routes
.read()
.map_err(|e| anyhow::anyhow!("routing table lock poisoned: {e}"))?;
match table.lookup(dest_address) {
Some(entry) => (entry.identity_key, entry.next_hop_addr.clone()),
None => return Ok(0),
}
};
// Fetch stored envelopes keyed by the full identity key.
let envelopes = {
let mut store = self
.store
.lock()
.map_err(|e| anyhow::anyhow!("store lock poisoned: {e}"))?;
let mut result = store.fetch(&identity_key);
// Also try the zero-padded address convention used by send().
let mut padded_key = [0u8; 32];
padded_key[..16].copy_from_slice(dest_address);
result.extend(store.fetch(&padded_key));
result
};
let mut forwarded_count = 0;
for env in envelopes {
if env.can_forward() {
let fwd = env.forwarded();
let wire = fwd.to_wire();
if self.transports.send(&next_hop_addr, &wire).await.is_ok() {
forwarded_count += 1;
}
}
}
Ok(forwarded_count)
}
/// Get delivery statistics for a specific destination.
pub fn stats(&self, address: &[u8; 16]) -> Option<DeliveryStats> {
self.stats
.lock()
.ok()
.and_then(|s| s.get(address).cloned())
}
/// Get delivery statistics for all known destinations.
pub fn all_stats(&self) -> HashMap<[u8; 16], DeliveryStats> {
self.stats
.lock()
.map(|s| s.clone())
.unwrap_or_default()
}
/// This node's 16-byte truncated mesh address.
pub fn local_address(&self) -> &[u8; 16] {
&self.local_address
}
/// Record a delivery in the per-destination stats.
fn record_stats(&self, address: &[u8; 16], method: DeliveryResult, latency: Duration) {
if let Ok(mut stats) = self.stats.lock() {
stats
.entry(*address)
.or_default()
.record(method, latency);
}
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a router for `identity` backed by a freshly constructed routing
    /// table, transport manager and store (capacity argument 100).
    fn router_for(identity: MeshIdentity) -> MeshRouter {
        let routes = Arc::new(RwLock::new(RoutingTable::new(Duration::from_secs(300))));
        let transports = Arc::new(TransportManager::new());
        let store = Arc::new(Mutex::new(MeshStore::new(100)));
        MeshRouter::new(identity, routes, transports, store)
    }

    /// Every `record` call bumps exactly one counter and keeps the timing
    /// fields populated.
    #[test]
    fn delivery_stats_tracking() {
        let mut stats = DeliveryStats::default();
        assert_eq!(stats.total(), 0);

        stats.record(DeliveryResult::Direct, Duration::from_millis(10));
        assert_eq!(stats.direct_count, 1);
        assert_eq!(stats.total(), 1);
        assert!(stats.last_delivery.is_some());
        assert!(stats.avg_latency.is_some());

        stats.record(DeliveryResult::Forwarded, Duration::from_millis(20));
        assert_eq!(stats.forwarded_count, 1);
        assert_eq!(stats.total(), 2);

        stats.record(DeliveryResult::Stored, Duration::from_millis(5));
        assert_eq!(stats.stored_count, 1);
        assert_eq!(stats.total(), 3);

        stats.record(DeliveryResult::ServerRelay, Duration::from_millis(50));
        assert_eq!(stats.relay_count, 1);
        assert_eq!(stats.total(), 4);

        // avg_latency should be present and reasonable.
        let avg = stats.avg_latency.unwrap();
        assert!(avg.as_millis() > 0);
    }

    /// The router's local address is the address computed from the
    /// identity's public key.
    #[test]
    fn local_address_matches_identity() {
        let identity = MeshIdentity::generate();
        let expected = compute_address(&identity.public_key());
        let router = router_for(identity);
        assert_eq!(router.local_address(), &expected);
    }

    /// An envelope addressed to our own key is delivered locally.
    #[test]
    fn incoming_action_deliver_to_self() {
        let identity = MeshIdentity::generate();
        let our_key = identity.public_key();
        let router = router_for(identity);

        let sender = MeshIdentity::generate();
        let envelope =
            MeshEnvelope::new(&sender, &our_key, b"hello self".to_vec(), 3600, 5);

        let action = router.handle_incoming(envelope).expect("handle_incoming");
        match action {
            IncomingAction::Deliver(env) => {
                assert_eq!(env.payload, b"hello self");
            }
            other => panic!("expected Deliver, got {other:?}"),
        }
    }

    /// A broadcast envelope (empty recipient key) is delivered locally.
    #[test]
    fn incoming_action_broadcast_delivers() {
        let router = router_for(MeshIdentity::generate());

        let sender = MeshIdentity::generate();
        let envelope =
            MeshEnvelope::new(&sender, &[], b"broadcast msg".to_vec(), 3600, 5);

        let action = router.handle_incoming(envelope).expect("handle_incoming");
        match action {
            IncomingAction::Deliver(env) => {
                assert_eq!(env.payload, b"broadcast msg");
                assert!(env.recipient_key.is_empty());
            }
            other => panic!("expected Deliver, got {other:?}"),
        }
    }

    /// An expired envelope addressed to someone else is dropped.
    #[test]
    fn incoming_action_dropped_expired() {
        let router = router_for(MeshIdentity::generate());

        // Create an envelope addressed to someone else with TTL=0.
        // is_expired() checks: now - timestamp > ttl_secs.
        // With ttl=0 and timestamp=now, we need to wait >0 seconds for expiry.
        let sender = MeshIdentity::generate();
        let other_key = [0xBB; 32];
        let envelope =
            MeshEnvelope::new(&sender, &other_key, b"expired".to_vec(), 0, 5);

        // Sleep briefly so that now - timestamp > 0 (the TTL).
        std::thread::sleep(Duration::from_millis(1100));

        let action = router.handle_incoming(envelope).expect("handle_incoming");
        match action {
            IncomingAction::Dropped(reason) => {
                assert!(
                    reason.contains("expired"),
                    "expected expired reason, got: {reason}"
                );
            }
            other => panic!("expected Dropped, got {other:?}"),
        }
    }

    /// An envelope whose payload was modified after signing is dropped.
    #[test]
    fn incoming_action_dropped_invalid_sig() {
        let router = router_for(MeshIdentity::generate());

        // Create a valid envelope then tamper with the payload.
        let sender = MeshIdentity::generate();
        let other_key = [0xCC; 32];
        let mut envelope =
            MeshEnvelope::new(&sender, &other_key, b"original".to_vec(), 3600, 5);
        envelope.payload = b"tampered".to_vec();

        let action = router.handle_incoming(envelope).expect("handle_incoming");
        match action {
            IncomingAction::Dropped(reason) => {
                assert!(
                    reason.contains("invalid signature"),
                    "expected invalid signature reason, got: {reason}"
                );
            }
            other => panic!("expected Dropped, got {other:?}"),
        }
    }
}