feat(server): complete domain service modules (keys, channels, users, blobs, devices, p2p, account)

This commit is contained in:
2026-03-04 12:07:00 +01:00
parent a5864127d1
commit ff93275dc1
9 changed files with 598 additions and 0 deletions

View File

@@ -0,0 +1,28 @@
//! Account domain logic — account deletion with KT tombstone.
use std::sync::{Arc, Mutex};
use quicproquo_kt::MerkleLog;
use crate::storage::Store;
use super::types::DomainError;
/// Domain service for account lifecycle operations.
pub struct AccountService {
    /// Persistent storage backend; performs the actual account deletion.
    pub store: Arc<dyn Store>,
    /// Shared key-transparency Merkle log; deletions append a tombstone here.
    pub kt_log: Arc<Mutex<MerkleLog>>,
}
impl AccountService {
    /// Delete the caller's account and record an auditable KT tombstone.
    ///
    /// The tombstone must never be silently skipped: if the Merkle-log mutex
    /// was poisoned by a panic in another thread, we recover the inner log
    /// and append anyway, so every deletion leaves an audit record.
    ///
    /// # Errors
    /// Propagates `DomainError::Storage` if the underlying store fails; in
    /// that case no tombstone is written (the account was not deleted).
    pub fn delete_account(&self, caller_identity_key: &[u8]) -> Result<(), DomainError> {
        self.store.delete_account(caller_identity_key)?;
        // Append a KT tombstone entry so the deletion is auditable. Recover
        // from mutex poisoning instead of dropping the audit entry.
        let mut log = self
            .kt_log
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        log.append("__tombstone__", caller_identity_key);
        Ok(())
    }
}

View File

