From 49e8e066d775b1564cffa219a3979f2853784ab4 Mon Sep 17 00:00:00 2001 From: Christian Nennemann Date: Wed, 4 Mar 2026 20:52:02 +0100 Subject: [PATCH] feat(sdk): add Python SDK with QUIC and FFI transport backends Implements quicproquo-py with two transport backends: - Async QUIC transport via aioquic with v2 protobuf wire format - Synchronous Rust FFI transport via CFFI wrapping libquicproquo_ffi Includes manual protobuf encode/decode (no codegen), full RPC coverage (auth, delivery, channels, users, keys, health), PyPI-ready packaging, async echo bot and FFI demo examples, and 15 passing unit tests. --- sdks/python/.gitignore | 6 + sdks/python/README.md | 165 +++++++++++++++ sdks/python/examples/bot.py | 96 +++++++++ sdks/python/examples/ffi_demo.py | 60 ++++++ sdks/python/pyproject.toml | 55 +++++ sdks/python/quicproquo/__init__.py | 38 ++++ sdks/python/quicproquo/client.py | 291 ++++++++++++++++++++++++++ sdks/python/quicproquo/ffi.py | 192 ++++++++++++++++++ sdks/python/quicproquo/proto.py | 303 ++++++++++++++++++++++++++++ sdks/python/quicproquo/transport.py | 181 +++++++++++++++++ sdks/python/quicproquo/types.py | 92 +++++++++ sdks/python/quicproquo/wire.py | 73 +++++++ sdks/python/tests/__init__.py | 0 sdks/python/tests/test_proto.py | 89 ++++++++ sdks/python/tests/test_types.py | 50 +++++ sdks/python/tests/test_wire.py | 41 ++++ 16 files changed, 1732 insertions(+) create mode 100644 sdks/python/.gitignore create mode 100644 sdks/python/README.md create mode 100644 sdks/python/examples/bot.py create mode 100644 sdks/python/examples/ffi_demo.py create mode 100644 sdks/python/pyproject.toml create mode 100644 sdks/python/quicproquo/__init__.py create mode 100644 sdks/python/quicproquo/client.py create mode 100644 sdks/python/quicproquo/ffi.py create mode 100644 sdks/python/quicproquo/proto.py create mode 100644 sdks/python/quicproquo/transport.py create mode 100644 sdks/python/quicproquo/types.py create mode 100644 sdks/python/quicproquo/wire.py create mode 100644 sdks/python/tests/__init__.py create mode 100644 sdks/python/tests/test_proto.py create mode 100644 sdks/python/tests/test_types.py create mode 100644 sdks/python/tests/test_wire.py diff --git a/sdks/python/.gitignore b/sdks/python/.gitignore new file mode 100644 index 0000000..d0900c8 --- /dev/null +++ b/sdks/python/.gitignore @@ -0,0 +1,6 @@ +__pycache__/ +*.pyc +*.egg-info/ +dist/ +build/ +.pytest_cache/ diff --git a/sdks/python/README.md b/sdks/python/README.md new file mode 100644 index 0000000..546de39 --- /dev/null +++ b/sdks/python/README.md @@ -0,0 +1,165 @@ +# quicproquo Python SDK + +Python client library for the [quicproquo](https://github.com/nicholasgasior/quicproquo) E2E encrypted messenger. + +## Prerequisites + +- Python 3.10+ +- A running quicproquo server + +## Installation + +```sh +pip install quicproquo +``` + +For development: + +```sh +pip install -e ".[dev]" +``` + +## Transport Backends + +### 1. Async QUIC (pure Python) + +Uses [aioquic](https://github.com/aiortc/aioquic) for native QUIC transport with the v2 protobuf wire format. No Rust dependency required. + +```python +import asyncio +from quicproquo import QpqClient, ConnectOptions + +async def main(): + client = await QpqClient.connect(ConnectOptions( + addr="127.0.0.1:5001", + ca_cert_path="ca.pem", + )) + + health = await client.health() + print(f"Server: {health.status}") + + # OPAQUE auth (requires external OPAQUE library for crypto) + server_resp = await client.login_start("alice", opaque_request_bytes) + # ... process server_resp with OPAQUE library ... + token = await client.login_finish("alice", finalization, identity_key) + + # Resolve a user + key, proof = await client.resolve_user("bob") + + # Send a message + seq, proof = await client.send(recipient_key, b"hello") + + # Receive messages (long-poll) + messages = await client.receive_wait(my_key, timeout_ms=5000) + for msg in messages: + print(f"[{msg.seq}] {msg.data}") + + await client.close() + +asyncio.run(main()) +``` + +### 2. Rust FFI (synchronous) + +Wraps `libquicproquo_ffi` via CFFI for full Rust crypto stack (MLS, hybrid KEM, OPAQUE) at native speed. + +```sh +# Build the FFI library first +cargo build --release -p quicproquo-ffi +``` + +```python +from quicproquo import QpqClient, ConnectOptions + +client = QpqClient.connect_ffi(ConnectOptions( + addr="127.0.0.1:5001", + ca_cert_path="ca.pem", +)) + +client.ffi_login("alice", "password123") +client.ffi_send("bob", b"hello from Python!") + +messages = client.ffi_receive(timeout_ms=5000) +for msg in messages: + print(msg) + +client.close_sync() +``` + +## API Reference + +### Connection + +| Method | Transport | Description | +|---|---|---| +| `QpqClient.connect(opts)` | QUIC | Async connect to server | +| `QpqClient.connect_ffi(opts)` | FFI | Sync connect via Rust FFI | +| `client.close()` | QUIC | Async disconnect | +| `client.close_sync()` | Both | Sync disconnect | + +### Authentication (QUIC) + +| Method | Description | +|---|---| +| `client.register_start(username, request)` | Start OPAQUE registration | +| `client.register_finish(username, upload, identity_key)` | Complete registration | +| `client.login_start(username, request)` | Start OPAQUE login | +| `client.login_finish(username, finalization, identity_key)` | Complete login | +| `client.set_session_token(token)` | Set pre-existing session token | + +### Authentication (FFI) + +| Method | Description | +|---|---| +| `client.ffi_login(username, password)` | Full OPAQUE login (Rust handles crypto) | + +### Messaging (QUIC) + +| Method | Description | +|---|---| +| `client.health()` | Server health check | +| `client.resolve_user(username)` | Look up identity key | +| `client.resolve_identity(key)` | Reverse look up username | +| `client.create_channel(peer_key)` | Create 1:1 DM channel | +| `client.send(recipient_key, payload)` | Send a message | +| `client.receive(recipient_key)` | Fetch queued messages | +| `client.receive_wait(recipient_key, timeout_ms=5000)` | Long-poll for messages | +| `client.ack(recipient_key, seq_up_to)` | Acknowledge messages | +| `client.upload_key_package(key, package)` | Upload MLS key package | +| `client.fetch_key_package(key)` | Fetch MLS key package | +| `client.delete_account()` | Permanently delete account | + +### Messaging (FFI) + +| Method | Description | +|---|---| +| `client.ffi_send(recipient, message)` | Send message by username | +| `client.ffi_receive(timeout_ms=5000)` | Receive pending messages | + +## Wire Format + +The SDK implements the qpq v2 wire format: + +``` +[method_id:u16][req_id:u32][len:u32][protobuf payload] +``` + +Each RPC is sent over its own QUIC bidirectional stream. + +## Structure + +- `quicproquo/client.py` -- High-level client API +- `quicproquo/transport.py` -- QUIC transport (aioquic) +- `quicproquo/ffi.py` -- Rust FFI transport (CFFI) +- `quicproquo/proto.py` -- Protobuf encode/decode (no codegen) +- `quicproquo/wire.py` -- v2 wire format framing +- `quicproquo/types.py` -- Data types and exceptions +- `examples/bot.py` -- Async echo bot example +- `examples/ffi_demo.py` -- Synchronous FFI example + +## Running Tests + +```sh +pip install -e ".[dev]" +pytest +``` diff --git a/sdks/python/examples/bot.py b/sdks/python/examples/bot.py new file mode 100644 index 0000000..4382c8a --- /dev/null +++ b/sdks/python/examples/bot.py @@ -0,0 +1,96 @@ +#!/usr/bin/env python3 +"""Example: async echo bot using the quicproquo Python SDK. + +Connects to a qpq server, authenticates, and echoes back any received +messages with a "[bot] " prefix. + +Usage: + python bot.py --server 127.0.0.1:5001 --ca-cert ca.pem + +This example uses the QUIC transport with the v2 wire format. +OPAQUE authentication requires external crypto; this demo assumes +a session token is obtained externally and set via --token. +""" + +from __future__ import annotations + +import argparse +import asyncio +import signal +import sys + +from quicproquo import QpqClient, ConnectOptions + + +async def run_bot(opts: ConnectOptions, token: bytes, identity_key: bytes) -> None: + client = await QpqClient.connect(opts) + client.set_session_token(token) + + print(f"Connected to {opts.addr}") + + health = await client.health() + print(f"Server status: {health.status} (v{health.version})") + + # Poll loop. + running = True + + def on_signal() -> None: + nonlocal running + running = False + print("\nShutting down...") + + loop = asyncio.get_running_loop() + for sig in (signal.SIGINT, signal.SIGTERM): + loop.add_signal_handler(sig, on_signal) + + while running: + try: + messages = await client.receive_wait( + identity_key, timeout_ms=5000 + ) + except Exception as exc: + print(f"receive error: {exc}") + await asyncio.sleep(1) + continue + + for msg in messages: + text = msg.data.decode("utf-8", errors="replace") + print(f"[seq={msg.seq}] {text}") + + # Echo back with prefix. + echo = f"[bot] {text}".encode("utf-8") + try: + seq, _ = await client.send(identity_key, echo) + print(f" -> echoed (seq={seq})") + except Exception as exc: + print(f" -> send error: {exc}") + + await client.close() + print("Disconnected.") + + +def main() -> None: + parser = argparse.ArgumentParser(description="qpq echo bot") + parser.add_argument("--server", default="127.0.0.1:5001", help="server address") + parser.add_argument("--ca-cert", default="", help="CA certificate path") + parser.add_argument("--server-name", default="", help="TLS server name") + parser.add_argument("--token", required=True, help="session token (hex)") + parser.add_argument("--identity-key", required=True, help="identity key (hex)") + parser.add_argument("--insecure", action="store_true", help="skip TLS verification") + args = parser.parse_args() + + opts = ConnectOptions( + addr=args.server, + ca_cert_path=args.ca_cert, + server_name=args.server_name, + insecure_skip_verify=args.insecure, + ) + + token = bytes.fromhex(args.token) + identity_key = bytes.fromhex(args.identity_key) + + asyncio.run(run_bot(opts, token, identity_key)) + + +if __name__ == "__main__": + main() diff --git a/sdks/python/examples/ffi_demo.py b/sdks/python/examples/ffi_demo.py new file mode 100644 index 0000000..3377e11 --- /dev/null +++ b/sdks/python/examples/ffi_demo.py @@ -0,0 +1,60 @@ +#!/usr/bin/env python3 +"""Example: synchronous messaging using the Rust FFI backend. + +Requires libquicproquo_ffi to be built: + cargo build --release -p quicproquo-ffi + +Set QPQ_LIB_PATH if the library is not in the default search path. + +Usage: + python ffi_demo.py --server 127.0.0.1:5001 \ + --ca-cert ca.pem --user alice --pass secret +""" + +from __future__ import annotations + +import argparse + +from quicproquo import QpqClient, ConnectOptions + + +def main() -> None: + parser = argparse.ArgumentParser(description="qpq FFI demo") + parser.add_argument("--server", default="127.0.0.1:5001") + parser.add_argument("--ca-cert", default="ca.pem") + parser.add_argument("--server-name", default="") + parser.add_argument("--user", required=True) + parser.add_argument("--pass", dest="password", required=True) + parser.add_argument("--recipient", required=True, help="recipient username") + parser.add_argument("--message", default="hello from Python SDK!") + args = parser.parse_args() + + opts = ConnectOptions( + addr=args.server, + ca_cert_path=args.ca_cert, + server_name=args.server_name, + ) + + client = QpqClient.connect_ffi(opts) + + try: + print(f"Connected to {args.server}") + + client.ffi_login(args.user, args.password) + print(f"Logged in as {args.user}") + + client.ffi_send(args.recipient, args.message.encode("utf-8")) + print(f"Sent message to {args.recipient}") + + print("Waiting for messages (5s)...") + messages = client.ffi_receive(timeout_ms=5000) + for msg in messages: + print(f" received: {msg}") + + finally: + client.close_sync() + print("Disconnected.") + + +if __name__ == "__main__": + main() diff --git a/sdks/python/pyproject.toml b/sdks/python/pyproject.toml new file mode 100644 index 0000000..53216a8 --- /dev/null +++ b/sdks/python/pyproject.toml @@ -0,0 +1,55 @@ +[build-system] +requires = ["setuptools>=68.0", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "quicproquo" +version = "0.1.0" +description = "Python SDK for quicproquo E2E encrypted messenger" +readme = "README.md" +license = "MIT" +requires-python = ">=3.10" +authors = [{ name = "quicproquo contributors" }] +keywords = ["quicproquo", "e2e", "encrypted", "messaging", "quic"] +classifiers = [ + "Development Status :: 3 - Alpha", + "Intended Audience :: Developers", + "License :: OSI Approved :: MIT License", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", + "Topic :: Communications :: Chat", + "Topic :: Security :: Cryptography", + "Framework :: AsyncIO", +] + +dependencies = [ + "aioquic>=1.0.0", + "protobuf>=5.26.0", + "cffi>=1.16.0", +] + +[project.optional-dependencies] +dev = [ + "pytest>=8.0", + "pytest-asyncio>=0.23", + "mypy>=1.8", + "ruff>=0.3", +] + +[project.urls] +Homepage = "https://github.com/nicholasgasior/quicproquo" +Repository = "https://github.com/nicholasgasior/quicproquo" + +[tool.setuptools.packages.find] +include = ["quicproquo*"] + +[tool.ruff] +target-version = "py310" +line-length = 99 + +[tool.mypy] +python_version = "3.10" +strict = true diff --git a/sdks/python/quicproquo/__init__.py b/sdks/python/quicproquo/__init__.py new file mode 100644 index 0000000..5946e2a --- /dev/null +++ b/sdks/python/quicproquo/__init__.py @@ -0,0 +1,38 @@ +"""quicproquo -- Python SDK for the quicproquo E2E encrypted messenger. + +Two transport backends are available: + +1. **FFI** (``QpqClient.connect_ffi``): wraps the Rust ``libquicproquo_ffi`` + shared library via CFFI. This gives you the full Rust crypto stack + (MLS, hybrid KEM, OPAQUE) at native speed. + +2. **QUIC** (``QpqClient.connect``): pure-Python QUIC transport via *aioquic* + with protobuf (v2 wire format). No Rust dependency required; crypto + operations must be supplied externally. +""" + +from quicproquo.client import QpqClient +from quicproquo.types import ( + ConnectOptions, + Envelope, + ChannelResult, + HealthInfo, + QpqError, + AuthError, + TimeoutError as QpqTimeoutError, + ConnectionError as QpqConnectionError, +) + +__version__ = "0.1.0" + +__all__ = [ + "QpqClient", + "ConnectOptions", + "Envelope", + "ChannelResult", + "HealthInfo", + "QpqError", + "AuthError", + "QpqTimeoutError", + "QpqConnectionError", +] diff --git a/sdks/python/quicproquo/client.py b/sdks/python/quicproquo/client.py new file mode 100644 index 0000000..8dbf113 --- /dev/null +++ b/sdks/python/quicproquo/client.py @@ -0,0 +1,291 @@ +"""High-level quicproquo client. + +Provides both async (QUIC transport) and sync (FFI transport) APIs for +interacting with a quicproquo server. +""" + +from __future__ import annotations + +from typing import Optional + +from quicproquo.types import ( + ConnectOptions, + Envelope, + ChannelResult, + HealthInfo, + ConnectionError, +) +from quicproquo.transport import QuicTransport +from quicproquo.ffi import FfiTransport +from quicproquo import proto, wire + + +class QpqClient: + """High-level quicproquo client. + + Use ``QpqClient.connect()`` for the async QUIC transport, or + ``QpqClient.connect_ffi()`` for the synchronous Rust FFI backend. + + Example (async):: + + client = await QpqClient.connect(ConnectOptions(addr="127.0.0.1:5001")) + health = await client.health() + token = await client.login_start("alice", opaque_request) + await client.close() + + Example (FFI):: + + client = QpqClient.connect_ffi(ConnectOptions(addr="127.0.0.1:5001")) + client.ffi_login("alice", "password123") + client.ffi_send("bob", b"hello") + client.close() + """ + + def __init__( + self, + quic: Optional[QuicTransport] = None, + ffi: Optional[FfiTransport] = None, + ) -> None: + self._quic = quic + self._ffi = ffi + self._session_token: bytes = b"" + self._device_id: bytes = b"" + + # ------------------------------------------------------------------ + # Constructors + # ------------------------------------------------------------------ + + @staticmethod + async def connect(opts: ConnectOptions) -> "QpqClient": + """Connect to a server using the async QUIC transport.""" + transport = await QuicTransport.connect( + opts.addr, + ca_cert_path=opts.ca_cert_path, + server_name=opts.server_name, + insecure_skip_verify=opts.insecure_skip_verify, + connect_timeout_ms=opts.connect_timeout_ms, + request_timeout_ms=opts.request_timeout_ms, + ) + return QpqClient(quic=transport) + + @staticmethod + def connect_ffi(opts: ConnectOptions) -> "QpqClient": + """Connect using the synchronous Rust FFI backend.""" + transport = FfiTransport.connect( + opts.addr, + ca_cert_path=opts.ca_cert_path, + server_name=opts.server_name, + ) + return QpqClient(ffi=transport) + + # ------------------------------------------------------------------ + # Session management + # ------------------------------------------------------------------ + + def set_session_token(self, token: bytes) -> None: + """Set an externally-obtained session token for authenticated RPCs.""" + self._session_token = token + + def set_device_id(self, device_id: bytes) -> None: + """Set the device ID for multi-device scoping.""" + self._device_id = device_id + + # ------------------------------------------------------------------ + # Async RPC methods (QUIC transport) + # ------------------------------------------------------------------ + + async def health(self) -> HealthInfo: + """Check server health (async).""" + data = await self._rpc(wire.HEALTH, proto.encode_health()) + info = proto.decode_health_response(data) + return HealthInfo( + status=str(info["status"]), + node_id=str(info["node_id"]), + version=str(info["version"]), + uptime_secs=int(info["uptime_secs"]), + storage_backend=str(info["storage_backend"]), + ) + + async def register_start(self, username: str, request: bytes) -> bytes: + """Start OPAQUE registration. Returns server response bytes.""" + payload = proto.encode_opaque_register_start(username, request) + data = await self._rpc(wire.OPAQUE_REGISTER_START, payload) + return proto.decode_opaque_register_start_response(data) + + async def register_finish( + self, username: str, upload: bytes, identity_key: bytes + ) -> bool: + """Complete OPAQUE registration.""" + payload = proto.encode_opaque_register_finish(username, upload, identity_key) + data = await self._rpc(wire.OPAQUE_REGISTER_FINISH, payload) + return proto.decode_opaque_register_finish_response(data) + + async def login_start(self, username: str, request: bytes) -> bytes: + """Start OPAQUE login. Returns server response bytes.""" + payload = proto.encode_opaque_login_start(username, request) + data = await self._rpc(wire.OPAQUE_LOGIN_START, payload) + return proto.decode_opaque_login_start_response(data) + + async def login_finish( + self, username: str, finalization: bytes, identity_key: bytes + ) -> bytes: + """Complete OPAQUE login. Returns and stores session token.""" + payload = proto.encode_opaque_login_finish(username, finalization, identity_key) + data = await self._rpc(wire.OPAQUE_LOGIN_FINISH, payload) + token = proto.decode_opaque_login_finish_response(data) + self._session_token = token + return token + + async def resolve_user(self, username: str) -> tuple[bytes, bytes]: + """Resolve username to (identity_key, inclusion_proof).""" + payload = proto.encode_resolve_user(username) + data = await self._rpc(wire.RESOLVE_USER, payload) + return proto.decode_resolve_user_response(data) + + async def resolve_identity(self, identity_key: bytes) -> str: + """Resolve identity key to username.""" + payload = proto.encode_resolve_identity(identity_key) + data = await self._rpc(wire.RESOLVE_IDENTITY, payload) + return proto.decode_resolve_identity_response(data) + + async def create_channel(self, peer_key: bytes) -> ChannelResult: + """Create a 1:1 DM channel with a peer.""" + payload = proto.encode_create_channel(peer_key) + data = await self._rpc(wire.CREATE_CHANNEL, payload) + channel_id, was_new = proto.decode_create_channel_response(data) + return ChannelResult(channel_id=channel_id, was_new=was_new) + + async def send( + self, + recipient_key: bytes, + payload: bytes, + *, + channel_id: bytes = b"", + ttl_secs: int = 0, + message_id: bytes = b"", + ) -> tuple[int, bytes]: + """Enqueue a message. Returns (seq, delivery_proof).""" + req = proto.encode_enqueue(recipient_key, payload, channel_id, ttl_secs, message_id) + data = await self._rpc(wire.ENQUEUE, req) + seq, proof, _ = proto.decode_enqueue_response(data) + return seq, proof + + async def receive( + self, + recipient_key: bytes, + *, + channel_id: bytes = b"", + limit: int = 0, + device_id: bytes = b"", + ) -> list[Envelope]: + """Fetch queued messages.""" + req = proto.encode_fetch(recipient_key, channel_id, limit, device_id) + data = await self._rpc(wire.FETCH, req) + return [Envelope(seq=s, data=d) for s, d in proto.decode_fetch_response(data)] + + async def receive_wait( + self, + recipient_key: bytes, + *, + timeout_ms: int = 5000, + channel_id: bytes = b"", + limit: int = 0, + device_id: bytes = b"", + ) -> list[Envelope]: + """Long-poll for messages with a timeout.""" + req = proto.encode_fetch_wait(recipient_key, channel_id, timeout_ms, limit, device_id) + data = await self._rpc(wire.FETCH_WAIT, req) + return [Envelope(seq=s, data=d) for s, d in proto.decode_fetch_wait_response(data)] + + async def ack( + self, + recipient_key: bytes, + seq_up_to: int, + *, + channel_id: bytes = b"", + device_id: bytes = b"", + ) -> None: + """Acknowledge messages up to a sequence number.""" + req = proto.encode_ack(recipient_key, seq_up_to, channel_id, device_id) + await self._rpc(wire.ACK, req) + + async def upload_key_package(self, identity_key: bytes, package: bytes) -> bytes: + """Upload an MLS key package. Returns fingerprint.""" + req = proto.encode_upload_key_package(identity_key, package) + data = await self._rpc(wire.UPLOAD_KEY_PACKAGE, req) + return proto.decode_upload_key_package_response(data) + + async def fetch_key_package(self, identity_key: bytes) -> bytes: + """Fetch an MLS key package.""" + req = proto.encode_fetch_key_package(identity_key) + data = await self._rpc(wire.FETCH_KEY_PACKAGE, req) + return proto.decode_fetch_key_package_response(data) + + async def upload_hybrid_key(self, identity_key: bytes, hybrid_public_key: bytes) -> None: + """Upload a hybrid (X25519 + ML-KEM-768) public key.""" + req = proto.encode_upload_hybrid_key(identity_key, hybrid_public_key) + await self._rpc(wire.UPLOAD_HYBRID_KEY, req) + + async def fetch_hybrid_key(self, identity_key: bytes) -> bytes: + """Fetch a hybrid public key.""" + req = proto.encode_fetch_hybrid_key(identity_key) + data = await self._rpc(wire.FETCH_HYBRID_KEY, req) + return proto.decode_fetch_hybrid_key_response(data) + + async def delete_account(self) -> bool: + """Permanently delete the authenticated account.""" + data = await self._rpc(wire.DELETE_ACCOUNT, proto.encode_delete_account()) + return proto.decode_delete_account_response(data) + + # ------------------------------------------------------------------ + # FFI (synchronous) methods + # ------------------------------------------------------------------ + + def ffi_login(self, username: str, password: str) -> None: + """Authenticate via OPAQUE using the FFI backend (synchronous).""" + if not self._ffi: + raise ConnectionError("no FFI transport; use QpqClient.connect_ffi()") + self._ffi.login(username, password) + + def ffi_send(self, recipient: str, message: bytes) -> None: + """Send a message via the FFI backend (synchronous).""" + if not self._ffi: + raise ConnectionError("no FFI transport; use QpqClient.connect_ffi()") + self._ffi.send(recipient, message) + + def ffi_receive(self, timeout_ms: int = 5000) -> list[str]: + """Receive messages via the FFI backend (synchronous).""" + if not self._ffi: + raise ConnectionError("no FFI transport; use QpqClient.connect_ffi()") + return self._ffi.receive(timeout_ms) + + # ------------------------------------------------------------------ + # Lifecycle + # ------------------------------------------------------------------ + + async def close(self) -> None: + """Close all transports.""" + if self._quic: + self._quic.close() + self._quic = None + if self._ffi: + self._ffi.close() + self._ffi = None + + def close_sync(self) -> None: + """Close all transports (synchronous variant).""" + if self._quic: + self._quic.close() + self._quic = None + if self._ffi: + self._ffi.close() + self._ffi = None + + # ------------------------------------------------------------------ + # Internal + # ------------------------------------------------------------------ + + async def _rpc(self, method_id: int, payload: bytes) -> bytes: + if not self._quic: + raise ConnectionError("no QUIC transport; use QpqClient.connect()") + return await self._quic.rpc(method_id, payload) diff --git a/sdks/python/quicproquo/ffi.py b/sdks/python/quicproquo/ffi.py new file mode 100644 index 0000000..1283f28 --- /dev/null +++ b/sdks/python/quicproquo/ffi.py @@ -0,0 +1,192 @@ +"""CFFI bindings to ``libquicproquo_ffi`` (the Rust C FFI layer). + +This module loads the shared library and exposes a synchronous Python API +that mirrors the C functions in ``crates/quicproquo-ffi/src/lib.rs``. +""" + +from __future__ import annotations + +import json +import os +from pathlib import Path +from typing import Optional + +import cffi + +from quicproquo.types import ( + QpqError, + AuthError, + TimeoutError, + ConnectionError, +) + +# Status codes (must match crates/quicproquo-ffi/src/lib.rs). +QPQ_OK = 0 +QPQ_ERROR = 1 +QPQ_AUTH_FAILED = 2 +QPQ_TIMEOUT = 3 +QPQ_NOT_CONNECTED = 4 + +_CDEFS = """ + typedef struct QpqHandle QpqHandle; + + QpqHandle* qpq_connect(const char* server, const char* ca_cert, const char* server_name); + int qpq_login(QpqHandle* handle, const char* username, const char* password); + int qpq_send(QpqHandle* handle, const char* recipient, const uint8_t* message, size_t message_len); + int qpq_receive(QpqHandle* handle, uint32_t timeout_ms, char** out_json); + void qpq_disconnect(QpqHandle* handle); + const char* qpq_last_error(const QpqHandle* handle); + void qpq_free_string(char* ptr); +""" + +_ffi = cffi.FFI() +_ffi.cdef(_CDEFS) +_lib: Optional[object] = None + + +def _load_lib() -> object: + """Load the shared library, searching common paths.""" + global _lib + if _lib is not None: + return _lib + + search_paths = [ + # Explicit environment variable. + os.environ.get("QPQ_LIB_PATH", ""), + # Common cargo build output locations. + str(Path(__file__).resolve().parents[3] / "target" / "release" / "libquicproquo_ffi.so"), + str(Path(__file__).resolve().parents[3] / "target" / "debug" / "libquicproquo_ffi.so"), + # macOS dylib. + str( + Path(__file__).resolve().parents[3] + / "target" + / "release" + / "libquicproquo_ffi.dylib" + ), + str( + Path(__file__).resolve().parents[3] + / "target" + / "debug" + / "libquicproquo_ffi.dylib" + ), + # System library path. + "libquicproquo_ffi.so", + ] + + for path in search_paths: + if not path: + continue + try: + _lib = _ffi.dlopen(path) + return _lib + except OSError: + continue + + raise OSError( + "Could not find libquicproquo_ffi. Set QPQ_LIB_PATH or build with " + "`cargo build --release -p quicproquo-ffi`." + ) + + +def _check_error(handle: object, code: int) -> None: + """Raise an appropriate exception if the FFI call returned an error code.""" + if code == QPQ_OK: + return + + lib = _load_lib() + err_ptr = lib.qpq_last_error(handle) # type: ignore[union-attr] + msg = _ffi.string(err_ptr).decode("utf-8") if err_ptr != _ffi.NULL else "unknown error" + + if code == QPQ_AUTH_FAILED: + raise AuthError(msg) + if code == QPQ_TIMEOUT: + raise TimeoutError(msg) + if code == QPQ_NOT_CONNECTED: + raise ConnectionError(msg) + raise QpqError(msg) + + +class FfiTransport: + """Synchronous transport wrapping ``libquicproquo_ffi``. + + Provides the same logical operations as ``QuicTransport`` but backed + by the Rust client library through C FFI. + + Usage:: + + ffi = FfiTransport.connect("127.0.0.1:5001", ca_cert="/path/to/ca.pem") + ffi.login("alice", "password123") + ffi.send("bob", b"hello") + messages = ffi.receive(timeout_ms=5000) + ffi.close() + """ + + def __init__(self, handle: object) -> None: + self._handle = handle + self._lib = _load_lib() + + @staticmethod + def connect( + addr: str, + *, + ca_cert_path: str = "", + server_name: str = "", + ) -> "FfiTransport": + """Connect to a qpq server via the Rust FFI layer.""" + lib = _load_lib() + + server_c = _ffi.new("char[]", addr.encode("utf-8")) + ca_c = _ffi.new("char[]", ca_cert_path.encode("utf-8")) + + if not server_name: + host = addr.split(":")[0] + server_name = host + sn_c = _ffi.new("char[]", server_name.encode("utf-8")) + + handle = lib.qpq_connect(server_c, ca_c, sn_c) # type: ignore[union-attr] + if handle == _ffi.NULL: + raise ConnectionError(f"qpq_connect failed for {addr}") + + return FfiTransport(handle) + + def login(self, username: str, password: str) -> None: + """Authenticate with OPAQUE credentials.""" + u = _ffi.new("char[]", username.encode("utf-8")) + p = _ffi.new("char[]", password.encode("utf-8")) + code = self._lib.qpq_login(self._handle, u, p) # type: ignore[union-attr] + _check_error(self._handle, code) + + def send(self, recipient: str, message: bytes) -> None: + """Send a message to a recipient (by username).""" + r = _ffi.new("char[]", recipient.encode("utf-8")) + m = _ffi.new("uint8_t[]", message) + code = self._lib.qpq_send(self._handle, r, m, len(message)) # type: ignore[union-attr] + _check_error(self._handle, code) + + def receive(self, timeout_ms: int = 5000) -> list[str]: + """Receive pending messages, blocking up to *timeout_ms*. + + Returns a list of message strings (UTF-8). + """ + out = _ffi.new("char**") + code = self._lib.qpq_receive(self._handle, timeout_ms, out) # type: ignore[union-attr] + _check_error(self._handle, code) + + if out[0] == _ffi.NULL: + return [] + + json_str = _ffi.string(out[0]).decode("utf-8") + self._lib.qpq_free_string(out[0]) # type: ignore[union-attr] + return json.loads(json_str) # type: ignore[no-any-return] + + def close(self) -> None: + """Disconnect and free the handle.""" + if self._handle is not None: + self._lib.qpq_disconnect(self._handle) # type: ignore[union-attr] + self._handle = None + + def __enter__(self) -> "FfiTransport": + return self + + def __exit__(self, *args: object) -> None: + self.close() diff --git a/sdks/python/quicproquo/proto.py b/sdks/python/quicproquo/proto.py new file mode 100644 index 0000000..b0f8480 --- /dev/null +++ b/sdks/python/quicproquo/proto.py @@ -0,0 +1,303 @@ +"""Minimal protobuf encode/decode for qpq v1 messages. + +Uses the ``protobuf`` library's descriptor-less encoding for simplicity. +Each message is represented as a plain dict and encoded/decoded via +google.protobuf helpers. + +This avoids requiring protoc at build time while still producing +wire-compatible protobuf bytes. +""" + +from __future__ import annotations + +from google.protobuf import descriptor_pb2 as _ # noqa: F401 – ensure protobuf is importable +from google.protobuf.internal.encoder import _VarintBytes # type: ignore[attr-defined] +from google.protobuf.internal.decoder import _DecodeVarint # type: ignore[attr-defined] + + +# --------------------------------------------------------------------------- +# Low-level protobuf helpers (wire types 0=varint, 2=length-delimited) +# --------------------------------------------------------------------------- + +def _encode_varint_field(field_number: int, value: int) -> bytes: + """Encode a varint (wire type 0) field.""" + if value == 0: + return b"" + tag = (field_number << 3) | 0 + return _VarintBytes(tag) + _VarintBytes(value) + + +def _encode_bytes_field(field_number: int, value: bytes) -> bytes: + """Encode a length-delimited (wire type 2) field.""" + if not value: + return b"" + tag = (field_number << 3) | 2 + return _VarintBytes(tag) + _VarintBytes(len(value)) + value + + +def _encode_string_field(field_number: int, value: str) -> bytes: + """Encode a string (wire type 2) field.""" + return _encode_bytes_field(field_number, value.encode("utf-8")) + + +def _decode_fields(data: bytes) -> dict[int, list[tuple[int, bytes | int]]]: + """Decode a protobuf message into {field_number: [(wire_type, value), ...]}.""" + fields: dict[int, list[tuple[int, bytes | int]]] = {} + pos = 0 + while pos < len(data): + tag, pos = _DecodeVarint(data, pos) + field_number = tag >> 3 + wire_type = tag & 0x07 + if wire_type == 0: # varint + value, pos = _DecodeVarint(data, pos) + fields.setdefault(field_number, []).append((wire_type, value)) + elif wire_type == 2: # length-delimited + length, pos = _DecodeVarint(data, pos) + fields.setdefault(field_number, []).append((wire_type, data[pos : pos + length])) + pos += length + elif wire_type == 5: # 32-bit fixed + fields.setdefault(field_number, []).append((wire_type, data[pos : pos + 4])) + pos += 4 + elif wire_type == 1: # 64-bit fixed + fields.setdefault(field_number, []).append((wire_type, data[pos : pos + 8])) + pos += 8 + else: + raise ValueError(f"unsupported wire type {wire_type}") + return fields + + +def _get_bytes(fields: dict[int, list[tuple[int, bytes | int]]], fn: int) -> bytes: + entries = fields.get(fn, []) + if not entries: + return b"" + _, val = entries[0] + return val if isinstance(val, bytes) else b"" + + +def _get_string(fields: dict[int, list[tuple[int, bytes | int]]], fn: int) -> str: + return _get_bytes(fields, fn).decode("utf-8", errors="replace") + + +def _get_varint(fields: dict[int, list[tuple[int, bytes | int]]], fn: int) -> int: + entries = fields.get(fn, []) + if not entries: + return 0 + _, val = entries[0] + return val if isinstance(val, int) else 0 + + +def _get_bool(fields: dict[int, list[tuple[int, bytes | int]]], fn: int) -> bool: + return _get_varint(fields, fn) != 0 + + +def _get_repeated_bytes(fields: dict[int, list[tuple[int, bytes | int]]], fn: int) -> list[bytes]: + result: list[bytes] = [] + for _, val in fields.get(fn, []): + if isinstance(val, bytes): + result.append(val) + return result + + +# --------------------------------------------------------------------------- +# Auth +# --------------------------------------------------------------------------- + +def encode_opaque_register_start(username: str, request: bytes) -> bytes: + return _encode_string_field(1, username) + _encode_bytes_field(2, request) + +def decode_opaque_register_start_response(data: bytes) -> bytes: + return _get_bytes(_decode_fields(data), 1) + +def encode_opaque_register_finish(username: str, upload: bytes, identity_key: bytes) -> bytes: + return ( + _encode_string_field(1, username) + + _encode_bytes_field(2, upload) + + _encode_bytes_field(3, identity_key) + ) + +def decode_opaque_register_finish_response(data: bytes) -> bool: + return _get_bool(_decode_fields(data), 1) + +def encode_opaque_login_start(username: str, request: bytes) -> bytes: + return _encode_string_field(1, username) + _encode_bytes_field(2, request) + +def decode_opaque_login_start_response(data: bytes) -> bytes: + return _get_bytes(_decode_fields(data), 1) + +def encode_opaque_login_finish(username: str, finalization: bytes, identity_key: bytes) -> bytes: + return ( + _encode_string_field(1, username) + + _encode_bytes_field(2, finalization) + + _encode_bytes_field(3, identity_key) + ) + +def decode_opaque_login_finish_response(data: bytes) -> bytes: + """Returns session_token.""" + return _get_bytes(_decode_fields(data), 1) + + +# --------------------------------------------------------------------------- +# Delivery +# --------------------------------------------------------------------------- + +def encode_enqueue( + recipient_key: bytes, + payload: bytes, + channel_id: bytes = b"", + ttl_secs: int = 0, + message_id: bytes = b"", +) -> bytes: + return ( + _encode_bytes_field(1, recipient_key) + + _encode_bytes_field(2, payload) + + _encode_bytes_field(3, channel_id) + + _encode_varint_field(4, ttl_secs) + + _encode_bytes_field(5, message_id) + ) + +def decode_enqueue_response(data: bytes) -> tuple[int, bytes, bool]: + """Returns (seq, delivery_proof, duplicate).""" + fields = _decode_fields(data) + return _get_varint(fields, 1), _get_bytes(fields, 2), _get_bool(fields, 3) + +def encode_fetch( + recipient_key: bytes, + channel_id: bytes = b"", + limit: int = 0, + device_id: bytes = b"", +) -> bytes: + return ( + _encode_bytes_field(1, recipient_key) + + _encode_bytes_field(2, channel_id) + + _encode_varint_field(3, limit) + + _encode_bytes_field(4, device_id) + ) + +def decode_fetch_response(data: bytes) -> list[tuple[int, bytes]]: + """Returns list of (seq, data) envelopes.""" + fields = _decode_fields(data) + envelopes: list[tuple[int, bytes]] = [] + for _, val in fields.get(1, []): + if isinstance(val, bytes): + env_fields = _decode_fields(val) + envelopes.append((_get_varint(env_fields, 1), _get_bytes(env_fields, 2))) + return envelopes + +def encode_fetch_wait( + recipient_key: bytes, + channel_id: bytes = b"", + timeout_ms: int = 5000, + limit: int = 0, + device_id: bytes = b"", +) -> bytes: + return ( + _encode_bytes_field(1, recipient_key) + + _encode_bytes_field(2, channel_id) + + _encode_varint_field(3, timeout_ms) + + _encode_varint_field(4, limit) + + _encode_bytes_field(5, device_id) + ) + +# decode_fetch_wait_response = decode_fetch_response (same message shape) +decode_fetch_wait_response = decode_fetch_response + +def encode_ack( + recipient_key: bytes, + seq_up_to: int, + channel_id: bytes = b"", + device_id: bytes = b"", +) -> bytes: + return ( + _encode_bytes_field(1, recipient_key) + + _encode_bytes_field(2, channel_id) + + _encode_varint_field(3, seq_up_to) + + _encode_bytes_field(4, device_id) + ) + + +# --------------------------------------------------------------------------- +# Channel +# --------------------------------------------------------------------------- + +def encode_create_channel(peer_key: bytes) -> bytes: + return _encode_bytes_field(1, peer_key) + +def decode_create_channel_response(data: bytes) -> tuple[bytes, bool]: + """Returns (channel_id, was_new).""" + fields = _decode_fields(data) + return _get_bytes(fields, 1), _get_bool(fields, 2) + + +# --------------------------------------------------------------------------- +# User +# --------------------------------------------------------------------------- + +def encode_resolve_user(username: str) -> bytes: + return _encode_string_field(1, username) + +def decode_resolve_user_response(data: bytes) -> tuple[bytes, bytes]: + """Returns (identity_key, inclusion_proof).""" + fields = _decode_fields(data) + return _get_bytes(fields, 1), _get_bytes(fields, 2) + +def encode_resolve_identity(identity_key: bytes) -> bytes: + return _encode_bytes_field(1, identity_key) + +def decode_resolve_identity_response(data: bytes) -> str: + """Returns username.""" + return _get_string(_decode_fields(data), 1) + + +# --------------------------------------------------------------------------- +# Keys +# --------------------------------------------------------------------------- + +def encode_upload_key_package(identity_key: bytes, package: bytes) -> bytes: + return _encode_bytes_field(1, identity_key) + _encode_bytes_field(2, package) + +def decode_upload_key_package_response(data: bytes) -> bytes: + return _get_bytes(_decode_fields(data), 1) + +def encode_fetch_key_package(identity_key: bytes) -> bytes: + return _encode_bytes_field(1, identity_key) + +def decode_fetch_key_package_response(data: bytes) -> bytes: + return _get_bytes(_decode_fields(data), 1) + +def encode_upload_hybrid_key(identity_key: bytes, hybrid_public_key: bytes) -> bytes: + return _encode_bytes_field(1, identity_key) + _encode_bytes_field(2, hybrid_public_key) + +def encode_fetch_hybrid_key(identity_key: bytes) -> bytes: + return _encode_bytes_field(1, identity_key) + +def decode_fetch_hybrid_key_response(data: bytes) -> bytes: + return _get_bytes(_decode_fields(data), 1) + + +# --------------------------------------------------------------------------- +# Health +# --------------------------------------------------------------------------- + +def encode_health() -> bytes: + return b"" + +def decode_health_response(data: bytes) -> dict[str, str | int]: + fields = _decode_fields(data) + return { + "status": _get_string(fields, 1), + "node_id": _get_string(fields, 2), + "version": _get_string(fields, 3), + "uptime_secs": _get_varint(fields, 4), + "storage_backend": _get_string(fields, 5), + } + + +# --------------------------------------------------------------------------- +# Delete Account +# --------------------------------------------------------------------------- + +def encode_delete_account() -> bytes: + return b"" + +def decode_delete_account_response(data: bytes) -> bool: + return _get_bool(_decode_fields(data), 1) diff --git a/sdks/python/quicproquo/transport.py b/sdks/python/quicproquo/transport.py new file mode 100644 index 0000000..c007ec6 --- /dev/null +++ b/sdks/python/quicproquo/transport.py @@ -0,0 +1,181 @@ +"""QUIC transport using aioquic for the v2 wire format. + +Opens a QUIC connection to the qpq server and provides ``rpc()`` to send +protobuf-encoded requests over individual QUIC streams, reading back the +framed response on the same stream. + +aioquic is imported lazily so that the module can be loaded even when +aioquic is not installed (e.g. for tests that only exercise wire/proto). +""" + +from __future__ import annotations + +import asyncio +import ssl +from typing import Any + +from quicproquo.types import ConnectionError, TimeoutError +from quicproquo.wire import HEADER_SIZE, encode_frame, decode_header + + +def _make_protocol_class() -> type: + """Build the protocol class at call time so aioquic is imported lazily.""" + from aioquic.asyncio.protocol import QuicConnectionProtocol + from aioquic.quic.events import StreamDataReceived, QuicEvent + + class _QpqQuicProtocol(QuicConnectionProtocol): + """QUIC protocol handler that dispatches stream data to waiting futures.""" + + def __init__(self, *args: Any, **kwargs: Any) -> None: + super().__init__(*args, **kwargs) + self._stream_buffers: dict[int, bytearray] = {} + self._stream_waiters: dict[int, asyncio.Future[bytes]] = {} + + def quic_event_received(self, event: QuicEvent) -> None: + if isinstance(event, StreamDataReceived): + sid = event.stream_id + buf = self._stream_buffers.setdefault(sid, bytearray()) + buf.extend(event.data) + + if len(buf) >= HEADER_SIZE: + _, _, payload_len = decode_header(bytes(buf[:HEADER_SIZE])) + total = HEADER_SIZE + payload_len + if len(buf) >= total: + frame = bytes(buf[:total]) + del buf[:total] + waiter = self._stream_waiters.pop(sid, None) + if waiter and not waiter.done(): + waiter.set_result(frame) + + def wait_for_stream(self, stream_id: int) -> asyncio.Future[bytes]: + loop = asyncio.get_event_loop() + fut: asyncio.Future[bytes] = loop.create_future() + self._stream_waiters[stream_id] = fut + + buf = self._stream_buffers.get(stream_id, bytearray()) + if len(buf) >= HEADER_SIZE: + _, _, payload_len = decode_header(bytes(buf[:HEADER_SIZE])) + total = HEADER_SIZE + payload_len + if len(buf) >= total: + frame = bytes(buf[:total]) + del buf[:total] + if not fut.done(): + fut.set_result(frame) + + return fut + + return _QpqQuicProtocol + + +class QuicTransport: + """Async QUIC transport for the qpq v2 wire format. + + Usage:: + + transport = await QuicTransport.connect("127.0.0.1:5001") + response_bytes = await transport.rpc(method_id, request_payload) + transport.close() + """ + + def __init__( + self, + protocol: Any, + connection: Any, + request_timeout_ms: int, + ) -> None: + self._protocol = protocol + self._connection = connection + self._req_id = 0 + self._request_timeout = request_timeout_ms / 1000.0 + self._closed = False + + @staticmethod + async def connect( + addr: str, + *, + ca_cert_path: str = "", + server_name: str = "", + insecure_skip_verify: bool = False, + connect_timeout_ms: int = 5_000, + request_timeout_ms: int = 10_000, + ) -> "QuicTransport": + """Open a QUIC connection to the server.""" + from aioquic.asyncio import connect as quic_connect + from aioquic.quic.configuration import QuicConfiguration + + host, _, port_str = addr.rpartition(":") + if not host: + host = addr + port_str = "5001" + port = int(port_str) + + configuration = QuicConfiguration( + is_client=True, + alpn_protocols=["qpq"], + ) + + if insecure_skip_verify: + configuration.verify_mode = ssl.CERT_NONE + elif ca_cert_path: + configuration.load_verify_locations(ca_cert_path) + + if not server_name: + server_name = host + + proto_cls = _make_protocol_class() + + try: + async with asyncio.timeout(connect_timeout_ms / 1000.0): + connection = await quic_connect( + host, + port, + configuration=configuration, + create_protocol=proto_cls, + server_name=server_name, + ) + except (OSError, asyncio.TimeoutError) as exc: + raise ConnectionError(f"failed to connect to {addr}: {exc}") from exc + + protocol = connection._protocol # type: ignore[attr-defined] + return QuicTransport(protocol, connection, request_timeout_ms) + + async def rpc(self, method_id: int, payload: bytes) -> bytes: + """Send an RPC request and return the response payload (protobuf bytes). + + Opens a new QUIC stream for each request. + """ + if self._closed: + raise ConnectionError("transport is closed") + + self._req_id += 1 + req_id = self._req_id + + frame = encode_frame(method_id, req_id, payload) + + stream_id = self._protocol._quic.get_next_available_stream_id() + waiter = self._protocol.wait_for_stream(stream_id) + + self._protocol._quic.send_stream_data(stream_id, frame, end_stream=True) + self._protocol.transmit() + + try: + async with asyncio.timeout(self._request_timeout): + response_frame = await waiter + except asyncio.TimeoutError as exc: + raise TimeoutError( + f"RPC timeout for method {method_id} (req_id={req_id})" + ) from exc + + _, _, resp_len = decode_header(response_frame) + return response_frame[HEADER_SIZE : HEADER_SIZE + resp_len] + + @property + def closed(self) -> bool: + return self._closed + + def close(self) -> None: + """Close the QUIC connection.""" + if not self._closed: + self._closed = True + self._protocol._quic.close() + self._protocol.transmit() diff --git a/sdks/python/quicproquo/types.py b/sdks/python/quicproquo/types.py new file mode 100644 index 0000000..efc01e0 --- /dev/null +++ b/sdks/python/quicproquo/types.py @@ -0,0 +1,92 @@ +"""Data types and exceptions for the quicproquo Python SDK.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Optional + + +# --------------------------------------------------------------------------- +# Exceptions +# --------------------------------------------------------------------------- + + +class QpqError(Exception): + """Base exception for quicproquo SDK errors.""" + + +class AuthError(QpqError): + """OPAQUE authentication failed (bad credentials).""" + + +class TimeoutError(QpqError): + """An operation timed out waiting for a response.""" + + +class ConnectionError(QpqError): + """Could not connect to or communicate with the server.""" + + +# --------------------------------------------------------------------------- +# Connection options +# --------------------------------------------------------------------------- + + +@dataclass(frozen=True) +class ConnectOptions: + """Options for connecting to a quicproquo server. + + Parameters + ---------- + addr: + Server address as ``host:port``. + ca_cert_path: + Path to PEM-encoded CA certificate. Required for production. + server_name: + TLS SNI server name (defaults to the host part of *addr*). + insecure_skip_verify: + Disable TLS certificate verification (development only). + connect_timeout_ms: + Connection timeout in milliseconds (default 5000). + request_timeout_ms: + Per-request timeout in milliseconds (default 10000). + """ + + addr: str + ca_cert_path: str = "" + server_name: str = "" + insecure_skip_verify: bool = False + connect_timeout_ms: int = 5_000 + request_timeout_ms: int = 10_000 + + +# --------------------------------------------------------------------------- +# Response types +# --------------------------------------------------------------------------- + + +@dataclass(frozen=True) +class Envelope: + """A received message envelope.""" + + seq: int + data: bytes + + +@dataclass(frozen=True) +class ChannelResult: + """Result of creating or joining a channel.""" + + channel_id: bytes + was_new: bool + + +@dataclass(frozen=True) +class HealthInfo: + """Server health check response.""" + + status: str + node_id: str = "" + version: str = "" + uptime_secs: int = 0 + storage_backend: str = "" diff --git a/sdks/python/quicproquo/wire.py b/sdks/python/quicproquo/wire.py new file mode 100644 index 0000000..08c78ae --- /dev/null +++ b/sdks/python/quicproquo/wire.py @@ -0,0 +1,73 @@ +"""v2 wire format: ``[method_id:u16][req_id:u32][len:u32][protobuf]``. + +Each RPC is sent over its own QUIC stream. The response uses the same +framing on the same stream. +""" + +from __future__ import annotations + +import struct + +# Header: method_id (u16) + req_id (u32) + length (u32) = 10 bytes. +HEADER_FMT = "!HII" # network byte-order: u16 + u32 + u32 +HEADER_SIZE = struct.calcsize(HEADER_FMT) + +# Method IDs (mirrors quicproquo-proto/src/lib.rs::method_ids). +# Auth (100-103) +OPAQUE_REGISTER_START = 100 +OPAQUE_REGISTER_FINISH = 101 +OPAQUE_LOGIN_START = 102 +OPAQUE_LOGIN_FINISH = 103 + +# Delivery (200-205) +ENQUEUE = 200 +FETCH = 201 +FETCH_WAIT = 202 +PEEK = 203 +ACK = 204 +BATCH_ENQUEUE = 205 + +# Keys (300-304) +UPLOAD_KEY_PACKAGE = 300 +FETCH_KEY_PACKAGE = 301 +UPLOAD_HYBRID_KEY = 302 +FETCH_HYBRID_KEY = 303 +FETCH_HYBRID_KEYS = 304 + +# Channel (400) +CREATE_CHANNEL = 400 + +# User (500-501) +RESOLVE_USER = 500 +RESOLVE_IDENTITY = 501 + +# Blob (600-601) +UPLOAD_BLOB = 600 +DOWNLOAD_BLOB = 601 + +# Device (700-702) +REGISTER_DEVICE = 700 +LIST_DEVICES = 701 +REVOKE_DEVICE = 702 + +# P2P (800-802) +PUBLISH_ENDPOINT = 800 +RESOLVE_ENDPOINT = 801 +HEALTH = 802 + +# Delete account (950) +DELETE_ACCOUNT = 950 + + +def encode_frame(method_id: int, req_id: int, payload: bytes) -> bytes: + """Encode a wire frame: header + protobuf payload.""" + header = struct.pack(HEADER_FMT, method_id, req_id, len(payload)) + return header + payload + + +def decode_header(data: bytes) -> tuple[int, int, int]: + """Decode a wire frame header, returning (method_id, req_id, payload_len).""" + if len(data) < HEADER_SIZE: + raise ValueError(f"header too short: {len(data)} < {HEADER_SIZE}") + method_id, req_id, length = struct.unpack(HEADER_FMT, data[:HEADER_SIZE]) + return method_id, req_id, length diff --git a/sdks/python/tests/__init__.py b/sdks/python/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/sdks/python/tests/test_proto.py b/sdks/python/tests/test_proto.py new file mode 100644 index 0000000..b61a595 --- /dev/null +++ b/sdks/python/tests/test_proto.py @@ -0,0 +1,89 @@ +"""Tests for the manual protobuf encode/decode layer.""" + +from quicproquo.proto import ( + encode_health, + decode_health_response, + encode_resolve_user, + decode_resolve_user_response, + encode_enqueue, + decode_enqueue_response, + encode_create_channel, + decode_create_channel_response, + encode_opaque_login_start, + decode_opaque_login_start_response, + encode_opaque_login_finish, + decode_opaque_login_finish_response, + _encode_string_field, + _encode_bytes_field, + _encode_varint_field, +) + + +def test_health_empty(): + assert encode_health() == b"" + + +def test_health_response_decode(): + # Manually build a HealthResponse: status="ok" (field 1), version="1.0" (field 3) + data = _encode_string_field(1, "ok") + _encode_string_field(3, "1.0") + info = decode_health_response(data) + assert info["status"] == "ok" + assert info["version"] == "1.0" + assert info["uptime_secs"] == 0 + + +def test_resolve_user_roundtrip(): + encoded = encode_resolve_user("alice") + # The encoded form is a string field (field 1). + assert b"alice" in encoded + + # Build a fake response: identity_key = b"\x01\x02\x03" + resp = _encode_bytes_field(1, b"\x01\x02\x03") + key, proof = decode_resolve_user_response(resp) + assert key == b"\x01\x02\x03" + assert proof == b"" + + +def test_enqueue_roundtrip(): + encoded = encode_enqueue( + recipient_key=b"\xaa\xbb", + payload=b"hello", + ttl_secs=60, + ) + assert b"hello" in encoded + + # Build a fake response: seq=42, delivery_proof=b"\xff" + resp = _encode_varint_field(1, 42) + _encode_bytes_field(2, b"\xff") + seq, proof, dup = decode_enqueue_response(resp) + assert seq == 42 + assert proof == b"\xff" + assert dup is False + + +def test_create_channel_roundtrip(): + encoded = encode_create_channel(b"\xde\xad") + assert b"\xde\xad" in encoded + + # Build response: channel_id=b"\xca\xfe", was_new=true + resp = _encode_bytes_field(1, b"\xca\xfe") + _encode_varint_field(2, 1) + ch_id, was_new = decode_create_channel_response(resp) + assert ch_id == b"\xca\xfe" + assert was_new is True + + +def test_opaque_login_flow(): + # Encode login start + start_req = encode_opaque_login_start("bob", b"\x01\x02") + assert b"bob" in start_req + + # Decode login start response + start_resp = _encode_bytes_field(1, b"\xaa\xbb") + assert decode_opaque_login_start_response(start_resp) == b"\xaa\xbb" + + # Encode login finish + finish_req = encode_opaque_login_finish("bob", b"\x03\x04", b"\x05\x06") + assert b"bob" in finish_req + + # Decode login finish response (session_token) + finish_resp = _encode_bytes_field(1, b"\xde\xad\xbe\xef") + assert decode_opaque_login_finish_response(finish_resp) == b"\xde\xad\xbe\xef" diff --git a/sdks/python/tests/test_types.py b/sdks/python/tests/test_types.py new file mode 100644 index 0000000..4420cce --- /dev/null +++ b/sdks/python/tests/test_types.py @@ -0,0 +1,50 @@ +"""Tests for SDK data types and exceptions.""" + +from quicproquo.types import ( + ConnectOptions, + Envelope, + ChannelResult, + HealthInfo, + QpqError, + AuthError, + TimeoutError, + ConnectionError, +) + + +def test_connect_options_defaults(): + opts = ConnectOptions(addr="127.0.0.1:5001") + assert opts.addr == "127.0.0.1:5001" + assert opts.ca_cert_path == "" + assert opts.insecure_skip_verify is False + assert opts.connect_timeout_ms == 5_000 + assert opts.request_timeout_ms == 10_000 + + +def test_envelope_immutable(): + env = Envelope(seq=1, data=b"hello") + assert env.seq == 1 + assert env.data == b"hello" + + +def test_channel_result(): + cr = ChannelResult(channel_id=b"\x01", was_new=True) + assert cr.was_new is True + + +def test_health_info(): + h = HealthInfo(status="ok", version="0.1.0", uptime_secs=42) + assert h.status == "ok" + assert h.uptime_secs == 42 + + +def test_exception_hierarchy(): + assert issubclass(AuthError, QpqError) + assert issubclass(TimeoutError, QpqError) + assert issubclass(ConnectionError, QpqError) + + # Can catch all qpq errors with base class. + try: + raise AuthError("bad password") + except QpqError as e: + assert "bad password" in str(e) diff --git a/sdks/python/tests/test_wire.py b/sdks/python/tests/test_wire.py new file mode 100644 index 0000000..a2ba95d --- /dev/null +++ b/sdks/python/tests/test_wire.py @@ -0,0 +1,41 @@ +"""Tests for the v2 wire format encoder/decoder.""" + +from quicproquo.wire import ( + HEADER_SIZE, + encode_frame, + decode_header, + HEALTH, + ENQUEUE, +) + + +def test_header_size(): + assert HEADER_SIZE == 10 + + +def test_encode_decode_roundtrip(): + payload = b"\x0a\x05hello" + frame = encode_frame(HEALTH, 42, payload) + + assert len(frame) == HEADER_SIZE + len(payload) + + method_id, req_id, length = decode_header(frame) + assert method_id == HEALTH + assert req_id == 42 + assert length == len(payload) + assert frame[HEADER_SIZE:] == payload + + +def test_empty_payload(): + frame = encode_frame(ENQUEUE, 1, b"") + method_id, req_id, length = decode_header(frame) + assert method_id == ENQUEUE + assert req_id == 1 + assert length == 0 + + +def test_decode_header_too_short(): + import pytest + + with pytest.raises(ValueError, match="header too short"): + decode_header(b"\x00\x01")