//! Interactive multi-conversation REPL. //! //! Supports slash commands for DMs, groups, invitations, and conversation switching. //! Background polling fetches messages for all active conversations. use std::path::{Path, PathBuf}; use std::process::{Child, Command as ProcessCommand}; use std::sync::Arc; use std::time::Duration; use anyhow::Context; use quicprochat_core::{ AppMessage, DiskKeyStore, GroupMember, IdentityKeypair, ReceivedMessage, compute_safety_number, hybrid_encrypt, parse as parse_app_msg, serialize_chat, serialize_delete, serialize_dummy, serialize_edit, serialize_file_ref, serialize_reaction, serialize_read_receipt, serialize_typing, }; use quicprochat_proto::node_capnp::node_service; use tokio::sync::mpsc; use tokio::time::interval; use crate::{ClientAuth, init_auth}; use super::commands::{opaque_login, opaque_register}; use super::conversation::{ now_ms, Conversation, ConversationId, ConversationKind, StoredMessage, }; use super::display; use super::rpc::{ connect_node, create_channel, delete_account, download_blob_chunk, enqueue, enqueue_with_ttl, fetch_hybrid_key, fetch_key_package, fetch_wait, list_devices, register_device, resolve_identity, resolve_user, revoke_device, try_hybrid_decrypt, upload_blob_chunk, upload_hybrid_key, upload_key_package, }; use super::session::SessionState; use super::state::{decode_identity_key, load_or_init_state}; use super::token_cache::{clear_cached_session, load_cached_session, save_cached_session}; // ── Input parsing ──────────────────────────────────────────────────────────── pub(crate) enum Input { Slash(SlashCommand), ChatMessage(String), Empty, } pub(crate) enum SlashCommand { Help, Quit, Whoami, List, Switch { target: String }, Dm { username: String }, CreateGroup { name: String }, Invite { target: String }, Remove { target: String }, Leave, Join, Members, GroupInfo, Rename { name: String }, History { count: usize }, /// Mesh subcommands: /mesh peers, /mesh server , etc. MeshPeers, MeshServer { addr: String }, MeshSend { peer_id: String, message: String }, MeshBroadcast { topic: String, message: String }, MeshSubscribe { topic: String }, MeshRoute, MeshIdentity, MeshStore, /// Display safety number for out-of-band key verification with a contact. Verify { username: String }, /// Rotate own MLS leaf key in the active group. UpdateKey, /// Send a typing indicator to the active conversation. Typing, /// Toggle display of typing notifications from others. TypingNotify { enabled: bool }, /// React to a message with an emoji. React { emoji: String, index: Option }, /// Edit a previously sent message by index. Edit { index: usize, new_text: String }, /// Delete a previously sent message by index. Delete { index: usize }, /// Send a file to the active conversation. SendFile { path: String }, /// Download a file attachment by message index. Download { index: usize }, /// Permanently delete the user's account on the server. DeleteAccount, /// Set or query disappearing message TTL for the active conversation. Disappear { arg: Option }, /// Privacy controls: redact-keys, auto-clear, padding. Privacy { arg: Option }, /// Verify that MLS epoch has advanced since last send (forward secrecy check). VerifyFs, /// Rotate MLS leaf key AND regenerate + upload hybrid KEM keypair. RotateAllKeys, /// List all registered devices. Devices, /// Register this device with a name. RegisterDevice { name: String }, /// Revoke a device by hex ID prefix. RevokeDevice { id_prefix: String }, } pub(crate) fn parse_input(line: &str) -> Input { let trimmed = line.trim(); if trimmed.is_empty() { return Input::Empty; } if !trimmed.starts_with('/') { return Input::ChatMessage(trimmed.to_string()); } let parts: Vec<&str> = trimmed.splitn(2, ' ').collect(); let cmd = parts[0].to_lowercase(); let arg = parts.get(1).map(|s| s.trim().to_string()); match cmd.as_str() { "/help" | "/h" => Input::Slash(SlashCommand::Help), "/quit" | "/q" | "/exit" => Input::Slash(SlashCommand::Quit), "/whoami" => Input::Slash(SlashCommand::Whoami), "/list" | "/ls" => Input::Slash(SlashCommand::List), "/switch" | "/sw" => match arg { Some(target) => Input::Slash(SlashCommand::Switch { target }), None => { display::print_error("usage: /switch @username or /switch #groupname"); Input::Empty } }, "/dm" => match arg { Some(username) => Input::Slash(SlashCommand::Dm { username }), None => { display::print_error("usage: /dm "); Input::Empty } }, "/create-group" | "/cg" => match arg { Some(name) => Input::Slash(SlashCommand::CreateGroup { name }), None => { display::print_error("usage: /create-group "); Input::Empty } }, "/invite" => match arg { Some(target) => Input::Slash(SlashCommand::Invite { target }), None => { display::print_error("usage: /invite "); Input::Empty } }, "/remove" | "/kick" => match arg { Some(target) => Input::Slash(SlashCommand::Remove { target }), None => { display::print_error("usage: /remove "); Input::Empty } }, "/leave" => Input::Slash(SlashCommand::Leave), "/join" => Input::Slash(SlashCommand::Join), "/members" => Input::Slash(SlashCommand::Members), "/group-info" | "/gi" => Input::Slash(SlashCommand::GroupInfo), "/rename" => match arg { Some(name) => Input::Slash(SlashCommand::Rename { name }), None => { display::print_error("usage: /rename "); Input::Empty } }, "/history" | "/hist" => { let count = arg.and_then(|s| s.parse().ok()).unwrap_or(20); Input::Slash(SlashCommand::History { count }) } "/mesh" => match arg.as_deref() { Some("peers") => Input::Slash(SlashCommand::MeshPeers), Some(rest) if rest.starts_with("server ") => { let addr = rest.trim_start_matches("server ").trim().to_string(); if addr.is_empty() { display::print_error("usage: /mesh server "); Input::Empty } else { Input::Slash(SlashCommand::MeshServer { addr }) } } Some(rest) if rest.starts_with("send ") => { let parts: Vec<&str> = rest.splitn(3, ' ').collect(); if parts.len() >= 3 { Input::Slash(SlashCommand::MeshSend { peer_id: parts[1].into(), message: parts[2].into(), }) } else { display::print_error("usage: /mesh send "); Input::Empty } } Some(rest) if rest.starts_with("broadcast ") => { let parts: Vec<&str> = rest.splitn(3, ' ').collect(); if parts.len() >= 3 { Input::Slash(SlashCommand::MeshBroadcast { topic: parts[1].into(), message: parts[2].into(), }) } else { display::print_error("usage: /mesh broadcast "); Input::Empty } } Some(rest) if rest.starts_with("subscribe ") => { let topic = rest[10..].trim(); if topic.is_empty() { display::print_error("usage: /mesh subscribe "); Input::Empty } else { Input::Slash(SlashCommand::MeshSubscribe { topic: topic.into() }) } } Some("route") => Input::Slash(SlashCommand::MeshRoute), Some("identity") | Some("id") => Input::Slash(SlashCommand::MeshIdentity), Some("store") => Input::Slash(SlashCommand::MeshStore), _ => { display::print_error( "usage: /mesh peers|server|send|broadcast|subscribe|route|identity|store" ); Input::Empty } }, "/verify" => match arg { Some(username) => Input::Slash(SlashCommand::Verify { username }), None => { display::print_error("usage: /verify "); Input::Empty } }, "/update-key" | "/rotate-key" => Input::Slash(SlashCommand::UpdateKey), "/typing" => Input::Slash(SlashCommand::Typing), "/typing-notify" => match arg.as_deref() { Some("on") => Input::Slash(SlashCommand::TypingNotify { enabled: true }), Some("off") => Input::Slash(SlashCommand::TypingNotify { enabled: false }), _ => { display::print_error("usage: /typing-notify on|off"); Input::Empty } }, "/react" => match arg { Some(rest) => { let mut parts = rest.splitn(2, ' '); let emoji = parts.next().unwrap_or_default().to_string(); let index = parts.next().and_then(|s| s.trim().parse::().ok()); Input::Slash(SlashCommand::React { emoji, index }) } None => { display::print_error("usage: /react [msg-index]"); Input::Empty } }, "/edit" => match arg { Some(rest) => { let mut parts = rest.splitn(2, ' '); let idx_str = parts.next().unwrap_or_default(); match (idx_str.parse::(), parts.next()) { (Ok(index), Some(new_text)) if !new_text.trim().is_empty() => { Input::Slash(SlashCommand::Edit { index, new_text: new_text.trim().to_string() }) } _ => { display::print_error("usage: /edit "); Input::Empty } } } None => { display::print_error("usage: /edit "); Input::Empty } }, "/delete" | "/del" => match arg.and_then(|s| s.parse::().ok()) { Some(index) => Input::Slash(SlashCommand::Delete { index }), None => { display::print_error("usage: /delete "); Input::Empty } }, "/send-file" | "/sf" => match arg { Some(path) => Input::Slash(SlashCommand::SendFile { path }), None => { display::print_error("usage: /send-file "); Input::Empty } }, "/download" | "/dl" => match arg.and_then(|s| s.parse::().ok()) { Some(index) => Input::Slash(SlashCommand::Download { index }), None => { display::print_error("usage: /download "); Input::Empty } }, "/delete-account" => Input::Slash(SlashCommand::DeleteAccount), "/disappear" => Input::Slash(SlashCommand::Disappear { arg }), "/privacy" => Input::Slash(SlashCommand::Privacy { arg }), "/verify-fs" => Input::Slash(SlashCommand::VerifyFs), "/rotate-all-keys" => Input::Slash(SlashCommand::RotateAllKeys), "/devices" => Input::Slash(SlashCommand::Devices), "/register-device" => match arg { Some(name) => Input::Slash(SlashCommand::RegisterDevice { name }), None => { display::print_error("usage: /register-device "); Input::Empty } }, "/revoke-device" => match arg { Some(id_prefix) => Input::Slash(SlashCommand::RevokeDevice { id_prefix }), None => { display::print_error("usage: /revoke-device "); Input::Empty } }, _ => { display::print_error(&format!("unknown command: {cmd}. Try /help")); Input::Empty } } } // ── Auto-start server ──────────────────────────────────────────────────────── /// RAII guard that kills an auto-started server process on drop. struct ServerGuard(Option); impl Drop for ServerGuard { fn drop(&mut self) { if let Some(ref mut child) = self.0 { let _ = child.kill(); let _ = child.wait(); } } } /// Derive the TLS key path from the cert path (e.g. server-cert.der → server-key.der). fn derive_key_path(cert_path: &Path) -> PathBuf { let stem = cert_path .file_name() .and_then(|f| f.to_str()) .unwrap_or("server-cert.der"); let key_name = stem.replace("cert", "key"); let key_name = if key_name == stem { // No "cert" in filename — just append "-key" before extension. let p = PathBuf::from(stem); let ext = p.extension().and_then(|e| e.to_str()).unwrap_or("der"); let base = p.file_stem().and_then(|s| s.to_str()).unwrap_or("server"); format!("{base}-key.{ext}") } else { key_name }; cert_path.with_file_name(key_name) } /// Find the `qpc-server` binary: same directory as current exe, then PATH. fn find_server_binary() -> Option { if let Ok(exe) = std::env::current_exe() { let sibling = exe.with_file_name("qpc-server"); if sibling.exists() { return Some(sibling); } } // Fall back to PATH lookup. std::env::var_os("PATH").and_then(|paths| { std::env::split_paths(&paths) .map(|dir| dir.join("qpc-server")) .find(|p| p.exists()) }) } /// Try to connect to the server. Returns Ok(()) if reachable, Err otherwise. async fn probe_server(server: &str, ca_cert: &Path, server_name: &str) -> bool { connect_node(server, ca_cert, server_name).await.is_ok() } /// Ensure a server is running. If not reachable, auto-start one. /// Returns a guard that kills the child process on drop (if we started one). async fn ensure_server( server: &str, ca_cert: &Path, server_name: &str, no_server: bool, ) -> anyhow::Result { if no_server { return Ok(ServerGuard(None)); } // If the cert already exists, try connecting first. if ca_cert.exists() && probe_server(server, ca_cert, server_name).await { return Ok(ServerGuard(None)); } // Server not reachable — try to auto-start. let binary = match find_server_binary() { Some(b) => b, None => { if ca_cert.exists() { // Cert exists but connection failed and no binary found. anyhow::bail!( "server at {server} is not reachable and qpc-server binary not found; \ start a server manually or install qpc-server" ); } else { anyhow::bail!( "no server running and qpc-server binary not found; \ start a server manually or install qpc-server" ); } } }; let key_path = derive_key_path(ca_cert); display::print_status(&format!("starting server on {server}...")); let child = ProcessCommand::new(&binary) .args([ "--allow-insecure-auth", "--listen", server, "--tls-cert", &ca_cert.to_string_lossy(), "--tls-key", &key_path.to_string_lossy(), ]) .stdout(std::process::Stdio::null()) .stderr(std::process::Stdio::null()) .spawn() .with_context(|| format!("failed to spawn {}", binary.display()))?; let guard = ServerGuard(Some(child)); // Poll until the server is ready (cert may need to be generated first). let mut delay = Duration::from_millis(100); let max_wait = Duration::from_secs(5); let start = std::time::Instant::now(); loop { tokio::time::sleep(delay).await; if ca_cert.exists() && probe_server(server, ca_cert, server_name).await { display::print_status("server ready"); return Ok(guard); } if start.elapsed() > max_wait { anyhow::bail!( "auto-started qpc-server but it did not become ready within {max_wait:?}" ); } delay = (delay * 2).min(Duration::from_secs(1)); } } // ── REPL entry point ───────────────────────────────────────────────────────── #[allow(clippy::too_many_arguments)] pub async fn run_repl( state_path: &Path, server: &str, ca_cert: &Path, server_name: &str, password: Option<&str>, username: Option<&str>, opaque_password: Option<&str>, access_token: &str, device_id: Option<&str>, no_server: bool, ) -> anyhow::Result<()> { // Phase 0: Ensure a server is running (auto-start if needed). let _server_guard = ensure_server(server, ca_cert, server_name, no_server).await?; // Phase 1: Resolve an access token (auto-register/login if needed). let resolved_token = resolve_access_token( state_path, server, ca_cert, server_name, password, username, opaque_password, access_token, ).await?; // Phase 2: Set the global auth context. // Session tokens are hex-encoded raw bytes; decode back to raw for the wire format. // Bearer tokens (explicit --access-token) are used as-is via UTF-8 bytes. let token_bytes = hex::decode(&resolved_token) .unwrap_or_else(|_| resolved_token.into_bytes()); let auth_ctx = ClientAuth::from_raw(token_bytes, device_id.map(String::from)); init_auth(auth_ctx); // Phase 3: Normal REPL startup. let mut session = SessionState::load(state_path, password)?; let mut client = connect_node(server, ca_cert, server_name).await?; display::print_status(&format!( "identity: {}", hex::encode(session.identity.public_key_bytes()) )); // Auto-upload a fresh KeyPackage so peers can /dm us. // The returned GroupMember holds the HPKE init private key for Welcome decryption. // If this fails with an auth error, the cached session is stale — re-login. match auto_upload_keys(&session, &client).await { Ok(pending) => { session.pending_member = Some(pending); } Err(e) => { let err_str = format!("{e:#}").to_lowercase(); if err_str.contains("e003") || err_str.contains("e017") || err_str.contains("invalid accesstoken") { display::print_status("session expired, re-authenticating..."); clear_cached_session(state_path); let fresh_token = resolve_access_token( state_path, server, ca_cert, server_name, password, username, opaque_password, access_token, ).await?; let token_bytes = hex::decode(&fresh_token) .unwrap_or_else(|_| fresh_token.into_bytes()); let auth_ctx = ClientAuth::from_raw(token_bytes, device_id.map(String::from)); init_auth(auth_ctx); client = connect_node(server, ca_cert, server_name).await?; match auto_upload_keys(&session, &client).await { Ok(pending) => { session.pending_member = Some(pending); } Err(e2) => { display::print_error(&format!("key upload after re-auth: {e2:#}")); } } } else { display::print_error(&format!("key upload: {e:#}")); display::print_status("peers may not be able to invite you until a KeyPackage is uploaded"); } } } let conv_count = session.conv_store.list_conversations()?.len(); if conv_count > 0 { display::print_status(&format!("{conv_count} conversations loaded")); } display::print_status("type /help for commands, Ctrl+D to exit"); println!(); // Auto-switch to the most recent conversation if there is one. if let Some(conv) = session.conv_store.list_conversations()?.into_iter().next() { session.active_conversation = Some(conv.id.clone()); session.conv_store.reset_unread(&conv.id)?; } let (tx, mut rx) = mpsc::unbounded_channel::>(); // Spawn stdin reader (must be spawn_local — capnp-rpc is !Send). tokio::task::spawn_local({ let tx = tx.clone(); async move { use tokio::io::AsyncBufReadExt; let mut stdin = tokio::io::BufReader::new(tokio::io::stdin()); let mut line = String::new(); loop { line.clear(); match stdin.read_line(&mut line).await { Ok(0) => { let _ = tx.send(None); break; } Ok(_) => { let _ = tx.send(Some(line.trim().to_string())); } Err(_) => break, } } } }); let mut poll = interval(Duration::from_millis(1000)); poll.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); let mut consecutive_errors: u32 = 0; let mut backoff_ms: u64 = 1000; const MAX_BACKOFF_MS: u64 = 60_000; display::print_prompt(&session); loop { tokio::select! { msg = rx.recv() => { match msg { Some(None) | None => break, Some(Some(line)) => { match parse_input(&line) { Input::Slash(SlashCommand::Quit) => break, Input::Slash(cmd) => { handle_slash(&mut session, &client, cmd).await; } Input::ChatMessage(text) => { handle_send(&mut session, &client, &text).await; } Input::Empty => {} } display::print_prompt(&session); } } } _ = poll.tick() => { // Drain offline outbox before polling for new messages. drain_outbox(&mut session, &client).await; // Expire stale typing indicators (10-second timeout). let now = std::time::Instant::now(); session.typing_indicators.retain(|_, ts| now.duration_since(*ts).as_secs() < 10); // Auto-clear: delete messages older than the configured duration. if let Some(max_age_secs) = session.auto_clear_secs { let cutoff_ms = now_ms().saturating_sub(max_age_secs as u64 * 1000); if let Err(e) = session.conv_store.delete_messages_before(cutoff_ms) { tracing::debug!(error = %e, "auto-clear failed"); } } // Traffic padding: send a dummy message every ~30 seconds. if session.padding_enabled { static LAST_PADDING: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0); let now_secs = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() .as_secs(); let last = LAST_PADDING.load(std::sync::atomic::Ordering::Relaxed); if now_secs.saturating_sub(last) >= 30 { LAST_PADDING.store(now_secs, std::sync::atomic::Ordering::Relaxed); send_dummy_message(&mut session, &client).await; } } match poll_messages(&mut session, &client).await { Ok(()) => { consecutive_errors = 0; backoff_ms = 1000; } Err(e) => { consecutive_errors += 1; tracing::warn!(error = format!("{e:#}"), n = consecutive_errors, "poll error"); if consecutive_errors >= 3 { display::print_status(&format!( "connection lost, reconnecting in {:.0}s...", backoff_ms as f64 / 1000.0 )); tokio::time::sleep(Duration::from_millis(backoff_ms)).await; match connect_node(server, ca_cert, server_name).await { Ok(new_client) => { client = new_client; consecutive_errors = 0; backoff_ms = 1000; display::print_status("reconnected"); display::print_prompt(&session); } Err(re) => { tracing::debug!(error = %re, "reconnect failed"); // Exponential backoff, capped. backoff_ms = (backoff_ms * 2).min(MAX_BACKOFF_MS); } } } } } } } } display::print_status("saving state..."); session.save_all()?; println!(); Ok(()) } // ── Startup helpers ───────────────────────────────────────────────────────── /// Generate and upload a fresh KeyPackage (and hybrid key if available) so peers can invite us. /// /// Returns the `GroupMember` whose keystore holds the HPKE init private key /// needed to decrypt the next incoming Welcome. The caller should store it as /// `session.pending_member`. async fn auto_upload_keys( session: &SessionState, client: &node_service::Client, ) -> anyhow::Result { let ks_path = session.state_path.with_extension("pending.ks"); let ks = DiskKeyStore::persistent(&ks_path) .unwrap_or_else(|_| DiskKeyStore::ephemeral()); let mut member = GroupMember::new_with_state( Arc::clone(&session.identity), ks, None, false, ); let kp_bytes = member.generate_key_package().context("generate KeyPackage")?; let id_key = session.identity.public_key_bytes(); upload_key_package(client, &id_key, &kp_bytes).await?; display::print_status("KeyPackage uploaded"); if let Some(ref hkp) = session.hybrid_kp { upload_hybrid_key(client, &id_key, &hkp.public_key()).await?; display::print_status("hybrid key uploaded"); } Ok(member) } /// Determine the access token, performing OPAQUE registration/login as needed. #[allow(clippy::too_many_arguments)] async fn resolve_access_token( state_path: &Path, server: &str, ca_cert: &Path, server_name: &str, state_password: Option<&str>, username: Option<&str>, opaque_password: Option<&str>, cli_access_token: &str, ) -> anyhow::Result { // Priority 1: Explicit --access-token (or env var). if !cli_access_token.is_empty() { display::print_status("using provided access token"); return Ok(cli_access_token.to_string()); } // Priority 2: Cached session token from a previous login. if let Some(cached) = load_cached_session(state_path, state_password) { display::print_status(&format!("using cached session for '{}'", cached.username)); return Ok(cached.token_hex); } // Priority 3: Login with --username/--password (or prompt interactively). let username = match username { Some(u) => u.to_string(), None => { use std::io::Write; eprint!("Username: "); std::io::stderr().flush().ok(); let mut input = String::new(); std::io::stdin() .read_line(&mut input) .context("failed to read username")?; let trimmed = input.trim().to_string(); anyhow::ensure!(!trimmed.is_empty(), "username is required"); trimmed } }; let opaque_password = match opaque_password { Some(p) => p.to_string(), None => { eprint!("Password: "); rpassword::read_password().context("failed to read password")? } }; // Ensure state file exists (creates identity + hybrid key if missing). let state = load_or_init_state(state_path, state_password)?; let identity = IdentityKeypair::from_seed(state.identity_seed); let identity_key = identity.public_key_bytes().to_vec(); // Connect for the OPAQUE flow. let node_client = connect_node(server, ca_cert, server_name).await?; // Try registration (idempotent: E018 = already registered → proceed to login). display::print_status(&format!("registering '{username}'...")); match opaque_register(&node_client, &username, &opaque_password, Some(&identity_key)).await { Ok(()) => { display::print_status(&format!("user '{username}' registered")); } Err(e) => { let msg = format!("{e:#}"); if msg.contains("E018") || msg.to_lowercase().contains("already") { display::print_status(&format!("user '{username}' already registered")); } else { return Err(e).context("OPAQUE registration failed"); } } } // Login (fresh connection in case register left RPC state dirty). display::print_status(&format!("logging in as '{username}'...")); let node_client = connect_node(server, ca_cert, server_name).await?; let token_bytes = opaque_login(&node_client, &username, &opaque_password, &identity_key).await?; let token_hex = hex::encode(&token_bytes); // Cache for future sessions. save_cached_session(state_path, &username, &token_hex, state_password)?; display::print_status("logged in, session cached"); Ok(token_hex) } // ── Slash command handlers ─────────────────────────────────────────────────── async fn handle_slash( session: &mut SessionState, client: &node_service::Client, cmd: SlashCommand, ) { let result = match cmd { SlashCommand::Help => { print_help(); Ok(()) } SlashCommand::Quit => unreachable!(), SlashCommand::Whoami => cmd_whoami(session), SlashCommand::List => cmd_list(session), SlashCommand::Switch { target } => cmd_switch(session, &target), SlashCommand::Dm { username } => cmd_dm(session, client, &username).await, SlashCommand::CreateGroup { name } => cmd_create_group(session, &name), SlashCommand::Invite { target } => cmd_invite(session, client, &target).await, SlashCommand::Remove { target } => cmd_remove(session, client, &target).await, SlashCommand::Leave => cmd_leave(session, client).await, SlashCommand::Join => cmd_join(session, client).await, SlashCommand::Members => cmd_members(session, client).await, SlashCommand::GroupInfo => cmd_group_info(session, client).await, SlashCommand::Rename { name } => cmd_rename(session, &name), SlashCommand::History { count } => cmd_history(session, count), SlashCommand::MeshPeers => cmd_mesh_peers(), SlashCommand::MeshServer { addr } => { display::print_status(&format!( "mesh server hint: reconnect with --server {addr} to use this node" )); Ok(()) } SlashCommand::MeshSend { peer_id, message } => cmd_mesh_send(&peer_id, &message), SlashCommand::MeshBroadcast { topic, message } => cmd_mesh_broadcast(&topic, &message), SlashCommand::MeshSubscribe { topic } => cmd_mesh_subscribe(&topic), SlashCommand::MeshRoute => cmd_mesh_route(session), SlashCommand::MeshIdentity => cmd_mesh_identity(session), SlashCommand::MeshStore => cmd_mesh_store(session), SlashCommand::Verify { username } => cmd_verify(session, client, &username).await, SlashCommand::UpdateKey => cmd_update_key(session, client).await, SlashCommand::Typing => cmd_typing(session, client).await, SlashCommand::TypingNotify { enabled } => { session.typing_notify_enabled = enabled; display::print_status(&format!( "typing notifications {}", if enabled { "enabled" } else { "disabled" } )); Ok(()) } SlashCommand::React { emoji, index } => cmd_react(session, client, &emoji, index).await, SlashCommand::Edit { index, new_text } => cmd_edit(session, client, index, &new_text).await, SlashCommand::Delete { index } => cmd_delete(session, client, index).await, SlashCommand::SendFile { path } => cmd_send_file(session, client, &path).await, SlashCommand::Download { index } => cmd_download(session, client, index).await, SlashCommand::DeleteAccount => cmd_delete_account(session, client).await, SlashCommand::Disappear { arg } => cmd_disappear(session, arg.as_deref()), SlashCommand::Privacy { arg } => cmd_privacy(session, arg.as_deref()), SlashCommand::VerifyFs => cmd_verify_fs(session), SlashCommand::RotateAllKeys => cmd_rotate_all_keys(session, client).await, SlashCommand::Devices => cmd_devices(client).await, SlashCommand::RegisterDevice { name } => cmd_register_device(client, &name).await, SlashCommand::RevokeDevice { id_prefix } => cmd_revoke_device(client, &id_prefix).await, }; if let Err(e) = result { display::print_error(&format!("{e:#}")); } } pub(crate) fn print_help() { display::print_status("Commands:"); display::print_status(" /dm - Start or switch to a DM (federation supported)"); display::print_status(" /create-group - Create a new group"); display::print_status(" /invite - Invite user to current group"); display::print_status(" /remove - Remove a member from the current group"); display::print_status(" /leave - Leave the current group"); display::print_status(" /join - Join a group from pending Welcome"); display::print_status(" /switch <@user|#group> - Switch conversation"); display::print_status(" /list - List all conversations"); display::print_status(" /members - Show members of current conversation"); display::print_status(" /group-info - Show detailed info about the active conversation"); display::print_status(" /rename - Rename the current conversation"); display::print_status(" /history [N] - Show last N messages (default: 20)"); display::print_status(" /whoami - Show your identity"); display::print_status(" /mesh peers - Discover nearby qpc nodes via mDNS"); display::print_status(" /mesh server - Show how to reconnect to a mesh node"); display::print_status(" /mesh send - Send a P2P message to a mesh peer"); display::print_status(" /mesh broadcast - Broadcast an encrypted message on a topic"); display::print_status(" /mesh subscribe - Subscribe to a broadcast topic"); display::print_status(" /mesh route - Show known mesh peers and routes"); display::print_status(" /mesh identity - Show mesh node identity info"); display::print_status(" /mesh store - Show mesh store-and-forward stats"); display::print_status(" /update-key - Rotate your MLS leaf key in the active group"); display::print_status(" /verify - Show safety number for key verification"); display::print_status(" /react [index] - React to last message (or message at index)"); display::print_status(" /typing - Send a typing indicator"); display::print_status(" /typing-notify on|off - Toggle typing notifications"); display::print_status(" /edit - Edit a sent message"); display::print_status(" /delete - Delete a sent message"); display::print_status(" /send-file - Upload and send a file (max 50 MB)"); display::print_status(" /download - Download a received file attachment"); display::print_status(" /delete-account - Permanently delete your account"); display::print_status(" /disappear - Set disappearing messages (1h, 30m, 1d, 300)"); display::print_status(" /disappear off - Disable disappearing messages"); display::print_status(" /disappear - Show current setting"); display::print_status(" /privacy - Show current privacy settings"); display::print_status(" /privacy redact-keys on|off - Redact identity keys in /members, /group-info"); display::print_status(" /privacy auto-clear - Auto-clear local messages older than "); display::print_status(" /privacy padding on|off - Toggle dummy traffic padding"); display::print_status(" /verify-fs - Verify MLS forward secrecy (epoch advancement)"); display::print_status(" /rotate-all-keys - Rotate MLS key + regenerate hybrid KEM keypair"); display::print_status(" /devices - List all registered devices"); display::print_status(" /register-device - Register this device with a name"); display::print_status(" /revoke-device - Remove a device by hex ID prefix"); display::print_status(" /quit - Exit"); } /// Parse a human-friendly duration string into seconds. /// Supports: "30s", "5m", "1h", "1d", "300" (plain seconds). fn parse_duration_secs(s: &str) -> Option { let s = s.trim().to_lowercase(); if s.ends_with('d') { s[..s.len() - 1].parse::().ok().map(|d| d * 86400) } else if s.ends_with('h') { s[..s.len() - 1].parse::().ok().map(|h| h * 3600) } else if s.ends_with('m') { s[..s.len() - 1].parse::().ok().map(|m| m * 60) } else if s.ends_with('s') { s[..s.len() - 1].parse::().ok() } else { s.parse::().ok() } } /// Format a TTL in seconds into a human-friendly string. fn format_ttl(secs: u32) -> String { if secs >= 86400 && secs.is_multiple_of(86400) { format!("{} day(s)", secs / 86400) } else if secs >= 3600 && secs.is_multiple_of(3600) { format!("{} hour(s)", secs / 3600) } else if secs >= 60 && secs.is_multiple_of(60) { format!("{} minute(s)", secs / 60) } else { format!("{} second(s)", secs) } } pub(crate) fn cmd_disappear( session: &mut SessionState, arg: Option<&str>, ) -> anyhow::Result<()> { let conv_id = session .active_conversation .as_ref() .context("no active conversation")? .clone(); match arg { None => { // Show current setting. match session.disappear_ttl.get(&conv_id) { Some(ttl) => display::print_status(&format!( "messages will disappear after {}", format_ttl(*ttl) )), None => display::print_status("disappearing messages are off"), } } Some(s) if s.eq_ignore_ascii_case("off") => { session.disappear_ttl.remove(&conv_id); display::print_status("disappearing messages disabled"); } Some(s) => { let secs = parse_duration_secs(s) .context("invalid duration; use e.g. 30m, 1h, 1d, or 300")?; if secs == 0 { anyhow::bail!("TTL must be greater than 0"); } session.disappear_ttl.insert(conv_id, secs); display::print_status(&format!( "messages will disappear after {}", format_ttl(secs) )); } } Ok(()) } pub(crate) fn cmd_privacy( session: &mut SessionState, arg: Option<&str>, ) -> anyhow::Result<()> { match arg { None => { display::print_status(&format!( "redact-keys: {}", if session.redact_keys { "on" } else { "off" } )); display::print_status(&format!( "auto-clear: {}", match session.auto_clear_secs { Some(secs) => format_ttl(secs), None => "off".to_string(), } )); display::print_status(&format!( "padding: {}", if session.padding_enabled { "on" } else { "off" } )); } Some(s) if s.starts_with("redact-keys ") => { let val = s.trim_start_matches("redact-keys ").trim(); match val { "on" => { session.redact_keys = true; display::print_status("key redaction enabled"); } "off" => { session.redact_keys = false; display::print_status("key redaction disabled"); } _ => display::print_error("usage: /privacy redact-keys on|off"), } } Some(s) if s.starts_with("auto-clear ") => { let val = s.trim_start_matches("auto-clear ").trim(); if val.eq_ignore_ascii_case("off") { session.auto_clear_secs = None; display::print_status("auto-clear disabled"); } else { let secs = parse_duration_secs(val) .context("invalid duration; use e.g. 30m, 1h, 1d, or 300")?; if secs == 0 { anyhow::bail!("auto-clear duration must be greater than 0"); } session.auto_clear_secs = Some(secs); display::print_status(&format!( "auto-clear enabled: messages older than {} will be deleted", format_ttl(secs) )); } } Some(s) if s.starts_with("padding ") => { let val = s.trim_start_matches("padding ").trim(); match val { "on" => { session.padding_enabled = true; display::print_status("traffic padding enabled (dummy messages every 30s)"); } "off" => { session.padding_enabled = false; display::print_status("traffic padding disabled"); } "status" => { display::print_status(&format!( "padding: {}", if session.padding_enabled { "on" } else { "off" } )); } _ => display::print_error("usage: /privacy padding on|off|status"), } } Some(_) => { display::print_error("usage: /privacy [redact-keys on|off | auto-clear |off | padding on|off|status]"); } } Ok(()) } pub(crate) fn cmd_verify_fs(session: &SessionState) -> anyhow::Result<()> { let conv_id = session .active_conversation .as_ref() .context("no active conversation")?; let member = session .members .get(conv_id) .context("no group member for active conversation")?; let current_epoch = member.epoch().context("no MLS group in active conversation")?; match session.last_send_epoch { Some(last) if current_epoch > last => { display::print_status(&format!( "forward secrecy OK: epoch advanced from {} to {}", last, current_epoch )); } Some(last) if current_epoch == last => { display::print_status(&format!( "warning: MLS epoch has NOT advanced since last send (epoch {}). \ Use /rotate-all-keys or /update-key to rotate keys.", current_epoch )); } Some(last) => { display::print_status(&format!( "unexpected: current epoch {} < last send epoch {}", current_epoch, last )); } None => { display::print_status(&format!( "no previous send recorded. Current epoch: {}", current_epoch )); } } Ok(()) } pub(crate) async fn cmd_rotate_all_keys( session: &mut SessionState, client: &node_service::Client, ) -> anyhow::Result<()> { // Step 1: MLS leaf key rotation (same as /update-key). cmd_update_key(session, client).await?; // Step 2: Generate new hybrid KEM keypair and upload. let new_kp = quicprochat_core::HybridKeypair::generate(); let id_key = session.identity.public_key_bytes(); upload_hybrid_key(client, &id_key, &new_kp.public_key()).await?; session.hybrid_kp = Some(new_kp); display::print_status("all keys rotated: MLS leaf key + hybrid KEM keypair"); Ok(()) } /// Discover nearby qpc servers via mDNS (requires `--features mesh` build). pub(crate) fn cmd_mesh_peers() -> anyhow::Result<()> { use super::mesh_discovery::MeshDiscovery; match MeshDiscovery::start() { Err(e) => { display::print_error(&format!("mesh discovery: {e}")); return Ok(()); } Ok(disc) => { display::print_status("scanning for nearby qpc nodes (2s)..."); // Block briefly to collect mDNS announcements from the local network. std::thread::sleep(std::time::Duration::from_secs(2)); let peers = disc.peers(); if peers.is_empty() { display::print_status("no qpc nodes found on the local network"); } else { display::print_status(&format!("found {} node(s):", peers.len())); for p in &peers { display::print_status(&format!(" {} at {}", p.domain, p.server_addr)); } display::print_status("use: /mesh server to note the address,"); display::print_status("then reconnect with: qpc --server "); } } } Ok(()) } /// Send a direct P2P mesh message (stub — P2pNode not yet wired into session). pub(crate) fn cmd_mesh_send(peer_id: &str, message: &str) -> anyhow::Result<()> { #[cfg(feature = "mesh")] { display::print_status(&format!("mesh send: would send to {peer_id}: {message}")); display::print_status("(P2P node integration pending — message not actually sent)"); } #[cfg(not(feature = "mesh"))] { let _ = (peer_id, message); display::print_error("requires --features mesh"); } Ok(()) } /// Broadcast an encrypted message on a topic (stub — P2pNode not yet wired into session). pub(crate) fn cmd_mesh_broadcast(topic: &str, message: &str) -> anyhow::Result<()> { #[cfg(feature = "mesh")] { display::print_status(&format!("mesh broadcast to {topic}: {message}")); display::print_status("(P2P node integration pending — message not actually sent)"); } #[cfg(not(feature = "mesh"))] { let _ = (topic, message); display::print_error("requires --features mesh"); } Ok(()) } /// Subscribe to a broadcast topic (stub — P2pNode not yet wired into session). pub(crate) fn cmd_mesh_subscribe(topic: &str) -> anyhow::Result<()> { #[cfg(feature = "mesh")] { display::print_status(&format!("subscribed to topic: {topic}")); display::print_status("(P2P node integration pending — subscription is not persisted)"); } #[cfg(not(feature = "mesh"))] { let _ = topic; display::print_error("requires --features mesh"); } Ok(()) } /// Display known mesh peers and routes from the mesh identity file. pub(crate) fn cmd_mesh_route(session: &SessionState) -> anyhow::Result<()> { #[cfg(feature = "mesh")] { let mesh_state_path = session.state_path.with_extension("mesh.json"); if mesh_state_path.exists() { let id = quicprochat_p2p::identity::MeshIdentity::load(&mesh_state_path)?; let peers = id.known_peers(); if peers.is_empty() { display::print_status("no known mesh peers"); } else { display::print_status(&format!("{} known peer(s):", peers.len())); for (hex_id, info) in peers { let short_id = &hex_id[..8.min(hex_id.len())]; let addrs = if info.addresses.is_empty() { "no addresses".to_string() } else { info.addresses.join(", ") }; display::print_status(&format!(" {short_id}... last_seen={} addrs={addrs}", info.last_seen)); } } } else { display::print_status("no mesh identity file found (start mesh mode first)"); } } #[cfg(not(feature = "mesh"))] { let _ = session; display::print_error("requires --features mesh"); } Ok(()) } /// Display mesh node identity information. pub(crate) fn cmd_mesh_identity(session: &SessionState) -> anyhow::Result<()> { #[cfg(feature = "mesh")] { let mesh_state_path = session.state_path.with_extension("mesh.json"); if mesh_state_path.exists() { let id = quicprochat_p2p::identity::MeshIdentity::load(&mesh_state_path)?; display::print_status(&format!("mesh public key: {}", hex::encode(id.public_key()))); display::print_status(&format!("known peers: {}", id.known_peers().len())); } else { display::print_status("no mesh identity file found"); display::print_status("a mesh identity will be created when mesh mode is started"); } } #[cfg(not(feature = "mesh"))] { let _ = session; display::print_error("requires --features mesh"); } Ok(()) } /// Display mesh store-and-forward statistics. pub(crate) fn cmd_mesh_store(session: &SessionState) -> anyhow::Result<()> { #[cfg(feature = "mesh")] { // Without a live P2pNode in the session, we can only report that the store // is not active. Once P2pNode is wired in, this will show real stats. display::print_status("mesh store: not active (P2P node not started in this session)"); display::print_status("start mesh mode to enable store-and-forward"); let _ = session; } #[cfg(not(feature = "mesh"))] { let _ = session; display::print_error("requires --features mesh"); } Ok(()) } pub(crate) fn cmd_whoami(session: &SessionState) -> anyhow::Result<()> { display::print_status(&format!( "identity: {}", hex::encode(session.identity.public_key_bytes()) )); display::print_status(&format!( "hybrid key: {}", if session.hybrid_kp.is_some() { "yes" } else { "no" } )); display::print_status(&format!( "conversations: {}", session.members.len() )); Ok(()) } pub(crate) fn cmd_list(session: &SessionState) -> anyhow::Result<()> { let convs = session.conv_store.list_conversations()?; if convs.is_empty() { display::print_status("no conversations yet. Try /dm or /create-group "); return Ok(()); } for conv in &convs { let kind_str = match &conv.kind { ConversationKind::Dm { .. } => "dm", ConversationKind::Group { .. } => "group", }; let active = session .active_conversation .as_ref() .map(|a| a == &conv.id) .unwrap_or(false); let marker = if active { " *" } else { "" }; println!( "{}{}", display::format_conv_line( &conv.display_name, kind_str, conv.unread_count, conv.member_keys.len(), ), marker, ); } Ok(()) } pub(crate) fn cmd_switch(session: &mut SessionState, target: &str) -> anyhow::Result<()> { let target = target.trim(); let conv = if let Some(username) = target.strip_prefix('@') { session.conv_store.list_conversations()?.into_iter().find(|c| { matches!(&c.kind, ConversationKind::Dm { peer_username: Some(u), .. } if u == username) }) } else if let Some(name) = target.strip_prefix('#') { session.conv_store.find_group_by_name(name)? } else { // Try as display name session.conv_store.list_conversations()?.into_iter().find(|c| c.display_name == target) }; match conv { Some(c) => { session.conv_store.reset_unread(&c.id)?; session.active_conversation = Some(c.id); display::print_status(&format!("switched to {}", c.display_name)); } None => { display::print_error(&format!("conversation not found: {target}")); } } Ok(()) } pub(crate) async fn cmd_dm( session: &mut SessionState, client: &node_service::Client, username: &str, ) -> anyhow::Result<()> { // Resolve username → identity key. display::print_status(&format!("resolving {username}...")); let peer_key = resolve_user(client, username) .await? .with_context(|| format!("user '{username}' not found"))?; // Self-DM → local-only notepad (no server channel, no MLS). if peer_key == session.identity_bytes() { let conv_id = ConversationId::from_group_name(&format!("self-{username}")); if let Some(existing) = session.conv_store.load_conversation(&conv_id)? { session.conv_store.reset_unread(&existing.id)?; session.active_conversation = Some(existing.id); display::print_status("switched to notes"); return Ok(()); } let conv = Conversation { id: conv_id.clone(), kind: ConversationKind::Dm { peer_key: peer_key.clone(), peer_username: Some(username.to_string()), }, display_name: format!("@{username} (notes)"), mls_group_blob: None, keystore_blob: None, member_keys: vec![peer_key], unread_count: 0, last_activity_ms: now_ms(), created_at_ms: now_ms(), is_hybrid: false, last_seen_seq: 0, }; let ks = DiskKeyStore::ephemeral(); let member = GroupMember::new_with_state(Arc::clone(&session.identity), ks, None, false); session.add_conversation(conv, member)?; session.active_conversation = Some(conv_id); display::print_status("notes created — messages here are local only"); return Ok(()); } // Check if we already have a DM with this peer. if let Some(existing) = session.conv_store.find_dm_by_peer(&peer_key)? { session.conv_store.reset_unread(&existing.id)?; session.active_conversation = Some(existing.id); display::print_status(&format!("switched to DM with @{username}")); return Ok(()); } // Create or look up the server-side channel. // was_new=true → this call created the channel; we are the MLS initiator. // was_new=false → channel already existed; peer is the MLS initiator and has // sent (or will send) us a Welcome. Wait for try_auto_join. display::print_status("creating channel..."); let (channel_id, was_new) = create_channel(client, &peer_key).await?; if !was_new { // Peer is the MLS initiator. Their Welcome is en route; the background // poller's try_auto_join will process it within the next poll interval // and auto-switch to the conversation automatically. display::print_status(&format!( "DM channel with @{username} exists — peer is initiator, auto-joining via Welcome (arrives within ~1 s)" )); return Ok(()); } let conv_id = ConversationId::from_slice(&channel_id) .context("server returned invalid channel_id length")?; // Fetch peer's KeyPackage for MLS. display::print_status("fetching peer's key package..."); let kp_bytes = fetch_key_package(client, &peer_key).await?; anyhow::ensure!(!kp_bytes.is_empty(), "peer has no key package uploaded"); // Negotiate hybrid mode: use post-quantum MLS if both sides have hybrid keys. let peer_hybrid_pk = fetch_hybrid_key(client, &peer_key).await?; let use_hybrid = session.hybrid_kp.is_some() && peer_hybrid_pk.is_some(); // Create MLS group using channel_id as group_id. let ks_dir = session.state_path.with_extension("keystores"); std::fs::create_dir_all(&ks_dir).ok(); let ks_path = ks_dir.join(format!("{}.ks", conv_id.hex())); let ks = DiskKeyStore::persistent(&ks_path)?; let mut member = GroupMember::new_with_state(Arc::clone(&session.identity), ks, None, use_hybrid); // Generate a key package for ourselves (needed for MLS) let _my_kp = member.generate_key_package()?; member.create_group(&channel_id)?; let (commit, welcome) = member.add_member(&kp_bytes)?; // Deliver welcome to peer and commit to peer. // For hybrid MLS groups, the MLS layer already provides PQ protection, // so we skip the outer hybrid envelope wrapping. let wrap = |data: &[u8]| -> anyhow::Result> { if use_hybrid { Ok(data.to_vec()) // MLS-level hybrid KEM is sufficient } else if let Some(ref pk) = peer_hybrid_pk { hybrid_encrypt(pk, data, b"", b"").context("hybrid encrypt") } else { Ok(data.to_vec()) } }; enqueue(client, &peer_key, &wrap(&welcome)?).await?; enqueue(client, &peer_key, &wrap(&commit)?).await?; let member_keys = member.member_identities(); let conv = Conversation { id: conv_id.clone(), kind: ConversationKind::Dm { peer_key: peer_key.clone(), peer_username: Some(username.to_string()), }, display_name: format!("@{username}"), mls_group_blob: member .serialize_mls_state() .context("serialize MLS state")?, keystore_blob: None, member_keys, unread_count: 0, last_activity_ms: now_ms(), created_at_ms: now_ms(), is_hybrid: member.is_hybrid(), last_seen_seq: 0, }; session.add_conversation(conv, member)?; session.active_conversation = Some(conv_id); display::print_status(&format!("DM with @{username} created. Start typing!")); Ok(()) } pub(crate) fn cmd_create_group(session: &mut SessionState, name: &str) -> anyhow::Result<()> { let conv_id = ConversationId::from_group_name(name); if session.conv_store.find_group_by_name(name)?.is_some() { session.active_conversation = Some(conv_id); display::print_status(&format!("switched to existing group #{name}")); return Ok(()); } let ks_dir = session.state_path.with_extension("keystores"); std::fs::create_dir_all(&ks_dir).ok(); let ks_path = ks_dir.join(format!("{}.ks", conv_id.hex())); let ks = DiskKeyStore::persistent(&ks_path)?; let mut member = GroupMember::new_with_state(Arc::clone(&session.identity), ks, None, false); let _my_kp = member.generate_key_package()?; member.create_group(conv_id.0.as_slice())?; let member_keys = member.member_identities(); let conv = Conversation { id: conv_id.clone(), kind: ConversationKind::Group { name: name.to_string() }, display_name: format!("#{name}"), mls_group_blob: member .serialize_mls_state() .context("serialize MLS state")?, keystore_blob: None, member_keys, unread_count: 0, last_activity_ms: now_ms(), created_at_ms: now_ms(), is_hybrid: false, last_seen_seq: 0, }; session.add_conversation(conv, member)?; session.active_conversation = Some(conv_id); display::print_status(&format!("group #{name} created. Use /invite to add members")); Ok(()) } pub(crate) async fn cmd_invite( session: &mut SessionState, client: &node_service::Client, target: &str, ) -> anyhow::Result<()> { let conv_id = session .active_conversation .as_ref() .context("no active conversation; switch to a group first")? .clone(); let my_key = session.identity_bytes(); // Resolve the target — could be a username or hex key. let peer_key = if target.len() == 64 && target.chars().all(|c| c.is_ascii_hexdigit()) { decode_identity_key(target)? } else { display::print_status(&format!("resolving {target}...")); resolve_user(client, target) .await? .with_context(|| format!("user '{target}' not found"))? }; display::print_status("fetching key package..."); let kp_bytes = fetch_key_package(client, &peer_key).await?; anyhow::ensure!(!kp_bytes.is_empty(), "peer has no key package uploaded"); let member = session .get_member_mut(&conv_id) .context("no group member for active conversation")?; anyhow::ensure!( member.group_ref().is_some(), "active conversation has no MLS group" ); let (commit, welcome) = member.add_member(&kp_bytes)?; // Get member list before dropping the mutable borrow. let other_members: Vec> = member .member_identities() .into_iter() .filter(|id| id.as_slice() != my_key.as_slice() && id.as_slice() != peer_key.as_slice()) .collect(); // Deliver welcome to new member. let peer_hybrid_pk = fetch_hybrid_key(client, &peer_key).await?; let wrap = |data: &[u8]| -> anyhow::Result> { if let Some(ref pk) = peer_hybrid_pk { hybrid_encrypt(pk, data, b"", b"").context("hybrid encrypt") } else { Ok(data.to_vec()) } }; enqueue(client, &peer_key, &wrap(&welcome)?).await?; for mk in &other_members { let pk = fetch_hybrid_key(client, mk).await?; let payload = if let Some(ref pk) = pk { hybrid_encrypt(pk, &commit, b"", b"").context("hybrid encrypt commit")? } else { commit.clone() }; enqueue(client, mk, &payload).await?; } session.save_member(&conv_id)?; display::print_status(&format!("invited {target} and broadcast commit")); Ok(()) } pub(crate) async fn cmd_remove( session: &mut SessionState, client: &node_service::Client, target: &str, ) -> anyhow::Result<()> { let conv_id = session .active_conversation .as_ref() .context("no active conversation")? .clone(); let my_key = session.identity_bytes(); // Resolve the target — username or hex key. let target_key = if target.len() == 64 && target.chars().all(|c| c.is_ascii_hexdigit()) { decode_identity_key(target)? } else { resolve_user(client, target) .await? .with_context(|| format!("user '{target}' not found"))? }; let member = session .get_member_mut(&conv_id) .context("no group member for active conversation")?; let commit = member.remove_member(&target_key)?; // Fan-out commit to all remaining members (excluding self). let remaining: Vec> = member .member_identities() .into_iter() .filter(|id| id.as_slice() != my_key.as_slice()) .collect(); for rk in &remaining { enqueue(client, rk, &commit).await?; } session.save_member(&conv_id)?; display::print_status(&format!("removed {target} from the group")); Ok(()) } pub(crate) async fn cmd_leave( session: &mut SessionState, client: &node_service::Client, ) -> anyhow::Result<()> { let conv_id = session .active_conversation .as_ref() .context("no active conversation")? .clone(); let my_key = session.identity_bytes(); let member = session .get_member_mut(&conv_id) .context("no group member for active conversation")?; let proposal = member.leave_group()?; // Send leave proposal to all other members. let others: Vec> = member .member_identities() .into_iter() .filter(|id| id.as_slice() != my_key.as_slice()) .collect(); for rk in &others { enqueue(client, rk, &proposal).await?; } // Locally deactivate the conversation (keep history). session.members.remove(&conv_id); session.active_conversation = None; display::print_status("left the conversation"); Ok(()) } pub(crate) async fn cmd_update_key( session: &mut SessionState, client: &node_service::Client, ) -> anyhow::Result<()> { let conv_id = session .active_conversation .as_ref() .context("no active conversation; switch to a group first")? .clone(); let my_key = session.identity_bytes(); let member = session .get_member_mut(&conv_id) .context("no group member for active conversation")?; anyhow::ensure!( member.group_ref().is_some(), "active conversation has no MLS group" ); // Propose a self-update (leaf key rotation). let proposal = member.propose_self_update()?; // Immediately commit the pending proposal. let (commit, _welcome) = member.commit_pending_proposals()?; // Fan out the commit to all other group members. let others: Vec> = member .member_identities() .into_iter() .filter(|id| id.as_slice() != my_key.as_slice()) .collect(); // Send proposal followed by commit so recipients can process in order. for rk in &others { enqueue(client, rk, &proposal).await?; enqueue(client, rk, &commit).await?; } session.save_member(&conv_id)?; display::print_status("key rotation complete"); Ok(()) } pub(crate) async fn cmd_join( session: &mut SessionState, client: &node_service::Client, ) -> anyhow::Result<()> { display::print_status("checking for pending Welcome messages..."); let identity_bytes = session.identity_bytes(); let mut payloads = fetch_wait(client, &identity_bytes, 0).await?; payloads.sort_by_key(|(seq, _)| *seq); if payloads.is_empty() { display::print_status("no pending messages"); return Ok(()); } for (_seq, payload) in &payloads { let mls_payload = match try_hybrid_decrypt(session.hybrid_kp.as_ref(), payload) { Ok(b) => b, Err(_) => continue, }; // Try to process with existing groups first let mut handled = false; for member in session.members.values_mut() { match member.receive_message(&mls_payload) { Ok(_) => { handled = true; break; } Err(_) => continue, } } if handled { continue; } // Not handled by any existing group — try as Welcome. let ks = DiskKeyStore::ephemeral(); let mut new_member = GroupMember::new_with_state( Arc::clone(&session.identity), ks, None, false, ); // Need a key package to decrypt Welcome. let _kp = new_member.generate_key_package()?; match new_member.join_group(&mls_payload) { Ok(()) => { let group_id = new_member .group_id() .unwrap_or_default(); let conv_id = if group_id.len() >= 16 { ConversationId::from_slice(&group_id[..16]) .unwrap_or_else(|| ConversationId::from_group_name(&hex::encode(&group_id))) } else { ConversationId::from_group_name(&hex::encode(&group_id)) }; // Check if conversation already exists. if session.members.contains_key(&conv_id) { continue; } let member_keys = new_member.member_identities(); let display = format!("joined-{}", &hex::encode(&group_id)[..8.min(group_id.len())]); let conv = Conversation { id: conv_id.clone(), kind: ConversationKind::Group { name: display.clone() }, display_name: format!("#{display}"), mls_group_blob: new_member .serialize_mls_state() .context("serialize joined group")?, keystore_blob: None, member_keys, unread_count: 0, last_activity_ms: now_ms(), created_at_ms: now_ms(), is_hybrid: new_member.is_hybrid(), last_seen_seq: 0, }; session.add_conversation(conv, new_member)?; session.active_conversation = Some(conv_id.clone()); display::print_status(&format!("joined group #{display}")); } Err(_) => { // Not a Welcome either — skip. } } } session.save_all()?; Ok(()) } /// Resolve an identity key to a username, falling back to a hex prefix on failure. async fn resolve_or_hex( client: &node_service::Client, identity_key: &[u8], ) -> String { match resolve_identity(client, identity_key).await { Ok(Some(name)) => name, _ => hex::encode(&identity_key[..8.min(identity_key.len())]), } } pub(crate) async fn cmd_members( session: &SessionState, client: &node_service::Client, ) -> anyhow::Result<()> { let conv_id = session .active_conversation .as_ref() .context("no active conversation")?; let member = session .members .get(conv_id) .context("no group member for active conversation")?; let my_key = session.identity_bytes(); let ids = member.member_identities(); let mut names = Vec::with_capacity(ids.len()); for id in &ids { if session.redact_keys { let short = &hex::encode(&id[..4.min(id.len())]); let mut name = format!("[redacted-{short}]"); if id.as_slice() == my_key.as_slice() { name.push_str(" (you)"); } names.push(name); } else { let mut name = resolve_or_hex(client, id).await; if id.as_slice() == my_key.as_slice() { name.push_str(" (you)"); } names.push(name); } } display::print_status(&format!("Members ({}): {}", names.len(), names.join(", "))); Ok(()) } pub(crate) async fn cmd_group_info( session: &SessionState, client: &node_service::Client, ) -> anyhow::Result<()> { let conv_id = session .active_conversation .as_ref() .context("no active conversation")?; let conv = session .conv_store .load_conversation(conv_id)? .context("conversation not found in store")?; let member = session .members .get(conv_id) .context("no group member for active conversation")?; let my_key = session.identity_bytes(); let ids = member.member_identities(); let conv_type = if ids.len() <= 2 { "DM" } else { "Group" }; display::print_status(&format!("Conversation: {}", conv.display_name)); display::print_status(&format!("Type: {}", conv_type)); display::print_status(&format!("Members: {}", ids.len())); let mut names = Vec::with_capacity(ids.len()); for id in &ids { if session.redact_keys { let short = &hex::encode(&id[..4.min(id.len())]); let mut name = format!("[redacted-{short}]"); if id.as_slice() == my_key.as_slice() { name.push_str(" (you)"); } names.push(name); } else { let mut name = resolve_or_hex(client, id).await; if id.as_slice() == my_key.as_slice() { name.push_str(" (you)"); } names.push(name); } } display::print_status(&format!(" {}", names.join(", "))); if let Some(epoch) = member.epoch() { display::print_status(&format!("MLS epoch: {}", epoch)); } Ok(()) } pub(crate) fn cmd_rename(session: &mut SessionState, new_name: &str) -> anyhow::Result<()> { let conv_id = session .active_conversation .as_ref() .context("no active conversation")? .clone(); let mut conv = session .conv_store .load_conversation(&conv_id)? .context("conversation not found in store")?; conv.display_name = new_name.to_string(); session.conv_store.save_conversation(&conv)?; display::print_status(&format!("Conversation renamed to: {new_name}")); Ok(()) } pub(crate) fn cmd_history(session: &SessionState, count: usize) -> anyhow::Result<()> { let conv_id = session .active_conversation .as_ref() .context("no active conversation")?; let msgs = session.conv_store.load_recent_messages(conv_id, count)?; if msgs.is_empty() { display::print_status("no messages yet"); return Ok(()); } for msg in &msgs { display::print_message(msg); } Ok(()) } pub(crate) async fn cmd_verify( session: &SessionState, client: &node_service::Client, username: &str, ) -> anyhow::Result<()> { // Resolve contact's identity key from the server. display::print_status(&format!("resolving {username}...")); let peer_key_vec = resolve_user(client, username) .await? .with_context(|| format!("user '{username}' not found"))?; anyhow::ensure!( peer_key_vec.len() == 32, "server returned an identity key with unexpected length ({}); expected 32 bytes", peer_key_vec.len() ); let peer_key: [u8; 32] = peer_key_vec .as_slice() .try_into() .expect("length checked above"); let my_key: [u8; 32] = session.identity.public_key_bytes(); let safety_number = compute_safety_number(&my_key, &peer_key); display::print_status(&format!("Safety number with @{username}:")); display::print_status(""); display::print_status(&format!(" {safety_number}")); display::print_status(""); display::print_status("Compare this number with your contact via a separate channel"); display::print_status("(voice call, in person, or any out-of-band means)."); display::print_status("If the numbers match, the connection has not been tampered with."); Ok(()) } // ── Typing indicator ───────────────────────────────────────────────────────── pub(crate) async fn cmd_typing( session: &mut SessionState, client: &node_service::Client, ) -> anyhow::Result<()> { let conv_id = session .active_conversation .as_ref() .context("no active conversation")? .clone(); let my_key = session.identity_bytes(); let identity = std::sync::Arc::clone(&session.identity); let member = session .get_member_mut(&conv_id) .context("no group member")?; anyhow::ensure!( member.group_ref().is_some(), "active conversation has no MLS group" ); let app_payload = serialize_typing(1); let sealed = quicprochat_core::sealed_sender::seal(&identity, &app_payload); let padded = quicprochat_core::padding::pad(&sealed); let ct = member .send_message(&padded) .context("MLS send_message failed")?; let recipients: Vec> = member .member_identities() .into_iter() .filter(|id| id.as_slice() != my_key.as_slice()) .collect(); for recipient_key in &recipients { let peer_hybrid_pk = fetch_hybrid_key(client, recipient_key).await?; let payload = if let Some(ref pk) = peer_hybrid_pk { hybrid_encrypt(pk, &ct, b"", b"").context("hybrid encrypt")? } else { ct.clone() }; enqueue(client, recipient_key, &payload).await?; } session.save_member(&conv_id)?; display::print_status("typing indicator sent"); Ok(()) } pub(crate) async fn cmd_react( session: &mut SessionState, client: &node_service::Client, emoji: &str, index: Option, ) -> anyhow::Result<()> { let conv_id = session .active_conversation .as_ref() .context("no active conversation; use /dm or /create-group first")? .clone(); // Resolve the target message_id. let ref_msg_id = if let Some(idx) = index { // User specified a 1-based display index into conversation history. let msgs = session.conv_store.load_all_messages(&conv_id)?; let msg = msgs .get(idx.saturating_sub(1)) .with_context(|| format!("no message at index {idx}"))?; msg.message_id .context("message at that index has no message_id")? } else { // React to the most recent non-outgoing chat/reply message. let msgs = session.conv_store.load_recent_messages(&conv_id, 50)?; let target = msgs .iter() .rev() .find(|m| !m.is_outgoing && (m.msg_type == "chat" || m.msg_type == "reply")) .context("no received messages to react to")?; target .message_id .context("most recent message has no message_id")? }; let my_key = session.identity_bytes(); let identity = Arc::clone(&session.identity); let member = session .get_member_mut(&conv_id) .context("no group member")?; anyhow::ensure!( member.group_ref().is_some(), "cannot react in a local-only conversation" ); let app_payload = serialize_reaction(ref_msg_id, emoji.as_bytes()) .context("serialize reaction")?; let sealed = quicprochat_core::sealed_sender::seal(&identity, &app_payload); let padded = quicprochat_core::padding::pad(&sealed); let ct = member .send_message(&padded) .context("MLS send_message failed")?; let recipients: Vec> = member .member_identities() .into_iter() .filter(|id| id.as_slice() != my_key.as_slice()) .collect(); for recipient_key in &recipients { let peer_hybrid_pk = fetch_hybrid_key(client, recipient_key).await?; let payload = if let Some(ref pk) = peer_hybrid_pk { hybrid_encrypt(pk, &ct, b"", b"").context("hybrid encrypt")? } else { ct.clone() }; enqueue(client, recipient_key, &payload).await?; } // Store outgoing reaction. let msg = StoredMessage { conversation_id: conv_id.clone(), message_id: None, sender_key: my_key, sender_name: Some("you".into()), body: emoji.to_string(), msg_type: "reaction".into(), ref_msg_id: Some(ref_msg_id), timestamp_ms: now_ms(), is_outgoing: true, }; session.conv_store.save_message(&msg)?; session.conv_store.update_activity(&conv_id, now_ms())?; session.save_member(&conv_id)?; display::print_status(&format!("reacted {emoji}")); Ok(()) } // ── Edit / Delete ──────────────────────────────────────────────────────────── pub(crate) async fn cmd_edit( session: &mut SessionState, client: &node_service::Client, index: usize, new_text: &str, ) -> anyhow::Result<()> { let conv_id = session .active_conversation .as_ref() .context("no active conversation")? .clone(); let msgs = session.conv_store.load_all_messages(&conv_id)?; anyhow::ensure!(!msgs.is_empty(), "no messages in this conversation"); anyhow::ensure!( index < msgs.len(), "message index {index} out of range (0..{})", msgs.len() - 1 ); let target = &msgs[index]; anyhow::ensure!(target.is_outgoing, "you can only edit your own messages"); let msg_id = target .message_id .context("message has no message_id (cannot edit)")?; let my_key = session.identity_bytes(); let identity = std::sync::Arc::clone(&session.identity); let member = session .get_member_mut(&conv_id) .context("no group member")?; anyhow::ensure!( member.group_ref().is_some(), "active conversation has no MLS group" ); let app_payload = serialize_edit(&msg_id, new_text.as_bytes()) .context("serialize edit message")?; let sealed = quicprochat_core::sealed_sender::seal(&identity, &app_payload); let padded = quicprochat_core::padding::pad(&sealed); let ct = member .send_message(&padded) .context("MLS send_message failed")?; let recipients: Vec> = member .member_identities() .into_iter() .filter(|id| id.as_slice() != my_key.as_slice()) .collect(); for recipient_key in &recipients { let peer_hybrid_pk = fetch_hybrid_key(client, recipient_key).await?; let payload = if let Some(ref pk) = peer_hybrid_pk { hybrid_encrypt(pk, &ct, b"", b"").context("hybrid encrypt")? } else { ct.clone() }; enqueue(client, recipient_key, &payload).await?; } // Update local DB. session .conv_store .update_message_body(&conv_id, &msg_id, new_text)?; session.save_member(&conv_id)?; display::print_status("message edited"); Ok(()) } pub(crate) async fn cmd_delete( session: &mut SessionState, client: &node_service::Client, index: usize, ) -> anyhow::Result<()> { let conv_id = session .active_conversation .as_ref() .context("no active conversation")? .clone(); let msgs = session.conv_store.load_all_messages(&conv_id)?; anyhow::ensure!(!msgs.is_empty(), "no messages in this conversation"); anyhow::ensure!( index < msgs.len(), "message index {index} out of range (0..{})", msgs.len() - 1 ); let target = &msgs[index]; anyhow::ensure!(target.is_outgoing, "you can only delete your own messages"); let msg_id = target .message_id .context("message has no message_id (cannot delete)")?; let my_key = session.identity_bytes(); let identity = std::sync::Arc::clone(&session.identity); let member = session .get_member_mut(&conv_id) .context("no group member")?; anyhow::ensure!( member.group_ref().is_some(), "active conversation has no MLS group" ); let app_payload = serialize_delete(&msg_id); let sealed = quicprochat_core::sealed_sender::seal(&identity, &app_payload); let padded = quicprochat_core::padding::pad(&sealed); let ct = member .send_message(&padded) .context("MLS send_message failed")?; let recipients: Vec> = member .member_identities() .into_iter() .filter(|id| id.as_slice() != my_key.as_slice()) .collect(); for recipient_key in &recipients { let peer_hybrid_pk = fetch_hybrid_key(client, recipient_key).await?; let payload = if let Some(ref pk) = peer_hybrid_pk { hybrid_encrypt(pk, &ct, b"", b"").context("hybrid encrypt")? } else { ct.clone() }; enqueue(client, recipient_key, &payload).await?; } // Mark as deleted in local DB. session.conv_store.delete_message(&conv_id, &msg_id)?; session.save_member(&conv_id)?; display::print_status("message deleted"); Ok(()) } // ── File transfer ──────────────────────────────────────────────────────────── /// Maximum file size for upload (50 MB). const MAX_FILE_SIZE: u64 = 50 * 1024 * 1024; /// Chunk size for upload/download (256 KB). const BLOB_CHUNK_SIZE: usize = 256 * 1024; /// Guess MIME type from file extension. fn guess_mime(path: &Path) -> &'static str { match path .extension() .and_then(|e| e.to_str()) .map(|e| e.to_lowercase()) .as_deref() { Some("pdf") => "application/pdf", Some("jpg" | "jpeg") => "image/jpeg", Some("png") => "image/png", Some("gif") => "image/gif", Some("txt") => "text/plain", Some("zip") => "application/zip", Some("json") => "application/json", Some("html" | "htm") => "text/html", Some("mp4") => "video/mp4", Some("mp3") => "audio/mpeg", Some("webp") => "image/webp", Some("svg") => "image/svg+xml", _ => "application/octet-stream", } } /// Format a byte size for human display (e.g. "1.2 MB"). fn format_size(bytes: u64) -> String { if bytes < 1024 { format!("{bytes} B") } else if bytes < 1024 * 1024 { format!("{:.1} KB", bytes as f64 / 1024.0) } else if bytes < 1024 * 1024 * 1024 { format!("{:.1} MB", bytes as f64 / (1024.0 * 1024.0)) } else { format!("{:.1} GB", bytes as f64 / (1024.0 * 1024.0 * 1024.0)) } } pub(crate) async fn cmd_send_file( session: &mut SessionState, client: &node_service::Client, path_str: &str, ) -> anyhow::Result<()> { let conv_id = session .active_conversation .as_ref() .context("no active conversation; use /dm or /create-group first")? .clone(); let file_path = PathBuf::from(path_str.trim_matches('"')); anyhow::ensure!(file_path.exists(), "file not found: {}", file_path.display()); let metadata = std::fs::metadata(&file_path) .with_context(|| format!("cannot read file: {}", file_path.display()))?; let file_size = metadata.len(); anyhow::ensure!( file_size <= MAX_FILE_SIZE, "file too large ({}, max {})", format_size(file_size), format_size(MAX_FILE_SIZE) ); let filename = file_path .file_name() .and_then(|n| n.to_str()) .context("cannot determine filename")? .to_string(); let mime_type = guess_mime(&file_path); // Read entire file and compute SHA-256 hash. let file_bytes = std::fs::read(&file_path) .with_context(|| format!("read file: {}", file_path.display()))?; use sha2::{Sha256, Digest}; let hash = Sha256::digest(&file_bytes); let blob_hash: [u8; 32] = hash.into(); // Upload in chunks with progress. let total = file_bytes.len(); let mut offset = 0usize; while offset < total { let end = (offset + BLOB_CHUNK_SIZE).min(total); let chunk = &file_bytes[offset..end]; upload_blob_chunk( client, &blob_hash, chunk, offset as u64, total as u64, mime_type, ) .await?; offset = end; let pct = (offset as u64 * 100) / total as u64; eprint!("\rUploading... {pct}%"); } eprintln!(); // Build FileRef AppMessage. let app_payload = serialize_file_ref( &blob_hash, filename.as_bytes(), file_size, mime_type.as_bytes(), ) .context("serialize FileRef")?; let my_key = session.identity_bytes(); let identity = std::sync::Arc::clone(&session.identity); let member = session .get_member_mut(&conv_id) .context("no group member")?; anyhow::ensure!( member.group_ref().is_some(), "cannot send files in a local-only conversation" ); let sealed = quicprochat_core::sealed_sender::seal(&identity, &app_payload); let padded = quicprochat_core::padding::pad(&sealed); let ct = member .send_message(&padded) .context("MLS send_message failed")?; let recipients: Vec> = member .member_identities() .into_iter() .filter(|id| id.as_slice() != my_key.as_slice()) .collect(); for recipient_key in &recipients { let peer_hybrid_pk = fetch_hybrid_key(client, recipient_key).await?; let payload = if let Some(ref pk) = peer_hybrid_pk { hybrid_encrypt(pk, &ct, b"", b"").context("hybrid encrypt")? } else { ct.clone() }; enqueue(client, recipient_key, &payload).await?; } // Store outgoing message (include blob hash so /download can extract it). let body = format!( "\u{1f4ce} {} ({}) blob:{}", filename, format_size(file_size), hex::encode(blob_hash) ); let msg = StoredMessage { conversation_id: conv_id.clone(), message_id: None, sender_key: my_key, sender_name: Some("you".into()), body, msg_type: "file".into(), ref_msg_id: None, timestamp_ms: now_ms(), is_outgoing: true, }; session.conv_store.save_message(&msg)?; session.conv_store.update_activity(&conv_id, now_ms())?; session.save_member(&conv_id)?; display::print_status(&format!( "Sent: {} ({})", filename, format_size(file_size) )); Ok(()) } pub(crate) async fn cmd_download( session: &mut SessionState, client: &node_service::Client, index: usize, ) -> anyhow::Result<()> { let conv_id = session .active_conversation .as_ref() .context("no active conversation")? .clone(); let msgs = session.conv_store.load_all_messages(&conv_id)?; anyhow::ensure!(!msgs.is_empty(), "no messages in this conversation"); anyhow::ensure!( index < msgs.len(), "message index {index} out of range (0..{})", msgs.len() - 1 ); let target = &msgs[index]; anyhow::ensure!( target.msg_type == "file", "message at index {index} is not a file (type: {})", target.msg_type ); // Extract blob_id from the stored ref_msg_id field (32-byte blob hash stored as first 16 bytes // in ref_msg_id is not enough). We store the blob_id hex in the body after the filename. // Parse the body format: "\u{1f4ce} filename (size) blob:HEXHASH" let blob_id = extract_blob_id_from_body(&target.body) .context("cannot extract blob_id from file message; the message may be from an older version")?; // Get filename from body: "\u{1f4ce} filename (size) ..." let filename = extract_filename_from_body(&target.body) .unwrap_or_else(|| "download".to_string()); // Download in chunks. // First request to learn total_size. let (first_chunk, total_size, _mime) = download_blob_chunk(client, &blob_id, 0, BLOB_CHUNK_SIZE as u32).await?; let mut data = Vec::with_capacity(total_size as usize); data.extend_from_slice(&first_chunk); if total_size > first_chunk.len() as u64 { let pct = (data.len() as u64 * 100) / total_size; eprint!("\rDownloading... {pct}%"); } while (data.len() as u64) < total_size { let (chunk, _, _) = download_blob_chunk( client, &blob_id, data.len() as u64, BLOB_CHUNK_SIZE as u32, ) .await?; if chunk.is_empty() { break; } data.extend_from_slice(&chunk); let pct = (data.len() as u64 * 100) / total_size; eprint!("\rDownloading... {pct}%"); } if total_size > BLOB_CHUNK_SIZE as u64 { eprintln!(); } // Verify SHA-256. use sha2::{Sha256, Digest}; let computed_hash = Sha256::digest(&data); anyhow::ensure!( computed_hash.as_slice() == blob_id.as_slice(), "SHA-256 mismatch: blob data is corrupt" ); // Save to current directory, avoiding overwrites. let mut save_path = PathBuf::from(&filename); let mut counter = 1u32; while save_path.exists() { let stem = Path::new(&filename) .file_stem() .and_then(|s| s.to_str()) .unwrap_or("download"); let ext = Path::new(&filename) .extension() .and_then(|e| e.to_str()) .unwrap_or(""); if ext.is_empty() { save_path = PathBuf::from(format!("{stem}.{counter}")); } else { save_path = PathBuf::from(format!("{stem}.{counter}.{ext}")); } counter += 1; } std::fs::write(&save_path, &data) .with_context(|| format!("write file: {}", save_path.display()))?; display::print_status(&format!( "Downloaded: {} -> ./{}", filename, save_path.display() )); Ok(()) } /// Extract blob_id from the file message body format: /// "\u{1f4ce} filename (size) blob:HEX64" fn extract_blob_id_from_body(body: &str) -> Option> { let marker = "blob:"; let idx = body.find(marker)?; let hex_str = &body[idx + marker.len()..]; let hex_str = hex_str.split_whitespace().next()?; if hex_str.len() != 64 { return None; } hex::decode(hex_str).ok() } /// Extract filename from the file message body format: /// "\u{1f4ce} filename (size) blob:HEX64" fn extract_filename_from_body(body: &str) -> Option { // Skip the leading emoji + space. let rest = body.strip_prefix("\u{1f4ce} ")?; // Find the last " (" to separate filename from "(size) blob:..." let paren_idx = rest.rfind(" (")?; let filename = &rest[..paren_idx]; if filename.is_empty() { None } else { Some(filename.to_string()) } } pub(crate) async fn cmd_delete_account( session: &mut SessionState, client: &node_service::Client, ) -> anyhow::Result<()> { display::print_error("WARNING: This will permanently delete your account and all data on the server."); display::print_status("Type 'DELETE' to confirm:"); // Read confirmation from stdin. let mut input = String::new(); { use std::io::Write; std::io::stderr().flush().ok(); } std::io::stdin() .read_line(&mut input) .context("failed to read confirmation")?; if input.trim() != "DELETE" { display::print_status("Account deletion cancelled."); return Ok(()); } delete_account(client).await?; // Clear local state file. if session.state_path.exists() { std::fs::remove_file(&session.state_path) .with_context(|| format!("remove state file: {}", session.state_path.display()))?; } // Clear cached session token. clear_cached_session(&session.state_path); display::print_status("Account deleted successfully."); std::process::exit(0); } // ── Sending ────────────────────────────────────────────────────────────────── async fn handle_send( session: &mut SessionState, client: &node_service::Client, text: &str, ) { if let Err(e) = do_send(session, client, text).await { display::print_error(&format!("{e:#}")); } } pub(crate) async fn do_send( session: &mut SessionState, client: &node_service::Client, text: &str, ) -> anyhow::Result<()> { let conv_id = session .active_conversation .as_ref() .context("no active conversation; use /dm or /create-group first")? .clone(); let my_key = session.identity_bytes(); let identity = std::sync::Arc::clone(&session.identity); let member = session .get_member_mut(&conv_id) .context("no group member")?; // Local-only notepad (self-DM): save message without MLS/delivery. if member.group_ref().is_none() { let msg = StoredMessage { conversation_id: conv_id.clone(), message_id: None, sender_key: my_key, sender_name: Some("you".into()), body: text.to_string(), msg_type: "chat".into(), ref_msg_id: None, timestamp_ms: now_ms(), is_outgoing: true, }; session.conv_store.save_message(&msg)?; session.conv_store.update_activity(&conv_id, now_ms())?; return Ok(()); } // Wrap in structured AppMessage format with a unique message_id. let app_payload = serialize_chat(text.as_bytes(), None) .context("serialize app message")?; // Metadata protection: seal sender identity inside payload + pad to bucket size. let sealed = quicprochat_core::sealed_sender::seal(&identity, &app_payload); let padded = quicprochat_core::padding::pad(&sealed); let ct = member .send_message(&padded) .context("MLS send_message failed")?; // Collect epoch and recipients before releasing the mutable borrow on session. let epoch = member.epoch(); let recipients: Vec> = member .member_identities() .into_iter() .filter(|id| id.as_slice() != my_key.as_slice()) .collect(); // Track epoch for /verify-fs (must be after member borrow is released). if let Some(epoch) = epoch { session.last_send_epoch = Some(epoch); } let ttl = session.disappear_ttl.get(&conv_id).copied(); for recipient_key in &recipients { let peer_hybrid_pk = fetch_hybrid_key(client, recipient_key).await?; let payload = if let Some(ref pk) = peer_hybrid_pk { hybrid_encrypt(pk, &ct, b"", b"").context("hybrid encrypt")? } else { ct.clone() }; enqueue_with_ttl(client, recipient_key, &payload, ttl).await?; } // Extract message_id from what we just serialized. let msg_id = parse_app_msg(&app_payload) .ok() .and_then(|(_, m)| match m { AppMessage::Chat { message_id, .. } => Some(message_id), _ => None, }); // Save outgoing message to history. let msg = StoredMessage { conversation_id: conv_id.clone(), message_id: msg_id, sender_key: my_key, sender_name: Some("you".into()), body: text.to_string(), msg_type: "chat".into(), ref_msg_id: None, timestamp_ms: now_ms(), is_outgoing: true, }; session.conv_store.save_message(&msg)?; session.conv_store.update_activity(&conv_id, now_ms())?; session.save_member(&conv_id)?; Ok(()) } /// Send a dummy message for traffic analysis resistance. async fn send_dummy_message( session: &mut SessionState, client: &node_service::Client, ) { let conv_id = match session.active_conversation.as_ref() { Some(id) => id.clone(), None => return, }; let my_key = session.identity_bytes(); let identity = std::sync::Arc::clone(&session.identity); let member = match session.get_member_mut(&conv_id) { Some(m) => m, None => return, }; if member.group_ref().is_none() { return; } let recipients: Vec> = member .member_identities() .into_iter() .filter(|id| id.as_slice() != my_key.as_slice()) .collect(); if recipients.is_empty() { return; } let dummy_payload = serialize_dummy(); let sealed = quicprochat_core::sealed_sender::seal(&identity, &dummy_payload); let padded = quicprochat_core::padding::pad(&sealed); let ct = match member.send_message(&padded) { Ok(ct) => ct, Err(_) => return, }; let idx = (now_ms() as usize) % recipients.len(); let rk = &recipients[idx]; let payload = match fetch_hybrid_key(client, rk).await { Ok(Some(ref pk)) => match hybrid_encrypt(pk, &ct, b"", b"") { Ok(p) => p, Err(_) => ct, }, _ => ct, }; let _ = enqueue(client, rk, &payload).await; let _ = session.save_member(&conv_id); } // ── Outbox drain ───────────────────────────────────────────────────────────── async fn drain_outbox( session: &mut SessionState, client: &node_service::Client, ) { let entries = match session.conv_store.load_pending_outbox() { Ok(e) => e, Err(_) => return, }; for entry in entries { match enqueue(client, &entry.recipient_key, &entry.payload).await { Ok(_) => { let _ = session.conv_store.mark_outbox_sent(entry.id); } Err(_) => { let _ = session.conv_store.mark_outbox_failed(entry.id, entry.retry_count + 1); } } } } // ── Polling ────────────────────────────────────────────────────────────────── async fn poll_messages( session: &mut SessionState, client: &node_service::Client, ) -> anyhow::Result<()> { let identity_bytes = session.identity_bytes(); let mut payloads = fetch_wait(client, &identity_bytes, 0).await?; if payloads.is_empty() { return Ok(()); } payloads.sort_by_key(|(seq, _)| *seq); let mut any_changed = false; for (_seq, payload) in &payloads { let mls_payload = match try_hybrid_decrypt(session.hybrid_kp.as_ref(), payload) { Ok(b) => b, Err(_) => payload.clone(), // Try raw (non-hybrid) }; // Try each conversation's GroupMember. let conv_ids: Vec = session.members.keys().cloned().collect(); let my_key = session.identity_bytes(); let mut handled = false; for conv_id in &conv_ids { let member = match session.members.get_mut(conv_id) { Some(m) => m, None => continue, }; match member.receive_message(&mls_payload) { Ok(ReceivedMessage::Application(plaintext)) => { // Metadata protection: try unpad → unseal → parse. // Falls back gracefully for messages from older clients. let (sender_key, app_bytes) = { // Step 1: try unpad let after_unpad = quicprochat_core::padding::unpad(&plaintext) .unwrap_or_else(|_| plaintext.clone()); // Step 2: try unseal if quicprochat_core::sealed_sender::is_sealed(&after_unpad) { match quicprochat_core::sealed_sender::unseal(&after_unpad) { Ok((sk, inner)) => (sk.to_vec(), inner), Err(_) => (my_key.clone(), after_unpad), } } else { (my_key.clone(), after_unpad) } }; // Parse structured AppMessage; handle ephemeral types first. let parsed = parse_app_msg(&app_bytes); // Typing indicators: ephemeral display only, never stored. if let Ok((_, AppMessage::Typing { active })) = &parsed { if session.typing_notify_enabled { let sender_hex = hex::encode(&sender_key[..4.min(sender_key.len())]); if *active != 0 { session.typing_indicators.insert( sender_hex.clone(), std::time::Instant::now(), ); let is_active_conv = session .active_conversation .as_ref() .map(|a| a == conv_id) .unwrap_or(false); if is_active_conv { display::print_typing(&sender_hex); display::print_prompt(session); } } else { session.typing_indicators.remove(&sender_hex); } } any_changed = true; handled = true; break; } // Dummy messages: silently discard (traffic padding). if let Ok((_, AppMessage::Dummy)) = &parsed { any_changed = true; handled = true; break; } // Read receipts: ephemeral, show subtle notification. if let Ok((_, AppMessage::ReadReceipt { .. })) = &parsed { let is_active = session .active_conversation .as_ref() .map(|a| a == conv_id) .unwrap_or(false); if is_active { let fallback = hex::encode(&sender_key[..4.min(sender_key.len())]); let label = resolve_identity(client, &sender_key) .await.ok().flatten().unwrap_or(fallback); display::print_status(&format!("\u{2713} {label} read your message")); } any_changed = true; handled = true; break; } // Edit: update existing message body in DB. if let Ok((_, AppMessage::Edit { ref_msg_id, body: edit_body })) = &parsed { let new_body = String::from_utf8_lossy(edit_body).to_string(); session.conv_store.update_message_body( conv_id, ref_msg_id, &new_body, )?; session.conv_store.update_activity(conv_id, now_ms())?; let is_active = session .active_conversation .as_ref() .map(|a| a == conv_id) .unwrap_or(false); if is_active { let conv = session.conv_store.load_conversation(conv_id)?; let conv_name = conv.map(|c| c.display_name).unwrap_or_default(); display::print_incoming(&conv_name, &format!("[edited] {new_body}")); display::print_prompt(session); } any_changed = true; handled = true; break; } // Delete: mark existing message as deleted in DB. if let Ok((_, AppMessage::Delete { ref_msg_id })) = &parsed { session.conv_store.delete_message(conv_id, ref_msg_id)?; session.conv_store.update_activity(conv_id, now_ms())?; let is_active = session .active_conversation .as_ref() .map(|a| a == conv_id) .unwrap_or(false); if is_active { let conv = session.conv_store.load_conversation(conv_id)?; let conv_name = conv.map(|c| c.display_name).unwrap_or_default(); display::print_incoming(&conv_name, "[deleted a message]"); display::print_prompt(session); } any_changed = true; handled = true; break; } // Storable message types: Chat, Reply, Reaction, FileRef, legacy. let (body, msg_id, msg_type, ref_msg_id) = match parsed { Ok((_, AppMessage::Chat { message_id, body })) => ( String::from_utf8_lossy(&body).to_string(), Some(message_id), "chat", None, ), Ok((_, AppMessage::Reply { ref_msg_id, body })) => ( String::from_utf8_lossy(&body).to_string(), None, "reply", Some(ref_msg_id), ), Ok((_, AppMessage::Reaction { ref_msg_id, emoji })) => ( String::from_utf8_lossy(&emoji).to_string(), None, "reaction", Some(ref_msg_id), ), Ok((_, AppMessage::FileRef { blob_id, filename, file_size, .. })) => { let fname = String::from_utf8_lossy(&filename).to_string(); let body = format!( "\u{1f4ce} {} ({}) blob:{}", fname, format_size(file_size), hex::encode(blob_id), ); (body, None, "file", None) } _ => { // Legacy raw plaintext or unknown type. ( String::from_utf8_lossy(&app_bytes).to_string(), None, "chat", None, ) } }; // A real message clears the sender's typing indicator. let sender_hex = hex::encode(&sender_key[..4.min(sender_key.len())]); session.typing_indicators.remove(&sender_hex); let msg = StoredMessage { conversation_id: conv_id.clone(), message_id: msg_id, sender_key: sender_key.clone(), sender_name: None, body: body.clone(), msg_type: msg_type.into(), ref_msg_id, timestamp_ms: now_ms(), is_outgoing: false, }; session.conv_store.save_message(&msg)?; session.conv_store.update_activity(conv_id, now_ms())?; let is_active = session .active_conversation .as_ref() .map(|a| a == conv_id) .unwrap_or(false); if is_active { let conv = session.conv_store.load_conversation(conv_id)?; let conv_name = conv.map(|c| c.display_name).unwrap_or_default(); let display_body = if msg_type == "reaction" { format!("reacted {body}") } else if msg_type == "file" { // Show the file info without the blob: suffix, plus download hint. let visible = body.split(" blob:").next().unwrap_or(&body); let all_msgs = session.conv_store.load_all_messages(conv_id)?; let msg_idx = all_msgs.len().saturating_sub(1); format!("{visible} -- use /download {msg_idx} to save") } else { body.clone() }; display::print_incoming(&conv_name, &display_body); display::print_prompt(session); } else { session.conv_store.increment_unread(conv_id)?; } // Auto-send read receipt for Chat and Reply (has message_id). if let Some(mid) = msg_id { let receipt_bytes = serialize_read_receipt(mid); let identity = Arc::clone(&session.identity); let sealed = quicprochat_core::sealed_sender::seal(&identity, &receipt_bytes); let padded = quicprochat_core::padding::pad(&sealed); if let Some(m) = session.members.get_mut(conv_id) { if let Ok(ct) = m.send_message(&padded) { let _ = enqueue(client, &sender_key, &ct).await; } } } any_changed = true; handled = true; break; } Ok(ReceivedMessage::StateChanged) => { // Processed a non-application message (commit, proposal, etc.). // Auto-commit any pending proposals (e.g. leave requests). if member.has_pending_proposals() { if let Ok((commit, _welcome)) = member.commit_pending_proposals() { let remaining: Vec> = member .member_identities() .into_iter() .filter(|id| id.as_slice() != my_key.as_slice()) .collect(); for rk in &remaining { let _ = enqueue(client, rk, &commit).await; } } } any_changed = true; handled = true; break; } Ok(ReceivedMessage::SelfRemoved) => { display::print_status("you were removed from this conversation"); session.members.remove(conv_id); any_changed = true; handled = true; break; } Err(_) => continue, } } if !handled { // Try as a Welcome — auto-join the group (IRC-like behaviour). handled = try_auto_join(session, client, &mls_payload).await; if handled { any_changed = true; } else { tracing::debug!("unhandled payload (not Welcome or existing group)"); } } } if any_changed { session.save_all()?; } Ok(()) } /// Attempt to process `mls_payload` as a Welcome message using the pending /// KeyPackage member. On success, creates the conversation automatically /// (IRC-like auto-join) and uploads a fresh KeyPackage for future invites. async fn try_auto_join( session: &mut SessionState, client: &node_service::Client, mls_payload: &[u8], ) -> bool { let pending = match session.pending_member.as_mut() { Some(p) => p, None => return false, }; if pending.join_group(mls_payload).is_err() { return false; } // Successfully joined — extract group info. let group_id = pending.group_id().unwrap_or_default(); let conv_id = if group_id.len() >= 16 { ConversationId::from_slice(&group_id[..16]) .unwrap_or_else(|| ConversationId::from_group_name(&hex::encode(&group_id))) } else { ConversationId::from_group_name(&hex::encode(&group_id)) }; // Already know this conversation — skip. if session.members.contains_key(&conv_id) { return false; } // Take ownership of the pending member. let member = match session.pending_member.take() { Some(m) => m, None => { tracing::error!("pending_member disappeared after successful join"); return false; } }; let member_keys = member.member_identities(); // Figure out the peer (any member that isn't us). let my_key = session.identity_bytes(); let peer_key = member_keys .iter() .find(|k| k.as_slice() != my_key.as_slice()) .cloned(); // Resolve the peer's username from the server (best-effort). let peer_username = if let Some(ref pk) = peer_key { resolve_identity(client, pk).await.ok().flatten() } else { None }; // Build display name: "@username" if known, else "@". let display_name = if let Some(ref uname) = peer_username { format!("@{uname}") } else if let Some(ref pk) = peer_key { format!("@{}", &hex::encode(&pk[..4.min(pk.len())])) } else { let id_hex = hex::encode(&group_id); format!("#{}", &id_hex[..8.min(id_hex.len())]) }; let kind = if let Some(pk) = peer_key { ConversationKind::Dm { peer_key: pk, peer_username, } } else { ConversationKind::Group { name: display_name.clone(), } }; let mls_blob = member .serialize_mls_state() .ok() .flatten(); let conv = Conversation { id: conv_id.clone(), kind, display_name: display_name.clone(), mls_group_blob: mls_blob, keystore_blob: None, member_keys, unread_count: 0, last_activity_ms: now_ms(), created_at_ms: now_ms(), is_hybrid: member.is_hybrid(), last_seen_seq: 0, }; if let Err(e) = session.add_conversation(conv, member) { display::print_error(&format!("auto-join failed: {e:#}")); return false; } // Auto-switch if no active conversation. if session.active_conversation.is_none() { session.active_conversation = Some(conv_id); } display::print_incoming("system", &format!("new conversation: {display_name}")); display::print_prompt(session); // Upload a fresh KeyPackage so we can receive more invites. replenish_pending_key(session, client).await; true } /// Upload a new KeyPackage and store the member as `pending_member` so we're /// ready for the next incoming Welcome. async fn replenish_pending_key( session: &mut SessionState, client: &node_service::Client, ) { match auto_upload_keys(session, client).await { Ok(pending) => { session.pending_member = Some(pending); } Err(e) => { tracing::warn!(error = format!("{e:#}"), "failed to replenish KeyPackage"); } } } // ── Device management commands ────────────────────────────────────────────── pub(crate) async fn cmd_devices(client: &node_service::Client) -> anyhow::Result<()> { let devices = list_devices(client).await?; if devices.is_empty() { display::print_status("No devices registered."); return Ok(()); } display::print_status(&format!("{} device(s):", devices.len())); for (device_id, name, registered_at) in &devices { let id_hex = hex::encode(device_id); let name_display = if name.is_empty() { "(unnamed)" } else { name.as_str() }; display::print_status(&format!( " {} - {} (registered: {})", &id_hex[..16.min(id_hex.len())], name_display, registered_at )); } Ok(()) } pub(crate) async fn cmd_register_device( client: &node_service::Client, name: &str, ) -> anyhow::Result<()> { let mut device_id = [0u8; 16]; rand::RngCore::fill_bytes(&mut rand::rngs::OsRng, &mut device_id); let success = register_device(client, &device_id, name).await?; if success { display::print_status(&format!( "Device registered: {} ({})", hex::encode(device_id), name )); } else { display::print_error("Device already exists with that ID."); } Ok(()) } pub(crate) async fn cmd_revoke_device( client: &node_service::Client, id_prefix: &str, ) -> anyhow::Result<()> { let devices = list_devices(client).await?; let prefix_lower = id_prefix.to_lowercase(); let matches: Vec<_> = devices .iter() .filter(|(id, _, _)| hex::encode(id).starts_with(&prefix_lower)) .collect(); match matches.len() { 0 => { display::print_error(&format!("No device matching prefix '{id_prefix}'")); } 1 => { let (device_id, name, _) = matches[0]; let success = revoke_device(client, device_id).await?; if success { display::print_status(&format!( "Device revoked: {} ({})", hex::encode(device_id), if name.is_empty() { "(unnamed)" } else { name.as_str() } )); } else { display::print_error("Device not found on server."); } } n => { display::print_error(&format!( "Ambiguous prefix '{id_prefix}' matches {n} devices. Be more specific." )); } } Ok(()) }