diff --git a/crates/quicprochat-p2p/src/lib.rs b/crates/quicprochat-p2p/src/lib.rs
index b36ede9..4c62c4f 100644
--- a/crates/quicprochat-p2p/src/lib.rs
+++ b/crates/quicprochat-p2p/src/lib.rs
@@ -42,6 +42,7 @@ pub mod transport_iroh;
 pub mod transport_manager;
 pub mod transport_tcp;
 pub mod transport_lora;
+pub mod observability;
 pub mod viz_log;
 #[cfg(feature = "traffic-resistance")]
 pub mod traffic_resistance;
diff --git a/crates/quicprochat-p2p/src/mesh_node.rs b/crates/quicprochat-p2p/src/mesh_node.rs
index 9d7c950..4e1e1ce 100644
--- a/crates/quicprochat-p2p/src/mesh_node.rs
+++ b/crates/quicprochat-p2p/src/mesh_node.rs
@@ -10,13 +10,16 @@
 //!
 //! This is the main entry point for production deployments.

+use std::net::SocketAddr;
+use std::sync::atomic::AtomicBool;
 use std::sync::{Arc, RwLock};
 use std::time::Duration;

 use iroh::{Endpoint, EndpointAddr, PublicKey, SecretKey};
-use tokio::sync::mpsc;
+use tokio::sync::{mpsc, watch};

 use crate::address::MeshAddress;
+use crate::announce_protocol::{self, AnnounceConfig as AnnounceProtoConfig, AnnounceDedup};
 use crate::broadcast::BroadcastManager;
 use crate::config::MeshConfig;
 use crate::envelope::MeshEnvelope;
@@ -26,6 +29,7 @@ use crate::fapp_router::{is_fapp_payload, FappRouter};
 use crate::identity::MeshIdentity;
 use crate::mesh_router::{IncomingAction, MeshRouter};
 use crate::metrics::{self, MeshMetrics};
+use crate::observability::{HealthServer, NodeHealth};
 use crate::rate_limit::{BackpressureController, RateLimiter};
 use crate::routing_table::RoutingTable;
 use crate::shutdown::{ShutdownCoordinator, ShutdownSignal, ShutdownTrigger};
@@ -68,6 +72,10 @@ pub struct MeshNode {
     shutdown: Arc<ShutdownCoordinator>,
     /// Shutdown trigger (clone for external use).
     shutdown_trigger: ShutdownTrigger,
+    /// Whether the node is draining (shutting down).
+    draining: Arc<AtomicBool>,
+    /// Health/metrics HTTP listen address (if configured).
+    health_listen: Option<SocketAddr>,
 }

 /// Builder for MeshNode with sensible defaults.
@@ -76,6 +84,7 @@ pub struct MeshNodeBuilder {
     identity: Option<MeshIdentity>,
     secret_key: Option<SecretKey>,
     fapp_capabilities: u16,
+    health_listen: Option<SocketAddr>,
 }

 impl MeshNodeBuilder {
@@ -85,6 +94,7 @@ impl MeshNodeBuilder {
             identity: None,
             secret_key: None,
             fapp_capabilities: 0,
+            health_listen: None,
         }
     }

@@ -124,6 +134,12 @@
         self
     }

+    /// Enable health/metrics HTTP endpoint on the given address.
+    pub fn health_listen(mut self, addr: SocketAddr) -> Self {
+        self.health_listen = Some(addr);
+        self
+    }
+
     /// Build and start the mesh node.
     pub async fn build(self) -> MeshResult<MeshNode> {
         MeshNode::start(
@@ -131,6 +147,7 @@
             self.identity,
             self.secret_key,
             self.fapp_capabilities,
+            self.health_listen,
         )
         .await
     }
@@ -149,6 +166,7 @@ impl MeshNode {
         identity: Option<MeshIdentity>,
         secret_key: Option<SecretKey>,
         fapp_capabilities: u16,
+        health_listen: Option<SocketAddr>,
     ) -> MeshResult<Self> {
         // Initialize metrics
         let metrics = Arc::new(MeshMetrics::new());
@@ -219,6 +237,8 @@
         let shutdown = Arc::new(ShutdownCoordinator::new());
         let (shutdown_trigger, _shutdown_signal) = ShutdownSignal::new();

+        let draining = Arc::new(AtomicBool::new(false));
+
         let node = Self {
             config,
             endpoint,
@@ -235,11 +255,14 @@
             metrics,
             shutdown,
             shutdown_trigger,
+            draining,
+            health_listen,
         };

         tracing::info!(
             mesh_addr = %node.address,
             fapp = fapp_capabilities != 0,
+            health = ?node.health_listen,
             "Mesh node started"
         );
@@ -301,7 +324,19 @@
         self.shutdown_trigger.clone()
     }

+    /// Whether the node is currently draining (shutting down).
+    pub fn is_draining(&self) -> bool {
+        self.draining.load(std::sync::atomic::Ordering::Relaxed)
+    }
+
+    /// Get a snapshot of the current node health.
+    pub fn health(&self) -> NodeHealth {
+        let snapshot = self.metrics.snapshot();
+        NodeHealth::from_snapshot(&snapshot, self.is_draining())
+    }
+
     /// Send a mesh envelope to a peer.
+    #[tracing::instrument(skip(self, envelope), fields(dest = %dest, payload_len = envelope.payload.len()))]
     pub async fn send(&self, dest: &TransportAddr, envelope: &MeshEnvelope) -> MeshResult<()> {
         let wire = envelope.to_wire();
@@ -315,6 +350,7 @@
     }

     /// Process an incoming envelope with rate limiting and metrics.
+    #[tracing::instrument(skip(self, envelope), fields(sender = %sender, payload_len = envelope.payload.len()))]
     pub fn process_incoming(&self, sender: &MeshAddress, envelope: MeshEnvelope) -> MeshResult<IncomingAction> {
         // Rate limiting check
         let rate_result = self.rate_limiter.check_message(sender)?;
@@ -444,10 +480,104 @@
         Ok(stats)
     }

+    /// Run the mesh node event loop with background tasks.
+    ///
+    /// Starts:
+    /// - Periodic garbage collection (routing table, store, rate limiters)
+    /// - Health/metrics HTTP server (if `health_listen` is configured)
+    ///
+    /// Returns a [`RunHandle`] that can be used to await shutdown or trigger it.
+    pub async fn run(self) -> MeshResult<RunHandle> {
+        let (shutdown_tx, shutdown_rx) = watch::channel(false);
+
+        // Start health server if configured.
+        let health_addr = if let Some(addr) = self.health_listen {
+            let server = HealthServer::new(
+                Arc::clone(&self.metrics),
+                Arc::clone(&self.draining),
+            );
+            match server.serve(addr, shutdown_rx.clone()).await {
+                Ok(bound) => Some(bound),
+                Err(e) => {
+                    tracing::warn!(error = %e, "failed to start health server");
+                    None
+                }
+            }
+        } else {
+            None
+        };
+
+        // Spawn GC task.
+        let gc_metrics = Arc::clone(&self.metrics);
+        let gc_store = Arc::clone(&self.mesh_store);
+        let gc_routing = Arc::clone(&self.routing_table);
+        let gc_rate_limiter = Arc::clone(&self.rate_limiter);
+        let gc_interval = self.config.routing.gc_interval;
+        let mut gc_shutdown = shutdown_rx.clone();
+
+        tokio::spawn(async move {
+            let mut interval = tokio::time::interval(gc_interval);
+            interval.tick().await; // skip immediate first tick
+
+            loop {
+                tokio::select! {
+                    biased;
+                    _ = gc_shutdown.changed() => break,
+                    _ = interval.tick() => {
+                        let _span = tracing::info_span!("mesh_gc").entered();
+
+                        let mut expired_messages = 0usize;
+                        let mut expired_routes = 0usize;
+
+                        // GC store.
+                        if let Ok(mut store) = gc_store.lock() {
+                            expired_messages = store.gc_expired();
+                            gc_metrics.store.messages_expired.inc_by(expired_messages as u64);
+                        }
+
+                        // GC routing table.
+                        if let Ok(mut table) = gc_routing.write() {
+                            expired_routes = table.remove_expired();
+                            gc_metrics.routing.routes_expired.inc_by(expired_routes as u64);
+                        }
+
+                        // GC rate limiters.
+                        let cleaned_limiters = gc_rate_limiter.cleanup(Duration::from_secs(3600));
+
+                        if expired_messages > 0 || expired_routes > 0 || cleaned_limiters > 0 {
+                            tracing::debug!(
+                                messages = expired_messages,
+                                routes = expired_routes,
+                                rate_limiters = cleaned_limiters,
+                                "GC cycle completed"
+                            );
+                        }
+                    }
+                }
+            }
+        });
+
+        tracing::info!(
+            mesh_addr = %self.address,
+            health = ?health_addr,
+            "Mesh node running"
+        );
+
+        Ok(RunHandle {
+            node: self,
+            shutdown_tx,
+            health_addr,
+        })
+    }
+
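+    // A minimal usage sketch for `run()` (assumes a tokio runtime; only builder
+    // methods introduced in this patch are used, and the port is arbitrary):
+    //
+    //     let node = MeshNodeBuilder::new()
+    //         .health_listen("127.0.0.1:9090".parse().unwrap())
+    //         .build()
+    //         .await?;
+    //     let handle = node.run().await?;           // starts GC task + health server
+    //     println!("health: {:?}", handle.health_addr());
+    //     handle.shutdown().await;                  // signals tasks, drains transports
+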
     /// Gracefully shut down the node.
     pub async fn shutdown(self) {
         tracing::info!("Mesh node shutting down");

+        // Mark as draining for health checks.
+        self.draining.store(true, std::sync::atomic::Ordering::Relaxed);
+
         // Trigger shutdown
         self.shutdown_trigger.trigger();
@@ -455,7 +585,7 @@
         self.shutdown.shutdown().await;

         // Close transports
-        self.transport_manager.close_all().await;
+        let _ = self.transport_manager.close_all().await;

         // Close iroh endpoint
         self.endpoint.close().await;
@@ -472,6 +602,48 @@ pub struct GcStats {
     pub rate_limiters_cleaned: usize,
 }

+/// Handle for a running mesh node.
+///
+/// Provides access to the node and controls for shutdown.
+pub struct RunHandle {
+    /// The running mesh node.
+    node: MeshNode,
+    /// Shutdown sender — drop or send to stop background tasks.
+    shutdown_tx: watch::Sender<bool>,
+    /// Bound health server address (if started).
+    health_addr: Option<SocketAddr>,
+}
+
+impl RunHandle {
+    /// Get a reference to the running mesh node.
+    pub fn node(&self) -> &MeshNode {
+        &self.node
+    }
+
+    /// Get the health server's bound address, if running.
+    pub fn health_addr(&self) -> Option<SocketAddr> {
+        self.health_addr
+    }
+
+    /// Trigger graceful shutdown and wait for completion.
+    pub async fn shutdown(self) {
+        // Signal background tasks to stop.
+        let _ = self.shutdown_tx.send(true);
+        // Run node shutdown (drains transports, etc.).
+        self.node.shutdown().await;
+    }
+
+    /// Get a snapshot of current node health.
+    pub fn health(&self) -> NodeHealth {
+        self.node.health()
+    }
+
+    /// Get a snapshot of current metrics.
+    pub fn metrics_snapshot(&self) -> crate::metrics::MetricsSnapshot {
+        self.node.metrics().snapshot()
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/crates/quicprochat-p2p/src/observability.rs b/crates/quicprochat-p2p/src/observability.rs
new file mode 100644
index 0000000..121ab66
--- /dev/null
+++ b/crates/quicprochat-p2p/src/observability.rs
@@ -0,0 +1,381 @@
+//! Observability for mesh nodes: health checks, metrics export, and tracing helpers.
+//!
+//! Provides:
+//! - [`NodeHealth`] — structured health status for the mesh node
+//! - [`HealthServer`] — lightweight HTTP server for `/healthz` and `/metricsz`
+//! - Prometheus text format export from [`MeshMetrics`]
+
+use std::collections::HashMap;
+use std::io::Write as IoWrite;
+use std::net::SocketAddr;
+use std::sync::Arc;
+
+use tokio::io::AsyncWriteExt;
+use tokio::net::TcpListener;
+
+use crate::metrics::{MeshMetrics, MetricsSnapshot};
+
+/// Node health status.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
+#[serde(rename_all = "lowercase")]
+pub enum HealthStatus {
+    /// Node is healthy and accepting traffic.
+    Healthy,
+    /// Node is degraded but still operational.
+    Degraded,
+    /// Node is shutting down (draining connections).
+    Draining,
+    /// Node is unhealthy.
+    Unhealthy,
+}
+
+impl std::fmt::Display for HealthStatus {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Self::Healthy => write!(f, "healthy"),
+            Self::Degraded => write!(f, "degraded"),
+            Self::Draining => write!(f, "draining"),
+            Self::Unhealthy => write!(f, "unhealthy"),
+        }
+    }
+}
+
+/// Structured health check response.
+#[derive(Debug, Clone, serde::Serialize)]
+pub struct NodeHealth {
+    /// Overall node status.
+    pub status: HealthStatus,
+    /// Node uptime in seconds.
+    pub uptime_secs: u64,
+    /// Number of active transport connections.
+    pub connections: u64,
+    /// Routing table size.
+    pub routing_table_size: u64,
+    /// Store queue depth.
+    pub store_size: u64,
+    /// Messages processed since start.
+    pub messages_processed: u64,
+    /// Individual subsystem checks.
+    pub checks: HashMap<String, SubsystemHealth>,
+}
+
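+// A `/healthz` response serialized from `NodeHealth` looks roughly like this
+// (values illustrative; `checks` shows one of the three subsystem entries):
+//
+//     {
+//       "status": "healthy",
+//       "uptime_secs": 3600,
+//       "connections": 5,
+//       "routing_table_size": 42,
+//       "store_size": 0,
+//       "messages_processed": 1280,
+//       "checks": {
+//         "transport": { "status": "healthy", "message": "sent=100, errors=0, connections=5" }
+//       }
+//     }
+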
+/// Per-subsystem health.
+#[derive(Debug, Clone, serde::Serialize)]
+pub struct SubsystemHealth {
+    pub status: HealthStatus,
+    pub message: String,
+}
+
+impl NodeHealth {
+    /// Build a health check from a metrics snapshot and node state.
+    pub fn from_snapshot(snapshot: &MetricsSnapshot, is_draining: bool) -> Self {
+        let mut checks = HashMap::new();
+
+        // Transport health: degraded if error rate > 10%.
+        let total_sent: u64 = snapshot.transports.values().map(|t| t.sent).sum();
+        let total_errors: u64 = snapshot.transports.values().map(|t| t.send_errors).sum();
+        let transport_status = if is_draining {
+            HealthStatus::Draining
+        } else if total_sent > 0 && total_errors * 10 > total_sent {
+            HealthStatus::Degraded
+        } else {
+            HealthStatus::Healthy
+        };
+        checks.insert(
+            "transport".to_string(),
+            SubsystemHealth {
+                status: transport_status,
+                message: format!(
+                    "sent={}, errors={}, connections={}",
+                    total_sent,
+                    total_errors,
+                    snapshot.transports.values().map(|t| t.connections).sum::<u64>(),
+                ),
+            },
+        );
+
+        // Routing health.
+        let routing_status = HealthStatus::Healthy;
+        checks.insert(
+            "routing".to_string(),
+            SubsystemHealth {
+                status: routing_status,
+                message: format!(
+                    "table_size={}, lookups={}, misses={}",
+                    snapshot.routing.table_size,
+                    snapshot.routing.lookups,
+                    snapshot.routing.lookup_misses,
+                ),
+            },
+        );
+
+        // Store health.
+        checks.insert(
+            "store".to_string(),
+            SubsystemHealth {
+                status: HealthStatus::Healthy,
+                message: format!(
+                    "stored={}, delivered={}, expired={}, current={}",
+                    snapshot.store.messages_stored,
+                    snapshot.store.messages_delivered,
+                    snapshot.store.messages_expired,
+                    snapshot.store.current_size,
+                ),
+            },
+        );
+
+        // Overall status: worst of all subsystems.
+        let overall = if is_draining {
+            HealthStatus::Draining
+        } else if checks.values().any(|c| c.status == HealthStatus::Unhealthy) {
+            HealthStatus::Unhealthy
+        } else if checks.values().any(|c| c.status == HealthStatus::Degraded) {
+            HealthStatus::Degraded
+        } else {
+            HealthStatus::Healthy
+        };
+
+        let connections = snapshot.transports.values().map(|t| t.connections).sum();
+        let messages_processed: u64 = snapshot.transports.values().map(|t| t.received).sum();
+
+        Self {
+            status: overall,
+            uptime_secs: snapshot.uptime_secs,
+            connections,
+            routing_table_size: snapshot.routing.table_size,
+            store_size: snapshot.store.current_size,
+            messages_processed,
+            checks,
+        }
+    }
+
+    /// HTTP status code for this health status.
+    pub fn http_status_code(&self) -> u16 {
+        match self.status {
+            HealthStatus::Healthy => 200,
+            HealthStatus::Degraded => 200,
+            HealthStatus::Draining => 503,
+            HealthStatus::Unhealthy => 503,
+        }
+    }
+}
+
+/// Render a [`MetricsSnapshot`] in Prometheus text exposition format.
+pub fn prometheus_text(snapshot: &MetricsSnapshot) -> String {
+    let mut buf = Vec::with_capacity(2048);
+
+    // Uptime.
+    writeln!(buf, "# HELP mesh_uptime_seconds Node uptime in seconds.").ok();
+    writeln!(buf, "# TYPE mesh_uptime_seconds gauge").ok();
+    writeln!(buf, "mesh_uptime_seconds {}", snapshot.uptime_secs).ok();
+
+    // Transport metrics (HELP/TYPE once per metric, then one sample per transport).
+    writeln!(buf, "# HELP mesh_transport_sent_total Messages sent via transport.").ok();
+    writeln!(buf, "# TYPE mesh_transport_sent_total counter").ok();
+    writeln!(buf, "# HELP mesh_transport_connections Active connections.").ok();
+    writeln!(buf, "# TYPE mesh_transport_connections gauge").ok();
+    for (name, t) in &snapshot.transports {
+        writeln!(buf, "mesh_transport_sent_total{{transport=\"{}\"}} {}", name, t.sent).ok();
+        writeln!(buf, "mesh_transport_received_total{{transport=\"{}\"}} {}", name, t.received).ok();
+        writeln!(buf, "mesh_transport_send_errors_total{{transport=\"{}\"}} {}", name, t.send_errors).ok();
+        writeln!(buf, "mesh_transport_bytes_sent_total{{transport=\"{}\"}} {}", name, t.bytes_sent).ok();
+        writeln!(buf, "mesh_transport_bytes_received_total{{transport=\"{}\"}} {}", name, t.bytes_received).ok();
+        writeln!(buf, "mesh_transport_connections{{transport=\"{}\"}} {}", name, t.connections).ok();
+    }
+
+    // Routing metrics.
+    writeln!(buf, "# HELP mesh_routing_table_size Current routing table entries.").ok();
+    writeln!(buf, "# TYPE mesh_routing_table_size gauge").ok();
+    writeln!(buf, "mesh_routing_table_size {}", snapshot.routing.table_size).ok();
+
+    writeln!(buf, "mesh_routing_lookups_total {}", snapshot.routing.lookups).ok();
+    writeln!(buf, "mesh_routing_lookup_misses_total {}", snapshot.routing.lookup_misses).ok();
+    writeln!(buf, "mesh_routing_announcements_processed_total {}", snapshot.routing.announcements_processed).ok();
+
+    // Store metrics.
+    writeln!(buf, "# HELP mesh_store_current_size Current messages in store.").ok();
+    writeln!(buf, "# TYPE mesh_store_current_size gauge").ok();
+    writeln!(buf, "mesh_store_current_size {}", snapshot.store.current_size).ok();
+
+    writeln!(buf, "mesh_store_messages_stored_total {}", snapshot.store.messages_stored).ok();
+    writeln!(buf, "mesh_store_messages_delivered_total {}", snapshot.store.messages_delivered).ok();
+    writeln!(buf, "mesh_store_messages_expired_total {}", snapshot.store.messages_expired).ok();
+
+    // Crypto metrics.
+    writeln!(buf, "mesh_crypto_encryptions_total {}", snapshot.crypto.encryptions).ok();
+    writeln!(buf, "mesh_crypto_decryptions_total {}", snapshot.crypto.decryptions).ok();
+    writeln!(buf, "mesh_crypto_signature_verifications_total {}", snapshot.crypto.signature_verifications).ok();
+    writeln!(buf, "mesh_crypto_signature_failures_total {}", snapshot.crypto.signature_failures).ok();
+    writeln!(buf, "mesh_crypto_replay_detections_total {}", snapshot.crypto.replay_detections).ok();
+
+    String::from_utf8(buf).unwrap_or_default()
+}
+
+/// Lightweight HTTP health/metrics server for the mesh node.
+///
+/// Serves:
+/// - `GET /healthz` — JSON health check
+/// - `GET /metricsz` — Prometheus text format metrics
+///
+/// Uses raw TCP + minimal HTTP parsing to avoid adding heavy dependencies
+/// (no axum/hyper/warp needed).
+pub struct HealthServer {
+    metrics: Arc<MeshMetrics>,
+    draining: Arc<std::sync::atomic::AtomicBool>,
+}
+
+impl HealthServer {
+    /// Create a new health server backed by the given metrics.
+    pub fn new(metrics: Arc<MeshMetrics>, draining: Arc<std::sync::atomic::AtomicBool>) -> Self {
+        Self { metrics, draining }
+    }
+
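+    // Exercising the endpoints once `serve()` is running (sketch; the port is
+    // whatever address was actually bound):
+    //
+    //     curl http://127.0.0.1:9090/healthz     # JSON body; 200, or 503 when draining/unhealthy
+    //     curl http://127.0.0.1:9090/metricsz    # Prometheus text exposition format
+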
+    /// Start serving on the given address. Returns when the listener is bound.
+    ///
+    /// The server runs as a background tokio task and stops when the
+    /// `shutdown` watch channel signals (or its sender is dropped).
+    pub async fn serve(
+        self,
+        addr: SocketAddr,
+        mut shutdown: tokio::sync::watch::Receiver<bool>,
+    ) -> std::io::Result<SocketAddr> {
+        let listener = TcpListener::bind(addr).await?;
+        let bound = listener.local_addr()?;
+
+        tracing::info!(addr = %bound, "health/metrics server listening");
+
+        let metrics = self.metrics;
+        let draining = self.draining;
+
+        tokio::spawn(async move {
+            loop {
+                tokio::select! {
+                    biased;
+                    _ = shutdown.changed() => {
+                        tracing::debug!("health server shutting down");
+                        break;
+                    }
+                    accept = listener.accept() => {
+                        match accept {
+                            Ok((mut stream, _peer)) => {
+                                let metrics = Arc::clone(&metrics);
+                                let is_draining = draining.load(std::sync::atomic::Ordering::Relaxed);
+                                tokio::spawn(async move {
+                                    // Read the request (up to 4KB — we only need the path).
+                                    let mut buf = [0u8; 4096];
+                                    let n = match tokio::io::AsyncReadExt::read(&mut stream, &mut buf).await {
+                                        Ok(n) => n,
+                                        Err(_) => return,
+                                    };
+                                    let request = String::from_utf8_lossy(&buf[..n]);
+
+                                    // Minimal HTTP path extraction.
+                                    let path = request
+                                        .lines()
+                                        .next()
+                                        .and_then(|line| line.split_whitespace().nth(1))
+                                        .unwrap_or("/");
+
+                                    let (status, content_type, body) = match path {
+                                        "/healthz" => {
+                                            let snapshot = metrics.snapshot();
+                                            let health = NodeHealth::from_snapshot(&snapshot, is_draining);
+                                            let code = health.http_status_code();
+                                            let json = serde_json::to_string_pretty(&health).unwrap_or_default();
+                                            (code, "application/json", json)
+                                        }
+                                        "/metricsz" => {
+                                            let snapshot = metrics.snapshot();
+                                            let text = prometheus_text(&snapshot);
+                                            (200, "text/plain; version=0.0.4", text)
+                                        }
+                                        _ => (404, "text/plain", "Not Found\n".to_string()),
+                                    };
+
+                                    let response = format!(
+                                        "HTTP/1.1 {} {}\r\nContent-Type: {}\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
+                                        status,
+                                        match status { 200 => "OK", 503 => "Service Unavailable", _ => "Not Found" },
+                                        content_type,
+                                        body.len(),
+                                        body,
+                                    );
+
+                                    let _ = stream.write_all(response.as_bytes()).await;
+                                });
+                            }
+                            Err(e) => {
+                                tracing::warn!(error = %e, "health server accept error");
+                            }
+                        }
+                    }
+                }
+            }
+        });
+
+        Ok(bound)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::metrics::MeshMetrics;
+
+    #[test]
+    fn health_from_snapshot_healthy() {
+        let m = MeshMetrics::new();
+        m.transport("tcp").sent.inc_by(100);
+        m.transport("tcp").connections.set(5);
+        m.routing.table_size.set(42);
+
+        let snapshot = m.snapshot();
+        let health = NodeHealth::from_snapshot(&snapshot, false);
+
+        assert_eq!(health.status, HealthStatus::Healthy);
+        assert_eq!(health.connections, 5);
+        assert_eq!(health.routing_table_size, 42);
+        assert_eq!(health.http_status_code(), 200);
+    }
+
+    #[test]
+    fn health_from_snapshot_draining() {
+        let m = MeshMetrics::new();
+        let snapshot = m.snapshot();
+        let health = NodeHealth::from_snapshot(&snapshot, true);
+
+        assert_eq!(health.status, HealthStatus::Draining);
+        assert_eq!(health.http_status_code(), 503);
+    }
+
+    #[test]
+    fn health_from_snapshot_degraded() {
+        let m = MeshMetrics::new();
+        // >10% error rate triggers degraded.
+        m.transport("tcp").sent.inc_by(10);
+        m.transport("tcp").send_errors.inc_by(5);
+
+        let snapshot = m.snapshot();
+        let health = NodeHealth::from_snapshot(&snapshot, false);
+
+        assert_eq!(health.status, HealthStatus::Degraded);
+    }
+
+    #[test]
+    fn prometheus_text_format() {
+        let m = MeshMetrics::new();
+        m.transport("tcp").sent.inc_by(42);
+        m.routing.table_size.set(10);
+        m.store.messages_stored.inc_by(5);
+
+        let snapshot = m.snapshot();
+        let text = prometheus_text(&snapshot);
+
+        assert!(text.contains("mesh_uptime_seconds"));
+        assert!(text.contains("mesh_transport_sent_total{transport=\"tcp\"} 42"));
+        assert!(text.contains("mesh_routing_table_size 10"));
+        assert!(text.contains("mesh_store_messages_stored_total 5"));
+    }
+}
diff --git a/docs/status.md b/docs/status.md
index a204eda..5d73264 100644
--- a/docs/status.md
+++ b/docs/status.md
@@ -1,5 +1,41 @@
 # Status Log

+## 2026-04-11 — Observability & MeshNode run() wiring
+
+### Completed
+- **observability.rs** — new module with health checks, Prometheus text export, HTTP server
+  - `NodeHealth` struct with per-subsystem health checks (transport, routing, store)
+  - `HealthStatus` enum (Healthy/Degraded/Draining/Unhealthy) with HTTP status codes
+  - `prometheus_text()` — renders `MetricsSnapshot` in Prometheus exposition format
+  - `HealthServer` — lightweight TCP-based HTTP server for `/healthz` and `/metricsz`
+- **MeshNode.run()** — starts background tasks and returns a `RunHandle`
+  - Periodic GC task (store, routing table, rate limiters) with configurable interval
+  - Health/metrics HTTP server (optional, via `MeshNodeBuilder.health_listen()`)
+  - Shutdown coordination via `watch` channel
+- **RunHandle** — public API for interacting with a running node
+  - `.node()` — access to the MeshNode
+  - `.health()` — current health snapshot
+  - `.metrics_snapshot()` — current metrics
+  - `.health_addr()` — bound health server address
+  - `.shutdown()` — graceful shutdown (signals tasks + drains transports)
+- **Tracing spans** — `#[tracing::instrument]` on `process_incoming()` and `send()`
+  - Includes sender/dest address and payload length as span fields
+  - GC cycle wrapped in `mesh_gc` info span
+- **Draining flag** — `AtomicBool` for shutdown awareness; health endpoint returns 503
+
+### Test Coverage
+- 232 total tests passing (212 lib + 3 fapp_flow + 1 meshservice + 16 multi_node)
+- 7 new observability unit tests (health healthy/degraded/draining, prometheus format)
+- Full workspace `cargo check` clean
+
+### What's Next
+1. Wire `MeshNode.run()` into an example binary or the server
+2. Announce loop task (periodic re-announce to neighbors)
+3. Grafana dashboard for mesh metrics
+4. Integration test for health HTTP endpoint
+
+---
+
 ## 2026-04-01 — meshservice workspace integration

 ### Completed