dstar_gateway_server/client_pool/handle.rs
1//! `ClientHandle` — one linked client as tracked by the reflector.
2//!
3//! The pool stores protocol-erased [`ServerSessionCore`] instances
4//! keyed by `SocketAddr`. A phantom `P: Protocol` marker threads the
5//! protocol type through the API so callers can get DExtra/DPlus/DCS
6//! typed accessors without the storage itself being generic.
7//!
8//! This module also defines [`TokenBucket`], the per-client rate
9//! limiter used to cap the number of fan-out voice frames a single
10//! client can consume per second. It lives here (rather than in a
11//! dedicated module) to keep the per-handle state co-located.
12
13use std::marker::PhantomData;
14use std::time::Instant;
15
16use dstar_gateway_core::ServerSessionCore;
17use dstar_gateway_core::session::client::Protocol;
18use dstar_gateway_core::types::Module;
19
20use crate::reflector::AccessPolicy;
21
22/// Default burst capacity for per-client TX rate limiting, in frames.
23///
24/// Sized to absorb a ~1 second burst of voice at the nominal 20 fps
25/// D-STAR rate plus headroom.
26pub const DEFAULT_TX_BUDGET_MAX_TOKENS: u32 = 60;
27
28/// Default steady-state refill rate for per-client TX rate limiting,
29/// in frames per second.
30///
31/// Set to `60.0` (3× the nominal 20 fps D-STAR voice rate) to leave
32/// headroom for jitter and burstiness. A client legitimately
33/// transmitting audio will never hit the limit; a client trying to
34/// `DoS` the reflector with 200 fps of voice will.
35pub const DEFAULT_TX_BUDGET_REFILL_PER_SEC: f64 = 60.0;
36
37/// Rate limiter that caps the number of tokens consumed per second.
38///
39/// Used on [`ClientHandle`] to throttle how many fan-out voice
40/// frames a single client can absorb per second, so one slow or
41/// adversarial client can't monopolize the reflector's fan-out
42/// loop. Classic leaky/token-bucket hybrid: the bucket starts full
43/// and [`Self::try_consume`] refills by `refill_rate_per_sec *
44/// elapsed` tokens (capped at `max_tokens`) before attempting to
45/// withdraw.
46///
47/// This is a sans-io state machine: [`Self::try_consume`] takes
48/// the caller's current [`Instant`] and never calls
49/// [`Instant::now`] itself. Tests drive the clock forward by
50/// constructing synthetic instants.
51#[derive(Debug, Clone, Copy)]
52pub struct TokenBucket {
53 tokens: f64,
54 max_tokens: f64,
55 refill_rate_per_sec: f64,
56 last_refill: Instant,
57}
58
59impl TokenBucket {
60 /// Construct a new bucket full of tokens.
61 ///
62 /// - `max_tokens`: burst capacity; the bucket starts with this
63 /// many tokens and refills up to (but never beyond) this
64 /// value.
65 /// - `refill_rate_per_sec`: steady-state refill rate in tokens
66 /// per second. A nominal 20 fps D-STAR voice stream would set
67 /// this to `60.0` (3× nominal) to leave headroom for jitter.
68 /// - `now`: the wall-clock instant at which the bucket is
69 /// constructed, used as the seed for the first refill delta.
70 #[must_use]
71 pub fn new(max_tokens: u32, refill_rate_per_sec: f64, now: Instant) -> Self {
72 let max_tokens = f64::from(max_tokens);
73 Self {
74 tokens: max_tokens,
75 max_tokens,
76 refill_rate_per_sec,
77 last_refill: now,
78 }
79 }
80
81 /// Attempt to withdraw `tokens` from the bucket at time `now`.
82 ///
83 /// Returns `true` if the bucket had enough tokens (withdrawal
84 /// committed); returns `false` otherwise.
85 pub fn try_consume(&mut self, now: Instant, tokens: u32) -> bool {
86 let elapsed = now.saturating_duration_since(self.last_refill);
87 let refill = elapsed.as_secs_f64() * self.refill_rate_per_sec;
88 self.tokens = (self.tokens + refill).min(self.max_tokens);
89 self.last_refill = now;
90
91 let cost = f64::from(tokens);
92 if self.tokens >= cost {
93 self.tokens -= cost;
94 true
95 } else {
96 false
97 }
98 }
99
100 /// Current (refill-approximated) token count, for tests/metrics.
101 #[must_use]
102 pub const fn tokens(&self) -> f64 {
103 self.tokens
104 }
105
106 /// Maximum tokens the bucket can hold.
107 #[must_use]
108 pub const fn max_tokens(&self) -> f64 {
109 self.max_tokens
110 }
111
112 /// Refill rate in tokens per second.
113 #[must_use]
114 pub const fn refill_rate_per_sec(&self) -> f64 {
115 self.refill_rate_per_sec
116 }
117}
118
119/// One entry in [`super::ClientPool`].
120///
121/// Tracks the per-peer server session, its module membership (if
122/// any), the last time we heard from the client, the access policy
123/// the authorizer granted, a running count of send failures so the
124/// fan-out engine can evict unhealthy peers, and a per-client TX
125/// token bucket used to rate-limit how many fan-out voice frames
126/// one client can consume per second.
127#[derive(Debug)]
128pub struct ClientHandle<P: Protocol> {
129 /// Protocol-erased server session state machine.
130 pub session: ServerSessionCore,
131 /// Module the client has linked to, if any.
132 pub module: Option<Module>,
133 /// Last time we received a datagram from this client.
134 pub last_heard: Instant,
135 /// Access policy granted by the authorizer.
136 pub access: AccessPolicy,
137 /// Monotonically increasing count of fan-out send failures.
138 pub send_failure_count: u32,
139 /// Per-client TX token bucket. Each outbound voice frame in
140 /// fan-out consumes one token; when the bucket is empty, the
141 /// frame is dropped for THIS peer (the other peers on the same
142 /// module still receive it). Rate-limited is NOT the same as
143 /// broken — the peer is not marked unhealthy.
144 pub tx_budget: TokenBucket,
145 _protocol: PhantomData<fn() -> P>,
146}
147
148impl<P: Protocol> ClientHandle<P> {
149 /// Construct a new handle for a freshly observed client.
150 ///
151 /// The TX budget is initialized with [`DEFAULT_TX_BUDGET_MAX_TOKENS`]
152 /// capacity and [`DEFAULT_TX_BUDGET_REFILL_PER_SEC`] refill rate.
153 #[must_use]
154 pub fn new(session: ServerSessionCore, access: AccessPolicy, now: Instant) -> Self {
155 Self::new_with_tx_budget(
156 session,
157 access,
158 now,
159 DEFAULT_TX_BUDGET_MAX_TOKENS,
160 DEFAULT_TX_BUDGET_REFILL_PER_SEC,
161 )
162 }
163
164 /// Construct a new handle with a caller-specified TX budget.
165 ///
166 /// Primarily used by tests that need to drive the rate limiter
167 /// past its limit in a single tick without waiting for real
168 /// wall-clock refill.
169 #[must_use]
170 pub fn new_with_tx_budget(
171 session: ServerSessionCore,
172 access: AccessPolicy,
173 now: Instant,
174 max_tokens: u32,
175 refill_rate_per_sec: f64,
176 ) -> Self {
177 Self {
178 session,
179 module: None,
180 last_heard: now,
181 access,
182 send_failure_count: 0,
183 tx_budget: TokenBucket::new(max_tokens, refill_rate_per_sec, now),
184 _protocol: PhantomData,
185 }
186 }
187}
188
189#[cfg(test)]
190mod tests {
191 use super::TokenBucket;
192 use std::time::{Duration, Instant};
193
194 #[test]
195 fn bucket_starts_full() {
196 let now = Instant::now();
197 let bucket = TokenBucket::new(5, 1.0, now);
198 assert!((bucket.tokens() - 5.0).abs() < 1e-9);
199 assert!((bucket.max_tokens() - 5.0).abs() < 1e-9);
200 }
201
202 #[test]
203 fn consume_under_budget_succeeds() {
204 let now = Instant::now();
205 let mut bucket = TokenBucket::new(3, 1.0, now);
206 assert!(bucket.try_consume(now, 1));
207 assert!(bucket.try_consume(now, 1));
208 assert!(bucket.try_consume(now, 1));
209 }
210
211 #[test]
212 fn consume_over_budget_fails() {
213 let now = Instant::now();
214 let mut bucket = TokenBucket::new(1, 1.0, now);
215 assert!(bucket.try_consume(now, 1), "first consume succeeds");
216 assert!(!bucket.try_consume(now, 1), "empty bucket rejects");
217 }
218
219 #[test]
220 fn bucket_refills_over_time() {
221 let start = Instant::now();
222 let mut bucket = TokenBucket::new(1, 1.0, start);
223 assert!(bucket.try_consume(start, 1), "first consume");
224 assert!(!bucket.try_consume(start, 1), "second fails at t0");
225 let t1 = start + Duration::from_secs(1);
226 assert!(bucket.try_consume(t1, 1), "refilled to 1 after 1s");
227 }
228
229 #[test]
230 fn bucket_refill_clamps_to_max_tokens() {
231 let start = Instant::now();
232 let mut bucket = TokenBucket::new(2, 10.0, start);
233 assert!(bucket.try_consume(start, 2));
234 let t1 = start + Duration::from_secs(10);
235 assert!(bucket.try_consume(t1, 2), "bucket refills to max");
236 assert!(!bucket.try_consume(t1, 1), "bucket empty again");
237 }
238
239 #[test]
240 fn consume_zero_tokens_always_succeeds() {
241 let now = Instant::now();
242 let mut bucket = TokenBucket::new(0, 0.0, now);
243 assert!(bucket.try_consume(now, 0), "zero cost always succeeds");
244 }
245
246 #[test]
247 fn regressing_clock_does_not_underflow() {
248 let later = Instant::now();
249 let mut bucket = TokenBucket::new(1, 1.0, later);
250 let earlier = later.checked_sub(Duration::from_secs(1)).unwrap_or(later);
251 assert!(bucket.try_consume(earlier, 1));
252 }
253}