feat(sdk): add Python SDK with QUIC and FFI transport backends

Implements quicproquo-py with two transport backends:
- Async QUIC transport via aioquic with v2 protobuf wire format
- Synchronous Rust FFI transport via CFFI wrapping libquicproquo_ffi

Includes manual protobuf encode/decode (no codegen), full RPC coverage
(auth, delivery, channels, users, keys, health), PyPI-ready packaging,
async echo bot and FFI demo examples, and 15 passing unit tests.
This commit is contained in:
2026-03-04 20:52:02 +01:00
parent f4621b3425
commit 49e8e066d7
16 changed files with 1732 additions and 0 deletions

6
sdks/python/.gitignore vendored Normal file
View File

@@ -0,0 +1,6 @@
__pycache__/
*.pyc
*.egg-info/
dist/
build/
.pytest_cache/

165
sdks/python/README.md Normal file
View File

@@ -0,0 +1,165 @@
# quicproquo Python SDK
Python client library for the [quicproquo](https://github.com/nicholasgasior/quicproquo) E2E encrypted messenger.
## Prerequisites
- Python 3.10+
- A running quicproquo server
## Installation
```sh
pip install quicproquo
```
For development:
```sh
pip install -e ".[dev]"
```
## Transport Backends
### 1. Async QUIC (pure Python)
Uses [aioquic](https://github.com/aiortc/aioquic) for native QUIC transport with the v2 protobuf wire format. No Rust dependency required.
```python
import asyncio
from quicproquo import QpqClient, ConnectOptions
async def main():
client = await QpqClient.connect(ConnectOptions(
addr="127.0.0.1:5001",
ca_cert_path="ca.pem",
))
health = await client.health()
print(f"Server: {health.status}")
# OPAQUE auth (requires external OPAQUE library for crypto)
server_resp = await client.login_start("alice", opaque_request_bytes)
# ... process server_resp with OPAQUE library ...
token = await client.login_finish("alice", finalization, identity_key)
# Resolve a user
key, proof = await client.resolve_user("bob")
# Send a message
seq, proof = await client.send(recipient_key, b"hello")
# Receive messages (long-poll)
messages = await client.receive_wait(my_key, timeout_ms=5000)
for msg in messages:
print(f"[{msg.seq}] {msg.data}")
await client.close()
asyncio.run(main())
```
### 2. Rust FFI (synchronous)
Wraps `libquicproquo_ffi` via CFFI for full Rust crypto stack (MLS, hybrid KEM, OPAQUE) at native speed.
```sh
# Build the FFI library first
cargo build --release -p quicproquo-ffi
```
```python
from quicproquo import QpqClient, ConnectOptions
client = QpqClient.connect_ffi(ConnectOptions(
addr="127.0.0.1:5001",
ca_cert_path="ca.pem",
))
client.ffi_login("alice", "password123")
client.ffi_send("bob", b"hello from Python!")
messages = client.ffi_receive(timeout_ms=5000)
for msg in messages:
print(msg)
client.close_sync()
```
## API Reference
### Connection
| Method | Transport | Description |
|---|---|---|
| `QpqClient.connect(opts)` | QUIC | Async connect to server |
| `QpqClient.connect_ffi(opts)` | FFI | Sync connect via Rust FFI |
| `client.close()` | QUIC | Async disconnect |
| `client.close_sync()` | Both | Sync disconnect |
### Authentication (QUIC)
| Method | Description |
|---|---|
| `client.register_start(username, request)` | Start OPAQUE registration |
| `client.register_finish(username, upload, identity_key)` | Complete registration |
| `client.login_start(username, request)` | Start OPAQUE login |
| `client.login_finish(username, finalization, identity_key)` | Complete login |
| `client.set_session_token(token)` | Set pre-existing session token |
### Authentication (FFI)
| Method | Description |
|---|---|
| `client.ffi_login(username, password)` | Full OPAQUE login (Rust handles crypto) |
### Messaging (QUIC)
| Method | Description |
|---|---|
| `client.health()` | Server health check |
| `client.resolve_user(username)` | Look up identity key |
| `client.resolve_identity(key)` | Reverse look up username |
| `client.create_channel(peer_key)` | Create 1:1 DM channel |
| `client.send(recipient_key, payload)` | Send a message |
| `client.receive(recipient_key)` | Fetch queued messages |
| `client.receive_wait(recipient_key, timeout_ms=5000)` | Long-poll for messages |
| `client.ack(recipient_key, seq_up_to)` | Acknowledge messages |
| `client.upload_key_package(key, package)` | Upload MLS key package |
| `client.fetch_key_package(key)` | Fetch MLS key package |
| `client.delete_account()` | Permanently delete account |
### Messaging (FFI)
| Method | Description |
|---|---|
| `client.ffi_send(recipient, message)` | Send message by username |
| `client.ffi_receive(timeout_ms=5000)` | Receive pending messages |
## Wire Format
The SDK implements the qpq v2 wire format:
```
[method_id:u16][req_id:u32][len:u32][protobuf payload]
```
Each RPC is sent over its own QUIC bidirectional stream.
## Structure
- `quicproquo/client.py` -- High-level client API
- `quicproquo/transport.py` -- QUIC transport (aioquic)
- `quicproquo/ffi.py` -- Rust FFI transport (CFFI)
- `quicproquo/proto.py` -- Protobuf encode/decode (no codegen)
- `quicproquo/wire.py` -- v2 wire format framing
- `quicproquo/types.py` -- Data types and exceptions
- `examples/bot.py` -- Async echo bot example
- `examples/ffi_demo.py` -- Synchronous FFI example
## Running Tests
```sh
pip install -e ".[dev]"
pytest
```

View File

@@ -0,0 +1,96 @@
#!/usr/bin/env python3
"""Example: async echo bot using the quicproquo Python SDK.
Connects to a qpq server, authenticates, and echoes back any received
messages with a "[bot] " prefix.
Usage:
python bot.py --server 127.0.0.1:5001 --ca-cert ca.pem
This example uses the QUIC transport with the v2 wire format.
OPAQUE authentication requires external crypto; this demo assumes
a session token is obtained externally and set via --token.
"""
from __future__ import annotations
import argparse
import asyncio
import signal
import sys
from quicproquo import QpqClient, ConnectOptions
async def run_bot(opts: ConnectOptions, token: bytes, identity_key: bytes) -> None:
client = await QpqClient.connect(opts)
client.set_session_token(token)
print(f"Connected to {opts.addr}")
health = await client.health()
print(f"Server status: {health.status} (v{health.version})")
# Poll loop.
running = True
def on_signal() -> None:
nonlocal running
running = False
print("\nShutting down...")
loop = asyncio.get_running_loop()
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(sig, on_signal)
while running:
try:
messages = await client.receive_wait(
identity_key, timeout_ms=5000
)
except Exception as exc:
print(f"receive error: {exc}")
await asyncio.sleep(1)
continue
for msg in messages:
text = msg.data.decode("utf-8", errors="replace")
print(f"[seq={msg.seq}] {text}")
# Echo back with prefix.
echo = f"[bot] {text}".encode("utf-8")
try:
seq, _ = await client.send(identity_key, echo)
print(f" -> echoed (seq={seq})")
except Exception as exc:
print(f" -> send error: {exc}")
await client.close()
print("Disconnected.")
def main() -> None:
parser = argparse.ArgumentParser(description="qpq echo bot")
parser.add_argument("--server", default="127.0.0.1:5001", help="server address")
parser.add_argument("--ca-cert", default="", help="CA certificate path")
parser.add_argument("--server-name", default="", help="TLS server name")
parser.add_argument("--token", required=True, help="session token (hex)")
parser.add_argument("--identity-key", required=True, help="identity key (hex)")
parser.add_argument("--insecure", action="store_true", help="skip TLS verification")
args = parser.parse_args()
opts = ConnectOptions(
addr=args.server,
ca_cert_path=args.ca_cert,
server_name=args.server_name,
insecure_skip_verify=args.insecure,
)
token = bytes.fromhex(args.token)
identity_key = bytes.fromhex(args.identity_key)
asyncio.run(run_bot(opts, token, identity_key))
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,60 @@
#!/usr/bin/env python3
"""Example: synchronous messaging using the Rust FFI backend.
Requires libquicproquo_ffi to be built:
cargo build --release -p quicproquo-ffi
Set QPQ_LIB_PATH if the library is not in the default search path.
Usage:
python ffi_demo.py --server 127.0.0.1:5001 \
--ca-cert ca.pem --user alice --pass secret
"""
from __future__ import annotations
import argparse
from quicproquo import QpqClient, ConnectOptions
def main() -> None:
parser = argparse.ArgumentParser(description="qpq FFI demo")
parser.add_argument("--server", default="127.0.0.1:5001")
parser.add_argument("--ca-cert", default="ca.pem")
parser.add_argument("--server-name", default="")
parser.add_argument("--user", required=True)
parser.add_argument("--pass", dest="password", required=True)
parser.add_argument("--recipient", required=True, help="recipient username")
parser.add_argument("--message", default="hello from Python SDK!")
args = parser.parse_args()
opts = ConnectOptions(
addr=args.server,
ca_cert_path=args.ca_cert,
server_name=args.server_name,
)
client = QpqClient.connect_ffi(opts)
try:
print(f"Connected to {args.server}")
client.ffi_login(args.user, args.password)
print(f"Logged in as {args.user}")
client.ffi_send(args.recipient, args.message.encode("utf-8"))
print(f"Sent message to {args.recipient}")
print("Waiting for messages (5s)...")
messages = client.ffi_receive(timeout_ms=5000)
for msg in messages:
print(f" received: {msg}")
finally:
client.close_sync()
print("Disconnected.")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,55 @@
[build-system]
requires = ["setuptools>=68.0", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "quicproquo"
version = "0.1.0"
description = "Python SDK for quicproquo E2E encrypted messenger"
readme = "README.md"
license = "MIT"
requires-python = ">=3.10"
authors = [{ name = "quicproquo contributors" }]
keywords = ["quicproquo", "e2e", "encrypted", "messaging", "quic"]
classifiers = [
"Development Status :: 3 - Alpha",
"Intended Audience :: Developers",
"License :: OSI Approved :: MIT License",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3.13",
"Topic :: Communications :: Chat",
"Topic :: Security :: Cryptography",
"Framework :: AsyncIO",
]
dependencies = [
"aioquic>=1.0.0",
"protobuf>=5.26.0",
"cffi>=1.16.0",
]
[project.optional-dependencies]
dev = [
"pytest>=8.0",
"pytest-asyncio>=0.23",
"mypy>=1.8",
"ruff>=0.3",
]
[project.urls]
Homepage = "https://github.com/nicholasgasior/quicproquo"
Repository = "https://github.com/nicholasgasior/quicproquo"
[tool.setuptools.packages.find]
include = ["quicproquo*"]
[tool.ruff]
target-version = "py310"
line-length = 99
[tool.mypy]
python_version = "3.10"
strict = true

View File

@@ -0,0 +1,38 @@
"""quicproquo -- Python SDK for the quicproquo E2E encrypted messenger.
Two transport backends are available:
1. **FFI** (``QpqClient.connect_ffi``): wraps the Rust ``libquicproquo_ffi``
shared library via CFFI. This gives you the full Rust crypto stack
(MLS, hybrid KEM, OPAQUE) at native speed.
2. **QUIC** (``QpqClient.connect``): pure-Python QUIC transport via *aioquic*
with protobuf (v2 wire format). No Rust dependency required; crypto
operations must be supplied externally.
"""
from quicproquo.client import QpqClient
from quicproquo.types import (
ConnectOptions,
Envelope,
ChannelResult,
HealthInfo,
QpqError,
AuthError,
TimeoutError as QpqTimeoutError,
ConnectionError as QpqConnectionError,
)
__version__ = "0.1.0"
__all__ = [
"QpqClient",
"ConnectOptions",
"Envelope",
"ChannelResult",
"HealthInfo",
"QpqError",
"AuthError",
"QpqTimeoutError",
"QpqConnectionError",
]

View File

@@ -0,0 +1,291 @@
"""High-level quicproquo client.
Provides both async (QUIC transport) and sync (FFI transport) APIs for
interacting with a quicproquo server.
"""
from __future__ import annotations
from typing import Optional
from quicproquo.types import (
ConnectOptions,
Envelope,
ChannelResult,
HealthInfo,
ConnectionError,
)
from quicproquo.transport import QuicTransport
from quicproquo.ffi import FfiTransport
from quicproquo import proto, wire
class QpqClient:
"""High-level quicproquo client.
Use ``QpqClient.connect()`` for the async QUIC transport, or
``QpqClient.connect_ffi()`` for the synchronous Rust FFI backend.
Example (async)::
client = await QpqClient.connect(ConnectOptions(addr="127.0.0.1:5001"))
health = await client.health()
token = await client.login_start("alice", opaque_request)
await client.close()
Example (FFI)::
client = QpqClient.connect_ffi(ConnectOptions(addr="127.0.0.1:5001"))
client.ffi_login("alice", "password123")
client.ffi_send("bob", b"hello")
client.close()
"""
def __init__(
self,
quic: Optional[QuicTransport] = None,
ffi: Optional[FfiTransport] = None,
) -> None:
self._quic = quic
self._ffi = ffi
self._session_token: bytes = b""
self._device_id: bytes = b""
# ------------------------------------------------------------------
# Constructors
# ------------------------------------------------------------------
@staticmethod
async def connect(opts: ConnectOptions) -> "QpqClient":
"""Connect to a server using the async QUIC transport."""
transport = await QuicTransport.connect(
opts.addr,
ca_cert_path=opts.ca_cert_path,
server_name=opts.server_name,
insecure_skip_verify=opts.insecure_skip_verify,
connect_timeout_ms=opts.connect_timeout_ms,
request_timeout_ms=opts.request_timeout_ms,
)
return QpqClient(quic=transport)
@staticmethod
def connect_ffi(opts: ConnectOptions) -> "QpqClient":
"""Connect using the synchronous Rust FFI backend."""
transport = FfiTransport.connect(
opts.addr,
ca_cert_path=opts.ca_cert_path,
server_name=opts.server_name,
)
return QpqClient(ffi=transport)
# ------------------------------------------------------------------
# Session management
# ------------------------------------------------------------------
def set_session_token(self, token: bytes) -> None:
"""Set an externally-obtained session token for authenticated RPCs."""
self._session_token = token
def set_device_id(self, device_id: bytes) -> None:
"""Set the device ID for multi-device scoping."""
self._device_id = device_id
# ------------------------------------------------------------------
# Async RPC methods (QUIC transport)
# ------------------------------------------------------------------
async def health(self) -> HealthInfo:
"""Check server health (async)."""
data = await self._rpc(wire.HEALTH, proto.encode_health())
info = proto.decode_health_response(data)
return HealthInfo(
status=str(info["status"]),
node_id=str(info["node_id"]),
version=str(info["version"]),
uptime_secs=int(info["uptime_secs"]),
storage_backend=str(info["storage_backend"]),
)
async def register_start(self, username: str, request: bytes) -> bytes:
"""Start OPAQUE registration. Returns server response bytes."""
payload = proto.encode_opaque_register_start(username, request)
data = await self._rpc(wire.OPAQUE_REGISTER_START, payload)
return proto.decode_opaque_register_start_response(data)
async def register_finish(
self, username: str, upload: bytes, identity_key: bytes
) -> bool:
"""Complete OPAQUE registration."""
payload = proto.encode_opaque_register_finish(username, upload, identity_key)
data = await self._rpc(wire.OPAQUE_REGISTER_FINISH, payload)
return proto.decode_opaque_register_finish_response(data)
async def login_start(self, username: str, request: bytes) -> bytes:
"""Start OPAQUE login. Returns server response bytes."""
payload = proto.encode_opaque_login_start(username, request)
data = await self._rpc(wire.OPAQUE_LOGIN_START, payload)
return proto.decode_opaque_login_start_response(data)
async def login_finish(
self, username: str, finalization: bytes, identity_key: bytes
) -> bytes:
"""Complete OPAQUE login. Returns and stores session token."""
payload = proto.encode_opaque_login_finish(username, finalization, identity_key)
data = await self._rpc(wire.OPAQUE_LOGIN_FINISH, payload)
token = proto.decode_opaque_login_finish_response(data)
self._session_token = token
return token
async def resolve_user(self, username: str) -> tuple[bytes, bytes]:
"""Resolve username to (identity_key, inclusion_proof)."""
payload = proto.encode_resolve_user(username)
data = await self._rpc(wire.RESOLVE_USER, payload)
return proto.decode_resolve_user_response(data)
async def resolve_identity(self, identity_key: bytes) -> str:
"""Resolve identity key to username."""
payload = proto.encode_resolve_identity(identity_key)
data = await self._rpc(wire.RESOLVE_IDENTITY, payload)
return proto.decode_resolve_identity_response(data)
async def create_channel(self, peer_key: bytes) -> ChannelResult:
"""Create a 1:1 DM channel with a peer."""
payload = proto.encode_create_channel(peer_key)
data = await self._rpc(wire.CREATE_CHANNEL, payload)
channel_id, was_new = proto.decode_create_channel_response(data)
return ChannelResult(channel_id=channel_id, was_new=was_new)
async def send(
self,
recipient_key: bytes,
payload: bytes,
*,
channel_id: bytes = b"",
ttl_secs: int = 0,
message_id: bytes = b"",
) -> tuple[int, bytes]:
"""Enqueue a message. Returns (seq, delivery_proof)."""
req = proto.encode_enqueue(recipient_key, payload, channel_id, ttl_secs, message_id)
data = await self._rpc(wire.ENQUEUE, req)
seq, proof, _ = proto.decode_enqueue_response(data)
return seq, proof
async def receive(
self,
recipient_key: bytes,
*,
channel_id: bytes = b"",
limit: int = 0,
device_id: bytes = b"",
) -> list[Envelope]:
"""Fetch queued messages."""
req = proto.encode_fetch(recipient_key, channel_id, limit, device_id)
data = await self._rpc(wire.FETCH, req)
return [Envelope(seq=s, data=d) for s, d in proto.decode_fetch_response(data)]
async def receive_wait(
self,
recipient_key: bytes,
*,
timeout_ms: int = 5000,
channel_id: bytes = b"",
limit: int = 0,
device_id: bytes = b"",
) -> list[Envelope]:
"""Long-poll for messages with a timeout."""
req = proto.encode_fetch_wait(recipient_key, channel_id, timeout_ms, limit, device_id)
data = await self._rpc(wire.FETCH_WAIT, req)
return [Envelope(seq=s, data=d) for s, d in proto.decode_fetch_wait_response(data)]
async def ack(
self,
recipient_key: bytes,
seq_up_to: int,
*,
channel_id: bytes = b"",
device_id: bytes = b"",
) -> None:
"""Acknowledge messages up to a sequence number."""
req = proto.encode_ack(recipient_key, seq_up_to, channel_id, device_id)
await self._rpc(wire.ACK, req)
async def upload_key_package(self, identity_key: bytes, package: bytes) -> bytes:
"""Upload an MLS key package. Returns fingerprint."""
req = proto.encode_upload_key_package(identity_key, package)
data = await self._rpc(wire.UPLOAD_KEY_PACKAGE, req)
return proto.decode_upload_key_package_response(data)
async def fetch_key_package(self, identity_key: bytes) -> bytes:
"""Fetch an MLS key package."""
req = proto.encode_fetch_key_package(identity_key)
data = await self._rpc(wire.FETCH_KEY_PACKAGE, req)
return proto.decode_fetch_key_package_response(data)
async def upload_hybrid_key(self, identity_key: bytes, hybrid_public_key: bytes) -> None:
"""Upload a hybrid (X25519 + ML-KEM-768) public key."""
req = proto.encode_upload_hybrid_key(identity_key, hybrid_public_key)
await self._rpc(wire.UPLOAD_HYBRID_KEY, req)
async def fetch_hybrid_key(self, identity_key: bytes) -> bytes:
"""Fetch a hybrid public key."""
req = proto.encode_fetch_hybrid_key(identity_key)
data = await self._rpc(wire.FETCH_HYBRID_KEY, req)
return proto.decode_fetch_hybrid_key_response(data)
async def delete_account(self) -> bool:
"""Permanently delete the authenticated account."""
data = await self._rpc(wire.DELETE_ACCOUNT, proto.encode_delete_account())
return proto.decode_delete_account_response(data)
# ------------------------------------------------------------------
# FFI (synchronous) methods
# ------------------------------------------------------------------
def ffi_login(self, username: str, password: str) -> None:
"""Authenticate via OPAQUE using the FFI backend (synchronous)."""
if not self._ffi:
raise ConnectionError("no FFI transport; use QpqClient.connect_ffi()")
self._ffi.login(username, password)
def ffi_send(self, recipient: str, message: bytes) -> None:
"""Send a message via the FFI backend (synchronous)."""
if not self._ffi:
raise ConnectionError("no FFI transport; use QpqClient.connect_ffi()")
self._ffi.send(recipient, message)
def ffi_receive(self, timeout_ms: int = 5000) -> list[str]:
"""Receive messages via the FFI backend (synchronous)."""
if not self._ffi:
raise ConnectionError("no FFI transport; use QpqClient.connect_ffi()")
return self._ffi.receive(timeout_ms)
# ------------------------------------------------------------------
# Lifecycle
# ------------------------------------------------------------------
async def close(self) -> None:
"""Close all transports."""
if self._quic:
self._quic.close()
self._quic = None
if self._ffi:
self._ffi.close()
self._ffi = None
def close_sync(self) -> None:
"""Close all transports (synchronous variant)."""
if self._quic:
self._quic.close()
self._quic = None
if self._ffi:
self._ffi.close()
self._ffi = None
# ------------------------------------------------------------------
# Internal
# ------------------------------------------------------------------
async def _rpc(self, method_id: int, payload: bytes) -> bytes:
if not self._quic:
raise ConnectionError("no QUIC transport; use QpqClient.connect()")
return await self._quic.rpc(method_id, payload)

View File

@@ -0,0 +1,192 @@
"""CFFI bindings to ``libquicproquo_ffi`` (the Rust C FFI layer).
This module loads the shared library and exposes a synchronous Python API
that mirrors the C functions in ``crates/quicproquo-ffi/src/lib.rs``.
"""
from __future__ import annotations
import json
import os
from pathlib import Path
from typing import Optional
import cffi
from quicproquo.types import (
QpqError,
AuthError,
TimeoutError,
ConnectionError,
)
# Status codes (must match crates/quicproquo-ffi/src/lib.rs).
QPQ_OK = 0
QPQ_ERROR = 1
QPQ_AUTH_FAILED = 2
QPQ_TIMEOUT = 3
QPQ_NOT_CONNECTED = 4
_CDEFS = """
typedef struct QpqHandle QpqHandle;
QpqHandle* qpq_connect(const char* server, const char* ca_cert, const char* server_name);
int qpq_login(QpqHandle* handle, const char* username, const char* password);
int qpq_send(QpqHandle* handle, const char* recipient, const uint8_t* message, size_t message_len);
int qpq_receive(QpqHandle* handle, uint32_t timeout_ms, char** out_json);
void qpq_disconnect(QpqHandle* handle);
const char* qpq_last_error(const QpqHandle* handle);
void qpq_free_string(char* ptr);
"""
_ffi = cffi.FFI()
_ffi.cdef(_CDEFS)
_lib: Optional[object] = None
def _load_lib() -> object:
"""Load the shared library, searching common paths."""
global _lib
if _lib is not None:
return _lib
search_paths = [
# Explicit environment variable.
os.environ.get("QPQ_LIB_PATH", ""),
# Common cargo build output locations.
str(Path(__file__).resolve().parents[3] / "target" / "release" / "libquicproquo_ffi.so"),
str(Path(__file__).resolve().parents[3] / "target" / "debug" / "libquicproquo_ffi.so"),
# macOS dylib.
str(
Path(__file__).resolve().parents[3]
/ "target"
/ "release"
/ "libquicproquo_ffi.dylib"
),
str(
Path(__file__).resolve().parents[3]
/ "target"
/ "debug"
/ "libquicproquo_ffi.dylib"
),
# System library path.
"libquicproquo_ffi.so",
]
for path in search_paths:
if not path:
continue
try:
_lib = _ffi.dlopen(path)
return _lib
except OSError:
continue
raise OSError(
"Could not find libquicproquo_ffi. Set QPQ_LIB_PATH or build with "
"`cargo build --release -p quicproquo-ffi`."
)
def _check_error(handle: object, code: int) -> None:
"""Raise an appropriate exception if the FFI call returned an error code."""
if code == QPQ_OK:
return
lib = _load_lib()
err_ptr = lib.qpq_last_error(handle) # type: ignore[union-attr]
msg = _ffi.string(err_ptr).decode("utf-8") if err_ptr != _ffi.NULL else "unknown error"
if code == QPQ_AUTH_FAILED:
raise AuthError(msg)
if code == QPQ_TIMEOUT:
raise TimeoutError(msg)
if code == QPQ_NOT_CONNECTED:
raise ConnectionError(msg)
raise QpqError(msg)
class FfiTransport:
"""Synchronous transport wrapping ``libquicproquo_ffi``.
Provides the same logical operations as ``QuicTransport`` but backed
by the Rust client library through C FFI.
Usage::
ffi = FfiTransport.connect("127.0.0.1:5001", ca_cert="/path/to/ca.pem")
ffi.login("alice", "password123")
ffi.send("bob", b"hello")
messages = ffi.receive(timeout_ms=5000)
ffi.close()
"""
def __init__(self, handle: object) -> None:
self._handle = handle
self._lib = _load_lib()
@staticmethod
def connect(
addr: str,
*,
ca_cert_path: str = "",
server_name: str = "",
) -> "FfiTransport":
"""Connect to a qpq server via the Rust FFI layer."""
lib = _load_lib()
server_c = _ffi.new("char[]", addr.encode("utf-8"))
ca_c = _ffi.new("char[]", ca_cert_path.encode("utf-8"))
if not server_name:
host = addr.split(":")[0]
server_name = host
sn_c = _ffi.new("char[]", server_name.encode("utf-8"))
handle = lib.qpq_connect(server_c, ca_c, sn_c) # type: ignore[union-attr]
if handle == _ffi.NULL:
raise ConnectionError(f"qpq_connect failed for {addr}")
return FfiTransport(handle)
def login(self, username: str, password: str) -> None:
"""Authenticate with OPAQUE credentials."""
u = _ffi.new("char[]", username.encode("utf-8"))
p = _ffi.new("char[]", password.encode("utf-8"))
code = self._lib.qpq_login(self._handle, u, p) # type: ignore[union-attr]
_check_error(self._handle, code)
def send(self, recipient: str, message: bytes) -> None:
"""Send a message to a recipient (by username)."""
r = _ffi.new("char[]", recipient.encode("utf-8"))
m = _ffi.new("uint8_t[]", message)
code = self._lib.qpq_send(self._handle, r, m, len(message)) # type: ignore[union-attr]
_check_error(self._handle, code)
def receive(self, timeout_ms: int = 5000) -> list[str]:
"""Receive pending messages, blocking up to *timeout_ms*.
Returns a list of message strings (UTF-8).
"""
out = _ffi.new("char**")
code = self._lib.qpq_receive(self._handle, timeout_ms, out) # type: ignore[union-attr]
_check_error(self._handle, code)
if out[0] == _ffi.NULL:
return []
json_str = _ffi.string(out[0]).decode("utf-8")
self._lib.qpq_free_string(out[0]) # type: ignore[union-attr]
return json.loads(json_str) # type: ignore[no-any-return]
def close(self) -> None:
"""Disconnect and free the handle."""
if self._handle is not None:
self._lib.qpq_disconnect(self._handle) # type: ignore[union-attr]
self._handle = None
def __enter__(self) -> "FfiTransport":
return self
def __exit__(self, *args: object) -> None:
self.close()

View File

@@ -0,0 +1,303 @@
"""Minimal protobuf encode/decode for qpq v1 messages.
Uses the ``protobuf`` library's descriptor-less encoding for simplicity.
Each message is represented as a plain dict and encoded/decoded via
google.protobuf helpers.
This avoids requiring protoc at build time while still producing
wire-compatible protobuf bytes.
"""
from __future__ import annotations
from google.protobuf import descriptor_pb2 as _ # noqa: F401 ensure protobuf is importable
from google.protobuf.internal.encoder import _VarintBytes # type: ignore[attr-defined]
from google.protobuf.internal.decoder import _DecodeVarint # type: ignore[attr-defined]
# ---------------------------------------------------------------------------
# Low-level protobuf helpers (wire types 0=varint, 2=length-delimited)
# ---------------------------------------------------------------------------
def _encode_varint_field(field_number: int, value: int) -> bytes:
"""Encode a varint (wire type 0) field."""
if value == 0:
return b""
tag = (field_number << 3) | 0
return _VarintBytes(tag) + _VarintBytes(value)
def _encode_bytes_field(field_number: int, value: bytes) -> bytes:
"""Encode a length-delimited (wire type 2) field."""
if not value:
return b""
tag = (field_number << 3) | 2
return _VarintBytes(tag) + _VarintBytes(len(value)) + value
def _encode_string_field(field_number: int, value: str) -> bytes:
"""Encode a string (wire type 2) field."""
return _encode_bytes_field(field_number, value.encode("utf-8"))
def _decode_fields(data: bytes) -> dict[int, list[tuple[int, bytes | int]]]:
"""Decode a protobuf message into {field_number: [(wire_type, value), ...]}."""
fields: dict[int, list[tuple[int, bytes | int]]] = {}
pos = 0
while pos < len(data):
tag, pos = _DecodeVarint(data, pos)
field_number = tag >> 3
wire_type = tag & 0x07
if wire_type == 0: # varint
value, pos = _DecodeVarint(data, pos)
fields.setdefault(field_number, []).append((wire_type, value))
elif wire_type == 2: # length-delimited
length, pos = _DecodeVarint(data, pos)
fields.setdefault(field_number, []).append((wire_type, data[pos : pos + length]))
pos += length
elif wire_type == 5: # 32-bit fixed
fields.setdefault(field_number, []).append((wire_type, data[pos : pos + 4]))
pos += 4
elif wire_type == 1: # 64-bit fixed
fields.setdefault(field_number, []).append((wire_type, data[pos : pos + 8]))
pos += 8
else:
raise ValueError(f"unsupported wire type {wire_type}")
return fields
def _get_bytes(fields: dict[int, list[tuple[int, bytes | int]]], fn: int) -> bytes:
entries = fields.get(fn, [])
if not entries:
return b""
_, val = entries[0]
return val if isinstance(val, bytes) else b""
def _get_string(fields: dict[int, list[tuple[int, bytes | int]]], fn: int) -> str:
return _get_bytes(fields, fn).decode("utf-8", errors="replace")
def _get_varint(fields: dict[int, list[tuple[int, bytes | int]]], fn: int) -> int:
entries = fields.get(fn, [])
if not entries:
return 0
_, val = entries[0]
return val if isinstance(val, int) else 0
def _get_bool(fields: dict[int, list[tuple[int, bytes | int]]], fn: int) -> bool:
return _get_varint(fields, fn) != 0
def _get_repeated_bytes(fields: dict[int, list[tuple[int, bytes | int]]], fn: int) -> list[bytes]:
result: list[bytes] = []
for _, val in fields.get(fn, []):
if isinstance(val, bytes):
result.append(val)
return result
# ---------------------------------------------------------------------------
# Auth
# ---------------------------------------------------------------------------
def encode_opaque_register_start(username: str, request: bytes) -> bytes:
return _encode_string_field(1, username) + _encode_bytes_field(2, request)
def decode_opaque_register_start_response(data: bytes) -> bytes:
return _get_bytes(_decode_fields(data), 1)
def encode_opaque_register_finish(username: str, upload: bytes, identity_key: bytes) -> bytes:
return (
_encode_string_field(1, username)
+ _encode_bytes_field(2, upload)
+ _encode_bytes_field(3, identity_key)
)
def decode_opaque_register_finish_response(data: bytes) -> bool:
return _get_bool(_decode_fields(data), 1)
def encode_opaque_login_start(username: str, request: bytes) -> bytes:
return _encode_string_field(1, username) + _encode_bytes_field(2, request)
def decode_opaque_login_start_response(data: bytes) -> bytes:
return _get_bytes(_decode_fields(data), 1)
def encode_opaque_login_finish(username: str, finalization: bytes, identity_key: bytes) -> bytes:
return (
_encode_string_field(1, username)
+ _encode_bytes_field(2, finalization)
+ _encode_bytes_field(3, identity_key)
)
def decode_opaque_login_finish_response(data: bytes) -> bytes:
"""Returns session_token."""
return _get_bytes(_decode_fields(data), 1)
# ---------------------------------------------------------------------------
# Delivery
# ---------------------------------------------------------------------------
def encode_enqueue(
recipient_key: bytes,
payload: bytes,
channel_id: bytes = b"",
ttl_secs: int = 0,
message_id: bytes = b"",
) -> bytes:
return (
_encode_bytes_field(1, recipient_key)
+ _encode_bytes_field(2, payload)
+ _encode_bytes_field(3, channel_id)
+ _encode_varint_field(4, ttl_secs)
+ _encode_bytes_field(5, message_id)
)
def decode_enqueue_response(data: bytes) -> tuple[int, bytes, bool]:
"""Returns (seq, delivery_proof, duplicate)."""
fields = _decode_fields(data)
return _get_varint(fields, 1), _get_bytes(fields, 2), _get_bool(fields, 3)
def encode_fetch(
recipient_key: bytes,
channel_id: bytes = b"",
limit: int = 0,
device_id: bytes = b"",
) -> bytes:
return (
_encode_bytes_field(1, recipient_key)
+ _encode_bytes_field(2, channel_id)
+ _encode_varint_field(3, limit)
+ _encode_bytes_field(4, device_id)
)
def decode_fetch_response(data: bytes) -> list[tuple[int, bytes]]:
"""Returns list of (seq, data) envelopes."""
fields = _decode_fields(data)
envelopes: list[tuple[int, bytes]] = []
for _, val in fields.get(1, []):
if isinstance(val, bytes):
env_fields = _decode_fields(val)
envelopes.append((_get_varint(env_fields, 1), _get_bytes(env_fields, 2)))
return envelopes
def encode_fetch_wait(
recipient_key: bytes,
channel_id: bytes = b"",
timeout_ms: int = 5000,
limit: int = 0,
device_id: bytes = b"",
) -> bytes:
return (
_encode_bytes_field(1, recipient_key)
+ _encode_bytes_field(2, channel_id)
+ _encode_varint_field(3, timeout_ms)
+ _encode_varint_field(4, limit)
+ _encode_bytes_field(5, device_id)
)
# decode_fetch_wait_response = decode_fetch_response (same message shape)
decode_fetch_wait_response = decode_fetch_response
def encode_ack(
recipient_key: bytes,
seq_up_to: int,
channel_id: bytes = b"",
device_id: bytes = b"",
) -> bytes:
return (
_encode_bytes_field(1, recipient_key)
+ _encode_bytes_field(2, channel_id)
+ _encode_varint_field(3, seq_up_to)
+ _encode_bytes_field(4, device_id)
)
# ---------------------------------------------------------------------------
# Channel
# ---------------------------------------------------------------------------
def encode_create_channel(peer_key: bytes) -> bytes:
return _encode_bytes_field(1, peer_key)
def decode_create_channel_response(data: bytes) -> tuple[bytes, bool]:
"""Returns (channel_id, was_new)."""
fields = _decode_fields(data)
return _get_bytes(fields, 1), _get_bool(fields, 2)
# ---------------------------------------------------------------------------
# User
# ---------------------------------------------------------------------------
def encode_resolve_user(username: str) -> bytes:
return _encode_string_field(1, username)
def decode_resolve_user_response(data: bytes) -> tuple[bytes, bytes]:
"""Returns (identity_key, inclusion_proof)."""
fields = _decode_fields(data)
return _get_bytes(fields, 1), _get_bytes(fields, 2)
def encode_resolve_identity(identity_key: bytes) -> bytes:
return _encode_bytes_field(1, identity_key)
def decode_resolve_identity_response(data: bytes) -> str:
"""Returns username."""
return _get_string(_decode_fields(data), 1)
# ---------------------------------------------------------------------------
# Keys
# ---------------------------------------------------------------------------
def encode_upload_key_package(identity_key: bytes, package: bytes) -> bytes:
return _encode_bytes_field(1, identity_key) + _encode_bytes_field(2, package)
def decode_upload_key_package_response(data: bytes) -> bytes:
return _get_bytes(_decode_fields(data), 1)
def encode_fetch_key_package(identity_key: bytes) -> bytes:
return _encode_bytes_field(1, identity_key)
def decode_fetch_key_package_response(data: bytes) -> bytes:
return _get_bytes(_decode_fields(data), 1)
def encode_upload_hybrid_key(identity_key: bytes, hybrid_public_key: bytes) -> bytes:
return _encode_bytes_field(1, identity_key) + _encode_bytes_field(2, hybrid_public_key)
def encode_fetch_hybrid_key(identity_key: bytes) -> bytes:
return _encode_bytes_field(1, identity_key)
def decode_fetch_hybrid_key_response(data: bytes) -> bytes:
return _get_bytes(_decode_fields(data), 1)
# ---------------------------------------------------------------------------
# Health
# ---------------------------------------------------------------------------
def encode_health() -> bytes:
return b""
def decode_health_response(data: bytes) -> dict[str, str | int]:
fields = _decode_fields(data)
return {
"status": _get_string(fields, 1),
"node_id": _get_string(fields, 2),
"version": _get_string(fields, 3),
"uptime_secs": _get_varint(fields, 4),
"storage_backend": _get_string(fields, 5),
}
# ---------------------------------------------------------------------------
# Delete Account
# ---------------------------------------------------------------------------
def encode_delete_account() -> bytes:
return b""
def decode_delete_account_response(data: bytes) -> bool:
return _get_bool(_decode_fields(data), 1)

View File

@@ -0,0 +1,181 @@
"""QUIC transport using aioquic for the v2 wire format.
Opens a QUIC connection to the qpq server and provides ``rpc()`` to send
protobuf-encoded requests over individual QUIC streams, reading back the
framed response on the same stream.
aioquic is imported lazily so that the module can be loaded even when
aioquic is not installed (e.g. for tests that only exercise wire/proto).
"""
from __future__ import annotations
import asyncio
import ssl
from typing import Any
from quicproquo.types import ConnectionError, TimeoutError
from quicproquo.wire import HEADER_SIZE, encode_frame, decode_header
def _make_protocol_class() -> type:
"""Build the protocol class at call time so aioquic is imported lazily."""
from aioquic.asyncio.protocol import QuicConnectionProtocol
from aioquic.quic.events import StreamDataReceived, QuicEvent
class _QpqQuicProtocol(QuicConnectionProtocol):
"""QUIC protocol handler that dispatches stream data to waiting futures."""
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self._stream_buffers: dict[int, bytearray] = {}
self._stream_waiters: dict[int, asyncio.Future[bytes]] = {}
def quic_event_received(self, event: QuicEvent) -> None:
if isinstance(event, StreamDataReceived):
sid = event.stream_id
buf = self._stream_buffers.setdefault(sid, bytearray())
buf.extend(event.data)
if len(buf) >= HEADER_SIZE:
_, _, payload_len = decode_header(bytes(buf[:HEADER_SIZE]))
total = HEADER_SIZE + payload_len
if len(buf) >= total:
frame = bytes(buf[:total])
del buf[:total]
waiter = self._stream_waiters.pop(sid, None)
if waiter and not waiter.done():
waiter.set_result(frame)
def wait_for_stream(self, stream_id: int) -> asyncio.Future[bytes]:
loop = asyncio.get_event_loop()
fut: asyncio.Future[bytes] = loop.create_future()
self._stream_waiters[stream_id] = fut
buf = self._stream_buffers.get(stream_id, bytearray())
if len(buf) >= HEADER_SIZE:
_, _, payload_len = decode_header(bytes(buf[:HEADER_SIZE]))
total = HEADER_SIZE + payload_len
if len(buf) >= total:
frame = bytes(buf[:total])
del buf[:total]
if not fut.done():
fut.set_result(frame)
return fut
return _QpqQuicProtocol
class QuicTransport:
"""Async QUIC transport for the qpq v2 wire format.
Usage::
transport = await QuicTransport.connect("127.0.0.1:5001")
response_bytes = await transport.rpc(method_id, request_payload)
transport.close()
"""
def __init__(
self,
protocol: Any,
connection: Any,
request_timeout_ms: int,
) -> None:
self._protocol = protocol
self._connection = connection
self._req_id = 0
self._request_timeout = request_timeout_ms / 1000.0
self._closed = False
@staticmethod
async def connect(
addr: str,
*,
ca_cert_path: str = "",
server_name: str = "",
insecure_skip_verify: bool = False,
connect_timeout_ms: int = 5_000,
request_timeout_ms: int = 10_000,
) -> "QuicTransport":
"""Open a QUIC connection to the server."""
from aioquic.asyncio import connect as quic_connect
from aioquic.quic.configuration import QuicConfiguration
host, _, port_str = addr.rpartition(":")
if not host:
host = addr
port_str = "5001"
port = int(port_str)
configuration = QuicConfiguration(
is_client=True,
alpn_protocols=["qpq"],
)
if insecure_skip_verify:
configuration.verify_mode = ssl.CERT_NONE
elif ca_cert_path:
configuration.load_verify_locations(ca_cert_path)
if not server_name:
server_name = host
proto_cls = _make_protocol_class()
try:
async with asyncio.timeout(connect_timeout_ms / 1000.0):
connection = await quic_connect(
host,
port,
configuration=configuration,
create_protocol=proto_cls,
server_name=server_name,
)
except (OSError, asyncio.TimeoutError) as exc:
raise ConnectionError(f"failed to connect to {addr}: {exc}") from exc
protocol = connection._protocol # type: ignore[attr-defined]
return QuicTransport(protocol, connection, request_timeout_ms)
async def rpc(self, method_id: int, payload: bytes) -> bytes:
"""Send an RPC request and return the response payload (protobuf bytes).
Opens a new QUIC stream for each request.
"""
if self._closed:
raise ConnectionError("transport is closed")
self._req_id += 1
req_id = self._req_id
frame = encode_frame(method_id, req_id, payload)
stream_id = self._protocol._quic.get_next_available_stream_id()
waiter = self._protocol.wait_for_stream(stream_id)
self._protocol._quic.send_stream_data(stream_id, frame, end_stream=True)
self._protocol.transmit()
try:
async with asyncio.timeout(self._request_timeout):
response_frame = await waiter
except asyncio.TimeoutError as exc:
raise TimeoutError(
f"RPC timeout for method {method_id} (req_id={req_id})"
) from exc
_, _, resp_len = decode_header(response_frame)
return response_frame[HEADER_SIZE : HEADER_SIZE + resp_len]
@property
def closed(self) -> bool:
return self._closed
def close(self) -> None:
"""Close the QUIC connection."""
if not self._closed:
self._closed = True
self._protocol._quic.close()
self._protocol.transmit()

View File

@@ -0,0 +1,92 @@
"""Data types and exceptions for the quicproquo Python SDK."""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Optional
# ---------------------------------------------------------------------------
# Exceptions
# ---------------------------------------------------------------------------
class QpqError(Exception):
"""Base exception for quicproquo SDK errors."""
class AuthError(QpqError):
"""OPAQUE authentication failed (bad credentials)."""
class TimeoutError(QpqError):
"""An operation timed out waiting for a response."""
class ConnectionError(QpqError):
"""Could not connect to or communicate with the server."""
# ---------------------------------------------------------------------------
# Connection options
# ---------------------------------------------------------------------------
@dataclass(frozen=True)
class ConnectOptions:
"""Options for connecting to a quicproquo server.
Parameters
----------
addr:
Server address as ``host:port``.
ca_cert_path:
Path to PEM-encoded CA certificate. Required for production.
server_name:
TLS SNI server name (defaults to the host part of *addr*).
insecure_skip_verify:
Disable TLS certificate verification (development only).
connect_timeout_ms:
Connection timeout in milliseconds (default 5000).
request_timeout_ms:
Per-request timeout in milliseconds (default 10000).
"""
addr: str
ca_cert_path: str = ""
server_name: str = ""
insecure_skip_verify: bool = False
connect_timeout_ms: int = 5_000
request_timeout_ms: int = 10_000
# ---------------------------------------------------------------------------
# Response types
# ---------------------------------------------------------------------------
@dataclass(frozen=True)
class Envelope:
"""A received message envelope."""
seq: int
data: bytes
@dataclass(frozen=True)
class ChannelResult:
"""Result of creating or joining a channel."""
channel_id: bytes
was_new: bool
@dataclass(frozen=True)
class HealthInfo:
"""Server health check response."""
status: str
node_id: str = ""
version: str = ""
uptime_secs: int = 0
storage_backend: str = ""

View File

@@ -0,0 +1,73 @@
"""v2 wire format: ``[method_id:u16][req_id:u32][len:u32][protobuf]``.
Each RPC is sent over its own QUIC stream. The response uses the same
framing on the same stream.
"""
from __future__ import annotations
import struct
# Header: method_id (u16) + req_id (u32) + length (u32) = 10 bytes.
HEADER_FMT = "!HII" # network byte-order: u16 + u32 + u32
HEADER_SIZE = struct.calcsize(HEADER_FMT)
# Method IDs (mirrors quicproquo-proto/src/lib.rs::method_ids).
# Auth (100-103)
OPAQUE_REGISTER_START = 100
OPAQUE_REGISTER_FINISH = 101
OPAQUE_LOGIN_START = 102
OPAQUE_LOGIN_FINISH = 103
# Delivery (200-205)
ENQUEUE = 200
FETCH = 201
FETCH_WAIT = 202
PEEK = 203
ACK = 204
BATCH_ENQUEUE = 205
# Keys (300-304)
UPLOAD_KEY_PACKAGE = 300
FETCH_KEY_PACKAGE = 301
UPLOAD_HYBRID_KEY = 302
FETCH_HYBRID_KEY = 303
FETCH_HYBRID_KEYS = 304
# Channel (400)
CREATE_CHANNEL = 400
# User (500-501)
RESOLVE_USER = 500
RESOLVE_IDENTITY = 501
# Blob (600-601)
UPLOAD_BLOB = 600
DOWNLOAD_BLOB = 601
# Device (700-702)
REGISTER_DEVICE = 700
LIST_DEVICES = 701
REVOKE_DEVICE = 702
# P2P (800-802)
PUBLISH_ENDPOINT = 800
RESOLVE_ENDPOINT = 801
HEALTH = 802
# Delete account (950)
DELETE_ACCOUNT = 950
def encode_frame(method_id: int, req_id: int, payload: bytes) -> bytes:
"""Encode a wire frame: header + protobuf payload."""
header = struct.pack(HEADER_FMT, method_id, req_id, len(payload))
return header + payload
def decode_header(data: bytes) -> tuple[int, int, int]:
"""Decode a wire frame header, returning (method_id, req_id, payload_len)."""
if len(data) < HEADER_SIZE:
raise ValueError(f"header too short: {len(data)} < {HEADER_SIZE}")
method_id, req_id, length = struct.unpack(HEADER_FMT, data[:HEADER_SIZE])
return method_id, req_id, length

View File

View File

@@ -0,0 +1,89 @@
"""Tests for the manual protobuf encode/decode layer."""
from quicproquo.proto import (
encode_health,
decode_health_response,
encode_resolve_user,
decode_resolve_user_response,
encode_enqueue,
decode_enqueue_response,
encode_create_channel,
decode_create_channel_response,
encode_opaque_login_start,
decode_opaque_login_start_response,
encode_opaque_login_finish,
decode_opaque_login_finish_response,
_encode_string_field,
_encode_bytes_field,
_encode_varint_field,
)
def test_health_empty():
assert encode_health() == b""
def test_health_response_decode():
# Manually build a HealthResponse: status="ok" (field 1), version="1.0" (field 3)
data = _encode_string_field(1, "ok") + _encode_string_field(3, "1.0")
info = decode_health_response(data)
assert info["status"] == "ok"
assert info["version"] == "1.0"
assert info["uptime_secs"] == 0
def test_resolve_user_roundtrip():
encoded = encode_resolve_user("alice")
# The encoded form is a string field (field 1).
assert b"alice" in encoded
# Build a fake response: identity_key = b"\x01\x02\x03"
resp = _encode_bytes_field(1, b"\x01\x02\x03")
key, proof = decode_resolve_user_response(resp)
assert key == b"\x01\x02\x03"
assert proof == b""
def test_enqueue_roundtrip():
encoded = encode_enqueue(
recipient_key=b"\xaa\xbb",
payload=b"hello",
ttl_secs=60,
)
assert b"hello" in encoded
# Build a fake response: seq=42, delivery_proof=b"\xff"
resp = _encode_varint_field(1, 42) + _encode_bytes_field(2, b"\xff")
seq, proof, dup = decode_enqueue_response(resp)
assert seq == 42
assert proof == b"\xff"
assert dup is False
def test_create_channel_roundtrip():
encoded = encode_create_channel(b"\xde\xad")
assert b"\xde\xad" in encoded
# Build response: channel_id=b"\xca\xfe", was_new=true
resp = _encode_bytes_field(1, b"\xca\xfe") + _encode_varint_field(2, 1)
ch_id, was_new = decode_create_channel_response(resp)
assert ch_id == b"\xca\xfe"
assert was_new is True
def test_opaque_login_flow():
# Encode login start
start_req = encode_opaque_login_start("bob", b"\x01\x02")
assert b"bob" in start_req
# Decode login start response
start_resp = _encode_bytes_field(1, b"\xaa\xbb")
assert decode_opaque_login_start_response(start_resp) == b"\xaa\xbb"
# Encode login finish
finish_req = encode_opaque_login_finish("bob", b"\x03\x04", b"\x05\x06")
assert b"bob" in finish_req
# Decode login finish response (session_token)
finish_resp = _encode_bytes_field(1, b"\xde\xad\xbe\xef")
assert decode_opaque_login_finish_response(finish_resp) == b"\xde\xad\xbe\xef"

View File

@@ -0,0 +1,50 @@
"""Tests for SDK data types and exceptions."""
from quicproquo.types import (
ConnectOptions,
Envelope,
ChannelResult,
HealthInfo,
QpqError,
AuthError,
TimeoutError,
ConnectionError,
)
def test_connect_options_defaults():
opts = ConnectOptions(addr="127.0.0.1:5001")
assert opts.addr == "127.0.0.1:5001"
assert opts.ca_cert_path == ""
assert opts.insecure_skip_verify is False
assert opts.connect_timeout_ms == 5_000
assert opts.request_timeout_ms == 10_000
def test_envelope_immutable():
env = Envelope(seq=1, data=b"hello")
assert env.seq == 1
assert env.data == b"hello"
def test_channel_result():
cr = ChannelResult(channel_id=b"\x01", was_new=True)
assert cr.was_new is True
def test_health_info():
h = HealthInfo(status="ok", version="0.1.0", uptime_secs=42)
assert h.status == "ok"
assert h.uptime_secs == 42
def test_exception_hierarchy():
assert issubclass(AuthError, QpqError)
assert issubclass(TimeoutError, QpqError)
assert issubclass(ConnectionError, QpqError)
# Can catch all qpq errors with base class.
try:
raise AuthError("bad password")
except QpqError as e:
assert "bad password" in str(e)

View File

@@ -0,0 +1,41 @@
"""Tests for the v2 wire format encoder/decoder."""
from quicproquo.wire import (
HEADER_SIZE,
encode_frame,
decode_header,
HEALTH,
ENQUEUE,
)
def test_header_size():
assert HEADER_SIZE == 10
def test_encode_decode_roundtrip():
payload = b"\x0a\x05hello"
frame = encode_frame(HEALTH, 42, payload)
assert len(frame) == HEADER_SIZE + len(payload)
method_id, req_id, length = decode_header(frame)
assert method_id == HEALTH
assert req_id == 42
assert length == len(payload)
assert frame[HEADER_SIZE:] == payload
def test_empty_payload():
frame = encode_frame(ENQUEUE, 1, b"")
method_id, req_id, length = decode_header(frame)
assert method_id == ENQUEUE
assert req_id == 1
assert length == 0
def test_decode_header_too_short():
import pytest
with pytest.raises(ValueError, match="header too short"):
decode_header(b"\x00\x01")