diff --git a/Cargo.lock b/Cargo.lock index 7f759dc..58e9b55 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3202,6 +3202,21 @@ version = "2.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" +[[package]] +name = "meshservice" +version = "0.1.0" +dependencies = [ + "anyhow", + "ciborium", + "ed25519-dalek 2.2.0", + "rand 0.8.5", + "serde", + "sha2 0.10.9", + "thiserror 1.0.69", + "tokio", + "x25519-dalek", +] + [[package]] name = "metrics" version = "0.22.4" diff --git a/Cargo.toml b/Cargo.toml index 2df47c8..987a330 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,8 @@ members = [ # P2P crate uses iroh (~90 extra deps). Only compiled when the `mesh` # feature is enabled on quicprochat-client. "crates/quicprochat-p2p", + # Generic decentralized service layer (FAPP, Housing, etc.) + "crates/meshservice", ] [workspace.package] diff --git a/crates/meshservice/Cargo.toml b/crates/meshservice/Cargo.toml new file mode 100644 index 0000000..8e4f59e --- /dev/null +++ b/crates/meshservice/Cargo.toml @@ -0,0 +1,43 @@ +[package] +name = "meshservice" +version = "0.1.0" +edition = "2021" +authors = ["Chris "] +description = "Generic decentralized service layer for mesh networks" +license = "MIT" +repository = "https://git.xorwell.de/c/meshservice" +keywords = ["mesh", "p2p", "decentralized", "services"] +categories = ["network-programming"] + +[dependencies] +# Serialization +serde = { version = "1.0", features = ["derive"] } +ciborium = "0.2" + +# Crypto +ed25519-dalek = { version = "2.1", features = ["serde"] } +sha2 = "0.10" +rand = "0.8" +x25519-dalek = "2.0" + +# Async +tokio = { version = "1.36", features = ["sync", "time"] } + +# Error handling +anyhow = "1.0" +thiserror = "1.0" + +[dev-dependencies] +tokio = { version = "1.36", features = ["rt-multi-thread", "macros"] } + +[[example]] +name = "fapp_service" +path = "examples/fapp_service.rs" + +[[example]] +name = "housing_service" +path = "examples/housing_service.rs" + +[[example]] +name = "multi_service" +path = "examples/multi_service.rs" diff --git a/crates/meshservice/README.md b/crates/meshservice/README.md new file mode 100644 index 0000000..6c7648d --- /dev/null +++ b/crates/meshservice/README.md @@ -0,0 +1,220 @@ +# MeshService + +A generic decentralized service layer for mesh networks. Build any peer-to-peer service following the **Announce → Query → Response → Reserve** pattern. + +## Architecture + +``` +┌─────────────────────────────────────────────────────────────┐ +│ Application Services │ +│ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ +│ │ FAPP │ │ Housing │ │ Repair │ │ Custom │ ... │ +│ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘ │ +│ └────────────┴────────────┴────────────┘ │ +│ Service Layer (this crate) │ +│ ServiceMessage, ServiceRouter, Verification │ +│ ─────────────────────────────────────────────────────── │ +│ Mesh Layer │ +│ (provided by quicprochat-p2p or other mesh impl) │ +└─────────────────────────────────────────────────────────────┘ +``` + +## Features + +- **Generic Protocol**: Any service can be built on top (therapy appointments, housing, repairs, tutoring...) +- **Ed25519 Signatures**: All messages cryptographically signed +- **Verification Framework**: Multi-level trust (self-asserted, peer-endorsed, registry-verified) +- **Efficient Wire Format**: Fixed 64-byte header + CBOR payload +- **Pluggable Handlers**: Register custom services with the router +- **Built-in Services**: FAPP (psychotherapy) and Housing included + +## Quick Start + +```rust +use meshservice::{ + capabilities, + identity::ServiceIdentity, + router::ServiceRouter, + services::fapp::{FappService, SlotAnnounce, SlotQuery, Specialism, Modality}, +}; + +// Create identity +let identity = ServiceIdentity::generate(); + +// Create router with FAPP service +let mut router = ServiceRouter::new(capabilities::RELAY); +router.register(Box::new(FappService::relay())); + +// Therapist announces slots +let announce = SlotAnnounce::new( + &[Specialism::CognitiveBehavioral], + Modality::VideoCall, + "104", // Postal prefix +) +.with_slots(3) +.with_profile("https://therapists.de/dr-mueller"); + +let msg = meshservice::services::fapp::create_announce(&identity, &announce, 1)?; +router.handle(msg, Some(identity.public_key()))?; + +// Patient queries +let query = SlotQuery::new(Specialism::CognitiveBehavioral, "104"); +let query_msg = meshservice::services::fapp::create_query(&identity, &query)?; +let matches = router.query(&query_msg); + +println!("Found {} therapists", matches.len()); +``` + +## Built-in Services + +### FAPP (Free Appointment Propagation Protocol) + +Decentralized psychotherapy appointment discovery: + +| Service ID | Purpose | +|------------|---------| +| `0x0001` | Therapist slot announcements, patient queries | + +```rust +use meshservice::services::fapp::{SlotAnnounce, Specialism, Modality}; + +let announce = SlotAnnounce::new( + &[Specialism::TraumaFocused, Specialism::CognitiveBehavioral], + Modality::InPerson, + "104", +) +.with_slots(2) +.with_profile("https://kbv.de/123"); +``` + +### Housing + +Decentralized room/apartment sharing: + +| Service ID | Purpose | +|------------|---------| +| `0x0002` | Listing announcements, seeker queries | + +```rust +use meshservice::services::housing::{ListingAnnounce, ListingType, amenities}; + +let listing = ListingAnnounce::new(ListingType::Apartment, 65, 850, "104") + .with_rooms(2) + .with_amenities(amenities::FURNISHED | amenities::BALCONY); +``` + +## Verification Framework + +Three trust levels: + +| Level | Description | Example | +|-------|-------------|---------| +| 0 - None | Bare announcement | Anonymous | +| 1 - Self-Asserted | Profile URL provided | Website link | +| 2 - Peer-Endorsed | Trusted peers vouch | Community rating | +| 3 - Registry-Verified | Official registry | KBV license | + +```rust +use meshservice::verification::{Verification, TrustedVerifiers, VerificationLevel}; + +// Add trusted verifier +let mut verifiers = TrustedVerifiers::new(); +verifiers.add(registry_public_key, "KBV Registry", VerificationLevel::RegistryVerified); +router.set_trusted_verifiers(verifiers); + +// Require verification for announces +router.set_min_verification_level(2); +``` + +## Wire Protocol + +64-byte fixed header for efficient parsing: + +``` + 0-3 service_id (u32 LE) + 4 message_type (u8) + 5 version (u8) + 6-7 flags (reserved) + 8-23 message_id (16 bytes) +24-39 sender_address (16 bytes) +40-47 sequence (u64 LE) +48-49 ttl_hours (u16 LE) +50-57 timestamp (u64 LE) +58 hop_count (u8) +59 max_hops (u8) +60-63 payload_len (u32 LE) +--- +64+ signature (64 bytes) +128+ payload (CBOR) +... verifications (optional CBOR) +``` + +## Building Custom Services + +Implement `ServiceHandler`: + +```rust +use meshservice::router::{ServiceHandler, ServiceAction, HandlerContext}; + +struct MyService; + +impl ServiceHandler for MyService { + fn service_id(&self) -> u32 { 0x8001 } // Custom range + fn name(&self) -> &str { "MyService" } + + fn handle(&self, message: &ServiceMessage, ctx: &HandlerContext) + -> Result + { + match message.message_type { + MessageType::Announce => Ok(ServiceAction::StoreAndForward), + MessageType::Query => { + // Find matches, respond... + Ok(ServiceAction::Handled) + } + _ => Ok(ServiceAction::Drop) + } + } + + fn matches_query(&self, announce: &StoredMessage, query: &ServiceMessage) -> bool { + // Custom matching logic + true + } +} +``` + +## Service IDs + +| ID | Service | +|----|---------| +| `0x0001` | FAPP (Psychotherapy) | +| `0x0002` | Housing | +| `0x0003` | Repair | +| `0x0004` | Tutoring | +| `0x0005` | Medical | +| `0x0006` | Legal | +| `0x0007` | Volunteer | +| `0x0008` | Events | +| `0x8000+` | Custom/User-defined | + +## Examples + +```bash +# FAPP demo (therapist + patient) +cargo run --example fapp_service + +# Housing demo (landlord + seeker) +cargo run --example housing_service + +# Multi-service mesh +cargo run --example multi_service +``` + +## Testing + +```bash +cargo test +``` + +## License + +MIT diff --git a/crates/meshservice/examples/fapp_service.rs b/crates/meshservice/examples/fapp_service.rs new file mode 100644 index 0000000..a2eb1b7 --- /dev/null +++ b/crates/meshservice/examples/fapp_service.rs @@ -0,0 +1,86 @@ +//! FAPP Service Demo +//! +//! Demonstrates therapist announcement and patient query flow. + +use meshservice::{ + capabilities, + identity::ServiceIdentity, + router::ServiceRouter, + services::fapp::{create_announce, create_query, FappService, Modality, SlotAnnounce, SlotQuery, Specialism}, +}; + +fn main() { + println!("=== FAPP Service Demo ===\n"); + + // Create identities + let therapist = ServiceIdentity::generate(); + let patient = ServiceIdentity::generate(); + let relay = ServiceIdentity::generate(); + + println!("Therapist address: {:?}", hex(&therapist.address())); + println!("Patient address: {:?}", hex(&patient.address())); + println!("Relay address: {:?}\n", hex(&relay.address())); + + // Create router with FAPP service + let mut router = ServiceRouter::new(capabilities::RELAY); + router.register(Box::new(FappService::relay())); + + // Therapist creates announcement + let announce = SlotAnnounce::new( + &[Specialism::CognitiveBehavioral, Specialism::TraumaFocused], + Modality::VideoCall, + "104", // Berlin Kreuzberg + ) + .with_slots(3) + .with_profile("https://therapists.de/dr-schmidt") + .with_name("Dr. Anna Schmidt"); + + println!("Therapist announces:"); + println!(" Specialisms: CBT, Trauma"); + println!(" Modality: Video"); + println!(" Location: 104xx"); + println!(" Slots: 3"); + println!(" Profile: https://therapists.de/dr-schmidt\n"); + + let msg = create_announce(&therapist, &announce, 1).unwrap(); + let action = router.handle(msg.clone(), Some(therapist.public_key())).unwrap(); + println!("Router action: {:?}", action); + println!("Stored messages: {}\n", router.store().len()); + + // Patient creates query + let query = SlotQuery::new(Specialism::CognitiveBehavioral, "104") + .with_modality(Modality::VideoCall) + .with_max_wait(30); + + println!("Patient queries:"); + println!(" Looking for: CBT"); + println!(" Location: 104xx"); + println!(" Modality: Video"); + println!(" Max wait: 30 days\n"); + + let query_msg = create_query(&patient, &query).unwrap(); + + // Find matches + let matches = router.query(&query_msg); + println!("Found {} matching therapist(s):", matches.len()); + + for (i, m) in matches.iter().enumerate() { + if let Ok(data) = meshservice::services::fapp::SlotAnnounce::from_bytes(&m.message.payload) { + println!(" {}. {} in {}xx ({} slots)", + i + 1, + data.display_name.as_deref().unwrap_or("Unknown"), + data.postal_prefix, + data.available_slots + ); + if let Some(profile) = &data.profile_url { + println!(" Verify: {}", profile); + } + } + } + + println!("\n=== Demo Complete ==="); +} + +fn hex(bytes: &[u8]) -> String { + bytes.iter().map(|b| format!("{b:02x}")).collect() +} diff --git a/crates/meshservice/examples/housing_service.rs b/crates/meshservice/examples/housing_service.rs new file mode 100644 index 0000000..8b7a71f --- /dev/null +++ b/crates/meshservice/examples/housing_service.rs @@ -0,0 +1,97 @@ +//! Housing Service Demo +//! +//! Demonstrates landlord listing and seeker query flow. + +use meshservice::{ + capabilities, + identity::ServiceIdentity, + router::ServiceRouter, + services::housing::{ + amenities, create_announce, create_query, HousingService, ListingAnnounce, ListingQuery, + ListingType, + }, +}; + +fn main() { + println!("=== Housing Service Demo ===\n"); + + // Create identities + let landlord1 = ServiceIdentity::generate(); + let landlord2 = ServiceIdentity::generate(); + let seeker = ServiceIdentity::generate(); + + // Create router with Housing service + let mut router = ServiceRouter::new(capabilities::RELAY); + router.register(Box::new(HousingService::relay())); + + // Landlord 1: Kreuzberg apartment + let listing1 = ListingAnnounce::new(ListingType::Apartment, 65, 950, "104") + .with_rooms(2) + .with_amenities(amenities::FURNISHED | amenities::BALCONY | amenities::INTERNET) + .with_title("Sunny 2-room in Kreuzberg"); + + println!("Landlord 1 announces:"); + println!(" {} sqm {} in {}xx", listing1.size_sqm, "Apartment", listing1.postal_prefix); + println!(" Rent: {} EUR/month", listing1.rent_euros()); + println!(" Rooms: {}", listing1.rooms); + println!(" Amenities: Furnished, Balcony, Internet\n"); + + let msg1 = create_announce(&landlord1, &listing1, 1).unwrap(); + router.handle(msg1, Some(landlord1.public_key())).unwrap(); + + // Landlord 2: Neukölln shared flat room + let listing2 = ListingAnnounce::new(ListingType::Room, 18, 450, "120") + .with_rooms(1) + .with_amenities(amenities::WASHING_MACHINE | amenities::INTERNET) + .with_title("Room in friendly WG"); + + println!("Landlord 2 announces:"); + println!(" {} sqm {} in {}xx", listing2.size_sqm, "Room", listing2.postal_prefix); + println!(" Rent: {} EUR/month", listing2.rent_euros()); + println!(" Amenities: Washing machine, Internet\n"); + + let msg2 = create_announce(&landlord2, &listing2, 1).unwrap(); + router.handle(msg2, Some(landlord2.public_key())).unwrap(); + + println!("Total listings in store: {}\n", router.store().len()); + + // Seeker 1: Looking for affordable apartment + println!("--- Seeker Query 1: Affordable apartment ---"); + let query1 = ListingQuery::new("10", 800) // Any 10xxx area, max 800 EUR + .with_type(ListingType::Apartment) + .with_min_size(40); + + println!(" Area: 10xxx"); + println!(" Type: Apartment"); + println!(" Max rent: 800 EUR"); + println!(" Min size: 40 sqm\n"); + + let query_msg1 = create_query(&seeker, &query1).unwrap(); + let matches1 = router.query(&query_msg1); + println!("Found {} matches:", matches1.len()); + for m in &matches1 { + if let Ok(l) = ListingAnnounce::from_bytes(&m.message.payload) { + println!(" - {} ({}xx, {} EUR)", l.title.as_deref().unwrap_or("No title"), l.postal_prefix, l.rent_euros()); + } + } + + // Seeker 2: Looking for any cheap room + println!("\n--- Seeker Query 2: Any room under 500 EUR ---"); + let query2 = ListingQuery::new("1", 500); // Any 1xxxx area + + let query_msg2 = create_query(&seeker, &query2).unwrap(); + let matches2 = router.query(&query_msg2); + println!("Found {} matches:", matches2.len()); + for m in &matches2 { + if let Ok(l) = ListingAnnounce::from_bytes(&m.message.payload) { + println!(" - {} ({}xx, {} sqm, {} EUR)", + l.title.as_deref().unwrap_or("No title"), + l.postal_prefix, + l.size_sqm, + l.rent_euros() + ); + } + } + + println!("\n=== Demo Complete ==="); +} diff --git a/crates/meshservice/examples/multi_service.rs b/crates/meshservice/examples/multi_service.rs new file mode 100644 index 0000000..040951e --- /dev/null +++ b/crates/meshservice/examples/multi_service.rs @@ -0,0 +1,89 @@ +//! Multi-Service Demo +//! +//! Shows how multiple services can run on the same mesh router. + +use meshservice::{ + capabilities, + identity::ServiceIdentity, + router::ServiceRouter, + service_ids, + services::{ + fapp::{create_announce as fapp_announce, FappService, Modality, SlotAnnounce, Specialism}, + housing::{ + amenities, create_announce as housing_announce, HousingService, ListingAnnounce, + ListingType, + }, + }, + verification::{TrustedVerifiers, Verification, VerificationLevel}, +}; + +fn main() { + println!("=== Multi-Service Mesh Demo ===\n"); + + // Create a router that handles both FAPP and Housing + let mut router = ServiceRouter::new(capabilities::RELAY | capabilities::CONSUMER); + router.register(Box::new(FappService::relay())); + router.register(Box::new(HousingService::relay())); + + println!("Registered services:"); + for (id, name) in router.services() { + println!(" 0x{:04x} - {}", id, name); + } + println!(); + + // Create identities + let therapist = ServiceIdentity::generate(); + let landlord = ServiceIdentity::generate(); + let registry = ServiceIdentity::generate(); + + // Setup trusted verifiers + let mut verifiers = TrustedVerifiers::new(); + verifiers.add( + registry.public_key(), + "Health Registry", + VerificationLevel::RegistryVerified, + ); + router.set_trusted_verifiers(verifiers); + + // Therapist announcement with verification + println!("--- Adding FAPP announcement ---"); + let fapp_data = SlotAnnounce::new(&[Specialism::Psychoanalysis], Modality::InPerson, "104") + .with_profile("https://kbv.de/therapists/12345"); + + let mut fapp_msg = fapp_announce(&therapist, &fapp_data, 1).unwrap(); + + // Registry verifies therapist + let verification = Verification::registry( + ®istry, + &therapist.address(), + "licensed_therapist", + "KBV-12345", + ); + fapp_msg.add_verification(verification); + + router.handle(fapp_msg, Some(therapist.public_key())).unwrap(); + println!("FAPP announcement stored (with registry verification)\n"); + + // Housing announcement + println!("--- Adding Housing announcement ---"); + let housing_data = ListingAnnounce::new(ListingType::Studio, 35, 700, "104") + .with_amenities(amenities::FURNISHED | amenities::INTERNET) + .with_title("Cozy studio near therapist offices"); + + let housing_msg = housing_announce(&landlord, &housing_data, 1).unwrap(); + router.handle(housing_msg, Some(landlord.public_key())).unwrap(); + println!("Housing announcement stored\n"); + + // Summary + println!("--- Store Summary ---"); + println!("FAPP messages: {}", router.store().service_count(service_ids::FAPP)); + println!("Housing messages: {}", router.store().service_count(service_ids::HOUSING)); + println!("Total messages: {}", router.store().len()); + + println!("\n=== Multi-Service Demo Complete ==="); + println!("\nThe mesh can route and store messages for multiple services"); + println!("using a single router instance. Each service has its own:"); + println!(" - Payload format"); + println!(" - Query matching logic"); + println!(" - Handler implementation"); +} diff --git a/crates/meshservice/src/anti_abuse.rs b/crates/meshservice/src/anti_abuse.rs new file mode 100644 index 0000000..753f332 --- /dev/null +++ b/crates/meshservice/src/anti_abuse.rs @@ -0,0 +1,532 @@ +//! Anti-abuse mechanisms for preventing slot blocking and spam. + +use std::collections::HashMap; +use std::time::{SystemTime, UNIX_EPOCH}; + +use sha2::{Digest, Sha256}; + +/// Rate limiting configuration. +#[derive(Debug, Clone)] +pub struct RateLimits { + /// Max reservations per sender per hour. + pub max_reservations_per_hour: u8, + /// Max pending (unconfirmed) reservations per sender. + pub max_pending_reservations: u8, + /// Min time between reservations (seconds). + pub reservation_cooldown_secs: u32, + /// Max queries per sender per minute. + pub max_queries_per_minute: u8, +} + +impl Default for RateLimits { + fn default() -> Self { + Self { + max_reservations_per_hour: 3, + max_pending_reservations: 2, + reservation_cooldown_secs: 300, + max_queries_per_minute: 10, + } + } +} + +/// Tracks sender activity for rate limiting. +#[derive(Debug, Default)] +pub struct RateLimiter { + limits: RateLimits, + /// sender_address -> activity + activity: HashMap<[u8; 16], SenderActivity>, +} + +#[derive(Debug, Default)] +struct SenderActivity { + /// Timestamps of reservations in last hour. + reservation_times: Vec, + /// Count of pending reservations. + pending_count: u8, + /// Timestamp of last reservation. + last_reservation: u64, + /// Query timestamps in last minute. + query_times: Vec, +} + +impl RateLimiter { + /// Create with default limits. + pub fn new() -> Self { + Self::default() + } + + /// Create with custom limits. + pub fn with_limits(limits: RateLimits) -> Self { + Self { + limits, + activity: HashMap::new(), + } + } + + /// Check if a reservation is allowed. + pub fn check_reservation(&mut self, sender: &[u8; 16]) -> RateLimitResult { + let now = now(); + let activity = self.activity.entry(*sender).or_default(); + + // Clean old entries + activity.reservation_times.retain(|&t| now - t < 3600); + + // Check cooldown + if now - activity.last_reservation < u64::from(self.limits.reservation_cooldown_secs) { + return RateLimitResult::Cooldown { + wait_secs: self.limits.reservation_cooldown_secs - (now - activity.last_reservation) as u32, + }; + } + + // Check hourly limit + if activity.reservation_times.len() >= self.limits.max_reservations_per_hour as usize { + return RateLimitResult::HourlyLimitReached; + } + + // Check pending limit + if activity.pending_count >= self.limits.max_pending_reservations { + return RateLimitResult::TooManyPending; + } + + RateLimitResult::Allowed + } + + /// Record a reservation attempt. + pub fn record_reservation(&mut self, sender: &[u8; 16]) { + let now = now(); + let activity = self.activity.entry(*sender).or_default(); + activity.reservation_times.push(now); + activity.last_reservation = now; + activity.pending_count = activity.pending_count.saturating_add(1); + } + + /// Record reservation confirmed/completed (reduce pending). + pub fn record_reservation_resolved(&mut self, sender: &[u8; 16]) { + if let Some(activity) = self.activity.get_mut(sender) { + activity.pending_count = activity.pending_count.saturating_sub(1); + } + } + + /// Check if a query is allowed. + pub fn check_query(&mut self, sender: &[u8; 16]) -> RateLimitResult { + let now = now(); + let activity = self.activity.entry(*sender).or_default(); + + // Clean old entries + activity.query_times.retain(|&t| now - t < 60); + + if activity.query_times.len() >= self.limits.max_queries_per_minute as usize { + return RateLimitResult::QueryLimitReached; + } + + RateLimitResult::Allowed + } + + /// Record a query. + pub fn record_query(&mut self, sender: &[u8; 16]) { + let now = now(); + let activity = self.activity.entry(*sender).or_default(); + activity.query_times.push(now); + } + + /// Prune old activity data. + pub fn prune(&mut self) { + let now = now(); + self.activity.retain(|_, a| { + a.reservation_times.retain(|&t| now - t < 3600); + a.query_times.retain(|&t| now - t < 60); + !a.reservation_times.is_empty() || !a.query_times.is_empty() || a.pending_count > 0 + }); + } +} + +/// Result of rate limit check. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum RateLimitResult { + /// Request allowed. + Allowed, + /// Must wait before next reservation. + Cooldown { wait_secs: u32 }, + /// Hourly reservation limit reached. + HourlyLimitReached, + /// Too many pending reservations. + TooManyPending, + /// Query rate limit reached. + QueryLimitReached, +} + +impl RateLimitResult { + pub fn is_allowed(&self) -> bool { + matches!(self, RateLimitResult::Allowed) + } +} + +/// Proof-of-work for reservation requests. +#[derive(Debug, Clone)] +pub struct ProofOfWork { + /// Nonce that produces valid hash. + pub nonce: u64, + /// Required difficulty (leading zero bits). + pub difficulty: u8, +} + +impl ProofOfWork { + /// Default difficulty (20 bits ≈ 1-2 seconds on modern CPU). + pub const DEFAULT_DIFFICULTY: u8 = 20; + + /// Generate proof-of-work for a reservation. + pub fn generate(reservation_id: &[u8; 16], difficulty: u8) -> Self { + let mut nonce = 0u64; + loop { + if Self::check_hash(reservation_id, nonce, difficulty) { + return Self { nonce, difficulty }; + } + nonce = nonce.wrapping_add(1); + } + } + + /// Verify proof-of-work. + pub fn verify(&self, reservation_id: &[u8; 16]) -> bool { + Self::check_hash(reservation_id, self.nonce, self.difficulty) + } + + fn check_hash(reservation_id: &[u8; 16], nonce: u64, difficulty: u8) -> bool { + let mut hasher = Sha256::new(); + hasher.update(reservation_id); + hasher.update(&nonce.to_le_bytes()); + let hash = hasher.finalize(); + leading_zero_bits(&hash) >= difficulty + } +} + +/// Count leading zero bits in a byte slice. +fn leading_zero_bits(data: &[u8]) -> u8 { + let mut count = 0u8; + for byte in data { + if *byte == 0 { + count += 8; + } else { + count += byte.leading_zeros() as u8; + break; + } + } + count +} + +/// Sender reputation tracking. +#[derive(Debug, Clone, Default)] +pub struct SenderReputation { + pub address: [u8; 16], + pub reservations_made: u32, + pub reservations_honored: u32, + pub reservations_cancelled: u32, + pub no_shows: u32, + pub last_no_show: Option, +} + +impl SenderReputation { + /// Create for a new sender. + pub fn new(address: [u8; 16]) -> Self { + Self { + address, + ..Default::default() + } + } + + /// Calculate honor rate (0.0 to 1.0). + pub fn honor_rate(&self) -> f32 { + if self.reservations_made == 0 { + return 0.5; // Neutral for new users + } + (self.reservations_honored as f32) / (self.reservations_made as f32) + } + + /// Check if sender should be blocked. + pub fn is_blocked(&self) -> bool { + self.no_shows >= 3 || (self.reservations_made >= 5 && self.honor_rate() < 0.5) + } + + /// Record a completed reservation. + pub fn record_honored(&mut self) { + self.reservations_made += 1; + self.reservations_honored += 1; + } + + /// Record a cancelled reservation (with notice). + pub fn record_cancelled(&mut self) { + self.reservations_made += 1; + self.reservations_cancelled += 1; + } + + /// Record a no-show. + pub fn record_no_show(&mut self) { + self.reservations_made += 1; + self.no_shows += 1; + self.last_no_show = Some(now()); + } +} + +/// Reputation store. +#[derive(Debug, Default)] +pub struct ReputationStore { + reputations: HashMap<[u8; 16], SenderReputation>, +} + +impl ReputationStore { + pub fn new() -> Self { + Self::default() + } + + /// Get or create reputation for a sender. + pub fn get_or_create(&mut self, address: [u8; 16]) -> &mut SenderReputation { + self.reputations + .entry(address) + .or_insert_with(|| SenderReputation::new(address)) + } + + /// Get reputation (read-only). + pub fn get(&self, address: &[u8; 16]) -> Option<&SenderReputation> { + self.reputations.get(address) + } + + /// Check if sender is blocked. + pub fn is_blocked(&self, address: &[u8; 16]) -> bool { + self.reputations + .get(address) + .map(|r| r.is_blocked()) + .unwrap_or(false) + } + + /// Get honor rate (0.5 for unknown). + pub fn honor_rate(&self, address: &[u8; 16]) -> f32 { + self.reputations + .get(address) + .map(|r| r.honor_rate()) + .unwrap_or(0.5) + } +} + +/// Blocklist entry. +#[derive(Debug, Clone)] +pub struct BlocklistEntry { + pub blocked_address: [u8; 16], + pub reason: BlockReason, + pub reported_by: [u8; 16], + pub signature: Vec, + pub timestamp: u64, +} + +/// Reason for blocking. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[repr(u8)] +pub enum BlockReason { + NoShow = 1, + Spam = 2, + Harassment = 3, + FakeIdentity = 4, +} + +/// Therapist-defined reservation policy. +#[derive(Debug, Clone)] +pub struct TherapistPolicy { + /// Max pending reservations from new senders. + pub max_pending_new: u8, + /// Max pending from established senders. + pub max_pending_established: u8, + /// Require this verification level for reservations. + pub min_verification_level: u8, + /// Auto-reject senders with honor rate below this. + pub min_honor_rate: f32, + /// Require proof-of-work. + pub require_pow: bool, + /// PoW difficulty (if required). + pub pow_difficulty: u8, +} + +impl Default for TherapistPolicy { + fn default() -> Self { + Self { + max_pending_new: 1, + max_pending_established: 3, + min_verification_level: 0, + min_honor_rate: 0.5, + require_pow: true, + pow_difficulty: ProofOfWork::DEFAULT_DIFFICULTY, + } + } +} + +impl TherapistPolicy { + /// Check if a reservation request meets policy. + pub fn check( + &self, + sender_reputation: &SenderReputation, + sender_verification_level: u8, + pow: Option<&ProofOfWork>, + reservation_id: &[u8; 16], + ) -> PolicyResult { + // Check verification level + if sender_verification_level < self.min_verification_level { + return PolicyResult::InsufficientVerification; + } + + // Check honor rate + if sender_reputation.honor_rate() < self.min_honor_rate { + return PolicyResult::LowReputation; + } + + // Check blocked + if sender_reputation.is_blocked() { + return PolicyResult::Blocked; + } + + // Check proof-of-work + if self.require_pow { + match pow { + Some(p) if p.difficulty >= self.pow_difficulty && p.verify(reservation_id) => {} + Some(_) => return PolicyResult::InvalidPoW, + None => return PolicyResult::MissingPoW, + } + } + + PolicyResult::Allowed + } +} + +/// Result of policy check. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum PolicyResult { + Allowed, + InsufficientVerification, + LowReputation, + Blocked, + MissingPoW, + InvalidPoW, +} + +impl PolicyResult { + pub fn is_allowed(&self) -> bool { + matches!(self, PolicyResult::Allowed) + } +} + +fn now() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs() +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn rate_limiter_allows_first_reservation() { + let mut limiter = RateLimiter::new(); + let sender = [1u8; 16]; + + assert!(limiter.check_reservation(&sender).is_allowed()); + } + + #[test] + fn rate_limiter_enforces_cooldown() { + let mut limiter = RateLimiter::with_limits(RateLimits { + reservation_cooldown_secs: 300, + ..Default::default() + }); + let sender = [2u8; 16]; + + limiter.record_reservation(&sender); + let result = limiter.check_reservation(&sender); + + assert!(matches!(result, RateLimitResult::Cooldown { .. })); + } + + #[test] + fn rate_limiter_enforces_hourly_limit() { + let mut limiter = RateLimiter::with_limits(RateLimits { + max_reservations_per_hour: 2, + reservation_cooldown_secs: 0, + ..Default::default() + }); + let sender = [3u8; 16]; + + limiter.record_reservation(&sender); + limiter.record_reservation(&sender); + + assert_eq!(limiter.check_reservation(&sender), RateLimitResult::HourlyLimitReached); + } + + #[test] + fn pow_generation_and_verification() { + let reservation_id = [42u8; 16]; + let pow = ProofOfWork::generate(&reservation_id, 8); // Low difficulty for test + + assert!(pow.verify(&reservation_id)); + assert!(!pow.verify(&[0u8; 16])); // Wrong ID + } + + #[test] + fn reputation_tracking() { + let mut rep = SenderReputation::new([5u8; 16]); + + rep.record_honored(); + rep.record_honored(); + rep.record_no_show(); + + assert_eq!(rep.reservations_made, 3); + assert_eq!(rep.honor_rate(), 2.0 / 3.0); + assert!(!rep.is_blocked()); + + rep.record_no_show(); + rep.record_no_show(); + + assert!(rep.is_blocked()); // 3 no-shows + } + + #[test] + fn policy_check_pow() { + let policy = TherapistPolicy { + require_pow: true, + pow_difficulty: 8, + ..Default::default() + }; + let rep = SenderReputation::new([6u8; 16]); + let reservation_id = [7u8; 16]; + + // No PoW + assert_eq!( + policy.check(&rep, 0, None, &reservation_id), + PolicyResult::MissingPoW + ); + + // Valid PoW + let pow = ProofOfWork::generate(&reservation_id, 8); + assert_eq!( + policy.check(&rep, 0, Some(&pow), &reservation_id), + PolicyResult::Allowed + ); + } + + #[test] + fn policy_check_verification_level() { + let policy = TherapistPolicy { + min_verification_level: 2, + require_pow: false, + ..Default::default() + }; + let rep = SenderReputation::new([8u8; 16]); + let reservation_id = [9u8; 16]; + + assert_eq!( + policy.check(&rep, 1, None, &reservation_id), + PolicyResult::InsufficientVerification + ); + + assert_eq!( + policy.check(&rep, 2, None, &reservation_id), + PolicyResult::Allowed + ); + } +} diff --git a/crates/meshservice/src/error.rs b/crates/meshservice/src/error.rs new file mode 100644 index 0000000..cf8d061 --- /dev/null +++ b/crates/meshservice/src/error.rs @@ -0,0 +1,55 @@ +//! Error types for the mesh service layer. + +use thiserror::Error; + +/// Errors that can occur in the service layer. +#[derive(Debug, Error)] +pub enum ServiceError { + #[error("invalid message format: {0}")] + InvalidFormat(String), + + #[error("unknown service ID: {0}")] + UnknownService(u32), + + #[error("signature verification failed")] + SignatureInvalid, + + #[error("message expired")] + Expired, + + #[error("max hops exceeded")] + MaxHopsExceeded, + + #[error("missing capability: {0}")] + MissingCapability(String), + + #[error("store full")] + StoreFull, + + #[error("duplicate message")] + Duplicate, + + #[error("serialization error: {0}")] + Serialization(String), + + #[error("crypto error: {0}")] + Crypto(String), + + #[error("verification required: minimum level {0}")] + VerificationRequired(u8), + + #[error("service handler error: {0}")] + Handler(String), +} + +impl From> for ServiceError { + fn from(e: ciborium::ser::Error) -> Self { + ServiceError::Serialization(e.to_string()) + } +} + +impl From> for ServiceError { + fn from(e: ciborium::de::Error) -> Self { + ServiceError::Serialization(e.to_string()) + } +} diff --git a/crates/meshservice/src/identity.rs b/crates/meshservice/src/identity.rs new file mode 100644 index 0000000..d4d400d --- /dev/null +++ b/crates/meshservice/src/identity.rs @@ -0,0 +1,119 @@ +//! Service identity management using Ed25519. + +use ed25519_dalek::{Signature, Signer, SigningKey, Verifier, VerifyingKey}; +use rand::rngs::OsRng; +use sha2::{Digest, Sha256}; + +/// A service participant's identity (Ed25519 keypair). +#[derive(Clone)] +pub struct ServiceIdentity { + signing_key: SigningKey, +} + +impl ServiceIdentity { + /// Generate a new random identity. + pub fn generate() -> Self { + use rand::RngCore; + let mut secret = [0u8; 32]; + OsRng.fill_bytes(&mut secret); + let signing_key = SigningKey::from_bytes(&secret); + Self { signing_key } + } + + /// Create from an existing secret key. + pub fn from_secret(secret: &[u8; 32]) -> Self { + let signing_key = SigningKey::from_bytes(secret); + Self { signing_key } + } + + /// Get the 32-byte public key. + pub fn public_key(&self) -> [u8; 32] { + self.signing_key.verifying_key().to_bytes() + } + + /// Get the 32-byte secret key (for persistence). + pub fn secret_key(&self) -> [u8; 32] { + self.signing_key.to_bytes() + } + + /// Compute the 16-byte mesh address from the public key. + pub fn address(&self) -> [u8; 16] { + compute_address(&self.public_key()) + } + + /// Sign a message. + pub fn sign(&self, message: &[u8]) -> [u8; 64] { + let sig = self.signing_key.sign(message); + sig.to_bytes() + } + + /// Verify a signature against a public key. + pub fn verify(public_key: &[u8; 32], message: &[u8], signature: &[u8; 64]) -> bool { + let Ok(verifying_key) = VerifyingKey::from_bytes(public_key) else { + return false; + }; + let sig = Signature::from_bytes(signature); + verifying_key.verify(message, &sig).is_ok() + } +} + +/// Compute a 16-byte mesh address from a 32-byte public key. +/// +/// Address = SHA-256(public_key)[0..16] +pub fn compute_address(public_key: &[u8; 32]) -> [u8; 16] { + let hash = Sha256::digest(public_key); + let mut addr = [0u8; 16]; + addr.copy_from_slice(&hash[..16]); + addr +} + +impl std::fmt::Debug for ServiceIdentity { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ServiceIdentity") + .field("address", &hex::encode(self.address())) + .finish() + } +} + +// Hex encoding for debug output +mod hex { + pub fn encode(bytes: impl AsRef<[u8]>) -> String { + bytes.as_ref().iter().map(|b| format!("{b:02x}")).collect() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn generate_and_sign() { + let id = ServiceIdentity::generate(); + let msg = b"hello world"; + let sig = id.sign(msg); + assert!(ServiceIdentity::verify(&id.public_key(), msg, &sig)); + } + + #[test] + fn address_is_deterministic() { + let id = ServiceIdentity::generate(); + let addr1 = id.address(); + let addr2 = compute_address(&id.public_key()); + assert_eq!(addr1, addr2); + } + + #[test] + fn wrong_message_fails() { + let id = ServiceIdentity::generate(); + let sig = id.sign(b"correct"); + assert!(!ServiceIdentity::verify(&id.public_key(), b"wrong", &sig)); + } + + #[test] + fn roundtrip_secret() { + let id = ServiceIdentity::generate(); + let secret = id.secret_key(); + let restored = ServiceIdentity::from_secret(&secret); + assert_eq!(id.public_key(), restored.public_key()); + } +} diff --git a/crates/meshservice/src/lib.rs b/crates/meshservice/src/lib.rs new file mode 100644 index 0000000..421552e --- /dev/null +++ b/crates/meshservice/src/lib.rs @@ -0,0 +1,88 @@ +//! # MeshService — Generic Decentralized Service Layer +//! +//! A protocol and runtime for building decentralized services on mesh networks. +//! Any service following the Announce → Query → Response → Reserve pattern +//! can be implemented on this layer. +//! +//! ## Architecture +//! +//! ```text +//! ┌─────────────────────────────────────────────────────────────┐ +//! │ Application Services │ +//! │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ +//! │ │ FAPP │ │ Housing │ │ Repair │ │ Custom │ ... │ +//! │ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘ │ +//! │ └────────────┴────────────┴────────────┘ │ +//! │ Service Layer (this crate) │ +//! │ ServiceMessage, ServiceRouter, Verification │ +//! │ ─────────────────────────────────────────────────────── │ +//! │ Mesh Layer │ +//! │ (provided by quicprochat-p2p or other mesh impl) │ +//! └─────────────────────────────────────────────────────────────┘ +//! ``` +//! +//! ## Quick Start +//! +//! ```rust,ignore +//! use meshservice::{ServiceRouter, ServiceMessage, services::fapp::FappService}; +//! +//! // Create router +//! let mut router = ServiceRouter::new(identity, capabilities); +//! +//! // Register services +//! router.register(FappService::new()); +//! router.register(HousingService::new()); +//! +//! // Handle incoming message +//! let action = router.handle(&incoming_bytes); +//! ``` + +pub mod identity; +pub mod message; +pub mod router; +pub mod store; +pub mod verification; +pub mod services; +pub mod wire; +pub mod error; +pub mod anti_abuse; + +pub use identity::ServiceIdentity; +pub use message::{ServiceMessage, MessageType}; +pub use router::{ServiceRouter, ServiceHandler, ServiceAction}; +pub use store::ServiceStore; +pub use verification::{Verification, VerificationLevel}; +pub use error::ServiceError; +pub use anti_abuse::{RateLimiter, RateLimits, ProofOfWork, SenderReputation, TherapistPolicy}; + +/// Well-known service IDs. +pub mod service_ids { + /// Free Appointment Propagation Protocol (psychotherapy). + pub const FAPP: u32 = 0x0001; + /// Housing / room sharing. + pub const HOUSING: u32 = 0x0002; + /// Repair services / craftsmen. + pub const REPAIR: u32 = 0x0003; + /// Tutoring / education. + pub const TUTOR: u32 = 0x0004; + /// Medical appointments. + pub const MEDICAL: u32 = 0x0005; + /// Legal consultation. + pub const LEGAL: u32 = 0x0006; + /// Volunteer coordination. + pub const VOLUNTEER: u32 = 0x0007; + /// Events / tickets. + pub const EVENTS: u32 = 0x0008; + /// Reserved for user-defined services. + pub const CUSTOM_START: u32 = 0x8000; +} + +/// Capability flags for service participation. +pub mod capabilities { + /// Node can announce/provide services. + pub const PROVIDER: u16 = 0x0100; + /// Node caches and relays service messages. + pub const RELAY: u16 = 0x0200; + /// Node can query/consume services. + pub const CONSUMER: u16 = 0x0400; +} diff --git a/crates/meshservice/src/message.rs b/crates/meshservice/src/message.rs new file mode 100644 index 0000000..3758042 --- /dev/null +++ b/crates/meshservice/src/message.rs @@ -0,0 +1,321 @@ +//! Core message types for the service layer. + +use std::time::{SystemTime, UNIX_EPOCH}; + +use serde::{Deserialize, Serialize}; + +use crate::identity::ServiceIdentity; +use crate::verification::Verification; + +/// Message types within a service. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[repr(u8)] +pub enum MessageType { + /// Provider announces availability. + Announce = 0x01, + /// Consumer queries for matches. + Query = 0x02, + /// Response to a query. + Response = 0x03, + /// Consumer reserves a slot/item. + Reserve = 0x04, + /// Provider confirms/rejects reservation. + Confirm = 0x05, + /// Either party cancels. + Cancel = 0x06, + /// Provider updates an existing announce (partial). + Update = 0x07, + /// Provider revokes an announce. + Revoke = 0x08, +} + +impl TryFrom for MessageType { + type Error = (); + + fn try_from(value: u8) -> Result { + match value { + 0x01 => Ok(MessageType::Announce), + 0x02 => Ok(MessageType::Query), + 0x03 => Ok(MessageType::Response), + 0x04 => Ok(MessageType::Reserve), + 0x05 => Ok(MessageType::Confirm), + 0x06 => Ok(MessageType::Cancel), + 0x07 => Ok(MessageType::Update), + 0x08 => Ok(MessageType::Revoke), + _ => Err(()), + } + } +} + +/// A generic service message that can carry any application payload. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ServiceMessage { + /// Service identifier (which application). + pub service_id: u32, + /// Message type within service. + pub message_type: MessageType, + /// Protocol version for forward compatibility. + pub version: u8, + /// Unique message ID. + pub id: [u8; 16], + /// Sender's mesh address. + pub sender_address: [u8; 16], + /// Application-specific CBOR payload. + pub payload: Vec, + /// Ed25519 signature over signable fields. + pub signature: Vec, + /// Optional verifications from trusted parties. + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub verifications: Vec, + /// Monotonically increasing per sender (dedup/supersede). + pub sequence: u64, + /// Time-to-live in hours. + pub ttl_hours: u16, + /// Unix timestamp of creation. + pub timestamp: u64, + /// Current hop count (incremented on re-broadcast). + pub hop_count: u8, + /// Maximum propagation hops. + pub max_hops: u8, +} + +/// Default TTL: 7 days. +const DEFAULT_TTL_HOURS: u16 = 168; +/// Default max hops. +const DEFAULT_MAX_HOPS: u8 = 8; + +impl ServiceMessage { + /// Create a new service message. + pub fn new( + identity: &ServiceIdentity, + service_id: u32, + message_type: MessageType, + payload: Vec, + sequence: u64, + ) -> Self { + Self::with_options( + identity, + service_id, + message_type, + payload, + sequence, + DEFAULT_TTL_HOURS, + DEFAULT_MAX_HOPS, + ) + } + + /// Create with custom TTL and max hops. + pub fn with_options( + identity: &ServiceIdentity, + service_id: u32, + message_type: MessageType, + payload: Vec, + sequence: u64, + ttl_hours: u16, + max_hops: u8, + ) -> Self { + use sha2::{Digest, Sha256}; + + let sender_address = identity.address(); + + // Generate unique ID from address + sequence + let id_hash = Sha256::digest( + [&sender_address[..], &sequence.to_le_bytes()].concat() + ); + let mut id = [0u8; 16]; + id.copy_from_slice(&id_hash[..16]); + + let timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + + let mut msg = Self { + service_id, + message_type, + version: 1, + id, + sender_address, + payload, + signature: Vec::new(), + verifications: Vec::new(), + sequence, + ttl_hours, + timestamp, + hop_count: 0, + max_hops, + }; + + let signable = msg.signable_bytes(); + msg.signature = identity.sign(&signable).to_vec(); + msg + } + + /// Create an announce message. + pub fn announce( + identity: &ServiceIdentity, + service_id: u32, + payload: Vec, + sequence: u64, + ) -> Self { + Self::new(identity, service_id, MessageType::Announce, payload, sequence) + } + + /// Create a query message. + pub fn query( + identity: &ServiceIdentity, + service_id: u32, + payload: Vec, + ) -> Self { + // Queries use random sequence (not monotonic) + let sequence = rand::random(); + Self::with_options( + identity, + service_id, + MessageType::Query, + payload, + sequence, + 1, // 1 hour TTL for queries + DEFAULT_MAX_HOPS, + ) + } + + /// Create a response message. + pub fn response( + identity: &ServiceIdentity, + service_id: u32, + query_id: [u8; 16], + payload: Vec, + ) -> Self { + let mut msg = Self::new( + identity, + service_id, + MessageType::Response, + payload, + rand::random(), + ); + // Response ID matches query ID for correlation + msg.id = query_id; + msg + } + + /// Assemble bytes for signing/verification. + /// Excludes signature, hop_count, verifications (mutable fields). + fn signable_bytes(&self) -> Vec { + let mut buf = Vec::with_capacity(256); + buf.extend_from_slice(&self.service_id.to_le_bytes()); + buf.push(self.message_type as u8); + buf.push(self.version); + buf.extend_from_slice(&self.id); + buf.extend_from_slice(&self.sender_address); + buf.extend_from_slice(&(self.payload.len() as u32).to_le_bytes()); + buf.extend_from_slice(&self.payload); + buf.extend_from_slice(&self.sequence.to_le_bytes()); + buf.extend_from_slice(&self.ttl_hours.to_le_bytes()); + buf.extend_from_slice(&self.timestamp.to_le_bytes()); + buf.push(self.max_hops); + buf + } + + /// Verify the signature using the sender's public key. + pub fn verify(&self, sender_public_key: &[u8; 32]) -> bool { + use crate::identity::compute_address; + + // Verify address matches key + if compute_address(sender_public_key) != self.sender_address { + return false; + } + + let sig: [u8; 64] = match self.signature.as_slice().try_into() { + Ok(s) => s, + Err(_) => return false, + }; + + let signable = self.signable_bytes(); + ServiceIdentity::verify(sender_public_key, &signable, &sig) + } + + /// Check if the message has expired. + pub fn is_expired(&self) -> bool { + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + let ttl_secs = u64::from(self.ttl_hours) * 3600; + now.saturating_sub(self.timestamp) > ttl_secs + } + + /// Check if the message can still propagate. + pub fn can_propagate(&self) -> bool { + self.hop_count < self.max_hops && !self.is_expired() + } + + /// Create a forwarded copy with incremented hop count. + pub fn forwarded(&self) -> Self { + let mut copy = self.clone(); + copy.hop_count = copy.hop_count.saturating_add(1); + copy + } + + /// Get the highest verification level attached. + pub fn verification_level(&self) -> u8 { + self.verifications + .iter() + .map(|v| v.level) + .max() + .unwrap_or(0) + } + + /// Add a verification to the message. + pub fn add_verification(&mut self, verification: Verification) { + self.verifications.push(verification); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn create_and_verify() { + let id = ServiceIdentity::generate(); + let msg = ServiceMessage::announce( + &id, + crate::service_ids::FAPP, + b"test payload".to_vec(), + 1, + ); + + assert!(msg.verify(&id.public_key())); + assert!(!msg.is_expired()); + assert!(msg.can_propagate()); + assert_eq!(msg.hop_count, 0); + } + + #[test] + fn forwarded_increments_hop() { + let id = ServiceIdentity::generate(); + let msg = ServiceMessage::announce(&id, 1, vec![], 1); + let fwd = msg.forwarded(); + + assert_eq!(fwd.hop_count, 1); + assert!(fwd.verify(&id.public_key())); // Still valid + } + + #[test] + fn tampered_fails_verify() { + let id = ServiceIdentity::generate(); + let mut msg = ServiceMessage::announce(&id, 1, b"original".to_vec(), 1); + msg.payload = b"tampered".to_vec(); + + assert!(!msg.verify(&id.public_key())); + } + + #[test] + fn query_has_short_ttl() { + let id = ServiceIdentity::generate(); + let msg = ServiceMessage::query(&id, 1, vec![]); + + assert_eq!(msg.ttl_hours, 1); + } +} diff --git a/crates/meshservice/src/router.rs b/crates/meshservice/src/router.rs new file mode 100644 index 0000000..293a581 --- /dev/null +++ b/crates/meshservice/src/router.rs @@ -0,0 +1,289 @@ +//! Service router dispatches messages to service-specific handlers. + +use std::collections::HashMap; + +use crate::error::ServiceError; +use crate::message::{MessageType, ServiceMessage}; +use crate::store::{ServiceStore, StoredMessage}; +use crate::verification::TrustedVerifiers; + +/// Action returned by a service handler. +#[derive(Debug)] +pub enum ServiceAction { + /// Message handled, do nothing more. + Handled, + /// Store the message locally. + Store, + /// Store and forward to peers. + StoreAndForward, + /// Forward without storing (pass-through relay). + ForwardOnly, + /// Drop the message silently. + Drop, + /// Send a response back. + Respond(ServiceMessage), + /// Reject with error. + Reject(ServiceError), +} + +/// Trait for service-specific handlers. +pub trait ServiceHandler: Send + Sync { + /// The service ID this handler manages. + fn service_id(&self) -> u32; + + /// Human-readable service name. + fn name(&self) -> &str; + + /// Handle an incoming message. + fn handle( + &self, + message: &ServiceMessage, + context: &HandlerContext, + ) -> Result; + + /// Validate a message payload (service-specific logic). + fn validate(&self, message: &ServiceMessage) -> Result<(), ServiceError> { + // Default: accept all + let _ = message; + Ok(()) + } + + /// Check if a message matches a query. + fn matches_query(&self, announce: &StoredMessage, query: &ServiceMessage) -> bool; +} + +/// Context passed to handlers. +pub struct HandlerContext<'a> { + /// Current node's capabilities. + pub capabilities: u16, + /// The store (for lookups during handle). + pub store: &'a ServiceStore, + /// Trusted verifiers for checking. + pub trusted_verifiers: &'a TrustedVerifiers, + /// Sender's public key (if known). + pub sender_public_key: Option<[u8; 32]>, +} + +/// Routes messages to appropriate service handlers. +pub struct ServiceRouter { + /// Service ID -> Handler. + handlers: HashMap>, + /// Shared message store. + store: ServiceStore, + /// Node capabilities. + capabilities: u16, + /// Trusted verifiers. + trusted_verifiers: TrustedVerifiers, + /// Minimum verification level to accept announces (0 = any). + min_verification_level: u8, +} + +impl ServiceRouter { + /// Create a new router. + pub fn new(capabilities: u16) -> Self { + Self { + handlers: HashMap::new(), + store: ServiceStore::new(), + capabilities, + trusted_verifiers: TrustedVerifiers::new(), + min_verification_level: 0, + } + } + + /// Register a service handler. + pub fn register(&mut self, handler: Box) { + let id = handler.service_id(); + self.handlers.insert(id, handler); + } + + /// Set trusted verifiers. + pub fn set_trusted_verifiers(&mut self, verifiers: TrustedVerifiers) { + self.trusted_verifiers = verifiers; + } + + /// Set minimum verification level for announces. + pub fn set_min_verification_level(&mut self, level: u8) { + self.min_verification_level = level; + } + + /// Access the store. + pub fn store(&self) -> &ServiceStore { + &self.store + } + + /// Mutable access to store. + pub fn store_mut(&mut self) -> &mut ServiceStore { + &mut self.store + } + + /// Check if a service is registered. + pub fn has_service(&self, service_id: u32) -> bool { + self.handlers.contains_key(&service_id) + } + + /// Handle an incoming message. + pub fn handle( + &mut self, + message: ServiceMessage, + sender_public_key: Option<[u8; 32]>, + ) -> Result { + // Basic validation + if message.is_expired() { + return Err(ServiceError::Expired); + } + + if message.hop_count > message.max_hops { + return Err(ServiceError::MaxHopsExceeded); + } + + // Get handler + let handler = self + .handlers + .get(&message.service_id) + .ok_or(ServiceError::UnknownService(message.service_id))?; + + // Validate message with handler + handler.validate(&message)?; + + // Verify signature if we have public key + if let Some(pk) = &sender_public_key { + if !message.verify(pk) { + return Err(ServiceError::SignatureInvalid); + } + } + + // Check verification level for announces + if message.message_type == MessageType::Announce && self.min_verification_level > 0 { + let level = self + .trusted_verifiers + .highest_level(&message.verifications, &message.sender_address); + if (level as u8) < self.min_verification_level { + return Err(ServiceError::VerificationRequired(self.min_verification_level)); + } + } + + // Build context + let context = HandlerContext { + capabilities: self.capabilities, + store: &self.store, + trusted_verifiers: &self.trusted_verifiers, + sender_public_key, + }; + + // Dispatch to handler + let action = handler.handle(&message, &context)?; + + // Process action + match &action { + ServiceAction::Store | ServiceAction::StoreAndForward => { + if let Some(pk) = sender_public_key { + self.store.store(message, pk); + } + } + _ => {} + } + + Ok(action) + } + + /// Query the store for matching announces. + pub fn query(&self, query: &ServiceMessage) -> Vec<&StoredMessage> { + let Some(handler) = self.handlers.get(&query.service_id) else { + return Vec::new(); + }; + + self.store.query(query.service_id, |stored| { + stored.message.message_type == MessageType::Announce + && handler.matches_query(stored, query) + }) + } + + /// Get handler name for a service. + pub fn service_name(&self, service_id: u32) -> Option<&str> { + self.handlers.get(&service_id).map(|h| h.name()) + } + + /// List registered services. + pub fn services(&self) -> Vec<(u32, &str)> { + self.handlers + .iter() + .map(|(&id, h)| (id, h.name())) + .collect() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{identity::ServiceIdentity, service_ids::FAPP}; + + struct TestHandler; + + impl ServiceHandler for TestHandler { + fn service_id(&self) -> u32 { + FAPP + } + + fn name(&self) -> &str { + "Test" + } + + fn handle( + &self, + message: &ServiceMessage, + _context: &HandlerContext, + ) -> Result { + match message.message_type { + MessageType::Announce => Ok(ServiceAction::StoreAndForward), + MessageType::Query => Ok(ServiceAction::Handled), + _ => Ok(ServiceAction::Drop), + } + } + + fn matches_query(&self, _announce: &StoredMessage, _query: &ServiceMessage) -> bool { + true // Match all for test + } + } + + #[test] + fn register_and_handle() { + let mut router = ServiceRouter::new(crate::capabilities::RELAY); + router.register(Box::new(TestHandler)); + + assert!(router.has_service(FAPP)); + assert_eq!(router.service_name(FAPP), Some("Test")); + + let id = ServiceIdentity::generate(); + let msg = ServiceMessage::announce(&id, FAPP, vec![], 1); + + let action = router.handle(msg.clone(), Some(id.public_key())).unwrap(); + assert!(matches!(action, ServiceAction::StoreAndForward)); + + // Message should be stored + assert_eq!(router.store().len(), 1); + } + + #[test] + fn unknown_service_rejected() { + let mut router = ServiceRouter::new(0); + let id = ServiceIdentity::generate(); + let msg = ServiceMessage::announce(&id, 9999, vec![], 1); + + let result = router.handle(msg, Some(id.public_key())); + assert!(matches!(result, Err(ServiceError::UnknownService(9999)))); + } + + #[test] + fn invalid_signature_rejected() { + let mut router = ServiceRouter::new(0); + router.register(Box::new(TestHandler)); + + let id1 = ServiceIdentity::generate(); + let id2 = ServiceIdentity::generate(); + let msg = ServiceMessage::announce(&id1, FAPP, vec![], 1); + + // Pass wrong public key + let result = router.handle(msg, Some(id2.public_key())); + assert!(matches!(result, Err(ServiceError::SignatureInvalid))); + } +} diff --git a/crates/meshservice/src/services/fapp.rs b/crates/meshservice/src/services/fapp.rs new file mode 100644 index 0000000..762dbf4 --- /dev/null +++ b/crates/meshservice/src/services/fapp.rs @@ -0,0 +1,479 @@ +//! FAPP — Free Appointment Propagation Protocol. +//! +//! Decentralized psychotherapy appointment discovery. +//! +//! ## Flow +//! +//! 1. Therapist announces available slots (specialism, location, modality). +//! 2. Announcement floods through mesh (TTL-limited, signature-verified). +//! 3. Patient queries for matching slots (specialism, distance). +//! 4. Relays respond with cached matches. +//! 5. Patient reserves slot (E2E encrypted to therapist). +//! 6. Therapist confirms/rejects. + +use serde::{Deserialize, Serialize}; + +use crate::error::ServiceError; +use crate::message::{MessageType, ServiceMessage}; +use crate::router::{HandlerContext, ServiceAction, ServiceHandler}; +use crate::service_ids::FAPP; +use crate::store::StoredMessage; +use crate::wire::{decode_payload, encode_payload}; + +/// Therapy specialisms. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[repr(u8)] +pub enum Specialism { + GeneralPsychotherapy = 0x01, + CognitiveBehavioral = 0x02, + Psychoanalysis = 0x03, + SystemicTherapy = 0x04, + TraumaFocused = 0x05, + ChildAndAdolescent = 0x06, + CoupleAndFamily = 0x07, + Addiction = 0x08, + Neuropsychology = 0x09, +} + +impl TryFrom for Specialism { + type Error = (); + + fn try_from(value: u8) -> Result { + match value { + 0x01 => Ok(Self::GeneralPsychotherapy), + 0x02 => Ok(Self::CognitiveBehavioral), + 0x03 => Ok(Self::Psychoanalysis), + 0x04 => Ok(Self::SystemicTherapy), + 0x05 => Ok(Self::TraumaFocused), + 0x06 => Ok(Self::ChildAndAdolescent), + 0x07 => Ok(Self::CoupleAndFamily), + 0x08 => Ok(Self::Addiction), + 0x09 => Ok(Self::Neuropsychology), + _ => Err(()), + } + } +} + +/// Therapy modality. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[repr(u8)] +pub enum Modality { + InPerson = 0x01, + VideoCall = 0x02, + PhoneCall = 0x03, + TextBased = 0x04, +} + +/// Slot announcement payload. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SlotAnnounce { + /// Therapist's specialisms (bitfield). + pub specialisms: u16, + /// Modality (bitfield). + pub modality: u8, + /// Postal code (first 3 digits for privacy). + pub postal_prefix: String, + /// Geohash (6 chars, ~1.2km precision). + #[serde(default, skip_serializing_if = "Option::is_none")] + pub geohash: Option, + /// Available slots count. + pub available_slots: u8, + /// Earliest available date (days from epoch). + pub earliest_days: u16, + /// Insurance types accepted (bitfield). + pub insurance: u8, + /// Optional profile URL for verification. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub profile_url: Option, + /// Optional display name. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub display_name: Option, +} + +impl SlotAnnounce { + /// Create a new announcement. + pub fn new(specialisms: &[Specialism], modality: Modality, postal_prefix: &str) -> Self { + let spec_bits = specialisms.iter().fold(0u16, |acc, s| acc | (1 << (*s as u8))); + + Self { + specialisms: spec_bits, + modality: modality as u8, + postal_prefix: postal_prefix.into(), + geohash: None, + available_slots: 1, + earliest_days: 0, + insurance: 0xFF, // All accepted by default + profile_url: None, + display_name: None, + } + } + + /// Set geohash location. + pub fn with_geohash(mut self, geohash: &str) -> Self { + self.geohash = Some(geohash[..6.min(geohash.len())].into()); + self + } + + /// Set available slots count. + pub fn with_slots(mut self, count: u8) -> Self { + self.available_slots = count; + self + } + + /// Set earliest availability. + pub fn with_earliest(mut self, days_from_now: u16) -> Self { + self.earliest_days = days_from_now; + self + } + + /// Set profile URL. + pub fn with_profile(mut self, url: &str) -> Self { + self.profile_url = Some(url.into()); + self + } + + /// Set display name. + pub fn with_name(mut self, name: &str) -> Self { + self.display_name = Some(name.into()); + self + } + + /// Check if a specialism is offered. + pub fn has_specialism(&self, spec: Specialism) -> bool { + self.specialisms & (1 << (spec as u8)) != 0 + } + + /// Encode to CBOR bytes. + pub fn to_bytes(&self) -> Result, ServiceError> { + encode_payload(self) + } + + /// Decode from CBOR bytes. + pub fn from_bytes(data: &[u8]) -> Result { + decode_payload(data) + } +} + +/// Insurance types. +pub mod insurance { + pub const PRIVATE: u8 = 0x01; + pub const PUBLIC: u8 = 0x02; + pub const SELF_PAY: u8 = 0x04; +} + +/// Slot query payload. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SlotQuery { + /// Desired specialisms (bitfield, any match). + pub specialisms: u16, + /// Postal prefix to search. + pub postal_prefix: String, + /// Max distance in km (optional). + #[serde(default, skip_serializing_if = "Option::is_none")] + pub max_distance_km: Option, + /// Required modality (0 = any). + pub modality: u8, + /// Max wait in days. + pub max_wait_days: u16, + /// Insurance type required. + pub insurance: u8, +} + +impl SlotQuery { + /// Create a query for a specialism in a postal area. + pub fn new(specialism: Specialism, postal_prefix: &str) -> Self { + Self { + specialisms: 1 << (specialism as u8), + postal_prefix: postal_prefix.into(), + max_distance_km: None, + modality: 0, + max_wait_days: 365, + insurance: 0xFF, + } + } + + /// Require specific modality. + pub fn with_modality(mut self, modality: Modality) -> Self { + self.modality = modality as u8; + self + } + + /// Set max wait time. + pub fn with_max_wait(mut self, days: u16) -> Self { + self.max_wait_days = days; + self + } + + /// Check if an announce matches this query. + pub fn matches(&self, announce: &SlotAnnounce) -> bool { + // Specialism overlap + if announce.specialisms & self.specialisms == 0 { + return false; + } + + // Postal prefix + if !announce.postal_prefix.starts_with(&self.postal_prefix) + && !self.postal_prefix.starts_with(&announce.postal_prefix) + { + return false; + } + + // Modality + if self.modality != 0 && announce.modality & self.modality == 0 { + return false; + } + + // Wait time + if announce.earliest_days > self.max_wait_days { + return false; + } + + // Insurance + if announce.insurance & self.insurance == 0 { + return false; + } + + // Available slots + announce.available_slots > 0 + } + + /// Encode to CBOR bytes. + pub fn to_bytes(&self) -> Result, ServiceError> { + encode_payload(self) + } + + /// Decode from CBOR bytes. + pub fn from_bytes(data: &[u8]) -> Result { + decode_payload(data) + } +} + +/// FAPP service handler. +pub struct FappService { + /// Whether this node is a therapist (can announce). + pub is_provider: bool, + /// Whether this node relays FAPP messages. + pub is_relay: bool, +} + +impl FappService { + /// Create a new FAPP handler. + pub fn new(is_provider: bool, is_relay: bool) -> Self { + Self { + is_provider, + is_relay, + } + } + + /// Create a relay-only handler. + pub fn relay() -> Self { + Self::new(false, true) + } + + /// Create a provider handler. + pub fn provider() -> Self { + Self::new(true, true) + } +} + +impl ServiceHandler for FappService { + fn service_id(&self) -> u32 { + FAPP + } + + fn name(&self) -> &str { + "FAPP" + } + + fn handle( + &self, + message: &ServiceMessage, + context: &HandlerContext, + ) -> Result { + match message.message_type { + MessageType::Announce => { + // Validate payload + let _announce = SlotAnnounce::from_bytes(&message.payload)?; + + // Store and forward if we're a relay + if self.is_relay { + Ok(ServiceAction::StoreAndForward) + } else { + Ok(ServiceAction::Store) + } + } + + MessageType::Query => { + // Parse query + let query = SlotQuery::from_bytes(&message.payload)?; + + // Find matches in store + let matches: Vec<_> = context + .store + .by_service(FAPP) + .into_iter() + .filter(|stored| { + if stored.message.message_type != MessageType::Announce { + return false; + } + if let Ok(announce) = SlotAnnounce::from_bytes(&stored.message.payload) { + query.matches(&announce) + } else { + false + } + }) + .collect(); + + // If we have matches, we could respond (simplified for now) + if !matches.is_empty() { + // In a real impl, we'd aggregate and send response + Ok(ServiceAction::Handled) + } else if self.is_relay { + Ok(ServiceAction::ForwardOnly) + } else { + Ok(ServiceAction::Handled) + } + } + + MessageType::Reserve | MessageType::Confirm | MessageType::Cancel => { + // E2E encrypted, just forward + if self.is_relay { + Ok(ServiceAction::ForwardOnly) + } else { + Ok(ServiceAction::Handled) + } + } + + MessageType::Revoke => { + // Remove from store + Ok(ServiceAction::Handled) + } + + _ => Ok(ServiceAction::Drop), + } + } + + fn validate(&self, message: &ServiceMessage) -> Result<(), ServiceError> { + match message.message_type { + MessageType::Announce => { + SlotAnnounce::from_bytes(&message.payload)?; + } + MessageType::Query => { + SlotQuery::from_bytes(&message.payload)?; + } + _ => {} + } + Ok(()) + } + + fn matches_query(&self, announce: &StoredMessage, query_msg: &ServiceMessage) -> bool { + let Ok(announce_data) = SlotAnnounce::from_bytes(&announce.message.payload) else { + return false; + }; + let Ok(query) = SlotQuery::from_bytes(&query_msg.payload) else { + return false; + }; + query.matches(&announce_data) + } +} + +/// Helper to create a FAPP announce message. +pub fn create_announce( + identity: &crate::ServiceIdentity, + announce: &SlotAnnounce, + sequence: u64, +) -> Result { + let payload = announce.to_bytes()?; + Ok(ServiceMessage::announce(identity, FAPP, payload, sequence)) +} + +/// Helper to create a FAPP query message. +pub fn create_query( + identity: &crate::ServiceIdentity, + query: &SlotQuery, +) -> Result { + let payload = query.to_bytes()?; + Ok(ServiceMessage::query(identity, FAPP, payload)) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::identity::ServiceIdentity; + + #[test] + fn slot_announce_roundtrip() { + let announce = SlotAnnounce::new( + &[Specialism::CognitiveBehavioral, Specialism::TraumaFocused], + Modality::VideoCall, + "104", + ) + .with_slots(3) + .with_profile("https://therapists.de/dr-mueller"); + + let bytes = announce.to_bytes().unwrap(); + let decoded = SlotAnnounce::from_bytes(&bytes).unwrap(); + + assert!(decoded.has_specialism(Specialism::CognitiveBehavioral)); + assert!(decoded.has_specialism(Specialism::TraumaFocused)); + assert!(!decoded.has_specialism(Specialism::Addiction)); + assert_eq!(decoded.available_slots, 3); + assert_eq!( + decoded.profile_url, + Some("https://therapists.de/dr-mueller".into()) + ); + } + + #[test] + fn query_matches_announce() { + let announce = SlotAnnounce::new( + &[Specialism::CognitiveBehavioral], + Modality::InPerson, + "104", + ) + .with_slots(2); + + let matching_query = SlotQuery::new(Specialism::CognitiveBehavioral, "104"); + assert!(matching_query.matches(&announce)); + + let wrong_spec = SlotQuery::new(Specialism::Addiction, "104"); + assert!(!wrong_spec.matches(&announce)); + + let wrong_location = SlotQuery::new(Specialism::CognitiveBehavioral, "200"); + assert!(!wrong_location.matches(&announce)); + } + + #[test] + fn create_message_helpers() { + let id = ServiceIdentity::generate(); + + let announce = SlotAnnounce::new(&[Specialism::GeneralPsychotherapy], Modality::VideoCall, "10"); + let msg = create_announce(&id, &announce, 1).unwrap(); + assert_eq!(msg.service_id, FAPP); + assert_eq!(msg.message_type, MessageType::Announce); + + let query = SlotQuery::new(Specialism::GeneralPsychotherapy, "10"); + let msg = create_query(&id, &query).unwrap(); + assert_eq!(msg.service_id, FAPP); + assert_eq!(msg.message_type, MessageType::Query); + } + + #[test] + fn fapp_handler_processes_announce() { + use crate::router::ServiceRouter; + use crate::capabilities; + + let mut router = ServiceRouter::new(capabilities::RELAY); + router.register(Box::new(FappService::relay())); + + let id = ServiceIdentity::generate(); + let announce = SlotAnnounce::new(&[Specialism::TraumaFocused], Modality::InPerson, "100"); + let msg = create_announce(&id, &announce, 1).unwrap(); + + let action = router.handle(msg.clone(), Some(id.public_key())).unwrap(); + assert!(matches!(action, ServiceAction::StoreAndForward)); + + // Should be stored + assert_eq!(router.store().service_count(FAPP), 1); + } +} diff --git a/crates/meshservice/src/services/housing.rs b/crates/meshservice/src/services/housing.rs new file mode 100644 index 0000000..f0ee47a --- /dev/null +++ b/crates/meshservice/src/services/housing.rs @@ -0,0 +1,489 @@ +//! Housing Service — Decentralized room/apartment sharing. +//! +//! Demonstrates how a second service can be built on the mesh layer. +//! +//! ## Flow +//! +//! 1. Landlord announces available room (type, size, price, location). +//! 2. Announcement floods through mesh. +//! 3. Seeker queries for matching listings. +//! 4. Relays respond with cached matches. +//! 5. Seeker reserves viewing slot (E2E encrypted). +//! 6. Landlord confirms/rejects. + +use serde::{Deserialize, Serialize}; + +use crate::error::ServiceError; +use crate::message::{MessageType, ServiceMessage}; +use crate::router::{HandlerContext, ServiceAction, ServiceHandler}; +use crate::service_ids::HOUSING; +use crate::store::StoredMessage; +use crate::wire::{decode_payload, encode_payload}; + +/// Listing type. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[repr(u8)] +pub enum ListingType { + Room = 0x01, + SharedFlat = 0x02, + Apartment = 0x03, + House = 0x04, + Studio = 0x05, + Sublet = 0x06, +} + +impl TryFrom for ListingType { + type Error = (); + + fn try_from(value: u8) -> Result { + match value { + 0x01 => Ok(Self::Room), + 0x02 => Ok(Self::SharedFlat), + 0x03 => Ok(Self::Apartment), + 0x04 => Ok(Self::House), + 0x05 => Ok(Self::Studio), + 0x06 => Ok(Self::Sublet), + _ => Err(()), + } + } +} + +/// Amenities bitfield. +pub mod amenities { + pub const FURNISHED: u16 = 0x0001; + pub const BALCONY: u16 = 0x0002; + pub const PARKING: u16 = 0x0004; + pub const PETS_ALLOWED: u16 = 0x0008; + pub const WASHING_MACHINE: u16 = 0x0010; + pub const DISHWASHER: u16 = 0x0020; + pub const ELEVATOR: u16 = 0x0040; + pub const GARDEN: u16 = 0x0080; + pub const INTERNET: u16 = 0x0100; + pub const HEATING_INCLUDED: u16 = 0x0200; +} + +/// Room/listing announcement. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ListingAnnounce { + /// Type of listing. + pub listing_type: u8, + /// Size in square meters. + pub size_sqm: u16, + /// Monthly rent in cents (EUR). + pub rent_cents: u32, + /// Postal prefix (3 digits). + pub postal_prefix: String, + /// Geohash for location (6 chars). + #[serde(default, skip_serializing_if = "Option::is_none")] + pub geohash: Option, + /// Number of rooms (0 for studio). + pub rooms: u8, + /// Available from (days from epoch). + pub available_from_days: u16, + /// Minimum rental period in months (0 = unlimited). + pub min_months: u8, + /// Maximum rental period in months (0 = unlimited). + pub max_months: u8, + /// Amenities bitfield. + pub amenities: u16, + /// Optional title. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub title: Option, + /// Optional external listing URL. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub listing_url: Option, +} + +impl ListingAnnounce { + /// Create a new listing. + pub fn new(listing_type: ListingType, size_sqm: u16, rent_euros: u32, postal_prefix: &str) -> Self { + Self { + listing_type: listing_type as u8, + size_sqm, + rent_cents: rent_euros * 100, + postal_prefix: postal_prefix.into(), + geohash: None, + rooms: 1, + available_from_days: 0, + min_months: 0, + max_months: 0, + amenities: 0, + title: None, + listing_url: None, + } + } + + /// Set rooms count. + pub fn with_rooms(mut self, rooms: u8) -> Self { + self.rooms = rooms; + self + } + + /// Set geohash. + pub fn with_geohash(mut self, geohash: &str) -> Self { + self.geohash = Some(geohash[..6.min(geohash.len())].into()); + self + } + + /// Set amenities. + pub fn with_amenities(mut self, amenities: u16) -> Self { + self.amenities = amenities; + self + } + + /// Set title. + pub fn with_title(mut self, title: &str) -> Self { + self.title = Some(title.into()); + self + } + + /// Set minimum/maximum rental period. + pub fn with_term(mut self, min_months: u8, max_months: u8) -> Self { + self.min_months = min_months; + self.max_months = max_months; + self + } + + /// Check if has amenity. + pub fn has_amenity(&self, amenity: u16) -> bool { + self.amenities & amenity != 0 + } + + /// Get rent in euros. + pub fn rent_euros(&self) -> u32 { + self.rent_cents / 100 + } + + /// Encode to CBOR. + pub fn to_bytes(&self) -> Result, ServiceError> { + encode_payload(self) + } + + /// Decode from CBOR. + pub fn from_bytes(data: &[u8]) -> Result { + decode_payload(data) + } +} + +/// Housing query. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ListingQuery { + /// Desired listing types (bitfield). + pub listing_types: u8, + /// Postal prefix. + pub postal_prefix: String, + /// Min size in sqm. + pub min_size_sqm: u16, + /// Max rent in cents. + pub max_rent_cents: u32, + /// Min rooms. + pub min_rooms: u8, + /// Required amenities (all must match). + pub required_amenities: u16, + /// Max move-in days. + pub max_move_in_days: u16, +} + +impl ListingQuery { + /// Create a simple query. + pub fn new(postal_prefix: &str, max_rent_euros: u32) -> Self { + Self { + listing_types: 0xFF, // Any type + postal_prefix: postal_prefix.into(), + min_size_sqm: 0, + max_rent_cents: max_rent_euros * 100, + min_rooms: 0, + required_amenities: 0, + max_move_in_days: 365, + } + } + + /// Filter by type. + pub fn with_type(mut self, listing_type: ListingType) -> Self { + self.listing_types = 1 << (listing_type as u8); + self + } + + /// Require minimum size. + pub fn with_min_size(mut self, sqm: u16) -> Self { + self.min_size_sqm = sqm; + self + } + + /// Require minimum rooms. + pub fn with_min_rooms(mut self, rooms: u8) -> Self { + self.min_rooms = rooms; + self + } + + /// Require amenities. + pub fn with_amenities(mut self, amenities: u16) -> Self { + self.required_amenities = amenities; + self + } + + /// Check if listing matches. + pub fn matches(&self, listing: &ListingAnnounce) -> bool { + // Type match + if self.listing_types != 0xFF && (self.listing_types & (1 << listing.listing_type) == 0) { + return false; + } + + // Location + if !listing.postal_prefix.starts_with(&self.postal_prefix) + && !self.postal_prefix.starts_with(&listing.postal_prefix) + { + return false; + } + + // Size + if listing.size_sqm < self.min_size_sqm { + return false; + } + + // Rent + if listing.rent_cents > self.max_rent_cents { + return false; + } + + // Rooms + if listing.rooms < self.min_rooms { + return false; + } + + // Amenities (all required must be present) + if listing.amenities & self.required_amenities != self.required_amenities { + return false; + } + + // Availability + listing.available_from_days <= self.max_move_in_days + } + + /// Encode to CBOR. + pub fn to_bytes(&self) -> Result, ServiceError> { + encode_payload(self) + } + + /// Decode from CBOR. + pub fn from_bytes(data: &[u8]) -> Result { + decode_payload(data) + } +} + +/// Housing service handler. +pub struct HousingService { + pub is_provider: bool, + pub is_relay: bool, +} + +impl HousingService { + /// Create a new handler. + pub fn new(is_provider: bool, is_relay: bool) -> Self { + Self { + is_provider, + is_relay, + } + } + + /// Create a relay-only handler. + pub fn relay() -> Self { + Self::new(false, true) + } + + /// Create a provider handler. + pub fn provider() -> Self { + Self::new(true, true) + } +} + +impl ServiceHandler for HousingService { + fn service_id(&self) -> u32 { + HOUSING + } + + fn name(&self) -> &str { + "Housing" + } + + fn handle( + &self, + message: &ServiceMessage, + context: &HandlerContext, + ) -> Result { + match message.message_type { + MessageType::Announce => { + let _listing = ListingAnnounce::from_bytes(&message.payload)?; + + if self.is_relay { + Ok(ServiceAction::StoreAndForward) + } else { + Ok(ServiceAction::Store) + } + } + + MessageType::Query => { + let query = ListingQuery::from_bytes(&message.payload)?; + + let _matches: Vec<_> = context + .store + .by_service(HOUSING) + .into_iter() + .filter(|stored| { + if stored.message.message_type != MessageType::Announce { + return false; + } + if let Ok(listing) = ListingAnnounce::from_bytes(&stored.message.payload) { + query.matches(&listing) + } else { + false + } + }) + .collect(); + + if self.is_relay { + Ok(ServiceAction::ForwardOnly) + } else { + Ok(ServiceAction::Handled) + } + } + + MessageType::Reserve | MessageType::Confirm | MessageType::Cancel => { + if self.is_relay { + Ok(ServiceAction::ForwardOnly) + } else { + Ok(ServiceAction::Handled) + } + } + + MessageType::Revoke => Ok(ServiceAction::Handled), + + _ => Ok(ServiceAction::Drop), + } + } + + fn validate(&self, message: &ServiceMessage) -> Result<(), ServiceError> { + match message.message_type { + MessageType::Announce => { + ListingAnnounce::from_bytes(&message.payload)?; + } + MessageType::Query => { + ListingQuery::from_bytes(&message.payload)?; + } + _ => {} + } + Ok(()) + } + + fn matches_query(&self, listing: &StoredMessage, query_msg: &ServiceMessage) -> bool { + let Ok(listing_data) = ListingAnnounce::from_bytes(&listing.message.payload) else { + return false; + }; + let Ok(query) = ListingQuery::from_bytes(&query_msg.payload) else { + return false; + }; + query.matches(&listing_data) + } +} + +/// Helper to create a housing announce. +pub fn create_announce( + identity: &crate::ServiceIdentity, + listing: &ListingAnnounce, + sequence: u64, +) -> Result { + let payload = listing.to_bytes()?; + Ok(ServiceMessage::announce(identity, HOUSING, payload, sequence)) +} + +/// Helper to create a housing query. +pub fn create_query( + identity: &crate::ServiceIdentity, + query: &ListingQuery, +) -> Result { + let payload = query.to_bytes()?; + Ok(ServiceMessage::query(identity, HOUSING, payload)) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::identity::ServiceIdentity; + + #[test] + fn listing_roundtrip() { + let listing = ListingAnnounce::new(ListingType::Apartment, 65, 850, "104") + .with_rooms(2) + .with_amenities(amenities::FURNISHED | amenities::BALCONY) + .with_title("Cozy 2-room in Kreuzberg"); + + let bytes = listing.to_bytes().unwrap(); + let decoded = ListingAnnounce::from_bytes(&bytes).unwrap(); + + assert_eq!(decoded.size_sqm, 65); + assert_eq!(decoded.rent_euros(), 850); + assert_eq!(decoded.rooms, 2); + assert!(decoded.has_amenity(amenities::FURNISHED)); + assert!(decoded.has_amenity(amenities::BALCONY)); + assert!(!decoded.has_amenity(amenities::PARKING)); + } + + #[test] + fn query_matches() { + let listing = ListingAnnounce::new(ListingType::Apartment, 50, 700, "104") + .with_rooms(2) + .with_amenities(amenities::FURNISHED); + + // Basic match + let query = ListingQuery::new("104", 800); + assert!(query.matches(&listing)); + + // Too expensive for query + let cheap_query = ListingQuery::new("104", 500); + assert!(!cheap_query.matches(&listing)); + + // Wrong location + let wrong_loc = ListingQuery::new("200", 800); + assert!(!wrong_loc.matches(&listing)); + + // Size requirement + let big_query = ListingQuery::new("104", 800).with_min_size(60); + assert!(!big_query.matches(&listing)); + + // Amenity requirement + let needs_parking = ListingQuery::new("104", 800).with_amenities(amenities::PARKING); + assert!(!needs_parking.matches(&listing)); + } + + #[test] + fn create_message_helpers() { + let id = ServiceIdentity::generate(); + + let listing = ListingAnnounce::new(ListingType::Room, 20, 400, "100"); + let msg = create_announce(&id, &listing, 1).unwrap(); + assert_eq!(msg.service_id, HOUSING); + assert_eq!(msg.message_type, MessageType::Announce); + + let query = ListingQuery::new("100", 500); + let msg = create_query(&id, &query).unwrap(); + assert_eq!(msg.service_id, HOUSING); + assert_eq!(msg.message_type, MessageType::Query); + } + + #[test] + fn housing_handler_processes_listing() { + use crate::capabilities; + use crate::router::ServiceRouter; + + let mut router = ServiceRouter::new(capabilities::RELAY); + router.register(Box::new(HousingService::relay())); + + let id = ServiceIdentity::generate(); + let listing = ListingAnnounce::new(ListingType::SharedFlat, 15, 350, "100"); + let msg = create_announce(&id, &listing, 1).unwrap(); + + let action = router.handle(msg, Some(id.public_key())).unwrap(); + assert!(matches!(action, ServiceAction::StoreAndForward)); + assert_eq!(router.store().service_count(HOUSING), 1); + } +} diff --git a/crates/meshservice/src/services/mod.rs b/crates/meshservice/src/services/mod.rs new file mode 100644 index 0000000..758e6a2 --- /dev/null +++ b/crates/meshservice/src/services/mod.rs @@ -0,0 +1,4 @@ +//! Built-in service implementations. + +pub mod fapp; +pub mod housing; diff --git a/crates/meshservice/src/store.rs b/crates/meshservice/src/store.rs new file mode 100644 index 0000000..70a9131 --- /dev/null +++ b/crates/meshservice/src/store.rs @@ -0,0 +1,406 @@ +//! In-memory message store with eviction policies. + +use std::collections::HashMap; +use std::time::{SystemTime, UNIX_EPOCH}; + +use crate::message::ServiceMessage; + +/// Configuration for the message store. +#[derive(Debug, Clone)] +pub struct StoreConfig { + /// Maximum messages per service. + pub max_per_service: usize, + /// Maximum messages per sender (per service). + pub max_per_sender: usize, + /// Maximum total messages. + pub max_total: usize, + /// Prune interval in seconds. + pub prune_interval_secs: u64, +} + +impl Default for StoreConfig { + fn default() -> Self { + Self { + max_per_service: 10_000, + max_per_sender: 100, + max_total: 50_000, + prune_interval_secs: 300, + } + } +} + +/// A stored message with metadata. +#[derive(Debug, Clone)] +pub struct StoredMessage { + pub message: ServiceMessage, + /// Sender's public key (needed for verification). + pub sender_public_key: [u8; 32], + /// When we stored this message. + pub stored_at: u64, +} + +/// Generic service message store. +/// +/// Organized by service_id, then by sender_address, then by message_id. +pub struct ServiceStore { + config: StoreConfig, + /// service_id -> sender_address -> message_id -> StoredMessage + messages: HashMap>>, + /// Total message count. + total_count: usize, + /// Last prune timestamp. + last_prune: u64, +} + +impl ServiceStore { + /// Create a new store with default config. + pub fn new() -> Self { + Self::with_config(StoreConfig::default()) + } + + /// Create with custom config. + pub fn with_config(config: StoreConfig) -> Self { + Self { + config, + messages: HashMap::new(), + total_count: 0, + last_prune: 0, + } + } + + /// Store a message, returning true if it was new. + pub fn store(&mut self, message: ServiceMessage, sender_public_key: [u8; 32]) -> bool { + // Prune if interval passed + self.maybe_prune(); + + let service_id = message.service_id; + let sender_address = message.sender_address; + let message_id = message.id; + + // Check per-service limit and evict if needed + { + let service_count: usize = self.messages + .get(&service_id) + .map(|s| s.values().map(|m| m.len()).sum()) + .unwrap_or(0); + if service_count >= self.config.max_per_service { + self.evict_oldest_in_service(service_id); + } + } + + // Check per-sender limit and evict if needed + { + let sender_count = self.messages + .get(&service_id) + .and_then(|s| s.get(&sender_address)) + .map(|m| m.len()) + .unwrap_or(0); + if sender_count >= self.config.max_per_sender { + self.evict_oldest_from_sender(service_id, sender_address); + } + } + + // Get or create maps + let service_map = self.messages.entry(service_id).or_default(); + let sender_map = service_map.entry(sender_address).or_default(); + + // Check for existing message + let is_new_or_update = if let Some(existing) = sender_map.get(&message_id) { + // Existing: only update if higher sequence + if message.sequence <= existing.message.sequence { + return false; + } + // This is an update, not a new message + false + } else { + // New message + true + }; + + let stored_at = now(); + sender_map.insert( + message_id, + StoredMessage { + message, + sender_public_key, + stored_at, + }, + ); + + if is_new_or_update { + self.total_count += 1; + } + + // Return true for both new messages and updates + true + } + + /// Get a message by service, sender, and ID. + pub fn get( + &self, + service_id: u32, + sender_address: &[u8; 16], + message_id: &[u8; 16], + ) -> Option<&StoredMessage> { + self.messages + .get(&service_id)? + .get(sender_address)? + .get(message_id) + } + + /// Get all messages from a sender in a service. + pub fn by_sender(&self, service_id: u32, sender_address: &[u8; 16]) -> Vec<&StoredMessage> { + self.messages + .get(&service_id) + .and_then(|s| s.get(sender_address)) + .map(|m| m.values().collect()) + .unwrap_or_default() + } + + /// Get all messages in a service. + pub fn by_service(&self, service_id: u32) -> Vec<&StoredMessage> { + self.messages + .get(&service_id) + .map(|s| s.values().flat_map(|m| m.values()).collect()) + .unwrap_or_default() + } + + /// Query messages with a predicate. + pub fn query(&self, service_id: u32, predicate: F) -> Vec<&StoredMessage> + where + F: Fn(&StoredMessage) -> bool, + { + self.by_service(service_id) + .into_iter() + .filter(|m| predicate(m)) + .collect() + } + + /// Remove a specific message. + pub fn remove( + &mut self, + service_id: u32, + sender_address: &[u8; 16], + message_id: &[u8; 16], + ) -> Option { + let result = self + .messages + .get_mut(&service_id)? + .get_mut(sender_address)? + .remove(message_id); + + if result.is_some() { + self.total_count = self.total_count.saturating_sub(1); + } + + result + } + + /// Remove all messages from a sender. + pub fn remove_sender(&mut self, service_id: u32, sender_address: &[u8; 16]) -> usize { + let count = self + .messages + .get_mut(&service_id) + .and_then(|s| s.remove(sender_address)) + .map(|m| m.len()) + .unwrap_or(0); + + self.total_count = self.total_count.saturating_sub(count); + count + } + + /// Prune expired messages. + pub fn prune_expired(&mut self) -> usize { + let now = now(); + let mut removed = 0; + + for service_map in self.messages.values_mut() { + for sender_map in service_map.values_mut() { + let expired: Vec<[u8; 16]> = sender_map + .iter() + .filter(|(_, m)| m.message.is_expired()) + .map(|(id, _)| *id) + .collect(); + + for id in expired { + sender_map.remove(&id); + removed += 1; + } + } + } + + self.total_count = self.total_count.saturating_sub(removed); + self.last_prune = now; + removed + } + + /// Get total message count. + pub fn len(&self) -> usize { + self.total_count + } + + /// Check if empty. + pub fn is_empty(&self) -> bool { + self.total_count == 0 + } + + /// Get count by service. + pub fn service_count(&self, service_id: u32) -> usize { + self.messages + .get(&service_id) + .map(|s| s.values().map(|m| m.len()).sum()) + .unwrap_or(0) + } + + /// Run prune if interval passed. + fn maybe_prune(&mut self) { + let now = now(); + if now.saturating_sub(self.last_prune) >= self.config.prune_interval_secs { + self.prune_expired(); + } + } + + /// Evict oldest message in a service. + fn evict_oldest_in_service(&mut self, service_id: u32) { + let Some(service_map) = self.messages.get_mut(&service_id) else { + return; + }; + + let mut oldest: Option<([u8; 16], [u8; 16], u64)> = None; + + for (sender, msgs) in service_map.iter() { + for (id, stored) in msgs.iter() { + match oldest { + Some((_, _, ts)) if stored.message.timestamp < ts => { + oldest = Some((*sender, *id, stored.message.timestamp)); + } + None => { + oldest = Some((*sender, *id, stored.message.timestamp)); + } + _ => {} + } + } + } + + if let Some((sender, id, _)) = oldest { + if let Some(sender_map) = service_map.get_mut(&sender) { + sender_map.remove(&id); + self.total_count = self.total_count.saturating_sub(1); + } + } + } + + /// Evict oldest message from a sender. + fn evict_oldest_from_sender(&mut self, service_id: u32, sender_address: [u8; 16]) { + let Some(sender_map) = self + .messages + .get_mut(&service_id) + .and_then(|s| s.get_mut(&sender_address)) + else { + return; + }; + + let oldest = sender_map + .iter() + .min_by_key(|(_, m)| m.message.timestamp) + .map(|(id, _)| *id); + + if let Some(id) = oldest { + sender_map.remove(&id); + self.total_count = self.total_count.saturating_sub(1); + } + } +} + +impl Default for ServiceStore { + fn default() -> Self { + Self::new() + } +} + +fn now() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs() +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{identity::ServiceIdentity, message::ServiceMessage, service_ids::FAPP}; + + fn make_message(id: &ServiceIdentity, seq: u64) -> ServiceMessage { + ServiceMessage::announce(id, FAPP, b"test".to_vec(), seq) + } + + #[test] + fn store_and_retrieve() { + let mut store = ServiceStore::new(); + let id = ServiceIdentity::generate(); + let msg = make_message(&id, 1); + + assert!(store.store(msg.clone(), id.public_key())); + assert_eq!(store.len(), 1); + + let retrieved = store.get(FAPP, &id.address(), &msg.id); + assert!(retrieved.is_some()); + } + + #[test] + fn duplicate_rejected() { + let mut store = ServiceStore::new(); + let id = ServiceIdentity::generate(); + let msg = make_message(&id, 1); + + assert!(store.store(msg.clone(), id.public_key())); + assert!(!store.store(msg.clone(), id.public_key())); // Duplicate + assert_eq!(store.len(), 1); + } + + #[test] + fn higher_sequence_updates() { + let mut store = ServiceStore::new(); + let id = ServiceIdentity::generate(); + let msg1 = make_message(&id, 1); + let mut msg2 = make_message(&id, 2); + msg2.id = msg1.id; // Same ID + + store.store(msg1.clone(), id.public_key()); + assert!(store.store(msg2.clone(), id.public_key())); // Updates + + let retrieved = store.get(FAPP, &id.address(), &msg1.id).unwrap(); + assert_eq!(retrieved.message.sequence, 2); + } + + #[test] + fn query_by_sender() { + let mut store = ServiceStore::new(); + let id1 = ServiceIdentity::generate(); + let id2 = ServiceIdentity::generate(); + + store.store(make_message(&id1, 1), id1.public_key()); + store.store(make_message(&id1, 2), id1.public_key()); + store.store(make_message(&id2, 1), id2.public_key()); + + let sender1_msgs = store.by_sender(FAPP, &id1.address()); + assert_eq!(sender1_msgs.len(), 2); + + let sender2_msgs = store.by_sender(FAPP, &id2.address()); + assert_eq!(sender2_msgs.len(), 1); + } + + #[test] + fn remove_sender() { + let mut store = ServiceStore::new(); + let id = ServiceIdentity::generate(); + + store.store(make_message(&id, 1), id.public_key()); + store.store(make_message(&id, 2), id.public_key()); + assert_eq!(store.len(), 2); + + let removed = store.remove_sender(FAPP, &id.address()); + assert_eq!(removed, 2); + assert_eq!(store.len(), 0); + } +} diff --git a/crates/meshservice/src/verification.rs b/crates/meshservice/src/verification.rs new file mode 100644 index 0000000..dc6da85 --- /dev/null +++ b/crates/meshservice/src/verification.rs @@ -0,0 +1,290 @@ +//! Verification framework for building trust in decentralized services. +//! +//! Verification levels: +//! - 0: None (bare announce) +//! - 1: Self-asserted (profile URL, metadata) +//! - 2: Endorsed by trusted peers +//! - 3: Registry-verified (KBV for therapists, trade registry for craftsmen) + +use serde::{Deserialize, Serialize}; + +use crate::identity::ServiceIdentity; + +/// Verification levels (higher = more trusted). +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Default)] +#[repr(u8)] +pub enum VerificationLevel { + #[default] + None = 0, + SelfAsserted = 1, + PeerEndorsed = 2, + RegistryVerified = 3, +} + +impl From for VerificationLevel { + fn from(value: u8) -> Self { + match value { + 1 => VerificationLevel::SelfAsserted, + 2 => VerificationLevel::PeerEndorsed, + 3.. => VerificationLevel::RegistryVerified, + _ => VerificationLevel::None, + } + } +} + +/// A verification attestation attached to a service message. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Verification { + /// Verification level. + pub level: u8, + /// Verifier's mesh address. + pub verifier_address: [u8; 16], + /// What is being verified (e.g., "license", "identity"). + pub claim: String, + /// Optional external reference (URL, registry ID). + #[serde(default, skip_serializing_if = "Option::is_none")] + pub reference: Option, + /// Signature over (level || sender_address || claim). + pub signature: Vec, + /// Timestamp of verification. + pub timestamp: u64, + /// Optional expiry timestamp. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub expires: Option, +} + +impl Verification { + /// Create a new peer endorsement. + pub fn peer_endorsement( + verifier: &ServiceIdentity, + subject_address: &[u8; 16], + claim: impl Into, + ) -> Self { + Self::new( + verifier, + VerificationLevel::PeerEndorsed, + subject_address, + claim, + None, + ) + } + + /// Create a registry verification. + pub fn registry( + verifier: &ServiceIdentity, + subject_address: &[u8; 16], + claim: impl Into, + reference: impl Into, + ) -> Self { + Self::new( + verifier, + VerificationLevel::RegistryVerified, + subject_address, + claim, + Some(reference.into()), + ) + } + + /// Create a new verification. + pub fn new( + verifier: &ServiceIdentity, + level: VerificationLevel, + subject_address: &[u8; 16], + claim: impl Into, + reference: Option, + ) -> Self { + use std::time::{SystemTime, UNIX_EPOCH}; + + let claim = claim.into(); + let timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + + let signable = Self::signable_bytes(level as u8, subject_address, &claim); + let signature = verifier.sign(&signable).to_vec(); + + Self { + level: level as u8, + verifier_address: verifier.address(), + claim, + reference, + signature, + timestamp, + expires: None, + } + } + + /// Set expiry time. + pub fn with_expiry(mut self, expires: u64) -> Self { + self.expires = Some(expires); + self + } + + /// Create signable bytes. + fn signable_bytes(level: u8, subject_address: &[u8; 16], claim: &str) -> Vec { + let mut buf = Vec::with_capacity(17 + claim.len()); + buf.push(level); + buf.extend_from_slice(subject_address); + buf.extend_from_slice(claim.as_bytes()); + buf + } + + /// Verify this attestation. + pub fn verify(&self, verifier_public_key: &[u8; 32], subject_address: &[u8; 16]) -> bool { + use crate::identity::compute_address; + + // Verify verifier address matches key + if compute_address(verifier_public_key) != self.verifier_address { + return false; + } + + // Check expiry + if let Some(expires) = self.expires { + use std::time::{SystemTime, UNIX_EPOCH}; + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + if now > expires { + return false; + } + } + + let sig: [u8; 64] = match self.signature.as_slice().try_into() { + Ok(s) => s, + Err(_) => return false, + }; + + let signable = Self::signable_bytes(self.level, subject_address, &self.claim); + ServiceIdentity::verify(verifier_public_key, &signable, &sig) + } +} + +/// Set of known trusted verifiers (registries, endorsers). +#[derive(Default)] +pub struct TrustedVerifiers { + /// Known public keys with their trust level. + verifiers: Vec, +} + +/// A trusted verifier entry. +#[derive(Clone)] +pub struct TrustedVerifier { + pub public_key: [u8; 32], + pub address: [u8; 16], + pub name: String, + pub max_level: VerificationLevel, +} + +impl TrustedVerifiers { + /// Create empty set. + pub fn new() -> Self { + Self::default() + } + + /// Add a trusted verifier. + pub fn add( + &mut self, + public_key: [u8; 32], + name: impl Into, + max_level: VerificationLevel, + ) { + use crate::identity::compute_address; + + self.verifiers.push(TrustedVerifier { + public_key, + address: compute_address(&public_key), + name: name.into(), + max_level, + }); + } + + /// Find a verifier by address. + pub fn find_by_address(&self, address: &[u8; 16]) -> Option<&TrustedVerifier> { + self.verifiers.iter().find(|v| &v.address == address) + } + + /// Verify a verification against known trusted verifiers. + /// Returns the effective level (or 0 if not trusted). + pub fn check(&self, verification: &Verification, subject_address: &[u8; 16]) -> u8 { + let Some(verifier) = self.find_by_address(&verification.verifier_address) else { + return 0; + }; + + // Level cannot exceed verifier's max + let claimed_level = verification.level.min(verifier.max_level as u8); + + // Actually verify the signature + if verification.verify(&verifier.public_key, subject_address) { + claimed_level + } else { + 0 + } + } + + /// Get the highest trusted verification level from a list. + pub fn highest_level( + &self, + verifications: &[Verification], + subject_address: &[u8; 16], + ) -> VerificationLevel { + verifications + .iter() + .map(|v| self.check(v, subject_address)) + .max() + .map(VerificationLevel::from) + .unwrap_or(VerificationLevel::None) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn peer_endorsement_roundtrip() { + let verifier = ServiceIdentity::generate(); + let subject_address = [1u8; 16]; + + let v = Verification::peer_endorsement(&verifier, &subject_address, "good_actor"); + assert!(v.verify(&verifier.public_key(), &subject_address)); + assert_eq!(v.level, VerificationLevel::PeerEndorsed as u8); + } + + #[test] + fn trusted_verifiers_check() { + let verifier = ServiceIdentity::generate(); + let subject_address = [2u8; 16]; + + let mut trusted = TrustedVerifiers::new(); + trusted.add(verifier.public_key(), "Test Registry", VerificationLevel::RegistryVerified); + + let v = Verification::registry(&verifier, &subject_address, "licensed", "REG-12345"); + let level = trusted.check(&v, &subject_address); + assert_eq!(level, VerificationLevel::RegistryVerified as u8); + } + + #[test] + fn untrusted_verifier_returns_zero() { + let verifier = ServiceIdentity::generate(); + let subject_address = [3u8; 16]; + + let trusted = TrustedVerifiers::new(); // Empty + + let v = Verification::registry(&verifier, &subject_address, "licensed", "REG-999"); + let level = trusted.check(&v, &subject_address); + assert_eq!(level, 0); + } + + #[test] + fn expired_verification_fails() { + let verifier = ServiceIdentity::generate(); + let subject_address = [4u8; 16]; + + let v = Verification::peer_endorsement(&verifier, &subject_address, "trusted") + .with_expiry(1); // Expired in 1970 + + assert!(!v.verify(&verifier.public_key(), &subject_address)); + } +} diff --git a/crates/meshservice/src/wire.rs b/crates/meshservice/src/wire.rs new file mode 100644 index 0000000..535db1d --- /dev/null +++ b/crates/meshservice/src/wire.rs @@ -0,0 +1,259 @@ +//! Wire format for service messages. +//! +//! Binary format for efficient network transmission. +//! Uses CBOR for payload encoding. + +use std::io::{Cursor, Read}; + +use crate::error::ServiceError; +use crate::message::{MessageType, ServiceMessage}; + +/// Wire message header (fixed 64 bytes). +/// +/// ```text +/// ┌─────────────────────────────────────────────────────┐ +/// │ 0-3 │ service_id (u32 LE) │ +/// │ 4 │ message_type (u8) │ +/// │ 5 │ version (u8) │ +/// │ 6-7 │ flags (u16 LE, reserved) │ +/// │ 8-23 │ message_id (16 bytes) │ +/// │ 24-39 │ sender_address (16 bytes) │ +/// │ 40-47 │ sequence (u64 LE) │ +/// │ 48-49 │ ttl_hours (u16 LE) │ +/// │ 50-57 │ timestamp (u64 LE) │ +/// │ 58 │ hop_count (u8) │ +/// │ 59 │ max_hops (u8) │ +/// │ 60-63 │ payload_len (u32 LE) │ +/// └─────────────────────────────────────────────────────┘ +/// Followed by: +/// │ 64-... │ signature (64 bytes) │ +/// │ signature_end-.. │ payload (payload_len bytes) │ +/// │ payload_end-.. │ verifications (CBOR, optional) │ +/// ``` + +const HEADER_SIZE: usize = 64; +const SIGNATURE_SIZE: usize = 64; + +/// Encode a ServiceMessage to bytes. +pub fn encode(msg: &ServiceMessage) -> Result, ServiceError> { + let verifications_bytes = if msg.verifications.is_empty() { + Vec::new() + } else { + let mut buf = Vec::new(); + ciborium::into_writer(&msg.verifications, &mut buf)?; + buf + }; + + let total_size = HEADER_SIZE + SIGNATURE_SIZE + msg.payload.len() + verifications_bytes.len(); + let mut buf = Vec::with_capacity(total_size); + + // Header + buf.extend_from_slice(&msg.service_id.to_le_bytes()); // 0-3 + buf.push(msg.message_type as u8); // 4 + buf.push(msg.version); // 5 + buf.extend_from_slice(&0u16.to_le_bytes()); // 6-7 flags (reserved) + buf.extend_from_slice(&msg.id); // 8-23 + buf.extend_from_slice(&msg.sender_address); // 24-39 + buf.extend_from_slice(&msg.sequence.to_le_bytes()); // 40-47 + buf.extend_from_slice(&msg.ttl_hours.to_le_bytes()); // 48-49 + buf.extend_from_slice(&msg.timestamp.to_le_bytes()); // 50-57 + buf.push(msg.hop_count); // 58 + buf.push(msg.max_hops); // 59 + buf.extend_from_slice(&(msg.payload.len() as u32).to_le_bytes()); // 60-63 + + // Signature + if msg.signature.len() != SIGNATURE_SIZE { + return Err(ServiceError::InvalidFormat(format!( + "signature must be {} bytes, got {}", + SIGNATURE_SIZE, + msg.signature.len() + ))); + } + buf.extend_from_slice(&msg.signature); + + // Payload + buf.extend_from_slice(&msg.payload); + + // Verifications (optional) + buf.extend_from_slice(&verifications_bytes); + + Ok(buf) +} + +/// Decode bytes to a ServiceMessage. +pub fn decode(data: &[u8]) -> Result { + if data.len() < HEADER_SIZE + SIGNATURE_SIZE { + return Err(ServiceError::InvalidFormat("message too short".into())); + } + + let mut cursor = Cursor::new(data); + let mut buf4 = [0u8; 4]; + let mut buf8 = [0u8; 8]; + let mut buf16 = [0u8; 16]; + let mut buf2 = [0u8; 2]; + + // Read header + cursor.read_exact(&mut buf4)?; + let service_id = u32::from_le_bytes(buf4); + + let mut type_byte = [0u8; 1]; + cursor.read_exact(&mut type_byte)?; + let message_type = MessageType::try_from(type_byte[0]) + .map_err(|_| ServiceError::InvalidFormat("invalid message type".into()))?; + + cursor.read_exact(&mut type_byte)?; + let version = type_byte[0]; + + cursor.read_exact(&mut buf2)?; // flags (ignored) + + cursor.read_exact(&mut buf16)?; + let id = buf16; + + cursor.read_exact(&mut buf16)?; + let sender_address = buf16; + + cursor.read_exact(&mut buf8)?; + let sequence = u64::from_le_bytes(buf8); + + cursor.read_exact(&mut buf2)?; + let ttl_hours = u16::from_le_bytes(buf2); + + cursor.read_exact(&mut buf8)?; + let timestamp = u64::from_le_bytes(buf8); + + cursor.read_exact(&mut type_byte)?; + let hop_count = type_byte[0]; + + cursor.read_exact(&mut type_byte)?; + let max_hops = type_byte[0]; + + cursor.read_exact(&mut buf4)?; + let payload_len = u32::from_le_bytes(buf4) as usize; + + // Read signature + let mut signature = vec![0u8; SIGNATURE_SIZE]; + cursor.read_exact(&mut signature)?; + + // Read payload + if data.len() < HEADER_SIZE + SIGNATURE_SIZE + payload_len { + return Err(ServiceError::InvalidFormat("payload truncated".into())); + } + let mut payload = vec![0u8; payload_len]; + cursor.read_exact(&mut payload)?; + + // Read verifications (remaining bytes) + let verifications = if cursor.position() < data.len() as u64 { + let mut remaining = Vec::new(); + cursor.read_to_end(&mut remaining)?; + if remaining.is_empty() { + Vec::new() + } else { + ciborium::from_reader(&remaining[..]) + .map_err(|e| ServiceError::Serialization(e.to_string()))? + } + } else { + Vec::new() + }; + + Ok(ServiceMessage { + service_id, + message_type, + version, + id, + sender_address, + payload, + signature, + verifications, + sequence, + ttl_hours, + timestamp, + hop_count, + max_hops, + }) +} + +// Implement std::io::Error conversion for Read trait +impl From for ServiceError { + fn from(e: std::io::Error) -> Self { + ServiceError::InvalidFormat(e.to_string()) + } +} + +/// Encode a payload struct to CBOR. +pub fn encode_payload(payload: &T) -> Result, ServiceError> { + let mut buf = Vec::new(); + ciborium::into_writer(payload, &mut buf)?; + Ok(buf) +} + +/// Decode a payload from CBOR. +pub fn decode_payload(data: &[u8]) -> Result { + ciborium::from_reader(data).map_err(|e| ServiceError::Serialization(e.to_string())) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::identity::ServiceIdentity; + use crate::service_ids::FAPP; + use crate::verification::Verification; + + #[test] + fn roundtrip_simple() { + let id = ServiceIdentity::generate(); + let msg = ServiceMessage::announce(&id, FAPP, b"hello world".to_vec(), 42); + + let encoded = encode(&msg).unwrap(); + let decoded = decode(&encoded).unwrap(); + + assert_eq!(decoded.service_id, FAPP); + assert_eq!(decoded.message_type, MessageType::Announce); + assert_eq!(decoded.sequence, 42); + assert_eq!(decoded.payload, b"hello world"); + assert_eq!(decoded.signature, msg.signature); + } + + #[test] + fn roundtrip_with_verifications() { + let id = ServiceIdentity::generate(); + let verifier = ServiceIdentity::generate(); + + let mut msg = ServiceMessage::announce(&id, FAPP, b"payload".to_vec(), 1); + msg.add_verification(Verification::peer_endorsement( + &verifier, + &id.address(), + "trusted", + )); + + let encoded = encode(&msg).unwrap(); + let decoded = decode(&encoded).unwrap(); + + assert_eq!(decoded.verifications.len(), 1); + assert_eq!(decoded.verifications[0].claim, "trusted"); + } + + #[test] + fn payload_codec() { + #[derive(serde::Serialize, serde::Deserialize, Debug, PartialEq)] + struct TestPayload { + name: String, + value: i32, + } + + let payload = TestPayload { + name: "test".into(), + value: 123, + }; + + let encoded = encode_payload(&payload).unwrap(); + let decoded: TestPayload = decode_payload(&encoded).unwrap(); + + assert_eq!(payload, decoded); + } + + #[test] + fn truncated_rejected() { + let result = decode(&[0u8; 10]); + assert!(matches!(result, Err(ServiceError::InvalidFormat(_)))); + } +} diff --git a/crates/quicprochat-p2p/src/lib.rs b/crates/quicprochat-p2p/src/lib.rs index a7bccba..899a9f9 100644 --- a/crates/quicprochat-p2p/src/lib.rs +++ b/crates/quicprochat-p2p/src/lib.rs @@ -32,6 +32,7 @@ pub mod rate_limit; pub mod shutdown; pub mod identity; pub mod link; +pub mod mesh_node; pub mod mesh_router; pub mod routing; pub mod routing_table; diff --git a/crates/quicprochat-p2p/src/mesh_node.rs b/crates/quicprochat-p2p/src/mesh_node.rs new file mode 100644 index 0000000..f9ca925 --- /dev/null +++ b/crates/quicprochat-p2p/src/mesh_node.rs @@ -0,0 +1,529 @@ +//! Production-ready mesh node integrating all subsystems. +//! +//! [`MeshNode`] combines: +//! - P2P transport (iroh QUIC) +//! - Mesh routing and store-and-forward +//! - FAPP (appointment discovery) +//! - Rate limiting and backpressure +//! - Metrics collection +//! - Graceful shutdown +//! +//! This is the main entry point for production deployments. + +use std::sync::{Arc, RwLock}; +use std::time::Duration; + +use iroh::{Endpoint, EndpointAddr, PublicKey, SecretKey}; +use tokio::sync::mpsc; + +use crate::address::MeshAddress; +use crate::broadcast::BroadcastManager; +use crate::config::MeshConfig; +use crate::envelope::MeshEnvelope; +use crate::error::{MeshError, MeshResult}; +use crate::fapp::{FappStore, CAP_FAPP_PATIENT, CAP_FAPP_RELAY, CAP_FAPP_THERAPIST}; +use crate::fapp_router::FappRouter; +use crate::identity::MeshIdentity; +use crate::mesh_router::{IncomingAction, MeshRouter}; +use crate::metrics::{self, MeshMetrics}; +use crate::rate_limit::{BackpressureController, RateLimiter}; +use crate::routing_table::RoutingTable; +use crate::shutdown::{ShutdownCoordinator, ShutdownSignal, ShutdownTrigger}; +use crate::store::MeshStore; +use crate::transport::TransportAddr; +use crate::transport_manager::TransportManager; + +/// ALPN for mesh protocol. +const MESH_ALPN: &[u8] = b"quicprochat/mesh/1"; + +/// Production mesh node with all subsystems integrated. +pub struct MeshNode { + /// Node configuration. + config: MeshConfig, + /// iroh endpoint for QUIC transport. + endpoint: Endpoint, + /// Mesh identity (Ed25519 keypair). + identity: MeshIdentity, + /// Mesh address (truncated from identity). + address: MeshAddress, + /// Routing table for mesh forwarding. + routing_table: Arc>, + /// Store-and-forward message queue. + mesh_store: Arc>, + /// Broadcast channel manager. + broadcast_mgr: Arc>, + /// Multi-transport manager. + transport_manager: Arc, + /// Mesh router for envelope handling. + mesh_router: Arc, + /// FAPP router (optional, based on capabilities). + fapp_router: Option>, + /// Rate limiter for DoS protection. + rate_limiter: Arc, + /// Backpressure controller. + backpressure: Arc, + /// Metrics collector. + metrics: Arc, + /// Shutdown coordinator. + shutdown: Arc, + /// Shutdown trigger (clone for external use). + shutdown_trigger: ShutdownTrigger, +} + +/// Builder for MeshNode with sensible defaults. +pub struct MeshNodeBuilder { + config: MeshConfig, + identity: Option, + secret_key: Option, + fapp_capabilities: u16, +} + +impl MeshNodeBuilder { + pub fn new() -> Self { + Self { + config: MeshConfig::default(), + identity: None, + secret_key: None, + fapp_capabilities: 0, + } + } + + /// Use a specific configuration. + pub fn config(mut self, config: MeshConfig) -> Self { + self.config = config; + self + } + + /// Use existing mesh identity. + pub fn identity(mut self, identity: MeshIdentity) -> Self { + self.identity = Some(identity); + self + } + + /// Use existing iroh secret key. + pub fn secret_key(mut self, key: SecretKey) -> Self { + self.secret_key = Some(key); + self + } + + /// Enable FAPP therapist capabilities. + pub fn fapp_therapist(mut self) -> Self { + self.fapp_capabilities |= CAP_FAPP_THERAPIST; + self + } + + /// Enable FAPP relay capabilities. + pub fn fapp_relay(mut self) -> Self { + self.fapp_capabilities |= CAP_FAPP_RELAY; + self + } + + /// Enable FAPP patient capabilities. + pub fn fapp_patient(mut self) -> Self { + self.fapp_capabilities |= CAP_FAPP_PATIENT; + self + } + + /// Build and start the mesh node. + pub async fn build(self) -> MeshResult { + MeshNode::start( + self.config, + self.identity, + self.secret_key, + self.fapp_capabilities, + ) + .await + } +} + +impl Default for MeshNodeBuilder { + fn default() -> Self { + Self::new() + } +} + +impl MeshNode { + /// Start a new mesh node with full configuration. + pub async fn start( + config: MeshConfig, + identity: Option, + secret_key: Option, + fapp_capabilities: u16, + ) -> MeshResult { + // Initialize metrics + let metrics = Arc::new(MeshMetrics::new()); + + // Create identity + let identity = identity.unwrap_or_else(MeshIdentity::generate); + let address = MeshAddress::from_public_key(&identity.public_key()); + + // Build iroh endpoint + let mut builder = Endpoint::builder(); + if let Some(sk) = secret_key { + builder = builder.secret_key(sk); + } + builder = builder.alpns(vec![MESH_ALPN.to_vec()]); + + let endpoint = builder.bind().await.map_err(|e| { + MeshError::Internal(format!("failed to bind endpoint: {}", e)) + })?; + + tracing::info!( + node_id = %endpoint.id().fmt_short(), + mesh_addr = %address, + "Mesh node starting" + ); + + // Create routing table + let routing_table = Arc::new(RwLock::new(RoutingTable::new( + config.routing.default_ttl, + ))); + + // Create stores + let mesh_store = Arc::new(std::sync::Mutex::new(MeshStore::new( + config.store.max_messages, + ))); + let broadcast_mgr = Arc::new(std::sync::Mutex::new(BroadcastManager::new())); + + // Create transport manager + let transport_manager = Arc::new(TransportManager::new()); + + // Create mesh router (needs its own identity copy) + let router_identity = MeshIdentity::from_seed(identity.seed_bytes()); + let mesh_router = Arc::new(MeshRouter::new( + router_identity, + Arc::clone(&routing_table), + Arc::clone(&transport_manager), + Arc::clone(&mesh_store), + )); + + // Create FAPP router if capabilities are set + let fapp_router = if fapp_capabilities != 0 { + Some(Arc::new(FappRouter::new( + FappStore::new(), + Arc::clone(&routing_table), + Arc::clone(&transport_manager), + fapp_capabilities, + ))) + } else { + None + }; + + // Create rate limiter + let rate_limiter = Arc::new(RateLimiter::new(config.rate_limit.clone())); + + // Create backpressure controller + let backpressure = Arc::new(BackpressureController::default_for_standard()); + + // Create shutdown coordinator + let shutdown = Arc::new(ShutdownCoordinator::new()); + let (shutdown_trigger, _shutdown_signal) = ShutdownSignal::new(); + + let node = Self { + config, + endpoint, + identity, + address, + routing_table, + mesh_store, + broadcast_mgr, + transport_manager, + mesh_router, + fapp_router, + rate_limiter, + backpressure, + metrics, + shutdown, + shutdown_trigger, + }; + + tracing::info!( + mesh_addr = %node.address, + fapp = fapp_capabilities != 0, + "Mesh node started" + ); + + Ok(node) + } + + /// Get the node's mesh address. + pub fn address(&self) -> MeshAddress { + self.address + } + + /// Get the node's iroh public key. + pub fn node_id(&self) -> PublicKey { + self.endpoint.id() + } + + /// Get the node's endpoint address for sharing. + pub fn endpoint_addr(&self) -> EndpointAddr { + self.endpoint.addr() + } + + /// Get a reference to the mesh identity. + pub fn identity(&self) -> &MeshIdentity { + &self.identity + } + + /// Get a reference to the configuration. + pub fn config(&self) -> &MeshConfig { + &self.config + } + + /// Get a reference to the metrics. + pub fn metrics(&self) -> &Arc { + &self.metrics + } + + /// Get a reference to the mesh router. + pub fn mesh_router(&self) -> &Arc { + &self.mesh_router + } + + /// Get a reference to the FAPP router, if enabled. + pub fn fapp_router(&self) -> Option<&Arc> { + self.fapp_router.as_ref() + } + + /// Get a reference to the routing table. + pub fn routing_table(&self) -> &Arc> { + &self.routing_table + } + + /// Get a reference to the transport manager. + pub fn transport_manager(&self) -> &Arc { + &self.transport_manager + } + + /// Get a clone of the shutdown trigger. + pub fn shutdown_trigger(&self) -> ShutdownTrigger { + self.shutdown_trigger.clone() + } + + /// Send a mesh envelope to a peer. + pub async fn send(&self, dest: &TransportAddr, envelope: &MeshEnvelope) -> MeshResult<()> { + let wire = envelope.to_wire(); + + self.metrics.transport("mesh").sent.inc(); + self.metrics.transport("mesh").bytes_sent.inc_by(wire.len() as u64); + + self.transport_manager + .send(dest, &wire) + .await + .map_err(|e| MeshError::Internal(e.to_string())) + } + + /// Process an incoming envelope with rate limiting and metrics. + pub fn process_incoming(&self, sender: &MeshAddress, envelope: MeshEnvelope) -> MeshResult { + // Rate limiting check + let rate_result = self.rate_limiter.check_message(sender)?; + if !rate_result.is_allowed() { + self.metrics.protocol.oversized.inc(); + return Ok(IncomingAction::Dropped("rate limited".into())); + } + + // Backpressure check + let _bp_level = self.backpressure.level(); + // For now, we process all messages regardless of backpressure + // In production, we'd check message priority + + // Update metrics + self.metrics.transport("mesh").received.inc(); + self.metrics.transport("mesh").bytes_received.inc_by(envelope.payload.len() as u64); + + // Delegate to mesh router + let action = self.mesh_router.handle_incoming(envelope) + .map_err(|e| MeshError::Internal(e.to_string()))?; + + // Update routing metrics based on action + match &action { + IncomingAction::Deliver(_) => { + self.metrics.store.messages_delivered.inc(); + } + IncomingAction::Forward { .. } => { + self.metrics.routing.announcements_forwarded.inc(); + } + IncomingAction::Store(_) => { + self.metrics.store.messages_stored.inc(); + } + IncomingAction::Dropped(_) => { + self.metrics.protocol.parse_errors.inc(); + } + } + + Ok(action) + } + + /// Parse and process raw incoming bytes. + pub fn process_incoming_bytes(&self, sender: &MeshAddress, data: &[u8]) -> MeshResult { + let envelope = MeshEnvelope::from_wire(data) + .map_err(|e| MeshError::Protocol(crate::error::ProtocolError::InvalidFormat(e.to_string())))?; + self.process_incoming(sender, envelope) + } + + /// Store a message for offline delivery. + pub fn store_for_delivery(&self, envelope: MeshEnvelope) -> MeshResult { + let mut store = self.mesh_store.lock().map_err(|e| { + MeshError::Internal(format!("mesh store lock poisoned: {}", e)) + })?; + + let stored = store.store(envelope); + if stored { + self.metrics.store.messages_stored.inc(); + self.metrics.store.current_size.set(store.stats().0 as u64); + } + + Ok(stored) + } + + /// Fetch stored messages for a recipient. + pub fn fetch_stored(&self, recipient: &[u8]) -> MeshResult> { + let mut store = self.mesh_store.lock().map_err(|e| { + MeshError::Internal(format!("mesh store lock poisoned: {}", e)) + })?; + + let messages = store.fetch(recipient); + self.metrics.store.current_size.set(store.stats().0 as u64); + + Ok(messages) + } + + /// Run garbage collection on stores. + pub fn gc(&self) -> MeshResult { + let mut stats = GcStats::default(); + + // GC mesh store + { + let mut store = self.mesh_store.lock().map_err(|e| { + MeshError::Internal(format!("mesh store lock: {}", e)) + })?; + stats.messages_expired = store.gc_expired(); + self.metrics.store.messages_expired.inc_by(stats.messages_expired as u64); + } + + // GC routing table + { + let mut table = self.routing_table.write().map_err(|e| { + MeshError::Internal(format!("routing table lock: {}", e)) + })?; + stats.routes_expired = table.remove_expired(); + self.metrics.routing.routes_expired.inc_by(stats.routes_expired as u64); + } + + // GC rate limiter (remove idle peers) + stats.rate_limiters_cleaned = self.rate_limiter.cleanup(Duration::from_secs(3600)); + + tracing::debug!( + messages = stats.messages_expired, + routes = stats.routes_expired, + rate_limiters = stats.rate_limiters_cleaned, + "GC completed" + ); + + Ok(stats) + } + + /// Gracefully shut down the node. + pub async fn shutdown(self) { + tracing::info!("Mesh node shutting down"); + + // Trigger shutdown + self.shutdown_trigger.trigger(); + + // Run shutdown coordinator + self.shutdown.shutdown().await; + + // Close transports + self.transport_manager.close_all().await; + + // Close iroh endpoint + self.endpoint.close().await; + + tracing::info!("Mesh node shutdown complete"); + } +} + +/// Statistics from garbage collection. +#[derive(Debug, Default)] +pub struct GcStats { + pub messages_expired: usize, + pub routes_expired: usize, + pub rate_limiters_cleaned: usize, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn mesh_node_starts() { + let node = MeshNodeBuilder::new() + .build() + .await + .expect("build node"); + + assert!(!node.address().is_broadcast()); + assert!(node.fapp_router().is_none()); + + node.shutdown().await; + } + + #[tokio::test] + async fn mesh_node_with_fapp() { + let node = MeshNodeBuilder::new() + .fapp_relay() + .fapp_patient() + .build() + .await + .expect("build node"); + + assert!(node.fapp_router().is_some()); + + node.shutdown().await; + } + + #[tokio::test] + async fn mesh_node_metrics() { + let node = MeshNodeBuilder::new() + .build() + .await + .expect("build node"); + + // Check metrics are accessible + let snapshot = node.metrics().snapshot(); + assert!(snapshot.uptime_secs < 5); + + node.shutdown().await; + } + + #[tokio::test] + async fn mesh_node_gc() { + let node = MeshNodeBuilder::new() + .build() + .await + .expect("build node"); + + let stats = node.gc().expect("gc"); + assert_eq!(stats.messages_expired, 0); + assert_eq!(stats.routes_expired, 0); + + node.shutdown().await; + } + + #[tokio::test] + async fn mesh_node_with_identity() { + let identity = MeshIdentity::generate(); + let pk = identity.public_key(); + + let node = MeshNodeBuilder::new() + .identity(identity) + .build() + .await + .expect("build node"); + + assert_eq!(node.identity().public_key(), pk); + + node.shutdown().await; + } +}