stargazer/tier2/
monitor.rs

1//! Single-reflector XLX UDP JSON monitor client.
2//!
3//! Each [`XlxMonitor`] manages a single UDP socket connected to one XLX
4//! reflector's monitor port (10001). The lifecycle is:
5//!
6//! 1. **Connect**: [`XlxMonitor::connect`] binds a local UDP socket, sends the
7//!    `"hello"` handshake datagram, and returns the monitor handle.
8//!
//! 2. **Receive**: [`XlxMonitor::recv`] awaits the next UDP datagram with a
//!    30-second timeout, parses the JSON payload via [`protocol::parse`], and
//!    returns the decoded [`MonitorMessage`]. It returns `None` when the
//!    timeout expires (a sign that the reflector may be unresponsive), when
//!    the socket reports an error, or when the payload cannot be parsed.
13//!
14//! 3. **Disconnect**: drop the monitor. The [`Drop`] implementation sends
15//!    a best-effort `"bye"` datagram so the reflector can clean up its
16//!    client entry promptly rather than waiting for a timeout.
17//!
//! # Single-reflector scope
//!
//! An XLX reflector's monitor port accepts datagrams from any client, but
//! each `XlxMonitor` instance talks to exactly one reflector. To monitor
//! multiple reflectors, the orchestrator in [`super::run`] manages a pool of
//! monitors.
24//!
25//! # Timeout behavior
26//!
27//! The 30-second recv timeout is chosen to be 3x the xlxd update period
28//! (~10 seconds). If no data arrives within this window, the reflector is
29//! likely down or the network path is broken. The orchestrator can then
30//! decide to reconnect or replace the monitor.
31//!
32//! # UDP socket binding
33//!
//! Each monitor binds its own socket to an ephemeral port on the wildcard
//! address matching the reflector's address family (`0.0.0.0:0` for IPv4
//! reflectors, `[::]:0` for IPv6 reflectors). The server identifies monitor
//! clients by their source address (IP and port), so each monitor needs its
//! own port to avoid datagram interleaving.
38
39use std::io;
40use std::net::{IpAddr, SocketAddr};
41use std::time::Duration;
42
43use tokio::net::UdpSocket;
44
45use super::protocol::{self, MonitorMessage};
46
/// XLX monitor port as defined by the xlxd source code.
const XLX_MONITOR_PORT: u16 = 10001;

/// Recv timeout: 3x the xlxd ~10-second update period.
const RECV_TIMEOUT: Duration = Duration::from_secs(30);

