1use std::collections::VecDeque;
19use std::net::SocketAddr;
20use std::time::Instant;
21
22use crate::codec::dcs::{
23 self as dcs_codec, ClientPacket as DcsClientPacket,
24 decode_client_to_server as decode_dcs_client_to_server,
25};
26use crate::codec::dextra::{
27 ClientPacket, decode_client_to_server, encode_connect_ack, encode_poll_echo,
28};
29use crate::codec::dplus::{
30 ClientPacket as DPlusClientPacket, Link2Result,
31 decode_client_to_server as decode_dplus_client_to_server, encode_link1_ack, encode_link2_reply,
32 encode_poll_echo as dplus_encode_poll_echo, encode_unlink_ack,
33};
34use crate::error::{Error, ProtocolError};
35use crate::header::DStarHeader;
36use crate::session::client::Protocol;
37use crate::session::driver::Transmit;
38use crate::session::outbox::{OutboundPacket, Outbox};
39use crate::types::{Callsign, Module, ProtocolKind, StreamId};
40use crate::validator::VecSink;
41use crate::voice::VoiceFrame;
42
43use super::event::ServerEvent;
44use super::state::ServerStateKind;
45
/// Fine-grained lifecycle state of one server-side client session.
///
/// This is the private state the handlers switch on; callers only ever see
/// the coarser `ServerStateKind` produced by `InternalState::kind`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum InternalState {
    /// Initial state of a freshly constructed session.
    Unknown,
    /// First step of the DPlus two-step link has been seen; only the DPlus
    /// LINK1 handler enters this state.
    Link1Received,
    /// The client is linked and no voice stream is in progress.
    Linked,
    /// The client is linked and a voice stream is in progress.
    Streaming,
    /// Teardown in progress; reported as `ServerStateKind::Unlinking`.
    Unlinking,
    /// The session has ended; reported as `ServerStateKind::Closed`.
    Closed,
}
65
66impl InternalState {
67 const fn kind(self) -> ServerStateKind {
68 match self {
69 Self::Unknown | Self::Link1Received => ServerStateKind::Unknown,
72 Self::Linked => ServerStateKind::Linked,
73 Self::Streaming => ServerStateKind::Streaming,
74 Self::Unlinking => ServerStateKind::Unlinking,
75 Self::Closed => ServerStateKind::Closed,
76 }
77 }
78}
79
/// Protocol-agnostic event queued by the handlers.
///
/// `ServerSessionCore::pop_event` converts each variant one-to-one into the
/// protocol-typed `ServerEvent<P>` handed to callers.
#[derive(Debug, Clone)]
enum RawServerEvent {
    /// A client completed linking.
    Linked {
        /// Address of the client.
        peer: SocketAddr,
        /// Callsign the client linked with.
        callsign: Callsign,
        /// Module reported with the link (the handlers pass the reflector
        /// module here, not the client's own module).
        module: Module,
    },
    /// The client unlinked.
    Unlinked {
        /// Address of the client.
        peer: SocketAddr,
    },
    /// A new voice stream began.
    StreamStarted {
        /// Address of the client.
        peer: SocketAddr,
        /// Identifier of the stream.
        stream_id: StreamId,
        /// D-STAR header that opened the stream.
        header: DStarHeader,
    },
    /// One voice frame of an active stream.
    StreamFrame {
        /// Address of the client.
        peer: SocketAddr,
        /// Identifier of the stream the frame belongs to.
        stream_id: StreamId,
        /// Sequence value carried by the packet.
        seq: u8,
        /// The decoded voice frame.
        frame: VoiceFrame,
    },
    /// The voice stream finished.
    StreamEnded {
        /// Address of the client.
        peer: SocketAddr,
        /// Identifier of the stream that ended.
        stream_id: StreamId,
    },
}
111
/// State machine for a single client session on the server side.
///
/// The core performs no socket I/O: bytes are pushed in with `handle_input`,
/// replies are pulled out with `pop_transmit`, and state changes are
/// reported through `pop_event`.
pub struct ServerSessionCore {
    /// Protocol this session speaks; selects the decoder in `handle_input`.
    kind: ProtocolKind,
    /// Address of the client; used as the destination of every reply.
    peer: SocketAddr,
    /// Reflector module supplied at construction time.
    reflector_module: Module,
    /// Current private lifecycle state.
    state: InternalState,
    /// Callsign recorded when the client linked, `None` before that.
    client_callsign: Option<Callsign>,
    /// Module recorded when the client linked, `None` before that.
    client_module: Option<Module>,
    /// Stream currently being tracked. Written by the DPlus and DCS voice
    /// handlers; only the DCS voice handler reads it (to detect a new stream).
    last_stream_id: Option<StreamId>,
    /// Replies waiting to be transmitted.
    outbox: Outbox,
    /// Events waiting to be collected by `pop_event`.
    events: VecDeque<RawServerEvent>,
    /// Sink handed to the decoders; emptied by `drain_diagnostics`.
    diagnostics: VecSink,
    /// Packet most recently released by `pop_transmit`, kept here so the
    /// returned `Transmit` can borrow its payload.
    current_tx: Option<OutboundPacket>,
}
159
160impl std::fmt::Debug for ServerSessionCore {
161 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
162 f.debug_struct("ServerSessionCore")
163 .field("kind", &self.kind)
164 .field("peer", &self.peer)
165 .field("reflector_module", &self.reflector_module)
166 .field("state", &self.state)
167 .field("client_callsign", &self.client_callsign)
168 .field("client_module", &self.client_module)
169 .field("last_stream_id", &self.last_stream_id)
170 .field("outbox", &self.outbox)
171 .field("events", &self.events)
172 .field("diagnostics", &self.diagnostics)
173 .field("current_tx", &self.current_tx)
174 .finish()
175 }
176}
177
178impl ServerSessionCore {
179 #[must_use]
187 pub fn new(kind: ProtocolKind, peer: SocketAddr, reflector_module: Module) -> Self {
188 Self {
189 kind,
190 peer,
191 reflector_module,
192 state: InternalState::Unknown,
193 client_callsign: None,
194 client_module: None,
195 last_stream_id: None,
196 outbox: Outbox::new(),
197 events: VecDeque::new(),
198 diagnostics: VecSink::default(),
199 current_tx: None,
200 }
201 }
202
203 #[must_use]
205 pub const fn state_kind(&self) -> ServerStateKind {
206 self.state.kind()
207 }
208
209 #[must_use]
211 pub const fn peer(&self) -> SocketAddr {
212 self.peer
213 }
214
215 #[must_use]
217 pub const fn protocol_kind(&self) -> ProtocolKind {
218 self.kind
219 }
220
221 #[must_use]
223 pub const fn reflector_module(&self) -> Module {
224 self.reflector_module
225 }
226
227 #[must_use]
229 pub const fn client_callsign(&self) -> Option<Callsign> {
230 self.client_callsign
231 }
232
233 #[must_use]
235 pub const fn client_module(&self) -> Option<Module> {
236 self.client_module
237 }
238
239 pub fn handle_input(&mut self, now: Instant, bytes: &[u8]) -> Result<(), Error> {
250 match self.kind {
251 ProtocolKind::DExtra => self.handle_dextra_input(now, bytes),
252 ProtocolKind::DPlus => self.handle_dplus_input(now, bytes),
253 ProtocolKind::Dcs => self.handle_dcs_input(bytes),
254 }
255 }
256
257 fn handle_dextra_input(&mut self, now: Instant, bytes: &[u8]) -> Result<(), Error> {
258 let packet = decode_client_to_server(bytes, &mut self.diagnostics)
259 .map_err(|e| Error::Protocol(ProtocolError::DExtra(e)))?;
260 match packet {
261 ClientPacket::Link {
262 callsign,
263 reflector_module,
264 client_module,
265 } => self.on_dextra_link(now, callsign, reflector_module, client_module),
266 ClientPacket::Unlink { callsign, .. } => {
267 self.on_dextra_unlink(callsign);
268 Ok(())
269 }
270 ClientPacket::Poll { callsign } => self.on_dextra_poll(now, callsign),
271 ClientPacket::VoiceHeader { stream_id, header } => {
272 self.on_dextra_voice_header(stream_id, header);
273 Ok(())
274 }
275 ClientPacket::VoiceData {
276 stream_id,
277 seq,
278 frame,
279 } => {
280 self.on_dextra_voice_data(stream_id, seq, frame);
281 Ok(())
282 }
283 ClientPacket::VoiceEot { stream_id, .. } => {
284 self.on_dextra_voice_eot(stream_id);
285 Ok(())
286 }
287 }
288 }
289
290 fn on_dextra_link(
291 &mut self,
292 now: Instant,
293 callsign: Callsign,
294 reflector_module: Module,
295 client_module: Module,
296 ) -> Result<(), Error> {
297 if self.state != InternalState::Unknown && self.state != InternalState::Linked {
298 return Ok(());
299 }
300 self.client_callsign = Some(callsign);
301 self.client_module = Some(client_module);
302 self.state = InternalState::Linked;
303 self.events.push_back(RawServerEvent::Linked {
304 peer: self.peer,
305 callsign,
306 module: reflector_module,
307 });
308 let mut buf = [0u8; 32];
310 let n = encode_connect_ack(&mut buf, &callsign, reflector_module)
311 .map_err(|e| Error::Protocol(ProtocolError::DExtra(e.into())))?;
312 let payload = buf.get(..n).unwrap_or(&[]).to_vec();
313 self.outbox.enqueue(OutboundPacket {
314 dst: self.peer,
315 payload,
316 not_before: now,
317 });
318 Ok(())
319 }
320
321 fn on_dextra_unlink(&mut self, callsign: Callsign) {
322 if self.client_callsign != Some(callsign) {
323 return;
324 }
325 self.state = InternalState::Unlinking;
326 self.events
327 .push_back(RawServerEvent::Unlinked { peer: self.peer });
328 self.state = InternalState::Closed;
332 }
333
334 fn on_dextra_poll(&mut self, now: Instant, callsign: Callsign) -> Result<(), Error> {
335 if self.client_callsign != Some(callsign) {
336 return Ok(());
337 }
338 let mut buf = [0u8; 16];
340 let n = encode_poll_echo(&mut buf, &callsign)
341 .map_err(|e| Error::Protocol(ProtocolError::DExtra(e.into())))?;
342 let payload = buf.get(..n).unwrap_or(&[]).to_vec();
343 self.outbox.enqueue(OutboundPacket {
344 dst: self.peer,
345 payload,
346 not_before: now,
347 });
348 Ok(())
349 }
350
351 fn on_dextra_voice_header(&mut self, stream_id: StreamId, header: DStarHeader) {
352 if self.state != InternalState::Linked {
353 return;
354 }
355 self.state = InternalState::Streaming;
356 self.events.push_back(RawServerEvent::StreamStarted {
357 peer: self.peer,
358 stream_id,
359 header,
360 });
361 }
362
363 fn on_dextra_voice_data(&mut self, stream_id: StreamId, seq: u8, frame: VoiceFrame) {
364 if self.state != InternalState::Streaming {
365 return;
366 }
367 self.events.push_back(RawServerEvent::StreamFrame {
368 peer: self.peer,
369 stream_id,
370 seq,
371 frame,
372 });
373 }
374
375 fn on_dextra_voice_eot(&mut self, stream_id: StreamId) {
376 if self.state != InternalState::Streaming {
377 return;
378 }
379 self.state = InternalState::Linked;
380 self.events.push_back(RawServerEvent::StreamEnded {
381 peer: self.peer,
382 stream_id,
383 });
384 }
385
386 fn handle_dplus_input(&mut self, now: Instant, bytes: &[u8]) -> Result<(), Error> {
389 let packet = decode_dplus_client_to_server(bytes, &mut self.diagnostics)
390 .map_err(|e| Error::Protocol(ProtocolError::DPlus(e)))?;
391 match packet {
392 DPlusClientPacket::Link1 => self.on_dplus_link1(now),
393 DPlusClientPacket::Link2 { callsign } => self.on_dplus_link2(now, callsign),
394 DPlusClientPacket::Unlink => self.on_dplus_unlink(now),
395 DPlusClientPacket::Poll => self.on_dplus_poll(now),
396 DPlusClientPacket::VoiceHeader { stream_id, header } => {
397 self.on_dplus_voice_header(stream_id, header);
398 Ok(())
399 }
400 DPlusClientPacket::VoiceData {
401 stream_id,
402 seq,
403 frame,
404 } => {
405 self.on_dplus_voice_data(stream_id, seq, frame);
406 Ok(())
407 }
408 DPlusClientPacket::VoiceEot { stream_id, .. } => {
409 self.on_dplus_voice_eot(stream_id);
410 Ok(())
411 }
412 }
413 }
414
415 fn on_dplus_link1(&mut self, now: Instant) -> Result<(), Error> {
416 match self.state {
417 InternalState::Unknown => {
418 self.state = InternalState::Link1Received;
419 }
420 InternalState::Link1Received | InternalState::Linked => {
421 }
426 InternalState::Streaming | InternalState::Unlinking | InternalState::Closed => {
427 return Ok(());
429 }
430 }
431 let mut buf = [0u8; 8];
433 let n = encode_link1_ack(&mut buf)
434 .map_err(|e| Error::Protocol(ProtocolError::DPlus(e.into())))?;
435 let payload = buf.get(..n).unwrap_or(&[]).to_vec();
436 self.outbox.enqueue(OutboundPacket {
437 dst: self.peer,
438 payload,
439 not_before: now,
440 });
441 Ok(())
442 }
443
444 fn on_dplus_link2(&mut self, now: Instant, callsign: Callsign) -> Result<(), Error> {
445 match self.state {
446 InternalState::Link1Received => {
447 self.client_callsign = Some(callsign);
448 self.client_module = Some(self.reflector_module);
449 self.state = InternalState::Linked;
450 self.events.push_back(RawServerEvent::Linked {
451 peer: self.peer,
452 callsign,
453 module: self.reflector_module,
454 });
455 self.enqueue_dplus_link2_reply(now, Link2Result::Accept)?;
457 Ok(())
458 }
459 InternalState::Linked | InternalState::Streaming => {
460 if self.client_callsign == Some(callsign) {
463 self.enqueue_dplus_link2_reply(now, Link2Result::Accept)?;
464 } else {
465 self.enqueue_dplus_link2_reply(now, Link2Result::Busy)?;
466 }
467 Ok(())
468 }
469 InternalState::Unknown | InternalState::Unlinking | InternalState::Closed => {
470 Ok(())
474 }
475 }
476 }
477
478 fn enqueue_dplus_link2_reply(
479 &mut self,
480 now: Instant,
481 result: Link2Result,
482 ) -> Result<(), Error> {
483 let mut buf = [0u8; 16];
484 let n = encode_link2_reply(&mut buf, result)
485 .map_err(|e| Error::Protocol(ProtocolError::DPlus(e.into())))?;
486 let payload = buf.get(..n).unwrap_or(&[]).to_vec();
487 self.outbox.enqueue(OutboundPacket {
488 dst: self.peer,
489 payload,
490 not_before: now,
491 });
492 Ok(())
493 }
494
495 fn on_dplus_unlink(&mut self, now: Instant) -> Result<(), Error> {
496 if !matches!(
497 self.state,
498 InternalState::Linked | InternalState::Streaming | InternalState::Link1Received
499 ) {
500 return Ok(());
501 }
502 let mut buf = [0u8; 8];
504 let n = encode_unlink_ack(&mut buf)
505 .map_err(|e| Error::Protocol(ProtocolError::DPlus(e.into())))?;
506 let payload = buf.get(..n).unwrap_or(&[]).to_vec();
507 self.outbox.enqueue(OutboundPacket {
508 dst: self.peer,
509 payload,
510 not_before: now,
511 });
512 self.events
513 .push_back(RawServerEvent::Unlinked { peer: self.peer });
514 self.state = InternalState::Closed;
515 Ok(())
516 }
517
518 fn on_dplus_poll(&mut self, now: Instant) -> Result<(), Error> {
519 if !matches!(self.state, InternalState::Linked | InternalState::Streaming) {
520 return Ok(());
521 }
522 let mut buf = [0u8; 8];
523 let n = dplus_encode_poll_echo(&mut buf)
524 .map_err(|e| Error::Protocol(ProtocolError::DPlus(e.into())))?;
525 let payload = buf.get(..n).unwrap_or(&[]).to_vec();
526 self.outbox.enqueue(OutboundPacket {
527 dst: self.peer,
528 payload,
529 not_before: now,
530 });
531 Ok(())
532 }
533
534 fn on_dplus_voice_header(&mut self, stream_id: StreamId, header: DStarHeader) {
535 if self.state != InternalState::Linked {
536 return;
537 }
538 self.state = InternalState::Streaming;
539 self.last_stream_id = Some(stream_id);
540 self.events.push_back(RawServerEvent::StreamStarted {
541 peer: self.peer,
542 stream_id,
543 header,
544 });
545 }
546
547 fn on_dplus_voice_data(&mut self, stream_id: StreamId, seq: u8, frame: VoiceFrame) {
548 if self.state != InternalState::Streaming {
549 return;
550 }
551 self.events.push_back(RawServerEvent::StreamFrame {
552 peer: self.peer,
553 stream_id,
554 seq,
555 frame,
556 });
557 }
558
559 fn on_dplus_voice_eot(&mut self, stream_id: StreamId) {
560 if self.state != InternalState::Streaming {
561 return;
562 }
563 self.state = InternalState::Linked;
564 self.last_stream_id = None;
565 self.events.push_back(RawServerEvent::StreamEnded {
566 peer: self.peer,
567 stream_id,
568 });
569 }
570
571 fn handle_dcs_input(&mut self, bytes: &[u8]) -> Result<(), Error> {
574 let packet = decode_dcs_client_to_server(bytes, &mut self.diagnostics)
575 .map_err(|e| Error::Protocol(ProtocolError::Dcs(e)))?;
576 match packet {
577 DcsClientPacket::Link {
578 callsign,
579 client_module,
580 reflector_module,
581 ..
582 } => self.on_dcs_link(callsign, client_module, reflector_module),
583 DcsClientPacket::Unlink { callsign, .. } => {
584 self.on_dcs_unlink(callsign);
585 Ok(())
586 }
587 DcsClientPacket::Poll {
588 callsign,
589 reflector_callsign,
590 } => self.on_dcs_poll(callsign, reflector_callsign),
591 DcsClientPacket::Voice {
592 header,
593 stream_id,
594 seq,
595 frame,
596 is_end,
597 } => {
598 self.on_dcs_voice(header, stream_id, seq, frame, is_end);
599 Ok(())
600 }
601 }
602 }
603
604 fn on_dcs_link(
605 &mut self,
606 callsign: Callsign,
607 client_module: Module,
608 reflector_module: Module,
609 ) -> Result<(), Error> {
610 if !matches!(
611 self.state,
612 InternalState::Unknown | InternalState::Linked | InternalState::Streaming
613 ) {
614 return Ok(());
615 }
616 self.client_callsign = Some(callsign);
617 self.client_module = Some(client_module);
618 if self.state != InternalState::Streaming {
619 self.state = InternalState::Linked;
620 }
621 self.events.push_back(RawServerEvent::Linked {
622 peer: self.peer,
623 callsign,
624 module: reflector_module,
625 });
626 let mut buf = [0u8; 32];
627 let n = dcs_codec::encode_connect_ack(&mut buf, &callsign, reflector_module)
628 .map_err(|e| Error::Protocol(ProtocolError::Dcs(e.into())))?;
629 let payload = buf.get(..n).unwrap_or(&[]).to_vec();
630 self.outbox.enqueue(OutboundPacket {
633 dst: self.peer,
634 payload,
635 not_before: Instant::now(),
636 });
637 Ok(())
638 }
639
640 fn on_dcs_unlink(&mut self, callsign: Callsign) {
641 if self.client_callsign != Some(callsign) {
642 return;
643 }
644 self.events
645 .push_back(RawServerEvent::Unlinked { peer: self.peer });
646 self.state = InternalState::Closed;
647 }
648
649 fn on_dcs_poll(
650 &mut self,
651 callsign: Callsign,
652 reflector_callsign: Callsign,
653 ) -> Result<(), Error> {
654 if !matches!(self.state, InternalState::Linked | InternalState::Streaming) {
655 return Ok(());
656 }
657 if self.client_callsign != Some(callsign) {
658 return Ok(());
659 }
660 let mut buf = [0u8; 32];
661 let n = dcs_codec::encode_poll_reply(&mut buf, &callsign, &reflector_callsign)
662 .map_err(|e| Error::Protocol(ProtocolError::Dcs(e.into())))?;
663 let payload = buf.get(..n).unwrap_or(&[]).to_vec();
664 self.outbox.enqueue(OutboundPacket {
665 dst: self.peer,
666 payload,
667 not_before: Instant::now(),
668 });
669 Ok(())
670 }
671
672 fn on_dcs_voice(
673 &mut self,
674 header: DStarHeader,
675 stream_id: StreamId,
676 seq: u8,
677 frame: VoiceFrame,
678 is_end: bool,
679 ) {
680 if !matches!(self.state, InternalState::Linked | InternalState::Streaming) {
681 return;
682 }
683 let is_new_stream = self.last_stream_id != Some(stream_id);
686 if is_new_stream {
687 self.state = InternalState::Streaming;
688 self.last_stream_id = Some(stream_id);
689 self.events.push_back(RawServerEvent::StreamStarted {
690 peer: self.peer,
691 stream_id,
692 header,
693 });
694 }
695 self.events.push_back(RawServerEvent::StreamFrame {
696 peer: self.peer,
697 stream_id,
698 seq,
699 frame,
700 });
701 if is_end {
702 self.state = InternalState::Linked;
703 self.last_stream_id = None;
704 self.events.push_back(RawServerEvent::StreamEnded {
705 peer: self.peer,
706 stream_id,
707 });
708 }
709 }
710
711 #[must_use]
716 pub fn pop_transmit(&mut self, now: Instant) -> Option<Transmit<'_>> {
717 let next = self.outbox.pop_ready(now)?;
718 self.current_tx = Some(next);
719 let held = self.current_tx.as_ref()?;
720 Some(Transmit::new(held.dst, held.payload.as_slice()))
721 }
722
723 pub fn pop_event<P: Protocol>(&mut self) -> Option<ServerEvent<P>> {
728 let raw = self.events.pop_front()?;
729 Some(match raw {
730 RawServerEvent::Linked {
731 peer,
732 callsign,
733 module,
734 } => ServerEvent::ClientLinked {
735 peer,
736 callsign,
737 module,
738 },
739 RawServerEvent::Unlinked { peer } => ServerEvent::ClientUnlinked { peer },
740 RawServerEvent::StreamStarted {
741 peer,
742 stream_id,
743 header,
744 } => ServerEvent::ClientStreamStarted {
745 peer,
746 stream_id,
747 header,
748 },
749 RawServerEvent::StreamFrame {
750 peer,
751 stream_id,
752 seq,
753 frame,
754 } => ServerEvent::ClientStreamFrame {
755 peer,
756 stream_id,
757 seq,
758 frame,
759 },
760 RawServerEvent::StreamEnded { peer, stream_id } => {
761 ServerEvent::ClientStreamEnded { peer, stream_id }
762 }
763 })
764 }
765
766 #[must_use]
768 pub fn next_deadline(&self) -> Option<Instant> {
769 self.outbox.peek_next_deadline()
770 }
771
772 pub fn drain_diagnostics(&mut self) -> Vec<crate::validator::Diagnostic> {
774 self.diagnostics.drain().collect()
775 }
776}
777
778#[cfg(test)]
779mod tests {
780 use super::{ProtocolKind, ServerEvent, ServerSessionCore, ServerStateKind};
781 use crate::codec::dcs::{
782 encode_connect_link as dcs_encode_link, encode_connect_unlink as dcs_encode_unlink,
783 encode_poll_request as dcs_encode_poll, encode_voice as dcs_encode_voice,
784 };
785 use crate::codec::dextra::{encode_connect_link, encode_poll, encode_unlink};
786 use crate::codec::dplus::{
787 encode_link1, encode_link2, encode_poll as dplus_encode_poll,
788 encode_unlink as dplus_encode_unlink,
789 };
790 use crate::header::DStarHeader;
791 use crate::session::client::{DExtra, DPlus, Dcs};
792 use crate::types::{Callsign, Module, StreamId, Suffix};
793 use crate::voice::VoiceFrame;
794 use std::net::{IpAddr, Ipv4Addr, SocketAddr};
795 use std::time::Instant;
796
    /// Result type for tests, so codec and session errors can be propagated
    /// with `?`.
    type TestResult = Result<(), Box<dyn std::error::Error>>;

    /// Loopback client address used by the DExtra tests.
    const PEER: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 30001);
    /// Loopback client address used by the DPlus tests.
    const DPLUS_PEER: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 20001);
    /// Loopback client address, presumably for the DCS tests (none are
    /// visible in this part of the file).
    const DCS_PEER: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 30051);
