stargazer/db/
streams.rs

1//! Query functions for the `streams` table.
2//!
3//! The streams table stores captured D-STAR voice transmissions from Tier 3
4//! deep connections. Each row represents one complete or in-progress voice
5//! stream: the D-STAR header metadata, optional slow-data fields (text message,
6//! DPRS position), the decoded MP3 audio blob, and upload lifecycle state.
7//!
8//! # Lifecycle
9//!
10//! 1. Tier 3 calls [`insert_stream`] when a voice header arrives, creating a
11//!    row with `audio_mp3 = NULL` and `ended_at = NULL`.
12//! 2. When the stream ends (EOT or timeout), Tier 3 calls [`update_audio`] to
13//!    fill in the MP3 blob, final frame count, and end timestamp.
14//! 3. The upload processor picks up rows with `upload_status = 'pending'` and
15//!    non-null `audio_mp3` via the `uploads` module.
16
17use chrono::{DateTime, Utc};
18use sqlx::PgPool;
19
20/// A single row from the `streams` table.
21///
22/// Maps directly to the table columns via `sqlx::FromRow`. The `audio_mp3`
23/// field is `Option<Vec<u8>>` because it starts as `NULL` and is filled in
24/// after MP3 encoding completes.
25#[derive(Debug, sqlx::FromRow)]
26pub(crate) struct StreamRow {
27    /// Auto-generated row identifier, used as the stream's database primary key.
28    pub(crate) id: i64,
29
30    /// Reflector callsign where this stream was captured.
31    pub(crate) reflector: String,
32
33    /// Module letter (A-Z) the stream was received on.
34    pub(crate) module: String,
35
36    /// D-STAR protocol used: `"dplus"`, `"dextra"`, or `"dcs"`.
37    pub(crate) protocol: String,
38
39    /// D-STAR stream ID (non-zero u16 on wire, stored as i32 for Postgres
40    /// `INTEGER` compatibility).
41    pub(crate) stream_id: i32,
42
43    /// Operator callsign extracted from the D-STAR header `my_callsign` field.
44    pub(crate) callsign: String,
45
46    /// Operator suffix (4 bytes from `my_suffix`), if present.
47    pub(crate) suffix: Option<String>,
48
49    /// `UR` (destination) callsign from the D-STAR header.
50    pub(crate) ur_call: Option<String>,
51
52    /// Slow-data text message decoded from the voice frames.
53    pub(crate) dstar_text: Option<String>,
54
55    /// DPRS latitude, if a position report was embedded in the slow data.
56    pub(crate) dprs_lat: Option<f64>,
57
58    /// DPRS longitude, if a position report was embedded in the slow data.
59    pub(crate) dprs_lon: Option<f64>,
60
61    /// When the first voice frame was received.
62    pub(crate) started_at: DateTime<Utc>,
63
64    /// When the stream ended (EOT received or timeout). `None` while the
65    /// stream is still in progress.
66    pub(crate) ended_at: Option<DateTime<Utc>>,
67
68    /// Number of voice frames captured.
69    pub(crate) frame_count: Option<i32>,
70
71    /// Decoded MP3 audio blob. `None` until encoding completes.
72    pub(crate) audio_mp3: Option<Vec<u8>>,
73
74    /// Upload lifecycle state: `"pending"`, `"uploaded"`, or `"failed"`.
75    pub(crate) upload_status: Option<String>,
76
77    /// Number of upload attempts made so far.
78    pub(crate) upload_attempts: Option<i32>,
79
80    /// Error message from the most recent failed upload attempt.
81    pub(crate) last_upload_error: Option<String>,
82
83    /// When the stream was successfully uploaded to the Rdio API.
84    pub(crate) uploaded_at: Option<DateTime<Utc>>,
85
86    /// When this row was first inserted.
87    pub(crate) created_at: Option<DateTime<Utc>>,
88}
89
90/// Parameters for inserting a new stream row.
91///
92/// Collected from the D-STAR voice header when a Tier 3 connection receives
93/// a new transmission. Audio and end-time are filled in later via
94/// [`update_audio`].
95#[derive(Debug)]
96pub(crate) struct NewStream<'a> {
97    /// Reflector callsign.
98    pub(crate) reflector: &'a str,
99    /// Module letter.
100    pub(crate) module: &'a str,
101    /// Protocol name.
102    pub(crate) protocol: &'a str,
103    /// D-STAR stream ID.
104    pub(crate) stream_id: i32,
105    /// Operator callsign.
106    pub(crate) callsign: &'a str,
107    /// Operator suffix, if any.
108    pub(crate) suffix: Option<&'a str>,
109    /// UR callsign, if any.
110    pub(crate) ur_call: Option<&'a str>,
111    /// When the header was received.
112    pub(crate) started_at: DateTime<Utc>,
113}
114
115/// Parameters for updating a stream after voice capture and MP3 encoding
116/// complete.
117///
118/// Groups the audio blob, timing, and optional slow-data fields into a single
119/// struct to stay within clippy's argument-count limit.
120#[derive(Debug)]
121pub(crate) struct AudioUpdate<'a> {
122    /// Encoded MP3 audio bytes.
123    pub(crate) audio_mp3: &'a [u8],
124    /// When the stream ended (EOT or timeout).
125    pub(crate) ended_at: DateTime<Utc>,
126    /// Total number of voice frames captured.
127    pub(crate) frame_count: i32,
128    /// Slow-data text message decoded from voice frames.
129    pub(crate) dstar_text: Option<&'a str>,
130    /// DPRS latitude, if a position report was embedded.
131    pub(crate) dprs_lat: Option<f64>,
132    /// DPRS longitude, if a position report was embedded.
133    pub(crate) dprs_lon: Option<f64>,
134}
135
136/// Inserts a new stream row and returns its auto-generated `id`.
137///
138/// Called by Tier 3 when a D-STAR voice header arrives. The row is created
139/// with `audio_mp3 = NULL`, `ended_at = NULL`, and `upload_status = 'pending'`.
140/// The returned `id` is used by subsequent [`update_audio`] calls.
141///
142/// # Errors
143///
144/// Returns `sqlx::Error` on connection or constraint failures.
145pub(crate) async fn insert_stream(
146    pool: &PgPool,
147    stream: &NewStream<'_>,
148) -> Result<i64, sqlx::Error> {
149    // INSERT with RETURNING id to get the auto-generated BIGSERIAL value.
150    let row: (i64,) = sqlx::query_as(
151        "INSERT INTO streams (reflector, module, protocol, stream_id, callsign,
152                              suffix, ur_call, started_at)
153         VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
154         RETURNING id",
155    )
156    .bind(stream.reflector)
157    .bind(stream.module)
158    .bind(stream.protocol)
159    .bind(stream.stream_id)
160    .bind(stream.callsign)
161    .bind(stream.suffix)
162    .bind(stream.ur_call)
163    .bind(stream.started_at)
164    .fetch_one(pool)
165    .await?;
166    Ok(row.0)
167}
168
169/// Updates a stream with the encoded MP3 audio, end timestamp, and frame count.
170///
171/// Called by Tier 3 after the voice stream ends and MP3 encoding completes.
172/// The [`AudioUpdate`] struct groups the audio blob, timing, and optional
173/// slow-data fields decoded from the voice frames.
174///
175/// # Errors
176///
177/// Returns `sqlx::Error` on query failure.
178pub(crate) async fn update_audio(
179    pool: &PgPool,
180    id: i64,
181    update: &AudioUpdate<'_>,
182) -> Result<(), sqlx::Error> {
183    // UPDATE targeting a single row by primary key, filling in audio and
184    // metadata fields that were NULL at insert time.
185    let _result = sqlx::query(
186        "UPDATE streams
187         SET audio_mp3 = $1, ended_at = $2, frame_count = $3,
188             dstar_text = $4, dprs_lat = $5, dprs_lon = $6
189         WHERE id = $7",
190    )
191    .bind(update.audio_mp3)
192    .bind(update.ended_at)
193    .bind(update.frame_count)
194    .bind(update.dstar_text)
195    .bind(update.dprs_lat)
196    .bind(update.dprs_lon)
197    .bind(id)
198    .execute(pool)
199    .await?;
200    Ok(())
201}
202
203/// Aggregated counts of streams grouped by `upload_status`.
204///
205/// Used by the HTTP API `/metrics` endpoint for operational visibility.
206/// Any stream row whose `upload_status` is NULL or an unrecognized value is
207/// bucketed into `unknown` so the total always sums back to the full row
208/// count of the `streams` table.
209#[derive(Debug, Default, serde::Serialize)]
210pub(crate) struct StreamStatusCounts {
211    /// Total rows in the `streams` table.
212    pub(crate) total: i64,
213    /// Rows with `upload_status = 'pending'`.
214    pub(crate) pending: i64,
215    /// Rows with `upload_status = 'uploaded'`.
216    pub(crate) uploaded: i64,
217    /// Rows with `upload_status = 'failed'`.
218    pub(crate) failed: i64,
219}
220
221/// Returns aggregated upload-status counts for the `streams` table.
222///
223/// Runs a single `GROUP BY upload_status` query so this is cheap even with
224/// millions of rows — the planner uses the `idx_streams_upload` index.
225///
226/// # Errors
227///
228/// Returns `sqlx::Error` on query failure.
229pub(crate) async fn count_by_status(pool: &PgPool) -> Result<StreamStatusCounts, sqlx::Error> {
230    // Each row is (upload_status, COUNT(*)) — we fold into the typed
231    // StreamStatusCounts struct so the HTTP layer can serialize directly.
232    let rows: Vec<(Option<String>, i64)> =
233        sqlx::query_as("SELECT upload_status, COUNT(*) FROM streams GROUP BY upload_status")
234            .fetch_all(pool)
235            .await?;
236    let mut counts = StreamStatusCounts::default();
237    for (status, n) in rows {
238        counts.total = counts.total.saturating_add(n);
239        match status.as_deref() {
240            Some("pending") => counts.pending = n,
241            Some("uploaded") => counts.uploaded = n,
242            Some("failed") => counts.failed = n,
243            _ => {}
244        }
245    }
246    Ok(counts)
247}
248
249/// Queries streams with optional filters on reflector, time window, and limit.
250///
251/// Used by the HTTP API to serve the stream listing endpoint. When
252/// `reflector_filter` is `Some`, only streams from that reflector are returned.
253/// Results are ordered by `started_at DESC` (most recent first).
254///
255/// # Errors
256///
257/// Returns `sqlx::Error` on query failure.
258pub(crate) async fn query(
259    pool: &PgPool,
260    reflector_filter: Option<&str>,
261    since: DateTime<Utc>,
262    limit: i64,
263) -> Result<Vec<StreamRow>, sqlx::Error> {
264    // Two query paths: filtered by reflector, or all reflectors.
265    // Both are time-bounded and row-limited, using idx_streams_lookup.
266    if let Some(reflector) = reflector_filter {
267        sqlx::query_as::<_, StreamRow>(
268            "SELECT id, reflector, module, protocol, stream_id, callsign,
269                    suffix, ur_call, dstar_text, dprs_lat, dprs_lon,
270                    started_at, ended_at, frame_count, audio_mp3,
271                    upload_status, upload_attempts, last_upload_error,
272                    uploaded_at, created_at
273             FROM streams
274             WHERE reflector = $1 AND started_at >= $2
275             ORDER BY started_at DESC
276             LIMIT $3",
277        )
278        .bind(reflector)
279        .bind(since)
280        .bind(limit)
281        .fetch_all(pool)
282        .await
283    } else {
284        sqlx::query_as::<_, StreamRow>(
285            "SELECT id, reflector, module, protocol, stream_id, callsign,
286                    suffix, ur_call, dstar_text, dprs_lat, dprs_lon,
287                    started_at, ended_at, frame_count, audio_mp3,
288                    upload_status, upload_attempts, last_upload_error,
289                    uploaded_at, created_at
290             FROM streams
291             WHERE started_at >= $1
292             ORDER BY started_at DESC
293             LIMIT $2",
294        )
295        .bind(since)
296        .bind(limit)
297        .fetch_all(pool)
298        .await
299    }
300}