//! P2P transport layer for quicprochat using iroh. //! //! Provides direct peer-to-peer QUIC connections with NAT traversal via iroh //! relay servers. When both peers are online, messages bypass the central //! server entirely. //! //! # Architecture //! //! ```text //! Client A ── iroh direct (QUIC) ── Client B (preferred: low latency) //! │ │ //! └── QUIC/TLS ── Server ── QUIC/TLS ┘ (fallback: store-and-forward) //! ``` pub mod address; pub mod announce; pub mod announce_protocol; pub mod config; pub mod crypto_negotiation; pub mod error; pub mod fapp; pub mod fapp_router; pub mod broadcast; pub mod envelope; pub mod envelope_v2; pub mod keypackage_cache; pub mod mesh_protocol; pub mod metrics; pub mod mls_lite; pub mod persistence; pub mod rate_limit; pub mod shutdown; pub mod identity; pub mod link; pub mod mesh_node; pub mod mesh_router; pub mod routing; pub mod routing_table; pub mod store; pub mod transport; pub mod transport_iroh; pub mod transport_manager; pub mod transport_tcp; pub mod transport_lora; #[cfg(feature = "traffic-resistance")] pub mod traffic_resistance; use std::sync::{Arc, Mutex}; use iroh::{Endpoint, EndpointAddr, PublicKey, SecretKey}; use crate::broadcast::BroadcastManager; use crate::envelope::MeshEnvelope; use crate::identity::MeshIdentity; use crate::store::MeshStore; /// ALPN protocol identifier for quicprochat P2P messaging. /// Updated from the original project name "quicnprotochat" to "quicprochat" (breaking wire change; /// all peers must be on the same version to connect). const P2P_ALPN: &[u8] = b"quicprochat/p2p/1"; /// A P2P node backed by an iroh endpoint. /// /// Manages direct QUIC connections to peers with automatic NAT traversal. pub struct P2pNode { endpoint: Endpoint, /// Optional self-sovereign mesh identity for store-and-forward messaging. mesh_identity: Option, /// Shared store-and-forward queue. mesh_store: Arc>, /// Broadcast channel manager for pub/sub mesh announcements. broadcast_mgr: Arc>, } /// Received P2P message with sender information. pub struct P2pMessage { pub sender: PublicKey, pub payload: Vec, } impl P2pNode { /// Start a new P2P node. /// /// Generates a fresh identity or reuses a provided secret key. pub async fn start(secret_key: Option) -> anyhow::Result { let mut builder = Endpoint::builder(); if let Some(sk) = secret_key { builder = builder.secret_key(sk); } builder = builder.alpns(vec![P2P_ALPN.to_vec()]); let endpoint = builder.bind().await?; tracing::info!( node_id = %endpoint.id().fmt_short(), "P2P node started" ); Ok(Self { endpoint, mesh_identity: None, mesh_store: Arc::new(Mutex::new(MeshStore::new(0))), broadcast_mgr: Arc::new(Mutex::new(BroadcastManager::new())), }) } /// Start a new P2P node with a mesh identity and store-and-forward enabled. pub async fn start_with_mesh( secret_key: Option, mesh_identity: MeshIdentity, max_stored: usize, ) -> anyhow::Result { let mut node = Self::start(secret_key).await?; node.mesh_identity = Some(mesh_identity); node.mesh_store = Arc::new(Mutex::new(MeshStore::new(max_stored))); Ok(node) } /// This node's public key (used as node ID for peer discovery). pub fn node_id(&self) -> PublicKey { self.endpoint.id() } /// This node's secret key (for persistence across restarts). pub fn secret_key(&self) -> SecretKey { self.endpoint.secret_key().clone() } /// Get the node's network address information for publishing to discovery. pub fn endpoint_addr(&self) -> EndpointAddr { self.endpoint.addr() } /// Return a reference to the mesh identity, if set. pub fn mesh_identity(&self) -> Option<&MeshIdentity> { self.mesh_identity.as_ref() } /// Return a clone of the shared mesh store handle. pub fn mesh_store(&self) -> Arc> { Arc::clone(&self.mesh_store) } /// Send a payload directly to a peer via P2P QUIC. pub async fn send(&self, peer: impl Into, payload: &[u8]) -> anyhow::Result<()> { let peer = peer.into(); let conn = self.endpoint.connect(peer, P2P_ALPN).await?; let mut send = conn.open_uni().await.map_err(|e| anyhow::anyhow!("{e}"))?; // Simple framing: 4-byte length prefix + payload. let len = (payload.len() as u32).to_be_bytes(); send.write_all(&len) .await .map_err(|e| anyhow::anyhow!("{e}"))?; send.write_all(payload) .await .map_err(|e| anyhow::anyhow!("{e}"))?; send.finish().map_err(|e| anyhow::anyhow!("{e}"))?; // Wait until the peer has consumed the stream before dropping. send.stopped().await.map_err(|e| anyhow::anyhow!("{e}"))?; tracing::debug!( peer = %conn.remote_id().fmt_short(), bytes = payload.len(), "P2P message sent" ); Ok(()) } /// Accept a single incoming P2P message. /// /// Blocks until a peer connects and sends data. pub async fn recv(&self) -> anyhow::Result { let incoming = self .endpoint .accept() .await .ok_or_else(|| anyhow::anyhow!("no more incoming connections"))?; let conn = incoming.await.map_err(|e| anyhow::anyhow!("{e}"))?; let sender = conn.remote_id(); let mut recv = conn .accept_uni() .await .map_err(|e| anyhow::anyhow!("{e}"))?; // Read length-prefixed payload. let mut len_buf = [0u8; 4]; recv.read_exact(&mut len_buf) .await .map_err(|e| anyhow::anyhow!("{e}"))?; let len = u32::from_be_bytes(len_buf) as usize; if len > 5 * 1024 * 1024 { anyhow::bail!("P2P payload too large: {len} bytes"); } let mut payload = vec![0u8; len]; recv.read_exact(&mut payload) .await .map_err(|e| anyhow::anyhow!("{e}"))?; tracing::debug!( peer = %sender.fmt_short(), bytes = len, "P2P message received" ); Ok(P2pMessage { sender, payload }) } /// Create a [`MeshEnvelope`] and send it to a peer, or store it for later forwarding. /// /// If `peer_addr` is `Some`, the envelope is sent immediately via P2P. /// Otherwise it is queued in the mesh store for future forwarding. pub async fn send_mesh( &self, peer_addr: Option>, recipient_key: &[u8], payload: Vec, ttl_secs: u32, ) -> anyhow::Result<()> { let identity = self .mesh_identity .as_ref() .ok_or_else(|| anyhow::anyhow!("mesh identity not configured"))?; let envelope = MeshEnvelope::new(identity, recipient_key, payload, ttl_secs, 0); let bytes = envelope.to_wire(); if let Some(addr) = peer_addr { self.send(addr, &bytes).await?; tracing::debug!("mesh envelope sent directly"); } else { let mut store = self .mesh_store .lock() .map_err(|e| anyhow::anyhow!("mesh store lock poisoned: {e}"))?; if !store.store(envelope) { anyhow::bail!("mesh store rejected envelope (duplicate or at capacity)"); } tracing::debug!("mesh envelope queued for forwarding"); } Ok(()) } /// Fetch all stored mesh envelopes addressed to this node's identity. pub fn receive_mesh(&self) -> anyhow::Result> { let identity = self .mesh_identity .as_ref() .ok_or_else(|| anyhow::anyhow!("mesh identity not configured"))?; let pk = identity.public_key(); let mut store = self .mesh_store .lock() .map_err(|e| anyhow::anyhow!("mesh store lock poisoned: {e}"))?; Ok(store.fetch(&pk)) } /// Forward stored envelopes to a connected peer. /// /// Sends all forwardable envelopes that match `recipient_key` to `peer_addr`. pub async fn forward_stored( &self, peer_addr: impl Into + Clone, recipient_key: &[u8], ) -> anyhow::Result { let envelopes = { let mut store = self .mesh_store .lock() .map_err(|e| anyhow::anyhow!("mesh store lock poisoned: {e}"))?; store.fetch(recipient_key) }; let mut forwarded = 0; for env in envelopes { if env.can_forward() { let fwd = env.forwarded(); let bytes = fwd.to_wire(); self.send(peer_addr.clone(), &bytes).await?; forwarded += 1; } } if forwarded > 0 { tracing::debug!(count = forwarded, "forwarded stored mesh envelopes"); } Ok(forwarded) } /// Return a clone of the shared broadcast manager handle. pub fn broadcast_mgr(&self) -> Arc> { Arc::clone(&self.broadcast_mgr) } /// Subscribe to a broadcast channel with a pre-shared key. pub fn subscribe(&self, topic: &str, key: [u8; 32]) -> anyhow::Result<()> { let mut mgr = self .broadcast_mgr .lock() .map_err(|e| anyhow::anyhow!("broadcast manager lock poisoned: {e}"))?; mgr.subscribe(topic, key); Ok(()) } /// Create a new broadcast channel with a random key. Returns the key for sharing. pub fn create_broadcast(&self, topic: &str) -> anyhow::Result<[u8; 32]> { let mut mgr = self .broadcast_mgr .lock() .map_err(|e| anyhow::anyhow!("broadcast manager lock poisoned: {e}"))?; let ch = mgr.create_channel(topic); Ok(*ch.key()) } /// Encrypt a payload on a broadcast topic and flood it to all connected peers /// as a MeshEnvelope with an empty recipient key (broadcast). pub async fn broadcast( &self, topic: &str, payload: &[u8], ) -> anyhow::Result<()> { let identity = self .mesh_identity .as_ref() .ok_or_else(|| anyhow::anyhow!("mesh identity not configured"))?; let encrypted = { let mgr = self .broadcast_mgr .lock() .map_err(|e| anyhow::anyhow!("broadcast manager lock poisoned: {e}"))?; mgr.encrypt(topic, payload) .ok_or_else(|| anyhow::anyhow!("not subscribed to topic: {topic}"))?? }; // Create a broadcast envelope (empty recipient_key signals broadcast). let envelope = MeshEnvelope::new(identity, &[], encrypted, 300, 0); let bytes = envelope.to_wire(); // Store in the mesh store for flood-forwarding. let mut store = self .mesh_store .lock() .map_err(|e| anyhow::anyhow!("mesh store lock poisoned: {e}"))?; if !store.store(envelope) { tracing::debug!("broadcast envelope dedup or at capacity, skipping store"); } drop(store); tracing::debug!(topic = topic, bytes = bytes.len(), "broadcast envelope queued"); Ok(()) } /// List all subscribed broadcast topics. pub fn topics(&self) -> anyhow::Result> { let mgr = self .broadcast_mgr .lock() .map_err(|e| anyhow::anyhow!("broadcast manager lock poisoned: {e}"))?; Ok(mgr.topics()) } /// Gracefully shut down the P2P node. pub async fn close(self) { self.endpoint.close().await; } } #[cfg(test)] mod tests { use super::*; use iroh::RelayMode; /// Create a local-only P2P node with relays disabled (for testing). async fn local_node() -> P2pNode { let endpoint = Endpoint::builder() .alpns(vec![P2P_ALPN.to_vec()]) .relay_mode(RelayMode::Disabled) .bind() .await .expect("bind local endpoint"); P2pNode { endpoint, mesh_identity: None, mesh_store: Arc::new(Mutex::new(MeshStore::new(0))), broadcast_mgr: Arc::new(Mutex::new(BroadcastManager::new())), } } #[tokio::test] async fn p2p_round_trip() { let sender = local_node().await; let receiver = local_node().await; let receiver_addr = receiver.endpoint_addr(); let sender_id = sender.node_id(); let payload = b"hello via P2P"; let recv_handle = tokio::spawn(async move { let msg = receiver.recv().await.expect("receive message"); assert_eq!(msg.payload, payload.to_vec()); assert_eq!(msg.sender, sender_id); }); tokio::time::sleep(std::time::Duration::from_millis(200)).await; sender.send(receiver_addr, payload).await.expect("send message"); recv_handle.await.expect("recv task"); tokio::time::sleep(std::time::Duration::from_millis(100)).await; sender.close().await; } #[tokio::test] async fn mesh_store_and_receive() { let id = MeshIdentity::generate(); let pk = id.public_key(); let node = P2pNode::start_with_mesh(None, id, 100) .await .expect("start mesh node"); // Queue a message for ourselves via the store. { let sender_id = MeshIdentity::generate(); let env = MeshEnvelope::new(&sender_id, &pk, b"stored msg".to_vec(), 3600, 5); let mut store = node.mesh_store.lock().expect("lock"); assert!(store.store(env)); } let msgs = node.receive_mesh().expect("receive_mesh"); assert_eq!(msgs.len(), 1); assert_eq!(msgs[0].payload, b"stored msg"); node.close().await; } }