802
    /// Shorthand for building a `Callsign` from its 8 wire bytes.
    const fn cs(bytes: [u8; 8]) -> Callsign {
        Callsign::from_wire_bytes(bytes)
    }
806
    /// Builds a D-STAR header fixture with all flags zero, fixed repeater and
    /// destination callsigns, an empty suffix, and `my` as the originating
    /// callsign.
    const fn test_header(my: [u8; 8]) -> DStarHeader {
        DStarHeader {
            flag1: 0,
            flag2: 0,
            flag3: 0,
            rpt2: Callsign::from_wire_bytes(*b"REF030 G"),
            rpt1: Callsign::from_wire_bytes(*b"REF030 C"),
            ur_call: Callsign::from_wire_bytes(*b"CQCQCQ "),
            my_call: Callsign::from_wire_bytes(my),
            my_suffix: Suffix::EMPTY,
        }
    }
819
    /// Builds a `StreamId` from `n`, panicking if `StreamId::new` rejects it.
    /// Per the lint reason below, callers must pass a non-zero `n`.
    #[expect(clippy::unwrap_used, reason = "const-validated: n is non-zero")]
    const fn sid(n: u16) -> StreamId {
        StreamId::new(n).unwrap()
    }
824
    /// A freshly constructed DExtra session reports `Unknown`, echoes its
    /// constructor arguments through the getters, and has no client identity.
    #[test]
    fn dextra_starts_in_unknown_state() {
        let core = ServerSessionCore::new(ProtocolKind::DExtra, PEER, Module::C);
        assert_eq!(core.state_kind(), ServerStateKind::Unknown);
        assert_eq!(core.peer(), PEER);
        assert_eq!(core.protocol_kind(), ProtocolKind::DExtra);
        assert_eq!(core.reflector_module(), Module::C);
        assert!(core.client_callsign().is_none());
        assert!(core.client_module().is_none());
    }
