stargazer/db/
uploads.rs

1//! Query functions for the stream upload queue.
2//!
3//! The upload queue is not a separate table; it operates on the `streams` table
4//! using the `upload_status`, `upload_attempts`, and `last_upload_error` columns
5//! to track the lifecycle of each stream's upload to the Rdio API.
6//!
7//! # Upload Lifecycle
8//!
9//! ```text
10//! pending ──► uploaded    (success: mark_uploaded)
11//!    │
12//!    ├──► pending         (transient failure: increment_attempts)
13//!    │
14//!    └──► failed          (permanent failure or max retries: mark_failed)
15//! ```
16//!
17//! The upload processor polls [`get_pending`] on a timer, attempts each upload,
18//! and calls the appropriate status-transition function based on the result.
19//! Only streams with non-null `audio_mp3` are returned by [`get_pending`],
20//! ensuring that in-progress streams (still recording) are not prematurely
21//! queued.
22
23use chrono::Utc;
24use sqlx::PgPool;
25
26use super::streams::StreamRow;
27
28/// Returns streams awaiting upload, ordered by creation time.
29///
30/// Filters for `upload_status = 'pending'` AND `audio_mp3 IS NOT NULL` (the
31/// stream must have finished recording and MP3 encoding). Results are capped
32/// at `limit` rows and ordered oldest-first so that the upload processor
33/// works through the backlog in FIFO order.
34///
35/// # Errors
36///
37/// Returns `sqlx::Error` on query failure.
38pub(crate) async fn get_pending(pool: &PgPool, limit: i64) -> Result<Vec<StreamRow>, sqlx::Error> {
39    // Only select streams that have completed MP3 encoding (audio_mp3 IS NOT
40    // NULL) and are still in pending state. Ordered by created_at ASC for FIFO.
41    sqlx::query_as::<_, StreamRow>(
42        "SELECT id, reflector, module, protocol, stream_id, callsign,
43                suffix, ur_call, dstar_text, dprs_lat, dprs_lon,
44                started_at, ended_at, frame_count, audio_mp3,
45                upload_status, upload_attempts, last_upload_error,
46                uploaded_at, created_at
47         FROM streams
48         WHERE upload_status = 'pending' AND audio_mp3 IS NOT NULL
49         ORDER BY created_at ASC
50         LIMIT $1",
51    )
52    .bind(limit)
53    .fetch_all(pool)
54    .await
55}
56
57/// Marks a stream as successfully uploaded.
58///
59/// Sets `upload_status = 'uploaded'` and records the current timestamp in
60/// `uploaded_at`. Called by the upload processor after a successful Rdio API
61/// response.
62///
63/// # Errors
64///
65/// Returns `sqlx::Error` on query failure.
66pub(crate) async fn mark_uploaded(pool: &PgPool, id: i64) -> Result<(), sqlx::Error> {
67    // Transition to terminal 'uploaded' state with timestamp.
68    let _result = sqlx::query(
69        "UPDATE streams
70         SET upload_status = 'uploaded', uploaded_at = $1
71         WHERE id = $2",
72    )
73    .bind(Utc::now())
74    .bind(id)
75    .execute(pool)
76    .await?;
77    Ok(())
78}
79
80/// Marks a stream as permanently failed.
81///
82/// Sets `upload_status = 'failed'` and records the error message. Called when
83/// the upload processor decides not to retry (e.g., max attempts exceeded or
84/// a non-retryable HTTP status).
85///
86/// # Errors
87///
88/// Returns `sqlx::Error` on query failure.
89pub(crate) async fn mark_failed(pool: &PgPool, id: i64, error: &str) -> Result<(), sqlx::Error> {
90    // Transition to terminal 'failed' state with error message.
91    let _result = sqlx::query(
92        "UPDATE streams
93         SET upload_status = 'failed', last_upload_error = $1
94         WHERE id = $2",
95    )
96    .bind(error)
97    .bind(id)
98    .execute(pool)
99    .await?;
100    Ok(())
101}
102
103/// Increments the attempt counter and records the error for a retryable failure.
104///
105/// The stream stays in `upload_status = 'pending'` so it will be picked up
106/// again on the next poll cycle. The caller is responsible for checking
107/// `upload_attempts` against the configured maximum and calling [`mark_failed`]
108/// when retries are exhausted.
109///
110/// # Errors
111///
112/// Returns `sqlx::Error` on query failure.
113pub(crate) async fn increment_attempts(
114    pool: &PgPool,
115    id: i64,
116    error: &str,
117) -> Result<(), sqlx::Error> {
118    // Bump attempt counter and record the latest error, but leave status as
119    // 'pending' so the upload processor retries on the next cycle.
120    let _result = sqlx::query(
121        "UPDATE streams
122         SET upload_attempts = upload_attempts + 1, last_upload_error = $1
123         WHERE id = $2",
124    )
125    .bind(error)
126    .bind(id)
127    .execute(pool)
128    .await?;
129    Ok(())
130}