From 95ce8898fd4a08eea3e8a9f110a311d36767a6dc Mon Sep 17 00:00:00 2001 From: Christian Nennemann Date: Mon, 6 Apr 2026 21:43:28 +0200 Subject: [PATCH] feat: add mesh network visualizer - D3.js force-directed graph for real-time mesh visualization - WebSocket server (mesh-viz-bridge crate) for live updates - Demo mode with simulated topology - JSONL file upload for offline analysis - Optional viz logging in mesh_node forwarding --- Cargo.lock | 12 + Cargo.toml | 2 + crates/quicprochat-p2p/src/lib.rs | 1 + crates/quicprochat-p2p/src/mesh_node.rs | 8 +- crates/quicprochat-p2p/src/viz_log.rs | 45 +++ viz/bridge/Cargo.toml | 14 + viz/bridge/src/main.rs | 250 ++++++++++++ viz/mesh-graph.html | 493 ++++++++++++++++++++++++ viz/sample-feed.jsonl | 7 + 9 files changed, 831 insertions(+), 1 deletion(-) create mode 100644 crates/quicprochat-p2p/src/viz_log.rs create mode 100644 viz/bridge/Cargo.toml create mode 100644 viz/bridge/src/main.rs create mode 100644 viz/mesh-graph.html create mode 100644 viz/sample-feed.jsonl diff --git a/Cargo.lock b/Cargo.lock index 974f165..d4812de 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3202,6 +3202,18 @@ version = "2.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" +[[package]] +name = "mesh-viz-bridge" +version = "0.1.0" +dependencies = [ + "anyhow", + "clap", + "futures-util", + "serde_json", + "tokio", + "tokio-tungstenite", +] + [[package]] name = "meshservice" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 987a330..e72134e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,8 @@ members = [ "crates/quicprochat-p2p", # Generic decentralized service layer (FAPP, Housing, etc.) "crates/meshservice", + # WebSocket bridge for viz/mesh-graph.html (tails NDJSON → browsers) + "viz/bridge", ] [workspace.package] diff --git a/crates/quicprochat-p2p/src/lib.rs b/crates/quicprochat-p2p/src/lib.rs index 899a9f9..b36ede9 100644 --- a/crates/quicprochat-p2p/src/lib.rs +++ b/crates/quicprochat-p2p/src/lib.rs @@ -42,6 +42,7 @@ pub mod transport_iroh; pub mod transport_manager; pub mod transport_tcp; pub mod transport_lora; +pub mod viz_log; #[cfg(feature = "traffic-resistance")] pub mod traffic_resistance; diff --git a/crates/quicprochat-p2p/src/mesh_node.rs b/crates/quicprochat-p2p/src/mesh_node.rs index 8dc78d9..9d7c950 100644 --- a/crates/quicprochat-p2p/src/mesh_node.rs +++ b/crates/quicprochat-p2p/src/mesh_node.rs @@ -352,8 +352,14 @@ impl MeshNode { IncomingAction::Deliver(_) => { self.metrics.store.messages_delivered.inc(); } - IncomingAction::Forward { .. } => { + IncomingAction::Forward { + envelope: _, + next_hop, + } => { self.metrics.routing.announcements_forwarded.inc(); + let from = format!("{sender}"); + let to = next_hop.to_string(); + crate::viz_log::log_forward_hop(&from, &to, 0); } IncomingAction::Store(_) => { self.metrics.store.messages_stored.inc(); diff --git a/crates/quicprochat-p2p/src/viz_log.rs b/crates/quicprochat-p2p/src/viz_log.rs new file mode 100644 index 0000000..8b459d6 --- /dev/null +++ b/crates/quicprochat-p2p/src/viz_log.rs @@ -0,0 +1,45 @@ +//! Optional NDJSON events for the mesh graph visualizer (`viz/mesh-graph.html`). +//! +//! When the environment variable `QPC_MESH_VIZ_LOG` is set to a file path, one JSON object +//! per line is appended for selected mesh events. The `viz/bridge` binary can tail this file +//! and forward lines to the browser over WebSocket. + +use serde::Serialize; + +#[derive(Serialize)] +struct HopEvent<'a> { + #[serde(rename = "type")] + kind: &'static str, + from: &'a str, + to: &'a str, + ms: u64, +} + +/// Log a relay hop (forwarding to `next_hop`). No-op unless `QPC_MESH_VIZ_LOG` is set. +pub fn log_forward_hop(from_sender: &str, next_hop: &str, latency_ms: u64) { + let Ok(path) = std::env::var("QPC_MESH_VIZ_LOG") else { + return; + }; + let ev = HopEvent { + kind: "hop", + from: from_sender, + to: next_hop, + ms: latency_ms, + }; + let Ok(line) = serde_json::to_string(&ev) else { + return; + }; + append_line(&path, &line); +} + +fn append_line(path: &str, line: &str) { + use std::io::Write; + let Ok(mut f) = std::fs::OpenOptions::new() + .create(true) + .append(true) + .open(path) + else { + return; + }; + let _ = writeln!(f, "{line}"); +} diff --git a/viz/bridge/Cargo.toml b/viz/bridge/Cargo.toml new file mode 100644 index 0000000..8018e65 --- /dev/null +++ b/viz/bridge/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "mesh-viz-bridge" +version = "0.1.0" +edition = "2021" +description = "WebSocket bridge: tails NDJSON mesh viz events to browser clients" +license = "Apache-2.0 OR MIT" + +[dependencies] +anyhow = "1" +clap = { version = "4", features = ["derive"] } +futures-util = "0.3" +serde_json = "1" +tokio = { version = "1", features = ["macros", "rt-multi-thread", "signal", "time", "fs", "io-util", "net", "sync"] } +tokio-tungstenite = "0.26" diff --git a/viz/bridge/src/main.rs b/viz/bridge/src/main.rs new file mode 100644 index 0000000..b68ab6d --- /dev/null +++ b/viz/bridge/src/main.rs @@ -0,0 +1,250 @@ +//! Broadcasts newline-delimited JSON mesh events to all connected WebSocket clients. +//! +//! Sources: +//! - `--demo`: synthetic topology + hops (no file needed) +//! - `--file`: poll a JSONL file for appended lines (e.g. written by `QPC_MESH_VIZ_LOG`) + +use std::collections::HashSet; +use std::path::PathBuf; +use std::sync::Arc; + +use clap::Parser; +use futures_util::{SinkExt, StreamExt}; +use tokio::net::{TcpListener, TcpStream}; +use tokio::sync::broadcast; +use tokio_tungstenite::tungstenite::Message; + +#[derive(Parser, Debug)] +#[command(name = "mesh-viz-bridge")] +struct Args { + /// Listen address (WebSocket upgrade is raw TCP; use mesh-graph.html connect URL). + #[arg(long, default_value = "127.0.0.1:8765")] + listen: String, + + /// Poll this file for new NDJSON lines (append-only). + #[arg(long)] + file: Option, + + /// Emit synthetic events for UI development. + #[arg(long)] + demo: bool, + + /// Milliseconds between file polls when using `--file`. + #[arg(long, default_value = "250")] + poll_ms: u64, + + /// Milliseconds between demo events. + #[arg(long, default_value = "900")] + demo_interval_ms: u64, +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let args = Args::parse(); + + if args.file.is_some() && args.demo { + eprintln!("Use either --file or --demo, not both. Preferring --file."); + } + + let (tx, _rx) = broadcast::channel::(256); + let tx = Arc::new(tx); + + if args.demo && args.file.is_none() { + let txd = Arc::clone(&tx); + let interval = args.demo_interval_ms; + tokio::spawn(async move { + demo_loop(txd, interval).await; + }); + } else if let Some(ref path) = args.file { + let path = path.clone(); + let txf = Arc::clone(&tx); + let poll = args.poll_ms; + tokio::spawn(async move { + tail_file_loop(path, txf, poll).await; + }); + } else { + eprintln!("No --file or --demo: only WebSocket clients that receive externally pushed data would work."); + eprintln!("Start with: mesh-viz-bridge --demo OR mesh-viz-bridge --file ./mesh-viz-events.jsonl"); + } + + let listener = TcpListener::bind(&args.listen).await?; + eprintln!("mesh-viz-bridge WebSocket listening on ws://{}", args.listen); + + loop { + let (stream, addr) = listener.accept().await?; + let txc = Arc::clone(&tx); + tokio::spawn(async move { + if let Err(e) = handle_client(stream, txc).await { + eprintln!("client {} error: {}", addr, e); + } + }); + } +} + +async fn handle_client(stream: TcpStream, tx: Arc>) -> anyhow::Result<()> { + let ws = tokio_tungstenite::accept_async(stream).await?; + let (mut write, mut read) = ws.split(); + let mut rx = tx.subscribe(); + + loop { + tokio::select! { + msg = read.next() => { + match msg { + Some(Ok(Message::Close(_))) | None => break, + Some(Ok(Message::Ping(p))) => { + let _ = write.send(Message::Pong(p)).await; + } + Some(Err(e)) => return Err(e.into()), + _ => {} + } + } + line = rx.recv() => { + match line { + Ok(s) => write.send(Message::Text(s.into())).await?, + Err(broadcast::error::RecvError::Lagged(_)) => continue, + Err(broadcast::error::RecvError::Closed) => break, + } + } + } + } + Ok(()) +} + +async fn tail_file_loop(path: PathBuf, tx: Arc>, poll_ms: u64) { + let mut offset: u64 = 0; + loop { + match tokio::fs::File::open(&path).await { + Ok(file) => { + use tokio::io::{AsyncReadExt, AsyncSeekExt}; + let mut file = file; + if let Ok(meta) = file.metadata().await { + let len = meta.len(); + if len < offset { + offset = 0; + } + } + if file.seek(std::io::SeekFrom::Start(offset)).await.is_ok() { + let mut buf = Vec::new(); + if file.read_to_end(&mut buf).await.is_ok() { + offset = match file.metadata().await { + Ok(m) => m.len(), + Err(_) => offset + buf.len() as u64, + }; + let text = String::from_utf8_lossy(&buf); + for line in text.lines() { + let line = line.trim(); + if line.is_empty() { + continue; + } + let _ = tx.send(line.to_string()); + } + } + } + } + Err(_) => { + // Wait until file exists + } + } + tokio::time::sleep(std::time::Duration::from_millis(poll_ms)).await; + } +} + +async fn demo_loop(tx: Arc>, interval_ms: u64) { + let nodes = [ + ("n1", "alpha", "active", 12u64), + ("n2", "beta", "active", 18), + ("n3", "gamma", "idle", 45), + ("n4", "delta", "active", 22), + ]; + let mut tick: u64 = 0; + let mut present: HashSet<&'static str> = HashSet::new(); + loop { + // Simulate join/leave + if tick % 14 == 0 { + present.clear(); + present.insert("n1"); + present.insert("n2"); + } else if tick % 14 == 3 { + present.insert("n3"); + } else if tick % 14 == 7 { + present.insert("n4"); + } else if tick % 14 == 10 { + present.remove("n3"); + } else if tick % 14 == 12 { + let _ = tx.send( + serde_json::json!({ + "type": "node_status", + "id": "n2", + "status": "error", + "latency_ms": 999u64 + }) + .to_string(), + ); + } + + if tick % 14 != 12 { + let snap_nodes: Vec<_> = nodes + .iter() + .filter(|(id, _, _, _)| present.contains(id)) + .map(|(id, label, status, lat)| { + serde_json::json!({ + "id": id, + "label": label, + "status": status, + "latency_ms": lat + }) + }) + .collect(); + + let links: Vec<_> = { + let mut v = vec![]; + if present.contains("n1") && present.contains("n2") { + v.push(serde_json::json!({"source": "n1", "target": "n2"})); + } + if present.contains("n2") && present.contains("n3") { + v.push(serde_json::json!({"source": "n2", "target": "n3"})); + } + if present.contains("n3") && present.contains("n4") { + v.push(serde_json::json!({"source": "n3", "target": "n4"})); + } + if present.contains("n2") && present.contains("n4") { + v.push(serde_json::json!({"source": "n2", "target": "n4"})); + } + v + }; + + let _ = tx.send( + serde_json::json!({ + "type": "snapshot", + "nodes": snap_nodes, + "links": links + }) + .to_string(), + ); + } + + // Message hop animation + let hop_pairs = [ + ("n1", "n2"), + ("n2", "n3"), + ("n2", "n4"), + ("n3", "n4"), + ]; + let (a, b) = hop_pairs[(tick as usize) % hop_pairs.len()]; + if present.contains(a) && present.contains(b) { + let ms = 8 + (tick % 40); + let _ = tx.send( + serde_json::json!({ + "type": "hop", + "from": a, + "to": b, + "ms": ms + }) + .to_string(), + ); + } + + tick = tick.wrapping_add(1); + tokio::time::sleep(std::time::Duration::from_millis(interval_ms)).await; + } +} diff --git a/viz/mesh-graph.html b/viz/mesh-graph.html new file mode 100644 index 0000000..b44ceb9 --- /dev/null +++ b/viz/mesh-graph.html @@ -0,0 +1,493 @@ + + + + + + QuicProQuo mesh visualizer + + + + +
+

QuicProQuo mesh

+ disconnected + + + + + +
+
+ +
+
+
+ + + + diff --git a/viz/sample-feed.jsonl b/viz/sample-feed.jsonl new file mode 100644 index 0000000..b0d222d --- /dev/null +++ b/viz/sample-feed.jsonl @@ -0,0 +1,7 @@ +{"type":"snapshot","nodes":[{"id":"relay-a","label":"relay-a","status":"active","latency_ms":14},{"id":"relay-b","label":"relay-b","status":"active","latency_ms":21},{"id":"edge-c","label":"edge-c","status":"idle","latency_ms":48}],"links":[{"source":"relay-a","target":"relay-b"},{"source":"relay-b","target":"edge-c"}]} +{"type":"hop","from":"relay-a","to":"relay-b","ms":18} +{"type":"hop","from":"relay-b","to":"edge-c","ms":33} +{"type":"node_status","id":"edge-c","status":"error","latency_ms":500} +{"type":"node_status","id":"edge-c","status":"idle","latency_ms":55} +{"type":"node_leave","id":"edge-c"} +{"type":"snapshot","nodes":[{"id":"relay-a","label":"relay-a","status":"active","latency_ms":14},{"id":"relay-b","label":"relay-b","status":"active","latency_ms":21}],"links":[{"source":"relay-a","target":"relay-b"}]}