feat(server): connection pool, session persistence, blob storage in SqlStore

- Replace Mutex<Connection> with Vec<Mutex<Connection>> pool (default 4)
  with try_lock fast-path and blocking fallback
- Add SessionRecord struct and session CRUD to Store trait (default no-ops)
- Implement session persistence in SqlStore (sessions table, migration 009)
- Add blob upload/download with SHA-256 verified staging assembly
  (blobs + blob_staging tables, migration 010)
- All 35 server tests pass, FileBackedStore unaffected
This commit is contained in:
2026-03-04 12:09:03 +01:00
parent f09dbe10ce
commit 6273ab668d
4 changed files with 346 additions and 59 deletions

View File

@@ -0,0 +1,8 @@
CREATE TABLE IF NOT EXISTS sessions (
token BLOB PRIMARY KEY,
username TEXT NOT NULL,
identity_key BLOB NOT NULL,
created_at INTEGER NOT NULL,
expires_at INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_sessions_expires ON sessions(expires_at);

View File

@@ -0,0 +1,15 @@
CREATE TABLE IF NOT EXISTS blobs (
blob_id BLOB PRIMARY KEY,
data BLOB NOT NULL,
total_size INTEGER NOT NULL,
mime_type TEXT NOT NULL DEFAULT '',
uploaded_at INTEGER NOT NULL
);
CREATE TABLE IF NOT EXISTS blob_staging (
blob_hash BLOB NOT NULL,
offset INTEGER NOT NULL,
chunk BLOB NOT NULL,
total_size INTEGER NOT NULL,
mime_type TEXT NOT NULL DEFAULT '',
PRIMARY KEY (blob_hash, offset)
);

View File

@@ -5,11 +5,15 @@ use std::sync::Mutex;
use rand::RngCore; use rand::RngCore;
use rusqlite::{params, Connection}; use rusqlite::{params, Connection};
use sha2::{Digest, Sha256};
use crate::storage::{StorageError, Store}; use crate::storage::{SessionRecord, StorageError, Store};
/// Schema version after introducing the migration runner (existing DBs had 1). /// Schema version after introducing the migration runner (existing DBs had 1).
const SCHEMA_VERSION: i32 = 9; const SCHEMA_VERSION: i32 = 11;
/// Default number of connections in the pool.
const DEFAULT_POOL_SIZE: usize = 4;
/// Migrations: (migration_number, SQL). Files named NNN_name.sql, applied in order when N > user_version. /// Migrations: (migration_number, SQL). Files named NNN_name.sql, applied in order when N > user_version.
const MIGRATIONS: &[(i32, &str)] = &[ const MIGRATIONS: &[(i32, &str)] = &[
@@ -21,6 +25,8 @@ const MIGRATIONS: &[(i32, &str)] = &[
(7, include_str!("../migrations/006_kt_log.sql")), (7, include_str!("../migrations/006_kt_log.sql")),
(8, include_str!("../migrations/007_add_expiry.sql")), (8, include_str!("../migrations/007_add_expiry.sql")),
(9, include_str!("../migrations/008_devices.sql")), (9, include_str!("../migrations/008_devices.sql")),
(10, include_str!("../migrations/009_sessions.sql")),
(11, include_str!("../migrations/010_blobs.sql")),
]; ];
/// Runs pending migrations on an open connection: applies any migration whose number is greater /// Runs pending migrations on an open connection: applies any migration whose number is greater
@@ -41,19 +47,72 @@ fn run_migrations(conn: &Connection) -> Result<(), StorageError> {
Ok(()) Ok(())
} }
/// SQLCipher-encrypted storage backend. /// SQLCipher-encrypted storage backend with a connection pool.
///
/// Maintains `pool_size` SQLite connections (default 4) behind `std::sync::Mutex`.
/// Each store method tries all connections via `try_lock()` before falling back to
/// blocking on the first connection. WAL mode allows concurrent readers; writers
/// are serialised by SQLite itself.
pub struct SqlStore { pub struct SqlStore {
conn: Mutex<Connection>, pool: Vec<Mutex<Connection>>,
} }
impl SqlStore { impl SqlStore {
fn lock_conn(&self) -> Result<std::sync::MutexGuard<'_, Connection>, StorageError> { /// Try to acquire any connection from the pool without blocking.
self.conn /// Falls back to blocking on the first connection.
fn get_conn(&self) -> Result<std::sync::MutexGuard<'_, Connection>, StorageError> {
// Fast path: try each connection without blocking.
for conn in &self.pool {
if let Ok(guard) = conn.try_lock() {
return Ok(guard);
}
}
// Slow path: block on the first connection.
self.pool[0]
.lock() .lock()
.map_err(|e| StorageError::Db(format!("lock poisoned: {e}"))) .map_err(|e| StorageError::Db(format!("lock poisoned: {e}")))
} }
pub fn open(path: impl AsRef<Path>, key: &str) -> Result<Self, StorageError> { pub fn open(path: impl AsRef<Path>, key: &str) -> Result<Self, StorageError> {
Self::open_with_pool_size(path, key, DEFAULT_POOL_SIZE)
}
pub fn open_with_pool_size(
path: impl AsRef<Path>,
key: &str,
pool_size: usize,
) -> Result<Self, StorageError> {
let pool_size = pool_size.max(1);
let path = path.as_ref();
// Open the first connection and run migrations.
let first = Self::open_one(path, key)?;
let current_version: i32 = first
.pragma_query_value(None, "user_version", |row| row.get(0))
.map_err(|e| StorageError::Db(format!("PRAGMA user_version failed: {e}")))?;
if current_version > SCHEMA_VERSION {
return Err(StorageError::Db(format!(
"database schema version {current_version} is newer than supported {SCHEMA_VERSION}"
)));
}
run_migrations(&first)?;
let mut pool = Vec::with_capacity(pool_size);
pool.push(Mutex::new(first));
// Open remaining connections (they skip migrations since the first one already ran them).
for _ in 1..pool_size {
let conn = Self::open_one(path, key)?;
pool.push(Mutex::new(conn));
}
Ok(Self { pool })
}
/// Open a single connection with shared pragmas.
fn open_one(path: &Path, key: &str) -> Result<Connection, StorageError> {
let conn = Connection::open(path).map_err(|e| StorageError::Db(e.to_string()))?; let conn = Connection::open(path).map_err(|e| StorageError::Db(e.to_string()))?;
if !key.is_empty() { if !key.is_empty() {
@@ -68,21 +127,7 @@ impl SqlStore {
) )
.map_err(|e| StorageError::Db(e.to_string()))?; .map_err(|e| StorageError::Db(e.to_string()))?;
let current_version: i32 = conn Ok(conn)
.pragma_query_value(None, "user_version", |row| row.get(0))
.map_err(|e| StorageError::Db(format!("PRAGMA user_version failed: {e}")))?;
if current_version > SCHEMA_VERSION {
return Err(StorageError::Db(format!(
"database schema version {current_version} is newer than supported {SCHEMA_VERSION}"
)));
}
run_migrations(&conn)?;
Ok(Self {
conn: Mutex::new(conn),
})
} }
} }
@@ -92,7 +137,7 @@ impl Store for SqlStore {
identity_key: &[u8], identity_key: &[u8],
package: Vec<u8>, package: Vec<u8>,
) -> Result<(), StorageError> { ) -> Result<(), StorageError> {
let conn = self.lock_conn()?; let conn = self.get_conn()?;
conn.execute( conn.execute(
"INSERT INTO key_packages (identity_key, package_data) VALUES (?1, ?2)", "INSERT INTO key_packages (identity_key, package_data) VALUES (?1, ?2)",
params![identity_key, package], params![identity_key, package],
@@ -102,7 +147,7 @@ impl Store for SqlStore {
} }
fn fetch_key_package(&self, identity_key: &[u8]) -> Result<Option<Vec<u8>>, StorageError> { fn fetch_key_package(&self, identity_key: &[u8]) -> Result<Option<Vec<u8>>, StorageError> {
let conn = self.lock_conn()?; let conn = self.get_conn()?;
let mut stmt = conn let mut stmt = conn
.prepare( .prepare(
@@ -137,7 +182,7 @@ impl Store for SqlStore {
payload: Vec<u8>, payload: Vec<u8>,
ttl_secs: Option<u32>, ttl_secs: Option<u32>,
) -> Result<u64, StorageError> { ) -> Result<u64, StorageError> {
let conn = self.lock_conn()?; let conn = self.get_conn()?;
// Atomically get-and-increment the per-inbox sequence counter. // Atomically get-and-increment the per-inbox sequence counter.
// RETURNING gives us the post-update next_seq; the assigned seq is next_seq - 1. // RETURNING gives us the post-update next_seq; the assigned seq is next_seq - 1.
let seq: i64 = conn let seq: i64 = conn
@@ -170,7 +215,7 @@ impl Store for SqlStore {
recipient_key: &[u8], recipient_key: &[u8],
channel_id: &[u8], channel_id: &[u8],
) -> Result<Vec<(u64, Vec<u8>)>, StorageError> { ) -> Result<Vec<(u64, Vec<u8>)>, StorageError> {
let conn = self.lock_conn()?; let conn = self.get_conn()?;
let mut stmt = conn let mut stmt = conn
.prepare( .prepare(
@@ -210,7 +255,7 @@ impl Store for SqlStore {
channel_id: &[u8], channel_id: &[u8],
limit: usize, limit: usize,
) -> Result<Vec<(u64, Vec<u8>)>, StorageError> { ) -> Result<Vec<(u64, Vec<u8>)>, StorageError> {
let conn = self.lock_conn()?; let conn = self.get_conn()?;
let mut stmt = conn let mut stmt = conn
.prepare( .prepare(
@@ -246,7 +291,7 @@ impl Store for SqlStore {
} }
fn queue_depth(&self, recipient_key: &[u8], channel_id: &[u8]) -> Result<usize, StorageError> { fn queue_depth(&self, recipient_key: &[u8], channel_id: &[u8]) -> Result<usize, StorageError> {
let conn = self.lock_conn()?; let conn = self.get_conn()?;
let count: i64 = conn let count: i64 = conn
.query_row( .query_row(
"SELECT COUNT(*) FROM deliveries WHERE recipient_key = ?1 AND channel_id = ?2 AND (expires_at IS NULL OR expires_at > strftime('%s','now'))", "SELECT COUNT(*) FROM deliveries WHERE recipient_key = ?1 AND channel_id = ?2 AND (expires_at IS NULL OR expires_at > strftime('%s','now'))",
@@ -258,7 +303,7 @@ impl Store for SqlStore {
} }
fn gc_expired_messages(&self, max_age_secs: u64) -> Result<usize, StorageError> { fn gc_expired_messages(&self, max_age_secs: u64) -> Result<usize, StorageError> {
let conn = self.lock_conn()?; let conn = self.get_conn()?;
let now = std::time::SystemTime::now() let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH) .duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default() .unwrap_or_default()
@@ -286,7 +331,7 @@ impl Store for SqlStore {
identity_key: &[u8], identity_key: &[u8],
hybrid_pk: Vec<u8>, hybrid_pk: Vec<u8>,
) -> Result<(), StorageError> { ) -> Result<(), StorageError> {
let conn = self.lock_conn()?; let conn = self.get_conn()?;
conn.execute( conn.execute(
"INSERT OR REPLACE INTO hybrid_keys (identity_key, hybrid_public_key) VALUES (?1, ?2)", "INSERT OR REPLACE INTO hybrid_keys (identity_key, hybrid_public_key) VALUES (?1, ?2)",
params![identity_key, hybrid_pk], params![identity_key, hybrid_pk],
@@ -296,7 +341,7 @@ impl Store for SqlStore {
} }
fn fetch_hybrid_key(&self, identity_key: &[u8]) -> Result<Option<Vec<u8>>, StorageError> { fn fetch_hybrid_key(&self, identity_key: &[u8]) -> Result<Option<Vec<u8>>, StorageError> {
let conn = self.lock_conn()?; let conn = self.get_conn()?;
let mut stmt = conn let mut stmt = conn
.prepare("SELECT hybrid_public_key FROM hybrid_keys WHERE identity_key = ?1") .prepare("SELECT hybrid_public_key FROM hybrid_keys WHERE identity_key = ?1")
.map_err(|e| StorageError::Db(e.to_string()))?; .map_err(|e| StorageError::Db(e.to_string()))?;
@@ -307,7 +352,7 @@ impl Store for SqlStore {
} }
fn store_server_setup(&self, setup: Vec<u8>) -> Result<(), StorageError> { fn store_server_setup(&self, setup: Vec<u8>) -> Result<(), StorageError> {
let conn = self.lock_conn()?; let conn = self.get_conn()?;
conn.execute( conn.execute(
"INSERT OR REPLACE INTO server_setup (id, setup_data) VALUES (1, ?1)", "INSERT OR REPLACE INTO server_setup (id, setup_data) VALUES (1, ?1)",
params![setup], params![setup],
@@ -317,7 +362,7 @@ impl Store for SqlStore {
} }
fn get_server_setup(&self) -> Result<Option<Vec<u8>>, StorageError> { fn get_server_setup(&self) -> Result<Option<Vec<u8>>, StorageError> {
let conn = self.lock_conn()?; let conn = self.get_conn()?;
let mut stmt = conn let mut stmt = conn
.prepare("SELECT setup_data FROM server_setup WHERE id = 1") .prepare("SELECT setup_data FROM server_setup WHERE id = 1")
.map_err(|e| StorageError::Db(e.to_string()))?; .map_err(|e| StorageError::Db(e.to_string()))?;
@@ -328,7 +373,7 @@ impl Store for SqlStore {
} }
fn store_signing_key_seed(&self, seed: Vec<u8>) -> Result<(), StorageError> { fn store_signing_key_seed(&self, seed: Vec<u8>) -> Result<(), StorageError> {
let conn = self.lock_conn()?; let conn = self.get_conn()?;
conn.execute( conn.execute(
"INSERT OR REPLACE INTO server_signing_key (id, seed_data) VALUES (1, ?1)", "INSERT OR REPLACE INTO server_signing_key (id, seed_data) VALUES (1, ?1)",
params![seed], params![seed],
@@ -338,7 +383,7 @@ impl Store for SqlStore {
} }
fn get_signing_key_seed(&self) -> Result<Option<Vec<u8>>, StorageError> { fn get_signing_key_seed(&self) -> Result<Option<Vec<u8>>, StorageError> {
let conn = self.lock_conn()?; let conn = self.get_conn()?;
let mut stmt = conn let mut stmt = conn
.prepare("SELECT seed_data FROM server_signing_key WHERE id = 1") .prepare("SELECT seed_data FROM server_signing_key WHERE id = 1")
.map_err(|e| StorageError::Db(e.to_string()))?; .map_err(|e| StorageError::Db(e.to_string()))?;
@@ -349,7 +394,7 @@ impl Store for SqlStore {
} }
fn save_kt_log(&self, bytes: Vec<u8>) -> Result<(), StorageError> { fn save_kt_log(&self, bytes: Vec<u8>) -> Result<(), StorageError> {
let conn = self.lock_conn()?; let conn = self.get_conn()?;
conn.execute( conn.execute(
"INSERT OR REPLACE INTO kt_log (id, log_data) VALUES (1, ?1)", "INSERT OR REPLACE INTO kt_log (id, log_data) VALUES (1, ?1)",
params![bytes], params![bytes],
@@ -359,7 +404,7 @@ impl Store for SqlStore {
} }
fn load_kt_log(&self) -> Result<Option<Vec<u8>>, StorageError> { fn load_kt_log(&self) -> Result<Option<Vec<u8>>, StorageError> {
let conn = self.lock_conn()?; let conn = self.get_conn()?;
let mut stmt = conn let mut stmt = conn
.prepare("SELECT log_data FROM kt_log WHERE id = 1") .prepare("SELECT log_data FROM kt_log WHERE id = 1")
.map_err(|e| StorageError::Db(e.to_string()))?; .map_err(|e| StorageError::Db(e.to_string()))?;
@@ -370,7 +415,7 @@ impl Store for SqlStore {
} }
fn store_user_record(&self, username: &str, record: Vec<u8>) -> Result<(), StorageError> { fn store_user_record(&self, username: &str, record: Vec<u8>) -> Result<(), StorageError> {
let conn = self.lock_conn()?; let conn = self.get_conn()?;
conn.execute( conn.execute(
"INSERT INTO users (username, opaque_record) VALUES (?1, ?2)", "INSERT INTO users (username, opaque_record) VALUES (?1, ?2)",
params![username, record], params![username, record],
@@ -387,7 +432,7 @@ impl Store for SqlStore {
} }
fn get_user_record(&self, username: &str) -> Result<Option<Vec<u8>>, StorageError> { fn get_user_record(&self, username: &str) -> Result<Option<Vec<u8>>, StorageError> {
let conn = self.lock_conn()?; let conn = self.get_conn()?;
let mut stmt = conn let mut stmt = conn
.prepare("SELECT opaque_record FROM users WHERE username = ?1") .prepare("SELECT opaque_record FROM users WHERE username = ?1")
.map_err(|e| StorageError::Db(e.to_string()))?; .map_err(|e| StorageError::Db(e.to_string()))?;
@@ -398,7 +443,7 @@ impl Store for SqlStore {
} }
fn has_user_record(&self, username: &str) -> Result<bool, StorageError> { fn has_user_record(&self, username: &str) -> Result<bool, StorageError> {
let conn = self.lock_conn()?; let conn = self.get_conn()?;
let exists: bool = conn let exists: bool = conn
.query_row( .query_row(
"SELECT EXISTS(SELECT 1 FROM users WHERE username = ?1)", "SELECT EXISTS(SELECT 1 FROM users WHERE username = ?1)",
@@ -414,7 +459,7 @@ impl Store for SqlStore {
username: &str, username: &str,
identity_key: Vec<u8>, identity_key: Vec<u8>,
) -> Result<(), StorageError> { ) -> Result<(), StorageError> {
let conn = self.lock_conn()?; let conn = self.get_conn()?;
conn.execute( conn.execute(
"INSERT OR REPLACE INTO user_identity_keys (username, identity_key) VALUES (?1, ?2)", "INSERT OR REPLACE INTO user_identity_keys (username, identity_key) VALUES (?1, ?2)",
params![username, identity_key], params![username, identity_key],
@@ -424,7 +469,7 @@ impl Store for SqlStore {
} }
fn get_user_identity_key(&self, username: &str) -> Result<Option<Vec<u8>>, StorageError> { fn get_user_identity_key(&self, username: &str) -> Result<Option<Vec<u8>>, StorageError> {
let conn = self.lock_conn()?; let conn = self.get_conn()?;
let mut stmt = conn let mut stmt = conn
.prepare("SELECT identity_key FROM user_identity_keys WHERE username = ?1") .prepare("SELECT identity_key FROM user_identity_keys WHERE username = ?1")
.map_err(|e| StorageError::Db(e.to_string()))?; .map_err(|e| StorageError::Db(e.to_string()))?;
@@ -435,7 +480,7 @@ impl Store for SqlStore {
} }
fn resolve_identity_key(&self, identity_key: &[u8]) -> Result<Option<String>, StorageError> { fn resolve_identity_key(&self, identity_key: &[u8]) -> Result<Option<String>, StorageError> {
let conn = self.lock_conn()?; let conn = self.get_conn()?;
let mut stmt = conn let mut stmt = conn
.prepare("SELECT username FROM user_identity_keys WHERE identity_key = ?1") .prepare("SELECT username FROM user_identity_keys WHERE identity_key = ?1")
.map_err(|e| StorageError::Db(e.to_string()))?; .map_err(|e| StorageError::Db(e.to_string()))?;
@@ -451,7 +496,7 @@ impl Store for SqlStore {
channel_id: &[u8], channel_id: &[u8],
limit: usize, limit: usize,
) -> Result<Vec<(u64, Vec<u8>)>, StorageError> { ) -> Result<Vec<(u64, Vec<u8>)>, StorageError> {
let conn = self.lock_conn()?; let conn = self.get_conn()?;
let sql = if limit == 0 { let sql = if limit == 0 {
"SELECT seq, payload FROM deliveries "SELECT seq, payload FROM deliveries
@@ -488,7 +533,7 @@ impl Store for SqlStore {
channel_id: &[u8], channel_id: &[u8],
seq_up_to: u64, seq_up_to: u64,
) -> Result<usize, StorageError> { ) -> Result<usize, StorageError> {
let conn = self.lock_conn()?; let conn = self.get_conn()?;
let deleted = conn let deleted = conn
.execute( .execute(
"DELETE FROM deliveries WHERE recipient_key = ?1 AND channel_id = ?2 AND seq <= ?3", "DELETE FROM deliveries WHERE recipient_key = ?1 AND channel_id = ?2 AND seq <= ?3",
@@ -503,7 +548,7 @@ impl Store for SqlStore {
identity_key: &[u8], identity_key: &[u8],
node_addr: Vec<u8>, node_addr: Vec<u8>,
) -> Result<(), StorageError> { ) -> Result<(), StorageError> {
let conn = self.lock_conn()?; let conn = self.get_conn()?;
conn.execute( conn.execute(
"INSERT OR REPLACE INTO endpoints (identity_key, node_addr) VALUES (?1, ?2)", "INSERT OR REPLACE INTO endpoints (identity_key, node_addr) VALUES (?1, ?2)",
params![identity_key, node_addr], params![identity_key, node_addr],
@@ -513,7 +558,7 @@ impl Store for SqlStore {
} }
fn resolve_endpoint(&self, identity_key: &[u8]) -> Result<Option<Vec<u8>>, StorageError> { fn resolve_endpoint(&self, identity_key: &[u8]) -> Result<Option<Vec<u8>>, StorageError> {
let conn = self.lock_conn()?; let conn = self.get_conn()?;
let mut stmt = conn let mut stmt = conn
.prepare("SELECT node_addr FROM endpoints WHERE identity_key = ?1") .prepare("SELECT node_addr FROM endpoints WHERE identity_key = ?1")
.map_err(|e| StorageError::Db(e.to_string()))?; .map_err(|e| StorageError::Db(e.to_string()))?;
@@ -529,7 +574,7 @@ impl Store for SqlStore {
} else { } else {
(member_b.to_vec(), member_a.to_vec()) (member_b.to_vec(), member_a.to_vec())
}; };
let conn = self.lock_conn()?; let conn = self.get_conn()?;
let existing: Option<Vec<u8>> = conn let existing: Option<Vec<u8>> = conn
.query_row( .query_row(
"SELECT channel_id FROM channels WHERE member_a = ?1 AND member_b = ?2", "SELECT channel_id FROM channels WHERE member_a = ?1 AND member_b = ?2",
@@ -552,7 +597,7 @@ impl Store for SqlStore {
} }
fn get_channel_members(&self, channel_id: &[u8]) -> Result<Option<(Vec<u8>, Vec<u8>)>, StorageError> { fn get_channel_members(&self, channel_id: &[u8]) -> Result<Option<(Vec<u8>, Vec<u8>)>, StorageError> {
let conn = self.lock_conn()?; let conn = self.get_conn()?;
conn.query_row( conn.query_row(
"SELECT member_a, member_b FROM channels WHERE channel_id = ?1", "SELECT member_a, member_b FROM channels WHERE channel_id = ?1",
params![channel_id], params![channel_id],
@@ -567,7 +612,7 @@ impl Store for SqlStore {
identity_key: &[u8], identity_key: &[u8],
home_server: &str, home_server: &str,
) -> Result<(), StorageError> { ) -> Result<(), StorageError> {
let conn = self.lock_conn()?; let conn = self.get_conn()?;
let now = std::time::SystemTime::now() let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH) .duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default() .unwrap_or_default()
@@ -584,7 +629,7 @@ impl Store for SqlStore {
&self, &self,
identity_key: &[u8], identity_key: &[u8],
) -> Result<Option<String>, StorageError> { ) -> Result<Option<String>, StorageError> {
let conn = self.lock_conn()?; let conn = self.get_conn()?;
let mut stmt = conn let mut stmt = conn
.prepare("SELECT home_server FROM identity_home_servers WHERE identity_key = ?1") .prepare("SELECT home_server FROM identity_home_servers WHERE identity_key = ?1")
.map_err(|e| StorageError::Db(e.to_string()))?; .map_err(|e| StorageError::Db(e.to_string()))?;
@@ -598,7 +643,7 @@ impl Store for SqlStore {
domain: &str, domain: &str,
is_active: bool, is_active: bool,
) -> Result<(), StorageError> { ) -> Result<(), StorageError> {
let conn = self.lock_conn()?; let conn = self.get_conn()?;
let now = std::time::SystemTime::now() let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH) .duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default() .unwrap_or_default()
@@ -613,7 +658,7 @@ impl Store for SqlStore {
} }
fn list_federation_peers(&self) -> Result<Vec<(String, bool)>, StorageError> { fn list_federation_peers(&self) -> Result<Vec<(String, bool)>, StorageError> {
let conn = self.lock_conn()?; let conn = self.get_conn()?;
let mut stmt = conn let mut stmt = conn
.prepare("SELECT domain, is_active FROM federation_peers WHERE is_active = 1") .prepare("SELECT domain, is_active FROM federation_peers WHERE is_active = 1")
.map_err(|e| StorageError::Db(e.to_string()))?; .map_err(|e| StorageError::Db(e.to_string()))?;
@@ -628,7 +673,7 @@ impl Store for SqlStore {
} }
fn delete_account(&self, identity_key: &[u8]) -> Result<(), StorageError> { fn delete_account(&self, identity_key: &[u8]) -> Result<(), StorageError> {
let conn = self.lock_conn()?; let conn = self.get_conn()?;
// Resolve the username for this identity key. // Resolve the username for this identity key.
let username: Option<String> = conn let username: Option<String> = conn
@@ -719,7 +764,7 @@ impl Store for SqlStore {
} }
fn register_device(&self, identity_key: &[u8], device_id: &[u8], device_name: &str) -> Result<bool, StorageError> { fn register_device(&self, identity_key: &[u8], device_id: &[u8], device_name: &str) -> Result<bool, StorageError> {
let conn = self.lock_conn()?; let conn = self.get_conn()?;
// Check if device already exists. // Check if device already exists.
let exists: bool = conn let exists: bool = conn
.query_row( .query_row(
@@ -740,7 +785,7 @@ impl Store for SqlStore {
} }
fn list_devices(&self, identity_key: &[u8]) -> Result<Vec<(Vec<u8>, String, u64)>, StorageError> { fn list_devices(&self, identity_key: &[u8]) -> Result<Vec<(Vec<u8>, String, u64)>, StorageError> {
let conn = self.lock_conn()?; let conn = self.get_conn()?;
let mut stmt = conn let mut stmt = conn
.prepare("SELECT device_id, device_name, registered_at FROM devices WHERE identity_key = ?1 ORDER BY registered_at ASC") .prepare("SELECT device_id, device_name, registered_at FROM devices WHERE identity_key = ?1 ORDER BY registered_at ASC")
.map_err(|e| StorageError::Db(e.to_string()))?; .map_err(|e| StorageError::Db(e.to_string()))?;
@@ -759,7 +804,7 @@ impl Store for SqlStore {
} }
fn revoke_device(&self, identity_key: &[u8], device_id: &[u8]) -> Result<bool, StorageError> { fn revoke_device(&self, identity_key: &[u8], device_id: &[u8]) -> Result<bool, StorageError> {
let conn = self.lock_conn()?; let conn = self.get_conn()?;
let deleted = conn let deleted = conn
.execute( .execute(
"DELETE FROM devices WHERE identity_key = ?1 AND device_id = ?2", "DELETE FROM devices WHERE identity_key = ?1 AND device_id = ?2",
@@ -770,7 +815,7 @@ impl Store for SqlStore {
} }
fn device_count(&self, identity_key: &[u8]) -> Result<usize, StorageError> { fn device_count(&self, identity_key: &[u8]) -> Result<usize, StorageError> {
let conn = self.lock_conn()?; let conn = self.get_conn()?;
let count: i64 = conn let count: i64 = conn
.query_row( .query_row(
"SELECT COUNT(*) FROM devices WHERE identity_key = ?1", "SELECT COUNT(*) FROM devices WHERE identity_key = ?1",
@@ -780,6 +825,167 @@ impl Store for SqlStore {
.map_err(|e| StorageError::Db(e.to_string()))?; .map_err(|e| StorageError::Db(e.to_string()))?;
Ok(count as usize) Ok(count as usize)
} }
// ── Session persistence ────────────────────────────────────────────────
fn store_session(&self, token: &[u8], record: &SessionRecord) -> Result<(), StorageError> {
let conn = self.get_conn()?;
conn.execute(
"INSERT OR REPLACE INTO sessions (token, username, identity_key, created_at, expires_at) VALUES (?1, ?2, ?3, ?4, ?5)",
params![token, record.username, record.identity_key, record.created_at as i64, record.expires_at as i64],
)
.map_err(|e| StorageError::Db(e.to_string()))?;
Ok(())
}
fn get_session(&self, token: &[u8]) -> Result<Option<SessionRecord>, StorageError> {
let conn = self.get_conn()?;
let mut stmt = conn
.prepare("SELECT username, identity_key, created_at, expires_at FROM sessions WHERE token = ?1")
.map_err(|e| StorageError::Db(e.to_string()))?;
stmt.query_row(params![token], |row| {
Ok(SessionRecord {
username: row.get(0)?,
identity_key: row.get(1)?,
created_at: row.get::<_, i64>(2)? as u64,
expires_at: row.get::<_, i64>(3)? as u64,
})
})
.optional()
.map_err(|e| StorageError::Db(e.to_string()))
}
fn delete_expired_sessions(&self, now: u64) -> Result<usize, StorageError> {
let conn = self.get_conn()?;
let deleted = conn
.execute(
"DELETE FROM sessions WHERE expires_at <= ?1",
params![now as i64],
)
.map_err(|e| StorageError::Db(e.to_string()))?;
Ok(deleted)
}
fn delete_session(&self, token: &[u8]) -> Result<(), StorageError> {
let conn = self.get_conn()?;
conn.execute("DELETE FROM sessions WHERE token = ?1", params![token])
.map_err(|e| StorageError::Db(e.to_string()))?;
Ok(())
}
// ── Blob storage ───────────────────────────────────────────────────────
fn store_blob_chunk(
&self,
blob_hash: &[u8],
chunk: &[u8],
offset: u64,
total_size: u64,
mime_type: &str,
) -> Result<Option<Vec<u8>>, StorageError> {
let conn = self.get_conn()?;
// Insert chunk into staging.
conn.execute(
"INSERT OR REPLACE INTO blob_staging (blob_hash, offset, chunk, total_size, mime_type) VALUES (?1, ?2, ?3, ?4, ?5)",
params![blob_hash, offset as i64, chunk, total_size as i64, mime_type],
)
.map_err(|e| StorageError::Db(e.to_string()))?;
// Check if all chunks have arrived.
let staged_size: i64 = conn
.query_row(
"SELECT COALESCE(SUM(LENGTH(chunk)), 0) FROM blob_staging WHERE blob_hash = ?1",
params![blob_hash],
|row| row.get(0),
)
.map_err(|e| StorageError::Db(e.to_string()))?;
if staged_size as u64 != total_size {
return Ok(None);
}
// All chunks received — assemble in offset order.
let mut stmt = conn
.prepare("SELECT chunk FROM blob_staging WHERE blob_hash = ?1 ORDER BY offset ASC")
.map_err(|e| StorageError::Db(e.to_string()))?;
let chunks: Vec<Vec<u8>> = stmt
.query_map(params![blob_hash], |row| row.get(0))
.map_err(|e| StorageError::Db(e.to_string()))?
.collect::<Result<Vec<_>, _>>()
.map_err(|e| StorageError::Db(e.to_string()))?;
let mut assembled = Vec::with_capacity(total_size as usize);
for c in &chunks {
assembled.extend_from_slice(c);
}
// Verify SHA-256.
let hash = Sha256::digest(&assembled);
if hash.as_slice() != blob_hash {
// Clean up staging rows for this blob.
conn.execute(
"DELETE FROM blob_staging WHERE blob_hash = ?1",
params![blob_hash],
)
.map_err(|e| StorageError::Db(e.to_string()))?;
return Err(StorageError::Db(
"blob hash mismatch after assembly".into(),
));
}
// Use the hash as the blob_id (content-addressable).
let blob_id = hash.to_vec();
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs() as i64;
conn.execute(
"INSERT OR REPLACE INTO blobs (blob_id, data, total_size, mime_type, uploaded_at) VALUES (?1, ?2, ?3, ?4, ?5)",
params![blob_id, assembled, total_size as i64, mime_type, now],
)
.map_err(|e| StorageError::Db(e.to_string()))?;
// Clean up staging.
conn.execute(
"DELETE FROM blob_staging WHERE blob_hash = ?1",
params![blob_hash],
)
.map_err(|e| StorageError::Db(e.to_string()))?;
Ok(Some(blob_id))
}
fn get_blob_chunk(
&self,
blob_id: &[u8],
offset: u64,
length: u32,
) -> Result<Option<(Vec<u8>, u64, String)>, StorageError> {
let conn = self.get_conn()?;
let mut stmt = conn
.prepare(
"SELECT substr(data, ?2, ?3), total_size, mime_type FROM blobs WHERE blob_id = ?1",
)
.map_err(|e| StorageError::Db(e.to_string()))?;
// SQLite substr is 1-indexed.
stmt.query_row(
params![blob_id, (offset + 1) as i64, length as i64],
|row| {
Ok((
row.get::<_, Vec<u8>>(0)?,
row.get::<_, i64>(1)? as u64,
row.get::<_, String>(2)?,
))
},
)
.optional()
.map_err(|e| StorageError::Db(e.to_string()))
}
} }
/// Convenience extension for `rusqlite::OptionalExtension`. /// Convenience extension for `rusqlite::OptionalExtension`.
@@ -803,7 +1009,8 @@ mod tests {
use std::path::PathBuf; use std::path::PathBuf;
fn open_in_memory() -> SqlStore { fn open_in_memory() -> SqlStore {
SqlStore::open(":memory:", "").unwrap() // Pool size 1 for :memory: — each connection is a separate DB.
SqlStore::open_with_pool_size(":memory:", "", 1).unwrap()
} }
#[test] #[test]
@@ -813,7 +1020,7 @@ mod tests {
{ {
let store = SqlStore::open(&db_path, "").expect("open store"); let store = SqlStore::open(&db_path, "").expect("open store");
let _guard = store.lock_conn().unwrap(); let _guard = store.get_conn().unwrap();
} }
let conn = rusqlite::Connection::open(&db_path).expect("reopen db"); let conn = rusqlite::Connection::open(&db_path).expect("reopen db");

View File

@@ -22,6 +22,14 @@ pub enum StorageError {
DuplicateUser(String), DuplicateUser(String),
} }
/// A persisted session record mapping a bearer token to an authenticated user.
pub struct SessionRecord {
pub username: String,
pub identity_key: Vec<u8>,
pub created_at: u64,
pub expires_at: u64,
}
fn lock<T>(m: &Mutex<T>) -> Result<std::sync::MutexGuard<'_, T>, StorageError> { fn lock<T>(m: &Mutex<T>) -> Result<std::sync::MutexGuard<'_, T>, StorageError> {
m.lock() m.lock()
.map_err(|e| StorageError::Io(format!("lock poisoned: {e}"))) .map_err(|e| StorageError::Io(format!("lock poisoned: {e}")))
@@ -199,6 +207,55 @@ pub trait Store: Send + Sync {
/// Return the number of registered devices for an identity. /// Return the number of registered devices for an identity.
fn device_count(&self, identity_key: &[u8]) -> Result<usize, StorageError>; fn device_count(&self, identity_key: &[u8]) -> Result<usize, StorageError>;
// ── Session persistence ────────────────────────────────────────────────
/// Store a session token → record mapping.
fn store_session(&self, _token: &[u8], _record: &SessionRecord) -> Result<(), StorageError> {
Ok(())
}
/// Retrieve a session record by bearer token.
fn get_session(&self, _token: &[u8]) -> Result<Option<SessionRecord>, StorageError> {
Ok(None)
}
/// Delete all sessions whose `expires_at` <= `now`. Returns count deleted.
fn delete_expired_sessions(&self, _now: u64) -> Result<usize, StorageError> {
Ok(0)
}
/// Delete a single session by token.
fn delete_session(&self, _token: &[u8]) -> Result<(), StorageError> {
Ok(())
}
// ── Blob storage ───────────────────────────────────────────────────────
/// Append a chunk to the staging area for an in-progress upload.
/// When all chunks have arrived (sum of chunk sizes == `total_size`), assembles the blob,
/// verifies its SHA-256 hash against `blob_hash`, inserts into permanent storage, and
/// returns `Some(blob_id)`. Otherwise returns `None`.
fn store_blob_chunk(
&self,
_blob_hash: &[u8],
_chunk: &[u8],
_offset: u64,
_total_size: u64,
_mime_type: &str,
) -> Result<Option<Vec<u8>>, StorageError> {
Err(StorageError::Io("blob storage not supported".into()))
}
/// Read a slice of a completed blob. Returns `(chunk_data, total_size, mime_type)`.
fn get_blob_chunk(
&self,
_blob_id: &[u8],
_offset: u64,
_length: u32,
) -> Result<Option<(Vec<u8>, u64, String)>, StorageError> {
Err(StorageError::Io("blob storage not supported".into()))
}
} }
// ── ChannelKey ─────────────────────────────────────────────────────────────── // ── ChannelKey ───────────────────────────────────────────────────────────────