mmdvm/tokio_shell/
handle.rs

1//! User-facing handle for an async MMDVM modem running in a spawned
2//! task.
3
4use mmdvm_core::ModemMode;
5use tokio::sync::{mpsc, oneshot};
6use tokio::task::JoinHandle;
7
8use crate::error::ShellError;
9use crate::transport::Transport;
10
11use super::{Command, Event, ModemLoop};
12
/// Maximum number of commands that may sit queued between the handle
/// and the modem task.
///
/// Commands are small (frame enqueues, mode changes), so 32 slots
/// absorb bursts while keeping memory use bounded. When the loop falls
/// behind and the queue fills up, the handle's command methods wait
/// for a free slot instead of letting the queue grow without limit.
const COMMAND_CHANNEL_CAPACITY: usize = 32;
20
/// Maximum number of events buffered between the modem task and the
/// handle.
///
/// The channel carries both periodic status pushes and inbound radio
/// frames. 256 slots leave generous headroom over a full D-STAR
/// transmission (~100 voice frames) plus status polls at 4 Hz.
const EVENT_CHANNEL_CAPACITY: usize = 256;
27
28/// Async handle to an MMDVM modem running in a spawned tokio task.
29///
30/// The handle is generic over the transport type `T` so that
31/// [`AsyncModem::shutdown`] can recover the original transport for
32/// reuse (e.g. to send post-MMDVM CAT commands on the same serial
33/// port).
34///
35/// Dropping the handle closes the command channel, which causes the
36/// spawned loop to exit on its next iteration. For a graceful
37/// shutdown that also flushes the pending TX queue AND recovers the
38/// inner transport, call [`AsyncModem::shutdown`].
39#[derive(Debug)]
40pub struct AsyncModem<T: Transport + 'static> {
41    command_tx: mpsc::Sender<Command>,
42    event_rx: mpsc::Receiver<Event>,
43    join_handle: Option<JoinHandle<Result<T, ShellError>>>,
44}
45
46impl<T: Transport + 'static> AsyncModem<T> {
47    /// Spawn the modem loop on the current tokio runtime and return
48    /// a handle for controlling it.
49    ///
50    /// The `transport` must be an already-connected duplex byte
51    /// stream (serial port, Bluetooth SPP, test duplex). The shell
52    /// takes ownership; it is automatically dropped when the loop
53    /// exits.
54    ///
55    /// # Example
56    ///
57    /// ```no_run
58    /// use mmdvm::AsyncModem;
59    /// use tokio::io::duplex;
60    ///
61    /// # async fn demo() {
62    /// let (client, _modem_side) = duplex(4096);
63    /// let mut modem = AsyncModem::spawn(client);
64    /// while let Some(event) = modem.next_event().await {
65    ///     println!("{event:?}");
66    /// }
67    /// # }
68    /// ```
69    #[must_use]
70    pub fn spawn(transport: T) -> Self {
71        let (command_tx, command_rx) = mpsc::channel(COMMAND_CHANNEL_CAPACITY);
72        let (event_tx, event_rx) = mpsc::channel(EVENT_CHANNEL_CAPACITY);
73
74        let loop_state = ModemLoop::new(transport, command_rx, event_tx);
75        let join_handle = tokio::spawn(async move { loop_state.run().await });
76
77        Self {
78            command_tx,
79            event_rx,
80            join_handle: Some(join_handle),
81        }
82    }
83
84    /// Pull the next event from the modem loop.
85    ///
86    /// Returns `None` once the task has exited and the event channel
87    /// has been fully drained.
88    ///
89    /// # Cancellation safety
90    ///
91    /// Cancel-safe — backed by `tokio::sync::mpsc::Receiver::recv`.
92    pub async fn next_event(&mut self) -> Option<Event> {
93        self.event_rx.recv().await
94    }
95
96    /// Enqueue a D-STAR header for transmission.
97    ///
98    /// The frame is placed in the loop's TX queue and drained only
99    /// when the modem reports enough D-STAR FIFO space.
100    ///
101    /// # Errors
102    ///
103    /// - [`ShellError::SessionClosed`] if the loop has exited.
104    pub async fn send_dstar_header(&mut self, bytes: [u8; 41]) -> Result<(), ShellError> {
105        let (tx, rx) = oneshot::channel();
106        self.command_tx
107            .send(Command::SendDStarHeader { bytes, reply: tx })
108            .await
109            .map_err(|_| ShellError::SessionClosed)?;
110        rx.await.map_err(|_| ShellError::SessionClosed)?
111    }
112
113    /// Enqueue a D-STAR voice data frame for transmission.
114    ///
115    /// # Errors
116    ///
117    /// - [`ShellError::SessionClosed`] if the loop has exited.
118    pub async fn send_dstar_data(&mut self, bytes: [u8; 12]) -> Result<(), ShellError> {
119        // Hang-hunt: two awaits here. Trace both sides so a repro
120        // log narrows the freeze to "command_tx full" (ModemLoop
121        // not draining command_rx) vs. "reply never came"
122        // (ModemLoop received but stuck before replying).
123        let (tx, rx) = oneshot::channel();
124        tracing::trace!(
125            target: "mmdvm::hang_hunt",
126            cmd_cap = self.command_tx.capacity(),
127            "send_dstar_data: awaiting command_tx.send"
128        );
129        self.command_tx
130            .send(Command::SendDStarData { bytes, reply: tx })
131            .await
132            .map_err(|_| ShellError::SessionClosed)?;
133        tracing::trace!(
134            target: "mmdvm::hang_hunt",
135            "send_dstar_data: command queued, awaiting reply"
136        );
137        let r = rx.await.map_err(|_| ShellError::SessionClosed)?;
138        tracing::trace!(
139            target: "mmdvm::hang_hunt",
140            "send_dstar_data: reply received"
141        );
142        r
143    }
144
145    /// Enqueue a D-STAR end-of-transmission marker.
146    ///
147    /// # Errors
148    ///
149    /// - [`ShellError::SessionClosed`] if the loop has exited.
150    pub async fn send_dstar_eot(&mut self) -> Result<(), ShellError> {
151        let (tx, rx) = oneshot::channel();
152        self.command_tx
153            .send(Command::SendDStarEot { reply: tx })
154            .await
155            .map_err(|_| ShellError::SessionClosed)?;
156        rx.await.map_err(|_| ShellError::SessionClosed)?
157    }
158
159    /// Set the modem's operating mode.
160    ///
161    /// # Errors
162    ///
163    /// - [`ShellError::SessionClosed`] if the loop has exited.
164    /// - [`ShellError::Io`] if writing to the transport fails.
165    /// - [`ShellError::Core`] if the codec rejects the frame.
166    pub async fn set_mode(&mut self, mode: ModemMode) -> Result<(), ShellError> {
167        let (tx, rx) = oneshot::channel();
168        self.command_tx
169            .send(Command::SetMode { mode, reply: tx })
170            .await
171            .map_err(|_| ShellError::SessionClosed)?;
172        rx.await.map_err(|_| ShellError::SessionClosed)?
173    }
174
175    /// Trigger a `GetVersion` request. The response arrives as
176    /// [`Event::Version`].
177    ///
178    /// # Errors
179    ///
180    /// - [`ShellError::SessionClosed`] if the loop has exited.
181    /// - [`ShellError::Io`] if writing to the transport fails.
182    pub async fn request_version(&mut self) -> Result<(), ShellError> {
183        let (tx, rx) = oneshot::channel();
184        self.command_tx
185            .send(Command::GetVersion { reply: tx })
186            .await
187            .map_err(|_| ShellError::SessionClosed)?;
188        rx.await.map_err(|_| ShellError::SessionClosed)?
189    }
190
191    /// Trigger an immediate `GetStatus` request. The response
192    /// arrives as [`Event::Status`]. The loop also polls status
193    /// every 250 ms on its own, so this is only needed for explicit
194    /// "check now" flows.
195    ///
196    /// # Errors
197    ///
198    /// - [`ShellError::SessionClosed`] if the loop has exited.
199    /// - [`ShellError::Io`] if writing to the transport fails.
200    pub async fn request_status(&mut self) -> Result<(), ShellError> {
201        let (tx, rx) = oneshot::channel();
202        self.command_tx
203            .send(Command::GetStatus { reply: tx })
204            .await
205            .map_err(|_| ShellError::SessionClosed)?;
206        rx.await.map_err(|_| ShellError::SessionClosed)?
207    }
208
209    /// Send a raw frame — escape hatch for protocols we don't model
210    /// yet.
211    ///
212    /// # Errors
213    ///
214    /// - [`ShellError::SessionClosed`] if the loop has exited.
215    /// - [`ShellError::Io`] if writing to the transport fails.
216    /// - [`ShellError::Core`] if the codec rejects the frame (e.g.
217    ///   oversized payload).
218    pub async fn send_raw(&mut self, command: u8, payload: Vec<u8>) -> Result<(), ShellError> {
219        let (tx, rx) = oneshot::channel();
220        self.command_tx
221            .send(Command::SendRaw {
222                command,
223                payload,
224                reply: tx,
225            })
226            .await
227            .map_err(|_| ShellError::SessionClosed)?;
228        rx.await.map_err(|_| ShellError::SessionClosed)?
229    }
230
231    /// Graceful shutdown — flushes the TX queue, exits the loop, and
232    /// returns the recovered transport.
233    ///
234    /// Consumes the handle. After `shutdown` returns, the task has
235    /// fully wound down and ownership of the transport is handed back
236    /// to the caller so it can be reused (e.g. to switch back to CAT
237    /// mode on a serial port).
238    ///
239    /// # Errors
240    ///
241    /// - [`ShellError::SessionClosed`] if the loop had already exited
242    ///   before the shutdown command could be delivered, or the task
243    ///   panicked / was aborted before it could hand the transport back.
244    pub async fn shutdown(mut self) -> Result<T, ShellError> {
245        let (tx, rx) = oneshot::channel();
246        self.command_tx
247            .send(Command::Shutdown { reply: tx })
248            .await
249            .map_err(|_| ShellError::SessionClosed)?;
250        rx.await.map_err(|_| ShellError::SessionClosed)?;
251
252        // Drain any remaining events so the loop can finish its
253        // flush. Once the send half drops (when the loop exits), this
254        // loop terminates.
255        while self.event_rx.recv().await.is_some() {}
256
257        // Reclaim the transport from the task.
258        let handle = self.join_handle.take().ok_or(ShellError::SessionClosed)?;
259        match handle.await {
260            Ok(transport_result) => transport_result,
261            Err(_join_err) => Err(ShellError::SessionClosed),
262        }
263    }
264}
265
266impl<T: Transport + 'static> Drop for AsyncModem<T> {
267    fn drop(&mut self) {
268        // Dropping command_tx closes the channel, which signals the
269        // modem task to exit on its next loop iteration. The spawned
270        // task's JoinHandle is detached — if the caller never invoked
271        // `shutdown`, we do not await the task (awaiting in Drop would
272        // require blocking). The tokio runtime detaches the task and
273        // its transport will be dropped when the task finishes.
274    }
275}