stargazer/tier3/capture.rs
1// StreamCapture and CaptureManager are consumed by the AsyncSession-based
2// Tier 3 orchestrator (connection-pool management), which is not yet
3// implemented. Until then the types are built and unit-tested in place.
4// The `cfg_attr(not(test), ...)` gate avoids an unfulfilled expectation
5// in the test build, where the unit tests exercise every item.
6#![cfg_attr(
7 not(test),
8 expect(
9 dead_code,
10 reason = "capture state is wired in once the tier-3 connection-pool orchestrator lands"
11 )
12)]
13
14//! Voice-stream capture state manager.
15//!
16//! A D-STAR voice transmission on the wire is a sequence of protocol events:
17//!
18//! ```text
19//! VoiceStart(header) -> VoiceFrame x N -> VoiceEnd
20//! ```
21//!
22//! Each transmission carries two parallel data streams inside the voice frames:
23//!
24//! - **AMBE voice data** (9 bytes/frame, 50 frames/sec): the compressed
25//! speech audio that will be decoded to PCM and then to MP3.
26//!
27//! - **Slow data** (3 bytes/frame, 50 frames/sec): a low-bandwidth side
28//! channel carrying things like text messages, DPRS position reports,
29//! and squelch codes. Each logical slow-data block spans several frames
30//! and is reassembled by [`SlowDataAssembler`]. Frame 0 of every 21-frame
31//! superframe is a sync frame (slow-data content `0x555555`), not a data
32//! fragment — the assembler handles the descrambling internally.
33//!
34//! This module provides two types that encapsulate the per-stream and
35//! cross-stream bookkeeping:
36//!
37//! - [`StreamCapture`] — one in-progress capture: header metadata, all
38//! accumulated AMBE frames, and the slow-data assembler with its
39//! recovered text/DPRS fields.
40//!
41//! - [`CaptureManager`] — maps stream IDs to active captures, so that
42//! voice frames from multiple concurrent transmissions (different
43//! modules, different reflectors) can be dispatched to the correct
44//! capture without cross-contamination.
45//!
46//! ## Slow-data assembly details
47//!
48//! [`dstar_gateway_core::SlowDataAssembler::push`] takes one 3-byte fragment
49//! at a time and returns `Some(SlowDataBlock)` when a block completes. For
50//! [`SlowDataBlock::Text`], the `.text` field holds the 20-character status
51//! message. For [`SlowDataBlock::Gps`], the payload is a raw NMEA-style
52//! DPRS sentence that we feed into [`dstar_gateway_core::parse_dprs`] to
53//! extract latitude and longitude. All other block kinds (header retx,
54//! fast data, squelch, unknown) are ignored for persistence purposes —
55//! they aren't part of the streams table schema.
56
57use std::collections::HashMap;
58
59use chrono::{DateTime, Utc};
60use dstar_gateway::{DStarHeader, StreamId, VoiceFrame};
61use dstar_gateway_core::{SlowDataAssembler, SlowDataBlock, parse_dprs};
62
63/// State for one in-progress voice-stream capture.
64///
65/// Created when a D-STAR `VoiceStart` event arrives (via
66/// [`Self::new`], which extracts header metadata). Frames are added via
67/// [`Self::push_frame`] as `VoiceFrame` events arrive. When the stream ends
68/// (EOT or timeout), the accumulated state is consumed by the Tier 3
69/// orchestrator to produce an MP3 and persist a database row.
70#[derive(Debug)]
71pub(crate) struct StreamCapture {
72 /// Reflector callsign this stream was captured from (e.g. `"REF030"`).
73 pub(crate) reflector: String,
74
75 /// Module letter (A-Z) the stream was received on.
76 pub(crate) module: String,
77
78 /// Protocol name: `"dplus"`, `"dextra"`, or `"dcs"`.
79 pub(crate) protocol: String,
80
81 /// D-STAR stream ID (non-zero u16 on wire).
82 pub(crate) stream_id: u16,
83
84 /// Operator callsign (`my_call` from the header), ASCII-trimmed.
85 pub(crate) callsign: String,
86
87 /// Operator suffix (`my_suffix` from the header), if non-empty after trim.
88 pub(crate) suffix: Option<String>,
89
90 /// Destination callsign (`ur_call` from the header), if non-empty after trim.
91 pub(crate) ur_call: Option<String>,
92
93 /// AMBE voice frames accumulated in receive order.
94 ///
95 /// Each frame is 9 bytes; 160 PCM samples will be produced per frame
96 /// by `mbelib-rs`. The order is load-bearing: the AMBE decoder uses
97 /// inter-frame delta prediction.
98 pub(crate) ambe_frames: Vec<[u8; 9]>,
99
100 /// Stateful slow-data reassembler.
101 ///
102 /// Fed one 3-byte fragment per voice frame via
103 /// [`SlowDataAssembler::push`]. Emits [`SlowDataBlock`] values when
104 /// complete blocks arrive; we consume those immediately to update
105 /// `dstar_text` and `dprs_lat`/`dprs_lon`.
106 slow_data: SlowDataAssembler,
107
108 /// Latest slow-data text message decoded from this stream, if any.
109 pub(crate) dstar_text: Option<String>,
110
111 /// DPRS latitude in decimal degrees, if a DPRS sentence was decoded.
112 pub(crate) dprs_lat: Option<f64>,
113
114 /// DPRS longitude in decimal degrees, if a DPRS sentence was decoded.
115 pub(crate) dprs_lon: Option<f64>,
116
117 /// When the voice header arrived (UTC wall-clock time).
118 pub(crate) started_at: DateTime<Utc>,
119}
120
121impl StreamCapture {
122 /// Creates a new capture from the D-STAR voice header.
123 ///
124 /// Extracts `my_call`, `my_suffix`, and `ur_call` from the header,
125 /// trimming ASCII whitespace and dropping empty strings. The header's
126 /// RPT1/RPT2 fields are not persisted — they are routing metadata and
127 /// are redundant with the `reflector`/`module` arguments.
128 pub(crate) fn new(
129 reflector: String,
130 module: String,
131 protocol: String,
132 stream_id: StreamId,
133 header: &DStarHeader,
134 ) -> Self {
135 // Callsign and suffix are fixed-width, space-padded on the wire.
136 // Trim trailing whitespace for persistence; drop to None if empty.
137 let my_call_raw = header.my_call.as_str();
138 let callsign = my_call_raw.trim_end().to_string();
139
140 let my_suffix_raw = header.my_suffix.as_str();
141 let suffix_trimmed = my_suffix_raw.trim_end().to_string();
142 let suffix = if suffix_trimmed.is_empty() {
143 None
144 } else {
145 Some(suffix_trimmed)
146 };
147
148 let ur_call_raw = header.ur_call.as_str();
149 let ur_trimmed = ur_call_raw.trim_end().to_string();
150 let ur_call = if ur_trimmed.is_empty() {
151 None
152 } else {
153 Some(ur_trimmed)
154 };
155
156 Self {
157 reflector,
158 module,
159 protocol,
160 stream_id: stream_id.get(),
161 callsign,
162 suffix,
163 ur_call,
164 ambe_frames: Vec::new(),
165 slow_data: SlowDataAssembler::new(),
166 dstar_text: None,
167 dprs_lat: None,
168 dprs_lon: None,
169 started_at: Utc::now(),
170 }
171 }
172
173 /// Appends one voice frame to the capture.
174 ///
175 /// The 9-byte AMBE payload is stored verbatim for later mbelib-rs
176 /// decoding. The 3-byte slow-data fragment is fed to the assembler,
177 /// which may emit a complete [`SlowDataBlock`] after enough fragments
178 /// arrive. Text and GPS blocks are retained (last-write-wins if the
179 /// transmission contains multiple); other block kinds are ignored.
180 pub(crate) fn push_frame(&mut self, frame: &VoiceFrame) {
181 // Store the AMBE voice payload in receive order.
182 self.ambe_frames.push(frame.ambe);
183
184 // Feed the slow-data fragment and process any emitted block.
185 if let Some(block) = self.slow_data.push(frame.slow_data) {
186 self.consume_slow_data(block);
187 }
188 }
189
190 /// Processes a single assembled slow-data block, updating capture fields.
191 ///
192 /// - `Text`: store the trimmed text message as `dstar_text`.
193 /// - `Gps`: parse the DPRS sentence and store lat/lon if valid; DPRS
194 /// parse failures are non-fatal (real-world slow data often contains
195 /// malformed GPS payloads from non-GPS-equipped radios).
196 /// - All other variants (`HeaderRetx`, `FastData`, `Squelch`,
197 /// `Unknown`): ignored — not part of the streams table.
198 fn consume_slow_data(&mut self, block: SlowDataBlock) {
199 match block {
200 SlowDataBlock::Text(text) => {
201 // The assembler already trimmed trailing spaces/nulls.
202 // Discard if completely empty after trim.
203 if !text.text.is_empty() {
204 self.dstar_text = Some(text.text);
205 }
206 }
207 SlowDataBlock::Gps(sentence) => {
208 // Non-DPRS GPS slow-data is common (raw NMEA passthrough
209 // from GPS-capable radios). `parse_dprs` only succeeds on
210 // `$$CRC`-prefixed DPRS sentences; other GPS payloads are
211 // silently dropped.
212 if let Ok(report) = parse_dprs(&sentence) {
213 self.dprs_lat = Some(report.latitude.degrees());
214 self.dprs_lon = Some(report.longitude.degrees());
215 }
216 }
217 // Header retx, fast data, squelch, unknown — none are part
218 // of the streams table schema. `SlowDataBlock` is marked
219 // `#[non_exhaustive]`; the wildcard also catches any future
220 // block kinds added upstream.
221 _ => {}
222 }
223 }
224
225 /// Returns the number of voice frames captured so far.
226 pub(crate) const fn frame_count(&self) -> usize {
227 self.ambe_frames.len()
228 }
229}
230
231/// Manages multiple concurrent voice-stream captures.
232///
233/// A single Tier 3 connection can multiplex voice streams across modules
234/// (e.g. a reflector with modules A, B, C each carrying an independent
235/// transmission at the same time). The `CaptureManager` maps each wire
236/// stream ID to its in-progress [`StreamCapture`] so that frames are
237/// routed correctly.
238///
239/// # Lifecycle
240///
241/// 1. [`Self::start`] — called on `VoiceStart`: registers a new capture
242/// keyed by the stream ID from the header.
243/// 2. [`Self::push_frame`] — called on `VoiceFrame`: appends the frame to
244/// the matching capture (no-op if the stream ID is unknown, which can
245/// happen if we missed the header).
246/// 3. [`Self::end`] — called on `VoiceEnd`: removes and returns the
247/// capture for finalization (MP3 encode + database insert).
248#[derive(Debug, Default)]
249pub(crate) struct CaptureManager {
250 /// Active captures keyed by the wire stream ID.
251 active: HashMap<u16, StreamCapture>,
252}
253
254impl CaptureManager {
255 /// Creates an empty manager with no active captures.
256 pub(crate) fn new() -> Self {
257 Self::default()
258 }
259
260 /// Registers a new capture. Replaces any existing entry for the same
261 /// stream ID (a fresh header always supersedes any partial state).
262 pub(crate) fn start(&mut self, capture: StreamCapture) {
263 let _existing = self.active.insert(capture.stream_id, capture);
264 }
265
266 /// Appends a voice frame to the capture matching `stream_id`.
267 ///
268 /// No-op (logs a debug message) if no capture is registered for this
269 /// stream ID. This can happen if we joined the wire mid-stream and
270 /// missed the header, or if a stale frame arrives after we already
271 /// processed the `VoiceEnd`.
272 pub(crate) fn push_frame(&mut self, stream_id: u16, frame: &VoiceFrame) {
273 if let Some(capture) = self.active.get_mut(&stream_id) {
274 capture.push_frame(frame);
275 } else {
276 tracing::debug!(stream_id, "voice frame for unknown stream — dropping");
277 }
278 }
279
280 /// Finalizes and removes the capture matching `stream_id`.
281 ///
282 /// Returns the captured state for downstream processing (MP3 encode,
283 /// database insert). Returns `None` if no capture was registered for
284 /// this stream ID.
285 pub(crate) fn end(&mut self, stream_id: u16) -> Option<StreamCapture> {
286 self.active.remove(&stream_id)
287 }
288
289 /// Returns the number of captures currently in progress.
290 ///
291 /// Used for observability (metrics exported on the HTTP API) and for
292 /// backpressure decisions in the orchestrator.
293 pub(crate) fn active_count(&self) -> usize {
294 self.active.len()
295 }
296}
297
#[cfg(test)]
mod tests {
    use super::*;

