stargazer/tier2/monitor.rs
1//! Single-reflector XLX UDP JSON monitor client.
2//!
3//! Each [`XlxMonitor`] manages a single UDP socket connected to one XLX
4//! reflector's monitor port (10001). The lifecycle is:
5//!
6//! 1. **Connect**: [`XlxMonitor::connect`] binds a local UDP socket, sends the
7//! `"hello"` handshake datagram, and returns the monitor handle.
8//!
//! 2. **Receive**: [`XlxMonitor::recv`] awaits the next UDP datagram with a
//! 30-second timeout, parses the JSON payload via [`protocol::parse`], and
//! returns the decoded [`MonitorMessage`]. It returns `None` on timeout
//! (signaling that the reflector may be unresponsive), on a socket error,
//! or when the datagram cannot be parsed.
13//!
14//! 3. **Disconnect**: drop the monitor. The [`Drop`] implementation sends
15//! a best-effort `"bye"` datagram so the reflector can clean up its
16//! client entry promptly rather than waiting for a timeout.
17//!
//! # One reflector per monitor
//!
//! A reflector's monitor port serves any number of clients, but each
//! `XlxMonitor` instance talks to exactly one reflector. To monitor
//! multiple reflectors, the orchestrator in [`super::run`] manages a pool of
//! monitors.
24//!
25//! # Timeout behavior
26//!
27//! The 30-second recv timeout is chosen to be 3x the xlxd update period
28//! (~10 seconds). If no data arrives within this window, the reflector is
29//! likely down or the network path is broken. The orchestrator can then
30//! decide to reconnect or replace the monitor.
31//!
32//! # UDP socket binding
33//!
//! Each monitor binds to an ephemeral port on the wildcard address of the
//! reflector's address family (`0.0.0.0:0` for IPv4, `[::]:0` for IPv6). The
//! reflector tracks monitor clients by their source address, so each monitor
//! needs its own port to keep datagrams from different reflectors from
//! interleaving on a shared socket.
38
39use std::io;
40use std::net::{IpAddr, SocketAddr};
41use std::time::Duration;
42
43use tokio::net::UdpSocket;
44
45use super::protocol::{self, MonitorMessage};
46
/// UDP port on which xlxd serves its JSON monitor interface (per the xlxd
/// source code).
const XLX_MONITOR_PORT: u16 = 10_001;

/// How long a single receive waits for a datagram before giving up.
///
/// Three times the roughly 10-second xlxd update period, so a single late
/// update does not make the reflector look unresponsive.
const RECV_TIMEOUT: Duration = Duration::from_secs(3 * 10);

