//! Federation v2 RPC handlers — relay, proxy, and health. //! //! Implements the inbound side of server-to-server federation: accepts relay //! and proxy requests from peer servers and delegates to local storage. //! Outbound relay to remote peers is handled by the capnp-based //! `FederationClient` on the main connection path. use std::sync::Arc; use bytes::Bytes; use prost::Message; use quicprochat_proto::qpc::v1; use quicprochat_rpc::error::RpcStatus; use quicprochat_rpc::method::{HandlerResult, RequestContext}; use crate::federation::address::FederatedAddress; use super::ServerState; /// Validate that the request carries a valid federation auth origin. fn validate_federation_auth(auth: &Option) -> Result { let a = auth.as_ref().ok_or_else(|| { HandlerResult::err(RpcStatus::Unauthorized, "missing federation auth") })?; if a.origin.is_empty() { return Err(HandlerResult::err( RpcStatus::Unauthorized, "federation auth origin must not be empty", )); } Ok(a.origin.clone()) } /// Relay a single message to a local recipient. /// /// This handler is called by peer servers to deliver messages to users /// homed on this server. If the recipient is not local, returns NotFound /// (the originating server should route directly to the correct home server). pub async fn handle_relay_enqueue(state: Arc, ctx: RequestContext) -> HandlerResult { let req = match v1::RelayEnqueueRequest::decode(ctx.payload) { Ok(r) => r, Err(e) => return HandlerResult::err(RpcStatus::BadRequest, &format!("decode: {e}")), }; let origin = match validate_federation_auth(&req.auth) { Ok(o) => o, Err(e) => return e, }; if req.recipient_key.len() != 32 { return HandlerResult::err(RpcStatus::BadRequest, "recipient_key must be 32 bytes"); } if req.payload.is_empty() { return HandlerResult::err(RpcStatus::BadRequest, "payload must not be empty"); } match state .store .enqueue(&req.recipient_key, &req.channel_id, req.payload, None) { Ok(seq) => { if let Some(waiter) = state.waiters.get(&req.recipient_key) { waiter.notify_waiters(); } tracing::info!( origin = %origin, recipient_prefix = %hex::encode(&req.recipient_key[..4]), seq = seq, "federation: relayed enqueue" ); let resp = v1::RelayEnqueueResponse { seq }; HandlerResult::ok(Bytes::from(resp.encode_to_vec())) } Err(e) => HandlerResult::err(RpcStatus::Internal, &format!("store error: {e}")), } } /// Relay a batch of messages to local recipients. pub async fn handle_relay_batch_enqueue( state: Arc, ctx: RequestContext, ) -> HandlerResult { let req = match v1::RelayBatchEnqueueRequest::decode(ctx.payload) { Ok(r) => r, Err(e) => return HandlerResult::err(RpcStatus::BadRequest, &format!("decode: {e}")), }; let _origin = match validate_federation_auth(&req.auth) { Ok(o) => o, Err(e) => return e, }; if req.payload.is_empty() { return HandlerResult::err(RpcStatus::BadRequest, "payload must not be empty"); } let mut seqs = Vec::with_capacity(req.recipient_keys.len()); for rk in &req.recipient_keys { if rk.len() != 32 { return HandlerResult::err( RpcStatus::BadRequest, "each recipient_key must be 32 bytes", ); } match state .store .enqueue(rk, &req.channel_id, req.payload.clone(), None) { Ok(seq) => { if let Some(waiter) = state.waiters.get(rk.as_slice()) { waiter.notify_waiters(); } seqs.push(seq); } Err(e) => { return HandlerResult::err(RpcStatus::Internal, &format!("store error: {e}")) } } } tracing::info!( recipient_count = req.recipient_keys.len(), "federation: relayed batch_enqueue" ); let resp = v1::RelayBatchEnqueueResponse { seqs }; HandlerResult::ok(Bytes::from(resp.encode_to_vec())) } /// Proxy a key package fetch from local storage. pub async fn handle_proxy_fetch_key_package( state: Arc, ctx: RequestContext, ) -> HandlerResult { let req = match v1::ProxyFetchKeyPackageRequest::decode(ctx.payload) { Ok(r) => r, Err(e) => return HandlerResult::err(RpcStatus::BadRequest, &format!("decode: {e}")), }; let _origin = match validate_federation_auth(&req.auth) { Ok(o) => o, Err(e) => return e, }; let package = match state.store.fetch_key_package(&req.identity_key) { Ok(pkg) => pkg.unwrap_or_default(), Err(e) => return HandlerResult::err(RpcStatus::Internal, &format!("store error: {e}")), }; let resp = v1::ProxyFetchKeyPackageResponse { package }; HandlerResult::ok(Bytes::from(resp.encode_to_vec())) } /// Proxy a hybrid key fetch from local storage. pub async fn handle_proxy_fetch_hybrid_key( state: Arc, ctx: RequestContext, ) -> HandlerResult { let req = match v1::ProxyFetchHybridKeyRequest::decode(ctx.payload) { Ok(r) => r, Err(e) => return HandlerResult::err(RpcStatus::BadRequest, &format!("decode: {e}")), }; let _origin = match validate_federation_auth(&req.auth) { Ok(o) => o, Err(e) => return e, }; let hybrid_public_key = match state.store.fetch_hybrid_key(&req.identity_key) { Ok(pk) => pk.unwrap_or_default(), Err(e) => return HandlerResult::err(RpcStatus::Internal, &format!("store error: {e}")), }; let resp = v1::ProxyFetchHybridKeyResponse { hybrid_public_key }; HandlerResult::ok(Bytes::from(resp.encode_to_vec())) } /// Proxy a user resolution from local storage. /// /// Supports federated `user@domain` addresses: if the domain matches the /// local server, the local user is resolved; otherwise returns empty. pub async fn handle_proxy_resolve_user( state: Arc, ctx: RequestContext, ) -> HandlerResult { let req = match v1::ProxyResolveUserRequest::decode(ctx.payload) { Ok(r) => r, Err(e) => return HandlerResult::err(RpcStatus::BadRequest, &format!("decode: {e}")), }; let _origin = match validate_federation_auth(&req.auth) { Ok(o) => o, Err(e) => return e, }; let addr = FederatedAddress::parse(&req.username); let is_local = addr.is_local(&state.local_domain); let identity_key = if is_local { match state.store.get_user_identity_key(&addr.username) { Ok(key) => key.unwrap_or_default(), Err(e) => { return HandlerResult::err(RpcStatus::Internal, &format!("store error: {e}")) } } } else { // Remote user: not on this server. Return empty. Vec::new() }; let resp = v1::ProxyResolveUserResponse { identity_key }; HandlerResult::ok(Bytes::from(resp.encode_to_vec())) } /// Federation health check — returns ok status and this server's domain. pub async fn handle_federation_health( state: Arc, _ctx: RequestContext, ) -> HandlerResult { let resp = v1::FederationHealthResponse { status: "ok".into(), server_domain: state.local_domain.clone(), }; HandlerResult::ok(Bytes::from(resp.encode_to_vec())) }