1use std::time::Duration;
28
29use axum::extract::{Path, Query, State};
30use axum::http::StatusCode;
31use axum::response::{IntoResponse, Json};
32use chrono::{DateTime, Utc};
33use serde::{Deserialize, Serialize};
34use sqlx::PgPool;
35
36use crate::db;
37use crate::db::activity::ActivityRow;
38use crate::db::connected_nodes::ConnectedNodeRow;
39use crate::db::reflectors::ReflectorRow;
40use crate::db::streams::{StreamRow, StreamStatusCounts};
41
/// Look-back window used by `list_reflectors` when `since` is absent or
/// cannot be parsed: 30 minutes.
const DEFAULT_REFLECTOR_WINDOW: Duration = Duration::from_secs(30 * 60);

/// Look-back window used by `reflector_activity` and `list_activity` when
/// `since` is absent or cannot be parsed: 6 hours.
const DEFAULT_ACTIVITY_WINDOW: Duration = Duration::from_secs(6 * 60 * 60);

/// Look-back window used by `list_streams` when `since` is absent or cannot
/// be parsed: 1 hour.
const DEFAULT_STREAM_WINDOW: Duration = Duration::from_secs(60 * 60);

/// Row limit applied by `list_streams` when `limit` is missing or non-positive.
const DEFAULT_STREAM_LIMIT: i64 = 50;

/// Row limit applied by `upload_queue` when `limit` is missing or non-positive.
const DEFAULT_UPLOAD_LIMIT: i64 = 50;

/// Row limit applied by `list_activity` when `limit` is missing or non-positive.
const DEFAULT_ACTIVITY_LIMIT: i64 = 50;
66
/// Query-string parameters accepted by `list_reflectors`.
#[derive(Debug, Default, Deserialize)]
pub(crate) struct ReflectorQuery {
    /// Look-back window in `<number><unit>` form (`s`, `m`, `h`, `d`), e.g. `30m`.
    ///
    /// A missing or unparseable value falls back to `DEFAULT_REFLECTOR_WINDOW`.
    pub(crate) since: Option<String>,
}
77
/// Query-string parameters accepted by `reflector_activity`.
#[derive(Debug, Default, Deserialize)]
pub(crate) struct ActivityQuery {
    /// Look-back window in `<number><unit>` form (`s`, `m`, `h`, `d`), e.g. `6h`.
    ///
    /// A missing or unparseable value falls back to `DEFAULT_ACTIVITY_WINDOW`.
    pub(crate) since: Option<String>,
}
84
/// Query-string parameters accepted by `list_streams`.
#[derive(Debug, Default, Deserialize)]
pub(crate) struct StreamQuery {
    /// Look-back window in `<number><unit>` form (`s`, `m`, `h`, `d`), e.g. `1h`.
    ///
    /// A missing or unparseable value falls back to `DEFAULT_STREAM_WINDOW`.
    pub(crate) since: Option<String>,

    /// Optional reflector filter, forwarded unchanged to `db::streams::query`.
    // NOTE(review): the matching rule (exact, case-sensitive, ...) lives in the
    // db layer and is not visible here; confirm before documenting it further.
    pub(crate) reflector: Option<String>,

    /// Maximum number of rows to return.
    ///
    /// Missing or non-positive values become `DEFAULT_STREAM_LIMIT`; larger
    /// values are capped at `MAX_LIMIT`.
    pub(crate) limit: Option<i64>,
}
98
/// Query-string parameters accepted by `upload_queue`.
#[derive(Debug, Default, Deserialize)]
pub(crate) struct UploadQueueQuery {
    /// Maximum number of rows to return.
    ///
    /// Missing or non-positive values become `DEFAULT_UPLOAD_LIMIT`; larger
    /// values are capped at `MAX_LIMIT`.
    pub(crate) limit: Option<i64>,
}
105
/// Query-string parameters accepted by `list_activity`.
#[derive(Debug, Default, Deserialize)]
pub(crate) struct GlobalActivityQuery {
    /// Look-back window in `<number><unit>` form (`s`, `m`, `h`, `d`), e.g. `6h`.
    ///
    /// A missing or unparseable value falls back to `DEFAULT_ACTIVITY_WINDOW`.
    pub(crate) since: Option<String>,

