aprs/
messenger.rs

1//! APRS message ack/retry manager.
2//!
3//! Provides reliable APRS messaging with automatic acknowledgement tracking
//! and retry logic. By default, each message is transmitted up to
//! [`MAX_RETRIES`] times at [`RETRY_INTERVAL`] intervals until it is
//! acknowledged or expires. Incoming duplicates are suppressed via a
//! rolling dedup cache keyed on `(source, msgno)` with an
//! [`INCOMING_DEDUP_WINDOW`] TTL. All three values can be overridden
//! through [`MessengerConfig`].
8//!
9//! # Time handling
10//!
11//! Per the crate-level convention, this module is sans-io and never calls
12//! `std::time::Instant::now()` internally. Every stateful method that
//! reads the clock accepts a `now: Instant` parameter; callers (typically
//! the tokio shell) read the monotonic clock once per iteration and thread
//! it down.
16
17use std::collections::HashMap;
18use std::time::{Duration, Instant};
19
20use ax25_codec::Ax25Address;
21
22use crate::build::build_aprs_message;
23use crate::error::AprsError;
24use crate::message::{AprsMessage, MAX_APRS_MESSAGE_TEXT_LEN, classify_ack_rej};
25
/// Default lifetime of an entry in the incoming-message dedup cache.
///
/// A `(source, msgno)` pair older than this is purged, after which a
/// repeat of the same message is treated as new again.
pub const INCOMING_DEDUP_WINDOW: Duration = Duration::from_secs(300); // 5 minutes

/// Default cap on how many times a single message is transmitted (the
/// first transmission included) before it is given up on. This is the
/// value used by [`MessengerConfig::default`].
pub const MAX_RETRIES: u8 = 5;