837
    /// A DExtra link packet moves the session to `Linked`, records the
    /// client's callsign and module, and queues a `ClientLinked` event.
    #[test]
    fn dextra_link_transitions_to_linked() -> TestResult {
        let mut core = ServerSessionCore::new(ProtocolKind::DExtra, PEER, Module::C);
        assert_eq!(core.state_kind(), ServerStateKind::Unknown);

        // Encode a link request and feed it to the session.
        let mut buf = [0u8; 16];
        let n = encode_connect_link(&mut buf, &cs(*b"W1AW "), Module::C, Module::B)?;
        let slice = buf.get(..n).ok_or("n bytes")?;
        core.handle_input(Instant::now(), slice)?;

        assert_eq!(core.state_kind(), ServerStateKind::Linked);
        assert_eq!(core.client_callsign(), Some(cs(*b"W1AW ")));
        assert_eq!(core.client_module(), Some(Module::B));

        let event: Option<ServerEvent<DExtra>> = core.pop_event();
        assert!(matches!(event, Some(ServerEvent::ClientLinked { .. })));
        Ok(())
    }
856
    /// A DExtra link packet causes a 14-byte ACK addressed to the peer to be
    /// queued: bytes 10..13 spell `ACK` and byte 13 is zero.
    #[test]
    fn dextra_link_enqueues_ack() -> TestResult {
        let mut core = ServerSessionCore::new(ProtocolKind::DExtra, PEER, Module::C);
        let mut buf = [0u8; 16];
        let n = encode_connect_link(&mut buf, &cs(*b"W1AW "), Module::C, Module::B)?;
        let slice = buf.get(..n).ok_or("n bytes")?;
        core.handle_input(Instant::now(), slice)?;

        // The reply must be available immediately.
        let tx = core.pop_transmit(Instant::now()).ok_or("tx")?;
        assert_eq!(tx.payload.len(), 14, "DExtra ACK is 14 bytes");
        assert_eq!(tx.dst, PEER);
        assert_eq!(tx.payload.get(10..13), Some(b"ACK".as_slice()));
        assert_eq!(tx.payload.get(13), Some(&0x00));
        Ok(())
    }