    /// Maximum number of rows to return.
    ///
    /// Missing or non-positive values become `DEFAULT_ACTIVITY_LIMIT`; larger
    /// values are capped at `MAX_LIMIT`.
    pub(crate) limit: Option<i64>,
}
115
/// JSON body returned by the `health` handler.
#[derive(Debug, Serialize)]
pub(crate) struct HealthResponse {
    /// Static status marker; `health` always sets it to `"ok"`.
    pub(crate) status: &'static str,
}
127
/// JSON body returned by the `metrics` handler.
///
/// The stream counters are copied one-to-one from `StreamStatusCounts`.
#[derive(Debug, Serialize)]
pub(crate) struct MetricsResponse {
    /// Value returned by `db::reflectors::count_total`.
    pub(crate) reflectors_count: i64,

    /// `StreamStatusCounts::total`.
    pub(crate) streams_total: i64,

    /// `StreamStatusCounts::pending`.
    pub(crate) streams_pending: i64,

    /// `StreamStatusCounts::uploaded`.
    pub(crate) streams_uploaded: i64,

    /// `StreamStatusCounts::failed`.
    pub(crate) streams_failed: i64,
}
150
/// JSON representation of a reflector, built from a `ReflectorRow`.
///
/// The row's `ip_address` column has no counterpart here and is therefore
/// never serialized.
#[derive(Debug, Serialize)]
pub(crate) struct ReflectorView {
    /// Copied from `ReflectorRow::callsign`.
    pub(crate) callsign: String,

    /// Copied from `ReflectorRow::protocol`.
    pub(crate) protocol: String,

    /// Copied from `ReflectorRow::last_seen`; `None` when the row has no value.
    pub(crate) last_seen: Option<DateTime<Utc>>,

    /// Copied from `ReflectorRow::country`.
    pub(crate) country: Option<String>,

    /// Copied from `ReflectorRow::dashboard_url`.
    pub(crate) dashboard_url: Option<String>,

    /// `ReflectorRow::tier2_available`, with a missing value reported as `false`.
    pub(crate) tier2_available: bool,

    /// Copied from `ReflectorRow::created_at`.
    pub(crate) created_at: Option<DateTime<Utc>>,
}
178
179impl From<ReflectorRow> for ReflectorView {
180 fn from(row: ReflectorRow) -> Self {
181 Self {
182 callsign: row.callsign,
183 protocol: row.protocol,
184 last_seen: row.last_seen,
185 country: row.country,
186 dashboard_url: row.dashboard_url,
187 tier2_available: row.tier2_available.unwrap_or(false),
188 created_at: row.created_at,
189 }
190 }
191}
192
/// JSON representation of one activity observation, built from an `ActivityRow`.
#[derive(Debug, Serialize)]
pub(crate) struct ActivityView {
    /// Copied from `ActivityRow::id`.
    pub(crate) id: i64,

    /// Copied from `ActivityRow::reflector`.
    pub(crate) reflector: String,

    /// Copied from `ActivityRow::module`; `None` when the row has no value.
    pub(crate) module: Option<String>,

    /// Copied from `ActivityRow::callsign`.
    pub(crate) callsign: String,

    /// Copied from `ActivityRow::source`.
    pub(crate) source: String,

    /// Copied from `ActivityRow::observed_at`.
    pub(crate) observed_at: DateTime<Utc>,
}
216
217impl From<ActivityRow> for ActivityView {
218 fn from(row: ActivityRow) -> Self {
219 Self {
220 id: row.id,
221 reflector: row.reflector,
222 module: row.module,
223 callsign: row.callsign,
224 source: row.source,
225 observed_at: row.observed_at,
226 }
227 }
228}
229
/// JSON representation of a captured stream, built from a `StreamRow`.
///
/// The view has no `audio_mp3` or `upload_attempts` field, so those row
/// columns are never serialized. Every field below is copied unchanged from
/// the `StreamRow` field of the same name.
#[derive(Debug, Serialize)]
pub(crate) struct StreamView {
    /// Row identifier.
    pub(crate) id: i64,

