stargazer/db/
connected_nodes.rs

1//! Query functions for the `connected_nodes` table.
2//!
3//! The connected nodes table maintains a live snapshot of nodes (gateways and
4//! hotspots) currently linked to each reflector module. It is updated by Tier 2
5//! XLX monitors as they receive `"nodes"` push notifications.
6//!
7//! Unlike the append-only `activity_log`, this table uses `UPSERT` semantics:
8//! each node can appear at most once per reflector (enforced by the composite
9//! primary key `(reflector, node_callsign)`). When a monitor receives a fresh
10//! node list, it upserts each entry and clears stale nodes that are no longer
11//! present.
12//!
13//! # Staleness eviction
14//!
15//! [`clear_for_reflector`] deletes all nodes for a given reflector. This is
16//! called before upserting the fresh snapshot so that nodes that have
17//! disconnected since the last update are removed.
18
19use chrono::{DateTime, Utc};
20use sqlx::PgPool;
21
22/// A single row from the `connected_nodes` table.
23///
24/// Maps directly to the table columns via `sqlx::FromRow`.
25#[derive(Debug, sqlx::FromRow)]
26pub(crate) struct ConnectedNodeRow {
27    /// Reflector callsign (foreign key to `reflectors.callsign`).
28    pub(crate) reflector: String,
29
30    /// Node callsign (e.g. `"W1AW  B"`).
31    pub(crate) node_callsign: String,
32
33    /// Module letter on the reflector the node is linked to.
34    pub(crate) module: Option<String>,
35
36    /// Protocol used by the node (e.g. `"dextra"`, `"dplus"`).
37    pub(crate) protocol: Option<String>,
38
39    /// When the node first connected (as reported by the reflector).
40    pub(crate) connected_since: Option<DateTime<Utc>>,
41
42    /// When the node was last seen in a monitor update.
43    pub(crate) last_heard: Option<DateTime<Utc>>,
44}
45
46/// Upserts a single connected node entry.
47///
48/// Inserts a new node or updates the existing entry's `module`, `last_heard`,
49/// and `connected_since` if the node is already tracked for this reflector.
50/// The composite primary key `(reflector, node_callsign)` prevents duplicates.
51///
52/// # Errors
53///
54/// Returns `sqlx::Error` on connection or foreign-key constraint failures
55/// (the referenced reflector must exist in the `reflectors` table).
56pub(crate) async fn upsert_node(
57    pool: &PgPool,
58    reflector: &str,
59    node_callsign: &str,
60    module: Option<&str>,
61    now: DateTime<Utc>,
62) -> Result<(), sqlx::Error> {
63    let _result = sqlx::query(
64        "INSERT INTO connected_nodes (reflector, node_callsign, module, last_heard)
65         VALUES ($1, $2, $3, $4)
66         ON CONFLICT (reflector, node_callsign) DO UPDATE SET
67             module     = EXCLUDED.module,
68             last_heard = EXCLUDED.last_heard",
69    )
70    .bind(reflector)
71    .bind(node_callsign)
72    .bind(module)
73    .bind(now)
74    .execute(pool)
75    .await?;
76    Ok(())
77}
78
79/// Deletes all connected node entries for a reflector.
80///
81/// Called before upserting a fresh node snapshot so that nodes that have
82/// disconnected since the last update are removed. This simple
83/// delete-then-reinsert pattern avoids the complexity of diff-based eviction.
84///
85/// # Errors
86///
87/// Returns `sqlx::Error` on query failure.
88pub(crate) async fn clear_for_reflector(pool: &PgPool, reflector: &str) -> Result<(), sqlx::Error> {
89    let _result = sqlx::query("DELETE FROM connected_nodes WHERE reflector = $1")
90        .bind(reflector)
91        .execute(pool)
92        .await?;
93    Ok(())
94}
95
96/// Returns all nodes currently linked to the given reflector.
97///
98/// Results are ordered by `last_heard DESC` (most recently active first),
99/// with `NULLS LAST` so rows whose `last_heard` was not reported by the
100/// monitor don't dominate the top of the list. Used by the HTTP API
101/// (`GET /api/reflectors/{callsign}/nodes`) to expose the live node roster.
102///
103/// # Errors
104///
105/// Returns `sqlx::Error` on query failure.
106pub(crate) async fn get_for_reflector(
107    pool: &PgPool,
108    reflector: &str,
109) -> Result<Vec<ConnectedNodeRow>, sqlx::Error> {
110    sqlx::query_as::<_, ConnectedNodeRow>(
111        "SELECT reflector, node_callsign, module, protocol,
112                connected_since, last_heard
113         FROM connected_nodes
114         WHERE reflector = $1
115         ORDER BY last_heard DESC NULLS LAST",
116    )
117    .bind(reflector)
118    .fetch_all(pool)
119    .await
120}