feat: wire traffic resistance, implement v2 CLI commands, add auth expiry detection
Server: - Wire traffic resistance decoy generator into main.rs startup behind --traffic-resistance flag + --decoy-interval-ms config (feature-gated) Client: - Implement v2 CLI one-shot commands: send, recv, dm, group create, group invite All previously printed "coming soon" — now fully functional with MLS state restoration, peer resolution, KeyPackage fetch, and MLS encryption pipeline SDK: - Add SdkError::SessionExpired variant + is_auth_expired() helper for detecting expired session tokens (RpcStatus::Unauthorized) - Add ClientEvent::AuthExpired for UI-layer session expiry notification
This commit is contained in:
@@ -351,6 +351,25 @@ async fn connect_client(args: &Args) -> anyhow::Result<QpqClient> {
|
||||
Ok(client)
|
||||
}
|
||||
|
||||
/// Connect and return client + identity keypair (needed for MLS one-shot commands).
|
||||
async fn connect_with_identity(
|
||||
args: &Args,
|
||||
) -> anyhow::Result<(QpqClient, std::sync::Arc<quicprochat_core::IdentityKeypair>)> {
|
||||
let client = connect_client(args).await?;
|
||||
let keypair = if args.state.exists() {
|
||||
let stored =
|
||||
quicprochat_sdk::state::load_state(&args.state, args.db_password.as_deref())
|
||||
.context("load identity state — register or login first")?;
|
||||
std::sync::Arc::new(quicprochat_core::IdentityKeypair::from_seed(
|
||||
stored.identity_seed,
|
||||
))
|
||||
} else {
|
||||
anyhow::bail!("no state file found at {} — register or login first", args.state.display());
|
||||
};
|
||||
|
||||
Ok((client, keypair))
|
||||
}
|
||||
|
||||
// ── Entry point ──────────────────────────────────────────────────────────────
|
||||
|
||||
pub fn main() {
|
||||
@@ -446,34 +465,89 @@ async fn run(args: Args) -> anyhow::Result<()> {
|
||||
}
|
||||
|
||||
Cmd::Dm { ref username } => {
|
||||
let mut client = connect_client(&args).await?;
|
||||
v2_commands::cmd_resolve(&mut client, username)
|
||||
.await
|
||||
.context("dm setup failed")?;
|
||||
// For now, print the resolved key. Full DM creation requires
|
||||
// MLS group state, which will be handled in the REPL flow.
|
||||
println!("(DM creation with full MLS setup is available in the REPL)");
|
||||
let (client, identity) = connect_with_identity(&args).await?;
|
||||
let rpc = client.rpc().map_err(|e| anyhow::anyhow!("{e}"))?;
|
||||
let conv_store = client.conversations().map_err(|e| anyhow::anyhow!("{e}"))?;
|
||||
let peer_key = quicprochat_sdk::users::resolve_user(rpc, username)
|
||||
.await?
|
||||
.ok_or_else(|| anyhow::anyhow!("user '{username}' not found"))?;
|
||||
let key_package = quicprochat_sdk::keys::fetch_key_package(rpc, &peer_key)
|
||||
.await?
|
||||
.ok_or_else(|| anyhow::anyhow!("no KeyPackage available for peer"))?;
|
||||
let mut member = quicprochat_core::GroupMember::new(identity.clone());
|
||||
let (conv_id, was_new) = quicprochat_sdk::groups::create_dm(
|
||||
rpc, conv_store, &mut member, &identity,
|
||||
&peer_key, &key_package, None, None,
|
||||
).await?;
|
||||
if was_new {
|
||||
println!("DM with {username} created (id: {})", hex::encode(conv_id.0));
|
||||
} else {
|
||||
println!("DM with {username} resumed (id: {})", hex::encode(conv_id.0));
|
||||
}
|
||||
}
|
||||
|
||||
Cmd::Send { ref to, ref msg } => {
|
||||
let _ = (to, msg);
|
||||
let _client = connect_client(&args).await?;
|
||||
// Full send requires MLS group state restoration — deferred to REPL.
|
||||
println!("(send is currently available in the REPL; one-shot send coming soon)");
|
||||
let (client, identity) = connect_with_identity(&args).await?;
|
||||
let conv_store = client.conversations().map_err(|e| anyhow::anyhow!("{e}"))?;
|
||||
let conv_id = quicprochat_sdk::conversation::ConversationId::from_group_name(to);
|
||||
let conv = conv_store
|
||||
.load_conversation(&conv_id)?
|
||||
.ok_or_else(|| anyhow::anyhow!("conversation '{to}' not found"))?;
|
||||
let mut member = quicprochat_sdk::groups::restore_mls_state(&conv, &identity)?;
|
||||
let my_pub = identity.public_key_bytes();
|
||||
let recipients: Vec<Vec<u8>> = conv
|
||||
.member_keys
|
||||
.iter()
|
||||
.filter(|k| k.as_slice() != my_pub.as_slice())
|
||||
.cloned()
|
||||
.collect();
|
||||
let rpc = client.rpc().map_err(|e| anyhow::anyhow!("{e}"))?;
|
||||
let hybrid_keys = vec![None; recipients.len()];
|
||||
quicprochat_sdk::messaging::send_message(
|
||||
rpc, &mut member, &identity, msg, &recipients, &hybrid_keys, conv_id.0.as_slice(),
|
||||
).await?;
|
||||
quicprochat_sdk::groups::save_mls_state(conv_store, &conv_id, &member)?;
|
||||
println!("sent to {to}");
|
||||
}
|
||||
|
||||
Cmd::Recv { ref from } => {
|
||||
let _ = from;
|
||||
let _client = connect_client(&args).await?;
|
||||
println!("(recv is currently available in the REPL; one-shot recv coming soon)");
|
||||
let (client, identity) = connect_with_identity(&args).await?;
|
||||
let conv_store = client.conversations().map_err(|e| anyhow::anyhow!("{e}"))?;
|
||||
let conv_id = quicprochat_sdk::conversation::ConversationId::from_group_name(from);
|
||||
let conv = conv_store
|
||||
.load_conversation(&conv_id)?
|
||||
.ok_or_else(|| anyhow::anyhow!("conversation '{from}' not found"))?;
|
||||
let mut member = quicprochat_sdk::groups::restore_mls_state(&conv, &identity)?;
|
||||
let rpc = client.rpc().map_err(|e| anyhow::anyhow!("{e}"))?;
|
||||
let my_key = identity.public_key_bytes();
|
||||
let messages = quicprochat_sdk::messaging::receive_messages(
|
||||
rpc, &mut member, my_key.as_slice(), None, conv_id.0.as_slice(), &[],
|
||||
).await?;
|
||||
quicprochat_sdk::groups::save_mls_state(conv_store, &conv_id, &member)?;
|
||||
if messages.is_empty() {
|
||||
println!("no new messages");
|
||||
} else {
|
||||
for msg in &messages {
|
||||
let sender_short = hex::encode(&msg.sender_key[..4]);
|
||||
let body = match &msg.message {
|
||||
quicprochat_core::AppMessage::Chat { body, .. } => {
|
||||
String::from_utf8_lossy(body).to_string()
|
||||
}
|
||||
other => format!("{other:?}"),
|
||||
};
|
||||
println!("[{sender_short}] {body}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Cmd::Group {
|
||||
action: GroupCmd::Create { ref name },
|
||||
} => {
|
||||
let _ = name;
|
||||
let _client = connect_client(&args).await?;
|
||||
println!("(group create is currently available in the REPL; one-shot coming soon)");
|
||||
let (_client, identity) = connect_with_identity(&args).await?;
|
||||
let conv_store = _client.conversations().map_err(|e| anyhow::anyhow!("{e}"))?;
|
||||
let mut member = quicprochat_core::GroupMember::new(identity.clone());
|
||||
let conv_id = quicprochat_sdk::groups::create_group(conv_store, &mut member, name)?;
|
||||
println!("group '{name}' created (id: {})", hex::encode(conv_id.0));
|
||||
}
|
||||
|
||||
Cmd::Group {
|
||||
@@ -483,9 +557,26 @@ async fn run(args: Args) -> anyhow::Result<()> {
|
||||
ref user,
|
||||
},
|
||||
} => {
|
||||
let _ = (group, user);
|
||||
let _client = connect_client(&args).await?;
|
||||
println!("(group invite is currently available in the REPL; one-shot coming soon)");
|
||||
let (client, identity) = connect_with_identity(&args).await?;
|
||||
let rpc = client.rpc().map_err(|e| anyhow::anyhow!("{e}"))?;
|
||||
let conv_store = client.conversations().map_err(|e| anyhow::anyhow!("{e}"))?;
|
||||
let conv_id = quicprochat_sdk::conversation::ConversationId::from_group_name(group);
|
||||
let conv = conv_store
|
||||
.load_conversation(&conv_id)?
|
||||
.ok_or_else(|| anyhow::anyhow!("group '{group}' not found"))?;
|
||||
let mut member = quicprochat_sdk::groups::restore_mls_state(&conv, &identity)?;
|
||||
// Resolve peer identity key and fetch their KeyPackage.
|
||||
let peer_key = quicprochat_sdk::users::resolve_user(rpc, user)
|
||||
.await?
|
||||
.ok_or_else(|| anyhow::anyhow!("user '{user}' not found"))?;
|
||||
let key_package = quicprochat_sdk::keys::fetch_key_package(rpc, &peer_key)
|
||||
.await?
|
||||
.ok_or_else(|| anyhow::anyhow!("no KeyPackage available for peer"))?;
|
||||
quicprochat_sdk::groups::invite_to_group(
|
||||
rpc, conv_store, &mut member, &identity, &conv_id,
|
||||
&peer_key, &key_package, None, None,
|
||||
).await?;
|
||||
println!("invited {user} to '{group}'");
|
||||
}
|
||||
|
||||
Cmd::Devices {
|
||||
|
||||
@@ -24,6 +24,21 @@ pub enum SdkError {
|
||||
#[error("storage error: {0}")]
|
||||
Storage(String),
|
||||
|
||||
#[error("session expired — re-login required")]
|
||||
SessionExpired,
|
||||
|
||||
#[error("{0}")]
|
||||
Other(#[from] anyhow::Error),
|
||||
}
|
||||
|
||||
impl SdkError {
|
||||
/// Returns `true` if the error indicates the session token has expired
|
||||
/// and the user needs to re-authenticate.
|
||||
pub fn is_auth_expired(&self) -> bool {
|
||||
matches!(self, SdkError::SessionExpired)
|
||||
|| matches!(self, SdkError::Rpc(quicprochat_rpc::error::RpcError::Server {
|
||||
status: quicprochat_rpc::error::RpcStatus::Unauthorized,
|
||||
..
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -82,6 +82,10 @@ pub enum ClientEvent {
|
||||
received_seq: u64,
|
||||
},
|
||||
|
||||
/// Session token expired — the user must re-authenticate.
|
||||
/// Emitted when an RPC returns Unauthorized after a previously valid session.
|
||||
AuthExpired,
|
||||
|
||||
/// A peer's identity key changed — possible re-registration, new device,
|
||||
/// or MITM attack. The UI MUST alert the user (like Signal's "safety number changed").
|
||||
IdentityKeyChanged {
|
||||
@@ -241,6 +245,7 @@ mod tests {
|
||||
expected_seq: 0,
|
||||
received_seq: 1,
|
||||
},
|
||||
ClientEvent::AuthExpired,
|
||||
ClientEvent::IdentityKeyChanged {
|
||||
username: "u".into(),
|
||||
old_fingerprint: "old".into(),
|
||||
@@ -261,6 +266,6 @@ mod tests {
|
||||
for event in &events {
|
||||
let _ = event.clone();
|
||||
}
|
||||
assert_eq!(events.len(), 20);
|
||||
assert_eq!(events.len(), 21);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -179,6 +179,15 @@ struct Args {
|
||||
/// Storage/database operation timeout in seconds (default: 10).
|
||||
#[arg(long, env = "QPQ_STORAGE_TIMEOUT", default_value_t = config::DEFAULT_STORAGE_TIMEOUT_SECS)]
|
||||
storage_timeout: u64,
|
||||
|
||||
/// Enable traffic analysis resistance (decoy traffic + timing jitter).
|
||||
/// Requires --features traffic-resistance.
|
||||
#[arg(long, env = "QPQ_TRAFFIC_RESISTANCE", default_value_t = false)]
|
||||
traffic_resistance: bool,
|
||||
|
||||
/// Mean interval in milliseconds between decoy messages (default: 5000).
|
||||
#[arg(long, env = "QPQ_DECOY_INTERVAL_MS", default_value_t = 5000)]
|
||||
decoy_interval_ms: u64,
|
||||
}
|
||||
|
||||
// ── In-flight RPC guard ──────────────────────────────────────────────────────
|
||||
@@ -646,6 +655,40 @@ async fn main() -> anyhow::Result<()> {
|
||||
"effective timeouts and listeners"
|
||||
);
|
||||
|
||||
// ── Traffic resistance (decoy traffic generator) ──────────────────────────
|
||||
#[cfg(feature = "traffic-resistance")]
|
||||
let _decoy_handle = {
|
||||
if args.traffic_resistance {
|
||||
let shutdown_notify = Arc::new(tokio::sync::Notify::new());
|
||||
let delivery_svc = Arc::new(domain::delivery::DeliveryService {
|
||||
store: Arc::clone(&store),
|
||||
waiters: Arc::clone(&waiters),
|
||||
});
|
||||
let config = domain::traffic_resistance::TrafficResistanceConfig {
|
||||
decoy_interval_ms: args.decoy_interval_ms,
|
||||
..Default::default()
|
||||
};
|
||||
tracing::info!(
|
||||
decoy_interval_ms = config.decoy_interval_ms,
|
||||
jitter_max_ms = config.jitter_max_ms,
|
||||
padding_boundary = config.padding_boundary,
|
||||
"traffic resistance enabled — decoy generator started"
|
||||
);
|
||||
// Start with an empty recipient list; decoys will be a no-op until
|
||||
// recipients are populated. A future enhancement can dynamically
|
||||
// update the list from connected sessions.
|
||||
Some(domain::traffic_resistance::spawn_decoy_generator(
|
||||
delivery_svc,
|
||||
Vec::new(),
|
||||
b"decoy-channel".to_vec(),
|
||||
config,
|
||||
shutdown_notify,
|
||||
))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
// In-flight RPC counter for graceful drain on shutdown.
|
||||
let in_flight: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
|
||||
|
||||
|
||||
Reference in New Issue
Block a user