feat(server): v2 RPC handler dispatch for all 33 methods

Add v2_handlers module with ServerState, build_registry(), require_auth()
helper, and 33 protobuf handlers across 10 files:
- auth: 4 OPAQUE handlers (register start/finish, login start/finish)
- delivery: 6 handlers (enqueue, fetch, fetch_wait, peek, ack, batch)
- keys: 5 handlers (upload/fetch key package, upload/fetch hybrid key/keys)
- channel: create_channel
- user: resolve_user, resolve_identity
- blob: upload_blob, download_blob
- device: register, list, revoke
- p2p: publish_endpoint, resolve_endpoint, health
- federation: 6 stubs (Unimplemented)
- account: delete_account

All handlers decode protobuf, call domain services, encode response.
Auth handlers use full OPAQUE flow with session creation.
Delivery handlers include rate limiting and long-poll (fetch_wait).
This commit is contained in:
2026-03-04 12:10:33 +01:00
parent 6273ab668d
commit d118fdbddf
13 changed files with 1695 additions and 0 deletions

View File

@@ -14,6 +14,7 @@ quicproquo-core = { path = "../quicproquo-core" }
quicproquo-proto = { path = "../quicproquo-proto" }
quicproquo-plugin-api = { path = "../quicproquo-plugin-api" }
quicproquo-kt = { path = "../quicproquo-kt" }
quicproquo-rpc = { path = "../quicproquo-rpc" }
# Dynamic plugin loading
libloading = "0.8"
@@ -21,6 +22,8 @@ libloading = "0.8"
# Serialisation + RPC
capnp = { workspace = true }
capnp-rpc = { workspace = true }
prost = { workspace = true }
bytes = { workspace = true }
# Async
tokio = { workspace = true }

View File

@@ -28,6 +28,7 @@ mod plugin_loader;
mod sql_store;
mod tls;
mod storage;
pub mod v2_handlers;
mod ws_bridge;
use auth::{AuthConfig, PendingLogin, RateEntry, SessionInfo};

View File

@@ -0,0 +1,51 @@
//! Account handler — account deletion.
use std::sync::Arc;
use bytes::Bytes;
use prost::Message;
use quicproquo_proto::qpq::v1;
use quicproquo_rpc::method::{HandlerResult, RequestContext};
use crate::domain::account::AccountService;
use super::{domain_err, require_auth, ServerState};
/// Delete the authenticated caller's account and invalidate their session.
pub async fn handle_delete_account(
    state: Arc<ServerState>,
    ctx: RequestContext,
) -> HandlerResult {
    let caller_key = match require_auth(&state, &ctx) {
        Err(denied) => return denied,
        Ok(key) => key,
    };
    // The request message carries no fields; decoding still rejects bad bytes.
    if let Err(err) = v1::DeleteAccountRequest::decode(ctx.payload) {
        return HandlerResult::err(
            quicproquo_rpc::error::RpcStatus::BadRequest,
            &format!("decode: {err}"),
        );
    }
    let service = AccountService {
        store: Arc::clone(&state.store),
        kt_log: Arc::clone(&state.kt_log),
    };
    match service.delete_account(&caller_key) {
        Err(err) => domain_err(err),
        Ok(()) => {
            // The account is gone; its session token must not remain valid.
            if let Some(token) = ctx.session_token.as_deref() {
                state.sessions.remove(token);
            }
            let reply = v1::DeleteAccountResponse { success: true };
            HandlerResult::ok(Bytes::from(reply.encode_to_vec()))
        }
    }
}

View File

