dstar_gateway/tokio_shell/handle.rs
1//! User-facing handle for an async session running in a spawned task.
2
3use std::marker::PhantomData;
4use std::sync::Arc;
5
6use dstar_gateway_core::header::DStarHeader;
7use dstar_gateway_core::session::client::{Connected, Event, Protocol, Session};
8use dstar_gateway_core::types::StreamId;
9use dstar_gateway_core::voice::VoiceFrame;
10use tokio::net::UdpSocket;
11use tokio::sync::mpsc;
12
13use super::{Command, SessionLoop, ShellError};
14
/// Number of slots in the command channel that links a handle to its
/// session task.
///
/// Voice commands are small (header / voice / EOT) and arrive at a
/// modest rate (≈50 frames/s at most), so 32 slots absorb bursts while
/// keeping memory use bounded. When the session task falls behind and
/// the channel fills up, callers such as `send_voice` wait
/// asynchronously for a free slot instead of the queue growing without
/// limit.
const COMMAND_CHANNEL_CAPACITY: usize = 32;
23
/// Number of slots in the event channel that carries events from the
/// session task back to the handle.
///
/// The loop produces events and the user consumes them through
/// `next_event`. A deeper buffer lets the loop keep running while the
/// consumer is still working through earlier events. At a voice rate
/// of ≈50 frames/s, 256 slots hold roughly five seconds of voice
/// events (≈250, assuming one event per frame) with a little headroom.
const EVENT_CHANNEL_CAPACITY: usize = 256;
32
33/// Async handle to a session running in a spawned tokio task.
34///
35/// Methods translate to commands sent over an internal channel and
36/// reply over a oneshot. Dropping the handle severs the connection
37/// from the consumer side; the spawned task exits on its next loop.
38///
39/// **Drop is not graceful** — for graceful shutdown call
40/// [`AsyncSession::disconnect`]. Drop just severs the connection from
41/// the consumer's side; the reflector eventually times the link out
42/// via inactivity.
43#[derive(Debug)]
44pub struct AsyncSession<P: Protocol> {
45 pub(crate) command_tx: mpsc::Sender<Command>,
46 pub(crate) event_rx: mpsc::Receiver<Event<P>>,
47 pub(crate) _protocol: PhantomData<P>,
48}
49
50impl<P: Protocol> AsyncSession<P> {
51 /// Spawn the session loop on the current tokio runtime and
52 /// return a handle for controlling it.
53 ///
54 /// The `session` must already be in the [`Connected`] state
55 /// (typically via `Session::<P, Connecting>::promote` after
56 /// observing [`Event::Connected`] from the handshake). The
57 /// `socket` must be bound (typically via `UdpSocket::bind`).
58 ///
59 /// The loop runs until the handle is dropped, the consumer's
60 /// command channel closes, or a fatal I/O error occurs.
61 ///
62 /// # Example
63 ///
64 /// ```no_run
65 /// use std::sync::Arc;
66 /// use dstar_gateway::tokio_shell::AsyncSession;
67 /// use dstar_gateway_core::session::client::{Connected, DExtra, Session};
68 /// use tokio::net::UdpSocket;
69 ///
70 /// # async fn demo(connected: Session<DExtra, Connected>) -> Result<(), Box<dyn std::error::Error>> {
71 /// let sock = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
72 /// let mut shell = AsyncSession::spawn(connected, sock);
73 /// while let Some(event) = shell.next_event().await {
74 /// println!("{event:?}");
75 /// }
76 /// # Ok(()) }
77 /// ```
78 #[must_use]
79 pub fn spawn(session: Session<P, Connected>, socket: Arc<UdpSocket>) -> Self
80 where
81 P: Send + 'static,
82 {
83 let (command_tx, command_rx) = mpsc::channel(COMMAND_CHANNEL_CAPACITY);
84 let (event_tx, event_rx) = mpsc::channel(EVENT_CHANNEL_CAPACITY);
85
86 let inner_loop = SessionLoop {
87 session,
88 socket,
89 event_tx,
90 command_rx,
91 };
92
93 drop(tokio::spawn(async move {
94 // Loop errors bubble up as `Err`; the consumer sees
95 // `SessionClosed` via the event channel closing when the
96 // task exits.
97 drop(inner_loop.run().await);
98 }));
99
100 Self {
101 command_tx,
102 event_rx,
103 _protocol: PhantomData,
104 }
105 }
106
107 /// Pull the next event from the inbound stream.
108 ///
109 /// Returns `None` once the session task has exited and the event
110 /// channel has been fully drained.
111 ///
112 /// # Cancellation safety
113 ///
114 /// This method is cancel-safe. It only awaits a `tokio::sync::mpsc`
115 /// receiver, which is documented as cancel-safe: dropping the future
116 /// leaves the channel in a clean state and any undelivered events
117 /// remain queued for the next call.
118 pub async fn next_event(&mut self) -> Option<Event<P>> {
119 self.event_rx.recv().await
120 }
121
122 /// Send a voice header and start a new outbound voice stream.
123 ///
124 /// # Errors
125 ///
126 /// - [`ShellError::SessionClosed`] if the session task has exited
127 /// - [`ShellError::Core`] if the encoder rejects the header
128 ///
129 /// # Cancellation safety
130 ///
131 /// This method is cancel-safe. The method enqueues a [`Command`]
132 /// on the command channel and awaits a oneshot reply. If the future
133 /// is dropped before the enqueue completes no command is sent; if
134 /// it is dropped after the enqueue the session task still executes
135 /// the command and the (now-orphaned) oneshot reply is simply
136 /// discarded. Either way the session state remains consistent.
137 pub async fn send_header(
138 &mut self,
139 header: DStarHeader,
140 stream_id: StreamId,
141 ) -> Result<(), ShellError> {
142 let (tx, rx) = tokio::sync::oneshot::channel();
143 self.command_tx
144 .send(Command::SendHeader {
145 header: Box::new(header),
146 stream_id,
147 reply: tx,
148 })
149 .await
150 .map_err(|_| ShellError::SessionClosed)?;
151 rx.await.map_err(|_| ShellError::SessionClosed)?
152 }
153
154 /// Send a voice data frame.
155 ///
156 /// # Errors
157 ///
158 /// - [`ShellError::SessionClosed`] if the session task has exited
159 /// - [`ShellError::Core`] if the encoder rejects the frame
160 ///
161 /// # Cancellation safety
162 ///
163 /// This method is cancel-safe under the same rules as
164 /// [`Self::send_header`]. Dropping the future either before the
165 /// command is enqueued or after it has been dispatched leaves the
166 /// session in a coherent state; orphaning the oneshot reply is
167 /// harmless.
168 pub async fn send_voice(
169 &mut self,
170 stream_id: StreamId,
171 seq: u8,
172 frame: VoiceFrame,
173 ) -> Result<(), ShellError> {
174 let (tx, rx) = tokio::sync::oneshot::channel();
175 self.command_tx
176 .send(Command::SendVoice {
177 stream_id,
178 seq,
179 frame: Box::new(frame),
180 reply: tx,
181 })
182 .await
183 .map_err(|_| ShellError::SessionClosed)?;
184 rx.await.map_err(|_| ShellError::SessionClosed)?
185 }
186
187 /// Send a voice EOT and close the outbound stream.
188 ///
189 /// # Errors
190 ///
191 /// - [`ShellError::SessionClosed`] if the session task has exited
192 /// - [`ShellError::Core`] if the encoder rejects the EOT
193 ///
194 /// # Cancellation safety
195 ///
196 /// This method is cancel-safe under the same rules as
197 /// [`Self::send_header`].
198 pub async fn send_eot(&mut self, stream_id: StreamId, seq: u8) -> Result<(), ShellError> {
199 let (tx, rx) = tokio::sync::oneshot::channel();
200 self.command_tx
201 .send(Command::SendEot {
202 stream_id,
203 seq,
204 reply: tx,
205 })
206 .await
207 .map_err(|_| ShellError::SessionClosed)?;
208 rx.await.map_err(|_| ShellError::SessionClosed)?
209 }
210
211 /// Request a graceful disconnect.
212 ///
213 /// Sends an UNLINK to the reflector and returns when the loop
214 /// has enqueued it. The caller should continue polling
215 /// [`Self::next_event`] until [`Event::Disconnected`] arrives,
216 /// then drop the session.
217 ///
218 /// # Errors
219 ///
220 /// - [`ShellError::SessionClosed`] if the session task has exited
221 ///
222 /// # Cancellation safety
223 ///
224 /// This method is **not** cancel-safe. Cancelling the future drives
225 /// a state-machine transition (`Connected` → `Disconnecting`) that
226 /// may be partially complete: the UNLINK may already be in the
227 /// outbox even though the reply oneshot has been dropped. Callers
228 /// that cancel `disconnect()` should treat the session as
229 /// indeterminate and drop the handle rather than attempting further
230 /// sends. For graceful shutdown, always `await` this method to
231 /// completion before dropping the session.
232 pub async fn disconnect(&mut self) -> Result<(), ShellError> {
233 let (tx, rx) = tokio::sync::oneshot::channel();
234 self.command_tx
235 .send(Command::Disconnect { reply: tx })
236 .await
237 .map_err(|_| ShellError::SessionClosed)?;
238 rx.await.map_err(|_| ShellError::SessionClosed)?;
239 Ok(())
240 }
241}
242
243impl<P: Protocol> Drop for AsyncSession<P> {
244 fn drop(&mut self) {
245 // Dropping command_tx closes the channel, which signals
246 // the session task to exit on its next loop iteration.
247 // No explicit shutdown needed.
248 }
249}