feat: Phase 9 — developer experience, extensibility, and community growth
New crates: - quicproquo-bot: Bot SDK with polling API + JSON pipe mode - quicproquo-kt: Key Transparency Merkle log (RFC 9162 subset) - quicproquo-plugin-api: no_std C-compatible plugin vtable API - quicproquo-gen: scaffolding tool (qpq-gen plugin/bot/rpc/hook) Server features: - ServerHooks trait wired into all RPC handlers (enqueue, fetch, auth, channel, registration) with plugin rejection support - Dynamic plugin loader (libloading) with --plugin-dir config - Delivery proof canary tokens (Ed25519 server signatures on enqueue) - Key Transparency Merkle log with inclusion proofs on resolveUser Core library: - Safety numbers (60-digit HMAC-SHA256 key verification codes) - Verifiable transcript archive (CBOR + ChaCha20-Poly1305 + hash chain) - Delivery proof verification utility - Criterion benchmarks (hybrid KEM, MLS, identity, sealed sender, padding) Client: - /verify REPL command for out-of-band key verification - Full-screen TUI via Ratatui (feature-gated --features tui) - qpq export / qpq export-verify CLI subcommands - KT inclusion proof verification on user resolution Also: ROADMAP Phase 9 added, bot SDK docs, server hooks docs, crate-responsibilities updated, example plugins (rate_limit, logging).
This commit is contained in:
@@ -31,6 +31,8 @@ pub struct FileConfig {
|
||||
#[serde(default)]
|
||||
pub metrics_enabled: Option<bool>,
|
||||
pub federation: Option<FederationFileConfig>,
|
||||
/// Directory containing plugin `.so` / `.dylib` files to load at startup.
|
||||
pub plugin_dir: Option<PathBuf>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -51,6 +53,8 @@ pub struct EffectiveConfig {
|
||||
/// Start metrics server only when true and metrics_listen is set.
|
||||
pub metrics_enabled: bool,
|
||||
pub federation: Option<EffectiveFederationConfig>,
|
||||
/// Directory to scan for plugin `.so` / `.dylib` files at startup. None = no plugins.
|
||||
pub plugin_dir: Option<PathBuf>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, Deserialize)]
|
||||
@@ -214,6 +218,8 @@ pub fn merge_config(args: &crate::Args, file: &FileConfig) -> EffectiveConfig {
|
||||
}
|
||||
};
|
||||
|
||||
let plugin_dir = args.plugin_dir.clone().or_else(|| file.plugin_dir.clone());
|
||||
|
||||
EffectiveConfig {
|
||||
listen,
|
||||
data_dir,
|
||||
@@ -228,6 +234,7 @@ pub fn merge_config(args: &crate::Args, file: &FileConfig) -> EffectiveConfig {
|
||||
metrics_listen,
|
||||
metrics_enabled,
|
||||
federation,
|
||||
plugin_dir,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
198
crates/quicproquo-server/src/hooks.rs
Normal file
198
crates/quicproquo-server/src/hooks.rs
Normal file
@@ -0,0 +1,198 @@
|
||||
//! Server-side plugin hooks for extending quicproquo.
|
||||
//!
|
||||
//! Implement the [`ServerHooks`] trait to intercept server events — message delivery,
|
||||
//! authentication, channel creation, and more. Hooks fire after validation but before
|
||||
//! storage, so they can inspect, log, or reject operations.
|
||||
//!
|
||||
//! # Built-in implementations
|
||||
//!
|
||||
//! - [`NoopHooks`] — does nothing (default when no hooks are configured)
|
||||
//! - [`TracingHooks`] — logs all events via `tracing` at info/debug level
|
||||
//!
|
||||
//! # Writing a custom hook
|
||||
//!
|
||||
//! ```rust,ignore
|
||||
//! use quicproquo_server::hooks::{ServerHooks, HookAction, MessageEvent};
|
||||
//!
|
||||
//! struct ModeratorHook {
|
||||
//! banned_words: Vec<String>,
|
||||
//! }
|
||||
//!
|
||||
//! impl ServerHooks for ModeratorHook {
|
||||
//! fn on_message_enqueue(&self, event: &MessageEvent) -> HookAction {
|
||||
//! // Can't inspect encrypted content (E2E), but can enforce rate limits,
|
||||
//! // payload size limits, or sender restrictions.
|
||||
//! if event.payload_len > 1_000_000 {
|
||||
//! return HookAction::Reject("payload too large".into());
|
||||
//! }
|
||||
//! HookAction::Continue
|
||||
//! }
|
||||
//! }
|
||||
//! ```
|
||||
|
||||
/// The result of a hook invocation.
|
||||
#[derive(Clone, Debug)]
|
||||
pub enum HookAction {
|
||||
/// Allow the operation to proceed.
|
||||
Continue,
|
||||
/// Reject the operation with a reason (returned to the client as an error).
|
||||
Reject(String),
|
||||
}
|
||||
|
||||
/// Event data for message enqueue operations.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct MessageEvent {
|
||||
/// Sender's identity key (32 bytes), if known (None in sealed sender mode).
|
||||
pub sender_identity: Option<Vec<u8>>,
|
||||
/// Recipient's identity key (32 bytes).
|
||||
pub recipient_key: Vec<u8>,
|
||||
/// Channel ID (16 bytes) if this is a DM channel message.
|
||||
pub channel_id: Vec<u8>,
|
||||
/// Length of the encrypted payload in bytes.
|
||||
pub payload_len: usize,
|
||||
/// Server-assigned sequence number.
|
||||
pub seq: u64,
|
||||
}
|
||||
|
||||
/// Event data for authentication operations.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct AuthEvent {
|
||||
/// The username attempting to authenticate.
|
||||
pub username: String,
|
||||
/// Whether the authentication succeeded.
|
||||
pub success: bool,
|
||||
/// Failure reason (empty on success).
|
||||
pub failure_reason: String,
|
||||
}
|
||||
|
||||
/// Event data for channel creation operations.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct ChannelEvent {
|
||||
/// The channel's unique ID (16 bytes).
|
||||
pub channel_id: Vec<u8>,
|
||||
/// Identity key of the initiator.
|
||||
pub initiator_key: Vec<u8>,
|
||||
/// Identity key of the peer.
|
||||
pub peer_key: Vec<u8>,
|
||||
/// True if this is a newly created channel (initiator creates the MLS group).
|
||||
pub was_new: bool,
|
||||
}
|
||||
|
||||
/// Event data for message fetch operations.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct FetchEvent {
|
||||
/// Identity key of the fetcher.
|
||||
pub recipient_key: Vec<u8>,
|
||||
/// Channel ID being fetched from.
|
||||
pub channel_id: Vec<u8>,
|
||||
/// Number of messages returned.
|
||||
pub message_count: usize,
|
||||
}
|
||||
|
||||
/// Trait for server-side plugin hooks.
|
||||
///
|
||||
/// All methods have default implementations that return [`HookAction::Continue`],
|
||||
/// so you only need to override the events you care about.
|
||||
///
|
||||
/// Hooks are called synchronously in the RPC handler path. Keep them fast —
|
||||
/// offload heavy work (HTTP calls, disk I/O) to background tasks.
|
||||
pub trait ServerHooks: Send + Sync {
|
||||
/// Called after validation, before a message is stored in the delivery queue.
|
||||
///
|
||||
/// Return `HookAction::Reject` to prevent delivery.
|
||||
fn on_message_enqueue(&self, _event: &MessageEvent) -> HookAction {
|
||||
HookAction::Continue
|
||||
}
|
||||
|
||||
/// Called after a batch of messages is enqueued.
|
||||
fn on_batch_enqueue(&self, _events: &[MessageEvent]) {
|
||||
// Default: no-op
|
||||
}
|
||||
|
||||
/// Called after a successful or failed login attempt.
|
||||
fn on_auth(&self, _event: &AuthEvent) {
|
||||
// Default: no-op
|
||||
}
|
||||
|
||||
/// Called after a channel is created or looked up.
|
||||
fn on_channel_created(&self, _event: &ChannelEvent) {
|
||||
// Default: no-op
|
||||
}
|
||||
|
||||
/// Called after messages are fetched from the delivery queue.
|
||||
fn on_fetch(&self, _event: &FetchEvent) {
|
||||
// Default: no-op
|
||||
}
|
||||
|
||||
/// Called when a user registers (OPAQUE registration complete).
|
||||
fn on_user_registered(&self, _username: &str, _identity_key: &[u8]) {
|
||||
// Default: no-op
|
||||
}
|
||||
}
|
||||
|
||||
/// No-op hook implementation (default).
|
||||
pub struct NoopHooks;
|
||||
|
||||
impl ServerHooks for NoopHooks {}
|
||||
|
||||
/// Hook implementation that logs all events via `tracing`.
|
||||
pub struct TracingHooks;
|
||||
|
||||
impl ServerHooks for TracingHooks {
|
||||
fn on_message_enqueue(&self, event: &MessageEvent) -> HookAction {
|
||||
tracing::info!(
|
||||
recipient_prefix = %hex_prefix(&event.recipient_key),
|
||||
payload_len = event.payload_len,
|
||||
seq = event.seq,
|
||||
has_sender = event.sender_identity.is_some(),
|
||||
"hook: message enqueued"
|
||||
);
|
||||
HookAction::Continue
|
||||
}
|
||||
|
||||
fn on_batch_enqueue(&self, events: &[MessageEvent]) {
|
||||
tracing::info!(
|
||||
count = events.len(),
|
||||
"hook: batch enqueue"
|
||||
);
|
||||
}
|
||||
|
||||
fn on_auth(&self, event: &AuthEvent) {
|
||||
if event.success {
|
||||
tracing::info!(username = %event.username, "hook: login success");
|
||||
} else {
|
||||
tracing::warn!(
|
||||
username = %event.username,
|
||||
reason = %event.failure_reason,
|
||||
"hook: login failure"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
fn on_channel_created(&self, event: &ChannelEvent) {
|
||||
tracing::info!(
|
||||
channel_id = %hex_prefix(&event.channel_id),
|
||||
was_new = event.was_new,
|
||||
"hook: channel created"
|
||||
);
|
||||
}
|
||||
|
||||
fn on_fetch(&self, event: &FetchEvent) {
|
||||
if event.message_count > 0 {
|
||||
tracing::debug!(
|
||||
recipient_prefix = %hex_prefix(&event.recipient_key),
|
||||
count = event.message_count,
|
||||
"hook: messages fetched"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
fn on_user_registered(&self, username: &str, _identity_key: &[u8]) {
|
||||
tracing::info!(username = %username, "hook: user registered");
|
||||
}
|
||||
}
|
||||
|
||||
fn hex_prefix(bytes: &[u8]) -> String {
|
||||
let n = bytes.len().min(4);
|
||||
hex::encode(&bytes[..n])
|
||||
}
|
||||
@@ -9,6 +9,7 @@ use clap::Parser;
|
||||
use dashmap::DashMap;
|
||||
use opaque_ke::ServerSetup;
|
||||
use quicproquo_core::opaque_auth::OpaqueSuite;
|
||||
use quicproquo_kt::MerkleLog;
|
||||
use quinn::Endpoint;
|
||||
use rand::rngs::OsRng;
|
||||
use tokio::sync::Notify;
|
||||
@@ -18,8 +19,10 @@ mod auth;
|
||||
mod config;
|
||||
mod error_codes;
|
||||
mod federation;
|
||||
pub mod hooks;
|
||||
mod metrics;
|
||||
mod node_service;
|
||||
mod plugin_loader;
|
||||
mod sql_store;
|
||||
mod tls;
|
||||
mod storage;
|
||||
@@ -106,6 +109,11 @@ struct Args {
|
||||
/// Federation QUIC listen address (default: 0.0.0.0:7001).
|
||||
#[arg(long, env = "QPQ_FEDERATION_LISTEN")]
|
||||
federation_listen: Option<String>,
|
||||
|
||||
/// Directory containing plugin `.so` / `.dylib` files to load at startup.
|
||||
/// Each library must export `extern "C" fn qpq_plugin_init(vtable: *mut HookVTable) -> i32`.
|
||||
#[arg(long, env = "QPQ_PLUGIN_DIR")]
|
||||
plugin_dir: Option<PathBuf>,
|
||||
}
|
||||
|
||||
// ── Entry point ───────────────────────────────────────────────────────────────
|
||||
@@ -237,6 +245,66 @@ async fn main() -> anyhow::Result<()> {
|
||||
Err(e) => return Err(anyhow::anyhow!("load OPAQUE server setup: {e}")),
|
||||
};
|
||||
|
||||
// Server Ed25519 signing key for delivery proofs: load from storage or generate fresh.
|
||||
let signing_key: Arc<quicproquo_core::IdentityKeypair> = match store.get_signing_key_seed() {
|
||||
Ok(Some(seed_bytes)) => {
|
||||
let seed: [u8; 32] = seed_bytes
|
||||
.as_slice()
|
||||
.try_into()
|
||||
.context("signing key seed must be 32 bytes")?;
|
||||
tracing::info!("loaded persisted server signing key");
|
||||
Arc::new(quicproquo_core::IdentityKeypair::from_seed(seed))
|
||||
}
|
||||
Ok(None) => {
|
||||
let kp = quicproquo_core::IdentityKeypair::generate();
|
||||
store
|
||||
.store_signing_key_seed(kp.seed_bytes().to_vec())
|
||||
.context("persist server signing key")?;
|
||||
tracing::info!("generated and persisted new server signing key");
|
||||
Arc::new(kp)
|
||||
}
|
||||
Err(e) => return Err(anyhow::anyhow!("load server signing key: {e}")),
|
||||
};
|
||||
|
||||
// Key Transparency Merkle log: load from storage or start fresh.
|
||||
let kt_log: Arc<std::sync::Mutex<MerkleLog>> = match store.load_kt_log() {
|
||||
Ok(Some(bytes)) => {
|
||||
match MerkleLog::from_bytes(&bytes) {
|
||||
Ok(log) => {
|
||||
tracing::info!(entries = log.len(), "loaded persisted KT Merkle log");
|
||||
Arc::new(std::sync::Mutex::new(log))
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!(error = %e, "KT log deserialise failed; starting fresh");
|
||||
Arc::new(std::sync::Mutex::new(MerkleLog::new()))
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(None) => {
|
||||
tracing::info!("no KT log found; starting fresh");
|
||||
Arc::new(std::sync::Mutex::new(MerkleLog::new()))
|
||||
}
|
||||
Err(e) => return Err(anyhow::anyhow!("load KT log: {e}")),
|
||||
};
|
||||
|
||||
// ── Plugin hooks ──────────────────────────────────────────────────────────
|
||||
let hooks: Arc<dyn hooks::ServerHooks> = if let Some(dir) = &effective.plugin_dir {
|
||||
let plugins = plugin_loader::load_plugins_from_dir(dir);
|
||||
if plugins.is_empty() {
|
||||
tracing::info!(dir = %dir.display(), "plugin_dir set but no plugins loaded");
|
||||
Arc::new(hooks::NoopHooks)
|
||||
} else {
|
||||
tracing::info!(count = plugins.len(), "plugins loaded");
|
||||
let boxed: Vec<Box<dyn hooks::ServerHooks>> = plugins
|
||||
.into_iter()
|
||||
.map(|p| Box::new(p) as Box<dyn hooks::ServerHooks>)
|
||||
.collect();
|
||||
Arc::new(plugin_loader::ChainedHooks::new(boxed))
|
||||
}
|
||||
} else {
|
||||
Arc::new(hooks::NoopHooks)
|
||||
};
|
||||
|
||||
let pending_logins: Arc<DashMap<String, PendingLogin>> = Arc::new(DashMap::new());
|
||||
let sessions: Arc<DashMap<Vec<u8>, SessionInfo>> = Arc::new(DashMap::new());
|
||||
let rate_limits: Arc<DashMap<Vec<u8>, RateEntry>> = Arc::new(DashMap::new());
|
||||
@@ -298,7 +366,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
None
|
||||
};
|
||||
|
||||
let fed_bind: SocketAddr = "0.0.0.0:0".parse().unwrap();
|
||||
let fed_bind: SocketAddr = SocketAddr::from(([0, 0, 0, 0], 0));
|
||||
let mut fed_endpoint = Endpoint::client(fed_bind)
|
||||
.context("create federation client endpoint")?;
|
||||
if let Some(cc) = client_config {
|
||||
@@ -522,6 +590,9 @@ async fn main() -> anyhow::Result<()> {
|
||||
let sealed_sender = effective.sealed_sender;
|
||||
let fed_client = federation_client.clone();
|
||||
let local_dom = local_domain.clone();
|
||||
let sk = Arc::clone(&signing_key);
|
||||
let conn_hooks = Arc::clone(&hooks);
|
||||
let conn_kt_log = Arc::clone(&kt_log);
|
||||
|
||||
tokio::task::spawn_local(async move {
|
||||
if let Err(e) = handle_node_connection(
|
||||
@@ -536,6 +607,9 @@ async fn main() -> anyhow::Result<()> {
|
||||
sealed_sender,
|
||||
fed_client,
|
||||
local_dom,
|
||||
sk,
|
||||
conn_hooks,
|
||||
conn_kt_log,
|
||||
)
|
||||
.await
|
||||
{
|
||||
|
||||
@@ -11,6 +11,8 @@ use crate::error_codes::*;
|
||||
use crate::metrics;
|
||||
use crate::storage::StorageError;
|
||||
|
||||
use crate::hooks::AuthEvent;
|
||||
|
||||
use super::NodeServiceImpl;
|
||||
|
||||
// Audit events in this module must never include secrets (no session tokens, passwords, or raw keys).
|
||||
@@ -207,6 +209,11 @@ impl NodeServiceImpl {
|
||||
// Audit: login failure — do not log secrets (no token, no password).
|
||||
tracing::warn!(user = %username, "audit: auth login failure (no pending login)");
|
||||
metrics::record_auth_login_failure_total();
|
||||
self.hooks.on_auth(&AuthEvent {
|
||||
username: username.clone(),
|
||||
success: false,
|
||||
failure_reason: "no pending login".to_string(),
|
||||
});
|
||||
return Promise::err(coded_error(E019_NO_PENDING_LOGIN, "no pending login for this username"))
|
||||
}
|
||||
};
|
||||
@@ -236,6 +243,11 @@ impl NodeServiceImpl {
|
||||
Err(e) => {
|
||||
tracing::warn!(user = %username, "audit: auth login failure (OPAQUE finish failed)");
|
||||
metrics::record_auth_login_failure_total();
|
||||
self.hooks.on_auth(&AuthEvent {
|
||||
username: username.clone(),
|
||||
success: false,
|
||||
failure_reason: format!("OPAQUE finish failed: {e}"),
|
||||
});
|
||||
return Promise::err(coded_error(
|
||||
E010_OPAQUE_ERROR,
|
||||
format!("OPAQUE login finish failed (bad password?): {e}"),
|
||||
@@ -255,6 +267,11 @@ impl NodeServiceImpl {
|
||||
if stored_ik != identity_key {
|
||||
tracing::warn!(user = %username, "audit: auth login failure (identity mismatch)");
|
||||
metrics::record_auth_login_failure_total();
|
||||
self.hooks.on_auth(&AuthEvent {
|
||||
username: username.clone(),
|
||||
success: false,
|
||||
failure_reason: "identity key mismatch".to_string(),
|
||||
});
|
||||
return Promise::err(coded_error(
|
||||
E016_IDENTITY_MISMATCH,
|
||||
"identity key does not match registered key",
|
||||
@@ -279,6 +296,13 @@ impl NodeServiceImpl {
|
||||
|
||||
results.get().set_session_token(&token_vec);
|
||||
|
||||
// Hook: on_auth — fires after successful login.
|
||||
self.hooks.on_auth(&AuthEvent {
|
||||
username: username.clone(),
|
||||
success: true,
|
||||
failure_reason: String::new(),
|
||||
});
|
||||
|
||||
// Audit: login success — do not log session token or any secrets.
|
||||
metrics::record_auth_login_success_total();
|
||||
tracing::info!(user = %username, "audit: auth login success — session token issued");
|
||||
@@ -356,14 +380,39 @@ impl NodeServiceImpl {
|
||||
Err(e) => return Promise::err(storage_err(e)),
|
||||
}
|
||||
|
||||
// Hook: on_user_registered — fires after successful registration.
|
||||
self.hooks.on_user_registered(&username, &identity_key);
|
||||
|
||||
if !identity_key.is_empty() {
|
||||
if let Err(e) = self
|
||||
.store
|
||||
.store_user_identity_key(&username, identity_key)
|
||||
.store_user_identity_key(&username, identity_key.clone())
|
||||
.map_err(storage_err)
|
||||
{
|
||||
return Promise::err(e);
|
||||
}
|
||||
|
||||
// Append (username, identity_key) to the Key Transparency Merkle log.
|
||||
match self.kt_log.lock() {
|
||||
Ok(mut log) => {
|
||||
log.append(&username, &identity_key);
|
||||
// Persist after each append (small extra cost, but ensures durability).
|
||||
match log.to_bytes() {
|
||||
Ok(bytes) => {
|
||||
if let Err(e) = self.store.save_kt_log(bytes) {
|
||||
tracing::warn!(user = %username, error = %e, "KT log persist failed");
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!(user = %username, error = %e, "KT log serialise failed");
|
||||
}
|
||||
}
|
||||
tracing::info!(user = %username, tree_size = log.len(), "KT: appended identity binding");
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!(user = %username, error = %e, "KT log lock poisoned; skipping append");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
results.get().set_success(true);
|
||||
|
||||
@@ -7,6 +7,8 @@ use crate::auth::{coded_error, require_identity, validate_auth_context};
|
||||
use crate::error_codes::*;
|
||||
use crate::storage::StorageError;
|
||||
|
||||
use crate::hooks::ChannelEvent;
|
||||
|
||||
use super::NodeServiceImpl;
|
||||
|
||||
fn storage_err(err: StorageError) -> capnp::Error {
|
||||
@@ -56,6 +58,14 @@ impl NodeServiceImpl {
|
||||
Err(e) => return Promise::err(storage_err(e)),
|
||||
};
|
||||
|
||||
// Hook: on_channel_created — fires after channel is created or looked up.
|
||||
self.hooks.on_channel_created(&ChannelEvent {
|
||||
channel_id: channel_id.clone(),
|
||||
initiator_key: identity.to_vec(),
|
||||
peer_key: peer_key.clone(),
|
||||
was_new,
|
||||
});
|
||||
|
||||
let mut r = results.get();
|
||||
r.set_channel_id(&channel_id);
|
||||
r.set_was_new(was_new);
|
||||
|
||||
@@ -7,6 +7,8 @@ use quicproquo_proto::node_capnp::node_service;
|
||||
use tokio::sync::Notify;
|
||||
use tokio::time::timeout;
|
||||
|
||||
use sha2::{Digest, Sha256};
|
||||
|
||||
use crate::auth::{
|
||||
check_rate_limit, coded_error, fmt_hex, require_identity_or_request, validate_auth_context,
|
||||
};
|
||||
@@ -15,12 +17,38 @@ use crate::metrics;
|
||||
use crate::storage::{StorageError, Store};
|
||||
|
||||
use super::{NodeServiceImpl, CURRENT_WIRE_VERSION};
|
||||
use crate::hooks::{HookAction, MessageEvent, FetchEvent};
|
||||
|
||||
// Audit events here must not include secrets: no payload content, no full recipient/token bytes (prefix only).
|
||||
|
||||
const MAX_PAYLOAD_BYTES: usize = 5 * 1024 * 1024; // 5 MB cap per message
|
||||
const MAX_QUEUE_DEPTH: usize = 1000;
|
||||
|
||||
/// Build a 96-byte delivery proof: SHA-256(seq || recipient_key || timestamp_ms) || Ed25519 sig.
|
||||
///
|
||||
/// Layout:
|
||||
/// bytes 0..32 — SHA-256 preimage hash
|
||||
/// bytes 32..96 — Ed25519 signature over those 32 bytes
|
||||
fn build_delivery_proof(
|
||||
signing_key: &quicproquo_core::IdentityKeypair,
|
||||
seq: u64,
|
||||
recipient_key: &[u8],
|
||||
timestamp_ms: u64,
|
||||
) -> [u8; 96] {
|
||||
let mut hasher = Sha256::new();
|
||||
hasher.update(seq.to_le_bytes());
|
||||
hasher.update(recipient_key);
|
||||
hasher.update(timestamp_ms.to_le_bytes());
|
||||
let hash: [u8; 32] = hasher.finalize().into();
|
||||
|
||||
let sig = signing_key.sign_raw(&hash);
|
||||
|
||||
let mut proof = [0u8; 96];
|
||||
proof[..32].copy_from_slice(&hash);
|
||||
proof[32..].copy_from_slice(&sig);
|
||||
proof
|
||||
}
|
||||
|
||||
fn storage_err(err: StorageError) -> capnp::Error {
|
||||
coded_error(E009_STORAGE_ERROR, err)
|
||||
}
|
||||
@@ -173,6 +201,24 @@ impl NodeServiceImpl {
|
||||
}
|
||||
|
||||
let payload_len = payload.len();
|
||||
let sender_identity = if self.sealed_sender {
|
||||
None
|
||||
} else {
|
||||
crate::auth::require_identity(&auth_ctx).ok().map(|v| v.to_vec())
|
||||
};
|
||||
|
||||
// Hook: on_message_enqueue — fires after validation, before storage.
|
||||
let hook_event = MessageEvent {
|
||||
sender_identity,
|
||||
recipient_key: recipient_key.clone(),
|
||||
channel_id: channel_id.clone(),
|
||||
payload_len,
|
||||
seq: 0, // not yet assigned
|
||||
};
|
||||
if let HookAction::Reject(reason) = self.hooks.on_message_enqueue(&hook_event) {
|
||||
return Promise::err(capnp::Error::failed(format!("hook rejected enqueue: {reason}")));
|
||||
}
|
||||
|
||||
let seq = match self
|
||||
.store
|
||||
.enqueue(&recipient_key, &channel_id, payload)
|
||||
@@ -182,7 +228,15 @@ impl NodeServiceImpl {
|
||||
Err(e) => return Promise::err(e),
|
||||
};
|
||||
|
||||
results.get().set_seq(seq);
|
||||
let timestamp_ms = std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.unwrap_or_default()
|
||||
.as_millis() as u64;
|
||||
let proof = build_delivery_proof(&self.signing_key, seq, &recipient_key, timestamp_ms);
|
||||
|
||||
let mut r = results.get();
|
||||
r.set_seq(seq);
|
||||
r.set_delivery_proof(&proof);
|
||||
|
||||
// Metrics and audit. Audit events must not include secrets (no payload, no full keys).
|
||||
metrics::record_enqueue_total();
|
||||
@@ -306,6 +360,13 @@ impl NodeServiceImpl {
|
||||
}
|
||||
};
|
||||
|
||||
// Hook: on_fetch — fires after messages are retrieved.
|
||||
self.hooks.on_fetch(&FetchEvent {
|
||||
recipient_key: recipient_key.clone(),
|
||||
channel_id: channel_id.clone(),
|
||||
message_count: messages.len(),
|
||||
});
|
||||
|
||||
// Audit: fetch — do not log payload or full keys.
|
||||
metrics::record_fetch_total();
|
||||
tracing::info!(
|
||||
@@ -671,11 +732,33 @@ impl NodeServiceImpl {
|
||||
recipient_key_vecs.push(rk);
|
||||
}
|
||||
|
||||
// Hook: on_message_enqueue for each recipient — fires before storage.
|
||||
let sender_identity = if self.sealed_sender {
|
||||
None
|
||||
} else {
|
||||
crate::auth::require_identity(&auth_ctx).ok().map(|v| v.to_vec())
|
||||
};
|
||||
let mut hook_events = Vec::with_capacity(recipient_key_vecs.len());
|
||||
for rk in &recipient_key_vecs {
|
||||
let event = MessageEvent {
|
||||
sender_identity: sender_identity.clone(),
|
||||
recipient_key: rk.clone(),
|
||||
channel_id: channel_id.clone(),
|
||||
payload_len: payload.len(),
|
||||
seq: 0,
|
||||
};
|
||||
if let HookAction::Reject(reason) = self.hooks.on_message_enqueue(&event) {
|
||||
return Promise::err(capnp::Error::failed(format!("hook rejected enqueue: {reason}")));
|
||||
}
|
||||
hook_events.push(event);
|
||||
}
|
||||
|
||||
let n = recipient_key_vecs.len();
|
||||
let store = Arc::clone(&self.store);
|
||||
let waiters = Arc::clone(&self.waiters);
|
||||
let fed_client = self.federation_client.clone();
|
||||
let local_domain = self.local_domain.clone();
|
||||
let hooks = Arc::clone(&self.hooks);
|
||||
|
||||
// Use an async future to support federation relay alongside local enqueue.
|
||||
// All storage operations are synchronous; only federation relay calls are await-ed.
|
||||
@@ -734,6 +817,9 @@ impl NodeServiceImpl {
|
||||
list.set(i as u32, *seq);
|
||||
}
|
||||
|
||||
// Hook: on_batch_enqueue — fires after all messages are stored.
|
||||
hooks.on_batch_enqueue(&hook_events);
|
||||
|
||||
tracing::info!(
|
||||
recipient_count = n,
|
||||
payload_len = payload.len(),
|
||||
|
||||
@@ -5,6 +5,7 @@ use capnp_rpc::RpcSystem;
|
||||
use dashmap::DashMap;
|
||||
use opaque_ke::ServerSetup;
|
||||
use quicproquo_core::opaque_auth::OpaqueSuite;
|
||||
use quicproquo_kt::MerkleLog;
|
||||
use quicproquo_proto::node_capnp::node_service;
|
||||
use tokio::sync::Notify;
|
||||
use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
|
||||
@@ -211,6 +212,12 @@ pub struct NodeServiceImpl {
|
||||
pub federation_client: Option<Arc<crate::federation::FederationClient>>,
|
||||
/// This server's federation domain (empty if federation disabled).
|
||||
pub local_domain: Option<String>,
|
||||
/// Server-side plugin hooks for extensibility.
|
||||
pub hooks: Arc<dyn crate::hooks::ServerHooks>,
|
||||
/// Server Ed25519 signing key for delivery proofs.
|
||||
pub signing_key: Arc<quicproquo_core::IdentityKeypair>,
|
||||
/// Key Transparency Merkle log (shared across connections).
|
||||
pub kt_log: Arc<std::sync::Mutex<MerkleLog>>,
|
||||
}
|
||||
|
||||
impl NodeServiceImpl {
|
||||
@@ -225,6 +232,9 @@ impl NodeServiceImpl {
|
||||
sealed_sender: bool,
|
||||
federation_client: Option<Arc<crate::federation::FederationClient>>,
|
||||
local_domain: Option<String>,
|
||||
signing_key: Arc<quicproquo_core::IdentityKeypair>,
|
||||
hooks: Arc<dyn crate::hooks::ServerHooks>,
|
||||
kt_log: Arc<std::sync::Mutex<MerkleLog>>,
|
||||
) -> Self {
|
||||
Self {
|
||||
store,
|
||||
@@ -237,6 +247,9 @@ impl NodeServiceImpl {
|
||||
sealed_sender,
|
||||
federation_client,
|
||||
local_domain,
|
||||
hooks,
|
||||
signing_key,
|
||||
kt_log,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -253,6 +266,9 @@ pub async fn handle_node_connection(
|
||||
sealed_sender: bool,
|
||||
federation_client: Option<Arc<crate::federation::FederationClient>>,
|
||||
local_domain: Option<String>,
|
||||
signing_key: Arc<quicproquo_core::IdentityKeypair>,
|
||||
hooks: Arc<dyn crate::hooks::ServerHooks>,
|
||||
kt_log: Arc<std::sync::Mutex<MerkleLog>>,
|
||||
) -> Result<(), anyhow::Error> {
|
||||
let connection = connecting.await?;
|
||||
|
||||
@@ -284,6 +300,9 @@ pub async fn handle_node_connection(
|
||||
sealed_sender,
|
||||
federation_client,
|
||||
local_domain,
|
||||
signing_key,
|
||||
hooks,
|
||||
kt_log,
|
||||
));
|
||||
|
||||
RpcSystem::new(Box::new(network), Some(service.client))
|
||||
|
||||
@@ -78,14 +78,36 @@ impl NodeServiceImpl {
|
||||
}
|
||||
|
||||
// Local resolution.
|
||||
match self.store.get_user_identity_key(&addr.username) {
|
||||
Ok(Some(key)) => {
|
||||
results.get().set_identity_key(&key);
|
||||
}
|
||||
let identity_key = match self.store.get_user_identity_key(&addr.username) {
|
||||
Ok(Some(key)) => key,
|
||||
Ok(None) => {
|
||||
// Return empty Data — caller checks length to detect "not found".
|
||||
return Promise::ok(());
|
||||
}
|
||||
Err(e) => return Promise::err(storage_err(e)),
|
||||
};
|
||||
|
||||
let mut r = results.get();
|
||||
r.set_identity_key(&identity_key);
|
||||
|
||||
// Attempt to include a KT Merkle inclusion proof.
|
||||
// Non-fatal: if the log is unavailable or has no entry, return just the key.
|
||||
if let Ok(log) = self.kt_log.lock() {
|
||||
if let Some(leaf_idx) = log.find(&addr.username, &identity_key) {
|
||||
match log.inclusion_proof(leaf_idx) {
|
||||
Ok(proof) => match proof.to_bytes() {
|
||||
Ok(bytes) => {
|
||||
r.set_inclusion_proof(&bytes);
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!(error = %e, "KT proof serialise failed");
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
tracing::warn!(error = %e, "KT inclusion_proof failed");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Promise::ok(())
|
||||
|
||||
342
crates/quicproquo-server/src/plugin_loader.rs
Normal file
342
crates/quicproquo-server/src/plugin_loader.rs
Normal file
@@ -0,0 +1,342 @@
|
||||
//! Dynamic plugin loader for server-side hook extensions.
|
||||
//!
|
||||
//! Loads shared libraries (`*.so` / `*.dylib`) from a directory at server
|
||||
//! startup. Each library must export:
|
||||
//!
|
||||
//! ```c
|
||||
//! extern "C" int32_t qpq_plugin_init(HookVTable *vtable);
|
||||
//! ```
|
||||
//!
|
||||
//! The server creates a zeroed [`HookVTable`], passes it to `qpq_plugin_init`,
|
||||
//! and wraps the resulting vtable in a [`PluginHooks`] that implements
|
||||
//! [`ServerHooks`]. Multiple plugins are chained via [`ChainedHooks`].
|
||||
//!
|
||||
//! # Safety model
|
||||
//!
|
||||
//! Dynamic loading is inherently unsafe. The plugin binary MUST:
|
||||
//! - be compiled against the same `quicproquo-plugin-api` version
|
||||
//! - not store the event-struct pointers beyond the callback duration
|
||||
//! - be `Send + Sync` (the wrapper is put behind an `Arc`)
|
||||
//!
|
||||
//! The server operator is responsible for only loading trusted plugin binaries.
|
||||
|
||||
use std::path::Path;
|
||||
|
||||
use libloading::{Library, Symbol};
|
||||
use quicproquo_plugin_api::{
|
||||
CAuthEvent, CChannelEvent, CFetchEvent, CMessageEvent, HookVTable, HOOK_CONTINUE, PLUGIN_OK,
|
||||
};
|
||||
|
||||
use crate::hooks::{AuthEvent, ChannelEvent, FetchEvent, HookAction, MessageEvent, ServerHooks};
|
||||
|
||||
// ── PluginHooks ───────────────────────────────────────────────────────────────
|
||||
|
||||
/// A [`ServerHooks`] implementation backed by a dynamically loaded plugin vtable.
|
||||
///
|
||||
/// Holds the [`Library`] alive alongside the vtable so that the loaded code
|
||||
/// is not unmapped while the vtable function pointers are still reachable.
|
||||
pub struct PluginHooks {
|
||||
/// The vtable filled by `qpq_plugin_init`.
|
||||
vtable: HookVTable,
|
||||
/// Keeps the shared library mapped. Must be dropped after `vtable`.
|
||||
_lib: Library,
|
||||
/// Name of the plugin file, for diagnostics.
|
||||
name: String,
|
||||
}
|
||||
|
||||
impl PluginHooks {
|
||||
/// Load a plugin from `path` and call `qpq_plugin_init`.
|
||||
///
|
||||
/// Returns `Err` if the library cannot be opened, the symbol is missing,
|
||||
/// or `qpq_plugin_init` returns a non-zero error code.
|
||||
pub fn load(path: &Path) -> anyhow::Result<Self> {
|
||||
let name = path
|
||||
.file_name()
|
||||
.map(|n| n.to_string_lossy().into_owned())
|
||||
.unwrap_or_else(|| path.display().to_string());
|
||||
|
||||
// Safety: loading arbitrary shared libraries is inherently unsafe.
|
||||
// The server operator is responsible for only loading trusted plugins.
|
||||
let lib = unsafe { Library::new(path) }
|
||||
.map_err(|e| anyhow::anyhow!("plugin '{}': load failed: {}", name, e))?;
|
||||
|
||||
// Zero-initialise the vtable so unused slots are null.
|
||||
let mut vtable = HookVTable {
|
||||
user_data: core::ptr::null_mut(),
|
||||
on_message_enqueue: None,
|
||||
on_batch_enqueue: None,
|
||||
on_auth: None,
|
||||
on_channel_created: None,
|
||||
on_fetch: None,
|
||||
on_user_registered: None,
|
||||
error_message: None,
|
||||
destroy: None,
|
||||
};
|
||||
|
||||
// Safety: the symbol must have the exact signature declared in the API crate.
|
||||
let init: Symbol<unsafe extern "C" fn(*mut HookVTable) -> i32> =
|
||||
unsafe { lib.get(b"qpq_plugin_init\0") }.map_err(|e| {
|
||||
anyhow::anyhow!("plugin '{}': missing qpq_plugin_init: {}", name, e)
|
||||
})?;
|
||||
|
||||
let rc = unsafe { init(&mut vtable) };
|
||||
if rc != PLUGIN_OK {
|
||||
anyhow::bail!("plugin '{}': qpq_plugin_init returned error {}", name, rc);
|
||||
}
|
||||
|
||||
tracing::info!(plugin = %name, "loaded plugin");
|
||||
Ok(Self { vtable, _lib: lib, name })
|
||||
}
|
||||
|
||||
/// Human-readable plugin name (filename).
|
||||
pub fn name(&self) -> &str {
|
||||
&self.name
|
||||
}
|
||||
|
||||
/// Retrieve the rejection reason from the plugin, falling back to a generic string.
|
||||
fn rejection_reason(&self) -> String {
|
||||
if let Some(f) = self.vtable.error_message {
|
||||
let ptr = unsafe { f(self.vtable.user_data) };
|
||||
if !ptr.is_null() {
|
||||
// Safety: plugin must return a valid null-terminated UTF-8 (or ASCII) string.
|
||||
let cstr = unsafe { std::ffi::CStr::from_ptr(ptr as *const core::ffi::c_char) };
|
||||
return cstr.to_string_lossy().into_owned();
|
||||
}
|
||||
}
|
||||
"rejected by plugin".to_string()
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for PluginHooks {
|
||||
fn drop(&mut self) {
|
||||
if let Some(destroy) = self.vtable.destroy {
|
||||
// Safety: destroy must be safe to call at any time after init.
|
||||
unsafe { destroy(self.vtable.user_data) };
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ServerHooks for PluginHooks {
|
||||
fn on_message_enqueue(&self, event: &MessageEvent) -> HookAction {
|
||||
let f = match self.vtable.on_message_enqueue {
|
||||
Some(f) => f,
|
||||
None => return HookAction::Continue,
|
||||
};
|
||||
|
||||
let sender_ptr = event
|
||||
.sender_identity
|
||||
.as_deref()
|
||||
.map(|s| s.as_ptr())
|
||||
.unwrap_or(core::ptr::null());
|
||||
let sender_len = event.sender_identity.as_deref().map_or(0, |s| s.len());
|
||||
|
||||
let c_event = CMessageEvent {
|
||||
sender_identity: sender_ptr,
|
||||
sender_identity_len: sender_len,
|
||||
recipient_key: event.recipient_key.as_ptr(),
|
||||
recipient_key_len: event.recipient_key.len(),
|
||||
channel_id: event.channel_id.as_ptr(),
|
||||
channel_id_len: event.channel_id.len(),
|
||||
payload_len: event.payload_len,
|
||||
seq: event.seq,
|
||||
};
|
||||
|
||||
let rc = unsafe { f(self.vtable.user_data, &c_event) };
|
||||
if rc == HOOK_CONTINUE {
|
||||
HookAction::Continue
|
||||
} else {
|
||||
HookAction::Reject(self.rejection_reason())
|
||||
}
|
||||
}
|
||||
|
||||
fn on_batch_enqueue(&self, events: &[MessageEvent]) {
|
||||
let f = match self.vtable.on_batch_enqueue {
|
||||
Some(f) => f,
|
||||
None => return,
|
||||
};
|
||||
|
||||
let c_events: Vec<CMessageEvent> = events
|
||||
.iter()
|
||||
.map(|e| {
|
||||
let sender_ptr = e
|
||||
.sender_identity
|
||||
.as_deref()
|
||||
.map(|s| s.as_ptr())
|
||||
.unwrap_or(core::ptr::null());
|
||||
let sender_len = e.sender_identity.as_deref().map_or(0, |s| s.len());
|
||||
CMessageEvent {
|
||||
sender_identity: sender_ptr,
|
||||
sender_identity_len: sender_len,
|
||||
recipient_key: e.recipient_key.as_ptr(),
|
||||
recipient_key_len: e.recipient_key.len(),
|
||||
channel_id: e.channel_id.as_ptr(),
|
||||
channel_id_len: e.channel_id.len(),
|
||||
payload_len: e.payload_len,
|
||||
seq: e.seq,
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
unsafe { f(self.vtable.user_data, c_events.as_ptr(), c_events.len()) };
|
||||
}
|
||||
|
||||
fn on_auth(&self, event: &AuthEvent) {
|
||||
let f = match self.vtable.on_auth {
|
||||
Some(f) => f,
|
||||
None => return,
|
||||
};
|
||||
let c_event = CAuthEvent {
|
||||
username: event.username.as_ptr(),
|
||||
username_len: event.username.len(),
|
||||
success: if event.success { 1 } else { 0 },
|
||||
failure_reason: event.failure_reason.as_ptr(),
|
||||
failure_reason_len: event.failure_reason.len(),
|
||||
};
|
||||
unsafe { f(self.vtable.user_data, &c_event) };
|
||||
}
|
||||
|
||||
fn on_channel_created(&self, event: &ChannelEvent) {
|
||||
let f = match self.vtable.on_channel_created {
|
||||
Some(f) => f,
|
||||
None => return,
|
||||
};
|
||||
let c_event = CChannelEvent {
|
||||
channel_id: event.channel_id.as_ptr(),
|
||||
channel_id_len: event.channel_id.len(),
|
||||
initiator_key: event.initiator_key.as_ptr(),
|
||||
initiator_key_len: event.initiator_key.len(),
|
||||
peer_key: event.peer_key.as_ptr(),
|
||||
peer_key_len: event.peer_key.len(),
|
||||
was_new: if event.was_new { 1 } else { 0 },
|
||||
};
|
||||
unsafe { f(self.vtable.user_data, &c_event) };
|
||||
}
|
||||
|
||||
fn on_fetch(&self, event: &FetchEvent) {
|
||||
let f = match self.vtable.on_fetch {
|
||||
Some(f) => f,
|
||||
None => return,
|
||||
};
|
||||
let c_event = CFetchEvent {
|
||||
recipient_key: event.recipient_key.as_ptr(),
|
||||
recipient_key_len: event.recipient_key.len(),
|
||||
channel_id: event.channel_id.as_ptr(),
|
||||
channel_id_len: event.channel_id.len(),
|
||||
message_count: event.message_count,
|
||||
};
|
||||
unsafe { f(self.vtable.user_data, &c_event) };
|
||||
}
|
||||
|
||||
fn on_user_registered(&self, username: &str, identity_key: &[u8]) {
|
||||
let f = match self.vtable.on_user_registered {
|
||||
Some(f) => f,
|
||||
None => return,
|
||||
};
|
||||
unsafe {
|
||||
f(
|
||||
self.vtable.user_data,
|
||||
username.as_ptr(),
|
||||
username.len(),
|
||||
identity_key.as_ptr(),
|
||||
identity_key.len(),
|
||||
)
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
// ── ChainedHooks ─────────────────────────────────────────────────────────────
|
||||
|
||||
/// Composes multiple [`ServerHooks`] implementations into one.
|
||||
///
|
||||
/// For filtering hooks (`on_message_enqueue`), the first rejection short-circuits
|
||||
/// the chain. For fire-and-forget hooks, all plugins are called in order.
|
||||
pub struct ChainedHooks {
|
||||
hooks: Vec<Box<dyn ServerHooks>>,
|
||||
}
|
||||
|
||||
impl ChainedHooks {
|
||||
pub fn new(hooks: Vec<Box<dyn ServerHooks>>) -> Self {
|
||||
Self { hooks }
|
||||
}
|
||||
}
|
||||
|
||||
impl ServerHooks for ChainedHooks {
|
||||
fn on_message_enqueue(&self, event: &MessageEvent) -> HookAction {
|
||||
for h in &self.hooks {
|
||||
match h.on_message_enqueue(event) {
|
||||
HookAction::Continue => {}
|
||||
reject => return reject,
|
||||
}
|
||||
}
|
||||
HookAction::Continue
|
||||
}
|
||||
|
||||
fn on_batch_enqueue(&self, events: &[MessageEvent]) {
|
||||
for h in &self.hooks {
|
||||
h.on_batch_enqueue(events);
|
||||
}
|
||||
}
|
||||
|
||||
fn on_auth(&self, event: &AuthEvent) {
|
||||
for h in &self.hooks {
|
||||
h.on_auth(event);
|
||||
}
|
||||
}
|
||||
|
||||
fn on_channel_created(&self, event: &ChannelEvent) {
|
||||
for h in &self.hooks {
|
||||
h.on_channel_created(event);
|
||||
}
|
||||
}
|
||||
|
||||
fn on_fetch(&self, event: &FetchEvent) {
|
||||
for h in &self.hooks {
|
||||
h.on_fetch(event);
|
||||
}
|
||||
}
|
||||
|
||||
fn on_user_registered(&self, username: &str, identity_key: &[u8]) {
|
||||
for h in &self.hooks {
|
||||
h.on_user_registered(username, identity_key);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ── load_plugins_from_dir ─────────────────────────────────────────────────────
|
||||
|
||||
/// Load all `*.so` / `*.dylib` files from `dir` as plugins.
|
||||
///
|
||||
/// Non-fatal errors (unreadable files, init failures) are logged as warnings
|
||||
/// and skipped; the server continues with the plugins that did load.
|
||||
/// Returns the full list of successfully loaded plugins.
|
||||
pub fn load_plugins_from_dir(dir: &Path) -> Vec<PluginHooks> {
|
||||
let mut plugins = Vec::new();
|
||||
|
||||
let entries = match std::fs::read_dir(dir) {
|
||||
Ok(e) => e,
|
||||
Err(e) => {
|
||||
tracing::warn!(dir = %dir.display(), error = %e, "plugin_dir unreadable; no plugins loaded");
|
||||
return plugins;
|
||||
}
|
||||
};
|
||||
|
||||
for entry in entries.flatten() {
|
||||
let path = entry.path();
|
||||
let ext = path.extension().and_then(|e| e.to_str()).unwrap_or("");
|
||||
if ext != "so" && ext != "dylib" {
|
||||
continue;
|
||||
}
|
||||
|
||||
match PluginHooks::load(&path) {
|
||||
Ok(p) => {
|
||||
tracing::info!(plugin = %p.name(), "plugin loaded successfully");
|
||||
plugins.push(p);
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!(path = %path.display(), error = %e, "failed to load plugin; skipping");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
plugins
|
||||
}
|
||||
@@ -9,7 +9,7 @@ use rusqlite::{params, Connection};
|
||||
use crate::storage::{StorageError, Store};
|
||||
|
||||
/// Schema version after introducing the migration runner (existing DBs had 1).
|
||||
const SCHEMA_VERSION: i32 = 5;
|
||||
const SCHEMA_VERSION: i32 = 7;
|
||||
|
||||
/// Migrations: (migration_number, SQL). Files named NNN_name.sql, applied in order when N > user_version.
|
||||
const MIGRATIONS: &[(i32, &str)] = &[
|
||||
@@ -17,6 +17,8 @@ const MIGRATIONS: &[(i32, &str)] = &[
|
||||
(3, include_str!("../migrations/002_add_seq.sql")),
|
||||
(4, include_str!("../migrations/003_channels.sql")),
|
||||
(5, include_str!("../migrations/004_federation.sql")),
|
||||
(6, include_str!("../migrations/005_signing_key.sql")),
|
||||
(7, include_str!("../migrations/006_kt_log.sql")),
|
||||
];
|
||||
|
||||
/// Runs pending migrations on an open connection: applies any migration whose number is greater
|
||||
@@ -305,6 +307,48 @@ impl Store for SqlStore {
|
||||
.map_err(|e| StorageError::Db(e.to_string()))
|
||||
}
|
||||
|
||||
fn store_signing_key_seed(&self, seed: Vec<u8>) -> Result<(), StorageError> {
|
||||
let conn = self.lock_conn()?;
|
||||
conn.execute(
|
||||
"INSERT OR REPLACE INTO server_signing_key (id, seed_data) VALUES (1, ?1)",
|
||||
params![seed],
|
||||
)
|
||||
.map_err(|e| StorageError::Db(e.to_string()))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn get_signing_key_seed(&self) -> Result<Option<Vec<u8>>, StorageError> {
|
||||
let conn = self.lock_conn()?;
|
||||
let mut stmt = conn
|
||||
.prepare("SELECT seed_data FROM server_signing_key WHERE id = 1")
|
||||
.map_err(|e| StorageError::Db(e.to_string()))?;
|
||||
|
||||
stmt.query_row([], |row| row.get(0))
|
||||
.optional()
|
||||
.map_err(|e| StorageError::Db(e.to_string()))
|
||||
}
|
||||
|
||||
fn save_kt_log(&self, bytes: Vec<u8>) -> Result<(), StorageError> {
|
||||
let conn = self.lock_conn()?;
|
||||
conn.execute(
|
||||
"INSERT OR REPLACE INTO kt_log (id, log_data) VALUES (1, ?1)",
|
||||
params![bytes],
|
||||
)
|
||||
.map_err(|e| StorageError::Db(e.to_string()))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn load_kt_log(&self) -> Result<Option<Vec<u8>>, StorageError> {
|
||||
let conn = self.lock_conn()?;
|
||||
let mut stmt = conn
|
||||
.prepare("SELECT log_data FROM kt_log WHERE id = 1")
|
||||
.map_err(|e| StorageError::Db(e.to_string()))?;
|
||||
|
||||
stmt.query_row([], |row| row.get(0))
|
||||
.optional()
|
||||
.map_err(|e| StorageError::Db(e.to_string()))
|
||||
}
|
||||
|
||||
fn store_user_record(&self, username: &str, record: Vec<u8>) -> Result<(), StorageError> {
|
||||
let conn = self.lock_conn()?;
|
||||
conn.execute(
|
||||
|
||||
@@ -81,6 +81,18 @@ pub trait Store: Send + Sync {
|
||||
/// Load the persisted `ServerSetup`, if any.
|
||||
fn get_server_setup(&self) -> Result<Option<Vec<u8>>, StorageError>;
|
||||
|
||||
/// Persist the server's Ed25519 signing key seed (32 bytes) for delivery proofs.
|
||||
fn store_signing_key_seed(&self, seed: Vec<u8>) -> Result<(), StorageError>;
|
||||
|
||||
/// Load the persisted signing key seed, if any.
|
||||
fn get_signing_key_seed(&self) -> Result<Option<Vec<u8>>, StorageError>;
|
||||
|
||||
/// Persist the Key Transparency Merkle log (bincode-serialised `MerkleLog` bytes).
|
||||
fn save_kt_log(&self, bytes: Vec<u8>) -> Result<(), StorageError>;
|
||||
|
||||
/// Load the persisted KT Merkle log, if any.
|
||||
fn load_kt_log(&self) -> Result<Option<Vec<u8>>, StorageError>;
|
||||
|
||||
/// Store an OPAQUE user record (serialized `ServerRegistration`).
|
||||
fn store_user_record(&self, username: &str, record: Vec<u8>) -> Result<(), StorageError>;
|
||||
|
||||
@@ -213,6 +225,8 @@ pub struct FileBackedStore {
|
||||
ds_path: PathBuf,
|
||||
hk_path: PathBuf,
|
||||
setup_path: PathBuf,
|
||||
signing_key_path: PathBuf,
|
||||
kt_log_path: PathBuf,
|
||||
users_path: PathBuf,
|
||||
identity_keys_path: PathBuf,
|
||||
channels_path: PathBuf,
|
||||
@@ -235,6 +249,8 @@ impl FileBackedStore {
|
||||
let ds_path = dir.join("deliveries.bin");
|
||||
let hk_path = dir.join("hybridkeys.bin");
|
||||
let setup_path = dir.join("server_setup.bin");
|
||||
let signing_key_path = dir.join("server_signing_key.bin");
|
||||
let kt_log_path = dir.join("kt_log.bin");
|
||||
let users_path = dir.join("users.bin");
|
||||
let identity_keys_path = dir.join("identity_keys.bin");
|
||||
let channels_path = dir.join("channels.bin");
|
||||
@@ -251,6 +267,8 @@ impl FileBackedStore {
|
||||
ds_path,
|
||||
hk_path,
|
||||
setup_path,
|
||||
signing_key_path,
|
||||
kt_log_path,
|
||||
users_path,
|
||||
identity_keys_path,
|
||||
channels_path,
|
||||
@@ -541,6 +559,52 @@ impl Store for FileBackedStore {
|
||||
Ok(Some(bytes))
|
||||
}
|
||||
|
||||
fn store_signing_key_seed(&self, seed: Vec<u8>) -> Result<(), StorageError> {
|
||||
if let Some(parent) = self.signing_key_path.parent() {
|
||||
fs::create_dir_all(parent).map_err(|e| StorageError::Io(e.to_string()))?;
|
||||
}
|
||||
fs::write(&self.signing_key_path, &seed).map_err(|e| StorageError::Io(e.to_string()))?;
|
||||
#[cfg(unix)]
|
||||
{
|
||||
use std::os::unix::fs::PermissionsExt;
|
||||
let _ = std::fs::set_permissions(
|
||||
&self.signing_key_path,
|
||||
std::fs::Permissions::from_mode(0o600),
|
||||
);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn get_signing_key_seed(&self) -> Result<Option<Vec<u8>>, StorageError> {
|
||||
if !self.signing_key_path.exists() {
|
||||
return Ok(None);
|
||||
}
|
||||
let bytes =
|
||||
fs::read(&self.signing_key_path).map_err(|e| StorageError::Io(e.to_string()))?;
|
||||
if bytes.is_empty() {
|
||||
return Ok(None);
|
||||
}
|
||||
Ok(Some(bytes))
|
||||
}
|
||||
|
||||
fn save_kt_log(&self, bytes: Vec<u8>) -> Result<(), StorageError> {
|
||||
if let Some(parent) = self.kt_log_path.parent() {
|
||||
fs::create_dir_all(parent).map_err(|e| StorageError::Io(e.to_string()))?;
|
||||
}
|
||||
fs::write(&self.kt_log_path, &bytes).map_err(|e| StorageError::Io(e.to_string()))
|
||||
}
|
||||
|
||||
fn load_kt_log(&self) -> Result<Option<Vec<u8>>, StorageError> {
|
||||
if !self.kt_log_path.exists() {
|
||||
return Ok(None);
|
||||
}
|
||||
let bytes = fs::read(&self.kt_log_path).map_err(|e| StorageError::Io(e.to_string()))?;
|
||||
if bytes.is_empty() {
|
||||
return Ok(None);
|
||||
}
|
||||
Ok(Some(bytes))
|
||||
}
|
||||
|
||||
fn store_user_record(&self, username: &str, record: Vec<u8>) -> Result<(), StorageError> {
|
||||
let mut map = lock(&self.users)?;
|
||||
match map.entry(username.to_string()) {
|
||||
|
||||
Reference in New Issue
Block a user