// dstar_gateway_server/client_pool/pool.rs

1//! Concurrent map of currently linked clients per protocol.
2//!
3//! Keyed by `SocketAddr` (the only stable identifier for a UDP
4//! client). Wrapped in [`tokio::sync::Mutex`] so multiple tokio tasks
5//! can update concurrently — this is intentionally simple for Batch
6//! 2; we can swap to a sharded map if contention is observed.
7
8use std::collections::{HashMap, HashSet};
9use std::marker::PhantomData;
10use std::net::SocketAddr;
11use std::time::Instant;
12
13use tokio::sync::Mutex;
14
15use dstar_gateway_core::session::client::Protocol;
16use dstar_gateway_core::types::Module;
17
18use crate::reflector::AccessPolicy;
19
20use super::handle::ClientHandle;
21
/// Default number of consecutive send failures before a client is evicted.
///
/// The failure counter saturates rather than wrapping (see
/// [`ClientPool::mark_unhealthy`]), so comparing against this threshold
/// is safe even after very many failures.
///
/// Hard-coded to `5` in this crate; a follow-up patch will make this
/// configurable via [`crate::ReflectorConfig`].
pub const DEFAULT_UNHEALTHY_THRESHOLD: u32 = 5;
27
/// Outcome of a [`ClientPool::mark_unhealthy`] call.
///
/// Tells the caller whether the peer is still within its failure
/// budget or has crossed the eviction threshold — in which case the
/// fan-out engine should remove the peer and emit a
/// [`ServerEvent::ClientEvicted`] event.
///
/// Marked `#[non_exhaustive]` so new outcomes can be added without a
/// breaking change for downstream matchers.
///
/// [`ServerEvent::ClientEvicted`]: dstar_gateway_core::session::server::ServerEvent::ClientEvicted
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UnhealthyOutcome {
    /// Client is still within the allowed failure budget.
    ///
    /// A `failure_count` of `0` means the peer was not found in the
    /// pool and no increment took place.
    StillHealthy {
        /// Running count of send failures after this increment.
        failure_count: u32,
    },
    /// Client has exceeded the threshold and should be evicted.
    ShouldEvict {
        /// Running count of send failures after this increment.
        failure_count: u32,
    },
}
50
/// Concurrent pool of linked clients for one [`Protocol`].
///
/// Provides `async` access to the underlying map via an internal
/// [`tokio::sync::Mutex`]. The reverse index (`by_module`) is kept in
/// lockstep with the primary map so fan-out can enumerate members of
/// a module in O(module members) without scanning every client.
#[derive(Debug)]
pub struct ClientPool<P: Protocol> {
    /// Forward map: peer address → live client handle.
    clients: Mutex<HashMap<SocketAddr, ClientHandle<P>>>,
    /// Reverse index: module → peers attached to it. Lock order is
    /// always `clients` before `by_module` whenever both are involved.
    by_module: Mutex<HashMap<Module, HashSet<SocketAddr>>>,
    /// Marker only — the pool never stores a `P`. `fn() -> P` keeps
    /// the pool `Send + Sync` regardless of whether `P` itself is.
    _protocol: PhantomData<fn() -> P>,
}
63
64impl<P: Protocol> Default for ClientPool<P> {
65    fn default() -> Self {
66        Self::new()
67    }
68}
69
70impl<P: Protocol> ClientPool<P> {
71    /// Create an empty pool.
72    #[must_use]
73    pub fn new() -> Self {
74        Self {
75            clients: Mutex::new(HashMap::new()),
76            by_module: Mutex::new(HashMap::new()),
77            _protocol: PhantomData,
78        }
79    }
80
81    /// Insert a client handle keyed by peer address.
82    ///
83    /// If the handle already has a module set, the reverse index is
84    /// updated to include this peer under that module.
85    ///
86    /// # Cancellation safety
87    ///
88    /// This method is **not** cancel-safe. It takes two internal
89    /// mutex locks in sequence: dropping the future between the first
90    /// and second `lock().await` leaves the forward map updated but
91    /// the reverse module index stale. The only correct recovery is
92    /// to call [`Self::remove`] for the same peer and retry.
93    pub async fn insert(&self, peer: SocketAddr, handle: ClientHandle<P>) {
94        let module = handle.module;
95        let mut clients = self.clients.lock().await;
96        // Ignore the prior-entry return value: if a stale handle was
97        // replaced, the reflector has no use for it.
98        drop(clients.insert(peer, handle));
99        drop(clients);
100        if let Some(module) = module {
101            let mut index = self.by_module.lock().await;
102            let _ = index.entry(module).or_default().insert(peer);
103        }
104    }
105
106    /// Remove a client by peer address, returning the handle if present.
107    ///
108    /// # Cancellation safety
109    ///
110    /// This method is **not** cancel-safe for the same reason as
111    /// [`Self::insert`] — it touches the forward and reverse maps in
112    /// sequence and cancellation between the two awaits leaves a
113    /// stale module-index entry.
114    pub async fn remove(&self, peer: &SocketAddr) -> Option<ClientHandle<P>> {
115        let mut clients = self.clients.lock().await;
116        let handle = clients.remove(peer)?;
117        drop(clients);
118        if let Some(module) = handle.module {
119            let mut index = self.by_module.lock().await;
120            if let Some(set) = index.get_mut(&module) {
121                let _ = set.remove(peer);
122                if set.is_empty() {
123                    drop(index.remove(&module));
124                }
125            }
126        }
127        Some(handle)
128    }
129
130    /// Attach a peer to a module, updating both the handle and the reverse index.
131    ///
132    /// No-op if the peer is not in the pool. If the peer was already
133    /// attached to a different module, the old entry is removed.
134    ///
135    /// # Cancellation safety
136    ///
137    /// This method is **not** cancel-safe. See [`Self::insert`] for
138    /// the dual-lock sequencing rationale.
139    pub async fn set_module(&self, peer: &SocketAddr, module: Module) {
140        let mut clients = self.clients.lock().await;
141        let Some(handle) = clients.get_mut(peer) else {
142            return;
143        };
144        let previous_module = handle.module;
145        handle.module = Some(module);
146        drop(clients);
147        let mut index = self.by_module.lock().await;
148        if let Some(prev) = previous_module
149            && prev != module
150            && let Some(set) = index.get_mut(&prev)
151        {
152            let _ = set.remove(peer);
153            if set.is_empty() {
154                drop(index.remove(&prev));
155            }
156        }
157        let _ = index.entry(module).or_default().insert(*peer);
158    }
159
160    /// Enumerate the peers currently linked to the given module.
161    ///
162    /// # Cancellation safety
163    ///
164    /// This method is cancel-safe. It acquires a single mutex lock
165    /// and clones the membership set; dropping the future either
166    /// before or after the clone leaves the pool state unchanged.
167    pub async fn members_of_module(&self, module: Module) -> Vec<SocketAddr> {
168        let index = self.by_module.lock().await;
169        index
170            .get(&module)
171            .map(|set| set.iter().copied().collect())
172            .unwrap_or_default()
173    }
174
175    /// Number of clients currently in the pool.
176    ///
177    /// # Cancellation safety
178    ///
179    /// This method is cancel-safe. It takes a single mutex lock and
180    /// reads a `usize`; no mutation occurs.
181    pub async fn len(&self) -> usize {
182        self.clients.lock().await.len()
183    }
184
185    /// Whether the pool is empty.
186    ///
187    /// # Cancellation safety
188    ///
189    /// This method is cancel-safe. See [`Self::len`].
190    pub async fn is_empty(&self) -> bool {
191        self.clients.lock().await.is_empty()
192    }
193
194    /// Whether the pool contains a handle for the given peer.
195    ///
196    /// # Cancellation safety
197    ///
198    /// This method is cancel-safe. See [`Self::len`].
199    pub async fn contains(&self, peer: &SocketAddr) -> bool {
200        self.clients.lock().await.contains_key(peer)
201    }
202
203    /// Increment the send-failure counter on a peer and report
204    /// whether the peer should now be evicted.
205    ///
206    /// Returns [`UnhealthyOutcome::StillHealthy`] with `failure_count`
207    /// `0` if the peer is not present — callers should treat that as
208    /// "no increment happened" rather than "counter reset".
209    ///
210    /// The eviction threshold is [`DEFAULT_UNHEALTHY_THRESHOLD`]. Once
211    /// `failure_count >= threshold` the return value switches to
212    /// [`UnhealthyOutcome::ShouldEvict`] and stays there on every
213    /// subsequent increment until the caller actually removes the
214    /// peer.
215    ///
216    /// # Cancellation safety
217    ///
218    /// This method is cancel-safe. Only a single mutex lock is taken
219    /// and the mutation is committed before `drop(clients)` releases
220    /// the lock; dropping the future after that point has no effect
221    /// on pool state.
222    pub async fn mark_unhealthy(&self, peer: &SocketAddr) -> UnhealthyOutcome {
223        let mut clients = self.clients.lock().await;
224        let count = match clients.get_mut(peer) {
225            Some(handle) => {
226                handle.send_failure_count = handle.send_failure_count.saturating_add(1);
227                handle.send_failure_count
228            }
229            None => 0,
230        };
231        drop(clients);
232        if count >= DEFAULT_UNHEALTHY_THRESHOLD {
233            UnhealthyOutcome::ShouldEvict {
234                failure_count: count,
235            }
236        } else {
237            UnhealthyOutcome::StillHealthy {
238                failure_count: count,
239            }
240        }
241    }
242
243    /// Record that we just received a datagram from the given peer.
244    ///
245    /// # Cancellation safety
246    ///
247    /// This method is cancel-safe. See [`Self::mark_unhealthy`].
248    pub async fn record_last_heard(&self, peer: &SocketAddr, now: Instant) {
249        let mut clients = self.clients.lock().await;
250        if let Some(handle) = clients.get_mut(peer) {
251            handle.last_heard = now;
252        }
253    }
254
255    /// Look up the module a peer is currently linked to, if any.
256    ///
257    /// # Cancellation safety
258    ///
259    /// This method is cancel-safe. See [`Self::len`].
260    pub async fn module_of(&self, peer: &SocketAddr) -> Option<Module> {
261        let clients = self.clients.lock().await;
262        clients.get(peer).and_then(|handle| handle.module)
263    }
264
265    /// Look up the [`AccessPolicy`] for a peer, if any.
266    ///
267    /// Returns [`None`] if the peer is not in the pool. Used by the
268    /// endpoint to gate voice-frame forwarding for [`AccessPolicy::ReadOnly`]
269    /// clients.
270    ///
271    /// # Cancellation safety
272    ///
273    /// This method is cancel-safe. See [`Self::len`].
274    pub async fn access_of(&self, peer: &SocketAddr) -> Option<AccessPolicy> {
275        let clients = self.clients.lock().await;
276        clients.get(peer).map(|handle| handle.access)
277    }
278
279    /// Attempt to consume one TX-budget token from the given peer.
280    ///
281    /// Returns `true` on success (the fan-out engine may send this
282    /// frame to the peer); returns `false` if the bucket is empty
283    /// (the frame must be dropped for this peer). Returns `false`
284    /// if the peer is not in the pool — no handle means no budget.
285    ///
286    /// The `now` argument is the caller's injected wall-clock
287    /// instant and is used to drive the token bucket's refill
288    /// bookkeeping. The method never calls [`Instant::now`] itself,
289    /// matching the sans-io rate-limiter pattern.
290    ///
291    /// # Cancellation safety
292    ///
293    /// This method is cancel-safe. Only a single mutex lock is
294    /// taken and the bucket mutation is committed before the lock
295    /// is released.
296    pub async fn try_consume_tx_token(&self, peer: &SocketAddr, now: Instant) -> bool {
297        let mut clients = self.clients.lock().await;
298        match clients.get_mut(peer) {
299            Some(handle) => handle.tx_budget.try_consume(now, 1),
300            None => false,
301        }
302    }
303
304    /// Run a closure with exclusive access to a peer's handle.
305    ///
306    /// Holds the internal `Mutex` for the duration of the closure;
307    /// keep the body short and avoid blocking operations inside it.
308    /// Returns `None` if no handle exists for the given peer (the
309    /// closure is not invoked in that case).
310    ///
311    /// # Cancellation safety
312    ///
313    /// This method is cancel-safe **before** the lock is acquired
314    /// and **after** the closure returns. While the closure is
315    /// executing it runs synchronously under the lock, so cancellation
316    /// during `f` is not possible. Do not call `.await` inside `f`
317    /// or this guarantee is lost.
318    pub async fn with_handle_mut<F, R>(&self, peer: &SocketAddr, f: F) -> Option<R>
319    where
320        F: FnOnce(&mut ClientHandle<P>) -> R,
321    {
322        let mut clients = self.clients.lock().await;
323        clients.get_mut(peer).map(f)
324    }
325}
326
#[cfg(test)]
mod tests {
    use super::{ClientHandle, ClientPool, Instant, Module, SocketAddr};
    use crate::reflector::AccessPolicy;
    use dstar_gateway_core::ServerSessionCore;
    use dstar_gateway_core::session::client::DExtra;
    use dstar_gateway_core::types::ProtocolKind;
    use std::net::{IpAddr, Ipv4Addr};

