kenwood_thd75/transport/mmdvm_adapter.rs
1//! Adapter bridging [`crate::Transport`] to tokio's
2//! [`AsyncRead`] + [`AsyncWrite`] contracts.
3//!
4//! The [`mmdvm`] crate's tokio shell requires transports that implement
5//! [`tokio::io::AsyncRead`] and [`tokio::io::AsyncWrite`]. The
6//! [`crate::Transport`] trait exposes ergonomic `async fn read` /
7//! `async fn write` methods, which are incompatible at the trait-object
8//! level. This adapter converts the `async fn` interface into the
9//! poll-based interface tokio uses internally.
10//!
11//! # Implementation strategy
12//!
13//! A **pump task** owns the inner transport and serializes reads and
14//! writes via [`tokio::select!`]. The adapter communicates with the
15//! pump via two mpsc channels:
16//!
17//! - **Write channel** (`adapter → pump`): byte buffers to write.
18//! - **Read channel** (`pump → adapter`): byte buffers read from the
19//! transport, one `Vec<u8>` per [`crate::Transport::read`] call.
20//!
//! The pump task's `select!` races the next queued write against the
//! next transport read on the same `T`, so a read that is still waiting
//! for data never delays an outgoing write. The reverse does not hold:
//! a write that is already in flight is awaited to completion before
//! the next read is started. This gives the adapter the duplex
//! behaviour that [`tokio::io::split`] provides for types that support
//! an explicit half-split, without requiring `T` to support one.
26//!
27//! [`MmdvmTransportAdapter::into_inner`] closes the write channel,
28//! awaits the pump task's [`JoinHandle`], and recovers the inner `T`
29//! that the pump returned on clean exit.
30//!
31//! # Thread-affinity (macOS Bluetooth)
32//!
33//! The pump task is spawned with [`tokio::task::spawn_local`] so it
34//! runs on the same OS thread as the calling [`tokio::task::LocalSet`].
35//! This is **required** for [`crate::transport::BluetoothTransport`] on
36//! macOS: `IOBluetooth`'s RFCOMM channel callbacks are dispatched to the
37//! `CFRunLoop` of the thread that opened the channel (typically the
38//! main thread, before the tokio runtime starts). Pumping that runloop
39//! from a worker thread is a no-op — the callbacks never deliver data
40//! into the pipe that `BluetoothTransport::read` waits on. By keeping
41//! the pump on the same thread, every `bt_pump_runloop()` call drains
42//! pending callbacks where they actually live.
43//!
44//! Callers must therefore construct this adapter from inside a
45//! [`tokio::task::LocalSet`]. For the REPL/TUI, the top-level
46//! `run_repl` future is launched via `LocalSet::block_on`, satisfying
47//! this requirement transparently.
48
49use std::io;
50use std::pin::Pin;
51use std::task::{Context, Poll};
52
53use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
54use tokio::sync::mpsc;
55use tokio::task::JoinHandle;
56
57use crate::error::TransportError;
58
59use super::Transport;
60
/// Number of outbound buffers that may be queued for the pump task.
///
/// MMDVM frames are short (≤ 255 bytes) and are sent at a modest rate,
/// so a small queue suffices; being bounded, it also applies
/// backpressure to writers should the pump task fall behind.
const WRITE_CHANNEL_CAPACITY: usize = 64;

/// Number of inbound buffers the pump task may queue for the adapter.
///
/// Each queued buffer holds at most [`READ_CHUNK_SIZE`] bytes, so a full
/// queue is 64 × 512 bytes = 32 KiB of burst capacity — far more than
/// the MMDVM protocol produces.
const READ_CHANNEL_CAPACITY: usize = 64;

