1use std::collections::VecDeque;
15use std::net::SocketAddr;
16use std::time::{Duration, Instant};
17
18use crate::codec::{dcs, dextra, dplus};
19use crate::error::{DcsError, Error, ProtocolError, StateError};
20use crate::header::DStarHeader;
21use crate::session::driver::Transmit;
22use crate::session::outbox::{OutboundPacket, Outbox};
23use crate::session::timer_wheel::TimerWheel;
24use crate::types::{Callsign, Module, ProtocolKind, StreamId};
25use crate::validator::{Diagnostic, VecSink};
26use crate::voice::VoiceFrame;
27
28use super::event::{DisconnectReason, Event, VoiceEndReason};
29use super::protocol::Protocol;
30use super::state::ClientStateKind;
31
/// Timer-wheel key for the periodic keepalive poll: armed by
/// `finalize_connected`, checked and re-armed by `handle_timeout`.
const TIMER_KEEPALIVE: &str = "keepalive";
/// Timer-wheel key for the inactivity watchdog: re-armed whenever peer traffic
/// is handled, and checked by `handle_timeout` while connecting or connected.
const TIMER_KEEPALIVE_INACTIVITY: &str = "keepalive_inactivity";
/// Timer-wheel key for the deadline armed by `enqueue_disconnect` and checked
/// by `handle_timeout` while disconnecting.
const TIMER_DISCONNECT_DEADLINE: &str = "disconnect_deadline";
38
39#[derive(Debug, Clone)]
44enum RawEvent {
45 Connected {
47 peer: SocketAddr,
49 },
50 Disconnected {
52 reason: DisconnectReason,
54 },
55 PollEcho {
57 peer: SocketAddr,
59 },
60 VoiceStart {
62 stream_id: StreamId,
64 header: DStarHeader,
66 },
67 VoiceFrame {
69 stream_id: StreamId,
71 seq: u8,
73 frame: VoiceFrame,
75 },
76 VoiceEnd {
78 stream_id: StreamId,
80 reason: VoiceEndReason,
82 },
83}
84
85pub struct SessionCore {
92 kind: ProtocolKind,
94 callsign: Callsign,
96 local_module: Module,
98 reflector_module: Module,
100 reflector_callsign: Option<Callsign>,
115 peer: SocketAddr,
117 state: ClientStateKind,
119 outbox: Outbox,
121 timers: TimerWheel,
123 current_tx: Option<OutboundPacket>,
127 events: VecDeque<RawEvent>,
129 host_list: Option<dplus::HostList>,
135 cached_tx_header: Option<DStarHeader>,
144 diagnostics: VecSink,
150}
151
152impl std::fmt::Debug for SessionCore {
153 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
154 f.debug_struct("SessionCore")
155 .field("kind", &self.kind)
156 .field("callsign", &self.callsign)
157 .field("local_module", &self.local_module)
158 .field("reflector_module", &self.reflector_module)
159 .field("reflector_callsign", &self.reflector_callsign)
160 .field("peer", &self.peer)
161 .field("state", &self.state)
162 .field("outbox", &self.outbox)
163 .field("timers", &self.timers)
164 .field("current_tx", &self.current_tx)
165 .field("events", &self.events)
166 .field("host_list", &self.host_list)
167 .field("cached_tx_header", &self.cached_tx_header)
168 .field("diagnostics", &self.diagnostics)
169 .finish()
170 }
171}
172
173impl SessionCore {
174 #[must_use]
183 pub fn new(
184 kind: ProtocolKind,
185 callsign: Callsign,
186 local_module: Module,
187 reflector_module: Module,
188 peer: SocketAddr,
189 ) -> Self {
190 Self::new_with_reflector_callsign(
191 kind,
192 callsign,
193 local_module,
194 reflector_module,
195 peer,
196 None,
197 )
198 }
199
200 #[must_use]
214 pub fn new_with_reflector_callsign(
215 kind: ProtocolKind,
216 callsign: Callsign,
217 local_module: Module,
218 reflector_module: Module,
219 peer: SocketAddr,
220 reflector_callsign: Option<Callsign>,
221 ) -> Self {
222 if kind == ProtocolKind::Dcs && reflector_callsign.is_none() {
223 tracing::warn!(
224 target: "dstar_gateway_core::session",
225 %callsign,
226 %peer,
227 "DCS session constructed without reflector_callsign — \
228 falling back to \"DCS001 \" default. Connections to \
229 any other DCS reflector will fail silently because the \
230 target reflector reads the callsign field from the \
231 LINK packet and drops mismatched traffic. Call \
232 SessionBuilder::reflector_callsign to fix."
233 );
234 }
235 Self {
236 kind,
237 callsign,
238 local_module,
239 reflector_module,
240 reflector_callsign,
241 peer,
242 state: ClientStateKind::Configured,
243 outbox: Outbox::new(),
244 timers: TimerWheel::new(),
245 current_tx: None,
246 events: VecDeque::new(),
247 host_list: None,
248 cached_tx_header: None,
249 diagnostics: VecSink::default(),
250 }
251 }
252
253 pub fn drain_diagnostics(&mut self) -> Vec<Diagnostic> {
260 self.diagnostics.drain().collect()
261 }
262
263 #[must_use]
267 pub const fn state_kind(&self) -> ClientStateKind {
268 self.state
269 }
270
271 #[must_use]
273 pub const fn peer(&self) -> SocketAddr {
274 self.peer
275 }
276
277 #[must_use]
279 pub const fn callsign(&self) -> Callsign {
280 self.callsign
281 }
282
283 #[must_use]
285 pub const fn local_module(&self) -> Module {
286 self.local_module
287 }
288
289 #[must_use]
291 pub const fn reflector_module(&self) -> Module {
292 self.reflector_module
293 }
294
295 #[must_use]
297 pub const fn protocol_kind(&self) -> ProtocolKind {
298 self.kind
299 }
300
301 #[must_use]
303 pub const fn host_list(&self) -> Option<&dplus::HostList> {
304 self.host_list.as_ref()
305 }
306
307 pub fn attach_host_list(&mut self, list: dplus::HostList) -> Result<(), Error> {
326 if self.kind != ProtocolKind::DPlus || self.state != ClientStateKind::Configured {
327 return Err(Error::State(StateError::WrongState {
328 operation: "attach_host_list",
329 state: self.state,
330 protocol: self.kind,
331 }));
332 }
333 self.host_list = Some(list);
334 self.state = ClientStateKind::Authenticated;
335 Ok(())
336 }
337
338 pub fn enqueue_connect(&mut self, now: Instant) -> Result<(), Error> {
349 let packet = match self.kind {
350 ProtocolKind::DPlus => {
351 let mut buf = [0u8; 32];
352 let n = dplus::encode_link1(&mut buf)
353 .map_err(dplus::DPlusError::from)
354 .map_err(ProtocolError::DPlus)?;
355 OutboundPacket {
356 dst: self.peer,
357 payload: buf.get(..n).unwrap_or(&[]).to_vec(),
358 not_before: now,
359 }
360 }
361 ProtocolKind::DExtra => {
362 let mut buf = [0u8; 16];
363 let n = dextra::encode_connect_link(
364 &mut buf,
365 &self.callsign,
366 self.reflector_module,
367 self.local_module,
368 )
369 .map_err(dextra::DExtraError::from)
370 .map_err(ProtocolError::DExtra)?;
371 OutboundPacket {
372 dst: self.peer,
373 payload: buf.get(..n).unwrap_or(&[]).to_vec(),
374 not_before: now,
375 }
376 }
377 ProtocolKind::Dcs => {
378 let mut buf = vec![0u8; 600];
379 let reflector_callsign = self.dcs_reflector_callsign();
380 let n = dcs::encode_connect_link(
381 &mut buf,
382 &self.callsign,
383 self.local_module,
384 self.reflector_module,
385 &reflector_callsign,
386 dcs::GatewayType::Dongle,
387 )
388 .map_err(DcsError::from)
389 .map_err(ProtocolError::Dcs)?;
390 buf.truncate(n);
391 OutboundPacket {
392 dst: self.peer,
393 payload: buf,
394 not_before: now,
395 }
396 }
397 };
398 self.outbox.enqueue(packet);
399 self.state = ClientStateKind::Connecting;
400 self.arm_keepalive_inactivity(now);
401 Ok(())
402 }
403
404 pub fn enqueue_disconnect(&mut self, now: Instant) -> Result<(), Error> {
411 let packet = match self.kind {
412 ProtocolKind::DPlus => {
413 let mut buf = [0u8; 16];
414 let n = dplus::encode_unlink(&mut buf)
415 .map_err(dplus::DPlusError::from)
416 .map_err(ProtocolError::DPlus)?;
417 OutboundPacket {
418 dst: self.peer,
419 payload: buf.get(..n).unwrap_or(&[]).to_vec(),
420 not_before: now,
421 }
422 }
423 ProtocolKind::DExtra => {
424 let mut buf = [0u8; 16];
425 let n = dextra::encode_unlink(&mut buf, &self.callsign, self.local_module)
426 .map_err(dextra::DExtraError::from)
427 .map_err(ProtocolError::DExtra)?;
428 OutboundPacket {
429 dst: self.peer,
430 payload: buf.get(..n).unwrap_or(&[]).to_vec(),
431 not_before: now,
432 }
433 }
434 ProtocolKind::Dcs => {
435 let mut buf = [0u8; 32];
436 let reflector_callsign = self.dcs_reflector_callsign();
437 let n = dcs::encode_connect_unlink(
438 &mut buf,
439 &self.callsign,
440 self.local_module,
441 &reflector_callsign,
442 )
443 .map_err(DcsError::from)
444 .map_err(ProtocolError::Dcs)?;
445 OutboundPacket {
446 dst: self.peer,
447 payload: buf.get(..n).unwrap_or(&[]).to_vec(),
448 not_before: now,
449 }
450 }
451 };
452 self.outbox.enqueue(packet);
453 self.state = ClientStateKind::Disconnecting;
454 self.arm_disconnect_deadline(now);
455 Ok(())
456 }
457
458 pub fn enqueue_send_header(
483 &mut self,
484 now: Instant,
485 header: &DStarHeader,
486 stream_id: StreamId,
487 ) -> Result<(), Error> {
488 if self.state != ClientStateKind::Connected {
489 return Err(Error::State(StateError::WrongState {
490 operation: "enqueue_send_header",
491 state: self.state,
492 protocol: self.kind,
493 }));
494 }
495 self.cached_tx_header = Some(*header);
496 let mut buf = [0u8; 256];
497 let n = match self.kind {
498 ProtocolKind::DPlus => dplus::encode_voice_header(&mut buf, stream_id, header)
499 .map_err(dplus::DPlusError::from)
500 .map_err(ProtocolError::DPlus)?,
501 ProtocolKind::DExtra => dextra::encode_voice_header(&mut buf, stream_id, header)
502 .map_err(dextra::DExtraError::from)
503 .map_err(ProtocolError::DExtra)?,
504 ProtocolKind::Dcs => {
505 let silence = VoiceFrame::silence();
509 dcs::encode_voice(&mut buf, header, stream_id, 0, &silence, false)
510 .map_err(DcsError::from)
511 .map_err(ProtocolError::Dcs)?
512 }
513 };
514 self.outbox.enqueue(OutboundPacket {
515 dst: self.peer,
516 payload: buf.get(..n).unwrap_or(&[]).to_vec(),
517 not_before: now,
518 });
519 Ok(())
520 }
521
522 pub fn enqueue_send_voice(
541 &mut self,
542 now: Instant,
543 stream_id: StreamId,
544 seq: u8,
545 frame: &VoiceFrame,
546 ) -> Result<(), Error> {
547 if self.state != ClientStateKind::Connected {
548 return Err(Error::State(StateError::WrongState {
549 operation: "enqueue_send_voice",
550 state: self.state,
551 protocol: self.kind,
552 }));
553 }
554 let mut buf = [0u8; 256];
555 let n = match self.kind {
556 ProtocolKind::DPlus => dplus::encode_voice_data(&mut buf, stream_id, seq, frame)
557 .map_err(dplus::DPlusError::from)
558 .map_err(ProtocolError::DPlus)?,
559 ProtocolKind::DExtra => dextra::encode_voice_data(&mut buf, stream_id, seq, frame)
560 .map_err(dextra::DExtraError::from)
561 .map_err(ProtocolError::DExtra)?,
562 ProtocolKind::Dcs => {
563 let header = self
564 .cached_tx_header
565 .as_ref()
566 .ok_or(Error::Protocol(ProtocolError::Dcs(DcsError::NoTxHeader)))?;
567 dcs::encode_voice(&mut buf, header, stream_id, seq, frame, false)
568 .map_err(DcsError::from)
569 .map_err(ProtocolError::Dcs)?
570 }
571 };
572 self.outbox.enqueue(OutboundPacket {
573 dst: self.peer,
574 payload: buf.get(..n).unwrap_or(&[]).to_vec(),
575 not_before: now,
576 });
577 Ok(())
578 }
579
580 pub fn enqueue_send_eot(
599 &mut self,
600 now: Instant,
601 stream_id: StreamId,
602 seq: u8,
603 ) -> Result<(), Error> {
604 if self.state != ClientStateKind::Connected {
605 return Err(Error::State(StateError::WrongState {
606 operation: "enqueue_send_eot",
607 state: self.state,
608 protocol: self.kind,
609 }));
610 }
611 let mut buf = [0u8; 256];
612 let n = match self.kind {
613 ProtocolKind::DPlus => dplus::encode_voice_eot(&mut buf, stream_id, seq)
614 .map_err(dplus::DPlusError::from)
615 .map_err(ProtocolError::DPlus)?,
616 ProtocolKind::DExtra => dextra::encode_voice_eot(&mut buf, stream_id, seq)
617 .map_err(dextra::DExtraError::from)
618 .map_err(ProtocolError::DExtra)?,
619 ProtocolKind::Dcs => {
620 let header = self
621 .cached_tx_header
622 .as_ref()
623 .ok_or(Error::Protocol(ProtocolError::Dcs(DcsError::NoTxHeader)))?;
624 let silence = VoiceFrame::silence();
625 dcs::encode_voice(&mut buf, header, stream_id, seq, &silence, true)
626 .map_err(DcsError::from)
627 .map_err(ProtocolError::Dcs)?
628 }
629 };
630 self.outbox.enqueue(OutboundPacket {
631 dst: self.peer,
632 payload: buf.get(..n).unwrap_or(&[]).to_vec(),
633 not_before: now,
634 });
635 Ok(())
636 }
637
638 pub fn handle_input(
656 &mut self,
657 now: Instant,
658 peer: SocketAddr,
659 bytes: &[u8],
660 ) -> Result<(), Error> {
661 match self.kind {
662 ProtocolKind::DPlus => self.handle_dplus_input(now, peer, bytes),
663 ProtocolKind::DExtra => self.handle_dextra_input(now, peer, bytes),
664 ProtocolKind::Dcs => self.handle_dcs_input(now, peer, bytes),
665 }
666 }
667
668 fn handle_dplus_input(
669 &mut self,
670 now: Instant,
671 peer: SocketAddr,
672 bytes: &[u8],
673 ) -> Result<(), Error> {
674 let pkt = match dplus::decode_server_to_client(bytes, &mut self.diagnostics) {
682 Ok(pkt) => pkt,
683 Err(e) => {
684 tracing::debug!(
685 target: "dstar_gateway_core::codec",
686 error = %e,
687 peer = %peer,
688 bytes_len = bytes.len(),
689 "DPlus decoder rejected datagram; dropping (session stays alive)"
690 );
691 return Ok(());
692 }
693 };
694 match pkt {
695 dplus::ServerPacket::Link1Ack => {
696 if self.state == ClientStateKind::Connecting {
697 let mut buf = [0u8; 32];
700 let n = dplus::encode_link2(&mut buf, &self.callsign)
701 .map_err(dplus::DPlusError::from)
702 .map_err(ProtocolError::DPlus)?;
703 self.outbox.enqueue(OutboundPacket {
704 dst: self.peer,
705 payload: buf.get(..n).unwrap_or(&[]).to_vec(),
706 not_before: now,
707 });
708 self.arm_keepalive_inactivity(now);
709 } else if self.state == ClientStateKind::Disconnecting {
710 self.finalize_disconnect(DisconnectReason::UnlinkAcked);
712 }
713 Ok(())
714 }
715 dplus::ServerPacket::Link2Reply { result } => {
716 if self.state == ClientStateKind::Connecting {
717 match result {
718 dplus::Link2Result::Accept => {
719 self.finalize_connected(peer, now);
720 }
721 dplus::Link2Result::Busy | dplus::Link2Result::Unknown { .. } => {
722 self.finalize_rejected();
723 }
724 }
725 }
726 Ok(())
727 }
728 dplus::ServerPacket::Link2Echo { .. } => {
729 if self.state == ClientStateKind::Connecting {
732 self.finalize_connected(peer, now);
733 }
734 Ok(())
735 }
736 dplus::ServerPacket::UnlinkAck => {
737 if self.state == ClientStateKind::Disconnecting {
738 self.finalize_disconnect(DisconnectReason::UnlinkAcked);
739 }
740 Ok(())
741 }
742 dplus::ServerPacket::PollEcho => {
743 self.arm_keepalive_inactivity(now);
744 self.events.push_back(RawEvent::PollEcho { peer });
745 Ok(())
746 }
747 dplus::ServerPacket::VoiceHeader { stream_id, header } => {
748 self.arm_keepalive_inactivity(now);
749 self.events
750 .push_back(RawEvent::VoiceStart { stream_id, header });
751 Ok(())
752 }
753 dplus::ServerPacket::VoiceData {
754 stream_id,
755 seq,
756 frame,
757 } => {
758 self.arm_keepalive_inactivity(now);
759 self.events.push_back(RawEvent::VoiceFrame {
760 stream_id,
761 seq,
762 frame,
763 });
764 Ok(())
765 }
766 dplus::ServerPacket::VoiceEot { stream_id, seq: _ } => {
767 self.arm_keepalive_inactivity(now);
768 self.events.push_back(RawEvent::VoiceEnd {
769 stream_id,
770 reason: VoiceEndReason::Eot,
771 });
772 Ok(())
773 }
774 }
775 }
776
777 #[expect(
778 clippy::unnecessary_wraps,
779 reason = "uniform signature with handle_dplus_input / handle_dcs_input; top-level dispatcher returns Result<(), Error>"
780 )]
781 fn handle_dextra_input(
782 &mut self,
783 now: Instant,
784 peer: SocketAddr,
785 bytes: &[u8],
786 ) -> Result<(), Error> {
787 let pkt = match dextra::decode_server_to_client(bytes, &mut self.diagnostics) {
790 Ok(pkt) => pkt,
791 Err(e) => {
792 tracing::debug!(
793 target: "dstar_gateway_core::codec",
794 error = %e,
795 peer = %peer,
796 bytes_len = bytes.len(),
797 "DExtra decoder rejected datagram; dropping (session stays alive)"
798 );
799 return Ok(());
800 }
801 };
802 match pkt {
803 dextra::ServerPacket::ConnectAck { .. } => {
804 if self.state == ClientStateKind::Connecting {
805 self.finalize_connected(peer, now);
806 }
807 Ok(())
808 }
809 dextra::ServerPacket::ConnectNak { .. } => {
810 if self.state == ClientStateKind::Connecting {
811 self.finalize_rejected();
812 } else if self.state == ClientStateKind::Disconnecting {
813 self.finalize_disconnect(DisconnectReason::UnlinkAcked);
816 }
817 Ok(())
818 }
819 dextra::ServerPacket::PollEcho { .. } => {
820 self.arm_keepalive_inactivity(now);
821 self.events.push_back(RawEvent::PollEcho { peer });
822 Ok(())
823 }
824 dextra::ServerPacket::VoiceHeader { stream_id, header } => {
825 self.arm_keepalive_inactivity(now);
826 self.events
827 .push_back(RawEvent::VoiceStart { stream_id, header });
828 Ok(())
829 }
830 dextra::ServerPacket::VoiceData {
831 stream_id,
832 seq,
833 frame,
834 } => {
835 self.arm_keepalive_inactivity(now);
836 self.events.push_back(RawEvent::VoiceFrame {
837 stream_id,
838 seq,
839 frame,
840 });
841 Ok(())
842 }
843 dextra::ServerPacket::VoiceEot { stream_id, seq: _ } => {
844 self.arm_keepalive_inactivity(now);
845 self.events.push_back(RawEvent::VoiceEnd {
846 stream_id,
847 reason: VoiceEndReason::Eot,
848 });
849 Ok(())
850 }
851 }
852 }
853
854 #[expect(
855 clippy::unnecessary_wraps,
856 reason = "uniform signature with handle_dplus_input / handle_dextra_input; top-level dispatcher returns Result<(), Error>"
857 )]
858 fn handle_dcs_input(
859 &mut self,
860 now: Instant,
861 peer: SocketAddr,
862 bytes: &[u8],
863 ) -> Result<(), Error> {
864 let pkt = match dcs::decode_server_to_client(bytes, &mut self.diagnostics) {
867 Ok(pkt) => pkt,
868 Err(e) => {
869 tracing::debug!(
870 target: "dstar_gateway_core::codec",
871 error = %e,
872 peer = %peer,
873 bytes_len = bytes.len(),
874 "DCS decoder rejected datagram; dropping (session stays alive)"
875 );
876 return Ok(());
877 }
878 };
879 match pkt {
880 dcs::ServerPacket::ConnectAck { .. } => {
881 if self.state == ClientStateKind::Connecting {
882 self.finalize_connected(peer, now);
883 } else if self.state == ClientStateKind::Disconnecting {
884 self.finalize_disconnect(DisconnectReason::UnlinkAcked);
885 }
886 Ok(())
887 }
888 dcs::ServerPacket::ConnectNak { .. } => {
889 if self.state == ClientStateKind::Connecting {
890 self.finalize_rejected();
891 }
892 Ok(())
893 }
894 dcs::ServerPacket::PollEcho { .. } => {
895 self.arm_keepalive_inactivity(now);
896 self.events.push_back(RawEvent::PollEcho { peer });
897 Ok(())
898 }
899 dcs::ServerPacket::Voice {
900 header,
901 stream_id,
902 seq,
903 frame,
904 is_end,
905 } => {
906 self.arm_keepalive_inactivity(now);
907 if is_end {
908 self.events.push_back(RawEvent::VoiceEnd {
909 stream_id,
910 reason: VoiceEndReason::Eot,
911 });
912 } else if seq == 0 {
913 self.events
918 .push_back(RawEvent::VoiceStart { stream_id, header });
919 } else {
920 self.events.push_back(RawEvent::VoiceFrame {
921 stream_id,
922 seq,
923 frame,
924 });
925 }
926 Ok(())
927 }
928 }
929 }
930
931 pub fn handle_timeout(&mut self, now: Instant) {
946 if self.state == ClientStateKind::Connected && self.timers.is_expired(TIMER_KEEPALIVE, now)
947 {
948 self.enqueue_poll(now);
949 self.timers
950 .set(TIMER_KEEPALIVE, now + self.keepalive_interval());
951 }
952
953 if (self.state == ClientStateKind::Connecting || self.state == ClientStateKind::Connected)
954 && self.timers.is_expired(TIMER_KEEPALIVE_INACTIVITY, now)
955 {
956 self.finalize_disconnect(DisconnectReason::KeepaliveInactivity);
957 return;
958 }
959
960 if self.state == ClientStateKind::Disconnecting
961 && self.timers.is_expired(TIMER_DISCONNECT_DEADLINE, now)
962 {
963 self.finalize_disconnect(DisconnectReason::DisconnectTimeout);
964 }
965 }
966
967 #[must_use]
976 pub fn pop_transmit(&mut self, now: Instant) -> Option<Transmit<'_>> {
977 let next = self.outbox.pop_ready(now)?;
978 self.current_tx = Some(next);
979 let held = self.current_tx.as_ref()?;
980 Some(Transmit {
981 dst: held.dst,
982 payload: held.payload.as_slice(),
983 })
984 }
985
986 pub fn pop_event<P: Protocol>(&mut self) -> Option<Event<P>> {
991 let raw = self.events.pop_front()?;
992 Some(match raw {
993 RawEvent::Connected { peer } => Event::Connected { peer },
994 RawEvent::Disconnected { reason } => Event::Disconnected { reason },
995 RawEvent::PollEcho { peer } => Event::PollEcho { peer },
996 RawEvent::VoiceStart { stream_id, header } => Event::VoiceStart {
997 stream_id,
998 header,
999 diagnostics: Vec::new(),
1002 },
1003 RawEvent::VoiceFrame {
1004 stream_id,
1005 seq,
1006 frame,
1007 } => Event::VoiceFrame {
1008 stream_id,
1009 seq,
1010 frame,
1011 },
1012 RawEvent::VoiceEnd { stream_id, reason } => Event::VoiceEnd { stream_id, reason },
1013 })
1014 }
1015
1016 #[must_use]
1021 pub fn next_deadline(&self) -> Option<Instant> {
1022 match (
1023 self.outbox.peek_next_deadline(),
1024 self.timers.next_deadline(),
1025 ) {
1026 (None, None) => None,
1027 (Some(o), None) => Some(o),
1028 (None, Some(t)) => Some(t),
1029 (Some(o), Some(t)) => Some(o.min(t)),
1030 }
1031 }
1032
1033 fn enqueue_poll(&mut self, now: Instant) {
1043 let encoded: Option<Vec<u8>> = match self.kind {
1044 ProtocolKind::DPlus => {
1045 let mut buf = [0u8; 8];
1046 dplus::encode_poll(&mut buf)
1047 .ok()
1048 .and_then(|n| buf.get(..n).map(<[u8]>::to_vec))
1049 }
1050 ProtocolKind::DExtra => {
1051 let mut buf = [0u8; 16];
1052 dextra::encode_poll(&mut buf, &self.callsign)
1053 .ok()
1054 .and_then(|n| buf.get(..n).map(<[u8]>::to_vec))
1055 }
1056 ProtocolKind::Dcs => {
1057 let mut buf = [0u8; 32];
1058 let reflector_callsign = self.dcs_reflector_callsign();
1059 dcs::encode_poll_request(&mut buf, &self.callsign, &reflector_callsign)
1060 .ok()
1061 .and_then(|n| buf.get(..n).map(<[u8]>::to_vec))
1062 }
1063 };
1064 if let Some(payload) = encoded {
1065 self.outbox.enqueue(OutboundPacket {
1066 dst: self.peer,
1067 payload,
1068 not_before: now,
1069 });
1070 }
1071 }
1072
1073 fn finalize_connected(&mut self, peer: SocketAddr, now: Instant) {
1075 self.state = ClientStateKind::Connected;
1076 self.timers
1077 .set(TIMER_KEEPALIVE, now + self.keepalive_interval());
1078 self.arm_keepalive_inactivity(now);
1079 self.events.push_back(RawEvent::Connected { peer });
1080 }
1081
1082 fn finalize_rejected(&mut self) {
1084 self.state = ClientStateKind::Closed;
1085 self.clear_timers();
1086 self.events.push_back(RawEvent::Disconnected {
1087 reason: DisconnectReason::Rejected,
1088 });
1089 }
1090
1091 fn finalize_disconnect(&mut self, reason: DisconnectReason) {
1093 self.state = ClientStateKind::Closed;
1094 self.clear_timers();
1095 self.events.push_back(RawEvent::Disconnected { reason });
1096 }
1097
1098 fn clear_timers(&mut self) {
1099 self.timers.clear(TIMER_KEEPALIVE);
1100 self.timers.clear(TIMER_KEEPALIVE_INACTIVITY);
1101 self.timers.clear(TIMER_DISCONNECT_DEADLINE);
1102 }
1103
1104 fn arm_keepalive_inactivity(&mut self, now: Instant) {
1105 self.timers.set(
1106 TIMER_KEEPALIVE_INACTIVITY,
1107 now + self.keepalive_inactivity_timeout(),
1108 );
1109 }
1110
1111 fn arm_disconnect_deadline(&mut self, now: Instant) {
1112 self.timers
1113 .set(TIMER_DISCONNECT_DEADLINE, now + self.disconnect_timeout());
1114 }
1115
1116 const fn keepalive_interval(&self) -> Duration {
1117 match self.kind {
1118 ProtocolKind::DPlus => dplus::consts::KEEPALIVE_INTERVAL,
1119 ProtocolKind::DExtra => dextra::consts::KEEPALIVE_INTERVAL,
1120 ProtocolKind::Dcs => dcs::consts::KEEPALIVE_INTERVAL,
1121 }
1122 }
1123
1124 const fn keepalive_inactivity_timeout(&self) -> Duration {
1125 match self.kind {
1126 ProtocolKind::DPlus => dplus::consts::KEEPALIVE_INACTIVITY_TIMEOUT,
1127 ProtocolKind::DExtra => dextra::consts::KEEPALIVE_INACTIVITY_TIMEOUT,
1128 ProtocolKind::Dcs => dcs::consts::KEEPALIVE_INACTIVITY_TIMEOUT,
1129 }
1130 }
1131
1132 const fn disconnect_timeout(&self) -> Duration {
1133 match self.kind {
1134 ProtocolKind::DPlus => dplus::consts::DISCONNECT_TIMEOUT,
1135 ProtocolKind::DExtra => dextra::consts::DISCONNECT_TIMEOUT,
1136 ProtocolKind::Dcs => dcs::consts::DISCONNECT_TIMEOUT,
1137 }
1138 }
1139
1140 fn dcs_reflector_callsign(&self) -> Callsign {
1150 self.reflector_callsign
1151 .unwrap_or_else(|| Callsign::from_wire_bytes(*b"DCS001 "))
1152 }
1153}
1154
1155#[cfg(test)]
1156mod tests {
1157 use super::*;
1158 use crate::codec::dplus::HostList;
1159 use crate::codec::{dcs as dcs_codec, dextra as dextra_codec, dplus as dplus_codec};
1160 use crate::session::client::protocol::{DExtra, DPlus, Dcs};
1161 use std::net::{IpAddr, Ipv4Addr};
1162
1163 const fn cs() -> Callsign {
1164 Callsign::from_wire_bytes(*b"W1AW ")
1165 }
1166
1167 const ADDR_DEXTRA: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 30001);
1168 const ADDR_DPLUS: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 20001);
1169 const ADDR_DCS: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 30051);
1170
1171 fn new_dextra() -> SessionCore {
1172 SessionCore::new(
1173 ProtocolKind::DExtra,
1174 cs(),
1175 Module::B,
1176 Module::C,
1177 ADDR_DEXTRA,
1178 )
1179 }
1180
1181 fn new_dplus() -> SessionCore {
1182 SessionCore::new(ProtocolKind::DPlus, cs(), Module::B, Module::C, ADDR_DPLUS)
1183 }
1184
1185 fn new_dcs() -> SessionCore {
1186 SessionCore::new(ProtocolKind::Dcs, cs(), Module::B, Module::C, ADDR_DCS)
1187 }
1188
1189 type TestResult = Result<(), Box<dyn std::error::Error>>;
1190
1191 fn connected_dextra() -> Result<(SessionCore, Instant), Box<dyn std::error::Error>> {
1195 let t0 = Instant::now();
1196 let mut core = new_dextra();
1197 core.enqueue_connect(t0)?;
1198 let _ = core.pop_transmit(t0).ok_or("no link")?;
1199 let mut ack = [0u8; 16];
1200 let n = dextra_codec::encode_connect_ack(&mut ack, &cs(), Module::C)?;
1201 core.handle_input(t0, ADDR_DEXTRA, ack.get(..n).ok_or("n > buf")?)?;
1202 drop(core.pop_event::<DExtra>().ok_or("no connected event")?);
1203 Ok((core, t0))
1204 }
1205
1206 fn connected_dplus() -> Result<(SessionCore, Instant), Box<dyn std::error::Error>> {
1208 let t0 = Instant::now();
1209 let mut core = new_dplus();
1210 core.attach_host_list(HostList::new())?;
1211 core.enqueue_connect(t0)?;
1212 let _ = core.pop_transmit(t0).ok_or("no link1")?;
1213 let mut ack = [0u8; 8];
1214 let n = dplus_codec::encode_link1_ack(&mut ack)?;
1215 core.handle_input(t0, ADDR_DPLUS, ack.get(..n).ok_or("n > buf")?)?;
1216 let _ = core.pop_transmit(t0).ok_or("no link2")?;
1217 let mut reply = [0u8; 16];
1218 let n = dplus_codec::encode_link2_reply(&mut reply, dplus_codec::Link2Result::Accept)?;
1219 core.handle_input(t0, ADDR_DPLUS, reply.get(..n).ok_or("n > buf")?)?;
1220 drop(core.pop_event::<DPlus>().ok_or("no connected event")?);
1221 Ok((core, t0))
1222 }
1223
1224 fn connected_dcs() -> Result<(SessionCore, Instant), Box<dyn std::error::Error>> {
1226 let t0 = Instant::now();
1227 let mut core = new_dcs();
1228 core.enqueue_connect(t0)?;
1229 let _ = core.pop_transmit(t0).ok_or("no link")?;
1230 let mut ack = [0u8; 16];
1231 let n = dcs_codec::encode_connect_ack(
1232 &mut ack,
1233 &Callsign::from_wire_bytes(*b"DCS001 "),
1234 Module::C,
1235 )?;
1236 core.handle_input(t0, ADDR_DCS, ack.get(..n).ok_or("n > buf")?)?;
1237 drop(core.pop_event::<Dcs>().ok_or("no connected event")?);
1238 Ok((core, t0))
1239 }
1240
1241 #[test]
1244 fn dextra_connect_success() -> TestResult {
1245 let mut core = new_dextra();
1246 assert_eq!(core.state_kind(), ClientStateKind::Configured);
1247 let t0 = Instant::now();
1248 core.enqueue_connect(t0)?;
1249 assert_eq!(core.state_kind(), ClientStateKind::Connecting);
1250
1251 let tx = core.pop_transmit(t0).ok_or("no link packet in outbox")?;
1253 assert_eq!(tx.payload.len(), 11);
1254 assert_eq!(tx.dst, ADDR_DEXTRA);
1255
1256 let mut ack = [0u8; 16];
1258 let n = dextra_codec::encode_connect_ack(&mut ack, &cs(), Module::C)?;
1259 core.handle_input(
1260 t0,
1261 ADDR_DEXTRA,
1262 ack.get(..n).ok_or("encode returned n > buf")?,
1263 )?;
1264 assert_eq!(core.state_kind(), ClientStateKind::Connected);
1265
1266 let ev = core.pop_event::<DExtra>().ok_or("no Connected event")?;
1268 assert!(matches!(ev, Event::Connected { .. }));
1269 Ok(())
1270 }
1271
1272 #[test]
1273 fn dextra_connect_rejected() -> Result<(), Box<dyn std::error::Error>> {
1274 let mut core = new_dextra();
1275 let t0 = Instant::now();
1276 core.enqueue_connect(t0)?;
1277 let _ = core.pop_transmit(t0).ok_or("no link packet")?;
1278
1279 let mut nak = [0u8; 16];
1280 let n = dextra_codec::encode_connect_nak(&mut nak, &cs(), Module::C)?;
1281 core.handle_input(t0, ADDR_DEXTRA, nak.get(..n).ok_or("n > buf")?)?;
1282 assert_eq!(core.state_kind(), ClientStateKind::Closed);
1283
1284 let ev = core.pop_event::<DExtra>().ok_or("no event")?;
1285 assert!(
1286 matches!(
1287 ev,
1288 Event::Disconnected {
1289 reason: DisconnectReason::Rejected
1290 }
1291 ),
1292 "expected Disconnected(Rejected), got {ev:?}"
1293 );
1294 Ok(())
1295 }
1296
1297 #[test]
1298 fn dextra_keepalive_fires_poll() -> TestResult {
1299 let (mut core, t0) = connected_dextra()?;
1300
1301 let t1 = t0 + dextra_codec::consts::KEEPALIVE_INTERVAL + Duration::from_millis(1);
1302 core.handle_timeout(t1);
1303
1304 let tx = core.pop_transmit(t1).ok_or("no poll packet")?;
1305 assert_eq!(tx.payload.len(), 9);
1306 Ok(())
1307 }
1308
1309 #[test]
1310 fn dextra_keepalive_inactivity_closes() -> TestResult {
1311 let (mut core, t0) = connected_dextra()?;
1312
1313 let t1 = t0 + dextra_codec::consts::KEEPALIVE_INACTIVITY_TIMEOUT + Duration::from_secs(1);
1314 core.handle_timeout(t1);
1315 assert_eq!(core.state_kind(), ClientStateKind::Closed);
1316
1317 let ev = core.pop_event::<DExtra>().ok_or("no event")?;
1318 assert!(
1319 matches!(
1320 ev,
1321 Event::Disconnected {
1322 reason: DisconnectReason::KeepaliveInactivity
1323 }
1324 ),
1325 "expected Disconnected(KeepaliveInactivity), got {ev:?}"
1326 );
1327 Ok(())
1328 }
1329
1330 #[test]
1331 fn dextra_disconnect_success() -> TestResult {
1332 let (mut core, t0) = connected_dextra()?;
1333
1334 core.enqueue_disconnect(t0)?;
1335 assert_eq!(core.state_kind(), ClientStateKind::Disconnecting);
1336
1337 let tx = core.pop_transmit(t0).ok_or("no unlink packet")?;
1338 assert_eq!(tx.payload.len(), 11);
1339
1340 let mut nak = [0u8; 16];
1342 let n = dextra_codec::encode_connect_nak(&mut nak, &cs(), Module::C)?;
1343 core.handle_input(t0, ADDR_DEXTRA, nak.get(..n).ok_or("n > buf")?)?;
1344 assert_eq!(core.state_kind(), ClientStateKind::Closed);
1345
1346 let ev = core.pop_event::<DExtra>().ok_or("no event")?;
1347 assert!(
1348 matches!(
1349 ev,
1350 Event::Disconnected {
1351 reason: DisconnectReason::UnlinkAcked
1352 }
1353 ),
1354 "expected Disconnected(UnlinkAcked), got {ev:?}"
1355 );
1356 Ok(())
1357 }
1358
1359 #[test]
1362 fn dplus_two_step_connect_success() -> TestResult {
1363 let mut core = new_dplus();
1364 assert_eq!(core.state_kind(), ClientStateKind::Configured);
1365
1366 core.attach_host_list(HostList::new())?;
1367 assert_eq!(core.state_kind(), ClientStateKind::Authenticated);
1368 assert!(core.host_list().is_some());
1369
1370 let t0 = Instant::now();
1371 core.enqueue_connect(t0)?;
1372 assert_eq!(core.state_kind(), ClientStateKind::Connecting);
1373
1374 let tx = core.pop_transmit(t0).ok_or("no link1")?;
1375 assert_eq!(tx.payload.len(), 5);
1376
1377 let mut ack = [0u8; 8];
1378 let n = dplus_codec::encode_link1_ack(&mut ack)?;
1379 core.handle_input(t0, ADDR_DPLUS, ack.get(..n).ok_or("n > buf")?)?;
1380 assert_eq!(
1381 core.state_kind(),
1382 ClientStateKind::Connecting,
1383 "still connecting after LINK1 ACK"
1384 );
1385
1386 let tx = core.pop_transmit(t0).ok_or("no link2")?;
1387 assert_eq!(tx.payload.len(), 28);
1388
1389 let mut reply = [0u8; 16];
1390 let n = dplus_codec::encode_link2_reply(&mut reply, dplus_codec::Link2Result::Accept)?;
1391 core.handle_input(t0, ADDR_DPLUS, reply.get(..n).ok_or("n > buf")?)?;
1392 assert_eq!(core.state_kind(), ClientStateKind::Connected);
1393
1394 let ev = core.pop_event::<DPlus>().ok_or("no event")?;
1395 assert!(matches!(ev, Event::Connected { .. }));
1396 Ok(())
1397 }
1398
/// A DPlus LINK2 reply carrying `Link2Result::Busy` must close the session
/// and queue `Event::Disconnected` with `DisconnectReason::Rejected`.
#[test]
fn dplus_rejected_on_busy() -> TestResult {
    let mut session = new_dplus();
    session.attach_host_list(HostList::new())?;
    let start = Instant::now();
    session.enqueue_connect(start)?;
    // Drain the first handshake packet; its contents are not under test here.
    let _ = session.pop_transmit(start).ok_or("no link1")?;

    let mut ack_buf = [0u8; 8];
    let ack_len = dplus_codec::encode_link1_ack(&mut ack_buf)?;
    let ack_bytes = ack_buf.get(..ack_len).ok_or("n > buf")?;
    session.handle_input(start, ADDR_DPLUS, ack_bytes)?;
    // Drain the second handshake packet as well.
    let _ = session.pop_transmit(start).ok_or("no link2")?;

    let mut reply_buf = [0u8; 16];
    let reply_len =
        dplus_codec::encode_link2_reply(&mut reply_buf, dplus_codec::Link2Result::Busy)?;
    let reply_bytes = reply_buf.get(..reply_len).ok_or("n > buf")?;
    session.handle_input(start, ADDR_DPLUS, reply_bytes)?;
    assert_eq!(session.state_kind(), ClientStateKind::Closed);

    let ev = session.pop_event::<DPlus>().ok_or("no event")?;
    let rejected = matches!(
        ev,
        Event::Disconnected {
            reason: DisconnectReason::Rejected
        }
    );
    assert!(rejected, "expected Disconnected(Rejected), got {ev:?}");
    Ok(())
}
1429
/// DCS connect handshake: after the 519-byte link request is answered with a
/// connect ACK, the session must be `Connected` and queue `Event::Connected`.
#[test]
fn dcs_connect_success() -> TestResult {
    let mut core = new_dcs();
    let t0 = Instant::now();
    core.enqueue_connect(t0)?;

    let tx = core.pop_transmit(t0).ok_or("no link packet")?;
    assert_eq!(tx.payload.len(), 519);

    // Callsign literals are written as full 8-byte, space-padded fields so the
    // array length matches the other `Callsign::from_wire_bytes` calls in this
    // module.
    let reflector = Callsign::from_wire_bytes(*b"DCS001  ");
    let mut ack = [0u8; 16];
    let n = dcs_codec::encode_connect_ack(&mut ack, &reflector, Module::C)?;
    core.handle_input(t0, ADDR_DCS, ack.get(..n).ok_or("n > buf")?)?;
    assert_eq!(core.state_kind(), ClientStateKind::Connected);

    let ev = core.pop_event::<Dcs>().ok_or("no event")?;
    assert!(
        matches!(ev, Event::Connected { .. }),
        "expected Connected, got {ev:?}"
    );
    Ok(())
}
1454
/// A DCS connect NAK must close the session and queue `Event::Disconnected`
/// with `DisconnectReason::Rejected`.
#[test]
fn dcs_rejected_on_nak() -> TestResult {
    let mut core = new_dcs();
    let t0 = Instant::now();
    core.enqueue_connect(t0)?;
    // Drain the link request; only the reaction to the NAK is under test.
    let _ = core.pop_transmit(t0).ok_or("no link")?;

    // Callsign literals are written as full 8-byte, space-padded fields so the
    // array length matches the other `Callsign::from_wire_bytes` calls in this
    // module.
    let reflector = Callsign::from_wire_bytes(*b"DCS001  ");
    let mut nak = [0u8; 16];
    let n = dcs_codec::encode_connect_nak(&mut nak, &reflector, Module::C)?;
    core.handle_input(t0, ADDR_DCS, nak.get(..n).ok_or("n > buf")?)?;
    assert_eq!(core.state_kind(), ClientStateKind::Closed);

    let ev = core.pop_event::<Dcs>().ok_or("no event")?;
    assert!(
        matches!(
            ev,
            Event::Disconnected {
                reason: DisconnectReason::Rejected
            }
        ),
        "expected Disconnected(Rejected), got {ev:?}"
    );
    Ok(())
}
1483
/// Two packets placed directly in the outbox must come out of `pop_transmit`
/// one per call, in enqueue order, after which `pop_transmit` yields `None`.
#[test]
fn pop_transmit_holds_current_tx_across_calls() -> TestResult {
    let mut session = new_dextra();
    let t0 = Instant::now();

    let first_packet = OutboundPacket {
        dst: ADDR_DEXTRA,
        payload: vec![1, 2, 3],
        not_before: t0,
    };
    session.outbox.enqueue(first_packet);
    let second_packet = OutboundPacket {
        dst: ADDR_DEXTRA,
        payload: vec![4, 5, 6],
        not_before: t0 + Duration::from_millis(1),
    };
    session.outbox.enqueue(second_packet);

    // Each popped transmit lives in its own scope so it is gone before the
    // next call to `pop_transmit`.
    {
        let tx = session
            .pop_transmit(t0 + Duration::from_secs(1))
            .ok_or("no tx1")?;
        assert_eq!(tx.payload, &[1, 2, 3]);
    }
    {
        let tx = session
            .pop_transmit(t0 + Duration::from_secs(1))
            .ok_or("no tx2")?;
        assert_eq!(tx.payload, &[4, 5, 6]);
    }
    let drained = session.pop_transmit(t0 + Duration::from_secs(1));
    assert!(drained.is_none());
    Ok(())
}
1517
/// With a keepalive timer at `t0 + 5s` and an outbox packet that becomes due
/// at `t0 + 2s`, `next_deadline` must report the packet's `t0 + 2s`.
#[test]
fn next_deadline_combines_sources() -> TestResult {
    let mut session = new_dextra();
    let t0 = Instant::now();

    let timer_due = t0 + Duration::from_secs(5);
    session.timers.set(TIMER_KEEPALIVE, timer_due);

    let pending = OutboundPacket {
        dst: ADDR_DEXTRA,
        payload: vec![],
        not_before: t0 + Duration::from_secs(2),
    };
    session.outbox.enqueue(pending);

    let dl = session.next_deadline().ok_or("no deadline")?;
    assert_eq!(dl, t0 + Duration::from_secs(2));
    Ok(())
}
1534
/// A freshly built DExtra core has no deadline to report.
#[test]
fn next_deadline_none_when_idle() {
    let idle = new_dextra();
    let deadline = idle.next_deadline();
    assert!(deadline.is_none());
}
1540
/// Draining diagnostics from a freshly built DExtra core yields nothing.
#[test]
fn drain_diagnostics_is_empty_on_fresh_core() {
    let mut fresh = new_dextra();
    let drained = fresh.drain_diagnostics();
    assert!(drained.is_empty());
}
1548
/// `attach_host_list` on a DExtra core must fail with `StateError::WrongState`.
#[test]
fn attach_host_list_rejects_non_dplus() {
    let mut session = new_dextra();
    let hosts = HostList::new();
    let result = session.attach_host_list(hosts);
    let wrong_state = matches!(result, Err(Error::State(StateError::WrongState { .. })));
    assert!(wrong_state, "DExtra must reject host list, got {result:?}");
}
1560
/// Attaching a host list a second time to the same DPlus core must fail with
/// `StateError::WrongState`; the first attach is expected to succeed.
#[test]
fn attach_host_list_rejects_wrong_state() -> TestResult {
    let mut session = new_dplus();
    // First attach succeeds (propagated with `?` otherwise).
    session.attach_host_list(HostList::new())?;

    let result = session.attach_host_list(HostList::new());
    let wrong_state = matches!(result, Err(Error::State(StateError::WrongState { .. })));
    assert!(wrong_state, "second attach must fail, got {result:?}");
    Ok(())
}
1572
/// Popping an event from a freshly built DExtra core yields `None`.
#[test]
fn event_queue_empty_returns_none() {
    let mut fresh = new_dextra();
    let popped = fresh.pop_event::<DExtra>();
    assert!(popped.is_none());
}
1578
1579 use crate::error::DcsError;
1582 use crate::header::DStarHeader;
1583 use crate::types::{StreamId, Suffix};
1584 use crate::voice::VoiceFrame;
1585
/// Test shorthand: wraps a raw `u16` in a `StreamId`.
///
/// Panics if `StreamId::new` does not produce a value for `n`. Per the lint
/// reason below, callers are expected to pass a non-zero `n`.
#[expect(clippy::unwrap_used, reason = "const-validated: n is non-zero")]
const fn sid(n: u16) -> StreamId {
    StreamId::new(n).unwrap()
}
1592
1593 const fn test_header() -> DStarHeader {
1594 DStarHeader {
1595 flag1: 0,
1596 flag2: 0,
1597 flag3: 0,
1598 rpt2: Callsign::from_wire_bytes(*b"XRF030 G"),
1599 rpt1: Callsign::from_wire_bytes(*b"XRF030 C"),
1600 ur_call: Callsign::from_wire_bytes(*b"CQCQCQ "),
1601 my_call: Callsign::from_wire_bytes(*b"W1AW "),
1602 my_suffix: Suffix::from_wire_bytes(*b"D75 "),
1603 }
1604 }
1605
/// On a connected DExtra session, `enqueue_send_header` must queue a 56-byte
/// packet.
#[test]
fn dextra_connected_enqueue_send_header_produces_56_bytes() -> TestResult {
    let (mut session, _) = connected_dextra()?;
    let ts = Instant::now();
    let header = test_header();
    session.enqueue_send_header(ts, &header, sid(0x1234))?;

    let queued = session.pop_transmit(ts).ok_or("no header tx")?;
    let wire_len = queued.payload.len();
    assert_eq!(wire_len, 56, "DExtra voice header is 56 bytes");
    Ok(())
}
1617
/// On a connected DPlus session, `enqueue_send_header` must queue a 58-byte
/// packet.
#[test]
fn dplus_connected_enqueue_send_header_produces_58_bytes() -> TestResult {
    let (mut session, _) = connected_dplus()?;
    let ts = Instant::now();
    let header = test_header();
    session.enqueue_send_header(ts, &header, sid(0x1234))?;

    let queued = session.pop_transmit(ts).ok_or("no header tx")?;
    let wire_len = queued.payload.len();
    assert_eq!(wire_len, 58, "DPlus voice header is 58 bytes");
    Ok(())
}
1627
/// On a connected DCS session, `enqueue_send_header` must queue a 100-byte
/// packet.
#[test]
fn dcs_connected_enqueue_send_header_produces_100_bytes() -> TestResult {
    let (mut session, _) = connected_dcs()?;
    let ts = Instant::now();
    let header = test_header();
    session.enqueue_send_header(ts, &header, sid(0x1234))?;

    let queued = session.pop_transmit(ts).ok_or("no header tx")?;
    let wire_len = queued.payload.len();
    assert_eq!(wire_len, 100, "DCS voice frame is always 100 bytes");
    Ok(())
}
1637
/// On a connected DPlus session, `enqueue_send_voice` must queue a 29-byte
/// packet.
#[test]
fn dplus_connected_enqueue_send_voice_produces_29_bytes() -> TestResult {
    let (mut session, _) = connected_dplus()?;
    let ts = Instant::now();
    let silence = VoiceFrame::silence();
    session.enqueue_send_voice(ts, sid(0x1234), 5, &silence)?;

    let queued = session.pop_transmit(ts).ok_or("no voice tx")?;
    let wire_len = queued.payload.len();
    assert_eq!(wire_len, 29, "DPlus voice data is 29 bytes");
    Ok(())
}
1649
/// On a connected DExtra session, `enqueue_send_voice` must queue a 27-byte
/// packet.
#[test]
fn dextra_connected_enqueue_send_voice_produces_27_bytes() -> TestResult {
    let (mut session, _) = connected_dextra()?;
    let ts = Instant::now();
    let silence = VoiceFrame::silence();
    session.enqueue_send_voice(ts, sid(0x1234), 5, &silence)?;

    let queued = session.pop_transmit(ts).ok_or("no voice tx")?;
    let wire_len = queued.payload.len();
    assert_eq!(wire_len, 27, "DExtra voice data is 27 bytes");
    Ok(())
}
1659
/// On a connected DPlus session, `enqueue_send_eot` must queue a 32-byte
/// packet.
#[test]
fn dplus_connected_enqueue_send_eot_produces_32_bytes() -> TestResult {
    let (mut session, _) = connected_dplus()?;
    let ts = Instant::now();
    session.enqueue_send_eot(ts, sid(0x1234), 21)?;

    let queued = session.pop_transmit(ts).ok_or("no eot tx")?;
    let wire_len = queued.payload.len();
    assert_eq!(wire_len, 32, "DPlus voice EOT is 32 bytes");
    Ok(())
}
1671
/// On a connected DExtra session, `enqueue_send_eot` must queue a 27-byte
/// packet.
#[test]
fn dextra_connected_enqueue_send_eot_produces_27_bytes() -> TestResult {
    let (mut session, _) = connected_dextra()?;
    let ts = Instant::now();
    session.enqueue_send_eot(ts, sid(0x1234), 21)?;

    let queued = session.pop_transmit(ts).ok_or("no eot tx")?;
    let wire_len = queued.payload.len();
    assert_eq!(wire_len, 27, "DExtra voice EOT is 27 bytes");
    Ok(())
}
1681
/// On a connected DCS session, sending voice before any header has been sent
/// must fail with `DcsError::NoTxHeader`.
#[test]
fn dcs_send_voice_without_header_errors() -> TestResult {
    let (mut session, _) = connected_dcs()?;
    let ts = Instant::now();
    let silence = VoiceFrame::silence();
    let result = session.enqueue_send_voice(ts, sid(0x1234), 1, &silence);

    let missing_header = matches!(
        result,
        Err(Error::Protocol(ProtocolError::Dcs(DcsError::NoTxHeader)))
    );
    assert!(missing_header, "expected NoTxHeader, got {result:?}");
    Ok(())
}
1698
/// On a connected DCS session, sending EOT before any header has been sent
/// must fail with `DcsError::NoTxHeader`.
#[test]
fn dcs_send_eot_without_header_errors() -> TestResult {
    let (mut session, _) = connected_dcs()?;
    let ts = Instant::now();
    let result = session.enqueue_send_eot(ts, sid(0x1234), 1);

    let missing_header = matches!(
        result,
        Err(Error::Protocol(ProtocolError::Dcs(DcsError::NoTxHeader)))
    );
    assert!(missing_header, "expected NoTxHeader, got {result:?}");
    Ok(())
}
1713
/// On a connected DCS session, once a header has been sent a following voice
/// frame is accepted and goes out as a 100-byte packet.
#[test]
fn dcs_send_voice_after_header_succeeds() -> TestResult {
    let (mut session, _) = connected_dcs()?;
    let ts = Instant::now();
    let stream = sid(0x1234);

    session.enqueue_send_header(ts, &test_header(), stream)?;
    // Drain the header packet so the next pop returns the voice packet.
    let _ = session.pop_transmit(ts).ok_or("no header tx")?;

    session.enqueue_send_voice(ts, stream, 1, &VoiceFrame::silence())?;
    let voice_tx = session.pop_transmit(ts).ok_or("no voice tx")?;
    assert_eq!(voice_tx.payload.len(), 100);
    Ok(())
}
1725
/// On a connected DCS session, once a header has been sent a following EOT is
/// accepted and goes out as a 100-byte packet.
#[test]
fn dcs_send_eot_after_header_succeeds() -> TestResult {
    let (mut session, _) = connected_dcs()?;
    let ts = Instant::now();
    let stream = sid(0x1234);

    session.enqueue_send_header(ts, &test_header(), stream)?;
    // Drain the header packet so the next pop returns the EOT packet.
    let _ = session.pop_transmit(ts).ok_or("no header tx")?;

    session.enqueue_send_eot(ts, stream, 21)?;
    let eot_tx = session.pop_transmit(ts).ok_or("no eot tx")?;
    assert_eq!(eot_tx.payload.len(), 100);
    Ok(())
}
1737
/// `enqueue_send_header` on a freshly built (not yet connected) DExtra core
/// must fail with `StateError::WrongState`.
#[test]
fn enqueue_send_header_in_configured_state_errors() {
    let mut core = new_dextra();
    let now = Instant::now();
    let result = core.enqueue_send_header(now, &test_header(), sid(0x1234));
    // Report the actual result on failure, as the other WrongState tests do.
    assert!(
        matches!(result, Err(Error::State(StateError::WrongState { .. }))),
        "expected WrongState, got {result:?}"
    );
}
1750
/// `enqueue_send_voice` on a freshly built (not yet connected) DExtra core
/// must fail with `StateError::WrongState`.
#[test]
fn enqueue_send_voice_in_configured_state_errors() {
    let mut core = new_dextra();
    let now = Instant::now();
    let result = core.enqueue_send_voice(now, sid(0x1234), 1, &VoiceFrame::silence());
    // Report the actual result on failure, as the other WrongState tests do.
    assert!(
        matches!(result, Err(Error::State(StateError::WrongState { .. }))),
        "expected WrongState, got {result:?}"
    );
}
1761
/// `enqueue_send_eot` on a freshly built (not yet connected) DExtra core must
/// fail with `StateError::WrongState`.
#[test]
fn enqueue_send_eot_in_configured_state_errors() {
    let mut core = new_dextra();
    let now = Instant::now();
    let result = core.enqueue_send_eot(now, sid(0x1234), 1);
    // Report the actual result on failure, as the other WrongState tests do.
    assert!(
        matches!(result, Err(Error::State(StateError::WrongState { .. }))),
        "expected WrongState, got {result:?}"
    );
}
1772
/// Feeding an encoded DExtra voice header into a connected session must queue
/// `Event::VoiceStart` carrying the encoded stream id.
#[test]
fn dextra_handle_voice_header_emits_voice_start() -> TestResult {
    let (mut session, _) = connected_dextra()?;
    let ts = Instant::now();

    let mut wire = [0u8; 64];
    let header = test_header();
    let len = dextra_codec::encode_voice_header(&mut wire, sid(0x1234), &header)?;
    let packet = wire.get(..len).ok_or("n > buf")?;
    session.handle_input(ts, ADDR_DEXTRA, packet)?;

    let event = session.pop_event::<DExtra>().ok_or("no event")?;
    assert!(matches!(event, Event::VoiceStart { stream_id, .. } if stream_id.get() == 0x1234));
    Ok(())
}
1786
/// Feeding an encoded DExtra voice-data packet into a connected session must
/// queue `Event::VoiceFrame` with the encoded stream id and sequence number.
#[test]
fn dextra_handle_voice_data_emits_voice_frame() -> TestResult {
    let (mut session, _) = connected_dextra()?;
    let ts = Instant::now();

    let mut wire = [0u8; 64];
    let silence = VoiceFrame::silence();
    let len = dextra_codec::encode_voice_data(&mut wire, sid(0x1234), 7, &silence)?;
    let packet = wire.get(..len).ok_or("n > buf")?;
    session.handle_input(ts, ADDR_DEXTRA, packet)?;

    let event = session.pop_event::<DExtra>().ok_or("no event")?;
    assert!(
        matches!(event, Event::VoiceFrame { stream_id, seq, .. } if stream_id.get() == 0x1234 && seq == 7)
    );
    Ok(())
}
1800
/// Feeding an encoded DExtra voice EOT into a connected session must queue
/// `Event::VoiceEnd` with the encoded stream id and `VoiceEndReason::Eot`.
#[test]
fn dextra_handle_voice_eot_emits_voice_end() -> TestResult {
    let (mut session, _) = connected_dextra()?;
    let ts = Instant::now();

    let mut wire = [0u8; 64];
    let len = dextra_codec::encode_voice_eot(&mut wire, sid(0x1234), 21)?;
    let packet = wire.get(..len).ok_or("n > buf")?;
    session.handle_input(ts, ADDR_DEXTRA, packet)?;

    let event = session.pop_event::<DExtra>().ok_or("no event")?;
    assert!(
        matches!(event, Event::VoiceEnd { stream_id, reason } if stream_id.get() == 0x1234 && reason == VoiceEndReason::Eot)
    );
    Ok(())
}
1814
/// Feeding an encoded DPlus voice header into a connected session must queue
/// `Event::VoiceStart` carrying the encoded stream id.
#[test]
fn dplus_handle_voice_header_emits_voice_start() -> TestResult {
    let (mut session, _) = connected_dplus()?;
    let ts = Instant::now();

    let mut wire = [0u8; 64];
    let header = test_header();
    let len = dplus_codec::encode_voice_header(&mut wire, sid(0x4567), &header)?;
    let packet = wire.get(..len).ok_or("n > buf")?;
    session.handle_input(ts, ADDR_DPLUS, packet)?;

    let event = session.pop_event::<DPlus>().ok_or("no event")?;
    assert!(matches!(event, Event::VoiceStart { stream_id, .. } if stream_id.get() == 0x4567));
    Ok(())
}
1826
/// Feeding an encoded DPlus voice EOT into a connected session must queue an
/// `Event::VoiceEnd`. Only the variant is checked here; the stream id and end
/// reason are not asserted.
#[test]
fn dplus_handle_voice_eot_emits_voice_end() -> TestResult {
    let (mut session, _) = connected_dplus()?;
    let ts = Instant::now();

    let mut wire = [0u8; 64];
    let len = dplus_codec::encode_voice_eot(&mut wire, sid(0x4567), 21)?;
    let packet = wire.get(..len).ok_or("n > buf")?;
    session.handle_input(ts, ADDR_DPLUS, packet)?;

    let event = session.pop_event::<DPlus>().ok_or("no event")?;
    assert!(matches!(event, Event::VoiceEnd { .. }));
    Ok(())
}
1838
1839 fn non_silence_frame() -> VoiceFrame {
1840 VoiceFrame {
1841 ambe: [0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09],
1842 slow_data: [0xAA, 0xBB, 0xCC],
1843 }
1844 }
1845
/// A DCS voice packet encoded with sequence value 0 and the final flag unset,
/// fed into a connected session, must queue `Event::VoiceStart` carrying the
/// encoded stream id.
#[test]
fn dcs_handle_first_voice_frame_emits_voice_start() -> TestResult {
    let (mut session, _) = connected_dcs()?;
    let ts = Instant::now();

    let header = test_header();
    let voice = non_silence_frame();
    let mut wire = [0u8; 128];
    let len = dcs_codec::encode_voice(&mut wire, &header, sid(0x789A), 0, &voice, false)?;
    let packet = wire.get(..len).ok_or("n > buf")?;
    session.handle_input(ts, ADDR_DCS, packet)?;

    let event = session.pop_event::<Dcs>().ok_or("no event")?;
    assert!(matches!(event, Event::VoiceStart { stream_id, .. } if stream_id.get() == 0x789A));
    Ok(())
}
1864
/// A DCS voice packet encoded with sequence value 5 and the final flag unset,
/// fed into a connected session, must queue `Event::VoiceFrame` with the
/// encoded stream id and `seq == 5`.
#[test]
fn dcs_handle_subsequent_voice_frame_emits_voice_frame() -> TestResult {
    let (mut session, _) = connected_dcs()?;
    let ts = Instant::now();

    let header = test_header();
    let voice = non_silence_frame();
    let mut wire = [0u8; 128];
    let len = dcs_codec::encode_voice(&mut wire, &header, sid(0x789A), 5, &voice, false)?;
    let packet = wire.get(..len).ok_or("n > buf")?;
    session.handle_input(ts, ADDR_DCS, packet)?;

    let event = session.pop_event::<Dcs>().ok_or("no event")?;
    assert!(
        matches!(event, Event::VoiceFrame { stream_id, seq, .. } if stream_id.get() == 0x789A && seq == 5)
    );
    Ok(())
}
1885
/// A DCS voice packet encoded with the final flag set, fed into a connected
/// session, must queue `Event::VoiceEnd` with the encoded stream id and
/// `VoiceEndReason::Eot`.
#[test]
fn dcs_handle_voice_end_emits_voice_end() -> TestResult {
    let (mut session, _) = connected_dcs()?;
    let ts = Instant::now();

    let header = test_header();
    let silence = VoiceFrame::silence();
    let mut wire = [0u8; 128];
    let len = dcs_codec::encode_voice(&mut wire, &header, sid(0x789A), 21, &silence, true)?;
    let packet = wire.get(..len).ok_or("n > buf")?;
    session.handle_input(ts, ADDR_DCS, packet)?;

    let event = session.pop_event::<Dcs>().ok_or("no event")?;
    assert!(
        matches!(event, Event::VoiceEnd { stream_id, reason } if stream_id.get() == 0x789A && reason == VoiceEndReason::Eot)
    );
    Ok(())
}
1906}