/// Size, in bytes, of the buffer each receive reads into.
///
/// A node dump holds up to 250 entries at roughly 100-150 bytes of JSON each,
/// i.e. well under 40 KB. 65 535 is the largest value the 16-bit UDP length
/// field can express, so the buffer is at least as large as any UDP payload.
const MAX_DATAGRAM_SIZE: usize = 65_535;
59
60/// A UDP client connected to a single XLX reflector's monitor port.
61///
62/// Manages the UDP socket lifecycle and provides async methods for receiving
63/// parsed monitor messages. Create via [`XlxMonitor::connect`] and receive
64/// events via [`XlxMonitor::recv`].
65///
66/// On drop, a best-effort `"bye"` datagram is sent to the reflector so it can
67/// clean up its client tracking state promptly.
68#[derive(Debug)]
69pub(crate) struct XlxMonitor {
70 /// The bound UDP socket, connected to the reflector's monitor port.
71 socket: UdpSocket,
72
73 /// The reflector's monitor endpoint (`ip:10001`).
74 peer: SocketAddr,
75
76 /// Reflector callsign for logging and database correlation.
77 reflector: String,
78}
79
80impl XlxMonitor {
81 /// Connects to a reflector's XLX monitor port and sends the `"hello"`
82 /// handshake.
83 ///
84 /// Binds a new UDP socket on an ephemeral port, "connects" it to the
85 /// reflector's monitor address (so that subsequent `send`/`recv` calls are
86 /// scoped to this peer), and sends the initial `"hello"` datagram.
87 ///
88 /// The reflector will respond with three JSON datagrams (reflector info,
89 /// nodes snapshot, stations snapshot) which can be read via [`recv`](Self::recv).
90 ///
91 /// # Errors
92 ///
93 /// Returns `io::Error` if socket binding or the initial send fails.
94 pub(crate) async fn connect(ip: IpAddr, reflector: String) -> Result<Self, io::Error> {
95 // Bind to an ephemeral port. Use the appropriate wildcard address for
96 // the peer's address family (IPv4 or IPv6).
97 let bind_addr: SocketAddr = if ip.is_ipv4() {
98 SocketAddr::new(IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), 0)
99 } else {
100 SocketAddr::new(IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED), 0)
101 };
102
103 let socket = UdpSocket::bind(bind_addr).await?;
104
105 let peer = SocketAddr::new(ip, XLX_MONITOR_PORT);
106 // Connect the socket to the peer so that send/recv are scoped.
107 socket.connect(peer).await?;
108
109 // Send the handshake "hello" datagram.
110 let _bytes_sent = socket.send(b"hello").await?;
111
112 tracing::debug!(
113 reflector = %reflector,
114 peer = %peer,
115 "xlx monitor connected"
116 );
117
118 Ok(Self {
119 socket,
120 peer,
121 reflector,
122 })
123 }
124
125 /// Receives the next monitor message from the reflector.
126 ///
127 /// Blocks (asynchronously) until a UDP datagram arrives or the 30-second
128 /// timeout expires. Returns:
129 ///
130 /// - `Some(message)` — a successfully parsed [`MonitorMessage`].
131 /// - `None` — the timeout expired (no data received within 30 seconds),
132 /// or the received datagram was not valid JSON and could not be parsed
133 /// at all (not even as `Unknown`).
134 ///
135 /// The caller should treat repeated `None` returns as a signal that the
136 /// reflector is unresponsive and consider reconnecting.
137 pub(crate) async fn recv(&self) -> Option<MonitorMessage> {
138 let mut buf = vec![0u8; MAX_DATAGRAM_SIZE];
139
140 // Apply the recv timeout. If no datagram arrives within the window,
141 // return None to signal potential reflector unresponsiveness.
142 let result = tokio::time::timeout(RECV_TIMEOUT, self.socket.recv(&mut buf)).await;
143
144 match result {
145 Ok(Ok(n)) => {
146 // Successfully received n bytes. Parse the JSON payload.
147 // `n` is bounded by the buffer length (recv writes at most
148 // buf.len() bytes), so get(..n) always returns Some.
149 buf.get(..n).and_then(protocol::parse)
150 }
151 Ok(Err(e)) => {
152 // UDP recv error (unlikely for connected UDP, but possible
153 // with ICMP unreachable or similar).
154 tracing::warn!(
155 reflector = %self.reflector,
156 error = %e,
157 "xlx monitor recv error"
158 );
159 None
160 }
161 Err(_elapsed) => {
162 // Timeout expired — no datagram received within RECV_TIMEOUT.
163 tracing::debug!(
164 reflector = %self.reflector,
165 timeout_secs = RECV_TIMEOUT.as_secs(),
166 "xlx monitor recv timeout"
167 );
168 None
169 }
170 }
171 }
172
173 /// Returns the peer socket address (`ip:10001`).
174 pub(crate) const fn peer(&self) -> SocketAddr {
175 self.peer
176 }
177}
178
179impl Drop for XlxMonitor {
180 /// Best-effort `"bye"` on drop.
181 ///
182 /// Uses `try_send` (non-async, non-blocking) because `Drop` cannot be
183 /// async. If the send fails (e.g., the socket is already closed or the
184 /// runtime is shutting down), the failure is silently ignored — the
185 /// reflector will eventually time out the client entry on its own.
186 fn drop(&mut self) {
187 // try_send is the synchronous equivalent for connected UDP sockets.
188 // It will fail if the send buffer is full or the socket is closed,
189 // but that's acceptable for a best-effort cleanup.
190 let _result = self.socket.try_send(b"bye");
191 }
192}