dstar_gateway_core/session/
outbox.rs

1//! Outbound packet priority queue keyed by `not_before: Instant`.
2//!
3//! The session's outbox holds outbound packets with optional
4//! retransmission scheduling. `pop_ready(now)` returns the next
5//! packet whose `not_before` is at or before `now`. Used by both
6//! the immediate-send path and the retransmit scheduler.
7
8use std::cmp::Ordering;
9use std::collections::BinaryHeap;
10use std::net::SocketAddr;
11use std::time::Instant;
12
/// One queued outbound datagram with its earliest send instant.
///
/// Equality and ordering look **only** at `not_before`; `dst` and
/// `payload` are ignored. The ordering is reversed (an earlier
/// `not_before` compares as greater) so that a max-heap such as
/// [`BinaryHeap`] yields the earliest packet first.
#[derive(Debug, Clone)]
pub(crate) struct OutboundPacket {
    /// Socket address the datagram is destined for.
    pub(crate) dst: SocketAddr,
    /// Bytes of the datagram.
    pub(crate) payload: Vec<u8>,
    /// Earliest instant at which [`Outbox::pop_ready`] may release the packet.
    pub(crate) not_before: Instant,
}

impl PartialEq for OutboundPacket {
    fn eq(&self, other: &Self) -> bool {
        self.not_before == other.not_before
    }
}
impl Eq for OutboundPacket {}
impl PartialOrd for OutboundPacket {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
impl Ord for OutboundPacket {
    fn cmp(&self, other: &Self) -> Ordering {
        // Reversed ordering so BinaryHeap pops the EARLIEST first.
        other.not_before.cmp(&self.not_before)
    }
}

/// Heap entry: a packet tagged with its enqueue sequence number.
///
/// `BinaryHeap` makes no promise about the relative order of equal
/// elements, so ordering by `not_before` alone lets packets that share
/// a deadline leave the queue in a different order than they entered.
/// The sequence number breaks those ties first-in, first-out.
#[derive(Debug)]
struct QueuedPacket {
    /// Position in enqueue order; lower means enqueued earlier.
    seq: u64,
    packet: OutboundPacket,
}

impl PartialEq for QueuedPacket {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == Ordering::Equal
    }
}
impl Eq for QueuedPacket {}
impl PartialOrd for QueuedPacket {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
impl Ord for QueuedPacket {
    fn cmp(&self, other: &Self) -> Ordering {
        // `OutboundPacket::cmp` is already reversed (earliest deadline is
        // "greatest"). The tie-break is reversed the same way so that,
        // among equal deadlines, the lowest sequence number is popped first.
        self.packet
            .cmp(&other.packet)
            .then_with(|| other.seq.cmp(&self.seq))
    }
}

/// Priority queue of outbound packets ordered by `not_before`.
///
/// Packets with the same `not_before` are released in the order they
/// were enqueued.
#[derive(Debug, Default)]
pub(crate) struct Outbox {
    queue: BinaryHeap<QueuedPacket>,
    /// Sequence number handed to the next enqueued packet.
    next_seq: u64,
}

impl Outbox {
    /// Creates an empty outbox.
    pub(crate) fn new() -> Self {
        Self::default()
    }

    /// Adds `packet` to the queue.
    pub(crate) fn enqueue(&mut self, packet: OutboundPacket) {
        let seq = self.next_seq;
        // Wrapping keeps this panic-free in debug builds; exhausting a
        // `u64` counter is not reachable in practice.
        self.next_seq = self.next_seq.wrapping_add(1);
        self.queue.push(QueuedPacket { seq, packet });
    }

    /// Removes and returns the earliest packet if its `not_before` is at
    /// or before `now`.
    ///
    /// Returns `None` when the queue is empty or the earliest packet is
    /// still in the future; the queue is left untouched in that case.
    pub(crate) fn pop_ready(&mut self, now: Instant) -> Option<OutboundPacket> {
        let head_is_due = self
            .queue
            .peek()
            .is_some_and(|entry| entry.packet.not_before <= now);
        if !head_is_due {
            return None;
        }
        self.queue.pop().map(|entry| entry.packet)
    }

    /// Returns the earliest `not_before` among queued packets, or `None`
    /// when the queue is empty. Does not remove anything.
    pub(crate) fn peek_next_deadline(&self) -> Option<Instant> {
        self.queue.peek().map(|entry| entry.packet.not_before)
    }
}
66
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::{IpAddr, Ipv4Addr};
    use std::time::Duration;

    /// Loopback destination used for every packet built in these tests.
    const ADDR: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 30001);

    /// Return type for tests that propagate failures with `?`.
    type TestResult = Result<(), Box<dyn std::error::Error>>;

    /// Builds a packet addressed to `ADDR` carrying `payload`, sendable
    /// from `not_before` onward.
    fn packet(payload: &[u8], not_before: Instant) -> OutboundPacket {
        OutboundPacket {
            dst: ADDR,
            payload: payload.to_vec(),
            not_before,
        }
    }

    /// Popping from a freshly created outbox yields nothing.
    #[test]
    fn empty_pop_returns_none() {
        let mut outbox = Outbox::new();
        let popped = outbox.pop_ready(Instant::now());
        assert!(popped.is_none());
    }

    /// A packet whose deadline equals `now` is released exactly once.
    #[test]
    fn pop_ready_returns_due_packet() -> TestResult {
        let start = Instant::now();
        let mut outbox = Outbox::new();
        outbox.enqueue(packet(&[1, 2, 3], start));

        let released = outbox.pop_ready(start).ok_or("expected due packet")?;
        assert_eq!(released.payload, vec![1, 2, 3]);
        assert_eq!(released.dst, ADDR);

        let leftover = outbox.pop_ready(start);
        assert!(leftover.is_none(), "outbox drained after single pop");
        Ok(())
    }

    /// A packet scheduled for the future stays queued until its deadline.
    #[test]
    fn pop_ready_holds_future_packet() {
        let start = Instant::now();
        let deadline = start + Duration::from_secs(1);
        let mut outbox = Outbox::new();
        outbox.enqueue(packet(&[1], deadline));

        assert!(outbox.pop_ready(start).is_none());
        assert_eq!(outbox.peek_next_deadline(), Some(deadline));
        assert!(outbox.pop_ready(deadline).is_some());
    }

    /// Release order follows `not_before`, not insertion order.
    #[test]
    fn earlier_packet_pops_first() -> TestResult {
        let start = Instant::now();
        let cutoff = start + Duration::from_millis(200);
        let mut outbox = Outbox::new();
        outbox.enqueue(packet(&[2], start + Duration::from_millis(100)));
        outbox.enqueue(packet(&[1], start));

        let first = outbox.pop_ready(cutoff).ok_or("expected first")?;
        assert_eq!(first.payload, vec![1]);

        let second = outbox.pop_ready(cutoff).ok_or("expected second")?;
        assert_eq!(second.payload, vec![2]);
        Ok(())
    }

    /// The reported deadline is the minimum `not_before` in the queue.
    #[test]
    fn peek_next_deadline_reports_earliest() {
        let start = Instant::now();
        let mut outbox = Outbox::new();
        for secs in [5, 1] {
            outbox.enqueue(packet(&[], start + Duration::from_secs(secs)));
        }

        let earliest = start + Duration::from_secs(1);
        assert_eq!(outbox.peek_next_deadline(), Some(earliest));
    }
}