stargazer/api/
routes.rs

1//! HTTP handler functions for the stargazer operational API.
2//!
3//! Each handler is an `async fn` that takes axum extractors (`State`, `Query`,
4//! `Path`) and returns either a JSON response or a `StatusCode` error. All
5//! database errors are logged at `warn` level and surfaced to the caller as
6//! `500 Internal Server Error` with a generic body — the raw `sqlx::Error`
7//! is never leaked because it may contain connection strings or schema
8//! details that are useful to an attacker.
9//!
10//! # Response types
11//!
12//! The handlers operate on database rows (`ReflectorRow`, `ActivityRow`,
13//! `StreamRow`) but serialize trimmed `*View` structs over the wire. The
14//! view structs intentionally omit columns that are operationally useless
15//! to the HTTP consumer (raw `audio_mp3` blobs, stream ids internal to the
16//! capture pipeline, etc.) and rename fields to the JSON conventions the
17//! operator dashboards expect.
18//!
19//! # Time-window parsing
20//!
21//! Query endpoints accept a `since` parameter of the form `<n><unit>` where
22//! `n` is a positive integer and `unit` is `s`, `m`, `h`, or `d`. The helper
23//! [`parse_since`] turns this into a `DateTime<Utc>` anchored at `Utc::now()`.
24//! A missing or unparseable value falls back to the endpoint-specific
25//! default (documented per handler).
26
27use std::time::Duration;
28
29use axum::extract::{Path, Query, State};
30use axum::http::StatusCode;
31use axum::response::{IntoResponse, Json};
32use chrono::{DateTime, Utc};
33use serde::{Deserialize, Serialize};
34use sqlx::PgPool;
35
36use crate::db;
37use crate::db::activity::ActivityRow;
38use crate::db::connected_nodes::ConnectedNodeRow;
39use crate::db::reflectors::ReflectorRow;
40use crate::db::streams::{StreamRow, StreamStatusCounts};
41
42/// Default time window for `/api/reflectors` when `since` is missing.
43///
44/// Matches the default Tier 2 activity threshold (30 minutes) so the
45/// dashboard's default view mirrors the monitor-pool selection logic.
46const DEFAULT_REFLECTOR_WINDOW: Duration = Duration::from_secs(1_800);
47
48/// Default time window for `/api/reflectors/{callsign}/activity`.
49///
50/// Six hours is a reasonable window for a "recent activity" pane: long
51/// enough to show multi-hour conversations, short enough that the query
52/// hits the composite `idx_activity_log_lookup` index cleanly.
53const DEFAULT_ACTIVITY_WINDOW: Duration = Duration::from_secs(21_600);
54
55/// Default time window for `/api/streams`.
56const DEFAULT_STREAM_WINDOW: Duration = Duration::from_secs(3_600);
57
58/// Default row cap for `/api/streams` when `limit` is missing.
59const DEFAULT_STREAM_LIMIT: i64 = 50;
60
61/// Default row cap for `/api/upload-queue` when `limit` is missing.
62const DEFAULT_UPLOAD_LIMIT: i64 = 50;
63
64/// Default row cap for `/api/activity` when `limit` is missing.
65const DEFAULT_ACTIVITY_LIMIT: i64 = 50;
66
67// ---------------------------------------------------------------------------
68// Query parameter structs
69// ---------------------------------------------------------------------------
70
71/// Query parameters for the `/api/reflectors` endpoint.
72#[derive(Debug, Default, Deserialize)]
73pub(crate) struct ReflectorQuery {
74    /// Relative time window, e.g. `"1h"`, `"30m"`, `"7d"`. Defaults to 30m.
75    pub(crate) since: Option<String>,
76}
77
78/// Query parameters for the `/api/reflectors/{callsign}/activity` endpoint.
79#[derive(Debug, Default, Deserialize)]
80pub(crate) struct ActivityQuery {
81    /// Relative time window, e.g. `"1h"`, `"24h"`, `"7d"`. Defaults to 6h.
82    pub(crate) since: Option<String>,
83}
84
85/// Query parameters for the `/api/streams` endpoint.
86#[derive(Debug, Default, Deserialize)]
87pub(crate) struct StreamQuery {
88    /// Relative time window, e.g. `"1h"`, `"24h"`, `"7d"`. Defaults to 1h.
89    pub(crate) since: Option<String>,
90
91    /// Optional reflector callsign filter. When present, only streams
92    /// captured from this reflector are returned.
93    pub(crate) reflector: Option<String>,
94
95    /// Row cap. Clamped to a max of 500 rows per response; defaults to 50.
96    pub(crate) limit: Option<i64>,
97}
98
99/// Query parameters for the `/api/upload-queue` endpoint.
100#[derive(Debug, Default, Deserialize)]
101pub(crate) struct UploadQueueQuery {
102    /// Row cap for the pending-queue slice. Capped at 500; defaults to 50.
103    pub(crate) limit: Option<i64>,
104}
105
106/// Query parameters for the `/api/activity` endpoint.
107#[derive(Debug, Default, Deserialize)]
108pub(crate) struct GlobalActivityQuery {
109    /// Relative time window, e.g. `"1h"`, `"24h"`, `"7d"`. Defaults to 6h.
110    pub(crate) since: Option<String>,
111
112    /// Row cap. Clamped to a max of 500 rows per response; defaults to 50.
113    pub(crate) limit: Option<i64>,
114}
115
116// ---------------------------------------------------------------------------
117// Response structs
118// ---------------------------------------------------------------------------
119
120/// JSON response body for the `/health` endpoint.
121#[derive(Debug, Serialize)]
122pub(crate) struct HealthResponse {
123    /// Always `"ok"` — stargazer responds with this whenever the HTTP
124    /// server task is alive.
125    pub(crate) status: &'static str,
126}
127
128/// JSON response body for the `/metrics` endpoint.
129///
130/// Aggregated counters across the `reflectors` and `streams` tables. Not
131/// Prometheus-formatted — this is an operational snapshot meant for human
132/// inspection or k8s health checks, not long-term time-series storage.
133#[derive(Debug, Serialize)]
134pub(crate) struct MetricsResponse {
135    /// Total rows in the `reflectors` table.
136    pub(crate) reflectors_count: i64,
137
138    /// Total captured streams.
139    pub(crate) streams_total: i64,
140
141    /// Streams awaiting upload.
142    pub(crate) streams_pending: i64,
143
144    /// Streams successfully uploaded to the Rdio API.
145    pub(crate) streams_uploaded: i64,
146
147    /// Streams that failed permanently (max retries exceeded or hard error).
148    pub(crate) streams_failed: i64,
149}
150
151/// JSON view of one reflector row.
152#[derive(Debug, Serialize)]
153pub(crate) struct ReflectorView {
154    /// Reflector callsign, e.g. `"REF030"`.
155    pub(crate) callsign: String,
156
157    /// Protocol family (`"dplus"`, `"dextra"`, or `"dcs"`).
158    pub(crate) protocol: String,
159
160    /// Last time any activity was observed.
161    pub(crate) last_seen: Option<DateTime<Utc>>,
162
163    /// Two-letter country code, if known.
164    pub(crate) country: Option<String>,
165
166    /// URL of the reflector's web dashboard, if known.
167    pub(crate) dashboard_url: Option<String>,
168
169    /// Whether the reflector supports Tier 2 XLX UDP monitor.
170    pub(crate) tier2_available: bool,
171
172    /// When this row was first inserted into the reflectors table.
173    ///
174    /// Operators use this to distinguish "recently discovered" reflectors
175    /// from long-established ones when triaging activity.
176    pub(crate) created_at: Option<DateTime<Utc>>,
177}
178
179impl From<ReflectorRow> for ReflectorView {
180    fn from(row: ReflectorRow) -> Self {
181        Self {
182            callsign: row.callsign,
183            protocol: row.protocol,
184            last_seen: row.last_seen,
185            country: row.country,
186            dashboard_url: row.dashboard_url,
187            tier2_available: row.tier2_available.unwrap_or(false),
188            created_at: row.created_at,
189        }
190    }
191}
192
193/// JSON view of one activity-log row.
194#[derive(Debug, Serialize)]
195pub(crate) struct ActivityView {
196    /// Auto-generated row identifier from the `activity_log` BIGSERIAL
197    /// primary key. Exposed so clients can deduplicate observations
198    /// across pages or correlate them with backend logs.
199    pub(crate) id: i64,
200
201    /// Reflector callsign where the activity was heard.
202    pub(crate) reflector: String,
203
204    /// Module letter (A-Z), if the source reported one.
205    pub(crate) module: Option<String>,
206
207    /// Operator or node callsign.
208    pub(crate) callsign: String,
209
210    /// Data source tag (`"xlx_monitor"`, `"ircddb"`, `"pistar"`).
211    pub(crate) source: String,
212
213    /// When the activity was observed.
214    pub(crate) observed_at: DateTime<Utc>,
215}
216
217impl From<ActivityRow> for ActivityView {
218    fn from(row: ActivityRow) -> Self {
219        Self {
220            id: row.id,
221            reflector: row.reflector,
222            module: row.module,
223            callsign: row.callsign,
224            source: row.source,
225            observed_at: row.observed_at,
226        }
227    }
228}
229
230/// JSON view of one captured stream.
231///
232/// Intentionally excludes `audio_mp3` — raw binary audio blobs have no
233/// place in a JSON response. Consumers who need audio fetch it separately
234/// via the upload URL once the row is marked uploaded.
235#[derive(Debug, Serialize)]
236pub(crate) struct StreamView {
237    /// Database row id.
238    pub(crate) id: i64,
239
240    /// Reflector callsign.
241    pub(crate) reflector: String,
242
243    /// Module letter.
244    pub(crate) module: String,
245
246    /// D-STAR protocol name.
247    pub(crate) protocol: String,
248
249    /// D-STAR wire stream ID (non-zero `u16`, stored as `i32`).
250    ///
251    /// Operators correlate this with gateway and reflector logs that
252    /// record the same raw header value.
253    pub(crate) stream_id: i32,
254
255    /// Operator callsign from the D-STAR header.
256    pub(crate) callsign: String,
257
258    /// Operator suffix, if any.
259    pub(crate) suffix: Option<String>,
260
261    /// `UR` (destination) callsign from the D-STAR header.
262    ///
263    /// Typically `CQCQCQ` for ragchews or a callsign/routing string for
264    /// directed traffic. Exposed so clients can filter and render the
265    /// destination without re-parsing the audio blob.
266    pub(crate) ur_call: Option<String>,
267
268    /// Decoded slow-data text message, if any.
269    pub(crate) dstar_text: Option<String>,
270
271    /// DPRS latitude (decimal degrees) decoded from the stream, if present.
272    pub(crate) dprs_lat: Option<f64>,
273
274    /// DPRS longitude (decimal degrees) decoded from the stream, if present.
275    pub(crate) dprs_lon: Option<f64>,
276
277    /// Start timestamp (first voice frame).
278    pub(crate) started_at: DateTime<Utc>,
279
280    /// End timestamp (EOT or timeout). `None` if still in progress.
281    pub(crate) ended_at: Option<DateTime<Utc>>,
282
283    /// Number of voice frames captured.
284    pub(crate) frame_count: Option<i32>,
285
286    /// Upload lifecycle state.
287    pub(crate) upload_status: Option<String>,
288
289    /// Error message from the most recent failed upload attempt, if any.
290    pub(crate) last_upload_error: Option<String>,
291
292    /// When the stream was successfully uploaded to the Rdio API, if at all.
293    pub(crate) uploaded_at: Option<DateTime<Utc>>,
294
295    /// When this row was first inserted into the streams table.
296    pub(crate) created_at: Option<DateTime<Utc>>,
297}
298
299impl From<StreamRow> for StreamView {
300    fn from(row: StreamRow) -> Self {
301        Self {
302            id: row.id,
303            reflector: row.reflector,
304            module: row.module,
305            protocol: row.protocol,
306            stream_id: row.stream_id,
307            callsign: row.callsign,
308            suffix: row.suffix,
309            ur_call: row.ur_call,
310            dstar_text: row.dstar_text,
311            dprs_lat: row.dprs_lat,
312            dprs_lon: row.dprs_lon,
313            started_at: row.started_at,
314            ended_at: row.ended_at,
315            frame_count: row.frame_count,
316            upload_status: row.upload_status,
317            last_upload_error: row.last_upload_error,
318            uploaded_at: row.uploaded_at,
319            created_at: row.created_at,
320        }
321    }
322}
323
324/// JSON view of one connected-node row.
325///
326/// A node is a gateway or hotspot currently linked to a reflector module,
327/// as reported by the Tier 2 XLX UDP monitor `nodes` push notifications.
328#[derive(Debug, Serialize)]
329pub(crate) struct ConnectedNodeView {
330    /// Reflector callsign the node is connected to.
331    pub(crate) reflector: String,
332
333    /// Node callsign with module suffix (e.g. `"W1AW  B"`).
334    pub(crate) node_callsign: String,
335
336    /// Reflector module letter the node is linked to, if reported.
337    pub(crate) module: Option<String>,
338
339    /// Protocol used by the node (e.g. `"dplus"`, `"dextra"`, `"dcs"`).
340    pub(crate) protocol: Option<String>,
341
342    /// When the node first connected, as reported by the reflector.
343    pub(crate) connected_since: Option<DateTime<Utc>>,
344
345    /// When the node was last seen in a monitor update.
346    pub(crate) last_heard: Option<DateTime<Utc>>,
347}
348
349impl From<ConnectedNodeRow> for ConnectedNodeView {
350    fn from(row: ConnectedNodeRow) -> Self {
351        Self {
352            reflector: row.reflector,
353            node_callsign: row.node_callsign,
354            module: row.module,
355            protocol: row.protocol,
356            connected_since: row.connected_since,
357            last_heard: row.last_heard,
358        }
359    }
360}
361
362// ---------------------------------------------------------------------------
363// Helpers
364// ---------------------------------------------------------------------------
365
366/// Parses a relative time string like `"1h"` or `"30m"` into a `Duration`.
367///
368/// Accepts a positive integer followed by exactly one unit character:
369///
370/// | Suffix | Unit |
371/// |--------|------|
372/// | `s`    | seconds |
373/// | `m`    | minutes |
374/// | `h`    | hours |
375/// | `d`    | days |
376///
377/// Returns `None` for any malformed input (missing digits, unknown unit,
378/// overflow). Callers fall back to endpoint-specific defaults.
379fn parse_duration_string(s: &str) -> Option<Duration> {
380    let s = s.trim();
381    let (num_str, unit) = s.split_at(s.len().checked_sub(1)?);
382    if num_str.is_empty() {
383        return None;
384    }
385    let n: u64 = num_str.parse().ok()?;
386    let secs = match unit {
387        "s" => n,
388        "m" => n.checked_mul(60)?,
389        "h" => n.checked_mul(3600)?,
390        "d" => n.checked_mul(86_400)?,
391        _ => return None,
392    };
393    Some(Duration::from_secs(secs))
394}
395
396/// Turns a relative `since` string into an absolute `DateTime<Utc>`.
397///
398/// `None` or a malformed input falls back to `now - default`.
399fn parse_since(since: Option<&str>, default: Duration) -> DateTime<Utc> {
400    let window = since.and_then(parse_duration_string).unwrap_or(default);
401    // Negative or overflowing durations wrap to the default window anchored
402    // at `now`. chrono::Duration handles u64→i64 conversions with saturation.
403    let window_chrono =
404        chrono::Duration::from_std(window).unwrap_or_else(|_| chrono::Duration::seconds(0));
405    Utc::now() - window_chrono
406}
407
408/// Clamps a caller-provided row limit to the configured maximum.
409///
410/// `None`, zero, or negative values return the default. Values above the
411/// maximum are clamped down so the database is never forced to materialise
412/// an unbounded result set.
413fn clamp_limit(provided: Option<i64>, default: i64, max: i64) -> i64 {
414    let requested = provided.unwrap_or(default);
415    if requested <= 0 {
416        default
417    } else {
418        requested.min(max)
419    }
420}
421
422/// Maximum rows any query endpoint will return regardless of `limit`.
423const MAX_LIMIT: i64 = 500;
424
425// ---------------------------------------------------------------------------
426// Handlers
427// ---------------------------------------------------------------------------
428
429/// `GET /health` — kubernetes liveness/readiness probe.
430///
431/// Always returns `200 OK` with body `{"status":"ok"}`. The endpoint is
432/// deliberately side-effect-free (no database touch) so a database stall
433/// does not take the whole pod out of the load balancer.
434pub(crate) async fn health() -> Json<HealthResponse> {
435    Json(HealthResponse { status: "ok" })
436}
437
438/// `GET /metrics` — tier statistics snapshot.
439///
440/// Returns aggregated counters: total reflectors and stream upload-status
441/// counts. Used by operators to answer "is capture working?" at a glance.
442///
443/// Logs and returns `500` on database errors.
444pub(crate) async fn metrics(
445    State(pool): State<PgPool>,
446) -> Result<Json<MetricsResponse>, StatusCode> {
447    let reflectors_count = db::reflectors::count_total(&pool).await.map_err(|e| {
448        tracing::warn!(error = %e, "metrics: failed to count reflectors");
449        StatusCode::INTERNAL_SERVER_ERROR
450    })?;
451
452    let StreamStatusCounts {
453        total,
454        pending,
455        uploaded,
456        failed,
457    } = db::streams::count_by_status(&pool).await.map_err(|e| {
458        tracing::warn!(error = %e, "metrics: failed to count streams");
459        StatusCode::INTERNAL_SERVER_ERROR
460    })?;
461
462    Ok(Json(MetricsResponse {
463        reflectors_count,
464        streams_total: total,
465        streams_pending: pending,
466        streams_uploaded: uploaded,
467        streams_failed: failed,
468    }))
469}
470
471/// `GET /api/reflectors` — list active reflectors with status.
472///
473/// Returns reflectors that have been observed within the `since` window
474/// (default 30m), ordered by most-recently-seen first.
475pub(crate) async fn list_reflectors(
476    State(pool): State<PgPool>,
477    Query(params): Query<ReflectorQuery>,
478) -> Result<Json<Vec<ReflectorView>>, StatusCode> {
479    let since = parse_since(params.since.as_deref(), DEFAULT_REFLECTOR_WINDOW);
480    let rows = db::reflectors::get_active(&pool, since)
481        .await
482        .map_err(|e| {
483            tracing::warn!(error = %e, "list_reflectors: query failed");
484            StatusCode::INTERNAL_SERVER_ERROR
485        })?;
486    Ok(Json(rows.into_iter().map(ReflectorView::from).collect()))
487}
488
489/// `GET /api/reflectors/{callsign}/activity` — recent activity for one reflector.
490///
491/// Returns the `activity_log` rows for `callsign` within the `since`
492/// window (default 6h), ordered most-recent-first.
493pub(crate) async fn reflector_activity(
494    State(pool): State<PgPool>,
495    Path(callsign): Path<String>,
496    Query(params): Query<ActivityQuery>,
497) -> Result<Json<Vec<ActivityView>>, StatusCode> {
498    let since = parse_since(params.since.as_deref(), DEFAULT_ACTIVITY_WINDOW);
499    let rows = db::activity::get_for_reflector(&pool, &callsign, since)
500        .await
501        .map_err(|e| {
502            tracing::warn!(error = %e, callsign = %callsign, "reflector_activity: query failed");
503            StatusCode::INTERNAL_SERVER_ERROR
504        })?;
505    Ok(Json(rows.into_iter().map(ActivityView::from).collect()))
506}
507
508/// `GET /api/streams` — query captured streams with filters.
509///
510/// Supports `?since=<duration>`, `?reflector=<callsign>`, `?limit=<n>`.
511/// Default window is 1h, default limit 50, max limit 500.
512pub(crate) async fn list_streams(
513    State(pool): State<PgPool>,
514    Query(params): Query<StreamQuery>,
515) -> Result<Json<Vec<StreamView>>, StatusCode> {
516    let since = parse_since(params.since.as_deref(), DEFAULT_STREAM_WINDOW);
517    let limit = clamp_limit(params.limit, DEFAULT_STREAM_LIMIT, MAX_LIMIT);
518    let reflector_filter = params.reflector.as_deref();
519    let rows = db::streams::query(&pool, reflector_filter, since, limit)
520        .await
521        .map_err(|e| {
522            tracing::warn!(error = %e, "list_streams: query failed");
523            StatusCode::INTERNAL_SERVER_ERROR
524        })?;
525    Ok(Json(rows.into_iter().map(StreamView::from).collect()))
526}
527
528/// `GET /api/upload-queue` — pending and failed uploads.
529///
530/// Returns streams currently awaiting upload, ordered oldest-first (FIFO
531/// matches the upload processor's drain order). Failed streams are not
532/// included; they are terminal and viewable via `/api/streams`.
533pub(crate) async fn upload_queue(
534    State(pool): State<PgPool>,
535    Query(params): Query<UploadQueueQuery>,
536) -> Result<Json<Vec<StreamView>>, StatusCode> {
537    let limit = clamp_limit(params.limit, DEFAULT_UPLOAD_LIMIT, MAX_LIMIT);
538    let rows = db::uploads::get_pending(&pool, limit).await.map_err(|e| {
539        tracing::warn!(error = %e, "upload_queue: query failed");
540        StatusCode::INTERNAL_SERVER_ERROR
541    })?;
542    Ok(Json(rows.into_iter().map(StreamView::from).collect()))
543}
544
545/// `GET /api/reflectors/{callsign}/nodes` — nodes linked to a reflector.
546///
547/// Returns the roster of gateways and hotspots currently linked to the
548/// reflector's modules as reported by the most recent Tier 2 monitor
549/// snapshot. Entries are ordered by `last_heard DESC NULLS LAST`.
550pub(crate) async fn reflector_nodes(
551    State(pool): State<PgPool>,
552    Path(callsign): Path<String>,
553) -> Result<Json<Vec<ConnectedNodeView>>, StatusCode> {
554    let rows = db::connected_nodes::get_for_reflector(&pool, &callsign)
555        .await
556        .map_err(|e| {
557            tracing::warn!(error = %e, callsign = %callsign, "reflector_nodes: query failed");
558            StatusCode::INTERNAL_SERVER_ERROR
559        })?;
560    Ok(Json(
561        rows.into_iter().map(ConnectedNodeView::from).collect(),
562    ))
563}
564
565/// `GET /api/activity` — global activity feed across all reflectors.
566///
567/// Returns `activity_log` rows within the `since` window (default 6h),
568/// ordered most-recent-first and capped at `limit` rows (default 50,
569/// max 500). Mirrors the per-reflector `/api/reflectors/{cs}/activity`
570/// endpoint but across the whole registry.
571pub(crate) async fn list_activity(
572    State(pool): State<PgPool>,
573    Query(params): Query<GlobalActivityQuery>,
574) -> Result<Json<Vec<ActivityView>>, StatusCode> {
575    let since = parse_since(params.since.as_deref(), DEFAULT_ACTIVITY_WINDOW);
576    let limit = clamp_limit(params.limit, DEFAULT_ACTIVITY_LIMIT, MAX_LIMIT);
577    let rows = db::activity::get_recent(&pool, since, limit)
578        .await
579        .map_err(|e| {
580            tracing::warn!(error = %e, "list_activity: query failed");
581            StatusCode::INTERNAL_SERVER_ERROR
582        })?;
583    Ok(Json(rows.into_iter().map(ActivityView::from).collect()))
584}
585
586/// `POST /api/tier3/connect` — manually promote a reflector to Tier 3.
587///
588/// Stub: returns `501 Not Implemented` until the Tier 3 orchestrator
589/// exposes a management channel. The route is live so callers discover
590/// the endpoint and get a correct protocol-level response, not a 404.
591pub(crate) async fn tier3_connect() -> impl IntoResponse {
592    StatusCode::NOT_IMPLEMENTED
593}
594
595/// `DELETE /api/tier3/{callsign}/{module}` — disconnect a Tier 3 session.
596///
597/// Stub: returns `501 Not Implemented`. The path parameters are parsed and
598/// validated (both must be non-empty ASCII) so that when the orchestrator
599/// wiring lands the extractor signature need not change.
600pub(crate) async fn tier3_disconnect(
601    Path((_callsign, _module)): Path<(String, String)>,
602) -> impl IntoResponse {
603    StatusCode::NOT_IMPLEMENTED
604}
605
606// ---------------------------------------------------------------------------
607// Unit tests
608// ---------------------------------------------------------------------------
609
610#[cfg(test)]
611mod tests {
612    use super::{
613        ActivityView, MAX_LIMIT, ReflectorView, StreamView, clamp_limit, health,
614        parse_duration_string, parse_since,
615    };
616    use crate::db::activity::ActivityRow;
617    use crate::db::reflectors::ReflectorRow;
618    use crate::db::streams::StreamRow;
619    use chrono::{TimeZone, Utc};
620    use std::time::Duration;
621
622    #[test]
623    fn parse_duration_seconds() {
624        assert_eq!(parse_duration_string("30s"), Some(Duration::from_secs(30)));
625        assert_eq!(parse_duration_string("1s"), Some(Duration::from_secs(1)));
626    }
627
628    #[test]
629    fn parse_duration_minutes() {
630        assert_eq!(parse_duration_string("1m"), Some(Duration::from_secs(60)));
631        assert_eq!(
632            parse_duration_string("90m"),
633            Some(Duration::from_secs(5_400))
634        );
635    }
636
637    #[test]
638    fn parse_duration_hours() {
639        assert_eq!(
640            parse_duration_string("1h"),
641            Some(Duration::from_secs(3_600))
642        );
643        assert_eq!(
644            parse_duration_string("24h"),
645            Some(Duration::from_secs(86_400))
646        );
647    }
648
649    #[test]
650    fn parse_duration_days() {
651        assert_eq!(
652            parse_duration_string("1d"),
653            Some(Duration::from_secs(86_400))
654        );
655        assert_eq!(
656            parse_duration_string("7d"),
657            Some(Duration::from_secs(604_800))
658        );
659    }
660
661    #[test]
662    fn parse_duration_rejects_malformed() {
663        // No unit suffix.
664        assert_eq!(parse_duration_string("30"), None);
665        // Unknown unit.
666        assert_eq!(parse_duration_string("5x"), None);
667        // Empty digits.
668        assert_eq!(parse_duration_string("h"), None);
669        // Empty string.
670        assert_eq!(parse_duration_string(""), None);
671        // Negative values not supported.
672        assert_eq!(parse_duration_string("-1h"), None);
673        // Trailing whitespace after unit.
674        assert_eq!(parse_duration_string("1 h"), None);
675    }
676
677    #[test]
678    fn parse_duration_handles_overflow() {
679        // 2^64 / 86400 ≈ 2.1e14; feed a value that overflows u64 * 86400.
680        assert_eq!(parse_duration_string("999999999999999999d"), None);
681    }
682
683    #[test]
684    fn parse_since_defaults_when_missing() {
685        // With no `since` and a 1h default, the returned timestamp should be
686        // ~1h before now. Allow 5 seconds of slack for test runtime.
687        let t = parse_since(None, Duration::from_secs(3600));
688        let delta = Utc::now().signed_duration_since(t);
689        assert!(
690            (3590..=3605).contains(&delta.num_seconds()),
691            "expected ~3600s ago, got {}s",
692            delta.num_seconds()
693        );
694    }
695
696    #[test]
697    fn parse_since_uses_default_on_bad_input() {
698        // Malformed input silently falls back to default rather than
699        // returning an HTTP error — permissive parsing for the UI.
700        let t = parse_since(Some("bogus"), Duration::from_secs(600));
701        let delta = Utc::now().signed_duration_since(t);
702        assert!(
703            (595..=605).contains(&delta.num_seconds()),
704            "expected ~600s ago, got {}s",
705            delta.num_seconds()
706        );
707    }
708
709    #[test]
710    fn parse_since_honors_valid_input() {
711        let t = parse_since(Some("2h"), Duration::from_secs(60));
712        let delta = Utc::now().signed_duration_since(t);
713        assert!(
714            (7190..=7210).contains(&delta.num_seconds()),
715            "expected ~7200s ago, got {}s",
716            delta.num_seconds()
717        );
718    }
719
720    #[test]
721    fn clamp_limit_applies_default_for_none() {
722        assert_eq!(clamp_limit(None, 50, MAX_LIMIT), 50);
723    }
724
725    #[test]
726    fn clamp_limit_applies_default_for_nonpositive() {
727        assert_eq!(clamp_limit(Some(0), 50, MAX_LIMIT), 50);
728        assert_eq!(clamp_limit(Some(-1), 50, MAX_LIMIT), 50);
729        assert_eq!(clamp_limit(Some(-9_999), 50, MAX_LIMIT), 50);
730    }
731
732    #[test]
733    fn clamp_limit_caps_at_max() {
734        assert_eq!(clamp_limit(Some(9_999), 50, MAX_LIMIT), MAX_LIMIT);
735        assert_eq!(clamp_limit(Some(MAX_LIMIT + 1), 50, MAX_LIMIT), MAX_LIMIT);
736    }
737
738    #[test]
739    fn clamp_limit_passes_valid_values() {
740        assert_eq!(clamp_limit(Some(1), 50, MAX_LIMIT), 1);
741        assert_eq!(clamp_limit(Some(100), 50, MAX_LIMIT), 100);
742        assert_eq!(clamp_limit(Some(MAX_LIMIT), 50, MAX_LIMIT), MAX_LIMIT);
743    }
744
745    #[tokio::test]
746    async fn health_returns_ok_literal() {
747        let resp = health().await;
748        assert_eq!(resp.0.status, "ok");
749    }
750
751    #[test]
752    fn reflector_view_from_row_maps_all_fields() -> Result<(), Box<dyn std::error::Error>> {
753        // Use a specific, deterministic timestamp so the field mapping
754        // is actually verified — not just that "some" value gets copied.
755        let ts = Utc
756            .with_ymd_and_hms(2026, 4, 12, 10, 0, 0)
757            .single()
758            .ok_or("fixed timestamp must be unambiguous")?;
759        let row = ReflectorRow {
760            callsign: "REF030".to_owned(),
761            protocol: "dplus".to_owned(),
762            ip_address: Some("1.2.3.4".to_owned()),
763            dashboard_url: Some("https://example.invalid".to_owned()),
764            country: Some("US".to_owned()),
765            last_seen: Some(ts),
766            tier2_available: Some(true),
767            created_at: Some(ts),
768        };
769        let view = ReflectorView::from(row);
770        assert_eq!(view.callsign, "REF030");
771        assert_eq!(view.protocol, "dplus");
772        assert_eq!(view.country.as_deref(), Some("US"));
773        assert_eq!(
774            view.dashboard_url.as_deref(),
775            Some("https://example.invalid")
776        );
777        assert_eq!(view.last_seen, Some(ts));
778        assert!(view.tier2_available);
779        Ok(())
780    }
781
782    #[test]
783    fn reflector_view_tier2_available_defaults_false() {
784        // NULL in the database must not surface as `null` in JSON — we
785        // document tier2_available as a bool, so `None` becomes `false`.
786        let row = ReflectorRow {
787            callsign: "DCS030".to_owned(),
788            protocol: "dcs".to_owned(),
789            ip_address: None,
790            dashboard_url: None,
791            country: None,
792            last_seen: None,
793            tier2_available: None,
794            created_at: None,
795        };
796        let view = ReflectorView::from(row);
797        assert!(!view.tier2_available);
798    }
799
800    #[test]
801    fn activity_view_from_row_preserves_fields() -> Result<(), Box<dyn std::error::Error>> {
802        let ts = Utc
803            .with_ymd_and_hms(2026, 4, 12, 11, 0, 0)
804            .single()
805            .ok_or("fixed timestamp must be unambiguous")?;
806        let row = ActivityRow {
807            id: 42,
808            reflector: "XLX030".to_owned(),
809            module: Some("C".to_owned()),
810            callsign: "W1AW".to_owned(),
811            source: "xlx_monitor".to_owned(),
812            observed_at: ts,
813        };
814        let view = ActivityView::from(row);
815        assert_eq!(view.reflector, "XLX030");
816        assert_eq!(view.module.as_deref(), Some("C"));
817        assert_eq!(view.callsign, "W1AW");
818        assert_eq!(view.source, "xlx_monitor");
819        assert_eq!(view.observed_at, ts);
820        Ok(())
821    }
822
823    #[test]
824    fn stream_view_from_row_omits_audio_blob() -> Result<(), Box<dyn std::error::Error>> {
825        // The HTTP view intentionally drops audio_mp3; confirm it is not
826        // accidentally added back (the field isn't present in StreamView).
827        let ts = Utc
828            .with_ymd_and_hms(2026, 4, 12, 12, 0, 0)
829            .single()
830            .ok_or("fixed timestamp must be unambiguous")?;
831        let row = StreamRow {
832            id: 7,
833            reflector: "REF030".to_owned(),
834            module: "C".to_owned(),
835            protocol: "dplus".to_owned(),
836            stream_id: 1234,
837            callsign: "W1AW".to_owned(),
838            suffix: Some("D75".to_owned()),
839            ur_call: Some("CQCQCQ".to_owned()),
840            dstar_text: Some("hello".to_owned()),
841            dprs_lat: Some(42.0),
842            dprs_lon: Some(-71.0),
843            started_at: ts,
844            ended_at: Some(ts),
845            frame_count: Some(100),
846            audio_mp3: Some(vec![0xFF; 1024]),
847            upload_status: Some("pending".to_owned()),
848            upload_attempts: Some(0),
849            last_upload_error: None,
850            uploaded_at: None,
851            created_at: Some(ts),
852        };
853        let view = StreamView::from(row);
854        assert_eq!(view.id, 7);
855        assert_eq!(view.reflector, "REF030");
856        assert_eq!(view.module, "C");
857        assert_eq!(view.callsign, "W1AW");
858        assert_eq!(view.suffix.as_deref(), Some("D75"));
859        assert_eq!(view.dstar_text.as_deref(), Some("hello"));
860        assert_eq!(view.frame_count, Some(100));
861        assert_eq!(view.upload_status.as_deref(), Some("pending"));
862
863        // Serialize to JSON and confirm audio_mp3 is absent. This is the
864        // real "system" test — the view's serialization is the HTTP
865        // wire contract. Every other row field is surfaced in the view,
866        // so this is the single exclusion we guard.
867        let json = serde_json::to_string(&view)?;
868        assert!(
869            !json.contains("audio_mp3"),
870            "audio_mp3 must not appear in the JSON output: {json}"
871        );
872        // ur_call, dprs_lat, dprs_lon, last_upload_error, uploaded_at,
873        // created_at, and stream_id are surfaced for operator visibility.
874        assert!(
875            json.contains("\"ur_call\":\"CQCQCQ\""),
876            "ur_call must appear in the JSON output: {json}"
877        );
878        assert!(
879            json.contains("\"dprs_lat\":42.0"),
880            "dprs_lat must appear in the JSON output: {json}"
881        );
882        assert!(
883            json.contains("\"stream_id\":1234"),
884            "stream_id must appear in the JSON output: {json}"
885        );
886        Ok(())
887    }
888
889    #[test]
890    fn stream_query_route_pattern_matches_axum_syntax() {
891        // Regression guard: axum 0.8 uses `{name}` path params, not
892        // `:name` (the 0.7-and-earlier syntax). A wrong syntax here
893        // compiles fine but 404s on every request. Confirm the routes
894        // we register use the modern form by inspecting the string
895        // constants.
896        const ACTIVITY_ROUTE: &str = "/api/reflectors/{callsign}/activity";
897        const TIER3_ROUTE: &str = "/api/tier3/{callsign}/{module}";
898        assert!(ACTIVITY_ROUTE.contains("{callsign}"));
899        assert!(TIER3_ROUTE.contains("{callsign}"));
900        assert!(TIER3_ROUTE.contains("{module}"));
901        assert!(!ACTIVITY_ROUTE.contains(":callsign"));
902        assert!(!TIER3_ROUTE.contains(":module"));
903    }
904}