1 //! HFP service facade
2
3 use bt_topshim::btif::{BluetoothInterface, RawAddress, ToggleableProfile};
4 use bt_topshim::profiles::hfp::{Hfp, HfpCallbacks, HfpCallbacksDispatcher};
5 use bt_topshim_facade_protobuf::empty::Empty;
6 use bt_topshim_facade_protobuf::facade::{
7 ConnectAudioRequest, DisconnectAudioRequest, EventType, FetchEventsRequest,
8 FetchEventsResponse, SetVolumeRequest, StartSlcRequest, StopSlcRequest,
9 };
10 use bt_topshim_facade_protobuf::facade_grpc::{create_hfp_service, HfpService};
11 use futures::sink::SinkExt;
12 use grpcio::*;
13
14 use std::str::from_utf8;
15 use std::sync::{Arc, Mutex};
16 use tokio::runtime::Runtime;
17 use tokio::sync::mpsc;
18
get_hfp_dispatcher( _hfp: Arc<Mutex<Hfp>>, tx: Arc<Mutex<Option<mpsc::Sender<HfpCallbacks>>>>, ) -> HfpCallbacksDispatcher19 fn get_hfp_dispatcher(
20 _hfp: Arc<Mutex<Hfp>>,
21 tx: Arc<Mutex<Option<mpsc::Sender<HfpCallbacks>>>>,
22 ) -> HfpCallbacksDispatcher {
23 HfpCallbacksDispatcher {
24 dispatch: Box::new(move |cb: HfpCallbacks| {
25 println!("Hfp Callback found {:?}", cb);
26 if let HfpCallbacks::ConnectionState(state, address) = &cb {
27 println!("Hfp Connection state changed to {:?} for address {:?}", state, address);
28 }
29 let guard_tx = tx.lock().unwrap();
30 if let Some(event_tx) = guard_tx.as_ref() {
31 let txclone = event_tx.clone();
32 if txclone.try_send(cb.clone()).is_err() {
33 println!("Cannot send event {:?}", cb);
34 }
35 /*tokio::spawn(async move {
36 let _ = txclone.send(cb).await;
37 });*/
38 }
39 }),
40 }
41 }
42
43 /// Main object for Hfp facade service
44 #[derive(Clone)]
45 pub struct HfpServiceImpl {
46 #[allow(dead_code)]
47 rt: Arc<Runtime>,
48 pub btif_hfp: Arc<Mutex<Hfp>>,
49 #[allow(dead_code)]
50 event_tx: Arc<Mutex<Option<mpsc::Sender<HfpCallbacks>>>>,
51 }
52
53 impl HfpServiceImpl {
54 /// Create a new instance of the root facade service
create(rt: Arc<Runtime>, btif_intf: Arc<Mutex<BluetoothInterface>>) -> grpcio::Service55 pub fn create(rt: Arc<Runtime>, btif_intf: Arc<Mutex<BluetoothInterface>>) -> grpcio::Service {
56 let btif_hfp = Arc::new(Mutex::new(Hfp::new(&btif_intf.lock().unwrap())));
57 let event_tx = Arc::new(Mutex::new(None));
58 btif_hfp.lock().unwrap().initialize(get_hfp_dispatcher(btif_hfp.clone(), event_tx.clone()));
59 btif_hfp.lock().unwrap().enable();
60 create_hfp_service(Self { rt, btif_hfp, event_tx })
61 }
62 }
63
64 impl HfpService for HfpServiceImpl {
start_slc(&mut self, ctx: RpcContext<'_>, req: StartSlcRequest, sink: UnarySink<Empty>)65 fn start_slc(&mut self, ctx: RpcContext<'_>, req: StartSlcRequest, sink: UnarySink<Empty>) {
66 let hfp = self.btif_hfp.clone();
67 ctx.spawn(async move {
68 let addr_bytes = &req.connection.unwrap().cookie;
69 let bt_addr = from_utf8(addr_bytes).unwrap();
70 if let Some(addr) = RawAddress::from_string(bt_addr) {
71 hfp.lock().unwrap().connect(addr);
72 sink.success(Empty::default()).await.unwrap();
73 } else {
74 sink.fail(RpcStatus::with_message(
75 RpcStatusCode::INVALID_ARGUMENT,
76 format!("Invalid Request Address: {}", bt_addr),
77 ))
78 .await
79 .unwrap();
80 }
81 })
82 }
83
stop_slc(&mut self, ctx: RpcContext<'_>, req: StopSlcRequest, sink: UnarySink<Empty>)84 fn stop_slc(&mut self, ctx: RpcContext<'_>, req: StopSlcRequest, sink: UnarySink<Empty>) {
85 let hfp = self.btif_hfp.clone();
86 ctx.spawn(async move {
87 let addr_bytes = &req.connection.unwrap().cookie;
88 let bt_addr = from_utf8(addr_bytes).unwrap();
89 if let Some(addr) = RawAddress::from_string(bt_addr) {
90 hfp.lock().unwrap().disconnect(addr);
91 sink.success(Empty::default()).await.unwrap();
92 } else {
93 sink.fail(RpcStatus::with_message(
94 RpcStatusCode::INVALID_ARGUMENT,
95 format!("Invalid Request Address: {}", bt_addr),
96 ))
97 .await
98 .unwrap();
99 }
100 })
101 }
102
connect_audio( &mut self, ctx: RpcContext<'_>, req: ConnectAudioRequest, sink: UnarySink<Empty>, )103 fn connect_audio(
104 &mut self,
105 ctx: RpcContext<'_>,
106 req: ConnectAudioRequest,
107 sink: UnarySink<Empty>,
108 ) {
109 let hfp = self.btif_hfp.clone();
110 ctx.spawn(async move {
111 let addr_bytes = &req.connection.unwrap().cookie;
112 let bt_addr = from_utf8(addr_bytes).unwrap();
113 if let Some(addr) = RawAddress::from_string(bt_addr) {
114 hfp.lock().unwrap().connect_audio(
115 addr,
116 req.is_sco_offload_enabled,
117 req.disabled_codecs,
118 );
119 hfp.lock().unwrap().set_active_device(addr);
120 sink.success(Empty::default()).await.unwrap();
121 } else {
122 sink.fail(RpcStatus::with_message(
123 RpcStatusCode::INVALID_ARGUMENT,
124 format!("Invalid Request Address: {}", bt_addr),
125 ))
126 .await
127 .unwrap();
128 }
129 })
130 }
131
disconnect_audio( &mut self, ctx: RpcContext<'_>, req: DisconnectAudioRequest, sink: UnarySink<Empty>, )132 fn disconnect_audio(
133 &mut self,
134 ctx: RpcContext<'_>,
135 req: DisconnectAudioRequest,
136 sink: UnarySink<Empty>,
137 ) {
138 let hfp = self.btif_hfp.clone();
139 ctx.spawn(async move {
140 let addr_bytes = &req.connection.unwrap().cookie;
141 let bt_addr = from_utf8(addr_bytes).unwrap();
142 if let Some(addr) = RawAddress::from_string(bt_addr) {
143 hfp.lock().unwrap().disconnect_audio(addr);
144 sink.success(Empty::default()).await.unwrap();
145 } else {
146 sink.fail(RpcStatus::with_message(
147 RpcStatusCode::INVALID_ARGUMENT,
148 format!("Invalid Request Address: {}", bt_addr),
149 ))
150 .await
151 .unwrap();
152 }
153 })
154 }
155
set_volume(&mut self, ctx: RpcContext<'_>, req: SetVolumeRequest, sink: UnarySink<Empty>)156 fn set_volume(&mut self, ctx: RpcContext<'_>, req: SetVolumeRequest, sink: UnarySink<Empty>) {
157 let hfp = self.btif_hfp.clone();
158 ctx.spawn(async move {
159 let addr_bytes = &req.connection.unwrap().cookie;
160 let bt_addr = from_utf8(addr_bytes).unwrap();
161 if let Some(addr) = RawAddress::from_string(bt_addr) {
162 // TODO(aritrasen): Consider using TryFrom and cap the maximum volume here
163 // since `as` silently deals with data overflow, which might not be preferred.
164 hfp.lock().unwrap().set_volume(req.volume as i8, addr);
165 sink.success(Empty::default()).await.unwrap();
166 } else {
167 sink.fail(RpcStatus::with_message(
168 RpcStatusCode::INVALID_ARGUMENT,
169 format!("Invalid Request Address: {}", bt_addr),
170 ))
171 .await
172 .unwrap();
173 }
174 })
175 }
176
fetch_events( &mut self, ctx: RpcContext<'_>, _req: FetchEventsRequest, mut sink: ServerStreamingSink<FetchEventsResponse>, )177 fn fetch_events(
178 &mut self,
179 ctx: RpcContext<'_>,
180 _req: FetchEventsRequest,
181 mut sink: ServerStreamingSink<FetchEventsResponse>,
182 ) {
183 let (tx, mut rx) = mpsc::channel(10);
184 {
185 let mut guard = self.event_tx.lock().unwrap();
186 if guard.is_some() {
187 ctx.spawn(async move {
188 sink.fail(RpcStatus::with_message(
189 RpcStatusCode::UNAVAILABLE,
190 String::from("Profile is currently already connected and streaming"),
191 ))
192 .await
193 .unwrap();
194 });
195 return;
196 } else {
197 *guard = Some(tx);
198 }
199 }
200
201 ctx.spawn(async move {
202 while let Some(event) = rx.recv().await {
203 if let HfpCallbacks::ConnectionState(state, address) = event {
204 let mut rsp = FetchEventsResponse::new();
205 rsp.event_type = EventType::HFP_CONNECTION_STATE.into();
206 rsp.data = format!("{:?}, {}", state, address.to_string());
207 sink.send((rsp, WriteFlags::default())).await.unwrap();
208 }
209 }
210 })
211 }
212 }
213