feat: Sprint 4 — rich messaging: receipts, typing, reactions, edit/delete

- Auto-send read receipts on Chat/Reply receive, display "✓ read"
  notification (loop-safe: only Chat/Reply trigger receipts)
- Typing indicators with /typing command, 10s timeout expiry,
  /typing-notify toggle, ephemeral (not stored in DB)
- /react <emoji> [index] command for message reactions, display
  inline with sender name
- Add Edit (0x06) and Delete (0x07) AppMessage variants with
  serialize/parse, /edit and /delete REPL commands (own messages
  only), incoming edit/delete handling with DB updates
- 3 new roundtrip tests for Edit/Delete serialization (70 core tests)
This commit is contained in:
2026-03-04 00:12:06 +01:00
parent db46b72f58
commit 81d5e2e590
6 changed files with 631 additions and 8 deletions

View File

@@ -636,6 +636,33 @@ impl ConversationStore {
Ok(msgs)
}
/// Update the body of an existing message (for edits).
pub fn update_message_body(
&self,
conv_id: &ConversationId,
message_id: &[u8; 16],
new_body: &str,
) -> anyhow::Result<bool> {
let rows = self.conn.execute(
"UPDATE messages SET body = ?3 WHERE conversation_id = ?1 AND message_id = ?2",
params![conv_id.0.as_slice(), message_id.as_slice(), new_body],
)?;
Ok(rows > 0)
}
/// Mark a message as deleted (sets body to "[deleted]" and msg_type to "deleted").
pub fn delete_message(
&self,
conv_id: &ConversationId,
message_id: &[u8; 16],
) -> anyhow::Result<bool> {
let rows = self.conn.execute(
"UPDATE messages SET body = '[deleted]', msg_type = 'deleted' WHERE conversation_id = ?1 AND message_id = ?2",
params![conv_id.0.as_slice(), message_id.as_slice()],
)?;
Ok(rows > 0)
}
/// Save a message, deduplicating by message_id within the same conversation.
/// Returns `true` if the message was saved (new), `false` if it was a duplicate.
pub fn save_message_dedup(&self, msg: &StoredMessage) -> anyhow::Result<bool> {

View File

@@ -28,12 +28,17 @@ pub fn print_prompt(session: &SessionState) {
/// Print an incoming or outgoing message.
pub fn print_message(msg: &StoredMessage) {
let body = if msg.msg_type == "reaction" {
format!("reacted {}", msg.body)
} else {
msg.body.clone()
};
if msg.is_outgoing {
println!("\r{GREEN}> {}{RESET}", msg.body);
println!("\r{GREEN}> {body}{RESET}");
} else {
let fallback = hex::encode(&msg.sender_key[..4]);
let sender = msg.sender_name.as_deref().unwrap_or(&fallback);
println!("\r{CYAN}{BOLD}[{sender}]{RESET} {}", msg.body);
println!("\r{CYAN}{BOLD}[{sender}]{RESET} {body}");
}
}
@@ -51,6 +56,14 @@ pub fn print_status(msg: &str) {
println!("{DIM} {msg}{RESET}");
}
/// Print a transient typing indicator (clears current line first).
pub fn print_typing(sender: &str) {
use std::io::Write;
print!("\r\x1b[2K");
println!("{DIM} {sender} is typing...{RESET}");
let _ = std::io::stdout().flush();
}
/// Print an error message.
pub fn print_error(msg: &str) {
println!("{YELLOW} error: {msg}{RESET}");

View File

@@ -12,6 +12,8 @@ use anyhow::Context;
use quicproquo_core::{
AppMessage, DiskKeyStore, GroupMember, IdentityKeypair, ReceivedMessage,
compute_safety_number, hybrid_encrypt, parse as parse_app_msg, serialize_chat,
serialize_delete, serialize_edit, serialize_reaction, serialize_read_receipt,
serialize_typing,
};
use quicproquo_proto::node_capnp::node_service;
use tokio::sync::mpsc;
@@ -61,6 +63,16 @@ enum SlashCommand {
Verify { username: String },
/// Rotate own MLS leaf key in the active group.
UpdateKey,
/// Send a typing indicator to the active conversation.
Typing,
/// Toggle display of typing notifications from others.
TypingNotify { enabled: bool },
/// React to a message with an emoji.
React { emoji: String, index: Option<usize> },
/// Edit a previously sent message by index.
Edit { index: usize, new_text: String },
/// Delete a previously sent message by index.
Delete { index: usize },
}
fn parse_input(line: &str) -> Input {
@@ -147,6 +159,53 @@ fn parse_input(line: &str) -> Input {
}
},
"/update-key" | "/rotate-key" => Input::Slash(SlashCommand::UpdateKey),
"/typing" => Input::Slash(SlashCommand::Typing),
"/typing-notify" => match arg.as_deref() {
Some("on") => Input::Slash(SlashCommand::TypingNotify { enabled: true }),
Some("off") => Input::Slash(SlashCommand::TypingNotify { enabled: false }),
_ => {
display::print_error("usage: /typing-notify on|off");
Input::Empty
}
},
"/react" => match arg {
Some(rest) => {
let mut parts = rest.splitn(2, ' ');
let emoji = parts.next().unwrap().to_string();
let index = parts.next().and_then(|s| s.trim().parse::<usize>().ok());
Input::Slash(SlashCommand::React { emoji, index })
}
None => {
display::print_error("usage: /react <emoji> [msg-index]");
Input::Empty
}
},
"/edit" => match arg {
Some(rest) => {
let mut parts = rest.splitn(2, ' ');
let idx_str = parts.next().unwrap();
match (idx_str.parse::<usize>(), parts.next()) {
(Ok(index), Some(new_text)) if !new_text.trim().is_empty() => {
Input::Slash(SlashCommand::Edit { index, new_text: new_text.trim().to_string() })
}
_ => {
display::print_error("usage: /edit <msg-index> <new text>");
Input::Empty
}
}
}
None => {
display::print_error("usage: /edit <msg-index> <new text>");
Input::Empty
}
},
"/delete" | "/del" => match arg.and_then(|s| s.parse::<usize>().ok()) {
Some(index) => Input::Slash(SlashCommand::Delete { index }),
None => {
display::print_error("usage: /delete <msg-index>");
Input::Empty
}
},
_ => {
display::print_error(&format!("unknown command: {cmd}. Try /help"));
Input::Empty
@@ -425,6 +484,10 @@ pub async fn run_repl(
// Drain offline outbox before polling for new messages.
drain_outbox(&mut session, &client).await;
// Expire stale typing indicators (10-second timeout).
let now = std::time::Instant::now();
session.typing_indicators.retain(|_, ts| now.duration_since(*ts).as_secs() < 10);
match poll_messages(&mut session, &client).await {
Ok(()) => {
consecutive_errors = 0;
@@ -617,6 +680,18 @@ async fn handle_slash(
}
SlashCommand::Verify { username } => cmd_verify(session, client, &username).await,
SlashCommand::UpdateKey => cmd_update_key(session, client).await,
SlashCommand::Typing => cmd_typing(session, client).await,
SlashCommand::TypingNotify { enabled } => {
session.typing_notify_enabled = enabled;
display::print_status(&format!(
"typing notifications {}",
if enabled { "enabled" } else { "disabled" }
));
Ok(())
}
SlashCommand::React { emoji, index } => cmd_react(session, client, &emoji, index).await,
SlashCommand::Edit { index, new_text } => cmd_edit(session, client, index, &new_text).await,
SlashCommand::Delete { index } => cmd_delete(session, client, index).await,
};
if let Err(e) = result {
display::print_error(&format!("{e:#}"));
@@ -640,6 +715,11 @@ fn print_help() {
display::print_status(" /mesh server <host:port> - Show how to reconnect to a mesh node");
display::print_status(" /update-key - Rotate your MLS leaf key in the active group");
display::print_status(" /verify <username> - Show safety number for key verification");
display::print_status(" /react <emoji> [index] - React to last message (or message at index)");
display::print_status(" /typing - Send a typing indicator");
display::print_status(" /typing-notify on|off - Toggle typing notifications");
display::print_status(" /edit <index> <new text> - Edit a sent message");
display::print_status(" /delete <index> - Delete a sent message");
display::print_status(" /quit - Exit");
}
@@ -1298,6 +1378,295 @@ async fn cmd_verify(
Ok(())
}
// ── Typing indicator ─────────────────────────────────────────────────────────
async fn cmd_typing(
session: &mut SessionState,
client: &node_service::Client,
) -> anyhow::Result<()> {
let conv_id = session
.active_conversation
.as_ref()
.context("no active conversation")?
.clone();
let my_key = session.identity_bytes();
let identity = std::sync::Arc::clone(&session.identity);
let member = session
.get_member_mut(&conv_id)
.context("no group member")?;
anyhow::ensure!(
member.group_ref().is_some(),
"active conversation has no MLS group"
);
let app_payload = serialize_typing(1);
let sealed = quicproquo_core::sealed_sender::seal(&identity, &app_payload);
let padded = quicproquo_core::padding::pad(&sealed);
let ct = member
.send_message(&padded)
.context("MLS send_message failed")?;
let recipients: Vec<Vec<u8>> = member
.member_identities()
.into_iter()
.filter(|id| id.as_slice() != my_key.as_slice())
.collect();
for recipient_key in &recipients {
let peer_hybrid_pk = fetch_hybrid_key(client, recipient_key).await?;
let payload = if let Some(ref pk) = peer_hybrid_pk {
hybrid_encrypt(pk, &ct, b"", b"").context("hybrid encrypt")?
} else {
ct.clone()
};
enqueue(client, recipient_key, &payload).await?;
}
session.save_member(&conv_id)?;
display::print_status("typing indicator sent");
Ok(())
}
async fn cmd_react(
session: &mut SessionState,
client: &node_service::Client,
emoji: &str,
index: Option<usize>,
) -> anyhow::Result<()> {
let conv_id = session
.active_conversation
.as_ref()
.context("no active conversation; use /dm or /create-group first")?
.clone();
// Resolve the target message_id.
let ref_msg_id = if let Some(idx) = index {
// User specified a 1-based display index into conversation history.
let msgs = session.conv_store.load_all_messages(&conv_id)?;
let msg = msgs
.get(idx.saturating_sub(1))
.with_context(|| format!("no message at index {idx}"))?;
msg.message_id
.context("message at that index has no message_id")?
} else {
// React to the most recent non-outgoing chat/reply message.
let msgs = session.conv_store.load_recent_messages(&conv_id, 50)?;
let target = msgs
.iter()
.rev()
.find(|m| !m.is_outgoing && (m.msg_type == "chat" || m.msg_type == "reply"))
.context("no received messages to react to")?;
target
.message_id
.context("most recent message has no message_id")?
};
let my_key = session.identity_bytes();
let identity = Arc::clone(&session.identity);
let member = session
.get_member_mut(&conv_id)
.context("no group member")?;
anyhow::ensure!(
member.group_ref().is_some(),
"cannot react in a local-only conversation"
);
let app_payload = serialize_reaction(ref_msg_id, emoji.as_bytes())
.context("serialize reaction")?;
let sealed = quicproquo_core::sealed_sender::seal(&identity, &app_payload);
let padded = quicproquo_core::padding::pad(&sealed);
let ct = member
.send_message(&padded)
.context("MLS send_message failed")?;
let recipients: Vec<Vec<u8>> = member
.member_identities()
.into_iter()
.filter(|id| id.as_slice() != my_key.as_slice())
.collect();
for recipient_key in &recipients {
let peer_hybrid_pk = fetch_hybrid_key(client, recipient_key).await?;
let payload = if let Some(ref pk) = peer_hybrid_pk {
hybrid_encrypt(pk, &ct, b"", b"").context("hybrid encrypt")?
} else {
ct.clone()
};
enqueue(client, recipient_key, &payload).await?;
}
// Store outgoing reaction.
let msg = StoredMessage {
conversation_id: conv_id.clone(),
message_id: None,
sender_key: my_key,
sender_name: Some("you".into()),
body: emoji.to_string(),
msg_type: "reaction".into(),
ref_msg_id: Some(ref_msg_id),
timestamp_ms: now_ms(),
is_outgoing: true,
};
session.conv_store.save_message(&msg)?;
session.conv_store.update_activity(&conv_id, now_ms())?;
session.save_member(&conv_id)?;
display::print_status(&format!("reacted {emoji}"));
Ok(())
}
// ── Edit / Delete ────────────────────────────────────────────────────────────
async fn cmd_edit(
session: &mut SessionState,
client: &node_service::Client,
index: usize,
new_text: &str,
) -> anyhow::Result<()> {
let conv_id = session
.active_conversation
.as_ref()
.context("no active conversation")?
.clone();
let msgs = session.conv_store.load_all_messages(&conv_id)?;
anyhow::ensure!(!msgs.is_empty(), "no messages in this conversation");
anyhow::ensure!(
index < msgs.len(),
"message index {index} out of range (0..{})",
msgs.len() - 1
);
let target = &msgs[index];
anyhow::ensure!(target.is_outgoing, "you can only edit your own messages");
let msg_id = target
.message_id
.context("message has no message_id (cannot edit)")?;
let my_key = session.identity_bytes();
let identity = std::sync::Arc::clone(&session.identity);
let member = session
.get_member_mut(&conv_id)
.context("no group member")?;
anyhow::ensure!(
member.group_ref().is_some(),
"active conversation has no MLS group"
);
let app_payload = serialize_edit(&msg_id, new_text.as_bytes())
.context("serialize edit message")?;
let sealed = quicproquo_core::sealed_sender::seal(&identity, &app_payload);
let padded = quicproquo_core::padding::pad(&sealed);
let ct = member
.send_message(&padded)
.context("MLS send_message failed")?;
let recipients: Vec<Vec<u8>> = member
.member_identities()
.into_iter()
.filter(|id| id.as_slice() != my_key.as_slice())
.collect();
for recipient_key in &recipients {
let peer_hybrid_pk = fetch_hybrid_key(client, recipient_key).await?;
let payload = if let Some(ref pk) = peer_hybrid_pk {
hybrid_encrypt(pk, &ct, b"", b"").context("hybrid encrypt")?
} else {
ct.clone()
};
enqueue(client, recipient_key, &payload).await?;
}
// Update local DB.
session
.conv_store
.update_message_body(&conv_id, &msg_id, new_text)?;
session.save_member(&conv_id)?;
display::print_status("message edited");
Ok(())
}
async fn cmd_delete(
session: &mut SessionState,
client: &node_service::Client,
index: usize,
) -> anyhow::Result<()> {
let conv_id = session
.active_conversation
.as_ref()
.context("no active conversation")?
.clone();
let msgs = session.conv_store.load_all_messages(&conv_id)?;
anyhow::ensure!(!msgs.is_empty(), "no messages in this conversation");
anyhow::ensure!(
index < msgs.len(),
"message index {index} out of range (0..{})",
msgs.len() - 1
);
let target = &msgs[index];
anyhow::ensure!(target.is_outgoing, "you can only delete your own messages");
let msg_id = target
.message_id
.context("message has no message_id (cannot delete)")?;
let my_key = session.identity_bytes();
let identity = std::sync::Arc::clone(&session.identity);
let member = session
.get_member_mut(&conv_id)
.context("no group member")?;
anyhow::ensure!(
member.group_ref().is_some(),
"active conversation has no MLS group"
);
let app_payload = serialize_delete(&msg_id);
let sealed = quicproquo_core::sealed_sender::seal(&identity, &app_payload);
let padded = quicproquo_core::padding::pad(&sealed);
let ct = member
.send_message(&padded)
.context("MLS send_message failed")?;
let recipients: Vec<Vec<u8>> = member
.member_identities()
.into_iter()
.filter(|id| id.as_slice() != my_key.as_slice())
.collect();
for recipient_key in &recipients {
let peer_hybrid_pk = fetch_hybrid_key(client, recipient_key).await?;
let payload = if let Some(ref pk) = peer_hybrid_pk {
hybrid_encrypt(pk, &ct, b"", b"").context("hybrid encrypt")?
} else {
ct.clone()
};
enqueue(client, recipient_key, &payload).await?;
}
// Mark as deleted in local DB.
session.conv_store.delete_message(&conv_id, &msg_id)?;
session.save_member(&conv_id)?;
display::print_status("message deleted");
Ok(())
}
// ── Sending ──────────────────────────────────────────────────────────────────
async fn handle_send(
@@ -1475,9 +1844,101 @@ async fn poll_messages(
}
};
// Parse structured AppMessage; fall back to raw UTF-8 for legacy.
let (body, msg_id, msg_type, ref_msg_id) =
match parse_app_msg(&app_bytes) {
// Parse structured AppMessage; handle ephemeral types first.
let parsed = parse_app_msg(&app_bytes);
// Typing indicators: ephemeral display only, never stored.
if let Ok((_, AppMessage::Typing { active })) = &parsed {
if session.typing_notify_enabled {
let sender_hex = hex::encode(&sender_key[..4.min(sender_key.len())]);
if *active != 0 {
session.typing_indicators.insert(
sender_hex.clone(),
std::time::Instant::now(),
);
let is_active_conv = session
.active_conversation
.as_ref()
.map(|a| a == conv_id)
.unwrap_or(false);
if is_active_conv {
display::print_typing(&sender_hex);
display::print_prompt(session);
}
} else {
session.typing_indicators.remove(&sender_hex);
}
}
any_changed = true;
handled = true;
break;
}
// Read receipts: ephemeral, show subtle notification.
if let Ok((_, AppMessage::ReadReceipt { .. })) = &parsed {
let is_active = session
.active_conversation
.as_ref()
.map(|a| a == conv_id)
.unwrap_or(false);
if is_active {
let fallback = hex::encode(&sender_key[..4.min(sender_key.len())]);
let label = resolve_identity(client, &sender_key)
.await.ok().flatten().unwrap_or(fallback);
display::print_status(&format!("\u{2713} {label} read your message"));
}
any_changed = true;
handled = true;
break;
}
// Edit: update existing message body in DB.
if let Ok((_, AppMessage::Edit { ref_msg_id, body: edit_body })) = &parsed {
let new_body = String::from_utf8_lossy(edit_body).to_string();
session.conv_store.update_message_body(
conv_id, ref_msg_id, &new_body,
)?;
session.conv_store.update_activity(conv_id, now_ms())?;
let is_active = session
.active_conversation
.as_ref()
.map(|a| a == conv_id)
.unwrap_or(false);
if is_active {
let conv = session.conv_store.load_conversation(conv_id)?;
let conv_name = conv.map(|c| c.display_name).unwrap_or_default();
display::print_incoming(&conv_name, &format!("[edited] {new_body}"));
display::print_prompt(session);
}
any_changed = true;
handled = true;
break;
}
// Delete: mark existing message as deleted in DB.
if let Ok((_, AppMessage::Delete { ref_msg_id })) = &parsed {
session.conv_store.delete_message(conv_id, ref_msg_id)?;
session.conv_store.update_activity(conv_id, now_ms())?;
let is_active = session
.active_conversation
.as_ref()
.map(|a| a == conv_id)
.unwrap_or(false);
if is_active {
let conv = session.conv_store.load_conversation(conv_id)?;
let conv_name = conv.map(|c| c.display_name).unwrap_or_default();
display::print_incoming(&conv_name, "[deleted a message]");
display::print_prompt(session);
}
any_changed = true;
handled = true;
break;
}
// Storable message types: Chat, Reply, Reaction, legacy.
let (body, msg_id, msg_type, ref_msg_id) = match parsed {
Ok((_, AppMessage::Chat { message_id, body })) => (
String::from_utf8_lossy(&body).to_string(),
Some(message_id),
@@ -1507,6 +1968,10 @@ async fn poll_messages(
}
};
// A real message clears the sender's typing indicator.
let sender_hex = hex::encode(&sender_key[..4.min(sender_key.len())]);
session.typing_indicators.remove(&sender_hex);
let msg = StoredMessage {
conversation_id: conv_id.clone(),
message_id: msg_id,
@@ -1530,12 +1995,30 @@ async fn poll_messages(
if is_active {
let conv = session.conv_store.load_conversation(conv_id)?;
let conv_name = conv.map(|c| c.display_name).unwrap_or_default();
display::print_incoming(&conv_name, &body);
let display_body = if msg_type == "reaction" {
format!("reacted {body}")
} else {
body.clone()
};
display::print_incoming(&conv_name, &display_body);
display::print_prompt(session);
} else {
session.conv_store.increment_unread(conv_id)?;
}
// Auto-send read receipt for Chat and Reply (has message_id).
if let Some(mid) = msg_id {
let receipt_bytes = serialize_read_receipt(mid);
let identity = Arc::clone(&session.identity);
let sealed = quicproquo_core::sealed_sender::seal(&identity, &receipt_bytes);
let padded = quicproquo_core::padding::pad(&sealed);
if let Some(m) = session.members.get_mut(conv_id) {
if let Ok(ct) = m.send_message(&padded) {
let _ = enqueue(client, &sender_key, &ct).await;
}
}
}
any_changed = true;
handled = true;
break;

View File

@@ -6,6 +6,7 @@
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Instant;
use anyhow::Context;
@@ -36,6 +37,11 @@ pub struct SessionState {
/// Its keystore contains the HPKE init private key needed to decrypt
/// incoming Welcome messages. Consumed on auto-join, then replenished.
pub pending_member: Option<GroupMember>,
/// Whether to display typing indicators from others (session preference).
pub typing_notify_enabled: bool,
/// Tracks who is currently typing and when the indicator was last received.
/// Entries older than 10 seconds are considered expired.
pub typing_indicators: HashMap<String, Instant>,
}
impl SessionState {
@@ -69,6 +75,8 @@ impl SessionState {
active_conversation: None,
members: HashMap::new(),
pending_member: None,
typing_notify_enabled: true,
typing_indicators: HashMap::new(),
};
// Migrate legacy single-group into conversations if present and not yet migrated.