feat: add delivery sequence numbers + major server/client refactor

Delivery sequence numbers (MLS epoch ordering fix):
- schemas/node.capnp: add Envelope{seq,data} struct; enqueue returns seq:UInt64;
  fetch/fetchWait return List(Envelope) instead of List(Data)
- storage.rs: Store trait enqueue returns u64; fetch/fetch_limited return
  Vec<(u64, Vec<u8>)>; FileBackedStore gains QueueMapV3 with per-inbox seq
  counters and V2→V3 on-disk migration
- migrations/002_add_seq.sql: seq column, delivery_seq_counters table, index
- sql_store.rs: atomic UPSERT counter via RETURNING, ORDER BY seq, SCHEMA_VERSION→3
- node_service/delivery.rs: builds Envelope list; returns seq from enqueue
- client/rpc.rs: enqueue→u64, fetch_all/fetch_wait→Vec<(u64,Vec<u8>)>
- client/commands.rs: sort-by-seq before MLS processing; retry loop in cmd_recv
  and receive_pending_plaintexts for correct epoch ordering

Server refactor:
- Split monolithic main.rs into node_service/{mod,delivery,auth_ops,key_ops,p2p_ops}
- Add auth.rs (token validation, rate limiting), config.rs, metrics.rs, tls.rs
- Add SQL migrations runner (001_initial.sql, 002_add_seq.sql)
- OPAQUE PAKE login/registration, sealed-sender mode, queue depth limit (1000)

Client refactor:
- Split lib.rs into client/{commands,rpc,state,retry,hex,mod}
- Add cmd_whoami, cmd_health, cmd_check_key, cmd_ping subcommands
- Add cmd_register_user, cmd_login (OPAQUE), cmd_refresh_keypackage
- Hybrid PQ envelope (X25519 + ML-KEM-768) on all send/recv paths
- E2E test suite expanded

Other:
- quicnprotochat-gui: Tauri 2 desktop GUI skeleton (backend + HTML UI)
- quicnprotochat-p2p: iroh-based P2P transport stub
- quicnprotochat-core: app_message, hybrid_crypto modules; GroupMember API updates
- .github/workflows/size-lint.yml: binary size regression check
- docs: protocol comparison, roadmap updates, fully-operational checklist
This commit is contained in:
2026-02-22 20:40:12 +01:00
parent b5b361e2ff
commit 6b8b61c6ae
56 changed files with 10693 additions and 3024 deletions

View File

@@ -49,3 +49,10 @@ serde = { workspace = true }
# CLI parsing (clap) and TOML config-file support
clap = { workspace = true }
toml = { version = "0.8" }
# Metrics (Prometheus): counters/gauges plus the HTTP exporter for /metrics
metrics = "0.22"
metrics-exporter-prometheus = "0.15"

# Test-only dependencies (not part of release builds)
[dev-dependencies]
tempfile = "3"

View File

@@ -0,0 +1,47 @@
-- Migration 001: initial schema.
-- Identity keys and payloads are opaque BLOBs; timestamps are Unix seconds.

-- Published KeyPackages; multiple rows per identity are allowed
-- (AUTOINCREMENT id, no uniqueness on identity_key).
CREATE TABLE IF NOT EXISTS key_packages (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    identity_key BLOB NOT NULL,
    package_data BLOB NOT NULL,
    created_at INTEGER DEFAULT (strftime('%s','now'))
);

-- Store-and-forward delivery queue, one row per queued message.
-- channel_id defaults to the empty blob (the "default channel").
CREATE TABLE IF NOT EXISTS deliveries (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    recipient_key BLOB NOT NULL,
    channel_id BLOB NOT NULL DEFAULT X'',
    payload BLOB NOT NULL,
    created_at INTEGER DEFAULT (strftime('%s','now'))
);

-- One hybrid public key per identity (PRIMARY KEY enforces the 1:1 mapping).
CREATE TABLE IF NOT EXISTS hybrid_keys (
    identity_key BLOB PRIMARY KEY,
    hybrid_public_key BLOB NOT NULL
);

-- Indexes for the two hot lookups: key packages by identity, and
-- queued deliveries per (recipient, channel) inbox.
CREATE INDEX IF NOT EXISTS idx_kp_identity
    ON key_packages(identity_key);
CREATE INDEX IF NOT EXISTS idx_del_recipient_channel
    ON deliveries(recipient_key, channel_id);

-- Singleton row (id forced to 1) holding the serialized server setup blob.
CREATE TABLE IF NOT EXISTS server_setup (
    id INTEGER PRIMARY KEY CHECK (id = 1),
    setup_data BLOB NOT NULL
);

-- OPAQUE registration records, one per username.
CREATE TABLE IF NOT EXISTS users (
    username TEXT PRIMARY KEY,
    opaque_record BLOB NOT NULL,
    created_at INTEGER DEFAULT (strftime('%s','now'))
);

-- Optional username -> identity-key binding established at registration.
CREATE TABLE IF NOT EXISTS user_identity_keys (
    username TEXT PRIMARY KEY,
    identity_key BLOB NOT NULL
);

-- Last-known P2P node address per identity (updated_at tracks the refresh).
CREATE TABLE IF NOT EXISTS endpoints (
    identity_key BLOB PRIMARY KEY,
    node_addr BLOB NOT NULL,
    updated_at INTEGER DEFAULT (strftime('%s','now'))
);

View File

@@ -0,0 +1,21 @@
-- Migration 002: add per-inbox delivery sequence numbers.
--
-- Adds a `seq` column to the deliveries table and a separate counter table
-- that tracks the next sequence number per (recipient_key, channel_id) inbox.
-- The counter is atomically incremented on each enqueue via an UPSERT so
-- sequence numbers are gapless even under concurrent inserts.
--
-- Requires SQLite >= 3.35 (RETURNING clause support; available on Ubuntu 22.04+).

-- Rows enqueued before this migration all receive seq = 0; they therefore
-- sort before any newly enqueued message — NOTE(review): confirm readers
-- tolerate duplicate seq 0 values in pre-existing backlogs.
ALTER TABLE deliveries ADD COLUMN seq INTEGER NOT NULL DEFAULT 0;

-- next_seq holds the sequence number the next enqueue will be assigned.
CREATE TABLE IF NOT EXISTS delivery_seq_counters (
    recipient_key BLOB NOT NULL,
    channel_id BLOB NOT NULL,
    next_seq INTEGER NOT NULL DEFAULT 0,
    PRIMARY KEY (recipient_key, channel_id)
);

-- Index lets ORDER BY seq queries use an index scan instead of a sort.
CREATE INDEX IF NOT EXISTS idx_del_seq
    ON deliveries (recipient_key, channel_id, seq);

View File

@@ -0,0 +1,221 @@
use std::sync::Arc;
use dashmap::DashMap;
use quicnprotochat_proto::node_capnp::auth;
use sha2::Digest;
use subtle::ConstantTimeEq;
use tokio::sync::Notify;
use crate::error_codes::*;
/// Lifetime of an issued session token before login must be repeated.
pub const SESSION_TTL_SECS: u64 = 24 * 60 * 60; // 24 hours
/// How long a cached OPAQUE login-start state remains redeemable.
pub const PENDING_LOGIN_TTL_SECS: u64 = 300; // 5 minutes
/// Length of the sliding window used for per-token enqueue rate limiting.
pub const RATE_LIMIT_WINDOW_SECS: u64 = 60;
/// Maximum enqueues allowed per token within one rate-limit window.
pub const RATE_LIMIT_MAX_ENQUEUES: u32 = 100;
/// Server-side authentication settings derived from CLI flags / config file.
#[derive(Clone, Debug)]
pub struct AuthConfig {
    /// Shared bearer token (UTF-8 bytes) that is always accepted; `None`
    /// disables the shared-token path entirely.
    pub required_token: Option<Vec<u8>>,
    /// When true, a valid bearer token (no session) is accepted and the request's identity/key is used (dev/e2e only).
    pub allow_insecure_identity_from_request: bool,
}

impl AuthConfig {
    /// Build the config. An empty token string is normalized to `None` so
    /// "" behaves the same as an absent token.
    pub fn new(required_token: Option<String>, allow_insecure_identity_from_request: bool) -> Self {
        let required_token = match required_token {
            Some(tok) if !tok.is_empty() => Some(tok.into_bytes()),
            _ => None,
        };
        Self {
            required_token,
            allow_insecure_identity_from_request,
        }
    }
}
/// A logged-in session, stored in the `sessions` map keyed by its token bytes.
#[derive(Clone)]
pub struct SessionInfo {
    #[allow(dead_code)]
    pub username: String,
    /// Identity key the session is bound to; empty means not identity-bound.
    pub identity_key: Vec<u8>,
    #[allow(dead_code)]
    pub created_at: u64,
    /// Unix seconds after which validation rejects the token (E017).
    pub expires_at: u64,
}

/// Server-side OPAQUE state cached between loginStart and loginFinish.
pub struct PendingLogin {
    /// Serialized server login state — presumably a `ServerLogin`
    /// produced by the login-start handler; TODO confirm against auth_ops.
    pub state_bytes: Vec<u8>,
    /// Unix seconds at which the pending login was created.
    pub created_at: u64,
}

/// Per-token counter for the sliding-window rate limiter.
pub struct RateEntry {
    /// Calls observed in the current window.
    pub count: u32,
    /// Unix seconds at which the current window began.
    pub window_start: u64,
}

