feat(p2p): add hybrid routing with direct-first and server relay fallback
Adds a routing module to quicproquo-p2p implementing hybrid message delivery: attempts direct P2P via iroh QUIC (with NAT traversal) first, then falls back to server relay if direct delivery fails or times out. Includes per-peer ConnectionStats tracking direct vs relayed counts, latency averages, and direct delivery ratio metrics.
This commit is contained in:
@@ -15,6 +15,7 @@
|
||||
pub mod broadcast;
|
||||
pub mod envelope;
|
||||
pub mod identity;
|
||||
pub mod routing;
|
||||
pub mod store;
|
||||
#[cfg(feature = "traffic-resistance")]
|
||||
pub mod traffic_resistance;
|
||||
|
||||
425
crates/quicproquo-p2p/src/routing.rs
Normal file
425
crates/quicproquo-p2p/src/routing.rs
Normal file
@@ -0,0 +1,425 @@
|
||||
//! Hybrid routing: direct P2P first, server relay fallback.
|
||||
//!
|
||||
//! The [`HybridRouter`] attempts to deliver messages directly to peers via
|
||||
//! iroh QUIC connections (with automatic NAT traversal). If direct delivery
|
||||
//! fails or the peer is unreachable, messages are relayed through the
|
||||
//! central server as a fallback.
|
||||
//!
|
||||
//! # Routing strategy
|
||||
//!
|
||||
//! ```text
|
||||
//! send_message(peer, payload)
|
||||
//! ├─ try direct P2P via iroh (QUIC + relay hole-punching)
|
||||
//! │ ├─ success → record DirectSuccess metric
|
||||
//! │ └─ failure → fall through
|
||||
//! └─ relay through server
|
||||
//! ├─ success → record RelayFallback metric
|
||||
//! └─ failure → return error
|
||||
//! ```
|
||||
//!
|
||||
//! # Connection quality
|
||||
//!
|
||||
//! Per-peer [`ConnectionStats`] are tracked, recording direct vs relayed
|
||||
//! delivery counts and latency measurements. The router exposes these
|
||||
//! stats for UI display and adaptive routing decisions.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use iroh::{EndpointAddr, PublicKey};
|
||||
|
||||
use crate::P2pNode;
|
||||
|
||||
/// Callback trait for relaying messages through the server when direct P2P fails.
|
||||
///
|
||||
/// Implementations typically call the SDK's enqueue RPC to relay through
|
||||
/// the server's store-and-forward path.
|
||||
pub trait ServerRelay: Send + Sync {
|
||||
/// Relay a message to `recipient` through the server.
|
||||
///
|
||||
/// Returns `Ok(())` on successful relay, or an error if the server
|
||||
/// is unreachable or rejects the message.
|
||||
fn relay(
|
||||
&self,
|
||||
recipient: &[u8],
|
||||
payload: &[u8],
|
||||
) -> std::pin::Pin<Box<dyn std::future::Future<Output = anyhow::Result<()>> + Send + '_>>;
|
||||
}
|
||||
|
||||
/// How a message was delivered.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum DeliveryMethod {
|
||||
/// Delivered directly via P2P QUIC connection.
|
||||
Direct,
|
||||
/// Relayed through the central server.
|
||||
Relayed,
|
||||
}
|
||||
|
||||
/// Per-peer connection quality statistics.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ConnectionStats {
|
||||
/// Number of messages delivered directly via P2P.
|
||||
pub direct_count: u64,
|
||||
/// Number of messages relayed through the server.
|
||||
pub relayed_count: u64,
|
||||
/// Average direct delivery latency (if any direct deliveries occurred).
|
||||
pub avg_direct_latency: Option<Duration>,
|
||||
/// Average relay delivery latency (if any relay deliveries occurred).
|
||||
pub avg_relay_latency: Option<Duration>,
|
||||
/// Last successful direct delivery time.
|
||||
pub last_direct: Option<Instant>,
|
||||
/// Last successful relay delivery time.
|
||||
pub last_relay: Option<Instant>,
|
||||
}
|
||||
|
||||
impl ConnectionStats {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
direct_count: 0,
|
||||
relayed_count: 0,
|
||||
avg_direct_latency: None,
|
||||
avg_relay_latency: None,
|
||||
last_direct: None,
|
||||
last_relay: None,
|
||||
}
|
||||
}
|
||||
|
||||
fn record_direct(&mut self, latency: Duration) {
|
||||
self.direct_count += 1;
|
||||
self.last_direct = Some(Instant::now());
|
||||
self.avg_direct_latency = Some(match self.avg_direct_latency {
|
||||
Some(prev) => {
|
||||
let n = self.direct_count as u32;
|
||||
// Exponential moving average with 1/n weight for first few, then ~1/8.
|
||||
let alpha = if n < 8 { n } else { 8 };
|
||||
(prev * (alpha - 1) + latency) / alpha
|
||||
}
|
||||
None => latency,
|
||||
});
|
||||
}
|
||||
|
||||
fn record_relay(&mut self, latency: Duration) {
|
||||
self.relayed_count += 1;
|
||||
self.last_relay = Some(Instant::now());
|
||||
self.avg_relay_latency = Some(match self.avg_relay_latency {
|
||||
Some(prev) => {
|
||||
let n = self.relayed_count as u32;
|
||||
let alpha = if n < 8 { n } else { 8 };
|
||||
(prev * (alpha - 1) + latency) / alpha
|
||||
}
|
||||
None => latency,
|
||||
});
|
||||
}
|
||||
|
||||
/// Fraction of messages delivered directly (0.0 to 1.0).
|
||||
pub fn direct_ratio(&self) -> f64 {
|
||||
let total = self.direct_count + self.relayed_count;
|
||||
if total == 0 {
|
||||
return 0.0;
|
||||
}
|
||||
self.direct_count as f64 / total as f64
|
||||
}
|
||||
}
|
||||
|
||||
/// Known peer address information for routing decisions.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct PeerRoute {
|
||||
/// iroh endpoint address for direct P2P delivery.
|
||||
pub endpoint_addr: EndpointAddr,
|
||||
/// Recipient identity key (for server relay addressing).
|
||||
pub identity_key: Vec<u8>,
|
||||
}
|
||||
|
||||
/// Hybrid message router: tries direct P2P, falls back to server relay.
|
||||
pub struct HybridRouter {
|
||||
node: Arc<P2pNode>,
|
||||
relay: Arc<dyn ServerRelay>,
|
||||
/// Known peer routes, keyed by iroh PublicKey.
|
||||
peers: Mutex<HashMap<PublicKey, PeerRoute>>,
|
||||
/// Per-peer connection quality statistics.
|
||||
stats: Mutex<HashMap<PublicKey, ConnectionStats>>,
|
||||
/// Timeout for direct P2P delivery attempts before falling back.
|
||||
direct_timeout: Duration,
|
||||
}
|
||||
|
||||
impl HybridRouter {
|
||||
/// Create a new hybrid router.
|
||||
///
|
||||
/// `direct_timeout` controls how long to wait for a direct P2P delivery
|
||||
/// before falling back to the server relay.
|
||||
pub fn new(
|
||||
node: Arc<P2pNode>,
|
||||
relay: Arc<dyn ServerRelay>,
|
||||
direct_timeout: Duration,
|
||||
) -> Self {
|
||||
Self {
|
||||
node,
|
||||
relay,
|
||||
peers: Mutex::new(HashMap::new()),
|
||||
stats: Mutex::new(HashMap::new()),
|
||||
direct_timeout,
|
||||
}
|
||||
}
|
||||
|
||||
/// Register a peer's routing information.
|
||||
pub fn add_peer(&self, peer_id: PublicKey, route: PeerRoute) {
|
||||
if let Ok(mut peers) = self.peers.lock() {
|
||||
peers.insert(peer_id, route);
|
||||
}
|
||||
}
|
||||
|
||||
/// Remove a peer's routing information.
|
||||
pub fn remove_peer(&self, peer_id: &PublicKey) {
|
||||
if let Ok(mut peers) = self.peers.lock() {
|
||||
peers.remove(peer_id);
|
||||
}
|
||||
}
|
||||
|
||||
/// Send a message using hybrid routing: direct P2P first, server relay fallback.
|
||||
///
|
||||
/// Returns the delivery method used on success.
|
||||
pub async fn send(
|
||||
&self,
|
||||
peer_id: &PublicKey,
|
||||
payload: &[u8],
|
||||
) -> anyhow::Result<DeliveryMethod> {
|
||||
let route = {
|
||||
let peers = self
|
||||
.peers
|
||||
.lock()
|
||||
.map_err(|e| anyhow::anyhow!("peers lock: {e}"))?;
|
||||
peers.get(peer_id).cloned()
|
||||
};
|
||||
|
||||
let route = route
|
||||
.ok_or_else(|| anyhow::anyhow!("no route known for peer {}", peer_id.fmt_short()))?;
|
||||
|
||||
// Try direct P2P first.
|
||||
let start = Instant::now();
|
||||
let direct_result = tokio::time::timeout(
|
||||
self.direct_timeout,
|
||||
self.node.send(route.endpoint_addr.clone(), payload),
|
||||
)
|
||||
.await;
|
||||
|
||||
match direct_result {
|
||||
Ok(Ok(())) => {
|
||||
let latency = start.elapsed();
|
||||
tracing::debug!(
|
||||
peer = %peer_id.fmt_short(),
|
||||
latency_ms = latency.as_millis(),
|
||||
"direct P2P delivery succeeded"
|
||||
);
|
||||
if let Ok(mut stats) = self.stats.lock() {
|
||||
stats
|
||||
.entry(*peer_id)
|
||||
.or_insert_with(ConnectionStats::new)
|
||||
.record_direct(latency);
|
||||
}
|
||||
return Ok(DeliveryMethod::Direct);
|
||||
}
|
||||
Ok(Err(e)) => {
|
||||
tracing::debug!(
|
||||
peer = %peer_id.fmt_short(),
|
||||
error = %e,
|
||||
"direct P2P delivery failed, falling back to relay"
|
||||
);
|
||||
}
|
||||
Err(_) => {
|
||||
tracing::debug!(
|
||||
peer = %peer_id.fmt_short(),
|
||||
timeout_ms = self.direct_timeout.as_millis(),
|
||||
"direct P2P delivery timed out, falling back to relay"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Fall back to server relay.
|
||||
let start = Instant::now();
|
||||
self.relay.relay(&route.identity_key, payload).await?;
|
||||
let latency = start.elapsed();
|
||||
|
||||
tracing::debug!(
|
||||
peer = %peer_id.fmt_short(),
|
||||
latency_ms = latency.as_millis(),
|
||||
"server relay delivery succeeded"
|
||||
);
|
||||
|
||||
if let Ok(mut stats) = self.stats.lock() {
|
||||
stats
|
||||
.entry(*peer_id)
|
||||
.or_insert_with(ConnectionStats::new)
|
||||
.record_relay(latency);
|
||||
}
|
||||
|
||||
Ok(DeliveryMethod::Relayed)
|
||||
}
|
||||
|
||||
/// Get connection statistics for a specific peer.
|
||||
pub fn peer_stats(&self, peer_id: &PublicKey) -> Option<ConnectionStats> {
|
||||
self.stats
|
||||
.lock()
|
||||
.ok()
|
||||
.and_then(|s| s.get(peer_id).cloned())
|
||||
}
|
||||
|
||||
/// Get connection statistics for all known peers.
|
||||
pub fn all_stats(&self) -> HashMap<PublicKey, ConnectionStats> {
|
||||
self.stats
|
||||
.lock()
|
||||
.map(|s| s.clone())
|
||||
.unwrap_or_default()
|
||||
}
|
||||
|
||||
/// Get the aggregate direct delivery ratio across all peers.
|
||||
pub fn overall_direct_ratio(&self) -> f64 {
|
||||
let stats = match self.stats.lock() {
|
||||
Ok(s) => s,
|
||||
Err(_) => return 0.0,
|
||||
};
|
||||
let (direct, relayed) = stats
|
||||
.values()
|
||||
.fold((0u64, 0u64), |(d, r), s| (d + s.direct_count, r + s.relayed_count));
|
||||
let total = direct + relayed;
|
||||
if total == 0 {
|
||||
return 0.0;
|
||||
}
|
||||
direct as f64 / total as f64
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
/// A mock server relay that records calls and succeeds.
|
||||
struct MockRelay {
|
||||
calls: Mutex<Vec<(Vec<u8>, Vec<u8>)>>,
|
||||
}
|
||||
|
||||
impl MockRelay {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
calls: Mutex::new(Vec::new()),
|
||||
}
|
||||
}
|
||||
|
||||
fn call_count(&self) -> usize {
|
||||
self.calls.lock().map(|c| c.len()).unwrap_or(0)
|
||||
}
|
||||
}
|
||||
|
||||
impl ServerRelay for MockRelay {
|
||||
fn relay(
|
||||
&self,
|
||||
recipient: &[u8],
|
||||
payload: &[u8],
|
||||
) -> std::pin::Pin<Box<dyn std::future::Future<Output = anyhow::Result<()>> + Send + '_>>
|
||||
{
|
||||
if let Ok(mut calls) = self.calls.lock() {
|
||||
calls.push((recipient.to_vec(), payload.to_vec()));
|
||||
}
|
||||
Box::pin(async { Ok(()) })
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn connection_stats_direct_ratio() {
|
||||
let mut stats = ConnectionStats::new();
|
||||
assert_eq!(stats.direct_ratio(), 0.0);
|
||||
|
||||
stats.record_direct(Duration::from_millis(10));
|
||||
assert_eq!(stats.direct_ratio(), 1.0);
|
||||
|
||||
stats.record_relay(Duration::from_millis(50));
|
||||
assert_eq!(stats.direct_ratio(), 0.5);
|
||||
|
||||
stats.record_direct(Duration::from_millis(15));
|
||||
// 2 direct, 1 relayed = 2/3
|
||||
assert!((stats.direct_ratio() - 2.0 / 3.0).abs() < 0.001);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn connection_stats_latency_tracking() {
|
||||
let mut stats = ConnectionStats::new();
|
||||
stats.record_direct(Duration::from_millis(10));
|
||||
assert!(stats.avg_direct_latency.is_some());
|
||||
assert!(stats.last_direct.is_some());
|
||||
assert!(stats.avg_relay_latency.is_none());
|
||||
|
||||
stats.record_relay(Duration::from_millis(100));
|
||||
assert!(stats.avg_relay_latency.is_some());
|
||||
assert!(stats.last_relay.is_some());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn hybrid_router_falls_back_to_relay() {
|
||||
// Create a node with relay disabled — direct sends to a bogus address will fail.
|
||||
let node = P2pNode::start(None).await.expect("start node");
|
||||
let node = Arc::new(node);
|
||||
let relay = Arc::new(MockRelay::new());
|
||||
|
||||
let router = HybridRouter::new(
|
||||
Arc::clone(&node),
|
||||
Arc::clone(&relay) as Arc<dyn ServerRelay>,
|
||||
Duration::from_millis(500),
|
||||
);
|
||||
|
||||
// Register a peer with a bogus address that will fail direct delivery.
|
||||
let peer_key = iroh::SecretKey::from_bytes(&rand::random());
|
||||
let peer_id = peer_key.public();
|
||||
let route = PeerRoute {
|
||||
endpoint_addr: EndpointAddr::from(peer_id),
|
||||
identity_key: vec![0xAA; 32],
|
||||
};
|
||||
router.add_peer(peer_id, route);
|
||||
|
||||
// Send — should fall back to relay.
|
||||
let method = router
|
||||
.send(&peer_id, b"hello hybrid")
|
||||
.await
|
||||
.expect("send should succeed via relay");
|
||||
assert_eq!(method, DeliveryMethod::Relayed);
|
||||
assert_eq!(relay.call_count(), 1);
|
||||
|
||||
// Stats should reflect one relayed delivery.
|
||||
let stats = router.peer_stats(&peer_id).expect("stats should exist");
|
||||
assert_eq!(stats.relayed_count, 1);
|
||||
assert_eq!(stats.direct_count, 0);
|
||||
assert_eq!(stats.direct_ratio(), 0.0);
|
||||
|
||||
drop(router);
|
||||
Arc::try_unwrap(node).ok().expect("sole owner").close().await;
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn add_remove_peer() {
|
||||
let rt = tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.expect("runtime");
|
||||
|
||||
rt.block_on(async {
|
||||
let node = Arc::new(P2pNode::start(None).await.expect("start"));
|
||||
let relay = Arc::new(MockRelay::new());
|
||||
let router = HybridRouter::new(node.clone(), relay, Duration::from_secs(1));
|
||||
|
||||
let sk = iroh::SecretKey::from_bytes(&rand::random());
|
||||
let pk = sk.public();
|
||||
let route = PeerRoute {
|
||||
endpoint_addr: EndpointAddr::from(pk),
|
||||
identity_key: vec![0xBB; 32],
|
||||
};
|
||||
|
||||
router.add_peer(pk, route);
|
||||
assert!(router.peers.lock().unwrap().contains_key(&pk));
|
||||
|
||||
router.remove_peer(&pk);
|
||||
assert!(!router.peers.lock().unwrap().contains_key(&pk));
|
||||
|
||||
drop(router);
|
||||
Arc::try_unwrap(node).ok().expect("sole owner").close().await;
|
||||
});
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user