872
    /// After linking, a DExtra poll carrying the linked callsign is answered
    /// with a 9-byte echo addressed to the peer.
    #[test]
    fn dextra_poll_from_linked_client_is_echoed() -> TestResult {
        let mut core = ServerSessionCore::new(ProtocolKind::DExtra, PEER, Module::C);
        // Link first and discard the resulting ACK and event.
        let mut link_buf = [0u8; 16];
        let n = encode_connect_link(&mut link_buf, &cs(*b"W1AW "), Module::C, Module::B)?;
        let slice = link_buf.get(..n).ok_or("n bytes")?;
        core.handle_input(Instant::now(), slice)?;
        let _ack = core.pop_transmit(Instant::now()).ok_or("ack")?;
        let _event: Option<ServerEvent<DExtra>> = core.pop_event();

        // Poll with the same callsign.
        let mut poll_buf = [0u8; 16];
        let n = encode_poll(&mut poll_buf, &cs(*b"W1AW "))?;
        let slice = poll_buf.get(..n).ok_or("n bytes")?;
        core.handle_input(Instant::now(), slice)?;

        let tx = core.pop_transmit(Instant::now()).ok_or("echo")?;
        assert_eq!(tx.payload.len(), 9, "DExtra poll echo is 9 bytes");
        assert_eq!(tx.dst, PEER);
        Ok(())
    }
