// stargazer/tier2/mod.rs

//! Tier 2: XLX live monitoring via UDP JSON protocol.
//!
//! Maintains concurrent UDP connections to active XLX reflectors on port 10001,
//! receiving real-time push notifications about connected nodes, heard stations,
//! and on-air/off-air events.
//!
//! The monitor pool is activity-driven:
//!
//! - Reflectors detected as active by Tier 1 are connected (up to the
//!   configured maximum).
//! - Reflectors idle beyond the configured threshold are disconnected to free
//!   slots.
//! - Newly active reflectors are connected as Tier 1 detects them.
//!
//! Events are written to the `activity_log` and `connected_nodes` `PostgreSQL`
//! tables, and on-air events can trigger Tier 3 auto-promotion for voice
//! capture.

19mod monitor;
20mod protocol;
21
22use std::collections::HashMap;
23use std::net::IpAddr;
24use std::time::Duration;
25
26use chrono::Utc;
27
28use crate::config::Tier2Config;
29use crate::db;
30
31use self::monitor::XlxMonitor;
32use self::protocol::MonitorMessage;
33
/// How often to re-query the database for newly active reflectors.
///
/// This is independent of the XLX monitor recv timeout; it controls how
/// quickly the orchestrator discovers reflectors that Tier 1 has flagged
/// as active since the last check. Lowering it discovers newly active
/// reflectors sooner at the cost of more frequent database queries.
const REFRESH_INTERVAL: Duration = Duration::from_secs(60);
40
41/// Runs the Tier 2 XLX monitoring loop.
42///
43/// Manages a pool of UDP JSON monitor connections, connecting and disconnecting
44/// based on Tier 1 activity data. Runs until cancelled.
45///
46/// # Startup behavior
47///
48/// On startup, queries the database for XLX reflectors with
49/// `tier2_available = true` and recent activity (within `activity_threshold_secs`).
50/// Connects to up to `max_concurrent_monitors` of the most recently active
51/// reflectors.
52///
53/// # Main loop
54///
55/// The main loop uses `tokio::select!` to multiplex across:
56///
57/// 1. **Monitor recv**: each active monitor's `recv()` future is polled. When
58///    a message arrives, it is dispatched by type:
59///    - `Nodes`: upserts to `connected_nodes` table.
60///    - `Stations`: inserts observations to `activity_log`.
61///    - `OnAir`/`OffAir`: logged via tracing (potential Tier 3 trigger point).
62///    - `Reflector`: logged once on connect, otherwise ignored.
63///    - `Unknown`: logged at debug level for diagnostics.
64///
65/// 2. **Refresh timer**: every 60 seconds, re-queries the database for newly
66///    eligible reflectors and connects any that are not already monitored.
67///
68/// # Error handling
69///
70/// Individual monitor failures (recv timeout, parse errors) are logged and
71/// the monitor is removed from the pool. The orchestrator continues running
72/// with the remaining monitors. Only a fatal error (e.g., database pool
73/// closed) causes the function to return.
74///
75/// # Errors
76///
77/// Returns an error if a fatal, non-retryable failure occurs (e.g., the
78/// database pool is closed or initial reflector query fails).
79pub(crate) async fn run(
80    config: Tier2Config,
81    pool: sqlx::PgPool,
82) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
83    tracing::info!(
84        max_concurrent_monitors = config.max_concurrent_monitors,
85        idle_disconnect_secs = config.idle_disconnect_secs,
86        activity_threshold_secs = config.activity_threshold_secs,
87        "tier2 XLX monitoring starting"
88    );
89
90    let mut monitors: HashMap<String, XlxMonitor> = HashMap::new();
91    let mut refresh_interval = tokio::time::interval(REFRESH_INTERVAL);
92
93    // Initial connect pass: query eligible reflectors and connect monitors.
94    connect_eligible_monitors(&config, &pool, &mut monitors).await;
95
96    tracing::info!(
97        active_monitors = monitors.len(),
98        "tier2 initial monitor pool established"
99    );
100
101    // Main event loop: multiplex monitor recv and periodic refresh.
102    loop {
103        // If we have no active monitors, just wait for the refresh timer
104        // to try connecting new ones.
105        if monitors.is_empty() {
106            let _tick = refresh_interval.tick().await;
107            connect_eligible_monitors(&config, &pool, &mut monitors).await;
108            continue;
109        }
110
111        // Poll all active monitors concurrently. We collect the callsigns
112        // into a Vec first to avoid borrow conflicts with the HashMap.
113        let callsigns: Vec<String> = monitors.keys().cloned().collect();
114
115        tokio::select! {
116            // Refresh timer: check for newly eligible reflectors.
117            _ = refresh_interval.tick() => {
118                connect_eligible_monitors(&config, &pool, &mut monitors).await;
119            }
120
121            // Monitor recv: process the first message from any monitor.
122            // We use a helper that polls all monitors and returns the first
123            // result along with the reflector callsign.
124            result = poll_any_monitor(&callsigns, &monitors) => {
125                let (callsign, message) = result;
126
127                if let Some(msg) = message {
128                    handle_message(&callsign, &msg, &pool).await;
129                } else {
130                    // Recv returned None — timeout or error. Remove the
131                    // monitor so it can be reconnected on the next refresh.
132                    tracing::info!(
133                        reflector = %callsign,
134                        "tier2 monitor unresponsive, removing from pool"
135                    );
136                    // Drop sends best-effort "bye".
137                    let _removed = monitors.remove(&callsign);
138                }
139            }
140        }
141    }
142}
143
144/// Queries the database for tier2-eligible reflectors and connects monitors
145/// for any that are not already in the pool.
146///
147/// Respects the `max_concurrent_monitors` cap. Only connects to reflectors
148/// that have a valid IP address, `tier2_available = true`, and recent activity.
149async fn connect_eligible_monitors(
150    config: &Tier2Config,
151    pool: &sqlx::PgPool,
152    monitors: &mut HashMap<String, XlxMonitor>,
153) {
154    let since = Utc::now()
155        - chrono::Duration::seconds(
156            i64::try_from(config.activity_threshold_secs).unwrap_or(i64::MAX),
157        );
158    let limit = i64::try_from(config.max_concurrent_monitors).unwrap_or(i64::MAX);
159
160    let reflectors = match db::reflectors::get_tier2_eligible(pool, since, limit).await {
161        Ok(rows) => rows,
162        Err(e) => {
163            tracing::warn!(error = %e, "tier2: failed to query eligible reflectors");
164            return;
165        }
166    };
167
168    for row in &reflectors {
169        // Skip if already monitored.
170        if monitors.contains_key(&row.callsign) {
171            continue;
172        }
173
174        // Respect the concurrency cap.
175        if monitors.len() >= config.max_concurrent_monitors {
176            break;
177        }
178
179        // Parse the IP address from the database.
180        let Some(ip_str) = &row.ip_address else {
181            tracing::debug!(
182                reflector = %row.callsign,
183                "tier2: skipping reflector with no IP address"
184            );
185            continue;
186        };
187
188        let ip: IpAddr = match ip_str.parse() {
189            Ok(addr) => addr,
190            Err(e) => {
191                tracing::debug!(
192                    reflector = %row.callsign,
193                    ip = %ip_str,
194                    error = %e,
195                    "tier2: skipping reflector with unparseable IP"
196                );
197                continue;
198            }
199        };
200
201        // Attempt to connect the monitor.
202        match XlxMonitor::connect(ip, row.callsign.clone()).await {
203            Ok(mon) => {
204                tracing::info!(
205                    reflector = %row.callsign,
206                    peer = %mon.peer(),
207                    "tier2 monitor connected"
208                );
209                let _prev = monitors.insert(row.callsign.clone(), mon);
210            }
211            Err(e) => {
212                tracing::warn!(
213                    reflector = %row.callsign,
214                    ip = %ip,
215                    error = %e,
216                    "tier2: failed to connect monitor"
217                );
218            }
219        }
220    }
221}
222
223/// Polls all monitors via round-robin and returns the first message received
224/// along with the reflector callsign that produced it.
225///
226/// Each monitor is given a 500ms window to produce a message. If no monitor
227/// has data in the quick-poll pass, falls back to a full blocking recv on
228/// the first monitor (which uses the standard 30-second timeout).
229///
230/// With up to 100 monitors, the round-robin worst case is 50 seconds, but in
231/// practice monitors with pending data return immediately.
232async fn poll_any_monitor(
233    callsigns: &[String],
234    monitors: &HashMap<String, XlxMonitor>,
235) -> (String, Option<MonitorMessage>) {
236    // Round-robin poll with 500ms per-monitor timeout. Most monitors with
237    // pending data return immediately; the timeout only fires for idle ones.
238    let poll_timeout = Duration::from_millis(500);
239
240    for callsign in callsigns {
241        if let Some(monitor) = monitors.get(callsign)
242            && let Ok(msg) = tokio::time::timeout(poll_timeout, monitor.recv()).await
243        {
244            return (callsign.clone(), msg);
245        }
246        // This monitor had no data within the poll window — try the next.
247    }
248
249    // All monitors timed out in the quick-poll pass. Do a full blocking recv
250    // on the first monitor (uses the standard 30-second timeout) to avoid
251    // busy-spinning when all monitors are idle.
252    if let Some(callsign) = callsigns.first()
253        && let Some(monitor) = monitors.get(callsign)
254    {
255        let msg = monitor.recv().await;
256        return (callsign.clone(), msg);
257    }
258
259    // Unreachable when callsigns is non-empty (caller checks monitors.is_empty()),
260    // but we must return something for exhaustiveness.
261    callsigns
262        .first()
263        .map_or_else(|| (String::new(), None), |cs| (cs.clone(), None))
264}
265
266/// Dispatches a parsed monitor message to the appropriate handler.
267async fn handle_message(reflector: &str, msg: &MonitorMessage, pool: &sqlx::PgPool) {
268    match msg {
269        MonitorMessage::Reflector(info) => {
270            tracing::info!(
271                reflector = %reflector,
272                reported_name = %info.reflector.trim(),
273                module_count = info.modules.len(),
274                "tier2: reflector info received"
275            );
276        }
277        MonitorMessage::Nodes(nodes) => {
278            handle_nodes_update(reflector, nodes, pool).await;
279        }
280        MonitorMessage::Stations(stations) => {
281            handle_stations_update(reflector, stations, pool).await;
282        }
283        MonitorMessage::OnAir(callsign) => {
284            // TODO: trigger point for Tier 3 auto-promotion. When a station
285            // goes on-air, the orchestrator could signal the Tier 3 manager to
286            // establish a full D-STAR connection for voice capture.
287            tracing::info!(
288                reflector = %reflector,
289                callsign = %callsign.trim(),
290                "tier2: station on-air"
291            );
292        }
293        MonitorMessage::OffAir(callsign) => {
294            tracing::info!(
295                reflector = %reflector,
296                callsign = %callsign.trim(),
297                "tier2: station off-air"
298            );
299        }
300        MonitorMessage::Unknown(raw) => {
301            tracing::debug!(
302                reflector = %reflector,
303                raw_json = %raw,
304                "tier2: unrecognized monitor message"
305            );
306        }
307    }
308}
309
310/// Processes a nodes update: clears stale entries and upserts the fresh snapshot.
311async fn handle_nodes_update(reflector: &str, nodes: &[protocol::NodeInfo], pool: &sqlx::PgPool) {
312    tracing::debug!(
313        reflector = %reflector,
314        node_count = nodes.len(),
315        "tier2: nodes update"
316    );
317
318    // Clear stale nodes for this reflector, then upsert the fresh snapshot.
319    // This simple delete-then-reinsert avoids diff logic.
320    if let Err(e) = db::connected_nodes::clear_for_reflector(pool, reflector).await {
321        tracing::warn!(
322            reflector = %reflector,
323            error = %e,
324            "tier2: failed to clear stale nodes"
325        );
326        return;
327    }
328
329    let now = Utc::now();
330    for node in nodes {
331        // Extract the module letter from the linkedto field.
332        let module = if node.linkedto.is_empty() {
333            None
334        } else {
335            Some(node.linkedto.as_str())
336        };
337
338        if let Err(e) =
339            db::connected_nodes::upsert_node(pool, reflector, &node.callsign, module, now).await
340        {
341            tracing::warn!(
342                reflector = %reflector,
343                node = %node.callsign,
344                error = %e,
345                "tier2: failed to upsert connected node"
346            );
347        }
348    }
349}
350
351/// Processes a stations update: inserts each station as an activity observation.
352async fn handle_stations_update(
353    reflector: &str,
354    stations: &[protocol::StationInfo],
355    pool: &sqlx::PgPool,
356) {
357    tracing::debug!(
358        reflector = %reflector,
359        station_count = stations.len(),
360        "tier2: stations update"
361    );
362
363    let now = Utc::now();
364    for station in stations {
365        let module = if station.module.is_empty() {
366            None
367        } else {
368            Some(station.module.as_str())
369        };
370
371        if let Err(e) = db::activity::insert_observation(
372            pool,
373            reflector,
374            module,
375            station.callsign.trim(),
376            "xlx_monitor",
377            now,
378        )
379        .await
380        {
381            tracing::warn!(
382                reflector = %reflector,
383                station = %station.callsign,
384                error = %e,
385                "tier2: failed to insert station observation"
386            );
387        }
388    }
389}