kenwood_thd75/transport/
mmdvm_adapter.rs

1//! Adapter bridging [`crate::Transport`] to tokio's
2//! [`AsyncRead`] + [`AsyncWrite`] contracts.
3//!
4//! The [`mmdvm`] crate's tokio shell requires transports that implement
5//! [`tokio::io::AsyncRead`] and [`tokio::io::AsyncWrite`]. The
6//! [`crate::Transport`] trait exposes ergonomic `async fn read` /
7//! `async fn write` methods, which are incompatible at the trait-object
8//! level. This adapter converts the `async fn` interface into the
9//! poll-based interface tokio uses internally.
10//!
11//! # Implementation strategy
12//!
13//! A **pump task** owns the inner transport and serializes reads and
14//! writes via [`tokio::select!`]. The adapter communicates with the
15//! pump via two mpsc channels:
16//!
17//! - **Write channel** (`adapter → pump`): byte buffers to write.
18//! - **Read channel** (`pump → adapter`): byte buffers read from the
19//!   transport, one `Vec<u8>` per [`crate::Transport::read`] call.
20//!
21//! The pump task's `select!` interleaves read and write operations on
22//! the same `T`, so a pending read never blocks an outgoing write (and
23//! vice versa). This mirrors the serialization that
24//! [`tokio::io::split`] provides for types that support an explicit
25//! half-split, without requiring `T` to support one.
26//!
27//! [`MmdvmTransportAdapter::into_inner`] closes the write channel,
28//! awaits the pump task's [`JoinHandle`], and recovers the inner `T`
29//! that the pump returned on clean exit.
30//!
31//! # Thread-affinity (macOS Bluetooth)
32//!
33//! The pump task is spawned with [`tokio::task::spawn_local`] so it
34//! runs on the same OS thread as the calling [`tokio::task::LocalSet`].
35//! This is **required** for [`crate::transport::BluetoothTransport`] on
36//! macOS: `IOBluetooth`'s RFCOMM channel callbacks are dispatched to the
37//! `CFRunLoop` of the thread that opened the channel (typically the
38//! main thread, before the tokio runtime starts). Pumping that runloop
39//! from a worker thread is a no-op — the callbacks never deliver data
40//! into the pipe that `BluetoothTransport::read` waits on. By keeping
41//! the pump on the same thread, every `bt_pump_runloop()` call drains
42//! pending callbacks where they actually live.
43//!
44//! Callers must therefore construct this adapter from inside a
45//! [`tokio::task::LocalSet`]. For the REPL/TUI, the top-level
46//! `run_repl` future is launched via `LocalSet::block_on`, satisfying
47//! this requirement transparently.
48
49use std::io;
50use std::pin::Pin;
51use std::task::{Context, Poll};
52
53use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
54use tokio::sync::mpsc;
55use tokio::task::JoinHandle;
56
57use crate::error::TransportError;
58
59use super::Transport;
60
/// Capacity, in buffers, of the adapter → pump write channel.
///
/// MMDVM frames are small (≤ 255 bytes) and send rates are modest, so
/// a small queue is plenty. When the queue is full the adapter's
/// `poll_write` returns `Pending`, which is the backpressure applied
/// if the pump task ever falls behind.
const WRITE_CHANNEL_CAPACITY: usize = 64;
67
/// Capacity, in buffers, of the pump → adapter read channel.
///
/// Each queued buffer holds at most [`READ_CHUNK_SIZE`] bytes, so 64
/// slots give 32 KiB (64 × 512) of burst capacity, far beyond
/// anything the MMDVM protocol produces.
const READ_CHANNEL_CAPACITY: usize = 64;
74
/// Size in bytes of the scratch buffer the pump task passes to each
/// [`Transport::read`] call. This is also the upper bound on the
/// length of any single buffer sent over the read channel.
const READ_CHUNK_SIZE: usize = 512;
78
/// Adapter that presents a [`crate::Transport`] as a tokio
/// [`AsyncRead`] + [`AsyncWrite`] + [`Send`] + [`Unpin`] duplex stream.
///
/// The adapter itself never touches the transport: it only exchanges
/// byte buffers with the pump task over two bounded channels.
///
/// See the [module-level docs](self) for the pump-task architecture
/// and rationale.
pub struct MmdvmTransportAdapter<T: Transport + 'static> {
    /// Tail of the most recent inbound chunk that did not fit in the
    /// caller's [`ReadBuf`]. `poll_read` hands these bytes out before
    /// it polls [`Self::read_rx`] again, so byte order is preserved.
    leftover: Vec<u8>,
    /// Inbound side of the read channel: one `Ok(chunk)` per
    /// successful transport read, or an `Err` describing why the pump
    /// stopped.
    read_rx: mpsc::Receiver<io::Result<Vec<u8>>>,
    /// Outbound side of the write channel: each buffer accepted by
    /// `poll_write` is queued here for the pump to write.
    write_tx: mpsc::Sender<Vec<u8>>,
    /// Join handle for the pump task. Dropping the adapter without
    /// [`Self::into_inner`] still cleanly terminates the pump via the
    /// channel close; the join handle is retained only so
    /// [`Self::into_inner`] can await the pump and recover `T`.
    /// It is `Some` from construction onward.
    pump: Option<JoinHandle<T>>,
}
99
100impl<T: Transport + 'static> std::fmt::Debug for MmdvmTransportAdapter<T> {
101    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
102        f.debug_struct("MmdvmTransportAdapter")
103            .field("leftover_len", &self.leftover.len())
104            .finish_non_exhaustive()
105    }
106}
107
108impl<T: Transport + 'static> MmdvmTransportAdapter<T> {
109    /// Wrap an existing transport.
110    ///
111    /// Spawns the pump task on the current [`tokio::task::LocalSet`]
112    /// via [`tokio::task::spawn_local`]. **Panics** if no `LocalSet`
113    /// is active — see the [module-level docs](self) for why this is
114    /// required (macOS Bluetooth thread-affinity).
115    #[must_use]
116    pub fn new(inner: T) -> Self {
117        let (write_tx, write_rx) = mpsc::channel::<Vec<u8>>(WRITE_CHANNEL_CAPACITY);
118        let (read_tx, read_rx) = mpsc::channel::<io::Result<Vec<u8>>>(READ_CHANNEL_CAPACITY);
119        let pump = tokio::task::spawn_local(pump_task(inner, write_rx, read_tx));
120        Self {
121            leftover: Vec::new(),
122            read_rx,
123            write_tx,
124            pump: Some(pump),
125        }
126    }
127
128    /// Recover the inner transport after the adapter's consumer has
129    /// finished with it.
130    ///
131    /// Closes the write channel, which signals the pump task to drop
132    /// the transport cleanly. Then awaits the pump's [`JoinHandle`]
133    /// to recover the inner `T`. Call this after
134    /// [`mmdvm::AsyncModem::shutdown`] has returned — by then the
135    /// modem loop has released the adapter and the pump's write
136    /// channel will close as soon as the adapter is dropped... but
137    /// we own the adapter here, so closing happens via explicit drop
138    /// of `write_tx` below.
139    ///
140    /// # Errors
141    ///
142    /// Returns [`io::Error`] if:
143    /// - The pump task panicked (join fails).
144    /// - The pump exited because of a transport error before we asked
145    ///   it to shut down.
146    pub async fn into_inner(mut self) -> io::Result<T> {
147        // Drop the write sender; this closes the write channel,
148        // causing the pump task to break out of its loop and return
149        // the transport.
150        drop(std::mem::replace(
151            &mut self.write_tx,
152            mpsc::channel(1).0, // placeholder, never used
153        ));
154        // Also drop the read receiver so the pump doesn't block
155        // trying to send a final read result on shutdown.
156        let _ = std::mem::replace(&mut self.read_rx, mpsc::channel(1).1);
157
158        let pump = self
159            .pump
160            .take()
161            .ok_or_else(|| io::Error::other("MmdvmTransportAdapter: pump already joined"))?;
162        match pump.await {
163            Ok(transport) => Ok(transport),
164            Err(join_err) => Err(io::Error::other(format!(
165                "MmdvmTransportAdapter: pump task panicked: {join_err}"
166            ))),
167        }
168    }
169}
170
171/// Convert a [`TransportError`] to [`io::Error`] for the tokio traits.
172fn transport_err_to_io(err: TransportError) -> io::Error {
173    match err {
174        TransportError::Disconnected(e) => io::Error::new(io::ErrorKind::BrokenPipe, e),
175        TransportError::Read(e) | TransportError::Write(e) => e,
176        TransportError::NotFound => io::Error::new(io::ErrorKind::NotFound, "device not found"),
177        TransportError::Open { path, source } => {
178            io::Error::new(source.kind(), format!("failed to open {path}: {source}"))
179        }
180    }
181}
182
183/// Background task that owns the transport and serializes reads and
184/// writes via [`tokio::select!`].
185///
186/// Exits and returns the transport when:
187/// - The write channel is closed by the adapter (normal shutdown).
188/// - A read or write fails (transport error).
189/// - The read receiver closes (consumer lost interest).
190///
191/// On any exit path the transport is returned so
192/// [`MmdvmTransportAdapter::into_inner`] can recover it for the next
193/// session (TX re-entry into CAT mode, etc.).
194async fn pump_task<T: Transport>(
195    mut transport: T,
196    mut write_rx: mpsc::Receiver<Vec<u8>>,
197    read_tx: mpsc::Sender<io::Result<Vec<u8>>>,
198) -> T {
199    let mut scratch = [0u8; READ_CHUNK_SIZE];
200    loop {
201        tokio::select! {
202            biased;
203
204            maybe_write = write_rx.recv() => {
205                let Some(data) = maybe_write else {
206                    tracing::debug!(
207                        target: "kenwood_thd75::transport::mmdvm_adapter",
208                        "write channel closed; pump task exiting"
209                    );
210                    return transport;
211                };
212                tracing::trace!(
213                    target: "mmdvm::hang_hunt",
214                    len = data.len(),
215                    "pump: write branch — calling transport.write"
216                );
217                if let Err(e) = transport.write(&data).await {
218                    tracing::warn!(
219                        target: "kenwood_thd75::transport::mmdvm_adapter",
220                        error = %e,
221                        "transport write failed; pump task exiting"
222                    );
223                    let _ = read_tx.send(Err(transport_err_to_io(e))).await;
224                    return transport;
225                }
226                tracing::trace!(target: "mmdvm::hang_hunt", "pump: transport.write returned");
227            }
228
229            read_result = transport.read(&mut scratch) => {
230                match read_result {
231                    Ok(0) => {
232                        tracing::debug!(
233                            target: "kenwood_thd75::transport::mmdvm_adapter",
234                            "transport read returned EOF; pump exiting"
235                        );
236                        let _ = read_tx
237                            .send(Err(io::Error::new(
238                                io::ErrorKind::UnexpectedEof,
239                                "transport EOF",
240                            )))
241                            .await;
242                        return transport;
243                    }
244                    Ok(n) => {
245                        let Some(slice) = scratch.get(..n) else {
246                            tracing::warn!(
247                                target: "kenwood_thd75::transport::mmdvm_adapter",
248                                got = n,
249                                cap = READ_CHUNK_SIZE,
250                                "transport read reported impossible length; dropping"
251                            );
252                            continue;
253                        };
254                        let bytes = slice.to_vec();
255                        tracing::trace!(
256                            target: "mmdvm::hang_hunt",
257                            len = bytes.len(),
258                            cap_remaining = read_tx.capacity(),
259                            "pump: read branch — awaiting read_tx.send"
260                        );
261                        if read_tx.send(Ok(bytes)).await.is_err() {
262                            tracing::debug!(
263                                target: "kenwood_thd75::transport::mmdvm_adapter",
264                                "read consumer closed; pump exiting"
265                            );
266                            return transport;
267                        }
268                        tracing::trace!(target: "mmdvm::hang_hunt", "pump: read_tx.send done");
269                    }
270                    Err(e) => {
271                        tracing::warn!(
272                            target: "kenwood_thd75::transport::mmdvm_adapter",
273                            error = %e,
274                            "transport read failed; pump exiting"
275                        );
276                        let _ = read_tx.send(Err(transport_err_to_io(e))).await;
277                        return transport;
278                    }
279                }
280            }
281        }
282    }
283}
284
285impl<T: Transport + Unpin + 'static> AsyncRead for MmdvmTransportAdapter<T> {
286    fn poll_read(
287        self: Pin<&mut Self>,
288        cx: &mut Context<'_>,
289        buf: &mut ReadBuf<'_>,
290    ) -> Poll<io::Result<()>> {
291        let this = self.get_mut();
292
293        // First drain anything left over from a previous oversize read.
294        if !this.leftover.is_empty() {
295            let take = this.leftover.len().min(buf.remaining());
296            let drained: Vec<u8> = this.leftover.drain(..take).collect();
297            buf.put_slice(&drained);
298            return Poll::Ready(Ok(()));
299        }
300
301        match this.read_rx.poll_recv(cx) {
302            Poll::Pending => Poll::Pending,
303            Poll::Ready(None) => {
304                // Pump exited. Treat as EOF so tokio's higher-level
305                // readers stop cleanly.
306                Poll::Ready(Ok(()))
307            }
308            Poll::Ready(Some(Err(e))) => Poll::Ready(Err(e)),
309            Poll::Ready(Some(Ok(bytes))) => {
310                let take = bytes.len().min(buf.remaining());
311                let (to_put, to_save) = bytes.split_at(take);
312                buf.put_slice(to_put);
313                if !to_save.is_empty() {
314                    this.leftover.extend_from_slice(to_save);
315                }
316                Poll::Ready(Ok(()))
317            }
318        }
319    }
320}
321
322impl<T: Transport + Unpin + 'static> AsyncWrite for MmdvmTransportAdapter<T> {
323    fn poll_write(
324        self: Pin<&mut Self>,
325        cx: &mut Context<'_>,
326        buf: &[u8],
327    ) -> Poll<io::Result<usize>> {
328        let this = self.get_mut();
329        let data = buf.to_vec();
330        match this.write_tx.try_send(data) {
331            Ok(()) => Poll::Ready(Ok(buf.len())),
332            Err(mpsc::error::TrySendError::Full(_)) => {
333                // Pump task is briefly busy. Wake ourselves and
334                // retry shortly; the pump drains write_rx eagerly.
335                // A sustained hang-hunt trace here (many "Full" in
336                // a row with no matching pump write progress) means
337                // the pump task is wedged in FFI. The log is
338                // rate-limited by the spin itself — every retry
339                // emits one line, so "hundreds of Full in a millisecond"
340                // is the signal, not the volume.
341                tracing::trace!(
342                    target: "mmdvm::hang_hunt",
343                    cap_remaining = this.write_tx.capacity(),
344                    "adapter.poll_write: write_tx FULL (will wake-retry)"
345                );
346                cx.waker().wake_by_ref();
347                Poll::Pending
348            }
349            Err(mpsc::error::TrySendError::Closed(_)) => Poll::Ready(Err(io::Error::new(
350                io::ErrorKind::BrokenPipe,
351                "MmdvmTransportAdapter: pump task exited",
352            ))),
353        }
354    }
355
356    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
357        // [`crate::Transport::write`] flushes synchronously and the
358        // pump task writes eagerly — once `poll_write` accepts a
359        // buffer, the pump will drain it on its next loop turn and
360        // call [`Transport::write`] which itself flushes. No explicit
361        // flush is needed here.
362        Poll::Ready(Ok(()))
363    }
364
365    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
366        // Shutdown is driven by dropping the adapter (which closes
367        // `write_tx` and terminates the pump). Returning `Ok` lets
368        // tokio's shutdown-on-drop path complete cleanly.
369        Poll::Ready(Ok(()))
370    }
371}
372
#[cfg(test)]
mod tests {
    use super::*;
    use crate::transport::MockTransport;
    use tokio::io::{AsyncReadExt, AsyncWriteExt};
    use tokio::task::LocalSet;

