stargazer/tier3/
capture.rs

1// StreamCapture and CaptureManager are consumed by the AsyncSession-based
2// Tier 3 orchestrator (connection-pool management), which is not yet
3// implemented. Until then the types are built and unit-tested in place.
4// The `cfg_attr(not(test), ...)` gate avoids an unfulfilled expectation
5// in the test build, where the unit tests exercise every item.
6#![cfg_attr(
7    not(test),
8    expect(
9        dead_code,
10        reason = "capture state is wired in once the tier-3 connection-pool orchestrator lands"
11    )
12)]
13
14//! Voice-stream capture state manager.
15//!
16//! A D-STAR voice transmission on the wire is a sequence of protocol events:
17//!
18//! ```text
19//! VoiceStart(header) -> VoiceFrame x N -> VoiceEnd
20//! ```
21//!
22//! Each transmission carries two parallel data streams inside the voice frames:
23//!
24//! - **AMBE voice data** (9 bytes/frame, 50 frames/sec): the compressed
25//!   speech audio that will be decoded to PCM and then to MP3.
26//!
27//! - **Slow data** (3 bytes/frame, 50 frames/sec): a low-bandwidth side
28//!   channel carrying things like text messages, DPRS position reports,
29//!   and squelch codes. Each logical slow-data block spans several frames
//!   and is reassembled by [`SlowDataAssembler`]. Frame 0 of every 21-frame
//!   superframe is a sync frame (slow-data bytes `0x55 0x2D 0x16`), not a
//!   data fragment — the assembler handles the descrambling internally.
33//!
34//! This module provides two types that encapsulate the per-stream and
35//! cross-stream bookkeeping:
36//!
37//! - [`StreamCapture`] — one in-progress capture: header metadata, all
38//!   accumulated AMBE frames, and the slow-data assembler with its
39//!   recovered text/DPRS fields.
40//!
41//! - [`CaptureManager`] — maps stream IDs to active captures, so that
42//!   voice frames from multiple concurrent transmissions (different
43//!   modules, different reflectors) can be dispatched to the correct
44//!   capture without cross-contamination.
45//!
46//! ## Slow-data assembly details
47//!
48//! [`dstar_gateway_core::SlowDataAssembler::push`] takes one 3-byte fragment
49//! at a time and returns `Some(SlowDataBlock)` when a block completes. For
50//! [`SlowDataBlock::Text`], the `.text` field holds the 20-character status
51//! message. For [`SlowDataBlock::Gps`], the payload is a raw NMEA-style
52//! DPRS sentence that we feed into [`dstar_gateway_core::parse_dprs`] to
53//! extract latitude and longitude. All other block kinds (header retx,
54//! fast data, squelch, unknown) are ignored for persistence purposes —
55//! they aren't part of the streams table schema.
56
57use std::collections::HashMap;
58
59use chrono::{DateTime, Utc};
60use dstar_gateway::{DStarHeader, StreamId, VoiceFrame};
61use dstar_gateway_core::{SlowDataAssembler, SlowDataBlock, parse_dprs};
62
63/// State for one in-progress voice-stream capture.
64///
65/// Created when a D-STAR `VoiceStart` event arrives (via
66/// [`Self::new`], which extracts header metadata). Frames are added via
67/// [`Self::push_frame`] as `VoiceFrame` events arrive. When the stream ends
68/// (EOT or timeout), the accumulated state is consumed by the Tier 3
69/// orchestrator to produce an MP3 and persist a database row.
70#[derive(Debug)]
71pub(crate) struct StreamCapture {
72    /// Reflector callsign this stream was captured from (e.g. `"REF030"`).
73    pub(crate) reflector: String,
74
75    /// Module letter (A-Z) the stream was received on.
76    pub(crate) module: String,
77
78    /// Protocol name: `"dplus"`, `"dextra"`, or `"dcs"`.
79    pub(crate) protocol: String,
80
81    /// D-STAR stream ID (non-zero u16 on wire).
82    pub(crate) stream_id: u16,
83
84    /// Operator callsign (`my_call` from the header), ASCII-trimmed.
85    pub(crate) callsign: String,
86
87    /// Operator suffix (`my_suffix` from the header), if non-empty after trim.
88    pub(crate) suffix: Option<String>,
89
90    /// Destination callsign (`ur_call` from the header), if non-empty after trim.
91    pub(crate) ur_call: Option<String>,
92
93    /// AMBE voice frames accumulated in receive order.
94    ///
95    /// Each frame is 9 bytes; 160 PCM samples will be produced per frame
96    /// by `mbelib-rs`. The order is load-bearing: the AMBE decoder uses
97    /// inter-frame delta prediction.
98    pub(crate) ambe_frames: Vec<[u8; 9]>,
99
100    /// Stateful slow-data reassembler.
101    ///
102    /// Fed one 3-byte fragment per voice frame via
103    /// [`SlowDataAssembler::push`]. Emits [`SlowDataBlock`] values when
104    /// complete blocks arrive; we consume those immediately to update
105    /// `dstar_text` and `dprs_lat`/`dprs_lon`.
106    slow_data: SlowDataAssembler,
107
108    /// Latest slow-data text message decoded from this stream, if any.
109    pub(crate) dstar_text: Option<String>,
110
111    /// DPRS latitude in decimal degrees, if a DPRS sentence was decoded.
112    pub(crate) dprs_lat: Option<f64>,
113
114    /// DPRS longitude in decimal degrees, if a DPRS sentence was decoded.
115    pub(crate) dprs_lon: Option<f64>,
116
117    /// When the voice header arrived (UTC wall-clock time).
118    pub(crate) started_at: DateTime<Utc>,
119}
120
121impl StreamCapture {
122    /// Creates a new capture from the D-STAR voice header.
123    ///
124    /// Extracts `my_call`, `my_suffix`, and `ur_call` from the header,
125    /// trimming ASCII whitespace and dropping empty strings. The header's
126    /// RPT1/RPT2 fields are not persisted — they are routing metadata and
127    /// are redundant with the `reflector`/`module` arguments.
128    pub(crate) fn new(
129        reflector: String,
130        module: String,
131        protocol: String,
132        stream_id: StreamId,
133        header: &DStarHeader,
134    ) -> Self {
135        // Callsign and suffix are fixed-width, space-padded on the wire.
136        // Trim trailing whitespace for persistence; drop to None if empty.
137        let my_call_raw = header.my_call.as_str();
138        let callsign = my_call_raw.trim_end().to_string();
139
140        let my_suffix_raw = header.my_suffix.as_str();
141        let suffix_trimmed = my_suffix_raw.trim_end().to_string();
142        let suffix = if suffix_trimmed.is_empty() {
143            None
144        } else {
145            Some(suffix_trimmed)
146        };
147
148        let ur_call_raw = header.ur_call.as_str();
149        let ur_trimmed = ur_call_raw.trim_end().to_string();
150        let ur_call = if ur_trimmed.is_empty() {
151            None
152        } else {
153            Some(ur_trimmed)
154        };
155
156        Self {
157            reflector,
158            module,
159            protocol,
160            stream_id: stream_id.get(),
161            callsign,
162            suffix,
163            ur_call,
164            ambe_frames: Vec::new(),
165            slow_data: SlowDataAssembler::new(),
166            dstar_text: None,
167            dprs_lat: None,
168            dprs_lon: None,
169            started_at: Utc::now(),
170        }
171    }
172
173    /// Appends one voice frame to the capture.
174    ///
175    /// The 9-byte AMBE payload is stored verbatim for later mbelib-rs
176    /// decoding. The 3-byte slow-data fragment is fed to the assembler,
177    /// which may emit a complete [`SlowDataBlock`] after enough fragments
178    /// arrive. Text and GPS blocks are retained (last-write-wins if the
179    /// transmission contains multiple); other block kinds are ignored.
180    pub(crate) fn push_frame(&mut self, frame: &VoiceFrame) {
181        // Store the AMBE voice payload in receive order.
182        self.ambe_frames.push(frame.ambe);
183
184        // Feed the slow-data fragment and process any emitted block.
185        if let Some(block) = self.slow_data.push(frame.slow_data) {
186            self.consume_slow_data(block);
187        }
188    }
189
190    /// Processes a single assembled slow-data block, updating capture fields.
191    ///
192    /// - `Text`: store the trimmed text message as `dstar_text`.
193    /// - `Gps`: parse the DPRS sentence and store lat/lon if valid; DPRS
194    ///   parse failures are non-fatal (real-world slow data often contains
195    ///   malformed GPS payloads from non-GPS-equipped radios).
196    /// - All other variants (`HeaderRetx`, `FastData`, `Squelch`,
197    ///   `Unknown`): ignored — not part of the streams table.
198    fn consume_slow_data(&mut self, block: SlowDataBlock) {
199        match block {
200            SlowDataBlock::Text(text) => {
201                // The assembler already trimmed trailing spaces/nulls.
202                // Discard if completely empty after trim.
203                if !text.text.is_empty() {
204                    self.dstar_text = Some(text.text);
205                }
206            }
207            SlowDataBlock::Gps(sentence) => {
208                // Non-DPRS GPS slow-data is common (raw NMEA passthrough
209                // from GPS-capable radios). `parse_dprs` only succeeds on
210                // `$$CRC`-prefixed DPRS sentences; other GPS payloads are
211                // silently dropped.
212                if let Ok(report) = parse_dprs(&sentence) {
213                    self.dprs_lat = Some(report.latitude.degrees());
214                    self.dprs_lon = Some(report.longitude.degrees());
215                }
216            }
217            // Header retx, fast data, squelch, unknown — none are part
218            // of the streams table schema. `SlowDataBlock` is marked
219            // `#[non_exhaustive]`; the wildcard also catches any future
220            // block kinds added upstream.
221            _ => {}
222        }
223    }
224
225    /// Returns the number of voice frames captured so far.
226    pub(crate) const fn frame_count(&self) -> usize {
227        self.ambe_frames.len()
228    }
229}
230
231/// Manages multiple concurrent voice-stream captures.
232///
233/// A single Tier 3 connection can multiplex voice streams across modules
234/// (e.g. a reflector with modules A, B, C each carrying an independent
235/// transmission at the same time). The `CaptureManager` maps each wire
236/// stream ID to its in-progress [`StreamCapture`] so that frames are
237/// routed correctly.
238///
239/// # Lifecycle
240///
241/// 1. [`Self::start`] — called on `VoiceStart`: registers a new capture
242///    keyed by the stream ID from the header.
243/// 2. [`Self::push_frame`] — called on `VoiceFrame`: appends the frame to
244///    the matching capture (no-op if the stream ID is unknown, which can
245///    happen if we missed the header).
246/// 3. [`Self::end`] — called on `VoiceEnd`: removes and returns the
247///    capture for finalization (MP3 encode + database insert).
248#[derive(Debug, Default)]
249pub(crate) struct CaptureManager {
250    /// Active captures keyed by the wire stream ID.
251    active: HashMap<u16, StreamCapture>,
252}
253
254impl CaptureManager {
255    /// Creates an empty manager with no active captures.
256    pub(crate) fn new() -> Self {
257        Self::default()
258    }
259
260    /// Registers a new capture. Replaces any existing entry for the same
261    /// stream ID (a fresh header always supersedes any partial state).
262    pub(crate) fn start(&mut self, capture: StreamCapture) {
263        let _existing = self.active.insert(capture.stream_id, capture);
264    }
265
266    /// Appends a voice frame to the capture matching `stream_id`.
267    ///
268    /// No-op (logs a debug message) if no capture is registered for this
269    /// stream ID. This can happen if we joined the wire mid-stream and
270    /// missed the header, or if a stale frame arrives after we already
271    /// processed the `VoiceEnd`.
272    pub(crate) fn push_frame(&mut self, stream_id: u16, frame: &VoiceFrame) {
273        if let Some(capture) = self.active.get_mut(&stream_id) {
274            capture.push_frame(frame);
275        } else {
276            tracing::debug!(stream_id, "voice frame for unknown stream — dropping");
277        }
278    }
279
280    /// Finalizes and removes the capture matching `stream_id`.
281    ///
282    /// Returns the captured state for downstream processing (MP3 encode,
283    /// database insert). Returns `None` if no capture was registered for
284    /// this stream ID.
285    pub(crate) fn end(&mut self, stream_id: u16) -> Option<StreamCapture> {
286        self.active.remove(&stream_id)
287    }
288
289    /// Returns the number of captures currently in progress.
290    ///
291    /// Used for observability (metrics exported on the HTTP API) and for
292    /// backpressure decisions in the orchestrator.
293    pub(crate) fn active_count(&self) -> usize {
294        self.active.len()
295    }
296}
297
#[cfg(test)]
mod tests {
    use super::*;

