dstar_gateway_server/tokio_shell/
fanout.rs

1//! Fan-out engine for voice frames.
2//!
3//! The reflector's job on every inbound voice packet is to re-send
4//! the same bytes to every other client currently linked to the
5//! originator's module. This file implements that "encode once, send
6//! N times" loop.
7//!
//! This module handles **same-protocol** fan-out only —
//! [`fan_out_voice`] re-sends the raw inbound bytes verbatim to every
//! peer on the same module. Cross-protocol forwarding (re-encoding bytes
11//! from one protocol into another via [`super::transcode::transcode_voice`])
12//! is handled separately by the endpoint run loop's broadcast-channel
13//! subscriber path in [`super::endpoint::ProtocolEndpoint::run`].
14
15use std::net::SocketAddr;
16use std::time::Instant;
17
18use tokio::net::UdpSocket;
19
20use dstar_gateway_core::session::client::Protocol;
21use dstar_gateway_core::types::{Module, ProtocolKind};
22
23use crate::client_pool::{ClientPool, UnhealthyOutcome};
24use crate::tokio_shell::endpoint::ShellError;
25
/// Outcome of a single [`fan_out_voice`] pass.
///
/// For now the only thing reported is which peers crossed the
/// unhealthy threshold during this pass. The caller — the endpoint's
/// run loop — is expected to drop each of them from the pool and emit
/// a `ServerEvent::ClientEvicted` event.
#[derive(Clone, Debug, Default)]
pub struct FanOutReport {
    /// Addresses of peers that reached the eviction threshold during
    /// this fan-out pass, in the order they were detected.
    pub evicted: Vec<SocketAddr>,
}
37
38/// Fan out the raw wire bytes of a voice frame to every other
39/// client on the same module.
40///
41/// The originator (identified by `from`) is filtered out of the
42/// recipient list so the reflector never echoes audio back to the
43/// client that sent it.
44///
45/// Individual send failures are logged and the offending peer is
46/// marked unhealthy on the client pool; the fan-out loop continues
47/// through the rest of the module membership. Peers that cross the
48/// [`crate::client_pool::DEFAULT_UNHEALTHY_THRESHOLD`] are recorded
49/// in the returned [`FanOutReport::evicted`] list and the caller is
50/// responsible for removing them from the pool. The function only
51/// returns `Err` if a truly fatal condition occurs — currently none,
52/// so the `Result` is reserved for future fatal conditions.
53///
54/// # Errors
55///
56/// Reserved for future fatal conditions (e.g. cross-protocol
57/// re-encode errors). The current DExtra-only implementation never
58/// returns `Err`.
59///
60/// # Cancellation safety
61///
62/// This function is **not** cancel-safe. It iterates the module
63/// membership list and calls `socket.send_to` for each peer in
64/// sequence; cancelling the future mid-iteration leaves some peers
65/// delivered and others silently skipped, which will make it look
66/// like the skipped peers are missing frames. The endpoint run loop
67/// is the only expected caller and it awaits this function to
68/// completion per datagram.
69pub async fn fan_out_voice<P: Protocol>(
70    socket: &UdpSocket,
71    clients: &ClientPool<P>,
72    from: SocketAddr,
73    module: Module,
74    _protocol: ProtocolKind,
75    bytes: &[u8],
76) -> Result<FanOutReport, ShellError> {
77    fan_out_voice_at(socket, clients, from, module, bytes, Instant::now()).await
78}
79
80/// Same as [`fan_out_voice`], but takes an injected `now: Instant`
81/// for deterministic rate-limiter testing.
82///
83/// Production callers use [`fan_out_voice`] which samples the
84/// wall clock itself; tests can drive this variant to advance the
85/// token bucket's refill clock without waiting for real time.
86///
87/// # Errors
88///
89/// Same as [`fan_out_voice`]: reserved for future fatal conditions.
90pub async fn fan_out_voice_at<P: Protocol>(
91    socket: &UdpSocket,
92    clients: &ClientPool<P>,
93    from: SocketAddr,
94    module: Module,
95    bytes: &[u8],
96    now: Instant,
97) -> Result<FanOutReport, ShellError> {
98    let mut report = FanOutReport::default();
99    let members = clients.members_of_module(module).await;
100    for peer in members.iter().copied().filter(|p| *p != from) {
101        // Fix 5: consult the per-client TX token bucket BEFORE the
102        // kernel send. On empty-bucket, drop the frame for THIS
103        // peer — rate-limited is not the same as broken, so we do
104        // NOT mark_unhealthy here. Other peers on the same module
105        // still receive the frame.
106        if !clients.try_consume_tx_token(&peer, now).await {
107            tracing::debug!(
108                ?peer,
109                "fan-out rate-limited: TX budget exhausted, dropping frame for peer"
110            );
111            continue;
112        }
113        if let Err(e) = socket.send_to(bytes, peer).await {
114            tracing::warn!(?peer, ?e, "fan-out send_to failed");
115            match clients.mark_unhealthy(&peer).await {
116                UnhealthyOutcome::ShouldEvict { failure_count } => {
117                    tracing::warn!(
118                        ?peer,
119                        failure_count,
120                        "fan-out send failure threshold exceeded; evicting peer"
121                    );
122                    report.evicted.push(peer);
123                }
124                UnhealthyOutcome::StillHealthy { .. } => {}
125            }
126        }
127    }
128    Ok(report)
129}
130
#[cfg(test)]
mod tests {
    use super::fan_out_voice;
    use crate::client_pool::{
        ClientHandle, ClientPool, DEFAULT_UNHEALTHY_THRESHOLD, UnhealthyOutcome,
    };
    use crate::reflector::AccessPolicy;
    use dstar_gateway_core::ServerSessionCore;
    use dstar_gateway_core::session::client::DExtra;
    use dstar_gateway_core::types::{Module, ProtocolKind};
    use std::net::SocketAddr;
    use std::time::Instant;
    use tokio::net::UdpSocket;

