feat: DM epoch fix, federation relay, and mDNS mesh discovery

- schema: createChannel returns wasNew :Bool to elect the MLS initiator
  unambiguously; prevents duplicate group creation on concurrent /dm calls
- core: group helpers for epoch tracking and key-package lifecycle
- server: federation subsystem — mTLS QUIC server-to-server relay with
  Cap'n Proto RPC; enqueue/batchEnqueue relay unknown recipients to their
  home domain via FederationClient
- server: mDNS _quicproquo._udp.local. service announcement on startup
- server: storage + sql_store — identity_exists, peek/ack, federation
  home-server lookup helpers
- client: /mesh peers REPL command (mDNS discovery, feature = "mesh")
- client: MeshDiscovery — background mDNS browse with ServiceDaemon
- client: was_new=false path in cmd_dm waits for peer Welcome instead of
  creating a duplicate initiator group
- p2p: fix ALPN from quicnprotochat/p2p/1 → quicproquo/p2p/1
- workspace: re-include quicproquo-p2p in members
This commit is contained in:
2026-03-03 14:41:56 +01:00
parent e24497bf90
commit c8398d6cb7
27 changed files with 3375 additions and 303 deletions

1903
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -7,11 +7,11 @@ members = [
"crates/quicproquo-client",
"crates/quicproquo-gui",
"crates/quicproquo-mobile",
# P2P crate uses iroh (~90 extra deps). Kept in the workspace so it can be
# referenced as an optional dependency; only compiled when the `mesh` feature
# is enabled on quicproquo-client.
"crates/quicproquo-p2p",
]
# P2P-Crate (iroh-Transport) ist vom Default-Build ausgeschlossen,
# um ~90 exklusive iroh-Dependencies nicht mitzukompilieren.
# Quellcode bleibt im Repo für spätere Integration.
exclude = ["crates/quicproquo-p2p"]
# Shared dependency versions — bump here to affect the whole workspace.
[workspace.dependencies]

View File

@@ -59,6 +59,17 @@ hex = { workspace = true }
# Secure password prompting (no echo)
rpassword = "5"
# mDNS discovery for mesh mode (Freifunk). Only compiled with --features mesh.
mdns-sd = { version = "0.12", optional = true }
# Optional P2P transport for direct node-to-node messaging.
quicproquo-p2p = { path = "../quicproquo-p2p", optional = true }
[features]
# Enable mesh-mode features: mDNS local peer discovery + P2P transport.
# Build: cargo build -p quicproquo-client --features mesh
mesh = ["dep:mdns-sd", "dep:quicproquo-p2p"]
[dev-dependencies]
dashmap = { workspace = true }
assert_cmd = "2"

View File

