1use std::marker::PhantomData;
13use std::net::SocketAddr;
14use std::time::Instant;
15
16use crate::codec::dplus::HostList;
17use crate::error::{Error, StateError};
18use crate::header::DStarHeader;
19use crate::session::driver::{Driver, Transmit};
20use crate::types::{Callsign, StreamId};
21use crate::validator::Diagnostic;
22use crate::voice::VoiceFrame;
23
24use super::core::SessionCore;
25use super::event::Event;
26use super::failed::Failed;
27use super::protocol::{DPlus, NoAuthRequired, Protocol};
28use super::state::{
29 Authenticated, ClientState, ClientStateKind, Closed, Configured, Connected, Connecting,
30 Disconnecting,
31};
32
33#[derive(Debug)]
46pub struct Session<P: Protocol, S: ClientState> {
47 pub(crate) inner: SessionCore,
48 pub(crate) _protocol: PhantomData<P>,
49 pub(crate) _state: PhantomData<S>,
50}
51
52impl<P: Protocol, S: ClientState> Session<P, S> {
55 #[must_use]
57 pub const fn state_kind(&self) -> ClientStateKind {
58 self.inner.state_kind()
59 }
60
61 #[must_use]
63 pub const fn peer(&self) -> SocketAddr {
64 self.inner.peer()
65 }
66
67 #[must_use]
69 pub const fn local_callsign(&self) -> Callsign {
70 self.inner.callsign()
71 }
72
73 pub fn diagnostics(&mut self) -> Vec<Diagnostic> {
81 self.inner.drain_diagnostics()
82 }
83}
84
85impl<P: Protocol, S: ClientState> Driver for Session<P, S> {
89 type Event = Event<P>;
90 type Error = Error;
91
92 fn handle_input(
93 &mut self,
94 now: Instant,
95 peer: SocketAddr,
96 bytes: &[u8],
97 ) -> Result<(), Self::Error> {
98 self.inner.handle_input(now, peer, bytes)
99 }
100
101 fn handle_timeout(&mut self, now: Instant) {
102 self.inner.handle_timeout(now);
103 }
104
105 fn poll_transmit(&mut self, now: Instant) -> Option<Transmit<'_>> {
106 self.inner.pop_transmit(now)
107 }
108
109 fn poll_event(&mut self) -> Option<Self::Event> {
110 self.inner.pop_event::<P>()
111 }
112
113 fn poll_timeout(&self) -> Option<Instant> {
114 self.inner.next_deadline()
115 }
116}
117
118impl<P: Protocol + NoAuthRequired> Session<P, Configured> {
121 #[expect(
129 clippy::result_large_err,
130 reason = "Failed<Self, Error> is large because Self wraps the full SessionCore; \
131 boxing would force every caller to unbox on success too"
132 )]
133 pub fn connect(mut self, now: Instant) -> Result<Session<P, Connecting>, Failed<Self, Error>> {
134 match self.inner.enqueue_connect(now) {
135 Ok(()) => Ok(Session {
136 inner: self.inner,
137 _protocol: PhantomData,
138 _state: PhantomData,
139 }),
140 Err(error) => Err(Failed {
141 session: self,
142 error,
143 }),
144 }
145 }
146}
147
148impl Session<DPlus, Configured> {
151 #[expect(
161 clippy::result_large_err,
162 reason = "Failed<Self, Error> is large because Self wraps the full SessionCore; \
163 boxing would force every caller to unbox on success too"
164 )]
165 pub fn authenticate(
166 mut self,
167 hosts: HostList,
168 ) -> Result<Session<DPlus, Authenticated>, Failed<Self, Error>> {
169 match self.inner.attach_host_list(hosts) {
170 Ok(()) => Ok(Session {
171 inner: self.inner,
172 _protocol: PhantomData,
173 _state: PhantomData,
174 }),
175 Err(error) => Err(Failed {
176 session: self,
177 error,
178 }),
179 }
180 }
181}
182
183impl Session<DPlus, Authenticated> {
186 #[expect(
193 clippy::result_large_err,
194 reason = "Failed<Self, Error> is large because Self wraps the full SessionCore; \
195 boxing would force every caller to unbox on success too"
196 )]
197 pub fn connect(
198 mut self,
199 now: Instant,
200 ) -> Result<Session<DPlus, Connecting>, Failed<Self, Error>> {
201 match self.inner.enqueue_connect(now) {
202 Ok(()) => Ok(Session {
203 inner: self.inner,
204 _protocol: PhantomData,
205 _state: PhantomData,
206 }),
207 Err(error) => Err(Failed {
208 session: self,
209 error,
210 }),
211 }
212 }
213
214 #[must_use]
223 pub fn host_list(&self) -> &HostList {
224 self.inner.host_list().unwrap_or(&EMPTY_HOST_LIST)
225 }
226}
227
228static EMPTY_HOST_LIST: HostList = HostList::new();
234
235impl<P: Protocol> Session<P, Connecting> {
238 #[expect(
254 clippy::result_large_err,
255 reason = "Failed<Self, Error> is large because Self wraps the full SessionCore; \
256 boxing would force every caller to unbox on success too"
257 )]
258 pub fn promote(self) -> Result<Session<P, Connected>, Failed<Self, Error>> {
259 if self.inner.state_kind() == ClientStateKind::Connected {
260 Ok(Session {
261 inner: self.inner,
262 _protocol: PhantomData,
263 _state: PhantomData,
264 })
265 } else {
266 let error = Error::State(StateError::WrongState {
267 operation: "Session::promote",
268 state: self.inner.state_kind(),
269 protocol: self.inner.protocol_kind(),
270 });
271 Err(Failed {
272 session: self,
273 error,
274 })
275 }
276 }
277}
278
279impl<P: Protocol> Session<P, Connected> {
282 #[expect(
298 clippy::result_large_err,
299 reason = "Failed<Self, Error> is large because Self wraps the full SessionCore; \
300 boxing would force every caller to unbox on success too"
301 )]
302 pub fn disconnect(
303 mut self,
304 now: Instant,
305 ) -> Result<Session<P, Disconnecting>, Failed<Self, Error>> {
306 match self.inner.enqueue_disconnect(now) {
307 Ok(()) => Ok(Session {
308 inner: self.inner,
309 _protocol: PhantomData,
310 _state: PhantomData,
311 }),
312 Err(error) => Err(Failed {
313 session: self,
314 error,
315 }),
316 }
317 }
318
319 pub fn disconnect_in_place(&mut self, now: Instant) -> Result<(), Error> {
334 self.inner.enqueue_disconnect(now)
335 }
336
337 pub fn send_header(
352 &mut self,
353 now: Instant,
354 header: &DStarHeader,
355 stream_id: StreamId,
356 ) -> Result<(), Error> {
357 self.inner.enqueue_send_header(now, header, stream_id)
358 }
359
360 pub fn send_voice(
371 &mut self,
372 now: Instant,
373 stream_id: StreamId,
374 seq: u8,
375 frame: &VoiceFrame,
376 ) -> Result<(), Error> {
377 self.inner.enqueue_send_voice(now, stream_id, seq, frame)
378 }
379
380 pub fn send_eot(&mut self, now: Instant, stream_id: StreamId, seq: u8) -> Result<(), Error> {
393 self.inner.enqueue_send_eot(now, stream_id, seq)
394 }
395}
396
397impl<P: Protocol> Session<P, Disconnecting> {
400 #[expect(
411 clippy::result_large_err,
412 reason = "Failed<Self, Error> is large because Self wraps the full SessionCore; \
413 boxing would force every caller to unbox on success too"
414 )]
415 pub fn promote(self) -> Result<Session<P, Closed>, Failed<Self, Error>> {
416 if self.inner.state_kind() == ClientStateKind::Closed {
417 Ok(Session {
418 inner: self.inner,
419 _protocol: PhantomData,
420 _state: PhantomData,
421 })
422 } else {
423 let error = Error::State(StateError::WrongState {
424 operation: "Session::promote",
425 state: self.inner.state_kind(),
426 protocol: self.inner.protocol_kind(),
427 });
428 Err(Failed {
429 session: self,
430 error,
431 })
432 }
433 }
434}
435
436#[cfg(test)]
437mod tests {
438 use super::*;
439 use crate::codec::dextra::encode_connect_ack;
440 use crate::session::client::protocol::DExtra;
441 use crate::types::{Module, ProtocolKind};
442 use std::net::{IpAddr, Ipv4Addr, SocketAddr};
443 use std::time::Duration;
444
445 type TestResult = Result<(), Box<dyn std::error::Error>>;
446
447 const ADDR: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 30001);
448 const fn cs(bytes: [u8; 8]) -> Callsign {
449 Callsign::from_wire_bytes(bytes)
450 }
451
452 fn new_dextra_configured() -> Session<DExtra, Configured> {
453 let core = SessionCore::new(
454 ProtocolKind::DExtra,
455 cs(*b"W1AW "),
456 Module::B,
457 Module::C,
458 ADDR,
459 );
460 Session {
461 inner: core,
462 _protocol: PhantomData,
463 _state: PhantomData,
464 }
465 }
466
467 #[test]
468 fn dextra_configured_state_kind() {
469 let session = new_dextra_configured();
470 assert_eq!(session.state_kind(), ClientStateKind::Configured);
471 }
472
473 #[test]
474 fn dextra_connect_transitions_to_connecting() -> TestResult {
475 let session = new_dextra_configured();
476 let now = Instant::now();
477 let connecting = session.connect(now)?;
478 assert_eq!(connecting.state_kind(), ClientStateKind::Connecting);
479 Ok(())
480 }
481
482 #[test]
483 fn dextra_full_connect_cycle() -> TestResult {
484 let now = Instant::now();
485 let session = new_dextra_configured();
486 let mut connecting = session.connect(now)?;
487 assert!(
488 connecting.poll_transmit(now).is_some(),
489 "LINK transmit ready"
490 );
491
492 let mut ack_buf = [0u8; 16];
493 let n = encode_connect_ack(&mut ack_buf, &cs(*b"W1AW "), Module::C)?;
494 connecting.handle_input(now, ADDR, ack_buf.get(..n).ok_or("slice")?)?;
495
496 assert_eq!(connecting.state_kind(), ClientStateKind::Connected);
497 let connected = connecting.promote()?;
498 assert_eq!(connected.state_kind(), ClientStateKind::Connected);
499 Ok(())
500 }
501
502 #[test]
503 fn dextra_promote_fails_if_still_connecting() -> TestResult {
504 let now = Instant::now();
505 let session = new_dextra_configured();
506 let connecting = session.connect(now)?;
507 let Err(err) = connecting.promote() else {
508 return Err("expected promote to fail".into());
509 };
510 assert_eq!(err.session.state_kind(), ClientStateKind::Connecting);
511 Ok(())
512 }
513
514 #[test]
515 fn dextra_connected_disconnect_transitions_to_disconnecting() -> TestResult {
516 let now = Instant::now();
517 let session = new_dextra_configured();
518 let mut connecting = session.connect(now)?;
519 let mut ack_buf = [0u8; 16];
520 let n = encode_connect_ack(&mut ack_buf, &cs(*b"W1AW "), Module::C)?;
521 connecting.handle_input(now, ADDR, ack_buf.get(..n).ok_or("slice")?)?;
522 let connected = connecting.promote()?;
523 let disconnecting = connected.disconnect(now + Duration::from_secs(1))?;
524 assert_eq!(disconnecting.state_kind(), ClientStateKind::Disconnecting);
525 Ok(())
526 }
527
528 #[test]
529 fn peer_accessor_works_in_any_state() {
530 let session = new_dextra_configured();
531 assert_eq!(session.peer(), ADDR);
532 }
533
534 #[test]
535 fn local_callsign_accessor_works_in_any_state() {
536 let session = new_dextra_configured();
537 assert_eq!(session.local_callsign(), cs(*b"W1AW "));
538 }
539
540 #[test]
541 fn diagnostics_drain_starts_empty() {
542 let mut session = new_dextra_configured();
543 assert!(session.diagnostics().is_empty());
544 }
545
546 use crate::types::Suffix;
549
550 const fn test_header() -> DStarHeader {
551 DStarHeader {
552 flag1: 0,
553 flag2: 0,
554 flag3: 0,
555 rpt2: Callsign::from_wire_bytes(*b"XRF030 G"),
556 rpt1: Callsign::from_wire_bytes(*b"XRF030 C"),
557 ur_call: Callsign::from_wire_bytes(*b"CQCQCQ "),
558 my_call: Callsign::from_wire_bytes(*b"W1AW "),
559 my_suffix: Suffix::from_wire_bytes(*b"D75 "),
560 }
561 }
562
563 #[expect(clippy::unwrap_used, reason = "const-validated: n is non-zero")]
564 const fn sid(n: u16) -> StreamId {
565 StreamId::new(n).unwrap()
566 }
567
568 fn dextra_connected() -> Result<Session<DExtra, Connected>, Box<dyn std::error::Error>> {
569 let now = Instant::now();
570 let session = new_dextra_configured();
571 let mut connecting = session.connect(now)?;
572 let mut ack_buf = [0u8; 16];
573 let n = encode_connect_ack(&mut ack_buf, &cs(*b"W1AW "), Module::C)?;
574 connecting.handle_input(now, ADDR, ack_buf.get(..n).ok_or("slice")?)?;
575 Ok(connecting.promote()?)
576 }
577
578 #[test]
579 fn dextra_connected_send_header_succeeds() -> TestResult {
580 let mut session = dextra_connected()?;
581 let now = Instant::now();
582 session.send_header(now, &test_header(), sid(0x1234))?;
583 let _link = session.poll_transmit(now).ok_or("link tx")?;
584 let header_tx = session.poll_transmit(now).ok_or("voice header tx")?;
585 assert_eq!(header_tx.payload.len(), 56);
586 Ok(())
587 }
588
589 #[test]
590 fn dextra_connected_send_voice_succeeds() -> TestResult {
591 let mut session = dextra_connected()?;
592 let now = Instant::now();
593 let frame = VoiceFrame::silence();
594 session.send_voice(now, sid(0x1234), 5, &frame)?;
595 let _link = session.poll_transmit(now).ok_or("link tx")?;
596 let voice_tx = session.poll_transmit(now).ok_or("voice tx")?;
597 assert_eq!(voice_tx.payload.len(), 27);
598 Ok(())
599 }
600
601 #[test]
602 fn dextra_connected_send_eot_succeeds() -> TestResult {
603 let mut session = dextra_connected()?;
604 let now = Instant::now();
605 session.send_eot(now, sid(0x1234), 21)?;
606 let _link = session.poll_transmit(now).ok_or("link tx")?;
607 let eot_tx = session.poll_transmit(now).ok_or("eot tx")?;
608 assert_eq!(eot_tx.payload.len(), 27);
609 Ok(())
610 }
611
612 #[test]
613 fn dextra_connected_send_header_does_not_change_state() -> TestResult {
614 let mut session = dextra_connected()?;
615 let now = Instant::now();
616 session.send_header(now, &test_header(), sid(0x1234))?;
617 assert_eq!(session.state_kind(), ClientStateKind::Connected);
618 Ok(())
619 }
620
621 #[test]
622 fn dextra_connected_disconnect_in_place_transitions_internal_state() -> TestResult {
623 let mut session = dextra_connected()?;
624 assert_eq!(session.state_kind(), ClientStateKind::Connected);
625 session.disconnect_in_place(Instant::now())?;
626 assert_eq!(session.state_kind(), ClientStateKind::Disconnecting);
627 Ok(())
628 }
629}