dstar_gateway_server/client_pool/
handle.rs

1//! `ClientHandle` — one linked client as tracked by the reflector.
2//!
3//! The pool stores protocol-erased [`ServerSessionCore`] instances
4//! keyed by `SocketAddr`. A phantom `P: Protocol` marker threads the
5//! protocol type through the API so callers can get DExtra/DPlus/DCS
6//! typed accessors without the storage itself being generic.
7//!
8//! This module also defines [`TokenBucket`], the per-client rate
9//! limiter used to cap the number of fan-out voice frames a single
10//! client can consume per second. It lives here (rather than in a
11//! dedicated module) to keep the per-handle state co-located.
12
13use std::marker::PhantomData;
14use std::time::Instant;
15
16use dstar_gateway_core::ServerSessionCore;
17use dstar_gateway_core::session::client::Protocol;
18use dstar_gateway_core::types::Module;
19
20use crate::reflector::AccessPolicy;
21
/// Default burst capacity for per-client TX rate limiting, in frames.
///
/// 60 frames is ~3 seconds of voice at the nominal 20 fps D-STAR
/// rate — i.e. a one-second nominal burst plus 3× headroom.
pub const DEFAULT_TX_BUDGET_MAX_TOKENS: u32 = 60;

/// Default steady-state refill rate for per-client TX rate limiting,
/// in frames per second.
///
/// Set to `60.0` (3× the nominal 20 fps D-STAR voice rate) to leave
/// headroom for jitter and burstiness. A client legitimately
/// transmitting audio will never hit the limit; a client trying to
/// `DoS` the reflector with 200 fps of voice will.
pub const DEFAULT_TX_BUDGET_REFILL_PER_SEC: f64 = 60.0;
36
/// Rate limiter that caps the number of tokens consumed per second.
///
/// Attached to [`ClientHandle`] as the per-client TX budget: each
/// fan-out voice frame withdraws one token, so a single slow or
/// adversarial client cannot monopolize the reflector's fan-out
/// loop. Classic leaky/token-bucket hybrid: the bucket starts full,
/// and every [`Self::try_consume`] call first credits
/// `refill_rate_per_sec * elapsed` tokens (clamped to `max_tokens`)
/// before attempting the withdrawal.
///
/// Sans-io by design: the caller supplies the current [`Instant`];
/// this type never calls [`Instant::now`] itself, which lets tests
/// drive the clock forward with synthetic instants.
#[derive(Debug, Clone, Copy)]
pub struct TokenBucket {
    tokens: f64,
    max_tokens: f64,
    refill_rate_per_sec: f64,
    last_refill: Instant,
}

impl TokenBucket {
    /// Construct a new bucket full of tokens.
    ///
    /// - `max_tokens`: burst capacity; the bucket starts with this
    ///   many tokens and never refills beyond it.
    /// - `refill_rate_per_sec`: steady-state refill rate in tokens
    ///   per second. A nominal 20 fps D-STAR voice stream would set
    ///   this to `60.0` (3× nominal) to leave headroom for jitter.
    /// - `now`: the instant at which the bucket is constructed;
    ///   seeds the first refill delta.
    #[must_use]
    pub fn new(max_tokens: u32, refill_rate_per_sec: f64, now: Instant) -> Self {
        let capacity = f64::from(max_tokens);
        Self {
            tokens: capacity,
            max_tokens: capacity,
            refill_rate_per_sec,
            last_refill: now,
        }
    }

    /// Attempt to withdraw `tokens` from the bucket at time `now`.
    ///
    /// The refill earned since the last call is always credited,
    /// even when the withdrawal itself is refused. Returns `true`
    /// when the bucket held enough tokens (withdrawal committed),
    /// `false` otherwise.
    pub fn try_consume(&mut self, now: Instant, tokens: u32) -> bool {
        self.refill(now);
        let want = f64::from(tokens);
        let granted = self.tokens >= want;
        if granted {
            self.tokens -= want;
        }
        granted
    }

    /// Credit tokens earned since `last_refill`, clamped to capacity.
    ///
    /// `saturating_duration_since` treats a regressed clock as zero
    /// elapsed time rather than panicking.
    fn refill(&mut self, now: Instant) {
        let dt = now.saturating_duration_since(self.last_refill);
        let credit = dt.as_secs_f64() * self.refill_rate_per_sec;
        self.tokens = (self.tokens + credit).min(self.max_tokens);
        self.last_refill = now;
    }

    /// Token count as of the most recent refill, for tests/metrics.
    #[must_use]
    pub const fn tokens(&self) -> f64 {
        self.tokens
    }

    /// Maximum tokens the bucket can hold.
    #[must_use]
    pub const fn max_tokens(&self) -> f64 {
        self.max_tokens
    }

