feat: implement MLS lifecycle and multi-device support
Phase 5.3 (MLS lifecycle): - Add group.proto with RemoveMember, UpdateGroupMetadata, ListGroupMembers, RotateKeys RPCs - Add GroupService domain logic with metadata and membership persistence - Add v2 RPC handlers for all 4 group management endpoints (method IDs 410-413) - Add SDK functions: remove_member_from_group, leave_group, rotate_group_keys, set_group_metadata, get_group_members - Add REPL commands: /group remove, /group rename, /group rotate-keys, /group leave - Add 5 unit tests for GroupService (metadata CRUD, membership add/list/remove) Phase 5.1 (multi-device): - Wire device_id through SDK fetch/ack functions (fetch_for_device, ack) - Add /devices list|add|remove REPL commands with tab completion - Add clear_failed_outbox to ConversationStore - Fix missing message_id/device_id fields in SDK proto struct initializers
This commit is contained in:
@@ -980,4 +980,62 @@ mod tests {
|
||||
let unblocked = store.load_recent_messages_filtered(&conv.id, 10).unwrap();
|
||||
assert_eq!(unblocked.len(), 3);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn clear_failed_outbox_entries() {
|
||||
let (_dir, store) = open_test_store();
|
||||
let conv_id = ConversationId([2u8; 16]);
|
||||
let recipient = vec![5u8; 32];
|
||||
|
||||
store.enqueue_outbox(&conv_id, &recipient, b"msg1").unwrap();
|
||||
store.enqueue_outbox(&conv_id, &recipient, b"msg2").unwrap();
|
||||
|
||||
let entries = store.load_pending_outbox().unwrap();
|
||||
assert_eq!(entries.len(), 2);
|
||||
|
||||
// Mark first as permanently failed (retry_count > 10).
|
||||
store.mark_outbox_failed(entries[0].id, 11).unwrap();
|
||||
// Mark second as retryable failure.
|
||||
store.mark_outbox_failed(entries[1].id, 3).unwrap();
|
||||
|
||||
// Only 1 pending (the retryable one).
|
||||
assert_eq!(store.count_pending_outbox().unwrap(), 1);
|
||||
|
||||
// Clear failed entries.
|
||||
let cleared = store.clear_failed_outbox().unwrap();
|
||||
assert_eq!(cleared, 1);
|
||||
|
||||
// Still 1 pending.
|
||||
assert_eq!(store.count_pending_outbox().unwrap(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn update_and_get_last_seen_seq() {
|
||||
let (_dir, store) = open_test_store();
|
||||
let conv = make_group_conv("seq-test", 1000);
|
||||
store.save_conversation(&conv).unwrap();
|
||||
|
||||
// Initially 0.
|
||||
assert_eq!(store.get_last_seen_seq(&conv.id).unwrap(), 0);
|
||||
|
||||
// Update to 5.
|
||||
store.update_last_seen_seq(&conv.id, 5).unwrap();
|
||||
assert_eq!(store.get_last_seen_seq(&conv.id).unwrap(), 5);
|
||||
|
||||
// Update to 10 — should work.
|
||||
store.update_last_seen_seq(&conv.id, 10).unwrap();
|
||||
assert_eq!(store.get_last_seen_seq(&conv.id).unwrap(), 10);
|
||||
|
||||
// Update to 7 — should be a no-op (only increases).
|
||||
store.update_last_seen_seq(&conv.id, 7).unwrap();
|
||||
assert_eq!(store.get_last_seen_seq(&conv.id).unwrap(), 10);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn get_last_seen_seq_missing_conversation() {
|
||||
let (_dir, store) = open_test_store();
|
||||
let missing = ConversationId([99u8; 16]);
|
||||
// Returns 0 for unknown conversations.
|
||||
assert_eq!(store.get_last_seen_seq(&missing).unwrap(), 0);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,6 +15,8 @@ use quicproquo_core::{
|
||||
use quicproquo_proto::method_ids;
|
||||
use quicproquo_proto::qpq::v1::{
|
||||
CreateChannelRequest, CreateChannelResponse, EnqueueRequest, EnqueueResponse,
|
||||
ListGroupMembersRequest, ListGroupMembersResponse, RemoveMemberRequest, RemoveMemberResponse,
|
||||
RotateKeysRequest, RotateKeysResponse, UpdateGroupMetadataRequest, UpdateGroupMetadataResponse,
|
||||
};
|
||||
use quicproquo_rpc::client::RpcClient;
|
||||
|
||||
@@ -292,6 +294,207 @@ pub fn join_from_welcome(
|
||||
Ok(conv_id)
|
||||
}
|
||||
|
||||
// ── Member removal ─────────────────────────────────────────────────────────
|
||||
|
||||
/// Remove a member from a group.
|
||||
///
|
||||
/// Generates an MLS Commit for the removal, sends it via the server RPC,
|
||||
/// and broadcasts the commit to remaining members.
|
||||
pub async fn remove_member_from_group(
|
||||
rpc: &RpcClient,
|
||||
conv_store: &ConversationStore,
|
||||
member: &mut GroupMember,
|
||||
conv_id: &ConversationId,
|
||||
member_identity_key: &[u8],
|
||||
) -> Result<(), SdkError> {
|
||||
// 1. MLS removal — generates a commit.
|
||||
let commit = member
|
||||
.remove_member(member_identity_key)
|
||||
.map_err(|e| SdkError::Crypto(format!("remove_member: {e}")))?;
|
||||
|
||||
// 2. Call the server-side RemoveMember RPC.
|
||||
let req = RemoveMemberRequest {
|
||||
group_id: conv_id.0.to_vec(),
|
||||
member_identity_key: member_identity_key.to_vec(),
|
||||
};
|
||||
let resp_bytes = rpc
|
||||
.call(method_ids::REMOVE_MEMBER, Bytes::from(req.encode_to_vec()))
|
||||
.await?;
|
||||
let _resp = RemoveMemberResponse::decode(resp_bytes)
|
||||
.map_err(|e| SdkError::Crypto(format!("decode RemoveMemberResponse: {e}")))?;
|
||||
|
||||
// 3. Broadcast the commit to remaining members.
|
||||
let remaining = member.member_identities();
|
||||
for key in &remaining {
|
||||
enqueue_to_peer(rpc, key, &commit).await?;
|
||||
}
|
||||
|
||||
// 4. Persist updated MLS state.
|
||||
save_mls_state(conv_store, conv_id, member)?;
|
||||
|
||||
debug!(
|
||||
conv = %conv_id.hex(),
|
||||
removed = %hex::encode(&member_identity_key[..4.min(member_identity_key.len())]),
|
||||
"removed member from group"
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// ── Leave group ────────────────────────────────────────────────────────────
|
||||
|
||||
/// Leave a group. Generates a removal proposal for self and notifies members.
|
||||
pub async fn leave_group(
|
||||
rpc: &RpcClient,
|
||||
conv_store: &ConversationStore,
|
||||
member: &mut GroupMember,
|
||||
conv_id: &ConversationId,
|
||||
) -> Result<(), SdkError> {
|
||||
let proposal = member
|
||||
.leave_group()
|
||||
.map_err(|e| SdkError::Crypto(format!("leave_group: {e}")))?;
|
||||
|
||||
// Send the leave proposal to all remaining members so they can commit it.
|
||||
let members = member.member_identities();
|
||||
for key in &members {
|
||||
enqueue_to_peer(rpc, key, &proposal).await?;
|
||||
}
|
||||
|
||||
// Persist updated MLS state (now in a "left" state).
|
||||
save_mls_state(conv_store, conv_id, member)?;
|
||||
|
||||
debug!(conv = %conv_id.hex(), "left group");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// ── Key rotation ───────────────────────────────────────────────────────────
|
||||
|
||||
/// Rotate group keys — self-update + commit pending proposals.
|
||||
///
|
||||
/// Broadcasts the commit to all group members via the server.
|
||||
pub async fn rotate_group_keys(
|
||||
rpc: &RpcClient,
|
||||
conv_store: &ConversationStore,
|
||||
member: &mut GroupMember,
|
||||
conv_id: &ConversationId,
|
||||
) -> Result<(), SdkError> {
|
||||
// 1. Propose self-update (new leaf key material).
|
||||
member
|
||||
.propose_self_update()
|
||||
.map_err(|e| SdkError::Crypto(format!("propose_self_update: {e}")))?;
|
||||
|
||||
// 2. Commit all pending proposals (including the self-update).
|
||||
let (commit, _welcome) = member
|
||||
.commit_pending_proposals()
|
||||
.map_err(|e| SdkError::Crypto(format!("commit_pending_proposals: {e}")))?;
|
||||
|
||||
// 3. Call server-side RotateKeys RPC.
|
||||
let req = RotateKeysRequest {
|
||||
group_id: conv_id.0.to_vec(),
|
||||
};
|
||||
let resp_bytes = rpc
|
||||
.call(method_ids::ROTATE_KEYS, Bytes::from(req.encode_to_vec()))
|
||||
.await?;
|
||||
let _resp = RotateKeysResponse::decode(resp_bytes)
|
||||
.map_err(|e| SdkError::Crypto(format!("decode RotateKeysResponse: {e}")))?;
|
||||
|
||||
// 4. Broadcast commit to all members.
|
||||
let members = member.member_identities();
|
||||
for key in &members {
|
||||
enqueue_to_peer(rpc, key, &commit).await?;
|
||||
}
|
||||
|
||||
// 5. Persist updated MLS state.
|
||||
save_mls_state(conv_store, conv_id, member)?;
|
||||
|
||||
debug!(conv = %conv_id.hex(), "rotated group keys");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// ── Group metadata ─────────────────────────────────────────────────────────
|
||||
|
||||
/// Update group metadata (name, description, avatar) on the server.
|
||||
pub async fn set_group_metadata(
|
||||
rpc: &RpcClient,
|
||||
conv_store: &ConversationStore,
|
||||
conv_id: &ConversationId,
|
||||
name: &str,
|
||||
description: &str,
|
||||
avatar_hash: &[u8],
|
||||
) -> Result<(), SdkError> {
|
||||
let req = UpdateGroupMetadataRequest {
|
||||
group_id: conv_id.0.to_vec(),
|
||||
name: name.to_string(),
|
||||
description: description.to_string(),
|
||||
avatar_hash: avatar_hash.to_vec(),
|
||||
};
|
||||
let resp_bytes = rpc
|
||||
.call(
|
||||
method_ids::UPDATE_GROUP_METADATA,
|
||||
Bytes::from(req.encode_to_vec()),
|
||||
)
|
||||
.await?;
|
||||
let resp = UpdateGroupMetadataResponse::decode(resp_bytes)
|
||||
.map_err(|e| SdkError::Crypto(format!("decode UpdateGroupMetadataResponse: {e}")))?;
|
||||
|
||||
if !resp.success {
|
||||
return Err(SdkError::Other(anyhow::anyhow!(
|
||||
"server rejected metadata update"
|
||||
)));
|
||||
}
|
||||
|
||||
// Update local conversation display name if name is provided.
|
||||
if !name.is_empty() {
|
||||
if let Ok(Some(mut conv)) = conv_store.load_conversation(conv_id) {
|
||||
conv.display_name = format!("#{name}");
|
||||
conv.kind = ConversationKind::Group {
|
||||
name: name.to_string(),
|
||||
};
|
||||
let _ = conv_store.save_conversation(&conv);
|
||||
}
|
||||
}
|
||||
|
||||
debug!(conv = %conv_id.hex(), name = name, "updated group metadata");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Fetch group members from the server.
|
||||
pub async fn get_group_members(
|
||||
rpc: &RpcClient,
|
||||
conv_id: &ConversationId,
|
||||
) -> Result<Vec<GroupMemberInfoResult>, SdkError> {
|
||||
let req = ListGroupMembersRequest {
|
||||
group_id: conv_id.0.to_vec(),
|
||||
};
|
||||
let resp_bytes = rpc
|
||||
.call(
|
||||
method_ids::LIST_GROUP_MEMBERS,
|
||||
Bytes::from(req.encode_to_vec()),
|
||||
)
|
||||
.await?;
|
||||
let resp = ListGroupMembersResponse::decode(resp_bytes)
|
||||
.map_err(|e| SdkError::Crypto(format!("decode ListGroupMembersResponse: {e}")))?;
|
||||
|
||||
let members = resp
|
||||
.members
|
||||
.into_iter()
|
||||
.map(|m| GroupMemberInfoResult {
|
||||
identity_key: m.identity_key,
|
||||
username: m.username,
|
||||
joined_at: m.joined_at,
|
||||
})
|
||||
.collect();
|
||||
|
||||
Ok(members)
|
||||
}
|
||||
|
||||
/// SDK-side group member info returned by [`get_group_members`].
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct GroupMemberInfoResult {
|
||||
pub identity_key: Vec<u8>,
|
||||
pub username: String,
|
||||
pub joined_at: u64,
|
||||
}
|
||||
|
||||
// ── MLS state persistence ───────────────────────────────────────────────────
|
||||
|
||||
/// Save MLS group state into a conversation record.
|
||||
@@ -375,6 +578,7 @@ async fn enqueue_to_peer(
|
||||
payload: payload.to_vec(),
|
||||
channel_id: Vec::new(),
|
||||
ttl_secs: 0,
|
||||
message_id: Vec::new(),
|
||||
};
|
||||
let resp_bytes = rpc
|
||||
.call(method_ids::ENQUEUE, Bytes::from(req.encode_to_vec()))
|
||||
|
||||
Reference in New Issue
Block a user