diff --git a/crates/quicproquo-server/src/domain/mod.rs b/crates/quicproquo-server/src/domain/mod.rs
index 8112416..dc96243 100644
--- a/crates/quicproquo-server/src/domain/mod.rs
+++ b/crates/quicproquo-server/src/domain/mod.rs
@@ -17,5 +17,6 @@ pub mod groups;
 pub mod p2p;
 pub mod account;
 pub mod moderation;
+pub mod notification;
 pub mod rate_limit;
 pub mod recovery;
diff --git a/crates/quicproquo-server/src/domain/notification.rs b/crates/quicproquo-server/src/domain/notification.rs
new file mode 100644
index 0000000..9424676
--- /dev/null
+++ b/crates/quicproquo-server/src/domain/notification.rs
@@ -0,0 +1,142 @@
+//! Cross-node notification bus for message delivery fan-out.
+//!
+//! When a message is enqueued, the bus publishes a notification so that
+//! any node running a `fetch_wait` long-poll for that recipient can
+//! wake up — even if the enqueue happened on a different node.
+//!
+//! Two backends:
+//! - `InMemoryNotificationBus`: single-node, tokio::sync::Notify (default)
+//! - Redis pub/sub (feature-gated `redis-pubsub`, implemented externally)
+
+use std::sync::Arc;
+
+use dashmap::DashMap;
+use tokio::sync::Notify;
+
+// ── Trait ──────────────────────────────────────────────────────────────────
+
+/// Cross-node notification bus.
+///
+/// Publishers call `publish` when a message is enqueued.
+/// Subscribers call `subscribe` to get a future that resolves when
+/// a notification arrives for the given topic.
+///
+/// # Lost-wakeup contract
+///
+/// `publish` stores no permit: it only wakes `Notified` futures that
+/// already exist. To avoid missing a notification, a long-poller must
+/// (1) `subscribe` and create the `Notified` future, (2) check storage
+/// for already-queued data, and only then (3) await the future.
+pub trait NotificationBus: Send + Sync {
+    /// Notify all waiters for `topic` that new data is available.
+    ///
+    /// No-op if nobody has subscribed to `topic` yet.
+    fn publish(&self, topic: &[u8]);
+
+    /// Return a future that completes when `topic` receives a notification.
+    /// The returned `Notify` can be `.notified().await`'d.
+    fn subscribe(&self, topic: &[u8]) -> Arc<Notify>;
+}
+
+// ── In-memory implementation ──────────────────────────────────────────────
+
+/// Single-node notification bus backed by `tokio::sync::Notify`.
+///
+/// This is the default for single-node deployments. For multi-node,
+/// replace with a Redis pub/sub or NATS implementation.
+#[derive(Debug, Default)]
+pub struct InMemoryNotificationBus {
+    /// One `Notify` per topic, created lazily on first subscribe.
+    ///
+    /// NOTE(review): entries are never removed, so this map grows with the
+    /// number of distinct topics ever subscribed. Fine while topics are
+    /// bounded (one per local recipient); add eviction if they are not.
+    waiters: DashMap<Vec<u8>, Arc<Notify>>,
+}
+
+impl InMemoryNotificationBus {
+    pub fn new() -> Self {
+        Self::default()
+    }
+}
+
+impl NotificationBus for InMemoryNotificationBus {
+    fn publish(&self, topic: &[u8]) {
+        // `notify_waiters` wakes every `Notified` future created before
+        // this call; it stores no permit for later subscribers.
+        if let Some(notify) = self.waiters.get(topic) {
+            notify.notify_waiters();
+        }
+    }
+
+    fn subscribe(&self, topic: &[u8]) -> Arc<Notify> {
+        self.waiters
+            .entry(topic.to_vec())
+            .or_insert_with(|| Arc::new(Notify::new()))
+            .clone()
+    }
+}
+
+/// Create the default notification bus (in-memory, single-node).
+pub fn default_notification_bus() -> Arc<dyn NotificationBus> {
+    Arc::new(InMemoryNotificationBus::new())
+}
+
+// ── Tests ─────────────────────────────────────────────────────────────────
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use std::time::Duration;
+
+    #[tokio::test]
+    async fn publish_wakes_subscriber() {
+        let bus = InMemoryNotificationBus::new();
+        let topic = b"user:alice";
+
+        let notify = bus.subscribe(topic);
+        let notified = notify.notified();
+
+        // Publish from another "node" (same process in this case).
+        bus.publish(topic);
+
+        // Should resolve: `notify_waiters` wakes `Notified` futures
+        // created before the call, even if they have not been polled yet.
+        tokio::time::timeout(Duration::from_millis(100), notified)
+            .await
+            .expect("notification should arrive");
+    }
+
+    #[tokio::test]
+    async fn no_publish_times_out() {
+        let bus = InMemoryNotificationBus::new();
+        let topic = b"user:bob";
+
+        let notify = bus.subscribe(topic);
+        let notified = notify.notified();
+
+        let result = tokio::time::timeout(Duration::from_millis(50), notified).await;
+        assert!(result.is_err(), "should time out without publish");
+    }
+
+    #[tokio::test]
+    async fn independent_topics() {
+        let bus = InMemoryNotificationBus::new();
+
+        let notify_a = bus.subscribe(b"topic-a");
+        let notified_a = notify_a.notified();
+
+        let notify_b = bus.subscribe(b"topic-b");
+        let notified_b = notify_b.notified();
+
+        // Only publish to topic-a.
+        bus.publish(b"topic-a");
+
+        tokio::time::timeout(Duration::from_millis(100), notified_a)
+            .await
+            .expect("topic-a should wake");
+
+        let result = tokio::time::timeout(Duration::from_millis(50), notified_b).await;
+        assert!(result.is_err(), "topic-b should not wake");
+    }
+}
diff --git a/crates/quicproquo-server/src/v2_handlers/mod.rs b/crates/quicproquo-server/src/v2_handlers/mod.rs
index a00e788..9e1f27a 100644
--- a/crates/quicproquo-server/src/v2_handlers/mod.rs
+++ b/crates/quicproquo-server/src/v2_handlers/mod.rs
@@ -57,6 +57,12 @@ pub struct ServerState {
     pub banned_users: Arc<DashMap<Vec<u8>, BanRecord>>,
     /// Moderation reports (append-only).
     pub moderation_reports: Arc<Mutex<Vec<ModerationReport>>>,
+    /// Unique node identifier for multi-node health reporting.
+    pub node_id: String,
+    /// Process start time for uptime calculation.
+    pub start_time: std::time::Instant,
+    /// Storage backend name (e.g. "sql", "file").
+    pub storage_backend: String,
 }
 
 /// A ban record for a user.
