dstar_gateway_server/tokio_shell/
fanout.rs1use std::net::SocketAddr;
16use std::time::Instant;
17
18use tokio::net::UdpSocket;
19
20use dstar_gateway_core::session::client::Protocol;
21use dstar_gateway_core::types::{Module, ProtocolKind};
22
23use crate::client_pool::{ClientPool, UnhealthyOutcome};
24use crate::tokio_shell::endpoint::ShellError;
25
/// Outcome of a single fan-out pass.
///
/// Returned by [`fan_out_voice`] and [`fan_out_voice_at`]. The default value
/// carries an empty eviction list.
#[derive(Clone, Debug, Default)]
pub struct FanOutReport {
    /// Peers for which `ClientPool::mark_unhealthy` answered
    /// `UnhealthyOutcome::ShouldEvict` during this pass, in the order they
    /// were encountered. The fan-out functions only record these addresses.
    pub evicted: Vec<SocketAddr>,
}
37
38pub async fn fan_out_voice<P: Protocol>(
70 socket: &UdpSocket,
71 clients: &ClientPool<P>,
72 from: SocketAddr,
73 module: Module,
74 _protocol: ProtocolKind,
75 bytes: &[u8],
76) -> Result<FanOutReport, ShellError> {
77 fan_out_voice_at(socket, clients, from, module, bytes, Instant::now()).await
78}
79
80pub async fn fan_out_voice_at<P: Protocol>(
91 socket: &UdpSocket,
92 clients: &ClientPool<P>,
93 from: SocketAddr,
94 module: Module,
95 bytes: &[u8],
96 now: Instant,
97) -> Result<FanOutReport, ShellError> {
98 let mut report = FanOutReport::default();
99 let members = clients.members_of_module(module).await;
100 for peer in members.iter().copied().filter(|p| *p != from) {
101 if !clients.try_consume_tx_token(&peer, now).await {
107 tracing::debug!(
108 ?peer,
109 "fan-out rate-limited: TX budget exhausted, dropping frame for peer"
110 );
111 continue;
112 }
113 if let Err(e) = socket.send_to(bytes, peer).await {
114 tracing::warn!(?peer, ?e, "fan-out send_to failed");
115 match clients.mark_unhealthy(&peer).await {
116 UnhealthyOutcome::ShouldEvict { failure_count } => {
117 tracing::warn!(
118 ?peer,
119 failure_count,
120 "fan-out send failure threshold exceeded; evicting peer"
121 );
122 report.evicted.push(peer);
123 }
124 UnhealthyOutcome::StillHealthy { .. } => {}
125 }
126 }
127 }
128 Ok(report)
129}
130
#[cfg(test)]
mod tests {
    //! Loopback tests for the voice fan-out: every "client" is a real UDP
    //! socket bound to an ephemeral port on 127.0.0.1.

    use super::{fan_out_voice, fan_out_voice_at};
    use crate::client_pool::{
        ClientHandle, ClientPool, DEFAULT_UNHEALTHY_THRESHOLD, UnhealthyOutcome,
    };
    use crate::reflector::AccessPolicy;
    use dstar_gateway_core::ServerSessionCore;
    use dstar_gateway_core::session::client::DExtra;
    use dstar_gateway_core::types::{Module, ProtocolKind};
    use std::net::SocketAddr;
    use std::sync::Arc;
    use std::time::{Duration, Instant};
    use tokio::net::UdpSocket;

    type TestResult = Result<(), Box<dyn std::error::Error>>;

    /// Bind a UDP socket to an ephemeral loopback port and return it together
    /// with the address the OS assigned.
    async fn bound_socket() -> Result<(Arc<UdpSocket>, SocketAddr), std::io::Error> {
        let sock = UdpSocket::bind("127.0.0.1:0").await?;
        let addr = sock.local_addr()?;
        Ok((Arc::new(sock), addr))
    }

    /// Build a read-write `DExtra` client handle for `peer`, created "now".
    ///
    /// The session core is always constructed with `Module::C`; tests that
    /// need another module override it afterwards via `ClientPool::set_module`.
    fn fresh_handle(peer: SocketAddr) -> ClientHandle<DExtra> {
        let core = ServerSessionCore::new(ProtocolKind::DExtra, peer, Module::C);
        ClientHandle::new(core, AccessPolicy::ReadWrite, Instant::now())
    }

