// stargazer/tier2/mod.rs
//! Tier 2: XLX live monitoring via UDP JSON protocol.
//!
//! Maintains concurrent UDP connections to active XLX reflectors on port 10001,
//! receiving real-time push notifications about connected nodes, heard stations,
//! and on-air/off-air events.
//!
//! The monitor pool is activity-driven:
//!
//! - Reflectors detected as active by Tier 1 are connected (up to the
//!   configured maximum).
//! - Reflectors idle beyond the configured threshold are disconnected to free
//!   slots.
//! - Newly active reflectors are connected as Tier 1 detects them.
//!
//! Events are written to the `activity_log` and `connected_nodes` `PostgreSQL`
//! tables, and on-air events can trigger Tier 3 auto-promotion for voice
//! capture.
18
19mod monitor;
20mod protocol;
21
22use std::collections::HashMap;
23use std::net::IpAddr;
24use std::time::Duration;
25
26use chrono::Utc;
27
28use crate::config::Tier2Config;
29use crate::db;
30
31use self::monitor::XlxMonitor;
32use self::protocol::MonitorMessage;
33
34/// How often to re-query the database for newly active reflectors.
35///
36/// This is independent of the XLX monitor recv timeout; it controls how
37/// quickly the orchestrator discovers reflectors that Tier 1 has flagged
38/// as active since the last check.
39const REFRESH_INTERVAL: Duration = Duration::from_secs(60);
40
41/// Runs the Tier 2 XLX monitoring loop.
42///
43/// Manages a pool of UDP JSON monitor connections, connecting and disconnecting
44/// based on Tier 1 activity data. Runs until cancelled.
45///
46/// # Startup behavior
47///
48/// On startup, queries the database for XLX reflectors with
49/// `tier2_available = true` and recent activity (within `activity_threshold_secs`).
50/// Connects to up to `max_concurrent_monitors` of the most recently active
51/// reflectors.
52///
53/// # Main loop
54///
55/// The main loop uses `tokio::select!` to multiplex across:
56///
57/// 1. **Monitor recv**: each active monitor's `recv()` future is polled. When
58/// a message arrives, it is dispatched by type:
59/// - `Nodes`: upserts to `connected_nodes` table.
60/// - `Stations`: inserts observations to `activity_log`.
61/// - `OnAir`/`OffAir`: logged via tracing (potential Tier 3 trigger point).
62/// - `Reflector`: logged once on connect, otherwise ignored.
63/// - `Unknown`: logged at debug level for diagnostics.
64///
65/// 2. **Refresh timer**: every 60 seconds, re-queries the database for newly
66/// eligible reflectors and connects any that are not already monitored.
67///
68/// # Error handling
69///
70/// Individual monitor failures (recv timeout, parse errors) are logged and
71/// the monitor is removed from the pool. The orchestrator continues running
72/// with the remaining monitors. Only a fatal error (e.g., database pool
73/// closed) causes the function to return.
74///
75/// # Errors
76///
77/// Returns an error if a fatal, non-retryable failure occurs (e.g., the
78/// database pool is closed or initial reflector query fails).
79pub(crate) async fn run(
80 config: Tier2Config,
81 pool: sqlx::PgPool,
82) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
83 tracing::info!(
84 max_concurrent_monitors = config.max_concurrent_monitors,
85 idle_disconnect_secs = config.idle_disconnect_secs,
86 activity_threshold_secs = config.activity_threshold_secs,
87 "tier2 XLX monitoring starting"
88 );
89
90 let mut monitors: HashMap<String, XlxMonitor> = HashMap::new();
91 let mut refresh_interval = tokio::time::interval(REFRESH_INTERVAL);
92
93 // Initial connect pass: query eligible reflectors and connect monitors.
94 connect_eligible_monitors(&config, &pool, &mut monitors).await;
95
96 tracing::info!(
97 active_monitors = monitors.len(),
98 "tier2 initial monitor pool established"
99 );
100
101 // Main event loop: multiplex monitor recv and periodic refresh.
102 loop {
103 // If we have no active monitors, just wait for the refresh timer
104 // to try connecting new ones.
105 if monitors.is_empty() {
106 let _tick = refresh_interval.tick().await;
107 connect_eligible_monitors(&config, &pool, &mut monitors).await;
108 continue;
109 }
110
111 // Poll all active monitors concurrently. We collect the callsigns
112 // into a Vec first to avoid borrow conflicts with the HashMap.
113 let callsigns: Vec<String> = monitors.keys().cloned().collect();
114
115 tokio::select! {
116 // Refresh timer: check for newly eligible reflectors.
117 _ = refresh_interval.tick() => {
118 connect_eligible_monitors(&config, &pool, &mut monitors).await;
119 }
120
121 // Monitor recv: process the first message from any monitor.
122 // We use a helper that polls all monitors and returns the first
123 // result along with the reflector callsign.
124 result = poll_any_monitor(&callsigns, &monitors) => {
125 let (callsign, message) = result;
126
127 if let Some(msg) = message {
128 handle_message(&callsign, &msg, &pool).await;
129 } else {
130 // Recv returned None — timeout or error. Remove the
131 // monitor so it can be reconnected on the next refresh.
132 tracing::info!(
133 reflector = %callsign,
134 "tier2 monitor unresponsive, removing from pool"
135 );
136 // Drop sends best-effort "bye".
137 let _removed = monitors.remove(&callsign);
138 }
139 }
140 }
141 }
142}
143
144/// Queries the database for tier2-eligible reflectors and connects monitors
145/// for any that are not already in the pool.
146///
147/// Respects the `max_concurrent_monitors` cap. Only connects to reflectors
148/// that have a valid IP address, `tier2_available = true`, and recent activity.
149async fn connect_eligible_monitors(
150 config: &Tier2Config,
151 pool: &sqlx::PgPool,
152 monitors: &mut HashMap<String, XlxMonitor>,
153) {
154 let since = Utc::now()
155 - chrono::Duration::seconds(
156 i64::try_from(config.activity_threshold_secs).unwrap_or(i64::MAX),
157 );
158 let limit = i64::try_from(config.max_concurrent_monitors).unwrap_or(i64::MAX);
159
160 let reflectors = match db::reflectors::get_tier2_eligible(pool, since, limit).await {
161 Ok(rows) => rows,
162 Err(e) => {
163 tracing::warn!(error = %e, "tier2: failed to query eligible reflectors");
164 return;
165 }
166 };
167
168 for row in &reflectors {
169 // Skip if already monitored.
170 if monitors.contains_key(&row.callsign) {
171 continue;
172 }
173
174 // Respect the concurrency cap.
175 if monitors.len() >= config.max_concurrent_monitors {
176 break;
177 }
178
179 // Parse the IP address from the database.
180 let Some(ip_str) = &row.ip_address else {
181 tracing::debug!(
182 reflector = %row.callsign,
183 "tier2: skipping reflector with no IP address"
184 );
185 continue;
186 };
187
188 let ip: IpAddr = match ip_str.parse() {
189 Ok(addr) => addr,
190 Err(e) => {
191 tracing::debug!(
192 reflector = %row.callsign,
193 ip = %ip_str,
194 error = %e,
195 "tier2: skipping reflector with unparseable IP"
196 );
197 continue;
198 }
199 };
200
201 // Attempt to connect the monitor.
202 match XlxMonitor::connect(ip, row.callsign.clone()).await {
203 Ok(mon) => {
204 tracing::info!(
205 reflector = %row.callsign,
206 peer = %mon.peer(),
207 "tier2 monitor connected"
208 );
209 let _prev = monitors.insert(row.callsign.clone(), mon);
210 }
211 Err(e) => {
212 tracing::warn!(
213 reflector = %row.callsign,
214 ip = %ip,
215 error = %e,
216 "tier2: failed to connect monitor"
217 );
218 }
219 }
220 }
221}
222
223/// Polls all monitors via round-robin and returns the first message received
224/// along with the reflector callsign that produced it.
225///
226/// Each monitor is given a 500ms window to produce a message. If no monitor
227/// has data in the quick-poll pass, falls back to a full blocking recv on
228/// the first monitor (which uses the standard 30-second timeout).
229///
230/// With up to 100 monitors, the round-robin worst case is 50 seconds, but in
231/// practice monitors with pending data return immediately.
232async fn poll_any_monitor(
233 callsigns: &[String],
234 monitors: &HashMap<String, XlxMonitor>,
235) -> (String, Option<MonitorMessage>) {
236 // Round-robin poll with 500ms per-monitor timeout. Most monitors with
237 // pending data return immediately; the timeout only fires for idle ones.
238 let poll_timeout = Duration::from_millis(500);
239
240 for callsign in callsigns {
241 if let Some(monitor) = monitors.get(callsign)
242 && let Ok(msg) = tokio::time::timeout(poll_timeout, monitor.recv()).await
243 {
244 return (callsign.clone(), msg);
245 }
246 // This monitor had no data within the poll window — try the next.
247 }
248
249 // All monitors timed out in the quick-poll pass. Do a full blocking recv
250 // on the first monitor (uses the standard 30-second timeout) to avoid
251 // busy-spinning when all monitors are idle.
252 if let Some(callsign) = callsigns.first()
253 && let Some(monitor) = monitors.get(callsign)
254 {
255 let msg = monitor.recv().await;
256 return (callsign.clone(), msg);
257 }
258
259 // Unreachable when callsigns is non-empty (caller checks monitors.is_empty()),
260 // but we must return something for exhaustiveness.
261 callsigns
262 .first()
263 .map_or_else(|| (String::new(), None), |cs| (cs.clone(), None))
264}
265
266/// Dispatches a parsed monitor message to the appropriate handler.
267async fn handle_message(reflector: &str, msg: &MonitorMessage, pool: &sqlx::PgPool) {
268 match msg {
269 MonitorMessage::Reflector(info) => {
270 tracing::info!(
271 reflector = %reflector,
272 reported_name = %info.reflector.trim(),
273 module_count = info.modules.len(),
274 "tier2: reflector info received"
275 );
276 }
277 MonitorMessage::Nodes(nodes) => {
278 handle_nodes_update(reflector, nodes, pool).await;
279 }
280 MonitorMessage::Stations(stations) => {
281 handle_stations_update(reflector, stations, pool).await;
282 }
283 MonitorMessage::OnAir(callsign) => {
284 // TODO: trigger point for Tier 3 auto-promotion. When a station
285 // goes on-air, the orchestrator could signal the Tier 3 manager to
286 // establish a full D-STAR connection for voice capture.
287 tracing::info!(
288 reflector = %reflector,
289 callsign = %callsign.trim(),
290 "tier2: station on-air"
291 );
292 }
293 MonitorMessage::OffAir(callsign) => {
294 tracing::info!(
295 reflector = %reflector,
296 callsign = %callsign.trim(),
297 "tier2: station off-air"
298 );
299 }
300 MonitorMessage::Unknown(raw) => {
301 tracing::debug!(
302 reflector = %reflector,
303 raw_json = %raw,
304 "tier2: unrecognized monitor message"
305 );
306 }
307 }
308}
309
310/// Processes a nodes update: clears stale entries and upserts the fresh snapshot.
311async fn handle_nodes_update(reflector: &str, nodes: &[protocol::NodeInfo], pool: &sqlx::PgPool) {
312 tracing::debug!(
313 reflector = %reflector,
314 node_count = nodes.len(),
315 "tier2: nodes update"
316 );
317
318 // Clear stale nodes for this reflector, then upsert the fresh snapshot.
319 // This simple delete-then-reinsert avoids diff logic.
320 if let Err(e) = db::connected_nodes::clear_for_reflector(pool, reflector).await {
321 tracing::warn!(
322 reflector = %reflector,
323 error = %e,
324 "tier2: failed to clear stale nodes"
325 );
326 return;
327 }
328
329 let now = Utc::now();
330 for node in nodes {
331 // Extract the module letter from the linkedto field.
332 let module = if node.linkedto.is_empty() {
333 None
334 } else {
335 Some(node.linkedto.as_str())
336 };
337
338 if let Err(e) =
339 db::connected_nodes::upsert_node(pool, reflector, &node.callsign, module, now).await
340 {
341 tracing::warn!(
342 reflector = %reflector,
343 node = %node.callsign,
344 error = %e,
345 "tier2: failed to upsert connected node"
346 );
347 }
348 }
349}
350
351/// Processes a stations update: inserts each station as an activity observation.
352async fn handle_stations_update(
353 reflector: &str,
354 stations: &[protocol::StationInfo],
355 pool: &sqlx::PgPool,
356) {
357 tracing::debug!(
358 reflector = %reflector,
359 station_count = stations.len(),
360 "tier2: stations update"
361 );
362
363 let now = Utc::now();
364 for station in stations {
365 let module = if station.module.is_empty() {
366 None
367 } else {
368 Some(station.module.as_str())
369 };
370
371 if let Err(e) = db::activity::insert_observation(
372 pool,
373 reflector,
374 module,
375 station.callsign.trim(),
376 "xlx_monitor",
377 now,
378 )
379 .await
380 {
381 tracing::warn!(
382 reflector = %reflector,
383 station = %station.callsign,
384 error = %e,
385 "tier2: failed to insert station observation"
386 );
387 }
388 }
389}