diff --git a/crates/quicproquo-server/src/v2_handlers/p2p.rs b/crates/quicproquo-server/src/v2_handlers/p2p.rs
index 5ecbc43..83cde77 100644
--- a/crates/quicproquo-server/src/v2_handlers/p2p.rs
+++ b/crates/quicproquo-server/src/v2_handlers/p2p.rs
@@ -106,8 +106,14 @@ pub async fn handle_health(
     } else {
         "ok"
     };
+    // Uptime from the monotonic `Instant` captured at startup — immune to
+    // wall-clock adjustments. `version` is baked in at compile time.
+    let uptime = state.start_time.elapsed().as_secs();
     let resp = v1::HealthResponse {
         status: status.into(),
+        node_id: state.node_id.clone(),
+        version: env!("CARGO_PKG_VERSION").to_string(),
+        uptime_secs: uptime,
+        storage_backend: state.storage_backend.clone(),
     };
     HandlerResult::ok(Bytes::from(resp.encode_to_vec()))
 }
diff --git a/proto/qpq/v1/p2p.proto b/proto/qpq/v1/p2p.proto
index f686fb8..6ca4de4 100644
--- a/proto/qpq/v1/p2p.proto
+++ b/proto/qpq/v1/p2p.proto
@@ -23,4 +23,12 @@ message HealthRequest {}
 
 message HealthResponse {
   string status = 1;
+  // Unique node identifier for multi-node deployments.
+  string node_id = 2;
+  // Server version string.
+  string version = 3;
+  // Uptime in seconds since process start.
+  uint64 uptime_secs = 4;
+  // Storage backend type (e.g. "sql", "file", "postgres").
+  string storage_backend = 5;
 }