stargazer/upload/
mod.rs

//! Rdio API upload queue processor.
//!
//! Polls the `streams` table for completed voice recordings with
//! `upload_status = 'pending'` and uploads them to an `SDRTrunk`-compatible
//! Rdio Scanner API server using the `POST /api/call-upload`
//! multipart/form-data protocol.
//!
//! The protocol wire format — field names, the `User-Agent: sdrtrunk`
//! header, the `"Call imported successfully."` success marker — lives in
//! [`rdio`]. This module is the orchestrator: it drains the database
//! queue, maps `StreamRow` fields onto the Rdio field set, calls
//! [`rdio::upload_stream`] for each row, and transitions the row through
//! the upload lifecycle.
//!
//! # Lifecycle transitions
//!
//! For every `pending` row returned by [`crate::db::uploads::get_pending`]
//! the processor:
//!
//! - on success — calls [`crate::db::uploads::mark_uploaded`].
//! - on failure, while the number of attempts made so far (including the
//!   one that just failed) is still below `max_retries` — calls
//!   [`crate::db::uploads::increment_attempts`] to bump the counter and
//!   leave the row in `pending` so the next tick retries.
//! - on failure, once the number of attempts made so far (including the
//!   one that just failed) has reached `max_retries` — calls
//!   [`crate::db::uploads::mark_failed`] to transition to the terminal
//!   `failed` state.
//!
//! # System id scheme
//!
//! Rdio Scanner expects every `system` to be numeric, but D-STAR reflector
//! names are alphabetic with a numeric suffix (`REF030`, `DCS030`, ...).
//! To disambiguate reflectors that share a numeric suffix across protocol
//! families we prefix the numeric suffix with a protocol tag:
//!
//! | Protocol | Base | Example | Resulting id |
//! |---|---|---|---|
//! | `dplus` (REF) | `10_000` | `REF030` | `10030` |
//! | `dextra` (XLX) | `20_000` | `XLX030` | `20030` |
//! | `dcs` (DCS) | `30_000` | `DCS030` | `30030` |
//! | `dextra` (XRF) | `40_000` | `XRF030` | `40030` |
//!
//! Pi-Star labels XLX and XRF reflectors identically as `dextra`; we
//! distinguish them by the 3-letter prefix on the reflector callsign.
//!
//! # Talkgroup scheme
//!
//! Rdio Scanner talkgroups are numeric; D-STAR modules are letters A-Z.
//! We map `A → 1`, `B → 2`, ..., `Z → 26`. Unknown module letters fall
//! back to `0`.
50
51pub(crate) mod rdio;
52
53use std::time::Duration;
54
55use chrono::{DateTime, Utc};
56use sqlx::PgPool;
57
58use crate::config::RdioConfig;
59use crate::db::streams::StreamRow;
60use crate::db::uploads;
61use rdio::{UploadError, UploadFields};
62
/// Upper bound on how many pending rows a single tick pulls from the queue.
///
/// Kept deliberately small: rows in a batch are uploaded one after another,
/// so a transient API outage should not be able to tie up a large batch for
/// the duration of the HTTP timeout. Typed as `i64` because it is passed
/// straight through to [`uploads::get_pending`].
const BATCH_SIZE: i64 = 10;
68
69/// Runs the upload queue processor loop.
70///
71/// Polls the database for pending streams every
72/// `config.retry_interval_secs` and uploads each to the Rdio Scanner
73/// endpoint. Runs until cancelled.
74///
75/// # Errors
76///
77/// Returns an error only if building the shared `reqwest::Client` fails —
78/// subsequent iteration errors (SQL, HTTP, multipart) are logged at `warn`
79/// level and the loop keeps running so a brief outage does not freeze the
80/// whole processor.
81pub(crate) async fn run(
82    config: RdioConfig,
83    pool: PgPool,
84) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
85    tracing::info!(
86        endpoint = %config.endpoint,
87        retry_interval_secs = config.retry_interval_secs,
88        max_retries = config.max_retries,
89        "upload queue processor starting"
90    );
91
92    // Shared HTTP client. reqwest pools connections per-client, so a
93    // single instance keeps keep-alive connections warm across ticks.
94    let client = reqwest::Client::builder()
95        .timeout(Duration::from_secs(60))
96        .build()
97        .map_err(|e| -> Box<dyn std::error::Error + Send + Sync> { Box::new(e) })?;
98
99    let mut interval = tokio::time::interval(Duration::from_secs(config.retry_interval_secs));
100    // If the loop stalls (e.g. slow DB), do not burst-tick on resume —
101    // skip missed ticks instead of trying to catch up.
102    interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
103
104    loop {
105        let _instant = interval.tick().await;
106        process_pending(&client, &pool, &config).await;
107    }
108}
109
110/// Drains one batch of pending uploads.
111///
112/// Queries up to [`BATCH_SIZE`] pending rows, uploads each, and records the
113/// outcome in the database. All errors are logged at `warn` level and
114/// swallowed — a failure on one row never blocks the others.
115async fn process_pending(client: &reqwest::Client, pool: &PgPool, config: &RdioConfig) {
116    let rows = match uploads::get_pending(pool, BATCH_SIZE).await {
117        Ok(r) => r,
118        Err(e) => {
119            tracing::warn!(error = %e, "upload: failed to query pending streams");
120            return;
121        }
122    };
123
124    if rows.is_empty() {
125        tracing::debug!("upload: no pending streams");
126        return;
127    }
128
129    tracing::debug!(count = rows.len(), "upload: processing pending streams");
130
131    for row in rows {
132        process_one(client, pool, config, row).await;
133    }
134}
135
136/// Uploads a single stream and records the outcome.
137///
138/// Errors from either the HTTP upload or the status-transition query are
139/// logged but never propagated — the caller iterates over a batch and we
140/// do not want one bad row to abort the rest.
141async fn process_one(client: &reqwest::Client, pool: &PgPool, config: &RdioConfig, row: StreamRow) {
142    // Guard: `get_pending` already filters on `audio_mp3 IS NOT NULL`, but
143    // the column is Option<Vec<u8>> at the type level. If somehow the row
144    // is incomplete we log and mark it failed so it does not spin forever.
145    let Some(audio_mp3) = row.audio_mp3.clone() else {
146        tracing::warn!(
147            id = row.id,
148            "upload: pending row has no audio_mp3 — marking failed"
149        );
150        if let Err(e) = uploads::mark_failed(pool, row.id, "no audio_mp3").await {
151            tracing::warn!(id = row.id, error = %e, "upload: failed to mark row failed");
152        }
153        return;
154    };
155
156    // Compute all the Rdio fields from the StreamRow.
157    let system_id = compute_system_id(&row.reflector, &row.protocol);
158    let protocol_label = protocol_label(&row.protocol);
159    let system_label = format!("{} ({})", row.reflector, protocol_label);
160    let talkgroup = module_to_talkgroup(Some(row.module.as_str()));
161    let talkgroup_label = format!("Module {}", row.module);
162    let talker_alias = make_talker_alias(&row.callsign, row.suffix.as_deref());
163    let audio_name = make_audio_name(row.started_at, &row.reflector, &row.module, &row.callsign);
164    let date_time = row.started_at.timestamp();
165
166    let fields = UploadFields {
167        api_key: &config.api_key,
168        system: &system_id,
169        system_label: &system_label,
170        talkgroup: &talkgroup,
171        talkgroup_label: &talkgroup_label,
172        source: &row.callsign,
173        talker_alias: &talker_alias,
174        talkgroup_tag: row.dstar_text.as_deref(),
175        date_time,
176        audio_name: &audio_name,
177        audio_mp3,
178    };
179
180    match rdio::upload_stream(client, &config.endpoint, fields).await {
181        Ok(()) => {
182            tracing::info!(
183                id = row.id,
184                reflector = %row.reflector,
185                module = %row.module,
186                callsign = %row.callsign,
187                "upload: succeeded"
188            );
189            if let Err(e) = uploads::mark_uploaded(pool, row.id).await {
190                tracing::warn!(id = row.id, error = %e, "upload: failed to mark row uploaded");
191            }
192        }
193        Err(e) => {
194            handle_upload_failure(pool, config, &row, &e).await;
195        }
196    }
197}
198
199/// Handles an upload failure by deciding whether to retry or mark the row
200/// as permanently failed, based on the attempts-so-far counter.
201async fn handle_upload_failure(
202    pool: &PgPool,
203    config: &RdioConfig,
204    row: &StreamRow,
205    error: &UploadError,
206) {
207    // attempts_so_far counts attempts BEFORE this one. The database will
208    // be incremented by `increment_attempts` after the check.
209    let attempts_so_far = row.upload_attempts.unwrap_or(0);
210    let max = i32::try_from(config.max_retries).unwrap_or(i32::MAX);
211
212    // The current attempt raised `error`, so `attempts_so_far + 1` total
213    // attempts have now been made. If that reaches `max`, no more retries.
214    let attempts_after = attempts_so_far.saturating_add(1);
215    let error_msg = error.to_string();
216
217    if attempts_after >= max {
218        tracing::warn!(
219            id = row.id,
220            attempts = attempts_after,
221            max_retries = config.max_retries,
222            error = %error_msg,
223            "upload: max retries exhausted — marking failed"
224        );
225        if let Err(e) = uploads::mark_failed(pool, row.id, &error_msg).await {
226            tracing::warn!(id = row.id, error = %e, "upload: failed to mark row failed");
227        }
228    } else {
229        tracing::warn!(
230            id = row.id,
231            attempts = attempts_after,
232            max_retries = config.max_retries,
233            error = %error_msg,
234            "upload: failed — will retry"
235        );
236        if let Err(e) = uploads::increment_attempts(pool, row.id, &error_msg).await {
237            tracing::warn!(id = row.id, error = %e, "upload: failed to increment attempts");
238        }
239    }
240}
241
242/// Returns the Rdio Scanner `system` id for a given D-STAR reflector.
243///
244/// See the module-level documentation for the base-offset table. Unknown
245/// protocols or non-matching reflector-name patterns fall back to the raw
246/// 3-digit suffix (or `"0"` if none).
247///
248/// # Examples
249///
250/// ```ignore
251/// assert_eq!(compute_system_id("REF030", "dplus"), "10030");
252/// assert_eq!(compute_system_id("DCS030", "dcs"),   "30030");
253/// assert_eq!(compute_system_id("XLX030", "dextra"),"20030");
254/// assert_eq!(compute_system_id("XRF030", "dextra"),"40030");
255/// ```
256fn compute_system_id(reflector: &str, protocol: &str) -> String {
257    let suffix = numeric_suffix(reflector).unwrap_or(0);
258    let base = match (protocol, reflector_prefix(reflector)) {
259        ("dplus", _) => 10_000,
260        ("dextra", Some("XRF")) => 40_000,
261        ("dextra", _) => 20_000,
262        ("dcs", _) => 30_000,
263        _ => 0,
264    };
265    (base + u32::from(suffix)).to_string()
266}
267
/// Returns the leading three bytes of a reflector name when they are all
/// ASCII uppercase letters.
///
/// Yields `None` when the name is shorter than three bytes, when byte
/// offset 3 is not a character boundary, or when any of the three bytes is
/// something other than `A`–`Z` (lowercase, digits, non-ASCII).
///
/// [`compute_system_id`] relies on this to tell XLX from XRF reflectors
/// inside the `dextra` protocol family.
fn reflector_prefix(reflector: &str) -> Option<&str> {
    reflector
        .get(..3)
        .filter(|head| head.bytes().all(|byte| byte.is_ascii_uppercase()))
}
281
/// Extracts the trailing numeric suffix of a reflector callsign.
///
/// `"REF030"` → `Some(30)`, `"XLX123"` → `Some(123)`, `"invalid"` → `None`.
///
/// Only ASCII digits count as part of the suffix. Returns `None` when the
/// name does not end in a digit or when the digit run does not fit in a
/// `u16` — the value is *not* clamped. `u16` is ample because all known
/// D-STAR reflector registries use 3-digit suffixes.
fn numeric_suffix(reflector: &str) -> Option<u16> {
    // Everything before the trailing run of ASCII digits. Slicing by a
    // predicate avoids the reverse/collect/reverse round trip and its two
    // temporary `String` allocations.
    let head = reflector.trim_end_matches(|c: char| c.is_ascii_digit());
    // `head` is always a prefix of `reflector`, so this never yields `None`
    // in practice; an empty remainder simply fails to parse.
    reflector.strip_prefix(head)?.parse().ok()
}
298
/// Maps a protocol identifier to the display name used in `systemLabel`.
///
/// `"dplus"` → `"DPlus"`, `"dextra"` → `"DExtra"`, `"dcs"` → `"DCS"`.
/// Matching is exact and case-sensitive. Anything else is returned
/// unchanged, so an unexpected protocol string stays visible to the
/// operator instead of being replaced by a placeholder.
fn protocol_label(protocol: &str) -> String {
    let label = match protocol {
        "dplus" => "DPlus",
        "dextra" => "DExtra",
        "dcs" => "DCS",
        unrecognised => unrecognised,
    };
    label.to_owned()
}
312
/// Converts a D-STAR module letter into an Rdio Scanner `talkgroup` id.
///
/// A module that is exactly one ASCII uppercase letter maps to its
/// position in the alphabet: `A` → `1`, `B` → `2`, ..., `Z` → `26`.
/// Everything else — `None`, the empty string, lowercase letters, digits,
/// and any input longer than one byte — yields the sentinel `"0"`.
fn module_to_talkgroup(module: Option<&str>) -> String {
    match module.map(str::as_bytes) {
        // Exactly one byte, and that byte is `A`..=`Z`.
        Some(&[letter]) if letter.is_ascii_uppercase() => (letter - b'A' + 1).to_string(),
        _ => "0".to_owned(),
    }
}
331
/// Composes the `talkerAlias` field from a callsign and an optional suffix.
///
/// `("W1AW", Some("D75"))` → `"W1AW / D75"`, `("W1AW", None)` → `"W1AW"`.
/// Surrounding whitespace is stripped from the suffix first (D-STAR pads
/// suffixes to 4 bytes on the wire); a suffix that is absent or blank
/// after trimming leaves the callsign on its own. The callsign itself is
/// used verbatim.
fn make_talker_alias(callsign: &str, suffix: Option<&str>) -> String {
    match suffix.map(str::trim) {
        Some(tag) if !tag.is_empty() => format!("{callsign} / {tag}"),
        _ => callsign.to_owned(),
    }
}
345
346/// Builds the `audio` part's filename.
347///
348/// Format: `YYYYMMDD_HHMMSS_<reflector>_<module>_<callsign>.mp3`, derived
349/// from the stream's `started_at` timestamp (UTC). Example:
350///
351/// ```text
352/// 20260412_143000_REF030_C_W1AW.mp3
353/// ```
354///
355/// The callsign is trimmed of trailing whitespace (D-STAR pads callsigns
356/// to 8 bytes on the wire) so the filename is not littered with padding.
357fn make_audio_name(
358    started_at: DateTime<Utc>,
359    reflector: &str,
360    module: &str,
361    callsign: &str,
362) -> String {
363    let stamp = started_at.format("%Y%m%d_%H%M%S");
364    let callsign = callsign.trim();
365    format!("{stamp}_{reflector}_{module}_{callsign}.mp3")
366}
367
#[cfg(test)]
mod tests {
    use chrono::{DateTime, TimeZone, Utc};