893
    /// A DExtra poll whose callsign differs from the linked client's produces
    /// no reply.
    #[test]
    fn dextra_poll_from_unknown_callsign_is_ignored() -> TestResult {
        let mut core = ServerSessionCore::new(ProtocolKind::DExtra, PEER, Module::C);
        // Link first and discard the resulting ACK and event.
        let mut link_buf = [0u8; 16];
        let n = encode_connect_link(&mut link_buf, &cs(*b"W1AW "), Module::C, Module::B)?;
        let slice = link_buf.get(..n).ok_or("n bytes")?;
        core.handle_input(Instant::now(), slice)?;
        let _ack = core.pop_transmit(Instant::now()).ok_or("ack")?;
        let _event: Option<ServerEvent<DExtra>> = core.pop_event();

        // Poll with a different callsign.
        let mut poll_buf = [0u8; 16];
        let n = encode_poll(&mut poll_buf, &cs(*b"N0CALL "))?;
        let slice = poll_buf.get(..n).ok_or("n bytes")?;
        core.handle_input(Instant::now(), slice)?;

        assert!(
            core.pop_transmit(Instant::now()).is_none(),
            "poll from wrong callsign ignored"
        );
        Ok(())
    }
915
    /// A DExtra unlink from the linked callsign closes the session and queues
    /// a `ClientUnlinked` event.
    #[test]
    fn dextra_unlink_transitions_to_closed() -> TestResult {
        let mut core = ServerSessionCore::new(ProtocolKind::DExtra, PEER, Module::C);
        // Link first and discard the resulting ACK and event.
        let mut link_buf = [0u8; 16];
        let n = encode_connect_link(&mut link_buf, &cs(*b"W1AW "), Module::C, Module::B)?;
        let slice = link_buf.get(..n).ok_or("n bytes")?;
        core.handle_input(Instant::now(), slice)?;
        let _ack = core.pop_transmit(Instant::now()).ok_or("ack")?;
        let _event: Option<ServerEvent<DExtra>> = core.pop_event();

        // Unlink with the same callsign.
        let mut ulink_buf = [0u8; 16];
        let n = encode_unlink(&mut ulink_buf, &cs(*b"W1AW "), Module::B)?;
        let slice = ulink_buf.get(..n).ok_or("n bytes")?;
        core.handle_input(Instant::now(), slice)?;

        assert_eq!(core.state_kind(), ServerStateKind::Closed);
        let event: Option<ServerEvent<DExtra>> = core.pop_event();
        assert!(matches!(event, Some(ServerEvent::ClientUnlinked { .. })));
        Ok(())
    }
936
    /// A DExtra unlink carrying a callsign other than the linked client's
    /// leaves the session `Linked`.
    #[test]
    fn dextra_unlink_from_wrong_callsign_is_ignored() -> TestResult {
        let mut core = ServerSessionCore::new(ProtocolKind::DExtra, PEER, Module::C);
        // Link first and discard the resulting ACK and event.
        let mut link_buf = [0u8; 16];
        let n = encode_connect_link(&mut link_buf, &cs(*b"W1AW "), Module::C, Module::B)?;
        let slice = link_buf.get(..n).ok_or("n bytes")?;
        core.handle_input(Instant::now(), slice)?;
        let _ack = core.pop_transmit(Instant::now()).ok_or("ack")?;
        let _event: Option<ServerEvent<DExtra>> = core.pop_event();

        // Unlink with a different callsign.
        let mut ulink_buf = [0u8; 16];
        let n = encode_unlink(&mut ulink_buf, &cs(*b"N0CALL "), Module::B)?;
        let slice = ulink_buf.get(..n).ok_or("n bytes")?;
        core.handle_input(Instant::now(), slice)?;

        assert_eq!(
            core.state_kind(),
            ServerStateKind::Linked,
            "state unchanged when wrong callsign tries to unlink"
        );
        Ok(())
    }
959
    /// DPlus LINK1 is acknowledged with the 5-byte LINK1 ACK.
    ///
    /// The internal `Link1Received` state is reported as
    /// `ServerStateKind::Unknown`, which is why the public state kind is
    /// still `Unknown` after the packet.
    #[test]
    fn dplus_link1_transitions_to_link1_received() -> TestResult {
        let mut core = ServerSessionCore::new(ProtocolKind::DPlus, DPLUS_PEER, Module::C);
        let mut buf = [0u8; 16];
        let n = encode_link1(&mut buf)?;
        core.handle_input(Instant::now(), buf.get(..n).ok_or("bytes")?)?;
        assert_eq!(core.state_kind(), ServerStateKind::Unknown);
        let tx = core.pop_transmit(Instant::now()).ok_or("ack")?;
        assert_eq!(tx.payload.len(), 5, "DPlus LINK1 ACK is 5 bytes");
        assert_eq!(tx.payload, &[0x05, 0x00, 0x18, 0x00, 0x01]);
        Ok(())
    }
974
    /// LINK1 followed by LINK2 links the client: the callsign is recorded,
    /// the client module equals the reflector module, an 8-byte reply ending
    /// in `OKRW` is queued, and a `ClientLinked` event is emitted.
    #[test]
    fn dplus_link2_after_link1_transitions_to_linked_and_accepts() -> TestResult {
        let mut core = ServerSessionCore::new(ProtocolKind::DPlus, DPLUS_PEER, Module::C);
        // Step 1: LINK1, discard its ACK.
        let mut buf1 = [0u8; 16];
        let n = encode_link1(&mut buf1)?;
        core.handle_input(Instant::now(), buf1.get(..n).ok_or("bytes")?)?;
        let _ack1 = core.pop_transmit(Instant::now()).ok_or("ack1")?;

        // Step 2: LINK2 with the client's callsign.
        let mut buf2 = [0u8; 32];
        let n = encode_link2(&mut buf2, &cs(*b"W1AW "))?;
        core.handle_input(Instant::now(), buf2.get(..n).ok_or("bytes")?)?;

        assert_eq!(core.state_kind(), ServerStateKind::Linked);
        assert_eq!(core.client_callsign(), Some(cs(*b"W1AW ")));
        assert_eq!(core.client_module(), Some(Module::C));

        let tx = core.pop_transmit(Instant::now()).ok_or("okrw")?;
        assert_eq!(tx.payload.len(), 8, "DPlus LINK2 reply is 8 bytes");
        assert_eq!(tx.payload.get(4..8), Some(b"OKRW".as_slice()));

        let event: Option<ServerEvent<DPlus>> = core.pop_event();
        assert!(matches!(event, Some(ServerEvent::ClientLinked { .. })));
        Ok(())
    }
