dstar_gateway_core/session/client/
core.rs

1//! Protocol-erased client session core.
2//!
3//! [`SessionCore`] is the runtime-erased state machine that drives a
4//! single D-STAR reflector client session. It is wrapped by the
5//! typestate [`Session<P, S>`][session] — the typestate sits on top
6//! of this struct and forwards every method. Keeping the state machine
7//! monomorphization-free avoids duplicating the machine body for each
8//! protocol.
9//!
10//! Handles connect / keepalive / disconnect and voice TX/RX.
11//!
12//! [session]: crate::session::client::Protocol
13
14use std::collections::VecDeque;
15use std::net::SocketAddr;
16use std::time::{Duration, Instant};
17
18use crate::codec::{dcs, dextra, dplus};
19use crate::error::{DcsError, Error, ProtocolError, StateError};
20use crate::header::DStarHeader;
21use crate::session::driver::Transmit;
22use crate::session::outbox::{OutboundPacket, Outbox};
23use crate::session::timer_wheel::TimerWheel;
24use crate::types::{Callsign, Module, ProtocolKind, StreamId};
25use crate::validator::{Diagnostic, VecSink};
26use crate::voice::VoiceFrame;
27
28use super::event::{DisconnectReason, Event, VoiceEndReason};
29use super::protocol::Protocol;
30use super::state::ClientStateKind;
31
32/// Named timer: keepalive poll.
33const TIMER_KEEPALIVE: &str = "keepalive";
34/// Named timer: keepalive inactivity (peer silent for too long).
35const TIMER_KEEPALIVE_INACTIVITY: &str = "keepalive_inactivity";
36/// Named timer: waiting for disconnect ACK from reflector.
37const TIMER_DISCONNECT_DEADLINE: &str = "disconnect_deadline";
38
39/// Internal protocol-erased event record.
40///
41/// [`SessionCore::pop_event`] is generic over `P: Protocol` and
42/// converts each `RawEvent` into an [`Event<P>`] at drain time.
43#[derive(Debug, Clone)]
44enum RawEvent {
45    /// Transitioned to `Connected`.
46    Connected {
47        /// Peer that accepted us.
48        peer: SocketAddr,
49    },
50    /// Transitioned to `Closed`.
51    Disconnected {
52        /// Why.
53        reason: DisconnectReason,
54    },
55    /// Reflector poll echo received.
56    PollEcho {
57        /// Peer that sent the echo.
58        peer: SocketAddr,
59    },
60    /// Voice stream started — header arrived from the reflector.
61    VoiceStart {
62        /// D-STAR stream id.
63        stream_id: StreamId,
64        /// Decoded D-STAR header.
65        header: DStarHeader,
66    },
67    /// Voice data frame within an active stream.
68    VoiceFrame {
69        /// D-STAR stream id.
70        stream_id: StreamId,
71        /// Frame sequence number.
72        seq: u8,
73        /// 9 AMBE bytes + 3 slow data bytes.
74        frame: VoiceFrame,
75    },
76    /// Voice stream ended.
77    VoiceEnd {
78        /// D-STAR stream id.
79        stream_id: StreamId,
80        /// Reason the stream ended.
81        reason: VoiceEndReason,
82    },
83}
84
85/// Protocol-erased session core.
86///
87/// Holds all mutable state for one reflector session. The typestate
88/// `Session<P, S>` wrapper forwards most calls straight through; the
89/// core does not itself enforce state transitions at compile time —
90/// that discipline is the wrapper's job.
91pub struct SessionCore {
92    /// Which protocol this session speaks.
93    kind: ProtocolKind,
94    /// Logged-in client callsign.
95    callsign: Callsign,
96    /// Client local module letter.
97    local_module: Module,
98    /// Reflector module letter we are linked (or linking) to.
99    reflector_module: Module,
100    /// Reflector's own callsign (e.g. `REF030`, `XLX307`, `DCS030`).
101    ///
102    /// Required by the DCS wire format — the 519-byte LINK packet,
103    /// the 19-byte UNLINK packet, and the 17-byte POLL packet all
104    /// embed the target reflector's callsign at specific offsets.
105    /// If the DCS client sends the wrong reflector callsign the
106    /// target reflector will drop the packet with no response.
107    ///
108    /// `None` means the caller did not supply one. For `DPlus` and
109    /// `DExtra` this is harmless — neither protocol embeds the
110    /// reflector callsign on the wire. For `DCS` the session falls
111    /// back to a `DCS001  ` default and emits a warning at construction
112    /// time so the operator can see why connections to
113    /// non-DCS001 reflectors fail.
114    reflector_callsign: Option<Callsign>,
115    /// Peer address of the reflector.
116    peer: SocketAddr,
117    /// Runtime state discriminator.
118    state: ClientStateKind,
119    /// Outbound packet queue.
120    outbox: Outbox,
121    /// Timer wheel.
122    timers: TimerWheel,
123    /// Most-recently-popped outbound packet, held so
124    /// [`SessionCore::pop_transmit`] can return a borrow into
125    /// the owned payload across multiple calls.
126    current_tx: Option<OutboundPacket>,
127    /// Queued raw events awaiting [`SessionCore::pop_event`].
128    events: VecDeque<RawEvent>,
129    /// Cached `DPlus` host list (only populated after TCP auth).
130    ///
131    /// `None` for `DExtra`/`Dcs`; `Some` after
132    /// [`SessionCore::attach_host_list`] transitions a `DPlus`
133    /// session from `Configured` to `Authenticated`.
134    host_list: Option<dplus::HostList>,
135    /// Most recent TX voice header, populated by [`Self::enqueue_send_header`].
136    ///
137    /// DCS embeds the full D-STAR header in every 100-byte voice
138    /// frame, so [`Self::enqueue_send_voice`] / [`Self::enqueue_send_eot`]
139    /// must be able to retrieve the header that started the stream.
140    /// `DPlus` and `DExtra` do not embed the header in voice frames,
141    /// so the cache is not consulted on those protocols — it is still
142    /// populated for symmetry and future header retransmit support.
143    cached_tx_header: Option<DStarHeader>,
144    /// Diagnostic sink for lenient parser warnings.
145    ///
146    /// Concrete [`VecSink`] owned by the core so [`Self::drain_diagnostics`]
147    /// can return them as a `Vec`. The `Session<P, S>` wrapper exposes
148    /// these via `Session::diagnostics()`.
149    diagnostics: VecSink,
150}
151
152impl std::fmt::Debug for SessionCore {
153    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
154        f.debug_struct("SessionCore")
155            .field("kind", &self.kind)
156            .field("callsign", &self.callsign)
157            .field("local_module", &self.local_module)
158            .field("reflector_module", &self.reflector_module)
159            .field("reflector_callsign", &self.reflector_callsign)
160            .field("peer", &self.peer)
161            .field("state", &self.state)
162            .field("outbox", &self.outbox)
163            .field("timers", &self.timers)
164            .field("current_tx", &self.current_tx)
165            .field("events", &self.events)
166            .field("host_list", &self.host_list)
167            .field("cached_tx_header", &self.cached_tx_header)
168            .field("diagnostics", &self.diagnostics)
169            .finish()
170    }
171}
172
173impl SessionCore {
174    // ── Construction ──────────────────────────────────────────
175
176    /// Build a new [`SessionCore`] in [`ClientStateKind::Configured`].
177    ///
178    /// The session has no host list, no pending packets, and no
179    /// active timers. The typestate builder ([`super::SessionBuilder`])
180    /// wraps this constructor and enforces per-protocol state-transition
181    /// rules.
182    #[must_use]
183    pub fn new(
184        kind: ProtocolKind,
185        callsign: Callsign,
186        local_module: Module,
187        reflector_module: Module,
188        peer: SocketAddr,
189    ) -> Self {
190        Self::new_with_reflector_callsign(
191            kind,
192            callsign,
193            local_module,
194            reflector_module,
195            peer,
196            None,
197        )
198    }
199
200    /// Build a new [`SessionCore`] with an explicit reflector
201    /// callsign.
202    ///
203    /// Required for `DCS` sessions that target a non-`DCS001`
204    /// reflector — the DCS codec embeds the reflector callsign in
205    /// every LINK/UNLINK/POLL packet, and the default fallback is
206    /// `DCS001  `. Optional for `DPlus` and `DExtra`, which do not
207    /// carry the reflector callsign on the wire.
208    ///
209    /// Emits a `tracing::warn!` at construction time if the
210    /// protocol is `DCS` and `reflector_callsign` is `None`, so
211    /// operators can see why connections to non-DCS001 reflectors
212    /// fail without the real callsign.
213    #[must_use]
214    pub fn new_with_reflector_callsign(
215        kind: ProtocolKind,
216        callsign: Callsign,
217        local_module: Module,
218        reflector_module: Module,
219        peer: SocketAddr,
220        reflector_callsign: Option<Callsign>,
221    ) -> Self {
222        if kind == ProtocolKind::Dcs && reflector_callsign.is_none() {
223            tracing::warn!(
224                target: "dstar_gateway_core::session",
225                %callsign,
226                %peer,
227                "DCS session constructed without reflector_callsign — \
228                 falling back to \"DCS001  \" default. Connections to \
229                 any other DCS reflector will fail silently because the \
230                 target reflector reads the callsign field from the \
231                 LINK packet and drops mismatched traffic. Call \
232                 SessionBuilder::reflector_callsign to fix."
233            );
234        }
235        Self {
236            kind,
237            callsign,
238            local_module,
239            reflector_module,
240            reflector_callsign,
241            peer,
242            state: ClientStateKind::Configured,
243            outbox: Outbox::new(),
244            timers: TimerWheel::new(),
245            current_tx: None,
246            events: VecDeque::new(),
247            host_list: None,
248            cached_tx_header: None,
249            diagnostics: VecSink::default(),
250        }
251    }
252
253    /// Drain accumulated parser diagnostics.
254    ///
255    /// Returns everything the internal [`VecSink`] has captured since
256    /// the previous drain (or since construction). The sink is empty
257    /// on return. Called by `Session::diagnostics()` from the
258    /// typestate wrapper.
259    pub fn drain_diagnostics(&mut self) -> Vec<Diagnostic> {
260        self.diagnostics.drain().collect()
261    }
262
263    // ── Accessors ─────────────────────────────────────────────
264
265    /// Current runtime state.
266    #[must_use]
267    pub const fn state_kind(&self) -> ClientStateKind {
268        self.state
269    }
270
271    /// Peer address of the reflector.
272    #[must_use]
273    pub const fn peer(&self) -> SocketAddr {
274        self.peer
275    }
276
277    /// Client callsign.
278    #[must_use]
279    pub const fn callsign(&self) -> Callsign {
280        self.callsign
281    }
282
283    /// Client local module letter.
284    #[must_use]
285    pub const fn local_module(&self) -> Module {
286        self.local_module
287    }
288
289    /// Reflector module letter.
290    #[must_use]
291    pub const fn reflector_module(&self) -> Module {
292        self.reflector_module
293    }
294
295    /// Runtime protocol discriminator.
296    #[must_use]
297    pub const fn protocol_kind(&self) -> ProtocolKind {
298        self.kind
299    }
300
301    /// Cached `DPlus` host list (`None` unless authenticated).
302    #[must_use]
303    pub const fn host_list(&self) -> Option<&dplus::HostList> {
304        self.host_list.as_ref()
305    }
306
307    // ── DPlus host list / auth ────────────────────────────────
308
309    /// Attach a `DPlus` host list, transitioning the session from
310    /// [`ClientStateKind::Configured`] to
311    /// [`ClientStateKind::Authenticated`].
312    ///
313    /// Only valid for `DPlus` sessions. The host list is what the
314    /// `dstar-gateway` shell would obtain from the
315    /// `auth.dstargateway.org` TCP handshake; the core does not
316    /// itself perform any I/O.
317    ///
318    /// # Errors
319    ///
320    /// Returns [`StateError::WrongState`] if the session is not a
321    /// `DPlus` session or is not in [`ClientStateKind::Configured`].
322    /// The typestate wrapper prevents both cases at compile time —
323    /// this runtime check is the residual safety net for direct
324    /// `SessionCore` users (tests + the protocol-erased fallback path).
325    pub fn attach_host_list(&mut self, list: dplus::HostList) -> Result<(), Error> {
326        if self.kind != ProtocolKind::DPlus || self.state != ClientStateKind::Configured {
327            return Err(Error::State(StateError::WrongState {
328                operation: "attach_host_list",
329                state: self.state,
330                protocol: self.kind,
331            }));
332        }
333        self.host_list = Some(list);
334        self.state = ClientStateKind::Authenticated;
335        Ok(())
336    }
337
338    // ── Connect / disconnect ──────────────────────────────────
339
340    /// Enqueue the protocol-appropriate LINK packet and transition
341    /// the session to [`ClientStateKind::Connecting`].
342    ///
343    /// # Errors
344    ///
345    /// Returns [`Error::Protocol`] if a codec encoder reports a
346    /// buffer-too-small (should never happen — the scratch
347    /// buffers in this core are oversized for every known packet).
348    pub fn enqueue_connect(&mut self, now: Instant) -> Result<(), Error> {
349        let packet = match self.kind {
350            ProtocolKind::DPlus => {
351                let mut buf = [0u8; 32];
352                let n = dplus::encode_link1(&mut buf)
353                    .map_err(dplus::DPlusError::from)
354                    .map_err(ProtocolError::DPlus)?;
355                OutboundPacket {
356                    dst: self.peer,
357                    payload: buf.get(..n).unwrap_or(&[]).to_vec(),
358                    not_before: now,
359                }
360            }
361            ProtocolKind::DExtra => {
362                let mut buf = [0u8; 16];
363                let n = dextra::encode_connect_link(
364                    &mut buf,
365                    &self.callsign,
366                    self.reflector_module,
367                    self.local_module,
368                )
369                .map_err(dextra::DExtraError::from)
370                .map_err(ProtocolError::DExtra)?;
371                OutboundPacket {
372                    dst: self.peer,
373                    payload: buf.get(..n).unwrap_or(&[]).to_vec(),
374                    not_before: now,
375                }
376            }
377            ProtocolKind::Dcs => {
378                let mut buf = vec![0u8; 600];
379                let reflector_callsign = self.dcs_reflector_callsign();
380                let n = dcs::encode_connect_link(
381                    &mut buf,
382                    &self.callsign,
383                    self.local_module,
384                    self.reflector_module,
385                    &reflector_callsign,
386                    dcs::GatewayType::Dongle,
387                )
388                .map_err(DcsError::from)
389                .map_err(ProtocolError::Dcs)?;
390                buf.truncate(n);
391                OutboundPacket {
392                    dst: self.peer,
393                    payload: buf,
394                    not_before: now,
395                }
396            }
397        };
398        self.outbox.enqueue(packet);
399        self.state = ClientStateKind::Connecting;
400        self.arm_keepalive_inactivity(now);
401        Ok(())
402    }
403
404    /// Enqueue the protocol-appropriate UNLINK packet and transition
405    /// the session to [`ClientStateKind::Disconnecting`].
406    ///
407    /// # Errors
408    ///
409    /// Returns [`Error::Protocol`] if a codec encoder fails.
410    pub fn enqueue_disconnect(&mut self, now: Instant) -> Result<(), Error> {
411        let packet = match self.kind {
412            ProtocolKind::DPlus => {
413                let mut buf = [0u8; 16];
414                let n = dplus::encode_unlink(&mut buf)
415                    .map_err(dplus::DPlusError::from)
416                    .map_err(ProtocolError::DPlus)?;
417                OutboundPacket {
418                    dst: self.peer,
419                    payload: buf.get(..n).unwrap_or(&[]).to_vec(),
420                    not_before: now,
421                }
422            }
423            ProtocolKind::DExtra => {
424                let mut buf = [0u8; 16];
425                let n = dextra::encode_unlink(&mut buf, &self.callsign, self.local_module)
426                    .map_err(dextra::DExtraError::from)
427                    .map_err(ProtocolError::DExtra)?;
428                OutboundPacket {
429                    dst: self.peer,
430                    payload: buf.get(..n).unwrap_or(&[]).to_vec(),
431                    not_before: now,
432                }
433            }
434            ProtocolKind::Dcs => {
435                let mut buf = [0u8; 32];
436                let reflector_callsign = self.dcs_reflector_callsign();
437                let n = dcs::encode_connect_unlink(
438                    &mut buf,
439                    &self.callsign,
440                    self.local_module,
441                    &reflector_callsign,
442                )
443                .map_err(DcsError::from)
444                .map_err(ProtocolError::Dcs)?;
445                OutboundPacket {
446                    dst: self.peer,
447                    payload: buf.get(..n).unwrap_or(&[]).to_vec(),
448                    not_before: now,
449                }
450            }
451        };
452        self.outbox.enqueue(packet);
453        self.state = ClientStateKind::Disconnecting;
454        self.arm_disconnect_deadline(now);
455        Ok(())
456    }
457
458    // ── Voice TX ──────────────────────────────────────────────
459
460    /// Enqueue a voice header packet for transmission.
461    ///
462    /// Caches the header internally so subsequent
463    /// [`Self::enqueue_send_voice`] and [`Self::enqueue_send_eot`]
464    /// calls can use it (required by DCS, which embeds the full
465    /// header in every voice frame).
466    ///
467    /// For DCS, the protocol does NOT have a separate header packet
468    /// — the first frame (seq=0) carries the embedded header. This
469    /// method emits a synthetic silence frame at seq=0 to start the
470    /// stream and matches the legacy
471    /// [`crate`]-internal behavior.
472    ///
473    /// # Errors
474    ///
475    /// Returns [`Error::State`] with [`StateError::WrongState`] if
476    /// the session is not in [`ClientStateKind::Connected`]. The
477    /// typestate wrapper prevents this at compile time, but the runtime
478    /// check is the residual safety net for direct [`SessionCore`] users.
479    ///
480    /// Returns [`Error::Protocol`] if the codec encoder fails
481    /// (buffer too small, etc.).
482    pub fn enqueue_send_header(
483        &mut self,
484        now: Instant,
485        header: &DStarHeader,
486        stream_id: StreamId,
487    ) -> Result<(), Error> {
488        if self.state != ClientStateKind::Connected {
489            return Err(Error::State(StateError::WrongState {
490                operation: "enqueue_send_header",
491                state: self.state,
492                protocol: self.kind,
493            }));
494        }
495        self.cached_tx_header = Some(*header);
496        let mut buf = [0u8; 256];
497        let n = match self.kind {
498            ProtocolKind::DPlus => dplus::encode_voice_header(&mut buf, stream_id, header)
499                .map_err(dplus::DPlusError::from)
500                .map_err(ProtocolError::DPlus)?,
501            ProtocolKind::DExtra => dextra::encode_voice_header(&mut buf, stream_id, header)
502                .map_err(dextra::DExtraError::from)
503                .map_err(ProtocolError::DExtra)?,
504            ProtocolKind::Dcs => {
505                // DCS has no separate header packet — the first frame
506                // (seq=0) carries the embedded header. Emit a silence
507                // frame at seq=0 to start the stream.
508                let silence = VoiceFrame::silence();
509                dcs::encode_voice(&mut buf, header, stream_id, 0, &silence, false)
510                    .map_err(DcsError::from)
511                    .map_err(ProtocolError::Dcs)?
512            }
513        };
514        self.outbox.enqueue(OutboundPacket {
515            dst: self.peer,
516            payload: buf.get(..n).unwrap_or(&[]).to_vec(),
517            not_before: now,
518        });
519        Ok(())
520    }
521
522    /// Enqueue a voice data frame for transmission.
523    ///
524    /// On DCS, the cached header from [`Self::enqueue_send_header`]
525    /// is required — DCS embeds the full header in every voice frame.
526    /// On `DPlus` and `DExtra`, the cache is consulted but not
527    /// strictly required for voice data.
528    ///
529    /// # Errors
530    ///
531    /// Returns [`Error::State`] with [`StateError::WrongState`] if
532    /// the session is not in [`ClientStateKind::Connected`].
533    ///
534    /// Returns [`Error::Protocol`] with
535    /// [`ProtocolError::Dcs`]([`crate::error::DcsError::NoTxHeader`])
536    /// if called on a DCS session before [`Self::enqueue_send_header`]
537    /// has cached a TX header.
538    ///
539    /// Returns [`Error::Protocol`] if the codec encoder fails.
540    pub fn enqueue_send_voice(
541        &mut self,
542        now: Instant,
543        stream_id: StreamId,
544        seq: u8,
545        frame: &VoiceFrame,
546    ) -> Result<(), Error> {
547        if self.state != ClientStateKind::Connected {
548            return Err(Error::State(StateError::WrongState {
549                operation: "enqueue_send_voice",
550                state: self.state,
551                protocol: self.kind,
552            }));
553        }
554        let mut buf = [0u8; 256];
555        let n = match self.kind {
556            ProtocolKind::DPlus => dplus::encode_voice_data(&mut buf, stream_id, seq, frame)
557                .map_err(dplus::DPlusError::from)
558                .map_err(ProtocolError::DPlus)?,
559            ProtocolKind::DExtra => dextra::encode_voice_data(&mut buf, stream_id, seq, frame)
560                .map_err(dextra::DExtraError::from)
561                .map_err(ProtocolError::DExtra)?,
562            ProtocolKind::Dcs => {
563                let header = self
564                    .cached_tx_header
565                    .as_ref()
566                    .ok_or(Error::Protocol(ProtocolError::Dcs(DcsError::NoTxHeader)))?;
567                dcs::encode_voice(&mut buf, header, stream_id, seq, frame, false)
568                    .map_err(DcsError::from)
569                    .map_err(ProtocolError::Dcs)?
570            }
571        };
572        self.outbox.enqueue(OutboundPacket {
573            dst: self.peer,
574            payload: buf.get(..n).unwrap_or(&[]).to_vec(),
575            not_before: now,
576        });
577        Ok(())
578    }
579
580    /// Enqueue a voice EOT packet for transmission.
581    ///
582    /// On DCS, the cached header from [`Self::enqueue_send_header`]
583    /// is required — DCS embeds the full header in every voice frame
584    /// (including the EOT). On `DPlus` and `DExtra`, the cache is not
585    /// consulted.
586    ///
587    /// # Errors
588    ///
589    /// Returns [`Error::State`] with [`StateError::WrongState`] if
590    /// the session is not in [`ClientStateKind::Connected`].
591    ///
592    /// Returns [`Error::Protocol`] with
593    /// [`ProtocolError::Dcs`]([`crate::error::DcsError::NoTxHeader`])
594    /// if called on a DCS session before [`Self::enqueue_send_header`]
595    /// has cached a TX header.
596    ///
597    /// Returns [`Error::Protocol`] if the codec encoder fails.
598    pub fn enqueue_send_eot(
599        &mut self,
600        now: Instant,
601        stream_id: StreamId,
602        seq: u8,
603    ) -> Result<(), Error> {
604        if self.state != ClientStateKind::Connected {
605            return Err(Error::State(StateError::WrongState {
606                operation: "enqueue_send_eot",
607                state: self.state,
608                protocol: self.kind,
609            }));
610        }
611        let mut buf = [0u8; 256];
612        let n = match self.kind {
613            ProtocolKind::DPlus => dplus::encode_voice_eot(&mut buf, stream_id, seq)
614                .map_err(dplus::DPlusError::from)
615                .map_err(ProtocolError::DPlus)?,
616            ProtocolKind::DExtra => dextra::encode_voice_eot(&mut buf, stream_id, seq)
617                .map_err(dextra::DExtraError::from)
618                .map_err(ProtocolError::DExtra)?,
619            ProtocolKind::Dcs => {
620                let header = self
621                    .cached_tx_header
622                    .as_ref()
623                    .ok_or(Error::Protocol(ProtocolError::Dcs(DcsError::NoTxHeader)))?;
624                let silence = VoiceFrame::silence();
625                dcs::encode_voice(&mut buf, header, stream_id, seq, &silence, true)
626                    .map_err(DcsError::from)
627                    .map_err(ProtocolError::Dcs)?
628            }
629        };
630        self.outbox.enqueue(OutboundPacket {
631            dst: self.peer,
632            payload: buf.get(..n).unwrap_or(&[]).to_vec(),
633            not_before: now,
634        });
635        Ok(())
636    }
637
638    // ── Input dispatch ────────────────────────────────────────
639
640    /// Feed an inbound UDP datagram.
641    ///
642    /// Parses the bytes, updates state, pushes events and outbound
643    /// packets as needed. Protocol-erased dispatch: matches on
644    /// [`Self::protocol_kind`] and calls the appropriate codec.
645    ///
646    /// The `peer` argument is the source address of the datagram.
647    /// The typestate wrapper filters out datagrams whose source does
648    /// not match the expected reflector; the core accepts whatever
649    /// the shell passes it.
650    ///
651    /// # Errors
652    ///
653    /// Returns [`Error::Protocol`] wrapping the codec error if the
654    /// datagram cannot be parsed.
655    pub fn handle_input(
656        &mut self,
657        now: Instant,
658        peer: SocketAddr,
659        bytes: &[u8],
660    ) -> Result<(), Error> {
661        match self.kind {
662            ProtocolKind::DPlus => self.handle_dplus_input(now, peer, bytes),
663            ProtocolKind::DExtra => self.handle_dextra_input(now, peer, bytes),
664            ProtocolKind::Dcs => self.handle_dcs_input(now, peer, bytes),
665        }
666    }
667
668    fn handle_dplus_input(
669        &mut self,
670        now: Instant,
671        peer: SocketAddr,
672        bytes: &[u8],
673    ) -> Result<(), Error> {
674        // Lenient decode: unknown-length, magic-missing, or otherwise
675        // unparseable datagrams must NOT tear down an active session.
676        // Real DPlus reflectors emit unrecognized traffic (status
677        // heartbeats, variable-length control, legacy framing) and
678        // any of those would previously propagate through `?` and
679        // kill the tokio shell's run loop. Record a diagnostic and
680        // keep going.
681        let pkt = match dplus::decode_server_to_client(bytes, &mut self.diagnostics) {
682            Ok(pkt) => pkt,
683            Err(e) => {
684                tracing::debug!(
685                    target: "dstar_gateway_core::codec",
686                    error = %e,
687                    peer = %peer,
688                    bytes_len = bytes.len(),
689                    "DPlus decoder rejected datagram; dropping (session stays alive)"
690                );
691                return Ok(());
692            }
693        };
694        match pkt {
695            dplus::ServerPacket::Link1Ack => {
696                if self.state == ClientStateKind::Connecting {
697                    // First half of the two-step DPlus handshake —
698                    // reply with LINK2 immediately.
699                    let mut buf = [0u8; 32];
700                    let n = dplus::encode_link2(&mut buf, &self.callsign)
701                        .map_err(dplus::DPlusError::from)
702                        .map_err(ProtocolError::DPlus)?;
703                    self.outbox.enqueue(OutboundPacket {
704                        dst: self.peer,
705                        payload: buf.get(..n).unwrap_or(&[]).to_vec(),
706                        not_before: now,
707                    });
708                    self.arm_keepalive_inactivity(now);
709                } else if self.state == ClientStateKind::Disconnecting {
710                    // DPlus servers echo the 5-byte packet on unlink too.
711                    self.finalize_disconnect(DisconnectReason::UnlinkAcked);
712                }
713                Ok(())
714            }
715            dplus::ServerPacket::Link2Reply { result } => {
716                if self.state == ClientStateKind::Connecting {
717                    match result {
718                        dplus::Link2Result::Accept => {
719                            self.finalize_connected(peer, now);
720                        }
721                        dplus::Link2Result::Busy | dplus::Link2Result::Unknown { .. } => {
722                            self.finalize_rejected();
723                        }
724                    }
725                }
726                Ok(())
727            }
728            dplus::ServerPacket::Link2Echo { .. } => {
729                // Some DPlus servers echo the full 28-byte LINK2 instead
730                // of OKRW. Treat it as an accept.
731                if self.state == ClientStateKind::Connecting {
732                    self.finalize_connected(peer, now);
733                }
734                Ok(())
735            }
736            dplus::ServerPacket::UnlinkAck => {
737                if self.state == ClientStateKind::Disconnecting {
738                    self.finalize_disconnect(DisconnectReason::UnlinkAcked);
739                }
740                Ok(())
741            }
742            dplus::ServerPacket::PollEcho => {
743                self.arm_keepalive_inactivity(now);
744                self.events.push_back(RawEvent::PollEcho { peer });
745                Ok(())
746            }
747            dplus::ServerPacket::VoiceHeader { stream_id, header } => {
748                self.arm_keepalive_inactivity(now);
749                self.events
750                    .push_back(RawEvent::VoiceStart { stream_id, header });
751                Ok(())
752            }
753            dplus::ServerPacket::VoiceData {
754                stream_id,
755                seq,
756                frame,
757            } => {
758                self.arm_keepalive_inactivity(now);
759                self.events.push_back(RawEvent::VoiceFrame {
760                    stream_id,
761                    seq,
762                    frame,
763                });
764                Ok(())
765            }
766            dplus::ServerPacket::VoiceEot { stream_id, seq: _ } => {
767                self.arm_keepalive_inactivity(now);
768                self.events.push_back(RawEvent::VoiceEnd {
769                    stream_id,
770                    reason: VoiceEndReason::Eot,
771                });
772                Ok(())
773            }
774        }
775    }
776
777    #[expect(
778        clippy::unnecessary_wraps,
779        reason = "uniform signature with handle_dplus_input / handle_dcs_input; top-level dispatcher returns Result<(), Error>"
780    )]
781    fn handle_dextra_input(
782        &mut self,
783        now: Instant,
784        peer: SocketAddr,
785        bytes: &[u8],
786    ) -> Result<(), Error> {
787        // Lenient decode: see `handle_dplus_input` for the rationale.
788        // An unknown datagram must not kill an established session.
789        let pkt = match dextra::decode_server_to_client(bytes, &mut self.diagnostics) {
790            Ok(pkt) => pkt,
791            Err(e) => {
792                tracing::debug!(
793                    target: "dstar_gateway_core::codec",
794                    error = %e,
795                    peer = %peer,
796                    bytes_len = bytes.len(),
797                    "DExtra decoder rejected datagram; dropping (session stays alive)"
798                );
799                return Ok(());
800            }
801        };
802        match pkt {
803            dextra::ServerPacket::ConnectAck { .. } => {
804                if self.state == ClientStateKind::Connecting {
805                    self.finalize_connected(peer, now);
806                }
807                Ok(())
808            }
809            dextra::ServerPacket::ConnectNak { .. } => {
810                if self.state == ClientStateKind::Connecting {
811                    self.finalize_rejected();
812                } else if self.state == ClientStateKind::Disconnecting {
813                    // DExtra reflectors echo the unlink as a NAK on
814                    // module-space. Treat it as the unlink ACK.
815                    self.finalize_disconnect(DisconnectReason::UnlinkAcked);
816                }
817                Ok(())
818            }
819            dextra::ServerPacket::PollEcho { .. } => {
820                self.arm_keepalive_inactivity(now);
821                self.events.push_back(RawEvent::PollEcho { peer });
822                Ok(())
823            }
824            dextra::ServerPacket::VoiceHeader { stream_id, header } => {
825                self.arm_keepalive_inactivity(now);
826                self.events
827                    .push_back(RawEvent::VoiceStart { stream_id, header });
828                Ok(())
829            }
830            dextra::ServerPacket::VoiceData {
831                stream_id,
832                seq,
833                frame,
834            } => {
835                self.arm_keepalive_inactivity(now);
836                self.events.push_back(RawEvent::VoiceFrame {
837                    stream_id,
838                    seq,
839                    frame,
840                });
841                Ok(())
842            }
843            dextra::ServerPacket::VoiceEot { stream_id, seq: _ } => {
844                self.arm_keepalive_inactivity(now);
845                self.events.push_back(RawEvent::VoiceEnd {
846                    stream_id,
847                    reason: VoiceEndReason::Eot,
848                });
849                Ok(())
850            }
851        }
852    }
853
854    #[expect(
855        clippy::unnecessary_wraps,
856        reason = "uniform signature with handle_dplus_input / handle_dextra_input; top-level dispatcher returns Result<(), Error>"
857    )]
858    fn handle_dcs_input(
859        &mut self,
860        now: Instant,
861        peer: SocketAddr,
862        bytes: &[u8],
863    ) -> Result<(), Error> {
864        // Lenient decode: see `handle_dplus_input` for the rationale.
865        // An unknown datagram must not kill an established session.
866        let pkt = match dcs::decode_server_to_client(bytes, &mut self.diagnostics) {
867            Ok(pkt) => pkt,
868            Err(e) => {
869                tracing::debug!(
870                    target: "dstar_gateway_core::codec",
871                    error = %e,
872                    peer = %peer,
873                    bytes_len = bytes.len(),
874                    "DCS decoder rejected datagram; dropping (session stays alive)"
875                );
876                return Ok(());
877            }
878        };
879        match pkt {
880            dcs::ServerPacket::ConnectAck { .. } => {
881                if self.state == ClientStateKind::Connecting {
882                    self.finalize_connected(peer, now);
883                } else if self.state == ClientStateKind::Disconnecting {
884                    self.finalize_disconnect(DisconnectReason::UnlinkAcked);
885                }
886                Ok(())
887            }
888            dcs::ServerPacket::ConnectNak { .. } => {
889                if self.state == ClientStateKind::Connecting {
890                    self.finalize_rejected();
891                }
892                Ok(())
893            }
894            dcs::ServerPacket::PollEcho { .. } => {
895                self.arm_keepalive_inactivity(now);
896                self.events.push_back(RawEvent::PollEcho { peer });
897                Ok(())
898            }
899            dcs::ServerPacket::Voice {
900                header,
901                stream_id,
902                seq,
903                frame,
904                is_end,
905            } => {
906                self.arm_keepalive_inactivity(now);
907                if is_end {
908                    self.events.push_back(RawEvent::VoiceEnd {
909                        stream_id,
910                        reason: VoiceEndReason::Eot,
911                    });
912                } else if seq == 0 {
913                    // DCS doesn't have a separate header packet — the
914                    // first frame (seq=0) carries the embedded header.
915                    // Surface it as the stream-start event so consumers
916                    // see VoiceStart, then VoiceFrame, ..., VoiceEnd.
917                    self.events
918                        .push_back(RawEvent::VoiceStart { stream_id, header });
919                } else {
920                    self.events.push_back(RawEvent::VoiceFrame {
921                        stream_id,
922                        seq,
923                        frame,
924                    });
925                }
926                Ok(())
927            }
928        }
929    }
930
931    // ── Timer handling ────────────────────────────────────────
932
933    /// Advance the session timers using `now` as the current
934    /// instant.
935    ///
936    /// Checks each named timer:
937    ///
938    /// - `keepalive` expired → enqueue poll packet, re-arm
939    /// - `keepalive_inactivity` expired → transition to Closed,
940    ///   emit [`Event::Disconnected`] with reason
941    ///   [`DisconnectReason::KeepaliveInactivity`]
942    /// - `disconnect_deadline` expired (in Disconnecting) →
943    ///   transition to Closed, emit [`Event::Disconnected`] with
944    ///   reason [`DisconnectReason::DisconnectTimeout`]
945    pub fn handle_timeout(&mut self, now: Instant) {
946        if self.state == ClientStateKind::Connected && self.timers.is_expired(TIMER_KEEPALIVE, now)
947        {
948            self.enqueue_poll(now);
949            self.timers
950                .set(TIMER_KEEPALIVE, now + self.keepalive_interval());
951        }
952
953        if (self.state == ClientStateKind::Connecting || self.state == ClientStateKind::Connected)
954            && self.timers.is_expired(TIMER_KEEPALIVE_INACTIVITY, now)
955        {
956            self.finalize_disconnect(DisconnectReason::KeepaliveInactivity);
957            return;
958        }
959
960        if self.state == ClientStateKind::Disconnecting
961            && self.timers.is_expired(TIMER_DISCONNECT_DEADLINE, now)
962        {
963            self.finalize_disconnect(DisconnectReason::DisconnectTimeout);
964        }
965    }
966
967    // ── Poll / event drain ────────────────────────────────────
968
969    /// Pop the next outbound datagram, if any.
970    ///
971    /// The returned [`Transmit`] borrows from `self.current_tx`,
972    /// which holds the most-recently-popped packet until the next
973    /// call replaces it. Callers must consume the borrow before
974    /// calling this method again.
975    #[must_use]
976    pub fn pop_transmit(&mut self, now: Instant) -> Option<Transmit<'_>> {
977        let next = self.outbox.pop_ready(now)?;
978        self.current_tx = Some(next);
979        let held = self.current_tx.as_ref()?;
980        Some(Transmit {
981            dst: held.dst,
982            payload: held.payload.as_slice(),
983        })
984    }
985
986    /// Pop the next consumer-visible event.
987    ///
988    /// The `P` type parameter re-attaches the protocol phantom at
989    /// drain time — the event queue itself is protocol-erased.
990    pub fn pop_event<P: Protocol>(&mut self) -> Option<Event<P>> {
991        let raw = self.events.pop_front()?;
992        Some(match raw {
993            RawEvent::Connected { peer } => Event::Connected { peer },
994            RawEvent::Disconnected { reason } => Event::Disconnected { reason },
995            RawEvent::PollEcho { peer } => Event::PollEcho { peer },
996            RawEvent::VoiceStart { stream_id, header } => Event::VoiceStart {
997                stream_id,
998                header,
999                // Per-event diagnostics are not populated here;
1000                // consumers drain them via `Session::diagnostics()`.
1001                diagnostics: Vec::new(),
1002            },
1003            RawEvent::VoiceFrame {
1004                stream_id,
1005                seq,
1006                frame,
1007            } => Event::VoiceFrame {
1008                stream_id,
1009                seq,
1010                frame,
1011            },
1012            RawEvent::VoiceEnd { stream_id, reason } => Event::VoiceEnd { stream_id, reason },
1013        })
1014    }
1015
1016    /// Earliest instant at which this core wants to be woken up.
1017    ///
1018    /// Combines the outbox's next release instant with the timer
1019    /// wheel's next deadline.
1020    #[must_use]
1021    pub fn next_deadline(&self) -> Option<Instant> {
1022        match (
1023            self.outbox.peek_next_deadline(),
1024            self.timers.next_deadline(),
1025        ) {
1026            (None, None) => None,
1027            (Some(o), None) => Some(o),
1028            (None, Some(t)) => Some(t),
1029            (Some(o), Some(t)) => Some(o.min(t)),
1030        }
1031    }
1032
1033    // ── Internal helpers ──────────────────────────────────────
1034
1035    /// Enqueue the protocol-appropriate keepalive poll packet.
1036    ///
1037    /// Encoder failures are swallowed — the scratch buffers in this
1038    /// method are always big enough for the smallest packet in each
1039    /// protocol, so the error path is unreachable in practice. A
1040    /// failure would simply mean no poll is sent this tick and the
1041    /// next timer expiry will try again.
1042    fn enqueue_poll(&mut self, now: Instant) {
1043        let encoded: Option<Vec<u8>> = match self.kind {
1044            ProtocolKind::DPlus => {
1045                let mut buf = [0u8; 8];
1046                dplus::encode_poll(&mut buf)
1047                    .ok()
1048                    .and_then(|n| buf.get(..n).map(<[u8]>::to_vec))
1049            }
1050            ProtocolKind::DExtra => {
1051                let mut buf = [0u8; 16];
1052                dextra::encode_poll(&mut buf, &self.callsign)
1053                    .ok()
1054                    .and_then(|n| buf.get(..n).map(<[u8]>::to_vec))
1055            }
1056            ProtocolKind::Dcs => {
1057                let mut buf = [0u8; 32];
1058                let reflector_callsign = self.dcs_reflector_callsign();
1059                dcs::encode_poll_request(&mut buf, &self.callsign, &reflector_callsign)
1060                    .ok()
1061                    .and_then(|n| buf.get(..n).map(<[u8]>::to_vec))
1062            }
1063        };
1064        if let Some(payload) = encoded {
1065            self.outbox.enqueue(OutboundPacket {
1066                dst: self.peer,
1067                payload,
1068                not_before: now,
1069            });
1070        }
1071    }
1072
1073    /// Transition to Connected, arm keepalive timers, emit event.
1074    fn finalize_connected(&mut self, peer: SocketAddr, now: Instant) {
1075        self.state = ClientStateKind::Connected;
1076        self.timers
1077            .set(TIMER_KEEPALIVE, now + self.keepalive_interval());
1078        self.arm_keepalive_inactivity(now);
1079        self.events.push_back(RawEvent::Connected { peer });
1080    }
1081
1082    /// Transition to Closed with `Rejected`, emit event.
1083    fn finalize_rejected(&mut self) {
1084        self.state = ClientStateKind::Closed;
1085        self.clear_timers();
1086        self.events.push_back(RawEvent::Disconnected {
1087            reason: DisconnectReason::Rejected,
1088        });
1089    }
1090
1091    /// Transition to Closed with the given reason, emit event.
1092    fn finalize_disconnect(&mut self, reason: DisconnectReason) {
1093        self.state = ClientStateKind::Closed;
1094        self.clear_timers();
1095        self.events.push_back(RawEvent::Disconnected { reason });
1096    }
1097
1098    fn clear_timers(&mut self) {
1099        self.timers.clear(TIMER_KEEPALIVE);
1100        self.timers.clear(TIMER_KEEPALIVE_INACTIVITY);
1101        self.timers.clear(TIMER_DISCONNECT_DEADLINE);
1102    }
1103
1104    fn arm_keepalive_inactivity(&mut self, now: Instant) {
1105        self.timers.set(
1106            TIMER_KEEPALIVE_INACTIVITY,
1107            now + self.keepalive_inactivity_timeout(),
1108        );
1109    }
1110
1111    fn arm_disconnect_deadline(&mut self, now: Instant) {
1112        self.timers
1113            .set(TIMER_DISCONNECT_DEADLINE, now + self.disconnect_timeout());
1114    }
1115
1116    const fn keepalive_interval(&self) -> Duration {
1117        match self.kind {
1118            ProtocolKind::DPlus => dplus::consts::KEEPALIVE_INTERVAL,
1119            ProtocolKind::DExtra => dextra::consts::KEEPALIVE_INTERVAL,
1120            ProtocolKind::Dcs => dcs::consts::KEEPALIVE_INTERVAL,
1121        }
1122    }
1123
1124    const fn keepalive_inactivity_timeout(&self) -> Duration {
1125        match self.kind {
1126            ProtocolKind::DPlus => dplus::consts::KEEPALIVE_INACTIVITY_TIMEOUT,
1127            ProtocolKind::DExtra => dextra::consts::KEEPALIVE_INACTIVITY_TIMEOUT,
1128            ProtocolKind::Dcs => dcs::consts::KEEPALIVE_INACTIVITY_TIMEOUT,
1129        }
1130    }
1131
1132    const fn disconnect_timeout(&self) -> Duration {
1133        match self.kind {
1134            ProtocolKind::DPlus => dplus::consts::DISCONNECT_TIMEOUT,
1135            ProtocolKind::DExtra => dextra::consts::DISCONNECT_TIMEOUT,
1136            ProtocolKind::Dcs => dcs::consts::DISCONNECT_TIMEOUT,
1137        }
1138    }
1139
1140    /// Return the stored reflector callsign for DCS codec paths.
1141    ///
1142    /// Returns the value supplied via
1143    /// [`Self::new_with_reflector_callsign`] when present, or a
1144    /// `DCS001  ` fallback when the caller did not supply one. The
1145    /// fallback is only correct for sessions targeting the DCS001
1146    /// reflector; for any other target the caller MUST supply the
1147    /// real reflector callsign via the builder or the DCS server
1148    /// will drop the LINK packet silently.
1149    fn dcs_reflector_callsign(&self) -> Callsign {
1150        self.reflector_callsign
1151            .unwrap_or_else(|| Callsign::from_wire_bytes(*b"DCS001  "))
1152    }
1153}
1154
1155#[cfg(test)]
1156mod tests {
1157    use super::*;
1158    use crate::codec::dplus::HostList;
1159    use crate::codec::{dcs as dcs_codec, dextra as dextra_codec, dplus as dplus_codec};
1160    use crate::session::client::protocol::{DExtra, DPlus, Dcs};
1161    use std::net::{IpAddr, Ipv4Addr};
1162
1163    const fn cs() -> Callsign {
1164        Callsign::from_wire_bytes(*b"W1AW    ")
1165    }
1166
1167    const ADDR_DEXTRA: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 30001);
1168    const ADDR_DPLUS: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 20001);
1169    const ADDR_DCS: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 30051);
1170
1171    fn new_dextra() -> SessionCore {
1172        SessionCore::new(
1173            ProtocolKind::DExtra,
1174            cs(),
1175            Module::B,
1176            Module::C,
1177            ADDR_DEXTRA,
1178        )
1179    }
1180
1181    fn new_dplus() -> SessionCore {
1182        SessionCore::new(ProtocolKind::DPlus, cs(), Module::B, Module::C, ADDR_DPLUS)
1183    }
1184
1185    fn new_dcs() -> SessionCore {
1186        SessionCore::new(ProtocolKind::Dcs, cs(), Module::B, Module::C, ADDR_DCS)
1187    }
1188
1189    type TestResult = Result<(), Box<dyn std::error::Error>>;
1190
1191    /// Drive a `DExtra` core through the full connect handshake,
1192    /// returning it in Connected state with the Connected event
1193    /// already drained.
1194    fn connected_dextra() -> Result<(SessionCore, Instant), Box<dyn std::error::Error>> {
1195        let t0 = Instant::now();
1196        let mut core = new_dextra();
1197        core.enqueue_connect(t0)?;
1198        let _ = core.pop_transmit(t0).ok_or("no link")?;
1199        let mut ack = [0u8; 16];
1200        let n = dextra_codec::encode_connect_ack(&mut ack, &cs(), Module::C)?;
1201        core.handle_input(t0, ADDR_DEXTRA, ack.get(..n).ok_or("n > buf")?)?;
1202        drop(core.pop_event::<DExtra>().ok_or("no connected event")?);
1203        Ok((core, t0))
1204    }
1205
1206    /// Drive a `DPlus` core through the full 2-step handshake.
1207    fn connected_dplus() -> Result<(SessionCore, Instant), Box<dyn std::error::Error>> {
1208        let t0 = Instant::now();
1209        let mut core = new_dplus();
1210        core.attach_host_list(HostList::new())?;
1211        core.enqueue_connect(t0)?;
1212        let _ = core.pop_transmit(t0).ok_or("no link1")?;
1213        let mut ack = [0u8; 8];
1214        let n = dplus_codec::encode_link1_ack(&mut ack)?;
1215        core.handle_input(t0, ADDR_DPLUS, ack.get(..n).ok_or("n > buf")?)?;
1216        let _ = core.pop_transmit(t0).ok_or("no link2")?;
1217        let mut reply = [0u8; 16];
1218        let n = dplus_codec::encode_link2_reply(&mut reply, dplus_codec::Link2Result::Accept)?;
1219        core.handle_input(t0, ADDR_DPLUS, reply.get(..n).ok_or("n > buf")?)?;
1220        drop(core.pop_event::<DPlus>().ok_or("no connected event")?);
1221        Ok((core, t0))
1222    }
1223
1224    /// Drive a DCS core through the connect handshake.
1225    fn connected_dcs() -> Result<(SessionCore, Instant), Box<dyn std::error::Error>> {
1226        let t0 = Instant::now();
1227        let mut core = new_dcs();
1228        core.enqueue_connect(t0)?;
1229        let _ = core.pop_transmit(t0).ok_or("no link")?;
1230        let mut ack = [0u8; 16];
1231        let n = dcs_codec::encode_connect_ack(
1232            &mut ack,
1233            &Callsign::from_wire_bytes(*b"DCS001  "),
1234            Module::C,
1235        )?;
1236        core.handle_input(t0, ADDR_DCS, ack.get(..n).ok_or("n > buf")?)?;
1237        drop(core.pop_event::<Dcs>().ok_or("no connected event")?);
1238        Ok((core, t0))
1239    }
1240
1241    // ── DExtra happy path ─────────────────────────────────────
1242
1243    #[test]
1244    fn dextra_connect_success() -> TestResult {
1245        let mut core = new_dextra();
1246        assert_eq!(core.state_kind(), ClientStateKind::Configured);
1247        let t0 = Instant::now();
1248        core.enqueue_connect(t0)?;
1249        assert_eq!(core.state_kind(), ClientStateKind::Connecting);
1250
1251        // Expect an 11-byte LINK packet in the outbox.
1252        let tx = core.pop_transmit(t0).ok_or("no link packet in outbox")?;
1253        assert_eq!(tx.payload.len(), 11);
1254        assert_eq!(tx.dst, ADDR_DEXTRA);
1255
1256        // Build the server-side ACK using the codec.
1257        let mut ack = [0u8; 16];
1258        let n = dextra_codec::encode_connect_ack(&mut ack, &cs(), Module::C)?;
1259        core.handle_input(
1260            t0,
1261            ADDR_DEXTRA,
1262            ack.get(..n).ok_or("encode returned n > buf")?,
1263        )?;
1264        assert_eq!(core.state_kind(), ClientStateKind::Connected);
1265
1266        // Should have emitted a Connected event.
1267        let ev = core.pop_event::<DExtra>().ok_or("no Connected event")?;
1268        assert!(matches!(ev, Event::Connected { .. }));
1269        Ok(())
1270    }
1271
1272    #[test]
1273    fn dextra_connect_rejected() -> Result<(), Box<dyn std::error::Error>> {
1274        let mut core = new_dextra();
1275        let t0 = Instant::now();
1276        core.enqueue_connect(t0)?;
1277        let _ = core.pop_transmit(t0).ok_or("no link packet")?;
1278
1279        let mut nak = [0u8; 16];
1280        let n = dextra_codec::encode_connect_nak(&mut nak, &cs(), Module::C)?;
1281        core.handle_input(t0, ADDR_DEXTRA, nak.get(..n).ok_or("n > buf")?)?;
1282        assert_eq!(core.state_kind(), ClientStateKind::Closed);
1283
1284        let ev = core.pop_event::<DExtra>().ok_or("no event")?;
1285        assert!(
1286            matches!(
1287                ev,
1288                Event::Disconnected {
1289                    reason: DisconnectReason::Rejected
1290                }
1291            ),
1292            "expected Disconnected(Rejected), got {ev:?}"
1293        );
1294        Ok(())
1295    }
1296
1297    #[test]
1298    fn dextra_keepalive_fires_poll() -> TestResult {
1299        let (mut core, t0) = connected_dextra()?;
1300
1301        let t1 = t0 + dextra_codec::consts::KEEPALIVE_INTERVAL + Duration::from_millis(1);
1302        core.handle_timeout(t1);
1303
1304        let tx = core.pop_transmit(t1).ok_or("no poll packet")?;
1305        assert_eq!(tx.payload.len(), 9);
1306        Ok(())
1307    }
1308
1309    #[test]
1310    fn dextra_keepalive_inactivity_closes() -> TestResult {
1311        let (mut core, t0) = connected_dextra()?;
1312
1313        let t1 = t0 + dextra_codec::consts::KEEPALIVE_INACTIVITY_TIMEOUT + Duration::from_secs(1);
1314        core.handle_timeout(t1);
1315        assert_eq!(core.state_kind(), ClientStateKind::Closed);
1316
1317        let ev = core.pop_event::<DExtra>().ok_or("no event")?;
1318        assert!(
1319            matches!(
1320                ev,
1321                Event::Disconnected {
1322                    reason: DisconnectReason::KeepaliveInactivity
1323                }
1324            ),
1325            "expected Disconnected(KeepaliveInactivity), got {ev:?}"
1326        );
1327        Ok(())
1328    }
1329
1330    #[test]
1331    fn dextra_disconnect_success() -> TestResult {
1332        let (mut core, t0) = connected_dextra()?;
1333
1334        core.enqueue_disconnect(t0)?;
1335        assert_eq!(core.state_kind(), ClientStateKind::Disconnecting);
1336
1337        let tx = core.pop_transmit(t0).ok_or("no unlink packet")?;
1338        assert_eq!(tx.payload.len(), 11);
1339
1340        // DExtra echoes the unlink as a NAK on space module.
1341        let mut nak = [0u8; 16];
1342        let n = dextra_codec::encode_connect_nak(&mut nak, &cs(), Module::C)?;
1343        core.handle_input(t0, ADDR_DEXTRA, nak.get(..n).ok_or("n > buf")?)?;
1344        assert_eq!(core.state_kind(), ClientStateKind::Closed);
1345
1346        let ev = core.pop_event::<DExtra>().ok_or("no event")?;
1347        assert!(
1348            matches!(
1349                ev,
1350                Event::Disconnected {
1351                    reason: DisconnectReason::UnlinkAcked
1352                }
1353            ),
1354            "expected Disconnected(UnlinkAcked), got {ev:?}"
1355        );
1356        Ok(())
1357    }
1358
1359    // ── DPlus happy path (two-step handshake) ─────────────────
1360
1361    #[test]
1362    fn dplus_two_step_connect_success() -> TestResult {
1363        let mut core = new_dplus();
1364        assert_eq!(core.state_kind(), ClientStateKind::Configured);
1365
1366        core.attach_host_list(HostList::new())?;
1367        assert_eq!(core.state_kind(), ClientStateKind::Authenticated);
1368        assert!(core.host_list().is_some());
1369
1370        let t0 = Instant::now();
1371        core.enqueue_connect(t0)?;
1372        assert_eq!(core.state_kind(), ClientStateKind::Connecting);
1373
1374        let tx = core.pop_transmit(t0).ok_or("no link1")?;
1375        assert_eq!(tx.payload.len(), 5);
1376
1377        let mut ack = [0u8; 8];
1378        let n = dplus_codec::encode_link1_ack(&mut ack)?;
1379        core.handle_input(t0, ADDR_DPLUS, ack.get(..n).ok_or("n > buf")?)?;
1380        assert_eq!(
1381            core.state_kind(),
1382            ClientStateKind::Connecting,
1383            "still connecting after LINK1 ACK"
1384        );
1385
1386        let tx = core.pop_transmit(t0).ok_or("no link2")?;
1387        assert_eq!(tx.payload.len(), 28);
1388
1389        let mut reply = [0u8; 16];
1390        let n = dplus_codec::encode_link2_reply(&mut reply, dplus_codec::Link2Result::Accept)?;
1391        core.handle_input(t0, ADDR_DPLUS, reply.get(..n).ok_or("n > buf")?)?;
1392        assert_eq!(core.state_kind(), ClientStateKind::Connected);
1393
1394        let ev = core.pop_event::<DPlus>().ok_or("no event")?;
1395        assert!(matches!(ev, Event::Connected { .. }));
1396        Ok(())
1397    }
1398
1399    #[test]
1400    fn dplus_rejected_on_busy() -> TestResult {
1401        let mut core = new_dplus();
1402        core.attach_host_list(HostList::new())?;
1403        let t0 = Instant::now();
1404        core.enqueue_connect(t0)?;
1405        let _ = core.pop_transmit(t0).ok_or("no link1")?;
1406
1407        let mut ack = [0u8; 8];
1408        let n = dplus_codec::encode_link1_ack(&mut ack)?;
1409        core.handle_input(t0, ADDR_DPLUS, ack.get(..n).ok_or("n > buf")?)?;
1410        let _ = core.pop_transmit(t0).ok_or("no link2")?;
1411
1412        let mut reply = [0u8; 16];
1413        let n = dplus_codec::encode_link2_reply(&mut reply, dplus_codec::Link2Result::Busy)?;
1414        core.handle_input(t0, ADDR_DPLUS, reply.get(..n).ok_or("n > buf")?)?;
1415        assert_eq!(core.state_kind(), ClientStateKind::Closed);
1416
1417        let ev = core.pop_event::<DPlus>().ok_or("no event")?;
1418        assert!(
1419            matches!(
1420                ev,
1421                Event::Disconnected {
1422                    reason: DisconnectReason::Rejected
1423                }
1424            ),
1425            "expected Disconnected(Rejected), got {ev:?}"
1426        );
1427        Ok(())
1428    }
1429
1430    // ── DCS ───────────────────────────────────────────────────
1431
1432    #[test]
1433    fn dcs_connect_success() -> TestResult {
1434        let mut core = new_dcs();
1435        let t0 = Instant::now();
1436        core.enqueue_connect(t0)?;
1437
1438        let tx = core.pop_transmit(t0).ok_or("no link packet")?;
1439        assert_eq!(tx.payload.len(), 519);
1440
1441        let mut ack = [0u8; 16];
1442        let n = dcs_codec::encode_connect_ack(
1443            &mut ack,
1444            &Callsign::from_wire_bytes(*b"DCS001  "),
1445            Module::C,
1446        )?;
1447        core.handle_input(t0, ADDR_DCS, ack.get(..n).ok_or("n > buf")?)?;
1448        assert_eq!(core.state_kind(), ClientStateKind::Connected);
1449
1450        let ev = core.pop_event::<Dcs>().ok_or("no event")?;
1451        assert!(matches!(ev, Event::Connected { .. }));
1452        Ok(())
1453    }
1454
1455    #[test]
1456    fn dcs_rejected_on_nak() -> TestResult {
1457        let mut core = new_dcs();
1458        let t0 = Instant::now();
1459        core.enqueue_connect(t0)?;
1460        let _ = core.pop_transmit(t0).ok_or("no link")?;
1461
1462        let mut nak = [0u8; 16];
1463        let n = dcs_codec::encode_connect_nak(
1464            &mut nak,
1465            &Callsign::from_wire_bytes(*b"DCS001  "),
1466            Module::C,
1467        )?;
1468        core.handle_input(t0, ADDR_DCS, nak.get(..n).ok_or("n > buf")?)?;
1469        assert_eq!(core.state_kind(), ClientStateKind::Closed);
1470
1471        let ev = core.pop_event::<Dcs>().ok_or("no event")?;
1472        assert!(
1473            matches!(
1474                ev,
1475                Event::Disconnected {
1476                    reason: DisconnectReason::Rejected
1477                }
1478            ),
1479            "expected Disconnected(Rejected), got {ev:?}"
1480        );
1481        Ok(())
1482    }
1483
1484    // ── pop_transmit / next_deadline ──────────────────────────
1485
1486    #[test]
1487    fn pop_transmit_holds_current_tx_across_calls() -> TestResult {
1488        let mut core = new_dextra();
1489        let t0 = Instant::now();
1490
1491        core.outbox.enqueue(OutboundPacket {
1492            dst: ADDR_DEXTRA,
1493            payload: vec![1, 2, 3],
1494            not_before: t0,
1495        });
1496        core.outbox.enqueue(OutboundPacket {
1497            dst: ADDR_DEXTRA,
1498            payload: vec![4, 5, 6],
1499            not_before: t0 + Duration::from_millis(1),
1500        });
1501
1502        {
1503            let tx = core
1504                .pop_transmit(t0 + Duration::from_secs(1))
1505                .ok_or("no tx1")?;
1506            assert_eq!(tx.payload, &[1, 2, 3]);
1507        }
1508        {
1509            let tx = core
1510                .pop_transmit(t0 + Duration::from_secs(1))
1511                .ok_or("no tx2")?;
1512            assert_eq!(tx.payload, &[4, 5, 6]);
1513        }
1514        assert!(core.pop_transmit(t0 + Duration::from_secs(1)).is_none());
1515        Ok(())
1516    }
1517
1518    #[test]
1519    fn next_deadline_combines_sources() -> TestResult {
1520        let mut core = new_dextra();
1521        let t0 = Instant::now();
1522        core.timers
1523            .set(TIMER_KEEPALIVE, t0 + Duration::from_secs(5));
1524        core.outbox.enqueue(OutboundPacket {
1525            dst: ADDR_DEXTRA,
1526            payload: vec![],
1527            not_before: t0 + Duration::from_secs(2),
1528        });
1529
1530        let dl = core.next_deadline().ok_or("no deadline")?;
1531        assert_eq!(dl, t0 + Duration::from_secs(2));
1532        Ok(())
1533    }
1534
1535    #[test]
1536    fn next_deadline_none_when_idle() {
1537        let core = new_dextra();
1538        assert!(core.next_deadline().is_none());
1539    }
1540
1541    // ── drain_diagnostics ────────────────────────────────────
1542
1543    #[test]
1544    fn drain_diagnostics_is_empty_on_fresh_core() {
1545        let mut core = new_dextra();
1546        assert!(core.drain_diagnostics().is_empty());
1547    }
1548
1549    // ── Error paths ───────────────────────────────────────────
1550
1551    #[test]
1552    fn attach_host_list_rejects_non_dplus() {
1553        let mut core = new_dextra();
1554        let result = core.attach_host_list(HostList::new());
1555        assert!(
1556            matches!(result, Err(Error::State(StateError::WrongState { .. }))),
1557            "DExtra must reject host list, got {result:?}"
1558        );
1559    }
1560
1561    #[test]
1562    fn attach_host_list_rejects_wrong_state() -> TestResult {
1563        let mut core = new_dplus();
1564        core.attach_host_list(HostList::new())?;
1565        let result = core.attach_host_list(HostList::new());
1566        assert!(
1567            matches!(result, Err(Error::State(StateError::WrongState { .. }))),
1568            "second attach must fail, got {result:?}"
1569        );
1570        Ok(())
1571    }
1572
1573    #[test]
1574    fn event_queue_empty_returns_none() {
1575        let mut core = new_dextra();
1576        assert!(core.pop_event::<DExtra>().is_none());
1577    }
1578
1579    // ── Voice TX / RX ────────────────────────────────────────
1580
1581    use crate::error::DcsError;
1582    use crate::header::DStarHeader;
1583    use crate::types::{StreamId, Suffix};
1584    use crate::voice::VoiceFrame;
1585
1586    #[expect(clippy::unwrap_used, reason = "const-validated: n is non-zero")]
1587    const fn sid(n: u16) -> StreamId {
1588        // Option::unwrap is const since 1.83 — panics at compile
1589        // time on zero, never at runtime with a non-zero literal.
1590        StreamId::new(n).unwrap()
1591    }
1592
1593    const fn test_header() -> DStarHeader {
1594        DStarHeader {
1595            flag1: 0,
1596            flag2: 0,
1597            flag3: 0,
1598            rpt2: Callsign::from_wire_bytes(*b"XRF030 G"),
1599            rpt1: Callsign::from_wire_bytes(*b"XRF030 C"),
1600            ur_call: Callsign::from_wire_bytes(*b"CQCQCQ  "),
1601            my_call: Callsign::from_wire_bytes(*b"W1AW    "),
1602            my_suffix: Suffix::from_wire_bytes(*b"D75 "),
1603        }
1604    }
1605
1606    // ── Voice TX: header sizes ────────────────────────────────
1607
1608    #[test]
1609    fn dextra_connected_enqueue_send_header_produces_56_bytes() -> TestResult {
1610        let (mut core, _) = connected_dextra()?;
1611        let now = Instant::now();
1612        core.enqueue_send_header(now, &test_header(), sid(0x1234))?;
1613        let tx = core.pop_transmit(now).ok_or("no header tx")?;
1614        assert_eq!(tx.payload.len(), 56, "DExtra voice header is 56 bytes");
1615        Ok(())
1616    }
1617
1618    #[test]
1619    fn dplus_connected_enqueue_send_header_produces_58_bytes() -> TestResult {
1620        let (mut core, _) = connected_dplus()?;
1621        let now = Instant::now();
1622        core.enqueue_send_header(now, &test_header(), sid(0x1234))?;
1623        let tx = core.pop_transmit(now).ok_or("no header tx")?;
1624        assert_eq!(tx.payload.len(), 58, "DPlus voice header is 58 bytes");
1625        Ok(())
1626    }
1627
1628    #[test]
1629    fn dcs_connected_enqueue_send_header_produces_100_bytes() -> TestResult {
1630        let (mut core, _) = connected_dcs()?;
1631        let now = Instant::now();
1632        core.enqueue_send_header(now, &test_header(), sid(0x1234))?;
1633        let tx = core.pop_transmit(now).ok_or("no header tx")?;
1634        assert_eq!(tx.payload.len(), 100, "DCS voice frame is always 100 bytes");
1635        Ok(())
1636    }
1637
1638    // ── Voice TX: data frame sizes ────────────────────────────
1639
1640    #[test]
1641    fn dplus_connected_enqueue_send_voice_produces_29_bytes() -> TestResult {
1642        let (mut core, _) = connected_dplus()?;
1643        let now = Instant::now();
1644        core.enqueue_send_voice(now, sid(0x1234), 5, &VoiceFrame::silence())?;
1645        let tx = core.pop_transmit(now).ok_or("no voice tx")?;
1646        assert_eq!(tx.payload.len(), 29, "DPlus voice data is 29 bytes");
1647        Ok(())
1648    }
1649
1650    #[test]
1651    fn dextra_connected_enqueue_send_voice_produces_27_bytes() -> TestResult {
1652        let (mut core, _) = connected_dextra()?;
1653        let now = Instant::now();
1654        core.enqueue_send_voice(now, sid(0x1234), 5, &VoiceFrame::silence())?;
1655        let tx = core.pop_transmit(now).ok_or("no voice tx")?;
1656        assert_eq!(tx.payload.len(), 27, "DExtra voice data is 27 bytes");
1657        Ok(())
1658    }
1659
1660    // ── Voice TX: EOT sizes ───────────────────────────────────
1661
1662    #[test]
1663    fn dplus_connected_enqueue_send_eot_produces_32_bytes() -> TestResult {
1664        let (mut core, _) = connected_dplus()?;
1665        let now = Instant::now();
1666        core.enqueue_send_eot(now, sid(0x1234), 21)?;
1667        let tx = core.pop_transmit(now).ok_or("no eot tx")?;
1668        assert_eq!(tx.payload.len(), 32, "DPlus voice EOT is 32 bytes");
1669        Ok(())
1670    }
1671
1672    #[test]
1673    fn dextra_connected_enqueue_send_eot_produces_27_bytes() -> TestResult {
1674        let (mut core, _) = connected_dextra()?;
1675        let now = Instant::now();
1676        core.enqueue_send_eot(now, sid(0x1234), 21)?;
1677        let tx = core.pop_transmit(now).ok_or("no eot tx")?;
1678        assert_eq!(tx.payload.len(), 27, "DExtra voice EOT is 27 bytes");
1679        Ok(())
1680    }
1681
1682    // ── DCS NoTxHeader error path ─────────────────────────────
1683
1684    #[test]
1685    fn dcs_send_voice_without_header_errors() -> TestResult {
1686        let (mut core, _) = connected_dcs()?;
1687        let now = Instant::now();
1688        let result = core.enqueue_send_voice(now, sid(0x1234), 1, &VoiceFrame::silence());
1689        assert!(
1690            matches!(
1691                result,
1692                Err(Error::Protocol(ProtocolError::Dcs(DcsError::NoTxHeader)))
1693            ),
1694            "expected NoTxHeader, got {result:?}"
1695        );
1696        Ok(())
1697    }
1698
1699    #[test]
1700    fn dcs_send_eot_without_header_errors() -> TestResult {
1701        let (mut core, _) = connected_dcs()?;
1702        let now = Instant::now();
1703        let result = core.enqueue_send_eot(now, sid(0x1234), 1);
1704        assert!(
1705            matches!(
1706                result,
1707                Err(Error::Protocol(ProtocolError::Dcs(DcsError::NoTxHeader)))
1708            ),
1709            "expected NoTxHeader, got {result:?}"
1710        );
1711        Ok(())
1712    }
1713
1714    #[test]
1715    fn dcs_send_voice_after_header_succeeds() -> TestResult {
1716        let (mut core, _) = connected_dcs()?;
1717        let now = Instant::now();
1718        core.enqueue_send_header(now, &test_header(), sid(0x1234))?;
1719        let _ = core.pop_transmit(now).ok_or("no header tx")?;
1720        core.enqueue_send_voice(now, sid(0x1234), 1, &VoiceFrame::silence())?;
1721        let voice_tx = core.pop_transmit(now).ok_or("no voice tx")?;
1722        assert_eq!(voice_tx.payload.len(), 100);
1723        Ok(())
1724    }
1725
1726    #[test]
1727    fn dcs_send_eot_after_header_succeeds() -> TestResult {
1728        let (mut core, _) = connected_dcs()?;
1729        let now = Instant::now();
1730        core.enqueue_send_header(now, &test_header(), sid(0x1234))?;
1731        let _ = core.pop_transmit(now).ok_or("no header tx")?;
1732        core.enqueue_send_eot(now, sid(0x1234), 21)?;
1733        let eot_tx = core.pop_transmit(now).ok_or("no eot tx")?;
1734        assert_eq!(eot_tx.payload.len(), 100);
1735        Ok(())
1736    }
1737
1738    // ── Wrong-state error path ───────────────────────────────
1739
1740    #[test]
1741    fn enqueue_send_header_in_configured_state_errors() {
1742        let mut core = new_dextra();
1743        let now = Instant::now();
1744        let result = core.enqueue_send_header(now, &test_header(), sid(0x1234));
1745        assert!(matches!(
1746            result,
1747            Err(Error::State(StateError::WrongState { .. }))
1748        ));
1749    }
1750
1751    #[test]
1752    fn enqueue_send_voice_in_configured_state_errors() {
1753        let mut core = new_dextra();
1754        let now = Instant::now();
1755        let result = core.enqueue_send_voice(now, sid(0x1234), 1, &VoiceFrame::silence());
1756        assert!(matches!(
1757            result,
1758            Err(Error::State(StateError::WrongState { .. }))
1759        ));
1760    }
1761
1762    #[test]
1763    fn enqueue_send_eot_in_configured_state_errors() {
1764        let mut core = new_dextra();
1765        let now = Instant::now();
1766        let result = core.enqueue_send_eot(now, sid(0x1234), 1);
1767        assert!(matches!(
1768            result,
1769            Err(Error::State(StateError::WrongState { .. }))
1770        ));
1771    }
1772
1773    // ── Voice RX: handle_input emits voice events ────────────
1774
1775    #[test]
1776    fn dextra_handle_voice_header_emits_voice_start() -> TestResult {
1777        let (mut core, _) = connected_dextra()?;
1778        let now = Instant::now();
1779        let mut buf = [0u8; 64];
1780        let n = dextra_codec::encode_voice_header(&mut buf, sid(0x1234), &test_header())?;
1781        core.handle_input(now, ADDR_DEXTRA, buf.get(..n).ok_or("n > buf")?)?;
1782        let event = core.pop_event::<DExtra>().ok_or("no event")?;
1783        assert!(matches!(event, Event::VoiceStart { stream_id, .. } if stream_id.get() == 0x1234));
1784        Ok(())
1785    }
1786
1787    #[test]
1788    fn dextra_handle_voice_data_emits_voice_frame() -> TestResult {
1789        let (mut core, _) = connected_dextra()?;
1790        let now = Instant::now();
1791        let mut buf = [0u8; 64];
1792        let n = dextra_codec::encode_voice_data(&mut buf, sid(0x1234), 7, &VoiceFrame::silence())?;
1793        core.handle_input(now, ADDR_DEXTRA, buf.get(..n).ok_or("n > buf")?)?;
1794        let event = core.pop_event::<DExtra>().ok_or("no event")?;
1795        assert!(
1796            matches!(event, Event::VoiceFrame { stream_id, seq, .. } if stream_id.get() == 0x1234 && seq == 7)
1797        );
1798        Ok(())
1799    }
1800
1801    #[test]
1802    fn dextra_handle_voice_eot_emits_voice_end() -> TestResult {
1803        let (mut core, _) = connected_dextra()?;
1804        let now = Instant::now();
1805        let mut buf = [0u8; 64];
1806        let n = dextra_codec::encode_voice_eot(&mut buf, sid(0x1234), 21)?;
1807        core.handle_input(now, ADDR_DEXTRA, buf.get(..n).ok_or("n > buf")?)?;
1808        let event = core.pop_event::<DExtra>().ok_or("no event")?;
1809        assert!(
1810            matches!(event, Event::VoiceEnd { stream_id, reason } if stream_id.get() == 0x1234 && reason == VoiceEndReason::Eot)
1811        );
1812        Ok(())
1813    }
1814
1815    #[test]
1816    fn dplus_handle_voice_header_emits_voice_start() -> TestResult {
1817        let (mut core, _) = connected_dplus()?;
1818        let now = Instant::now();
1819        let mut buf = [0u8; 64];
1820        let n = dplus_codec::encode_voice_header(&mut buf, sid(0x4567), &test_header())?;
1821        core.handle_input(now, ADDR_DPLUS, buf.get(..n).ok_or("n > buf")?)?;
1822        let event = core.pop_event::<DPlus>().ok_or("no event")?;
1823        assert!(matches!(event, Event::VoiceStart { stream_id, .. } if stream_id.get() == 0x4567));
1824        Ok(())
1825    }
1826
1827    #[test]
1828    fn dplus_handle_voice_eot_emits_voice_end() -> TestResult {
1829        let (mut core, _) = connected_dplus()?;
1830        let now = Instant::now();
1831        let mut buf = [0u8; 64];
1832        let n = dplus_codec::encode_voice_eot(&mut buf, sid(0x4567), 21)?;
1833        core.handle_input(now, ADDR_DPLUS, buf.get(..n).ok_or("n > buf")?)?;
1834        let event = core.pop_event::<DPlus>().ok_or("no event")?;
1835        assert!(matches!(event, Event::VoiceEnd { .. }));
1836        Ok(())
1837    }
1838
1839    fn non_silence_frame() -> VoiceFrame {
1840        VoiceFrame {
1841            ambe: [0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09],
1842            slow_data: [0xAA, 0xBB, 0xCC],
1843        }
1844    }
1845
1846    #[test]
1847    fn dcs_handle_first_voice_frame_emits_voice_start() -> TestResult {
1848        let (mut core, _) = connected_dcs()?;
1849        let now = Instant::now();
1850        let mut buf = [0u8; 128];
1851        let n = dcs_codec::encode_voice(
1852            &mut buf,
1853            &test_header(),
1854            sid(0x789A),
1855            0,
1856            &non_silence_frame(),
1857            false,
1858        )?;
1859        core.handle_input(now, ADDR_DCS, buf.get(..n).ok_or("n > buf")?)?;
1860        let event = core.pop_event::<Dcs>().ok_or("no event")?;
1861        assert!(matches!(event, Event::VoiceStart { stream_id, .. } if stream_id.get() == 0x789A));
1862        Ok(())
1863    }
1864
1865    #[test]
1866    fn dcs_handle_subsequent_voice_frame_emits_voice_frame() -> TestResult {
1867        let (mut core, _) = connected_dcs()?;
1868        let now = Instant::now();
1869        let mut buf = [0u8; 128];
1870        let n = dcs_codec::encode_voice(
1871            &mut buf,
1872            &test_header(),
1873            sid(0x789A),
1874            5,
1875            &non_silence_frame(),
1876            false,
1877        )?;
1878        core.handle_input(now, ADDR_DCS, buf.get(..n).ok_or("n > buf")?)?;
1879        let event = core.pop_event::<Dcs>().ok_or("no event")?;
1880        assert!(
1881            matches!(event, Event::VoiceFrame { stream_id, seq, .. } if stream_id.get() == 0x789A && seq == 5)
1882        );
1883        Ok(())
1884    }
1885
1886    #[test]
1887    fn dcs_handle_voice_end_emits_voice_end() -> TestResult {
1888        let (mut core, _) = connected_dcs()?;
1889        let now = Instant::now();
1890        let mut buf = [0u8; 128];
1891        let n = dcs_codec::encode_voice(
1892            &mut buf,
1893            &test_header(),
1894            sid(0x789A),
1895            21,
1896            &VoiceFrame::silence(),
1897            true,
1898        )?;
1899        core.handle_input(now, ADDR_DCS, buf.get(..n).ok_or("n > buf")?)?;
1900        let event = core.pop_event::<Dcs>().ok_or("no event")?;
1901        assert!(
1902            matches!(event, Event::VoiceEnd { stream_id, reason } if stream_id.get() == 0x789A && reason == VoiceEndReason::Eot)
1903        );
1904        Ok(())
1905    }
1906}