dstar_gateway_core/session/client/
session.rs

//! Typestate `Session<P, S>` wrapper.
//!
//! Wraps [`SessionCore`] with compile-time state tracking via the
//! `S: ClientState` phantom. Methods are gated by `impl` blocks on
//! specific `Session<P, S>` shapes — calling `send_voice` on a
//! `Session<P, Configured>` is a compile error, not a runtime error.
//!
//! The `Session<P, S>` is a thin wrapper over [`SessionCore`] (the
//! protocol-erased state machine). The phantom types add zero
//! runtime cost — the entire typestate machinery compiles away.
11
12use std::marker::PhantomData;
13use std::net::SocketAddr;
14use std::time::Instant;
15
16use crate::codec::dplus::HostList;
17use crate::error::{Error, StateError};
18use crate::header::DStarHeader;
19use crate::session::driver::{Driver, Transmit};
20use crate::types::{Callsign, StreamId};
21use crate::validator::Diagnostic;
22use crate::voice::VoiceFrame;
23
24use super::core::SessionCore;
25use super::event::Event;
26use super::failed::Failed;
27use super::protocol::{DPlus, NoAuthRequired, Protocol};
28use super::state::{
29    Authenticated, ClientState, ClientStateKind, Closed, Configured, Connected, Connecting,
30    Disconnecting,
31};
32
33/// A typed reflector session.
34///
35/// `P` is the protocol marker ([`DPlus`], [`super::DExtra`],
36/// [`super::Dcs`]). `S` is the connection state marker
37/// ([`Configured`], [`Connecting`], etc.). Methods are gated by
38/// `impl` blocks on specific `Session<P, S>` shapes — calling
39/// `send_voice` on a `Session<P, Configured>` is a compile error, not
40/// a runtime error.
41///
42/// The `Session<P, S>` is a thin wrapper over [`SessionCore`] (the
43/// protocol-erased state machine). The phantom types add zero
44/// runtime cost — the entire typestate machinery compiles away.
45#[derive(Debug)]
46pub struct Session<P: Protocol, S: ClientState> {
47    pub(crate) inner: SessionCore,
48    pub(crate) _protocol: PhantomData<P>,
49    pub(crate) _state: PhantomData<S>,
50}
51
52// ─── Universal: state inspection works in any state ────────────
53
54impl<P: Protocol, S: ClientState> Session<P, S> {
55    /// Runtime discriminator for the current state.
56    #[must_use]
57    pub const fn state_kind(&self) -> ClientStateKind {
58        self.inner.state_kind()
59    }
60
61    /// The reflector address this session was built to talk to.
62    #[must_use]
63    pub const fn peer(&self) -> SocketAddr {
64        self.inner.peer()
65    }
66
67    /// The local station callsign.
68    #[must_use]
69    pub const fn local_callsign(&self) -> Callsign {
70        self.inner.callsign()
71    }
72
73    /// Drain accumulated diagnostics, if any.
74    ///
75    /// Returns a `Vec` (not an iterator) because the underlying sink
76    /// lives inside the session; returning a borrowed iterator would
77    /// force the caller to hold a `&mut Session` for the lifetime of
78    /// the iterator. Most consumers call this periodically and
79    /// process the batch.
80    pub fn diagnostics(&mut self) -> Vec<Diagnostic> {
81        self.inner.drain_diagnostics()
82    }
83}
84
85// ─── Universal `Driver` impl — the typestate wraps the same state
86//     machine regardless of which `(P, S)` it's in. ────────────
87
88impl<P: Protocol, S: ClientState> Driver for Session<P, S> {
89    type Event = Event<P>;
90    type Error = Error;
91
92    fn handle_input(
93        &mut self,
94        now: Instant,
95        peer: SocketAddr,
96        bytes: &[u8],
97    ) -> Result<(), Self::Error> {
98        self.inner.handle_input(now, peer, bytes)
99    }
100
101    fn handle_timeout(&mut self, now: Instant) {
102        self.inner.handle_timeout(now);
103    }
104
105    fn poll_transmit(&mut self, now: Instant) -> Option<Transmit<'_>> {
106        self.inner.pop_transmit(now)
107    }
108
109    fn poll_event(&mut self) -> Option<Self::Event> {
110        self.inner.pop_event::<P>()
111    }
112
113    fn poll_timeout(&self) -> Option<Instant> {
114        self.inner.next_deadline()
115    }
116}
117
118// ─── Configured -> Connecting (no-auth protocols only) ─────────
119
120impl<P: Protocol + NoAuthRequired> Session<P, Configured> {
121    /// Send the LINK/connect packet and transition to [`Connecting`].
122    ///
123    /// # Errors
124    ///
125    /// Returns the original session in the error position if the
126    /// state machine refuses the transition (e.g., codec encoder
127    /// failure). This lets the caller retry without rebuilding.
128    #[expect(
129        clippy::result_large_err,
130        reason = "Failed<Self, Error> is large because Self wraps the full SessionCore; \
131                  boxing would force every caller to unbox on success too"
132    )]
133    pub fn connect(mut self, now: Instant) -> Result<Session<P, Connecting>, Failed<Self, Error>> {
134        match self.inner.enqueue_connect(now) {
135            Ok(()) => Ok(Session {
136                inner: self.inner,
137                _protocol: PhantomData,
138                _state: PhantomData,
139            }),
140            Err(error) => Err(Failed {
141                session: self,
142                error,
143            }),
144        }
145    }
146}
147
148// ─── DPlus has an extra hop: Configured -> Authenticated ────────
149
150impl Session<DPlus, Configured> {
151    /// Mark the session as authenticated using a previously-fetched host list.
152    ///
153    /// The actual TCP auth happens in the shell crate — the
154    /// sans-io core takes the resulting [`HostList`] here as input.
155    ///
156    /// # Errors
157    ///
158    /// Returns the original session in the error position if the
159    /// state machine rejects the host list attachment.
160    #[expect(
161        clippy::result_large_err,
162        reason = "Failed<Self, Error> is large because Self wraps the full SessionCore; \
163                  boxing would force every caller to unbox on success too"
164    )]
165    pub fn authenticate(
166        mut self,
167        hosts: HostList,
168    ) -> Result<Session<DPlus, Authenticated>, Failed<Self, Error>> {
169        match self.inner.attach_host_list(hosts) {
170            Ok(()) => Ok(Session {
171                inner: self.inner,
172                _protocol: PhantomData,
173                _state: PhantomData,
174            }),
175            Err(error) => Err(Failed {
176                session: self,
177                error,
178            }),
179        }
180    }
181}
182
183// ─── DPlus Authenticated -> Connecting ─────────────────────────
184
185impl Session<DPlus, Authenticated> {
186    /// Send LINK1 and transition to [`Connecting`].
187    ///
188    /// # Errors
189    ///
190    /// Returns the original session in the error position if the
191    /// state machine refuses the transition.
192    #[expect(
193        clippy::result_large_err,
194        reason = "Failed<Self, Error> is large because Self wraps the full SessionCore; \
195                  boxing would force every caller to unbox on success too"
196    )]
197    pub fn connect(
198        mut self,
199        now: Instant,
200    ) -> Result<Session<DPlus, Connecting>, Failed<Self, Error>> {
201        match self.inner.enqueue_connect(now) {
202            Ok(()) => Ok(Session {
203                inner: self.inner,
204                _protocol: PhantomData,
205                _state: PhantomData,
206            }),
207            Err(error) => Err(Failed {
208                session: self,
209                error,
210            }),
211        }
212    }
213
214    /// Get the cached host list from the TCP auth step.
215    ///
216    /// The [`Authenticated`] state is entered only after
217    /// [`SessionCore::attach_host_list`] succeeds, so the host list is
218    /// always present here. A `None` would indicate a bug in
219    /// [`SessionCore`]; we fall back to a module-level empty
220    /// sentinel rather than panic so lib code stays
221    /// `expect_used`-clean.
222    #[must_use]
223    pub fn host_list(&self) -> &HostList {
224        self.inner.host_list().unwrap_or(&EMPTY_HOST_LIST)
225    }
226}
227
228/// Module-level sentinel returned by
229/// [`Session::<DPlus, Authenticated>::host_list`] when the core's
230/// internal host list is (impossibly) `None`. Defined as a `static`
231/// so callers can hold a `&'static HostList` without lifetime
232/// gymnastics.
233static EMPTY_HOST_LIST: HostList = HostList::new();
234
235// ─── Connecting -> Connected (promote) ─────────────────────────
236
237impl<P: Protocol> Session<P, Connecting> {
238    /// Promote a session that has reached [`Connected`] state.
239    ///
240    /// The shell calls this after observing [`Event::Connected`] from
241    /// the event stream. `promote` only succeeds if the core is
242    /// already in [`ClientStateKind::Connected`]; any other state
243    /// returns a [`Failed`] with a [`StateError::WrongState`] error.
244    /// If the session was rejected mid-handshake the caller should
245    /// inspect the failed session's [`Session::state_kind`] — it may
246    /// already be in [`ClientStateKind::Closed`].
247    ///
248    /// # Errors
249    ///
250    /// Returns `Err(Failed)` if the session is not yet in
251    /// [`ClientStateKind::Connected`]. The `session` field of the
252    /// [`Failed`] carries the unchanged `Session<P, Connecting>`.
253    #[expect(
254        clippy::result_large_err,
255        reason = "Failed<Self, Error> is large because Self wraps the full SessionCore; \
256                  boxing would force every caller to unbox on success too"
257    )]
258    pub fn promote(self) -> Result<Session<P, Connected>, Failed<Self, Error>> {
259        if self.inner.state_kind() == ClientStateKind::Connected {
260            Ok(Session {
261                inner: self.inner,
262                _protocol: PhantomData,
263                _state: PhantomData,
264            })
265        } else {
266            let error = Error::State(StateError::WrongState {
267                operation: "Session::promote",
268                state: self.inner.state_kind(),
269                protocol: self.inner.protocol_kind(),
270            });
271            Err(Failed {
272                session: self,
273                error,
274            })
275        }
276    }
277}
278
279// ─── Connected -> Disconnecting + voice TX ─────────────────────
280
281impl<P: Protocol> Session<P, Connected> {
282    /// Initiate a graceful disconnect.
283    ///
284    /// Enqueues the UNLINK packet and transitions to
285    /// [`Disconnecting`]. The caller must continue to drive the
286    /// [`Driver`] loop (polling `poll_transmit` / `poll_event`) until
287    /// either the UNLINK ACK arrives (emitting [`Event::Disconnected`]
288    /// with [`super::DisconnectReason::UnlinkAcked`]) or the
289    /// disconnect deadline expires (emitting [`Event::Disconnected`]
290    /// with [`super::DisconnectReason::DisconnectTimeout`]).
291    ///
292    /// # Errors
293    ///
294    /// Returns the original session in the error position if the
295    /// state machine refuses the transition (e.g., codec encoder
296    /// failure).
297    #[expect(
298        clippy::result_large_err,
299        reason = "Failed<Self, Error> is large because Self wraps the full SessionCore; \
300                  boxing would force every caller to unbox on success too"
301    )]
302    pub fn disconnect(
303        mut self,
304        now: Instant,
305    ) -> Result<Session<P, Disconnecting>, Failed<Self, Error>> {
306        match self.inner.enqueue_disconnect(now) {
307            Ok(()) => Ok(Session {
308                inner: self.inner,
309                _protocol: PhantomData,
310                _state: PhantomData,
311            }),
312            Err(error) => Err(Failed {
313                session: self,
314                error,
315            }),
316        }
317    }
318
319    /// Enqueue an UNLINK packet without consuming the session.
320    ///
321    /// Used by the tokio shell's `SessionLoop` to trigger a disconnect
322    /// from within the event loop, where consuming `self` isn't
323    /// possible. After this returns, the internal state machine has
324    /// transitioned to [`Disconnecting`], but the typestate parameter
325    /// on this handle is still [`Connected`]. The caller should exit
326    /// the event loop and construct a `Session<P, Disconnecting>` or
327    /// wait for the internal state to reach [`Closed`].
328    ///
329    /// # Errors
330    ///
331    /// Returns [`Error`] if the core's [`SessionCore::enqueue_disconnect`]
332    /// call fails (e.g., the codec encoder rejects the UNLINK packet).
333    pub fn disconnect_in_place(&mut self, now: Instant) -> Result<(), Error> {
334        self.inner.enqueue_disconnect(now)
335    }
336
337    /// Send a voice header and start a new outbound voice stream.
338    ///
339    /// The header is cached internally; subsequent
340    /// [`Self::send_voice`] / [`Self::send_eot`] calls will reference
341    /// it (DCS embeds the full header in every voice frame, so the
342    /// cache is mandatory there).
343    ///
344    /// This method takes `&mut self`, NOT `self` — voice TX does not
345    /// change the typestate. The session stays in [`Connected`] until
346    /// `disconnect` or a timeout closes it.
347    ///
348    /// # Errors
349    ///
350    /// Returns [`Error::Protocol`] if the codec encoder fails.
351    pub fn send_header(
352        &mut self,
353        now: Instant,
354        header: &DStarHeader,
355        stream_id: StreamId,
356    ) -> Result<(), Error> {
357        self.inner.enqueue_send_header(now, header, stream_id)
358    }
359
360    /// Send a voice data frame.
361    ///
362    /// This method takes `&mut self`, NOT `self`.
363    ///
364    /// # Errors
365    ///
366    /// Returns [`Error::Protocol`] if the codec encoder fails.
367    /// On DCS, returns [`Error::Protocol`] with
368    /// [`crate::error::DcsError::NoTxHeader`] if [`Self::send_header`]
369    /// has not been called first.
370    pub fn send_voice(
371        &mut self,
372        now: Instant,
373        stream_id: StreamId,
374        seq: u8,
375        frame: &VoiceFrame,
376    ) -> Result<(), Error> {
377        self.inner.enqueue_send_voice(now, stream_id, seq, frame)
378    }
379
380    /// Send a voice EOT packet, ending the outbound stream.
381    ///
382    /// This method takes `&mut self`, NOT `self`. The session stays
383    /// in [`Connected`] after EOT — the caller may begin a new stream
384    /// by calling [`Self::send_header`] again.
385    ///
386    /// # Errors
387    ///
388    /// Returns [`Error::Protocol`] if the codec encoder fails.
389    /// On DCS, returns [`Error::Protocol`] with
390    /// [`crate::error::DcsError::NoTxHeader`] if [`Self::send_header`]
391    /// has not been called first.
392    pub fn send_eot(&mut self, now: Instant, stream_id: StreamId, seq: u8) -> Result<(), Error> {
393        self.inner.enqueue_send_eot(now, stream_id, seq)
394    }
395}
396
397// ─── Disconnecting -> Closed (promote) ──────────────────────────
398
399impl<P: Protocol> Session<P, Disconnecting> {
400    /// Promote to [`Closed`] once the UNLINK ACK arrives or the deadline fires.
401    ///
402    /// Same pattern as [`Session::<P, Connecting>::promote`]. The
403    /// shell watches for [`Event::Disconnected`] and then calls this.
404    ///
405    /// # Errors
406    ///
407    /// Returns `Err(Failed)` if the session is not yet in
408    /// [`ClientStateKind::Closed`] (the disconnect ACK hasn't arrived
409    /// yet).
410    #[expect(
411        clippy::result_large_err,
412        reason = "Failed<Self, Error> is large because Self wraps the full SessionCore; \
413                  boxing would force every caller to unbox on success too"
414    )]
415    pub fn promote(self) -> Result<Session<P, Closed>, Failed<Self, Error>> {
416        if self.inner.state_kind() == ClientStateKind::Closed {
417            Ok(Session {
418                inner: self.inner,
419                _protocol: PhantomData,
420                _state: PhantomData,
421            })
422        } else {
423            let error = Error::State(StateError::WrongState {
424                operation: "Session::promote",
425                state: self.inner.state_kind(),
426                protocol: self.inner.protocol_kind(),
427            });
428            Err(Failed {
429                session: self,
430                error,
431            })
432        }
433    }
434}
435
#[cfg(test)]
mod tests {
    use super::*;
    use crate::codec::dextra::encode_connect_ack;
    use crate::session::client::protocol::DExtra;
    use crate::types::{Module, ProtocolKind, Suffix};
    use std::net::{IpAddr, Ipv4Addr, SocketAddr};
    use std::time::Duration;