@@ -0,0 +1,255 @@
//! OPAQUE auth handlers — registration and login.
use std::sync::Arc;
use bytes::Bytes;
use prost::Message;
use quicproquo_proto::qpq::v1;
use quicproquo_rpc::error::RpcStatus;
use quicproquo_rpc::method::{HandlerResult, RequestContext};
use crate::auth::{PendingLogin, SessionInfo, SESSION_TTL_SECS};
use crate::domain::auth::AuthService;
use crate::domain::types::{RegisterFinishReq, RegisterStartReq};
use super::ServerState;
/// OPAQUE registration, first round: forward the client's blinded request to
/// the auth service and return the server's registration response.
pub async fn handle_opaque_register_start(
    state: Arc<ServerState>,
    ctx: RequestContext,
) -> HandlerResult {
    let request = match v1::OpaqueRegisterStartRequest::decode(ctx.payload) {
        Err(err) => return HandlerResult::err(RpcStatus::BadRequest, &format!("decode: {err}")),
        Ok(decoded) => decoded,
    };
    if request.username.is_empty() {
        return HandlerResult::err(RpcStatus::BadRequest, "username must not be empty");
    }
    let service = AuthService {
        store: Arc::clone(&state.store),
        opaque_setup: Arc::clone(&state.opaque_setup),
        pending_logins: Arc::clone(&state.pending_logins),
        sessions: Arc::clone(&state.sessions),
        auth_cfg: Arc::clone(&state.auth_cfg),
    };
    let started = service.register_start(RegisterStartReq {
        username: request.username,
        request_bytes: request.request,
    });
    match started {
        Err(err) => HandlerResult::err(RpcStatus::Internal, &format!("register_start: {err}")),
        Ok(result) => {
            let reply = v1::OpaqueRegisterStartResponse {
                response: result.response_bytes,
            };
            HandlerResult::ok(Bytes::from(reply.encode_to_vec()))
        }
    }
}
/// OPAQUE registration, final round: store the client's registration upload
/// and notify plugins once registration has succeeded.
pub async fn handle_opaque_register_finish(
    state: Arc<ServerState>,
    ctx: RequestContext,
) -> HandlerResult {
    let request = match v1::OpaqueRegisterFinishRequest::decode(ctx.payload) {
        Err(err) => return HandlerResult::err(RpcStatus::BadRequest, &format!("decode: {err}")),
        Ok(decoded) => decoded,
    };
    if request.username.is_empty() {
        return HandlerResult::err(RpcStatus::BadRequest, "username must not be empty");
    }
    let service = AuthService {
        store: Arc::clone(&state.store),
        opaque_setup: Arc::clone(&state.opaque_setup),
        pending_logins: Arc::clone(&state.pending_logins),
        sessions: Arc::clone(&state.sessions),
        auth_cfg: Arc::clone(&state.auth_cfg),
    };
    let finished = service.register_finish(RegisterFinishReq {
        username: request.username.clone(),
        upload_bytes: request.upload,
        identity_key: request.identity_key.clone(),
    });
    match finished {
        Err(err) => HandlerResult::err(RpcStatus::Internal, &format!("register_finish: {err}")),
        Ok(result) => {
            // Fire the hook only after the service reported success.
            state
                .hooks
                .on_user_registered(&request.username, &request.identity_key);
            let reply = v1::OpaqueRegisterFinishResponse {
                success: result.success,
            };
            HandlerResult::ok(Bytes::from(reply.encode_to_vec()))
        }
    }
}
/// OPAQUE login, first round: answer the client's CredentialRequest with the
/// server's CredentialResponse and park the server-side login state in
/// `pending_logins` until the matching `handle_opaque_login_finish` arrives.
pub async fn handle_opaque_login_start(
    state: Arc<ServerState>,
    ctx: RequestContext,
) -> HandlerResult {
    let req = match v1::OpaqueLoginStartRequest::decode(ctx.payload) {
        Ok(r) => r,
        Err(e) => return HandlerResult::err(RpcStatus::BadRequest, &format!("decode: {e}")),
    };
    if req.username.is_empty() {
        return HandlerResult::err(RpcStatus::BadRequest, "username must not be empty");
    }
    // Look up user record.
    // NOTE(review): returning NotFound here reveals which usernames exist —
    // confirm this enumeration trade-off is intentional.
    let user_record = match state.store.get_user_record(&req.username) {
        Ok(Some(r)) => r,
        Ok(None) => {
            return HandlerResult::err(RpcStatus::NotFound, "user not found");
        }
        Err(e) => return HandlerResult::err(RpcStatus::Internal, &format!("store: {e}")),
    };
    // Deserialise stored registration (written at register_finish time); a
    // failure here means the stored record is corrupt, not a bad request.
    let registration =
        match opaque_ke::ServerRegistration::<quicproquo_core::opaque_auth::OpaqueSuite>::deserialize(&user_record) {
            Ok(r) => r,
            Err(e) => {
                return HandlerResult::err(
                    RpcStatus::Internal,
                    &format!("corrupt user record: {e}"),
                )
            }
        };
    // Parse the client-supplied blinded credential request; malformed bytes
    // are the client's fault, hence BadRequest.
    let credential_request =
        match opaque_ke::CredentialRequest::<quicproquo_core::opaque_auth::OpaqueSuite>::deserialize(&req.request)
        {
            Ok(r) => r,
            Err(e) => {
                return HandlerResult::err(RpcStatus::BadRequest, &format!("bad login request: {e}"))
            }
        };
    // Run the server half of the OPAQUE key exchange. The username doubles as
    // the credential identifier, and OsRng supplies the ephemeral randomness.
    let login_start = match opaque_ke::ServerLogin::<
        quicproquo_core::opaque_auth::OpaqueSuite,
    >::start(
        &mut rand::rngs::OsRng,
        &state.opaque_setup,
        Some(registration),
        credential_request,
        req.username.as_bytes(),
        Default::default(),
    ) {
        Ok(r) => r,
        Err(e) => {
            return HandlerResult::err(RpcStatus::Internal, &format!("login start: {e}"));
        }
    };
    let response_bytes = login_start.message.serialize().to_vec();
    // Store pending login state. A repeated login_start for the same username
    // overwrites the previous entry, so only the latest attempt can finish.
    let now = crate::auth::current_timestamp();
    state.pending_logins.insert(
        req.username.clone(),
        PendingLogin {
            state_bytes: login_start.state.serialize().to_vec(),
            created_at: now,
        },
    );
    let proto = v1::OpaqueLoginStartResponse {
        response: response_bytes,
    };
    HandlerResult::ok(Bytes::from(proto.encode_to_vec()))
}
/// OPAQUE login, final round: verify the client's CredentialFinalization
/// against the state parked by `handle_opaque_login_start`, then mint a
/// fresh 32-byte session token and record the session.
pub async fn handle_opaque_login_finish(
    state: Arc<ServerState>,
    ctx: RequestContext,
) -> HandlerResult {
    let req = match v1::OpaqueLoginFinishRequest::decode(ctx.payload) {
        Ok(r) => r,
        Err(e) => return HandlerResult::err(RpcStatus::BadRequest, &format!("decode: {e}")),
    };
    if req.username.is_empty() {
        return HandlerResult::err(RpcStatus::BadRequest, "username must not be empty");
    }
    // Retrieve pending login state. `remove` (not `get`) makes each
    // login_start good for exactly one finish attempt.
    // NOTE(review): pending.created_at is never checked against a TTL here —
    // confirm stale entries are purged elsewhere.
    let pending = match state.pending_logins.remove(&req.username) {
        Some((_, p)) => p,
        None => {
            return HandlerResult::err(
                RpcStatus::BadRequest,
                "no pending login for this username",
            );
        }
    };
    // Rehydrate the server-side login state serialized by login_start; a
    // failure is a server-side data problem, hence Internal.
    let login_state = match opaque_ke::ServerLogin::<
        quicproquo_core::opaque_auth::OpaqueSuite,
    >::deserialize(&pending.state_bytes)
    {
        Ok(s) => s,
        Err(e) => {
            return HandlerResult::err(
                RpcStatus::Internal,
                &format!("corrupt pending login: {e}"),
            )
        }
    };
    let finalization = match opaque_ke::CredentialFinalization::<
        quicproquo_core::opaque_auth::OpaqueSuite,
    >::deserialize(&req.finalization)
    {
        Ok(f) => f,
        Err(e) => {
            return HandlerResult::err(RpcStatus::BadRequest, &format!("bad finalization: {e}"));
        }
    };
    // A failed finish is an authentication failure: report it to plugins and
    // answer Unauthorized.
    if let Err(e) = login_state.finish(finalization, Default::default()) {
        state.hooks.on_auth(&crate::hooks::AuthEvent {
            username: req.username.clone(),
            success: false,
            failure_reason: format!("{e}"),
        });
        return HandlerResult::err(RpcStatus::Unauthorized, &format!("login failed: {e}"));
    }
    // Generate session token: 32 random bytes from the OS CSPRNG.
    let mut token = vec![0u8; 32];
    rand::RngCore::fill_bytes(&mut rand::rngs::OsRng, &mut token);
    let now = crate::auth::current_timestamp();
    // NOTE(review): the session's identity_key is taken verbatim from the
    // request; nothing in this handler proves the caller controls that key —
    // confirm the binding is enforced elsewhere (e.g. at registration).
    state.sessions.insert(
        token.clone(),
        SessionInfo {
            username: req.username.clone(),
            identity_key: req.identity_key.clone(),
            created_at: now,
            expires_at: now + SESSION_TTL_SECS,
        },
    );
    state.hooks.on_auth(&crate::hooks::AuthEvent {
        username: req.username,
        success: true,
        failure_reason: String::new(),
    });
    let proto = v1::OpaqueLoginFinishResponse {
        session_token: token,
    };
    HandlerResult::ok(Bytes::from(proto.encode_to_vec()))
}

View File

@@ -0,0 +1,101 @@
//! Blob handlers — chunked file upload/download.
use std::sync::Arc;
use bytes::Bytes;
use prost::Message;
use quicproquo_proto::qpq::v1;
use quicproquo_rpc::method::{HandlerResult, RequestContext};
use crate::domain::blobs::BlobService;
use crate::domain::types::{CallerAuth, DownloadBlobReq, UploadBlobReq};
use super::{domain_err, require_auth, ServerState};
/// Wrap an already-verified identity key as a CallerAuth for the blob
/// service; the session token was consumed by require_auth, so token and
/// device_id are left empty here.
fn caller_auth(identity_key: Vec<u8>) -> CallerAuth {
    CallerAuth {
        device_id: None,
        token: Vec::new(),
        identity_key,
    }
}
/// Accept one chunk of a blob upload from an authenticated caller and return
/// the blob id assigned by the blob service.
pub async fn handle_upload_blob(state: Arc<ServerState>, ctx: RequestContext) -> HandlerResult {
    let caller_key = match require_auth(&state, &ctx) {
        Err(denied) => return denied,
        Ok(key) => key,
    };
    let request = match v1::UploadBlobRequest::decode(ctx.payload) {
        Err(err) => {
            return HandlerResult::err(
                quicproquo_rpc::error::RpcStatus::BadRequest,
                &format!("decode: {err}"),
            )
        }
        Ok(decoded) => decoded,
    };
    // Blobs are persisted under the server's data directory.
    let service = BlobService {
        data_dir: state.data_dir.clone(),
    };
    let uploaded = service.upload_blob(
        UploadBlobReq {
            blob_hash: request.blob_hash,
            chunk: request.chunk,
            offset: request.offset,
            total_size: request.total_size,
            mime_type: request.mime_type,
        },
        &caller_auth(caller_key),
    );
    match uploaded {
        Err(err) => domain_err(err),
        Ok(result) => {
            let reply = v1::UploadBlobResponse {
                blob_id: result.blob_id,
            };
            HandlerResult::ok(Bytes::from(reply.encode_to_vec()))
        }
    }
}
/// Serve one chunk of a stored blob to an authenticated caller.
pub async fn handle_download_blob(state: Arc<ServerState>, ctx: RequestContext) -> HandlerResult {
    let caller_key = match require_auth(&state, &ctx) {
        Err(denied) => return denied,
        Ok(key) => key,
    };
    let request = match v1::DownloadBlobRequest::decode(ctx.payload) {
        Err(err) => {
            return HandlerResult::err(
                quicproquo_rpc::error::RpcStatus::BadRequest,
                &format!("decode: {err}"),
            )
        }
        Ok(decoded) => decoded,
    };
    let service = BlobService {
        data_dir: state.data_dir.clone(),
    };
    let downloaded = service.download_blob(
        DownloadBlobReq {
            blob_id: request.blob_id,
            offset: request.offset,
            length: request.length,
        },
        &caller_auth(caller_key),
    );
    match downloaded {
        Err(err) => domain_err(err),
        Ok(result) => {
            let reply = v1::DownloadBlobResponse {
                chunk: result.chunk,
                total_size: result.total_size,
                mime_type: result.mime_type,
            };
            HandlerResult::ok(Bytes::from(reply.encode_to_vec()))
        }
    }
}

