feat(fapp): add FappRouter for mesh integration

New fapp_router.rs module:
- FappAction enum (Ignore, Dropped, Forward, QueryResponse)
- Wire format: 1-byte tag (0x01-0x05) + CBOR body
- FappRouter with shared RoutingTable and TransportManager
- handle_incoming() decodes and dispatches FAPP frames
- process_slot_announce() with relay/flood logic
- process_slot_query() answers from local FappStore
- broadcast_announce() / send_query() for outbound floods
- drain_pending_sends() for async send integration
- 3 unit tests

Also fixed borrow checker issue in FappStore::store
This commit is contained in:
2026-04-01 07:47:33 +02:00
parent 0b3d5c5100
commit 65ce5aec18
3 changed files with 346 additions and 11 deletions

View File

@@ -426,15 +426,15 @@ impl FappStore {
} }
} }
// Capacity check per therapist. let addr = announce.therapist_address;
let queue = self
.announces // Capacity check per therapist (evict oldest if needed).
.entry(announce.therapist_address) {
.or_default(); let queue = self.announces.entry(addr).or_default();
if queue.len() >= MAX_PER_THERAPIST { if queue.len() >= MAX_PER_THERAPIST {
// Evict oldest.
queue.remove(0); queue.remove(0);
} }
}
// Total capacity check. // Total capacity check.
let total: usize = self.announces.values().map(|q| q.len()).sum(); let total: usize = self.announces.values().map(|q| q.len()).sum();
@@ -452,10 +452,9 @@ impl FappStore {
} }
// Update latest sequence. // Update latest sequence.
self.latest_sequence self.latest_sequence.insert(addr, announce.sequence);
.insert(announce.therapist_address, announce.sequence);
queue.push(announce); self.announces.entry(addr).or_default().push(announce);
true true
} }

View File

