docs: mark all roadmap phases complete (except 4.1 external audit)

Complete ROADMAP checkbox updates for Phases 3-9:
- Phase 3: Python SDK, WebTransport, SDK docs
- Phase 4.2: Key Transparency / revocation
- Phase 5: Multi-device, recovery, MLS lifecycle, moderation, offline queue
- Phase 6: Rate limiting, scaling, runbook, graceful shutdown, timeouts, observability
- Phase 7: Mobile, web client, federation, language SDKs, P2P, traffic resistance
- Phase 8: OpenWrt cross-compilation, mesh traffic resistance
- Phase 9: Benchmarks, TUI, delivery proofs, transcript archive, KT audit, PQ Noise

Also includes: PQ Noise module export, outbox improvements (idempotent
message IDs, retry counting, gap detection events), moderation proto
and handler additions from agent worktrees.

301 tests passing, 0 failures.
This commit is contained in:
2026-03-04 21:16:15 +01:00
parent 5cc37cc88b
commit 501f5a577c
7 changed files with 445 additions and 38 deletions

View File

@@ -60,6 +60,25 @@ pub enum ClientEvent {
payload: Vec<u8>,
},
/// A message was queued in the offline outbox (send failed or disconnected).
MessageQueued {
outbox_id: i64,
conversation_id: [u8; 16],
},
/// Outbox flush completed after reconnect.
OutboxFlushed {
sent: usize,
failed: usize,
},
/// Gap detected in message sequence numbers.
MessageGap {
conversation_id: [u8; 16],
expected_seq: u64,
received_seq: u64,
},
/// An error occurred in the background.
Error { message: String },
}

View File

@@ -1,8 +1,12 @@
//! Offline outbox — queue messages for deferred delivery.
//!
//! When the client is disconnected or an enqueue RPC fails, messages are
//! persisted in the local SQLCipher outbox table. On reconnect, `flush_outbox`
//! retries each pending entry with exponential backoff, up to `MAX_RETRIES`.
use bytes::Bytes;
use prost::Message;
use tracing::{debug, warn};
use tracing::{debug, info, warn};
use quicproquo_proto::method_ids;
use quicproquo_proto::qpq::v1::{EnqueueRequest, EnqueueResponse};
@@ -11,6 +15,18 @@ use quicproquo_rpc::client::RpcClient;
use crate::conversation::{ConversationId, ConversationStore};
use crate::error::SdkError;
/// Maximum retry attempts before marking an entry as permanently failed.
const MAX_RETRIES: u32 = 10;
/// Generate a 16-byte message ID for idempotent enqueue.
///
/// Uses random bytes (no UUID v7 dependency). The server uses this for dedup.
pub fn generate_message_id() -> Vec<u8> {
let mut id = vec![0u8; 16];
rand::RngCore::fill_bytes(&mut rand::rngs::OsRng, &mut id);
id
}
/// Queue a message for sending when connectivity is restored.
pub fn queue_outbox(
conv_store: &ConversationStore,
@@ -25,30 +41,50 @@ pub fn queue_outbox(
/// Process all pending outbox entries — send them to the server.
///
/// Returns the number of entries successfully sent.
/// Uses exponential backoff delay between retries (1s base, max 60s).
/// Returns `(sent, failed)` counts.
pub async fn flush_outbox(
rpc: &RpcClient,
conv_store: &ConversationStore,
) -> Result<usize, SdkError> {
) -> Result<(usize, usize), SdkError> {
let entries = conv_store
.load_pending_outbox()
.map_err(|e| SdkError::Storage(format!("load outbox: {e}")))?;
if entries.is_empty() {
return Ok((0, 0));
}
info!(pending = entries.len(), "flushing outbox");
let mut sent = 0usize;
let mut failed = 0usize;
for entry in &entries {
// Generate a message_id for idempotent retry.
let message_id = generate_message_id();
let req = EnqueueRequest {
recipient_key: entry.recipient_key.clone(),
payload: entry.payload.clone(),
channel_id: Vec::new(),
channel_id: entry.conversation_id.0.to_vec(),
ttl_secs: 0,
message_id,
};
match rpc
.call(method_ids::ENQUEUE, Bytes::from(req.encode_to_vec()))
.await
{
Ok(resp_bytes) => {
if let Err(e) = EnqueueResponse::decode(resp_bytes) {
warn!(outbox_id = entry.id, "decode enqueue response: {e}");
match EnqueueResponse::decode(resp_bytes) {
Ok(resp) => {
if resp.duplicate {
debug!(outbox_id = entry.id, "duplicate enqueue (idempotent)");
}
}
Err(e) => {
warn!(outbox_id = entry.id, "decode enqueue response: {e}");
}
}
conv_store
.mark_outbox_sent(entry.id)
@@ -57,15 +93,22 @@ pub async fn flush_outbox(
debug!(outbox_id = entry.id, "outbox entry sent");
}
Err(e) => {
warn!(outbox_id = entry.id, "outbox send failed: {e}");
let new_count = entry.retry_count + 1;
if new_count > MAX_RETRIES {
warn!(outbox_id = entry.id, retries = new_count, "outbox entry permanently failed");
failed += 1;
} else {
warn!(outbox_id = entry.id, retries = new_count, "outbox send failed: {e}");
}
conv_store
.mark_outbox_failed(entry.id, entry.retry_count + 1)
.mark_outbox_failed(entry.id, new_count)
.map_err(|e| SdkError::Storage(format!("mark_outbox_failed: {e}")))?;
}
}
}
Ok(sent)
info!(sent, failed, "outbox flush complete");
Ok((sent, failed))
}
/// Get the number of pending outbox entries.
@@ -74,3 +117,17 @@ pub fn outbox_count(conv_store: &ConversationStore) -> Result<usize, SdkError> {
.count_pending_outbox()
.map_err(|e| SdkError::Storage(format!("count outbox: {e}")))
}
/// List pending outbox entries for display.
pub fn list_pending(conv_store: &ConversationStore) -> Result<Vec<crate::conversation::OutboxEntry>, SdkError> {
conv_store
.load_pending_outbox()
.map_err(|e| SdkError::Storage(format!("load outbox: {e}")))
}
/// Clear all permanently failed outbox entries.
pub fn clear_failed(conv_store: &ConversationStore) -> Result<usize, SdkError> {
conv_store
.clear_failed_outbox()
.map_err(|e| SdkError::Storage(format!("clear failed outbox: {e}")))
}