    use super::{
        compute_system_id, make_audio_name, make_talker_alias, module_to_talkgroup, numeric_suffix,
        protocol_label, reflector_prefix,
    };

    /// Checks every `(reflector, protocol, expected id)` triple in `cases`.
    fn assert_system_ids(cases: &[(&str, &str, &str)]) {
        for &(reflector, protocol, expected) in cases {
            assert_eq!(compute_system_id(reflector, protocol), expected);
        }
    }

    /// Builds a fixed UTC timestamp, surfacing an error instead of
    /// panicking if chrono cannot resolve it to a single instant.
    fn utc(
        year: i32,
        month: u32,
        day: u32,
        hour: u32,
        minute: u32,
        second: u32,
    ) -> Result<DateTime<Utc>, Box<dyn std::error::Error>> {
        Utc.with_ymd_and_hms(year, month, day, hour, minute, second)
            .single()
            .ok_or_else(|| "fixed timestamp must be unambiguous".into())
    }

    #[test]
    fn compute_system_id_dplus_ref_prefix() {
        assert_system_ids(&[
            ("REF030", "dplus", "10030"),
            ("REF001", "dplus", "10001"),
            ("REF999", "dplus", "10999"),
        ]);
    }

    #[test]
    fn compute_system_id_dextra_xlx_prefix() {
        assert_system_ids(&[("XLX030", "dextra", "20030"), ("XLX001", "dextra", "20001")]);
    }

