Rename all crate directories, package names, binary names, proto package/module paths, ALPN strings, env var prefixes, config filenames, mDNS service names, and plugin ABI symbols from quicproquo/qpq to quicprochat/qpc.
979 lines
31 KiB
Rust
use std::net::SocketAddr;
|
|
use std::path::Path;
|
|
use std::sync::Arc;
|
|
|
|
use anyhow::Context;
|
|
use quinn::{ClientConfig, Endpoint};
|
|
use quinn_proto::crypto::rustls::QuicClientConfig;
|
|
use rustls::pki_types::{CertificateDer, ServerName, UnixTime};
|
|
use rustls::{ClientConfig as RustlsClientConfig, RootCertStore};
|
|
use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
|
|
use capnp_rpc::{rpc_twoparty_capnp::Side, twoparty, RpcSystem};
|
|
|
|
use quicprochat_core::HybridPublicKey;
|
|
use quicprochat_proto::node_capnp::{auth, node_service};
|
|
|
|
use crate::{AUTH_CONTEXT, INSECURE_SKIP_VERIFY};
|
|
|
|
use super::retry::{anyhow_is_retriable, retry_async, DEFAULT_BASE_DELAY_MS, DEFAULT_MAX_RETRIES};
|
|
|
|
/// Cap'n Proto traversal limit (words). 4 Mi words = 32 MiB; bounds DoS from deeply nested or large messages.
///
/// Applied to the RPC `ReaderOptions` in [`connect_node_opt`] for every inbound message.
const CAPNP_TRAVERSAL_LIMIT_WORDS: usize = 4 * 1024 * 1024;
|
|
|
|
/// A [`rustls::client::danger::ServerCertVerifier`] that accepts any certificate.
///
/// **Development only.** Using this in production disables all TLS guarantees.
/// Installed by [`connect_node_opt`] when `insecure_skip_verify` is `true`.
#[derive(Debug)]
struct InsecureServerCertVerifier;
|
|
|
|
impl rustls::client::danger::ServerCertVerifier for InsecureServerCertVerifier {
    // Accepts every server certificate unconditionally — no chain, name, or
    // expiry checks. Development/testing only.
    fn verify_server_cert(
        &self,
        _end_entity: &CertificateDer<'_>,
        _intermediates: &[CertificateDer<'_>],
        _server_name: &ServerName<'_>,
        _ocsp_response: &[u8],
        _now: UnixTime,
    ) -> Result<rustls::client::danger::ServerCertVerified, rustls::Error> {
        Ok(rustls::client::danger::ServerCertVerified::assertion())
    }

    // Handshake signatures are likewise waved through without verification.
    fn verify_tls12_signature(
        &self,
        _message: &[u8],
        _cert: &CertificateDer<'_>,
        _dss: &rustls::DigitallySignedStruct,
    ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
        Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
    }

    fn verify_tls13_signature(
        &self,
        _message: &[u8],
        _cert: &CertificateDer<'_>,
        _dss: &rustls::DigitallySignedStruct,
    ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
        Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
    }

    // Advertise the default ring provider's scheme list so the handshake
    // negotiates normally even though verification is skipped.
    fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
        rustls::crypto::ring::default_provider()
            .signature_verification_algorithms
            .supported_schemes()
    }
}
|
|
|
|
/// Establish a QUIC/TLS connection and return a `NodeService` client.
|
|
///
|
|
/// Must be called from within a `LocalSet` because capnp-rpc is `!Send`.
|
|
///
|
|
/// Reads [`INSECURE_SKIP_VERIFY`] to decide whether to bypass certificate
|
|
/// verification (set once at startup via [`crate::set_insecure_skip_verify`]).
|
|
pub async fn connect_node(
|
|
server: &str,
|
|
ca_cert: &Path,
|
|
server_name: &str,
|
|
) -> anyhow::Result<node_service::Client> {
|
|
let skip = INSECURE_SKIP_VERIFY.load(std::sync::atomic::Ordering::Relaxed);
|
|
connect_node_opt(server, ca_cert, server_name, skip).await
|
|
}
|
|
|
|
/// Like [`connect_node`] but with an explicit `insecure_skip_verify` toggle.
///
/// When `insecure_skip_verify` is `true`, certificate verification is disabled entirely.
/// This is intended for development and testing only.
pub async fn connect_node_opt(
    server: &str,
    ca_cert: &Path,
    server_name: &str,
    insecure_skip_verify: bool,
) -> anyhow::Result<node_service::Client> {
    let addr: SocketAddr = server
        .parse()
        .with_context(|| format!("server must be host:port, got {server}"))?;

    // 1) Build the rustls config: either trust-anything (dev) or pin the CA file.
    let mut tls = if insecure_skip_verify {
        RustlsClientConfig::builder()
            .dangerous()
            .with_custom_certificate_verifier(Arc::new(InsecureServerCertVerifier))
            .with_no_client_auth()
    } else {
        // NOTE(review): the file bytes are handed to `CertificateDer` as raw
        // DER — a PEM-encoded cert would not parse here. Confirm callers
        // always supply DER.
        let cert_bytes =
            std::fs::read(ca_cert).with_context(|| format!("read ca_cert {ca_cert:?}"))?;
        let mut roots = RootCertStore::empty();
        roots
            .add(CertificateDer::from(cert_bytes))
            .context("add root cert")?;
        RustlsClientConfig::builder()
            .with_root_certificates(roots)
            .with_no_client_auth()
    };
    // ALPN must match what the server advertises or the handshake fails.
    tls.alpn_protocols = vec![b"capnp".to_vec()];

    // 2) Wrap the TLS config for QUIC.
    let crypto = QuicClientConfig::try_from(tls)
        .map_err(|e| anyhow::anyhow!("invalid client TLS config: {e}"))?;

    let bind_addr: SocketAddr = "0.0.0.0:0".parse().context("parse client bind address")?;
    let mut endpoint = Endpoint::client(bind_addr)?;
    endpoint.set_default_client_config(ClientConfig::new(Arc::new(crypto)));

    let connection = endpoint
        .connect(addr, server_name)
        .context("quic connect init")?
        .await
        .context("quic connect failed")?;

    // 3) A single bidirectional stream carries the whole Cap'n Proto RPC session.
    let (send, recv) = connection.open_bi().await.context("open bi stream")?;

    // Bound inbound message traversal to limit resource-exhaustion attacks.
    let mut reader_opts = capnp::message::ReaderOptions::new();
    reader_opts.traversal_limit_in_words(Some(CAPNP_TRAVERSAL_LIMIT_WORDS));
    let network = twoparty::VatNetwork::new(
        recv.compat(),
        send.compat_write(),
        Side::Client,
        reader_opts,
    );

    // 4) Spawn the RPC driver on the current-thread LocalSet (capnp-rpc is !Send).
    let mut rpc_system = RpcSystem::new(Box::new(network), None);
    let client: node_service::Client = rpc_system.bootstrap(Side::Server);

    tokio::task::spawn_local(rpc_system);

    Ok(client)
}
|
|
|
|
pub fn set_auth(auth: &mut auth::Builder<'_>) -> anyhow::Result<()> {
|
|
let guard = AUTH_CONTEXT
|
|
.read()
|
|
.map_err(|e| anyhow::anyhow!("AUTH_CONTEXT lock poisoned: {e}"))?;
|
|
let ctx = guard.as_ref().ok_or_else(|| {
|
|
anyhow::anyhow!(
|
|
"init_auth must be called before RPCs (use a bearer or session token for authenticated commands)"
|
|
)
|
|
})?;
|
|
auth.set_version(ctx.version);
|
|
auth.set_access_token(&ctx.access_token);
|
|
auth.set_device_id(&ctx.device_id);
|
|
Ok(())
|
|
}
|
|
|
|
/// Upload a KeyPackage and verify the fingerprint echoed by the AS.
|
|
pub async fn upload_key_package(
|
|
client: &node_service::Client,
|
|
identity_key: &[u8],
|
|
package: &[u8],
|
|
) -> anyhow::Result<()> {
|
|
let mut req = client.upload_key_package_request();
|
|
{
|
|
let mut p = req.get();
|
|
p.set_identity_key(identity_key);
|
|
p.set_package(package);
|
|
let mut auth = p.reborrow().init_auth();
|
|
set_auth(&mut auth)?;
|
|
}
|
|
|
|
let resp = req
|
|
.send()
|
|
.promise
|
|
.await
|
|
.context("upload_key_package RPC failed")?;
|
|
|
|
let server_fp = resp
|
|
.get()
|
|
.context("upload_key_package: bad response")?
|
|
.get_fingerprint()
|
|
.context("upload_key_package: missing fingerprint")?
|
|
.to_vec();
|
|
|
|
let local_fp = super::state::sha256(package);
|
|
anyhow::ensure!(server_fp == local_fp, "fingerprint mismatch");
|
|
Ok(())
|
|
}
|
|
|
|
/// Fetch a KeyPackage for `identity_key` from the AS.
|
|
pub async fn fetch_key_package(
|
|
client: &node_service::Client,
|
|
identity_key: &[u8],
|
|
) -> anyhow::Result<Vec<u8>> {
|
|
let mut req = client.fetch_key_package_request();
|
|
{
|
|
let mut p = req.get();
|
|
p.set_identity_key(identity_key);
|
|
let mut auth = p.reborrow().init_auth();
|
|
set_auth(&mut auth)?;
|
|
}
|
|
|
|
let resp = req
|
|
.send()
|
|
.promise
|
|
.await
|
|
.context("fetch_key_package RPC failed")?;
|
|
|
|
let pkg = resp
|
|
.get()
|
|
.context("fetch_key_package: bad response")?
|
|
.get_package()
|
|
.context("fetch_key_package: missing package field")?
|
|
.to_vec();
|
|
|
|
Ok(pkg)
|
|
}
|
|
|
|
/// Enqueue an opaque payload to the DS for `recipient_key`.
|
|
/// Returns the per-inbox sequence number assigned by the server.
|
|
/// Retries on transient failures with exponential backoff.
|
|
pub async fn enqueue(
|
|
client: &node_service::Client,
|
|
recipient_key: &[u8],
|
|
payload: &[u8],
|
|
) -> anyhow::Result<u64> {
|
|
enqueue_with_ttl(client, recipient_key, payload, None).await
|
|
}
|
|
|
|
/// Enqueue with an optional TTL (seconds). 0 or None means no expiry.
///
/// Inputs are copied up front so each retry attempt owns its own buffers
/// (the closure passed to `retry_async` must be re-invocable).
///
/// NOTE(review): a retry after a server-side success whose response was lost
/// could enqueue the payload twice — confirm the server deduplicates or that
/// callers tolerate duplicate delivery.
pub async fn enqueue_with_ttl(
    client: &node_service::Client,
    recipient_key: &[u8],
    payload: &[u8],
    ttl_secs: Option<u32>,
) -> anyhow::Result<u64> {
    let client = client.clone();
    let recipient_key = recipient_key.to_vec();
    let payload = payload.to_vec();
    retry_async(
        || {
            // Fresh clones per attempt: the async block consumes its captures.
            let client = client.clone();
            let recipient_key = recipient_key.clone();
            let payload = payload.clone();
            async move {
                let mut req = client.enqueue_request();
                {
                    let mut p = req.get();
                    p.set_recipient_key(&recipient_key);
                    p.set_payload(&payload);
                    p.set_channel_id(&[]);
                    p.set_version(1);
                    if let Some(ttl) = ttl_secs {
                        p.set_ttl_secs(ttl);
                    }
                    let mut auth = p.reborrow().init_auth();
                    set_auth(&mut auth)?;
                }
                let resp = req.send().promise.await.context("enqueue RPC failed")?;
                let seq = resp.get().context("enqueue: bad response")?.get_seq();
                Ok(seq)
            }
        },
        DEFAULT_MAX_RETRIES,
        DEFAULT_BASE_DELAY_MS,
        anyhow_is_retriable,
    )
    .await
}
|
|
|
|
/// Fetch and drain all payloads for `recipient_key`.
/// Returns `(seq, payload)` pairs — sort by `seq` before MLS processing.
/// Retries on transient failures with exponential backoff.
pub async fn fetch_all(
    client: &node_service::Client,
    recipient_key: &[u8],
) -> anyhow::Result<Vec<(u64, Vec<u8>)>> {
    // Own the inputs so the retry closure can be invoked multiple times.
    let client = client.clone();
    let recipient_key = recipient_key.to_vec();
    retry_async(
        || {
            let client = client.clone();
            let recipient_key = recipient_key.clone();
            async move {
                let mut req = client.fetch_request();
                {
                    let mut p = req.get();
                    p.set_recipient_key(&recipient_key);
                    // Empty channelId: the default (non-channel-scoped) inbox.
                    p.set_channel_id(&[]);
                    p.set_version(1);
                    p.set_limit(0); // fetch all
                    let mut auth = p.reborrow().init_auth();
                    set_auth(&mut auth)?;
                }

                let resp = req.send().promise.await.context("fetch RPC failed")?;

                let list = resp
                    .get()
                    .context("fetch: bad response")?
                    .get_payloads()
                    .context("fetch: missing payloads")?;

                // Copy each (seq, data) envelope out of the capnp reader.
                let mut payloads = Vec::with_capacity(list.len() as usize);
                for i in 0..list.len() {
                    let entry = list.get(i);
                    let seq = entry.get_seq();
                    let data = entry
                        .get_data()
                        .context("fetch: envelope data read failed")?
                        .to_vec();
                    payloads.push((seq, data));
                }

                Ok(payloads)
            }
        },
        DEFAULT_MAX_RETRIES,
        DEFAULT_BASE_DELAY_MS,
        anyhow_is_retriable,
    )
    .await
}
|
|
|
|
/// Long-poll for payloads with optional timeout (ms).
/// Returns `(seq, payload)` pairs — sort by `seq` before MLS processing.
/// Retries on transient failures with exponential backoff.
///
/// Note: each retry attempt restarts the server-side wait, so worst-case
/// latency is roughly `timeout_ms` multiplied by the attempt count.
pub async fn fetch_wait(
    client: &node_service::Client,
    recipient_key: &[u8],
    timeout_ms: u64,
) -> anyhow::Result<Vec<(u64, Vec<u8>)>> {
    // Own the inputs so the retry closure can be invoked multiple times
    // (`timeout_ms` is Copy and is captured directly by the async block).
    let client = client.clone();
    let recipient_key = recipient_key.to_vec();
    retry_async(
        || {
            let client = client.clone();
            let recipient_key = recipient_key.clone();
            async move {
                let mut req = client.fetch_wait_request();
                {
                    let mut p = req.get();
                    p.set_recipient_key(&recipient_key);
                    p.set_timeout_ms(timeout_ms);
                    p.set_channel_id(&[]);
                    p.set_version(1);
                    p.set_limit(0); // fetch all
                    let mut auth = p.reborrow().init_auth();
                    set_auth(&mut auth)?;
                }

                let resp = req.send().promise.await.context("fetch_wait RPC failed")?;

                let list = resp
                    .get()
                    .context("fetch_wait: bad response")?
                    .get_payloads()
                    .context("fetch_wait: missing payloads")?;

                // Copy each (seq, data) envelope out of the capnp reader.
                let mut payloads = Vec::with_capacity(list.len() as usize);
                for i in 0..list.len() {
                    let entry = list.get(i);
                    let seq = entry.get_seq();
                    let data = entry
                        .get_data()
                        .context("fetch_wait: envelope data read failed")?
                        .to_vec();
                    payloads.push((seq, data));
                }

                Ok(payloads)
            }
        },
        DEFAULT_MAX_RETRIES,
        DEFAULT_BASE_DELAY_MS,
        anyhow_is_retriable,
    )
    .await
}
|
|
|
|
/// Upload a hybrid (X25519 + ML-KEM-768) public key for an identity.
|
|
pub async fn upload_hybrid_key(
|
|
client: &node_service::Client,
|
|
identity_key: &[u8],
|
|
hybrid_pk: &HybridPublicKey,
|
|
) -> anyhow::Result<()> {
|
|
let mut req = client.upload_hybrid_key_request();
|
|
{
|
|
let mut p = req.get();
|
|
p.set_identity_key(identity_key);
|
|
p.set_hybrid_public_key(&hybrid_pk.to_bytes());
|
|
let mut auth = p.reborrow().init_auth();
|
|
set_auth(&mut auth)?;
|
|
}
|
|
req.send()
|
|
.promise
|
|
.await
|
|
.context("upload_hybrid_key RPC failed")?;
|
|
Ok(())
|
|
}
|
|
|
|
/// Fetch a peer's hybrid public key from the server.
|
|
///
|
|
/// Returns `None` if the peer has not uploaded a hybrid key.
|
|
pub async fn fetch_hybrid_key(
|
|
client: &node_service::Client,
|
|
identity_key: &[u8],
|
|
) -> anyhow::Result<Option<HybridPublicKey>> {
|
|
let mut req = client.fetch_hybrid_key_request();
|
|
{
|
|
let mut p = req.get();
|
|
p.set_identity_key(identity_key);
|
|
let mut auth = p.reborrow().init_auth();
|
|
set_auth(&mut auth)?;
|
|
}
|
|
|
|
let resp = req
|
|
.send()
|
|
.promise
|
|
.await
|
|
.context("fetch_hybrid_key RPC failed")?;
|
|
|
|
let pk_bytes = resp
|
|
.get()
|
|
.context("fetch_hybrid_key: bad response")?
|
|
.get_hybrid_public_key()
|
|
.context("fetch_hybrid_key: missing field")?
|
|
.to_vec();
|
|
|
|
if pk_bytes.is_empty() {
|
|
return Ok(None);
|
|
}
|
|
|
|
let pk = HybridPublicKey::from_bytes(&pk_bytes).context("invalid hybrid public key")?;
|
|
Ok(Some(pk))
|
|
}
|
|
|
|
/// Decrypt a hybrid envelope. Requires a hybrid key; no fallback to plaintext MLS.
|
|
pub fn try_hybrid_decrypt(
|
|
hybrid_kp: Option<&quicprochat_core::HybridKeypair>,
|
|
payload: &[u8],
|
|
) -> anyhow::Result<Vec<u8>> {
|
|
let kp = hybrid_kp.ok_or_else(|| anyhow::anyhow!("hybrid key required for decryption"))?;
|
|
quicprochat_core::hybrid_decrypt(kp, payload, b"", b"").map_err(|e| anyhow::anyhow!("{e}"))
|
|
}
|
|
|
|
/// Peek at queued payloads without removing them.
|
|
/// Returns `(seq, payload)` pairs sorted by seq.
|
|
/// Retries on transient failures with exponential backoff.
|
|
pub async fn peek(
|
|
client: &node_service::Client,
|
|
recipient_key: &[u8],
|
|
) -> anyhow::Result<Vec<(u64, Vec<u8>)>> {
|
|
let client = client.clone();
|
|
let recipient_key = recipient_key.to_vec();
|
|
retry_async(
|
|
|| {
|
|
let client = client.clone();
|
|
let recipient_key = recipient_key.clone();
|
|
async move {
|
|
let mut req = client.peek_request();
|
|
{
|
|
let mut p = req.get();
|
|
p.set_recipient_key(&recipient_key);
|
|
p.set_channel_id(&[]);
|
|
p.set_version(1);
|
|
p.set_limit(0); // peek all
|
|
let mut auth = p.reborrow().init_auth();
|
|
set_auth(&mut auth)?;
|
|
}
|
|
|
|
let resp = req.send().promise.await.context("peek RPC failed")?;
|
|
|
|
let list = resp
|
|
.get()
|
|
.context("peek: bad response")?
|
|
.get_payloads()
|
|
.context("peek: missing payloads")?;
|
|
|
|
let mut payloads = Vec::with_capacity(list.len() as usize);
|
|
for i in 0..list.len() {
|
|
let entry = list.get(i);
|
|
let seq = entry.get_seq();
|
|
let data = entry
|
|
.get_data()
|
|
.context("peek: envelope data read failed")?
|
|
.to_vec();
|
|
payloads.push((seq, data));
|
|
}
|
|
|
|
Ok(payloads)
|
|
}
|
|
},
|
|
DEFAULT_MAX_RETRIES,
|
|
DEFAULT_BASE_DELAY_MS,
|
|
anyhow_is_retriable,
|
|
)
|
|
.await
|
|
}
|
|
|
|
/// Acknowledge all messages up to and including `seq_up_to`.
/// Retries on transient failures with exponential backoff.
///
/// NOTE(review): ack-up-to is presumably idempotent for a fixed `seq_up_to`,
/// which is what makes retrying safe — confirm the server's semantics.
pub async fn ack(
    client: &node_service::Client,
    recipient_key: &[u8],
    seq_up_to: u64,
) -> anyhow::Result<()> {
    // Own the inputs so the retry closure can be invoked multiple times
    // (`seq_up_to` is Copy and is captured directly by the async block).
    let client = client.clone();
    let recipient_key = recipient_key.to_vec();
    retry_async(
        || {
            let client = client.clone();
            let recipient_key = recipient_key.clone();
            async move {
                let mut req = client.ack_request();
                {
                    let mut p = req.get();
                    p.set_recipient_key(&recipient_key);
                    p.set_channel_id(&[]);
                    p.set_version(1);
                    p.set_seq_up_to(seq_up_to);
                    let mut auth = p.reborrow().init_auth();
                    set_auth(&mut auth)?;
                }
                req.send().promise.await.context("ack RPC failed")?;
                Ok(())
            }
        },
        DEFAULT_MAX_RETRIES,
        DEFAULT_BASE_DELAY_MS,
        anyhow_is_retriable,
    )
    .await
}
|
|
|
|
/// Fetch multiple peers' hybrid keys in a single round-trip.
/// Returns `None` for peers who have not uploaded a hybrid key.
/// Retries on transient failures with exponential backoff.
///
/// The result is positional: `result[i]` corresponds to `identity_keys[i]`
/// (an empty server entry is mapped to `None`).
pub async fn fetch_hybrid_keys(
    client: &node_service::Client,
    identity_keys: &[&[u8]],
) -> anyhow::Result<Vec<Option<HybridPublicKey>>> {
    // Own the inputs so the retry closure can be invoked multiple times.
    let client = client.clone();
    let identity_keys: Vec<Vec<u8>> = identity_keys.iter().map(|k| k.to_vec()).collect();
    retry_async(
        || {
            let client = client.clone();
            let identity_keys = identity_keys.clone();
            async move {
                let mut req = client.fetch_hybrid_keys_request();
                {
                    let mut p = req.get();
                    let mut list = p.reborrow().init_identity_keys(identity_keys.len() as u32);
                    for (i, ik) in identity_keys.iter().enumerate() {
                        list.set(i as u32, ik);
                    }
                    let mut auth = p.reborrow().init_auth();
                    set_auth(&mut auth)?;
                }

                let resp = req
                    .send()
                    .promise
                    .await
                    .context("fetch_hybrid_keys RPC failed")?;

                let keys = resp
                    .get()
                    .context("fetch_hybrid_keys: bad response")?
                    .get_keys()
                    .context("fetch_hybrid_keys: missing keys")?;

                // Empty bytes = no key uploaded; anything else must parse.
                let mut result = Vec::with_capacity(keys.len() as usize);
                for i in 0..keys.len() {
                    let pk_bytes = keys
                        .get(i)
                        .context("fetch_hybrid_keys: key read failed")?
                        .to_vec();
                    if pk_bytes.is_empty() {
                        result.push(None);
                    } else {
                        let pk = HybridPublicKey::from_bytes(&pk_bytes)
                            .context("invalid hybrid public key")?;
                        result.push(Some(pk));
                    }
                }

                Ok(result)
            }
        },
        DEFAULT_MAX_RETRIES,
        DEFAULT_BASE_DELAY_MS,
        anyhow_is_retriable,
    )
    .await
}
|
|
|
|
/// Enqueue the same payload to multiple recipients in a single round-trip.
/// Returns per-recipient sequence numbers.
/// Retries on transient failures with exponential backoff.
///
/// NOTE(review): as with [`enqueue_with_ttl`], a retry after a lost success
/// response could deliver the payload twice — confirm server-side handling.
pub async fn batch_enqueue(
    client: &node_service::Client,
    recipient_keys: &[&[u8]],
    payload: &[u8],
) -> anyhow::Result<Vec<u64>> {
    // Own the inputs so the retry closure can be invoked multiple times.
    let client = client.clone();
    let recipient_keys: Vec<Vec<u8>> = recipient_keys.iter().map(|k| k.to_vec()).collect();
    let payload = payload.to_vec();
    retry_async(
        || {
            let client = client.clone();
            let recipient_keys = recipient_keys.clone();
            let payload = payload.clone();
            async move {
                let mut req = client.batch_enqueue_request();
                {
                    let mut p = req.get();
                    let mut list = p.reborrow().init_recipient_keys(recipient_keys.len() as u32);
                    for (i, rk) in recipient_keys.iter().enumerate() {
                        list.set(i as u32, rk);
                    }
                    p.set_payload(&payload);
                    p.set_channel_id(&[]);
                    p.set_version(1);
                    let mut auth = p.reborrow().init_auth();
                    set_auth(&mut auth)?;
                }

                let resp = req
                    .send()
                    .promise
                    .await
                    .context("batch_enqueue RPC failed")?;

                let seqs = resp
                    .get()
                    .context("batch_enqueue: bad response")?
                    .get_seqs()
                    .context("batch_enqueue: missing seqs")?;

                // Copy the primitive list out of the capnp reader.
                let mut result = Vec::with_capacity(seqs.len() as usize);
                for i in 0..seqs.len() {
                    result.push(seqs.get(i));
                }

                Ok(result)
            }
        },
        DEFAULT_MAX_RETRIES,
        DEFAULT_BASE_DELAY_MS,
        anyhow_is_retriable,
    )
    .await
}
|
|
|
|
/// Resolve a username to its Ed25519 identity key (32 bytes).
///
/// When the server returns a non-empty `inclusionProof`, the client verifies it
/// against the identity key using the Key Transparency Merkle proof. Proof
/// verification failure is treated as a hard error (the server is misbehaving).
/// If the server sends no proof (empty field), the key is returned as-is —
/// callers can decide whether to require proofs for security-critical flows.
///
/// Returns `None` if the username is not registered.
pub async fn resolve_user(
    client: &node_service::Client,
    username: &str,
) -> anyhow::Result<Option<Vec<u8>>> {
    let mut req = client.resolve_user_request();
    {
        let mut p = req.get();
        p.set_username(username);
        let mut auth = p.reborrow().init_auth();
        set_auth(&mut auth)?;
    }

    let resp = req
        .send()
        .promise
        .await
        .context("resolve_user RPC failed")?;

    let reader = resp.get().context("resolve_user: bad response")?;

    let key = reader
        .get_identity_key()
        .context("resolve_user: missing identity_key field")?
        .to_vec();

    // Empty identityKey is the server's "username not registered" sentinel.
    if key.is_empty() {
        return Ok(None);
    }

    // Verify the KT inclusion proof when the server sends one.
    let proof_bytes = reader
        .get_inclusion_proof()
        .context("resolve_user: missing inclusion_proof field")?
        .to_vec();

    if !proof_bytes.is_empty() {
        let proof = quicprochat_kt::InclusionProof::from_bytes(&proof_bytes)
            .context("resolve_user: inclusion proof deserialise failed")?;
        quicprochat_kt::verify_inclusion(&proof, username, &key)
            .context("resolve_user: KT inclusion proof verification FAILED — possible key mislabelling")?;
    }

    Ok(Some(key))
}
|
|
|
|
/// Reverse lookup: resolve an identity key to the registered username.
|
|
/// Returns `None` if no username is associated with the key.
|
|
pub async fn resolve_identity(
|
|
client: &node_service::Client,
|
|
identity_key: &[u8],
|
|
) -> anyhow::Result<Option<String>> {
|
|
let mut req = client.resolve_identity_request();
|
|
{
|
|
let mut p = req.get();
|
|
p.set_identity_key(identity_key);
|
|
let mut auth = p.reborrow().init_auth();
|
|
set_auth(&mut auth)?;
|
|
}
|
|
|
|
let resp = req
|
|
.send()
|
|
.promise
|
|
.await
|
|
.context("resolve_identity RPC failed")?;
|
|
|
|
let username = resp
|
|
.get()
|
|
.context("resolve_identity: bad response")?
|
|
.get_username()
|
|
.context("resolve_identity: missing field")?
|
|
.to_str()
|
|
.unwrap_or("")
|
|
.to_string();
|
|
|
|
if username.is_empty() {
|
|
Ok(None)
|
|
} else {
|
|
Ok(Some(username))
|
|
}
|
|
}
|
|
|
|
/// Create a 1:1 DM channel with a peer.
|
|
///
|
|
/// Returns `(channel_id, was_new)` where `channel_id` is the stable 16-byte identifier and
|
|
/// `was_new` is `true` iff this call created the channel for the first time. When `was_new` is
|
|
/// `false`, the channel already existed (created by the peer), and the caller should wait for
|
|
/// the peer's MLS Welcome to arrive via the background poller rather than creating a new MLS group.
|
|
pub async fn create_channel(
|
|
client: &node_service::Client,
|
|
peer_key: &[u8],
|
|
) -> anyhow::Result<(Vec<u8>, bool)> {
|
|
let mut req = client.create_channel_request();
|
|
{
|
|
let mut p = req.get();
|
|
p.set_peer_key(peer_key);
|
|
let mut auth = p.reborrow().init_auth();
|
|
set_auth(&mut auth)?;
|
|
}
|
|
|
|
let resp = req
|
|
.send()
|
|
.promise
|
|
.await
|
|
.context("create_channel RPC failed")?;
|
|
|
|
let reader = resp.get().context("create_channel: bad response")?;
|
|
let channel_id = reader
|
|
.get_channel_id()
|
|
.context("create_channel: missing channel_id")?
|
|
.to_vec();
|
|
let was_new = reader.get_was_new();
|
|
|
|
Ok((channel_id, was_new))
|
|
}
|
|
|
|
/// Upload a single chunk of a blob to the server.
|
|
///
|
|
/// `blob_hash` is the expected SHA-256 hash (32 bytes) of the complete blob.
|
|
/// Returns the `blob_id` once the server has received and verified the final chunk.
|
|
pub async fn upload_blob_chunk(
|
|
client: &node_service::Client,
|
|
blob_hash: &[u8],
|
|
chunk: &[u8],
|
|
offset: u64,
|
|
total_size: u64,
|
|
mime_type: &str,
|
|
) -> anyhow::Result<Vec<u8>> {
|
|
let mut req = client.upload_blob_request();
|
|
{
|
|
let mut p = req.get();
|
|
let mut auth = p.reborrow().init_auth();
|
|
set_auth(&mut auth)?;
|
|
p.set_blob_hash(blob_hash);
|
|
p.set_chunk(chunk);
|
|
p.set_offset(offset);
|
|
p.set_total_size(total_size);
|
|
p.set_mime_type(mime_type);
|
|
}
|
|
let resp = req.send().promise.await.context("upload_blob RPC failed")?;
|
|
let blob_id = resp
|
|
.get()
|
|
.context("upload_blob: bad response")?
|
|
.get_blob_id()
|
|
.context("upload_blob: missing blob_id")?
|
|
.to_vec();
|
|
Ok(blob_id)
|
|
}
|
|
|
|
/// Download a single chunk of a blob from the server.
|
|
///
|
|
/// Returns `(chunk_bytes, total_size, mime_type)`.
|
|
pub async fn download_blob_chunk(
|
|
client: &node_service::Client,
|
|
blob_id: &[u8],
|
|
offset: u64,
|
|
length: u32,
|
|
) -> anyhow::Result<(Vec<u8>, u64, String)> {
|
|
let mut req = client.download_blob_request();
|
|
{
|
|
let mut p = req.get();
|
|
let mut auth = p.reborrow().init_auth();
|
|
set_auth(&mut auth)?;
|
|
p.set_blob_id(blob_id);
|
|
p.set_offset(offset);
|
|
p.set_length(length);
|
|
}
|
|
let resp = req.send().promise.await.context("download_blob RPC failed")?;
|
|
let reader = resp.get().context("download_blob: bad response")?;
|
|
let chunk = reader.get_chunk().context("download_blob: missing chunk")?.to_vec();
|
|
let total_size = reader.get_total_size();
|
|
let mime_type = reader
|
|
.get_mime_type()
|
|
.context("download_blob: missing mime_type")?
|
|
.to_str()
|
|
.unwrap_or("application/octet-stream")
|
|
.to_string();
|
|
Ok((chunk, total_size, mime_type))
|
|
}
|
|
|
|
/// Delete the authenticated user's account on the server.
|
|
/// Requires an identity-bound session (OPAQUE login).
|
|
pub async fn delete_account(
|
|
client: &node_service::Client,
|
|
) -> anyhow::Result<bool> {
|
|
let mut req = client.delete_account_request();
|
|
{
|
|
let mut p = req.get();
|
|
let mut auth = p.reborrow().init_auth();
|
|
set_auth(&mut auth)?;
|
|
}
|
|
|
|
let resp = req
|
|
.send()
|
|
.promise
|
|
.await
|
|
.context("delete_account RPC failed")?;
|
|
|
|
let success = resp
|
|
.get()
|
|
.context("delete_account: bad response")?
|
|
.get_success();
|
|
|
|
Ok(success)
|
|
}
|
|
|
|
/// Register a device for the authenticated identity.
|
|
pub async fn register_device(
|
|
client: &node_service::Client,
|
|
device_id: &[u8],
|
|
device_name: &str,
|
|
) -> anyhow::Result<bool> {
|
|
let mut req = client.register_device_request();
|
|
{
|
|
let mut p = req.get();
|
|
p.set_device_id(device_id);
|
|
p.set_device_name(device_name);
|
|
let mut auth = p.reborrow().init_auth();
|
|
set_auth(&mut auth)?;
|
|
}
|
|
|
|
let resp = req
|
|
.send()
|
|
.promise
|
|
.await
|
|
.context("register_device RPC failed")?;
|
|
|
|
let success = resp
|
|
.get()
|
|
.context("register_device: bad response")?
|
|
.get_success();
|
|
|
|
Ok(success)
|
|
}
|
|
|
|
/// List all registered devices for the authenticated identity.
|
|
pub async fn list_devices(
|
|
client: &node_service::Client,
|
|
) -> anyhow::Result<Vec<(Vec<u8>, String, u64)>> {
|
|
let mut req = client.list_devices_request();
|
|
{
|
|
let mut p = req.get();
|
|
let mut auth = p.reborrow().init_auth();
|
|
set_auth(&mut auth)?;
|
|
}
|
|
|
|
let resp = req
|
|
.send()
|
|
.promise
|
|
.await
|
|
.context("list_devices RPC failed")?;
|
|
|
|
let devices = resp
|
|
.get()
|
|
.context("list_devices: bad response")?
|
|
.get_devices()
|
|
.context("list_devices: missing devices field")?;
|
|
|
|
let mut result = Vec::with_capacity(devices.len() as usize);
|
|
for i in 0..devices.len() {
|
|
let entry = devices.get(i);
|
|
let device_id = entry
|
|
.get_device_id()
|
|
.context("list_devices: missing device_id")?
|
|
.to_vec();
|
|
let device_name = entry
|
|
.get_device_name()
|
|
.context("list_devices: missing device_name")?
|
|
.to_str()
|
|
.unwrap_or("")
|
|
.to_string();
|
|
let registered_at = entry.get_registered_at();
|
|
result.push((device_id, device_name, registered_at));
|
|
}
|
|
|
|
Ok(result)
|
|
}
|
|
|
|
/// Revoke (remove) a registered device.
|
|
pub async fn revoke_device(
|
|
client: &node_service::Client,
|
|
device_id: &[u8],
|
|
) -> anyhow::Result<bool> {
|
|
let mut req = client.revoke_device_request();
|
|
{
|
|
let mut p = req.get();
|
|
p.set_device_id(device_id);
|
|
let mut auth = p.reborrow().init_auth();
|
|
set_auth(&mut auth)?;
|
|
}
|
|
|
|
let resp = req
|
|
.send()
|
|
.promise
|
|
.await
|
|
.context("revoke_device RPC failed")?;
|
|
|
|
let success = resp
|
|
.get()
|
|
.context("revoke_device: bad response")?
|
|
.get_success();
|
|
|
|
Ok(success)
|
|
}
|
|
|
|
/// Return the current Unix timestamp in milliseconds.
///
/// Clamps to 0 if the system clock reports a time before the Unix epoch.
pub fn current_timestamp_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    since_epoch.as_millis() as u64
}
|