    use dstar_gateway_core::{Callsign, Suffix};

    /// Lets tests use `?` on missing values instead of unwrapping.
    type TestResult = Result<(), Box<dyn std::error::Error>>;

    /// Header fixture: W1AW/D75 calling CQCQCQ via REF030 C.
    fn test_header() -> DStarHeader {
        DStarHeader {
            flag1: 0x00,
            flag2: 0x00,
            flag3: 0x00,
            rpt2: Callsign::from_wire_bytes(*b"REF030 G"),
            rpt1: Callsign::from_wire_bytes(*b"REF030 C"),
            ur_call: Callsign::from_wire_bytes(*b"CQCQCQ  "),
            my_call: Callsign::from_wire_bytes(*b"W1AW    "),
            my_suffix: Suffix::from_wire_bytes(*b"D75 "),
        }
    }

    /// Wraps a raw ID in a `StreamId`, substituting 1 if `raw` is rejected
    /// (the tests themselves only pass non-zero IDs).
    fn stream_id(raw: u16) -> StreamId {
        // Built in a const context so the `None` arm can never run at
        // test time.
        const FALLBACK: StreamId = match StreamId::new(1) {
            Some(id) => id,
            None => unreachable!(),
        };
        match StreamId::new(raw) {
            Some(id) => id,
            None => FALLBACK,
        }
    }

