aprs_is/
client.rs

1//! Async TCP client for APRS-IS (Internet Service).
2//!
3//! Provides a [`AprsIsClient`] that connects to an APRS-IS server over
4//! TCP, authenticates, and exchanges APRS packets as line-delimited text.
5//! This is the complement to the pure-data helpers at the crate root
6//! (e.g. [`crate::parse_is_line`], [`crate::format_is_packet`],
7//! [`crate::build_login_string`]), which stay transport-agnostic.
8//!
9//! # Usage
10//!
11//! ```no_run
12//! use aprs_is::{AprsIsClient, AprsIsConfig, AprsIsEvent};
13//!
14//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
15//! let mut config = AprsIsConfig::new("N0CALL");
16//! config.filter = "r/35.25/-97.75/100".to_owned();
17//! let mut client = AprsIsClient::connect(config).await?;
18//!
19//! loop {
20//!     match client.next_event().await? {
21//!         AprsIsEvent::Packet(line) => println!("Got: {line}"),
22//!         AprsIsEvent::Comment(line) => println!("Server: {line}"),
23//!         AprsIsEvent::LoggedIn { server } => {
24//!             println!("Authenticated (server {server:?})");
25//!         }
26//!         AprsIsEvent::LoginRejected { reason } => {
27//!             println!("Login rejected: {reason}");
28//!             break;
29//!         }
30//!         AprsIsEvent::Disconnected => break,
31//!     }
32//! }
33//! # Ok(())
34//! # }
35//! ```
36//!
37//! # Keepalive
38//!
39//! APRS-IS expects a comment line every ~2 minutes if the client is
40//! otherwise idle. Call [`AprsIsClient::send_keepalive`] on a timer, or
41//! use [`AprsIsClient::maybe_send_keepalive`] which only sends if the
42//! keepalive interval has elapsed since the last write.
43//!
44//! # Reconnection
45//!
46//! On [`AprsIsEvent::Disconnected`], call [`AprsIsClient::reconnect`] to
47//! re-establish the TCP connection and re-login. Callers typically wrap
48//! this in an exponential backoff loop.
49
50use std::time::{Duration, Instant};
51
52use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
53use tokio::net::TcpStream;
54use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
55
56use crate::error::AprsIsError;
57use crate::events::AprsIsEvent;
58use crate::line::{format_is_packet, parse_is_line};
59use crate::login::{AprsIsConfig, build_login_string};
60
61/// Extract the server hostname from a `# logresp ... verified, server X`
62/// comment line. Returns `None` if the `server` clause is absent.
63fn parse_logresp_server(line: &str) -> Option<String> {
64    let idx = line.find("server ")?;
65    let rest = line.get(idx + "server ".len()..)?;
66    // Skip any extra whitespace after "server" and take the next
67    // whitespace-delimited token.
68    let name = rest
69        .split_whitespace()
70        .next()
71        .map(|s| s.trim_matches(',').to_owned())?;
72    if name.is_empty() { None } else { Some(name) }
73}
74
75/// Default APRS-IS keepalive interval.
76///
77/// APRS-IS servers expect the client to send something (a packet or a
78/// comment line) at least every 2 minutes or they may disconnect.
79pub const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(120);
80
81/// Default connect timeout for the initial TCP handshake + login.
82pub const CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
83
84/// Keepalive comment text (sent as `# aprs-is keepalive\r\n`).
85const KEEPALIVE_COMMENT: &str = "# aprs-is keepalive";
86
87/// Async TCP client for APRS-IS.
88///
89/// Owns a single TCP connection to an APRS-IS server, handles the login
90/// handshake, and exposes line-at-a-time read/write methods.
91///
92/// Not `Clone` and not `Send`-across-the-await — typical usage is to own
93/// it from a single task.
94///
95/// # TLS support
96///
97/// This client speaks plaintext TCP only. APRS-IS T2 servers also
98/// support TLS on port 24580 — to use it, build the connection
99/// yourself with your preferred TLS library (e.g. `tokio-rustls` or
100/// `tokio-native-tls`) and use the line-level helpers at the crate
101/// root ([`crate::build_login_string`], [`crate::format_is_packet`],
102/// [`crate::AprsIsLine`]):
103///
104/// ```no_run
105/// use aprs_is::{AprsIsConfig, AprsIsLine, build_login_string, format_is_packet};
106/// // 1. TLS handshake against `core.aprs2.net:24580` using your TLS library.
107/// // 2. Send the result of `build_login_string(&config)` over the stream.
108/// // 3. Read lines from the stream and parse them with `AprsIsLine::parse`.
109/// // 4. Send packets formatted via `format_is_packet`.
110/// # let _ = (AprsIsConfig::new("N0CALL"), build_login_string, format_is_packet,
111/// #     |line: &str| AprsIsLine::parse(line));
112/// ```
113///
114/// The library deliberately does not bundle a TLS implementation so
115/// callers can choose their preferred backend.
116#[derive(Debug)]
117pub struct AprsIsClient {
118    config: AprsIsConfig,
119    reader: BufReader<OwnedReadHalf>,
120    writer: OwnedWriteHalf,
121    line_buf: String,
122    last_write: Instant,
123    logged_in_emitted: bool,
124}
125
126impl AprsIsClient {
127    /// Connect to the APRS-IS server and perform the login handshake.
128    ///
129    /// Performs TCP connect, sends the login string, and returns as soon
130    /// as the socket is writable. Login verification (the `# logresp`
131    /// line) is reported asynchronously via [`AprsIsEvent::LoggedIn`]
132    /// from [`next_event`](Self::next_event).
133    ///
134    /// Times out after [`CONNECT_TIMEOUT`] (10 seconds) during TCP connect.
135    ///
136    /// # Errors
137    ///
138    /// Returns [`AprsIsError::Connect`] if TCP connect fails or times out,
139    /// or [`AprsIsError::Write`] if the login string cannot be sent.
140    pub async fn connect(config: AprsIsConfig) -> Result<Self, AprsIsError> {
141        let addr = format!("{}:{}", config.server, config.port);
142        tracing::info!(server = %addr, callsign = %config.callsign, "APRS-IS connecting");
143
144        let stream = tokio::time::timeout(CONNECT_TIMEOUT, TcpStream::connect(&addr))
145            .await
146            .map_err(|_| {
147                AprsIsError::Connect(std::io::Error::new(
148                    std::io::ErrorKind::TimedOut,
149                    "TCP connect timed out",
150                ))
151            })?
152            .map_err(AprsIsError::Connect)?;
153
154        let (read_half, mut write_half) = stream.into_split();
155
156        // Send login string.
157        let login = build_login_string(&config);
158        write_half
159            .write_all(login.as_bytes())
160            .await
161            .map_err(AprsIsError::Write)?;
162        write_half.flush().await.map_err(AprsIsError::Write)?;
163
164        tracing::debug!("APRS-IS login sent");
165
166        Ok(Self {
167            config,
168            reader: BufReader::new(read_half),
169            writer: write_half,
170            line_buf: String::with_capacity(512),
171            last_write: Instant::now(),
172            logged_in_emitted: false,
173        })
174    }
175
176    /// Connect with exponential backoff.
177    ///
178    /// Retries the TCP connection up to `max_attempts` times, doubling
179    /// the delay from 1 second up to a cap of 60 seconds between attempts.
180    /// Pass `None` for `max_attempts` to retry forever.
181    ///
182    /// # Errors
183    ///
184    /// Returns the last [`AprsIsError`] after exhausting all attempts.
185    pub async fn connect_with_retry(
186        config: AprsIsConfig,
187        max_attempts: Option<u32>,
188    ) -> Result<Self, AprsIsError> {
189        let mut delay = Duration::from_secs(1);
190        let mut attempt: u32 = 0;
191        loop {
192            attempt += 1;
193            match Self::connect(config.clone()).await {
194                Ok(client) => return Ok(client),
195                Err(e) => {
196                    if max_attempts.is_some_and(|max| attempt >= max) {
197                        return Err(e);
198                    }
199                    tracing::warn!(
200                        attempt,
201                        error = %e,
202                        retry_in_secs = delay.as_secs(),
203                        "APRS-IS connect failed, retrying"
204                    );
205                    tokio::time::sleep(delay).await;
206                    delay = (delay * 2).min(Duration::from_secs(60));
207                }
208            }
209        }
210    }
211
212    /// Reconnect to the APRS-IS server after a disconnect.
213    ///
214    /// Drops the current connection (if any) and performs a fresh
215    /// connect + login. Preserves the configuration.
216    ///
217    /// # Errors
218    ///
219    /// Returns [`AprsIsError::Connect`] if the TCP connect fails or
220    /// [`AprsIsError::Write`] if the login string cannot be sent.
221    pub async fn reconnect(&mut self) -> Result<(), AprsIsError> {
222        tracing::info!("APRS-IS reconnecting");
223        let new = Self::connect(self.config.clone()).await?;
224        self.reader = new.reader;
225        self.writer = new.writer;
226        self.line_buf.clear();
227        self.last_write = new.last_write;
228        self.logged_in_emitted = false;
229        Ok(())
230    }
231
232    /// Read the next event from the server.
233    ///
234    /// Returns when a complete line arrives or the connection closes.
235    /// This is a blocking read — wrap in a `tokio::select!` with a
236    /// keepalive timer if you need concurrency.
237    ///
238    /// # Errors
239    ///
240    /// Returns [`AprsIsError::Read`] on socket errors.
241    pub async fn next_event(&mut self) -> Result<AprsIsEvent, AprsIsError> {
242        self.line_buf.clear();
243        let bytes = self
244            .reader
245            .read_line(&mut self.line_buf)
246            .await
247            .map_err(AprsIsError::Read)?;
248
249        if bytes == 0 {
250            tracing::info!("APRS-IS connection closed by server");
251            return Ok(AprsIsEvent::Disconnected);
252        }
253
254        let line = self.line_buf.trim_end_matches(['\r', '\n']);
255
256        if let Some(packet) = parse_is_line(line) {
257            return Ok(AprsIsEvent::Packet(packet.to_owned()));
258        }
259
260        // Comment line. Check for login response on first one.
261        if !self.logged_in_emitted && line.contains("logresp") {
262            // The verified response has the form
263            //   "# logresp CALL verified, server T2FOO"
264            // and the rejected response has
265            //   "# logresp CALL unverified, ..."
266            // We have to check `unverified` before `verified` because the
267            // latter is a substring of the former.
268            if line.contains("unverified") {
269                self.logged_in_emitted = true;
270                tracing::warn!(response = %line, "APRS-IS login rejected");
271                return Ok(AprsIsEvent::LoginRejected {
272                    reason: line.to_owned(),
273                });
274            }
275            if line.contains("verified") {
276                self.logged_in_emitted = true;
277                let server = parse_logresp_server(line);
278                tracing::info!(response = %line, ?server, "APRS-IS login verified");
279                return Ok(AprsIsEvent::LoggedIn { server });
280            }
281        }
282
283        Ok(AprsIsEvent::Comment(line.to_owned()))
284    }
285
286    /// Send a formatted APRS packet to the server.
287    ///
288    /// The packet is formatted as `source>destination,path:data\r\n` via
289    /// [`crate::format_is_packet`] and written to the TCP socket.
290    ///
291    /// # Errors
292    ///
293    /// Returns [`AprsIsError::Write`] if the write fails.
294    pub async fn send_packet(
295        &mut self,
296        source: &str,
297        destination: &str,
298        path: &[&str],
299        data: &str,
300    ) -> Result<(), AprsIsError> {
301        let line = format_is_packet(source, destination, path, data);
302        self.send_raw_line(&line).await
303    }
304
305    /// Send a raw line to the server (must already be CRLF-terminated).
306    ///
307    /// Use this for custom formatting or to forward packets from RF.
308    ///
309    /// # Errors
310    ///
311    /// Returns [`AprsIsError::Write`] if the write fails.
312    pub async fn send_raw_line(&mut self, line: &str) -> Result<(), AprsIsError> {
313        self.writer
314            .write_all(line.as_bytes())
315            .await
316            .map_err(AprsIsError::Write)?;
317        self.writer.flush().await.map_err(AprsIsError::Write)?;
318        self.last_write = Instant::now();
319        Ok(())
320    }
321
322    /// Send a keepalive comment line unconditionally.
323    ///
324    /// Sends `# aprs-is keepalive\r\n` to the server. Call this
325    /// on a timer or use [`maybe_send_keepalive`](Self::maybe_send_keepalive)
326    /// to only send if the interval has elapsed.
327    ///
328    /// # Errors
329    ///
330    /// Returns [`AprsIsError::Write`] if the write fails.
331    pub async fn send_keepalive(&mut self) -> Result<(), AprsIsError> {
332        self.send_raw_line(&format!("{KEEPALIVE_COMMENT}\r\n"))
333            .await
334    }
335
336    /// Send a keepalive if the keepalive interval has elapsed.
337    ///
338    /// No-op if less than [`KEEPALIVE_INTERVAL`] has passed since the
339    /// last write of any kind (keepalive or packet).
340    ///
341    /// # Errors
342    ///
343    /// Returns [`AprsIsError::Write`] if the write fails.
344    pub async fn maybe_send_keepalive(&mut self) -> Result<(), AprsIsError> {
345        if self.last_write.elapsed() >= KEEPALIVE_INTERVAL {
346            self.send_keepalive().await?;
347        }
348        Ok(())
349    }
350
351    /// Get the configuration this client was created with.
352    #[must_use]
353    pub const fn config(&self) -> &AprsIsConfig {
354        &self.config
355    }
356
357    /// Gracefully shut down the TCP connection.
358    ///
359    /// # Errors
360    ///
361    /// Returns [`AprsIsError::Write`] if the shutdown flush fails.
362    pub async fn shutdown(mut self) -> Result<(), AprsIsError> {
363        tracing::debug!("APRS-IS shutting down");
364        self.writer.shutdown().await.map_err(AprsIsError::Write)?;
365        Ok(())
366    }
367}
368
369#[cfg(test)]
370mod tests {
371    use super::*;
372    use crate::login::Passcode;
373    use std::future::Future;
374    use tokio::io::AsyncReadExt as _;
375    use tokio::net::TcpListener;
376
377    type TestResult = Result<(), Box<dyn std::error::Error>>;
378
379    /// Read up to `buf.len()` bytes from `stream`. Returns the number of
380    /// bytes read, or panics via `assert!` in the test handler on I/O
381    /// error — the handler is spawned on a tokio task and must not
382    /// leak an `?` beyond the `async move` body.
383    async fn read_some(stream: &mut TcpStream, buf: &mut [u8]) -> Option<usize> {
384        stream.read(buf).await.ok().filter(|n| *n > 0)
385    }
386
387    /// Write all of `data` to `stream`; swallow any I/O error since the
388    /// test will fail separately if the client doesn't see the line.
389    async fn write_all_ignore(stream: &mut TcpStream, data: &[u8]) {
390        if let Err(err) = stream.write_all(data).await {
391            tracing::debug!(%err, "mock server write_all error");
392        }
393    }
394
395    /// Spawn a mock APRS-IS server that accepts one connection, reads
396    /// the login line, and runs the given handler.
397    ///
398    /// Returns the bound `SocketAddr` so tests can connect to it.
399    async fn spawn_mock_server<F, Fut>(handler: F) -> Result<std::net::SocketAddr, std::io::Error>
400    where
401        F: FnOnce(TcpStream) -> Fut + Send + 'static,
402        Fut: Future<Output = ()> + Send,
403    {
404        let listener = TcpListener::bind("127.0.0.1:0").await?;
405        let addr = listener.local_addr()?;
406        drop(tokio::spawn(async move {
407            if let Ok((stream, _)) = listener.accept().await {
408                handler(stream).await;
409            }
410        }));
411        Ok(addr)
412    }
413
414    fn test_config(addr: std::net::SocketAddr) -> AprsIsConfig {
415        AprsIsConfig {
416            callsign: "N0CALL".to_owned(),
417            passcode: Passcode::ReceiveOnly,
418            server: addr.ip().to_string(),
419            port: addr.port(),
420            filter: String::new(),
421            software_name: "test".to_owned(),
422            software_version: "0.1".to_owned(),
423        }
424    }
425
426    #[tokio::test]
427    async fn connect_sends_login_string() -> TestResult {
428        let addr = spawn_mock_server(|mut stream| async move {
429            let mut buf = [0u8; 512];
430            let Some(n) = read_some(&mut stream, &mut buf).await else {
431                return;
432            };
433            let Ok(login) = std::str::from_utf8(buf.get(..n).unwrap_or(&[])) else {
434                return;
435            };
436            assert!(
437                login.starts_with("user N0CALL pass -1 vers test 0.1"),
438                "unexpected login: {login:?}"
439            );
440            assert!(login.ends_with("\r\n"), "missing CRLF: {login:?}");
441            tokio::time::sleep(Duration::from_millis(50)).await;
442        })
443        .await?;
444
445        let _client = AprsIsClient::connect(test_config(addr)).await?;
446        Ok(())
447    }
448
449    #[tokio::test]
450    async fn next_event_receives_packet_line() -> TestResult {
451        let addr = spawn_mock_server(|mut stream| async move {
452            let mut buf = [0u8; 512];
453            let _ = read_some(&mut stream, &mut buf).await;
454            write_all_ignore(&mut stream, b"N0CALL>APK005:!4903.50N/07201.75W-Test\r\n").await;
455            tokio::time::sleep(Duration::from_millis(50)).await;
456        })
457        .await?;
458
459        let mut client = AprsIsClient::connect(test_config(addr)).await?;
460        let event = client.next_event().await?;
461        assert!(
462            matches!(event, AprsIsEvent::Packet(ref line) if line == "N0CALL>APK005:!4903.50N/07201.75W-Test"),
463            "expected Packet, got {event:?}"
464        );
465        Ok(())
466    }
467
468    #[tokio::test]
469    async fn next_event_receives_comment_line() -> TestResult {
470        let addr = spawn_mock_server(|mut stream| async move {
471            let mut buf = [0u8; 512];
472            let _ = read_some(&mut stream, &mut buf).await;
473            write_all_ignore(&mut stream, b"# javAPRSSrvr 4.2.0b05\r\n").await;
474            tokio::time::sleep(Duration::from_millis(50)).await;
475        })
476        .await?;
477
478        let mut client = AprsIsClient::connect(test_config(addr)).await?;
479        let event = client.next_event().await?;
480        assert!(
481            matches!(event, AprsIsEvent::Comment(ref line) if line == "# javAPRSSrvr 4.2.0b05"),
482            "expected Comment, got {event:?}"
483        );
484        Ok(())
485    }
486
487    #[tokio::test]
488    async fn next_event_detects_login_verified() -> TestResult {
489        let addr = spawn_mock_server(|mut stream| async move {
490            let mut buf = [0u8; 512];
491            let _ = read_some(&mut stream, &mut buf).await;
492            write_all_ignore(&mut stream, b"# logresp N0CALL verified, server T2TEST\r\n").await;
493            tokio::time::sleep(Duration::from_millis(50)).await;
494        })
495        .await?;
496
497        let mut client = AprsIsClient::connect(test_config(addr)).await?;
498        let event = client.next_event().await?;
499        assert!(
500            matches!(event, AprsIsEvent::LoggedIn { ref server } if server.as_deref() == Some("T2TEST")),
501            "expected LoggedIn, got {event:?}"
502        );
503        Ok(())
504    }
505
506    #[tokio::test]
507    async fn next_event_detects_login_rejected() -> TestResult {
508        let addr = spawn_mock_server(|mut stream| async move {
509            let mut buf = [0u8; 512];
510            let _ = read_some(&mut stream, &mut buf).await;
511            write_all_ignore(
512                &mut stream,
513                b"# logresp N0CALL unverified, server T2TEST\r\n",
514            )
515            .await;
516            tokio::time::sleep(Duration::from_millis(50)).await;
517        })
518        .await?;
519
520        let mut client = AprsIsClient::connect(test_config(addr)).await?;
521        let event = client.next_event().await?;
522        assert!(
523            matches!(event, AprsIsEvent::LoginRejected { ref reason } if reason.contains("unverified")),
524            "expected LoginRejected, got {event:?}"
525        );
526        Ok(())
527    }
528
529    #[test]
530    fn parse_logresp_server_extracts_name() {
531        assert_eq!(
532            parse_logresp_server("# logresp N0CALL verified, server T2TEST"),
533            Some("T2TEST".to_owned())
534        );
535        assert_eq!(
536            parse_logresp_server("# logresp N0CALL verified, server  T2A "),
537            Some("T2A".to_owned())
538        );
539        assert_eq!(parse_logresp_server("# javAPRSSrvr 4.2.0b05"), None);
540    }
541
542    #[tokio::test]
543    async fn next_event_detects_disconnect() -> TestResult {
544        let addr = spawn_mock_server(|mut stream| async move {
545            let mut buf = [0u8; 512];
546            let _ = read_some(&mut stream, &mut buf).await;
547            drop(stream);
548        })
549        .await?;
550
551        let mut client = AprsIsClient::connect(test_config(addr)).await?;
552        let event = client.next_event().await?;
553        assert!(
554            matches!(event, AprsIsEvent::Disconnected),
555            "expected Disconnected, got {event:?}"
556        );
557        Ok(())
558    }
559
560    #[tokio::test]
561    async fn send_packet_formats_line() -> TestResult {
562        let addr = spawn_mock_server(|mut stream| async move {
563            let mut buf = [0u8; 1024];
564            let Some(n) = read_some(&mut stream, &mut buf).await else {
565                return;
566            };
567            let Ok(text) = std::str::from_utf8(buf.get(..n).unwrap_or(&[])) else {
568                return;
569            };
570            assert!(text.contains("user N0CALL"), "login missing: {text:?}");
571            let Some(n) = read_some(&mut stream, &mut buf).await else {
572                return;
573            };
574            let Ok(pkt) = std::str::from_utf8(buf.get(..n).unwrap_or(&[])) else {
575                return;
576            };
577            assert_eq!(
578                pkt, "N0CALL>APK005,WIDE1-1:!4903.50N/07201.75W-Test\r\n",
579                "unexpected packet: {pkt:?}"
580            );
581        })
582        .await?;
583
584        let mut client = AprsIsClient::connect(test_config(addr)).await?;
585        client
586            .send_packet("N0CALL", "APK005", &["WIDE1-1"], "!4903.50N/07201.75W-Test")
587            .await?;
588        tokio::time::sleep(Duration::from_millis(50)).await;
589        Ok(())
590    }
591
592    #[tokio::test]
593    async fn send_keepalive_sends_comment_line() -> TestResult {
594        let addr = spawn_mock_server(|mut stream| async move {
595            let mut buf = [0u8; 1024];
596            let _ = read_some(&mut stream, &mut buf).await;
597            let Some(n) = read_some(&mut stream, &mut buf).await else {
598                return;
599            };
600            let Ok(ka) = std::str::from_utf8(buf.get(..n).unwrap_or(&[])) else {
601                return;
602            };
603            assert!(
604                ka.starts_with("# aprs-is keepalive"),
605                "unexpected keepalive: {ka:?}"
606            );
607            assert!(ka.ends_with("\r\n"), "missing CRLF: {ka:?}");
608        })
609        .await?;
610
611        let mut client = AprsIsClient::connect(test_config(addr)).await?;
612        client.send_keepalive().await?;
613        tokio::time::sleep(Duration::from_millis(50)).await;
614        Ok(())
615    }
616
617    #[tokio::test]
618    async fn maybe_send_keepalive_noop_when_recent() -> TestResult {
619        let addr = spawn_mock_server(|mut stream| async move {
620            let mut buf = [0u8; 1024];
621            let _ = read_some(&mut stream, &mut buf).await;
622            tokio::time::sleep(Duration::from_millis(50)).await;
623        })
624        .await?;
625
626        let mut client = AprsIsClient::connect(test_config(addr)).await?;
627        // Called immediately after connect — last_write is fresh, no send.
628        client.maybe_send_keepalive().await?;
629        Ok(())
630    }
631
632    #[tokio::test]
633    async fn connect_timeout() -> TestResult {
634        // Connect to a non-routable IP to trigger timeout.
635        // Using 198.51.100.1 (TEST-NET-2) which should not respond.
636        let config = AprsIsConfig {
637            callsign: "N0CALL".to_owned(),
638            passcode: Passcode::ReceiveOnly,
639            server: "198.51.100.1".to_owned(),
640            port: 14580,
641            filter: String::new(),
642            software_name: "test".to_owned(),
643            software_version: "0.1".to_owned(),
644        };
645        // Override the timeout for the test — we don't want to wait 10s.
646        // Instead, verify the error path exists by checking connect_with_retry
647        // returns an error with max_attempts=1.
648        let result = tokio::time::timeout(
649            Duration::from_secs(15),
650            AprsIsClient::connect_with_retry(config, Some(1)),
651        )
652        .await;
653        // Either the overall test timeout fires, or the connect fails.
654        // Both are acceptable as long as we don't hang.
655        if let Ok(r) = result {
656            assert!(r.is_err(), "expected connect to fail, got Ok");
657        }
658        Ok(())
659    }
660}