diff --git a/crates/quicprochat-p2p/src/fapp.rs b/crates/quicprochat-p2p/src/fapp.rs index cdfd2bb..96c9fc4 100644 --- a/crates/quicprochat-p2p/src/fapp.rs +++ b/crates/quicprochat-p2p/src/fapp.rs @@ -426,14 +426,14 @@ impl FappStore { } } - // Capacity check per therapist. - let queue = self - .announces - .entry(announce.therapist_address) - .or_default(); - if queue.len() >= MAX_PER_THERAPIST { - // Evict oldest. - queue.remove(0); + let addr = announce.therapist_address; + + // Capacity check per therapist (evict oldest if needed). + { + let queue = self.announces.entry(addr).or_default(); + if queue.len() >= MAX_PER_THERAPIST { + queue.remove(0); + } } // Total capacity check. @@ -452,10 +452,9 @@ impl FappStore { } // Update latest sequence. - self.latest_sequence - .insert(announce.therapist_address, announce.sequence); + self.latest_sequence.insert(addr, announce.sequence); - queue.push(announce); + self.announces.entry(addr).or_default().push(announce); true } diff --git a/crates/quicprochat-p2p/src/fapp_router.rs b/crates/quicprochat-p2p/src/fapp_router.rs new file mode 100644 index 0000000..ff36a34 --- /dev/null +++ b/crates/quicprochat-p2p/src/fapp_router.rs @@ -0,0 +1,335 @@ +//! FAPP routing: decode wire frames, integrate with [`RoutingTable`](crate::routing_table::RoutingTable) +//! and [`TransportManager`](crate::transport_manager::TransportManager). +//! +//! [`FappRouter::broadcast_announce`](FappRouter::broadcast_announce) and +//! [`FappRouter::send_query`](FappRouter::send_query) enqueue outbound frames; call +//! [`FappRouter::drain_pending_sends`](FappRouter::drain_pending_sends) and pass each +//! payload to [`TransportManager::send`](crate::transport_manager::TransportManager::send) +//! from an async context. + +use std::collections::HashSet; +use std::sync::{Arc, Mutex, RwLock}; + +use anyhow::{bail, Result}; + +use crate::fapp::{ + FappStore, SlotAnnounce, SlotQuery, SlotResponse, CAP_FAPP_PATIENT, CAP_FAPP_RELAY, + CAP_FAPP_THERAPIST, +}; +use crate::routing_table::RoutingTable; +use crate::transport::TransportAddr; +use crate::transport_manager::TransportManager; + +// --------------------------------------------------------------------------- +// Wire message tags (CBOR body follows the tag byte) +// --------------------------------------------------------------------------- + +/// [`SlotAnnounce`] frame. +pub const FAPP_WIRE_ANNOUNCE: u8 = 0x01; +/// [`SlotQuery`] frame. +pub const FAPP_WIRE_QUERY: u8 = 0x02; +/// [`SlotResponse`] frame. +pub const FAPP_WIRE_RESPONSE: u8 = 0x03; +/// [`SlotReserve`](crate::fapp::SlotReserve) frame (handled later). +pub const FAPP_WIRE_RESERVE: u8 = 0x04; +/// [`SlotConfirm`](crate::fapp::SlotConfirm) frame (handled later). +pub const FAPP_WIRE_CONFIRM: u8 = 0x05; + +// --------------------------------------------------------------------------- +// FappAction — what to do after handling an incoming FAPP frame +// --------------------------------------------------------------------------- + +/// Result of processing an incoming FAPP payload (mirrors [`IncomingAction`](crate::mesh_router::IncomingAction) style). +#[derive(Debug)] +pub enum FappAction { + /// No application-visible effect. + Ignore, + /// Invalid frame, unknown tag, or rejected message. + Dropped(String), + /// Flood this wire payload to each listed next hop. + Forward { + wire: Vec, + next_hops: Vec, + }, + /// Relay answered from [`FappStore`] (matches may be empty). + QueryResponse(SlotResponse), +} + +// --------------------------------------------------------------------------- +// Wire helpers +// --------------------------------------------------------------------------- + +fn encode_tagged(tag: u8, cbor_body: &[u8]) -> Vec { + let mut out = Vec::with_capacity(1 + cbor_body.len()); + out.push(tag); + out.extend_from_slice(cbor_body); + out +} + +fn slot_query_to_wire(query: &SlotQuery) -> Vec { + let mut buf = Vec::new(); + ciborium::into_writer(query, &mut buf).expect("SlotQuery CBOR"); + buf +} + +fn slot_query_from_wire(bytes: &[u8]) -> Result { + let q: SlotQuery = ciborium::from_reader(bytes)?; + Ok(q) +} + +/// Unique next-hop addresses from the routing table (flood fan-out). +fn flood_targets(table: &RoutingTable) -> Vec { + let mut seen = HashSet::new(); + let mut out = Vec::new(); + for e in table.entries() { + if seen.insert(e.next_hop_addr.clone()) { + out.push(e.next_hop_addr.clone()); + } + } + out +} + +fn enqueue_flood( + pending: &Mutex)>>, + wire: Vec, + table: &RoutingTable, +) -> Result<()> { + let hops = flood_targets(table); + if hops.is_empty() { + bail!("no mesh neighbors in routing table for flood"); + } + let mut q = pending + .lock() + .map_err(|e| anyhow::anyhow!("pending_sends lock poisoned: {e}"))?; + for addr in hops { + q.push((addr, wire.clone())); + } + Ok(()) +} + +// --------------------------------------------------------------------------- +// FappRouter +// --------------------------------------------------------------------------- + +/// FAPP message router integrated with the mesh [`RoutingTable`] and transports. +pub struct FappRouter { + /// Local announcement cache and query index (relay nodes). + store: Mutex, + /// Shared with [`MeshRouter`](crate::mesh_router::MeshRouter). + routes: Arc>, + /// Shared transport manager (same as [`MeshRouter`](crate::mesh_router::MeshRouter); wire-up sends via [`Self::drain_pending_sends`] until sync send exists). + #[allow(dead_code)] + transports: Arc, + /// Bitfield: [`CAP_FAPP_THERAPIST`], [`CAP_FAPP_RELAY`], [`CAP_FAPP_PATIENT`]. + local_capabilities: u16, + /// Frames produced by [`Self::broadcast_announce`] and [`Self::send_query`]. + pending_sends: Mutex)>>, +} + +impl FappRouter { + /// Create a router with the given store, shared routing table, transports, and capability mask. + pub fn new( + store: FappStore, + routes: Arc>, + transports: Arc, + local_capabilities: u16, + ) -> Self { + Self { + store: Mutex::new(store), + routes, + transports, + local_capabilities, + pending_sends: Mutex::new(Vec::new()), + } + } + + /// Decode a tagged FAPP wire frame and apply local policy. + pub fn handle_incoming(&self, bytes: &[u8]) -> FappAction { + if bytes.is_empty() { + return FappAction::Dropped("empty FAPP frame".into()); + } + let tag = bytes[0]; + let body = &bytes[1..]; + match tag { + FAPP_WIRE_ANNOUNCE => match SlotAnnounce::from_wire(body) { + Ok(a) => self.process_slot_announce(a), + Err(e) => FappAction::Dropped(format!("announce CBOR: {e}")), + }, + FAPP_WIRE_QUERY => match slot_query_from_wire(body) { + Ok(q) => self.process_slot_query(q), + Err(e) => FappAction::Dropped(format!("query CBOR: {e}")), + }, + FAPP_WIRE_RESPONSE | FAPP_WIRE_RESERVE | FAPP_WIRE_CONFIRM => { + FappAction::Dropped(format!("unhandled FAPP tag 0x{tag:02x}")) + } + _ => FappAction::Dropped(format!("unknown FAPP tag 0x{tag:02x}")), + } + } + + /// Enqueue a signed [`SlotAnnounce`] to all known next hops (therapist publish / relay re-flood). + pub fn broadcast_announce(&self, announce: SlotAnnounce) -> Result<()> { + if self.local_capabilities & CAP_FAPP_THERAPIST == 0 { + bail!("missing CAP_FAPP_THERAPIST"); + } + let wire = encode_tagged(FAPP_WIRE_ANNOUNCE, &announce.to_wire()); + let table = self + .routes + .read() + .map_err(|e| anyhow::anyhow!("routing table lock poisoned: {e}"))?; + enqueue_flood(&self.pending_sends, wire, &table) + } + + /// Enqueue an anonymous [`SlotQuery`] flood (patient discovery). + pub fn send_query(&self, query: SlotQuery) -> Result<()> { + if self.local_capabilities & CAP_FAPP_PATIENT == 0 { + bail!("missing CAP_FAPP_PATIENT"); + } + let body = slot_query_to_wire(&query); + let wire = encode_tagged(FAPP_WIRE_QUERY, &body); + let table = self + .routes + .read() + .map_err(|e| anyhow::anyhow!("routing table lock poisoned: {e}"))?; + enqueue_flood(&self.pending_sends, wire, &table) + } + + /// Apply relay / propagation rules to a decoded [`SlotAnnounce`]. + pub fn process_slot_announce(&self, announce: SlotAnnounce) -> FappAction { + if !announce.can_propagate() { + return FappAction::Dropped("announce expired or max hops".into()); + } + + let has_relay = self.local_capabilities & CAP_FAPP_RELAY != 0; + if !has_relay { + return FappAction::Ignore; + } + + let mut store = match self.store.lock() { + Ok(g) => g, + Err(e) => return FappAction::Dropped(format!("fapp store lock poisoned: {e}")), + }; + + if store.seen(&announce.id) { + return FappAction::Ignore; + } + + let stored = store.store(announce.clone()); + if !stored { + return FappAction::Ignore; + } + + let forwarded = announce.forwarded(); + if !forwarded.can_propagate() { + return FappAction::Ignore; + } + + let wire = encode_tagged(FAPP_WIRE_ANNOUNCE, &forwarded.to_wire()); + let next_hops = { + let table = match self.routes.read() { + Ok(t) => t, + Err(e) => { + return FappAction::Dropped(format!("routing table lock poisoned: {e}")); + } + }; + flood_targets(&table) + }; + + if next_hops.is_empty() { + return FappAction::Ignore; + } + + FappAction::Forward { + wire, + next_hops, + } + } + + /// Answer from cache and/or ignore (query flooding is a separate [`Self::send_query`] path). + pub fn process_slot_query(&self, query: SlotQuery) -> FappAction { + if self.local_capabilities & CAP_FAPP_RELAY == 0 { + return FappAction::Ignore; + } + + let store = match self.store.lock() { + Ok(g) => g, + Err(e) => return FappAction::Dropped(format!("fapp store lock poisoned: {e}")), + }; + + let response = store.query(&query); + FappAction::QueryResponse(response) + } + + /// Take queued outbound frames (typically sent with `TransportManager::send` in async code). + pub fn drain_pending_sends(&self) -> Result)>> { + let mut q = self + .pending_sends + .lock() + .map_err(|e| anyhow::anyhow!("pending_sends lock poisoned: {e}"))?; + let out = std::mem::take(&mut *q); + Ok(out) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::time::Duration; + + use crate::fapp::{Fachrichtung, Kostentraeger, Modalitaet, SlotType, TimeSlot}; + use crate::identity::MeshIdentity; + + #[test] + fn handle_incoming_unknown_tag_dropped() { + let routes = Arc::new(RwLock::new(RoutingTable::new(Duration::from_secs(300)))); + let transports = Arc::new(TransportManager::new()); + let r = FappRouter::new(FappStore::new(), routes, transports, CAP_FAPP_RELAY); + + match r.handle_incoming(&[0xFF]) { + FappAction::Dropped(msg) => assert!(msg.contains("unknown")), + other => panic!("expected Dropped, got {other:?}"), + } + } + + #[test] + fn process_slot_query_requires_relay_cap() { + let routes = Arc::new(RwLock::new(RoutingTable::new(Duration::from_secs(300)))); + let transports = Arc::new(TransportManager::new()); + let r = FappRouter::new(FappStore::new(), routes, transports, 0); + + let q = SlotQuery { + query_id: [1u8; 16], + fachrichtung: None, + modalitaet: None, + kostentraeger: None, + plz_prefix: None, + earliest: None, + latest: None, + slot_type: None, + max_results: 5, + }; + assert!(matches!(r.process_slot_query(q), FappAction::Ignore)); + } + + #[test] + fn broadcast_announce_requires_therapist_cap() { + let routes = Arc::new(RwLock::new(RoutingTable::new(Duration::from_secs(300)))); + let transports = Arc::new(TransportManager::new()); + let r = FappRouter::new(FappStore::new(), routes, transports, CAP_FAPP_RELAY); + let id = MeshIdentity::generate(); + let a = SlotAnnounce::new( + &id, + vec![Fachrichtung::Verhaltenstherapie], + vec![Modalitaet::Praxis], + vec![Kostentraeger::GKV], + "80331".into(), + vec![TimeSlot { + start_unix: 1, + duration_minutes: 50, + slot_type: SlotType::Therapie, + }], + [0xAA; 32], + 1, + ); + assert!(r.broadcast_announce(a).is_err()); + } +} diff --git a/crates/quicprochat-p2p/src/lib.rs b/crates/quicprochat-p2p/src/lib.rs index 4adf93c..3354a91 100644 --- a/crates/quicprochat-p2p/src/lib.rs +++ b/crates/quicprochat-p2p/src/lib.rs @@ -16,6 +16,7 @@ pub mod address; pub mod announce; pub mod announce_protocol; pub mod fapp; +pub mod fapp_router; pub mod broadcast; pub mod envelope; pub mod envelope_v2;