feat: interactive REPL with auto-setup, auto-join, encrypted local storage

REPL auto-setup (zero-friction startup):
- OnceLock → RwLock for CLIENT_AUTH to allow delayed init after OPAQUE login
- Extract opaque_register/opaque_login helpers from one-shot commands
- Token cache (.session file) with QPCE encryption when password provided
- Add --username/--password/--state-password to repl subcommand
- resolve_access_token: auto-register + login, cache token, prompt interactively
- rpassword for secure password input (no echo)

Interactive REPL (multi-conversation):
- SessionState: identity, hybrid key, ConversationStore, per-conversation GroupMembers
- ConversationStore: SQLite-backed conversations + messages with full CRUD
- Slash commands: /dm, /group, /invite, /join, /switch, /list, /members, /history, /whoami
- Background polling (1s interval) with auto-join from MLS Welcome messages
- pending_member pattern: persistent keystore for HPKE init key, replenish after join
- Self-DM handled as local-only notepad (no MLS/server channel)
- ANSI display module for colored prompts, incoming messages, status/error output

Username resolution:
- resolveIdentity RPC (@20 in node.capnp): look up username by identity key
- Server: resolve_identity_key in Store trait, FileBackedStore, SqlStore
- Client: resolve_identity in rpc.rs, used in auto-join for peer display names
- resolveUser: bidirectional lookup (username → identity key)

Encrypted local storage (nothing in cleartext):
- ConversationStore uses SQLCipher when --state-password is provided
- Argon2id key derivation with per-database random salt (.convdb-salt, mode 0600)
- Transparent migration of existing unencrypted databases via sqlcipher_export
- Token cache encrypted with QPCE format (Argon2id + ChaCha20Poly1305)

Server changes:
- resolveIdentity + resolveUser RPC handlers with auth + validation
- Auth: sealed-sender identity binding on enqueue, channel member authorization
- Delivery: hybrid decrypt attempts, identity key validation on enqueue
- Config: --allow-sealed-sender flag for anonymous delivery mode
- zeroize added to server dependencies

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-26 22:45:34 +01:00
parent 4c1e4683e3
commit 553de3a2b7
23 changed files with 2791 additions and 33 deletions

View File

@@ -50,9 +50,15 @@ tracing-subscriber = { workspace = true }
# CLI
clap = { workspace = true }
# Local message/conversation storage
rusqlite = { workspace = true }
# Hex encoding/decoding
hex = "0.4"
# Secure password prompting (no echo)
rpassword = "5"
[dev-dependencies]
dashmap = { workspace = true }
assert_cmd = "2"

View File