    use dstar_gateway_core::{Callsign, Suffix};

    type TestResult = Result<(), Box<dyn std::error::Error>>;

    /// Builds a minimal `DStarHeader` for tests.
    ///
    /// Every byte literal is space-padded to the full width of its wire
    /// field: 8 bytes for callsigns, 4 bytes for the suffix.
    fn test_header() -> DStarHeader {
        DStarHeader {
            flag1: 0x00,
            flag2: 0x00,
            flag3: 0x00,
            rpt2: Callsign::from_wire_bytes(*b"REF030 G"),
            rpt1: Callsign::from_wire_bytes(*b"REF030 C"),
            ur_call: Callsign::from_wire_bytes(*b"CQCQCQ  "),
            my_call: Callsign::from_wire_bytes(*b"W1AW    "),
            my_suffix: Suffix::from_wire_bytes(*b"D75 "),
        }
    }

    /// Converts a raw ID into a `StreamId`, substituting 1 for zero.
    fn stream_id(raw: u16) -> StreamId {
        // Tests only use non-zero IDs; fall back to 1 for safety.
        // `StreamId::new(1)` is const-safe and never returns None.
        const ONE: StreamId = match StreamId::new(1) {
            Some(s) => s,
            None => unreachable!(),
        };
        StreamId::new(raw).unwrap_or(ONE)
    }

