stargazer/db/reflectors.rs
1//! Query functions for the `reflectors` table.
2//!
3//! The reflectors table is the central registry populated by Tier 1 discovery
4//! sweeps. Each row represents a unique D-STAR reflector identified by its
5//! callsign (e.g., `REF001`, `XLX320`, `DCS001`).
6//!
7//! Tier 1 fetchers call [`upsert`] after each sweep to insert newly-discovered
8//! reflectors or update metadata (IP address, dashboard URL, last-seen time)
9//! for existing ones. Tier 2 monitors query [`get_active`] to decide which
10//! reflectors have recent activity and deserve live monitoring.
11
12use chrono::{DateTime, Utc};
13use sqlx::PgPool;
14
15/// A single row from the `reflectors` table.
16///
17/// Maps directly to the table columns via `sqlx::FromRow`. All optional
18/// columns use `Option<T>` so that partially-populated rows (e.g., reflectors
19/// discovered from Pi-Star host files, which lack dashboard URLs) deserialize
20/// cleanly.
21#[derive(Debug, sqlx::FromRow)]
22pub(crate) struct ReflectorRow {
23 /// Reflector callsign (primary key), e.g. `"REF001"` or `"XLX320"`.
24 pub(crate) callsign: String,
25
26 /// Protocol family: `"dplus"`, `"dextra"`, or `"dcs"`.
27 pub(crate) protocol: String,
28
29 /// Reflector IP address (Postgres `INET` maps to `String` via sqlx).
30 pub(crate) ip_address: Option<String>,
31
32 /// URL of the reflector's web dashboard, if known.
33 pub(crate) dashboard_url: Option<String>,
34
35 /// Two-letter country code, if known.
36 pub(crate) country: Option<String>,
37
38 /// Timestamp of the most recent observation from any source.
39 pub(crate) last_seen: Option<DateTime<Utc>>,
40
41 /// Whether this reflector supports the XLX UDP JSON monitor protocol
42 /// (port 10001), making it eligible for Tier 2 monitoring.
43 pub(crate) tier2_available: Option<bool>,
44
45 /// When this row was first inserted.
46 pub(crate) created_at: Option<DateTime<Utc>>,
47}
48
49/// Inserts a new reflector or updates an existing one on callsign conflict.
50///
51/// Executes an `INSERT ... ON CONFLICT (callsign) DO UPDATE` so that repeated
52/// discovery sweeps refresh metadata without requiring a separate
53/// existence check.
54///
55/// # Errors
56///
57/// Returns `sqlx::Error` on connection or constraint failures.
58pub(crate) async fn upsert(
59 pool: &PgPool,
60 callsign: &str,
61 protocol: &str,
62 ip_address: Option<&str>,
63 dashboard_url: Option<&str>,
64 country: Option<&str>,
65) -> Result<(), sqlx::Error> {
66 // ON CONFLICT UPDATE refreshes all mutable metadata columns and bumps
67 // last_seen to the current time. The callsign (PK) and created_at are
68 // left unchanged.
69 let _result = sqlx::query(
70 "INSERT INTO reflectors (callsign, protocol, ip_address, dashboard_url, country, last_seen)
71 VALUES ($1, $2, $3::INET, $4, $5, now())
72 ON CONFLICT (callsign) DO UPDATE SET
73 protocol = EXCLUDED.protocol,
74 ip_address = EXCLUDED.ip_address,
75 dashboard_url = EXCLUDED.dashboard_url,
76 country = EXCLUDED.country,
77 last_seen = now()",
78 )
79 .bind(callsign)
80 .bind(protocol)
81 .bind(ip_address)
82 .bind(dashboard_url)
83 .bind(country)
84 .execute(pool)
85 .await?;
86 Ok(())
87}
88
89/// Returns all reflectors that have been seen since the given timestamp.
90///
91/// Used by Tier 2 to select reflectors with recent activity for live
92/// monitoring. Results are ordered by `last_seen DESC` so the most recently
93/// active reflectors appear first.
94///
95/// # Errors
96///
97/// Returns `sqlx::Error` on query failure.
98pub(crate) async fn get_active(
99 pool: &PgPool,
100 since: DateTime<Utc>,
101) -> Result<Vec<ReflectorRow>, sqlx::Error> {
102 // Filters on last_seen >= $1 and orders by recency.
103 sqlx::query_as::<_, ReflectorRow>(
104 "SELECT callsign, protocol, ip_address, dashboard_url, country,
105 last_seen, tier2_available, created_at
106 FROM reflectors
107 WHERE last_seen >= $1
108 ORDER BY last_seen DESC",
109 )
110 .bind(since)
111 .fetch_all(pool)
112 .await
113}
114
115/// Returns reflectors eligible for Tier 2 monitoring.
116///
117/// Filters for reflectors where `tier2_available = true` (supports the XLX UDP
118/// JSON monitor protocol) AND `last_seen >= since` (has recent activity). The
119/// additional `ip_address IS NOT NULL` check ensures we have a usable endpoint.
120///
121/// Results are ordered by `last_seen DESC` and capped at `limit` rows so the
122/// orchestrator can respect its `max_concurrent_monitors` cap.
123///
124/// # Errors
125///
126/// Returns `sqlx::Error` on query failure.
127pub(crate) async fn get_tier2_eligible(
128 pool: &PgPool,
129 since: DateTime<Utc>,
130 limit: i64,
131) -> Result<Vec<ReflectorRow>, sqlx::Error> {
132 sqlx::query_as::<_, ReflectorRow>(
133 "SELECT callsign, protocol, ip_address, dashboard_url, country,
134 last_seen, tier2_available, created_at
135 FROM reflectors
136 WHERE tier2_available = true
137 AND last_seen >= $1
138 AND ip_address IS NOT NULL
139 ORDER BY last_seen DESC
140 LIMIT $2",
141 )
142 .bind(since)
143 .bind(limit)
144 .fetch_all(pool)
145 .await
146}
147
148/// Returns the total number of rows in the `reflectors` table.
149///
150/// Used by the HTTP API `/metrics` endpoint. This counts every known
151/// reflector regardless of `last_seen`, giving a registry-size snapshot.
152///
153/// # Errors
154///
155/// Returns `sqlx::Error` on query failure.
156pub(crate) async fn count_total(pool: &PgPool) -> Result<i64, sqlx::Error> {
157 let row: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM reflectors")
158 .fetch_one(pool)
159 .await?;
160 Ok(row.0)
161}
162
163/// Sets the `tier2_available` flag for a reflector.
164///
165/// Called by Tier 1 XLX API fetcher when it determines whether a reflector
166/// supports the UDP JSON monitor protocol. The flag controls whether Tier 2
167/// attempts to connect.
168///
169/// # Errors
170///
171/// Returns `sqlx::Error` on query failure.
172pub(crate) async fn set_tier2_available(
173 pool: &PgPool,
174 callsign: &str,
175 available: bool,
176) -> Result<(), sqlx::Error> {
177 // Simple UPDATE targeting a single row by primary key.
178 let _result = sqlx::query("UPDATE reflectors SET tier2_available = $1 WHERE callsign = $2")
179 .bind(available)
180 .bind(callsign)
181 .execute(pool)
182 .await?;
183 Ok(())
184}