/// Result of successful auth validation: the presented token plus, when the
/// token is an identity-bound session, the identity key it is bound to.
#[derive(Clone)]
pub struct AuthContext {
    pub token: Vec<u8>,
    /// `None` for bearer (shared) tokens; `Some` for identity-bound sessions.
    pub identity_key: Option<Vec<u8>>,
}
/// Current wall-clock time as whole Unix seconds; returns 0 if the system
/// clock reports a time before the Unix epoch.
pub fn current_timestamp() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
/// Sliding-window rate limiter keyed by access token.
///
/// Each token gets at most `RATE_LIMIT_MAX_ENQUEUES` calls per
/// `RATE_LIMIT_WINDOW_SECS` window; the window restarts lazily when a call
/// arrives after it has elapsed. Exhausting the budget returns an
/// E014-coded error.
pub fn check_rate_limit(
    rate_limits: &DashMap<Vec<u8>, RateEntry>,
    token: &[u8],
) -> Result<(), capnp::Error> {
    let now = current_timestamp();
    let mut entry = rate_limits.entry(token.to_vec()).or_insert(RateEntry {
        count: 0,
        window_start: now,
    });
    // Fix: the wall clock can step backwards (NTP adjustment), in which case
    // the original `now - entry.window_start` underflows and panics in debug
    // builds. saturating_sub keeps the current window open instead.
    if now.saturating_sub(entry.window_start) >= RATE_LIMIT_WINDOW_SECS {
        // Window elapsed: restart and count this call as the first in it.
        entry.count = 1;
        entry.window_start = now;
    } else {
        entry.count += 1;
        if entry.count > RATE_LIMIT_MAX_ENQUEUES {
            return Err(crate::error_codes::coded_error(
                E014_RATE_LIMITED,
                format!(
                    "rate limit exceeded: {} enqueues in {}s window",
                    RATE_LIMIT_MAX_ENQUEUES, RATE_LIMIT_WINDOW_SECS
                ),
            ));
        }
    }
    Ok(())
}
/// Validate the request's auth block, discarding the resulting context.
/// Convenience wrapper around [`validate_auth_context`] for handlers that
/// only need a pass/fail answer.
pub fn validate_auth(
    cfg: &AuthConfig,
    sessions: &DashMap<Vec<u8>, SessionInfo>,
    auth: Result<auth::Reader<'_>, capnp::Error>,
) -> Result<(), capnp::Error> {
    let _ctx = validate_auth_context(cfg, sessions, auth)?;
    Ok(())
}
/// Validate the request's `Auth` struct and produce an [`AuthContext`].
///
/// Accepts, in order of precedence:
/// 1. the configured shared bearer token (constant-time compare) — yields a
///    context with no bound identity;
/// 2. a live session token from `sessions` — yields the bound identity (or
///    `None` when the stored identity key is empty).
/// Expired sessions are evicted and rejected with E017; anything else is
/// rejected with E003.
pub fn validate_auth_context(
    cfg: &AuthConfig,
    sessions: &DashMap<Vec<u8>, SessionInfo>,
    auth: Result<auth::Reader<'_>, capnp::Error>,
) -> Result<AuthContext, capnp::Error> {
    let auth = auth?;
    // Only wire auth version 1 is understood.
    let version = auth.get_version();
    if version != 1 {
        return Err(crate::error_codes::coded_error(
            E001_BAD_AUTH_VERSION,
            format!("unsupported auth version {} (expected 1)", version),
        ));
    }
    let token = auth
        .get_access_token()
        .map_err(|e| crate::error_codes::coded_error(E020_BAD_PARAMS, format!("auth.accessToken: {e}")))?
        .to_vec();
    if token.is_empty() {
        return Err(crate::error_codes::coded_error(
            E002_EMPTY_TOKEN,
            "auth.version=1 requires non-empty accessToken",
        ));
    }
    // Shared bearer token: compared in constant time (subtle::ConstantTimeEq).
    // The length check short-circuits, which leaks only the token's length.
    if let Some(expected) = &cfg.required_token {
        if expected.len() == token.len() && bool::from(expected.ct_eq(&token)) {
            return Ok(AuthContext {
                token,
                identity_key: None,
            });
        }
    }
    if let Some(session) = sessions.get(&token) {
        let now = current_timestamp();
        if session.expires_at > now {
            // An empty stored identity key means the session is not
            // identity-bound.
            let identity = if session.identity_key.is_empty() {
                None
            } else {
                Some(session.identity_key.clone())
            };
            return Ok(AuthContext {
                token,
                identity_key: identity,
            });
        }
        // Expired: release the read guard before removing the entry —
        // presumably to avoid re-entrant locking on the same map shard;
        // TODO confirm against the map's locking contract.
        drop(session);
        sessions.remove(&token);
        return Err(crate::error_codes::coded_error(
            E017_SESSION_EXPIRED,
            "session token has expired",
        ));
    }
    Err(crate::error_codes::coded_error(E003_INVALID_TOKEN, "invalid accessToken"))
}
/// Borrow the identity key an auth context is bound to, or fail with E003
/// when the token is a plain bearer token (no identity binding).
pub fn require_identity<'a>(auth_ctx: &'a AuthContext) -> Result<&'a [u8], capnp::Error> {
    auth_ctx.identity_key.as_deref().ok_or_else(|| {
        crate::error_codes::coded_error(
            E003_INVALID_TOKEN,
            "access token is not identity-bound; login required",
        )
    })
}
/// Require the auth context to be bound to exactly `expected`; E016 on a
/// mismatch, E003 (via [`require_identity`]) when not identity-bound at all.
pub fn require_identity_match(auth_ctx: &AuthContext, expected: &[u8]) -> Result<(), capnp::Error> {
    let bound = require_identity(auth_ctx)?;
    if bound == expected {
        Ok(())
    } else {
        Err(crate::error_codes::coded_error(
            E016_IDENTITY_MISMATCH,
            "access token is bound to a different identity",
        ))
    }
}
/// When the token is a valid session, require it to match `request_identity`.
/// When the token is a bearer token (no identity) and `allow_insecure` is
/// true, trust the identity supplied in the request instead (dev/e2e only);
/// otherwise fail with E003.
pub fn require_identity_or_request(
    auth_ctx: &AuthContext,
    request_identity: &[u8],
    allow_insecure: bool,
) -> Result<(), capnp::Error> {
    if auth_ctx.identity_key.is_some() {
        return require_identity_match(auth_ctx, request_identity);
    }
    if allow_insecure {
        Ok(())
    } else {
        Err(crate::error_codes::coded_error(
            E003_INVALID_TOKEN,
            "access token is not identity-bound; login required",
        ))
    }
}
/// Format bytes as a lowercase hex string (used for audit-log key prefixes).
///
/// Fix: the original collected a per-byte `format!` allocation and then
/// wrapped the result in a redundant `format!("{hex}")`. Preallocate the
/// exact output size and append in place instead.
pub fn fmt_hex(bytes: &[u8]) -> String {
    use std::fmt::Write as _;
    let mut hex = String::with_capacity(bytes.len() * 2);
    for b in bytes {
        // write! to a String is infallible; ignore the Ok(()) result.
        let _ = write!(hex, "{b:02x}");
    }
    hex
}
/// Look up (or lazily create) the `Notify` used to wake long-poll fetches
/// for `recipient_key`, returning a fresh `Arc` handle to it.
pub fn waiter(waiters: &DashMap<Vec<u8>, Arc<Notify>>, recipient_key: &[u8]) -> Arc<Notify> {
    let entry = waiters
        .entry(recipient_key.to_vec())
        .or_insert_with(|| Arc::new(Notify::new()));
    Arc::clone(&*entry)
}
/// SHA-256 fingerprint of `data` as an owned 32-byte vector.
pub fn fingerprint(data: &[u8]) -> Vec<u8> {
    let mut hasher = sha2::Sha256::new();
    hasher.update(data);
    hasher.finalize().to_vec()
}
/// Re-export of [`crate::error_codes::coded_error`] so handler modules can
/// import it from `crate::auth` alongside the other auth helpers.
pub fn coded_error(code: &str, msg: impl std::fmt::Display) -> capnp::Error {
    crate::error_codes::coded_error(code, msg)
}

View File

@@ -0,0 +1,187 @@
use std::path::{Path, PathBuf};
use anyhow::Context;
use serde::Deserialize;
/// Built-in defaults; `merge_config` treats a CLI value equal to its default
/// as "unset" and lets the config file override it.
pub const DEFAULT_LISTEN: &str = "0.0.0.0:7000";
pub const DEFAULT_DATA_DIR: &str = "data";
pub const DEFAULT_TLS_CERT: &str = "data/server-cert.der";
pub const DEFAULT_TLS_KEY: &str = "data/server-key.der";
pub const DEFAULT_STORE_BACKEND: &str = "file";
pub const DEFAULT_DB_PATH: &str = "data/quicnprotochat.db";
/// Raw settings parsed from the optional TOML config file. Every field is
/// optional; `merge_config` layers these under the CLI arguments.
#[derive(Debug, Default, Deserialize)]
pub struct FileConfig {
    /// Listen address, e.g. "0.0.0.0:7000".
    pub listen: Option<String>,
    pub data_dir: Option<String>,
    pub tls_cert: Option<PathBuf>,
    pub tls_key: Option<PathBuf>,
    /// Shared bearer token accepted by the auth layer.
    pub auth_token: Option<String>,
    pub allow_insecure_auth: Option<bool>,
    /// When true, enqueue does not require an identity-bound session: only a valid token is required.
    /// The server does not associate the request with a specific sender (Sealed Sender).
    #[serde(default)]
    pub sealed_sender: Option<bool>,
    /// "file" or "sql" — TODO confirm the accepted values against main.rs.
    pub store_backend: Option<String>,
    pub db_path: Option<PathBuf>,
    pub db_key: Option<String>,
    /// Metrics HTTP listen address (e.g. "0.0.0.0:9090"). If set, /metrics is served there.
    pub metrics_listen: Option<String>,
    /// When true and metrics_listen is set, start the metrics server.
    #[serde(default)]
    pub metrics_enabled: Option<bool>,
}
/// Fully-resolved server configuration after merging CLI arguments over the
/// config file over the built-in defaults (see `merge_config`).
#[derive(Debug)]
pub struct EffectiveConfig {
    pub listen: String,
    pub data_dir: String,
    pub tls_cert: PathBuf,
    pub tls_key: PathBuf,
    /// `None` when no token was configured anywhere.
    pub auth_token: Option<String>,
    pub allow_insecure_auth: bool,
    /// When true, enqueue does not require identity; valid token only (Sealed Sender).
    pub sealed_sender: bool,
    pub store_backend: String,
    pub db_path: PathBuf,
    /// Empty string means "no key configured" (rejected for sql in production).
    pub db_key: String,
    /// If Some(addr), metrics server listens here (e.g. "0.0.0.0:9090").
    pub metrics_listen: Option<String>,
    /// Start metrics server only when true and metrics_listen is set.
    pub metrics_enabled: bool,
}
pub fn load_config(path: Option<&Path>) -> anyhow::Result<FileConfig> {
let path = match path {
Some(p) => PathBuf::from(p),
None => PathBuf::from("quicnprotochat-server.toml"),
};
if !path.exists() {
return Ok(FileConfig::default());
}
let contents =
std::fs::read_to_string(&path).with_context(|| format!("read config file {path:?}"))?;
let cfg: FileConfig =
toml::from_str(&contents).with_context(|| format!("parse config file {path:?}"))?;
Ok(cfg)
}
/// Resolve one setting with precedence CLI > config file > built-in default.
/// A CLI value still equal to its default is treated as "unset" (clap always
/// fills defaults, so this is the only way to detect an omitted flag).
fn arg_or_file<T: Clone + PartialEq>(arg: &T, file: Option<&T>, default: T) -> T {
    if *arg == default {
        file.cloned().unwrap_or(default)
    } else {
        arg.clone()
    }
}

/// Merge CLI arguments over the config file over the defaults into the final
/// [`EffectiveConfig`]. Booleans OR together (either source can enable);
/// optionals prefer the CLI value.
///
/// Fix/cleanup: the original repeated the same default-compare pattern for
/// seven fields inline; it is factored into `arg_or_file`. The db_key branch
/// used `unwrap_or_else(|| args.db_key.clone())` on the branch where
/// `args.db_key` is known empty — `unwrap_or_default()` is equivalent.
pub fn merge_config(args: &crate::Args, file: &FileConfig) -> EffectiveConfig {
    let listen = arg_or_file(&args.listen, file.listen.as_ref(), DEFAULT_LISTEN.to_string());
    let data_dir = arg_or_file(&args.data_dir, file.data_dir.as_ref(), DEFAULT_DATA_DIR.to_string());
    let tls_cert = arg_or_file(&args.tls_cert, file.tls_cert.as_ref(), PathBuf::from(DEFAULT_TLS_CERT));
    let tls_key = arg_or_file(&args.tls_key, file.tls_key.as_ref(), PathBuf::from(DEFAULT_TLS_KEY));
    let store_backend = arg_or_file(
        &args.store_backend,
        file.store_backend.as_ref(),
        DEFAULT_STORE_BACKEND.to_string(),
    );
    let db_path = arg_or_file(&args.db_path, file.db_path.as_ref(), PathBuf::from(DEFAULT_DB_PATH));
    // CLI token wins when present at all (no "default token" to compare to).
    let auth_token = args.auth_token.clone().or_else(|| file.auth_token.clone());
    // Either source may enable these flags.
    let allow_insecure_auth = args.allow_insecure_auth || file.allow_insecure_auth.unwrap_or(false);
    let sealed_sender = args.sealed_sender || file.sealed_sender.unwrap_or(false);
    // CLI db_key wins when non-empty; otherwise take the file value, falling
    // back to the empty string ("no key").
    let db_key = if args.db_key.is_empty() {
        file.db_key.clone().unwrap_or_default()
    } else {
        args.db_key.clone()
    };
    let metrics_listen = args
        .metrics_listen
        .clone()
        .or_else(|| file.metrics_listen.clone());
    // Unless explicitly set, metrics default to "on iff a listen address
    // was configured anywhere".
    let metrics_enabled = args
        .metrics_enabled
        .or(file.metrics_enabled)
        .unwrap_or(metrics_listen.is_some());
    EffectiveConfig {
        listen,
        data_dir,
        tls_cert,
        tls_key,
        auth_token,
        allow_insecure_auth,
        sealed_sender,
        store_backend,
        db_path,
        db_key,
        metrics_listen,
        metrics_enabled,
    }
}
/// Hard requirements for running in production mode: a strong, non-default
/// auth token; a DB key when the SQL backend is selected; and TLS material
/// that already exists on disk (no auto-generation).
pub fn validate_production_config(effective: &EffectiveConfig) -> anyhow::Result<()> {
    let token = match effective.auth_token.as_deref() {
        Some(t) if !t.is_empty() => t,
        _ => anyhow::bail!("production requires QUICNPROTOCHAT_AUTH_TOKEN (non-empty)"),
    };
    anyhow::ensure!(
        token != "devtoken",
        "production forbids auth_token 'devtoken'; set a strong QUICNPROTOCHAT_AUTH_TOKEN"
    );
    anyhow::ensure!(
        !(effective.store_backend == "sql" && effective.db_key.is_empty()),
        "production with store_backend=sql requires non-empty QUICNPROTOCHAT_DB_KEY"
    );
    anyhow::ensure!(
        effective.tls_cert.exists() && effective.tls_key.exists(),
        "production requires existing TLS cert and key (no auto-generation); provide QUICNPROTOCHAT_TLS_CERT and QUICNPROTOCHAT_TLS_KEY"
    );
    Ok(())
}

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,49 @@
//! Prometheus metrics for the server.
//!
//! All counters/histograms/gauges use the `metrics` crate and are exported
//! via metrics-exporter-prometheus on a configurable HTTP port (e.g. /metrics).
/// Record one enqueue (success). Call after a message is enqueued.
pub fn record_enqueue_total() {
    metrics::counter!("enqueue_total").increment(1);
}