View File

@@ -0,0 +1,60 @@
//! Channel handler — 1:1 DM channel creation.
use std::sync::Arc;
use bytes::Bytes;
use prost::Message;
use quicproquo_proto::qpq::v1;
use quicproquo_rpc::method::{HandlerResult, RequestContext};
use crate::domain::channels::ChannelService;
use crate::domain::types::CreateChannelReq;
use crate::hooks::ChannelEvent;
use super::{domain_err, require_auth, ServerState};
/// Create (or return the existing) 1:1 channel between the caller and a peer,
/// and let plugins observe the result.
pub async fn handle_create_channel(
    state: Arc<ServerState>,
    ctx: RequestContext,
) -> HandlerResult {
    let caller_key = match require_auth(&state, &ctx) {
        Err(denied) => return denied,
        Ok(key) => key,
    };
    let request = match v1::CreateChannelRequest::decode(ctx.payload) {
        Err(err) => {
            return HandlerResult::err(
                quicproquo_rpc::error::RpcStatus::BadRequest,
                &format!("decode: {err}"),
            )
        }
        Ok(decoded) => decoded,
    };
    let service = ChannelService {
        store: Arc::clone(&state.store),
    };
    let created = service.create_channel(
        CreateChannelReq {
            peer_key: request.peer_key.clone(),
        },
        &caller_key,
    );
    match created {
        Err(err) => domain_err(err),
        Ok(result) => {
            // The hook fires for both freshly-created and pre-existing
            // channels; was_new distinguishes them.
            state.hooks.on_channel_created(&ChannelEvent {
                channel_id: result.channel_id.clone(),
                initiator_key: caller_key,
                peer_key: request.peer_key,
                was_new: result.was_new,
            });
            let reply = v1::CreateChannelResponse {
                channel_id: result.channel_id,
                was_new: result.was_new,
            };
            HandlerResult::ok(Bytes::from(reply.encode_to_vec()))
        }
    }
}

View File