999
    /// LINK2 without a preceding LINK1 is ignored: the state stays `Unknown`
    /// and no reply is queued.
    #[test]
    fn dplus_link2_without_link1_is_ignored() -> TestResult {
        let mut core = ServerSessionCore::new(ProtocolKind::DPlus, DPLUS_PEER, Module::C);
        let mut buf = [0u8; 32];
        let n = encode_link2(&mut buf, &cs(*b"W1AW "))?;
        core.handle_input(Instant::now(), buf.get(..n).ok_or("bytes")?)?;
        assert_eq!(core.state_kind(), ServerStateKind::Unknown);
        assert!(core.pop_transmit(Instant::now()).is_none());
        Ok(())
    }
1010
    /// Once linked, a LINK2 with a different callsign is answered with an
    /// 8-byte reply ending in `BUSY`, and the existing link is left untouched.
    #[test]
    fn dplus_already_linked_link2_from_different_callsign_returns_busy() -> TestResult {
        let mut core = ServerSessionCore::new(ProtocolKind::DPlus, DPLUS_PEER, Module::C);
        // Complete the LINK1/LINK2 handshake and drain its replies and event.
        let mut buf1 = [0u8; 16];
        let n = encode_link1(&mut buf1)?;
        core.handle_input(Instant::now(), buf1.get(..n).ok_or("bytes")?)?;
        let _ack1 = core.pop_transmit(Instant::now()).ok_or("ack1")?;
        let mut buf2 = [0u8; 32];
        let n = encode_link2(&mut buf2, &cs(*b"W1AW "))?;
        core.handle_input(Instant::now(), buf2.get(..n).ok_or("bytes")?)?;
        let _ok = core.pop_transmit(Instant::now()).ok_or("okrw")?;
        let _ev: Option<ServerEvent<DPlus>> = core.pop_event();

        // Second LINK2 from another callsign.
        let mut buf3 = [0u8; 32];
        let n = encode_link2(&mut buf3, &cs(*b"N0CALL "))?;
        core.handle_input(Instant::now(), buf3.get(..n).ok_or("bytes")?)?;

        let tx = core.pop_transmit(Instant::now()).ok_or("busy")?;
        assert_eq!(tx.payload.len(), 8);
        assert_eq!(tx.payload.get(4..8), Some(b"BUSY".as_slice()));
        assert_eq!(core.state_kind(), ServerStateKind::Linked);
        assert_eq!(core.client_callsign(), Some(cs(*b"W1AW ")));
        Ok(())
    }
1035
    /// A DPlus voice header received while linked moves the session to
    /// `Streaming` and emits `ClientStreamStarted`.
    #[test]
    fn dplus_voice_header_after_link2_transitions_to_streaming() -> TestResult {
        let mut core = ServerSessionCore::new(ProtocolKind::DPlus, DPLUS_PEER, Module::C);
        // Complete the LINK1/LINK2 handshake and drain its replies and event.
        let mut buf1 = [0u8; 16];
        let n = encode_link1(&mut buf1)?;
        core.handle_input(Instant::now(), buf1.get(..n).ok_or("bytes")?)?;
        let _a1 = core.pop_transmit(Instant::now());
        let mut buf2 = [0u8; 32];
        let n = encode_link2(&mut buf2, &cs(*b"W1AW "))?;
        core.handle_input(Instant::now(), buf2.get(..n).ok_or("bytes")?)?;
        let _a2 = core.pop_transmit(Instant::now());
        let _ev: Option<ServerEvent<DPlus>> = core.pop_event();

        // Send the voice header that opens the stream.
        let mut hdr_buf = [0u8; 128];
        let n = crate::codec::dplus::encode_voice_header(
            &mut hdr_buf,
            sid(0xCAFE),
            &test_header(*b"W1AW "),
        )?;
        core.handle_input(Instant::now(), hdr_buf.get(..n).ok_or("bytes")?)?;

        assert_eq!(core.state_kind(), ServerStateKind::Streaming);
        let event: Option<ServerEvent<DPlus>> = core.pop_event();
        assert!(matches!(
            event,
            Some(ServerEvent::ClientStreamStarted { .. })
        ));
        Ok(())
    }
1065
    /// A DPlus voice data packet received while streaming is surfaced as a
    /// `ClientStreamFrame` event carrying the packet's stream id and
    /// sequence value.
    #[test]
    fn dplus_voice_data_during_streaming_emits_frame_event() -> TestResult {
        let mut core = ServerSessionCore::new(ProtocolKind::DPlus, DPLUS_PEER, Module::C);
        // Complete the LINK1/LINK2 handshake and drain its replies and event.
        let mut buf1 = [0u8; 16];
        let n = encode_link1(&mut buf1)?;
        core.handle_input(Instant::now(), buf1.get(..n).ok_or("bytes")?)?;
        let _ = core.pop_transmit(Instant::now());
        let mut buf2 = [0u8; 32];
        let n = encode_link2(&mut buf2, &cs(*b"W1AW "))?;
        core.handle_input(Instant::now(), buf2.get(..n).ok_or("bytes")?)?;
        let _ = core.pop_transmit(Instant::now());
        let _ev: Option<ServerEvent<DPlus>> = core.pop_event();
        // Open the stream and discard the stream-started event.
        let mut hdr_buf = [0u8; 128];
        let n = crate::codec::dplus::encode_voice_header(
            &mut hdr_buf,
            sid(0xCAFE),
            &test_header(*b"W1AW "),
        )?;
        core.handle_input(Instant::now(), hdr_buf.get(..n).ok_or("bytes")?)?;
        let _ev: Option<ServerEvent<DPlus>> = core.pop_event();

        // Send one silent voice frame with sequence value 3.
        let frame = VoiceFrame::silence();
        let mut data_buf = [0u8; 128];
        let n = crate::codec::dplus::encode_voice_data(&mut data_buf, sid(0xCAFE), 3, &frame)?;
        core.handle_input(Instant::now(), data_buf.get(..n).ok_or("bytes")?)?;

        let event: Option<ServerEvent<DPlus>> = core.pop_event();
        assert!(matches!(
            event,
            Some(ServerEvent::ClientStreamFrame { stream_id, seq, .. })
                if stream_id == sid(0xCAFE) && seq == 3
        ));
        Ok(())
    }
