From 3f5a3a5ac8868195d6b25db87f9b25f8993ed9aa Mon Sep 17 00:00:00 2001 From: Christian Nennemann Date: Wed, 4 Mar 2026 20:59:59 +0100 Subject: [PATCH] feat: add WebTransport (HTTP/3) server endpoint for browser clients Feature-gated behind --features webtransport. Uses h3, h3-quinn, and h3-webtransport crates to accept WebTransport sessions over HTTP/3. Dispatches RPC through the same v2 handler registry as native QUIC, using identical wire framing. - webtransport.rs: H3 connection handling, session management, bidi stream RPC dispatch with auth handshake - Config: --webtransport-listen / QPQ_WEBTRANSPORT_LISTEN - ALPN: "h3" for WebTransport, "capnp" for native QUIC - Also fixes: add missing save/load_revocation_log to SqlStore --- crates/quicproquo-server/Cargo.toml | 2 +- crates/quicproquo-server/src/config.rs | 10 + crates/quicproquo-server/src/main.rs | 55 +++ crates/quicproquo-server/src/sql_store.rs | 21 + crates/quicproquo-server/src/webtransport.rs | 420 +++++++++++++++++++ 5 files changed, 507 insertions(+), 1 deletion(-) create mode 100644 crates/quicproquo-server/src/webtransport.rs diff --git a/crates/quicproquo-server/Cargo.toml b/crates/quicproquo-server/Cargo.toml index dddb99b..2dc90ae 100644 --- a/crates/quicproquo-server/Cargo.toml +++ b/crates/quicproquo-server/Cargo.toml @@ -79,7 +79,7 @@ mdns-sd = "0.12" # WebTransport (HTTP/3) — feature-gated, for browser clients. h3 = { version = "0.0.8", optional = true } -h3-quinn = { version = "0.0.10", optional = true } +h3-quinn = { version = "0.0.10", features = ["datagram"], optional = true } h3-webtransport = { version = "0.1", optional = true } http = { version = "1", optional = true } diff --git a/crates/quicproquo-server/src/config.rs b/crates/quicproquo-server/src/config.rs index d8ce3ff..52faafb 100644 --- a/crates/quicproquo-server/src/config.rs +++ b/crates/quicproquo-server/src/config.rs @@ -38,6 +38,8 @@ pub struct FileConfig { pub redact_logs: Option, /// WebSocket JSON-RPC bridge listen address (e.g. "0.0.0.0:9000"). pub ws_listen: Option, + /// WebTransport (HTTP/3) listen address for browser clients (e.g. "0.0.0.0:7443"). + pub webtransport_listen: Option, /// Graceful shutdown drain timeout in seconds. pub drain_timeout_secs: Option, /// Default per-RPC timeout in seconds. @@ -70,6 +72,8 @@ pub struct EffectiveConfig { pub redact_logs: bool, /// WebSocket JSON-RPC bridge listen address. If set, the bridge is started. pub ws_listen: Option, + /// WebTransport (HTTP/3) listen address. If set, the WebTransport endpoint is started. + pub webtransport_listen: Option, /// Graceful shutdown drain timeout in seconds. pub drain_timeout_secs: u64, /// Default per-RPC timeout in seconds. @@ -250,6 +254,11 @@ pub fn merge_config(args: &crate::Args, file: &FileConfig) -> EffectiveConfig { .clone() .or_else(|| file.ws_listen.clone()); + let webtransport_listen = args + .webtransport_listen + .clone() + .or_else(|| file.webtransport_listen.clone()); + let drain_timeout_secs = if args.drain_timeout == DEFAULT_DRAIN_TIMEOUT_SECS { file.drain_timeout_secs.unwrap_or(DEFAULT_DRAIN_TIMEOUT_SECS) } else { @@ -283,6 +292,7 @@ pub fn merge_config(args: &crate::Args, file: &FileConfig) -> EffectiveConfig { plugin_dir, redact_logs, ws_listen, + webtransport_listen, drain_timeout_secs, rpc_timeout_secs, storage_timeout_secs, diff --git a/crates/quicproquo-server/src/main.rs b/crates/quicproquo-server/src/main.rs index d533173..cead67f 100644 --- a/crates/quicproquo-server/src/main.rs +++ b/crates/quicproquo-server/src/main.rs @@ -31,6 +31,8 @@ mod tls; mod storage; pub mod v2_handlers; mod ws_bridge; +#[cfg(feature = "webtransport")] +mod webtransport; use auth::{AuthConfig, PendingLogin, RateEntry, SessionInfo}; use config::{ @@ -128,6 +130,11 @@ struct Args { #[arg(long, env = "QPQ_WS_LISTEN")] ws_listen: Option, + /// WebTransport (HTTP/3) listen address for browser clients (e.g. 0.0.0.0:7443). + /// Requires --features webtransport. + #[arg(long, env = "QPQ_WEBTRANSPORT_LISTEN")] + webtransport_listen: Option, + /// Graceful shutdown drain timeout in seconds (default: 30). In-flight RPCs get this /// long to finish after a shutdown signal before connections are forcefully closed. #[arg(long, env = "QPQ_DRAIN_TIMEOUT", default_value_t = config::DEFAULT_DRAIN_TIMEOUT_SECS)] @@ -367,6 +374,54 @@ async fn main() -> anyhow::Result<()> { ws_bridge::spawn_ws_bridge(ws_addr, ws_state); } + // ── WebTransport (HTTP/3) endpoint ───────────────────────────────────── + #[cfg(feature = "webtransport")] + if let Some(wt_addr_str) = &effective.webtransport_listen { + let wt_addr: SocketAddr = wt_addr_str + .parse() + .context("--webtransport-listen must be host:port (e.g. 0.0.0.0:7443)")?; + + let wt_server_config = webtransport::build_webtransport_server_config( + &effective.tls_cert, + &effective.tls_key, + ) + .context("build WebTransport server config")?; + + let wt_state = Arc::new(v2_handlers::ServerState { + store: Arc::clone(&store), + waiters: Arc::clone(&waiters), + auth_cfg: Arc::clone(&auth_cfg), + opaque_setup: Arc::clone(&opaque_setup), + pending_logins: Arc::clone(&pending_logins), + sessions: Arc::clone(&sessions), + rate_limits: Arc::clone(&rate_limits), + sealed_sender: effective.sealed_sender, + hooks: Arc::clone(&hooks), + signing_key: Arc::clone(&signing_key), + kt_log: Arc::clone(&kt_log), + revocation_log: Arc::new(std::sync::Mutex::new( + quicproquo_kt::RevocationLog::new(), + )), + data_dir: PathBuf::from(&effective.data_dir), + redact_logs: effective.redact_logs, + audit_logger: Arc::new(audit::NoopAuditLogger), + draining: Arc::new(std::sync::atomic::AtomicBool::new(false)), + seen_message_ids: Arc::new(DashMap::new()), + banned_users: Arc::new(DashMap::new()), + moderation_reports: Arc::new(std::sync::Mutex::new(Vec::new())), + node_id: format!("wt-{}", hex::encode(&signing_key.public_key_bytes()[..4])), + start_time: std::time::Instant::now(), + storage_backend: effective.store_backend.clone(), + }); + + let wt_registry = Arc::new(v2_handlers::build_registry( + std::time::Duration::from_secs(effective.rpc_timeout_secs), + )); + + webtransport::spawn_webtransport_listener(wt_addr, wt_server_config, wt_state, wt_registry) + .context("spawn WebTransport listener")?; + } + let endpoint = Endpoint::server(server_config, listen)?; tracing::info!( diff --git a/crates/quicproquo-server/src/sql_store.rs b/crates/quicproquo-server/src/sql_store.rs index f0b7bf5..145da03 100644 --- a/crates/quicproquo-server/src/sql_store.rs +++ b/crates/quicproquo-server/src/sql_store.rs @@ -416,6 +416,27 @@ impl Store for SqlStore { .map_err(|e| StorageError::Db(e.to_string())) } + fn save_revocation_log(&self, bytes: Vec) -> Result<(), StorageError> { + let conn = self.get_conn()?; + conn.execute( + "INSERT OR REPLACE INTO kt_log (id, log_data) VALUES (2, ?1)", + params![bytes], + ) + .map_err(|e| StorageError::Db(e.to_string()))?; + Ok(()) + } + + fn load_revocation_log(&self) -> Result>, StorageError> { + let conn = self.get_conn()?; + let mut stmt = conn + .prepare("SELECT log_data FROM kt_log WHERE id = 2") + .map_err(|e| StorageError::Db(e.to_string()))?; + + stmt.query_row([], |row| row.get(0)) + .optional() + .map_err(|e| StorageError::Db(e.to_string())) + } + fn store_user_record(&self, username: &str, record: Vec) -> Result<(), StorageError> { let conn = self.get_conn()?; conn.execute( diff --git a/crates/quicproquo-server/src/webtransport.rs b/crates/quicproquo-server/src/webtransport.rs new file mode 100644 index 0000000..a32dfec --- /dev/null +++ b/crates/quicproquo-server/src/webtransport.rs @@ -0,0 +1,420 @@ +//! WebTransport server endpoint for browser clients. +//! +//! Accepts HTTP/3 WebTransport sessions and dispatches RPC requests through the +//! same v2 handler registry as the native QUIC endpoint. Browsers connect via: +//! +//! ```js +//! const wt = new WebTransport("https://server:7443"); +//! ``` +//! +//! Each WebTransport bidirectional stream carries a single RPC request/response +//! using the same wire format as the native QUIC transport: +//! +//! ```text +//! [method_id: u16][request_id: u32][payload_len: u32][protobuf bytes] +//! ``` + +use std::sync::Arc; + +use bytes::BytesMut; +use h3_quinn::quinn; +use h3_webtransport::server::AcceptedBi; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tracing::{debug, info, warn}; + +use crate::v2_handlers::ServerState; +use quicproquo_rpc::error::RpcStatus; +use quicproquo_rpc::framing::{RequestFrame, ResponseFrame}; +use quicproquo_rpc::method::{HandlerResult, MethodRegistry, RequestContext}; + +/// Concrete H3 connection type. +type H3Conn = h3::server::Connection; + +/// Concrete request resolver type. +type H3Resolver = h3::server::RequestResolver; + +/// Type alias for the concrete WebTransport session type used by this server. +type WtSession = + h3_webtransport::server::WebTransportSession; + +/// Type alias for the concrete WebTransport bidi stream. +type WtBidiStream = h3_webtransport::stream::BidiStream< + h3_quinn::BidiStream, + bytes::Bytes, +>; + +/// Start the WebTransport listener in a background task. +/// +/// The endpoint uses the provided quinn `ServerConfig` (with "h3" ALPN) and +/// binds to `listen_addr`. Incoming HTTP/3 CONNECT requests are upgraded to +/// WebTransport sessions. +pub fn spawn_webtransport_listener( + listen_addr: std::net::SocketAddr, + server_config: quinn::ServerConfig, + state: Arc, + registry: Arc>, +) -> anyhow::Result<()> { + let endpoint = quinn::Endpoint::server(server_config, listen_addr) + .map_err(|e| anyhow::anyhow!("bind WebTransport endpoint {listen_addr}: {e}"))?; + + info!(addr = %listen_addr, "WebTransport endpoint listening"); + + tokio::spawn(async move { + accept_loop(endpoint, state, registry).await; + }); + + Ok(()) +} + +/// Accept QUIC connections and upgrade them to HTTP/3 + WebTransport. +async fn accept_loop( + endpoint: quinn::Endpoint, + state: Arc, + registry: Arc>, +) { + while let Some(incoming) = endpoint.accept().await { + let state = Arc::clone(&state); + let registry = Arc::clone(®istry); + + tokio::spawn(async move { + let connection = match incoming.await { + Ok(c) => c, + Err(e) => { + warn!(error = %e, "WebTransport: QUIC accept failed"); + return; + } + }; + + let remote = connection.remote_address(); + debug!(remote = %remote, "WebTransport: new QUIC connection"); + metrics::counter!("webtransport_connections_total").increment(1); + metrics::gauge!("webtransport_active_connections").increment(1.0); + + if let Err(e) = handle_h3_connection(connection, state, registry).await { + debug!(remote = %remote, error = %e, "WebTransport: session error"); + } + + metrics::gauge!("webtransport_active_connections").decrement(1.0); + }); + } +} + +/// Handle an HTTP/3 connection: accept the WebTransport CONNECT request and +/// process bidirectional streams as RPC calls. +async fn handle_h3_connection( + connection: quinn::Connection, + state: Arc, + registry: Arc>, +) -> anyhow::Result<()> { + let h3_quinn_conn = h3_quinn::Connection::new(connection); + let mut h3_conn: H3Conn = h3::server::builder() + .enable_webtransport(true) + .enable_extended_connect(true) + .enable_datagram(true) + .build::(h3_quinn_conn) + .await + .map_err(|e| anyhow::anyhow!("H3 connection setup: {e}"))?; + + // Accept HTTP/3 requests until we get a CONNECT for WebTransport. + loop { + let resolver: H3Resolver = match h3_conn.accept().await { + Ok(Some(r)) => r, + Ok(None) => { + debug!("WebTransport: H3 connection closed"); + return Ok(()); + } + Err(e) => { + return Err(anyhow::anyhow!("WebTransport: H3 accept error: {e}")); + } + }; + + let (request, stream) = resolver + .resolve_request() + .await + .map_err(|e| anyhow::anyhow!("resolve request: {e}"))?; + + let method = request.method().clone(); + let uri = request.uri().clone(); + + if method == http::Method::CONNECT { + debug!(uri = %uri, "WebTransport: CONNECT request"); + + let wt_session = h3_webtransport::server::WebTransportSession::accept( + request, stream, h3_conn, + ) + .await + .map_err(|e| anyhow::anyhow!("WebTransport session accept: {e}"))?; + + info!("WebTransport: session established"); + metrics::counter!("webtransport_sessions_total").increment(1); + + serve_wt_streams(wt_session, state, registry).await; + return Ok(()); + } + + debug!(method = %method, uri = %uri, "WebTransport: non-CONNECT request ignored"); + } +} + +/// Per-connection state from the WebTransport auth handshake. +#[derive(Debug, Clone, Default)] +struct ConnectionState { + session_token: Option>, + identity_key: Option>, +} + +/// Accept bidirectional streams from a WebTransport session and dispatch +/// each as an RPC request. +async fn serve_wt_streams( + session: WtSession, + state: Arc, + registry: Arc>, +) { + // Auth handshake: the first bidi stream carries the session token. + let conn_state: Arc = match accept_auth_stream(&session).await { + Ok(cs) => Arc::new(cs), + Err(e) => { + warn!(error = %e, "WebTransport: auth handshake failed"); + return; + } + }; + + loop { + match session.accept_bi().await { + Ok(Some(AcceptedBi::BidiStream(_session_id, stream))) => { + let state = Arc::clone(&state); + let registry = Arc::clone(®istry); + let conn_state = Arc::clone(&conn_state); + tokio::spawn(async move { + if let Err(e) = + handle_wt_bidi_stream(stream, state, registry, &conn_state).await + { + debug!(error = %e, "WebTransport: stream error"); + } + }); + } + Ok(Some(AcceptedBi::Request(_req, _stream))) => { + debug!("WebTransport: ignoring nested HTTP/3 request"); + } + Ok(None) => { + debug!("WebTransport: no more bidi streams"); + break; + } + Err(e) => { + debug!(error = %e, "WebTransport: accept_bi error"); + break; + } + } + } +} + +/// Accept the first bidirectional stream as an auth init handshake. +/// +/// The client sends a raw session token (length-prefixed: `u32 BE + token bytes`). +/// The server reads it and sends a 1-byte ack (0x00). +async fn accept_auth_stream(session: &WtSession) -> anyhow::Result { + let accepted = session + .accept_bi() + .await + .map_err(|e| anyhow::anyhow!("auth stream accept: {e}"))? + .ok_or_else(|| anyhow::anyhow!("session closed before auth handshake"))?; + + let mut stream: WtBidiStream = match accepted { + AcceptedBi::BidiStream(_session_id, stream) => stream, + AcceptedBi::Request(_, _) => { + anyhow::bail!("expected bidi stream for auth, got HTTP/3 request") + } + }; + + // Read the token: [len: u32 BE][token bytes] + let mut header = [0u8; 4]; + AsyncReadExt::read_exact(&mut stream, &mut header) + .await + .map_err(|e| anyhow::anyhow!("auth read header: {e}"))?; + let len = u32::from_be_bytes(header) as usize; + + if len > 4096 { + anyhow::bail!("auth token too large: {len} bytes"); + } + + let mut token = vec![0u8; len]; + if len > 0 { + AsyncReadExt::read_exact(&mut stream, &mut token) + .await + .map_err(|e| anyhow::anyhow!("auth read token: {e}"))?; + } + + // Send ack: single zero byte. + AsyncWriteExt::write_all(&mut stream, &[0u8]) + .await + .map_err(|e| anyhow::anyhow!("auth ack send: {e}"))?; + + debug!(token_len = token.len(), "WebTransport: auth init received"); + + Ok(ConnectionState { + session_token: Some(token), + identity_key: None, + }) +} + +/// Handle a single WebTransport bidirectional stream: read request, dispatch, +/// write response. Uses the same framing as native QUIC. +async fn handle_wt_bidi_stream( + mut stream: WtBidiStream, + state: Arc, + registry: Arc>, + conn_state: &ConnectionState, +) -> anyhow::Result<()> { + // Read the complete request from the stream. + let max_size = + quicproquo_rpc::framing::MAX_PAYLOAD_SIZE + quicproquo_rpc::framing::REQUEST_HEADER_SIZE; + + let mut buf = Vec::with_capacity(1024); + let mut tmp = [0u8; 8192]; + loop { + let n = AsyncReadExt::read(&mut stream, &mut tmp) + .await + .map_err(|e| anyhow::anyhow!("recv: {e}"))?; + + if n == 0 { + break; + } + + buf.extend_from_slice(&tmp[..n]); + + if buf.len() > max_size { + anyhow::bail!("payload too large"); + } + } + + let mut bytes = BytesMut::from(buf.as_slice()); + + let frame = match RequestFrame::decode(&mut bytes) + .map_err(|e| anyhow::anyhow!("decode: {e}"))? + { + Some(f) => f, + None => anyhow::bail!("incomplete request frame"), + }; + + let trace_id = format!( + "wt-{:016x}", + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_nanos() as u64 + ); + + let result = match registry.get(frame.method_id) { + Some((handler, name, timeout)) => { + let span = tracing::info_span!( + "wt_rpc", + trace_id = %trace_id, + method_id = frame.method_id, + method = name, + req_id = frame.request_id, + ); + let _guard = span.enter(); + debug!("dispatching"); + + let deadline = timeout.map(|d| tokio::time::Instant::now() + d); + let start = std::time::Instant::now(); + + let ctx = RequestContext { + identity_key: conn_state.identity_key.clone(), + session_token: conn_state.session_token.clone(), + payload: frame.payload, + trace_id: trace_id.clone(), + deadline, + }; + + let result = if let Some(dur) = timeout { + match tokio::time::timeout(dur, handler(Arc::clone(&state), ctx)).await { + Ok(r) => r, + Err(_) => { + warn!(method = name, "WebTransport: request deadline exceeded"); + HandlerResult::err(RpcStatus::DeadlineExceeded, "request deadline exceeded") + } + } + } else { + handler(Arc::clone(&state), ctx).await + }; + + let elapsed = start.elapsed(); + metrics::histogram!("webtransport_request_duration_seconds", "method" => name) + .record(elapsed.as_secs_f64()); + metrics::counter!("webtransport_requests_total", "method" => name).increment(1); + + result + } + None => { + warn!(method_id = frame.method_id, "WebTransport: unknown method"); + HandlerResult::err(RpcStatus::UnknownMethod, "unknown method") + } + }; + + let response = ResponseFrame { + status: result.status as u8, + request_id: frame.request_id, + payload: result.payload, + }; + + let encoded = response.encode(); + AsyncWriteExt::write_all(&mut stream, &encoded) + .await + .map_err(|e| anyhow::anyhow!("send response: {e}"))?; + AsyncWriteExt::shutdown(&mut stream) + .await + .map_err(|e| anyhow::anyhow!("shutdown: {e}"))?; + + Ok(()) +} + +/// Build a quinn `ServerConfig` for the WebTransport endpoint. +/// +/// Uses the same TLS cert/key as the main server but with "h3" ALPN. +pub fn build_webtransport_server_config( + cert_path: &std::path::Path, + key_path: &std::path::Path, +) -> anyhow::Result { + use anyhow::Context; + use rustls::pki_types::{CertificateDer, PrivateKeyDer}; + use rustls::version::TLS13; + + let cert_bytes = std::fs::read(cert_path).context("read WebTransport cert")?; + let key_bytes = std::fs::read(key_path).context("read WebTransport key")?; + + let cert_chain = vec![CertificateDer::from(cert_bytes)]; + let key = PrivateKeyDer::try_from(key_bytes) + .map_err(|_| anyhow::anyhow!("invalid WebTransport private key"))?; + + let mut tls = rustls::ServerConfig::builder_with_protocol_versions(&[&TLS13]) + .with_no_client_auth() + .with_single_cert(cert_chain, key)?; + tls.alpn_protocols = vec![b"h3".to_vec()]; + + let crypto = quinn_proto::crypto::rustls::QuicServerConfig::try_from(tls) + .map_err(|e| anyhow::anyhow!("invalid WebTransport TLS config: {e}"))?; + + let mut transport = quinn::TransportConfig::default(); + transport.max_idle_timeout(Some( + std::time::Duration::from_secs(300) + .try_into() + .map_err(|e| anyhow::anyhow!("idle timeout: {e}"))?, + )); + // WebTransport sessions may have multiple simultaneous streams. + transport.max_concurrent_bidi_streams(64u32.into()); + transport.max_concurrent_uni_streams(16u32.into()); + + let mut server_config = quinn::ServerConfig::with_crypto(Arc::new(crypto)); + server_config.transport_config(Arc::new(transport)); + + Ok(server_config) +} + +#[cfg(test)] +mod tests { + #[test] + fn webtransport_module_compiles() { + assert!(true); + } +}