1#!/usr/bin/env python3
2#
3# Copyright (C) 2019 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may not
6# use this file except in compliance with the License. You may obtain a copy of
7# the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations under
15# the License.
16
17import acts
18import acts.test_utils.bt.bt_test_utils as btutils
19from acts import asserts
20from acts.signals import TestPass
21from acts.test_utils.bt import bt_constants
22from acts.test_utils.bt import BtEnum
23from acts.test_utils.bt.A2dpBaseTest import A2dpBaseTest
24from acts.test_utils.bt.loggers import bluetooth_metric_logger as log
25from acts.test_utils.power.PowerBTBaseTest import ramp_attenuation
26
27
class BtA2dpRangeTest(A2dpBaseTest):
    """A2DP range test that sweeps attenuation for each configured codec.

    For every entry of the 'codecs' user parameter a test case named
    'test_bt_a2dp_range_codec_<codec_type>' is attached to the instance. Each
    test case steps the attenuator through 'attenuation_vector', records
    audio at every step and stops once the THD+N threshold is reached.
    """

    def __init__(self, configs):
        """Unpack the required user params and create one test per codec.

        Args:
            configs: test configuration, forwarded to A2dpBaseTest.
        """
        super().__init__(configs)
        self.bt_logger = log.BluetoothMetricLogger.for_test_case()
        # 'attenuation_vector' is a dict containing: start, stop and step of
        # attenuation changes.
        # 'codecs' is a list containing all codecs required in the tests; each
        # entry is a dict that is later unpacked as the keyword arguments of
        # generate_proto (codec_type, sample_rate, bits_per_sample,
        # channel_mode).
        req_params = ['attenuation_vector', 'codecs']
        self.unpack_userparams(req_params)
        for codec_config in self.codecs:
            self.generate_test_case(codec_config)

    def setup_class(self):
        """Run the base class setup, then enable BQR on the devices."""
        super().setup_class()
        # Enable BQR on all android devices
        btutils.enable_bqr(self.android_devices)

    def generate_test_case(self, codec_config):
        """Attach a range test for one codec config to this instance.

        Args:
            codec_config: dict describing the codec; its 'codec_type' value
              becomes the suffix of the generated test case name.
        """

        def test_case_fn():
            self.run_a2dp_to_max_range(codec_config)

        test_case_name = 'test_bt_a2dp_range_codec_{}'.format(
            codec_config['codec_type'])
        setattr(self, test_case_name, test_case_fn)

    def _get_bt_link_metrics(self):
        """Get bt link metrics such as rssi and tx pwls.

        Media playback is started before the metrics are read and is stopped
        afterwards, even if reading a metric raises.

        Returns:
            A list [rssi_master, pwl_master, rssi_slave] where
            rssi_master: master rssi ('rssi' entry of the DUT bt metric)
            pwl_master: master tx pwl ('pwlv' entry of the DUT bt metric)
            rssi_slave: slave rssi, or None when the bt device controller is
              not an AndroidDevice
        """
        self.media.play()
        try:
            # Query the DUT once so that rssi and power level are taken from
            # the same get_bt_metric result.
            master_metrics = btutils.get_bt_metric(self.dut)
            rssi_master = master_metrics['rssi']
            pwl_master = master_metrics['pwlv']
            # Get slave rssi if possible
            if isinstance(self.bt_device_controller,
                          acts.controllers.android_device.AndroidDevice):
                rssi_slave = btutils.get_bt_rssi(self.bt_device_controller)
            else:
                rssi_slave = None
        finally:
            self.media.stop()
        return [rssi_master, pwl_master, rssi_slave]

    def generate_proto(self, data_points, codec_type, sample_rate,
                       bits_per_sample, channel_mode):
        """Generate a results protobuf.

        The populated proto is also added to the results kept by
        self.bt_logger under this test class name.

        Args:
            data_points: list of dicts representing info to go into
              AudioTestDataPoint protobuffer message.
            codec_type: The codec type config to store in the proto.
            sample_rate: The sample rate config to store in the proto.
            bits_per_sample: The bits per sample config to store in the proto.
            channel_mode: The channel mode config to store in the proto.
        Returns:
             dict: The dictionary built by bt_logger.get_proto_dict for this
               test class and proto, with its 'proto_ascii' entry removed.
        """

        # Populate protobuf
        test_case_proto = (
            self.bt_logger.proto_module.BluetoothAudioTestResult())

        for data_point in data_points:
            audio_data_proto = test_case_proto.data_points.add()
            log.recursive_assign(audio_data_proto, data_point)

        codec_proto = test_case_proto.a2dp_codec_config
        codec_proto.codec_type = bt_constants.codec_types[codec_type]
        codec_proto.sample_rate = int(sample_rate)
        codec_proto.bits_per_sample = int(bits_per_sample)
        codec_proto.channel_mode = bt_constants.channel_modes[channel_mode]

        self.bt_logger.add_config_data_to_proto(test_case_proto, self.dut,
                                                self.bt_device)

        test_class_name = self.__class__.__name__
        self.bt_logger.add_proto_to_results(test_case_proto, test_class_name)

        proto_dict = self.bt_logger.get_proto_dict(test_class_name,
                                                   test_case_proto)
        # The human readable form is not returned to the caller; tolerate it
        # being absent instead of raising KeyError.
        proto_dict.pop('proto_ascii', None)
        return proto_dict

    def run_a2dp_to_max_range(self, codec_config):
        """Step up the attenuation until THD+N reaches the threshold.

        Sets the requested codec on the DUT when it differs from the current
        one, then for every attenuation step records audio, reads the link
        metrics and runs the THD+N analysis. One data point is collected per
        step.

        Args:
            codec_config: dict with the codec settings; passed as keyword
              arguments to btutils.set_bluetooth_codec and to generate_proto.

        Raises:
            TestPass: always when the sweep finishes, either because a THD+N
              value reached audio_params['thdn_threshold'] or because the
              attenuation range was exhausted. The proto dict is attached as
              extras. A failed codec change fails the test through
              asserts.assert_true before the sweep starts.
        """
        # '+ 1' lets the 'stop' value itself be part of the sweep.
        attenuation_range = range(self.attenuation_vector['start'],
                                  self.attenuation_vector['stop'] + 1,
                                  self.attenuation_vector['step'])

        data_points = []

        # Set Codec if needed
        current_codec = self.dut.droid.bluetoothA2dpGetCurrentCodecConfig()
        current_codec_type = BtEnum.BluetoothA2dpCodecType(
            current_codec['codecType']).name
        if current_codec_type != codec_config['codec_type']:
            codec_set = btutils.set_bluetooth_codec(self.dut, **codec_config)
            asserts.assert_true(codec_set, 'Codec configuration failed.')
        else:
            self.log.info('Current codec is %s, no need to change',
                          current_codec_type)

        # Loop RSSI with the same codec setting
        for atten in attenuation_range:
            ramp_attenuation(self.attenuator, atten)
            self.log.info('Set attenuation to %d dB', atten)

            tag = 'codec_{}_attenuation_{}dB_'.format(
                codec_config['codec_type'], atten)
            recorded_file = self.play_and_record_audio(
                self.audio_params['duration'])
            rssi_master, pwl_master, rssi_slave = self._get_bt_link_metrics()
            thdns = self.run_thdn_analysis(recorded_file, tag)

            # Collect Metrics for dashboard
            data_point = {
                'attenuation_db': int(self.attenuator.get_atten()),
                'rssi_master': rssi_master[self.dut.serial],
                'tx_power_level_master': pwl_master[self.dut.serial],
            }
            # rssi_slave is None when the remote device is not an
            # AndroidDevice; subscripting it would raise TypeError, so the
            # entry is left out of the data point in that case.
            # NOTE(review): confirm log.recursive_assign accepts data points
            # without 'rssi_slave'.
            if rssi_slave is not None:
                data_point['rssi_slave'] = rssi_slave[
                    self.bt_device_controller.serial]
            data_point['total_harmonic_distortion_plus_noise_percent'] = (
                thdns[0] * 100)
            data_points.append(data_point)
            self.log.info(data_point)

            # Check thdn for glitches, stop if max range reached
            thdn_threshold = self.audio_params['thdn_threshold']
            if any(thdn >= thdn_threshold for thdn in thdns):
                self.log.info('Max range at attenuation %s dB', atten)
                self.log.info(
                    'master rssi %s dBm, master tx power level %s, '
                    'slave rssi %s dBm', rssi_master, pwl_master, rssi_slave)
                proto_dict = self.generate_proto(data_points, **codec_config)
                raise TestPass('Max range reached and move to next codec',
                               extras=proto_dict)

        proto_dict = self.generate_proto(data_points, **codec_config)
        raise TestPass('Could not reach max range, need extra attenuation.',
                       extras=proto_dict)
173