@@ -0,0 +1,193 @@
//! Blob domain logic — chunked file upload/download with SHA-256 verification.
use std::io::{Read, Seek, SeekFrom, Write};
use std::path::PathBuf;
use sha2::{Digest, Sha256};
use super::types::*;
/// Maximum blob size: 100 MB.
const MAX_BLOB_SIZE: u64 = 100 * 1024 * 1024;
/// Maximum download chunk size: 256 KB.
const MAX_DOWNLOAD_CHUNK: u32 = 256 * 1024;
/// Metadata stored alongside each completed blob.
///
/// Serialized as pretty JSON into a `<hex>.meta` file next to the blob.
#[derive(serde::Serialize, serde::Deserialize)]
struct BlobMeta {
    /// MIME type as supplied by the uploader (not validated here).
    mime_type: String,
    /// Total blob size in bytes, as declared at upload time.
    total_size: u64,
    /// Unix timestamp (seconds) when the upload completed.
    uploaded_at: u64,
}
/// Domain service for blob (file attachment) storage.
///
/// Blobs are content-addressed by their SHA-256 hash and stored as flat
/// files under `<data_dir>/blobs/`.
pub struct BlobService {
    /// Root data directory; blob files live in its `blobs` subdirectory.
    pub data_dir: PathBuf,
}
impl BlobService {
/// Directory holding blob files plus their `.part` and `.meta` siblings.
fn blobs_dir(&self) -> PathBuf {
    let mut dir = self.data_dir.clone();
    dir.push("blobs");
    dir
}
/// Upload one chunk of a blob that is content-addressed by SHA-256.
///
/// The chunk is written at `req.offset` into a `<hex>.part` staging file;
/// when the write reaches exactly `total_size`, the whole file is hashed,
/// verified against `req.blob_hash`, and renamed to its final
/// content-addressed name. Re-uploading an already-completed blob is an
/// idempotent no-op.
///
/// NOTE(review): completion is detected by `offset + len == total_size`,
/// which assumes the client sends the final chunk last. An out-of-order
/// final chunk makes verification run over a zero-filled sparse file and
/// fail the hash check — safe, but the upload must restart. Confirm the
/// client's chunk ordering.
///
/// # Errors
/// Validation errors for bad hash length / size / out-of-bounds chunks,
/// `BlobHashMismatch` if verification fails, and `Io` for filesystem
/// failures.
pub fn upload_blob(
    &self,
    req: UploadBlobReq,
    _auth: &CallerAuth,
) -> Result<UploadBlobResp, DomainError> {
    if req.blob_hash.len() != 32 {
        return Err(DomainError::BlobHashLength(req.blob_hash.len()));
    }
    if req.total_size > MAX_BLOB_SIZE {
        return Err(DomainError::BlobTooLarge(req.total_size));
    }
    if req.total_size == 0 {
        return Err(DomainError::BadParams("total_size must be > 0".into()));
    }
    // checked_add guards against u64 overflow of offset + chunk length;
    // map_or(true, ..) also rejects the overflowing case itself.
    if req
        .offset
        .checked_add(req.chunk.len() as u64)
        .map_or(true, |end| end > req.total_size)
    {
        return Err(DomainError::BadParams(format!(
            "chunk out of bounds: offset={} + chunk_len={} > total_size={}",
            req.offset,
            req.chunk.len(),
            req.total_size
        )));
    }
    let blob_hex = hex::encode(&req.blob_hash);
    let dir = self.blobs_dir();
    std::fs::create_dir_all(&dir)
        .map_err(|e| DomainError::Io(format!("create blobs directory: {e}")))?;
    let part_path = dir.join(format!("{blob_hex}.part"));
    let final_path = dir.join(&blob_hex);
    let meta_path = dir.join(format!("{blob_hex}.meta"));
    // Already fully uploaded — idempotent success.
    if final_path.exists() {
        return Ok(UploadBlobResp {
            blob_id: req.blob_hash,
        });
    }
    // Write chunk at offset. truncate(false) preserves bytes written by
    // earlier chunks of the same blob.
    let mut file = std::fs::OpenOptions::new()
        .create(true)
        .write(true)
        .truncate(false)
        .open(&part_path)
        .map_err(|e| DomainError::Io(format!("open .part file: {e}")))?;
    file.seek(SeekFrom::Start(req.offset))
        .map_err(|e| DomainError::Io(format!("seek: {e}")))?;
    file.write_all(&req.chunk)
        .map_err(|e| DomainError::Io(format!("write chunk: {e}")))?;
    // Flush to disk before deciding completion, so the verification pass
    // below reads durable data.
    file.sync_all()
        .map_err(|e| DomainError::Io(format!("sync: {e}")))?;
    // Check if upload is complete. (Cannot overflow: validated above.)
    let end = req.offset + req.chunk.len() as u64;
    if end == req.total_size {
        // Verify SHA-256 by streaming the .part file in 64 KiB reads.
        let mut vfile = std::fs::File::open(&part_path)
            .map_err(|e| DomainError::Io(format!("open for verify: {e}")))?;
        let mut hasher = Sha256::new();
        let mut buf = [0u8; 64 * 1024];
        loop {
            let n = vfile
                .read(&mut buf)
                .map_err(|e| DomainError::Io(format!("read: {e}")))?;
            if n == 0 {
                break;
            }
            hasher.update(&buf[..n]);
        }
        let computed: [u8; 32] = hasher.finalize().into();
        if computed[..] != req.blob_hash[..] {
            // Corrupt or incomplete upload: discard so the client can retry.
            let _ = std::fs::remove_file(&part_path);
            return Err(DomainError::BlobHashMismatch);
        }
        // Finalize: same-directory rename, so readers never observe a
        // partially-verified blob at the final path.
        std::fs::rename(&part_path, &final_path)
            .map_err(|e| DomainError::Io(format!("rename .part: {e}")))?;
        // Write metadata (best-effort: a failure here leaves the blob
        // readable but without MIME/size metadata).
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();
        let meta = BlobMeta {
            mime_type: req.mime_type,
            total_size: req.total_size,
            uploaded_at: now,
        };
        if let Ok(json) = serde_json::to_string_pretty(&meta) {
            let _ = std::fs::write(&meta_path, json.as_bytes());
        }
    }
    Ok(UploadBlobResp {
        blob_id: req.blob_hash,
    })
}
/// Download a chunk of a stored blob.
///
/// Reads up to `min(req.length, MAX_DOWNLOAD_CHUNK)` bytes starting at
/// `req.offset`. An offset at or past end-of-file yields an empty chunk
/// (the client's end-of-blob signal).
///
/// Metadata is written best-effort at upload time, so a fully uploaded
/// blob may lack a readable `.meta` file; in that case we fall back to
/// the on-disk file length and an empty MIME type instead of failing the
/// whole download.
///
/// # Errors
/// `BlobHashLength` for a malformed id, `BlobNotFound` if the blob does
/// not exist, `Io` for filesystem failures.
pub fn download_blob(
    &self,
    req: DownloadBlobReq,
    _auth: &CallerAuth,
) -> Result<DownloadBlobResp, DomainError> {
    if req.blob_id.len() != 32 {
        return Err(DomainError::BlobHashLength(req.blob_id.len()));
    }
    let blob_hex = hex::encode(&req.blob_id);
    let dir = self.blobs_dir();
    let blob_path = dir.join(&blob_hex);
    let meta_path = dir.join(format!("{blob_hex}.meta"));
    if !blob_path.exists() {
        return Err(DomainError::BlobNotFound);
    }
    // Open the blob first so the real file length is available as a
    // fallback when the metadata is missing or corrupt.
    let mut file = std::fs::File::open(&blob_path)
        .map_err(|e| DomainError::Io(format!("open blob: {e}")))?;
    let file_len = file
        .metadata()
        .map_err(|e| DomainError::Io(format!("file metadata: {e}")))?
        .len();
    // Metadata is optional: upload writes it with `let _ =`, so its absence
    // must not make an existing blob undownloadable.
    let (total_size, mime_type) = std::fs::read_to_string(&meta_path)
        .ok()
        .and_then(|json| serde_json::from_str::<BlobMeta>(&json).ok())
        .map(|meta| (meta.total_size, meta.mime_type))
        .unwrap_or((file_len, String::new()));
    // Offset past EOF: empty chunk signals end of blob.
    if req.offset >= file_len {
        return Ok(DownloadBlobResp {
            chunk: vec![],
            total_size,
            mime_type,
        });
    }
    file.seek(SeekFrom::Start(req.offset))
        .map_err(|e| DomainError::Io(format!("seek: {e}")))?;
    let remaining = (file_len - req.offset) as usize;
    let to_read = remaining.min(req.length.min(MAX_DOWNLOAD_CHUNK) as usize);
    let mut chunk = vec![0u8; to_read];
    file.read_exact(&mut chunk)
        .map_err(|e| DomainError::Io(format!("read chunk: {e}")))?;
    Ok(DownloadBlobResp {
        chunk,
        total_size,
        mime_type,
    })
}
}

