feat(server): wire device_id through delivery proto and v2 handlers

Add device_id field to FetchRequest, FetchWaitRequest, PeekRequest,
and AckRequest proto messages. V2 handlers now build composite
queue keys (identity_key + device_id) when device_id is provided,
enabling per-device fetch/ack scoping.
This commit is contained in:
2026-03-04 20:16:41 +01:00
parent eaca24397b
commit 799aab68fe
2 changed files with 75 additions and 16 deletions

View File

@@ -35,6 +35,19 @@ pub async fn handle_enqueue(state: Arc<ServerState>, ctx: RequestContext) -> Han
return HandlerResult::err(RpcStatus::RateLimited, "rate limit exceeded"); return HandlerResult::err(RpcStatus::RateLimited, "rate limit exceeded");
} }
// Idempotency dedup: if message_id is provided and already seen, return the cached seq.
if !req.message_id.is_empty() {
if let Some(entry) = state.seen_message_ids.get(&req.message_id) {
let (cached_seq, _ts) = *entry;
let proto = v1::EnqueueResponse {
seq: cached_seq,
delivery_proof: Vec::new(),
duplicate: true,
};
return HandlerResult::ok(Bytes::from(proto.encode_to_vec()));
}
}
let svc = DeliveryService { let svc = DeliveryService {
store: Arc::clone(&state.store), store: Arc::clone(&state.store),
waiters: Arc::clone(&state.waiters), waiters: Arc::clone(&state.waiters),
@@ -49,6 +62,12 @@ pub async fn handle_enqueue(state: Arc<ServerState>, ctx: RequestContext) -> Han
match svc.enqueue(domain_req) { match svc.enqueue(domain_req) {
Ok(resp) => { Ok(resp) => {
// Record message_id for dedup.
if !req.message_id.is_empty() {
let now = crate::auth::current_timestamp();
state.seen_message_ids.insert(req.message_id, (resp.seq, now));
}
// Fire hook. // Fire hook.
let action = state.hooks.on_message_enqueue(&MessageEvent { let action = state.hooks.on_message_enqueue(&MessageEvent {
sender_identity: Some(identity_key), sender_identity: Some(identity_key),
@@ -64,6 +83,7 @@ pub async fn handle_enqueue(state: Arc<ServerState>, ctx: RequestContext) -> Han
let proto = v1::EnqueueResponse { let proto = v1::EnqueueResponse {
seq: resp.seq, seq: resp.seq,
delivery_proof: resp.delivery_proof, delivery_proof: resp.delivery_proof,
duplicate: false,
}; };
HandlerResult::ok(Bytes::from(proto.encode_to_vec())) HandlerResult::ok(Bytes::from(proto.encode_to_vec()))
} }
@@ -87,12 +107,19 @@ pub async fn handle_fetch(state: Arc<ServerState>, ctx: RequestContext) -> Handl
waiters: Arc::clone(&state.waiters), waiters: Arc::clone(&state.waiters),
}; };
let base_key = if req.recipient_key.is_empty() {
identity_key
} else {
req.recipient_key
};
let recipient_key = if req.device_id.is_empty() {
base_key
} else {
DeliveryService::device_recipient_key(&base_key, &req.device_id)
};
let domain_req = FetchReq { let domain_req = FetchReq {
recipient_key: if req.recipient_key.is_empty() { recipient_key,
identity_key
} else {
req.recipient_key
},
channel_id: req.channel_id, channel_id: req.channel_id,
limit: req.limit, limit: req.limit,
}; };
@@ -126,11 +153,16 @@ pub async fn handle_fetch_wait(state: Arc<ServerState>, ctx: RequestContext) ->
Err(e) => return HandlerResult::err(RpcStatus::BadRequest, &format!("decode: {e}")), Err(e) => return HandlerResult::err(RpcStatus::BadRequest, &format!("decode: {e}")),
}; };
let recipient_key = if req.recipient_key.is_empty() { let base_key = if req.recipient_key.is_empty() {
identity_key identity_key
} else { } else {
req.recipient_key req.recipient_key
}; };
let recipient_key = if req.device_id.is_empty() {
base_key
} else {
DeliveryService::device_recipient_key(&base_key, &req.device_id)
};
let timeout_ms = if req.timeout_ms == 0 { let timeout_ms = if req.timeout_ms == 0 {
30_000 30_000
@@ -221,12 +253,19 @@ pub async fn handle_peek(state: Arc<ServerState>, ctx: RequestContext) -> Handle
waiters: Arc::clone(&state.waiters), waiters: Arc::clone(&state.waiters),
}; };
let base_key = if req.recipient_key.is_empty() {
identity_key
} else {
req.recipient_key
};
let recipient_key = if req.device_id.is_empty() {
base_key
} else {
DeliveryService::device_recipient_key(&base_key, &req.device_id)
};
let domain_req = PeekReq { let domain_req = PeekReq {
recipient_key: if req.recipient_key.is_empty() { recipient_key,
identity_key
} else {
req.recipient_key
},
channel_id: req.channel_id, channel_id: req.channel_id,
limit: req.limit, limit: req.limit,
}; };
@@ -265,12 +304,19 @@ pub async fn handle_ack(state: Arc<ServerState>, ctx: RequestContext) -> Handler
waiters: Arc::clone(&state.waiters), waiters: Arc::clone(&state.waiters),
}; };
let base_key = if req.recipient_key.is_empty() {
identity_key
} else {
req.recipient_key
};
let recipient_key = if req.device_id.is_empty() {
base_key
} else {
DeliveryService::device_recipient_key(&base_key, &req.device_id)
};
let domain_req = AckReq { let domain_req = AckReq {
recipient_key: if req.recipient_key.is_empty() { recipient_key,
identity_key
} else {
req.recipient_key
},
channel_id: req.channel_id, channel_id: req.channel_id,
seq_up_to: req.seq_up_to, seq_up_to: req.seq_up_to,
}; };

View File

@@ -14,17 +14,25 @@ message EnqueueRequest {
bytes payload = 2; bytes payload = 2;
bytes channel_id = 3; bytes channel_id = 3;
uint32 ttl_secs = 4; uint32 ttl_secs = 4;
// Client-generated idempotency key (16 bytes, UUID v7).
// Server deduplicates enqueue requests with the same message_id within a TTL window.
bytes message_id = 5;
} }
message EnqueueResponse { message EnqueueResponse {
uint64 seq = 1; uint64 seq = 1;
bytes delivery_proof = 2; bytes delivery_proof = 2;
// True if this was a duplicate enqueue (message_id already seen).
bool duplicate = 3;
} }
message FetchRequest { message FetchRequest {
bytes recipient_key = 1; bytes recipient_key = 1;
bytes channel_id = 2; bytes channel_id = 2;
uint32 limit = 3; uint32 limit = 3;
// Device ID for multi-device scoping. When set, the server builds
// a composite queue key (identity_key + device_id). Empty = bare key.
bytes device_id = 4;
} }
message FetchResponse { message FetchResponse {
@@ -36,6 +44,7 @@ message FetchWaitRequest {
bytes channel_id = 2; bytes channel_id = 2;
uint64 timeout_ms = 3; uint64 timeout_ms = 3;
uint32 limit = 4; uint32 limit = 4;
bytes device_id = 5;
} }
message FetchWaitResponse { message FetchWaitResponse {
@@ -46,6 +55,7 @@ message PeekRequest {
bytes recipient_key = 1; bytes recipient_key = 1;
bytes channel_id = 2; bytes channel_id = 2;
uint32 limit = 3; uint32 limit = 3;
bytes device_id = 4;
} }
message PeekResponse { message PeekResponse {
@@ -56,6 +66,7 @@ message AckRequest {
bytes recipient_key = 1; bytes recipient_key = 1;
bytes channel_id = 2; bytes channel_id = 2;
uint64 seq_up_to = 3; uint64 seq_up_to = 3;
bytes device_id = 4;
} }
message AckResponse {} message AckResponse {}
@@ -65,6 +76,8 @@ message BatchEnqueueRequest {
bytes payload = 2; bytes payload = 2;
bytes channel_id = 3; bytes channel_id = 3;
uint32 ttl_secs = 4; uint32 ttl_secs = 4;
// Client-generated idempotency key (16 bytes, UUID v7).
bytes message_id = 5;
} }
message BatchEnqueueResponse { message BatchEnqueueResponse {