1use std::time::{Duration, Instant};
51
52use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
53use tokio::net::TcpStream;
54use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
55
56use crate::error::AprsIsError;
57use crate::events::AprsIsEvent;
58use crate::line::{format_is_packet, parse_is_line};
59use crate::login::{AprsIsConfig, build_login_string};
60
61fn parse_logresp_server(line: &str) -> Option<String> {
64 let idx = line.find("server ")?;
65 let rest = line.get(idx + "server ".len()..)?;
66 let name = rest
69 .split_whitespace()
70 .next()
71 .map(|s| s.trim_matches(',').to_owned())?;
72 if name.is_empty() { None } else { Some(name) }
73}
74
75pub const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(120);
80
81pub const CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
83
84const KEEPALIVE_COMMENT: &str = "# aprs-is keepalive";
86
87#[derive(Debug)]
117pub struct AprsIsClient {
118 config: AprsIsConfig,
119 reader: BufReader<OwnedReadHalf>,
120 writer: OwnedWriteHalf,
121 line_buf: String,
122 last_write: Instant,
123 logged_in_emitted: bool,
124}
125
126impl AprsIsClient {
127 pub async fn connect(config: AprsIsConfig) -> Result<Self, AprsIsError> {
141 let addr = format!("{}:{}", config.server, config.port);
142 tracing::info!(server = %addr, callsign = %config.callsign, "APRS-IS connecting");
143
144 let stream = tokio::time::timeout(CONNECT_TIMEOUT, TcpStream::connect(&addr))
145 .await
146 .map_err(|_| {
147 AprsIsError::Connect(std::io::Error::new(
148 std::io::ErrorKind::TimedOut,
149 "TCP connect timed out",
150 ))
151 })?
152 .map_err(AprsIsError::Connect)?;
153
154 let (read_half, mut write_half) = stream.into_split();
155
156 let login = build_login_string(&config);
158 write_half
159 .write_all(login.as_bytes())
160 .await
161 .map_err(AprsIsError::Write)?;
162 write_half.flush().await.map_err(AprsIsError::Write)?;
163
164 tracing::debug!("APRS-IS login sent");
165
166 Ok(Self {
167 config,
168 reader: BufReader::new(read_half),
169 writer: write_half,
170 line_buf: String::with_capacity(512),
171 last_write: Instant::now(),
172 logged_in_emitted: false,
173 })
174 }
175
176 pub async fn connect_with_retry(
186 config: AprsIsConfig,
187 max_attempts: Option<u32>,
188 ) -> Result<Self, AprsIsError> {
189 let mut delay = Duration::from_secs(1);
190 let mut attempt: u32 = 0;
191 loop {
192 attempt += 1;
193 match Self::connect(config.clone()).await {
194 Ok(client) => return Ok(client),
195 Err(e) => {
196 if max_attempts.is_some_and(|max| attempt >= max) {
197 return Err(e);
198 }
199 tracing::warn!(
200 attempt,
201 error = %e,
202 retry_in_secs = delay.as_secs(),
203 "APRS-IS connect failed, retrying"
204 );
205 tokio::time::sleep(delay).await;
206 delay = (delay * 2).min(Duration::from_secs(60));
207 }
208 }
209 }
210 }
211
212 pub async fn reconnect(&mut self) -> Result<(), AprsIsError> {
222 tracing::info!("APRS-IS reconnecting");
223 let new = Self::connect(self.config.clone()).await?;
224 self.reader = new.reader;
225 self.writer = new.writer;
226 self.line_buf.clear();
227 self.last_write = new.last_write;
228 self.logged_in_emitted = false;
229 Ok(())
230 }
231
232 pub async fn next_event(&mut self) -> Result<AprsIsEvent, AprsIsError> {
242 self.line_buf.clear();
243 let bytes = self
244 .reader
245 .read_line(&mut self.line_buf)
246 .await
247 .map_err(AprsIsError::Read)?;
248
249 if bytes == 0 {
250 tracing::info!("APRS-IS connection closed by server");
251 return Ok(AprsIsEvent::Disconnected);
252 }
253
254 let line = self.line_buf.trim_end_matches(['\r', '\n']);
255
256 if let Some(packet) = parse_is_line(line) {
257 return Ok(AprsIsEvent::Packet(packet.to_owned()));
258 }
259
260 if !self.logged_in_emitted && line.contains("logresp") {
262 if line.contains("unverified") {
269 self.logged_in_emitted = true;
270 tracing::warn!(response = %line, "APRS-IS login rejected");
271 return Ok(AprsIsEvent::LoginRejected {
272 reason: line.to_owned(),
273 });
274 }
275 if line.contains("verified") {
276 self.logged_in_emitted = true;
277 let server = parse_logresp_server(line);
278 tracing::info!(response = %line, ?server, "APRS-IS login verified");
279 return Ok(AprsIsEvent::LoggedIn { server });
280 }
281 }
282
283 Ok(AprsIsEvent::Comment(line.to_owned()))
284 }
285
286 pub async fn send_packet(
295 &mut self,
296 source: &str,
297 destination: &str,
298 path: &[&str],
299 data: &str,
300 ) -> Result<(), AprsIsError> {
301 let line = format_is_packet(source, destination, path, data);
302 self.send_raw_line(&line).await
303 }
304
305 pub async fn send_raw_line(&mut self, line: &str) -> Result<(), AprsIsError> {
313 self.writer
314 .write_all(line.as_bytes())
315 .await
316 .map_err(AprsIsError::Write)?;
317 self.writer.flush().await.map_err(AprsIsError::Write)?;
318 self.last_write = Instant::now();
319 Ok(())
320 }
321
322 pub async fn send_keepalive(&mut self) -> Result<(), AprsIsError> {
332 self.send_raw_line(&format!("{KEEPALIVE_COMMENT}\r\n"))
333 .await
334 }
335
336 pub async fn maybe_send_keepalive(&mut self) -> Result<(), AprsIsError> {
345 if self.last_write.elapsed() >= KEEPALIVE_INTERVAL {
346 self.send_keepalive().await?;
347 }
348 Ok(())
349 }
350
351 #[must_use]
353 pub const fn config(&self) -> &AprsIsConfig {
354 &self.config
355 }
356
357 pub async fn shutdown(mut self) -> Result<(), AprsIsError> {
363 tracing::debug!("APRS-IS shutting down");
364 self.writer.shutdown().await.map_err(AprsIsError::Write)?;
365 Ok(())
366 }
367}
368
369#[cfg(test)]
370mod tests {
371 use super::*;
372 use crate::login::Passcode;
373 use std::future::Future;
374 use tokio::io::AsyncReadExt as _;
375 use tokio::net::TcpListener;
376
377 type TestResult = Result<(), Box<dyn std::error::Error>>;
378
379 async fn read_some(stream: &mut TcpStream, buf: &mut [u8]) -> Option<usize> {
384 stream.read(buf).await.ok().filter(|n| *n > 0)
385 }
386
387 async fn write_all_ignore(stream: &mut TcpStream, data: &[u8]) {
390 if let Err(err) = stream.write_all(data).await {
391 tracing::debug!(%err, "mock server write_all error");
392 }
393 }
394
395 async fn spawn_mock_server<F, Fut>(handler: F) -> Result<std::net::SocketAddr, std::io::Error>
400 where
401 F: FnOnce(TcpStream) -> Fut + Send + 'static,
402 Fut: Future<Output = ()> + Send,
403 {
404 let listener = TcpListener::bind("127.0.0.1:0").await?;
405 let addr = listener.local_addr()?;
406 drop(tokio::spawn(async move {
407 if let Ok((stream, _)) = listener.accept().await {
408 handler(stream).await;
409 }
410 }));
411 Ok(addr)
412 }
413
414 fn test_config(addr: std::net::SocketAddr) -> AprsIsConfig {
415 AprsIsConfig {
416 callsign: "N0CALL".to_owned(),
417 passcode: Passcode::ReceiveOnly,
418 server: addr.ip().to_string(),
419 port: addr.port(),
420 filter: String::new(),
421 software_name: "test".to_owned(),
422 software_version: "0.1".to_owned(),
423 }
424 }
425
426 #[tokio::test]
427 async fn connect_sends_login_string() -> TestResult {
428 let addr = spawn_mock_server(|mut stream| async move {
429 let mut buf = [0u8; 512];
430 let Some(n) = read_some(&mut stream, &mut buf).await else {
431 return;
432 };
433 let Ok(login) = std::str::from_utf8(buf.get(..n).unwrap_or(&[])) else {
434 return;
435 };
436 assert!(
437 login.starts_with("user N0CALL pass -1 vers test 0.1"),
438 "unexpected login: {login:?}"
439 );
440 assert!(login.ends_with("\r\n"), "missing CRLF: {login:?}");
441 tokio::time::sleep(Duration::from_millis(50)).await;
442 })
443 .await?;
444
445 let _client = AprsIsClient::connect(test_config(addr)).await?;
446 Ok(())
447 }
448
449 #[tokio::test]
450 async fn next_event_receives_packet_line() -> TestResult {
451 let addr = spawn_mock_server(|mut stream| async move {
452 let mut buf = [0u8; 512];
453 let _ = read_some(&mut stream, &mut buf).await;
454 write_all_ignore(&mut stream, b"N0CALL>APK005:!4903.50N/07201.75W-Test\r\n").await;
455 tokio::time::sleep(Duration::from_millis(50)).await;
456 })
457 .await?;
458
459 let mut client = AprsIsClient::connect(test_config(addr)).await?;
460 let event = client.next_event().await?;
461 assert!(
462 matches!(event, AprsIsEvent::Packet(ref line) if line == "N0CALL>APK005:!4903.50N/07201.75W-Test"),
463 "expected Packet, got {event:?}"
464 );
465 Ok(())
466 }
467
468 #[tokio::test]
469 async fn next_event_receives_comment_line() -> TestResult {
470 let addr = spawn_mock_server(|mut stream| async move {
471 let mut buf = [0u8; 512];
472 let _ = read_some(&mut stream, &mut buf).await;
473 write_all_ignore(&mut stream, b"# javAPRSSrvr 4.2.0b05\r\n").await;
474 tokio::time::sleep(Duration::from_millis(50)).await;
475 })
476 .await?;
477
478 let mut client = AprsIsClient::connect(test_config(addr)).await?;
479 let event = client.next_event().await?;
480 assert!(
481 matches!(event, AprsIsEvent::Comment(ref line) if line == "# javAPRSSrvr 4.2.0b05"),
482 "expected Comment, got {event:?}"
483 );
484 Ok(())
485 }
486
487 #[tokio::test]
488 async fn next_event_detects_login_verified() -> TestResult {
489 let addr = spawn_mock_server(|mut stream| async move {
490 let mut buf = [0u8; 512];
491 let _ = read_some(&mut stream, &mut buf).await;
492 write_all_ignore(&mut stream, b"# logresp N0CALL verified, server T2TEST\r\n").await;
493 tokio::time::sleep(Duration::from_millis(50)).await;
494 })
495 .await?;
496
497 let mut client = AprsIsClient::connect(test_config(addr)).await?;
498 let event = client.next_event().await?;
499 assert!(
500 matches!(event, AprsIsEvent::LoggedIn { ref server } if server.as_deref() == Some("T2TEST")),
501 "expected LoggedIn, got {event:?}"
502 );
503 Ok(())
504 }
505
506 #[tokio::test]
507 async fn next_event_detects_login_rejected() -> TestResult {
508 let addr = spawn_mock_server(|mut stream| async move {
509 let mut buf = [0u8; 512];
510 let _ = read_some(&mut stream, &mut buf).await;
511 write_all_ignore(
512 &mut stream,
513 b"# logresp N0CALL unverified, server T2TEST\r\n",
514 )
515 .await;
516 tokio::time::sleep(Duration::from_millis(50)).await;
517 })
518 .await?;
519
520 let mut client = AprsIsClient::connect(test_config(addr)).await?;
521 let event = client.next_event().await?;
522 assert!(
523 matches!(event, AprsIsEvent::LoginRejected { ref reason } if reason.contains("unverified")),
524 "expected LoginRejected, got {event:?}"
525 );
526 Ok(())
527 }
528
529 #[test]
530 fn parse_logresp_server_extracts_name() {
531 assert_eq!(
532 parse_logresp_server("# logresp N0CALL verified, server T2TEST"),
533 Some("T2TEST".to_owned())
534 );
535 assert_eq!(
536 parse_logresp_server("# logresp N0CALL verified, server T2A "),
537 Some("T2A".to_owned())
538 );
539 assert_eq!(parse_logresp_server("# javAPRSSrvr 4.2.0b05"), None);
540 }
541
542 #[tokio::test]
543 async fn next_event_detects_disconnect() -> TestResult {
544 let addr = spawn_mock_server(|mut stream| async move {
545 let mut buf = [0u8; 512];
546 let _ = read_some(&mut stream, &mut buf).await;
547 drop(stream);
548 })
549 .await?;
550
551 let mut client = AprsIsClient::connect(test_config(addr)).await?;
552 let event = client.next_event().await?;
553 assert!(
554 matches!(event, AprsIsEvent::Disconnected),
555 "expected Disconnected, got {event:?}"
556 );
557 Ok(())
558 }
559
560 #[tokio::test]
561 async fn send_packet_formats_line() -> TestResult {
562 let addr = spawn_mock_server(|mut stream| async move {
563 let mut buf = [0u8; 1024];
564 let Some(n) = read_some(&mut stream, &mut buf).await else {
565 return;
566 };
567 let Ok(text) = std::str::from_utf8(buf.get(..n).unwrap_or(&[])) else {
568 return;
569 };
570 assert!(text.contains("user N0CALL"), "login missing: {text:?}");
571 let Some(n) = read_some(&mut stream, &mut buf).await else {
572 return;
573 };
574 let Ok(pkt) = std::str::from_utf8(buf.get(..n).unwrap_or(&[])) else {
575 return;
576 };
577 assert_eq!(
578 pkt, "N0CALL>APK005,WIDE1-1:!4903.50N/07201.75W-Test\r\n",
579 "unexpected packet: {pkt:?}"
580 );
581 })
582 .await?;
583
584 let mut client = AprsIsClient::connect(test_config(addr)).await?;
585 client
586 .send_packet("N0CALL", "APK005", &["WIDE1-1"], "!4903.50N/07201.75W-Test")
587 .await?;
588 tokio::time::sleep(Duration::from_millis(50)).await;
589 Ok(())
590 }
591
592 #[tokio::test]
593 async fn send_keepalive_sends_comment_line() -> TestResult {
594 let addr = spawn_mock_server(|mut stream| async move {
595 let mut buf = [0u8; 1024];
596 let _ = read_some(&mut stream, &mut buf).await;
597 let Some(n) = read_some(&mut stream, &mut buf).await else {
598 return;
599 };
600 let Ok(ka) = std::str::from_utf8(buf.get(..n).unwrap_or(&[])) else {
601 return;
602 };
603 assert!(
604 ka.starts_with("# aprs-is keepalive"),
605 "unexpected keepalive: {ka:?}"
606 );
607 assert!(ka.ends_with("\r\n"), "missing CRLF: {ka:?}");
608 })
609 .await?;
610
611 let mut client = AprsIsClient::connect(test_config(addr)).await?;
612 client.send_keepalive().await?;
613 tokio::time::sleep(Duration::from_millis(50)).await;
614 Ok(())
615 }
616
617 #[tokio::test]
618 async fn maybe_send_keepalive_noop_when_recent() -> TestResult {
619 let addr = spawn_mock_server(|mut stream| async move {
620 let mut buf = [0u8; 1024];
621 let _ = read_some(&mut stream, &mut buf).await;
622 tokio::time::sleep(Duration::from_millis(50)).await;
623 })
624 .await?;
625
626 let mut client = AprsIsClient::connect(test_config(addr)).await?;
627 client.maybe_send_keepalive().await?;
629 Ok(())
630 }
631
632 #[tokio::test]
633 async fn connect_timeout() -> TestResult {
634 let config = AprsIsConfig {
637 callsign: "N0CALL".to_owned(),
638 passcode: Passcode::ReceiveOnly,
639 server: "198.51.100.1".to_owned(),
640 port: 14580,
641 filter: String::new(),
642 software_name: "test".to_owned(),
643 software_version: "0.1".to_owned(),
644 };
645 let result = tokio::time::timeout(
649 Duration::from_secs(15),
650 AprsIsClient::connect_with_retry(config, Some(1)),
651 )
652 .await;
653 if let Ok(r) = result {
656 assert!(r.is_err(), "expected connect to fail, got Ok");
657 }
658 Ok(())
659 }
660}