/// Default spacing between successive transmissions of a message that
/// has not been acknowledged yet.
pub const RETRY_INTERVAL: Duration = Duration::from_secs(30);
36
/// Tunable parameters of an [`AprsMessenger`].
///
/// Every field is public and may be set freely. [`Default`] yields the
/// conventional APRS values: 5 transmission attempts spaced 30 seconds
/// apart, and a 5-minute dedup window for incoming messages.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct MessengerConfig {
    /// Upper bound on the number of times one message is transmitted
    /// (the initial transmission counts as an attempt).
    pub max_retries: u8,
    /// Minimum time between two transmissions of the same message.
    pub retry_interval: Duration,
    /// How long a `(source, msgno)` pair is remembered for duplicate
    /// suppression of incoming messages.
    pub incoming_dedup_window: Duration,
}
50
51impl Default for MessengerConfig {
52    fn default() -> Self {
53        Self {
54            max_retries: MAX_RETRIES,
55            retry_interval: RETRY_INTERVAL,
56            incoming_dedup_window: INCOMING_DEDUP_WINDOW,
57        }
58    }
59}
60
/// Book-keeping for one outbound message that has not been acknowledged.
#[derive(Debug)]
struct PendingMessage {
    /// Message number carried in the frame; an incoming ack/rej is
    /// matched against this value.
    message_id: String,
    /// The fully encoded wire frame, built once and re-sent verbatim on
    /// every retry.
    wire_frame: Vec<u8>,
    /// How many times the frame has been handed out for transmission.
    attempts: u8,
    /// When the frame was last handed out. Before the first transmission
    /// this holds a back-dated timestamp chosen at queue time.
    last_sent: Instant,
}
73
74/// Manages APRS message send/receive with automatic ack/retry.
75///
76/// Queues outbound messages, assigns sequence IDs, tracks pending
77/// acknowledgements, generates retry frames on schedule, and suppresses
78/// duplicate deliveries of the same incoming message via a rolling
79/// `(source, msgno)` cache.
80#[derive(Debug)]
81pub struct AprsMessenger {
82    /// This station's callsign/SSID.
83    my_callsign: Ax25Address,
84    /// Digipeater path used for outgoing message frames.
85    digipeater_path: Vec<Ax25Address>,
86    /// Messages awaiting acknowledgement.
87    pending_messages: Vec<PendingMessage>,
88    /// Counter for generating unique message IDs.
89    next_message_id: u16,
90    /// Dedup cache for incoming messages keyed on `(source_call, msgno)`.
91    incoming_seen: HashMap<(String, String), Instant>,
92    /// Tunable retry / dedup behaviour.
93    config: MessengerConfig,
94}
95
96impl AprsMessenger {
97    /// Create a new messenger with the default config.
98    #[must_use]
99    #[allow(clippy::missing_const_for_fn)] // HashMap::new is not const
100    pub fn new(callsign: Ax25Address, digipeater_path: Vec<Ax25Address>) -> Self {
101        Self::with_config(callsign, digipeater_path, MessengerConfig::default())
102    }
103
104    /// Create a new messenger with a caller-supplied [`MessengerConfig`].
105    #[must_use]
106    #[allow(clippy::missing_const_for_fn)] // HashMap::new is not const
107    pub fn with_config(
108        callsign: Ax25Address,
109        digipeater_path: Vec<Ax25Address>,
110        config: MessengerConfig,
111    ) -> Self {
112        Self {
113            my_callsign: callsign,
114            digipeater_path,
115            pending_messages: Vec::new(),
116            next_message_id: 1,
117            incoming_seen: HashMap::new(),
118            config,
119        }
120    }
121
122    /// Queue a message for transmission. Returns the assigned message ID.
123    ///
124    /// The message is immediately available from
125    /// [`next_frame_to_send`](Self::next_frame_to_send). Text longer than
126    /// [`MAX_APRS_MESSAGE_TEXT_LEN`] (67 bytes, APRS 1.0.1 §14) is silently
127    /// truncated; use [`Self::send_message_checked`] if you want a hard
128    /// error instead.
129    ///
130    /// `now` is used to initialise the message's `last_sent` timestamp to
131    /// a time in the past so the message is immediately eligible for
132    /// transmission on the next call to
133    /// [`next_frame_to_send`](Self::next_frame_to_send).
134    pub fn send_message(&mut self, addressee: &str, text: &str, now: Instant) -> String {
135        // Pick a fresh ID, skipping any that clash with still-pending
136        // messages. The ID space is `1..=u16::MAX` (65 535 slots), far
137        // more than MAX_RETRIES of in-flight messages, so this loop
138        // always terminates.
139        let message_id = loop {
140            let candidate = self.next_message_id.to_string();
141            self.next_message_id = self.next_message_id.wrapping_add(1);
142            if self.next_message_id == 0 {
143                self.next_message_id = 1;
144            }
145            if !self
146                .pending_messages
147                .iter()
148                .any(|p| p.message_id == candidate)
149            {
150                break candidate;
151            }
152        };
153
154        let wire_frame = build_aprs_message(
155            &self.my_callsign,
156            addressee,
157            text,
158            Some(&message_id),
159            &self.digipeater_path,
160        );
161
162        // Use a time in the past so the message is immediately eligible.
163        let past = now.checked_sub(self.config.retry_interval).unwrap_or(now);
164
165        self.pending_messages.push(PendingMessage {
166            message_id: message_id.clone(),
167            wire_frame,
168            attempts: 0,
169            last_sent: past,
170        });
171
172        message_id
173    }
174
175    /// Get the next frame that needs to be sent (initial or retry).
176    ///
177    /// Returns `None` if no messages need sending right now. Retries
178    /// happen at [`MessengerConfig::retry_interval`], up to
179    /// [`MessengerConfig::max_retries`] attempts.
180    ///
181    /// `now` is compared against each pending message's `last_sent` to
182    /// decide whether the retry interval has elapsed.
183    #[must_use]
184    pub fn next_frame_to_send(&mut self, now: Instant) -> Option<Vec<u8>> {
185        let max_retries = self.config.max_retries;
186        let retry_interval = self.config.retry_interval;
187        for msg in &mut self.pending_messages {
188            if msg.attempts < max_retries && now.duration_since(msg.last_sent) >= retry_interval {
189                msg.attempts += 1;
190                msg.last_sent = now;
191                return Some(msg.wire_frame.clone());
192            }
193        }
194        None
195    }
196
197    /// Like [`Self::send_message`] but returns `Err(MessageTooLong)` if
198    /// the text exceeds the APRS 1.0.1 §14 limit of 67 bytes.
199    ///
200    /// # Errors
201    ///
202    /// Returns [`AprsError::MessageTooLong`] when the text is too long.
203    pub fn send_message_checked(
204        &mut self,
205        addressee: &str,
206        text: &str,
207        now: Instant,
208    ) -> Result<String, AprsError> {
209        if text.len() > MAX_APRS_MESSAGE_TEXT_LEN {
210            return Err(AprsError::MessageTooLong(text.len()));
211        }
212        Ok(self.send_message(addressee, text, now))
213    }
214
215    /// Check whether an incoming message is a duplicate of one recently
216    /// seen from the same source station with the same msgno.
217    ///
218    /// Returns `true` if this is a new message (first time seen within
219    /// [`INCOMING_DEDUP_WINDOW`]), `false` if it's a duplicate that
220    /// should be ignored by the caller. Stateful: records the message
221    /// in the dedup cache on `true`. Messages without a `message_id`
222    /// are always considered new.
223    ///
224    /// `now` is used to expire stale dedup entries and to record the
225    /// arrival time of the current message.
226    pub fn is_new_incoming(&mut self, source: &str, msg: &AprsMessage, now: Instant) -> bool {
227        let window = self.config.incoming_dedup_window;
228        self.incoming_seen
229            .retain(|_, t| now.duration_since(*t) < window);
230        let Some(ref id) = msg.message_id else {
231            return true;
232        };
233        let key = (source.to_owned(), id.clone());
234        if self.incoming_seen.contains_key(&key) {
235            return false;
236        }
237        let _prior = self.incoming_seen.insert(key, now);
238        true
239    }
240
241    /// Process an incoming APRS message.
242    ///
243    /// If the text is an ack or rej control frame (per [`classify_ack_rej`])
244    /// for a pending message, removes the pending entry and returns `true`.
245    /// Returns `false` for regular messages, including ones that happen to
246    /// start with the letters `ack`/`rej` but aren't valid control frames.
247    pub fn process_incoming(&mut self, msg: &AprsMessage) -> bool {
248        let Some((_is_ack, id)) = classify_ack_rej(&msg.text) else {
249            return false;
250        };
251        let before = self.pending_messages.len();
252        self.pending_messages.retain(|p| p.message_id != id);
253        self.pending_messages.len() < before
254    }
255
256    /// Build an ack frame for a received message.
257    ///
258    /// The ack is sent back to `from` with text `ack{message_id}`.
259    #[must_use]
260    pub fn build_ack(&self, from: &str, message_id: &str) -> Vec<u8> {
261        let text = format!("ack{message_id}");
262        build_aprs_message(&self.my_callsign, from, &text, None, &self.digipeater_path)
263    }
264
265    /// Build a rej (reject) frame for a received message.
266    ///
267    /// The rej is sent back to `from` with text `rej{message_id}`.
268    #[must_use]
269    pub fn build_rej(&self, from: &str, message_id: &str) -> Vec<u8> {
270        let text = format!("rej{message_id}");
271        build_aprs_message(&self.my_callsign, from, &text, None, &self.digipeater_path)
272    }
273
274    /// Remove expired messages (those that have reached [`MAX_RETRIES`]
275    /// attempts) and return their message IDs so callers can notify upstream.
276    ///
277    /// Takes `now: Instant` for API consistency with the other time-aware
278    /// methods even though no clock-dependent logic is currently used here
279    /// — the decision is based on attempt count, not elapsed time.
280    pub fn cleanup_expired(&mut self, _now: Instant) -> Vec<String> {
281        let mut expired = Vec::new();
282        let max_retries = self.config.max_retries;
283        self.pending_messages.retain(|m| {
284            if m.attempts >= max_retries {
285                expired.push(m.message_id.clone());
286                false
287            } else {
288                true
289            }
290        });
291        expired
292    }
293
294    /// Number of pending (unacknowledged) messages.
295    #[must_use]
296    pub const fn pending_count(&self) -> usize {
297        self.pending_messages.len()
298    }
299}
300
#[cfg(test)]
mod tests {
    use super::*;
    use ax25_codec::parse_ax25;
    use kiss_tnc::decode_kiss_frame;

