dstar_gateway_server/reflector/
stream_cache.rs1use std::net::SocketAddr;
14use std::time::{Duration, Instant};
15
16use dstar_gateway_core::header::DStarHeader;
17use dstar_gateway_core::types::StreamId;
18
19#[derive(Debug, Clone)]
21pub struct StreamCache {
22 stream_id: StreamId,
23 header: DStarHeader,
24 header_bytes: Vec<u8>,
28 seq_counter: u32,
29 started_at: Instant,
30 last_activity: Instant,
31 from: SocketAddr,
32}
33
34impl StreamCache {
35 #[must_use]
41 pub const fn new(
42 stream_id: StreamId,
43 header: DStarHeader,
44 from: SocketAddr,
45 now: Instant,
46 ) -> Self {
47 Self {
48 stream_id,
49 header,
50 header_bytes: Vec::new(),
51 seq_counter: 0,
52 started_at: now,
53 last_activity: now,
54 from,
55 }
56 }
57
58 #[must_use]
65 pub const fn new_with_bytes(
66 stream_id: StreamId,
67 header: DStarHeader,
68 header_bytes: Vec<u8>,
69 from: SocketAddr,
70 now: Instant,
71 ) -> Self {
72 Self {
73 stream_id,
74 header,
75 header_bytes,
76 seq_counter: 0,
77 started_at: now,
78 last_activity: now,
79 from,
80 }
81 }
82
83 #[must_use]
88 pub fn header_bytes(&self) -> &[u8] {
89 &self.header_bytes
90 }
91
92 pub const fn record_frame(&mut self, now: Instant) {
97 self.seq_counter = self.seq_counter.saturating_add(1);
98 self.last_activity = now;
99 }
100
101 #[must_use]
107 pub const fn should_rebroadcast_header(&self) -> bool {
108 (self.seq_counter.wrapping_add(1)).is_multiple_of(21)
109 }
110
111 #[must_use]
113 pub fn should_evict(&self, now: Instant, timeout: Duration) -> bool {
114 now.duration_since(self.last_activity) >= timeout
115 }
116
117 #[must_use]
119 pub const fn header(&self) -> &DStarHeader {
120 &self.header
121 }
122
123 #[must_use]
125 pub const fn stream_id(&self) -> StreamId {
126 self.stream_id
127 }
128
129 #[must_use]
131 pub const fn from(&self) -> SocketAddr {
132 self.from
133 }
134
135 #[must_use]
137 pub const fn started_at(&self) -> Instant {
138 self.started_at
139 }
140
141 #[must_use]
143 pub const fn last_activity(&self) -> Instant {
144 self.last_activity
145 }
146}
147
148#[cfg(test)]
149mod tests {
150 use super::{Duration, Instant, SocketAddr, StreamCache, StreamId};
151 use dstar_gateway_core::header::DStarHeader;
152 use dstar_gateway_core::types::{Callsign, Suffix};
153 use std::net::{IpAddr, Ipv4Addr};
154
155 const PEER: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 30001);
156
157 fn peer() -> SocketAddr {
158 PEER
159 }
160
161 fn header() -> DStarHeader {
162 DStarHeader {
163 flag1: 0,
164 flag2: 0,
165 flag3: 0,
166 rpt2: Callsign::from_wire_bytes(*b"REF030 G"),
167 rpt1: Callsign::from_wire_bytes(*b"REF030 C"),
168 ur_call: Callsign::from_wire_bytes(*b"CQCQCQ "),
169 my_call: Callsign::from_wire_bytes(*b"W1AW "),
170 my_suffix: Suffix::EMPTY,
171 }
172 }
173
174 const fn sid() -> StreamId {
175 match StreamId::new(0x1234) {
176 Some(s) => s,
177 None => unreachable!(),
178 }
179 }
180
181 #[test]
182 fn accessors_return_constructor_inputs() {
183 let now = Instant::now();
184 let cache = StreamCache::new(sid(), header(), peer(), now);
185 assert_eq!(cache.stream_id(), sid());
186 assert_eq!(cache.from(), peer());
187 assert_eq!(cache.started_at(), now);
188 assert_eq!(cache.last_activity(), now);
189 let h = cache.header();
191 assert_eq!(h.my_call, header().my_call);
192 }
193
194 #[test]
195 fn should_rebroadcast_header_fires_every_21_frames() {
196 let now = Instant::now();
197 let mut cache = StreamCache::new(sid(), header(), peer(), now);
198 let mut rebroadcasts = 0_u32;
200 for _ in 0..50 {
201 cache.record_frame(now);
202 if cache.should_rebroadcast_header() {
203 rebroadcasts = rebroadcasts.saturating_add(1);
204 }
205 }
206 assert_eq!(
209 rebroadcasts, 2,
210 "two rebroadcast boundaries in 50 frames at 21-frame cadence"
211 );
212 }
213
214 #[test]
215 fn should_rebroadcast_header_is_false_initially() {
216 let now = Instant::now();
217 let cache = StreamCache::new(sid(), header(), peer(), now);
218 assert!(!cache.should_rebroadcast_header());
220 }
221
222 #[test]
223 fn should_evict_triggers_after_timeout() {
224 let start = Instant::now();
225 let mut cache = StreamCache::new(sid(), header(), peer(), start);
226 let timeout = Duration::from_secs(2);
227 assert!(!cache.should_evict(start, timeout));
229 let t1 = start + Duration::from_millis(500);
231 cache.record_frame(t1);
232 assert!(!cache.should_evict(t1, timeout));
233 let t2 = t1 + Duration::from_millis(2500);
235 assert!(cache.should_evict(t2, timeout));
236 }
237
238 #[test]
239 fn record_frame_updates_last_activity() {
240 let start = Instant::now();
241 let mut cache = StreamCache::new(sid(), header(), peer(), start);
242 assert_eq!(cache.last_activity(), start);
243 let t1 = start + Duration::from_millis(100);
244 cache.record_frame(t1);
245 assert_eq!(cache.last_activity(), t1);
246 }
247}