Compare commits
3 Commits
ac36534063
...
50a63a6b96
| Author | SHA1 | Date | |
|---|---|---|---|
| 50a63a6b96 | |||
| a258f98a40 | |||
| 024b6c91d1 |
18
Cargo.lock
generated
18
Cargo.lock
generated
@@ -2157,6 +2157,22 @@ version = "1.0.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9"
|
||||
|
||||
[[package]]
|
||||
name = "humantime"
|
||||
version = "2.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "135b12329e5e3ce057a9f972339ea52bc954fe1e9358ef27f95e89716fbc5424"
|
||||
|
||||
[[package]]
|
||||
name = "humantime-serde"
|
||||
version = "1.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "57a3db5ea5923d99402c94e9feb261dc5ee9b4efa158b0315f788cf549cc200c"
|
||||
dependencies = [
|
||||
"humantime",
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hybrid-array"
|
||||
version = "0.2.3"
|
||||
@@ -4454,6 +4470,7 @@ dependencies = [
|
||||
"ciborium",
|
||||
"hex",
|
||||
"hkdf",
|
||||
"humantime-serde",
|
||||
"iroh",
|
||||
"quicprochat-core",
|
||||
"rand 0.8.5",
|
||||
@@ -4463,6 +4480,7 @@ dependencies = [
|
||||
"tempfile",
|
||||
"thiserror 1.0.69",
|
||||
"tokio",
|
||||
"toml",
|
||||
"tracing",
|
||||
"x25519-dalek",
|
||||
"zeroize",
|
||||
|
||||
@@ -37,6 +37,10 @@ x25519-dalek = { workspace = true }
|
||||
hkdf = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
|
||||
# Configuration
|
||||
toml = "0.8"
|
||||
humantime-serde = "1"
|
||||
|
||||
[dev-dependencies]
|
||||
tempfile = "3"
|
||||
|
||||
|
||||
460
crates/quicprochat-p2p/src/config.rs
Normal file
460
crates/quicprochat-p2p/src/config.rs
Normal file
@@ -0,0 +1,460 @@
|
||||
//! Runtime configuration for mesh networking.
|
||||
//!
|
||||
//! This module provides centralized configuration with sensible defaults
|
||||
//! and validation. Configuration can be loaded from files, environment
|
||||
//! variables, or set programmatically.
|
||||
|
||||
use std::path::{Path, PathBuf};
use std::time::Duration;

use serde::{Deserialize, Serialize};

use crate::error::{ConfigError, MeshResult};
use crate::transport::CryptoMode;
|
||||
|
||||
/// Top-level mesh node configuration.
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
#[serde(default)]
|
||||
pub struct MeshConfig {
|
||||
/// Node identity configuration.
|
||||
pub identity: IdentityConfig,
|
||||
/// Announce protocol configuration.
|
||||
pub announce: AnnounceConfig,
|
||||
/// Routing configuration.
|
||||
pub routing: RoutingConfig,
|
||||
/// Store-and-forward configuration.
|
||||
pub store: StoreConfig,
|
||||
/// Transport configuration.
|
||||
pub transport: TransportConfig,
|
||||
/// Crypto configuration.
|
||||
pub crypto: CryptoConfig,
|
||||
/// Rate limiting configuration.
|
||||
pub rate_limit: RateLimitConfig,
|
||||
/// Logging configuration.
|
||||
pub logging: LoggingConfig,
|
||||
}
|
||||
|
||||
impl Default for MeshConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
identity: IdentityConfig::default(),
|
||||
announce: AnnounceConfig::default(),
|
||||
routing: RoutingConfig::default(),
|
||||
store: StoreConfig::default(),
|
||||
transport: TransportConfig::default(),
|
||||
crypto: CryptoConfig::default(),
|
||||
rate_limit: RateLimitConfig::default(),
|
||||
logging: LoggingConfig::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl MeshConfig {
|
||||
/// Load configuration from a TOML file.
|
||||
pub fn from_file(path: &PathBuf) -> MeshResult<Self> {
|
||||
let content = std::fs::read_to_string(path).map_err(|e| {
|
||||
ConfigError::Parse(format!("failed to read config file: {}", e))
|
||||
})?;
|
||||
Self::from_toml(&content)
|
||||
}
|
||||
|
||||
/// Parse configuration from TOML string.
|
||||
pub fn from_toml(toml: &str) -> MeshResult<Self> {
|
||||
let config: Self = toml::from_str(toml).map_err(|e| {
|
||||
ConfigError::Parse(format!("TOML parse error: {}", e))
|
||||
})?;
|
||||
config.validate()?;
|
||||
Ok(config)
|
||||
}
|
||||
|
||||
/// Serialize to TOML string.
|
||||
pub fn to_toml(&self) -> MeshResult<String> {
|
||||
toml::to_string_pretty(self).map_err(|e| {
|
||||
ConfigError::Parse(format!("TOML serialize error: {}", e)).into()
|
||||
})
|
||||
}
|
||||
|
||||
/// Validate configuration values.
|
||||
pub fn validate(&self) -> MeshResult<()> {
|
||||
self.announce.validate()?;
|
||||
self.routing.validate()?;
|
||||
self.store.validate()?;
|
||||
self.rate_limit.validate()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Create a minimal config for constrained devices.
|
||||
pub fn constrained() -> Self {
|
||||
Self {
|
||||
store: StoreConfig {
|
||||
max_messages: 100,
|
||||
max_keypackages: 50,
|
||||
..Default::default()
|
||||
},
|
||||
routing: RoutingConfig {
|
||||
max_entries: 100,
|
||||
..Default::default()
|
||||
},
|
||||
announce: AnnounceConfig {
|
||||
interval: Duration::from_secs(1800), // 30 min
|
||||
..Default::default()
|
||||
},
|
||||
crypto: CryptoConfig {
|
||||
default_mode: CryptoMode::MlsLiteUnsigned,
|
||||
..Default::default()
|
||||
},
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Identity configuration.
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
#[serde(default)]
|
||||
pub struct IdentityConfig {
|
||||
/// Path to persist identity keypair.
|
||||
pub keypair_path: Option<PathBuf>,
|
||||
/// Whether to auto-generate keypair if missing.
|
||||
pub auto_generate: bool,
|
||||
}
|
||||
|
||||
impl Default for IdentityConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
keypair_path: None,
|
||||
auto_generate: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Announce protocol configuration.
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
#[serde(default)]
|
||||
pub struct AnnounceConfig {
|
||||
/// Interval between periodic announcements.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub interval: Duration,
|
||||
/// Maximum age before announce is considered stale.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub max_age: Duration,
|
||||
/// Maximum propagation hops.
|
||||
pub max_hops: u8,
|
||||
/// Capabilities to advertise.
|
||||
pub capabilities: u16,
|
||||
/// Whether to include KeyPackage hash in announces.
|
||||
pub include_keypackage: bool,
|
||||
}
|
||||
|
||||
impl Default for AnnounceConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
interval: Duration::from_secs(600), // 10 min
|
||||
max_age: Duration::from_secs(1800), // 30 min
|
||||
max_hops: 8,
|
||||
capabilities: 0x0003, // CAP_RELAY | CAP_STORE
|
||||
include_keypackage: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl AnnounceConfig {
|
||||
fn validate(&self) -> MeshResult<()> {
|
||||
if self.interval < Duration::from_secs(10) {
|
||||
return Err(ConfigError::InvalidValue {
|
||||
key: "announce.interval".to_string(),
|
||||
reason: "must be at least 10 seconds".to_string(),
|
||||
}.into());
|
||||
}
|
||||
if self.max_hops == 0 || self.max_hops > 32 {
|
||||
return Err(ConfigError::InvalidValue {
|
||||
key: "announce.max_hops".to_string(),
|
||||
reason: "must be between 1 and 32".to_string(),
|
||||
}.into());
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Routing configuration.
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
#[serde(default)]
|
||||
pub struct RoutingConfig {
|
||||
/// Maximum routing table entries.
|
||||
pub max_entries: usize,
|
||||
/// Default route TTL.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub default_ttl: Duration,
|
||||
/// How often to garbage collect expired routes.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub gc_interval: Duration,
|
||||
}
|
||||
|
||||
impl Default for RoutingConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
max_entries: 10_000,
|
||||
default_ttl: Duration::from_secs(1800), // 30 min
|
||||
gc_interval: Duration::from_secs(60),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl RoutingConfig {
|
||||
fn validate(&self) -> MeshResult<()> {
|
||||
if self.max_entries == 0 {
|
||||
return Err(ConfigError::InvalidValue {
|
||||
key: "routing.max_entries".to_string(),
|
||||
reason: "must be at least 1".to_string(),
|
||||
}.into());
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Store-and-forward configuration.
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
#[serde(default)]
|
||||
pub struct StoreConfig {
|
||||
/// Maximum messages in store.
|
||||
pub max_messages: usize,
|
||||
/// Maximum messages per recipient.
|
||||
pub max_per_recipient: usize,
|
||||
/// Maximum cached KeyPackages.
|
||||
pub max_keypackages: usize,
|
||||
/// Maximum KeyPackages per address.
|
||||
pub max_keypackages_per_addr: usize,
|
||||
/// Default message TTL.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub default_ttl: Duration,
|
||||
/// Path for persistent storage (None = in-memory only).
|
||||
pub persistence_path: Option<PathBuf>,
|
||||
}
|
||||
|
||||
impl Default for StoreConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
max_messages: 10_000,
|
||||
max_per_recipient: 100,
|
||||
max_keypackages: 1_000,
|
||||
max_keypackages_per_addr: 3,
|
||||
default_ttl: Duration::from_secs(24 * 3600), // 24 hours
|
||||
persistence_path: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl StoreConfig {
|
||||
fn validate(&self) -> MeshResult<()> {
|
||||
if self.max_messages == 0 {
|
||||
return Err(ConfigError::InvalidValue {
|
||||
key: "store.max_messages".to_string(),
|
||||
reason: "must be at least 1".to_string(),
|
||||
}.into());
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Transport configuration.
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
#[serde(default)]
|
||||
pub struct TransportConfig {
|
||||
/// Enable iroh/QUIC transport.
|
||||
pub enable_iroh: bool,
|
||||
/// Enable TCP transport.
|
||||
pub enable_tcp: bool,
|
||||
/// TCP listen address.
|
||||
pub tcp_listen: Option<String>,
|
||||
/// Enable LoRa transport.
|
||||
pub enable_lora: bool,
|
||||
/// LoRa device path (e.g., /dev/ttyUSB0).
|
||||
pub lora_device: Option<String>,
|
||||
/// LoRa spreading factor (7-12).
|
||||
pub lora_sf: u8,
|
||||
/// LoRa bandwidth in kHz.
|
||||
pub lora_bw: u32,
|
||||
/// Connection timeout.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub connect_timeout: Duration,
|
||||
/// Send timeout.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub send_timeout: Duration,
|
||||
}
|
||||
|
||||
impl Default for TransportConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
enable_iroh: true,
|
||||
enable_tcp: true,
|
||||
tcp_listen: None,
|
||||
enable_lora: false,
|
||||
lora_device: None,
|
||||
lora_sf: 10,
|
||||
lora_bw: 125,
|
||||
connect_timeout: Duration::from_secs(10),
|
||||
send_timeout: Duration::from_secs(30),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Crypto configuration.
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
#[serde(default)]
|
||||
pub struct CryptoConfig {
|
||||
/// Default crypto mode.
|
||||
pub default_mode: CryptoMode,
|
||||
/// Whether to auto-upgrade to better crypto when available.
|
||||
pub auto_upgrade: bool,
|
||||
/// Whether to sign MLS-Lite messages.
|
||||
pub mls_lite_sign: bool,
|
||||
/// Enable post-quantum hybrid mode.
|
||||
pub enable_pq: bool,
|
||||
}
|
||||
|
||||
impl Default for CryptoConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
default_mode: CryptoMode::MlsClassical,
|
||||
auto_upgrade: true,
|
||||
mls_lite_sign: true,
|
||||
enable_pq: false, // PQ is large, opt-in
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Rate limiting configuration.
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
#[serde(default)]
|
||||
pub struct RateLimitConfig {
|
||||
/// Maximum announces per peer per minute.
|
||||
pub announce_per_peer_per_min: u32,
|
||||
/// Maximum messages per peer per minute.
|
||||
pub message_per_peer_per_min: u32,
|
||||
/// Maximum KeyPackage requests per minute.
|
||||
pub keypackage_requests_per_min: u32,
|
||||
/// LoRa duty cycle limit (0.0-1.0, e.g., 0.01 = 1%).
|
||||
pub lora_duty_cycle: f32,
|
||||
}
|
||||
|
||||
impl Default for RateLimitConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
announce_per_peer_per_min: 10,
|
||||
message_per_peer_per_min: 60,
|
||||
keypackage_requests_per_min: 20,
|
||||
lora_duty_cycle: 0.01, // EU868 1% default
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl RateLimitConfig {
|
||||
fn validate(&self) -> MeshResult<()> {
|
||||
if self.lora_duty_cycle < 0.0 || self.lora_duty_cycle > 1.0 {
|
||||
return Err(ConfigError::InvalidValue {
|
||||
key: "rate_limit.lora_duty_cycle".to_string(),
|
||||
reason: "must be between 0.0 and 1.0".to_string(),
|
||||
}.into());
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Logging configuration.
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
#[serde(default)]
|
||||
pub struct LoggingConfig {
|
||||
/// Log level (trace, debug, info, warn, error).
|
||||
pub level: String,
|
||||
/// Whether to log to file.
|
||||
pub file: Option<PathBuf>,
|
||||
/// Whether to include timestamps.
|
||||
pub timestamps: bool,
|
||||
/// Whether to include span context.
|
||||
pub spans: bool,
|
||||
}
|
||||
|
||||
impl Default for LoggingConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
level: "info".to_string(),
|
||||
file: None,
|
||||
timestamps: true,
|
||||
spans: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Serde helper for CryptoMode
|
||||
impl Serialize for CryptoMode {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::Serializer,
|
||||
{
|
||||
let s = match self {
|
||||
CryptoMode::MlsHybrid => "mls-hybrid",
|
||||
CryptoMode::MlsClassical => "mls-classical",
|
||||
CryptoMode::MlsLiteSigned => "mls-lite-signed",
|
||||
CryptoMode::MlsLiteUnsigned => "mls-lite-unsigned",
|
||||
};
|
||||
serializer.serialize_str(s)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'de> Deserialize<'de> for CryptoMode {
|
||||
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
|
||||
where
|
||||
D: serde::Deserializer<'de>,
|
||||
{
|
||||
let s = String::deserialize(deserializer)?;
|
||||
match s.as_str() {
|
||||
"mls-hybrid" => Ok(CryptoMode::MlsHybrid),
|
||||
"mls-classical" => Ok(CryptoMode::MlsClassical),
|
||||
"mls-lite-signed" => Ok(CryptoMode::MlsLiteSigned),
|
||||
"mls-lite-unsigned" => Ok(CryptoMode::MlsLiteUnsigned),
|
||||
_ => Err(serde::de::Error::unknown_variant(
|
||||
&s,
|
||||
&["mls-hybrid", "mls-classical", "mls-lite-signed", "mls-lite-unsigned"],
|
||||
)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn default_config_is_valid() {
        // The out-of-the-box configuration must always pass validation.
        assert!(MeshConfig::default().validate().is_ok());
    }

    #[test]
    fn constrained_config_is_valid() {
        let cfg = MeshConfig::constrained();
        assert!(cfg.validate().is_ok());
        assert_eq!(cfg.store.max_messages, 100);
    }

    #[test]
    fn toml_roundtrip() {
        let original = MeshConfig::default();
        let encoded = original.to_toml().expect("serialize");
        let decoded = MeshConfig::from_toml(&encoded).expect("parse");
        assert_eq!(original.announce.max_hops, decoded.announce.max_hops);
    }

    #[test]
    fn invalid_announce_interval() {
        let mut cfg = MeshConfig::default();
        cfg.announce.interval = Duration::from_secs(1); // below 10s minimum
        assert!(cfg.validate().is_err());
    }

    #[test]
    fn invalid_duty_cycle() {
        let mut cfg = MeshConfig::default();
        cfg.rate_limit.lora_duty_cycle = 2.0; // above 1.0 maximum
        assert!(cfg.validate().is_err());
    }
}
|
||||
354
crates/quicprochat-p2p/src/error.rs
Normal file
354
crates/quicprochat-p2p/src/error.rs
Normal file
@@ -0,0 +1,354 @@
|
||||
//! Production-ready error types for the mesh P2P layer.
|
||||
//!
|
||||
//! This module provides structured error types with context for debugging
|
||||
//! and recovery. Errors are categorized by subsystem for easier handling.
|
||||
|
||||
use std::fmt;
|
||||
|
||||
use thiserror::Error;
|
||||
|
||||
use crate::address::MeshAddress;
|
||||
use crate::transport::TransportAddr;
|
||||
|
||||
/// Top-level mesh error type.
|
||||
#[derive(Debug, Error)]
|
||||
pub enum MeshError {
|
||||
/// Transport layer errors.
|
||||
#[error("transport error: {0}")]
|
||||
Transport(#[from] TransportError),
|
||||
|
||||
/// Routing errors.
|
||||
#[error("routing error: {0}")]
|
||||
Routing(#[from] RoutingError),
|
||||
|
||||
/// Crypto/encryption errors.
|
||||
#[error("crypto error: {0}")]
|
||||
Crypto(#[from] CryptoError),
|
||||
|
||||
/// Protocol errors (malformed messages, version mismatch).
|
||||
#[error("protocol error: {0}")]
|
||||
Protocol(#[from] ProtocolError),
|
||||
|
||||
/// Store/cache errors.
|
||||
#[error("store error: {0}")]
|
||||
Store(#[from] StoreError),
|
||||
|
||||
/// Configuration errors.
|
||||
#[error("config error: {0}")]
|
||||
Config(#[from] ConfigError),
|
||||
|
||||
/// Internal errors (bugs, invariant violations).
|
||||
#[error("internal error: {0}")]
|
||||
Internal(String),
|
||||
}
|
||||
|
||||
/// Transport layer errors.
|
||||
#[derive(Debug, Error)]
|
||||
pub enum TransportError {
|
||||
/// Failed to send data.
|
||||
#[error("send failed to {dest}: {reason}")]
|
||||
SendFailed { dest: String, reason: String },
|
||||
|
||||
/// Failed to receive data.
|
||||
#[error("receive failed: {0}")]
|
||||
ReceiveFailed(String),
|
||||
|
||||
/// Connection failed or lost.
|
||||
#[error("connection to {dest} failed: {reason}")]
|
||||
ConnectionFailed { dest: String, reason: String },
|
||||
|
||||
/// Transport not available.
|
||||
#[error("transport '{name}' not available")]
|
||||
NotAvailable { name: String },
|
||||
|
||||
/// No transports registered.
|
||||
#[error("no transports registered")]
|
||||
NoTransports,
|
||||
|
||||
/// MTU exceeded.
|
||||
#[error("payload {size} bytes exceeds MTU {mtu} bytes")]
|
||||
MtuExceeded { size: usize, mtu: usize },
|
||||
|
||||
/// Duty cycle limit reached.
|
||||
#[error("duty cycle limit reached: {used_ms}ms used of {limit_ms}ms allowed")]
|
||||
DutyCycleExceeded { used_ms: u64, limit_ms: u64 },
|
||||
|
||||
/// Timeout waiting for response.
|
||||
#[error("timeout waiting for response from {dest}")]
|
||||
Timeout { dest: String },
|
||||
|
||||
/// I/O error.
|
||||
#[error("I/O error: {0}")]
|
||||
Io(#[from] std::io::Error),
|
||||
}
|
||||
|
||||
/// Routing errors.
|
||||
#[derive(Debug, Error)]
|
||||
pub enum RoutingError {
|
||||
/// No route to destination.
|
||||
#[error("no route to {0}")]
|
||||
NoRoute(String),
|
||||
|
||||
/// Route expired.
|
||||
#[error("route to {dest} expired (last seen {age_secs}s ago)")]
|
||||
RouteExpired { dest: String, age_secs: u64 },
|
||||
|
||||
/// Too many hops.
|
||||
#[error("max hops ({max}) exceeded for message to {dest}")]
|
||||
MaxHopsExceeded { dest: String, max: u8 },
|
||||
|
||||
/// Message expired.
|
||||
#[error("message expired (TTL {ttl_secs}s, age {age_secs}s)")]
|
||||
MessageExpired { ttl_secs: u32, age_secs: u64 },
|
||||
|
||||
/// Duplicate message (dedup).
|
||||
#[error("duplicate message ID {0}")]
|
||||
Duplicate(String),
|
||||
|
||||
/// Routing table full.
|
||||
#[error("routing table full ({capacity} entries)")]
|
||||
TableFull { capacity: usize },
|
||||
}
|
||||
|
||||
/// Crypto/encryption errors.
|
||||
#[derive(Debug, Error)]
|
||||
pub enum CryptoError {
|
||||
/// Signature verification failed.
|
||||
#[error("signature verification failed for {context}")]
|
||||
SignatureInvalid { context: String },
|
||||
|
||||
/// Decryption failed.
|
||||
#[error("decryption failed: {0}")]
|
||||
DecryptionFailed(String),
|
||||
|
||||
/// Key not found.
|
||||
#[error("key not found for {0}")]
|
||||
KeyNotFound(String),
|
||||
|
||||
/// KeyPackage invalid or expired.
|
||||
#[error("KeyPackage invalid: {0}")]
|
||||
KeyPackageInvalid(String),
|
||||
|
||||
/// Replay attack detected.
|
||||
#[error("replay detected: sequence {seq} already seen from {sender}")]
|
||||
ReplayDetected { sender: String, seq: u32 },
|
||||
|
||||
/// Wrong epoch.
|
||||
#[error("wrong epoch: expected {expected}, got {got}")]
|
||||
WrongEpoch { expected: u16, got: u16 },
|
||||
|
||||
/// MLS error (from openmls).
|
||||
#[error("MLS error: {0}")]
|
||||
Mls(String),
|
||||
}
|
||||
|
||||
/// Protocol errors.
|
||||
#[derive(Debug, Error)]
|
||||
pub enum ProtocolError {
|
||||
/// Unknown message type.
|
||||
#[error("unknown message type: 0x{0:02x}")]
|
||||
UnknownMessageType(u8),
|
||||
|
||||
/// Invalid message format.
|
||||
#[error("invalid message format: {0}")]
|
||||
InvalidFormat(String),
|
||||
|
||||
/// Version mismatch.
|
||||
#[error("protocol version mismatch: expected {expected}, got {got}")]
|
||||
VersionMismatch { expected: u8, got: u8 },
|
||||
|
||||
/// Required field missing.
|
||||
#[error("required field missing: {0}")]
|
||||
MissingField(String),
|
||||
|
||||
/// CBOR decode error.
|
||||
#[error("CBOR decode error: {0}")]
|
||||
CborDecode(String),
|
||||
|
||||
/// CBOR encode error.
|
||||
#[error("CBOR encode error: {0}")]
|
||||
CborEncode(String),
|
||||
|
||||
/// Message too large.
|
||||
#[error("message too large: {size} bytes (max {max})")]
|
||||
MessageTooLarge { size: usize, max: usize },
|
||||
}
|
||||
|
||||
/// Store/cache errors.
|
||||
#[derive(Debug, Error)]
|
||||
pub enum StoreError {
|
||||
/// Store is full.
|
||||
#[error("store full: {current}/{capacity} items")]
|
||||
Full { current: usize, capacity: usize },
|
||||
|
||||
/// Item not found.
|
||||
#[error("item not found: {0}")]
|
||||
NotFound(String),
|
||||
|
||||
/// Persistence error.
|
||||
#[error("persistence error: {0}")]
|
||||
Persistence(String),
|
||||
|
||||
/// Serialization error.
|
||||
#[error("serialization error: {0}")]
|
||||
Serialization(String),
|
||||
}
|
||||
|
||||
/// Configuration errors.
|
||||
#[derive(Debug, Error)]
|
||||
pub enum ConfigError {
|
||||
/// Invalid configuration value.
|
||||
#[error("invalid config value for '{key}': {reason}")]
|
||||
InvalidValue { key: String, reason: String },
|
||||
|
||||
/// Missing required configuration.
|
||||
#[error("missing required config: {0}")]
|
||||
Missing(String),
|
||||
|
||||
/// Configuration parse error.
|
||||
#[error("config parse error: {0}")]
|
||||
Parse(String),
|
||||
}
|
||||
|
||||
/// Result type alias for mesh operations.
|
||||
pub type MeshResult<T> = Result<T, MeshError>;
|
||||
|
||||
/// Error context extension trait for adding context to errors.
|
||||
pub trait ErrorContext<T> {
|
||||
/// Add context to an error.
|
||||
fn context(self, context: impl Into<String>) -> MeshResult<T>;
|
||||
|
||||
/// Add context with a closure (lazy evaluation).
|
||||
fn with_context<F>(self, f: F) -> MeshResult<T>
|
||||
where
|
||||
F: FnOnce() -> String;
|
||||
}
|
||||
|
||||
impl<T, E: Into<MeshError>> ErrorContext<T> for Result<T, E> {
|
||||
fn context(self, context: impl Into<String>) -> MeshResult<T> {
|
||||
self.map_err(|e| {
|
||||
let err = e.into();
|
||||
MeshError::Internal(format!("{}: {}", context.into(), err))
|
||||
})
|
||||
}
|
||||
|
||||
fn with_context<F>(self, f: F) -> MeshResult<T>
|
||||
where
|
||||
F: FnOnce() -> String,
|
||||
{
|
||||
self.map_err(|e| {
|
||||
let err = e.into();
|
||||
MeshError::Internal(format!("{}: {}", f(), err))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Convert anyhow errors to MeshError.
|
||||
impl From<anyhow::Error> for MeshError {
|
||||
fn from(e: anyhow::Error) -> Self {
|
||||
MeshError::Internal(e.to_string())
|
||||
}
|
||||
}
|
||||
|
||||
/// Helper to create transport send errors.
|
||||
impl TransportError {
|
||||
pub fn send_failed(dest: &TransportAddr, reason: impl Into<String>) -> Self {
|
||||
Self::SendFailed {
|
||||
dest: dest.to_string(),
|
||||
reason: reason.into(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn connection_failed(dest: &TransportAddr, reason: impl Into<String>) -> Self {
|
||||
Self::ConnectionFailed {
|
||||
dest: dest.to_string(),
|
||||
reason: reason.into(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Helper to create routing errors.
|
||||
impl RoutingError {
|
||||
pub fn no_route(addr: &MeshAddress) -> Self {
|
||||
Self::NoRoute(format!("{}", addr))
|
||||
}
|
||||
|
||||
pub fn no_route_bytes(addr: &[u8]) -> Self {
|
||||
Self::NoRoute(hex::encode(&addr[..8.min(addr.len())]))
|
||||
}
|
||||
}
|
||||
|
||||
/// Helper to create crypto errors.
|
||||
impl CryptoError {
|
||||
pub fn signature_invalid(context: impl Into<String>) -> Self {
|
||||
Self::SignatureInvalid {
|
||||
context: context.into(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn replay(sender: &MeshAddress, seq: u32) -> Self {
|
||||
Self::ReplayDetected {
|
||||
sender: format!("{}", sender),
|
||||
seq,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Helper to create protocol errors.
|
||||
impl ProtocolError {
|
||||
pub fn cbor_decode(e: impl fmt::Display) -> Self {
|
||||
Self::CborDecode(e.to_string())
|
||||
}
|
||||
|
||||
pub fn cbor_encode(e: impl fmt::Display) -> Self {
|
||||
Self::CborEncode(e.to_string())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn error_display() {
        let err = TransportError::SendFailed {
            dest: "tcp:127.0.0.1:8080".to_string(),
            reason: "connection refused".to_string(),
        };
        let rendered = err.to_string();
        assert!(rendered.contains("tcp:127.0.0.1:8080"));
        assert!(rendered.contains("connection refused"));
    }

    #[test]
    fn error_conversion() {
        // #[from] should wrap a transport error in the matching variant.
        let mesh_err = MeshError::from(TransportError::NoTransports);
        assert!(matches!(mesh_err, MeshError::Transport(_)));
    }

    #[test]
    fn routing_error_helpers() {
        let addr = MeshAddress::from_bytes([0xAB; 16]);
        assert!(RoutingError::no_route(&addr).to_string().contains("no route"));
    }

    #[test]
    fn crypto_error_helpers() {
        let addr = MeshAddress::from_bytes([0xCD; 16]);
        assert!(CryptoError::replay(&addr, 42).to_string().contains("42"));
    }

    #[test]
    fn context_extension() {
        fn fallible() -> Result<(), TransportError> {
            Err(TransportError::NoTransports)
        }

        let result: MeshResult<()> = fallible().context("during startup");
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("during startup"));
    }
}
|
||||
@@ -15,7 +15,9 @@
|
||||
pub mod address;
|
||||
pub mod announce;
|
||||
pub mod announce_protocol;
|
||||
pub mod config;
|
||||
pub mod crypto_negotiation;
|
||||
pub mod error;
|
||||
pub mod fapp;
|
||||
pub mod fapp_router;
|
||||
pub mod broadcast;
|
||||
@@ -23,7 +25,11 @@ pub mod envelope;
|
||||
pub mod envelope_v2;
|
||||
pub mod keypackage_cache;
|
||||
pub mod mesh_protocol;
|
||||
pub mod metrics;
|
||||
pub mod mls_lite;
|
||||
pub mod persistence;
|
||||
pub mod rate_limit;
|
||||
pub mod shutdown;
|
||||
pub mod identity;
|
||||
pub mod link;
|
||||
pub mod mesh_router;
|
||||
|
||||
502
crates/quicprochat-p2p/src/metrics.rs
Normal file
502
crates/quicprochat-p2p/src/metrics.rs
Normal file
@@ -0,0 +1,502 @@
|
||||
//! Observability metrics for mesh networking.
|
||||
//!
|
||||
//! This module provides structured metrics collection for monitoring
|
||||
//! mesh node health, performance, and resource usage.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
/// Atomic counter for thread-safe metric updates.
///
/// Uses `Ordering::Relaxed` throughout: metric values need atomicity but no
/// cross-variable ordering guarantees.
#[derive(Debug, Default)]
pub struct Counter(AtomicU64);

impl Counter {
    /// Create a counter starting at zero.
    pub fn new() -> Self {
        Counter(AtomicU64::new(0))
    }

    /// Increment by one.
    pub fn inc(&self) {
        self.inc_by(1);
    }

    /// Increment by `n`.
    pub fn inc_by(&self, n: u64) {
        self.0.fetch_add(n, Ordering::Relaxed);
    }

    /// Current value.
    pub fn get(&self) -> u64 {
        self.0.load(Ordering::Relaxed)
    }

    /// Atomically reset to zero, returning the value held just before.
    pub fn reset(&self) -> u64 {
        self.0.swap(0, Ordering::Relaxed)
    }
}
|
||||
|
||||
/// Gauge for values that can go up and down (e.g. active connections).
///
/// Note: `dec` on a zero gauge wraps around (`fetch_sub` on `u64`); callers
/// are expected to pair `inc`/`dec`.
#[derive(Debug, Default)]
pub struct Gauge(AtomicU64);

impl Gauge {
    /// Create a gauge starting at zero.
    pub fn new() -> Self {
        Gauge(AtomicU64::new(0))
    }

    /// Overwrite the gauge with `val`.
    pub fn set(&self, val: u64) {
        self.0.store(val, Ordering::Relaxed);
    }

    /// Increase by one.
    pub fn inc(&self) {
        self.0.fetch_add(1, Ordering::Relaxed);
    }

    /// Decrease by one.
    pub fn dec(&self) {
        self.0.fetch_sub(1, Ordering::Relaxed);
    }

    /// Current value.
    pub fn get(&self) -> u64 {
        self.0.load(Ordering::Relaxed)
    }
}
|
||||
|
||||
/// Histogram for tracking distributions (simple bucket-based).
///
/// Buckets are cumulative-style upper bounds; an observation lands in the
/// first bucket whose bound it does not exceed. Observations larger than
/// every bound are tallied in the final bucket.
#[derive(Debug)]
pub struct Histogram {
    /// Bucket boundaries (upper limits), ascending.
    buckets: Vec<u64>,
    /// Observation count per bucket (parallel to `buckets`).
    counts: Vec<AtomicU64>,
    /// Running sum of all observed values.
    sum: AtomicU64,
    /// Total number of observations.
    count: AtomicU64,
}

impl Histogram {
    /// Create with default latency buckets (ms).
    pub fn latency_ms() -> Self {
        Self::new(vec![1, 5, 10, 25, 50, 100, 250, 500, 1000, 5000, 10000])
    }

    /// Create with default size buckets (bytes).
    pub fn size_bytes() -> Self {
        Self::new(vec![64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 65536])
    }

    /// Create a histogram with the given bucket upper bounds.
    pub fn new(buckets: Vec<u64>) -> Self {
        Histogram {
            counts: (0..buckets.len()).map(|_| AtomicU64::new(0)).collect(),
            buckets,
            sum: AtomicU64::new(0),
            count: AtomicU64::new(0),
        }
    }

    /// Record one observation.
    pub fn observe(&self, value: u64) {
        self.sum.fetch_add(value, Ordering::Relaxed);
        self.count.fetch_add(1, Ordering::Relaxed);

        // First bucket that admits the value; overflow values fall back to
        // the last bucket (if any buckets exist at all).
        let slot = self
            .buckets
            .iter()
            .position(|&upper| value <= upper)
            .or_else(|| self.counts.len().checked_sub(1));
        if let Some(i) = slot {
            self.counts[i].fetch_add(1, Ordering::Relaxed);
        }
    }

    /// Record a duration as whole milliseconds.
    pub fn observe_duration(&self, d: Duration) {
        self.observe(d.as_millis() as u64);
    }

    /// Sum of all observed values.
    pub fn sum(&self) -> u64 {
        self.sum.load(Ordering::Relaxed)
    }

    /// Total number of observations.
    pub fn count(&self) -> u64 {
        self.count.load(Ordering::Relaxed)
    }

    /// Mean of all observations, or 0.0 when nothing was observed.
    pub fn avg(&self) -> f64 {
        match self.count() {
            0 => 0.0,
            n => self.sum() as f64 / n as f64,
        }
    }
}
|
||||
|
||||
/// Per-transport metrics.
|
||||
#[derive(Debug, Default)]
|
||||
pub struct TransportMetrics {
|
||||
/// Messages sent successfully.
|
||||
pub sent: Counter,
|
||||
/// Messages received.
|
||||
pub received: Counter,
|
||||
/// Send failures.
|
||||
pub send_errors: Counter,
|
||||
/// Receive errors.
|
||||
pub recv_errors: Counter,
|
||||
/// Bytes sent.
|
||||
pub bytes_sent: Counter,
|
||||
/// Bytes received.
|
||||
pub bytes_received: Counter,
|
||||
/// Active connections (for connection-oriented transports).
|
||||
pub connections: Gauge,
|
||||
}
|
||||
|
||||
/// Per-peer metrics.
///
/// Tracks traffic, liveness, and latency for a single remote peer.
#[derive(Debug)]
pub struct PeerMetrics {
    /// Messages sent to this peer.
    pub messages_sent: Counter,
    /// Messages received from this peer.
    pub messages_received: Counter,
    /// Last seen timestamp; `None` until `touch()` is first called.
    pub last_seen: RwLock<Option<Instant>>,
    /// Round-trip time samples, in milliseconds (see `Histogram::latency_ms`).
    pub rtt_ms: Histogram,
}
|
||||
|
||||
impl Default for PeerMetrics {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
messages_sent: Counter::new(),
|
||||
messages_received: Counter::new(),
|
||||
last_seen: RwLock::new(None),
|
||||
rtt_ms: Histogram::latency_ms(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl PeerMetrics {
|
||||
pub fn touch(&self) {
|
||||
if let Ok(mut last) = self.last_seen.write() {
|
||||
*last = Some(Instant::now());
|
||||
}
|
||||
}
|
||||
|
||||
pub fn age(&self) -> Option<Duration> {
|
||||
self.last_seen
|
||||
.read()
|
||||
.ok()
|
||||
.and_then(|t| t.map(|i| i.elapsed()))
|
||||
}
|
||||
}
|
||||
|
||||
/// Global mesh metrics.
///
/// Root registry aggregating all subsystem metrics for one node. Designed
/// to be shared behind an `Arc` (see the `metrics()` global accessor).
#[derive(Debug)]
pub struct MeshMetrics {
    /// Transport metrics by name, created lazily by `transport()`.
    pub transports: RwLock<HashMap<String, Arc<TransportMetrics>>>,
    /// Routing metrics.
    pub routing: RoutingMetrics,
    /// Store metrics.
    pub store: StoreMetrics,
    /// Crypto metrics.
    pub crypto: CryptoMetrics,
    /// Protocol metrics.
    pub protocol: ProtocolMetrics,
    /// Node start time; basis for `uptime()`.
    pub started_at: Instant,
}
|
||||
|
||||
impl Default for MeshMetrics {
    /// Delegates to [`MeshMetrics::new`].
    fn default() -> Self {
        Self::new()
    }
}
|
||||
|
||||
impl MeshMetrics {
    /// Create a fresh registry with all counters at zero and `started_at`
    /// set to now.
    pub fn new() -> Self {
        Self {
            transports: RwLock::new(HashMap::new()),
            routing: RoutingMetrics::default(),
            store: StoreMetrics::default(),
            crypto: CryptoMetrics::default(),
            protocol: ProtocolMetrics::default(),
            started_at: Instant::now(),
        }
    }

    /// Get or create transport metrics for `name`.
    ///
    /// Fast path takes the read lock; on miss it upgrades to the write
    /// lock and inserts. Two racing callers may both miss the read check,
    /// but `entry()` under the write lock guarantees a single shared
    /// instance. Panics if either lock is poisoned.
    pub fn transport(&self, name: &str) -> Arc<TransportMetrics> {
        {
            let map = self.transports.read().unwrap();
            if let Some(m) = map.get(name) {
                return Arc::clone(m);
            }
        }
        let mut map = self.transports.write().unwrap();
        map.entry(name.to_string())
            .or_insert_with(|| Arc::new(TransportMetrics::default()))
            .clone()
    }

    /// Node uptime since this registry was created.
    pub fn uptime(&self) -> Duration {
        self.started_at.elapsed()
    }

    /// Export metrics as a point-in-time snapshot.
    ///
    /// Fields are read individually without a global freeze, so the
    /// snapshot is only approximately consistent under concurrent updates.
    /// NOTE(review): `recv_errors` (transport) and `decryption_failures`
    /// (crypto) are collected but never exported because the snapshot
    /// structs have no matching fields — confirm whether that is intended.
    pub fn snapshot(&self) -> MetricsSnapshot {
        let transports = self.transports.read().unwrap();
        let transport_snapshots: HashMap<String, TransportSnapshot> = transports
            .iter()
            .map(|(name, m)| {
                (
                    name.clone(),
                    TransportSnapshot {
                        sent: m.sent.get(),
                        received: m.received.get(),
                        send_errors: m.send_errors.get(),
                        bytes_sent: m.bytes_sent.get(),
                        bytes_received: m.bytes_received.get(),
                        connections: m.connections.get(),
                    },
                )
            })
            .collect();

        MetricsSnapshot {
            uptime_secs: self.uptime().as_secs(),
            transports: transport_snapshots,
            routing: RoutingSnapshot {
                table_size: self.routing.table_size.get(),
                lookups: self.routing.lookups.get(),
                lookup_misses: self.routing.lookup_misses.get(),
                announcements_processed: self.routing.announcements_processed.get(),
            },
            store: StoreSnapshot {
                messages_stored: self.store.messages_stored.get(),
                messages_delivered: self.store.messages_delivered.get(),
                messages_expired: self.store.messages_expired.get(),
                current_size: self.store.current_size.get(),
            },
            crypto: CryptoSnapshot {
                encryptions: self.crypto.encryptions.get(),
                decryptions: self.crypto.decryptions.get(),
                signature_verifications: self.crypto.signature_verifications.get(),
                signature_failures: self.crypto.signature_failures.get(),
                replay_detections: self.crypto.replay_detections.get(),
            },
        }
    }
}
|
||||
|
||||
/// Routing subsystem metrics.
#[derive(Debug, Default)]
pub struct RoutingMetrics {
    /// Current routing table size (gauge: goes up and down).
    pub table_size: Gauge,
    /// Route lookups attempted.
    pub lookups: Counter,
    /// Route lookups that found no entry.
    pub lookup_misses: Counter,
    /// Routes added.
    pub routes_added: Counter,
    /// Routes expired.
    pub routes_expired: Counter,
    /// Announcements processed.
    pub announcements_processed: Counter,
    /// Announcements forwarded.
    pub announcements_forwarded: Counter,
    /// Duplicate announcements dropped.
    pub duplicates_dropped: Counter,
}
|
||||
|
||||
/// Store-and-forward subsystem metrics.
#[derive(Debug, Default)]
pub struct StoreMetrics {
    /// Messages stored.
    pub messages_stored: Counter,
    /// Messages delivered.
    pub messages_delivered: Counter,
    /// Messages expired.
    pub messages_expired: Counter,
    /// Current store size (gauge).
    pub current_size: Gauge,
    /// Times the store hit capacity and refused/evicted.
    pub capacity_reached: Counter,
}
|
||||
|
||||
/// Crypto subsystem metrics.
///
/// Not `Default`-derivable because `encrypt_latency` needs explicit
/// bucket configuration; see the manual `Default` impl below.
#[derive(Debug)]
pub struct CryptoMetrics {
    /// Successful encryptions.
    pub encryptions: Counter,
    /// Successful decryptions.
    pub decryptions: Counter,
    /// Decryption failures.
    pub decryption_failures: Counter,
    /// Signature verifications.
    pub signature_verifications: Counter,
    /// Signature failures.
    pub signature_failures: Counter,
    /// Replay attacks detected.
    pub replay_detections: Counter,
    /// Encryption latency, in milliseconds.
    pub encrypt_latency: Histogram,
}
|
||||
|
||||
impl Default for CryptoMetrics {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
encryptions: Counter::new(),
|
||||
decryptions: Counter::new(),
|
||||
decryption_failures: Counter::new(),
|
||||
signature_verifications: Counter::new(),
|
||||
signature_failures: Counter::new(),
|
||||
replay_detections: Counter::new(),
|
||||
encrypt_latency: Histogram::latency_ms(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Wire-protocol parsing metrics.
#[derive(Debug, Default)]
pub struct ProtocolMetrics {
    /// Messages parsed successfully.
    pub messages_parsed: Counter,
    /// Parse errors.
    pub parse_errors: Counter,
    /// Messages with an unknown type tag.
    pub unknown_types: Counter,
    /// Messages rejected for exceeding the size limit.
    pub oversized: Counter,
}
|
||||
|
||||
/// Point-in-time snapshot of metrics.
///
/// Plain-data, serde-serializable view produced by `MeshMetrics::snapshot`;
/// suitable for JSON export (see the `snapshot_serializes` test).
#[derive(Debug, Clone, serde::Serialize)]
pub struct MetricsSnapshot {
    /// Node uptime in whole seconds.
    pub uptime_secs: u64,
    /// Per-transport snapshots, keyed by transport name.
    pub transports: HashMap<String, TransportSnapshot>,
    /// Routing subsystem snapshot.
    pub routing: RoutingSnapshot,
    /// Store subsystem snapshot.
    pub store: StoreSnapshot,
    /// Crypto subsystem snapshot.
    pub crypto: CryptoSnapshot,
}
|
||||
|
||||
/// Serializable view of one transport's counters.
///
/// NOTE(review): `TransportMetrics::recv_errors` has no counterpart here
/// and is therefore never exported — confirm whether that is intentional.
#[derive(Debug, Clone, serde::Serialize)]
pub struct TransportSnapshot {
    pub sent: u64,
    pub received: u64,
    pub send_errors: u64,
    pub bytes_sent: u64,
    pub bytes_received: u64,
    pub connections: u64,
}
|
||||
|
||||
/// Serializable view of the routing counters exported in snapshots
/// (a subset of `RoutingMetrics`).
#[derive(Debug, Clone, serde::Serialize)]
pub struct RoutingSnapshot {
    pub table_size: u64,
    pub lookups: u64,
    pub lookup_misses: u64,
    pub announcements_processed: u64,
}
|
||||
|
||||
/// Serializable view of the store counters exported in snapshots
/// (a subset of `StoreMetrics`).
#[derive(Debug, Clone, serde::Serialize)]
pub struct StoreSnapshot {
    pub messages_stored: u64,
    pub messages_delivered: u64,
    pub messages_expired: u64,
    pub current_size: u64,
}
|
||||
|
||||
/// Serializable view of the crypto counters exported in snapshots.
///
/// NOTE(review): `CryptoMetrics::decryption_failures` has no counterpart
/// here and is never exported — confirm whether that is intentional.
#[derive(Debug, Clone, serde::Serialize)]
pub struct CryptoSnapshot {
    pub encryptions: u64,
    pub decryptions: u64,
    pub signature_verifications: u64,
    pub signature_failures: u64,
    pub replay_detections: u64,
}
|
||||
|
||||
/// Global metrics instance, created lazily on first access.
static GLOBAL_METRICS: std::sync::OnceLock<Arc<MeshMetrics>> = std::sync::OnceLock::new();

/// Get the process-wide global metrics instance.
///
/// The first caller initializes it; every caller shares the same `Arc`.
pub fn metrics() -> &'static Arc<MeshMetrics> {
    GLOBAL_METRICS.get_or_init(|| Arc::new(MeshMetrics::new()))
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;

    /// Counter: increment, bulk add, and reset (which returns the old value).
    #[test]
    fn counter_basics() {
        let c = Counter::new();
        assert_eq!(c.get(), 0);
        c.inc();
        assert_eq!(c.get(), 1);
        c.inc_by(5);
        assert_eq!(c.get(), 6);
        let old = c.reset();
        assert_eq!(old, 6);
        assert_eq!(c.get(), 0);
    }

    /// Gauge: set, increment, decrement.
    #[test]
    fn gauge_basics() {
        let g = Gauge::new();
        assert_eq!(g.get(), 0);
        g.set(10);
        assert_eq!(g.get(), 10);
        g.inc();
        assert_eq!(g.get(), 11);
        g.dec();
        assert_eq!(g.get(), 10);
    }

    /// Histogram keeps total count and sum, including values above the
    /// top bucket bound (200 > 100 here).
    #[test]
    fn histogram_basics() {
        let h = Histogram::new(vec![10, 50, 100]);
        h.observe(5);
        h.observe(25);
        h.observe(75);
        h.observe(200);

        assert_eq!(h.count(), 4);
        assert_eq!(h.sum(), 5 + 25 + 75 + 200);
    }

    /// `transport()` lazily creates per-name metrics and hands back the
    /// same shared instance on repeated lookups of the same name.
    #[test]
    fn transport_metrics() {
        let m = MeshMetrics::new();
        let tcp = m.transport("tcp");
        tcp.sent.inc();
        tcp.bytes_sent.inc_by(100);

        assert_eq!(tcp.sent.get(), 1);
        assert_eq!(tcp.bytes_sent.get(), 100);

        // Same name returns same instance
        let tcp2 = m.transport("tcp");
        assert_eq!(tcp2.sent.get(), 1);
    }

    /// Snapshot serializes to JSON and reflects recorded values.
    #[test]
    fn snapshot_serializes() {
        let m = MeshMetrics::new();
        m.transport("tcp").sent.inc();
        m.routing.lookups.inc_by(10);

        let snapshot = m.snapshot();
        let json = serde_json::to_string(&snapshot).expect("serialize");
        assert!(json.contains("\"uptime_secs\":"));
        assert!(json.contains("\"lookups\":10"));
    }

    /// The global accessor returns one shared, process-wide instance.
    /// NOTE(review): this assumes no other test in the binary touches
    /// `protocol.messages_parsed`; tests share the global.
    #[test]
    fn global_metrics() {
        let m = metrics();
        m.protocol.messages_parsed.inc();
        assert_eq!(metrics().protocol.messages_parsed.get(), 1);
    }
}
|
||||
693
crates/quicprochat-p2p/src/persistence.rs
Normal file
693
crates/quicprochat-p2p/src/persistence.rs
Normal file
@@ -0,0 +1,693 @@
|
||||
//! Persistence layer for mesh node state.
|
||||
//!
|
||||
//! This module provides durable storage for:
|
||||
//! - Routing table entries
|
||||
//! - KeyPackage cache
|
||||
//! - Stored messages (store-and-forward)
|
||||
//! - Node identity
|
||||
//!
|
||||
//! Uses a simple append-only log format with periodic compaction.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::fs::{self, File, OpenOptions};
|
||||
use std::io::{self, BufRead, BufReader, BufWriter, Write};
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::address::MeshAddress;
|
||||
use crate::error::{MeshResult, StoreError};
|
||||
|
||||
/// Storage entry types.
///
/// Each entry is serialized as one JSON object per log line, tagged with a
/// `"type"` discriminator. `*Remove` variants are tombstones that undo an
/// earlier entry during replay and compaction. All `expires_at` fields are
/// Unix seconds (compared against `now_secs()`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum StorageEntry {
    /// Routing table entry.
    Route {
        /// Destination mesh address (raw 16 bytes).
        address: [u8; 16],
        /// Transport-specific next-hop locator, e.g. "tcp:127.0.0.1:8080".
        next_hop: String,
        /// Hop count to the destination.
        hops: u8,
        /// Sequence number of the route announcement.
        sequence: u32,
        /// Expiry time, Unix seconds.
        expires_at: u64,
    },
    /// Remove a route (tombstone).
    RouteRemove { address: [u8; 16] },
    /// KeyPackage cache entry.
    KeyPackage {
        /// Owner's mesh address.
        address: [u8; 16],
        /// Serialized KeyPackage bytes.
        data: Vec<u8>,
        /// 8-byte identifying hash of the package — presumably truncated
        /// from a longer digest; confirm against the producer.
        hash: [u8; 8],
        /// Expiry time, Unix seconds.
        expires_at: u64,
    },
    /// Remove a KeyPackage (tombstone, keyed by address + hash).
    KeyPackageRemove { address: [u8; 16], hash: [u8; 8] },
    /// Stored message (store-and-forward).
    Message {
        /// Unique message id.
        id: Vec<u8>,
        /// Recipient mesh address (raw 16 bytes).
        recipient: [u8; 16],
        /// Opaque message payload.
        data: Vec<u8>,
        /// Expiry time, Unix seconds.
        expires_at: u64,
    },
    /// Remove a message (tombstone, keyed by id).
    MessageRemove { id: Vec<u8> },
    /// Identity keypair (encrypted or raw for development).
    Identity {
        public_key: Vec<u8>,
        secret_key_encrypted: Vec<u8>,
    },
}
|
||||
|
||||
/// Append-only log for persistence.
///
/// One JSON-serialized [`StorageEntry`] per line. `writer` is `None` only
/// transiently during compaction, while the log file is being replaced.
pub struct AppendLog {
    /// Location of the log file on disk.
    path: PathBuf,
    /// Buffered append handle; `None` while the file is swapped out.
    writer: Option<BufWriter<File>>,
    /// Appends since the last compaction.
    entries_since_compact: usize,
    /// Append count at which `needs_compaction()` starts returning true.
    compact_threshold: usize,
}
|
||||
|
||||
impl AppendLog {
|
||||
/// Open or create a log file.
|
||||
pub fn open(path: impl AsRef<Path>) -> MeshResult<Self> {
|
||||
let path = path.as_ref().to_path_buf();
|
||||
|
||||
if let Some(parent) = path.parent() {
|
||||
fs::create_dir_all(parent).map_err(|e| {
|
||||
StoreError::Persistence(format!("failed to create directory: {}", e))
|
||||
})?;
|
||||
}
|
||||
|
||||
let file = OpenOptions::new()
|
||||
.create(true)
|
||||
.append(true)
|
||||
.open(&path)
|
||||
.map_err(|e| StoreError::Persistence(format!("failed to open log: {}", e)))?;
|
||||
|
||||
Ok(Self {
|
||||
path,
|
||||
writer: Some(BufWriter::new(file)),
|
||||
entries_since_compact: 0,
|
||||
compact_threshold: 10_000,
|
||||
})
|
||||
}
|
||||
|
||||
/// Append an entry to the log.
|
||||
pub fn append(&mut self, entry: &StorageEntry) -> MeshResult<()> {
|
||||
let writer = self.writer.as_mut().ok_or_else(|| {
|
||||
StoreError::Persistence("log not open".to_string())
|
||||
})?;
|
||||
|
||||
let json = serde_json::to_string(entry).map_err(|e| {
|
||||
StoreError::Serialization(format!("failed to serialize entry: {}", e))
|
||||
})?;
|
||||
|
||||
writeln!(writer, "{}", json).map_err(|e| {
|
||||
StoreError::Persistence(format!("failed to write entry: {}", e))
|
||||
})?;
|
||||
|
||||
writer.flush().map_err(|e| {
|
||||
StoreError::Persistence(format!("failed to flush: {}", e))
|
||||
})?;
|
||||
|
||||
self.entries_since_compact += 1;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Read all entries from the log.
|
||||
pub fn read_all(&self) -> MeshResult<Vec<StorageEntry>> {
|
||||
let file = File::open(&self.path).map_err(|e| {
|
||||
if e.kind() == io::ErrorKind::NotFound {
|
||||
return StoreError::NotFound(self.path.display().to_string());
|
||||
}
|
||||
StoreError::Persistence(format!("failed to open log: {}", e))
|
||||
})?;
|
||||
|
||||
let reader = BufReader::new(file);
|
||||
let mut entries = Vec::new();
|
||||
|
||||
for line in reader.lines() {
|
||||
let line = line.map_err(|e| {
|
||||
StoreError::Persistence(format!("failed to read line: {}", e))
|
||||
})?;
|
||||
|
||||
if line.trim().is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
||||
let entry: StorageEntry = serde_json::from_str(&line).map_err(|e| {
|
||||
StoreError::Serialization(format!("failed to parse entry: {}", e))
|
||||
})?;
|
||||
|
||||
entries.push(entry);
|
||||
}
|
||||
|
||||
Ok(entries)
|
||||
}
|
||||
|
||||
/// Check if compaction is needed.
|
||||
pub fn needs_compaction(&self) -> bool {
|
||||
self.entries_since_compact >= self.compact_threshold
|
||||
}
|
||||
|
||||
/// Compact the log by replaying and removing deleted entries.
|
||||
pub fn compact(&mut self) -> MeshResult<CompactStats> {
|
||||
let entries = self.read_all()?;
|
||||
|
||||
// Build current state by replaying log
|
||||
let mut routes: HashMap<[u8; 16], StorageEntry> = HashMap::new();
|
||||
let mut keypackages: HashMap<([u8; 16], [u8; 8]), StorageEntry> = HashMap::new();
|
||||
let mut messages: HashMap<Vec<u8>, StorageEntry> = HashMap::new();
|
||||
let mut identity: Option<StorageEntry> = None;
|
||||
|
||||
let now = now_secs();
|
||||
|
||||
for entry in entries {
|
||||
match &entry {
|
||||
StorageEntry::Route { address, expires_at, .. } => {
|
||||
if *expires_at > now {
|
||||
routes.insert(*address, entry);
|
||||
}
|
||||
}
|
||||
StorageEntry::RouteRemove { address } => {
|
||||
routes.remove(address);
|
||||
}
|
||||
StorageEntry::KeyPackage { address, hash, expires_at, .. } => {
|
||||
if *expires_at > now {
|
||||
keypackages.insert((*address, *hash), entry);
|
||||
}
|
||||
}
|
||||
StorageEntry::KeyPackageRemove { address, hash } => {
|
||||
keypackages.remove(&(*address, *hash));
|
||||
}
|
||||
StorageEntry::Message { id, expires_at, .. } => {
|
||||
if *expires_at > now {
|
||||
messages.insert(id.clone(), entry);
|
||||
}
|
||||
}
|
||||
StorageEntry::MessageRemove { id } => {
|
||||
messages.remove(id);
|
||||
}
|
||||
StorageEntry::Identity { .. } => {
|
||||
identity = Some(entry);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Write compacted log
|
||||
let tmp_path = self.path.with_extension("tmp");
|
||||
let mut tmp_file = File::create(&tmp_path).map_err(|e| {
|
||||
StoreError::Persistence(format!("failed to create temp file: {}", e))
|
||||
})?;
|
||||
|
||||
let mut written = 0;
|
||||
|
||||
if let Some(id) = identity {
|
||||
let json = serde_json::to_string(&id).map_err(|e| {
|
||||
StoreError::Serialization(e.to_string())
|
||||
})?;
|
||||
writeln!(tmp_file, "{}", json).map_err(|e| {
|
||||
StoreError::Persistence(e.to_string())
|
||||
})?;
|
||||
written += 1;
|
||||
}
|
||||
|
||||
for entry in routes.into_values() {
|
||||
let json = serde_json::to_string(&entry).map_err(|e| {
|
||||
StoreError::Serialization(e.to_string())
|
||||
})?;
|
||||
writeln!(tmp_file, "{}", json).map_err(|e| {
|
||||
StoreError::Persistence(e.to_string())
|
||||
})?;
|
||||
written += 1;
|
||||
}
|
||||
|
||||
for entry in keypackages.into_values() {
|
||||
let json = serde_json::to_string(&entry).map_err(|e| {
|
||||
StoreError::Serialization(e.to_string())
|
||||
})?;
|
||||
writeln!(tmp_file, "{}", json).map_err(|e| {
|
||||
StoreError::Persistence(e.to_string())
|
||||
})?;
|
||||
written += 1;
|
||||
}
|
||||
|
||||
for entry in messages.into_values() {
|
||||
let json = serde_json::to_string(&entry).map_err(|e| {
|
||||
StoreError::Serialization(e.to_string())
|
||||
})?;
|
||||
writeln!(tmp_file, "{}", json).map_err(|e| {
|
||||
StoreError::Persistence(e.to_string())
|
||||
})?;
|
||||
written += 1;
|
||||
}
|
||||
|
||||
tmp_file.sync_all().map_err(|e| {
|
||||
StoreError::Persistence(format!("failed to sync: {}", e))
|
||||
})?;
|
||||
drop(tmp_file);
|
||||
|
||||
// Close current writer
|
||||
self.writer = None;
|
||||
|
||||
// Replace old log with compacted one
|
||||
fs::rename(&tmp_path, &self.path).map_err(|e| {
|
||||
StoreError::Persistence(format!("failed to rename: {}", e))
|
||||
})?;
|
||||
|
||||
// Reopen
|
||||
let file = OpenOptions::new()
|
||||
.create(true)
|
||||
.append(true)
|
||||
.open(&self.path)
|
||||
.map_err(|e| StoreError::Persistence(format!("failed to reopen: {}", e)))?;
|
||||
|
||||
self.writer = Some(BufWriter::new(file));
|
||||
self.entries_since_compact = 0;
|
||||
|
||||
Ok(CompactStats {
|
||||
entries_before: self.entries_since_compact,
|
||||
entries_after: written,
|
||||
})
|
||||
}
|
||||
|
||||
/// Sync to disk.
|
||||
pub fn sync(&mut self) -> MeshResult<()> {
|
||||
if let Some(writer) = self.writer.as_mut() {
|
||||
writer.flush().map_err(|e| {
|
||||
StoreError::Persistence(format!("flush failed: {}", e))
|
||||
})?;
|
||||
writer.get_ref().sync_all().map_err(|e| {
|
||||
StoreError::Persistence(format!("sync failed: {}", e))
|
||||
})?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Compaction statistics.
#[derive(Debug, Clone)]
pub struct CompactStats {
    /// Number of log entries before compaction.
    pub entries_before: usize,
    /// Number of entries written to the compacted log.
    pub entries_after: usize,
}
|
||||
|
||||
/// Persistent routing table storage.
///
/// Routes live in an in-memory map for fast lookup; every mutation is
/// also appended to the log so the table survives restarts.
pub struct PersistentRoutingTable {
    /// Backing append-only log.
    log: AppendLog,
    /// In-memory view, rebuilt from the log on open.
    routes: HashMap<MeshAddress, RouteEntry>,
}
|
||||
|
||||
/// In-memory route entry.
#[derive(Debug, Clone)]
pub struct RouteEntry {
    /// Transport-specific next-hop locator, e.g. "tcp:192.168.1.1:8080".
    pub next_hop: String,
    /// Hop count to the destination.
    pub hops: u8,
    /// Sequence number of the route announcement.
    pub sequence: u32,
    /// Expiry time, Unix seconds.
    pub expires_at: u64,
}
|
||||
|
||||
impl PersistentRoutingTable {
    /// Open or create a persistent routing table backed by the log at `path`.
    ///
    /// Replays the log in order, keeping only non-expired routes and
    /// applying `RouteRemove` tombstones. Replay errors are swallowed
    /// (`unwrap_or_default`), so a missing or unreadable log yields an
    /// empty table rather than a failed open.
    pub fn open(path: impl AsRef<Path>) -> MeshResult<Self> {
        let mut log = AppendLog::open(path)?;
        let mut routes = HashMap::new();

        let now = now_secs();

        for entry in log.read_all().unwrap_or_default() {
            if let StorageEntry::Route { address, next_hop, hops, sequence, expires_at } = entry {
                if expires_at > now {
                    routes.insert(
                        MeshAddress::from_bytes(address),
                        RouteEntry { next_hop, hops, sequence, expires_at },
                    );
                }
            } else if let StorageEntry::RouteRemove { address } = entry {
                routes.remove(&MeshAddress::from_bytes(address));
            }
        }

        Ok(Self { log, routes })
    }

    /// Insert or update a route.
    ///
    /// The entry is appended to the log first (durability), then mirrored
    /// into the in-memory map. `expires_at` is computed as now + `ttl`.
    pub fn insert(
        &mut self,
        address: MeshAddress,
        next_hop: String,
        hops: u8,
        sequence: u32,
        ttl: Duration,
    ) -> MeshResult<()> {
        let expires_at = now_secs() + ttl.as_secs();

        self.log.append(&StorageEntry::Route {
            address: *address.as_bytes(),
            next_hop: next_hop.clone(),
            hops,
            sequence,
            expires_at,
        })?;

        self.routes.insert(address, RouteEntry {
            next_hop,
            hops,
            sequence,
            expires_at,
        });

        Ok(())
    }

    /// Look up a route, returning `None` for missing or expired entries.
    /// Expired entries are filtered here but only physically removed by `gc()`.
    pub fn get(&self, address: &MeshAddress) -> Option<&RouteEntry> {
        let entry = self.routes.get(address)?;
        if entry.expires_at > now_secs() {
            Some(entry)
        } else {
            None
        }
    }

    /// Remove a route, writing a tombstone to the log.
    ///
    /// Returns `Ok(true)` if the route existed (even if it was expired).
    pub fn remove(&mut self, address: &MeshAddress) -> MeshResult<bool> {
        if self.routes.remove(address).is_some() {
            self.log.append(&StorageEntry::RouteRemove {
                address: *address.as_bytes(),
            })?;
            Ok(true)
        } else {
            Ok(false)
        }
    }

    /// Number of routes, including expired entries not yet removed by `gc()`.
    pub fn len(&self) -> usize {
        self.routes.len()
    }

    /// Check if the table holds no routes at all.
    pub fn is_empty(&self) -> bool {
        self.routes.is_empty()
    }

    /// Garbage collect expired routes (writes one tombstone per removal).
    /// Returns the number of routes removed.
    pub fn gc(&mut self) -> MeshResult<usize> {
        let now = now_secs();
        let expired: Vec<_> = self.routes
            .iter()
            .filter(|(_, e)| e.expires_at <= now)
            .map(|(a, _)| *a)
            .collect();

        let count = expired.len();
        for addr in expired {
            self.remove(&addr)?;
        }
        Ok(count)
    }

    /// Compact the underlying log, dropping tombstoned/expired entries.
    pub fn compact(&mut self) -> MeshResult<CompactStats> {
        self.log.compact()
    }

    /// Flush and fsync the log to disk.
    pub fn sync(&mut self) -> MeshResult<()> {
        self.log.sync()
    }
}
|
||||
|
||||
/// Persistent message store for store-and-forward delivery.
///
/// Messages live in an in-memory map keyed by id, with a per-recipient
/// index for delivery queries; every mutation is appended to the log.
pub struct PersistentMessageStore {
    /// Backing append-only log.
    log: AppendLog,
    /// All stored messages, keyed by message id.
    messages: HashMap<Vec<u8>, MessageEntry>,
    /// Message ids queued per recipient (index into `messages`).
    by_recipient: HashMap<MeshAddress, Vec<Vec<u8>>>,
}
|
||||
|
||||
/// In-memory message entry.
#[derive(Debug, Clone)]
pub struct MessageEntry {
    /// Destination address.
    pub recipient: MeshAddress,
    /// Opaque message payload.
    pub data: Vec<u8>,
    /// Expiry time, Unix seconds.
    pub expires_at: u64,
}
|
||||
|
||||
impl PersistentMessageStore {
|
||||
/// Open or create a persistent message store.
|
||||
pub fn open(path: impl AsRef<Path>) -> MeshResult<Self> {
|
||||
let mut log = AppendLog::open(path)?;
|
||||
let mut messages = HashMap::new();
|
||||
let mut by_recipient: HashMap<MeshAddress, Vec<Vec<u8>>> = HashMap::new();
|
||||
|
||||
let now = now_secs();
|
||||
|
||||
for entry in log.read_all().unwrap_or_default() {
|
||||
if let StorageEntry::Message { id, recipient, data, expires_at } = entry {
|
||||
if expires_at > now {
|
||||
let addr = MeshAddress::from_bytes(recipient);
|
||||
messages.insert(id.clone(), MessageEntry {
|
||||
recipient: addr,
|
||||
data,
|
||||
expires_at,
|
||||
});
|
||||
by_recipient.entry(addr).or_default().push(id);
|
||||
}
|
||||
} else if let StorageEntry::MessageRemove { id } = entry {
|
||||
if let Some(entry) = messages.remove(&id) {
|
||||
if let Some(ids) = by_recipient.get_mut(&entry.recipient) {
|
||||
ids.retain(|i| i != &id);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(Self { log, messages, by_recipient })
|
||||
}
|
||||
|
||||
/// Store a message.
|
||||
pub fn store(
|
||||
&mut self,
|
||||
id: Vec<u8>,
|
||||
recipient: MeshAddress,
|
||||
data: Vec<u8>,
|
||||
ttl: Duration,
|
||||
) -> MeshResult<()> {
|
||||
let expires_at = now_secs() + ttl.as_secs();
|
||||
|
||||
self.log.append(&StorageEntry::Message {
|
||||
id: id.clone(),
|
||||
recipient: *recipient.as_bytes(),
|
||||
data: data.clone(),
|
||||
expires_at,
|
||||
})?;
|
||||
|
||||
self.messages.insert(id.clone(), MessageEntry {
|
||||
recipient,
|
||||
data,
|
||||
expires_at,
|
||||
});
|
||||
self.by_recipient.entry(recipient).or_default().push(id);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Get messages for a recipient.
|
||||
pub fn get_for_recipient(&self, recipient: &MeshAddress) -> Vec<(Vec<u8>, Vec<u8>)> {
|
||||
let now = now_secs();
|
||||
self.by_recipient
|
||||
.get(recipient)
|
||||
.map(|ids| {
|
||||
ids.iter()
|
||||
.filter_map(|id| {
|
||||
let entry = self.messages.get(id)?;
|
||||
if entry.expires_at > now {
|
||||
Some((id.clone(), entry.data.clone()))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect()
|
||||
})
|
||||
.unwrap_or_default()
|
||||
}
|
||||
|
||||
/// Remove a message.
|
||||
pub fn remove(&mut self, id: &[u8]) -> MeshResult<bool> {
|
||||
if let Some(entry) = self.messages.remove(id) {
|
||||
if let Some(ids) = self.by_recipient.get_mut(&entry.recipient) {
|
||||
ids.retain(|i| i != id);
|
||||
}
|
||||
self.log.append(&StorageEntry::MessageRemove {
|
||||
id: id.to_vec(),
|
||||
})?;
|
||||
Ok(true)
|
||||
} else {
|
||||
Ok(false)
|
||||
}
|
||||
}
|
||||
|
||||
/// Number of stored messages.
|
||||
pub fn len(&self) -> usize {
|
||||
self.messages.len()
|
||||
}
|
||||
|
||||
/// Check if empty.
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.messages.is_empty()
|
||||
}
|
||||
|
||||
/// Garbage collect expired messages.
|
||||
pub fn gc(&mut self) -> MeshResult<usize> {
|
||||
let now = now_secs();
|
||||
let expired: Vec<_> = self.messages
|
||||
.iter()
|
||||
.filter(|(_, e)| e.expires_at <= now)
|
||||
.map(|(id, _)| id.clone())
|
||||
.collect();
|
||||
|
||||
let count = expired.len();
|
||||
for id in expired {
|
||||
self.remove(&id)?;
|
||||
}
|
||||
Ok(count)
|
||||
}
|
||||
|
||||
/// Compact the underlying log.
|
||||
pub fn compact(&mut self) -> MeshResult<CompactStats> {
|
||||
self.log.compact()
|
||||
}
|
||||
|
||||
/// Sync to disk.
|
||||
pub fn sync(&mut self) -> MeshResult<()> {
|
||||
self.log.sync()
|
||||
}
|
||||
}
|
||||
|
||||
/// Current wall-clock time as whole Unix seconds.
///
/// Returns 0 if the system clock reads earlier than the Unix epoch,
/// matching the original's `unwrap_or_default` behavior.
fn now_secs() -> u64 {
    UNIX_EPOCH
        .elapsed()
        .map(|d| d.as_secs())
        .unwrap_or(0)
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// An appended entry survives reopening the log and round-trips
    /// through JSON unchanged.
    #[test]
    fn append_log_roundtrip() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.log");

        {
            let mut log = AppendLog::open(&path).unwrap();
            log.append(&StorageEntry::Route {
                address: [1u8; 16],
                next_hop: "tcp:127.0.0.1:8080".to_string(),
                hops: 2,
                sequence: 42,
                expires_at: now_secs() + 3600,
            }).unwrap();
        }

        let log = AppendLog::open(&path).unwrap();
        let entries = log.read_all().unwrap();
        assert_eq!(entries.len(), 1);

        if let StorageEntry::Route { sequence, .. } = &entries[0] {
            assert_eq!(*sequence, 42);
        } else {
            panic!("expected Route entry");
        }
    }

    /// Routes inserted and synced before drop are rebuilt on reopen.
    #[test]
    fn routing_table_persistence() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("routes.log");

        let addr = MeshAddress::from_bytes([0xAB; 16]);

        {
            let mut rt = PersistentRoutingTable::open(&path).unwrap();
            rt.insert(
                addr,
                "tcp:192.168.1.1:8080".to_string(),
                3,
                100,
                Duration::from_secs(3600),
            ).unwrap();
            rt.sync().unwrap();
        }

        // Reopen and verify
        let rt = PersistentRoutingTable::open(&path).unwrap();
        let entry = rt.get(&addr).expect("route should exist");
        assert_eq!(entry.hops, 3);
        assert_eq!(entry.sequence, 100);
    }

    /// A stored message survives reopen with its id and payload intact.
    #[test]
    fn message_store_persistence() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("messages.log");

        let recipient = MeshAddress::from_bytes([0xCD; 16]);
        let id = b"msg-001".to_vec();
        let data = b"Hello, mesh!".to_vec();

        {
            let mut store = PersistentMessageStore::open(&path).unwrap();
            store.store(id.clone(), recipient, data.clone(), Duration::from_secs(3600)).unwrap();
            store.sync().unwrap();
        }

        let store = PersistentMessageStore::open(&path).unwrap();
        let msgs = store.get_for_recipient(&recipient);
        assert_eq!(msgs.len(), 1);
        assert_eq!(msgs[0].0, id);
        assert_eq!(msgs[0].1, data);
    }

    /// Compaction drops tombstoned routes; only live state survives reopen.
    #[test]
    fn compaction_removes_deleted() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("compact.log");

        let addr1 = MeshAddress::from_bytes([1; 16]);
        let addr2 = MeshAddress::from_bytes([2; 16]);

        {
            let mut rt = PersistentRoutingTable::open(&path).unwrap();
            rt.insert(addr1, "hop1".to_string(), 1, 1, Duration::from_secs(3600)).unwrap();
            rt.insert(addr2, "hop2".to_string(), 1, 1, Duration::from_secs(3600)).unwrap();
            rt.remove(&addr1).unwrap(); // Delete one
            rt.compact().unwrap();
        }

        let rt = PersistentRoutingTable::open(&path).unwrap();
        assert!(rt.get(&addr1).is_none());
        assert!(rt.get(&addr2).is_some());
        assert_eq!(rt.len(), 1);
    }

    /// A zero-TTL route (expires_at == now) is expired immediately and
    /// reclaimed by gc().
    #[test]
    fn gc_removes_expired() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("gc.log");

        let addr = MeshAddress::from_bytes([0xEE; 16]);

        let mut rt = PersistentRoutingTable::open(&path).unwrap();
        rt.insert(addr, "hop".to_string(), 1, 1, Duration::from_secs(0)).unwrap();

        // Should be expired immediately
        std::thread::sleep(Duration::from_millis(10));
        let gc_count = rt.gc().unwrap();
        assert_eq!(gc_count, 1);
        assert!(rt.get(&addr).is_none());
    }
}
|
||||
482
crates/quicprochat-p2p/src/rate_limit.rs
Normal file
482
crates/quicprochat-p2p/src/rate_limit.rs
Normal file
@@ -0,0 +1,482 @@
|
||||
//! Rate limiting for DoS protection.
|
||||
//!
|
||||
//! This module provides token bucket rate limiters for controlling
|
||||
//! message rates per peer and globally. Designed for low overhead
|
||||
//! even on constrained devices.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::RwLock;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use crate::address::MeshAddress;
|
||||
use crate::config::RateLimitConfig;
|
||||
use crate::error::{MeshError, MeshResult};
|
||||
|
||||
/// Result of a rate limit check.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RateLimitResult {
    /// Request allowed.
    Allowed,
    /// Request denied; retry after roughly this duration, when enough
    /// tokens will have been refilled.
    Denied { retry_after: Duration },
    /// Request allowed, but the bucket is below a quarter of capacity
    /// (see `TokenBucket::try_acquire_n`); `remaining` tokens are left.
    Warning { remaining: u32 },
}
|
||||
|
||||
impl RateLimitResult {
|
||||
pub fn is_allowed(&self) -> bool {
|
||||
matches!(self, Self::Allowed | Self::Warning { .. })
|
||||
}
|
||||
}
|
||||
|
||||
/// Token bucket rate limiter.
///
/// Tokens are tracked as `f64` so fractional refill accumulates smoothly
/// between checks; refill happens lazily inside each acquire call rather
/// than on a timer.
#[derive(Debug)]
pub struct TokenBucket {
    /// Maximum tokens (bucket capacity).
    capacity: u32,
    /// Current tokens (may be fractional).
    tokens: f64,
    /// Tokens added per second.
    refill_rate: f64,
    /// Last refill time.
    last_refill: Instant,
}
|
||||
|
||||
impl TokenBucket {
|
||||
/// Create a new token bucket.
|
||||
pub fn new(capacity: u32, per_second: f64) -> Self {
|
||||
Self {
|
||||
capacity,
|
||||
tokens: capacity as f64,
|
||||
refill_rate: per_second,
|
||||
last_refill: Instant::now(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Create from per-minute rate.
|
||||
pub fn per_minute(per_minute: u32) -> Self {
|
||||
let capacity = per_minute.max(1);
|
||||
let per_second = per_minute as f64 / 60.0;
|
||||
Self::new(capacity, per_second)
|
||||
}
|
||||
|
||||
/// Refill tokens based on elapsed time.
|
||||
fn refill(&mut self) {
|
||||
let now = Instant::now();
|
||||
let elapsed = now.duration_since(self.last_refill);
|
||||
let add = elapsed.as_secs_f64() * self.refill_rate;
|
||||
self.tokens = (self.tokens + add).min(self.capacity as f64);
|
||||
self.last_refill = now;
|
||||
}
|
||||
|
||||
/// Try to consume one token.
|
||||
pub fn try_acquire(&mut self) -> RateLimitResult {
|
||||
self.try_acquire_n(1)
|
||||
}
|
||||
|
||||
/// Try to consume n tokens.
|
||||
pub fn try_acquire_n(&mut self, n: u32) -> RateLimitResult {
|
||||
self.refill();
|
||||
|
||||
let n_f = n as f64;
|
||||
if self.tokens >= n_f {
|
||||
self.tokens -= n_f;
|
||||
let remaining = self.tokens as u32;
|
||||
if remaining < self.capacity / 4 {
|
||||
RateLimitResult::Warning { remaining }
|
||||
} else {
|
||||
RateLimitResult::Allowed
|
||||
}
|
||||
} else {
|
||||
let deficit = n_f - self.tokens;
|
||||
let wait_secs = deficit / self.refill_rate;
|
||||
RateLimitResult::Denied {
|
||||
retry_after: Duration::from_secs_f64(wait_secs),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Current available tokens.
|
||||
pub fn available(&mut self) -> u32 {
|
||||
self.refill();
|
||||
self.tokens as u32
|
||||
}
|
||||
}
|
||||
|
||||
/// Per-peer rate limiter with multiple buckets.
|
||||
#[derive(Debug)]
|
||||
pub struct PeerRateLimiter {
|
||||
/// Message bucket.
|
||||
messages: TokenBucket,
|
||||
/// Announce bucket.
|
||||
announces: TokenBucket,
|
||||
/// KeyPackage request bucket.
|
||||
keypackage_requests: TokenBucket,
|
||||
/// Last activity (for cleanup).
|
||||
last_activity: Instant,
|
||||
}
|
||||
|
||||
impl PeerRateLimiter {
|
||||
pub fn from_config(config: &RateLimitConfig) -> Self {
|
||||
Self {
|
||||
messages: TokenBucket::per_minute(config.message_per_peer_per_min),
|
||||
announces: TokenBucket::per_minute(config.announce_per_peer_per_min),
|
||||
keypackage_requests: TokenBucket::per_minute(config.keypackage_requests_per_min),
|
||||
last_activity: Instant::now(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn check_message(&mut self) -> RateLimitResult {
|
||||
self.last_activity = Instant::now();
|
||||
self.messages.try_acquire()
|
||||
}
|
||||
|
||||
pub fn check_announce(&mut self) -> RateLimitResult {
|
||||
self.last_activity = Instant::now();
|
||||
self.announces.try_acquire()
|
||||
}
|
||||
|
||||
pub fn check_keypackage_request(&mut self) -> RateLimitResult {
|
||||
self.last_activity = Instant::now();
|
||||
self.keypackage_requests.try_acquire()
|
||||
}
|
||||
|
||||
/// Time since last activity.
|
||||
pub fn idle_time(&self) -> Duration {
|
||||
self.last_activity.elapsed()
|
||||
}
|
||||
}
|
||||
|
||||
/// Global rate limiter managing per-peer limits.
|
||||
pub struct RateLimiter {
|
||||
/// Configuration.
|
||||
config: RateLimitConfig,
|
||||
/// Per-peer limiters.
|
||||
peers: RwLock<HashMap<MeshAddress, PeerRateLimiter>>,
|
||||
/// Maximum tracked peers (to prevent memory exhaustion).
|
||||
max_peers: usize,
|
||||
}
|
||||
|
||||
impl RateLimiter {
|
||||
pub fn new(config: RateLimitConfig) -> Self {
|
||||
Self {
|
||||
config,
|
||||
peers: RwLock::new(HashMap::new()),
|
||||
max_peers: 10_000,
|
||||
}
|
||||
}
|
||||
|
||||
/// Check if a message from peer is allowed.
|
||||
pub fn check_message(&self, peer: &MeshAddress) -> MeshResult<RateLimitResult> {
|
||||
let mut peers = self.peers.write().map_err(|_| {
|
||||
MeshError::Internal("rate limiter lock poisoned".to_string())
|
||||
})?;
|
||||
|
||||
let limiter = peers
|
||||
.entry(*peer)
|
||||
.or_insert_with(|| PeerRateLimiter::from_config(&self.config));
|
||||
|
||||
Ok(limiter.check_message())
|
||||
}
|
||||
|
||||
/// Check if an announce from peer is allowed.
|
||||
pub fn check_announce(&self, peer: &MeshAddress) -> MeshResult<RateLimitResult> {
|
||||
let mut peers = self.peers.write().map_err(|_| {
|
||||
MeshError::Internal("rate limiter lock poisoned".to_string())
|
||||
})?;
|
||||
|
||||
let limiter = peers
|
||||
.entry(*peer)
|
||||
.or_insert_with(|| PeerRateLimiter::from_config(&self.config));
|
||||
|
||||
Ok(limiter.check_announce())
|
||||
}
|
||||
|
||||
/// Check if a KeyPackage request from peer is allowed.
|
||||
pub fn check_keypackage_request(&self, peer: &MeshAddress) -> MeshResult<RateLimitResult> {
|
||||
let mut peers = self.peers.write().map_err(|_| {
|
||||
MeshError::Internal("rate limiter lock poisoned".to_string())
|
||||
})?;
|
||||
|
||||
let limiter = peers
|
||||
.entry(*peer)
|
||||
.or_insert_with(|| PeerRateLimiter::from_config(&self.config));
|
||||
|
||||
Ok(limiter.check_keypackage_request())
|
||||
}
|
||||
|
||||
/// Remove limiters for peers idle longer than max_idle.
|
||||
pub fn cleanup(&self, max_idle: Duration) -> usize {
|
||||
let mut peers = match self.peers.write() {
|
||||
Ok(p) => p,
|
||||
Err(_) => return 0,
|
||||
};
|
||||
|
||||
let before = peers.len();
|
||||
peers.retain(|_, limiter| limiter.idle_time() < max_idle);
|
||||
before - peers.len()
|
||||
}
|
||||
|
||||
/// Number of tracked peers.
|
||||
pub fn tracked_peers(&self) -> usize {
|
||||
self.peers.read().map(|p| p.len()).unwrap_or(0)
|
||||
}
|
||||
}
|
||||
|
||||
/// Duty cycle tracker for LoRa compliance.
///
/// Records transmission airtime over a sliding one-hour window and answers
/// whether a further transmission would exceed the configured duty-cycle
/// limit (e.g. 0.01 for the common 1% regulatory cap).
#[derive(Debug)]
pub struct DutyCycleTracker {
    /// Duty cycle limit (0.0 to 1.0).
    limit: f32,
    /// Window size for tracking.
    window: Duration,
    /// Transmission records: (timestamp, duration_ms).
    transmissions: RwLock<Vec<(Instant, u64)>>,
}

impl DutyCycleTracker {
    /// Create with a duty cycle limit (e.g., 0.01 for 1%); values outside
    /// [0, 1] are clamped.
    pub fn new(limit: f32) -> Self {
        Self {
            limit: limit.clamp(0.0, 1.0),
            window: Duration::from_secs(3600), // 1 hour window
            transmissions: RwLock::new(Vec::new()),
        }
    }

    /// Check if we can transmit for the given duration without exceeding the
    /// window's airtime budget.
    pub fn can_transmit(&self, airtime_ms: u64) -> bool {
        let used = self.used_ms();
        let window_ms = self.window.as_millis() as u64;
        let limit_ms = (window_ms as f32 * self.limit) as u64;
        used + airtime_ms <= limit_ms
    }

    /// Record a transmission (best-effort: silently skipped if the lock is
    /// poisoned).
    pub fn record(&self, airtime_ms: u64) {
        if let Ok(mut tx) = self.transmissions.write() {
            tx.push((Instant::now(), airtime_ms));
        }
    }

    /// Get total airtime used in current window.
    pub fn used_ms(&self) -> u64 {
        let tx = match self.transmissions.read() {
            Ok(t) => t,
            Err(_) => return 0,
        };

        // `Instant::now() - window` can panic if the subtraction would reach
        // before the platform's time origin (possible shortly after boot).
        // With no valid cutoff, every record is necessarily in-window, so
        // counting them all is the correct fallback.
        match Instant::now().checked_sub(self.window) {
            Some(cutoff) => tx
                .iter()
                .filter(|(t, _)| *t > cutoff)
                .map(|(_, d)| *d)
                .sum(),
            None => tx.iter().map(|(_, d)| *d).sum(),
        }
    }

    /// Get remaining airtime in current window.
    pub fn remaining_ms(&self) -> u64 {
        let window_ms = self.window.as_millis() as u64;
        let limit_ms = (window_ms as f32 * self.limit) as u64;
        limit_ms.saturating_sub(self.used_ms())
    }

    /// Clean up old records that have fallen out of the window.
    pub fn cleanup(&self) {
        if let Ok(mut tx) = self.transmissions.write() {
            // Same underflow guard as used_ms(); without a cutoff every
            // record is still in-window and must be kept.
            if let Some(cutoff) = Instant::now().checked_sub(self.window) {
                tx.retain(|(t, _)| *t > cutoff);
            }
        }
    }

    /// Current duty cycle usage as a fraction of the window.
    pub fn current_usage(&self) -> f32 {
        let window_ms = self.window.as_millis() as f32;
        self.used_ms() as f32 / window_ms
    }
}
|
||||
|
||||
/// Backpressure signal for flow control.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackpressureLevel {
    /// No backpressure, process normally.
    None,
    /// Light pressure, shed low-priority work.
    Light,
    /// Medium pressure, shed non-critical work.
    Medium,
    /// Heavy pressure, only process critical messages.
    Heavy,
    /// Overloaded, reject new work.
    Overloaded,
}

impl BackpressureLevel {
    /// Should we process a message at this priority (0 = highest)?
    pub fn should_process(&self, priority: u8) -> bool {
        // Each level maps to the lowest priority it still admits;
        // Overloaded admits nothing, None admits everything.
        let cutoff = match self {
            Self::None => return true,
            Self::Overloaded => return false,
            Self::Light => 2,
            Self::Medium => 1,
            Self::Heavy => 0,
        };
        priority <= cutoff
    }
}

/// Backpressure controller based on queue depth.
#[derive(Debug)]
pub struct BackpressureController {
    /// Thresholds for each level: [light, medium, heavy, overload].
    thresholds: [usize; 4],
    /// Current queue depth.
    current: std::sync::atomic::AtomicUsize,
}

impl BackpressureController {
    /// Create with explicit queue-depth thresholds for each pressure level.
    pub fn new(light: usize, medium: usize, heavy: usize, overload: usize) -> Self {
        Self {
            thresholds: [light, medium, heavy, overload],
            current: std::sync::atomic::AtomicUsize::new(0),
        }
    }

    /// Tight thresholds suited to constrained devices.
    pub fn default_for_constrained() -> Self {
        Self::new(10, 25, 50, 100)
    }

    /// Roomier thresholds for standard hardware.
    pub fn default_for_standard() -> Self {
        Self::new(100, 500, 1000, 5000)
    }

    /// Publish the current queue depth.
    pub fn set_queue_depth(&self, depth: usize) {
        self.current.store(depth, std::sync::atomic::Ordering::Relaxed);
    }

    /// Translate the published depth into a pressure level.
    pub fn level(&self) -> BackpressureLevel {
        let depth = self.current.load(std::sync::atomic::Ordering::Relaxed);
        let ladder = [
            BackpressureLevel::Light,
            BackpressureLevel::Medium,
            BackpressureLevel::Heavy,
            BackpressureLevel::Overloaded,
        ];
        // Walk from the most severe threshold downward; the first one the
        // depth reaches wins, otherwise there is no pressure at all.
        self.thresholds
            .iter()
            .zip(ladder)
            .rev()
            .find(|(threshold, _)| depth >= **threshold)
            .map(|(_, level)| level)
            .unwrap_or(BackpressureLevel::None)
    }
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;

    // A full bucket admits exactly `capacity` back-to-back requests,
    // then denies until tokens refill.
    #[test]
    fn token_bucket_allows_burst() {
        let mut bucket = TokenBucket::new(10, 1.0);
        for _ in 0..10 {
            assert!(bucket.try_acquire().is_allowed());
        }
        assert!(!bucket.try_acquire().is_allowed());
    }

    // Tokens come back over time: at 100/sec, 50ms is enough for one more.
    #[test]
    fn token_bucket_refills() {
        let mut bucket = TokenBucket::new(2, 100.0); // 100/sec refill
        bucket.try_acquire();
        bucket.try_acquire();
        assert!(!bucket.try_acquire().is_allowed());

        std::thread::sleep(Duration::from_millis(50));
        assert!(bucket.try_acquire().is_allowed());
    }

    // The soft warning fires when fewer than capacity/4 tokens remain.
    #[test]
    fn token_bucket_warning() {
        let mut bucket = TokenBucket::new(8, 1.0);
        // Use 7 tokens (leaves 1, which is < 8/4 = 2)
        for _ in 0..7 {
            bucket.try_acquire();
        }
        let result = bucket.try_acquire();
        assert!(matches!(result, RateLimitResult::Warning { remaining: 0 }));
    }

    // The per-peer message budget from config is enforced exactly.
    #[test]
    fn peer_rate_limiter() {
        let config = RateLimitConfig {
            message_per_peer_per_min: 5,
            ..Default::default()
        };
        let mut limiter = PeerRateLimiter::from_config(&config);

        for _ in 0..5 {
            assert!(limiter.check_message().is_allowed());
        }
        assert!(!limiter.check_message().is_allowed());
    }

    // Each peer gets an independent bucket: exhausting one peer's budget
    // must not affect another peer.
    #[test]
    fn rate_limiter_per_peer() {
        let config = RateLimitConfig {
            message_per_peer_per_min: 2,
            ..Default::default()
        };
        let limiter = RateLimiter::new(config);

        let peer1 = MeshAddress::from_bytes([1; 16]);
        let peer2 = MeshAddress::from_bytes([2; 16]);

        assert!(limiter.check_message(&peer1).unwrap().is_allowed());
        assert!(limiter.check_message(&peer1).unwrap().is_allowed());
        assert!(!limiter.check_message(&peer1).unwrap().is_allowed());

        // peer2 has its own bucket
        assert!(limiter.check_message(&peer2).unwrap().is_allowed());
    }

    // Airtime accrues toward the window budget and transmissions are
    // refused exactly once the budget is consumed.
    #[test]
    fn duty_cycle_tracker() {
        let tracker = DutyCycleTracker::new(0.01); // 1%
        // 1 hour = 3600000 ms, 1% = 36000 ms

        assert!(tracker.can_transmit(1000));
        tracker.record(1000);
        assert_eq!(tracker.used_ms(), 1000);

        assert!(tracker.can_transmit(35000));
        tracker.record(35000);

        // Now at 36000ms, at limit
        assert!(!tracker.can_transmit(1000));
    }

    // Each threshold boundary is inclusive: depth == threshold selects
    // that level (verified here for Light via depth 10 vs threshold 10... 
    // the other depths fall strictly between thresholds).
    #[test]
    fn backpressure_levels() {
        let bp = BackpressureController::new(10, 50, 100, 200);

        bp.set_queue_depth(5);
        assert_eq!(bp.level(), BackpressureLevel::None);

        bp.set_queue_depth(30);
        assert_eq!(bp.level(), BackpressureLevel::Light);

        bp.set_queue_depth(75);
        assert_eq!(bp.level(), BackpressureLevel::Medium);

        bp.set_queue_depth(150);
        assert_eq!(bp.level(), BackpressureLevel::Heavy);

        bp.set_queue_depth(250);
        assert_eq!(bp.level(), BackpressureLevel::Overloaded);
    }

    // Priority 0 is the most important; higher levels admit fewer
    // priorities, and Overloaded admits nothing at all.
    #[test]
    fn backpressure_priority_filter() {
        assert!(BackpressureLevel::None.should_process(5));
        assert!(!BackpressureLevel::Light.should_process(5));
        assert!(BackpressureLevel::Light.should_process(2));
        assert!(!BackpressureLevel::Overloaded.should_process(0));
    }
}
|
||||
470
crates/quicprochat-p2p/src/shutdown.rs
Normal file
470
crates/quicprochat-p2p/src/shutdown.rs
Normal file
@@ -0,0 +1,470 @@
|
||||
//! Graceful shutdown coordination for mesh nodes.
|
||||
//!
|
||||
//! This module provides coordinated shutdown with:
|
||||
//! - Signal handling (SIGTERM, SIGINT, SIGHUP)
|
||||
//! - Connection draining
|
||||
//! - State persistence
|
||||
//! - Cleanup hooks
|
||||
|
||||
use std::future::Future;
|
||||
use std::pin::Pin;
|
||||
use std::sync::atomic::{AtomicBool, AtomicU8, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use tokio::sync::{broadcast, mpsc, watch, Notify};
|
||||
use tokio::time::timeout;
|
||||
|
||||
/// Shutdown phase.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum ShutdownPhase {
    /// Normal operation.
    Running = 0,
    /// Shutdown initiated, draining connections.
    Draining = 1,
    /// Persisting state.
    Persisting = 2,
    /// Running cleanup hooks.
    Cleanup = 3,
    /// Shutdown complete.
    Complete = 4,
}

impl From<u8> for ShutdownPhase {
    /// Decode a stored discriminant; anything out of range collapses to the
    /// terminal `Complete` phase.
    fn from(v: u8) -> Self {
        const ORDERED: [ShutdownPhase; 4] = [
            ShutdownPhase::Running,
            ShutdownPhase::Draining,
            ShutdownPhase::Persisting,
            ShutdownPhase::Cleanup,
        ];
        ORDERED
            .get(v as usize)
            .copied()
            .unwrap_or(ShutdownPhase::Complete)
    }
}
|
||||
|
||||
/// Shutdown coordinator.
|
||||
pub struct ShutdownCoordinator {
|
||||
/// Current phase.
|
||||
phase: AtomicU8,
|
||||
/// Shutdown signal broadcast.
|
||||
shutdown_tx: broadcast::Sender<ShutdownPhase>,
|
||||
/// Notify when all tasks complete.
|
||||
all_done: Arc<Notify>,
|
||||
/// Active task count.
|
||||
active_tasks: std::sync::atomic::AtomicUsize,
|
||||
/// Drain timeout.
|
||||
drain_timeout: Duration,
|
||||
/// Persist timeout.
|
||||
persist_timeout: Duration,
|
||||
}
|
||||
|
||||
impl ShutdownCoordinator {
|
||||
pub fn new() -> Self {
|
||||
let (shutdown_tx, _) = broadcast::channel(16);
|
||||
Self {
|
||||
phase: AtomicU8::new(ShutdownPhase::Running as u8),
|
||||
shutdown_tx,
|
||||
all_done: Arc::new(Notify::new()),
|
||||
active_tasks: std::sync::atomic::AtomicUsize::new(0),
|
||||
drain_timeout: Duration::from_secs(30),
|
||||
persist_timeout: Duration::from_secs(10),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn with_timeouts(drain: Duration, persist: Duration) -> Self {
|
||||
let mut s = Self::new();
|
||||
s.drain_timeout = drain;
|
||||
s.persist_timeout = persist;
|
||||
s
|
||||
}
|
||||
|
||||
/// Get current phase.
|
||||
pub fn phase(&self) -> ShutdownPhase {
|
||||
self.phase.load(Ordering::SeqCst).into()
|
||||
}
|
||||
|
||||
/// Check if shutdown is in progress.
|
||||
pub fn is_shutting_down(&self) -> bool {
|
||||
self.phase() != ShutdownPhase::Running
|
||||
}
|
||||
|
||||
/// Subscribe to shutdown notifications.
|
||||
pub fn subscribe(&self) -> broadcast::Receiver<ShutdownPhase> {
|
||||
self.shutdown_tx.subscribe()
|
||||
}
|
||||
|
||||
/// Register a task.
|
||||
pub fn register_task(&self) -> TaskGuard {
|
||||
self.active_tasks.fetch_add(1, Ordering::SeqCst);
|
||||
TaskGuard {
|
||||
active_tasks: &self.active_tasks,
|
||||
all_done: Arc::clone(&self.all_done),
|
||||
}
|
||||
}
|
||||
|
||||
/// Initiate shutdown.
|
||||
pub async fn shutdown(&self) {
|
||||
// Phase 1: Draining
|
||||
self.set_phase(ShutdownPhase::Draining);
|
||||
|
||||
// Wait for tasks to complete or timeout
|
||||
let drain_result = timeout(
|
||||
self.drain_timeout,
|
||||
self.wait_for_tasks(),
|
||||
).await;
|
||||
|
||||
if drain_result.is_err() {
|
||||
tracing::warn!(
|
||||
"drain timeout reached with {} tasks remaining",
|
||||
self.active_tasks.load(Ordering::SeqCst)
|
||||
);
|
||||
}
|
||||
|
||||
// Phase 2: Persisting
|
||||
self.set_phase(ShutdownPhase::Persisting);
|
||||
|
||||
// Give persist hooks time to run
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
|
||||
// Phase 3: Cleanup
|
||||
self.set_phase(ShutdownPhase::Cleanup);
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
|
||||
// Complete
|
||||
self.set_phase(ShutdownPhase::Complete);
|
||||
}
|
||||
|
||||
fn set_phase(&self, phase: ShutdownPhase) {
|
||||
self.phase.store(phase as u8, Ordering::SeqCst);
|
||||
let _ = self.shutdown_tx.send(phase);
|
||||
}
|
||||
|
||||
async fn wait_for_tasks(&self) {
|
||||
while self.active_tasks.load(Ordering::SeqCst) > 0 {
|
||||
self.all_done.notified().await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for ShutdownCoordinator {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
/// RAII guard for tracking active tasks.
|
||||
pub struct TaskGuard<'a> {
|
||||
active_tasks: &'a std::sync::atomic::AtomicUsize,
|
||||
all_done: Arc<Notify>,
|
||||
}
|
||||
|
||||
impl<'a> Drop for TaskGuard<'a> {
|
||||
fn drop(&mut self) {
|
||||
let prev = self.active_tasks.fetch_sub(1, Ordering::SeqCst);
|
||||
if prev == 1 {
|
||||
self.all_done.notify_waiters();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Shutdown handle for use in async tasks.
|
||||
#[derive(Clone)]
|
||||
pub struct ShutdownSignal {
|
||||
/// Watch receiver for shutdown.
|
||||
watch_rx: watch::Receiver<bool>,
|
||||
}
|
||||
|
||||
impl ShutdownSignal {
|
||||
/// Create a new signal pair.
|
||||
pub fn new() -> (ShutdownTrigger, Self) {
|
||||
let (tx, rx) = watch::channel(false);
|
||||
(ShutdownTrigger { watch_tx: tx }, Self { watch_rx: rx })
|
||||
}
|
||||
|
||||
/// Check if shutdown has been triggered.
|
||||
pub fn is_triggered(&self) -> bool {
|
||||
*self.watch_rx.borrow()
|
||||
}
|
||||
|
||||
/// Wait for shutdown signal.
|
||||
pub async fn wait(&mut self) {
|
||||
let _ = self.watch_rx.wait_for(|&triggered| triggered).await;
|
||||
}
|
||||
|
||||
/// Create a future that completes on shutdown.
|
||||
pub fn recv(&mut self) -> impl Future<Output = ()> + '_ {
|
||||
async move {
|
||||
self.wait().await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for ShutdownSignal {
|
||||
fn default() -> Self {
|
||||
Self::new().1
|
||||
}
|
||||
}
|
||||
|
||||
/// Trigger for shutdown signal.
|
||||
#[derive(Clone)]
|
||||
pub struct ShutdownTrigger {
|
||||
watch_tx: watch::Sender<bool>,
|
||||
}
|
||||
|
||||
impl ShutdownTrigger {
|
||||
/// Trigger shutdown.
|
||||
pub fn trigger(&self) {
|
||||
let _ = self.watch_tx.send(true);
|
||||
}
|
||||
}
|
||||
|
||||
/// Shutdown hook type.
pub type ShutdownHook = Box<
    dyn FnOnce() -> Pin<Box<dyn Future<Output = ()> + Send>> + Send
>;

/// Manages shutdown hooks.
///
/// Persist hooks run during the Persisting phase, cleanup hooks during the
/// Cleanup phase. Hooks run at most once each, in registration order.
pub struct ShutdownHooks {
    persist_hooks: Vec<ShutdownHook>,
    cleanup_hooks: Vec<ShutdownHook>,
}

impl ShutdownHooks {
    /// Create an empty hook registry.
    pub fn new() -> Self {
        Self {
            persist_hooks: Vec::new(),
            cleanup_hooks: Vec::new(),
        }
    }

    /// Register a persist hook (runs during Persisting phase).
    pub fn on_persist<F, Fut>(&mut self, f: F)
    where
        F: FnOnce() -> Fut + Send + 'static,
        Fut: Future<Output = ()> + Send + 'static,
    {
        let hook: ShutdownHook = Box::new(move || Box::pin(f()));
        self.persist_hooks.push(hook);
    }

    /// Register a cleanup hook (runs during Cleanup phase).
    pub fn on_cleanup<F, Fut>(&mut self, f: F)
    where
        F: FnOnce() -> Fut + Send + 'static,
        Fut: Future<Output = ()> + Send + 'static,
    {
        let hook: ShutdownHook = Box::new(move || Box::pin(f()));
        self.cleanup_hooks.push(hook);
    }

    /// Run all persist hooks sequentially, leaving the list empty.
    pub async fn run_persist(&mut self) {
        for hook in std::mem::take(&mut self.persist_hooks) {
            hook().await;
        }
    }

    /// Run all cleanup hooks sequentially, leaving the list empty.
    pub async fn run_cleanup(&mut self) {
        for hook in std::mem::take(&mut self.cleanup_hooks) {
            hook().await;
        }
    }
}

impl Default for ShutdownHooks {
    fn default() -> Self {
        Self::new()
    }
}
|
||||
|
||||
/// Draining connection tracker.
|
||||
pub struct ConnectionDrainer {
|
||||
/// Maximum connections to track.
|
||||
max_connections: usize,
|
||||
/// Active connections.
|
||||
active: std::sync::atomic::AtomicUsize,
|
||||
/// Notify when connection count changes.
|
||||
notify: Notify,
|
||||
/// Stopped accepting new connections.
|
||||
draining: AtomicBool,
|
||||
}
|
||||
|
||||
impl ConnectionDrainer {
|
||||
pub fn new(max_connections: usize) -> Self {
|
||||
Self {
|
||||
max_connections,
|
||||
active: std::sync::atomic::AtomicUsize::new(0),
|
||||
notify: Notify::new(),
|
||||
draining: AtomicBool::new(false),
|
||||
}
|
||||
}
|
||||
|
||||
/// Try to accept a new connection.
|
||||
pub fn try_accept(&self) -> Option<ConnectionGuard<'_>> {
|
||||
if self.draining.load(Ordering::SeqCst) {
|
||||
return None;
|
||||
}
|
||||
|
||||
let current = self.active.fetch_add(1, Ordering::SeqCst);
|
||||
if current >= self.max_connections {
|
||||
self.active.fetch_sub(1, Ordering::SeqCst);
|
||||
return None;
|
||||
}
|
||||
|
||||
Some(ConnectionGuard { drainer: self })
|
||||
}
|
||||
|
||||
/// Start draining (stop accepting new connections).
|
||||
pub fn start_drain(&self) {
|
||||
self.draining.store(true, Ordering::SeqCst);
|
||||
}
|
||||
|
||||
/// Wait for all connections to close.
|
||||
pub async fn wait_drained(&self) {
|
||||
while self.active.load(Ordering::SeqCst) > 0 {
|
||||
self.notify.notified().await;
|
||||
}
|
||||
}
|
||||
|
||||
/// Current connection count.
|
||||
pub fn active_count(&self) -> usize {
|
||||
self.active.load(Ordering::SeqCst)
|
||||
}
|
||||
|
||||
/// Is draining?
|
||||
pub fn is_draining(&self) -> bool {
|
||||
self.draining.load(Ordering::SeqCst)
|
||||
}
|
||||
}
|
||||
|
||||
/// RAII guard for active connections.
|
||||
pub struct ConnectionGuard<'a> {
|
||||
drainer: &'a ConnectionDrainer,
|
||||
}
|
||||
|
||||
impl<'a> Drop for ConnectionGuard<'a> {
|
||||
fn drop(&mut self) {
|
||||
self.drainer.active.fetch_sub(1, Ordering::SeqCst);
|
||||
self.drainer.notify.notify_waiters();
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;

    // The coordinator must broadcast every phase transition, in order:
    // Draining -> Persisting -> Cleanup -> Complete.
    #[tokio::test]
    async fn shutdown_phases() {
        let coord = ShutdownCoordinator::with_timeouts(
            Duration::from_millis(100),
            Duration::from_millis(50),
        );

        assert_eq!(coord.phase(), ShutdownPhase::Running);
        assert!(!coord.is_shutting_down());

        // Subscribe before spawning so no transition can be missed.
        let mut rx = coord.subscribe();

        tokio::spawn(async move {
            coord.shutdown().await;
        });

        // Should receive phase transitions
        let phase = rx.recv().await.unwrap();
        assert_eq!(phase, ShutdownPhase::Draining);

        let phase = rx.recv().await.unwrap();
        assert_eq!(phase, ShutdownPhase::Persisting);

        let phase = rx.recv().await.unwrap();
        assert_eq!(phase, ShutdownPhase::Cleanup);

        let phase = rx.recv().await.unwrap();
        assert_eq!(phase, ShutdownPhase::Complete);
    }

    // TaskGuard is RAII: register_task bumps the counter, dropping the
    // guard decrements it.
    #[tokio::test]
    async fn task_tracking() {
        let coord = ShutdownCoordinator::with_timeouts(
            Duration::from_secs(1),
            Duration::from_millis(50),
        );

        let guard1 = coord.register_task();
        let guard2 = coord.register_task();

        assert_eq!(coord.active_tasks.load(Ordering::SeqCst), 2);

        drop(guard1);
        assert_eq!(coord.active_tasks.load(Ordering::SeqCst), 1);

        drop(guard2);
        assert_eq!(coord.active_tasks.load(Ordering::SeqCst), 0);
    }

    // Firing the trigger wakes a task blocked in ShutdownSignal::wait().
    #[tokio::test]
    async fn shutdown_signal() {
        let (trigger, mut signal) = ShutdownSignal::new();

        assert!(!signal.is_triggered());

        let handle = tokio::spawn(async move {
            signal.wait().await;
            true
        });

        trigger.trigger();
        assert!(handle.await.unwrap());
    }

    // The drainer enforces its capacity, refuses accepts once draining,
    // and wait_drained() returns promptly after the last guard drops.
    #[tokio::test]
    async fn connection_drainer() {
        let drainer = ConnectionDrainer::new(2);

        let conn1 = drainer.try_accept().expect("should accept");
        let conn2 = drainer.try_accept().expect("should accept");
        assert!(drainer.try_accept().is_none()); // At capacity

        assert_eq!(drainer.active_count(), 2);

        drop(conn1);
        assert_eq!(drainer.active_count(), 1);

        drainer.start_drain();
        assert!(drainer.try_accept().is_none()); // Draining

        drop(conn2);

        // Should complete immediately
        tokio::time::timeout(
            Duration::from_millis(100),
            drainer.wait_drained(),
        ).await.expect("should drain quickly");
    }

    // Persist and cleanup hooks run exactly in their own phase, and a
    // persist run must not touch cleanup hooks.
    #[tokio::test]
    async fn shutdown_hooks() {
        use std::sync::atomic::AtomicBool;

        let persist_ran = Arc::new(AtomicBool::new(false));
        let cleanup_ran = Arc::new(AtomicBool::new(false));

        let persist_flag = Arc::clone(&persist_ran);
        let cleanup_flag = Arc::clone(&cleanup_ran);

        let mut hooks = ShutdownHooks::new();
        hooks.on_persist(move || async move {
            persist_flag.store(true, Ordering::SeqCst);
        });
        hooks.on_cleanup(move || async move {
            cleanup_flag.store(true, Ordering::SeqCst);
        });

        hooks.run_persist().await;
        assert!(persist_ran.load(Ordering::SeqCst));
        assert!(!cleanup_ran.load(Ordering::SeqCst));

        hooks.run_cleanup().await;
        assert!(cleanup_ran.load(Ordering::SeqCst));
    }
}
|
||||
414
crates/quicprochat-p2p/tests/multi_node.rs
Normal file
414
crates/quicprochat-p2p/tests/multi_node.rs
Normal file
@@ -0,0 +1,414 @@
|
||||
//! Multi-node integration tests for mesh networking.
|
||||
//!
|
||||
//! These tests verify the behavior of multiple mesh nodes communicating
|
||||
//! via TCP transport. They cover routing, store-and-forward, and failure
|
||||
//! scenarios.
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use quicprochat_p2p::address::MeshAddress;
|
||||
use quicprochat_p2p::config::{MeshConfig, RateLimitConfig};
|
||||
use quicprochat_p2p::envelope::MeshEnvelope;
|
||||
use quicprochat_p2p::envelope_v2::{MeshEnvelopeV2, Priority};
|
||||
use quicprochat_p2p::identity::MeshIdentity;
|
||||
use quicprochat_p2p::metrics::MeshMetrics;
|
||||
use quicprochat_p2p::rate_limit::RateLimiter;
|
||||
use quicprochat_p2p::store::MeshStore;
|
||||
use quicprochat_p2p::shutdown::{ShutdownCoordinator, ShutdownSignal};
|
||||
|
||||
/// A peer's per-minute message budget is enforced exactly: the configured
/// count passes, the next request is denied.
#[tokio::test]
async fn rate_limiting_blocks_excessive_traffic() {
    let config = RateLimitConfig {
        message_per_peer_per_min: 5,
        ..Default::default()
    };
    let limiter = RateLimiter::new(config);

    let peer = MeshAddress::from_bytes([0xAB; 16]);

    // First 5 should be allowed
    for _ in 0..5 {
        let result = limiter.check_message(&peer).unwrap();
        assert!(result.is_allowed());
    }

    // 6th should be denied
    let result = limiter.check_message(&peer).unwrap();
    assert!(!result.is_allowed());
}
|
||||
|
||||
/// Store-and-forward lifecycle: a message for an offline recipient is held,
/// visible via peek() without being consumed, and removed by fetch().
#[tokio::test]
async fn store_and_forward_for_offline_peer() {
    let mut store = MeshStore::new(100);

    let identity = MeshIdentity::generate();
    let recipient_key = identity.public_key();

    // Create an envelope for the recipient
    let sender = MeshIdentity::generate();
    let envelope = MeshEnvelope::new(
        &sender,
        &recipient_key,
        b"message for offline peer".to_vec(),
        3600,
        5,
    );

    // Store message
    assert!(store.store(envelope.clone()));

    // Verify it's in the store (peek must not consume)
    let messages = store.peek(&recipient_key);
    assert_eq!(messages.len(), 1);
    assert_eq!(messages[0].payload, b"message for offline peer");

    // Fetch (consume) messages
    let fetched = store.fetch(&recipient_key);
    assert_eq!(fetched.len(), 1);

    // Should be empty now
    let remaining = store.peek(&recipient_key);
    assert!(remaining.is_empty());
}
|
||||
|
||||
/// Storing the same envelope twice must be rejected the second time
/// (deduplication by envelope ID) and leave exactly one stored copy.
#[tokio::test]
async fn message_deduplication() {
    let mut store = MeshStore::new(100);

    let sender = MeshIdentity::generate();
    let recipient = MeshIdentity::generate();

    let envelope = MeshEnvelope::new(
        &sender,
        &recipient.public_key(),
        b"test payload".to_vec(),
        3600,
        5,
    );

    // First store should succeed
    assert!(store.store(envelope.clone()));

    // Same envelope (same ID) should be rejected
    assert!(!store.store(envelope.clone()));

    // Only one message should be stored
    let messages = store.peek(&recipient.public_key());
    assert_eq!(messages.len(), 1);
}
|
||||
|
||||
/// An envelope verifies only against its author's public key; any other
/// key must fail verification.
#[tokio::test]
async fn envelope_v2_signature_verification() {
    let identity = MeshIdentity::generate();
    let recipient = MeshAddress::from_bytes([0xEE; 16]);

    let envelope = MeshEnvelopeV2::new(
        &identity,
        recipient,
        b"test payload".to_vec(),
        3600,
        5,
        Priority::Normal,
    );

    // Verify with correct key
    let pk = identity.public_key();
    assert!(envelope.verify_with_key(&pk));

    // Verify with wrong key should fail
    let other_identity = MeshIdentity::generate();
    let other_pk = other_identity.public_key();
    assert!(!envelope.verify_with_key(&other_pk));
}
|
||||
|
||||
/// Each `forwarded()` call increments the hop count; forwarding stops
/// once `max_hops` is reached.
// Converted from `#[tokio::test]`: the body is fully synchronous.
// The repetitive forward/assert chain is folded into a loop so adding a
// hop limit to the test means changing one constant.
#[test]
fn envelope_v2_forwarding() {
    let sender = MeshIdentity::generate();
    let recipient = MeshAddress::from_bytes([0xAA; 16]);

    let envelope = MeshEnvelopeV2::new(
        &sender,
        recipient,
        b"forward me".to_vec(),
        3600,
        3, // max 3 hops
        Priority::Normal,
    );

    // Fresh envelope: no hops consumed yet.
    assert_eq!(envelope.hop_count, 0);
    assert!(envelope.can_forward());

    // Forward three times; forwarding is allowed until the third hop,
    // at which point `can_forward` must flip to false.
    let mut current = envelope;
    for expected_hops in 1..=3 {
        current = current.forwarded();
        assert_eq!(current.hop_count, expected_hops);
        assert_eq!(current.can_forward(), expected_hops < 3);
    }
}
|
||||
|
||||
/// Broadcast envelopes must carry the well-known broadcast address and
/// preserve the requested priority.
// Converted from `#[tokio::test]`: no `.await` in the body.
#[test]
fn envelope_v2_broadcast() {
    let sender = MeshIdentity::generate();

    let envelope = MeshEnvelopeV2::broadcast(
        &sender,
        b"broadcast message".to_vec(),
        3600,
        5,
        Priority::High,
    );

    // The constructor marks the envelope as broadcast and addresses it
    // to the reserved broadcast address.
    assert!(envelope.is_broadcast());
    assert_eq!(envelope.recipient_addr, MeshAddress::BROADCAST);
    assert_eq!(envelope.priority(), Priority::High);
}
|
||||
|
||||
/// Counters incremented through the metrics handles must be visible on
/// subsequent lookups and in a snapshot.
// Converted from `#[tokio::test]`: the body is fully synchronous.
#[test]
fn metrics_tracking() {
    let metrics = MeshMetrics::new();

    // Per-transport counters, keyed by transport name.
    let tcp_metrics = metrics.transport("tcp");
    tcp_metrics.sent.inc_by(10);
    tcp_metrics.bytes_sent.inc_by(1024);

    // A second `transport("tcp")` lookup must observe the same counters.
    assert_eq!(metrics.transport("tcp").sent.get(), 10);
    assert_eq!(metrics.transport("tcp").bytes_sent.get(), 1024);

    // Routing counters.
    metrics.routing.lookups.inc_by(100);
    metrics.routing.lookup_misses.inc_by(5);

    // Snapshot reflects the counters; uptime is near zero because the
    // metrics object was just created.
    let snapshot = metrics.snapshot();
    assert!(snapshot.uptime_secs < 2);
    assert_eq!(snapshot.routing.lookups, 100);
    assert_eq!(snapshot.routing.lookup_misses, 5);
}
|
||||
|
||||
/// `MeshConfig::validate` must accept the defaults and the constrained
/// preset, and reject out-of-range fields.
// Converted from `#[tokio::test]`: no `.await` in the body. The two
// shadowed `bad_config` bindings are given distinct names so each
// assertion failure points at an unambiguous value.
#[test]
fn config_validation() {
    // Defaults must validate cleanly.
    let config = MeshConfig::default();
    assert!(config.validate().is_ok());

    // An announce interval below the allowed minimum is rejected.
    let mut too_fast = MeshConfig::default();
    too_fast.announce.interval = Duration::from_secs(1); // Too short
    assert!(too_fast.validate().is_err());

    // A LoRa duty cycle above 100% is rejected.
    let mut bad_duty = MeshConfig::default();
    bad_duty.rate_limit.lora_duty_cycle = 2.0; // > 1.0
    assert!(bad_duty.validate().is_err());

    // The low-resource preset must itself pass validation.
    let constrained = MeshConfig::constrained();
    assert!(constrained.validate().is_ok());
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn shutdown_coordination() {
|
||||
let coordinator = Arc::new(ShutdownCoordinator::with_timeouts(
|
||||
Duration::from_millis(100),
|
||||
Duration::from_millis(50),
|
||||
));
|
||||
|
||||
let coord_clone = Arc::clone(&coordinator);
|
||||
|
||||
// Spawn a task that registers itself
|
||||
let handle = tokio::spawn(async move {
|
||||
let _guard = coord_clone.register_task();
|
||||
tokio::time::sleep(Duration::from_millis(50)).await;
|
||||
// guard dropped here, task complete
|
||||
});
|
||||
|
||||
// Start shutdown
|
||||
coordinator.shutdown().await;
|
||||
|
||||
// Task should have completed
|
||||
handle.await.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn shutdown_signal_propagation() {
|
||||
let (trigger, mut signal) = ShutdownSignal::new();
|
||||
|
||||
assert!(!signal.is_triggered());
|
||||
|
||||
let handle = tokio::spawn(async move {
|
||||
signal.wait().await;
|
||||
true
|
||||
});
|
||||
|
||||
// Small delay to ensure task is waiting
|
||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||
|
||||
trigger.trigger();
|
||||
let result = handle.await.unwrap();
|
||||
assert!(result);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn concurrent_store_access() {
|
||||
let store = Arc::new(std::sync::RwLock::new(MeshStore::new(1000)));
|
||||
|
||||
let recipient = MeshIdentity::generate();
|
||||
let recipient_key = recipient.public_key();
|
||||
|
||||
// Spawn multiple writers
|
||||
let mut handles = Vec::new();
|
||||
for i in 0..10 {
|
||||
let store_clone = Arc::clone(&store);
|
||||
let rk = recipient_key.clone();
|
||||
let handle = tokio::spawn(async move {
|
||||
for j in 0..10 {
|
||||
let sender = MeshIdentity::generate();
|
||||
let envelope = MeshEnvelope::new(
|
||||
&sender,
|
||||
&rk,
|
||||
format!("msg-{}-{}", i, j).into_bytes(),
|
||||
3600,
|
||||
5,
|
||||
);
|
||||
let mut s = store_clone.write().unwrap();
|
||||
s.store(envelope);
|
||||
}
|
||||
});
|
||||
handles.push(handle);
|
||||
}
|
||||
|
||||
// Wait for all writers
|
||||
for handle in handles {
|
||||
handle.await.unwrap();
|
||||
}
|
||||
|
||||
// Should have 100 messages
|
||||
let s = store.read().unwrap();
|
||||
let messages = s.peek(&recipient_key);
|
||||
assert_eq!(messages.len(), 100);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn store_gc_removes_expired() {
|
||||
let mut store = MeshStore::new(100);
|
||||
|
||||
let sender = MeshIdentity::generate();
|
||||
let recipient = MeshIdentity::generate();
|
||||
|
||||
// Store with very short TTL
|
||||
let envelope = MeshEnvelope::new(
|
||||
&sender,
|
||||
&recipient.public_key(),
|
||||
b"short-lived".to_vec(),
|
||||
1, // 1 second TTL
|
||||
5,
|
||||
);
|
||||
store.store(envelope);
|
||||
|
||||
// Verify it's stored
|
||||
let before = store.peek(&recipient.public_key());
|
||||
assert_eq!(before.len(), 1);
|
||||
|
||||
// Wait for expiry
|
||||
tokio::time::sleep(Duration::from_secs(2)).await;
|
||||
|
||||
// Run GC
|
||||
let removed = store.gc_expired();
|
||||
assert_eq!(removed, 1);
|
||||
|
||||
// Should be empty now
|
||||
let messages = store.peek(&recipient.public_key());
|
||||
assert!(messages.is_empty());
|
||||
}
|
||||
|
||||
/// Address derivation from a public key must be deterministic, and an
/// address must match only the key it was derived from.
// Converted from `#[tokio::test]`: the body is fully synchronous.
#[test]
fn mesh_address_derivation() {
    let identity = MeshIdentity::generate();
    let pk = identity.public_key();

    // Same key -> same address (derivation is deterministic).
    let addr1 = MeshAddress::from_public_key(&pk);
    let addr2 = MeshAddress::from_public_key(&pk);
    assert_eq!(addr1, addr2);

    // The address matches its originating key...
    assert!(addr1.matches_key(&pk));

    // ...but not a key from a different identity.
    let other = MeshIdentity::generate();
    assert!(!addr1.matches_key(&other.public_key()));
}
|
||||
|
||||
/// Serializing a V2 envelope to wire bytes and parsing it back must
/// preserve payload, addressing, priority, and the signature.
// Converted from `#[tokio::test]`: no `.await` in the body.
#[test]
fn envelope_v2_wire_roundtrip() {
    let sender = MeshIdentity::generate();
    let recipient = MeshAddress::from_bytes([0xBB; 16]);

    let envelope = MeshEnvelopeV2::new(
        &sender,
        recipient,
        b"roundtrip test".to_vec(),
        3600,
        5,
        Priority::High,
    );

    // Serialize, then parse the bytes back.
    let wire = envelope.to_wire();
    let restored = MeshEnvelopeV2::from_wire(&wire).expect("deserialize failed");

    // Every field we set survives the roundtrip, and the signature still
    // verifies against the original sender's key.
    assert_eq!(restored.payload, b"roundtrip test");
    assert_eq!(restored.recipient_addr, recipient);
    assert_eq!(restored.priority(), Priority::High);
    assert!(restored.verify_with_key(&sender.public_key()));
}
|
||||
|
||||
/// Exhausting one peer's message budget must not consume another peer's.
// Converted from `#[tokio::test]`: the body is fully synchronous.
#[test]
fn rate_limiter_per_peer_isolation() {
    // Tight per-peer budget so exhaustion is reached in two calls.
    let config = RateLimitConfig {
        message_per_peer_per_min: 2,
        ..Default::default()
    };
    let limiter = RateLimiter::new(config);

    let peer1 = MeshAddress::from_bytes([1; 16]);
    let peer2 = MeshAddress::from_bytes([2; 16]);

    // Use up peer1's allowance; the third call is denied.
    assert!(limiter.check_message(&peer1).unwrap().is_allowed());
    assert!(limiter.check_message(&peer1).unwrap().is_allowed());
    assert!(!limiter.check_message(&peer1).unwrap().is_allowed());

    // peer2's budget is independent and still fully available.
    assert!(limiter.check_message(&peer2).unwrap().is_allowed());
    assert!(limiter.check_message(&peer2).unwrap().is_allowed());
    assert!(!limiter.check_message(&peer2).unwrap().is_allowed());
}
|
||||
|
||||
/// A default config serialized to TOML must contain the major sections
/// and parse back to equivalent values.
// Converted from `#[tokio::test]`: no `.await` in the body.
#[test]
fn config_toml_roundtrip() {
    let config = MeshConfig::default();
    let toml = config.to_toml().expect("serialize");

    // The serialized form names the key configuration sections.
    assert!(toml.contains("announce"));
    assert!(toml.contains("routing"));
    assert!(toml.contains("rate_limit"));

    // Parsing it back preserves values (max_hops used as a spot check;
    // NOTE(review): asserting full equality would need PartialEq on
    // MeshConfig — worth adding).
    let restored = MeshConfig::from_toml(&toml).expect("parse");
    assert_eq!(config.announce.max_hops, restored.announce.max_hops);
}
|
||||
Reference in New Issue
Block a user