    /// Reflector the stream belongs to.
    pub(crate) reflector: String,

    /// Reflector module.
    pub(crate) module: String,

    /// Protocol name as stored in the row.
    pub(crate) protocol: String,

    /// Stream identifier as stored in the row (distinct from `id`).
    pub(crate) stream_id: i32,

    /// Callsign stored in the row.
    pub(crate) callsign: String,

    /// Optional callsign suffix.
    pub(crate) suffix: Option<String>,

    /// Optional `ur_call` value.
    // NOTE(review): presumably the D-STAR "UR" destination call; confirm.
    pub(crate) ur_call: Option<String>,

    /// Optional `dstar_text` value.
    pub(crate) dstar_text: Option<String>,

    /// Optional `dprs_lat` value.
    // NOTE(review): presumably latitude in decimal degrees; confirm.
    pub(crate) dprs_lat: Option<f64>,

    /// Optional `dprs_lon` value.
    // NOTE(review): presumably longitude in decimal degrees; confirm.
    pub(crate) dprs_lon: Option<f64>,

    /// Start timestamp.
    pub(crate) started_at: DateTime<Utc>,

    /// End timestamp, if the row has one.
    pub(crate) ended_at: Option<DateTime<Utc>>,

    /// Frame count, if the row has one.
    pub(crate) frame_count: Option<i32>,

    /// Upload status string, if the row has one.
    pub(crate) upload_status: Option<String>,

    /// Last upload error message, if the row has one.
    pub(crate) last_upload_error: Option<String>,

    /// Upload timestamp, if the row has one.
    pub(crate) uploaded_at: Option<DateTime<Utc>>,

    /// Row creation timestamp, if the row has one.
    pub(crate) created_at: Option<DateTime<Utc>>,
}
298
299impl From<StreamRow> for StreamView {
300 fn from(row: StreamRow) -> Self {
301 Self {
302 id: row.id,
303 reflector: row.reflector,
304 module: row.module,
305 protocol: row.protocol,
306 stream_id: row.stream_id,
307 callsign: row.callsign,
308 suffix: row.suffix,
309 ur_call: row.ur_call,
310 dstar_text: row.dstar_text,
311 dprs_lat: row.dprs_lat,
312 dprs_lon: row.dprs_lon,
313 started_at: row.started_at,
314 ended_at: row.ended_at,
315 frame_count: row.frame_count,
316 upload_status: row.upload_status,
317 last_upload_error: row.last_upload_error,
318 uploaded_at: row.uploaded_at,
319 created_at: row.created_at,
320 }
321 }
322}
323
/// JSON representation of a node connected to a reflector, built from a
/// `ConnectedNodeRow`.
///
/// Every field is copied unchanged from the row field of the same name.
#[derive(Debug, Serialize)]
pub(crate) struct ConnectedNodeView {
    /// Reflector the node is attached to.
    pub(crate) reflector: String,

    /// Callsign of the connected node.
    pub(crate) node_callsign: String,

    /// Module value, if the row has one.
    pub(crate) module: Option<String>,

    /// Protocol value, if the row has one.
    pub(crate) protocol: Option<String>,

    /// `connected_since` timestamp, if the row has one.
    pub(crate) connected_since: Option<DateTime<Utc>>,

