stargazer/db/
activity.rs

1//! Query functions for the `activity_log` table.
2//!
3//! The activity log records timestamped observations of callsign activity on
4//! reflector modules. Each row represents one callsign heard at a specific
5//! instant, tagged with the data source that reported it.
6//!
7//! Sources include:
8//! - `"xlx_monitor"` — Tier 2 XLX UDP JSON monitor real-time events.
9//! - `"ircddb"` — Tier 1 ircDDB last-heard page scrapes.
10//! - `"pistar"` — Tier 1 Pi-Star host file activity hints.
11//!
12//! The table is append-only by design. Old rows are not updated; queries filter
13//! by `observed_at` to select recent activity windows.
14
15use chrono::{DateTime, Utc};
16use sqlx::PgPool;
17
18/// A single row from the `activity_log` table.
19///
20/// Maps directly to the table columns via `sqlx::FromRow`.
21#[derive(Debug, sqlx::FromRow)]
22pub(crate) struct ActivityRow {
23    /// Auto-generated row identifier.
24    ///
25    /// Populated by `sqlx::FromRow` from the `BIGSERIAL` primary key and
26    /// surfaced in `ActivityView` so clients can deduplicate rows across
27    /// paged queries.
28    pub(crate) id: i64,
29
30    /// Reflector callsign (foreign key to `reflectors.callsign`).
31    pub(crate) reflector: String,
32
33    /// Module letter (A-Z), or `None` if the source did not specify.
34    pub(crate) module: Option<String>,
35
36    /// Callsign of the operator or node that was heard.
37    pub(crate) callsign: String,
38
39    /// Data source that produced this observation.
40    pub(crate) source: String,
41
42    /// When the activity was observed.
43    pub(crate) observed_at: DateTime<Utc>,
44}
45
46/// Inserts a single activity observation.
47///
48/// Each call appends one row. Callers should batch observations where possible
49/// to reduce round-trips, but individual inserts are acceptable for real-time
50/// Tier 2 events.
51///
52/// # Errors
53///
54/// Returns `sqlx::Error` on connection or foreign-key constraint failures
55/// (the referenced reflector must exist in the `reflectors` table).
56pub(crate) async fn insert_observation(
57    pool: &PgPool,
58    reflector: &str,
59    module: Option<&str>,
60    callsign: &str,
61    source: &str,
62    observed_at: DateTime<Utc>,
63) -> Result<(), sqlx::Error> {
64    // Simple INSERT with all columns provided by the caller.
65    let _result = sqlx::query(
66        "INSERT INTO activity_log (reflector, module, callsign, source, observed_at)
67         VALUES ($1, $2, $3, $4, $5)",
68    )
69    .bind(reflector)
70    .bind(module)
71    .bind(callsign)
72    .bind(source)
73    .bind(observed_at)
74    .execute(pool)
75    .await?;
76    Ok(())
77}
78
79/// Returns recent activity across all reflectors since the given timestamp.
80///
81/// Results are ordered by `observed_at DESC` (most recent first) and capped
82/// at `limit` rows. Used by the HTTP API to serve the global activity feed
83/// (`GET /api/activity`).
84///
85/// # Errors
86///
87/// Returns `sqlx::Error` on query failure.
88pub(crate) async fn get_recent(
89    pool: &PgPool,
90    since: DateTime<Utc>,
91    limit: i64,
92) -> Result<Vec<ActivityRow>, sqlx::Error> {
93    // Time-bounded query with row limit, ordered by recency.
94    sqlx::query_as::<_, ActivityRow>(
95        "SELECT id, reflector, module, callsign, source, observed_at
96         FROM activity_log
97         WHERE observed_at >= $1
98         ORDER BY observed_at DESC
99         LIMIT $2",
100    )
101    .bind(since)
102    .bind(limit)
103    .fetch_all(pool)
104    .await
105}
106
107/// Returns activity for a specific reflector since the given timestamp.
108///
109/// Results are ordered by `observed_at DESC` (most recent first). Used by
110/// the HTTP API and Tier 2 promotion logic to assess per-reflector activity
111/// levels.
112///
113/// # Errors
114///
115/// Returns `sqlx::Error` on query failure.
116pub(crate) async fn get_for_reflector(
117    pool: &PgPool,
118    reflector: &str,
119    since: DateTime<Utc>,
120) -> Result<Vec<ActivityRow>, sqlx::Error> {
121    // Filters by reflector callsign and time window; uses the
122    // idx_activity_log_lookup composite index (reflector, module, observed_at).
123    sqlx::query_as::<_, ActivityRow>(
124        "SELECT id, reflector, module, callsign, source, observed_at
125         FROM activity_log
126         WHERE reflector = $1 AND observed_at >= $2
127         ORDER BY observed_at DESC",
128    )
129    .bind(reflector)
130    .bind(since)
131    .fetch_all(pool)
132    .await
133}