From 6273ab668de1164085e7d04b374de565d07bf2e8 Mon Sep 17 00:00:00 2001 From: Christian Nennemann Date: Wed, 4 Mar 2026 12:09:03 +0100 Subject: [PATCH] feat(server): connection pool, session persistence, blob storage in SqlStore - Replace Mutex with Vec> pool (default 4) with try_lock fast-path and blocking fallback - Add SessionRecord struct and session CRUD to Store trait (default no-ops) - Implement session persistence in SqlStore (sessions table, migration 009) - Add blob upload/download with SHA-256 verified staging assembly (blobs + blob_staging tables, migration 010) - All 35 server tests pass, FileBackedStore unaffected --- .../migrations/009_sessions.sql | 8 + .../migrations/010_blobs.sql | 15 + crates/quicproquo-server/src/sql_store.rs | 325 ++++++++++++++---- crates/quicproquo-server/src/storage.rs | 57 +++ 4 files changed, 346 insertions(+), 59 deletions(-) create mode 100644 crates/quicproquo-server/migrations/009_sessions.sql create mode 100644 crates/quicproquo-server/migrations/010_blobs.sql diff --git a/crates/quicproquo-server/migrations/009_sessions.sql b/crates/quicproquo-server/migrations/009_sessions.sql new file mode 100644 index 0000000..1561e7b --- /dev/null +++ b/crates/quicproquo-server/migrations/009_sessions.sql @@ -0,0 +1,8 @@ +CREATE TABLE IF NOT EXISTS sessions ( + token BLOB PRIMARY KEY, + username TEXT NOT NULL, + identity_key BLOB NOT NULL, + created_at INTEGER NOT NULL, + expires_at INTEGER NOT NULL +); +CREATE INDEX IF NOT EXISTS idx_sessions_expires ON sessions(expires_at); diff --git a/crates/quicproquo-server/migrations/010_blobs.sql b/crates/quicproquo-server/migrations/010_blobs.sql new file mode 100644 index 0000000..4f23fe4 --- /dev/null +++ b/crates/quicproquo-server/migrations/010_blobs.sql @@ -0,0 +1,15 @@ +CREATE TABLE IF NOT EXISTS blobs ( + blob_id BLOB PRIMARY KEY, + data BLOB NOT NULL, + total_size INTEGER NOT NULL, + mime_type TEXT NOT NULL DEFAULT '', + uploaded_at INTEGER NOT NULL +); +CREATE TABLE IF NOT EXISTS 
blob_staging ( + blob_hash BLOB NOT NULL, + offset INTEGER NOT NULL, + chunk BLOB NOT NULL, + total_size INTEGER NOT NULL, + mime_type TEXT NOT NULL DEFAULT '', + PRIMARY KEY (blob_hash, offset) +); diff --git a/crates/quicproquo-server/src/sql_store.rs b/crates/quicproquo-server/src/sql_store.rs index fb61970..192b9e5 100644 --- a/crates/quicproquo-server/src/sql_store.rs +++ b/crates/quicproquo-server/src/sql_store.rs @@ -5,11 +5,15 @@ use std::sync::Mutex; use rand::RngCore; use rusqlite::{params, Connection}; +use sha2::{Digest, Sha256}; -use crate::storage::{StorageError, Store}; +use crate::storage::{SessionRecord, StorageError, Store}; /// Schema version after introducing the migration runner (existing DBs had 1). -const SCHEMA_VERSION: i32 = 9; +const SCHEMA_VERSION: i32 = 11; + +/// Default number of connections in the pool. +const DEFAULT_POOL_SIZE: usize = 4; /// Migrations: (migration_number, SQL). Files named NNN_name.sql, applied in order when N > user_version. const MIGRATIONS: &[(i32, &str)] = &[ @@ -21,6 +25,8 @@ const MIGRATIONS: &[(i32, &str)] = &[ (7, include_str!("../migrations/006_kt_log.sql")), (8, include_str!("../migrations/007_add_expiry.sql")), (9, include_str!("../migrations/008_devices.sql")), + (10, include_str!("../migrations/009_sessions.sql")), + (11, include_str!("../migrations/010_blobs.sql")), ]; /// Runs pending migrations on an open connection: applies any migration whose number is greater @@ -41,19 +47,72 @@ fn run_migrations(conn: &Connection) -> Result<(), StorageError> { Ok(()) } -/// SQLCipher-encrypted storage backend. +/// SQLCipher-encrypted storage backend with a connection pool. +/// +/// Maintains `pool_size` SQLite connections (default 4) behind `std::sync::Mutex`. +/// Each store method tries all connections via `try_lock()` before falling back to +/// blocking on the first connection. WAL mode allows concurrent readers; writers +/// are serialised by SQLite itself. 
pub struct SqlStore { - conn: Mutex, + pool: Vec>, } impl SqlStore { - fn lock_conn(&self) -> Result, StorageError> { - self.conn + /// Try to acquire any connection from the pool without blocking. + /// Falls back to blocking on the first connection. + fn get_conn(&self) -> Result, StorageError> { + // Fast path: try each connection without blocking. + for conn in &self.pool { + if let Ok(guard) = conn.try_lock() { + return Ok(guard); + } + } + // Slow path: block on the first connection. + self.pool[0] .lock() .map_err(|e| StorageError::Db(format!("lock poisoned: {e}"))) } pub fn open(path: impl AsRef, key: &str) -> Result { + Self::open_with_pool_size(path, key, DEFAULT_POOL_SIZE) + } + + pub fn open_with_pool_size( + path: impl AsRef, + key: &str, + pool_size: usize, + ) -> Result { + let pool_size = pool_size.max(1); + let path = path.as_ref(); + + // Open the first connection and run migrations. + let first = Self::open_one(path, key)?; + let current_version: i32 = first + .pragma_query_value(None, "user_version", |row| row.get(0)) + .map_err(|e| StorageError::Db(format!("PRAGMA user_version failed: {e}")))?; + + if current_version > SCHEMA_VERSION { + return Err(StorageError::Db(format!( + "database schema version {current_version} is newer than supported {SCHEMA_VERSION}" + ))); + } + + run_migrations(&first)?; + + let mut pool = Vec::with_capacity(pool_size); + pool.push(Mutex::new(first)); + + // Open remaining connections (they skip migrations since the first one already ran them). + for _ in 1..pool_size { + let conn = Self::open_one(path, key)?; + pool.push(Mutex::new(conn)); + } + + Ok(Self { pool }) + } + + /// Open a single connection with shared pragmas. 
fn open_one(path: &Path, key: &str) -> Result<Connection, StorageError> {
let seq: i64 = conn @@ -170,7 +215,7 @@ impl Store for SqlStore { recipient_key: &[u8], channel_id: &[u8], ) -> Result)>, StorageError> { - let conn = self.lock_conn()?; + let conn = self.get_conn()?; let mut stmt = conn .prepare( @@ -210,7 +255,7 @@ impl Store for SqlStore { channel_id: &[u8], limit: usize, ) -> Result)>, StorageError> { - let conn = self.lock_conn()?; + let conn = self.get_conn()?; let mut stmt = conn .prepare( @@ -246,7 +291,7 @@ impl Store for SqlStore { } fn queue_depth(&self, recipient_key: &[u8], channel_id: &[u8]) -> Result { - let conn = self.lock_conn()?; + let conn = self.get_conn()?; let count: i64 = conn .query_row( "SELECT COUNT(*) FROM deliveries WHERE recipient_key = ?1 AND channel_id = ?2 AND (expires_at IS NULL OR expires_at > strftime('%s','now'))", @@ -258,7 +303,7 @@ impl Store for SqlStore { } fn gc_expired_messages(&self, max_age_secs: u64) -> Result { - let conn = self.lock_conn()?; + let conn = self.get_conn()?; let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() @@ -286,7 +331,7 @@ impl Store for SqlStore { identity_key: &[u8], hybrid_pk: Vec, ) -> Result<(), StorageError> { - let conn = self.lock_conn()?; + let conn = self.get_conn()?; conn.execute( "INSERT OR REPLACE INTO hybrid_keys (identity_key, hybrid_public_key) VALUES (?1, ?2)", params![identity_key, hybrid_pk], @@ -296,7 +341,7 @@ impl Store for SqlStore { } fn fetch_hybrid_key(&self, identity_key: &[u8]) -> Result>, StorageError> { - let conn = self.lock_conn()?; + let conn = self.get_conn()?; let mut stmt = conn .prepare("SELECT hybrid_public_key FROM hybrid_keys WHERE identity_key = ?1") .map_err(|e| StorageError::Db(e.to_string()))?; @@ -307,7 +352,7 @@ impl Store for SqlStore { } fn store_server_setup(&self, setup: Vec) -> Result<(), StorageError> { - let conn = self.lock_conn()?; + let conn = self.get_conn()?; conn.execute( "INSERT OR REPLACE INTO server_setup (id, setup_data) VALUES (1, ?1)", params![setup], @@ 
-317,7 +362,7 @@ impl Store for SqlStore { } fn get_server_setup(&self) -> Result>, StorageError> { - let conn = self.lock_conn()?; + let conn = self.get_conn()?; let mut stmt = conn .prepare("SELECT setup_data FROM server_setup WHERE id = 1") .map_err(|e| StorageError::Db(e.to_string()))?; @@ -328,7 +373,7 @@ impl Store for SqlStore { } fn store_signing_key_seed(&self, seed: Vec) -> Result<(), StorageError> { - let conn = self.lock_conn()?; + let conn = self.get_conn()?; conn.execute( "INSERT OR REPLACE INTO server_signing_key (id, seed_data) VALUES (1, ?1)", params![seed], @@ -338,7 +383,7 @@ impl Store for SqlStore { } fn get_signing_key_seed(&self) -> Result>, StorageError> { - let conn = self.lock_conn()?; + let conn = self.get_conn()?; let mut stmt = conn .prepare("SELECT seed_data FROM server_signing_key WHERE id = 1") .map_err(|e| StorageError::Db(e.to_string()))?; @@ -349,7 +394,7 @@ impl Store for SqlStore { } fn save_kt_log(&self, bytes: Vec) -> Result<(), StorageError> { - let conn = self.lock_conn()?; + let conn = self.get_conn()?; conn.execute( "INSERT OR REPLACE INTO kt_log (id, log_data) VALUES (1, ?1)", params![bytes], @@ -359,7 +404,7 @@ impl Store for SqlStore { } fn load_kt_log(&self) -> Result>, StorageError> { - let conn = self.lock_conn()?; + let conn = self.get_conn()?; let mut stmt = conn .prepare("SELECT log_data FROM kt_log WHERE id = 1") .map_err(|e| StorageError::Db(e.to_string()))?; @@ -370,7 +415,7 @@ impl Store for SqlStore { } fn store_user_record(&self, username: &str, record: Vec) -> Result<(), StorageError> { - let conn = self.lock_conn()?; + let conn = self.get_conn()?; conn.execute( "INSERT INTO users (username, opaque_record) VALUES (?1, ?2)", params![username, record], @@ -387,7 +432,7 @@ impl Store for SqlStore { } fn get_user_record(&self, username: &str) -> Result>, StorageError> { - let conn = self.lock_conn()?; + let conn = self.get_conn()?; let mut stmt = conn .prepare("SELECT opaque_record FROM users WHERE username = 
?1") .map_err(|e| StorageError::Db(e.to_string()))?; @@ -398,7 +443,7 @@ impl Store for SqlStore { } fn has_user_record(&self, username: &str) -> Result { - let conn = self.lock_conn()?; + let conn = self.get_conn()?; let exists: bool = conn .query_row( "SELECT EXISTS(SELECT 1 FROM users WHERE username = ?1)", @@ -414,7 +459,7 @@ impl Store for SqlStore { username: &str, identity_key: Vec, ) -> Result<(), StorageError> { - let conn = self.lock_conn()?; + let conn = self.get_conn()?; conn.execute( "INSERT OR REPLACE INTO user_identity_keys (username, identity_key) VALUES (?1, ?2)", params![username, identity_key], @@ -424,7 +469,7 @@ impl Store for SqlStore { } fn get_user_identity_key(&self, username: &str) -> Result>, StorageError> { - let conn = self.lock_conn()?; + let conn = self.get_conn()?; let mut stmt = conn .prepare("SELECT identity_key FROM user_identity_keys WHERE username = ?1") .map_err(|e| StorageError::Db(e.to_string()))?; @@ -435,7 +480,7 @@ impl Store for SqlStore { } fn resolve_identity_key(&self, identity_key: &[u8]) -> Result, StorageError> { - let conn = self.lock_conn()?; + let conn = self.get_conn()?; let mut stmt = conn .prepare("SELECT username FROM user_identity_keys WHERE identity_key = ?1") .map_err(|e| StorageError::Db(e.to_string()))?; @@ -451,7 +496,7 @@ impl Store for SqlStore { channel_id: &[u8], limit: usize, ) -> Result)>, StorageError> { - let conn = self.lock_conn()?; + let conn = self.get_conn()?; let sql = if limit == 0 { "SELECT seq, payload FROM deliveries @@ -488,7 +533,7 @@ impl Store for SqlStore { channel_id: &[u8], seq_up_to: u64, ) -> Result { - let conn = self.lock_conn()?; + let conn = self.get_conn()?; let deleted = conn .execute( "DELETE FROM deliveries WHERE recipient_key = ?1 AND channel_id = ?2 AND seq <= ?3", @@ -503,7 +548,7 @@ impl Store for SqlStore { identity_key: &[u8], node_addr: Vec, ) -> Result<(), StorageError> { - let conn = self.lock_conn()?; + let conn = self.get_conn()?; conn.execute( "INSERT OR 
REPLACE INTO endpoints (identity_key, node_addr) VALUES (?1, ?2)", params![identity_key, node_addr], @@ -513,7 +558,7 @@ impl Store for SqlStore { } fn resolve_endpoint(&self, identity_key: &[u8]) -> Result>, StorageError> { - let conn = self.lock_conn()?; + let conn = self.get_conn()?; let mut stmt = conn .prepare("SELECT node_addr FROM endpoints WHERE identity_key = ?1") .map_err(|e| StorageError::Db(e.to_string()))?; @@ -529,7 +574,7 @@ impl Store for SqlStore { } else { (member_b.to_vec(), member_a.to_vec()) }; - let conn = self.lock_conn()?; + let conn = self.get_conn()?; let existing: Option> = conn .query_row( "SELECT channel_id FROM channels WHERE member_a = ?1 AND member_b = ?2", @@ -552,7 +597,7 @@ impl Store for SqlStore { } fn get_channel_members(&self, channel_id: &[u8]) -> Result, Vec)>, StorageError> { - let conn = self.lock_conn()?; + let conn = self.get_conn()?; conn.query_row( "SELECT member_a, member_b FROM channels WHERE channel_id = ?1", params![channel_id], @@ -567,7 +612,7 @@ impl Store for SqlStore { identity_key: &[u8], home_server: &str, ) -> Result<(), StorageError> { - let conn = self.lock_conn()?; + let conn = self.get_conn()?; let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() @@ -584,7 +629,7 @@ impl Store for SqlStore { &self, identity_key: &[u8], ) -> Result, StorageError> { - let conn = self.lock_conn()?; + let conn = self.get_conn()?; let mut stmt = conn .prepare("SELECT home_server FROM identity_home_servers WHERE identity_key = ?1") .map_err(|e| StorageError::Db(e.to_string()))?; @@ -598,7 +643,7 @@ impl Store for SqlStore { domain: &str, is_active: bool, ) -> Result<(), StorageError> { - let conn = self.lock_conn()?; + let conn = self.get_conn()?; let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() @@ -613,7 +658,7 @@ impl Store for SqlStore { } fn list_federation_peers(&self) -> Result, StorageError> { - let conn = self.lock_conn()?; 
+ let conn = self.get_conn()?; let mut stmt = conn .prepare("SELECT domain, is_active FROM federation_peers WHERE is_active = 1") .map_err(|e| StorageError::Db(e.to_string()))?; @@ -628,7 +673,7 @@ impl Store for SqlStore { } fn delete_account(&self, identity_key: &[u8]) -> Result<(), StorageError> { - let conn = self.lock_conn()?; + let conn = self.get_conn()?; // Resolve the username for this identity key. let username: Option = conn @@ -719,7 +764,7 @@ impl Store for SqlStore { } fn register_device(&self, identity_key: &[u8], device_id: &[u8], device_name: &str) -> Result { - let conn = self.lock_conn()?; + let conn = self.get_conn()?; // Check if device already exists. let exists: bool = conn .query_row( @@ -740,7 +785,7 @@ impl Store for SqlStore { } fn list_devices(&self, identity_key: &[u8]) -> Result, String, u64)>, StorageError> { - let conn = self.lock_conn()?; + let conn = self.get_conn()?; let mut stmt = conn .prepare("SELECT device_id, device_name, registered_at FROM devices WHERE identity_key = ?1 ORDER BY registered_at ASC") .map_err(|e| StorageError::Db(e.to_string()))?; @@ -759,7 +804,7 @@ impl Store for SqlStore { } fn revoke_device(&self, identity_key: &[u8], device_id: &[u8]) -> Result { - let conn = self.lock_conn()?; + let conn = self.get_conn()?; let deleted = conn .execute( "DELETE FROM devices WHERE identity_key = ?1 AND device_id = ?2", @@ -770,7 +815,7 @@ impl Store for SqlStore { } fn device_count(&self, identity_key: &[u8]) -> Result { - let conn = self.lock_conn()?; + let conn = self.get_conn()?; let count: i64 = conn .query_row( "SELECT COUNT(*) FROM devices WHERE identity_key = ?1", @@ -780,6 +825,167 @@ impl Store for SqlStore { .map_err(|e| StorageError::Db(e.to_string()))?; Ok(count as usize) } + + // ── Session persistence ──────────────────────────────────────────────── + + fn store_session(&self, token: &[u8], record: &SessionRecord) -> Result<(), StorageError> { + let conn = self.get_conn()?; + conn.execute( + "INSERT OR 
REPLACE INTO sessions (token, username, identity_key, created_at, expires_at) VALUES (?1, ?2, ?3, ?4, ?5)", + params![token, record.username, record.identity_key, record.created_at as i64, record.expires_at as i64], + ) + .map_err(|e| StorageError::Db(e.to_string()))?; + Ok(()) + } + + fn get_session(&self, token: &[u8]) -> Result, StorageError> { + let conn = self.get_conn()?; + let mut stmt = conn + .prepare("SELECT username, identity_key, created_at, expires_at FROM sessions WHERE token = ?1") + .map_err(|e| StorageError::Db(e.to_string()))?; + + stmt.query_row(params![token], |row| { + Ok(SessionRecord { + username: row.get(0)?, + identity_key: row.get(1)?, + created_at: row.get::<_, i64>(2)? as u64, + expires_at: row.get::<_, i64>(3)? as u64, + }) + }) + .optional() + .map_err(|e| StorageError::Db(e.to_string())) + } + + fn delete_expired_sessions(&self, now: u64) -> Result { + let conn = self.get_conn()?; + let deleted = conn + .execute( + "DELETE FROM sessions WHERE expires_at <= ?1", + params![now as i64], + ) + .map_err(|e| StorageError::Db(e.to_string()))?; + Ok(deleted) + } + + fn delete_session(&self, token: &[u8]) -> Result<(), StorageError> { + let conn = self.get_conn()?; + conn.execute("DELETE FROM sessions WHERE token = ?1", params![token]) + .map_err(|e| StorageError::Db(e.to_string()))?; + Ok(()) + } + + // ── Blob storage ─────────────────────────────────────────────────────── + + fn store_blob_chunk( + &self, + blob_hash: &[u8], + chunk: &[u8], + offset: u64, + total_size: u64, + mime_type: &str, + ) -> Result>, StorageError> { + let conn = self.get_conn()?; + + // Insert chunk into staging. + conn.execute( + "INSERT OR REPLACE INTO blob_staging (blob_hash, offset, chunk, total_size, mime_type) VALUES (?1, ?2, ?3, ?4, ?5)", + params![blob_hash, offset as i64, chunk, total_size as i64, mime_type], + ) + .map_err(|e| StorageError::Db(e.to_string()))?; + + // Check if all chunks have arrived. 
+ let staged_size: i64 = conn + .query_row( + "SELECT COALESCE(SUM(LENGTH(chunk)), 0) FROM blob_staging WHERE blob_hash = ?1", + params![blob_hash], + |row| row.get(0), + ) + .map_err(|e| StorageError::Db(e.to_string()))?; + + if staged_size as u64 != total_size { + return Ok(None); + } + + // All chunks received — assemble in offset order. + let mut stmt = conn + .prepare("SELECT chunk FROM blob_staging WHERE blob_hash = ?1 ORDER BY offset ASC") + .map_err(|e| StorageError::Db(e.to_string()))?; + + let chunks: Vec> = stmt + .query_map(params![blob_hash], |row| row.get(0)) + .map_err(|e| StorageError::Db(e.to_string()))? + .collect::, _>>() + .map_err(|e| StorageError::Db(e.to_string()))?; + + let mut assembled = Vec::with_capacity(total_size as usize); + for c in &chunks { + assembled.extend_from_slice(c); + } + + // Verify SHA-256. + let hash = Sha256::digest(&assembled); + if hash.as_slice() != blob_hash { + // Clean up staging rows for this blob. + conn.execute( + "DELETE FROM blob_staging WHERE blob_hash = ?1", + params![blob_hash], + ) + .map_err(|e| StorageError::Db(e.to_string()))?; + return Err(StorageError::Db( + "blob hash mismatch after assembly".into(), + )); + } + + // Use the hash as the blob_id (content-addressable). + let blob_id = hash.to_vec(); + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs() as i64; + + conn.execute( + "INSERT OR REPLACE INTO blobs (blob_id, data, total_size, mime_type, uploaded_at) VALUES (?1, ?2, ?3, ?4, ?5)", + params![blob_id, assembled, total_size as i64, mime_type, now], + ) + .map_err(|e| StorageError::Db(e.to_string()))?; + + // Clean up staging. 
+ conn.execute( + "DELETE FROM blob_staging WHERE blob_hash = ?1", + params![blob_hash], + ) + .map_err(|e| StorageError::Db(e.to_string()))?; + + Ok(Some(blob_id)) + } + + fn get_blob_chunk( + &self, + blob_id: &[u8], + offset: u64, + length: u32, + ) -> Result, u64, String)>, StorageError> { + let conn = self.get_conn()?; + let mut stmt = conn + .prepare( + "SELECT substr(data, ?2, ?3), total_size, mime_type FROM blobs WHERE blob_id = ?1", + ) + .map_err(|e| StorageError::Db(e.to_string()))?; + + // SQLite substr is 1-indexed. + stmt.query_row( + params![blob_id, (offset + 1) as i64, length as i64], + |row| { + Ok(( + row.get::<_, Vec>(0)?, + row.get::<_, i64>(1)? as u64, + row.get::<_, String>(2)?, + )) + }, + ) + .optional() + .map_err(|e| StorageError::Db(e.to_string())) + } } /// Convenience extension for `rusqlite::OptionalExtension`. @@ -803,7 +1009,8 @@ mod tests { use std::path::PathBuf; fn open_in_memory() -> SqlStore { - SqlStore::open(":memory:", "").unwrap() + // Pool size 1 for :memory: — each connection is a separate DB. + SqlStore::open_with_pool_size(":memory:", "", 1).unwrap() } #[test] @@ -813,7 +1020,7 @@ mod tests { { let store = SqlStore::open(&db_path, "").expect("open store"); - let _guard = store.lock_conn().unwrap(); + let _guard = store.get_conn().unwrap(); } let conn = rusqlite::Connection::open(&db_path).expect("reopen db"); diff --git a/crates/quicproquo-server/src/storage.rs b/crates/quicproquo-server/src/storage.rs index 62a7098..54773ff 100644 --- a/crates/quicproquo-server/src/storage.rs +++ b/crates/quicproquo-server/src/storage.rs @@ -22,6 +22,14 @@ pub enum StorageError { DuplicateUser(String), } +/// A persisted session record mapping a bearer token to an authenticated user. 
+pub struct SessionRecord { + pub username: String, + pub identity_key: Vec, + pub created_at: u64, + pub expires_at: u64, +} + fn lock(m: &Mutex) -> Result, StorageError> { m.lock() .map_err(|e| StorageError::Io(format!("lock poisoned: {e}"))) @@ -199,6 +207,55 @@ pub trait Store: Send + Sync { /// Return the number of registered devices for an identity. fn device_count(&self, identity_key: &[u8]) -> Result; + + // ── Session persistence ──────────────────────────────────────────────── + + /// Store a session token → record mapping. + fn store_session(&self, _token: &[u8], _record: &SessionRecord) -> Result<(), StorageError> { + Ok(()) + } + + /// Retrieve a session record by bearer token. + fn get_session(&self, _token: &[u8]) -> Result, StorageError> { + Ok(None) + } + + /// Delete all sessions whose `expires_at` <= `now`. Returns count deleted. + fn delete_expired_sessions(&self, _now: u64) -> Result { + Ok(0) + } + + /// Delete a single session by token. + fn delete_session(&self, _token: &[u8]) -> Result<(), StorageError> { + Ok(()) + } + + // ── Blob storage ─────────────────────────────────────────────────────── + + /// Append a chunk to the staging area for an in-progress upload. + /// When all chunks have arrived (sum of chunk sizes == `total_size`), assembles the blob, + /// verifies its SHA-256 hash against `blob_hash`, inserts into permanent storage, and + /// returns `Some(blob_id)`. Otherwise returns `None`. + fn store_blob_chunk( + &self, + _blob_hash: &[u8], + _chunk: &[u8], + _offset: u64, + _total_size: u64, + _mime_type: &str, + ) -> Result>, StorageError> { + Err(StorageError::Io("blob storage not supported".into())) + } + + /// Read a slice of a completed blob. Returns `(chunk_data, total_size, mime_type)`. 
+ fn get_blob_chunk( + &self, + _blob_id: &[u8], + _offset: u64, + _length: u32, + ) -> Result, u64, String)>, StorageError> { + Err(StorageError::Io("blob storage not supported".into())) + } } // ── ChannelKey ───────────────────────────────────────────────────────────────