diff --git a/crates/quicproquo-server/Cargo.toml b/crates/quicproquo-server/Cargo.toml index 47b2c6a..3c139f7 100644 --- a/crates/quicproquo-server/Cargo.toml +++ b/crates/quicproquo-server/Cargo.toml @@ -14,6 +14,7 @@ quicproquo-core = { path = "../quicproquo-core" } quicproquo-proto = { path = "../quicproquo-proto" } quicproquo-plugin-api = { path = "../quicproquo-plugin-api" } quicproquo-kt = { path = "../quicproquo-kt" } +quicproquo-rpc = { path = "../quicproquo-rpc" } # Dynamic plugin loading libloading = "0.8" @@ -21,6 +22,8 @@ libloading = "0.8" # Serialisation + RPC capnp = { workspace = true } capnp-rpc = { workspace = true } +prost = { workspace = true } +bytes = { workspace = true } # Async tokio = { workspace = true } diff --git a/crates/quicproquo-server/src/main.rs b/crates/quicproquo-server/src/main.rs index 3c1edec..cf5d360 100644 --- a/crates/quicproquo-server/src/main.rs +++ b/crates/quicproquo-server/src/main.rs @@ -28,6 +28,7 @@ mod plugin_loader; mod sql_store; mod tls; mod storage; +pub mod v2_handlers; mod ws_bridge; use auth::{AuthConfig, PendingLogin, RateEntry, SessionInfo}; diff --git a/crates/quicproquo-server/src/v2_handlers/account.rs b/crates/quicproquo-server/src/v2_handlers/account.rs new file mode 100644 index 0000000..96e36ec --- /dev/null +++ b/crates/quicproquo-server/src/v2_handlers/account.rs @@ -0,0 +1,51 @@ +//! Account handler — account deletion. + +use std::sync::Arc; + +use bytes::Bytes; +use prost::Message; +use quicproquo_proto::qpq::v1; +use quicproquo_rpc::method::{HandlerResult, RequestContext}; + +use crate::domain::account::AccountService; + +use super::{domain_err, require_auth, ServerState}; + +pub async fn handle_delete_account( + state: Arc, + ctx: RequestContext, +) -> HandlerResult { + let identity_key = match require_auth(&state, &ctx) { + Ok(ik) => ik, + Err(e) => return e, + }; + + // DeleteAccountRequest is empty but decode for protocol correctness. + let _req = match v1::DeleteAccountRequest::decode(ctx.payload) { + Ok(r) => r, + Err(e) => { + return HandlerResult::err( + quicproquo_rpc::error::RpcStatus::BadRequest, + &format!("decode: {e}"), + ) + } + }; + + let svc = AccountService { + store: Arc::clone(&state.store), + kt_log: Arc::clone(&state.kt_log), + }; + + match svc.delete_account(&identity_key) { + Ok(()) => { + // Remove session for the deleted account. + if let Some(token) = ctx.session_token.as_deref() { + state.sessions.remove(token); + } + + let proto = v1::DeleteAccountResponse { success: true }; + HandlerResult::ok(Bytes::from(proto.encode_to_vec())) + } + Err(e) => domain_err(e), + } +} diff --git a/crates/quicproquo-server/src/v2_handlers/auth.rs b/crates/quicproquo-server/src/v2_handlers/auth.rs new file mode 100644 index 0000000..72992af --- /dev/null +++ b/crates/quicproquo-server/src/v2_handlers/auth.rs @@ -0,0 +1,255 @@ +//! OPAQUE auth handlers — registration and login. + +use std::sync::Arc; + +use bytes::Bytes; +use prost::Message; +use quicproquo_proto::qpq::v1; +use quicproquo_rpc::error::RpcStatus; +use quicproquo_rpc::method::{HandlerResult, RequestContext}; + +use crate::auth::{PendingLogin, SessionInfo, SESSION_TTL_SECS}; +use crate::domain::auth::AuthService; +use crate::domain::types::{RegisterFinishReq, RegisterStartReq}; + +use super::ServerState; + +pub async fn handle_opaque_register_start( + state: Arc, + ctx: RequestContext, +) -> HandlerResult { + let req = match v1::OpaqueRegisterStartRequest::decode(ctx.payload) { + Ok(r) => r, + Err(e) => return HandlerResult::err(RpcStatus::BadRequest, &format!("decode: {e}")), + }; + + if req.username.is_empty() { + return HandlerResult::err(RpcStatus::BadRequest, "username must not be empty"); + } + + let svc = AuthService { + store: Arc::clone(&state.store), + opaque_setup: Arc::clone(&state.opaque_setup), + pending_logins: Arc::clone(&state.pending_logins), + sessions: Arc::clone(&state.sessions), + auth_cfg: Arc::clone(&state.auth_cfg), + }; + + let domain_req = RegisterStartReq { + username: req.username, + request_bytes: req.request, + }; + + match svc.register_start(domain_req) { + Ok(resp) => { + let proto = v1::OpaqueRegisterStartResponse { + response: resp.response_bytes, + }; + HandlerResult::ok(Bytes::from(proto.encode_to_vec())) + } + Err(e) => HandlerResult::err(RpcStatus::Internal, &format!("register_start: {e}")), + } +} + +pub async fn handle_opaque_register_finish( + state: Arc, + ctx: RequestContext, +) -> HandlerResult { + let req = match v1::OpaqueRegisterFinishRequest::decode(ctx.payload) { + Ok(r) => r, + Err(e) => return HandlerResult::err(RpcStatus::BadRequest, &format!("decode: {e}")), + }; + + if req.username.is_empty() { + return HandlerResult::err(RpcStatus::BadRequest, "username must not be empty"); + } + + let svc = AuthService { + store: Arc::clone(&state.store), + opaque_setup: Arc::clone(&state.opaque_setup), + pending_logins: Arc::clone(&state.pending_logins), + sessions: Arc::clone(&state.sessions), + auth_cfg: Arc::clone(&state.auth_cfg), + }; + + let domain_req = RegisterFinishReq { + username: req.username.clone(), + upload_bytes: req.upload, + identity_key: req.identity_key.clone(), + }; + + match svc.register_finish(domain_req) { + Ok(resp) => { + state + .hooks + .on_user_registered(&req.username, &req.identity_key); + + let proto = v1::OpaqueRegisterFinishResponse { + success: resp.success, + }; + HandlerResult::ok(Bytes::from(proto.encode_to_vec())) + } + Err(e) => HandlerResult::err(RpcStatus::Internal, &format!("register_finish: {e}")), + } +} + +pub async fn handle_opaque_login_start( + state: Arc, + ctx: RequestContext, +) -> HandlerResult { + let req = match v1::OpaqueLoginStartRequest::decode(ctx.payload) { + Ok(r) => r, + Err(e) => return HandlerResult::err(RpcStatus::BadRequest, &format!("decode: {e}")), + }; + + if req.username.is_empty() { + return HandlerResult::err(RpcStatus::BadRequest, "username must not be empty"); + } + + // Look up user record. + let user_record = match state.store.get_user_record(&req.username) { + Ok(Some(r)) => r, + Ok(None) => { + return HandlerResult::err(RpcStatus::NotFound, "user not found"); + } + Err(e) => return HandlerResult::err(RpcStatus::Internal, &format!("store: {e}")), + }; + + // Deserialise stored registration. + let registration = + match opaque_ke::ServerRegistration::::deserialize(&user_record) { + Ok(r) => r, + Err(e) => { + return HandlerResult::err( + RpcStatus::Internal, + &format!("corrupt user record: {e}"), + ) + } + }; + + // Start login. + let credential_request = + match opaque_ke::CredentialRequest::::deserialize(&req.request) + { + Ok(r) => r, + Err(e) => { + return HandlerResult::err(RpcStatus::BadRequest, &format!("bad login request: {e}")) + } + }; + + let login_start = match opaque_ke::ServerLogin::< + quicproquo_core::opaque_auth::OpaqueSuite, + >::start( + &mut rand::rngs::OsRng, + &state.opaque_setup, + Some(registration), + credential_request, + req.username.as_bytes(), + Default::default(), + ) { + Ok(r) => r, + Err(e) => { + return HandlerResult::err(RpcStatus::Internal, &format!("login start: {e}")); + } + }; + + let response_bytes = login_start.message.serialize().to_vec(); + + // Store pending login state. + let now = crate::auth::current_timestamp(); + state.pending_logins.insert( + req.username.clone(), + PendingLogin { + state_bytes: login_start.state.serialize().to_vec(), + created_at: now, + }, + ); + + let proto = v1::OpaqueLoginStartResponse { + response: response_bytes, + }; + HandlerResult::ok(Bytes::from(proto.encode_to_vec())) +} + +pub async fn handle_opaque_login_finish( + state: Arc, + ctx: RequestContext, +) -> HandlerResult { + let req = match v1::OpaqueLoginFinishRequest::decode(ctx.payload) { + Ok(r) => r, + Err(e) => return HandlerResult::err(RpcStatus::BadRequest, &format!("decode: {e}")), + }; + + if req.username.is_empty() { + return HandlerResult::err(RpcStatus::BadRequest, "username must not be empty"); + } + + // Retrieve pending login state. + let pending = match state.pending_logins.remove(&req.username) { + Some((_, p)) => p, + None => { + return HandlerResult::err( + RpcStatus::BadRequest, + "no pending login for this username", + ); + } + }; + + let login_state = match opaque_ke::ServerLogin::< + quicproquo_core::opaque_auth::OpaqueSuite, + >::deserialize(&pending.state_bytes) + { + Ok(s) => s, + Err(e) => { + return HandlerResult::err( + RpcStatus::Internal, + &format!("corrupt pending login: {e}"), + ) + } + }; + + let finalization = match opaque_ke::CredentialFinalization::< + quicproquo_core::opaque_auth::OpaqueSuite, + >::deserialize(&req.finalization) + { + Ok(f) => f, + Err(e) => { + return HandlerResult::err(RpcStatus::BadRequest, &format!("bad finalization: {e}")); + } + }; + + if let Err(e) = login_state.finish(finalization, Default::default()) { + state.hooks.on_auth(&crate::hooks::AuthEvent { + username: req.username.clone(), + success: false, + failure_reason: format!("{e}"), + }); + return HandlerResult::err(RpcStatus::Unauthorized, &format!("login failed: {e}")); + } + + // Generate session token. + let mut token = vec![0u8; 32]; + rand::RngCore::fill_bytes(&mut rand::rngs::OsRng, &mut token); + + let now = crate::auth::current_timestamp(); + state.sessions.insert( + token.clone(), + SessionInfo { + username: req.username.clone(), + identity_key: req.identity_key.clone(), + created_at: now, + expires_at: now + SESSION_TTL_SECS, + }, + ); + + state.hooks.on_auth(&crate::hooks::AuthEvent { + username: req.username, + success: true, + failure_reason: String::new(), + }); + + let proto = v1::OpaqueLoginFinishResponse { + session_token: token, + }; + HandlerResult::ok(Bytes::from(proto.encode_to_vec())) +} diff --git a/crates/quicproquo-server/src/v2_handlers/blob.rs b/crates/quicproquo-server/src/v2_handlers/blob.rs new file mode 100644 index 0000000..f0deb64 --- /dev/null +++ b/crates/quicproquo-server/src/v2_handlers/blob.rs @@ -0,0 +1,101 @@ +//! Blob handlers — chunked file upload/download. + +use std::sync::Arc; + +use bytes::Bytes; +use prost::Message; +use quicproquo_proto::qpq::v1; +use quicproquo_rpc::method::{HandlerResult, RequestContext}; + +use crate::domain::blobs::BlobService; +use crate::domain::types::{CallerAuth, DownloadBlobReq, UploadBlobReq}; + +use super::{domain_err, require_auth, ServerState}; + +fn caller_auth(identity_key: Vec) -> CallerAuth { + CallerAuth { + identity_key, + token: Vec::new(), + device_id: None, + } +} + +pub async fn handle_upload_blob(state: Arc, ctx: RequestContext) -> HandlerResult { + let identity_key = match require_auth(&state, &ctx) { + Ok(ik) => ik, + Err(e) => return e, + }; + + let req = match v1::UploadBlobRequest::decode(ctx.payload) { + Ok(r) => r, + Err(e) => { + return HandlerResult::err( + quicproquo_rpc::error::RpcStatus::BadRequest, + &format!("decode: {e}"), + ) + } + }; + + let svc = BlobService { + data_dir: state.data_dir.clone(), + }; + let auth = caller_auth(identity_key); + + let domain_req = UploadBlobReq { + blob_hash: req.blob_hash, + chunk: req.chunk, + offset: req.offset, + total_size: req.total_size, + mime_type: req.mime_type, + }; + + match svc.upload_blob(domain_req, &auth) { + Ok(resp) => { + let proto = v1::UploadBlobResponse { + blob_id: resp.blob_id, + }; + HandlerResult::ok(Bytes::from(proto.encode_to_vec())) + } + Err(e) => domain_err(e), + } +} + +pub async fn handle_download_blob(state: Arc, ctx: RequestContext) -> HandlerResult { + let identity_key = match require_auth(&state, &ctx) { + Ok(ik) => ik, + Err(e) => return e, + }; + + let req = match v1::DownloadBlobRequest::decode(ctx.payload) { + Ok(r) => r, + Err(e) => { + return HandlerResult::err( + quicproquo_rpc::error::RpcStatus::BadRequest, + &format!("decode: {e}"), + ) + } + }; + + let svc = BlobService { + data_dir: state.data_dir.clone(), + }; + let auth = caller_auth(identity_key); + + let domain_req = DownloadBlobReq { + blob_id: req.blob_id, + offset: req.offset, + length: req.length, + }; + + match svc.download_blob(domain_req, &auth) { + Ok(resp) => { + let proto = v1::DownloadBlobResponse { + chunk: resp.chunk, + total_size: resp.total_size, + mime_type: resp.mime_type, + }; + HandlerResult::ok(Bytes::from(proto.encode_to_vec())) + } + Err(e) => domain_err(e), + } +} diff --git a/crates/quicproquo-server/src/v2_handlers/channel.rs b/crates/quicproquo-server/src/v2_handlers/channel.rs new file mode 100644 index 0000000..f26d4ef --- /dev/null +++ b/crates/quicproquo-server/src/v2_handlers/channel.rs @@ -0,0 +1,60 @@ +//! Channel handler — 1:1 DM channel creation. + +use std::sync::Arc; + +use bytes::Bytes; +use prost::Message; +use quicproquo_proto::qpq::v1; +use quicproquo_rpc::method::{HandlerResult, RequestContext}; + +use crate::domain::channels::ChannelService; +use crate::domain::types::CreateChannelReq; +use crate::hooks::ChannelEvent; + +use super::{domain_err, require_auth, ServerState}; + +pub async fn handle_create_channel( + state: Arc, + ctx: RequestContext, +) -> HandlerResult { + let identity_key = match require_auth(&state, &ctx) { + Ok(ik) => ik, + Err(e) => return e, + }; + + let req = match v1::CreateChannelRequest::decode(ctx.payload) { + Ok(r) => r, + Err(e) => { + return HandlerResult::err( + quicproquo_rpc::error::RpcStatus::BadRequest, + &format!("decode: {e}"), + ) + } + }; + + let svc = ChannelService { + store: Arc::clone(&state.store), + }; + + let domain_req = CreateChannelReq { + peer_key: req.peer_key.clone(), + }; + + match svc.create_channel(domain_req, &identity_key) { + Ok(resp) => { + state.hooks.on_channel_created(&ChannelEvent { + channel_id: resp.channel_id.clone(), + initiator_key: identity_key, + peer_key: req.peer_key, + was_new: resp.was_new, + }); + + let proto = v1::CreateChannelResponse { + channel_id: resp.channel_id, + was_new: resp.was_new, + }; + HandlerResult::ok(Bytes::from(proto.encode_to_vec())) + } + Err(e) => domain_err(e), + } +} diff --git a/crates/quicproquo-server/src/v2_handlers/delivery.rs b/crates/quicproquo-server/src/v2_handlers/delivery.rs new file mode 100644 index 0000000..c9323d6 --- /dev/null +++ b/crates/quicproquo-server/src/v2_handlers/delivery.rs @@ -0,0 +1,329 @@ +//! Delivery handlers — enqueue, fetch, fetch_wait, peek, ack, batch_enqueue. + +use std::sync::Arc; + +use bytes::Bytes; +use prost::Message; +use quicproquo_proto::qpq::v1; +use quicproquo_rpc::error::RpcStatus; +use quicproquo_rpc::method::{HandlerResult, RequestContext}; +use tokio::sync::Notify; + +use crate::domain::delivery::DeliveryService; +use crate::domain::types::{AckReq, BatchEnqueueReq, EnqueueReq, FetchReq, PeekReq}; +use crate::hooks::{HookAction, MessageEvent}; + +use super::{require_auth, ServerState}; + +pub async fn handle_enqueue(state: Arc, ctx: RequestContext) -> HandlerResult { + let identity_key = match require_auth(&state, &ctx) { + Ok(ik) => ik, + Err(e) => return e, + }; + + let req = match v1::EnqueueRequest::decode(ctx.payload) { + Ok(r) => r, + Err(e) => return HandlerResult::err(RpcStatus::BadRequest, &format!("decode: {e}")), + }; + + if req.recipient_key.is_empty() || req.payload.is_empty() { + return HandlerResult::err(RpcStatus::BadRequest, "recipient_key and payload required"); + } + + // Rate limiting. + if let Err(_e) = crate::auth::check_rate_limit(&state.rate_limits, &identity_key) { + return HandlerResult::err(RpcStatus::RateLimited, "rate limit exceeded"); + } + + let svc = DeliveryService { + store: Arc::clone(&state.store), + waiters: Arc::clone(&state.waiters), + }; + + let domain_req = EnqueueReq { + recipient_key: req.recipient_key.clone(), + payload: req.payload.clone(), + channel_id: req.channel_id.clone(), + ttl_secs: req.ttl_secs, + }; + + match svc.enqueue(domain_req) { + Ok(resp) => { + // Fire hook. + let action = state.hooks.on_message_enqueue(&MessageEvent { + sender_identity: Some(identity_key), + recipient_key: req.recipient_key, + channel_id: req.channel_id, + payload_len: req.payload.len(), + seq: resp.seq, + }); + if let HookAction::Reject(reason) = action { + return HandlerResult::err(RpcStatus::Forbidden, &reason); + } + + let proto = v1::EnqueueResponse { + seq: resp.seq, + delivery_proof: resp.delivery_proof, + }; + HandlerResult::ok(Bytes::from(proto.encode_to_vec())) + } + Err(e) => HandlerResult::err(RpcStatus::Internal, &format!("enqueue: {e}")), + } +} + +pub async fn handle_fetch(state: Arc, ctx: RequestContext) -> HandlerResult { + let identity_key = match require_auth(&state, &ctx) { + Ok(ik) => ik, + Err(e) => return e, + }; + + let req = match v1::FetchRequest::decode(ctx.payload) { + Ok(r) => r, + Err(e) => return HandlerResult::err(RpcStatus::BadRequest, &format!("decode: {e}")), + }; + + let svc = DeliveryService { + store: Arc::clone(&state.store), + waiters: Arc::clone(&state.waiters), + }; + + let domain_req = FetchReq { + recipient_key: if req.recipient_key.is_empty() { + identity_key + } else { + req.recipient_key + }, + channel_id: req.channel_id, + limit: req.limit, + }; + + match svc.fetch(domain_req) { + Ok(resp) => { + let proto = v1::FetchResponse { + payloads: resp + .payloads + .into_iter() + .map(|e| v1::Envelope { + seq: e.seq, + data: e.data, + }) + .collect(), + }; + HandlerResult::ok(Bytes::from(proto.encode_to_vec())) + } + Err(e) => HandlerResult::err(RpcStatus::Internal, &format!("fetch: {e}")), + } +} + +pub async fn handle_fetch_wait(state: Arc, ctx: RequestContext) -> HandlerResult { + let identity_key = match require_auth(&state, &ctx) { + Ok(ik) => ik, + Err(e) => return e, + }; + + let req = match v1::FetchWaitRequest::decode(ctx.payload) { + Ok(r) => r, + Err(e) => return HandlerResult::err(RpcStatus::BadRequest, &format!("decode: {e}")), + }; + + let recipient_key = if req.recipient_key.is_empty() { + identity_key + } else { + req.recipient_key + }; + + let timeout_ms = if req.timeout_ms == 0 { + 30_000 + } else { + req.timeout_ms.min(60_000) + }; + + let svc = DeliveryService { + store: Arc::clone(&state.store), + waiters: Arc::clone(&state.waiters), + }; + + // Try immediate fetch first. + let fetch_req = FetchReq { + recipient_key: recipient_key.clone(), + channel_id: req.channel_id.clone(), + limit: req.limit, + }; + + match svc.fetch(fetch_req) { + Ok(resp) if !resp.payloads.is_empty() => { + let proto = v1::FetchWaitResponse { + payloads: resp + .payloads + .into_iter() + .map(|e| v1::Envelope { + seq: e.seq, + data: e.data, + }) + .collect(), + }; + return HandlerResult::ok(Bytes::from(proto.encode_to_vec())); + } + Err(e) => { + return HandlerResult::err(RpcStatus::Internal, &format!("fetch: {e}")); + } + _ => {} + } + + // Long-poll: wait for notification or timeout. + let notify = state + .waiters + .entry(recipient_key.clone()) + .or_insert_with(|| Arc::new(Notify::new())) + .clone(); + + let timeout = tokio::time::Duration::from_millis(timeout_ms); + let _ = tokio::time::timeout(timeout, notify.notified()).await; + + // Re-fetch after wake or timeout. + let fetch_req = FetchReq { + recipient_key, + channel_id: req.channel_id, + limit: req.limit, + }; + + match svc.fetch(fetch_req) { + Ok(resp) => { + let proto = v1::FetchWaitResponse { + payloads: resp + .payloads + .into_iter() + .map(|e| v1::Envelope { + seq: e.seq, + data: e.data, + }) + .collect(), + }; + HandlerResult::ok(Bytes::from(proto.encode_to_vec())) + } + Err(e) => HandlerResult::err(RpcStatus::Internal, &format!("fetch: {e}")), + } +} + +pub async fn handle_peek(state: Arc, ctx: RequestContext) -> HandlerResult { + let identity_key = match require_auth(&state, &ctx) { + Ok(ik) => ik, + Err(e) => return e, + }; + + let req = match v1::PeekRequest::decode(ctx.payload) { + Ok(r) => r, + Err(e) => return HandlerResult::err(RpcStatus::BadRequest, &format!("decode: {e}")), + }; + + let svc = DeliveryService { + store: Arc::clone(&state.store), + waiters: Arc::clone(&state.waiters), + }; + + let domain_req = PeekReq { + recipient_key: if req.recipient_key.is_empty() { + identity_key + } else { + req.recipient_key + }, + channel_id: req.channel_id, + limit: req.limit, + }; + + match svc.peek(domain_req) { + Ok(resp) => { + let proto = v1::PeekResponse { + payloads: resp + .payloads + .into_iter() + .map(|e| v1::Envelope { + seq: e.seq, + data: e.data, + }) + .collect(), + }; + HandlerResult::ok(Bytes::from(proto.encode_to_vec())) + } + Err(e) => HandlerResult::err(RpcStatus::Internal, &format!("peek: {e}")), + } +} + +pub async fn handle_ack(state: Arc, ctx: RequestContext) -> HandlerResult { + let identity_key = match require_auth(&state, &ctx) { + Ok(ik) => ik, + Err(e) => return e, + }; + + let req = match v1::AckRequest::decode(ctx.payload) { + Ok(r) => r, + Err(e) => return HandlerResult::err(RpcStatus::BadRequest, &format!("decode: {e}")), + }; + + let svc = DeliveryService { + store: Arc::clone(&state.store), + waiters: Arc::clone(&state.waiters), + }; + + let domain_req = AckReq { + recipient_key: if req.recipient_key.is_empty() { + identity_key + } else { + req.recipient_key + }, + channel_id: req.channel_id, + seq_up_to: req.seq_up_to, + }; + + match svc.ack(domain_req) { + Ok(()) => { + let proto = v1::AckResponse {}; + HandlerResult::ok(Bytes::from(proto.encode_to_vec())) + } + Err(e) => HandlerResult::err(RpcStatus::Internal, &format!("ack: {e}")), + } +} + +pub async fn handle_batch_enqueue(state: Arc, ctx: RequestContext) -> HandlerResult { + let identity_key = match require_auth(&state, &ctx) { + Ok(ik) => ik, + Err(e) => return e, + }; + + let req = match v1::BatchEnqueueRequest::decode(ctx.payload) { + Ok(r) => r, + Err(e) => return HandlerResult::err(RpcStatus::BadRequest, &format!("decode: {e}")), + }; + + if req.recipient_keys.is_empty() || req.payload.is_empty() { + return HandlerResult::err( + RpcStatus::BadRequest, + "recipient_keys and payload required", + ); + } + + // Rate limiting. + if let Err(_e) = crate::auth::check_rate_limit(&state.rate_limits, &identity_key) { + return HandlerResult::err(RpcStatus::RateLimited, "rate limit exceeded"); + } + + let svc = DeliveryService { + store: Arc::clone(&state.store), + waiters: Arc::clone(&state.waiters), + }; + + let domain_req = BatchEnqueueReq { + recipient_keys: req.recipient_keys, + payload: req.payload, + channel_id: req.channel_id, + ttl_secs: req.ttl_secs, + }; + + match svc.batch_enqueue(domain_req) { + Ok(resp) => { + let proto = v1::BatchEnqueueResponse { seqs: resp.seqs }; + HandlerResult::ok(Bytes::from(proto.encode_to_vec())) + } + Err(e) => HandlerResult::err(RpcStatus::Internal, &format!("batch_enqueue: {e}")), + } +} diff --git a/crates/quicproquo-server/src/v2_handlers/device.rs b/crates/quicproquo-server/src/v2_handlers/device.rs new file mode 100644 index 0000000..6b0d649 --- /dev/null +++ b/crates/quicproquo-server/src/v2_handlers/device.rs @@ -0,0 +1,127 @@ +//! Device handlers — register, list, revoke devices. + +use std::sync::Arc; + +use bytes::Bytes; +use prost::Message; +use quicproquo_proto::qpq::v1; +use quicproquo_rpc::method::{HandlerResult, RequestContext}; + +use crate::domain::devices::DeviceService; +use crate::domain::types::{RegisterDeviceReq, RevokeDeviceReq}; + +use super::{domain_err, require_auth, ServerState}; + +pub async fn handle_register_device( + state: Arc, + ctx: RequestContext, +) -> HandlerResult { + let identity_key = match require_auth(&state, &ctx) { + Ok(ik) => ik, + Err(e) => return e, + }; + + let req = match v1::RegisterDeviceRequest::decode(ctx.payload) { + Ok(r) => r, + Err(e) => { + return HandlerResult::err( + quicproquo_rpc::error::RpcStatus::BadRequest, + &format!("decode: {e}"), + ) + } + }; + + let svc = DeviceService { + store: Arc::clone(&state.store), + }; + + let domain_req = RegisterDeviceReq { + device_id: req.device_id, + device_name: req.device_name, + }; + + match svc.register_device(domain_req, &identity_key) { + Ok(resp) => { + let proto = v1::RegisterDeviceResponse { + success: resp.success, + }; + HandlerResult::ok(Bytes::from(proto.encode_to_vec())) + } + Err(e) => domain_err(e), + } +} + +pub async fn handle_list_devices(state: Arc, ctx: RequestContext) -> HandlerResult { + let identity_key = match require_auth(&state, &ctx) { + Ok(ik) => ik, + Err(e) => return e, + }; + + // ListDevicesRequest is empty but we still decode for protocol correctness. + let _req = match v1::ListDevicesRequest::decode(ctx.payload) { + Ok(r) => r, + Err(e) => { + return HandlerResult::err( + quicproquo_rpc::error::RpcStatus::BadRequest, + &format!("decode: {e}"), + ) + } + }; + + let svc = DeviceService { + store: Arc::clone(&state.store), + }; + + match svc.list_devices(&identity_key) { + Ok(resp) => { + let proto = v1::ListDevicesResponse { + devices: resp + .devices + .into_iter() + .map(|d| v1::Device { + device_id: d.device_id, + device_name: d.device_name, + registered_at: d.registered_at, + }) + .collect(), + }; + HandlerResult::ok(Bytes::from(proto.encode_to_vec())) + } + Err(e) => domain_err(e), + } +} + +pub async fn handle_revoke_device(state: Arc, ctx: RequestContext) -> HandlerResult { + let identity_key = match require_auth(&state, &ctx) { + Ok(ik) => ik, + Err(e) => return e, + }; + + let req = match v1::RevokeDeviceRequest::decode(ctx.payload) { + Ok(r) => r, + Err(e) => { + return HandlerResult::err( + quicproquo_rpc::error::RpcStatus::BadRequest, + &format!("decode: {e}"), + ) + } + }; + + let svc = DeviceService { + store: Arc::clone(&state.store), + }; + + let domain_req = RevokeDeviceReq { + device_id: req.device_id, + }; + + match svc.revoke_device(domain_req, &identity_key) { + Ok(resp) => { + let proto = v1::RevokeDeviceResponse { + success: resp.success, + }; + HandlerResult::ok(Bytes::from(proto.encode_to_vec())) + } + Err(e) => domain_err(e), + } +} diff --git a/crates/quicproquo-server/src/v2_handlers/federation.rs b/crates/quicproquo-server/src/v2_handlers/federation.rs new file mode 100644 index 0000000..e77a605 --- /dev/null +++ b/crates/quicproquo-server/src/v2_handlers/federation.rs @@ -0,0 +1,67 @@ +//! Federation handlers — stubs returning Unimplemented. +//! +//! These will be wired to actual federation logic when the federation +//! subsystem is migrated to the v2 RPC framework. + +use std::sync::Arc; + +use bytes::Bytes; +use prost::Message; +use quicproquo_proto::qpq::v1; +use quicproquo_rpc::error::RpcStatus; +use quicproquo_rpc::method::{HandlerResult, RequestContext}; + +use super::ServerState; + +fn unimplemented(name: &str) -> HandlerResult { + HandlerResult::err( + RpcStatus::Internal, + &format!("{name}: federation not yet implemented in v2"), + ) +} + +pub async fn handle_relay_enqueue( + _state: Arc, + _ctx: RequestContext, +) -> HandlerResult { + unimplemented("RelayEnqueue") +} + +pub async fn handle_relay_batch_enqueue( + _state: Arc, + _ctx: RequestContext, +) -> HandlerResult { + unimplemented("RelayBatchEnqueue") +} + +pub async fn handle_proxy_fetch_key_package( + _state: Arc, + _ctx: RequestContext, +) -> HandlerResult { + unimplemented("ProxyFetchKeyPackage") +} + +pub async fn handle_proxy_fetch_hybrid_key( + _state: Arc, + _ctx: RequestContext, +) -> HandlerResult { + unimplemented("ProxyFetchHybridKey") +} + +pub async fn handle_proxy_resolve_user( + _state: Arc, + _ctx: RequestContext, +) -> HandlerResult { + unimplemented("ProxyResolveUser") +} + +pub async fn handle_federation_health( + _state: Arc, + _ctx: RequestContext, +) -> HandlerResult { + let proto = v1::FederationHealthResponse { + status: "ok".into(), + server_domain: String::new(), + }; + HandlerResult::ok(Bytes::from(proto.encode_to_vec())) +} diff --git a/crates/quicproquo-server/src/v2_handlers/keys.rs b/crates/quicproquo-server/src/v2_handlers/keys.rs new file mode 100644 index 0000000..04e8619 --- /dev/null +++ b/crates/quicproquo-server/src/v2_handlers/keys.rs @@ -0,0 +1,217 @@ +//! Key management handlers — KeyPackage and hybrid key operations. + +use std::sync::Arc; + +use bytes::Bytes; +use prost::Message; +use quicproquo_proto::qpq::v1; +use quicproquo_rpc::method::{HandlerResult, RequestContext}; + +use crate::domain::keys::KeyService; +use crate::domain::types::{ + CallerAuth, FetchHybridKeyReq, FetchHybridKeysReq, FetchKeyPackageReq, UploadHybridKeyReq, + UploadKeyPackageReq, +}; + +use super::{domain_err, require_auth, ServerState}; + +fn caller_auth(identity_key: Vec) -> CallerAuth { + CallerAuth { + identity_key, + token: Vec::new(), + device_id: None, + } +} + +pub async fn handle_upload_key_package( + state: Arc, + ctx: RequestContext, +) -> HandlerResult { + let identity_key = match require_auth(&state, &ctx) { + Ok(ik) => ik, + Err(e) => return e, + }; + + let req = match v1::UploadKeyPackageRequest::decode(ctx.payload) { + Ok(r) => r, + Err(e) => { + return HandlerResult::err( + quicproquo_rpc::error::RpcStatus::BadRequest, + &format!("decode: {e}"), + ) + } + }; + + let svc = KeyService { + store: Arc::clone(&state.store), + }; + let auth = caller_auth(identity_key); + + let domain_req = UploadKeyPackageReq { + identity_key: req.identity_key, + package: req.package, + }; + + match svc.upload_key_package(domain_req, &auth) { + Ok(resp) => { + let proto = v1::UploadKeyPackageResponse { + fingerprint: resp.fingerprint, + }; + HandlerResult::ok(Bytes::from(proto.encode_to_vec())) + } + Err(e) => domain_err(e), + } +} + +pub async fn handle_fetch_key_package( + state: Arc, + ctx: RequestContext, +) -> HandlerResult { + let identity_key = match require_auth(&state, &ctx) { + Ok(ik) => ik, + Err(e) => return e, + }; + + let req = match v1::FetchKeyPackageRequest::decode(ctx.payload) { + Ok(r) => r, + Err(e) => { + return HandlerResult::err( + quicproquo_rpc::error::RpcStatus::BadRequest, + &format!("decode: {e}"), + ) + } + }; + + let svc = KeyService { + store: Arc::clone(&state.store), + }; + let auth = caller_auth(identity_key); + + let domain_req = FetchKeyPackageReq { + identity_key: req.identity_key, + }; + + match svc.fetch_key_package(domain_req, &auth) { + Ok(resp) => { + let proto = v1::FetchKeyPackageResponse { + package: resp.package, + }; + HandlerResult::ok(Bytes::from(proto.encode_to_vec())) + } + Err(e) => domain_err(e), + } +} + +pub async fn handle_upload_hybrid_key( + state: Arc, + ctx: RequestContext, +) -> HandlerResult { + let identity_key = match require_auth(&state, &ctx) { + Ok(ik) => ik, + Err(e) => return e, + }; + + let req = match v1::UploadHybridKeyRequest::decode(ctx.payload) { + Ok(r) => r, + Err(e) => { + return HandlerResult::err( + quicproquo_rpc::error::RpcStatus::BadRequest, + &format!("decode: {e}"), + ) + } + }; + + let svc = KeyService { + store: Arc::clone(&state.store), + }; + let auth = caller_auth(identity_key); + + let domain_req = UploadHybridKeyReq { + identity_key: req.identity_key, + hybrid_public_key: req.hybrid_public_key, + }; + + match svc.upload_hybrid_key(domain_req, &auth) { + Ok(()) => { + let proto = v1::UploadHybridKeyResponse {}; + HandlerResult::ok(Bytes::from(proto.encode_to_vec())) + } + Err(e) => domain_err(e), + } +} + +pub async fn handle_fetch_hybrid_key( + state: Arc, + ctx: RequestContext, +) -> HandlerResult { + let identity_key = match require_auth(&state, &ctx) { + Ok(ik) => ik, + Err(e) => return e, + }; + + let req = match v1::FetchHybridKeyRequest::decode(ctx.payload) { + Ok(r) => r, + Err(e) => { + return HandlerResult::err( + quicproquo_rpc::error::RpcStatus::BadRequest, + &format!("decode: {e}"), + ) + } + }; + + let svc = KeyService { + store: Arc::clone(&state.store), + }; + let auth = caller_auth(identity_key); + + let domain_req = FetchHybridKeyReq { + identity_key: req.identity_key, + }; + + match svc.fetch_hybrid_key(domain_req, &auth) { + Ok(resp) => { + let proto = v1::FetchHybridKeyResponse { + hybrid_public_key: resp.hybrid_public_key, + }; + HandlerResult::ok(Bytes::from(proto.encode_to_vec())) + } + Err(e) => domain_err(e), + } +} + +pub async fn handle_fetch_hybrid_keys( + state: Arc, + ctx: RequestContext, +) -> HandlerResult { + let identity_key = match require_auth(&state, &ctx) { + Ok(ik) => ik, + Err(e) => return e, + }; + + let req = match v1::FetchHybridKeysRequest::decode(ctx.payload) { + Ok(r) => r, + Err(e) => { + return HandlerResult::err( + quicproquo_rpc::error::RpcStatus::BadRequest, + &format!("decode: {e}"), + ) + } + }; + + let svc = KeyService { + store: Arc::clone(&state.store), + }; + let auth = caller_auth(identity_key); + + let domain_req = FetchHybridKeysReq { + identity_keys: req.identity_keys, + }; + + match svc.fetch_hybrid_keys(domain_req, &auth) { + Ok(resp) => { + let proto = v1::FetchHybridKeysResponse { keys: resp.keys }; + HandlerResult::ok(Bytes::from(proto.encode_to_vec())) + } + Err(e) => domain_err(e), + } +} diff --git a/crates/quicproquo-server/src/v2_handlers/mod.rs b/crates/quicproquo-server/src/v2_handlers/mod.rs new file mode 100644 index 0000000..28b8754 --- /dev/null +++ b/crates/quicproquo-server/src/v2_handlers/mod.rs @@ -0,0 +1,287 @@ +//! v2 RPC handler dispatch — protobuf in, domain logic, protobuf out. + +use std::path::PathBuf; +use std::sync::Arc; + +use dashmap::DashMap; +use opaque_ke::ServerSetup; +use quicproquo_core::opaque_auth::OpaqueSuite; +use quicproquo_proto::method_ids; +use quicproquo_rpc::error::RpcStatus; +use quicproquo_rpc::method::{HandlerResult, MethodRegistry, RequestContext}; +use tokio::sync::Notify; + +use crate::auth::{AuthConfig, PendingLogin, RateEntry, SessionInfo}; +use crate::hooks::ServerHooks; +use crate::storage::Store; + +pub mod account; +pub mod auth; +pub mod blob; +pub mod channel; +pub mod delivery; +pub mod device; +pub mod federation; +pub mod keys; +pub mod p2p; +pub mod user; + +/// Shared server state accessible by all v2 RPC handlers. +pub struct ServerState { + pub store: Arc, + pub waiters: Arc, Arc>>, + pub auth_cfg: Arc, + pub opaque_setup: Arc>, + pub pending_logins: Arc>, + pub sessions: Arc, SessionInfo>>, + pub rate_limits: Arc, RateEntry>>, + pub sealed_sender: bool, + pub hooks: Arc, + pub signing_key: Arc, + pub kt_log: Arc>, + pub data_dir: PathBuf, + pub redact_logs: bool, +} + +/// Validate the session token from the request context and return the +/// authenticated caller's identity key. Returns an Unauthorized HandlerResult +/// on failure. +pub fn require_auth(state: &ServerState, ctx: &RequestContext) -> Result, HandlerResult> { + let token = ctx + .session_token + .as_deref() + .or(ctx.identity_key.as_deref()) + .unwrap_or(&[]); + + if token.is_empty() { + return Err(HandlerResult::err( + RpcStatus::Unauthorized, + "missing session token", + )); + } + + // Check session store. + if let Some(session) = state.sessions.get(token) { + let now = crate::auth::current_timestamp(); + if session.expires_at > now && !session.identity_key.is_empty() { + return Ok(session.identity_key.clone()); + } + } + + // Fall back to static bearer token (dev mode). + if state.auth_cfg.allow_insecure_identity_from_request { + if let Some(ik) = ctx.identity_key.as_deref() { + if !ik.is_empty() { + return Ok(ik.to_vec()); + } + } + } + + Err(HandlerResult::err( + RpcStatus::Unauthorized, + "invalid or expired session token", + )) +} + +/// Map a domain error to an RPC HandlerResult error. +pub fn domain_err(e: crate::domain::types::DomainError) -> HandlerResult { + use crate::domain::types::DomainError; + match &e { + DomainError::InvalidIdentityKey(_) + | DomainError::EmptyPackage + | DomainError::EmptyHybridKey + | DomainError::EmptyUsername + | DomainError::BlobHashLength(_) + | DomainError::BadParams(_) => HandlerResult::err(RpcStatus::BadRequest, &e.to_string()), + + DomainError::BlobNotFound | DomainError::DeviceNotFound => { + HandlerResult::err(RpcStatus::NotFound, &e.to_string()) + } + + DomainError::PackageTooLarge(_) | DomainError::BlobTooLarge(_) => { + HandlerResult::err(RpcStatus::BadRequest, &e.to_string()) + } + + DomainError::BlobHashMismatch => { + HandlerResult::err(RpcStatus::BadRequest, &e.to_string()) + } + + DomainError::DeviceLimit(_) => HandlerResult::err(RpcStatus::Forbidden, &e.to_string()), + + DomainError::Io(_) | DomainError::Storage(_) => { + HandlerResult::err(RpcStatus::Internal, &e.to_string()) + } + } +} + +/// Build the v2 method registry with all 33 handlers registered. +pub fn build_registry() -> MethodRegistry { + let mut reg = MethodRegistry::new(); + + // Auth (100-103) + reg.register( + method_ids::OPAQUE_REGISTER_START, + "OpaqueRegisterStart", + auth::handle_opaque_register_start, + ); + reg.register( + method_ids::OPAQUE_REGISTER_FINISH, + "OpaqueRegisterFinish", + auth::handle_opaque_register_finish, + ); + reg.register( + method_ids::OPAQUE_LOGIN_START, + "OpaqueLoginStart", + auth::handle_opaque_login_start, + ); + reg.register( + method_ids::OPAQUE_LOGIN_FINISH, + "OpaqueLoginFinish", + auth::handle_opaque_login_finish, + ); + + // Delivery (200-205) + reg.register(method_ids::ENQUEUE, "Enqueue", delivery::handle_enqueue); + reg.register(method_ids::FETCH, "Fetch", delivery::handle_fetch); + reg.register( + method_ids::FETCH_WAIT, + "FetchWait", + delivery::handle_fetch_wait, + ); + reg.register(method_ids::PEEK, "Peek", delivery::handle_peek); + reg.register(method_ids::ACK, "Ack", delivery::handle_ack); + reg.register( + method_ids::BATCH_ENQUEUE, + "BatchEnqueue", + delivery::handle_batch_enqueue, + ); + + // Keys (300-304) + reg.register( + method_ids::UPLOAD_KEY_PACKAGE, + "UploadKeyPackage", + keys::handle_upload_key_package, + ); + reg.register( + method_ids::FETCH_KEY_PACKAGE, + "FetchKeyPackage", + keys::handle_fetch_key_package, + ); + reg.register( + method_ids::UPLOAD_HYBRID_KEY, + "UploadHybridKey", + keys::handle_upload_hybrid_key, + ); + reg.register( + method_ids::FETCH_HYBRID_KEY, + "FetchHybridKey", + keys::handle_fetch_hybrid_key, + ); + reg.register( + method_ids::FETCH_HYBRID_KEYS, + "FetchHybridKeys", + keys::handle_fetch_hybrid_keys, + ); + + // Channel (400) + reg.register( + method_ids::CREATE_CHANNEL, + "CreateChannel", + channel::handle_create_channel, + ); + + // User (500-501) + reg.register( + method_ids::RESOLVE_USER, + "ResolveUser", + user::handle_resolve_user, + ); + reg.register( + method_ids::RESOLVE_IDENTITY, + "ResolveIdentity", + user::handle_resolve_identity, + ); + + // Blob (600-601) + reg.register( + method_ids::UPLOAD_BLOB, + "UploadBlob", + blob::handle_upload_blob, + ); + reg.register( + method_ids::DOWNLOAD_BLOB, + "DownloadBlob", + blob::handle_download_blob, + ); + + // Device (700-702) + reg.register( + method_ids::REGISTER_DEVICE, + "RegisterDevice", + device::handle_register_device, + ); + reg.register( + method_ids::LIST_DEVICES, + "ListDevices", + device::handle_list_devices, + ); + reg.register( + method_ids::REVOKE_DEVICE, + "RevokeDevice", + device::handle_revoke_device, + ); + + // P2P (800-802) + reg.register( + method_ids::PUBLISH_ENDPOINT, + "PublishEndpoint", + p2p::handle_publish_endpoint, + ); + reg.register( + method_ids::RESOLVE_ENDPOINT, + "ResolveEndpoint", + p2p::handle_resolve_endpoint, + ); + reg.register(method_ids::HEALTH, "Health", p2p::handle_health); + + // Federation (900-905) + reg.register( + method_ids::RELAY_ENQUEUE, + "RelayEnqueue", + federation::handle_relay_enqueue, + ); + reg.register( + method_ids::RELAY_BATCH_ENQUEUE, + "RelayBatchEnqueue", + federation::handle_relay_batch_enqueue, + ); + reg.register( + method_ids::PROXY_FETCH_KEY_PACKAGE, + "ProxyFetchKeyPackage", + federation::handle_proxy_fetch_key_package, + ); + reg.register( + method_ids::PROXY_FETCH_HYBRID_KEY, + "ProxyFetchHybridKey", + federation::handle_proxy_fetch_hybrid_key, + ); + reg.register( + method_ids::PROXY_RESOLVE_USER, + "ProxyResolveUser", + federation::handle_proxy_resolve_user, + ); + reg.register( + method_ids::FEDERATION_HEALTH, + "FederationHealth", + federation::handle_federation_health, + ); + + // Account (950) + reg.register( + method_ids::DELETE_ACCOUNT, + "DeleteAccount", + account::handle_delete_account, + ); + + reg +} diff --git a/crates/quicproquo-server/src/v2_handlers/p2p.rs b/crates/quicproquo-server/src/v2_handlers/p2p.rs new file mode 100644 index 0000000..3483bc7 --- /dev/null +++ b/crates/quicproquo-server/src/v2_handlers/p2p.rs @@ -0,0 +1,108 @@ +//! P2P handlers — publish/resolve endpoints and health check. + +use std::sync::Arc; + +use bytes::Bytes; +use prost::Message; +use quicproquo_proto::qpq::v1; +use quicproquo_rpc::method::{HandlerResult, RequestContext}; + +use crate::domain::p2p::P2pService; +use crate::domain::types::{CallerAuth, PublishEndpointReq, ResolveEndpointReq}; + +use super::{domain_err, require_auth, ServerState}; + +fn caller_auth(identity_key: Vec) -> CallerAuth { + CallerAuth { + identity_key, + token: Vec::new(), + device_id: None, + } +} + +pub async fn handle_publish_endpoint( + state: Arc, + ctx: RequestContext, +) -> HandlerResult { + let identity_key = match require_auth(&state, &ctx) { + Ok(ik) => ik, + Err(e) => return e, + }; + + let req = match v1::PublishEndpointRequest::decode(ctx.payload) { + Ok(r) => r, + Err(e) => { + return HandlerResult::err( + quicproquo_rpc::error::RpcStatus::BadRequest, + &format!("decode: {e}"), + ) + } + }; + + let svc = P2pService { + store: Arc::clone(&state.store), + }; + let auth = caller_auth(identity_key); + + let domain_req = PublishEndpointReq { + identity_key: req.identity_key, + node_addr: req.node_addr, + }; + + match svc.publish_endpoint(domain_req, &auth) { + Ok(()) => { + let proto = v1::PublishEndpointResponse {}; + HandlerResult::ok(Bytes::from(proto.encode_to_vec())) + } + Err(e) => domain_err(e), + } +} + +pub async fn handle_resolve_endpoint( + state: Arc, + ctx: RequestContext, +) -> HandlerResult { + let identity_key = match require_auth(&state, &ctx) { + Ok(ik) => ik, + Err(e) => return e, + }; + + let req = match v1::ResolveEndpointRequest::decode(ctx.payload) { + Ok(r) => r, + Err(e) => { + return HandlerResult::err( + quicproquo_rpc::error::RpcStatus::BadRequest, + &format!("decode: {e}"), + ) + } + }; + + let svc = P2pService { + store: Arc::clone(&state.store), + }; + let auth = caller_auth(identity_key); + + let domain_req = ResolveEndpointReq { + identity_key: req.identity_key, + }; + + match svc.resolve_endpoint(domain_req, &auth) { + Ok(resp) => { + let proto = v1::ResolveEndpointResponse { + node_addr: resp.node_addr, + }; + HandlerResult::ok(Bytes::from(proto.encode_to_vec())) + } + Err(e) => domain_err(e), + } +} + +pub async fn handle_health( + _state: Arc, + _ctx: RequestContext, +) -> HandlerResult { + let resp = v1::HealthResponse { + status: "ok".into(), + }; + HandlerResult::ok(Bytes::from(resp.encode_to_vec())) +} diff --git a/crates/quicproquo-server/src/v2_handlers/user.rs b/crates/quicproquo-server/src/v2_handlers/user.rs new file mode 100644 index 0000000..1551232 --- /dev/null +++ b/crates/quicproquo-server/src/v2_handlers/user.rs @@ -0,0 +1,89 @@ +//! User resolution handlers — username <-> identity key lookups. + +use std::sync::Arc; + +use bytes::Bytes; +use prost::Message; +use quicproquo_proto::qpq::v1; +use quicproquo_rpc::method::{HandlerResult, RequestContext}; + +use crate::domain::types::{ResolveIdentityReq, ResolveUserReq}; +use crate::domain::users::UserService; + +use super::{domain_err, require_auth, ServerState}; + +pub async fn handle_resolve_user(state: Arc, ctx: RequestContext) -> HandlerResult { + let _identity_key = match require_auth(&state, &ctx) { + Ok(ik) => ik, + Err(e) => return e, + }; + + let req = match v1::ResolveUserRequest::decode(ctx.payload) { + Ok(r) => r, + Err(e) => { + return HandlerResult::err( + quicproquo_rpc::error::RpcStatus::BadRequest, + &format!("decode: {e}"), + ) + } + }; + + let svc = UserService { + store: Arc::clone(&state.store), + kt_log: Arc::clone(&state.kt_log), + }; + + let domain_req = ResolveUserReq { + username: req.username, + }; + + match svc.resolve_user(domain_req) { + Ok(resp) => { + let proto = v1::ResolveUserResponse { + identity_key: resp.identity_key, + inclusion_proof: resp.inclusion_proof, + }; + HandlerResult::ok(Bytes::from(proto.encode_to_vec())) + } + Err(e) => domain_err(e), + } +} + +pub async fn handle_resolve_identity( + state: Arc, + ctx: RequestContext, +) -> HandlerResult { + let _identity_key = match require_auth(&state, &ctx) { + Ok(ik) => ik, + Err(e) => return e, + }; + + let req = match v1::ResolveIdentityRequest::decode(ctx.payload) { + Ok(r) => r, + Err(e) => { + return HandlerResult::err( + quicproquo_rpc::error::RpcStatus::BadRequest, + &format!("decode: {e}"), + ) + } + }; + + let svc = UserService { + store: Arc::clone(&state.store), + kt_log: Arc::clone(&state.kt_log), + }; + + let domain_req = ResolveIdentityReq { + identity_key: req.identity_key, + }; + + match svc.resolve_identity(domain_req) { + Ok(resp) => { + let proto = v1::ResolveIdentityResponse { + username: resp.username, + }; + HandlerResult::ok(Bytes::from(proto.encode_to_vec())) + } + Err(e) => domain_err(e), + } +}