@@ -310,6 +310,129 @@ fn derive_identity_for_login(
))
}
// ── OPAQUE helpers (used by both one-shot commands and REPL bootstrap) ───────
/// Perform OPAQUE registration. Returns Ok(()) on success.
/// The error message contains "E018" if the user already exists.
/// Does NOT require init_auth() — OPAQUE RPCs are unauthenticated.
pub(crate) async fn opaque_register(
client: &quicnprotochat_proto::node_capnp::node_service::Client,
username: &str,
password: &str,
identity_key: Option<&[u8]>,
) -> anyhow::Result<()> {
let mut rng = rand::rngs::OsRng;
let reg_start = ClientRegistration::<OpaqueSuite>::start(&mut rng, password.as_bytes())
.map_err(|e| anyhow::anyhow!("OPAQUE register start: {e}"))?;
let mut req = client.opaque_register_start_request();
{
let mut p = req.get();
p.set_username(username);
p.set_request(&reg_start.message.serialize());
}
let resp = req.send().promise.await.context("opaque_register_start RPC failed")?;
let response_bytes = resp
.get()
.context("register_start: bad response")?
.get_response()
.context("register_start: missing response")?
.to_vec();
let reg_response = RegistrationResponse::<OpaqueSuite>::deserialize(&response_bytes)
.map_err(|e| anyhow::anyhow!("invalid registration response: {e}"))?;
let reg_finish = reg_start
.state
.finish(
&mut rng,
password.as_bytes(),
reg_response,
ClientRegistrationFinishParameters::<OpaqueSuite>::default(),
)
.map_err(|e| anyhow::anyhow!("OPAQUE register finish: {e}"))?;
let mut req = client.opaque_register_finish_request();
{
let mut p = req.get();
p.set_username(username);
p.set_upload(&reg_finish.message.serialize());
if let Some(ik) = identity_key {
p.set_identity_key(ik);
} else {
p.set_identity_key(&[]);
}
}
let resp = req.send().promise.await.context("opaque_register_finish RPC failed")?;
let success = resp
.get()
.context("register_finish: bad response")?
.get_success();
anyhow::ensure!(success, "server rejected registration");
Ok(())
}
/// Perform OPAQUE login and return the raw session token bytes.
/// Does NOT require init_auth() — OPAQUE RPCs are unauthenticated.
pub(crate) async fn opaque_login(
client: &quicnprotochat_proto::node_capnp::node_service::Client,
username: &str,
password: &str,
identity_key: &[u8],
) -> anyhow::Result<Vec<u8>> {
let mut rng = rand::rngs::OsRng;
let login_start = ClientLogin::<OpaqueSuite>::start(&mut rng, password.as_bytes())
.map_err(|e| anyhow::anyhow!("OPAQUE login start: {e}"))?;
let mut req = client.opaque_login_start_request();
{
let mut p = req.get();
p.set_username(username);
p.set_request(&login_start.message.serialize());
}
let resp = req.send().promise.await.context("opaque_login_start RPC failed")?;
let response_bytes = resp
.get()
.context("login_start: bad response")?
.get_response()
.context("login_start: missing response")?
.to_vec();
let credential_response = CredentialResponse::<OpaqueSuite>::deserialize(&response_bytes)
.map_err(|e| anyhow::anyhow!("invalid credential response: {e}"))?;
let login_finish = login_start
.state
.finish(
&mut rng,
password.as_bytes(),
credential_response,
ClientLoginFinishParameters::<OpaqueSuite>::default(),
)
.map_err(|e| anyhow::anyhow!("OPAQUE login finish (bad password?): {e}"))?;
let mut req = client.opaque_login_finish_request();
{
let mut p = req.get();
p.set_username(username);
p.set_finalization(&login_finish.message.serialize());
p.set_identity_key(identity_key);
}
let resp = req.send().promise.await.context("opaque_login_finish RPC failed")?;
let session_token = resp
.get()
.context("login_finish: bad response")?
.get_session_token()
.context("login_finish: missing session_token")?
.to_vec();
anyhow::ensure!(!session_token.is_empty(), "server returned empty session token");
Ok(session_token)
}
/// Generate a KeyPackage for a fresh identity and upload it to the AS.
pub async fn cmd_register(server: &str, ca_cert: &Path, server_name: &str) -> anyhow::Result<()> {
let identity = IdentityKeypair::generate();

View File

@@ -0,0 +1,563 @@
//! Multi-conversation state backed by SQLite (SQLCipher-encrypted when a
//! password is provided).
//!
//! Each conversation (DM or group) has its own MLS group blob, keystore blob,
//! member list, and message history.
use std::path::{Path, PathBuf};
use std::time::{SystemTime, UNIX_EPOCH};
use anyhow::Context;
use argon2::{Algorithm, Argon2, Params, Version};
use rand::RngCore;
use rusqlite::{params, Connection, OptionalExtension};
use zeroize::Zeroizing;
// ── Types ────────────────────────────────────────────────────────────────────
/// 16-byte conversation identifier.
/// - DMs: the channel_id returned by `createChannel` (server-assigned UUID).
/// - Groups: SHA-256(group_name)[..16].
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct ConversationId(pub [u8; 16]);
impl ConversationId {
pub fn from_slice(s: &[u8]) -> Option<Self> {
if s.len() == 16 {
let mut buf = [0u8; 16];
buf.copy_from_slice(s);
Some(Self(buf))
} else {
None
}
}
/// Derive a conversation ID from a group name via SHA-256 truncation.
pub fn from_group_name(name: &str) -> Self {
use sha2::{Sha256, Digest};
let hash = Sha256::digest(name.as_bytes());
let mut buf = [0u8; 16];
buf.copy_from_slice(&hash[..16]);
Self(buf)
}
pub fn hex(&self) -> String {
hex::encode(self.0)
}
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ConversationKind {
/// 1:1 DM channel with a specific peer.
Dm {
peer_key: Vec<u8>,
peer_username: Option<String>,
},
/// Named group with N members.
Group { name: String },
}
#[derive(Clone, Debug)]
pub struct Conversation {
pub id: ConversationId,
pub kind: ConversationKind,
pub display_name: String,
/// Serialized MLS group (bincode).
pub mls_group_blob: Option<Vec<u8>>,
/// Serialized keystore (bincode HashMap).
pub keystore_blob: Option<Vec<u8>>,
/// Member identity keys (bincode Vec<Vec<u8>>).
pub member_keys: Vec<Vec<u8>>,
pub unread_count: u32,
pub last_activity_ms: u64,
pub created_at_ms: u64,
}
#[derive(Clone, Debug)]
pub struct StoredMessage {
pub conversation_id: ConversationId,
pub message_id: Option<[u8; 16]>,
pub sender_key: Vec<u8>,
pub sender_name: Option<String>,
pub body: String,
pub msg_type: String,
pub ref_msg_id: Option<[u8; 16]>,
pub timestamp_ms: u64,
pub is_outgoing: bool,
}
// ── Key derivation (Argon2id, matching state.rs parameters) ─────────────────
const ARGON2_M_COST: u32 = 19 * 1024;
const ARGON2_T_COST: u32 = 2;
const ARGON2_P_COST: u32 = 1;
const SALT_LEN: usize = 16;
/// Derive a 32-byte SQLCipher key from the user password and a random salt.
fn derive_convdb_key(password: &str, salt: &[u8]) -> anyhow::Result<Zeroizing<[u8; 32]>> {
let params = Params::new(ARGON2_M_COST, ARGON2_T_COST, ARGON2_P_COST, Some(32))
.map_err(|e| anyhow::anyhow!("argon2 params: {e}"))?;
let argon2 = Argon2::new(Algorithm::Argon2id, Version::default(), params);
let mut key = Zeroizing::new([0u8; 32]);
argon2
.hash_password_into(password.as_bytes(), salt, &mut *key)
.map_err(|e| anyhow::anyhow!("convdb key derivation: {e}"))?;
Ok(key)
}
/// Read or create a 16-byte random salt at `salt_path` (mode 0o600).
fn get_or_create_salt(salt_path: &Path) -> anyhow::Result<Vec<u8>> {
if salt_path.exists() {
let bytes = std::fs::read(salt_path).context("read convdb salt")?;
anyhow::ensure!(bytes.len() == SALT_LEN, "invalid convdb salt length");
return Ok(bytes);
}
let mut salt = vec![0u8; SALT_LEN];
rand::rngs::OsRng.fill_bytes(&mut salt);
std::fs::write(salt_path, &salt).context("write convdb salt")?;
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
std::fs::set_permissions(salt_path, std::fs::Permissions::from_mode(0o600)).ok();
}
Ok(salt)
}
// ── ConversationStore ────────────────────────────────────────────────────────
pub struct ConversationStore {
conn: Connection,
}
impl ConversationStore {
/// Open or create the conversation database at `db_path`.
/// If `password` is `Some`, the database is encrypted with SQLCipher using
/// an Argon2id-derived key. Existing unencrypted databases are migrated
/// transparently.
pub fn open(db_path: &Path, password: Option<&str>) -> anyhow::Result<Self> {
if let Some(parent) = db_path.parent() {
std::fs::create_dir_all(parent).ok();
}
match password {
Some(pw) => Self::open_encrypted(db_path, pw),
None => Self::open_plain(db_path),
}
}
fn open_plain(db_path: &Path) -> anyhow::Result<Self> {
let conn = Connection::open(db_path).context("open conversation db")?;
conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA foreign_keys=ON;")
.context("set pragmas")?;
Self::migrate(&conn)?;
Ok(Self { conn })
}
fn open_encrypted(db_path: &Path, password: &str) -> anyhow::Result<Self> {
let salt_path = PathBuf::from(format!("{}-salt", db_path.display()));
let already_encrypted = salt_path.exists();
// Migrate an existing unencrypted database before opening with encryption.
if db_path.exists() && !already_encrypted {
Self::migrate_plain_to_encrypted(db_path, &salt_path, password)?;
// After migration, salt file exists and DB is encrypted — fall through.
}
let salt = get_or_create_salt(&salt_path)?;
let key = derive_convdb_key(password, &salt)?;
let hex_key = hex::encode(&*key);
let conn = Connection::open(db_path).context("open conversation db")?;
conn.pragma_update(None, "key", format!("x'{hex_key}'"))
.context("set SQLCipher key")?;
conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA foreign_keys=ON;")
.context("set pragmas")?;
Self::migrate(&conn)?;
Ok(Self { conn })
}
/// Migrate an unencrypted `.convdb` to an encrypted one in-place.
fn migrate_plain_to_encrypted(
db_path: &Path,
salt_path: &Path,
password: &str,
) -> anyhow::Result<()> {
let salt = get_or_create_salt(salt_path)?;
let key = derive_convdb_key(password, &salt)?;
let hex_key = hex::encode(&*key);
let enc_path = db_path.with_extension("convdb-enc");
// Open the existing plaintext database.
let plain = Connection::open(db_path).context("open plain convdb for migration")?;
plain.execute_batch("PRAGMA journal_mode=WAL; PRAGMA foreign_keys=ON;").ok();
// Attach a new encrypted database and export into it.
plain
.execute_batch(&format!(
"ATTACH DATABASE '{}' AS encrypted KEY \"x'{hex_key}'\";",
enc_path.display()
))
.context("attach encrypted db for migration")?;
plain
.execute_batch("SELECT sqlcipher_export('encrypted');")
.context("sqlcipher_export to encrypted db")?;
plain
.execute_batch("DETACH DATABASE encrypted;")
.context("detach encrypted db")?;
drop(plain);
// Swap files: encrypted → original.
std::fs::rename(&enc_path, db_path).context("replace convdb with encrypted version")?;
// Clean up WAL/SHM left from the plaintext open.
let wal = PathBuf::from(format!("{}-wal", db_path.display()));
let shm = PathBuf::from(format!("{}-shm", db_path.display()));
std::fs::remove_file(&wal).ok();
std::fs::remove_file(&shm).ok();
tracing::info!("migrated conversation database to encrypted storage");
Ok(())
}
fn migrate(conn: &Connection) -> anyhow::Result<()> {
conn.execute_batch(
"CREATE TABLE IF NOT EXISTS conversations (
id BLOB PRIMARY KEY,
kind TEXT NOT NULL,
display_name TEXT NOT NULL,
peer_key BLOB,
peer_username TEXT,
group_name TEXT,
mls_group_blob BLOB,
keystore_blob BLOB,
member_keys BLOB,
unread_count INTEGER NOT NULL DEFAULT 0,
last_activity_ms INTEGER NOT NULL DEFAULT 0,
created_at_ms INTEGER NOT NULL DEFAULT 0
);
CREATE TABLE IF NOT EXISTS messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
conversation_id BLOB NOT NULL REFERENCES conversations(id),
message_id BLOB,
sender_key BLOB NOT NULL,
sender_name TEXT,
body TEXT NOT NULL,
msg_type TEXT NOT NULL,
ref_msg_id BLOB,
timestamp_ms INTEGER NOT NULL,
is_outgoing INTEGER NOT NULL DEFAULT 0
);
CREATE INDEX IF NOT EXISTS idx_messages_conv
ON messages(conversation_id, timestamp_ms);",
)
.context("migrate conversation db")?;
Ok(())
}
// ── Conversation CRUD ────────────────────────────────────────────────
pub fn save_conversation(&self, conv: &Conversation) -> anyhow::Result<()> {
let (kind_str, peer_key, peer_username, group_name) = match &conv.kind {
ConversationKind::Dm {
peer_key,
peer_username,
} => ("dm", Some(peer_key.as_slice()), peer_username.as_deref(), None),
ConversationKind::Group { name } => ("group", None, None, Some(name.as_str())),
};
let member_keys_blob = bincode::serialize(&conv.member_keys)
.context("serialize member_keys")?;
self.conn.execute(
"INSERT INTO conversations
(id, kind, display_name, peer_key, peer_username, group_name,
mls_group_blob, keystore_blob, member_keys, unread_count,
last_activity_ms, created_at_ms)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)
ON CONFLICT(id) DO UPDATE SET
display_name = excluded.display_name,
mls_group_blob = excluded.mls_group_blob,
keystore_blob = excluded.keystore_blob,
member_keys = excluded.member_keys,
unread_count = excluded.unread_count,
last_activity_ms = excluded.last_activity_ms",
params![
conv.id.0.as_slice(),
kind_str,
conv.display_name,
peer_key,
peer_username,
group_name,
conv.mls_group_blob,
conv.keystore_blob,
member_keys_blob,
conv.unread_count,
conv.last_activity_ms,
conv.created_at_ms,
],
)?;
Ok(())
}
pub fn load_conversation(&self, id: &ConversationId) -> anyhow::Result<Option<Conversation>> {
self.conn
.query_row(
"SELECT kind, display_name, peer_key, peer_username, group_name,
mls_group_blob, keystore_blob, member_keys, unread_count,
last_activity_ms, created_at_ms
FROM conversations WHERE id = ?1",
params![id.0.as_slice()],
|row| {
let kind_str: String = row.get(0)?;
let display_name: String = row.get(1)?;
let peer_key: Option<Vec<u8>> = row.get(2)?;
let peer_username: Option<String> = row.get(3)?;
let group_name: Option<String> = row.get(4)?;
let mls_group_blob: Option<Vec<u8>> = row.get(5)?;
let keystore_blob: Option<Vec<u8>> = row.get(6)?;
let member_keys_blob: Option<Vec<u8>> = row.get(7)?;
let unread_count: u32 = row.get(8)?;
let last_activity_ms: u64 = row.get(9)?;
let created_at_ms: u64 = row.get(10)?;
let kind = if kind_str == "dm" {
ConversationKind::Dm {
peer_key: peer_key.unwrap_or_default(),
peer_username,
}
} else {
ConversationKind::Group {
name: group_name.unwrap_or_default(),
}
};
let member_keys: Vec<Vec<u8>> = member_keys_blob
.and_then(|b| bincode::deserialize(&b).ok())
.unwrap_or_default();
Ok(Conversation {
id: id.clone(),
kind,
display_name,
mls_group_blob,
keystore_blob,
member_keys,
unread_count,
last_activity_ms,
created_at_ms,
})
},
)
.optional()
.context("load conversation")
}
pub fn list_conversations(&self) -> anyhow::Result<Vec<Conversation>> {
let mut stmt = self.conn.prepare(
"SELECT id, kind, display_name, peer_key, peer_username, group_name,
mls_group_blob, keystore_blob, member_keys, unread_count,
last_activity_ms, created_at_ms
FROM conversations ORDER BY last_activity_ms DESC",
)?;
let rows = stmt.query_map([], |row| {
let id_blob: Vec<u8> = row.get(0)?;
let kind_str: String = row.get(1)?;
let display_name: String = row.get(2)?;
let peer_key: Option<Vec<u8>> = row.get(3)?;
let peer_username: Option<String> = row.get(4)?;
let group_name: Option<String> = row.get(5)?;
let mls_group_blob: Option<Vec<u8>> = row.get(6)?;
let keystore_blob: Option<Vec<u8>> = row.get(7)?;
let member_keys_blob: Option<Vec<u8>> = row.get(8)?;
let unread_count: u32 = row.get(9)?;
let last_activity_ms: u64 = row.get(10)?;
let created_at_ms: u64 = row.get(11)?;
let id = ConversationId::from_slice(&id_blob).unwrap_or(ConversationId([0; 16]));
let kind = if kind_str == "dm" {
ConversationKind::Dm {
peer_key: peer_key.unwrap_or_default(),
peer_username,
}
} else {
ConversationKind::Group {
name: group_name.unwrap_or_default(),
}
};
let member_keys: Vec<Vec<u8>> = member_keys_blob
.and_then(|b| bincode::deserialize(&b).ok())
.unwrap_or_default();
Ok(Conversation {
id,
kind,
display_name,
mls_group_blob,
keystore_blob,
member_keys,
unread_count,
last_activity_ms,
created_at_ms,
})
})?;
let mut convs = Vec::new();
for row in rows {
convs.push(row?);
}
Ok(convs)
}
/// Find a DM conversation by the peer's identity key.
pub fn find_dm_by_peer(&self, peer_key: &[u8]) -> anyhow::Result<Option<Conversation>> {
let id_blob: Option<Vec<u8>> = self
.conn
.query_row(
"SELECT id FROM conversations WHERE kind = 'dm' AND peer_key = ?1",
params![peer_key],
|row| row.get(0),
)
.optional()?;
match id_blob {
Some(blob) => {
let id = ConversationId::from_slice(&blob)
.context("invalid conversation id in db")?;
self.load_conversation(&id)
}
None => Ok(None),
}
}
/// Find a group conversation by name.
pub fn find_group_by_name(&self, name: &str) -> anyhow::Result<Option<Conversation>> {
let id_blob: Option<Vec<u8>> = self
.conn
.query_row(
"SELECT id FROM conversations WHERE kind = 'group' AND group_name = ?1",
params![name],
|row| row.get(0),
)
.optional()?;
match id_blob {
Some(blob) => {
let id = ConversationId::from_slice(&blob)
.context("invalid conversation id in db")?;
self.load_conversation(&id)
}
None => Ok(None),
}
}
pub fn increment_unread(&self, id: &ConversationId) -> anyhow::Result<()> {
self.conn.execute(
"UPDATE conversations SET unread_count = unread_count + 1 WHERE id = ?1",
params![id.0.as_slice()],
)?;
Ok(())
}
pub fn reset_unread(&self, id: &ConversationId) -> anyhow::Result<()> {
self.conn.execute(
"UPDATE conversations SET unread_count = 0 WHERE id = ?1",
params![id.0.as_slice()],
)?;
Ok(())
}
pub fn update_activity(&self, id: &ConversationId, ts_ms: u64) -> anyhow::Result<()> {
self.conn.execute(
"UPDATE conversations SET last_activity_ms = ?2 WHERE id = ?1 AND last_activity_ms < ?2",
params![id.0.as_slice(), ts_ms],
)?;
Ok(())
}
// ── Message CRUD ─────────────────────────────────────────────────────
pub fn save_message(&self, msg: &StoredMessage) -> anyhow::Result<()> {
self.conn.execute(
"INSERT INTO messages
(conversation_id, message_id, sender_key, sender_name, body,
msg_type, ref_msg_id, timestamp_ms, is_outgoing)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
params![
msg.conversation_id.0.as_slice(),
msg.message_id.as_ref().map(|id| id.as_slice()),
msg.sender_key,
msg.sender_name,
msg.body,
msg.msg_type,
msg.ref_msg_id.as_ref().map(|id| id.as_slice()),
msg.timestamp_ms,
msg.is_outgoing as i32,
],
)?;
Ok(())
}
pub fn load_recent_messages(
&self,
conv_id: &ConversationId,
limit: usize,
) -> anyhow::Result<Vec<StoredMessage>> {
let mut stmt = self.conn.prepare(
"SELECT message_id, sender_key, sender_name, body, msg_type,
ref_msg_id, timestamp_ms, is_outgoing
FROM messages
WHERE conversation_id = ?1
ORDER BY timestamp_ms DESC
LIMIT ?2",
)?;
let rows = stmt.query_map(params![conv_id.0.as_slice(), limit as u32], |row| {
let message_id: Option<Vec<u8>> = row.get(0)?;
let sender_key: Vec<u8> = row.get(1)?;
let sender_name: Option<String> = row.get(2)?;
let body: String = row.get(3)?;
let msg_type: String = row.get(4)?;
let ref_msg_id: Option<Vec<u8>> = row.get(5)?;
let timestamp_ms: u64 = row.get(6)?;
let is_outgoing: i32 = row.get(7)?;
fn to_16(v: &[u8]) -> Option<[u8; 16]> {
if v.len() == 16 {
let mut buf = [0u8; 16];
buf.copy_from_slice(v);
Some(buf)
} else {
None
}
}
Ok(StoredMessage {
conversation_id: conv_id.clone(),
message_id: message_id.as_deref().and_then(to_16),
sender_key,
sender_name,
body,
msg_type,
ref_msg_id: ref_msg_id.as_deref().and_then(to_16),
timestamp_ms,
is_outgoing: is_outgoing != 0,
})
})?;
let mut msgs = Vec::new();
for row in rows {
msgs.push(row?);
}
// Reverse so oldest first
msgs.reverse();
Ok(msgs)
}
}
pub fn now_ms() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64
}

