From 553de3a2b75132b555b70a044297f71b632cf619 Mon Sep 17 00:00:00 2001 From: Christian Nennemann Date: Thu, 26 Feb 2026 22:45:34 +0100 Subject: [PATCH] feat: interactive REPL with auto-setup, auto-join, encrypted local storage MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit REPL auto-setup (zero-friction startup): - OnceLock → RwLock for CLIENT_AUTH to allow delayed init after OPAQUE login - Extract opaque_register/opaque_login helpers from one-shot commands - Token cache (.session file) with QPCE encryption when password provided - Add --username/--password/--state-password to repl subcommand - resolve_access_token: auto-register + login, cache token, prompt interactively - rpassword for secure password input (no echo) Interactive REPL (multi-conversation): - SessionState: identity, hybrid key, ConversationStore, per-conversation GroupMembers - ConversationStore: SQLite-backed conversations + messages with full CRUD - Slash commands: /dm, /group, /invite, /join, /switch, /list, /members, /history, /whoami - Background polling (1s interval) with auto-join from MLS Welcome messages - pending_member pattern: persistent keystore for HPKE init key, replenish after join - Self-DM handled as local-only notepad (no MLS/server channel) - ANSI display module for colored prompts, incoming messages, status/error output Username resolution: - resolveIdentity RPC (@20 in node.capnp): look up username by identity key - Server: resolve_identity_key in Store trait, FileBackedStore, SqlStore - Client: resolve_identity in rpc.rs, used in auto-join for peer display names - resolveUser: bidirectional lookup (username → identity key) Encrypted local storage (nothing in cleartext): - ConversationStore uses SQLCipher when --state-password is provided - Argon2id key derivation with per-database random salt (.convdb-salt, mode 0600) - Transparent migration of existing unencrypted databases via sqlcipher_export - Token cache encrypted with QPCE format (Argon2id + ChaCha20Poly1305) Server changes: - resolveIdentity + resolveUser RPC handlers with auth + validation - Auth: sealed-sender identity binding on enqueue, channel member authorization - Delivery: hybrid decrypt attempts, identity key validation on enqueue - Config: --allow-sealed-sender flag for anonymous delivery mode - zeroize added to server dependencies Co-Authored-By: Claude Opus 4.6 --- Cargo.lock | 13 + crates/quicnprotochat-client/Cargo.toml | 6 + .../src/client/commands.rs | 123 ++ .../src/client/conversation.rs | 563 ++++++++ .../src/client/display.rs | 69 + .../quicnprotochat-client/src/client/mod.rs | 5 + .../quicnprotochat-client/src/client/repl.rs | 1214 +++++++++++++++++ .../quicnprotochat-client/src/client/rpc.rs | 103 +- .../src/client/session.rs | 260 ++++ .../src/client/token_cache.rs | 86 ++ crates/quicnprotochat-client/src/lib.rs | 22 +- crates/quicnprotochat-client/src/main.rs | 56 +- crates/quicnprotochat-server/Cargo.toml | 1 + crates/quicnprotochat-server/src/auth.rs | 44 +- crates/quicnprotochat-server/src/config.rs | 9 + crates/quicnprotochat-server/src/main.rs | 22 +- .../src/node_service/delivery.rs | 65 +- .../src/node_service/key_ops.rs | 16 +- .../src/node_service/mod.rs | 21 + .../src/node_service/user_ops.rs | 94 ++ crates/quicnprotochat-server/src/sql_store.rs | 11 + crates/quicnprotochat-server/src/storage.rs | 13 + schemas/node.capnp | 8 + 23 files changed, 2791 insertions(+), 33 deletions(-) create mode 100644 crates/quicnprotochat-client/src/client/conversation.rs create mode 100644 crates/quicnprotochat-client/src/client/display.rs create mode 100644 crates/quicnprotochat-client/src/client/repl.rs create mode 100644 crates/quicnprotochat-client/src/client/session.rs create mode 100644 crates/quicnprotochat-client/src/client/token_cache.rs create mode 100644 crates/quicnprotochat-server/src/node_service/user_ops.rs diff --git a/Cargo.lock b/Cargo.lock index 2cf5def..1658b1f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3834,6 +3834,8 @@ dependencies = [ "quinn", "quinn-proto", "rand 0.8.5", + "rpassword", + "rusqlite", "rustls", "serde", "serde_json", @@ -3928,6 +3930,7 @@ dependencies = [ "toml 0.8.23", "tracing", "tracing-subscriber", + "zeroize", ] [[package]] @@ -4287,6 +4290,16 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "rpassword" +version = "5.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffc936cf8a7ea60c58f030fd36a612a48f440610214dc54bc36431f9ea0c3efb" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "rusqlite" version = "0.31.0" diff --git a/crates/quicnprotochat-client/Cargo.toml b/crates/quicnprotochat-client/Cargo.toml index 5d52709..8656e9e 100644 --- a/crates/quicnprotochat-client/Cargo.toml +++ b/crates/quicnprotochat-client/Cargo.toml @@ -50,9 +50,15 @@ tracing-subscriber = { workspace = true } # CLI clap = { workspace = true } +# Local message/conversation storage +rusqlite = { workspace = true } + # Hex encoding/decoding hex = "0.4" +# Secure password prompting (no echo) +rpassword = "5" + [dev-dependencies] dashmap = { workspace = true } assert_cmd = "2" diff --git a/crates/quicnprotochat-client/src/client/commands.rs b/crates/quicnprotochat-client/src/client/commands.rs index fdcbf9a..417318c 100644 --- a/crates/quicnprotochat-client/src/client/commands.rs +++ b/crates/quicnprotochat-client/src/client/commands.rs @@ -310,6 +310,129 @@ fn derive_identity_for_login( )) } +// ── OPAQUE helpers (used by both one-shot commands and REPL bootstrap) ─────── + +/// Perform OPAQUE registration. Returns Ok(()) on success. +/// The error message contains "E018" if the user already exists. +/// Does NOT require init_auth() — OPAQUE RPCs are unauthenticated. +pub(crate) async fn opaque_register( + client: &quicnprotochat_proto::node_capnp::node_service::Client, + username: &str, + password: &str, + identity_key: Option<&[u8]>, +) -> anyhow::Result<()> { + let mut rng = rand::rngs::OsRng; + + let reg_start = ClientRegistration::::start(&mut rng, password.as_bytes()) + .map_err(|e| anyhow::anyhow!("OPAQUE register start: {e}"))?; + + let mut req = client.opaque_register_start_request(); + { + let mut p = req.get(); + p.set_username(username); + p.set_request(®_start.message.serialize()); + } + let resp = req.send().promise.await.context("opaque_register_start RPC failed")?; + let response_bytes = resp + .get() + .context("register_start: bad response")? + .get_response() + .context("register_start: missing response")? + .to_vec(); + + let reg_response = RegistrationResponse::::deserialize(&response_bytes) + .map_err(|e| anyhow::anyhow!("invalid registration response: {e}"))?; + + let reg_finish = reg_start + .state + .finish( + &mut rng, + password.as_bytes(), + reg_response, + ClientRegistrationFinishParameters::::default(), + ) + .map_err(|e| anyhow::anyhow!("OPAQUE register finish: {e}"))?; + + let mut req = client.opaque_register_finish_request(); + { + let mut p = req.get(); + p.set_username(username); + p.set_upload(®_finish.message.serialize()); + if let Some(ik) = identity_key { + p.set_identity_key(ik); + } else { + p.set_identity_key(&[]); + } + } + let resp = req.send().promise.await.context("opaque_register_finish RPC failed")?; + let success = resp + .get() + .context("register_finish: bad response")? + .get_success(); + + anyhow::ensure!(success, "server rejected registration"); + Ok(()) +} + +/// Perform OPAQUE login and return the raw session token bytes. +/// Does NOT require init_auth() — OPAQUE RPCs are unauthenticated. +pub(crate) async fn opaque_login( + client: &quicnprotochat_proto::node_capnp::node_service::Client, + username: &str, + password: &str, + identity_key: &[u8], +) -> anyhow::Result> { + let mut rng = rand::rngs::OsRng; + + let login_start = ClientLogin::::start(&mut rng, password.as_bytes()) + .map_err(|e| anyhow::anyhow!("OPAQUE login start: {e}"))?; + + let mut req = client.opaque_login_start_request(); + { + let mut p = req.get(); + p.set_username(username); + p.set_request(&login_start.message.serialize()); + } + let resp = req.send().promise.await.context("opaque_login_start RPC failed")?; + let response_bytes = resp + .get() + .context("login_start: bad response")? + .get_response() + .context("login_start: missing response")? + .to_vec(); + + let credential_response = CredentialResponse::::deserialize(&response_bytes) + .map_err(|e| anyhow::anyhow!("invalid credential response: {e}"))?; + + let login_finish = login_start + .state + .finish( + &mut rng, + password.as_bytes(), + credential_response, + ClientLoginFinishParameters::::default(), + ) + .map_err(|e| anyhow::anyhow!("OPAQUE login finish (bad password?): {e}"))?; + + let mut req = client.opaque_login_finish_request(); + { + let mut p = req.get(); + p.set_username(username); + p.set_finalization(&login_finish.message.serialize()); + p.set_identity_key(identity_key); + } + let resp = req.send().promise.await.context("opaque_login_finish RPC failed")?; + let session_token = resp + .get() + .context("login_finish: bad response")? + .get_session_token() + .context("login_finish: missing session_token")? + .to_vec(); + + anyhow::ensure!(!session_token.is_empty(), "server returned empty session token"); + Ok(session_token) +} + /// Generate a KeyPackage for a fresh identity and upload it to the AS. pub async fn cmd_register(server: &str, ca_cert: &Path, server_name: &str) -> anyhow::Result<()> { let identity = IdentityKeypair::generate(); diff --git a/crates/quicnprotochat-client/src/client/conversation.rs b/crates/quicnprotochat-client/src/client/conversation.rs new file mode 100644 index 0000000..94915fa --- /dev/null +++ b/crates/quicnprotochat-client/src/client/conversation.rs @@ -0,0 +1,563 @@ +//! Multi-conversation state backed by SQLite (SQLCipher-encrypted when a +//! password is provided). +//! +//! Each conversation (DM or group) has its own MLS group blob, keystore blob, +//! member list, and message history. + +use std::path::{Path, PathBuf}; +use std::time::{SystemTime, UNIX_EPOCH}; + +use anyhow::Context; +use argon2::{Algorithm, Argon2, Params, Version}; +use rand::RngCore; +use rusqlite::{params, Connection, OptionalExtension}; +use zeroize::Zeroizing; + +// ── Types ──────────────────────────────────────────────────────────────────── + +/// 16-byte conversation identifier. +/// - DMs: the channel_id returned by `createChannel` (server-assigned UUID). +/// - Groups: SHA-256(group_name)[..16]. +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub struct ConversationId(pub [u8; 16]); + +impl ConversationId { + pub fn from_slice(s: &[u8]) -> Option { + if s.len() == 16 { + let mut buf = [0u8; 16]; + buf.copy_from_slice(s); + Some(Self(buf)) + } else { + None + } + } + + /// Derive a conversation ID from a group name via SHA-256 truncation. + pub fn from_group_name(name: &str) -> Self { + use sha2::{Sha256, Digest}; + let hash = Sha256::digest(name.as_bytes()); + let mut buf = [0u8; 16]; + buf.copy_from_slice(&hash[..16]); + Self(buf) + } + + pub fn hex(&self) -> String { + hex::encode(self.0) + } +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum ConversationKind { + /// 1:1 DM channel with a specific peer. + Dm { + peer_key: Vec, + peer_username: Option, + }, + /// Named group with N members. + Group { name: String }, +} + +#[derive(Clone, Debug)] +pub struct Conversation { + pub id: ConversationId, + pub kind: ConversationKind, + pub display_name: String, + /// Serialized MLS group (bincode). + pub mls_group_blob: Option>, + /// Serialized keystore (bincode HashMap). + pub keystore_blob: Option>, + /// Member identity keys (bincode Vec>). + pub member_keys: Vec>, + pub unread_count: u32, + pub last_activity_ms: u64, + pub created_at_ms: u64, +} + +#[derive(Clone, Debug)] +pub struct StoredMessage { + pub conversation_id: ConversationId, + pub message_id: Option<[u8; 16]>, + pub sender_key: Vec, + pub sender_name: Option, + pub body: String, + pub msg_type: String, + pub ref_msg_id: Option<[u8; 16]>, + pub timestamp_ms: u64, + pub is_outgoing: bool, +} + +// ── Key derivation (Argon2id, matching state.rs parameters) ───────────────── + +const ARGON2_M_COST: u32 = 19 * 1024; +const ARGON2_T_COST: u32 = 2; +const ARGON2_P_COST: u32 = 1; +const SALT_LEN: usize = 16; + +/// Derive a 32-byte SQLCipher key from the user password and a random salt. +fn derive_convdb_key(password: &str, salt: &[u8]) -> anyhow::Result> { + let params = Params::new(ARGON2_M_COST, ARGON2_T_COST, ARGON2_P_COST, Some(32)) + .map_err(|e| anyhow::anyhow!("argon2 params: {e}"))?; + let argon2 = Argon2::new(Algorithm::Argon2id, Version::default(), params); + let mut key = Zeroizing::new([0u8; 32]); + argon2 + .hash_password_into(password.as_bytes(), salt, &mut *key) + .map_err(|e| anyhow::anyhow!("convdb key derivation: {e}"))?; + Ok(key) +} + +/// Read or create a 16-byte random salt at `salt_path` (mode 0o600). +fn get_or_create_salt(salt_path: &Path) -> anyhow::Result> { + if salt_path.exists() { + let bytes = std::fs::read(salt_path).context("read convdb salt")?; + anyhow::ensure!(bytes.len() == SALT_LEN, "invalid convdb salt length"); + return Ok(bytes); + } + let mut salt = vec![0u8; SALT_LEN]; + rand::rngs::OsRng.fill_bytes(&mut salt); + std::fs::write(salt_path, &salt).context("write convdb salt")?; + #[cfg(unix)] + { + use std::os::unix::fs::PermissionsExt; + std::fs::set_permissions(salt_path, std::fs::Permissions::from_mode(0o600)).ok(); + } + Ok(salt) +} + +// ── ConversationStore ──────────────────────────────────────────────────────── + +pub struct ConversationStore { + conn: Connection, +} + +impl ConversationStore { + /// Open or create the conversation database at `db_path`. + /// If `password` is `Some`, the database is encrypted with SQLCipher using + /// an Argon2id-derived key. Existing unencrypted databases are migrated + /// transparently. + pub fn open(db_path: &Path, password: Option<&str>) -> anyhow::Result { + if let Some(parent) = db_path.parent() { + std::fs::create_dir_all(parent).ok(); + } + + match password { + Some(pw) => Self::open_encrypted(db_path, pw), + None => Self::open_plain(db_path), + } + } + + fn open_plain(db_path: &Path) -> anyhow::Result { + let conn = Connection::open(db_path).context("open conversation db")?; + conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA foreign_keys=ON;") + .context("set pragmas")?; + Self::migrate(&conn)?; + Ok(Self { conn }) + } + + fn open_encrypted(db_path: &Path, password: &str) -> anyhow::Result { + let salt_path = PathBuf::from(format!("{}-salt", db_path.display())); + let already_encrypted = salt_path.exists(); + + // Migrate an existing unencrypted database before opening with encryption. + if db_path.exists() && !already_encrypted { + Self::migrate_plain_to_encrypted(db_path, &salt_path, password)?; + // After migration, salt file exists and DB is encrypted — fall through. + } + + let salt = get_or_create_salt(&salt_path)?; + let key = derive_convdb_key(password, &salt)?; + let hex_key = hex::encode(&*key); + + let conn = Connection::open(db_path).context("open conversation db")?; + conn.pragma_update(None, "key", format!("x'{hex_key}'")) + .context("set SQLCipher key")?; + conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA foreign_keys=ON;") + .context("set pragmas")?; + Self::migrate(&conn)?; + Ok(Self { conn }) + } + + /// Migrate an unencrypted `.convdb` to an encrypted one in-place. + fn migrate_plain_to_encrypted( + db_path: &Path, + salt_path: &Path, + password: &str, + ) -> anyhow::Result<()> { + let salt = get_or_create_salt(salt_path)?; + let key = derive_convdb_key(password, &salt)?; + let hex_key = hex::encode(&*key); + + let enc_path = db_path.with_extension("convdb-enc"); + + // Open the existing plaintext database. + let plain = Connection::open(db_path).context("open plain convdb for migration")?; + plain.execute_batch("PRAGMA journal_mode=WAL; PRAGMA foreign_keys=ON;").ok(); + + // Attach a new encrypted database and export into it. + plain + .execute_batch(&format!( + "ATTACH DATABASE '{}' AS encrypted KEY \"x'{hex_key}'\";", + enc_path.display() + )) + .context("attach encrypted db for migration")?; + plain + .execute_batch("SELECT sqlcipher_export('encrypted');") + .context("sqlcipher_export to encrypted db")?; + plain + .execute_batch("DETACH DATABASE encrypted;") + .context("detach encrypted db")?; + + drop(plain); + + // Swap files: encrypted → original. + std::fs::rename(&enc_path, db_path).context("replace convdb with encrypted version")?; + // Clean up WAL/SHM left from the plaintext open. + let wal = PathBuf::from(format!("{}-wal", db_path.display())); + let shm = PathBuf::from(format!("{}-shm", db_path.display())); + std::fs::remove_file(&wal).ok(); + std::fs::remove_file(&shm).ok(); + + tracing::info!("migrated conversation database to encrypted storage"); + Ok(()) + } + + fn migrate(conn: &Connection) -> anyhow::Result<()> { + conn.execute_batch( + "CREATE TABLE IF NOT EXISTS conversations ( + id BLOB PRIMARY KEY, + kind TEXT NOT NULL, + display_name TEXT NOT NULL, + peer_key BLOB, + peer_username TEXT, + group_name TEXT, + mls_group_blob BLOB, + keystore_blob BLOB, + member_keys BLOB, + unread_count INTEGER NOT NULL DEFAULT 0, + last_activity_ms INTEGER NOT NULL DEFAULT 0, + created_at_ms INTEGER NOT NULL DEFAULT 0 + ); + + CREATE TABLE IF NOT EXISTS messages ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + conversation_id BLOB NOT NULL REFERENCES conversations(id), + message_id BLOB, + sender_key BLOB NOT NULL, + sender_name TEXT, + body TEXT NOT NULL, + msg_type TEXT NOT NULL, + ref_msg_id BLOB, + timestamp_ms INTEGER NOT NULL, + is_outgoing INTEGER NOT NULL DEFAULT 0 + ); + + CREATE INDEX IF NOT EXISTS idx_messages_conv + ON messages(conversation_id, timestamp_ms);", + ) + .context("migrate conversation db")?; + Ok(()) + } + + // ── Conversation CRUD ──────────────────────────────────────────────── + + pub fn save_conversation(&self, conv: &Conversation) -> anyhow::Result<()> { + let (kind_str, peer_key, peer_username, group_name) = match &conv.kind { + ConversationKind::Dm { + peer_key, + peer_username, + } => ("dm", Some(peer_key.as_slice()), peer_username.as_deref(), None), + ConversationKind::Group { name } => ("group", None, None, Some(name.as_str())), + }; + let member_keys_blob = bincode::serialize(&conv.member_keys) + .context("serialize member_keys")?; + + self.conn.execute( + "INSERT INTO conversations + (id, kind, display_name, peer_key, peer_username, group_name, + mls_group_blob, keystore_blob, member_keys, unread_count, + last_activity_ms, created_at_ms) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12) + ON CONFLICT(id) DO UPDATE SET + display_name = excluded.display_name, + mls_group_blob = excluded.mls_group_blob, + keystore_blob = excluded.keystore_blob, + member_keys = excluded.member_keys, + unread_count = excluded.unread_count, + last_activity_ms = excluded.last_activity_ms", + params![ + conv.id.0.as_slice(), + kind_str, + conv.display_name, + peer_key, + peer_username, + group_name, + conv.mls_group_blob, + conv.keystore_blob, + member_keys_blob, + conv.unread_count, + conv.last_activity_ms, + conv.created_at_ms, + ], + )?; + Ok(()) + } + + pub fn load_conversation(&self, id: &ConversationId) -> anyhow::Result> { + self.conn + .query_row( + "SELECT kind, display_name, peer_key, peer_username, group_name, + mls_group_blob, keystore_blob, member_keys, unread_count, + last_activity_ms, created_at_ms + FROM conversations WHERE id = ?1", + params![id.0.as_slice()], + |row| { + let kind_str: String = row.get(0)?; + let display_name: String = row.get(1)?; + let peer_key: Option> = row.get(2)?; + let peer_username: Option = row.get(3)?; + let group_name: Option = row.get(4)?; + let mls_group_blob: Option> = row.get(5)?; + let keystore_blob: Option> = row.get(6)?; + let member_keys_blob: Option> = row.get(7)?; + let unread_count: u32 = row.get(8)?; + let last_activity_ms: u64 = row.get(9)?; + let created_at_ms: u64 = row.get(10)?; + + let kind = if kind_str == "dm" { + ConversationKind::Dm { + peer_key: peer_key.unwrap_or_default(), + peer_username, + } + } else { + ConversationKind::Group { + name: group_name.unwrap_or_default(), + } + }; + + let member_keys: Vec> = member_keys_blob + .and_then(|b| bincode::deserialize(&b).ok()) + .unwrap_or_default(); + + Ok(Conversation { + id: id.clone(), + kind, + display_name, + mls_group_blob, + keystore_blob, + member_keys, + unread_count, + last_activity_ms, + created_at_ms, + }) + }, + ) + .optional() + .context("load conversation") + } + + pub fn list_conversations(&self) -> anyhow::Result> { + let mut stmt = self.conn.prepare( + "SELECT id, kind, display_name, peer_key, peer_username, group_name, + mls_group_blob, keystore_blob, member_keys, unread_count, + last_activity_ms, created_at_ms + FROM conversations ORDER BY last_activity_ms DESC", + )?; + let rows = stmt.query_map([], |row| { + let id_blob: Vec = row.get(0)?; + let kind_str: String = row.get(1)?; + let display_name: String = row.get(2)?; + let peer_key: Option> = row.get(3)?; + let peer_username: Option = row.get(4)?; + let group_name: Option = row.get(5)?; + let mls_group_blob: Option> = row.get(6)?; + let keystore_blob: Option> = row.get(7)?; + let member_keys_blob: Option> = row.get(8)?; + let unread_count: u32 = row.get(9)?; + let last_activity_ms: u64 = row.get(10)?; + let created_at_ms: u64 = row.get(11)?; + + let id = ConversationId::from_slice(&id_blob).unwrap_or(ConversationId([0; 16])); + let kind = if kind_str == "dm" { + ConversationKind::Dm { + peer_key: peer_key.unwrap_or_default(), + peer_username, + } + } else { + ConversationKind::Group { + name: group_name.unwrap_or_default(), + } + }; + let member_keys: Vec> = member_keys_blob + .and_then(|b| bincode::deserialize(&b).ok()) + .unwrap_or_default(); + + Ok(Conversation { + id, + kind, + display_name, + mls_group_blob, + keystore_blob, + member_keys, + unread_count, + last_activity_ms, + created_at_ms, + }) + })?; + + let mut convs = Vec::new(); + for row in rows { + convs.push(row?); + } + Ok(convs) + } + + /// Find a DM conversation by the peer's identity key. + pub fn find_dm_by_peer(&self, peer_key: &[u8]) -> anyhow::Result> { + let id_blob: Option> = self + .conn + .query_row( + "SELECT id FROM conversations WHERE kind = 'dm' AND peer_key = ?1", + params![peer_key], + |row| row.get(0), + ) + .optional()?; + + match id_blob { + Some(blob) => { + let id = ConversationId::from_slice(&blob) + .context("invalid conversation id in db")?; + self.load_conversation(&id) + } + None => Ok(None), + } + } + + /// Find a group conversation by name. + pub fn find_group_by_name(&self, name: &str) -> anyhow::Result> { + let id_blob: Option> = self + .conn + .query_row( + "SELECT id FROM conversations WHERE kind = 'group' AND group_name = ?1", + params![name], + |row| row.get(0), + ) + .optional()?; + + match id_blob { + Some(blob) => { + let id = ConversationId::from_slice(&blob) + .context("invalid conversation id in db")?; + self.load_conversation(&id) + } + None => Ok(None), + } + } + + pub fn increment_unread(&self, id: &ConversationId) -> anyhow::Result<()> { + self.conn.execute( + "UPDATE conversations SET unread_count = unread_count + 1 WHERE id = ?1", + params![id.0.as_slice()], + )?; + Ok(()) + } + + pub fn reset_unread(&self, id: &ConversationId) -> anyhow::Result<()> { + self.conn.execute( + "UPDATE conversations SET unread_count = 0 WHERE id = ?1", + params![id.0.as_slice()], + )?; + Ok(()) + } + + pub fn update_activity(&self, id: &ConversationId, ts_ms: u64) -> anyhow::Result<()> { + self.conn.execute( + "UPDATE conversations SET last_activity_ms = ?2 WHERE id = ?1 AND last_activity_ms < ?2", + params![id.0.as_slice(), ts_ms], + )?; + Ok(()) + } + + // ── Message CRUD ───────────────────────────────────────────────────── + + pub fn save_message(&self, msg: &StoredMessage) -> anyhow::Result<()> { + self.conn.execute( + "INSERT INTO messages + (conversation_id, message_id, sender_key, sender_name, body, + msg_type, ref_msg_id, timestamp_ms, is_outgoing) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)", + params![ + msg.conversation_id.0.as_slice(), + msg.message_id.as_ref().map(|id| id.as_slice()), + msg.sender_key, + msg.sender_name, + msg.body, + msg.msg_type, + msg.ref_msg_id.as_ref().map(|id| id.as_slice()), + msg.timestamp_ms, + msg.is_outgoing as i32, + ], + )?; + Ok(()) + } + + pub fn load_recent_messages( + &self, + conv_id: &ConversationId, + limit: usize, + ) -> anyhow::Result> { + let mut stmt = self.conn.prepare( + "SELECT message_id, sender_key, sender_name, body, msg_type, + ref_msg_id, timestamp_ms, is_outgoing + FROM messages + WHERE conversation_id = ?1 + ORDER BY timestamp_ms DESC + LIMIT ?2", + )?; + let rows = stmt.query_map(params![conv_id.0.as_slice(), limit as u32], |row| { + let message_id: Option> = row.get(0)?; + let sender_key: Vec = row.get(1)?; + let sender_name: Option = row.get(2)?; + let body: String = row.get(3)?; + let msg_type: String = row.get(4)?; + let ref_msg_id: Option> = row.get(5)?; + let timestamp_ms: u64 = row.get(6)?; + let is_outgoing: i32 = row.get(7)?; + + fn to_16(v: &[u8]) -> Option<[u8; 16]> { + if v.len() == 16 { + let mut buf = [0u8; 16]; + buf.copy_from_slice(v); + Some(buf) + } else { + None + } + } + + Ok(StoredMessage { + conversation_id: conv_id.clone(), + message_id: message_id.as_deref().and_then(to_16), + sender_key, + sender_name, + body, + msg_type, + ref_msg_id: ref_msg_id.as_deref().and_then(to_16), + timestamp_ms, + is_outgoing: is_outgoing != 0, + }) + })?; + + let mut msgs = Vec::new(); + for row in rows { + msgs.push(row?); + } + // Reverse so oldest first + msgs.reverse(); + Ok(msgs) + } +} + +pub fn now_ms() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as u64 +} diff --git a/crates/quicnprotochat-client/src/client/display.rs b/crates/quicnprotochat-client/src/client/display.rs new file mode 100644 index 0000000..e72c081 --- /dev/null +++ b/crates/quicnprotochat-client/src/client/display.rs @@ -0,0 +1,69 @@ +//! Terminal display helpers for the REPL. + +use super::conversation::StoredMessage; +use super::session::SessionState; + +// ANSI color codes +const RESET: &str = "\x1b[0m"; +const BOLD: &str = "\x1b[1m"; +const DIM: &str = "\x1b[2m"; +const GREEN: &str = "\x1b[32m"; +const CYAN: &str = "\x1b[36m"; +const YELLOW: &str = "\x1b[33m"; + +/// Print the REPL prompt showing the active conversation and unread count. +pub fn print_prompt(session: &SessionState) { + use std::io::Write; + let name = session + .active_display_name() + .unwrap_or_else(|| "no conversation".into()); + let unread = session.total_unread(); + if unread > 0 { + print!("{DIM}[{RESET}{BOLD}{name}{RESET} {YELLOW}{unread} unread{RESET}{DIM}]{RESET} > "); + } else { + print!("{DIM}[{RESET}{BOLD}{name}{RESET}{DIM}]{RESET} > "); + } + let _ = std::io::stdout().flush(); +} + +/// Print an incoming or outgoing message. +pub fn print_message(msg: &StoredMessage) { + if msg.is_outgoing { + println!("\r{GREEN}> {}{RESET}", msg.body); + } else { + let fallback = hex::encode(&msg.sender_key[..4]); + let sender = msg.sender_name.as_deref().unwrap_or(&fallback); + println!("\r{CYAN}{BOLD}[{sender}]{RESET} {}", msg.body); + } +} + +/// Print a message received in real-time (clears current line first). +pub fn print_incoming(sender: &str, body: &str) { + use std::io::Write; + // Clear current line, print message, then re-show prompt context + print!("\r\x1b[2K"); + println!("{CYAN}{BOLD}[{sender}]{RESET} {body}"); + let _ = std::io::stdout().flush(); +} + +/// Print a system/status message. +pub fn print_status(msg: &str) { + println!("{DIM} {msg}{RESET}"); +} + +/// Print an error message. +pub fn print_error(msg: &str) { + println!("{YELLOW} error: {msg}{RESET}"); +} + +/// Format a conversation list entry for `/list`. +pub fn format_conv_line(display_name: &str, kind: &str, unread: u32, members: usize) -> String { + let unread_str = if unread > 0 { + format!(" {YELLOW}({unread} new){RESET}") + } else { + String::new() + }; + format!( + " {BOLD}{display_name}{RESET} {DIM}[{kind}, {members} members]{RESET}{unread_str}" + ) +} diff --git a/crates/quicnprotochat-client/src/client/mod.rs b/crates/quicnprotochat-client/src/client/mod.rs index c76d464..bdddd1f 100644 --- a/crates/quicnprotochat-client/src/client/mod.rs +++ b/crates/quicnprotochat-client/src/client/mod.rs @@ -1,8 +1,13 @@ pub mod commands; +pub mod conversation; +pub mod display; pub mod hex; +pub mod repl; pub mod retry; pub mod rpc; +pub mod session; pub mod state; +pub mod token_cache; pub use commands::*; pub use rpc::{connect_node, enqueue, fetch_all, fetch_hybrid_key, fetch_key_package, fetch_wait, upload_hybrid_key, upload_key_package}; diff --git a/crates/quicnprotochat-client/src/client/repl.rs b/crates/quicnprotochat-client/src/client/repl.rs new file mode 100644 index 0000000..fb0a39c --- /dev/null +++ b/crates/quicnprotochat-client/src/client/repl.rs @@ -0,0 +1,1214 @@ +//! Interactive multi-conversation REPL. +//! +//! Supports slash commands for DMs, groups, invitations, and conversation switching. +//! Background polling fetches messages for all active conversations. + +use std::path::Path; +use std::sync::Arc; +use std::time::Duration; + +use anyhow::Context; +use quicnprotochat_core::{ + AppMessage, DiskKeyStore, GroupMember, IdentityKeypair, hybrid_encrypt, + parse as parse_app_msg, serialize_chat, +}; +use quicnprotochat_proto::node_capnp::node_service; +use tokio::sync::mpsc; +use tokio::time::interval; + +use crate::{ClientAuth, init_auth}; +use super::commands::{opaque_login, opaque_register}; +use super::conversation::{ + now_ms, Conversation, ConversationId, ConversationKind, StoredMessage, +}; +use super::display; +use super::rpc::{ + connect_node, create_channel, enqueue, fetch_hybrid_key, fetch_key_package, + fetch_wait, resolve_identity, resolve_user, try_hybrid_decrypt, upload_hybrid_key, + upload_key_package, +}; +use super::session::SessionState; +use super::state::{decode_identity_key, load_or_init_state}; +use super::token_cache::{clear_cached_session, load_cached_session, save_cached_session}; + +// ── Input parsing ──────────────────────────────────────────────────────────── + +enum Input { + Slash(SlashCommand), + ChatMessage(String), + Empty, +} + +enum SlashCommand { + Help, + Quit, + Whoami, + List, + Switch { target: String }, + Dm { username: String }, + CreateGroup { name: String }, + Invite { target: String }, + Join, + Members, + History { count: usize }, +} + +fn parse_input(line: &str) -> Input { + let trimmed = line.trim(); + if trimmed.is_empty() { + return Input::Empty; + } + if !trimmed.starts_with('/') { + return Input::ChatMessage(trimmed.to_string()); + } + + let parts: Vec<&str> = trimmed.splitn(2, ' ').collect(); + let cmd = parts[0].to_lowercase(); + let arg = parts.get(1).map(|s| s.trim().to_string()); + + match cmd.as_str() { + "/help" | "/h" => Input::Slash(SlashCommand::Help), + "/quit" | "/q" | "/exit" => Input::Slash(SlashCommand::Quit), + "/whoami" => Input::Slash(SlashCommand::Whoami), + "/list" | "/ls" => Input::Slash(SlashCommand::List), + "/switch" | "/sw" => match arg { + Some(target) => Input::Slash(SlashCommand::Switch { target }), + None => { + display::print_error("usage: /switch @username or /switch #groupname"); + Input::Empty + } + }, + "/dm" => match arg { + Some(username) => Input::Slash(SlashCommand::Dm { username }), + None => { + display::print_error("usage: /dm "); + Input::Empty + } + }, + "/create-group" | "/cg" => match arg { + Some(name) => Input::Slash(SlashCommand::CreateGroup { name }), + None => { + display::print_error("usage: /create-group "); + Input::Empty + } + }, + "/invite" => match arg { + Some(target) => Input::Slash(SlashCommand::Invite { target }), + None => { + display::print_error("usage: /invite "); + Input::Empty + } + }, + "/join" => Input::Slash(SlashCommand::Join), + "/members" => Input::Slash(SlashCommand::Members), + "/history" | "/hist" => { + let count = arg.and_then(|s| s.parse().ok()).unwrap_or(20); + Input::Slash(SlashCommand::History { count }) + } + _ => { + display::print_error(&format!("unknown command: {cmd}. Try /help")); + Input::Empty + } + } +} + +// ── REPL entry point ───────────────────────────────────────────────────────── + +pub async fn run_repl( + state_path: &Path, + server: &str, + ca_cert: &Path, + server_name: &str, + password: Option<&str>, + username: Option<&str>, + opaque_password: Option<&str>, + access_token: &str, + device_id: Option<&str>, +) -> anyhow::Result<()> { + // Phase 1: Resolve an access token (auto-register/login if needed). + let resolved_token = resolve_access_token( + state_path, server, ca_cert, server_name, password, + username, opaque_password, access_token, + ).await?; + + // Phase 2: Set the global auth context. + // Session tokens are hex-encoded raw bytes; decode back to raw for the wire format. + // Bearer tokens (explicit --access-token) are used as-is via UTF-8 bytes. + let token_bytes = hex::decode(&resolved_token) + .unwrap_or_else(|_| resolved_token.into_bytes()); + let auth_ctx = ClientAuth::from_raw(token_bytes, device_id.map(String::from)); + init_auth(auth_ctx); + + // Phase 3: Normal REPL startup. + let mut session = SessionState::load(state_path, password)?; + let mut client = connect_node(server, ca_cert, server_name).await?; + + display::print_status(&format!( + "identity: {}", + hex::encode(session.identity.public_key_bytes()) + )); + + // Auto-upload a fresh KeyPackage so peers can /dm us. + // The returned GroupMember holds the HPKE init private key for Welcome decryption. + // If this fails with an auth error, the cached session is stale — re-login. + match auto_upload_keys(&session, &client).await { + Ok(pending) => { + session.pending_member = Some(pending); + } + Err(e) => { + let err_str = format!("{e:#}").to_lowercase(); + if err_str.contains("e003") || err_str.contains("e017") || err_str.contains("invalid accesstoken") { + display::print_status("session expired, re-authenticating..."); + clear_cached_session(state_path); + let fresh_token = resolve_access_token( + state_path, server, ca_cert, server_name, password, + username, opaque_password, access_token, + ).await?; + let token_bytes = hex::decode(&fresh_token) + .unwrap_or_else(|_| fresh_token.into_bytes()); + let auth_ctx = ClientAuth::from_raw(token_bytes, device_id.map(String::from)); + init_auth(auth_ctx); + client = connect_node(server, ca_cert, server_name).await?; + match auto_upload_keys(&session, &client).await { + Ok(pending) => { session.pending_member = Some(pending); } + Err(e2) => { + display::print_error(&format!("key upload after re-auth: {e2:#}")); + } + } + } else { + display::print_error(&format!("key upload: {e:#}")); + display::print_status("peers may not be able to invite you until a KeyPackage is uploaded"); + } + } + } + + let conv_count = session.conv_store.list_conversations()?.len(); + if conv_count > 0 { + display::print_status(&format!("{conv_count} conversations loaded")); + } + display::print_status("type /help for commands, Ctrl+D to exit"); + println!(); + + // Auto-switch to the most recent conversation if there is one. + if let Some(conv) = session.conv_store.list_conversations()?.into_iter().next() { + session.active_conversation = Some(conv.id.clone()); + session.conv_store.reset_unread(&conv.id)?; + } + + let (tx, mut rx) = mpsc::unbounded_channel::>(); + + // Spawn stdin reader (must be spawn_local — capnp-rpc is !Send). + tokio::task::spawn_local({ + let tx = tx.clone(); + async move { + use tokio::io::AsyncBufReadExt; + let mut stdin = tokio::io::BufReader::new(tokio::io::stdin()); + let mut line = String::new(); + loop { + line.clear(); + match stdin.read_line(&mut line).await { + Ok(0) => { let _ = tx.send(None); break; } + Ok(_) => { let _ = tx.send(Some(line.trim().to_string())); } + Err(_) => break, + } + } + } + }); + + let mut poll = interval(Duration::from_millis(1000)); + poll.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + let mut consecutive_errors: u32 = 0; + + display::print_prompt(&session); + + loop { + tokio::select! { + msg = rx.recv() => { + match msg { + Some(None) | None => break, + Some(Some(line)) => { + match parse_input(&line) { + Input::Slash(SlashCommand::Quit) => break, + Input::Slash(cmd) => { + handle_slash(&mut session, &client, cmd).await; + } + Input::ChatMessage(text) => { + handle_send(&mut session, &client, &text).await; + } + Input::Empty => {} + } + display::print_prompt(&session); + } + } + } + _ = poll.tick() => { + match poll_messages(&mut session, &client).await { + Ok(()) => { consecutive_errors = 0; } + Err(e) => { + consecutive_errors += 1; + tracing::warn!(error = format!("{e:#}"), n = consecutive_errors, "poll error"); + if consecutive_errors >= 3 { + display::print_status("connection lost, reconnecting..."); + match connect_node(server, ca_cert, server_name).await { + Ok(new_client) => { + client = new_client; + consecutive_errors = 0; + display::print_status("reconnected"); + display::print_prompt(&session); + } + Err(re) => { + tracing::debug!(error = %re, "reconnect failed"); + } + } + } + } + } + } + } + } + + display::print_status("saving state..."); + session.save_all()?; + println!(); + Ok(()) +} + +// ── Startup helpers ───────────────────────────────────────────────────────── + +/// Generate and upload a fresh KeyPackage (and hybrid key if available) so peers can invite us. +/// +/// Returns the `GroupMember` whose keystore holds the HPKE init private key +/// needed to decrypt the next incoming Welcome. The caller should store it as +/// `session.pending_member`. +async fn auto_upload_keys( + session: &SessionState, + client: &node_service::Client, +) -> anyhow::Result { + let ks_path = session.state_path.with_extension("pending.ks"); + let ks = DiskKeyStore::persistent(&ks_path) + .unwrap_or_else(|_| DiskKeyStore::ephemeral()); + let mut member = GroupMember::new_with_state( + Arc::clone(&session.identity), + ks, + None, + ); + let kp_bytes = member.generate_key_package().context("generate KeyPackage")?; + let id_key = session.identity.public_key_bytes(); + + upload_key_package(client, &id_key, &kp_bytes).await?; + display::print_status("KeyPackage uploaded"); + + if let Some(ref hkp) = session.hybrid_kp { + upload_hybrid_key(client, &id_key, &hkp.public_key()).await?; + display::print_status("hybrid key uploaded"); + } + + Ok(member) +} + +/// Determine the access token, performing OPAQUE registration/login as needed. +async fn resolve_access_token( + state_path: &Path, + server: &str, + ca_cert: &Path, + server_name: &str, + state_password: Option<&str>, + username: Option<&str>, + opaque_password: Option<&str>, + cli_access_token: &str, +) -> anyhow::Result { + // Priority 1: Explicit --access-token (or env var). + if !cli_access_token.is_empty() { + display::print_status("using provided access token"); + return Ok(cli_access_token.to_string()); + } + + // Priority 2: Cached session token from a previous login. + if let Some(cached) = load_cached_session(state_path, state_password) { + display::print_status(&format!("using cached session for '{}'", cached.username)); + return Ok(cached.token_hex); + } + + // Priority 3: Login with --username/--password (or prompt interactively). + let username = match username { + Some(u) => u.to_string(), + None => { + use std::io::Write; + eprint!("Username: "); + std::io::stderr().flush().ok(); + let mut input = String::new(); + std::io::stdin() + .read_line(&mut input) + .context("failed to read username")?; + let trimmed = input.trim().to_string(); + anyhow::ensure!(!trimmed.is_empty(), "username is required"); + trimmed + } + }; + + let opaque_password = match opaque_password { + Some(p) => p.to_string(), + None => { + eprint!("Password: "); + rpassword::read_password().context("failed to read password")? + } + }; + + // Ensure state file exists (creates identity + hybrid key if missing). + let state = load_or_init_state(state_path, state_password)?; + let identity = IdentityKeypair::from_seed(state.identity_seed); + let identity_key = identity.public_key_bytes().to_vec(); + + // Connect for the OPAQUE flow. + let node_client = connect_node(server, ca_cert, server_name).await?; + + // Try registration (idempotent: E018 = already registered → proceed to login). + display::print_status(&format!("registering '{username}'...")); + match opaque_register(&node_client, &username, &opaque_password, Some(&identity_key)).await { + Ok(()) => { + display::print_status(&format!("user '{username}' registered")); + } + Err(e) => { + let msg = format!("{e:#}"); + if msg.contains("E018") || msg.to_lowercase().contains("already") { + display::print_status(&format!("user '{username}' already registered")); + } else { + return Err(e).context("OPAQUE registration failed"); + } + } + } + + // Login (fresh connection in case register left RPC state dirty). + display::print_status(&format!("logging in as '{username}'...")); + let node_client = connect_node(server, ca_cert, server_name).await?; + let token_bytes = + opaque_login(&node_client, &username, &opaque_password, &identity_key).await?; + let token_hex = hex::encode(&token_bytes); + + // Cache for future sessions. + save_cached_session(state_path, &username, &token_hex, state_password)?; + display::print_status("logged in, session cached"); + + Ok(token_hex) +} + +// ── Slash command handlers ─────────────────────────────────────────────────── + +async fn handle_slash( + session: &mut SessionState, + client: &node_service::Client, + cmd: SlashCommand, +) { + let result = match cmd { + SlashCommand::Help => { print_help(); Ok(()) } + SlashCommand::Quit => unreachable!(), + SlashCommand::Whoami => cmd_whoami(session), + SlashCommand::List => cmd_list(session), + SlashCommand::Switch { target } => cmd_switch(session, &target), + SlashCommand::Dm { username } => cmd_dm(session, client, &username).await, + SlashCommand::CreateGroup { name } => cmd_create_group(session, &name), + SlashCommand::Invite { target } => cmd_invite(session, client, &target).await, + SlashCommand::Join => cmd_join(session, client).await, + SlashCommand::Members => cmd_members(session), + SlashCommand::History { count } => cmd_history(session, count), + }; + if let Err(e) = result { + display::print_error(&format!("{e:#}")); + } +} + +fn print_help() { + display::print_status("Commands:"); + display::print_status(" /dm - Start or switch to a DM"); + display::print_status(" /create-group - Create a new group"); + display::print_status(" /invite - Invite user to current group"); + display::print_status(" /join - Join a group from pending Welcome"); + display::print_status(" /switch <@user|#group> - Switch conversation"); + display::print_status(" /list - List all conversations"); + display::print_status(" /members - Show members of current conversation"); + display::print_status(" /history [N] - Show last N messages (default: 20)"); + display::print_status(" /whoami - Show your identity"); + display::print_status(" /quit - Exit"); +} + +fn cmd_whoami(session: &SessionState) -> anyhow::Result<()> { + display::print_status(&format!( + "identity: {}", + hex::encode(session.identity.public_key_bytes()) + )); + display::print_status(&format!( + "hybrid key: {}", + if session.hybrid_kp.is_some() { "yes" } else { "no" } + )); + display::print_status(&format!( + "conversations: {}", + session.members.len() + )); + Ok(()) +} + +fn cmd_list(session: &SessionState) -> anyhow::Result<()> { + let convs = session.conv_store.list_conversations()?; + if convs.is_empty() { + display::print_status("no conversations yet. Try /dm or /create-group "); + return Ok(()); + } + for conv in &convs { + let kind_str = match &conv.kind { + ConversationKind::Dm { .. } => "dm", + ConversationKind::Group { .. } => "group", + }; + let active = session + .active_conversation + .as_ref() + .map(|a| a == &conv.id) + .unwrap_or(false); + let marker = if active { " *" } else { "" }; + println!( + "{}{}", + display::format_conv_line( + &conv.display_name, + kind_str, + conv.unread_count, + conv.member_keys.len(), + ), + marker, + ); + } + Ok(()) +} + +fn cmd_switch(session: &mut SessionState, target: &str) -> anyhow::Result<()> { + let target = target.trim(); + + let conv = if target.starts_with('@') { + let username = &target[1..]; + session.conv_store.list_conversations()?.into_iter().find(|c| { + matches!(&c.kind, ConversationKind::Dm { peer_username: Some(u), .. } if u == username) + }) + } else if target.starts_with('#') { + let name = &target[1..]; + session.conv_store.find_group_by_name(name)? + } else { + // Try as display name + session.conv_store.list_conversations()?.into_iter().find(|c| c.display_name == target) + }; + + match conv { + Some(c) => { + session.conv_store.reset_unread(&c.id)?; + session.active_conversation = Some(c.id); + display::print_status(&format!("switched to {}", c.display_name)); + } + None => { + display::print_error(&format!("conversation not found: {target}")); + } + } + Ok(()) +} + +async fn cmd_dm( + session: &mut SessionState, + client: &node_service::Client, + username: &str, +) -> anyhow::Result<()> { + // Resolve username → identity key. + display::print_status(&format!("resolving {username}...")); + let peer_key = resolve_user(client, username) + .await? + .with_context(|| format!("user '{username}' not found"))?; + + // Self-DM → local-only notepad (no server channel, no MLS). + if peer_key == session.identity_bytes() { + let conv_id = ConversationId::from_group_name(&format!("self-{username}")); + if let Some(existing) = session.conv_store.load_conversation(&conv_id)? { + session.conv_store.reset_unread(&existing.id)?; + session.active_conversation = Some(existing.id); + display::print_status("switched to notes"); + return Ok(()); + } + let conv = Conversation { + id: conv_id.clone(), + kind: ConversationKind::Dm { + peer_key: peer_key.clone(), + peer_username: Some(username.to_string()), + }, + display_name: format!("@{username} (notes)"), + mls_group_blob: None, + keystore_blob: None, + member_keys: vec![peer_key], + unread_count: 0, + last_activity_ms: now_ms(), + created_at_ms: now_ms(), + }; + let ks = DiskKeyStore::ephemeral(); + let member = GroupMember::new_with_state(Arc::clone(&session.identity), ks, None); + session.add_conversation(conv, member)?; + session.active_conversation = Some(conv_id); + display::print_status("notes created — messages here are local only"); + return Ok(()); + } + + // Check if we already have a DM with this peer. + if let Some(existing) = session.conv_store.find_dm_by_peer(&peer_key)? { + session.conv_store.reset_unread(&existing.id)?; + session.active_conversation = Some(existing.id); + display::print_status(&format!("switched to DM with @{username}")); + return Ok(()); + } + + // Create server-side channel. + display::print_status("creating channel..."); + let channel_id = create_channel(client, &peer_key).await?; + let conv_id = ConversationId::from_slice(&channel_id) + .context("server returned invalid channel_id length")?; + + // Fetch peer's KeyPackage for MLS. + display::print_status("fetching peer's key package..."); + let kp_bytes = fetch_key_package(client, &peer_key).await?; + anyhow::ensure!(!kp_bytes.is_empty(), "peer has no key package uploaded"); + + // Create MLS group using channel_id as group_id. + let ks_dir = session.state_path.with_extension("keystores"); + std::fs::create_dir_all(&ks_dir).ok(); + let ks_path = ks_dir.join(format!("{}.ks", conv_id.hex())); + let ks = DiskKeyStore::persistent(&ks_path)?; + let mut member = GroupMember::new_with_state(Arc::clone(&session.identity), ks, None); + + // Generate a key package for ourselves (needed for MLS) + let _my_kp = member.generate_key_package()?; + + member.create_group(&channel_id)?; + let (commit, welcome) = member.add_member(&kp_bytes)?; + + // Deliver welcome to peer and commit to peer. + let peer_hybrid_pk = fetch_hybrid_key(client, &peer_key).await?; + let wrap = |data: &[u8]| -> anyhow::Result> { + if let Some(ref pk) = peer_hybrid_pk { + hybrid_encrypt(pk, data, b"", b"").context("hybrid encrypt") + } else { + Ok(data.to_vec()) + } + }; + + enqueue(client, &peer_key, &wrap(&welcome)?).await?; + enqueue(client, &peer_key, &wrap(&commit)?).await?; + + let member_keys = member.member_identities(); + let conv = Conversation { + id: conv_id.clone(), + kind: ConversationKind::Dm { + peer_key: peer_key.clone(), + peer_username: Some(username.to_string()), + }, + display_name: format!("@{username}"), + mls_group_blob: member + .group_ref() + .map(|g| bincode::serialize(g)) + .transpose() + .context("serialize group")?, + keystore_blob: None, + member_keys, + unread_count: 0, + last_activity_ms: now_ms(), + created_at_ms: now_ms(), + }; + + session.add_conversation(conv, member)?; + session.active_conversation = Some(conv_id); + + display::print_status(&format!("DM with @{username} created. Start typing!")); + Ok(()) +} + +fn cmd_create_group(session: &mut SessionState, name: &str) -> anyhow::Result<()> { + let conv_id = ConversationId::from_group_name(name); + + if session.conv_store.find_group_by_name(name)?.is_some() { + session.active_conversation = Some(conv_id); + display::print_status(&format!("switched to existing group #{name}")); + return Ok(()); + } + + let ks_dir = session.state_path.with_extension("keystores"); + std::fs::create_dir_all(&ks_dir).ok(); + let ks_path = ks_dir.join(format!("{}.ks", conv_id.hex())); + let ks = DiskKeyStore::persistent(&ks_path)?; + let mut member = GroupMember::new_with_state(Arc::clone(&session.identity), ks, None); + + let _my_kp = member.generate_key_package()?; + member.create_group(conv_id.0.as_slice())?; + + let member_keys = member.member_identities(); + let conv = Conversation { + id: conv_id.clone(), + kind: ConversationKind::Group { name: name.to_string() }, + display_name: format!("#{name}"), + mls_group_blob: member + .group_ref() + .map(|g| bincode::serialize(g)) + .transpose() + .context("serialize group")?, + keystore_blob: None, + member_keys, + unread_count: 0, + last_activity_ms: now_ms(), + created_at_ms: now_ms(), + }; + + session.add_conversation(conv, member)?; + session.active_conversation = Some(conv_id); + + display::print_status(&format!("group #{name} created. Use /invite to add members")); + Ok(()) +} + +async fn cmd_invite( + session: &mut SessionState, + client: &node_service::Client, + target: &str, +) -> anyhow::Result<()> { + let conv_id = session + .active_conversation + .as_ref() + .context("no active conversation; switch to a group first")? + .clone(); + + let my_key = session.identity_bytes(); + + // Resolve the target — could be a username or hex key. + let peer_key = if target.len() == 64 && target.chars().all(|c| c.is_ascii_hexdigit()) { + decode_identity_key(target)? + } else { + display::print_status(&format!("resolving {target}...")); + resolve_user(client, target) + .await? + .with_context(|| format!("user '{target}' not found"))? + }; + + display::print_status("fetching key package..."); + let kp_bytes = fetch_key_package(client, &peer_key).await?; + anyhow::ensure!(!kp_bytes.is_empty(), "peer has no key package uploaded"); + + let member = session + .get_member_mut(&conv_id) + .context("no group member for active conversation")?; + + anyhow::ensure!( + member.group_ref().is_some(), + "active conversation has no MLS group" + ); + + let (commit, welcome) = member.add_member(&kp_bytes)?; + + // Get member list before dropping the mutable borrow. + let other_members: Vec> = member + .member_identities() + .into_iter() + .filter(|id| id.as_slice() != my_key.as_slice() && id.as_slice() != peer_key.as_slice()) + .collect(); + + // Deliver welcome to new member. + let peer_hybrid_pk = fetch_hybrid_key(client, &peer_key).await?; + let wrap = |data: &[u8]| -> anyhow::Result> { + if let Some(ref pk) = peer_hybrid_pk { + hybrid_encrypt(pk, data, b"", b"").context("hybrid encrypt") + } else { + Ok(data.to_vec()) + } + }; + enqueue(client, &peer_key, &wrap(&welcome)?).await?; + + for mk in &other_members { + let pk = fetch_hybrid_key(client, mk).await?; + let payload = if let Some(ref pk) = pk { + hybrid_encrypt(pk, &commit, b"", b"").context("hybrid encrypt commit")? + } else { + commit.clone() + }; + enqueue(client, mk, &payload).await?; + } + + session.save_member(&conv_id)?; + display::print_status(&format!("invited {target} and broadcast commit")); + Ok(()) +} + +async fn cmd_join( + session: &mut SessionState, + client: &node_service::Client, +) -> anyhow::Result<()> { + display::print_status("checking for pending Welcome messages..."); + + let identity_bytes = session.identity_bytes(); + let mut payloads = fetch_wait(client, &identity_bytes, 0).await?; + payloads.sort_by_key(|(seq, _)| *seq); + + if payloads.is_empty() { + display::print_status("no pending messages"); + return Ok(()); + } + + for (_seq, payload) in &payloads { + let mls_payload = match try_hybrid_decrypt(session.hybrid_kp.as_ref(), payload) { + Ok(b) => b, + Err(_) => continue, + }; + + // Try to process with existing groups first + let mut handled = false; + for (_cid, member) in &mut session.members { + match member.receive_message(&mls_payload) { + Ok(_) => { handled = true; break; } + Err(_) => continue, + } + } + if handled { + continue; + } + + // Not handled by any existing group — try as Welcome. + let ks = DiskKeyStore::ephemeral(); + let mut new_member = GroupMember::new_with_state( + Arc::clone(&session.identity), + ks, + None, + ); + // Need a key package to decrypt Welcome. + let _kp = new_member.generate_key_package()?; + + match new_member.join_group(&mls_payload) { + Ok(()) => { + let group_id = new_member + .group_id() + .unwrap_or_default(); + + let conv_id = if group_id.len() >= 16 { + ConversationId::from_slice(&group_id[..16]) + .unwrap_or_else(|| ConversationId::from_group_name(&hex::encode(&group_id))) + } else { + ConversationId::from_group_name(&hex::encode(&group_id)) + }; + + // Check if conversation already exists. + if session.members.contains_key(&conv_id) { + continue; + } + + let member_keys = new_member.member_identities(); + let display = format!("joined-{}", &hex::encode(&group_id)[..8.min(group_id.len())]); + + let conv = Conversation { + id: conv_id.clone(), + kind: ConversationKind::Group { name: display.clone() }, + display_name: format!("#{display}"), + mls_group_blob: new_member + .group_ref() + .map(|g| bincode::serialize(g)) + .transpose() + .context("serialize joined group")?, + keystore_blob: None, + member_keys, + unread_count: 0, + last_activity_ms: now_ms(), + created_at_ms: now_ms(), + }; + + session.add_conversation(conv, new_member)?; + session.active_conversation = Some(conv_id.clone()); + display::print_status(&format!("joined group #{display}")); + } + Err(_) => { + // Not a Welcome either — skip. + } + } + } + + session.save_all()?; + Ok(()) +} + +fn cmd_members(session: &SessionState) -> anyhow::Result<()> { + let conv_id = session + .active_conversation + .as_ref() + .context("no active conversation")?; + + let member = session + .members + .get(conv_id) + .context("no group member for active conversation")?; + + let my_key = session.identity_bytes(); + let ids = member.member_identities(); + display::print_status(&format!("{} members:", ids.len())); + for id in &ids { + let tag = if id.as_slice() == my_key.as_slice() { " (you)" } else { "" }; + display::print_status(&format!(" {}{tag}", hex::encode(&id[..8]))); + } + Ok(()) +} + +fn cmd_history(session: &SessionState, count: usize) -> anyhow::Result<()> { + let conv_id = session + .active_conversation + .as_ref() + .context("no active conversation")?; + + let msgs = session.conv_store.load_recent_messages(conv_id, count)?; + if msgs.is_empty() { + display::print_status("no messages yet"); + return Ok(()); + } + for msg in &msgs { + display::print_message(msg); + } + Ok(()) +} + +// ── Sending ────────────────────────────────────────────────────────────────── + +async fn handle_send( + session: &mut SessionState, + client: &node_service::Client, + text: &str, +) { + if let Err(e) = do_send(session, client, text).await { + display::print_error(&format!("{e:#}")); + } +} + +async fn do_send( + session: &mut SessionState, + client: &node_service::Client, + text: &str, +) -> anyhow::Result<()> { + let conv_id = session + .active_conversation + .as_ref() + .context("no active conversation; use /dm or /create-group first")? + .clone(); + + let my_key = session.identity_bytes(); + + let member = session + .get_member_mut(&conv_id) + .context("no group member")?; + + // Local-only notepad (self-DM): save message without MLS/delivery. + if member.group_ref().is_none() { + let msg = StoredMessage { + conversation_id: conv_id.clone(), + message_id: None, + sender_key: my_key, + sender_name: Some("you".into()), + body: text.to_string(), + msg_type: "chat".into(), + ref_msg_id: None, + timestamp_ms: now_ms(), + is_outgoing: true, + }; + session.conv_store.save_message(&msg)?; + session.conv_store.update_activity(&conv_id, now_ms())?; + return Ok(()); + } + + // Wrap in structured AppMessage format with a unique message_id. + let app_payload = serialize_chat(text.as_bytes(), None) + .context("serialize app message")?; + + let ct = member + .send_message(&app_payload) + .context("MLS send_message failed")?; + + let recipients: Vec> = member + .member_identities() + .into_iter() + .filter(|id| id.as_slice() != my_key.as_slice()) + .collect(); + + for recipient_key in &recipients { + let peer_hybrid_pk = fetch_hybrid_key(client, recipient_key).await?; + let payload = if let Some(ref pk) = peer_hybrid_pk { + hybrid_encrypt(pk, &ct, b"", b"").context("hybrid encrypt")? + } else { + ct.clone() + }; + enqueue(client, recipient_key, &payload).await?; + } + + // Extract message_id from what we just serialized. + let msg_id = parse_app_msg(&app_payload) + .ok() + .and_then(|(_, m)| match m { + AppMessage::Chat { message_id, .. } => Some(message_id), + _ => None, + }); + + // Save outgoing message to history. + let msg = StoredMessage { + conversation_id: conv_id.clone(), + message_id: msg_id, + sender_key: my_key, + sender_name: Some("you".into()), + body: text.to_string(), + msg_type: "chat".into(), + ref_msg_id: None, + timestamp_ms: now_ms(), + is_outgoing: true, + }; + session.conv_store.save_message(&msg)?; + session.conv_store.update_activity(&conv_id, now_ms())?; + session.save_member(&conv_id)?; + + Ok(()) +} + +// ── Polling ────────────────────────────────────────────────────────────────── + +async fn poll_messages( + session: &mut SessionState, + client: &node_service::Client, +) -> anyhow::Result<()> { + let identity_bytes = session.identity_bytes(); + let mut payloads = fetch_wait(client, &identity_bytes, 0).await?; + if payloads.is_empty() { + return Ok(()); + } + payloads.sort_by_key(|(seq, _)| *seq); + + let mut any_changed = false; + + for (_seq, payload) in &payloads { + let mls_payload = match try_hybrid_decrypt(session.hybrid_kp.as_ref(), payload) { + Ok(b) => b, + Err(_) => payload.clone(), // Try raw (non-hybrid) + }; + + // Try each conversation's GroupMember. + let conv_ids: Vec = session.members.keys().cloned().collect(); + let mut handled = false; + + for conv_id in &conv_ids { + let member = match session.members.get_mut(conv_id) { + Some(m) => m, + None => continue, + }; + + match member.receive_message(&mls_payload) { + Ok(Some(plaintext)) => { + // Parse structured AppMessage; fall back to raw UTF-8 for legacy. + let (body, msg_id, msg_type, ref_msg_id) = + match parse_app_msg(&plaintext) { + Ok((_, AppMessage::Chat { message_id, body })) => ( + String::from_utf8_lossy(&body).to_string(), + Some(message_id), + "chat", + None, + ), + Ok((_, AppMessage::Reply { ref_msg_id, body })) => ( + String::from_utf8_lossy(&body).to_string(), + None, + "reply", + Some(ref_msg_id), + ), + Ok((_, AppMessage::Reaction { ref_msg_id, emoji })) => ( + String::from_utf8_lossy(&emoji).to_string(), + None, + "reaction", + Some(ref_msg_id), + ), + _ => { + // Legacy raw plaintext or unknown type. + ( + String::from_utf8_lossy(&plaintext).to_string(), + None, + "chat", + None, + ) + } + }; + + let sender_key = session.identity_bytes(); // fallback + + let msg = StoredMessage { + conversation_id: conv_id.clone(), + message_id: msg_id, + sender_key: sender_key.clone(), + sender_name: None, + body: body.clone(), + msg_type: msg_type.into(), + ref_msg_id, + timestamp_ms: now_ms(), + is_outgoing: false, + }; + session.conv_store.save_message(&msg)?; + session.conv_store.update_activity(conv_id, now_ms())?; + + let is_active = session + .active_conversation + .as_ref() + .map(|a| a == conv_id) + .unwrap_or(false); + + if is_active { + let conv = session.conv_store.load_conversation(conv_id)?; + let conv_name = conv.map(|c| c.display_name).unwrap_or_default(); + display::print_incoming(&conv_name, &body); + display::print_prompt(session); + } else { + session.conv_store.increment_unread(conv_id)?; + } + + any_changed = true; + handled = true; + break; + } + Ok(None) => { + // Processed a non-application message (commit, etc.) + any_changed = true; + handled = true; + break; + } + Err(_) => continue, + } + } + + if !handled { + // Try as a Welcome — auto-join the group (IRC-like behaviour). + handled = try_auto_join(session, client, &mls_payload).await; + if handled { + any_changed = true; + } else { + tracing::debug!("unhandled payload (not Welcome or existing group)"); + } + } + } + + if any_changed { + session.save_all()?; + } + + Ok(()) +} + +/// Attempt to process `mls_payload` as a Welcome message using the pending +/// KeyPackage member. On success, creates the conversation automatically +/// (IRC-like auto-join) and uploads a fresh KeyPackage for future invites. +async fn try_auto_join( + session: &mut SessionState, + client: &node_service::Client, + mls_payload: &[u8], +) -> bool { + let pending = match session.pending_member.as_mut() { + Some(p) => p, + None => return false, + }; + + if pending.join_group(mls_payload).is_err() { + return false; + } + + // Successfully joined — extract group info. + let group_id = pending.group_id().unwrap_or_default(); + let conv_id = if group_id.len() >= 16 { + ConversationId::from_slice(&group_id[..16]) + .unwrap_or_else(|| ConversationId::from_group_name(&hex::encode(&group_id))) + } else { + ConversationId::from_group_name(&hex::encode(&group_id)) + }; + + // Already know this conversation — skip. + if session.members.contains_key(&conv_id) { + return false; + } + + // Take ownership of the pending member. + let member = session.pending_member.take().unwrap(); + let member_keys = member.member_identities(); + + // Figure out the peer (any member that isn't us). + let my_key = session.identity_bytes(); + let peer_key = member_keys + .iter() + .find(|k| k.as_slice() != my_key.as_slice()) + .cloned(); + + // Resolve the peer's username from the server (best-effort). + let peer_username = if let Some(ref pk) = peer_key { + resolve_identity(client, pk).await.ok().flatten() + } else { + None + }; + + // Build display name: "@username" if known, else "@". + let display_name = if let Some(ref uname) = peer_username { + format!("@{uname}") + } else if let Some(ref pk) = peer_key { + format!("@{}", &hex::encode(&pk[..4.min(pk.len())])) + } else { + let id_hex = hex::encode(&group_id); + format!("#{}", &id_hex[..8.min(id_hex.len())]) + }; + + let kind = if let Some(pk) = peer_key { + ConversationKind::Dm { + peer_key: pk, + peer_username, + } + } else { + ConversationKind::Group { + name: display_name.clone(), + } + }; + + let mls_blob = member + .group_ref() + .and_then(|g| bincode::serialize(g).ok()); + + let conv = Conversation { + id: conv_id.clone(), + kind, + display_name: display_name.clone(), + mls_group_blob: mls_blob, + keystore_blob: None, + member_keys, + unread_count: 0, + last_activity_ms: now_ms(), + created_at_ms: now_ms(), + }; + + if let Err(e) = session.add_conversation(conv, member) { + display::print_error(&format!("auto-join failed: {e:#}")); + return false; + } + + // Auto-switch if no active conversation. + if session.active_conversation.is_none() { + session.active_conversation = Some(conv_id); + } + + display::print_incoming("system", &format!("new conversation: {display_name}")); + display::print_prompt(session); + + // Upload a fresh KeyPackage so we can receive more invites. + replenish_pending_key(session, client).await; + + true +} + +/// Upload a new KeyPackage and store the member as `pending_member` so we're +/// ready for the next incoming Welcome. +async fn replenish_pending_key( + session: &mut SessionState, + client: &node_service::Client, +) { + match auto_upload_keys(session, client).await { + Ok(pending) => { + session.pending_member = Some(pending); + } + Err(e) => { + tracing::warn!(error = format!("{e:#}"), "failed to replenish KeyPackage"); + } + } +} diff --git a/crates/quicnprotochat-client/src/client/rpc.rs b/crates/quicnprotochat-client/src/client/rpc.rs index 44fc60c..35fd6be 100644 --- a/crates/quicnprotochat-client/src/client/rpc.rs +++ b/crates/quicnprotochat-client/src/client/rpc.rs @@ -76,7 +76,8 @@ pub async fn connect_node( } pub fn set_auth(auth: &mut auth::Builder<'_>) -> anyhow::Result<()> { - let ctx = AUTH_CONTEXT.get().ok_or_else(|| { + let guard = AUTH_CONTEXT.read().expect("AUTH_CONTEXT poisoned"); + let ctx = guard.as_ref().ok_or_else(|| { anyhow::anyhow!( "init_auth must be called before RPCs (use a bearer or session token for authenticated commands)" ) @@ -574,6 +575,106 @@ pub async fn batch_enqueue( .await } +/// Resolve a username to its Ed25519 identity key (32 bytes). +/// Returns `None` if the username is not registered. +pub async fn resolve_user( + client: &node_service::Client, + username: &str, +) -> anyhow::Result>> { + let mut req = client.resolve_user_request(); + { + let mut p = req.get(); + p.set_username(username); + let mut auth = p.reborrow().init_auth(); + set_auth(&mut auth)?; + } + + let resp = req + .send() + .promise + .await + .context("resolve_user RPC failed")?; + + let key = resp + .get() + .context("resolve_user: bad response")? + .get_identity_key() + .context("resolve_user: missing field")? + .to_vec(); + + if key.is_empty() { + Ok(None) + } else { + Ok(Some(key)) + } +} + +/// Reverse lookup: resolve an identity key to the registered username. +/// Returns `None` if no username is associated with the key. +pub async fn resolve_identity( + client: &node_service::Client, + identity_key: &[u8], +) -> anyhow::Result> { + let mut req = client.resolve_identity_request(); + { + let mut p = req.get(); + p.set_identity_key(identity_key); + let mut auth = p.reborrow().init_auth(); + set_auth(&mut auth)?; + } + + let resp = req + .send() + .promise + .await + .context("resolve_identity RPC failed")?; + + let username = resp + .get() + .context("resolve_identity: bad response")? + .get_username() + .context("resolve_identity: missing field")? + .to_str() + .unwrap_or("") + .to_string(); + + if username.is_empty() { + Ok(None) + } else { + Ok(Some(username)) + } +} + +/// Create a 1:1 DM channel with a peer. Returns the 16-byte channel ID. +/// If a channel already exists between the two users, returns the existing ID. +pub async fn create_channel( + client: &node_service::Client, + peer_key: &[u8], +) -> anyhow::Result> { + let mut req = client.create_channel_request(); + { + let mut p = req.get(); + p.set_peer_key(peer_key); + let mut auth = p.reborrow().init_auth(); + set_auth(&mut auth)?; + } + + let resp = req + .send() + .promise + .await + .context("create_channel RPC failed")?; + + let channel_id = resp + .get() + .context("create_channel: bad response")? + .get_channel_id() + .context("create_channel: missing channel_id")? + .to_vec(); + + Ok(channel_id) +} + /// Return the current Unix timestamp in milliseconds. pub fn current_timestamp_ms() -> u64 { std::time::SystemTime::now() diff --git a/crates/quicnprotochat-client/src/client/session.rs b/crates/quicnprotochat-client/src/client/session.rs new file mode 100644 index 0000000..c71962c --- /dev/null +++ b/crates/quicnprotochat-client/src/client/session.rs @@ -0,0 +1,260 @@ +//! Runtime session state for the interactive REPL. +//! +//! Wraps the legacy `StoredState` (identity + hybrid key) and adds +//! multi-conversation management via `ConversationStore`. + +use std::collections::HashMap; +use std::path::{Path, PathBuf}; +use std::sync::Arc; + +use anyhow::Context; + +use quicnprotochat_core::{DiskKeyStore, GroupMember, HybridKeypair, IdentityKeypair}; + +use super::conversation::{ + now_ms, Conversation, ConversationId, ConversationKind, ConversationStore, +}; +use super::state::{load_or_init_state, keystore_path}; + +/// Runtime state for an interactive REPL session. +pub struct SessionState { + /// Long-term identity keypair. + pub identity: Arc, + /// Post-quantum hybrid keypair. + pub hybrid_kp: Option, + /// Path to the legacy state file (for backward compat with one-shot commands). + pub state_path: PathBuf, + /// Optional password for the legacy state file. + pub password: Option, + /// SQLite-backed conversation + message store. + pub conv_store: ConversationStore, + /// Currently active conversation. + pub active_conversation: Option, + /// In-memory GroupMember instances keyed by conversation ID. + pub members: HashMap, + /// Holds the GroupMember whose KeyPackage was uploaded to the server. + /// Its keystore contains the HPKE init private key needed to decrypt + /// incoming Welcome messages. Consumed on auto-join, then replenished. + pub pending_member: Option, +} + +impl SessionState { + /// Load identity from the legacy state file, open the conversation store, + /// and migrate any existing single-group state into the conversation DB. + pub fn load( + state_path: &Path, + password: Option<&str>, + ) -> anyhow::Result { + let state = load_or_init_state(state_path, password)?; + + let identity = Arc::new(IdentityKeypair::from_seed(state.identity_seed)); + let hybrid_kp = state + .hybrid_key + .as_ref() + .map(|b| HybridKeypair::from_bytes(b)) + .transpose() + .context("decode hybrid key")?; + + // Open the conversation DB next to the state file. + // When a state password is provided, encrypt the DB with SQLCipher. + let db_path = state_path.with_extension("convdb"); + let conv_store = ConversationStore::open(&db_path, password)?; + + let mut session = Self { + identity, + hybrid_kp, + state_path: state_path.to_path_buf(), + password: password.map(String::from), + conv_store, + active_conversation: None, + members: HashMap::new(), + pending_member: None, + }; + + // Migrate legacy single-group into conversations if present and not yet migrated. + if state.group.is_some() { + session.migrate_legacy_group(state_path, &state.group)?; + } + + // Load all existing conversations' GroupMembers into memory. + session.load_all_members()?; + + Ok(session) + } + + /// Migrate the legacy single-group from StoredState into the conversation DB. + fn migrate_legacy_group( + &mut self, + state_path: &Path, + group_blob: &Option>, + ) -> anyhow::Result<()> { + let blob = match group_blob { + Some(b) => b, + None => return Ok(()), + }; + + // Reconstruct GroupMember using the legacy keystore and group blob. + let ks_path = keystore_path(state_path); + let ks = DiskKeyStore::persistent(&ks_path)?; + let group = bincode::deserialize(blob).context("decode legacy group")?; + let member = GroupMember::new_with_state( + Arc::clone(&self.identity), + ks, + Some(group), + ); + + let group_id_bytes = member.group_id().unwrap_or_default(); + + // Use the first 16 bytes of the group_id as the ConversationId. + let conv_id = if group_id_bytes.len() >= 16 { + ConversationId::from_slice(&group_id_bytes[..16]) + .unwrap_or_else(|| ConversationId([0; 16])) + } else { + ConversationId::from_group_name(&hex::encode(&group_id_bytes)) + }; + + // Check if already migrated. + if self.conv_store.load_conversation(&conv_id)?.is_some() { + return Ok(()); + } + + let member_keys = member.member_identities(); + let short_id = &hex::encode(&group_id_bytes)[..8.min(group_id_bytes.len() * 2)]; + let conv = Conversation { + id: conv_id.clone(), + kind: ConversationKind::Group { + name: format!("legacy-{short_id}"), + }, + display_name: format!("legacy-{short_id}"), + mls_group_blob: Some(blob.clone()), + keystore_blob: None, + member_keys, + unread_count: 0, + last_activity_ms: now_ms(), + created_at_ms: now_ms(), + }; + + self.conv_store.save_conversation(&conv)?; + self.members.insert(conv_id, member); + + Ok(()) + } + + /// Load all conversations from the DB and create in-memory GroupMember instances. + fn load_all_members(&mut self) -> anyhow::Result<()> { + let convs = self.conv_store.list_conversations()?; + for conv in convs { + if self.members.contains_key(&conv.id) { + continue; + } + let member = self.create_member_from_conv(&conv)?; + self.members.insert(conv.id.clone(), member); + } + Ok(()) + } + + /// Create a GroupMember from a stored conversation. + fn create_member_from_conv(&self, conv: &Conversation) -> anyhow::Result { + let ks_path = self.keystore_path_for(&conv.id); + let ks = DiskKeyStore::persistent(&ks_path) + .unwrap_or_else(|_| DiskKeyStore::ephemeral()); + + let group = conv + .mls_group_blob + .as_ref() + .map(|b| bincode::deserialize(b)) + .transpose() + .context("decode MLS group from conversation db")?; + + Ok(GroupMember::new_with_state( + Arc::clone(&self.identity), + ks, + group, + )) + } + + /// Path for a per-conversation keystore file. + fn keystore_path_for(&self, conv_id: &ConversationId) -> PathBuf { + let dir = self.state_path.with_extension("keystores"); + dir.join(format!("{}.ks", conv_id.hex())) + } + + /// Persist a conversation's MLS group state back to the DB. + pub fn save_member(&self, conv_id: &ConversationId) -> anyhow::Result<()> { + let member = self.members.get(conv_id).context("no such conversation")?; + let blob = member + .group_ref() + .map(|g| bincode::serialize(g)) + .transpose() + .context("serialize MLS group")?; + + let member_keys = member.member_identities(); + + // Update the mls_group_blob and member_keys in the DB. + if let Some(mut conv) = self.conv_store.load_conversation(conv_id)? { + conv.mls_group_blob = blob; + conv.member_keys = member_keys; + self.conv_store.save_conversation(&conv)?; + } + + Ok(()) + } + + /// Persist all in-memory group states back to the DB. + pub fn save_all(&self) -> anyhow::Result<()> { + for conv_id in self.members.keys() { + if let Err(e) = self.save_member(conv_id) { + tracing::warn!(conv = %conv_id.hex(), error = %e, "failed to save conversation"); + } + } + Ok(()) + } + + /// Add a new conversation and its GroupMember to the session. + pub fn add_conversation( + &mut self, + conv: Conversation, + member: GroupMember, + ) -> anyhow::Result<()> { + // Ensure keystore directory exists + let ks_path = self.keystore_path_for(&conv.id); + if let Some(parent) = ks_path.parent() { + std::fs::create_dir_all(parent).ok(); + } + + self.conv_store.save_conversation(&conv)?; + self.members.insert(conv.id.clone(), member); + Ok(()) + } + + /// Get a mutable reference to a conversation's GroupMember. + pub fn get_member_mut(&mut self, conv_id: &ConversationId) -> Option<&mut GroupMember> { + self.members.get_mut(conv_id) + } + + /// Public key bytes for this identity. + pub fn identity_bytes(&self) -> Vec { + self.identity.public_key_bytes().to_vec() + } + + /// Short hex prefix of the identity key for display. + pub fn identity_short(&self) -> String { + hex::encode(&self.identity.public_key_bytes()[..4]) + } + + /// Get display name of a conversation. + pub fn active_display_name(&self) -> Option { + let id = self.active_conversation.as_ref()?; + self.conv_store.load_conversation(id).ok().flatten().map(|c| c.display_name) + } + + /// Count total unread across all conversations. + pub fn total_unread(&self) -> u32 { + self.conv_store + .list_conversations() + .unwrap_or_default() + .iter() + .map(|c| c.unread_count) + .sum() + } +} diff --git a/crates/quicnprotochat-client/src/client/token_cache.rs b/crates/quicnprotochat-client/src/client/token_cache.rs new file mode 100644 index 0000000..7be4ee8 --- /dev/null +++ b/crates/quicnprotochat-client/src/client/token_cache.rs @@ -0,0 +1,86 @@ +//! Cached session token stored next to the state file. +//! +//! File format (no password): two lines — username and hex-encoded session token. +//! File format (with password): QPCE-encrypted version of the above. +//! The token has a server-side 24h TTL; no client-side expiry tracking. + +use std::path::{Path, PathBuf}; + +use anyhow::Context; + +use super::state::{decrypt_state, encrypt_state, is_encrypted_state}; + +pub struct CachedSession { + pub username: String, + pub token_hex: String, +} + +/// Derive the session cache path: `{state_path}.session`. +fn session_cache_path(state_path: &Path) -> PathBuf { + state_path.with_extension("session") +} + +/// Parse the two-line format (username + token_hex) from plaintext bytes. +fn parse_session_lines(text: &str) -> Option { + let mut lines = text.lines(); + let username = lines.next()?.trim().to_string(); + let token_hex = lines.next()?.trim().to_string(); + if username.is_empty() || token_hex.is_empty() { + return None; + } + if hex::decode(&token_hex).is_err() { + return None; + } + Some(CachedSession { username, token_hex }) +} + +/// Load a cached session token. Returns None if file is missing or malformed. +/// Decrypts if the file is QPCE-encrypted (requires `password`). +pub fn load_cached_session(state_path: &Path, password: Option<&str>) -> Option { + let path = session_cache_path(state_path); + let raw = std::fs::read(&path).ok()?; + + if is_encrypted_state(&raw) { + let pw = password?; + let plaintext = decrypt_state(pw, &raw).ok()?; + let text = String::from_utf8(plaintext).ok()?; + parse_session_lines(&text) + } else { + let text = String::from_utf8(raw).ok()?; + parse_session_lines(&text) + } +} + +/// Save a session token to the cache file (mode 0o600 on Unix). +/// Encrypts with QPCE if `password` is provided. +pub fn save_cached_session( + state_path: &Path, + username: &str, + token_hex: &str, + password: Option<&str>, +) -> anyhow::Result<()> { + let path = session_cache_path(state_path); + let contents = format!("{username}\n{token_hex}\n"); + + let bytes = match password { + Some(pw) => encrypt_state(pw, contents.as_bytes())?, + None => contents.into_bytes(), + }; + + std::fs::write(&path, bytes).with_context(|| format!("write session cache {path:?}"))?; + + #[cfg(unix)] + { + use std::os::unix::fs::PermissionsExt; + let perms = std::fs::Permissions::from_mode(0o600); + std::fs::set_permissions(&path, perms).ok(); + } + + Ok(()) +} + +/// Remove the cached session file. +pub fn clear_cached_session(state_path: &Path) { + let path = session_cache_path(state_path); + std::fs::remove_file(&path).ok(); +} diff --git a/crates/quicnprotochat-client/src/lib.rs b/crates/quicnprotochat-client/src/lib.rs index 9fcf7b9..6270ce1 100644 --- a/crates/quicnprotochat-client/src/lib.rs +++ b/crates/quicnprotochat-client/src/lib.rs @@ -14,7 +14,7 @@ //! commands. See the [running-the-client](https://docs.quicnprotochat.dev/getting-started/running-the-client) //! docs for details. -use std::sync::OnceLock; +use std::sync::RwLock; pub mod client; @@ -25,10 +25,11 @@ pub use client::commands::{ receive_pending_plaintexts, whoami_json, }; +pub use client::repl::run_repl; pub use client::rpc::{connect_node, enqueue, fetch_wait}; -// Global auth context initialized once per process. -pub(crate) static AUTH_CONTEXT: OnceLock = OnceLock::new(); +// Global auth context — RwLock so the REPL can set it after OPAQUE login. +pub(crate) static AUTH_CONTEXT: RwLock> = RwLock::new(None); #[derive(Clone, Debug)] pub struct ClientAuth { @@ -48,9 +49,20 @@ impl ClientAuth { device_id: device, } } + + /// Build from raw token bytes (e.g. a 32-byte OPAQUE session token). + pub fn from_raw(raw_token: Vec, device_id: Option) -> Self { + let device = device_id.unwrap_or_default().into_bytes(); + Self { + version: 1, + access_token: raw_token, + device_id: device, + } + } } -/// Initialize the global auth context; subsequent calls are ignored. +/// Set (or replace) the global auth context. pub fn init_auth(ctx: ClientAuth) { - let _ = AUTH_CONTEXT.set(ctx); + let mut guard = AUTH_CONTEXT.write().expect("AUTH_CONTEXT poisoned"); + *guard = Some(ctx); } diff --git a/crates/quicnprotochat-client/src/main.rs b/crates/quicnprotochat-client/src/main.rs index 617e885..028af05 100644 --- a/crates/quicnprotochat-client/src/main.rs +++ b/crates/quicnprotochat-client/src/main.rs @@ -7,7 +7,8 @@ use clap::{Parser, Subcommand}; use quicnprotochat_client::{ cmd_chat, cmd_check_key, cmd_create_group, cmd_demo_group, cmd_fetch_key, cmd_health, cmd_invite, cmd_join, cmd_login, cmd_ping, cmd_recv, cmd_register, cmd_register_state, - cmd_refresh_keypackage, cmd_register_user, cmd_send, cmd_whoami, init_auth, ClientAuth, + cmd_refresh_keypackage, cmd_register_user, cmd_send, cmd_whoami, init_auth, run_repl, + ClientAuth, }; // ── CLI ─────────────────────────────────────────────────────────────────────── @@ -266,6 +267,25 @@ enum Command { stream: bool, }, + /// Interactive multi-conversation REPL. Supports /dm, /create-group, /invite, /join, /switch, and more. + /// Automatically registers and logs in if --username/--password are provided (or prompts interactively). + Repl { + #[arg( + long, + default_value = "quicnprotochat-state.bin", + env = "QUICNPROTOCHAT_STATE" + )] + state: PathBuf, + #[arg(long, default_value = "127.0.0.1:7000", env = "QUICNPROTOCHAT_SERVER")] + server: String, + /// OPAQUE username for automatic registration/login. + #[arg(long, env = "QUICNPROTOCHAT_USERNAME")] + username: Option, + /// OPAQUE password (prompted securely if --username is set but --password is not). + #[arg(long, env = "QUICNPROTOCHAT_PASSWORD")] + password: Option, + }, + /// Interactive 1:1 chat: type to send, incoming messages printed as [peer] . Ctrl+D to exit. /// In a two-person group, peer is chosen automatically; use --peer-key only with 3+ members. Chat { @@ -290,6 +310,9 @@ enum Command { #[tokio::main] async fn main() -> anyhow::Result<()> { + // Install the rustls crypto provider before any TLS operations. + let _ = rustls::crypto::ring::default_provider().install_default(); + tracing_subscriber::fmt() .with_env_filter( tracing_subscriber::EnvFilter::try_from_default_env() @@ -299,9 +322,13 @@ async fn main() -> anyhow::Result<()> { let args = Args::parse(); - // Initialize auth context once for all RPCs (empty token OK for register-user/login). - let auth_ctx = ClientAuth::from_parts(args.access_token.clone(), args.device_id.clone()); - init_auth(auth_ctx); + // For the REPL, defer init_auth so it can resolve its own token via OPAQUE. + // For all other subcommands, initialize auth immediately. + let is_repl = matches!(args.command, Command::Repl { .. }); + if !is_repl { + let auth_ctx = ClientAuth::from_parts(args.access_token.clone(), args.device_id.clone()); + init_auth(auth_ctx); + } let state_pw = args.state_password.as_deref(); @@ -496,6 +523,27 @@ async fn main() -> anyhow::Result<()> { )) .await } + Command::Repl { + state, + server, + username, + password, + } => { + let local = tokio::task::LocalSet::new(); + local + .run_until(run_repl( + &state, + &server, + &args.ca_cert, + &args.server_name, + state_pw, + username.as_deref(), + password.as_deref(), + &args.access_token, + args.device_id.as_deref(), + )) + .await + } Command::Chat { state, server, diff --git a/crates/quicnprotochat-server/Cargo.toml b/crates/quicnprotochat-server/Cargo.toml index b48f5ac..c759a15 100644 --- a/crates/quicnprotochat-server/Cargo.toml +++ b/crates/quicnprotochat-server/Cargo.toml @@ -36,6 +36,7 @@ rcgen = { workspace = true } opaque-ke = { workspace = true } rand = { workspace = true } subtle = { workspace = true } +zeroize = { workspace = true } # Database rusqlite = { workspace = true } diff --git a/crates/quicnprotochat-server/src/auth.rs b/crates/quicnprotochat-server/src/auth.rs index 0ce284e..da09917 100644 --- a/crates/quicnprotochat-server/src/auth.rs +++ b/crates/quicnprotochat-server/src/auth.rs @@ -1,3 +1,4 @@ +use std::net::IpAddr; use std::sync::Arc; use dashmap::DashMap; @@ -5,6 +6,7 @@ use quicnprotochat_proto::node_capnp::auth; use sha2::Digest; use subtle::ConstantTimeEq; use tokio::sync::Notify; +use zeroize::Zeroizing; use crate::error_codes::*; @@ -13,19 +15,29 @@ pub const PENDING_LOGIN_TTL_SECS: u64 = 300; // 5 minutes pub const RATE_LIMIT_WINDOW_SECS: u64 = 60; pub const RATE_LIMIT_MAX_ENQUEUES: u32 = 100; -#[derive(Clone, Debug)] +#[derive(Clone)] pub struct AuthConfig { - pub required_token: Option>, + /// Server bearer token — zeroized on drop to prevent memory disclosure. + pub required_token: Option>>, /// When true, a valid bearer token (no session) is accepted and the request's identity/key is used (dev/e2e only). /// CLI flag: --allow-insecure-auth / QUICNPROTOCHAT_ALLOW_INSECURE_AUTH. pub allow_insecure_identity_from_request: bool, } +impl std::fmt::Debug for AuthConfig { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("AuthConfig") + .field("required_token", &self.required_token.as_ref().map(|_| "[REDACTED]")) + .field("allow_insecure_identity_from_request", &self.allow_insecure_identity_from_request) + .finish() + } +} + impl AuthConfig { pub fn new(required_token: Option, allow_insecure_identity_from_request: bool) -> Self { let required_token = required_token .filter(|s| !s.is_empty()) - .map(|s| s.into_bytes()); + .map(|s| Zeroizing::new(s.into_bytes())); Self { required_token, allow_insecure_identity_from_request, @@ -133,7 +145,7 @@ pub fn validate_auth_context( } if let Some(expected) = &cfg.required_token { - if expected.len() == token.len() && bool::from(expected.ct_eq(&token)) { + if expected.len() == token.len() && bool::from(expected.as_slice().ct_eq(&token)) { return Ok(AuthContext { token, identity_key: None, @@ -216,6 +228,30 @@ pub fn waiter(waiters: &DashMap, Arc>, recipient_key: &[u8]) -> .clone() } +pub const CONN_RATE_LIMIT_WINDOW_SECS: u64 = 60; +pub const CONN_RATE_LIMIT_MAX: u32 = 50; + +/// Per-IP connection rate limiter. Returns `true` if the connection is allowed. +pub fn check_conn_rate_limit( + conn_rate_limits: &DashMap, + ip: IpAddr, +) -> bool { + let now = current_timestamp(); + let mut entry = conn_rate_limits.entry(ip).or_insert(RateEntry { + count: 0, + window_start: now, + }); + + if now - entry.window_start >= CONN_RATE_LIMIT_WINDOW_SECS { + entry.count = 1; + entry.window_start = now; + true + } else { + entry.count += 1; + entry.count <= CONN_RATE_LIMIT_MAX + } +} + pub fn fingerprint(data: &[u8]) -> Vec { sha2::Sha256::digest(data).to_vec() } diff --git a/crates/quicnprotochat-server/src/config.rs b/crates/quicnprotochat-server/src/config.rs index b4feaa7..999eb52 100644 --- a/crates/quicnprotochat-server/src/config.rs +++ b/crates/quicnprotochat-server/src/config.rs @@ -163,6 +163,9 @@ pub fn merge_config(args: &crate::Args, file: &FileConfig) -> EffectiveConfig { } pub fn validate_production_config(effective: &EffectiveConfig) -> anyhow::Result<()> { + if effective.allow_insecure_auth { + anyhow::bail!("production forbids --allow-insecure-auth"); + } let token = effective .auth_token .as_deref() @@ -178,6 +181,12 @@ pub fn validate_production_config(effective: &EffectiveConfig) -> anyhow::Result if effective.store_backend == "sql" && effective.db_key.is_empty() { anyhow::bail!("production with store_backend=sql requires non-empty QUICNPROTOCHAT_DB_KEY"); } + if effective.store_backend != "sql" { + tracing::warn!( + "production is using file-backed storage; \ + consider store_backend=sql with QUICNPROTOCHAT_DB_KEY for encryption at rest" + ); + } if !effective.tls_cert.exists() || !effective.tls_key.exists() { anyhow::bail!( "production requires existing TLS cert and key (no auto-generation); provide QUICNPROTOCHAT_TLS_CERT and QUICNPROTOCHAT_TLS_KEY" diff --git a/crates/quicnprotochat-server/src/main.rs b/crates/quicnprotochat-server/src/main.rs index 2c8545a..7336f40 100644 --- a/crates/quicnprotochat-server/src/main.rs +++ b/crates/quicnprotochat-server/src/main.rs @@ -2,7 +2,7 @@ //! //! The server hosts Authentication + Delivery services over QUIC + Cap'n Proto. -use std::{net::SocketAddr, path::PathBuf, sync::Arc}; +use std::{net::IpAddr, net::SocketAddr, path::PathBuf, sync::Arc}; use anyhow::Context; use clap::Parser; @@ -167,7 +167,11 @@ async fn main() -> anyhow::Result<()> { // Harden QUIC transport: idle timeout, limit stream concurrency. let mut transport = quinn::TransportConfig::default(); - transport.max_idle_timeout(Some(std::time::Duration::from_secs(300).try_into().unwrap())); + transport.max_idle_timeout(Some( + std::time::Duration::from_secs(300) + .try_into() + .expect("300s is a valid IdleTimeout"), + )); transport.max_concurrent_bidi_streams(1u32.into()); transport.max_concurrent_uni_streams(0u32.into()); server_config.transport_config(Arc::new(transport)); @@ -223,12 +227,14 @@ async fn main() -> anyhow::Result<()> { let pending_logins: Arc> = Arc::new(DashMap::new()); let sessions: Arc, SessionInfo>> = Arc::new(DashMap::new()); let rate_limits: Arc, RateEntry>> = Arc::new(DashMap::new()); + let conn_rate_limits: Arc> = Arc::new(DashMap::new()); // Background cleanup task (expire sessions, pending logins, rate limits, and stale messages). spawn_cleanup_task( Arc::clone(&sessions), Arc::clone(&pending_logins), Arc::clone(&rate_limits), + Arc::clone(&conn_rate_limits), Arc::clone(&store), Arc::clone(&waiters), ); @@ -254,6 +260,14 @@ async fn main() -> anyhow::Result<()> { None => break, }; + // Per-IP connection rate limiting. + let remote_ip = incoming.remote_address().ip(); + if !auth::check_conn_rate_limit(&conn_rate_limits, remote_ip) { + tracing::warn!(ip = %remote_ip, "connection rate limit exceeded, dropping"); + incoming.refuse(); + continue; + } + let connecting = match incoming.accept() { Ok(c) => c, Err(e) => { @@ -298,6 +312,10 @@ async fn main() -> anyhow::Result<()> { } } + // Grace period: let in-flight RPC tasks on the LocalSet finish. + tracing::info!("waiting up to 5s for in-flight RPCs to complete"); + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + Ok::<(), anyhow::Error>(()) }) .await?; diff --git a/crates/quicnprotochat-server/src/node_service/delivery.rs b/crates/quicnprotochat-server/src/node_service/delivery.rs index 040fa9c..4d17dbb 100644 --- a/crates/quicnprotochat-server/src/node_service/delivery.rs +++ b/crates/quicnprotochat-server/src/node_service/delivery.rs @@ -92,12 +92,10 @@ impl NodeServiceImpl { } // When sealed_sender is true, enqueue does not require identity; valid token only. + // Otherwise, the sender must have an identity-bound session (but their identity + // does NOT need to match the recipient — they're sending TO the recipient). if !self.sealed_sender { - if let Err(e) = require_identity_or_request( - &auth_ctx, - &recipient_key, - self.auth_cfg.allow_insecure_identity_from_request, - ) { + if let Err(e) = crate::auth::require_identity(&auth_ctx) { return Promise::err(e); } } @@ -563,6 +561,36 @@ impl NodeServiceImpl { return Promise::err(e); } + // When sealed_sender is false, require an identity-bound session. + if !self.sealed_sender { + if let Err(e) = crate::auth::require_identity(&auth_ctx) { + return Promise::err(e); + } + } + + // DM channel authz: validate caller membership once before the loop. + if channel_id.len() == 16 { + let members = match self.store.get_channel_members(&channel_id) { + Ok(Some(m)) => m, + Ok(None) => { + return Promise::err(coded_error(E023_CHANNEL_NOT_FOUND, "channel not found")); + } + Err(e) => return Promise::err(storage_err(e)), + }; + let caller = match crate::auth::require_identity(&auth_ctx) { + Ok(id) => id, + Err(e) => return Promise::err(e), + }; + let (a, b) = &members; + let caller_in = caller == a.as_slice() || caller == b.as_slice(); + if !caller_in { + return Promise::err(coded_error( + E022_CHANNEL_ACCESS_DENIED, + "caller is not a member of this channel", + )); + } + } + let mut seqs = Vec::with_capacity(recipient_keys.len() as usize); for i in 0..recipient_keys.len() { let rk = match recipient_keys.get(i) { @@ -576,6 +604,33 @@ impl NodeServiceImpl { )); } + // Per-recipient DM channel membership check. + if channel_id.len() == 16 { + let members = match self.store.get_channel_members(&channel_id) { + Ok(Some(m)) => m, + Ok(None) => { + return Promise::err(coded_error( + E023_CHANNEL_NOT_FOUND, + "channel not found", + )); + } + Err(e) => return Promise::err(storage_err(e)), + }; + let caller = match crate::auth::require_identity(&auth_ctx) { + Ok(id) => id, + Err(e) => return Promise::err(e), + }; + let (a, b) = &members; + let recipient_other = (rk == *a && caller == b.as_slice()) + || (rk == *b && caller == a.as_slice()); + if !recipient_other { + return Promise::err(coded_error( + E022_CHANNEL_ACCESS_DENIED, + "recipient is not a member of this channel", + )); + } + } + match self.store.queue_depth(&rk, &channel_id) { Ok(depth) if depth >= MAX_QUEUE_DEPTH => { return Promise::err(coded_error( diff --git a/crates/quicnprotochat-server/src/node_service/key_ops.rs b/crates/quicnprotochat-server/src/node_service/key_ops.rs index d024d97..8ff65c5 100644 --- a/crates/quicnprotochat-server/src/node_service/key_ops.rs +++ b/crates/quicnprotochat-server/src/node_service/key_ops.rs @@ -214,10 +214,10 @@ impl NodeServiceImpl { Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)), }; - let auth_ctx = match validate_auth_context(&self.auth_cfg, &self.sessions, p.get_auth()) { - Ok(ctx) => ctx, - Err(e) => return Promise::err(e), - }; + // Auth check only — any authenticated user can fetch any peer's hybrid public key. + if let Err(e) = validate_auth_context(&self.auth_cfg, &self.sessions, p.get_auth()) { + return Promise::err(e); + } if identity_key.len() != 32 { return Promise::err(coded_error( @@ -226,14 +226,6 @@ impl NodeServiceImpl { )); } - if let Err(e) = require_identity_or_request( - &auth_ctx, - &identity_key, - self.auth_cfg.allow_insecure_identity_from_request, - ) { - return Promise::err(e); - } - let hybrid_pk = match self .store .fetch_hybrid_key(&identity_key) diff --git a/crates/quicnprotochat-server/src/node_service/mod.rs b/crates/quicnprotochat-server/src/node_service/mod.rs index 8f36421..3b83e7d 100644 --- a/crates/quicnprotochat-server/src/node_service/mod.rs +++ b/crates/quicnprotochat-server/src/node_service/mod.rs @@ -23,6 +23,7 @@ mod channel_ops; mod delivery; mod key_ops; mod p2p_ops; +mod user_ops; impl node_service::Server for NodeServiceImpl { fn upload_key_package( @@ -176,6 +177,22 @@ impl node_service::Server for NodeServiceImpl { ) -> capnp::capability::Promise<(), capnp::Error> { self.handle_create_channel(params, results) } + + fn resolve_user( + &mut self, + params: node_service::ResolveUserParams, + results: node_service::ResolveUserResults, + ) -> capnp::capability::Promise<(), capnp::Error> { + self.handle_resolve_user(params, results) + } + + fn resolve_identity( + &mut self, + params: node_service::ResolveIdentityParams, + results: node_service::ResolveIdentityResults, + ) -> capnp::capability::Promise<(), capnp::Error> { + self.handle_resolve_identity(params, results) + } } pub const CURRENT_WIRE_VERSION: u16 = 1; @@ -268,6 +285,7 @@ pub fn spawn_cleanup_task( sessions: Arc, SessionInfo>>, pending_logins: Arc>, rate_limits: Arc, RateEntry>>, + conn_rate_limits: Arc>, store: Arc, waiters: Arc, Arc>>, ) { @@ -280,6 +298,9 @@ pub fn spawn_cleanup_task( sessions.retain(|_, info| info.expires_at > now); pending_logins.retain(|_, pl| now - pl.created_at < PENDING_LOGIN_TTL_SECS); rate_limits.retain(|_, entry| now - entry.window_start < RATE_LIMIT_WINDOW_SECS * 2); + conn_rate_limits.retain(|_, entry| { + now - entry.window_start < crate::auth::CONN_RATE_LIMIT_WINDOW_SECS * 2 + }); // Bound map sizes to prevent unbounded growth from malicious clients. const MAX_SESSIONS: usize = 100_000; diff --git a/crates/quicnprotochat-server/src/node_service/user_ops.rs b/crates/quicnprotochat-server/src/node_service/user_ops.rs new file mode 100644 index 0000000..256109e --- /dev/null +++ b/crates/quicnprotochat-server/src/node_service/user_ops.rs @@ -0,0 +1,94 @@ +//! resolveUser / resolveIdentity RPCs: bidirectional username ↔ identity key lookup. + +use capnp::capability::Promise; +use quicnprotochat_proto::node_capnp::node_service; + +use crate::auth::{coded_error, validate_auth_context}; +use crate::error_codes::*; +use crate::storage::StorageError; + +use super::NodeServiceImpl; + +fn storage_err(err: StorageError) -> capnp::Error { + coded_error(E009_STORAGE_ERROR, err) +} + +impl NodeServiceImpl { + pub fn handle_resolve_user( + &mut self, + params: node_service::ResolveUserParams, + mut results: node_service::ResolveUserResults, + ) -> Promise<(), capnp::Error> { + let p = match params.get() { + Ok(p) => p, + Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)), + }; + let username = match p.get_username() { + Ok(u) => u, + Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)), + }; + let _auth_ctx = match validate_auth_context(&self.auth_cfg, &self.sessions, p.get_auth()) { + Ok(ctx) => ctx, + Err(e) => return Promise::err(e), + }; + + let username_str = match username.to_str() { + Ok(s) => s, + Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)), + }; + + if username_str.is_empty() { + return Promise::err(coded_error(E020_BAD_PARAMS, "username must not be empty")); + } + + match self.store.get_user_identity_key(username_str) { + Ok(Some(key)) => { + results.get().set_identity_key(&key); + } + Ok(None) => { + // Return empty Data — caller checks length to detect "not found". + } + Err(e) => return Promise::err(storage_err(e)), + } + + Promise::ok(()) + } + + pub fn handle_resolve_identity( + &mut self, + params: node_service::ResolveIdentityParams, + mut results: node_service::ResolveIdentityResults, + ) -> Promise<(), capnp::Error> { + let p = match params.get() { + Ok(p) => p, + Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)), + }; + let identity_key = match p.get_identity_key() { + Ok(v) => v, + Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)), + }; + let _auth_ctx = match validate_auth_context(&self.auth_cfg, &self.sessions, p.get_auth()) { + Ok(ctx) => ctx, + Err(e) => return Promise::err(e), + }; + + if identity_key.len() != 32 { + return Promise::err(coded_error( + E004_IDENTITY_KEY_LENGTH, + format!("identityKey must be exactly 32 bytes, got {}", identity_key.len()), + )); + } + + match self.store.resolve_identity_key(identity_key) { + Ok(Some(username)) => { + results.get().set_username(&username); + } + Ok(None) => { + // Return empty string — caller checks length to detect "not found". + } + Err(e) => return Promise::err(storage_err(e)), + } + + Promise::ok(()) + } +} diff --git a/crates/quicnprotochat-server/src/sql_store.rs b/crates/quicnprotochat-server/src/sql_store.rs index 8a62155..8722b33 100644 --- a/crates/quicnprotochat-server/src/sql_store.rs +++ b/crates/quicnprotochat-server/src/sql_store.rs @@ -369,6 +369,17 @@ impl Store for SqlStore { .map_err(|e| StorageError::Db(e.to_string())) } + fn resolve_identity_key(&self, identity_key: &[u8]) -> Result, StorageError> { + let conn = self.lock_conn()?; + let mut stmt = conn + .prepare("SELECT username FROM user_identity_keys WHERE identity_key = ?1") + .map_err(|e| StorageError::Db(e.to_string()))?; + + stmt.query_row(params![identity_key], |row| row.get(0)) + .optional() + .map_err(|e| StorageError::Db(e.to_string())) + } + fn peek( &self, recipient_key: &[u8], diff --git a/crates/quicnprotochat-server/src/storage.rs b/crates/quicnprotochat-server/src/storage.rs index f70c99a..3784e85 100644 --- a/crates/quicnprotochat-server/src/storage.rs +++ b/crates/quicnprotochat-server/src/storage.rs @@ -100,6 +100,9 @@ pub trait Store: Send + Sync { /// Retrieve identity key for a user (Fix 2). fn get_user_identity_key(&self, username: &str) -> Result>, StorageError>; + /// Reverse lookup: resolve an identity key to the registered username. + fn resolve_identity_key(&self, identity_key: &[u8]) -> Result, StorageError>; + /// Peek at queued messages without removing them (non-destructive). /// Returns `(seq, payload)` pairs ordered by seq. fn peek( @@ -546,6 +549,16 @@ impl Store for FileBackedStore { Ok(map.get(username).cloned()) } + fn resolve_identity_key(&self, identity_key: &[u8]) -> Result, StorageError> { + let map = lock(&self.identity_keys)?; + for (username, ik) in map.iter() { + if ik.as_slice() == identity_key { + return Ok(Some(username.clone())); + } + } + Ok(None) + } + fn peek( &self, recipient_key: &[u8], diff --git a/schemas/node.capnp b/schemas/node.capnp index 54645f3..7bdbea7 100644 --- a/schemas/node.capnp +++ b/schemas/node.capnp @@ -83,6 +83,14 @@ interface NodeService { # Create a 1:1 channel between the caller and the given peer. Returns a 16-byte channelId (UUID). # Both members can enqueue/fetch for this channel; recipientKey must be the other member. createChannel @18 (peerKey :Data, auth :Auth) -> (channelId :Data); + + # Resolve a username to its Ed25519 identity key (32 bytes). + # Returns empty Data if the username is not registered. + resolveUser @19 (username :Text, auth :Auth) -> (identityKey :Data); + + # Reverse lookup: resolve an Ed25519 identity key to the registered username. + # Returns empty Text if the identity key is not associated with any user. + resolveIdentity @20 (identityKey :Data, auth :Auth) -> (username :Text); } struct Auth {