@@ -7,7 +7,7 @@ use opaque_ke::{
};
use quicproquo_core::{
generate_key_package, hybrid_decrypt, hybrid_encrypt, opaque_auth::OpaqueSuite,
GroupMember, HybridKeypair, IdentityKeypair,
GroupMember, HybridKeypair, IdentityKeypair, ReceivedMessage,
};
use super::{
@@ -376,7 +376,7 @@ pub(crate) async fn opaque_register(
/// Perform OPAQUE login and return the raw session token bytes.
/// Does NOT require init_auth() — OPAQUE RPCs are unauthenticated.
pub(crate) async fn opaque_login(
pub async fn opaque_login(
client: &quicproquo_proto::node_capnp::node_service::Client,
username: &str,
password: &str,
@@ -725,9 +725,10 @@ pub async fn cmd_demo_group(server: &str, ca_cert: &Path, server_name: &str) ->
.context("joiner: missing ciphertext from DS")?;
let inner_creator_joiner =
hybrid_decrypt(&joiner_hybrid, raw_creator_joiner, b"", b"").context("hybrid decrypt failed")?;
let plaintext_creator_joiner = joiner
.receive_message(&inner_creator_joiner)?
.context("expected application message")?;
let plaintext_creator_joiner = match joiner.receive_message(&inner_creator_joiner)? {
ReceivedMessage::Application(pt) => pt,
other => anyhow::bail!("expected application message, got {other:?}"),
};
println!(
"creator -> joiner plaintext: {}",
String::from_utf8_lossy(&plaintext_creator_joiner)
@@ -749,9 +750,10 @@ pub async fn cmd_demo_group(server: &str, ca_cert: &Path, server_name: &str) ->
.context("creator: missing ciphertext from DS")?;
let inner_joiner_creator =
hybrid_decrypt(&creator_hybrid, raw_joiner_creator, b"", b"").context("hybrid decrypt failed")?;
let plaintext_joiner_creator = creator
.receive_message(&inner_joiner_creator)?
.context("expected application message")?;
let plaintext_joiner_creator = match creator.receive_message(&inner_joiner_creator)? {
ReceivedMessage::Application(pt) => pt,
other => anyhow::bail!("expected application message, got {other:?}"),
};
println!(
"joiner -> creator plaintext: {}",
String::from_utf8_lossy(&plaintext_joiner_creator)
@@ -1013,8 +1015,8 @@ pub async fn cmd_recv(
}
};
match member.receive_message(&mls_payload) {
Ok(Some(pt)) => println!("[{idx}] plaintext: {}", String::from_utf8_lossy(&pt)),
Ok(None) => println!("[{idx}] commit applied"),
Ok(ReceivedMessage::Application(pt)) => println!("[{idx}] plaintext: {}", String::from_utf8_lossy(&pt)),
Ok(ReceivedMessage::StateChanged) | Ok(ReceivedMessage::SelfRemoved) => println!("[{idx}] commit applied"),
Err(_) => pending.push((idx, mls_payload)),
}
}
@@ -1023,11 +1025,11 @@ pub async fn cmd_recv(
let before = pending.len();
pending.retain(|(idx, mls_payload)| {
match member.receive_message(mls_payload) {
Ok(Some(pt)) => {
Ok(ReceivedMessage::Application(pt)) => {
println!("[{idx}/retry] plaintext: {}", String::from_utf8_lossy(&pt));
false
}
Ok(None) => {
Ok(ReceivedMessage::StateChanged) | Ok(ReceivedMessage::SelfRemoved) => {
println!("[{idx}/retry] commit applied");
false
}
@@ -1078,8 +1080,8 @@ pub async fn receive_pending_plaintexts(
Err(_) => continue,
};
match member.receive_message(&mls_payload) {
Ok(Some(pt)) => plaintexts.push(pt),
Ok(None) => {}
Ok(ReceivedMessage::Application(pt)) => plaintexts.push(pt),
Ok(ReceivedMessage::StateChanged) | Ok(ReceivedMessage::SelfRemoved) => {}
Err(_) => pending.push(mls_payload),
}
}
@@ -1088,11 +1090,11 @@ pub async fn receive_pending_plaintexts(
let before = pending.len();
pending.retain(|mls_payload| {
match member.receive_message(mls_payload) {
Ok(Some(pt)) => {
Ok(ReceivedMessage::Application(pt)) => {
plaintexts.push(pt);
false
}
Ok(None) => false,
Ok(ReceivedMessage::StateChanged) | Ok(ReceivedMessage::SelfRemoved) => false,
Err(_) => true,
}
});
@@ -1250,12 +1252,12 @@ pub async fn cmd_chat(
Err(_) => continue,
};
match member.receive_message(&mls_payload) {
Ok(Some(pt)) => {
Ok(ReceivedMessage::Application(pt)) => {
let s = String::from_utf8_lossy(&pt);
println!("\r\n[peer] {s}\n> ");
std::io::stdout().flush().context("flush stdout")?;
}
Ok(None) => {}
Ok(ReceivedMessage::StateChanged) | Ok(ReceivedMessage::SelfRemoved) => {}
Err(_) => retry_payloads.push(mls_payload),
}
}
@@ -1264,13 +1266,13 @@ pub async fn cmd_chat(
let before = retry_payloads.len();
retry_payloads.retain(|mls_payload| {
match member.receive_message(mls_payload) {
Ok(Some(pt)) => {
Ok(ReceivedMessage::Application(pt)) => {
let s = String::from_utf8_lossy(&pt);
println!("\r\n[peer] {s}\n> ");
let _ = std::io::stdout().flush();
false
}
Ok(None) => false,
Ok(ReceivedMessage::StateChanged) | Ok(ReceivedMessage::SelfRemoved) => false,
Err(_) => true,
}
});

View File

@@ -71,6 +71,10 @@ pub struct Conversation {
pub unread_count: u32,
pub last_activity_ms: u64,
pub created_at_ms: u64,
/// Whether this conversation uses hybrid (X25519 + ML-KEM-768) MLS keys.
pub is_hybrid: bool,
/// Highest server-side delivery sequence number seen.
pub last_seen_seq: u64,
}
#[derive(Clone, Debug)]
@@ -251,9 +255,26 @@ impl ConversationStore {
);
CREATE INDEX IF NOT EXISTS idx_messages_conv
ON messages(conversation_id, timestamp_ms);",
ON messages(conversation_id, timestamp_ms);
CREATE TABLE IF NOT EXISTS outbox (
id INTEGER PRIMARY KEY AUTOINCREMENT,
conversation_id BLOB NOT NULL,
recipient_key BLOB NOT NULL,
payload BLOB NOT NULL,
created_at_ms INTEGER NOT NULL,
retry_count INTEGER NOT NULL DEFAULT 0,
status TEXT NOT NULL DEFAULT 'pending'
);
CREATE INDEX IF NOT EXISTS idx_outbox_status
ON outbox(status, created_at_ms);",
)
.context("migrate conversation db")?;
// Additive migrations for new columns (safe to re-run; errors ignored if column already exists).
conn.execute_batch("ALTER TABLE conversations ADD COLUMN is_hybrid INTEGER NOT NULL DEFAULT 0;").ok();
conn.execute_batch("ALTER TABLE conversations ADD COLUMN last_seen_seq INTEGER NOT NULL DEFAULT 0;").ok();
Ok(())
}
@@ -274,15 +295,17 @@ impl ConversationStore {
"INSERT INTO conversations
(id, kind, display_name, peer_key, peer_username, group_name,
mls_group_blob, keystore_blob, member_keys, unread_count,
last_activity_ms, created_at_ms)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)
last_activity_ms, created_at_ms, is_hybrid, last_seen_seq)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14)
ON CONFLICT(id) DO UPDATE SET
display_name = excluded.display_name,
mls_group_blob = excluded.mls_group_blob,
keystore_blob = excluded.keystore_blob,
member_keys = excluded.member_keys,
unread_count = excluded.unread_count,
last_activity_ms = excluded.last_activity_ms",
last_activity_ms = excluded.last_activity_ms,
is_hybrid = excluded.is_hybrid,
last_seen_seq = excluded.last_seen_seq",
params![
conv.id.0.as_slice(),
kind_str,
@@ -296,6 +319,8 @@ impl ConversationStore {
conv.unread_count,
conv.last_activity_ms,
conv.created_at_ms,
conv.is_hybrid as i32,
conv.last_seen_seq as i64,
],
)?;
Ok(())
@@ -306,7 +331,7 @@ impl ConversationStore {
.query_row(
"SELECT kind, display_name, peer_key, peer_username, group_name,
mls_group_blob, keystore_blob, member_keys, unread_count,
last_activity_ms, created_at_ms
last_activity_ms, created_at_ms, is_hybrid, last_seen_seq
FROM conversations WHERE id = ?1",
params![id.0.as_slice()],
|row| {
@@ -321,6 +346,8 @@ impl ConversationStore {
let unread_count: u32 = row.get(8)?;
let last_activity_ms: u64 = row.get(9)?;
let created_at_ms: u64 = row.get(10)?;
let is_hybrid_int: i32 = row.get(11)?;
let last_seen_seq: i64 = row.get(12)?;
let kind = if kind_str == "dm" {
ConversationKind::Dm {
@@ -347,6 +374,8 @@ impl ConversationStore {
unread_count,
last_activity_ms,
created_at_ms,
is_hybrid: is_hybrid_int != 0,
last_seen_seq: last_seen_seq as u64,
})
},
)
@@ -358,7 +387,7 @@ impl ConversationStore {
let mut stmt = self.conn.prepare(
"SELECT id, kind, display_name, peer_key, peer_username, group_name,
mls_group_blob, keystore_blob, member_keys, unread_count,
last_activity_ms, created_at_ms
last_activity_ms, created_at_ms, is_hybrid, last_seen_seq
FROM conversations ORDER BY last_activity_ms DESC",
)?;
let rows = stmt.query_map([], |row| {
@@ -374,6 +403,8 @@ impl ConversationStore {
let unread_count: u32 = row.get(9)?;
let last_activity_ms: u64 = row.get(10)?;
let created_at_ms: u64 = row.get(11)?;
let is_hybrid_int: i32 = row.get(12)?;
let last_seen_seq: i64 = row.get(13)?;
let id = ConversationId::from_slice(&id_blob).unwrap_or(ConversationId([0; 16]));
let kind = if kind_str == "dm" {
@@ -400,6 +431,8 @@ impl ConversationStore {
unread_count,
last_activity_ms,
created_at_ms,
is_hybrid: is_hybrid_int != 0,
last_seen_seq: last_seen_seq as u64,
})
})?;
@@ -553,6 +586,103 @@ impl ConversationStore {
msgs.reverse();
Ok(msgs)
}
/// Save a message, deduplicating by message_id within the same conversation.
/// Returns `true` if the message was saved (new), `false` if it was a duplicate.
pub fn save_message_dedup(&self, msg: &StoredMessage) -> anyhow::Result<bool> {
if let Some(ref mid) = msg.message_id {
let exists: bool = self.conn.query_row(
"SELECT EXISTS(SELECT 1 FROM messages WHERE message_id = ?1 AND conversation_id = ?2)",
params![mid.as_slice(), msg.conversation_id.0.as_slice()],
|row| row.get(0),
)?;
if exists {
return Ok(false);
}
}
self.save_message(msg)?;
Ok(true)
}
// ── Sequence tracking ──────────────────────────────────────────────
pub fn update_last_seen_seq(&self, id: &ConversationId, seq: u64) -> anyhow::Result<()> {
self.conn.execute(
"UPDATE conversations SET last_seen_seq = ?2 WHERE id = ?1 AND last_seen_seq < ?2",
params![id.0.as_slice(), seq as i64],
)?;
Ok(())
}
// ── Outbox (offline queue) ────────────────────────────────────────
pub fn enqueue_outbox(
&self,
conv_id: &ConversationId,
recipient_key: &[u8],
payload: &[u8],
) -> anyhow::Result<()> {
self.conn.execute(
"INSERT INTO outbox (conversation_id, recipient_key, payload, created_at_ms)
VALUES (?1, ?2, ?3, ?4)",
params![conv_id.0.as_slice(), recipient_key, payload, now_ms() as i64],
)?;
Ok(())
}
pub fn load_pending_outbox(&self) -> anyhow::Result<Vec<OutboxEntry>> {
let mut stmt = self.conn.prepare(
"SELECT id, conversation_id, recipient_key, payload, retry_count
FROM outbox WHERE status = 'pending' ORDER BY created_at_ms",
)?;
let rows = stmt.query_map([], |row| {
let id: i64 = row.get(0)?;
let conv_blob: Vec<u8> = row.get(1)?;
let recipient_key: Vec<u8> = row.get(2)?;
let payload: Vec<u8> = row.get(3)?;
let retry_count: u32 = row.get(4)?;
Ok(OutboxEntry {
id,
conversation_id: ConversationId::from_slice(&conv_blob)
.unwrap_or(ConversationId([0; 16])),
recipient_key,
payload,
retry_count,
})
})?;
let mut entries = Vec::new();
for row in rows {
entries.push(row?);
}
Ok(entries)
}
pub fn mark_outbox_sent(&self, id: i64) -> anyhow::Result<()> {
self.conn.execute(
"UPDATE outbox SET status = 'sent' WHERE id = ?1",
params![id],
)?;
Ok(())
}
pub fn mark_outbox_failed(&self, id: i64, retry_count: u32) -> anyhow::Result<()> {
let new_status = if retry_count > 5 { "failed" } else { "pending" };
self.conn.execute(
"UPDATE outbox SET retry_count = ?2, status = ?3 WHERE id = ?1",
params![id, retry_count, new_status],
)?;
Ok(())
}
}
/// An entry in the offline outbox queue.
#[derive(Clone, Debug)]
pub struct OutboxEntry {
pub id: i64,
pub conversation_id: ConversationId,
pub recipient_key: Vec<u8>,
pub payload: Vec<u8>,
pub retry_count: u32,
}
pub fn now_ms() -> u64 {

View File

@@ -0,0 +1,148 @@
//! mDNS-based peer discovery for Freifunk / community mesh deployments.
//!
//! Browse for `_quicproquo._udp.local.` services on the local network and
//! surface them as [`DiscoveredPeer`] structs. Servers announce themselves
//! automatically on startup; this module lets clients find them without manual
//! configuration.
//!
//! # Usage
//!
//! ```no_run
//! use quicproquo_client::client::mesh_discovery::MeshDiscovery;
//!
//! let disc = MeshDiscovery::start()?;
//! // Give mDNS time to collect announcements before reading.
//! std::thread::sleep(std::time::Duration::from_secs(2));
//! for peer in disc.peers() {
//! println!("found: {} at {}", peer.domain, peer.server_addr);
//! }
//! # Ok::<(), quicproquo_client::client::mesh_discovery::MeshDiscoveryError>(())
//! ```
#[cfg(feature = "mesh")]
use mdns_sd::{ServiceDaemon, ServiceEvent};
use std::net::SocketAddr;
#[cfg(feature = "mesh")]
use std::sync::{Arc, Mutex};
#[cfg(feature = "mesh")]
use std::collections::HashMap;
/// A qpq server discovered on the local network via mDNS.
#[derive(Debug, Clone)]
pub struct DiscoveredPeer {
/// Federation domain of the remote server (e.g. `"node1.freifunk.net"`).
pub domain: String,
/// QUIC RPC address to connect to.
pub server_addr: SocketAddr,
}
/// A running mDNS browse session.
///
/// Starts immediately on construction; drop to stop browsing.
pub struct MeshDiscovery {
#[cfg(feature = "mesh")]
_daemon: ServiceDaemon,
#[cfg(feature = "mesh")]
peers: Arc<Mutex<HashMap<String, DiscoveredPeer>>>,
}
#[derive(thiserror::Error, Debug)]
pub enum MeshDiscoveryError {
#[error("mDNS daemon failed to start: {0}")]
DaemonError(String),
#[error("mDNS browse failed: {0}")]
BrowseError(String),
#[error("mesh feature not compiled (rebuild with --features mesh)")]
FeatureDisabled,
}
impl MeshDiscovery {
/// Start browsing for `_quicproquo._udp.local.` services.
///
/// Returns immediately; peers are collected in the background.
/// Returns [`MeshDiscoveryError::FeatureDisabled`] when built without the
/// `mesh` feature.
pub fn start() -> Result<Self, MeshDiscoveryError> {
#[cfg(feature = "mesh")]
{
Self::start_inner()
}
#[cfg(not(feature = "mesh"))]
{
Err(MeshDiscoveryError::FeatureDisabled)
}
}
#[cfg(feature = "mesh")]
fn start_inner() -> Result<Self, MeshDiscoveryError> {
let daemon = ServiceDaemon::new()
.map_err(|e| MeshDiscoveryError::DaemonError(e.to_string()))?;
let receiver = daemon
.browse("_quicproquo._udp.local.")
.map_err(|e| MeshDiscoveryError::BrowseError(e.to_string()))?;
let peers: Arc<Mutex<HashMap<String, DiscoveredPeer>>> =
Arc::new(Mutex::new(HashMap::new()));
let peers_bg = Arc::clone(&peers);
// Process mDNS events in a background thread (ServiceDaemon is sync).
std::thread::spawn(move || {
for event in receiver {
match event {
ServiceEvent::ServiceResolved(info) => {
// Extract the qpq server address from TXT records.
let server_addr_str = info
.get_property_val_str("server")
.map(|s| s.to_string());
let domain = info
.get_property_val_str("domain")
.map(|s| s.to_string())
.unwrap_or_else(|| info.get_fullname().to_string());
if let Some(addr_str) = server_addr_str {
if let Ok(addr) = addr_str.parse::<SocketAddr>() {
let peer = DiscoveredPeer {
domain: domain.clone(),
server_addr: addr,
};
if let Ok(mut map) = peers_bg.lock() {
map.insert(domain, peer);
}
}
}
}
ServiceEvent::ServiceRemoved(_ty, fullname) => {
if let Ok(mut map) = peers_bg.lock() {
map.retain(|_, p| {
!fullname.contains(&p.domain)
});
}
}
// Other events (SearchStarted, SearchStopped) are informational.
_ => {}
}
}
});
Ok(Self {
_daemon: daemon,
peers,
})
}
/// Return a snapshot of all peers discovered so far.
pub fn peers(&self) -> Vec<DiscoveredPeer> {
#[cfg(feature = "mesh")]
{
self.peers
.lock()
.map(|m| m.values().cloned().collect())
.unwrap_or_default()
}
#[cfg(not(feature = "mesh"))]
{
vec![]
}
}
}

View File

@@ -2,6 +2,7 @@ pub mod commands;
pub mod conversation;
pub mod display;
pub mod hex;
pub mod mesh_discovery;
pub mod repl;
pub mod retry;
pub mod rpc;

View File

@@ -54,6 +54,9 @@ enum SlashCommand {
Join,
Members,
History { count: usize },
/// Mesh subcommands: /mesh peers, /mesh server <addr>
MeshPeers,
MeshServer { addr: String },
}
fn parse_input(line: &str) -> Input {
@@ -116,6 +119,22 @@ fn parse_input(line: &str) -> Input {
let count = arg.and_then(|s| s.parse().ok()).unwrap_or(20);
Input::Slash(SlashCommand::History { count })
}
"/mesh" => match arg.as_deref() {
Some("peers") => Input::Slash(SlashCommand::MeshPeers),
Some(rest) if rest.starts_with("server ") => {
let addr = rest.trim_start_matches("server ").trim().to_string();
if addr.is_empty() {
display::print_error("usage: /mesh server <host:port>");
Input::Empty
} else {
Input::Slash(SlashCommand::MeshServer { addr })
}
}
_ => {
display::print_error("usage: /mesh peers | /mesh server <host:port>");
Input::Empty
}
},
_ => {
display::print_error(&format!("unknown command: {cmd}. Try /help"));
Input::Empty
@@ -575,6 +594,13 @@ async fn handle_slash(
SlashCommand::Join => cmd_join(session, client).await,
SlashCommand::Members => cmd_members(session),
SlashCommand::History { count } => cmd_history(session, count),
SlashCommand::MeshPeers => cmd_mesh_peers(),
SlashCommand::MeshServer { addr } => {
display::print_status(&format!(
"mesh server hint: reconnect with --server {addr} to use this node"
));
Ok(())
}
};
if let Err(e) = result {
display::print_error(&format!("{e:#}"));
@@ -594,9 +620,40 @@ fn print_help() {
display::print_status(" /members - Show members of current conversation");
display::print_status(" /history [N] - Show last N messages (default: 20)");
display::print_status(" /whoami - Show your identity");
display::print_status(" /mesh peers - Discover nearby qpq nodes via mDNS");
display::print_status(" /mesh server <host:port> - Show how to reconnect to a mesh node");
display::print_status(" /quit - Exit");
}
/// Discover nearby qpq servers via mDNS (requires `--features mesh` build).
fn cmd_mesh_peers() -> anyhow::Result<()> {
use super::mesh_discovery::MeshDiscovery;
match MeshDiscovery::start() {
Err(e) => {
display::print_error(&format!("mesh discovery: {e}"));
return Ok(());
}
Ok(disc) => {
display::print_status("scanning for nearby qpq nodes (2s)...");
// Block briefly to collect mDNS announcements from the local network.
std::thread::sleep(std::time::Duration::from_secs(2));
let peers = disc.peers();
if peers.is_empty() {
display::print_status("no qpq nodes found on the local network");
} else {
display::print_status(&format!("found {} node(s):", peers.len()));
for p in &peers {
display::print_status(&format!(" {} at {}", p.domain, p.server_addr));
}
display::print_status("use: /mesh server <host:port> to note the address,");
display::print_status("then reconnect with: qpq --server <host:port>");
}
}
}
Ok(())
}
fn cmd_whoami(session: &SessionState) -> anyhow::Result<()> {
display::print_status(&format!(
"identity: {}",
@@ -725,9 +782,23 @@ async fn cmd_dm(
return Ok(());
}
// Create server-side channel.
// Create or look up the server-side channel.
// was_new=true → this call created the channel; we are the MLS initiator.
// was_new=false → channel already existed; peer is the MLS initiator and has
// sent (or will send) us a Welcome. Wait for try_auto_join.
display::print_status("creating channel...");
let channel_id = create_channel(client, &peer_key).await?;
let (channel_id, was_new) = create_channel(client, &peer_key).await?;
if !was_new {
// Peer is the MLS initiator. Their Welcome is en route; the background
// poller's try_auto_join will process it within the next poll interval
// and auto-switch to the conversation automatically.
display::print_status(&format!(
"DM channel with @{username} exists — peer is initiator, auto-joining via Welcome (arrives within ~1 s)"
));
return Ok(());
}
let conv_id = ConversationId::from_slice(&channel_id)
.context("server returned invalid channel_id length")?;

View File

@@ -645,12 +645,16 @@ pub async fn resolve_identity(
}
}
/// Create a 1:1 DM channel with a peer. Returns the 16-byte channel ID.
/// If a channel already exists between the two users, returns the existing ID.
/// Create a 1:1 DM channel with a peer.
///
/// Returns `(channel_id, was_new)` where `channel_id` is the stable 16-byte identifier and
/// `was_new` is `true` iff this call created the channel for the first time. When `was_new` is
/// `false`, the channel already existed (created by the peer), and the caller should wait for
/// the peer's MLS Welcome to arrive via the background poller rather than creating a new MLS group.
pub async fn create_channel(
client: &node_service::Client,
peer_key: &[u8],
) -> anyhow::Result<Vec<u8>> {
) -> anyhow::Result<(Vec<u8>, bool)> {
let mut req = client.create_channel_request();
{
let mut p = req.get();
@@ -665,14 +669,14 @@ pub async fn create_channel(
.await
.context("create_channel RPC failed")?;
let channel_id = resp
.get()
.context("create_channel: bad response")?
let reader = resp.get().context("create_channel: bad response")?;
let channel_id = reader
.get_channel_id()
.context("create_channel: missing channel_id")?
.to_vec();
let was_new = reader.get_was_new();
Ok(channel_id)
Ok((channel_id, was_new))
}
/// Return the current Unix timestamp in milliseconds.

View File

@@ -133,6 +133,8 @@ impl SessionState {
unread_count: 0,
last_activity_ms: now_ms(),
created_at_ms: now_ms(),
is_hybrid: false,
last_seen_seq: 0,
};
self.conv_store.save_conversation(&conv)?;
@@ -171,7 +173,7 @@ impl SessionState {
Arc::clone(&self.identity),
ks,
group,
false, // existing conversations default to classical
conv.is_hybrid,
))
}

View File

@@ -22,11 +22,11 @@ pub use client::commands::{
cmd_chat, cmd_check_key, cmd_create_group, cmd_demo_group, cmd_fetch_key, cmd_health,
cmd_health_json, cmd_invite, cmd_join, cmd_login, cmd_ping, cmd_recv, cmd_register,
cmd_register_state, cmd_refresh_keypackage, cmd_register_user, cmd_send, cmd_whoami,
receive_pending_plaintexts, whoami_json,
opaque_login, receive_pending_plaintexts, whoami_json,
};
pub use client::repl::run_repl;
pub use client::rpc::{connect_node, enqueue, fetch_wait};
pub use client::rpc::{connect_node, create_channel, enqueue, fetch_wait, resolve_user};
// Global auth context — RwLock so the REPL can set it after OPAQUE login.
pub(crate) static AUTH_CONTEXT: RwLock<Option<ClientAuth>> = RwLock::new(None);

View File

@@ -1,7 +1,7 @@
// cargo_bin! only works for current package's binary; we spawn qpq-server from another package.
#![allow(deprecated)]
use std::{path::PathBuf, process::Command, time::Duration};
use std::{path::PathBuf, process::Command, sync::Mutex, time::Duration};
use assert_cmd::cargo::cargo_bin;
use portpicker::pick_unused_port;
@@ -17,11 +17,15 @@ fn ensure_rustls_provider() {
use quicproquo_client::{
cmd_create_group, cmd_invite, cmd_join, cmd_login, cmd_ping, cmd_register_state,
cmd_register_user, cmd_send, connect_node, enqueue, fetch_wait, init_auth,
receive_pending_plaintexts, ClientAuth,
cmd_register_user, cmd_send, connect_node, create_channel, enqueue, fetch_wait, init_auth,
opaque_login, receive_pending_plaintexts, resolve_user, ClientAuth,
};
use quicproquo_core::IdentityKeypair;
/// Serialises all tests that call `init_auth` with a non-devtoken session to prevent
/// the global `AUTH_CONTEXT` from being overwritten by concurrent tests.
static AUTH_LOCK: Mutex<()> = Mutex::new(());
fn hex_encode(bytes: &[u8]) -> String {
bytes.iter().map(|b| format!("{b:02x}")).collect()
}
@@ -33,6 +37,13 @@ struct StoredStateCompat {
group: Option<Vec<u8>>,
}
struct ChildGuard(std::process::Child);
impl Drop for ChildGuard {
fn drop(&mut self) {
let _ = self.0.kill();
}
}
async fn wait_for_health(server: &str, ca_cert: &PathBuf, server_name: &str) -> anyhow::Result<()> {
let local = tokio::task::LocalSet::new();
for _ in 0..30 {
@@ -48,26 +59,17 @@ async fn wait_for_health(server: &str, ca_cert: &PathBuf, server_name: &str) ->
anyhow::bail!("server health never became ready")
}
/// Creator and joiner register; creator creates group and invites joiner; joiner joins;
/// creator sends a message; assert joiner's mailbox receives it.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn e2e_happy_path_register_invite_join_send_recv() -> anyhow::Result<()> {
ensure_rustls_provider();
let temp = TempDir::new()?;
let base = temp.path();
/// Spawns a server with the given extra args and returns (listen_addr, ca_cert_path, ChildGuard).
fn spawn_server(base: &std::path::Path, extra_args: &[&str]) -> (String, PathBuf, ChildGuard) {
let port = pick_unused_port().expect("free port");
let listen = format!("127.0.0.1:{port}");
let server = listen.clone();
let ca_cert = base.join("server-cert.der");
let tls_key = base.join("server-key.der");
let data_dir = base.join("data");
let auth_token = "devtoken";
// Spawn server binary.
let server_bin = cargo_bin("qpq-server");
let child = Command::new(server_bin)
.arg("--listen")
let mut cmd = Command::new(server_bin);
cmd.arg("--listen")
.arg(&listen)
.arg("--data-dir")
.arg(&data_dir)
@@ -76,25 +78,30 @@ async fn e2e_happy_path_register_invite_join_send_recv() -> anyhow::Result<()> {
.arg("--tls-key")
.arg(&tls_key)
.arg("--auth-token")
.arg(auth_token)
.arg("--allow-insecure-auth")
.spawn()
.expect("spawn server");
// Ensure we always terminate the child.
struct ChildGuard(std::process::Child);
impl Drop for ChildGuard {
fn drop(&mut self) {
let _ = self.0.kill();
.arg("devtoken")
.arg("--allow-insecure-auth");
for arg in extra_args {
cmd.arg(arg);
}
}
let child_guard = ChildGuard(child);
let _ = child_guard;
let child = cmd.spawn().expect("spawn server");
(listen, ca_cert, ChildGuard(child))
}
// ─── existing tests (fixed: add --sealed-sender so enqueue works with bearer token) ─────────────
/// Creator and joiner register; creator creates group and invites joiner; joiner joins;
/// creator sends a message; assert joiner's mailbox receives it.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn e2e_happy_path_register_invite_join_send_recv() -> anyhow::Result<()> {
ensure_rustls_provider();
let temp = TempDir::new()?;
let base = temp.path();
let auth_token = "devtoken";
let (server, ca_cert, _child) = spawn_server(base, &["--sealed-sender"]);
// Wait for server to be healthy and certs to be generated.
wait_for_health(&server, &ca_cert, "localhost").await?;
// Set client auth context.
init_auth(ClientAuth::from_parts(auth_token.to_string(), None));
let local = tokio::task::LocalSet::new();
@@ -179,37 +186,9 @@ async fn e2e_three_party_group_invite_join_send_recv() -> anyhow::Result<()> {
let temp = TempDir::new()?;
let base = temp.path();
let port = pick_unused_port().expect("free port");
let listen = format!("127.0.0.1:{port}");
let server = listen.clone();
let ca_cert = base.join("server-cert.der");
let tls_key = base.join("server-key.der");
let data_dir = base.join("data");
let auth_token = "devtoken";
let server_bin = cargo_bin("qpq-server");
let child = Command::new(server_bin)
.arg("--listen")
.arg(&listen)
.arg("--data-dir")
.arg(&data_dir)
.arg("--tls-cert")
.arg(&ca_cert)
.arg("--tls-key")
.arg(&tls_key)
.arg("--auth-token")
.arg(auth_token)
.arg("--allow-insecure-auth")
.spawn()
.expect("spawn server");
struct ChildGuard(std::process::Child);
impl Drop for ChildGuard {
fn drop(&mut self) {
let _ = self.0.kill();
}
}
let _child_guard = ChildGuard(child);
let (server, ca_cert, _child) = spawn_server(base, &["--sealed-sender"]);
wait_for_health(&server, &ca_cert, "localhost").await?;
init_auth(ClientAuth::from_parts(auth_token.to_string(), None));
@@ -388,46 +367,16 @@ async fn e2e_three_party_group_invite_join_send_recv() -> anyhow::Result<()> {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn e2e_login_rejects_mismatched_identity() -> anyhow::Result<()> {
ensure_rustls_provider();
let _auth = AUTH_LOCK.lock().unwrap();
let temp = TempDir::new()?;
let base = temp.path();
let port = pick_unused_port().expect("free port");
let listen = format!("127.0.0.1:{port}");
let server = listen.clone();
let ca_cert = base.join("server-cert.der");
let tls_key = base.join("server-key.der");
let data_dir = base.join("data");
let auth_token = "devtoken";
// Spawn server binary.
let server_bin = cargo_bin("qpq-server");
let child = Command::new(server_bin)
.arg("--listen")
.arg(&listen)
.arg("--data-dir")
.arg(&data_dir)
.arg("--tls-cert")
.arg(&ca_cert)
.arg("--tls-key")
.arg(&tls_key)
.arg("--auth-token")
.arg(auth_token)
.arg("--allow-insecure-auth")
.spawn()
.expect("spawn server");
struct ChildGuard(std::process::Child);
impl Drop for ChildGuard {
fn drop(&mut self) {
let _ = self.0.kill();
}
}
let child_guard = ChildGuard(child);
let _ = child_guard;
let (server, ca_cert, _child) = spawn_server(base, &[]);
wait_for_health(&server, &ca_cert, "localhost").await?;
init_auth(ClientAuth::from_parts(auth_token.to_string(), None));
init_auth(ClientAuth::from_parts("devtoken".to_string(), None));
let local = tokio::task::LocalSet::new();
let state_path = base.join("user.bin");
@@ -482,7 +431,6 @@ async fn e2e_login_rejects_mismatched_identity() -> anyhow::Result<()> {
match result {
Ok(_) => anyhow::bail!("login unexpectedly succeeded with mismatched identity"),
Err(e) => {
// Show the full error chain so we can match the server's E016 response.
let msg = format!("{e:#}");
anyhow::ensure!(
msg.contains("identity") || msg.contains("E016"),
@@ -501,41 +449,11 @@ async fn e2e_sealed_sender_enqueue_then_fetch() -> anyhow::Result<()> {
let temp = TempDir::new()?;
let base = temp.path();
let port = pick_unused_port().expect("free port");
let listen = format!("127.0.0.1:{port}");
let server = listen.clone();
let ca_cert = base.join("server-cert.der");
let tls_key = base.join("server-key.der");
let data_dir = base.join("data");
let auth_token = "devtoken";
let server_bin = cargo_bin("qpq-server");
let child = Command::new(server_bin)
.arg("--listen")
.arg(&listen)
.arg("--data-dir")
.arg(&data_dir)
.arg("--tls-cert")
.arg(&ca_cert)
.arg("--tls-key")
.arg(&tls_key)
.arg("--auth-token")
.arg(auth_token)
.arg("--allow-insecure-auth")
.arg("--sealed-sender")
.spawn()
.expect("spawn server");
struct ChildGuard(std::process::Child);
impl Drop for ChildGuard {
fn drop(&mut self) {
let _ = self.0.kill();
}
}
let _child_guard = ChildGuard(child);
let (server, ca_cert, _child) = spawn_server(base, &["--sealed-sender"]);
wait_for_health(&server, &ca_cert, "localhost").await?;
init_auth(ClientAuth::from_parts(auth_token.to_string(), None));
init_auth(ClientAuth::from_parts("devtoken".to_string(), None));
let local = tokio::task::LocalSet::new();
let state_path = base.join("recipient.bin");
@@ -595,3 +513,425 @@ async fn e2e_sealed_sender_enqueue_then_fetch() -> anyhow::Result<()> {
Ok(())
}
// ─── new tests: was_new semantics, resolve_user, DM MLS flow ─────────────────────────────────
/// `create_channel` returns `was_new=true` for the first caller and `was_new=false` for the
/// second, and both callers receive the same stable `channel_id` regardless of argument order.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn e2e_create_channel_was_new_semantics() -> anyhow::Result<()> {
ensure_rustls_provider();
// Holds AUTH_CONTEXT for the duration of this test.
let _auth = AUTH_LOCK.lock().unwrap();
let temp = TempDir::new()?;
let base = temp.path();
// No --sealed-sender: create_channel requires identity-bound session.
let (server, ca_cert, _child) = spawn_server(base, &[]);
wait_for_health(&server, &ca_cert, "localhost").await?;
// Register identity states (uses devtoken / allow-insecure for upload_key_package).
init_auth(ClientAuth::from_parts("devtoken".to_string(), None));
let local = tokio::task::LocalSet::new();
let alice_state = base.join("alice.bin");
let bob_state = base.join("bob.bin");
local
.run_until(cmd_register_state(&alice_state, &server, &ca_cert, "localhost", None))
.await?;
local
.run_until(cmd_register_state(&bob_state, &server, &ca_cert, "localhost", None))
.await?;
let alice_seed: [u8; 32] = bincode::deserialize::<StoredStateCompat>(&std::fs::read(&alice_state)?)?.identity_seed;
let bob_seed: [u8; 32] = bincode::deserialize::<StoredStateCompat>(&std::fs::read(&bob_state)?)?.identity_seed;
let alice_pk = IdentityKeypair::from_seed(alice_seed).public_key_bytes().to_vec();
let bob_pk = IdentityKeypair::from_seed(bob_seed).public_key_bytes().to_vec();
let alice_pk_hex = hex_encode(&alice_pk);
let bob_pk_hex = hex_encode(&bob_pk);
// OPAQUE register (unauthenticated — no AUTH_CONTEXT needed).
local
.run_until(cmd_register_user(&server, &ca_cert, "localhost", "alice", "pass", Some(&alice_pk_hex)))
.await?;
local
.run_until(cmd_register_user(&server, &ca_cert, "localhost", "bob", "pass", Some(&bob_pk_hex)))
.await?;
// Alice OPAQUE login → identity-bound session.
let client = local.run_until(connect_node(&server, &ca_cert, "localhost")).await?;
let session_alice = local
.run_until(opaque_login(&client, "alice", "pass", &alice_pk))
.await?;
init_auth(ClientAuth::from_raw(session_alice, None));
let (ch_alice, was_new_alice) = local
.run_until(create_channel(&client, &bob_pk))
.await?;
anyhow::ensure!(was_new_alice, "Alice's create_channel must return was_new=true");
anyhow::ensure!(ch_alice.len() == 16, "channel_id must be 16 bytes");
// Bob OPAQUE login → identity-bound session.
let session_bob = local
.run_until(opaque_login(&client, "bob", "pass", &bob_pk))
.await?;
init_auth(ClientAuth::from_raw(session_bob, None));
let (ch_bob, was_new_bob) = local
.run_until(create_channel(&client, &alice_pk))
.await?;
anyhow::ensure!(!was_new_bob, "Bob's create_channel must return was_new=false (channel already exists)");
anyhow::ensure!(
ch_alice == ch_bob,
"Both callers must receive the same channel_id (got alice={} bob={})",
hex_encode(&ch_alice),
hex_encode(&ch_bob)
);
Ok(())
}
/// `resolve_user` returns the identity key when the user registered WITH one,
/// and returns `None` when the user registered WITHOUT an identity key.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn e2e_resolve_user_requires_identity_key_binding() -> anyhow::Result<()> {
ensure_rustls_provider();
let _auth = AUTH_LOCK.lock().unwrap();
let temp = TempDir::new()?;
let base = temp.path();
let (server, ca_cert, _child) = spawn_server(base, &[]);
wait_for_health(&server, &ca_cert, "localhost").await?;
init_auth(ClientAuth::from_parts("devtoken".to_string(), None));
let local = tokio::task::LocalSet::new();
// Generate Alice's identity (bound) and Bob's identity (unbound).
let alice_state = base.join("alice.bin");
local
.run_until(cmd_register_state(&alice_state, &server, &ca_cert, "localhost", None))
.await?;
let alice_seed = bincode::deserialize::<StoredStateCompat>(&std::fs::read(&alice_state)?)?.identity_seed;
let alice_pk = IdentityKeypair::from_seed(alice_seed).public_key_bytes().to_vec();
let alice_pk_hex = hex_encode(&alice_pk);
// Alice registers WITH identity key.
local
.run_until(cmd_register_user(&server, &ca_cert, "localhost", "alice", "pass", Some(&alice_pk_hex)))
.await?;
// Bob registers WITHOUT identity key.
local
.run_until(cmd_register_user(&server, &ca_cert, "localhost", "bob", "pass", None))
.await?;
// resolve_user needs a valid auth context (devtoken is sufficient — just needs bearer token).
let client = local.run_until(connect_node(&server, &ca_cert, "localhost")).await?;
let alice_resolved = local
.run_until(resolve_user(&client, "alice"))
.await?;
anyhow::ensure!(
alice_resolved == Some(alice_pk.clone()),
"resolve_user('alice') must return alice's identity key, got {:?}",
alice_resolved.as_ref().map(|k| hex_encode(k))
);
let bob_resolved = local
.run_until(resolve_user(&client, "bob"))
.await?;
anyhow::ensure!(
bob_resolved.is_none(),
"resolve_user('bob') must return None (no identity key bound), got {:?}",
bob_resolved.as_ref().map(|k| hex_encode(k))
);
let ghost_resolved = local
.run_until(resolve_user(&client, "nonexistent"))
.await?;
anyhow::ensure!(
ghost_resolved.is_none(),
"resolve_user('nonexistent') must return None"
);
Ok(())
}
/// Both Alice and Bob call `/dm` on each other (simultaneous DM initiation).
/// Only the first caller (was_new=true) creates the MLS group and sends a Welcome.
/// The second caller (was_new=false) joins via the Welcome.
/// After joining, Alice sends a message and Bob decrypts it with no epoch error.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn e2e_bidirectional_dm_mls_no_epoch_conflict() -> anyhow::Result<()> {
ensure_rustls_provider();
let _auth = AUTH_LOCK.lock().unwrap();
let temp = TempDir::new()?;
let base = temp.path();
// No --sealed-sender: tests the production path where enqueue requires identity session.
let (server, ca_cert, _child) = spawn_server(base, &[]);
wait_for_health(&server, &ca_cert, "localhost").await?;
// Register state files (uploads KeyPackages + hybrid keys) using devtoken.
init_auth(ClientAuth::from_parts("devtoken".to_string(), None));
let local = tokio::task::LocalSet::new();
let alice_state = base.join("alice.bin");
let bob_state = base.join("bob.bin");
local
.run_until(cmd_register_state(&alice_state, &server, &ca_cert, "localhost", None))
.await?;
local
.run_until(cmd_register_state(&bob_state, &server, &ca_cert, "localhost", None))
.await?;
let alice_seed = bincode::deserialize::<StoredStateCompat>(&std::fs::read(&alice_state)?)?.identity_seed;
let bob_seed = bincode::deserialize::<StoredStateCompat>(&std::fs::read(&bob_state)?)?.identity_seed;
let alice_pk = IdentityKeypair::from_seed(alice_seed).public_key_bytes().to_vec();
let bob_pk = IdentityKeypair::from_seed(bob_seed).public_key_bytes().to_vec();
let alice_pk_hex = hex_encode(&alice_pk);
let bob_pk_hex = hex_encode(&bob_pk);
// OPAQUE register both users.
local
.run_until(cmd_register_user(&server, &ca_cert, "localhost", "alice", "pass", Some(&alice_pk_hex)))
.await?;
local
.run_until(cmd_register_user(&server, &ca_cert, "localhost", "bob", "pass", Some(&bob_pk_hex)))
.await?;
// Alice logs in and calls create_channel → must get was_new=true.
let client = local.run_until(connect_node(&server, &ca_cert, "localhost")).await?;
let session_alice = local
.run_until(opaque_login(&client, "alice", "pass", &alice_pk))
.await?;
init_auth(ClientAuth::from_raw(session_alice.clone(), None));
let (channel_id, was_new_alice) = local
.run_until(create_channel(&client, &bob_pk))
.await?;
anyhow::ensure!(was_new_alice, "Alice must get was_new=true");
// Alice creates MLS group (channel_id as group name) and invites Bob.
local
.run_until(cmd_create_group(&alice_state, &server, &hex_encode(&channel_id), None))
.await?;
local
.run_until(cmd_invite(&alice_state, &server, &ca_cert, "localhost", &bob_pk_hex, None))
.await?;
// Bob logs in and calls create_channel → must get was_new=false with same channel_id.
let session_bob = local
.run_until(opaque_login(&client, "bob", "pass", &bob_pk))
.await?;
init_auth(ClientAuth::from_raw(session_bob.clone(), None));
let (channel_id_bob, was_new_bob) = local
.run_until(create_channel(&client, &alice_pk))
.await?;
anyhow::ensure!(!was_new_bob, "Bob must get was_new=false (Alice created first)");
anyhow::ensure!(
channel_id == channel_id_bob,
"Both sides must see the same channel_id"
);
// Bob joins via Welcome that Alice sent (was_new=false path: no group creation, just join).
local
.run_until(cmd_join(&bob_state, &server, &ca_cert, "localhost", None))
.await?;
// Alice sends "hello" to Bob.
init_auth(ClientAuth::from_raw(session_alice, None));
local
.run_until(cmd_send(
&alice_state,
&server,
&ca_cert,
"localhost",
Some(&bob_pk_hex),
false,
"hello from alice",
None,
))
.await?;
// Bob receives and decrypts — no epoch conflict.
init_auth(ClientAuth::from_raw(session_bob, None));
let plaintexts = local
.run_until(receive_pending_plaintexts(
&bob_state,
&server,
&ca_cert,
"localhost",
1000,
None,
))
.await?;
anyhow::ensure!(
plaintexts.iter().any(|p| p.as_slice() == b"hello from alice"),
"Bob must decrypt Alice's message without epoch error; got {:?}",
plaintexts.iter().map(|p| String::from_utf8_lossy(p).to_string()).collect::<Vec<_>>()
);
Ok(())
}
/// Send 10 messages alternating Alice→Bob and Bob→Alice through an MLS DM channel.
/// All messages must decrypt successfully, proving epoch stays in sync.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn e2e_dm_multi_message_epoch_synchronized() -> anyhow::Result<()> {
ensure_rustls_provider();
let _auth = AUTH_LOCK.lock().unwrap();
let temp = TempDir::new()?;
let base = temp.path();
let (server, ca_cert, _child) = spawn_server(base, &[]);
wait_for_health(&server, &ca_cert, "localhost").await?;
init_auth(ClientAuth::from_parts("devtoken".to_string(), None));
let local = tokio::task::LocalSet::new();
let alice_state = base.join("alice.bin");
let bob_state = base.join("bob.bin");
local
.run_until(cmd_register_state(&alice_state, &server, &ca_cert, "localhost", None))
.await?;
local
.run_until(cmd_register_state(&bob_state, &server, &ca_cert, "localhost", None))
.await?;
let alice_seed = bincode::deserialize::<StoredStateCompat>(&std::fs::read(&alice_state)?)?.identity_seed;
let bob_seed = bincode::deserialize::<StoredStateCompat>(&std::fs::read(&bob_state)?)?.identity_seed;
let alice_pk = IdentityKeypair::from_seed(alice_seed).public_key_bytes().to_vec();
let bob_pk = IdentityKeypair::from_seed(bob_seed).public_key_bytes().to_vec();
let alice_pk_hex = hex_encode(&alice_pk);
let bob_pk_hex = hex_encode(&bob_pk);
local
.run_until(cmd_register_user(&server, &ca_cert, "localhost", "alice", "pass", Some(&alice_pk_hex)))
.await?;
local
.run_until(cmd_register_user(&server, &ca_cert, "localhost", "bob", "pass", Some(&bob_pk_hex)))
.await?;
let client = local.run_until(connect_node(&server, &ca_cert, "localhost")).await?;
// Alice creates the DM channel and invites Bob.
let session_alice = local
.run_until(opaque_login(&client, "alice", "pass", &alice_pk))
.await?;
init_auth(ClientAuth::from_raw(session_alice.clone(), None));
let (channel_id, was_new) = local
.run_until(create_channel(&client, &bob_pk))
.await?;
anyhow::ensure!(was_new, "first create_channel must be was_new=true");
local
.run_until(cmd_create_group(&alice_state, &server, &hex_encode(&channel_id), None))
.await?;
local
.run_until(cmd_invite(&alice_state, &server, &ca_cert, "localhost", &bob_pk_hex, None))
.await?;
// Bob joins.
let session_bob = local
.run_until(opaque_login(&client, "bob", "pass", &bob_pk))
.await?;
init_auth(ClientAuth::from_raw(session_bob.clone(), None));
local
.run_until(cmd_join(&bob_state, &server, &ca_cert, "localhost", None))
.await?;
// 10 messages: Alice→Bob on even, Bob→Alice on odd.
for i in 0u32..10 {
let msg = format!("msg_{i}");
if i % 2 == 0 {
// Alice sends to Bob.
init_auth(ClientAuth::from_raw(session_alice.clone(), None));
local
.run_until(cmd_send(
&alice_state,
&server,
&ca_cert,
"localhost",
Some(&bob_pk_hex),
false,
&msg,
None,
))
.await?;
// Bob receives.
init_auth(ClientAuth::from_raw(session_bob.clone(), None));
let plaintexts = local
.run_until(receive_pending_plaintexts(
&bob_state,
&server,
&ca_cert,
"localhost",
1000,
None,
))
.await?;
anyhow::ensure!(
plaintexts.iter().any(|p| p.as_slice() == msg.as_bytes()),
"Bob did not receive '{msg}' at iteration {i}; got {:?}",
plaintexts.iter().map(|p| String::from_utf8_lossy(p).to_string()).collect::<Vec<_>>()
);
} else {
// Bob sends to Alice.
init_auth(ClientAuth::from_raw(session_bob.clone(), None));
local
.run_until(cmd_send(
&bob_state,
&server,
&ca_cert,
"localhost",
Some(&alice_pk_hex),
false,
&msg,
None,
))
.await?;
// Alice receives.
init_auth(ClientAuth::from_raw(session_alice.clone(), None));
let plaintexts = local
.run_until(receive_pending_plaintexts(
&alice_state,
&server,
&ca_cert,
"localhost",
1000,
None,
))
.await?;
anyhow::ensure!(
plaintexts.iter().any(|p| p.as_slice() == msg.as_bytes()),
"Alice did not receive '{msg}' at iteration {i}; got {:?}",
plaintexts.iter().map(|p| String::from_utf8_lossy(p).to_string()).collect::<Vec<_>>()
);
}
}
Ok(())
}

View File

@@ -677,17 +677,17 @@ mod tests {
joiner.join_group(&welcome).expect("joiner join group");
let ct_creator = creator.send_message(b"hello").expect("creator send");
let pt_joiner = joiner
.receive_message(&ct_creator)
.expect("joiner recv")
.expect("application message");
let pt_joiner = match joiner.receive_message(&ct_creator).expect("joiner recv") {
ReceivedMessage::Application(pt) => pt,
other => panic!("expected Application, got {other:?}"),
};
assert_eq!(pt_joiner, b"hello");
let ct_joiner = joiner.send_message(b"hello back").expect("joiner send");
let pt_creator = creator
.receive_message(&ct_joiner)
.expect("creator recv")
.expect("application message");
let pt_creator = match creator.receive_message(&ct_joiner).expect("creator recv") {
ReceivedMessage::Application(pt) => pt,
other => panic!("expected Application, got {other:?}"),
};
assert_eq!(pt_creator, b"hello back");
}
@@ -718,17 +718,17 @@ mod tests {
joiner.join_group(&welcome).expect("joiner join hybrid group");
let ct_creator = creator.send_message(b"hello PQ").expect("creator send");
let pt_joiner = joiner
.receive_message(&ct_creator)
.expect("joiner recv")
.expect("application message");
let pt_joiner = match joiner.receive_message(&ct_creator).expect("joiner recv") {
ReceivedMessage::Application(pt) => pt,
other => panic!("expected Application, got {other:?}"),
};
assert_eq!(pt_joiner, b"hello PQ");
let ct_joiner = joiner.send_message(b"quantum safe!").expect("joiner send");
let pt_creator = creator
.receive_message(&ct_joiner)
.expect("creator recv")
.expect("application message");
let pt_creator = match creator.receive_message(&ct_joiner).expect("creator recv") {
ReceivedMessage::Application(pt) => pt,
other => panic!("expected Application, got {other:?}"),
};
assert_eq!(pt_creator, b"quantum safe!");
}
@@ -746,4 +746,278 @@ mod tests {
"group_id must match what was passed"
);
}
/// Helper: set up a 3-party group (creator + A + B).
fn setup_three_party(hybrid: bool) -> (GroupMember, GroupMember, GroupMember) {
let creator_id = Arc::new(IdentityKeypair::generate());
let a_id = Arc::new(IdentityKeypair::generate());
let b_id = Arc::new(IdentityKeypair::generate());
let (mut creator, mut a, mut b) = if hybrid {
(
GroupMember::new_hybrid(creator_id),
GroupMember::new_hybrid(a_id),
GroupMember::new_hybrid(b_id),
)
} else {
(
GroupMember::new(creator_id),
GroupMember::new(a_id),
GroupMember::new(b_id),
)
};
let a_kp = a.generate_key_package().expect("A KeyPackage");
let b_kp = b.generate_key_package().expect("B KeyPackage");
creator.create_group(b"three-party").expect("create group");
// Add A
let (_commit_a, welcome_a) = creator.add_member(&a_kp).expect("add A");
a.join_group(&welcome_a).expect("A join");
// A must process the commit that added them (it's a StateChanged for A since
// the commit itself is what brought them in — but actually A joined via Welcome,
// so A doesn't process the add-commit). The creator already merged the pending
// commit in add_member, so creator is at epoch 2.
// Add B — at this point creator is at epoch 2 (after adding A).
let (commit_b, welcome_b) = creator.add_member(&b_kp).expect("add B");
b.join_group(&welcome_b).expect("B join");
// A must process the commit that added B to stay in sync.
match a.receive_message(&commit_b).expect("A recv add-B commit") {
ReceivedMessage::StateChanged => {}
other => panic!("expected StateChanged, got {other:?}"),
}
(creator, a, b)
}
/// Three-party hybrid MLS round-trip: all members exchange messages.
#[test]
fn three_party_hybrid_mls_round_trip() {
let (mut creator, mut a, mut b) = setup_three_party(true);
// Creator sends to A and B
let ct = creator.send_message(b"hello group").expect("creator send");
let pt_a = match a.receive_message(&ct).expect("A recv") {
ReceivedMessage::Application(pt) => pt,
other => panic!("expected Application, got {other:?}"),
};
let pt_b = match b.receive_message(&ct).expect("B recv") {
ReceivedMessage::Application(pt) => pt,
other => panic!("expected Application, got {other:?}"),
};
assert_eq!(pt_a, b"hello group");
assert_eq!(pt_b, b"hello group");
// A sends, creator and B receive
let ct_a = a.send_message(b"from A").expect("A send");
let pt_creator = match creator.receive_message(&ct_a).expect("creator recv") {
ReceivedMessage::Application(pt) => pt,
other => panic!("expected Application, got {other:?}"),
};
let pt_b2 = match b.receive_message(&ct_a).expect("B recv A") {
ReceivedMessage::Application(pt) => pt,
other => panic!("expected Application, got {other:?}"),
};
assert_eq!(pt_creator, b"from A");
assert_eq!(pt_b2, b"from A");
}
/// Creator adds A and B, then removes B. A and creator can still communicate.
/// B can no longer decrypt.
#[test]
fn three_party_remove_member() {
let (mut creator, mut a, mut b) = setup_three_party(false);
// Get B's identity for removal
let b_identity = b.identity.public_key_bytes().to_vec();
// Creator removes B
let remove_commit = creator.remove_member(&b_identity).expect("remove B");
// A processes the remove commit
match a.receive_message(&remove_commit).expect("A recv remove") {
ReceivedMessage::StateChanged => {}
other => panic!("expected StateChanged, got {other:?}"),
}
// B processes the remove commit — should get SelfRemoved
match b.receive_message(&remove_commit).expect("B recv remove") {
ReceivedMessage::SelfRemoved => {}
other => panic!("expected SelfRemoved, got {other:?}"),
}
// B's group should be cleared
assert!(b.group_id().is_none(), "B's group should be None after removal");
// Creator and A can still communicate
let ct = creator.send_message(b"after removal").expect("creator send");
let pt = match a.receive_message(&ct).expect("A recv") {
ReceivedMessage::Application(pt) => pt,
other => panic!("expected Application, got {other:?}"),
};
assert_eq!(pt, b"after removal");
// B cannot send (no group)
assert!(b.send_message(b"should fail").is_err());
}
/// A proposes to leave, creator commits the proposal, A receives SelfRemoved.
#[test]
fn leave_group_proposal() {
let (mut creator, mut a, _b) = setup_three_party(false);
// A proposes to leave
let leave_proposal = a.leave_group().expect("A leave");
// Creator receives the proposal (stored as pending)
match creator.receive_message(&leave_proposal).expect("creator recv proposal") {
ReceivedMessage::StateChanged => {}
other => panic!("expected StateChanged for proposal, got {other:?}"),
}
// Creator should have pending proposals
assert!(creator.has_pending_proposals(), "should have pending proposal");
// Creator commits the pending proposals
let (commit_bytes, _welcome) = creator
.commit_pending_proposals()
.expect("commit pending");
// A processes the commit — should get SelfRemoved
match a.receive_message(&commit_bytes).expect("A recv commit") {
ReceivedMessage::SelfRemoved => {}
other => panic!("expected SelfRemoved, got {other:?}"),
}
assert!(a.group_id().is_none(), "A's group should be None after leave");
}
/// Propose self-update, commit, other member processes the commit.
#[test]
fn propose_self_update_round_trip() {
let creator_id = Arc::new(IdentityKeypair::generate());
let joiner_id = Arc::new(IdentityKeypair::generate());
let mut creator = GroupMember::new(Arc::clone(&creator_id));
let mut joiner = GroupMember::new(Arc::clone(&joiner_id));
let joiner_kp = joiner.generate_key_package().expect("joiner KP");
creator.create_group(b"update-test").expect("create");
let (_commit, welcome) = creator.add_member(&joiner_kp).expect("add");
joiner.join_group(&welcome).expect("join");
// Creator proposes a self-update
let update_proposal = creator.propose_self_update().expect("propose update");
// Joiner receives the proposal
match joiner.receive_message(&update_proposal).expect("joiner recv proposal") {
ReceivedMessage::StateChanged => {}
other => panic!("expected StateChanged, got {other:?}"),
}
// Joiner commits the pending update proposal
let (commit_bytes, _) = joiner.commit_pending_proposals().expect("commit update");
// Creator processes the commit
match creator.receive_message(&commit_bytes).expect("creator recv commit") {
ReceivedMessage::StateChanged => {}
other => panic!("expected StateChanged, got {other:?}"),
}
// Both can still communicate after the update
let ct = creator.send_message(b"post-update").expect("send");
let pt = match joiner.receive_message(&ct).expect("recv") {
ReceivedMessage::Application(pt) => pt,
other => panic!("expected Application, got {other:?}"),
};
assert_eq!(pt, b"post-update");
}
/// Receiving a ciphertext from a stale (lower) epoch returns an error — not a panic.
/// This is the core invariant violated by the bidirectional-/dm race condition.
#[test]
fn receive_stale_epoch_message_returns_error() {
let creator_id = Arc::new(IdentityKeypair::generate());
let joiner_a_id = Arc::new(IdentityKeypair::generate());
let joiner_b_id = Arc::new(IdentityKeypair::generate());
let mut creator = GroupMember::new(Arc::clone(&creator_id));
let mut joiner_a = GroupMember::new(Arc::clone(&joiner_a_id));
let mut joiner_b = GroupMember::new(Arc::clone(&joiner_b_id));
// Set up group with joiner_a (epoch 1 after create_group, epoch 2 after add).
let kp_a = joiner_a.generate_key_package().expect("kp_a");
creator.create_group(b"stale-epoch-test").expect("create");
let (_, welcome_a) = creator.add_member(&kp_a).expect("add a");
joiner_a.join_group(&welcome_a).expect("join a");
// Creator sends a message at the current epoch (epoch 2).
let ct_epoch2 = creator.send_message(b"epoch-2 message").expect("send");
// Creator now adds joiner_b, advancing to epoch 3. joiner_a must process the commit.
let kp_b = joiner_b.generate_key_package().expect("kp_b");
let (commit_b, welcome_b) = creator.add_member(&kp_b).expect("add b");
joiner_b.join_group(&welcome_b).expect("join b");
match joiner_a.receive_message(&commit_b).expect("a recv add-b commit") {
ReceivedMessage::StateChanged => {}
other => panic!("expected StateChanged, got {other:?}"),
}
// joiner_b joined at epoch 3 via Welcome. Attempting to decrypt ct_epoch2 (epoch 2)
// must return an error, not panic.
let result = joiner_b.receive_message(&ct_epoch2);
assert!(
result.is_err(),
"decrypting an epoch-2 ciphertext in epoch-3 context must fail, not panic"
);
}
/// 10 messages alternating Alice→Bob and Bob→Alice all decrypt successfully.
/// Verifies that epoch state stays in sync across multiple application messages.
#[test]
fn multi_message_roundtrip_epoch_stays_in_sync() {
let alice_id = Arc::new(IdentityKeypair::generate());
let bob_id = Arc::new(IdentityKeypair::generate());
let mut alice = GroupMember::new(Arc::clone(&alice_id));
let mut bob = GroupMember::new(Arc::clone(&bob_id));
let bob_kp = bob.generate_key_package().expect("bob kp");
alice.create_group(b"multi-msg-test").expect("create");
let (_, welcome) = alice.add_member(&bob_kp).expect("add bob");
bob.join_group(&welcome).expect("join");
for i in 0u32..5 {
let payload_alice = format!("alice msg {i}");
let ct = alice.send_message(payload_alice.as_bytes()).expect("alice send");
let pt = match bob.receive_message(&ct).expect("bob recv") {
ReceivedMessage::Application(pt) => pt,
other => panic!("expected Application, got {other:?}"),
};
assert_eq!(pt, payload_alice.as_bytes());
let payload_bob = format!("bob reply {i}");
let ct = bob.send_message(payload_bob.as_bytes()).expect("bob send");
let pt = match alice.receive_message(&ct).expect("alice recv") {
ReceivedMessage::Application(pt) => pt,
other => panic!("expected Application, got {other:?}"),
};
assert_eq!(pt, payload_bob.as_bytes());
}
}
/// A member who has not yet joined (no group) cannot send messages.
#[test]
fn send_before_join_returns_error() {
let id = Arc::new(IdentityKeypair::generate());
let mut member = GroupMember::new(id);
assert!(
member.send_message(b"too early").is_err(),
"send_message before join must return an error"
);
}
}

View File

@@ -32,7 +32,7 @@ pub use app_message::{
serialize_typing, parse, generate_message_id, AppMessage, MessageType, VERSION as APP_MESSAGE_VERSION,
};
pub use error::CoreError;
pub use group::GroupMember;
pub use group::{GroupMember, ReceivedMessage, ReceivedMessageWithSender};
pub use hybrid_kem::{
hybrid_decrypt, hybrid_encrypt, HybridKemError, HybridKeypair, HybridKeypairBytes,
HybridPublicKey,

View File

@@ -15,8 +15,9 @@
use iroh::{Endpoint, EndpointAddr, PublicKey, SecretKey};
/// ALPN protocol identifier for quicproquo P2P messaging.
/// Frozen at the original project name for wire compatibility.
const P2P_ALPN: &[u8] = b"quicnprotochat/p2p/1";
/// Updated from the original project name "quicnprotochat" to "quicproquo" (breaking wire change;
/// all peers must be on the same version to connect).
const P2P_ALPN: &[u8] = b"quicproquo/p2p/1";
/// A P2P node backed by an iroh endpoint.
///

View File

@@ -56,5 +56,8 @@ toml = { version = "0.8" }
metrics = "0.22"
metrics-exporter-prometheus = "0.15"
# mDNS service announcement for local mesh / Freifunk node discovery.
mdns-sd = "0.12"
[dev-dependencies]
tempfile = "3"

View File

@@ -72,6 +72,7 @@ pub struct FederationPeerConfig {
}
#[derive(Debug)]
#[allow(dead_code)] // federation not yet wired up
pub struct EffectiveFederationConfig {
pub enabled: bool,
pub domain: String,

View File

@@ -2,6 +2,8 @@
//!
//! A bare `username` (no `@`) is treated as local.
#![allow(dead_code)] // federation not yet wired up
/// A parsed federated address.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FederatedAddress {

View File

@@ -2,15 +2,16 @@
//!
//! Uses a lazy connection pool (DashMap) to reuse QUIC connections to known peers.
#![allow(dead_code)] // federation not yet wired up
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use anyhow::Context;
use dashmap::DashMap;
use quinn::Endpoint;
use crate::config::{EffectiveFederationConfig, FederationPeerConfig};
use crate::config::EffectiveFederationConfig;
/// Outbound federation client for relaying to peer servers.
pub struct FederationClient {

View File

@@ -11,6 +11,4 @@ pub mod routing;
pub mod service;
pub mod tls;
pub use address::FederatedAddress;
pub use client::FederationClient;
pub use routing::Destination;

View File

@@ -46,7 +46,7 @@ pub fn build_federation_server_config(
let mut tls = rustls::ServerConfig::builder_with_protocol_versions(&[&TLS13])
.with_client_cert_verifier(client_verifier)
.with_single_cert(cert_chain, key)?;
tls.alpn_protocols = vec![b"qnpc-fed".to_vec()];
tls.alpn_protocols = vec![b"quicproquo/federation/1".to_vec()];
let crypto = QuicServerConfig::try_from(tls)
.map_err(|e| anyhow::anyhow!("invalid federation server TLS config: {e}"))?;

View File

@@ -354,6 +354,65 @@ async fn main() -> anyhow::Result<()> {
None
};
// ── mDNS local mesh discovery ─────────────────────────────────────────────
// Announce this server on the local network so mesh-mode clients (and other
// Freifunk nodes) can discover it automatically without manual configuration.
// Non-critical: failures are logged as warnings; the server starts regardless.
let _mdns_daemon = {
let listen_port: u16 = listen.port();
// Use the federation domain as the mDNS instance name when available.
let mdns_instance = effective
.federation
.as_ref()
.map(|f| f.domain.clone())
.unwrap_or_else(|| "qpq-server".to_string());
// mDNS host names must end with a dot.
let mdns_host = if mdns_instance.ends_with('.') {
mdns_instance.clone()
} else {
format!("{mdns_instance}.local.")
};
match mdns_sd::ServiceDaemon::new() {
Ok(daemon) => {
let mut props = std::collections::HashMap::new();
props.insert("ver".to_string(), "1".to_string());
props.insert("server".to_string(), effective.listen.clone());
props.insert("domain".to_string(), mdns_instance.clone());
match mdns_sd::ServiceInfo::new(
"_quicproquo._udp.local.",
&mdns_instance,
&mdns_host,
&[] as &[std::net::IpAddr],
listen_port,
Some(props),
) {
Ok(info) => match daemon.register(info) {
Ok(()) => {
tracing::info!(
instance = %mdns_instance,
port = listen_port,
"mDNS: announced qpq server on local network (_quicproquo._udp.local.)"
);
}
Err(e) => {
tracing::warn!(error = %e, "mDNS: service registration failed; mesh discovery disabled");
}
},
Err(e) => {
tracing::warn!(error = %e, "mDNS: failed to build service info; mesh discovery disabled");
}
}
Some(daemon)
}
Err(e) => {
tracing::warn!(error = %e, "mDNS: daemon start failed; mesh discovery disabled");
None
}
}
};
// capnp-rpc is !Send (Rc internals), so all RPC tasks must stay on a LocalSet.
let local = LocalSet::new();
local

View File

@@ -51,12 +51,14 @@ impl NodeServiceImpl {
));
}
let channel_id = match self.store.create_channel(&identity, &peer_key) {
Ok(id) => id,
let (channel_id, was_new) = match self.store.create_channel(&identity, &peer_key) {
Ok(pair) => pair,
Err(e) => return Promise::err(storage_err(e)),
};
results.get().set_channel_id(&channel_id);
let mut r = results.get();
r.set_channel_id(&channel_id);
r.set_was_new(was_new);
Promise::ok(())
}
}

View File

@@ -100,6 +100,42 @@ impl NodeServiceImpl {
}
}
// Federation routing: if the recipient's home server differs from ours, relay the
// message to the remote server instead of enqueueing locally. This enables
// cross-node delivery in a Freifunk / community mesh deployment.
if let (Some(fed_client), Some(local_domain)) =
(&self.federation_client, &self.local_domain)
{
let dest = crate::federation::routing::resolve_destination(
&self.store,
&recipient_key,
local_domain,
);
if let crate::federation::routing::Destination::Remote(remote_domain) = dest {
let fed = Arc::clone(fed_client);
let rk = recipient_key;
let pl = payload;
let ch = channel_id;
tracing::info!(
recipient_prefix = %fmt_hex(&rk[..4]),
domain = %remote_domain,
"federation: routing enqueue to remote server"
);
return Promise::from_future(async move {
let seq = fed
.relay_enqueue(&remote_domain, &rk, &pl, &ch)
.await
.map_err(|e| {
capnp::Error::failed(format!("federation relay failed: {e}"))
})?;
results.get().set_seq(seq);
metrics::record_enqueue_total();
metrics::record_enqueue_bytes(pl.len() as u64);
Ok(())
});
}
}
// DM channel authz: channel_id.len() == 16 means a created channel; caller and recipient must be the two members.
if channel_id.len() == 16 {
let members = match self.store.get_channel_members(&channel_id) {
@@ -591,7 +627,8 @@ impl NodeServiceImpl {
}
}
let mut seqs = Vec::with_capacity(recipient_keys.len() as usize);
// Eagerly collect recipient keys so params can be dropped before any async work.
let mut recipient_key_vecs: Vec<Vec<u8>> = Vec::with_capacity(recipient_keys.len() as usize);
for i in 0..recipient_keys.len() {
let rk = match recipient_keys.get(i) {
Ok(v) => v.to_vec(),
@@ -604,7 +641,7 @@ impl NodeServiceImpl {
));
}
// Per-recipient DM channel membership check.
// Per-recipient DM channel membership check (only when channel_id is a 16-byte UUID).
if channel_id.len() == 16 {
let members = match self.store.get_channel_members(&channel_id) {
Ok(Some(m)) => m,
@@ -631,31 +668,65 @@ impl NodeServiceImpl {
}
}
match self.store.queue_depth(&rk, &channel_id) {
recipient_key_vecs.push(rk);
}
let n = recipient_key_vecs.len();
let store = Arc::clone(&self.store);
let waiters = Arc::clone(&self.waiters);
let fed_client = self.federation_client.clone();
let local_domain = self.local_domain.clone();
// Use an async future to support federation relay alongside local enqueue.
// All storage operations are synchronous; only federation relay calls are await-ed.
Promise::from_future(async move {
let mut seqs = Vec::with_capacity(n);
for rk in &recipient_key_vecs {
// Federation routing: relay to the recipient's home server when remote.
let dest = if let (Some(ref _fed), Some(ref domain)) = (&fed_client, &local_domain) {
crate::federation::routing::resolve_destination(&store, rk, domain)
} else {
crate::federation::routing::Destination::Local
};
let seq = match dest {
crate::federation::routing::Destination::Remote(ref remote_domain) => {
let fed = fed_client.as_deref().ok_or_else(|| {
capnp::Error::failed("federation client unavailable for remote routing".into())
})?;
tracing::info!(
recipient_prefix = %fmt_hex(&rk[..4]),
domain = %remote_domain,
"federation: routing batch enqueue to remote server"
);
fed.relay_enqueue(remote_domain, rk, &payload, &channel_id)
.await
.map_err(|e| {
capnp::Error::failed(format!("federation relay failed: {e}"))
})?
}
crate::federation::routing::Destination::Local => {
match store.queue_depth(rk, &channel_id) {
Ok(depth) if depth >= MAX_QUEUE_DEPTH => {
return Promise::err(coded_error(
return Err(coded_error(
E015_QUEUE_FULL,
format!("queue depth {} exceeds limit {}", depth, MAX_QUEUE_DEPTH),
format!("queue depth {} exceeds limit {MAX_QUEUE_DEPTH}", depth),
));
}
Err(e) => return Promise::err(storage_err(e)),
Err(e) => return Err(storage_err(e)),
_ => {}
}
let seq = match self
.store
.enqueue(&rk, &channel_id, payload.clone())
.map_err(storage_err)
{
Ok(seq) => seq,
Err(e) => return Promise::err(e),
store
.enqueue(rk, &channel_id, payload.clone())
.map_err(storage_err)?
}
};
seqs.push(seq);
seqs.push(seq);
metrics::record_enqueue_total();
metrics::record_enqueue_bytes(payload.len() as u64);
crate::auth::waiter(&self.waiters, &rk).notify_waiters();
crate::auth::waiter(&waiters, rk).notify_waiters();
}
let mut list = results.get().init_seqs(seqs.len() as u32);
@@ -664,11 +735,12 @@ impl NodeServiceImpl {
}
tracing::info!(
recipient_count = recipient_keys.len(),
recipient_count = n,
payload_len = payload.len(),
"audit: batch_enqueue"
);
Promise::ok(())
Ok(())
})
}
}

View File

@@ -457,7 +457,7 @@ impl Store for SqlStore {
.map_err(|e| StorageError::Db(e.to_string()))
}
fn create_channel(&self, member_a: &[u8], member_b: &[u8]) -> Result<Vec<u8>, StorageError> {
fn create_channel(&self, member_a: &[u8], member_b: &[u8]) -> Result<(Vec<u8>, bool), StorageError> {
let (a, b) = if member_a < member_b {
(member_a.to_vec(), member_b.to_vec())
} else {
@@ -473,7 +473,7 @@ impl Store for SqlStore {
.optional()
.map_err(|e| StorageError::Db(e.to_string()))?;
if let Some(id) = existing {
return Ok(id);
return Ok((id, false));
}
let mut channel_id = [0u8; 16];
rand::thread_rng().fill_bytes(&mut channel_id);
@@ -482,7 +482,7 @@ impl Store for SqlStore {
params![channel_id.as_slice(), a, b],
)
.map_err(|e| StorageError::Db(e.to_string()))?;
Ok(channel_id.to_vec())
Ok((channel_id.to_vec(), true))
}
fn get_channel_members(&self, channel_id: &[u8]) -> Result<Option<(Vec<u8>, Vec<u8>)>, StorageError> {
@@ -721,4 +721,107 @@ mod tests {
let b_msgs = store.fetch(&rk, b"ch-b").unwrap();
assert_eq!(b_msgs, vec![(0u64, b"b1".to_vec())]);
}
#[test]
fn create_channel_was_new_first_call() {
let store = open_in_memory();
let a = [10u8; 32];
let b = [11u8; 32];
let (id, was_new) = store.create_channel(&a, &b).unwrap();
assert_eq!(id.len(), 16, "channel_id must be 16 bytes");
assert!(was_new, "first create_channel must return was_new=true");
}
#[test]
fn create_channel_idempotent_same_direction() {
let store = open_in_memory();
let a = [12u8; 32];
let b = [13u8; 32];
let (id1, was_new1) = store.create_channel(&a, &b).unwrap();
let (id2, was_new2) = store.create_channel(&a, &b).unwrap();
assert_eq!(id1, id2, "repeated call must return same channel_id");
assert!(was_new1);
assert!(!was_new2, "second call must return was_new=false");
}
#[test]
fn create_channel_idempotent_reversed_direction() {
let store = open_in_memory();
let a = [14u8; 32];
let b = [15u8; 32];
let (id1, was_new1) = store.create_channel(&a, &b).unwrap();
let (id2, was_new2) = store.create_channel(&b, &a).unwrap();
assert_eq!(id1, id2, "reversed-key call must return same channel_id");
assert!(was_new1);
assert!(!was_new2, "reversed-key second call must return was_new=false");
}
#[test]
fn create_channel_different_pairs_isolated() {
let store = open_in_memory();
let a = [16u8; 32];
let b = [17u8; 32];
let c = [18u8; 32];
let (id_ab, _) = store.create_channel(&a, &b).unwrap();
let (id_ac, _) = store.create_channel(&a, &c).unwrap();
let (id_bc, _) = store.create_channel(&b, &c).unwrap();
assert_ne!(id_ab, id_ac);
assert_ne!(id_ab, id_bc);
assert_ne!(id_ac, id_bc);
}
#[test]
fn create_channel_get_members_roundtrip() {
let store = open_in_memory();
let a = [20u8; 32];
let b = [21u8; 32];
let (id, _) = store.create_channel(&a, &b).unwrap();
let members = store.get_channel_members(&id).unwrap();
assert!(members.is_some(), "get_channel_members must return Some after create");
let (ma, mb) = members.unwrap();
// members stored in canonical (lex) order
let (expected_a, expected_b) = if a < b {
(a.to_vec(), b.to_vec())
} else {
(b.to_vec(), a.to_vec())
};
assert_eq!(ma, expected_a);
assert_eq!(mb, expected_b);
}
#[test]
fn get_channel_members_unknown_id_returns_none() {
let store = open_in_memory();
assert!(store.get_channel_members(&[0u8; 16]).unwrap().is_none());
}
#[test]
fn resolve_identity_key_after_store() {
let store = open_in_memory();
let ik = [30u8; 32];
store.store_user_record("carol", b"record".to_vec()).unwrap();
store.store_user_identity_key("carol", ik.to_vec()).unwrap();
let resolved = store.resolve_identity_key(&ik).unwrap();
assert_eq!(resolved, Some("carol".to_string()));
}
#[test]
fn resolve_identity_key_unknown_returns_none() {
let store = open_in_memory();
let unknown = [31u8; 32];
assert!(store.resolve_identity_key(&unknown).unwrap().is_none());
}
#[test]
fn resolve_identity_key_two_users_distinct() {
let store = open_in_memory();
let ik_a = [32u8; 32];
let ik_b = [33u8; 32];
store.store_user_record("user_a", b"ra".to_vec()).unwrap();
store.store_user_record("user_b", b"rb".to_vec()).unwrap();
store.store_user_identity_key("user_a", ik_a.to_vec()).unwrap();
store.store_user_identity_key("user_b", ik_b.to_vec()).unwrap();
assert_eq!(store.resolve_identity_key(&ik_a).unwrap(), Some("user_a".to_string()));
assert_eq!(store.resolve_identity_key(&ik_b).unwrap(), Some("user_b".to_string()));
}
}

View File

@@ -127,9 +127,12 @@ pub trait Store: Send + Sync {
/// Resolve a peer's P2P endpoint address.
fn resolve_endpoint(&self, identity_key: &[u8]) -> Result<Option<Vec<u8>>, StorageError>;
/// Create a 1:1 channel between two members. Returns 16-byte channel_id (UUID).
/// Members are stored in sorted order for deterministic lookup.
fn create_channel(&self, member_a: &[u8], member_b: &[u8]) -> Result<Vec<u8>, StorageError>;
/// Create a 1:1 channel between two members.
/// Returns `(channel_id, was_new)` where `was_new` is true iff the channel was created by
/// this call (false = it already existed). Members are stored in sorted order for deterministic
/// lookup — both `create_channel(a, b)` and `create_channel(b, a)` return the same channel_id.
/// The caller who receives `was_new = true` is the MLS group initiator and must send the Welcome.
fn create_channel(&self, member_a: &[u8], member_b: &[u8]) -> Result<(Vec<u8>, bool), StorageError>;
/// Get the two members of a channel by channel_id (16 bytes). Returns (member_a, member_b) in sorted order.
fn get_channel_members(&self, channel_id: &[u8]) -> Result<Option<(Vec<u8>, Vec<u8>)>, StorageError>;
@@ -137,6 +140,7 @@ pub trait Store: Send + Sync {
// ── Federation ──────────────────────────────────────────────────────────
/// Store the home server domain for an identity key.
#[allow(dead_code)] // federation not yet wired up
fn store_identity_home_server(
&self,
identity_key: &[u8],
@@ -157,6 +161,7 @@ pub trait Store: Send + Sync {
) -> Result<(), StorageError>;
/// List all active federation peers.
#[allow(dead_code)] // federation not yet wired up
fn list_federation_peers(&self) -> Result<Vec<(String, bool)>, StorageError>;
}
@@ -647,7 +652,7 @@ impl Store for FileBackedStore {
Ok(map.get(identity_key).cloned())
}
fn create_channel(&self, member_a: &[u8], member_b: &[u8]) -> Result<Vec<u8>, StorageError> {
fn create_channel(&self, member_a: &[u8], member_b: &[u8]) -> Result<(Vec<u8>, bool), StorageError> {
let (a, b) = if member_a < member_b {
(member_a.to_vec(), member_b.to_vec())
} else {
@@ -655,14 +660,14 @@ impl Store for FileBackedStore {
};
let mut map = lock(&self.channels)?;
if let Some((channel_id, _)) = map.iter().find(|(_, (ma, mb))| ma == &a && mb == &b) {
return Ok(channel_id.clone());
return Ok((channel_id.clone(), false));
}
let mut channel_id = [0u8; 16];
rand::thread_rng().fill_bytes(&mut channel_id);
let channel_id = channel_id.to_vec();
map.insert(channel_id.clone(), (a, b));
self.flush_channels(&self.channels_path, &*map)?;
Ok(channel_id)
Ok((channel_id, true))
}
fn get_channel_members(&self, channel_id: &[u8]) -> Result<Option<(Vec<u8>, Vec<u8>)>, StorageError> {
@@ -812,12 +817,40 @@ mod tests {
let a = vec![1u8; 32];
let b = vec![2u8; 32];
assert_eq!(store.get_channel_members(&[0u8; 16]).unwrap(), None);
let id1 = store.create_channel(&a, &b).unwrap();
let (id1, was_new1) = store.create_channel(&a, &b).unwrap();
assert_eq!(id1.len(), 16);
assert!(was_new1, "first call must return was_new=true");
let members = store.get_channel_members(&id1).unwrap().unwrap();
assert_eq!(members.0, a);
assert_eq!(members.1, b);
let id2 = store.create_channel(&b, &a).unwrap();
let (id2, was_new2) = store.create_channel(&b, &a).unwrap();
assert_eq!(id1, id2, "reversed key order must return same channel_id");
assert!(!was_new2, "second call (reversed) must return was_new=false");
}
#[test]
fn create_channel_idempotent_same_direction() {
let (_dir, store) = temp_store();
let a = vec![3u8; 32];
let b = vec![4u8; 32];
let (id1, was_new1) = store.create_channel(&a, &b).unwrap();
let (id2, was_new2) = store.create_channel(&a, &b).unwrap();
assert_eq!(id1, id2);
assert!(was_new1);
assert!(!was_new2);
}
#[test]
fn create_channel_different_pairs_get_different_ids() {
let (_dir, store) = temp_store();
let a = vec![5u8; 32];
let b = vec![6u8; 32];
let c = vec![7u8; 32];
let (id_ab, _) = store.create_channel(&a, &b).unwrap();
let (id_ac, _) = store.create_channel(&a, &c).unwrap();
let (id_bc, _) = store.create_channel(&b, &c).unwrap();
assert_ne!(id_ab, id_ac);
assert_ne!(id_ab, id_bc);
assert_ne!(id_ac, id_bc);
}
}

View File

@@ -82,7 +82,10 @@ interface NodeService {
# Create a 1:1 channel between the caller and the given peer. Returns a 16-byte channelId (UUID).
# Both members can enqueue/fetch for this channel; recipientKey must be the other member.
createChannel @18 (peerKey :Data, auth :Auth) -> (channelId :Data);
# wasNew is true iff this call created the channel; false if it already existed.
# The caller who receives wasNew=true is the MLS group initiator and must send the Welcome.
# The caller who receives wasNew=false must wait for the peer's Welcome via the background poller.
createChannel @18 (peerKey :Data, auth :Auth) -> (channelId :Data, wasNew :Bool);
# Resolve a username to its Ed25519 identity key (32 bytes).
# Returns empty Data if the username is not registered.