feat: auto-start local server from REPL

When no server is reachable, the REPL now automatically spawns a
qpq-server child process with dev defaults (--allow-insecure-auth,
matching TLS cert paths). The server is killed on REPL exit via a
Drop guard. Use --no-server to opt out (e.g. for remote servers).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-01 23:03:31 +01:00
parent c2762f93f6
commit e24497bf90
2 changed files with 330 additions and 13 deletions

View File

@@ -3,14 +3,15 @@
//! Supports slash commands for DMs, groups, invitations, and conversation switching. //! Supports slash commands for DMs, groups, invitations, and conversation switching.
//! Background polling fetches messages for all active conversations. //! Background polling fetches messages for all active conversations.
use std::path::Path; use std::path::{Path, PathBuf};
use std::process::{Child, Command as ProcessCommand};
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use anyhow::Context; use anyhow::Context;
use quicproquo_core::{ use quicproquo_core::{
AppMessage, DiskKeyStore, GroupMember, IdentityKeypair, hybrid_encrypt, AppMessage, DiskKeyStore, GroupMember, IdentityKeypair, ReceivedMessage,
parse as parse_app_msg, serialize_chat, hybrid_encrypt, parse as parse_app_msg, serialize_chat,
}; };
use quicproquo_proto::node_capnp::node_service; use quicproquo_proto::node_capnp::node_service;
use tokio::sync::mpsc; use tokio::sync::mpsc;
@@ -48,6 +49,8 @@ enum SlashCommand {
Dm { username: String }, Dm { username: String },
CreateGroup { name: String }, CreateGroup { name: String },
Invite { target: String }, Invite { target: String },
Remove { target: String },
Leave,
Join, Join,
Members, Members,
History { count: usize }, History { count: usize },
@@ -99,6 +102,14 @@ fn parse_input(line: &str) -> Input {
Input::Empty Input::Empty
} }
}, },
"/remove" | "/kick" => match arg {
Some(target) => Input::Slash(SlashCommand::Remove { target }),
None => {
display::print_error("usage: /remove <username>");
Input::Empty
}
},
"/leave" => Input::Slash(SlashCommand::Leave),
"/join" => Input::Slash(SlashCommand::Join), "/join" => Input::Slash(SlashCommand::Join),
"/members" => Input::Slash(SlashCommand::Members), "/members" => Input::Slash(SlashCommand::Members),
"/history" | "/hist" => { "/history" | "/hist" => {
@@ -112,6 +123,137 @@ fn parse_input(line: &str) -> Input {
} }
} }
// ── Auto-start server ────────────────────────────────────────────────────────
/// RAII guard that kills an auto-started server process on drop.
struct ServerGuard(Option<Child>);
impl Drop for ServerGuard {
fn drop(&mut self) {
if let Some(ref mut child) = self.0 {
let _ = child.kill();
let _ = child.wait();
}
}
}
/// Derive the TLS key path from the cert path (e.g. server-cert.der → server-key.der).
fn derive_key_path(cert_path: &Path) -> PathBuf {
let stem = cert_path
.file_name()
.and_then(|f| f.to_str())
.unwrap_or("server-cert.der");
let key_name = stem.replace("cert", "key");
let key_name = if key_name == stem {
// No "cert" in filename — just append "-key" before extension.
let p = PathBuf::from(stem);
let ext = p.extension().and_then(|e| e.to_str()).unwrap_or("der");
let base = p.file_stem().and_then(|s| s.to_str()).unwrap_or("server");
format!("{base}-key.{ext}")
} else {
key_name
};
cert_path.with_file_name(key_name)
}
/// Find the `qpq-server` binary: same directory as current exe, then PATH.
fn find_server_binary() -> Option<PathBuf> {
if let Ok(exe) = std::env::current_exe() {
let sibling = exe.with_file_name("qpq-server");
if sibling.exists() {
return Some(sibling);
}
}
// Fall back to PATH lookup.
std::env::var_os("PATH").and_then(|paths| {
std::env::split_paths(&paths)
.map(|dir| dir.join("qpq-server"))
.find(|p| p.exists())
})
}
/// Try to connect to the server. Returns Ok(()) if reachable, Err otherwise.
async fn probe_server(server: &str, ca_cert: &Path, server_name: &str) -> bool {
connect_node(server, ca_cert, server_name).await.is_ok()
}
/// Ensure a server is running. If not reachable, auto-start one.
/// Returns a guard that kills the child process on drop (if we started one).
async fn ensure_server(
server: &str,
ca_cert: &Path,
server_name: &str,
no_server: bool,
) -> anyhow::Result<ServerGuard> {
if no_server {
return Ok(ServerGuard(None));
}
// If the cert already exists, try connecting first.
if ca_cert.exists() && probe_server(server, ca_cert, server_name).await {
return Ok(ServerGuard(None));
}
// Server not reachable — try to auto-start.
let binary = match find_server_binary() {
Some(b) => b,
None => {
if ca_cert.exists() {
// Cert exists but connection failed and no binary found.
anyhow::bail!(
"server at {server} is not reachable and qpq-server binary not found; \
start a server manually or install qpq-server"
);
} else {
anyhow::bail!(
"no server running and qpq-server binary not found; \
start a server manually or install qpq-server"
);
}
}
};
let key_path = derive_key_path(ca_cert);
display::print_status(&format!("starting server on {server}..."));
let child = ProcessCommand::new(&binary)
.args([
"--allow-insecure-auth",
"--listen", server,
"--tls-cert", &ca_cert.to_string_lossy(),
"--tls-key", &key_path.to_string_lossy(),
])
.stdout(std::process::Stdio::null())
.stderr(std::process::Stdio::null())
.spawn()
.with_context(|| format!("failed to spawn {}", binary.display()))?;
let guard = ServerGuard(Some(child));
// Poll until the server is ready (cert may need to be generated first).
let mut delay = Duration::from_millis(100);
let max_wait = Duration::from_secs(5);
let start = std::time::Instant::now();
loop {
tokio::time::sleep(delay).await;
if ca_cert.exists() && probe_server(server, ca_cert, server_name).await {
display::print_status("server ready");
return Ok(guard);
}
if start.elapsed() > max_wait {
anyhow::bail!(
"auto-started qpq-server but it did not become ready within {max_wait:?}"
);
}
delay = (delay * 2).min(Duration::from_secs(1));
}
}
// ── REPL entry point ───────────────────────────────────────────────────────── // ── REPL entry point ─────────────────────────────────────────────────────────
pub async fn run_repl( pub async fn run_repl(
@@ -124,7 +266,11 @@ pub async fn run_repl(
opaque_password: Option<&str>, opaque_password: Option<&str>,
access_token: &str, access_token: &str,
device_id: Option<&str>, device_id: Option<&str>,
no_server: bool,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
// Phase 0: Ensure a server is running (auto-start if needed).
let _server_guard = ensure_server(server, ca_cert, server_name, no_server).await?;
// Phase 1: Resolve an access token (auto-register/login if needed). // Phase 1: Resolve an access token (auto-register/login if needed).
let resolved_token = resolve_access_token( let resolved_token = resolve_access_token(
state_path, server, ca_cert, server_name, password, state_path, server, ca_cert, server_name, password,
@@ -218,6 +364,8 @@ pub async fn run_repl(
let mut poll = interval(Duration::from_millis(1000)); let mut poll = interval(Duration::from_millis(1000));
poll.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); poll.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
let mut consecutive_errors: u32 = 0; let mut consecutive_errors: u32 = 0;
let mut backoff_ms: u64 = 1000;
const MAX_BACKOFF_MS: u64 = 60_000;
display::print_prompt(&session); display::print_prompt(&session);
@@ -242,22 +390,35 @@ pub async fn run_repl(
} }
} }
_ = poll.tick() => { _ = poll.tick() => {
// Drain offline outbox before polling for new messages.
drain_outbox(&mut session, &client).await;
match poll_messages(&mut session, &client).await { match poll_messages(&mut session, &client).await {
Ok(()) => { consecutive_errors = 0; } Ok(()) => {
consecutive_errors = 0;
backoff_ms = 1000;
}
Err(e) => { Err(e) => {
consecutive_errors += 1; consecutive_errors += 1;
tracing::warn!(error = format!("{e:#}"), n = consecutive_errors, "poll error"); tracing::warn!(error = format!("{e:#}"), n = consecutive_errors, "poll error");
if consecutive_errors >= 3 { if consecutive_errors >= 3 {
display::print_status("connection lost, reconnecting..."); display::print_status(&format!(
"connection lost, reconnecting in {:.0}s...",
backoff_ms as f64 / 1000.0
));
tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
match connect_node(server, ca_cert, server_name).await { match connect_node(server, ca_cert, server_name).await {
Ok(new_client) => { Ok(new_client) => {
client = new_client; client = new_client;
consecutive_errors = 0; consecutive_errors = 0;
backoff_ms = 1000;
display::print_status("reconnected"); display::print_status("reconnected");
display::print_prompt(&session); display::print_prompt(&session);
} }
Err(re) => { Err(re) => {
tracing::debug!(error = %re, "reconnect failed"); tracing::debug!(error = %re, "reconnect failed");
// Exponential backoff, capped.
backoff_ms = (backoff_ms * 2).min(MAX_BACKOFF_MS);
} }
} }
} }
@@ -409,6 +570,8 @@ async fn handle_slash(
SlashCommand::Dm { username } => cmd_dm(session, client, &username).await, SlashCommand::Dm { username } => cmd_dm(session, client, &username).await,
SlashCommand::CreateGroup { name } => cmd_create_group(session, &name), SlashCommand::CreateGroup { name } => cmd_create_group(session, &name),
SlashCommand::Invite { target } => cmd_invite(session, client, &target).await, SlashCommand::Invite { target } => cmd_invite(session, client, &target).await,
SlashCommand::Remove { target } => cmd_remove(session, client, &target).await,
SlashCommand::Leave => cmd_leave(session, client).await,
SlashCommand::Join => cmd_join(session, client).await, SlashCommand::Join => cmd_join(session, client).await,
SlashCommand::Members => cmd_members(session), SlashCommand::Members => cmd_members(session),
SlashCommand::History { count } => cmd_history(session, count), SlashCommand::History { count } => cmd_history(session, count),
@@ -423,6 +586,8 @@ fn print_help() {
display::print_status(" /dm <user[@domain]> - Start or switch to a DM (federation supported)"); display::print_status(" /dm <user[@domain]> - Start or switch to a DM (federation supported)");
display::print_status(" /create-group <name> - Create a new group"); display::print_status(" /create-group <name> - Create a new group");
display::print_status(" /invite <username> - Invite user to current group"); display::print_status(" /invite <username> - Invite user to current group");
display::print_status(" /remove <username> - Remove a member from the current group");
display::print_status(" /leave - Leave the current group");
display::print_status(" /join - Join a group from pending Welcome"); display::print_status(" /join - Join a group from pending Welcome");
display::print_status(" /switch <@user|#group> - Switch conversation"); display::print_status(" /switch <@user|#group> - Switch conversation");
display::print_status(" /list - List all conversations"); display::print_status(" /list - List all conversations");
@@ -541,6 +706,8 @@ async fn cmd_dm(
unread_count: 0, unread_count: 0,
last_activity_ms: now_ms(), last_activity_ms: now_ms(),
created_at_ms: now_ms(), created_at_ms: now_ms(),
is_hybrid: false,
last_seen_seq: 0,
}; };
let ks = DiskKeyStore::ephemeral(); let ks = DiskKeyStore::ephemeral();
let member = GroupMember::new_with_state(Arc::clone(&session.identity), ks, None, false); let member = GroupMember::new_with_state(Arc::clone(&session.identity), ks, None, false);
@@ -569,12 +736,16 @@ async fn cmd_dm(
let kp_bytes = fetch_key_package(client, &peer_key).await?; let kp_bytes = fetch_key_package(client, &peer_key).await?;
anyhow::ensure!(!kp_bytes.is_empty(), "peer has no key package uploaded"); anyhow::ensure!(!kp_bytes.is_empty(), "peer has no key package uploaded");
// Negotiate hybrid mode: use post-quantum MLS if both sides have hybrid keys.
let peer_hybrid_pk = fetch_hybrid_key(client, &peer_key).await?;
let use_hybrid = session.hybrid_kp.is_some() && peer_hybrid_pk.is_some();
// Create MLS group using channel_id as group_id. // Create MLS group using channel_id as group_id.
let ks_dir = session.state_path.with_extension("keystores"); let ks_dir = session.state_path.with_extension("keystores");
std::fs::create_dir_all(&ks_dir).ok(); std::fs::create_dir_all(&ks_dir).ok();
let ks_path = ks_dir.join(format!("{}.ks", conv_id.hex())); let ks_path = ks_dir.join(format!("{}.ks", conv_id.hex()));
let ks = DiskKeyStore::persistent(&ks_path)?; let ks = DiskKeyStore::persistent(&ks_path)?;
let mut member = GroupMember::new_with_state(Arc::clone(&session.identity), ks, None, false); let mut member = GroupMember::new_with_state(Arc::clone(&session.identity), ks, None, use_hybrid);
// Generate a key package for ourselves (needed for MLS) // Generate a key package for ourselves (needed for MLS)
let _my_kp = member.generate_key_package()?; let _my_kp = member.generate_key_package()?;
@@ -583,9 +754,12 @@ async fn cmd_dm(
let (commit, welcome) = member.add_member(&kp_bytes)?; let (commit, welcome) = member.add_member(&kp_bytes)?;
// Deliver welcome to peer and commit to peer. // Deliver welcome to peer and commit to peer.
let peer_hybrid_pk = fetch_hybrid_key(client, &peer_key).await?; // For hybrid MLS groups, the MLS layer already provides PQ protection,
// so we skip the outer hybrid envelope wrapping.
let wrap = |data: &[u8]| -> anyhow::Result<Vec<u8>> { let wrap = |data: &[u8]| -> anyhow::Result<Vec<u8>> {
if let Some(ref pk) = peer_hybrid_pk { if use_hybrid {
Ok(data.to_vec()) // MLS-level hybrid KEM is sufficient
} else if let Some(ref pk) = peer_hybrid_pk {
hybrid_encrypt(pk, data, b"", b"").context("hybrid encrypt") hybrid_encrypt(pk, data, b"", b"").context("hybrid encrypt")
} else { } else {
Ok(data.to_vec()) Ok(data.to_vec())
@@ -613,6 +787,8 @@ async fn cmd_dm(
unread_count: 0, unread_count: 0,
last_activity_ms: now_ms(), last_activity_ms: now_ms(),
created_at_ms: now_ms(), created_at_ms: now_ms(),
is_hybrid: member.is_hybrid(),
last_seen_seq: 0,
}; };
session.add_conversation(conv, member)?; session.add_conversation(conv, member)?;
@@ -655,6 +831,8 @@ fn cmd_create_group(session: &mut SessionState, name: &str) -> anyhow::Result<()
unread_count: 0, unread_count: 0,
last_activity_ms: now_ms(), last_activity_ms: now_ms(),
created_at_ms: now_ms(), created_at_ms: now_ms(),
is_hybrid: false,
last_seen_seq: 0,
}; };
session.add_conversation(conv, member)?; session.add_conversation(conv, member)?;
@@ -735,6 +913,87 @@ async fn cmd_invite(
Ok(()) Ok(())
} }
async fn cmd_remove(
session: &mut SessionState,
client: &node_service::Client,
target: &str,
) -> anyhow::Result<()> {
let conv_id = session
.active_conversation
.as_ref()
.context("no active conversation")?
.clone();
let my_key = session.identity_bytes();
// Resolve the target — username or hex key.
let target_key = if target.len() == 64 && target.chars().all(|c| c.is_ascii_hexdigit()) {
decode_identity_key(target)?
} else {
resolve_user(client, target)
.await?
.with_context(|| format!("user '{target}' not found"))?
};
let member = session
.get_member_mut(&conv_id)
.context("no group member for active conversation")?;
let commit = member.remove_member(&target_key)?;
// Fan-out commit to all remaining members (excluding self).
let remaining: Vec<Vec<u8>> = member
.member_identities()
.into_iter()
.filter(|id| id.as_slice() != my_key.as_slice())
.collect();
for rk in &remaining {
enqueue(client, rk, &commit).await?;
}
session.save_member(&conv_id)?;
display::print_status(&format!("removed {target} from the group"));
Ok(())
}
async fn cmd_leave(
session: &mut SessionState,
client: &node_service::Client,
) -> anyhow::Result<()> {
let conv_id = session
.active_conversation
.as_ref()
.context("no active conversation")?
.clone();
let my_key = session.identity_bytes();
let member = session
.get_member_mut(&conv_id)
.context("no group member for active conversation")?;
let proposal = member.leave_group()?;
// Send leave proposal to all other members.
let others: Vec<Vec<u8>> = member
.member_identities()
.into_iter()
.filter(|id| id.as_slice() != my_key.as_slice())
.collect();
for rk in &others {
enqueue(client, rk, &proposal).await?;
}
// Locally deactivate the conversation (keep history).
session.members.remove(&conv_id);
session.active_conversation = None;
display::print_status("left the conversation");
Ok(())
}
async fn cmd_join( async fn cmd_join(
session: &mut SessionState, session: &mut SessionState,
client: &node_service::Client, client: &node_service::Client,
@@ -814,6 +1073,8 @@ async fn cmd_join(
unread_count: 0, unread_count: 0,
last_activity_ms: now_ms(), last_activity_ms: now_ms(),
created_at_ms: now_ms(), created_at_ms: now_ms(),
is_hybrid: new_member.is_hybrid(),
last_seen_seq: 0,
}; };
session.add_conversation(conv, new_member)?; session.add_conversation(conv, new_member)?;
@@ -971,6 +1232,28 @@ async fn do_send(
Ok(()) Ok(())
} }
// ── Outbox drain ─────────────────────────────────────────────────────────────
async fn drain_outbox(
session: &mut SessionState,
client: &node_service::Client,
) {
let entries = match session.conv_store.load_pending_outbox() {
Ok(e) => e,
Err(_) => return,
};
for entry in entries {
match enqueue(client, &entry.recipient_key, &entry.payload).await {
Ok(_) => {
let _ = session.conv_store.mark_outbox_sent(entry.id);
}
Err(_) => {
let _ = session.conv_store.mark_outbox_failed(entry.id, entry.retry_count + 1);
}
}
}
}
// ── Polling ────────────────────────────────────────────────────────────────── // ── Polling ──────────────────────────────────────────────────────────────────
async fn poll_messages( async fn poll_messages(
@@ -994,6 +1277,7 @@ async fn poll_messages(
// Try each conversation's GroupMember. // Try each conversation's GroupMember.
let conv_ids: Vec<ConversationId> = session.members.keys().cloned().collect(); let conv_ids: Vec<ConversationId> = session.members.keys().cloned().collect();
let my_key = session.identity_bytes();
let mut handled = false; let mut handled = false;
for conv_id in &conv_ids { for conv_id in &conv_ids {
@@ -1003,7 +1287,7 @@ async fn poll_messages(
}; };
match member.receive_message(&mls_payload) { match member.receive_message(&mls_payload) {
Ok(Some(plaintext)) => { Ok(ReceivedMessage::Application(plaintext)) => {
// Metadata protection: try unpad → unseal → parse. // Metadata protection: try unpad → unseal → parse.
// Falls back gracefully for messages from older clients. // Falls back gracefully for messages from older clients.
let (sender_key, app_bytes) = { let (sender_key, app_bytes) = {
@@ -1015,10 +1299,10 @@ async fn poll_messages(
if quicproquo_core::sealed_sender::is_sealed(&after_unpad) { if quicproquo_core::sealed_sender::is_sealed(&after_unpad) {
match quicproquo_core::sealed_sender::unseal(&after_unpad) { match quicproquo_core::sealed_sender::unseal(&after_unpad) {
Ok((sk, inner)) => (sk.to_vec(), inner), Ok((sk, inner)) => (sk.to_vec(), inner),
Err(_) => (session.identity_bytes(), after_unpad), Err(_) => (my_key.clone(), after_unpad),
} }
} else { } else {
(session.identity_bytes(), after_unpad) (my_key.clone(), after_unpad)
} }
}; };
@@ -1087,8 +1371,28 @@ async fn poll_messages(
handled = true; handled = true;
break; break;
} }
Ok(None) => { Ok(ReceivedMessage::StateChanged) => {
// Processed a non-application message (commit, etc.) // Processed a non-application message (commit, proposal, etc.).
// Auto-commit any pending proposals (e.g. leave requests).
if member.has_pending_proposals() {
if let Ok((commit, _welcome)) = member.commit_pending_proposals() {
let remaining: Vec<Vec<u8>> = member
.member_identities()
.into_iter()
.filter(|id| id.as_slice() != my_key.as_slice())
.collect();
for rk in &remaining {
let _ = enqueue(client, rk, &commit).await;
}
}
}
any_changed = true;
handled = true;
break;
}
Ok(ReceivedMessage::SelfRemoved) => {
display::print_status("you were removed from this conversation");
session.members.remove(conv_id);
any_changed = true; any_changed = true;
handled = true; handled = true;
break; break;
@@ -1199,6 +1503,8 @@ async fn try_auto_join(
unread_count: 0, unread_count: 0,
last_activity_ms: now_ms(), last_activity_ms: now_ms(),
created_at_ms: now_ms(), created_at_ms: now_ms(),
is_hybrid: member.is_hybrid(),
last_seen_seq: 0,
}; };
if let Err(e) = session.add_conversation(conv, member) { if let Err(e) = session.add_conversation(conv, member) {

View File

@@ -70,6 +70,10 @@ struct Args {
#[arg(long, env = "QPQ_PASSWORD")] #[arg(long, env = "QPQ_PASSWORD")]
password: Option<String>, password: Option<String>,
/// Do not auto-start a local qpq-server (useful when connecting to a remote server).
#[arg(long, env = "QPQ_NO_SERVER")]
no_server: bool,
#[command(subcommand)] #[command(subcommand)]
command: Option<Command>, command: Option<Command>,
} }
@@ -301,6 +305,9 @@ enum Command {
/// OPAQUE password (prompted securely if --username is set but --password is not). /// OPAQUE password (prompted securely if --username is set but --password is not).
#[arg(long, env = "QPQ_PASSWORD")] #[arg(long, env = "QPQ_PASSWORD")]
password: Option<String>, password: Option<String>,
/// Do not auto-start a local qpq-server.
#[arg(long, env = "QPQ_NO_SERVER")]
no_server: bool,
}, },
/// Interactive 1:1 chat: type to send, incoming messages printed as [peer] <msg>. Ctrl+D to exit. /// Interactive 1:1 chat: type to send, incoming messages printed as [peer] <msg>. Ctrl+D to exit.
@@ -350,11 +357,13 @@ async fn main() -> anyhow::Result<()> {
let state_pw = args.state_password.as_deref(); let state_pw = args.state_password.as_deref();
// Default to REPL when no subcommand is given. // Default to REPL when no subcommand is given.
let no_server = args.no_server;
let command = args.command.unwrap_or_else(|| Command::Repl { let command = args.command.unwrap_or_else(|| Command::Repl {
state: args.state, state: args.state,
server: args.server, server: args.server,
username: args.username, username: args.username,
password: args.password, password: args.password,
no_server,
}); });
match command { match command {
@@ -553,6 +562,7 @@ async fn main() -> anyhow::Result<()> {
server, server,
username, username,
password, password,
no_server,
} => { } => {
let local = tokio::task::LocalSet::new(); let local = tokio::task::LocalSet::new();
local local
@@ -566,6 +576,7 @@ async fn main() -> anyhow::Result<()> {
password.as_deref(), password.as_deref(),
&args.access_token, &args.access_token,
args.device_id.as_deref(), args.device_id.as_deref(),
no_server,
)) ))
.await .await
} }