/// Receive buffer size for a single XLX monitor datagram.
///
/// XLX node dumps can contain up to 250 entries, and each JSON node object
/// is roughly 100-150 bytes, so realistic messages stay well below this size.
///
/// 65535 is the largest value of the 16-bit UDP length field, which counts
/// the 8-byte UDP header as well. The actual payload ceiling is therefore
/// lower (65507 bytes over IPv4 once the IP header is also subtracted). A
/// buffer of this size can hold any non-jumbogram UDP payload without
/// truncation.
const MAX_DATAGRAM_SIZE: usize = 65535;
59
60/// A UDP client connected to a single XLX reflector's monitor port.
61///
62/// Manages the UDP socket lifecycle and provides async methods for receiving
63/// parsed monitor messages. Create via [`XlxMonitor::connect`] and receive
64/// events via [`XlxMonitor::recv`].
65///
66/// On drop, a best-effort `"bye"` datagram is sent to the reflector so it can
67/// clean up its client tracking state promptly.
68#[derive(Debug)]
69pub(crate) struct XlxMonitor {
70    /// The bound UDP socket, connected to the reflector's monitor port.
71    socket: UdpSocket,
72
73    /// The reflector's monitor endpoint (`ip:10001`).
74    peer: SocketAddr,
75
76    /// Reflector callsign for logging and database correlation.
77    reflector: String,
78}
79
80impl XlxMonitor {
81    /// Connects to a reflector's XLX monitor port and sends the `"hello"`
82    /// handshake.
83    ///
84    /// Binds a new UDP socket on an ephemeral port, "connects" it to the
85    /// reflector's monitor address (so that subsequent `send`/`recv` calls are
86    /// scoped to this peer), and sends the initial `"hello"` datagram.
87    ///
88    /// The reflector will respond with three JSON datagrams (reflector info,
89    /// nodes snapshot, stations snapshot) which can be read via [`recv`](Self::recv).
90    ///
91    /// # Errors
92    ///
93    /// Returns `io::Error` if socket binding or the initial send fails.
94    pub(crate) async fn connect(ip: IpAddr, reflector: String) -> Result<Self, io::Error> {
95        // Bind to an ephemeral port. Use the appropriate wildcard address for
96        // the peer's address family (IPv4 or IPv6).
97        let bind_addr: SocketAddr = if ip.is_ipv4() {
98            SocketAddr::new(IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), 0)
99        } else {
100            SocketAddr::new(IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED), 0)
101        };
102
103        let socket = UdpSocket::bind(bind_addr).await?;
104
105        let peer = SocketAddr::new(ip, XLX_MONITOR_PORT);
106        // Connect the socket to the peer so that send/recv are scoped.
107        socket.connect(peer).await?;
108
109        // Send the handshake "hello" datagram.
110        let _bytes_sent = socket.send(b"hello").await?;
111
112        tracing::debug!(
113            reflector = %reflector,
114            peer = %peer,
115            "xlx monitor connected"
116        );
117
118        Ok(Self {
119            socket,
120            peer,
121            reflector,
122        })
123    }
124
125    /// Receives the next monitor message from the reflector.
126    ///
127    /// Blocks (asynchronously) until a UDP datagram arrives or the 30-second
128    /// timeout expires. Returns:
129    ///
130    /// - `Some(message)` — a successfully parsed [`MonitorMessage`].
131    /// - `None` — the timeout expired (no data received within 30 seconds),
132    ///   or the received datagram was not valid JSON and could not be parsed
133    ///   at all (not even as `Unknown`).
134    ///
135    /// The caller should treat repeated `None` returns as a signal that the
136    /// reflector is unresponsive and consider reconnecting.
137    pub(crate) async fn recv(&self) -> Option<MonitorMessage> {
138        let mut buf = vec![0u8; MAX_DATAGRAM_SIZE];
139
140        // Apply the recv timeout. If no datagram arrives within the window,
141        // return None to signal potential reflector unresponsiveness.
142        let result = tokio::time::timeout(RECV_TIMEOUT, self.socket.recv(&mut buf)).await;
143
144        match result {
145            Ok(Ok(n)) => {
146                // Successfully received n bytes. Parse the JSON payload.
147                // `n` is bounded by the buffer length (recv writes at most
148                // buf.len() bytes), so get(..n) always returns Some.
149                buf.get(..n).and_then(protocol::parse)
150            }
151            Ok(Err(e)) => {
152                // UDP recv error (unlikely for connected UDP, but possible
153                // with ICMP unreachable or similar).
154                tracing::warn!(
155                    reflector = %self.reflector,
156                    error = %e,
157                    "xlx monitor recv error"
158                );
159                None
160            }
161            Err(_elapsed) => {
162                // Timeout expired — no datagram received within RECV_TIMEOUT.
163                tracing::debug!(
164                    reflector = %self.reflector,
165                    timeout_secs = RECV_TIMEOUT.as_secs(),
166                    "xlx monitor recv timeout"
167                );
168                None
169            }
170        }
171    }
172
173    /// Returns the peer socket address (`ip:10001`).
174    pub(crate) const fn peer(&self) -> SocketAddr {
175        self.peer
176    }
177}
178
179impl Drop for XlxMonitor {
180    /// Best-effort `"bye"` on drop.
181    ///
182    /// Uses `try_send` (non-async, non-blocking) because `Drop` cannot be
183    /// async. If the send fails (e.g., the socket is already closed or the
184    /// runtime is shutting down), the failure is silently ignored — the
185    /// reflector will eventually time out the client entry on its own.
186    fn drop(&mut self) {
187        // try_send is the synchronous equivalent for connected UDP sockets.
188        // It will fail if the send buffer is full or the socket is closed,
189        // but that's acceptable for a best-effort cleanup.
190        let _result = self.socket.try_send(b"bye");
191    }
192}