Files
quicproquo/crates/quicprochat-p2p/src/metrics.rs
Christian Nennemann 024b6c91d1 feat(p2p): add production infrastructure modules
- error.rs: Structured error types with context for all subsystems
  (transport, routing, crypto, protocol, store, config)
- config.rs: Runtime configuration with TOML parsing and validation
- metrics.rs: Counter/gauge/histogram metrics with transport-specific
  tracking and JSON-serializable snapshots
- rate_limit.rs: Token bucket rate limiting with per-peer tracking,
  duty cycle enforcement for LoRa, and backpressure control

These modules provide the foundation for production deployment.
2026-04-01 09:16:44 +02:00

503 lines
13 KiB
Rust

//! Observability metrics for mesh networking.
//!
//! This module provides structured metrics collection for monitoring
//! mesh node health, performance, and resource usage.
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
/// Thread-safe `u64` counter for metric updates.
///
/// All operations use `Ordering::Relaxed`: every update is atomic, but no
/// ordering relative to other memory operations is implied, which is all a
/// statistics counter needs. Like any atomic `fetch_add`, overflow wraps.
#[derive(Debug, Default)]
pub struct Counter(AtomicU64);

impl Counter {
    /// Creates a counter starting at zero.
    pub fn new() -> Self {
        Self::default()
    }

    /// Adds one to the counter.
    pub fn inc(&self) {
        self.inc_by(1);
    }

    /// Adds `n` to the counter.
    pub fn inc_by(&self, n: u64) {
        let Self(cell) = self;
        cell.fetch_add(n, Ordering::Relaxed);
    }

    /// Returns the current value.
    pub fn get(&self) -> u64 {
        let Self(cell) = self;
        cell.load(Ordering::Relaxed)
    }

    /// Atomically sets the counter back to zero and returns the value it held.
    pub fn reset(&self) -> u64 {
        let Self(cell) = self;
        cell.swap(0, Ordering::Relaxed)
    }
}
/// Gauge for values that can go up and down (e.g. open connections).
///
/// The value is an unsigned `u64`, so [`Gauge::dec`] saturates at zero rather
/// than wrapping around to `u64::MAX`. All operations use `Ordering::Relaxed`.
#[derive(Debug, Default)]
pub struct Gauge(AtomicU64);

impl Gauge {
    /// Creates a gauge starting at zero.
    pub fn new() -> Self {
        Self(AtomicU64::new(0))
    }

    /// Overwrites the current value.
    pub fn set(&self, val: u64) {
        self.0.store(val, Ordering::Relaxed);
    }

    /// Increments the value by one.
    pub fn inc(&self) {
        self.0.fetch_add(1, Ordering::Relaxed);
    }

    /// Decrements the value by one, staying at zero if it is already zero.
    pub fn dec(&self) {
        // A plain `fetch_sub` wraps 0 -> u64::MAX on an unbalanced decrement
        // (e.g. a close reported twice), producing a nonsensical reading.
        // `checked_sub` yields `None` at zero, which aborts the update; the
        // resulting `Err` only reports that nothing changed, so it is ignored.
        let _ = self
            .0
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |v| v.checked_sub(1));
    }

    /// Returns the current value.
    pub fn get(&self) -> u64 {
        self.0.load(Ordering::Relaxed)
    }
}
/// Bucketed histogram for tracking value distributions.
///
/// Bucket bounds are inclusive upper limits: an observation is counted in the
/// first bucket whose bound is `>=` the value. Observations larger than every
/// bound are counted separately as overflow (the implicit `+Inf` bucket), so
/// outliers never inflate the highest finite bucket.
#[derive(Debug)]
pub struct Histogram {
    /// Bucket upper bounds (inclusive), sorted ascending and deduplicated.
    buckets: Vec<u64>,
    /// Per-bucket observation counts; `counts[i]` belongs to `buckets[i]`.
    counts: Vec<AtomicU64>,
    /// Observations larger than the largest bound.
    overflow: AtomicU64,
    /// Sum of all observed values (wraps on `u64` overflow).
    sum: AtomicU64,
    /// Total number of observations, including overflow.
    count: AtomicU64,
}

impl Histogram {
    /// Creates a histogram with default latency buckets, in milliseconds.
    pub fn latency_ms() -> Self {
        Self::new(vec![1, 5, 10, 25, 50, 100, 250, 500, 1000, 5000, 10000])
    }

    /// Creates a histogram with default size buckets, in bytes.
    pub fn size_bytes() -> Self {
        Self::new(vec![64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 65536])
    }

