feat: add mesh network visualizer

- D3.js force-directed graph for real-time mesh visualization
- WebSocket server (mesh-viz-bridge crate) for live updates
- Demo mode with simulated topology
- JSONL file upload for offline analysis
- Optional viz logging in mesh_node forwarding
This commit is contained in:
2026-04-06 21:43:28 +02:00
parent 99d36679c8
commit 95ce8898fd
9 changed files with 831 additions and 1 deletions

250
viz/bridge/src/main.rs Normal file
View File

@@ -0,0 +1,250 @@
//! Broadcasts newline-delimited JSON mesh events to all connected WebSocket clients.
//!
//! Sources:
//! - `--demo`: synthetic topology + hops (no file needed)
//! - `--file`: poll a JSONL file for appended lines (e.g. written by `QPC_MESH_VIZ_LOG`)
use std::collections::HashSet;
use std::path::PathBuf;
use std::sync::Arc;
use clap::Parser;
use futures_util::{SinkExt, StreamExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::broadcast;
use tokio_tungstenite::tungstenite::Message;
#[derive(Parser, Debug)]
#[command(name = "mesh-viz-bridge")]
struct Args {
/// Listen address (WebSocket upgrade is raw TCP; use mesh-graph.html connect URL).
#[arg(long, default_value = "127.0.0.1:8765")]
listen: String,
/// Poll this file for new NDJSON lines (append-only).
#[arg(long)]
file: Option<PathBuf>,
/// Emit synthetic events for UI development.
#[arg(long)]
demo: bool,
/// Milliseconds between file polls when using `--file`.
#[arg(long, default_value = "250")]
poll_ms: u64,
/// Milliseconds between demo events.
#[arg(long, default_value = "900")]
demo_interval_ms: u64,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let args = Args::parse();
if args.file.is_some() && args.demo {
eprintln!("Use either --file or --demo, not both. Preferring --file.");
}
let (tx, _rx) = broadcast::channel::<String>(256);
let tx = Arc::new(tx);
if args.demo && args.file.is_none() {
let txd = Arc::clone(&tx);
let interval = args.demo_interval_ms;
tokio::spawn(async move {
demo_loop(txd, interval).await;
});
} else if let Some(ref path) = args.file {
let path = path.clone();
let txf = Arc::clone(&tx);
let poll = args.poll_ms;
tokio::spawn(async move {
tail_file_loop(path, txf, poll).await;
});
} else {
eprintln!("No --file or --demo: only WebSocket clients that receive externally pushed data would work.");
eprintln!("Start with: mesh-viz-bridge --demo OR mesh-viz-bridge --file ./mesh-viz-events.jsonl");
}
let listener = TcpListener::bind(&args.listen).await?;
eprintln!("mesh-viz-bridge WebSocket listening on ws://{}", args.listen);
loop {
let (stream, addr) = listener.accept().await?;
let txc = Arc::clone(&tx);
tokio::spawn(async move {
if let Err(e) = handle_client(stream, txc).await {
eprintln!("client {} error: {}", addr, e);
}
});
}
}
async fn handle_client(stream: TcpStream, tx: Arc<broadcast::Sender<String>>) -> anyhow::Result<()> {
let ws = tokio_tungstenite::accept_async(stream).await?;
let (mut write, mut read) = ws.split();
let mut rx = tx.subscribe();
loop {
tokio::select! {
msg = read.next() => {
match msg {
Some(Ok(Message::Close(_))) | None => break,
Some(Ok(Message::Ping(p))) => {
let _ = write.send(Message::Pong(p)).await;
}
Some(Err(e)) => return Err(e.into()),
_ => {}
}
}
line = rx.recv() => {
match line {
Ok(s) => write.send(Message::Text(s.into())).await?,
Err(broadcast::error::RecvError::Lagged(_)) => continue,
Err(broadcast::error::RecvError::Closed) => break,
}
}
}
}
Ok(())
}
async fn tail_file_loop(path: PathBuf, tx: Arc<broadcast::Sender<String>>, poll_ms: u64) {
let mut offset: u64 = 0;
loop {
match tokio::fs::File::open(&path).await {
Ok(file) => {
use tokio::io::{AsyncReadExt, AsyncSeekExt};
let mut file = file;
if let Ok(meta) = file.metadata().await {
let len = meta.len();
if len < offset {
offset = 0;
}
}
if file.seek(std::io::SeekFrom::Start(offset)).await.is_ok() {
let mut buf = Vec::new();
if file.read_to_end(&mut buf).await.is_ok() {
offset = match file.metadata().await {
Ok(m) => m.len(),
Err(_) => offset + buf.len() as u64,
};
let text = String::from_utf8_lossy(&buf);
for line in text.lines() {
let line = line.trim();
if line.is_empty() {
continue;
}
let _ = tx.send(line.to_string());
}
}
}
}
Err(_) => {
// Wait until file exists
}
}
tokio::time::sleep(std::time::Duration::from_millis(poll_ms)).await;
}
}
async fn demo_loop(tx: Arc<broadcast::Sender<String>>, interval_ms: u64) {
let nodes = [
("n1", "alpha", "active", 12u64),
("n2", "beta", "active", 18),
("n3", "gamma", "idle", 45),
("n4", "delta", "active", 22),
];
let mut tick: u64 = 0;
let mut present: HashSet<&'static str> = HashSet::new();
loop {
// Simulate join/leave
if tick % 14 == 0 {
present.clear();
present.insert("n1");
present.insert("n2");
} else if tick % 14 == 3 {
present.insert("n3");
} else if tick % 14 == 7 {
present.insert("n4");
} else if tick % 14 == 10 {
present.remove("n3");
} else if tick % 14 == 12 {
let _ = tx.send(
serde_json::json!({
"type": "node_status",
"id": "n2",
"status": "error",
"latency_ms": 999u64
})
.to_string(),
);
}
if tick % 14 != 12 {
let snap_nodes: Vec<_> = nodes
.iter()
.filter(|(id, _, _, _)| present.contains(id))
.map(|(id, label, status, lat)| {
serde_json::json!({
"id": id,
"label": label,
"status": status,
"latency_ms": lat
})
})
.collect();
let links: Vec<_> = {
let mut v = vec![];
if present.contains("n1") && present.contains("n2") {
v.push(serde_json::json!({"source": "n1", "target": "n2"}));
}
if present.contains("n2") && present.contains("n3") {
v.push(serde_json::json!({"source": "n2", "target": "n3"}));
}
if present.contains("n3") && present.contains("n4") {
v.push(serde_json::json!({"source": "n3", "target": "n4"}));
}
if present.contains("n2") && present.contains("n4") {
v.push(serde_json::json!({"source": "n2", "target": "n4"}));
}
v
};
let _ = tx.send(
serde_json::json!({
"type": "snapshot",
"nodes": snap_nodes,
"links": links
})
.to_string(),
);
}
// Message hop animation
let hop_pairs = [
("n1", "n2"),
("n2", "n3"),
("n2", "n4"),
("n3", "n4"),
];
let (a, b) = hop_pairs[(tick as usize) % hop_pairs.len()];
if present.contains(a) && present.contains(b) {
let ms = 8 + (tick % 40);
let _ = tx.send(
serde_json::json!({
"type": "hop",
"from": a,
"to": b,
"ms": ms
})
.to_string(),
);
}
tick = tick.wrapping_add(1);
tokio::time::sleep(std::time::Duration::from_millis(interval_ms)).await;
}
}