1#!/usr/bin/env python3
2#
3# Copyright (C) 2016 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may not
6# use this file except in compliance with the License. You may obtain a copy of
7# the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations under
15# the License.
16"""
17Ble libraries
18"""
19
20import time
21import queue
22import logging
23
24from blueberry.tests.gd_sl4a.lib.bt_constants import ble_advertise_settings_modes
25from blueberry.tests.gd_sl4a.lib.bt_constants import small_timeout
26from blueberry.tests.gd_sl4a.lib.bt_constants import adv_fail
27from blueberry.tests.gd_sl4a.lib.bt_constants import adv_succ
28from blueberry.tests.gd_sl4a.lib.bt_constants import advertising_set_on_own_address_read
29from blueberry.tests.gd_sl4a.lib.bt_constants import advertising_set_started
30from blueberry.tests.gd_sl4a.lib.bt_constants import bluetooth_on
31from blueberry.tests.gd_sl4a.lib.bt_constants import bluetooth_off
32from blueberry.tests.gd_sl4a.lib.bt_constants import bt_default_timeout
33
34
def enable_bluetooth(sl4a, ed):
    """Turn Bluetooth on and confirm the resulting state.

    Args:
        sl4a: Object exposing bluetoothCheckState() and bluetoothToggleState().
        ed: Event dispatcher exposing pop_event(name, timeout).

    Returns:
        True if Bluetooth was already on, if the "on" event was received, or
        if the state reads as on after waiting for the event failed; False
        otherwise.
    """
    if sl4a.bluetoothCheckState():
        # Already on, nothing to toggle.
        return True

    sl4a.bluetoothToggleState(True)
    on_event = bluetooth_on
    try:
        ed.pop_event(on_event, bt_default_timeout)
    except Exception:
        # The broadcast did not arrive; fall back to reading the state directly.
        logging.info("Failed to toggle Bluetooth on (no broadcast received)")
        if not sl4a.bluetoothCheckState():
            logging.info(".. actual state is OFF")
            return False
        logging.info(".. actual state is ON")
        return True
    else:
        return True
52
53
def disable_bluetooth(sl4a, ed):
    """Turn Bluetooth off and confirm the resulting state.

    Args:
        sl4a: Object exposing bluetoothCheckState() and bluetoothToggleState().
        ed: Event dispatcher exposing pop_event(name, timeout).

    Returns:
        True if Bluetooth was already off, if the "off" event was received, or
        if the state reads as off after waiting for the event failed; False
        otherwise.
    """
    already_off = not sl4a.bluetoothCheckState()
    if already_off:
        return True

    sl4a.bluetoothToggleState(False)
    off_event = bluetooth_off
    try:
        ed.pop_event(off_event, bt_default_timeout)
    except Exception:
        # The broadcast did not arrive; fall back to reading the state directly.
        logging.info("Failed to toggle Bluetooth off (no broadcast received)")
        if not sl4a.bluetoothCheckState():
            logging.info(".. actual state is OFF")
            return True
        logging.info(".. actual state is ON")
        return False
    else:
        return True
69
70
def generate_ble_scan_objects(sl4a):
    """Create the three generic objects needed to start an LE scan.

    Args:
        sl4a: The SL4A object used to create the scan objects.

    Returns:
        A tuple (filter_list, scan_settings, scan_callback) holding the values
        returned by bleGenFilterList(), bleBuildScanSetting() and
        bleGenScanCallback() respectively.
    """
    # Tuple elements are evaluated left to right, so the SL4A calls are issued
    # in the same order as the values are returned.
    return (
        sl4a.bleGenFilterList(),
        sl4a.bleBuildScanSetting(),
        sl4a.bleGenScanCallback(),
    )
86
87
def generate_ble_advertise_objects(sl4a):
    """Create the three generic objects needed to start an LE advertisement.

    Args:
        sl4a: The SL4A object used to create the advertise objects.

    Returns:
        A tuple (advertise_callback, advertise_data, advertise_settings)
        holding the values returned by bleGenBleAdvertiseCallback(),
        bleBuildAdvertiseData() and bleBuildAdvertiseSettings() respectively.
    """
    # Tuple elements are evaluated left to right, so the SL4A calls are issued
    # in the same order as the values are returned.
    return (
        sl4a.bleGenBleAdvertiseCallback(),
        sl4a.bleBuildAdvertiseData(),
        sl4a.bleBuildAdvertiseSettings(),
    )