1100
    /// A DPlus end-of-transmission packet received while streaming returns
    /// the session to `Linked` and emits `ClientStreamEnded`.
    #[test]
    fn dplus_voice_eot_returns_to_linked() -> TestResult {
        let mut core = ServerSessionCore::new(ProtocolKind::DPlus, DPLUS_PEER, Module::C);
        // Complete the LINK1/LINK2 handshake and drain its replies and event.
        let mut buf1 = [0u8; 16];
        let n = encode_link1(&mut buf1)?;
        core.handle_input(Instant::now(), buf1.get(..n).ok_or("bytes")?)?;
        let _ = core.pop_transmit(Instant::now());
        let mut buf2 = [0u8; 32];
        let n = encode_link2(&mut buf2, &cs(*b"W1AW "))?;
        core.handle_input(Instant::now(), buf2.get(..n).ok_or("bytes")?)?;
        let _ = core.pop_transmit(Instant::now());
        let _ev: Option<ServerEvent<DPlus>> = core.pop_event();
        // Open the stream and discard the stream-started event.
        let mut hdr_buf = [0u8; 128];
        let n = crate::codec::dplus::encode_voice_header(
            &mut hdr_buf,
            sid(0xCAFE),
            &test_header(*b"W1AW "),
        )?;
        core.handle_input(Instant::now(), hdr_buf.get(..n).ok_or("bytes")?)?;
        let _ev: Option<ServerEvent<DPlus>> = core.pop_event();

        // Terminate the stream.
        let mut eot_buf = [0u8; 128];
        let n = crate::codec::dplus::encode_voice_eot(&mut eot_buf, sid(0xCAFE), 20)?;
        core.handle_input(Instant::now(), eot_buf.get(..n).ok_or("bytes")?)?;

        assert_eq!(core.state_kind(), ServerStateKind::Linked);
        let event: Option<ServerEvent<DPlus>> = core.pop_event();
        assert!(matches!(event, Some(ServerEvent::ClientStreamEnded { .. })));
        Ok(())
    }
