Files
quicproquo/sdks/python/examples/bot.py
Christian Nennemann 49e8e066d7 feat(sdk): add Python SDK with QUIC and FFI transport backends
Implements quicproquo-py with two transport backends:
- Async QUIC transport via aioquic with v2 protobuf wire format
- Synchronous Rust FFI transport via CFFI wrapping libquicproquo_ffi

Includes manual protobuf encode/decode (no codegen), full RPC coverage
(auth, delivery, channels, users, keys, health), PyPI-ready packaging,
async echo bot and FFI demo examples, and 15 passing unit tests.
2026-03-04 20:52:02 +01:00

97 lines
2.9 KiB
Python

#!/usr/bin/env python3
"""Example: async echo bot using the quicproquo Python SDK.
Connects to a qpq server, authenticates, and echoes back any received
messages with a "[bot] " prefix.
Usage:
    python bot.py --server 127.0.0.1:5001 --ca-cert ca.pem --token <hex> --identity-key <hex>
This example uses the QUIC transport with the v2 wire format.
OPAQUE authentication requires external crypto; this demo assumes
a session token is obtained externally and set via --token.
"""
from __future__ import annotations
import argparse
import asyncio
import signal
import sys
from quicproquo import QpqClient, ConnectOptions
async def run_bot(opts: ConnectOptions, token: bytes, identity_key: bytes) -> None:
    """Connect to the server and echo every received message back.

    After connecting, the session token is installed on the client, the
    server health is printed, and the bot polls ``receive_wait`` in a loop.
    Each message is decoded as UTF-8 (invalid bytes are replaced), printed,
    and sent back prefixed with ``"[bot] "``.  The loop ends once SIGINT or
    SIGTERM has been received; the client is always closed on the way out,
    including when an error or cancellation interrupts the bot.

    Args:
        opts: Connection options passed to ``QpqClient.connect``.
        token: Session token installed via ``set_session_token``.
        identity_key: Key passed to both ``receive_wait`` and ``send``.
    """
    client = await QpqClient.connect(opts)
    loop = asyncio.get_running_loop()
    installed_signals = []
    running = True

    def on_signal() -> None:
        nonlocal running
        running = False
        print("\nShutting down...")

    try:
        client.set_session_token(token)
        print(f"Connected to {opts.addr}")

        health = await client.health()
        print(f"Server status: {health.status} (v{health.version})")

        for sig in (signal.SIGINT, signal.SIGTERM):
            try:
                loop.add_signal_handler(sig, on_signal)
            except NotImplementedError:
                # Event loops without Unix signal support (e.g. on Windows)
                # cannot install handlers.  Ctrl+C then surfaces as
                # KeyboardInterrupt and the ``finally`` below still closes
                # the client.
                continue
            installed_signals.append(sig)

        # Poll loop.  A shutdown request is noticed after the current
        # ``receive_wait`` call returns (it is given a 5000 ms timeout).
        while running:
            try:
                messages = await client.receive_wait(
                    identity_key, timeout_ms=5000
                )
            except Exception as exc:
                # Keep the bot alive on receive failures; back off briefly
                # so a persistent error does not become a busy loop.
                print(f"receive error: {exc}")
                await asyncio.sleep(1)
                continue

            for msg in messages:
                text = msg.data.decode("utf-8", errors="replace")
                print(f"[seq={msg.seq}] {text}")

                # Echo back with prefix.
                # NOTE(review): the echo is addressed to the same
                # identity_key that is polled above -- confirm the bot does
                # not re-receive (and re-echo) its own messages.
                echo = f"[bot] {text}".encode("utf-8")
                try:
                    seq, _ = await client.send(identity_key, echo)
                    print(f"  -> echoed (seq={seq})")
                except Exception as exc:
                    print(f"  -> send error: {exc}")
    finally:
        # Undo the handlers installed above, then release the connection
        # regardless of how the bot stopped.
        for sig in installed_signals:
            loop.remove_signal_handler(sig)
        await client.close()
        print("Disconnected.")
def _hex_bytes(value: str) -> bytes:
    """Decode a hex command-line value, reporting bad input as an argparse error."""
    try:
        return bytes.fromhex(value)
    except ValueError as exc:
        raise argparse.ArgumentTypeError(f"invalid hex string: {exc}") from exc


def main() -> None:
    """Parse command-line arguments and run the echo bot.

    ``--token`` and ``--identity-key`` are required and must be hex strings.
    They are decoded during argument parsing, so malformed hex produces a
    normal usage error (exit status 2) instead of a traceback.
    """
    parser = argparse.ArgumentParser(description="qpq echo bot")
    parser.add_argument("--server", default="127.0.0.1:5001", help="server address")
    parser.add_argument("--ca-cert", default="", help="CA certificate path")
    parser.add_argument("--server-name", default="", help="TLS server name")
    parser.add_argument(
        "--token", required=True, type=_hex_bytes, help="session token (hex)"
    )
    parser.add_argument(
        "--identity-key", required=True, type=_hex_bytes, help="identity key (hex)"
    )
    parser.add_argument("--insecure", action="store_true", help="skip TLS verification")
    args = parser.parse_args()

    opts = ConnectOptions(
        addr=args.server,
        ca_cert_path=args.ca_cert,
        server_name=args.server_name,
        insecure_skip_verify=args.insecure,
    )

    # args.token / args.identity_key are already bytes (decoded by _hex_bytes).
    asyncio.run(run_bot(opts, args.token, args.identity_key))
# Run the bot only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()