stargazer/db/uploads.rs
1//! Query functions for the stream upload queue.
2//!
3//! The upload queue is not a separate table; it operates on the `streams` table
4//! using the `upload_status`, `upload_attempts`, and `last_upload_error` columns
5//! to track the lifecycle of each stream's upload to the Rdio API.
6//!
7//! # Upload Lifecycle
8//!
9//! ```text
10//! pending ──► uploaded (success: mark_uploaded)
11//! │
12//! ├──► pending (transient failure: increment_attempts)
13//! │
14//! └──► failed (permanent failure or max retries: mark_failed)
15//! ```
16//!
17//! The upload processor polls [`get_pending`] on a timer, attempts each upload,
18//! and calls the appropriate status-transition function based on the result.
19//! Only streams with non-null `audio_mp3` are returned by [`get_pending`],
20//! ensuring that in-progress streams (still recording) are not prematurely
21//! queued.
22
23use chrono::Utc;
24use sqlx::PgPool;
25
26use super::streams::StreamRow;
27
28/// Returns streams awaiting upload, ordered by creation time.
29///
30/// Filters for `upload_status = 'pending'` AND `audio_mp3 IS NOT NULL` (the
31/// stream must have finished recording and MP3 encoding). Results are capped
32/// at `limit` rows and ordered oldest-first so that the upload processor
33/// works through the backlog in FIFO order.
34///
35/// # Errors
36///
37/// Returns `sqlx::Error` on query failure.
38pub(crate) async fn get_pending(pool: &PgPool, limit: i64) -> Result<Vec<StreamRow>, sqlx::Error> {
39 // Only select streams that have completed MP3 encoding (audio_mp3 IS NOT
40 // NULL) and are still in pending state. Ordered by created_at ASC for FIFO.
41 sqlx::query_as::<_, StreamRow>(
42 "SELECT id, reflector, module, protocol, stream_id, callsign,
43 suffix, ur_call, dstar_text, dprs_lat, dprs_lon,
44 started_at, ended_at, frame_count, audio_mp3,
45 upload_status, upload_attempts, last_upload_error,
46 uploaded_at, created_at
47 FROM streams
48 WHERE upload_status = 'pending' AND audio_mp3 IS NOT NULL
49 ORDER BY created_at ASC
50 LIMIT $1",
51 )
52 .bind(limit)
53 .fetch_all(pool)
54 .await
55}
56
57/// Marks a stream as successfully uploaded.
58///
59/// Sets `upload_status = 'uploaded'` and records the current timestamp in
60/// `uploaded_at`. Called by the upload processor after a successful Rdio API
61/// response.
62///
63/// # Errors
64///
65/// Returns `sqlx::Error` on query failure.
66pub(crate) async fn mark_uploaded(pool: &PgPool, id: i64) -> Result<(), sqlx::Error> {
67 // Transition to terminal 'uploaded' state with timestamp.
68 let _result = sqlx::query(
69 "UPDATE streams
70 SET upload_status = 'uploaded', uploaded_at = $1
71 WHERE id = $2",
72 )
73 .bind(Utc::now())
74 .bind(id)
75 .execute(pool)
76 .await?;
77 Ok(())
78}
79
80/// Marks a stream as permanently failed.
81///
82/// Sets `upload_status = 'failed'` and records the error message. Called when
83/// the upload processor decides not to retry (e.g., max attempts exceeded or
84/// a non-retryable HTTP status).
85///
86/// # Errors
87///
88/// Returns `sqlx::Error` on query failure.
89pub(crate) async fn mark_failed(pool: &PgPool, id: i64, error: &str) -> Result<(), sqlx::Error> {
90 // Transition to terminal 'failed' state with error message.
91 let _result = sqlx::query(
92 "UPDATE streams
93 SET upload_status = 'failed', last_upload_error = $1
94 WHERE id = $2",
95 )
96 .bind(error)
97 .bind(id)
98 .execute(pool)
99 .await?;
100 Ok(())
101}
102
103/// Increments the attempt counter and records the error for a retryable failure.
104///
105/// The stream stays in `upload_status = 'pending'` so it will be picked up
106/// again on the next poll cycle. The caller is responsible for checking
107/// `upload_attempts` against the configured maximum and calling [`mark_failed`]
108/// when retries are exhausted.
109///
110/// # Errors
111///
112/// Returns `sqlx::Error` on query failure.
113pub(crate) async fn increment_attempts(
114 pool: &PgPool,
115 id: i64,
116 error: &str,
117) -> Result<(), sqlx::Error> {
118 // Bump attempt counter and record the latest error, but leave status as
119 // 'pending' so the upload processor retries on the next cycle.
120 let _result = sqlx::query(
121 "UPDATE streams
122 SET upload_attempts = upload_attempts + 1, last_upload_error = $1
123 WHERE id = $2",
124 )
125 .bind(error)
126 .bind(id)
127 .execute(pool)
128 .await?;
129 Ok(())
130}