mmdvm/tokio_shell/handle.rs
1//! User-facing handle for an async MMDVM modem running in a spawned
2//! task.
3
4use mmdvm_core::ModemMode;
5use tokio::sync::{mpsc, oneshot};
6use tokio::task::JoinHandle;
7
8use crate::error::ShellError;
9use crate::transport::Transport;
10
11use super::{Command, Event, ModemLoop};
12
13/// Capacity of the command channel between handle and modem task.
14///
15/// Commands are small (frame enqueues, mode changes). 32 provides
16/// headroom for bursts without unbounded memory use. If the loop is
17/// running behind, `send_*` awaits backpressure rather than blocking
18/// unboundedly.
19const COMMAND_CHANNEL_CAPACITY: usize = 32;
20
21/// Capacity of the event channel from modem task to handle.
22///
23/// Events cover both periodic status pushes and inbound radio frames.
24/// 256 covers a full D-STAR transmission (~100 voice frames) plus
25/// status polls at 4 Hz with generous headroom.
26const EVENT_CHANNEL_CAPACITY: usize = 256;
27
28/// Async handle to an MMDVM modem running in a spawned tokio task.
29///
30/// The handle is generic over the transport type `T` so that
31/// [`AsyncModem::shutdown`] can recover the original transport for
32/// reuse (e.g. to send post-MMDVM CAT commands on the same serial
33/// port).
34///
35/// Dropping the handle closes the command channel, which causes the
36/// spawned loop to exit on its next iteration. For a graceful
37/// shutdown that also flushes the pending TX queue AND recovers the
38/// inner transport, call [`AsyncModem::shutdown`].
39#[derive(Debug)]
40pub struct AsyncModem<T: Transport + 'static> {
41 command_tx: mpsc::Sender<Command>,
42 event_rx: mpsc::Receiver<Event>,
43 join_handle: Option<JoinHandle<Result<T, ShellError>>>,
44}
45
46impl<T: Transport + 'static> AsyncModem<T> {
47 /// Spawn the modem loop on the current tokio runtime and return
48 /// a handle for controlling it.
49 ///
50 /// The `transport` must be an already-connected duplex byte
51 /// stream (serial port, Bluetooth SPP, test duplex). The shell
52 /// takes ownership; it is automatically dropped when the loop
53 /// exits.
54 ///
55 /// # Example
56 ///
57 /// ```no_run
58 /// use mmdvm::AsyncModem;
59 /// use tokio::io::duplex;
60 ///
61 /// # async fn demo() {
62 /// let (client, _modem_side) = duplex(4096);
63 /// let mut modem = AsyncModem::spawn(client);
64 /// while let Some(event) = modem.next_event().await {
65 /// println!("{event:?}");
66 /// }
67 /// # }
68 /// ```
69 #[must_use]
70 pub fn spawn(transport: T) -> Self {
71 let (command_tx, command_rx) = mpsc::channel(COMMAND_CHANNEL_CAPACITY);
72 let (event_tx, event_rx) = mpsc::channel(EVENT_CHANNEL_CAPACITY);
73
74 let loop_state = ModemLoop::new(transport, command_rx, event_tx);
75 let join_handle = tokio::spawn(async move { loop_state.run().await });
76
77 Self {
78 command_tx,
79 event_rx,
80 join_handle: Some(join_handle),
81 }
82 }
83
84 /// Pull the next event from the modem loop.
85 ///
86 /// Returns `None` once the task has exited and the event channel
87 /// has been fully drained.
88 ///
89 /// # Cancellation safety
90 ///
91 /// Cancel-safe — backed by `tokio::sync::mpsc::Receiver::recv`.
92 pub async fn next_event(&mut self) -> Option<Event> {
93 self.event_rx.recv().await
94 }
95
96 /// Enqueue a D-STAR header for transmission.
97 ///
98 /// The frame is placed in the loop's TX queue and drained only
99 /// when the modem reports enough D-STAR FIFO space.
100 ///
101 /// # Errors
102 ///
103 /// - [`ShellError::SessionClosed`] if the loop has exited.
104 pub async fn send_dstar_header(&mut self, bytes: [u8; 41]) -> Result<(), ShellError> {
105 let (tx, rx) = oneshot::channel();
106 self.command_tx
107 .send(Command::SendDStarHeader { bytes, reply: tx })
108 .await
109 .map_err(|_| ShellError::SessionClosed)?;
110 rx.await.map_err(|_| ShellError::SessionClosed)?
111 }
112
113 /// Enqueue a D-STAR voice data frame for transmission.
114 ///
115 /// # Errors
116 ///
117 /// - [`ShellError::SessionClosed`] if the loop has exited.
118 pub async fn send_dstar_data(&mut self, bytes: [u8; 12]) -> Result<(), ShellError> {
119 // Hang-hunt: two awaits here. Trace both sides so a repro
120 // log narrows the freeze to "command_tx full" (ModemLoop
121 // not draining command_rx) vs. "reply never came"
122 // (ModemLoop received but stuck before replying).
123 let (tx, rx) = oneshot::channel();
124 tracing::trace!(
125 target: "mmdvm::hang_hunt",
126 cmd_cap = self.command_tx.capacity(),
127 "send_dstar_data: awaiting command_tx.send"
128 );
129 self.command_tx
130 .send(Command::SendDStarData { bytes, reply: tx })
131 .await
132 .map_err(|_| ShellError::SessionClosed)?;
133 tracing::trace!(
134 target: "mmdvm::hang_hunt",
135 "send_dstar_data: command queued, awaiting reply"
136 );
137 let r = rx.await.map_err(|_| ShellError::SessionClosed)?;
138 tracing::trace!(
139 target: "mmdvm::hang_hunt",
140 "send_dstar_data: reply received"
141 );
142 r
143 }
144
145 /// Enqueue a D-STAR end-of-transmission marker.
146 ///
147 /// # Errors
148 ///
149 /// - [`ShellError::SessionClosed`] if the loop has exited.
150 pub async fn send_dstar_eot(&mut self) -> Result<(), ShellError> {
151 let (tx, rx) = oneshot::channel();
152 self.command_tx
153 .send(Command::SendDStarEot { reply: tx })
154 .await
155 .map_err(|_| ShellError::SessionClosed)?;
156 rx.await.map_err(|_| ShellError::SessionClosed)?
157 }
158
159 /// Set the modem's operating mode.
160 ///
161 /// # Errors
162 ///
163 /// - [`ShellError::SessionClosed`] if the loop has exited.
164 /// - [`ShellError::Io`] if writing to the transport fails.
165 /// - [`ShellError::Core`] if the codec rejects the frame.
166 pub async fn set_mode(&mut self, mode: ModemMode) -> Result<(), ShellError> {
167 let (tx, rx) = oneshot::channel();
168 self.command_tx
169 .send(Command::SetMode { mode, reply: tx })
170 .await
171 .map_err(|_| ShellError::SessionClosed)?;
172 rx.await.map_err(|_| ShellError::SessionClosed)?
173 }
174
175 /// Trigger a `GetVersion` request. The response arrives as
176 /// [`Event::Version`].
177 ///
178 /// # Errors
179 ///
180 /// - [`ShellError::SessionClosed`] if the loop has exited.
181 /// - [`ShellError::Io`] if writing to the transport fails.
182 pub async fn request_version(&mut self) -> Result<(), ShellError> {
183 let (tx, rx) = oneshot::channel();
184 self.command_tx
185 .send(Command::GetVersion { reply: tx })
186 .await
187 .map_err(|_| ShellError::SessionClosed)?;
188 rx.await.map_err(|_| ShellError::SessionClosed)?
189 }
190
191 /// Trigger an immediate `GetStatus` request. The response
192 /// arrives as [`Event::Status`]. The loop also polls status
193 /// every 250 ms on its own, so this is only needed for explicit
194 /// "check now" flows.
195 ///
196 /// # Errors
197 ///
198 /// - [`ShellError::SessionClosed`] if the loop has exited.
199 /// - [`ShellError::Io`] if writing to the transport fails.
200 pub async fn request_status(&mut self) -> Result<(), ShellError> {
201 let (tx, rx) = oneshot::channel();
202 self.command_tx
203 .send(Command::GetStatus { reply: tx })
204 .await
205 .map_err(|_| ShellError::SessionClosed)?;
206 rx.await.map_err(|_| ShellError::SessionClosed)?
207 }
208
209 /// Send a raw frame — escape hatch for protocols we don't model
210 /// yet.
211 ///
212 /// # Errors
213 ///
214 /// - [`ShellError::SessionClosed`] if the loop has exited.
215 /// - [`ShellError::Io`] if writing to the transport fails.
216 /// - [`ShellError::Core`] if the codec rejects the frame (e.g.
217 /// oversized payload).
218 pub async fn send_raw(&mut self, command: u8, payload: Vec<u8>) -> Result<(), ShellError> {
219 let (tx, rx) = oneshot::channel();
220 self.command_tx
221 .send(Command::SendRaw {
222 command,
223 payload,
224 reply: tx,
225 })
226 .await
227 .map_err(|_| ShellError::SessionClosed)?;
228 rx.await.map_err(|_| ShellError::SessionClosed)?
229 }
230
231 /// Graceful shutdown — flushes the TX queue, exits the loop, and
232 /// returns the recovered transport.
233 ///
234 /// Consumes the handle. After `shutdown` returns, the task has
235 /// fully wound down and ownership of the transport is handed back
236 /// to the caller so it can be reused (e.g. to switch back to CAT
237 /// mode on a serial port).
238 ///
239 /// # Errors
240 ///
241 /// - [`ShellError::SessionClosed`] if the loop had already exited
242 /// before the shutdown command could be delivered, or the task
243 /// panicked / was aborted before it could hand the transport back.
244 pub async fn shutdown(mut self) -> Result<T, ShellError> {
245 let (tx, rx) = oneshot::channel();
246 self.command_tx
247 .send(Command::Shutdown { reply: tx })
248 .await
249 .map_err(|_| ShellError::SessionClosed)?;
250 rx.await.map_err(|_| ShellError::SessionClosed)?;
251
252 // Drain any remaining events so the loop can finish its
253 // flush. Once the send half drops (when the loop exits), this
254 // loop terminates.
255 while self.event_rx.recv().await.is_some() {}
256
257 // Reclaim the transport from the task.
258 let handle = self.join_handle.take().ok_or(ShellError::SessionClosed)?;
259 match handle.await {
260 Ok(transport_result) => transport_result,
261 Err(_join_err) => Err(ShellError::SessionClosed),
262 }
263 }
264}
265
266impl<T: Transport + 'static> Drop for AsyncModem<T> {
267 fn drop(&mut self) {
268 // Dropping command_tx closes the channel, which signals the
269 // modem task to exit on its next loop iteration. The spawned
270 // task's JoinHandle is detached — if the caller never invoked
271 // `shutdown`, we do not await the task (awaiting in Drop would
272 // require blocking). The tokio runtime detaches the task and
273 // its transport will be dropped when the task finishes.
274 }
275}