//! FAPP routing: decode wire frames, integrate with [`RoutingTable`](crate::routing_table::RoutingTable) //! and [`TransportManager`](crate::transport_manager::TransportManager). //! //! [`FappRouter::broadcast_announce`](FappRouter::broadcast_announce) and //! [`FappRouter::send_query`](FappRouter::send_query) enqueue outbound frames; call //! [`FappRouter::drain_pending_sends`](FappRouter::drain_pending_sends) and pass each //! payload to [`TransportManager::send`](crate::transport_manager::TransportManager::send) //! from an async context. use std::collections::HashSet; use std::sync::{Arc, Mutex, RwLock}; use anyhow::{bail, Result}; use crate::fapp::{ FappStore, SlotAnnounce, SlotConfirm, SlotQuery, SlotReserve, SlotResponse, CAP_FAPP_PATIENT, CAP_FAPP_RELAY, CAP_FAPP_THERAPIST, }; use crate::routing_table::RoutingTable; use crate::transport::TransportAddr; use crate::transport_manager::TransportManager; // --------------------------------------------------------------------------- // Wire message tags (CBOR body follows the tag byte) // --------------------------------------------------------------------------- /// [`SlotAnnounce`] frame. pub const FAPP_WIRE_ANNOUNCE: u8 = 0x01; /// [`SlotQuery`] frame. pub const FAPP_WIRE_QUERY: u8 = 0x02; /// [`SlotResponse`] frame. pub const FAPP_WIRE_RESPONSE: u8 = 0x03; /// [`SlotReserve`](crate::fapp::SlotReserve) frame (handled later). pub const FAPP_WIRE_RESERVE: u8 = 0x04; /// [`SlotConfirm`](crate::fapp::SlotConfirm) frame (handled later). pub const FAPP_WIRE_CONFIRM: u8 = 0x05; // --------------------------------------------------------------------------- // FappAction — what to do after handling an incoming FAPP frame // --------------------------------------------------------------------------- /// Result of processing an incoming FAPP payload (mirrors [`IncomingAction`](crate::mesh_router::IncomingAction) style). #[derive(Debug)] pub enum FappAction { /// No application-visible effect. Ignore, /// Invalid frame, unknown tag, or rejected message. Dropped(String), /// Flood this wire payload to each listed next hop. Forward { wire: Vec, next_hops: Vec, }, /// Relay answered from [`FappStore`] (matches may be empty). QueryResponse(SlotResponse), /// A SlotReserve was received and should be delivered to the therapist. /// Contains the therapist address (to route) and the wire-format reserve. DeliverReserve { therapist_address: [u8; 16], reserve: SlotReserve, }, /// A SlotConfirm was received and should be delivered to the patient. /// Contains the patient ephemeral key (for routing/lookup) and the confirm. DeliverConfirm { patient_ephemeral_key: [u8; 32], confirm: SlotConfirm, }, } // --------------------------------------------------------------------------- // Wire helpers // --------------------------------------------------------------------------- fn encode_tagged(tag: u8, cbor_body: &[u8]) -> Vec { let mut out = Vec::with_capacity(1 + cbor_body.len()); out.push(tag); out.extend_from_slice(cbor_body); out } fn slot_query_to_wire(query: &SlotQuery) -> Vec { let mut buf = Vec::new(); ciborium::into_writer(query, &mut buf).expect("SlotQuery CBOR"); buf } fn slot_query_from_wire(bytes: &[u8]) -> Result { let q: SlotQuery = ciborium::from_reader(bytes)?; Ok(q) } fn slot_reserve_from_wire(bytes: &[u8]) -> Result { let r: SlotReserve = ciborium::from_reader(bytes)?; Ok(r) } fn slot_confirm_from_wire(bytes: &[u8]) -> Result { let c: SlotConfirm = ciborium::from_reader(bytes)?; Ok(c) } fn slot_response_to_wire(response: &SlotResponse) -> Vec { let mut buf = Vec::new(); ciborium::into_writer(response, &mut buf).expect("SlotResponse CBOR"); buf } fn slot_response_from_wire(bytes: &[u8]) -> Result { let r: SlotResponse = ciborium::from_reader(bytes)?; Ok(r) } /// Unique next-hop addresses from the routing table (flood fan-out). fn flood_targets(table: &RoutingTable) -> Vec { let mut seen = HashSet::new(); let mut out = Vec::new(); for e in table.entries() { if seen.insert(e.next_hop_addr.clone()) { out.push(e.next_hop_addr.clone()); } } out } fn enqueue_flood( pending: &Mutex)>>, wire: Vec, table: &RoutingTable, ) -> Result<()> { let hops = flood_targets(table); if hops.is_empty() { bail!("no mesh neighbors in routing table for flood"); } let mut q = pending .lock() .map_err(|e| anyhow::anyhow!("pending_sends lock poisoned: {e}"))?; for addr in hops { q.push((addr, wire.clone())); } Ok(()) } // --------------------------------------------------------------------------- // FappRouter // --------------------------------------------------------------------------- /// FAPP message router integrated with the mesh [`RoutingTable`] and transports. pub struct FappRouter { /// Local announcement cache and query index (relay nodes). store: Mutex, /// Shared with [`MeshRouter`](crate::mesh_router::MeshRouter). routes: Arc>, /// Shared transport manager (same as [`MeshRouter`](crate::mesh_router::MeshRouter); wire-up sends via [`Self::drain_pending_sends`] until sync send exists). #[allow(dead_code)] transports: Arc, /// Bitfield: [`CAP_FAPP_THERAPIST`], [`CAP_FAPP_RELAY`], [`CAP_FAPP_PATIENT`]. local_capabilities: u16, /// Frames produced by [`Self::broadcast_announce`] and [`Self::send_query`]. pending_sends: Mutex)>>, } impl FappRouter { /// Create a router with the given store, shared routing table, transports, and capability mask. pub fn new( store: FappStore, routes: Arc>, transports: Arc, local_capabilities: u16, ) -> Self { Self { store: Mutex::new(store), routes, transports, local_capabilities, pending_sends: Mutex::new(Vec::new()), } } /// Decode a tagged FAPP wire frame and apply local policy. pub fn handle_incoming(&self, bytes: &[u8]) -> FappAction { if bytes.is_empty() { return FappAction::Dropped("empty FAPP frame".into()); } let tag = bytes[0]; let body = &bytes[1..]; match tag { FAPP_WIRE_ANNOUNCE => match SlotAnnounce::from_wire(body) { Ok(a) => self.process_slot_announce(a), Err(e) => FappAction::Dropped(format!("announce CBOR: {e}")), }, FAPP_WIRE_QUERY => match slot_query_from_wire(body) { Ok(q) => self.process_slot_query(q), Err(e) => FappAction::Dropped(format!("query CBOR: {e}")), }, FAPP_WIRE_RESPONSE => match slot_response_from_wire(body) { Ok(r) => self.process_slot_response(r), Err(e) => FappAction::Dropped(format!("response CBOR: {e}")), }, FAPP_WIRE_RESERVE => match slot_reserve_from_wire(body) { Ok(r) => self.process_slot_reserve(r), Err(e) => FappAction::Dropped(format!("reserve CBOR: {e}")), }, FAPP_WIRE_CONFIRM => match slot_confirm_from_wire(body) { Ok(c) => self.process_slot_confirm(c), Err(e) => FappAction::Dropped(format!("confirm CBOR: {e}")), }, _ => FappAction::Dropped(format!("unknown FAPP tag 0x{tag:02x}")), } } /// Enqueue a signed [`SlotAnnounce`] to all known next hops (therapist publish / relay re-flood). pub fn broadcast_announce(&self, announce: SlotAnnounce) -> Result<()> { if self.local_capabilities & CAP_FAPP_THERAPIST == 0 { bail!("missing CAP_FAPP_THERAPIST"); } let wire = encode_tagged(FAPP_WIRE_ANNOUNCE, &announce.to_wire()); let table = self .routes .read() .map_err(|e| anyhow::anyhow!("routing table lock poisoned: {e}"))?; enqueue_flood(&self.pending_sends, wire, &table) } /// Enqueue an anonymous [`SlotQuery`] flood (patient discovery). pub fn send_query(&self, query: SlotQuery) -> Result<()> { if self.local_capabilities & CAP_FAPP_PATIENT == 0 { bail!("missing CAP_FAPP_PATIENT"); } let body = slot_query_to_wire(&query); let wire = encode_tagged(FAPP_WIRE_QUERY, &body); let table = self .routes .read() .map_err(|e| anyhow::anyhow!("routing table lock poisoned: {e}"))?; enqueue_flood(&self.pending_sends, wire, &table) } /// Apply relay / propagation rules to a decoded [`SlotAnnounce`]. pub fn process_slot_announce(&self, announce: SlotAnnounce) -> FappAction { if !announce.can_propagate() { return FappAction::Dropped("announce expired or max hops".into()); } let has_relay = self.local_capabilities & CAP_FAPP_RELAY != 0; if !has_relay { return FappAction::Ignore; } let mut store = match self.store.lock() { Ok(g) => g, Err(e) => return FappAction::Dropped(format!("fapp store lock poisoned: {e}")), }; if store.seen(&announce.id) { return FappAction::Ignore; } let stored = store.store(announce.clone()); if !stored { return FappAction::Ignore; } let forwarded = announce.forwarded(); if !forwarded.can_propagate() { return FappAction::Ignore; } let wire = encode_tagged(FAPP_WIRE_ANNOUNCE, &forwarded.to_wire()); let next_hops = { let table = match self.routes.read() { Ok(t) => t, Err(e) => { return FappAction::Dropped(format!("routing table lock poisoned: {e}")); } }; flood_targets(&table) }; if next_hops.is_empty() { return FappAction::Ignore; } FappAction::Forward { wire, next_hops, } } /// Answer from cache and/or ignore (query flooding is a separate [`Self::send_query`] path). pub fn process_slot_query(&self, query: SlotQuery) -> FappAction { if self.local_capabilities & CAP_FAPP_RELAY == 0 { return FappAction::Ignore; } let store = match self.store.lock() { Ok(g) => g, Err(e) => return FappAction::Dropped(format!("fapp store lock poisoned: {e}")), }; let response = store.query(&query); FappAction::QueryResponse(response) } /// Process an incoming SlotResponse (patient receives query results). pub fn process_slot_response(&self, response: SlotResponse) -> FappAction { // Responses are delivered to the application layer; patient code handles them. // No relay/forwarding for responses — they're point-to-point. if self.local_capabilities & CAP_FAPP_PATIENT == 0 { return FappAction::Ignore; } // Return as QueryResponse for application handling FappAction::QueryResponse(response) } /// Process an incoming SlotReserve (relay routes to therapist). /// /// Relays look up the therapist address in the routing table and forward. /// Therapists receive the reserve for decryption and handling. pub fn process_slot_reserve(&self, reserve: SlotReserve) -> FappAction { // Look up the therapist address from the original slot announce let store = match self.store.lock() { Ok(g) => g, Err(e) => return FappAction::Dropped(format!("fapp store lock poisoned: {e}")), }; // Find the SlotAnnounce this reserve refers to for announces in store.announces_iter() { for announce in announces { if announce.id == reserve.slot_announce_id { // Found the therapist address return FappAction::DeliverReserve { therapist_address: announce.therapist_address, reserve, }; } } } // SlotAnnounce not in cache; forward to all neighbors (flood) let table = match self.routes.read() { Ok(t) => t, Err(e) => return FappAction::Dropped(format!("routing table lock: {e}")), }; let next_hops = flood_targets(&table); if next_hops.is_empty() { return FappAction::Dropped("no routes for reserve flood".into()); } let wire = encode_tagged(FAPP_WIRE_RESERVE, &reserve.to_wire()); FappAction::Forward { wire, next_hops } } /// Process an incoming SlotConfirm (relay routes to patient). /// /// Confirms are routed based on the patient's ephemeral key. pub fn process_slot_confirm(&self, confirm: SlotConfirm) -> FappAction { // The confirm contains the patient's ephemeral key; the patient // application needs to match this to their pending reservations. FappAction::DeliverConfirm { patient_ephemeral_key: confirm.therapist_ephemeral_key, // Note: this is for routing lookup confirm, } } /// Send a SlotReserve to a specific therapist address. pub fn send_reserve(&self, reserve: SlotReserve, therapist_address: &[u8; 16]) -> Result<()> { if self.local_capabilities & CAP_FAPP_PATIENT == 0 { bail!("missing CAP_FAPP_PATIENT"); } let table = self .routes .read() .map_err(|e| anyhow::anyhow!("routing table lock poisoned: {e}"))?; // Try to find a direct route to the therapist if let Some(entry) = table.lookup(therapist_address) { let wire = encode_tagged(FAPP_WIRE_RESERVE, &reserve.to_wire()); let mut q = self .pending_sends .lock() .map_err(|e| anyhow::anyhow!("pending_sends lock: {e}"))?; q.push((entry.next_hop_addr.clone(), wire)); return Ok(()); } // No direct route; flood to all neighbors let wire = encode_tagged(FAPP_WIRE_RESERVE, &reserve.to_wire()); enqueue_flood(&self.pending_sends, wire, &table) } /// Send a SlotConfirm response (therapist confirms/rejects a reservation). pub fn send_confirm(&self, confirm: SlotConfirm, patient_ephemeral: &[u8; 32]) -> Result<()> { if self.local_capabilities & CAP_FAPP_THERAPIST == 0 { bail!("missing CAP_FAPP_THERAPIST"); } // Confirms are flooded since we don't have routing info for ephemeral keys let wire = encode_tagged(FAPP_WIRE_CONFIRM, &confirm.to_wire()); let table = self .routes .read() .map_err(|e| anyhow::anyhow!("routing table lock poisoned: {e}"))?; enqueue_flood(&self.pending_sends, wire, &table) } /// Send a SlotResponse to a specific address (relay answering a query). pub fn send_response(&self, response: SlotResponse, dest: &TransportAddr) -> Result<()> { let wire = encode_tagged(FAPP_WIRE_RESPONSE, &slot_response_to_wire(&response)); let mut q = self .pending_sends .lock() .map_err(|e| anyhow::anyhow!("pending_sends lock: {e}"))?; q.push((dest.clone(), wire)); Ok(()) } /// Take queued outbound frames (typically sent with `TransportManager::send` in async code). pub fn drain_pending_sends(&self) -> Result)>> { let mut q = self .pending_sends .lock() .map_err(|e| anyhow::anyhow!("pending_sends lock poisoned: {e}"))?; let out = std::mem::take(&mut *q); Ok(out) } } #[cfg(test)] mod tests { use super::*; use std::time::Duration; use crate::fapp::{Fachrichtung, Kostentraeger, Modalitaet, SlotType, TimeSlot}; use crate::identity::MeshIdentity; #[test] fn handle_incoming_unknown_tag_dropped() { let routes = Arc::new(RwLock::new(RoutingTable::new(Duration::from_secs(300)))); let transports = Arc::new(TransportManager::new()); let r = FappRouter::new(FappStore::new(), routes, transports, CAP_FAPP_RELAY); match r.handle_incoming(&[0xFF]) { FappAction::Dropped(msg) => assert!(msg.contains("unknown")), other => panic!("expected Dropped, got {other:?}"), } } #[test] fn process_slot_query_requires_relay_cap() { let routes = Arc::new(RwLock::new(RoutingTable::new(Duration::from_secs(300)))); let transports = Arc::new(TransportManager::new()); let r = FappRouter::new(FappStore::new(), routes, transports, 0); let q = SlotQuery { query_id: [1u8; 16], fachrichtung: None, modalitaet: None, kostentraeger: None, plz_prefix: None, earliest: None, latest: None, slot_type: None, max_results: 5, }; assert!(matches!(r.process_slot_query(q), FappAction::Ignore)); } #[test] fn send_reserve_requires_patient_cap() { let routes = Arc::new(RwLock::new(RoutingTable::new(Duration::from_secs(300)))); let transports = Arc::new(TransportManager::new()); let r = FappRouter::new(FappStore::new(), routes, transports, CAP_FAPP_THERAPIST); let reserve = SlotReserve { slot_announce_id: [0xAA; 16], slot_index: 0, patient_ephemeral_key: [0xBB; 32], encrypted_contact: vec![1, 2, 3], }; assert!(r.send_reserve(reserve, &[0xCC; 16]).is_err()); } #[test] fn send_confirm_requires_therapist_cap() { let routes = Arc::new(RwLock::new(RoutingTable::new(Duration::from_secs(300)))); let transports = Arc::new(TransportManager::new()); let r = FappRouter::new(FappStore::new(), routes, transports, CAP_FAPP_PATIENT); let confirm = SlotConfirm { slot_announce_id: [0xAA; 16], slot_index: 0, confirmed: true, encrypted_details: vec![1, 2, 3], therapist_ephemeral_key: [0xDD; 32], }; assert!(r.send_confirm(confirm, &[0xEE; 32]).is_err()); } #[test] fn process_reserve_returns_deliver() { let routes = Arc::new(RwLock::new(RoutingTable::new(Duration::from_secs(300)))); let transports = Arc::new(TransportManager::new()); // Create a store with a known announce let id = MeshIdentity::generate(); let mut store = FappStore::new(); let announce = SlotAnnounce::new( &id, vec![Fachrichtung::Verhaltenstherapie], vec![Modalitaet::Praxis], vec![Kostentraeger::GKV], "80331".into(), vec![TimeSlot { start_unix: 99999999, duration_minutes: 50, slot_type: SlotType::Therapie, }], [0xAA; 32], 1, ); let announce_id = announce.id; let therapist_addr = announce.therapist_address; store.register_therapist_key(therapist_addr, id.public_key()); store.store(announce); let r = FappRouter::new(store, routes, transports, CAP_FAPP_RELAY); let reserve = SlotReserve { slot_announce_id: announce_id, slot_index: 0, patient_ephemeral_key: [0xBB; 32], encrypted_contact: vec![1, 2, 3], }; match r.process_slot_reserve(reserve) { FappAction::DeliverReserve { therapist_address, .. } => { assert_eq!(therapist_address, therapist_addr); } other => panic!("expected DeliverReserve, got {other:?}"), } } #[test] fn process_confirm_returns_deliver() { let routes = Arc::new(RwLock::new(RoutingTable::new(Duration::from_secs(300)))); let transports = Arc::new(TransportManager::new()); let r = FappRouter::new(FappStore::new(), routes, transports, CAP_FAPP_PATIENT); let confirm = SlotConfirm { slot_announce_id: [0xAA; 16], slot_index: 0, confirmed: true, encrypted_details: vec![1, 2, 3], therapist_ephemeral_key: [0xDD; 32], }; match r.process_slot_confirm(confirm.clone()) { FappAction::DeliverConfirm { patient_ephemeral_key, confirm: c } => { assert_eq!(patient_ephemeral_key, [0xDD; 32]); assert!(c.confirmed); } other => panic!("expected DeliverConfirm, got {other:?}"), } } #[test] fn broadcast_announce_requires_therapist_cap() { let routes = Arc::new(RwLock::new(RoutingTable::new(Duration::from_secs(300)))); let transports = Arc::new(TransportManager::new()); let r = FappRouter::new(FappStore::new(), routes, transports, CAP_FAPP_RELAY); let id = MeshIdentity::generate(); let a = SlotAnnounce::new( &id, vec![Fachrichtung::Verhaltenstherapie], vec![Modalitaet::Praxis], vec![Kostentraeger::GKV], "80331".into(), vec![TimeSlot { start_unix: 1, duration_minutes: 50, slot_type: SlotType::Therapie, }], [0xAA; 32], 1, ); assert!(r.broadcast_announce(a).is_err()); } }