    type TestResult = Result<(), Box<dyn std::error::Error>>;

    /// Bind a loopback UDP socket on an ephemeral port and return it
    /// together with its resolved local address.
    async fn bound_socket() -> Result<(std::sync::Arc<UdpSocket>, SocketAddr), std::io::Error> {
        let sock = UdpSocket::bind("127.0.0.1:0").await?;
        let addr = sock.local_addr()?;
        Ok((std::sync::Arc::new(sock), addr))
    }

    /// Build a read-write DExtra client handle for `peer` with the
    /// default TX budget.
    fn fresh_handle(peer: SocketAddr) -> ClientHandle<DExtra> {
        let core = ServerSessionCore::new(ProtocolKind::DExtra, peer, Module::C);
        ClientHandle::new(core, AccessPolicy::ReadWrite, Instant::now())
    }

    #[tokio::test]
    async fn fan_out_with_one_client_sends_nothing() -> TestResult {
        let pool = ClientPool::<DExtra>::new();
        let (sock, addr) = bound_socket().await?;
        pool.insert(addr, fresh_handle(addr)).await;
        pool.set_module(&addr, Module::C).await;

        // No other members — fan_out_voice returns Ok and sends no
        // datagrams. We test the Ok-path here; the "no send" side is
        // implicit because there's nobody to receive.
        let result = fan_out_voice(
            sock.as_ref(),
            &pool,
            addr,
            Module::C,
            ProtocolKind::DExtra,
            b"hello",
        )
        .await?;
        assert!(result.evicted.is_empty());
        Ok(())
    }

    #[tokio::test]
    async fn fan_out_to_two_peers_delivers_bytes() -> TestResult {
        // Bind three loopback sockets. A's socket plays the
        // reflector's send socket, and A is also registered as the
        // originating client; B and C are receivers whose sockets we
        // use only to observe delivery.
        let pool = ClientPool::<DExtra>::new();
        let (sock_a, addr_a) = bound_socket().await?;
        let sock_b = UdpSocket::bind("127.0.0.1:0").await?;
        let addr_b = sock_b.local_addr()?;
        let sock_c = UdpSocket::bind("127.0.0.1:0").await?;
        let addr_c = sock_c.local_addr()?;

        pool.insert(addr_a, fresh_handle(addr_a)).await;
        pool.insert(addr_b, fresh_handle(addr_b)).await;
        pool.insert(addr_c, fresh_handle(addr_c)).await;
        pool.set_module(&addr_a, Module::C).await;
        pool.set_module(&addr_b, Module::C).await;
        pool.set_module(&addr_c, Module::C).await;

        // Fan-out from A's socket, originating peer is A.
        let report = fan_out_voice(
            sock_a.as_ref(),
            &pool,
            addr_a,
            Module::C,
            ProtocolKind::DExtra,
            b"voicebits",
        )
        .await?;
        assert!(report.evicted.is_empty(), "no peers evicted on happy path");

        // B and C both received the payload. A did not (filter).
        let mut buf_b = [0u8; 64];
        let (n_b, src_b) = tokio::time::timeout(
            std::time::Duration::from_millis(500),
            sock_b.recv_from(&mut buf_b),
        )
        .await??;
        assert_eq!(src_b, addr_a);
        assert_eq!(&buf_b[..n_b], b"voicebits");

        let mut buf_c = [0u8; 64];
        let (n_c, src_c) = tokio::time::timeout(
            std::time::Duration::from_millis(500),
            sock_c.recv_from(&mut buf_c),
        )
        .await??;
        assert_eq!(src_c, addr_a);
        assert_eq!(&buf_c[..n_c], b"voicebits");
        Ok(())
    }