    use crate::message::parse_aprs_message as parse_msg;

    type TestResult = Result<(), Box<dyn std::error::Error>>;

    /// Callsign of the station under test.
    fn test_callsign() -> Ax25Address {
        Ax25Address::new("N0CALL", 7)
    }

    /// Digipeater path shared by all test messengers.
    fn default_digipeater_path() -> Vec<Ax25Address> {
        vec![Ax25Address::new("WIDE1", 1), Ax25Address::new("WIDE2", 1)]
    }

    /// Messenger with the default configuration.
    fn test_messenger() -> AprsMessenger {
        AprsMessenger::new(test_callsign(), default_digipeater_path())
    }

    /// An incoming message addressed to the station under test.
    fn incoming(text: &str, message_id: Option<&str>) -> AprsMessage {
        AprsMessage {
            addressee: "N0CALL".to_owned(),
            text: text.to_owned(),
            message_id: message_id.map(str::to_owned),
            reply_ack: None,
        }
    }

    /// Peel KISS and AX.25 framing off `wire` and parse the APRS message.
    fn decode_wire(wire: &[u8]) -> Result<AprsMessage, Box<dyn std::error::Error>> {
        let kiss = decode_kiss_frame(wire)?;
        let packet = parse_ax25(&kiss.data)?;
        let msg = parse_msg(&packet.info)?;
        Ok(msg)
    }

