1#!/usr/bin/env python3
2#
3# Copyright (C) 2019 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may not
6# use this file except in compliance with the License. You may obtain a copy of
7# the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations under
15# the License.
16"""Run sine wave audio quality test from Android to headset over 5 codecs."""
17import time
18
19from acts import asserts
20from acts.signals import TestPass
21from acts.test_utils.bt.A2dpBaseTest import A2dpBaseTest
22from acts.test_utils.bt import bt_constants
23from acts.test_utils.bt.loggers.bluetooth_metric_logger import BluetoothMetricLogger
24
25DEFAULT_THDN_THRESHOLD = .1
26DEFAULT_ANOMALIES_THRESHOLD = 0
27
28
29class BtCodecSweepTest(A2dpBaseTest):
30
31    def setup_class(self):
32        super().setup_class()
33        self.bt_logger = BluetoothMetricLogger.for_test_case()
34        self.start_time = time.time()
35
36    def setup_test(self):
37        super().setup_test()
38        req_params = ['dut',
39                      'phone_music_file_dir',
40                      'host_music_file_dir',
41                      'music_file_name',
42                      'audio_params']
43        opt_params = ['RelayDevice', 'codecs']
44        self.unpack_userparams(req_params, opt_params)
45        for codec in self.user_params.get('codecs', []):
46            self.generate_test_case(codec)
47        self.log.info('Sleep to ensure connection...')
48        time.sleep(30)
49
50    def teardown_test(self):
51        # TODO(aidanhb): Modify abstract device classes to make this generic.
52        self.bt_device.earstudio_controller.clean_up()
53
54    def print_results_summary(self, thdn_results, anomaly_results):
55        channnel_results = zip(thdn_results, anomaly_results)
56        for ch_no, result in enumerate(channnel_results):
57            self.log.info('======CHANNEL %s RESULTS======' % ch_no)
58            self.log.info('\tTHD+N: %s%%' % (result[0] * 100))
59            self.log.info('\tANOMALIES: %s' % len(result[1]))
60            for anom in result[1]:
61                self.log.info('\t\tAnomaly from %s to %s of duration %s' % (
62                    anom[0], anom[1], anom[1] - anom[0]
63                ))
64
65    def base_codec_test(self, codec_type, sample_rate, bits_per_sample,
66                        channel_mode):
67        """Base test flow that all test cases in this class will follow.
68        Args:
69            codec_type (str): the desired codec type. For reference, see
70                test_utils.bt.bt_constants.codec_types
71            sample_rate (int|str): the desired sample rate. For reference, see
72                test_utils.bt.bt_constants.sample_rates
73            bits_per_sample (int|str): the desired bits per sample. For
74                reference, see test_utils.bt.bt_constants.bits_per_samples
75            channel_mode (str): the desired channel mode. For reference, see
76                test_utils.bt.bt_constants.channel_modes
77        Raises:
78            TestPass, TestFail, or TestError test signal.
79        """
80        self.stream_music_on_codec(codec_type=codec_type,
81                                   sample_rate=sample_rate,
82                                   bits_per_sample=bits_per_sample,
83                                   channel_mode=channel_mode)
84        proto = self.run_analysis_and_generate_proto(
85                codec_type=codec_type,
86                sample_rate=sample_rate,
87                bits_per_sample=bits_per_sample,
88                channel_mode=channel_mode)
89        self.raise_pass_fail(proto)
90
91    def generate_test_case(self, codec_config):
92        def test_case_fn(inst):
93            inst.stream_music_on_codec(**codec_config)
94            proto = inst.run_analysis_and_generate_proto(**codec_config)
95            inst.raise_pass_fail(proto)
96        test_case_name = 'test_{}'.format(
97            '_'.join([str(codec_config[key]) for key in [
98                'codec_type',
99                'sample_rate',
100                'bits_per_sample',
101                'channel_mode',
102                'codec_specific_1'
103            ] if key in codec_config])
104        )
105        if hasattr(self, test_case_name):
106            self.log.warning('Test case %s already defined. Skipping '
107                             'assignment...')
108        else:
109            bound_test_case = test_case_fn.__get__(self, BtCodecSweepTest)
110            setattr(self, test_case_name, bound_test_case)
111
112    def run_analysis_and_generate_proto(self, codec_type, sample_rate,
113                                        bits_per_sample, channel_mode):
114        """Analyze audio and generate a results protobuf.
115
116        Args:
117            codec_type: The codec type config to store in the proto.
118            sample_rate: The sample rate config to store in the proto.
119            bits_per_sample: The bits per sample config to store in the proto.
120            channel_mode: The channel mode config to store in the proto.
121        Returns:
122             dict: Dictionary with key 'proto' mapping to serialized protobuf,
123               'proto_ascii' mapping to human readable protobuf info, and 'test'
124               mapping to the test class name that generated the results.
125        """
126        # Analyze audio and log results.
127        thdn_results = self.run_thdn_analysis()
128        anomaly_results = self.run_anomaly_detection()
129        self.print_results_summary(thdn_results, anomaly_results)
130
131        # Populate protobuf
132        test_case_proto = self.bt_logger.proto_module.BluetoothAudioTestResult()
133        audio_data_proto = test_case_proto.data_points.add()
134
135        audio_data_proto.timestamp_since_beginning_of_test_millis = int(
136                (time.time() - self.start_time) * 1000)
137        audio_data_proto.audio_streaming_duration_millis = (
138                int(self.mic.get_last_record_duration_millis()))
139        audio_data_proto.attenuation_db = 0
140        audio_data_proto.total_harmonic_distortion_plus_noise_percent = float(
141            thdn_results[0])
142        audio_data_proto.audio_glitches_count = len(anomaly_results[0])
143
144        codec_proto = test_case_proto.a2dp_codec_config
145        codec_proto.codec_type = bt_constants.codec_types[codec_type]
146        codec_proto.sample_rate = int(sample_rate)
147        codec_proto.bits_per_sample = int(bits_per_sample)
148        codec_proto.channel_mode = bt_constants.channel_modes[channel_mode]
149
150        self.bt_logger.add_config_data_to_proto(test_case_proto,
151                                                self.android,
152                                                self.bt_device)
153
154        self.bt_logger.add_proto_to_results(test_case_proto,
155                                            self.__class__.__name__)
156
157        return self.bt_logger.get_proto_dict(self.__class__.__name__,
158                                             test_case_proto)
159
160    def raise_pass_fail(self, extras=None):
161        """Raise pass or fail test signal based on analysis results."""
162        try:
163            anomalies_threshold = self.user_params.get(
164                'anomalies_threshold', DEFAULT_ANOMALIES_THRESHOLD)
165            asserts.assert_true(len(self.metrics['anomalies'][0]) <=
166                                anomalies_threshold,
167                                'Number of glitches exceeds threshold.',
168                                extras=extras)
169            thdn_threshold = self.user_params.get('thdn_threshold',
170                                                  DEFAULT_THDN_THRESHOLD)
171            asserts.assert_true(self.metrics['thdn'][0] <= thdn_threshold,
172                                'THD+N exceeds threshold.',
173                                extras=extras)
174        except IndexError as e:
175            self.log.error('self.raise_pass_fail called before self.analyze. '
176                           'Anomaly and THD+N results not populated.')
177            raise e
178        raise TestPass('Test passed.', extras=extras)
179
180    def test_SBC_44100_16_STEREO(self):
181        self.base_codec_test(codec_type='SBC',
182                             sample_rate=44100,
183                             bits_per_sample=16,
184                             channel_mode='STEREO')
185
186    def test_AAC_44100_16_STEREO(self):
187        self.base_codec_test(codec_type='AAC',
188                             sample_rate=44100,
189                             bits_per_sample=16,
190                             channel_mode='STEREO')
191
192    def test_APTX_44100_16_STEREO(self):
193        self.base_codec_test(codec_type='APTX',
194                             sample_rate=44100,
195                             bits_per_sample=16,
196                             channel_mode='STEREO')
197
198    def test_APTX_HD_48000_24_STEREO(self):
199        self.base_codec_test(codec_type='APTX-HD',
200                             sample_rate=48000,
201                             bits_per_sample=24,
202                             channel_mode='STEREO')
203
204    def test_LDAC_44100_16_STEREO(self):
205        self.base_codec_test(codec_type='LDAC',
206                             sample_rate=44100,
207                             bits_per_sample=16,
208                             channel_mode='STEREO')
209