    #[test]
    fn compute_system_id_dcs_prefix() {
        assert_system_ids(&[("DCS030", "dcs", "30030"), ("DCS001", "dcs", "30001")]);
    }

    #[test]
    fn compute_system_id_dextra_xrf_prefix() {
        assert_system_ids(&[("XRF030", "dextra", "40030"), ("XRF123", "dextra", "40123")]);
    }

    #[test]
    fn compute_system_id_unknown_protocol_uses_zero_base() {
        // With no recognised protocol the base is 0, leaving only the
        // numeric suffix so the upload can still proceed.
        assert_system_ids(&[("REF030", "unknown", "30")]);
    }

    #[test]
    fn compute_system_id_missing_suffix_returns_base() {
        // A name without trailing digits contributes 0 to the base.
        assert_system_ids(&[("REFXYZ", "dplus", "10000")]);
    }

    #[test]
    fn module_to_talkgroup_covers_all_uppercase_letters() {
        // Pair each letter A..=Z with its 1-based alphabet position.
        for (letter, position) in ('A'..='Z').zip(1_u32..) {
            let input = String::from(letter);
            let expected = position.to_string();
            assert_eq!(
                module_to_talkgroup(Some(&input)),
                expected,
                "module {letter} must map to {expected}"
            );
        }
    }