    /// Loopback peer address on the given port.
    fn peer(port: u16) -> SocketAddr {
        SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port)
    }

    /// Fresh read-write DExtra handle for the loopback peer on `port`.
    fn fresh_handle(port: u16) -> ClientHandle<DExtra> {
        let core = ServerSessionCore::new(ProtocolKind::DExtra, peer(port), Module::C);
        ClientHandle::new(core, AccessPolicy::ReadWrite, Instant::now())
    }

    #[tokio::test]
    async fn insert_and_contains() {
        let pool = ClientPool::<DExtra>::new();
        let addr = peer(30001);
        assert_eq!(pool.len().await, 0);
        assert!(!pool.contains(&addr).await);

        pool.insert(addr, fresh_handle(30001)).await;
        assert!(pool.contains(&addr).await);
        assert_eq!(pool.len().await, 1);
    }

    #[tokio::test]
    async fn remove_returns_handle() {
        let pool = ClientPool::<DExtra>::new();
        let addr = peer(30001);
        pool.insert(addr, fresh_handle(30001)).await;
        assert!(pool.remove(&addr).await.is_some());
        assert_eq!(pool.len().await, 0);
        // A repeated remove finds nothing.
        assert!(pool.remove(&addr).await.is_none());
    }

    #[tokio::test]
    async fn set_module_populates_reverse_index() {
        let pool = ClientPool::<DExtra>::new();
        for port in [30001_u16, 30002] {
            pool.insert(peer(port), fresh_handle(port)).await;
            pool.set_module(&peer(port), Module::C).await;
        }

        let members = pool.members_of_module(Module::C).await;
        assert_eq!(members.len(), 2);
        for port in [30001_u16, 30002] {
            assert!(members.contains(&peer(port)));
        }
    }

    #[tokio::test]
    async fn set_module_moves_peer_between_modules() {
        let pool = ClientPool::<DExtra>::new();
        let addr = peer(30001);
        pool.insert(addr, fresh_handle(30001)).await;
        pool.set_module(&addr, Module::C).await;
        pool.set_module(&addr, Module::D).await;

        assert!(pool.members_of_module(Module::C).await.is_empty());
        assert_eq!(pool.members_of_module(Module::D).await, vec![addr]);
    }

    #[tokio::test]
    async fn members_of_empty_module_is_empty() {
        let pool = ClientPool::<DExtra>::new();
        let members = pool.members_of_module(Module::Z).await;
        assert!(members.is_empty());
    }

    #[tokio::test]
    async fn mark_unhealthy_increments_counter() {
        let pool = ClientPool::<DExtra>::new();
        let addr = peer(30001);
        pool.insert(addr, fresh_handle(30001)).await;
        for expected in 1_u32..=3 {
            assert_eq!(
                pool.mark_unhealthy(&addr).await,
                super::UnhealthyOutcome::StillHealthy {
                    failure_count: expected
                }
            );
        }
    }

    #[tokio::test]
    async fn mark_unhealthy_missing_peer_is_zero() {
        let pool = ClientPool::<DExtra>::new();
        let outcome = pool.mark_unhealthy(&peer(30001)).await;
        assert_eq!(
            outcome,
            super::UnhealthyOutcome::StillHealthy { failure_count: 0 }
        );
    }

    #[tokio::test]
    async fn mark_unhealthy_threshold_triggers_eviction() {
        let pool = ClientPool::<DExtra>::new();
        let addr = peer(30001);
        pool.insert(addr, fresh_handle(30001)).await;
        // 4 increments stay within budget.
        for expected in 1_u32..=4 {
            assert_eq!(
                pool.mark_unhealthy(&addr).await,
                super::UnhealthyOutcome::StillHealthy {
                    failure_count: expected
                }
            );
        }
        // 5th increment fires eviction.
        assert!(matches!(
            pool.mark_unhealthy(&addr).await,
            super::UnhealthyOutcome::ShouldEvict { failure_count: 5 }
        ));
    }

    #[tokio::test]
    async fn record_last_heard_updates_timestamp() {
        let pool = ClientPool::<DExtra>::new();
        let addr = peer(30001);
        pool.insert(addr, fresh_handle(30001)).await;
        let later = Instant::now() + std::time::Duration::from_secs(5);
        pool.record_last_heard(&addr, later).await;
        // No crash + no state leak = pass. We can't read last_heard
        // without exposing it, which is intentional.
    }

    #[tokio::test]
    async fn remove_clears_reverse_index() {
        let pool = ClientPool::<DExtra>::new();
        let addr = peer(30001);
        pool.insert(addr, fresh_handle(30001)).await;
        pool.set_module(&addr, Module::C).await;
        drop(pool.remove(&addr).await);
        assert!(pool.members_of_module(Module::C).await.is_empty());
    }

    #[tokio::test]
    async fn module_of_returns_assigned_module() {
        let pool = ClientPool::<DExtra>::new();
        let addr = peer(30001);
        pool.insert(addr, fresh_handle(30001)).await;
        assert!(pool.module_of(&addr).await.is_none());
        pool.set_module(&addr, Module::C).await;
        assert_eq!(pool.module_of(&addr).await, Some(Module::C));
    }

    #[tokio::test]
    async fn module_of_missing_peer_is_none() {
        let pool = ClientPool::<DExtra>::new();
        assert!(pool.module_of(&peer(30001)).await.is_none());
    }
}