View File

@@ -0,0 +1,38 @@
//! Channel domain logic — 1:1 DM channel creation.
use std::sync::Arc;
use crate::storage::Store;
use super::types::*;
/// Domain service for 1:1 channel management.
pub struct ChannelService {
    /// Persistent storage backend that creates and looks up channels.
    pub store: Arc<dyn Store>,
}
impl ChannelService {
    /// Create (or fetch) the 1:1 channel between the caller and `peer_key`.
    ///
    /// Rejects malformed peer keys and self-channels, then delegates to the
    /// store, which reports whether the channel is newly created.
    pub fn create_channel(
        &self,
        req: CreateChannelReq,
        caller_identity_key: &[u8],
    ) -> Result<CreateChannelResp, DomainError> {
        let peer = req.peer_key.as_slice();
        if peer.len() != 32 {
            return Err(DomainError::InvalidIdentityKey(peer.len()));
        }
        if peer == caller_identity_key {
            return Err(DomainError::BadParams(
                "peer_key must not equal caller identity".into(),
            ));
        }
        let (channel_id, was_new) = self.store.create_channel(caller_identity_key, peer)?;
        Ok(CreateChannelResp { channel_id, was_new })
    }
}

View File

@@ -0,0 +1,76 @@
//! Device registry domain logic — register, list, revoke devices.
use std::sync::Arc;
use crate::storage::Store;
use super::types::*;
/// Maximum number of concurrently registered devices per identity key.
const MAX_DEVICES_PER_IDENTITY: usize = 5;
/// Domain service for multi-device management.
pub struct DeviceService {
    /// Persistent storage backend for the device registry.
    pub store: Arc<dyn Store>,
}
impl DeviceService {
    /// Register a device for the caller, enforcing the per-identity cap.
    pub fn register_device(
        &self,
        req: RegisterDeviceReq,
        caller_identity_key: &[u8],
    ) -> Result<RegisterDeviceResp, DomainError> {
        if req.device_id.is_empty() {
            return Err(DomainError::BadParams(
                "device_id must not be empty".into(),
            ));
        }
        // Enforce the cap before touching the registry.
        if self.store.device_count(caller_identity_key)? >= MAX_DEVICES_PER_IDENTITY {
            return Err(DomainError::DeviceLimit(MAX_DEVICES_PER_IDENTITY));
        }
        let success = self.store.register_device(
            caller_identity_key,
            &req.device_id,
            &req.device_name,
        )?;
        Ok(RegisterDeviceResp { success })
    }