    /// `last_heard` timestamp, if the row has one.
    pub(crate) last_heard: Option<DateTime<Utc>>,
}
348
349impl From<ConnectedNodeRow> for ConnectedNodeView {
350 fn from(row: ConnectedNodeRow) -> Self {
351 Self {
352 reflector: row.reflector,
353 node_callsign: row.node_callsign,
354 module: row.module,
355 protocol: row.protocol,
356 connected_since: row.connected_since,
357 last_heard: row.last_heard,
358 }
359 }
360}
361
/// Parses a compact duration of the form `<number><unit>`.
///
/// Supported units are `s` (seconds), `m` (minutes), `h` (hours) and `d`
/// (days). Surrounding whitespace is ignored; whitespace between the number
/// and the unit is not accepted. The number is parsed as a `u64`, so a sign
/// of `-` is rejected (a leading `+` is accepted by `u64`'s parser).
///
/// Returns `None` when the input is empty, has no number, uses an unknown
/// unit, or when converting to seconds would overflow `u64`.
///
/// This function never panics. The unit is removed as a whole `char`, so
/// input ending in a multi-byte UTF-8 character (for example `5é`) is
/// rejected instead of being split in the middle of a code point.
fn parse_duration_string(s: &str) -> Option<Duration> {
    let mut chars = s.trim().chars();
    // `next_back` yields `None` for an empty string and always consumes a
    // complete character, which keeps the remainder on a valid boundary.
    let unit = chars.next_back()?;
    let digits = chars.as_str();
    if digits.is_empty() {
        return None;
    }
    let n: u64 = digits.parse().ok()?;
    let secs = match unit {
        's' => n,
        'm' => n.checked_mul(60)?,
        'h' => n.checked_mul(3_600)?,
        'd' => n.checked_mul(86_400)?,
        _ => return None,
    };
    Some(Duration::from_secs(secs))
}
395
396fn parse_since(since: Option<&str>, default: Duration) -> DateTime<Utc> {
400 let window = since.and_then(parse_duration_string).unwrap_or(default);
401 let window_chrono =
404 chrono::Duration::from_std(window).unwrap_or_else(|_| chrono::Duration::seconds(0));
405 Utc::now() - window_chrono
406}
407
/// Normalizes a caller-supplied row limit.
///
/// * `provided` - the raw value from the query string, if any.
/// * `default` - used when `provided` is `None`, and returned as-is when the
///   effective request is zero or negative.
/// * `max` - upper bound applied to any positive request.
///
/// Returns the positive request capped at `max`, or `default` for a
/// non-positive request.
fn clamp_limit(provided: Option<i64>, default: i64, max: i64) -> i64 {
    match provided.unwrap_or(default) {
        positive if positive > 0 => positive.min(max),
        _ => default,
    }
}
421
/// Upper bound passed to `clamp_limit` by every handler that accepts a
/// `limit` query parameter; larger requests are capped at this value.
const MAX_LIMIT: i64 = 500;
424
/// Liveness handler: always responds with `{"status": "ok"}`.
///
/// It takes no extractors and performs no database access, so it cannot fail.
pub(crate) async fn health() -> Json<HealthResponse> {
    Json(HealthResponse { status: "ok" })
}
437
438pub(crate) async fn metrics(
445 State(pool): State<PgPool>,
446) -> Result<Json<MetricsResponse>, StatusCode> {
447 let reflectors_count = db::reflectors::count_total(&pool).await.map_err(|e| {
448 tracing::warn!(error = %e, "metrics: failed to count reflectors");
449 StatusCode::INTERNAL_SERVER_ERROR
450 })?;
451
452 let StreamStatusCounts {
453 total,
454 pending,
455 uploaded,
456 failed,
457 } = db::streams::count_by_status(&pool).await.map_err(|e| {
458 tracing::warn!(error = %e, "metrics: failed to count streams");
459 StatusCode::INTERNAL_SERVER_ERROR
460 })?;
461
462 Ok(Json(MetricsResponse {
463 reflectors_count,
464 streams_total: total,
465 streams_pending: pending,
466 streams_uploaded: uploaded,
467 streams_failed: failed,
468 }))
469}
470
471pub(crate) async fn list_reflectors(
476 State(pool): State<PgPool>,
477 Query(params): Query<ReflectorQuery>,
478) -> Result<Json<Vec<ReflectorView>>, StatusCode> {
479 let since = parse_since(params.since.as_deref(), DEFAULT_REFLECTOR_WINDOW);
480 let rows = db::reflectors::get_active(&pool, since)
481 .await
482 .map_err(|e| {
483 tracing::warn!(error = %e, "list_reflectors: query failed");
484 StatusCode::INTERNAL_SERVER_ERROR
485 })?;
486 Ok(Json(rows.into_iter().map(ReflectorView::from).collect()))
487}
488
489pub(crate) async fn reflector_activity(
494 State(pool): State<PgPool>,
495 Path(callsign): Path<String>,
496 Query(params): Query<ActivityQuery>,
497) -> Result<Json<Vec<ActivityView>>, StatusCode> {
498 let since = parse_since(params.since.as_deref(), DEFAULT_ACTIVITY_WINDOW);
499 let rows = db::activity::get_for_reflector(&pool, &callsign, since)
500 .await
501 .map_err(|e| {
502 tracing::warn!(error = %e, callsign = %callsign, "reflector_activity: query failed");
503 StatusCode::INTERNAL_SERVER_ERROR
504 })?;
505 Ok(Json(rows.into_iter().map(ActivityView::from).collect()))
506}
507
508pub(crate) async fn list_streams(
513 State(pool): State<PgPool>,
514 Query(params): Query<StreamQuery>,
515) -> Result<Json<Vec<StreamView>>, StatusCode> {
516 let since = parse_since(params.since.as_deref(), DEFAULT_STREAM_WINDOW);
517 let limit = clamp_limit(params.limit, DEFAULT_STREAM_LIMIT, MAX_LIMIT);
518 let reflector_filter = params.reflector.as_deref();
519 let rows = db::streams::query(&pool, reflector_filter, since, limit)
520 .await
521 .map_err(|e| {
522 tracing::warn!(error = %e, "list_streams: query failed");
523 StatusCode::INTERNAL_SERVER_ERROR
524 })?;
525 Ok(Json(rows.into_iter().map(StreamView::from).collect()))
526}
527
528pub(crate) async fn upload_queue(
534 State(pool): State<PgPool>,
535 Query(params): Query<UploadQueueQuery>,
536) -> Result<Json<Vec<StreamView>>, StatusCode> {
537 let limit = clamp_limit(params.limit, DEFAULT_UPLOAD_LIMIT, MAX_LIMIT);
538 let rows = db::uploads::get_pending(&pool, limit).await.map_err(|e| {
539 tracing::warn!(error = %e, "upload_queue: query failed");
540 StatusCode::INTERNAL_SERVER_ERROR
541 })?;
542 Ok(Json(rows.into_iter().map(StreamView::from).collect()))
543}
544
545pub(crate) async fn reflector_nodes(
551 State(pool): State<PgPool>,
552 Path(callsign): Path<String>,
553) -> Result<Json<Vec<ConnectedNodeView>>, StatusCode> {
554 let rows = db::connected_nodes::get_for_reflector(&pool, &callsign)
555 .await
556 .map_err(|e| {
557 tracing::warn!(error = %e, callsign = %callsign, "reflector_nodes: query failed");
558 StatusCode::INTERNAL_SERVER_ERROR
559 })?;
560 Ok(Json(
561 rows.into_iter().map(ConnectedNodeView::from).collect(),
562 ))
563}
564
565pub(crate) async fn list_activity(
572 State(pool): State<PgPool>,
573 Query(params): Query<GlobalActivityQuery>,
574) -> Result<Json<Vec<ActivityView>>, StatusCode> {
575 let since = parse_since(params.since.as_deref(), DEFAULT_ACTIVITY_WINDOW);
576 let limit = clamp_limit(params.limit, DEFAULT_ACTIVITY_LIMIT, MAX_LIMIT);
577 let rows = db::activity::get_recent(&pool, since, limit)
578 .await
579 .map_err(|e| {
580 tracing::warn!(error = %e, "list_activity: query failed");
581 StatusCode::INTERNAL_SERVER_ERROR
582 })?;
583 Ok(Json(rows.into_iter().map(ActivityView::from).collect()))
584}
585
/// Placeholder handler for the tier-3 connect endpoint.
///
/// Takes no extractors and always responds with `501 Not Implemented`.
pub(crate) async fn tier3_connect() -> impl IntoResponse {
    StatusCode::NOT_IMPLEMENTED
}
594
/// Placeholder handler for the tier-3 disconnect endpoint.
///
/// Extracts the two path segments (callsign, module) but ignores them, and
/// always responds with `501 Not Implemented`.
pub(crate) async fn tier3_disconnect(
    Path((_callsign, _module)): Path<(String, String)>,
) -> impl IntoResponse {
    StatusCode::NOT_IMPLEMENTED
}
605
/// Unit tests for the pure helpers and the row-to-view conversions.
///
/// Nothing here touches a database; the only handler exercised is `health`.
#[cfg(test)]
mod tests {
    use super::{
        ActivityView, MAX_LIMIT, ReflectorView, StreamView, clamp_limit, health,
        parse_duration_string, parse_since,
    };
    use crate::db::activity::ActivityRow;
    use crate::db::reflectors::ReflectorRow;
    use crate::db::streams::StreamRow;
    use chrono::{TimeZone, Utc};
    use std::time::Duration;

