feat: add WebTransport (HTTP/3) server endpoint for browser clients
Feature-gated behind --features webtransport. Uses h3, h3-quinn, and h3-webtransport crates to accept WebTransport sessions over HTTP/3. Dispatches RPC through the same v2 handler registry as native QUIC, using identical wire framing. - webtransport.rs: H3 connection handling, session management, bidi stream RPC dispatch with auth handshake - Config: --webtransport-listen / QPQ_WEBTRANSPORT_LISTEN - ALPN: "h3" for WebTransport, "capnp" for native QUIC - Also fixes: add missing save/load_revocation_log to SqlStore
This commit is contained in:
@@ -79,7 +79,7 @@ mdns-sd = "0.12"
|
|||||||
|
|
||||||
# WebTransport (HTTP/3) — feature-gated, for browser clients.
|
# WebTransport (HTTP/3) — feature-gated, for browser clients.
|
||||||
h3 = { version = "0.0.8", optional = true }
|
h3 = { version = "0.0.8", optional = true }
|
||||||
h3-quinn = { version = "0.0.10", optional = true }
|
h3-quinn = { version = "0.0.10", features = ["datagram"], optional = true }
|
||||||
h3-webtransport = { version = "0.1", optional = true }
|
h3-webtransport = { version = "0.1", optional = true }
|
||||||
http = { version = "1", optional = true }
|
http = { version = "1", optional = true }
|
||||||
|
|
||||||
|
|||||||
@@ -38,6 +38,8 @@ pub struct FileConfig {
|
|||||||
pub redact_logs: Option<bool>,
|
pub redact_logs: Option<bool>,
|
||||||
/// WebSocket JSON-RPC bridge listen address (e.g. "0.0.0.0:9000").
|
/// WebSocket JSON-RPC bridge listen address (e.g. "0.0.0.0:9000").
|
||||||
pub ws_listen: Option<String>,
|
pub ws_listen: Option<String>,
|
||||||
|
/// WebTransport (HTTP/3) listen address for browser clients (e.g. "0.0.0.0:7443").
|
||||||
|
pub webtransport_listen: Option<String>,
|
||||||
/// Graceful shutdown drain timeout in seconds.
|
/// Graceful shutdown drain timeout in seconds.
|
||||||
pub drain_timeout_secs: Option<u64>,
|
pub drain_timeout_secs: Option<u64>,
|
||||||
/// Default per-RPC timeout in seconds.
|
/// Default per-RPC timeout in seconds.
|
||||||
@@ -70,6 +72,8 @@ pub struct EffectiveConfig {
|
|||||||
pub redact_logs: bool,
|
pub redact_logs: bool,
|
||||||
/// WebSocket JSON-RPC bridge listen address. If set, the bridge is started.
|
/// WebSocket JSON-RPC bridge listen address. If set, the bridge is started.
|
||||||
pub ws_listen: Option<String>,
|
pub ws_listen: Option<String>,
|
||||||
|
/// WebTransport (HTTP/3) listen address. If set, the WebTransport endpoint is started.
|
||||||
|
pub webtransport_listen: Option<String>,
|
||||||
/// Graceful shutdown drain timeout in seconds.
|
/// Graceful shutdown drain timeout in seconds.
|
||||||
pub drain_timeout_secs: u64,
|
pub drain_timeout_secs: u64,
|
||||||
/// Default per-RPC timeout in seconds.
|
/// Default per-RPC timeout in seconds.
|
||||||
@@ -250,6 +254,11 @@ pub fn merge_config(args: &crate::Args, file: &FileConfig) -> EffectiveConfig {
|
|||||||
.clone()
|
.clone()
|
||||||
.or_else(|| file.ws_listen.clone());
|
.or_else(|| file.ws_listen.clone());
|
||||||
|
|
||||||
|
let webtransport_listen = args
|
||||||
|
.webtransport_listen
|
||||||
|
.clone()
|
||||||
|
.or_else(|| file.webtransport_listen.clone());
|
||||||
|
|
||||||
let drain_timeout_secs = if args.drain_timeout == DEFAULT_DRAIN_TIMEOUT_SECS {
|
let drain_timeout_secs = if args.drain_timeout == DEFAULT_DRAIN_TIMEOUT_SECS {
|
||||||
file.drain_timeout_secs.unwrap_or(DEFAULT_DRAIN_TIMEOUT_SECS)
|
file.drain_timeout_secs.unwrap_or(DEFAULT_DRAIN_TIMEOUT_SECS)
|
||||||
} else {
|
} else {
|
||||||
@@ -283,6 +292,7 @@ pub fn merge_config(args: &crate::Args, file: &FileConfig) -> EffectiveConfig {
|
|||||||
plugin_dir,
|
plugin_dir,
|
||||||
redact_logs,
|
redact_logs,
|
||||||
ws_listen,
|
ws_listen,
|
||||||
|
webtransport_listen,
|
||||||
drain_timeout_secs,
|
drain_timeout_secs,
|
||||||
rpc_timeout_secs,
|
rpc_timeout_secs,
|
||||||
storage_timeout_secs,
|
storage_timeout_secs,
|
||||||
|
|||||||
@@ -31,6 +31,8 @@ mod tls;
|
|||||||
mod storage;
|
mod storage;
|
||||||
pub mod v2_handlers;
|
pub mod v2_handlers;
|
||||||
mod ws_bridge;
|
mod ws_bridge;
|
||||||
|
#[cfg(feature = "webtransport")]
|
||||||
|
mod webtransport;
|
||||||
|
|
||||||
use auth::{AuthConfig, PendingLogin, RateEntry, SessionInfo};
|
use auth::{AuthConfig, PendingLogin, RateEntry, SessionInfo};
|
||||||
use config::{
|
use config::{
|
||||||
@@ -128,6 +130,11 @@ struct Args {
|
|||||||
#[arg(long, env = "QPQ_WS_LISTEN")]
|
#[arg(long, env = "QPQ_WS_LISTEN")]
|
||||||
ws_listen: Option<String>,
|
ws_listen: Option<String>,
|
||||||
|
|
||||||
|
/// WebTransport (HTTP/3) listen address for browser clients (e.g. 0.0.0.0:7443).
|
||||||
|
/// Requires --features webtransport.
|
||||||
|
#[arg(long, env = "QPQ_WEBTRANSPORT_LISTEN")]
|
||||||
|
webtransport_listen: Option<String>,
|
||||||
|
|
||||||
/// Graceful shutdown drain timeout in seconds (default: 30). In-flight RPCs get this
|
/// Graceful shutdown drain timeout in seconds (default: 30). In-flight RPCs get this
|
||||||
/// long to finish after a shutdown signal before connections are forcefully closed.
|
/// long to finish after a shutdown signal before connections are forcefully closed.
|
||||||
#[arg(long, env = "QPQ_DRAIN_TIMEOUT", default_value_t = config::DEFAULT_DRAIN_TIMEOUT_SECS)]
|
#[arg(long, env = "QPQ_DRAIN_TIMEOUT", default_value_t = config::DEFAULT_DRAIN_TIMEOUT_SECS)]
|
||||||
@@ -367,6 +374,54 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
ws_bridge::spawn_ws_bridge(ws_addr, ws_state);
|
ws_bridge::spawn_ws_bridge(ws_addr, ws_state);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ── WebTransport (HTTP/3) endpoint ─────────────────────────────────────
|
||||||
|
#[cfg(feature = "webtransport")]
|
||||||
|
if let Some(wt_addr_str) = &effective.webtransport_listen {
|
||||||
|
let wt_addr: SocketAddr = wt_addr_str
|
||||||
|
.parse()
|
||||||
|
.context("--webtransport-listen must be host:port (e.g. 0.0.0.0:7443)")?;
|
||||||
|
|
||||||
|
let wt_server_config = webtransport::build_webtransport_server_config(
|
||||||
|
&effective.tls_cert,
|
||||||
|
&effective.tls_key,
|
||||||
|
)
|
||||||
|
.context("build WebTransport server config")?;
|
||||||
|
|
||||||
|
let wt_state = Arc::new(v2_handlers::ServerState {
|
||||||
|
store: Arc::clone(&store),
|
||||||
|
waiters: Arc::clone(&waiters),
|
||||||
|
auth_cfg: Arc::clone(&auth_cfg),
|
||||||
|
opaque_setup: Arc::clone(&opaque_setup),
|
||||||
|
pending_logins: Arc::clone(&pending_logins),
|
||||||
|
sessions: Arc::clone(&sessions),
|
||||||
|
rate_limits: Arc::clone(&rate_limits),
|
||||||
|
sealed_sender: effective.sealed_sender,
|
||||||
|
hooks: Arc::clone(&hooks),
|
||||||
|
signing_key: Arc::clone(&signing_key),
|
||||||
|
kt_log: Arc::clone(&kt_log),
|
||||||
|
revocation_log: Arc::new(std::sync::Mutex::new(
|
||||||
|
quicproquo_kt::RevocationLog::new(),
|
||||||
|
)),
|
||||||
|
data_dir: PathBuf::from(&effective.data_dir),
|
||||||
|
redact_logs: effective.redact_logs,
|
||||||
|
audit_logger: Arc::new(audit::NoopAuditLogger),
|
||||||
|
draining: Arc::new(std::sync::atomic::AtomicBool::new(false)),
|
||||||
|
seen_message_ids: Arc::new(DashMap::new()),
|
||||||
|
banned_users: Arc::new(DashMap::new()),
|
||||||
|
moderation_reports: Arc::new(std::sync::Mutex::new(Vec::new())),
|
||||||
|
node_id: format!("wt-{}", hex::encode(&signing_key.public_key_bytes()[..4])),
|
||||||
|
start_time: std::time::Instant::now(),
|
||||||
|
storage_backend: effective.store_backend.clone(),
|
||||||
|
});
|
||||||
|
|
||||||
|
let wt_registry = Arc::new(v2_handlers::build_registry(
|
||||||
|
std::time::Duration::from_secs(effective.rpc_timeout_secs),
|
||||||
|
));
|
||||||
|
|
||||||
|
webtransport::spawn_webtransport_listener(wt_addr, wt_server_config, wt_state, wt_registry)
|
||||||
|
.context("spawn WebTransport listener")?;
|
||||||
|
}
|
||||||
|
|
||||||
let endpoint = Endpoint::server(server_config, listen)?;
|
let endpoint = Endpoint::server(server_config, listen)?;
|
||||||
|
|
||||||
tracing::info!(
|
tracing::info!(
|
||||||
|
|||||||
@@ -416,6 +416,27 @@ impl Store for SqlStore {
|
|||||||
.map_err(|e| StorageError::Db(e.to_string()))
|
.map_err(|e| StorageError::Db(e.to_string()))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn save_revocation_log(&self, bytes: Vec<u8>) -> Result<(), StorageError> {
|
||||||
|
let conn = self.get_conn()?;
|
||||||
|
conn.execute(
|
||||||
|
"INSERT OR REPLACE INTO kt_log (id, log_data) VALUES (2, ?1)",
|
||||||
|
params![bytes],
|
||||||
|
)
|
||||||
|
.map_err(|e| StorageError::Db(e.to_string()))?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn load_revocation_log(&self) -> Result<Option<Vec<u8>>, StorageError> {
|
||||||
|
let conn = self.get_conn()?;
|
||||||
|
let mut stmt = conn
|
||||||
|
.prepare("SELECT log_data FROM kt_log WHERE id = 2")
|
||||||
|
.map_err(|e| StorageError::Db(e.to_string()))?;
|
||||||
|
|
||||||
|
stmt.query_row([], |row| row.get(0))
|
||||||
|
.optional()
|
||||||
|
.map_err(|e| StorageError::Db(e.to_string()))
|
||||||
|
}
|
||||||
|
|
||||||
fn store_user_record(&self, username: &str, record: Vec<u8>) -> Result<(), StorageError> {
|
fn store_user_record(&self, username: &str, record: Vec<u8>) -> Result<(), StorageError> {
|
||||||
let conn = self.get_conn()?;
|
let conn = self.get_conn()?;
|
||||||
conn.execute(
|
conn.execute(
|
||||||
|
|||||||
420
crates/quicproquo-server/src/webtransport.rs
Normal file
420
crates/quicproquo-server/src/webtransport.rs
Normal file
@@ -0,0 +1,420 @@
|
|||||||
|
//! WebTransport server endpoint for browser clients.
|
||||||
|
//!
|
||||||
|
//! Accepts HTTP/3 WebTransport sessions and dispatches RPC requests through the
|
||||||
|
//! same v2 handler registry as the native QUIC endpoint. Browsers connect via:
|
||||||
|
//!
|
||||||
|
//! ```js
|
||||||
|
//! const wt = new WebTransport("https://server:7443");
|
||||||
|
//! ```
|
||||||
|
//!
|
||||||
|
//! Each WebTransport bidirectional stream carries a single RPC request/response
|
||||||
|
//! using the same wire format as the native QUIC transport:
|
||||||
|
//!
|
||||||
|
//! ```text
|
||||||
|
//! [method_id: u16][request_id: u32][payload_len: u32][protobuf bytes]
|
||||||
|
//! ```
|
||||||
|
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use bytes::BytesMut;
|
||||||
|
use h3_quinn::quinn;
|
||||||
|
use h3_webtransport::server::AcceptedBi;
|
||||||
|
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||||
|
use tracing::{debug, info, warn};
|
||||||
|
|
||||||
|
use crate::v2_handlers::ServerState;
|
||||||
|
use quicproquo_rpc::error::RpcStatus;
|
||||||
|
use quicproquo_rpc::framing::{RequestFrame, ResponseFrame};
|
||||||
|
use quicproquo_rpc::method::{HandlerResult, MethodRegistry, RequestContext};
|
||||||
|
|
||||||
|
/// Concrete H3 connection type.
|
||||||
|
type H3Conn = h3::server::Connection<h3_quinn::Connection, bytes::Bytes>;
|
||||||
|
|
||||||
|
/// Concrete request resolver type.
|
||||||
|
type H3Resolver = h3::server::RequestResolver<h3_quinn::Connection, bytes::Bytes>;
|
||||||
|
|
||||||
|
/// Type alias for the concrete WebTransport session type used by this server.
|
||||||
|
type WtSession =
|
||||||
|
h3_webtransport::server::WebTransportSession<h3_quinn::Connection, bytes::Bytes>;
|
||||||
|
|
||||||
|
/// Type alias for the concrete WebTransport bidi stream.
|
||||||
|
type WtBidiStream = h3_webtransport::stream::BidiStream<
|
||||||
|
h3_quinn::BidiStream<bytes::Bytes>,
|
||||||
|
bytes::Bytes,
|
||||||
|
>;
|
||||||
|
|
||||||
|
/// Start the WebTransport listener in a background task.
|
||||||
|
///
|
||||||
|
/// The endpoint uses the provided quinn `ServerConfig` (with "h3" ALPN) and
|
||||||
|
/// binds to `listen_addr`. Incoming HTTP/3 CONNECT requests are upgraded to
|
||||||
|
/// WebTransport sessions.
|
||||||
|
pub fn spawn_webtransport_listener(
|
||||||
|
listen_addr: std::net::SocketAddr,
|
||||||
|
server_config: quinn::ServerConfig,
|
||||||
|
state: Arc<ServerState>,
|
||||||
|
registry: Arc<MethodRegistry<ServerState>>,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
let endpoint = quinn::Endpoint::server(server_config, listen_addr)
|
||||||
|
.map_err(|e| anyhow::anyhow!("bind WebTransport endpoint {listen_addr}: {e}"))?;
|
||||||
|
|
||||||
|
info!(addr = %listen_addr, "WebTransport endpoint listening");
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
accept_loop(endpoint, state, registry).await;
|
||||||
|
});
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Accept QUIC connections and upgrade them to HTTP/3 + WebTransport.
|
||||||
|
async fn accept_loop(
|
||||||
|
endpoint: quinn::Endpoint,
|
||||||
|
state: Arc<ServerState>,
|
||||||
|
registry: Arc<MethodRegistry<ServerState>>,
|
||||||
|
) {
|
||||||
|
while let Some(incoming) = endpoint.accept().await {
|
||||||
|
let state = Arc::clone(&state);
|
||||||
|
let registry = Arc::clone(®istry);
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let connection = match incoming.await {
|
||||||
|
Ok(c) => c,
|
||||||
|
Err(e) => {
|
||||||
|
warn!(error = %e, "WebTransport: QUIC accept failed");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let remote = connection.remote_address();
|
||||||
|
debug!(remote = %remote, "WebTransport: new QUIC connection");
|
||||||
|
metrics::counter!("webtransport_connections_total").increment(1);
|
||||||
|
metrics::gauge!("webtransport_active_connections").increment(1.0);
|
||||||
|
|
||||||
|
if let Err(e) = handle_h3_connection(connection, state, registry).await {
|
||||||
|
debug!(remote = %remote, error = %e, "WebTransport: session error");
|
||||||
|
}
|
||||||
|
|
||||||
|
metrics::gauge!("webtransport_active_connections").decrement(1.0);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Handle an HTTP/3 connection: accept the WebTransport CONNECT request and
|
||||||
|
/// process bidirectional streams as RPC calls.
|
||||||
|
async fn handle_h3_connection(
|
||||||
|
connection: quinn::Connection,
|
||||||
|
state: Arc<ServerState>,
|
||||||
|
registry: Arc<MethodRegistry<ServerState>>,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
let h3_quinn_conn = h3_quinn::Connection::new(connection);
|
||||||
|
let mut h3_conn: H3Conn = h3::server::builder()
|
||||||
|
.enable_webtransport(true)
|
||||||
|
.enable_extended_connect(true)
|
||||||
|
.enable_datagram(true)
|
||||||
|
.build::<h3_quinn::Connection, bytes::Bytes>(h3_quinn_conn)
|
||||||
|
.await
|
||||||
|
.map_err(|e| anyhow::anyhow!("H3 connection setup: {e}"))?;
|
||||||
|
|
||||||
|
// Accept HTTP/3 requests until we get a CONNECT for WebTransport.
|
||||||
|
loop {
|
||||||
|
let resolver: H3Resolver = match h3_conn.accept().await {
|
||||||
|
Ok(Some(r)) => r,
|
||||||
|
Ok(None) => {
|
||||||
|
debug!("WebTransport: H3 connection closed");
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
return Err(anyhow::anyhow!("WebTransport: H3 accept error: {e}"));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let (request, stream) = resolver
|
||||||
|
.resolve_request()
|
||||||
|
.await
|
||||||
|
.map_err(|e| anyhow::anyhow!("resolve request: {e}"))?;
|
||||||
|
|
||||||
|
let method = request.method().clone();
|
||||||
|
let uri = request.uri().clone();
|
||||||
|
|
||||||
|
if method == http::Method::CONNECT {
|
||||||
|
debug!(uri = %uri, "WebTransport: CONNECT request");
|
||||||
|
|
||||||
|
let wt_session = h3_webtransport::server::WebTransportSession::accept(
|
||||||
|
request, stream, h3_conn,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.map_err(|e| anyhow::anyhow!("WebTransport session accept: {e}"))?;
|
||||||
|
|
||||||
|
info!("WebTransport: session established");
|
||||||
|
metrics::counter!("webtransport_sessions_total").increment(1);
|
||||||
|
|
||||||
|
serve_wt_streams(wt_session, state, registry).await;
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
debug!(method = %method, uri = %uri, "WebTransport: non-CONNECT request ignored");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Per-connection state from the WebTransport auth handshake.
|
||||||
|
#[derive(Debug, Clone, Default)]
|
||||||
|
struct ConnectionState {
|
||||||
|
session_token: Option<Vec<u8>>,
|
||||||
|
identity_key: Option<Vec<u8>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Accept bidirectional streams from a WebTransport session and dispatch
|
||||||
|
/// each as an RPC request.
|
||||||
|
async fn serve_wt_streams(
|
||||||
|
session: WtSession,
|
||||||
|
state: Arc<ServerState>,
|
||||||
|
registry: Arc<MethodRegistry<ServerState>>,
|
||||||
|
) {
|
||||||
|
// Auth handshake: the first bidi stream carries the session token.
|
||||||
|
let conn_state: Arc<ConnectionState> = match accept_auth_stream(&session).await {
|
||||||
|
Ok(cs) => Arc::new(cs),
|
||||||
|
Err(e) => {
|
||||||
|
warn!(error = %e, "WebTransport: auth handshake failed");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
loop {
|
||||||
|
match session.accept_bi().await {
|
||||||
|
Ok(Some(AcceptedBi::BidiStream(_session_id, stream))) => {
|
||||||
|
let state = Arc::clone(&state);
|
||||||
|
let registry = Arc::clone(®istry);
|
||||||
|
let conn_state = Arc::clone(&conn_state);
|
||||||
|
tokio::spawn(async move {
|
||||||
|
if let Err(e) =
|
||||||
|
handle_wt_bidi_stream(stream, state, registry, &conn_state).await
|
||||||
|
{
|
||||||
|
debug!(error = %e, "WebTransport: stream error");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
Ok(Some(AcceptedBi::Request(_req, _stream))) => {
|
||||||
|
debug!("WebTransport: ignoring nested HTTP/3 request");
|
||||||
|
}
|
||||||
|
Ok(None) => {
|
||||||
|
debug!("WebTransport: no more bidi streams");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
debug!(error = %e, "WebTransport: accept_bi error");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Accept the first bidirectional stream as an auth init handshake.
|
||||||
|
///
|
||||||
|
/// The client sends a raw session token (length-prefixed: `u32 BE + token bytes`).
|
||||||
|
/// The server reads it and sends a 1-byte ack (0x00).
|
||||||
|
async fn accept_auth_stream(session: &WtSession) -> anyhow::Result<ConnectionState> {
|
||||||
|
let accepted = session
|
||||||
|
.accept_bi()
|
||||||
|
.await
|
||||||
|
.map_err(|e| anyhow::anyhow!("auth stream accept: {e}"))?
|
||||||
|
.ok_or_else(|| anyhow::anyhow!("session closed before auth handshake"))?;
|
||||||
|
|
||||||
|
let mut stream: WtBidiStream = match accepted {
|
||||||
|
AcceptedBi::BidiStream(_session_id, stream) => stream,
|
||||||
|
AcceptedBi::Request(_, _) => {
|
||||||
|
anyhow::bail!("expected bidi stream for auth, got HTTP/3 request")
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Read the token: [len: u32 BE][token bytes]
|
||||||
|
let mut header = [0u8; 4];
|
||||||
|
AsyncReadExt::read_exact(&mut stream, &mut header)
|
||||||
|
.await
|
||||||
|
.map_err(|e| anyhow::anyhow!("auth read header: {e}"))?;
|
||||||
|
let len = u32::from_be_bytes(header) as usize;
|
||||||
|
|
||||||
|
if len > 4096 {
|
||||||
|
anyhow::bail!("auth token too large: {len} bytes");
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut token = vec![0u8; len];
|
||||||
|
if len > 0 {
|
||||||
|
AsyncReadExt::read_exact(&mut stream, &mut token)
|
||||||
|
.await
|
||||||
|
.map_err(|e| anyhow::anyhow!("auth read token: {e}"))?;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send ack: single zero byte.
|
||||||
|
AsyncWriteExt::write_all(&mut stream, &[0u8])
|
||||||
|
.await
|
||||||
|
.map_err(|e| anyhow::anyhow!("auth ack send: {e}"))?;
|
||||||
|
|
||||||
|
debug!(token_len = token.len(), "WebTransport: auth init received");
|
||||||
|
|
||||||
|
Ok(ConnectionState {
|
||||||
|
session_token: Some(token),
|
||||||
|
identity_key: None,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Handle a single WebTransport bidirectional stream: read request, dispatch,
|
||||||
|
/// write response. Uses the same framing as native QUIC.
|
||||||
|
async fn handle_wt_bidi_stream(
|
||||||
|
mut stream: WtBidiStream,
|
||||||
|
state: Arc<ServerState>,
|
||||||
|
registry: Arc<MethodRegistry<ServerState>>,
|
||||||
|
conn_state: &ConnectionState,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
// Read the complete request from the stream.
|
||||||
|
let max_size =
|
||||||
|
quicproquo_rpc::framing::MAX_PAYLOAD_SIZE + quicproquo_rpc::framing::REQUEST_HEADER_SIZE;
|
||||||
|
|
||||||
|
let mut buf = Vec::with_capacity(1024);
|
||||||
|
let mut tmp = [0u8; 8192];
|
||||||
|
loop {
|
||||||
|
let n = AsyncReadExt::read(&mut stream, &mut tmp)
|
||||||
|
.await
|
||||||
|
.map_err(|e| anyhow::anyhow!("recv: {e}"))?;
|
||||||
|
|
||||||
|
if n == 0 {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
buf.extend_from_slice(&tmp[..n]);
|
||||||
|
|
||||||
|
if buf.len() > max_size {
|
||||||
|
anyhow::bail!("payload too large");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut bytes = BytesMut::from(buf.as_slice());
|
||||||
|
|
||||||
|
let frame = match RequestFrame::decode(&mut bytes)
|
||||||
|
.map_err(|e| anyhow::anyhow!("decode: {e}"))?
|
||||||
|
{
|
||||||
|
Some(f) => f,
|
||||||
|
None => anyhow::bail!("incomplete request frame"),
|
||||||
|
};
|
||||||
|
|
||||||
|
let trace_id = format!(
|
||||||
|
"wt-{:016x}",
|
||||||
|
std::time::SystemTime::now()
|
||||||
|
.duration_since(std::time::UNIX_EPOCH)
|
||||||
|
.unwrap_or_default()
|
||||||
|
.as_nanos() as u64
|
||||||
|
);
|
||||||
|
|
||||||
|
let result = match registry.get(frame.method_id) {
|
||||||
|
Some((handler, name, timeout)) => {
|
||||||
|
let span = tracing::info_span!(
|
||||||
|
"wt_rpc",
|
||||||
|
trace_id = %trace_id,
|
||||||
|
method_id = frame.method_id,
|
||||||
|
method = name,
|
||||||
|
req_id = frame.request_id,
|
||||||
|
);
|
||||||
|
let _guard = span.enter();
|
||||||
|
debug!("dispatching");
|
||||||
|
|
||||||
|
let deadline = timeout.map(|d| tokio::time::Instant::now() + d);
|
||||||
|
let start = std::time::Instant::now();
|
||||||
|
|
||||||
|
let ctx = RequestContext {
|
||||||
|
identity_key: conn_state.identity_key.clone(),
|
||||||
|
session_token: conn_state.session_token.clone(),
|
||||||
|
payload: frame.payload,
|
||||||
|
trace_id: trace_id.clone(),
|
||||||
|
deadline,
|
||||||
|
};
|
||||||
|
|
||||||
|
let result = if let Some(dur) = timeout {
|
||||||
|
match tokio::time::timeout(dur, handler(Arc::clone(&state), ctx)).await {
|
||||||
|
Ok(r) => r,
|
||||||
|
Err(_) => {
|
||||||
|
warn!(method = name, "WebTransport: request deadline exceeded");
|
||||||
|
HandlerResult::err(RpcStatus::DeadlineExceeded, "request deadline exceeded")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
handler(Arc::clone(&state), ctx).await
|
||||||
|
};
|
||||||
|
|
||||||
|
let elapsed = start.elapsed();
|
||||||
|
metrics::histogram!("webtransport_request_duration_seconds", "method" => name)
|
||||||
|
.record(elapsed.as_secs_f64());
|
||||||
|
metrics::counter!("webtransport_requests_total", "method" => name).increment(1);
|
||||||
|
|
||||||
|
result
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
warn!(method_id = frame.method_id, "WebTransport: unknown method");
|
||||||
|
HandlerResult::err(RpcStatus::UnknownMethod, "unknown method")
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let response = ResponseFrame {
|
||||||
|
status: result.status as u8,
|
||||||
|
request_id: frame.request_id,
|
||||||
|
payload: result.payload,
|
||||||
|
};
|
||||||
|
|
||||||
|
let encoded = response.encode();
|
||||||
|
AsyncWriteExt::write_all(&mut stream, &encoded)
|
||||||
|
.await
|
||||||
|
.map_err(|e| anyhow::anyhow!("send response: {e}"))?;
|
||||||
|
AsyncWriteExt::shutdown(&mut stream)
|
||||||
|
.await
|
||||||
|
.map_err(|e| anyhow::anyhow!("shutdown: {e}"))?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Build a quinn `ServerConfig` for the WebTransport endpoint.
|
||||||
|
///
|
||||||
|
/// Uses the same TLS cert/key as the main server but with "h3" ALPN.
|
||||||
|
pub fn build_webtransport_server_config(
|
||||||
|
cert_path: &std::path::Path,
|
||||||
|
key_path: &std::path::Path,
|
||||||
|
) -> anyhow::Result<quinn::ServerConfig> {
|
||||||
|
use anyhow::Context;
|
||||||
|
use rustls::pki_types::{CertificateDer, PrivateKeyDer};
|
||||||
|
use rustls::version::TLS13;
|
||||||
|
|
||||||
|
let cert_bytes = std::fs::read(cert_path).context("read WebTransport cert")?;
|
||||||
|
let key_bytes = std::fs::read(key_path).context("read WebTransport key")?;
|
||||||
|
|
||||||
|
let cert_chain = vec![CertificateDer::from(cert_bytes)];
|
||||||
|
let key = PrivateKeyDer::try_from(key_bytes)
|
||||||
|
.map_err(|_| anyhow::anyhow!("invalid WebTransport private key"))?;
|
||||||
|
|
||||||
|
let mut tls = rustls::ServerConfig::builder_with_protocol_versions(&[&TLS13])
|
||||||
|
.with_no_client_auth()
|
||||||
|
.with_single_cert(cert_chain, key)?;
|
||||||
|
tls.alpn_protocols = vec![b"h3".to_vec()];
|
||||||
|
|
||||||
|
let crypto = quinn_proto::crypto::rustls::QuicServerConfig::try_from(tls)
|
||||||
|
.map_err(|e| anyhow::anyhow!("invalid WebTransport TLS config: {e}"))?;
|
||||||
|
|
||||||
|
let mut transport = quinn::TransportConfig::default();
|
||||||
|
transport.max_idle_timeout(Some(
|
||||||
|
std::time::Duration::from_secs(300)
|
||||||
|
.try_into()
|
||||||
|
.map_err(|e| anyhow::anyhow!("idle timeout: {e}"))?,
|
||||||
|
));
|
||||||
|
// WebTransport sessions may have multiple simultaneous streams.
|
||||||
|
transport.max_concurrent_bidi_streams(64u32.into());
|
||||||
|
transport.max_concurrent_uni_streams(16u32.into());
|
||||||
|
|
||||||
|
let mut server_config = quinn::ServerConfig::with_crypto(Arc::new(crypto));
|
||||||
|
server_config.transport_config(Arc::new(transport));
|
||||||
|
|
||||||
|
Ok(server_config)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
#[test]
|
||||||
|
fn webtransport_module_compiles() {
|
||||||
|
assert!(true);
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user