dstar_gateway_server/reflector/
stream_cache.rs

1//! `StreamCache` — tracks one active voice stream per module.
2//!
3//! Voice headers must be periodically rebroadcast so clients who
4//! joined mid-stream (or missed the initial header) can still decode
5//! the audio. The reflector stores one [`StreamCache`] per module
6//! currently carrying a stream, and consults it on each incoming
7//! frame to decide:
8//!
9//! - whether to rebroadcast the cached header (every 21 frames, matching
10//!   the xlxd cadence in `cdplusprotocol.cpp:318`);
11//! - whether the stream has gone silent and should be evicted.
12
13use std::net::SocketAddr;
14use std::time::{Duration, Instant};
15
16use dstar_gateway_core::header::DStarHeader;
17use dstar_gateway_core::types::StreamId;
18
19/// Cached state for one active voice stream on one module.
20#[derive(Debug, Clone)]
21pub struct StreamCache {
22    stream_id: StreamId,
23    header: DStarHeader,
24    /// Raw wire-format bytes of the voice header packet, cached for
25    /// 21-frame retransmit. Populated when the endpoint observes a
26    /// fresh voice header; re-sent verbatim on each cadence tick.
27    header_bytes: Vec<u8>,
28    seq_counter: u32,
29    started_at: Instant,
30    last_activity: Instant,
31    from: SocketAddr,
32}
33
34impl StreamCache {
35    /// Construct a new stream cache entry.
36    ///
37    /// Called when the reflector sees a fresh voice header from a
38    /// client. `now` is the wall-clock instant of receipt, used as
39    /// both `started_at` and the initial `last_activity`.
40    #[must_use]
41    pub const fn new(
42        stream_id: StreamId,
43        header: DStarHeader,
44        from: SocketAddr,
45        now: Instant,
46    ) -> Self {
47        Self {
48            stream_id,
49            header,
50            header_bytes: Vec::new(),
51            seq_counter: 0,
52            started_at: now,
53            last_activity: now,
54            from,
55        }
56    }
57
58    /// Construct a new stream cache entry with the raw header bytes
59    /// cached for retransmit.
60    ///
61    /// Preferred entry point for the fan-out engine, which needs to
62    /// re-send the original wire-format header verbatim every 21
63    /// frames without re-encoding it.
64    #[must_use]
65    pub const fn new_with_bytes(
66        stream_id: StreamId,
67        header: DStarHeader,
68        header_bytes: Vec<u8>,
69        from: SocketAddr,
70        now: Instant,
71    ) -> Self {
72        Self {
73            stream_id,
74            header,
75            header_bytes,
76            seq_counter: 0,
77            started_at: now,
78            last_activity: now,
79            from,
80        }
81    }
82
83    /// Raw wire-format bytes of the cached voice header.
84    ///
85    /// Returns an empty slice if the cache was constructed via
86    /// [`Self::new`] rather than [`Self::new_with_bytes`].
87    #[must_use]
88    pub fn header_bytes(&self) -> &[u8] {
89        &self.header_bytes
90    }
91
92    /// Record the arrival of another voice frame.
93    ///
94    /// Increments the internal sequence counter and refreshes
95    /// `last_activity` so the inactivity watchdog stays armed.
96    pub const fn record_frame(&mut self, now: Instant) {
97        self.seq_counter = self.seq_counter.saturating_add(1);
98        self.last_activity = now;
99    }
100
101    /// Whether the cached header should be rebroadcast on the next tick.
102    ///
103    /// Returns `true` once every 21 frames, matching the xlxd /
104    /// `MMDVMHost` cadence. The boundary is `(seq_counter + 1) % 21 == 0`
105    /// so the first rebroadcast happens after 20 data frames.
106    #[must_use]
107    pub const fn should_rebroadcast_header(&self) -> bool {
108        (self.seq_counter.wrapping_add(1)).is_multiple_of(21)
109    }
110
111    /// Whether this stream has been idle long enough to be evicted.
112    #[must_use]
113    pub fn should_evict(&self, now: Instant, timeout: Duration) -> bool {
114        now.duration_since(self.last_activity) >= timeout
115    }
116
117    /// The cached voice header.
118    #[must_use]
119    pub const fn header(&self) -> &DStarHeader {
120        &self.header
121    }
122
123    /// The stream id this cache tracks.
124    #[must_use]
125    pub const fn stream_id(&self) -> StreamId {
126        self.stream_id
127    }
128
129    /// The peer that originated the stream (so fan-out can avoid echo).
130    #[must_use]
131    pub const fn from(&self) -> SocketAddr {
132        self.from
133    }
134
135    /// When the stream first began (useful for duration metrics).
136    #[must_use]
137    pub const fn started_at(&self) -> Instant {
138        self.started_at
139    }
140
141    /// When the last frame was observed (useful for watchdogs / metrics).
142    #[must_use]
143    pub const fn last_activity(&self) -> Instant {
144        self.last_activity
145    }
146}
147
#[cfg(test)]
mod tests {
    //! Unit tests for [`StreamCache`]: constructor/accessor round-trips,
    //! the 21-frame header-rebroadcast cadence, and inactivity eviction.