    #[test]
    fn module_to_talkgroup_none_returns_zero() {
        assert_eq!(module_to_talkgroup(None), "0");
    }

    #[test]
    fn module_to_talkgroup_rejects_invalid_inputs() {
        // Empty, lowercase, two letters, padded, and a digit: all sentinel.
        for invalid in ["", "a", "AA", "A ", "1"] {
            assert_eq!(module_to_talkgroup(Some(invalid)), "0");
        }
    }

    #[test]
    fn make_audio_name_formats_timestamp_correctly() -> Result<(), Box<dyn std::error::Error>> {
        let started_at = utc(2026, 4, 12, 14, 30, 0)?;
        assert_eq!(
            make_audio_name(started_at, "REF030", "C", "W1AW"),
            "20260412_143000_REF030_C_W1AW.mp3"
        );
        Ok(())
    }

    #[test]
    fn make_audio_name_trims_callsign_padding() -> Result<(), Box<dyn std::error::Error>> {
        // Callsigns arrive space-padded to 8 bytes; the padding must not
        // survive into the filename.
        let started_at = utc(2026, 1, 1, 0, 0, 0)?;
        assert_eq!(
            make_audio_name(started_at, "DCS030", "B", "W1AW    "),
            "20260101_000000_DCS030_B_W1AW.mp3"
        );
        Ok(())
    }