    /// The `s` unit maps one-to-one onto seconds.
    #[test]
    fn parse_duration_seconds() {
        assert_eq!(parse_duration_string("30s"), Some(Duration::from_secs(30)));
        assert_eq!(parse_duration_string("1s"), Some(Duration::from_secs(1)));
    }

    /// The `m` unit multiplies by 60.
    #[test]
    fn parse_duration_minutes() {
        assert_eq!(parse_duration_string("1m"), Some(Duration::from_secs(60)));
        assert_eq!(
            parse_duration_string("90m"),
            Some(Duration::from_secs(5_400))
        );
    }

    /// The `h` unit multiplies by 3600.
    #[test]
    fn parse_duration_hours() {
        assert_eq!(
            parse_duration_string("1h"),
            Some(Duration::from_secs(3_600))
        );
        assert_eq!(
            parse_duration_string("24h"),
            Some(Duration::from_secs(86_400))
        );
    }

    /// The `d` unit multiplies by 86400.
    #[test]
    fn parse_duration_days() {
        assert_eq!(
            parse_duration_string("1d"),
            Some(Duration::from_secs(86_400))
        );
        assert_eq!(
            parse_duration_string("7d"),
            Some(Duration::from_secs(604_800))
        );
    }

    /// Inputs without a unit, without a number, with an unknown unit, with a
    /// sign, or with inner whitespace are all rejected.
    #[test]
    fn parse_duration_rejects_malformed() {
        // Number without a unit.
        assert_eq!(parse_duration_string("30"), None);
        // Unknown unit.
        assert_eq!(parse_duration_string("5x"), None);
        // Unit without a number.
        assert_eq!(parse_duration_string("h"), None);
        // Empty input.
        assert_eq!(parse_duration_string(""), None);
        // Negative numbers do not parse as `u64`.
        assert_eq!(parse_duration_string("-1h"), None);
        // Whitespace between number and unit.
        assert_eq!(parse_duration_string("1 h"), None);
    }

