feat: add client auto-reconnect, heartbeat, and connection status UI

RPC layer (quicprochat-rpc):
- RpcClient now uses tokio::sync::Mutex<Connection> for safe reconnection
- Auto-reconnect with exponential backoff + jitter on retriable errors
- QUIC-level keepalive via quinn TransportConfig
- subscribe_push() is now async; its channel yields Option<PushFrame>, with a None sentinel sent when the push stream breaks
- RpcError::is_retriable() classifies transient vs permanent errors
- ConnectionState enum (Connected/Reconnecting/Disconnected) with Display
- Configurable max_retries, base_delay, max_backoff, keepalive_secs

SDK layer (quicprochat-sdk):
- QpqClient wraps RpcClient in Arc for safe heartbeat task sharing
- start_heartbeat() spawns background task checking connection every 30s
- connection_state() exposes RPC-layer state to UI
- Reconnecting event added to ClientEvent enum
- disconnect() aborts heartbeat before closing connection

Client UI (quicprochat-client):
- TUI status bar shows Connected/Reconnecting.../Offline with color
- TUI handles Reconnecting event with attempt count display
- REPL event listener prints connection state changes
- REPL /status shows connection state instead of bool
- Both TUI and REPL call start_heartbeat() on startup
This commit is contained in:
2026-03-08 18:00:47 +01:00
parent 66eca065e0
commit e4c5868b31
8 changed files with 526 additions and 99 deletions

View File

@@ -1,18 +1,32 @@
//! QUIC RPC client — connect to server, send requests, receive push events.
//!
//! Supports auto-reconnect with exponential backoff, keepalive pings, and
//! push subscription recovery.
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::time::Duration;
use bytes::{Bytes, BytesMut};
use quinn::{Connection, Endpoint};
use tokio::sync::mpsc;
use tracing::{debug, warn};
use tokio::sync::{mpsc, Mutex};
use tracing::{debug, info, warn};
use crate::auth_handshake;
use crate::error::{RpcError, RpcStatus};
use crate::framing::{PushFrame, RequestFrame, ResponseFrame};
/// Default number of reconnect attempts made before giving up.
///
/// Applied when `RpcClientConfig::max_retries` is left at 0.
pub const DEFAULT_MAX_RETRIES: u32 = 3;
/// Default base delay for exponential backoff (milliseconds).
///
/// Applied when `RpcClientConfig::base_delay_ms` is left at 0.
pub const DEFAULT_BASE_DELAY_MS: u64 = 500;
/// Default maximum backoff cap (milliseconds).
///
/// Applied when `RpcClientConfig::max_backoff_ms` is left at 0.
pub const DEFAULT_MAX_BACKOFF_MS: u64 = 30_000;
/// Default keepalive interval (seconds).
///
/// Applied when `RpcClientConfig::keepalive_secs` is left at 0.
pub const DEFAULT_KEEPALIVE_SECS: u64 = 30;
/// Configuration for the RPC client.
#[derive(Clone)]
pub struct RpcClientConfig {
/// Server address to connect to.
pub server_addr: std::net::SocketAddr,
@@ -24,19 +38,84 @@ pub struct RpcClientConfig {
pub alpn: Vec<u8>,
/// Session token to send during auth handshake.
pub session_token: Option<Vec<u8>>,
/// Max retries on connection failure (default 3).
pub max_retries: u32,
/// Base delay for backoff in milliseconds (default 500).
pub base_delay_ms: u64,
/// Maximum backoff cap in milliseconds (default 30000).
pub max_backoff_ms: u64,
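/// Keepalive interval in seconds (default 30). A value of 0 is treated as
/// unset and replaced by the default, so keepalive cannot currently be
/// disabled through this field.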
pub keepalive_secs: u64,
}
/// Default-filling helpers for [`RpcClientConfig`].
impl RpcClientConfig {
    /// Return the config with every zero-valued tuning field replaced by its
    /// `DEFAULT_*` constant; non-zero values are kept as given.
    ///
    /// This applies to `max_retries`, `base_delay_ms`, `max_backoff_ms` and
    /// `keepalive_secs` alike, so a `keepalive_secs` of 0 becomes
    /// `DEFAULT_KEEPALIVE_SECS`. All other fields pass through untouched.
    fn with_defaults(self) -> Self {
        // Zero is the "unset" marker for the u64 tuning knobs.
        let or_default = |value: u64, fallback: u64| if value == 0 { fallback } else { value };

        Self {
            max_retries: match self.max_retries {
                0 => DEFAULT_MAX_RETRIES,
                configured => configured,
            },
            base_delay_ms: or_default(self.base_delay_ms, DEFAULT_BASE_DELAY_MS),
            max_backoff_ms: or_default(self.max_backoff_ms, DEFAULT_MAX_BACKOFF_MS),
            keepalive_secs: or_default(self.keepalive_secs, DEFAULT_KEEPALIVE_SECS),
            ..self
        }
    }
}
/// Lifecycle state of the RPC client's connection, as reported to callers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionState {
    /// The connection is up and usable for requests and pushes.
    Connected,
    /// The connection was lost and reconnect attempt number `attempt` is
    /// in progress.
    Reconnecting { attempt: u32 },
    /// Not connected: either closed on purpose or every retry failed.
    Disconnected,
}

/// Human-readable rendering of the state, suitable for status displays.
impl std::fmt::Display for ConnectionState {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match *self {
            ConnectionState::Connected => f.write_str("Connected"),
            ConnectionState::Disconnected => f.write_str("Disconnected"),
            ConnectionState::Reconnecting { attempt } => {
                write!(f, "Reconnecting (attempt {})", attempt)
            }
        }
    }
}
/// A QUIC RPC client connection with auto-reconnect support.
pub struct RpcClient {
connection: Connection,
connection: Mutex<Connection>,
endpoint: Endpoint,
config: RpcClientConfig,
next_request_id: AtomicU32,
state: std::sync::Mutex<ConnectionState>,
}
impl RpcClient {
/// Open a client connection to the RPC server described by `config`.
///
/// Zero-valued tuning fields in `config` are replaced by their defaults
/// before dialing. On success the client starts in the `Connected` state
/// with request IDs beginning at 1.
///
/// # Errors
/// Propagates any error from establishing the connection.
pub async fn connect(config: RpcClientConfig) -> Result<Self, RpcError> {
    let config = config.with_defaults();
    Self::establish(&config)
        .await
        .map(|(endpoint, connection)| Self {
            connection: Mutex::new(connection),
            endpoint,
            config,
            next_request_id: AtomicU32::new(1),
            state: std::sync::Mutex::new(ConnectionState::Connected),
        })
}
/// Establish a new QUIC connection + optional auth handshake.
async fn establish(config: &RpcClientConfig) -> Result<(Endpoint, Connection), RpcError> {
let mut tls = (*config.tls_config).clone();
tls.alpn_protocols = vec![config.alpn];
tls.alpn_protocols = vec![config.alpn.clone()];
let quic_tls = quinn::crypto::rustls::QuicClientConfig::try_from(tls)
.map_err(|e| RpcError::Connection(format!("TLS config: {e}")))?;
@@ -46,7 +125,13 @@ impl RpcClient {
);
let mut endpoint = Endpoint::client(bind_addr)
.map_err(|e| RpcError::Connection(e.to_string()))?;
endpoint.set_default_client_config(quinn::ClientConfig::new(Arc::new(quic_tls)));
let mut quinn_config = quinn::ClientConfig::new(Arc::new(quic_tls));
// Enable QUIC-level keepalive.
let mut transport = quinn::TransportConfig::default();
transport.keep_alive_interval(Some(Duration::from_secs(config.keepalive_secs)));
quinn_config.transport_config(Arc::new(transport));
endpoint.set_default_client_config(quinn_config);
let connection = endpoint
.connect(config.server_addr, &config.server_name)
@@ -58,34 +143,115 @@ impl RpcClient {
// Perform auth handshake if a session token was provided.
if let Some(ref token) = config.session_token {
let (mut send, mut recv) = connection
.open_bi()
.await
.map_err(|e| RpcError::Connection(format!("open auth stream: {e}")))?;
auth_handshake::send_auth_init(&mut send, token).await?;
send.finish()
.map_err(|e| RpcError::Connection(format!("finish auth send: {e}")))?;
auth_handshake::recv_auth_ack(&mut recv).await?;
debug!("auth handshake complete");
Self::do_auth_handshake(&connection, token).await?;
}
Ok(Self {
connection,
next_request_id: AtomicU32::new(1),
})
Ok((endpoint, connection))
}
/// Run the session-token auth handshake over `connection`.
///
/// Opens a bidirectional stream, sends the auth-init message carrying
/// `token`, finishes the send side, and then waits for the server's ack.
///
/// # Errors
/// Returns `RpcError::Connection` if the stream cannot be opened or
/// finished, and propagates any error from sending the init message or
/// receiving the ack.
async fn do_auth_handshake(connection: &Connection, token: &[u8]) -> Result<(), RpcError> {
    let opened = connection.open_bi().await;
    let (mut outbound, mut inbound) =
        opened.map_err(|e| RpcError::Connection(format!("open auth stream: {e}")))?;

    auth_handshake::send_auth_init(&mut outbound, token).await?;
    if let Err(e) = outbound.finish() {
        return Err(RpcError::Connection(format!("finish auth send: {e}")));
    }

    auth_handshake::recv_auth_ack(&mut inbound).await?;
    debug!("auth handshake complete");
    Ok(())
}
/// Attempt to reconnect to the server with exponential backoff.
///
/// Makes up to `config.max_retries` attempts. Before each attempt the state
/// is set to `Reconnecting { attempt }` and the task sleeps for
/// `min(base_delay_ms * 2^(attempt-1), max_backoff_ms)` plus a random jitter
/// of up to half that delay. A successful attempt (including the auth
/// handshake when a session token is configured) replaces the stored
/// connection and sets the state to `Connected`.
///
/// # Errors
/// Returns `RpcError::Connection` once every attempt has failed; the state
/// is set to `Disconnected` in that case.
async fn reconnect(&self) -> Result<(), RpcError> {
    let max = self.config.max_retries;
    let base = self.config.base_delay_ms;
    let cap = self.config.max_backoff_ms;

    for attempt in 1..=max {
        self.set_state(ConnectionState::Reconnecting { attempt });
        info!(attempt, max, "attempting reconnect");

        // Exponential backoff with jitter, capped. Saturating arithmetic keeps
        // large configured values (or many attempts) from overflowing u64
        // before the cap is applied.
        let delay_ms = base
            .saturating_mul(2u64.saturating_pow(attempt.saturating_sub(1)))
            .min(cap);
        let jitter_ms = rand::Rng::gen_range(&mut rand::thread_rng(), 0..=delay_ms / 2);
        tokio::time::sleep(Duration::from_millis(delay_ms.saturating_add(jitter_ms))).await;

        match self.try_connect_once().await {
            Ok(new_conn) => {
                // Auth handshake on the new connection.
                if let Some(ref token) = self.config.session_token {
                    if let Err(e) = Self::do_auth_handshake(&new_conn, token).await {
                        warn!(attempt, "reconnect auth handshake failed: {e}");
                        continue;
                    }
                }
                // Swap the connection under the lock.
                *self.connection.lock().await = new_conn;
                self.set_state(ConnectionState::Connected);
                info!("reconnected successfully");
                return Ok(());
            }
            Err(e) => {
                warn!(attempt, max, "reconnect attempt failed: {e}");
            }
        }
    }

    self.set_state(ConnectionState::Disconnected);
    Err(RpcError::Connection(format!(
        "reconnect failed after {max} attempts"
    )))
}
/// Dial the configured server exactly once on the existing endpoint.
///
/// No retry and no auth handshake happen here; callers layer those on top.
///
/// # Errors
/// Returns `RpcError::Connection` if the connect call is rejected or the
/// connection attempt fails.
async fn try_connect_once(&self) -> Result<Connection, RpcError> {
    let connecting = self
        .endpoint
        .connect(self.config.server_addr, &self.config.server_name)
        .map_err(|e| RpcError::Connection(e.to_string()))?;
    connecting
        .await
        .map_err(|e| RpcError::Connection(e.to_string()))
}
/// Send an RPC request and wait for the response.
///
/// On retriable connection errors, automatically reconnects and retries.
pub async fn call(
&self,
method_id: u16,
payload: Bytes,
) -> Result<Bytes, RpcError> {
let request_id = self.next_request_id.fetch_add(1, Ordering::Relaxed);
let conn = self.connection.lock().await.clone();
match Self::call_on(&conn, &self.next_request_id, method_id, payload.clone()).await {
Ok(resp) => Ok(resp),
Err(e) if e.is_retriable() && conn.close_reason().is_some() => {
// Connection is dead — try reconnect then retry once.
warn!("connection lost during RPC call, attempting reconnect");
drop(conn);
self.reconnect().await?;
let conn = self.connection.lock().await.clone();
Self::call_on(&conn, &self.next_request_id, method_id, payload).await
}
Err(e) => Err(e),
}
}
let (mut send, mut recv) = self
.connection
/// Inner call implementation on a specific connection.
async fn call_on(
connection: &Connection,
next_request_id: &AtomicU32,
method_id: u16,
payload: Bytes,
) -> Result<Bytes, RpcError> {
let request_id = next_request_id.fetch_add(1, Ordering::Relaxed);
let (mut send, mut recv) = connection
.open_bi()
.await
.map_err(|e| RpcError::Connection(e.to_string()))?;
@@ -142,55 +308,86 @@ impl RpcClient {
}
/// Subscribe to server-push events. Returns a receiver channel.
/// Spawns a background task that reads uni-streams.
pub fn subscribe_push(&self) -> mpsc::UnboundedReceiver<PushFrame> {
///
/// Spawns a background task that reads uni-streams. When the push stream
/// breaks (connection error, EOF), a `None` sentinel is sent so the
/// caller can detect the break and resubscribe after reconnection.
///
/// This is an async method because it needs to clone the current connection.
pub async fn subscribe_push(&self) -> mpsc::UnboundedReceiver<Option<PushFrame>> {
let (tx, rx) = mpsc::unbounded_channel();
let conn = self.connection.clone();
let conn = self.connection.lock().await.clone();
tokio::spawn(Self::push_loop(conn, tx));
rx
}
tokio::spawn(async move {
loop {
match conn.accept_uni().await {
Ok(mut recv) => {
let mut buf = BytesMut::new();
loop {
match recv.read_chunk(65536, true).await {
Ok(Some(chunk)) => buf.extend_from_slice(&chunk.bytes),
Ok(None) => break,
Err(e) => {
debug!("push stream read error: {e}");
break;
}
async fn push_loop(conn: Connection, tx: mpsc::UnboundedSender<Option<PushFrame>>) {
loop {
match conn.accept_uni().await {
Ok(mut recv) => {
let mut buf = BytesMut::new();
loop {
match recv.read_chunk(65536, true).await {
Ok(Some(chunk)) => buf.extend_from_slice(&chunk.bytes),
Ok(None) => break,
Err(e) => {
debug!("push stream read error: {e}");
break;
}
}
match PushFrame::decode(&mut buf) {
Ok(Some(frame)) => {
if tx.send(frame).is_err() {
return; // receiver dropped
}
}
Ok(None) => debug!("incomplete push frame"),
Err(e) => debug!("push decode error: {e}"),
}
}
Err(quinn::ConnectionError::ApplicationClosed(_)) => break,
Err(e) => {
warn!("accept_uni error: {e}");
break;
match PushFrame::decode(&mut buf) {
Ok(Some(frame)) => {
if tx.send(Some(frame)).is_err() {
return; // receiver dropped
}
}
Ok(None) => debug!("incomplete push frame"),
Err(e) => debug!("push decode error: {e}"),
}
}
Err(quinn::ConnectionError::ApplicationClosed(_)) => {
let _ = tx.send(None);
break;
}
Err(e) => {
warn!("accept_uni error: {e}");
let _ = tx.send(None);
break;
}
}
});
rx
}
}
/// Close the connection gracefully.
///
/// Records the `Disconnected` state, then closes the current connection with
/// error code 0 and reason `b"bye"`. The close is best-effort: this method is
/// synchronous, so it only tries the connection lock once and skips the
/// close if the lock is held at that moment.
pub fn close(&self) {
    self.set_state(ConnectionState::Disconnected);
    if let Ok(conn) = self.connection.try_lock() {
        conn.close(0u32.into(), b"bye");
    }
}
/// Get the underlying QUIC connection (for advanced use).
pub fn connection(&self) -> &Connection {
&self.connection
pub async fn connection(&self) -> Connection {
self.connection.lock().await.clone()
}
/// Report the most recently recorded connection state.
///
/// A poisoned state lock is recovered rather than treated as an error, so
/// this never panics.
pub fn connection_state(&self) -> ConnectionState {
    let guard = match self.state.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    };
    *guard
}
/// Report whether the connection looks alive, i.e. has no close reason set.
///
/// This is a non-blocking heuristic: if the connection lock cannot be taken
/// immediately, the connection is assumed to be in use and therefore alive.
pub fn is_alive(&self) -> bool {
    self.connection
        .try_lock()
        // locked = likely in use = alive
        .map_or(true, |conn| conn.close_reason().is_none())
}
/// Record `new_state` as the current connection state.
///
/// A poisoned lock is recovered instead of ignored, matching
/// `connection_state`; otherwise a single poisoning would freeze the
/// reported state forever.
fn set_state(&self, new_state: ConnectionState) {
    let mut guard = self
        .state
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    *guard = new_state;
}
}

View File

@@ -72,3 +72,92 @@ pub enum RpcError {
#[error("payload too large: {size} bytes (max {max})")]
PayloadTooLarge { size: usize, max: usize },
}
impl RpcError {
    /// Classify this error as transient (`true`) or permanent (`false`).
    ///
    /// Transient: connection errors, timeouts, closed streams, and server
    /// errors with status `Unavailable`, `DeadlineExceeded`, `Internal` or
    /// `RateLimited`. Permanent: encode/decode failures, oversized payloads,
    /// and server errors with any other status.
    pub fn is_retriable(&self) -> bool {
        match self {
            // Local failures that retrying cannot change.
            Self::Encode(_) | Self::Decode(_) | Self::PayloadTooLarge { .. } => false,
            // Transport-level failures.
            Self::Connection(_) | Self::Timeout | Self::StreamClosed => true,
            // Server-side statuses that signal a temporary condition.
            Self::Server {
                status:
                    RpcStatus::Unavailable
                    | RpcStatus::DeadlineExceeded
                    | RpcStatus::Internal
                    | RpcStatus::RateLimited,
                ..
            } => true,
            // Every other server status is a permanent rejection.
            Self::Server { .. } => false,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a server error carrying `status` and an empty message.
    fn server_error(status: RpcStatus) -> RpcError {
        RpcError::Server {
            status,
            message: String::new(),
        }
    }

    /// Transport failures and temporary server statuses are retriable.
    #[test]
    fn retriable_errors() {
        assert!(RpcError::Connection("reset".into()).is_retriable());
        assert!(RpcError::Timeout.is_retriable());
        assert!(RpcError::StreamClosed.is_retriable());

        for status in [
            RpcStatus::Unavailable,
            RpcStatus::Internal,
            RpcStatus::DeadlineExceeded,
            RpcStatus::RateLimited,
        ] {
            assert!(server_error(status).is_retriable());
        }
    }

    /// Local failures and permanent server rejections are not retriable.
    #[test]
    fn non_retriable_errors() {
        assert!(!RpcError::Encode("bad".into()).is_retriable());
        assert!(!RpcError::Decode("bad".into()).is_retriable());
        assert!(!RpcError::PayloadTooLarge { size: 100, max: 50 }.is_retriable());

        for status in [
            RpcStatus::Unauthorized,
            RpcStatus::BadRequest,
            RpcStatus::Forbidden,
            RpcStatus::NotFound,
        ] {
            assert!(!server_error(status).is_retriable());
        }
    }

    /// `ConnectionState` renders the strings shown in the UI.
    #[test]
    fn connection_state_display() {
        use crate::client::ConnectionState;

        let expectations = [
            (ConnectionState::Connected, "Connected"),
            (ConnectionState::Disconnected, "Disconnected"),
            (
                ConnectionState::Reconnecting { attempt: 2 },
                "Reconnecting (attempt 2)",
            ),
        ];
        for (state, rendered) in expectations {
            assert_eq!(state.to_string(), rendered);
        }
    }
}