chore: rename project quicnprotochat -> quicproquo (binaries: qpq)
Rename the entire workspace:
- Crate packages: quicnprotochat-{core,proto,server,client,gui,p2p,mobile} -> quicproquo-*
- Binary names: quicnprotochat -> qpq, quicnprotochat-server -> qpq-server,
quicnprotochat-gui -> qpq-gui
- Default files: *-state.bin -> qpq-state.bin, *-server.toml -> qpq-server.toml,
*.db -> qpq.db
- Environment variable prefix: QUICNPROTOCHAT_* -> QPQ_*
- App identifier: chat.quicnproto.gui -> chat.quicproquo.gui
- Proto package: quicnprotochat.bench -> quicproquo.bench
- All documentation, Docker, CI, and script references updated
HKDF domain-separation strings and P2P ALPN remain unchanged for
backward compatibility with existing encrypted state and wire protocol.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
674
crates/quicproquo-server/src/node_service/delivery.rs
Normal file
674
crates/quicproquo-server/src/node_service/delivery.rs
Normal file
@@ -0,0 +1,674 @@
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use capnp::capability::Promise;
|
||||
use dashmap::DashMap;
|
||||
use quicproquo_proto::node_capnp::node_service;
|
||||
use tokio::sync::Notify;
|
||||
use tokio::time::timeout;
|
||||
|
||||
use crate::auth::{
|
||||
check_rate_limit, coded_error, fmt_hex, require_identity_or_request, validate_auth_context,
|
||||
};
|
||||
use crate::error_codes::*;
|
||||
use crate::metrics;
|
||||
use crate::storage::{StorageError, Store};
|
||||
|
||||
use super::{NodeServiceImpl, CURRENT_WIRE_VERSION};
|
||||
|
||||
// Audit events here must not include secrets: no payload content, no full recipient/token bytes (prefix only).
|
||||
|
||||
const MAX_PAYLOAD_BYTES: usize = 5 * 1024 * 1024; // 5 MB cap per message
|
||||
const MAX_QUEUE_DEPTH: usize = 1000;
|
||||
|
||||
fn storage_err(err: StorageError) -> capnp::Error {
|
||||
coded_error(E009_STORAGE_ERROR, err)
|
||||
}
|
||||
|
||||
pub fn fill_payloads_wait(
|
||||
results: &mut node_service::FetchWaitResults,
|
||||
messages: Vec<(u64, Vec<u8>)>,
|
||||
) {
|
||||
let mut list = results.get().init_payloads(messages.len() as u32);
|
||||
for (i, (seq, data)) in messages.iter().enumerate() {
|
||||
let mut entry = list.reborrow().get(i as u32);
|
||||
entry.set_seq(*seq);
|
||||
entry.set_data(data);
|
||||
}
|
||||
}
|
||||
|
||||
impl NodeServiceImpl {
|
||||
pub fn handle_enqueue(
|
||||
&mut self,
|
||||
params: node_service::EnqueueParams,
|
||||
mut results: node_service::EnqueueResults,
|
||||
) -> Promise<(), capnp::Error> {
|
||||
let p = match params.get() {
|
||||
Ok(p) => p,
|
||||
Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)),
|
||||
};
|
||||
let recipient_key = match p.get_recipient_key() {
|
||||
Ok(v) => v.to_vec(),
|
||||
Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)),
|
||||
};
|
||||
let payload = match p.get_payload() {
|
||||
Ok(v) => v.to_vec(),
|
||||
Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)),
|
||||
};
|
||||
let channel_id = p.get_channel_id().unwrap_or_default().to_vec();
|
||||
let version = p.get_version();
|
||||
let auth_ctx = match validate_auth_context(&self.auth_cfg, &self.sessions, p.get_auth()) {
|
||||
Ok(ctx) => ctx,
|
||||
Err(e) => return Promise::err(e),
|
||||
};
|
||||
|
||||
if recipient_key.len() != 32 {
|
||||
return Promise::err(coded_error(
|
||||
E004_IDENTITY_KEY_LENGTH,
|
||||
format!("recipientKey must be exactly 32 bytes, got {}", recipient_key.len()),
|
||||
));
|
||||
}
|
||||
if payload.is_empty() {
|
||||
return Promise::err(coded_error(E005_PAYLOAD_EMPTY, "payload must not be empty"));
|
||||
}
|
||||
if payload.len() > MAX_PAYLOAD_BYTES {
|
||||
return Promise::err(coded_error(
|
||||
E006_PAYLOAD_TOO_LARGE,
|
||||
format!("payload exceeds max size ({} bytes)", MAX_PAYLOAD_BYTES),
|
||||
));
|
||||
}
|
||||
if version > CURRENT_WIRE_VERSION {
|
||||
return Promise::err(coded_error(
|
||||
E012_WIRE_VERSION,
|
||||
format!("wire version {} not supported (max {CURRENT_WIRE_VERSION})", version),
|
||||
));
|
||||
}
|
||||
|
||||
if let Err(e) = check_rate_limit(&self.rate_limits, &auth_ctx.token) {
|
||||
// Audit: rate limit hit — do not log token or identity.
|
||||
tracing::warn!("rate_limit_hit");
|
||||
metrics::record_rate_limit_hit_total();
|
||||
return Promise::err(e);
|
||||
}
|
||||
|
||||
// When sealed_sender is true, enqueue does not require identity; valid token only.
|
||||
// Otherwise, the sender must have an identity-bound session (but their identity
|
||||
// does NOT need to match the recipient — they're sending TO the recipient).
|
||||
if !self.sealed_sender {
|
||||
if let Err(e) = crate::auth::require_identity(&auth_ctx) {
|
||||
return Promise::err(e);
|
||||
}
|
||||
}
|
||||
|
||||
// DM channel authz: channel_id.len() == 16 means a created channel; caller and recipient must be the two members.
|
||||
if channel_id.len() == 16 {
|
||||
let members = match self.store.get_channel_members(&channel_id) {
|
||||
Ok(Some(m)) => m,
|
||||
Ok(None) => {
|
||||
return Promise::err(coded_error(E023_CHANNEL_NOT_FOUND, "channel not found"));
|
||||
}
|
||||
Err(e) => return Promise::err(storage_err(e)),
|
||||
};
|
||||
let caller = match crate::auth::require_identity(&auth_ctx) {
|
||||
Ok(id) => id,
|
||||
Err(e) => return Promise::err(e),
|
||||
};
|
||||
let (a, b) = &members;
|
||||
let caller_in = caller == a.as_slice() || caller == b.as_slice();
|
||||
let recipient_other = (recipient_key == *a && caller == b.as_slice())
|
||||
|| (recipient_key == *b && caller == a.as_slice());
|
||||
if !caller_in || !recipient_other {
|
||||
return Promise::err(coded_error(
|
||||
E022_CHANNEL_ACCESS_DENIED,
|
||||
"caller or recipient not a member of this channel",
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
match self.store.queue_depth(&recipient_key, &channel_id) {
|
||||
Ok(depth) if depth >= MAX_QUEUE_DEPTH => {
|
||||
return Promise::err(coded_error(
|
||||
E015_QUEUE_FULL,
|
||||
format!("queue depth {} exceeds limit {}", depth, MAX_QUEUE_DEPTH),
|
||||
));
|
||||
}
|
||||
Err(e) => return Promise::err(storage_err(e)),
|
||||
_ => {}
|
||||
}
|
||||
|
||||
let payload_len = payload.len();
|
||||
let seq = match self
|
||||
.store
|
||||
.enqueue(&recipient_key, &channel_id, payload)
|
||||
.map_err(storage_err)
|
||||
{
|
||||
Ok(seq) => seq,
|
||||
Err(e) => return Promise::err(e),
|
||||
};
|
||||
|
||||
results.get().set_seq(seq);
|
||||
|
||||
// Metrics and audit. Audit events must not include secrets (no payload, no full keys).
|
||||
metrics::record_enqueue_total();
|
||||
metrics::record_enqueue_bytes(payload_len as u64);
|
||||
if let Ok(depth) = self.store.queue_depth(&recipient_key, &channel_id) {
|
||||
metrics::record_delivery_queue_depth(depth);
|
||||
}
|
||||
tracing::info!(
|
||||
recipient_prefix = %fmt_hex(&recipient_key[..4]),
|
||||
payload_len = payload_len,
|
||||
seq = seq,
|
||||
"audit: enqueue"
|
||||
);
|
||||
|
||||
crate::auth::waiter(&self.waiters, &recipient_key).notify_waiters();
|
||||
|
||||
Promise::ok(())
|
||||
}
|
||||
|
||||
pub fn handle_fetch(
|
||||
&mut self,
|
||||
params: node_service::FetchParams,
|
||||
mut results: node_service::FetchResults,
|
||||
) -> Promise<(), capnp::Error> {
|
||||
let recipient_key = match params.get() {
|
||||
Ok(p) => match p.get_recipient_key() {
|
||||
Ok(v) => v.to_vec(),
|
||||
Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)),
|
||||
},
|
||||
Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)),
|
||||
};
|
||||
let channel_id = params
|
||||
.get()
|
||||
.ok()
|
||||
.and_then(|p| p.get_channel_id().ok())
|
||||
.map(|c| c.to_vec())
|
||||
.unwrap_or_default();
|
||||
let version = params
|
||||
.get()
|
||||
.ok()
|
||||
.map(|p| p.get_version())
|
||||
.unwrap_or(CURRENT_WIRE_VERSION);
|
||||
let limit = params.get().ok().map(|p| p.get_limit()).unwrap_or(0);
|
||||
let auth_ctx = match params
|
||||
.get()
|
||||
.ok()
|
||||
.map(|p| validate_auth_context(&self.auth_cfg, &self.sessions, p.get_auth()))
|
||||
.transpose()
|
||||
{
|
||||
Ok(ctx) => ctx,
|
||||
Err(e) => return Promise::err(e),
|
||||
};
|
||||
|
||||
if recipient_key.len() != 32 {
|
||||
return Promise::err(coded_error(
|
||||
E004_IDENTITY_KEY_LENGTH,
|
||||
format!("recipientKey must be exactly 32 bytes, got {}", recipient_key.len()),
|
||||
));
|
||||
}
|
||||
if version > CURRENT_WIRE_VERSION {
|
||||
return Promise::err(coded_error(
|
||||
E012_WIRE_VERSION,
|
||||
format!("wire version {} not supported (max {CURRENT_WIRE_VERSION})", version),
|
||||
));
|
||||
}
|
||||
|
||||
let auth_ctx = match auth_ctx {
|
||||
Some(ctx) => ctx,
|
||||
None => return Promise::err(coded_error(E003_INVALID_TOKEN, "auth required")),
|
||||
};
|
||||
|
||||
if let Err(e) = require_identity_or_request(
|
||||
&auth_ctx,
|
||||
&recipient_key,
|
||||
self.auth_cfg.allow_insecure_identity_from_request,
|
||||
) {
|
||||
return Promise::err(e);
|
||||
}
|
||||
|
||||
if channel_id.len() == 16 {
|
||||
let members = match self.store.get_channel_members(&channel_id) {
|
||||
Ok(Some(m)) => m,
|
||||
Ok(None) => {
|
||||
return Promise::err(coded_error(E023_CHANNEL_NOT_FOUND, "channel not found"));
|
||||
}
|
||||
Err(e) => return Promise::err(storage_err(e)),
|
||||
};
|
||||
let caller = match crate::auth::require_identity(&auth_ctx) {
|
||||
Ok(id) => id,
|
||||
Err(e) => return Promise::err(e),
|
||||
};
|
||||
let (a, b) = &members;
|
||||
let caller_in = caller == a.as_slice() || caller == b.as_slice();
|
||||
let recipient_other = (recipient_key.as_slice() == a.as_slice() && caller == b.as_slice())
|
||||
|| (recipient_key.as_slice() == b.as_slice() && caller == a.as_slice());
|
||||
if !caller_in || !recipient_other {
|
||||
return Promise::err(coded_error(
|
||||
E022_CHANNEL_ACCESS_DENIED,
|
||||
"caller or recipient not a member of this channel",
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
let messages = if limit > 0 {
|
||||
match self
|
||||
.store
|
||||
.fetch_limited(&recipient_key, &channel_id, limit as usize)
|
||||
.map_err(storage_err)
|
||||
{
|
||||
Ok(m) => m,
|
||||
Err(e) => return Promise::err(e),
|
||||
}
|
||||
} else {
|
||||
match self
|
||||
.store
|
||||
.fetch(&recipient_key, &channel_id)
|
||||
.map_err(storage_err)
|
||||
{
|
||||
Ok(m) => m,
|
||||
Err(e) => return Promise::err(e),
|
||||
}
|
||||
};
|
||||
|
||||
// Audit: fetch — do not log payload or full keys.
|
||||
metrics::record_fetch_total();
|
||||
tracing::info!(
|
||||
recipient_prefix = %fmt_hex(&recipient_key[..4]),
|
||||
count = messages.len(),
|
||||
"audit: fetch"
|
||||
);
|
||||
|
||||
let mut list = results.get().init_payloads(messages.len() as u32);
|
||||
for (i, (seq, data)) in messages.iter().enumerate() {
|
||||
let mut entry = list.reborrow().get(i as u32);
|
||||
entry.set_seq(*seq);
|
||||
entry.set_data(data);
|
||||
}
|
||||
|
||||
Promise::ok(())
|
||||
}
|
||||
|
||||
pub fn handle_fetch_wait(
|
||||
&mut self,
|
||||
params: node_service::FetchWaitParams,
|
||||
mut results: node_service::FetchWaitResults,
|
||||
) -> Promise<(), capnp::Error> {
|
||||
let p = match params.get() {
|
||||
Ok(p) => p,
|
||||
Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)),
|
||||
};
|
||||
let recipient_key = match p.get_recipient_key() {
|
||||
Ok(v) => v.to_vec(),
|
||||
Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)),
|
||||
};
|
||||
let channel_id = p.get_channel_id().unwrap_or_default().to_vec();
|
||||
let version = p.get_version();
|
||||
let timeout_ms = p.get_timeout_ms();
|
||||
let limit = p.get_limit();
|
||||
let auth_ctx = match validate_auth_context(&self.auth_cfg, &self.sessions, p.get_auth()) {
|
||||
Ok(ctx) => ctx,
|
||||
Err(e) => return Promise::err(e),
|
||||
};
|
||||
|
||||
if recipient_key.len() != 32 {
|
||||
return Promise::err(coded_error(
|
||||
E004_IDENTITY_KEY_LENGTH,
|
||||
format!("recipientKey must be exactly 32 bytes, got {}", recipient_key.len()),
|
||||
));
|
||||
}
|
||||
if version > CURRENT_WIRE_VERSION {
|
||||
return Promise::err(coded_error(
|
||||
E012_WIRE_VERSION,
|
||||
format!("wire version {} not supported (max {CURRENT_WIRE_VERSION})", version),
|
||||
));
|
||||
}
|
||||
|
||||
if let Err(e) = require_identity_or_request(
|
||||
&auth_ctx,
|
||||
&recipient_key,
|
||||
self.auth_cfg.allow_insecure_identity_from_request,
|
||||
) {
|
||||
return Promise::err(e);
|
||||
}
|
||||
|
||||
if channel_id.len() == 16 {
|
||||
let members = match self.store.get_channel_members(&channel_id) {
|
||||
Ok(Some(m)) => m,
|
||||
Ok(None) => {
|
||||
return Promise::err(coded_error(E023_CHANNEL_NOT_FOUND, "channel not found"));
|
||||
}
|
||||
Err(e) => return Promise::err(storage_err(e)),
|
||||
};
|
||||
let caller = match crate::auth::require_identity(&auth_ctx) {
|
||||
Ok(id) => id,
|
||||
Err(e) => return Promise::err(e),
|
||||
};
|
||||
let (a, b) = &members;
|
||||
let caller_in = caller == a.as_slice() || caller == b.as_slice();
|
||||
let recipient_other = (recipient_key.as_slice() == a.as_slice() && caller == b.as_slice())
|
||||
|| (recipient_key.as_slice() == b.as_slice() && caller == a.as_slice());
|
||||
if !caller_in || !recipient_other {
|
||||
return Promise::err(coded_error(
|
||||
E022_CHANNEL_ACCESS_DENIED,
|
||||
"caller or recipient not a member of this channel",
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
let store = Arc::clone(&self.store);
|
||||
let waiters: Arc<DashMap<Vec<u8>, Arc<Notify>>> = self.waiters.clone();
|
||||
|
||||
Promise::from_future(async move {
|
||||
let fetch_fn = |s: &Arc<dyn Store>, rk: &[u8], ch: &[u8], lim: u32| -> Result<Vec<(u64, Vec<u8>)>, capnp::Error> {
|
||||
if lim > 0 {
|
||||
s.fetch_limited(rk, ch, lim as usize).map_err(storage_err)
|
||||
} else {
|
||||
s.fetch(rk, ch).map_err(storage_err)
|
||||
}
|
||||
};
|
||||
|
||||
let messages = fetch_fn(&store, &recipient_key, &channel_id, limit)?;
|
||||
|
||||
if messages.is_empty() && timeout_ms > 0 {
|
||||
let waiter = waiters
|
||||
.entry(recipient_key.clone())
|
||||
.or_insert_with(|| Arc::new(Notify::new()))
|
||||
.clone();
|
||||
let _ = timeout(Duration::from_millis(timeout_ms), waiter.notified()).await;
|
||||
let msgs = fetch_fn(&store, &recipient_key, &channel_id, limit)?;
|
||||
fill_payloads_wait(&mut results, msgs);
|
||||
metrics::record_fetch_wait_total();
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
fill_payloads_wait(&mut results, messages);
|
||||
metrics::record_fetch_wait_total();
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
pub fn handle_peek(
|
||||
&mut self,
|
||||
params: node_service::PeekParams,
|
||||
mut results: node_service::PeekResults,
|
||||
) -> Promise<(), capnp::Error> {
|
||||
let p = match params.get() {
|
||||
Ok(p) => p,
|
||||
Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)),
|
||||
};
|
||||
let recipient_key = match p.get_recipient_key() {
|
||||
Ok(v) => v.to_vec(),
|
||||
Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)),
|
||||
};
|
||||
let channel_id = p.get_channel_id().unwrap_or_default().to_vec();
|
||||
let version = p.get_version();
|
||||
let limit = p.get_limit();
|
||||
let auth_ctx = match validate_auth_context(&self.auth_cfg, &self.sessions, p.get_auth()) {
|
||||
Ok(ctx) => ctx,
|
||||
Err(e) => return Promise::err(e),
|
||||
};
|
||||
|
||||
if recipient_key.len() != 32 {
|
||||
return Promise::err(coded_error(
|
||||
E004_IDENTITY_KEY_LENGTH,
|
||||
format!("recipientKey must be exactly 32 bytes, got {}", recipient_key.len()),
|
||||
));
|
||||
}
|
||||
if version > CURRENT_WIRE_VERSION {
|
||||
return Promise::err(coded_error(
|
||||
E012_WIRE_VERSION,
|
||||
format!("wire version {} not supported (max {CURRENT_WIRE_VERSION})", version),
|
||||
));
|
||||
}
|
||||
|
||||
if let Err(e) = require_identity_or_request(
|
||||
&auth_ctx,
|
||||
&recipient_key,
|
||||
self.auth_cfg.allow_insecure_identity_from_request,
|
||||
) {
|
||||
return Promise::err(e);
|
||||
}
|
||||
|
||||
let messages = match self
|
||||
.store
|
||||
.peek(&recipient_key, &channel_id, limit as usize)
|
||||
.map_err(storage_err)
|
||||
{
|
||||
Ok(m) => m,
|
||||
Err(e) => return Promise::err(e),
|
||||
};
|
||||
|
||||
tracing::info!(
|
||||
recipient_prefix = %fmt_hex(&recipient_key[..4]),
|
||||
count = messages.len(),
|
||||
"audit: peek"
|
||||
);
|
||||
|
||||
let mut list = results.get().init_payloads(messages.len() as u32);
|
||||
for (i, (seq, data)) in messages.iter().enumerate() {
|
||||
let mut entry = list.reborrow().get(i as u32);
|
||||
entry.set_seq(*seq);
|
||||
entry.set_data(data);
|
||||
}
|
||||
|
||||
Promise::ok(())
|
||||
}
|
||||
|
||||
pub fn handle_ack(
|
||||
&mut self,
|
||||
params: node_service::AckParams,
|
||||
_results: node_service::AckResults,
|
||||
) -> Promise<(), capnp::Error> {
|
||||
let p = match params.get() {
|
||||
Ok(p) => p,
|
||||
Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)),
|
||||
};
|
||||
let recipient_key = match p.get_recipient_key() {
|
||||
Ok(v) => v.to_vec(),
|
||||
Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)),
|
||||
};
|
||||
let channel_id = p.get_channel_id().unwrap_or_default().to_vec();
|
||||
let version = p.get_version();
|
||||
let seq_up_to = p.get_seq_up_to();
|
||||
let auth_ctx = match validate_auth_context(&self.auth_cfg, &self.sessions, p.get_auth()) {
|
||||
Ok(ctx) => ctx,
|
||||
Err(e) => return Promise::err(e),
|
||||
};
|
||||
|
||||
if recipient_key.len() != 32 {
|
||||
return Promise::err(coded_error(
|
||||
E004_IDENTITY_KEY_LENGTH,
|
||||
format!("recipientKey must be exactly 32 bytes, got {}", recipient_key.len()),
|
||||
));
|
||||
}
|
||||
if version > CURRENT_WIRE_VERSION {
|
||||
return Promise::err(coded_error(
|
||||
E012_WIRE_VERSION,
|
||||
format!("wire version {} not supported (max {CURRENT_WIRE_VERSION})", version),
|
||||
));
|
||||
}
|
||||
|
||||
if let Err(e) = require_identity_or_request(
|
||||
&auth_ctx,
|
||||
&recipient_key,
|
||||
self.auth_cfg.allow_insecure_identity_from_request,
|
||||
) {
|
||||
return Promise::err(e);
|
||||
}
|
||||
|
||||
match self
|
||||
.store
|
||||
.ack(&recipient_key, &channel_id, seq_up_to)
|
||||
.map_err(storage_err)
|
||||
{
|
||||
Ok(removed) => {
|
||||
tracing::info!(
|
||||
recipient_prefix = %fmt_hex(&recipient_key[..4]),
|
||||
seq_up_to = seq_up_to,
|
||||
removed = removed,
|
||||
"audit: ack"
|
||||
);
|
||||
}
|
||||
Err(e) => return Promise::err(e),
|
||||
}
|
||||
|
||||
Promise::ok(())
|
||||
}
|
||||
|
||||
pub fn handle_batch_enqueue(
|
||||
&mut self,
|
||||
params: node_service::BatchEnqueueParams,
|
||||
mut results: node_service::BatchEnqueueResults,
|
||||
) -> Promise<(), capnp::Error> {
|
||||
let p = match params.get() {
|
||||
Ok(p) => p,
|
||||
Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)),
|
||||
};
|
||||
let recipient_keys = match p.get_recipient_keys() {
|
||||
Ok(v) => v,
|
||||
Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)),
|
||||
};
|
||||
let payload = match p.get_payload() {
|
||||
Ok(v) => v.to_vec(),
|
||||
Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)),
|
||||
};
|
||||
let channel_id = p.get_channel_id().unwrap_or_default().to_vec();
|
||||
let version = p.get_version();
|
||||
let auth_ctx = match validate_auth_context(&self.auth_cfg, &self.sessions, p.get_auth()) {
|
||||
Ok(ctx) => ctx,
|
||||
Err(e) => return Promise::err(e),
|
||||
};
|
||||
|
||||
if payload.is_empty() {
|
||||
return Promise::err(coded_error(E005_PAYLOAD_EMPTY, "payload must not be empty"));
|
||||
}
|
||||
if payload.len() > MAX_PAYLOAD_BYTES {
|
||||
return Promise::err(coded_error(
|
||||
E006_PAYLOAD_TOO_LARGE,
|
||||
format!("payload exceeds max size ({} bytes)", MAX_PAYLOAD_BYTES),
|
||||
));
|
||||
}
|
||||
if version > CURRENT_WIRE_VERSION {
|
||||
return Promise::err(coded_error(
|
||||
E012_WIRE_VERSION,
|
||||
format!("wire version {} not supported (max {CURRENT_WIRE_VERSION})", version),
|
||||
));
|
||||
}
|
||||
|
||||
if let Err(e) = check_rate_limit(&self.rate_limits, &auth_ctx.token) {
|
||||
tracing::warn!("rate_limit_hit");
|
||||
metrics::record_rate_limit_hit_total();
|
||||
return Promise::err(e);
|
||||
}
|
||||
|
||||
// When sealed_sender is false, require an identity-bound session.
|
||||
if !self.sealed_sender {
|
||||
if let Err(e) = crate::auth::require_identity(&auth_ctx) {
|
||||
return Promise::err(e);
|
||||
}
|
||||
}
|
||||
|
||||
// DM channel authz: validate caller membership once before the loop.
|
||||
if channel_id.len() == 16 {
|
||||
let members = match self.store.get_channel_members(&channel_id) {
|
||||
Ok(Some(m)) => m,
|
||||
Ok(None) => {
|
||||
return Promise::err(coded_error(E023_CHANNEL_NOT_FOUND, "channel not found"));
|
||||
}
|
||||
Err(e) => return Promise::err(storage_err(e)),
|
||||
};
|
||||
let caller = match crate::auth::require_identity(&auth_ctx) {
|
||||
Ok(id) => id,
|
||||
Err(e) => return Promise::err(e),
|
||||
};
|
||||
let (a, b) = &members;
|
||||
let caller_in = caller == a.as_slice() || caller == b.as_slice();
|
||||
if !caller_in {
|
||||
return Promise::err(coded_error(
|
||||
E022_CHANNEL_ACCESS_DENIED,
|
||||
"caller is not a member of this channel",
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
let mut seqs = Vec::with_capacity(recipient_keys.len() as usize);
|
||||
for i in 0..recipient_keys.len() {
|
||||
let rk = match recipient_keys.get(i) {
|
||||
Ok(v) => v.to_vec(),
|
||||
Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)),
|
||||
};
|
||||
if rk.len() != 32 {
|
||||
return Promise::err(coded_error(
|
||||
E004_IDENTITY_KEY_LENGTH,
|
||||
format!("recipientKey[{}] must be exactly 32 bytes, got {}", i, rk.len()),
|
||||
));
|
||||
}
|
||||
|
||||
// Per-recipient DM channel membership check.
|
||||
if channel_id.len() == 16 {
|
||||
let members = match self.store.get_channel_members(&channel_id) {
|
||||
Ok(Some(m)) => m,
|
||||
Ok(None) => {
|
||||
return Promise::err(coded_error(
|
||||
E023_CHANNEL_NOT_FOUND,
|
||||
"channel not found",
|
||||
));
|
||||
}
|
||||
Err(e) => return Promise::err(storage_err(e)),
|
||||
};
|
||||
let caller = match crate::auth::require_identity(&auth_ctx) {
|
||||
Ok(id) => id,
|
||||
Err(e) => return Promise::err(e),
|
||||
};
|
||||
let (a, b) = &members;
|
||||
let recipient_other = (rk == *a && caller == b.as_slice())
|
||||
|| (rk == *b && caller == a.as_slice());
|
||||
if !recipient_other {
|
||||
return Promise::err(coded_error(
|
||||
E022_CHANNEL_ACCESS_DENIED,
|
||||
"recipient is not a member of this channel",
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
match self.store.queue_depth(&rk, &channel_id) {
|
||||
Ok(depth) if depth >= MAX_QUEUE_DEPTH => {
|
||||
return Promise::err(coded_error(
|
||||
E015_QUEUE_FULL,
|
||||
format!("queue depth {} exceeds limit {}", depth, MAX_QUEUE_DEPTH),
|
||||
));
|
||||
}
|
||||
Err(e) => return Promise::err(storage_err(e)),
|
||||
_ => {}
|
||||
}
|
||||
|
||||
let seq = match self
|
||||
.store
|
||||
.enqueue(&rk, &channel_id, payload.clone())
|
||||
.map_err(storage_err)
|
||||
{
|
||||
Ok(seq) => seq,
|
||||
Err(e) => return Promise::err(e),
|
||||
};
|
||||
seqs.push(seq);
|
||||
|
||||
metrics::record_enqueue_total();
|
||||
metrics::record_enqueue_bytes(payload.len() as u64);
|
||||
|
||||
crate::auth::waiter(&self.waiters, &rk).notify_waiters();
|
||||
}
|
||||
|
||||
let mut list = results.get().init_seqs(seqs.len() as u32);
|
||||
for (i, seq) in seqs.iter().enumerate() {
|
||||
list.set(i as u32, *seq);
|
||||
}
|
||||
|
||||
tracing::info!(
|
||||
recipient_count = recipient_keys.len(),
|
||||
payload_len = payload.len(),
|
||||
"audit: batch_enqueue"
|
||||
);
|
||||
|
||||
Promise::ok(())
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user