1 //! common facade & shim helpers
2 
3 use bt_facade_proto::common::Data;
4 use bt_packets::hci::Packet;
5 use futures::sink::SinkExt;
6 use grpcio::*;
7 use std::sync::Arc;
8 use tokio::runtime::Runtime;
9 use tokio::sync::mpsc::Receiver;
10 use tokio::sync::Mutex;
11 
/// Callback wrapper taking a byte slice.
///
/// `RxAdapter::stream_runnable` calls [`U8SliceRunnable::run`] once per
/// received packet, passing that packet's bytes.
pub trait U8SliceRunnable {
    /// Invoke the callback with `data`.
    ///
    /// Takes `&self`, so implementations that need to mutate state must use
    /// interior mutability.
    fn run(&self, data: &[u8]);
}
17 
18 /// Helper for interfacing channels with shim or gRPC boundaries
19 #[derive(Clone)]
20 pub struct RxAdapter<T> {
21     rx: Arc<Mutex<Receiver<T>>>,
22     running: bool,
23 }
24 
25 impl<T: 'static + Packet + Send> RxAdapter<T> {
26     /// New, from an unwrapped receiver
new(rx: Receiver<T>) -> Self27     pub fn new(rx: Receiver<T>) -> Self {
28         Self::from_arc(Arc::new(Mutex::new(rx)))
29     }
30 
31     /// New, from an already arc mutexed receiver
from_arc(rx: Arc<Mutex<Receiver<T>>>) -> Self32     pub fn from_arc(rx: Arc<Mutex<Receiver<T>>>) -> Self {
33         Self { rx, running: false }
34     }
35 
36     /// Stream out the channel over the provided sink
stream_grpc(&mut self, ctx: RpcContext<'_>, mut sink: ServerStreamingSink<Data>)37     pub fn stream_grpc(&mut self, ctx: RpcContext<'_>, mut sink: ServerStreamingSink<Data>) {
38         assert!(!self.running);
39         self.running = true;
40 
41         let clone_rx = self.rx.clone();
42         ctx.spawn(async move {
43             while let Some(payload) = clone_rx.lock().await.recv().await {
44                 let mut data = Data::default();
45                 data.set_payload(payload.to_vec());
46                 if let Err(e) = sink.send((data, WriteFlags::default())).await {
47                     log::error!("failure sending data: {:?}", e);
48                 }
49             }
50         });
51     }
52 
53     /// Stream out the channel over the provided shim runnable
stream_runnable<R: 'static + U8SliceRunnable + Send>( &mut self, rt: &Arc<Runtime>, runnable: R, )54     pub fn stream_runnable<R: 'static + U8SliceRunnable + Send>(
55         &mut self,
56         rt: &Arc<Runtime>,
57         runnable: R,
58     ) {
59         assert!(!self.running);
60         self.running = true;
61 
62         let clone_rx = self.rx.clone();
63         rt.spawn(async move {
64             while let Some(payload) = clone_rx.lock().await.recv().await {
65                 runnable.run(&payload.to_bytes());
66             }
67         });
68     }
69 }
70