View File

@@ -0,0 +1,69 @@
//! Terminal display helpers for the REPL.
use super::conversation::StoredMessage;
use super::session::SessionState;
// ANSI color codes
const RESET: &str = "\x1b[0m";
const BOLD: &str = "\x1b[1m";
const DIM: &str = "\x1b[2m";
const GREEN: &str = "\x1b[32m";
const CYAN: &str = "\x1b[36m";
const YELLOW: &str = "\x1b[33m";
/// Print the REPL prompt showing the active conversation and unread count.
pub fn print_prompt(session: &SessionState) {
use std::io::Write;
let name = session
.active_display_name()
.unwrap_or_else(|| "no conversation".into());
let unread = session.total_unread();
if unread > 0 {
print!("{DIM}[{RESET}{BOLD}{name}{RESET} {YELLOW}{unread} unread{RESET}{DIM}]{RESET} > ");
} else {
print!("{DIM}[{RESET}{BOLD}{name}{RESET}{DIM}]{RESET} > ");
}
let _ = std::io::stdout().flush();
}
/// Print an incoming or outgoing message.
pub fn print_message(msg: &StoredMessage) {
if msg.is_outgoing {
println!("\r{GREEN}> {}{RESET}", msg.body);
} else {
let fallback = hex::encode(&msg.sender_key[..4]);
let sender = msg.sender_name.as_deref().unwrap_or(&fallback);
println!("\r{CYAN}{BOLD}[{sender}]{RESET} {}", msg.body);
}
}
/// Print a message received in real-time (clears current line first).
pub fn print_incoming(sender: &str, body: &str) {
use std::io::Write;
// Clear current line, print message, then re-show prompt context
print!("\r\x1b[2K");
println!("{CYAN}{BOLD}[{sender}]{RESET} {body}");
let _ = std::io::stdout().flush();
}
/// Print a system/status message.
pub fn print_status(msg: &str) {
println!("{DIM} {msg}{RESET}");
}
/// Print an error message.
pub fn print_error(msg: &str) {
println!("{YELLOW} error: {msg}{RESET}");
}
/// Format a conversation list entry for `/list`.
pub fn format_conv_line(display_name: &str, kind: &str, unread: u32, members: usize) -> String {
let unread_str = if unread > 0 {
format!(" {YELLOW}({unread} new){RESET}")
} else {
String::new()
};
format!(
" {BOLD}{display_name}{RESET} {DIM}[{kind}, {members} members]{RESET}{unread_str}"
)
}

