1#!/usr/bin/env python3
2#
3#   Copyright 2022 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import binascii
18import io
19import logging
20import os
21import queue
22import time
23
24from blueberry.tests.gd.cert.context import get_current_context
25from blueberry.tests.gd.cert.truth import assertThat
26from blueberry.tests.gd_sl4a.lib.bt_constants import ble_address_types
27from blueberry.tests.sl4a_sl4a.lib import sl4a_sl4a_base_test
28from blueberry.tests.sl4a_sl4a.lib.security import Security
29from mobly import test_runner
30
31
32class LeScanningTest(sl4a_sl4a_base_test.Sl4aSl4aBaseTestClass):
33
34    def __get_cert_public_address_and_irk_from_bt_config(self):
35        # Pull IRK from SL4A cert side to pass in from SL4A DUT side when scanning
36        bt_config_file_path = os.path.join(get_current_context().get_full_output_path(),
37                                           "DUT_%s_bt_config.conf" % self.cert.serial)
38        try:
39            self.cert.adb.pull(["/data/misc/bluedroid/bt_config.conf", bt_config_file_path])
40        except AdbError as error:
41            logging.error("Failed to pull SL4A cert BT config")
42            return False
43        logging.debug("Reading SL4A cert BT config")
44        with io.open(bt_config_file_path) as f:
45            for line in f.readlines():
46                stripped_line = line.strip()
47                if (stripped_line.startswith("Address")):
48                    address_fields = stripped_line.split(' ')
49                    # API currently requires public address to be capitalized
50                    address = address_fields[2].upper()
51                    logging.debug("Found cert address: %s" % address)
52                    continue
53                if (stripped_line.startswith("LE_LOCAL_KEY_IRK")):
54                    irk_fields = stripped_line.split(' ')
55                    irk = irk_fields[2]
56                    logging.debug("Found cert IRK: %s" % irk)
57                    continue
58
59        return address, irk
60
61    def setup_class(self):
62        super().setup_class()
63
64    def setup_test(self):
65        assertThat(super().setup_test()).isTrue()
66
67    def teardown_test(self):
68        super().teardown_test()
69
70    def test_scan_result_address(self):
71        cert_public_address, irk = self.__get_cert_public_address_and_irk_from_bt_config()
72        self.cert_advertiser_.advertise_public_extended_pdu()
73        advertising_name = self.cert_advertiser_.get_local_advertising_name()
74
75        # Scan with name and verify we get back a scan result with the RPA
76        scan_result_addr = self.dut_scanner_.scan_for_name(advertising_name)
77        assertThat(scan_result_addr).isNotNone()
78        assertThat(scan_result_addr).isNotEqualTo(cert_public_address)
79
80        # Bond
81        logging.info("Bonding with %s", scan_result_addr)
82        self.dut_security_.create_bond_numeric_comparison(scan_result_addr)
83        self.dut_scanner_.stop_scanning()
84
85        # Start advertising again and scan for identity address
86        scan_result_addr = self.dut_scanner_.scan_for_address(cert_public_address, ble_address_types["public"])
87        assertThat(scan_result_addr).isNotNone()
88        assertThat(scan_result_addr).isNotEqualTo(cert_public_address)
89
90        # Teardown advertiser and scanner
91        self.dut_scanner_.stop_scanning()
92        self.cert_advertiser_.stop_advertising()
93
94
95if __name__ == '__main__':
96    test_runner.main()