    /// List every device registered for the caller.
    pub fn list_devices(
        &self,
        caller_identity_key: &[u8],
    ) -> Result<ListDevicesResp, DomainError> {
        let mut devices = Vec::new();
        for (device_id, device_name, registered_at) in
            self.store.list_devices(caller_identity_key)?
        {
            devices.push(DeviceInfo {
                device_id,
                device_name,
                registered_at,
            });
        }
        Ok(ListDevicesResp { devices })
    }

    /// Revoke one of the caller's devices; errors if it does not exist.
    pub fn revoke_device(
        &self,
        req: RevokeDeviceReq,
        caller_identity_key: &[u8],
    ) -> Result<RevokeDeviceResp, DomainError> {
        if req.device_id.is_empty() {
            return Err(DomainError::BadParams(
                "device_id must not be empty".into(),
            ));
        }
        match self.store.revoke_device(caller_identity_key, &req.device_id)? {
            true => Ok(RevokeDeviceResp { success: true }),
            false => Err(DomainError::DeviceNotFound),
        }
    }
}

View File

@@ -0,0 +1,93 @@
//! Key management domain logic — KeyPackage and hybrid key operations.
use std::sync::Arc;
use sha2::{Digest, Sha256};
use crate::storage::Store;
use super::types::*;
const MAX_KEYPACKAGE_BYTES: usize = 1024 * 1024; // 1 MB cap on a single KeyPackage upload
/// Domain service for MLS KeyPackage and hybrid (PQ) key management.
pub struct KeyService {
    /// Persistent storage backend for key material.
    pub store: Arc<dyn Store>,
}
impl KeyService {
    /// Store an MLS KeyPackage for an identity; returns the package's
    /// SHA-256 fingerprint so the client can confirm what was stored.
    ///
    /// # Errors
    /// Rejects non-32-byte identity keys, empty packages, and packages
    /// larger than `MAX_KEYPACKAGE_BYTES`.
    pub fn upload_key_package(
        &self,
        req: UploadKeyPackageReq,
        _auth: &CallerAuth,
    ) -> Result<UploadKeyPackageResp, DomainError> {
        if req.identity_key.len() != 32 {
            return Err(DomainError::InvalidIdentityKey(req.identity_key.len()));
        }
        if req.package.is_empty() {
            return Err(DomainError::EmptyPackage);
        }
        if req.package.len() > MAX_KEYPACKAGE_BYTES {
            return Err(DomainError::PackageTooLarge(req.package.len()));
        }
        // Fingerprint before the package is moved into storage.
        let fingerprint: Vec<u8> = Sha256::digest(&req.package).to_vec();
        self.store
            .upload_key_package(&req.identity_key, req.package)?;
        Ok(UploadKeyPackageResp { fingerprint })
    }

    /// Fetch the stored KeyPackage for an identity (empty if none).
    pub fn fetch_key_package(
        &self,
        req: FetchKeyPackageReq,
        _auth: &CallerAuth,
    ) -> Result<FetchKeyPackageResp, DomainError> {
        // Validate on the fetch path too, consistent with upload_key_package:
        // a malformed key is a caller error, not a missing package.
        if req.identity_key.len() != 32 {
            return Err(DomainError::InvalidIdentityKey(req.identity_key.len()));
        }
        let package = self.store.fetch_key_package(&req.identity_key)?;
        Ok(FetchKeyPackageResp {
            package: package.unwrap_or_default(),
        })
    }

