16 integration tests covering: - Rate limiting per-peer isolation - Store-and-forward for offline peers - Message deduplication - Envelope V2 signatures, forwarding, broadcast - Metrics tracking and snapshots - Config validation and TOML roundtrip - Shutdown coordination with task tracking - Concurrent store access safety - GC of expired messages Total tests: 205 (189 lib + 16 integration)
415 lines
11 KiB
Rust
415 lines
11 KiB
Rust
//! Multi-node integration tests for mesh networking.
//!
//! These tests verify the behavior of multiple mesh nodes communicating
//! via TCP transport. They cover routing, store-and-forward, and failure
//! scenarios.
|
|
|
|
use std::sync::Arc;
|
|
use std::time::Duration;
|
|
|
|
use quicprochat_p2p::address::MeshAddress;
|
|
use quicprochat_p2p::config::{MeshConfig, RateLimitConfig};
|
|
use quicprochat_p2p::envelope::MeshEnvelope;
|
|
use quicprochat_p2p::envelope_v2::{MeshEnvelopeV2, Priority};
|
|
use quicprochat_p2p::identity::MeshIdentity;
|
|
use quicprochat_p2p::metrics::MeshMetrics;
|
|
use quicprochat_p2p::rate_limit::RateLimiter;
|
|
use quicprochat_p2p::store::MeshStore;
|
|
use quicprochat_p2p::shutdown::{ShutdownCoordinator, ShutdownSignal};
|
|
|
|
#[tokio::test]
|
|
async fn rate_limiting_blocks_excessive_traffic() {
|
|
let config = RateLimitConfig {
|
|
message_per_peer_per_min: 5,
|
|
..Default::default()
|
|
};
|
|
let limiter = RateLimiter::new(config);
|
|
|
|
let peer = MeshAddress::from_bytes([0xAB; 16]);
|
|
|
|
// First 5 should be allowed
|
|
for _ in 0..5 {
|
|
let result = limiter.check_message(&peer).unwrap();
|
|
assert!(result.is_allowed());
|
|
}
|
|
|
|
// 6th should be denied
|
|
let result = limiter.check_message(&peer).unwrap();
|
|
assert!(!result.is_allowed());
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn store_and_forward_for_offline_peer() {
|
|
let mut store = MeshStore::new(100);
|
|
|
|
let identity = MeshIdentity::generate();
|
|
let recipient_key = identity.public_key();
|
|
|
|
// Create an envelope for the recipient
|
|
let sender = MeshIdentity::generate();
|
|
let envelope = MeshEnvelope::new(
|
|
&sender,
|
|
&recipient_key,
|
|
b"message for offline peer".to_vec(),
|
|
3600,
|
|
5,
|
|
);
|
|
|
|
// Store message
|
|
assert!(store.store(envelope.clone()));
|
|
|
|
// Verify it's in the store
|
|
let messages = store.peek(&recipient_key);
|
|
assert_eq!(messages.len(), 1);
|
|
assert_eq!(messages[0].payload, b"message for offline peer");
|
|
|
|
// Fetch (consume) messages
|
|
let fetched = store.fetch(&recipient_key);
|
|
assert_eq!(fetched.len(), 1);
|
|
|
|
// Should be empty now
|
|
let remaining = store.peek(&recipient_key);
|
|
assert!(remaining.is_empty());
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn message_deduplication() {
|
|
let mut store = MeshStore::new(100);
|
|
|
|
let sender = MeshIdentity::generate();
|
|
let recipient = MeshIdentity::generate();
|
|
|
|
let envelope = MeshEnvelope::new(
|
|
&sender,
|
|
&recipient.public_key(),
|
|
b"test payload".to_vec(),
|
|
3600,
|
|
5,
|
|
);
|
|
|
|
// First store should succeed
|
|
assert!(store.store(envelope.clone()));
|
|
|
|
// Same envelope (same ID) should be rejected
|
|
assert!(!store.store(envelope.clone()));
|
|
|
|
// Only one message should be stored
|
|
let messages = store.peek(&recipient.public_key());
|
|
assert_eq!(messages.len(), 1);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn envelope_v2_signature_verification() {
|
|
let identity = MeshIdentity::generate();
|
|
let recipient = MeshAddress::from_bytes([0xEE; 16]);
|
|
|
|
let envelope = MeshEnvelopeV2::new(
|
|
&identity,
|
|
recipient,
|
|
b"test payload".to_vec(),
|
|
3600,
|
|
5,
|
|
Priority::Normal,
|
|
);
|
|
|
|
// Verify with correct key
|
|
let pk = identity.public_key();
|
|
assert!(envelope.verify_with_key(&pk));
|
|
|
|
// Verify with wrong key should fail
|
|
let other_identity = MeshIdentity::generate();
|
|
let other_pk = other_identity.public_key();
|
|
assert!(!envelope.verify_with_key(&other_pk));
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn envelope_v2_forwarding() {
|
|
let sender = MeshIdentity::generate();
|
|
let recipient = MeshAddress::from_bytes([0xAA; 16]);
|
|
|
|
let envelope = MeshEnvelopeV2::new(
|
|
&sender,
|
|
recipient,
|
|
b"forward me".to_vec(),
|
|
3600,
|
|
3, // max 3 hops
|
|
Priority::Normal,
|
|
);
|
|
|
|
assert_eq!(envelope.hop_count, 0);
|
|
assert!(envelope.can_forward());
|
|
|
|
// Forward once
|
|
let fwd1 = envelope.forwarded();
|
|
assert_eq!(fwd1.hop_count, 1);
|
|
assert!(fwd1.can_forward());
|
|
|
|
// Forward twice
|
|
let fwd2 = fwd1.forwarded();
|
|
assert_eq!(fwd2.hop_count, 2);
|
|
assert!(fwd2.can_forward());
|
|
|
|
// Forward thrice - should hit max
|
|
let fwd3 = fwd2.forwarded();
|
|
assert_eq!(fwd3.hop_count, 3);
|
|
assert!(!fwd3.can_forward()); // max_hops reached
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn envelope_v2_broadcast() {
|
|
let sender = MeshIdentity::generate();
|
|
|
|
let envelope = MeshEnvelopeV2::broadcast(
|
|
&sender,
|
|
b"broadcast message".to_vec(),
|
|
3600,
|
|
5,
|
|
Priority::High,
|
|
);
|
|
|
|
assert!(envelope.is_broadcast());
|
|
assert_eq!(envelope.recipient_addr, MeshAddress::BROADCAST);
|
|
assert_eq!(envelope.priority(), Priority::High);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn metrics_tracking() {
|
|
let metrics = MeshMetrics::new();
|
|
|
|
// Transport metrics
|
|
let tcp_metrics = metrics.transport("tcp");
|
|
tcp_metrics.sent.inc_by(10);
|
|
tcp_metrics.bytes_sent.inc_by(1024);
|
|
|
|
assert_eq!(metrics.transport("tcp").sent.get(), 10);
|
|
assert_eq!(metrics.transport("tcp").bytes_sent.get(), 1024);
|
|
|
|
// Routing metrics
|
|
metrics.routing.lookups.inc_by(100);
|
|
metrics.routing.lookup_misses.inc_by(5);
|
|
|
|
// Snapshot
|
|
let snapshot = metrics.snapshot();
|
|
assert!(snapshot.uptime_secs < 2); // Just started
|
|
assert_eq!(snapshot.routing.lookups, 100);
|
|
assert_eq!(snapshot.routing.lookup_misses, 5);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn config_validation() {
|
|
// Valid config
|
|
let config = MeshConfig::default();
|
|
assert!(config.validate().is_ok());
|
|
|
|
// Invalid announce interval
|
|
let mut bad_config = MeshConfig::default();
|
|
bad_config.announce.interval = Duration::from_secs(1); // Too short
|
|
assert!(bad_config.validate().is_err());
|
|
|
|
// Invalid duty cycle
|
|
let mut bad_config = MeshConfig::default();
|
|
bad_config.rate_limit.lora_duty_cycle = 2.0; // > 1.0
|
|
assert!(bad_config.validate().is_err());
|
|
|
|
// Constrained config should be valid
|
|
let constrained = MeshConfig::constrained();
|
|
assert!(constrained.validate().is_ok());
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn shutdown_coordination() {
|
|
let coordinator = Arc::new(ShutdownCoordinator::with_timeouts(
|
|
Duration::from_millis(100),
|
|
Duration::from_millis(50),
|
|
));
|
|
|
|
let coord_clone = Arc::clone(&coordinator);
|
|
|
|
// Spawn a task that registers itself
|
|
let handle = tokio::spawn(async move {
|
|
let _guard = coord_clone.register_task();
|
|
tokio::time::sleep(Duration::from_millis(50)).await;
|
|
// guard dropped here, task complete
|
|
});
|
|
|
|
// Start shutdown
|
|
coordinator.shutdown().await;
|
|
|
|
// Task should have completed
|
|
handle.await.unwrap();
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn shutdown_signal_propagation() {
|
|
let (trigger, mut signal) = ShutdownSignal::new();
|
|
|
|
assert!(!signal.is_triggered());
|
|
|
|
let handle = tokio::spawn(async move {
|
|
signal.wait().await;
|
|
true
|
|
});
|
|
|
|
// Small delay to ensure task is waiting
|
|
tokio::time::sleep(Duration::from_millis(10)).await;
|
|
|
|
trigger.trigger();
|
|
let result = handle.await.unwrap();
|
|
assert!(result);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn concurrent_store_access() {
|
|
let store = Arc::new(std::sync::RwLock::new(MeshStore::new(1000)));
|
|
|
|
let recipient = MeshIdentity::generate();
|
|
let recipient_key = recipient.public_key();
|
|
|
|
// Spawn multiple writers
|
|
let mut handles = Vec::new();
|
|
for i in 0..10 {
|
|
let store_clone = Arc::clone(&store);
|
|
let rk = recipient_key.clone();
|
|
let handle = tokio::spawn(async move {
|
|
for j in 0..10 {
|
|
let sender = MeshIdentity::generate();
|
|
let envelope = MeshEnvelope::new(
|
|
&sender,
|
|
&rk,
|
|
format!("msg-{}-{}", i, j).into_bytes(),
|
|
3600,
|
|
5,
|
|
);
|
|
let mut s = store_clone.write().unwrap();
|
|
s.store(envelope);
|
|
}
|
|
});
|
|
handles.push(handle);
|
|
}
|
|
|
|
// Wait for all writers
|
|
for handle in handles {
|
|
handle.await.unwrap();
|
|
}
|
|
|
|
// Should have 100 messages
|
|
let s = store.read().unwrap();
|
|
let messages = s.peek(&recipient_key);
|
|
assert_eq!(messages.len(), 100);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn store_gc_removes_expired() {
|
|
let mut store = MeshStore::new(100);
|
|
|
|
let sender = MeshIdentity::generate();
|
|
let recipient = MeshIdentity::generate();
|
|
|
|
// Store with very short TTL
|
|
let envelope = MeshEnvelope::new(
|
|
&sender,
|
|
&recipient.public_key(),
|
|
b"short-lived".to_vec(),
|
|
1, // 1 second TTL
|
|
5,
|
|
);
|
|
store.store(envelope);
|
|
|
|
// Verify it's stored
|
|
let before = store.peek(&recipient.public_key());
|
|
assert_eq!(before.len(), 1);
|
|
|
|
// Wait for expiry
|
|
tokio::time::sleep(Duration::from_secs(2)).await;
|
|
|
|
// Run GC
|
|
let removed = store.gc_expired();
|
|
assert_eq!(removed, 1);
|
|
|
|
// Should be empty now
|
|
let messages = store.peek(&recipient.public_key());
|
|
assert!(messages.is_empty());
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn mesh_address_derivation() {
|
|
let identity = MeshIdentity::generate();
|
|
let pk = identity.public_key();
|
|
|
|
let addr1 = MeshAddress::from_public_key(&pk);
|
|
let addr2 = MeshAddress::from_public_key(&pk);
|
|
|
|
// Same key -> same address
|
|
assert_eq!(addr1, addr2);
|
|
|
|
// Address matches its key
|
|
assert!(addr1.matches_key(&pk));
|
|
|
|
// Different key -> different address
|
|
let other = MeshIdentity::generate();
|
|
assert!(!addr1.matches_key(&other.public_key()));
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn envelope_v2_wire_roundtrip() {
|
|
let sender = MeshIdentity::generate();
|
|
let recipient = MeshAddress::from_bytes([0xBB; 16]);
|
|
|
|
let envelope = MeshEnvelopeV2::new(
|
|
&sender,
|
|
recipient,
|
|
b"roundtrip test".to_vec(),
|
|
3600,
|
|
5,
|
|
Priority::High,
|
|
);
|
|
|
|
// Serialize
|
|
let wire = envelope.to_wire();
|
|
|
|
// Deserialize
|
|
let restored = MeshEnvelopeV2::from_wire(&wire).expect("deserialize failed");
|
|
|
|
assert_eq!(restored.payload, b"roundtrip test");
|
|
assert_eq!(restored.recipient_addr, recipient);
|
|
assert_eq!(restored.priority(), Priority::High);
|
|
assert!(restored.verify_with_key(&sender.public_key()));
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn rate_limiter_per_peer_isolation() {
|
|
let config = RateLimitConfig {
|
|
message_per_peer_per_min: 2,
|
|
..Default::default()
|
|
};
|
|
let limiter = RateLimiter::new(config);
|
|
|
|
let peer1 = MeshAddress::from_bytes([1; 16]);
|
|
let peer2 = MeshAddress::from_bytes([2; 16]);
|
|
|
|
// Use up peer1's allowance
|
|
assert!(limiter.check_message(&peer1).unwrap().is_allowed());
|
|
assert!(limiter.check_message(&peer1).unwrap().is_allowed());
|
|
assert!(!limiter.check_message(&peer1).unwrap().is_allowed());
|
|
|
|
// peer2 should still have its allowance
|
|
assert!(limiter.check_message(&peer2).unwrap().is_allowed());
|
|
assert!(limiter.check_message(&peer2).unwrap().is_allowed());
|
|
assert!(!limiter.check_message(&peer2).unwrap().is_allowed());
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn config_toml_roundtrip() {
|
|
let config = MeshConfig::default();
|
|
let toml = config.to_toml().expect("serialize");
|
|
|
|
// Should contain key config values
|
|
assert!(toml.contains("announce"));
|
|
assert!(toml.contains("routing"));
|
|
assert!(toml.contains("rate_limit"));
|
|
|
|
// Should parse back
|
|
let restored = MeshConfig::from_toml(&toml).expect("parse");
|
|
assert_eq!(config.announce.max_hops, restored.announce.max_hops);
|
|
}
|