dstar_gateway_core/session/server/
core.rs

1//! Protocol-erased server-side state machine.
2//!
3//! [`ServerSessionCore`] is the runtime-erased state machine that
4//! drives a single server-side client session. It handles one client
5//! at a time — the server's fan-out engine in `dstar-gateway-server`
6//! spawns a `ServerSessionCore` per inbound peer and routes datagrams
7//! through [`ServerSessionCore::handle_input`].
8//!
9//! **Supported protocols: `DExtra`, `DPlus`, `DCS`.** All three
10//! protocols drive through the same `handle_input` dispatcher; the
11//! protocol-specific handshakes are implemented as private helpers
12//! (`handle_dextra_input`, `handle_dplus_input`, `handle_dcs_input`).
13//!
14//! The server core does NOT authenticate clients — that's the
15//! `dstar-gateway-server` shell's `ClientAuthorizer` job. The core
16//! only manages wire decoding + state transitions + event emission.
17
18use std::collections::VecDeque;
19use std::net::SocketAddr;
20use std::time::Instant;
21
22use crate::codec::dcs::{
23    self as dcs_codec, ClientPacket as DcsClientPacket,
24    decode_client_to_server as decode_dcs_client_to_server,
25};
26use crate::codec::dextra::{
27    ClientPacket, decode_client_to_server, encode_connect_ack, encode_poll_echo,
28};
29use crate::codec::dplus::{
30    ClientPacket as DPlusClientPacket, Link2Result,
31    decode_client_to_server as decode_dplus_client_to_server, encode_link1_ack, encode_link2_reply,
32    encode_poll_echo as dplus_encode_poll_echo, encode_unlink_ack,
33};
34use crate::error::{Error, ProtocolError};
35use crate::header::DStarHeader;
36use crate::session::client::Protocol;
37use crate::session::driver::Transmit;
38use crate::session::outbox::{OutboundPacket, Outbox};
39use crate::types::{Callsign, Module, ProtocolKind, StreamId};
40use crate::validator::VecSink;
41use crate::voice::VoiceFrame;
42
43use super::event::ServerEvent;
44use super::state::ServerStateKind;
45
/// Private runtime state of a single server session.
///
/// This is deliberately a separate enum from [`ServerStateKind`]:
/// keeping it private lets the state machine grow transitional
/// states that never appear in the public runtime discriminator.
/// [`Link1Received`] is the one such state today — it is mapped to
/// `ServerStateKind::Unknown` by [`Self::kind`].
///
/// [`Link1Received`]: Self::Link1Received
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum InternalState {
    /// Initial state of a freshly constructed session; no LINK has
    /// been accepted yet.
    Unknown,
    /// `DPlus` only: LINK1 has been seen and acknowledged, and the
    /// session is waiting for the LINK2 that carries the callsign.
    Link1Received,
    /// The client is linked and no voice stream is in progress.
    Linked,
    /// The client is linked and a voice stream is in progress.
    Streaming,
    /// An unlink is in progress; surfaced publicly as
    /// `ServerStateKind::Unlinking`.
    Unlinking,
    /// The client has unlinked; surfaced publicly as
    /// `ServerStateKind::Closed`.
    Closed,
}
65
66impl InternalState {
67    const fn kind(self) -> ServerStateKind {
68        match self {
69            // Link1Received is a DPlus-private transitional state —
70            // the public view sees "not linked yet", same as Unknown.
71            Self::Unknown | Self::Link1Received => ServerStateKind::Unknown,
72            Self::Linked => ServerStateKind::Linked,
73            Self::Streaming => ServerStateKind::Streaming,
74            Self::Unlinking => ServerStateKind::Unlinking,
75            Self::Closed => ServerStateKind::Closed,
76        }
77    }
78}
79
80/// Internal protocol-erased server event record.
81///
82/// [`ServerSessionCore::pop_event`] is generic over `P: Protocol` and
83/// converts each `RawServerEvent` into a [`ServerEvent<P>`] at drain
84/// time — the queue itself is protocol-erased.
85#[derive(Debug, Clone)]
86enum RawServerEvent {
87    Linked {
88        peer: SocketAddr,
89        callsign: Callsign,
90        module: Module,
91    },
92    Unlinked {
93        peer: SocketAddr,
94    },
95    StreamStarted {
96        peer: SocketAddr,
97        stream_id: StreamId,
98        header: DStarHeader,
99    },
100    StreamFrame {
101        peer: SocketAddr,
102        stream_id: StreamId,
103        seq: u8,
104        frame: VoiceFrame,
105    },
106    StreamEnded {
107        peer: SocketAddr,
108        stream_id: StreamId,
109    },
110}
111
112/// Protocol-erased server-side session machine.
113///
114/// Each instance tracks one client. The `dstar-gateway-server` shell
115/// spawns a `ServerSessionCore` per inbound peer and routes datagrams
116/// through [`Self::handle_input`]. The typestate wrapper
117/// [`super::ServerSession`] sits on top and adds compile-time state
118/// discrimination.
119pub struct ServerSessionCore {
120    /// Which protocol this client speaks.
121    kind: ProtocolKind,
122    /// Client peer address.
123    peer: SocketAddr,
124    /// Default reflector module for this session.
125    ///
126    /// `DPlus` LINK2 does not carry a module letter on the wire —
127    /// the reflector's own identity is what implicitly selects it.
128    /// This field carries the module the reflector endpoint is
129    /// bound to so `DPlus` sessions have something to put in their
130    /// `ClientLinked` event. `DExtra` and `DCS` LINK packets carry
131    /// their own `reflector_module` and will overwrite this on LINK.
132    reflector_module: Module,
133    /// Runtime state.
134    state: InternalState,
135    /// Callsign of the linked client (populated once LINK arrives).
136    client_callsign: Option<Callsign>,
137    /// Local module letter of the linked client.
138    client_module: Option<Module>,
139    /// Last stream id we surfaced as a `StreamStarted` event.
140    ///
141    /// DCS voice packets carry the D-STAR header embedded in every
142    /// 100-byte frame, so the server can't distinguish "start of
143    /// stream" by packet type alone — it must track whether the
144    /// incoming `stream_id` is the same as the last frame's or a
145    /// fresh one. On a fresh id we emit `StreamStarted` first, then
146    /// the `StreamFrame`. On the same id we emit only `StreamFrame`.
147    last_stream_id: Option<StreamId>,
148    /// Outbound packet queue.
149    outbox: Outbox,
150    /// Queued raw events awaiting [`Self::pop_event`] drain.
151    events: VecDeque<RawServerEvent>,
152    /// Diagnostic sink for lenient parser warnings.
153    diagnostics: VecSink,
154    /// Most-recently-popped outbound packet, held so
155    /// [`Self::pop_transmit`] can return a borrow into the owned
156    /// payload across multiple calls.
157    current_tx: Option<OutboundPacket>,
158}
159
160impl std::fmt::Debug for ServerSessionCore {
161    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
162        f.debug_struct("ServerSessionCore")
163            .field("kind", &self.kind)
164            .field("peer", &self.peer)
165            .field("reflector_module", &self.reflector_module)
166            .field("state", &self.state)
167            .field("client_callsign", &self.client_callsign)
168            .field("client_module", &self.client_module)
169            .field("last_stream_id", &self.last_stream_id)
170            .field("outbox", &self.outbox)
171            .field("events", &self.events)
172            .field("diagnostics", &self.diagnostics)
173            .field("current_tx", &self.current_tx)
174            .finish()
175    }
176}
177
178impl ServerSessionCore {
179    /// Create a new server session for a client with the given
180    /// protocol, peer, and reflector module.
181    ///
182    /// `reflector_module` is the module this reflector endpoint is
183    /// bound to — used as the default for `DPlus` `ClientLinked`
184    /// events and overwritten on LINK for `DExtra`/`DCS` which carry
185    /// their own module in the wire packet.
186    #[must_use]
187    pub fn new(kind: ProtocolKind, peer: SocketAddr, reflector_module: Module) -> Self {
188        Self {
189            kind,
190            peer,
191            reflector_module,
192            state: InternalState::Unknown,
193            client_callsign: None,
194            client_module: None,
195            last_stream_id: None,
196            outbox: Outbox::new(),
197            events: VecDeque::new(),
198            diagnostics: VecSink::default(),
199            current_tx: None,
200        }
201    }
202
203    /// Runtime state discriminator.
204    #[must_use]
205    pub const fn state_kind(&self) -> ServerStateKind {
206        self.state.kind()
207    }
208
209    /// Peer address of this client.
210    #[must_use]
211    pub const fn peer(&self) -> SocketAddr {
212        self.peer
213    }
214
215    /// Runtime protocol discriminator.
216    #[must_use]
217    pub const fn protocol_kind(&self) -> ProtocolKind {
218        self.kind
219    }
220
221    /// Reflector module for this session.
222    #[must_use]
223    pub const fn reflector_module(&self) -> Module {
224        self.reflector_module
225    }
226
227    /// Callsign of the linked client, if any.
228    #[must_use]
229    pub const fn client_callsign(&self) -> Option<Callsign> {
230        self.client_callsign
231    }
232
233    /// Local module letter of the linked client, if any.
234    #[must_use]
235    pub const fn client_module(&self) -> Option<Module> {
236        self.client_module
237    }
238
239    /// Feed an inbound datagram into the server session.
240    ///
241    /// Parses the bytes, updates state, pushes events and outbound
242    /// packets as needed. Protocol-erased dispatch: matches on
243    /// [`Self::protocol_kind`] and calls the appropriate decoder.
244    ///
245    /// # Errors
246    ///
247    /// Returns [`Error::Protocol`] wrapping the codec error if the
248    /// datagram cannot be parsed.
249    pub fn handle_input(&mut self, now: Instant, bytes: &[u8]) -> Result<(), Error> {
250        match self.kind {
251            ProtocolKind::DExtra => self.handle_dextra_input(now, bytes),
252            ProtocolKind::DPlus => self.handle_dplus_input(now, bytes),
253            ProtocolKind::Dcs => self.handle_dcs_input(bytes),
254        }
255    }
256
257    fn handle_dextra_input(&mut self, now: Instant, bytes: &[u8]) -> Result<(), Error> {
258        let packet = decode_client_to_server(bytes, &mut self.diagnostics)
259            .map_err(|e| Error::Protocol(ProtocolError::DExtra(e)))?;
260        match packet {
261            ClientPacket::Link {
262                callsign,
263                reflector_module,
264                client_module,
265            } => self.on_dextra_link(now, callsign, reflector_module, client_module),
266            ClientPacket::Unlink { callsign, .. } => {
267                self.on_dextra_unlink(callsign);
268                Ok(())
269            }
270            ClientPacket::Poll { callsign } => self.on_dextra_poll(now, callsign),
271            ClientPacket::VoiceHeader { stream_id, header } => {
272                self.on_dextra_voice_header(stream_id, header);
273                Ok(())
274            }
275            ClientPacket::VoiceData {
276                stream_id,
277                seq,
278                frame,
279            } => {
280                self.on_dextra_voice_data(stream_id, seq, frame);
281                Ok(())
282            }
283            ClientPacket::VoiceEot { stream_id, .. } => {
284                self.on_dextra_voice_eot(stream_id);
285                Ok(())
286            }
287        }
288    }
289
290    fn on_dextra_link(
291        &mut self,
292        now: Instant,
293        callsign: Callsign,
294        reflector_module: Module,
295        client_module: Module,
296    ) -> Result<(), Error> {
297        if self.state != InternalState::Unknown && self.state != InternalState::Linked {
298            return Ok(());
299        }
300        self.client_callsign = Some(callsign);
301        self.client_module = Some(client_module);
302        self.state = InternalState::Linked;
303        self.events.push_back(RawServerEvent::Linked {
304            peer: self.peer,
305            callsign,
306            module: reflector_module,
307        });
308        // Enqueue a 14-byte ACK to send back.
309        let mut buf = [0u8; 32];
310        let n = encode_connect_ack(&mut buf, &callsign, reflector_module)
311            .map_err(|e| Error::Protocol(ProtocolError::DExtra(e.into())))?;
312        let payload = buf.get(..n).unwrap_or(&[]).to_vec();
313        self.outbox.enqueue(OutboundPacket {
314            dst: self.peer,
315            payload,
316            not_before: now,
317        });
318        Ok(())
319    }
320
321    fn on_dextra_unlink(&mut self, callsign: Callsign) {
322        if self.client_callsign != Some(callsign) {
323            return;
324        }
325        self.state = InternalState::Unlinking;
326        self.events
327            .push_back(RawServerEvent::Unlinked { peer: self.peer });
328        // Transition straight to Closed — we don't wait for our ACK
329        // to be sent. The fan-out engine will drop this session
330        // reference on the next tick.
331        self.state = InternalState::Closed;
332    }
333
334    fn on_dextra_poll(&mut self, now: Instant, callsign: Callsign) -> Result<(), Error> {
335        if self.client_callsign != Some(callsign) {
336            return Ok(());
337        }
338        // Echo the poll back.
339        let mut buf = [0u8; 16];
340        let n = encode_poll_echo(&mut buf, &callsign)
341            .map_err(|e| Error::Protocol(ProtocolError::DExtra(e.into())))?;
342        let payload = buf.get(..n).unwrap_or(&[]).to_vec();
343        self.outbox.enqueue(OutboundPacket {
344            dst: self.peer,
345            payload,
346            not_before: now,
347        });
348        Ok(())
349    }
350
351    fn on_dextra_voice_header(&mut self, stream_id: StreamId, header: DStarHeader) {
352        if self.state != InternalState::Linked {
353            return;
354        }
355        self.state = InternalState::Streaming;
356        self.events.push_back(RawServerEvent::StreamStarted {
357            peer: self.peer,
358            stream_id,
359            header,
360        });
361    }
362
363    fn on_dextra_voice_data(&mut self, stream_id: StreamId, seq: u8, frame: VoiceFrame) {
364        if self.state != InternalState::Streaming {
365            return;
366        }
367        self.events.push_back(RawServerEvent::StreamFrame {
368            peer: self.peer,
369            stream_id,
370            seq,
371            frame,
372        });
373    }
374
375    fn on_dextra_voice_eot(&mut self, stream_id: StreamId) {
376        if self.state != InternalState::Streaming {
377            return;
378        }
379        self.state = InternalState::Linked;
380        self.events.push_back(RawServerEvent::StreamEnded {
381            peer: self.peer,
382            stream_id,
383        });
384    }
385
386    // ─── DPlus server handshake ───────────────────────────────────
387
388    fn handle_dplus_input(&mut self, now: Instant, bytes: &[u8]) -> Result<(), Error> {
389        let packet = decode_dplus_client_to_server(bytes, &mut self.diagnostics)
390            .map_err(|e| Error::Protocol(ProtocolError::DPlus(e)))?;
391        match packet {
392            DPlusClientPacket::Link1 => self.on_dplus_link1(now),
393            DPlusClientPacket::Link2 { callsign } => self.on_dplus_link2(now, callsign),
394            DPlusClientPacket::Unlink => self.on_dplus_unlink(now),
395            DPlusClientPacket::Poll => self.on_dplus_poll(now),
396            DPlusClientPacket::VoiceHeader { stream_id, header } => {
397                self.on_dplus_voice_header(stream_id, header);
398                Ok(())
399            }
400            DPlusClientPacket::VoiceData {
401                stream_id,
402                seq,
403                frame,
404            } => {
405                self.on_dplus_voice_data(stream_id, seq, frame);
406                Ok(())
407            }
408            DPlusClientPacket::VoiceEot { stream_id, .. } => {
409                self.on_dplus_voice_eot(stream_id);
410                Ok(())
411            }
412        }
413    }
414
415    fn on_dplus_link1(&mut self, now: Instant) -> Result<(), Error> {
416        match self.state {
417            InternalState::Unknown => {
418                self.state = InternalState::Link1Received;
419            }
420            InternalState::Link1Received | InternalState::Linked => {
421                // Real clients retransmit LINK1 — just re-enqueue the
422                // ACK idempotently. If we're already Linked the client
423                // is badly lagged; echoing LINK1 is still the safest
424                // response.
425            }
426            InternalState::Streaming | InternalState::Unlinking | InternalState::Closed => {
427                // Ignore — can't drop back to LINK1 handshake.
428                return Ok(());
429            }
430        }
431        // Enqueue a 5-byte LINK1 ACK echo.
432        let mut buf = [0u8; 8];
433        let n = encode_link1_ack(&mut buf)
434            .map_err(|e| Error::Protocol(ProtocolError::DPlus(e.into())))?;
435        let payload = buf.get(..n).unwrap_or(&[]).to_vec();
436        self.outbox.enqueue(OutboundPacket {
437            dst: self.peer,
438            payload,
439            not_before: now,
440        });
441        Ok(())
442    }
443
444    fn on_dplus_link2(&mut self, now: Instant, callsign: Callsign) -> Result<(), Error> {
445        match self.state {
446            InternalState::Link1Received => {
447                self.client_callsign = Some(callsign);
448                self.client_module = Some(self.reflector_module);
449                self.state = InternalState::Linked;
450                self.events.push_back(RawServerEvent::Linked {
451                    peer: self.peer,
452                    callsign,
453                    module: self.reflector_module,
454                });
455                // Enqueue an 8-byte OKRW reply.
456                self.enqueue_dplus_link2_reply(now, Link2Result::Accept)?;
457                Ok(())
458            }
459            InternalState::Linked | InternalState::Streaming => {
460                // Already linked. If it's the same callsign, idempotent
461                // re-ACK. If it's a different callsign, return BUSY.
462                if self.client_callsign == Some(callsign) {
463                    self.enqueue_dplus_link2_reply(now, Link2Result::Accept)?;
464                } else {
465                    self.enqueue_dplus_link2_reply(now, Link2Result::Busy)?;
466                }
467                Ok(())
468            }
469            InternalState::Unknown | InternalState::Unlinking | InternalState::Closed => {
470                // LINK2 without LINK1 — drop silently. The real client
471                // will retransmit LINK1 on its own retry timer. Safest
472                // path: do nothing.
473                Ok(())
474            }
475        }
476    }
477
478    fn enqueue_dplus_link2_reply(
479        &mut self,
480        now: Instant,
481        result: Link2Result,
482    ) -> Result<(), Error> {
483        let mut buf = [0u8; 16];
484        let n = encode_link2_reply(&mut buf, result)
485            .map_err(|e| Error::Protocol(ProtocolError::DPlus(e.into())))?;
486        let payload = buf.get(..n).unwrap_or(&[]).to_vec();
487        self.outbox.enqueue(OutboundPacket {
488            dst: self.peer,
489            payload,
490            not_before: now,
491        });
492        Ok(())
493    }
494
495    fn on_dplus_unlink(&mut self, now: Instant) -> Result<(), Error> {
496        if !matches!(
497            self.state,
498            InternalState::Linked | InternalState::Streaming | InternalState::Link1Received
499        ) {
500            return Ok(());
501        }
502        // Enqueue the 5-byte UNLINK ACK echo.
503        let mut buf = [0u8; 8];
504        let n = encode_unlink_ack(&mut buf)
505            .map_err(|e| Error::Protocol(ProtocolError::DPlus(e.into())))?;
506        let payload = buf.get(..n).unwrap_or(&[]).to_vec();
507        self.outbox.enqueue(OutboundPacket {
508            dst: self.peer,
509            payload,
510            not_before: now,
511        });
512        self.events
513            .push_back(RawServerEvent::Unlinked { peer: self.peer });
514        self.state = InternalState::Closed;
515        Ok(())
516    }
517
518    fn on_dplus_poll(&mut self, now: Instant) -> Result<(), Error> {
519        if !matches!(self.state, InternalState::Linked | InternalState::Streaming) {
520            return Ok(());
521        }
522        let mut buf = [0u8; 8];
523        let n = dplus_encode_poll_echo(&mut buf)
524            .map_err(|e| Error::Protocol(ProtocolError::DPlus(e.into())))?;
525        let payload = buf.get(..n).unwrap_or(&[]).to_vec();
526        self.outbox.enqueue(OutboundPacket {
527            dst: self.peer,
528            payload,
529            not_before: now,
530        });
531        Ok(())
532    }
533
534    fn on_dplus_voice_header(&mut self, stream_id: StreamId, header: DStarHeader) {
535        if self.state != InternalState::Linked {
536            return;
537        }
538        self.state = InternalState::Streaming;
539        self.last_stream_id = Some(stream_id);
540        self.events.push_back(RawServerEvent::StreamStarted {
541            peer: self.peer,
542            stream_id,
543            header,
544        });
545    }
546
547    fn on_dplus_voice_data(&mut self, stream_id: StreamId, seq: u8, frame: VoiceFrame) {
548        if self.state != InternalState::Streaming {
549            return;
550        }
551        self.events.push_back(RawServerEvent::StreamFrame {
552            peer: self.peer,
553            stream_id,
554            seq,
555            frame,
556        });
557    }
558
559    fn on_dplus_voice_eot(&mut self, stream_id: StreamId) {
560        if self.state != InternalState::Streaming {
561            return;
562        }
563        self.state = InternalState::Linked;
564        self.last_stream_id = None;
565        self.events.push_back(RawServerEvent::StreamEnded {
566            peer: self.peer,
567            stream_id,
568        });
569    }
570
571    // ─── DCS server handshake ─────────────────────────────────────
572
573    fn handle_dcs_input(&mut self, bytes: &[u8]) -> Result<(), Error> {
574        let packet = decode_dcs_client_to_server(bytes, &mut self.diagnostics)
575            .map_err(|e| Error::Protocol(ProtocolError::Dcs(e)))?;
576        match packet {
577            DcsClientPacket::Link {
578                callsign,
579                client_module,
580                reflector_module,
581                ..
582            } => self.on_dcs_link(callsign, client_module, reflector_module),
583            DcsClientPacket::Unlink { callsign, .. } => {
584                self.on_dcs_unlink(callsign);
585                Ok(())
586            }
587            DcsClientPacket::Poll {
588                callsign,
589                reflector_callsign,
590            } => self.on_dcs_poll(callsign, reflector_callsign),
591            DcsClientPacket::Voice {
592                header,
593                stream_id,
594                seq,
595                frame,
596                is_end,
597            } => {
598                self.on_dcs_voice(header, stream_id, seq, frame, is_end);
599                Ok(())
600            }
601        }
602    }
603
604    fn on_dcs_link(
605        &mut self,
606        callsign: Callsign,
607        client_module: Module,
608        reflector_module: Module,
609    ) -> Result<(), Error> {
610        if !matches!(
611            self.state,
612            InternalState::Unknown | InternalState::Linked | InternalState::Streaming
613        ) {
614            return Ok(());
615        }
616        self.client_callsign = Some(callsign);
617        self.client_module = Some(client_module);
618        if self.state != InternalState::Streaming {
619            self.state = InternalState::Linked;
620        }
621        self.events.push_back(RawServerEvent::Linked {
622            peer: self.peer,
623            callsign,
624            module: reflector_module,
625        });
626        let mut buf = [0u8; 32];
627        let n = dcs_codec::encode_connect_ack(&mut buf, &callsign, reflector_module)
628            .map_err(|e| Error::Protocol(ProtocolError::Dcs(e.into())))?;
629        let payload = buf.get(..n).unwrap_or(&[]).to_vec();
630        // DCS ACK is enqueued without a rate-limit delay — the caller
631        // drives `now` through `pop_transmit`.
632        self.outbox.enqueue(OutboundPacket {
633            dst: self.peer,
634            payload,
635            not_before: Instant::now(),
636        });
637        Ok(())
638    }
639
640    fn on_dcs_unlink(&mut self, callsign: Callsign) {
641        if self.client_callsign != Some(callsign) {
642            return;
643        }
644        self.events
645            .push_back(RawServerEvent::Unlinked { peer: self.peer });
646        self.state = InternalState::Closed;
647    }
648
649    fn on_dcs_poll(
650        &mut self,
651        callsign: Callsign,
652        reflector_callsign: Callsign,
653    ) -> Result<(), Error> {
654        if !matches!(self.state, InternalState::Linked | InternalState::Streaming) {
655            return Ok(());
656        }
657        if self.client_callsign != Some(callsign) {
658            return Ok(());
659        }
660        let mut buf = [0u8; 32];
661        let n = dcs_codec::encode_poll_reply(&mut buf, &callsign, &reflector_callsign)
662            .map_err(|e| Error::Protocol(ProtocolError::Dcs(e.into())))?;
663        let payload = buf.get(..n).unwrap_or(&[]).to_vec();
664        self.outbox.enqueue(OutboundPacket {
665            dst: self.peer,
666            payload,
667            not_before: Instant::now(),
668        });
669        Ok(())
670    }
671
672    fn on_dcs_voice(
673        &mut self,
674        header: DStarHeader,
675        stream_id: StreamId,
676        seq: u8,
677        frame: VoiceFrame,
678        is_end: bool,
679    ) {
680        if !matches!(self.state, InternalState::Linked | InternalState::Streaming) {
681            return;
682        }
683        // DCS embeds the header in every voice packet — detect
684        // stream-start by observing a new stream id.
685        let is_new_stream = self.last_stream_id != Some(stream_id);
686        if is_new_stream {
687            self.state = InternalState::Streaming;
688            self.last_stream_id = Some(stream_id);
689            self.events.push_back(RawServerEvent::StreamStarted {
690                peer: self.peer,
691                stream_id,
692                header,
693            });
694        }
695        self.events.push_back(RawServerEvent::StreamFrame {
696            peer: self.peer,
697            stream_id,
698            seq,
699            frame,
700        });
701        if is_end {
702            self.state = InternalState::Linked;
703            self.last_stream_id = None;
704            self.events.push_back(RawServerEvent::StreamEnded {
705                peer: self.peer,
706                stream_id,
707            });
708        }
709    }
710
711    /// Pop the next outbound packet (from the outbox).
712    ///
713    /// Holds the popped packet in `current_tx` so the returned
714    /// [`Transmit`] can borrow from it across calls.
715    #[must_use]
716    pub fn pop_transmit(&mut self, now: Instant) -> Option<Transmit<'_>> {
717        let next = self.outbox.pop_ready(now)?;
718        self.current_tx = Some(next);
719        let held = self.current_tx.as_ref()?;
720        Some(Transmit::new(held.dst, held.payload.as_slice()))
721    }
722
723    /// Drain the next event, typed with the correct protocol marker.
724    ///
725    /// The `P` type parameter re-attaches the protocol phantom at
726    /// drain time — the event queue itself is protocol-erased.
727    pub fn pop_event<P: Protocol>(&mut self) -> Option<ServerEvent<P>> {
728        let raw = self.events.pop_front()?;
729        Some(match raw {
730            RawServerEvent::Linked {
731                peer,
732                callsign,
733                module,
734            } => ServerEvent::ClientLinked {
735                peer,
736                callsign,
737                module,
738            },
739            RawServerEvent::Unlinked { peer } => ServerEvent::ClientUnlinked { peer },
740            RawServerEvent::StreamStarted {
741                peer,
742                stream_id,
743                header,
744            } => ServerEvent::ClientStreamStarted {
745                peer,
746                stream_id,
747                header,
748            },
749            RawServerEvent::StreamFrame {
750                peer,
751                stream_id,
752                seq,
753                frame,
754            } => ServerEvent::ClientStreamFrame {
755                peer,
756                stream_id,
757                seq,
758                frame,
759            },
760            RawServerEvent::StreamEnded { peer, stream_id } => {
761                ServerEvent::ClientStreamEnded { peer, stream_id }
762            }
763        })
764    }
765
766    /// Earliest time the loop needs to re-enter.
767    #[must_use]
768    pub fn next_deadline(&self) -> Option<Instant> {
769        self.outbox.peek_next_deadline()
770    }
771
772    /// Drain accumulated parser diagnostics.
773    pub fn drain_diagnostics(&mut self) -> Vec<crate::validator::Diagnostic> {
774        self.diagnostics.drain().collect()
775    }
776}
777
778#[cfg(test)]
779mod tests {
780    use super::{ProtocolKind, ServerEvent, ServerSessionCore, ServerStateKind};
781    use crate::codec::dcs::{
782        encode_connect_link as dcs_encode_link, encode_connect_unlink as dcs_encode_unlink,
783        encode_poll_request as dcs_encode_poll, encode_voice as dcs_encode_voice,
784    };
785    use crate::codec::dextra::{encode_connect_link, encode_poll, encode_unlink};
786    use crate::codec::dplus::{
787        encode_link1, encode_link2, encode_poll as dplus_encode_poll,
788        encode_unlink as dplus_encode_unlink,
789    };
790    use crate::header::DStarHeader;
791    use crate::session::client::{DExtra, DPlus, Dcs};
792    use crate::types::{Callsign, Module, StreamId, Suffix};
793    use crate::voice::VoiceFrame;
794    use std::net::{IpAddr, Ipv4Addr, SocketAddr};
795    use std::time::Instant;
796
797    type TestResult = Result<(), Box<dyn std::error::Error>>;
798
799    const PEER: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 30001);
800    const DPLUS_PEER: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 20001);
801    const DCS_PEER: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 30051);
802
803    const fn cs(bytes: [u8; 8]) -> Callsign {
804        Callsign::from_wire_bytes(bytes)
805    }
806
807    const fn test_header(my: [u8; 8]) -> DStarHeader {
808        DStarHeader {
809            flag1: 0,
810            flag2: 0,
811            flag3: 0,
812            rpt2: Callsign::from_wire_bytes(*b"REF030 G"),
813            rpt1: Callsign::from_wire_bytes(*b"REF030 C"),
814            ur_call: Callsign::from_wire_bytes(*b"CQCQCQ  "),
815            my_call: Callsign::from_wire_bytes(my),
816            my_suffix: Suffix::EMPTY,
817        }
818    }
819
820    #[expect(clippy::unwrap_used, reason = "const-validated: n is non-zero")]
821    const fn sid(n: u16) -> StreamId {
822        StreamId::new(n).unwrap()
823    }
824
825    // ─── DExtra tests ────────────────────────────────────────────
826
827    #[test]
828    fn dextra_starts_in_unknown_state() {
829        let core = ServerSessionCore::new(ProtocolKind::DExtra, PEER, Module::C);
830        assert_eq!(core.state_kind(), ServerStateKind::Unknown);
831        assert_eq!(core.peer(), PEER);
832        assert_eq!(core.protocol_kind(), ProtocolKind::DExtra);
833        assert_eq!(core.reflector_module(), Module::C);
834        assert!(core.client_callsign().is_none());
835        assert!(core.client_module().is_none());
836    }
837
838    #[test]
839    fn dextra_link_transitions_to_linked() -> TestResult {
840        let mut core = ServerSessionCore::new(ProtocolKind::DExtra, PEER, Module::C);
841        assert_eq!(core.state_kind(), ServerStateKind::Unknown);
842
843        let mut buf = [0u8; 16];
844        let n = encode_connect_link(&mut buf, &cs(*b"W1AW    "), Module::C, Module::B)?;
845        let slice = buf.get(..n).ok_or("n bytes")?;
846        core.handle_input(Instant::now(), slice)?;
847
848        assert_eq!(core.state_kind(), ServerStateKind::Linked);
849        assert_eq!(core.client_callsign(), Some(cs(*b"W1AW    ")));
850        assert_eq!(core.client_module(), Some(Module::B));
851
852        let event: Option<ServerEvent<DExtra>> = core.pop_event();
853        assert!(matches!(event, Some(ServerEvent::ClientLinked { .. })));
854        Ok(())
855    }
856
857    #[test]
858    fn dextra_link_enqueues_ack() -> TestResult {
859        let mut core = ServerSessionCore::new(ProtocolKind::DExtra, PEER, Module::C);
860        let mut buf = [0u8; 16];
861        let n = encode_connect_link(&mut buf, &cs(*b"W1AW    "), Module::C, Module::B)?;
862        let slice = buf.get(..n).ok_or("n bytes")?;
863        core.handle_input(Instant::now(), slice)?;
864
865        let tx = core.pop_transmit(Instant::now()).ok_or("tx")?;
866        assert_eq!(tx.payload.len(), 14, "DExtra ACK is 14 bytes");
867        assert_eq!(tx.dst, PEER);
868        assert_eq!(tx.payload.get(10..13), Some(b"ACK".as_slice()));
869        assert_eq!(tx.payload.get(13), Some(&0x00));
870        Ok(())
871    }
872
873    #[test]
874    fn dextra_poll_from_linked_client_is_echoed() -> TestResult {
875        let mut core = ServerSessionCore::new(ProtocolKind::DExtra, PEER, Module::C);
876        let mut link_buf = [0u8; 16];
877        let n = encode_connect_link(&mut link_buf, &cs(*b"W1AW    "), Module::C, Module::B)?;
878        let slice = link_buf.get(..n).ok_or("n bytes")?;
879        core.handle_input(Instant::now(), slice)?;
880        let _ack = core.pop_transmit(Instant::now()).ok_or("ack")?;
881        let _event: Option<ServerEvent<DExtra>> = core.pop_event();
882
883        let mut poll_buf = [0u8; 16];
884        let n = encode_poll(&mut poll_buf, &cs(*b"W1AW    "))?;
885        let slice = poll_buf.get(..n).ok_or("n bytes")?;
886        core.handle_input(Instant::now(), slice)?;
887
888        let tx = core.pop_transmit(Instant::now()).ok_or("echo")?;
889        assert_eq!(tx.payload.len(), 9, "DExtra poll echo is 9 bytes");
890        assert_eq!(tx.dst, PEER);
891        Ok(())
892    }
893
894    #[test]
895    fn dextra_poll_from_unknown_callsign_is_ignored() -> TestResult {
896        let mut core = ServerSessionCore::new(ProtocolKind::DExtra, PEER, Module::C);
897        let mut link_buf = [0u8; 16];
898        let n = encode_connect_link(&mut link_buf, &cs(*b"W1AW    "), Module::C, Module::B)?;
899        let slice = link_buf.get(..n).ok_or("n bytes")?;
900        core.handle_input(Instant::now(), slice)?;
901        let _ack = core.pop_transmit(Instant::now()).ok_or("ack")?;
902        let _event: Option<ServerEvent<DExtra>> = core.pop_event();
903
904        let mut poll_buf = [0u8; 16];
905        let n = encode_poll(&mut poll_buf, &cs(*b"N0CALL  "))?;
906        let slice = poll_buf.get(..n).ok_or("n bytes")?;
907        core.handle_input(Instant::now(), slice)?;
908
909        assert!(
910            core.pop_transmit(Instant::now()).is_none(),
911            "poll from wrong callsign ignored"
912        );
913        Ok(())
914    }
915
916    #[test]
917    fn dextra_unlink_transitions_to_closed() -> TestResult {
918        let mut core = ServerSessionCore::new(ProtocolKind::DExtra, PEER, Module::C);
919        let mut link_buf = [0u8; 16];
920        let n = encode_connect_link(&mut link_buf, &cs(*b"W1AW    "), Module::C, Module::B)?;
921        let slice = link_buf.get(..n).ok_or("n bytes")?;
922        core.handle_input(Instant::now(), slice)?;
923        let _ack = core.pop_transmit(Instant::now()).ok_or("ack")?;
924        let _event: Option<ServerEvent<DExtra>> = core.pop_event();
925
926        let mut ulink_buf = [0u8; 16];
927        let n = encode_unlink(&mut ulink_buf, &cs(*b"W1AW    "), Module::B)?;
928        let slice = ulink_buf.get(..n).ok_or("n bytes")?;
929        core.handle_input(Instant::now(), slice)?;
930
931        assert_eq!(core.state_kind(), ServerStateKind::Closed);
932        let event: Option<ServerEvent<DExtra>> = core.pop_event();
933        assert!(matches!(event, Some(ServerEvent::ClientUnlinked { .. })));
934        Ok(())
935    }
936
937    #[test]
938    fn dextra_unlink_from_wrong_callsign_is_ignored() -> TestResult {
939        let mut core = ServerSessionCore::new(ProtocolKind::DExtra, PEER, Module::C);
940        let mut link_buf = [0u8; 16];
941        let n = encode_connect_link(&mut link_buf, &cs(*b"W1AW    "), Module::C, Module::B)?;
942        let slice = link_buf.get(..n).ok_or("n bytes")?;
943        core.handle_input(Instant::now(), slice)?;
944        let _ack = core.pop_transmit(Instant::now()).ok_or("ack")?;
945        let _event: Option<ServerEvent<DExtra>> = core.pop_event();
946
947        let mut ulink_buf = [0u8; 16];
948        let n = encode_unlink(&mut ulink_buf, &cs(*b"N0CALL  "), Module::B)?;
949        let slice = ulink_buf.get(..n).ok_or("n bytes")?;
950        core.handle_input(Instant::now(), slice)?;
951
952        assert_eq!(
953            core.state_kind(),
954            ServerStateKind::Linked,
955            "state unchanged when wrong callsign tries to unlink"
956        );
957        Ok(())
958    }
959
960    // ─── DPlus tests ─────────────────────────────────────────────
961
962    #[test]
963    fn dplus_link1_transitions_to_link1_received() -> TestResult {
964        let mut core = ServerSessionCore::new(ProtocolKind::DPlus, DPLUS_PEER, Module::C);
965        let mut buf = [0u8; 16];
966        let n = encode_link1(&mut buf)?;
967        core.handle_input(Instant::now(), buf.get(..n).ok_or("bytes")?)?;
968        assert_eq!(core.state_kind(), ServerStateKind::Unknown);
969        let tx = core.pop_transmit(Instant::now()).ok_or("ack")?;
970        assert_eq!(tx.payload.len(), 5, "DPlus LINK1 ACK is 5 bytes");
971        assert_eq!(tx.payload, &[0x05, 0x00, 0x18, 0x00, 0x01]);
972        Ok(())
973    }
974
975    #[test]
976    fn dplus_link2_after_link1_transitions_to_linked_and_accepts() -> TestResult {
977        let mut core = ServerSessionCore::new(ProtocolKind::DPlus, DPLUS_PEER, Module::C);
978        let mut buf1 = [0u8; 16];
979        let n = encode_link1(&mut buf1)?;
980        core.handle_input(Instant::now(), buf1.get(..n).ok_or("bytes")?)?;
981        let _ack1 = core.pop_transmit(Instant::now()).ok_or("ack1")?;
982
983        let mut buf2 = [0u8; 32];
984        let n = encode_link2(&mut buf2, &cs(*b"W1AW    "))?;
985        core.handle_input(Instant::now(), buf2.get(..n).ok_or("bytes")?)?;
986
987        assert_eq!(core.state_kind(), ServerStateKind::Linked);
988        assert_eq!(core.client_callsign(), Some(cs(*b"W1AW    ")));
989        assert_eq!(core.client_module(), Some(Module::C));
990
991        let tx = core.pop_transmit(Instant::now()).ok_or("okrw")?;
992        assert_eq!(tx.payload.len(), 8, "DPlus LINK2 reply is 8 bytes");
993        assert_eq!(tx.payload.get(4..8), Some(b"OKRW".as_slice()));
994
995        let event: Option<ServerEvent<DPlus>> = core.pop_event();
996        assert!(matches!(event, Some(ServerEvent::ClientLinked { .. })));
997        Ok(())
998    }
999
1000    #[test]
1001    fn dplus_link2_without_link1_is_ignored() -> TestResult {
1002        let mut core = ServerSessionCore::new(ProtocolKind::DPlus, DPLUS_PEER, Module::C);
1003        let mut buf = [0u8; 32];
1004        let n = encode_link2(&mut buf, &cs(*b"W1AW    "))?;
1005        core.handle_input(Instant::now(), buf.get(..n).ok_or("bytes")?)?;
1006        assert_eq!(core.state_kind(), ServerStateKind::Unknown);
1007        assert!(core.pop_transmit(Instant::now()).is_none());
1008        Ok(())
1009    }
1010
1011    #[test]
1012    fn dplus_already_linked_link2_from_different_callsign_returns_busy() -> TestResult {
1013        let mut core = ServerSessionCore::new(ProtocolKind::DPlus, DPLUS_PEER, Module::C);
1014        let mut buf1 = [0u8; 16];
1015        let n = encode_link1(&mut buf1)?;
1016        core.handle_input(Instant::now(), buf1.get(..n).ok_or("bytes")?)?;
1017        let _ack1 = core.pop_transmit(Instant::now()).ok_or("ack1")?;
1018        let mut buf2 = [0u8; 32];
1019        let n = encode_link2(&mut buf2, &cs(*b"W1AW    "))?;
1020        core.handle_input(Instant::now(), buf2.get(..n).ok_or("bytes")?)?;
1021        let _ok = core.pop_transmit(Instant::now()).ok_or("okrw")?;
1022        let _ev: Option<ServerEvent<DPlus>> = core.pop_event();
1023
1024        let mut buf3 = [0u8; 32];
1025        let n = encode_link2(&mut buf3, &cs(*b"N0CALL  "))?;
1026        core.handle_input(Instant::now(), buf3.get(..n).ok_or("bytes")?)?;
1027
1028        let tx = core.pop_transmit(Instant::now()).ok_or("busy")?;
1029        assert_eq!(tx.payload.len(), 8);
1030        assert_eq!(tx.payload.get(4..8), Some(b"BUSY".as_slice()));
1031        assert_eq!(core.state_kind(), ServerStateKind::Linked);
1032        assert_eq!(core.client_callsign(), Some(cs(*b"W1AW    ")));
1033        Ok(())
1034    }
1035
1036    #[test]
1037    fn dplus_voice_header_after_link2_transitions_to_streaming() -> TestResult {
1038        let mut core = ServerSessionCore::new(ProtocolKind::DPlus, DPLUS_PEER, Module::C);
1039        let mut buf1 = [0u8; 16];
1040        let n = encode_link1(&mut buf1)?;
1041        core.handle_input(Instant::now(), buf1.get(..n).ok_or("bytes")?)?;
1042        let _a1 = core.pop_transmit(Instant::now());
1043        let mut buf2 = [0u8; 32];
1044        let n = encode_link2(&mut buf2, &cs(*b"W1AW    "))?;
1045        core.handle_input(Instant::now(), buf2.get(..n).ok_or("bytes")?)?;
1046        let _a2 = core.pop_transmit(Instant::now());
1047        let _ev: Option<ServerEvent<DPlus>> = core.pop_event();
1048
1049        let mut hdr_buf = [0u8; 128];
1050        let n = crate::codec::dplus::encode_voice_header(
1051            &mut hdr_buf,
1052            sid(0xCAFE),
1053            &test_header(*b"W1AW    "),
1054        )?;
1055        core.handle_input(Instant::now(), hdr_buf.get(..n).ok_or("bytes")?)?;
1056
1057        assert_eq!(core.state_kind(), ServerStateKind::Streaming);
1058        let event: Option<ServerEvent<DPlus>> = core.pop_event();
1059        assert!(matches!(
1060            event,
1061            Some(ServerEvent::ClientStreamStarted { .. })
1062        ));
1063        Ok(())
1064    }
1065
1066    #[test]
1067    fn dplus_voice_data_during_streaming_emits_frame_event() -> TestResult {
1068        let mut core = ServerSessionCore::new(ProtocolKind::DPlus, DPLUS_PEER, Module::C);
1069        let mut buf1 = [0u8; 16];
1070        let n = encode_link1(&mut buf1)?;
1071        core.handle_input(Instant::now(), buf1.get(..n).ok_or("bytes")?)?;
1072        let _ = core.pop_transmit(Instant::now());
1073        let mut buf2 = [0u8; 32];
1074        let n = encode_link2(&mut buf2, &cs(*b"W1AW    "))?;
1075        core.handle_input(Instant::now(), buf2.get(..n).ok_or("bytes")?)?;
1076        let _ = core.pop_transmit(Instant::now());
1077        let _ev: Option<ServerEvent<DPlus>> = core.pop_event();
1078        let mut hdr_buf = [0u8; 128];
1079        let n = crate::codec::dplus::encode_voice_header(
1080            &mut hdr_buf,
1081            sid(0xCAFE),
1082            &test_header(*b"W1AW    "),
1083        )?;
1084        core.handle_input(Instant::now(), hdr_buf.get(..n).ok_or("bytes")?)?;
1085        let _ev: Option<ServerEvent<DPlus>> = core.pop_event();
1086
1087        let frame = VoiceFrame::silence();
1088        let mut data_buf = [0u8; 128];
1089        let n = crate::codec::dplus::encode_voice_data(&mut data_buf, sid(0xCAFE), 3, &frame)?;
1090        core.handle_input(Instant::now(), data_buf.get(..n).ok_or("bytes")?)?;
1091
1092        let event: Option<ServerEvent<DPlus>> = core.pop_event();
1093        assert!(matches!(
1094            event,
1095            Some(ServerEvent::ClientStreamFrame { stream_id, seq, .. })
1096            if stream_id == sid(0xCAFE) && seq == 3
1097        ));
1098        Ok(())
1099    }
1100
1101    #[test]
1102    fn dplus_voice_eot_returns_to_linked() -> TestResult {
1103        let mut core = ServerSessionCore::new(ProtocolKind::DPlus, DPLUS_PEER, Module::C);
1104        let mut buf1 = [0u8; 16];
1105        let n = encode_link1(&mut buf1)?;
1106        core.handle_input(Instant::now(), buf1.get(..n).ok_or("bytes")?)?;
1107        let _ = core.pop_transmit(Instant::now());
1108        let mut buf2 = [0u8; 32];
1109        let n = encode_link2(&mut buf2, &cs(*b"W1AW    "))?;
1110        core.handle_input(Instant::now(), buf2.get(..n).ok_or("bytes")?)?;
1111        let _ = core.pop_transmit(Instant::now());
1112        let _ev: Option<ServerEvent<DPlus>> = core.pop_event();
1113        let mut hdr_buf = [0u8; 128];
1114        let n = crate::codec::dplus::encode_voice_header(
1115            &mut hdr_buf,
1116            sid(0xCAFE),
1117            &test_header(*b"W1AW    "),
1118        )?;
1119        core.handle_input(Instant::now(), hdr_buf.get(..n).ok_or("bytes")?)?;
1120        let _ev: Option<ServerEvent<DPlus>> = core.pop_event();
1121
1122        let mut eot_buf = [0u8; 128];
1123        let n = crate::codec::dplus::encode_voice_eot(&mut eot_buf, sid(0xCAFE), 20)?;
1124        core.handle_input(Instant::now(), eot_buf.get(..n).ok_or("bytes")?)?;
1125
1126        assert_eq!(core.state_kind(), ServerStateKind::Linked);
1127        let event: Option<ServerEvent<DPlus>> = core.pop_event();
1128        assert!(matches!(event, Some(ServerEvent::ClientStreamEnded { .. })));
1129        Ok(())
1130    }
1131
1132    #[test]
1133    fn dplus_unlink_transitions_to_closed_and_acks() -> TestResult {
1134        let mut core = ServerSessionCore::new(ProtocolKind::DPlus, DPLUS_PEER, Module::C);
1135        let mut buf1 = [0u8; 16];
1136        let n = encode_link1(&mut buf1)?;
1137        core.handle_input(Instant::now(), buf1.get(..n).ok_or("bytes")?)?;
1138        let _ = core.pop_transmit(Instant::now());
1139        let mut buf2 = [0u8; 32];
1140        let n = encode_link2(&mut buf2, &cs(*b"W1AW    "))?;
1141        core.handle_input(Instant::now(), buf2.get(..n).ok_or("bytes")?)?;
1142        let _ = core.pop_transmit(Instant::now());
1143        let _ev: Option<ServerEvent<DPlus>> = core.pop_event();
1144
1145        let mut ub = [0u8; 16];
1146        let n = dplus_encode_unlink(&mut ub)?;
1147        core.handle_input(Instant::now(), ub.get(..n).ok_or("bytes")?)?;
1148
1149        assert_eq!(core.state_kind(), ServerStateKind::Closed);
1150        let tx = core.pop_transmit(Instant::now()).ok_or("ack")?;
1151        assert_eq!(tx.payload.len(), 5);
1152        assert_eq!(tx.payload.get(4), Some(&0x00));
1153        let event: Option<ServerEvent<DPlus>> = core.pop_event();
1154        assert!(matches!(event, Some(ServerEvent::ClientUnlinked { .. })));
1155        Ok(())
1156    }
1157
1158    #[test]
1159    fn dplus_poll_emits_echo() -> TestResult {
1160        let mut core = ServerSessionCore::new(ProtocolKind::DPlus, DPLUS_PEER, Module::C);
1161        let mut buf1 = [0u8; 16];
1162        let n = encode_link1(&mut buf1)?;
1163        core.handle_input(Instant::now(), buf1.get(..n).ok_or("bytes")?)?;
1164        let _ = core.pop_transmit(Instant::now());
1165        let mut buf2 = [0u8; 32];
1166        let n = encode_link2(&mut buf2, &cs(*b"W1AW    "))?;
1167        core.handle_input(Instant::now(), buf2.get(..n).ok_or("bytes")?)?;
1168        let _ = core.pop_transmit(Instant::now());
1169        let _ev: Option<ServerEvent<DPlus>> = core.pop_event();
1170
1171        let mut pb = [0u8; 16];
1172        let n = dplus_encode_poll(&mut pb)?;
1173        core.handle_input(Instant::now(), pb.get(..n).ok_or("bytes")?)?;
1174
1175        let tx = core.pop_transmit(Instant::now()).ok_or("echo")?;
1176        assert_eq!(tx.payload.len(), 3, "DPlus poll echo is 3 bytes");
1177        assert_eq!(tx.payload, &[0x03, 0x60, 0x00]);
1178        Ok(())
1179    }
1180
1181    // ─── DCS tests ───────────────────────────────────────────────
1182
1183    #[test]
1184    fn dcs_link_transitions_to_linked_and_acks() -> TestResult {
1185        let mut core = ServerSessionCore::new(ProtocolKind::Dcs, DCS_PEER, Module::C);
1186        let mut buf = [0u8; 520];
1187        let n = dcs_encode_link(
1188            &mut buf,
1189            &cs(*b"W1AW    "),
1190            Module::B,
1191            Module::C,
1192            &cs(*b"DCS030  "),
1193            crate::codec::dcs::GatewayType::Repeater,
1194        )?;
1195        core.handle_input(Instant::now(), buf.get(..n).ok_or("bytes")?)?;
1196
1197        assert_eq!(core.state_kind(), ServerStateKind::Linked);
1198        assert_eq!(core.client_callsign(), Some(cs(*b"W1AW    ")));
1199        assert_eq!(core.client_module(), Some(Module::B));
1200
1201        let tx = core.pop_transmit(Instant::now()).ok_or("ack")?;
1202        assert_eq!(tx.payload.len(), 14, "DCS ACK is 14 bytes");
1203        assert_eq!(tx.payload.get(10..13), Some(b"ACK".as_slice()));
1204
1205        let event: Option<ServerEvent<Dcs>> = core.pop_event();
1206        assert!(matches!(event, Some(ServerEvent::ClientLinked { .. })));
1207        Ok(())
1208    }
1209
1210    /// Non-silence frame: the DCS decoder flags EOT when `slow_data`
1211    /// is `[0x55, 0x55, 0x55]` (the D-STAR sync pattern), which is
1212    /// what `VoiceFrame::silence` returns. Mid-stream tests use this
1213    /// constructor to avoid triggering the EOT heuristic.
1214    fn non_eot_frame() -> VoiceFrame {
1215        VoiceFrame {
1216            ambe: [0x11; 9],
1217            slow_data: [0x22; 3],
1218        }
1219    }
1220
1221    #[test]
1222    fn dcs_voice_from_linked_state_starts_stream() -> TestResult {
1223        let mut core = ServerSessionCore::new(ProtocolKind::Dcs, DCS_PEER, Module::C);
1224        let mut lb = [0u8; 520];
1225        let n = dcs_encode_link(
1226            &mut lb,
1227            &cs(*b"W1AW    "),
1228            Module::B,
1229            Module::C,
1230            &cs(*b"DCS030  "),
1231            crate::codec::dcs::GatewayType::Repeater,
1232        )?;
1233        core.handle_input(Instant::now(), lb.get(..n).ok_or("bytes")?)?;
1234        let _ack = core.pop_transmit(Instant::now());
1235        let _ev: Option<ServerEvent<Dcs>> = core.pop_event();
1236
1237        let frame = non_eot_frame();
1238        let mut vb = [0u8; 128];
1239        let n = dcs_encode_voice(
1240            &mut vb,
1241            &test_header(*b"W1AW    "),
1242            sid(0xBEEF),
1243            0,
1244            &frame,
1245            false,
1246        )?;
1247        core.handle_input(Instant::now(), vb.get(..n).ok_or("bytes")?)?;
1248
1249        assert_eq!(core.state_kind(), ServerStateKind::Streaming);
1250        let ev1: Option<ServerEvent<Dcs>> = core.pop_event();
1251        assert!(matches!(ev1, Some(ServerEvent::ClientStreamStarted { .. })));
1252        let ev2: Option<ServerEvent<Dcs>> = core.pop_event();
1253        assert!(matches!(ev2, Some(ServerEvent::ClientStreamFrame { .. })));
1254        Ok(())
1255    }
1256
1257    #[test]
1258    fn dcs_voice_mid_stream_emits_frame() -> TestResult {
1259        let mut core = ServerSessionCore::new(ProtocolKind::Dcs, DCS_PEER, Module::C);
1260        let mut lb = [0u8; 520];
1261        let n = dcs_encode_link(
1262            &mut lb,
1263            &cs(*b"W1AW    "),
1264            Module::B,
1265            Module::C,
1266            &cs(*b"DCS030  "),
1267            crate::codec::dcs::GatewayType::Repeater,
1268        )?;
1269        core.handle_input(Instant::now(), lb.get(..n).ok_or("bytes")?)?;
1270        let _a = core.pop_transmit(Instant::now());
1271        let _ev: Option<ServerEvent<Dcs>> = core.pop_event();
1272
1273        let frame = non_eot_frame();
1274        let mut vb = [0u8; 128];
1275        let n = dcs_encode_voice(
1276            &mut vb,
1277            &test_header(*b"W1AW    "),
1278            sid(0xBEEF),
1279            0,
1280            &frame,
1281            false,
1282        )?;
1283        core.handle_input(Instant::now(), vb.get(..n).ok_or("bytes")?)?;
1284        let _ev: Option<ServerEvent<Dcs>> = core.pop_event();
1285        let _ev: Option<ServerEvent<Dcs>> = core.pop_event();
1286
1287        let mut vb2 = [0u8; 128];
1288        let n = dcs_encode_voice(
1289            &mut vb2,
1290            &test_header(*b"W1AW    "),
1291            sid(0xBEEF),
1292            1,
1293            &frame,
1294            false,
1295        )?;
1296        core.handle_input(Instant::now(), vb2.get(..n).ok_or("bytes")?)?;
1297
1298        let ev: Option<ServerEvent<Dcs>> = core.pop_event();
1299        assert!(matches!(
1300            ev,
1301            Some(ServerEvent::ClientStreamFrame { seq: 1, .. })
1302        ));
1303        assert!(core.pop_event::<Dcs>().is_none());
1304        Ok(())
1305    }
1306
1307    #[test]
1308    fn dcs_voice_with_is_end_transitions_back_to_linked() -> TestResult {
1309        let mut core = ServerSessionCore::new(ProtocolKind::Dcs, DCS_PEER, Module::C);
1310        let mut lb = [0u8; 520];
1311        let n = dcs_encode_link(
1312            &mut lb,
1313            &cs(*b"W1AW    "),
1314            Module::B,
1315            Module::C,
1316            &cs(*b"DCS030  "),
1317            crate::codec::dcs::GatewayType::Repeater,
1318        )?;
1319        core.handle_input(Instant::now(), lb.get(..n).ok_or("bytes")?)?;
1320        let _ = core.pop_transmit(Instant::now());
1321        let _ev: Option<ServerEvent<Dcs>> = core.pop_event();
1322
1323        let frame = VoiceFrame::silence();
1324        let mut vb = [0u8; 128];
1325        let n = dcs_encode_voice(
1326            &mut vb,
1327            &test_header(*b"W1AW    "),
1328            sid(0xBEEF),
1329            5,
1330            &frame,
1331            true,
1332        )?;
1333        core.handle_input(Instant::now(), vb.get(..n).ok_or("bytes")?)?;
1334
1335        assert_eq!(core.state_kind(), ServerStateKind::Linked);
1336        let _a: Option<ServerEvent<Dcs>> = core.pop_event();
1337        let _b: Option<ServerEvent<Dcs>> = core.pop_event();
1338        let end: Option<ServerEvent<Dcs>> = core.pop_event();
1339        assert!(matches!(end, Some(ServerEvent::ClientStreamEnded { .. })));
1340        Ok(())
1341    }
1342
1343    #[test]
1344    fn dcs_poll_emits_echo() -> TestResult {
1345        let mut core = ServerSessionCore::new(ProtocolKind::Dcs, DCS_PEER, Module::C);
1346        let mut lb = [0u8; 520];
1347        let n = dcs_encode_link(
1348            &mut lb,
1349            &cs(*b"W1AW    "),
1350            Module::B,
1351            Module::C,
1352            &cs(*b"DCS030  "),
1353            crate::codec::dcs::GatewayType::Repeater,
1354        )?;
1355        core.handle_input(Instant::now(), lb.get(..n).ok_or("bytes")?)?;
1356        let _ack = core.pop_transmit(Instant::now());
1357        let _ev: Option<ServerEvent<Dcs>> = core.pop_event();
1358
1359        let mut pb = [0u8; 32];
1360        let n = dcs_encode_poll(&mut pb, &cs(*b"W1AW    "), &cs(*b"DCS030  "))?;
1361        core.handle_input(Instant::now(), pb.get(..n).ok_or("bytes")?)?;
1362
1363        let tx = core.pop_transmit(Instant::now()).ok_or("echo")?;
1364        assert_eq!(tx.payload.len(), 17, "DCS poll echo is 17 bytes");
1365        Ok(())
1366    }
1367
1368    #[test]
1369    fn dcs_unlink_transitions_to_closed() -> TestResult {
1370        let mut core = ServerSessionCore::new(ProtocolKind::Dcs, DCS_PEER, Module::C);
1371        let mut lb = [0u8; 520];
1372        let n = dcs_encode_link(
1373            &mut lb,
1374            &cs(*b"W1AW    "),
1375            Module::B,
1376            Module::C,
1377            &cs(*b"DCS030  "),
1378            crate::codec::dcs::GatewayType::Repeater,
1379        )?;
1380        core.handle_input(Instant::now(), lb.get(..n).ok_or("bytes")?)?;
1381        let _ack = core.pop_transmit(Instant::now());
1382        let _ev: Option<ServerEvent<Dcs>> = core.pop_event();
1383
1384        let mut ub = [0u8; 32];
1385        let n = dcs_encode_unlink(&mut ub, &cs(*b"W1AW    "), Module::B, &cs(*b"DCS030  "))?;
1386        core.handle_input(Instant::now(), ub.get(..n).ok_or("bytes")?)?;
1387
1388        assert_eq!(core.state_kind(), ServerStateKind::Closed);
1389        let event: Option<ServerEvent<Dcs>> = core.pop_event();
1390        assert!(matches!(event, Some(ServerEvent::ClientUnlinked { .. })));
1391        Ok(())
1392    }
1393
1394    // ─── Misc ────────────────────────────────────────────────────
1395
1396    #[test]
1397    fn next_deadline_is_none_on_empty_outbox() {
1398        let core = ServerSessionCore::new(ProtocolKind::DExtra, PEER, Module::C);
1399        assert!(core.next_deadline().is_none());
1400    }
1401}