    /// Capture fixture on REF030 module C over D-Plus with the given ID.
    fn make_capture(raw_id: u16) -> StreamCapture {
        let header = test_header();
        StreamCapture::new(
            String::from("REF030"),
            String::from("C"),
            String::from("dplus"),
            stream_id(raw_id),
            &header,
        )
    }

    #[test]
    fn new_extracts_callsign_and_trims_suffix() {
        let capture = make_capture(0x1234);
        assert_eq!(capture.callsign, "W1AW", "trailing spaces trimmed");
        assert_eq!(capture.suffix.as_deref(), Some("D75"), "suffix extracted");
        assert_eq!(
            capture.ur_call.as_deref(),
            Some("CQCQCQ"),
            "ur_call extracted"
        );
        assert_eq!(capture.stream_id, 0x1234);
        assert_eq!(capture.reflector, "REF030");
        assert_eq!(capture.module, "C");
        assert_eq!(capture.protocol, "dplus");
        assert_eq!(capture.frame_count(), 0);
    }

    #[test]
    fn new_handles_empty_suffix() {
        let blank_suffix_header = DStarHeader {
            my_suffix: Suffix::from_wire_bytes(*b"    "),
            ..test_header()
        };
        let capture = StreamCapture::new(
            String::from("REF030"),
            String::from("C"),
            String::from("dplus"),
            stream_id(1),
            &blank_suffix_header,
        );
        assert!(capture.suffix.is_none(), "all-space suffix becomes None");
    }