@@ -0,0 +1,329 @@
//! Delivery handlers — enqueue, fetch, fetch_wait, peek, ack, batch_enqueue.
use std::sync::Arc;
use bytes::Bytes;
use prost::Message;
use quicproquo_proto::qpq::v1;
use quicproquo_rpc::error::RpcStatus;
use quicproquo_rpc::method::{HandlerResult, RequestContext};
use tokio::sync::Notify;
use crate::domain::delivery::DeliveryService;
use crate::domain::types::{AckReq, BatchEnqueueReq, EnqueueReq, FetchReq, PeekReq};
use crate::hooks::{HookAction, MessageEvent};
use super::{require_auth, ServerState};
/// Enqueue one message for a recipient, enforcing a per-sender rate limit and
/// giving plugins a chance to observe (or reject) the message via the hook.
pub async fn handle_enqueue(state: Arc<ServerState>, ctx: RequestContext) -> HandlerResult {
    let identity_key = match require_auth(&state, &ctx) {
        Ok(ik) => ik,
        Err(e) => return e,
    };
    let req = match v1::EnqueueRequest::decode(ctx.payload) {
        Ok(r) => r,
        Err(e) => return HandlerResult::err(RpcStatus::BadRequest, &format!("decode: {e}")),
    };
    if req.recipient_key.is_empty() || req.payload.is_empty() {
        return HandlerResult::err(RpcStatus::BadRequest, "recipient_key and payload required");
    }
    // Rate limiting, keyed by the authenticated sender's identity key.
    if let Err(_e) = crate::auth::check_rate_limit(&state.rate_limits, &identity_key) {
        return HandlerResult::err(RpcStatus::RateLimited, "rate limit exceeded");
    }
    let svc = DeliveryService {
        store: Arc::clone(&state.store),
        waiters: Arc::clone(&state.waiters),
    };
    let domain_req = EnqueueReq {
        recipient_key: req.recipient_key.clone(),
        payload: req.payload.clone(),
        channel_id: req.channel_id.clone(),
        ttl_secs: req.ttl_secs,
    };
    match svc.enqueue(domain_req) {
        Ok(resp) => {
            // Fire hook. NOTE(review): the hook runs *after* the message has
            // been enqueued (it needs resp.seq), so HookAction::Reject returns
            // Forbidden to the sender but does not remove the already-stored
            // message — confirm whether rejected messages should be unqueued.
            let action = state.hooks.on_message_enqueue(&MessageEvent {
                sender_identity: Some(identity_key),
                recipient_key: req.recipient_key,
                channel_id: req.channel_id,
                payload_len: req.payload.len(),
                seq: resp.seq,
            });
            if let HookAction::Reject(reason) = action {
                return HandlerResult::err(RpcStatus::Forbidden, &reason);
            }
            let proto = v1::EnqueueResponse {
                seq: resp.seq,
                delivery_proof: resp.delivery_proof,
            };
            HandlerResult::ok(Bytes::from(proto.encode_to_vec()))
        }
        Err(e) => HandlerResult::err(RpcStatus::Internal, &format!("enqueue: {e}")),
    }
}
/// Fetch queued envelopes for a recipient queue.
pub async fn handle_fetch(state: Arc<ServerState>, ctx: RequestContext) -> HandlerResult {
    let caller_key = match require_auth(&state, &ctx) {
        Err(denied) => return denied,
        Ok(key) => key,
    };
    let request = match v1::FetchRequest::decode(ctx.payload) {
        Err(err) => return HandlerResult::err(RpcStatus::BadRequest, &format!("decode: {err}")),
        Ok(decoded) => decoded,
    };
    // An empty recipient_key defaults to the caller's own queue.
    let recipient_key = if request.recipient_key.is_empty() {
        caller_key
    } else {
        request.recipient_key
    };
    let service = DeliveryService {
        store: Arc::clone(&state.store),
        waiters: Arc::clone(&state.waiters),
    };
    let fetched = service.fetch(FetchReq {
        recipient_key,
        channel_id: request.channel_id,
        limit: request.limit,
    });
    match fetched {
        Err(err) => HandlerResult::err(RpcStatus::Internal, &format!("fetch: {err}")),
        Ok(result) => {
            let envelopes = result
                .payloads
                .into_iter()
                .map(|entry| v1::Envelope {
                    seq: entry.seq,
                    data: entry.data,
                })
                .collect();
            let reply = v1::FetchResponse {
                payloads: envelopes,
            };
            HandlerResult::ok(Bytes::from(reply.encode_to_vec()))
        }
    }
}
/// Long-poll fetch: return queued envelopes immediately if any exist,
/// otherwise wait (bounded by timeout_ms) for an enqueue notification and
/// fetch again. An empty second fetch is a valid answer.
pub async fn handle_fetch_wait(state: Arc<ServerState>, ctx: RequestContext) -> HandlerResult {
    let identity_key = match require_auth(&state, &ctx) {
        Ok(ik) => ik,
        Err(e) => return e,
    };
    let req = match v1::FetchWaitRequest::decode(ctx.payload) {
        Ok(r) => r,
        Err(e) => return HandlerResult::err(RpcStatus::BadRequest, &format!("decode: {e}")),
    };
    // An empty recipient_key defaults to the caller's own queue.
    let recipient_key = if req.recipient_key.is_empty() {
        identity_key
    } else {
        req.recipient_key
    };
    // Default to 30s and cap at 60s so a client cannot pin a task forever.
    let timeout_ms = if req.timeout_ms == 0 {
        30_000
    } else {
        req.timeout_ms.min(60_000)
    };
    let svc = DeliveryService {
        store: Arc::clone(&state.store),
        waiters: Arc::clone(&state.waiters),
    };
    // Register the waiter BEFORE the initial fetch. Registering afterwards
    // leaves a window where a message enqueued between the fetch and the wait
    // notifies nobody, forcing this call to sleep out the full timeout.
    let notify = state
        .waiters
        .entry(recipient_key.clone())
        .or_insert_with(|| Arc::new(Notify::new()))
        .clone();
    let notified = notify.notified();
    tokio::pin!(notified);
    // enable() queues this waiter now (without blocking), so notifications
    // issued from here on are not lost even via notify_waiters().
    notified.as_mut().enable();
    // Try immediate fetch first.
    let fetch_req = FetchReq {
        recipient_key: recipient_key.clone(),
        channel_id: req.channel_id.clone(),
        limit: req.limit,
    };
    match svc.fetch(fetch_req) {
        Ok(resp) if !resp.payloads.is_empty() => {
            let proto = v1::FetchWaitResponse {
                payloads: resp
                    .payloads
                    .into_iter()
                    .map(|e| v1::Envelope {
                        seq: e.seq,
                        data: e.data,
                    })
                    .collect(),
            };
            return HandlerResult::ok(Bytes::from(proto.encode_to_vec()));
        }
        Err(e) => {
            return HandlerResult::err(RpcStatus::Internal, &format!("fetch: {e}"));
        }
        _ => {}
    }
    // Long-poll: wait for a notification or the timeout, whichever is first.
    let timeout = tokio::time::Duration::from_millis(timeout_ms);
    let _ = tokio::time::timeout(timeout, notified).await;
    // Re-fetch after wake or timeout.
    let fetch_req = FetchReq {
        recipient_key,
        channel_id: req.channel_id,
        limit: req.limit,
    };
    match svc.fetch(fetch_req) {
        Ok(resp) => {
            let proto = v1::FetchWaitResponse {
                payloads: resp
                    .payloads
                    .into_iter()
                    .map(|e| v1::Envelope {
                        seq: e.seq,
                        data: e.data,
                    })
                    .collect(),
            };
            HandlerResult::ok(Bytes::from(proto.encode_to_vec()))
        }
        Err(e) => HandlerResult::err(RpcStatus::Internal, &format!("fetch: {e}")),
    }
}
/// Peek at queued envelopes for a recipient queue via DeliveryService::peek
/// (same wire shape as fetch, different service call).
pub async fn handle_peek(state: Arc<ServerState>, ctx: RequestContext) -> HandlerResult {
    let caller_key = match require_auth(&state, &ctx) {
        Err(denied) => return denied,
        Ok(key) => key,
    };
    let request = match v1::PeekRequest::decode(ctx.payload) {
        Err(err) => return HandlerResult::err(RpcStatus::BadRequest, &format!("decode: {err}")),
        Ok(decoded) => decoded,
    };
    // An empty recipient_key defaults to the caller's own queue.
    let recipient_key = if request.recipient_key.is_empty() {
        caller_key
    } else {
        request.recipient_key
    };
    let service = DeliveryService {
        store: Arc::clone(&state.store),
        waiters: Arc::clone(&state.waiters),
    };
    let peeked = service.peek(PeekReq {
        recipient_key,
        channel_id: request.channel_id,
        limit: request.limit,
    });
    match peeked {
        Err(err) => HandlerResult::err(RpcStatus::Internal, &format!("peek: {err}")),
        Ok(result) => {
            let envelopes = result
                .payloads
                .into_iter()
                .map(|entry| v1::Envelope {
                    seq: entry.seq,
                    data: entry.data,
                })
                .collect();
            let reply = v1::PeekResponse {
                payloads: envelopes,
            };
            HandlerResult::ok(Bytes::from(reply.encode_to_vec()))
        }
    }
}
/// Acknowledge messages up to `seq_up_to` on a recipient queue.
pub async fn handle_ack(state: Arc<ServerState>, ctx: RequestContext) -> HandlerResult {
    let caller_key = match require_auth(&state, &ctx) {
        Err(denied) => return denied,
        Ok(key) => key,
    };
    let request = match v1::AckRequest::decode(ctx.payload) {
        Err(err) => return HandlerResult::err(RpcStatus::BadRequest, &format!("decode: {err}")),
        Ok(decoded) => decoded,
    };
    // An empty recipient_key defaults to the caller's own queue.
    let recipient_key = if request.recipient_key.is_empty() {
        caller_key
    } else {
        request.recipient_key
    };
    let service = DeliveryService {
        store: Arc::clone(&state.store),
        waiters: Arc::clone(&state.waiters),
    };
    let acked = service.ack(AckReq {
        recipient_key,
        channel_id: request.channel_id,
        seq_up_to: request.seq_up_to,
    });
    match acked {
        Err(err) => HandlerResult::err(RpcStatus::Internal, &format!("ack: {err}")),
        Ok(()) => HandlerResult::ok(Bytes::from(v1::AckResponse {}.encode_to_vec())),
    }
}
/// Enqueue the same payload for many recipients in a single call.
pub async fn handle_batch_enqueue(state: Arc<ServerState>, ctx: RequestContext) -> HandlerResult {
    let caller_key = match require_auth(&state, &ctx) {
        Err(denied) => return denied,
        Ok(key) => key,
    };
    let request = match v1::BatchEnqueueRequest::decode(ctx.payload) {
        Err(err) => return HandlerResult::err(RpcStatus::BadRequest, &format!("decode: {err}")),
        Ok(decoded) => decoded,
    };
    if request.recipient_keys.is_empty() || request.payload.is_empty() {
        return HandlerResult::err(
            RpcStatus::BadRequest,
            "recipient_keys and payload required",
        );
    }
    // One rate-limit charge per batch, keyed by the authenticated sender.
    if crate::auth::check_rate_limit(&state.rate_limits, &caller_key).is_err() {
        return HandlerResult::err(RpcStatus::RateLimited, "rate limit exceeded");
    }
    let service = DeliveryService {
        store: Arc::clone(&state.store),
        waiters: Arc::clone(&state.waiters),
    };
    let enqueued = service.batch_enqueue(BatchEnqueueReq {
        recipient_keys: request.recipient_keys,
        payload: request.payload,
        channel_id: request.channel_id,
        ttl_secs: request.ttl_secs,
    });
    match enqueued {
        Err(err) => HandlerResult::err(RpcStatus::Internal, &format!("batch_enqueue: {err}")),
        Ok(result) => {
            let reply = v1::BatchEnqueueResponse { seqs: result.seqs };
            HandlerResult::ok(Bytes::from(reply.encode_to_vec()))
        }
    }
}