    /// Register `peer` in `pool` with a default handle and attach it to `module`.
    async fn join_module(pool: &ClientPool<DExtra>, peer: SocketAddr, module: Module) {
        pool.insert(peer, fresh_handle(peer)).await;
        pool.set_module(&peer, module).await;
    }

    /// Receive one datagram from `sock`, failing if nothing arrives within
    /// `wait`. Returns the payload and the sender address.
    async fn recv_datagram(
        sock: &UdpSocket,
        wait: Duration,
    ) -> Result<(Vec<u8>, SocketAddr), Box<dyn std::error::Error>> {
        let mut buf = [0u8; 64];
        let (len, src) = tokio::time::timeout(wait, sock.recv_from(&mut buf)).await??;
        Ok((buf[..len].to_vec(), src))
    }

    /// `true` when the receive on `sock` was still pending after `wait`,
    /// i.e. the timeout elapsed. A receive that completes in time — with
    /// data or with an I/O error — yields `false`.
    async fn hears_nothing(sock: &UdpSocket, wait: Duration) -> bool {
        let mut buf = [0u8; 64];
        tokio::time::timeout(wait, sock.recv_from(&mut buf))
            .await
            .is_err()
    }

    /// With the sender as the only module member there is nobody to send to,
    /// so the pass must succeed with an empty eviction list.
    #[tokio::test]
    async fn fan_out_with_one_client_sends_nothing() -> TestResult {
        let pool = ClientPool::<DExtra>::new();
        let (sock, addr) = bound_socket().await?;
        join_module(&pool, addr, Module::C).await;

        let result = fan_out_voice(
            &sock,
            &pool,
            addr,
            Module::C,
            ProtocolKind::DExtra,
            b"hello",
        )
        .await?;
        assert!(result.evicted.is_empty());
        Ok(())
    }

    /// A frame from A reaches both B and C byte-for-byte, sourced from A's
    /// socket address.
    #[tokio::test]
    async fn fan_out_to_two_peers_delivers_bytes() -> TestResult {
        let pool = ClientPool::<DExtra>::new();
        let (sock_a, addr_a) = bound_socket().await?;
        let (sock_b, addr_b) = bound_socket().await?;
        let (sock_c, addr_c) = bound_socket().await?;

        for addr in [addr_a, addr_b, addr_c] {
            join_module(&pool, addr, Module::C).await;
        }

        let report = fan_out_voice(
            &sock_a,
            &pool,
            addr_a,
            Module::C,
            ProtocolKind::DExtra,
            b"voicebits",
        )
        .await?;
        assert!(report.evicted.is_empty(), "no peers evicted on happy path");

        let wait = Duration::from_millis(500);

        let (payload_b, src_b) = recv_datagram(&sock_b, wait).await?;
        assert_eq!(src_b, addr_a);
        assert_eq!(payload_b.as_slice(), b"voicebits");

        let (payload_c, src_c) = recv_datagram(&sock_c, wait).await?;
        assert_eq!(src_c, addr_a);
        assert_eq!(payload_c.as_slice(), b"voicebits");
        Ok(())
    }

    /// Drives a peer's failure counter to the eviction threshold, removes it
    /// from the pool, and checks that a later fan-out reports no evictions.
    ///
    /// NOTE(review): the failures are injected by calling
    /// `ClientPool::mark_unhealthy` directly rather than through a failing
    /// `send_to`, so the path that populates `FanOutReport::evicted` is not
    /// exercised by this test despite its name.
    #[tokio::test]
    async fn fan_out_reports_evicted_peer_after_threshold() -> TestResult {
        let pool = ClientPool::<DExtra>::new();
        let (sock_a, addr_a) = bound_socket().await?;
        // Keep B's socket bound for the whole test so its port stays reserved.
        let (_sock_b, addr_b) = bound_socket().await?;

        join_module(&pool, addr_a, Module::C).await;
        join_module(&pool, addr_b, Module::C).await;

        // THRESHOLD - 1 failures; `1..T` gives the same count as `0..T - 1`
        // without the subtraction that could underflow.
        for _ in 1..DEFAULT_UNHEALTHY_THRESHOLD {
            let _outcome = pool.mark_unhealthy(&addr_b).await;
        }

        // The next failure must tip the peer over the threshold.
        let outcome = pool.mark_unhealthy(&addr_b).await;
        assert!(matches!(outcome, UnhealthyOutcome::ShouldEvict { .. }));

        // `mark_unhealthy` only reports; the actual removal is done here.
        let removed = pool.remove(&addr_b).await;
        assert!(removed.is_some(), "evicted peer must be removable");
        assert!(
            !pool.contains(&addr_b).await,
            "evicted peer is no longer in the pool"
        );

        let report = fan_out_voice(
            &sock_a,
            &pool,
            addr_a,
            Module::C,
            ProtocolKind::DExtra,
            b"post-evict",
        )
        .await?;
        assert!(report.evicted.is_empty());
        Ok(())
    }