    /// Creates a histogram with the given inclusive bucket upper bounds.
    ///
    /// Bounds may be supplied in any order; they are sorted and deduplicated
    /// so bucket selection is well-defined. An empty list is allowed, in which
    /// case every observation is counted as overflow.
    pub fn new(mut buckets: Vec<u64>) -> Self {
        buckets.sort_unstable();
        buckets.dedup();
        let counts = buckets.iter().map(|_| AtomicU64::new(0)).collect();
        Self {
            buckets,
            counts,
            overflow: AtomicU64::new(0),
            sum: AtomicU64::new(0),
            count: AtomicU64::new(0),
        }
    }

    /// Records a single observation.
    pub fn observe(&self, value: u64) {
        self.sum.fetch_add(value, Ordering::Relaxed);
        self.count.fetch_add(1, Ordering::Relaxed);
        // `buckets` is sorted, so this is the index of the first bound >= value;
        // it equals `buckets.len()` when the value exceeds every bound.
        let idx = self.buckets.partition_point(|&upper| upper < value);
        self.counts
            .get(idx)
            .unwrap_or(&self.overflow)
            .fetch_add(1, Ordering::Relaxed);
    }

    /// Records a duration as whole milliseconds, saturating at `u64::MAX`.
    pub fn observe_duration(&self, d: Duration) {
        // `as_millis` returns `u128`; a bare `as u64` would silently truncate.
        self.observe(u64::try_from(d.as_millis()).unwrap_or(u64::MAX));
    }

    /// Sum of all observed values.
    pub fn sum(&self) -> u64 {
        self.sum.load(Ordering::Relaxed)
    }

    /// Number of observations recorded.
    pub fn count(&self) -> u64 {
        self.count.load(Ordering::Relaxed)
    }

    /// Mean of the observed values, or `0.0` if nothing has been observed.
    pub fn avg(&self) -> f64 {
        let count = self.count();
        if count == 0 {
            0.0
        } else {
            self.sum() as f64 / count as f64
        }
    }

    /// Per-bucket counts as `(upper_bound, count)` pairs, in ascending bound order.
    ///
    /// Counts are non-cumulative and exclude
    /// [`overflow_count`](Self::overflow_count).
    pub fn bucket_counts(&self) -> Vec<(u64, u64)> {
        self.buckets
            .iter()
            .zip(&self.counts)
            .map(|(&upper, hits)| (upper, hits.load(Ordering::Relaxed)))
            .collect()
    }

    /// Number of observations larger than every bucket bound.
    pub fn overflow_count(&self) -> u64 {
        self.overflow.load(Ordering::Relaxed)
    }
}
/// Per-transport metrics.
///
/// One instance exists per transport name; handles are obtained (and created
/// on first use) through `MeshMetrics::transport`. All fields are lock-free
/// atomics, so they can be updated concurrently through a shared `Arc`.
#[derive(Debug, Default)]
pub struct TransportMetrics {
    /// Messages sent successfully.
    pub sent: Counter,
    /// Messages received.
    pub received: Counter,
    /// Send failures.
    pub send_errors: Counter,
    /// Receive errors.
    pub recv_errors: Counter,
    /// Bytes sent.
    pub bytes_sent: Counter,
    /// Bytes received.
    pub bytes_received: Counter,
    /// Active connections (for connection-oriented transports).
    pub connections: Gauge,
}
/// Per-peer metrics.
#[derive(Debug)]
pub struct PeerMetrics {
    /// Messages sent to this peer.
    pub messages_sent: Counter,
    /// Messages received from this peer.
    pub messages_received: Counter,
    /// When the peer was last seen, as recorded by [`PeerMetrics::touch`];
    /// `None` until the first touch.
    pub last_seen: RwLock<Option<Instant>>,
    /// Round-trip time samples in milliseconds (default latency buckets).
    pub rtt_ms: Histogram,
}

impl Default for PeerMetrics {
    fn default() -> Self {
        Self {
            messages_sent: Counter::new(),
            messages_received: Counter::new(),
            last_seen: RwLock::new(None),
            rtt_ms: Histogram::latency_ms(),
        }
    }
}

impl PeerMetrics {
    /// Records that the peer was seen just now.
    pub fn touch(&self) {
        // Poisoning only means some thread panicked while holding the lock.
        // An `Option<Instant>` cannot be left half-written, so recover the guard
        // instead of silently discarding every future update.
        let mut last = self
            .last_seen
            .write()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        *last = Some(Instant::now());
    }