    /// A value whose conversion to seconds overflows `u64` yields `None`.
    #[test]
    fn parse_duration_handles_overflow() {
        assert_eq!(parse_duration_string("999999999999999999d"), None);
    }

    /// With no `since`, the cutoff is "now" minus the default window.
    #[test]
    fn parse_since_defaults_when_missing() {
        let t = parse_since(None, Duration::from_secs(3600));
        // The range is a tolerance for the time that passes between the two
        // `Utc::now()` calls.
        let delta = Utc::now().signed_duration_since(t);
        assert!(
            (3590..=3605).contains(&delta.num_seconds()),
            "expected ~3600s ago, got {}s",
            delta.num_seconds()
        );
    }

    /// An unparseable `since` falls back to the default window.
    #[test]
    fn parse_since_uses_default_on_bad_input() {
        let t = parse_since(Some("bogus"), Duration::from_secs(600));
        let delta = Utc::now().signed_duration_since(t);
        assert!(
            (595..=605).contains(&delta.num_seconds()),
            "expected ~600s ago, got {}s",
            delta.num_seconds()
        );
    }

    /// A valid `since` takes precedence over the default window.
    #[test]
    fn parse_since_honors_valid_input() {
        let t = parse_since(Some("2h"), Duration::from_secs(60));
        let delta = Utc::now().signed_duration_since(t);
        assert!(
            (7190..=7210).contains(&delta.num_seconds()),
            "expected ~7200s ago, got {}s",
            delta.num_seconds()
        );
    }

