1 //! HFP service facade
2 
3 use bt_topshim::btif::{BluetoothInterface, RawAddress, ToggleableProfile};
4 use bt_topshim::profiles::hfp::{Hfp, HfpCallbacks, HfpCallbacksDispatcher};
5 use bt_topshim_facade_protobuf::empty::Empty;
6 use bt_topshim_facade_protobuf::facade::{
7     ConnectAudioRequest, DisconnectAudioRequest, EventType, FetchEventsRequest,
8     FetchEventsResponse, SetVolumeRequest, StartSlcRequest, StopSlcRequest,
9 };
10 use bt_topshim_facade_protobuf::facade_grpc::{create_hfp_service, HfpService};
11 use futures::sink::SinkExt;
12 use grpcio::*;
13 
14 use std::str::from_utf8;
15 use std::sync::{Arc, Mutex};
16 use tokio::runtime::Runtime;
17 use tokio::sync::mpsc;
18 
get_hfp_dispatcher( _hfp: Arc<Mutex<Hfp>>, tx: Arc<Mutex<Option<mpsc::Sender<HfpCallbacks>>>>, ) -> HfpCallbacksDispatcher19 fn get_hfp_dispatcher(
20     _hfp: Arc<Mutex<Hfp>>,
21     tx: Arc<Mutex<Option<mpsc::Sender<HfpCallbacks>>>>,
22 ) -> HfpCallbacksDispatcher {
23     HfpCallbacksDispatcher {
24         dispatch: Box::new(move |cb: HfpCallbacks| {
25             println!("Hfp Callback found {:?}", cb);
26             if let HfpCallbacks::ConnectionState(state, address) = &cb {
27                 println!("Hfp Connection state changed to {:?} for address {:?}", state, address);
28             }
29             let guard_tx = tx.lock().unwrap();
30             if let Some(event_tx) = guard_tx.as_ref() {
31                 let txclone = event_tx.clone();
32                 if txclone.try_send(cb.clone()).is_err() {
33                     println!("Cannot send event {:?}", cb);
34                 }
35                 /*tokio::spawn(async move {
36                     let _ = txclone.send(cb).await;
37                 });*/
38             }
39         }),
40     }
41 }
42 
43 /// Main object for Hfp facade service
44 #[derive(Clone)]
45 pub struct HfpServiceImpl {
46     #[allow(dead_code)]
47     rt: Arc<Runtime>,
48     pub btif_hfp: Arc<Mutex<Hfp>>,
49     #[allow(dead_code)]
50     event_tx: Arc<Mutex<Option<mpsc::Sender<HfpCallbacks>>>>,
51 }
52 
53 impl HfpServiceImpl {
54     /// Create a new instance of the root facade service
create(rt: Arc<Runtime>, btif_intf: Arc<Mutex<BluetoothInterface>>) -> grpcio::Service55     pub fn create(rt: Arc<Runtime>, btif_intf: Arc<Mutex<BluetoothInterface>>) -> grpcio::Service {
56         let btif_hfp = Arc::new(Mutex::new(Hfp::new(&btif_intf.lock().unwrap())));
57         let event_tx = Arc::new(Mutex::new(None));
58         btif_hfp.lock().unwrap().initialize(get_hfp_dispatcher(btif_hfp.clone(), event_tx.clone()));
59         btif_hfp.lock().unwrap().enable();
60         create_hfp_service(Self { rt, btif_hfp, event_tx })
61     }
62 }
63 
64 impl HfpService for HfpServiceImpl {
start_slc(&mut self, ctx: RpcContext<'_>, req: StartSlcRequest, sink: UnarySink<Empty>)65     fn start_slc(&mut self, ctx: RpcContext<'_>, req: StartSlcRequest, sink: UnarySink<Empty>) {
66         let hfp = self.btif_hfp.clone();
67         ctx.spawn(async move {
68             let addr_bytes = &req.connection.unwrap().cookie;
69             let bt_addr = from_utf8(addr_bytes).unwrap();
70             if let Some(addr) = RawAddress::from_string(bt_addr) {
71                 hfp.lock().unwrap().connect(addr);
72                 sink.success(Empty::default()).await.unwrap();
73             } else {
74                 sink.fail(RpcStatus::with_message(
75                     RpcStatusCode::INVALID_ARGUMENT,
76                     format!("Invalid Request Address: {}", bt_addr),
77                 ))
78                 .await
79                 .unwrap();
80             }
81         })
82     }
83 
stop_slc(&mut self, ctx: RpcContext<'_>, req: StopSlcRequest, sink: UnarySink<Empty>)84     fn stop_slc(&mut self, ctx: RpcContext<'_>, req: StopSlcRequest, sink: UnarySink<Empty>) {
85         let hfp = self.btif_hfp.clone();
86         ctx.spawn(async move {
87             let addr_bytes = &req.connection.unwrap().cookie;
88             let bt_addr = from_utf8(addr_bytes).unwrap();
89             if let Some(addr) = RawAddress::from_string(bt_addr) {
90                 hfp.lock().unwrap().disconnect(addr);
91                 sink.success(Empty::default()).await.unwrap();
92             } else {
93                 sink.fail(RpcStatus::with_message(
94                     RpcStatusCode::INVALID_ARGUMENT,
95                     format!("Invalid Request Address: {}", bt_addr),
96                 ))
97                 .await
98                 .unwrap();
99             }
100         })
101     }
102 
connect_audio( &mut self, ctx: RpcContext<'_>, req: ConnectAudioRequest, sink: UnarySink<Empty>, )103     fn connect_audio(
104         &mut self,
105         ctx: RpcContext<'_>,
106         req: ConnectAudioRequest,
107         sink: UnarySink<Empty>,
108     ) {
109         let hfp = self.btif_hfp.clone();
110         ctx.spawn(async move {
111             let addr_bytes = &req.connection.unwrap().cookie;
112             let bt_addr = from_utf8(addr_bytes).unwrap();
113             if let Some(addr) = RawAddress::from_string(bt_addr) {
114                 hfp.lock().unwrap().connect_audio(
115                     addr,
116                     req.is_sco_offload_enabled,
117                     req.disabled_codecs,
118                 );
119                 hfp.lock().unwrap().set_active_device(addr);
120                 sink.success(Empty::default()).await.unwrap();
121             } else {
122                 sink.fail(RpcStatus::with_message(
123                     RpcStatusCode::INVALID_ARGUMENT,
124                     format!("Invalid Request Address: {}", bt_addr),
125                 ))
126                 .await
127                 .unwrap();
128             }
129         })
130     }
131 
disconnect_audio( &mut self, ctx: RpcContext<'_>, req: DisconnectAudioRequest, sink: UnarySink<Empty>, )132     fn disconnect_audio(
133         &mut self,
134         ctx: RpcContext<'_>,
135         req: DisconnectAudioRequest,
136         sink: UnarySink<Empty>,
137     ) {
138         let hfp = self.btif_hfp.clone();
139         ctx.spawn(async move {
140             let addr_bytes = &req.connection.unwrap().cookie;
141             let bt_addr = from_utf8(addr_bytes).unwrap();
142             if let Some(addr) = RawAddress::from_string(bt_addr) {
143                 hfp.lock().unwrap().disconnect_audio(addr);
144                 sink.success(Empty::default()).await.unwrap();
145             } else {
146                 sink.fail(RpcStatus::with_message(
147                     RpcStatusCode::INVALID_ARGUMENT,
148                     format!("Invalid Request Address: {}", bt_addr),
149                 ))
150                 .await
151                 .unwrap();
152             }
153         })
154     }
155 
set_volume(&mut self, ctx: RpcContext<'_>, req: SetVolumeRequest, sink: UnarySink<Empty>)156     fn set_volume(&mut self, ctx: RpcContext<'_>, req: SetVolumeRequest, sink: UnarySink<Empty>) {
157         let hfp = self.btif_hfp.clone();
158         ctx.spawn(async move {
159             let addr_bytes = &req.connection.unwrap().cookie;
160             let bt_addr = from_utf8(addr_bytes).unwrap();
161             if let Some(addr) = RawAddress::from_string(bt_addr) {
162                 // TODO(aritrasen): Consider using TryFrom and cap the maximum volume here
163                 // since `as` silently deals with data overflow, which might not be preferred.
164                 hfp.lock().unwrap().set_volume(req.volume as i8, addr);
165                 sink.success(Empty::default()).await.unwrap();
166             } else {
167                 sink.fail(RpcStatus::with_message(
168                     RpcStatusCode::INVALID_ARGUMENT,
169                     format!("Invalid Request Address: {}", bt_addr),
170                 ))
171                 .await
172                 .unwrap();
173             }
174         })
175     }
176 
fetch_events( &mut self, ctx: RpcContext<'_>, _req: FetchEventsRequest, mut sink: ServerStreamingSink<FetchEventsResponse>, )177     fn fetch_events(
178         &mut self,
179         ctx: RpcContext<'_>,
180         _req: FetchEventsRequest,
181         mut sink: ServerStreamingSink<FetchEventsResponse>,
182     ) {
183         let (tx, mut rx) = mpsc::channel(10);
184         {
185             let mut guard = self.event_tx.lock().unwrap();
186             if guard.is_some() {
187                 ctx.spawn(async move {
188                     sink.fail(RpcStatus::with_message(
189                         RpcStatusCode::UNAVAILABLE,
190                         String::from("Profile is currently already connected and streaming"),
191                     ))
192                     .await
193                     .unwrap();
194                 });
195                 return;
196             } else {
197                 *guard = Some(tx);
198             }
199         }
200 
201         ctx.spawn(async move {
202             while let Some(event) = rx.recv().await {
203                 if let HfpCallbacks::ConnectionState(state, address) = event {
204                     let mut rsp = FetchEventsResponse::new();
205                     rsp.event_type = EventType::HFP_CONNECTION_STATE.into();
206                     rsp.data = format!("{:?}, {}", state, address.to_string());
207                     sink.send((rsp, WriteFlags::default())).await.unwrap();
208                 }
209             }
210         })
211     }
212 }
213