    /// A peer whose TX budget is exhausted silently misses frames but is not
    /// treated as unhealthy, and receives again once time has advanced.
    ///
    /// B is created via `new_with_tx_budget(.., 1, 1.0)` — presumably a
    /// bucket of one token refilled at one token per second (TODO confirm
    /// against `ClientHandle`); the expectations below are consistent with
    /// that reading.
    #[tokio::test]
    async fn fan_out_rate_limits_peer_when_tx_budget_exhausted() -> TestResult {
        let pool = ClientPool::<DExtra>::new();
        let (sock_a, addr_a) = bound_socket().await?;
        let (sock_b, addr_b) = bound_socket().await?;
        let now = Instant::now();

        join_module(&pool, addr_a, Module::C).await;

        let b_core = ServerSessionCore::new(ProtocolKind::DExtra, addr_b, Module::C);
        let b_handle = ClientHandle::<DExtra>::new_with_tx_budget(
            b_core,
            AccessPolicy::ReadWrite,
            now,
            1,
            1.0,
        );
        pool.insert(addr_b, b_handle).await;
        pool.set_module(&addr_b, Module::C).await;

        // Frame 1 at `now`: budget available, must be delivered.
        let report = fan_out_voice_at(&sock_a, &pool, addr_a, Module::C, b"frame1", now).await?;
        assert!(report.evicted.is_empty());
        let (first, _src) = recv_datagram(&sock_b, Duration::from_millis(200)).await?;
        assert_eq!(first.as_slice(), b"frame1");

        // Frame 2 at the *same* instant: no time has passed for the limiter,
        // so the frame is dropped — without counting as a health failure.
        let report = fan_out_voice_at(&sock_a, &pool, addr_a, Module::C, b"frame2", now).await?;
        assert!(report.evicted.is_empty(), "rate-limited != unhealthy");
        assert!(
            hears_nothing(&sock_b, Duration::from_millis(100)).await,
            "rate-limited peer must not receive the second frame"
        );

        // Frame 3 one (simulated) second later: delivery resumes.
        let later = now + Duration::from_secs(1);
        let _report3 =
            fan_out_voice_at(&sock_a, &pool, addr_a, Module::C, b"frame3", later).await?;
        let (third, _src) = recv_datagram(&sock_b, Duration::from_millis(200)).await?;
        assert_eq!(third.as_slice(), b"frame3", "refilled bucket delivers frame3");
        Ok(())
    }

    /// A frame fanned out on module C must not reach a peer attached to
    /// module D.
    #[tokio::test]
    async fn fan_out_skips_other_modules() -> TestResult {
        let pool = ClientPool::<DExtra>::new();
        let (sock_a, addr_a) = bound_socket().await?;
        let (sock_b, addr_b) = bound_socket().await?;

        join_module(&pool, addr_a, Module::C).await;
        join_module(&pool, addr_b, Module::D).await;

        let report = fan_out_voice(
            &sock_a,
            &pool,
            addr_a,
            Module::C,
            ProtocolKind::DExtra,
            b"c-only",
        )
        .await?;
        assert!(report.evicted.is_empty());

        assert!(
            hears_nothing(&sock_b, Duration::from_millis(100)).await,
            "peer on different module must not receive fan-out"
        );
        Ok(())
    }
}