feat: add observability module and wire MeshNode::run() with background tasks

Add health checks (/healthz), Prometheus metrics export (/metricsz),
and tracing spans to the P2P mesh node. MeshNode::run() starts the GC
loop and health server as background tasks, returning a RunHandle for
lifecycle management. The health endpoint returns 503 while the node is
draining during graceful shutdown.
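
Illustrative usage (a sketch against the APIs added in this diff; the
builder constructor name is assumed from context):

    let node = MeshNodeBuilder::new()
        .health_listen("127.0.0.1:0".parse().unwrap())
        .build()
        .await?;
    let handle = node.run().await?;
    // /healthz and /metricsz are served at handle.health_addr()
    handle.shutdown().await; // /healthz reports 503 while draining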
2026-04-11 17:52:03 +02:00
parent 95ce8898fd
commit da0085f1a6
4 changed files with 592 additions and 2 deletions


@@ -42,6 +42,7 @@ pub mod transport_iroh;
pub mod transport_manager;
pub mod transport_tcp;
pub mod transport_lora;
pub mod observability;
pub mod viz_log;
#[cfg(feature = "traffic-resistance")]
pub mod traffic_resistance;


@@ -10,13 +10,16 @@
//!
//! This is the main entry point for production deployments.
use std::net::SocketAddr;
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, RwLock};
use std::time::Duration;
use iroh::{Endpoint, EndpointAddr, PublicKey, SecretKey};
-use tokio::sync::mpsc;
+use tokio::sync::{mpsc, watch};
use crate::address::MeshAddress;
use crate::announce_protocol::{self, AnnounceConfig as AnnounceProtoConfig, AnnounceDedup};
use crate::broadcast::BroadcastManager;
use crate::config::MeshConfig;
use crate::envelope::MeshEnvelope;
@@ -26,6 +29,7 @@ use crate::fapp_router::{is_fapp_payload, FappRouter};
use crate::identity::MeshIdentity;
use crate::mesh_router::{IncomingAction, MeshRouter};
use crate::metrics::{self, MeshMetrics};
use crate::observability::{HealthServer, NodeHealth};
use crate::rate_limit::{BackpressureController, RateLimiter};
use crate::routing_table::RoutingTable;
use crate::shutdown::{ShutdownCoordinator, ShutdownSignal, ShutdownTrigger};
@@ -68,6 +72,10 @@ pub struct MeshNode {
shutdown: Arc<ShutdownCoordinator>,
/// Shutdown trigger (clone for external use).
shutdown_trigger: ShutdownTrigger,
/// Whether the node is draining (shutting down).
draining: Arc<AtomicBool>,
/// Health/metrics HTTP listen address (if configured).
health_listen: Option<SocketAddr>,
}
/// Builder for MeshNode with sensible defaults.
@@ -76,6 +84,7 @@ pub struct MeshNodeBuilder {
identity: Option<MeshIdentity>,
secret_key: Option<SecretKey>,
fapp_capabilities: u16,
health_listen: Option<SocketAddr>,
}
impl MeshNodeBuilder {
@@ -85,6 +94,7 @@ impl MeshNodeBuilder {
identity: None,
secret_key: None,
fapp_capabilities: 0,
health_listen: None,
}
}
@@ -124,6 +134,12 @@ impl MeshNodeBuilder {
self
}
/// Enable health/metrics HTTP endpoint on the given address.
pub fn health_listen(mut self, addr: SocketAddr) -> Self {
self.health_listen = Some(addr);
self
}
/// Build and start the mesh node.
pub async fn build(self) -> MeshResult<MeshNode> {
MeshNode::start(
@@ -131,6 +147,7 @@ impl MeshNodeBuilder {
self.identity,
self.secret_key,
self.fapp_capabilities,
self.health_listen,
)
.await
}
@@ -149,6 +166,7 @@ impl MeshNode {
identity: Option<MeshIdentity>,
secret_key: Option<SecretKey>,
fapp_capabilities: u16,
health_listen: Option<SocketAddr>,
) -> MeshResult<Self> {
// Initialize metrics
let metrics = Arc::new(MeshMetrics::new());
@@ -219,6 +237,8 @@ impl MeshNode {
let shutdown = Arc::new(ShutdownCoordinator::new());
let (shutdown_trigger, _shutdown_signal) = ShutdownSignal::new();
let draining = Arc::new(AtomicBool::new(false));
let node = Self {
config,
endpoint,
@@ -235,11 +255,14 @@ impl MeshNode {
metrics,
shutdown,
shutdown_trigger,
draining,
health_listen,
};
tracing::info!(
mesh_addr = %node.address,
fapp = fapp_capabilities != 0,
health = ?node.health_listen,
"Mesh node started"
);
@@ -301,7 +324,19 @@ impl MeshNode {
self.shutdown_trigger.clone()
}
/// Whether the node is currently draining (shutting down).
pub fn is_draining(&self) -> bool {
self.draining.load(std::sync::atomic::Ordering::Relaxed)
}
/// Get a snapshot of the current node health.
pub fn health(&self) -> NodeHealth {
let snapshot = self.metrics.snapshot();
NodeHealth::from_snapshot(&snapshot, self.is_draining())
}
/// Send a mesh envelope to a peer.
#[tracing::instrument(skip(self, envelope), fields(dest = %dest, payload_len = envelope.payload.len()))]
pub async fn send(&self, dest: &TransportAddr, envelope: &MeshEnvelope) -> MeshResult<()> {
let wire = envelope.to_wire();
@@ -315,6 +350,7 @@ impl MeshNode {
}
/// Process an incoming envelope with rate limiting and metrics.
#[tracing::instrument(skip(self, envelope), fields(sender = %sender, payload_len = envelope.payload.len()))]
pub fn process_incoming(&self, sender: &MeshAddress, envelope: MeshEnvelope) -> MeshResult<IncomingAction> {
// Rate limiting check
let rate_result = self.rate_limiter.check_message(sender)?;
@@ -444,10 +480,104 @@ impl MeshNode {
Ok(stats)
}
/// Run the mesh node event loop with background tasks.
///
/// Starts:
/// - Periodic garbage collection (routing table, store, rate limiters)
/// - Health/metrics HTTP server (if `health_listen` is configured)
///
/// Returns a [`RunHandle`] that can be used to await shutdown or trigger it.
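///
/// # Example
///
/// A minimal lifecycle sketch (assumes a tokio runtime; `ctrl_c` is one
/// possible shutdown trigger, not mandated by this API):
///
/// ```ignore
/// let handle = node.run().await?;
/// tokio::signal::ctrl_c().await?;
/// // Marks the node as draining (health reports 503), stops background
/// // tasks, and closes transports.
/// handle.shutdown().await;
/// ```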
pub async fn run(self) -> MeshResult<RunHandle> {
let (shutdown_tx, shutdown_rx) = watch::channel(false);
// Start health server if configured.
let health_addr = if let Some(addr) = self.health_listen {
let server = HealthServer::new(
Arc::clone(&self.metrics),
Arc::clone(&self.draining),
);
match server.serve(addr, shutdown_rx.clone()).await {
Ok(bound) => Some(bound),
Err(e) => {
tracing::warn!(error = %e, "failed to start health server");
None
}
}
} else {
None
};
// Spawn GC task.
let gc_metrics = Arc::clone(&self.metrics);
let gc_store = Arc::clone(&self.mesh_store);
let gc_routing = Arc::clone(&self.routing_table);
let gc_rate_limiter = Arc::clone(&self.rate_limiter);
let gc_interval = self.config.routing.gc_interval;
let mut gc_shutdown = shutdown_rx.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(gc_interval);
interval.tick().await; // skip immediate first tick
loop {
tokio::select! {
biased;
_ = gc_shutdown.changed() => break,
_ = interval.tick() => {
let _span = tracing::info_span!("mesh_gc").entered();
let mut expired_messages = 0usize;
let mut expired_routes = 0usize;
// GC store.
if let Ok(mut store) = gc_store.lock() {
expired_messages = store.gc_expired();
gc_metrics.store.messages_expired.inc_by(expired_messages as u64);
}
// GC routing table.
if let Ok(mut table) = gc_routing.write() {
expired_routes = table.remove_expired();
gc_metrics.routing.routes_expired.inc_by(expired_routes as u64);
}
// GC rate limiters.
let cleaned_limiters = gc_rate_limiter.cleanup(Duration::from_secs(3600));
if expired_messages > 0 || expired_routes > 0 || cleaned_limiters > 0 {
tracing::debug!(
messages = expired_messages,
routes = expired_routes,
rate_limiters = cleaned_limiters,
"GC cycle completed"
);
}
}
}
}
});
tracing::info!(
mesh_addr = %self.address,
health = ?health_addr,
"Mesh node running"
);
Ok(RunHandle {
node: self,
shutdown_tx,
health_addr,
})
}
/// Gracefully shut down the node.
pub async fn shutdown(self) {
tracing::info!("Mesh node shutting down");
// Mark as draining for health checks.
self.draining.store(true, std::sync::atomic::Ordering::Relaxed);
// Trigger shutdown
self.shutdown_trigger.trigger();
@@ -455,7 +585,7 @@ impl MeshNode {
self.shutdown.shutdown().await;
// Close transports
-self.transport_manager.close_all().await;
+let _ = self.transport_manager.close_all().await;
// Close iroh endpoint
self.endpoint.close().await;
@@ -472,6 +602,48 @@ pub struct GcStats {
pub rate_limiters_cleaned: usize,
}
/// Handle for a running mesh node.
///
/// Provides access to the node and controls for shutdown.
pub struct RunHandle {
/// The running mesh node.
node: MeshNode,
/// Shutdown sender — drop or send to stop background tasks.
shutdown_tx: watch::Sender<bool>,
/// Bound health server address (if started).
health_addr: Option<SocketAddr>,
}
impl RunHandle {
/// Get a reference to the running mesh node.
pub fn node(&self) -> &MeshNode {
&self.node
}
/// Get the health server's bound address, if running.
pub fn health_addr(&self) -> Option<SocketAddr> {
self.health_addr
}
/// Trigger graceful shutdown and wait for completion.
pub async fn shutdown(self) {
// Signal background tasks to stop.
let _ = self.shutdown_tx.send(true);
// Run node shutdown (drains transports, etc.).
self.node.shutdown().await;
}
/// Get a snapshot of current node health.
pub fn health(&self) -> NodeHealth {
self.node.health()
}
/// Get a snapshot of current metrics.
pub fn metrics_snapshot(&self) -> crate::metrics::MetricsSnapshot {
self.node.metrics().snapshot()
}
}
#[cfg(test)]
mod tests {
use super::*;


@@ -0,0 +1,381 @@
//! Observability for mesh nodes: health checks, metrics export, and tracing helpers.
//!
//! Provides:
//! - [`NodeHealth`] — structured health status for the mesh node
//! - [`HealthServer`] — lightweight HTTP server for `/healthz` and `/metricsz`
//! - Prometheus text format export from [`MeshMetrics`]
use std::collections::HashMap;
use std::io::Write as IoWrite;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpListener;
use crate::metrics::{MeshMetrics, MetricsSnapshot};
/// Node health status.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
#[serde(rename_all = "lowercase")]
pub enum HealthStatus {
/// Node is healthy and accepting traffic.
Healthy,
/// Node is degraded but still operational.
Degraded,
/// Node is shutting down (draining connections).
Draining,
/// Node is unhealthy.
Unhealthy,
}
impl std::fmt::Display for HealthStatus {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Healthy => write!(f, "healthy"),
Self::Degraded => write!(f, "degraded"),
Self::Draining => write!(f, "draining"),
Self::Unhealthy => write!(f, "unhealthy"),
}
}
}
/// Structured health check response.
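///
/// Serialized as JSON by the `/healthz` endpoint. Illustrative shape with
/// example values (abridged; the `routing` and `store` checks are elided):
///
/// ```json
/// {
///   "status": "healthy",
///   "uptime_secs": 3600,
///   "connections": 5,
///   "routing_table_size": 42,
///   "store_size": 0,
///   "messages_processed": 1200,
///   "checks": {
///     "transport": { "status": "healthy", "message": "sent=100, errors=0, connections=5" }
///   }
/// }
/// ```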
#[derive(Debug, Clone, serde::Serialize)]
pub struct NodeHealth {
/// Overall node status.
pub status: HealthStatus,
/// Node uptime in seconds.
pub uptime_secs: u64,
/// Number of active transport connections.
pub connections: u64,
/// Routing table size.
pub routing_table_size: u64,
/// Store queue depth.
pub store_size: u64,
/// Messages processed since start.
pub messages_processed: u64,
/// Individual subsystem checks.
pub checks: HashMap<String, SubsystemHealth>,
}
/// Per-subsystem health.
#[derive(Debug, Clone, serde::Serialize)]
pub struct SubsystemHealth {
pub status: HealthStatus,
pub message: String,
}
impl NodeHealth {
/// Build a health check from a metrics snapshot and node state.
pub fn from_snapshot(snapshot: &MetricsSnapshot, is_draining: bool) -> Self {
let mut checks = HashMap::new();
// Transport health: degraded if error rate > 10%.
let total_sent: u64 = snapshot.transports.values().map(|t| t.sent).sum();
let total_errors: u64 = snapshot.transports.values().map(|t| t.send_errors).sum();
let transport_status = if is_draining {
HealthStatus::Draining
} else if total_sent > 0 && total_errors * 10 > total_sent {
HealthStatus::Degraded
} else {
HealthStatus::Healthy
};
checks.insert(
"transport".to_string(),
SubsystemHealth {
status: transport_status,
message: format!(
"sent={}, errors={}, connections={}",
total_sent,
total_errors,
snapshot.transports.values().map(|t| t.connections).sum::<u64>(),
),
},
);
// Routing health.
let routing_status = HealthStatus::Healthy;
checks.insert(
"routing".to_string(),
SubsystemHealth {
status: routing_status,
message: format!(
"table_size={}, lookups={}, misses={}",
snapshot.routing.table_size,
snapshot.routing.lookups,
snapshot.routing.lookup_misses,
),
},
);
// Store health.
checks.insert(
"store".to_string(),
SubsystemHealth {
status: HealthStatus::Healthy,
message: format!(
"stored={}, delivered={}, expired={}, current={}",
snapshot.store.messages_stored,
snapshot.store.messages_delivered,
snapshot.store.messages_expired,
snapshot.store.current_size,
),
},
);
// Overall status: worst of all subsystems.
let overall = if is_draining {
HealthStatus::Draining
} else if checks.values().any(|c| c.status == HealthStatus::Unhealthy) {
HealthStatus::Unhealthy
} else if checks.values().any(|c| c.status == HealthStatus::Degraded) {
HealthStatus::Degraded
} else {
HealthStatus::Healthy
};
let connections = snapshot.transports.values().map(|t| t.connections).sum();
let messages_processed: u64 = snapshot.transports.values().map(|t| t.received).sum();
Self {
status: overall,
uptime_secs: snapshot.uptime_secs,
connections,
routing_table_size: snapshot.routing.table_size,
store_size: snapshot.store.current_size,
messages_processed,
checks,
}
}
/// HTTP status code for this health status.
pub fn http_status_code(&self) -> u16 {
match self.status {
HealthStatus::Healthy => 200,
HealthStatus::Degraded => 200,
HealthStatus::Draining => 503,
HealthStatus::Unhealthy => 503,
}
}
}
/// Render a [`MetricsSnapshot`] in Prometheus text exposition format.
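///
/// Abridged sample of the produced text (values are illustrative):
///
/// ```text
/// # HELP mesh_uptime_seconds Node uptime in seconds.
/// # TYPE mesh_uptime_seconds gauge
/// mesh_uptime_seconds 42
/// mesh_transport_sent_total{transport="tcp"} 100
/// mesh_routing_table_size 10
/// ```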
pub fn prometheus_text(snapshot: &MetricsSnapshot) -> String {
let mut buf = Vec::with_capacity(2048);
// Uptime.
writeln!(buf, "# HELP mesh_uptime_seconds Node uptime in seconds.").ok();
writeln!(buf, "# TYPE mesh_uptime_seconds gauge").ok();
writeln!(buf, "mesh_uptime_seconds {}", snapshot.uptime_secs).ok();
// Transport metrics. HELP/TYPE may appear at most once per metric family
// in the Prometheus exposition format, so emit them before the loop
// rather than once per transport.
if !snapshot.transports.is_empty() {
writeln!(buf, "# HELP mesh_transport_sent_total Messages sent via transport.").ok();
writeln!(buf, "# TYPE mesh_transport_sent_total counter").ok();
writeln!(buf, "# HELP mesh_transport_connections Active connections.").ok();
writeln!(buf, "# TYPE mesh_transport_connections gauge").ok();
}
for (name, t) in &snapshot.transports {
writeln!(buf, "mesh_transport_sent_total{{transport=\"{}\"}} {}", name, t.sent).ok();
writeln!(buf, "mesh_transport_received_total{{transport=\"{}\"}} {}", name, t.received).ok();
writeln!(buf, "mesh_transport_send_errors_total{{transport=\"{}\"}} {}", name, t.send_errors).ok();
writeln!(buf, "mesh_transport_bytes_sent_total{{transport=\"{}\"}} {}", name, t.bytes_sent).ok();
writeln!(buf, "mesh_transport_bytes_received_total{{transport=\"{}\"}} {}", name, t.bytes_received).ok();
writeln!(buf, "mesh_transport_connections{{transport=\"{}\"}} {}", name, t.connections).ok();
}
// Routing metrics.
writeln!(buf, "# HELP mesh_routing_table_size Current routing table entries.").ok();
writeln!(buf, "# TYPE mesh_routing_table_size gauge").ok();
writeln!(buf, "mesh_routing_table_size {}", snapshot.routing.table_size).ok();
writeln!(buf, "mesh_routing_lookups_total {}", snapshot.routing.lookups).ok();
writeln!(buf, "mesh_routing_lookup_misses_total {}", snapshot.routing.lookup_misses).ok();
writeln!(buf, "mesh_routing_announcements_processed_total {}", snapshot.routing.announcements_processed).ok();
// Store metrics.
writeln!(buf, "# HELP mesh_store_current_size Current messages in store.").ok();
writeln!(buf, "# TYPE mesh_store_current_size gauge").ok();
writeln!(buf, "mesh_store_current_size {}", snapshot.store.current_size).ok();
writeln!(buf, "mesh_store_messages_stored_total {}", snapshot.store.messages_stored).ok();
writeln!(buf, "mesh_store_messages_delivered_total {}", snapshot.store.messages_delivered).ok();
writeln!(buf, "mesh_store_messages_expired_total {}", snapshot.store.messages_expired).ok();
// Crypto metrics.
writeln!(buf, "mesh_crypto_encryptions_total {}", snapshot.crypto.encryptions).ok();
writeln!(buf, "mesh_crypto_decryptions_total {}", snapshot.crypto.decryptions).ok();
writeln!(buf, "mesh_crypto_signature_verifications_total {}", snapshot.crypto.signature_verifications).ok();
writeln!(buf, "mesh_crypto_signature_failures_total {}", snapshot.crypto.signature_failures).ok();
writeln!(buf, "mesh_crypto_replay_detections_total {}", snapshot.crypto.replay_detections).ok();
String::from_utf8(buf).unwrap_or_default()
}
/// Lightweight HTTP health/metrics server for the mesh node.
///
/// Serves:
/// - `GET /healthz` — JSON health check
/// - `GET /metricsz` — Prometheus text format metrics
///
/// Uses raw TCP + minimal HTTP parsing to avoid adding heavy dependencies
/// (no axum/hyper/warp needed).
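///
/// Probing (illustrative; substitute the bound address returned by `serve`):
///
/// ```text
/// curl -i http://127.0.0.1:PORT/healthz    # JSON body; 200, or 503 when draining
/// curl -s http://127.0.0.1:PORT/metricsz   # Prometheus text exposition format
/// ```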
pub struct HealthServer {
metrics: Arc<MeshMetrics>,
draining: Arc<std::sync::atomic::AtomicBool>,
}
impl HealthServer {
/// Create a new health server backed by the given metrics.
pub fn new(metrics: Arc<MeshMetrics>, draining: Arc<std::sync::atomic::AtomicBool>) -> Self {
Self { metrics, draining }
}
/// Start serving on the given address. Returns when the listener is bound.
///
/// The server runs as a background tokio task and stops when the `shutdown`
/// watch channel reports a change or its sender is dropped.
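///
/// A standalone sketch (assumes `metrics` and `draining` handles are already
/// in scope):
///
/// ```ignore
/// let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
/// let bound = HealthServer::new(metrics, draining)
///     .serve("127.0.0.1:0".parse().unwrap(), shutdown_rx)
///     .await?;
/// tracing::info!(%bound, "probe endpoints ready");
/// let _ = shutdown_tx.send(true); // stop the server
/// ```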
pub async fn serve(
self,
addr: SocketAddr,
mut shutdown: tokio::sync::watch::Receiver<bool>,
) -> Result<SocketAddr, std::io::Error> {
let listener = TcpListener::bind(addr).await?;
let bound = listener.local_addr()?;
tracing::info!(addr = %bound, "health/metrics server listening");
let metrics = self.metrics;
let draining = self.draining;
tokio::spawn(async move {
loop {
tokio::select! {
biased;
_ = shutdown.changed() => {
tracing::debug!("health server shutting down");
break;
}
accept = listener.accept() => {
match accept {
Ok((mut stream, _peer)) => {
let metrics = Arc::clone(&metrics);
let is_draining = draining.load(std::sync::atomic::Ordering::Relaxed);
tokio::spawn(async move {
// Read the request (up to 4KB — we only need the path).
let mut buf = [0u8; 4096];
let n = match tokio::io::AsyncReadExt::read(&mut stream, &mut buf).await {
Ok(n) => n,
Err(_) => return,
};
let request = String::from_utf8_lossy(&buf[..n]);
// Minimal HTTP path extraction.
let path = request
.lines()
.next()
.and_then(|line| line.split_whitespace().nth(1))
.unwrap_or("/");
let (status, content_type, body) = match path {
"/healthz" => {
let snapshot = metrics.snapshot();
let health = NodeHealth::from_snapshot(&snapshot, is_draining);
let code = health.http_status_code();
let json = serde_json::to_string_pretty(&health).unwrap_or_default();
(code, "application/json", json)
}
"/metricsz" => {
let snapshot = metrics.snapshot();
let text = prometheus_text(&snapshot);
(200, "text/plain; version=0.0.4", text)
}
_ => (404, "text/plain", "Not Found\n".to_string()),
};
let response = format!(
"HTTP/1.1 {} {}\r\nContent-Type: {}\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
status,
match status { 200 => "OK", 503 => "Service Unavailable", _ => "Not Found" },
content_type,
body.len(),
body,
);
let _ = stream.write_all(response.as_bytes()).await;
});
}
Err(e) => {
tracing::warn!(error = %e, "health server accept error");
}
}
}
}
}
});
Ok(bound)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::metrics::MeshMetrics;
#[test]
fn health_from_snapshot_healthy() {
let m = MeshMetrics::new();
m.transport("tcp").sent.inc_by(100);
m.transport("tcp").connections.set(5);
m.routing.table_size.set(42);
let snapshot = m.snapshot();
let health = NodeHealth::from_snapshot(&snapshot, false);
assert_eq!(health.status, HealthStatus::Healthy);
assert_eq!(health.connections, 5);
assert_eq!(health.routing_table_size, 42);
assert_eq!(health.http_status_code(), 200);
}
#[test]
fn health_from_snapshot_draining() {
let m = MeshMetrics::new();
let snapshot = m.snapshot();
let health = NodeHealth::from_snapshot(&snapshot, true);
assert_eq!(health.status, HealthStatus::Draining);
assert_eq!(health.http_status_code(), 503);
}
#[test]
fn health_from_snapshot_degraded() {
let m = MeshMetrics::new();
// >10% error rate triggers degraded.
m.transport("tcp").sent.inc_by(10);
m.transport("tcp").send_errors.inc_by(5);
let snapshot = m.snapshot();
let health = NodeHealth::from_snapshot(&snapshot, false);
assert_eq!(health.status, HealthStatus::Degraded);
}
#[test]
fn prometheus_text_format() {
let m = MeshMetrics::new();
m.transport("tcp").sent.inc_by(42);
m.routing.table_size.set(10);
m.store.messages_stored.inc_by(5);
let snapshot = m.snapshot();
let text = prometheus_text(&snapshot);
assert!(text.contains("mesh_uptime_seconds"));
assert!(text.contains("mesh_transport_sent_total{transport=\"tcp\"} 42"));
assert!(text.contains("mesh_routing_table_size 10"));
assert!(text.contains("mesh_store_messages_stored_total 5"));
}
}