    type TestResult = Result<(), Box<dyn std::error::Error>>;

    /// A write followed by a read passes through the pump in both
    /// directions.
    #[tokio::test]
    async fn roundtrip_write_then_read() -> TestResult {
        LocalSet::new()
            .run_until(async {
                let mut mock = MockTransport::new();
                mock.expect(b"PING\r", b"PONG\r");
                let mut adapter = MmdvmTransportAdapter::new(mock);

                adapter.write_all(b"PING\r").await?;
                let mut buf = [0u8; 16];
                let n = adapter.read(&mut buf).await?;
                assert_eq!(buf.get(..n).ok_or("slice")?, b"PONG\r");
                Ok(())
            })
            .await
    }

    /// Reading with a buffer smaller than the inbound chunk must hand
    /// the remainder out on later reads, in order and without loss.
    #[tokio::test]
    async fn small_buffer_reads_preserve_order() -> TestResult {
        LocalSet::new()
            .run_until(async {
                let mut mock = MockTransport::new();
                mock.expect(b"PING\r", b"PONG\r");
                let mut adapter = MmdvmTransportAdapter::new(mock);
                adapter.write_all(b"PING\r").await?;

                let mut collected = Vec::new();
                let mut piece = [0u8; 2];
                while collected.len() < 5 {
                    let n = adapter.read(&mut piece).await?;
                    let got = piece
                        .get(..n)
                        .filter(|got| !got.is_empty())
                        .ok_or("unexpected EOF")?;
                    collected.extend_from_slice(got);
                }
                assert_eq!(collected, b"PONG\r");
                Ok(())
            })
            .await
    }

    /// After some I/O, `into_inner` shuts the pump down and returns
    /// the transport.
    #[tokio::test]
    async fn into_inner_recovers_transport() -> TestResult {
        LocalSet::new()
            .run_until(async {
                let mut mock = MockTransport::new();
                mock.expect(b"X", b"Y");
                let mut adapter = MmdvmTransportAdapter::new(mock);
                adapter.write_all(b"X").await?;
                let mut buf = [0u8; 1];
                let n = adapter.read(&mut buf).await?;
                assert_eq!(n, 1);

                let recovered = adapter.into_inner().await?;
                drop(recovered);
                Ok(())
            })
            .await
    }

    /// `into_inner` also succeeds on an adapter that never did I/O.
    #[tokio::test]
    async fn into_inner_without_io_succeeds() -> TestResult {
        LocalSet::new()
            .run_until(async {
                let mock = MockTransport::new();
                let adapter = MmdvmTransportAdapter::new(mock);
                let _mock = adapter.into_inner().await?;
                Ok(())
            })
            .await
    }
}