    #[test]
    fn send_message_assigns_incrementing_ids() {
        let now = Instant::now();
        let mut messenger = test_messenger();

        let first = messenger.send_message("W1AW", "Hello", now);
        let second = messenger.send_message("W1AW", "World", now);

        assert_eq!(first, "1");
        assert_eq!(second, "2");
        assert_eq!(messenger.pending_count(), 2);
    }

    #[test]
    fn next_frame_returns_pending_message() -> TestResult {
        let now = Instant::now();
        let mut messenger = test_messenger();
        let _id = messenger.send_message("W1AW", "Test", now);

        // The entry was queued back-dated, so it is due straight away.
        let wire = messenger
            .next_frame_to_send(now)
            .ok_or("expected a frame to send")?;

        // The frame must round-trip to the message that was queued.
        let msg = decode_wire(&wire)?;
        assert_eq!(msg.addressee, "W1AW");
        assert_eq!(msg.text, "Test");
        assert_eq!(msg.message_id.as_deref(), Some("1"));
        Ok(())
    }

    #[test]
    fn next_frame_returns_none_when_recently_sent() {
        let now = Instant::now();
        let mut messenger = test_messenger();
        let _id = messenger.send_message("W1AW", "Test", now);

        // The first poll hands out the initial transmission ...
        let _frame = messenger.next_frame_to_send(now);
        // ... and an immediate second poll has nothing due yet.
        assert!(messenger.next_frame_to_send(now).is_none());
    }

    #[test]
    fn process_incoming_ack_removes_pending() {
        let now = Instant::now();
        let mut messenger = test_messenger();
        let id = messenger.send_message("W1AW", "Hello", now);
        assert_eq!(messenger.pending_count(), 1);

        let ack = incoming(&format!("ack{id}"), None);
        assert!(messenger.process_incoming(&ack));
        assert_eq!(messenger.pending_count(), 0);
    }

    #[test]
    fn process_incoming_rej_removes_pending() {
        let now = Instant::now();
        let mut messenger = test_messenger();
        let id = messenger.send_message("W1AW", "Hello", now);

        let rej = incoming(&format!("rej{id}"), None);
        assert!(messenger.process_incoming(&rej));
        assert_eq!(messenger.pending_count(), 0);
    }

    #[test]
    fn process_incoming_unrelated_message_returns_false() {
        let now = Instant::now();
        let mut messenger = test_messenger();
        let _id = messenger.send_message("W1AW", "Hello", now);

        let unrelated = incoming("Just a regular message", Some("42"));
        assert!(!messenger.process_incoming(&unrelated));
        assert_eq!(messenger.pending_count(), 1);
    }