    /// A missing limit yields the default.
    #[test]
    fn clamp_limit_applies_default_for_none() {
        assert_eq!(clamp_limit(None, 50, MAX_LIMIT), 50);
    }

    /// Zero and negative limits yield the default.
    #[test]
    fn clamp_limit_applies_default_for_nonpositive() {
        assert_eq!(clamp_limit(Some(0), 50, MAX_LIMIT), 50);
        assert_eq!(clamp_limit(Some(-1), 50, MAX_LIMIT), 50);
        assert_eq!(clamp_limit(Some(-9_999), 50, MAX_LIMIT), 50);
    }

    /// Limits above the maximum are capped at the maximum.
    #[test]
    fn clamp_limit_caps_at_max() {
        assert_eq!(clamp_limit(Some(9_999), 50, MAX_LIMIT), MAX_LIMIT);
        assert_eq!(clamp_limit(Some(MAX_LIMIT + 1), 50, MAX_LIMIT), MAX_LIMIT);
    }

    /// Positive limits up to and including the maximum pass through unchanged.
    #[test]
    fn clamp_limit_passes_valid_values() {
        assert_eq!(clamp_limit(Some(1), 50, MAX_LIMIT), 1);
        assert_eq!(clamp_limit(Some(100), 50, MAX_LIMIT), 100);
        assert_eq!(clamp_limit(Some(MAX_LIMIT), 50, MAX_LIMIT), MAX_LIMIT);
    }

    /// The health handler reports the literal status `"ok"`.
    #[tokio::test]
    async fn health_returns_ok_literal() {
        let resp = health().await;
        assert_eq!(resp.0.status, "ok");
    }

    /// Every field exposed by `ReflectorView` is carried over from the row.
    #[test]
    fn reflector_view_from_row_maps_all_fields() -> Result<(), Box<dyn std::error::Error>> {
        let ts = Utc
            .with_ymd_and_hms(2026, 4, 12, 10, 0, 0)
            .single()
            .ok_or("fixed timestamp must be unambiguous")?;
        let row = ReflectorRow {
            callsign: "REF030".to_owned(),
            protocol: "dplus".to_owned(),
            ip_address: Some("1.2.3.4".to_owned()),
            dashboard_url: Some("https://example.invalid".to_owned()),
            country: Some("US".to_owned()),
            last_seen: Some(ts),
            tier2_available: Some(true),
            created_at: Some(ts),
        };
        let view = ReflectorView::from(row);
        assert_eq!(view.callsign, "REF030");
        assert_eq!(view.protocol, "dplus");
        assert_eq!(view.country.as_deref(), Some("US"));
        assert_eq!(
            view.dashboard_url.as_deref(),
            Some("https://example.invalid")
        );
        assert_eq!(view.last_seen, Some(ts));
        assert!(view.tier2_available);
        assert_eq!(view.created_at, Some(ts));
        Ok(())
    }

    /// A row without a `tier2_available` value is reported as `false`.
    #[test]
    fn reflector_view_tier2_available_defaults_false() {
        let row = ReflectorRow {
            callsign: "DCS030".to_owned(),
            protocol: "dcs".to_owned(),
            ip_address: None,
            dashboard_url: None,
            country: None,
            last_seen: None,
            tier2_available: None,
            created_at: None,
        };
        let view = ReflectorView::from(row);
        assert!(!view.tier2_available);
    }

    /// Every `ActivityRow` field arrives unchanged on the view.
    #[test]
    fn activity_view_from_row_preserves_fields() -> Result<(), Box<dyn std::error::Error>> {
        let ts = Utc
            .with_ymd_and_hms(2026, 4, 12, 11, 0, 0)
            .single()
            .ok_or("fixed timestamp must be unambiguous")?;
        let row = ActivityRow {
            id: 42,
            reflector: "XLX030".to_owned(),
            module: Some("C".to_owned()),
            callsign: "W1AW".to_owned(),
            source: "xlx_monitor".to_owned(),
            observed_at: ts,
        };
        let view = ActivityView::from(row);
        assert_eq!(view.id, 42);
        assert_eq!(view.reflector, "XLX030");
        assert_eq!(view.module.as_deref(), Some("C"));
        assert_eq!(view.callsign, "W1AW");
        assert_eq!(view.source, "xlx_monitor");
        assert_eq!(view.observed_at, ts);
        Ok(())
    }