    /// Time elapsed since the last [`touch`](Self::touch), or `None` if the
    /// peer has never been seen.
    pub fn age(&self) -> Option<Duration> {
        let seen = *self
            .last_seen
            .read()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        seen.map(|instant| instant.elapsed())
    }
}
/// Global mesh metrics.
#[derive(Debug)]
pub struct MeshMetrics {
/// Transport metrics by name.
pub transports: RwLock<HashMap<String, Arc<TransportMetrics>>>,
/// Routing metrics.
pub routing: RoutingMetrics,
/// Store metrics.
pub store: StoreMetrics,
/// Crypto metrics.
pub crypto: CryptoMetrics,
/// Protocol metrics.
pub protocol: ProtocolMetrics,
/// Node start time.
pub started_at: Instant,
}
impl Default for MeshMetrics {
fn default() -> Self {
Self::new()
}
}
impl MeshMetrics {
pub fn new() -> Self {
Self {
transports: RwLock::new(HashMap::new()),
routing: RoutingMetrics::default(),
store: StoreMetrics::default(),
crypto: CryptoMetrics::default(),
protocol: ProtocolMetrics::default(),
started_at: Instant::now(),
}
}
/// Get or create transport metrics.
pub fn transport(&self, name: &str) -> Arc<TransportMetrics> {
{
let map = self.transports.read().unwrap();
if let Some(m) = map.get(name) {
return Arc::clone(m);
}
}
let mut map = self.transports.write().unwrap();
map.entry(name.to_string())
.or_insert_with(|| Arc::new(TransportMetrics::default()))
.clone()
}
/// Node uptime.
pub fn uptime(&self) -> Duration {
self.started_at.elapsed()
}
/// Export metrics as a snapshot.
pub fn snapshot(&self) -> MetricsSnapshot {
let transports = self.transports.read().unwrap();
let transport_snapshots: HashMap<String, TransportSnapshot> = transports
.iter()
.map(|(name, m)| {
(
name.clone(),
TransportSnapshot {
sent: m.sent.get(),
received: m.received.get(),
send_errors: m.send_errors.get(),
bytes_sent: m.bytes_sent.get(),
bytes_received: m.bytes_received.get(),
connections: m.connections.get(),
},
)
})
.collect();
MetricsSnapshot {
uptime_secs: self.uptime().as_secs(),
transports: transport_snapshots,
routing: RoutingSnapshot {
table_size: self.routing.table_size.get(),
lookups: self.routing.lookups.get(),
lookup_misses: self.routing.lookup_misses.get(),
announcements_processed: self.routing.announcements_processed.get(),
},
store: StoreSnapshot {
messages_stored: self.store.messages_stored.get(),
messages_delivered: self.store.messages_delivered.get(),
messages_expired: self.store.messages_expired.get(),
current_size: self.store.current_size.get(),
},
crypto: CryptoSnapshot {
encryptions: self.crypto.encryptions.get(),
decryptions: self.crypto.decryptions.get(),
signature_verifications: self.crypto.signature_verifications.get(),
signature_failures: self.crypto.signature_failures.get(),
replay_detections: self.crypto.replay_detections.get(),
},
}
}
}
/// Routing subsystem metrics.
#[derive(Debug, Default)]
pub struct RoutingMetrics {
/// Current routing table size.
pub table_size: Gauge,
/// Route lookups.
pub lookups: Counter,
/// Route lookup misses.
pub lookup_misses: Counter,
/// Routes added.
pub routes_added: Counter,
/// Routes expired.
pub routes_expired: Counter,
/// Announcements processed.
pub announcements_processed: Counter,
/// Announcements forwarded.
pub announcements_forwarded: Counter,
/// Duplicate announcements dropped.
pub duplicates_dropped: Counter,
}
/// Store subsystem metrics.
#[derive(Debug, Default)]
pub struct StoreMetrics {
/// Messages stored.
pub messages_stored: Counter,
/// Messages delivered.
pub messages_delivered: Counter,
/// Messages expired.
pub messages_expired: Counter,
/// Current store size.
pub current_size: Gauge,
/// Store capacity reached events.
pub capacity_reached: Counter,
}
/// Crypto subsystem metrics.
#[derive(Debug)]
pub struct CryptoMetrics {
/// Successful encryptions.
pub encryptions: Counter,
/// Successful decryptions.
pub decryptions: Counter,
/// Decryption failures.
pub decryption_failures: Counter,
/// Signature verifications.
pub signature_verifications: Counter,
/// Signature failures.
pub signature_failures: Counter,
/// Replay attacks detected.
pub replay_detections: Counter,
/// Encryption latency.
pub encrypt_latency: Histogram,
}
impl Default for CryptoMetrics {
fn default() -> Self {
Self {
encryptions: Counter::new(),
decryptions: Counter::new(),
decryption_failures: Counter::new(),
signature_verifications: Counter::new(),
signature_failures: Counter::new(),
replay_detections: Counter::new(),
encrypt_latency: Histogram::latency_ms(),
}
}
}
/// Protocol metrics.
#[derive(Debug, Default)]
pub struct ProtocolMetrics {
/// Messages parsed.
pub messages_parsed: Counter,
/// Parse errors.
pub parse_errors: Counter,
/// Unknown message types.
pub unknown_types: Counter,
/// Messages too large.
pub oversized: Counter,
}
/// Point-in-time snapshot of metrics.
#[derive(Debug, Clone, serde::Serialize)]
pub struct MetricsSnapshot {
pub uptime_secs: u64,
pub transports: HashMap<String, TransportSnapshot>,
pub routing: RoutingSnapshot,
pub store: StoreSnapshot,
pub crypto: CryptoSnapshot,
}
#[derive(Debug, Clone, serde::Serialize)]
pub struct TransportSnapshot {
pub sent: u64,
pub received: u64,
pub send_errors: u64,
pub bytes_sent: u64,
pub bytes_received: u64,
pub connections: u64,
}
#[derive(Debug, Clone, serde::Serialize)]
pub struct RoutingSnapshot {
pub table_size: u64,
pub lookups: u64,
pub lookup_misses: u64,
pub announcements_processed: u64,
}
#[derive(Debug, Clone, serde::Serialize)]
pub struct StoreSnapshot {
pub messages_stored: u64,
pub messages_delivered: u64,
pub messages_expired: u64,
pub current_size: u64,
}
#[derive(Debug, Clone, serde::Serialize)]
pub struct CryptoSnapshot {
pub encryptions: u64,
pub decryptions: u64,
pub signature_verifications: u64,
pub signature_failures: u64,
pub replay_detections: u64,
}
/// Process-wide metrics registry, initialised lazily by [`metrics`].
static GLOBAL_METRICS: std::sync::OnceLock<Arc<MeshMetrics>> = std::sync::OnceLock::new();

