feat: add multi-node horizontal scaling foundations
NotificationBus for cross-node message delivery fan-out:
- NotificationBus trait: publish(topic) + subscribe(topic) -> Notify
- InMemoryNotificationBus: single-node default via tokio::sync::Notify
- Designed for Redis pub/sub or NATS replacement in multi-node deploys
- 3 async tests: publish wakes, timeout without publish, independent topics

Health endpoint enhancements for load balancer awareness:
- HealthResponse proto: add node_id, version, uptime_secs, storage_backend
- ServerState: add node_id, start_time, storage_backend fields
- Health handler returns full node identity for multi-node monitoring
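The commit introduces only the bus and its trait; the sketch below shows how the two sides are intended to meet. It is a hypothetical wiring example, not code from this diff: the notification_bus field, the queue comments, and the handler names are assumptions.

use std::sync::Arc;
use std::time::Duration;
// Assumes the trait added by this commit is in scope:
// use crate::domain::notification::NotificationBus;

struct State {
    notification_bus: Arc<dyn NotificationBus>,
    // message queue, storage, ... (elided)
}

// Enqueue path: after persisting the message, publish on the recipient's
// topic so any node long-polling for that recipient wakes up.
fn on_message_enqueued(state: &State, recipient: &[u8]) {
    state.notification_bus.publish(recipient);
}

// Long-poll path: subscribe and create the `notified()` future *before*
// re-checking the queue, so a publish that races with the check still
// completes the wait; then sleep for at most `timeout`.
async fn fetch_wait(state: &State, recipient: &[u8], timeout: Duration) {
    let notify = state.notification_bus.subscribe(recipient);
    let notified = notify.notified();
    // If the queue already holds messages for `recipient`, return them here.
    let _ = tokio::time::timeout(timeout, notified).await;
    // Drain the queue again after waking (or after the timeout expires).
}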
@@ -17,5 +17,6 @@ pub mod groups;
pub mod p2p;
pub mod account;
pub mod moderation;
pub mod notification;
pub mod rate_limit;
pub mod recovery;
crates/quicproquo-server/src/domain/notification.rs (new file, 131 lines)
@@ -0,0 +1,131 @@
//! Cross-node notification bus for message delivery fan-out.
//!
//! When a message is enqueued, the bus publishes a notification so that
//! any node running a `fetch_wait` long-poll for that recipient can
//! wake up — even if the enqueue happened on a different node.
//!
//! Two backends:
//! - `InMemoryNotificationBus`: single-node, tokio::sync::Notify (default)
//! - Redis pub/sub (feature-gated `redis-pubsub`, implemented externally)

use std::sync::Arc;
use dashmap::DashMap;
use tokio::sync::Notify;

// ── Trait ────────────────────────────────────────────────────────────────────

/// Cross-node notification bus.
///
/// Publishers call `publish` when a message is enqueued.
/// Subscribers call `subscribe` to get a future that resolves when
/// a notification arrives for the given topic.
pub trait NotificationBus: Send + Sync {
    /// Notify all waiters for `topic` that new data is available.
    fn publish(&self, topic: &[u8]);

    /// Return a future that completes when `topic` receives a notification.
    /// The returned `Notify` can be `.notified().await`'d.
    fn subscribe(&self, topic: &[u8]) -> Arc<Notify>;
}

// ── In-memory implementation ────────────────────────────────────────────────

/// Single-node notification bus backed by `tokio::sync::Notify`.
///
/// This is the default for single-node deployments. For multi-node,
/// replace with a Redis pub/sub or NATS implementation.
pub struct InMemoryNotificationBus {
    waiters: DashMap<Vec<u8>, Arc<Notify>>,
}

impl InMemoryNotificationBus {
    pub fn new() -> Self {
        Self {
            waiters: DashMap::new(),
        }
    }
}

impl Default for InMemoryNotificationBus {
    fn default() -> Self {
        Self::new()
    }
}

impl NotificationBus for InMemoryNotificationBus {
    fn publish(&self, topic: &[u8]) {
        if let Some(notify) = self.waiters.get(topic) {
            notify.notify_waiters();
        }
    }

    fn subscribe(&self, topic: &[u8]) -> Arc<Notify> {
        self.waiters
            .entry(topic.to_vec())
            .or_insert_with(|| Arc::new(Notify::new()))
            .clone()
    }
}

/// Create the default notification bus (in-memory, single-node).
pub fn default_notification_bus() -> Arc<dyn NotificationBus> {
    Arc::new(InMemoryNotificationBus::new())
}

// ── Tests ───────────────────────────────────────────────────────────────────

#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    #[tokio::test]
    async fn publish_wakes_subscriber() {
        let bus = InMemoryNotificationBus::new();
        let topic = b"user:alice";

        let notify = bus.subscribe(topic);
        let notified = notify.notified();

        // Publish from another "node" (same process in this case).
        bus.publish(topic);

        // Should resolve immediately since we published.
        tokio::time::timeout(Duration::from_millis(100), notified)
            .await
            .expect("notification should arrive");
    }

    #[tokio::test]
    async fn no_publish_times_out() {
        let bus = InMemoryNotificationBus::new();
        let topic = b"user:bob";

        let notify = bus.subscribe(topic);
        let notified = notify.notified();

        let result = tokio::time::timeout(Duration::from_millis(50), notified).await;
        assert!(result.is_err(), "should time out without publish");
    }

    #[tokio::test]
    async fn independent_topics() {
        let bus = InMemoryNotificationBus::new();

        let notify_a = bus.subscribe(b"topic-a");
        let notified_a = notify_a.notified();

        let notify_b = bus.subscribe(b"topic-b");
        let notified_b = notify_b.notified();

        // Only publish to topic-a.
        bus.publish(b"topic-a");

        tokio::time::timeout(Duration::from_millis(100), notified_a)
            .await
            .expect("topic-a should wake");

        let result = tokio::time::timeout(Duration::from_millis(50), notified_b).await;
        assert!(result.is_err(), "topic-b should not wake");
    }
}
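The module doc above points at Redis pub/sub or NATS for multi-node deployments without committing to either. As a rough shape of what such a backend could take (not the feature-gated `redis-pubsub` implementation; the remote transport is abstracted into plain channels here), a distributed bus can keep delegating local wakeups to InMemoryNotificationBus while also fanning publishes out to peer nodes. Everything except the two types from this file is an assumption, and the code expects to run inside a tokio runtime.

use std::sync::Arc;
use tokio::sync::{mpsc, Notify};
// Assumes `NotificationBus` and `InMemoryNotificationBus` from this module are in scope.

// Local waiters stay in the in-memory bus; `outbound` carries topics to the
// other nodes over whatever transport is chosen (Redis pub/sub, NATS, ...).
pub struct DistributedNotificationBus {
    local: InMemoryNotificationBus,
    outbound: mpsc::UnboundedSender<Vec<u8>>,
}

impl DistributedNotificationBus {
    /// `inbound` carries topics published by other nodes; each one is replayed
    /// into the local bus so long-pollers on this node wake up.
    pub fn new(
        outbound: mpsc::UnboundedSender<Vec<u8>>,
        mut inbound: mpsc::UnboundedReceiver<Vec<u8>>,
    ) -> Arc<Self> {
        let bus = Arc::new(Self { local: InMemoryNotificationBus::new(), outbound });
        let listener = Arc::clone(&bus);
        tokio::spawn(async move {
            while let Some(topic) = inbound.recv().await {
                listener.local.publish(&topic);
            }
        });
        bus
    }
}

impl NotificationBus for DistributedNotificationBus {
    fn publish(&self, topic: &[u8]) {
        self.local.publish(topic); // wake waiters on this node
        let _ = self.outbound.send(topic.to_vec()); // fan out to peer nodes
    }

    fn subscribe(&self, topic: &[u8]) -> Arc<Notify> {
        self.local.subscribe(topic)
    }
}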
@@ -57,6 +57,12 @@ pub struct ServerState {
    pub banned_users: Arc<DashMap<Vec<u8>, BanRecord>>,
    /// Moderation reports (append-only).
    pub moderation_reports: Arc<std::sync::Mutex<Vec<ModerationReport>>>,
    /// Unique node identifier for multi-node health reporting.
    pub node_id: String,
    /// Process start time for uptime calculation.
    pub start_time: std::time::Instant,
    /// Storage backend name (e.g. "sql", "file").
    pub storage_backend: String,
}

/// A ban record for a user.
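The hunk above only adds the fields; where their values come from is not part of this diff. A plausible startup-time sketch, in which the env var name, the process-id fallback, and the helper itself are assumptions:

use std::time::Instant;

// Hypothetical helper; not code from this commit.
fn node_identity(storage_backend: &str) -> (String, Instant, String) {
    // Stable id from the environment if provided, otherwise something unique
    // enough for a single host.
    let node_id = std::env::var("QUICPROQUO_NODE_ID")
        .unwrap_or_else(|_| format!("node-{}", std::process::id()));
    (node_id, Instant::now(), storage_backend.to_string())
}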
@@ -106,8 +106,13 @@ pub async fn handle_health(
    } else {
        "ok"
    };
    let uptime = state.start_time.elapsed().as_secs();
    let resp = v1::HealthResponse {
        status: status.into(),
        node_id: state.node_id.clone(),
        version: env!("CARGO_PKG_VERSION").to_string(),
        uptime_secs: uptime,
        storage_backend: state.storage_backend.clone(),
    };
    HandlerResult::ok(Bytes::from(resp.encode_to_vec()))
}
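On the consuming side (a load balancer probe or a fleet dashboard), the response decodes with the same generated types. A minimal sketch, assuming the prost-generated v1 module is available to the monitoring client; the function and its log format are assumptions, only the field names come from the proto below:

use prost::Message;

// Hypothetical monitoring-side check; not code from this commit.
fn report_node(body: &[u8]) -> Result<(), prost::DecodeError> {
    let health = v1::HealthResponse::decode(body)?;
    println!(
        "node {} v{}: status={} uptime={}s storage={}",
        health.node_id, health.version, health.status,
        health.uptime_secs, health.storage_backend
    );
    Ok(())
}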
@@ -23,4 +23,12 @@ message HealthRequest {}

message HealthResponse {
  string status = 1;
  // Unique node identifier for multi-node deployments.
  string node_id = 2;
  // Server version string.
  string version = 3;
  // Uptime in seconds since process start.
  uint64 uptime_secs = 4;
  // Storage backend type (e.g. "sql", "file", "postgres").
  string storage_backend = 5;
}