@@ -0,0 +1,335 @@
//! FAPP routing: decode wire frames, integrate with [`RoutingTable`](crate::routing_table::RoutingTable)
//! and [`TransportManager`](crate::transport_manager::TransportManager).
//!
//! [`FappRouter::broadcast_announce`](FappRouter::broadcast_announce) and
//! [`FappRouter::send_query`](FappRouter::send_query) enqueue outbound frames; call
//! [`FappRouter::drain_pending_sends`](FappRouter::drain_pending_sends) and pass each
//! payload to [`TransportManager::send`](crate::transport_manager::TransportManager::send)
//! from an async context.
use std::collections::HashSet;
use std::sync::{Arc, Mutex, RwLock};
use anyhow::{bail, Result};
use crate::fapp::{
FappStore, SlotAnnounce, SlotQuery, SlotResponse, CAP_FAPP_PATIENT, CAP_FAPP_RELAY,
CAP_FAPP_THERAPIST,
};
use crate::routing_table::RoutingTable;
use crate::transport::TransportAddr;
use crate::transport_manager::TransportManager;
// ---------------------------------------------------------------------------
// Wire message tags (CBOR body follows the tag byte)
// ---------------------------------------------------------------------------
/// [`SlotAnnounce`] frame.
pub const FAPP_WIRE_ANNOUNCE: u8 = 0x01;
/// [`SlotQuery`] frame.
pub const FAPP_WIRE_QUERY: u8 = 0x02;
/// [`SlotResponse`] frame.
pub const FAPP_WIRE_RESPONSE: u8 = 0x03;
/// [`SlotReserve`](crate::fapp::SlotReserve) frame (handled later).
pub const FAPP_WIRE_RESERVE: u8 = 0x04;
/// [`SlotConfirm`](crate::fapp::SlotConfirm) frame (handled later).
pub const FAPP_WIRE_CONFIRM: u8 = 0x05;
// ---------------------------------------------------------------------------
// FappAction — what to do after handling an incoming FAPP frame
// ---------------------------------------------------------------------------
/// Result of processing an incoming FAPP payload (mirrors [`IncomingAction`](crate::mesh_router::IncomingAction) style).
#[derive(Debug)]
pub enum FappAction {
/// No application-visible effect.
Ignore,
/// Invalid frame, unknown tag, or rejected message.
Dropped(String),
/// Flood this wire payload to each listed next hop.
Forward {
wire: Vec<u8>,
next_hops: Vec<TransportAddr>,
},
/// Relay answered from [`FappStore`] (matches may be empty).
QueryResponse(SlotResponse),
}
// ---------------------------------------------------------------------------
// Wire helpers
// ---------------------------------------------------------------------------
fn encode_tagged(tag: u8, cbor_body: &[u8]) -> Vec<u8> {
let mut out = Vec::with_capacity(1 + cbor_body.len());
out.push(tag);
out.extend_from_slice(cbor_body);
out
}
fn slot_query_to_wire(query: &SlotQuery) -> Vec<u8> {
let mut buf = Vec::new();
ciborium::into_writer(query, &mut buf).expect("SlotQuery CBOR");
buf
}
fn slot_query_from_wire(bytes: &[u8]) -> Result<SlotQuery> {
let q: SlotQuery = ciborium::from_reader(bytes)?;
Ok(q)
}
/// Unique next-hop addresses from the routing table (flood fan-out).
fn flood_targets(table: &RoutingTable) -> Vec<TransportAddr> {
let mut seen = HashSet::new();
let mut out = Vec::new();
for e in table.entries() {
if seen.insert(e.next_hop_addr.clone()) {
out.push(e.next_hop_addr.clone());
}
}
out
}
fn enqueue_flood(
pending: &Mutex<Vec<(TransportAddr, Vec<u8>)>>,
wire: Vec<u8>,
table: &RoutingTable,
) -> Result<()> {
let hops = flood_targets(table);
if hops.is_empty() {
bail!("no mesh neighbors in routing table for flood");
}
let mut q = pending
.lock()
.map_err(|e| anyhow::anyhow!("pending_sends lock poisoned: {e}"))?;
for addr in hops {
q.push((addr, wire.clone()));
}
Ok(())
}
// ---------------------------------------------------------------------------
// FappRouter
// ---------------------------------------------------------------------------
/// FAPP message router integrated with the mesh [`RoutingTable`] and transports.
pub struct FappRouter {
/// Local announcement cache and query index (relay nodes).
store: Mutex<FappStore>,
/// Shared with [`MeshRouter`](crate::mesh_router::MeshRouter).
routes: Arc<RwLock<RoutingTable>>,
/// Shared transport manager (same as [`MeshRouter`](crate::mesh_router::MeshRouter); wire-up sends via [`Self::drain_pending_sends`] until sync send exists).
#[allow(dead_code)]
transports: Arc<TransportManager>,
/// Bitfield: [`CAP_FAPP_THERAPIST`], [`CAP_FAPP_RELAY`], [`CAP_FAPP_PATIENT`].
local_capabilities: u16,
/// Frames produced by [`Self::broadcast_announce`] and [`Self::send_query`].
pending_sends: Mutex<Vec<(TransportAddr, Vec<u8>)>>,
}
impl FappRouter {
/// Create a router with the given store, shared routing table, transports, and capability mask.
pub fn new(
store: FappStore,
routes: Arc<RwLock<RoutingTable>>,
transports: Arc<TransportManager>,
local_capabilities: u16,
) -> Self {
Self {
store: Mutex::new(store),
routes,
transports,
local_capabilities,
pending_sends: Mutex::new(Vec::new()),
}
}
/// Decode a tagged FAPP wire frame and apply local policy.
pub fn handle_incoming(&self, bytes: &[u8]) -> FappAction {
if bytes.is_empty() {
return FappAction::Dropped("empty FAPP frame".into());
}
let tag = bytes[0];
let body = &bytes[1..];
match tag {
FAPP_WIRE_ANNOUNCE => match SlotAnnounce::from_wire(body) {
Ok(a) => self.process_slot_announce(a),
Err(e) => FappAction::Dropped(format!("announce CBOR: {e}")),
},
FAPP_WIRE_QUERY => match slot_query_from_wire(body) {
Ok(q) => self.process_slot_query(q),
Err(e) => FappAction::Dropped(format!("query CBOR: {e}")),
},
FAPP_WIRE_RESPONSE | FAPP_WIRE_RESERVE | FAPP_WIRE_CONFIRM => {
FappAction::Dropped(format!("unhandled FAPP tag 0x{tag:02x}"))
}
_ => FappAction::Dropped(format!("unknown FAPP tag 0x{tag:02x}")),
}
}
/// Enqueue a signed [`SlotAnnounce`] to all known next hops (therapist publish / relay re-flood).
pub fn broadcast_announce(&self, announce: SlotAnnounce) -> Result<()> {
if self.local_capabilities & CAP_FAPP_THERAPIST == 0 {
bail!("missing CAP_FAPP_THERAPIST");
}
let wire = encode_tagged(FAPP_WIRE_ANNOUNCE, &announce.to_wire());
let table = self
.routes
.read()
.map_err(|e| anyhow::anyhow!("routing table lock poisoned: {e}"))?;
enqueue_flood(&self.pending_sends, wire, &table)
}
/// Enqueue an anonymous [`SlotQuery`] flood (patient discovery).
pub fn send_query(&self, query: SlotQuery) -> Result<()> {
if self.local_capabilities & CAP_FAPP_PATIENT == 0 {
bail!("missing CAP_FAPP_PATIENT");
}
let body = slot_query_to_wire(&query);
let wire = encode_tagged(FAPP_WIRE_QUERY, &body);
let table = self
.routes
.read()
.map_err(|e| anyhow::anyhow!("routing table lock poisoned: {e}"))?;
enqueue_flood(&self.pending_sends, wire, &table)
}
/// Apply relay / propagation rules to a decoded [`SlotAnnounce`].
pub fn process_slot_announce(&self, announce: SlotAnnounce) -> FappAction {
if !announce.can_propagate() {
return FappAction::Dropped("announce expired or max hops".into());
}
let has_relay = self.local_capabilities & CAP_FAPP_RELAY != 0;
if !has_relay {
return FappAction::Ignore;
}
let mut store = match self.store.lock() {
Ok(g) => g,
Err(e) => return FappAction::Dropped(format!("fapp store lock poisoned: {e}")),
};
if store.seen(&announce.id) {
return FappAction::Ignore;
}
let stored = store.store(announce.clone());
if !stored {
return FappAction::Ignore;
}
let forwarded = announce.forwarded();
if !forwarded.can_propagate() {
return FappAction::Ignore;
}
let wire = encode_tagged(FAPP_WIRE_ANNOUNCE, &forwarded.to_wire());
let next_hops = {
let table = match self.routes.read() {
Ok(t) => t,
Err(e) => {
return FappAction::Dropped(format!("routing table lock poisoned: {e}"));
}
};
flood_targets(&table)
};
if next_hops.is_empty() {
return FappAction::Ignore;
}
FappAction::Forward {
wire,
next_hops,
}
}
/// Answer from cache and/or ignore (query flooding is a separate [`Self::send_query`] path).
pub fn process_slot_query(&self, query: SlotQuery) -> FappAction {
if self.local_capabilities & CAP_FAPP_RELAY == 0 {
return FappAction::Ignore;
}
let store = match self.store.lock() {
Ok(g) => g,
Err(e) => return FappAction::Dropped(format!("fapp store lock poisoned: {e}")),
};
let response = store.query(&query);
FappAction::QueryResponse(response)
}
/// Take queued outbound frames (typically sent with `TransportManager::send` in async code).
pub fn drain_pending_sends(&self) -> Result<Vec<(TransportAddr, Vec<u8>)>> {
let mut q = self
.pending_sends
.lock()
.map_err(|e| anyhow::anyhow!("pending_sends lock poisoned: {e}"))?;
let out = std::mem::take(&mut *q);
Ok(out)
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::time::Duration;
use crate::fapp::{Fachrichtung, Kostentraeger, Modalitaet, SlotType, TimeSlot};
use crate::identity::MeshIdentity;
#[test]
fn handle_incoming_unknown_tag_dropped() {
let routes = Arc::new(RwLock::new(RoutingTable::new(Duration::from_secs(300))));
let transports = Arc::new(TransportManager::new());
let r = FappRouter::new(FappStore::new(), routes, transports, CAP_FAPP_RELAY);
match r.handle_incoming(&[0xFF]) {
FappAction::Dropped(msg) => assert!(msg.contains("unknown")),
other => panic!("expected Dropped, got {other:?}"),
}
}
#[test]
fn process_slot_query_requires_relay_cap() {
let routes = Arc::new(RwLock::new(RoutingTable::new(Duration::from_secs(300))));
let transports = Arc::new(TransportManager::new());
let r = FappRouter::new(FappStore::new(), routes, transports, 0);
let q = SlotQuery {
query_id: [1u8; 16],
fachrichtung: None,
modalitaet: None,
kostentraeger: None,
plz_prefix: None,
earliest: None,
latest: None,
slot_type: None,
max_results: 5,
};
assert!(matches!(r.process_slot_query(q), FappAction::Ignore));
}
#[test]
fn broadcast_announce_requires_therapist_cap() {
let routes = Arc::new(RwLock::new(RoutingTable::new(Duration::from_secs(300))));
let transports = Arc::new(TransportManager::new());
let r = FappRouter::new(FappStore::new(), routes, transports, CAP_FAPP_RELAY);
let id = MeshIdentity::generate();
let a = SlotAnnounce::new(
&id,
vec![Fachrichtung::Verhaltenstherapie],
vec![Modalitaet::Praxis],
vec![Kostentraeger::GKV],
"80331".into(),
vec![TimeSlot {
start_unix: 1,
duration_minutes: 50,
slot_type: SlotType::Therapie,
}],
[0xAA; 32],
1,
);
assert!(r.broadcast_announce(a).is_err());
}
}

View File

@@ -16,6 +16,7 @@ pub mod address;
pub mod announce; pub mod announce;
pub mod announce_protocol; pub mod announce_protocol;
pub mod fapp; pub mod fapp;
pub mod fapp_router;
pub mod broadcast; pub mod broadcast;
pub mod envelope; pub mod envelope;
pub mod envelope_v2; pub mod envelope_v2;