    /// Builds a capture on REF030 module C over DPlus from `test_header`.
    fn make_capture(raw_id: u16) -> StreamCapture {
        StreamCapture::new(
            "REF030".to_string(),
            "C".to_string(),
            "dplus".to_string(),
            stream_id(raw_id),
            &test_header(),
        )
    }

    #[test]
    fn new_extracts_callsign_and_trims_suffix() {
        let cap = make_capture(0x1234);
        assert_eq!(cap.callsign, "W1AW", "trailing spaces trimmed");
        assert_eq!(cap.suffix.as_deref(), Some("D75"), "suffix extracted");
        assert_eq!(cap.ur_call.as_deref(), Some("CQCQCQ"), "ur_call extracted");
        assert_eq!(cap.stream_id, 0x1234);
        assert_eq!(cap.reflector, "REF030");
        assert_eq!(cap.module, "C");
        assert_eq!(cap.protocol, "dplus");
        assert_eq!(cap.frame_count(), 0);
    }

    #[test]
    fn new_handles_empty_suffix() {
        let header = DStarHeader {
            // Four spaces: a suffix field that is all padding.
            my_suffix: Suffix::from_wire_bytes(*b"    "),
            ..test_header()
        };
        let cap = StreamCapture::new(
            "REF030".to_string(),
            "C".to_string(),
            "dplus".to_string(),
            stream_id(1),
            &header,
        );
        assert!(cap.suffix.is_none(), "all-space suffix becomes None");
    }

