1pub(crate) mod rdio;
52
53use std::time::Duration;
54
55use chrono::{DateTime, Utc};
56use sqlx::PgPool;
57
58use crate::config::RdioConfig;
59use crate::db::streams::StreamRow;
60use crate::db::uploads;
61use rdio::{UploadError, UploadFields};
62
63const BATCH_SIZE: i64 = 10;
68
69pub(crate) async fn run(
82 config: RdioConfig,
83 pool: PgPool,
84) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
85 tracing::info!(
86 endpoint = %config.endpoint,
87 retry_interval_secs = config.retry_interval_secs,
88 max_retries = config.max_retries,
89 "upload queue processor starting"
90 );
91
92 let client = reqwest::Client::builder()
95 .timeout(Duration::from_secs(60))
96 .build()
97 .map_err(|e| -> Box<dyn std::error::Error + Send + Sync> { Box::new(e) })?;
98
99 let mut interval = tokio::time::interval(Duration::from_secs(config.retry_interval_secs));
100 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
103
104 loop {
105 let _instant = interval.tick().await;
106 process_pending(&client, &pool, &config).await;
107 }
108}
109
110async fn process_pending(client: &reqwest::Client, pool: &PgPool, config: &RdioConfig) {
116 let rows = match uploads::get_pending(pool, BATCH_SIZE).await {
117 Ok(r) => r,
118 Err(e) => {
119 tracing::warn!(error = %e, "upload: failed to query pending streams");
120 return;
121 }
122 };
123
124 if rows.is_empty() {
125 tracing::debug!("upload: no pending streams");
126 return;
127 }
128
129 tracing::debug!(count = rows.len(), "upload: processing pending streams");
130
131 for row in rows {
132 process_one(client, pool, config, row).await;
133 }
134}
135
136async fn process_one(client: &reqwest::Client, pool: &PgPool, config: &RdioConfig, row: StreamRow) {
142 let Some(audio_mp3) = row.audio_mp3.clone() else {
146 tracing::warn!(
147 id = row.id,
148 "upload: pending row has no audio_mp3 — marking failed"
149 );
150 if let Err(e) = uploads::mark_failed(pool, row.id, "no audio_mp3").await {
151 tracing::warn!(id = row.id, error = %e, "upload: failed to mark row failed");
152 }
153 return;
154 };
155
156 let system_id = compute_system_id(&row.reflector, &row.protocol);
158 let protocol_label = protocol_label(&row.protocol);
159 let system_label = format!("{} ({})", row.reflector, protocol_label);
160 let talkgroup = module_to_talkgroup(Some(row.module.as_str()));
161 let talkgroup_label = format!("Module {}", row.module);
162 let talker_alias = make_talker_alias(&row.callsign, row.suffix.as_deref());
163 let audio_name = make_audio_name(row.started_at, &row.reflector, &row.module, &row.callsign);
164 let date_time = row.started_at.timestamp();
165
166 let fields = UploadFields {
167 api_key: &config.api_key,
168 system: &system_id,
169 system_label: &system_label,
170 talkgroup: &talkgroup,
171 talkgroup_label: &talkgroup_label,
172 source: &row.callsign,
173 talker_alias: &talker_alias,
174 talkgroup_tag: row.dstar_text.as_deref(),
175 date_time,
176 audio_name: &audio_name,
177 audio_mp3,
178 };
179
180 match rdio::upload_stream(client, &config.endpoint, fields).await {
181 Ok(()) => {
182 tracing::info!(
183 id = row.id,
184 reflector = %row.reflector,
185 module = %row.module,
186 callsign = %row.callsign,
187 "upload: succeeded"
188 );
189 if let Err(e) = uploads::mark_uploaded(pool, row.id).await {
190 tracing::warn!(id = row.id, error = %e, "upload: failed to mark row uploaded");
191 }
192 }
193 Err(e) => {
194 handle_upload_failure(pool, config, &row, &e).await;
195 }
196 }
197}
198
199async fn handle_upload_failure(
202 pool: &PgPool,
203 config: &RdioConfig,
204 row: &StreamRow,
205 error: &UploadError,
206) {
207 let attempts_so_far = row.upload_attempts.unwrap_or(0);
210 let max = i32::try_from(config.max_retries).unwrap_or(i32::MAX);
211
212 let attempts_after = attempts_so_far.saturating_add(1);
215 let error_msg = error.to_string();
216
217 if attempts_after >= max {
218 tracing::warn!(
219 id = row.id,
220 attempts = attempts_after,
221 max_retries = config.max_retries,
222 error = %error_msg,
223 "upload: max retries exhausted — marking failed"
224 );
225 if let Err(e) = uploads::mark_failed(pool, row.id, &error_msg).await {
226 tracing::warn!(id = row.id, error = %e, "upload: failed to mark row failed");
227 }
228 } else {
229 tracing::warn!(
230 id = row.id,
231 attempts = attempts_after,
232 max_retries = config.max_retries,
233 error = %error_msg,
234 "upload: failed — will retry"
235 );
236 if let Err(e) = uploads::increment_attempts(pool, row.id, &error_msg).await {
237 tracing::warn!(id = row.id, error = %e, "upload: failed to increment attempts");
238 }
239 }
240}
241
242fn compute_system_id(reflector: &str, protocol: &str) -> String {
257 let suffix = numeric_suffix(reflector).unwrap_or(0);
258 let base = match (protocol, reflector_prefix(reflector)) {
259 ("dplus", _) => 10_000,
260 ("dextra", Some("XRF")) => 40_000,
261 ("dextra", _) => 20_000,
262 ("dcs", _) => 30_000,
263 _ => 0,
264 };
265 (base + u32::from(suffix)).to_string()
266}
267
268fn reflector_prefix(reflector: &str) -> Option<&str> {
274 let prefix = reflector.get(..3)?;
275 if prefix.bytes().all(|b| b.is_ascii_uppercase()) {
276 Some(prefix)
277 } else {
278 None
279 }
280}
281
282fn numeric_suffix(reflector: &str) -> Option<u16> {
288 let tail: String = reflector
289 .chars()
290 .rev()
291 .take_while(char::is_ascii_digit)
292 .collect::<String>()
293 .chars()
294 .rev()
295 .collect();
296 tail.parse().ok()
297}
298
299fn protocol_label(protocol: &str) -> String {
305 match protocol {
306 "dplus" => "DPlus".to_owned(),
307 "dextra" => "DExtra".to_owned(),
308 "dcs" => "DCS".to_owned(),
309 other => other.to_owned(),
310 }
311}
312
313fn module_to_talkgroup(module: Option<&str>) -> String {
320 let Some(m) = module else {
321 return "0".to_owned();
322 };
323 let mut bytes = m.bytes();
324 let first = bytes.next();
325 let rest_empty = bytes.next().is_none();
326 match (first, rest_empty) {
327 (Some(c), true) if c.is_ascii_uppercase() => (c - b'A' + 1).to_string(),
328 _ => "0".to_owned(),
329 }
330}
331
332fn make_talker_alias(callsign: &str, suffix: Option<&str>) -> String {
338 let trimmed = suffix.map_or("", str::trim);
339 if trimmed.is_empty() {
340 callsign.to_owned()
341 } else {
342 format!("{callsign} / {trimmed}")
343 }
344}
345
346fn make_audio_name(
358 started_at: DateTime<Utc>,
359 reflector: &str,
360 module: &str,
361 callsign: &str,
362) -> String {
363 let stamp = started_at.format("%Y%m%d_%H%M%S");
364 let callsign = callsign.trim();
365 format!("{stamp}_{reflector}_{module}_{callsign}.mp3")
366}
367
368#[cfg(test)]
369mod tests {
370 use chrono::TimeZone;
371
372 use super::{
373 compute_system_id, make_audio_name, make_talker_alias, module_to_talkgroup, numeric_suffix,
374 protocol_label, reflector_prefix,
375 };
376
377 #[test]
378 fn compute_system_id_dplus_ref_prefix() {
379 assert_eq!(compute_system_id("REF030", "dplus"), "10030");
380 assert_eq!(compute_system_id("REF001", "dplus"), "10001");
381 assert_eq!(compute_system_id("REF999", "dplus"), "10999");
382 }
383
384 #[test]
385 fn compute_system_id_dextra_xlx_prefix() {
386 assert_eq!(compute_system_id("XLX030", "dextra"), "20030");
387 assert_eq!(compute_system_id("XLX001", "dextra"), "20001");
388 }
389
390 #[test]
391 fn compute_system_id_dcs_prefix() {
392 assert_eq!(compute_system_id("DCS030", "dcs"), "30030");
393 assert_eq!(compute_system_id("DCS001", "dcs"), "30001");
394 }
395
396 #[test]
397 fn compute_system_id_dextra_xrf_prefix() {
398 assert_eq!(compute_system_id("XRF030", "dextra"), "40030");
399 assert_eq!(compute_system_id("XRF123", "dextra"), "40123");
400 }
401
402 #[test]
403 fn compute_system_id_unknown_protocol_uses_zero_base() {
404 assert_eq!(compute_system_id("REF030", "unknown"), "30");
407 }
408
409 #[test]
410 fn compute_system_id_missing_suffix_returns_base() {
411 assert_eq!(compute_system_id("REFXYZ", "dplus"), "10000");
413 }
414
415 #[test]
416 fn module_to_talkgroup_covers_all_uppercase_letters() {
417 for (i, letter) in ('A'..='Z').enumerate() {
419 let input = letter.to_string();
420 let expected = (i + 1).to_string();
421 assert_eq!(
422 module_to_talkgroup(Some(input.as_str())),
423 expected,
424 "module {letter} must map to {expected}"
425 );
426 }
427 }
428
429 #[test]
430 fn module_to_talkgroup_none_returns_zero() {
431 assert_eq!(module_to_talkgroup(None), "0");
432 }
433
434 #[test]
435 fn module_to_talkgroup_rejects_invalid_inputs() {
436 assert_eq!(module_to_talkgroup(Some("")), "0");
438 assert_eq!(module_to_talkgroup(Some("a")), "0");
439 assert_eq!(module_to_talkgroup(Some("AA")), "0");
440 assert_eq!(module_to_talkgroup(Some("A ")), "0");
441 assert_eq!(module_to_talkgroup(Some("1")), "0");
442 }
443
444 #[test]
445 fn make_audio_name_formats_timestamp_correctly() -> Result<(), Box<dyn std::error::Error>> {
446 let ts = chrono::Utc
447 .with_ymd_and_hms(2026, 4, 12, 14, 30, 0)
448 .single()
449 .ok_or("fixed timestamp must be unambiguous")?;
450 let name = make_audio_name(ts, "REF030", "C", "W1AW");
451 assert_eq!(name, "20260412_143000_REF030_C_W1AW.mp3");
452 Ok(())
453 }
454
455 #[test]
456 fn make_audio_name_trims_callsign_padding() -> Result<(), Box<dyn std::error::Error>> {
457 let ts = chrono::Utc
460 .with_ymd_and_hms(2026, 1, 1, 0, 0, 0)
461 .single()
462 .ok_or("fixed timestamp must be unambiguous")?;
463 let name = make_audio_name(ts, "DCS030", "B", "W1AW ");
464 assert_eq!(name, "20260101_000000_DCS030_B_W1AW.mp3");
465 Ok(())
466 }
467
468 #[test]
469 fn make_talker_alias_appends_suffix() {
470 assert_eq!(make_talker_alias("W1AW", Some("D75")), "W1AW / D75");
471 }
472
473 #[test]
474 fn make_talker_alias_trims_suffix() {
475 assert_eq!(make_talker_alias("W1AW", Some("D75 ")), "W1AW / D75");
478 }
479
480 #[test]
481 fn make_talker_alias_skips_empty_suffix() {
482 assert_eq!(make_talker_alias("W1AW", Some(" ")), "W1AW");
483 assert_eq!(make_talker_alias("W1AW", Some("")), "W1AW");
484 assert_eq!(make_talker_alias("W1AW", None), "W1AW");
485 }
486
487 #[test]
488 fn protocol_label_known_protocols_capitalise() {
489 assert_eq!(protocol_label("dplus"), "DPlus");
490 assert_eq!(protocol_label("dextra"), "DExtra");
491 assert_eq!(protocol_label("dcs"), "DCS");
492 }
493
494 #[test]
495 fn protocol_label_unknown_passes_through() {
496 assert_eq!(protocol_label("weird"), "weird");
497 }
498
499 #[test]
500 fn numeric_suffix_extracts_trailing_digits() {
501 assert_eq!(numeric_suffix("REF030"), Some(30));
502 assert_eq!(numeric_suffix("DCS001"), Some(1));
503 assert_eq!(numeric_suffix("XRF999"), Some(999));
504 }
505
506 #[test]
507 fn numeric_suffix_handles_no_digits() {
508 assert_eq!(numeric_suffix("REFXYZ"), None);
509 assert_eq!(numeric_suffix(""), None);
510 }
511
512 #[test]
513 fn reflector_prefix_returns_three_uppercase_letters() {
514 assert_eq!(reflector_prefix("REF030"), Some("REF"));
515 assert_eq!(reflector_prefix("XRF030"), Some("XRF"));
516 assert_eq!(reflector_prefix("XLX030"), Some("XLX"));
517 assert_eq!(reflector_prefix("DCS030"), Some("DCS"));
518 }
519
520 #[test]
521 fn reflector_prefix_rejects_short_or_non_ascii() {
522 assert_eq!(reflector_prefix("RE"), None);
523 assert_eq!(reflector_prefix("ref030"), None);
524 }
525}