    /// The stream view copies the metadata fields and never serializes the
    /// row's `audio_mp3` bytes.
    #[test]
    fn stream_view_from_row_omits_audio_blob() -> Result<(), Box<dyn std::error::Error>> {
        let ts = Utc
            .with_ymd_and_hms(2026, 4, 12, 12, 0, 0)
            .single()
            .ok_or("fixed timestamp must be unambiguous")?;
        let row = StreamRow {
            id: 7,
            reflector: "REF030".to_owned(),
            module: "C".to_owned(),
            protocol: "dplus".to_owned(),
            stream_id: 1234,
            callsign: "W1AW".to_owned(),
            suffix: Some("D75".to_owned()),
            ur_call: Some("CQCQCQ".to_owned()),
            dstar_text: Some("hello".to_owned()),
            dprs_lat: Some(42.0),
            dprs_lon: Some(-71.0),
            started_at: ts,
            ended_at: Some(ts),
            frame_count: Some(100),
            audio_mp3: Some(vec![0xFF; 1024]),
            upload_status: Some("pending".to_owned()),
            upload_attempts: Some(0),
            last_upload_error: None,
            uploaded_at: None,
            created_at: Some(ts),
        };
        let view = StreamView::from(row);
        assert_eq!(view.id, 7);
        assert_eq!(view.reflector, "REF030");
        assert_eq!(view.module, "C");
        assert_eq!(view.protocol, "dplus");
        assert_eq!(view.stream_id, 1234);
        assert_eq!(view.callsign, "W1AW");
        assert_eq!(view.suffix.as_deref(), Some("D75"));
        assert_eq!(view.ur_call.as_deref(), Some("CQCQCQ"));
        assert_eq!(view.dstar_text.as_deref(), Some("hello"));
        assert_eq!(view.started_at, ts);
        assert_eq!(view.ended_at, Some(ts));
        assert_eq!(view.frame_count, Some(100));
        assert_eq!(view.upload_status.as_deref(), Some("pending"));
        assert_eq!(view.last_upload_error, None);
        assert_eq!(view.uploaded_at, None);
        assert_eq!(view.created_at, Some(ts));

        // Check the serialized form as well: the audio key must be absent and
        // a few representative fields must be present.
        let json = serde_json::to_string(&view)?;
        assert!(
            !json.contains("audio_mp3"),
            "audio_mp3 must not appear in the JSON output: {json}"
        );
        assert!(
            json.contains("\"ur_call\":\"CQCQCQ\""),
            "ur_call must appear in the JSON output: {json}"
        );
        assert!(
            json.contains("\"dprs_lat\":42.0"),
            "dprs_lat must appear in the JSON output: {json}"
        );
        assert!(
            json.contains("\"stream_id\":1234"),
            "stream_id must appear in the JSON output: {json}"
        );
        Ok(())
    }

    /// Records the `{param}` path-parameter syntax expected for routes.
    ///
    /// This only inspects the two string constants declared below; it does not
    /// look at the router itself.
    #[test]
    fn stream_query_route_pattern_matches_axum_syntax() {
        const ACTIVITY_ROUTE: &str = "/api/reflectors/{callsign}/activity";
        const TIER3_ROUTE: &str = "/api/tier3/{callsign}/{module}";
        assert!(ACTIVITY_ROUTE.contains("{callsign}"));
        assert!(TIER3_ROUTE.contains("{callsign}"));
        assert!(TIER3_ROUTE.contains("{module}"));
        assert!(!ACTIVITY_ROUTE.contains(":callsign"));
        assert!(!TIER3_ROUTE.contains(":module"));
    }
}