feat: wire up federation message routing and P2P client fallback

- Enqueue handler checks resolve_destination() for remote recipients
- User resolution supports user@domain federated addresses
- P2P mesh commands (/mesh start, /mesh stop) wired into client session
- Federation routing integration tests with SqlStore
- Fix DashMap deadlock in validate_session()
This commit is contained in:
2026-03-09 20:38:38 +01:00
parent 872695e5f1
commit 416618f4cf
11 changed files with 265 additions and 32 deletions

View File

@@ -60,6 +60,8 @@ pub(crate) enum SlashCommand {
Rename { name: String },
History { count: usize },
/// Mesh subcommands: /mesh peers, /mesh server <addr>, etc.
MeshStart,
MeshStop,
MeshPeers,
MeshServer { addr: String },
MeshSend { peer_id: String, message: String },
@@ -173,6 +175,8 @@ pub(crate) fn parse_input(line: &str) -> Input {
Input::Slash(SlashCommand::History { count })
}
"/mesh" => match arg.as_deref() {
Some("start") => Input::Slash(SlashCommand::MeshStart),
Some("stop") => Input::Slash(SlashCommand::MeshStop),
Some("peers") => Input::Slash(SlashCommand::MeshPeers),
Some(rest) if rest.starts_with("server ") => {
let addr = rest.trim_start_matches("server ").trim().to_string();
@@ -221,7 +225,7 @@ pub(crate) fn parse_input(line: &str) -> Input {
Some("store") => Input::Slash(SlashCommand::MeshStore),
_ => {
display::print_error(
"usage: /mesh peers|server|send|broadcast|subscribe|route|identity|store"
"usage: /mesh start|stop|peers|server|send|broadcast|subscribe|route|identity|store"
);
Input::Empty
}
@@ -804,6 +808,8 @@ async fn handle_slash(
SlashCommand::GroupInfo => cmd_group_info(session, client).await,
SlashCommand::Rename { name } => cmd_rename(session, &name),
SlashCommand::History { count } => cmd_history(session, count),
SlashCommand::MeshStart => cmd_mesh_start(session).await,
SlashCommand::MeshStop => cmd_mesh_stop(session).await,
SlashCommand::MeshPeers => cmd_mesh_peers(),
SlashCommand::MeshServer { addr } => {
display::print_status(&format!(
@@ -811,9 +817,9 @@ async fn handle_slash(
));
Ok(())
}
SlashCommand::MeshSend { peer_id, message } => cmd_mesh_send(&peer_id, &message),
SlashCommand::MeshBroadcast { topic, message } => cmd_mesh_broadcast(&topic, &message),
SlashCommand::MeshSubscribe { topic } => cmd_mesh_subscribe(&topic),
SlashCommand::MeshSend { peer_id, message } => cmd_mesh_send(session, &peer_id, &message).await,
SlashCommand::MeshBroadcast { topic, message } => cmd_mesh_broadcast(session, &topic, &message).await,
SlashCommand::MeshSubscribe { topic } => cmd_mesh_subscribe(session, &topic),
SlashCommand::MeshRoute => cmd_mesh_route(session),
SlashCommand::MeshIdentity => cmd_mesh_identity(session),
SlashCommand::MeshStore => cmd_mesh_store(session),
@@ -862,6 +868,8 @@ pub(crate) fn print_help() {
display::print_status(" /rename <name> - Rename the current conversation");
display::print_status(" /history [N] - Show last N messages (default: 20)");
display::print_status(" /whoami - Show your identity");
display::print_status(" /mesh start - Start the P2P node for direct messaging");
display::print_status(" /mesh stop - Stop the P2P node");
display::print_status(" /mesh peers - Discover nearby qpc nodes via mDNS");
display::print_status(" /mesh server <host:port> - Show how to reconnect to a mesh node");
display::print_status(" /mesh send <peer> <msg> - Send a P2P message to a mesh peer");
@@ -1108,6 +1116,94 @@ pub(crate) async fn cmd_rotate_all_keys(
Ok(())
}
/// Start the P2P node for mesh messaging.
pub(crate) async fn cmd_mesh_start(session: &mut SessionState) -> anyhow::Result<()> {
#[cfg(feature = "mesh")]
{
if session.p2p_node.is_some() {
display::print_status("P2P node is already running");
return Ok(());
}
display::print_status("starting P2P node...");
// Try to load a persisted mesh identity or generate a new one.
let mesh_state_path = session.state_path.with_extension("mesh.json");
let mesh_id = if mesh_state_path.exists() {
match quicprochat_p2p::identity::MeshIdentity::load(&mesh_state_path) {
Ok(id) => {
display::print_status("loaded existing mesh identity");
Some(id)
}
Err(e) => {
display::print_status(&format!("could not load mesh identity: {e}, generating new"));
None
}
}
} else {
None
};
let node = if let Some(id) = mesh_id {
match quicprochat_p2p::P2pNode::start_with_mesh(None, id, 1000).await {
Ok(n) => n,
Err(e) => {
display::print_error(&format!("failed to start P2P node: {e}"));
return Ok(());
}
}
} else {
match quicprochat_p2p::P2pNode::start(None).await {
Ok(n) => n,
Err(e) => {
display::print_error(&format!("failed to start P2P node: {e}"));
return Ok(());
}
}
};
let node_id = node.node_id();
session.p2p_node = Some(Arc::new(node));
display::print_status(&format!("P2P node started: {}", node_id.fmt_short()));
}
#[cfg(not(feature = "mesh"))]
{
let _ = session;
display::print_error("requires --features mesh");
}
Ok(())
}
/// Stop the P2P node.
pub(crate) async fn cmd_mesh_stop(session: &mut SessionState) -> anyhow::Result<()> {
#[cfg(feature = "mesh")]
{
match session.p2p_node.take() {
Some(node) => {
// Try to unwrap the Arc; if there are other references, just drop our handle.
match Arc::try_unwrap(node) {
Ok(owned) => {
owned.close().await;
display::print_status("P2P node stopped");
}
Err(_arc) => {
display::print_status("P2P node reference released (other tasks may still hold it)");
}
}
}
None => {
display::print_status("P2P node is not running");
}
}
}
#[cfg(not(feature = "mesh"))]
{
let _ = session;
display::print_error("requires --features mesh");
}
Ok(())
}
/// Discover nearby qpc servers via mDNS (requires `--features mesh` build).
pub(crate) fn cmd_mesh_peers() -> anyhow::Result<()> {
use super::mesh_discovery::MeshDiscovery;
@@ -1137,46 +1233,98 @@ pub(crate) fn cmd_mesh_peers() -> anyhow::Result<()> {
Ok(())
}
/// Send a direct P2P mesh message (stub — P2pNode not yet wired into session).
pub(crate) fn cmd_mesh_send(peer_id: &str, message: &str) -> anyhow::Result<()> {
/// Send a direct P2P mesh message via the session's P2P node.
pub(crate) async fn cmd_mesh_send(session: &SessionState, peer_id: &str, message: &str) -> anyhow::Result<()> {
#[cfg(feature = "mesh")]
{
display::print_status(&format!("mesh send: would send to {peer_id}: {message}"));
display::print_status("(P2P node integration pending — message not actually sent)");
match &session.p2p_node {
Some(node) => {
// Parse the peer_id as an iroh PublicKey hex string and create an EndpointAddr.
let pk_bytes = hex::decode(peer_id)
.map_err(|e| anyhow::anyhow!("invalid peer_id hex: {e}"))?;
let pk_array: [u8; 32] = pk_bytes
.as_slice()
.try_into()
.map_err(|_| anyhow::anyhow!("peer_id must be 32 bytes (64 hex chars)"))?;
let pk = iroh::PublicKey::from_bytes(&pk_array);
let addr = iroh::EndpointAddr::from(pk);
match node.send(addr, message.as_bytes()).await {
Ok(()) => {
display::print_status(&format!("sent to {}: {message}", &peer_id[..8.min(peer_id.len())]));
}
Err(e) => {
display::print_error(&format!("P2P send failed: {e}"));
}
}
}
None => {
display::print_error("P2P node not started. Use /mesh start to initialize.");
}
}
}
#[cfg(not(feature = "mesh"))]
{
let _ = (peer_id, message);
let _ = (session, peer_id, message);
display::print_error("requires --features mesh");
}
Ok(())
}
/// Broadcast an encrypted message on a topic (stub — P2pNode not yet wired into session).
pub(crate) fn cmd_mesh_broadcast(topic: &str, message: &str) -> anyhow::Result<()> {
/// Broadcast an encrypted message on a topic via the session's P2P node.
pub(crate) async fn cmd_mesh_broadcast(session: &SessionState, topic: &str, message: &str) -> anyhow::Result<()> {
#[cfg(feature = "mesh")]
{
display::print_status(&format!("mesh broadcast to {topic}: {message}"));
display::print_status("(P2P node integration pending — message not actually sent)");
match &session.p2p_node {
Some(node) => {
match node.broadcast(topic, message.as_bytes()).await {
Ok(()) => {
display::print_status(&format!("broadcast to {topic}: {message}"));
}
Err(e) => {
display::print_error(&format!("broadcast failed: {e}"));
}
}
}
None => {
display::print_error("P2P node not started. Use /mesh start to initialize.");
}
}
}
#[cfg(not(feature = "mesh"))]
{
let _ = (topic, message);
let _ = (session, topic, message);
display::print_error("requires --features mesh");
}
Ok(())
}
/// Subscribe to a broadcast topic (stub — P2pNode not yet wired into session).
pub(crate) fn cmd_mesh_subscribe(topic: &str) -> anyhow::Result<()> {
/// Subscribe to a broadcast topic on the session's P2P node.
pub(crate) fn cmd_mesh_subscribe(session: &SessionState, topic: &str) -> anyhow::Result<()> {
#[cfg(feature = "mesh")]
{
display::print_status(&format!("subscribed to topic: {topic}"));
display::print_status("(P2P node integration pending — subscription is not persisted)");
match &session.p2p_node {
Some(node) => {
// Generate a random key for the subscription.
let key: [u8; 32] = rand::random();
match node.subscribe(topic, key) {
Ok(()) => {
display::print_status(&format!("subscribed to topic: {topic}"));
display::print_status(&format!("share this key to let others join: {}", hex::encode(key)));
}
Err(e) => {
display::print_error(&format!("subscribe failed: {e}"));
}
}
}
None => {
display::print_error("P2P node not started. Use /mesh start to initialize.");
}
}
}
#[cfg(not(feature = "mesh"))]
{
let _ = topic;
let _ = (session, topic);
display::print_error("requires --features mesh");
}
Ok(())