feat(federation): implement v2 inbound federation handlers
Replace stub federation handlers with full implementations that accept relay and proxy requests from peer servers. Adds federation_client and local_domain fields to ServerState for outbound relay and federated address resolution. All six handlers (relay_enqueue, relay_batch_enqueue, proxy_fetch_key_package, proxy_fetch_hybrid_key, proxy_resolve_user, federation_health) now validate federation auth, interact with local storage, and wake waiters on message delivery.
This commit is contained in:
@@ -412,6 +412,8 @@ async fn main() -> anyhow::Result<()> {
|
||||
node_id: format!("wt-{}", hex::encode(&signing_key.public_key_bytes()[..4])),
|
||||
start_time: std::time::Instant::now(),
|
||||
storage_backend: effective.store_backend.clone(),
|
||||
federation_client: None,
|
||||
local_domain: effective.federation.as_ref().map(|f| f.domain.clone()).unwrap_or_default(),
|
||||
});
|
||||
|
||||
let wt_registry = Arc::new(v2_handlers::build_registry(
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
//! Federation handlers — stubs returning Unimplemented.
|
||||
//! Federation v2 RPC handlers — relay, proxy, and health.
|
||||
//!
|
||||
//! These will be wired to actual federation logic when the federation
|
||||
//! subsystem is migrated to the v2 RPC framework.
|
||||
//! Implements the inbound side of server-to-server federation: accepts relay
|
||||
//! and proxy requests from peer servers and delegates to local storage.
|
||||
//! Outbound relay to remote peers is handled by the capnp-based
|
||||
//! `FederationClient` on the main connection path.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
@@ -11,57 +13,215 @@ use quicproquo_proto::qpq::v1;
|
||||
use quicproquo_rpc::error::RpcStatus;
|
||||
use quicproquo_rpc::method::{HandlerResult, RequestContext};
|
||||
|
||||
use crate::federation::address::FederatedAddress;
|
||||
|
||||
use super::ServerState;
|
||||
|
||||
fn unimplemented(name: &str) -> HandlerResult {
|
||||
HandlerResult::err(
|
||||
RpcStatus::Internal,
|
||||
&format!("{name}: federation not yet implemented in v2"),
|
||||
)
|
||||
/// Validate that the request carries a valid federation auth origin.
|
||||
fn validate_federation_auth(auth: &Option<v1::FederationAuth>) -> Result<String, HandlerResult> {
|
||||
let a = auth.as_ref().ok_or_else(|| {
|
||||
HandlerResult::err(RpcStatus::Unauthorized, "missing federation auth")
|
||||
})?;
|
||||
if a.origin.is_empty() {
|
||||
return Err(HandlerResult::err(
|
||||
RpcStatus::Unauthorized,
|
||||
"federation auth origin must not be empty",
|
||||
));
|
||||
}
|
||||
Ok(a.origin.clone())
|
||||
}
|
||||
|
||||
pub async fn handle_relay_enqueue(
|
||||
_state: Arc<ServerState>,
|
||||
_ctx: RequestContext,
|
||||
) -> HandlerResult {
|
||||
unimplemented("RelayEnqueue")
|
||||
}
|
||||
|
||||
pub async fn handle_relay_batch_enqueue(
|
||||
_state: Arc<ServerState>,
|
||||
_ctx: RequestContext,
|
||||
) -> HandlerResult {
|
||||
unimplemented("RelayBatchEnqueue")
|
||||
}
|
||||
|
||||
pub async fn handle_proxy_fetch_key_package(
|
||||
_state: Arc<ServerState>,
|
||||
_ctx: RequestContext,
|
||||
) -> HandlerResult {
|
||||
unimplemented("ProxyFetchKeyPackage")
|
||||
}
|
||||
|
||||
pub async fn handle_proxy_fetch_hybrid_key(
|
||||
_state: Arc<ServerState>,
|
||||
_ctx: RequestContext,
|
||||
) -> HandlerResult {
|
||||
unimplemented("ProxyFetchHybridKey")
|
||||
}
|
||||
|
||||
pub async fn handle_proxy_resolve_user(
|
||||
_state: Arc<ServerState>,
|
||||
_ctx: RequestContext,
|
||||
) -> HandlerResult {
|
||||
unimplemented("ProxyResolveUser")
|
||||
}
|
||||
|
||||
pub async fn handle_federation_health(
|
||||
_state: Arc<ServerState>,
|
||||
_ctx: RequestContext,
|
||||
) -> HandlerResult {
|
||||
let proto = v1::FederationHealthResponse {
|
||||
status: "ok".into(),
|
||||
server_domain: String::new(),
|
||||
/// Relay a single message to a local recipient.
|
||||
///
|
||||
/// This handler is called by peer servers to deliver messages to users
|
||||
/// homed on this server. If the recipient is not local, returns NotFound
|
||||
/// (the originating server should route directly to the correct home server).
|
||||
pub async fn handle_relay_enqueue(state: Arc<ServerState>, ctx: RequestContext) -> HandlerResult {
|
||||
let req = match v1::RelayEnqueueRequest::decode(ctx.payload) {
|
||||
Ok(r) => r,
|
||||
Err(e) => return HandlerResult::err(RpcStatus::BadRequest, &format!("decode: {e}")),
|
||||
};
|
||||
HandlerResult::ok(Bytes::from(proto.encode_to_vec()))
|
||||
|
||||
let origin = match validate_federation_auth(&req.auth) {
|
||||
Ok(o) => o,
|
||||
Err(e) => return e,
|
||||
};
|
||||
|
||||
if req.recipient_key.len() != 32 {
|
||||
return HandlerResult::err(RpcStatus::BadRequest, "recipient_key must be 32 bytes");
|
||||
}
|
||||
if req.payload.is_empty() {
|
||||
return HandlerResult::err(RpcStatus::BadRequest, "payload must not be empty");
|
||||
}
|
||||
|
||||
match state
|
||||
.store
|
||||
.enqueue(&req.recipient_key, &req.channel_id, req.payload, None)
|
||||
{
|
||||
Ok(seq) => {
|
||||
if let Some(waiter) = state.waiters.get(&req.recipient_key) {
|
||||
waiter.notify_waiters();
|
||||
}
|
||||
|
||||
tracing::info!(
|
||||
origin = %origin,
|
||||
recipient_prefix = %hex::encode(&req.recipient_key[..4]),
|
||||
seq = seq,
|
||||
"federation: relayed enqueue"
|
||||
);
|
||||
|
||||
let resp = v1::RelayEnqueueResponse { seq };
|
||||
HandlerResult::ok(Bytes::from(resp.encode_to_vec()))
|
||||
}
|
||||
Err(e) => HandlerResult::err(RpcStatus::Internal, &format!("store error: {e}")),
|
||||
}
|
||||
}
|
||||
|
||||
/// Relay a batch of messages to local recipients.
|
||||
pub async fn handle_relay_batch_enqueue(
|
||||
state: Arc<ServerState>,
|
||||
ctx: RequestContext,
|
||||
) -> HandlerResult {
|
||||
let req = match v1::RelayBatchEnqueueRequest::decode(ctx.payload) {
|
||||
Ok(r) => r,
|
||||
Err(e) => return HandlerResult::err(RpcStatus::BadRequest, &format!("decode: {e}")),
|
||||
};
|
||||
|
||||
let _origin = match validate_federation_auth(&req.auth) {
|
||||
Ok(o) => o,
|
||||
Err(e) => return e,
|
||||
};
|
||||
|
||||
if req.payload.is_empty() {
|
||||
return HandlerResult::err(RpcStatus::BadRequest, "payload must not be empty");
|
||||
}
|
||||
|
||||
let mut seqs = Vec::with_capacity(req.recipient_keys.len());
|
||||
for rk in &req.recipient_keys {
|
||||
if rk.len() != 32 {
|
||||
return HandlerResult::err(
|
||||
RpcStatus::BadRequest,
|
||||
"each recipient_key must be 32 bytes",
|
||||
);
|
||||
}
|
||||
match state
|
||||
.store
|
||||
.enqueue(rk, &req.channel_id, req.payload.clone(), None)
|
||||
{
|
||||
Ok(seq) => {
|
||||
if let Some(waiter) = state.waiters.get(rk.as_slice()) {
|
||||
waiter.notify_waiters();
|
||||
}
|
||||
seqs.push(seq);
|
||||
}
|
||||
Err(e) => {
|
||||
return HandlerResult::err(RpcStatus::Internal, &format!("store error: {e}"))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
tracing::info!(
|
||||
recipient_count = req.recipient_keys.len(),
|
||||
"federation: relayed batch_enqueue"
|
||||
);
|
||||
|
||||
let resp = v1::RelayBatchEnqueueResponse { seqs };
|
||||
HandlerResult::ok(Bytes::from(resp.encode_to_vec()))
|
||||
}
|
||||
|
||||
/// Proxy a key package fetch from local storage.
|
||||
pub async fn handle_proxy_fetch_key_package(
|
||||
state: Arc<ServerState>,
|
||||
ctx: RequestContext,
|
||||
) -> HandlerResult {
|
||||
let req = match v1::ProxyFetchKeyPackageRequest::decode(ctx.payload) {
|
||||
Ok(r) => r,
|
||||
Err(e) => return HandlerResult::err(RpcStatus::BadRequest, &format!("decode: {e}")),
|
||||
};
|
||||
|
||||
let _origin = match validate_federation_auth(&req.auth) {
|
||||
Ok(o) => o,
|
||||
Err(e) => return e,
|
||||
};
|
||||
|
||||
let package = match state.store.fetch_key_package(&req.identity_key) {
|
||||
Ok(pkg) => pkg.unwrap_or_default(),
|
||||
Err(e) => return HandlerResult::err(RpcStatus::Internal, &format!("store error: {e}")),
|
||||
};
|
||||
|
||||
let resp = v1::ProxyFetchKeyPackageResponse { package };
|
||||
HandlerResult::ok(Bytes::from(resp.encode_to_vec()))
|
||||
}
|
||||
|
||||
/// Proxy a hybrid key fetch from local storage.
|
||||
pub async fn handle_proxy_fetch_hybrid_key(
|
||||
state: Arc<ServerState>,
|
||||
ctx: RequestContext,
|
||||
) -> HandlerResult {
|
||||
let req = match v1::ProxyFetchHybridKeyRequest::decode(ctx.payload) {
|
||||
Ok(r) => r,
|
||||
Err(e) => return HandlerResult::err(RpcStatus::BadRequest, &format!("decode: {e}")),
|
||||
};
|
||||
|
||||
let _origin = match validate_federation_auth(&req.auth) {
|
||||
Ok(o) => o,
|
||||
Err(e) => return e,
|
||||
};
|
||||
|
||||
let hybrid_public_key = match state.store.fetch_hybrid_key(&req.identity_key) {
|
||||
Ok(pk) => pk.unwrap_or_default(),
|
||||
Err(e) => return HandlerResult::err(RpcStatus::Internal, &format!("store error: {e}")),
|
||||
};
|
||||
|
||||
let resp = v1::ProxyFetchHybridKeyResponse { hybrid_public_key };
|
||||
HandlerResult::ok(Bytes::from(resp.encode_to_vec()))
|
||||
}
|
||||
|
||||
/// Proxy a user resolution from local storage.
|
||||
///
|
||||
/// Supports federated `user@domain` addresses: if the domain matches the
|
||||
/// local server, the local user is resolved; otherwise returns empty.
|
||||
pub async fn handle_proxy_resolve_user(
|
||||
state: Arc<ServerState>,
|
||||
ctx: RequestContext,
|
||||
) -> HandlerResult {
|
||||
let req = match v1::ProxyResolveUserRequest::decode(ctx.payload) {
|
||||
Ok(r) => r,
|
||||
Err(e) => return HandlerResult::err(RpcStatus::BadRequest, &format!("decode: {e}")),
|
||||
};
|
||||
|
||||
let _origin = match validate_federation_auth(&req.auth) {
|
||||
Ok(o) => o,
|
||||
Err(e) => return e,
|
||||
};
|
||||
|
||||
let addr = FederatedAddress::parse(&req.username);
|
||||
let is_local = addr.is_local(&state.local_domain);
|
||||
|
||||
let identity_key = if is_local {
|
||||
match state.store.get_user_identity_key(&addr.username) {
|
||||
Ok(key) => key.unwrap_or_default(),
|
||||
Err(e) => {
|
||||
return HandlerResult::err(RpcStatus::Internal, &format!("store error: {e}"))
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Remote user: not on this server. Return empty.
|
||||
Vec::new()
|
||||
};
|
||||
|
||||
let resp = v1::ProxyResolveUserResponse { identity_key };
|
||||
HandlerResult::ok(Bytes::from(resp.encode_to_vec()))
|
||||
}
|
||||
|
||||
/// Federation health check — returns ok status and this server's domain.
|
||||
pub async fn handle_federation_health(
|
||||
state: Arc<ServerState>,
|
||||
_ctx: RequestContext,
|
||||
) -> HandlerResult {
|
||||
let resp = v1::FederationHealthResponse {
|
||||
status: "ok".into(),
|
||||
server_domain: state.local_domain.clone(),
|
||||
};
|
||||
HandlerResult::ok(Bytes::from(resp.encode_to_vec()))
|
||||
}
|
||||
|
||||
@@ -64,6 +64,10 @@ pub struct ServerState {
|
||||
pub start_time: std::time::Instant,
|
||||
/// Storage backend name (e.g. "sql", "file").
|
||||
pub storage_backend: String,
|
||||
/// Federation client for outbound server-to-server relay. None when federation is disabled.
|
||||
pub federation_client: Option<Arc<crate::federation::FederationClient>>,
|
||||
/// This server's domain for federation addressing. Empty when federation is disabled.
|
||||
pub local_domain: String,
|
||||
}
|
||||
|
||||
/// A ban record for a user.
|
||||
|
||||
Reference in New Issue
Block a user