feat: Sprint 5 — encrypted file transfer with chunked upload/download
- Add uploadBlob (@21) and downloadBlob (@22) RPCs to Cap'n Proto schema with SHA-256 content addressing and chunked transfer - Server blob handler: 256KB chunks, SHA-256 verification on finalize, .meta JSON sidecar, 50MB size limit, content-addressable storage - Add FileRef (0x08) AppMessage variant with blob_id, filename, file_size, mime_type - /send-file command: read file, compute hash, upload in chunks with progress display, send FileRef via MLS, MIME auto-detection - /download command: fetch blob in chunks with progress, verify hash, save to disk with collision avoidance - 2 new E2E tests: upload/download round-trip with partial reads, hash mismatch rejection (14 E2E tests total) - New error codes: E024-E027 for blob operations
This commit is contained in:
@@ -12,8 +12,8 @@ use anyhow::Context;
|
||||
use quicproquo_core::{
|
||||
AppMessage, DiskKeyStore, GroupMember, IdentityKeypair, ReceivedMessage,
|
||||
compute_safety_number, hybrid_encrypt, parse as parse_app_msg, serialize_chat,
|
||||
serialize_delete, serialize_edit, serialize_reaction, serialize_read_receipt,
|
||||
serialize_typing,
|
||||
serialize_delete, serialize_edit, serialize_file_ref, serialize_reaction,
|
||||
serialize_read_receipt, serialize_typing,
|
||||
};
|
||||
use quicproquo_proto::node_capnp::node_service;
|
||||
use tokio::sync::mpsc;
|
||||
@@ -26,9 +26,9 @@ use super::conversation::{
|
||||
};
|
||||
use super::display;
|
||||
use super::rpc::{
|
||||
connect_node, create_channel, enqueue, fetch_hybrid_key, fetch_key_package,
|
||||
fetch_wait, resolve_identity, resolve_user, try_hybrid_decrypt, upload_hybrid_key,
|
||||
upload_key_package,
|
||||
connect_node, create_channel, download_blob_chunk, enqueue, fetch_hybrid_key,
|
||||
fetch_key_package, fetch_wait, resolve_identity, resolve_user, try_hybrid_decrypt,
|
||||
upload_blob_chunk, upload_hybrid_key, upload_key_package,
|
||||
};
|
||||
use super::session::SessionState;
|
||||
use super::state::{decode_identity_key, load_or_init_state};
|
||||
@@ -73,6 +73,10 @@ enum SlashCommand {
|
||||
Edit { index: usize, new_text: String },
|
||||
/// Delete a previously sent message by index.
|
||||
Delete { index: usize },
|
||||
/// Send a file to the active conversation.
|
||||
SendFile { path: String },
|
||||
/// Download a file attachment by message index.
|
||||
Download { index: usize },
|
||||
}
|
||||
|
||||
fn parse_input(line: &str) -> Input {
|
||||
@@ -206,6 +210,20 @@ fn parse_input(line: &str) -> Input {
|
||||
Input::Empty
|
||||
}
|
||||
},
|
||||
"/send-file" | "/sf" => match arg {
|
||||
Some(path) => Input::Slash(SlashCommand::SendFile { path }),
|
||||
None => {
|
||||
display::print_error("usage: /send-file <path>");
|
||||
Input::Empty
|
||||
}
|
||||
},
|
||||
"/download" | "/dl" => match arg.and_then(|s| s.parse::<usize>().ok()) {
|
||||
Some(index) => Input::Slash(SlashCommand::Download { index }),
|
||||
None => {
|
||||
display::print_error("usage: /download <msg-index>");
|
||||
Input::Empty
|
||||
}
|
||||
},
|
||||
_ => {
|
||||
display::print_error(&format!("unknown command: {cmd}. Try /help"));
|
||||
Input::Empty
|
||||
@@ -692,6 +710,8 @@ async fn handle_slash(
|
||||
SlashCommand::React { emoji, index } => cmd_react(session, client, &emoji, index).await,
|
||||
SlashCommand::Edit { index, new_text } => cmd_edit(session, client, index, &new_text).await,
|
||||
SlashCommand::Delete { index } => cmd_delete(session, client, index).await,
|
||||
SlashCommand::SendFile { path } => cmd_send_file(session, client, &path).await,
|
||||
SlashCommand::Download { index } => cmd_download(session, client, index).await,
|
||||
};
|
||||
if let Err(e) = result {
|
||||
display::print_error(&format!("{e:#}"));
|
||||
@@ -720,6 +740,8 @@ fn print_help() {
|
||||
display::print_status(" /typing-notify on|off - Toggle typing notifications");
|
||||
display::print_status(" /edit <index> <new text> - Edit a sent message");
|
||||
display::print_status(" /delete <index> - Delete a sent message");
|
||||
display::print_status(" /send-file <path> - Upload and send a file (max 50 MB)");
|
||||
display::print_status(" /download <index> - Download a received file attachment");
|
||||
display::print_status(" /quit - Exit");
|
||||
}
|
||||
|
||||
@@ -1667,6 +1689,319 @@ async fn cmd_delete(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// ── File transfer ────────────────────────────────────────────────────────────
|
||||
|
||||
/// Maximum file size for upload (50 MB).
|
||||
const MAX_FILE_SIZE: u64 = 50 * 1024 * 1024;
|
||||
/// Chunk size for upload/download (256 KB).
|
||||
const BLOB_CHUNK_SIZE: usize = 256 * 1024;
|
||||
|
||||
/// Guess MIME type from file extension.
|
||||
fn guess_mime(path: &Path) -> &'static str {
|
||||
match path
|
||||
.extension()
|
||||
.and_then(|e| e.to_str())
|
||||
.map(|e| e.to_lowercase())
|
||||
.as_deref()
|
||||
{
|
||||
Some("pdf") => "application/pdf",
|
||||
Some("jpg" | "jpeg") => "image/jpeg",
|
||||
Some("png") => "image/png",
|
||||
Some("gif") => "image/gif",
|
||||
Some("txt") => "text/plain",
|
||||
Some("zip") => "application/zip",
|
||||
Some("json") => "application/json",
|
||||
Some("html" | "htm") => "text/html",
|
||||
Some("mp4") => "video/mp4",
|
||||
Some("mp3") => "audio/mpeg",
|
||||
Some("webp") => "image/webp",
|
||||
Some("svg") => "image/svg+xml",
|
||||
_ => "application/octet-stream",
|
||||
}
|
||||
}
|
||||
|
||||
/// Format a byte size for human display (e.g. "1.2 MB").
|
||||
fn format_size(bytes: u64) -> String {
|
||||
if bytes < 1024 {
|
||||
format!("{bytes} B")
|
||||
} else if bytes < 1024 * 1024 {
|
||||
format!("{:.1} KB", bytes as f64 / 1024.0)
|
||||
} else if bytes < 1024 * 1024 * 1024 {
|
||||
format!("{:.1} MB", bytes as f64 / (1024.0 * 1024.0))
|
||||
} else {
|
||||
format!("{:.1} GB", bytes as f64 / (1024.0 * 1024.0 * 1024.0))
|
||||
}
|
||||
}
|
||||
|
||||
async fn cmd_send_file(
|
||||
session: &mut SessionState,
|
||||
client: &node_service::Client,
|
||||
path_str: &str,
|
||||
) -> anyhow::Result<()> {
|
||||
let conv_id = session
|
||||
.active_conversation
|
||||
.as_ref()
|
||||
.context("no active conversation; use /dm or /create-group first")?
|
||||
.clone();
|
||||
|
||||
let file_path = PathBuf::from(path_str.trim_matches('"'));
|
||||
anyhow::ensure!(file_path.exists(), "file not found: {}", file_path.display());
|
||||
|
||||
let metadata = std::fs::metadata(&file_path)
|
||||
.with_context(|| format!("cannot read file: {}", file_path.display()))?;
|
||||
let file_size = metadata.len();
|
||||
anyhow::ensure!(
|
||||
file_size <= MAX_FILE_SIZE,
|
||||
"file too large ({}, max {})",
|
||||
format_size(file_size),
|
||||
format_size(MAX_FILE_SIZE)
|
||||
);
|
||||
|
||||
let filename = file_path
|
||||
.file_name()
|
||||
.and_then(|n| n.to_str())
|
||||
.context("cannot determine filename")?
|
||||
.to_string();
|
||||
let mime_type = guess_mime(&file_path);
|
||||
|
||||
// Read entire file and compute SHA-256 hash.
|
||||
let file_bytes = std::fs::read(&file_path)
|
||||
.with_context(|| format!("read file: {}", file_path.display()))?;
|
||||
|
||||
use sha2::{Sha256, Digest};
|
||||
let hash = Sha256::digest(&file_bytes);
|
||||
let blob_hash: [u8; 32] = hash.into();
|
||||
|
||||
// Upload in chunks with progress.
|
||||
let total = file_bytes.len();
|
||||
let mut offset = 0usize;
|
||||
while offset < total {
|
||||
let end = (offset + BLOB_CHUNK_SIZE).min(total);
|
||||
let chunk = &file_bytes[offset..end];
|
||||
upload_blob_chunk(
|
||||
client,
|
||||
&blob_hash,
|
||||
chunk,
|
||||
offset as u64,
|
||||
total as u64,
|
||||
mime_type,
|
||||
)
|
||||
.await?;
|
||||
offset = end;
|
||||
let pct = (offset as u64 * 100) / total as u64;
|
||||
eprint!("\rUploading... {pct}%");
|
||||
}
|
||||
eprintln!();
|
||||
|
||||
// Build FileRef AppMessage.
|
||||
let app_payload = serialize_file_ref(
|
||||
&blob_hash,
|
||||
filename.as_bytes(),
|
||||
file_size,
|
||||
mime_type.as_bytes(),
|
||||
)
|
||||
.context("serialize FileRef")?;
|
||||
|
||||
let my_key = session.identity_bytes();
|
||||
let identity = std::sync::Arc::clone(&session.identity);
|
||||
|
||||
let member = session
|
||||
.get_member_mut(&conv_id)
|
||||
.context("no group member")?;
|
||||
|
||||
anyhow::ensure!(
|
||||
member.group_ref().is_some(),
|
||||
"cannot send files in a local-only conversation"
|
||||
);
|
||||
|
||||
let sealed = quicproquo_core::sealed_sender::seal(&identity, &app_payload);
|
||||
let padded = quicproquo_core::padding::pad(&sealed);
|
||||
|
||||
let ct = member
|
||||
.send_message(&padded)
|
||||
.context("MLS send_message failed")?;
|
||||
|
||||
let recipients: Vec<Vec<u8>> = member
|
||||
.member_identities()
|
||||
.into_iter()
|
||||
.filter(|id| id.as_slice() != my_key.as_slice())
|
||||
.collect();
|
||||
|
||||
for recipient_key in &recipients {
|
||||
let peer_hybrid_pk = fetch_hybrid_key(client, recipient_key).await?;
|
||||
let payload = if let Some(ref pk) = peer_hybrid_pk {
|
||||
hybrid_encrypt(pk, &ct, b"", b"").context("hybrid encrypt")?
|
||||
} else {
|
||||
ct.clone()
|
||||
};
|
||||
enqueue(client, recipient_key, &payload).await?;
|
||||
}
|
||||
|
||||
// Store outgoing message (include blob hash so /download can extract it).
|
||||
let body = format!(
|
||||
"\u{1f4ce} {} ({}) blob:{}",
|
||||
filename,
|
||||
format_size(file_size),
|
||||
hex::encode(blob_hash)
|
||||
);
|
||||
let msg = StoredMessage {
|
||||
conversation_id: conv_id.clone(),
|
||||
message_id: None,
|
||||
sender_key: my_key,
|
||||
sender_name: Some("you".into()),
|
||||
body,
|
||||
msg_type: "file".into(),
|
||||
ref_msg_id: None,
|
||||
timestamp_ms: now_ms(),
|
||||
is_outgoing: true,
|
||||
};
|
||||
session.conv_store.save_message(&msg)?;
|
||||
session.conv_store.update_activity(&conv_id, now_ms())?;
|
||||
session.save_member(&conv_id)?;
|
||||
|
||||
display::print_status(&format!(
|
||||
"Sent: {} ({})",
|
||||
filename,
|
||||
format_size(file_size)
|
||||
));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn cmd_download(
|
||||
session: &mut SessionState,
|
||||
client: &node_service::Client,
|
||||
index: usize,
|
||||
) -> anyhow::Result<()> {
|
||||
let conv_id = session
|
||||
.active_conversation
|
||||
.as_ref()
|
||||
.context("no active conversation")?
|
||||
.clone();
|
||||
|
||||
let msgs = session.conv_store.load_all_messages(&conv_id)?;
|
||||
anyhow::ensure!(!msgs.is_empty(), "no messages in this conversation");
|
||||
anyhow::ensure!(
|
||||
index < msgs.len(),
|
||||
"message index {index} out of range (0..{})",
|
||||
msgs.len() - 1
|
||||
);
|
||||
|
||||
let target = &msgs[index];
|
||||
anyhow::ensure!(
|
||||
target.msg_type == "file",
|
||||
"message at index {index} is not a file (type: {})",
|
||||
target.msg_type
|
||||
);
|
||||
|
||||
// Extract blob_id from the stored ref_msg_id field (32-byte blob hash stored as first 16 bytes
|
||||
// in ref_msg_id is not enough). We store the blob_id hex in the body after the filename.
|
||||
// Parse the body format: "\u{1f4ce} filename (size) blob:HEXHASH"
|
||||
let blob_id = extract_blob_id_from_body(&target.body)
|
||||
.context("cannot extract blob_id from file message; the message may be from an older version")?;
|
||||
|
||||
// Get filename from body: "\u{1f4ce} filename (size) ..."
|
||||
let filename = extract_filename_from_body(&target.body)
|
||||
.unwrap_or_else(|| "download".to_string());
|
||||
|
||||
// Download in chunks.
|
||||
// First request to learn total_size.
|
||||
let (first_chunk, total_size, _mime) =
|
||||
download_blob_chunk(client, &blob_id, 0, BLOB_CHUNK_SIZE as u32).await?;
|
||||
|
||||
let mut data = Vec::with_capacity(total_size as usize);
|
||||
data.extend_from_slice(&first_chunk);
|
||||
|
||||
if total_size > first_chunk.len() as u64 {
|
||||
let pct = (data.len() as u64 * 100) / total_size;
|
||||
eprint!("\rDownloading... {pct}%");
|
||||
}
|
||||
|
||||
while (data.len() as u64) < total_size {
|
||||
let (chunk, _, _) = download_blob_chunk(
|
||||
client,
|
||||
&blob_id,
|
||||
data.len() as u64,
|
||||
BLOB_CHUNK_SIZE as u32,
|
||||
)
|
||||
.await?;
|
||||
if chunk.is_empty() {
|
||||
break;
|
||||
}
|
||||
data.extend_from_slice(&chunk);
|
||||
let pct = (data.len() as u64 * 100) / total_size;
|
||||
eprint!("\rDownloading... {pct}%");
|
||||
}
|
||||
if total_size > BLOB_CHUNK_SIZE as u64 {
|
||||
eprintln!();
|
||||
}
|
||||
|
||||
// Verify SHA-256.
|
||||
use sha2::{Sha256, Digest};
|
||||
let computed_hash = Sha256::digest(&data);
|
||||
anyhow::ensure!(
|
||||
computed_hash.as_slice() == blob_id.as_slice(),
|
||||
"SHA-256 mismatch: blob data is corrupt"
|
||||
);
|
||||
|
||||
// Save to current directory, avoiding overwrites.
|
||||
let mut save_path = PathBuf::from(&filename);
|
||||
let mut counter = 1u32;
|
||||
while save_path.exists() {
|
||||
let stem = Path::new(&filename)
|
||||
.file_stem()
|
||||
.and_then(|s| s.to_str())
|
||||
.unwrap_or("download");
|
||||
let ext = Path::new(&filename)
|
||||
.extension()
|
||||
.and_then(|e| e.to_str())
|
||||
.unwrap_or("");
|
||||
if ext.is_empty() {
|
||||
save_path = PathBuf::from(format!("{stem}.{counter}"));
|
||||
} else {
|
||||
save_path = PathBuf::from(format!("{stem}.{counter}.{ext}"));
|
||||
}
|
||||
counter += 1;
|
||||
}
|
||||
|
||||
std::fs::write(&save_path, &data)
|
||||
.with_context(|| format!("write file: {}", save_path.display()))?;
|
||||
|
||||
display::print_status(&format!(
|
||||
"Downloaded: {} -> ./{}",
|
||||
filename,
|
||||
save_path.display()
|
||||
));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Extract blob_id from the file message body format:
|
||||
/// "\u{1f4ce} filename (size) blob:HEX64"
|
||||
fn extract_blob_id_from_body(body: &str) -> Option<Vec<u8>> {
|
||||
let marker = "blob:";
|
||||
let idx = body.find(marker)?;
|
||||
let hex_str = &body[idx + marker.len()..];
|
||||
let hex_str = hex_str.split_whitespace().next()?;
|
||||
if hex_str.len() != 64 {
|
||||
return None;
|
||||
}
|
||||
hex::decode(hex_str).ok()
|
||||
}
|
||||
|
||||
/// Extract filename from the file message body format:
|
||||
/// "\u{1f4ce} filename (size) blob:HEX64"
|
||||
fn extract_filename_from_body(body: &str) -> Option<String> {
|
||||
// Skip the leading emoji + space.
|
||||
let rest = body.strip_prefix("\u{1f4ce} ")?;
|
||||
// Find the last " (" to separate filename from "(size) blob:..."
|
||||
let paren_idx = rest.rfind(" (")?;
|
||||
let filename = &rest[..paren_idx];
|
||||
if filename.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(filename.to_string())
|
||||
}
|
||||
}
|
||||
|
||||
// ── Sending ──────────────────────────────────────────────────────────────────
|
||||
|
||||
async fn handle_send(
|
||||
@@ -1937,7 +2272,7 @@ async fn poll_messages(
|
||||
break;
|
||||
}
|
||||
|
||||
// Storable message types: Chat, Reply, Reaction, legacy.
|
||||
// Storable message types: Chat, Reply, Reaction, FileRef, legacy.
|
||||
let (body, msg_id, msg_type, ref_msg_id) = match parsed {
|
||||
Ok((_, AppMessage::Chat { message_id, body })) => (
|
||||
String::from_utf8_lossy(&body).to_string(),
|
||||
@@ -1957,6 +2292,16 @@ async fn poll_messages(
|
||||
"reaction",
|
||||
Some(ref_msg_id),
|
||||
),
|
||||
Ok((_, AppMessage::FileRef { blob_id, filename, file_size, .. })) => {
|
||||
let fname = String::from_utf8_lossy(&filename).to_string();
|
||||
let body = format!(
|
||||
"\u{1f4ce} {} ({}) blob:{}",
|
||||
fname,
|
||||
format_size(file_size),
|
||||
hex::encode(blob_id),
|
||||
);
|
||||
(body, None, "file", None)
|
||||
}
|
||||
_ => {
|
||||
// Legacy raw plaintext or unknown type.
|
||||
(
|
||||
@@ -1997,6 +2342,12 @@ async fn poll_messages(
|
||||
let conv_name = conv.map(|c| c.display_name).unwrap_or_default();
|
||||
let display_body = if msg_type == "reaction" {
|
||||
format!("reacted {body}")
|
||||
} else if msg_type == "file" {
|
||||
// Show the file info without the blob: suffix, plus download hint.
|
||||
let visible = body.split(" blob:").next().unwrap_or(&body);
|
||||
let all_msgs = session.conv_store.load_all_messages(conv_id)?;
|
||||
let msg_idx = all_msgs.len().saturating_sub(1);
|
||||
format!("{visible} -- use /download {msg_idx} to save")
|
||||
} else {
|
||||
body.clone()
|
||||
};
|
||||
|
||||
@@ -767,6 +767,70 @@ pub async fn create_channel(
|
||||
Ok((channel_id, was_new))
|
||||
}
|
||||
|
||||
/// Upload a single chunk of a blob to the server.
|
||||
///
|
||||
/// `blob_hash` is the expected SHA-256 hash (32 bytes) of the complete blob.
|
||||
/// Returns the `blob_id` once the server has received and verified the final chunk.
|
||||
pub async fn upload_blob_chunk(
|
||||
client: &node_service::Client,
|
||||
blob_hash: &[u8],
|
||||
chunk: &[u8],
|
||||
offset: u64,
|
||||
total_size: u64,
|
||||
mime_type: &str,
|
||||
) -> anyhow::Result<Vec<u8>> {
|
||||
let mut req = client.upload_blob_request();
|
||||
{
|
||||
let mut p = req.get();
|
||||
let mut auth = p.reborrow().init_auth();
|
||||
set_auth(&mut auth)?;
|
||||
p.set_blob_hash(blob_hash);
|
||||
p.set_chunk(chunk);
|
||||
p.set_offset(offset);
|
||||
p.set_total_size(total_size);
|
||||
p.set_mime_type(mime_type);
|
||||
}
|
||||
let resp = req.send().promise.await.context("upload_blob RPC failed")?;
|
||||
let blob_id = resp
|
||||
.get()
|
||||
.context("upload_blob: bad response")?
|
||||
.get_blob_id()
|
||||
.context("upload_blob: missing blob_id")?
|
||||
.to_vec();
|
||||
Ok(blob_id)
|
||||
}
|
||||
|
||||
/// Download a single chunk of a blob from the server.
|
||||
///
|
||||
/// Returns `(chunk_bytes, total_size, mime_type)`.
|
||||
pub async fn download_blob_chunk(
|
||||
client: &node_service::Client,
|
||||
blob_id: &[u8],
|
||||
offset: u64,
|
||||
length: u32,
|
||||
) -> anyhow::Result<(Vec<u8>, u64, String)> {
|
||||
let mut req = client.download_blob_request();
|
||||
{
|
||||
let mut p = req.get();
|
||||
let mut auth = p.reborrow().init_auth();
|
||||
set_auth(&mut auth)?;
|
||||
p.set_blob_id(blob_id);
|
||||
p.set_offset(offset);
|
||||
p.set_length(length);
|
||||
}
|
||||
let resp = req.send().promise.await.context("download_blob RPC failed")?;
|
||||
let reader = resp.get().context("download_blob: bad response")?;
|
||||
let chunk = reader.get_chunk().context("download_blob: missing chunk")?.to_vec();
|
||||
let total_size = reader.get_total_size();
|
||||
let mime_type = reader
|
||||
.get_mime_type()
|
||||
.context("download_blob: missing mime_type")?
|
||||
.to_str()
|
||||
.unwrap_or("application/octet-stream")
|
||||
.to_string();
|
||||
Ok((chunk, total_size, mime_type))
|
||||
}
|
||||
|
||||
/// Return the current Unix timestamp in milliseconds.
|
||||
pub fn current_timestamp_ms() -> u64 {
|
||||
std::time::SystemTime::now()
|
||||
|
||||
@@ -15,6 +15,8 @@ fn ensure_rustls_provider() {
|
||||
let _ = rustls::crypto::ring::default_provider().install_default();
|
||||
}
|
||||
|
||||
use sha2::{Sha256, Digest};
|
||||
|
||||
use quicproquo_client::{
|
||||
cmd_create_group, cmd_invite, cmd_join, cmd_login, cmd_ping, cmd_register_state,
|
||||
cmd_register_user, cmd_send, connect_node, create_channel, enqueue, fetch_wait, init_auth,
|
||||
@@ -1342,3 +1344,199 @@ async fn e2e_multi_party_group() -> anyhow::Result<()> {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// ─── blob upload / download tests ────────────────────────────────────────────
|
||||
|
||||
/// Upload a 2 KB blob, download it in full, then download a partial slice.
|
||||
/// Verifies SHA-256 integrity, blobId, and partial-range semantics.
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn e2e_file_upload_download() -> anyhow::Result<()> {
|
||||
ensure_rustls_provider();
|
||||
|
||||
let temp = TempDir::new()?;
|
||||
let base = temp.path();
|
||||
|
||||
let (server, ca_cert, _child) = spawn_server(base, &["--sealed-sender"]);
|
||||
wait_for_health(&server, &ca_cert, "localhost").await?;
|
||||
init_auth(ClientAuth::from_parts("devtoken".to_string(), None));
|
||||
|
||||
let local = tokio::task::LocalSet::new();
|
||||
|
||||
// Register Alice (needed so the auth context is valid).
|
||||
let alice_state = base.join("alice.bin");
|
||||
local
|
||||
.run_until(cmd_register_state(&alice_state, &server, &ca_cert, "localhost", None))
|
||||
.await?;
|
||||
|
||||
let client = local.run_until(connect_node(&server, &ca_cert, "localhost")).await?;
|
||||
|
||||
// Build 2 KB of known data.
|
||||
let pattern = b"hello-world-file-test\n";
|
||||
let repeat_count = (2048 + pattern.len() - 1) / pattern.len();
|
||||
let file_data: Vec<u8> = pattern.iter().copied().cycle().take(repeat_count * pattern.len()).collect();
|
||||
let file_data = &file_data[..2048]; // exactly 2 KB
|
||||
|
||||
// Compute SHA-256.
|
||||
let hash: [u8; 32] = Sha256::digest(file_data).into();
|
||||
|
||||
// ── Upload ──
|
||||
let blob_id = local
|
||||
.run_until(async {
|
||||
let mut req = client.upload_blob_request();
|
||||
{
|
||||
let mut p = req.get();
|
||||
let mut auth = p.reborrow().init_auth();
|
||||
quicproquo_client::client::rpc::set_auth(&mut auth)?;
|
||||
p.set_blob_hash(&hash);
|
||||
p.set_chunk(file_data);
|
||||
p.set_offset(0);
|
||||
p.set_total_size(file_data.len() as u64);
|
||||
p.set_mime_type("application/octet-stream");
|
||||
}
|
||||
let resp = req.send().promise.await
|
||||
.map_err(|e| anyhow::anyhow!("uploadBlob RPC failed: {e}"))?;
|
||||
let blob_id = resp.get()
|
||||
.map_err(|e| anyhow::anyhow!("uploadBlob bad response: {e}"))?
|
||||
.get_blob_id()
|
||||
.map_err(|e| anyhow::anyhow!("uploadBlob missing blobId: {e}"))?
|
||||
.to_vec();
|
||||
Ok::<Vec<u8>, anyhow::Error>(blob_id)
|
||||
})
|
||||
.await?;
|
||||
|
||||
anyhow::ensure!(
|
||||
blob_id == hash,
|
||||
"blobId must equal SHA-256 hash; got {} vs {}",
|
||||
hex_encode(&blob_id),
|
||||
hex_encode(&hash)
|
||||
);
|
||||
|
||||
// ── Full download ──
|
||||
let (chunk, total_size) = local
|
||||
.run_until(async {
|
||||
let mut req = client.download_blob_request();
|
||||
{
|
||||
let mut p = req.get();
|
||||
let mut auth = p.reborrow().init_auth();
|
||||
quicproquo_client::client::rpc::set_auth(&mut auth)?;
|
||||
p.set_blob_id(&blob_id);
|
||||
p.set_offset(0);
|
||||
p.set_length(file_data.len() as u32);
|
||||
}
|
||||
let resp = req.send().promise.await
|
||||
.map_err(|e| anyhow::anyhow!("downloadBlob RPC failed: {e}"))?;
|
||||
let r = resp.get()
|
||||
.map_err(|e| anyhow::anyhow!("downloadBlob bad response: {e}"))?;
|
||||
let chunk = r.get_chunk()
|
||||
.map_err(|e| anyhow::anyhow!("downloadBlob missing chunk: {e}"))?
|
||||
.to_vec();
|
||||
let total = r.get_total_size();
|
||||
Ok::<(Vec<u8>, u64), anyhow::Error>((chunk, total))
|
||||
})
|
||||
.await?;
|
||||
|
||||
anyhow::ensure!(
|
||||
total_size == file_data.len() as u64,
|
||||
"totalSize mismatch: {} vs {}",
|
||||
total_size,
|
||||
file_data.len()
|
||||
);
|
||||
anyhow::ensure!(
|
||||
chunk == file_data,
|
||||
"downloaded data does not match uploaded data (len {} vs {})",
|
||||
chunk.len(),
|
||||
file_data.len()
|
||||
);
|
||||
|
||||
// ── Partial download: offset=100, length=200 ──
|
||||
let partial = local
|
||||
.run_until(async {
|
||||
let mut req = client.download_blob_request();
|
||||
{
|
||||
let mut p = req.get();
|
||||
let mut auth = p.reborrow().init_auth();
|
||||
quicproquo_client::client::rpc::set_auth(&mut auth)?;
|
||||
p.set_blob_id(&blob_id);
|
||||
p.set_offset(100);
|
||||
p.set_length(200);
|
||||
}
|
||||
let resp = req.send().promise.await
|
||||
.map_err(|e| anyhow::anyhow!("downloadBlob partial RPC failed: {e}"))?;
|
||||
let r = resp.get()
|
||||
.map_err(|e| anyhow::anyhow!("downloadBlob partial bad response: {e}"))?;
|
||||
let chunk = r.get_chunk()
|
||||
.map_err(|e| anyhow::anyhow!("downloadBlob partial missing chunk: {e}"))?
|
||||
.to_vec();
|
||||
Ok::<Vec<u8>, anyhow::Error>(chunk)
|
||||
})
|
||||
.await?;
|
||||
|
||||
anyhow::ensure!(
|
||||
partial == &file_data[100..300],
|
||||
"partial download [100..300] does not match expected slice"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Uploading with a blobHash that does not match the chunk data must return E026.
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn e2e_blob_hash_mismatch() -> anyhow::Result<()> {
|
||||
ensure_rustls_provider();
|
||||
|
||||
let temp = TempDir::new()?;
|
||||
let base = temp.path();
|
||||
|
||||
let (server, ca_cert, _child) = spawn_server(base, &["--sealed-sender"]);
|
||||
wait_for_health(&server, &ca_cert, "localhost").await?;
|
||||
init_auth(ClientAuth::from_parts("devtoken".to_string(), None));
|
||||
|
||||
let local = tokio::task::LocalSet::new();
|
||||
|
||||
let alice_state = base.join("alice.bin");
|
||||
local
|
||||
.run_until(cmd_register_state(&alice_state, &server, &ca_cert, "localhost", None))
|
||||
.await?;
|
||||
|
||||
let client = local.run_until(connect_node(&server, &ca_cert, "localhost")).await?;
|
||||
|
||||
// Chunk data.
|
||||
let chunk_data = b"some file content for mismatch test";
|
||||
|
||||
// Wrong hash (all zeros — will not match any real data).
|
||||
let wrong_hash = [0u8; 32];
|
||||
|
||||
let result = local
|
||||
.run_until(async {
|
||||
let mut req = client.upload_blob_request();
|
||||
{
|
||||
let mut p = req.get();
|
||||
let mut auth = p.reborrow().init_auth();
|
||||
quicproquo_client::client::rpc::set_auth(&mut auth)?;
|
||||
p.set_blob_hash(&wrong_hash);
|
||||
p.set_chunk(&chunk_data[..]);
|
||||
p.set_offset(0);
|
||||
p.set_total_size(chunk_data.len() as u64);
|
||||
p.set_mime_type("text/plain");
|
||||
}
|
||||
let resp = req.send().promise.await
|
||||
.map_err(|e| anyhow::anyhow!("uploadBlob RPC: {e}"))?;
|
||||
resp.get()
|
||||
.map_err(|e| anyhow::anyhow!("uploadBlob response: {e}"))?;
|
||||
Ok::<(), anyhow::Error>(())
|
||||
})
|
||||
.await;
|
||||
|
||||
match result {
|
||||
Ok(_) => anyhow::bail!("uploadBlob with wrong hash should have been rejected"),
|
||||
Err(e) => {
|
||||
let msg = format!("{e:#}");
|
||||
anyhow::ensure!(
|
||||
msg.contains("E026") || msg.contains("hash") || msg.contains("mismatch"),
|
||||
"expected E026 / hash mismatch error, got: {msg}"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -26,6 +26,7 @@ pub enum MessageType {
|
||||
Typing = 0x05,
|
||||
Edit = 0x06,
|
||||
Delete = 0x07,
|
||||
FileRef = 0x08,
|
||||
}
|
||||
|
||||
impl MessageType {
|
||||
@@ -38,6 +39,7 @@ impl MessageType {
|
||||
0x05 => Some(MessageType::Typing),
|
||||
0x06 => Some(MessageType::Edit),
|
||||
0x07 => Some(MessageType::Delete),
|
||||
0x08 => Some(MessageType::FileRef),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
@@ -75,6 +77,13 @@ pub enum AppMessage {
|
||||
Delete {
|
||||
ref_msg_id: [u8; 16],
|
||||
},
|
||||
/// File reference: metadata pointing to a blob stored on the server.
|
||||
FileRef {
|
||||
blob_id: [u8; 32],
|
||||
filename: Vec<u8>,
|
||||
file_size: u64,
|
||||
mime_type: Vec<u8>,
|
||||
},
|
||||
}
|
||||
|
||||
/// Generate a new 16-byte message ID (e.g. for Chat/Reply so recipients can reference it).
|
||||
@@ -95,6 +104,7 @@ pub fn generate_message_id() -> [u8; 16] {
|
||||
// Typing: [active: 1] 0 = stopped, 1 = typing
|
||||
// Edit: [ref_msg_id: 16][body_len: 2 BE][body]
|
||||
// Delete: [ref_msg_id: 16]
|
||||
// FileRef: [blob_id: 32][filename_len: 2 BE][filename][file_size: 8 BE][mime_len: 2 BE][mime_type]
|
||||
|
||||
/// Serialize a rich message into the application payload format.
|
||||
pub fn serialize(msg_type: MessageType, payload: &[u8]) -> Vec<u8> {
|
||||
@@ -170,6 +180,29 @@ pub fn serialize_delete(ref_msg_id: &[u8; 16]) -> Vec<u8> {
|
||||
serialize(MessageType::Delete, ref_msg_id)
|
||||
}
|
||||
|
||||
/// Serialize a FileRef message (metadata pointing to a blob on the server).
|
||||
pub fn serialize_file_ref(
|
||||
blob_id: &[u8; 32],
|
||||
filename: &[u8],
|
||||
file_size: u64,
|
||||
mime_type: &[u8],
|
||||
) -> Result<Vec<u8>, CoreError> {
|
||||
if filename.len() > u16::MAX as usize {
|
||||
return Err(CoreError::AppMessage("filename exceeds maximum length".into()));
|
||||
}
|
||||
if mime_type.len() > u16::MAX as usize {
|
||||
return Err(CoreError::AppMessage("mime_type exceeds maximum length".into()));
|
||||
}
|
||||
let mut payload = Vec::with_capacity(32 + 2 + filename.len() + 8 + 2 + mime_type.len());
|
||||
payload.extend_from_slice(blob_id);
|
||||
payload.extend_from_slice(&(filename.len() as u16).to_be_bytes());
|
||||
payload.extend_from_slice(filename);
|
||||
payload.extend_from_slice(&file_size.to_be_bytes());
|
||||
payload.extend_from_slice(&(mime_type.len() as u16).to_be_bytes());
|
||||
payload.extend_from_slice(mime_type);
|
||||
Ok(serialize(MessageType::FileRef, &payload))
|
||||
}
|
||||
|
||||
/// Parse bytes into (MessageType, AppMessage). Fails if version/type unknown or payload too short.
|
||||
pub fn parse(bytes: &[u8]) -> Result<(MessageType, AppMessage), CoreError> {
|
||||
if bytes.len() < 2 {
|
||||
@@ -191,6 +224,7 @@ pub fn parse(bytes: &[u8]) -> Result<(MessageType, AppMessage), CoreError> {
|
||||
MessageType::Typing => parse_typing(payload)?,
|
||||
MessageType::Edit => parse_edit(payload)?,
|
||||
MessageType::Delete => parse_delete(payload)?,
|
||||
MessageType::FileRef => parse_file_ref(payload)?,
|
||||
};
|
||||
Ok((msg_type, app))
|
||||
}
|
||||
@@ -276,6 +310,34 @@ fn parse_delete(payload: &[u8]) -> Result<AppMessage, CoreError> {
|
||||
Ok(AppMessage::Delete { ref_msg_id })
|
||||
}
|
||||
|
||||
fn parse_file_ref(payload: &[u8]) -> Result<AppMessage, CoreError> {
|
||||
// blob_id(32) + filename_len(2) minimum
|
||||
if payload.len() < 34 {
|
||||
return Err(CoreError::AppMessage("FileRef payload too short".into()));
|
||||
}
|
||||
let mut blob_id = [0u8; 32];
|
||||
blob_id.copy_from_slice(&payload[..32]);
|
||||
let filename_len = u16::from_be_bytes([payload[32], payload[33]]) as usize;
|
||||
let pos = 34;
|
||||
if payload.len() < pos + filename_len + 8 + 2 {
|
||||
return Err(CoreError::AppMessage("FileRef payload truncated after filename_len".into()));
|
||||
}
|
||||
let filename = payload[pos..pos + filename_len].to_vec();
|
||||
let pos = pos + filename_len;
|
||||
let file_size = u64::from_be_bytes([
|
||||
payload[pos], payload[pos + 1], payload[pos + 2], payload[pos + 3],
|
||||
payload[pos + 4], payload[pos + 5], payload[pos + 6], payload[pos + 7],
|
||||
]);
|
||||
let pos = pos + 8;
|
||||
let mime_len = u16::from_be_bytes([payload[pos], payload[pos + 1]]) as usize;
|
||||
let pos = pos + 2;
|
||||
if payload.len() < pos + mime_len {
|
||||
return Err(CoreError::AppMessage("FileRef payload truncated after mime_len".into()));
|
||||
}
|
||||
let mime_type = payload[pos..pos + mime_len].to_vec();
|
||||
Ok(AppMessage::FileRef { blob_id, filename, file_size, mime_type })
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
@@ -415,4 +477,29 @@ mod tests {
|
||||
data.extend_from_slice(&[0u8; 10]);
|
||||
assert!(parse(&data).is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn roundtrip_file_ref() {
|
||||
let blob_id = [7u8; 32];
|
||||
let filename = b"report.pdf";
|
||||
let file_size = 123456u64;
|
||||
let mime_type = b"application/pdf";
|
||||
let encoded = serialize_file_ref(&blob_id, filename, file_size, mime_type).unwrap();
|
||||
let (t, msg) = parse(&encoded).unwrap();
|
||||
assert_eq!(t, MessageType::FileRef);
|
||||
match &msg {
|
||||
AppMessage::FileRef {
|
||||
blob_id: bid,
|
||||
filename: fname,
|
||||
file_size: fsize,
|
||||
mime_type: mtype,
|
||||
} => {
|
||||
assert_eq!(bid, &blob_id);
|
||||
assert_eq!(fname.as_slice(), filename);
|
||||
assert_eq!(*fsize, file_size);
|
||||
assert_eq!(mtype.as_slice(), mime_type);
|
||||
}
|
||||
_ => panic!("expected FileRef"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -59,8 +59,9 @@ pub mod opaque_auth;
|
||||
// ── Public API (always available) ───────────────────────────────────────────
|
||||
|
||||
pub use app_message::{
|
||||
serialize, serialize_chat, serialize_delete, serialize_edit, serialize_reaction,
|
||||
serialize_read_receipt, serialize_reply, serialize_typing, parse, generate_message_id,
|
||||
serialize, serialize_chat, serialize_delete, serialize_edit, serialize_file_ref,
|
||||
serialize_reaction, serialize_read_receipt, serialize_reply, serialize_typing,
|
||||
parse, generate_message_id,
|
||||
AppMessage, MessageType, VERSION as APP_MESSAGE_VERSION,
|
||||
};
|
||||
pub use error::CoreError;
|
||||
|
||||
@@ -52,6 +52,7 @@ anyhow = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
bincode = { workspace = true }
|
||||
serde = { workspace = true }
|
||||
serde_json = { workspace = true }
|
||||
|
||||
# CLI
|
||||
clap = { workspace = true }
|
||||
|
||||
@@ -26,6 +26,10 @@ pub const E020_BAD_PARAMS: &str = "E020";
|
||||
pub const E021_CIPHERSUITE_NOT_ALLOWED: &str = "E021";
|
||||
pub const E022_CHANNEL_ACCESS_DENIED: &str = "E022";
|
||||
pub const E023_CHANNEL_NOT_FOUND: &str = "E023";
|
||||
pub const E024_BLOB_TOO_LARGE: &str = "E024";
|
||||
pub const E025_BLOB_HASH_LENGTH: &str = "E025";
|
||||
pub const E026_BLOB_HASH_MISMATCH: &str = "E026";
|
||||
pub const E027_BLOB_NOT_FOUND: &str = "E027";
|
||||
|
||||
/// Build a `capnp::Error::failed()` with the structured code prefix.
|
||||
pub fn coded_error(code: &str, msg: impl std::fmt::Display) -> capnp::Error {
|
||||
|
||||
@@ -220,6 +220,10 @@ async fn main() -> anyhow::Result<()> {
|
||||
}
|
||||
};
|
||||
|
||||
// Ensure blobs directory exists for file transfer support.
|
||||
std::fs::create_dir_all(PathBuf::from(&effective.data_dir).join("blobs"))
|
||||
.context("create blobs directory")?;
|
||||
|
||||
let auth_cfg = Arc::new(AuthConfig::new(
|
||||
effective.auth_token.clone(),
|
||||
effective.allow_insecure_auth,
|
||||
@@ -594,6 +598,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
let sk = Arc::clone(&signing_key);
|
||||
let conn_hooks = Arc::clone(&hooks);
|
||||
let conn_kt_log = Arc::clone(&kt_log);
|
||||
let conn_data_dir = PathBuf::from(&effective.data_dir);
|
||||
|
||||
tokio::task::spawn_local(async move {
|
||||
if let Err(e) = handle_node_connection(
|
||||
@@ -611,6 +616,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
sk,
|
||||
conn_hooks,
|
||||
conn_kt_log,
|
||||
conn_data_dir,
|
||||
)
|
||||
.await
|
||||
{
|
||||
|
||||
325
crates/quicproquo-server/src/node_service/blob_ops.rs
Normal file
325
crates/quicproquo-server/src/node_service/blob_ops.rs
Normal file
@@ -0,0 +1,325 @@
|
||||
//! uploadBlob / downloadBlob RPCs: chunked file transfer with SHA-256 integrity verification.
|
||||
|
||||
use std::io::{Read, Seek, SeekFrom, Write};
|
||||
use std::path::PathBuf;
|
||||
|
||||
use capnp::capability::Promise;
|
||||
use quicproquo_proto::node_capnp::node_service;
|
||||
use sha2::{Digest, Sha256};
|
||||
|
||||
use crate::auth::{coded_error, fmt_hex, validate_auth_context};
|
||||
use crate::error_codes::*;
|
||||
|
||||
use super::NodeServiceImpl;
|
||||
|
||||
/// Maximum blob size: 50 MB.
|
||||
const MAX_BLOB_SIZE: u64 = 50 * 1024 * 1024;
|
||||
|
||||
/// Maximum download chunk size: 256 KB.
|
||||
const MAX_DOWNLOAD_CHUNK: u32 = 256 * 1024;
|
||||
|
||||
/// Metadata stored alongside each completed blob.
|
||||
#[derive(serde::Serialize, serde::Deserialize)]
|
||||
struct BlobMeta {
|
||||
mime_type: String,
|
||||
total_size: u64,
|
||||
uploaded_at: u64,
|
||||
uploader_key_prefix: String,
|
||||
}
|
||||
|
||||
/// Resolve the blobs directory from the server's data_dir.
|
||||
fn blobs_dir(data_dir: &std::path::Path) -> PathBuf {
|
||||
data_dir.join("blobs")
|
||||
}
|
||||
|
||||
impl NodeServiceImpl {
|
||||
pub fn handle_upload_blob(
|
||||
&mut self,
|
||||
params: node_service::UploadBlobParams,
|
||||
mut results: node_service::UploadBlobResults,
|
||||
) -> Promise<(), capnp::Error> {
|
||||
let p = match params.get() {
|
||||
Ok(p) => p,
|
||||
Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)),
|
||||
};
|
||||
|
||||
let auth_ctx = match validate_auth_context(&self.auth_cfg, &self.sessions, p.get_auth()) {
|
||||
Ok(ctx) => ctx,
|
||||
Err(e) => return Promise::err(e),
|
||||
};
|
||||
|
||||
let blob_hash = match p.get_blob_hash() {
|
||||
Ok(v) => v.to_vec(),
|
||||
Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)),
|
||||
};
|
||||
let chunk = match p.get_chunk() {
|
||||
Ok(v) => v.to_vec(),
|
||||
Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)),
|
||||
};
|
||||
let offset = p.get_offset();
|
||||
let total_size = p.get_total_size();
|
||||
let mime_type = match p.get_mime_type() {
|
||||
Ok(v) => match v.to_str() {
|
||||
Ok(s) => s.to_string(),
|
||||
Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)),
|
||||
},
|
||||
Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)),
|
||||
};
|
||||
|
||||
// Validate blobHash length.
|
||||
if blob_hash.len() != 32 {
|
||||
return Promise::err(coded_error(
|
||||
E025_BLOB_HASH_LENGTH,
|
||||
format!("blobHash must be exactly 32 bytes, got {}", blob_hash.len()),
|
||||
));
|
||||
}
|
||||
|
||||
// Validate totalSize.
|
||||
if total_size > MAX_BLOB_SIZE {
|
||||
return Promise::err(coded_error(
|
||||
E024_BLOB_TOO_LARGE,
|
||||
format!("totalSize {} exceeds max blob size ({} bytes)", total_size, MAX_BLOB_SIZE),
|
||||
));
|
||||
}
|
||||
if total_size == 0 {
|
||||
return Promise::err(coded_error(E020_BAD_PARAMS, "totalSize must be > 0"));
|
||||
}
|
||||
|
||||
// Validate chunk bounds.
|
||||
if offset.checked_add(chunk.len() as u64).map_or(true, |end| end > total_size) {
|
||||
return Promise::err(coded_error(
|
||||
E020_BAD_PARAMS,
|
||||
format!(
|
||||
"chunk out of bounds: offset={} + chunk_len={} > totalSize={}",
|
||||
offset,
|
||||
chunk.len(),
|
||||
total_size
|
||||
),
|
||||
));
|
||||
}
|
||||
|
||||
let blob_hex = hex::encode(&blob_hash);
|
||||
let dir = blobs_dir(&self.data_dir);
|
||||
|
||||
// Ensure blobs directory exists.
|
||||
if let Err(e) = std::fs::create_dir_all(&dir) {
|
||||
return Promise::err(coded_error(
|
||||
E009_STORAGE_ERROR,
|
||||
format!("failed to create blobs directory: {e}"),
|
||||
));
|
||||
}
|
||||
|
||||
let part_path = dir.join(format!("{blob_hex}.part"));
|
||||
let final_path = dir.join(&blob_hex);
|
||||
let meta_path = dir.join(format!("{blob_hex}.meta"));
|
||||
|
||||
// If the blob already exists (fully uploaded), return immediately.
|
||||
if final_path.exists() {
|
||||
results.get().set_blob_id(&blob_hash);
|
||||
return Promise::ok(());
|
||||
}
|
||||
|
||||
// Write chunk at the given offset.
|
||||
let write_result = (|| -> Result<(), String> {
|
||||
let mut file = std::fs::OpenOptions::new()
|
||||
.create(true)
|
||||
.write(true)
|
||||
.truncate(false)
|
||||
.open(&part_path)
|
||||
.map_err(|e| format!("open .part file: {e}"))?;
|
||||
file.seek(SeekFrom::Start(offset))
|
||||
.map_err(|e| format!("seek: {e}"))?;
|
||||
file.write_all(&chunk)
|
||||
.map_err(|e| format!("write chunk: {e}"))?;
|
||||
file.sync_all()
|
||||
.map_err(|e| format!("sync: {e}"))?;
|
||||
Ok(())
|
||||
})();
|
||||
|
||||
if let Err(e) = write_result {
|
||||
return Promise::err(coded_error(E009_STORAGE_ERROR, e));
|
||||
}
|
||||
|
||||
// Check if the blob is complete.
|
||||
let end = offset + chunk.len() as u64;
|
||||
if end == total_size {
|
||||
// Verify SHA-256 of the complete file.
|
||||
let verify_result = (|| -> Result<bool, String> {
|
||||
let mut file = std::fs::File::open(&part_path)
|
||||
.map_err(|e| format!("open for verify: {e}"))?;
|
||||
let mut hasher = Sha256::new();
|
||||
let mut buf = [0u8; 64 * 1024];
|
||||
loop {
|
||||
let n = file.read(&mut buf).map_err(|e| format!("read: {e}"))?;
|
||||
if n == 0 {
|
||||
break;
|
||||
}
|
||||
hasher.update(&buf[..n]);
|
||||
}
|
||||
let computed: [u8; 32] = hasher.finalize().into();
|
||||
Ok(computed == blob_hash.as_slice())
|
||||
})();
|
||||
|
||||
match verify_result {
|
||||
Ok(true) => {
|
||||
// Hash matches — finalize the blob.
|
||||
if let Err(e) = std::fs::rename(&part_path, &final_path) {
|
||||
return Promise::err(coded_error(
|
||||
E009_STORAGE_ERROR,
|
||||
format!("rename .part to final: {e}"),
|
||||
));
|
||||
}
|
||||
|
||||
// Write metadata file.
|
||||
let uploader_prefix = auth_ctx
|
||||
.identity_key
|
||||
.as_deref()
|
||||
.filter(|k| k.len() >= 4)
|
||||
.map(|k| hex::encode(&k[..4]))
|
||||
.unwrap_or_default();
|
||||
|
||||
let now = std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.unwrap_or_default()
|
||||
.as_secs();
|
||||
|
||||
let meta = BlobMeta {
|
||||
mime_type: mime_type.clone(),
|
||||
total_size,
|
||||
uploaded_at: now,
|
||||
uploader_key_prefix: uploader_prefix.clone(),
|
||||
};
|
||||
|
||||
if let Err(e) = (|| -> Result<(), String> {
|
||||
let json = serde_json::to_string_pretty(&meta)
|
||||
.map_err(|e| format!("serialize meta: {e}"))?;
|
||||
std::fs::write(&meta_path, json.as_bytes())
|
||||
.map_err(|e| format!("write meta: {e}"))?;
|
||||
Ok(())
|
||||
})() {
|
||||
// Non-fatal: the blob is already stored; log and continue.
|
||||
tracing::warn!(error = %e, "failed to write blob metadata");
|
||||
}
|
||||
|
||||
tracing::info!(
|
||||
blob_hash_prefix = %fmt_hex(&blob_hash[..4]),
|
||||
total_size = total_size,
|
||||
mime_type = %mime_type,
|
||||
uploader_prefix = %uploader_prefix,
|
||||
"audit: blob_upload_complete"
|
||||
);
|
||||
}
|
||||
Ok(false) => {
|
||||
// Hash mismatch — delete the .part file.
|
||||
let _ = std::fs::remove_file(&part_path);
|
||||
return Promise::err(coded_error(
|
||||
E026_BLOB_HASH_MISMATCH,
|
||||
"SHA-256 of uploaded data does not match blobHash",
|
||||
));
|
||||
}
|
||||
Err(e) => {
|
||||
let _ = std::fs::remove_file(&part_path);
|
||||
return Promise::err(coded_error(E009_STORAGE_ERROR, e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
results.get().set_blob_id(&blob_hash);
|
||||
Promise::ok(())
|
||||
}
|
||||
|
||||
pub fn handle_download_blob(
|
||||
&mut self,
|
||||
params: node_service::DownloadBlobParams,
|
||||
mut results: node_service::DownloadBlobResults,
|
||||
) -> Promise<(), capnp::Error> {
|
||||
let p = match params.get() {
|
||||
Ok(p) => p,
|
||||
Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)),
|
||||
};
|
||||
|
||||
if let Err(e) = validate_auth_context(&self.auth_cfg, &self.sessions, p.get_auth()) {
|
||||
return Promise::err(e);
|
||||
}
|
||||
|
||||
let blob_id = match p.get_blob_id() {
|
||||
Ok(v) => v.to_vec(),
|
||||
Err(e) => return Promise::err(coded_error(E020_BAD_PARAMS, e)),
|
||||
};
|
||||
let offset = p.get_offset();
|
||||
let length = p.get_length().min(MAX_DOWNLOAD_CHUNK);
|
||||
|
||||
if blob_id.len() != 32 {
|
||||
return Promise::err(coded_error(
|
||||
E025_BLOB_HASH_LENGTH,
|
||||
format!("blobId must be exactly 32 bytes, got {}", blob_id.len()),
|
||||
));
|
||||
}
|
||||
|
||||
let blob_hex = hex::encode(&blob_id);
|
||||
let dir = blobs_dir(&self.data_dir);
|
||||
let blob_path = dir.join(&blob_hex);
|
||||
let meta_path = dir.join(format!("{blob_hex}.meta"));
|
||||
|
||||
// Check that the blob exists.
|
||||
if !blob_path.exists() {
|
||||
return Promise::err(coded_error(E027_BLOB_NOT_FOUND, "blob not found"));
|
||||
}
|
||||
|
||||
// Read metadata.
|
||||
let meta: BlobMeta = match std::fs::read_to_string(&meta_path) {
|
||||
Ok(json) => match serde_json::from_str(&json) {
|
||||
Ok(m) => m,
|
||||
Err(e) => {
|
||||
return Promise::err(coded_error(
|
||||
E009_STORAGE_ERROR,
|
||||
format!("corrupt blob metadata: {e}"),
|
||||
));
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
return Promise::err(coded_error(
|
||||
E009_STORAGE_ERROR,
|
||||
format!("read blob metadata: {e}"),
|
||||
));
|
||||
}
|
||||
};
|
||||
|
||||
// Read the requested chunk.
|
||||
let read_result = (|| -> Result<Vec<u8>, String> {
|
||||
let mut file = std::fs::File::open(&blob_path)
|
||||
.map_err(|e| format!("open blob: {e}"))?;
|
||||
let file_len = file
|
||||
.metadata()
|
||||
.map_err(|e| format!("file metadata: {e}"))?
|
||||
.len();
|
||||
|
||||
if offset >= file_len {
|
||||
return Ok(vec![]);
|
||||
}
|
||||
|
||||
file.seek(SeekFrom::Start(offset))
|
||||
.map_err(|e| format!("seek: {e}"))?;
|
||||
let remaining = (file_len - offset) as usize;
|
||||
let to_read = remaining.min(length as usize);
|
||||
let mut buf = vec![0u8; to_read];
|
||||
file.read_exact(&mut buf)
|
||||
.map_err(|e| format!("read chunk: {e}"))?;
|
||||
Ok(buf)
|
||||
})();
|
||||
|
||||
match read_result {
|
||||
Ok(chunk) => {
|
||||
let mut r = results.get();
|
||||
r.set_chunk(&chunk);
|
||||
r.set_total_size(meta.total_size);
|
||||
r.set_mime_type(&meta.mime_type);
|
||||
}
|
||||
Err(e) => {
|
||||
return Promise::err(coded_error(E009_STORAGE_ERROR, e));
|
||||
}
|
||||
}
|
||||
|
||||
Promise::ok(())
|
||||
}
|
||||
}
|
||||
@@ -1,3 +1,4 @@
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
@@ -20,6 +21,7 @@ use crate::storage::Store;
|
||||
const CAPNP_TRAVERSAL_LIMIT_WORDS: usize = 4 * 1024 * 1024;
|
||||
|
||||
mod auth_ops;
|
||||
mod blob_ops;
|
||||
mod channel_ops;
|
||||
mod delivery;
|
||||
mod key_ops;
|
||||
@@ -194,6 +196,22 @@ impl node_service::Server for NodeServiceImpl {
|
||||
) -> capnp::capability::Promise<(), capnp::Error> {
|
||||
self.handle_resolve_identity(params, results)
|
||||
}
|
||||
|
||||
fn upload_blob(
|
||||
&mut self,
|
||||
params: node_service::UploadBlobParams,
|
||||
results: node_service::UploadBlobResults,
|
||||
) -> capnp::capability::Promise<(), capnp::Error> {
|
||||
self.handle_upload_blob(params, results)
|
||||
}
|
||||
|
||||
fn download_blob(
|
||||
&mut self,
|
||||
params: node_service::DownloadBlobParams,
|
||||
results: node_service::DownloadBlobResults,
|
||||
) -> capnp::capability::Promise<(), capnp::Error> {
|
||||
self.handle_download_blob(params, results)
|
||||
}
|
||||
}
|
||||
|
||||
pub const CURRENT_WIRE_VERSION: u16 = 1;
|
||||
@@ -218,6 +236,8 @@ pub struct NodeServiceImpl {
|
||||
pub signing_key: Arc<quicproquo_core::IdentityKeypair>,
|
||||
/// Key Transparency Merkle log (shared across connections).
|
||||
pub kt_log: Arc<std::sync::Mutex<MerkleLog>>,
|
||||
/// Server data directory (used for blob storage).
|
||||
pub data_dir: PathBuf,
|
||||
}
|
||||
|
||||
impl NodeServiceImpl {
|
||||
@@ -236,6 +256,7 @@ impl NodeServiceImpl {
|
||||
signing_key: Arc<quicproquo_core::IdentityKeypair>,
|
||||
hooks: Arc<dyn crate::hooks::ServerHooks>,
|
||||
kt_log: Arc<std::sync::Mutex<MerkleLog>>,
|
||||
data_dir: PathBuf,
|
||||
) -> Self {
|
||||
Self {
|
||||
store,
|
||||
@@ -251,6 +272,7 @@ impl NodeServiceImpl {
|
||||
hooks,
|
||||
signing_key,
|
||||
kt_log,
|
||||
data_dir,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -271,6 +293,7 @@ pub async fn handle_node_connection(
|
||||
signing_key: Arc<quicproquo_core::IdentityKeypair>,
|
||||
hooks: Arc<dyn crate::hooks::ServerHooks>,
|
||||
kt_log: Arc<std::sync::Mutex<MerkleLog>>,
|
||||
data_dir: PathBuf,
|
||||
) -> Result<(), anyhow::Error> {
|
||||
let connection = connecting.await?;
|
||||
|
||||
@@ -305,6 +328,7 @@ pub async fn handle_node_connection(
|
||||
signing_key,
|
||||
hooks,
|
||||
kt_log,
|
||||
data_dir,
|
||||
));
|
||||
|
||||
RpcSystem::new(Box::new(network), Some(service.client))
|
||||
|
||||
Reference in New Issue
Block a user