View File

@@ -0,0 +1,127 @@
//! Device handlers — register, list, revoke devices.
use std::sync::Arc;
use bytes::Bytes;
use prost::Message;
use quicproquo_proto::qpq::v1;
use quicproquo_rpc::method::{HandlerResult, RequestContext};
use crate::domain::devices::DeviceService;
use crate::domain::types::{RegisterDeviceReq, RevokeDeviceReq};
use super::{domain_err, require_auth, ServerState};
/// Register a device under the authenticated caller's identity.
pub async fn handle_register_device(
    state: Arc<ServerState>,
    ctx: RequestContext,
) -> HandlerResult {
    let caller_key = match require_auth(&state, &ctx) {
        Err(denied) => return denied,
        Ok(key) => key,
    };
    let request = match v1::RegisterDeviceRequest::decode(ctx.payload) {
        Err(err) => {
            return HandlerResult::err(
                quicproquo_rpc::error::RpcStatus::BadRequest,
                &format!("decode: {err}"),
            )
        }
        Ok(decoded) => decoded,
    };
    let service = DeviceService {
        store: Arc::clone(&state.store),
    };
    let registered = service.register_device(
        RegisterDeviceReq {
            device_id: request.device_id,
            device_name: request.device_name,
        },
        &caller_key,
    );
    match registered {
        Err(err) => domain_err(err),
        Ok(result) => {
            let reply = v1::RegisterDeviceResponse {
                success: result.success,
            };
            HandlerResult::ok(Bytes::from(reply.encode_to_vec()))
        }
    }
}
/// List all devices registered under the authenticated caller's identity.
pub async fn handle_list_devices(state: Arc<ServerState>, ctx: RequestContext) -> HandlerResult {
    let caller_key = match require_auth(&state, &ctx) {
        Err(denied) => return denied,
        Ok(key) => key,
    };
    // The request message has no fields; decoding still rejects bad bytes.
    if let Err(err) = v1::ListDevicesRequest::decode(ctx.payload) {
        return HandlerResult::err(
            quicproquo_rpc::error::RpcStatus::BadRequest,
            &format!("decode: {err}"),
        );
    }
    let service = DeviceService {
        store: Arc::clone(&state.store),
    };
    match service.list_devices(&caller_key) {
        Err(err) => domain_err(err),
        Ok(result) => {
            let devices = result
                .devices
                .into_iter()
                .map(|device| v1::Device {
                    device_id: device.device_id,
                    device_name: device.device_name,
                    registered_at: device.registered_at,
                })
                .collect();
            let reply = v1::ListDevicesResponse { devices };
            HandlerResult::ok(Bytes::from(reply.encode_to_vec()))
        }
    }
}
/// Revoke one of the authenticated caller's registered devices.
pub async fn handle_revoke_device(state: Arc<ServerState>, ctx: RequestContext) -> HandlerResult {
    let caller_key = match require_auth(&state, &ctx) {
        Err(denied) => return denied,
        Ok(key) => key,
    };
    let request = match v1::RevokeDeviceRequest::decode(ctx.payload) {
        Err(err) => {
            return HandlerResult::err(
                quicproquo_rpc::error::RpcStatus::BadRequest,
                &format!("decode: {err}"),
            )
        }
        Ok(decoded) => decoded,
    };
    let service = DeviceService {
        store: Arc::clone(&state.store),
    };
    let revoked = service.revoke_device(
        RevokeDeviceReq {
            device_id: request.device_id,
        },
        &caller_key,
    );
    match revoked {
        Err(err) => domain_err(err),
        Ok(result) => {
            let reply = v1::RevokeDeviceResponse {
                success: result.success,
            };
            HandlerResult::ok(Bytes::from(reply.encode_to_vec()))
        }
    }
}

View File

@@ -0,0 +1,67 @@
//! Federation handlers — stubs returning Unimplemented.
//!
//! These will be wired to actual federation logic when the federation
//! subsystem is migrated to the v2 RPC framework.
use std::sync::Arc;
use bytes::Bytes;
use prost::Message;
use quicproquo_proto::qpq::v1;
use quicproquo_rpc::error::RpcStatus;
use quicproquo_rpc::method::{HandlerResult, RequestContext};
use super::ServerState;
/// Build the shared "not yet ported to v2" error for federation stubs.
fn unimplemented(name: &str) -> HandlerResult {
    let detail = format!("{name}: federation not yet implemented in v2");
    HandlerResult::err(RpcStatus::Internal, &detail)
}

/// Stub — not yet ported to v2.
pub async fn handle_relay_enqueue(
    _state: Arc<ServerState>,
    _ctx: RequestContext,
) -> HandlerResult {
    unimplemented("RelayEnqueue")
}

/// Stub — not yet ported to v2.
pub async fn handle_relay_batch_enqueue(
    _state: Arc<ServerState>,
    _ctx: RequestContext,
) -> HandlerResult {
    unimplemented("RelayBatchEnqueue")
}

/// Stub — not yet ported to v2.
pub async fn handle_proxy_fetch_key_package(
    _state: Arc<ServerState>,
    _ctx: RequestContext,
) -> HandlerResult {
    unimplemented("ProxyFetchKeyPackage")
}

/// Stub — not yet ported to v2.
pub async fn handle_proxy_fetch_hybrid_key(
    _state: Arc<ServerState>,
    _ctx: RequestContext,
) -> HandlerResult {
    unimplemented("ProxyFetchHybridKey")
}

/// Stub — not yet ported to v2.
pub async fn handle_proxy_resolve_user(
    _state: Arc<ServerState>,
    _ctx: RequestContext,
) -> HandlerResult {
    unimplemented("ProxyResolveUser")
}

/// The only live federation method: answers "ok" with an empty domain until
/// the federation subsystem is migrated to the v2 framework.
pub async fn handle_federation_health(
    _state: Arc<ServerState>,
    _ctx: RequestContext,
) -> HandlerResult {
    let reply = v1::FederationHealthResponse {
        status: "ok".into(),
        server_domain: String::new(),
    };
    HandlerResult::ok(Bytes::from(reply.encode_to_vec()))
}

View File

