//! Wire format encoding and decoding for the quicprochat v2 RPC protocol. //! //! ## Request frame //! ```text //! [method_id: u16 BE][request_id: u32 BE][payload_len: u32 BE][protobuf bytes] //! ``` //! //! ## Response frame //! ```text //! [status: u8][request_id: u32 BE][payload_len: u32 BE][protobuf bytes] //! ``` //! //! ## Push frame (server → client, uni-stream) //! ```text //! [event_type: u16 BE][payload_len: u32 BE][protobuf bytes] //! ``` use bytes::{Buf, BufMut, Bytes, BytesMut}; use crate::error::{RpcError, RpcStatus}; /// Maximum payload size: 4 MiB. pub const MAX_PAYLOAD_SIZE: usize = 4 * 1024 * 1024; /// Request header size: 2 (method) + 4 (req_id) + 4 (len) = 10 bytes. pub const REQUEST_HEADER_SIZE: usize = 10; /// Response header size: 1 (status) + 4 (req_id) + 4 (len) = 9 bytes. pub const RESPONSE_HEADER_SIZE: usize = 9; /// Push header size: 2 (event_type) + 4 (len) = 6 bytes. pub const PUSH_HEADER_SIZE: usize = 6; // ── Request ────────────────────────────────────────────────────────────────── /// A decoded RPC request frame. #[derive(Debug, Clone)] pub struct RequestFrame { pub method_id: u16, pub request_id: u32, pub payload: Bytes, } impl RequestFrame { /// Encode this request into a byte buffer. pub fn encode(&self) -> Bytes { let mut buf = BytesMut::with_capacity(REQUEST_HEADER_SIZE + self.payload.len()); buf.put_u16(self.method_id); buf.put_u32(self.request_id); buf.put_u32(self.payload.len() as u32); buf.put(self.payload.clone()); buf.freeze() } /// Decode a request frame from a byte buffer. /// Returns `None` if the buffer does not contain a complete frame. pub fn decode(buf: &mut BytesMut) -> Result, RpcError> { if buf.len() < REQUEST_HEADER_SIZE { return Ok(None); } // Peek at payload_len without consuming. let payload_len = u32::from_be_bytes([buf[6], buf[7], buf[8], buf[9]]) as usize; if payload_len > MAX_PAYLOAD_SIZE { return Err(RpcError::PayloadTooLarge { size: payload_len, max: MAX_PAYLOAD_SIZE, }); } let total = REQUEST_HEADER_SIZE + payload_len; if buf.len() < total { return Ok(None); } let method_id = buf.get_u16(); let request_id = buf.get_u32(); let _len = buf.get_u32(); let payload = buf.split_to(payload_len).freeze(); Ok(Some(Self { method_id, request_id, payload, })) } } // ── Response ───────────────────────────────────────────────────────────────── /// A decoded RPC response frame. #[derive(Debug, Clone)] pub struct ResponseFrame { pub status: u8, pub request_id: u32, pub payload: Bytes, } impl ResponseFrame { /// Encode this response into a byte buffer. pub fn encode(&self) -> Bytes { let mut buf = BytesMut::with_capacity(RESPONSE_HEADER_SIZE + self.payload.len()); buf.put_u8(self.status); buf.put_u32(self.request_id); buf.put_u32(self.payload.len() as u32); buf.put(self.payload.clone()); buf.freeze() } /// Decode a response frame from a byte buffer. pub fn decode(buf: &mut BytesMut) -> Result, RpcError> { if buf.len() < RESPONSE_HEADER_SIZE { return Ok(None); } let payload_len = u32::from_be_bytes([buf[5], buf[6], buf[7], buf[8]]) as usize; if payload_len > MAX_PAYLOAD_SIZE { return Err(RpcError::PayloadTooLarge { size: payload_len, max: MAX_PAYLOAD_SIZE, }); } let total = RESPONSE_HEADER_SIZE + payload_len; if buf.len() < total { return Ok(None); } let status = buf.get_u8(); let request_id = buf.get_u32(); let _len = buf.get_u32(); let payload = buf.split_to(payload_len).freeze(); Ok(Some(Self { status, request_id, payload, })) } /// Convert the status byte to an `RpcStatus`. pub fn rpc_status(&self) -> Option { RpcStatus::from_u8(self.status) } } // ── Push ───────────────────────────────────────────────────────────────────── /// A decoded server-push event frame (sent on QUIC uni-streams). #[derive(Debug, Clone)] pub struct PushFrame { pub event_type: u16, pub payload: Bytes, } impl PushFrame { /// Encode this push frame into a byte buffer. pub fn encode(&self) -> Bytes { let mut buf = BytesMut::with_capacity(PUSH_HEADER_SIZE + self.payload.len()); buf.put_u16(self.event_type); buf.put_u32(self.payload.len() as u32); buf.put(self.payload.clone()); buf.freeze() } /// Decode a push frame from a byte buffer. pub fn decode(buf: &mut BytesMut) -> Result, RpcError> { if buf.len() < PUSH_HEADER_SIZE { return Ok(None); } let payload_len = u32::from_be_bytes([buf[2], buf[3], buf[4], buf[5]]) as usize; if payload_len > MAX_PAYLOAD_SIZE { return Err(RpcError::PayloadTooLarge { size: payload_len, max: MAX_PAYLOAD_SIZE, }); } let total = PUSH_HEADER_SIZE + payload_len; if buf.len() < total { return Ok(None); } let event_type = buf.get_u16(); let _len = buf.get_u32(); let payload = buf.split_to(payload_len).freeze(); Ok(Some(Self { event_type, payload, })) } } #[cfg(test)] mod tests { use super::*; #[test] fn request_roundtrip() { let frame = RequestFrame { method_id: 42, request_id: 1001, payload: Bytes::from_static(b"hello"), }; let encoded = frame.encode(); let mut buf = BytesMut::from(encoded.as_ref()); let decoded = RequestFrame::decode(&mut buf).expect("decode").expect("complete"); assert_eq!(decoded.method_id, 42); assert_eq!(decoded.request_id, 1001); assert_eq!(decoded.payload, Bytes::from_static(b"hello")); assert!(buf.is_empty()); } #[test] fn response_roundtrip() { let frame = ResponseFrame { status: RpcStatus::Ok as u8, request_id: 2002, payload: Bytes::from_static(b"world"), }; let encoded = frame.encode(); let mut buf = BytesMut::from(encoded.as_ref()); let decoded = ResponseFrame::decode(&mut buf).expect("decode").expect("complete"); assert_eq!(decoded.status, 0); assert_eq!(decoded.request_id, 2002); assert_eq!(decoded.payload, Bytes::from_static(b"world")); } #[test] fn push_roundtrip() { let frame = PushFrame { event_type: 7, payload: Bytes::from_static(b"event-data"), }; let encoded = frame.encode(); let mut buf = BytesMut::from(encoded.as_ref()); let decoded = PushFrame::decode(&mut buf).expect("decode").expect("complete"); assert_eq!(decoded.event_type, 7); assert_eq!(decoded.payload, Bytes::from_static(b"event-data")); } #[test] fn incomplete_request_returns_none() { let mut buf = BytesMut::from(&[0u8; 5][..]); assert!(RequestFrame::decode(&mut buf).expect("no error").is_none()); } #[test] fn payload_too_large_rejected() { // Craft a request header with payload_len = MAX + 1. let mut buf = BytesMut::new(); buf.put_u16(1); buf.put_u32(1); buf.put_u32((MAX_PAYLOAD_SIZE + 1) as u32); let result = RequestFrame::decode(&mut buf); assert!(matches!(result, Err(RpcError::PayloadTooLarge { .. }))); } #[test] fn empty_payload_request() { let frame = RequestFrame { method_id: 0, request_id: 0, payload: Bytes::new(), }; let encoded = frame.encode(); assert_eq!(encoded.len(), REQUEST_HEADER_SIZE); let mut buf = BytesMut::from(encoded.as_ref()); let decoded = RequestFrame::decode(&mut buf).expect("decode").expect("complete"); assert!(decoded.payload.is_empty()); } // ── Additional framing tests ──────────────────────────────────────────── #[test] fn empty_payload_response() { let frame = ResponseFrame { status: RpcStatus::NotFound as u8, request_id: 999, payload: Bytes::new(), }; let encoded = frame.encode(); assert_eq!(encoded.len(), RESPONSE_HEADER_SIZE); let mut buf = BytesMut::from(encoded.as_ref()); let decoded = ResponseFrame::decode(&mut buf).expect("decode").expect("complete"); assert!(decoded.payload.is_empty()); assert_eq!(decoded.status, RpcStatus::NotFound as u8); assert_eq!(decoded.request_id, 999); } #[test] fn empty_payload_push() { let frame = PushFrame { event_type: 0, payload: Bytes::new(), }; let encoded = frame.encode(); assert_eq!(encoded.len(), PUSH_HEADER_SIZE); let mut buf = BytesMut::from(encoded.as_ref()); let decoded = PushFrame::decode(&mut buf).expect("decode").expect("complete"); assert!(decoded.payload.is_empty()); assert_eq!(decoded.event_type, 0); } #[test] fn max_boundary_payload_request() { // Exactly MAX_PAYLOAD_SIZE should succeed (not exceed limit). let payload = vec![0xABu8; MAX_PAYLOAD_SIZE]; let frame = RequestFrame { method_id: 1, request_id: 1, payload: Bytes::from(payload.clone()), }; let encoded = frame.encode(); assert_eq!(encoded.len(), REQUEST_HEADER_SIZE + MAX_PAYLOAD_SIZE); let mut buf = BytesMut::from(encoded.as_ref()); let decoded = RequestFrame::decode(&mut buf).expect("decode").expect("complete"); assert_eq!(decoded.payload.len(), MAX_PAYLOAD_SIZE); assert_eq!(decoded.payload[0], 0xAB); } #[test] fn response_payload_too_large_rejected() { let mut buf = BytesMut::new(); buf.put_u8(0); // status OK buf.put_u32(1); // request_id buf.put_u32((MAX_PAYLOAD_SIZE + 1) as u32); let result = ResponseFrame::decode(&mut buf); assert!(matches!(result, Err(RpcError::PayloadTooLarge { .. }))); } #[test] fn push_payload_too_large_rejected() { let mut buf = BytesMut::new(); buf.put_u16(1); // event_type buf.put_u32((MAX_PAYLOAD_SIZE + 1) as u32); let result = PushFrame::decode(&mut buf); assert!(matches!(result, Err(RpcError::PayloadTooLarge { .. }))); } #[test] fn incomplete_response_returns_none() { // Less than RESPONSE_HEADER_SIZE bytes let mut buf = BytesMut::from(&[0u8; 4][..]); assert!(ResponseFrame::decode(&mut buf).expect("no error").is_none()); } #[test] fn incomplete_push_returns_none() { // Less than PUSH_HEADER_SIZE bytes let mut buf = BytesMut::from(&[0u8; 3][..]); assert!(PushFrame::decode(&mut buf).expect("no error").is_none()); } #[test] fn request_header_present_but_payload_incomplete() { // Full header but payload not yet received let frame = RequestFrame { method_id: 10, request_id: 20, payload: Bytes::from_static(b"abcdefgh"), }; let encoded = frame.encode(); // Truncate to header + 3 bytes of payload (need 8) let mut buf = BytesMut::from(&encoded[..REQUEST_HEADER_SIZE + 3]); assert!(RequestFrame::decode(&mut buf).expect("no error").is_none()); // Buffer should be untouched (not consumed) assert_eq!(buf.len(), REQUEST_HEADER_SIZE + 3); } #[test] fn response_header_present_but_payload_incomplete() { let frame = ResponseFrame { status: 0, request_id: 1, payload: Bytes::from_static(b"abcdefgh"), }; let encoded = frame.encode(); let mut buf = BytesMut::from(&encoded[..RESPONSE_HEADER_SIZE + 2]); assert!(ResponseFrame::decode(&mut buf).expect("no error").is_none()); } #[test] fn push_header_present_but_payload_incomplete() { let frame = PushFrame { event_type: 1, payload: Bytes::from_static(b"abcdefgh"), }; let encoded = frame.encode(); let mut buf = BytesMut::from(&encoded[..PUSH_HEADER_SIZE + 2]); assert!(PushFrame::decode(&mut buf).expect("no error").is_none()); } #[test] fn request_zero_length_prefix() { // Zero-length payload in the header is valid (empty payload) let mut buf = BytesMut::new(); buf.put_u16(5); // method_id buf.put_u32(10); // request_id buf.put_u32(0); // payload_len = 0 let decoded = RequestFrame::decode(&mut buf).expect("decode").expect("complete"); assert_eq!(decoded.method_id, 5); assert_eq!(decoded.request_id, 10); assert!(decoded.payload.is_empty()); } #[test] fn response_rpc_status_conversion() { let frame = ResponseFrame { status: RpcStatus::Unauthorized as u8, request_id: 1, payload: Bytes::new(), }; assert_eq!(frame.rpc_status(), Some(RpcStatus::Unauthorized)); let unknown = ResponseFrame { status: 255, request_id: 1, payload: Bytes::new(), }; assert_eq!(unknown.rpc_status(), None); } #[test] fn all_method_ids_roundtrip() { // Test a selection of method IDs spanning the full u16 range let method_ids: &[u16] = &[0, 1, 100, 200, 300, 400, 500, 1000, u16::MAX]; for &mid in method_ids { let frame = RequestFrame { method_id: mid, request_id: mid as u32, payload: Bytes::from_static(b"x"), }; let encoded = frame.encode(); let mut buf = BytesMut::from(encoded.as_ref()); let decoded = RequestFrame::decode(&mut buf).unwrap().unwrap(); assert_eq!(decoded.method_id, mid); assert_eq!(decoded.request_id, mid as u32); } } #[test] fn all_rpc_status_values_roundtrip() { let statuses = [ RpcStatus::Ok, RpcStatus::BadRequest, RpcStatus::Unauthorized, RpcStatus::Forbidden, RpcStatus::NotFound, RpcStatus::RateLimited, RpcStatus::DeadlineExceeded, RpcStatus::Unavailable, RpcStatus::Internal, RpcStatus::UnknownMethod, ]; for status in statuses { let frame = ResponseFrame { status: status as u8, request_id: 1, payload: Bytes::new(), }; let encoded = frame.encode(); let mut buf = BytesMut::from(encoded.as_ref()); let decoded = ResponseFrame::decode(&mut buf).unwrap().unwrap(); assert_eq!(decoded.rpc_status(), Some(status)); } } #[test] fn request_max_request_id() { let frame = RequestFrame { method_id: 1, request_id: u32::MAX, payload: Bytes::from_static(b"max-id"), }; let encoded = frame.encode(); let mut buf = BytesMut::from(encoded.as_ref()); let decoded = RequestFrame::decode(&mut buf).unwrap().unwrap(); assert_eq!(decoded.request_id, u32::MAX); } #[test] fn multiple_frames_in_buffer() { // Two request frames concatenated in one buffer let f1 = RequestFrame { method_id: 1, request_id: 10, payload: Bytes::from_static(b"first"), }; let f2 = RequestFrame { method_id: 2, request_id: 20, payload: Bytes::from_static(b"second"), }; let mut buf = BytesMut::new(); buf.extend_from_slice(&f1.encode()); buf.extend_from_slice(&f2.encode()); let d1 = RequestFrame::decode(&mut buf).unwrap().unwrap(); assert_eq!(d1.method_id, 1); assert_eq!(d1.payload, Bytes::from_static(b"first")); let d2 = RequestFrame::decode(&mut buf).unwrap().unwrap(); assert_eq!(d2.method_id, 2); assert_eq!(d2.payload, Bytes::from_static(b"second")); assert!(buf.is_empty()); } }