    /// Store the hybrid (post-quantum) public key for an identity.
    pub fn upload_hybrid_key(
        &self,
        req: UploadHybridKeyReq,
        _auth: &CallerAuth,
    ) -> Result<(), DomainError> {
        if req.identity_key.len() != 32 {
            return Err(DomainError::InvalidIdentityKey(req.identity_key.len()));
        }
        if req.hybrid_public_key.is_empty() {
            return Err(DomainError::EmptyHybridKey);
        }
        self.store
            .upload_hybrid_key(&req.identity_key, req.hybrid_public_key)?;
        Ok(())
    }

    /// Fetch the hybrid public key for an identity (empty if none).
    pub fn fetch_hybrid_key(
        &self,
        req: FetchHybridKeyReq,
        _auth: &CallerAuth,
    ) -> Result<FetchHybridKeyResp, DomainError> {
        // Same length validation as the upload path (consistency fix).
        if req.identity_key.len() != 32 {
            return Err(DomainError::InvalidIdentityKey(req.identity_key.len()));
        }
        let hybrid_public_key = self
            .store
            .fetch_hybrid_key(&req.identity_key)?
            .unwrap_or_default();
        Ok(FetchHybridKeyResp { hybrid_public_key })
    }

    /// Batch fetch of hybrid public keys.
    ///
    /// The result is positionally aligned with `req.identity_keys`; unknown
    /// identities yield empty entries. Any malformed (non-32-byte) key fails
    /// the whole request, consistent with the single-key paths.
    pub fn fetch_hybrid_keys(
        &self,
        req: FetchHybridKeysReq,
        _auth: &CallerAuth,
    ) -> Result<FetchHybridKeysResp, DomainError> {
        let mut keys = Vec::with_capacity(req.identity_keys.len());
        for ik in &req.identity_keys {
            if ik.len() != 32 {
                return Err(DomainError::InvalidIdentityKey(ik.len()));
            }
            let pk = self.store.fetch_hybrid_key(ik)?.unwrap_or_default();
            keys.push(pk);
        }
        Ok(FetchHybridKeysResp { keys })
    }
}

View File

@@ -8,3 +8,10 @@
pub mod types;
pub mod auth;
pub mod delivery;
/// MLS KeyPackage and hybrid (PQ) key management.
pub mod keys;
/// 1:1 DM channel creation.
pub mod channels;
/// Username <-> identity-key resolution.
pub mod users;
/// Chunked blob upload/download with SHA-256 verification.
pub mod blobs;
/// Multi-device registration, listing, and revocation.
pub mod devices;
/// P2P endpoint publish/resolve and health.
pub mod p2p;
/// Account lifecycle (deletion with KT tombstone).
pub mod account;

View File

@@ -0,0 +1,50 @@
//! P2P endpoint domain logic — publish, resolve, health.
use std::sync::Arc;
use crate::storage::Store;
use super::types::*;
/// Domain service for P2P endpoint management and health checks.
pub struct P2pService {
    /// Persistent storage backend for the identity-key -> endpoint map.
    pub store: Arc<dyn Store>,
}
impl P2pService {
    /// Publish (upsert) the caller-supplied node address for an identity.
    pub fn publish_endpoint(
        &self,
        req: PublishEndpointReq,
        _auth: &CallerAuth,
    ) -> Result<(), DomainError> {
        let key_len = req.identity_key.len();
        if key_len != 32 {
            return Err(DomainError::InvalidIdentityKey(key_len));
        }
        self.store.publish_endpoint(&req.identity_key, req.node_addr)?;
        Ok(())
    }

    /// Look up the published node address for an identity (empty if none).
    pub fn resolve_endpoint(
        &self,
        req: ResolveEndpointReq,
        _auth: &CallerAuth,
    ) -> Result<ResolveEndpointResp, DomainError> {
        let key_len = req.identity_key.len();
        if key_len != 32 {
            return Err(DomainError::InvalidIdentityKey(key_len));
        }
        let node_addr = match self.store.resolve_endpoint(&req.identity_key)? {
            Some(addr) => addr,
            None => Default::default(),
        };
        Ok(ResolveEndpointResp { node_addr })
    }