View File

@@ -1,8 +1,13 @@
pub mod commands;
pub mod conversation;
pub mod display;
pub mod hex;
pub mod repl;
pub mod retry;
pub mod rpc;
pub mod session;
pub mod state;
pub mod token_cache;
pub use commands::*;
pub use rpc::{connect_node, enqueue, fetch_all, fetch_hybrid_key, fetch_key_package, fetch_wait, upload_hybrid_key, upload_key_package};

File diff suppressed because it is too large Load Diff

View File

@@ -76,7 +76,8 @@ pub async fn connect_node(
}
pub fn set_auth(auth: &mut auth::Builder<'_>) -> anyhow::Result<()> {
let ctx = AUTH_CONTEXT.get().ok_or_else(|| {
let guard = AUTH_CONTEXT.read().expect("AUTH_CONTEXT poisoned");
let ctx = guard.as_ref().ok_or_else(|| {
anyhow::anyhow!(
"init_auth must be called before RPCs (use a bearer or session token for authenticated commands)"
)
@@ -574,6 +575,106 @@ pub async fn batch_enqueue(
.await
}
/// Resolve a username to its Ed25519 identity key (32 bytes).
/// Returns `None` if the username is not registered.
pub async fn resolve_user(
client: &node_service::Client,
username: &str,
) -> anyhow::Result<Option<Vec<u8>>> {
let mut req = client.resolve_user_request();
{
let mut p = req.get();
p.set_username(username);
let mut auth = p.reborrow().init_auth();
set_auth(&mut auth)?;
}
let resp = req
.send()
.promise
.await
.context("resolve_user RPC failed")?;
let key = resp
.get()
.context("resolve_user: bad response")?
.get_identity_key()
.context("resolve_user: missing field")?
.to_vec();
if key.is_empty() {
Ok(None)
} else {
Ok(Some(key))
}
}
/// Reverse lookup: resolve an identity key to the registered username.
/// Returns `None` if no username is associated with the key.
pub async fn resolve_identity(
client: &node_service::Client,
identity_key: &[u8],
) -> anyhow::Result<Option<String>> {
let mut req = client.resolve_identity_request();
{
let mut p = req.get();
p.set_identity_key(identity_key);
let mut auth = p.reborrow().init_auth();
set_auth(&mut auth)?;
}
let resp = req
.send()
.promise
.await
.context("resolve_identity RPC failed")?;
let username = resp
.get()
.context("resolve_identity: bad response")?
.get_username()
.context("resolve_identity: missing field")?
.to_str()
.unwrap_or("")
.to_string();
if username.is_empty() {
Ok(None)
} else {
Ok(Some(username))
}
}
/// Create a 1:1 DM channel with a peer. Returns the 16-byte channel ID.
/// If a channel already exists between the two users, returns the existing ID.
pub async fn create_channel(
client: &node_service::Client,
peer_key: &[u8],
) -> anyhow::Result<Vec<u8>> {
let mut req = client.create_channel_request();
{
let mut p = req.get();
p.set_peer_key(peer_key);
let mut auth = p.reborrow().init_auth();
set_auth(&mut auth)?;
}
let resp = req
.send()
.promise
.await
.context("create_channel RPC failed")?;
let channel_id = resp
.get()
.context("create_channel: bad response")?
.get_channel_id()
.context("create_channel: missing channel_id")?
.to_vec();
Ok(channel_id)
}
/// Return the current Unix timestamp in milliseconds.
pub fn current_timestamp_ms() -> u64 {
std::time::SystemTime::now()

View File

@@ -0,0 +1,260 @@
//! Runtime session state for the interactive REPL.
//!
//! Wraps the legacy `StoredState` (identity + hybrid key) and adds
//! multi-conversation management via `ConversationStore`.
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use anyhow::Context;
use quicnprotochat_core::{DiskKeyStore, GroupMember, HybridKeypair, IdentityKeypair};
use super::conversation::{
now_ms, Conversation, ConversationId, ConversationKind, ConversationStore,
};
use super::state::{load_or_init_state, keystore_path};
/// Runtime state for an interactive REPL session.
pub struct SessionState {
/// Long-term identity keypair.
pub identity: Arc<IdentityKeypair>,
/// Post-quantum hybrid keypair.
pub hybrid_kp: Option<HybridKeypair>,
/// Path to the legacy state file (for backward compat with one-shot commands).
pub state_path: PathBuf,
/// Optional password for the legacy state file.
pub password: Option<String>,
/// SQLite-backed conversation + message store.
pub conv_store: ConversationStore,
/// Currently active conversation.
pub active_conversation: Option<ConversationId>,
/// In-memory GroupMember instances keyed by conversation ID.
pub members: HashMap<ConversationId, GroupMember>,
/// Holds the GroupMember whose KeyPackage was uploaded to the server.
/// Its keystore contains the HPKE init private key needed to decrypt
/// incoming Welcome messages. Consumed on auto-join, then replenished.
pub pending_member: Option<GroupMember>,
}
impl SessionState {
/// Load identity from the legacy state file, open the conversation store,
/// and migrate any existing single-group state into the conversation DB.
pub fn load(
state_path: &Path,
password: Option<&str>,
) -> anyhow::Result<Self> {
let state = load_or_init_state(state_path, password)?;
let identity = Arc::new(IdentityKeypair::from_seed(state.identity_seed));
let hybrid_kp = state
.hybrid_key
.as_ref()
.map(|b| HybridKeypair::from_bytes(b))
.transpose()
.context("decode hybrid key")?;
// Open the conversation DB next to the state file.
// When a state password is provided, encrypt the DB with SQLCipher.
let db_path = state_path.with_extension("convdb");
let conv_store = ConversationStore::open(&db_path, password)?;
let mut session = Self {
identity,
hybrid_kp,
state_path: state_path.to_path_buf(),
password: password.map(String::from),
conv_store,
active_conversation: None,
members: HashMap::new(),
pending_member: None,
};
// Migrate legacy single-group into conversations if present and not yet migrated.
if state.group.is_some() {
session.migrate_legacy_group(state_path, &state.group)?;
}
// Load all existing conversations' GroupMembers into memory.
session.load_all_members()?;
Ok(session)
}
/// Migrate the legacy single-group from StoredState into the conversation DB.
fn migrate_legacy_group(
&mut self,
state_path: &Path,
group_blob: &Option<Vec<u8>>,
) -> anyhow::Result<()> {
let blob = match group_blob {
Some(b) => b,
None => return Ok(()),
};
// Reconstruct GroupMember using the legacy keystore and group blob.
let ks_path = keystore_path(state_path);
let ks = DiskKeyStore::persistent(&ks_path)?;
let group = bincode::deserialize(blob).context("decode legacy group")?;
let member = GroupMember::new_with_state(
Arc::clone(&self.identity),
ks,
Some(group),
);
let group_id_bytes = member.group_id().unwrap_or_default();
// Use the first 16 bytes of the group_id as the ConversationId.
let conv_id = if group_id_bytes.len() >= 16 {
ConversationId::from_slice(&group_id_bytes[..16])
.unwrap_or_else(|| ConversationId([0; 16]))
} else {
ConversationId::from_group_name(&hex::encode(&group_id_bytes))
};
// Check if already migrated.
if self.conv_store.load_conversation(&conv_id)?.is_some() {
return Ok(());
}
let member_keys = member.member_identities();
let short_id = &hex::encode(&group_id_bytes)[..8.min(group_id_bytes.len() * 2)];
let conv = Conversation {
id: conv_id.clone(),
kind: ConversationKind::Group {
name: format!("legacy-{short_id}"),
},
display_name: format!("legacy-{short_id}"),
mls_group_blob: Some(blob.clone()),
keystore_blob: None,
member_keys,
unread_count: 0,
last_activity_ms: now_ms(),
created_at_ms: now_ms(),
};
self.conv_store.save_conversation(&conv)?;
self.members.insert(conv_id, member);
Ok(())
}
/// Load all conversations from the DB and create in-memory GroupMember instances.
fn load_all_members(&mut self) -> anyhow::Result<()> {
let convs = self.conv_store.list_conversations()?;
for conv in convs {
if self.members.contains_key(&conv.id) {
continue;
}
let member = self.create_member_from_conv(&conv)?;
self.members.insert(conv.id.clone(), member);
}
Ok(())
}
/// Create a GroupMember from a stored conversation.
fn create_member_from_conv(&self, conv: &Conversation) -> anyhow::Result<GroupMember> {
let ks_path = self.keystore_path_for(&conv.id);
let ks = DiskKeyStore::persistent(&ks_path)
.unwrap_or_else(|_| DiskKeyStore::ephemeral());
let group = conv
.mls_group_blob
.as_ref()
.map(|b| bincode::deserialize(b))
.transpose()
.context("decode MLS group from conversation db")?;
Ok(GroupMember::new_with_state(
Arc::clone(&self.identity),
ks,
group,
))
}
/// Path for a per-conversation keystore file.
fn keystore_path_for(&self, conv_id: &ConversationId) -> PathBuf {
let dir = self.state_path.with_extension("keystores");
dir.join(format!("{}.ks", conv_id.hex()))
}
/// Persist a conversation's MLS group state back to the DB.
pub fn save_member(&self, conv_id: &ConversationId) -> anyhow::Result<()> {
let member = self.members.get(conv_id).context("no such conversation")?;
let blob = member
.group_ref()
.map(|g| bincode::serialize(g))
.transpose()
.context("serialize MLS group")?;
let member_keys = member.member_identities();
// Update the mls_group_blob and member_keys in the DB.
if let Some(mut conv) = self.conv_store.load_conversation(conv_id)? {
conv.mls_group_blob = blob;
conv.member_keys = member_keys;
self.conv_store.save_conversation(&conv)?;
}
Ok(())
}
/// Persist all in-memory group states back to the DB.
pub fn save_all(&self) -> anyhow::Result<()> {
for conv_id in self.members.keys() {
if let Err(e) = self.save_member(conv_id) {
tracing::warn!(conv = %conv_id.hex(), error = %e, "failed to save conversation");
}
}
Ok(())
}
/// Add a new conversation and its GroupMember to the session.
pub fn add_conversation(
&mut self,
conv: Conversation,
member: GroupMember,
) -> anyhow::Result<()> {
// Ensure keystore directory exists
let ks_path = self.keystore_path_for(&conv.id);
if let Some(parent) = ks_path.parent() {
std::fs::create_dir_all(parent).ok();
}
self.conv_store.save_conversation(&conv)?;
self.members.insert(conv.id.clone(), member);
Ok(())
}
/// Get a mutable reference to a conversation's GroupMember.
pub fn get_member_mut(&mut self, conv_id: &ConversationId) -> Option<&mut GroupMember> {
self.members.get_mut(conv_id)
}
/// Public key bytes for this identity.
pub fn identity_bytes(&self) -> Vec<u8> {
self.identity.public_key_bytes().to_vec()
}
/// Short hex prefix of the identity key for display.
pub fn identity_short(&self) -> String {
hex::encode(&self.identity.public_key_bytes()[..4])
}
/// Get display name of a conversation.
pub fn active_display_name(&self) -> Option<String> {
let id = self.active_conversation.as_ref()?;
self.conv_store.load_conversation(id).ok().flatten().map(|c| c.display_name)
}
/// Count total unread across all conversations.
pub fn total_unread(&self) -> u32 {
self.conv_store
.list_conversations()
.unwrap_or_default()
.iter()
.map(|c| c.unread_count)
.sum()
}
}

View File

@@ -0,0 +1,86 @@
//! Cached session token stored next to the state file.
//!
//! File format (no password): two lines — username and hex-encoded session token.
//! File format (with password): QPCE-encrypted version of the above.
//! The token has a server-side 24h TTL; no client-side expiry tracking.
use std::path::{Path, PathBuf};
use anyhow::Context;
use super::state::{decrypt_state, encrypt_state, is_encrypted_state};
pub struct CachedSession {
pub username: String,
pub token_hex: String,
}
/// Derive the session cache path: `{state_path}.session`.
fn session_cache_path(state_path: &Path) -> PathBuf {
state_path.with_extension("session")
}
/// Parse the two-line format (username + token_hex) from plaintext bytes.
fn parse_session_lines(text: &str) -> Option<CachedSession> {
let mut lines = text.lines();
let username = lines.next()?.trim().to_string();
let token_hex = lines.next()?.trim().to_string();
if username.is_empty() || token_hex.is_empty() {
return None;
}
if hex::decode(&token_hex).is_err() {
return None;
}
Some(CachedSession { username, token_hex })
}
/// Load a cached session token. Returns None if file is missing or malformed.
/// Decrypts if the file is QPCE-encrypted (requires `password`).
pub fn load_cached_session(state_path: &Path, password: Option<&str>) -> Option<CachedSession> {
let path = session_cache_path(state_path);
let raw = std::fs::read(&path).ok()?;
if is_encrypted_state(&raw) {
let pw = password?;
let plaintext = decrypt_state(pw, &raw).ok()?;
let text = String::from_utf8(plaintext).ok()?;
parse_session_lines(&text)
} else {
let text = String::from_utf8(raw).ok()?;
parse_session_lines(&text)
}
}
/// Save a session token to the cache file (mode 0o600 on Unix).
/// Encrypts with QPCE if `password` is provided.
pub fn save_cached_session(
state_path: &Path,
username: &str,
token_hex: &str,
password: Option<&str>,
) -> anyhow::Result<()> {
let path = session_cache_path(state_path);
let contents = format!("{username}\n{token_hex}\n");
let bytes = match password {
Some(pw) => encrypt_state(pw, contents.as_bytes())?,
None => contents.into_bytes(),
};
std::fs::write(&path, bytes).with_context(|| format!("write session cache {path:?}"))?;
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
let perms = std::fs::Permissions::from_mode(0o600);
std::fs::set_permissions(&path, perms).ok();
}
Ok(())
}
/// Remove the cached session file.
pub fn clear_cached_session(state_path: &Path) {
let path = session_cache_path(state_path);
std::fs::remove_file(&path).ok();
}

View File

@@ -14,7 +14,7 @@
//! commands. See the [running-the-client](https://docs.quicnprotochat.dev/getting-started/running-the-client)
//! docs for details.
use std::sync::OnceLock;
use std::sync::RwLock;
pub mod client;
@@ -25,10 +25,11 @@ pub use client::commands::{
receive_pending_plaintexts, whoami_json,
};
pub use client::repl::run_repl;
pub use client::rpc::{connect_node, enqueue, fetch_wait};
// Global auth context initialized once per process.
pub(crate) static AUTH_CONTEXT: OnceLock<ClientAuth> = OnceLock::new();
// Global auth context — RwLock so the REPL can set it after OPAQUE login.
pub(crate) static AUTH_CONTEXT: RwLock<Option<ClientAuth>> = RwLock::new(None);
#[derive(Clone, Debug)]
pub struct ClientAuth {
@@ -48,9 +49,20 @@ impl ClientAuth {
device_id: device,
}
}
/// Build from raw token bytes (e.g. a 32-byte OPAQUE session token).
pub fn from_raw(raw_token: Vec<u8>, device_id: Option<String>) -> Self {
let device = device_id.unwrap_or_default().into_bytes();
Self {
version: 1,
access_token: raw_token,
device_id: device,
}
}
}
/// Initialize the global auth context; subsequent calls are ignored.
/// Set (or replace) the global auth context.
pub fn init_auth(ctx: ClientAuth) {
let _ = AUTH_CONTEXT.set(ctx);
let mut guard = AUTH_CONTEXT.write().expect("AUTH_CONTEXT poisoned");
*guard = Some(ctx);
}

View File

@@ -7,7 +7,8 @@ use clap::{Parser, Subcommand};
use quicnprotochat_client::{
cmd_chat, cmd_check_key, cmd_create_group, cmd_demo_group, cmd_fetch_key, cmd_health,
cmd_invite, cmd_join, cmd_login, cmd_ping, cmd_recv, cmd_register, cmd_register_state,
cmd_refresh_keypackage, cmd_register_user, cmd_send, cmd_whoami, init_auth, ClientAuth,
cmd_refresh_keypackage, cmd_register_user, cmd_send, cmd_whoami, init_auth, run_repl,
ClientAuth,
};
// ── CLI ───────────────────────────────────────────────────────────────────────
@@ -266,6 +267,25 @@ enum Command {
stream: bool,
},
/// Interactive multi-conversation REPL. Supports /dm, /create-group, /invite, /join, /switch, and more.
/// Automatically registers and logs in if --username/--password are provided (or prompts interactively).
Repl {
#[arg(
long,
default_value = "quicnprotochat-state.bin",
env = "QUICNPROTOCHAT_STATE"
)]
state: PathBuf,
#[arg(long, default_value = "127.0.0.1:7000", env = "QUICNPROTOCHAT_SERVER")]
server: String,
/// OPAQUE username for automatic registration/login.
#[arg(long, env = "QUICNPROTOCHAT_USERNAME")]
username: Option<String>,
/// OPAQUE password (prompted securely if --username is set but --password is not).
#[arg(long, env = "QUICNPROTOCHAT_PASSWORD")]
password: Option<String>,
},
/// Interactive 1:1 chat: type to send, incoming messages printed as [peer] <msg>. Ctrl+D to exit.
/// In a two-person group, peer is chosen automatically; use --peer-key only with 3+ members.
Chat {
@@ -290,6 +310,9 @@ enum Command {
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Install the rustls crypto provider before any TLS operations.
let _ = rustls::crypto::ring::default_provider().install_default();
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
@@ -299,9 +322,13 @@ async fn main() -> anyhow::Result<()> {
let args = Args::parse();
// Initialize auth context once for all RPCs (empty token OK for register-user/login).
let auth_ctx = ClientAuth::from_parts(args.access_token.clone(), args.device_id.clone());
init_auth(auth_ctx);
// For the REPL, defer init_auth so it can resolve its own token via OPAQUE.
// For all other subcommands, initialize auth immediately.
let is_repl = matches!(args.command, Command::Repl { .. });
if !is_repl {
let auth_ctx = ClientAuth::from_parts(args.access_token.clone(), args.device_id.clone());
init_auth(auth_ctx);
}
let state_pw = args.state_password.as_deref();
@@ -496,6 +523,27 @@ async fn main() -> anyhow::Result<()> {
))
.await
}
Command::Repl {
state,
server,
username,
password,
} => {
let local = tokio::task::LocalSet::new();
local
.run_until(run_repl(
&state,
&server,
&args.ca_cert,
&args.server_name,
state_pw,
username.as_deref(),
password.as_deref(),
&args.access_token,
args.device_id.as_deref(),
))
.await
}
Command::Chat {
state,
server,

View File

@@ -36,6 +36,7 @@ rcgen = { workspace = true }
opaque-ke = { workspace = true }
rand = { workspace = true }
subtle = { workspace = true }
zeroize = { workspace = true }
# Database
rusqlite = { workspace = true }

View File

@@ -1,3 +1,4 @@
use std::net::IpAddr;
use std::sync::Arc;
use dashmap::DashMap;
@@ -5,6 +6,7 @@ use quicnprotochat_proto::node_capnp::auth;
use sha2::Digest;
use subtle::ConstantTimeEq;
use tokio::sync::Notify;
use zeroize::Zeroizing;
use crate::error_codes::*;
@@ -13,19 +15,29 @@ pub const PENDING_LOGIN_TTL_SECS: u64 = 300; // 5 minutes
pub const RATE_LIMIT_WINDOW_SECS: u64 = 60;
pub const RATE_LIMIT_MAX_ENQUEUES: u32 = 100;
#[derive(Clone, Debug)]
#[derive(Clone)]
pub struct AuthConfig {
pub required_token: Option<Vec<u8>>,
/// Server bearer token — zeroized on drop to prevent memory disclosure.
pub required_token: Option<Zeroizing<Vec<u8>>>,
/// When true, a valid bearer token (no session) is accepted and the request's identity/key is used (dev/e2e only).
/// CLI flag: --allow-insecure-auth / QUICNPROTOCHAT_ALLOW_INSECURE_AUTH.
pub allow_insecure_identity_from_request: bool,
}
impl std::fmt::Debug for AuthConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("AuthConfig")
.field("required_token", &self.required_token.as_ref().map(|_| "[REDACTED]"))
.field("allow_insecure_identity_from_request", &self.allow_insecure_identity_from_request)
.finish()
}
}
impl AuthConfig {
pub fn new(required_token: Option<String>, allow_insecure_identity_from_request: bool) -> Self {
let required_token = required_token
.filter(|s| !s.is_empty())
.map(|s| s.into_bytes());
.map(|s| Zeroizing::new(s.into_bytes()));
Self {
required_token,
allow_insecure_identity_from_request,
@@ -133,7 +145,7 @@ pub fn validate_auth_context(
}
if let Some(expected) = &cfg.required_token {
if expected.len() == token.len() && bool::from(expected.ct_eq(&token)) {
if expected.len() == token.len() && bool::from(expected.as_slice().ct_eq(&token)) {
return Ok(AuthContext {
token,
identity_key: None,
@@ -216,6 +228,30 @@ pub fn waiter(waiters: &DashMap<Vec<u8>, Arc<Notify>>, recipient_key: &[u8]) ->
.clone()
}
pub const CONN_RATE_LIMIT_WINDOW_SECS: u64 = 60;
pub const CONN_RATE_LIMIT_MAX: u32 = 50;
/// Per-IP connection rate limiter. Returns `true` if the connection is allowed.
pub fn check_conn_rate_limit(
conn_rate_limits: &DashMap<IpAddr, RateEntry>,
ip: IpAddr,
) -> bool {
let now = current_timestamp();
let mut entry = conn_rate_limits.entry(ip).or_insert(RateEntry {
count: 0,
window_start: now,
});
if now - entry.window_start >= CONN_RATE_LIMIT_WINDOW_SECS {
entry.count = 1;
entry.window_start = now;
true
} else {
entry.count += 1;
entry.count <= CONN_RATE_LIMIT_MAX
}
}
pub fn fingerprint(data: &[u8]) -> Vec<u8> {
sha2::Sha256::digest(data).to_vec()
}

View File

@@ -163,6 +163,9 @@ pub fn merge_config(args: &crate::Args, file: &FileConfig) -> EffectiveConfig {
}
pub fn validate_production_config(effective: &EffectiveConfig) -> anyhow::Result<()> {
if effective.allow_insecure_auth {
anyhow::bail!("production forbids --allow-insecure-auth");
}
let token = effective
.auth_token
.as_deref()
@@ -178,6 +181,12 @@ pub fn validate_production_config(effective: &EffectiveConfig) -> anyhow::Result
if effective.store_backend == "sql" && effective.db_key.is_empty() {
anyhow::bail!("production with store_backend=sql requires non-empty QUICNPROTOCHAT_DB_KEY");
}
if effective.store_backend != "sql" {
tracing::warn!(
"production is using file-backed storage; \
consider store_backend=sql with QUICNPROTOCHAT_DB_KEY for encryption at rest"
);
}
if !effective.tls_cert.exists() || !effective.tls_key.exists() {
anyhow::bail!(
"production requires existing TLS cert and key (no auto-generation); provide QUICNPROTOCHAT_TLS_CERT and QUICNPROTOCHAT_TLS_KEY"

View File

@@ -2,7 +2,7 @@
//!
//! The server hosts Authentication + Delivery services over QUIC + Cap'n Proto.
use std::{net::SocketAddr, path::PathBuf, sync::Arc};
use std::{net::IpAddr, net::SocketAddr, path::PathBuf, sync::Arc};
use anyhow::Context;
use clap::Parser;
@@ -167,7 +167,11 @@ async fn main() -> anyhow::Result<()> {
// Harden QUIC transport: idle timeout, limit stream concurrency.
let mut transport = quinn::TransportConfig::default();
transport.max_idle_timeout(Some(std::time::Duration::from_secs(300).try_into().unwrap()));
transport.max_idle_timeout(Some(
std::time::Duration::from_secs(300)
.try_into()
.expect("300s is a valid IdleTimeout"),
));
transport.max_concurrent_bidi_streams(1u32.into());
transport.max_concurrent_uni_streams(0u32.into());
server_config.transport_config(Arc::new(transport));
@@ -223,12 +227,14 @@ async fn main() -> anyhow::Result<()> {
let pending_logins: Arc<DashMap<String, PendingLogin>> = Arc::new(DashMap::new());
let sessions: Arc<DashMap<Vec<u8>, SessionInfo>> = Arc::new(DashMap::new());
let rate_limits: Arc<DashMap<Vec<u8>, RateEntry>> = Arc::new(DashMap::new());
let conn_rate_limits: Arc<DashMap<IpAddr, RateEntry>> = Arc::new(DashMap::new());
// Background cleanup task (expire sessions, pending logins, rate limits, and stale messages).
spawn_cleanup_task(
Arc::clone(&sessions),
Arc::clone(&pending_logins),
Arc::clone(&rate_limits),
Arc::clone(&conn_rate_limits),
Arc::clone(&store),
Arc::clone(&waiters),
);
@@ -254,6 +260,14 @@ async fn main() -> anyhow::Result<()> {
None => break,
};
// Per-IP connection rate limiting.
let remote_ip = incoming.remote_address().ip();
if !auth::check_conn_rate_limit(&conn_rate_limits, remote_ip) {
tracing::warn!(ip = %remote_ip, "connection rate limit exceeded, dropping");
incoming.refuse();
continue;
}
let connecting = match incoming.accept() {
Ok(c) => c,
Err(e) => {
@@ -298,6 +312,10 @@ async fn main() -> anyhow::Result<()> {
}
}
// Grace period: let in-flight RPC tasks on the LocalSet finish.
tracing::info!("waiting up to 5s for in-flight RPCs to complete");
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
Ok::<(), anyhow::Error>(())
})
.await?;

View File

@@ -92,12 +92,10 @@ impl NodeServiceImpl {
}
// When sealed_sender is true, enqueue does not require identity; valid token only.
// Otherwise, the sender must have an identity-bound session (but their identity
// does NOT need to match the recipient — they're sending TO the recipient).
if !self.sealed_sender {
if let Err(e) = require_identity_or_request(
&auth_ctx,
&recipient_key,
self.auth_cfg.allow_insecure_identity_from_request,
) {
if let Err(e) = crate::auth::require_identity(&auth_ctx) {
return Promise::err(e);
}
}
@@ -563,6 +561,36 @@ impl NodeServiceImpl {
return Promise::err(e);
}
// When sealed_sender is false, require an identity-bound session.
if !self.sealed_sender {
if let Err(e) = crate::auth::require_identity(&auth_ctx) {
return Promise::err(e);
}
}
// DM channel authz: validate caller membership once before the loop.
if channel_id.len() == 16 {
let members = match self.store.get_channel_members(&channel_id) {
Ok(Some(m)) => m,
Ok(None) => {
return Promise::err(coded_error(E023_CHANNEL_NOT_FOUND, "channel not found"));
}
Err(e) => return Promise::err(storage_err(e)),
};
let caller = match crate::auth::require_identity(&auth_ctx) {
Ok(id) => id,
Err(e) => return Promise::err(e),
};
let (a, b) = &members;
let caller_in = caller == a.as_slice() || caller == b.as_slice();
if !caller_in {
return Promise::err(coded_error(
E022_CHANNEL_ACCESS_DENIED,
"caller is not a member of this channel",
));
}
}
let mut seqs = Vec::with_capacity(recipient_keys.len() as usize);
for i in 0..recipient_keys.len() {
let rk = match recipient_keys.get(i) {
@@ -576,6 +604,33 @@ impl NodeServiceImpl {
));
}
// Per-recipient DM channel membership check.
if channel_id.len() == 16 {
let members = match self.store.get_channel_members(&channel_id) {
Ok(Some(m)) => m,
Ok(None) => {
return Promise::err(coded_error(
E023_CHANNEL_NOT_FOUND,
"channel not found",
));
}
Err(e) => return Promise::err(storage_err(e)),
};
let caller = match crate::auth::require_identity(&auth_ctx) {
Ok(id) => id,
Err(e) => return Promise::err(e),
};
let (a, b) = &members;
let recipient_other = (rk == *a && caller == b.as_slice())
|| (rk == *b && caller == a.as_slice());
if !recipient_other {
return Promise::err(coded_error(
E022_CHANNEL_ACCESS_DENIED,
"recipient is not a member of this channel",
));
}
}
match self.store.queue_depth(&rk, &channel_id) {
Ok(depth) if depth >= MAX_QUEUE_DEPTH => {
return Promise::err(coded_error(

View File

@@ -214,10 +214,10 @@ impl NodeServiceImpl {
Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)),
};
let auth_ctx = match validate_auth_context(&self.auth_cfg, &self.sessions, p.get_auth()) {
Ok(ctx) => ctx,
Err(e) => return Promise::err(e),
};
// Auth check only — any authenticated user can fetch any peer's hybrid public key.
if let Err(e) = validate_auth_context(&self.auth_cfg, &self.sessions, p.get_auth()) {
return Promise::err(e);
}
if identity_key.len() != 32 {
return Promise::err(coded_error(
@@ -226,14 +226,6 @@ impl NodeServiceImpl {
));
}
if let Err(e) = require_identity_or_request(
&auth_ctx,
&identity_key,
self.auth_cfg.allow_insecure_identity_from_request,
) {
return Promise::err(e);
}
let hybrid_pk = match self
.store
.fetch_hybrid_key(&identity_key)

View File

@@ -23,6 +23,7 @@ mod channel_ops;
mod delivery;
mod key_ops;
mod p2p_ops;
mod user_ops;
impl node_service::Server for NodeServiceImpl {
fn upload_key_package(
@@ -176,6 +177,22 @@ impl node_service::Server for NodeServiceImpl {
) -> capnp::capability::Promise<(), capnp::Error> {
self.handle_create_channel(params, results)
}
fn resolve_user(
&mut self,
params: node_service::ResolveUserParams,
results: node_service::ResolveUserResults,
) -> capnp::capability::Promise<(), capnp::Error> {
self.handle_resolve_user(params, results)
}
fn resolve_identity(
&mut self,
params: node_service::ResolveIdentityParams,
results: node_service::ResolveIdentityResults,
) -> capnp::capability::Promise<(), capnp::Error> {
self.handle_resolve_identity(params, results)
}
}
pub const CURRENT_WIRE_VERSION: u16 = 1;
@@ -268,6 +285,7 @@ pub fn spawn_cleanup_task(
sessions: Arc<DashMap<Vec<u8>, SessionInfo>>,
pending_logins: Arc<DashMap<String, PendingLogin>>,
rate_limits: Arc<DashMap<Vec<u8>, RateEntry>>,
conn_rate_limits: Arc<DashMap<std::net::IpAddr, RateEntry>>,
store: Arc<dyn Store>,
waiters: Arc<DashMap<Vec<u8>, Arc<Notify>>>,
) {
@@ -280,6 +298,9 @@ pub fn spawn_cleanup_task(
sessions.retain(|_, info| info.expires_at > now);
pending_logins.retain(|_, pl| now - pl.created_at < PENDING_LOGIN_TTL_SECS);
rate_limits.retain(|_, entry| now - entry.window_start < RATE_LIMIT_WINDOW_SECS * 2);
conn_rate_limits.retain(|_, entry| {
now - entry.window_start < crate::auth::CONN_RATE_LIMIT_WINDOW_SECS * 2
});
// Bound map sizes to prevent unbounded growth from malicious clients.
const MAX_SESSIONS: usize = 100_000;

View File

@@ -0,0 +1,94 @@
//! resolveUser / resolveIdentity RPCs: bidirectional username ↔ identity key lookup.
use capnp::capability::Promise;
use quicnprotochat_proto::node_capnp::node_service;
use crate::auth::{coded_error, validate_auth_context};
use crate::error_codes::*;
use crate::storage::StorageError;
use super::NodeServiceImpl;
fn storage_err(err: StorageError) -> capnp::Error {
coded_error(E009_STORAGE_ERROR, err)
}
impl NodeServiceImpl {
pub fn handle_resolve_user(
&mut self,
params: node_service::ResolveUserParams,
mut results: node_service::ResolveUserResults,
) -> Promise<(), capnp::Error> {
let p = match params.get() {
Ok(p) => p,
Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)),
};
let username = match p.get_username() {
Ok(u) => u,
Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)),
};
let _auth_ctx = match validate_auth_context(&self.auth_cfg, &self.sessions, p.get_auth()) {
Ok(ctx) => ctx,
Err(e) => return Promise::err(e),
};
let username_str = match username.to_str() {
Ok(s) => s,
Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)),
};
if username_str.is_empty() {
return Promise::err(coded_error(E020_BAD_PARAMS, "username must not be empty"));
}
match self.store.get_user_identity_key(username_str) {
Ok(Some(key)) => {
results.get().set_identity_key(&key);
}
Ok(None) => {
// Return empty Data — caller checks length to detect "not found".
}
Err(e) => return Promise::err(storage_err(e)),
}
Promise::ok(())
}
pub fn handle_resolve_identity(
&mut self,
params: node_service::ResolveIdentityParams,
mut results: node_service::ResolveIdentityResults,
) -> Promise<(), capnp::Error> {
let p = match params.get() {
Ok(p) => p,
Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)),
};
let identity_key = match p.get_identity_key() {
Ok(v) => v,
Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)),
};
let _auth_ctx = match validate_auth_context(&self.auth_cfg, &self.sessions, p.get_auth()) {
Ok(ctx) => ctx,
Err(e) => return Promise::err(e),
};
if identity_key.len() != 32 {
return Promise::err(coded_error(
E004_IDENTITY_KEY_LENGTH,
format!("identityKey must be exactly 32 bytes, got {}", identity_key.len()),
));
}
match self.store.resolve_identity_key(identity_key) {
Ok(Some(username)) => {
results.get().set_username(&username);
}
Ok(None) => {
// Return empty string — caller checks length to detect "not found".
}
Err(e) => return Promise::err(storage_err(e)),
}
Promise::ok(())
}
}

View File

@@ -369,6 +369,17 @@ impl Store for SqlStore {
.map_err(|e| StorageError::Db(e.to_string()))
}
fn resolve_identity_key(&self, identity_key: &[u8]) -> Result<Option<String>, StorageError> {
let conn = self.lock_conn()?;
let mut stmt = conn
.prepare("SELECT username FROM user_identity_keys WHERE identity_key = ?1")
.map_err(|e| StorageError::Db(e.to_string()))?;
stmt.query_row(params![identity_key], |row| row.get(0))
.optional()
.map_err(|e| StorageError::Db(e.to_string()))
}
fn peek(
&self,
recipient_key: &[u8],

View File

@@ -100,6 +100,9 @@ pub trait Store: Send + Sync {
/// Retrieve identity key for a user (Fix 2).
fn get_user_identity_key(&self, username: &str) -> Result<Option<Vec<u8>>, StorageError>;
/// Reverse lookup: resolve an identity key to the registered username.
fn resolve_identity_key(&self, identity_key: &[u8]) -> Result<Option<String>, StorageError>;
/// Peek at queued messages without removing them (non-destructive).
/// Returns `(seq, payload)` pairs ordered by seq.
fn peek(
@@ -546,6 +549,16 @@ impl Store for FileBackedStore {
Ok(map.get(username).cloned())
}
fn resolve_identity_key(&self, identity_key: &[u8]) -> Result<Option<String>, StorageError> {
let map = lock(&self.identity_keys)?;
for (username, ik) in map.iter() {
if ik.as_slice() == identity_key {
return Ok(Some(username.clone()));
}
}
Ok(None)
}
fn peek(
&self,
recipient_key: &[u8],