dstar_gateway_server/tokio_shell/
endpoint.rs

1//! `ProtocolEndpoint<P>` — per-protocol reflector RX shell.
2//!
3//! Holds the client pool, active stream cache, and protocol
4//! discriminator for one of the three D-STAR reflector protocols.
5//! [`ProtocolEndpoint::handle_inbound`] is the sans-io entry point;
6//! [`ProtocolEndpoint::run`] is the UDP pump plus the voice fan-out path.
7
8use std::collections::{HashMap, VecDeque};
9use std::marker::PhantomData;
10use std::net::SocketAddr;
11use std::sync::Arc;
12use std::time::Instant;
13
14use tokio::net::UdpSocket;
15use tokio::sync::Mutex;
16use tokio::sync::{broadcast, watch};
17
18use dstar_gateway_core::ServerSessionCore;
19use dstar_gateway_core::codec::dcs::{
20    ClientPacket as DcsClientPacket, decode_client_to_server as decode_dcs_client_to_server,
21    encode_connect_nak as encode_dcs_connect_nak,
22};
23use dstar_gateway_core::codec::dextra::{
24    ClientPacket, decode_client_to_server, encode_connect_nak,
25};
26use dstar_gateway_core::codec::dplus::{
27    ClientPacket as DPlusClientPacket, Link2Result,
28    decode_client_to_server as decode_dplus_client_to_server, encode_link2_reply,
29};
30use dstar_gateway_core::error::Error as CoreError;
31use dstar_gateway_core::header::DStarHeader;
32use dstar_gateway_core::session::client::Protocol;
33use dstar_gateway_core::session::server::ServerEvent;
34use dstar_gateway_core::types::{Callsign, Module, ProtocolKind, StreamId};
35use dstar_gateway_core::validator::NullSink;
36
37use crate::client_pool::{ClientHandle, ClientPool, UnhealthyOutcome};
38use crate::reflector::{AccessPolicy, ClientAuthorizer, LinkAttempt, StreamCache};
39use crate::tokio_shell::fanout::fan_out_voice;
40use crate::tokio_shell::transcode::{
41    CrossProtocolEvent, TranscodeError, VoiceEvent, transcode_voice,
42};
43
/// Outbound result from a single [`ProtocolEndpoint::handle_inbound`] call.
///
/// Carries the outbound datagrams the core wants to send plus the
/// server events the core emitted. The run loop consumes this to drive
/// the real `UdpSocket` and to update the fan-out engine's cache.
#[derive(Debug, Clone)]
pub struct EndpointOutcome<P: Protocol> {
    /// Outbound datagrams — each `(bytes, destination)`.
    pub txs: Vec<(Vec<u8>, SocketAddr)>,
    /// Consumer-visible server events.
    pub events: Vec<ServerEvent<P>>,
    /// Cached voice-header bytes to rebroadcast to the rest of the
    /// module on this tick.
    ///
    /// Populated by the stream cache every 21 voice frames to match
    /// the `xlxd` / `MMDVMHost` cadence — the run loop fans these
    /// bytes out to every non-originator peer on the module in
    /// addition to the normal voice frame that triggered the cadence.
    ///
    /// `None` on the vast majority of ticks.
    pub header_retransmit: Option<Vec<u8>>,
}
66
67impl<P: Protocol> EndpointOutcome<P> {
68    /// Construct an empty outcome (no txs, no events, no retransmit).
69    ///
70    /// We cannot derive `Default` because it would require
71    /// `P: Default`, which the sealed `Protocol` trait intentionally
72    /// doesn't bound. Every protocol marker is a ZST so constructing
73    /// an empty outcome has no data-dependent initialization anyway.
74    #[must_use]
75    pub const fn empty() -> Self {
76        Self {
77            txs: Vec::new(),
78            events: Vec::new(),
79            header_retransmit: None,
80        }
81    }
82}
83
84/// Derive a cross-protocol [`VoiceEvent`] from a server event.
85///
86/// Returns `None` for non-voice events (linked/unlinked/rejected/…).
87/// The EOT branch reports seq `0` because the server event doesn't
88/// carry the final seq — downstream encoders OR the 0x40 end-bit in
89/// on their own, so the value doesn't matter for correctness of the
90/// encoding, only for bandwidth log parity with the originator.
91const fn voice_event_from_server_event<P: Protocol>(ev: &ServerEvent<P>) -> Option<VoiceEvent> {
92    match ev {
93        ServerEvent::ClientStreamStarted {
94            stream_id, header, ..
95        } => Some(VoiceEvent::StreamStart {
96            header: *header,
97            stream_id: *stream_id,
98        }),
99        ServerEvent::ClientStreamFrame {
100            stream_id,
101            seq,
102            frame,
103            ..
104        } => Some(VoiceEvent::Frame {
105            stream_id: *stream_id,
106            seq: *seq,
107            frame: *frame,
108        }),
109        ServerEvent::ClientStreamEnded { stream_id, .. } => Some(VoiceEvent::StreamEnd {
110            stream_id: *stream_id,
111            seq: 0,
112        }),
113        _ => None,
114    }
115}
116
/// Errors returned by the shell layer.
///
/// Marked `#[non_exhaustive]`, so matches outside this crate need a
/// wildcard arm.
#[non_exhaustive]
#[derive(Debug, thiserror::Error)]
pub enum ShellError {
    /// Decoding or state-machine error bubbled up from the core.
    ///
    /// `#[from]` provides the `From<CoreError>` conversion, so `?` on a
    /// core result produces this variant.
    #[error("core error: {0}")]
    Core(#[from] CoreError),
    /// Protocol-layer error (framing problem, unexpected variant, etc.).
    ///
    /// Also returned by [`ProtocolEndpoint::handle_inbound`] when the
    /// endpoint's protocol discriminator has no handler.
    #[error("protocol error: {0}")]
    Protocol(String),
    /// UDP socket I/O error.
    ///
    /// `#[from]` provides the `From<std::io::Error>` conversion.
    #[error("socket I/O error: {0}")]
    Io(#[from] std::io::Error),
}
131
/// A hint describing an inbound datagram's role in the fan-out path.
///
/// Extracted from the [`EndpointOutcome::events`] list so the run loop
/// can forward voice bytes without re-examining the wire format.
///
/// Every variant carries the same two fields: the module the stream
/// belongs to and the stream id.
///
/// NOTE(review): the per-variant descriptions below are inferred from
/// the variant names; the code that constructs them is not visible in
/// this part of the file.
#[derive(Debug, Clone, Copy)]
enum ForwardHint {
    /// The datagram opened a voice stream (header).
    Header { module: Module, stream_id: StreamId },
    /// The datagram carried voice data for an existing stream.
    Data { module: Module, stream_id: StreamId },
    /// The datagram ended a voice stream (end of transmission).
    Eot { module: Module, stream_id: StreamId },
}
142
/// Per-protocol reflector endpoint.
///
/// Owns the client pool, the per-module stream cache, and the
/// authorizer used to admit LINK attempts for one reflector protocol.
/// Supports all three D-STAR reflector protocols (`DExtra`, `DPlus`,
/// `DCS`); the endpoint's default reflector module is used as the
/// fallback for `DPlus` sessions (which don't carry a module on the
/// wire) and as the seed for `DExtra`/`DCS` sessions before the LINK
/// packet overwrites it.
pub struct ProtocolEndpoint<P: Protocol> {
    /// Runtime protocol discriminator.
    ///
    /// Selects the protocol-specific handler in
    /// [`Self::handle_inbound`] and is passed to every
    /// [`ServerSessionCore`] and `LinkAttempt` created here.
    protocol: ProtocolKind,
    /// Pool of per-peer client handles, looked up by the peer's
    /// socket address.
    clients: ClientPool<P>,
    /// Default reflector module for this endpoint.
    ///
    /// Used as the initial `reflector_module` for every
    /// [`ServerSessionCore`] created on this endpoint. `DExtra` and
    /// `DCS` sessions overwrite their `client_module` from the LINK
    /// packet; `DPlus` sessions keep this placeholder because the
    /// `DPlus` LINK2 packet doesn't carry a module on the wire.
    default_reflector_module: Module,
    /// Per-module active stream cache — populated on voice header,
    /// updated on voice data, cleared on voice EOT. Drives the
    /// 21-frame header-retransmit cadence in [`Self::handle_inbound`].
    stream_cache: Mutex<HashMap<Module, StreamCache>>,
    /// Authorizer consulted on every LINK attempt.
    authorizer: Arc<dyn ClientAuthorizer>,
    /// Pending events produced by background work (fan-out eviction,
    /// health checks) that didn't happen during a
    /// [`Self::handle_inbound`] call. Drained into the next outcome
    /// surfaced to the caller so consumers of the event stream see
    /// eviction decisions.
    pending_events: Mutex<VecDeque<ServerEvent<P>>>,
    /// Cross-protocol voice bus — `Some` iff the reflector was
    /// constructed with `cross_protocol_forwarding = true`. Published
    /// to after each inbound voice event so other protocols'
    /// endpoints can transcode and fan out the frame on their own
    /// wire format.
    voice_bus: Option<broadcast::Sender<CrossProtocolEvent>>,
    /// Zero-sized marker for the protocol type parameter `P`; no `P`
    /// value is stored.
    _protocol: PhantomData<fn() -> P>,
}
183
184impl<P: Protocol> std::fmt::Debug for ProtocolEndpoint<P> {
185    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
186        // `ClientPool<P>` and the stream cache map aren't printed —
187        // `P` doesn't bound `Debug`, and the pool contents are
188        // runtime-owned by tokio locks we can't cheaply peek at in a
189        // Debug impl.
190        f.debug_struct("ProtocolEndpoint")
191            .field("protocol", &self.protocol)
192            .finish_non_exhaustive()
193    }
194}
195
196impl<P: Protocol> ProtocolEndpoint<P> {
197    /// Construct a new endpoint for the given protocol with the
198    /// supplied authorizer.
199    ///
200    /// `default_reflector_module` is passed to every
201    /// [`ServerSessionCore`] created on this endpoint; `DExtra` and
202    /// `DCS` sessions overwrite their `client_module` from the LINK
203    /// packet but `DPlus` sessions keep the default because the
204    /// `DPlus` LINK2 wire packet doesn't carry a module.
205    ///
206    /// The authorizer is consulted on every inbound LINK attempt;
207    /// rejected attempts never materialize a [`ClientHandle`] and
208    /// instead produce a protocol-appropriate NAK plus a
209    /// [`ServerEvent::ClientRejected`] event.
210    #[must_use]
211    pub fn new(
212        protocol: ProtocolKind,
213        default_reflector_module: Module,
214        authorizer: Arc<dyn ClientAuthorizer>,
215    ) -> Self {
216        Self::new_with_voice_bus(protocol, default_reflector_module, authorizer, None)
217    }
218
219    /// Construct a new endpoint with an optional cross-protocol voice bus.
220    ///
221    /// Identical to [`Self::new`] except the caller supplies a
222    /// [`broadcast::Sender<CrossProtocolEvent>`] clone; when `Some`,
223    /// the endpoint publishes inbound voice events to the bus so
224    /// other protocols' endpoints can transcode and re-broadcast.
225    ///
226    /// Used by [`crate::reflector::Reflector`] when its config has
227    /// `cross_protocol_forwarding = true`. Pass `None` to disable
228    /// cross-protocol participation on this endpoint.
229    #[must_use]
230    pub fn new_with_voice_bus(
231        protocol: ProtocolKind,
232        default_reflector_module: Module,
233        authorizer: Arc<dyn ClientAuthorizer>,
234        voice_bus: Option<broadcast::Sender<CrossProtocolEvent>>,
235    ) -> Self {
236        Self {
237            protocol,
238            clients: ClientPool::<P>::new(),
239            default_reflector_module,
240            stream_cache: Mutex::new(HashMap::new()),
241            authorizer,
242            pending_events: Mutex::new(VecDeque::new()),
243            voice_bus,
244            _protocol: PhantomData,
245        }
246    }
247
    /// Runtime protocol discriminator for this endpoint.
    ///
    /// Returns the [`ProtocolKind`] supplied at construction; it
    /// selects the handler that [`Self::handle_inbound`] dispatches to.
    #[must_use]
    pub const fn protocol_kind(&self) -> ProtocolKind {
        self.protocol
    }
253
    /// Access the endpoint's client pool (primarily for tests).
    ///
    /// Returns a shared reference to the pool owned by this endpoint.
    #[must_use]
    pub const fn clients(&self) -> &ClientPool<P> {
        &self.clients
    }
259
260    /// Feed one inbound datagram into the endpoint.
261    ///
262    /// Dispatches to the protocol-specific handler based on
263    /// [`Self::protocol_kind`]. Each handler pre-decodes the inbound
264    /// packet, consults the authorizer on LINK attempts, gates
265    /// voice-stream ingress on [`AccessPolicy`], drives the core via
266    /// the private `drive_core` helper, then updates the per-module
267    /// stream cache and drains pending background events into the
268    /// outcome.
269    ///
270    /// # Errors
271    ///
272    /// Returns [`ShellError::Core`] if the core rejects the input
273    /// (parse failure, wrong-state, etc.). Returns
274    /// [`ShellError::Protocol`] if the endpoint was constructed with
275    /// a [`ProtocolKind`] the shell does not recognize.
276    ///
277    /// # Cancellation safety
278    ///
279    /// This method is **not** cancel-safe. It takes multiple
280    /// [`ClientPool`] locks in sequence (`contains` → `insert` →
281    /// `set_module` → `record_last_heard`) and cancellation between
282    /// any two awaits can leave the pool in a half-updated state where
283    /// a session has been created but not yet attached to its module
284    /// in the reverse index. The reflector's run loop is the only
285    /// expected caller and it never cancels this future except via
286    /// shutdown.
287    pub async fn handle_inbound(
288        &self,
289        bytes: &[u8],
290        peer: SocketAddr,
291        now: Instant,
292    ) -> Result<EndpointOutcome<P>, ShellError> {
293        match self.protocol {
294            ProtocolKind::DExtra => self.handle_inbound_dextra(bytes, peer, now).await,
295            ProtocolKind::DPlus => self.handle_inbound_dplus(bytes, peer, now).await,
296            ProtocolKind::Dcs => self.handle_inbound_dcs(bytes, peer, now).await,
297            _ => Err(ShellError::Protocol(format!(
298                "unsupported protocol discriminator: {:?}",
299                self.protocol
300            ))),
301        }
302    }
303
304    /// `DExtra`-specific inbound pipeline.
305    ///
306    /// Pre-decodes the `DExtra` wire packet, consults the authorizer on
307    /// `Link`, gates voice-stream ingress on `AccessPolicy::ReadOnly`,
308    /// drives the core, mirrors `ClientLinked` module into the pool's
309    /// reverse index, and maintains the per-module stream cache.
310    async fn handle_inbound_dextra(
311        &self,
312        bytes: &[u8],
313        peer: SocketAddr,
314        now: Instant,
315    ) -> Result<EndpointOutcome<P>, ShellError> {
316        // Pre-decode the DExtra packet for dispatch only. The real
317        // state transitions happen in `drive_core` via
318        // `ServerSessionCore::handle_input`.
319        let mut null_sink = NullSink;
320        let pre_decoded = decode_client_to_server(bytes, &mut null_sink).ok();
321
322        // LINK → authorizer. Rejected attempts never materialize a
323        // ClientHandle; they produce a NAK + `ClientRejected` event.
324        let link_access: Option<AccessPolicy> = if let Some(ClientPacket::Link {
325            callsign,
326            reflector_module,
327            ..
328        }) = pre_decoded.clone()
329        {
330            let attempt = LinkAttempt {
331                protocol: self.protocol,
332                callsign,
333                peer,
334                module: reflector_module,
335            };
336            match self.authorizer.authorize(&attempt) {
337                Ok(access_policy) => Some(access_policy),
338                Err(reject) => {
339                    tracing::info!(
340                        ?peer,
341                        %callsign,
342                        %reflector_module,
343                        reason = ?reject,
344                        "authorizer rejected DExtra LINK attempt"
345                    );
346                    return Ok(Self::build_dextra_reject_outcome(
347                        peer,
348                        callsign,
349                        reflector_module,
350                        reject,
351                    ));
352                }
353            }
354        } else {
355            None
356        };
357
358        self.ensure_handle(peer, link_access, now).await;
359
360        // ReadOnly voice drop check — must happen BEFORE drive_core
361        // so the state machine never sees the voice bytes.
362        if self
363            .read_only_drop_voice_dextra(pre_decoded.as_ref(), peer, now)
364            .await
365        {
366            let mut outcome = EndpointOutcome::<P>::empty();
367            if let Some(pkt) = pre_decoded.as_ref()
368                && let Some(stream_id) = Self::voice_stream_id_dextra(pkt)
369            {
370                outcome
371                    .events
372                    .push(ServerEvent::VoiceFromReadOnlyDropped { peer, stream_id });
373            }
374            return Ok(outcome);
375        }
376
377        let mut outcome = self.drive_core(&peer, bytes, now).await?;
378        self.clients.record_last_heard(&peer, now).await;
379        self.mirror_linked_module(&outcome, peer).await;
380
381        if let Some(pkt) = pre_decoded.as_ref() {
382            outcome.header_retransmit =
383                self.update_stream_cache_dextra(pkt, bytes, peer, now).await;
384        }
385
386        self.publish_voice_events(&outcome, peer).await;
387        self.drain_pending_events(&mut outcome).await;
388        Ok(outcome)
389    }
390
391    /// `DPlus`-specific inbound pipeline.
392    ///
393    /// `DPlus` has a two-step handshake: `Link1` carries no callsign
394    /// (pass-through to the core, which transitions to a transitional
395    /// `Link1Received` state and enqueues the 5-byte ACK echo), then
396    /// `Link2` carries the client's callsign and fires the authorizer.
397    /// On a rejected `Link2` we emit an 8-byte `BUSY` reply and a
398    /// [`ServerEvent::ClientRejected`] event but do NOT create a
399    /// pool handle.
400    async fn handle_inbound_dplus(
401        &self,
402        bytes: &[u8],
403        peer: SocketAddr,
404        now: Instant,
405    ) -> Result<EndpointOutcome<P>, ShellError> {
406        let mut null_sink = NullSink;
407        let pre_decoded = decode_dplus_client_to_server(bytes, &mut null_sink).ok();
408
409        // LINK2 → authorizer. LINK1 passes through unconditionally
410        // because it carries no callsign; the core's
411        // `handle_dplus_input` walks the state machine from
412        // `Unknown → Link1Received` and enqueues the LINK1 ACK.
413        let link_access: Option<AccessPolicy> =
414            if let Some(DPlusClientPacket::Link2 { callsign }) = pre_decoded.clone() {
415                let attempt = LinkAttempt {
416                    protocol: self.protocol,
417                    callsign,
418                    peer,
419                    module: self.default_reflector_module,
420                };
421                match self.authorizer.authorize(&attempt) {
422                    Ok(access_policy) => Some(access_policy),
423                    Err(reject) => {
424                        tracing::info!(
425                            ?peer,
426                            %callsign,
427                            reason = ?reject,
428                            "authorizer rejected DPlus LINK2 attempt"
429                        );
430                        return Ok(Self::build_dplus_reject_outcome(peer, reject));
431                    }
432                }
433            } else {
434                None
435            };
436
437        self.ensure_handle(peer, link_access, now).await;
438
439        if self
440            .read_only_drop_voice_dplus(pre_decoded.as_ref(), peer, now)
441            .await
442        {
443            let mut outcome = EndpointOutcome::<P>::empty();
444            if let Some(pkt) = pre_decoded.as_ref()
445                && let Some(stream_id) = Self::voice_stream_id_dplus(pkt)
446            {
447                outcome
448                    .events
449                    .push(ServerEvent::VoiceFromReadOnlyDropped { peer, stream_id });
450            }
451            return Ok(outcome);
452        }
453
454        let mut outcome = self.drive_core(&peer, bytes, now).await?;
455        self.clients.record_last_heard(&peer, now).await;
456        self.mirror_linked_module(&outcome, peer).await;
457
458        if let Some(pkt) = pre_decoded.as_ref() {
459            outcome.header_retransmit = self.update_stream_cache_dplus(pkt, bytes, peer, now).await;
460        }
461
462        self.publish_voice_events(&outcome, peer).await;
463        self.drain_pending_events(&mut outcome).await;
464        Ok(outcome)
465    }
466
467    /// `DCS`-specific inbound pipeline.
468    ///
469    /// DCS carries the D-STAR header embedded in every voice packet,
470    /// so the stream-cache lifecycle is different from `DExtra`/`DPlus`:
471    /// the first voice packet for a new `stream_id` is treated as a
472    /// header (and cached), subsequent packets with the same
473    /// `stream_id` are data, and a packet with `is_end = true`
474    /// clears the cache.
475    async fn handle_inbound_dcs(
476        &self,
477        bytes: &[u8],
478        peer: SocketAddr,
479        now: Instant,
480    ) -> Result<EndpointOutcome<P>, ShellError> {
481        let mut null_sink = NullSink;
482        let pre_decoded = decode_dcs_client_to_server(bytes, &mut null_sink).ok();
483
484        let link_access: Option<AccessPolicy> = if let Some(DcsClientPacket::Link {
485            callsign,
486            reflector_module,
487            ..
488        }) = pre_decoded.clone()
489        {
490            let attempt = LinkAttempt {
491                protocol: self.protocol,
492                callsign,
493                peer,
494                module: reflector_module,
495            };
496            match self.authorizer.authorize(&attempt) {
497                Ok(access_policy) => Some(access_policy),
498                Err(reject) => {
499                    tracing::info!(
500                        ?peer,
501                        %callsign,
502                        %reflector_module,
503                        reason = ?reject,
504                        "authorizer rejected DCS LINK attempt"
505                    );
506                    return Ok(Self::build_dcs_reject_outcome(
507                        peer,
508                        callsign,
509                        reflector_module,
510                        reject,
511                    ));
512                }
513            }
514        } else {
515            None
516        };
517
518        self.ensure_handle(peer, link_access, now).await;
519
520        if self
521            .read_only_drop_voice_dcs(pre_decoded.as_ref(), peer, now)
522            .await
523        {
524            let mut outcome = EndpointOutcome::<P>::empty();
525            if let Some(pkt) = pre_decoded.as_ref()
526                && let Some(stream_id) = Self::voice_stream_id_dcs(pkt)
527            {
528                outcome
529                    .events
530                    .push(ServerEvent::VoiceFromReadOnlyDropped { peer, stream_id });
531            }
532            return Ok(outcome);
533        }
534
535        let mut outcome = self.drive_core(&peer, bytes, now).await?;
536        self.clients.record_last_heard(&peer, now).await;
537        self.mirror_linked_module(&outcome, peer).await;
538
539        if let Some(pkt) = pre_decoded.as_ref() {
540            outcome.header_retransmit = self.update_stream_cache_dcs(pkt, bytes, peer, now).await;
541        }
542
543        self.publish_voice_events(&outcome, peer).await;
544        self.drain_pending_events(&mut outcome).await;
545        Ok(outcome)
546    }
547
548    /// Ensure a [`ClientHandle`] exists for `peer` in the pool,
549    /// creating one if needed.
550    ///
551    /// `link_access` is the authorizer decision from a fresh LINK
552    /// pre-decode; if `None` (e.g. for non-LINK packets) the
553    /// fallback is [`AccessPolicy::ReadWrite`]. The LINK path above
554    /// overwrites the fallback when it fires.
555    async fn ensure_handle(
556        &self,
557        peer: SocketAddr,
558        link_access: Option<AccessPolicy>,
559        now: Instant,
560    ) {
561        if self.clients.contains(&peer).await {
562            return;
563        }
564        let access = link_access.unwrap_or(AccessPolicy::ReadWrite);
565        let reflector_module = self.default_reflector_module;
566        let core = ServerSessionCore::new(self.protocol, peer, reflector_module);
567        let handle = ClientHandle::new(core, access, now);
568        self.clients.insert(peer, handle).await;
569    }
570
571    /// Mirror any `ClientLinked` module transitions into the pool's
572    /// reverse index so fan-out can enumerate module members in O(1).
573    async fn mirror_linked_module(&self, outcome: &EndpointOutcome<P>, peer: SocketAddr) {
574        for ev in &outcome.events {
575            if let ServerEvent::ClientLinked { module, .. } = ev {
576                self.clients.set_module(&peer, *module).await;
577            }
578        }
579    }
580
581    /// Drain any pending background events (fan-out eviction, etc.)
582    /// into the outcome the caller will observe.
583    async fn drain_pending_events(&self, outcome: &mut EndpointOutcome<P>) {
584        let mut pending = self.pending_events.lock().await;
585        while let Some(ev) = pending.pop_front() {
586            outcome.events.push(ev);
587        }
588    }
589
590    /// Publish cross-protocol voice events onto the voice bus, if
591    /// configured.
592    ///
593    /// Scans `outcome.events` for voice-lifecycle events and forwards
594    /// each one as a [`CrossProtocolEvent`] so other protocols'
595    /// endpoints can transcode and fan out to their own module
596    /// members. No-op when the endpoint was constructed with a
597    /// `None` voice bus.
598    async fn publish_voice_events(&self, outcome: &EndpointOutcome<P>, peer: SocketAddr) {
599        let Some(bus) = &self.voice_bus else {
600            return;
601        };
602        let Some(module) = self.clients.module_of(&peer).await else {
603            return;
604        };
605        let cached_header = self.cached_header_for_module(module).await;
606        for ev in &outcome.events {
607            let Some(voice_event) = voice_event_from_server_event(ev) else {
608                continue;
609            };
610            // `broadcast::Sender::send` errors only when there are
611            // no live receivers; that's fine for publish — we don't
612            // want to fail the inbound path because nobody listens.
613            drop(bus.send(CrossProtocolEvent {
614                source_protocol: self.protocol,
615                source_peer: peer,
616                module,
617                event: voice_event,
618                cached_header,
619            }));
620        }
621    }
622
623    /// Look up the cached `DStarHeader` (if any) for the given module.
624    ///
625    /// Used by [`Self::publish_voice_events`] so `DCS` subscribers
626    /// on the other side of the bus receive the header context they
627    /// need to re-encode inbound voice data into 100-byte packets.
628    async fn cached_header_for_module(&self, module: Module) -> Option<DStarHeader> {
629        let cache = self.stream_cache.lock().await;
630        cache.get(&module).map(|entry| *entry.header())
631    }
632
633    /// Evict a peer from the pool and enqueue a
634    /// [`ServerEvent::ClientEvicted`] event onto the pending-event
635    /// queue.
636    ///
637    /// The queued event is drained on the next [`Self::handle_inbound`]
638    /// call and appears on that tick's outcome. Callers invoke this
639    /// from the run loop after `fan_out_voice` reports an eviction
640    /// decision.
641    async fn evict_peer(&self, peer: SocketAddr, reason: &str) {
642        drop(self.clients.remove(&peer).await);
643        let mut pending = self.pending_events.lock().await;
644        pending.push_back(ServerEvent::ClientEvicted {
645            peer,
646            reason: reason.to_string(),
647        });
648    }
649
650    /// Update the per-module `DExtra` stream cache for this packet.
651    ///
652    /// Lifecycle:
653    /// - `VoiceHeader`: insert or replace the module's cache entry,
654    ///   store the raw 56-byte header.
655    /// - `VoiceData`: bump the seq counter; if `should_rebroadcast_header`
656    ///   fires return a clone of the cached bytes.
657    /// - `VoiceEot`: remove the module's cache entry.
658    ///
659    /// Returns `None` on all non-retransmit ticks.
660    async fn update_stream_cache_dextra(
661        &self,
662        pkt: &ClientPacket,
663        bytes: &[u8],
664        peer: SocketAddr,
665        now: Instant,
666    ) -> Option<Vec<u8>> {
667        let module = self.clients.module_of(&peer).await?;
668        let mut cache_guard = self.stream_cache.lock().await;
669        match pkt {
670            ClientPacket::VoiceHeader { stream_id, header } => {
671                let entry =
672                    StreamCache::new_with_bytes(*stream_id, *header, bytes.to_vec(), peer, now);
673                let _prev = cache_guard.insert(module, entry);
674                None
675            }
676            ClientPacket::VoiceData { .. } => {
677                let entry = cache_guard.get_mut(&module)?;
678                entry.record_frame(now);
679                if entry.should_rebroadcast_header() {
680                    Some(entry.header_bytes().to_vec())
681                } else {
682                    None
683                }
684            }
685            ClientPacket::VoiceEot { .. } => {
686                let _prev = cache_guard.remove(&module);
687                None
688            }
689            _ => None,
690        }
691    }
692
693    /// Update the per-module `DPlus` stream cache for this packet.
694    ///
695    /// Same lifecycle as [`Self::update_stream_cache_dextra`], but
696    /// operates on the [`dstar_gateway_core::codec::dplus::ClientPacket`]
697    /// enum.
698    async fn update_stream_cache_dplus(
699        &self,
700        pkt: &DPlusClientPacket,
701        bytes: &[u8],
702        peer: SocketAddr,
703        now: Instant,
704    ) -> Option<Vec<u8>> {
705        let module = self.clients.module_of(&peer).await?;
706        let mut cache_guard = self.stream_cache.lock().await;
707        match pkt {
708            DPlusClientPacket::VoiceHeader { stream_id, header } => {
709                let entry =
710                    StreamCache::new_with_bytes(*stream_id, *header, bytes.to_vec(), peer, now);
711                let _prev = cache_guard.insert(module, entry);
712                None
713            }
714            DPlusClientPacket::VoiceData { .. } => {
715                let entry = cache_guard.get_mut(&module)?;
716                entry.record_frame(now);
717                if entry.should_rebroadcast_header() {
718                    Some(entry.header_bytes().to_vec())
719                } else {
720                    None
721                }
722            }
723            DPlusClientPacket::VoiceEot { .. } => {
724                let _prev = cache_guard.remove(&module);
725                None
726            }
727            _ => None,
728        }
729    }
730
731    /// Update the per-module `DCS` stream cache for this packet.
732    ///
733    /// `DCS` is a single-packet-per-frame protocol: every `Voice`
734    /// packet carries the header + AMBE + optional end marker. The
735    /// first sighting of a new `stream_id` acts as the implicit
736    /// stream-start and is cached. Subsequent packets with the same
737    /// `stream_id` are data (and trigger the 21-frame retransmit
738    /// cadence). A packet with `is_end = true` clears the cache.
739    async fn update_stream_cache_dcs(
740        &self,
741        pkt: &DcsClientPacket,
742        bytes: &[u8],
743        peer: SocketAddr,
744        now: Instant,
745    ) -> Option<Vec<u8>> {
746        let module = self.clients.module_of(&peer).await?;
747        let mut cache_guard = self.stream_cache.lock().await;
748        let DcsClientPacket::Voice {
749            header,
750            stream_id,
751            is_end,
752            ..
753        } = pkt
754        else {
755            return None;
756        };
757
758        // First sighting of this stream id on this module: cache the
759        // header + raw bytes and emit no retransmit.
760        let existing_stream = cache_guard.get(&module).map(StreamCache::stream_id);
761        if existing_stream != Some(*stream_id) {
762            if *is_end {
763                // A one-packet stream that starts and ends in the
764                // same datagram — clear any stale entry and bail.
765                let _prev = cache_guard.remove(&module);
766                return None;
767            }
768            let entry = StreamCache::new_with_bytes(*stream_id, *header, bytes.to_vec(), peer, now);
769            let _prev = cache_guard.insert(module, entry);
770            return None;
771        }
772
773        // Same stream id — data frame. Bump the seq counter; if the
774        // packet is the end-of-stream marker, clear the cache after
775        // checking the retransmit cadence one last time.
776        let retransmit_payload = cache_guard.get_mut(&module).and_then(|entry| {
777            entry.record_frame(now);
778            if entry.should_rebroadcast_header() {
779                Some(entry.header_bytes().to_vec())
780            } else {
781                None
782            }
783        });
784        if *is_end {
785            let _prev = cache_guard.remove(&module);
786        }
787        retransmit_payload
788    }
789
790    /// Return the voice stream id if this `DExtra` packet is a voice
791    /// header, voice data, or voice EOT frame; otherwise `None`.
792    const fn voice_stream_id_dextra(pkt: &ClientPacket) -> Option<StreamId> {
793        match pkt {
794            ClientPacket::VoiceHeader { stream_id, .. }
795            | ClientPacket::VoiceData { stream_id, .. }
796            | ClientPacket::VoiceEot { stream_id, .. } => Some(*stream_id),
797            _ => None,
798        }
799    }
800
801    /// Return the voice stream id if this `DPlus` packet is a voice
802    /// header, voice data, or voice EOT frame; otherwise `None`.
803    const fn voice_stream_id_dplus(pkt: &DPlusClientPacket) -> Option<StreamId> {
804        match pkt {
805            DPlusClientPacket::VoiceHeader { stream_id, .. }
806            | DPlusClientPacket::VoiceData { stream_id, .. }
807            | DPlusClientPacket::VoiceEot { stream_id, .. } => Some(*stream_id),
808            _ => None,
809        }
810    }
811
812    /// Return the voice stream id if this `DCS` packet is a voice
813    /// frame; otherwise `None`.
814    const fn voice_stream_id_dcs(pkt: &DcsClientPacket) -> Option<StreamId> {
815        match pkt {
816            DcsClientPacket::Voice { stream_id, .. } => Some(*stream_id),
817            _ => None,
818        }
819    }
820
821    /// Check whether a `DExtra` pre-decoded packet should be dropped
822    /// because the peer has `AccessPolicy::ReadOnly`, and record the
823    /// `last_heard` bookkeeping for the drop path.
824    ///
825    /// Returns `true` if the caller should short-circuit with a
826    /// `VoiceFromReadOnlyDropped` event.
827    async fn read_only_drop_voice_dextra(
828        &self,
829        pkt: Option<&ClientPacket>,
830        peer: SocketAddr,
831        now: Instant,
832    ) -> bool {
833        let Some(pkt) = pkt else {
834            return false;
835        };
836        if !matches!(
837            self.clients.access_of(&peer).await,
838            Some(AccessPolicy::ReadOnly)
839        ) {
840            return false;
841        }
842        if Self::voice_stream_id_dextra(pkt).is_none() {
843            return false;
844        }
845        self.clients.record_last_heard(&peer, now).await;
846        true
847    }
848
849    /// `DPlus` sibling of [`Self::read_only_drop_voice_dextra`].
850    async fn read_only_drop_voice_dplus(
851        &self,
852        pkt: Option<&DPlusClientPacket>,
853        peer: SocketAddr,
854        now: Instant,
855    ) -> bool {
856        let Some(pkt) = pkt else {
857            return false;
858        };
859        if !matches!(
860            self.clients.access_of(&peer).await,
861            Some(AccessPolicy::ReadOnly)
862        ) {
863            return false;
864        }
865        if Self::voice_stream_id_dplus(pkt).is_none() {
866            return false;
867        }
868        self.clients.record_last_heard(&peer, now).await;
869        true
870    }
871
872    /// `DCS` sibling of [`Self::read_only_drop_voice_dextra`].
873    async fn read_only_drop_voice_dcs(
874        &self,
875        pkt: Option<&DcsClientPacket>,
876        peer: SocketAddr,
877        now: Instant,
878    ) -> bool {
879        let Some(pkt) = pkt else {
880            return false;
881        };
882        if !matches!(
883            self.clients.access_of(&peer).await,
884            Some(AccessPolicy::ReadOnly)
885        ) {
886            return false;
887        }
888        if Self::voice_stream_id_dcs(pkt).is_none() {
889            return false;
890        }
891        self.clients.record_last_heard(&peer, now).await;
892        true
893    }
894
895    /// Build an outcome for a rejected `DExtra` link attempt.
896    ///
897    /// Emits a single 14-byte NAK datagram and a
898    /// [`ServerEvent::ClientRejected`] event. The client pool is not
899    /// touched — the peer never becomes a handle.
900    fn build_dextra_reject_outcome(
901        peer: SocketAddr,
902        callsign: Callsign,
903        reflector_module: Module,
904        reject: crate::reflector::RejectReason,
905    ) -> EndpointOutcome<P> {
906        let mut outcome = EndpointOutcome::<P>::empty();
907        let mut buf = [0u8; 16];
908        if let Ok(n) = encode_connect_nak(&mut buf, &callsign, reflector_module)
909            && let Some(payload) = buf.get(..n)
910        {
911            outcome.txs.push((payload.to_vec(), peer));
912        }
913        outcome.events.push(ServerEvent::ClientRejected {
914            peer,
915            reason: reject.into_core_reason(),
916        });
917        outcome
918    }
919
920    /// Build an outcome for a rejected `DPlus` LINK2 attempt.
921    ///
922    /// Emits an 8-byte `BUSY` reply (`Link2Result::Busy`) and a
923    /// [`ServerEvent::ClientRejected`] event. The client pool is not
924    /// touched — the peer never becomes a handle.
925    fn build_dplus_reject_outcome(
926        peer: SocketAddr,
927        reject: crate::reflector::RejectReason,
928    ) -> EndpointOutcome<P> {
929        let mut outcome = EndpointOutcome::<P>::empty();
930        let mut buf = [0u8; 16];
931        if let Ok(n) = encode_link2_reply(&mut buf, Link2Result::Busy)
932            && let Some(payload) = buf.get(..n)
933        {
934            outcome.txs.push((payload.to_vec(), peer));
935        }
936        outcome.events.push(ServerEvent::ClientRejected {
937            peer,
938            reason: reject.into_core_reason(),
939        });
940        outcome
941    }
942
943    /// Build an outcome for a rejected `DCS` link attempt.
944    ///
945    /// Emits a single 14-byte DCS NAK datagram and a
946    /// [`ServerEvent::ClientRejected`] event. The client pool is not
947    /// touched — the peer never becomes a handle.
948    fn build_dcs_reject_outcome(
949        peer: SocketAddr,
950        callsign: Callsign,
951        reflector_module: Module,
952        reject: crate::reflector::RejectReason,
953    ) -> EndpointOutcome<P> {
954        let mut outcome = EndpointOutcome::<P>::empty();
955        let mut buf = [0u8; 32];
956        if let Ok(n) = encode_dcs_connect_nak(&mut buf, &callsign, reflector_module)
957            && let Some(payload) = buf.get(..n)
958        {
959            outcome.txs.push((payload.to_vec(), peer));
960        }
961        outcome.events.push(ServerEvent::ClientRejected {
962            peer,
963            reason: reject.into_core_reason(),
964        });
965        outcome
966    }
967
968    /// Drive the core's state machine and drain its outbox + events.
969    ///
970    /// Held as a private helper so the lock-protected mutation of the
971    /// per-peer `ServerSessionCore` stays in one place. We take the
972    /// pool's mutex, borrow the handle mutably, feed the core, and
973    /// drain everything into owned `Vec`s before releasing the lock.
974    async fn drive_core(
975        &self,
976        peer: &SocketAddr,
977        bytes: &[u8],
978        now: Instant,
979    ) -> Result<EndpointOutcome<P>, ShellError> {
980        // We need mutable access to the handle inside the pool's
981        // HashMap. `ClientPool` intentionally doesn't expose `&mut`
982        // directly — reach through the private `Mutex<HashMap>` here
983        // via a dedicated method on the pool.
984        let mut outcome = EndpointOutcome::<P>::empty();
985
986        self.clients
987            .with_handle_mut(peer, |handle| -> Result<(), ShellError> {
988                handle.session.handle_input(now, bytes)?;
989                // The core currently stamps outbox entries with a
990                // fresh `Instant::now()` rather than the injected
991                // `now`. Sampling the wall clock again here ensures
992                // `pop_ready` sees a time strictly after the enqueue
993                // instant so just-enqueued packets actually drain.
994                let drain_now = Instant::now();
995                while let Some(tx) = handle.session.pop_transmit(drain_now) {
996                    outcome.txs.push((tx.payload.to_vec(), tx.dst));
997                }
998                while let Some(ev) = handle.session.pop_event::<P>() {
999                    outcome.events.push(ev);
1000                }
1001                Ok(())
1002            })
1003            .await
1004            .unwrap_or(Ok(()))?;
1005
1006        Ok(outcome)
1007    }
1008
1009    /// Inspect the outcome events to classify the received datagram.
1010    ///
1011    /// Returns the first forwardable voice hint found in the event
1012    /// list. The run loop uses this hint to route the raw inbound
1013    /// bytes through the fan-out engine without re-decoding.
1014    ///
1015    /// The event carries the stream id, and the peer's module is
1016    /// resolved from the client pool — the caller has already linked
1017    /// the peer by the time the hint is extracted.
1018    fn forward_hint(events: &[ServerEvent<P>], peer_module: Option<Module>) -> Option<ForwardHint> {
1019        let module = peer_module?;
1020        for ev in events {
1021            let hint = match ev {
1022                ServerEvent::ClientStreamStarted { stream_id, .. } => Some(ForwardHint::Header {
1023                    module,
1024                    stream_id: *stream_id,
1025                }),
1026                ServerEvent::ClientStreamFrame { stream_id, .. } => Some(ForwardHint::Data {
1027                    module,
1028                    stream_id: *stream_id,
1029                }),
1030                ServerEvent::ClientStreamEnded { stream_id, .. } => Some(ForwardHint::Eot {
1031                    module,
1032                    stream_id: *stream_id,
1033                }),
1034                // `ServerEvent` is `non_exhaustive`; the wildcard
1035                // covers `ClientLinked`/`ClientUnlinked` plus any
1036                // future variants.
1037                _ => None,
1038            };
1039            if hint.is_some() {
1040                return hint;
1041            }
1042        }
1043        None
1044    }
1045
1046    /// Bind-less run loop that owns a pre-bound [`UdpSocket`].
1047    ///
1048    /// Each iteration reads one datagram, feeds it to
1049    /// [`Self::handle_inbound`], writes outbound responses back to
1050    /// their destination peer, and finally fans voice frames out to
1051    /// every other peer on the same module.
1052    ///
1053    /// Returns when `shutdown` transitions to `true`, or when an
1054    /// unrecoverable I/O error occurs.
1055    ///
1056    /// # Errors
1057    ///
1058    /// Returns [`ShellError::Io`] if the socket errors during a
1059    /// `recv_from`. Send-side failures are logged and the offending
1060    /// peer is marked unhealthy; they do not terminate the loop.
1061    ///
1062    /// # Cancellation safety
1063    ///
1064    /// Dropping this future is the intended shutdown mechanism for an
1065    /// endpoint task — the enclosing [`tokio::task::JoinSet`] in
1066    /// [`crate::reflector::Reflector::run`] will abort the task when the
1067    /// shutdown watch channel fires, which drops the `run` future
1068    /// cleanly. Any in-progress `handle_inbound` call for a single
1069    /// datagram will be abandoned mid-lock-sequence, which is
1070    /// acceptable during shutdown because the entire [`ClientPool`]
1071    /// is about to be dropped with it. Do **not** race `run()` against
1072    /// another future with `tokio::select!` while the endpoint is
1073    /// expected to remain operational.
1074    pub async fn run(
1075        self: Arc<Self>,
1076        socket: Arc<UdpSocket>,
1077        mut shutdown: watch::Receiver<bool>,
1078    ) -> Result<(), ShellError> {
1079        let mut buf = [0u8; 2048];
1080        let mut voice_rx = self.voice_bus.as_ref().map(broadcast::Sender::subscribe);
1081        loop {
1082            // Pattern: "maybe-subscribed optional branch" — when
1083            // `voice_rx` is None the voice arm must never resolve so
1084            // the other arms can still drive. `std::future::pending()`
1085            // returns `!` which we wrap in `Option` so the select
1086            // arm type-checks against the `Ok` branch below.
1087            let voice_branch = async {
1088                match voice_rx.as_mut() {
1089                    Some(rx) => Some(rx.recv().await),
1090                    None => std::future::pending().await,
1091                }
1092            };
1093            tokio::select! {
1094                biased;
1095                change = shutdown.changed() => {
1096                    // `changed` resolves `Err` when all senders drop —
1097                    // treat that as an implicit shutdown.
1098                    if change.is_err() || *shutdown.borrow() {
1099                        return Ok(());
1100                    }
1101                }
1102                result = socket.recv_from(&mut buf) => {
1103                    let (n, peer) = result?;
1104                    let recv_slice = buf.get(..n).unwrap_or(&[]);
1105                    let owned_bytes = recv_slice.to_vec();
1106                    let now = Instant::now();
1107                    self.run_one_tick(&socket, &owned_bytes, peer, now).await?;
1108                }
1109                Some(result) = voice_branch => {
1110                    match result {
1111                        Ok(event) => {
1112                            self.handle_cross_protocol_event(&socket, event).await;
1113                        }
1114                        Err(broadcast::error::RecvError::Lagged(skipped)) => {
1115                            tracing::warn!(
1116                                skipped,
1117                                "cross-protocol bus lagged; catching up"
1118                            );
1119                        }
1120                        Err(broadcast::error::RecvError::Closed) => {
1121                            // Bus has been closed — no more events
1122                            // will arrive. Drop the subscription so
1123                            // the voice arm goes quiet forever and
1124                            // `run` keeps servicing UDP + shutdown.
1125                            voice_rx = None;
1126                        }
1127                    }
1128                }
1129            }
1130        }
1131    }
1132
1133    /// Handle a cross-protocol voice event delivered via the
1134    /// broadcast bus.
1135    ///
1136    /// Transcodes the event into this endpoint's wire format via
1137    /// [`transcode_voice`] and fans the result out to every peer
1138    /// currently linked to the originator's module on this endpoint.
1139    /// Same-protocol events (those published by this endpoint itself)
1140    /// are dropped — the normal within-protocol fan-out path already
1141    /// handles them.
1142    async fn handle_cross_protocol_event(
1143        self: &Arc<Self>,
1144        socket: &Arc<UdpSocket>,
1145        event: CrossProtocolEvent,
1146    ) {
1147        if event.source_protocol == self.protocol {
1148            return;
1149        }
1150        let mut scratch = [0u8; 2048];
1151        let len = match transcode_voice(
1152            self.protocol,
1153            &event.event,
1154            event.cached_header.as_ref(),
1155            &mut scratch,
1156        ) {
1157            Ok(n) => n,
1158            Err(TranscodeError::Encode(e)) => {
1159                tracing::warn!(
1160                    target = ?self.protocol,
1161                    source = ?event.source_protocol,
1162                    err = ?e,
1163                    "cross-protocol transcode encode failed"
1164                );
1165                return;
1166            }
1167            Err(TranscodeError::MissingCachedHeader) => {
1168                tracing::debug!(
1169                    target = ?self.protocol,
1170                    source = ?event.source_protocol,
1171                    "cross-protocol transcode dropped: target requires cached header"
1172                );
1173                return;
1174            }
1175        };
1176        let Some(payload) = scratch.get(..len) else {
1177            return;
1178        };
1179        let members = self.clients.members_of_module(event.module).await;
1180        for peer in members {
1181            if peer == event.source_peer {
1182                // Defensive: the source peer should be on a
1183                // different protocol's pool, not this one, but the
1184                // check is cheap.
1185                continue;
1186            }
1187            if let Err(e) = socket.send_to(payload, peer).await {
1188                tracing::warn!(
1189                    ?peer,
1190                    ?e,
1191                    "cross-protocol send failed; marking peer unhealthy"
1192                );
1193                if let UnhealthyOutcome::ShouldEvict { failure_count } =
1194                    self.clients.mark_unhealthy(&peer).await
1195                {
1196                    tracing::warn!(
1197                        ?peer,
1198                        failure_count,
1199                        "cross-protocol send failure threshold exceeded; evicting peer"
1200                    );
1201                    self.evict_peer(peer, "too many cross-protocol send failures")
1202                        .await;
1203                }
1204            }
1205        }
1206    }
1207
1208    /// Process one received datagram through the full pipeline:
1209    /// `handle_inbound` → reply `send_to` → fan-out → eviction.
1210    ///
1211    /// Extracted from [`Self::run`] to keep the top-level run loop
1212    /// readable and within clippy's cognitive complexity budget.
1213    async fn run_one_tick(
1214        self: &Arc<Self>,
1215        socket: &Arc<UdpSocket>,
1216        owned_bytes: &[u8],
1217        peer: SocketAddr,
1218        now: Instant,
1219    ) -> Result<(), ShellError> {
1220        let outcome = match self.handle_inbound(owned_bytes, peer, now).await {
1221            Ok(o) => o,
1222            Err(ShellError::Core(e)) => {
1223                tracing::warn!(?peer, ?e, "dropping malformed datagram");
1224                return Ok(());
1225            }
1226            Err(ShellError::Protocol(msg)) => {
1227                tracing::warn!(?peer, msg, "protocol not supported");
1228                return Ok(());
1229            }
1230            Err(e @ ShellError::Io(_)) => return Err(e),
1231        };
1232
1233        let mut evicted_peers: Vec<SocketAddr> = Vec::new();
1234        self.send_replies(socket, &outcome.txs, &mut evicted_peers)
1235            .await;
1236        self.fan_out_outcome(socket, &outcome, peer, owned_bytes, &mut evicted_peers)
1237            .await;
1238
1239        // Fix 4: Remove any peers whose send-failure count crossed
1240        // the eviction threshold on this tick. The ClientEvicted
1241        // event itself is emitted by `evict_peer` so consumers of
1242        // the server event stream can observe the eviction.
1243        for evicted in evicted_peers {
1244            self.evict_peer(evicted, "too many send failures").await;
1245        }
1246        Ok(())
1247    }
1248
1249    /// Send all reply datagrams from `outcome.txs`, marking peers
1250    /// unhealthy on send failure and collecting any that cross the
1251    /// eviction threshold into `evicted_peers`.
1252    async fn send_replies(
1253        self: &Arc<Self>,
1254        socket: &Arc<UdpSocket>,
1255        txs: &[(Vec<u8>, SocketAddr)],
1256        evicted_peers: &mut Vec<SocketAddr>,
1257    ) {
1258        for (payload, dst) in txs {
1259            if let Err(e) = socket.send_to(payload, *dst).await {
1260                tracing::warn!(?dst, ?e, "reply send_to failed");
1261                if let UnhealthyOutcome::ShouldEvict { failure_count } =
1262                    self.clients.mark_unhealthy(dst).await
1263                {
1264                    tracing::warn!(
1265                        ?dst,
1266                        failure_count,
1267                        "reply send failure threshold exceeded; evicting peer"
1268                    );
1269                    evicted_peers.push(*dst);
1270                }
1271            }
1272        }
1273    }
1274
1275    /// Fan out the received datagram (and any cached header
1276    /// retransmit) to every other peer on the same module.
1277    async fn fan_out_outcome(
1278        self: &Arc<Self>,
1279        socket: &Arc<UdpSocket>,
1280        outcome: &EndpointOutcome<P>,
1281        peer: SocketAddr,
1282        owned_bytes: &[u8],
1283        evicted_peers: &mut Vec<SocketAddr>,
1284    ) {
1285        let peer_module = self.clients.module_of(&peer).await;
1286        let Some(hint) = Self::forward_hint(&outcome.events, peer_module) else {
1287            return;
1288        };
1289        let (module, _stream_id) = match hint {
1290            ForwardHint::Header { module, stream_id }
1291            | ForwardHint::Data { module, stream_id }
1292            | ForwardHint::Eot { module, stream_id } => (module, stream_id),
1293        };
1294        match fan_out_voice(
1295            socket.as_ref(),
1296            &self.clients,
1297            peer,
1298            module,
1299            self.protocol,
1300            owned_bytes,
1301        )
1302        .await
1303        {
1304            Ok(report) => evicted_peers.extend(report.evicted),
1305            Err(e) => tracing::warn!(?peer, ?e, "fan_out_voice failed"),
1306        }
1307        // Fix 3: If the stream cache fired a header retransmit on
1308        // this tick, fan out the cached bytes alongside the normal
1309        // frame. We send the data frame FIRST (above) and the cached
1310        // header SECOND so listeners who missed the initial header
1311        // still get refreshed context immediately after decoding the
1312        // data.
1313        if let Some(cached) = outcome.header_retransmit.as_ref() {
1314            match fan_out_voice(
1315                socket.as_ref(),
1316                &self.clients,
1317                peer,
1318                module,
1319                self.protocol,
1320                cached,
1321            )
1322            .await
1323            {
1324                Ok(report) => evicted_peers.extend(report.evicted),
1325                Err(e) => tracing::warn!(?peer, ?e, "fan_out_voice header retransmit failed"),
1326            }
1327        }
1328    }
1329}
1330
1331#[cfg(test)]
1332mod tests {
1333    use super::{EndpointOutcome, ProtocolEndpoint};
1334    use crate::reflector::{
1335        AllowAllAuthorizer, ClientAuthorizer, DenyAllAuthorizer, ReadOnlyAuthorizer,
1336    };
1337    use dstar_gateway_core::codec::dcs::{
1338        GatewayType as DcsGatewayType, encode_connect_link as encode_dcs_link,
1339        encode_voice as encode_dcs_voice,
1340    };
1341    use dstar_gateway_core::codec::dextra::{
1342        encode_connect_link, encode_voice_data, encode_voice_eot, encode_voice_header,
1343    };
1344    use dstar_gateway_core::codec::dplus::{
1345        encode_link1 as encode_dplus_link1, encode_link2 as encode_dplus_link2,
1346        encode_voice_data as encode_dplus_voice_data, encode_voice_eot as encode_dplus_voice_eot,
1347        encode_voice_header as encode_dplus_voice_header,
1348    };
1349    use dstar_gateway_core::header::DStarHeader;
1350    use dstar_gateway_core::session::client::{DExtra, DPlus, Dcs};
1351    use dstar_gateway_core::session::server::{ServerEvent, ServerStateKind};
1352    use dstar_gateway_core::types::{Callsign, Module, ProtocolKind, StreamId, Suffix};
1353    use dstar_gateway_core::voice::VoiceFrame;
1354    use std::net::{IpAddr, Ipv4Addr, SocketAddr};
1355    use std::sync::Arc;
1356    use std::time::Instant;
1357
1358    type TestResult = Result<(), Box<dyn std::error::Error>>;
1359
1360    const PEER: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 30001);
1361
    /// Socket address of the simulated client used by the tests in
    /// this module; simply returns the `PEER` constant.
1362    fn peer() -> SocketAddr {
1363        PEER
1364    }
1365
    /// Wraps an `AllowAllAuthorizer` in an `Arc<dyn ClientAuthorizer>`
    /// so it can be handed to `ProtocolEndpoint::new` in the tests.
1366    fn allow_all() -> Arc<dyn ClientAuthorizer> {
1367        Arc::new(AllowAllAuthorizer)
1368    }
1369
1370    #[tokio::test]
1371    async fn new_endpoint_has_empty_pool() {
1372        let ep = ProtocolEndpoint::<DExtra>::new(ProtocolKind::DExtra, Module::C, allow_all());
1373        assert_eq!(ep.protocol_kind(), ProtocolKind::DExtra);
1374        assert_eq!(ep.clients().len().await, 0);
1375    }
1376
1377    #[tokio::test]
1378    async fn dextra_link_produces_ack_and_event() -> TestResult {
1379        let ep = ProtocolEndpoint::<DExtra>::new(ProtocolKind::DExtra, Module::C, allow_all());
1380        let mut buf = [0u8; 16];
1381        let n = encode_connect_link(
1382            &mut buf,
1383            &Callsign::from_wire_bytes(*b"W1AW    "),
1384            Module::C,
1385            Module::B,
1386        )?;
1387        let slice = buf.get(..n).ok_or("encode produced no bytes")?;
1388
1389        let outcome: EndpointOutcome<DExtra> =
1390            ep.handle_inbound(slice, peer(), Instant::now()).await?;
1391
1392        // Exactly one outbound ACK to the same peer. The ACK tag
1393        // offset is asserted by the codec's own golden tests — we
1394        // just verify one 14-byte datagram was enqueued to the peer
1395        // and contains the ACK tag somewhere in the payload.
1396        assert_eq!(outcome.txs.len(), 1);
1397        let (payload, dst) = outcome.txs.first().ok_or("no tx")?;
1398        assert_eq!(*dst, peer());
1399        assert_eq!(payload.len(), 14, "DExtra ACK is 14 bytes");
1400        assert!(
1401            payload.windows(3).any(|w| w == b"ACK"),
1402            "payload must contain ACK tag"
1403        );
1404
1405        // Exactly one ClientLinked event.
1406        assert_eq!(outcome.events.len(), 1);
1407        assert!(matches!(
1408            outcome.events.first(),
1409            Some(ServerEvent::ClientLinked { .. })
1410        ));
1411
1412        // Pool now has one entry and has the reverse-index populated.
1413        assert_eq!(ep.clients().len().await, 1);
1414        let members = ep.clients().members_of_module(Module::C).await;
1415        assert_eq!(members, vec![peer()]);
1416        Ok(())
1417    }
1418
1419    // ─── DPlus handshake ─────────────────────────────────────────
1420    #[tokio::test]
1421    async fn dplus_link2_after_link1_creates_handle_and_acks_okrw() -> TestResult {
1422        let ep = ProtocolEndpoint::<DPlus>::new(ProtocolKind::DPlus, Module::C, allow_all());
1423
1424        // LINK1 — 5 bytes, no callsign. The core transitions to
1425        // `Link1Received` and enqueues the 5-byte ACK echo.
1426        let mut link1_buf = [0u8; 8];
1427        let n1 = encode_dplus_link1(&mut link1_buf)?;
1428        let link1_slice = link1_buf.get(..n1).ok_or("link1 empty")?;
1429        let outcome1: EndpointOutcome<DPlus> = ep
1430            .handle_inbound(link1_slice, peer(), Instant::now())
1431            .await?;
1432        // LINK1 ACK echo is 5 bytes back to the peer.
1433        assert_eq!(outcome1.txs.len(), 1);
1434        let (payload1, dst1) = outcome1.txs.first().ok_or("no tx")?;
1435        assert_eq!(*dst1, peer());
1436        assert_eq!(payload1.len(), 5, "DPlus LINK1 ACK is 5 bytes");
1437        // LINK1 does not emit a ClientLinked event — the login isn't
1438        // complete until LINK2 arrives with the callsign.
1439        assert!(
1440            outcome1.events.is_empty(),
1441            "LINK1 emits no events (no callsign yet)"
1442        );
1443        // A handle was created (needed to carry the Link1Received state).
1444        assert_eq!(ep.clients().len().await, 1);
1445
1446        // LINK2 — 28 bytes carrying the callsign. The core fires
1447        // `ClientLinked` with the fallback reflector module (DPlus
1448        // LINK2 carries no module), and enqueues the 8-byte OKRW reply.
1449        let mut link2_buf = [0u8; 32];
1450        let n2 = encode_dplus_link2(&mut link2_buf, &Callsign::from_wire_bytes(*b"W1AW    "))?;
1451        let link2_slice = link2_buf.get(..n2).ok_or("link2 empty")?;
1452        let outcome2: EndpointOutcome<DPlus> = ep
1453            .handle_inbound(link2_slice, peer(), Instant::now())
1454            .await?;
1455        assert_eq!(outcome2.txs.len(), 1);
1456        let (payload2, dst2) = outcome2.txs.first().ok_or("no tx")?;
1457        assert_eq!(*dst2, peer());
1458        assert_eq!(payload2.len(), 8, "DPlus LINK2 reply is 8 bytes");
1459        assert!(
1460            payload2.windows(4).any(|w| w == b"OKRW"),
1461            "LINK2 ACCEPT reply contains OKRW tag"
1462        );
1463        assert_eq!(outcome2.events.len(), 1);
1464        assert!(matches!(
1465            outcome2.events.first(),
1466            Some(ServerEvent::ClientLinked { .. })
1467        ));
1468        let members = ep.clients().members_of_module(Module::C).await;
1469        assert_eq!(members, vec![peer()]);
1470        Ok(())
1471    }
1472
1473    #[tokio::test]
1474    async fn dplus_voice_header_during_linked_creates_stream_cache() -> TestResult {
1475        let ep = ProtocolEndpoint::<DPlus>::new(ProtocolKind::DPlus, Module::C, allow_all());
1476        // LINK1 + LINK2 to establish the session.
1477        let mut link1_buf = [0u8; 8];
1478        let n1 = encode_dplus_link1(&mut link1_buf)?;
1479        drop(
1480            ep.handle_inbound(link1_buf.get(..n1).ok_or("empty")?, peer(), Instant::now())
1481                .await?,
1482        );
1483        let mut link2_buf = [0u8; 32];
1484        let n2 = encode_dplus_link2(&mut link2_buf, &Callsign::from_wire_bytes(*b"W1AW    "))?;
1485        drop(
1486            ep.handle_inbound(link2_buf.get(..n2).ok_or("empty")?, peer(), Instant::now())
1487                .await?,
1488        );
1489
1490        // Voice header — 58 bytes.
1491        let mut hdr_buf = [0u8; 64];
1492        let hdr_n = encode_dplus_voice_header(&mut hdr_buf, sid(), &test_header("W1AW"))?;
1493        let hdr_slice = hdr_buf.get(..hdr_n).ok_or("empty")?;
1494        let outcome = ep.handle_inbound(hdr_slice, peer(), Instant::now()).await?;
1495        // ClientStreamStarted event must be present.
1496        assert!(
1497            outcome
1498                .events
1499                .iter()
1500                .any(|ev| matches!(ev, ServerEvent::ClientStreamStarted { .. })),
1501            "voice header emits ClientStreamStarted"
1502        );
1503        // Header tick itself does not retransmit.
1504        assert!(outcome.header_retransmit.is_none());
1505
1506        // Send 20 data frames to trip the retransmit cadence.
1507        let frame = VoiceFrame::silence();
1508        let mut cache_fired = 0_u32;
1509        for seq in 0_u8..20 {
1510            let mut data_buf = [0u8; 64];
1511            let data_n = encode_dplus_voice_data(&mut data_buf, sid(), seq, &frame)?;
1512            let data_slice = data_buf.get(..data_n).ok_or("empty")?;
1513            let outcome = ep
1514                .handle_inbound(data_slice, peer(), Instant::now())
1515                .await?;
1516            if outcome.header_retransmit.is_some() {
1517                cache_fired = cache_fired.saturating_add(1);
1518            }
1519        }
1520        assert_eq!(
1521            cache_fired, 1,
1522            "DPlus stream cache retransmits after 20 frames"
1523        );
1524
1525        // EOT clears the cache.
1526        let mut eot_buf = [0u8; 64];
1527        let eot_n = encode_dplus_voice_eot(&mut eot_buf, sid(), 20)?;
1528        let eot_slice = eot_buf.get(..eot_n).ok_or("empty")?;
1529        drop(ep.handle_inbound(eot_slice, peer(), Instant::now()).await?);
1530        Ok(())
1531    }
1532
1533    // ─── DCS handshake ───────────────────────────────────────────
1534    fn dcs_reflector_cs() -> Callsign {
1535        Callsign::from_wire_bytes(*b"DCS030  ")
1536    }
1537
1538    #[tokio::test]
1539    async fn dcs_link_creates_handle_and_acks() -> TestResult {
1540        let ep = ProtocolEndpoint::<Dcs>::new(ProtocolKind::Dcs, Module::C, allow_all());
1541        // DCS LINK is 519 bytes.
1542        let mut buf = vec![0u8; 600];
1543        let n = encode_dcs_link(
1544            &mut buf,
1545            &Callsign::from_wire_bytes(*b"W1AW    "),
1546            Module::B,
1547            Module::C,
1548            &dcs_reflector_cs(),
1549            DcsGatewayType::Repeater,
1550        )?;
1551        let slice = buf.get(..n).ok_or("empty")?;
1552
1553        let outcome: EndpointOutcome<Dcs> =
1554            ep.handle_inbound(slice, peer(), Instant::now()).await?;
1555        // Exactly one 14-byte ACK datagram.
1556        assert_eq!(outcome.txs.len(), 1);
1557        let (payload, dst) = outcome.txs.first().ok_or("no tx")?;
1558        assert_eq!(*dst, peer());
1559        assert_eq!(payload.len(), 14, "DCS ACK is 14 bytes");
1560        assert!(
1561            payload.windows(3).any(|w| w == b"ACK"),
1562            "DCS ACK payload contains ACK tag"
1563        );
1564        // ClientLinked event present.
1565        assert!(
1566            outcome
1567                .events
1568                .iter()
1569                .any(|ev| matches!(ev, ServerEvent::ClientLinked { .. })),
1570            "DCS link emits ClientLinked"
1571        );
1572        // Pool has one member on module C.
1573        assert_eq!(ep.clients().len().await, 1);
1574        let members = ep.clients().members_of_module(Module::C).await;
1575        assert_eq!(members, vec![peer()]);
1576        Ok(())
1577    }
1578
1579    #[tokio::test]
1580    async fn dcs_voice_first_packet_starts_stream_and_caches_header() -> TestResult {
1581        let ep = ProtocolEndpoint::<Dcs>::new(ProtocolKind::Dcs, Module::C, allow_all());
1582        // LINK first.
1583        let mut link_buf = vec![0u8; 600];
1584        let link_n = encode_dcs_link(
1585            &mut link_buf,
1586            &Callsign::from_wire_bytes(*b"W1AW    "),
1587            Module::B,
1588            Module::C,
1589            &dcs_reflector_cs(),
1590            DcsGatewayType::Repeater,
1591        )?;
1592        drop(
1593            ep.handle_inbound(
1594                link_buf.get(..link_n).ok_or("empty")?,
1595                peer(),
1596                Instant::now(),
1597            )
1598            .await?,
1599        );
1600
1601        // DCS voice is 100 bytes — first packet for a new stream id
1602        // is the implicit "header". DCS carries the header in every
1603        // voice frame so there's no separate VoiceHeader packet type.
1604        let frame = VoiceFrame::silence();
1605        let mut voice_buf = [0u8; 128];
1606        let voice_n = encode_dcs_voice(
1607            &mut voice_buf,
1608            &test_header("W1AW"),
1609            sid(),
1610            0,
1611            &frame,
1612            false,
1613        )?;
1614        let voice_slice = voice_buf.get(..voice_n).ok_or("empty")?;
1615        let outcome = ep
1616            .handle_inbound(voice_slice, peer(), Instant::now())
1617            .await?;
1618        // Both ClientStreamStarted (new stream id) and
1619        // ClientStreamFrame (the frame itself) are present.
1620        assert!(
1621            outcome
1622                .events
1623                .iter()
1624                .any(|ev| matches!(ev, ServerEvent::ClientStreamStarted { .. })),
1625            "first DCS voice packet emits ClientStreamStarted"
1626        );
1627        assert!(
1628            outcome
1629                .events
1630                .iter()
1631                .any(|ev| matches!(ev, ServerEvent::ClientStreamFrame { .. })),
1632            "first DCS voice packet also emits ClientStreamFrame"
1633        );
1634        // First packet never triggers retransmit cadence.
1635        assert!(outcome.header_retransmit.is_none());
1636        Ok(())
1637    }
1638
1639    #[tokio::test]
1640    async fn dcs_voice_with_is_end_clears_stream_cache() -> TestResult {
1641        let ep = ProtocolEndpoint::<Dcs>::new(ProtocolKind::Dcs, Module::C, allow_all());
1642        // LINK first.
1643        let mut link_buf = vec![0u8; 600];
1644        let link_n = encode_dcs_link(
1645            &mut link_buf,
1646            &Callsign::from_wire_bytes(*b"W1AW    "),
1647            Module::B,
1648            Module::C,
1649            &dcs_reflector_cs(),
1650            DcsGatewayType::Repeater,
1651        )?;
1652        drop(
1653            ep.handle_inbound(
1654                link_buf.get(..link_n).ok_or("empty")?,
1655                peer(),
1656                Instant::now(),
1657            )
1658            .await?,
1659        );
1660
1661        // Send a voice header-ish packet (first of a new stream).
1662        let frame = VoiceFrame::silence();
1663        let mut voice_buf = [0u8; 128];
1664        let voice_n = encode_dcs_voice(
1665            &mut voice_buf,
1666            &test_header("W1AW"),
1667            sid(),
1668            0,
1669            &frame,
1670            false,
1671        )?;
1672        drop(
1673            ep.handle_inbound(
1674                voice_buf.get(..voice_n).ok_or("empty")?,
1675                peer(),
1676                Instant::now(),
1677            )
1678            .await?,
1679        );
1680
1681        // Now send the end-of-stream packet.
1682        let mut eot_buf = [0u8; 128];
1683        let eot_n = encode_dcs_voice(
1684            &mut eot_buf,
1685            &test_header("W1AW"),
1686            sid(),
1687            1,
1688            &frame,
1689            true, // is_end
1690        )?;
1691        drop(
1692            ep.handle_inbound(eot_buf.get(..eot_n).ok_or("empty")?, peer(), Instant::now())
1693                .await?,
1694        );
1695
1696        // Now start a NEW stream with a different stream id — this
1697        // must behave as a fresh stream (new ClientStreamStarted
1698        // event), which can only happen if the DCS EOT cleared the
1699        // cache on the previous tick.
1700        let Some(new_sid) = StreamId::new(0x9999) else {
1701            unreachable!()
1702        };
1703        let mut fresh_buf = [0u8; 128];
1704        let fresh_n = encode_dcs_voice(
1705            &mut fresh_buf,
1706            &test_header("W1AW"),
1707            new_sid,
1708            0,
1709            &frame,
1710            false,
1711        )?;
1712        let fresh_outcome = ep
1713            .handle_inbound(
1714                fresh_buf.get(..fresh_n).ok_or("empty")?,
1715                peer(),
1716                Instant::now(),
1717            )
1718            .await?;
1719        assert!(
1720            fresh_outcome
1721                .events
1722                .iter()
1723                .any(|ev| matches!(ev, ServerEvent::ClientStreamStarted { .. })),
1724            "fresh stream after is_end must emit ClientStreamStarted"
1725        );
1726        Ok(())
1727    }
1728
1729    fn test_header(cs_my: &str) -> DStarHeader {
1730        let mut my_bytes = *b"        ";
1731        let src = cs_my.as_bytes();
1732        let len = src.len().min(8);
1733        my_bytes[..len].copy_from_slice(&src[..len]);
1734        DStarHeader {
1735            flag1: 0,
1736            flag2: 0,
1737            flag3: 0,
1738            rpt2: Callsign::from_wire_bytes(*b"REF030 G"),
1739            rpt1: Callsign::from_wire_bytes(*b"REF030 C"),
1740            ur_call: Callsign::from_wire_bytes(*b"CQCQCQ  "),
1741            my_call: Callsign::from_wire_bytes(my_bytes),
1742            my_suffix: Suffix::EMPTY,
1743        }
1744    }
1745
1746    const fn sid() -> StreamId {
1747        match StreamId::new(0x4242) {
1748            Some(s) => s,
1749            None => unreachable!(),
1750        }
1751    }
1752
1753    // ─── Fix 1: DenyAllAuthorizer path ────────────────────────────
1754    #[tokio::test]
1755    async fn dextra_link_rejected_by_deny_all_authorizer() -> TestResult {
1756        let ep = ProtocolEndpoint::<DExtra>::new(
1757            ProtocolKind::DExtra,
1758            Module::C,
1759            Arc::new(DenyAllAuthorizer),
1760        );
1761        let mut buf = [0u8; 16];
1762        let n = encode_connect_link(
1763            &mut buf,
1764            &Callsign::from_wire_bytes(*b"W1AW    "),
1765            Module::C,
1766            Module::B,
1767        )?;
1768        let slice = buf.get(..n).ok_or("empty")?;
1769
1770        let outcome: EndpointOutcome<DExtra> =
1771            ep.handle_inbound(slice, peer(), Instant::now()).await?;
1772
1773        // The pool must be empty — no handle was created.
1774        assert_eq!(
1775            ep.clients().len().await,
1776            0,
1777            "rejected peer must not be in pool"
1778        );
1779
1780        // Exactly one outbound NAK to the same peer. The NAK tag
1781        // position is asserted by the codec's own golden tests — we
1782        // just verify one 14-byte datagram was enqueued to the peer
1783        // that tried to link.
1784        assert_eq!(outcome.txs.len(), 1);
1785        let (payload, dst) = outcome.txs.first().ok_or("no tx")?;
1786        assert_eq!(*dst, peer());
1787        assert_eq!(payload.len(), 14, "DExtra NAK is 14 bytes");
1788
1789        // Exactly one ClientRejected event.
1790        assert_eq!(outcome.events.len(), 1);
1791        assert!(matches!(
1792            outcome.events.first(),
1793            Some(ServerEvent::ClientRejected { .. })
1794        ));
1795        Ok(())
1796    }
1797
1798    // ─── Fix 3: StreamCache 21-frame header retransmit ────────────
1799    #[tokio::test]
1800    async fn dextra_stream_cache_retransmits_header_every_21_frames() -> TestResult {
1801        let ep = ProtocolEndpoint::<DExtra>::new(ProtocolKind::DExtra, Module::C, allow_all());
1802
1803        // LINK first so the peer has a module assignment.
1804        let mut link_buf = [0u8; 16];
1805        let n = encode_connect_link(
1806            &mut link_buf,
1807            &Callsign::from_wire_bytes(*b"W1AW    "),
1808            Module::C,
1809            Module::B,
1810        )?;
1811        let link_slice = link_buf.get(..n).ok_or("empty")?;
1812        drop(
1813            ep.handle_inbound(link_slice, peer(), Instant::now())
1814                .await?,
1815        );
1816
1817        // Voice header.
1818        let mut hdr_buf = [0u8; 64];
1819        let hdr_n = encode_voice_header(&mut hdr_buf, sid(), &test_header("W1AW"))?;
1820        let hdr_slice = hdr_buf.get(..hdr_n).ok_or("empty")?;
1821        let hdr_outcome = ep.handle_inbound(hdr_slice, peer(), Instant::now()).await?;
1822        // The header tick itself does NOT trigger a retransmit —
1823        // the first retransmit fires after 20 data frames.
1824        assert!(
1825            hdr_outcome.header_retransmit.is_none(),
1826            "header tick must not trigger retransmit",
1827        );
1828
1829        // Send 20 voice data frames; the 20th (seq_counter=20 after
1830        // bump) fires the retransmit boundary.
1831        let frame = VoiceFrame::silence();
1832        let mut cache_fired = 0_u32;
1833        for seq in 0_u8..20 {
1834            let mut data_buf = [0u8; 64];
1835            let data_n = encode_voice_data(&mut data_buf, sid(), seq, &frame)?;
1836            let data_slice = data_buf.get(..data_n).ok_or("empty")?;
1837            let outcome = ep
1838                .handle_inbound(data_slice, peer(), Instant::now())
1839                .await?;
1840            if outcome.header_retransmit.is_some() {
1841                cache_fired = cache_fired.saturating_add(1);
1842            }
1843        }
1844        assert_eq!(cache_fired, 1, "one header retransmit after 20 data frames");
1845
1846        // Voice EOT — clears the cache.
1847        let mut eot_buf = [0u8; 64];
1848        let eot_n = encode_voice_eot(&mut eot_buf, sid(), 20)?;
1849        let eot_slice = eot_buf.get(..eot_n).ok_or("empty")?;
1850        drop(ep.handle_inbound(eot_slice, peer(), Instant::now()).await?);
1851
1852        // The stream cache is empty — subsequent data frames from the
1853        // same peer (without a fresh header) must not produce a
1854        // retransmit.
1855        let mut stale_buf = [0u8; 64];
1856        let stale_n = encode_voice_data(&mut stale_buf, sid(), 99, &frame)?;
1857        let stale_slice = stale_buf.get(..stale_n).ok_or("empty")?;
1858        let stale_outcome = ep
1859            .handle_inbound(stale_slice, peer(), Instant::now())
1860            .await?;
1861        assert!(
1862            stale_outcome.header_retransmit.is_none(),
1863            "EOT must clear the cache",
1864        );
1865        Ok(())
1866    }
1867
1868    // ─── Fix 4: ClientEvicted event path ──────────────────────────
1869    #[tokio::test]
1870    async fn dextra_endpoint_surfaces_evict_peer_event_next_tick() -> TestResult {
1871        // evict_peer is an async helper; we exercise it directly to
1872        // confirm the event queues and drains correctly on the next
1873        // handle_inbound call. The real wire trigger comes via the
1874        // run loop when fan_out_voice reports ShouldEvict.
1875        let ep = ProtocolEndpoint::<DExtra>::new(ProtocolKind::DExtra, Module::C, allow_all());
1876        // LINK a peer first so it has a pool entry to be evicted.
1877        let mut link_buf = [0u8; 16];
1878        let n = encode_connect_link(
1879            &mut link_buf,
1880            &Callsign::from_wire_bytes(*b"W1AW    "),
1881            Module::C,
1882            Module::B,
1883        )?;
1884        let link_slice = link_buf.get(..n).ok_or("empty")?;
1885        drop(
1886            ep.handle_inbound(link_slice, peer(), Instant::now())
1887                .await?,
1888        );
1889        assert_eq!(ep.clients().len().await, 1);
1890
1891        // Evict the peer out-of-band.
1892        ep.evict_peer(peer(), "test eviction").await;
1893        assert_eq!(ep.clients().len().await, 0, "peer removed");
1894
1895        // A subsequent LINK from a NEW peer on the same port surfaces
1896        // the queued ClientEvicted event from the previous eviction
1897        // in its outcome. We use peer() again — the previous handle
1898        // is gone, so this counts as a fresh LINK.
1899        let outcome = ep
1900            .handle_inbound(link_slice, peer(), Instant::now())
1901            .await?;
1902
1903        // The relink produced its own ClientLinked event, AND the
1904        // queued ClientEvicted event from the prior tick.
1905        let events = &outcome.events;
1906        assert!(
1907            events
1908                .iter()
1909                .any(|ev| matches!(ev, ServerEvent::ClientLinked { .. })),
1910            "fresh link still emits ClientLinked"
1911        );
1912        assert!(
1913            events
1914                .iter()
1915                .any(|ev| matches!(ev, ServerEvent::ClientEvicted { .. })),
1916            "queued ClientEvicted drains on next tick"
1917        );
1918        Ok(())
1919    }
1920
1921    // ─── Voice bus publish path ───────────────────────────────────
1922    #[tokio::test]
1923    async fn dextra_voice_events_publish_to_voice_bus() -> TestResult {
1924        use tokio::sync::broadcast;
1925        let (tx, mut rx) = broadcast::channel::<super::CrossProtocolEvent>(32);
1926        let ep = ProtocolEndpoint::<DExtra>::new_with_voice_bus(
1927            ProtocolKind::DExtra,
1928            Module::C,
1929            allow_all(),
1930            Some(tx),
1931        );
1932        // LINK so the peer has a module assignment in the pool.
1933        let mut link_buf = [0u8; 16];
1934        let n = encode_connect_link(
1935            &mut link_buf,
1936            &Callsign::from_wire_bytes(*b"W1AW    "),
1937            Module::C,
1938            Module::B,
1939        )?;
1940        let link_slice = link_buf.get(..n).ok_or("empty")?;
1941        drop(
1942            ep.handle_inbound(link_slice, peer(), Instant::now())
1943                .await?,
1944        );
1945        // LINK itself emits ClientLinked (not voice) — nothing on bus yet.
1946        assert!(
1947            rx.try_recv().is_err(),
1948            "LINK emits no cross-protocol events"
1949        );
1950
1951        // Voice header — should produce exactly one StreamStart on the bus.
1952        let mut hdr_buf = [0u8; 64];
1953        let hdr_n = encode_voice_header(&mut hdr_buf, sid(), &test_header("W1AW"))?;
1954        let hdr_slice = hdr_buf.get(..hdr_n).ok_or("empty")?;
1955        drop(ep.handle_inbound(hdr_slice, peer(), Instant::now()).await?);
1956        let event = rx.try_recv()?;
1957        assert_eq!(event.source_protocol, ProtocolKind::DExtra);
1958        assert_eq!(event.source_peer, peer());
1959        assert_eq!(event.module, Module::C);
1960        assert!(matches!(event.event, super::VoiceEvent::StreamStart { .. }));
1961
1962        // Voice data — should produce one Frame on the bus with cached header.
1963        let frame = VoiceFrame::silence();
1964        let mut data_buf = [0u8; 64];
1965        let data_n = encode_voice_data(&mut data_buf, sid(), 1, &frame)?;
1966        let data_slice = data_buf.get(..data_n).ok_or("empty")?;
1967        drop(
1968            ep.handle_inbound(data_slice, peer(), Instant::now())
1969                .await?,
1970        );
1971        let event = rx.try_recv()?;
1972        assert!(matches!(event.event, super::VoiceEvent::Frame { .. }));
1973        assert!(
1974            event.cached_header.is_some(),
1975            "voice data frame carries cached header"
1976        );
1977
1978        // Voice EOT — should produce one StreamEnd on the bus.
1979        let mut eot_buf = [0u8; 64];
1980        let eot_n = encode_voice_eot(&mut eot_buf, sid(), 1)?;
1981        let eot_slice = eot_buf.get(..eot_n).ok_or("empty")?;
1982        drop(ep.handle_inbound(eot_slice, peer(), Instant::now()).await?);
1983        let event = rx.try_recv()?;
1984        assert!(matches!(event.event, super::VoiceEvent::StreamEnd { .. }));
1985        Ok(())
1986    }
1987
1988    #[tokio::test]
1989    async fn no_voice_bus_means_no_publish() -> TestResult {
1990        // Sanity check: an endpoint constructed without a voice bus
1991        // MUST NOT attempt to publish (ergo, voice_bus field is None,
1992        // and handle_inbound's publish_voice_events helper is a no-op).
1993        let ep = ProtocolEndpoint::<DExtra>::new(ProtocolKind::DExtra, Module::C, allow_all());
1994        let mut link_buf = [0u8; 16];
1995        let n = encode_connect_link(
1996            &mut link_buf,
1997            &Callsign::from_wire_bytes(*b"W1AW    "),
1998            Module::C,
1999            Module::B,
2000        )?;
2001        drop(
2002            ep.handle_inbound(link_buf.get(..n).ok_or("empty")?, peer(), Instant::now())
2003                .await?,
2004        );
2005        let mut hdr_buf = [0u8; 64];
2006        let hdr_n = encode_voice_header(&mut hdr_buf, sid(), &test_header("W1AW"))?;
2007        // This must not panic and must not error — publish_voice_events
2008        // is a silent no-op when voice_bus is None.
2009        drop(
2010            ep.handle_inbound(hdr_buf.get(..hdr_n).ok_or("empty")?, peer(), Instant::now())
2011                .await?,
2012        );
2013        Ok(())
2014    }
2015
2016    // ─── Fix 2: ReadOnly voice drop path ──────────────────────────
2017    #[tokio::test]
2018    async fn dextra_readonly_voice_header_is_dropped() -> TestResult {
2019        let ep = ProtocolEndpoint::<DExtra>::new(
2020            ProtocolKind::DExtra,
2021            Module::C,
2022            Arc::new(ReadOnlyAuthorizer),
2023        );
2024        // First LINK so the peer is admitted with ReadOnly access.
2025        let mut link_buf = [0u8; 16];
2026        let n = encode_connect_link(
2027            &mut link_buf,
2028            &Callsign::from_wire_bytes(*b"W1AW    "),
2029            Module::C,
2030            Module::B,
2031        )?;
2032        let link_slice = link_buf.get(..n).ok_or("empty")?;
2033        let link_outcome = ep
2034            .handle_inbound(link_slice, peer(), Instant::now())
2035            .await?;
2036        assert_eq!(ep.clients().len().await, 1, "peer admitted as read-only");
2037        // The link itself still produced an ACK + ClientLinked event.
2038        assert_eq!(link_outcome.txs.len(), 1);
2039        assert_eq!(link_outcome.events.len(), 1);
2040
2041        // Now send a voice header from the read-only peer.
2042        let mut hdr_buf = [0u8; 64];
2043        let hdr_n = encode_voice_header(&mut hdr_buf, sid(), &test_header("W1AW"))?;
2044        let hdr_slice = hdr_buf.get(..hdr_n).ok_or("empty")?;
2045
2046        let voice_outcome: EndpointOutcome<DExtra> =
2047            ep.handle_inbound(hdr_slice, peer(), Instant::now()).await?;
2048
2049        // No fan-out side-effects: zero outbound txs for the voice
2050        // header (the pool is size-1 anyway so even ReadWrite would
2051        // produce no fan-out, but we also verify the state below).
2052        assert!(
2053            voice_outcome.txs.is_empty(),
2054            "read-only voice must not emit any outbound datagrams"
2055        );
2056
2057        // Exactly one VoiceFromReadOnlyDropped event is surfaced.
2058        assert_eq!(voice_outcome.events.len(), 1);
2059        assert!(matches!(
2060            voice_outcome.events.first(),
2061            Some(ServerEvent::VoiceFromReadOnlyDropped { .. })
2062        ));
2063
2064        // The server session MUST still be in Linked state — the
2065        // voice header must NOT have transitioned it to Streaming.
2066        let state = ep
2067            .clients()
2068            .with_handle_mut(&peer(), |h| h.session.state_kind())
2069            .await
2070            .ok_or("handle not found")?;
2071        assert_eq!(
2072            state,
2073            ServerStateKind::Linked,
2074            "read-only voice must not push session into Streaming"
2075        );
2076        Ok(())
2077    }
2078}