    /// Liveness probe; always reports "ok".
    pub fn health() -> HealthResp {
        HealthResp {
            status: "ok".into(),
        }
    }
}

View File

@@ -2,6 +2,56 @@
//!
//! No proto, no capnp — just Rust structs.
use crate::storage::StorageError;
// ── Domain Error ────────────────────────────────────────────────────────────
/// Errors returned by domain service methods.
///
/// The `#[error]` strings are the caller-facing messages. Variants fall
/// into validation failures (bad lengths, empty inputs, limits), not-found
/// conditions, and infrastructure failures (`Io`, `Storage`).
#[derive(thiserror::Error, Debug)]
pub enum DomainError {
    // Key management
    #[error("identity key must be exactly 32 bytes, got {0}")]
    InvalidIdentityKey(usize),
    #[error("key package must not be empty")]
    EmptyPackage,
    #[error("key package exceeds max size ({0} bytes)")]
    PackageTooLarge(usize),
    #[error("hybrid public key must not be empty")]
    EmptyHybridKey,
    // User resolution
    #[error("username must not be empty")]
    EmptyUsername,
    // Blob storage
    #[error("blob hash must be exactly 32 bytes, got {0}")]
    BlobHashLength(usize),
    #[error("blob exceeds max size ({0} bytes)")]
    BlobTooLarge(u64),
    #[error("SHA-256 of uploaded data does not match blob hash")]
    BlobHashMismatch,
    #[error("blob not found")]
    BlobNotFound,
    // Device registry
    #[error("maximum {0} devices per identity")]
    DeviceLimit(usize),
    #[error("device not found")]
    DeviceNotFound,
    // Generic / infrastructure
    #[error("bad parameters: {0}")]
    BadParams(String),
    #[error("I/O error: {0}")]
    Io(String),
    // Wraps storage-layer failures via From, enabling `?` on store calls.
    #[error("storage error: {0}")]
    Storage(#[from] StorageError),
}
// ── Auth ─────────────────────────────────────────────────────────────────────
/// Caller authentication context (resolved from session token).

View File

@@ -0,0 +1,63 @@
//! User resolution domain logic — username <-> identity key lookups.
use std::sync::{Arc, Mutex};
use quicproquo_kt::MerkleLog;
use crate::storage::Store;
use super::types::*;
/// Domain service for user/identity resolution.
pub struct UserService {
    /// Persistent storage backend for username/identity-key mappings.
    pub store: Arc<dyn Store>,
    /// Shared key-transparency Merkle log, used to build inclusion proofs.
    pub kt_log: Arc<Mutex<MerkleLog>>,
}
impl UserService {
    /// Resolve a username to its identity key, with a best-effort KT
    /// inclusion proof.
    ///
    /// Returns an empty identity key for unknown users, and an empty proof
    /// when the log is unavailable, the entry is not in the log, or proof
    /// serialization fails.
    pub fn resolve_user(&self, req: ResolveUserReq) -> Result<ResolveUserResp, DomainError> {
        if req.username.is_empty() {
            return Err(DomainError::EmptyUsername);
        }
        let identity_key = self
            .store
            .get_user_identity_key(&req.username)?
            .unwrap_or_default();
        // Best-effort proof: any failure along the chain yields empty bytes.
        let inclusion_proof = if identity_key.is_empty() {
            Vec::new()
        } else {
            self.kt_log
                .lock()
                .ok()
                .and_then(|log| {
                    let leaf_idx = log.find(&req.username, &identity_key)?;
                    let proof = log.inclusion_proof(leaf_idx).ok()?;
                    proof.to_bytes().ok()
                })
                .unwrap_or_default()
        };
        Ok(ResolveUserResp {
            identity_key,
            inclusion_proof,
        })
    }

    /// Reverse lookup: identity key -> username (empty if unregistered).
    pub fn resolve_identity(
        &self,
        req: ResolveIdentityReq,
    ) -> Result<ResolveIdentityResp, DomainError> {
        if req.identity_key.len() != 32 {
            return Err(DomainError::InvalidIdentityKey(req.identity_key.len()));
        }
        let username = match self.store.resolve_identity_key(&req.identity_key)? {
            Some(name) => name,
            None => Default::default(),
        };
        Ok(ResolveIdentityResp { username })
    }
}