    type TestResult = Result<(), Box<dyn std::error::Error>>;

    /// Reflector address every test session is pointed at.
    const ADDR: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 30001);

    /// Wire bytes of the local station callsign used by the fixtures.
    const LOCAL_WIRE: [u8; 8] = *b"W1AW    ";

    const fn cs(bytes: [u8; 8]) -> Callsign {
        Callsign::from_wire_bytes(bytes)
    }

    // ─── Fixtures ──────────────────────────────────────────────

    /// Fresh `DExtra` session in the `Configured` state.
    fn new_dextra_configured() -> Session<DExtra, Configured> {
        Session {
            inner: SessionCore::new(
                ProtocolKind::DExtra,
                cs(LOCAL_WIRE),
                Module::B,
                Module::C,
                ADDR,
            ),
            _protocol: PhantomData,
            _state: PhantomData,
        }
    }

    /// Encode a connect ACK and feed it to `session` as if it came
    /// from the reflector at `ADDR`.
    fn deliver_connect_ack(session: &mut Session<DExtra, Connecting>, now: Instant) -> TestResult {
        let mut ack_buf = [0u8; 16];
        let n = encode_connect_ack(&mut ack_buf, &cs(LOCAL_WIRE), Module::C)?;
        session.handle_input(now, ADDR, ack_buf.get(..n).ok_or("slice")?)?;
        Ok(())
    }

    /// Drive a fresh session through connect + ACK + promote, using
    /// `now` for every step.
    fn dextra_connected_at(
        now: Instant,
    ) -> Result<Session<DExtra, Connected>, Box<dyn std::error::Error>> {
        let mut connecting = new_dextra_configured().connect(now)?;
        deliver_connect_ack(&mut connecting, now)?;
        Ok(connecting.promote()?)
    }

    fn dextra_connected() -> Result<Session<DExtra, Connected>, Box<dyn std::error::Error>> {
        dextra_connected_at(Instant::now())
    }

    const fn test_header() -> DStarHeader {
        DStarHeader {
            flag1: 0,
            flag2: 0,
            flag3: 0,
            rpt2: Callsign::from_wire_bytes(*b"XRF030 G"),
            rpt1: Callsign::from_wire_bytes(*b"XRF030 C"),
            ur_call: Callsign::from_wire_bytes(*b"CQCQCQ  "),
            my_call: Callsign::from_wire_bytes(*b"W1AW    "),
            my_suffix: Suffix::from_wire_bytes(*b"D75 "),
        }
    }

    #[expect(clippy::unwrap_used, reason = "const-validated: n is non-zero")]
    const fn sid(n: u16) -> StreamId {
        StreamId::new(n).unwrap()
    }

    // ─── State transitions ─────────────────────────────────────

    #[test]
    fn dextra_configured_state_kind() {
        let configured = new_dextra_configured();
        assert_eq!(configured.state_kind(), ClientStateKind::Configured);
    }

    #[test]
    fn dextra_connect_transitions_to_connecting() -> TestResult {
        let connecting = new_dextra_configured().connect(Instant::now())?;
        assert_eq!(connecting.state_kind(), ClientStateKind::Connecting);
        Ok(())
    }

    #[test]
    fn dextra_full_connect_cycle() -> TestResult {
        let now = Instant::now();
        let mut connecting = new_dextra_configured().connect(now)?;
        assert!(
            connecting.poll_transmit(now).is_some(),
            "LINK transmit ready"
        );

        deliver_connect_ack(&mut connecting, now)?;

        assert_eq!(connecting.state_kind(), ClientStateKind::Connected);
        let connected = connecting.promote()?;
        assert_eq!(connected.state_kind(), ClientStateKind::Connected);
        Ok(())
    }

    #[test]
    fn dextra_promote_fails_if_still_connecting() -> TestResult {
        let connecting = new_dextra_configured().connect(Instant::now())?;
        let Err(failed) = connecting.promote() else {
            return Err("expected promote to fail".into());
        };
        assert_eq!(failed.session.state_kind(), ClientStateKind::Connecting);
        Ok(())
    }

    #[test]
    fn dextra_connected_disconnect_transitions_to_disconnecting() -> TestResult {
        let now = Instant::now();
        let connected = dextra_connected_at(now)?;
        let disconnecting = connected.disconnect(now + Duration::from_secs(1))?;
        assert_eq!(disconnecting.state_kind(), ClientStateKind::Disconnecting);
        Ok(())
    }

    // ─── Accessors available in every state ────────────────────

    #[test]
    fn peer_accessor_works_in_any_state() {
        let configured = new_dextra_configured();
        assert_eq!(configured.peer(), ADDR);
    }

    #[test]
    fn local_callsign_accessor_works_in_any_state() {
        let configured = new_dextra_configured();
        assert_eq!(configured.local_callsign(), cs(LOCAL_WIRE));
    }

    #[test]
    fn diagnostics_drain_starts_empty() {
        let mut configured = new_dextra_configured();
        assert!(configured.diagnostics().is_empty());
    }

    // ─── Voice TX on Session<P, Connected> ─────────────────────

    #[test]
    fn dextra_connected_send_header_succeeds() -> TestResult {
        let mut connected = dextra_connected()?;
        let now = Instant::now();
        connected.send_header(now, &test_header(), sid(0x1234))?;
        // The LINK packet from the connect step is still queued first.
        let _link = connected.poll_transmit(now).ok_or("link tx")?;
        let header_tx = connected.poll_transmit(now).ok_or("voice header tx")?;
        assert_eq!(header_tx.payload.len(), 56);
        Ok(())
    }

    #[test]
    fn dextra_connected_send_voice_succeeds() -> TestResult {
        let mut connected = dextra_connected()?;
        let now = Instant::now();
        let silence = VoiceFrame::silence();
        connected.send_voice(now, sid(0x1234), 5, &silence)?;
        // The LINK packet from the connect step is still queued first.
        let _link = connected.poll_transmit(now).ok_or("link tx")?;
        let voice_tx = connected.poll_transmit(now).ok_or("voice tx")?;
        assert_eq!(voice_tx.payload.len(), 27);
        Ok(())
    }

    #[test]
    fn dextra_connected_send_eot_succeeds() -> TestResult {
        let mut connected = dextra_connected()?;
        let now = Instant::now();
        connected.send_eot(now, sid(0x1234), 21)?;
        // The LINK packet from the connect step is still queued first.
        let _link = connected.poll_transmit(now).ok_or("link tx")?;
        let eot_tx = connected.poll_transmit(now).ok_or("eot tx")?;
        assert_eq!(eot_tx.payload.len(), 27);
        Ok(())
    }

    #[test]
    fn dextra_connected_send_header_does_not_change_state() -> TestResult {
        let mut connected = dextra_connected()?;
        connected.send_header(Instant::now(), &test_header(), sid(0x1234))?;
        assert_eq!(connected.state_kind(), ClientStateKind::Connected);
        Ok(())
    }

    #[test]
    fn dextra_connected_disconnect_in_place_transitions_internal_state() -> TestResult {
        let mut connected = dextra_connected()?;
        assert_eq!(connected.state_kind(), ClientStateKind::Connected);
        connected.disconnect_in_place(Instant::now())?;
        assert_eq!(connected.state_kind(), ClientStateKind::Disconnecting);
        Ok(())
    }
}