//! common facade & shim helpers use bt_facade_proto::common::Data; use bt_packets::hci::Packet; use futures::sink::SinkExt; use grpcio::*; use std::sync::Arc; use tokio::runtime::Runtime; use tokio::sync::mpsc::Receiver; use tokio::sync::Mutex; /// Wrapper so we can invoke callbacks pub trait U8SliceRunnable { /// Do the thing fn run(&self, data: &[u8]); } /// Helper for interfacing channels with shim or gRPC boundaries #[derive(Clone)] pub struct RxAdapter { rx: Arc>>, running: bool, } impl RxAdapter { /// New, from an unwrapped receiver pub fn new(rx: Receiver) -> Self { Self::from_arc(Arc::new(Mutex::new(rx))) } /// New, from an already arc mutexed receiver pub fn from_arc(rx: Arc>>) -> Self { Self { rx, running: false } } /// Stream out the channel over the provided sink pub fn stream_grpc(&mut self, ctx: RpcContext<'_>, mut sink: ServerStreamingSink) { assert!(!self.running); self.running = true; let clone_rx = self.rx.clone(); ctx.spawn(async move { while let Some(payload) = clone_rx.lock().await.recv().await { let mut data = Data::default(); data.set_payload(payload.to_vec()); if let Err(e) = sink.send((data, WriteFlags::default())).await { log::error!("failure sending data: {:?}", e); } } }); } /// Stream out the channel over the provided shim runnable pub fn stream_runnable( &mut self, rt: &Arc, runnable: R, ) { assert!(!self.running); self.running = true; let clone_rx = self.rx.clone(); rt.spawn(async move { while let Some(payload) = clone_rx.lock().await.recv().await { runnable.run(&payload.to_bytes()); } }); } }