1use std::collections::HashMap;
18use std::time::{Duration, Instant};
19
20use ax25_codec::Ax25Address;
21
22use crate::build::build_aprs_message;
23use crate::error::AprsError;
24use crate::message::{AprsMessage, MAX_APRS_MESSAGE_TEXT_LEN, classify_ack_rej};
25
26pub const INCOMING_DEDUP_WINDOW: Duration = Duration::from_secs(5 * 60);
29
30pub const MAX_RETRIES: u8 = 5;
33
34pub const RETRY_INTERVAL: Duration = Duration::from_secs(30);
36
37#[derive(Debug, Clone, PartialEq, Eq)]
42pub struct MessengerConfig {
43 pub max_retries: u8,
45 pub retry_interval: Duration,
47 pub incoming_dedup_window: Duration,
49}
50
51impl Default for MessengerConfig {
52 fn default() -> Self {
53 Self {
54 max_retries: MAX_RETRIES,
55 retry_interval: RETRY_INTERVAL,
56 incoming_dedup_window: INCOMING_DEDUP_WINDOW,
57 }
58 }
59}
60
61#[derive(Debug)]
63struct PendingMessage {
64 message_id: String,
66 wire_frame: Vec<u8>,
68 attempts: u8,
70 last_sent: Instant,
72}
73
74#[derive(Debug)]
81pub struct AprsMessenger {
82 my_callsign: Ax25Address,
84 digipeater_path: Vec<Ax25Address>,
86 pending_messages: Vec<PendingMessage>,
88 next_message_id: u16,
90 incoming_seen: HashMap<(String, String), Instant>,
92 config: MessengerConfig,
94}
95
96impl AprsMessenger {
97 #[must_use]
99 #[allow(clippy::missing_const_for_fn)] pub fn new(callsign: Ax25Address, digipeater_path: Vec<Ax25Address>) -> Self {
101 Self::with_config(callsign, digipeater_path, MessengerConfig::default())
102 }
103
104 #[must_use]
106 #[allow(clippy::missing_const_for_fn)] pub fn with_config(
108 callsign: Ax25Address,
109 digipeater_path: Vec<Ax25Address>,
110 config: MessengerConfig,
111 ) -> Self {
112 Self {
113 my_callsign: callsign,
114 digipeater_path,
115 pending_messages: Vec::new(),
116 next_message_id: 1,
117 incoming_seen: HashMap::new(),
118 config,
119 }
120 }
121
122 pub fn send_message(&mut self, addressee: &str, text: &str, now: Instant) -> String {
135 let message_id = loop {
140 let candidate = self.next_message_id.to_string();
141 self.next_message_id = self.next_message_id.wrapping_add(1);
142 if self.next_message_id == 0 {
143 self.next_message_id = 1;
144 }
145 if !self
146 .pending_messages
147 .iter()
148 .any(|p| p.message_id == candidate)
149 {
150 break candidate;
151 }
152 };
153
154 let wire_frame = build_aprs_message(
155 &self.my_callsign,
156 addressee,
157 text,
158 Some(&message_id),
159 &self.digipeater_path,
160 );
161
162 let past = now.checked_sub(self.config.retry_interval).unwrap_or(now);
164
165 self.pending_messages.push(PendingMessage {
166 message_id: message_id.clone(),
167 wire_frame,
168 attempts: 0,
169 last_sent: past,
170 });
171
172 message_id
173 }
174
175 #[must_use]
184 pub fn next_frame_to_send(&mut self, now: Instant) -> Option<Vec<u8>> {
185 let max_retries = self.config.max_retries;
186 let retry_interval = self.config.retry_interval;
187 for msg in &mut self.pending_messages {
188 if msg.attempts < max_retries && now.duration_since(msg.last_sent) >= retry_interval {
189 msg.attempts += 1;
190 msg.last_sent = now;
191 return Some(msg.wire_frame.clone());
192 }
193 }
194 None
195 }
196
197 pub fn send_message_checked(
204 &mut self,
205 addressee: &str,
206 text: &str,
207 now: Instant,
208 ) -> Result<String, AprsError> {
209 if text.len() > MAX_APRS_MESSAGE_TEXT_LEN {
210 return Err(AprsError::MessageTooLong(text.len()));
211 }
212 Ok(self.send_message(addressee, text, now))
213 }
214
215 pub fn is_new_incoming(&mut self, source: &str, msg: &AprsMessage, now: Instant) -> bool {
227 let window = self.config.incoming_dedup_window;
228 self.incoming_seen
229 .retain(|_, t| now.duration_since(*t) < window);
230 let Some(ref id) = msg.message_id else {
231 return true;
232 };
233 let key = (source.to_owned(), id.clone());
234 if self.incoming_seen.contains_key(&key) {
235 return false;
236 }
237 let _prior = self.incoming_seen.insert(key, now);
238 true
239 }
240
241 pub fn process_incoming(&mut self, msg: &AprsMessage) -> bool {
248 let Some((_is_ack, id)) = classify_ack_rej(&msg.text) else {
249 return false;
250 };
251 let before = self.pending_messages.len();
252 self.pending_messages.retain(|p| p.message_id != id);
253 self.pending_messages.len() < before
254 }
255
256 #[must_use]
260 pub fn build_ack(&self, from: &str, message_id: &str) -> Vec<u8> {
261 let text = format!("ack{message_id}");
262 build_aprs_message(&self.my_callsign, from, &text, None, &self.digipeater_path)
263 }
264
265 #[must_use]
269 pub fn build_rej(&self, from: &str, message_id: &str) -> Vec<u8> {
270 let text = format!("rej{message_id}");
271 build_aprs_message(&self.my_callsign, from, &text, None, &self.digipeater_path)
272 }
273
274 pub fn cleanup_expired(&mut self, _now: Instant) -> Vec<String> {
281 let mut expired = Vec::new();
282 let max_retries = self.config.max_retries;
283 self.pending_messages.retain(|m| {
284 if m.attempts >= max_retries {
285 expired.push(m.message_id.clone());
286 false
287 } else {
288 true
289 }
290 });
291 expired
292 }
293
294 #[must_use]
296 pub const fn pending_count(&self) -> usize {
297 self.pending_messages.len()
298 }
299}
300
301#[cfg(test)]
302mod tests {
303 use super::*;
304 use ax25_codec::parse_ax25;
305 use kiss_tnc::decode_kiss_frame;
306
307 use crate::message::parse_aprs_message as parse_msg;
308
309 type TestResult = Result<(), Box<dyn std::error::Error>>;
310
311 fn test_callsign() -> Ax25Address {
312 Ax25Address::new("N0CALL", 7)
313 }
314
315 fn default_digipeater_path() -> Vec<Ax25Address> {
316 vec![Ax25Address::new("WIDE1", 1), Ax25Address::new("WIDE2", 1)]
317 }
318
319 fn test_messenger() -> AprsMessenger {
320 AprsMessenger::new(test_callsign(), default_digipeater_path())
321 }
322
323 #[test]
324 fn send_message_assigns_incrementing_ids() {
325 let t0 = Instant::now();
326 let mut m = test_messenger();
327 let id1 = m.send_message("W1AW", "Hello", t0);
328 let id2 = m.send_message("W1AW", "World", t0);
329 assert_eq!(id1, "1");
330 assert_eq!(id2, "2");
331 assert_eq!(m.pending_count(), 2);
332 }
333
334 #[test]
335 fn next_frame_returns_pending_message() -> TestResult {
336 let t0 = Instant::now();
337 let mut m = test_messenger();
338 let _id = m.send_message("W1AW", "Test", t0);
339
340 let frame = m.next_frame_to_send(t0);
342 let wire = frame.ok_or("expected a frame to send")?;
343
344 let kiss = decode_kiss_frame(&wire)?;
346 let packet = parse_ax25(&kiss.data)?;
347 let msg = parse_msg(&packet.info)?;
348 assert_eq!(msg.addressee, "W1AW");
349 assert_eq!(msg.text, "Test");
350 assert_eq!(msg.message_id, Some("1".to_owned()));
351 Ok(())
352 }
353
354 #[test]
355 fn next_frame_returns_none_when_recently_sent() {
356 let t0 = Instant::now();
357 let mut m = test_messenger();
358 let _id = m.send_message("W1AW", "Test", t0);
359
360 let _frame = m.next_frame_to_send(t0);
362 assert!(m.next_frame_to_send(t0).is_none());
364 }
365
366 #[test]
367 fn process_incoming_ack_removes_pending() {
368 let t0 = Instant::now();
369 let mut m = test_messenger();
370 let id = m.send_message("W1AW", "Hello", t0);
371 assert_eq!(m.pending_count(), 1);
372
373 let ack = AprsMessage {
374 addressee: "N0CALL".to_owned(),
375 text: format!("ack{id}"),
376 message_id: None,
377 reply_ack: None,
378 };
379 assert!(m.process_incoming(&ack));
380 assert_eq!(m.pending_count(), 0);
381 }
382
383 #[test]
384 fn process_incoming_rej_removes_pending() {
385 let t0 = Instant::now();
386 let mut m = test_messenger();
387 let id = m.send_message("W1AW", "Hello", t0);
388
389 let rej = AprsMessage {
390 addressee: "N0CALL".to_owned(),
391 text: format!("rej{id}"),
392 message_id: None,
393 reply_ack: None,
394 };
395 assert!(m.process_incoming(&rej));
396 assert_eq!(m.pending_count(), 0);
397 }
398
399 #[test]
400 fn process_incoming_unrelated_message_returns_false() {
401 let t0 = Instant::now();
402 let mut m = test_messenger();
403 let _id = m.send_message("W1AW", "Hello", t0);
404
405 let unrelated = AprsMessage {
406 addressee: "N0CALL".to_owned(),
407 text: "Just a regular message".to_owned(),
408 message_id: Some("42".to_owned()),
409 reply_ack: None,
410 };
411 assert!(!m.process_incoming(&unrelated));
412 assert_eq!(m.pending_count(), 1);
413 }
414
415 #[test]
416 fn build_ack_produces_valid_frame() -> TestResult {
417 let m = test_messenger();
418 let wire = m.build_ack("W1AW", "42");
419
420 let kiss = decode_kiss_frame(&wire)?;
421 let packet = parse_ax25(&kiss.data)?;
422 let msg = parse_msg(&packet.info)?;
423 assert_eq!(msg.addressee, "W1AW");
424 assert_eq!(msg.text, "ack42");
425 Ok(())
426 }
427
428 #[test]
429 fn build_rej_produces_valid_frame() -> TestResult {
430 let m = test_messenger();
431 let wire = m.build_rej("W1AW", "42");
432
433 let kiss = decode_kiss_frame(&wire)?;
434 let packet = parse_ax25(&kiss.data)?;
435 let msg = parse_msg(&packet.info)?;
436 assert_eq!(msg.addressee, "W1AW");
437 assert_eq!(msg.text, "rej42");
438 Ok(())
439 }
440
441 #[test]
442 fn cleanup_expired_removes_maxed_messages() {
443 let t0 = Instant::now();
444 let mut m = test_messenger();
445 let id = m.send_message("W1AW", "Test", t0);
446
447 let mut clock = t0;
450 for _ in 0..MAX_RETRIES {
451 clock += RETRY_INTERVAL;
452 drop(m.next_frame_to_send(clock));
453 }
454
455 assert_eq!(m.pending_count(), 1); let expired = m.cleanup_expired(clock);
457 assert_eq!(expired, vec![id]);
458 assert_eq!(m.pending_count(), 0);
459 }
460
461 #[test]
462 fn send_message_checked_rejects_too_long_text() {
463 let t0 = Instant::now();
464 let mut m = test_messenger();
465 let long = "x".repeat(100);
466 assert!(m.send_message_checked("W1AW", &long, t0).is_err());
467 assert_eq!(m.pending_count(), 0);
468 }
469
470 #[test]
471 fn send_message_checked_accepts_boundary_length() {
472 let t0 = Instant::now();
473 let mut m = test_messenger();
474 let text = "x".repeat(67);
475 assert!(m.send_message_checked("W1AW", &text, t0).is_ok());
476 }
477
478 #[test]
479 fn is_new_incoming_dedup_matches_source_msgno() {
480 let t0 = Instant::now();
481 let mut m = test_messenger();
482 let msg = AprsMessage {
483 addressee: "N0CALL".to_owned(),
484 text: "hello".to_owned(),
485 message_id: Some("42".to_owned()),
486 reply_ack: None,
487 };
488 assert!(m.is_new_incoming("W1AW", &msg, t0));
489 assert!(!m.is_new_incoming("W1AW", &msg, t0));
490 assert!(m.is_new_incoming("W2AW", &msg, t0));
492 }
493
494 #[test]
495 fn is_new_incoming_no_id_always_new() {
496 let t0 = Instant::now();
497 let mut m = test_messenger();
498 let msg = AprsMessage {
499 addressee: "N0CALL".to_owned(),
500 text: "hello".to_owned(),
501 message_id: None,
502 reply_ack: None,
503 };
504 assert!(m.is_new_incoming("W1AW", &msg, t0));
505 assert!(m.is_new_incoming("W1AW", &msg, t0));
506 }
507
508 #[test]
509 fn is_new_incoming_expires_stale_entries() {
510 let t0 = Instant::now();
511 let mut m = test_messenger();
512 let msg = AprsMessage {
513 addressee: "N0CALL".to_owned(),
514 text: "hello".to_owned(),
515 message_id: Some("42".to_owned()),
516 reply_ack: None,
517 };
518 assert!(m.is_new_incoming("W1AW", &msg, t0));
519 let later = t0 + INCOMING_DEDUP_WINDOW + Duration::from_secs(1);
521 assert!(m.is_new_incoming("W1AW", &msg, later));
522 }
523
524 #[test]
525 fn process_incoming_ignores_false_positive_message() {
526 let t0 = Instant::now();
527 let mut m = test_messenger();
528 let _id = m.send_message("W1AW", "Hello", t0);
529
530 let false_ack = AprsMessage {
532 addressee: "N0CALL".to_owned(),
533 text: "acknowledge receipt".to_owned(),
534 message_id: None,
535 reply_ack: None,
536 };
537 assert!(!m.process_incoming(&false_ack));
538 assert_eq!(m.pending_count(), 1);
539 }
540
541 #[test]
542 fn message_id_wraps_around_skipping_zero() {
543 let t0 = Instant::now();
544 let mut m = test_messenger();
545 m.next_message_id = u16::MAX;
546 let id1 = m.send_message("W1AW", "A", t0);
547 assert_eq!(id1, u16::MAX.to_string());
548 let id2 = m.send_message("W1AW", "B", t0);
550 assert_eq!(id2, "1");
551 }
552}