    #[test]
    fn new_handles_empty_ur_call() {
        let header = DStarHeader {
            // Eight spaces: a destination field that is all padding.
            ur_call: Callsign::from_wire_bytes(*b"        "),
            ..test_header()
        };
        let cap = StreamCapture::new(
            "REF030".to_string(),
            "C".to_string(),
            "dplus".to_string(),
            stream_id(1),
            &header,
        );
        assert!(cap.ur_call.is_none(), "all-space ur_call becomes None");
    }

    #[test]
    fn push_frame_accumulates_ambe_and_increments_count() -> TestResult {
        let mut cap = make_capture(0x1111);
        let frame = VoiceFrame::silence();
        cap.push_frame(&frame);
        cap.push_frame(&frame);
        cap.push_frame(&frame);
        assert_eq!(cap.frame_count(), 3);
        assert_eq!(cap.ambe_frames.len(), 3);
        let first_frame = cap.ambe_frames.first().ok_or("no first frame")?;
        assert_eq!(*first_frame, frame.ambe);
        Ok(())
    }

    #[test]
    fn manager_new_is_empty() {
        let mgr = CaptureManager::new();
        assert_eq!(mgr.active_count(), 0);
    }

    #[test]
    fn manager_start_push_end_lifecycle() -> TestResult {
        let mut mgr = CaptureManager::new();

        // Start a capture.
        let capture = make_capture(0xAABB);
        mgr.start(capture);
        assert_eq!(mgr.active_count(), 1);

        // Push a few frames.
        let frame = VoiceFrame::silence();
        mgr.push_frame(0xAABB, &frame);
        mgr.push_frame(0xAABB, &frame);
        mgr.push_frame(0xAABB, &frame);

        // End the capture and verify frames accumulated.
        let finalized = mgr.end(0xAABB).ok_or("capture was removed during push")?;
        assert_eq!(finalized.frame_count(), 3);
        assert_eq!(finalized.stream_id, 0xAABB);
        assert_eq!(mgr.active_count(), 0, "manager empty after end");
        Ok(())
    }