    // ─── Unhealthy-client eviction ────────────────────────────────
    #[tokio::test]
    async fn fan_out_reports_evicted_peer_after_threshold() -> TestResult {
        // Two peers on the same module: A is the originator, B is the
        // peer that will be evicted.
        //
        // This test exercises the eviction bookkeeping directly. It
        // drives B's failure counter through `mark_unhealthy` (the same
        // call fan_out_voice makes on each send failure), removes B the
        // way the endpoint run loop would, and then checks that a
        // post-eviction fan-out targets nobody. The send-failure path
        // through fan_out_voice itself is covered by
        // `fan_out_reports_evicted_peer_on_send_failure`.
        let pool = ClientPool::<DExtra>::new();
        let (sock_a, addr_a) = bound_socket().await?;
        let sock_b = UdpSocket::bind("127.0.0.1:0").await?;
        let addr_b = sock_b.local_addr()?;

        pool.insert(addr_a, fresh_handle(addr_a)).await;
        pool.insert(addr_b, fresh_handle(addr_b)).await;
        pool.set_module(&addr_a, Module::C).await;
        pool.set_module(&addr_b, Module::C).await;

        // Prime the failure counter to threshold-1 by calling
        // mark_unhealthy directly. A send_to to a merely closed
        // loopback port succeeds (the kernel silently drops the
        // datagram), so it cannot be used to provoke failures here.
        for _ in 0..DEFAULT_UNHEALTHY_THRESHOLD - 1 {
            let _outcome = pool.mark_unhealthy(&addr_b).await;
        }

        // One more mark_unhealthy trips the threshold — this is exactly
        // what fan_out_voice does on the Nth send failure. Confirm the
        // outcome reports ShouldEvict.
        let outcome = pool.mark_unhealthy(&addr_b).await;
        assert!(matches!(outcome, UnhealthyOutcome::ShouldEvict { .. }));

        // Drop B from the pool the way the endpoint's run loop
        // would after seeing ShouldEvict.
        let removed = pool.remove(&addr_b).await;
        assert!(removed.is_some(), "evicted peer must be removable");
        assert!(
            !pool.contains(&addr_b).await,
            "evicted peer is no longer in the pool"
        );

        // Fan-out after eviction must target zero peers (A is the
        // originator and is filtered out; B is gone).
        let report = fan_out_voice(
            sock_a.as_ref(),
            &pool,
            addr_a,
            Module::C,
            ProtocolKind::DExtra,
            b"post-evict",
        )
        .await?;
        assert!(report.evicted.is_empty());
        Ok(())
    }

    #[tokio::test]
    async fn fan_out_reports_evicted_peer_on_send_failure() -> TestResult {
        // A destination port of 0 is rejected by the kernel at send
        // time (EINVAL on Linux, EADDRNOTAVAIL on BSD/macOS), so every
        // send_to to this peer fails deterministically. A merely closed
        // port would not work: UDP loopback accepts those sends and
        // silently drops the datagram.
        let pool = ClientPool::<DExtra>::new();
        let (sock_a, addr_a) = bound_socket().await?;
        let dead_addr = SocketAddr::from(([127, 0, 0, 1], 0));

        pool.insert(addr_a, fresh_handle(addr_a)).await;
        // Give the failing peer a generous TX budget so the rate limiter
        // never short-circuits the send attempts this test is counting.
        let dead_core = ServerSessionCore::new(ProtocolKind::DExtra, dead_addr, Module::C);
        let dead_handle = ClientHandle::<DExtra>::new_with_tx_budget(
            dead_core,
            AccessPolicy::ReadWrite,
            Instant::now(),
            64,
            1.0,
        );
        pool.insert(dead_addr, dead_handle).await;
        pool.set_module(&addr_a, Module::C).await;
        pool.set_module(&dead_addr, Module::C).await;

        // Each fan-out fails once for the dead peer; the pass that hits
        // the threshold must report it for eviction.
        let mut last_evicted = Vec::new();
        for _ in 0..DEFAULT_UNHEALTHY_THRESHOLD {
            let report = fan_out_voice(
                sock_a.as_ref(),
                &pool,
                addr_a,
                Module::C,
                ProtocolKind::DExtra,
                b"doomed",
            )
            .await?;
            last_evicted = report.evicted;
        }
        assert_eq!(
            last_evicted,
            vec![dead_addr],
            "peer whose sends keep failing must be reported once the threshold is reached"
        );
        Ok(())
    }