    #[test]
    fn push_frame_accumulates_ambe_and_increments_count() -> TestResult {
        let silence = VoiceFrame::silence();
        let mut capture = make_capture(0x1111);
        for _ in 0..3 {
            capture.push_frame(&silence);
        }
        assert_eq!(capture.frame_count(), 3);
        assert_eq!(capture.ambe_frames.len(), 3);
        let stored = capture.ambe_frames.first().ok_or("no first frame")?;
        assert_eq!(*stored, silence.ambe);
        Ok(())
    }

    #[test]
    fn manager_new_is_empty() {
        assert_eq!(CaptureManager::new().active_count(), 0);
    }

    #[test]
    fn manager_start_push_end_lifecycle() -> TestResult {
        let mut manager = CaptureManager::new();

        // Register one capture.
        manager.start(make_capture(0xAABB));
        assert_eq!(manager.active_count(), 1);

        // Feed it three frames.
        let silence = VoiceFrame::silence();
        for _ in 0..3 {
            manager.push_frame(0xAABB, &silence);
        }

        // Finalize: the capture comes back with everything it collected.
        let finished = manager
            .end(0xAABB)
            .ok_or("capture was removed during push")?;
        assert_eq!(finished.frame_count(), 3);
        assert_eq!(finished.stream_id, 0xAABB);
        assert_eq!(manager.active_count(), 0, "manager empty after end");
        Ok(())
    }

