1use std::collections::{HashMap, VecDeque};
9use std::marker::PhantomData;
10use std::net::SocketAddr;
11use std::sync::Arc;
12use std::time::Instant;
13
14use tokio::net::UdpSocket;
15use tokio::sync::Mutex;
16use tokio::sync::{broadcast, watch};
17
18use dstar_gateway_core::ServerSessionCore;
19use dstar_gateway_core::codec::dcs::{
20 ClientPacket as DcsClientPacket, decode_client_to_server as decode_dcs_client_to_server,
21 encode_connect_nak as encode_dcs_connect_nak,
22};
23use dstar_gateway_core::codec::dextra::{
24 ClientPacket, decode_client_to_server, encode_connect_nak,
25};
26use dstar_gateway_core::codec::dplus::{
27 ClientPacket as DPlusClientPacket, Link2Result,
28 decode_client_to_server as decode_dplus_client_to_server, encode_link2_reply,
29};
30use dstar_gateway_core::error::Error as CoreError;
31use dstar_gateway_core::header::DStarHeader;
32use dstar_gateway_core::session::client::Protocol;
33use dstar_gateway_core::session::server::ServerEvent;
34use dstar_gateway_core::types::{Callsign, Module, ProtocolKind, StreamId};
35use dstar_gateway_core::validator::NullSink;
36
37use crate::client_pool::{ClientHandle, ClientPool, UnhealthyOutcome};
38use crate::reflector::{AccessPolicy, ClientAuthorizer, LinkAttempt, StreamCache};
39use crate::tokio_shell::fanout::fan_out_voice;
40use crate::tokio_shell::transcode::{
41 CrossProtocolEvent, TranscodeError, VoiceEvent, transcode_voice,
42};
43
/// Everything produced by feeding one inbound datagram through a
/// [`ProtocolEndpoint`].
#[derive(Debug, Clone)]
pub struct EndpointOutcome<P: Protocol> {
    /// Outbound datagrams as `(payload, destination)` pairs, e.g. replies
    /// popped from the session core or a reject NAK.
    pub txs: Vec<(Vec<u8>, SocketAddr)>,
    /// Server events collected while handling the datagram, followed by any
    /// events that were queued on the endpoint in the meantime.
    pub events: Vec<ServerEvent<P>>,
    /// Cached voice-header bytes that should be re-sent to the module's
    /// members alongside the current datagram; `None` when no retransmit is
    /// due.
    pub header_retransmit: Option<Vec<u8>>,
}
66
impl<P: Protocol> EndpointOutcome<P> {
    /// Returns an outcome with no transmits, no events and no header
    /// retransmit.
    #[must_use]
    pub const fn empty() -> Self {
        Self {
            txs: Vec::new(),
            events: Vec::new(),
            header_retransmit: None,
        }
    }
}
83
84const fn voice_event_from_server_event<P: Protocol>(ev: &ServerEvent<P>) -> Option<VoiceEvent> {
92 match ev {
93 ServerEvent::ClientStreamStarted {
94 stream_id, header, ..
95 } => Some(VoiceEvent::StreamStart {
96 header: *header,
97 stream_id: *stream_id,
98 }),
99 ServerEvent::ClientStreamFrame {
100 stream_id,
101 seq,
102 frame,
103 ..
104 } => Some(VoiceEvent::Frame {
105 stream_id: *stream_id,
106 seq: *seq,
107 frame: *frame,
108 }),
109 ServerEvent::ClientStreamEnded { stream_id, .. } => Some(VoiceEvent::StreamEnd {
110 stream_id: *stream_id,
111 seq: 0,
112 }),
113 _ => None,
114 }
115}
116
/// Errors surfaced by the tokio shell around the sans-io session core.
#[non_exhaustive]
#[derive(Debug, thiserror::Error)]
pub enum ShellError {
    /// The session core rejected the input it was given.
    #[error("core error: {0}")]
    Core(#[from] CoreError),
    /// The endpoint was asked to handle a protocol it has no handler for.
    #[error("protocol error: {0}")]
    Protocol(String),
    /// A socket operation failed.
    #[error("socket I/O error: {0}")]
    Io(#[from] std::io::Error),
}
131
/// Classification of the first voice event in an outcome, used to decide
/// whether (and to which module) the inbound datagram must be fanned out.
#[derive(Debug, Clone, Copy)]
enum ForwardHint {
    /// The datagram started a voice stream.
    Header { module: Module, stream_id: StreamId },
    /// The datagram carried a voice frame of a stream.
    Data { module: Module, stream_id: StreamId },
    /// The datagram ended a voice stream.
    Eot { module: Module, stream_id: StreamId },
}
142
/// One reflector endpoint serving a single protocol over a UDP socket.
pub struct ProtocolEndpoint<P: Protocol> {
    /// Runtime discriminator selecting the inbound handler.
    protocol: ProtocolKind,
    /// Per-peer client handles (session core, access policy, module).
    clients: ClientPool<P>,
    /// Module passed to newly created session cores and used as the
    /// requested module for DPlus LINK2 authorization.
    default_reflector_module: Module,
    /// Active voice-stream cache, one entry per module.
    stream_cache: Mutex<HashMap<Module, StreamCache>>,
    /// Policy hook consulted for every link attempt.
    authorizer: Arc<dyn ClientAuthorizer>,
    /// Events raised outside a datagram's own outcome (e.g. evictions);
    /// appended to the next outcome that drains them.
    pending_events: Mutex<VecDeque<ServerEvent<P>>>,
    /// Optional bus for publishing/receiving voice events across protocols.
    voice_bus: Option<broadcast::Sender<CrossProtocolEvent>>,
    /// Ties the endpoint to `P` without storing a value of that type.
    _protocol: PhantomData<fn() -> P>,
}
183
184impl<P: Protocol> std::fmt::Debug for ProtocolEndpoint<P> {
185 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
186 f.debug_struct("ProtocolEndpoint")
191 .field("protocol", &self.protocol)
192 .finish_non_exhaustive()
193 }
194}
195
196impl<P: Protocol> ProtocolEndpoint<P> {
197 #[must_use]
211 pub fn new(
212 protocol: ProtocolKind,
213 default_reflector_module: Module,
214 authorizer: Arc<dyn ClientAuthorizer>,
215 ) -> Self {
216 Self::new_with_voice_bus(protocol, default_reflector_module, authorizer, None)
217 }
218
219 #[must_use]
230 pub fn new_with_voice_bus(
231 protocol: ProtocolKind,
232 default_reflector_module: Module,
233 authorizer: Arc<dyn ClientAuthorizer>,
234 voice_bus: Option<broadcast::Sender<CrossProtocolEvent>>,
235 ) -> Self {
236 Self {
237 protocol,
238 clients: ClientPool::<P>::new(),
239 default_reflector_module,
240 stream_cache: Mutex::new(HashMap::new()),
241 authorizer,
242 pending_events: Mutex::new(VecDeque::new()),
243 voice_bus,
244 _protocol: PhantomData,
245 }
246 }
247
248 #[must_use]
250 pub const fn protocol_kind(&self) -> ProtocolKind {
251 self.protocol
252 }
253
254 #[must_use]
256 pub const fn clients(&self) -> &ClientPool<P> {
257 &self.clients
258 }
259
260 pub async fn handle_inbound(
288 &self,
289 bytes: &[u8],
290 peer: SocketAddr,
291 now: Instant,
292 ) -> Result<EndpointOutcome<P>, ShellError> {
293 match self.protocol {
294 ProtocolKind::DExtra => self.handle_inbound_dextra(bytes, peer, now).await,
295 ProtocolKind::DPlus => self.handle_inbound_dplus(bytes, peer, now).await,
296 ProtocolKind::Dcs => self.handle_inbound_dcs(bytes, peer, now).await,
297 _ => Err(ShellError::Protocol(format!(
298 "unsupported protocol discriminator: {:?}",
299 self.protocol
300 ))),
301 }
302 }
303
304 async fn handle_inbound_dextra(
311 &self,
312 bytes: &[u8],
313 peer: SocketAddr,
314 now: Instant,
315 ) -> Result<EndpointOutcome<P>, ShellError> {
316 let mut null_sink = NullSink;
320 let pre_decoded = decode_client_to_server(bytes, &mut null_sink).ok();
321
322 let link_access: Option<AccessPolicy> = if let Some(ClientPacket::Link {
325 callsign,
326 reflector_module,
327 ..
328 }) = pre_decoded.clone()
329 {
330 let attempt = LinkAttempt {
331 protocol: self.protocol,
332 callsign,
333 peer,
334 module: reflector_module,
335 };
336 match self.authorizer.authorize(&attempt) {
337 Ok(access_policy) => Some(access_policy),
338 Err(reject) => {
339 tracing::info!(
340 ?peer,
341 %callsign,
342 %reflector_module,
343 reason = ?reject,
344 "authorizer rejected DExtra LINK attempt"
345 );
346 return Ok(Self::build_dextra_reject_outcome(
347 peer,
348 callsign,
349 reflector_module,
350 reject,
351 ));
352 }
353 }
354 } else {
355 None
356 };
357
358 self.ensure_handle(peer, link_access, now).await;
359
360 if self
363 .read_only_drop_voice_dextra(pre_decoded.as_ref(), peer, now)
364 .await
365 {
366 let mut outcome = EndpointOutcome::<P>::empty();
367 if let Some(pkt) = pre_decoded.as_ref()
368 && let Some(stream_id) = Self::voice_stream_id_dextra(pkt)
369 {
370 outcome
371 .events
372 .push(ServerEvent::VoiceFromReadOnlyDropped { peer, stream_id });
373 }
374 return Ok(outcome);
375 }
376
377 let mut outcome = self.drive_core(&peer, bytes, now).await?;
378 self.clients.record_last_heard(&peer, now).await;
379 self.mirror_linked_module(&outcome, peer).await;
380
381 if let Some(pkt) = pre_decoded.as_ref() {
382 outcome.header_retransmit =
383 self.update_stream_cache_dextra(pkt, bytes, peer, now).await;
384 }
385
386 self.publish_voice_events(&outcome, peer).await;
387 self.drain_pending_events(&mut outcome).await;
388 Ok(outcome)
389 }
390
391 async fn handle_inbound_dplus(
401 &self,
402 bytes: &[u8],
403 peer: SocketAddr,
404 now: Instant,
405 ) -> Result<EndpointOutcome<P>, ShellError> {
406 let mut null_sink = NullSink;
407 let pre_decoded = decode_dplus_client_to_server(bytes, &mut null_sink).ok();
408
409 let link_access: Option<AccessPolicy> =
414 if let Some(DPlusClientPacket::Link2 { callsign }) = pre_decoded.clone() {
415 let attempt = LinkAttempt {
416 protocol: self.protocol,
417 callsign,
418 peer,
419 module: self.default_reflector_module,
420 };
421 match self.authorizer.authorize(&attempt) {
422 Ok(access_policy) => Some(access_policy),
423 Err(reject) => {
424 tracing::info!(
425 ?peer,
426 %callsign,
427 reason = ?reject,
428 "authorizer rejected DPlus LINK2 attempt"
429 );
430 return Ok(Self::build_dplus_reject_outcome(peer, reject));
431 }
432 }
433 } else {
434 None
435 };
436
437 self.ensure_handle(peer, link_access, now).await;
438
439 if self
440 .read_only_drop_voice_dplus(pre_decoded.as_ref(), peer, now)
441 .await
442 {
443 let mut outcome = EndpointOutcome::<P>::empty();
444 if let Some(pkt) = pre_decoded.as_ref()
445 && let Some(stream_id) = Self::voice_stream_id_dplus(pkt)
446 {
447 outcome
448 .events
449 .push(ServerEvent::VoiceFromReadOnlyDropped { peer, stream_id });
450 }
451 return Ok(outcome);
452 }
453
454 let mut outcome = self.drive_core(&peer, bytes, now).await?;
455 self.clients.record_last_heard(&peer, now).await;
456 self.mirror_linked_module(&outcome, peer).await;
457
458 if let Some(pkt) = pre_decoded.as_ref() {
459 outcome.header_retransmit = self.update_stream_cache_dplus(pkt, bytes, peer, now).await;
460 }
461
462 self.publish_voice_events(&outcome, peer).await;
463 self.drain_pending_events(&mut outcome).await;
464 Ok(outcome)
465 }
466
467 async fn handle_inbound_dcs(
476 &self,
477 bytes: &[u8],
478 peer: SocketAddr,
479 now: Instant,
480 ) -> Result<EndpointOutcome<P>, ShellError> {
481 let mut null_sink = NullSink;
482 let pre_decoded = decode_dcs_client_to_server(bytes, &mut null_sink).ok();
483
484 let link_access: Option<AccessPolicy> = if let Some(DcsClientPacket::Link {
485 callsign,
486 reflector_module,
487 ..
488 }) = pre_decoded.clone()
489 {
490 let attempt = LinkAttempt {
491 protocol: self.protocol,
492 callsign,
493 peer,
494 module: reflector_module,
495 };
496 match self.authorizer.authorize(&attempt) {
497 Ok(access_policy) => Some(access_policy),
498 Err(reject) => {
499 tracing::info!(
500 ?peer,
501 %callsign,
502 %reflector_module,
503 reason = ?reject,
504 "authorizer rejected DCS LINK attempt"
505 );
506 return Ok(Self::build_dcs_reject_outcome(
507 peer,
508 callsign,
509 reflector_module,
510 reject,
511 ));
512 }
513 }
514 } else {
515 None
516 };
517
518 self.ensure_handle(peer, link_access, now).await;
519
520 if self
521 .read_only_drop_voice_dcs(pre_decoded.as_ref(), peer, now)
522 .await
523 {
524 let mut outcome = EndpointOutcome::<P>::empty();
525 if let Some(pkt) = pre_decoded.as_ref()
526 && let Some(stream_id) = Self::voice_stream_id_dcs(pkt)
527 {
528 outcome
529 .events
530 .push(ServerEvent::VoiceFromReadOnlyDropped { peer, stream_id });
531 }
532 return Ok(outcome);
533 }
534
535 let mut outcome = self.drive_core(&peer, bytes, now).await?;
536 self.clients.record_last_heard(&peer, now).await;
537 self.mirror_linked_module(&outcome, peer).await;
538
539 if let Some(pkt) = pre_decoded.as_ref() {
540 outcome.header_retransmit = self.update_stream_cache_dcs(pkt, bytes, peer, now).await;
541 }
542
543 self.publish_voice_events(&outcome, peer).await;
544 self.drain_pending_events(&mut outcome).await;
545 Ok(outcome)
546 }
547
548 async fn ensure_handle(
556 &self,
557 peer: SocketAddr,
558 link_access: Option<AccessPolicy>,
559 now: Instant,
560 ) {
561 if self.clients.contains(&peer).await {
562 return;
563 }
564 let access = link_access.unwrap_or(AccessPolicy::ReadWrite);
565 let reflector_module = self.default_reflector_module;
566 let core = ServerSessionCore::new(self.protocol, peer, reflector_module);
567 let handle = ClientHandle::new(core, access, now);
568 self.clients.insert(peer, handle).await;
569 }
570
571 async fn mirror_linked_module(&self, outcome: &EndpointOutcome<P>, peer: SocketAddr) {
574 for ev in &outcome.events {
575 if let ServerEvent::ClientLinked { module, .. } = ev {
576 self.clients.set_module(&peer, *module).await;
577 }
578 }
579 }
580
581 async fn drain_pending_events(&self, outcome: &mut EndpointOutcome<P>) {
584 let mut pending = self.pending_events.lock().await;
585 while let Some(ev) = pending.pop_front() {
586 outcome.events.push(ev);
587 }
588 }
589
590 async fn publish_voice_events(&self, outcome: &EndpointOutcome<P>, peer: SocketAddr) {
599 let Some(bus) = &self.voice_bus else {
600 return;
601 };
602 let Some(module) = self.clients.module_of(&peer).await else {
603 return;
604 };
605 let cached_header = self.cached_header_for_module(module).await;
606 for ev in &outcome.events {
607 let Some(voice_event) = voice_event_from_server_event(ev) else {
608 continue;
609 };
610 drop(bus.send(CrossProtocolEvent {
614 source_protocol: self.protocol,
615 source_peer: peer,
616 module,
617 event: voice_event,
618 cached_header,
619 }));
620 }
621 }
622
623 async fn cached_header_for_module(&self, module: Module) -> Option<DStarHeader> {
629 let cache = self.stream_cache.lock().await;
630 cache.get(&module).map(|entry| *entry.header())
631 }
632
633 async fn evict_peer(&self, peer: SocketAddr, reason: &str) {
642 drop(self.clients.remove(&peer).await);
643 let mut pending = self.pending_events.lock().await;
644 pending.push_back(ServerEvent::ClientEvicted {
645 peer,
646 reason: reason.to_string(),
647 });
648 }
649
650 async fn update_stream_cache_dextra(
661 &self,
662 pkt: &ClientPacket,
663 bytes: &[u8],
664 peer: SocketAddr,
665 now: Instant,
666 ) -> Option<Vec<u8>> {
667 let module = self.clients.module_of(&peer).await?;
668 let mut cache_guard = self.stream_cache.lock().await;
669 match pkt {
670 ClientPacket::VoiceHeader { stream_id, header } => {
671 let entry =
672 StreamCache::new_with_bytes(*stream_id, *header, bytes.to_vec(), peer, now);
673 let _prev = cache_guard.insert(module, entry);
674 None
675 }
676 ClientPacket::VoiceData { .. } => {
677 let entry = cache_guard.get_mut(&module)?;
678 entry.record_frame(now);
679 if entry.should_rebroadcast_header() {
680 Some(entry.header_bytes().to_vec())
681 } else {
682 None
683 }
684 }
685 ClientPacket::VoiceEot { .. } => {
686 let _prev = cache_guard.remove(&module);
687 None
688 }
689 _ => None,
690 }
691 }
692
693 async fn update_stream_cache_dplus(
699 &self,
700 pkt: &DPlusClientPacket,
701 bytes: &[u8],
702 peer: SocketAddr,
703 now: Instant,
704 ) -> Option<Vec<u8>> {
705 let module = self.clients.module_of(&peer).await?;
706 let mut cache_guard = self.stream_cache.lock().await;
707 match pkt {
708 DPlusClientPacket::VoiceHeader { stream_id, header } => {
709 let entry =
710 StreamCache::new_with_bytes(*stream_id, *header, bytes.to_vec(), peer, now);
711 let _prev = cache_guard.insert(module, entry);
712 None
713 }
714 DPlusClientPacket::VoiceData { .. } => {
715 let entry = cache_guard.get_mut(&module)?;
716 entry.record_frame(now);
717 if entry.should_rebroadcast_header() {
718 Some(entry.header_bytes().to_vec())
719 } else {
720 None
721 }
722 }
723 DPlusClientPacket::VoiceEot { .. } => {
724 let _prev = cache_guard.remove(&module);
725 None
726 }
727 _ => None,
728 }
729 }
730
731 async fn update_stream_cache_dcs(
740 &self,
741 pkt: &DcsClientPacket,
742 bytes: &[u8],
743 peer: SocketAddr,
744 now: Instant,
745 ) -> Option<Vec<u8>> {
746 let module = self.clients.module_of(&peer).await?;
747 let mut cache_guard = self.stream_cache.lock().await;
748 let DcsClientPacket::Voice {
749 header,
750 stream_id,
751 is_end,
752 ..
753 } = pkt
754 else {
755 return None;
756 };
757
758 let existing_stream = cache_guard.get(&module).map(StreamCache::stream_id);
761 if existing_stream != Some(*stream_id) {
762 if *is_end {
763 let _prev = cache_guard.remove(&module);
766 return None;
767 }
768 let entry = StreamCache::new_with_bytes(*stream_id, *header, bytes.to_vec(), peer, now);
769 let _prev = cache_guard.insert(module, entry);
770 return None;
771 }
772
773 let retransmit_payload = cache_guard.get_mut(&module).and_then(|entry| {
777 entry.record_frame(now);
778 if entry.should_rebroadcast_header() {
779 Some(entry.header_bytes().to_vec())
780 } else {
781 None
782 }
783 });
784 if *is_end {
785 let _prev = cache_guard.remove(&module);
786 }
787 retransmit_payload
788 }
789
790 const fn voice_stream_id_dextra(pkt: &ClientPacket) -> Option<StreamId> {
793 match pkt {
794 ClientPacket::VoiceHeader { stream_id, .. }
795 | ClientPacket::VoiceData { stream_id, .. }
796 | ClientPacket::VoiceEot { stream_id, .. } => Some(*stream_id),
797 _ => None,
798 }
799 }
800
801 const fn voice_stream_id_dplus(pkt: &DPlusClientPacket) -> Option<StreamId> {
804 match pkt {
805 DPlusClientPacket::VoiceHeader { stream_id, .. }
806 | DPlusClientPacket::VoiceData { stream_id, .. }
807 | DPlusClientPacket::VoiceEot { stream_id, .. } => Some(*stream_id),
808 _ => None,
809 }
810 }
811
812 const fn voice_stream_id_dcs(pkt: &DcsClientPacket) -> Option<StreamId> {
815 match pkt {
816 DcsClientPacket::Voice { stream_id, .. } => Some(*stream_id),
817 _ => None,
818 }
819 }
820
821 async fn read_only_drop_voice_dextra(
828 &self,
829 pkt: Option<&ClientPacket>,
830 peer: SocketAddr,
831 now: Instant,
832 ) -> bool {
833 let Some(pkt) = pkt else {
834 return false;
835 };
836 if !matches!(
837 self.clients.access_of(&peer).await,
838 Some(AccessPolicy::ReadOnly)
839 ) {
840 return false;
841 }
842 if Self::voice_stream_id_dextra(pkt).is_none() {
843 return false;
844 }
845 self.clients.record_last_heard(&peer, now).await;
846 true
847 }
848
849 async fn read_only_drop_voice_dplus(
851 &self,
852 pkt: Option<&DPlusClientPacket>,
853 peer: SocketAddr,
854 now: Instant,
855 ) -> bool {
856 let Some(pkt) = pkt else {
857 return false;
858 };
859 if !matches!(
860 self.clients.access_of(&peer).await,
861 Some(AccessPolicy::ReadOnly)
862 ) {
863 return false;
864 }
865 if Self::voice_stream_id_dplus(pkt).is_none() {
866 return false;
867 }
868 self.clients.record_last_heard(&peer, now).await;
869 true
870 }
871
872 async fn read_only_drop_voice_dcs(
874 &self,
875 pkt: Option<&DcsClientPacket>,
876 peer: SocketAddr,
877 now: Instant,
878 ) -> bool {
879 let Some(pkt) = pkt else {
880 return false;
881 };
882 if !matches!(
883 self.clients.access_of(&peer).await,
884 Some(AccessPolicy::ReadOnly)
885 ) {
886 return false;
887 }
888 if Self::voice_stream_id_dcs(pkt).is_none() {
889 return false;
890 }
891 self.clients.record_last_heard(&peer, now).await;
892 true
893 }
894
895 fn build_dextra_reject_outcome(
901 peer: SocketAddr,
902 callsign: Callsign,
903 reflector_module: Module,
904 reject: crate::reflector::RejectReason,
905 ) -> EndpointOutcome<P> {
906 let mut outcome = EndpointOutcome::<P>::empty();
907 let mut buf = [0u8; 16];
908 if let Ok(n) = encode_connect_nak(&mut buf, &callsign, reflector_module)
909 && let Some(payload) = buf.get(..n)
910 {
911 outcome.txs.push((payload.to_vec(), peer));
912 }
913 outcome.events.push(ServerEvent::ClientRejected {
914 peer,
915 reason: reject.into_core_reason(),
916 });
917 outcome
918 }
919
920 fn build_dplus_reject_outcome(
926 peer: SocketAddr,
927 reject: crate::reflector::RejectReason,
928 ) -> EndpointOutcome<P> {
929 let mut outcome = EndpointOutcome::<P>::empty();
930 let mut buf = [0u8; 16];
931 if let Ok(n) = encode_link2_reply(&mut buf, Link2Result::Busy)
932 && let Some(payload) = buf.get(..n)
933 {
934 outcome.txs.push((payload.to_vec(), peer));
935 }
936 outcome.events.push(ServerEvent::ClientRejected {
937 peer,
938 reason: reject.into_core_reason(),
939 });
940 outcome
941 }
942
943 fn build_dcs_reject_outcome(
949 peer: SocketAddr,
950 callsign: Callsign,
951 reflector_module: Module,
952 reject: crate::reflector::RejectReason,
953 ) -> EndpointOutcome<P> {
954 let mut outcome = EndpointOutcome::<P>::empty();
955 let mut buf = [0u8; 32];
956 if let Ok(n) = encode_dcs_connect_nak(&mut buf, &callsign, reflector_module)
957 && let Some(payload) = buf.get(..n)
958 {
959 outcome.txs.push((payload.to_vec(), peer));
960 }
961 outcome.events.push(ServerEvent::ClientRejected {
962 peer,
963 reason: reject.into_core_reason(),
964 });
965 outcome
966 }
967
968 async fn drive_core(
975 &self,
976 peer: &SocketAddr,
977 bytes: &[u8],
978 now: Instant,
979 ) -> Result<EndpointOutcome<P>, ShellError> {
980 let mut outcome = EndpointOutcome::<P>::empty();
985
986 self.clients
987 .with_handle_mut(peer, |handle| -> Result<(), ShellError> {
988 handle.session.handle_input(now, bytes)?;
989 let drain_now = Instant::now();
995 while let Some(tx) = handle.session.pop_transmit(drain_now) {
996 outcome.txs.push((tx.payload.to_vec(), tx.dst));
997 }
998 while let Some(ev) = handle.session.pop_event::<P>() {
999 outcome.events.push(ev);
1000 }
1001 Ok(())
1002 })
1003 .await
1004 .unwrap_or(Ok(()))?;
1005
1006 Ok(outcome)
1007 }
1008
1009 fn forward_hint(events: &[ServerEvent<P>], peer_module: Option<Module>) -> Option<ForwardHint> {
1019 let module = peer_module?;
1020 for ev in events {
1021 let hint = match ev {
1022 ServerEvent::ClientStreamStarted { stream_id, .. } => Some(ForwardHint::Header {
1023 module,
1024 stream_id: *stream_id,
1025 }),
1026 ServerEvent::ClientStreamFrame { stream_id, .. } => Some(ForwardHint::Data {
1027 module,
1028 stream_id: *stream_id,
1029 }),
1030 ServerEvent::ClientStreamEnded { stream_id, .. } => Some(ForwardHint::Eot {
1031 module,
1032 stream_id: *stream_id,
1033 }),
1034 _ => None,
1038 };
1039 if hint.is_some() {
1040 return hint;
1041 }
1042 }
1043 None
1044 }
1045
/// Runs the endpoint's receive loop on `socket` until shutdown is
/// requested.
///
/// Each iteration waits on three sources: the shutdown watch channel, the
/// UDP socket, and (when a voice bus is configured) the cross-protocol
/// bus.
///
/// # Errors
///
/// Returns [`ShellError::Io`] when `recv_from` fails, or any error
/// propagated by `run_one_tick`.
pub async fn run(
    self: Arc<Self>,
    socket: Arc<UdpSocket>,
    mut shutdown: watch::Receiver<bool>,
) -> Result<(), ShellError> {
    let mut buf = [0u8; 2048];
    // Subscribe once up front; `None` when no bus is configured.
    let mut voice_rx = self.voice_bus.as_ref().map(broadcast::Sender::subscribe);
    loop {
        // Without a receiver this branch never completes, so the select
        // below effectively only has the shutdown and socket branches.
        let voice_branch = async {
            match voice_rx.as_mut() {
                Some(rx) => Some(rx.recv().await),
                None => std::future::pending().await,
            }
        };
        tokio::select! {
            // `biased` polls the branches in the order written, so a
            // shutdown signal is checked before socket or bus traffic.
            biased;
            change = shutdown.changed() => {
                // Stop when the sender is gone or the flag is set.
                if change.is_err() || *shutdown.borrow() {
                    return Ok(());
                }
            }
            result = socket.recv_from(&mut buf) => {
                let (n, peer) = result?;
                let recv_slice = buf.get(..n).unwrap_or(&[]);
                // Copy the datagram out of the shared receive buffer.
                let owned_bytes = recv_slice.to_vec();
                let now = Instant::now();
                self.run_one_tick(&socket, &owned_bytes, peer, now).await?;
            }
            Some(result) = voice_branch => {
                match result {
                    Ok(event) => {
                        self.handle_cross_protocol_event(&socket, event).await;
                    }
                    // Missed events are only logged; the loop carries on.
                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
                        tracing::warn!(
                            skipped,
                            "cross-protocol bus lagged; catching up"
                        );
                    }
                    // Bus closed: drop the receiver so the voice branch
                    // pends forever from the next iteration on.
                    Err(broadcast::error::RecvError::Closed) => {
                        voice_rx = None;
                    }
                }
            }
        }
    }
}
1132
1133 async fn handle_cross_protocol_event(
1143 self: &Arc<Self>,
1144 socket: &Arc<UdpSocket>,
1145 event: CrossProtocolEvent,
1146 ) {
1147 if event.source_protocol == self.protocol {
1148 return;
1149 }
1150 let mut scratch = [0u8; 2048];
1151 let len = match transcode_voice(
1152 self.protocol,
1153 &event.event,
1154 event.cached_header.as_ref(),
1155 &mut scratch,
1156 ) {
1157 Ok(n) => n,
1158 Err(TranscodeError::Encode(e)) => {
1159 tracing::warn!(
1160 target = ?self.protocol,
1161 source = ?event.source_protocol,
1162 err = ?e,
1163 "cross-protocol transcode encode failed"
1164 );
1165 return;
1166 }
1167 Err(TranscodeError::MissingCachedHeader) => {
1168 tracing::debug!(
1169 target = ?self.protocol,
1170 source = ?event.source_protocol,
1171 "cross-protocol transcode dropped: target requires cached header"
1172 );
1173 return;
1174 }
1175 };
1176 let Some(payload) = scratch.get(..len) else {
1177 return;
1178 };
1179 let members = self.clients.members_of_module(event.module).await;
1180 for peer in members {
1181 if peer == event.source_peer {
1182 continue;
1186 }
1187 if let Err(e) = socket.send_to(payload, peer).await {
1188 tracing::warn!(
1189 ?peer,
1190 ?e,
1191 "cross-protocol send failed; marking peer unhealthy"
1192 );
1193 if let UnhealthyOutcome::ShouldEvict { failure_count } =
1194 self.clients.mark_unhealthy(&peer).await
1195 {
1196 tracing::warn!(
1197 ?peer,
1198 failure_count,
1199 "cross-protocol send failure threshold exceeded; evicting peer"
1200 );
1201 self.evict_peer(peer, "too many cross-protocol send failures")
1202 .await;
1203 }
1204 }
1205 }
1206 }
1207
1208 async fn run_one_tick(
1214 self: &Arc<Self>,
1215 socket: &Arc<UdpSocket>,
1216 owned_bytes: &[u8],
1217 peer: SocketAddr,
1218 now: Instant,
1219 ) -> Result<(), ShellError> {
1220 let outcome = match self.handle_inbound(owned_bytes, peer, now).await {
1221 Ok(o) => o,
1222 Err(ShellError::Core(e)) => {
1223 tracing::warn!(?peer, ?e, "dropping malformed datagram");
1224 return Ok(());
1225 }
1226 Err(ShellError::Protocol(msg)) => {
1227 tracing::warn!(?peer, msg, "protocol not supported");
1228 return Ok(());
1229 }
1230 Err(e @ ShellError::Io(_)) => return Err(e),
1231 };
1232
1233 let mut evicted_peers: Vec<SocketAddr> = Vec::new();
1234 self.send_replies(socket, &outcome.txs, &mut evicted_peers)
1235 .await;
1236 self.fan_out_outcome(socket, &outcome, peer, owned_bytes, &mut evicted_peers)
1237 .await;
1238
1239 for evicted in evicted_peers {
1244 self.evict_peer(evicted, "too many send failures").await;
1245 }
1246 Ok(())
1247 }
1248
1249 async fn send_replies(
1253 self: &Arc<Self>,
1254 socket: &Arc<UdpSocket>,
1255 txs: &[(Vec<u8>, SocketAddr)],
1256 evicted_peers: &mut Vec<SocketAddr>,
1257 ) {
1258 for (payload, dst) in txs {
1259 if let Err(e) = socket.send_to(payload, *dst).await {
1260 tracing::warn!(?dst, ?e, "reply send_to failed");
1261 if let UnhealthyOutcome::ShouldEvict { failure_count } =
1262 self.clients.mark_unhealthy(dst).await
1263 {
1264 tracing::warn!(
1265 ?dst,
1266 failure_count,
1267 "reply send failure threshold exceeded; evicting peer"
1268 );
1269 evicted_peers.push(*dst);
1270 }
1271 }
1272 }
1273 }
1274
1275 async fn fan_out_outcome(
1278 self: &Arc<Self>,
1279 socket: &Arc<UdpSocket>,
1280 outcome: &EndpointOutcome<P>,
1281 peer: SocketAddr,
1282 owned_bytes: &[u8],
1283 evicted_peers: &mut Vec<SocketAddr>,
1284 ) {
1285 let peer_module = self.clients.module_of(&peer).await;
1286 let Some(hint) = Self::forward_hint(&outcome.events, peer_module) else {
1287 return;
1288 };
1289 let (module, _stream_id) = match hint {
1290 ForwardHint::Header { module, stream_id }
1291 | ForwardHint::Data { module, stream_id }
1292 | ForwardHint::Eot { module, stream_id } => (module, stream_id),
1293 };
1294 match fan_out_voice(
1295 socket.as_ref(),
1296 &self.clients,
1297 peer,
1298 module,
1299 self.protocol,
1300 owned_bytes,
1301 )
1302 .await
1303 {
1304 Ok(report) => evicted_peers.extend(report.evicted),
1305 Err(e) => tracing::warn!(?peer, ?e, "fan_out_voice failed"),
1306 }
1307 if let Some(cached) = outcome.header_retransmit.as_ref() {
1314 match fan_out_voice(
1315 socket.as_ref(),
1316 &self.clients,
1317 peer,
1318 module,
1319 self.protocol,
1320 cached,
1321 )
1322 .await
1323 {
1324 Ok(report) => evicted_peers.extend(report.evicted),
1325 Err(e) => tracing::warn!(?peer, ?e, "fan_out_voice header retransmit failed"),
1326 }
1327 }
1328 }
1329}
1330
1331#[cfg(test)]
1332mod tests {
1333 use super::{EndpointOutcome, ProtocolEndpoint};
1334 use crate::reflector::{
1335 AllowAllAuthorizer, ClientAuthorizer, DenyAllAuthorizer, ReadOnlyAuthorizer,
1336 };
1337 use dstar_gateway_core::codec::dcs::{
1338 GatewayType as DcsGatewayType, encode_connect_link as encode_dcs_link,
1339 encode_voice as encode_dcs_voice,
1340 };
1341 use dstar_gateway_core::codec::dextra::{
1342 encode_connect_link, encode_voice_data, encode_voice_eot, encode_voice_header,
1343 };
1344 use dstar_gateway_core::codec::dplus::{
1345 encode_link1 as encode_dplus_link1, encode_link2 as encode_dplus_link2,
1346 encode_voice_data as encode_dplus_voice_data, encode_voice_eot as encode_dplus_voice_eot,
1347 encode_voice_header as encode_dplus_voice_header,
1348 };
1349 use dstar_gateway_core::header::DStarHeader;
1350 use dstar_gateway_core::session::client::{DExtra, DPlus, Dcs};
1351 use dstar_gateway_core::session::server::{ServerEvent, ServerStateKind};
1352 use dstar_gateway_core::types::{Callsign, Module, ProtocolKind, StreamId, Suffix};
1353 use dstar_gateway_core::voice::VoiceFrame;
1354 use std::net::{IpAddr, Ipv4Addr, SocketAddr};
1355 use std::sync::Arc;
1356 use std::time::Instant;
1357
/// Result type for tests that propagate failures with `?`.
type TestResult = Result<(), Box<dyn std::error::Error>>;

/// Fixed loopback address used as the client peer in these tests.
const PEER: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 30001);
1361
/// Returns the fixed test peer address.
fn peer() -> SocketAddr {
    PEER
}
1365
/// Returns an [`AllowAllAuthorizer`] as a shared trait object, in the form
/// the endpoint constructors take.
fn allow_all() -> Arc<dyn ClientAuthorizer> {
    Arc::new(AllowAllAuthorizer)
}
1369
/// A freshly constructed endpoint reports its protocol and tracks no
/// clients.
#[tokio::test]
async fn new_endpoint_has_empty_pool() {
    let ep = ProtocolEndpoint::<DExtra>::new(ProtocolKind::DExtra, Module::C, allow_all());
    assert_eq!(ep.protocol_kind(), ProtocolKind::DExtra);
    assert_eq!(ep.clients().len().await, 0);
}
1376
/// A DExtra LINK yields exactly one ACK datagram back to the sender, one
/// `ClientLinked` event, and registers the peer under module C.
#[tokio::test]
async fn dextra_link_produces_ack_and_event() -> TestResult {
    let ep = ProtocolEndpoint::<DExtra>::new(ProtocolKind::DExtra, Module::C, allow_all());
    let mut buf = [0u8; 16];
    let n = encode_connect_link(
        &mut buf,
        &Callsign::from_wire_bytes(*b"W1AW "),
        Module::C,
        Module::B,
    )?;
    let slice = buf.get(..n).ok_or("encode produced no bytes")?;

    let outcome: EndpointOutcome<DExtra> =
        ep.handle_inbound(slice, peer(), Instant::now()).await?;

    // Exactly one reply, addressed to the sender, carrying the ACK tag.
    assert_eq!(outcome.txs.len(), 1);
    let (payload, dst) = outcome.txs.first().ok_or("no tx")?;
    assert_eq!(*dst, peer());
    assert_eq!(payload.len(), 14, "DExtra ACK is 14 bytes");
    assert!(
        payload.windows(3).any(|w| w == b"ACK"),
        "payload must contain ACK tag"
    );

    // Exactly one event, and it is the link notification.
    assert_eq!(outcome.events.len(), 1);
    assert!(matches!(
        outcome.events.first(),
        Some(ServerEvent::ClientLinked { .. })
    ));

    // The pool now tracks the peer as a member of module C.
    assert_eq!(ep.clients().len().await, 1);
    let members = ep.clients().members_of_module(Module::C).await;
    assert_eq!(members, vec![peer()]);
    Ok(())
}
1418
/// DPlus two-step link: LINK1 is acknowledged and creates the handle
/// without emitting events; LINK2 is answered with an `OKRW` reply, emits
/// `ClientLinked`, and places the peer in module C.
#[tokio::test]
async fn dplus_link2_after_link1_creates_handle_and_acks_okrw() -> TestResult {
    let ep = ProtocolEndpoint::<DPlus>::new(ProtocolKind::DPlus, Module::C, allow_all());

    // Step 1: LINK1.
    let mut link1_buf = [0u8; 8];
    let n1 = encode_dplus_link1(&mut link1_buf)?;
    let link1_slice = link1_buf.get(..n1).ok_or("link1 empty")?;
    let outcome1: EndpointOutcome<DPlus> = ep
        .handle_inbound(link1_slice, peer(), Instant::now())
        .await?;
    assert_eq!(outcome1.txs.len(), 1);
    let (payload1, dst1) = outcome1.txs.first().ok_or("no tx")?;
    assert_eq!(*dst1, peer());
    assert_eq!(payload1.len(), 5, "DPlus LINK1 ACK is 5 bytes");
    assert!(
        outcome1.events.is_empty(),
        "LINK1 emits no events (no callsign yet)"
    );
    // The handle already exists after LINK1.
    assert_eq!(ep.clients().len().await, 1);

    // Step 2: LINK2 carrying the callsign.
    let mut link2_buf = [0u8; 32];
    let n2 = encode_dplus_link2(&mut link2_buf, &Callsign::from_wire_bytes(*b"W1AW "))?;
    let link2_slice = link2_buf.get(..n2).ok_or("link2 empty")?;
    let outcome2: EndpointOutcome<DPlus> = ep
        .handle_inbound(link2_slice, peer(), Instant::now())
        .await?;
    assert_eq!(outcome2.txs.len(), 1);
    let (payload2, dst2) = outcome2.txs.first().ok_or("no tx")?;
    assert_eq!(*dst2, peer());
    assert_eq!(payload2.len(), 8, "DPlus LINK2 reply is 8 bytes");
    assert!(
        payload2.windows(4).any(|w| w == b"OKRW"),
        "LINK2 ACCEPT reply contains OKRW tag"
    );
    assert_eq!(outcome2.events.len(), 1);
    assert!(matches!(
        outcome2.events.first(),
        Some(ServerEvent::ClientLinked { .. })
    ));
    let members = ep.clients().members_of_module(Module::C).await;
    assert_eq!(members, vec![peer()]);
    Ok(())
}
1472
/// After linking over DPlus, a voice header starts a stream without an
/// immediate header retransmit; feeding 20 data frames triggers exactly one
/// retransmit; a final EOT is accepted.
#[tokio::test]
async fn dplus_voice_header_during_linked_creates_stream_cache() -> TestResult {
    let ep = ProtocolEndpoint::<DPlus>::new(ProtocolKind::DPlus, Module::C, allow_all());
    // Link the peer first (LINK1 then LINK2); the replies are not inspected.
    let mut link1_buf = [0u8; 8];
    let n1 = encode_dplus_link1(&mut link1_buf)?;
    drop(
        ep.handle_inbound(link1_buf.get(..n1).ok_or("empty")?, peer(), Instant::now())
            .await?,
    );
    let mut link2_buf = [0u8; 32];
    let n2 = encode_dplus_link2(&mut link2_buf, &Callsign::from_wire_bytes(*b"W1AW "))?;
    drop(
        ep.handle_inbound(link2_buf.get(..n2).ok_or("empty")?, peer(), Instant::now())
            .await?,
    );

    // Voice header: starts the stream, no retransmit yet.
    let mut hdr_buf = [0u8; 64];
    let hdr_n = encode_dplus_voice_header(&mut hdr_buf, sid(), &test_header("W1AW"))?;
    let hdr_slice = hdr_buf.get(..hdr_n).ok_or("empty")?;
    let outcome = ep.handle_inbound(hdr_slice, peer(), Instant::now()).await?;
    assert!(
        outcome
            .events
            .iter()
            .any(|ev| matches!(ev, ServerEvent::ClientStreamStarted { .. })),
        "voice header emits ClientStreamStarted"
    );
    assert!(outcome.header_retransmit.is_none());

    // Twenty data frames: count how often a header retransmit is requested.
    let frame = VoiceFrame::silence();
    let mut cache_fired = 0_u32;
    for seq in 0_u8..20 {
        let mut data_buf = [0u8; 64];
        let data_n = encode_dplus_voice_data(&mut data_buf, sid(), seq, &frame)?;
        let data_slice = data_buf.get(..data_n).ok_or("empty")?;
        let outcome = ep
            .handle_inbound(data_slice, peer(), Instant::now())
            .await?;
        if outcome.header_retransmit.is_some() {
            cache_fired = cache_fired.saturating_add(1);
        }
    }
    assert_eq!(
        cache_fired, 1,
        "DPlus stream cache retransmits after 20 frames"
    );

    // End of transmission must be handled without error.
    let mut eot_buf = [0u8; 64];
    let eot_n = encode_dplus_voice_eot(&mut eot_buf, sid(), 20)?;
    let eot_slice = eot_buf.get(..eot_n).ok_or("empty")?;
    drop(ep.handle_inbound(eot_slice, peer(), Instant::now()).await?);
    Ok(())
}
1532
1533 fn dcs_reflector_cs() -> Callsign {
1535 Callsign::from_wire_bytes(*b"DCS030 ")
1536 }
1537
1538 #[tokio::test]
1539 async fn dcs_link_creates_handle_and_acks() -> TestResult {
1540 let ep = ProtocolEndpoint::<Dcs>::new(ProtocolKind::Dcs, Module::C, allow_all());
1541 let mut buf = vec![0u8; 600];
1543 let n = encode_dcs_link(
1544 &mut buf,
1545 &Callsign::from_wire_bytes(*b"W1AW "),
1546 Module::B,
1547 Module::C,
1548 &dcs_reflector_cs(),
1549 DcsGatewayType::Repeater,
1550 )?;
1551 let slice = buf.get(..n).ok_or("empty")?;
1552
1553 let outcome: EndpointOutcome<Dcs> =
1554 ep.handle_inbound(slice, peer(), Instant::now()).await?;
1555 assert_eq!(outcome.txs.len(), 1);
1557 let (payload, dst) = outcome.txs.first().ok_or("no tx")?;
1558 assert_eq!(*dst, peer());
1559 assert_eq!(payload.len(), 14, "DCS ACK is 14 bytes");
1560 assert!(
1561 payload.windows(3).any(|w| w == b"ACK"),
1562 "DCS ACK payload contains ACK tag"
1563 );
1564 assert!(
1566 outcome
1567 .events
1568 .iter()
1569 .any(|ev| matches!(ev, ServerEvent::ClientLinked { .. })),
1570 "DCS link emits ClientLinked"
1571 );
1572 assert_eq!(ep.clients().len().await, 1);
1574 let members = ep.clients().members_of_module(Module::C).await;
1575 assert_eq!(members, vec![peer()]);
1576 Ok(())
1577 }
1578
1579 #[tokio::test]
1580 async fn dcs_voice_first_packet_starts_stream_and_caches_header() -> TestResult {
1581 let ep = ProtocolEndpoint::<Dcs>::new(ProtocolKind::Dcs, Module::C, allow_all());
1582 let mut link_buf = vec![0u8; 600];
1584 let link_n = encode_dcs_link(
1585 &mut link_buf,
1586 &Callsign::from_wire_bytes(*b"W1AW "),
1587 Module::B,
1588 Module::C,
1589 &dcs_reflector_cs(),
1590 DcsGatewayType::Repeater,
1591 )?;
1592 drop(
1593 ep.handle_inbound(
1594 link_buf.get(..link_n).ok_or("empty")?,
1595 peer(),
1596 Instant::now(),
1597 )
1598 .await?,
1599 );
1600
1601 let frame = VoiceFrame::silence();
1605 let mut voice_buf = [0u8; 128];
1606 let voice_n = encode_dcs_voice(
1607 &mut voice_buf,
1608 &test_header("W1AW"),
1609 sid(),
1610 0,
1611 &frame,
1612 false,
1613 )?;
1614 let voice_slice = voice_buf.get(..voice_n).ok_or("empty")?;
1615 let outcome = ep
1616 .handle_inbound(voice_slice, peer(), Instant::now())
1617 .await?;
1618 assert!(
1621 outcome
1622 .events
1623 .iter()
1624 .any(|ev| matches!(ev, ServerEvent::ClientStreamStarted { .. })),
1625 "first DCS voice packet emits ClientStreamStarted"
1626 );
1627 assert!(
1628 outcome
1629 .events
1630 .iter()
1631 .any(|ev| matches!(ev, ServerEvent::ClientStreamFrame { .. })),
1632 "first DCS voice packet also emits ClientStreamFrame"
1633 );
1634 assert!(outcome.header_retransmit.is_none());
1636 Ok(())
1637 }
1638
1639 #[tokio::test]
1640 async fn dcs_voice_with_is_end_clears_stream_cache() -> TestResult {
1641 let ep = ProtocolEndpoint::<Dcs>::new(ProtocolKind::Dcs, Module::C, allow_all());
1642 let mut link_buf = vec![0u8; 600];
1644 let link_n = encode_dcs_link(
1645 &mut link_buf,
1646 &Callsign::from_wire_bytes(*b"W1AW "),
1647 Module::B,
1648 Module::C,
1649 &dcs_reflector_cs(),
1650 DcsGatewayType::Repeater,
1651 )?;
1652 drop(
1653 ep.handle_inbound(
1654 link_buf.get(..link_n).ok_or("empty")?,
1655 peer(),
1656 Instant::now(),
1657 )
1658 .await?,
1659 );
1660
1661 let frame = VoiceFrame::silence();
1663 let mut voice_buf = [0u8; 128];
1664 let voice_n = encode_dcs_voice(
1665 &mut voice_buf,
1666 &test_header("W1AW"),
1667 sid(),
1668 0,
1669 &frame,
1670 false,
1671 )?;
1672 drop(
1673 ep.handle_inbound(
1674 voice_buf.get(..voice_n).ok_or("empty")?,
1675 peer(),
1676 Instant::now(),
1677 )
1678 .await?,
1679 );
1680
1681 let mut eot_buf = [0u8; 128];
1683 let eot_n = encode_dcs_voice(
1684 &mut eot_buf,
1685 &test_header("W1AW"),
1686 sid(),
1687 1,
1688 &frame,
1689 true, )?;
1691 drop(
1692 ep.handle_inbound(eot_buf.get(..eot_n).ok_or("empty")?, peer(), Instant::now())
1693 .await?,
1694 );
1695
1696 let Some(new_sid) = StreamId::new(0x9999) else {
1701 unreachable!()
1702 };
1703 let mut fresh_buf = [0u8; 128];
1704 let fresh_n = encode_dcs_voice(
1705 &mut fresh_buf,
1706 &test_header("W1AW"),
1707 new_sid,
1708 0,
1709 &frame,
1710 false,
1711 )?;
1712 let fresh_outcome = ep
1713 .handle_inbound(
1714 fresh_buf.get(..fresh_n).ok_or("empty")?,
1715 peer(),
1716 Instant::now(),
1717 )
1718 .await?;
1719 assert!(
1720 fresh_outcome
1721 .events
1722 .iter()
1723 .any(|ev| matches!(ev, ServerEvent::ClientStreamStarted { .. })),
1724 "fresh stream after is_end must emit ClientStreamStarted"
1725 );
1726 Ok(())
1727 }
1728
1729 fn test_header(cs_my: &str) -> DStarHeader {
1730 let mut my_bytes = *b" ";
1731 let src = cs_my.as_bytes();
1732 let len = src.len().min(8);
1733 my_bytes[..len].copy_from_slice(&src[..len]);
1734 DStarHeader {
1735 flag1: 0,
1736 flag2: 0,
1737 flag3: 0,
1738 rpt2: Callsign::from_wire_bytes(*b"REF030 G"),
1739 rpt1: Callsign::from_wire_bytes(*b"REF030 C"),
1740 ur_call: Callsign::from_wire_bytes(*b"CQCQCQ "),
1741 my_call: Callsign::from_wire_bytes(my_bytes),
1742 my_suffix: Suffix::EMPTY,
1743 }
1744 }
1745
1746 const fn sid() -> StreamId {
1747 match StreamId::new(0x4242) {
1748 Some(s) => s,
1749 None => unreachable!(),
1750 }
1751 }
1752
1753 #[tokio::test]
1755 async fn dextra_link_rejected_by_deny_all_authorizer() -> TestResult {
1756 let ep = ProtocolEndpoint::<DExtra>::new(
1757 ProtocolKind::DExtra,
1758 Module::C,
1759 Arc::new(DenyAllAuthorizer),
1760 );
1761 let mut buf = [0u8; 16];
1762 let n = encode_connect_link(
1763 &mut buf,
1764 &Callsign::from_wire_bytes(*b"W1AW "),
1765 Module::C,
1766 Module::B,
1767 )?;
1768 let slice = buf.get(..n).ok_or("empty")?;
1769
1770 let outcome: EndpointOutcome<DExtra> =
1771 ep.handle_inbound(slice, peer(), Instant::now()).await?;
1772
1773 assert_eq!(
1775 ep.clients().len().await,
1776 0,
1777 "rejected peer must not be in pool"
1778 );
1779
1780 assert_eq!(outcome.txs.len(), 1);
1785 let (payload, dst) = outcome.txs.first().ok_or("no tx")?;
1786 assert_eq!(*dst, peer());
1787 assert_eq!(payload.len(), 14, "DExtra NAK is 14 bytes");
1788
1789 assert_eq!(outcome.events.len(), 1);
1791 assert!(matches!(
1792 outcome.events.first(),
1793 Some(ServerEvent::ClientRejected { .. })
1794 ));
1795 Ok(())
1796 }
1797
1798 #[tokio::test]
1800 async fn dextra_stream_cache_retransmits_header_every_21_frames() -> TestResult {
1801 let ep = ProtocolEndpoint::<DExtra>::new(ProtocolKind::DExtra, Module::C, allow_all());
1802
1803 let mut link_buf = [0u8; 16];
1805 let n = encode_connect_link(
1806 &mut link_buf,
1807 &Callsign::from_wire_bytes(*b"W1AW "),
1808 Module::C,
1809 Module::B,
1810 )?;
1811 let link_slice = link_buf.get(..n).ok_or("empty")?;
1812 drop(
1813 ep.handle_inbound(link_slice, peer(), Instant::now())
1814 .await?,
1815 );
1816
1817 let mut hdr_buf = [0u8; 64];
1819 let hdr_n = encode_voice_header(&mut hdr_buf, sid(), &test_header("W1AW"))?;
1820 let hdr_slice = hdr_buf.get(..hdr_n).ok_or("empty")?;
1821 let hdr_outcome = ep.handle_inbound(hdr_slice, peer(), Instant::now()).await?;
1822 assert!(
1825 hdr_outcome.header_retransmit.is_none(),
1826 "header tick must not trigger retransmit",
1827 );
1828
1829 let frame = VoiceFrame::silence();
1832 let mut cache_fired = 0_u32;
1833 for seq in 0_u8..20 {
1834 let mut data_buf = [0u8; 64];
1835 let data_n = encode_voice_data(&mut data_buf, sid(), seq, &frame)?;
1836 let data_slice = data_buf.get(..data_n).ok_or("empty")?;
1837 let outcome = ep
1838 .handle_inbound(data_slice, peer(), Instant::now())
1839 .await?;
1840 if outcome.header_retransmit.is_some() {
1841 cache_fired = cache_fired.saturating_add(1);
1842 }
1843 }
1844 assert_eq!(cache_fired, 1, "one header retransmit after 20 data frames");
1845
1846 let mut eot_buf = [0u8; 64];
1848 let eot_n = encode_voice_eot(&mut eot_buf, sid(), 20)?;
1849 let eot_slice = eot_buf.get(..eot_n).ok_or("empty")?;
1850 drop(ep.handle_inbound(eot_slice, peer(), Instant::now()).await?);
1851
1852 let mut stale_buf = [0u8; 64];
1856 let stale_n = encode_voice_data(&mut stale_buf, sid(), 99, &frame)?;
1857 let stale_slice = stale_buf.get(..stale_n).ok_or("empty")?;
1858 let stale_outcome = ep
1859 .handle_inbound(stale_slice, peer(), Instant::now())
1860 .await?;
1861 assert!(
1862 stale_outcome.header_retransmit.is_none(),
1863 "EOT must clear the cache",
1864 );
1865 Ok(())
1866 }
1867
1868 #[tokio::test]
1870 async fn dextra_endpoint_surfaces_evict_peer_event_next_tick() -> TestResult {
1871 let ep = ProtocolEndpoint::<DExtra>::new(ProtocolKind::DExtra, Module::C, allow_all());
1876 let mut link_buf = [0u8; 16];
1878 let n = encode_connect_link(
1879 &mut link_buf,
1880 &Callsign::from_wire_bytes(*b"W1AW "),
1881 Module::C,
1882 Module::B,
1883 )?;
1884 let link_slice = link_buf.get(..n).ok_or("empty")?;
1885 drop(
1886 ep.handle_inbound(link_slice, peer(), Instant::now())
1887 .await?,
1888 );
1889 assert_eq!(ep.clients().len().await, 1);
1890
1891 ep.evict_peer(peer(), "test eviction").await;
1893 assert_eq!(ep.clients().len().await, 0, "peer removed");
1894
1895 let outcome = ep
1900 .handle_inbound(link_slice, peer(), Instant::now())
1901 .await?;
1902
1903 let events = &outcome.events;
1906 assert!(
1907 events
1908 .iter()
1909 .any(|ev| matches!(ev, ServerEvent::ClientLinked { .. })),
1910 "fresh link still emits ClientLinked"
1911 );
1912 assert!(
1913 events
1914 .iter()
1915 .any(|ev| matches!(ev, ServerEvent::ClientEvicted { .. })),
1916 "queued ClientEvicted drains on next tick"
1917 );
1918 Ok(())
1919 }
1920
1921 #[tokio::test]
1923 async fn dextra_voice_events_publish_to_voice_bus() -> TestResult {
1924 use tokio::sync::broadcast;
1925 let (tx, mut rx) = broadcast::channel::<super::CrossProtocolEvent>(32);
1926 let ep = ProtocolEndpoint::<DExtra>::new_with_voice_bus(
1927 ProtocolKind::DExtra,
1928 Module::C,
1929 allow_all(),
1930 Some(tx),
1931 );
1932 let mut link_buf = [0u8; 16];
1934 let n = encode_connect_link(
1935 &mut link_buf,
1936 &Callsign::from_wire_bytes(*b"W1AW "),
1937 Module::C,
1938 Module::B,
1939 )?;
1940 let link_slice = link_buf.get(..n).ok_or("empty")?;
1941 drop(
1942 ep.handle_inbound(link_slice, peer(), Instant::now())
1943 .await?,
1944 );
1945 assert!(
1947 rx.try_recv().is_err(),
1948 "LINK emits no cross-protocol events"
1949 );
1950
1951 let mut hdr_buf = [0u8; 64];
1953 let hdr_n = encode_voice_header(&mut hdr_buf, sid(), &test_header("W1AW"))?;
1954 let hdr_slice = hdr_buf.get(..hdr_n).ok_or("empty")?;
1955 drop(ep.handle_inbound(hdr_slice, peer(), Instant::now()).await?);
1956 let event = rx.try_recv()?;
1957 assert_eq!(event.source_protocol, ProtocolKind::DExtra);
1958 assert_eq!(event.source_peer, peer());
1959 assert_eq!(event.module, Module::C);
1960 assert!(matches!(event.event, super::VoiceEvent::StreamStart { .. }));
1961
1962 let frame = VoiceFrame::silence();
1964 let mut data_buf = [0u8; 64];
1965 let data_n = encode_voice_data(&mut data_buf, sid(), 1, &frame)?;
1966 let data_slice = data_buf.get(..data_n).ok_or("empty")?;
1967 drop(
1968 ep.handle_inbound(data_slice, peer(), Instant::now())
1969 .await?,
1970 );
1971 let event = rx.try_recv()?;
1972 assert!(matches!(event.event, super::VoiceEvent::Frame { .. }));
1973 assert!(
1974 event.cached_header.is_some(),
1975 "voice data frame carries cached header"
1976 );
1977
1978 let mut eot_buf = [0u8; 64];
1980 let eot_n = encode_voice_eot(&mut eot_buf, sid(), 1)?;
1981 let eot_slice = eot_buf.get(..eot_n).ok_or("empty")?;
1982 drop(ep.handle_inbound(eot_slice, peer(), Instant::now()).await?);
1983 let event = rx.try_recv()?;
1984 assert!(matches!(event.event, super::VoiceEvent::StreamEnd { .. }));
1985 Ok(())
1986 }
1987
1988 #[tokio::test]
1989 async fn no_voice_bus_means_no_publish() -> TestResult {
1990 let ep = ProtocolEndpoint::<DExtra>::new(ProtocolKind::DExtra, Module::C, allow_all());
1994 let mut link_buf = [0u8; 16];
1995 let n = encode_connect_link(
1996 &mut link_buf,
1997 &Callsign::from_wire_bytes(*b"W1AW "),
1998 Module::C,
1999 Module::B,
2000 )?;
2001 drop(
2002 ep.handle_inbound(link_buf.get(..n).ok_or("empty")?, peer(), Instant::now())
2003 .await?,
2004 );
2005 let mut hdr_buf = [0u8; 64];
2006 let hdr_n = encode_voice_header(&mut hdr_buf, sid(), &test_header("W1AW"))?;
2007 drop(
2010 ep.handle_inbound(hdr_buf.get(..hdr_n).ok_or("empty")?, peer(), Instant::now())
2011 .await?,
2012 );
2013 Ok(())
2014 }
2015
2016 #[tokio::test]
2018 async fn dextra_readonly_voice_header_is_dropped() -> TestResult {
2019 let ep = ProtocolEndpoint::<DExtra>::new(
2020 ProtocolKind::DExtra,
2021 Module::C,
2022 Arc::new(ReadOnlyAuthorizer),
2023 );
2024 let mut link_buf = [0u8; 16];
2026 let n = encode_connect_link(
2027 &mut link_buf,
2028 &Callsign::from_wire_bytes(*b"W1AW "),
2029 Module::C,
2030 Module::B,
2031 )?;
2032 let link_slice = link_buf.get(..n).ok_or("empty")?;
2033 let link_outcome = ep
2034 .handle_inbound(link_slice, peer(), Instant::now())
2035 .await?;
2036 assert_eq!(ep.clients().len().await, 1, "peer admitted as read-only");
2037 assert_eq!(link_outcome.txs.len(), 1);
2039 assert_eq!(link_outcome.events.len(), 1);
2040
2041 let mut hdr_buf = [0u8; 64];
2043 let hdr_n = encode_voice_header(&mut hdr_buf, sid(), &test_header("W1AW"))?;
2044 let hdr_slice = hdr_buf.get(..hdr_n).ok_or("empty")?;
2045
2046 let voice_outcome: EndpointOutcome<DExtra> =
2047 ep.handle_inbound(hdr_slice, peer(), Instant::now()).await?;
2048
2049 assert!(
2053 voice_outcome.txs.is_empty(),
2054 "read-only voice must not emit any outbound datagrams"
2055 );
2056
2057 assert_eq!(voice_outcome.events.len(), 1);
2059 assert!(matches!(
2060 voice_outcome.events.first(),
2061 Some(ServerEvent::VoiceFromReadOnlyDropped { .. })
2062 ));
2063
2064 let state = ep
2067 .clients()
2068 .with_handle_mut(&peer(), |h| h.session.state_kind())
2069 .await
2070 .ok_or("handle not found")?;
2071 assert_eq!(
2072 state,
2073 ServerStateKind::Linked,
2074 "read-only voice must not push session into Streaming"
2075 );
2076 Ok(())
2077 }
2078}