    // ─── Per-client TX token-bucket rate limiting ─────────────────
    #[tokio::test]
    async fn fan_out_rate_limits_peer_when_tx_budget_exhausted() -> TestResult {
        // Peer B has a 1-token bucket with 1 token/sec refill. The
        // first frame goes through; the second (same instant) is
        // dropped for B. Other peers on the same module would still
        // receive the frame — here we only have one other peer to
        // keep the test focused on the rate-limit mechanism.
        let pool = ClientPool::<DExtra>::new();
        let (sock_a, addr_a) = bound_socket().await?;
        let sock_b = UdpSocket::bind("127.0.0.1:0").await?;
        let addr_b = sock_b.local_addr()?;
        let now = Instant::now();

        pool.insert(addr_a, fresh_handle(addr_a)).await;
        // Insert B with a tight 1-token budget.
        let b_core = ServerSessionCore::new(ProtocolKind::DExtra, addr_b, Module::C);
        let b_handle = ClientHandle::<DExtra>::new_with_tx_budget(
            b_core,
            AccessPolicy::ReadWrite,
            now,
            1,
            1.0,
        );
        pool.insert(addr_b, b_handle).await;
        pool.set_module(&addr_a, Module::C).await;
        pool.set_module(&addr_b, Module::C).await;

        // First frame: B's bucket has 1 token, consume succeeds, send goes.
        let report =
            super::fan_out_voice_at(sock_a.as_ref(), &pool, addr_a, Module::C, b"frame1", now)
                .await?;
        assert!(report.evicted.is_empty());

        // B must have received the first frame.
        let mut buf_b1 = [0u8; 64];
        let (n1, _src) = tokio::time::timeout(
            std::time::Duration::from_millis(200),
            sock_b.recv_from(&mut buf_b1),
        )
        .await??;
        assert_eq!(&buf_b1[..n1], b"frame1");

        // Second frame at the SAME instant: B's bucket is empty, so
        // we skip the send_to for B. Rate-limited is NOT unhealthy,
        // so no eviction.
        let report =
            super::fan_out_voice_at(sock_a.as_ref(), &pool, addr_a, Module::C, b"frame2", now)
                .await?;
        assert!(report.evicted.is_empty(), "rate-limited != unhealthy");

        // B did NOT receive frame2 within the deadline.
        let mut buf_b2 = [0u8; 64];
        let r = tokio::time::timeout(
            std::time::Duration::from_millis(100),
            sock_b.recv_from(&mut buf_b2),
        )
        .await;
        assert!(
            r.is_err(),
            "rate-limited peer must not receive the second frame"
        );

        // Advance the clock by 1 second — bucket refills by 1 token.
        let later = now + std::time::Duration::from_secs(1);
        let _report3 =
            super::fan_out_voice_at(sock_a.as_ref(), &pool, addr_a, Module::C, b"frame3", later)
                .await?;
        let mut buf_b3 = [0u8; 64];
        let (n3, _src) = tokio::time::timeout(
            std::time::Duration::from_millis(200),
            sock_b.recv_from(&mut buf_b3),
        )
        .await??;
        assert_eq!(&buf_b3[..n3], b"frame3", "refilled bucket delivers frame3");
        Ok(())
    }

    #[tokio::test]
    async fn fan_out_skips_other_modules() -> TestResult {
        let pool = ClientPool::<DExtra>::new();
        let (sock_a, addr_a) = bound_socket().await?;
        let sock_b = UdpSocket::bind("127.0.0.1:0").await?;
        let addr_b = sock_b.local_addr()?;

        pool.insert(addr_a, fresh_handle(addr_a)).await;
        pool.insert(addr_b, fresh_handle(addr_b)).await;
        pool.set_module(&addr_a, Module::C).await;
        pool.set_module(&addr_b, Module::D).await;

        let report = fan_out_voice(
            sock_a.as_ref(),
            &pool,
            addr_a,
            Module::C,
            ProtocolKind::DExtra,
            b"c-only",
        )
        .await?;
        assert!(report.evicted.is_empty());

        let mut buf = [0u8; 64];
        let result = tokio::time::timeout(
            std::time::Duration::from_millis(100),
            sock_b.recv_from(&mut buf),
        )
        .await;
        assert!(
            result.is_err(),
            "peer on different module must not receive fan-out"
        );
        Ok(())
    }
}