/// Length of the scratch buffer the pump task hands to each
/// [`Transport::read`] call; also the upper bound on one queued chunk.
const READ_CHUNK_SIZE: usize = 512;
78
79/// Adapter that presents a [`crate::Transport`] as a tokio
80/// [`AsyncRead`] + [`AsyncWrite`] + [`Send`] + [`Unpin`] duplex stream.
81///
82/// See the [module-level docs](self) for the pump-task architecture
83/// and rationale.
84pub struct MmdvmTransportAdapter<T: Transport + 'static> {
85 /// Buffered bytes from the latest read that didn't fit in the
86 /// caller's [`ReadBuf`]. Drained first by [`Self::poll_read`]
87 /// before pulling more from [`Self::read_rx`].
88 leftover: Vec<u8>,
89 /// Inbound byte buffers from the pump task.
90 read_rx: mpsc::Receiver<io::Result<Vec<u8>>>,
91 /// Outbound byte buffers to the pump task.
92 write_tx: mpsc::Sender<Vec<u8>>,
93 /// Join handle for the pump task. Dropping the adapter without
94 /// [`Self::into_inner`] still cleanly terminates the pump via the
95 /// channel close; the join handle is retained only so
96 /// [`Self::into_inner`] can await the pump and recover `T`.
97 pump: Option<JoinHandle<T>>,
98}
99
100impl<T: Transport + 'static> std::fmt::Debug for MmdvmTransportAdapter<T> {
101 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
102 f.debug_struct("MmdvmTransportAdapter")
103 .field("leftover_len", &self.leftover.len())
104 .finish_non_exhaustive()
105 }
106}
107
108impl<T: Transport + 'static> MmdvmTransportAdapter<T> {
109 /// Wrap an existing transport.
110 ///
111 /// Spawns the pump task on the current [`tokio::task::LocalSet`]
112 /// via [`tokio::task::spawn_local`]. **Panics** if no `LocalSet`
113 /// is active — see the [module-level docs](self) for why this is
114 /// required (macOS Bluetooth thread-affinity).
115 #[must_use]
116 pub fn new(inner: T) -> Self {
117 let (write_tx, write_rx) = mpsc::channel::<Vec<u8>>(WRITE_CHANNEL_CAPACITY);
118 let (read_tx, read_rx) = mpsc::channel::<io::Result<Vec<u8>>>(READ_CHANNEL_CAPACITY);
119 let pump = tokio::task::spawn_local(pump_task(inner, write_rx, read_tx));
120 Self {
121 leftover: Vec::new(),
122 read_rx,
123 write_tx,
124 pump: Some(pump),
125 }
126 }
127
128 /// Recover the inner transport after the adapter's consumer has
129 /// finished with it.
130 ///
131 /// Closes the write channel, which signals the pump task to drop
132 /// the transport cleanly. Then awaits the pump's [`JoinHandle`]
133 /// to recover the inner `T`. Call this after
134 /// [`mmdvm::AsyncModem::shutdown`] has returned — by then the
135 /// modem loop has released the adapter and the pump's write
136 /// channel will close as soon as the adapter is dropped... but
137 /// we own the adapter here, so closing happens via explicit drop
138 /// of `write_tx` below.
139 ///
140 /// # Errors
141 ///
142 /// Returns [`io::Error`] if:
143 /// - The pump task panicked (join fails).
144 /// - The pump exited because of a transport error before we asked
145 /// it to shut down.
146 pub async fn into_inner(mut self) -> io::Result<T> {
147 // Drop the write sender; this closes the write channel,
148 // causing the pump task to break out of its loop and return
149 // the transport.
150 drop(std::mem::replace(
151 &mut self.write_tx,
152 mpsc::channel(1).0, // placeholder, never used
153 ));
154 // Also drop the read receiver so the pump doesn't block
155 // trying to send a final read result on shutdown.
156 let _ = std::mem::replace(&mut self.read_rx, mpsc::channel(1).1);
157
158 let pump = self
159 .pump
160 .take()
161 .ok_or_else(|| io::Error::other("MmdvmTransportAdapter: pump already joined"))?;
162 match pump.await {
163 Ok(transport) => Ok(transport),
164 Err(join_err) => Err(io::Error::other(format!(
165 "MmdvmTransportAdapter: pump task panicked: {join_err}"
166 ))),
167 }
168 }
169}
170
171/// Convert a [`TransportError`] to [`io::Error`] for the tokio traits.
172fn transport_err_to_io(err: TransportError) -> io::Error {
173 match err {
174 TransportError::Disconnected(e) => io::Error::new(io::ErrorKind::BrokenPipe, e),
175 TransportError::Read(e) | TransportError::Write(e) => e,
176 TransportError::NotFound => io::Error::new(io::ErrorKind::NotFound, "device not found"),
177 TransportError::Open { path, source } => {
178 io::Error::new(source.kind(), format!("failed to open {path}: {source}"))
179 }
180 }
181}
182
183/// Background task that owns the transport and serializes reads and
184/// writes via [`tokio::select!`].
185///
186/// Exits and returns the transport when:
187/// - The write channel is closed by the adapter (normal shutdown).
188/// - A read or write fails (transport error).
189/// - The read receiver closes (consumer lost interest).
190///
191/// On any exit path the transport is returned so
192/// [`MmdvmTransportAdapter::into_inner`] can recover it for the next
193/// session (TX re-entry into CAT mode, etc.).
194async fn pump_task<T: Transport>(
195 mut transport: T,
196 mut write_rx: mpsc::Receiver<Vec<u8>>,
197 read_tx: mpsc::Sender<io::Result<Vec<u8>>>,
198) -> T {
199 let mut scratch = [0u8; READ_CHUNK_SIZE];
200 loop {
201 tokio::select! {
202 biased;
203
204 maybe_write = write_rx.recv() => {
205 let Some(data) = maybe_write else {
206 tracing::debug!(
207 target: "kenwood_thd75::transport::mmdvm_adapter",
208 "write channel closed; pump task exiting"
209 );
210 return transport;
211 };
212 tracing::trace!(
213 target: "mmdvm::hang_hunt",
214 len = data.len(),
215 "pump: write branch — calling transport.write"
216 );
217 if let Err(e) = transport.write(&data).await {
218 tracing::warn!(
219 target: "kenwood_thd75::transport::mmdvm_adapter",
220 error = %e,
221 "transport write failed; pump task exiting"
222 );
223 let _ = read_tx.send(Err(transport_err_to_io(e))).await;
224 return transport;
225 }
226 tracing::trace!(target: "mmdvm::hang_hunt", "pump: transport.write returned");
227 }
228
229 read_result = transport.read(&mut scratch) => {
230 match read_result {
231 Ok(0) => {
232 tracing::debug!(
233 target: "kenwood_thd75::transport::mmdvm_adapter",
234 "transport read returned EOF; pump exiting"
235 );
236 let _ = read_tx
237 .send(Err(io::Error::new(
238 io::ErrorKind::UnexpectedEof,
239 "transport EOF",
240 )))
241 .await;
242 return transport;
243 }
244 Ok(n) => {
245 let Some(slice) = scratch.get(..n) else {
246 tracing::warn!(
247 target: "kenwood_thd75::transport::mmdvm_adapter",
248 got = n,
249 cap = READ_CHUNK_SIZE,
250 "transport read reported impossible length; dropping"
251 );
252 continue;
253 };
254 let bytes = slice.to_vec();
255 tracing::trace!(
256 target: "mmdvm::hang_hunt",
257 len = bytes.len(),
258 cap_remaining = read_tx.capacity(),
259 "pump: read branch — awaiting read_tx.send"
260 );
261 if read_tx.send(Ok(bytes)).await.is_err() {
262 tracing::debug!(
263 target: "kenwood_thd75::transport::mmdvm_adapter",
264 "read consumer closed; pump exiting"
265 );
266 return transport;
267 }
268 tracing::trace!(target: "mmdvm::hang_hunt", "pump: read_tx.send done");
269 }
270 Err(e) => {
271 tracing::warn!(
272 target: "kenwood_thd75::transport::mmdvm_adapter",
273 error = %e,
274 "transport read failed; pump exiting"
275 );
276 let _ = read_tx.send(Err(transport_err_to_io(e))).await;
277 return transport;
278 }
279 }
280 }
281 }
282 }
283}
284
285impl<T: Transport + Unpin + 'static> AsyncRead for MmdvmTransportAdapter<T> {
286 fn poll_read(
287 self: Pin<&mut Self>,
288 cx: &mut Context<'_>,
289 buf: &mut ReadBuf<'_>,
290 ) -> Poll<io::Result<()>> {
291 let this = self.get_mut();
292
293 // First drain anything left over from a previous oversize read.
294 if !this.leftover.is_empty() {
295 let take = this.leftover.len().min(buf.remaining());
296 let drained: Vec<u8> = this.leftover.drain(..take).collect();
297 buf.put_slice(&drained);
298 return Poll::Ready(Ok(()));
299 }
300
301 match this.read_rx.poll_recv(cx) {
302 Poll::Pending => Poll::Pending,
303 Poll::Ready(None) => {
304 // Pump exited. Treat as EOF so tokio's higher-level
305 // readers stop cleanly.
306 Poll::Ready(Ok(()))
307 }
308 Poll::Ready(Some(Err(e))) => Poll::Ready(Err(e)),
309 Poll::Ready(Some(Ok(bytes))) => {
310 let take = bytes.len().min(buf.remaining());
311 let (to_put, to_save) = bytes.split_at(take);
312 buf.put_slice(to_put);
313 if !to_save.is_empty() {
314 this.leftover.extend_from_slice(to_save);
315 }
316 Poll::Ready(Ok(()))
317 }
318 }
319 }
320}
321
322impl<T: Transport + Unpin + 'static> AsyncWrite for MmdvmTransportAdapter<T> {
323 fn poll_write(
324 self: Pin<&mut Self>,
325 cx: &mut Context<'_>,
326 buf: &[u8],
327 ) -> Poll<io::Result<usize>> {
328 let this = self.get_mut();
329 let data = buf.to_vec();
330 match this.write_tx.try_send(data) {
331 Ok(()) => Poll::Ready(Ok(buf.len())),
332 Err(mpsc::error::TrySendError::Full(_)) => {
333 // Pump task is briefly busy. Wake ourselves and
334 // retry shortly; the pump drains write_rx eagerly.
335 // A sustained hang-hunt trace here (many "Full" in
336 // a row with no matching pump write progress) means
337 // the pump task is wedged in FFI. The log is
338 // rate-limited by the spin itself — every retry
339 // emits one line, so "hundreds of Full in a millisecond"
340 // is the signal, not the volume.
341 tracing::trace!(
342 target: "mmdvm::hang_hunt",
343 cap_remaining = this.write_tx.capacity(),
344 "adapter.poll_write: write_tx FULL (will wake-retry)"
345 );
346 cx.waker().wake_by_ref();
347 Poll::Pending
348 }
349 Err(mpsc::error::TrySendError::Closed(_)) => Poll::Ready(Err(io::Error::new(
350 io::ErrorKind::BrokenPipe,
351 "MmdvmTransportAdapter: pump task exited",
352 ))),
353 }
354 }
355
356 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
357 // [`crate::Transport::write`] flushes synchronously and the
358 // pump task writes eagerly — once `poll_write` accepts a
359 // buffer, the pump will drain it on its next loop turn and
360 // call [`Transport::write`] which itself flushes. No explicit
361 // flush is needed here.
362 Poll::Ready(Ok(()))
363 }
364
365 fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
366 // Shutdown is driven by dropping the adapter (which closes
367 // `write_tx` and terminates the pump). Returning `Ok` lets
368 // tokio's shutdown-on-drop path complete cleanly.
369 Poll::Ready(Ok(()))
370 }
371}
372
#[cfg(test)]
mod tests {
    use super::*;
    use crate::transport::MockTransport;
    use tokio::io::{AsyncReadExt, AsyncWriteExt};
    use tokio::task::LocalSet;

