#!/usr/bin/env python3
#
# Copyright (C) 2016 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""
This is the base class for tests that exercise different GATT procedures between two connected devices.
Setup/Teardown methods take care of establishing connection, and doing GATT DB initialization/discovery.

Original file:
    tools/test/connectivity/acts_tests/acts_contrib/test_utils/bt/GattConnectedBaseTest.py
"""

import logging
from queue import Empty

from blueberry.tests.gd_sl4a.lib.bt_constants import bt_default_timeout
from blueberry.tests.sl4a_sl4a.lib.sl4a_sl4a_base_test import Sl4aSl4aBaseTestClass
from blueberry.utils.bt_gatt_constants import GattCallbackError
from blueberry.utils.bt_gatt_constants import GattCallbackString
from blueberry.utils.bt_gatt_constants import GattCharDesc
from blueberry.utils.bt_gatt_constants import GattCharacteristic
from blueberry.utils.bt_gatt_constants import GattDescriptor
from blueberry.utils.bt_gatt_constants import GattEvent
from blueberry.utils.bt_gatt_constants import GattMtuSize
from blueberry.utils.bt_gatt_constants import GattServiceType
from blueberry.utils.bt_gatt_utils import GattTestUtilsError
from blueberry.utils.bt_gatt_utils import disconnect_gatt_connection
from blueberry.utils.bt_gatt_utils import orchestrate_gatt_connection
from blueberry.utils.bt_gatt_utils import setup_gatt_characteristics
from blueberry.utils.bt_gatt_utils import setup_gatt_descriptors


class GattConnectedBaseTest(Sl4aSl4aBaseTestClass):
    """Base test class that prepares a GATT server and a connected GATT client.

    Before each test a GATT server exposing one primary service
    (``TEST_SERVICE_UUID``) with three characteristics is created on the
    peripheral, the central connects to it and runs service discovery.
    After each test the server is cleared and closed and the client is
    disconnected.
    """

    TEST_SERVICE_UUID = "3846D7A0-69C8-11E4-BA00-0002A5D5C51B"
    READABLE_CHAR_UUID = "21c0a0bf-ad51-4a2d-8124-b74003e4e8c8"
    # NOTE(review): READABLE_DESC_UUID has the same value as WRITABLE_CHAR_UUID
    # -- confirm this is intentional.
    READABLE_DESC_UUID = "aa7edd5a-4d1d-4f0e-883a-d145616a1630"
    WRITABLE_CHAR_UUID = "aa7edd5a-4d1d-4f0e-883a-d145616a1630"
    WRITABLE_DESC_UUID = "76d5ed92-ca81-4edb-bb6b-9f019665fb32"
    NOTIFIABLE_CHAR_UUID = "b2c83efa-34ca-11e6-ac61-9e71128cae77"

    def setup_class(self):
        """Run the base class setup, then alias ``dut`` as central and ``cert`` as peripheral."""
        super().setup_class()
        self.central = self.dut
        self.peripheral = self.cert

    def setup_test(self):
        """Create the GATT server, connect the client and discover the test service.

        Returns:
            False if the test service was not discovered or the peripheral
            reports no connected devices, True otherwise.

        Raises:
            AssertionError: if the GATT service could not be set up, or the
                service discovery event did not arrive in time.
        """
        super().setup_test()

        self.gatt_server_callback, self.gatt_server = self._setup_multiple_services()
        if not self.gatt_server_callback or not self.gatt_server:
            raise AssertionError('Service setup failed')

        self.bluetooth_gatt, self.gatt_callback, self.adv_callback = orchestrate_gatt_connection(
            self.central, self.peripheral)
        self.peripheral.sl4a.bleStopBleAdvertising(self.adv_callback)

        self.mtu = GattMtuSize.MIN

        # Initialise up front so the check below is well defined even when
        # service discovery could not be started.
        self.discovered_services_index = None
        self.test_service_index = None
        if self.central.sl4a.gattClientDiscoverServices(self.bluetooth_gatt):
            event = self._client_wait(GattEvent.GATT_SERV_DISC)
            self.discovered_services_index = event['data']['ServicesIndex']
            services_count = self.central.sl4a.gattClientGetDiscoveredServicesCount(self.discovered_services_index)
            for i in range(services_count):
                disc_service_uuid = self.central.sl4a.gattClientGetDiscoveredServiceUuid(
                    self.discovered_services_index, i).upper()
                if disc_service_uuid == self.TEST_SERVICE_UUID:
                    self.test_service_index = i
                    break

        # Compare against None explicitly: index 0 is a valid service index.
        if self.test_service_index is None:
            logging.error("Service not found")
            return False

        connected_device_list = self.peripheral.sl4a.gattServerGetConnectedDevices(self.gatt_server)
        if not connected_device_list:
            logging.info("No devices connected from peripheral.")
            return False

        return True

    def teardown_test(self):
        """Clear and close the GATT server, disconnect the client, then run the base teardown.

        The base class teardown runs even if the GATT cleanup raises (for
        example because ``setup_test`` failed part-way); the original
        exception still propagates afterwards.

        Returns:
            Whatever the base class ``teardown_test`` returns.
        """
        try:
            self.peripheral.sl4a.gattServerClearServices(self.gatt_server)
            self.peripheral.sl4a.gattServerClose(self.gatt_server)

            del self.gatt_server_callback
            del self.gatt_server

            self._orchestrate_gatt_disconnection(self.bluetooth_gatt, self.gatt_callback)
        finally:
            result = super().teardown_test()
        return result

    def _server_wait(self, gatt_event):
        """Wait for `gatt_event` on the peripheral's GATT server callback and return the event."""
        return self._timed_pop(gatt_event, self.peripheral, self.gatt_server_callback)

    def _client_wait(self, gatt_event: GattEvent):
        """Wait for `gatt_event` on the central's GATT client callback and return the event."""
        return self._timed_pop(gatt_event, self.central, self.gatt_callback)

    def _timed_pop(self, gatt_event: GattEvent, sl4a, gatt_callback):
        """Pop the event described by `gatt_event`, waiting up to `bt_default_timeout`.

        Args:
            gatt_event: mapping with an "evt" event-name template and an "err"
                error-message template.
            sl4a: object whose `ed.pop_event` is used to wait for the event.
                NOTE(review): callers in this class pass the device object
                (self.central / self.peripheral), while
                `_find_service_added_event` uses `<device>.sl4a.ed` --
                confirm both resolve to the same dispatcher.
            gatt_callback: value formatted into the "evt" template.

        Returns:
            The popped event.

        Raises:
            AssertionError: if no matching event arrived before the timeout.
        """
        expected_event = gatt_event["evt"].format(gatt_callback)
        try:
            return sl4a.ed.pop_event(expected_event, bt_default_timeout)
        except Empty as emp:
            # Chain the original exception so the timeout stays visible in the traceback.
            raise AssertionError(gatt_event["err"].format(expected_event)) from emp

    def _setup_characteristics_and_descriptors(self, droid):
        """Create the writable, readable and notifiable characteristics and their descriptors.

        Also stores the third (notifiable) characteristic in
        `self.notifiable_char_index`.

        Args:
            droid: handle passed through to `setup_gatt_characteristics` and
                `setup_gatt_descriptors`.

        Returns:
            Tuple `(characteristic_list, descriptor_list)` as returned by the
            two setup helpers.
        """
        characteristic_input = [
            {
                'uuid': self.WRITABLE_CHAR_UUID,
                'property': GattCharacteristic.PROPERTY_WRITE | GattCharacteristic.PROPERTY_WRITE_NO_RESPONSE,
                'permission': GattCharacteristic.PERMISSION_WRITE
            },
            {
                'uuid': self.READABLE_CHAR_UUID,
                'property': GattCharacteristic.PROPERTY_READ,
                # NOTE(review): a PROPERTY_* constant is used as the permission
                # here, unlike the other entries which use PERMISSION_* --
                # confirm whether PERMISSION_READ was intended.
                'permission': GattCharacteristic.PROPERTY_READ
            },
            {
                'uuid': self.NOTIFIABLE_CHAR_UUID,
                'property': GattCharacteristic.PROPERTY_NOTIFY | GattCharacteristic.PROPERTY_INDICATE,
                'permission': GattCharacteristic.PERMISSION_READ
            },
        ]
        descriptor_input = [{
            'uuid': self.WRITABLE_DESC_UUID,
            'property': GattDescriptor.PERMISSION_READ[0] | GattDescriptor.PERMISSION_WRITE[0]
        }, {
            'uuid': self.READABLE_DESC_UUID,
            'property': GattDescriptor.PERMISSION_READ[0] | GattDescriptor.PERMISSION_WRITE[0],
        }, {
            'uuid': GattCharDesc.GATT_CLIENT_CHARAC_CFG_UUID,
            'property': GattDescriptor.PERMISSION_READ[0] | GattDescriptor.PERMISSION_WRITE[0],
        }]
        characteristic_list = setup_gatt_characteristics(droid, characteristic_input)
        self.notifiable_char_index = characteristic_list[2]
        descriptor_list = setup_gatt_descriptors(droid, descriptor_input)
        return characteristic_list, descriptor_list

    def _orchestrate_gatt_disconnection(self, bluetooth_gatt, gatt_callback):
        """Disconnect the central's GATT client and close it.

        Returns:
            True on success; False if `disconnect_gatt_connection` raised
            `GattTestUtilsError` (in which case the client is not closed).
        """
        logging.info("Disconnecting from peripheral device.")
        try:
            disconnect_gatt_connection(self.central, bluetooth_gatt, gatt_callback)
        except GattTestUtilsError as err:
            logging.error(err)
            return False
        self.central.sl4a.gattClientClose(bluetooth_gatt)
        return True

    def _find_service_added_event(self, gatt_server_callback, uuid):
        """Wait for the peripheral's service-added event and verify its UUID.

        Args:
            gatt_server_callback: value formatted into the expected event name.
            uuid: expected service UUID (compared case-insensitively).

        Returns:
            True if the event arrived with a matching UUID; False on timeout
            or UUID mismatch.
        """
        expected_event = GattCallbackString.SERV_ADDED.format(gatt_server_callback)
        try:
            event = self.peripheral.sl4a.ed.pop_event(expected_event, bt_default_timeout)
        except Empty:
            logging.error(GattCallbackError.SERV_ADDED_ERR.format(expected_event))
            return False
        if event['data']['serviceUuid'].lower() != uuid.lower():
            logging.error("Uuid mismatch. Found: %s, Expected %s.", event['data']['serviceUuid'], uuid)
            return False
        return True

    def _setup_multiple_services(self):
        """Open a GATT server on the peripheral and add the test service to it.

        Returns:
            Tuple `(gatt_server_callback, gatt_server)` on success, or
            `(False, False)` if the service-added event was not confirmed.
        """
        gatt_server_callback = self.peripheral.sl4a.gattServerCreateGattServerCallback()
        gatt_server = self.peripheral.sl4a.gattServerOpenGattServer(gatt_server_callback)
        characteristic_list, descriptor_list = self._setup_characteristics_and_descriptors(self.peripheral.sl4a)
        # Attach each descriptor to the characteristic at the same position.
        self.peripheral.sl4a.gattServerCharacteristicAddDescriptor(characteristic_list[0], descriptor_list[0])
        self.peripheral.sl4a.gattServerCharacteristicAddDescriptor(characteristic_list[1], descriptor_list[1])
        self.peripheral.sl4a.gattServerCharacteristicAddDescriptor(characteristic_list[2], descriptor_list[2])
        gatt_service = self.peripheral.sl4a.gattServerCreateService(self.TEST_SERVICE_UUID,
                                                                    GattServiceType.SERVICE_TYPE_PRIMARY)
        for characteristic in characteristic_list:
            self.peripheral.sl4a.gattServerAddCharacteristicToService(gatt_service, characteristic)
        self.peripheral.sl4a.gattServerAddService(gatt_server, gatt_service)
        if not self._find_service_added_event(gatt_server_callback, self.TEST_SERVICE_UUID):
            return False, False
        return gatt_server_callback, gatt_server