diff --git a/crates/quicproquo-client/src/client/repl.rs b/crates/quicproquo-client/src/client/repl.rs index 2370e90..3117013 100644 --- a/crates/quicproquo-client/src/client/repl.rs +++ b/crates/quicproquo-client/src/client/repl.rs @@ -3,14 +3,15 @@ //! Supports slash commands for DMs, groups, invitations, and conversation switching. //! Background polling fetches messages for all active conversations. -use std::path::Path; +use std::path::{Path, PathBuf}; +use std::process::{Child, Command as ProcessCommand}; use std::sync::Arc; use std::time::Duration; use anyhow::Context; use quicproquo_core::{ - AppMessage, DiskKeyStore, GroupMember, IdentityKeypair, hybrid_encrypt, - parse as parse_app_msg, serialize_chat, + AppMessage, DiskKeyStore, GroupMember, IdentityKeypair, ReceivedMessage, + hybrid_encrypt, parse as parse_app_msg, serialize_chat, }; use quicproquo_proto::node_capnp::node_service; use tokio::sync::mpsc; @@ -48,6 +49,8 @@ enum SlashCommand { Dm { username: String }, CreateGroup { name: String }, Invite { target: String }, + Remove { target: String }, + Leave, Join, Members, History { count: usize }, @@ -99,6 +102,14 @@ fn parse_input(line: &str) -> Input { Input::Empty } }, + "/remove" | "/kick" => match arg { + Some(target) => Input::Slash(SlashCommand::Remove { target }), + None => { + display::print_error("usage: /remove "); + Input::Empty + } + }, + "/leave" => Input::Slash(SlashCommand::Leave), "/join" => Input::Slash(SlashCommand::Join), "/members" => Input::Slash(SlashCommand::Members), "/history" | "/hist" => { @@ -112,6 +123,137 @@ fn parse_input(line: &str) -> Input { } } +// ── Auto-start server ──────────────────────────────────────────────────────── + +/// RAII guard that kills an auto-started server process on drop. +struct ServerGuard(Option); + +impl Drop for ServerGuard { + fn drop(&mut self) { + if let Some(ref mut child) = self.0 { + let _ = child.kill(); + let _ = child.wait(); + } + } +} + +/// Derive the TLS key path from the cert path (e.g. server-cert.der → server-key.der). +fn derive_key_path(cert_path: &Path) -> PathBuf { + let stem = cert_path + .file_name() + .and_then(|f| f.to_str()) + .unwrap_or("server-cert.der"); + let key_name = stem.replace("cert", "key"); + let key_name = if key_name == stem { + // No "cert" in filename — just append "-key" before extension. + let p = PathBuf::from(stem); + let ext = p.extension().and_then(|e| e.to_str()).unwrap_or("der"); + let base = p.file_stem().and_then(|s| s.to_str()).unwrap_or("server"); + format!("{base}-key.{ext}") + } else { + key_name + }; + cert_path.with_file_name(key_name) +} + +/// Find the `qpq-server` binary: same directory as current exe, then PATH. +fn find_server_binary() -> Option { + if let Ok(exe) = std::env::current_exe() { + let sibling = exe.with_file_name("qpq-server"); + if sibling.exists() { + return Some(sibling); + } + } + // Fall back to PATH lookup. + std::env::var_os("PATH").and_then(|paths| { + std::env::split_paths(&paths) + .map(|dir| dir.join("qpq-server")) + .find(|p| p.exists()) + }) +} + +/// Try to connect to the server. Returns Ok(()) if reachable, Err otherwise. +async fn probe_server(server: &str, ca_cert: &Path, server_name: &str) -> bool { + connect_node(server, ca_cert, server_name).await.is_ok() +} + +/// Ensure a server is running. If not reachable, auto-start one. +/// Returns a guard that kills the child process on drop (if we started one). +async fn ensure_server( + server: &str, + ca_cert: &Path, + server_name: &str, + no_server: bool, +) -> anyhow::Result { + if no_server { + return Ok(ServerGuard(None)); + } + + // If the cert already exists, try connecting first. + if ca_cert.exists() && probe_server(server, ca_cert, server_name).await { + return Ok(ServerGuard(None)); + } + + // Server not reachable — try to auto-start. + let binary = match find_server_binary() { + Some(b) => b, + None => { + if ca_cert.exists() { + // Cert exists but connection failed and no binary found. + anyhow::bail!( + "server at {server} is not reachable and qpq-server binary not found; \ + start a server manually or install qpq-server" + ); + } else { + anyhow::bail!( + "no server running and qpq-server binary not found; \ + start a server manually or install qpq-server" + ); + } + } + }; + + let key_path = derive_key_path(ca_cert); + + display::print_status(&format!("starting server on {server}...")); + + let child = ProcessCommand::new(&binary) + .args([ + "--allow-insecure-auth", + "--listen", server, + "--tls-cert", &ca_cert.to_string_lossy(), + "--tls-key", &key_path.to_string_lossy(), + ]) + .stdout(std::process::Stdio::null()) + .stderr(std::process::Stdio::null()) + .spawn() + .with_context(|| format!("failed to spawn {}", binary.display()))?; + + let guard = ServerGuard(Some(child)); + + // Poll until the server is ready (cert may need to be generated first). + let mut delay = Duration::from_millis(100); + let max_wait = Duration::from_secs(5); + let start = std::time::Instant::now(); + + loop { + tokio::time::sleep(delay).await; + + if ca_cert.exists() && probe_server(server, ca_cert, server_name).await { + display::print_status("server ready"); + return Ok(guard); + } + + if start.elapsed() > max_wait { + anyhow::bail!( + "auto-started qpq-server but it did not become ready within {max_wait:?}" + ); + } + + delay = (delay * 2).min(Duration::from_secs(1)); + } +} + // ── REPL entry point ───────────────────────────────────────────────────────── pub async fn run_repl( @@ -124,7 +266,11 @@ pub async fn run_repl( opaque_password: Option<&str>, access_token: &str, device_id: Option<&str>, + no_server: bool, ) -> anyhow::Result<()> { + // Phase 0: Ensure a server is running (auto-start if needed). + let _server_guard = ensure_server(server, ca_cert, server_name, no_server).await?; + // Phase 1: Resolve an access token (auto-register/login if needed). let resolved_token = resolve_access_token( state_path, server, ca_cert, server_name, password, @@ -218,6 +364,8 @@ pub async fn run_repl( let mut poll = interval(Duration::from_millis(1000)); poll.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); let mut consecutive_errors: u32 = 0; + let mut backoff_ms: u64 = 1000; + const MAX_BACKOFF_MS: u64 = 60_000; display::print_prompt(&session); @@ -242,22 +390,35 @@ pub async fn run_repl( } } _ = poll.tick() => { + // Drain offline outbox before polling for new messages. + drain_outbox(&mut session, &client).await; + match poll_messages(&mut session, &client).await { - Ok(()) => { consecutive_errors = 0; } + Ok(()) => { + consecutive_errors = 0; + backoff_ms = 1000; + } Err(e) => { consecutive_errors += 1; tracing::warn!(error = format!("{e:#}"), n = consecutive_errors, "poll error"); if consecutive_errors >= 3 { - display::print_status("connection lost, reconnecting..."); + display::print_status(&format!( + "connection lost, reconnecting in {:.0}s...", + backoff_ms as f64 / 1000.0 + )); + tokio::time::sleep(Duration::from_millis(backoff_ms)).await; match connect_node(server, ca_cert, server_name).await { Ok(new_client) => { client = new_client; consecutive_errors = 0; + backoff_ms = 1000; display::print_status("reconnected"); display::print_prompt(&session); } Err(re) => { tracing::debug!(error = %re, "reconnect failed"); + // Exponential backoff, capped. + backoff_ms = (backoff_ms * 2).min(MAX_BACKOFF_MS); } } } @@ -409,6 +570,8 @@ async fn handle_slash( SlashCommand::Dm { username } => cmd_dm(session, client, &username).await, SlashCommand::CreateGroup { name } => cmd_create_group(session, &name), SlashCommand::Invite { target } => cmd_invite(session, client, &target).await, + SlashCommand::Remove { target } => cmd_remove(session, client, &target).await, + SlashCommand::Leave => cmd_leave(session, client).await, SlashCommand::Join => cmd_join(session, client).await, SlashCommand::Members => cmd_members(session), SlashCommand::History { count } => cmd_history(session, count), @@ -423,6 +586,8 @@ fn print_help() { display::print_status(" /dm - Start or switch to a DM (federation supported)"); display::print_status(" /create-group - Create a new group"); display::print_status(" /invite - Invite user to current group"); + display::print_status(" /remove - Remove a member from the current group"); + display::print_status(" /leave - Leave the current group"); display::print_status(" /join - Join a group from pending Welcome"); display::print_status(" /switch <@user|#group> - Switch conversation"); display::print_status(" /list - List all conversations"); @@ -541,6 +706,8 @@ async fn cmd_dm( unread_count: 0, last_activity_ms: now_ms(), created_at_ms: now_ms(), + is_hybrid: false, + last_seen_seq: 0, }; let ks = DiskKeyStore::ephemeral(); let member = GroupMember::new_with_state(Arc::clone(&session.identity), ks, None, false); @@ -569,12 +736,16 @@ async fn cmd_dm( let kp_bytes = fetch_key_package(client, &peer_key).await?; anyhow::ensure!(!kp_bytes.is_empty(), "peer has no key package uploaded"); + // Negotiate hybrid mode: use post-quantum MLS if both sides have hybrid keys. + let peer_hybrid_pk = fetch_hybrid_key(client, &peer_key).await?; + let use_hybrid = session.hybrid_kp.is_some() && peer_hybrid_pk.is_some(); + // Create MLS group using channel_id as group_id. let ks_dir = session.state_path.with_extension("keystores"); std::fs::create_dir_all(&ks_dir).ok(); let ks_path = ks_dir.join(format!("{}.ks", conv_id.hex())); let ks = DiskKeyStore::persistent(&ks_path)?; - let mut member = GroupMember::new_with_state(Arc::clone(&session.identity), ks, None, false); + let mut member = GroupMember::new_with_state(Arc::clone(&session.identity), ks, None, use_hybrid); // Generate a key package for ourselves (needed for MLS) let _my_kp = member.generate_key_package()?; @@ -583,9 +754,12 @@ async fn cmd_dm( let (commit, welcome) = member.add_member(&kp_bytes)?; // Deliver welcome to peer and commit to peer. - let peer_hybrid_pk = fetch_hybrid_key(client, &peer_key).await?; + // For hybrid MLS groups, the MLS layer already provides PQ protection, + // so we skip the outer hybrid envelope wrapping. let wrap = |data: &[u8]| -> anyhow::Result> { - if let Some(ref pk) = peer_hybrid_pk { + if use_hybrid { + Ok(data.to_vec()) // MLS-level hybrid KEM is sufficient + } else if let Some(ref pk) = peer_hybrid_pk { hybrid_encrypt(pk, data, b"", b"").context("hybrid encrypt") } else { Ok(data.to_vec()) @@ -613,6 +787,8 @@ async fn cmd_dm( unread_count: 0, last_activity_ms: now_ms(), created_at_ms: now_ms(), + is_hybrid: member.is_hybrid(), + last_seen_seq: 0, }; session.add_conversation(conv, member)?; @@ -655,6 +831,8 @@ fn cmd_create_group(session: &mut SessionState, name: &str) -> anyhow::Result<() unread_count: 0, last_activity_ms: now_ms(), created_at_ms: now_ms(), + is_hybrid: false, + last_seen_seq: 0, }; session.add_conversation(conv, member)?; @@ -735,6 +913,87 @@ async fn cmd_invite( Ok(()) } +async fn cmd_remove( + session: &mut SessionState, + client: &node_service::Client, + target: &str, +) -> anyhow::Result<()> { + let conv_id = session + .active_conversation + .as_ref() + .context("no active conversation")? + .clone(); + + let my_key = session.identity_bytes(); + + // Resolve the target — username or hex key. + let target_key = if target.len() == 64 && target.chars().all(|c| c.is_ascii_hexdigit()) { + decode_identity_key(target)? + } else { + resolve_user(client, target) + .await? + .with_context(|| format!("user '{target}' not found"))? + }; + + let member = session + .get_member_mut(&conv_id) + .context("no group member for active conversation")?; + + let commit = member.remove_member(&target_key)?; + + // Fan-out commit to all remaining members (excluding self). + let remaining: Vec> = member + .member_identities() + .into_iter() + .filter(|id| id.as_slice() != my_key.as_slice()) + .collect(); + + for rk in &remaining { + enqueue(client, rk, &commit).await?; + } + + session.save_member(&conv_id)?; + display::print_status(&format!("removed {target} from the group")); + Ok(()) +} + +async fn cmd_leave( + session: &mut SessionState, + client: &node_service::Client, +) -> anyhow::Result<()> { + let conv_id = session + .active_conversation + .as_ref() + .context("no active conversation")? + .clone(); + + let my_key = session.identity_bytes(); + + let member = session + .get_member_mut(&conv_id) + .context("no group member for active conversation")?; + + let proposal = member.leave_group()?; + + // Send leave proposal to all other members. + let others: Vec> = member + .member_identities() + .into_iter() + .filter(|id| id.as_slice() != my_key.as_slice()) + .collect(); + + for rk in &others { + enqueue(client, rk, &proposal).await?; + } + + // Locally deactivate the conversation (keep history). + session.members.remove(&conv_id); + session.active_conversation = None; + + display::print_status("left the conversation"); + Ok(()) +} + async fn cmd_join( session: &mut SessionState, client: &node_service::Client, @@ -814,6 +1073,8 @@ async fn cmd_join( unread_count: 0, last_activity_ms: now_ms(), created_at_ms: now_ms(), + is_hybrid: new_member.is_hybrid(), + last_seen_seq: 0, }; session.add_conversation(conv, new_member)?; @@ -971,6 +1232,28 @@ async fn do_send( Ok(()) } +// ── Outbox drain ───────────────────────────────────────────────────────────── + +async fn drain_outbox( + session: &mut SessionState, + client: &node_service::Client, +) { + let entries = match session.conv_store.load_pending_outbox() { + Ok(e) => e, + Err(_) => return, + }; + for entry in entries { + match enqueue(client, &entry.recipient_key, &entry.payload).await { + Ok(_) => { + let _ = session.conv_store.mark_outbox_sent(entry.id); + } + Err(_) => { + let _ = session.conv_store.mark_outbox_failed(entry.id, entry.retry_count + 1); + } + } + } +} + // ── Polling ────────────────────────────────────────────────────────────────── async fn poll_messages( @@ -994,6 +1277,7 @@ async fn poll_messages( // Try each conversation's GroupMember. let conv_ids: Vec = session.members.keys().cloned().collect(); + let my_key = session.identity_bytes(); let mut handled = false; for conv_id in &conv_ids { @@ -1003,7 +1287,7 @@ async fn poll_messages( }; match member.receive_message(&mls_payload) { - Ok(Some(plaintext)) => { + Ok(ReceivedMessage::Application(plaintext)) => { // Metadata protection: try unpad → unseal → parse. // Falls back gracefully for messages from older clients. let (sender_key, app_bytes) = { @@ -1015,10 +1299,10 @@ async fn poll_messages( if quicproquo_core::sealed_sender::is_sealed(&after_unpad) { match quicproquo_core::sealed_sender::unseal(&after_unpad) { Ok((sk, inner)) => (sk.to_vec(), inner), - Err(_) => (session.identity_bytes(), after_unpad), + Err(_) => (my_key.clone(), after_unpad), } } else { - (session.identity_bytes(), after_unpad) + (my_key.clone(), after_unpad) } }; @@ -1087,8 +1371,28 @@ async fn poll_messages( handled = true; break; } - Ok(None) => { - // Processed a non-application message (commit, etc.) + Ok(ReceivedMessage::StateChanged) => { + // Processed a non-application message (commit, proposal, etc.). + // Auto-commit any pending proposals (e.g. leave requests). + if member.has_pending_proposals() { + if let Ok((commit, _welcome)) = member.commit_pending_proposals() { + let remaining: Vec> = member + .member_identities() + .into_iter() + .filter(|id| id.as_slice() != my_key.as_slice()) + .collect(); + for rk in &remaining { + let _ = enqueue(client, rk, &commit).await; + } + } + } + any_changed = true; + handled = true; + break; + } + Ok(ReceivedMessage::SelfRemoved) => { + display::print_status("you were removed from this conversation"); + session.members.remove(conv_id); any_changed = true; handled = true; break; @@ -1199,6 +1503,8 @@ async fn try_auto_join( unread_count: 0, last_activity_ms: now_ms(), created_at_ms: now_ms(), + is_hybrid: member.is_hybrid(), + last_seen_seq: 0, }; if let Err(e) = session.add_conversation(conv, member) { diff --git a/crates/quicproquo-client/src/main.rs b/crates/quicproquo-client/src/main.rs index 3c5b886..2bdf834 100644 --- a/crates/quicproquo-client/src/main.rs +++ b/crates/quicproquo-client/src/main.rs @@ -70,6 +70,10 @@ struct Args { #[arg(long, env = "QPQ_PASSWORD")] password: Option, + /// Do not auto-start a local qpq-server (useful when connecting to a remote server). + #[arg(long, env = "QPQ_NO_SERVER")] + no_server: bool, + #[command(subcommand)] command: Option, } @@ -301,6 +305,9 @@ enum Command { /// OPAQUE password (prompted securely if --username is set but --password is not). #[arg(long, env = "QPQ_PASSWORD")] password: Option, + /// Do not auto-start a local qpq-server. + #[arg(long, env = "QPQ_NO_SERVER")] + no_server: bool, }, /// Interactive 1:1 chat: type to send, incoming messages printed as [peer] . Ctrl+D to exit. @@ -350,11 +357,13 @@ async fn main() -> anyhow::Result<()> { let state_pw = args.state_password.as_deref(); // Default to REPL when no subcommand is given. + let no_server = args.no_server; let command = args.command.unwrap_or_else(|| Command::Repl { state: args.state, server: args.server, username: args.username, password: args.password, + no_server, }); match command { @@ -553,6 +562,7 @@ async fn main() -> anyhow::Result<()> { server, username, password, + no_server, } => { let local = tokio::task::LocalSet::new(); local @@ -566,6 +576,7 @@ async fn main() -> anyhow::Result<()> { password.as_deref(), &args.access_token, args.device_id.as_deref(), + no_server, )) .await }