1131
/// DPlus: an UNLINK from a linked client closes the session, queues a
/// 5-byte transmit whose last byte is `0x00`, and emits `ClientUnlinked`.
#[test]
fn dplus_unlink_transitions_to_closed_and_acks() -> TestResult {
    let mut core = ServerSessionCore::new(ProtocolKind::DPlus, DPLUS_PEER, Module::C);

    // LINK1, then discard whatever transmit the core queued in response.
    let mut link1 = [0u8; 16];
    let len = encode_link1(&mut link1)?;
    let packet = link1.get(..len).ok_or("bytes")?;
    core.handle_input(Instant::now(), packet)?;
    let _link1_tx = core.pop_transmit(Instant::now());

    // LINK2 carrying the client callsign; drop the queued transmit and event.
    let mut link2 = [0u8; 32];
    let len = encode_link2(&mut link2, &cs(*b"W1AW "))?;
    let packet = link2.get(..len).ok_or("bytes")?;
    core.handle_input(Instant::now(), packet)?;
    let _link2_tx = core.pop_transmit(Instant::now());
    let _ = core.pop_event::<DPlus>();

    // The packet under test.
    let mut unlink = [0u8; 16];
    let len = dplus_encode_unlink(&mut unlink)?;
    let packet = unlink.get(..len).ok_or("bytes")?;
    core.handle_input(Instant::now(), packet)?;

    assert_eq!(core.state_kind(), ServerStateKind::Closed);

    // The transmit is inspected before the event queue is touched again.
    let ack = core.pop_transmit(Instant::now()).ok_or("ack")?;
    assert_eq!(ack.payload.len(), 5);
    assert_eq!(ack.payload.get(4), Some(&0x00));

    assert!(matches!(
        core.pop_event::<DPlus>(),
        Some(ServerEvent::ClientUnlinked { .. })
    ));
    Ok(())
}
1157
/// DPlus: a poll from a linked client is answered with the 3-byte echo
/// `03 60 00`.
#[test]
fn dplus_poll_emits_echo() -> TestResult {
    let mut core = ServerSessionCore::new(ProtocolKind::DPlus, DPLUS_PEER, Module::C);

    // LINK1, then discard whatever transmit the core queued in response.
    let mut link1 = [0u8; 16];
    let len = encode_link1(&mut link1)?;
    let packet = link1.get(..len).ok_or("bytes")?;
    core.handle_input(Instant::now(), packet)?;
    let _link1_tx = core.pop_transmit(Instant::now());

    // LINK2 carrying the client callsign; drop the queued transmit and event.
    let mut link2 = [0u8; 32];
    let len = encode_link2(&mut link2, &cs(*b"W1AW "))?;
    let packet = link2.get(..len).ok_or("bytes")?;
    core.handle_input(Instant::now(), packet)?;
    let _link2_tx = core.pop_transmit(Instant::now());
    let _ = core.pop_event::<DPlus>();

    // The packet under test.
    let mut poll = [0u8; 16];
    let len = dplus_encode_poll(&mut poll)?;
    let packet = poll.get(..len).ok_or("bytes")?;
    core.handle_input(Instant::now(), packet)?;

    let echo = core.pop_transmit(Instant::now()).ok_or("echo")?;
    assert_eq!(echo.payload.len(), 3, "DPlus poll echo is 3 bytes");
    assert_eq!(echo.payload, &[0x03, 0x60, 0x00]);
    Ok(())
}
1180
/// DCS: a LINK packet moves a fresh session to `Linked`, records the
/// client's callsign and module, queues a 14-byte transmit containing
/// `ACK` at bytes 10..13, and emits `ClientLinked`.
#[test]
fn dcs_link_transitions_to_linked_and_acks() -> TestResult {
    let mut core = ServerSessionCore::new(ProtocolKind::Dcs, DCS_PEER, Module::C);

    let mut link = [0u8; 520];
    let len = dcs_encode_link(
        &mut link,
        &cs(*b"W1AW "),
        Module::B,
        Module::C,
        &cs(*b"DCS030 "),
        crate::codec::dcs::GatewayType::Repeater,
    )?;
    let packet = link.get(..len).ok_or("bytes")?;
    core.handle_input(Instant::now(), packet)?;

    // Session bookkeeping after the link request.
    assert_eq!(core.state_kind(), ServerStateKind::Linked);
    assert_eq!(core.client_callsign(), Some(cs(*b"W1AW ")));
    assert_eq!(core.client_module(), Some(Module::B));

    // Wire-level acknowledgement.
    let ack = core.pop_transmit(Instant::now()).ok_or("ack")?;
    assert_eq!(ack.payload.len(), 14, "DCS ACK is 14 bytes");
    assert_eq!(ack.payload.get(10..13), Some(b"ACK".as_slice()));

    assert!(matches!(
        core.pop_event::<Dcs>(),
        Some(ServerEvent::ClientLinked { .. })
    ));
    Ok(())
}
1209
1210 fn non_eot_frame() -> VoiceFrame {
1215 VoiceFrame {
1216 ambe: [0x11; 9],
1217 slow_data: [0x22; 3],
1218 }
1219 }
1220
/// DCS: the first voice packet received while `Linked` moves the session
/// to `Streaming` and queues `ClientStreamStarted` followed by
/// `ClientStreamFrame`.
#[test]
fn dcs_voice_from_linked_state_starts_stream() -> TestResult {
    let mut core = ServerSessionCore::new(ProtocolKind::Dcs, DCS_PEER, Module::C);

    // Link the client first; the resulting transmit and event are discarded.
    let mut link = [0u8; 520];
    let len = dcs_encode_link(
        &mut link,
        &cs(*b"W1AW "),
        Module::B,
        Module::C,
        &cs(*b"DCS030 "),
        crate::codec::dcs::GatewayType::Repeater,
    )?;
    let packet = link.get(..len).ok_or("bytes")?;
    core.handle_input(Instant::now(), packet)?;
    let _ = core.pop_transmit(Instant::now());
    let _ = core.pop_event::<Dcs>();

    // Voice packet with seq 0 on stream 0xBEEF.
    let payload = non_eot_frame();
    let mut voice = [0u8; 128];
    let len = dcs_encode_voice(
        &mut voice,
        &test_header(*b"W1AW "),
        sid(0xBEEF),
        0,
        &payload,
        false,
    )?;
    let packet = voice.get(..len).ok_or("bytes")?;
    core.handle_input(Instant::now(), packet)?;

    assert_eq!(core.state_kind(), ServerStateKind::Streaming);
    assert!(matches!(
        core.pop_event::<Dcs>(),
        Some(ServerEvent::ClientStreamStarted { .. })
    ));
    assert!(matches!(
        core.pop_event::<Dcs>(),
        Some(ServerEvent::ClientStreamFrame { .. })
    ));
    Ok(())
}
1256
/// DCS: a second voice packet on an already-started stream queues exactly
/// one event, a `ClientStreamFrame` carrying `seq == 1`.
#[test]
fn dcs_voice_mid_stream_emits_frame() -> TestResult {
    let mut core = ServerSessionCore::new(ProtocolKind::Dcs, DCS_PEER, Module::C);

    // Link the client first; the resulting transmit and event are discarded.
    let mut link = [0u8; 520];
    let len = dcs_encode_link(
        &mut link,
        &cs(*b"W1AW "),
        Module::B,
        Module::C,
        &cs(*b"DCS030 "),
        crate::codec::dcs::GatewayType::Repeater,
    )?;
    let packet = link.get(..len).ok_or("bytes")?;
    core.handle_input(Instant::now(), packet)?;
    let _ = core.pop_transmit(Instant::now());
    let _ = core.pop_event::<Dcs>();

    // First voice packet (seq 0); pop its two events without inspecting them.
    let payload = non_eot_frame();
    let mut first = [0u8; 128];
    let len = dcs_encode_voice(
        &mut first,
        &test_header(*b"W1AW "),
        sid(0xBEEF),
        0,
        &payload,
        false,
    )?;
    let packet = first.get(..len).ok_or("bytes")?;
    core.handle_input(Instant::now(), packet)?;
    let _ = core.pop_event::<Dcs>();
    let _ = core.pop_event::<Dcs>();

    // Second voice packet (seq 1) on the same stream id.
    let mut second = [0u8; 128];
    let len = dcs_encode_voice(
        &mut second,
        &test_header(*b"W1AW "),
        sid(0xBEEF),
        1,
        &payload,
        false,
    )?;
    let packet = second.get(..len).ok_or("bytes")?;
    core.handle_input(Instant::now(), packet)?;

    assert!(matches!(
        core.pop_event::<Dcs>(),
        Some(ServerEvent::ClientStreamFrame { seq: 1, .. })
    ));
    // Nothing else may be queued after the frame event.
    assert!(core.pop_event::<Dcs>().is_none());
    Ok(())
}
1306
/// DCS: a voice packet encoded with the final flag set to `true`
/// (presumably the end-of-stream marker, per the test name) leaves the
/// session in `Linked`, and the third queued event is `ClientStreamEnded`.
#[test]
fn dcs_voice_with_is_end_transitions_back_to_linked() -> TestResult {
    let mut core = ServerSessionCore::new(ProtocolKind::Dcs, DCS_PEER, Module::C);

    // Link the client first; the resulting transmit and event are discarded.
    let mut link = [0u8; 520];
    let len = dcs_encode_link(
        &mut link,
        &cs(*b"W1AW "),
        Module::B,
        Module::C,
        &cs(*b"DCS030 "),
        crate::codec::dcs::GatewayType::Repeater,
    )?;
    let packet = link.get(..len).ok_or("bytes")?;
    core.handle_input(Instant::now(), packet)?;
    let _link_tx = core.pop_transmit(Instant::now());
    let _ = core.pop_event::<Dcs>();

    // A single voice packet (seq 5) carrying a silence frame, flag set.
    let payload = VoiceFrame::silence();
    let mut voice = [0u8; 128];
    let len = dcs_encode_voice(
        &mut voice,
        &test_header(*b"W1AW "),
        sid(0xBEEF),
        5,
        &payload,
        true,
    )?;
    let packet = voice.get(..len).ok_or("bytes")?;
    core.handle_input(Instant::now(), packet)?;

    assert_eq!(core.state_kind(), ServerStateKind::Linked);

    // The first two events are popped without being inspected; only the
    // third one is asserted on.
    let _ = core.pop_event::<Dcs>();
    let _ = core.pop_event::<Dcs>();
    assert!(matches!(
        core.pop_event::<Dcs>(),
        Some(ServerEvent::ClientStreamEnded { .. })
    ));
    Ok(())
}
1342
/// DCS: a poll from a linked client is answered with a 17-byte transmit.
#[test]
fn dcs_poll_emits_echo() -> TestResult {
    let mut core = ServerSessionCore::new(ProtocolKind::Dcs, DCS_PEER, Module::C);

    // Link the client first; the resulting transmit and event are discarded.
    let mut link = [0u8; 520];
    let len = dcs_encode_link(
        &mut link,
        &cs(*b"W1AW "),
        Module::B,
        Module::C,
        &cs(*b"DCS030 "),
        crate::codec::dcs::GatewayType::Repeater,
    )?;
    let packet = link.get(..len).ok_or("bytes")?;
    core.handle_input(Instant::now(), packet)?;
    let _ = core.pop_transmit(Instant::now());
    let _ = core.pop_event::<Dcs>();

    // The packet under test.
    let mut poll = [0u8; 32];
    let len = dcs_encode_poll(&mut poll, &cs(*b"W1AW "), &cs(*b"DCS030 "))?;
    let packet = poll.get(..len).ok_or("bytes")?;
    core.handle_input(Instant::now(), packet)?;

    let echo = core.pop_transmit(Instant::now()).ok_or("echo")?;
    assert_eq!(echo.payload.len(), 17, "DCS poll echo is 17 bytes");
    Ok(())
}
1367
/// DCS: an UNLINK from a linked client moves the session to `Closed` and
/// emits `ClientUnlinked`.
#[test]
fn dcs_unlink_transitions_to_closed() -> TestResult {
    let mut core = ServerSessionCore::new(ProtocolKind::Dcs, DCS_PEER, Module::C);

    // Link the client first; the resulting transmit and event are discarded.
    let mut link = [0u8; 520];
    let len = dcs_encode_link(
        &mut link,
        &cs(*b"W1AW "),
        Module::B,
        Module::C,
        &cs(*b"DCS030 "),
        crate::codec::dcs::GatewayType::Repeater,
    )?;
    let packet = link.get(..len).ok_or("bytes")?;
    core.handle_input(Instant::now(), packet)?;
    let _ = core.pop_transmit(Instant::now());
    let _ = core.pop_event::<Dcs>();

    // The packet under test.
    let mut unlink = [0u8; 32];
    let len = dcs_encode_unlink(&mut unlink, &cs(*b"W1AW "), Module::B, &cs(*b"DCS030 "))?;
    let packet = unlink.get(..len).ok_or("bytes")?;
    core.handle_input(Instant::now(), packet)?;

    assert_eq!(core.state_kind(), ServerStateKind::Closed);
    assert!(matches!(
        core.pop_event::<Dcs>(),
        Some(ServerEvent::ClientUnlinked { .. })
    ));
    Ok(())
}
1393
/// A freshly constructed session that has received no input reports no
/// pending deadline.
#[test]
fn next_deadline_is_none_on_empty_outbox() {
    let fresh = ServerSessionCore::new(ProtocolKind::DExtra, PEER, Module::C);
    assert!(fresh.next_deadline().is_none());
}
1401}