stargazer/db/streams.rs
1//! Query functions for the `streams` table.
2//!
3//! The streams table stores captured D-STAR voice transmissions from Tier 3
4//! deep connections. Each row represents one complete or in-progress voice
5//! stream: the D-STAR header metadata, optional slow-data fields (text message,
6//! DPRS position), the decoded MP3 audio blob, and upload lifecycle state.
7//!
8//! # Lifecycle
9//!
10//! 1. Tier 3 calls [`insert_stream`] when a voice header arrives, creating a
11//! row with `audio_mp3 = NULL` and `ended_at = NULL`.
12//! 2. When the stream ends (EOT or timeout), Tier 3 calls [`update_audio`] to
13//! fill in the MP3 blob, final frame count, and end timestamp.
14//! 3. The upload processor picks up rows with `upload_status = 'pending'` and
15//! non-null `audio_mp3` via the `uploads` module.
16
17use chrono::{DateTime, Utc};
18use sqlx::PgPool;
19
20/// A single row from the `streams` table.
21///
22/// Maps directly to the table columns via `sqlx::FromRow`. The `audio_mp3`
23/// field is `Option<Vec<u8>>` because it starts as `NULL` and is filled in
24/// after MP3 encoding completes.
25#[derive(Debug, sqlx::FromRow)]
26pub(crate) struct StreamRow {
27 /// Auto-generated row identifier, used as the stream's database primary key.
28 pub(crate) id: i64,
29
30 /// Reflector callsign where this stream was captured.
31 pub(crate) reflector: String,
32
33 /// Module letter (A-Z) the stream was received on.
34 pub(crate) module: String,
35
36 /// D-STAR protocol used: `"dplus"`, `"dextra"`, or `"dcs"`.
37 pub(crate) protocol: String,
38
39 /// D-STAR stream ID (non-zero u16 on wire, stored as i32 for Postgres
40 /// `INTEGER` compatibility).
41 pub(crate) stream_id: i32,
42
43 /// Operator callsign extracted from the D-STAR header `my_callsign` field.
44 pub(crate) callsign: String,
45
46 /// Operator suffix (4 bytes from `my_suffix`), if present.
47 pub(crate) suffix: Option<String>,
48
49 /// `UR` (destination) callsign from the D-STAR header.
50 pub(crate) ur_call: Option<String>,
51
52 /// Slow-data text message decoded from the voice frames.
53 pub(crate) dstar_text: Option<String>,
54
55 /// DPRS latitude, if a position report was embedded in the slow data.
56 pub(crate) dprs_lat: Option<f64>,
57
58 /// DPRS longitude, if a position report was embedded in the slow data.
59 pub(crate) dprs_lon: Option<f64>,
60
61 /// When the first voice frame was received.
62 pub(crate) started_at: DateTime<Utc>,
63
64 /// When the stream ended (EOT received or timeout). `None` while the
65 /// stream is still in progress.
66 pub(crate) ended_at: Option<DateTime<Utc>>,
67
68 /// Number of voice frames captured.
69 pub(crate) frame_count: Option<i32>,
70
71 /// Decoded MP3 audio blob. `None` until encoding completes.
72 pub(crate) audio_mp3: Option<Vec<u8>>,
73
74 /// Upload lifecycle state: `"pending"`, `"uploaded"`, or `"failed"`.
75 pub(crate) upload_status: Option<String>,
76
77 /// Number of upload attempts made so far.
78 pub(crate) upload_attempts: Option<i32>,
79
80 /// Error message from the most recent failed upload attempt.
81 pub(crate) last_upload_error: Option<String>,
82
83 /// When the stream was successfully uploaded to the Rdio API.
84 pub(crate) uploaded_at: Option<DateTime<Utc>>,
85
86 /// When this row was first inserted.
87 pub(crate) created_at: Option<DateTime<Utc>>,
88}
89
90/// Parameters for inserting a new stream row.
91///
92/// Collected from the D-STAR voice header when a Tier 3 connection receives
93/// a new transmission. Audio and end-time are filled in later via
94/// [`update_audio`].
95#[derive(Debug)]
96pub(crate) struct NewStream<'a> {
97 /// Reflector callsign.
98 pub(crate) reflector: &'a str,
99 /// Module letter.
100 pub(crate) module: &'a str,
101 /// Protocol name.
102 pub(crate) protocol: &'a str,
103 /// D-STAR stream ID.
104 pub(crate) stream_id: i32,
105 /// Operator callsign.
106 pub(crate) callsign: &'a str,
107 /// Operator suffix, if any.
108 pub(crate) suffix: Option<&'a str>,
109 /// UR callsign, if any.
110 pub(crate) ur_call: Option<&'a str>,
111 /// When the header was received.
112 pub(crate) started_at: DateTime<Utc>,
113}
114
115/// Parameters for updating a stream after voice capture and MP3 encoding
116/// complete.
117///
118/// Groups the audio blob, timing, and optional slow-data fields into a single
119/// struct to stay within clippy's argument-count limit.
120#[derive(Debug)]
121pub(crate) struct AudioUpdate<'a> {
122 /// Encoded MP3 audio bytes.
123 pub(crate) audio_mp3: &'a [u8],
124 /// When the stream ended (EOT or timeout).
125 pub(crate) ended_at: DateTime<Utc>,
126 /// Total number of voice frames captured.
127 pub(crate) frame_count: i32,
128 /// Slow-data text message decoded from voice frames.
129 pub(crate) dstar_text: Option<&'a str>,
130 /// DPRS latitude, if a position report was embedded.
131 pub(crate) dprs_lat: Option<f64>,
132 /// DPRS longitude, if a position report was embedded.
133 pub(crate) dprs_lon: Option<f64>,
134}
135
136/// Inserts a new stream row and returns its auto-generated `id`.
137///
138/// Called by Tier 3 when a D-STAR voice header arrives. The row is created
139/// with `audio_mp3 = NULL`, `ended_at = NULL`, and `upload_status = 'pending'`.
140/// The returned `id` is used by subsequent [`update_audio`] calls.
141///
142/// # Errors
143///
144/// Returns `sqlx::Error` on connection or constraint failures.
145pub(crate) async fn insert_stream(
146 pool: &PgPool,
147 stream: &NewStream<'_>,
148) -> Result<i64, sqlx::Error> {
149 // INSERT with RETURNING id to get the auto-generated BIGSERIAL value.
150 let row: (i64,) = sqlx::query_as(
151 "INSERT INTO streams (reflector, module, protocol, stream_id, callsign,
152 suffix, ur_call, started_at)
153 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
154 RETURNING id",
155 )
156 .bind(stream.reflector)
157 .bind(stream.module)
158 .bind(stream.protocol)
159 .bind(stream.stream_id)
160 .bind(stream.callsign)
161 .bind(stream.suffix)
162 .bind(stream.ur_call)
163 .bind(stream.started_at)
164 .fetch_one(pool)
165 .await?;
166 Ok(row.0)
167}
168
169/// Updates a stream with the encoded MP3 audio, end timestamp, and frame count.
170///
171/// Called by Tier 3 after the voice stream ends and MP3 encoding completes.
172/// The [`AudioUpdate`] struct groups the audio blob, timing, and optional
173/// slow-data fields decoded from the voice frames.
174///
175/// # Errors
176///
177/// Returns `sqlx::Error` on query failure.
178pub(crate) async fn update_audio(
179 pool: &PgPool,
180 id: i64,
181 update: &AudioUpdate<'_>,
182) -> Result<(), sqlx::Error> {
183 // UPDATE targeting a single row by primary key, filling in audio and
184 // metadata fields that were NULL at insert time.
185 let _result = sqlx::query(
186 "UPDATE streams
187 SET audio_mp3 = $1, ended_at = $2, frame_count = $3,
188 dstar_text = $4, dprs_lat = $5, dprs_lon = $6
189 WHERE id = $7",
190 )
191 .bind(update.audio_mp3)
192 .bind(update.ended_at)
193 .bind(update.frame_count)
194 .bind(update.dstar_text)
195 .bind(update.dprs_lat)
196 .bind(update.dprs_lon)
197 .bind(id)
198 .execute(pool)
199 .await?;
200 Ok(())
201}
202
203/// Aggregated counts of streams grouped by `upload_status`.
204///
205/// Used by the HTTP API `/metrics` endpoint for operational visibility.
206/// Any stream row whose `upload_status` is NULL or an unrecognized value is
207/// bucketed into `unknown` so the total always sums back to the full row
208/// count of the `streams` table.
209#[derive(Debug, Default, serde::Serialize)]
210pub(crate) struct StreamStatusCounts {
211 /// Total rows in the `streams` table.
212 pub(crate) total: i64,
213 /// Rows with `upload_status = 'pending'`.
214 pub(crate) pending: i64,
215 /// Rows with `upload_status = 'uploaded'`.
216 pub(crate) uploaded: i64,
217 /// Rows with `upload_status = 'failed'`.
218 pub(crate) failed: i64,
219}
220
221/// Returns aggregated upload-status counts for the `streams` table.
222///
223/// Runs a single `GROUP BY upload_status` query so this is cheap even with
224/// millions of rows — the planner uses the `idx_streams_upload` index.
225///
226/// # Errors
227///
228/// Returns `sqlx::Error` on query failure.
229pub(crate) async fn count_by_status(pool: &PgPool) -> Result<StreamStatusCounts, sqlx::Error> {
230 // Each row is (upload_status, COUNT(*)) — we fold into the typed
231 // StreamStatusCounts struct so the HTTP layer can serialize directly.
232 let rows: Vec<(Option<String>, i64)> =
233 sqlx::query_as("SELECT upload_status, COUNT(*) FROM streams GROUP BY upload_status")
234 .fetch_all(pool)
235 .await?;
236 let mut counts = StreamStatusCounts::default();
237 for (status, n) in rows {
238 counts.total = counts.total.saturating_add(n);
239 match status.as_deref() {
240 Some("pending") => counts.pending = n,
241 Some("uploaded") => counts.uploaded = n,
242 Some("failed") => counts.failed = n,
243 _ => {}
244 }
245 }
246 Ok(counts)
247}
248
249/// Queries streams with optional filters on reflector, time window, and limit.
250///
251/// Used by the HTTP API to serve the stream listing endpoint. When
252/// `reflector_filter` is `Some`, only streams from that reflector are returned.
253/// Results are ordered by `started_at DESC` (most recent first).
254///
255/// # Errors
256///
257/// Returns `sqlx::Error` on query failure.
258pub(crate) async fn query(
259 pool: &PgPool,
260 reflector_filter: Option<&str>,
261 since: DateTime<Utc>,
262 limit: i64,
263) -> Result<Vec<StreamRow>, sqlx::Error> {
264 // Two query paths: filtered by reflector, or all reflectors.
265 // Both are time-bounded and row-limited, using idx_streams_lookup.
266 if let Some(reflector) = reflector_filter {
267 sqlx::query_as::<_, StreamRow>(
268 "SELECT id, reflector, module, protocol, stream_id, callsign,
269 suffix, ur_call, dstar_text, dprs_lat, dprs_lon,
270 started_at, ended_at, frame_count, audio_mp3,
271 upload_status, upload_attempts, last_upload_error,
272 uploaded_at, created_at
273 FROM streams
274 WHERE reflector = $1 AND started_at >= $2
275 ORDER BY started_at DESC
276 LIMIT $3",
277 )
278 .bind(reflector)
279 .bind(since)
280 .bind(limit)
281 .fetch_all(pool)
282 .await
283 } else {
284 sqlx::query_as::<_, StreamRow>(
285 "SELECT id, reflector, module, protocol, stream_id, callsign,
286 suffix, ur_call, dstar_text, dprs_lat, dprs_lon,
287 started_at, ended_at, frame_count, audio_mp3,
288 upload_status, upload_attempts, last_upload_error,
289 uploaded_at, created_at
290 FROM streams
291 WHERE started_at >= $1
292 ORDER BY started_at DESC
293 LIMIT $2",
294 )
295 .bind(since)
296 .bind(limit)
297 .fetch_all(pool)
298 .await
299 }
300}