feat(p2p): mesh stack, LoRa mock transport, and relay demo
Implement transport abstraction (TCP/iroh), announce and routing table, multi-hop mesh router, truncated-address link layer, and LoRa mock medium with fragmentation plus EU868-style duty-cycle accounting. Add mesh_lora_relay_demo and scripts/mesh-demo.sh. Relax CBOR vs JSON size assertion to match fixed-size cryptographic overhead. Extend .gitignore for nested targets and node_modules. Made-with: Cursor
This commit is contained in:
516
crates/quicprochat-p2p/src/mesh_router.rs
Normal file
516
crates/quicprochat-p2p/src/mesh_router.rs
Normal file
@@ -0,0 +1,516 @@
|
||||
//! Multi-hop mesh router using the distributed routing table.
|
||||
//!
|
||||
//! The [`MeshRouter`] delivers messages using the best available path:
|
||||
//! direct transport -> multi-hop via intermediate nodes -> store-and-forward.
|
||||
//!
|
||||
//! # Routing Algorithm
|
||||
//!
|
||||
//! ```text
|
||||
//! send(destination, payload):
|
||||
//! 1. Look up destination in routing table
|
||||
//! 2. If direct transport available -> send via transport
|
||||
//! 3. If next-hop known -> wrap in MeshEnvelope, send to next-hop
|
||||
//! 4. If no route -> queue in store-and-forward
|
||||
//! ```
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::{Arc, Mutex, RwLock};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use anyhow::{bail, Result};
|
||||
|
||||
use crate::announce::compute_address;
|
||||
use crate::envelope::MeshEnvelope;
|
||||
use crate::identity::MeshIdentity;
|
||||
use crate::routing_table::RoutingTable;
|
||||
use crate::store::MeshStore;
|
||||
use crate::transport::TransportAddr;
|
||||
use crate::transport_manager::TransportManager;
|
||||
|
||||
/// How a message was delivered.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum DeliveryResult {
|
||||
/// Sent directly to destination via a transport.
|
||||
Direct,
|
||||
/// Forwarded to next-hop node for relay.
|
||||
Forwarded,
|
||||
/// Queued in store-and-forward (destination unreachable).
|
||||
Stored,
|
||||
/// Delivered via server relay (legacy fallback).
|
||||
ServerRelay,
|
||||
}
|
||||
|
||||
/// What to do with an incoming envelope.
|
||||
#[derive(Debug)]
|
||||
pub enum IncomingAction {
|
||||
/// Message is for us — deliver to application.
|
||||
Deliver(MeshEnvelope),
|
||||
/// Message is for someone else — forward it.
|
||||
Forward {
|
||||
envelope: MeshEnvelope,
|
||||
next_hop: TransportAddr,
|
||||
},
|
||||
/// Message should be stored for later forwarding.
|
||||
Store(MeshEnvelope),
|
||||
/// Message was dropped (expired, max hops, invalid).
|
||||
Dropped(String),
|
||||
}
|
||||
|
||||
/// Per-destination delivery statistics.
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct DeliveryStats {
|
||||
pub direct_count: u64,
|
||||
pub forwarded_count: u64,
|
||||
pub stored_count: u64,
|
||||
pub relay_count: u64,
|
||||
pub last_delivery: Option<Instant>,
|
||||
pub avg_latency: Option<Duration>,
|
||||
}
|
||||
|
||||
impl DeliveryStats {
|
||||
fn record(&mut self, method: DeliveryResult, latency: Duration) {
|
||||
match method {
|
||||
DeliveryResult::Direct => self.direct_count += 1,
|
||||
DeliveryResult::Forwarded => self.forwarded_count += 1,
|
||||
DeliveryResult::Stored => self.stored_count += 1,
|
||||
DeliveryResult::ServerRelay => self.relay_count += 1,
|
||||
}
|
||||
self.last_delivery = Some(Instant::now());
|
||||
self.avg_latency = Some(match self.avg_latency {
|
||||
Some(prev) => (prev + latency) / 2,
|
||||
None => latency,
|
||||
});
|
||||
}
|
||||
|
||||
/// Total number of deliveries across all methods.
|
||||
pub fn total(&self) -> u64 {
|
||||
self.direct_count + self.forwarded_count + self.stored_count + self.relay_count
|
||||
}
|
||||
}
|
||||
|
||||
/// Multi-hop mesh message router.
|
||||
pub struct MeshRouter {
|
||||
/// This node's mesh identity.
|
||||
identity: MeshIdentity,
|
||||
/// This node's 16-byte truncated address.
|
||||
local_address: [u8; 16],
|
||||
/// Distributed routing table.
|
||||
routes: Arc<RwLock<RoutingTable>>,
|
||||
/// Transport manager for sending packets.
|
||||
transports: Arc<TransportManager>,
|
||||
/// Store-and-forward queue for unreachable destinations.
|
||||
store: Arc<Mutex<MeshStore>>,
|
||||
/// Per-destination delivery stats.
|
||||
stats: Mutex<HashMap<[u8; 16], DeliveryStats>>,
|
||||
}
|
||||
|
||||
impl MeshRouter {
|
||||
/// Create a new mesh router.
|
||||
pub fn new(
|
||||
identity: MeshIdentity,
|
||||
routes: Arc<RwLock<RoutingTable>>,
|
||||
transports: Arc<TransportManager>,
|
||||
store: Arc<Mutex<MeshStore>>,
|
||||
) -> Self {
|
||||
let local_address = compute_address(&identity.public_key());
|
||||
Self {
|
||||
identity,
|
||||
local_address,
|
||||
routes,
|
||||
transports,
|
||||
store,
|
||||
stats: Mutex::new(HashMap::new()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Send a payload to a destination identified by its 16-byte mesh address.
|
||||
///
|
||||
/// Routing priority:
|
||||
/// 1. Route found in routing table -> wrap in envelope and send via transport
|
||||
/// 2. No route -> store for later forwarding
|
||||
pub async fn send(&self, dest_address: &[u8; 16], payload: &[u8]) -> Result<DeliveryResult> {
|
||||
let start = Instant::now();
|
||||
|
||||
// Look up destination in routing table.
|
||||
let route_info = {
|
||||
let table = self
|
||||
.routes
|
||||
.read()
|
||||
.map_err(|e| anyhow::anyhow!("routing table lock poisoned: {e}"))?;
|
||||
table.lookup(dest_address).map(|entry| {
|
||||
(
|
||||
entry.identity_key,
|
||||
entry.next_hop_addr.clone(),
|
||||
entry.hops,
|
||||
)
|
||||
})
|
||||
};
|
||||
|
||||
if let Some((dest_key, next_hop_addr, hops)) = route_info {
|
||||
// Build an envelope addressed to the destination.
|
||||
let envelope =
|
||||
MeshEnvelope::new(&self.identity, &dest_key, payload.to_vec(), 300, 0);
|
||||
let wire = envelope.to_wire();
|
||||
|
||||
self.transports.send(&next_hop_addr, &wire).await?;
|
||||
|
||||
// Classify: if destination is directly reachable (hop count <= 1),
|
||||
// consider it Direct; otherwise it's Forwarded through intermediaries.
|
||||
let result = if hops <= 1 {
|
||||
DeliveryResult::Direct
|
||||
} else {
|
||||
DeliveryResult::Forwarded
|
||||
};
|
||||
|
||||
let latency = start.elapsed();
|
||||
self.record_stats(dest_address, result, latency);
|
||||
Ok(result)
|
||||
} else {
|
||||
// No route — store for later forwarding.
|
||||
// We need a recipient key for the store. Since we only have the address
|
||||
// and no key, store with the address zero-padded to 32 bytes as a key
|
||||
// placeholder. The drain_store_for method matches on this convention.
|
||||
let mut recipient_key = [0u8; 32];
|
||||
recipient_key[..16].copy_from_slice(dest_address);
|
||||
|
||||
let envelope = MeshEnvelope::new(
|
||||
&self.identity,
|
||||
&recipient_key,
|
||||
payload.to_vec(),
|
||||
300,
|
||||
0,
|
||||
);
|
||||
let stored = {
|
||||
let mut store = self
|
||||
.store
|
||||
.lock()
|
||||
.map_err(|e| anyhow::anyhow!("store lock poisoned: {e}"))?;
|
||||
store.store(envelope)
|
||||
};
|
||||
if !stored {
|
||||
bail!("store rejected envelope (duplicate or at capacity)");
|
||||
}
|
||||
|
||||
let latency = start.elapsed();
|
||||
let result = DeliveryResult::Stored;
|
||||
self.record_stats(dest_address, result, latency);
|
||||
Ok(result)
|
||||
}
|
||||
}
|
||||
|
||||
/// Convenience: compute the 16-byte address from a 32-byte key, then send.
|
||||
pub async fn send_to_key(
|
||||
&self,
|
||||
dest_key: &[u8; 32],
|
||||
payload: &[u8],
|
||||
) -> Result<DeliveryResult> {
|
||||
let addr = compute_address(dest_key);
|
||||
self.send(&addr, payload).await
|
||||
}
|
||||
|
||||
/// Process a received envelope and decide what to do with it.
|
||||
pub fn handle_incoming(&self, envelope: MeshEnvelope) -> Result<IncomingAction> {
|
||||
// Verify envelope signature.
|
||||
if !envelope.verify() {
|
||||
return Ok(IncomingAction::Dropped(
|
||||
"invalid signature".to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
// Check if it's for us (recipient_key matches our identity).
|
||||
let our_key = self.identity.public_key();
|
||||
if envelope.recipient_key.len() == 32 {
|
||||
let recipient: [u8; 32] = envelope
|
||||
.recipient_key
|
||||
.as_slice()
|
||||
.try_into()
|
||||
.map_err(|_| anyhow::anyhow!("invalid recipient key length"))?;
|
||||
if recipient == our_key {
|
||||
return Ok(IncomingAction::Deliver(envelope));
|
||||
}
|
||||
}
|
||||
|
||||
// Broadcast (empty recipient) — always deliver locally.
|
||||
if envelope.recipient_key.is_empty() {
|
||||
return Ok(IncomingAction::Deliver(envelope));
|
||||
}
|
||||
|
||||
// Not for us — check if we can forward.
|
||||
if !envelope.can_forward() {
|
||||
let reason = if envelope.is_expired() {
|
||||
"envelope expired"
|
||||
} else {
|
||||
"max hops reached"
|
||||
};
|
||||
return Ok(IncomingAction::Dropped(reason.to_string()));
|
||||
}
|
||||
|
||||
// Look up the recipient in the routing table.
|
||||
let dest_address = compute_address(&envelope.recipient_key);
|
||||
let next_hop = {
|
||||
let table = self
|
||||
.routes
|
||||
.read()
|
||||
.map_err(|e| anyhow::anyhow!("routing table lock poisoned: {e}"))?;
|
||||
table
|
||||
.lookup(&dest_address)
|
||||
.map(|entry| entry.next_hop_addr.clone())
|
||||
};
|
||||
|
||||
match next_hop {
|
||||
Some(addr) => {
|
||||
let forwarded = envelope.forwarded();
|
||||
Ok(IncomingAction::Forward {
|
||||
envelope: forwarded,
|
||||
next_hop: addr,
|
||||
})
|
||||
}
|
||||
None => Ok(IncomingAction::Store(envelope)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Forward an envelope to its next hop based on the routing table.
|
||||
///
|
||||
/// The envelope is sent as-is (callers such as [`handle_incoming`](Self::handle_incoming)
|
||||
/// are expected to have already incremented the hop count via [`MeshEnvelope::forwarded`]).
|
||||
pub async fn forward(&self, envelope: MeshEnvelope) -> Result<DeliveryResult> {
|
||||
let start = Instant::now();
|
||||
let dest_address = compute_address(&envelope.recipient_key);
|
||||
|
||||
let next_hop_addr = {
|
||||
let table = self
|
||||
.routes
|
||||
.read()
|
||||
.map_err(|e| anyhow::anyhow!("routing table lock poisoned: {e}"))?;
|
||||
table
|
||||
.lookup(&dest_address)
|
||||
.map(|entry| entry.next_hop_addr.clone())
|
||||
.ok_or_else(|| anyhow::anyhow!("no route for forwarding target"))?
|
||||
};
|
||||
|
||||
let wire = envelope.to_wire();
|
||||
self.transports.send(&next_hop_addr, &wire).await?;
|
||||
|
||||
let latency = start.elapsed();
|
||||
let result = DeliveryResult::Forwarded;
|
||||
self.record_stats(&dest_address, result, latency);
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
/// Drain stored messages for a destination and attempt to forward them.
|
||||
///
|
||||
/// Call this when a new route appears (e.g., from an announce) to flush
|
||||
/// queued messages. Returns the count of successfully forwarded messages.
|
||||
pub async fn drain_store_for(&self, dest_address: &[u8; 16]) -> Result<usize> {
|
||||
// Look up the route to get identity key and next-hop.
|
||||
let (identity_key, next_hop_addr) = {
|
||||
let table = self
|
||||
.routes
|
||||
.read()
|
||||
.map_err(|e| anyhow::anyhow!("routing table lock poisoned: {e}"))?;
|
||||
match table.lookup(dest_address) {
|
||||
Some(entry) => (entry.identity_key, entry.next_hop_addr.clone()),
|
||||
None => return Ok(0),
|
||||
}
|
||||
};
|
||||
|
||||
// Fetch stored envelopes keyed by the full identity key.
|
||||
let envelopes = {
|
||||
let mut store = self
|
||||
.store
|
||||
.lock()
|
||||
.map_err(|e| anyhow::anyhow!("store lock poisoned: {e}"))?;
|
||||
let mut result = store.fetch(&identity_key);
|
||||
// Also try the zero-padded address convention used by send().
|
||||
let mut padded_key = [0u8; 32];
|
||||
padded_key[..16].copy_from_slice(dest_address);
|
||||
result.extend(store.fetch(&padded_key));
|
||||
result
|
||||
};
|
||||
|
||||
let mut forwarded_count = 0;
|
||||
for env in envelopes {
|
||||
if env.can_forward() {
|
||||
let fwd = env.forwarded();
|
||||
let wire = fwd.to_wire();
|
||||
if self.transports.send(&next_hop_addr, &wire).await.is_ok() {
|
||||
forwarded_count += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(forwarded_count)
|
||||
}
|
||||
|
||||
/// Get delivery statistics for a specific destination.
|
||||
pub fn stats(&self, address: &[u8; 16]) -> Option<DeliveryStats> {
|
||||
self.stats
|
||||
.lock()
|
||||
.ok()
|
||||
.and_then(|s| s.get(address).cloned())
|
||||
}
|
||||
|
||||
/// Get delivery statistics for all known destinations.
|
||||
pub fn all_stats(&self) -> HashMap<[u8; 16], DeliveryStats> {
|
||||
self.stats
|
||||
.lock()
|
||||
.map(|s| s.clone())
|
||||
.unwrap_or_default()
|
||||
}
|
||||
|
||||
/// This node's 16-byte truncated mesh address.
|
||||
pub fn local_address(&self) -> &[u8; 16] {
|
||||
&self.local_address
|
||||
}
|
||||
|
||||
/// Record a delivery in the per-destination stats.
|
||||
fn record_stats(&self, address: &[u8; 16], method: DeliveryResult, latency: Duration) {
|
||||
if let Ok(mut stats) = self.stats.lock() {
|
||||
stats
|
||||
.entry(*address)
|
||||
.or_default()
|
||||
.record(method, latency);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn delivery_stats_tracking() {
|
||||
let mut stats = DeliveryStats::default();
|
||||
assert_eq!(stats.total(), 0);
|
||||
|
||||
stats.record(DeliveryResult::Direct, Duration::from_millis(10));
|
||||
assert_eq!(stats.direct_count, 1);
|
||||
assert_eq!(stats.total(), 1);
|
||||
assert!(stats.last_delivery.is_some());
|
||||
assert!(stats.avg_latency.is_some());
|
||||
|
||||
stats.record(DeliveryResult::Forwarded, Duration::from_millis(20));
|
||||
assert_eq!(stats.forwarded_count, 1);
|
||||
assert_eq!(stats.total(), 2);
|
||||
|
||||
stats.record(DeliveryResult::Stored, Duration::from_millis(5));
|
||||
assert_eq!(stats.stored_count, 1);
|
||||
assert_eq!(stats.total(), 3);
|
||||
|
||||
stats.record(DeliveryResult::ServerRelay, Duration::from_millis(50));
|
||||
assert_eq!(stats.relay_count, 1);
|
||||
assert_eq!(stats.total(), 4);
|
||||
|
||||
// avg_latency should be present and reasonable.
|
||||
let avg = stats.avg_latency.unwrap();
|
||||
assert!(avg.as_millis() > 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn incoming_action_deliver_to_self() {
|
||||
let identity = MeshIdentity::generate();
|
||||
let our_key = identity.public_key();
|
||||
let routes = Arc::new(RwLock::new(RoutingTable::new(Duration::from_secs(300))));
|
||||
let transports = Arc::new(TransportManager::new());
|
||||
let store = Arc::new(Mutex::new(MeshStore::new(100)));
|
||||
|
||||
let router = MeshRouter::new(identity, routes, transports, store);
|
||||
|
||||
// Create an envelope addressed to our key.
|
||||
let sender = MeshIdentity::generate();
|
||||
let envelope =
|
||||
MeshEnvelope::new(&sender, &our_key, b"hello self".to_vec(), 3600, 5);
|
||||
|
||||
let action = router.handle_incoming(envelope).expect("handle_incoming");
|
||||
match action {
|
||||
IncomingAction::Deliver(env) => {
|
||||
assert_eq!(env.payload, b"hello self");
|
||||
}
|
||||
other => panic!("expected Deliver, got {:?}", std::mem::discriminant(&other)),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn incoming_action_broadcast_delivers() {
|
||||
let identity = MeshIdentity::generate();
|
||||
let routes = Arc::new(RwLock::new(RoutingTable::new(Duration::from_secs(300))));
|
||||
let transports = Arc::new(TransportManager::new());
|
||||
let store = Arc::new(Mutex::new(MeshStore::new(100)));
|
||||
|
||||
let router = MeshRouter::new(identity, routes, transports, store);
|
||||
|
||||
// Create a broadcast envelope (empty recipient key).
|
||||
let sender = MeshIdentity::generate();
|
||||
let envelope =
|
||||
MeshEnvelope::new(&sender, &[], b"broadcast msg".to_vec(), 3600, 5);
|
||||
|
||||
let action = router.handle_incoming(envelope).expect("handle_incoming");
|
||||
match action {
|
||||
IncomingAction::Deliver(env) => {
|
||||
assert_eq!(env.payload, b"broadcast msg");
|
||||
assert!(env.recipient_key.is_empty());
|
||||
}
|
||||
other => panic!("expected Deliver, got {:?}", std::mem::discriminant(&other)),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn incoming_action_dropped_expired() {
|
||||
let identity = MeshIdentity::generate();
|
||||
let routes = Arc::new(RwLock::new(RoutingTable::new(Duration::from_secs(300))));
|
||||
let transports = Arc::new(TransportManager::new());
|
||||
let store = Arc::new(Mutex::new(MeshStore::new(100)));
|
||||
|
||||
let router = MeshRouter::new(identity, routes, transports, store);
|
||||
|
||||
// Create an envelope addressed to someone else with TTL=0.
|
||||
// is_expired() checks: now - timestamp > ttl_secs.
|
||||
// With ttl=0 and timestamp=now, we need to wait >0 seconds for expiry.
|
||||
let sender = MeshIdentity::generate();
|
||||
let other_key = [0xBB; 32];
|
||||
let envelope =
|
||||
MeshEnvelope::new(&sender, &other_key, b"expired".to_vec(), 0, 5);
|
||||
|
||||
// Sleep briefly so that now - timestamp > 0 (the TTL).
|
||||
std::thread::sleep(Duration::from_millis(1100));
|
||||
|
||||
let action = router.handle_incoming(envelope).expect("handle_incoming");
|
||||
match action {
|
||||
IncomingAction::Dropped(reason) => {
|
||||
assert!(
|
||||
reason.contains("expired"),
|
||||
"expected expired reason, got: {reason}"
|
||||
);
|
||||
}
|
||||
other => panic!("expected Dropped, got {:?}", std::mem::discriminant(&other)),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn incoming_action_dropped_invalid_sig() {
|
||||
let identity = MeshIdentity::generate();
|
||||
let routes = Arc::new(RwLock::new(RoutingTable::new(Duration::from_secs(300))));
|
||||
let transports = Arc::new(TransportManager::new());
|
||||
let store = Arc::new(Mutex::new(MeshStore::new(100)));
|
||||
|
||||
let router = MeshRouter::new(identity, routes, transports, store);
|
||||
|
||||
// Create a valid envelope then tamper with the payload.
|
||||
let sender = MeshIdentity::generate();
|
||||
let other_key = [0xCC; 32];
|
||||
let mut envelope =
|
||||
MeshEnvelope::new(&sender, &other_key, b"original".to_vec(), 3600, 5);
|
||||
envelope.payload = b"tampered".to_vec();
|
||||
|
||||
let action = router.handle_incoming(envelope).expect("handle_incoming");
|
||||
match action {
|
||||
IncomingAction::Dropped(reason) => {
|
||||
assert!(
|
||||
reason.contains("invalid signature"),
|
||||
"expected invalid signature reason, got: {reason}"
|
||||
);
|
||||
}
|
||||
other => panic!("expected Dropped, got {:?}", std::mem::discriminant(&other)),
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user