    #[test]
    fn make_talker_alias_appends_suffix() {
        assert_eq!(make_talker_alias("W1AW", Some("D75")), "W1AW / D75");
    }

    #[test]
    fn make_talker_alias_trims_suffix() {
        // Suffixes arrive padded to 4 bytes; the padding must not show up
        // in the displayed alias.
        assert_eq!(make_talker_alias("W1AW", Some("D75 ")), "W1AW / D75");
    }

    #[test]
    fn make_talker_alias_skips_empty_suffix() {
        for blank in [Some("    "), Some(""), None] {
            assert_eq!(make_talker_alias("W1AW", blank), "W1AW");
        }
    }

    #[test]
    fn protocol_label_known_protocols_capitalise() {
        for (protocol, label) in [("dplus", "DPlus"), ("dextra", "DExtra"), ("dcs", "DCS")] {
            assert_eq!(protocol_label(protocol), label);
        }
    }

    #[test]
    fn protocol_label_unknown_passes_through() {
        assert_eq!(protocol_label("weird"), "weird");
    }

    #[test]
    fn numeric_suffix_extracts_trailing_digits() {
        for (reflector, number) in [("REF030", 30), ("DCS001", 1), ("XRF999", 999)] {
            assert_eq!(numeric_suffix(reflector), Some(number));
        }
    }

    #[test]
    fn numeric_suffix_handles_no_digits() {
        for reflector in ["REFXYZ", ""] {
            assert_eq!(numeric_suffix(reflector), None);
        }
    }

    #[test]
    fn reflector_prefix_returns_three_uppercase_letters() {
        for (reflector, prefix) in [
            ("REF030", "REF"),
            ("XRF030", "XRF"),
            ("XLX030", "XLX"),
            ("DCS030", "DCS"),
        ] {
            assert_eq!(reflector_prefix(reflector), Some(prefix));
        }
    }

    #[test]
    fn reflector_prefix_rejects_short_or_non_ascii() {
        for reflector in ["RE", "ref030"] {
            assert_eq!(reflector_prefix(reflector), None);
        }
    }
}