/// Record enqueued payload size in bytes (monotonic byte counter).
pub fn record_enqueue_bytes(bytes: u64) {
    metrics::counter!("enqueue_bytes_total").increment(bytes);
}

/// Record one fetch (success). Call when fetch returns.
pub fn record_fetch_total() {
    metrics::counter!("fetch_total").increment(1);
}

/// Record one fetch_wait (success). Call when fetch_wait returns.
pub fn record_fetch_wait_total() {
    metrics::counter!("fetch_wait_total").increment(1);
}

/// Set the delivery queue depth gauge (sample). Updated at enqueue/fetch time.
/// NOTE(review): callers pass a single inbox's depth, so the gauge holds the
/// depth of the most recently touched inbox, not a global total — confirm
/// that is the intended semantics for dashboards.
pub fn record_delivery_queue_depth(depth: usize) {
    metrics::gauge!("delivery_queue_depth").set(depth as f64);
}

/// Record one KeyPackage upload (success).
pub fn record_key_package_upload_total() {
    metrics::counter!("key_package_upload_total").increment(1);
}

/// Record successful auth login (session token issued).
pub fn record_auth_login_success_total() {
    metrics::counter!("auth_login_success_total").increment(1);
}

/// Record failed auth login attempt.
pub fn record_auth_login_failure_total() {
    metrics::counter!("auth_login_failure_total").increment(1);
}

/// Record rate limit hit (enqueue rejected).
pub fn record_rate_limit_hit_total() {
    metrics::counter!("rate_limit_hit_total").increment(1);
}

View File

@@ -0,0 +1,351 @@
use capnp::capability::Promise;
use opaque_ke::{
CredentialFinalization, CredentialRequest, RegistrationRequest, RegistrationUpload,
ServerLogin, ServerRegistration,
};
use quicnprotochat_core::opaque_auth::OpaqueSuite;
use quicnprotochat_proto::node_capnp::node_service;
use crate::auth::{coded_error, current_timestamp, PendingLogin, SESSION_TTL_SECS};
use crate::error_codes::*;
use crate::metrics;
use crate::storage::StorageError;
use super::NodeServiceImpl;
// Audit events in this module must never include secrets (no session tokens, passwords, or raw keys).
/// Map a storage-layer failure onto the generic E009 coded capnp error.
fn storage_err(err: StorageError) -> capnp::Error {
    coded_error(E009_STORAGE_ERROR, err)
}
impl NodeServiceImpl {
/// RPC: first half of the OPAQUE login flow.
///
/// Deserializes the client's `CredentialRequest`, loads the stored
/// `ServerRegistration` ("password file") for `username`, runs
/// `ServerLogin::start`, caches the serialized server state in
/// `pending_logins` for the matching loginFinish call, and returns the
/// serialized login response.
///
/// NOTE(review): an unknown username yields a distinct "user not registered"
/// error, which permits username enumeration — confirm this is acceptable.
pub fn handle_opaque_login_start(
    &mut self,
    params: node_service::OpaqueLoginStartParams,
    mut results: node_service::OpaqueLoginStartResults,
) -> Promise<(), capnp::Error> {
    let p = match params.get() {
        Ok(p) => p,
        Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)),
    };
    let username = match p.get_username() {
        Ok(v) => v.to_string().unwrap_or_default().to_string(),
        Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)),
    };
    let request_bytes = match p.get_request() {
        Ok(v) => v.to_vec(),
        Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)),
    };
    if username.is_empty() {
        return Promise::err(coded_error(E011_USERNAME_EMPTY, "username must not be empty"));
    }
    let credential_request = match CredentialRequest::<OpaqueSuite>::deserialize(&request_bytes) {
        Ok(r) => r,
        Err(e) => {
            return Promise::err(coded_error(
                E010_OPAQUE_ERROR,
                format!("invalid credential request: {e}"),
            ))
        }
    };
    // Load the user's OPAQUE password file; absence or corruption is
    // reported before any PAKE computation happens.
    let password_file = match self.store.get_user_record(&username) {
        Ok(Some(bytes)) => match ServerRegistration::<OpaqueSuite>::deserialize(&bytes) {
            Ok(pf) => Some(pf),
            Err(e) => {
                return Promise::err(coded_error(
                    E010_OPAQUE_ERROR,
                    format!("corrupt user record: {e}"),
                ))
            }
        },
        Ok(None) => {
            return Promise::err(coded_error(E010_OPAQUE_ERROR, "user not registered"))
        }
        Err(e) => return Promise::err(storage_err(e)),
    };
    let mut rng = rand::rngs::OsRng;
    let result = match ServerLogin::<OpaqueSuite>::start(
        &mut rng,
        &self.opaque_setup,
        password_file,
        credential_request,
        username.as_bytes(),
        Default::default(),
    ) {
        Ok(r) => r,
        Err(e) => {
            return Promise::err(coded_error(
                E010_OPAQUE_ERROR,
                format!("OPAQUE login start failed: {e}"),
            ))
        }
    };
    // Cache the server-side login state for loginFinish. A repeated
    // loginStart for the same user overwrites the previous pending state.
    let state_bytes = result.state.serialize().to_vec();
    self.pending_logins.insert(
        username.clone(),
        PendingLogin {
            state_bytes,
            created_at: current_timestamp(),
        },
    );
    let response_bytes = result.message.serialize();
    results.get().set_response(&response_bytes);
    tracing::info!(user = %username, "OPAQUE login started");
    Promise::ok(())
}
pub fn handle_opaque_register_start(
&mut self,
params: node_service::OpaqueRegisterStartParams,
mut results: node_service::OpaqueRegisterStartResults,
) -> Promise<(), capnp::Error> {
let p = match params.get() {
Ok(p) => p,
Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)),
};
let username = match p.get_username() {
Ok(v) => v.to_string().unwrap_or_default().to_string(),
Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)),
};
let request_bytes = match p.get_request() {
Ok(v) => v.to_vec(),
Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)),
};
if username.is_empty() {
return Promise::err(coded_error(E011_USERNAME_EMPTY, "username must not be empty"));
}
if let Ok(true) = self.store.has_user_record(&username) {
return Promise::err(coded_error(
E018_USER_EXISTS,
format!("user '{}' already registered", username),
));
}
let registration_request = match RegistrationRequest::<OpaqueSuite>::deserialize(&request_bytes) {
Ok(r) => r,
Err(e) => {
return Promise::err(coded_error(
E010_OPAQUE_ERROR,
format!("invalid registration request: {e}"),
))
}
};
let result = match ServerRegistration::<OpaqueSuite>::start(
&self.opaque_setup,
registration_request,
username.as_bytes(),
) {
Ok(r) => r,
Err(e) => {
return Promise::err(coded_error(
E010_OPAQUE_ERROR,
format!("OPAQUE registration start failed: {e}"),
))
}
};
let response_bytes = result.message.serialize();
results.get().set_response(&response_bytes);
tracing::info!(user = %username, "OPAQUE registration started");
Promise::ok(())
}
pub fn handle_opaque_login_finish(
&mut self,
params: node_service::OpaqueLoginFinishParams,
mut results: node_service::OpaqueLoginFinishResults,
) -> Promise<(), capnp::Error> {
let p = match params.get() {
Ok(p) => p,
Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)),
};
let username = match p.get_username() {
Ok(v) => v.to_string().unwrap_or_default().to_string(),
Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)),
};
let finalization_bytes = match p.get_finalization() {
Ok(v) => v.to_vec(),
Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)),
};
let identity_key = p.get_identity_key().unwrap_or_default().to_vec();
if username.is_empty() {
return Promise::err(coded_error(E011_USERNAME_EMPTY, "username must not be empty"));
}
let pending = match self.pending_logins.remove(&username) {
Some((_, pl)) => pl,
None => {
// Audit: login failure — do not log secrets (no token, no password).
tracing::warn!(user = %username, "audit: auth login failure (no pending login)");
metrics::record_auth_login_failure_total();
return Promise::err(coded_error(E019_NO_PENDING_LOGIN, "no pending login for this username"))
}
};
let server_login = match ServerLogin::<OpaqueSuite>::deserialize(&pending.state_bytes) {
Ok(s) => s,
Err(e) => {
return Promise::err(coded_error(
E010_OPAQUE_ERROR,
format!("corrupt login state: {e}"),
))
}
};
let finalization = match CredentialFinalization::<OpaqueSuite>::deserialize(&finalization_bytes) {
Ok(f) => f,
Err(e) => {
return Promise::err(coded_error(
E010_OPAQUE_ERROR,
format!("invalid credential finalization: {e}"),
))
}
};
let _result = match server_login.finish(finalization, Default::default()) {
Ok(r) => r,
Err(e) => {
tracing::warn!(user = %username, "audit: auth login failure (OPAQUE finish failed)");
metrics::record_auth_login_failure_total();
return Promise::err(coded_error(
E010_OPAQUE_ERROR,
format!("OPAQUE login finish failed (bad password?): {e}"),
))
}
};
if identity_key.is_empty() {
metrics::record_auth_login_failure_total();
return Promise::err(coded_error(
E016_IDENTITY_MISMATCH,
"identity key required to bind session token",
));
}
if let Ok(Some(stored_ik)) = self.store.get_user_identity_key(&username) {
if stored_ik != identity_key {
tracing::warn!(user = %username, "audit: auth login failure (identity mismatch)");
metrics::record_auth_login_failure_total();
return Promise::err(coded_error(
E016_IDENTITY_MISMATCH,
"identity key does not match registered key",
));
}
}
let mut token = [0u8; 32];
rand::RngCore::fill_bytes(&mut rand::rngs::OsRng, &mut token);
let token_vec = token.to_vec();
let now = current_timestamp();
self.sessions.insert(
token_vec.clone(),
crate::auth::SessionInfo {
username: username.clone(),
identity_key,
created_at: now,
expires_at: now + SESSION_TTL_SECS,
},
);
results.get().set_session_token(&token_vec);
// Audit: login success — do not log session token or any secrets.
metrics::record_auth_login_success_total();
tracing::info!(user = %username, "audit: auth login success — session token issued");
Promise::ok(())
}
pub fn handle_opaque_register_finish(
&mut self,
params: node_service::OpaqueRegisterFinishParams,
mut results: node_service::OpaqueRegisterFinishResults,
) -> Promise<(), capnp::Error> {
let p = match params.get() {
Ok(p) => p,
Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)),
};
let username = match p.get_username() {
Ok(v) => v.to_string().unwrap_or_default().to_string(),
Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)),
};
let upload_bytes = match p.get_upload() {
Ok(v) => v.to_vec(),
Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)),
};
let identity_key = p.get_identity_key().unwrap_or_default().to_vec();
if username.is_empty() {
return Promise::err(coded_error(E011_USERNAME_EMPTY, "username must not be empty"));
}
let _request = match RegistrationRequest::<OpaqueSuite>::deserialize(&upload_bytes) {
Ok(r) => r,
Err(e) => {
return Promise::err(coded_error(
E010_OPAQUE_ERROR,
format!("invalid registration upload: {e}"),
))
}
};
match self.store.has_user_record(&username) {
Ok(true) => {
return Promise::err(coded_error(
E018_USER_EXISTS,
format!("user '{}' already registered", username),
))
}
Err(e) => return Promise::err(storage_err(e)),
_ => {}
}
let upload = match RegistrationUpload::<OpaqueSuite>::deserialize(&upload_bytes) {
Ok(u) => u,
Err(e) => {
return Promise::err(coded_error(
E010_OPAQUE_ERROR,
format!("invalid registration upload: {e}"),
))
}
};
let password_file = ServerRegistration::<OpaqueSuite>::finish(upload);
let record_bytes = password_file.serialize().to_vec();
if let Err(e) = self
.store
.store_user_record(&username, record_bytes)
.map_err(storage_err)
{
return Promise::err(e);
}
if !identity_key.is_empty() {
if let Err(e) = self
.store
.store_user_identity_key(&username, identity_key)
.map_err(storage_err)
{
return Promise::err(e);
}
}
results.get().set_success(true);
tracing::info!(user = %username, "OPAQUE registration complete");
Promise::ok(())
}
}

