Delivery Service Internals

The Delivery Service (DS) is a store-and-forward relay for opaque MLS payloads. It never inspects, decrypts, or validates MLS ciphertext -- it routes solely by recipient identity key and channel identifier. The DS exposes three operations through the NodeService RPC interface: enqueue, fetch, and fetchWait.

Sources:

  • crates/quicprochat-server/src/main.rs (RPC handlers)
  • crates/quicprochat-server/src/storage.rs (queue storage)
  • schemas/node.capnp (wire schema)

Architecture

                         NodeService (port 7000)
                         =======================

  enqueue(recipientKey, payload, channelId)
      |
      v
  +------------------------------------------------------------+
  |  FileBackedStore                                           |
  |                                                            |
  |  deliveries: Mutex<HashMap<ChannelKey, VecDeque<Vec<u8>>>> |
  |                              ^           ^                 |
  |                              |           |                 |
  |               ChannelKey {          FIFO queue of          |
  |                 channel_id,         opaque payload         |
  |                 recipient_key       bytes                  |
  |               }                                            |
  |                                                            |
  |  Persisted to: data/deliveries.bin (bincode, V2 format)    |
  +------------------------------------------------------------+
      |
      v
  notify_waiters() --> DashMap<Vec<u8>, Arc<Notify>>
                          ^
                          |
                    keyed by recipient_key
                    wakes blocked fetchWait calls

The DS is intentionally MLS-unaware. This design decision is documented in ADR-004: MLS-Unaware Delivery Service. From the server's perspective, every payload is an opaque blob -- it could be a Welcome, a Commit, an application message, or a hybrid-encrypted envelope.


Queue Model

ChannelKey

Delivery queues are indexed by a compound key:

#[derive(Serialize, Deserialize, Clone, Eq, PartialEq, Debug)]
pub struct ChannelKey {
    pub channel_id: Vec<u8>,
    pub recipient_key: Vec<u8>,
}
  Field          Size                                 Purpose
  -------------  -----------------------------------  ---------------------------------------------
  channel_id     Variable (typically a 16-byte UUID,  Isolates messages by conversation. Empty for
                 or empty)                            the legacy/default channel.
  recipient_key  32 bytes                             Ed25519 public key of the intended recipient.

The ChannelKey implements Hash manually, hashing channel_id followed by recipient_key.
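A minimal sketch of that manual Hash implementation (illustrative; the real struct also derives Serialize and Deserialize as shown above):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Sketch only: the hash order (channel_id, then recipient_key)
// must stay consistent with Eq so equal keys hash equally.
#[derive(Clone, Eq, PartialEq, Debug)]
pub struct ChannelKey {
    pub channel_id: Vec<u8>,
    pub recipient_key: Vec<u8>,
}

impl Hash for ChannelKey {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.channel_id.hash(state);    // hash channel_id first ...
        self.recipient_key.hash(state); // ... then recipient_key
    }
}

fn main() {
    let a = ChannelKey { channel_id: b"chan-1".to_vec(), recipient_key: vec![0u8; 32] };
    let b = a.clone();
    let (mut h1, mut h2) = (DefaultHasher::new(), DefaultHasher::new());
    a.hash(&mut h1);
    b.hash(&mut h2);
    assert_eq!(h1.finish(), h2.finish()); // equal keys produce equal hashes
}
```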

Channel-aware routing ensures that messages for different conversations do not interfere with each other. A client fetching from channel A will not see messages enqueued for channel B, even if both target the same recipient. For legacy clients (or single-channel usage), channel_id is left empty.

Queue Structure

Each ChannelKey maps to a VecDeque<Vec<u8>>:

ChannelKey("chan-1", "alice-pk") -> [msg_1, msg_2, msg_3]
ChannelKey("chan-1", "bob-pk")   -> [msg_4]
ChannelKey("chan-2", "alice-pk") -> [msg_5, msg_6]
ChannelKey("",      "alice-pk") -> [msg_7]  (legacy/default channel)

Messages within a queue are ordered FIFO (first-in, first-out). This preserves MLS epoch ordering, which is critical: a recipient must process a Welcome before application messages, and Commits in the order they were produced.
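The queue model above can be sketched with plain standard-library collections. This is not the server's actual API -- the DeliveryMap alias and the free functions enqueue/fetch are illustrative stand-ins, with a tuple in place of ChannelKey:

```rust
use std::collections::{HashMap, VecDeque};

// (channel_id, recipient_key) stands in for ChannelKey.
type DeliveryMap = HashMap<(Vec<u8>, Vec<u8>), VecDeque<Vec<u8>>>;

fn enqueue(map: &mut DeliveryMap, channel_id: &[u8], recipient_key: &[u8], payload: Vec<u8>) {
    map.entry((channel_id.to_vec(), recipient_key.to_vec()))
        .or_default()
        .push_back(payload); // FIFO: new messages go to the back
}

fn fetch(map: &mut DeliveryMap, channel_id: &[u8], recipient_key: &[u8]) -> Vec<Vec<u8>> {
    map.get_mut(&(channel_id.to_vec(), recipient_key.to_vec()))
        .map(|q| q.drain(..).collect()) // drain the whole queue, front to back
        .unwrap_or_default()
}

fn main() {
    let mut map = DeliveryMap::new();
    enqueue(&mut map, b"chan-1", b"alice-pk", b"msg_1".to_vec());
    enqueue(&mut map, b"chan-1", b"alice-pk", b"msg_2".to_vec());
    enqueue(&mut map, b"chan-2", b"alice-pk", b"msg_3".to_vec());

    // Fetching chan-1 drains only chan-1's queue, in FIFO order.
    assert_eq!(
        fetch(&mut map, b"chan-1", b"alice-pk"),
        vec![b"msg_1".to_vec(), b"msg_2".to_vec()]
    );
    // chan-2 is untouched; a second chan-1 fetch is empty.
    assert!(fetch(&mut map, b"chan-1", b"alice-pk").is_empty());
    assert_eq!(fetch(&mut map, b"chan-2", b"alice-pk"), vec![b"msg_3".to_vec()]);
}
```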


RPC Operations

enqueue

Appends a payload to the recipient's queue and wakes any blocked long-poll waiters.

enqueue @2 (recipientKey :Data, payload :Data, channelId :Data,
            version :UInt16, auth :Auth) -> ();

Handler logic:

  1. Parse parameters. Extract recipientKey, payload, channelId, version, and auth from the Cap'n Proto request.

  2. Validate auth. Call validate_auth() to check the Auth struct. See Authentication Service Internals for auth validation details.

  3. Validate inputs:

     Field         Constraint                  Error on violation
     ------------  --------------------------  ------------------------------------------------
     recipientKey  Exactly 32 bytes            "recipientKey must be exactly 32 bytes, got {n}"
     payload       Non-empty                   "payload must not be empty"
     payload       At most 5 MB                "payload exceeds max size (5242880 bytes)"
     version       0 (legacy) or 1 (current)   "unsupported wire version {v} (expected 0 or 1)"
  4. Store. Call FileBackedStore::enqueue(recipient_key, channel_id, payload), which constructs a ChannelKey from the channel ID and recipient key, then pushes the payload to the back of the corresponding VecDeque. The entire delivery map is flushed to disk.

  5. Notify waiters. Look up or create a tokio::sync::Notify for the recipient key in DashMap<Vec<u8>, Arc<Notify>> and call notify_waiters(). This wakes all fetchWait calls currently blocked on this recipient.
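The input checks in step 3 amount to a small pure function. The following is a hedged sketch -- the function name is hypothetical and the error strings follow the table above, but the actual handler code may differ:

```rust
// Assumed constants, matching the documented limits.
const MAX_PAYLOAD_BYTES: usize = 5 * 1024 * 1024; // 5,242,880
const CURRENT_WIRE_VERSION: u16 = 1;

// Hypothetical helper: validates enqueue inputs before storage.
fn validate_enqueue(recipient_key: &[u8], payload: &[u8], version: u16) -> Result<(), String> {
    if recipient_key.len() != 32 {
        return Err(format!(
            "recipientKey must be exactly 32 bytes, got {}",
            recipient_key.len()
        ));
    }
    if payload.is_empty() {
        return Err("payload must not be empty".to_string());
    }
    if payload.len() > MAX_PAYLOAD_BYTES {
        return Err(format!("payload exceeds max size ({} bytes)", MAX_PAYLOAD_BYTES));
    }
    if version != 0 && version != CURRENT_WIRE_VERSION {
        return Err(format!("unsupported wire version {} (expected 0 or 1)", version));
    }
    Ok(())
}

fn main() {
    assert!(validate_enqueue(&[0u8; 32], b"hello", 1).is_ok());
    assert!(validate_enqueue(&[0u8; 32], b"hello", 0).is_ok()); // legacy version accepted
    assert!(validate_enqueue(&[0u8; 31], b"hello", 1).is_err()); // wrong key length
    assert!(validate_enqueue(&[0u8; 32], b"", 1).is_err());      // empty payload
    assert!(validate_enqueue(&[0u8; 32], b"hello", 2).is_err()); // future version rejected
}
```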

fetch

Atomically drains the entire queue for a recipient+channel and returns all payloads.

fetch @3 (recipientKey :Data, channelId :Data, version :UInt16, auth :Auth)
    -> (payloads :List(Data));

Handler logic:

  1. Parse and validate recipientKey (32 bytes), version (0 or 1), and auth.

  2. Call FileBackedStore::fetch(recipient_key, channel_id), which:

    • Constructs a ChannelKey.
    • Calls VecDeque::drain(..) on the matching queue, collecting all messages.
    • Flushes the updated (now empty) map to disk.
    • Returns the drained messages as Vec<Vec<u8>>.
  3. Build a List(Data) response with all the payload bytes.

Important: The drain is atomic with respect to the Mutex lock. No interleaving with concurrent enqueue calls is possible. The returned list preserves FIFO order.
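The atomicity guarantee can be illustrated with a std Mutex (a sketch only; the real store also flushes to disk inside the same critical section):

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

// The whole drain happens under one lock guard, so a concurrent enqueue
// can only run before or after it, never in the middle.
fn atomic_drain(queue: &Mutex<VecDeque<Vec<u8>>>) -> Vec<Vec<u8>> {
    let mut guard = queue.lock().unwrap(); // lock held for the whole drain
    guard.drain(..).collect()              // preserves FIFO order
}

fn main() {
    let q = Mutex::new(VecDeque::from(vec![b"a".to_vec(), b"b".to_vec()]));
    assert_eq!(atomic_drain(&q), vec![b"a".to_vec(), b"b".to_vec()]);
    assert!(atomic_drain(&q).is_empty()); // second drain finds nothing
}
```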

fetchWait (Long-Polling)

Combines fetch with a blocking wait. If the queue is empty, the server waits for up to timeoutMs milliseconds for a new message to arrive.

fetchWait @4 (recipientKey :Data, channelId :Data, version :UInt16,
              timeoutMs :UInt64, auth :Auth) -> (payloads :List(Data));

Handler logic:

1. validate inputs (same as fetch)
2. messages = store.fetch(recipient_key, channel_id)
3. if messages.is_empty() AND timeout_ms > 0:
     a. waiter = waiters.entry(recipient_key).or_insert(Arc::new(Notify::new()))
     b. tokio::time::timeout(Duration::from_millis(timeout_ms), waiter.notified()).await
     c. messages = store.fetch(recipient_key, channel_id)  // re-fetch after wake
4. return messages

The implementation uses Promise::from_future(async move { ... }) because the tokio::time::timeout call is async. This is the only DS handler that produces an async Promise.

Timeout behavior:

  • If timeout_ms == 0, fetchWait behaves identically to fetch (immediate return).
  • If a message arrives before the timeout, notify_waiters() from enqueue wakes the Notify, and the handler re-fetches immediately.
  • If the timeout expires without a message, the handler re-fetches (which will return empty) and returns an empty list.

Waiter model: The DashMap<Vec<u8>, Arc<Notify>> is keyed by recipient key (not by ChannelKey). This means a notification for any channel targeting the same recipient will wake all blocked fetchWait calls for that recipient. This is a deliberate simplification -- the re-fetch after waking will only return messages from the requested channel, so cross-channel wake-ups result in a no-op re-fetch rather than incorrect behavior.
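The check-wait-re-check shape of fetchWait can be sketched with std primitives alone. This is an analogue, not the real code: the actual handler is async and uses tokio::sync::Notify with tokio::time::timeout, whereas here a Condvar plays the role of the Notify:

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::{Duration, Instant};

// Std-only analogue of the fetchWait handler (illustrative names).
fn fetch_wait(
    state: &Arc<(Mutex<VecDeque<Vec<u8>>>, Condvar)>,
    timeout: Duration,
) -> Vec<Vec<u8>> {
    let (lock, cvar) = &**state;
    let mut queue = lock.lock().unwrap();
    let deadline = Instant::now() + timeout;
    // Loop to tolerate Condvar's spurious wakeups; the real handler
    // simply re-fetches once after the Notify fires or the timeout lapses.
    while queue.is_empty() {
        let now = Instant::now();
        if now >= deadline {
            break;
        }
        let (guard, _) = cvar.wait_timeout(queue, deadline - now).unwrap();
        queue = guard;
    }
    queue.drain(..).collect()
}

fn main() {
    let state = Arc::new((Mutex::new(VecDeque::new()), Condvar::new()));
    let producer = Arc::clone(&state);
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(50));
        let (lock, cvar) = &*producer;
        lock.lock().unwrap().push_back(b"hello".to_vec());
        cvar.notify_all(); // plays the role of notify_waiters()
    });
    // Blocks until the producer's message arrives, then drains it.
    assert_eq!(fetch_wait(&state, Duration::from_secs(5)), vec![b"hello".to_vec()]);
}
```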


Version Validation

The version field in enqueue, fetch, and fetchWait enables future schema evolution:

  Version  Meaning
  -------  --------------------------------------------------------
  0        Legacy (pre-versioning); channelId is treated as empty.
  1        Current wire format; channelId is a meaningful field.
  2+       Rejected with "unsupported wire version".

Both 0 and 1 are accepted on the server side. The constant CURRENT_WIRE_VERSION = 1 is used in validation:

if version != 0 && version != CURRENT_WIRE_VERSION {
    return Promise::err(/* unsupported version */);
}

The client library always sends version: 1 for new operations.


Notification System

The waiter map provides a lightweight push-notification mechanism:

enqueue()                             fetchWait()
    |                                      |
    v                                      v
store.enqueue(key, ch, payload)       messages = store.fetch(key, ch)
    |                                      |
    v                                      |  (if empty)
waiter = waiters.entry(key)                v
    .or_insert(Notify::new())         waiter = waiters.entry(key)
    |                                      .or_insert(Notify::new())
    v                                      |
waiter.notify_waiters()                    v
    |                                 timeout(duration, waiter.notified())
    |                                      |
    +------- wakes ----------------------->+
                                           |
                                           v
                                      messages = store.fetch(key, ch)
                                           |
                                           v
                                      return messages

tokio::sync::Notify is a broadcast notification primitive. notify_waiters() wakes all tasks currently awaiting .notified(). If no tasks are waiting, the notification is lost (there is no stored permit in the notify_waiters() path). This is acceptable because fetchWait always performs a fetch before blocking, so messages that arrive before the wait begins are returned immediately.


Data Flow Example: Two-Party Message Exchange

Alice                       Server DS                        Bob
  |                            |                              |
  | encrypt("hello bob")       |                              |
  | -> ct_bytes                |                              |
  |                            |                              |
  | enqueue(bob_pk, ct_bytes)  |                              |
  | -------------------------> |                              |
  |                            | queue[("", bob_pk)] += ct    |
  |                            | notify_waiters(bob_pk)       |
  |                            |                              |
  |                            | <--- fetchWait(bob_pk, 30s)  |
  |                            |      (was blocked, now woken)|
  |                            |                              |
  |                            | drain queue[("", bob_pk)]    |
  |                            | ---- [ct_bytes] -----------> |
  |                            |                              |
  |                            |         decrypt(ct_bytes)    |
  |                            |         -> "hello bob"       |

Server Constants

  Constant              Value             Purpose
  --------------------  ----------------  -----------------------------------------------
  MAX_PAYLOAD_BYTES     5,242,880 (5 MB)  Maximum size of a single enqueued payload
  MAX_KEYPACKAGE_BYTES  1,048,576 (1 MB)  Maximum size of a KeyPackage (AS)
  CURRENT_WIRE_VERSION  1                 Current schema version; versions above 1 are
                                          rejected

Persistence

Delivery queues are persisted to data/deliveries.bin using bincode serialization. The V2 format uses ChannelKey as the map key:

#[derive(Serialize, Deserialize, Default)]
struct QueueMapV2 {
    map: HashMap<ChannelKey, VecDeque<Vec<u8>>>,
}

On load, the server attempts V2 deserialization first. If that fails, it falls back to V1 format (keyed by Vec<u8> recipient key only) and migrates in memory by assigning empty channel_id to each entry:

for (recipient_key, queue) in legacy.map.into_iter() {
    upgraded.insert(
        ChannelKey { channel_id: Vec::new(), recipient_key },
        queue,
    );
}
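That migration step can be sketched as a pure function over the in-memory maps (illustrative: the serde derives and bincode I/O are omitted, and migrate_v1 is a hypothetical name):

```rust
use std::collections::{HashMap, VecDeque};

#[derive(Hash, Eq, PartialEq, Debug, Clone)]
struct ChannelKey {
    channel_id: Vec<u8>,
    recipient_key: Vec<u8>,
}

// V1 entries are keyed by bare recipient key; under V2 they land in the
// default (empty) channel, with their queues carried over unchanged.
fn migrate_v1(
    legacy: HashMap<Vec<u8>, VecDeque<Vec<u8>>>,
) -> HashMap<ChannelKey, VecDeque<Vec<u8>>> {
    legacy
        .into_iter()
        .map(|(recipient_key, queue)| {
            (ChannelKey { channel_id: Vec::new(), recipient_key }, queue)
        })
        .collect()
}

fn main() {
    let mut v1 = HashMap::new();
    v1.insert(b"alice-pk".to_vec(), VecDeque::from(vec![b"m1".to_vec()]));
    let v2 = migrate_v1(v1);
    let key = ChannelKey { channel_id: Vec::new(), recipient_key: b"alice-pk".to_vec() };
    assert_eq!(v2[&key].len(), 1); // queue carried over intact
}
```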

See Storage Backend for the full persistence model.