@@ -0,0 +1,217 @@
//! Key management handlers — KeyPackage and hybrid key operations.
use std::sync::Arc;
use bytes::Bytes;
use prost::Message;
use quicproquo_proto::qpq::v1;
use quicproquo_rpc::method::{HandlerResult, RequestContext};
use crate::domain::keys::KeyService;
use crate::domain::types::{
CallerAuth, FetchHybridKeyReq, FetchHybridKeysReq, FetchKeyPackageReq, UploadHybridKeyReq,
UploadKeyPackageReq,
};
use super::{domain_err, require_auth, ServerState};
/// Wrap an already-verified identity key as a CallerAuth for the key
/// service; require_auth has consumed the session token, so token and
/// device_id stay empty.
fn caller_auth(identity_key: Vec<u8>) -> CallerAuth {
    CallerAuth {
        device_id: None,
        token: Vec::new(),
        identity_key,
    }
}
/// Store an uploaded KeyPackage and return its fingerprint.
pub async fn handle_upload_key_package(
    state: Arc<ServerState>,
    ctx: RequestContext,
) -> HandlerResult {
    let caller_key = match require_auth(&state, &ctx) {
        Err(denied) => return denied,
        Ok(key) => key,
    };
    let request = match v1::UploadKeyPackageRequest::decode(ctx.payload) {
        Err(err) => {
            return HandlerResult::err(
                quicproquo_rpc::error::RpcStatus::BadRequest,
                &format!("decode: {err}"),
            )
        }
        Ok(decoded) => decoded,
    };
    let service = KeyService {
        store: Arc::clone(&state.store),
    };
    let uploaded = service.upload_key_package(
        UploadKeyPackageReq {
            identity_key: request.identity_key,
            package: request.package,
        },
        &caller_auth(caller_key),
    );
    match uploaded {
        Err(err) => domain_err(err),
        Ok(result) => {
            let reply = v1::UploadKeyPackageResponse {
                fingerprint: result.fingerprint,
            };
            HandlerResult::ok(Bytes::from(reply.encode_to_vec()))
        }
    }
}
/// Fetch the stored KeyPackage for a given identity key.
pub async fn handle_fetch_key_package(
    state: Arc<ServerState>,
    ctx: RequestContext,
) -> HandlerResult {
    let caller_key = match require_auth(&state, &ctx) {
        Err(denied) => return denied,
        Ok(key) => key,
    };
    let request = match v1::FetchKeyPackageRequest::decode(ctx.payload) {
        Err(err) => {
            return HandlerResult::err(
                quicproquo_rpc::error::RpcStatus::BadRequest,
                &format!("decode: {err}"),
            )
        }
        Ok(decoded) => decoded,
    };
    let service = KeyService {
        store: Arc::clone(&state.store),
    };
    let fetched = service.fetch_key_package(
        FetchKeyPackageReq {
            identity_key: request.identity_key,
        },
        &caller_auth(caller_key),
    );
    match fetched {
        Err(err) => domain_err(err),
        Ok(result) => {
            let reply = v1::FetchKeyPackageResponse {
                package: result.package,
            };
            HandlerResult::ok(Bytes::from(reply.encode_to_vec()))
        }
    }
}
/// Store a hybrid public key for an identity.
pub async fn handle_upload_hybrid_key(
    state: Arc<ServerState>,
    ctx: RequestContext,
) -> HandlerResult {
    let caller_key = match require_auth(&state, &ctx) {
        Err(denied) => return denied,
        Ok(key) => key,
    };
    let request = match v1::UploadHybridKeyRequest::decode(ctx.payload) {
        Err(err) => {
            return HandlerResult::err(
                quicproquo_rpc::error::RpcStatus::BadRequest,
                &format!("decode: {err}"),
            )
        }
        Ok(decoded) => decoded,
    };
    let service = KeyService {
        store: Arc::clone(&state.store),
    };
    let uploaded = service.upload_hybrid_key(
        UploadHybridKeyReq {
            identity_key: request.identity_key,
            hybrid_public_key: request.hybrid_public_key,
        },
        &caller_auth(caller_key),
    );
    match uploaded {
        Err(err) => domain_err(err),
        Ok(()) => {
            HandlerResult::ok(Bytes::from(v1::UploadHybridKeyResponse {}.encode_to_vec()))
        }
    }
}
/// Fetch the hybrid public key stored for one identity.
pub async fn handle_fetch_hybrid_key(
    state: Arc<ServerState>,
    ctx: RequestContext,
) -> HandlerResult {
    let caller_key = match require_auth(&state, &ctx) {
        Err(denied) => return denied,
        Ok(key) => key,
    };
    let request = match v1::FetchHybridKeyRequest::decode(ctx.payload) {
        Err(err) => {
            return HandlerResult::err(
                quicproquo_rpc::error::RpcStatus::BadRequest,
                &format!("decode: {err}"),
            )
        }
        Ok(decoded) => decoded,
    };
    let service = KeyService {
        store: Arc::clone(&state.store),
    };
    let fetched = service.fetch_hybrid_key(
        FetchHybridKeyReq {
            identity_key: request.identity_key,
        },
        &caller_auth(caller_key),
    );
    match fetched {
        Err(err) => domain_err(err),
        Ok(result) => {
            let reply = v1::FetchHybridKeyResponse {
                hybrid_public_key: result.hybrid_public_key,
            };
            HandlerResult::ok(Bytes::from(reply.encode_to_vec()))
        }
    }
}
/// Fetch the hybrid public keys for a batch of identities in one call.
pub async fn handle_fetch_hybrid_keys(
    state: Arc<ServerState>,
    ctx: RequestContext,
) -> HandlerResult {
    let caller_key = match require_auth(&state, &ctx) {
        Err(denied) => return denied,
        Ok(key) => key,
    };
    let request = match v1::FetchHybridKeysRequest::decode(ctx.payload) {
        Err(err) => {
            return HandlerResult::err(
                quicproquo_rpc::error::RpcStatus::BadRequest,
                &format!("decode: {err}"),
            )
        }
        Ok(decoded) => decoded,
    };
    let service = KeyService {
        store: Arc::clone(&state.store),
    };
    let fetched = service.fetch_hybrid_keys(
        FetchHybridKeysReq {
            identity_keys: request.identity_keys,
        },
        &caller_auth(caller_key),
    );
    match fetched {
        Err(err) => domain_err(err),
        Ok(result) => {
            let reply = v1::FetchHybridKeysResponse { keys: result.keys };
            HandlerResult::ok(Bytes::from(reply.encode_to_vec()))
        }
    }
}

View File

