Files
Christian Nennemann 50a63a6b96 feat(p2p): add integration tests for production scenarios
16 integration tests covering:
- Rate limiting per-peer isolation
- Store-and-forward for offline peers
- Message deduplication
- Envelope V2 signatures, forwarding, broadcast
- Metrics tracking and snapshots
- Config validation and TOML roundtrip
- Shutdown coordination with task tracking
- Concurrent store access safety
- GC of expired messages

Total tests: 205 (189 lib + 16 integration)
2026-04-01 09:21:32 +02:00

415 lines
11 KiB
Rust

//! Multi-node integration tests for mesh networking.
//!
//! These tests verify the behavior of multiple mesh nodes communicating
//! via TCP transport. They cover routing, store-and-forward, and failure
//! scenarios.
use std::sync::Arc;
use std::time::Duration;
use quicprochat_p2p::address::MeshAddress;
use quicprochat_p2p::config::{MeshConfig, RateLimitConfig};
use quicprochat_p2p::envelope::MeshEnvelope;
use quicprochat_p2p::envelope_v2::{MeshEnvelopeV2, Priority};
use quicprochat_p2p::identity::MeshIdentity;
use quicprochat_p2p::metrics::MeshMetrics;
use quicprochat_p2p::rate_limit::RateLimiter;
use quicprochat_p2p::store::MeshStore;
use quicprochat_p2p::shutdown::{ShutdownCoordinator, ShutdownSignal};
#[tokio::test]
async fn rate_limiting_blocks_excessive_traffic() {
let config = RateLimitConfig {
message_per_peer_per_min: 5,
..Default::default()
};
let limiter = RateLimiter::new(config);
let peer = MeshAddress::from_bytes([0xAB; 16]);
// First 5 should be allowed
for _ in 0..5 {
let result = limiter.check_message(&peer).unwrap();
assert!(result.is_allowed());
}
// 6th should be denied
let result = limiter.check_message(&peer).unwrap();
assert!(!result.is_allowed());
}
#[tokio::test]
async fn store_and_forward_for_offline_peer() {
let mut store = MeshStore::new(100);
let identity = MeshIdentity::generate();
let recipient_key = identity.public_key();
// Create an envelope for the recipient
let sender = MeshIdentity::generate();
let envelope = MeshEnvelope::new(
&sender,
&recipient_key,
b"message for offline peer".to_vec(),
3600,
5,
);
// Store message
assert!(store.store(envelope.clone()));
// Verify it's in the store
let messages = store.peek(&recipient_key);
assert_eq!(messages.len(), 1);
assert_eq!(messages[0].payload, b"message for offline peer");
// Fetch (consume) messages
let fetched = store.fetch(&recipient_key);
assert_eq!(fetched.len(), 1);
// Should be empty now
let remaining = store.peek(&recipient_key);
assert!(remaining.is_empty());
}
#[tokio::test]
async fn message_deduplication() {
let mut store = MeshStore::new(100);
let sender = MeshIdentity::generate();
let recipient = MeshIdentity::generate();
let envelope = MeshEnvelope::new(
&sender,
&recipient.public_key(),
b"test payload".to_vec(),
3600,
5,
);
// First store should succeed
assert!(store.store(envelope.clone()));
// Same envelope (same ID) should be rejected
assert!(!store.store(envelope.clone()));
// Only one message should be stored
let messages = store.peek(&recipient.public_key());
assert_eq!(messages.len(), 1);
}
#[tokio::test]
async fn envelope_v2_signature_verification() {
let identity = MeshIdentity::generate();
let recipient = MeshAddress::from_bytes([0xEE; 16]);
let envelope = MeshEnvelopeV2::new(
&identity,
recipient,
b"test payload".to_vec(),
3600,
5,
Priority::Normal,
);
// Verify with correct key
let pk = identity.public_key();
assert!(envelope.verify_with_key(&pk));
// Verify with wrong key should fail
let other_identity = MeshIdentity::generate();
let other_pk = other_identity.public_key();
assert!(!envelope.verify_with_key(&other_pk));
}
#[tokio::test]
async fn envelope_v2_forwarding() {
let sender = MeshIdentity::generate();
let recipient = MeshAddress::from_bytes([0xAA; 16]);
let envelope = MeshEnvelopeV2::new(
&sender,
recipient,
b"forward me".to_vec(),
3600,
3, // max 3 hops
Priority::Normal,
);
assert_eq!(envelope.hop_count, 0);
assert!(envelope.can_forward());
// Forward once
let fwd1 = envelope.forwarded();
assert_eq!(fwd1.hop_count, 1);
assert!(fwd1.can_forward());
// Forward twice
let fwd2 = fwd1.forwarded();
assert_eq!(fwd2.hop_count, 2);
assert!(fwd2.can_forward());
// Forward thrice - should hit max
let fwd3 = fwd2.forwarded();
assert_eq!(fwd3.hop_count, 3);
assert!(!fwd3.can_forward()); // max_hops reached
}
#[tokio::test]
async fn envelope_v2_broadcast() {
let sender = MeshIdentity::generate();
let envelope = MeshEnvelopeV2::broadcast(
&sender,
b"broadcast message".to_vec(),
3600,
5,
Priority::High,
);
assert!(envelope.is_broadcast());
assert_eq!(envelope.recipient_addr, MeshAddress::BROADCAST);
assert_eq!(envelope.priority(), Priority::High);
}
#[tokio::test]
async fn metrics_tracking() {
let metrics = MeshMetrics::new();
// Transport metrics
let tcp_metrics = metrics.transport("tcp");
tcp_metrics.sent.inc_by(10);
tcp_metrics.bytes_sent.inc_by(1024);
assert_eq!(metrics.transport("tcp").sent.get(), 10);
assert_eq!(metrics.transport("tcp").bytes_sent.get(), 1024);
// Routing metrics
metrics.routing.lookups.inc_by(100);
metrics.routing.lookup_misses.inc_by(5);
// Snapshot
let snapshot = metrics.snapshot();
assert!(snapshot.uptime_secs < 2); // Just started
assert_eq!(snapshot.routing.lookups, 100);
assert_eq!(snapshot.routing.lookup_misses, 5);
}
#[tokio::test]
async fn config_validation() {
// Valid config
let config = MeshConfig::default();
assert!(config.validate().is_ok());
// Invalid announce interval
let mut bad_config = MeshConfig::default();
bad_config.announce.interval = Duration::from_secs(1); // Too short
assert!(bad_config.validate().is_err());
// Invalid duty cycle
let mut bad_config = MeshConfig::default();
bad_config.rate_limit.lora_duty_cycle = 2.0; // > 1.0
assert!(bad_config.validate().is_err());
// Constrained config should be valid
let constrained = MeshConfig::constrained();
assert!(constrained.validate().is_ok());
}
#[tokio::test]
async fn shutdown_coordination() {
let coordinator = Arc::new(ShutdownCoordinator::with_timeouts(
Duration::from_millis(100),
Duration::from_millis(50),
));
let coord_clone = Arc::clone(&coordinator);
// Spawn a task that registers itself
let handle = tokio::spawn(async move {
let _guard = coord_clone.register_task();
tokio::time::sleep(Duration::from_millis(50)).await;
// guard dropped here, task complete
});
// Start shutdown
coordinator.shutdown().await;
// Task should have completed
handle.await.unwrap();
}
#[tokio::test]
async fn shutdown_signal_propagation() {
let (trigger, mut signal) = ShutdownSignal::new();
assert!(!signal.is_triggered());
let handle = tokio::spawn(async move {
signal.wait().await;
true
});
// Small delay to ensure task is waiting
tokio::time::sleep(Duration::from_millis(10)).await;
trigger.trigger();
let result = handle.await.unwrap();
assert!(result);
}
#[tokio::test]
async fn concurrent_store_access() {
let store = Arc::new(std::sync::RwLock::new(MeshStore::new(1000)));
let recipient = MeshIdentity::generate();
let recipient_key = recipient.public_key();
// Spawn multiple writers
let mut handles = Vec::new();
for i in 0..10 {
let store_clone = Arc::clone(&store);
let rk = recipient_key.clone();
let handle = tokio::spawn(async move {
for j in 0..10 {
let sender = MeshIdentity::generate();
let envelope = MeshEnvelope::new(
&sender,
&rk,
format!("msg-{}-{}", i, j).into_bytes(),
3600,
5,
);
let mut s = store_clone.write().unwrap();
s.store(envelope);
}
});
handles.push(handle);
}
// Wait for all writers
for handle in handles {
handle.await.unwrap();
}
// Should have 100 messages
let s = store.read().unwrap();
let messages = s.peek(&recipient_key);
assert_eq!(messages.len(), 100);
}
#[tokio::test]
async fn store_gc_removes_expired() {
let mut store = MeshStore::new(100);
let sender = MeshIdentity::generate();
let recipient = MeshIdentity::generate();
// Store with very short TTL
let envelope = MeshEnvelope::new(
&sender,
&recipient.public_key(),
b"short-lived".to_vec(),
1, // 1 second TTL
5,
);
store.store(envelope);
// Verify it's stored
let before = store.peek(&recipient.public_key());
assert_eq!(before.len(), 1);
// Wait for expiry
tokio::time::sleep(Duration::from_secs(2)).await;
// Run GC
let removed = store.gc_expired();
assert_eq!(removed, 1);
// Should be empty now
let messages = store.peek(&recipient.public_key());
assert!(messages.is_empty());
}
#[tokio::test]
async fn mesh_address_derivation() {
let identity = MeshIdentity::generate();
let pk = identity.public_key();
let addr1 = MeshAddress::from_public_key(&pk);
let addr2 = MeshAddress::from_public_key(&pk);
// Same key -> same address
assert_eq!(addr1, addr2);
// Address matches its key
assert!(addr1.matches_key(&pk));
// Different key -> different address
let other = MeshIdentity::generate();
assert!(!addr1.matches_key(&other.public_key()));
}
#[tokio::test]
async fn envelope_v2_wire_roundtrip() {
let sender = MeshIdentity::generate();
let recipient = MeshAddress::from_bytes([0xBB; 16]);
let envelope = MeshEnvelopeV2::new(
&sender,
recipient,
b"roundtrip test".to_vec(),
3600,
5,
Priority::High,
);
// Serialize
let wire = envelope.to_wire();
// Deserialize
let restored = MeshEnvelopeV2::from_wire(&wire).expect("deserialize failed");
assert_eq!(restored.payload, b"roundtrip test");
assert_eq!(restored.recipient_addr, recipient);
assert_eq!(restored.priority(), Priority::High);
assert!(restored.verify_with_key(&sender.public_key()));
}
#[tokio::test]
async fn rate_limiter_per_peer_isolation() {
let config = RateLimitConfig {
message_per_peer_per_min: 2,
..Default::default()
};
let limiter = RateLimiter::new(config);
let peer1 = MeshAddress::from_bytes([1; 16]);
let peer2 = MeshAddress::from_bytes([2; 16]);
// Use up peer1's allowance
assert!(limiter.check_message(&peer1).unwrap().is_allowed());
assert!(limiter.check_message(&peer1).unwrap().is_allowed());
assert!(!limiter.check_message(&peer1).unwrap().is_allowed());
// peer2 should still have its allowance
assert!(limiter.check_message(&peer2).unwrap().is_allowed());
assert!(limiter.check_message(&peer2).unwrap().is_allowed());
assert!(!limiter.check_message(&peer2).unwrap().is_allowed());
}
#[tokio::test]
async fn config_toml_roundtrip() {
let config = MeshConfig::default();
let toml = config.to_toml().expect("serialize");
// Should contain key config values
assert!(toml.contains("announce"));
assert!(toml.contains("routing"));
assert!(toml.contains("rate_limit"));
// Should parse back
let restored = MeshConfig::from_toml(&toml).expect("parse");
assert_eq!(config.announce.max_hops, restored.announce.max_hops);
}