    #[test]
    fn manager_push_to_unknown_stream_is_noop() {
        let mut manager = CaptureManager::new();
        // Nothing is registered, so the frame is only logged and dropped.
        manager.push_frame(0xDEAD, &VoiceFrame::silence());
        assert_eq!(manager.active_count(), 0);
    }

    #[test]
    fn manager_end_on_unknown_stream_returns_none() {
        let mut manager = CaptureManager::new();
        let missing = manager.end(0xFFFE);
        assert!(missing.is_none());
    }

    #[test]
    fn manager_handles_multiple_concurrent_streams() -> TestResult {
        let mut manager = CaptureManager::new();
        for raw_id in [0x0001, 0x0002, 0x0003] {
            manager.start(make_capture(raw_id));
        }
        assert_eq!(manager.active_count(), 3);

        // Stream N receives N frames, so a mix-up shows in the counts.
        let silence = VoiceFrame::silence();
        for (raw_id, frames) in [(0x0001, 1), (0x0002, 2), (0x0003, 3)] {
            for _ in 0..frames {
                manager.push_frame(raw_id, &silence);
            }
        }

        let first = manager.end(0x0001).ok_or("cap1 missing")?;
        let second = manager.end(0x0002).ok_or("cap2 missing")?;
        let third = manager.end(0x0003).ok_or("cap3 missing")?;
        assert_eq!(first.frame_count(), 1);
        assert_eq!(second.frame_count(), 2);
        assert_eq!(third.frame_count(), 3);
        assert_eq!(manager.active_count(), 0);
        Ok(())
    }

    #[test]
    fn manager_start_replaces_existing_stream() -> TestResult {
        let mut manager = CaptureManager::new();

        // First registration already holds one frame.
        let mut stale = make_capture(0x1000);
        stale.ambe_frames.push([1; 9]);
        manager.start(stale);

        // A second header with the same stream ID must reset the capture.
        manager.start(make_capture(0x1000));
        assert_eq!(manager.active_count(), 1);

        // What remains is the fresh capture, which has no frames yet.
        let survivor = manager
            .end(0x1000)
            .ok_or("capture registered via second start")?;
        assert_eq!(survivor.frame_count(), 0);
        Ok(())
    }
}