    use super::{Duration, Instant, SocketAddr, StreamCache, StreamId};
    use dstar_gateway_core::header::DStarHeader;
    use dstar_gateway_core::types::{Callsign, Suffix};
    use std::net::{IpAddr, Ipv4Addr};

    /// Fixed loopback peer used as the stream originator in every test.
    const PEER: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 30001);

    /// The originating peer address shared by all tests.
    fn peer() -> SocketAddr {
        PEER
    }

    /// A representative voice header with all flags cleared.
    fn header() -> DStarHeader {
        DStarHeader {
            flag1: 0,
            flag2: 0,
            flag3: 0,
            rpt2: Callsign::from_wire_bytes(*b"REF030 G"),
            rpt1: Callsign::from_wire_bytes(*b"REF030 C"),
            ur_call: Callsign::from_wire_bytes(*b"CQCQCQ  "),
            my_call: Callsign::from_wire_bytes(*b"W1AW    "),
            my_suffix: Suffix::EMPTY,
        }
    }

    /// A fixed stream id accepted by `StreamId::new`.
    const fn sid() -> StreamId {
        match StreamId::new(0x1234) {
            Some(s) => s,
            None => unreachable!(),
        }
    }

    #[test]
    fn accessors_return_constructor_inputs() {
        let now = Instant::now();
        let cache = StreamCache::new(sid(), header(), peer(), now);
        assert_eq!(cache.stream_id(), sid());
        assert_eq!(cache.from(), peer());
        assert_eq!(cache.started_at(), now);
        assert_eq!(cache.last_activity(), now);
        // header() returns a borrowed reference — compare field-by-field
        let h = cache.header();
        assert_eq!(h.my_call, header().my_call);
    }

    #[test]
    fn new_leaves_header_bytes_empty() {
        let now = Instant::now();
        let cache = StreamCache::new(sid(), header(), peer(), now);
        assert!(cache.header_bytes().is_empty());
    }

    #[test]
    fn new_with_bytes_caches_raw_header_verbatim() {
        let now = Instant::now();
        let raw: Vec<u8> = vec![0x44, 0x53, 0x56, 0x54, 0x10, 0x00];
        let cache = StreamCache::new_with_bytes(sid(), header(), raw.clone(), peer(), now);
        assert_eq!(cache.header_bytes(), raw.as_slice());
        // The remaining state is seeded exactly like `new`.
        assert_eq!(cache.stream_id(), sid());
        assert_eq!(cache.from(), peer());
        assert_eq!(cache.started_at(), now);
        assert_eq!(cache.last_activity(), now);
        assert!(!cache.should_rebroadcast_header());
    }

    #[test]
    fn should_rebroadcast_header_fires_every_21_frames() {
        let now = Instant::now();
        let mut cache = StreamCache::new(sid(), header(), peer(), now);
        // Record which frames (1-based) report a rebroadcast boundary,
        // so the test pins the positions and not merely the count.
        let mut boundaries: Vec<u32> = Vec::new();
        for frame in 1..=50_u32 {
            cache.record_frame(now);
            if cache.should_rebroadcast_header() {
                boundaries.push(frame);
            }
        }
        // Boundaries fall after 20 frames and then every 21 frames:
        // 20 and 41 are inside the first 50, the next (62) is not.
        assert_eq!(
            boundaries,
            vec![20, 41],
            "two rebroadcast boundaries in 50 frames at 21-frame cadence"
        );
    }

    #[test]
    fn should_rebroadcast_header_is_false_initially() {
        let now = Instant::now();
        let cache = StreamCache::new(sid(), header(), peer(), now);
        // No frames recorded yet, so no boundary has been reached.
        assert!(!cache.should_rebroadcast_header());
    }

    #[test]
    fn should_evict_triggers_after_timeout() {
        let start = Instant::now();
        let mut cache = StreamCache::new(sid(), header(), peer(), start);
        let timeout = Duration::from_secs(2);
        // Fresh cache — not yet evicted at start.
        assert!(!cache.should_evict(start, timeout));
        // Record a frame at start + 500ms.
        let t1 = start + Duration::from_millis(500);
        cache.record_frame(t1);
        assert!(!cache.should_evict(t1, timeout));
        // Just short of the timeout → still alive.
        let almost = t1 + Duration::from_millis(1999);
        assert!(!cache.should_evict(almost, timeout));
        // Exactly at the timeout → evict (the comparison is inclusive).
        let exact = t1 + timeout;
        assert!(cache.should_evict(exact, timeout));
        // 2.5s after last activity → evict.
        let t2 = t1 + Duration::from_millis(2500);
        assert!(cache.should_evict(t2, timeout));
    }

    #[test]
    fn record_frame_updates_last_activity() {
        let start = Instant::now();
        let mut cache = StreamCache::new(sid(), header(), peer(), start);
        assert_eq!(cache.last_activity(), start);
        let t1 = start + Duration::from_millis(100);
        cache.record_frame(t1);
        assert_eq!(cache.last_activity(), t1);
        // Recording frames never moves the stream start time.
        assert_eq!(cache.started_at(), start);
    }
}