View File

@@ -0,0 +1,318 @@
use std::sync::Arc;
use std::time::Duration;
use capnp::capability::Promise;
use dashmap::DashMap;
use quicnprotochat_proto::node_capnp::node_service;
use tokio::sync::Notify;
use tokio::time::timeout;
use crate::auth::{
check_rate_limit, coded_error, fmt_hex, require_identity_or_request, validate_auth_context,
};
use crate::error_codes::*;
use crate::metrics;
use crate::storage::{StorageError, Store};
use super::{NodeServiceImpl, CURRENT_WIRE_VERSION};
// Audit events here must not include secrets: no payload content, no full recipient/token bytes (prefix only).
/// Hard cap on a single enqueued payload (rejected with E006 beyond this).
const MAX_PAYLOAD_BYTES: usize = 5 * 1024 * 1024; // 5 MB cap per message
/// Maximum undelivered messages per (recipient, channel) inbox before
/// enqueue is rejected with E015_QUEUE_FULL.
const MAX_QUEUE_DEPTH: usize = 1000;
/// Map a storage-layer failure onto the generic E009 coded capnp error.
fn storage_err(err: StorageError) -> capnp::Error {
    coded_error(E009_STORAGE_ERROR, err)
}
/// Copy fetched `(seq, payload)` pairs into the capnp `Envelope` list of a
/// fetchWait response.
pub fn fill_payloads_wait(
    results: &mut node_service::FetchWaitResults,
    messages: Vec<(u64, Vec<u8>)>,
) {
    let mut envelopes = results.get().init_payloads(messages.len() as u32);
    for (idx, (seq, data)) in messages.into_iter().enumerate() {
        let mut envelope = envelopes.reborrow().get(idx as u32);
        envelope.set_seq(seq);
        envelope.set_data(&data);
    }
}
impl NodeServiceImpl {
/// RPC: queue one encrypted payload for `recipientKey`/`channelId`.
///
/// Validation order: params parse → auth → key length (32 bytes) →
/// non-empty payload → size cap → wire version → per-token rate limit →
/// (unless sealed_sender) sender identity check → queue-depth cap.
/// On success stores the payload, returns its per-inbox delivery sequence
/// number, updates metrics, and wakes any fetchWait waiters.
pub fn handle_enqueue(
    &mut self,
    params: node_service::EnqueueParams,
    mut results: node_service::EnqueueResults,
) -> Promise<(), capnp::Error> {
    let p = match params.get() {
        Ok(p) => p,
        Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)),
    };
    let recipient_key = match p.get_recipient_key() {
        Ok(v) => v.to_vec(),
        Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)),
    };
    let payload = match p.get_payload() {
        Ok(v) => v.to_vec(),
        Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)),
    };
    let channel_id = p.get_channel_id().unwrap_or_default().to_vec();
    let version = p.get_version();
    let auth_ctx = match validate_auth_context(&self.auth_cfg, &self.sessions, p.get_auth()) {
        Ok(ctx) => ctx,
        Err(e) => return Promise::err(e),
    };
    if recipient_key.len() != 32 {
        return Promise::err(coded_error(
            E004_IDENTITY_KEY_LENGTH,
            format!("recipientKey must be exactly 32 bytes, got {}", recipient_key.len()),
        ));
    }
    if payload.is_empty() {
        return Promise::err(coded_error(E005_PAYLOAD_EMPTY, "payload must not be empty"));
    }
    if payload.len() > MAX_PAYLOAD_BYTES {
        return Promise::err(coded_error(
            E006_PAYLOAD_TOO_LARGE,
            format!("payload exceeds max size ({} bytes)", MAX_PAYLOAD_BYTES),
        ));
    }
    if version != CURRENT_WIRE_VERSION {
        return Promise::err(coded_error(
            E012_WIRE_VERSION,
            format!("unsupported wire version {} (expected {CURRENT_WIRE_VERSION})", version),
        ));
    }
    if let Err(e) = check_rate_limit(&self.rate_limits, &auth_ctx.token) {
        // Audit: rate limit hit — do not log token or identity.
        tracing::warn!("rate_limit_hit");
        metrics::record_rate_limit_hit_total();
        return Promise::err(e);
    }
    // When sealed_sender is true, enqueue does not require identity; valid token only.
    if !self.sealed_sender {
        if let Err(e) = require_identity_or_request(
            &auth_ctx,
            &recipient_key,
            self.auth_cfg.allow_insecure_identity_from_request,
        ) {
            return Promise::err(e);
        }
    }
    // Backpressure: reject when the inbox is already at the cap.
    // NOTE(review): check-then-enqueue is not atomic, so concurrent senders
    // can overshoot MAX_QUEUE_DEPTH slightly — confirm that is acceptable.
    match self.store.queue_depth(&recipient_key, &channel_id) {
        Ok(depth) if depth >= MAX_QUEUE_DEPTH => {
            return Promise::err(coded_error(
                E015_QUEUE_FULL,
                format!("queue depth {} exceeds limit {}", depth, MAX_QUEUE_DEPTH),
            ));
        }
        Err(e) => return Promise::err(storage_err(e)),
        _ => {}
    }
    let payload_len = payload.len();
    // enqueue returns the per-inbox delivery sequence number; clients order
    // by it before processing.
    let seq = match self
        .store
        .enqueue(&recipient_key, &channel_id, payload)
        .map_err(storage_err)
    {
        Ok(seq) => seq,
        Err(e) => return Promise::err(e),
    };
    results.get().set_seq(seq);
    // Metrics and audit. Audit events must not include secrets (no payload, no full keys).
    metrics::record_enqueue_total();
    metrics::record_enqueue_bytes(payload_len as u64);
    if let Ok(depth) = self.store.queue_depth(&recipient_key, &channel_id) {
        metrics::record_delivery_queue_depth(depth);
    }
    // Slice is safe: recipient_key length was verified to be 32 above.
    tracing::info!(
        recipient_prefix = %fmt_hex(&recipient_key[..4]),
        payload_len = payload_len,
        seq = seq,
        "audit: enqueue"
    );
    // Wake any blocked fetchWait calls for this recipient.
    crate::auth::waiter(&self.waiters, &recipient_key).notify_waiters();
    Promise::ok(())
}
/// RPC: drain queued envelopes for `recipientKey`/`channelId`.
///
/// Validates params and auth, then returns either everything queued or at
/// most `limit` envelopes (when `limit > 0`), each carrying its delivery
/// sequence number.
///
/// Cleanup: the original called `params.get()` five separate times with
/// `.ok()` fallbacks that were unreachable (a failing `get()` had already
/// returned E020 on the first call). The params struct is now read once,
/// matching handle_enqueue / handle_fetch_wait; as a side effect, auth is
/// validated during parsing, so a request that is invalid in several ways
/// may report the auth error first.
pub fn handle_fetch(
    &mut self,
    params: node_service::FetchParams,
    mut results: node_service::FetchResults,
) -> Promise<(), capnp::Error> {
    let p = match params.get() {
        Ok(p) => p,
        Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)),
    };
    let recipient_key = match p.get_recipient_key() {
        Ok(v) => v.to_vec(),
        Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)),
    };
    let channel_id = p.get_channel_id().unwrap_or_default().to_vec();
    let version = p.get_version();
    let limit = p.get_limit();
    let auth_ctx = match validate_auth_context(&self.auth_cfg, &self.sessions, p.get_auth()) {
        Ok(ctx) => ctx,
        Err(e) => return Promise::err(e),
    };
    if recipient_key.len() != 32 {
        return Promise::err(coded_error(
            E004_IDENTITY_KEY_LENGTH,
            format!("recipientKey must be exactly 32 bytes, got {}", recipient_key.len()),
        ));
    }
    if version != CURRENT_WIRE_VERSION {
        return Promise::err(coded_error(
            E012_WIRE_VERSION,
            format!("unsupported wire version {} (expected {CURRENT_WIRE_VERSION})", version),
        ));
    }
    // Only the inbox owner (or an insecure-dev request) may fetch from it.
    if let Err(e) = require_identity_or_request(
        &auth_ctx,
        &recipient_key,
        self.auth_cfg.allow_insecure_identity_from_request,
    ) {
        return Promise::err(e);
    }
    // limit == 0 means "no limit": fetch everything queued.
    let messages = if limit > 0 {
        match self
            .store
            .fetch_limited(&recipient_key, &channel_id, limit as usize)
            .map_err(storage_err)
        {
            Ok(m) => m,
            Err(e) => return Promise::err(e),
        }
    } else {
        match self
            .store
            .fetch(&recipient_key, &channel_id)
            .map_err(storage_err)
        {
            Ok(m) => m,
            Err(e) => return Promise::err(e),
        }
    };
    // Audit: fetch — do not log payload or full keys.
    metrics::record_fetch_total();
    tracing::info!(
        recipient_prefix = %fmt_hex(&recipient_key[..4]),
        count = messages.len(),
        "audit: fetch"
    );
    // Build the Envelope list from the (seq, data) pairs.
    let mut list = results.get().init_payloads(messages.len() as u32);
    for (i, (seq, data)) in messages.iter().enumerate() {
        let mut entry = list.reborrow().get(i as u32);
        entry.set_seq(*seq);
        entry.set_data(data);
    }
    Promise::ok(())
}
/// RPC: long-poll variant of fetch.
///
/// Validates like fetch, then: a non-empty immediate fetch returns right
/// away; otherwise (when `timeoutMs > 0`) the call parks on the recipient's
/// `Notify` until an enqueue wakes it or the timeout elapses, and fetches
/// exactly once more. That second fetch may still be empty (timeout expired,
/// or the wake-up was for a different channel of the same recipient).
pub fn handle_fetch_wait(
    &mut self,
    params: node_service::FetchWaitParams,
    mut results: node_service::FetchWaitResults,
) -> Promise<(), capnp::Error> {
    let p = match params.get() {
        Ok(p) => p,
        Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)),
    };
    let recipient_key = match p.get_recipient_key() {
        Ok(v) => v.to_vec(),
        Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)),
    };
    let channel_id = p.get_channel_id().unwrap_or_default().to_vec();
    let version = p.get_version();
    let timeout_ms = p.get_timeout_ms();
    let limit = p.get_limit();
    let auth_ctx = match validate_auth_context(&self.auth_cfg, &self.sessions, p.get_auth()) {
        Ok(ctx) => ctx,
        Err(e) => return Promise::err(e),
    };
    if recipient_key.len() != 32 {
        return Promise::err(coded_error(
            E004_IDENTITY_KEY_LENGTH,
            format!("recipientKey must be exactly 32 bytes, got {}", recipient_key.len()),
        ));
    }
    if version != CURRENT_WIRE_VERSION {
        return Promise::err(coded_error(
            E012_WIRE_VERSION,
            format!("unsupported wire version {} (expected {CURRENT_WIRE_VERSION})", version),
        ));
    }
    // Only the inbox owner (or an insecure-dev request) may wait on it.
    if let Err(e) = require_identity_or_request(
        &auth_ctx,
        &recipient_key,
        self.auth_cfg.allow_insecure_identity_from_request,
    ) {
        return Promise::err(e);
    }
    // Clone the Arcs so the async block owns its captures and does not
    // borrow `self`.
    let store = Arc::clone(&self.store);
    let waiters: Arc<DashMap<Vec<u8>, Arc<Notify>>> = self.waiters.clone();
    Promise::from_future(async move {
        // limit == 0 means "no limit": fetch everything queued.
        let fetch_fn = |s: &Arc<dyn Store>, rk: &[u8], ch: &[u8], lim: u32| -> Result<Vec<(u64, Vec<u8>)>, capnp::Error> {
            if lim > 0 {
                s.fetch_limited(rk, ch, lim as usize).map_err(storage_err)
            } else {
                s.fetch(rk, ch).map_err(storage_err)
            }
        };
        let messages = fetch_fn(&store, &recipient_key, &channel_id, limit)?;
        if messages.is_empty() && timeout_ms > 0 {
            // NOTE(review): waiter entries are inserted but never removed,
            // so the map grows with the number of distinct recipients —
            // confirm bounded cardinality or add cleanup.
            let waiter = waiters
                .entry(recipient_key.clone())
                .or_insert_with(|| Arc::new(Notify::new()))
                .clone();
            // Ignore whether we woke by notify or timeout: either way, do
            // one final fetch and return whatever is there.
            let _ = timeout(Duration::from_millis(timeout_ms), waiter.notified()).await;
            let msgs = fetch_fn(&store, &recipient_key, &channel_id, limit)?;
            fill_payloads_wait(&mut results, msgs);
            metrics::record_fetch_wait_total();
            return Ok(());
        }
        fill_payloads_wait(&mut results, messages);
        metrics::record_fetch_wait_total();
        Ok(())
    })
}
}

