stargazer/db/activity.rs
1//! Query functions for the `activity_log` table.
2//!
3//! The activity log records timestamped observations of callsign activity on
4//! reflector modules. Each row represents one callsign heard at a specific
5//! instant, tagged with the data source that reported it.
6//!
7//! Sources include:
8//! - `"xlx_monitor"` — Tier 2 XLX UDP JSON monitor real-time events.
9//! - `"ircddb"` — Tier 1 ircDDB last-heard page scrapes.
10//! - `"pistar"` — Tier 1 Pi-Star host file activity hints.
11//!
12//! The table is append-only by design. Old rows are not updated; queries filter
13//! by `observed_at` to select recent activity windows.
14
15use chrono::{DateTime, Utc};
16use sqlx::PgPool;
17
18/// A single row from the `activity_log` table.
19///
20/// Maps directly to the table columns via `sqlx::FromRow`.
21#[derive(Debug, sqlx::FromRow)]
22pub(crate) struct ActivityRow {
23 /// Auto-generated row identifier.
24 ///
25 /// Populated by `sqlx::FromRow` from the `BIGSERIAL` primary key and
26 /// surfaced in `ActivityView` so clients can deduplicate rows across
27 /// paged queries.
28 pub(crate) id: i64,
29
30 /// Reflector callsign (foreign key to `reflectors.callsign`).
31 pub(crate) reflector: String,
32
33 /// Module letter (A-Z), or `None` if the source did not specify.
34 pub(crate) module: Option<String>,
35
36 /// Callsign of the operator or node that was heard.
37 pub(crate) callsign: String,
38
39 /// Data source that produced this observation.
40 pub(crate) source: String,
41
42 /// When the activity was observed.
43 pub(crate) observed_at: DateTime<Utc>,
44}
45
46/// Inserts a single activity observation.
47///
48/// Each call appends one row. Callers should batch observations where possible
49/// to reduce round-trips, but individual inserts are acceptable for real-time
50/// Tier 2 events.
51///
52/// # Errors
53///
54/// Returns `sqlx::Error` on connection or foreign-key constraint failures
55/// (the referenced reflector must exist in the `reflectors` table).
56pub(crate) async fn insert_observation(
57 pool: &PgPool,
58 reflector: &str,
59 module: Option<&str>,
60 callsign: &str,
61 source: &str,
62 observed_at: DateTime<Utc>,
63) -> Result<(), sqlx::Error> {
64 // Simple INSERT with all columns provided by the caller.
65 let _result = sqlx::query(
66 "INSERT INTO activity_log (reflector, module, callsign, source, observed_at)
67 VALUES ($1, $2, $3, $4, $5)",
68 )
69 .bind(reflector)
70 .bind(module)
71 .bind(callsign)
72 .bind(source)
73 .bind(observed_at)
74 .execute(pool)
75 .await?;
76 Ok(())
77}
78
79/// Returns recent activity across all reflectors since the given timestamp.
80///
81/// Results are ordered by `observed_at DESC` (most recent first) and capped
82/// at `limit` rows. Used by the HTTP API to serve the global activity feed
83/// (`GET /api/activity`).
84///
85/// # Errors
86///
87/// Returns `sqlx::Error` on query failure.
88pub(crate) async fn get_recent(
89 pool: &PgPool,
90 since: DateTime<Utc>,
91 limit: i64,
92) -> Result<Vec<ActivityRow>, sqlx::Error> {
93 // Time-bounded query with row limit, ordered by recency.
94 sqlx::query_as::<_, ActivityRow>(
95 "SELECT id, reflector, module, callsign, source, observed_at
96 FROM activity_log
97 WHERE observed_at >= $1
98 ORDER BY observed_at DESC
99 LIMIT $2",
100 )
101 .bind(since)
102 .bind(limit)
103 .fetch_all(pool)
104 .await
105}
106
107/// Returns activity for a specific reflector since the given timestamp.
108///
109/// Results are ordered by `observed_at DESC` (most recent first). Used by
110/// the HTTP API and Tier 2 promotion logic to assess per-reflector activity
111/// levels.
112///
113/// # Errors
114///
115/// Returns `sqlx::Error` on query failure.
116pub(crate) async fn get_for_reflector(
117 pool: &PgPool,
118 reflector: &str,
119 since: DateTime<Utc>,
120) -> Result<Vec<ActivityRow>, sqlx::Error> {
121 // Filters by reflector callsign and time window; uses the
122 // idx_activity_log_lookup composite index (reflector, module, observed_at).
123 sqlx::query_as::<_, ActivityRow>(
124 "SELECT id, reflector, module, callsign, source, observed_at
125 FROM activity_log
126 WHERE reflector = $1 AND observed_at >= $2
127 ORDER BY observed_at DESC",
128 )
129 .bind(reflector)
130 .bind(since)
131 .fetch_all(pool)
132 .await
133}