stargazer/tier3/
mod.rs

1//! Tier 3: deep D-STAR protocol connections and voice capture.
2//!
3//! Establishes full D-STAR protocol connections (`DPlus`, `DExtra`, DCS) to
4//! reflectors via the `dstar-gateway` crate's `AsyncSession` API. Captures
5//! voice streams including:
6//!
7//! - D-STAR header metadata (callsign, suffix, UR call, RPT fields)
8//! - AMBE voice frames decoded to PCM via `mbelib-rs`, then encoded to MP3
9//! - Slow data (D-STAR text messages, DPRS position reports)
10//!
11//! The capture pipeline for each stream:
12//!
13//! ```text
14//! AsyncSession<P> -> VoiceStart (header)
15//!                 -> VoiceFrame x N (AMBE + slow data)
16//!                 -> VoiceEnd
17//!                     |
18//!             mbelib-rs: AMBE -> PCM
19//!                     |
20//!             mp3lame-encoder: PCM -> MP3
21//!                     |
22//!             Postgres: streams table (metadata + audio_mp3)
23//! ```
24//!
25//! Connections are managed by a priority queue: user-pinned reflectors take
26//! precedence, followed by auto-promoted reflectors ranked by activity score.
27//! Idle sessions (no voice activity beyond the configured timeout) are
28//! disconnected to free slots for more active reflectors.
29//!
30//! # Module layout
31//!
32//! - [`decoder`] — AMBE-to-PCM-to-MP3 audio pipeline using `mbelib-rs` and
33//!   `mp3lame-encoder`.
34//! - [`capture`] — Per-stream state tracker (`StreamCapture`) and
35//!   cross-stream dispatcher (`CaptureManager`).
36//!
37//! # Current status
38//!
39//! The audio decode pipeline and capture-state machinery are complete and
40//! unit-tested. The remaining orchestration work — maintaining a pool of
41//! `AsyncSession` tasks keyed off the Tier 1/Tier 2 reflector scores — is
42//! left to a follow-up. The public [`run`] entry point currently logs its
43//! configuration and blocks forever; [`process_completed_stream`] is the
44//! production-grade finalizer that the future orchestrator will wire in.
45
46pub(crate) mod capture;
47pub(crate) mod decoder;
48
49use chrono::Utc;
50use sqlx::PgPool;
51
52use crate::config::{AudioConfig, Tier3Config};
53use crate::db::streams::{self, AudioUpdate, NewStream};
54
55use capture::StreamCapture;
56
57/// Runs the Tier 3 voice capture orchestrator.
58///
59/// Manages a pool of D-STAR protocol connections, capturing voice streams and
60/// writing decoded audio with metadata to `PostgreSQL`. Runs until cancelled.
61///
62/// # Current behavior
63///
64/// This is a placeholder: the connection-pool management layer that
65/// drives `AsyncSession<P>` is not yet implemented. For now the
66/// function logs its configuration and then blocks on a never-completing
67/// future so the main task-supervisor loop treats it as a well-behaved
68/// long-running task.
69///
70/// # Errors
71///
72/// Returns an error if a fatal, non-retryable failure occurs (e.g., the
73/// database pool is closed). Today, the stub path cannot fail.
74pub(crate) async fn run(
75    config: Tier3Config,
76    audio_config: AudioConfig,
77    _pool: PgPool,
78) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
79    tracing::info!(
80        max_concurrent_connections = config.max_concurrent_connections,
81        idle_disconnect_secs = config.idle_disconnect_secs,
82        auto_promote = config.auto_promote,
83        dplus_callsign = %config.dplus_callsign,
84        audio_format = %audio_config.format,
85        mp3_bitrate = audio_config.mp3_bitrate,
86        "tier3 voice capture: session-pool orchestration not yet wired — \
87         decoder + capture subsystems ready"
88    );
89
90    // TODO: spawn the AsyncSession pool, fan `VoiceStart`/`VoiceFrame`/
91    // `VoiceEnd` events into `CaptureManager`, and call
92    // `process_completed_stream` on each terminated capture.
93    std::future::pending::<()>().await;
94    Ok(())
95}
96
97/// Finalizes one captured voice stream: MP3-encodes the audio and persists
98/// the row to `PostgreSQL`.
99///
100/// Called by the Tier 3 orchestrator when a `VoiceEnd` event fires (or an
101/// idle timeout elapses). The orchestrator is stubbed in [`run`] above so
102/// the function is unreferenced at build time today — the follow-up task
103/// will wire it into the session pool.
104///
105/// Two database writes happen:
106///
107/// 1. [`streams::insert_stream`] — creates the row with header metadata.
108/// 2. [`streams::update_audio`] — fills in the MP3 blob, frame count, end
109///    timestamp, and decoded slow-data fields (text, DPRS lat/lon).
110///
111/// The split into insert + update mirrors the schema lifecycle documented
112/// in `db/streams.rs`: the row is visible to the HTTP API as soon as the
113/// header arrives, and its `audio_mp3` field transitions from NULL to a
114/// populated blob when encoding completes.
115///
116/// # Errors
117///
118/// - AMBE/MP3 decode errors ([`decoder::DecodeError`]) are wrapped in the
119///   returned boxed error and logged at `warn` level — one bad stream
120///   should not abort the capture loop.
121/// - SQL errors from the insert or update are bubbled up so the caller can
122///   distinguish transient database issues from bad audio.
123#[expect(
124    dead_code,
125    reason = "wired into the session pool in the follow-up orchestrator task"
126)]
127pub(crate) async fn process_completed_stream(
128    capture: StreamCapture,
129    pool: &PgPool,
130    mp3_bitrate: u32,
131) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
132    let frame_count = capture.frame_count();
133    if frame_count == 0 {
134        // Empty capture: nothing to encode. Drop silently — the header
135        // alone is insufficient to justify a row (the HTTP API would show
136        // a zero-duration "transmission" that serves no purpose).
137        tracing::debug!(
138            reflector = %capture.reflector,
139            module = %capture.module,
140            stream_id = capture.stream_id,
141            "stream ended with zero voice frames — dropping"
142        );
143        return Ok(());
144    }
145
146    // Encode AMBE frames to MP3. This is CPU-bound but short (typically
147    // a few milliseconds even for a 30-second transmission); inline is
148    // fine rather than `spawn_blocking`.
149    let mp3_bytes = decoder::decode_to_mp3(&capture.ambe_frames, mp3_bitrate).map_err(|e| {
150        tracing::warn!(
151            reflector = %capture.reflector,
152            module = %capture.module,
153            stream_id = capture.stream_id,
154            frame_count,
155            error = %e,
156            "failed to encode captured stream to MP3"
157        );
158        e
159    })?;
160
161    // Insert the row with header metadata. `audio_mp3` starts NULL; the
162    // second query (`update_audio`) fills it in.
163    let new_stream = NewStream {
164        reflector: &capture.reflector,
165        module: &capture.module,
166        protocol: &capture.protocol,
167        stream_id: i32::from(capture.stream_id),
168        callsign: &capture.callsign,
169        suffix: capture.suffix.as_deref(),
170        ur_call: capture.ur_call.as_deref(),
171        started_at: capture.started_at,
172    };
173    let row_id = streams::insert_stream(pool, &new_stream).await?;
174
175    // Populate audio + metadata. frame_count is usize on our side but
176    // Postgres stores INTEGER (i32); saturate on overflow to avoid a panic
177    // on pathologically long (>2.1B-frame) transmissions.
178    let frame_count_i32 = i32::try_from(frame_count).unwrap_or(i32::MAX);
179    let audio_update = AudioUpdate {
180        audio_mp3: &mp3_bytes,
181        ended_at: Utc::now(),
182        frame_count: frame_count_i32,
183        dstar_text: capture.dstar_text.as_deref(),
184        dprs_lat: capture.dprs_lat,
185        dprs_lon: capture.dprs_lon,
186    };
187    streams::update_audio(pool, row_id, &audio_update).await?;
188
189    tracing::info!(
190        row_id,
191        reflector = %capture.reflector,
192        module = %capture.module,
193        protocol = %capture.protocol,
194        stream_id = capture.stream_id,
195        callsign = %capture.callsign,
196        frame_count,
197        mp3_bytes = mp3_bytes.len(),
198        has_text = capture.dstar_text.is_some(),
199        has_dprs = capture.dprs_lat.is_some(),
200        "voice stream captured and stored"
201    );
202
203    Ok(())
204}