View File

@@ -0,0 +1,259 @@
use capnp::capability::Promise;
use quicnprotochat_proto::node_capnp::node_service;
use crate::auth::{coded_error, fmt_hex, require_identity_or_request, validate_auth_context};
use crate::error_codes::*;
use crate::metrics;
use crate::storage::StorageError;
use super::NodeServiceImpl;
/// Maps a storage-layer failure onto the wire-level coded error E009.
fn storage_err(err: StorageError) -> capnp::Error {
    coded_error(E009_STORAGE_ERROR, err)
}

/// Upper bound on an uploaded KeyPackage blob (1 MiB).
const MAX_KEYPACKAGE_BYTES: usize = 1024 * 1024;
impl NodeServiceImpl {
pub fn handle_upload_key_package(
&mut self,
params: node_service::UploadKeyPackageParams,
mut results: node_service::UploadKeyPackageResults,
) -> Promise<(), capnp::Error> {
let (auth_ctx, identity_key, package) = match params.get() {
Ok(p) => {
let auth_ctx = match validate_auth_context(&self.auth_cfg, &self.sessions, p.get_auth()) {
Ok(ctx) => ctx,
Err(e) => return Promise::err(e),
};
let ik = match p.get_identity_key() {
Ok(v) => v.to_vec(),
Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)),
};
let pkg = match p.get_package() {
Ok(v) => v.to_vec(),
Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)),
};
(auth_ctx, ik, pkg)
}
Err(e) => return Promise::err(e),
};
if identity_key.len() != 32 {
return Promise::err(coded_error(
E004_IDENTITY_KEY_LENGTH,
format!("identityKey must be exactly 32 bytes, got {}", identity_key.len()),
));
}
if package.is_empty() {
return Promise::err(coded_error(E007_PACKAGE_EMPTY, "package must not be empty"));
}
if package.len() > MAX_KEYPACKAGE_BYTES {
return Promise::err(coded_error(
E008_PACKAGE_TOO_LARGE,
format!("package exceeds max size ({} bytes)", MAX_KEYPACKAGE_BYTES),
));
}
if let Err(e) = require_identity_or_request(
&auth_ctx,
&identity_key,
self.auth_cfg.allow_insecure_identity_from_request,
) {
return Promise::err(e);
}
if let Err(e) = quicnprotochat_core::validate_keypackage_ciphersuite(&package) {
return Promise::err(coded_error(
E021_CIPHERSUITE_NOT_ALLOWED,
format!("KeyPackage ciphersuite not allowed: {e}"),
));
}
let fingerprint: Vec<u8> = crate::auth::fingerprint(&package);
if let Err(e) = self
.store
.upload_key_package(&identity_key, package)
.map_err(storage_err)
{
return Promise::err(e);
}
results.get().set_fingerprint(&fingerprint);
metrics::record_key_package_upload_total();
// Audit: KeyPackage upload — only fingerprint prefix, no secrets.
tracing::info!(
identity_prefix = %fmt_hex(&identity_key[..4]),
fingerprint_prefix = %fmt_hex(&fingerprint[..4]),
"audit: key_package_upload"
);
Promise::ok(())
}
pub fn handle_fetch_key_package(
&mut self,
params: node_service::FetchKeyPackageParams,
mut results: node_service::FetchKeyPackageResults,
) -> Promise<(), capnp::Error> {
let identity_key = match params.get() {
Ok(p) => match p.get_identity_key() {
Ok(v) => v.to_vec(),
Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)),
},
Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)),
};
if let Err(e) = params
.get()
.ok()
.map(|p| crate::auth::validate_auth(&self.auth_cfg, &self.sessions, p.get_auth()))
.transpose()
{
return Promise::err(e);
}
if identity_key.len() != 32 {
return Promise::err(coded_error(
E004_IDENTITY_KEY_LENGTH,
format!("identityKey must be exactly 32 bytes, got {}", identity_key.len()),
));
}
let package = match self
.store
.fetch_key_package(&identity_key)
.map_err(storage_err)
{
Ok(p) => p,
Err(e) => return Promise::err(e),
};
match package {
Some(pkg) => {
tracing::debug!(identity = %fmt_hex(&identity_key[..4]), "KeyPackage fetched");
results.get().set_package(&pkg);
}
None => {
tracing::debug!(
identity = %fmt_hex(&identity_key[..4]),
"no KeyPackage available for identity"
);
results.get().set_package(&[]);
}
}
Promise::ok(())
}
pub fn handle_upload_hybrid_key(
&mut self,
params: node_service::UploadHybridKeyParams,
_results: node_service::UploadHybridKeyResults,
) -> Promise<(), capnp::Error> {
let p = match params.get() {
Ok(p) => p,
Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)),
};
let identity_key = match p.get_identity_key() {
Ok(v) => v.to_vec(),
Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)),
};
let hybrid_pk = match p.get_hybrid_public_key() {
Ok(v) => v.to_vec(),
Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)),
};
let auth_ctx = match validate_auth_context(&self.auth_cfg, &self.sessions, p.get_auth()) {
Ok(ctx) => ctx,
Err(e) => return Promise::err(e),
};
if identity_key.len() != 32 {
return Promise::err(coded_error(
E004_IDENTITY_KEY_LENGTH,
format!("identityKey must be exactly 32 bytes, got {}", identity_key.len()),
));
}
if hybrid_pk.is_empty() {
return Promise::err(coded_error(E013_HYBRID_KEY_EMPTY, "hybridPublicKey must not be empty"));
}
if let Err(e) = require_identity_or_request(
&auth_ctx,
&identity_key,
self.auth_cfg.allow_insecure_identity_from_request,
) {
return Promise::err(e);
}
if let Err(e) = self
.store
.upload_hybrid_key(&identity_key, hybrid_pk)
.map_err(storage_err)
{
return Promise::err(e);
}
tracing::debug!(identity = %fmt_hex(&identity_key[..4]), "hybrid public key uploaded");
Promise::ok(())
}
pub fn handle_fetch_hybrid_key(
&mut self,
params: node_service::FetchHybridKeyParams,
mut results: node_service::FetchHybridKeyResults,
) -> Promise<(), capnp::Error> {
let p = match params.get() {
Ok(p) => p,
Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)),
};
let identity_key = match p.get_identity_key() {
Ok(v) => v.to_vec(),
Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)),
};
let auth_ctx = match validate_auth_context(&self.auth_cfg, &self.sessions, p.get_auth()) {
Ok(ctx) => ctx,
Err(e) => return Promise::err(e),
};
if identity_key.len() != 32 {
return Promise::err(coded_error(
E004_IDENTITY_KEY_LENGTH,
format!("identityKey must be exactly 32 bytes, got {}", identity_key.len()),
));
}
if let Err(e) = require_identity_or_request(
&auth_ctx,
&identity_key,
self.auth_cfg.allow_insecure_identity_from_request,
) {
return Promise::err(e);
}
let hybrid_pk = match self
.store
.fetch_hybrid_key(&identity_key)
.map_err(storage_err)
{
Ok(p) => p,
Err(e) => return Promise::err(e),
};
match hybrid_pk {
Some(pk) => {
tracing::debug!(identity = %fmt_hex(&identity_key[..4]), "hybrid key fetched");
results.get().set_hybrid_public_key(&pk);
}
None => {
tracing::debug!(identity = %fmt_hex(&identity_key[..4]), "no hybrid key for identity");
results.get().set_hybrid_public_key(&[]);
}
}
Promise::ok(())
}
}

View File

