//! Multi-node integration tests for mesh networking. //! //! These tests verify the behavior of multiple mesh nodes communicating //! via TCP transport. They cover routing, store-and-forward, and failure //! scenarios. use std::sync::Arc; use std::time::Duration; use quicprochat_p2p::address::MeshAddress; use quicprochat_p2p::config::{MeshConfig, RateLimitConfig}; use quicprochat_p2p::envelope::MeshEnvelope; use quicprochat_p2p::envelope_v2::{MeshEnvelopeV2, Priority}; use quicprochat_p2p::identity::MeshIdentity; use quicprochat_p2p::metrics::MeshMetrics; use quicprochat_p2p::rate_limit::RateLimiter; use quicprochat_p2p::store::MeshStore; use quicprochat_p2p::shutdown::{ShutdownCoordinator, ShutdownSignal}; #[tokio::test] async fn rate_limiting_blocks_excessive_traffic() { let config = RateLimitConfig { message_per_peer_per_min: 5, ..Default::default() }; let limiter = RateLimiter::new(config); let peer = MeshAddress::from_bytes([0xAB; 16]); // First 5 should be allowed for _ in 0..5 { let result = limiter.check_message(&peer).unwrap(); assert!(result.is_allowed()); } // 6th should be denied let result = limiter.check_message(&peer).unwrap(); assert!(!result.is_allowed()); } #[tokio::test] async fn store_and_forward_for_offline_peer() { let mut store = MeshStore::new(100); let identity = MeshIdentity::generate(); let recipient_key = identity.public_key(); // Create an envelope for the recipient let sender = MeshIdentity::generate(); let envelope = MeshEnvelope::new( &sender, &recipient_key, b"message for offline peer".to_vec(), 3600, 5, ); // Store message assert!(store.store(envelope.clone())); // Verify it's in the store let messages = store.peek(&recipient_key); assert_eq!(messages.len(), 1); assert_eq!(messages[0].payload, b"message for offline peer"); // Fetch (consume) messages let fetched = store.fetch(&recipient_key); assert_eq!(fetched.len(), 1); // Should be empty now let remaining = store.peek(&recipient_key); assert!(remaining.is_empty()); } #[tokio::test] async fn message_deduplication() { let mut store = MeshStore::new(100); let sender = MeshIdentity::generate(); let recipient = MeshIdentity::generate(); let envelope = MeshEnvelope::new( &sender, &recipient.public_key(), b"test payload".to_vec(), 3600, 5, ); // First store should succeed assert!(store.store(envelope.clone())); // Same envelope (same ID) should be rejected assert!(!store.store(envelope.clone())); // Only one message should be stored let messages = store.peek(&recipient.public_key()); assert_eq!(messages.len(), 1); } #[tokio::test] async fn envelope_v2_signature_verification() { let identity = MeshIdentity::generate(); let recipient = MeshAddress::from_bytes([0xEE; 16]); let envelope = MeshEnvelopeV2::new( &identity, recipient, b"test payload".to_vec(), 3600, 5, Priority::Normal, ); // Verify with correct key let pk = identity.public_key(); assert!(envelope.verify_with_key(&pk)); // Verify with wrong key should fail let other_identity = MeshIdentity::generate(); let other_pk = other_identity.public_key(); assert!(!envelope.verify_with_key(&other_pk)); } #[tokio::test] async fn envelope_v2_forwarding() { let sender = MeshIdentity::generate(); let recipient = MeshAddress::from_bytes([0xAA; 16]); let envelope = MeshEnvelopeV2::new( &sender, recipient, b"forward me".to_vec(), 3600, 3, // max 3 hops Priority::Normal, ); assert_eq!(envelope.hop_count, 0); assert!(envelope.can_forward()); // Forward once let fwd1 = envelope.forwarded(); assert_eq!(fwd1.hop_count, 1); assert!(fwd1.can_forward()); // Forward twice let fwd2 = fwd1.forwarded(); assert_eq!(fwd2.hop_count, 2); assert!(fwd2.can_forward()); // Forward thrice - should hit max let fwd3 = fwd2.forwarded(); assert_eq!(fwd3.hop_count, 3); assert!(!fwd3.can_forward()); // max_hops reached } #[tokio::test] async fn envelope_v2_broadcast() { let sender = MeshIdentity::generate(); let envelope = MeshEnvelopeV2::broadcast( &sender, b"broadcast message".to_vec(), 3600, 5, Priority::High, ); assert!(envelope.is_broadcast()); assert_eq!(envelope.recipient_addr, MeshAddress::BROADCAST); assert_eq!(envelope.priority(), Priority::High); } #[tokio::test] async fn metrics_tracking() { let metrics = MeshMetrics::new(); // Transport metrics let tcp_metrics = metrics.transport("tcp"); tcp_metrics.sent.inc_by(10); tcp_metrics.bytes_sent.inc_by(1024); assert_eq!(metrics.transport("tcp").sent.get(), 10); assert_eq!(metrics.transport("tcp").bytes_sent.get(), 1024); // Routing metrics metrics.routing.lookups.inc_by(100); metrics.routing.lookup_misses.inc_by(5); // Snapshot let snapshot = metrics.snapshot(); assert!(snapshot.uptime_secs < 2); // Just started assert_eq!(snapshot.routing.lookups, 100); assert_eq!(snapshot.routing.lookup_misses, 5); } #[tokio::test] async fn config_validation() { // Valid config let config = MeshConfig::default(); assert!(config.validate().is_ok()); // Invalid announce interval let mut bad_config = MeshConfig::default(); bad_config.announce.interval = Duration::from_secs(1); // Too short assert!(bad_config.validate().is_err()); // Invalid duty cycle let mut bad_config = MeshConfig::default(); bad_config.rate_limit.lora_duty_cycle = 2.0; // > 1.0 assert!(bad_config.validate().is_err()); // Constrained config should be valid let constrained = MeshConfig::constrained(); assert!(constrained.validate().is_ok()); } #[tokio::test] async fn shutdown_coordination() { let coordinator = Arc::new(ShutdownCoordinator::with_timeouts( Duration::from_millis(100), Duration::from_millis(50), )); let coord_clone = Arc::clone(&coordinator); // Spawn a task that registers itself let handle = tokio::spawn(async move { let _guard = coord_clone.register_task(); tokio::time::sleep(Duration::from_millis(50)).await; // guard dropped here, task complete }); // Start shutdown coordinator.shutdown().await; // Task should have completed handle.await.unwrap(); } #[tokio::test] async fn shutdown_signal_propagation() { let (trigger, mut signal) = ShutdownSignal::new(); assert!(!signal.is_triggered()); let handle = tokio::spawn(async move { signal.wait().await; true }); // Small delay to ensure task is waiting tokio::time::sleep(Duration::from_millis(10)).await; trigger.trigger(); let result = handle.await.unwrap(); assert!(result); } #[tokio::test] async fn concurrent_store_access() { let store = Arc::new(std::sync::RwLock::new(MeshStore::new(1000))); let recipient = MeshIdentity::generate(); let recipient_key = recipient.public_key(); // Spawn multiple writers let mut handles = Vec::new(); for i in 0..10 { let store_clone = Arc::clone(&store); let rk = recipient_key.clone(); let handle = tokio::spawn(async move { for j in 0..10 { let sender = MeshIdentity::generate(); let envelope = MeshEnvelope::new( &sender, &rk, format!("msg-{}-{}", i, j).into_bytes(), 3600, 5, ); let mut s = store_clone.write().unwrap(); s.store(envelope); } }); handles.push(handle); } // Wait for all writers for handle in handles { handle.await.unwrap(); } // Should have 100 messages let s = store.read().unwrap(); let messages = s.peek(&recipient_key); assert_eq!(messages.len(), 100); } #[tokio::test] async fn store_gc_removes_expired() { let mut store = MeshStore::new(100); let sender = MeshIdentity::generate(); let recipient = MeshIdentity::generate(); // Store with very short TTL let envelope = MeshEnvelope::new( &sender, &recipient.public_key(), b"short-lived".to_vec(), 1, // 1 second TTL 5, ); store.store(envelope); // Verify it's stored let before = store.peek(&recipient.public_key()); assert_eq!(before.len(), 1); // Wait for expiry tokio::time::sleep(Duration::from_secs(2)).await; // Run GC let removed = store.gc_expired(); assert_eq!(removed, 1); // Should be empty now let messages = store.peek(&recipient.public_key()); assert!(messages.is_empty()); } #[tokio::test] async fn mesh_address_derivation() { let identity = MeshIdentity::generate(); let pk = identity.public_key(); let addr1 = MeshAddress::from_public_key(&pk); let addr2 = MeshAddress::from_public_key(&pk); // Same key -> same address assert_eq!(addr1, addr2); // Address matches its key assert!(addr1.matches_key(&pk)); // Different key -> different address let other = MeshIdentity::generate(); assert!(!addr1.matches_key(&other.public_key())); } #[tokio::test] async fn envelope_v2_wire_roundtrip() { let sender = MeshIdentity::generate(); let recipient = MeshAddress::from_bytes([0xBB; 16]); let envelope = MeshEnvelopeV2::new( &sender, recipient, b"roundtrip test".to_vec(), 3600, 5, Priority::High, ); // Serialize let wire = envelope.to_wire(); // Deserialize let restored = MeshEnvelopeV2::from_wire(&wire).expect("deserialize failed"); assert_eq!(restored.payload, b"roundtrip test"); assert_eq!(restored.recipient_addr, recipient); assert_eq!(restored.priority(), Priority::High); assert!(restored.verify_with_key(&sender.public_key())); } #[tokio::test] async fn rate_limiter_per_peer_isolation() { let config = RateLimitConfig { message_per_peer_per_min: 2, ..Default::default() }; let limiter = RateLimiter::new(config); let peer1 = MeshAddress::from_bytes([1; 16]); let peer2 = MeshAddress::from_bytes([2; 16]); // Use up peer1's allowance assert!(limiter.check_message(&peer1).unwrap().is_allowed()); assert!(limiter.check_message(&peer1).unwrap().is_allowed()); assert!(!limiter.check_message(&peer1).unwrap().is_allowed()); // peer2 should still have its allowance assert!(limiter.check_message(&peer2).unwrap().is_allowed()); assert!(limiter.check_message(&peer2).unwrap().is_allowed()); assert!(!limiter.check_message(&peer2).unwrap().is_allowed()); } #[tokio::test] async fn config_toml_roundtrip() { let config = MeshConfig::default(); let toml = config.to_toml().expect("serialize"); // Should contain key config values assert!(toml.contains("announce")); assert!(toml.contains("routing")); assert!(toml.contains("rate_limit")); // Should parse back let restored = MeshConfig::from_toml(&toml).expect("parse"); assert_eq!(config.announce.max_hops, restored.announce.max_hops); }