feat(sdk): conversation management — DM/group lifecycle, outbox
This commit is contained in:
@@ -298,6 +298,78 @@ impl ConversationStore {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Enqueue an outbox entry for offline sending.
|
||||||
|
pub fn enqueue_outbox(
|
||||||
|
&self,
|
||||||
|
conv_id: &ConversationId,
|
||||||
|
recipient_key: &[u8],
|
||||||
|
payload: &[u8],
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
self.conn.execute(
|
||||||
|
"INSERT INTO outbox (conversation_id, recipient_key, payload, created_at_ms)
|
||||||
|
VALUES (?1, ?2, ?3, ?4)",
|
||||||
|
params![conv_id.0.as_slice(), recipient_key, payload, now_ms() as i64],
|
||||||
|
)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Load all pending outbox entries, oldest first.
|
||||||
|
pub fn load_pending_outbox(&self) -> anyhow::Result<Vec<OutboxEntry>> {
|
||||||
|
let mut stmt = self.conn.prepare(
|
||||||
|
"SELECT id, conversation_id, recipient_key, payload, retry_count
|
||||||
|
FROM outbox WHERE status = 'pending' ORDER BY created_at_ms",
|
||||||
|
)?;
|
||||||
|
let rows = stmt.query_map([], |row| {
|
||||||
|
let id: i64 = row.get(0)?;
|
||||||
|
let conv_blob: Vec<u8> = row.get(1)?;
|
||||||
|
let recipient_key: Vec<u8> = row.get(2)?;
|
||||||
|
let payload: Vec<u8> = row.get(3)?;
|
||||||
|
let retry_count: u32 = row.get(4)?;
|
||||||
|
Ok(OutboxEntry {
|
||||||
|
id,
|
||||||
|
conversation_id: ConversationId::from_slice(&conv_blob)
|
||||||
|
.unwrap_or(ConversationId([0; 16])),
|
||||||
|
recipient_key,
|
||||||
|
payload,
|
||||||
|
retry_count,
|
||||||
|
})
|
||||||
|
})?;
|
||||||
|
let mut entries = Vec::new();
|
||||||
|
for row in rows {
|
||||||
|
entries.push(row?);
|
||||||
|
}
|
||||||
|
Ok(entries)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Mark an outbox entry as sent.
|
||||||
|
pub fn mark_outbox_sent(&self, id: i64) -> anyhow::Result<()> {
|
||||||
|
self.conn.execute(
|
||||||
|
"UPDATE outbox SET status = 'sent' WHERE id = ?1",
|
||||||
|
params![id],
|
||||||
|
)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Mark an outbox entry as failed (retryable up to 5 times).
|
||||||
|
pub fn mark_outbox_failed(&self, id: i64, retry_count: u32) -> anyhow::Result<()> {
|
||||||
|
let new_status = if retry_count > 5 { "failed" } else { "pending" };
|
||||||
|
self.conn.execute(
|
||||||
|
"UPDATE outbox SET retry_count = ?2, status = ?3 WHERE id = ?1",
|
||||||
|
params![id, retry_count, new_status],
|
||||||
|
)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Count pending outbox entries.
|
||||||
|
pub fn count_pending_outbox(&self) -> anyhow::Result<usize> {
|
||||||
|
let count: i64 = self.conn.query_row(
|
||||||
|
"SELECT COUNT(*) FROM outbox WHERE status = 'pending'",
|
||||||
|
[],
|
||||||
|
|row| row.get(0),
|
||||||
|
)?;
|
||||||
|
Ok(count as usize)
|
||||||
|
}
|
||||||
|
|
||||||
/// Load recent messages (newest first, then reversed to chronological).
|
/// Load recent messages (newest first, then reversed to chronological).
|
||||||
pub fn load_recent_messages(
|
pub fn load_recent_messages(
|
||||||
&self,
|
&self,
|
||||||
@@ -454,6 +526,14 @@ fn to_16(v: &[u8]) -> Option<[u8; 16]> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Current timestamp in milliseconds since UNIX epoch.
|
||||||
|
pub fn now_ms() -> u64 {
|
||||||
|
std::time::SystemTime::now()
|
||||||
|
.duration_since(std::time::UNIX_EPOCH)
|
||||||
|
.unwrap_or_default()
|
||||||
|
.as_millis() as u64
|
||||||
|
}
|
||||||
|
|
||||||
fn row_to_message(
|
fn row_to_message(
|
||||||
conv_id: &ConversationId,
|
conv_id: &ConversationId,
|
||||||
row: &rusqlite::Row<'_>,
|
row: &rusqlite::Row<'_>,
|
||||||
|
|||||||
383
crates/quicproquo-sdk/src/groups.rs
Normal file
383
crates/quicproquo-sdk/src/groups.rs
Normal file
@@ -0,0 +1,383 @@
|
|||||||
|
//! Group lifecycle — create DMs, groups, invite members, join from Welcome.
|
||||||
|
//!
|
||||||
|
//! All functions are free-standing (not methods on `QpqClient`) so they can be
|
||||||
|
//! tested and composed independently.
|
||||||
|
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use bytes::Bytes;
|
||||||
|
use prost::Message;
|
||||||
|
use tracing::debug;
|
||||||
|
|
||||||
|
use quicproquo_core::{
|
||||||
|
hybrid_encrypt, GroupMember, HybridKeypair, HybridPublicKey, IdentityKeypair,
|
||||||
|
};
|
||||||
|
use quicproquo_proto::method_ids;
|
||||||
|
use quicproquo_proto::qpq::v1::{
|
||||||
|
CreateChannelRequest, CreateChannelResponse, EnqueueRequest, EnqueueResponse,
|
||||||
|
};
|
||||||
|
use quicproquo_rpc::client::RpcClient;
|
||||||
|
|
||||||
|
use crate::conversation::{
|
||||||
|
now_ms, Conversation, ConversationId, ConversationKind, ConversationStore,
|
||||||
|
};
|
||||||
|
use crate::error::SdkError;
|
||||||
|
|
||||||
|
// ── DM (1:1) ────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
/// Create or join a 1:1 DM channel with a peer.
|
||||||
|
///
|
||||||
|
/// Returns `(conversation_id, was_new)`.
|
||||||
|
/// - `was_new = true` — caller created the MLS group and sent the Welcome.
|
||||||
|
/// - `was_new = false` — peer is the MLS initiator; caller should wait for Welcome.
|
||||||
|
pub async fn create_dm(
|
||||||
|
rpc: &RpcClient,
|
||||||
|
conv_store: &ConversationStore,
|
||||||
|
member: &mut GroupMember,
|
||||||
|
my_identity: &IdentityKeypair,
|
||||||
|
peer_key: &[u8],
|
||||||
|
peer_key_package: &[u8],
|
||||||
|
hybrid_kp: Option<&HybridKeypair>,
|
||||||
|
peer_hybrid_pk: Option<&HybridPublicKey>,
|
||||||
|
) -> Result<(ConversationId, bool), SdkError> {
|
||||||
|
// 1. Call CREATE_CHANNEL RPC.
|
||||||
|
let req = CreateChannelRequest {
|
||||||
|
peer_key: peer_key.to_vec(),
|
||||||
|
};
|
||||||
|
let resp_bytes = rpc
|
||||||
|
.call(method_ids::CREATE_CHANNEL, Bytes::from(req.encode_to_vec()))
|
||||||
|
.await?;
|
||||||
|
let resp = CreateChannelResponse::decode(resp_bytes)
|
||||||
|
.map_err(|e| SdkError::Crypto(format!("decode CreateChannelResponse: {e}")))?;
|
||||||
|
|
||||||
|
let conv_id = ConversationId::from_slice(&resp.channel_id)
|
||||||
|
.ok_or_else(|| SdkError::Other(anyhow::anyhow!("server returned invalid channel_id")))?;
|
||||||
|
|
||||||
|
let was_new = resp.was_new;
|
||||||
|
|
||||||
|
if was_new {
|
||||||
|
// 2a. We are the MLS initiator.
|
||||||
|
member
|
||||||
|
.create_group(&resp.channel_id)
|
||||||
|
.map_err(|e| SdkError::Crypto(format!("create_group: {e}")))?;
|
||||||
|
|
||||||
|
let (_commit, welcome) = member
|
||||||
|
.add_member(peer_key_package)
|
||||||
|
.map_err(|e| SdkError::Crypto(format!("add_member: {e}")))?;
|
||||||
|
|
||||||
|
// Optionally hybrid-wrap the welcome.
|
||||||
|
let payload = wrap_hybrid(hybrid_kp, peer_hybrid_pk, &welcome, member.is_hybrid())?;
|
||||||
|
|
||||||
|
// Enqueue welcome to peer.
|
||||||
|
enqueue_to_peer(rpc, peer_key, &payload).await?;
|
||||||
|
|
||||||
|
// Save conversation with MLS state.
|
||||||
|
let member_keys = member.member_identities();
|
||||||
|
let mls_blob = member
|
||||||
|
.group_ref()
|
||||||
|
.map(bincode::serialize)
|
||||||
|
.transpose()
|
||||||
|
.map_err(|e| SdkError::Storage(format!("serialize MLS group: {e}")))?;
|
||||||
|
|
||||||
|
let conv = Conversation {
|
||||||
|
id: conv_id.clone(),
|
||||||
|
kind: ConversationKind::Dm {
|
||||||
|
peer_key: peer_key.to_vec(),
|
||||||
|
peer_username: None,
|
||||||
|
},
|
||||||
|
display_name: format!("DM:{}", hex::encode(&peer_key[..4])),
|
||||||
|
mls_group_blob: mls_blob,
|
||||||
|
keystore_blob: None,
|
||||||
|
member_keys,
|
||||||
|
unread_count: 0,
|
||||||
|
last_activity_ms: now_ms(),
|
||||||
|
created_at_ms: now_ms(),
|
||||||
|
is_hybrid: member.is_hybrid(),
|
||||||
|
last_seen_seq: 0,
|
||||||
|
};
|
||||||
|
conv_store
|
||||||
|
.save_conversation(&conv)
|
||||||
|
.map_err(|e| SdkError::Storage(e.to_string()))?;
|
||||||
|
|
||||||
|
debug!(conv = %conv_id.hex(), "DM created (initiator)");
|
||||||
|
} else {
|
||||||
|
// 2b. Peer is the MLS initiator — save a stub.
|
||||||
|
let conv = Conversation {
|
||||||
|
id: conv_id.clone(),
|
||||||
|
kind: ConversationKind::Dm {
|
||||||
|
peer_key: peer_key.to_vec(),
|
||||||
|
peer_username: None,
|
||||||
|
},
|
||||||
|
display_name: format!("DM:{}", hex::encode(&peer_key[..4])),
|
||||||
|
mls_group_blob: None,
|
||||||
|
keystore_blob: None,
|
||||||
|
member_keys: vec![my_identity.public_key_bytes().to_vec(), peer_key.to_vec()],
|
||||||
|
unread_count: 0,
|
||||||
|
last_activity_ms: now_ms(),
|
||||||
|
created_at_ms: now_ms(),
|
||||||
|
is_hybrid: false,
|
||||||
|
last_seen_seq: 0,
|
||||||
|
};
|
||||||
|
conv_store
|
||||||
|
.save_conversation(&conv)
|
||||||
|
.map_err(|e| SdkError::Storage(e.to_string()))?;
|
||||||
|
|
||||||
|
debug!(conv = %conv_id.hex(), "DM stub saved (waiting for Welcome)");
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok((conv_id, was_new))
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Group creation ──────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
/// Create a new group conversation (local only, no RPC).
|
||||||
|
pub fn create_group(
|
||||||
|
conv_store: &ConversationStore,
|
||||||
|
member: &mut GroupMember,
|
||||||
|
group_name: &str,
|
||||||
|
) -> Result<ConversationId, SdkError> {
|
||||||
|
let conv_id = ConversationId::from_group_name(group_name);
|
||||||
|
|
||||||
|
member
|
||||||
|
.create_group(conv_id.0.as_slice())
|
||||||
|
.map_err(|e| SdkError::Crypto(format!("create_group: {e}")))?;
|
||||||
|
|
||||||
|
let member_keys = member.member_identities();
|
||||||
|
let mls_blob = member
|
||||||
|
.group_ref()
|
||||||
|
.map(bincode::serialize)
|
||||||
|
.transpose()
|
||||||
|
.map_err(|e| SdkError::Storage(format!("serialize MLS group: {e}")))?;
|
||||||
|
|
||||||
|
let conv = Conversation {
|
||||||
|
id: conv_id.clone(),
|
||||||
|
kind: ConversationKind::Group {
|
||||||
|
name: group_name.to_string(),
|
||||||
|
},
|
||||||
|
display_name: format!("#{group_name}"),
|
||||||
|
mls_group_blob: mls_blob,
|
||||||
|
keystore_blob: None,
|
||||||
|
member_keys,
|
||||||
|
unread_count: 0,
|
||||||
|
last_activity_ms: now_ms(),
|
||||||
|
created_at_ms: now_ms(),
|
||||||
|
is_hybrid: member.is_hybrid(),
|
||||||
|
last_seen_seq: 0,
|
||||||
|
};
|
||||||
|
conv_store
|
||||||
|
.save_conversation(&conv)
|
||||||
|
.map_err(|e| SdkError::Storage(e.to_string()))?;
|
||||||
|
|
||||||
|
debug!(conv = %conv_id.hex(), group = group_name, "group created");
|
||||||
|
Ok(conv_id)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Group invite ────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
/// Invite a peer to an existing group.
|
||||||
|
///
|
||||||
|
/// Sends the Welcome to the new peer and the Commit to all existing members.
|
||||||
|
pub async fn invite_to_group(
|
||||||
|
rpc: &RpcClient,
|
||||||
|
conv_store: &ConversationStore,
|
||||||
|
member: &mut GroupMember,
|
||||||
|
my_identity: &IdentityKeypair,
|
||||||
|
conv_id: &ConversationId,
|
||||||
|
peer_key: &[u8],
|
||||||
|
peer_key_package: &[u8],
|
||||||
|
hybrid_kp: Option<&HybridKeypair>,
|
||||||
|
peer_hybrid_pk: Option<&HybridPublicKey>,
|
||||||
|
) -> Result<(), SdkError> {
|
||||||
|
let my_key = my_identity.public_key_bytes();
|
||||||
|
|
||||||
|
let (_commit, welcome) = member
|
||||||
|
.add_member(peer_key_package)
|
||||||
|
.map_err(|e| SdkError::Crypto(format!("add_member: {e}")))?;
|
||||||
|
|
||||||
|
// Send Welcome to new peer.
|
||||||
|
let payload = wrap_hybrid(hybrid_kp, peer_hybrid_pk, &welcome, member.is_hybrid())?;
|
||||||
|
enqueue_to_peer(rpc, peer_key, &payload).await?;
|
||||||
|
|
||||||
|
// Persist updated MLS state.
|
||||||
|
save_mls_state(conv_store, conv_id, member)?;
|
||||||
|
|
||||||
|
debug!(
|
||||||
|
conv = %conv_id.hex(),
|
||||||
|
peer = %hex::encode(&peer_key[..4]),
|
||||||
|
"invited peer to group"
|
||||||
|
);
|
||||||
|
let _ = my_key; // used for filtering in future commit broadcast
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Join from Welcome ───────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
/// Join a group from a Welcome message.
|
||||||
|
///
|
||||||
|
/// Returns the conversation ID derived from the MLS group ID.
|
||||||
|
pub fn join_from_welcome(
|
||||||
|
conv_store: &ConversationStore,
|
||||||
|
member: &mut GroupMember,
|
||||||
|
welcome_bytes: &[u8],
|
||||||
|
hybrid_kp: Option<&HybridKeypair>,
|
||||||
|
) -> Result<ConversationId, SdkError> {
|
||||||
|
// Try hybrid decryption if we have a hybrid keypair.
|
||||||
|
let decrypted;
|
||||||
|
let welcome_data = if let Some(hkp) = hybrid_kp {
|
||||||
|
match quicproquo_core::hybrid_decrypt(hkp, welcome_bytes, b"", b"") {
|
||||||
|
Ok(plain) => {
|
||||||
|
decrypted = plain;
|
||||||
|
&decrypted[..]
|
||||||
|
}
|
||||||
|
Err(_) => welcome_bytes, // not hybrid-encrypted, use as-is
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
welcome_bytes
|
||||||
|
};
|
||||||
|
|
||||||
|
member
|
||||||
|
.join_group(welcome_data)
|
||||||
|
.map_err(|e| SdkError::Crypto(format!("join_group: {e}")))?;
|
||||||
|
|
||||||
|
let group_id = member
|
||||||
|
.group_id()
|
||||||
|
.ok_or_else(|| SdkError::Crypto("no group after join".into()))?;
|
||||||
|
|
||||||
|
let conv_id = ConversationId::from_slice(&group_id)
|
||||||
|
.ok_or_else(|| SdkError::Crypto("group_id is not 16 bytes".into()))?;
|
||||||
|
|
||||||
|
let member_keys = member.member_identities();
|
||||||
|
let mls_blob = member
|
||||||
|
.group_ref()
|
||||||
|
.map(bincode::serialize)
|
||||||
|
.transpose()
|
||||||
|
.map_err(|e| SdkError::Storage(format!("serialize MLS group: {e}")))?;
|
||||||
|
|
||||||
|
// Upsert conversation — the stub may already exist from create_dm.
|
||||||
|
let existing = conv_store
|
||||||
|
.load_conversation(&conv_id)
|
||||||
|
.map_err(|e| SdkError::Storage(e.to_string()))?;
|
||||||
|
|
||||||
|
let conv = if let Some(mut ex) = existing {
|
||||||
|
ex.mls_group_blob = mls_blob;
|
||||||
|
ex.member_keys = member_keys;
|
||||||
|
ex.is_hybrid = member.is_hybrid();
|
||||||
|
ex.last_activity_ms = now_ms();
|
||||||
|
ex
|
||||||
|
} else {
|
||||||
|
Conversation {
|
||||||
|
id: conv_id.clone(),
|
||||||
|
kind: ConversationKind::Group {
|
||||||
|
name: format!("group-{}", conv_id.hex()),
|
||||||
|
},
|
||||||
|
display_name: format!("group-{}", &conv_id.hex()[..8]),
|
||||||
|
mls_group_blob: mls_blob,
|
||||||
|
keystore_blob: None,
|
||||||
|
member_keys,
|
||||||
|
unread_count: 0,
|
||||||
|
last_activity_ms: now_ms(),
|
||||||
|
created_at_ms: now_ms(),
|
||||||
|
is_hybrid: member.is_hybrid(),
|
||||||
|
last_seen_seq: 0,
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
conv_store
|
||||||
|
.save_conversation(&conv)
|
||||||
|
.map_err(|e| SdkError::Storage(e.to_string()))?;
|
||||||
|
|
||||||
|
debug!(conv = %conv_id.hex(), "joined group from Welcome");
|
||||||
|
Ok(conv_id)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── MLS state persistence ───────────────────────────────────────────────────
|
||||||
|
|
||||||
|
/// Save MLS group state into a conversation record.
|
||||||
|
pub fn save_mls_state(
|
||||||
|
conv_store: &ConversationStore,
|
||||||
|
conv_id: &ConversationId,
|
||||||
|
member: &GroupMember,
|
||||||
|
) -> Result<(), SdkError> {
|
||||||
|
let mut conv = conv_store
|
||||||
|
.load_conversation(conv_id)
|
||||||
|
.map_err(|e| SdkError::Storage(e.to_string()))?
|
||||||
|
.ok_or_else(|| SdkError::ConversationNotFound(conv_id.hex()))?;
|
||||||
|
|
||||||
|
conv.mls_group_blob = member
|
||||||
|
.group_ref()
|
||||||
|
.map(bincode::serialize)
|
||||||
|
.transpose()
|
||||||
|
.map_err(|e| SdkError::Storage(format!("serialize MLS group: {e}")))?;
|
||||||
|
conv.member_keys = member.member_identities();
|
||||||
|
conv.is_hybrid = member.is_hybrid();
|
||||||
|
|
||||||
|
conv_store
|
||||||
|
.save_conversation(&conv)
|
||||||
|
.map_err(|e| SdkError::Storage(e.to_string()))?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Restore a `GroupMember` from a conversation record.
|
||||||
|
///
|
||||||
|
/// Returns `Err` if the conversation has no MLS group blob.
|
||||||
|
pub fn restore_mls_state(
|
||||||
|
conv: &Conversation,
|
||||||
|
identity: &Arc<IdentityKeypair>,
|
||||||
|
) -> Result<GroupMember, SdkError> {
|
||||||
|
let group_blob = conv
|
||||||
|
.mls_group_blob
|
||||||
|
.as_ref()
|
||||||
|
.ok_or_else(|| SdkError::Crypto("no MLS group blob in conversation".into()))?;
|
||||||
|
|
||||||
|
let mls_group = bincode::deserialize(group_blob)
|
||||||
|
.map_err(|e| SdkError::Crypto(format!("deserialize MLS group: {e}")))?;
|
||||||
|
|
||||||
|
let ks = quicproquo_core::DiskKeyStore::ephemeral();
|
||||||
|
let member = GroupMember::new_with_state(Arc::clone(identity), ks, Some(mls_group), conv.is_hybrid);
|
||||||
|
|
||||||
|
Ok(member)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Helpers ─────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
/// Optionally wrap data in hybrid encryption.
|
||||||
|
///
|
||||||
|
/// If `is_hybrid_mls` is true, MLS already provides PQ protection and we skip
|
||||||
|
/// the outer envelope.
|
||||||
|
fn wrap_hybrid(
|
||||||
|
my_kp: Option<&HybridKeypair>,
|
||||||
|
peer_pk: Option<&HybridPublicKey>,
|
||||||
|
data: &[u8],
|
||||||
|
is_hybrid_mls: bool,
|
||||||
|
) -> Result<Vec<u8>, SdkError> {
|
||||||
|
if is_hybrid_mls {
|
||||||
|
return Ok(data.to_vec());
|
||||||
|
}
|
||||||
|
match (my_kp, peer_pk) {
|
||||||
|
(Some(_), Some(pk)) => {
|
||||||
|
hybrid_encrypt(pk, data, b"", b"")
|
||||||
|
.map_err(|e| SdkError::Crypto(format!("hybrid encrypt: {e}")))
|
||||||
|
}
|
||||||
|
_ => Ok(data.to_vec()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Enqueue a payload to a peer via the ENQUEUE RPC.
|
||||||
|
async fn enqueue_to_peer(
|
||||||
|
rpc: &RpcClient,
|
||||||
|
recipient_key: &[u8],
|
||||||
|
payload: &[u8],
|
||||||
|
) -> Result<(), SdkError> {
|
||||||
|
let req = EnqueueRequest {
|
||||||
|
recipient_key: recipient_key.to_vec(),
|
||||||
|
payload: payload.to_vec(),
|
||||||
|
channel_id: Vec::new(),
|
||||||
|
ttl_secs: 0,
|
||||||
|
};
|
||||||
|
let resp_bytes = rpc
|
||||||
|
.call(method_ids::ENQUEUE, Bytes::from(req.encode_to_vec()))
|
||||||
|
.await?;
|
||||||
|
let _resp = EnqueueResponse::decode(resp_bytes)
|
||||||
|
.map_err(|e| SdkError::Crypto(format!("decode EnqueueResponse: {e}")))?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
@@ -10,7 +10,9 @@ pub mod conversation;
|
|||||||
pub mod devices;
|
pub mod devices;
|
||||||
pub mod error;
|
pub mod error;
|
||||||
pub mod events;
|
pub mod events;
|
||||||
|
pub mod groups;
|
||||||
pub mod keys;
|
pub mod keys;
|
||||||
pub mod messaging;
|
pub mod messaging;
|
||||||
|
pub mod outbox;
|
||||||
pub mod state;
|
pub mod state;
|
||||||
pub mod users;
|
pub mod users;
|
||||||
|
|||||||
76
crates/quicproquo-sdk/src/outbox.rs
Normal file
76
crates/quicproquo-sdk/src/outbox.rs
Normal file
@@ -0,0 +1,76 @@
|
|||||||
|
//! Offline outbox — queue messages for deferred delivery.
|
||||||
|
|
||||||
|
use bytes::Bytes;
|
||||||
|
use prost::Message;
|
||||||
|
use tracing::{debug, warn};
|
||||||
|
|
||||||
|
use quicproquo_proto::method_ids;
|
||||||
|
use quicproquo_proto::qpq::v1::{EnqueueRequest, EnqueueResponse};
|
||||||
|
use quicproquo_rpc::client::RpcClient;
|
||||||
|
|
||||||
|
use crate::conversation::{ConversationId, ConversationStore};
|
||||||
|
use crate::error::SdkError;
|
||||||
|
|
||||||
|
/// Queue a message for sending when connectivity is restored.
|
||||||
|
pub fn queue_outbox(
|
||||||
|
conv_store: &ConversationStore,
|
||||||
|
conv_id: &ConversationId,
|
||||||
|
recipient_key: &[u8],
|
||||||
|
payload: &[u8],
|
||||||
|
) -> Result<(), SdkError> {
|
||||||
|
conv_store
|
||||||
|
.enqueue_outbox(conv_id, recipient_key, payload)
|
||||||
|
.map_err(|e| SdkError::Storage(format!("enqueue outbox: {e}")))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Process all pending outbox entries — send them to the server.
|
||||||
|
///
|
||||||
|
/// Returns the number of entries successfully sent.
|
||||||
|
pub async fn flush_outbox(
|
||||||
|
rpc: &RpcClient,
|
||||||
|
conv_store: &ConversationStore,
|
||||||
|
) -> Result<usize, SdkError> {
|
||||||
|
let entries = conv_store
|
||||||
|
.load_pending_outbox()
|
||||||
|
.map_err(|e| SdkError::Storage(format!("load outbox: {e}")))?;
|
||||||
|
|
||||||
|
let mut sent = 0usize;
|
||||||
|
for entry in &entries {
|
||||||
|
let req = EnqueueRequest {
|
||||||
|
recipient_key: entry.recipient_key.clone(),
|
||||||
|
payload: entry.payload.clone(),
|
||||||
|
channel_id: Vec::new(),
|
||||||
|
ttl_secs: 0,
|
||||||
|
};
|
||||||
|
match rpc
|
||||||
|
.call(method_ids::ENQUEUE, Bytes::from(req.encode_to_vec()))
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(resp_bytes) => {
|
||||||
|
if let Err(e) = EnqueueResponse::decode(resp_bytes) {
|
||||||
|
warn!(outbox_id = entry.id, "decode enqueue response: {e}");
|
||||||
|
}
|
||||||
|
conv_store
|
||||||
|
.mark_outbox_sent(entry.id)
|
||||||
|
.map_err(|e| SdkError::Storage(format!("mark_outbox_sent: {e}")))?;
|
||||||
|
sent += 1;
|
||||||
|
debug!(outbox_id = entry.id, "outbox entry sent");
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
warn!(outbox_id = entry.id, "outbox send failed: {e}");
|
||||||
|
conv_store
|
||||||
|
.mark_outbox_failed(entry.id, entry.retry_count + 1)
|
||||||
|
.map_err(|e| SdkError::Storage(format!("mark_outbox_failed: {e}")))?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(sent)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get the number of pending outbox entries.
|
||||||
|
pub fn outbox_count(conv_store: &ConversationStore) -> Result<usize, SdkError> {
|
||||||
|
conv_store
|
||||||
|
.count_pending_outbox()
|
||||||
|
.map_err(|e| SdkError::Storage(format!("count outbox: {e}")))
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user