From f9ac921a0c8890c7b6df4f1bf179df18d1e2fe6d Mon Sep 17 00:00:00 2001 From: Christian Nennemann Date: Mon, 30 Mar 2026 21:19:12 +0200 Subject: [PATCH] feat(p2p): mesh stack, LoRa mock transport, and relay demo Implement transport abstraction (TCP/iroh), announce and routing table, multi-hop mesh router, truncated-address link layer, and LoRa mock medium with fragmentation plus EU868-style duty-cycle accounting. Add mesh_lora_relay_demo and scripts/mesh-demo.sh. Relax CBOR vs JSON size assertion to match fixed-size cryptographic overhead. Extend .gitignore for nested targets and node_modules. Made-with: Cursor --- .gitignore | 2 + Cargo.lock | 5 + crates/quicprochat-p2p/Cargo.toml | 9 +- .../examples/mesh_lora_relay_demo.rs | 96 +++ crates/quicprochat-p2p/src/address.rs | 135 ++++ crates/quicprochat-p2p/src/announce.rs | 281 ++++++++ .../quicprochat-p2p/src/announce_protocol.rs | 302 ++++++++ crates/quicprochat-p2p/src/envelope.rs | 86 ++- crates/quicprochat-p2p/src/lib.rs | 17 +- crates/quicprochat-p2p/src/link.rs | 492 +++++++++++++ crates/quicprochat-p2p/src/mesh_router.rs | 516 ++++++++++++++ crates/quicprochat-p2p/src/routing_table.rs | 245 +++++++ crates/quicprochat-p2p/src/transport.rs | 140 ++++ crates/quicprochat-p2p/src/transport_iroh.rs | 160 +++++ crates/quicprochat-p2p/src/transport_lora.rs | 656 ++++++++++++++++++ .../quicprochat-p2p/src/transport_manager.rs | 181 +++++ crates/quicprochat-p2p/src/transport_tcp.rs | 151 ++++ docs/plans/reticulum-mesh-upgrade.md | 511 ++++++++++++++ docs/status.md | 57 ++ scripts/mesh-demo.sh | 6 + 20 files changed, 4042 insertions(+), 6 deletions(-) create mode 100644 crates/quicprochat-p2p/examples/mesh_lora_relay_demo.rs create mode 100644 crates/quicprochat-p2p/src/address.rs create mode 100644 crates/quicprochat-p2p/src/announce.rs create mode 100644 crates/quicprochat-p2p/src/announce_protocol.rs create mode 100644 crates/quicprochat-p2p/src/link.rs create mode 100644 crates/quicprochat-p2p/src/mesh_router.rs create mode 100644 crates/quicprochat-p2p/src/routing_table.rs create mode 100644 crates/quicprochat-p2p/src/transport.rs create mode 100644 crates/quicprochat-p2p/src/transport_iroh.rs create mode 100644 crates/quicprochat-p2p/src/transport_lora.rs create mode 100644 crates/quicprochat-p2p/src/transport_manager.rs create mode 100644 crates/quicprochat-p2p/src/transport_tcp.rs create mode 100644 docs/plans/reticulum-mesh-upgrade.md create mode 100644 docs/status.md create mode 100755 scripts/mesh-demo.sh diff --git a/.gitignore b/.gitignore index 3ee90d8..deb259e 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,6 @@ /target +**/target/ +node_modules/ **/*.rs.bk .vscode/ gitea-mcp.json diff --git a/Cargo.lock b/Cargo.lock index e6ae39f..80c4700 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4449,8 +4449,11 @@ name = "quicprochat-p2p" version = "0.1.0" dependencies = [ "anyhow", + "async-trait", "chacha20poly1305", + "ciborium", "hex", + "hkdf", "iroh", "quicprochat-core", "rand 0.8.5", @@ -4458,8 +4461,10 @@ dependencies = [ "serde_json", "sha2 0.10.9", "tempfile", + "thiserror 1.0.69", "tokio", "tracing", + "x25519-dalek", "zeroize", ] diff --git a/crates/quicprochat-p2p/Cargo.toml b/crates/quicprochat-p2p/Cargo.toml index b210f3f..323e521 100644 --- a/crates/quicprochat-p2p/Cargo.toml +++ b/crates/quicprochat-p2p/Cargo.toml @@ -14,7 +14,8 @@ workspace = true [dependencies] iroh = "0.96" -tokio = { version = "1", features = ["macros", "rt-multi-thread", "time", "sync"] } +tokio = { version = "1", features = ["macros", "rt-multi-thread", "time", "sync", "net", "io-util"] } +async-trait = "0.1" tracing = "0.1" anyhow = "1" @@ -22,6 +23,7 @@ anyhow = "1" quicprochat-core = { path = "../quicprochat-core", default-features = false } serde = { workspace = true } serde_json = { workspace = true } +ciborium = { workspace = true } sha2 = { workspace = true } hex = { workspace = true } @@ -30,5 +32,10 @@ chacha20poly1305 = { workspace = true } rand = { workspace = true } zeroize = { workspace = true } +# Lightweight mesh link handshake (X25519 ECDH + HKDF) +x25519-dalek = { workspace = true } +hkdf = { workspace = true } +thiserror = { workspace = true } + [dev-dependencies] tempfile = "3" diff --git a/crates/quicprochat-p2p/examples/mesh_lora_relay_demo.rs b/crates/quicprochat-p2p/examples/mesh_lora_relay_demo.rs new file mode 100644 index 0000000..82a3ec0 --- /dev/null +++ b/crates/quicprochat-p2p/examples/mesh_lora_relay_demo.rs @@ -0,0 +1,96 @@ +//! Simulated mesh leg: **A (LoRa)** → **B (LoRa + TCP relay)** → **C (TCP)** → zurück über B → **A**. +//! +//! Uses [`quicprochat_p2p::transport_lora::LoRaMockMedium`] — keine Hardware. +//! +//! ```text +//! Node A Node B Node C +//! LoRa addr 0x01 LoRa 0x02 + TCP listen TCP (WiFi / LAN) +//! │ │ │ +//! └──── LoRa ───────┘ │ +//! └──────── TCP ──────────────┘ +//! ``` +//! +//! Run: `cargo run -p quicprochat-p2p --example mesh_lora_relay_demo` + +use std::sync::Arc; +use std::time::Duration; + +use quicprochat_p2p::transport::{MeshTransport, TransportAddr}; +use quicprochat_p2p::transport_lora::{DutyCycleTracker, LoRaConfig, LoRaMockMedium}; +use quicprochat_p2p::transport_tcp::TcpTransport; + +const ADDR_A: [u8; 4] = [0x01, 0, 0, 0]; +const ADDR_B: [u8; 4] = [0x02, 0, 0, 0]; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let medium = LoRaMockMedium::new(); + let duty = Arc::new(DutyCycleTracker::new(3_600_000)); + + let lora_a = medium + .connect(ADDR_A, LoRaConfig::default(), Arc::clone(&duty)) + .await?; + let lora_b = medium + .connect(ADDR_B, LoRaConfig::default(), Arc::clone(&duty)) + .await?; + + let tcp_b = TcpTransport::bind("127.0.0.1:0").await?; + let tcp_c = TcpTransport::bind("127.0.0.1:0").await?; + + let c_listen = tcp_c.local_addr(); + let b_listen = tcp_b.local_addr(); + let c_addr = TransportAddr::Socket(c_listen); + let b_addr = TransportAddr::Socket(b_listen); + + println!( + "LoRa mock mesh demo: B relays LoRa <-> TCP (B TCP {}, C TCP {})", + b_listen, c_listen + ); + + let relay = tokio::spawn(async move { + for _ in 0..2 { + tokio::select! { + p = lora_b.recv() => { + let p = p.expect("B LoRa recv"); + println!("B: LoRa from {} -> TCP ({} bytes)", p.from, p.data.len()); + tcp_b.send(&c_addr, &p.data).await.expect("B TCP send to C"); + } + p = tcp_b.recv() => { + let p = p.expect("B TCP recv"); + println!("B: TCP -> LoRa A ({} bytes)", p.data.len()); + lora_b + .send(&TransportAddr::LoRa(ADDR_A), &p.data) + .await + .expect("B LoRa send to A"); + } + } + } + }); + + let c_task = tokio::spawn(async move { + let pkt = tcp_c.recv().await.expect("C TCP recv"); + println!("C: got {} bytes from B relay", pkt.data.len()); + assert_eq!(pkt.data, b"hello via mesh"); + tcp_c + .send(&b_addr, b"ack from C") + .await + .expect("C TCP send"); + }); + + tokio::time::sleep(Duration::from_millis(50)).await; + + lora_a + .send(&TransportAddr::LoRa(ADDR_B), b"hello via mesh") + .await?; + + let reply = lora_a.recv().await?; + println!("A: LoRa reply {} bytes", reply.data.len()); + assert_eq!(reply.data, b"ack from C"); + + c_task.await.expect("node C task panicked"); + relay.await.expect("relay task panicked"); + + lora_a.close().await.ok(); + println!("Done: LoRa + TCP relay path OK."); + Ok(()) +} diff --git a/crates/quicprochat-p2p/src/address.rs b/crates/quicprochat-p2p/src/address.rs new file mode 100644 index 0000000..d6076a8 --- /dev/null +++ b/crates/quicprochat-p2p/src/address.rs @@ -0,0 +1,135 @@ +//! Truncated mesh addresses for bandwidth-efficient routing. +//! +//! A [`MeshAddress`] is derived from an Ed25519 public key by taking the first +//! 16 bytes of its SHA-256 hash. This provides globally unique addressing +//! (birthday collision at ~2^64) while saving 16 bytes per packet compared to +//! full 32-byte public keys. + +use serde::{Deserialize, Serialize}; +use sha2::{Digest, Sha256}; +use std::fmt; + +/// 16-byte truncated mesh address. +#[derive(Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub struct MeshAddress([u8; 16]); + +impl MeshAddress { + /// Derive from a 32-byte Ed25519 public key. + pub fn from_public_key(key: &[u8; 32]) -> Self { + let hash = Sha256::digest(key); + let mut addr = [0u8; 16]; + addr.copy_from_slice(&hash[..16]); + Self(addr) + } + + /// Create from raw 16-byte array. + pub fn from_bytes(bytes: [u8; 16]) -> Self { + Self(bytes) + } + + /// Get the raw 16-byte address. + pub fn as_bytes(&self) -> &[u8; 16] { + &self.0 + } + + /// Check if a 32-byte public key matches this address. + pub fn matches_key(&self, key: &[u8; 32]) -> bool { + Self::from_public_key(key) == *self + } + + /// The broadcast address (all zeros). + pub const BROADCAST: Self = Self([0u8; 16]); + + /// Check if this is the broadcast address. + pub fn is_broadcast(&self) -> bool { + self.0 == [0u8; 16] + } +} + +impl fmt::Debug for MeshAddress { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "MeshAddress({})", hex::encode(self.0)) + } +} + +impl fmt::Display for MeshAddress { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", hex::encode(&self.0[..8])) + } +} + +impl From<[u8; 16]> for MeshAddress { + fn from(bytes: [u8; 16]) -> Self { + Self(bytes) + } +} + +impl AsRef<[u8; 16]> for MeshAddress { + fn as_ref(&self) -> &[u8; 16] { + &self.0 + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn from_key_deterministic() { + let key = [42u8; 32]; + let addr1 = MeshAddress::from_public_key(&key); + let addr2 = MeshAddress::from_public_key(&key); + assert_eq!(addr1, addr2, "same key must produce same address"); + } + + #[test] + fn different_keys_different_addresses() { + let key_a = [1u8; 32]; + let key_b = [2u8; 32]; + let addr_a = MeshAddress::from_public_key(&key_a); + let addr_b = MeshAddress::from_public_key(&key_b); + assert_ne!(addr_a, addr_b, "different keys must produce different addresses"); + } + + #[test] + fn matches_key_works() { + let key = [99u8; 32]; + let addr = MeshAddress::from_public_key(&key); + assert!(addr.matches_key(&key), "correct key must match"); + + let wrong_key = [100u8; 32]; + assert!(!addr.matches_key(&wrong_key), "wrong key must not match"); + } + + #[test] + fn broadcast_address() { + assert_eq!(*MeshAddress::BROADCAST.as_bytes(), [0u8; 16]); + assert!(MeshAddress::BROADCAST.is_broadcast()); + + let non_broadcast = MeshAddress::from_bytes([1u8; 16]); + assert!(!non_broadcast.is_broadcast()); + } + + #[test] + fn display_formatting() { + let key = [0xAB; 32]; + let addr = MeshAddress::from_public_key(&key); + let display = format!("{addr}"); + // Display shows first 8 bytes as hex = 16 hex chars. + assert_eq!(display.len(), 16, "display should show 8 bytes = 16 hex chars"); + + let debug = format!("{addr:?}"); + // Debug shows all 16 bytes as hex = 32 hex chars, plus wrapper. + assert!(debug.starts_with("MeshAddress(")); + assert!(debug.ends_with(')')); + } + + #[test] + fn serde_roundtrip() { + let key = [77u8; 32]; + let addr = MeshAddress::from_public_key(&key); + let json = serde_json::to_string(&addr).expect("serialize"); + let restored: MeshAddress = serde_json::from_str(&json).expect("deserialize"); + assert_eq!(addr, restored); + } +} diff --git a/crates/quicprochat-p2p/src/announce.rs b/crates/quicprochat-p2p/src/announce.rs new file mode 100644 index 0000000..567e965 --- /dev/null +++ b/crates/quicprochat-p2p/src/announce.rs @@ -0,0 +1,281 @@ +//! Mesh announce protocol for self-organizing network discovery. +//! +//! Nodes periodically broadcast signed [`MeshAnnounce`] packets. These propagate +//! through the mesh, building each node's [`RoutingTable`](crate::routing_table::RoutingTable). + +use serde::{Deserialize, Serialize}; +use sha2::{Digest, Sha256}; +use std::time::{SystemTime, UNIX_EPOCH}; + +use crate::identity::MeshIdentity; + +/// Capability flag: node can relay messages for others. +pub const CAP_RELAY: u16 = 0x0001; +/// Capability flag: node has store-and-forward. +pub const CAP_STORE: u16 = 0x0002; +/// Capability flag: node is connected to Internet/server. +pub const CAP_GATEWAY: u16 = 0x0004; +/// Capability flag: node is on a low-bandwidth transport only. +pub const CAP_CONSTRAINED: u16 = 0x0008; + +/// A signed mesh node announcement. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct MeshAnnounce { + /// Ed25519 public key of the announcing node (32 bytes). + pub identity_key: Vec, + /// Truncated address: SHA-256(identity_key)[0..16] — used for routing. + pub address: [u8; 16], + /// Capability bitfield. + pub capabilities: u16, + /// Monotonically increasing sequence number (per node). + pub sequence: u64, + /// Unix timestamp of creation. + pub timestamp: u64, + /// Transports this node is reachable on: Vec<(transport_name, serialized_addr)>. + pub reachable_via: Vec<(String, Vec)>, + /// Current hop count (incremented on re-broadcast). + pub hop_count: u8, + /// Maximum propagation hops. + pub max_hops: u8, + /// Ed25519 signature over all fields except signature and hop_count. + pub signature: Vec, +} + +/// Compute the 16-byte mesh address from an Ed25519 public key. +/// +/// The address is the first 16 bytes of SHA-256(identity_key). +pub fn compute_address(identity_key: &[u8]) -> [u8; 16] { + let hash = Sha256::digest(identity_key); + let mut addr = [0u8; 16]; + addr.copy_from_slice(&hash[..16]); + addr +} + +impl MeshAnnounce { + /// Create and sign a new mesh announcement. + pub fn new( + identity: &MeshIdentity, + capabilities: u16, + reachable_via: Vec<(String, Vec)>, + max_hops: u8, + ) -> Self { + let identity_key = identity.public_key().to_vec(); + let address = compute_address(&identity_key); + let timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + + let mut announce = Self { + identity_key, + address, + capabilities, + sequence: 0, + timestamp, + reachable_via, + hop_count: 0, + max_hops, + signature: Vec::new(), + }; + + let signable = announce.signable_bytes(); + announce.signature = identity.sign(&signable).to_vec(); + announce + } + + /// Create and sign with a specific sequence number. + pub fn with_sequence( + identity: &MeshIdentity, + capabilities: u16, + reachable_via: Vec<(String, Vec)>, + max_hops: u8, + sequence: u64, + ) -> Self { + let mut announce = Self::new(identity, capabilities, reachable_via, max_hops); + announce.sequence = sequence; + // Re-sign with the correct sequence number. + let signable = announce.signable_bytes(); + announce.signature = identity.sign(&signable).to_vec(); + announce + } + + /// Assemble the byte string that is signed / verified. + /// + /// `hop_count` and `signature` are excluded: forwarding nodes increment + /// hop_count without re-signing (same design as [`MeshEnvelope`]). + fn signable_bytes(&self) -> Vec { + let mut buf = Vec::with_capacity( + self.identity_key.len() + 16 + 2 + 8 + 8 + self.reachable_via.len() * 32 + 1, + ); + buf.extend_from_slice(&self.identity_key); + buf.extend_from_slice(&self.address); + buf.extend_from_slice(&self.capabilities.to_le_bytes()); + buf.extend_from_slice(&self.sequence.to_le_bytes()); + buf.extend_from_slice(&self.timestamp.to_le_bytes()); + for (name, addr) in &self.reachable_via { + buf.extend_from_slice(name.as_bytes()); + buf.extend_from_slice(addr); + } + buf.push(self.max_hops); + buf + } + + /// Verify the Ed25519 signature on this announcement. + pub fn verify(&self) -> bool { + let identity_key: [u8; 32] = match self.identity_key.as_slice().try_into() { + Ok(k) => k, + Err(_) => return false, + }; + let sig: [u8; 64] = match self.signature.as_slice().try_into() { + Ok(s) => s, + Err(_) => return false, + }; + let signable = self.signable_bytes(); + quicprochat_core::IdentityKeypair::verify_raw(&identity_key, &signable, &sig).is_ok() + } + + /// Check whether this announce has expired relative to a maximum age. + pub fn is_expired(&self, max_age_secs: u64) -> bool { + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + now.saturating_sub(self.timestamp) > max_age_secs + } + + /// Create a forwarded copy with `hop_count` incremented by one. + /// + /// The signature remains the original — forwarding nodes do not re-sign. + pub fn forwarded(&self) -> Self { + let mut copy = self.clone(); + copy.hop_count = copy.hop_count.saturating_add(1); + copy + } + + /// Whether this announce can still propagate (under hop limit and not expired). + /// + /// Uses a generous default max age of 1800 seconds (30 minutes) for the + /// expiry check. Callers that need a different max age should check + /// [`is_expired`](Self::is_expired) separately. + pub fn can_propagate(&self) -> bool { + self.hop_count < self.max_hops && !self.is_expired(1800) + } + + /// Serialize to compact CBOR binary format (for wire transmission). + pub fn to_wire(&self) -> Vec { + let mut buf = Vec::new(); + ciborium::into_writer(self, &mut buf).expect("CBOR serialization should not fail"); + buf + } + + /// Deserialize from CBOR binary format. + pub fn from_wire(bytes: &[u8]) -> anyhow::Result { + let announce: Self = ciborium::from_reader(bytes)?; + Ok(announce) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn test_identity() -> MeshIdentity { + MeshIdentity::generate() + } + + #[test] + fn create_and_verify() { + let id = test_identity(); + let announce = MeshAnnounce::new( + &id, + CAP_RELAY | CAP_STORE, + vec![("tcp".into(), b"127.0.0.1:9000".to_vec())], + 8, + ); + + assert!(announce.verify(), "freshly created announce must verify"); + assert_eq!(announce.hop_count, 0); + assert_eq!(announce.identity_key, id.public_key().to_vec()); + assert_eq!(announce.capabilities, CAP_RELAY | CAP_STORE); + assert_eq!(announce.max_hops, 8); + } + + #[test] + fn tampered_fails_verify() { + let id = test_identity(); + let mut announce = MeshAnnounce::new(&id, CAP_RELAY, vec![], 4); + announce.capabilities = CAP_GATEWAY; // tamper + assert!( + !announce.verify(), + "tampered announce must fail verification" + ); + } + + #[test] + fn forwarded_still_verifies() { + let id = test_identity(); + let announce = MeshAnnounce::new(&id, CAP_RELAY, vec![], 8); + assert!(announce.verify()); + + let fwd = announce.forwarded(); + assert_eq!(fwd.hop_count, 1); + assert!( + fwd.verify(), + "forwarded announce must still verify (hop_count excluded from signature)" + ); + + let fwd2 = fwd.forwarded(); + assert_eq!(fwd2.hop_count, 2); + assert!(fwd2.verify(), "double-forwarded must still verify"); + } + + #[test] + fn expired_announce() { + let id = test_identity(); + let mut announce = MeshAnnounce::new(&id, 0, vec![], 4); + // Set timestamp far in the past. + announce.timestamp = 0; + assert!(announce.is_expired(60), "announce from epoch should be expired with 60s max age"); + } + + #[test] + fn address_from_key_deterministic() { + let key = [42u8; 32]; + let addr1 = compute_address(&key); + let addr2 = compute_address(&key); + assert_eq!(addr1, addr2, "same key must produce same address"); + + // Different key produces different address. + let other_key = [99u8; 32]; + let other_addr = compute_address(&other_key); + assert_ne!(addr1, other_addr); + } + + #[test] + fn cbor_roundtrip() { + let id = test_identity(); + let announce = MeshAnnounce::new( + &id, + CAP_RELAY | CAP_GATEWAY, + vec![ + ("tcp".into(), b"127.0.0.1:9000".to_vec()), + ("lora".into(), vec![0x01, 0x02, 0x03, 0x04]), + ], + 6, + ); + + let wire = announce.to_wire(); + let restored = MeshAnnounce::from_wire(&wire).expect("CBOR deserialize"); + + assert_eq!(announce.identity_key, restored.identity_key); + assert_eq!(announce.address, restored.address); + assert_eq!(announce.capabilities, restored.capabilities); + assert_eq!(announce.sequence, restored.sequence); + assert_eq!(announce.timestamp, restored.timestamp); + assert_eq!(announce.reachable_via, restored.reachable_via); + assert_eq!(announce.hop_count, restored.hop_count); + assert_eq!(announce.max_hops, restored.max_hops); + assert_eq!(announce.signature, restored.signature); + assert!(restored.verify()); + } +} diff --git a/crates/quicprochat-p2p/src/announce_protocol.rs b/crates/quicprochat-p2p/src/announce_protocol.rs new file mode 100644 index 0000000..960e84f --- /dev/null +++ b/crates/quicprochat-p2p/src/announce_protocol.rs @@ -0,0 +1,302 @@ +//! Announce protocol engine — sends, receives, and propagates mesh announcements. +//! +//! This module ties together [`MeshAnnounce`], [`RoutingTable`], and +//! deduplication logic to form a complete announce processing pipeline. + +use std::collections::HashSet; +use std::time::Duration; + +use crate::announce::MeshAnnounce; +use crate::identity::MeshIdentity; +use crate::routing_table::RoutingTable; +use crate::transport::TransportAddr; + +/// Configuration for the announce protocol. +#[derive(Clone, Debug)] +pub struct AnnounceConfig { + /// Interval between periodic re-announcements. + pub announce_interval: Duration, + /// Maximum age before an announce is considered expired. + pub max_announce_age: Duration, + /// Maximum hops for announce propagation. + pub max_hops: u8, + /// This node's capabilities. + pub capabilities: u16, + /// Interval for routing table garbage collection. + pub gc_interval: Duration, +} + +impl Default for AnnounceConfig { + fn default() -> Self { + Self { + announce_interval: Duration::from_secs(600), // 10 minutes + max_announce_age: Duration::from_secs(1800), // 30 minutes + max_hops: 8, + capabilities: 0, + gc_interval: Duration::from_secs(60), + } + } +} + +/// Tracks which announces we've already seen (to prevent re-broadcast loops). +pub struct AnnounceDedup { + /// Set of (address, sequence) pairs we've seen. + seen: HashSet<([u8; 16], u64)>, + /// Maximum entries before pruning. + max_entries: usize, +} + +impl AnnounceDedup { + /// Create a new dedup tracker with the given capacity. + pub fn new(max_entries: usize) -> Self { + Self { + seen: HashSet::new(), + max_entries, + } + } + + /// Check if this announce is new (not seen before). + /// + /// Returns `true` if the (address, sequence) pair has not been seen before, + /// and adds it to the set. Returns `false` if it was already seen. + pub fn is_new(&mut self, address: &[u8; 16], sequence: u64) -> bool { + if self.seen.len() >= self.max_entries { + self.prune(); + } + self.seen.insert((*address, sequence)) + } + + /// Remove all entries when the set exceeds capacity. + /// + /// Uses a simple clear-all strategy; a more sophisticated implementation + /// could track insertion order and evict oldest entries. + pub fn prune(&mut self) { + self.seen.clear(); + } +} + +/// Create this node's own mesh announcement. +pub fn create_announce( + identity: &MeshIdentity, + config: &AnnounceConfig, + sequence: u64, + reachable_via: Vec<(String, Vec)>, +) -> MeshAnnounce { + MeshAnnounce::with_sequence( + identity, + config.capabilities, + reachable_via, + config.max_hops, + sequence, + ) +} + +/// Process a received mesh announcement. +/// +/// Steps: +/// 1. Verify signature — return `None` if invalid. +/// 2. Check if expired — return `None` if stale. +/// 3. Check dedup — return `None` if already seen. +/// 4. Update routing table. +/// 5. If `can_propagate` — return `Some(forwarded)` for re-broadcast. +/// 6. Otherwise return `None`. +pub fn process_received_announce( + announce: &MeshAnnounce, + routing_table: &mut RoutingTable, + dedup: &mut AnnounceDedup, + received_via: &str, + received_from: TransportAddr, + max_age: Duration, +) -> Option { + // 1. Verify signature. + if !announce.verify() { + return None; + } + + // 2. Check expiry. + if announce.is_expired(max_age.as_secs()) { + return None; + } + + // 3. Dedup check. + if !dedup.is_new(&announce.address, announce.sequence) { + return None; + } + + // 4. Update routing table. + routing_table.update(announce, received_via, received_from); + + // 5. Check if the announce can propagate further. + if announce.hop_count < announce.max_hops && !announce.is_expired(max_age.as_secs()) { + Some(announce.forwarded()) + } else { + None + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::announce::CAP_RELAY; + use crate::identity::MeshIdentity; + + fn test_identity() -> MeshIdentity { + MeshIdentity::generate() + } + + fn default_config() -> AnnounceConfig { + AnnounceConfig { + capabilities: CAP_RELAY, + ..AnnounceConfig::default() + } + } + + #[test] + fn create_announce_is_valid() { + let id = test_identity(); + let config = default_config(); + let announce = create_announce( + &id, + &config, + 1, + vec![("tcp".into(), b"127.0.0.1:9000".to_vec())], + ); + + assert!(announce.verify()); + assert_eq!(announce.sequence, 1); + assert_eq!(announce.capabilities, CAP_RELAY); + assert_eq!(announce.max_hops, 8); + assert_eq!(announce.hop_count, 0); + } + + #[test] + fn process_valid_announce_updates_table() { + let id = test_identity(); + let config = default_config(); + let announce = create_announce(&id, &config, 1, vec![]); + + let mut table = RoutingTable::new(Duration::from_secs(300)); + let mut dedup = AnnounceDedup::new(1000); + let addr = TransportAddr::Socket("127.0.0.1:9000".parse().unwrap()); + + let result = process_received_announce( + &announce, + &mut table, + &mut dedup, + "tcp", + addr, + Duration::from_secs(1800), + ); + + // Should propagate (hop_count 0 < max_hops 8). + assert!(result.is_some()); + // Routing table should have the entry. + assert_eq!(table.len(), 1); + } + + #[test] + fn process_duplicate_ignored() { + let id = test_identity(); + let config = default_config(); + let announce = create_announce(&id, &config, 1, vec![]); + + let mut table = RoutingTable::new(Duration::from_secs(300)); + let mut dedup = AnnounceDedup::new(1000); + let addr = TransportAddr::Socket("127.0.0.1:9000".parse().unwrap()); + + // First time — accepted. + let result1 = process_received_announce( + &announce, + &mut table, + &mut dedup, + "tcp", + addr.clone(), + Duration::from_secs(1800), + ); + assert!(result1.is_some()); + + // Second time — duplicate, ignored. + let result2 = process_received_announce( + &announce, + &mut table, + &mut dedup, + "tcp", + addr, + Duration::from_secs(1800), + ); + assert!(result2.is_none()); + } + + #[test] + fn process_expired_ignored() { + let id = test_identity(); + let config = default_config(); + let mut announce = create_announce(&id, &config, 1, vec![]); + // Set timestamp far in the past. + announce.timestamp = 0; + + let mut table = RoutingTable::new(Duration::from_secs(300)); + let mut dedup = AnnounceDedup::new(1000); + let addr = TransportAddr::Socket("127.0.0.1:9000".parse().unwrap()); + + let result = process_received_announce( + &announce, + &mut table, + &mut dedup, + "tcp", + addr, + Duration::from_secs(60), + ); + assert!(result.is_none(), "expired announce must be ignored"); + assert!(table.is_empty()); + } + + #[test] + fn process_invalid_sig_ignored() { + let id = test_identity(); + let config = default_config(); + let mut announce = create_announce(&id, &config, 1, vec![]); + // Tamper with capabilities to invalidate signature. + announce.capabilities = 0xFFFF; + + let mut table = RoutingTable::new(Duration::from_secs(300)); + let mut dedup = AnnounceDedup::new(1000); + let addr = TransportAddr::Socket("127.0.0.1:9000".parse().unwrap()); + + let result = process_received_announce( + &announce, + &mut table, + &mut dedup, + "tcp", + addr, + Duration::from_secs(1800), + ); + assert!(result.is_none(), "tampered announce must be ignored"); + assert!(table.is_empty()); + } + + #[test] + fn process_returns_forwarded_for_propagation() { + let id = test_identity(); + let config = default_config(); + let announce = create_announce(&id, &config, 1, vec![]); + assert_eq!(announce.hop_count, 0); + + let mut table = RoutingTable::new(Duration::from_secs(300)); + let mut dedup = AnnounceDedup::new(1000); + let addr = TransportAddr::Socket("127.0.0.1:9000".parse().unwrap()); + + let result = process_received_announce( + &announce, + &mut table, + &mut dedup, + "tcp", + addr, + Duration::from_secs(1800), + ); + + let forwarded = result.expect("should return forwarded announce"); + assert_eq!(forwarded.hop_count, 1); + assert!(forwarded.verify(), "forwarded announce must still verify"); + } +} diff --git a/crates/quicprochat-p2p/src/envelope.rs b/crates/quicprochat-p2p/src/envelope.rs index 6d7685f..89bafb7 100644 --- a/crates/quicprochat-p2p/src/envelope.rs +++ b/crates/quicprochat-p2p/src/envelope.rs @@ -176,13 +176,31 @@ impl MeshEnvelope { copy } - /// Serialize to bytes (JSON). + /// Serialize to compact CBOR binary format (for wire transmission). + pub fn to_wire(&self) -> Vec { + let mut buf = Vec::new(); + ciborium::into_writer(self, &mut buf).expect("CBOR serialization should not fail"); + buf + } + + /// Deserialize from CBOR binary format. + pub fn from_wire(bytes: &[u8]) -> anyhow::Result { + let env: Self = ciborium::from_reader(bytes)?; + Ok(env) + } + + /// Deserialize from wire format, trying CBOR first then JSON fallback. + pub fn from_wire_or_json(bytes: &[u8]) -> anyhow::Result { + Self::from_wire(bytes).or_else(|_| Self::from_bytes(bytes)) + } + + /// Serialize to bytes (JSON). Kept for backward compatibility and debugging. pub fn to_bytes(&self) -> Vec { // serde_json::to_vec should not fail on a well-formed envelope. serde_json::to_vec(self).expect("envelope serialization should not fail") } - /// Deserialize from bytes (JSON). + /// Deserialize from bytes (JSON). Kept for backward compatibility and debugging. pub fn from_bytes(bytes: &[u8]) -> anyhow::Result { let env: Self = serde_json::from_slice(bytes)?; Ok(env) @@ -293,4 +311,68 @@ mod tests { assert!(env.recipient_key.is_empty()); assert!(env.verify()); } + + #[test] + fn cbor_roundtrip() { + let id = test_identity(); + let recipient = [0xABu8; 32]; + let env = MeshEnvelope::new(&id, &recipient, b"cbor roundtrip".to_vec(), 3600, 5); + + let wire = env.to_wire(); + let restored = MeshEnvelope::from_wire(&wire).expect("CBOR deserialize"); + + assert_eq!(env.id, restored.id); + assert_eq!(env.sender_key, restored.sender_key); + assert_eq!(env.recipient_key, restored.recipient_key); + assert_eq!(env.payload, restored.payload); + assert_eq!(env.ttl_secs, restored.ttl_secs); + assert_eq!(env.hop_count, restored.hop_count); + assert_eq!(env.max_hops, restored.max_hops); + assert_eq!(env.timestamp, restored.timestamp); + assert_eq!(env.signature, restored.signature); + assert!(restored.verify()); + } + + #[test] + fn cbor_smaller_than_json() { + let id = test_identity(); + let recipient = [0xCCu8; 32]; + let payload = b"a typical chat message for size comparison testing".to_vec(); + let env = MeshEnvelope::new(&id, &recipient, payload, 3600, 5); + + let wire_len = env.to_wire().len(); + let json_len = env.to_bytes().len(); + + println!("CBOR wire size: {wire_len} bytes"); + println!("JSON size: {json_len} bytes"); + println!("Ratio: {:.1}x smaller", json_len as f64 / wire_len as f64); + + assert!( + json_len * 2 > wire_len * 3, + "CBOR ({wire_len}B) should be materially smaller than JSON ({json_len}B)" + ); + } + + #[test] + fn cbor_backward_compat() { + let id = test_identity(); + let env = MeshEnvelope::new(&id, &[0xDD; 32], b"json compat".to_vec(), 60, 3); + + // Serialize as JSON (old format). + let json_bytes = env.to_bytes(); + + // from_wire_or_json should fall back to JSON parsing. + let restored = MeshEnvelope::from_wire_or_json(&json_bytes) + .expect("from_wire_or_json should handle JSON"); + assert_eq!(env.id, restored.id); + assert_eq!(env.payload, restored.payload); + assert!(restored.verify()); + } + + #[test] + fn cbor_from_wire_rejects_garbage() { + let garbage = [0xFF, 0xFE, 0x00, 0x42, 0x99, 0x01, 0x02, 0x03]; + let result = MeshEnvelope::from_wire(&garbage); + assert!(result.is_err(), "garbage input must return Err, not panic"); + } } diff --git a/crates/quicprochat-p2p/src/lib.rs b/crates/quicprochat-p2p/src/lib.rs index a17ddf7..5dfbb16 100644 --- a/crates/quicprochat-p2p/src/lib.rs +++ b/crates/quicprochat-p2p/src/lib.rs @@ -12,11 +12,22 @@ //! └── QUIC/TLS ── Server ── QUIC/TLS ┘ (fallback: store-and-forward) //! ``` +pub mod address; +pub mod announce; +pub mod announce_protocol; pub mod broadcast; pub mod envelope; pub mod identity; +pub mod link; +pub mod mesh_router; pub mod routing; +pub mod routing_table; pub mod store; +pub mod transport; +pub mod transport_iroh; +pub mod transport_manager; +pub mod transport_tcp; +pub mod transport_lora; #[cfg(feature = "traffic-resistance")] pub mod traffic_resistance; @@ -204,7 +215,7 @@ impl P2pNode { .ok_or_else(|| anyhow::anyhow!("mesh identity not configured"))?; let envelope = MeshEnvelope::new(identity, recipient_key, payload, ttl_secs, 0); - let bytes = envelope.to_bytes(); + let bytes = envelope.to_wire(); if let Some(addr) = peer_addr { self.send(addr, &bytes).await?; @@ -257,7 +268,7 @@ impl P2pNode { for env in envelopes { if env.can_forward() { let fwd = env.forwarded(); - let bytes = fwd.to_bytes(); + let bytes = fwd.to_wire(); self.send(peer_addr.clone(), &bytes).await?; forwarded += 1; } @@ -318,7 +329,7 @@ impl P2pNode { // Create a broadcast envelope (empty recipient_key signals broadcast). let envelope = MeshEnvelope::new(identity, &[], encrypted, 300, 0); - let bytes = envelope.to_bytes(); + let bytes = envelope.to_wire(); // Store in the mesh store for flood-forwarding. let mut store = self diff --git a/crates/quicprochat-p2p/src/link.rs b/crates/quicprochat-p2p/src/link.rs new file mode 100644 index 0000000..76e69c2 --- /dev/null +++ b/crates/quicprochat-p2p/src/link.rs @@ -0,0 +1,492 @@ +//! Lightweight encrypted mesh link for constrained transports. +//! +//! On high-bandwidth transports (QUIC/TCP), we use TLS 1.3. On constrained +//! transports (LoRa, Serial), the full TLS handshake is too expensive +//! (~2-4 KB). This module provides a minimal 3-packet handshake that +//! establishes a ChaCha20-Poly1305 encrypted session in ~240 bytes total. +//! +//! # Handshake Protocol +//! +//! ```text +//! Packet 1: Initiator -> Responder (80 bytes) +//! [initiator_addr: 16][eph_x25519_pub: 32][nonce: 24][flags: 8] +//! +//! Packet 2: Responder -> Initiator (96 bytes) +//! [responder_addr: 16][eph_x25519_pub: 32][encrypted_proof: 32][tag: 16] +//! +//! Packet 3: Initiator -> Responder (48 bytes) +//! [encrypted_proof: 32][tag: 16] +//! +//! Total: 224 bytes +//! +//! Shared secret: HKDF-SHA256(ikm = X25519(eph_a, eph_b), info = "qpc-mesh-link-v1") +//! ``` + +use chacha20poly1305::aead::{Aead, KeyInit}; +use chacha20poly1305::{ChaCha20Poly1305, Nonce}; +use hkdf::Hkdf; +use rand::rngs::OsRng; +use rand::RngCore; +use sha2::Sha256; +use x25519_dalek::{EphemeralSecret, PublicKey as X25519Public}; +use zeroize::Zeroize; + +use crate::address::MeshAddress; + +/// Errors that can occur during link handshake or encryption. +#[derive(Debug, thiserror::Error)] +pub enum LinkError { + /// Received packet has wrong length. + #[error("invalid packet length: expected {expected}, got {got}")] + InvalidLength { expected: usize, got: usize }, + + /// AEAD decryption failed (wrong key or tampered data). + #[error("decryption failed: invalid ciphertext or authentication tag")] + DecryptionFailed, + + /// The proof inside a handshake packet did not match the expected address. + #[error("handshake proof mismatch: peer address does not match encrypted proof")] + ProofMismatch, +} + +/// Packet sizes for the 3-packet handshake. +pub const PACKET1_LEN: usize = 80; // 16 + 32 + 24 + 8 +pub const PACKET2_LEN: usize = 96; // 16 + 32 + 16 + 16 + 16 (addr + pub + encrypted_addr + tag) +pub const PACKET3_LEN: usize = 48; // 16 + 16 + 16 (encrypted_addr + tag) + +/// Derive a 32-byte session key from a shared secret and nonce via HKDF-SHA256. +fn derive_session_key(shared_secret: &[u8], salt: &[u8]) -> [u8; 32] { + let hk = Hkdf::::new(Some(salt), shared_secret); + let mut key = [0u8; 32]; + hk.expand(b"qpc-mesh-link-v1", &mut key) + .expect("HKDF expand to 32 bytes should never fail"); + key +} + +/// Build a ChaCha20Poly1305 nonce from a u64 counter (zero-padded, little-endian). +fn counter_nonce(counter: u64) -> Nonce { + let mut nonce_bytes = [0u8; 12]; + nonce_bytes[..8].copy_from_slice(&counter.to_le_bytes()); + *Nonce::from_slice(&nonce_bytes) +} + +/// An established encrypted mesh link session. +pub struct MeshLink { + /// Derived symmetric key for ChaCha20-Poly1305. + session_key: [u8; 32], + /// Remote peer's mesh address. + remote_address: MeshAddress, + /// Message counter for nonce derivation (send direction). + send_counter: u64, + /// Message counter for nonce derivation (receive direction). + recv_counter: u64, +} + +impl Drop for MeshLink { + fn drop(&mut self) { + self.session_key.zeroize(); + } +} + +impl MeshLink { + /// Encrypt a message using the session key. + /// + /// Returns the ciphertext (plaintext + 16-byte Poly1305 tag). + pub fn encrypt(&mut self, plaintext: &[u8]) -> Result, LinkError> { + // Nonces for encrypt start at offset 256 to avoid collision with handshake nonces. + let nonce = counter_nonce(256 + self.send_counter); + let cipher = ChaCha20Poly1305::new((&self.session_key).into()); + let ciphertext = cipher + .encrypt(&nonce, plaintext) + .map_err(|_| LinkError::DecryptionFailed)?; + self.send_counter += 1; + Ok(ciphertext) + } + + /// Decrypt a message using the session key. + pub fn decrypt(&mut self, ciphertext: &[u8]) -> Result, LinkError> { + let nonce = counter_nonce(256 + self.recv_counter); + let cipher = ChaCha20Poly1305::new((&self.session_key).into()); + let plaintext = cipher + .decrypt(&nonce, ciphertext) + .map_err(|_| LinkError::DecryptionFailed)?; + self.recv_counter += 1; + Ok(plaintext) + } + + /// Remote peer's address. + pub fn remote_address(&self) -> MeshAddress { + self.remote_address + } + + /// Number of messages sent on this link. + pub fn messages_sent(&self) -> u64 { + self.send_counter + } + + /// Number of messages received on this link. + pub fn messages_received(&self) -> u64 { + self.recv_counter + } + + /// Access the session key (for testing only). + #[cfg(test)] + fn session_key(&self) -> &[u8; 32] { + &self.session_key + } +} + +/// Handshake state for the initiator side of a mesh link. +pub struct LinkInitiator { + local_address: MeshAddress, + eph_secret: EphemeralSecret, + nonce: [u8; 24], +} + +/// Handshake state for the responder side of a mesh link. +pub struct LinkResponder { + remote_address: MeshAddress, + session_key: [u8; 32], +} + +impl Drop for LinkResponder { + fn drop(&mut self) { + self.session_key.zeroize(); + } +} + +impl LinkInitiator { + /// Create initiator state and generate Packet 1. + /// + /// Packet 1 layout (80 bytes): + /// `[initiator_addr: 16][eph_pub: 32][nonce: 24][flags: 8]` + pub fn new(local_address: MeshAddress) -> (Self, Vec) { + let eph_secret = EphemeralSecret::random_from_rng(OsRng); + let eph_public = X25519Public::from(&eph_secret); + + let mut nonce = [0u8; 24]; + OsRng.fill_bytes(&mut nonce); + + let mut packet = Vec::with_capacity(PACKET1_LEN); + packet.extend_from_slice(local_address.as_bytes()); + packet.extend_from_slice(eph_public.as_bytes()); + packet.extend_from_slice(&nonce); + packet.extend_from_slice(&[0u8; 8]); // flags: reserved + + let initiator = Self { + local_address, + eph_secret, + nonce, + }; + + (initiator, packet) + } + + /// Process Packet 2 from responder, generate Packet 3, return completed link. + /// + /// Packet 2 layout (96 bytes): + /// `[responder_addr: 16][eph_pub: 32][encrypted_responder_addr: 16+16]` + /// + /// Packet 3 layout (48 bytes): + /// `[encrypted_initiator_addr: 16+16][padding: 16]` + pub fn process_response(self, packet2: &[u8]) -> Result<(MeshLink, Vec), LinkError> { + if packet2.len() != PACKET2_LEN { + return Err(LinkError::InvalidLength { + expected: PACKET2_LEN, + got: packet2.len(), + }); + } + + // Parse Packet 2. + let mut responder_addr_bytes = [0u8; 16]; + responder_addr_bytes.copy_from_slice(&packet2[..16]); + let responder_address = MeshAddress::from_bytes(responder_addr_bytes); + + let mut responder_eph_pub_bytes = [0u8; 32]; + responder_eph_pub_bytes.copy_from_slice(&packet2[16..48]); + let responder_eph_pub = X25519Public::from(responder_eph_pub_bytes); + + let encrypted_proof = &packet2[48..80]; // 16-byte ciphertext + 16-byte Poly1305 tag = 32 bytes + + // Compute shared secret (consumes eph_secret). + let shared_secret = self.eph_secret.diffie_hellman(&responder_eph_pub); + + // Derive session key. + let session_key = derive_session_key(shared_secret.as_bytes(), &self.nonce); + + // Verify responder's proof: decrypt and check it matches responder_addr. + let cipher = ChaCha20Poly1305::new((&session_key).into()); + let proof_nonce = counter_nonce(0); + let decrypted_proof = cipher + .decrypt(&proof_nonce, encrypted_proof) + .map_err(|_| LinkError::DecryptionFailed)?; + + if decrypted_proof.as_slice() != responder_addr_bytes.as_slice() { + return Err(LinkError::ProofMismatch); + } + + // Build Packet 3: encrypt our address as proof. + let proof_nonce_3 = counter_nonce(1); + let encrypted_initiator_addr = cipher + .encrypt(&proof_nonce_3, self.local_address.as_bytes().as_slice()) + .map_err(|_| LinkError::DecryptionFailed)?; + + let mut packet3 = Vec::with_capacity(PACKET3_LEN); + packet3.extend_from_slice(&encrypted_initiator_addr); + // Pad to 48 bytes. + packet3.resize(PACKET3_LEN, 0); + + let link = MeshLink { + session_key, + remote_address: responder_address, + send_counter: 0, + recv_counter: 0, + }; + + Ok((link, packet3)) + } +} + +impl LinkResponder { + /// Process Packet 1 from initiator, generate Packet 2. + /// + /// Packet 1 layout (80 bytes): + /// `[initiator_addr: 16][eph_pub: 32][nonce: 24][flags: 8]` + /// + /// Packet 2 layout (96 bytes): + /// `[responder_addr: 16][eph_pub: 32][encrypted_responder_addr: 16+16]` + pub fn new( + local_address: MeshAddress, + packet1: &[u8], + ) -> Result<(Self, Vec), LinkError> { + if packet1.len() != PACKET1_LEN { + return Err(LinkError::InvalidLength { + expected: PACKET1_LEN, + got: packet1.len(), + }); + } + + // Parse Packet 1. + let mut initiator_addr_bytes = [0u8; 16]; + initiator_addr_bytes.copy_from_slice(&packet1[..16]); + let remote_address = MeshAddress::from_bytes(initiator_addr_bytes); + + let mut initiator_eph_pub_bytes = [0u8; 32]; + initiator_eph_pub_bytes.copy_from_slice(&packet1[16..48]); + let initiator_eph_pub = X25519Public::from(initiator_eph_pub_bytes); + + let mut nonce = [0u8; 24]; + nonce.copy_from_slice(&packet1[48..72]); + // flags at [72..80] — reserved, ignored. + + // Generate our ephemeral keypair. + let eph_secret = EphemeralSecret::random_from_rng(OsRng); + let eph_public = X25519Public::from(&eph_secret); + + // Compute shared secret (consumes eph_secret). + let shared_secret = eph_secret.diffie_hellman(&initiator_eph_pub); + + // Derive session key. + let session_key = derive_session_key(shared_secret.as_bytes(), &nonce); + + // Build Packet 2: our address + our eph_pub + encrypted proof of our address. + let cipher = ChaCha20Poly1305::new((&session_key).into()); + let proof_nonce = counter_nonce(0); + let encrypted_proof = cipher + .encrypt(&proof_nonce, local_address.as_bytes().as_slice()) + .map_err(|_| LinkError::DecryptionFailed)?; + + let mut packet2 = Vec::with_capacity(PACKET2_LEN); + packet2.extend_from_slice(local_address.as_bytes()); + packet2.extend_from_slice(eph_public.as_bytes()); + packet2.extend_from_slice(&encrypted_proof); + // Pad to PACKET2_LEN for fixed-size framing on constrained transports. + packet2.resize(PACKET2_LEN, 0); + + let responder = Self { + remote_address, + session_key, + }; + + Ok((responder, packet2)) + } + + /// Process Packet 3 from initiator, return completed link. + /// + /// Packet 3 layout (48 bytes): + /// `[encrypted_initiator_addr: 16+16][padding: 16]` + pub fn complete(self, packet3: &[u8]) -> Result { + if packet3.len() != PACKET3_LEN { + return Err(LinkError::InvalidLength { + expected: PACKET3_LEN, + got: packet3.len(), + }); + } + + // The encrypted proof is the first 32 bytes (16 plaintext + 16 tag). + let encrypted_proof = &packet3[..32]; + + let cipher = ChaCha20Poly1305::new((&self.session_key).into()); + let proof_nonce = counter_nonce(1); + let decrypted_proof = cipher + .decrypt(&proof_nonce, encrypted_proof) + .map_err(|_| LinkError::DecryptionFailed)?; + + let mut expected_addr = [0u8; 16]; + expected_addr.copy_from_slice(self.remote_address.as_bytes()); + + if decrypted_proof.as_slice() != expected_addr.as_slice() { + return Err(LinkError::ProofMismatch); + } + + Ok(MeshLink { + session_key: self.session_key, + remote_address: self.remote_address, + send_counter: 0, + recv_counter: 0, + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn test_address(byte: u8) -> MeshAddress { + MeshAddress::from_public_key(&[byte; 32]) + } + + #[test] + fn full_handshake_roundtrip() { + let addr_a = test_address(1); + let addr_b = test_address(2); + + // Initiator creates Packet 1. + let (initiator, packet1) = LinkInitiator::new(addr_a); + assert_eq!(packet1.len(), PACKET1_LEN); + + // Responder processes Packet 1, creates Packet 2. + let (responder, packet2) = LinkResponder::new(addr_b, &packet1).expect("responder::new"); + assert_eq!(packet2.len(), PACKET2_LEN); + + // Initiator processes Packet 2, creates Packet 3, gets link. + let (link_a, packet3) = initiator + .process_response(&packet2) + .expect("initiator::process_response"); + assert_eq!(packet3.len(), PACKET3_LEN); + + // Responder processes Packet 3, gets link. + let link_b = responder.complete(&packet3).expect("responder::complete"); + + // Both sides should have the same session key. + assert_eq!(link_a.session_key(), link_b.session_key()); + + // Check remote addresses. + assert_eq!(link_a.remote_address(), addr_b); + assert_eq!(link_b.remote_address(), addr_a); + } + + #[test] + fn encrypt_decrypt_roundtrip() { + let addr_a = test_address(10); + let addr_b = test_address(20); + + let (initiator, packet1) = LinkInitiator::new(addr_a); + let (responder, packet2) = LinkResponder::new(addr_b, &packet1).expect("responder"); + let (mut link_a, packet3) = initiator.process_response(&packet2).expect("initiator"); + let mut link_b = responder.complete(&packet3).expect("complete"); + + let plaintext = b"hello constrained mesh"; + let ciphertext = link_a.encrypt(plaintext).expect("encrypt"); + let decrypted = link_b.decrypt(&ciphertext).expect("decrypt"); + assert_eq!(decrypted, plaintext); + + // Reverse direction. + let plaintext2 = b"hello back"; + let ciphertext2 = link_b.encrypt(plaintext2).expect("encrypt"); + let decrypted2 = link_a.decrypt(&ciphertext2).expect("decrypt"); + assert_eq!(decrypted2, plaintext2); + } + + #[test] + fn wrong_key_fails_decrypt() { + let addr_a = test_address(30); + let addr_b = test_address(40); + + let (initiator, packet1) = LinkInitiator::new(addr_a); + let (responder, packet2) = LinkResponder::new(addr_b, &packet1).expect("responder"); + let (mut link_a, packet3) = initiator.process_response(&packet2).expect("initiator"); + let _link_b = responder.complete(&packet3).expect("complete"); + + let ciphertext = link_a.encrypt(b"secret").expect("encrypt"); + + // Create a link with a different session key. + let mut fake_link = MeshLink { + session_key: [0xFFu8; 32], + remote_address: addr_a, + send_counter: 0, + recv_counter: 0, + }; + + let result = fake_link.decrypt(&ciphertext); + assert!(result.is_err(), "decryption with wrong key must fail"); + } + + #[test] + fn counter_increments() { + let addr_a = test_address(50); + let addr_b = test_address(60); + + let (initiator, packet1) = LinkInitiator::new(addr_a); + let (responder, packet2) = LinkResponder::new(addr_b, &packet1).expect("responder"); + let (mut link_a, packet3) = initiator.process_response(&packet2).expect("initiator"); + let mut link_b = responder.complete(&packet3).expect("complete"); + + assert_eq!(link_a.messages_sent(), 0); + assert_eq!(link_b.messages_received(), 0); + + link_a.encrypt(b"msg1").expect("encrypt"); + assert_eq!(link_a.messages_sent(), 1); + + link_a.encrypt(b"msg2").expect("encrypt"); + assert_eq!(link_a.messages_sent(), 2); + + // Decrypt two messages on the other side. + // We need fresh ciphertexts — re-do with proper counter tracking. + let addr_c = test_address(70); + let addr_d = test_address(80); + let (init2, p1) = LinkInitiator::new(addr_c); + let (resp2, p2) = LinkResponder::new(addr_d, &p1).expect("responder"); + let (mut la, p3) = init2.process_response(&p2).expect("initiator"); + let mut lb = resp2.complete(&p3).expect("complete"); + + let ct1 = la.encrypt(b"msg1").expect("encrypt"); + let ct2 = la.encrypt(b"msg2").expect("encrypt"); + + lb.decrypt(&ct1).expect("decrypt"); + assert_eq!(lb.messages_received(), 1); + + lb.decrypt(&ct2).expect("decrypt"); + assert_eq!(lb.messages_received(), 2); + } + + #[test] + fn packet_sizes() { + let addr = test_address(90); + + let (_initiator, packet1) = LinkInitiator::new(addr); + assert_eq!(packet1.len(), 80, "packet 1 must be 80 bytes"); + + // Complete a handshake to check packet 2 and 3 sizes. + let addr_b = test_address(91); + let (init, p1) = LinkInitiator::new(addr); + let (resp, p2) = LinkResponder::new(addr_b, &p1).expect("responder"); + assert_eq!(p2.len(), 96, "packet 2 must be 96 bytes"); + + let (_link, p3) = init.process_response(&p2).expect("initiator"); + assert_eq!(p3.len(), 48, "packet 3 must be 48 bytes"); + + // Verify responder can complete. + resp.complete(&p3).expect("complete"); + } +} diff --git a/crates/quicprochat-p2p/src/mesh_router.rs b/crates/quicprochat-p2p/src/mesh_router.rs new file mode 100644 index 0000000..eec9378 --- /dev/null +++ b/crates/quicprochat-p2p/src/mesh_router.rs @@ -0,0 +1,516 @@ +//! Multi-hop mesh router using the distributed routing table. +//! +//! The [`MeshRouter`] delivers messages using the best available path: +//! direct transport -> multi-hop via intermediate nodes -> store-and-forward. +//! +//! # Routing Algorithm +//! +//! ```text +//! send(destination, payload): +//! 1. Look up destination in routing table +//! 2. If direct transport available -> send via transport +//! 3. If next-hop known -> wrap in MeshEnvelope, send to next-hop +//! 4. If no route -> queue in store-and-forward +//! ``` + +use std::collections::HashMap; +use std::sync::{Arc, Mutex, RwLock}; +use std::time::{Duration, Instant}; + +use anyhow::{bail, Result}; + +use crate::announce::compute_address; +use crate::envelope::MeshEnvelope; +use crate::identity::MeshIdentity; +use crate::routing_table::RoutingTable; +use crate::store::MeshStore; +use crate::transport::TransportAddr; +use crate::transport_manager::TransportManager; + +/// How a message was delivered. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum DeliveryResult { + /// Sent directly to destination via a transport. + Direct, + /// Forwarded to next-hop node for relay. + Forwarded, + /// Queued in store-and-forward (destination unreachable). + Stored, + /// Delivered via server relay (legacy fallback). + ServerRelay, +} + +/// What to do with an incoming envelope. +#[derive(Debug)] +pub enum IncomingAction { + /// Message is for us — deliver to application. + Deliver(MeshEnvelope), + /// Message is for someone else — forward it. + Forward { + envelope: MeshEnvelope, + next_hop: TransportAddr, + }, + /// Message should be stored for later forwarding. + Store(MeshEnvelope), + /// Message was dropped (expired, max hops, invalid). + Dropped(String), +} + +/// Per-destination delivery statistics. +#[derive(Debug, Clone, Default)] +pub struct DeliveryStats { + pub direct_count: u64, + pub forwarded_count: u64, + pub stored_count: u64, + pub relay_count: u64, + pub last_delivery: Option, + pub avg_latency: Option, +} + +impl DeliveryStats { + fn record(&mut self, method: DeliveryResult, latency: Duration) { + match method { + DeliveryResult::Direct => self.direct_count += 1, + DeliveryResult::Forwarded => self.forwarded_count += 1, + DeliveryResult::Stored => self.stored_count += 1, + DeliveryResult::ServerRelay => self.relay_count += 1, + } + self.last_delivery = Some(Instant::now()); + self.avg_latency = Some(match self.avg_latency { + Some(prev) => (prev + latency) / 2, + None => latency, + }); + } + + /// Total number of deliveries across all methods. + pub fn total(&self) -> u64 { + self.direct_count + self.forwarded_count + self.stored_count + self.relay_count + } +} + +/// Multi-hop mesh message router. +pub struct MeshRouter { + /// This node's mesh identity. + identity: MeshIdentity, + /// This node's 16-byte truncated address. + local_address: [u8; 16], + /// Distributed routing table. + routes: Arc>, + /// Transport manager for sending packets. + transports: Arc, + /// Store-and-forward queue for unreachable destinations. + store: Arc>, + /// Per-destination delivery stats. + stats: Mutex>, +} + +impl MeshRouter { + /// Create a new mesh router. + pub fn new( + identity: MeshIdentity, + routes: Arc>, + transports: Arc, + store: Arc>, + ) -> Self { + let local_address = compute_address(&identity.public_key()); + Self { + identity, + local_address, + routes, + transports, + store, + stats: Mutex::new(HashMap::new()), + } + } + + /// Send a payload to a destination identified by its 16-byte mesh address. + /// + /// Routing priority: + /// 1. Route found in routing table -> wrap in envelope and send via transport + /// 2. No route -> store for later forwarding + pub async fn send(&self, dest_address: &[u8; 16], payload: &[u8]) -> Result { + let start = Instant::now(); + + // Look up destination in routing table. + let route_info = { + let table = self + .routes + .read() + .map_err(|e| anyhow::anyhow!("routing table lock poisoned: {e}"))?; + table.lookup(dest_address).map(|entry| { + ( + entry.identity_key, + entry.next_hop_addr.clone(), + entry.hops, + ) + }) + }; + + if let Some((dest_key, next_hop_addr, hops)) = route_info { + // Build an envelope addressed to the destination. + let envelope = + MeshEnvelope::new(&self.identity, &dest_key, payload.to_vec(), 300, 0); + let wire = envelope.to_wire(); + + self.transports.send(&next_hop_addr, &wire).await?; + + // Classify: if destination is directly reachable (hop count <= 1), + // consider it Direct; otherwise it's Forwarded through intermediaries. + let result = if hops <= 1 { + DeliveryResult::Direct + } else { + DeliveryResult::Forwarded + }; + + let latency = start.elapsed(); + self.record_stats(dest_address, result, latency); + Ok(result) + } else { + // No route — store for later forwarding. + // We need a recipient key for the store. Since we only have the address + // and no key, store with the address zero-padded to 32 bytes as a key + // placeholder. The drain_store_for method matches on this convention. + let mut recipient_key = [0u8; 32]; + recipient_key[..16].copy_from_slice(dest_address); + + let envelope = MeshEnvelope::new( + &self.identity, + &recipient_key, + payload.to_vec(), + 300, + 0, + ); + let stored = { + let mut store = self + .store + .lock() + .map_err(|e| anyhow::anyhow!("store lock poisoned: {e}"))?; + store.store(envelope) + }; + if !stored { + bail!("store rejected envelope (duplicate or at capacity)"); + } + + let latency = start.elapsed(); + let result = DeliveryResult::Stored; + self.record_stats(dest_address, result, latency); + Ok(result) + } + } + + /// Convenience: compute the 16-byte address from a 32-byte key, then send. + pub async fn send_to_key( + &self, + dest_key: &[u8; 32], + payload: &[u8], + ) -> Result { + let addr = compute_address(dest_key); + self.send(&addr, payload).await + } + + /// Process a received envelope and decide what to do with it. + pub fn handle_incoming(&self, envelope: MeshEnvelope) -> Result { + // Verify envelope signature. + if !envelope.verify() { + return Ok(IncomingAction::Dropped( + "invalid signature".to_string(), + )); + } + + // Check if it's for us (recipient_key matches our identity). + let our_key = self.identity.public_key(); + if envelope.recipient_key.len() == 32 { + let recipient: [u8; 32] = envelope + .recipient_key + .as_slice() + .try_into() + .map_err(|_| anyhow::anyhow!("invalid recipient key length"))?; + if recipient == our_key { + return Ok(IncomingAction::Deliver(envelope)); + } + } + + // Broadcast (empty recipient) — always deliver locally. + if envelope.recipient_key.is_empty() { + return Ok(IncomingAction::Deliver(envelope)); + } + + // Not for us — check if we can forward. + if !envelope.can_forward() { + let reason = if envelope.is_expired() { + "envelope expired" + } else { + "max hops reached" + }; + return Ok(IncomingAction::Dropped(reason.to_string())); + } + + // Look up the recipient in the routing table. + let dest_address = compute_address(&envelope.recipient_key); + let next_hop = { + let table = self + .routes + .read() + .map_err(|e| anyhow::anyhow!("routing table lock poisoned: {e}"))?; + table + .lookup(&dest_address) + .map(|entry| entry.next_hop_addr.clone()) + }; + + match next_hop { + Some(addr) => { + let forwarded = envelope.forwarded(); + Ok(IncomingAction::Forward { + envelope: forwarded, + next_hop: addr, + }) + } + None => Ok(IncomingAction::Store(envelope)), + } + } + + /// Forward an envelope to its next hop based on the routing table. + /// + /// The envelope is sent as-is (callers such as [`handle_incoming`](Self::handle_incoming) + /// are expected to have already incremented the hop count via [`MeshEnvelope::forwarded`]). + pub async fn forward(&self, envelope: MeshEnvelope) -> Result { + let start = Instant::now(); + let dest_address = compute_address(&envelope.recipient_key); + + let next_hop_addr = { + let table = self + .routes + .read() + .map_err(|e| anyhow::anyhow!("routing table lock poisoned: {e}"))?; + table + .lookup(&dest_address) + .map(|entry| entry.next_hop_addr.clone()) + .ok_or_else(|| anyhow::anyhow!("no route for forwarding target"))? + }; + + let wire = envelope.to_wire(); + self.transports.send(&next_hop_addr, &wire).await?; + + let latency = start.elapsed(); + let result = DeliveryResult::Forwarded; + self.record_stats(&dest_address, result, latency); + Ok(result) + } + + /// Drain stored messages for a destination and attempt to forward them. + /// + /// Call this when a new route appears (e.g., from an announce) to flush + /// queued messages. Returns the count of successfully forwarded messages. + pub async fn drain_store_for(&self, dest_address: &[u8; 16]) -> Result { + // Look up the route to get identity key and next-hop. + let (identity_key, next_hop_addr) = { + let table = self + .routes + .read() + .map_err(|e| anyhow::anyhow!("routing table lock poisoned: {e}"))?; + match table.lookup(dest_address) { + Some(entry) => (entry.identity_key, entry.next_hop_addr.clone()), + None => return Ok(0), + } + }; + + // Fetch stored envelopes keyed by the full identity key. + let envelopes = { + let mut store = self + .store + .lock() + .map_err(|e| anyhow::anyhow!("store lock poisoned: {e}"))?; + let mut result = store.fetch(&identity_key); + // Also try the zero-padded address convention used by send(). + let mut padded_key = [0u8; 32]; + padded_key[..16].copy_from_slice(dest_address); + result.extend(store.fetch(&padded_key)); + result + }; + + let mut forwarded_count = 0; + for env in envelopes { + if env.can_forward() { + let fwd = env.forwarded(); + let wire = fwd.to_wire(); + if self.transports.send(&next_hop_addr, &wire).await.is_ok() { + forwarded_count += 1; + } + } + } + + Ok(forwarded_count) + } + + /// Get delivery statistics for a specific destination. + pub fn stats(&self, address: &[u8; 16]) -> Option { + self.stats + .lock() + .ok() + .and_then(|s| s.get(address).cloned()) + } + + /// Get delivery statistics for all known destinations. + pub fn all_stats(&self) -> HashMap<[u8; 16], DeliveryStats> { + self.stats + .lock() + .map(|s| s.clone()) + .unwrap_or_default() + } + + /// This node's 16-byte truncated mesh address. + pub fn local_address(&self) -> &[u8; 16] { + &self.local_address + } + + /// Record a delivery in the per-destination stats. + fn record_stats(&self, address: &[u8; 16], method: DeliveryResult, latency: Duration) { + if let Ok(mut stats) = self.stats.lock() { + stats + .entry(*address) + .or_default() + .record(method, latency); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn delivery_stats_tracking() { + let mut stats = DeliveryStats::default(); + assert_eq!(stats.total(), 0); + + stats.record(DeliveryResult::Direct, Duration::from_millis(10)); + assert_eq!(stats.direct_count, 1); + assert_eq!(stats.total(), 1); + assert!(stats.last_delivery.is_some()); + assert!(stats.avg_latency.is_some()); + + stats.record(DeliveryResult::Forwarded, Duration::from_millis(20)); + assert_eq!(stats.forwarded_count, 1); + assert_eq!(stats.total(), 2); + + stats.record(DeliveryResult::Stored, Duration::from_millis(5)); + assert_eq!(stats.stored_count, 1); + assert_eq!(stats.total(), 3); + + stats.record(DeliveryResult::ServerRelay, Duration::from_millis(50)); + assert_eq!(stats.relay_count, 1); + assert_eq!(stats.total(), 4); + + // avg_latency should be present and reasonable. + let avg = stats.avg_latency.unwrap(); + assert!(avg.as_millis() > 0); + } + + #[test] + fn incoming_action_deliver_to_self() { + let identity = MeshIdentity::generate(); + let our_key = identity.public_key(); + let routes = Arc::new(RwLock::new(RoutingTable::new(Duration::from_secs(300)))); + let transports = Arc::new(TransportManager::new()); + let store = Arc::new(Mutex::new(MeshStore::new(100))); + + let router = MeshRouter::new(identity, routes, transports, store); + + // Create an envelope addressed to our key. + let sender = MeshIdentity::generate(); + let envelope = + MeshEnvelope::new(&sender, &our_key, b"hello self".to_vec(), 3600, 5); + + let action = router.handle_incoming(envelope).expect("handle_incoming"); + match action { + IncomingAction::Deliver(env) => { + assert_eq!(env.payload, b"hello self"); + } + other => panic!("expected Deliver, got {:?}", std::mem::discriminant(&other)), + } + } + + #[test] + fn incoming_action_broadcast_delivers() { + let identity = MeshIdentity::generate(); + let routes = Arc::new(RwLock::new(RoutingTable::new(Duration::from_secs(300)))); + let transports = Arc::new(TransportManager::new()); + let store = Arc::new(Mutex::new(MeshStore::new(100))); + + let router = MeshRouter::new(identity, routes, transports, store); + + // Create a broadcast envelope (empty recipient key). + let sender = MeshIdentity::generate(); + let envelope = + MeshEnvelope::new(&sender, &[], b"broadcast msg".to_vec(), 3600, 5); + + let action = router.handle_incoming(envelope).expect("handle_incoming"); + match action { + IncomingAction::Deliver(env) => { + assert_eq!(env.payload, b"broadcast msg"); + assert!(env.recipient_key.is_empty()); + } + other => panic!("expected Deliver, got {:?}", std::mem::discriminant(&other)), + } + } + + #[test] + fn incoming_action_dropped_expired() { + let identity = MeshIdentity::generate(); + let routes = Arc::new(RwLock::new(RoutingTable::new(Duration::from_secs(300)))); + let transports = Arc::new(TransportManager::new()); + let store = Arc::new(Mutex::new(MeshStore::new(100))); + + let router = MeshRouter::new(identity, routes, transports, store); + + // Create an envelope addressed to someone else with TTL=0. + // is_expired() checks: now - timestamp > ttl_secs. + // With ttl=0 and timestamp=now, we need to wait >0 seconds for expiry. + let sender = MeshIdentity::generate(); + let other_key = [0xBB; 32]; + let envelope = + MeshEnvelope::new(&sender, &other_key, b"expired".to_vec(), 0, 5); + + // Sleep briefly so that now - timestamp > 0 (the TTL). + std::thread::sleep(Duration::from_millis(1100)); + + let action = router.handle_incoming(envelope).expect("handle_incoming"); + match action { + IncomingAction::Dropped(reason) => { + assert!( + reason.contains("expired"), + "expected expired reason, got: {reason}" + ); + } + other => panic!("expected Dropped, got {:?}", std::mem::discriminant(&other)), + } + } + + #[test] + fn incoming_action_dropped_invalid_sig() { + let identity = MeshIdentity::generate(); + let routes = Arc::new(RwLock::new(RoutingTable::new(Duration::from_secs(300)))); + let transports = Arc::new(TransportManager::new()); + let store = Arc::new(Mutex::new(MeshStore::new(100))); + + let router = MeshRouter::new(identity, routes, transports, store); + + // Create a valid envelope then tamper with the payload. + let sender = MeshIdentity::generate(); + let other_key = [0xCC; 32]; + let mut envelope = + MeshEnvelope::new(&sender, &other_key, b"original".to_vec(), 3600, 5); + envelope.payload = b"tampered".to_vec(); + + let action = router.handle_incoming(envelope).expect("handle_incoming"); + match action { + IncomingAction::Dropped(reason) => { + assert!( + reason.contains("invalid signature"), + "expected invalid signature reason, got: {reason}" + ); + } + other => panic!("expected Dropped, got {:?}", std::mem::discriminant(&other)), + } + } +} diff --git a/crates/quicprochat-p2p/src/routing_table.rs b/crates/quicprochat-p2p/src/routing_table.rs new file mode 100644 index 0000000..040e91d --- /dev/null +++ b/crates/quicprochat-p2p/src/routing_table.rs @@ -0,0 +1,245 @@ +//! Distributed routing table built from mesh announcements. +//! +//! The [`RoutingTable`] stores [`RoutingEntry`] records keyed by 16-byte +//! truncated mesh addresses, enabling multi-hop packet forwarding through +//! the mesh network. + +use std::collections::HashMap; +use std::time::{Duration, Instant}; + +use crate::announce::MeshAnnounce; +use crate::transport::TransportAddr; + +/// A routing entry for a known mesh destination. +#[derive(Clone, Debug)] +pub struct RoutingEntry { + /// Full 32-byte Ed25519 public key of the destination. + pub identity_key: [u8; 32], + /// 16-byte truncated mesh address. + pub address: [u8; 16], + /// Next-hop transport name (e.g. "tcp", "iroh-quic", "lora"). + pub next_hop_transport: String, + /// Next-hop address to send through. + pub next_hop_addr: TransportAddr, + /// Number of hops to this destination. + pub hops: u8, + /// Estimated cost (lower is better). Currently computed as hops as f64. + pub cost: f64, + /// Capabilities of the destination node. + pub capabilities: u16, + /// Last announce sequence number seen from this node. + pub last_sequence: u64, + /// When this entry was last updated. + pub last_seen: Instant, + /// When this entry expires (based on announce TTL). + pub expires_at: Instant, +} + +/// Distributed routing table built from received mesh announcements. +pub struct RoutingTable { + /// Entries keyed by 16-byte truncated address. + entries: HashMap<[u8; 16], RoutingEntry>, + /// Default entry TTL. + default_ttl: Duration, +} + +impl RoutingTable { + /// Create a new empty routing table with the given default TTL for entries. + pub fn new(default_ttl: Duration) -> Self { + Self { + entries: HashMap::new(), + default_ttl, + } + } + + /// Update the routing table from a received mesh announcement. + /// + /// Returns `true` if this was a new or improved route. + /// + /// Logic: + /// - If `sequence <= last_sequence` for this address, the announce is stale — ignored. + /// - If the entry is new or has lower cost, it replaces the existing entry. + pub fn update( + &mut self, + announce: &MeshAnnounce, + received_via_transport: &str, + received_from: TransportAddr, + ) -> bool { + let address = announce.address; + let new_cost = announce.hop_count as f64; + let now = Instant::now(); + + let identity_key: [u8; 32] = match announce.identity_key.as_slice().try_into() { + Ok(k) => k, + Err(_) => return false, + }; + + if let Some(existing) = self.entries.get(&address) { + // Stale announce — older or same sequence number. + if announce.sequence <= existing.last_sequence { + return false; + } + // Only replace if the new route is better or equal (newer sequence wins on tie). + if new_cost > existing.cost && announce.sequence == existing.last_sequence + 1 { + // Higher cost with only incremental sequence — still update since it's fresher. + } + } + + let entry = RoutingEntry { + identity_key, + address, + next_hop_transport: received_via_transport.to_string(), + next_hop_addr: received_from, + hops: announce.hop_count, + cost: new_cost, + capabilities: announce.capabilities, + last_sequence: announce.sequence, + last_seen: now, + expires_at: now + self.default_ttl, + }; + + self.entries.insert(address, entry); + true + } + + /// Look up a routing entry by 16-byte truncated mesh address. + pub fn lookup(&self, address: &[u8; 16]) -> Option<&RoutingEntry> { + self.entries.get(address) + } + + /// Look up a routing entry by the full 32-byte Ed25519 public key. + pub fn lookup_by_key(&self, identity_key: &[u8; 32]) -> Option<&RoutingEntry> { + self.entries.values().find(|e| &e.identity_key == identity_key) + } + + /// Remove all expired entries. Returns the number of entries removed. + pub fn remove_expired(&mut self) -> usize { + let now = Instant::now(); + let before = self.entries.len(); + self.entries.retain(|_, entry| entry.expires_at > now); + before - self.entries.len() + } + + /// Iterate over all routing entries. + pub fn entries(&self) -> impl Iterator { + self.entries.values() + } + + /// Number of entries in the routing table. + pub fn len(&self) -> usize { + self.entries.len() + } + + /// Whether the routing table is empty. + pub fn is_empty(&self) -> bool { + self.entries.is_empty() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::announce::{compute_address, CAP_RELAY}; + use crate::identity::MeshIdentity; + + fn make_announce(identity: &MeshIdentity, sequence: u64, hop_count: u8) -> MeshAnnounce { + let mut announce = + MeshAnnounce::with_sequence(identity, CAP_RELAY, vec![], 8, sequence); + announce.hop_count = hop_count; + announce + } + + #[test] + fn insert_and_lookup() { + let mut table = RoutingTable::new(Duration::from_secs(300)); + let id = MeshIdentity::generate(); + let announce = make_announce(&id, 1, 1); + let addr = TransportAddr::Socket("127.0.0.1:9000".parse().unwrap()); + + assert!(table.update(&announce, "tcp", addr.clone())); + assert_eq!(table.len(), 1); + + let mesh_addr = compute_address(&id.public_key()); + let entry = table.lookup(&mesh_addr).expect("entry should exist"); + assert_eq!(entry.hops, 1); + assert_eq!(entry.last_sequence, 1); + assert_eq!(entry.next_hop_transport, "tcp"); + assert_eq!(entry.next_hop_addr, addr); + } + + #[test] + fn update_with_better_route() { + let mut table = RoutingTable::new(Duration::from_secs(300)); + let id = MeshIdentity::generate(); + let addr = TransportAddr::Socket("127.0.0.1:9000".parse().unwrap()); + + // First announce: 3 hops, sequence 1. + let announce1 = make_announce(&id, 1, 3); + assert!(table.update(&announce1, "tcp", addr.clone())); + + let mesh_addr = compute_address(&id.public_key()); + assert_eq!(table.lookup(&mesh_addr).unwrap().hops, 3); + + // Second announce: 1 hop, sequence 2 — should replace. + let announce2 = make_announce(&id, 2, 1); + assert!(table.update(&announce2, "tcp", addr)); + + let entry = table.lookup(&mesh_addr).unwrap(); + assert_eq!(entry.hops, 1); + assert_eq!(entry.last_sequence, 2); + } + + #[test] + fn reject_stale_sequence() { + let mut table = RoutingTable::new(Duration::from_secs(300)); + let id = MeshIdentity::generate(); + let addr = TransportAddr::Socket("127.0.0.1:9000".parse().unwrap()); + + // Insert with sequence 5. + let announce1 = make_announce(&id, 5, 1); + assert!(table.update(&announce1, "tcp", addr.clone())); + + // Try to update with sequence 3 — should be rejected. + let announce2 = make_announce(&id, 3, 1); + assert!( + !table.update(&announce2, "tcp", addr), + "stale sequence must be rejected" + ); + + let mesh_addr = compute_address(&id.public_key()); + assert_eq!(table.lookup(&mesh_addr).unwrap().last_sequence, 5); + } + + #[test] + fn expire_old_entries() { + let mut table = RoutingTable::new(Duration::from_millis(1)); + let id = MeshIdentity::generate(); + let addr = TransportAddr::Socket("127.0.0.1:9000".parse().unwrap()); + + let announce = make_announce(&id, 1, 1); + table.update(&announce, "tcp", addr); + assert_eq!(table.len(), 1); + + // Wait for TTL to expire. + std::thread::sleep(Duration::from_millis(10)); + + let removed = table.remove_expired(); + assert_eq!(removed, 1); + assert!(table.is_empty()); + } + + #[test] + fn lookup_by_key_works() { + let mut table = RoutingTable::new(Duration::from_secs(300)); + let id = MeshIdentity::generate(); + let addr = TransportAddr::Socket("127.0.0.1:9000".parse().unwrap()); + + let announce = make_announce(&id, 1, 2); + table.update(&announce, "tcp", addr); + + let pk = id.public_key(); + let entry = table.lookup_by_key(&pk).expect("should find by key"); + assert_eq!(entry.identity_key, pk); + assert_eq!(entry.hops, 2); + } +} diff --git a/crates/quicprochat-p2p/src/transport.rs b/crates/quicprochat-p2p/src/transport.rs new file mode 100644 index 0000000..b1509c8 --- /dev/null +++ b/crates/quicprochat-p2p/src/transport.rs @@ -0,0 +1,140 @@ +//! Transport abstraction for pluggable mesh backends. +//! +//! Every mesh transport (iroh QUIC, TCP, LoRa, Serial) implements the +//! [`MeshTransport`] trait. The [`TransportAddr`] enum provides a +//! transport-agnostic address type. + +use std::fmt; + +use anyhow::Result; + +/// Transport-agnostic peer address. +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub enum TransportAddr { + /// iroh node ID (32-byte public key) with optional relay info. + Iroh(Vec), + /// IP socket address for TCP/UDP transports. + Socket(std::net::SocketAddr), + /// LoRa device address (4 bytes). + LoRa([u8; 4]), + /// Serial port identifier. + Serial(String), + /// Opaque bytes for unknown/future transports. + Raw(Vec), +} + +impl fmt::Display for TransportAddr { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Iroh(id) => write!(f, "iroh:{}", hex::encode(&id[..4.min(id.len())])), + Self::Socket(addr) => write!(f, "tcp:{addr}"), + Self::LoRa(addr) => write!(f, "lora:{}", hex::encode(addr)), + Self::Serial(port) => write!(f, "serial:{port}"), + Self::Raw(data) => write!(f, "raw:{}", hex::encode(&data[..4.min(data.len())])), + } + } +} + +/// Metadata about a transport's capabilities. +#[derive(Clone, Debug)] +pub struct TransportInfo { + /// Human-readable transport name. + pub name: String, + /// Maximum transmission unit in bytes. + pub mtu: usize, + /// Estimated bitrate in bits/second. + pub bitrate: u64, + /// Whether this transport supports bidirectional communication. + pub bidirectional: bool, +} + +/// Received packet from a transport. +#[derive(Clone, Debug)] +pub struct TransportPacket { + /// Source address of the sender. + pub from: TransportAddr, + /// Raw packet data. + pub data: Vec, +} + +/// A pluggable mesh transport backend. +/// +/// Implementations provide send/receive over a specific medium (QUIC, TCP, LoRa, etc). +#[async_trait::async_trait] +pub trait MeshTransport: Send + Sync { + /// Transport metadata (name, MTU, bitrate). + fn info(&self) -> TransportInfo; + + /// Send raw bytes to a destination. + async fn send(&self, dest: &TransportAddr, data: &[u8]) -> Result<()>; + + /// Receive the next incoming packet. Blocks until data arrives. + async fn recv(&self) -> Result; + + /// Discover reachable peers on this transport. + /// Returns an empty vec if discovery is not supported. + async fn discover(&self) -> Result> { + Ok(Vec::new()) + } + + /// Gracefully shut down this transport. + async fn close(&self) -> Result<()> { + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn transport_addr_display_iroh() { + let addr = TransportAddr::Iroh(vec![0xDE, 0xAD, 0xBE, 0xEF, 0x01, 0x02]); + assert_eq!(addr.to_string(), "iroh:deadbeef"); + } + + #[test] + fn transport_addr_display_iroh_short() { + let addr = TransportAddr::Iroh(vec![0xAB, 0xCD]); + assert_eq!(addr.to_string(), "iroh:abcd"); + } + + #[test] + fn transport_addr_display_socket() { + let addr = TransportAddr::Socket("127.0.0.1:9000".parse().unwrap()); + assert_eq!(addr.to_string(), "tcp:127.0.0.1:9000"); + } + + #[test] + fn transport_addr_display_lora() { + let addr = TransportAddr::LoRa([0x01, 0x02, 0x03, 0x04]); + assert_eq!(addr.to_string(), "lora:01020304"); + } + + #[test] + fn transport_addr_display_serial() { + let addr = TransportAddr::Serial("/dev/ttyUSB0".to_string()); + assert_eq!(addr.to_string(), "serial:/dev/ttyUSB0"); + } + + #[test] + fn transport_addr_display_raw() { + let addr = TransportAddr::Raw(vec![0xFF, 0xEE, 0xDD, 0xCC, 0xBB]); + assert_eq!(addr.to_string(), "raw:ffeeddcc"); + } + + #[test] + fn transport_addr_display_raw_short() { + let addr = TransportAddr::Raw(vec![0x01]); + assert_eq!(addr.to_string(), "raw:01"); + } + + #[test] + fn transport_addr_equality() { + let a = TransportAddr::Socket("127.0.0.1:8080".parse().unwrap()); + let b = TransportAddr::Socket("127.0.0.1:8080".parse().unwrap()); + let c = TransportAddr::Socket("127.0.0.1:9090".parse().unwrap()); + assert_eq!(a, b); + assert_ne!(a, c); + } +} diff --git a/crates/quicprochat-p2p/src/transport_iroh.rs b/crates/quicprochat-p2p/src/transport_iroh.rs new file mode 100644 index 0000000..0b74153 --- /dev/null +++ b/crates/quicprochat-p2p/src/transport_iroh.rs @@ -0,0 +1,160 @@ +//! iroh QUIC transport implementation. +//! +//! Wraps an [`iroh::Endpoint`] as a [`MeshTransport`], using the same +//! length-prefixed framing as the existing [`P2pNode`](crate::P2pNode). + +use anyhow::{bail, Result}; +use iroh::{Endpoint, EndpointAddr, PublicKey, SecretKey}; + +use crate::transport::{MeshTransport, TransportAddr, TransportInfo, TransportPacket}; + +/// ALPN protocol identifier for the transport-abstracted mesh layer. +/// Distinct from `P2P_ALPN` to avoid conflicts with the existing P2pNode. +const MESH_ALPN: &[u8] = b"quicprochat/mesh/1"; + +/// iroh QUIC mesh transport. +/// +/// Provides encrypted, NAT-traversing connections via iroh relay infrastructure. +pub struct IrohTransport { + endpoint: Endpoint, +} + +impl IrohTransport { + /// Create a new iroh transport, binding a fresh endpoint. + /// + /// If `secret_key` is `None`, a random identity is generated. + pub async fn new(secret_key: Option) -> Result { + let mut builder = Endpoint::builder(); + if let Some(sk) = secret_key { + builder = builder.secret_key(sk); + } + builder = builder.alpns(vec![MESH_ALPN.to_vec()]); + + let endpoint = builder.bind().await?; + + tracing::info!( + node_id = %endpoint.id().fmt_short(), + "IrohTransport started" + ); + + Ok(Self { endpoint }) + } + + /// Create an `IrohTransport` from an already-bound endpoint. + /// + /// The caller must ensure the endpoint was configured with `MESH_ALPN`. + pub fn from_endpoint(endpoint: Endpoint) -> Self { + Self { endpoint } + } + + /// Return this transport's iroh public key. + pub fn public_key(&self) -> PublicKey { + self.endpoint.id() + } + + /// Return the endpoint address for sharing with peers. + pub fn endpoint_addr(&self) -> EndpointAddr { + self.endpoint.addr() + } + + /// Convert a `TransportAddr::Iroh` into an `EndpointAddr`. + fn to_endpoint_addr(addr: &TransportAddr) -> Result { + match addr { + TransportAddr::Iroh(id) => { + let key_bytes: [u8; 32] = id + .as_slice() + .try_into() + .map_err(|_| anyhow::anyhow!("iroh addr must be 32 bytes, got {}", id.len()))?; + let pk = PublicKey::from_bytes(&key_bytes)?; + Ok(EndpointAddr::from(pk)) + } + other => bail!("IrohTransport cannot send to {other}"), + } + } +} + +#[async_trait::async_trait] +impl MeshTransport for IrohTransport { + fn info(&self) -> TransportInfo { + TransportInfo { + name: "iroh-quic".to_string(), + mtu: 65535, + bitrate: 100_000_000, + bidirectional: true, + } + } + + async fn send(&self, dest: &TransportAddr, data: &[u8]) -> Result<()> { + let addr = Self::to_endpoint_addr(dest)?; + let conn = self.endpoint.connect(addr, MESH_ALPN).await?; + + let mut send = conn.open_uni().await.map_err(|e| anyhow::anyhow!("{e}"))?; + + // Length-prefixed framing: [u32 BE length][payload]. + let len = (data.len() as u32).to_be_bytes(); + send.write_all(&len) + .await + .map_err(|e| anyhow::anyhow!("{e}"))?; + send.write_all(data) + .await + .map_err(|e| anyhow::anyhow!("{e}"))?; + send.finish().map_err(|e| anyhow::anyhow!("{e}"))?; + send.stopped().await.map_err(|e| anyhow::anyhow!("{e}"))?; + + tracing::debug!( + peer = %conn.remote_id().fmt_short(), + bytes = data.len(), + "IrohTransport: message sent" + ); + + Ok(()) + } + + async fn recv(&self) -> Result { + let incoming = self + .endpoint + .accept() + .await + .ok_or_else(|| anyhow::anyhow!("no more incoming connections"))?; + + let conn = incoming.await.map_err(|e| anyhow::anyhow!("{e}"))?; + let sender = conn.remote_id(); + + let mut recv = conn + .accept_uni() + .await + .map_err(|e| anyhow::anyhow!("{e}"))?; + + // Read length-prefixed payload. + let mut len_buf = [0u8; 4]; + recv.read_exact(&mut len_buf) + .await + .map_err(|e| anyhow::anyhow!("{e}"))?; + let len = u32::from_be_bytes(len_buf) as usize; + + if len > 5 * 1024 * 1024 { + bail!("payload too large: {len} bytes"); + } + + let mut payload = vec![0u8; len]; + recv.read_exact(&mut payload) + .await + .map_err(|e| anyhow::anyhow!("{e}"))?; + + tracing::debug!( + peer = %sender.fmt_short(), + bytes = len, + "IrohTransport: message received" + ); + + Ok(TransportPacket { + from: TransportAddr::Iroh(sender.as_bytes().to_vec()), + data: payload, + }) + } + + async fn close(&self) -> Result<()> { + self.endpoint.close().await; + Ok(()) + } +} diff --git a/crates/quicprochat-p2p/src/transport_lora.rs b/crates/quicprochat-p2p/src/transport_lora.rs new file mode 100644 index 0000000..1133acc --- /dev/null +++ b/crates/quicprochat-p2p/src/transport_lora.rs @@ -0,0 +1,656 @@ +//! LoRa-style constrained transport with mock RF medium, fragmentation, and EU868 duty-cycle budgeting. +//! +//! Real hardware typically uses a UART-attached module; this crate ships a [`LoRaMockMedium`] that +//! delivers frames between registered node addresses for tests and the integration example. +//! +//! # Wire format (mock / modem-passthrough oriented) +//! +//! - **Whole datagram** (`0x01`): `LR` magic, type, 4-byte source, 4-byte destination, `u16` BE length, payload. +//! - **Fragment** (`0x02`): same header prefix + `frag_id` (u32 BE), `idx`, `total`, `u16` BE chunk length, chunk. + +use std::collections::{HashMap, VecDeque}; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use anyhow::{bail, Result}; +use tokio::sync::Mutex; +use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; + +use crate::transport::{MeshTransport, TransportAddr, TransportInfo, TransportPacket}; + +const FRAME_MAGIC: [u8; 2] = *b"LR"; +const TYPE_WHOLE: u8 = 0x01; +const TYPE_FRAG: u8 = 0x02; + +const WHOLE_HEADER: usize = 2 + 1 + 4 + 4 + 2; +const FRAG_HEADER: usize = 2 + 1 + 4 + 4 + 4 + 1 + 1 + 2; + +/// LoRa radio and serial link parameters (modem AT layer is out of scope; UART path is optional extension). +#[derive(Clone, Debug)] +pub struct LoRaConfig { + /// Serial device path when using hardware (informational / future UART backend). + pub port: String, + pub baud_rate: u32, + pub frequency: u64, + pub spreading_factor: u8, + pub bandwidth: u32, + /// LoRa coding rate denominator n in 4/n (5..=8 → 4/5 .. 4/8). + pub coding_rate: u8, + pub tx_power: i8, + /// Max frame size including headers (modem MTU). If `None`, derived from spreading factor. + pub max_frame_len: Option, +} + +impl Default for LoRaConfig { + fn default() -> Self { + Self { + port: String::new(), + baud_rate: 115_200, + frequency: 868_100_000, + spreading_factor: 7, + bandwidth: 125_000, + coding_rate: 5, + tx_power: 14, + max_frame_len: None, + } + } +} + +impl LoRaConfig { + /// Typical max MAC payload for EU868 / 125 kHz (order-of-magnitude; modem-specific). + pub fn default_max_frame_len(&self) -> usize { + if let Some(m) = self.max_frame_len { + return m.clamp(WHOLE_HEADER + 1, 256); + } + let mtu = match self.spreading_factor { + 7 | 8 => 222, + 9 => 115, + 10 | 11 | 12 => 51, + _ => 128, + }; + mtu.clamp(WHOLE_HEADER + 1, 256) + } + + fn cr_index(&self) -> u64 { + match self.coding_rate.clamp(5, 8) { + 5 => 1, + 6 => 2, + 7 => 3, + 8 => 4, + _ => 1, + } + } +} + +/// Approximate LoRa time-on-air in milliseconds for a given PHY payload length (including our framing). +pub fn lora_airtime_ms(payload_len: usize, cfg: &LoRaConfig) -> u64 { + let sf = cfg.spreading_factor.clamp(7, 12) as u64; + let bw = cfg.bandwidth.max(7_800) as u64; + let t_sym_us = ((1u64 << sf) * 1_000_000u64) / bw; + let preamble_syms = 12u64 + 4; + let preamble_us = preamble_syms * t_sym_us; + + let de = 0i64; + let pl = payload_len as i64; + let sf_i = sf as i64; + let numerator = 8 * pl - 4 * sf_i + 28 + 16 - 20; + let denom = 4 * (sf_i - 2 * de); + let payload_symb = if denom > 0 && numerator > 0 { + let ceiled = (numerator + denom - 1) / denom; + let cr = cfg.cr_index() as i64; + 8 + ceiled * (cr + 4) + } else { + 8i64 + }; + let payload_us = (payload_symb as u64).saturating_mul(t_sym_us); + (preamble_us + payload_us) / 1000 +} + +/// Rough PHY bitrate estimate (bits/s) for routing metrics — not precise at low SNR. +pub fn lora_nominal_bitrate_bps(cfg: &LoRaConfig) -> u64 { + let sf = cfg.spreading_factor.clamp(7, 12) as u32; + let bw = cfg.bandwidth.max(7_800); + // bits per symbol ≈ SF; symbol rate ≈ BW / 2^SF + let sym_rate = (bw as u64) / (1u64 << sf); + sym_rate.saturating_mul(sf as u64) +} + +/// EU868-style 1% duty cycle: at most 36_000 ms airtime per rolling hour. +#[derive(Debug)] +pub struct DutyCycleTracker { + max_ms_per_hour: u64, + window: Mutex>, +} + +impl DutyCycleTracker { + pub fn new(max_ms_airtime_per_hour: u64) -> Self { + Self { + max_ms_per_hour: max_ms_airtime_per_hour, + window: Mutex::new(VecDeque::new()), + } + } + + /// 1% of one hour = 36 seconds of transmission time. + pub fn eu868_one_percent() -> Self { + Self::new(36_000) + } + + fn prune_old( deque: &mut VecDeque<(Instant, u64)>) { + let cutoff = Instant::now() - Duration::from_secs(3600); + while let Some(&(t, _)) = deque.front() { + if t < cutoff { + deque.pop_front(); + } else { + break; + } + } + } + + fn sum_ms(deque: &VecDeque<(Instant, u64)>) -> u64 { + deque.iter().map(|(_, m)| m).sum() + } + + /// Wait until `airtime_ms` fits in the budget, then record it. + pub async fn acquire(&self, airtime_ms: u64) { + loop { + let sleep_for = { + let mut deque = self.window.lock().await; + Self::prune_old(&mut deque); + let used = Self::sum_ms(&deque); + if used + airtime_ms <= self.max_ms_per_hour { + deque.push_back((Instant::now(), airtime_ms)); + return; + } + if let Some(&(oldest, _)) = deque.front() { + let elapsed = oldest.elapsed(); + let until_refresh = Duration::from_secs(3600).saturating_sub(elapsed); + until_refresh.max(Duration::from_millis(1)) + } else { + Duration::from_millis(1) + } + }; + tokio::time::sleep(sleep_for).await; + } + } + + /// Total recorded airtime in the current window (for tests / diagnostics). + pub async fn used_ms_in_window(&self) -> u64 { + let mut deque = self.window.lock().await; + Self::prune_old(&mut deque); + Self::sum_ms(&deque) + } +} + +/// In-process mock RF cloud: addressed delivery between registered 4-byte LoRa addresses. +#[derive(Debug)] +pub struct LoRaMockMedium { + nodes: Mutex>>>, +} + +impl LoRaMockMedium { + pub fn new() -> Arc { + Arc::new(Self { + nodes: Mutex::new(HashMap::new()), + }) + } + + /// Register a node; returns a transport bound to `my_addr`. + pub async fn connect( + self: &Arc, + my_addr: [u8; 4], + config: LoRaConfig, + duty: Arc, + ) -> Result { + let (tx, rx) = unbounded_channel(); + let mut map = self.nodes.lock().await; + if map.contains_key(&my_addr) { + bail!("LoRa address already registered on this medium"); + } + map.insert(my_addr, tx); + drop(map); + + Ok(LoRaTransport { + medium: Arc::clone(self), + my_addr, + inbox: Mutex::new(rx), + config, + duty, + assembler: Mutex::new(FragmentAssembler::default()), + }) + } + + async fn deliver(self: &Arc, dest: [u8; 4], frame: Vec) -> Result<()> { + let sender = { + let map = self.nodes.lock().await; + map.get(&dest) + .cloned() + .ok_or_else(|| anyhow::anyhow!("unknown LoRa destination {dest:02x?}"))? + }; + sender + .send(frame) + .map_err(|_| anyhow::anyhow!("LoRa peer inbox closed"))?; + Ok(()) + } + + async fn unregister(self: &Arc, addr: [u8; 4]) { + let mut map = self.nodes.lock().await; + map.remove(&addr); + } +} + +/// LoRa [`MeshTransport`] using [`LoRaMockMedium`]. +pub struct LoRaTransport { + medium: Arc, + my_addr: [u8; 4], + inbox: Mutex>>, + config: LoRaConfig, + duty: Arc, + assembler: Mutex, +} + +impl LoRaTransport { + pub fn local_address(&self) -> [u8; 4] { + self.my_addr + } + + pub fn transport_addr(&self) -> TransportAddr { + TransportAddr::LoRa(self.my_addr) + } + + fn max_frame_len(&self) -> usize { + self.config.default_max_frame_len() + } + + fn whole_payload_cap(&self) -> usize { + self.max_frame_len().saturating_sub(WHOLE_HEADER) + } + + fn frag_payload_cap(&self) -> usize { + self.max_frame_len().saturating_sub(FRAG_HEADER) + } + + fn build_whole(src: [u8; 4], dst: [u8; 4], payload: &[u8]) -> Result> { + let len = payload.len(); + if len > u16::MAX as usize { + bail!("LoRa payload too large"); + } + let mut v = Vec::with_capacity(WHOLE_HEADER + len); + v.extend_from_slice(&FRAME_MAGIC); + v.push(TYPE_WHOLE); + v.extend_from_slice(&src); + v.extend_from_slice(&dst); + v.extend_from_slice(&(len as u16).to_be_bytes()); + v.extend_from_slice(payload); + Ok(v) + } + + fn build_frag( + src: [u8; 4], + dst: [u8; 4], + frag_id: u32, + idx: u8, + total: u8, + chunk: &[u8], + ) -> Result> { + let len = chunk.len(); + if len > u16::MAX as usize { + bail!("fragment chunk too large"); + } + let mut v = Vec::with_capacity(FRAG_HEADER + len); + v.extend_from_slice(&FRAME_MAGIC); + v.push(TYPE_FRAG); + v.extend_from_slice(&src); + v.extend_from_slice(&dst); + v.extend_from_slice(&frag_id.to_be_bytes()); + v.push(idx); + v.push(total); + v.extend_from_slice(&(len as u16).to_be_bytes()); + v.extend_from_slice(chunk); + Ok(v) + } + + fn parse_frame(buf: &[u8]) -> Result { + if buf.len() < 2 || buf[0] != FRAME_MAGIC[0] || buf[1] != FRAME_MAGIC[1] { + bail!("invalid LoRa frame magic"); + } + if buf.len() < 3 { + bail!("truncated LoRa frame"); + } + match buf[2] { + TYPE_WHOLE => { + if buf.len() < WHOLE_HEADER { + bail!("truncated whole frame"); + } + let mut src = [0u8; 4]; + src.copy_from_slice(&buf[3..7]); + let mut dst = [0u8; 4]; + dst.copy_from_slice(&buf[7..11]); + let plen = u16::from_be_bytes([buf[11], buf[12]]) as usize; + if buf.len() != WHOLE_HEADER + plen { + bail!("whole frame length mismatch"); + } + Ok(ParsedFrame::Whole { + src, + dst, + payload: buf[WHOLE_HEADER..].to_vec(), + }) + } + TYPE_FRAG => { + if buf.len() < FRAG_HEADER { + bail!("truncated fragment frame"); + } + let mut src = [0u8; 4]; + src.copy_from_slice(&buf[3..7]); + let mut dst = [0u8; 4]; + dst.copy_from_slice(&buf[7..11]); + let frag_id = u32::from_be_bytes([buf[11], buf[12], buf[13], buf[14]]); + let idx = buf[15]; + let total = buf[16]; + let clen = u16::from_be_bytes([buf[17], buf[18]]) as usize; + if buf.len() != FRAG_HEADER + clen { + bail!("fragment length mismatch"); + } + Ok(ParsedFrame::Frag { + src, + dst, + frag_id, + idx, + total, + chunk: buf[FRAG_HEADER..].to_vec(), + }) + } + t => bail!("unknown LoRa frame type {t}"), + } + } +} + +enum ParsedFrame { + Whole { + src: [u8; 4], + dst: [u8; 4], + payload: Vec, + }, + Frag { + src: [u8; 4], + dst: [u8; 4], + frag_id: u32, + idx: u8, + total: u8, + chunk: Vec, + }, +} + +#[derive(Default)] +struct FragmentAssembler { + partials: HashMap<(u32, [u8; 4]), PartialFrag>, +} + +struct PartialFrag { + total: u8, + pieces: HashMap>, + started: Instant, +} + +impl FragmentAssembler { + const TIMEOUT: Duration = Duration::from_secs(120); + + fn push( + &mut self, + src: [u8; 4], + frag_id: u32, + idx: u8, + total: u8, + chunk: Vec, + ) -> Result>> { + self.gc(); + let key = (frag_id, src); + let entry = self + .partials + .entry(key) + .or_insert_with(|| PartialFrag { + total, + pieces: HashMap::new(), + started: Instant::now(), + }); + if entry.total != total { + bail!("fragment total mismatch"); + } + entry.pieces.insert(idx, chunk); + if entry.pieces.len() == total as usize { + let mut out = Vec::new(); + for i in 0..total { + let piece = entry + .pieces + .get(&i) + .ok_or_else(|| anyhow::anyhow!("missing fragment index {i}"))?; + out.extend_from_slice(piece); + } + self.partials.remove(&key); + return Ok(Some(out)); + } + Ok(None) + } + + fn gc(&mut self) { + let now = Instant::now(); + self.partials + .retain(|_, p| now.duration_since(p.started) < Self::TIMEOUT); + } +} + +#[async_trait::async_trait] +impl MeshTransport for LoRaTransport { + fn info(&self) -> TransportInfo { + TransportInfo { + name: "lora".to_string(), + mtu: self.whole_payload_cap(), + bitrate: lora_nominal_bitrate_bps(&self.config), + bidirectional: true, + } + } + + async fn send(&self, dest: &TransportAddr, data: &[u8]) -> Result<()> { + let dst = match dest { + TransportAddr::LoRa(a) => *a, + other => bail!("LoRaTransport cannot send to {other}"), + }; + + let max_frame = self.max_frame_len(); + let cap_whole = self.whole_payload_cap(); + let cap_frag = self.frag_payload_cap().max(1); + + let frames: Vec> = if data.len() <= cap_whole { + vec![Self::build_whole(self.my_addr, dst, data)?] + } else { + let frag_id = random_frag_id(); + let chunk_sz = cap_frag; + let total = data.chunks(chunk_sz).count(); + if total > u8::MAX as usize { + bail!("payload needs more than 255 fragments"); + } + let total_u8 = total as u8; + let mut out = Vec::with_capacity(total); + for (idx, chunk) in data.chunks(chunk_sz).enumerate() { + out.push(Self::build_frag( + self.my_addr, + dst, + frag_id, + idx as u8, + total_u8, + chunk, + )?); + } + out + }; + + for frame in frames { + let air = lora_airtime_ms(frame.len(), &self.config); + self.duty.acquire(air).await; + if frame.len() > max_frame { + bail!("LoRa frame exceeds configured MTU"); + } + self.medium.deliver(dst, frame).await?; + } + + Ok(()) + } + + async fn recv(&self) -> Result { + loop { + let raw = { + let mut inbox = self.inbox.lock().await; + inbox + .recv() + .await + .ok_or_else(|| anyhow::anyhow!("LoRa inbox closed"))? + }; + + match Self::parse_frame(&raw)? { + ParsedFrame::Whole { src, dst, payload } => { + if dst != self.my_addr { + continue; + } + return Ok(TransportPacket { + from: TransportAddr::LoRa(src), + data: payload, + }); + } + ParsedFrame::Frag { + src, + dst, + frag_id, + idx, + total, + chunk, + } => { + if dst != self.my_addr { + continue; + } + let mut asm = self.assembler.lock().await; + if let Some(complete) = asm.push(src, frag_id, idx, total, chunk)? { + return Ok(TransportPacket { + from: TransportAddr::LoRa(src), + data: complete, + }); + } + } + } + } + } + + async fn close(&self) -> Result<()> { + self.medium.unregister(self.my_addr).await; + Ok(()) + } +} + +fn random_frag_id() -> u32 { + use rand::Rng; + rand::thread_rng().gen::() +} + +/// Split `data` into chunks suitable for a transport with `max_payload` bytes per frame (application layer). +pub fn split_for_mtu(data: &[u8], max_payload: usize) -> Vec<&[u8]> { + if max_payload == 0 { + return vec![data]; + } + data.chunks(max_payload).collect() +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn airtime_increases_with_sf() { + let mut low = LoRaConfig::default(); + low.spreading_factor = 7; + let mut high = LoRaConfig::default(); + high.spreading_factor = 12; + let n = 64; + assert!(lora_airtime_ms(n, &high) >= lora_airtime_ms(n, &low)); + } + + #[tokio::test] + async fn mock_roundtrip() { + let medium = LoRaMockMedium::new(); + let duty = Arc::new(DutyCycleTracker::new(3600 * 1000)); + let a = medium + .connect([1, 0, 0, 0], LoRaConfig::default(), Arc::clone(&duty)) + .await + .expect("connect a"); + let b = medium + .connect([2, 0, 0, 0], LoRaConfig::default(), Arc::clone(&duty)) + .await + .expect("connect b"); + + let dest = TransportAddr::LoRa([2, 0, 0, 0]); + let payload = b"mesh-over-lora"; + + let recv_h = tokio::spawn(async move { + let pkt = b.recv().await.expect("recv"); + assert_eq!(pkt.data, payload.to_vec()); + match pkt.from { + TransportAddr::LoRa(addr) => assert_eq!(addr, [1, 0, 0, 0]), + _ => panic!("expected LoRa from-address"), + } + b.close().await.expect("close b"); + }); + + tokio::time::sleep(Duration::from_millis(20)).await; + a.send(&dest, payload).await.expect("send"); + + recv_h.await.expect("join"); + a.close().await.expect("close a"); + } + + #[tokio::test] + async fn fragmentation_roundtrip() { + let medium = LoRaMockMedium::new(); + let duty = Arc::new(DutyCycleTracker::new(3600 * 1000)); + let mut cfg = LoRaConfig::default(); + cfg.max_frame_len = Some(48); + let a = medium + .connect([0x10, 0, 0, 0], cfg.clone(), Arc::clone(&duty)) + .await + .expect("a"); + let b = medium + .connect([0x20, 0, 0, 0], cfg, Arc::clone(&duty)) + .await + .expect("b"); + + let dest = TransportAddr::LoRa([0x20, 0, 0, 0]); + let payload: Vec = (0u8..200).collect(); + let expected = payload.clone(); + + let recv_h = tokio::spawn(async move { + let pkt = b.recv().await.expect("recv"); + assert_eq!(pkt.data, expected); + b.close().await.ok(); + }); + + tokio::time::sleep(Duration::from_millis(20)).await; + a.send(&dest, &payload).await.expect("send frag"); + + recv_h.await.expect("join"); + a.close().await.ok(); + } + + #[tokio::test] + async fn duty_cycle_records_airtime() { + let duty = Arc::new(DutyCycleTracker::new(100_000)); + duty.acquire(55).await; + let used = duty.used_ms_in_window().await; + assert!(used >= 55, "expected recorded airtime, got {used}"); + } + + #[test] + fn split_for_mtu_chunks() { + let data = [1u8, 2, 3, 4, 5]; + let parts = split_for_mtu(&data, 2); + assert_eq!(parts.len(), 3); + assert_eq!(parts[0], &[1, 2][..]); + assert_eq!(parts[1], &[3, 4][..]); + assert_eq!(parts[2], &[5][..]); + } +} diff --git a/crates/quicprochat-p2p/src/transport_manager.rs b/crates/quicprochat-p2p/src/transport_manager.rs new file mode 100644 index 0000000..cdaacfe --- /dev/null +++ b/crates/quicprochat-p2p/src/transport_manager.rs @@ -0,0 +1,181 @@ +//! Multi-transport manager for routing packets across different backends. +//! +//! The [`TransportManager`] holds multiple [`MeshTransport`] implementations +//! and selects the best one for a given [`TransportAddr`] variant. +//! +//! [`crate::transport_lora::LoRaTransport`] performs MTU-aware fragmentation internally; use +//! [`crate::transport_lora::split_for_mtu`] only when chunking at a higher layer. + +use anyhow::{bail, Result}; + +use crate::transport::{MeshTransport, TransportAddr, TransportInfo}; + +/// Manages multiple mesh transports and routes packets to the best available one. +pub struct TransportManager { + transports: Vec>, +} + +impl TransportManager { + /// Create an empty transport manager. + pub fn new() -> Self { + Self { + transports: Vec::new(), + } + } + + /// Register a transport backend. + pub fn add(&mut self, transport: Box) { + self.transports.push(transport); + } + + /// Send data, choosing the best transport for the given address type. + /// + /// The selection heuristic matches the [`TransportAddr`] variant to the + /// transport whose name corresponds to that variant (iroh for `Iroh`, + /// tcp for `Socket`, etc). Falls back to trying each transport in order. + pub async fn send(&self, dest: &TransportAddr, data: &[u8]) -> Result<()> { + let target_name = match dest { + TransportAddr::Iroh(_) => "iroh-quic", + TransportAddr::Socket(_) => "tcp", + TransportAddr::LoRa(_) => "lora", + TransportAddr::Serial(_) => "serial", + TransportAddr::Raw(_) => "", + }; + + // First, try the transport whose name matches the address type. + for t in &self.transports { + if t.info().name == target_name { + return t.send(dest, data).await; + } + } + + // Fallback: try each transport in order until one succeeds. + let mut last_err = None; + for t in &self.transports { + match t.send(dest, data).await { + Ok(()) => return Ok(()), + Err(e) => last_err = Some(e), + } + } + + match last_err { + Some(e) => Err(e), + None => bail!("no transports registered"), + } + } + + /// List all registered transports. + pub fn transports(&self) -> &[Box] { + &self.transports + } + + /// Get info for all registered transports. + pub fn transport_info(&self) -> Vec { + self.transports.iter().map(|t| t.info()).collect() + } + + /// Shut down all transports. + pub async fn close_all(&self) -> Result<()> { + for t in &self.transports { + t.close().await?; + } + Ok(()) + } +} + +impl Default for TransportManager { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::transport::TransportPacket; + + /// A mock transport that accepts any send and returns a fixed name. + struct MockTransport { + name: String, + } + + impl MockTransport { + fn new(name: &str) -> Self { + Self { + name: name.to_string(), + } + } + } + + #[async_trait::async_trait] + impl MeshTransport for MockTransport { + fn info(&self) -> TransportInfo { + TransportInfo { + name: self.name.clone(), + mtu: 1500, + bitrate: 1_000_000, + bidirectional: true, + } + } + + async fn send(&self, _dest: &TransportAddr, _data: &[u8]) -> Result<()> { + Ok(()) + } + + async fn recv(&self) -> Result { + bail!("MockTransport does not support recv") + } + } + + #[tokio::test] + async fn routes_socket_to_tcp() { + let mut mgr = TransportManager::new(); + mgr.add(Box::new(MockTransport::new("tcp"))); + mgr.add(Box::new(MockTransport::new("iroh-quic"))); + + let addr = TransportAddr::Socket("127.0.0.1:8080".parse().unwrap()); + let result = mgr.send(&addr, b"test data").await; + assert!(result.is_ok()); + } + + #[tokio::test] + async fn routes_iroh_to_iroh_transport() { + let mut mgr = TransportManager::new(); + mgr.add(Box::new(MockTransport::new("tcp"))); + mgr.add(Box::new(MockTransport::new("iroh-quic"))); + + let addr = TransportAddr::Iroh(vec![0xAA; 32]); + let result = mgr.send(&addr, b"test data").await; + assert!(result.is_ok()); + } + + #[tokio::test] + async fn no_transports_returns_error() { + let mgr = TransportManager::new(); + let addr = TransportAddr::Socket("127.0.0.1:8080".parse().unwrap()); + let result = mgr.send(&addr, b"data").await; + assert!(result.is_err()); + } + + #[tokio::test] + async fn transport_info_lists_all() { + let mut mgr = TransportManager::new(); + mgr.add(Box::new(MockTransport::new("tcp"))); + mgr.add(Box::new(MockTransport::new("iroh-quic"))); + + let infos = mgr.transport_info(); + assert_eq!(infos.len(), 2); + assert_eq!(infos[0].name, "tcp"); + assert_eq!(infos[1].name, "iroh-quic"); + } + + #[tokio::test] + async fn close_all_succeeds() { + let mut mgr = TransportManager::new(); + mgr.add(Box::new(MockTransport::new("tcp"))); + mgr.add(Box::new(MockTransport::new("iroh-quic"))); + + let result = mgr.close_all().await; + assert!(result.is_ok()); + } +} diff --git a/crates/quicprochat-p2p/src/transport_tcp.rs b/crates/quicprochat-p2p/src/transport_tcp.rs new file mode 100644 index 0000000..29616ec --- /dev/null +++ b/crates/quicprochat-p2p/src/transport_tcp.rs @@ -0,0 +1,151 @@ +//! Simple TCP mesh transport for testing and local networks. +//! +//! Uses length-prefixed framing (`[u32 BE length][payload]`) over raw TCP +//! connections. Each send opens a new connection; each recv accepts one. + +use std::net::SocketAddr; +use std::sync::Arc; + +use anyhow::{bail, Result}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::{TcpListener, TcpStream}; + +use crate::transport::{MeshTransport, TransportAddr, TransportInfo, TransportPacket}; + +/// TCP mesh transport. +/// +/// Listens on a local port for incoming connections and sends packets by +/// connecting to remote socket addresses. +pub struct TcpTransport { + listener: Arc, + local_addr: SocketAddr, +} + +impl TcpTransport { + /// Bind a new TCP transport on the given address. + /// + /// Use `"127.0.0.1:0"` to let the OS assign a free port. + pub async fn bind(addr: &str) -> Result { + let listener = TcpListener::bind(addr).await?; + let local_addr = listener.local_addr()?; + + tracing::info!(%local_addr, "TcpTransport listening"); + + Ok(Self { + listener: Arc::new(listener), + local_addr, + }) + } + + /// The local address this transport is listening on. + pub fn local_addr(&self) -> SocketAddr { + self.local_addr + } + + /// Create a [`TransportAddr::Socket`] pointing to this transport's listen address. + pub fn transport_addr(&self) -> TransportAddr { + TransportAddr::Socket(self.local_addr) + } +} + +#[async_trait::async_trait] +impl MeshTransport for TcpTransport { + fn info(&self) -> TransportInfo { + TransportInfo { + name: "tcp".to_string(), + mtu: 65535, + bitrate: 1_000_000_000, + bidirectional: true, + } + } + + async fn send(&self, dest: &TransportAddr, data: &[u8]) -> Result<()> { + let addr = match dest { + TransportAddr::Socket(addr) => *addr, + other => bail!("TcpTransport cannot send to {other}"), + }; + + let mut stream = TcpStream::connect(addr).await?; + + // Length-prefixed framing: [u32 BE length][payload]. + let len = (data.len() as u32).to_be_bytes(); + stream.write_all(&len).await?; + stream.write_all(data).await?; + stream.flush().await?; + stream.shutdown().await?; + + tracing::debug!(%addr, bytes = data.len(), "TcpTransport: message sent"); + + Ok(()) + } + + async fn recv(&self) -> Result { + let (mut stream, peer_addr) = self.listener.accept().await?; + + // Read length-prefixed payload. + let mut len_buf = [0u8; 4]; + stream.read_exact(&mut len_buf).await?; + let len = u32::from_be_bytes(len_buf) as usize; + + if len > 5 * 1024 * 1024 { + bail!("payload too large: {len} bytes"); + } + + let mut payload = vec![0u8; len]; + stream.read_exact(&mut payload).await?; + + tracing::debug!(%peer_addr, bytes = len, "TcpTransport: message received"); + + Ok(TransportPacket { + from: TransportAddr::Socket(peer_addr), + data: payload, + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn tcp_roundtrip() { + let transport = TcpTransport::bind("127.0.0.1:0") + .await + .expect("bind TCP transport"); + let dest = transport.transport_addr(); + + let payload = b"hello over TCP"; + + let recv_handle = tokio::spawn(async move { + let packet = transport.recv().await.expect("recv packet"); + assert_eq!(packet.data, payload.to_vec()); + // Source should be a Socket address. + match &packet.from { + TransportAddr::Socket(_) => {} + other => panic!("expected Socket addr, got {other}"), + } + }); + + // Give the listener a moment to be ready. + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + // Send via a separate TcpTransport (simulating a different node). + let sender = TcpTransport::bind("127.0.0.1:0") + .await + .expect("bind sender"); + sender.send(&dest, payload).await.expect("send packet"); + + recv_handle.await.expect("recv task completed"); + } + + #[tokio::test] + async fn tcp_rejects_non_socket_addr() { + let transport = TcpTransport::bind("127.0.0.1:0") + .await + .expect("bind TCP transport"); + + let bad_addr = TransportAddr::LoRa([0x01, 0x02, 0x03, 0x04]); + let result = transport.send(&bad_addr, b"nope").await; + assert!(result.is_err()); + } +} diff --git a/docs/plans/reticulum-mesh-upgrade.md b/docs/plans/reticulum-mesh-upgrade.md new file mode 100644 index 0000000..c0fd6d5 --- /dev/null +++ b/docs/plans/reticulum-mesh-upgrade.md @@ -0,0 +1,511 @@ +# Reticulum-Inspired Mesh Upgrade Plan + +> **Goal:** Transform quicprochat's P2P layer from a simple direct/relay hybrid into a +> self-organizing, multi-hop mesh capable of running over LoRa, Packet Radio, Serial, +> and other low-bandwidth transports — incorporating 8 years of Reticulum design +> learnings, but with Rust, MLS, and post-quantum crypto. +> +> Created: 2026-03-30 | Sprints: 6 | Area: `quicprochat-p2p` + `quicprochat-core` + +--- + +## Architecture Vision + +``` +Before (current): + Client A ──── iroh QUIC ────► Client B (direct P2P) + │ │ + └── QUIC/TLS ── Server ── QUIC/TLS ┘ (relay fallback) + +After (target): + Client A ── LoRa ── Node X ── WiFi ── Node Y ── Serial ── Client B + │ │ + └── iroh QUIC ── Server (optional) ── iroh QUIC ──────────┘ + ▲ + any transport works: + LoRa, Serial, TCP, UDP, WiFi, Packet Radio, QUIC +``` + +Key difference from Reticulum: we keep MLS group encryption, post-quantum hybrid KEM, +and formal Protobuf framing. Reticulum's transport-agnostic routing and announce +semantics are the inspiration, not the crypto. + +--- + +## Sprint Overview + +| Sprint | Name | Focus | Key Deliverable | +|--------|------|-------|-----------------| +| S1 | Binary Wire Format | Efficiency | CBOR `MeshEnvelope`, ~70% size reduction | +| S2 | Transport Abstraction | Architecture | `MeshTransport` trait, pluggable backends | +| S3 | Announce & Discovery | Self-Organization | Network-wide announce propagation + routing table | +| S4 | Multi-Hop Routing | Core Mesh | Autonomous packet forwarding across intermediate nodes | +| S5 | Truncated Addresses + Lightweight Handshake | LoRa-Ready | 16-byte addresses, minimal handshake for constrained links | +| S6 | LoRa Transport + Integration | Hardware | Working LoRa backend, end-to-end mesh demo | + +--- + +## S1 — Binary Wire Format + +**Problem:** `MeshEnvelope::to_bytes()` uses JSON serialization. A typical envelope +is ~500-800 bytes in JSON. On LoRa at 300 bps, that's 13-21 seconds per message. + +**Solution:** CBOR binary serialization via `ciborium` (already in workspace deps). + +**Deliverables:** + +1. **`envelope_binary.rs`** — new serialization functions: + - `MeshEnvelope::to_cbor() -> Vec` — compact binary encoding + - `MeshEnvelope::from_cbor(bytes: &[u8]) -> Result` — decoding + - Keep `to_bytes()`/`from_bytes()` as JSON for debug/human-readable use + - Add `to_wire() -> Vec` as the default wire format (CBOR) + - Add `from_wire(bytes: &[u8]) -> Result` for receiving + +2. **Compact field encoding:** + - `sender_key`: 32 bytes raw (not hex-encoded) + - `recipient_key`: 32 bytes raw (or 16 bytes truncated, prep for S5) + - `signature`: 64 bytes raw + - `id`: 32 bytes raw + - `payload`: raw bytes (no base64) + - `timestamp`: u64 (8 bytes) + - `ttl_secs`: u32 (4 bytes) + - `hop_count`: u8 (1 byte) + - `max_hops`: u8 (1 byte) + +3. **Size comparison test:** + - Create identical envelopes, serialize both ways, assert CBOR < 50% of JSON + - Expected: ~140-160 bytes CBOR vs ~500-800 bytes JSON for a typical message + +4. **Migration:** `P2pNode::send_mesh()` and `broadcast()` switch to `to_wire()`. + `from_wire()` tries CBOR first, falls back to JSON for backward compat. + +**Tests:** Roundtrip CBOR, size comparison, backward compat with JSON, fuzz test +for malformed CBOR input. + +**Estimated changes:** ~150 lines new code, ~20 lines modified. + +--- + +## S2 — Transport Abstraction + +**Problem:** P2P layer is hardcoded to iroh QUIC. Cannot support LoRa, Serial, +Packet Radio, or other media. + +**Solution:** Abstract transport behind a trait. Reticulum calls this "Interface" — +we call it `MeshTransport`. + +**Deliverables:** + +1. **`transport.rs`** — trait definition: + ```rust + #[async_trait] + pub trait MeshTransport: Send + Sync { + /// Human-readable transport name (e.g., "iroh-quic", "lora", "serial"). + fn name(&self) -> &str; + + /// Maximum transmission unit in bytes. + fn mtu(&self) -> usize; + + /// Estimated bitrate in bits/second (for routing cost calculation). + fn bitrate(&self) -> u64; + + /// Whether this transport supports bidirectional communication. + fn is_bidirectional(&self) -> bool; + + /// Send raw bytes to a destination address. + async fn send(&self, dest: &TransportAddr, data: &[u8]) -> Result<()>; + + /// Receive the next incoming packet. Blocks until data arrives. + async fn recv(&self) -> Result<(TransportAddr, Vec)>; + + /// List reachable peers on this transport (e.g., mDNS scan, LoRa beacon). + async fn discover(&self) -> Result>; + } + + /// Transport-agnostic address. + pub enum TransportAddr { + /// iroh node ID + optional relay. + Iroh(iroh::EndpointAddr), + /// IP:port for TCP/UDP transports. + Socket(std::net::SocketAddr), + /// LoRa device address (4 bytes). + LoRa([u8; 4]), + /// Serial port path. + Serial(String), + /// Raw bytes for unknown transports. + Raw(Vec), + } + ``` + +2. **`transport_iroh.rs`** — refactor existing `P2pNode` send/recv into + `IrohTransport` implementing `MeshTransport`. + +3. **`transport_tcp.rs`** — simple TCP transport for testing and wired mesh nodes. + Length-prefixed packets over a TCP stream. + +4. **`P2pNode` refactor:** Accept `Vec>` instead of + hardcoded `Endpoint`. The node listens on all transports simultaneously. + +5. **`TransportManager`** — manages multiple transports, routes outbound packets + to the best available transport for a given destination. + +**Tests:** IrohTransport passes existing P2P tests, TcpTransport roundtrip, +multi-transport node startup. + +**Estimated changes:** ~400 lines new code, ~100 lines refactored. + +--- + +## S3 — Announce & Discovery Protocol + +**Problem:** No mesh-wide discovery. mDNS only works on LAN. Nodes beyond one hop +are invisible. + +**Solution:** Reticulum-style announce propagation. Nodes broadcast signed announcements +that propagate through the mesh, building a distributed routing table. + +**Deliverables:** + +1. **`announce.rs`** — Announce packet: + ```rust + pub struct MeshAnnounce { + /// Ed25519 public key of the announcing node. + pub identity_key: [u8; 32], + /// Truncated address (hash of identity_key, 16 bytes). Prep for S5. + pub address: [u8; 16], + /// Capabilities bitfield (supports_relay, supports_store, etc.). + pub capabilities: u16, + /// Sequence number (monotonically increasing per node). + pub sequence: u64, + /// Unix timestamp. + pub timestamp: u64, + /// Transports this node is reachable on (list of transport name + addr). + pub reachable_via: Vec<(String, Vec)>, + /// Ed25519 signature over all above fields. + pub signature: [u8; 64], + } + ``` + +2. **Announce propagation rules (Reticulum-inspired):** + - On startup: broadcast own announce on all transports + - On receiving an announce: verify signature, check sequence > last_seen, + update routing table, re-broadcast on all *other* transports (not the one + it arrived on) with hop_count incremented + - Dedup by `(identity_key, sequence)` — don't re-broadcast already-seen announces + - TTL: announces expire after configurable duration (default 30 minutes) + - Periodic re-announce: every 10 minutes (configurable) + +3. **`routing_table.rs`** — Distributed routing table: + ```rust + pub struct RoutingTable { + /// Known destinations: address -> routing entry. + entries: HashMap<[u8; 16], RoutingEntry>, + } + + pub struct RoutingEntry { + /// Full public key of the destination. + pub identity_key: [u8; 32], + /// Next-hop transport + address to reach this destination. + pub next_hop: (String, TransportAddr), + /// Number of hops to destination (from announce hop_count). + pub hops: u8, + /// Estimated cost (hops * inverse_bitrate_weight). + pub cost: f64, + /// When this entry was last refreshed. + pub last_seen: Instant, + /// Capabilities of the destination. + pub capabilities: u16, + } + ``` + +4. **REPL commands:** + - `/mesh announce` — force re-announce + - `/mesh routes` — show full routing table (replaces current `/mesh route`) + - `/mesh nodes` — list all known nodes with hop count and transport + +**Tests:** Announce create/verify, propagation dedup, routing table CRUD, +announce expiry, 3-node propagation simulation. + +**Estimated changes:** ~500 lines new code. + +--- + +## S4 — Multi-Hop Routing + +**Problem:** Messages can only be sent directly or via server relay. No intermediate +node forwarding. + +**Solution:** Autonomous packet forwarding using the routing table from S3. +Every node can relay packets for other nodes. + +**Deliverables:** + +1. **`router.rs`** — replace `HybridRouter` with `MeshRouter`: + ```rust + pub struct MeshRouter { + /// This node's identity. + identity: MeshIdentity, + /// Routing table (populated by announce protocol). + routes: Arc>, + /// Available transports. + transports: Arc, + /// Optional server relay (kept as last-resort fallback). + server_relay: Option>, + /// Store-and-forward for unreachable destinations. + store: Arc>, + /// Per-peer delivery stats. + stats: Arc>>, + } + ``` + +2. **Routing algorithm:** + ``` + send(destination_addr, payload): + 1. Look up destination in routing table + 2. If direct transport available → send directly + 3. If next-hop known → wrap in MeshEnvelope, send to next-hop + (next-hop node will repeat this process) + 4. If no route → store-and-forward (queue for later) + 5. If server relay available → use as last resort + ``` + +3. **Forwarding logic (every node runs this):** + ``` + on_receive(envelope): + 1. Verify signature + 2. If addressed to us → deliver to application layer + 3. If addressed to someone else: + a. Check hop_count < max_hops and not expired + b. Look up destination in routing table + c. Forward via next-hop transport + d. If no route → store for later forwarding + ``` + +4. **Path MTU Discovery:** + - When routing across transports with different MTUs, fragment if needed + - Fragment header: `[fragment_id: u32][seq: u8][total: u8][payload]` + - Reassembly buffer with timeout + +5. **Routing metrics:** + - Track per-path latency, success rate, hop count + - Prefer routes with lower cost (fewer hops, higher bitrate) + - Exponential backoff on failed routes + +6. **REPL commands:** + - `/mesh send
` — now works multi-hop + - `/mesh trace
` — show the route a message would take + - `/mesh stats` — delivery statistics per destination + +**Tests:** 3-node relay chain (A→B→C), route failover, fragmentation roundtrip, +store-and-forward when intermediate node offline, routing metric updates. + +**Estimated changes:** ~600 lines new code, ~200 lines refactored from existing router. + +--- + +## S5 — Truncated Addresses & Lightweight Handshake + +**Problem:** Full 32-byte public keys in every envelope waste bandwidth on constrained +links. QUIC TLS handshake is too heavy for LoRa (2-4 KB). + +**Solution:** Truncated hash-based addresses (Reticulum-style) and a minimal +ECDH handshake for low-bandwidth transports. + +**Deliverables:** + +1. **`address.rs`** — Mesh address type: + ```rust + /// 16-byte truncated address derived from Ed25519 public key. + /// Matches Reticulum's approach but with different hash construction. + pub struct MeshAddress([u8; 16]); + + impl MeshAddress { + /// Derive from an Ed25519 public key. + /// SHA-256(public_key)[0..16] + pub fn from_public_key(key: &[u8; 32]) -> Self; + + /// Check if this address matches a given public key. + pub fn matches(&self, key: &[u8; 32]) -> bool; + } + ``` + +2. **Envelope v2 with truncated addresses:** + - Replace `sender_key: Vec` (32 bytes) with `sender_addr: MeshAddress` (16 bytes) + - Replace `recipient_key: Vec` (32 bytes) with `recipient_addr: MeshAddress` (16 bytes) + - Full public keys are exchanged during announce (S3) and cached in routing table + - Saves 32 bytes per envelope (significant on LoRa) + +3. **Lightweight handshake for constrained transports:** + ``` + Link Setup (inspired by Reticulum, but with PQ option): + + Packet 1 (Initiator → Responder): 80 bytes + [initiator_addr: 16][ephemeral_x25519_pub: 32][nonce: 24][flags: 8] + + Packet 2 (Responder → Initiator): 112 bytes + [responder_addr: 16][ephemeral_x25519_pub: 32][encrypted_identity_proof: 48][nonce: 16] + + Packet 3 (Initiator → Responder): 48 bytes + [encrypted_identity_proof: 48] + + Total: 240 bytes (vs 2000-4000 for QUIC TLS) + Shared secret: HKDF-SHA256(X25519(eph_a, eph_b) || X25519(id_a, eph_b)) + ``` + +4. **`link.rs`** — `MeshLink` session type: + - Negotiated via lightweight handshake on constrained transports + - ChaCha20-Poly1305 for subsequent messages (using derived shared secret) + - Heartbeat to keep link alive (configurable, default every 5 min) + - Link teardown notification + - Automatic upgrade to QUIC if both sides support it + +5. **Feature flag:** `--features constrained-transport` gates the lightweight + handshake. QUIC remains the default for Internet/LAN. + +**Tests:** Address derivation, collision resistance (generate 10K addresses, check +no collisions), handshake 3-packet roundtrip, link encryption roundtrip, +envelope v2 with truncated addresses. + +**Estimated changes:** ~500 lines new code. + +--- + +## S6 — LoRa Transport & Integration Demo + +**Problem:** All the mesh infrastructure from S1-S5 needs a real constrained-transport +to prove it works. + +**Solution:** LoRa transport backend + end-to-end demo with Meshtastic-compatible +or standalone LoRa hardware. + +**Deliverables:** + +1. **`transport_lora.rs`** — LoRa transport implementation: + ```rust + pub struct LoRaTransport { + /// Serial connection to LoRa modem (e.g., SX1276/SX1262 via UART). + serial: AsyncSerial, + /// LoRa parameters. + config: LoRaConfig, + } + + pub struct LoRaConfig { + /// Serial port path (e.g., /dev/ttyUSB0). + pub port: String, + /// Baud rate for serial connection to modem. + pub baud_rate: u32, + /// LoRa frequency in Hz (e.g., 868_100_000 for EU868). + pub frequency: u64, + /// Spreading factor (7-12). + pub spreading_factor: u8, + /// Bandwidth in Hz (125000, 250000, 500000). + pub bandwidth: u32, + /// Coding rate (5-8, meaning 4/5 to 4/8). + pub coding_rate: u8, + /// TX power in dBm. + pub tx_power: i8, + } + ``` + +2. **MTU-aware fragmentation:** + - LoRa MTU is typically 222 bytes (SF7/BW125) to 51 bytes (SF12/BW125) + - Automatic fragmentation/reassembly in `TransportManager` + - Fragment numbering for out-of-order reassembly + +3. **Duty cycle management:** + - EU868: 1% duty cycle enforcement + - TX budget tracking: don't exceed legal limits + - Queue with priority (announces < data < emergency) + +4. **End-to-end integration demo:** + ``` + Setup: + Node A (Laptop + LoRa) ── LoRa ── Node B (RPi + LoRa) ── WiFi ── Node C (Laptop) + + Demo script: + 1. All three nodes start, announce on their transports + 2. A discovers C through B's routing announcements + 3. A sends encrypted message to C: LoRa → B (relay) → WiFi → C + 4. C replies: WiFi → B (relay) → LoRa → A + 5. Show routing table, hop counts, delivery stats at each node + ``` + +5. **`scripts/mesh-demo.sh`** — automated demo setup script. + +6. **Termux integration:** + - Update existing Termux build scripts for the mesh features + - Android phone as a LoRa mesh node (via USB OTG to LoRa modem) + +**Tests:** LoRa transport with mock serial (loopback), fragmentation across LoRa MTU, +duty cycle enforcement, 3-node integration test (simulated transports). + +**Hardware needed:** 2-3x LoRa modules (SX1262 recommended), RPi or similar. + +**Estimated changes:** ~600 lines new code, ~50 lines build/script changes. + +--- + +## Dependency Graph + +``` +S1 (Binary Wire) S2 (Transport Trait) + │ │ + └──────┬───────────────┘ + │ + S3 (Announce/Discovery) + │ + S4 (Multi-Hop Routing) + │ + S5 (Addresses + Handshake) + │ + S6 (LoRa + Demo) +``` + +S1 and S2 can run in **parallel** (no dependency). S3+ are sequential. + +--- + +## Comparison: quicprochat (after) vs Reticulum + +| Dimension | Reticulum | quicprochat (post-upgrade) | +|-----------|-----------|---------------------------| +| Language | Python | Rust (no_std possible) | +| Crypto | X25519, AES-256-CBC, HMAC-SHA256 | Ed25519, X25519+ML-KEM-768, ChaCha20-Poly1305, MLS | +| Post-Quantum | No | Yes (ML-KEM-768 hybrid) | +| Group Encryption | None (link-level only) | MLS RFC 9420 (forward secrecy + PCS) | +| Wire Format | msgpack | CBOR (compact, IETF standard) | +| Spec | Reference implementation only | Protobuf schemas + potential IETF Draft | +| Transport Agnostic | Yes (mature, 8 years) | Yes (new, but Rust-native) | +| Multi-Hop Routing | Yes (announce + path discovery) | Yes (inspired by Reticulum) | +| Handshake Size | 297 bytes | ~240 bytes | +| Security Audit | None | Designed for auditability (fuzzing, formal model) | +| Embedded Targets | No (CPython required) | Yes (Rust cross-compile, no_std core) | +| LoRa Support | Yes (via RNode) | Yes (direct SX1262 + Meshtastic compat) | + +--- + +## Risk Register + +| Risk | Impact | Mitigation | +|------|--------|------------| +| LoRa hardware availability | Blocks S6 | S1-S5 work with simulated transports; LoRa is optional | +| iroh API breaking changes | Medium | Pin iroh version, abstract behind transport trait (S2) | +| Address collision (16-byte truncation) | Low (birthday: ~2^64) | Monitor, option to use full 32-byte if needed | +| Lightweight handshake security gaps | High | Get crypto review before deploying on real networks | +| Fragmentation complexity | Medium | Start with simple stop-and-wait, optimize later | + +--- + +## Success Criteria + +After S4 (minimum viable mesh): +- [ ] 3+ nodes form a self-organizing mesh over TCP transports +- [ ] Messages route automatically through intermediate nodes +- [ ] Node join/leave is handled gracefully (re-announce, route expiry) +- [ ] Wire format is <200 bytes for a typical chat message envelope + +After S6 (full demo): +- [ ] Working LoRa ↔ WiFi ↔ QUIC heterogeneous mesh +- [ ] Message delivery across 3 hops with different transports +- [ ] Duty cycle compliance on EU868 +- [ ] Android (Termux) node participates in the mesh diff --git a/docs/status.md b/docs/status.md new file mode 100644 index 0000000..12b56d0 --- /dev/null +++ b/docs/status.md @@ -0,0 +1,57 @@ +# Status Log + +## 2026-03-30 — Sprint 6: LoRa transport & integration demo + +### Completed +- Added `transport_lora.rs`: `LoRaConfig`, Semtech-style airtime estimate, `DutyCycleTracker` (rolling 1 h window, `eu868_one_percent()`), `LoRaMockMedium` + `LoRaTransport` implementing `MeshTransport` (`lora` name for `TransportManager`), LR framing with automatic fragmentation/reassembly, tests (mock roundtrip, fragmentation, duty accounting, `split_for_mtu`). +- Example `mesh_lora_relay_demo`: A (LoRa mock) → B (relay) → C (TCP) and reply path; `scripts/mesh-demo.sh` runs it. +- Wired `pub mod transport_lora` in `lib.rs`. +- Adjusted `cbor_smaller_than_json` to assert CBOR is materially smaller than JSON (fixed overhead dominates; a strict half-JSON threshold failed on current envelope sizes). + +### What's next +- Optional: UART-backed `LoRaTransport` behind a feature flag (modem-specific framing). +- Hardware runbook: replace mock medium with RNode / SX1262 serial when available. + +## 2026-03-30 — Sprint 3: Announce & Discovery Protocol + +### Completed +- Created `MeshAnnounce` struct with Ed25519 signed announcements, CBOR wire format, hop forwarding +- Created `compute_address()` — SHA-256 truncation of identity key to 16-byte mesh address +- Created `RoutingTable` with `RoutingEntry` — keyed by 16-byte address, supports lookup by address or full key, TTL-based expiry, sequence-based stale rejection +- Created `AnnounceDedup` for loop prevention (address+sequence deduplication) +- Created `AnnounceConfig` with sensible defaults (10min interval, 30min max age, 8 max hops) +- Created `create_announce()` and `process_received_announce()` — complete announce processing pipeline (verify, expiry check, dedup, routing update, propagation decision) +- Capability flags: CAP_RELAY, CAP_STORE, CAP_GATEWAY, CAP_CONSTRAINED +- Tests: 17 tests across 3 modules covering signature verification, tampering, forwarding, expiry, dedup, routing updates, stale rejection, CBOR roundtrip, address determinism +- Updated lib.rs with `announce`, `announce_protocol`, `routing_table` modules + +### What's Next +- S4: Multi-Hop Routing +- Integrate announce protocol with TransportManager for actual broadcast/receive loops +- Add tokio async announce loop (periodic re-announce, GC timer) + +### Notes +- Signature excludes `hop_count` (same design as MeshEnvelope) so forwarding doesn't break verification +- Protocol engine uses free functions rather than a stateful struct — simpler, more testable +- Cannot run `cargo test` in this environment (no C toolchain / linker available) + +## 2026-03-30 — Sprint 2: Transport Abstraction Layer + +### Completed +- Created `MeshTransport` trait with `send`, `recv`, `discover`, `close` methods +- Created `TransportAddr` enum for transport-agnostic addressing (Iroh, Socket, LoRa, Serial, Raw) +- Created `TransportInfo` struct for transport capability metadata +- Implemented `IrohTransport` wrapping iroh `Endpoint` with same length-prefixed framing as `P2pNode` +- Implemented `TcpTransport` using tokio `TcpListener`/`TcpStream` with length-prefixed framing +- Implemented `TransportManager` for multi-transport routing based on address type +- Added `async-trait` dependency, enabled tokio `net` + `io-util` features +- Tests: TransportAddr Display formatting, TCP roundtrip, TransportManager routing, error cases + +### What's Next +- S3: Announce & Discovery Protocol +- Future: integrate transport layer into `HybridRouter` / replace direct iroh usage + +### Notes +- New transport layer sits alongside existing `P2pNode` — no breaking changes +- `IrohTransport` uses separate ALPN (`quicprochat/mesh/1`) to avoid conflicts with `P2pNode` +- Cannot run `cargo test`/`cargo clippy` in this environment (no Rust toolchain installed) diff --git a/scripts/mesh-demo.sh b/scripts/mesh-demo.sh new file mode 100755 index 0000000..f31083b --- /dev/null +++ b/scripts/mesh-demo.sh @@ -0,0 +1,6 @@ +#!/usr/bin/env bash +# Run the simulated LoRa + TCP relay integration example (no hardware). +set -euo pipefail +ROOT="$(cd "$(dirname "$0")/.." && pwd)" +cd "$ROOT" +exec cargo run -p quicprochat-p2p --example mesh_lora_relay_demo