@@ -0,0 +1,287 @@
//! v2 RPC handler dispatch — protobuf in, domain logic, protobuf out.
use std::path::PathBuf;
use std::sync::Arc;
use dashmap::DashMap;
use opaque_ke::ServerSetup;
use quicproquo_core::opaque_auth::OpaqueSuite;
use quicproquo_proto::method_ids;
use quicproquo_rpc::error::RpcStatus;
use quicproquo_rpc::method::{HandlerResult, MethodRegistry, RequestContext};
use tokio::sync::Notify;
use crate::auth::{AuthConfig, PendingLogin, RateEntry, SessionInfo};
use crate::hooks::ServerHooks;
use crate::storage::Store;
pub mod account;
pub mod auth;
pub mod blob;
pub mod channel;
pub mod delivery;
pub mod device;
pub mod federation;
pub mod keys;
pub mod p2p;
pub mod user;
/// Shared server state accessible by all v2 RPC handlers.
pub struct ServerState {
    /// Persistent storage backend shared with the domain services.
    pub store: Arc<dyn Store>,
    /// Per-recipient wakeups used by the fetch_wait long-poll path.
    pub waiters: Arc<DashMap<Vec<u8>, Arc<Notify>>>,
    /// Auth configuration (includes the insecure dev-mode identity fallback
    /// consulted by `require_auth`).
    pub auth_cfg: Arc<AuthConfig>,
    /// OPAQUE server setup used by the registration and login handlers.
    pub opaque_setup: Arc<ServerSetup<OpaqueSuite>>,
    /// In-flight OPAQUE logins keyed by username, awaiting login_finish.
    pub pending_logins: Arc<DashMap<String, PendingLogin>>,
    /// Active sessions keyed by the 32-byte session token.
    pub sessions: Arc<DashMap<Vec<u8>, SessionInfo>>,
    /// Per-identity rate-limit state consulted by the enqueue handlers.
    pub rate_limits: Arc<DashMap<Vec<u8>, RateEntry>>,
    /// Sealed-sender mode flag — TODO(review): confirm exact semantics; no
    /// v2 handler in this module reads it yet.
    pub sealed_sender: bool,
    /// Plugin hook dispatch (auth, registration, message, channel events).
    pub hooks: Arc<dyn ServerHooks>,
    /// Server identity keypair — presumably for signing server artifacts;
    /// confirm against the handlers that use it.
    pub signing_key: Arc<quicproquo_core::IdentityKeypair>,
    /// Key-transparency Merkle log, shared with AccountService for deletion.
    pub kt_log: Arc<std::sync::Mutex<quicproquo_kt::MerkleLog>>,
    /// Root directory for blob storage (used by BlobService).
    pub data_dir: PathBuf,
    /// When set, redact sensitive values from logs — TODO(review): confirm
    /// which consumers honour this.
    pub redact_logs: bool,
}
/// Validate the session token from the request context and return the
/// authenticated caller's identity key. Returns an Unauthorized HandlerResult
/// on failure.
pub fn require_auth(state: &ServerState, ctx: &RequestContext) -> Result<Vec<u8>, HandlerResult> {
    // Candidate token: the explicit session token, falling back to the raw
    // identity_key field.
    // NOTE(review): treating ctx.identity_key as a session-token candidate is
    // surprising — confirm this is intentional transport mapping.
    let token = ctx
        .session_token
        .as_deref()
        .or(ctx.identity_key.as_deref())
        .unwrap_or(&[]);
    if token.is_empty() {
        return Err(HandlerResult::err(
            RpcStatus::Unauthorized,
            "missing session token",
        ));
    }
    // Check session store: the token must exist, be unexpired, and carry a
    // non-empty identity key.
    // NOTE(review): expired entries are left in the map here — confirm that
    // something else evicts them.
    if let Some(session) = state.sessions.get(token) {
        let now = crate::auth::current_timestamp();
        if session.expires_at > now && !session.identity_key.is_empty() {
            return Ok(session.identity_key.clone());
        }
    }
    // Dev-mode fallback: when explicitly enabled in config, trust the
    // caller-supplied identity key without any session check.
    if state.auth_cfg.allow_insecure_identity_from_request {
        if let Some(ik) = ctx.identity_key.as_deref() {
            if !ik.is_empty() {
                return Ok(ik.to_vec());
            }
        }
    }
    Err(HandlerResult::err(
        RpcStatus::Unauthorized,
        "invalid or expired session token",
    ))
}
/// Map a domain error onto the matching RPC status, carrying the error's
/// Display text as the message.
pub fn domain_err(e: crate::domain::types::DomainError) -> HandlerResult {
    use crate::domain::types::DomainError;
    // Classify first, then build the result once.
    let status = match &e {
        // Caller-supplied data was malformed or out of bounds.
        DomainError::InvalidIdentityKey(_)
        | DomainError::EmptyPackage
        | DomainError::EmptyHybridKey
        | DomainError::EmptyUsername
        | DomainError::BlobHashLength(_)
        | DomainError::BadParams(_)
        | DomainError::PackageTooLarge(_)
        | DomainError::BlobTooLarge(_)
        | DomainError::BlobHashMismatch => RpcStatus::BadRequest,
        // The referenced resource does not exist.
        DomainError::BlobNotFound | DomainError::DeviceNotFound => RpcStatus::NotFound,
        // The caller is over a policy limit.
        DomainError::DeviceLimit(_) => RpcStatus::Forbidden,
        // Server-side failures.
        DomainError::Io(_) | DomainError::Storage(_) => RpcStatus::Internal,
    };
    HandlerResult::err(status, &e.to_string())
}
/// Build the v2 method registry with all 33 handlers registered.
pub fn build_registry() -> MethodRegistry<ServerState> {
let mut reg = MethodRegistry::new();
// Auth (100-103)
reg.register(
method_ids::OPAQUE_REGISTER_START,
"OpaqueRegisterStart",
auth::handle_opaque_register_start,
);
reg.register(
method_ids::OPAQUE_REGISTER_FINISH,
"OpaqueRegisterFinish",
auth::handle_opaque_register_finish,
);
reg.register(
method_ids::OPAQUE_LOGIN_START,
"OpaqueLoginStart",
auth::handle_opaque_login_start,
);
reg.register(
method_ids::OPAQUE_LOGIN_FINISH,
"OpaqueLoginFinish",
auth::handle_opaque_login_finish,
);
// Delivery (200-205)
reg.register(method_ids::ENQUEUE, "Enqueue", delivery::handle_enqueue);
reg.register(method_ids::FETCH, "Fetch", delivery::handle_fetch);
reg.register(
method_ids::FETCH_WAIT,
"FetchWait",
delivery::handle_fetch_wait,
);
reg.register(method_ids::PEEK, "Peek", delivery::handle_peek);
reg.register(method_ids::ACK, "Ack", delivery::handle_ack);
reg.register(
method_ids::BATCH_ENQUEUE,
"BatchEnqueue",
delivery::handle_batch_enqueue,
);
// Keys (300-304)
reg.register(
method_ids::UPLOAD_KEY_PACKAGE,
"UploadKeyPackage",
keys::handle_upload_key_package,
);
reg.register(
method_ids::FETCH_KEY_PACKAGE,
"FetchKeyPackage",
keys::handle_fetch_key_package,
);
reg.register(
method_ids::UPLOAD_HYBRID_KEY,
"UploadHybridKey",
keys::handle_upload_hybrid_key,
);
reg.register(
method_ids::FETCH_HYBRID_KEY,
"FetchHybridKey",
keys::handle_fetch_hybrid_key,
);
reg.register(
method_ids::FETCH_HYBRID_KEYS,
"FetchHybridKeys",
keys::handle_fetch_hybrid_keys,
);
// Channel (400)
reg.register(
method_ids::CREATE_CHANNEL,
"CreateChannel",
channel::handle_create_channel,
);
// User (500-501)
reg.register(
method_ids::RESOLVE_USER,
"ResolveUser",
user::handle_resolve_user,
);
reg.register(
method_ids::RESOLVE_IDENTITY,
"ResolveIdentity",
user::handle_resolve_identity,
);
// Blob (600-601)
reg.register(
method_ids::UPLOAD_BLOB,
"UploadBlob",
blob::handle_upload_blob,
);
reg.register(
method_ids::DOWNLOAD_BLOB,
"DownloadBlob",
blob::handle_download_blob,
);
// Device (700-702)
reg.register(
method_ids::REGISTER_DEVICE,
"RegisterDevice",
device::handle_register_device,
);
reg.register(
method_ids::LIST_DEVICES,
"ListDevices",
device::handle_list_devices,
);
reg.register(
method_ids::REVOKE_DEVICE,
"RevokeDevice",
device::handle_revoke_device,
);
// P2P (800-802)
reg.register(
method_ids::PUBLISH_ENDPOINT,
"PublishEndpoint",
p2p::handle_publish_endpoint,
);
reg.register(
method_ids::RESOLVE_ENDPOINT,
"ResolveEndpoint",
p2p::handle_resolve_endpoint,
);
reg.register(method_ids::HEALTH, "Health", p2p::handle_health);
// Federation (900-905)
reg.register(
method_ids::RELAY_ENQUEUE,
"RelayEnqueue",
federation::handle_relay_enqueue,
);
reg.register(
method_ids::RELAY_BATCH_ENQUEUE,
"RelayBatchEnqueue",
federation::handle_relay_batch_enqueue,
);
reg.register(
method_ids::PROXY_FETCH_KEY_PACKAGE,
"ProxyFetchKeyPackage",
federation::handle_proxy_fetch_key_package,
);
reg.register(
method_ids::PROXY_FETCH_HYBRID_KEY,
"ProxyFetchHybridKey",
federation::handle_proxy_fetch_hybrid_key,
);
reg.register(
method_ids::PROXY_RESOLVE_USER,
"ProxyResolveUser",
federation::handle_proxy_resolve_user,
);
reg.register(
method_ids::FEDERATION_HEALTH,
"FederationHealth",
federation::handle_federation_health,
);
// Account (950)
reg.register(
method_ids::DELETE_ACCOUNT,
"DeleteAccount",
account::handle_delete_account,
);
reg
}