    #[test]
    fn build_ack_produces_valid_frame() -> TestResult {
        let messenger = test_messenger();

        let msg = decode_wire(&messenger.build_ack("W1AW", "42"))?;

        assert_eq!(msg.addressee, "W1AW");
        assert_eq!(msg.text, "ack42");
        Ok(())
    }

    #[test]
    fn build_rej_produces_valid_frame() -> TestResult {
        let messenger = test_messenger();

        let msg = decode_wire(&messenger.build_rej("W1AW", "42"))?;

        assert_eq!(msg.addressee, "W1AW");
        assert_eq!(msg.text, "rej42");
        Ok(())
    }

    #[test]
    fn cleanup_expired_removes_maxed_messages() {
        let start = Instant::now();
        let mut messenger = test_messenger();
        let id = messenger.send_message("W1AW", "Test", start);

        // Burn through every attempt. Sans-io: time is advanced by
        // minting later timestamps, with no real waiting.
        let mut clock = start;
        for _attempt in 0..MAX_RETRIES {
            clock += RETRY_INTERVAL;
            drop(messenger.next_frame_to_send(clock));
        }

        // Exhausted, but still queued until cleanup runs.
        assert_eq!(messenger.pending_count(), 1);
        let expired = messenger.cleanup_expired(clock);
        assert_eq!(expired, vec![id]);
        assert_eq!(messenger.pending_count(), 0);
    }

    #[test]
    fn send_message_checked_rejects_too_long_text() {
        let now = Instant::now();
        let mut messenger = test_messenger();
        let oversized = "x".repeat(100);

        let outcome = messenger.send_message_checked("W1AW", &oversized, now);

        assert!(outcome.is_err());
        assert_eq!(messenger.pending_count(), 0);
    }

    #[test]
    fn send_message_checked_accepts_boundary_length() {
        let now = Instant::now();
        let mut messenger = test_messenger();
        let at_limit = "x".repeat(67);

        let outcome = messenger.send_message_checked("W1AW", &at_limit, now);

        assert!(outcome.is_ok());
    }

    #[test]
    fn is_new_incoming_dedup_matches_source_msgno() {
        let now = Instant::now();
        let mut messenger = test_messenger();
        let msg = incoming("hello", Some("42"));

        assert!(messenger.is_new_incoming("W1AW", &msg, now));
        assert!(!messenger.is_new_incoming("W1AW", &msg, now));
        // Same msgno from another station is a different message.
        assert!(messenger.is_new_incoming("W2AW", &msg, now));
    }

    #[test]
    fn is_new_incoming_no_id_always_new() {
        let now = Instant::now();
        let mut messenger = test_messenger();
        let msg = incoming("hello", None);

        assert!(messenger.is_new_incoming("W1AW", &msg, now));
        assert!(messenger.is_new_incoming("W1AW", &msg, now));
    }

    #[test]
    fn is_new_incoming_expires_stale_entries() {
        let start = Instant::now();
        let mut messenger = test_messenger();
        let msg = incoming("hello", Some("42"));

        assert!(messenger.is_new_incoming("W1AW", &msg, start));
        // One second past the dedup window the entry must be gone.
        let after_window = start + INCOMING_DEDUP_WINDOW + Duration::from_secs(1);
        assert!(messenger.is_new_incoming("W1AW", &msg, after_window));
    }

    #[test]
    fn process_incoming_ignores_false_positive_message() {
        let now = Instant::now();
        let mut messenger = test_messenger();
        let _id = messenger.send_message("W1AW", "Hello", now);

        // Regression: this text used to be taken for an ack of msg "nowle".
        let false_ack = incoming("acknowledge receipt", None);
        assert!(!messenger.process_incoming(&false_ack));
        assert_eq!(messenger.pending_count(), 1);
    }

    #[test]
    fn message_id_wraps_around_skipping_zero() {
        let now = Instant::now();
        let mut messenger = test_messenger();
        messenger.next_message_id = u16::MAX;

        let last = messenger.send_message("W1AW", "A", now);
        assert_eq!(last, u16::MAX.to_string());

        // The counter wraps past 0 straight to 1.
        let wrapped = messenger.send_message("W1AW", "B", now);
        assert_eq!(wrapped, "1");
    }
}