dstar_gateway_core/session/
outbox.rs1use std::cmp::Ordering;
9use std::collections::BinaryHeap;
10use std::net::SocketAddr;
11use std::time::Instant;
12
13#[derive(Debug, Clone)]
15pub(crate) struct OutboundPacket {
16 pub(crate) dst: SocketAddr,
17 pub(crate) payload: Vec<u8>,
18 pub(crate) not_before: Instant,
19}
20
21impl PartialEq for OutboundPacket {
22 fn eq(&self, other: &Self) -> bool {
23 self.not_before == other.not_before
24 }
25}
26impl Eq for OutboundPacket {}
27impl PartialOrd for OutboundPacket {
28 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
29 Some(self.cmp(other))
30 }
31}
32impl Ord for OutboundPacket {
33 fn cmp(&self, other: &Self) -> Ordering {
34 other.not_before.cmp(&self.not_before)
36 }
37}
38
39#[derive(Debug, Default)]
41pub(crate) struct Outbox {
42 queue: BinaryHeap<OutboundPacket>,
43}
44
45impl Outbox {
46 pub(crate) fn new() -> Self {
47 Self::default()
48 }
49
50 pub(crate) fn enqueue(&mut self, packet: OutboundPacket) {
51 self.queue.push(packet);
52 }
53
54 pub(crate) fn pop_ready(&mut self, now: Instant) -> Option<OutboundPacket> {
55 if self.queue.peek().is_some_and(|p| p.not_before <= now) {
56 self.queue.pop()
57 } else {
58 None
59 }
60 }
61
62 pub(crate) fn peek_next_deadline(&self) -> Option<Instant> {
63 self.queue.peek().map(|p| p.not_before)
64 }
65}
66
67#[cfg(test)]
68mod tests {
69 use super::*;
70 use std::net::{IpAddr, Ipv4Addr};
71 use std::time::Duration;
72
73 const ADDR: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 30001);
74
75 #[test]
76 fn empty_pop_returns_none() {
77 let mut ob = Outbox::new();
78 assert!(ob.pop_ready(Instant::now()).is_none());
79 }
80
81 #[test]
82 fn pop_ready_returns_due_packet() -> Result<(), Box<dyn std::error::Error>> {
83 let mut ob = Outbox::new();
84 let now = Instant::now();
85 ob.enqueue(OutboundPacket {
86 dst: ADDR,
87 payload: vec![1, 2, 3],
88 not_before: now,
89 });
90 let popped = ob.pop_ready(now).ok_or("expected due packet")?;
91 assert_eq!(popped.payload, vec![1, 2, 3]);
92 assert_eq!(popped.dst, ADDR);
93 assert!(
94 ob.pop_ready(now).is_none(),
95 "outbox drained after single pop"
96 );
97 Ok(())
98 }
99
100 #[test]
101 fn pop_ready_holds_future_packet() {
102 let mut ob = Outbox::new();
103 let now = Instant::now();
104 let later = now + Duration::from_secs(1);
105 ob.enqueue(OutboundPacket {
106 dst: ADDR,
107 payload: vec![1],
108 not_before: later,
109 });
110 assert!(ob.pop_ready(now).is_none());
111 assert_eq!(ob.peek_next_deadline(), Some(later));
112 assert!(ob.pop_ready(later).is_some());
113 }
114
115 #[test]
116 fn earlier_packet_pops_first() -> Result<(), Box<dyn std::error::Error>> {
117 let mut ob = Outbox::new();
118 let now = Instant::now();
119 ob.enqueue(OutboundPacket {
120 dst: ADDR,
121 payload: vec![2],
122 not_before: now + Duration::from_millis(100),
123 });
124 ob.enqueue(OutboundPacket {
125 dst: ADDR,
126 payload: vec![1],
127 not_before: now,
128 });
129 assert_eq!(
130 ob.pop_ready(now + Duration::from_millis(200))
131 .ok_or("expected first")?
132 .payload,
133 vec![1]
134 );
135 assert_eq!(
136 ob.pop_ready(now + Duration::from_millis(200))
137 .ok_or("expected second")?
138 .payload,
139 vec![2]
140 );
141 Ok(())
142 }
143
144 #[test]
145 fn peek_next_deadline_reports_earliest() {
146 let mut ob = Outbox::new();
147 let now = Instant::now();
148 ob.enqueue(OutboundPacket {
149 dst: ADDR,
150 payload: vec![],
151 not_before: now + Duration::from_secs(5),
152 });
153 ob.enqueue(OutboundPacket {
154 dst: ADDR,
155 payload: vec![],
156 not_before: now + Duration::from_secs(1),
157 });
158 assert_eq!(ob.peek_next_deadline(), Some(now + Duration::from_secs(1)));
159 }
160}