1#!/usr/bin/env python3
2#
3#   Copyright 2022 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import binascii
18import logging
19import queue
20
21from blueberry.facade import common_pb2 as common
22from blueberry.tests.gd.cert.closable import Closable
23from blueberry.tests.gd.cert.closable import safeClose
24from blueberry.tests.gd.cert.truth import assertThat
25from blueberry.tests.gd_sl4a.lib.ble_lib import generate_ble_advertise_objects
26from blueberry.tests.gd_sl4a.lib.bt_constants import adv_succ
27from blueberry.tests.gd_sl4a.lib.bt_constants import ble_advertise_settings_modes
28from blueberry.tests.sl4a_sl4a.lib import sl4a_sl4a_base_test
29
30
class LeAdvertiser(Closable):
    """Starts and stops a BLE advertisement on one device via its SL4A client.

    The wrapped ``device`` is expected to expose ``sl4a`` (the RPC client used
    for all ``ble*`` / ``bluetooth*`` calls) and ``ed`` (an event dispatcher
    providing ``pop_event``).
    """

    # True once a start request has been issued and not yet stopped.
    is_advertising = False
    # Device under control; cleared by close().
    device = None
    default_timeout = 10  # seconds
    # Handles returned by generate_ble_advertise_objects() for the current
    # advertisement; the callback is needed again to stop advertising.
    advertise_callback = None
    advertise_data = None
    advertise_settings = None

    def __init__(self, device):
        """Bind this advertiser to ``device``."""
        self.device = device

    def __wait_for_event(self, expected_event_name):
        """Wait up to ``default_timeout`` seconds for an event on the device.

        Args:
            expected_event_name: name of the event to pop from ``device.ed``.

        Returns:
            True if the event was received (it is logged at INFO level),
            False if ``pop_event`` raised ``queue.Empty``.
        """
        try:
            event_info = self.device.ed.pop_event(expected_event_name, self.default_timeout)
            logging.info(event_info)
        except queue.Empty:
            logging.error("Failed to find event: %s", expected_event_name)
            return False
        return True

    def advertise_public_extended_pdu(self, address_type=common.RANDOM_DEVICE_ADDRESS, name="SL4A Device"):
        """Configure and start a connectable, low-latency advertisement.

        Does nothing (beyond logging) if an advertisement is already active.
        After issuing the start request, waits for the advertising-success
        event and asserts that it arrived.

        NOTE(review): the method name says "public" but the default
        ``address_type`` is ``RANDOM_DEVICE_ADDRESS`` -- confirm with callers.

        Args:
            address_type: own-address type passed to
                ``bleSetAdvertiseSettingsOwnAddressType``.
            name: currently unused; kept for interface compatibility. The
                advertisement includes the device name via
                ``bleSetAdvertiseDataIncludeDeviceName(True)``.
        """
        if self.is_advertising:
            logging.info("Already advertising!")
            return
        logging.info("Configuring advertisement with address type %d", address_type)
        # NOTE(review): this is a *scan* setting being changed while
        # configuring an advertiser -- confirm it is intentional.
        self.device.sl4a.bleSetScanSettingsLegacy(False)
        self.device.sl4a.bleSetAdvertiseSettingsIsConnectable(True)
        self.device.sl4a.bleSetAdvertiseDataIncludeDeviceName(True)
        self.device.sl4a.bleSetAdvertiseSettingsAdvertiseMode(ble_advertise_settings_modes['low_latency'])
        self.device.sl4a.bleSetAdvertiseSettingsOwnAddressType(address_type)
        self.advertise_callback, self.advertise_data, self.advertise_settings = generate_ble_advertise_objects(
            self.device.sl4a)
        self.device.sl4a.bleStartBleAdvertising(self.advertise_callback, self.advertise_data, self.advertise_settings)
        # Mark as advertising only once the start request has actually been
        # issued with a valid callback. If any configuration step above raises,
        # the flag stays False, so stop_advertising()/close() will not try to
        # stop with a missing or stale callback and a retry is not rejected.
        self.is_advertising = True

        # Wait for SL4A cert to start advertising
        assertThat(self.__wait_for_event(adv_succ.format(self.advertise_callback))).isTrue()
        logging.info("Advertising started")

    def get_local_advertising_name(self):
        """Return the result of ``bluetoothGetLocalName()`` on the device."""
        return self.device.sl4a.bluetoothGetLocalName()

    def stop_advertising(self):
        """Stop the active advertisement, if any, and clear the active flag."""
        if self.is_advertising:
            logging.info("Stopping advertisement")
            self.device.sl4a.bleStopBleAdvertising(self.advertise_callback)
            self.is_advertising = False

    def close(self):
        """Stop advertising and drop the reference to the device."""
        self.stop_advertising()
        self.device = None
83