# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""GATT grpc interface."""

import asyncio
import logging
from uuid import UUID

from floss.pandora.floss import adapter_client
from floss.pandora.floss import floss_enums
from floss.pandora.floss import gatt_client
from floss.pandora.floss import gatt_server
from floss.pandora.floss import utils
from floss.pandora.server import bluetooth as bluetooth_module
import grpc
from pandora_experimental import gatt_grpc_aio
from pandora_experimental import gatt_pb2


def _set_future_result(future: asyncio.Future, result) -> None:
    """Sets |result| on |future| unless the future is already done.

    A future may already be resolved (duplicate callback) or cancelled (the RPC awaiting it went away); calling
    set_result() on it would raise asyncio.InvalidStateError inside the event loop. Must run on the future's own
    event loop; use _resolve_threadsafe() from other threads.
    """
    if not future.done():
        future.set_result(result)


def _resolve_threadsafe(future: asyncio.Future, result) -> None:
    """Schedules the resolution of |future| with |result| on the event loop that owns the future."""
    future.get_loop().call_soon_threadsafe(_set_future_result, future, result)


class GATTService(gatt_grpc_aio.GATTServicer):
    """Service to trigger Bluetooth GATT procedures.

    This class implements the Pandora bluetooth test interfaces,
    where the metaclass definition is automatically generated by the protobuf.
    The interface definition can be found in:
    https://cs.android.com/android/platform/superproject/main/+/main:packages/modules/Bluetooth/pandora/interfaces/pandora_experimental/gatt.proto?q=gatt.proto
    """

    # Write characteristic, requesting acknowledgement by the remote device.
    WRITE_TYPE_DEFAULT = 2
    # No authentication required.
    AUTHENTICATION_NONE = 0
    # Value used to enable indication for a client configuration descriptor.
    ENABLE_INDICATION_VALUE = [0x02, 0x00]
    # Value used to enable notification for a client configuration descriptor.
    ENABLE_NOTIFICATION_VALUE = [0x01, 0x00]
    # Key size (default = 16).
    KEY_SIZE = 16
    # Service type primary.
    SERVICE_TYPE_PRIMARY = 0
    # Instance id for service or characteristic or descriptor.
    DEFAULT_INSTANCE_ID = 0

    def __init__(self, bluetooth: bluetooth_module.Bluetooth):
        """Stores the Bluetooth facade and registers the long-lived notification observer.

        Must be called while an asyncio event loop is running, because the running loop is captured for the
        notification observer.
        """
        self.bluetooth = bluetooth
        # Maps address -> {characteristic handle -> future resolved when a notification is received}.
        self.characteristic_changed_map = {}

        # Register the observer for characteristic notifications.
        observer = self.SendCharacteristicNotificationObserver(asyncio.get_running_loop(), self)
        name = utils.create_observer_name(observer)
        self.bluetooth.gatt_client.register_callback_observer(name, observer)

    class SendCharacteristicNotificationObserver(gatt_client.GattClientCallbacks):
        """Observer that records received characteristic notifications in the owning service's map."""

        def __init__(self, loop: asyncio.AbstractEventLoop, gatt_service):
            self.loop = loop
            self.gatt_service = gatt_service

        @utils.glib_callback()
        def on_notify(self, addr, handle, value):
            logging.info('Characteristic Notification Received. addr: %s, handle: %s', addr, handle)
            char_map = self.gatt_service.characteristic_changed_map.setdefault(addr, {})

            char_future = char_map.get(handle)
            if char_future is None:
                # Nobody is waiting yet: create the future so that a later waiter finds it already resolved.
                char_future = self.loop.create_future()
                char_map[handle] = char_future
            # Set the future to indicate that the characteristic has changed. Repeated notifications for the same
            # handle find the future already done and are ignored instead of raising InvalidStateError.
            _resolve_threadsafe(char_future, True)

    @staticmethod
    def _find_characteristic_handle(services, descriptor_handle):
        """Returns the instance id of the characteristic owning the given descriptor handle.

        Args:
            services: Iterable of service dictionaries; each has a 'characteristics' list whose entries have an
                'instance_id' and a 'descriptors' list of dictionaries with an 'instance_id'.
            descriptor_handle: Descriptor instance id to look for.

        Returns:
            The 'instance_id' of the first characteristic that has a matching descriptor, or None if none matches.
        """
        for serv in services:
            for characteristic in serv['characteristics']:
                for desc in characteristic['descriptors']:
                    if desc['instance_id'] == descriptor_handle:
                        return characteristic['instance_id']
        return None

    async def ExchangeMTU(self, request: gatt_pb2.ExchangeMTURequest,
                          context: grpc.ServicerContext) -> gatt_pb2.ExchangeMTUResponse:
        """Configures the MTU for the connection and waits for the configure-MTU callback.

        Aborts the RPC with INTERNAL when the reported status is not SUCCESS.
        """

        class MTUChangeObserver(gatt_client.GattClientCallbacks):
            """Observer to observe MTU change state."""

            def __init__(self, task):
                self.task = task

            @utils.glib_callback()
            def on_configure_mtu(self, addr, mtu, status):
                if addr != self.task['address']:
                    return
                if floss_enums.GattStatus(status) != floss_enums.GattStatus.SUCCESS:
                    logging.error('Failed to configure MTU. Status: %s', status)
                _resolve_threadsafe(self.task['configure_mtu'], status)

        address = utils.connection_from(request.connection).address
        try:
            configure_mtu = asyncio.get_running_loop().create_future()
            observer = MTUChangeObserver({'configure_mtu': configure_mtu, 'address': address})
            name = utils.create_observer_name(observer)
            self.bluetooth.gatt_client.register_callback_observer(name, observer)
            self.bluetooth.configure_mtu(address, request.mtu)
            status = await configure_mtu
            if status != floss_enums.GattStatus.SUCCESS:
                await context.abort(grpc.StatusCode.INTERNAL, 'Failed to configure MTU.')
        finally:
            self.bluetooth.gatt_client.unregister_callback_observer(name, observer)
        return gatt_pb2.ExchangeMTUResponse()

    async def WriteAttFromHandle(self, request: gatt_pb2.WriteRequest,
                                 context: grpc.ServicerContext) -> gatt_pb2.WriteResponse:
        """Writes a value to the attribute identified by the request handle.

        The handle is first probed with a characteristic read. If that succeeds the value is written as a
        characteristic; otherwise the handle is probed with a descriptor read and, on success, written as a
        descriptor. If both probes fail the response carries INVALID_HANDLE.
        """

        class WriteAttObserver(gatt_client.GattClientCallbacks):
            """Observer to observe write attribute state."""

            def __init__(self, task):
                self.task = task

            @utils.glib_callback()
            def on_characteristic_write(self, addr, status, handle):
                if addr != self.task['address']:
                    return
                if floss_enums.GattStatus(status) != floss_enums.GattStatus.SUCCESS:
                    logging.error('Failed to write characteristic from handle. Status: %s', status)
                _resolve_threadsafe(self.task['write_attribute'], (status, handle))

            @utils.glib_callback()
            def on_descriptor_write(self, addr, status, handle):
                if addr != self.task['address']:
                    return
                if floss_enums.GattStatus(status) != floss_enums.GattStatus.SUCCESS:
                    logging.error('Failed to write descriptor from handle. Status: %s', status)
                _resolve_threadsafe(self.task['write_attribute'], (status, handle))

        class ReadCharacteristicFromHandleObserver(gatt_client.GattClientCallbacks):
            """Observer to observe the read characteristics state."""

            def __init__(self, task):
                self.task = task

            @utils.glib_callback()
            def on_characteristic_read(self, addr, status, handle, value):
                if addr != self.task['address']:
                    return
                if floss_enums.GattStatus(status) != floss_enums.GattStatus.SUCCESS:
                    logging.error('Failed to read characteristic from handle. Status: %s', status)
                _resolve_threadsafe(self.task['characteristics'], status)

        class ReadCharacteristicDescriptorFromHandleObserver(gatt_client.GattClientCallbacks):
            """Observer to observe the read characteristic descriptor state."""

            def __init__(self, task):
                self.task = task

            @utils.glib_callback()
            def on_descriptor_read(self, addr, status, handle, value):
                if addr != self.task['address']:
                    return
                if floss_enums.GattStatus(status) != floss_enums.GattStatus.SUCCESS:
                    logging.error('Failed to read descriptors. Status: %s', status)
                _resolve_threadsafe(self.task['descriptors'], status)

        address = utils.connection_from(request.connection).address
        observers = []
        valid_handle = True
        try:
            write_attribute = asyncio.get_running_loop().create_future()
            observer = WriteAttObserver({'write_attribute': write_attribute, 'address': address})
            name = utils.create_observer_name(observer)
            self.bluetooth.gatt_client.register_callback_observer(name, observer)
            observers.append((name, observer))

            characteristics = asyncio.get_running_loop().create_future()
            observer = ReadCharacteristicFromHandleObserver({'characteristics': characteristics, 'address': address})
            name = utils.create_observer_name(observer)
            self.bluetooth.gatt_client.register_callback_observer(name, observer)
            observers.append((name, observer))

            # Probe the handle as a characteristic first.
            self.bluetooth.read_characteristic(address, request.handle, self.AUTHENTICATION_NONE)
            char_status = await characteristics
            if char_status != floss_enums.GattStatus.SUCCESS:
                # Not readable as a characteristic: probe the handle as a descriptor.
                descriptors = asyncio.get_running_loop().create_future()
                observer = ReadCharacteristicDescriptorFromHandleObserver({
                    'descriptors': descriptors,
                    'address': address
                })
                name = utils.create_observer_name(observer)
                self.bluetooth.gatt_client.register_callback_observer(name, observer)
                observers.append((name, observer))
                # NOTE(review): every other RPC in this class calls self.bluetooth.read_descriptor(); this one goes
                # through self.bluetooth.gatt_client directly. Left unchanged - confirm that this is intended.
                self.bluetooth.gatt_client.read_descriptor(address, request.handle, self.AUTHENTICATION_NONE)
                desc_status = await descriptors
                if desc_status != floss_enums.GattStatus.SUCCESS:
                    valid_handle = False
                else:
                    self.bluetooth.write_descriptor(address, request.handle, self.AUTHENTICATION_NONE, request.value)
            else:
                self.bluetooth.write_characteristic(address, request.handle, self.WRITE_TYPE_DEFAULT,
                                                    self.AUTHENTICATION_NONE, request.value)
            if valid_handle:
                status, handle = await write_attribute

        finally:
            for name, observer in observers:
                self.bluetooth.gatt_client.unregister_callback_observer(name, observer)
        if valid_handle:
            return gatt_pb2.WriteResponse(handle=handle, status=status)
        return gatt_pb2.WriteResponse(handle=request.handle, status=gatt_pb2.INVALID_HANDLE)

    async def DiscoverServiceByUuid(self, request: gatt_pb2.DiscoverServiceByUuidRequest,
                                    context: grpc.ServicerContext) -> gatt_pb2.DiscoverServicesResponse:
        """Triggers a service discovery by UUID and returns an empty response without waiting for a result."""

        address = utils.connection_from(request.connection).address
        self.bluetooth.btif_gattc_discover_service_by_uuid(address, request.uuid)

        return gatt_pb2.DiscoverServicesResponse()

    async def DiscoverServices(self, request: gatt_pb2.DiscoverServicesRequest,
                               context: grpc.ServicerContext) -> gatt_pb2.DiscoverServicesResponse:
        """Discovers the services of the connected device and returns them.

        Aborts the RPC with INTERNAL when the search-complete status is not SUCCESS.
        """

        class DiscoveryObserver(gatt_client.GattClientCallbacks):
            """Observer to observe the discovery service state."""

            def __init__(self, task):
                self.task = task

            @utils.glib_callback()
            def on_search_complete(self, addr, services, status):
                if addr != self.task['address']:
                    return
                if floss_enums.GattStatus(status) != floss_enums.GattStatus.SUCCESS:
                    logging.error('Failed to complete search. Status: %s', status)
                _resolve_threadsafe(self.task['search_services'], (services, status))

        address = utils.connection_from(request.connection).address
        try:
            search_services = asyncio.get_running_loop().create_future()
            observer = DiscoveryObserver({'search_services': search_services, 'address': address})
            name = utils.create_observer_name(observer)
            self.bluetooth.gatt_client.register_callback_observer(name, observer)
            self.bluetooth.discover_services(address)

            services, status = await search_services
            if status != floss_enums.GattStatus.SUCCESS:
                await context.abort(grpc.StatusCode.INTERNAL, 'Failed to find services.')
            response = gatt_pb2.DiscoverServicesResponse()
            for serv in services:
                response.services.append(self.create_gatt_service(serv))
        finally:
            self.bluetooth.gatt_client.unregister_callback_observer(name, observer)
        return response

    async def DiscoverServicesSdp(self, request: gatt_pb2.DiscoverServicesSdpRequest,
                                  context: grpc.ServicerContext) -> gatt_pb2.DiscoverServicesSdpResponse:
        """Returns the service UUIDs of the remote device.

        While the device is bonding, the currently known UUIDs are returned (an empty response if there are none).
        Otherwise the remote UUIDs are fetched and the RPC waits for the device-properties-changed callback that
        carries the Uuids property before reading them. Aborts with INTERNAL if the fetch cannot be started.
        """

        class DiscoverySDPObserver(adapter_client.BluetoothCallbacks):
            """Observer to observe the SDP discovery service state."""

            def __init__(self, task):
                self.task = task

            @utils.glib_callback()
            def on_device_properties_changed(self, remote_device, props):
                if remote_device['address'] != self.task['address']:
                    return
                if floss_enums.BtPropertyType.Uuids in props:
                    _resolve_threadsafe(self.task['device_uuids_changed'], ())

        address = utils.address_from(request.address)
        uuids = self.bluetooth.get_remote_uuids(address)
        if self.bluetooth.get_bond_state(address) == floss_enums.BondState.BONDING:
            # `not uuids` covers both None and an empty list.
            if not uuids:
                logging.error('Failed to get UUIDs.')
                return gatt_pb2.DiscoverServicesSdpResponse()
        else:
            device_uuids_changed = asyncio.get_running_loop().create_future()
            observer = DiscoverySDPObserver({'device_uuids_changed': device_uuids_changed, 'address': address})
            name = utils.create_observer_name(observer)
            self.bluetooth.adapter_client.register_callback_observer(name, observer)
            # The observer only exists on this path, so the cleanup is scoped to it as well.
            try:
                status = self.bluetooth.fetch_remote(address)
                if not status:
                    await context.abort(grpc.StatusCode.INTERNAL, f'Failed to fetch remote device {address} uuids.')
                await device_uuids_changed
                uuids = self.bluetooth.get_remote_uuids(address)
            finally:
                self.bluetooth.adapter_client.unregister_callback_observer(name, observer)

        response = gatt_pb2.DiscoverServicesSdpResponse()
        if uuids:
            for uuid in uuids:
                response.service_uuids.append(str(UUID(bytes=bytes(uuid))).upper())

        return response

    async def ClearCache(self, request: gatt_pb2.ClearCacheRequest,
                         context: grpc.ServicerContext) -> gatt_pb2.ClearCacheResponse:
        """Refreshes the remote device and waits for the connection-updated callback.

        Aborts the RPC with INTERNAL when the reported status is not SUCCESS.
        """

        class ClearCacheObserver(gatt_client.GattClientCallbacks):
            """Observer to observe the clear cache state."""

            def __init__(self, task):
                self.task = task

            @utils.glib_callback()
            def on_connection_updated(self, addr, interval, latency, timeout, status):
                if addr != self.task['address']:
                    return
                if floss_enums.GattStatus(status) != floss_enums.GattStatus.SUCCESS:
                    logging.error('Failed to update connection. Status: %s', status)
                _resolve_threadsafe(self.task['refresh'], status)

        address = utils.connection_from(request.connection).address
        try:
            refresh = asyncio.get_running_loop().create_future()
            observer = ClearCacheObserver({'refresh': refresh, 'address': address})
            name = utils.create_observer_name(observer)
            self.bluetooth.gatt_client.register_callback_observer(name, observer)
            self.bluetooth.refresh_device(address)
            status = await refresh
            if status != floss_enums.GattStatus.SUCCESS:
                await context.abort(grpc.StatusCode.INTERNAL, 'Failed to clear cache.')
        finally:
            self.bluetooth.gatt_client.unregister_callback_observer(name, observer)
        return gatt_pb2.ClearCacheResponse()

    async def ReadCharacteristicFromHandle(self, request: gatt_pb2.ReadCharacteristicRequest,
                                           context: grpc.ServicerContext) -> gatt_pb2.ReadCharacteristicResponse:
        """Reads the characteristic at the request handle and returns its value and status."""

        class ReadCharacteristicFromHandleObserver(gatt_client.GattClientCallbacks):
            """Observer to observe the read characteristic from handle state."""

            def __init__(self, task):
                self.task = task

            @utils.glib_callback()
            def on_characteristic_read(self, addr, status, handle, value):
                if addr != self.task['address'] or handle != self.task['handle']:
                    return
                if floss_enums.GattStatus(status) != floss_enums.GattStatus.SUCCESS:
                    logging.error('Failed to read characteristic from handle. Status: %s', status)
                _resolve_threadsafe(self.task['characteristic_from_handle'], (value, status))

        address = utils.connection_from(request.connection).address
        try:
            characteristic_from_handle = asyncio.get_running_loop().create_future()
            observer = ReadCharacteristicFromHandleObserver({
                'characteristic_from_handle': characteristic_from_handle,
                'address': address,
                'handle': request.handle
            })
            name = utils.create_observer_name(observer)
            self.bluetooth.gatt_client.register_callback_observer(name, observer)
            self.bluetooth.read_characteristic(address, request.handle, self.AUTHENTICATION_NONE)
            value, status = await characteristic_from_handle
        finally:
            self.bluetooth.gatt_client.unregister_callback_observer(name, observer)

        return gatt_pb2.ReadCharacteristicResponse(value=gatt_pb2.AttValue(handle=request.handle, value=bytes(value)),
                                                   status=status)

    async def ReadCharacteristicsFromUuid(
            self, request: gatt_pb2.ReadCharacteristicsFromUuidRequest,
            context: grpc.ServicerContext) -> gatt_pb2.ReadCharacteristicsFromUuidResponse:
        """Reads every characteristic matching the request UUID within the requested handle range.

        Services are discovered first; each discovered characteristic whose UUID equals the request UUID and whose
        instance id lies in [start_handle, end_handle] is read individually. If no discovered characteristic
        matches, a single read over the whole requested range is issued instead. Aborts with INTERNAL when the
        discovery status is not SUCCESS.
        """

        class DiscoveryObserver(gatt_client.GattClientCallbacks):
            """Observer to observe the discovery service state."""

            def __init__(self, task):
                self.task = task

            @utils.glib_callback()
            def on_search_complete(self, addr, services, status):
                if addr != self.task['address']:
                    return
                if floss_enums.GattStatus(status) != floss_enums.GattStatus.SUCCESS:
                    logging.error('Failed to complete search. Status: %s', status)
                _resolve_threadsafe(self.task['search_services'], (services, status))

        class ReadCharacteristicsFromUuidObserver(gatt_client.GattClientCallbacks):
            """Observer to observe the read characteristics from uuid state."""

            def __init__(self, task):
                self.task = task

            @utils.glib_callback()
            def on_characteristic_read(self, addr, status, handle, value):
                if addr != self.task['address']:
                    return
                if floss_enums.GattStatus(status) != floss_enums.GattStatus.SUCCESS:
                    logging.error('Failed to read characteristic from handle. Status: %s', status)
                # The future is looked up on every callback because it is replaced before each read.
                _resolve_threadsafe(self.task['characteristic_from_uuid'], (status, handle, value))

        address = utils.connection_from(request.connection).address
        loop = asyncio.get_running_loop()
        observers = []
        characteristics = []
        try:
            search_services = loop.create_future()
            observer = DiscoveryObserver({'search_services': search_services, 'address': address})
            name = utils.create_observer_name(observer)
            self.bluetooth.gatt_client.register_callback_observer(name, observer)
            observers.append((name, observer))

            self.bluetooth.discover_services(address)
            services, status = await search_services
            if status != floss_enums.GattStatus.SUCCESS:
                await context.abort(grpc.StatusCode.INTERNAL, 'Found no services.')

            read_task = {
                'characteristic_from_uuid': loop.create_future(),
                'address': address,
                'start_handle': request.start_handle,
                'end_handle': request.end_handle
            }
            observer = ReadCharacteristicsFromUuidObserver(read_task)
            name = utils.create_observer_name(observer)
            self.bluetooth.gatt_client.register_callback_observer(name, observer)
            observers.append((name, observer))

            async def read_by_uuid(start_handle, end_handle):
                """Issues one read-by-UUID over the handle range and returns its (status, handle, value)."""
                # A future can only be resolved once, so every read gets its own; reusing a single future would
                # hand the first result back for all subsequent reads.
                read_future = loop.create_future()
                read_task['characteristic_from_uuid'] = read_future
                self.bluetooth.read_using_characteristic_uuid(address, request.uuid, start_handle, end_handle,
                                                              self.AUTHENTICATION_NONE)
                return await read_future

            for serv in services:
                for characteristic in serv['characteristics']:
                    char_handle = characteristic['instance_id']
                    if (str(UUID(bytes=bytes(characteristic['uuid']))).upper() == request.uuid and
                            request.start_handle <= char_handle <= request.end_handle):
                        characteristics.append(await read_by_uuid(char_handle, char_handle))
            if not characteristics:
                # No discovered characteristic matched: fall back to one read over the whole requested range.
                characteristics.append(await read_by_uuid(request.start_handle, request.end_handle))

            result = gatt_pb2.ReadCharacteristicsFromUuidResponse(characteristics_read=[
                gatt_pb2.ReadCharacteristicResponse(
                    value=gatt_pb2.AttValue(value=bytes(value), handle=handle),
                    status=status,
                ) for status, handle, value in characteristics
            ])
        finally:
            for name, observer in observers:
                self.bluetooth.gatt_client.unregister_callback_observer(name, observer)
        return result

    async def ReadCharacteristicDescriptorFromHandle(
            self, request: gatt_pb2.ReadCharacteristicDescriptorRequest,
            context: grpc.ServicerContext) -> gatt_pb2.ReadCharacteristicDescriptorResponse:
        """Reads the descriptor at the request handle and returns its value and status."""

        class ReadCharacteristicDescriptorFromHandleObserver(gatt_client.GattClientCallbacks):
            """Observer to observe the read descriptor state."""

            def __init__(self, task):
                self.task = task

            @utils.glib_callback()
            def on_descriptor_read(self, addr, status, handle, value):
                if addr != self.task['address'] or handle != self.task['handle']:
                    return
                if floss_enums.GattStatus(status) != floss_enums.GattStatus.SUCCESS:
                    logging.error('Failed to read descriptor. Status: %s', status)
                _resolve_threadsafe(self.task['descriptor'], (value, status))

        address = utils.connection_from(request.connection).address
        try:
            descriptor = asyncio.get_running_loop().create_future()
            observer = ReadCharacteristicDescriptorFromHandleObserver({
                'descriptor': descriptor,
                'address': address,
                'handle': request.handle
            })
            name = utils.create_observer_name(observer)
            self.bluetooth.gatt_client.register_callback_observer(name, observer)
            self.bluetooth.read_descriptor(address, request.handle, self.AUTHENTICATION_NONE)
            value, status = await descriptor
        finally:
            self.bluetooth.gatt_client.unregister_callback_observer(name, observer)

        return gatt_pb2.ReadCharacteristicDescriptorResponse(value=gatt_pb2.AttValue(handle=request.handle,
                                                                                     value=bytes(value)),
                                                             status=status)

    async def RegisterService(self, request: gatt_pb2.RegisterServiceRequest,
                              context: grpc.ServicerContext) -> gatt_pb2.RegisterServiceResponse:
        """Registers the requested service on the local GATT server and returns the service that was added.

        Aborts the RPC with INTERNAL when the service-added status is not SUCCESS.
        """

        class RegisterServiceObserver(gatt_server.GattServerCallbacks):
            """Observer to observe the service registration."""

            def __init__(self, task):
                self.task = task

            @utils.glib_callback()
            def on_service_added(self, status, service):
                if floss_enums.GattStatus(status) != floss_enums.GattStatus.SUCCESS:
                    logging.error('Failed to add service. Status: %s', status)
                _resolve_threadsafe(self.task['register_service'], (status, service))

        def convert_req_to_dictionary(request):
            """Converts the service message of the request into the dictionary passed to add_service()."""
            service_dict = {
                'service_type': self.SERVICE_TYPE_PRIMARY,
                'uuid': request.uuid,
                'instance_id': self.DEFAULT_INSTANCE_ID,
                'included_services': [],
                'characteristics': [],
            }

            # Iterate through the characteristics in the request.
            for char in request.characteristics:
                char_dict = {
                    'uuid': char.uuid,
                    'instance_id': self.DEFAULT_INSTANCE_ID,
                    'properties': char.properties,
                    'permissions': char.permissions,
                    'key_size': self.KEY_SIZE,
                    'write_type': self.WRITE_TYPE_DEFAULT,
                    # One entry per descriptor of the characteristic.
                    'descriptors': [{
                        'uuid': desc.uuid,
                        'instance_id': self.DEFAULT_INSTANCE_ID,
                        'permissions': desc.permissions,
                    } for desc in char.descriptors],
                }
                service_dict['characteristics'].append(char_dict)
            return service_dict

        try:
            register_service = asyncio.get_running_loop().create_future()
            observer = RegisterServiceObserver({'register_service': register_service})
            name = utils.create_observer_name(observer)
            self.bluetooth.gatt_server.register_callback_observer(name, observer)
            serv_dic = convert_req_to_dictionary(request.service)
            self.bluetooth.add_service(serv_dic)
            status, service = await register_service
            if status != floss_enums.GattStatus.SUCCESS:
                await context.abort(grpc.StatusCode.INTERNAL, 'Failed to register service.')
        finally:
            self.bluetooth.gatt_server.unregister_callback_observer(name, observer)

        return gatt_pb2.RegisterServiceResponse(service=self.create_gatt_service(service))

    async def SetCharacteristicNotificationFromHandle(
            self, request: gatt_pb2.SetCharacteristicNotificationFromHandleRequest,
            context: grpc.ServicerContext) -> gatt_pb2.SetCharacteristicNotificationFromHandleResponse:
        """Enables notification or indication through the descriptor at the request handle.

        The descriptor is read to validate the handle, services are discovered to find the characteristic owning
        the descriptor, the characteristic is registered for notification, and the enable value selected by
        request.enable_value is written to the descriptor. Aborts with INTERNAL if the descriptor read or the
        discovery fails, or if no characteristic owns the descriptor.
        """

        class DiscoveryObserver(gatt_client.GattClientCallbacks):
            """Observer to observe the discovery service state."""

            def __init__(self, task):
                self.task = task

            @utils.glib_callback()
            def on_search_complete(self, addr, services, status):
                if addr != self.task['address']:
                    return
                if floss_enums.GattStatus(status) != floss_enums.GattStatus.SUCCESS:
                    logging.error('Failed to complete search. Status: %s', status)
                _resolve_threadsafe(self.task['search_services'], (services, status))

        class DescriptorObserver(gatt_client.GattClientCallbacks):
            """Observer to observe the read/write descriptor state."""

            def __init__(self, task):
                self.task = task

            @utils.glib_callback()
            def on_descriptor_read(self, addr, status, handle, value):
                if addr != self.task['address'] or handle != self.task['handle']:
                    return
                if floss_enums.GattStatus(status) != floss_enums.GattStatus.SUCCESS:
                    logging.error('Failed to read descriptors. Status: %s', status)
                _resolve_threadsafe(self.task['read_descriptor'], (value, status))

            @utils.glib_callback()
            def on_descriptor_write(self, addr, status, handle):
                if addr != self.task['address'] or handle != self.task['handle']:
                    return
                if floss_enums.GattStatus(status) != floss_enums.GattStatus.SUCCESS:
                    logging.error('Failed to write descriptor. Status: %s', status)
                _resolve_threadsafe(self.task['write_descriptor'], status)

        address = utils.connection_from(request.connection).address
        observers = []
        try:
            descriptor_futures = {
                'read_descriptor': asyncio.get_running_loop().create_future(),
                'write_descriptor': asyncio.get_running_loop().create_future(),
                'address': address,
                'handle': request.handle
            }
            observer = DescriptorObserver(descriptor_futures)
            name = utils.create_observer_name(observer)
            self.bluetooth.gatt_client.register_callback_observer(name, observer)
            observers.append((name, observer))
            self.bluetooth.read_descriptor(address, request.handle, self.AUTHENTICATION_NONE)
            value, status = await descriptor_futures['read_descriptor']
            if status != floss_enums.GattStatus.SUCCESS:
                await context.abort(grpc.StatusCode.INTERNAL, 'Found no descriptor with supported handle.')

            search_services = asyncio.get_running_loop().create_future()
            observer = DiscoveryObserver({'search_services': search_services, 'address': address})
            name = utils.create_observer_name(observer)
            self.bluetooth.gatt_client.register_callback_observer(name, observer)
            observers.append((name, observer))
            self.bluetooth.discover_services(address)
            services, status = await search_services
            if status != floss_enums.GattStatus.SUCCESS:
                await context.abort(grpc.StatusCode.INTERNAL, 'Found no device services.')

            characteristic_handle = self._find_characteristic_handle(services, request.handle)
            if characteristic_handle is None:
                # Fail explicitly rather than registering a notification for a None handle.
                await context.abort(grpc.StatusCode.INTERNAL, 'Found no characteristic with supported handle.')

            self.bluetooth.register_for_notification(address, characteristic_handle, True)

            if request.enable_value == gatt_pb2.ENABLE_INDICATION_VALUE:
                # Write descriptor value that used to enable indication for a client configuration descriptor.
                self.bluetooth.write_descriptor(address, request.handle, self.AUTHENTICATION_NONE,
                                                self.ENABLE_INDICATION_VALUE)
            else:
                # Write descriptor value that used to enable notification for a client configuration descriptor.
                self.bluetooth.write_descriptor(address, request.handle, self.AUTHENTICATION_NONE,
                                                self.ENABLE_NOTIFICATION_VALUE)
            status = await descriptor_futures['write_descriptor']
        finally:
            for name, observer in observers:
                self.bluetooth.gatt_client.unregister_callback_observer(name, observer)

        return gatt_pb2.SetCharacteristicNotificationFromHandleResponse(handle=request.handle, status=status)

    async def WaitCharacteristicNotification(
            self, request: gatt_pb2.WaitCharacteristicNotificationRequest,
            context: grpc.ServicerContext) -> gatt_pb2.WaitCharacteristicNotificationResponse:
        """Waits until a notification has been received for the characteristic owning the request descriptor.

        The descriptor is read to validate the handle and services are discovered to find the owning
        characteristic. The RPC then waits on the future stored for that characteristic in
        characteristic_changed_map, which may already be resolved by an earlier notification. Aborts with INTERNAL
        if the descriptor read or the discovery fails, or if no characteristic owns the descriptor.
        """

        class DiscoveryObserver(gatt_client.GattClientCallbacks):
            """Observer to observe the discovery service state."""

            def __init__(self, task):
                self.task = task

            @utils.glib_callback()
            def on_search_complete(self, addr, services, status):
                if addr != self.task['address']:
                    return
                if floss_enums.GattStatus(status) != floss_enums.GattStatus.SUCCESS:
                    logging.error('Failed to complete search. Status: %s', status)
                _resolve_threadsafe(self.task['search_services'], (services, status))

        class ReadDescriptorObserver(gatt_client.GattClientCallbacks):
            """Observer to observe the read descriptor state."""

            def __init__(self, task):
                self.task = task

            @utils.glib_callback()
            def on_descriptor_read(self, addr, status, handle, value):
                if addr != self.task['address'] or handle != self.task['handle']:
                    return
                if floss_enums.GattStatus(status) != floss_enums.GattStatus.SUCCESS:
                    logging.error('Failed to read descriptors. Status: %s', status)
                _resolve_threadsafe(self.task['read_descriptor'], (value, status))

        address = utils.connection_from(request.connection).address
        observers = []
        try:
            read_descriptor = asyncio.get_running_loop().create_future()
            observer = ReadDescriptorObserver({
                'read_descriptor': read_descriptor,
                'address': address,
                'handle': request.handle
            })
            name = utils.create_observer_name(observer)
            self.bluetooth.gatt_client.register_callback_observer(name, observer)
            observers.append((name, observer))
            self.bluetooth.read_descriptor(address, request.handle, self.AUTHENTICATION_NONE)
            value, status = await read_descriptor
            if status != floss_enums.GattStatus.SUCCESS:
                await context.abort(grpc.StatusCode.INTERNAL, 'Found no descriptor with supported handle.')

            search_services = asyncio.get_running_loop().create_future()
            observer = DiscoveryObserver({'search_services': search_services, 'address': address})
            name = utils.create_observer_name(observer)
            self.bluetooth.gatt_client.register_callback_observer(name, observer)
            observers.append((name, observer))
            self.bluetooth.discover_services(address)
            services, status = await search_services
            if status != floss_enums.GattStatus.SUCCESS:
                await context.abort(grpc.StatusCode.INTERNAL, 'Found no device services.')

            characteristic_handle = self._find_characteristic_handle(services, request.handle)
            if characteristic_handle is None:
                # Without a characteristic handle the wait below could never be satisfied by a notification.
                await context.abort(grpc.StatusCode.INTERNAL, 'Found no characteristic with supported handle.')

            # Wait for the characteristic notification.
            char_map = self.characteristic_changed_map.setdefault(address, {})
            char_future = char_map.get(characteristic_handle)
            if char_future is None:
                char_future = asyncio.get_running_loop().create_future()
                char_map[characteristic_handle] = char_future

            await char_future
        finally:
            for name, observer in observers:
                self.bluetooth.gatt_client.unregister_callback_observer(name, observer)

        return gatt_pb2.WaitCharacteristicNotificationResponse(
            characteristic_notification_received=self.characteristic_changed_map[address]
            [characteristic_handle].result())

    def create_gatt_characteristic_descriptor(self, descriptor):
        """Builds a GattCharacteristicDescriptor message from a descriptor dictionary."""
        return gatt_pb2.GattCharacteristicDescriptor(handle=descriptor['instance_id'],
                                                     permissions=descriptor['permissions'],
                                                     uuid=str(UUID(bytes=bytes(descriptor['uuid']))).upper())

    def create_gatt_characteristic(self, characteristic):
        """Builds a GattCharacteristic message, including its descriptors, from a characteristic dictionary."""
        return gatt_pb2.GattCharacteristic(
            properties=characteristic['properties'],
            permissions=characteristic['permissions'],
            uuid=str(UUID(bytes=bytes(characteristic['uuid']))).upper(),
            handle=characteristic['instance_id'],
            descriptors=[
                self.create_gatt_characteristic_descriptor(descriptor) for descriptor in characteristic['descriptors']
            ])

    def create_gatt_service(self, service):
        """Builds a GattService message, recursing into included services, from a service dictionary."""
        return gatt_pb2.GattService(
            handle=service['instance_id'],
            type=service['service_type'],
            uuid=str(UUID(bytes=bytes(service['uuid']))).upper(),
            included_services=[
                self.create_gatt_service(included_service) for included_service in service['included_services']
            ],
            characteristics=[
                self.create_gatt_characteristic(characteristic) for characteristic in service['characteristics']
            ])