    /// Refill rate in tokens per second.
    #[must_use]
    pub const fn refill_rate_per_sec(&self) -> f64 {
        self.refill_rate_per_sec
    }
}
118
/// One entry in [`super::ClientPool`].
///
/// Tracks the per-peer server session, its module membership (if
/// any), the last time we heard from the client, the access policy
/// the authorizer granted, a running count of send failures so the
/// fan-out engine can evict unhealthy peers, and a per-client TX
/// token bucket used to rate-limit how many fan-out voice frames
/// one client can consume per second.
#[derive(Debug)]
pub struct ClientHandle<P: Protocol> {
    /// Protocol-erased server session state machine.
    pub session: ServerSessionCore,
    /// Module the client has linked to, if any.
    pub module: Option<Module>,
    /// Last time we received a datagram from this client.
    pub last_heard: Instant,
    /// Access policy granted by the authorizer.
    pub access: AccessPolicy,
    /// Monotonically increasing count of fan-out send failures.
    pub send_failure_count: u32,
    /// Per-client TX token bucket. Each outbound voice frame in
    /// fan-out consumes one token; when the bucket is empty, the
    /// frame is dropped for THIS peer (the other peers on the same
    /// module still receive it). Rate-limited is NOT the same as
    /// broken — the peer is not marked unhealthy.
    pub tx_budget: TokenBucket,
    // Zero-sized marker threading the protocol type `P` through the
    // API without storing a `P`; the `fn() -> P` form avoids
    // claiming ownership of a `P` value.
    _protocol: PhantomData<fn() -> P>,
}
147
148impl<P: Protocol> ClientHandle<P> {
149    /// Construct a new handle for a freshly observed client.
150    ///
151    /// The TX budget is initialized with [`DEFAULT_TX_BUDGET_MAX_TOKENS`]
152    /// capacity and [`DEFAULT_TX_BUDGET_REFILL_PER_SEC`] refill rate.
153    #[must_use]
154    pub fn new(session: ServerSessionCore, access: AccessPolicy, now: Instant) -> Self {
155        Self::new_with_tx_budget(
156            session,
157            access,
158            now,
159            DEFAULT_TX_BUDGET_MAX_TOKENS,
160            DEFAULT_TX_BUDGET_REFILL_PER_SEC,
161        )
162    }
163
164    /// Construct a new handle with a caller-specified TX budget.
165    ///
166    /// Primarily used by tests that need to drive the rate limiter
167    /// past its limit in a single tick without waiting for real
168    /// wall-clock refill.
169    #[must_use]
170    pub fn new_with_tx_budget(
171        session: ServerSessionCore,
172        access: AccessPolicy,
173        now: Instant,
174        max_tokens: u32,
175        refill_rate_per_sec: f64,
176    ) -> Self {
177        Self {
178            session,
179            module: None,
180            last_heard: now,
181            access,
182            send_failure_count: 0,
183            tx_budget: TokenBucket::new(max_tokens, refill_rate_per_sec, now),
184            _protocol: PhantomData,
185        }
186    }
187}
188
#[cfg(test)]
mod tests {
    use super::TokenBucket;
    use std::time::{Duration, Instant};

    /// Tolerance for exact-valued f64 comparisons.
    const EPS: f64 = 1e-9;

    #[test]
    fn bucket_starts_full() {
        let t0 = Instant::now();
        let bucket = TokenBucket::new(5, 1.0, t0);
        assert!((bucket.tokens() - 5.0).abs() < EPS);
        assert!((bucket.max_tokens() - 5.0).abs() < EPS);
    }

    #[test]
    fn consume_under_budget_succeeds() {
        let t0 = Instant::now();
        let mut bucket = TokenBucket::new(3, 1.0, t0);
        // Exactly the burst capacity is available at t0.
        for _ in 0..3 {
            assert!(bucket.try_consume(t0, 1));
        }
    }

    #[test]
    fn consume_over_budget_fails() {
        let t0 = Instant::now();
        let mut bucket = TokenBucket::new(1, 1.0, t0);
        assert!(bucket.try_consume(t0, 1), "first consume succeeds");
        assert!(!bucket.try_consume(t0, 1), "empty bucket rejects");
    }

    #[test]
    fn bucket_refills_over_time() {
        let t0 = Instant::now();
        let mut bucket = TokenBucket::new(1, 1.0, t0);
        assert!(bucket.try_consume(t0, 1), "first consume");
        assert!(!bucket.try_consume(t0, 1), "second fails at t0");
        // One second at 1 token/s restores exactly one token.
        assert!(
            bucket.try_consume(t0 + Duration::from_secs(1), 1),
            "refilled to 1 after 1s"
        );
    }

    #[test]
    fn bucket_refill_clamps_to_max_tokens() {
        let t0 = Instant::now();
        let mut bucket = TokenBucket::new(2, 10.0, t0);
        assert!(bucket.try_consume(t0, 2));
        // 10 s at 10 tokens/s would earn 100 tokens; the bucket
        // must cap at its capacity of 2.
        let t1 = t0 + Duration::from_secs(10);
        assert!(bucket.try_consume(t1, 2), "bucket refills to max");
        assert!(!bucket.try_consume(t1, 1), "bucket empty again");
    }

    #[test]
    fn consume_zero_tokens_always_succeeds() {
        let t0 = Instant::now();
        let mut bucket = TokenBucket::new(0, 0.0, t0);
        assert!(bucket.try_consume(t0, 0), "zero cost always succeeds");
    }

    #[test]
    fn regressing_clock_does_not_underflow() {
        let later = Instant::now();
        let mut bucket = TokenBucket::new(1, 1.0, later);
        // Passing an instant before `last_refill` must saturate the
        // elapsed time at zero, not panic or underflow.
        let earlier = later.checked_sub(Duration::from_secs(1)).unwrap_or(later);
        assert!(bucket.try_consume(earlier, 1));
    }
}
253}