1#!/usr/bin/env python3
2#
3# Copyright (C) 2016 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may not
6# use this file except in compliance with the License. You may obtain a copy of
7# the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations under
15# the License.
16"""
This is the base class for tests that exercise different GATT procedures between two connected devices.
18Setup/Teardown methods take care of establishing connection, and doing GATT DB initialization/discovery.
19
20Original file:
21    tools/test/connectivity/acts_tests/acts_contrib/test_utils/bt/GattConnectedBaseTest.py
22"""
23
24import logging
25from queue import Empty
26
27from blueberry.tests.gd_sl4a.lib.bt_constants import bt_default_timeout
28from blueberry.tests.sl4a_sl4a.lib.sl4a_sl4a_base_test import Sl4aSl4aBaseTestClass
29from blueberry.utils.bt_gatt_constants import GattCallbackError
30from blueberry.utils.bt_gatt_constants import GattCallbackString
31from blueberry.utils.bt_gatt_constants import GattCharDesc
32from blueberry.utils.bt_gatt_constants import GattCharacteristic
33from blueberry.utils.bt_gatt_constants import GattDescriptor
34from blueberry.utils.bt_gatt_constants import GattEvent
35from blueberry.utils.bt_gatt_constants import GattMtuSize
36from blueberry.utils.bt_gatt_constants import GattServiceType
37from blueberry.utils.bt_gatt_utils import GattTestUtilsError
38from blueberry.utils.bt_gatt_utils import disconnect_gatt_connection
39from blueberry.utils.bt_gatt_utils import orchestrate_gatt_connection
40from blueberry.utils.bt_gatt_utils import setup_gatt_characteristics
41from blueberry.utils.bt_gatt_utils import setup_gatt_descriptors
42
43
class GattConnectedBaseTest(Sl4aSl4aBaseTestClass):
    """Base class for tests that run GATT procedures over an established connection.

    Before every test a GATT server exposing one primary test service is opened
    on the peripheral, the central connects to it and service discovery is run.
    After every test the server is cleared and closed and the client is
    disconnected.

    Attributes set by ``setup_class``:
        central: alias of ``self.dut``.
        peripheral: alias of ``self.cert``.

    Attributes set by ``setup_test``:
        gatt_server_callback, gatt_server: values returned by
            ``_setup_multiple_services``.
        bluetooth_gatt, gatt_callback, adv_callback: values returned by
            ``orchestrate_gatt_connection``.
        mtu: initialised to ``GattMtuSize.MIN``.
        discovered_services_index: ``ServicesIndex`` taken from the service
            discovery event.
        test_service_index: position of ``TEST_SERVICE_UUID`` among the
            discovered services, or ``None`` when it was not found.
        notifiable_char_index: third entry of the characteristic list built by
            ``_setup_characteristics_and_descriptors``.
    """

    TEST_SERVICE_UUID = "3846D7A0-69C8-11E4-BA00-0002A5D5C51B"
    READABLE_CHAR_UUID = "21c0a0bf-ad51-4a2d-8124-b74003e4e8c8"
    # NOTE(review): READABLE_DESC_UUID has the same value as WRITABLE_CHAR_UUID
    # below; confirm that sharing one UUID is intentional.
    READABLE_DESC_UUID = "aa7edd5a-4d1d-4f0e-883a-d145616a1630"
    WRITABLE_CHAR_UUID = "aa7edd5a-4d1d-4f0e-883a-d145616a1630"
    WRITABLE_DESC_UUID = "76d5ed92-ca81-4edb-bb6b-9f019665fb32"
    NOTIFIABLE_CHAR_UUID = "b2c83efa-34ca-11e6-ac61-9e71128cae77"

    def setup_class(self):
        """Run the parent class setup and name the two devices by their GATT role."""
        super().setup_class()
        self.central = self.dut
        self.peripheral = self.cert

    def setup_test(self):
        """Open the GATT server, connect the central and locate the test service.

        Returns:
            True when the test service was found among the discovered services
            and the peripheral reports at least one connected device; False
            otherwise.

        Raises:
            AssertionError: if the server-side service setup failed, if service
                discovery could not be started, or if the service discovery
                event did not arrive in time.
        """
        super().setup_test()

        self.gatt_server_callback, self.gatt_server = self._setup_multiple_services()
        if not self.gatt_server_callback or not self.gatt_server:
            raise AssertionError('Service setup failed')

        self.bluetooth_gatt, self.gatt_callback, self.adv_callback = orchestrate_gatt_connection(
            self.central, self.peripheral)
        self.peripheral.sl4a.bleStopBleAdvertising(self.adv_callback)

        self.mtu = GattMtuSize.MIN

        # Fail loudly when discovery cannot be started: continuing would either
        # hit a missing attribute or reuse the index left by a previous test.
        if not self.central.sl4a.gattClientDiscoverServices(self.bluetooth_gatt):
            raise AssertionError('Service discovery could not be started')
        event = self._client_wait(GattEvent.GATT_SERV_DISC)
        self.discovered_services_index = event['data']['ServicesIndex']

        services_count = self.central.sl4a.gattClientGetDiscoveredServicesCount(self.discovered_services_index)
        wanted_uuid = self.TEST_SERVICE_UUID.upper()
        self.test_service_index = None
        for i in range(services_count):
            disc_service_uuid = self.central.sl4a.gattClientGetDiscoveredServiceUuid(
                self.discovered_services_index, i).upper()
            if disc_service_uuid == wanted_uuid:
                self.test_service_index = i
                break

        # Compare against None explicitly: index 0 is a valid position.
        if self.test_service_index is None:
            logging.error("Service not found")
            return False

        connected_device_list = self.peripheral.sl4a.gattServerGetConnectedDevices(self.gatt_server)
        if not connected_device_list:
            logging.info("No devices connected from peripheral.")
            return False

        return True

    def teardown_test(self):
        """Close the GATT server, disconnect the GATT client and run the parent teardown.

        Returns:
            Whatever the parent class ``teardown_test`` returns.
        """
        self.peripheral.sl4a.gattServerClearServices(self.gatt_server)
        self.peripheral.sl4a.gattServerClose(self.gatt_server)

        del self.gatt_server_callback
        del self.gatt_server

        # A failed disconnection is logged by the helper; its result is not
        # used to fail the teardown.
        self._orchestrate_gatt_disconnection(self.bluetooth_gatt, self.gatt_callback)

        return super().teardown_test()

    def _server_wait(self, gatt_event):
        """Wait for `gatt_event` on the peripheral's GATT server callback.

        Returns the popped event; raises AssertionError on timeout.
        """
        return self._timed_pop(gatt_event, self.peripheral, self.gatt_server_callback)

    def _client_wait(self, gatt_event: GattEvent):
        """Wait for `gatt_event` on the central's GATT client callback.

        Returns the popped event; raises AssertionError on timeout.
        """
        return self._timed_pop(gatt_event, self.central, self.gatt_callback)

    def _timed_pop(self, gatt_event: GattEvent, sl4a, gatt_callback):
        """Pop one event from an event dispatcher, waiting up to ``bt_default_timeout``.

        Args:
            gatt_event: mapping holding an ``"evt"`` event-name template and an
                ``"err"`` error-message template.
            sl4a: object whose ``ed`` attribute provides ``pop_event``; the
                callers in this class pass ``self.central`` / ``self.peripheral``.
            gatt_callback: value formatted into the ``"evt"`` template.

        Returns:
            The event returned by ``pop_event``.

        Raises:
            AssertionError: if no matching event arrived before the timeout.
        """
        expected_event = gatt_event["evt"].format(gatt_callback)
        try:
            return sl4a.ed.pop_event(expected_event, bt_default_timeout)
        except Empty as emp:
            # Chain the timeout so the report shows where the wait expired.
            raise AssertionError(gatt_event["err"].format(expected_event)) from emp

    def _setup_characteristics_and_descriptors(self, droid):
        """Create the three test characteristics and the three test descriptors.

        Also stores the third created characteristic (the notifiable one) in
        ``self.notifiable_char_index``.

        Args:
            droid: handle forwarded to ``setup_gatt_characteristics`` and
                ``setup_gatt_descriptors``.

        Returns:
            Tuple ``(characteristic_list, descriptor_list)`` as returned by those
            two helpers, in the order writable, readable, notifiable.
        """
        characteristic_input = [
            {
                'uuid': self.WRITABLE_CHAR_UUID,
                'property': GattCharacteristic.PROPERTY_WRITE | GattCharacteristic.PROPERTY_WRITE_NO_RESPONSE,
                'permission': GattCharacteristic.PERMISSION_WRITE
            },
            {
                'uuid': self.READABLE_CHAR_UUID,
                'property': GattCharacteristic.PROPERTY_READ,
                # NOTE(review): a PROPERTY_* constant is used as the permission
                # here while the sibling entries use PERMISSION_*; looks like a
                # typo for PERMISSION_READ - confirm before changing.
                'permission': GattCharacteristic.PROPERTY_READ
            },
            {
                'uuid': self.NOTIFIABLE_CHAR_UUID,
                'property': GattCharacteristic.PROPERTY_NOTIFY | GattCharacteristic.PROPERTY_INDICATE,
                'permission': GattCharacteristic.PERMISSION_READ
            },
        ]
        read_write = GattDescriptor.PERMISSION_READ[0] | GattDescriptor.PERMISSION_WRITE[0]
        descriptor_input = [
            {
                'uuid': self.WRITABLE_DESC_UUID,
                'property': read_write,
            },
            {
                'uuid': self.READABLE_DESC_UUID,
                'property': read_write,
            },
            {
                'uuid': GattCharDesc.GATT_CLIENT_CHARAC_CFG_UUID,
                'property': read_write,
            },
        ]
        characteristic_list = setup_gatt_characteristics(droid, characteristic_input)
        self.notifiable_char_index = characteristic_list[2]
        descriptor_list = setup_gatt_descriptors(droid, descriptor_input)
        return characteristic_list, descriptor_list

    def _orchestrate_gatt_disconnection(self, bluetooth_gatt, gatt_callback):
        """Disconnect the central's GATT client and close it.

        Args:
            bluetooth_gatt: GATT client handle to disconnect and close.
            gatt_callback: callback handle passed to ``disconnect_gatt_connection``.

        Returns:
            True on success; False if ``disconnect_gatt_connection`` raised
            ``GattTestUtilsError`` (the client is then not closed).
        """
        logging.info("Disconnecting from peripheral device.")
        try:
            disconnect_gatt_connection(self.central, bluetooth_gatt, gatt_callback)
        except GattTestUtilsError as err:
            logging.error(err)
            return False
        self.central.sl4a.gattClientClose(bluetooth_gatt)
        return True

    def _find_service_added_event(self, gatt_server_callback, uuid):
        """Wait for the peripheral's "service added" event and check its UUID.

        Args:
            gatt_server_callback: value formatted into
                ``GattCallbackString.SERV_ADDED`` to build the event name.
            uuid: expected service UUID; compared case-insensitively.

        Returns:
            True if the event arrived and carries `uuid`; False on timeout or
            on a UUID mismatch (both are logged as errors).
        """
        expected_event = GattCallbackString.SERV_ADDED.format(gatt_server_callback)
        try:
            event = self.peripheral.sl4a.ed.pop_event(expected_event, bt_default_timeout)
        except Empty:
            logging.error(GattCallbackError.SERV_ADDED_ERR.format(expected_event))
            return False
        found_uuid = event['data']['serviceUuid']
        if found_uuid.lower() != uuid.lower():
            logging.error("Uuid mismatch. Found: {}, Expected {}.".format(found_uuid, uuid))
            return False
        return True

    def _setup_multiple_services(self):
        """Open a GATT server on the peripheral and add the test service to it.

        Despite the name, a single primary service (``TEST_SERVICE_UUID``) is
        created; it holds the three test characteristics, each with one
        descriptor attached.

        Returns:
            Tuple ``(gatt_server_callback, gatt_server)`` on success, or
            ``(False, False)`` when the "service added" event was not confirmed.
        """
        sl4a = self.peripheral.sl4a
        gatt_server_callback = sl4a.gattServerCreateGattServerCallback()
        gatt_server = sl4a.gattServerOpenGattServer(gatt_server_callback)
        characteristic_list, descriptor_list = self._setup_characteristics_and_descriptors(sl4a)
        # Descriptors are attached pairwise: writable, readable, then notifiable.
        sl4a.gattServerCharacteristicAddDescriptor(characteristic_list[0], descriptor_list[0])
        sl4a.gattServerCharacteristicAddDescriptor(characteristic_list[1], descriptor_list[1])
        sl4a.gattServerCharacteristicAddDescriptor(characteristic_list[2], descriptor_list[2])
        gatt_service = sl4a.gattServerCreateService(self.TEST_SERVICE_UUID, GattServiceType.SERVICE_TYPE_PRIMARY)
        for characteristic in characteristic_list:
            sl4a.gattServerAddCharacteristicToService(gatt_service, characteristic)
        sl4a.gattServerAddService(gatt_server, gatt_service)
        if not self._find_service_added_event(gatt_server_callback, self.TEST_SERVICE_UUID):
            return False, False
        return gatt_server_callback, gatt_server
190