stargazer/db/
reflectors.rs

1//! Query functions for the `reflectors` table.
2//!
3//! The reflectors table is the central registry populated by Tier 1 discovery
4//! sweeps. Each row represents a unique D-STAR reflector identified by its
5//! callsign (e.g., `REF001`, `XLX320`, `DCS001`).
6//!
7//! Tier 1 fetchers call [`upsert`] after each sweep to insert newly-discovered
8//! reflectors or update metadata (IP address, dashboard URL, last-seen time)
9//! for existing ones. Tier 2 monitors query [`get_active`] to decide which
10//! reflectors have recent activity and deserve live monitoring.
11
12use chrono::{DateTime, Utc};
13use sqlx::PgPool;
14
15/// A single row from the `reflectors` table.
16///
17/// Maps directly to the table columns via `sqlx::FromRow`. All optional
18/// columns use `Option<T>` so that partially-populated rows (e.g., reflectors
19/// discovered from Pi-Star host files, which lack dashboard URLs) deserialize
20/// cleanly.
21#[derive(Debug, sqlx::FromRow)]
22pub(crate) struct ReflectorRow {
23    /// Reflector callsign (primary key), e.g. `"REF001"` or `"XLX320"`.
24    pub(crate) callsign: String,
25
26    /// Protocol family: `"dplus"`, `"dextra"`, or `"dcs"`.
27    pub(crate) protocol: String,
28
29    /// Reflector IP address (Postgres `INET` maps to `String` via sqlx).
30    pub(crate) ip_address: Option<String>,
31
32    /// URL of the reflector's web dashboard, if known.
33    pub(crate) dashboard_url: Option<String>,
34
35    /// Two-letter country code, if known.
36    pub(crate) country: Option<String>,
37
38    /// Timestamp of the most recent observation from any source.
39    pub(crate) last_seen: Option<DateTime<Utc>>,
40
41    /// Whether this reflector supports the XLX UDP JSON monitor protocol
42    /// (port 10001), making it eligible for Tier 2 monitoring.
43    pub(crate) tier2_available: Option<bool>,
44
45    /// When this row was first inserted.
46    pub(crate) created_at: Option<DateTime<Utc>>,
47}
48
49/// Inserts a new reflector or updates an existing one on callsign conflict.
50///
51/// Executes an `INSERT ... ON CONFLICT (callsign) DO UPDATE` so that repeated
52/// discovery sweeps refresh metadata without requiring a separate
53/// existence check.
54///
55/// # Errors
56///
57/// Returns `sqlx::Error` on connection or constraint failures.
58pub(crate) async fn upsert(
59    pool: &PgPool,
60    callsign: &str,
61    protocol: &str,
62    ip_address: Option<&str>,
63    dashboard_url: Option<&str>,
64    country: Option<&str>,
65) -> Result<(), sqlx::Error> {
66    // ON CONFLICT UPDATE refreshes all mutable metadata columns and bumps
67    // last_seen to the current time. The callsign (PK) and created_at are
68    // left unchanged.
69    let _result = sqlx::query(
70        "INSERT INTO reflectors (callsign, protocol, ip_address, dashboard_url, country, last_seen)
71         VALUES ($1, $2, $3::INET, $4, $5, now())
72         ON CONFLICT (callsign) DO UPDATE SET
73             protocol      = EXCLUDED.protocol,
74             ip_address    = EXCLUDED.ip_address,
75             dashboard_url = EXCLUDED.dashboard_url,
76             country       = EXCLUDED.country,
77             last_seen     = now()",
78    )
79    .bind(callsign)
80    .bind(protocol)
81    .bind(ip_address)
82    .bind(dashboard_url)
83    .bind(country)
84    .execute(pool)
85    .await?;
86    Ok(())
87}
88
89/// Returns all reflectors that have been seen since the given timestamp.
90///
91/// Used by Tier 2 to select reflectors with recent activity for live
92/// monitoring. Results are ordered by `last_seen DESC` so the most recently
93/// active reflectors appear first.
94///
95/// # Errors
96///
97/// Returns `sqlx::Error` on query failure.
98pub(crate) async fn get_active(
99    pool: &PgPool,
100    since: DateTime<Utc>,
101) -> Result<Vec<ReflectorRow>, sqlx::Error> {
102    // Filters on last_seen >= $1 and orders by recency.
103    sqlx::query_as::<_, ReflectorRow>(
104        "SELECT callsign, protocol, ip_address, dashboard_url, country,
105                last_seen, tier2_available, created_at
106         FROM reflectors
107         WHERE last_seen >= $1
108         ORDER BY last_seen DESC",
109    )
110    .bind(since)
111    .fetch_all(pool)
112    .await
113}
114
115/// Returns reflectors eligible for Tier 2 monitoring.
116///
117/// Filters for reflectors where `tier2_available = true` (supports the XLX UDP
118/// JSON monitor protocol) AND `last_seen >= since` (has recent activity). The
119/// additional `ip_address IS NOT NULL` check ensures we have a usable endpoint.
120///
121/// Results are ordered by `last_seen DESC` and capped at `limit` rows so the
122/// orchestrator can respect its `max_concurrent_monitors` cap.
123///
124/// # Errors
125///
126/// Returns `sqlx::Error` on query failure.
127pub(crate) async fn get_tier2_eligible(
128    pool: &PgPool,
129    since: DateTime<Utc>,
130    limit: i64,
131) -> Result<Vec<ReflectorRow>, sqlx::Error> {
132    sqlx::query_as::<_, ReflectorRow>(
133        "SELECT callsign, protocol, ip_address, dashboard_url, country,
134                last_seen, tier2_available, created_at
135         FROM reflectors
136         WHERE tier2_available = true
137           AND last_seen >= $1
138           AND ip_address IS NOT NULL
139         ORDER BY last_seen DESC
140         LIMIT $2",
141    )
142    .bind(since)
143    .bind(limit)
144    .fetch_all(pool)
145    .await
146}
147
148/// Returns the total number of rows in the `reflectors` table.
149///
150/// Used by the HTTP API `/metrics` endpoint. This counts every known
151/// reflector regardless of `last_seen`, giving a registry-size snapshot.
152///
153/// # Errors
154///
155/// Returns `sqlx::Error` on query failure.
156pub(crate) async fn count_total(pool: &PgPool) -> Result<i64, sqlx::Error> {
157    let row: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM reflectors")
158        .fetch_one(pool)
159        .await?;
160    Ok(row.0)
161}
162
163/// Sets the `tier2_available` flag for a reflector.
164///
165/// Called by Tier 1 XLX API fetcher when it determines whether a reflector
166/// supports the UDP JSON monitor protocol. The flag controls whether Tier 2
167/// attempts to connect.
168///
169/// # Errors
170///
171/// Returns `sqlx::Error` on query failure.
172pub(crate) async fn set_tier2_available(
173    pool: &PgPool,
174    callsign: &str,
175    available: bool,
176) -> Result<(), sqlx::Error> {
177    // Simple UPDATE targeting a single row by primary key.
178    let _result = sqlx::query("UPDATE reflectors SET tier2_available = $1 WHERE callsign = $2")
179        .bind(available)
180        .bind(callsign)
181        .execute(pool)
182        .await?;
183    Ok(())
184}