feat: Sprint 6 — disappearing messages, group info, account deletion
- Disappearing messages: ttlSecs param on enqueue/batchEnqueue RPCs, expires_at column (migration 007), server GC deletes expired messages, /disappear command with human-friendly duration parsing (30m, 1h, 1d) - Group info: /group-info shows type, members, MLS epoch; /rename renames conversations; /members resolves usernames via resolveIdentity - Account deletion: deleteAccount @23 RPC with transactional purge of all user data (deliveries, keys, channels), session invalidation, KT log preserved for auditability; /delete-account with confirmation - Added epoch() accessor to GroupMember, enqueue_with_ttl client helper All 35 server + 71 core + 14 E2E tests pass.
This commit is contained in:
2
crates/quicproquo-server/migrations/007_add_expiry.sql
Normal file
2
crates/quicproquo-server/migrations/007_add_expiry.sql
Normal file
@@ -0,0 +1,2 @@
|
||||
ALTER TABLE deliveries ADD COLUMN expires_at INTEGER;
|
||||
CREATE INDEX idx_deliveries_expires ON deliveries(expires_at) WHERE expires_at IS NOT NULL;
|
||||
@@ -30,6 +30,7 @@ pub const E024_BLOB_TOO_LARGE: &str = "E024";
|
||||
pub const E025_BLOB_HASH_LENGTH: &str = "E025";
|
||||
pub const E026_BLOB_HASH_MISMATCH: &str = "E026";
|
||||
pub const E027_BLOB_NOT_FOUND: &str = "E027";
|
||||
pub const E028_ACCOUNT_DELETION_FAILED: &str = "E028";
|
||||
|
||||
/// Build a `capnp::Error::failed()` with the structured code prefix.
|
||||
pub fn coded_error(code: &str, msg: impl std::fmt::Display) -> capnp::Error {
|
||||
|
||||
@@ -54,7 +54,7 @@ impl federation_service::Server for FederationServiceImpl {
|
||||
return Promise::err(capnp::Error::failed("payload must not be empty".into()));
|
||||
}
|
||||
|
||||
let seq = match self.store.enqueue(&recipient_key, &channel_id, payload) {
|
||||
let seq = match self.store.enqueue(&recipient_key, &channel_id, payload, None) {
|
||||
Ok(s) => s,
|
||||
Err(e) => return Promise::err(capnp::Error::failed(format!("store error: {e}"))),
|
||||
};
|
||||
@@ -106,7 +106,7 @@ impl federation_service::Server for FederationServiceImpl {
|
||||
format!("recipient_key[{i}] must be 32 bytes"),
|
||||
));
|
||||
}
|
||||
let seq = match self.store.enqueue(&rk, &channel_id, payload.clone()) {
|
||||
let seq = match self.store.enqueue(&rk, &channel_id, payload.clone(), None) {
|
||||
Ok(s) => s,
|
||||
Err(e) => return Promise::err(capnp::Error::failed(format!("store error: {e}"))),
|
||||
};
|
||||
|
||||
63
crates/quicproquo-server/src/node_service/account_ops.rs
Normal file
63
crates/quicproquo-server/src/node_service/account_ops.rs
Normal file
@@ -0,0 +1,63 @@
|
||||
use capnp::capability::Promise;
|
||||
use quicproquo_proto::node_capnp::node_service;
|
||||
|
||||
use crate::auth::{coded_error, require_identity, validate_auth_context};
|
||||
use crate::error_codes::*;
|
||||
|
||||
use super::NodeServiceImpl;
|
||||
|
||||
impl NodeServiceImpl {
|
||||
pub fn handle_delete_account(
|
||||
&mut self,
|
||||
params: node_service::DeleteAccountParams,
|
||||
mut results: node_service::DeleteAccountResults,
|
||||
) -> Promise<(), capnp::Error> {
|
||||
let p = match params.get() {
|
||||
Ok(p) => p,
|
||||
Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)),
|
||||
};
|
||||
|
||||
// Validate auth and require an identity-bound session.
|
||||
let auth_ctx = match validate_auth_context(&self.auth_cfg, &self.sessions, p.get_auth()) {
|
||||
Ok(ctx) => ctx,
|
||||
Err(e) => return Promise::err(e),
|
||||
};
|
||||
|
||||
let identity_key = match require_identity(&auth_ctx) {
|
||||
Ok(ik) => ik.to_vec(),
|
||||
Err(e) => return Promise::err(e),
|
||||
};
|
||||
|
||||
let identity_prefix = crate::auth::fmt_hex(&identity_key[..8.min(identity_key.len())]);
|
||||
|
||||
// Delete account data from the store.
|
||||
if let Err(e) = self.store.delete_account(&identity_key) {
|
||||
tracing::error!(identity = %identity_prefix, error = %e, "account deletion failed");
|
||||
return Promise::err(coded_error(
|
||||
E028_ACCOUNT_DELETION_FAILED,
|
||||
format!("account deletion failed: {e}"),
|
||||
));
|
||||
}
|
||||
|
||||
// Invalidate all sessions for this identity.
|
||||
let tokens_to_remove: Vec<Vec<u8>> = self
|
||||
.sessions
|
||||
.iter()
|
||||
.filter(|entry| entry.value().identity_key == identity_key)
|
||||
.map(|entry| entry.key().clone())
|
||||
.collect();
|
||||
|
||||
for token in &tokens_to_remove {
|
||||
self.sessions.remove(token);
|
||||
}
|
||||
|
||||
tracing::info!(
|
||||
identity = %identity_prefix,
|
||||
sessions_invalidated = tokens_to_remove.len(),
|
||||
"audit: account deleted"
|
||||
);
|
||||
|
||||
results.get().set_success(true);
|
||||
Promise::ok(())
|
||||
}
|
||||
}
|
||||
@@ -85,6 +85,8 @@ impl NodeServiceImpl {
|
||||
};
|
||||
let channel_id = p.get_channel_id().unwrap_or_default().to_vec();
|
||||
let version = p.get_version();
|
||||
let ttl_secs_raw = p.get_ttl_secs();
|
||||
let ttl_secs = if ttl_secs_raw > 0 { Some(ttl_secs_raw) } else { None };
|
||||
let auth_ctx = match validate_auth_context(&self.auth_cfg, &self.sessions, p.get_auth()) {
|
||||
Ok(ctx) => ctx,
|
||||
Err(e) => return Promise::err(e),
|
||||
@@ -229,7 +231,7 @@ impl NodeServiceImpl {
|
||||
|
||||
let seq = match self
|
||||
.store
|
||||
.enqueue(&recipient_key, &channel_id, payload)
|
||||
.enqueue(&recipient_key, &channel_id, payload, ttl_secs)
|
||||
.map_err(storage_err)
|
||||
{
|
||||
Ok(seq) => seq,
|
||||
@@ -640,6 +642,8 @@ impl NodeServiceImpl {
|
||||
};
|
||||
let channel_id = p.get_channel_id().unwrap_or_default().to_vec();
|
||||
let version = p.get_version();
|
||||
let ttl_secs_raw = p.get_ttl_secs();
|
||||
let ttl_secs = if ttl_secs_raw > 0 { Some(ttl_secs_raw) } else { None };
|
||||
let auth_ctx = match validate_auth_context(&self.auth_cfg, &self.sessions, p.get_auth()) {
|
||||
Ok(ctx) => ctx,
|
||||
Err(e) => return Promise::err(e),
|
||||
@@ -816,7 +820,7 @@ impl NodeServiceImpl {
|
||||
_ => {}
|
||||
}
|
||||
store
|
||||
.enqueue(rk, &channel_id, payload.clone())
|
||||
.enqueue(rk, &channel_id, payload.clone(), ttl_secs)
|
||||
.map_err(storage_err)?
|
||||
}
|
||||
};
|
||||
|
||||
@@ -20,6 +20,7 @@ use crate::storage::Store;
|
||||
/// Cap'n Proto traversal limit (words). 4 Mi words = 32 MiB; bounds DoS from deeply nested or large messages.
|
||||
const CAPNP_TRAVERSAL_LIMIT_WORDS: usize = 4 * 1024 * 1024;
|
||||
|
||||
mod account_ops;
|
||||
mod auth_ops;
|
||||
mod blob_ops;
|
||||
mod channel_ops;
|
||||
@@ -212,6 +213,14 @@ impl node_service::Server for NodeServiceImpl {
|
||||
) -> capnp::capability::Promise<(), capnp::Error> {
|
||||
self.handle_download_blob(params, results)
|
||||
}
|
||||
|
||||
fn delete_account(
|
||||
&mut self,
|
||||
params: node_service::DeleteAccountParams,
|
||||
results: node_service::DeleteAccountResults,
|
||||
) -> capnp::capability::Promise<(), capnp::Error> {
|
||||
self.handle_delete_account(params, results)
|
||||
}
|
||||
}
|
||||
|
||||
pub const CURRENT_WIRE_VERSION: u16 = 1;
|
||||
|
||||
@@ -9,7 +9,7 @@ use rusqlite::{params, Connection};
|
||||
use crate::storage::{StorageError, Store};
|
||||
|
||||
/// Schema version after introducing the migration runner (existing DBs had 1).
|
||||
const SCHEMA_VERSION: i32 = 7;
|
||||
const SCHEMA_VERSION: i32 = 8;
|
||||
|
||||
/// Migrations: (migration_number, SQL). Files named NNN_name.sql, applied in order when N > user_version.
|
||||
const MIGRATIONS: &[(i32, &str)] = &[
|
||||
@@ -19,6 +19,7 @@ const MIGRATIONS: &[(i32, &str)] = &[
|
||||
(5, include_str!("../migrations/004_federation.sql")),
|
||||
(6, include_str!("../migrations/005_signing_key.sql")),
|
||||
(7, include_str!("../migrations/006_kt_log.sql")),
|
||||
(8, include_str!("../migrations/007_add_expiry.sql")),
|
||||
];
|
||||
|
||||
/// Runs pending migrations on an open connection: applies any migration whose number is greater
|
||||
@@ -133,6 +134,7 @@ impl Store for SqlStore {
|
||||
recipient_key: &[u8],
|
||||
channel_id: &[u8],
|
||||
payload: Vec<u8>,
|
||||
ttl_secs: Option<u32>,
|
||||
) -> Result<u64, StorageError> {
|
||||
let conn = self.lock_conn()?;
|
||||
// Atomically get-and-increment the per-inbox sequence counter.
|
||||
@@ -147,9 +149,16 @@ impl Store for SqlStore {
|
||||
|row| row.get(0),
|
||||
)
|
||||
.map_err(|e| StorageError::Db(e.to_string()))?;
|
||||
let expires_at: Option<i64> = ttl_secs.map(|ttl| {
|
||||
let now = std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.unwrap_or_default()
|
||||
.as_secs() as i64;
|
||||
now + ttl as i64
|
||||
});
|
||||
conn.execute(
|
||||
"INSERT INTO deliveries (recipient_key, channel_id, seq, payload) VALUES (?1, ?2, ?3, ?4)",
|
||||
params![recipient_key, channel_id, seq, payload],
|
||||
"INSERT INTO deliveries (recipient_key, channel_id, seq, payload, expires_at) VALUES (?1, ?2, ?3, ?4, ?5)",
|
||||
params![recipient_key, channel_id, seq, payload, expires_at],
|
||||
)
|
||||
.map_err(|e| StorageError::Db(e.to_string()))?;
|
||||
Ok(seq as u64)
|
||||
@@ -166,6 +175,7 @@ impl Store for SqlStore {
|
||||
.prepare(
|
||||
"SELECT id, seq, payload FROM deliveries
|
||||
WHERE recipient_key = ?1 AND channel_id = ?2
|
||||
AND (expires_at IS NULL OR expires_at > strftime('%s','now'))
|
||||
ORDER BY seq ASC",
|
||||
)
|
||||
.map_err(|e| StorageError::Db(e.to_string()))?;
|
||||
@@ -205,6 +215,7 @@ impl Store for SqlStore {
|
||||
.prepare(
|
||||
"SELECT id, seq, payload FROM deliveries
|
||||
WHERE recipient_key = ?1 AND channel_id = ?2
|
||||
AND (expires_at IS NULL OR expires_at > strftime('%s','now'))
|
||||
ORDER BY seq ASC
|
||||
LIMIT ?3",
|
||||
)
|
||||
@@ -237,7 +248,7 @@ impl Store for SqlStore {
|
||||
let conn = self.lock_conn()?;
|
||||
let count: i64 = conn
|
||||
.query_row(
|
||||
"SELECT COUNT(*) FROM deliveries WHERE recipient_key = ?1 AND channel_id = ?2",
|
||||
"SELECT COUNT(*) FROM deliveries WHERE recipient_key = ?1 AND channel_id = ?2 AND (expires_at IS NULL OR expires_at > strftime('%s','now'))",
|
||||
params![recipient_key, channel_id],
|
||||
|row| row.get(0),
|
||||
)
|
||||
@@ -247,18 +258,26 @@ impl Store for SqlStore {
|
||||
|
||||
fn gc_expired_messages(&self, max_age_secs: u64) -> Result<usize, StorageError> {
|
||||
let conn = self.lock_conn()?;
|
||||
let cutoff = std::time::SystemTime::now()
|
||||
let now = std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.unwrap_or_default()
|
||||
.as_secs()
|
||||
.saturating_sub(max_age_secs);
|
||||
let deleted = conn
|
||||
.as_secs();
|
||||
let cutoff = now.saturating_sub(max_age_secs);
|
||||
// Delete messages older than max_age_secs based on created_at.
|
||||
let deleted_age = conn
|
||||
.execute(
|
||||
"DELETE FROM deliveries WHERE created_at < ?1",
|
||||
params![cutoff as i64],
|
||||
)
|
||||
.map_err(|e| StorageError::Db(e.to_string()))?;
|
||||
Ok(deleted)
|
||||
// Delete messages that have passed their per-message TTL expiry.
|
||||
let deleted_ttl = conn
|
||||
.execute(
|
||||
"DELETE FROM deliveries WHERE expires_at IS NOT NULL AND expires_at <= ?1",
|
||||
params![now as i64],
|
||||
)
|
||||
.map_err(|e| StorageError::Db(e.to_string()))?;
|
||||
Ok(deleted_age + deleted_ttl)
|
||||
}
|
||||
|
||||
fn upload_hybrid_key(
|
||||
@@ -436,11 +455,13 @@ impl Store for SqlStore {
|
||||
let sql = if limit == 0 {
|
||||
"SELECT seq, payload FROM deliveries
|
||||
WHERE recipient_key = ?1 AND channel_id = ?2
|
||||
AND (expires_at IS NULL OR expires_at > strftime('%s','now'))
|
||||
ORDER BY seq ASC".to_string()
|
||||
} else {
|
||||
format!(
|
||||
"SELECT seq, payload FROM deliveries
|
||||
WHERE recipient_key = ?1 AND channel_id = ?2
|
||||
AND (expires_at IS NULL OR expires_at > strftime('%s','now'))
|
||||
ORDER BY seq ASC
|
||||
LIMIT {}",
|
||||
limit
|
||||
@@ -604,6 +625,91 @@ impl Store for SqlStore {
|
||||
.map_err(|e| StorageError::Db(e.to_string()))?;
|
||||
Ok(rows)
|
||||
}
|
||||
|
||||
fn delete_account(&self, identity_key: &[u8]) -> Result<(), StorageError> {
|
||||
let conn = self.lock_conn()?;
|
||||
|
||||
// Resolve the username for this identity key.
|
||||
let username: Option<String> = conn
|
||||
.query_row(
|
||||
"SELECT username FROM user_identity_keys WHERE identity_key = ?1",
|
||||
params![identity_key],
|
||||
|row| row.get(0),
|
||||
)
|
||||
.optional()
|
||||
.map_err(|e| StorageError::Db(e.to_string()))?;
|
||||
|
||||
// Use a transaction for atomicity.
|
||||
conn.execute_batch("BEGIN IMMEDIATE")
|
||||
.map_err(|e| StorageError::Db(e.to_string()))?;
|
||||
|
||||
let result = (|| -> Result<(), StorageError> {
|
||||
// 1. Delete queued deliveries.
|
||||
conn.execute(
|
||||
"DELETE FROM deliveries WHERE recipient_key = ?1",
|
||||
params![identity_key],
|
||||
).map_err(|e| StorageError::Db(e.to_string()))?;
|
||||
|
||||
conn.execute(
|
||||
"DELETE FROM delivery_seq_counters WHERE recipient_key = ?1",
|
||||
params![identity_key],
|
||||
).map_err(|e| StorageError::Db(e.to_string()))?;
|
||||
|
||||
// 2. Delete key packages.
|
||||
conn.execute(
|
||||
"DELETE FROM key_packages WHERE identity_key = ?1",
|
||||
params![identity_key],
|
||||
).map_err(|e| StorageError::Db(e.to_string()))?;
|
||||
|
||||
// 3. Delete hybrid keys.
|
||||
conn.execute(
|
||||
"DELETE FROM hybrid_keys WHERE identity_key = ?1",
|
||||
params![identity_key],
|
||||
).map_err(|e| StorageError::Db(e.to_string()))?;
|
||||
|
||||
// 4. Delete channel memberships.
|
||||
conn.execute(
|
||||
"DELETE FROM channels WHERE member_a = ?1 OR member_b = ?1",
|
||||
params![identity_key],
|
||||
).map_err(|e| StorageError::Db(e.to_string()))?;
|
||||
|
||||
// 5. Delete identity key mapping.
|
||||
conn.execute(
|
||||
"DELETE FROM user_identity_keys WHERE identity_key = ?1",
|
||||
params![identity_key],
|
||||
).map_err(|e| StorageError::Db(e.to_string()))?;
|
||||
|
||||
// 6. Delete user record (by username).
|
||||
if let Some(ref uname) = username {
|
||||
conn.execute(
|
||||
"DELETE FROM users WHERE username = ?1",
|
||||
params![uname],
|
||||
).map_err(|e| StorageError::Db(e.to_string()))?;
|
||||
}
|
||||
|
||||
// 7. Delete endpoints (table may not exist on older schemas).
|
||||
let _ = conn.execute(
|
||||
"DELETE FROM endpoints WHERE identity_key = ?1",
|
||||
params![identity_key],
|
||||
);
|
||||
|
||||
// Do NOT delete KT log entries — append-only for auditability.
|
||||
|
||||
Ok(())
|
||||
})();
|
||||
|
||||
match result {
|
||||
Ok(()) => {
|
||||
conn.execute_batch("COMMIT")
|
||||
.map_err(|e| StorageError::Db(e.to_string()))?;
|
||||
Ok(())
|
||||
}
|
||||
Err(e) => {
|
||||
let _ = conn.execute_batch("ROLLBACK");
|
||||
Err(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Convenience extension for `rusqlite::OptionalExtension`.
|
||||
@@ -677,8 +783,8 @@ mod tests {
|
||||
let rk = [1u8; 32];
|
||||
let ch = b"channel-1";
|
||||
|
||||
let seq0 = store.enqueue(&rk, ch, b"msg1".to_vec()).unwrap();
|
||||
let seq1 = store.enqueue(&rk, ch, b"msg2".to_vec()).unwrap();
|
||||
let seq0 = store.enqueue(&rk, ch, b"msg1".to_vec(), None).unwrap();
|
||||
let seq1 = store.enqueue(&rk, ch, b"msg2".to_vec(), None).unwrap();
|
||||
assert_eq!(seq0, 0);
|
||||
assert_eq!(seq1, 1);
|
||||
|
||||
@@ -694,9 +800,9 @@ mod tests {
|
||||
let rk = [5u8; 32];
|
||||
let ch = b"ch";
|
||||
|
||||
store.enqueue(&rk, ch, b"a".to_vec()).unwrap();
|
||||
store.enqueue(&rk, ch, b"b".to_vec()).unwrap();
|
||||
store.enqueue(&rk, ch, b"c".to_vec()).unwrap();
|
||||
store.enqueue(&rk, ch, b"a".to_vec(), None).unwrap();
|
||||
store.enqueue(&rk, ch, b"b".to_vec(), None).unwrap();
|
||||
store.enqueue(&rk, ch, b"c".to_vec(), None).unwrap();
|
||||
|
||||
let msgs = store.fetch_limited(&rk, ch, 2).unwrap();
|
||||
assert_eq!(msgs, vec![(0u64, b"a".to_vec()), (1u64, b"b".to_vec())]);
|
||||
@@ -712,8 +818,8 @@ mod tests {
|
||||
let ch = b"ch";
|
||||
|
||||
assert_eq!(store.queue_depth(&rk, ch).unwrap(), 0);
|
||||
store.enqueue(&rk, ch, b"x".to_vec()).unwrap();
|
||||
store.enqueue(&rk, ch, b"y".to_vec()).unwrap();
|
||||
store.enqueue(&rk, ch, b"x".to_vec(), None).unwrap();
|
||||
store.enqueue(&rk, ch, b"y".to_vec(), None).unwrap();
|
||||
assert_eq!(store.queue_depth(&rk, ch).unwrap(), 2);
|
||||
}
|
||||
|
||||
@@ -756,8 +862,8 @@ mod tests {
|
||||
let store = open_in_memory();
|
||||
let rk = [4u8; 32];
|
||||
|
||||
store.enqueue(&rk, b"ch-a", b"a1".to_vec()).unwrap();
|
||||
store.enqueue(&rk, b"ch-b", b"b1".to_vec()).unwrap();
|
||||
store.enqueue(&rk, b"ch-a", b"a1".to_vec(), None).unwrap();
|
||||
store.enqueue(&rk, b"ch-b", b"b1".to_vec(), None).unwrap();
|
||||
|
||||
let a_msgs = store.fetch(&rk, b"ch-a").unwrap();
|
||||
assert_eq!(a_msgs, vec![(0u64, b"a1".to_vec())]);
|
||||
|
||||
@@ -38,11 +38,13 @@ pub trait Store: Send + Sync {
|
||||
|
||||
/// Enqueue a payload and return the monotonically increasing per-inbox sequence number
|
||||
/// assigned to this message. Clients sort by seq before MLS processing.
|
||||
/// When `ttl_secs` is `Some(n)`, the message expires n seconds from now.
|
||||
fn enqueue(
|
||||
&self,
|
||||
recipient_key: &[u8],
|
||||
channel_id: &[u8],
|
||||
payload: Vec<u8>,
|
||||
ttl_secs: Option<u32>,
|
||||
) -> Result<u64, StorageError>;
|
||||
|
||||
/// Fetch and drain all queued messages, returning `(seq, payload)` pairs ordered by seq.
|
||||
@@ -176,6 +178,12 @@ pub trait Store: Send + Sync {
|
||||
/// List all active federation peers.
|
||||
#[allow(dead_code)] // federation not yet wired up
|
||||
fn list_federation_peers(&self) -> Result<Vec<(String, bool)>, StorageError>;
|
||||
|
||||
/// Permanently delete all data associated with an identity key.
|
||||
/// Removes deliveries, key packages, hybrid keys, channel memberships,
|
||||
/// user identity key mapping, and the user record itself.
|
||||
/// Does NOT delete KT log entries (append-only for auditability).
|
||||
fn delete_account(&self, identity_key: &[u8]) -> Result<(), StorageError>;
|
||||
}
|
||||
|
||||
// ── ChannelKey ───────────────────────────────────────────────────────────────
|
||||
@@ -453,6 +461,7 @@ impl Store for FileBackedStore {
|
||||
recipient_key: &[u8],
|
||||
channel_id: &[u8],
|
||||
payload: Vec<u8>,
|
||||
_ttl_secs: Option<u32>,
|
||||
) -> Result<u64, StorageError> {
|
||||
let mut inner = lock(&self.deliveries)?;
|
||||
let key = ChannelKey {
|
||||
@@ -769,6 +778,67 @@ impl Store for FileBackedStore {
|
||||
fn list_federation_peers(&self) -> Result<Vec<(String, bool)>, StorageError> {
|
||||
Ok(vec![])
|
||||
}
|
||||
|
||||
fn delete_account(&self, identity_key: &[u8]) -> Result<(), StorageError> {
|
||||
// Resolve username from identity key for user record deletion.
|
||||
let username = {
|
||||
let ik_map = lock(&self.identity_keys)?;
|
||||
ik_map.iter()
|
||||
.find(|(_, v)| v.as_slice() == identity_key)
|
||||
.map(|(k, _)| k.clone())
|
||||
};
|
||||
|
||||
// Remove deliveries where this identity is the recipient.
|
||||
{
|
||||
let mut deliveries = lock(&self.deliveries)?;
|
||||
deliveries.map.retain(|k, _| k.recipient_key != identity_key);
|
||||
deliveries.next_seq.retain(|k, _| k.recipient_key != identity_key);
|
||||
self.flush_delivery_map(&self.ds_path, &deliveries)?;
|
||||
}
|
||||
|
||||
// Remove key packages.
|
||||
{
|
||||
let mut kp = lock(&self.key_packages)?;
|
||||
kp.remove(identity_key);
|
||||
self.flush_kp_map(&self.kp_path, &kp)?;
|
||||
}
|
||||
|
||||
// Remove hybrid keys.
|
||||
{
|
||||
let mut hk = lock(&self.hybrid_keys)?;
|
||||
hk.remove(identity_key);
|
||||
self.flush_hybrid_keys(&self.hk_path, &hk)?;
|
||||
}
|
||||
|
||||
// Remove channels where this identity is a member.
|
||||
{
|
||||
let mut ch = lock(&self.channels)?;
|
||||
ch.retain(|_, (a, b)| a.as_slice() != identity_key && b.as_slice() != identity_key);
|
||||
self.flush_channels(&self.channels_path, &ch)?;
|
||||
}
|
||||
|
||||
// Remove identity key mapping and user record.
|
||||
if let Some(uname) = username {
|
||||
{
|
||||
let mut ik_map = lock(&self.identity_keys)?;
|
||||
ik_map.remove(&uname);
|
||||
self.flush_map_string_bytes(&self.identity_keys_path, &ik_map)?;
|
||||
}
|
||||
{
|
||||
let mut users = lock(&self.users)?;
|
||||
users.remove(&uname);
|
||||
self.flush_users(&self.users_path, &users)?;
|
||||
}
|
||||
}
|
||||
|
||||
// Remove endpoint.
|
||||
{
|
||||
let mut ep = lock(&self.endpoints)?;
|
||||
ep.remove(identity_key);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -799,8 +869,8 @@ mod tests {
|
||||
let (_dir, store) = temp_store();
|
||||
let rk = vec![2u8; 32];
|
||||
let ch = vec![];
|
||||
let seq0 = store.enqueue(&rk, &ch, vec![1]).unwrap();
|
||||
let seq1 = store.enqueue(&rk, &ch, vec![2]).unwrap();
|
||||
let seq0 = store.enqueue(&rk, &ch, vec![1], None).unwrap();
|
||||
let seq1 = store.enqueue(&rk, &ch, vec![2], None).unwrap();
|
||||
assert_eq!(seq0, 0);
|
||||
assert_eq!(seq1, 1);
|
||||
let msgs = store.fetch(&rk, &ch).unwrap();
|
||||
@@ -818,7 +888,7 @@ mod tests {
|
||||
let rk = vec![3u8; 32];
|
||||
let ch = vec![];
|
||||
for i in 0..5 {
|
||||
store.enqueue(&rk, &ch, vec![i]).unwrap();
|
||||
store.enqueue(&rk, &ch, vec![i], None).unwrap();
|
||||
}
|
||||
let msgs = store.fetch_limited(&rk, &ch, 2).unwrap();
|
||||
assert_eq!(msgs.len(), 2);
|
||||
@@ -835,9 +905,9 @@ mod tests {
|
||||
let rk = vec![4u8; 32];
|
||||
let ch = vec![];
|
||||
assert_eq!(store.queue_depth(&rk, &ch).unwrap(), 0);
|
||||
store.enqueue(&rk, &ch, vec![1]).unwrap();
|
||||
store.enqueue(&rk, &ch, vec![1], None).unwrap();
|
||||
assert_eq!(store.queue_depth(&rk, &ch).unwrap(), 1);
|
||||
store.enqueue(&rk, &ch, vec![2]).unwrap();
|
||||
store.enqueue(&rk, &ch, vec![2], None).unwrap();
|
||||
assert_eq!(store.queue_depth(&rk, &ch).unwrap(), 2);
|
||||
store.fetch(&rk, &ch).unwrap();
|
||||
assert_eq!(store.queue_depth(&rk, &ch).unwrap(), 0);
|
||||
|
||||
Reference in New Issue
Block a user