View File

@@ -0,0 +1,108 @@
//! P2P handlers — publish/resolve endpoints and health check.
use std::sync::Arc;
use bytes::Bytes;
use prost::Message;
use quicproquo_proto::qpq::v1;
use quicproquo_rpc::method::{HandlerResult, RequestContext};
use crate::domain::p2p::P2pService;
use crate::domain::types::{CallerAuth, PublishEndpointReq, ResolveEndpointReq};
use super::{domain_err, require_auth, ServerState};
/// Wrap an authenticated identity key in the domain-layer `CallerAuth`
/// shape. The p2p handlers carry no bearer token and no device id, so those
/// fields are left empty/absent.
fn caller_auth(identity_key: Vec<u8>) -> CallerAuth {
    let (token, device_id) = (Vec::new(), None);
    CallerAuth {
        identity_key,
        token,
        device_id,
    }
}
/// `PublishEndpoint` (800): record a P2P node address for an identity key.
///
/// NOTE(review): the published `identity_key` comes from the request payload
/// while the caller is authenticated separately; the domain service is
/// presumably responsible for checking they match — confirm in
/// `P2pService::publish_endpoint`.
pub async fn handle_publish_endpoint(
    state: Arc<ServerState>,
    ctx: RequestContext,
) -> HandlerResult {
    let caller = match require_auth(&state, &ctx) {
        Err(denied) => return denied,
        Ok(ik) => ik,
    };
    let req = match v1::PublishEndpointRequest::decode(ctx.payload) {
        Err(e) => {
            return HandlerResult::err(
                quicproquo_rpc::error::RpcStatus::BadRequest,
                &format!("decode: {e}"),
            )
        }
        Ok(r) => r,
    };
    let service = P2pService {
        store: Arc::clone(&state.store),
    };
    let outcome = service.publish_endpoint(
        PublishEndpointReq {
            identity_key: req.identity_key,
            node_addr: req.node_addr,
        },
        &caller_auth(caller),
    );
    match outcome {
        // Success carries no data: an empty response message.
        Ok(()) => HandlerResult::ok(Bytes::from(v1::PublishEndpointResponse {}.encode_to_vec())),
        Err(e) => domain_err(e),
    }
}
/// `ResolveEndpoint` (801): look up the published P2P node address for an
/// identity key. Requires an authenticated caller.
pub async fn handle_resolve_endpoint(
    state: Arc<ServerState>,
    ctx: RequestContext,
) -> HandlerResult {
    let caller = match require_auth(&state, &ctx) {
        Err(denied) => return denied,
        Ok(ik) => ik,
    };
    let req = match v1::ResolveEndpointRequest::decode(ctx.payload) {
        Err(e) => {
            return HandlerResult::err(
                quicproquo_rpc::error::RpcStatus::BadRequest,
                &format!("decode: {e}"),
            )
        }
        Ok(r) => r,
    };
    let service = P2pService {
        store: Arc::clone(&state.store),
    };
    let outcome = service.resolve_endpoint(
        ResolveEndpointReq {
            identity_key: req.identity_key,
        },
        &caller_auth(caller),
    );
    match outcome {
        Ok(found) => {
            let body = v1::ResolveEndpointResponse {
                node_addr: found.node_addr,
            }
            .encode_to_vec();
            HandlerResult::ok(Bytes::from(body))
        }
        Err(e) => domain_err(e),
    }
}
/// `Health` (802): liveness probe. Note: performs no auth check and reads no
/// state — it always reports "ok".
pub async fn handle_health(
    _state: Arc<ServerState>,
    _ctx: RequestContext,
) -> HandlerResult {
    let body = v1::HealthResponse {
        status: String::from("ok"),
    }
    .encode_to_vec();
    HandlerResult::ok(Bytes::from(body))
}

View File

@@ -0,0 +1,89 @@
//! User resolution handlers — username <-> identity key lookups.
use std::sync::Arc;
use bytes::Bytes;
use prost::Message;
use quicproquo_proto::qpq::v1;
use quicproquo_rpc::method::{HandlerResult, RequestContext};
use crate::domain::types::{ResolveIdentityReq, ResolveUserReq};
use crate::domain::users::UserService;
use super::{domain_err, require_auth, ServerState};
/// `ResolveUser` (500): map a username to its identity key, together with a
/// key-transparency inclusion proof. The caller must be authenticated, but
/// the resolved identity is independent of who asks.
pub async fn handle_resolve_user(state: Arc<ServerState>, ctx: RequestContext) -> HandlerResult {
    // Auth gate only; the caller's own identity key is not used further.
    if let Err(denied) = require_auth(&state, &ctx) {
        return denied;
    }
    let req = match v1::ResolveUserRequest::decode(ctx.payload) {
        Err(e) => {
            return HandlerResult::err(
                quicproquo_rpc::error::RpcStatus::BadRequest,
                &format!("decode: {e}"),
            )
        }
        Ok(r) => r,
    };
    let service = UserService {
        store: Arc::clone(&state.store),
        kt_log: Arc::clone(&state.kt_log),
    };
    let lookup = ResolveUserReq {
        username: req.username,
    };
    match service.resolve_user(lookup) {
        Ok(found) => {
            let body = v1::ResolveUserResponse {
                identity_key: found.identity_key,
                inclusion_proof: found.inclusion_proof,
            }
            .encode_to_vec();
            HandlerResult::ok(Bytes::from(body))
        }
        Err(e) => domain_err(e),
    }
}
/// `ResolveIdentity` (501): reverse lookup — map an identity key back to its
/// username. The caller must be authenticated.
pub async fn handle_resolve_identity(
    state: Arc<ServerState>,
    ctx: RequestContext,
) -> HandlerResult {
    // Auth gate only; the caller's own identity key is not used further.
    if let Err(denied) = require_auth(&state, &ctx) {
        return denied;
    }
    let req = match v1::ResolveIdentityRequest::decode(ctx.payload) {
        Err(e) => {
            return HandlerResult::err(
                quicproquo_rpc::error::RpcStatus::BadRequest,
                &format!("decode: {e}"),
            )
        }
        Ok(r) => r,
    };
    let service = UserService {
        store: Arc::clone(&state.store),
        kt_log: Arc::clone(&state.kt_log),
    };
    let lookup = ResolveIdentityReq {
        identity_key: req.identity_key,
    };
    match service.resolve_identity(lookup) {
        Ok(found) => {
            let body = v1::ResolveIdentityResponse {
                username: found.username,
            }
            .encode_to_vec();
            HandlerResult::ok(Bytes::from(body))
        }
        Err(e) => domain_err(e),
    }
}