@@ -0,0 +1,246 @@
use std::sync::Arc;
use std::time::Duration;
use capnp_rpc::RpcSystem;
use dashmap::DashMap;
use opaque_ke::ServerSetup;
use quicnprotochat_core::opaque_auth::OpaqueSuite;
use quicnprotochat_proto::node_capnp::node_service;
use tokio::sync::Notify;
use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
use crate::auth::{
current_timestamp, AuthConfig, PendingLogin, RateEntry, SessionInfo,
PENDING_LOGIN_TTL_SECS, RATE_LIMIT_WINDOW_SECS,
};
use crate::storage::Store;
mod auth_ops;
mod delivery;
mod key_ops;
mod p2p_ops;
// Cap'n Proto service vtable for NodeServiceImpl.
//
// Every generated RPC method delegates 1:1 to a `handle_*` method defined in
// the sibling submodules (delivery, key_ops, p2p_ops; OPAQUE handlers
// presumably live in auth_ops — not shown here). No request logic lives in
// this impl.
impl node_service::Server for NodeServiceImpl {
// --- KeyPackage directory (key_ops) ---
fn upload_key_package(
&mut self,
params: node_service::UploadKeyPackageParams,
results: node_service::UploadKeyPackageResults,
) -> capnp::capability::Promise<(), capnp::Error> {
self.handle_upload_key_package(params, results)
}
fn fetch_key_package(
&mut self,
params: node_service::FetchKeyPackageParams,
results: node_service::FetchKeyPackageResults,
) -> capnp::capability::Promise<(), capnp::Error> {
self.handle_fetch_key_package(params, results)
}
// --- Message delivery queues (delivery) ---
fn enqueue(
&mut self,
params: node_service::EnqueueParams,
results: node_service::EnqueueResults,
) -> capnp::capability::Promise<(), capnp::Error> {
self.handle_enqueue(params, results)
}
fn fetch(
&mut self,
params: node_service::FetchParams,
results: node_service::FetchResults,
) -> capnp::capability::Promise<(), capnp::Error> {
self.handle_fetch(params, results)
}
fn fetch_wait(
&mut self,
params: node_service::FetchWaitParams,
results: node_service::FetchWaitResults,
) -> capnp::capability::Promise<(), capnp::Error> {
self.handle_fetch_wait(params, results)
}
// --- Liveness probe (p2p_ops) ---
fn health(
&mut self,
params: node_service::HealthParams,
results: node_service::HealthResults,
) -> capnp::capability::Promise<(), capnp::Error> {
self.handle_health(params, results)
}
// --- Hybrid post-quantum key directory (key_ops) ---
fn upload_hybrid_key(
&mut self,
params: node_service::UploadHybridKeyParams,
results: node_service::UploadHybridKeyResults,
) -> capnp::capability::Promise<(), capnp::Error> {
self.handle_upload_hybrid_key(params, results)
}
fn fetch_hybrid_key(
&mut self,
params: node_service::FetchHybridKeyParams,
results: node_service::FetchHybridKeyResults,
) -> capnp::capability::Promise<(), capnp::Error> {
self.handle_fetch_hybrid_key(params, results)
}
// --- OPAQUE PAKE login/registration ---
fn opaque_login_start(
&mut self,
params: node_service::OpaqueLoginStartParams,
results: node_service::OpaqueLoginStartResults,
) -> capnp::capability::Promise<(), capnp::Error> {
self.handle_opaque_login_start(params, results)
}
fn opaque_register_start(
&mut self,
params: node_service::OpaqueRegisterStartParams,
results: node_service::OpaqueRegisterStartResults,
) -> capnp::capability::Promise<(), capnp::Error> {
self.handle_opaque_register_start(params, results)
}
fn opaque_login_finish(
&mut self,
params: node_service::OpaqueLoginFinishParams,
results: node_service::OpaqueLoginFinishResults,
) -> capnp::capability::Promise<(), capnp::Error> {
self.handle_opaque_login_finish(params, results)
}
fn opaque_register_finish(
&mut self,
params: node_service::OpaqueRegisterFinishParams,
results: node_service::OpaqueRegisterFinishResults,
) -> capnp::capability::Promise<(), capnp::Error> {
self.handle_opaque_register_finish(params, results)
}
// --- P2P endpoint directory (p2p_ops) ---
fn publish_endpoint(
&mut self,
params: node_service::PublishEndpointParams,
results: node_service::PublishEndpointResults,
) -> capnp::capability::Promise<(), capnp::Error> {
self.handle_publish_endpoint(params, results)
}
fn resolve_endpoint(
&mut self,
params: node_service::ResolveEndpointParams,
results: node_service::ResolveEndpointResults,
) -> capnp::capability::Promise<(), capnp::Error> {
self.handle_resolve_endpoint(params, results)
}
}
/// Wire protocol version this server accepts; handlers reject any other value
/// with E012.
pub const CURRENT_WIRE_VERSION: u16 = 1;
/// Per-connection Cap'n Proto service implementation.
///
/// All fields are shared, reference-counted handles onto process-wide server
/// state; a fresh `NodeServiceImpl` is built for every accepted connection.
pub struct NodeServiceImpl {
/// Persistence backend (KeyPackages, delivery queues, hybrid keys, users, endpoints).
pub store: Arc<dyn Store>,
/// Per-recipient wakeups used by fetchWait long-polling.
pub waiters: Arc<DashMap<Vec<u8>, Arc<Notify>>>,
/// Token/session validation settings.
pub auth_cfg: Arc<AuthConfig>,
/// OPAQUE server setup material; presumably consumed by the auth_ops
/// handlers (not shown in this module) — TODO confirm.
pub opaque_setup: Arc<ServerSetup<OpaqueSuite>>,
/// In-flight OPAQUE login states, expired by the cleanup task.
pub pending_logins: Arc<DashMap<String, PendingLogin>>,
/// Active authenticated sessions; presumably keyed by token bytes — TODO confirm.
pub sessions: Arc<DashMap<Vec<u8>, SessionInfo>>,
/// Per-caller rate-limit windows, expired by the cleanup task.
pub rate_limits: Arc<DashMap<Vec<u8>, RateEntry>>,
/// When true, enqueue does not require identity-bound session (Sealed Sender).
pub sealed_sender: bool,
}
impl NodeServiceImpl {
pub fn new(
store: Arc<dyn Store>,
waiters: Arc<DashMap<Vec<u8>, Arc<Notify>>>,
auth_cfg: Arc<AuthConfig>,
opaque_setup: Arc<ServerSetup<OpaqueSuite>>,
pending_logins: Arc<DashMap<String, PendingLogin>>,
sessions: Arc<DashMap<Vec<u8>, SessionInfo>>,
rate_limits: Arc<DashMap<Vec<u8>, RateEntry>>,
sealed_sender: bool,
) -> Self {
Self {
store,
waiters,
auth_cfg,
opaque_setup,
pending_logins,
sessions,
rate_limits,
sealed_sender,
}
}
}
/// Serves a single inbound QUIC client.
///
/// Completes the QUIC handshake, accepts exactly one bidirectional stream,
/// and runs a two-party Cap'n Proto RPC session over it until the peer
/// disconnects or the RPC system fails. Each connection gets its own
/// `NodeServiceImpl` built from the shared state handles.
pub async fn handle_node_connection(
connecting: quinn::Connecting,
store: Arc<dyn Store>,
waiters: Arc<DashMap<Vec<u8>, Arc<Notify>>>,
auth_cfg: Arc<AuthConfig>,
opaque_setup: Arc<ServerSetup<OpaqueSuite>>,
pending_logins: Arc<DashMap<String, PendingLogin>>,
sessions: Arc<DashMap<Vec<u8>, SessionInfo>>,
rate_limits: Arc<DashMap<Vec<u8>, RateEntry>>,
sealed_sender: bool,
) -> Result<(), anyhow::Error> {
let connection = connecting.await?;
tracing::info!(peer = %connection.remote_address(), "QUIC connected");
// Single RPC session per connection: only the first bi stream is used.
let (send, recv) = connection
.accept_bi()
.await
.map_err(|e| anyhow::anyhow!("failed to accept bi stream: {e}"))?;
// Adapt tokio AsyncRead/AsyncWrite to the futures-io traits capnp-rpc expects.
let (reader, writer) = (recv.compat(), send.compat_write());
// This process is the server side of the two-party vat network.
let network = capnp_rpc::twoparty::VatNetwork::new(
reader,
writer,
capnp_rpc::rpc_twoparty_capnp::Side::Server,
Default::default(),
);
// Export the service as the bootstrap capability for the peer.
let service: node_service::Client = capnp_rpc::new_client(NodeServiceImpl::new(
store,
waiters,
auth_cfg,
opaque_setup,
pending_logins,
sessions,
rate_limits,
sealed_sender,
));
// Drives the RPC session to completion; resolves when the peer disconnects.
RpcSystem::new(Box::new(network), Some(service.client))
.await
.map_err(|e| anyhow::anyhow!("NodeService RPC error: {e}"))
}
/// Queued messages older than this are garbage collected.
const MESSAGE_TTL_SECS: u64 = 7 * 24 * 60 * 60; // 7 days
/// Spawns the background janitor: once a minute it drops expired sessions,
/// stale pending OPAQUE logins and old rate-limit windows, and asks the store
/// to garbage-collect messages older than [`MESSAGE_TTL_SECS`].
pub fn spawn_cleanup_task(
    sessions: Arc<DashMap<Vec<u8>, SessionInfo>>,
    pending_logins: Arc<DashMap<String, PendingLogin>>,
    rate_limits: Arc<DashMap<Vec<u8>, RateEntry>>,
    store: Arc<dyn Store>,
) {
    tokio::spawn(async move {
        let mut interval = tokio::time::interval(Duration::from_secs(60));
        loop {
            interval.tick().await;
            let now = current_timestamp();
            sessions.retain(|_, info| info.expires_at > now);
            // saturating_sub guards against clock regressions: an entry whose
            // timestamp is ahead of `now` computes age 0 (kept) instead of
            // panicking/wrapping on unsigned underflow as `now - ts` would.
            pending_logins.retain(|_, pl| now.saturating_sub(pl.created_at) < PENDING_LOGIN_TTL_SECS);
            rate_limits.retain(|_, entry| now.saturating_sub(entry.window_start) < RATE_LIMIT_WINDOW_SECS * 2);
            match store.gc_expired_messages(MESSAGE_TTL_SECS) {
                Ok(n) if n > 0 => {
                    tracing::debug!(expired = n, "garbage collected expired messages")
                }
                Err(e) => tracing::warn!(error = %e, "message GC failed"),
                _ => {}
            }
        }
    });
}

View File

@@ -0,0 +1,118 @@
use capnp::capability::Promise;
use quicnprotochat_proto::node_capnp::node_service;
use crate::auth::{
coded_error, fmt_hex, require_identity_or_request, validate_auth, validate_auth_context,
};
use crate::error_codes::*;
use crate::storage::StorageError;
use super::NodeServiceImpl;
/// Wraps a storage-layer failure in the wire-level coded error E009.
fn storage_err(err: StorageError) -> capnp::Error {
    coded_error(E009_STORAGE_ERROR, err)
}
impl NodeServiceImpl {
    /// health RPC: unauthenticated liveness probe; unconditionally reports "ok".
    pub fn handle_health(
        &mut self,
        _params: node_service::HealthParams,
        mut results: node_service::HealthResults,
    ) -> Promise<(), capnp::Error> {
        results.get().set_status("ok");
        Promise::ok(())
    }

    /// publishEndpoint RPC: stores a P2P node address for `identityKey` so
    /// peers can later resolve it. The caller must be bound to that identity.
    pub fn handle_publish_endpoint(
        &mut self,
        params: node_service::PublishEndpointParams,
        _results: node_service::PublishEndpointResults,
    ) -> Promise<(), capnp::Error> {
        let req = match params.get() {
            Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)),
            Ok(r) => r,
        };
        let identity = match req.get_identity_key() {
            Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)),
            Ok(bytes) => bytes.to_vec(),
        };
        let addr = match req.get_node_addr() {
            Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)),
            Ok(bytes) => bytes.to_vec(),
        };
        let auth_ctx = match validate_auth_context(&self.auth_cfg, &self.sessions, req.get_auth()) {
            Err(e) => return Promise::err(e),
            Ok(ctx) => ctx,
        };
        if identity.len() != 32 {
            return Promise::err(coded_error(
                E004_IDENTITY_KEY_LENGTH,
                format!("identityKey must be exactly 32 bytes, got {}", identity.len()),
            ));
        }
        if let Err(e) = require_identity_or_request(
            &auth_ctx,
            &identity,
            self.auth_cfg.allow_insecure_identity_from_request,
        ) {
            return Promise::err(e);
        }
        match self.store.publish_endpoint(&identity, addr).map_err(storage_err) {
            Err(e) => Promise::err(e),
            Ok(_) => {
                tracing::debug!(identity = %fmt_hex(&identity[..4]), "endpoint published");
                Promise::ok(())
            }
        }
    }

    /// resolveEndpoint RPC: looks up the published P2P address for an
    /// identity. Any authenticated caller may resolve any identity; absence
    /// yields an empty address rather than an error.
    pub fn handle_resolve_endpoint(
        &mut self,
        params: node_service::ResolveEndpointParams,
        mut results: node_service::ResolveEndpointResults,
    ) -> Promise<(), capnp::Error> {
        let req = match params.get() {
            Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)),
            Ok(r) => r,
        };
        let identity = match req.get_identity_key() {
            Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)),
            Ok(bytes) => bytes.to_vec(),
        };
        if let Err(e) = validate_auth(&self.auth_cfg, &self.sessions, req.get_auth()) {
            return Promise::err(e);
        }
        if identity.len() != 32 {
            return Promise::err(coded_error(
                E004_IDENTITY_KEY_LENGTH,
                format!("identityKey must be exactly 32 bytes, got {}", identity.len()),
            ));
        }
        let endpoint = match self.store.resolve_endpoint(&identity).map_err(storage_err) {
            Err(e) => return Promise::err(e),
            Ok(found) => found,
        };
        match endpoint {
            Some(addr) => {
                tracing::debug!(identity = %fmt_hex(&identity[..4]), "endpoint resolved");
                results.get().set_node_addr(&addr);
            }
            None => results.get().set_node_addr(&[]),
        }
        Promise::ok(())
    }
}

View File

@@ -7,6 +7,33 @@ use rusqlite::{params, Connection};
use crate::storage::{StorageError, Store};
/// Schema version after introducing the migration runner (existing DBs had 1).
const SCHEMA_VERSION: i32 = 3;
/// Migrations: (migration_number, SQL). Files named NNN_name.sql, applied in order when N > user_version.
///
/// Note: the migration number tracks the *resulting* schema version, which is
/// why `002_add_seq.sql` is registered as number 3 (version 2 was the
/// pre-runner on-disk layout).
const MIGRATIONS: &[(i32, &str)] = &[
    (1, include_str!("../migrations/001_initial.sql")),
    (3, include_str!("../migrations/002_add_seq.sql")),
];

