1#!/usr/bin/env python3
2#
3# Copyright (C) 2019 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may not
6# use this file except in compliance with the License. You may obtain a copy of
7# the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations under
15# the License.
16"""Stream music through connected device from phone across different
17attenuations."""
18
19import random
20import time
21from acts.signals import TestFailure
22from acts_contrib.test_utils.bt.BtInterferenceBaseTest import BtInterferenceBaseTest
23from acts_contrib.test_utils.bt.BtInterferenceBaseTest import get_iperf_results
24from acts_contrib.test_utils.power.PowerBTBaseTest import ramp_attenuation
25from multiprocessing import Process, Queue
26
27DEFAULT_THDN_THRESHOLD = 0.9
28MAX_ATTENUATION = 95
29TIME_OVERHEAD = 2
30
31
32class BtInterferenceDynamicTest(BtInterferenceBaseTest):
33    def __init__(self, configs):
34        super().__init__(configs)
35        self.iperf_duration = self.audio_params['duration'] + TIME_OVERHEAD
36        self.wait_for_interference = self.dynamic_wifi_interference[
37            'waittime_to_inject_interference']
38        self.channel_change_interval = self.dynamic_wifi_interference[
39            'channel_change_interval_second']
40        self.interference_channels = self.dynamic_wifi_interference[
41            'two_hoppable_channels'] + self.dynamic_wifi_interference[
42                'one_hoppable_channel']
43
44        self.bt_signal_levels = list(
45            self.dynamic_wifi_interference['bt_signal_level'].keys())
46        self.wifi_int_levels = list(
47            self.dynamic_wifi_interference['interference_level'].keys())
48        self.bt_atten_levels = list(
49            self.dynamic_wifi_interference['bt_signal_level'].values())
50        self.wifi_atten_levels = list(
51            self.dynamic_wifi_interference['interference_level'].values())
52        for bt_level in self.bt_signal_levels:
53            bt_atten_level = self.dynamic_wifi_interference['bt_signal_level'][
54                bt_level]
55            for wifi_level in self.wifi_int_levels:
56                interference_atten_level = self.dynamic_wifi_interference[
57                    'interference_level'][wifi_level]
58                self.generate_test_case_randomchange(
59                    bt_atten_level, interference_atten_level,
60                    self.channel_change_interval)
61                for channels in self.interference_channels:
62                    self.generate_test_case(bt_atten_level,
63                                            interference_atten_level, channels)
64
65    def generate_test_case(self, bt_atten_level, interference_atten_level,
66                           dynamic_channels):
67        """Function to generate test cases with different parameters.
68        Args:
69            bt_atten_level: bt path attenuation level
70            interference_atten_level: wifi interference path attenuation level
71            channels: wifi interference channel or channel combination
72        """
73        def test_case_fn():
74            self.bt_afh_with_dynamic_interference(bt_atten_level,
75                                                  interference_atten_level,
76                                                  dynamic_channels)
77
78        bt_signal_level = self.bt_signal_levels[self.bt_atten_levels.index(
79            bt_atten_level)]
80        wifi_int_level = self.wifi_int_levels[self.wifi_atten_levels.index(
81            interference_atten_level)]
82        interference_chans_before = dynamic_channels[0]
83        interference_chans_after = dynamic_channels[1]
84        chans_before_str = 'channel_'
85        chans_after_str = 'channel_'
86        if 0 in interference_chans_before:
87            chans_before_str = 'no_interference'
88        else:
89            for i in interference_chans_before:
90                chans_before_str = chans_before_str + str(i) + '_'
91        for i in interference_chans_after:
92            chans_after_str = chans_after_str + str(i) + '_'
93        test_case_name = ('test_bt_afh_from_{}to_{}bt_signal_level_{}_'
94                          'interference_level_{}'.format(
95                              chans_before_str, chans_after_str,
96                              bt_signal_level, wifi_int_level))
97        setattr(self, test_case_name, test_case_fn)
98
99    def generate_test_case_randomchange(self, bt_atten_level,
100                                        interference_atten_level, interval):
101        def test_case_fn():
102            self.bt_afh_with_fast_changing_interference(
103                bt_atten_level, interference_atten_level, interval)
104
105        bt_signal_level = self.bt_signal_levels[self.bt_atten_levels.index(
106            bt_atten_level)]
107        wifi_int_level = self.wifi_atten_levels[self.wifi_atten_levels.index(
108            interference_atten_level)]
109        test_case_name = ('test_bt_afh_with_random_channel_interference_bt'
110                          '_signal_level_{}_interference_level_{}'.format(
111                              bt_signal_level, wifi_int_level))
112        setattr(self, test_case_name, test_case_fn)
113
114    def interference_rssi_mapping_from_attenuation(self, interference_level):
115        """Function to get wifi rssi-to-interference level mapping
116        Args:
117            interference_level: interference level in terms of attenuation
118        """
119        self.log.info('Get WiFi RSSI at the desired attenuation level')
120        for obj in self.wifi_int_pairs:
121            obj.attenuator.set_atten(interference_level)
122        self.get_interference_rssi()
123
124    def get_rssi_at_channel(self, channel):
125        """Function to get wifi rssi-to-interference level at each channel
126        Args:
127            channel: the channel to query the rssi
128        Returns:
129            rssi: wifi rssi at the queried channel
130        """
131        for item in self.interference_rssi:
132            if item['chan'] == channel:
133                rssi = item['rssi']
134        return rssi
135
136    def inject_dynamic_wifi_interference(self, interference_level,
137                                         interference_channels, time_wait):
138        """Function to inject dynamic wifi interference to bt link.
139        Args:
140            interference_level: signal strength of interference, represented
141                by attenuation level
142            interference_channels: interference channel for before and after,
143                e.g. [chans_before, chans_after]
144            time_wait: time wait to inject new interference
145        """
146        all_pair = range(len(self.wifi_int_pairs))
147        #List of channels before and after changing the interference
148        interference_chans_before = interference_channels[0]
149        interference_chans_after = interference_channels[1]
150        #Set existing wifi interference attenuation level
151        if 0 not in interference_chans_before:
152            interference_pair_indices_before = self.locate_interference_pair_by_channel(
153                interference_chans_before)
154            inactive_interference_pair_indices_before = [
155                item for item in all_pair
156                if item not in interference_pair_indices_before
157            ]
158            self.log.info(
159                'Set pre-existing interference before A2DP streaming')
160            for i in interference_pair_indices_before:
161                self.log.info(
162                    'Set {} dB on attenuator {}, wifi rssi {} dBm at chan {}'.
163                    format(
164                        interference_level, i + 1,
165                        self.get_rssi_at_channel(
166                            self.wifi_int_pairs[i].channel),
167                        self.wifi_int_pairs[i].channel))
168                self.wifi_int_pairs[i].attenuator.set_atten(interference_level)
169            for i in inactive_interference_pair_indices_before:
170                self.log.info('Set attenuation {} dB on attenuator {}'.format(
171                    MAX_ATTENUATION, i + 1))
172                self.wifi_int_pairs[i].attenuator.set_atten(MAX_ATTENUATION)
173        ##Debug_purpose
174        for i in self.attenuators:
175            self.log.info(i.get_atten())
176
177        #Set after change wifi interference attenuation level
178        interference_pair_indices_after = self.locate_interference_pair_by_channel(
179            interference_chans_after)
180        inactive_interference_pair_indices_after = [
181            item for item in all_pair
182            if item not in interference_pair_indices_after
183        ]
184        #Wait for time_wait second to inject new interference
185        time.sleep(time_wait)
186        self.log.info('Inject new interference during A2DP streaming')
187        for i in interference_pair_indices_after:
188            self.log.info(
189                'Set {} dB on attenuator {}, with wifi rssi {} dBm at chan {}'.
190                format(
191                    interference_level, i + 1,
192                    self.get_rssi_at_channel(self.wifi_int_pairs[i].channel),
193                    self.wifi_int_pairs[i].channel))
194            self.wifi_int_pairs[i].attenuator.set_atten(interference_level)
195        for i in inactive_interference_pair_indices_after:
196            self.log.info('Set attenuation {} dB on attenuator {}'.format(
197                MAX_ATTENUATION, i + 1))
198            self.wifi_int_pairs[i].attenuator.set_atten(MAX_ATTENUATION)
199        ##Debug_purpose
200        for i in self.attenuators:
201            self.log.info('Attenuator {} set to {} dB'.format(
202                self.attenuators.index(i) + 1, i.get_atten()))
203        self.log.info('Dymanic inteference injected')
204
205    def inject_fast_changing_wifi_interference(self, interference_level,
206                                               interval):
207        """Function to inject changing wifi interference one channel a time.
208        Args:
209            interference_level: signal strength of interference, represented
210                by attenuation level
211            interval: interval between channel changes
212        """
213        all_pair = range(len(self.wifi_int_pairs))
214        #Set initial WiFi interference at channel 1
215        self.log.info('Start with interference at channel 1')
216        self.wifi_int_pairs[0].attenuator.set_atten(interference_level)
217        self.wifi_int_pairs[1].attenuator.set_atten(MAX_ATTENUATION)
218        self.wifi_int_pairs[2].attenuator.set_atten(MAX_ATTENUATION)
219        current_int_pair = [0]
220        inactive_int_pairs = [
221            item for item in all_pair if item not in current_int_pair
222        ]
223        time.sleep(interval)
224        #Inject randomlized channel wifi interference
225        self.log.info(
226            'Inject random changing channel (1,6,11) wifi interference'
227            'every {} second'.format(interval))
228        while True:
229            current_int_pair = [
230                random.randint(inactive_int_pairs[0], inactive_int_pairs[1])
231            ]
232            inactive_int_pairs = [
233                item for item in all_pair if item not in current_int_pair
234            ]
235            self.wifi_int_pairs[current_int_pair[0]].attenuator.set_atten(
236                interference_level)
237            self.log.info(
238                'Current interference {} at channel {} with rssi {} dBm'.
239                format(
240                    interference_level,
241                    self.wifi_int_pairs[current_int_pair[0]].channel,
242                    self.get_rssi_at_channel(
243                        self.wifi_int_pairs[current_int_pair[0]].channel)))
244            for i in inactive_int_pairs:
245                self.wifi_int_pairs[i].attenuator.set_atten(MAX_ATTENUATION)
246            ##Debug_purpose
247            for i in self.attenuators:
248                self.log.info('Attenuator {} set to {} dB'.format(
249                    self.attenuators.index(i) + 1, i.get_atten()))
250            time.sleep(interval)
251
252    def bt_afh_with_dynamic_interference(self, bt_atten_level,
253                                         interference_atten_level,
254                                         dynamic_channels):
255        """Run a2dp audio quality with dynamic interference added.
256        Args:
257            bt_atten_level: signal level of bt in terms of attenuation
258            interference_atten_level: interference level in terms of attenuation
259            dynamic_channels: interference channels before and after
260        """
261        ramp_attenuation(self.attenuator, bt_atten_level)
262        self.interference_rssi_mapping_from_attenuation(
263            interference_atten_level)
264        [rssi_master, pwl_master, rssi_slave] = self._get_bt_link_metrics()
265        tag_bt = 'bt_signal_level_{}_rssi_{}_dBm'.format(
266            bt_atten_level, rssi_master)
267        procs_iperf = []
268        for obj in self.wifi_int_pairs:
269            obj.iperf_server.start()
270            iperf_args = '-i 1 -t {} -p {} -J -R'.format(
271                self.iperf_duration, obj.iperf_server.port)
272            tag = 'chan_{}'.format(obj.channel)
273            proc_iperf = Process(target=obj.iperf_client.start,
274                                 args=(obj.server_address, iperf_args, tag))
275            proc_iperf.start()
276            procs_iperf.append(proc_iperf)
277        self.log.info('Start IPERF on all three channels')
278        queue = Queue()
279        proc_bt_audio = Process(target=self.play_and_record_audio,
280                                args=(self.audio_params['duration'], queue))
281        proc_interference = Process(
282            target=self.inject_dynamic_wifi_interference,
283            args=(interference_atten_level, dynamic_channels,
284                  self.wait_for_interference))
285        proc_bt_audio.start()
286        proc_interference.start()
287        proc_bt_audio.join()
288        proc_interference.join()
289        for proc in procs_iperf:
290            proc.join()
291        for obj in self.wifi_int_pairs:
292            iperf_throughput = get_iperf_results(obj.iperf_server)
293            self.log.info(
294                'Throughput for channel {} interference is {} Mbps'.format(
295                    obj.channel, iperf_throughput))
296            obj.iperf_server.stop()
297        audio_captured = queue.get()
298        thdns = self.run_thdn_analysis(audio_captured, tag_bt)
299        self.log.info('THDN results are {}'.format(thdns))
300        for thdn in thdns:
301            if thdn >= self.audio_params['thdn_threshold']:
302                raise TestFailure('AFH failed')
303
304    def bt_afh_with_fast_changing_interference(self, bt_atten_level,
305                                               interference_atten_level,
306                                               interval):
307        """Run a2dp audio quality with random channel fast changing interference
308        Args:
309            bt_signale_level: signal level of bt in terms of attenuation
310            interference_level: interference level in terms of attenuation
311            interval: interval between channel changes
312        """
313        ramp_attenuation(self.attenuator, bt_atten_level)
314        self.interference_rssi_mapping_from_attenuation(
315            interference_atten_level)
316        [rssi_master, pwl_master, rssi_slave] = self._get_bt_link_metrics()
317        tag_bt = 'bt_signal_level_{}_rssi_{}_dBm'.format(
318            bt_atten_level, rssi_master)
319        procs_iperf = []
320        #Start IPERF on all three interference pairs
321        for obj in self.wifi_int_pairs:
322            obj.iperf_server.start()
323            iperf_args = '-i 1 -t {} -p {} -J -R'.format(
324                self.iperf_duration, obj.iperf_server.port)
325            tag = 'chan_{}'.format(obj.channel)
326            proc_iperf = Process(target=obj.iperf_client.start,
327                                 args=(obj.server_address, iperf_args, tag))
328            proc_iperf.start()
329            procs_iperf.append(proc_iperf)
330        self.log.info('Start IPERF on all three channels')
331        queue = Queue()
332        proc_bt_audio = Process(target=self.play_and_record_audio,
333                                args=(self.audio_params['duration'], queue))
334        proc_interference = Process(
335            target=self.inject_fast_changing_wifi_interference,
336            args=(interference_atten_level, interval))
337        proc_bt_audio.start()
338        proc_interference.start()
339        proc_bt_audio.join()
340        while proc_bt_audio.is_alive():
341            continue
342        proc_interference.terminate()
343        proc_interference.join()
344        for proc in procs_iperf:
345            proc.join()
346        for obj in self.wifi_int_pairs:
347            iperf_throughput = get_iperf_results(obj.iperf_server)
348            self.log.info(
349                'Throughput for channel {} interference is {} Mbps'.format(
350                    obj.channel, iperf_throughput))
351            obj.iperf_server.stop()
352        audio_captured = queue.get()
353        thdns = self.run_thdn_analysis(audio_captured, tag_bt)
354        self.log.info('THDN results are {}'.format(thdns))
355        for thdn in thdns:
356            if thdn >= self.audio_params['thdn_threshold']:
357                raise TestFailure('AFH failed')
358