dstar_gateway/tokio_shell/
handle.rs

1//! User-facing handle for an async session running in a spawned task.
2
3use std::marker::PhantomData;
4use std::sync::Arc;
5
6use dstar_gateway_core::header::DStarHeader;
7use dstar_gateway_core::session::client::{Connected, Event, Protocol, Session};
8use dstar_gateway_core::types::StreamId;
9use dstar_gateway_core::voice::VoiceFrame;
10use tokio::net::UdpSocket;
11use tokio::sync::mpsc;
12
13use super::{Command, SessionLoop, ShellError};
14
/// Number of commands that may be queued between the handle and the
/// session task.
///
/// Commands are small (header / voice / EOT) and are issued at a
/// modest rate (≈50 frames/s at most), so 32 slots absorb short bursts
/// while keeping memory use bounded. When the session task falls
/// behind and the queue is full, `send_voice` waits for a free slot
/// (backpressure) instead of letting the queue grow without bound.
const COMMAND_CHANNEL_CAPACITY: usize = 32;
23
/// Capacity of the event channel from session task to handle.
///
/// Events are produced by the loop and consumed by the user via
/// `next_event`. A deeper buffer here lets the loop keep running
/// while the consumer is still processing the previous batch.
///
/// Sizing: at the ≈50 frames/s voice rate a full 5-second stream is
/// roughly 250 frames, so 256 slots cover it with a little headroom.
const EVENT_CHANNEL_CAPACITY: usize = 256;
32
33/// Async handle to a session running in a spawned tokio task.
34///
35/// Methods translate to commands sent over an internal channel and
36/// reply over a oneshot. Dropping the handle severs the connection
37/// from the consumer side; the spawned task exits on its next loop.
38///
39/// **Drop is not graceful** — for graceful shutdown call
40/// [`AsyncSession::disconnect`]. Drop just severs the connection from
41/// the consumer's side; the reflector eventually times the link out
42/// via inactivity.
43#[derive(Debug)]
44pub struct AsyncSession<P: Protocol> {
45    pub(crate) command_tx: mpsc::Sender<Command>,
46    pub(crate) event_rx: mpsc::Receiver<Event<P>>,
47    pub(crate) _protocol: PhantomData<P>,
48}
49
50impl<P: Protocol> AsyncSession<P> {
51    /// Spawn the session loop on the current tokio runtime and
52    /// return a handle for controlling it.
53    ///
54    /// The `session` must already be in the [`Connected`] state
55    /// (typically via `Session::<P, Connecting>::promote` after
56    /// observing [`Event::Connected`] from the handshake). The
57    /// `socket` must be bound (typically via `UdpSocket::bind`).
58    ///
59    /// The loop runs until the handle is dropped, the consumer's
60    /// command channel closes, or a fatal I/O error occurs.
61    ///
62    /// # Example
63    ///
64    /// ```no_run
65    /// use std::sync::Arc;
66    /// use dstar_gateway::tokio_shell::AsyncSession;
67    /// use dstar_gateway_core::session::client::{Connected, DExtra, Session};
68    /// use tokio::net::UdpSocket;
69    ///
70    /// # async fn demo(connected: Session<DExtra, Connected>) -> Result<(), Box<dyn std::error::Error>> {
71    /// let sock = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
72    /// let mut shell = AsyncSession::spawn(connected, sock);
73    /// while let Some(event) = shell.next_event().await {
74    ///     println!("{event:?}");
75    /// }
76    /// # Ok(()) }
77    /// ```
78    #[must_use]
79    pub fn spawn(session: Session<P, Connected>, socket: Arc<UdpSocket>) -> Self
80    where
81        P: Send + 'static,
82    {
83        let (command_tx, command_rx) = mpsc::channel(COMMAND_CHANNEL_CAPACITY);
84        let (event_tx, event_rx) = mpsc::channel(EVENT_CHANNEL_CAPACITY);
85
86        let inner_loop = SessionLoop {
87            session,
88            socket,
89            event_tx,
90            command_rx,
91        };
92
93        drop(tokio::spawn(async move {
94            // Loop errors bubble up as `Err`; the consumer sees
95            // `SessionClosed` via the event channel closing when the
96            // task exits.
97            drop(inner_loop.run().await);
98        }));
99
100        Self {
101            command_tx,
102            event_rx,
103            _protocol: PhantomData,
104        }
105    }
106
107    /// Pull the next event from the inbound stream.
108    ///
109    /// Returns `None` once the session task has exited and the event
110    /// channel has been fully drained.
111    ///
112    /// # Cancellation safety
113    ///
114    /// This method is cancel-safe. It only awaits a `tokio::sync::mpsc`
115    /// receiver, which is documented as cancel-safe: dropping the future
116    /// leaves the channel in a clean state and any undelivered events
117    /// remain queued for the next call.
118    pub async fn next_event(&mut self) -> Option<Event<P>> {
119        self.event_rx.recv().await
120    }
121
122    /// Send a voice header and start a new outbound voice stream.
123    ///
124    /// # Errors
125    ///
126    /// - [`ShellError::SessionClosed`] if the session task has exited
127    /// - [`ShellError::Core`] if the encoder rejects the header
128    ///
129    /// # Cancellation safety
130    ///
131    /// This method is cancel-safe. The method enqueues a [`Command`]
132    /// on the command channel and awaits a oneshot reply. If the future
133    /// is dropped before the enqueue completes no command is sent; if
134    /// it is dropped after the enqueue the session task still executes
135    /// the command and the (now-orphaned) oneshot reply is simply
136    /// discarded. Either way the session state remains consistent.
137    pub async fn send_header(
138        &mut self,
139        header: DStarHeader,
140        stream_id: StreamId,
141    ) -> Result<(), ShellError> {
142        let (tx, rx) = tokio::sync::oneshot::channel();
143        self.command_tx
144            .send(Command::SendHeader {
145                header: Box::new(header),
146                stream_id,
147                reply: tx,
148            })
149            .await
150            .map_err(|_| ShellError::SessionClosed)?;
151        rx.await.map_err(|_| ShellError::SessionClosed)?
152    }
153
154    /// Send a voice data frame.
155    ///
156    /// # Errors
157    ///
158    /// - [`ShellError::SessionClosed`] if the session task has exited
159    /// - [`ShellError::Core`] if the encoder rejects the frame
160    ///
161    /// # Cancellation safety
162    ///
163    /// This method is cancel-safe under the same rules as
164    /// [`Self::send_header`]. Dropping the future either before the
165    /// command is enqueued or after it has been dispatched leaves the
166    /// session in a coherent state; orphaning the oneshot reply is
167    /// harmless.
168    pub async fn send_voice(
169        &mut self,
170        stream_id: StreamId,
171        seq: u8,
172        frame: VoiceFrame,
173    ) -> Result<(), ShellError> {
174        let (tx, rx) = tokio::sync::oneshot::channel();
175        self.command_tx
176            .send(Command::SendVoice {
177                stream_id,
178                seq,
179                frame: Box::new(frame),
180                reply: tx,
181            })
182            .await
183            .map_err(|_| ShellError::SessionClosed)?;
184        rx.await.map_err(|_| ShellError::SessionClosed)?
185    }
186
187    /// Send a voice EOT and close the outbound stream.
188    ///
189    /// # Errors
190    ///
191    /// - [`ShellError::SessionClosed`] if the session task has exited
192    /// - [`ShellError::Core`] if the encoder rejects the EOT
193    ///
194    /// # Cancellation safety
195    ///
196    /// This method is cancel-safe under the same rules as
197    /// [`Self::send_header`].
198    pub async fn send_eot(&mut self, stream_id: StreamId, seq: u8) -> Result<(), ShellError> {
199        let (tx, rx) = tokio::sync::oneshot::channel();
200        self.command_tx
201            .send(Command::SendEot {
202                stream_id,
203                seq,
204                reply: tx,
205            })
206            .await
207            .map_err(|_| ShellError::SessionClosed)?;
208        rx.await.map_err(|_| ShellError::SessionClosed)?
209    }
210
211    /// Request a graceful disconnect.
212    ///
213    /// Sends an UNLINK to the reflector and returns when the loop
214    /// has enqueued it. The caller should continue polling
215    /// [`Self::next_event`] until [`Event::Disconnected`] arrives,
216    /// then drop the session.
217    ///
218    /// # Errors
219    ///
220    /// - [`ShellError::SessionClosed`] if the session task has exited
221    ///
222    /// # Cancellation safety
223    ///
224    /// This method is **not** cancel-safe. Cancelling the future drives
225    /// a state-machine transition (`Connected` → `Disconnecting`) that
226    /// may be partially complete: the UNLINK may already be in the
227    /// outbox even though the reply oneshot has been dropped. Callers
228    /// that cancel `disconnect()` should treat the session as
229    /// indeterminate and drop the handle rather than attempting further
230    /// sends. For graceful shutdown, always `await` this method to
231    /// completion before dropping the session.
232    pub async fn disconnect(&mut self) -> Result<(), ShellError> {
233        let (tx, rx) = tokio::sync::oneshot::channel();
234        self.command_tx
235            .send(Command::Disconnect { reply: tx })
236            .await
237            .map_err(|_| ShellError::SessionClosed)?;
238        rx.await.map_err(|_| ShellError::SessionClosed)?;
239        Ok(())
240    }
241}
242
243impl<P: Protocol> Drop for AsyncSession<P> {
244    fn drop(&mut self) {
245        // Dropping command_tx closes the channel, which signals
246        // the session task to exit on its next loop iteration.
247        // No explicit shutdown needed.
248    }
249}