/// Runs pending migrations on an open connection: applies any migration whose
/// number is greater than the current PRAGMA user_version, stamping
/// user_version after each successful step so a failure midway does not force
/// already-applied migrations to rerun, then settles on SCHEMA_VERSION.
fn run_migrations(conn: &Connection) -> Result<(), StorageError> {
    let current_version: i32 = conn
        .pragma_query_value(None, "user_version", |row| row.get(0))
        .map_err(|e| StorageError::Db(format!("PRAGMA user_version failed: {e}")))?;
    for (migration_num, sql) in MIGRATIONS {
        if *migration_num > current_version {
            conn.execute_batch(sql)
                .map_err(|e| StorageError::Db(format!("migration {migration_num} failed: {e}")))?;
            // Record progress immediately: if a later migration fails, this
            // one will not be re-applied on the next startup.
            conn.pragma_update(None, "user_version", *migration_num)
                .map_err(|e| StorageError::Db(format!("set user_version failed: {e}")))?;
        }
    }
    // Settle on the canonical version even when no migration ran.
    conn.pragma_update(None, "user_version", SCHEMA_VERSION)
        .map_err(|e| StorageError::Db(format!("set user_version failed: {e}")))?;
    Ok(())
}
/// SQLCipher-encrypted storage backend.
pub struct SqlStore {
conn: Mutex<Connection>,
@@ -34,66 +61,21 @@ impl SqlStore {
)
.map_err(|e| StorageError::Db(e.to_string()))?;
let store = Self {
let current_version: i32 = conn
.pragma_query_value(None, "user_version", |row| row.get(0))
.map_err(|e| StorageError::Db(format!("PRAGMA user_version failed: {e}")))?;
if current_version > SCHEMA_VERSION {
return Err(StorageError::Db(format!(
"database schema version {current_version} is newer than supported {SCHEMA_VERSION}"
)));
}
run_migrations(&conn)?;
Ok(Self {
conn: Mutex::new(conn),
};
store.migrate()?;
Ok(store)
}
fn migrate(&self) -> Result<(), StorageError> {
let conn = self.lock_conn()?;
conn.execute_batch(
"CREATE TABLE IF NOT EXISTS key_packages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
identity_key BLOB NOT NULL,
package_data BLOB NOT NULL,
created_at INTEGER DEFAULT (strftime('%s','now'))
);
CREATE TABLE IF NOT EXISTS deliveries (
id INTEGER PRIMARY KEY AUTOINCREMENT,
recipient_key BLOB NOT NULL,
channel_id BLOB NOT NULL DEFAULT X'',
payload BLOB NOT NULL,
created_at INTEGER DEFAULT (strftime('%s','now'))
);
CREATE TABLE IF NOT EXISTS hybrid_keys (
identity_key BLOB PRIMARY KEY,
hybrid_public_key BLOB NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_kp_identity
ON key_packages(identity_key);
CREATE INDEX IF NOT EXISTS idx_del_recipient_channel
ON deliveries(recipient_key, channel_id);
CREATE TABLE IF NOT EXISTS server_setup (
id INTEGER PRIMARY KEY CHECK (id = 1),
setup_data BLOB NOT NULL
);
CREATE TABLE IF NOT EXISTS users (
username TEXT PRIMARY KEY,
opaque_record BLOB NOT NULL,
created_at INTEGER DEFAULT (strftime('%s','now'))
);
CREATE TABLE IF NOT EXISTS user_identity_keys (
username TEXT PRIMARY KEY,
identity_key BLOB NOT NULL
);
CREATE TABLE IF NOT EXISTS endpoints (
identity_key BLOB PRIMARY KEY,
node_addr BLOB NOT NULL,
updated_at INTEGER DEFAULT (strftime('%s','now'))
);",
)
.map_err(|e| StorageError::Db(e.to_string()))?;
Ok(())
})
}
}
@@ -146,37 +128,53 @@ impl Store for SqlStore {
recipient_key: &[u8],
channel_id: &[u8],
payload: Vec<u8>,
) -> Result<(), StorageError> {
) -> Result<u64, StorageError> {
let conn = self.lock_conn()?;
// Atomically get-and-increment the per-inbox sequence counter.
// RETURNING gives us the post-update next_seq; the assigned seq is next_seq - 1.
let seq: i64 = conn
.query_row(
"INSERT INTO delivery_seq_counters (recipient_key, channel_id, next_seq)
VALUES (?1, ?2, 1)
ON CONFLICT(recipient_key, channel_id) DO UPDATE SET next_seq = next_seq + 1
RETURNING next_seq - 1",
params![recipient_key, channel_id],
|row| row.get(0),
)
.map_err(|e| StorageError::Db(e.to_string()))?;
conn.execute(
"INSERT INTO deliveries (recipient_key, channel_id, payload) VALUES (?1, ?2, ?3)",
params![recipient_key, channel_id, payload],
"INSERT INTO deliveries (recipient_key, channel_id, seq, payload) VALUES (?1, ?2, ?3, ?4)",
params![recipient_key, channel_id, seq, payload],
)
.map_err(|e| StorageError::Db(e.to_string()))?;
Ok(())
Ok(seq as u64)
}
fn fetch(&self, recipient_key: &[u8], channel_id: &[u8]) -> Result<Vec<Vec<u8>>, StorageError> {
fn fetch(
&self,
recipient_key: &[u8],
channel_id: &[u8],
) -> Result<Vec<(u64, Vec<u8>)>, StorageError> {
let conn = self.lock_conn()?;
let mut stmt = conn
.prepare(
"SELECT id, payload FROM deliveries
"SELECT id, seq, payload FROM deliveries
WHERE recipient_key = ?1 AND channel_id = ?2
ORDER BY id ASC",
ORDER BY seq ASC",
)
.map_err(|e| StorageError::Db(e.to_string()))?;
let rows: Vec<(i64, Vec<u8>)> = stmt
let rows: Vec<(i64, i64, Vec<u8>)> = stmt
.query_map(params![recipient_key, channel_id], |row| {
Ok((row.get(0)?, row.get(1)?))
Ok((row.get(0)?, row.get(1)?, row.get(2)?))
})
.map_err(|e| StorageError::Db(e.to_string()))?
.collect::<Result<Vec<_>, _>>()
.map_err(|e| StorageError::Db(e.to_string()))?;
if !rows.is_empty() {
let ids: Vec<i64> = rows.iter().map(|(id, _)| *id).collect();
let ids: Vec<i64> = rows.iter().map(|(id, _, _)| *id).collect();
let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
let sql = format!("DELETE FROM deliveries WHERE id IN ({placeholders})");
let params: Vec<&dyn rusqlite::types::ToSql> = ids
@@ -187,7 +185,7 @@ impl Store for SqlStore {
.map_err(|e| StorageError::Db(e.to_string()))?;
}
Ok(rows.into_iter().map(|(_, payload)| payload).collect())
Ok(rows.into_iter().map(|(_, seq, payload)| (seq as u64, payload)).collect())
}
fn fetch_limited(
@@ -195,28 +193,28 @@ impl Store for SqlStore {
recipient_key: &[u8],
channel_id: &[u8],
limit: usize,
) -> Result<Vec<Vec<u8>>, StorageError> {
) -> Result<Vec<(u64, Vec<u8>)>, StorageError> {
let conn = self.lock_conn()?;
let mut stmt = conn
.prepare(
"SELECT id, payload FROM deliveries
"SELECT id, seq, payload FROM deliveries
WHERE recipient_key = ?1 AND channel_id = ?2
ORDER BY id ASC
ORDER BY seq ASC
LIMIT ?3",
)
.map_err(|e| StorageError::Db(e.to_string()))?;
let rows: Vec<(i64, Vec<u8>)> = stmt
let rows: Vec<(i64, i64, Vec<u8>)> = stmt
.query_map(params![recipient_key, channel_id, limit as i64], |row| {
Ok((row.get(0)?, row.get(1)?))
Ok((row.get(0)?, row.get(1)?, row.get(2)?))
})
.map_err(|e| StorageError::Db(e.to_string()))?
.collect::<Result<Vec<_>, _>>()
.map_err(|e| StorageError::Db(e.to_string()))?;
if !rows.is_empty() {
let ids: Vec<i64> = rows.iter().map(|(id, _)| *id).collect();
let ids: Vec<i64> = rows.iter().map(|(id, _, _)| *id).collect();
let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
let sql = format!("DELETE FROM deliveries WHERE id IN ({placeholders})");
let params: Vec<&dyn rusqlite::types::ToSql> = ids
@@ -227,7 +225,7 @@ impl Store for SqlStore {
.map_err(|e| StorageError::Db(e.to_string()))?;
}
Ok(rows.into_iter().map(|(_, payload)| payload).collect())
Ok(rows.into_iter().map(|(_, seq, payload)| (seq as u64, payload)).collect())
}
fn queue_depth(&self, recipient_key: &[u8], channel_id: &[u8]) -> Result<usize, StorageError> {
@@ -406,16 +404,34 @@ impl<T> OptionalExt<T> for Result<T, rusqlite::Error> {
#[cfg(test)]
mod tests {
use super::*;
use std::path::PathBuf;
fn open_in_memory() -> SqlStore {
SqlStore::open(":memory:", "").unwrap()
}
#[test]
fn sets_user_version_after_migrate() {
let dir = tempfile::tempdir().expect("tempdir");
let db_path: PathBuf = dir.path().join("store.db");
{
let store = SqlStore::open(&db_path, "").expect("open store");
let _guard = store.lock_conn().unwrap();
}
let conn = rusqlite::Connection::open(&db_path).expect("reopen db");
let version: i32 = conn
.pragma_query_value(None, "user_version", |row| row.get(0))
.expect("read user_version");
assert_eq!(version, SCHEMA_VERSION);
}
#[test]
fn key_package_fifo() {
let store = open_in_memory();
let mut identity = [0u8; 32];
identity[..31].copy_from_slice(b"alice_identity_key__32bytes_lon");
let identity = [1u8; 32];
store
.upload_key_package(&identity, b"kp1".to_vec())
@@ -441,11 +457,13 @@ mod tests {
let rk = [1u8; 32];
let ch = b"channel-1";
store.enqueue(&rk, ch, b"msg1".to_vec()).unwrap();
store.enqueue(&rk, ch, b"msg2".to_vec()).unwrap();
let seq0 = store.enqueue(&rk, ch, b"msg1".to_vec()).unwrap();
let seq1 = store.enqueue(&rk, ch, b"msg2".to_vec()).unwrap();
assert_eq!(seq0, 0);
assert_eq!(seq1, 1);
let msgs = store.fetch(&rk, ch).unwrap();
assert_eq!(msgs, vec![b"msg1".to_vec(), b"msg2".to_vec()]);
assert_eq!(msgs, vec![(0u64, b"msg1".to_vec()), (1u64, b"msg2".to_vec())]);
assert!(store.fetch(&rk, ch).unwrap().is_empty());
}
@@ -461,10 +479,10 @@ mod tests {
store.enqueue(&rk, ch, b"c".to_vec()).unwrap();
let msgs = store.fetch_limited(&rk, ch, 2).unwrap();
assert_eq!(msgs, vec![b"a".to_vec(), b"b".to_vec()]);
assert_eq!(msgs, vec![(0u64, b"a".to_vec()), (1u64, b"b".to_vec())]);
let remaining = store.fetch(&rk, ch).unwrap();
assert_eq!(remaining, vec![b"c".to_vec()]);
assert_eq!(remaining, vec![(2u64, b"c".to_vec())]);
}
#[test]
@@ -482,23 +500,23 @@ mod tests {
#[test]
fn has_user_record_check() {
let store = open_in_memory();
assert!(!store.has_user_record("alice").unwrap());
assert!(!store.has_user_record("user1").unwrap());
store
.store_user_record("alice", b"record".to_vec())
.store_user_record("user1", b"record".to_vec())
.unwrap();
assert!(store.has_user_record("alice").unwrap());
assert!(!store.has_user_record("bob").unwrap());
assert!(store.has_user_record("user1").unwrap());
assert!(!store.has_user_record("user2").unwrap());
}
#[test]
fn user_identity_key_round_trip() {
let store = open_in_memory();
assert!(store.get_user_identity_key("alice").unwrap().is_none());
assert!(store.get_user_identity_key("user1").unwrap().is_none());
store
.store_user_identity_key("alice", vec![1u8; 32])
.store_user_identity_key("user1", vec![1u8; 32])
.unwrap();
assert_eq!(
store.get_user_identity_key("alice").unwrap(),
store.get_user_identity_key("user1").unwrap(),
Some(vec![1u8; 32])
);
}
@@ -522,9 +540,9 @@ mod tests {
store.enqueue(&rk, b"ch-b", b"b1".to_vec()).unwrap();
let a_msgs = store.fetch(&rk, b"ch-a").unwrap();
assert_eq!(a_msgs, vec![b"a1".to_vec()]);
assert_eq!(a_msgs, vec![(0u64, b"a1".to_vec())]);
let b_msgs = store.fetch(&rk, b"ch-b").unwrap();
assert_eq!(b_msgs, vec![b"b1".to_vec()]);
assert_eq!(b_msgs, vec![(0u64, b"b1".to_vec())]);
}
}

View File

@@ -32,22 +32,30 @@ pub trait Store: Send + Sync {
fn fetch_key_package(&self, identity_key: &[u8]) -> Result<Option<Vec<u8>>, StorageError>;
/// Enqueue a payload and return the monotonically increasing per-inbox sequence number
/// assigned to this message. Clients sort by seq before MLS processing.
fn enqueue(
&self,
recipient_key: &[u8],
channel_id: &[u8],
payload: Vec<u8>,
) -> Result<(), StorageError>;
) -> Result<u64, StorageError>;
fn fetch(&self, recipient_key: &[u8], channel_id: &[u8]) -> Result<Vec<Vec<u8>>, StorageError>;
/// Fetch and drain all queued messages, returning `(seq, payload)` pairs ordered by seq.
fn fetch(
&self,
recipient_key: &[u8],
channel_id: &[u8],
) -> Result<Vec<(u64, Vec<u8>)>, StorageError>;
/// Fetch up to `limit` messages without draining the entire queue (Fix 8).
/// Returns `(seq, payload)` pairs ordered by seq.
fn fetch_limited(
&self,
recipient_key: &[u8],
channel_id: &[u8],
limit: usize,
) -> Result<Vec<Vec<u8>>, StorageError>;
) -> Result<Vec<(u64, Vec<u8>)>, StorageError>;
/// Return the number of queued messages for (recipient, channel) (Fix 7).
fn queue_depth(&self, recipient_key: &[u8], channel_id: &[u8]) -> Result<usize, StorageError>;
@@ -123,6 +131,19 @@ struct QueueMapV2 {
map: HashMap<ChannelKey, VecDeque<Vec<u8>>>,
}
/// One queued delivery: the opaque payload plus the per-inbox sequence number
/// assigned at enqueue time.
///
/// NOTE(review): instances are persisted via bincode, which encodes fields in
/// declaration order — do not reorder or remove fields without a new on-disk
/// format version.
#[derive(Serialize, Deserialize, Default, Clone)]
struct SeqEntry {
// Monotonic per-(recipient, channel) sequence number.
seq: u64,
// Opaque message payload bytes.
data: Vec<u8>,
}
/// V3 delivery store: each queue entry carries a monotonic per-inbox sequence number.
#[derive(Serialize, Deserialize, Default)]
struct QueueMapV3 {
// Queued entries per (recipient, channel) inbox.
map: HashMap<ChannelKey, VecDeque<SeqEntry>>,
// Next sequence number to hand out for each inbox; kept separate from the
// queue so seqs keep increasing even after a drain empties it.
next_seq: HashMap<ChannelKey, u64>,
}
/// File-backed storage for KeyPackages and delivery queues.
///
/// Each mutation flushes the entire map to disk. Suitable for MVP-scale loads.
@@ -134,7 +155,7 @@ pub struct FileBackedStore {
users_path: PathBuf,
identity_keys_path: PathBuf,
key_packages: Mutex<HashMap<Vec<u8>, VecDeque<Vec<u8>>>>,
deliveries: Mutex<HashMap<ChannelKey, VecDeque<Vec<u8>>>>,
deliveries: Mutex<QueueMapV3>,
hybrid_keys: Mutex<HashMap<Vec<u8>, Vec<u8>>>,
users: Mutex<HashMap<String, Vec<u8>>>,
identity_keys: Mutex<HashMap<String, Vec<u8>>>,
@@ -155,7 +176,7 @@ impl FileBackedStore {
let identity_keys_path = dir.join("identity_keys.bin");
let key_packages = Mutex::new(Self::load_kp_map(&kp_path)?);
let deliveries = Mutex::new(Self::load_delivery_map(&ds_path)?);
let deliveries = Mutex::new(Self::load_delivery_map_v3(&ds_path)?);
let hybrid_keys = Mutex::new(Self::load_hybrid_keys(&hk_path)?);
let users = Mutex::new(Self::load_users(&users_path)?);
let identity_keys = Mutex::new(Self::load_map_string_bytes(&identity_keys_path)?);
@@ -201,28 +222,38 @@ impl FileBackedStore {
fs::write(path, bytes).map_err(|e| StorageError::Io(e.to_string()))
}
fn load_delivery_map(
path: &Path,
) -> Result<HashMap<ChannelKey, VecDeque<Vec<u8>>>, StorageError> {
/// Load deliveries as V3. Falls back to V2 format (assigns seqs starting at 0).
fn load_delivery_map_v3(path: &Path) -> Result<QueueMapV3, StorageError> {
if !path.exists() {
return Ok(HashMap::new());
return Ok(QueueMapV3::default());
}
let bytes = fs::read(path).map_err(|e| StorageError::Io(e.to_string()))?;
if bytes.is_empty() {
return Ok(HashMap::new());
return Ok(QueueMapV3::default());
}
bincode::deserialize::<QueueMapV2>(&bytes)
.map(|v| v.map)
.map_err(|_| StorageError::Io("deliveries file: v1 format no longer supported; delete or migrate".into()))
// Try V3 first.
if let Ok(v3) = bincode::deserialize::<QueueMapV3>(&bytes) {
return Ok(v3);
}
// Fall back to V2: assign ascending seqs starting at 0 per channel.
let v2 = bincode::deserialize::<QueueMapV2>(&bytes)
.map_err(|_| StorageError::Io("deliveries file: unrecognised format".into()))?;
let mut v3 = QueueMapV3::default();
for (key, queue) in v2.map {
let entries: VecDeque<SeqEntry> = queue
.into_iter()
.enumerate()
.map(|(i, data)| SeqEntry { seq: i as u64, data })
.collect();
let next = entries.len() as u64;
v3.next_seq.insert(key.clone(), next);
v3.map.insert(key, entries);
}
Ok(v3)
}
fn flush_delivery_map(
&self,
path: &Path,
map: &HashMap<ChannelKey, VecDeque<Vec<u8>>>,
) -> Result<(), StorageError> {
let payload = QueueMapV2 { map: map.clone() };
let bytes = bincode::serialize(&payload).map_err(|_| StorageError::Serde)?;
fn flush_delivery_map(&self, path: &Path, map: &QueueMapV3) -> Result<(), StorageError> {
let bytes = bincode::serialize(map).map_err(|_| StorageError::Serde)?;
if let Some(parent) = path.parent() {
fs::create_dir_all(parent).map_err(|e| StorageError::Io(e.to_string()))?;
}
@@ -309,27 +340,35 @@ impl Store for FileBackedStore {
recipient_key: &[u8],
channel_id: &[u8],
payload: Vec<u8>,
) -> Result<(), StorageError> {
let mut map = lock(&self.deliveries)?;
) -> Result<u64, StorageError> {
let mut inner = lock(&self.deliveries)?;
let key = ChannelKey {
channel_id: channel_id.to_vec(),
recipient_key: recipient_key.to_vec(),
};
map.entry(key).or_default().push_back(payload);
self.flush_delivery_map(&self.ds_path, &*map)
let seq = *inner.next_seq.entry(key.clone()).or_insert(0);
*inner.next_seq.get_mut(&key).unwrap() = seq + 1;
inner.map.entry(key).or_default().push_back(SeqEntry { seq, data: payload });
self.flush_delivery_map(&self.ds_path, &*inner)?;
Ok(seq)
}
fn fetch(&self, recipient_key: &[u8], channel_id: &[u8]) -> Result<Vec<Vec<u8>>, StorageError> {
let mut map = lock(&self.deliveries)?;
fn fetch(
&self,
recipient_key: &[u8],
channel_id: &[u8],
) -> Result<Vec<(u64, Vec<u8>)>, StorageError> {
let mut inner = lock(&self.deliveries)?;
let key = ChannelKey {
channel_id: channel_id.to_vec(),
recipient_key: recipient_key.to_vec(),
};
let messages = map
let messages: Vec<(u64, Vec<u8>)> = inner
.map
.get_mut(&key)
.map(|q| q.drain(..).collect())
.map(|q| q.drain(..).map(|e| (e.seq, e.data)).collect())
.unwrap_or_default();
self.flush_delivery_map(&self.ds_path, &*map)?;
self.flush_delivery_map(&self.ds_path, &*inner)?;
Ok(messages)
}
@@ -338,30 +377,31 @@ impl Store for FileBackedStore {
recipient_key: &[u8],
channel_id: &[u8],
limit: usize,
) -> Result<Vec<Vec<u8>>, StorageError> {
let mut map = lock(&self.deliveries)?;
) -> Result<Vec<(u64, Vec<u8>)>, StorageError> {
let mut inner = lock(&self.deliveries)?;
let key = ChannelKey {
channel_id: channel_id.to_vec(),
recipient_key: recipient_key.to_vec(),
};
let messages = map
let messages: Vec<(u64, Vec<u8>)> = inner
.map
.get_mut(&key)
.map(|q| {
let count = limit.min(q.len());
q.drain(..count).collect()
q.drain(..count).map(|e| (e.seq, e.data)).collect()
})
.unwrap_or_default();
self.flush_delivery_map(&self.ds_path, &*map)?;
self.flush_delivery_map(&self.ds_path, &*inner)?;
Ok(messages)
}
fn queue_depth(&self, recipient_key: &[u8], channel_id: &[u8]) -> Result<usize, StorageError> {
let map = lock(&self.deliveries)?;
let inner = lock(&self.deliveries)?;
let key = ChannelKey {
channel_id: channel_id.to_vec(),
recipient_key: recipient_key.to_vec(),
};
Ok(map.get(&key).map(|q| q.len()).unwrap_or(0))
Ok(inner.map.get(&key).map(|q| q.len()).unwrap_or(0))
}
fn gc_expired_messages(&self, _max_age_secs: u64) -> Result<usize, StorageError> {

View File

@@ -0,0 +1,72 @@
use std::path::{Path, PathBuf};

use anyhow::Context;
use quinn::ServerConfig;
use quinn_proto::crypto::rustls::QuicServerConfig;
use rcgen::generate_simple_self_signed;
use rustls::pki_types::{CertificateDer, PrivateKeyDer};
use rustls::version::TLS13;
/// Ensure a self-signed certificate exists on disk and return a QUIC server config.
/// When `production` is true, cert and key must already exist (no auto-generation).
///
/// The resulting config speaks TLS 1.3 only, requires no client certificates,
/// and advertises the `capnp` ALPN protocol.
pub fn build_server_config(
    cert_path: &PathBuf,
    key_path: &PathBuf,
    production: bool,
) -> anyhow::Result<ServerConfig> {
    let have_material = cert_path.exists() && key_path.exists();
    if !have_material {
        // In production, missing material is a hard error rather than an
        // invitation to mint an untrusted self-signed cert.
        anyhow::ensure!(
            !production,
            "TLS cert or key missing at {:?} / {:?}; production mode forbids auto-generation",
            cert_path,
            key_path
        );
        generate_self_signed_cert(cert_path, key_path)?;
    }
    let cert_der = CertificateDer::from(std::fs::read(cert_path).context("read cert")?);
    let key_der = PrivateKeyDer::try_from(std::fs::read(key_path).context("read key")?)
        .map_err(|_| anyhow::anyhow!("invalid key"))?;
    let mut tls = rustls::ServerConfig::builder_with_protocol_versions(&[&TLS13])
        .with_no_client_auth()
        .with_single_cert(vec![cert_der], key_der)?;
    tls.alpn_protocols = vec![b"capnp".to_vec()];
    let crypto = QuicServerConfig::try_from(tls)
        .map_err(|e| anyhow::anyhow!("invalid server TLS config: {e}"))?;
    Ok(ServerConfig::with_crypto(std::sync::Arc::new(crypto)))
}
fn generate_self_signed_cert(cert_path: &PathBuf, key_path: &PathBuf) -> anyhow::Result<()> {
if let Some(parent) = cert_path.parent() {
std::fs::create_dir_all(parent).context("create cert dir")?;
}
if let Some(parent) = key_path.parent() {
std::fs::create_dir_all(parent).context("create key dir")?;
}
let subject_alt_names = vec![
"localhost".to_string(),
"127.0.0.1".to_string(),
"::1".to_string(),
];
let issued = generate_simple_self_signed(subject_alt_names)?;
let key_der = issued.key_pair.serialize_der();
std::fs::write(cert_path, issued.cert.der()).context("write cert")?;
std::fs::write(key_path, &key_der).context("write key")?;
tracing::info!(
cert = %cert_path.display(),
key = %key_path.display(),
"generated self-signed TLS certificate"
);
Ok(())
}