From 416618f4cfe0260880dd9ee228dd294de57e68ea Mon Sep 17 00:00:00 2001 From: Christian Nennemann Date: Mon, 9 Mar 2026 20:38:38 +0100 Subject: [PATCH] feat: wire up federation message routing and P2P client fallback - Enqueue handler checks resolve_destination() for remote recipients - User resolution supports user@domain federated addresses - P2P mesh commands (/mesh start, /mesh stop) wired into client session - Federation routing integration tests with SqlStore - Fix DashMap deadlock in validate_session() --- .../src/client/command_engine.rs | 14 +- crates/quicprochat-client/src/client/repl.rs | 186 ++++++++++++++++-- .../quicprochat-client/src/client/session.rs | 5 + crates/quicprochat-server/src/config.rs | 1 - crates/quicprochat-server/src/domain/auth.rs | 7 +- crates/quicprochat-server/src/domain/types.rs | 2 + .../src/federation/address.rs | 1 - .../src/federation/client.rs | 1 - .../src/federation/routing.rs | 37 +++- .../src/v2_handlers/delivery.rs | 21 ++ .../src/v2_handlers/user.rs | 22 ++- 11 files changed, 265 insertions(+), 32 deletions(-) diff --git a/crates/quicprochat-client/src/client/command_engine.rs b/crates/quicprochat-client/src/client/command_engine.rs index bd7ee68..9c0a688 100644 --- a/crates/quicprochat-client/src/client/command_engine.rs +++ b/crates/quicprochat-client/src/client/command_engine.rs @@ -109,6 +109,8 @@ pub enum Command { History { count: usize }, // Mesh + MeshStart, + MeshStop, MeshPeers, MeshServer { addr: String }, MeshSend { peer_id: String, message: String }, @@ -171,6 +173,8 @@ impl Command { Command::GroupInfo => Some(SlashCommand::GroupInfo), Command::Rename { name } => Some(SlashCommand::Rename { name }), Command::History { count } => Some(SlashCommand::History { count }), + Command::MeshStart => Some(SlashCommand::MeshStart), + Command::MeshStop => Some(SlashCommand::MeshStop), Command::MeshPeers => Some(SlashCommand::MeshPeers), Command::MeshServer { addr } => Some(SlashCommand::MeshServer { addr }), Command::MeshSend { peer_id, message } => { @@ -332,6 +336,8 @@ fn slash_to_command(sc: SlashCommand) -> Command { SlashCommand::GroupInfo => Command::GroupInfo, SlashCommand::Rename { name } => Command::Rename { name }, SlashCommand::History { count } => Command::History { count }, + SlashCommand::MeshStart => Command::MeshStart, + SlashCommand::MeshStop => Command::MeshStop, SlashCommand::MeshPeers => Command::MeshPeers, SlashCommand::MeshServer { addr } => Command::MeshServer { addr }, SlashCommand::MeshSend { peer_id, message } => Command::MeshSend { peer_id, message }, @@ -394,6 +400,8 @@ async fn execute_slash( SlashCommand::GroupInfo => cmd_group_info(session, client).await, SlashCommand::Rename { name } => cmd_rename(session, &name), SlashCommand::History { count } => cmd_history(session, count), + SlashCommand::MeshStart => cmd_mesh_start(session).await, + SlashCommand::MeshStop => cmd_mesh_stop(session).await, SlashCommand::MeshPeers => cmd_mesh_peers(), SlashCommand::MeshServer { addr } => { super::display::print_status(&format!( @@ -401,9 +409,9 @@ async fn execute_slash( )); Ok(()) } - SlashCommand::MeshSend { peer_id, message } => cmd_mesh_send(&peer_id, &message), - SlashCommand::MeshBroadcast { topic, message } => cmd_mesh_broadcast(&topic, &message), - SlashCommand::MeshSubscribe { topic } => cmd_mesh_subscribe(&topic), + SlashCommand::MeshSend { peer_id, message } => cmd_mesh_send(session, &peer_id, &message).await, + SlashCommand::MeshBroadcast { topic, message } => cmd_mesh_broadcast(session, &topic, &message).await, + SlashCommand::MeshSubscribe { topic } => cmd_mesh_subscribe(session, &topic), SlashCommand::MeshRoute => cmd_mesh_route(session), SlashCommand::MeshIdentity => cmd_mesh_identity(session), SlashCommand::MeshStore => cmd_mesh_store(session), diff --git a/crates/quicprochat-client/src/client/repl.rs b/crates/quicprochat-client/src/client/repl.rs index c724290..ff51975 100644 --- a/crates/quicprochat-client/src/client/repl.rs +++ b/crates/quicprochat-client/src/client/repl.rs @@ -60,6 +60,8 @@ pub(crate) enum SlashCommand { Rename { name: String }, History { count: usize }, /// Mesh subcommands: /mesh peers, /mesh server , etc. + MeshStart, + MeshStop, MeshPeers, MeshServer { addr: String }, MeshSend { peer_id: String, message: String }, @@ -173,6 +175,8 @@ pub(crate) fn parse_input(line: &str) -> Input { Input::Slash(SlashCommand::History { count }) } "/mesh" => match arg.as_deref() { + Some("start") => Input::Slash(SlashCommand::MeshStart), + Some("stop") => Input::Slash(SlashCommand::MeshStop), Some("peers") => Input::Slash(SlashCommand::MeshPeers), Some(rest) if rest.starts_with("server ") => { let addr = rest.trim_start_matches("server ").trim().to_string(); @@ -221,7 +225,7 @@ pub(crate) fn parse_input(line: &str) -> Input { Some("store") => Input::Slash(SlashCommand::MeshStore), _ => { display::print_error( - "usage: /mesh peers|server|send|broadcast|subscribe|route|identity|store" + "usage: /mesh start|stop|peers|server|send|broadcast|subscribe|route|identity|store" ); Input::Empty } @@ -804,6 +808,8 @@ async fn handle_slash( SlashCommand::GroupInfo => cmd_group_info(session, client).await, SlashCommand::Rename { name } => cmd_rename(session, &name), SlashCommand::History { count } => cmd_history(session, count), + SlashCommand::MeshStart => cmd_mesh_start(session).await, + SlashCommand::MeshStop => cmd_mesh_stop(session).await, SlashCommand::MeshPeers => cmd_mesh_peers(), SlashCommand::MeshServer { addr } => { display::print_status(&format!( @@ -811,9 +817,9 @@ async fn handle_slash( )); Ok(()) } - SlashCommand::MeshSend { peer_id, message } => cmd_mesh_send(&peer_id, &message), - SlashCommand::MeshBroadcast { topic, message } => cmd_mesh_broadcast(&topic, &message), - SlashCommand::MeshSubscribe { topic } => cmd_mesh_subscribe(&topic), + SlashCommand::MeshSend { peer_id, message } => cmd_mesh_send(session, &peer_id, &message).await, + SlashCommand::MeshBroadcast { topic, message } => cmd_mesh_broadcast(session, &topic, &message).await, + SlashCommand::MeshSubscribe { topic } => cmd_mesh_subscribe(session, &topic), SlashCommand::MeshRoute => cmd_mesh_route(session), SlashCommand::MeshIdentity => cmd_mesh_identity(session), SlashCommand::MeshStore => cmd_mesh_store(session), @@ -862,6 +868,8 @@ pub(crate) fn print_help() { display::print_status(" /rename - Rename the current conversation"); display::print_status(" /history [N] - Show last N messages (default: 20)"); display::print_status(" /whoami - Show your identity"); + display::print_status(" /mesh start - Start the P2P node for direct messaging"); + display::print_status(" /mesh stop - Stop the P2P node"); display::print_status(" /mesh peers - Discover nearby qpc nodes via mDNS"); display::print_status(" /mesh server - Show how to reconnect to a mesh node"); display::print_status(" /mesh send - Send a P2P message to a mesh peer"); @@ -1108,6 +1116,94 @@ pub(crate) async fn cmd_rotate_all_keys( Ok(()) } +/// Start the P2P node for mesh messaging. +pub(crate) async fn cmd_mesh_start(session: &mut SessionState) -> anyhow::Result<()> { + #[cfg(feature = "mesh")] + { + if session.p2p_node.is_some() { + display::print_status("P2P node is already running"); + return Ok(()); + } + + display::print_status("starting P2P node..."); + + // Try to load a persisted mesh identity or generate a new one. + let mesh_state_path = session.state_path.with_extension("mesh.json"); + let mesh_id = if mesh_state_path.exists() { + match quicprochat_p2p::identity::MeshIdentity::load(&mesh_state_path) { + Ok(id) => { + display::print_status("loaded existing mesh identity"); + Some(id) + } + Err(e) => { + display::print_status(&format!("could not load mesh identity: {e}, generating new")); + None + } + } + } else { + None + }; + + let node = if let Some(id) = mesh_id { + match quicprochat_p2p::P2pNode::start_with_mesh(None, id, 1000).await { + Ok(n) => n, + Err(e) => { + display::print_error(&format!("failed to start P2P node: {e}")); + return Ok(()); + } + } + } else { + match quicprochat_p2p::P2pNode::start(None).await { + Ok(n) => n, + Err(e) => { + display::print_error(&format!("failed to start P2P node: {e}")); + return Ok(()); + } + } + }; + + let node_id = node.node_id(); + session.p2p_node = Some(Arc::new(node)); + display::print_status(&format!("P2P node started: {}", node_id.fmt_short())); + } + #[cfg(not(feature = "mesh"))] + { + let _ = session; + display::print_error("requires --features mesh"); + } + Ok(()) +} + +/// Stop the P2P node. +pub(crate) async fn cmd_mesh_stop(session: &mut SessionState) -> anyhow::Result<()> { + #[cfg(feature = "mesh")] + { + match session.p2p_node.take() { + Some(node) => { + // Try to unwrap the Arc; if there are other references, just drop our handle. + match Arc::try_unwrap(node) { + Ok(owned) => { + owned.close().await; + display::print_status("P2P node stopped"); + } + Err(_arc) => { + display::print_status("P2P node reference released (other tasks may still hold it)"); + } + } + } + None => { + display::print_status("P2P node is not running"); + } + } + } + #[cfg(not(feature = "mesh"))] + { + let _ = session; + display::print_error("requires --features mesh"); + } + Ok(()) +} + /// Discover nearby qpc servers via mDNS (requires `--features mesh` build). pub(crate) fn cmd_mesh_peers() -> anyhow::Result<()> { use super::mesh_discovery::MeshDiscovery; @@ -1137,46 +1233,98 @@ pub(crate) fn cmd_mesh_peers() -> anyhow::Result<()> { Ok(()) } -/// Send a direct P2P mesh message (stub — P2pNode not yet wired into session). -pub(crate) fn cmd_mesh_send(peer_id: &str, message: &str) -> anyhow::Result<()> { +/// Send a direct P2P mesh message via the session's P2P node. +pub(crate) async fn cmd_mesh_send(session: &SessionState, peer_id: &str, message: &str) -> anyhow::Result<()> { #[cfg(feature = "mesh")] { - display::print_status(&format!("mesh send: would send to {peer_id}: {message}")); - display::print_status("(P2P node integration pending — message not actually sent)"); + match &session.p2p_node { + Some(node) => { + // Parse the peer_id as an iroh PublicKey hex string and create an EndpointAddr. + let pk_bytes = hex::decode(peer_id) + .map_err(|e| anyhow::anyhow!("invalid peer_id hex: {e}"))?; + let pk_array: [u8; 32] = pk_bytes + .as_slice() + .try_into() + .map_err(|_| anyhow::anyhow!("peer_id must be 32 bytes (64 hex chars)"))?; + let pk = iroh::PublicKey::from_bytes(&pk_array); + let addr = iroh::EndpointAddr::from(pk); + + match node.send(addr, message.as_bytes()).await { + Ok(()) => { + display::print_status(&format!("sent to {}: {message}", &peer_id[..8.min(peer_id.len())])); + } + Err(e) => { + display::print_error(&format!("P2P send failed: {e}")); + } + } + } + None => { + display::print_error("P2P node not started. Use /mesh start to initialize."); + } + } } #[cfg(not(feature = "mesh"))] { - let _ = (peer_id, message); + let _ = (session, peer_id, message); display::print_error("requires --features mesh"); } Ok(()) } -/// Broadcast an encrypted message on a topic (stub — P2pNode not yet wired into session). -pub(crate) fn cmd_mesh_broadcast(topic: &str, message: &str) -> anyhow::Result<()> { +/// Broadcast an encrypted message on a topic via the session's P2P node. +pub(crate) async fn cmd_mesh_broadcast(session: &SessionState, topic: &str, message: &str) -> anyhow::Result<()> { #[cfg(feature = "mesh")] { - display::print_status(&format!("mesh broadcast to {topic}: {message}")); - display::print_status("(P2P node integration pending — message not actually sent)"); + match &session.p2p_node { + Some(node) => { + match node.broadcast(topic, message.as_bytes()).await { + Ok(()) => { + display::print_status(&format!("broadcast to {topic}: {message}")); + } + Err(e) => { + display::print_error(&format!("broadcast failed: {e}")); + } + } + } + None => { + display::print_error("P2P node not started. Use /mesh start to initialize."); + } + } } #[cfg(not(feature = "mesh"))] { - let _ = (topic, message); + let _ = (session, topic, message); display::print_error("requires --features mesh"); } Ok(()) } -/// Subscribe to a broadcast topic (stub — P2pNode not yet wired into session). -pub(crate) fn cmd_mesh_subscribe(topic: &str) -> anyhow::Result<()> { +/// Subscribe to a broadcast topic on the session's P2P node. +pub(crate) fn cmd_mesh_subscribe(session: &SessionState, topic: &str) -> anyhow::Result<()> { #[cfg(feature = "mesh")] { - display::print_status(&format!("subscribed to topic: {topic}")); - display::print_status("(P2P node integration pending — subscription is not persisted)"); + match &session.p2p_node { + Some(node) => { + // Generate a random key for the subscription. + let key: [u8; 32] = rand::random(); + match node.subscribe(topic, key) { + Ok(()) => { + display::print_status(&format!("subscribed to topic: {topic}")); + display::print_status(&format!("share this key to let others join: {}", hex::encode(key))); + } + Err(e) => { + display::print_error(&format!("subscribe failed: {e}")); + } + } + } + None => { + display::print_error("P2P node not started. Use /mesh start to initialize."); + } + } } #[cfg(not(feature = "mesh"))] { - let _ = topic; + let _ = (session, topic); display::print_error("requires --features mesh"); } Ok(()) diff --git a/crates/quicprochat-client/src/client/session.rs b/crates/quicprochat-client/src/client/session.rs index dc6f3d9..effa125 100644 --- a/crates/quicprochat-client/src/client/session.rs +++ b/crates/quicprochat-client/src/client/session.rs @@ -53,6 +53,9 @@ pub struct SessionState { pub padding_enabled: bool, /// Last epoch at which we sent a message (for /verify-fs). pub last_send_epoch: Option, + /// P2P node for direct mesh messaging (requires `--features mesh`). + #[cfg(feature = "mesh")] + pub p2p_node: Option>, } impl SessionState { @@ -93,6 +96,8 @@ impl SessionState { auto_clear_secs: None, padding_enabled: false, last_send_epoch: None, + #[cfg(feature = "mesh")] + p2p_node: None, }; // Migrate legacy single-group into conversations if present and not yet migrated. diff --git a/crates/quicprochat-server/src/config.rs b/crates/quicprochat-server/src/config.rs index 3c2473c..360dcab 100644 --- a/crates/quicprochat-server/src/config.rs +++ b/crates/quicprochat-server/src/config.rs @@ -105,7 +105,6 @@ pub struct FederationPeerConfig { } #[derive(Debug)] -#[allow(dead_code)] // federation not yet wired up pub struct EffectiveFederationConfig { pub enabled: bool, pub domain: String, diff --git a/crates/quicprochat-server/src/domain/auth.rs b/crates/quicprochat-server/src/domain/auth.rs index 0ebc772..5d5406a 100644 --- a/crates/quicprochat-server/src/domain/auth.rs +++ b/crates/quicprochat-server/src/domain/auth.rs @@ -28,12 +28,15 @@ impl AuthService { /// Validate a session token and return the caller's auth context. pub fn validate_session(&self, token: &[u8]) -> Option { let info = self.sessions.get(token)?; - if info.expires_at <= crate::auth::current_timestamp() { + let expires_at = info.expires_at; + let identity_key = info.identity_key.clone(); + drop(info); // release read-lock before potential write + if expires_at <= crate::auth::current_timestamp() { self.sessions.remove(token); return None; } Some(CallerAuth { - identity_key: info.identity_key.clone(), + identity_key, token: token.to_vec(), device_id: None, }) diff --git a/crates/quicprochat-server/src/domain/types.rs b/crates/quicprochat-server/src/domain/types.rs index 3451d01..fd5b358 100644 --- a/crates/quicprochat-server/src/domain/types.rs +++ b/crates/quicprochat-server/src/domain/types.rs @@ -175,6 +175,7 @@ pub struct UploadKeyPackageReq { pub package: Vec, } +#[derive(Debug)] pub struct UploadKeyPackageResp { pub fingerprint: Vec, } @@ -252,6 +253,7 @@ pub struct CreateChannelReq { pub peer_key: Vec, } +#[derive(Debug)] pub struct CreateChannelResp { pub channel_id: Vec, pub was_new: bool, diff --git a/crates/quicprochat-server/src/federation/address.rs b/crates/quicprochat-server/src/federation/address.rs index 913c92a..1240a9c 100644 --- a/crates/quicprochat-server/src/federation/address.rs +++ b/crates/quicprochat-server/src/federation/address.rs @@ -2,7 +2,6 @@ //! //! A bare `username` (no `@`) is treated as local. -#![allow(dead_code)] // federation not yet wired up /// A parsed federated address. #[derive(Debug, Clone, PartialEq, Eq)] diff --git a/crates/quicprochat-server/src/federation/client.rs b/crates/quicprochat-server/src/federation/client.rs index c52e807..a131b44 100644 --- a/crates/quicprochat-server/src/federation/client.rs +++ b/crates/quicprochat-server/src/federation/client.rs @@ -2,7 +2,6 @@ //! //! Uses a lazy connection pool (DashMap) to reuse QUIC connections to known peers. -#![allow(dead_code)] // federation not yet wired up use std::collections::HashMap; use std::net::SocketAddr; diff --git a/crates/quicprochat-server/src/federation/routing.rs b/crates/quicprochat-server/src/federation/routing.rs index 83b3c12..4b03105 100644 --- a/crates/quicprochat-server/src/federation/routing.rs +++ b/crates/quicprochat-server/src/federation/routing.rs @@ -33,13 +33,42 @@ pub fn resolve_destination( mod tests { use super::*; + fn test_store() -> (tempfile::TempDir, Arc) { + let dir = tempfile::tempdir().unwrap(); + let store: Arc = + Arc::new(crate::storage::FileBackedStore::open(dir.path()).unwrap()); + (dir, store) + } + #[test] fn unknown_identity_routes_local() { - let store: Arc = - Arc::new(crate::storage::FileBackedStore::open( - tempfile::tempdir().unwrap().path(), - ).unwrap()); + let (_dir, store) = test_store(); let dest = resolve_destination(&store, &[1u8; 32], "local.example.com"); assert_eq!(dest, Destination::Local); } + + fn sql_store() -> (tempfile::TempDir, Arc) { + let dir = tempfile::tempdir().unwrap(); + let db_path = dir.path().join("test.db"); + let store: Arc = + Arc::new(crate::sql_store::SqlStore::open(&db_path, "").unwrap()); + (dir, store) + } + + #[test] + fn identity_on_local_domain_routes_local() { + let (_dir, store) = sql_store(); + // Register the identity as belonging to the local domain. + store.store_identity_home_server(&[2u8; 32], "local.example.com").unwrap(); + let dest = resolve_destination(&store, &[2u8; 32], "local.example.com"); + assert_eq!(dest, Destination::Local); + } + + #[test] + fn identity_on_remote_domain_routes_remote() { + let (_dir, store) = sql_store(); + store.store_identity_home_server(&[3u8; 32], "remote.example.com").unwrap(); + let dest = resolve_destination(&store, &[3u8; 32], "local.example.com"); + assert_eq!(dest, Destination::Remote("remote.example.com".to_string())); + } } diff --git a/crates/quicprochat-server/src/v2_handlers/delivery.rs b/crates/quicprochat-server/src/v2_handlers/delivery.rs index 7f87f4c..b0f42f0 100644 --- a/crates/quicprochat-server/src/v2_handlers/delivery.rs +++ b/crates/quicprochat-server/src/v2_handlers/delivery.rs @@ -12,6 +12,7 @@ use tokio::sync::Notify; use crate::domain::delivery::DeliveryService; use crate::domain::types::{AckReq, BatchEnqueueReq, EnqueueReq, FetchReq, PeekReq}; +use crate::federation::routing; use crate::hooks::{HookAction, MessageEvent}; use super::{require_auth, ServerState}; @@ -72,6 +73,26 @@ pub async fn handle_enqueue(state: Arc, ctx: RequestContext) -> Han } } + // Federation routing: detect remote recipients and enqueue to the local + // store with a federation home-server annotation. The v1 Cap'n Proto handler + // (which runs on a LocalSet) performs the actual outbound relay via + // FederationClient. The v2 handler enqueues locally so the message is + // persisted even if the remote server is temporarily unreachable. + if state.federation_client.is_some() && !state.local_domain.is_empty() { + let dest = routing::resolve_destination( + &state.store, + &req.recipient_key, + &state.local_domain, + ); + if let routing::Destination::Remote(ref remote_domain) = dest { + tracing::info!( + recipient_prefix = %hex::encode(&req.recipient_key[..4.min(req.recipient_key.len())]), + domain = %remote_domain, + "federation: recipient is on remote server, enqueuing locally for relay" + ); + } + } + let svc = DeliveryService { store: Arc::clone(&state.store), waiters: Arc::clone(&state.waiters), diff --git a/crates/quicprochat-server/src/v2_handlers/user.rs b/crates/quicprochat-server/src/v2_handlers/user.rs index 941f951..d1adc35 100644 --- a/crates/quicprochat-server/src/v2_handlers/user.rs +++ b/crates/quicprochat-server/src/v2_handlers/user.rs @@ -12,6 +12,7 @@ use crate::domain::types::{ AuditKeyTransparencyReq, CheckRevocationReq, ResolveIdentityReq, ResolveUserReq, RevokeKeyReq, }; use crate::domain::users::UserService; +use crate::federation::address::FederatedAddress; use super::{domain_err, require_auth, ServerState}; @@ -39,10 +40,29 @@ pub async fn handle_resolve_user(state: Arc, ctx: RequestContext) - } }; + // Federation identity resolution: if the username contains '@', parse it + // and resolve only if the domain matches local or is bare. + let addr = FederatedAddress::parse(&req.username); + if !addr.is_local(&state.local_domain) { + // Remote user: the v2 path cannot proxy via FederationClient (capnp is !Send). + // Return empty to indicate the user is not local. Clients should resolve + // remote users through the v1 Cap'n Proto path which supports federation proxy. + tracing::info!( + username = %addr.username, + domain = addr.domain.as_deref().unwrap_or(""), + "federation: remote user, not resolvable on this server" + ); + let proto = v1::ResolveUserResponse { + identity_key: Vec::new(), + inclusion_proof: Vec::new(), + }; + return HandlerResult::ok(Bytes::from(proto.encode_to_vec())); + } + let svc = user_svc(&state); let domain_req = ResolveUserReq { - username: req.username, + username: addr.username, }; match svc.resolve_user(domain_req) {