Add channel-aware delivery and update roadmap

This commit is contained in:
2026-02-22 06:31:16 +01:00
parent 9a3fa94858
commit d1ddef4cea
11 changed files with 374 additions and 85 deletions

View File

@@ -8,7 +8,7 @@
//! # Architecture
//!
//! ```text
//! QUIC endpoint (7000)
//! QUIC endpoint (4201)
//! └─ TLS 1.3 handshake (self-signed by default)
//! └─ capnp-rpc VatNetwork (LocalSet, !Send)
//! └─ NodeServiceImpl (KeyPackage + Delivery queues)
@@ -22,7 +22,7 @@
//!
//! | Env var | CLI flag | Default |
//! |---------------------|----------------|-----------------|
//! | `QUICNPROTOCHAT_LISTEN` | `--listen` | `0.0.0.0:7000` |
//! | `QUICNPROTOCHAT_LISTEN` | `--listen` | `0.0.0.0:4201` |
//! | `RUST_LOG` | — | `info` |
use std::{fs, net::SocketAddr, path::PathBuf, sync::Arc, time::Duration};
@@ -37,6 +37,7 @@ use quinn::{Endpoint, ServerConfig};
use quinn_proto::crypto::rustls::QuicServerConfig;
use rcgen::generate_simple_self_signed;
use rustls::pki_types::{CertificateDer, PrivateKeyDer};
use rustls::version::TLS13;
use sha2::{Digest, Sha256};
use tokio::sync::Notify;
use tokio::time::timeout;
@@ -45,6 +46,10 @@ use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
mod storage;
use storage::{FileBackedStore, StorageError};
/// Hard cap on a single `deliver` payload (5 MiB); oversized payloads are rejected.
const MAX_PAYLOAD_BYTES: usize = 5 * 1024 * 1024;
/// Hard cap on a published KeyPackage (1 MiB); oversized packages are rejected.
const MAX_KEYPACKAGE_BYTES: usize = 1024 * 1024;
/// Current wire protocol version; requests may carry 0 (legacy) or this value.
const CURRENT_WIRE_VERSION: u16 = 1;
// ── CLI ───────────────────────────────────────────────────────────────────────
#[derive(Debug, Parser)]
@@ -55,7 +60,7 @@ use storage::{FileBackedStore, StorageError};
)]
struct Args {
/// QUIC listen address (host:port).
#[arg(long, default_value = "0.0.0.0:7000", env = "QUICNPROTOCHAT_LISTEN")]
#[arg(long, default_value = "0.0.0.0:4201", env = "QUICNPROTOCHAT_LISTEN")]
listen: String,
/// Directory for persisted server data (KeyPackages + delivery queues).
@@ -133,6 +138,12 @@ impl node_service::Server for NodeServiceImpl {
"package must not be empty".to_string(),
));
}
if package.len() > MAX_KEYPACKAGE_BYTES {
return Promise::err(capnp::Error::failed(format!(
"package exceeds max size ({} bytes)",
MAX_KEYPACKAGE_BYTES
)));
}
let fingerprint: Vec<u8> = Sha256::digest(&package).to_vec();
if let Err(e) = self
@@ -221,6 +232,8 @@ impl node_service::Server for NodeServiceImpl {
Ok(v) => v.to_vec(),
Err(e) => return Promise::err(capnp::Error::failed(format!("{e}"))),
};
let channel_id = p.get_channel_id().unwrap_or_default().to_vec();
let version = p.get_version();
if recipient_key.len() != 32 {
return Promise::err(capnp::Error::failed(format!(
@@ -233,10 +246,22 @@ impl node_service::Server for NodeServiceImpl {
"payload must not be empty".to_string(),
));
}
if payload.len() > MAX_PAYLOAD_BYTES {
return Promise::err(capnp::Error::failed(format!(
"payload exceeds max size ({} bytes)",
MAX_PAYLOAD_BYTES
)));
}
if version != 0 && version != CURRENT_WIRE_VERSION {
return Promise::err(capnp::Error::failed(format!(
"unsupported wire version {} (expected 0 or {CURRENT_WIRE_VERSION})",
version
)));
}
if let Err(e) = self
.store
.enqueue(&recipient_key, payload)
.enqueue(&recipient_key, &channel_id, payload)
.map_err(storage_err)
{
return Promise::err(e);
@@ -265,6 +290,17 @@ impl node_service::Server for NodeServiceImpl {
},
Err(e) => return Promise::err(capnp::Error::failed(format!("{e}"))),
};
let channel_id = params
.get()
.ok()
.and_then(|p| p.get_channel_id().ok())
.map(|c| c.to_vec())
.unwrap_or_default();
let version = params
.get()
.ok()
.map(|p| p.get_version())
.unwrap_or(0);
if recipient_key.len() != 32 {
return Promise::err(capnp::Error::failed(format!(
@@ -272,8 +308,18 @@ impl node_service::Server for NodeServiceImpl {
recipient_key.len()
)));
}
if version != 0 && version != CURRENT_WIRE_VERSION {
return Promise::err(capnp::Error::failed(format!(
"unsupported wire version {} (expected 0 or {CURRENT_WIRE_VERSION})",
version
)));
}
let messages = match self.store.fetch(&recipient_key).map_err(storage_err) {
let messages = match self
.store
.fetch(&recipient_key, &channel_id)
.map_err(storage_err)
{
Ok(m) => m,
Err(e) => return Promise::err(e),
};
@@ -306,6 +352,8 @@ impl node_service::Server for NodeServiceImpl {
Ok(v) => v.to_vec(),
Err(e) => return Promise::err(capnp::Error::failed(format!("{e}"))),
};
let channel_id = p.get_channel_id().unwrap_or_default().to_vec();
let version = p.get_version();
let timeout_ms = p.get_timeout_ms();
if recipient_key.len() != 32 {
@@ -314,12 +362,20 @@ impl node_service::Server for NodeServiceImpl {
recipient_key.len()
)));
}
if version != 0 && version != CURRENT_WIRE_VERSION {
return Promise::err(capnp::Error::failed(format!(
"unsupported wire version {} (expected 0 or {CURRENT_WIRE_VERSION})",
version
)));
}
let store = Arc::clone(&self.store);
let waiters = self.waiters.clone();
Promise::from_future(async move {
let messages = store.fetch(&recipient_key).map_err(storage_err)?;
let messages = store
.fetch(&recipient_key, &channel_id)
.map_err(storage_err)?;
if messages.is_empty() && timeout_ms > 0 {
let waiter = waiters
@@ -327,7 +383,9 @@ impl node_service::Server for NodeServiceImpl {
.or_insert_with(|| Arc::new(Notify::new()))
.clone();
let _ = timeout(Duration::from_millis(timeout_ms), waiter.notified()).await;
let msgs = store.fetch(&recipient_key).map_err(storage_err)?;
let msgs = store
.fetch(&recipient_key, &channel_id)
.map_err(storage_err)?;
fill_payloads_wait(&mut results, msgs);
return Ok(());
}
@@ -467,7 +525,7 @@ fn build_server_config(cert_path: &PathBuf, key_path: &PathBuf) -> anyhow::Resul
let cert_chain = vec![CertificateDer::from(cert_bytes)];
let key = PrivateKeyDer::try_from(key_bytes).map_err(|_| anyhow::anyhow!("invalid key"))?;
let mut tls = rustls::ServerConfig::builder()
let mut tls = rustls::ServerConfig::builder_with_protocol_versions(&[&TLS13])
.with_no_client_auth()
.with_single_cert(cert_chain, key)?;
tls.alpn_protocols = vec![b"capnp".to_vec()];

View File

@@ -1,6 +1,7 @@
use std::{
collections::{HashMap, VecDeque},
fs,
hash::{Hash, Hasher},
path::{Path, PathBuf},
sync::Mutex,
};
@@ -16,10 +17,28 @@ pub enum StorageError {
}
#[derive(Serialize, Deserialize, Default)]
struct QueueMap {
struct QueueMapV1 {
map: HashMap<Vec<u8>, VecDeque<Vec<u8>>>,
}
/// Composite key for a delivery queue: one queue per (channel, recipient) pair.
///
/// An empty `channel_id` denotes a queue migrated from the legacy v1 on-disk
/// format, which was keyed by recipient only.
// `Hash` is derived rather than hand-written: the derive hashes the fields in
// declaration order (channel_id, then recipient_key), identical to the manual
// impl it replaces, and keeps `Hash` mechanically consistent with `Eq`.
#[derive(Serialize, Deserialize, Clone, Eq, PartialEq, Hash, Debug)]
pub struct ChannelKey {
    pub channel_id: Vec<u8>,
    pub recipient_key: Vec<u8>,
}
/// v2 on-disk delivery-queue format: channel-aware, keyed by
/// `ChannelKey` (channel + recipient) instead of recipient alone.
#[derive(Serialize, Deserialize, Default)]
struct QueueMapV2 {
    map: HashMap<ChannelKey, VecDeque<Vec<u8>>>,
}
/// File-backed storage for KeyPackages and delivery queues.
///
/// Each mutation flushes the entire map to disk. Suitable for MVP-scale loads.
@@ -27,7 +46,7 @@ pub struct FileBackedStore {
kp_path: PathBuf,
ds_path: PathBuf,
key_packages: Mutex<HashMap<Vec<u8>, VecDeque<Vec<u8>>>>,
deliveries: Mutex<HashMap<Vec<u8>, VecDeque<Vec<u8>>>>,
deliveries: Mutex<HashMap<ChannelKey, VecDeque<Vec<u8>>>>,
}
impl FileBackedStore {
@@ -69,25 +88,42 @@ impl FileBackedStore {
Ok(package)
}
pub fn enqueue(&self, recipient_key: &[u8], payload: Vec<u8>) -> Result<(), StorageError> {
pub fn enqueue(
&self,
recipient_key: &[u8],
channel_id: &[u8],
payload: Vec<u8>,
) -> Result<(), StorageError> {
let mut map = self.deliveries.lock().unwrap();
map.entry(recipient_key.to_vec())
let key = ChannelKey {
channel_id: channel_id.to_vec(),
recipient_key: recipient_key.to_vec(),
};
map.entry(key)
.or_default()
.push_back(payload);
self.flush_map(&self.ds_path, &*map)
}
pub fn fetch(&self, recipient_key: &[u8]) -> Result<Vec<Vec<u8>>, StorageError> {
pub fn fetch(
&self,
recipient_key: &[u8],
channel_id: &[u8],
) -> Result<Vec<Vec<u8>>, StorageError> {
let mut map = self.deliveries.lock().unwrap();
let key = ChannelKey {
channel_id: channel_id.to_vec(),
recipient_key: recipient_key.to_vec(),
};
let messages = map
.get_mut(recipient_key)
.get_mut(&key)
.map(|q| q.drain(..).collect())
.unwrap_or_default();
self.flush_map(&self.ds_path, &*map)?;
Ok(messages)
}
fn load_map(path: &Path) -> Result<HashMap<Vec<u8>, VecDeque<Vec<u8>>>, StorageError> {
fn load_map(path: &Path) -> Result<HashMap<ChannelKey, VecDeque<Vec<u8>>>, StorageError> {
if !path.exists() {
return Ok(HashMap::new());
}
@@ -95,16 +131,30 @@ impl FileBackedStore {
if bytes.is_empty() {
return Ok(HashMap::new());
}
let map: QueueMap = bincode::deserialize(&bytes).map_err(|_| StorageError::Serde)?;
Ok(map.map)
// Try v2 format (channel-aware). Fallback to legacy v1.
if let Ok(map) = bincode::deserialize::<QueueMapV2>(&bytes) {
return Ok(map.map);
}
let legacy: QueueMapV1 = bincode::deserialize(&bytes).map_err(|_| StorageError::Serde)?;
let mut upgraded = HashMap::new();
for (recipient_key, queue) in legacy.map.into_iter() {
upgraded.insert(
ChannelKey {
channel_id: Vec::new(),
recipient_key,
},
queue,
);
}
Ok(upgraded)
}
fn flush_map(
&self,
path: &Path,
map: &HashMap<Vec<u8>, VecDeque<Vec<u8>>>,
map: &HashMap<ChannelKey, VecDeque<Vec<u8>>>,
) -> Result<(), StorageError> {
let payload = QueueMap { map: map.clone() };
let payload = QueueMapV2 { map: map.clone() };
let bytes = bincode::serialize(&payload).map_err(|_| StorageError::Serde)?;
if let Some(parent) = path.parent() {
fs::create_dir_all(parent).map_err(|e| StorageError::Io(e.to_string()))?;