    #[test]
    fn manager_push_to_unknown_stream_is_noop() {
        let mut mgr = CaptureManager::new();
        let frame = VoiceFrame::silence();
        // No active captures; this should just log and return.
        mgr.push_frame(0xDEAD, &frame);
        assert_eq!(mgr.active_count(), 0);
    }

    #[test]
    fn manager_end_on_unknown_stream_returns_none() {
        let mut mgr = CaptureManager::new();
        assert!(mgr.end(0xFFFE).is_none());
    }

    #[test]
    fn manager_handles_multiple_concurrent_streams() -> TestResult {
        let mut mgr = CaptureManager::new();
        mgr.start(make_capture(0x0001));
        mgr.start(make_capture(0x0002));
        mgr.start(make_capture(0x0003));
        assert_eq!(mgr.active_count(), 3);

        let frame = VoiceFrame::silence();
        mgr.push_frame(0x0001, &frame);
        mgr.push_frame(0x0002, &frame);
        mgr.push_frame(0x0002, &frame);
        mgr.push_frame(0x0003, &frame);
        mgr.push_frame(0x0003, &frame);
        mgr.push_frame(0x0003, &frame);

        let cap1 = mgr.end(0x0001).ok_or("cap1 missing")?;
        let cap2 = mgr.end(0x0002).ok_or("cap2 missing")?;
        let cap3 = mgr.end(0x0003).ok_or("cap3 missing")?;
        assert_eq!(cap1.frame_count(), 1);
        assert_eq!(cap2.frame_count(), 2);
        assert_eq!(cap3.frame_count(), 3);
        assert_eq!(mgr.active_count(), 0);
        Ok(())
    }

    #[test]
    fn manager_start_replaces_existing_stream() -> TestResult {
        let mut mgr = CaptureManager::new();
        let mut cap = make_capture(0x1000);
        cap.ambe_frames.push([1; 9]);
        mgr.start(cap);
        // A fresh header for the same stream ID should reset the capture.
        mgr.start(make_capture(0x1000));
        assert_eq!(mgr.active_count(), 1);
        // The new capture starts with zero frames.
        let cap = mgr
            .end(0x1000)
            .ok_or("capture registered via second start")?;
        assert_eq!(cap.frame_count(), 0);
        Ok(())
    }
}
469}