From 5cc37cc88b2b6281c92c917f54d0f90c505e976e Mon Sep 17 00:00:00 2001 From: Christian Nennemann Date: Wed, 4 Mar 2026 21:09:42 +0100 Subject: [PATCH] feat(p2p): add hybrid routing with direct-first and server relay fallback Adds a routing module to quicproquo-p2p implementing hybrid message delivery: attempts direct P2P via iroh QUIC (with NAT traversal) first, then falls back to server relay if direct delivery fails or times out. Includes per-peer ConnectionStats tracking direct vs relayed counts, latency averages, and direct delivery ratio metrics. --- crates/quicproquo-p2p/src/lib.rs | 1 + crates/quicproquo-p2p/src/routing.rs | 425 +++++++++++++++++++++++++++ 2 files changed, 426 insertions(+) create mode 100644 crates/quicproquo-p2p/src/routing.rs diff --git a/crates/quicproquo-p2p/src/lib.rs b/crates/quicproquo-p2p/src/lib.rs index a12244e..d3d2284 100644 --- a/crates/quicproquo-p2p/src/lib.rs +++ b/crates/quicproquo-p2p/src/lib.rs @@ -15,6 +15,7 @@ pub mod broadcast; pub mod envelope; pub mod identity; +pub mod routing; pub mod store; #[cfg(feature = "traffic-resistance")] pub mod traffic_resistance; diff --git a/crates/quicproquo-p2p/src/routing.rs b/crates/quicproquo-p2p/src/routing.rs new file mode 100644 index 0000000..53353f9 --- /dev/null +++ b/crates/quicproquo-p2p/src/routing.rs @@ -0,0 +1,425 @@ +//! Hybrid routing: direct P2P first, server relay fallback. +//! +//! The [`HybridRouter`] attempts to deliver messages directly to peers via +//! iroh QUIC connections (with automatic NAT traversal). If direct delivery +//! fails or the peer is unreachable, messages are relayed through the +//! central server as a fallback. +//! +//! # Routing strategy +//! +//! ```text +//! send_message(peer, payload) +//! ├─ try direct P2P via iroh (QUIC + relay hole-punching) +//! │ ├─ success → record DirectSuccess metric +//! │ └─ failure → fall through +//! └─ relay through server +//! ├─ success → record RelayFallback metric +//! └─ failure → return error +//! ``` +//! +//! # Connection quality +//! +//! Per-peer [`ConnectionStats`] are tracked, recording direct vs relayed +//! delivery counts and latency measurements. The router exposes these +//! stats for UI display and adaptive routing decisions. + +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, Instant}; + +use iroh::{EndpointAddr, PublicKey}; + +use crate::P2pNode; + +/// Callback trait for relaying messages through the server when direct P2P fails. +/// +/// Implementations typically call the SDK's enqueue RPC to relay through +/// the server's store-and-forward path. +pub trait ServerRelay: Send + Sync { + /// Relay a message to `recipient` through the server. + /// + /// Returns `Ok(())` on successful relay, or an error if the server + /// is unreachable or rejects the message. + fn relay( + &self, + recipient: &[u8], + payload: &[u8], + ) -> std::pin::Pin> + Send + '_>>; +} + +/// How a message was delivered. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum DeliveryMethod { + /// Delivered directly via P2P QUIC connection. + Direct, + /// Relayed through the central server. + Relayed, +} + +/// Per-peer connection quality statistics. +#[derive(Debug, Clone)] +pub struct ConnectionStats { + /// Number of messages delivered directly via P2P. + pub direct_count: u64, + /// Number of messages relayed through the server. + pub relayed_count: u64, + /// Average direct delivery latency (if any direct deliveries occurred). + pub avg_direct_latency: Option, + /// Average relay delivery latency (if any relay deliveries occurred). + pub avg_relay_latency: Option, + /// Last successful direct delivery time. + pub last_direct: Option, + /// Last successful relay delivery time. + pub last_relay: Option, +} + +impl ConnectionStats { + fn new() -> Self { + Self { + direct_count: 0, + relayed_count: 0, + avg_direct_latency: None, + avg_relay_latency: None, + last_direct: None, + last_relay: None, + } + } + + fn record_direct(&mut self, latency: Duration) { + self.direct_count += 1; + self.last_direct = Some(Instant::now()); + self.avg_direct_latency = Some(match self.avg_direct_latency { + Some(prev) => { + let n = self.direct_count as u32; + // Exponential moving average with 1/n weight for first few, then ~1/8. + let alpha = if n < 8 { n } else { 8 }; + (prev * (alpha - 1) + latency) / alpha + } + None => latency, + }); + } + + fn record_relay(&mut self, latency: Duration) { + self.relayed_count += 1; + self.last_relay = Some(Instant::now()); + self.avg_relay_latency = Some(match self.avg_relay_latency { + Some(prev) => { + let n = self.relayed_count as u32; + let alpha = if n < 8 { n } else { 8 }; + (prev * (alpha - 1) + latency) / alpha + } + None => latency, + }); + } + + /// Fraction of messages delivered directly (0.0 to 1.0). + pub fn direct_ratio(&self) -> f64 { + let total = self.direct_count + self.relayed_count; + if total == 0 { + return 0.0; + } + self.direct_count as f64 / total as f64 + } +} + +/// Known peer address information for routing decisions. +#[derive(Debug, Clone)] +pub struct PeerRoute { + /// iroh endpoint address for direct P2P delivery. + pub endpoint_addr: EndpointAddr, + /// Recipient identity key (for server relay addressing). + pub identity_key: Vec, +} + +/// Hybrid message router: tries direct P2P, falls back to server relay. +pub struct HybridRouter { + node: Arc, + relay: Arc, + /// Known peer routes, keyed by iroh PublicKey. + peers: Mutex>, + /// Per-peer connection quality statistics. + stats: Mutex>, + /// Timeout for direct P2P delivery attempts before falling back. + direct_timeout: Duration, +} + +impl HybridRouter { + /// Create a new hybrid router. + /// + /// `direct_timeout` controls how long to wait for a direct P2P delivery + /// before falling back to the server relay. + pub fn new( + node: Arc, + relay: Arc, + direct_timeout: Duration, + ) -> Self { + Self { + node, + relay, + peers: Mutex::new(HashMap::new()), + stats: Mutex::new(HashMap::new()), + direct_timeout, + } + } + + /// Register a peer's routing information. + pub fn add_peer(&self, peer_id: PublicKey, route: PeerRoute) { + if let Ok(mut peers) = self.peers.lock() { + peers.insert(peer_id, route); + } + } + + /// Remove a peer's routing information. + pub fn remove_peer(&self, peer_id: &PublicKey) { + if let Ok(mut peers) = self.peers.lock() { + peers.remove(peer_id); + } + } + + /// Send a message using hybrid routing: direct P2P first, server relay fallback. + /// + /// Returns the delivery method used on success. + pub async fn send( + &self, + peer_id: &PublicKey, + payload: &[u8], + ) -> anyhow::Result { + let route = { + let peers = self + .peers + .lock() + .map_err(|e| anyhow::anyhow!("peers lock: {e}"))?; + peers.get(peer_id).cloned() + }; + + let route = route + .ok_or_else(|| anyhow::anyhow!("no route known for peer {}", peer_id.fmt_short()))?; + + // Try direct P2P first. + let start = Instant::now(); + let direct_result = tokio::time::timeout( + self.direct_timeout, + self.node.send(route.endpoint_addr.clone(), payload), + ) + .await; + + match direct_result { + Ok(Ok(())) => { + let latency = start.elapsed(); + tracing::debug!( + peer = %peer_id.fmt_short(), + latency_ms = latency.as_millis(), + "direct P2P delivery succeeded" + ); + if let Ok(mut stats) = self.stats.lock() { + stats + .entry(*peer_id) + .or_insert_with(ConnectionStats::new) + .record_direct(latency); + } + return Ok(DeliveryMethod::Direct); + } + Ok(Err(e)) => { + tracing::debug!( + peer = %peer_id.fmt_short(), + error = %e, + "direct P2P delivery failed, falling back to relay" + ); + } + Err(_) => { + tracing::debug!( + peer = %peer_id.fmt_short(), + timeout_ms = self.direct_timeout.as_millis(), + "direct P2P delivery timed out, falling back to relay" + ); + } + } + + // Fall back to server relay. + let start = Instant::now(); + self.relay.relay(&route.identity_key, payload).await?; + let latency = start.elapsed(); + + tracing::debug!( + peer = %peer_id.fmt_short(), + latency_ms = latency.as_millis(), + "server relay delivery succeeded" + ); + + if let Ok(mut stats) = self.stats.lock() { + stats + .entry(*peer_id) + .or_insert_with(ConnectionStats::new) + .record_relay(latency); + } + + Ok(DeliveryMethod::Relayed) + } + + /// Get connection statistics for a specific peer. + pub fn peer_stats(&self, peer_id: &PublicKey) -> Option { + self.stats + .lock() + .ok() + .and_then(|s| s.get(peer_id).cloned()) + } + + /// Get connection statistics for all known peers. + pub fn all_stats(&self) -> HashMap { + self.stats + .lock() + .map(|s| s.clone()) + .unwrap_or_default() + } + + /// Get the aggregate direct delivery ratio across all peers. + pub fn overall_direct_ratio(&self) -> f64 { + let stats = match self.stats.lock() { + Ok(s) => s, + Err(_) => return 0.0, + }; + let (direct, relayed) = stats + .values() + .fold((0u64, 0u64), |(d, r), s| (d + s.direct_count, r + s.relayed_count)); + let total = direct + relayed; + if total == 0 { + return 0.0; + } + direct as f64 / total as f64 + } +} + +#[cfg(test)] +mod tests { + use super::*; + + /// A mock server relay that records calls and succeeds. + struct MockRelay { + calls: Mutex, Vec)>>, + } + + impl MockRelay { + fn new() -> Self { + Self { + calls: Mutex::new(Vec::new()), + } + } + + fn call_count(&self) -> usize { + self.calls.lock().map(|c| c.len()).unwrap_or(0) + } + } + + impl ServerRelay for MockRelay { + fn relay( + &self, + recipient: &[u8], + payload: &[u8], + ) -> std::pin::Pin> + Send + '_>> + { + if let Ok(mut calls) = self.calls.lock() { + calls.push((recipient.to_vec(), payload.to_vec())); + } + Box::pin(async { Ok(()) }) + } + } + + #[test] + fn connection_stats_direct_ratio() { + let mut stats = ConnectionStats::new(); + assert_eq!(stats.direct_ratio(), 0.0); + + stats.record_direct(Duration::from_millis(10)); + assert_eq!(stats.direct_ratio(), 1.0); + + stats.record_relay(Duration::from_millis(50)); + assert_eq!(stats.direct_ratio(), 0.5); + + stats.record_direct(Duration::from_millis(15)); + // 2 direct, 1 relayed = 2/3 + assert!((stats.direct_ratio() - 2.0 / 3.0).abs() < 0.001); + } + + #[test] + fn connection_stats_latency_tracking() { + let mut stats = ConnectionStats::new(); + stats.record_direct(Duration::from_millis(10)); + assert!(stats.avg_direct_latency.is_some()); + assert!(stats.last_direct.is_some()); + assert!(stats.avg_relay_latency.is_none()); + + stats.record_relay(Duration::from_millis(100)); + assert!(stats.avg_relay_latency.is_some()); + assert!(stats.last_relay.is_some()); + } + + #[tokio::test] + async fn hybrid_router_falls_back_to_relay() { + // Create a node with relay disabled — direct sends to a bogus address will fail. + let node = P2pNode::start(None).await.expect("start node"); + let node = Arc::new(node); + let relay = Arc::new(MockRelay::new()); + + let router = HybridRouter::new( + Arc::clone(&node), + Arc::clone(&relay) as Arc, + Duration::from_millis(500), + ); + + // Register a peer with a bogus address that will fail direct delivery. + let peer_key = iroh::SecretKey::from_bytes(&rand::random()); + let peer_id = peer_key.public(); + let route = PeerRoute { + endpoint_addr: EndpointAddr::from(peer_id), + identity_key: vec![0xAA; 32], + }; + router.add_peer(peer_id, route); + + // Send — should fall back to relay. + let method = router + .send(&peer_id, b"hello hybrid") + .await + .expect("send should succeed via relay"); + assert_eq!(method, DeliveryMethod::Relayed); + assert_eq!(relay.call_count(), 1); + + // Stats should reflect one relayed delivery. + let stats = router.peer_stats(&peer_id).expect("stats should exist"); + assert_eq!(stats.relayed_count, 1); + assert_eq!(stats.direct_count, 0); + assert_eq!(stats.direct_ratio(), 0.0); + + drop(router); + Arc::try_unwrap(node).ok().expect("sole owner").close().await; + } + + #[test] + fn add_remove_peer() { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("runtime"); + + rt.block_on(async { + let node = Arc::new(P2pNode::start(None).await.expect("start")); + let relay = Arc::new(MockRelay::new()); + let router = HybridRouter::new(node.clone(), relay, Duration::from_secs(1)); + + let sk = iroh::SecretKey::from_bytes(&rand::random()); + let pk = sk.public(); + let route = PeerRoute { + endpoint_addr: EndpointAddr::from(pk), + identity_key: vec![0xBB; 32], + }; + + router.add_peer(pk, route); + assert!(router.peers.lock().unwrap().contains_key(&pk)); + + router.remove_peer(&pk); + assert!(!router.peers.lock().unwrap().contains_key(&pk)); + + drop(router); + Arc::try_unwrap(node).ok().expect("sole owner").close().await; + }); + } +}