103
104
class BleLib():
    """BLE advertising helper bound to a single device.

    Keeps the callback ids of advertisements and advertising sets started
    through this object so that they can be stopped later.
    """

    def __init__(self, dut):
        """Create the helper.

        Args:
            dut: Device object; its ``sl4a`` and ``ed`` attributes are used to
                issue calls and to wait for events.
        """
        # Callback ids of advertisements that reported a successful start.
        self.advertisement_list = []
        self.dut = dut
        # Timeout handed to the event dispatcher when waiting for events.
        self.default_timeout = 5
        # Callback ids of advertising sets started by this object.
        self.set_advertisement_list = []
        # Template used to expand a 4-character UUID into a full UUID string.
        self.generic_uuid = "0000{}-0000-1000-8000-00805f9b34fb"

    @staticmethod
    def _to_bool(value):
        """Interpret a command-style argument as a boolean.

        Strings that are empty, "false" or "0" (ignoring case and surrounding
        whitespace) are False; any other string is True. Non-string values use
        their normal truthiness, so real booleans and None pass through
        unchanged.
        """
        if isinstance(value, str):
            return value.strip().lower() not in ("", "false", "0")
        return bool(value)

    def _verify_ble_adv_started(self, advertise_callback):
        """Wait for the start result of an advertisement.

        Args:
            advertise_callback: Callback id the advertisement was started with.

        Returns:
            True if the success event was received; False if the failure event
            was received or no event arrived before the timeout.
        """
        succ_event = adv_succ.format(advertise_callback)
        fail_event = adv_fail.format(advertise_callback)
        regex = "({}|{})".format(succ_event, fail_event)
        try:
            events = self.dut.ed.pop_events(regex, self.default_timeout, small_timeout)
        except queue.Empty:
            logging.error("Failed to get success or failed event.")
            return False
        if events[0]["name"] == succ_event:
            logging.info("Advertisement started successfully.")
            return True
        logging.info("Advertisement failed to start.")
        return False

    def _start_advertisement(self, connectable, scan_response=False):
        """Configure and start one advertisement, tracking it on success.

        Args:
            connectable: Value passed to bleSetAdvertiseSettingsIsConnectable().
            scan_response: When true, start with a scan response that reuses
                the advertise data object.

        Returns:
            True if the advertisement reported a successful start and its
            callback id was added to ``advertisement_list``; False otherwise.
        """
        sl4a = self.dut.sl4a
        sl4a.bleSetAdvertiseSettingsAdvertiseMode(ble_advertise_settings_modes['low_latency'])
        sl4a.bleSetAdvertiseSettingsIsConnectable(connectable)
        advertise_callback, advertise_data, advertise_settings = generate_ble_advertise_objects(sl4a)
        if scan_response:
            sl4a.bleStartBleAdvertisingWithScanResponse(advertise_callback, advertise_data, advertise_settings,
                                                        advertise_data)
        else:
            sl4a.bleStartBleAdvertising(advertise_callback, advertise_data, advertise_settings)
        if not self._verify_ble_adv_started(advertise_callback):
            return False
        logging.info("Tracking Callback ID: {}".format(advertise_callback))
        self.advertisement_list.append(advertise_callback)
        logging.info(self.advertisement_list)
        return True

    def start_generic_connectable_advertisement(self, line):
        """Start a connectable LE advertisement.

        Args:
            line: Optional flag; when it evaluates to true (see _to_bool) the
                advertisement is started with a scan response.
        """
        self._start_advertisement(True, self._to_bool(line))

    def start_connectable_advertisement_set(self, line):
        """Start Connectable Advertisement Set

        Args:
            line: Unused.

        Raises:
            queue.Empty: If the set-started event does not arrive in time (an
                error is logged first). Any exception raised while waiting for
                the own-address event also propagates.
        """
        adv_callback = self.dut.sl4a.bleAdvSetGenCallback()
        adv_params = {
            "connectable": True,
            "legacyMode": False,
            "primaryPhy": "PHY_LE_1M",
            "secondaryPhy": "PHY_LE_1M",
            "interval": 320,
        }
        adv_data = {
            "includeDeviceName": True,
        }
        self.dut.sl4a.bleAdvSetStartAdvertisingSet(adv_params, adv_data, None, None, None, 0, 0, adv_callback)
        try:
            evt = self.dut.ed.pop_event(advertising_set_started.format(adv_callback), self.default_timeout)
        except queue.Empty:
            # Only report the missing event when it is actually missing, then
            # let the caller see the same exception as before.
            logging.error("did not receive the set started event!")
            raise
        set_id = evt['data']['setId']
        evt = self.dut.ed.pop_event(advertising_set_on_own_address_read.format(set_id), self.default_timeout)
        address = evt['data']['address']
        logging.info("Advertiser address is: {}".format(str(address)))
        self.set_advertisement_list.append(adv_callback)

    def stop_all_advertisement_set(self, line):
        """Stop all Advertisement Sets

        Sets that were stopped are dropped from ``set_advertisement_list``;
        sets whose stop call raised stay tracked so the stop can be retried.

        Args:
            line: Unused.
        """
        still_tracked = []
        for adv in self.set_advertisement_list:
            try:
                self.dut.sl4a.bleAdvSetStopAdvertisingSet(adv)
            except Exception as err:
                logging.error("Failed to stop advertisement: {}".format(err))
                still_tracked.append(adv)
        self.set_advertisement_list = still_tracked

    def adv_add_service_uuid_list(self, line):
        """Add service UUID to the LE advertisement inputs:
         [uuid1 uuid2 ... uuidN]

        Each 4-character entry is expanded with ``generic_uuid``; other
        entries are passed through unchanged.
        """
        uuid_list = [self.generic_uuid.format(uuid) if len(uuid) == 4 else uuid for uuid in line.split()]
        self.dut.sl4a.bleSetAdvertiseDataSetServiceUuids(uuid_list)

    def adv_data_include_local_name(self, is_included):
        """Include local name in the advertisement. inputs: [true|false]"""
        self.dut.sl4a.bleSetAdvertiseDataIncludeDeviceName(self._to_bool(is_included))

    def adv_data_include_tx_power_level(self, is_included):
        """Include tx power level in the advertisement. inputs: [true|false]"""
        self.dut.sl4a.bleSetAdvertiseDataIncludeTxPowerLevel(self._to_bool(is_included))

    def adv_data_add_manufacturer_data(self, line):
        """Include manufacturer id and data to the advertisment:
        [id data1 data2 ... dataN]

        All fields are parsed as base-10 integers.
        """
        manu_id, *manu_fields = line.split()[:1] + line.split()[1:] if line.split() else [line.split()[0]]
        manu_data = [int(data) for data in manu_fields]
        self.dut.sl4a.bleAddAdvertiseDataManufacturerId(int(manu_id), manu_data)

    def start_generic_nonconnectable_advertisement(self, line):
        """Start a nonconnectable LE advertisement

        Args:
            line: Unused.
        """
        self._start_advertisement(False)

    def stop_all_advertisements(self, line):
        """Stop all LE advertisements

        Args:
            line: Unused.
        """
        for callback_id in self.advertisement_list:
            logging.info("Stopping Advertisement {}".format(callback_id))
            self.dut.sl4a.bleStopBleAdvertising(callback_id)
            time.sleep(1)
        self.advertisement_list = []

    def ble_stop_advertisement(self, callback_id):
        """Stop an LE advertisement

        Args:
            callback_id: Id of a tracked advertisement; converted with int().
                Nothing is stopped if it is empty or not tracked.
        """
        if not callback_id:
            logging.info("Need a callback ID")
            return
        callback_id = int(callback_id)
        if callback_id not in self.advertisement_list:
            logging.info("Callback not in list of advertisements.")
            return
        self.dut.sl4a.bleStopBleAdvertising(callback_id)
        self.advertisement_list.remove(callback_id)

    def start_max_advertisements(self, line):
        """Start connectable advertisements until one can no longer be started.

        Args:
            line: Optional flag; when it evaluates to true (see _to_bool) each
                advertisement is started with a scan response.

        Returns:
            False if an advertisement did not report a successful start; True
            if starting an advertisement raised an exception.
        """
        scan_response = self._to_bool(line)
        while True:
            try:
                started = self._start_advertisement(True, scan_response)
            except Exception as err:
                logging.info("Could not start another advertisement: {}".format(err))
                logging.info("Advertisements active: {}".format(len(self.advertisement_list)))
                return True
            if not started:
                logging.info("Advertisements active: {}".format(len(self.advertisement_list)))
                return False
260