stargazer/tier3/mod.rs
1//! Tier 3: deep D-STAR protocol connections and voice capture.
2//!
3//! Establishes full D-STAR protocol connections (`DPlus`, `DExtra`, DCS) to
4//! reflectors via the `dstar-gateway` crate's `AsyncSession` API. Captures
5//! voice streams including:
6//!
7//! - D-STAR header metadata (callsign, suffix, UR call, RPT fields)
8//! - AMBE voice frames decoded to PCM via `mbelib-rs`, then encoded to MP3
9//! - Slow data (D-STAR text messages, DPRS position reports)
10//!
11//! The capture pipeline for each stream:
12//!
13//! ```text
14//! AsyncSession<P> -> VoiceStart (header)
15//! -> VoiceFrame x N (AMBE + slow data)
16//! -> VoiceEnd
17//! |
18//! mbelib-rs: AMBE -> PCM
19//! |
20//! mp3lame-encoder: PCM -> MP3
21//! |
22//! Postgres: streams table (metadata + audio_mp3)
23//! ```
24//!
25//! Connections are managed by a priority queue: user-pinned reflectors take
26//! precedence, followed by auto-promoted reflectors ranked by activity score.
27//! Idle sessions (no voice activity beyond the configured timeout) are
28//! disconnected to free slots for more active reflectors.
29//!
30//! # Module layout
31//!
32//! - [`decoder`] — AMBE-to-PCM-to-MP3 audio pipeline using `mbelib-rs` and
33//! `mp3lame-encoder`.
34//! - [`capture`] — Per-stream state tracker (`StreamCapture`) and
35//! cross-stream dispatcher (`CaptureManager`).
36//!
37//! # Current status
38//!
39//! The audio decode pipeline and capture-state machinery are complete and
40//! unit-tested. The remaining orchestration work — maintaining a pool of
41//! `AsyncSession` tasks keyed off the Tier 1/Tier 2 reflector scores — is
42//! left to a follow-up. The public [`run`] entry point currently logs its
43//! configuration and blocks forever; [`process_completed_stream`] is the
44//! production-grade finalizer that the future orchestrator will wire in.
45
46pub(crate) mod capture;
47pub(crate) mod decoder;
48
49use chrono::Utc;
50use sqlx::PgPool;
51
52use crate::config::{AudioConfig, Tier3Config};
53use crate::db::streams::{self, AudioUpdate, NewStream};
54
55use capture::StreamCapture;
56
57/// Runs the Tier 3 voice capture orchestrator.
58///
59/// Manages a pool of D-STAR protocol connections, capturing voice streams and
60/// writing decoded audio with metadata to `PostgreSQL`. Runs until cancelled.
61///
62/// # Current behavior
63///
64/// This is a placeholder: the connection-pool management layer that
65/// drives `AsyncSession<P>` is not yet implemented. For now the
66/// function logs its configuration and then blocks on a never-completing
67/// future so the main task-supervisor loop treats it as a well-behaved
68/// long-running task.
69///
70/// # Errors
71///
72/// Returns an error if a fatal, non-retryable failure occurs (e.g., the
73/// database pool is closed). Today, the stub path cannot fail.
74pub(crate) async fn run(
75 config: Tier3Config,
76 audio_config: AudioConfig,
77 _pool: PgPool,
78) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
79 tracing::info!(
80 max_concurrent_connections = config.max_concurrent_connections,
81 idle_disconnect_secs = config.idle_disconnect_secs,
82 auto_promote = config.auto_promote,
83 dplus_callsign = %config.dplus_callsign,
84 audio_format = %audio_config.format,
85 mp3_bitrate = audio_config.mp3_bitrate,
86 "tier3 voice capture: session-pool orchestration not yet wired — \
87 decoder + capture subsystems ready"
88 );
89
90 // TODO: spawn the AsyncSession pool, fan `VoiceStart`/`VoiceFrame`/
91 // `VoiceEnd` events into `CaptureManager`, and call
92 // `process_completed_stream` on each terminated capture.
93 std::future::pending::<()>().await;
94 Ok(())
95}
96
97/// Finalizes one captured voice stream: MP3-encodes the audio and persists
98/// the row to `PostgreSQL`.
99///
100/// Called by the Tier 3 orchestrator when a `VoiceEnd` event fires (or an
101/// idle timeout elapses). The orchestrator is stubbed in [`run`] above so
102/// the function is unreferenced at build time today — the follow-up task
103/// will wire it into the session pool.
104///
105/// Two database writes happen:
106///
107/// 1. [`streams::insert_stream`] — creates the row with header metadata.
108/// 2. [`streams::update_audio`] — fills in the MP3 blob, frame count, end
109/// timestamp, and decoded slow-data fields (text, DPRS lat/lon).
110///
111/// The split into insert + update mirrors the schema lifecycle documented
112/// in `db/streams.rs`: the row is visible to the HTTP API as soon as the
113/// header arrives, and its `audio_mp3` field transitions from NULL to a
114/// populated blob when encoding completes.
115///
116/// # Errors
117///
118/// - AMBE/MP3 decode errors ([`decoder::DecodeError`]) are wrapped in the
119/// returned boxed error and logged at `warn` level — one bad stream
120/// should not abort the capture loop.
121/// - SQL errors from the insert or update are bubbled up so the caller can
122/// distinguish transient database issues from bad audio.
123#[expect(
124 dead_code,
125 reason = "wired into the session pool in the follow-up orchestrator task"
126)]
127pub(crate) async fn process_completed_stream(
128 capture: StreamCapture,
129 pool: &PgPool,
130 mp3_bitrate: u32,
131) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
132 let frame_count = capture.frame_count();
133 if frame_count == 0 {
134 // Empty capture: nothing to encode. Drop silently — the header
135 // alone is insufficient to justify a row (the HTTP API would show
136 // a zero-duration "transmission" that serves no purpose).
137 tracing::debug!(
138 reflector = %capture.reflector,
139 module = %capture.module,
140 stream_id = capture.stream_id,
141 "stream ended with zero voice frames — dropping"
142 );
143 return Ok(());
144 }
145
146 // Encode AMBE frames to MP3. This is CPU-bound but short (typically
147 // a few milliseconds even for a 30-second transmission); inline is
148 // fine rather than `spawn_blocking`.
149 let mp3_bytes = decoder::decode_to_mp3(&capture.ambe_frames, mp3_bitrate).map_err(|e| {
150 tracing::warn!(
151 reflector = %capture.reflector,
152 module = %capture.module,
153 stream_id = capture.stream_id,
154 frame_count,
155 error = %e,
156 "failed to encode captured stream to MP3"
157 );
158 e
159 })?;
160
161 // Insert the row with header metadata. `audio_mp3` starts NULL; the
162 // second query (`update_audio`) fills it in.
163 let new_stream = NewStream {
164 reflector: &capture.reflector,
165 module: &capture.module,
166 protocol: &capture.protocol,
167 stream_id: i32::from(capture.stream_id),
168 callsign: &capture.callsign,
169 suffix: capture.suffix.as_deref(),
170 ur_call: capture.ur_call.as_deref(),
171 started_at: capture.started_at,
172 };
173 let row_id = streams::insert_stream(pool, &new_stream).await?;
174
175 // Populate audio + metadata. frame_count is usize on our side but
176 // Postgres stores INTEGER (i32); saturate on overflow to avoid a panic
177 // on pathologically long (>2.1B-frame) transmissions.
178 let frame_count_i32 = i32::try_from(frame_count).unwrap_or(i32::MAX);
179 let audio_update = AudioUpdate {
180 audio_mp3: &mp3_bytes,
181 ended_at: Utc::now(),
182 frame_count: frame_count_i32,
183 dstar_text: capture.dstar_text.as_deref(),
184 dprs_lat: capture.dprs_lat,
185 dprs_lon: capture.dprs_lon,
186 };
187 streams::update_audio(pool, row_id, &audio_update).await?;
188
189 tracing::info!(
190 row_id,
191 reflector = %capture.reflector,
192 module = %capture.module,
193 protocol = %capture.protocol,
194 stream_id = capture.stream_id,
195 callsign = %capture.callsign,
196 frame_count,
197 mp3_bytes = mp3_bytes.len(),
198 has_text = capture.dstar_text.is_some(),
199 has_dprs = capture.dprs_lat.is_some(),
200 "voice stream captured and stored"
201 );
202
203 Ok(())
204}