//! WebTransport server endpoint for browser clients. //! //! Accepts HTTP/3 WebTransport sessions and dispatches RPC requests through the //! same v2 handler registry as the native QUIC endpoint. Browsers connect via: //! //! ```js //! const wt = new WebTransport("https://server:7443"); //! ``` //! //! Each WebTransport bidirectional stream carries a single RPC request/response //! using the same wire format as the native QUIC transport: //! //! ```text //! [method_id: u16][request_id: u32][payload_len: u32][protobuf bytes] //! ``` use std::sync::Arc; use bytes::BytesMut; use h3_quinn::quinn; use h3_webtransport::server::AcceptedBi; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tracing::{debug, info, warn}; use crate::v2_handlers::ServerState; use quicproquo_rpc::error::RpcStatus; use quicproquo_rpc::framing::{RequestFrame, ResponseFrame}; use quicproquo_rpc::method::{HandlerResult, MethodRegistry, RequestContext}; /// Concrete H3 connection type. type H3Conn = h3::server::Connection; /// Concrete request resolver type. type H3Resolver = h3::server::RequestResolver; /// Type alias for the concrete WebTransport session type used by this server. type WtSession = h3_webtransport::server::WebTransportSession; /// Type alias for the concrete WebTransport bidi stream. type WtBidiStream = h3_webtransport::stream::BidiStream< h3_quinn::BidiStream, bytes::Bytes, >; /// Start the WebTransport listener in a background task. /// /// The endpoint uses the provided quinn `ServerConfig` (with "h3" ALPN) and /// binds to `listen_addr`. Incoming HTTP/3 CONNECT requests are upgraded to /// WebTransport sessions. pub fn spawn_webtransport_listener( listen_addr: std::net::SocketAddr, server_config: quinn::ServerConfig, state: Arc, registry: Arc>, ) -> anyhow::Result<()> { let endpoint = quinn::Endpoint::server(server_config, listen_addr) .map_err(|e| anyhow::anyhow!("bind WebTransport endpoint {listen_addr}: {e}"))?; info!(addr = %listen_addr, "WebTransport endpoint listening"); tokio::spawn(async move { accept_loop(endpoint, state, registry).await; }); Ok(()) } /// Accept QUIC connections and upgrade them to HTTP/3 + WebTransport. async fn accept_loop( endpoint: quinn::Endpoint, state: Arc, registry: Arc>, ) { while let Some(incoming) = endpoint.accept().await { let state = Arc::clone(&state); let registry = Arc::clone(®istry); tokio::spawn(async move { let connection = match incoming.await { Ok(c) => c, Err(e) => { warn!(error = %e, "WebTransport: QUIC accept failed"); return; } }; let remote = connection.remote_address(); debug!(remote = %remote, "WebTransport: new QUIC connection"); metrics::counter!("webtransport_connections_total").increment(1); metrics::gauge!("webtransport_active_connections").increment(1.0); if let Err(e) = handle_h3_connection(connection, state, registry).await { debug!(remote = %remote, error = %e, "WebTransport: session error"); } metrics::gauge!("webtransport_active_connections").decrement(1.0); }); } } /// Handle an HTTP/3 connection: accept the WebTransport CONNECT request and /// process bidirectional streams as RPC calls. async fn handle_h3_connection( connection: quinn::Connection, state: Arc, registry: Arc>, ) -> anyhow::Result<()> { let h3_quinn_conn = h3_quinn::Connection::new(connection); let mut h3_conn: H3Conn = h3::server::builder() .enable_webtransport(true) .enable_extended_connect(true) .enable_datagram(true) .build::(h3_quinn_conn) .await .map_err(|e| anyhow::anyhow!("H3 connection setup: {e}"))?; // Accept HTTP/3 requests until we get a CONNECT for WebTransport. loop { let resolver: H3Resolver = match h3_conn.accept().await { Ok(Some(r)) => r, Ok(None) => { debug!("WebTransport: H3 connection closed"); return Ok(()); } Err(e) => { return Err(anyhow::anyhow!("WebTransport: H3 accept error: {e}")); } }; let (request, stream) = resolver .resolve_request() .await .map_err(|e| anyhow::anyhow!("resolve request: {e}"))?; let method = request.method().clone(); let uri = request.uri().clone(); if method == http::Method::CONNECT { debug!(uri = %uri, "WebTransport: CONNECT request"); let wt_session = h3_webtransport::server::WebTransportSession::accept( request, stream, h3_conn, ) .await .map_err(|e| anyhow::anyhow!("WebTransport session accept: {e}"))?; info!("WebTransport: session established"); metrics::counter!("webtransport_sessions_total").increment(1); serve_wt_streams(wt_session, state, registry).await; return Ok(()); } debug!(method = %method, uri = %uri, "WebTransport: non-CONNECT request ignored"); } } /// Per-connection state from the WebTransport auth handshake. #[derive(Debug, Clone, Default)] struct ConnectionState { session_token: Option>, identity_key: Option>, } /// Accept bidirectional streams from a WebTransport session and dispatch /// each as an RPC request. async fn serve_wt_streams( session: WtSession, state: Arc, registry: Arc>, ) { // Auth handshake: the first bidi stream carries the session token. let conn_state: Arc = match accept_auth_stream(&session).await { Ok(cs) => Arc::new(cs), Err(e) => { warn!(error = %e, "WebTransport: auth handshake failed"); return; } }; loop { match session.accept_bi().await { Ok(Some(AcceptedBi::BidiStream(_session_id, stream))) => { let state = Arc::clone(&state); let registry = Arc::clone(®istry); let conn_state = Arc::clone(&conn_state); tokio::spawn(async move { if let Err(e) = handle_wt_bidi_stream(stream, state, registry, &conn_state).await { debug!(error = %e, "WebTransport: stream error"); } }); } Ok(Some(AcceptedBi::Request(_req, _stream))) => { debug!("WebTransport: ignoring nested HTTP/3 request"); } Ok(None) => { debug!("WebTransport: no more bidi streams"); break; } Err(e) => { debug!(error = %e, "WebTransport: accept_bi error"); break; } } } } /// Accept the first bidirectional stream as an auth init handshake. /// /// The client sends a raw session token (length-prefixed: `u32 BE + token bytes`). /// The server reads it and sends a 1-byte ack (0x00). async fn accept_auth_stream(session: &WtSession) -> anyhow::Result { let accepted = session .accept_bi() .await .map_err(|e| anyhow::anyhow!("auth stream accept: {e}"))? .ok_or_else(|| anyhow::anyhow!("session closed before auth handshake"))?; let mut stream: WtBidiStream = match accepted { AcceptedBi::BidiStream(_session_id, stream) => stream, AcceptedBi::Request(_, _) => { anyhow::bail!("expected bidi stream for auth, got HTTP/3 request") } }; // Read the token: [len: u32 BE][token bytes] let mut header = [0u8; 4]; AsyncReadExt::read_exact(&mut stream, &mut header) .await .map_err(|e| anyhow::anyhow!("auth read header: {e}"))?; let len = u32::from_be_bytes(header) as usize; if len > 4096 { anyhow::bail!("auth token too large: {len} bytes"); } let mut token = vec![0u8; len]; if len > 0 { AsyncReadExt::read_exact(&mut stream, &mut token) .await .map_err(|e| anyhow::anyhow!("auth read token: {e}"))?; } // Send ack: single zero byte. AsyncWriteExt::write_all(&mut stream, &[0u8]) .await .map_err(|e| anyhow::anyhow!("auth ack send: {e}"))?; debug!(token_len = token.len(), "WebTransport: auth init received"); Ok(ConnectionState { session_token: Some(token), identity_key: None, }) } /// Handle a single WebTransport bidirectional stream: read request, dispatch, /// write response. Uses the same framing as native QUIC. async fn handle_wt_bidi_stream( mut stream: WtBidiStream, state: Arc, registry: Arc>, conn_state: &ConnectionState, ) -> anyhow::Result<()> { // Read the complete request from the stream. let max_size = quicproquo_rpc::framing::MAX_PAYLOAD_SIZE + quicproquo_rpc::framing::REQUEST_HEADER_SIZE; let mut buf = Vec::with_capacity(1024); let mut tmp = [0u8; 8192]; loop { let n = AsyncReadExt::read(&mut stream, &mut tmp) .await .map_err(|e| anyhow::anyhow!("recv: {e}"))?; if n == 0 { break; } buf.extend_from_slice(&tmp[..n]); if buf.len() > max_size { anyhow::bail!("payload too large"); } } let mut bytes = BytesMut::from(buf.as_slice()); let frame = match RequestFrame::decode(&mut bytes) .map_err(|e| anyhow::anyhow!("decode: {e}"))? { Some(f) => f, None => anyhow::bail!("incomplete request frame"), }; let trace_id = format!( "wt-{:016x}", std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() .as_nanos() as u64 ); let result = match registry.get(frame.method_id) { Some((handler, name, timeout)) => { let span = tracing::info_span!( "wt_rpc", trace_id = %trace_id, method_id = frame.method_id, method = name, req_id = frame.request_id, ); let _guard = span.enter(); debug!("dispatching"); let deadline = timeout.map(|d| tokio::time::Instant::now() + d); let start = std::time::Instant::now(); let ctx = RequestContext { identity_key: conn_state.identity_key.clone(), session_token: conn_state.session_token.clone(), payload: frame.payload, trace_id: trace_id.clone(), deadline, }; let result = if let Some(dur) = timeout { match tokio::time::timeout(dur, handler(Arc::clone(&state), ctx)).await { Ok(r) => r, Err(_) => { warn!(method = name, "WebTransport: request deadline exceeded"); HandlerResult::err(RpcStatus::DeadlineExceeded, "request deadline exceeded") } } } else { handler(Arc::clone(&state), ctx).await }; let elapsed = start.elapsed(); metrics::histogram!("webtransport_request_duration_seconds", "method" => name) .record(elapsed.as_secs_f64()); metrics::counter!("webtransport_requests_total", "method" => name).increment(1); result } None => { warn!(method_id = frame.method_id, "WebTransport: unknown method"); HandlerResult::err(RpcStatus::UnknownMethod, "unknown method") } }; let response = ResponseFrame { status: result.status as u8, request_id: frame.request_id, payload: result.payload, }; let encoded = response.encode(); AsyncWriteExt::write_all(&mut stream, &encoded) .await .map_err(|e| anyhow::anyhow!("send response: {e}"))?; AsyncWriteExt::shutdown(&mut stream) .await .map_err(|e| anyhow::anyhow!("shutdown: {e}"))?; Ok(()) } /// Build a quinn `ServerConfig` for the WebTransport endpoint. /// /// Uses the same TLS cert/key as the main server but with "h3" ALPN. pub fn build_webtransport_server_config( cert_path: &std::path::Path, key_path: &std::path::Path, ) -> anyhow::Result { use anyhow::Context; use rustls::pki_types::{CertificateDer, PrivateKeyDer}; use rustls::version::TLS13; let cert_bytes = std::fs::read(cert_path).context("read WebTransport cert")?; let key_bytes = std::fs::read(key_path).context("read WebTransport key")?; let cert_chain = vec![CertificateDer::from(cert_bytes)]; let key = PrivateKeyDer::try_from(key_bytes) .map_err(|_| anyhow::anyhow!("invalid WebTransport private key"))?; let mut tls = rustls::ServerConfig::builder_with_protocol_versions(&[&TLS13]) .with_no_client_auth() .with_single_cert(cert_chain, key)?; tls.alpn_protocols = vec![b"h3".to_vec()]; let crypto = quinn_proto::crypto::rustls::QuicServerConfig::try_from(tls) .map_err(|e| anyhow::anyhow!("invalid WebTransport TLS config: {e}"))?; let mut transport = quinn::TransportConfig::default(); transport.max_idle_timeout(Some( std::time::Duration::from_secs(300) .try_into() .map_err(|e| anyhow::anyhow!("idle timeout: {e}"))?, )); // WebTransport sessions may have multiple simultaneous streams. transport.max_concurrent_bidi_streams(64u32.into()); transport.max_concurrent_uni_streams(16u32.into()); let mut server_config = quinn::ServerConfig::with_crypto(Arc::new(crypto)); server_config.transport_config(Arc::new(transport)); Ok(server_config) } #[cfg(test)] mod tests { #[test] fn webtransport_module_compiles() { assert!(true); } }