/// Returns the process-wide [`MeshMetrics`] registry, creating it on first call.
///
/// Because the registry is built on first access, its uptime is measured from
/// that first call rather than from process start.
pub fn metrics() -> &'static Arc<MeshMetrics> {
    GLOBAL_METRICS.get_or_init(|| {
        let registry = MeshMetrics::new();
        Arc::new(registry)
    })
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Increment, bulk increment, and reset-returns-previous-value.
    #[test]
    fn counter_basics() {
        let counter = Counter::new();
        assert_eq!(counter.get(), 0);

        counter.inc();
        assert_eq!(counter.get(), 1);

        counter.inc_by(5);
        assert_eq!(counter.get(), 6);

        let before_reset = counter.reset();
        assert_eq!(before_reset, 6);
        assert_eq!(counter.get(), 0);
    }

    /// Set, increment and decrement round-trip.
    #[test]
    fn gauge_basics() {
        let gauge = Gauge::new();
        assert_eq!(gauge.get(), 0);

        gauge.set(10);
        assert_eq!(gauge.get(), 10);

        gauge.inc();
        assert_eq!(gauge.get(), 11);

        gauge.dec();
        assert_eq!(gauge.get(), 10);
    }

    /// Count and sum cover every observation, including out-of-range ones.
    #[test]
    fn histogram_basics() {
        let samples: [u64; 4] = [5, 25, 75, 200];
        let hist = Histogram::new(vec![10, 50, 100]);
        for &sample in &samples {
            hist.observe(sample);
        }
        assert_eq!(hist.count(), 4);
        assert_eq!(hist.sum(), samples.iter().sum::<u64>());
    }

    /// Transport lookups by name share one instance.
    #[test]
    fn transport_metrics() {
        let mesh = MeshMetrics::new();

        let first = mesh.transport("tcp");
        first.sent.inc();
        first.bytes_sent.inc_by(100);
        assert_eq!(first.sent.get(), 1);
        assert_eq!(first.bytes_sent.get(), 100);

        // Looking the name up again must yield the already-registered instance.
        let second = mesh.transport("tcp");
        assert_eq!(second.sent.get(), 1);
    }

    /// Snapshots serialize to JSON with the expected keys and values.
    #[test]
    fn snapshot_serializes() {
        let mesh = MeshMetrics::new();
        mesh.transport("tcp").sent.inc();
        mesh.routing.lookups.inc_by(10);

        let json = serde_json::to_string(&mesh.snapshot()).expect("serialize");
        for needle in ["\"uptime_secs\":", "\"lookups\":10"] {
            assert!(json.contains(needle), "missing {needle} in {json}");
        }
    }

    /// The global accessor always hands back the same registry.
    #[test]
    fn global_metrics() {
        metrics().protocol.messages_parsed.inc();
        assert_eq!(metrics().protocol.messages_parsed.get(), 1);
    }
}