    type TestResult = Result<(), Box<dyn std::error::Error>>;

    /// A scripted request written through the adapter comes back as the
    /// scripted reply on the read side.
    #[tokio::test]
    async fn roundtrip_write_then_read() -> TestResult {
        // The adapter spawns its pump with `spawn_local`, so each test
        // body runs inside a `LocalSet`.
        let local = LocalSet::new();
        local
            .run_until(async {
                let mut transport = MockTransport::new();
                transport.expect(b"PING\r", b"PONG\r");
                let mut adapter = MmdvmTransportAdapter::new(transport);

                adapter.write_all(b"PING\r").await?;

                let mut reply = [0u8; 16];
                let len = adapter.read(&mut reply).await?;
                let received = reply.get(..len).ok_or("slice")?;
                assert_eq!(received, b"PONG\r");
                Ok(())
            })
            .await
    }

    /// After one exchange, `into_inner` hands the transport back.
    #[tokio::test]
    async fn into_inner_recovers_transport() -> TestResult {
        let local = LocalSet::new();
        local
            .run_until(async {
                let mut transport = MockTransport::new();
                transport.expect(b"X", b"Y");
                let mut adapter = MmdvmTransportAdapter::new(transport);

                adapter.write_all(b"X").await?;
                let mut reply = [0u8; 1];
                let len = adapter.read(&mut reply).await?;
                assert_eq!(len, 1);

                let recovered = adapter.into_inner().await?;
                drop(recovered);
                Ok(())
            })
            .await
    }

    /// `into_inner` also succeeds when no I/O went through the adapter.
    #[tokio::test]
    async fn into_inner_without_io_succeeds() -> TestResult {
        let local = LocalSet::new();
        local
            .run_until(async {
                let adapter = MmdvmTransportAdapter::new(MockTransport::new());
                let _recovered = adapter.into_inner().await?;
                Ok(())
            })
            .await
    }
}