#!/usr/bin/env python3
#
# Copyright (C) 2019 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Stream music through connected device from phone across different
attenuations."""

import random
import time
from multiprocessing import Process, Queue

from acts.signals import TestFailure
from acts_contrib.test_utils.bt.BtInterferenceBaseTest import BtInterferenceBaseTest
from acts_contrib.test_utils.bt.BtInterferenceBaseTest import get_iperf_results
from acts_contrib.test_utils.power.PowerBTBaseTest import ramp_attenuation

DEFAULT_THDN_THRESHOLD = 0.9
MAX_ATTENUATION = 95
TIME_OVERHEAD = 2


class BtInterferenceDynamicTest(BtInterferenceBaseTest):
    """A2DP audio quality tests under WiFi interference that changes over time.

    Test methods are generated in __init__, one per combination of BT signal
    level, interference level and channel pattern found in the
    'dynamic_wifi_interference' configuration.
    """

    def __init__(self, configs):
        """Read the dynamic interference config and generate the test cases.

        Args:
            configs: test configuration, passed through to the base class.
        """
        super().__init__(configs)
        # NOTE(review): audio_params and dynamic_wifi_interference are not
        # assigned in this file; presumably the base class populates them
        # from the test config - confirm.
        self.iperf_duration = self.audio_params['duration'] + TIME_OVERHEAD
        self.wait_for_interference = self.dynamic_wifi_interference[
            'waittime_to_inject_interference']
        self.channel_change_interval = self.dynamic_wifi_interference[
            'channel_change_interval_second']
        self.interference_channels = self.dynamic_wifi_interference[
            'two_hoppable_channels'] + self.dynamic_wifi_interference[
                'one_hoppable_channel']

        # Parallel lists: level names and their attenuation values share an
        # index, which the generate_* methods use to map a value to a name.
        self.bt_signal_levels = list(
            self.dynamic_wifi_interference['bt_signal_level'].keys())
        self.wifi_int_levels = list(
            self.dynamic_wifi_interference['interference_level'].keys())
        self.bt_atten_levels = list(
            self.dynamic_wifi_interference['bt_signal_level'].values())
        self.wifi_atten_levels = list(
            self.dynamic_wifi_interference['interference_level'].values())
        for bt_level in self.bt_signal_levels:
            bt_atten_level = self.dynamic_wifi_interference['bt_signal_level'][
                bt_level]
            for wifi_level in self.wifi_int_levels:
                interference_atten_level = self.dynamic_wifi_interference[
                    'interference_level'][wifi_level]
                self.generate_test_case_randomchange(
                    bt_atten_level, interference_atten_level,
                    self.channel_change_interval)
                for channels in self.interference_channels:
                    self.generate_test_case(bt_atten_level,
                                            interference_atten_level, channels)

    def generate_test_case(self, bt_atten_level, interference_atten_level,
                           dynamic_channels):
        """Function to generate test cases with different parameters.

        The generated test is attached to this instance under a name built
        from the channel lists and the level names.

        Args:
            bt_atten_level: bt path attenuation level
            interference_atten_level: wifi interference path attenuation level
            dynamic_channels: wifi interference channels as
                [chans_before, chans_after]; a 0 in chans_before means no
                interference before the change
        """
        def test_case_fn():
            self.bt_afh_with_dynamic_interference(bt_atten_level,
                                                  interference_atten_level,
                                                  dynamic_channels)

        bt_signal_level = self.bt_signal_levels[self.bt_atten_levels.index(
            bt_atten_level)]
        wifi_int_level = self.wifi_int_levels[self.wifi_atten_levels.index(
            interference_atten_level)]
        interference_chans_before = dynamic_channels[0]
        interference_chans_after = dynamic_channels[1]
        if 0 in interference_chans_before:
            chans_before_str = 'no_interference'
        else:
            chans_before_str = 'channel_' + ''.join(
                '{}_'.format(chan) for chan in interference_chans_before)
        chans_after_str = 'channel_' + ''.join(
            '{}_'.format(chan) for chan in interference_chans_after)
        test_case_name = ('test_bt_afh_from_{}to_{}bt_signal_level_{}_'
                          'interference_level_{}'.format(
                              chans_before_str, chans_after_str,
                              bt_signal_level, wifi_int_level))
        setattr(self, test_case_name, test_case_fn)

    def generate_test_case_randomchange(self, bt_atten_level,
                                        interference_atten_level, interval):
        """Function to generate a random-channel interference test case.

        Args:
            bt_atten_level: bt path attenuation level
            interference_atten_level: wifi interference path attenuation level
            interval: interval between interference channel changes
        """
        def test_case_fn():
            self.bt_afh_with_fast_changing_interference(
                bt_atten_level, interference_atten_level, interval)

        bt_signal_level = self.bt_signal_levels[self.bt_atten_levels.index(
            bt_atten_level)]
        # Map the attenuation value to its level name, the same way
        # generate_test_case does, so both name families use level names.
        wifi_int_level = self.wifi_int_levels[self.wifi_atten_levels.index(
            interference_atten_level)]
        test_case_name = ('test_bt_afh_with_random_channel_interference_bt'
                          '_signal_level_{}_interference_level_{}'.format(
                              bt_signal_level, wifi_int_level))
        setattr(self, test_case_name, test_case_fn)

    def interference_rssi_mapping_from_attenuation(self, interference_level):
        """Function to get wifi rssi-to-interference level mapping

        Sets every interference attenuator to interference_level and then
        calls get_interference_rssi().

        Args:
            interference_level: interference level in terms of attenuation
        """
        self.log.info('Get WiFi RSSI at the desired attenuation level')
        for obj in self.wifi_int_pairs:
            obj.attenuator.set_atten(interference_level)
        self.get_interference_rssi()

    def get_rssi_at_channel(self, channel):
        """Function to get wifi rssi-to-interference level at each channel

        Args:
            channel: the channel to query the rssi

        Returns:
            rssi: wifi rssi at the queried channel (the last matching entry
                of self.interference_rssi), or None if no entry matches
        """
        rssi = None
        for item in self.interference_rssi:
            if item['chan'] == channel:
                rssi = item['rssi']
        return rssi

    def inject_dynamic_wifi_interference(self, interference_level,
                                         interference_channels, time_wait):
        """Function to inject dynamic wifi interference to bt link.

        Args:
            interference_level: signal strength of interference, represented
                by attenuation level
            interference_channels: interference channel for before and after,
                e.g. [chans_before, chans_after]
            time_wait: time wait to inject new interference
        """
        all_pair = range(len(self.wifi_int_pairs))
        # List of channels before and after changing the interference
        interference_chans_before = interference_channels[0]
        interference_chans_after = interference_channels[1]
        # Set existing wifi interference attenuation level; a 0 in the
        # "before" list means there is no pre-existing interference to set.
        if 0 not in interference_chans_before:
            interference_pair_indices_before = self.locate_interference_pair_by_channel(
                interference_chans_before)
            inactive_interference_pair_indices_before = [
                item for item in all_pair
                if item not in interference_pair_indices_before
            ]
            self.log.info(
                'Set pre-existing interference before A2DP streaming')
            for i in interference_pair_indices_before:
                self.log.info(
                    'Set {} dB on attenuator {}, wifi rssi {} dBm at chan {}'.
                    format(
                        interference_level, i + 1,
                        self.get_rssi_at_channel(
                            self.wifi_int_pairs[i].channel),
                        self.wifi_int_pairs[i].channel))
                self.wifi_int_pairs[i].attenuator.set_atten(interference_level)
            for i in inactive_interference_pair_indices_before:
                self.log.info('Set attenuation {} dB on attenuator {}'.format(
                    MAX_ATTENUATION, i + 1))
                self.wifi_int_pairs[i].attenuator.set_atten(MAX_ATTENUATION)
            # Debug purpose
            for atten in self.attenuators:
                self.log.info(atten.get_atten())

        # Set after change wifi interference attenuation level
        interference_pair_indices_after = self.locate_interference_pair_by_channel(
            interference_chans_after)
        inactive_interference_pair_indices_after = [
            item for item in all_pair
            if item not in interference_pair_indices_after
        ]
        # Wait for time_wait second to inject new interference
        time.sleep(time_wait)
        self.log.info('Inject new interference during A2DP streaming')
        for i in interference_pair_indices_after:
            self.log.info(
                'Set {} dB on attenuator {}, with wifi rssi {} dBm at chan {}'.
                format(
                    interference_level, i + 1,
                    self.get_rssi_at_channel(self.wifi_int_pairs[i].channel),
                    self.wifi_int_pairs[i].channel))
            self.wifi_int_pairs[i].attenuator.set_atten(interference_level)
        for i in inactive_interference_pair_indices_after:
            self.log.info('Set attenuation {} dB on attenuator {}'.format(
                MAX_ATTENUATION, i + 1))
            self.wifi_int_pairs[i].attenuator.set_atten(MAX_ATTENUATION)
        # Debug purpose
        for position, atten in enumerate(self.attenuators, start=1):
            self.log.info('Attenuator {} set to {} dB'.format(
                position, atten.get_atten()))
        self.log.info('Dymanic inteference injected')

    def inject_fast_changing_wifi_interference(self, interference_level,
                                               interval):
        """Function to inject changing wifi interference one channel a time.

        Starts with interference on the first pair only, then loops forever,
        moving the interference to a randomly chosen pair that is currently
        inactive every interval seconds. The loop never returns; the caller
        runs it in a separate process and terminates that process.

        Requires at least two entries in self.wifi_int_pairs.

        Args:
            interference_level: signal strength of interference, represented
                by attenuation level
            interval: interval between channel changes
        """
        all_pair = range(len(self.wifi_int_pairs))
        # Set initial WiFi interference on the first pair only
        self.log.info('Start with interference at channel 1')
        active_pair = 0
        inactive_int_pairs = [
            item for item in all_pair if item != active_pair
        ]
        self.wifi_int_pairs[active_pair].attenuator.set_atten(
            interference_level)
        for i in inactive_int_pairs:
            self.wifi_int_pairs[i].attenuator.set_atten(MAX_ATTENUATION)
        time.sleep(interval)
        # Inject randomized channel wifi interference
        self.log.info(
            'Inject random changing channel (1,6,11) wifi interference'
            'every {} second'.format(interval))
        while True:
            # Choose only among the inactive pairs so that the interference
            # channel really changes on every iteration.
            active_pair = random.choice(inactive_int_pairs)
            inactive_int_pairs = [
                item for item in all_pair if item != active_pair
            ]
            self.wifi_int_pairs[active_pair].attenuator.set_atten(
                interference_level)
            self.log.info(
                'Current interference {} at channel {} with rssi {} dBm'.
                format(
                    interference_level,
                    self.wifi_int_pairs[active_pair].channel,
                    self.get_rssi_at_channel(
                        self.wifi_int_pairs[active_pair].channel)))
            for i in inactive_int_pairs:
                self.wifi_int_pairs[i].attenuator.set_atten(MAX_ATTENUATION)
            # Debug purpose
            for position, atten in enumerate(self.attenuators, start=1):
                self.log.info('Attenuator {} set to {} dB'.format(
                    position, atten.get_atten()))
            time.sleep(interval)

    def _prepare_link(self, bt_atten_level, interference_atten_level):
        """Set the BT attenuation, refresh WiFi RSSI and build the result tag.

        Args:
            bt_atten_level: signal level of bt in terms of attenuation
            interference_atten_level: interference level in terms of
                attenuation

        Returns:
            The tag string passed to the THDN analysis.
        """
        ramp_attenuation(self.attenuator, bt_atten_level)
        self.interference_rssi_mapping_from_attenuation(
            interference_atten_level)
        # Only the first of the three link metrics is used for the tag.
        rssi_master, _pwl_master, _rssi_slave = self._get_bt_link_metrics()
        return 'bt_signal_level_{}_rssi_{}_dBm'.format(bt_atten_level,
                                                       rssi_master)

    def _start_iperf_traffic(self):
        """Start an iperf server and a client process on every wifi pair.

        Returns:
            List of the started iperf client processes.
        """
        procs_iperf = []
        for obj in self.wifi_int_pairs:
            obj.iperf_server.start()
            iperf_args = '-i 1 -t {} -p {} -J -R'.format(
                self.iperf_duration, obj.iperf_server.port)
            tag = 'chan_{}'.format(obj.channel)
            proc_iperf = Process(target=obj.iperf_client.start,
                                 args=(obj.server_address, iperf_args, tag))
            proc_iperf.start()
            procs_iperf.append(proc_iperf)
        self.log.info('Start IPERF on all three channels')
        return procs_iperf

    def _finish_iperf_traffic(self, procs_iperf):
        """Wait for the iperf clients, log throughput and stop the servers.

        Args:
            procs_iperf: client processes from _start_iperf_traffic()
        """
        for proc in procs_iperf:
            proc.join()
        for obj in self.wifi_int_pairs:
            iperf_throughput = get_iperf_results(obj.iperf_server)
            self.log.info(
                'Throughput for channel {} interference is {} Mbps'.format(
                    obj.channel, iperf_throughput))
            obj.iperf_server.stop()

    def _verify_audio_quality(self, queue, tag_bt):
        """Run THDN analysis on the captured audio and fail on a bad value.

        Args:
            queue: queue from which the captured audio is read
            tag_bt: tag passed to run_thdn_analysis()

        Raises:
            TestFailure: if any THDN value is at or above
                audio_params['thdn_threshold']
        """
        audio_captured = queue.get()
        thdns = self.run_thdn_analysis(audio_captured, tag_bt)
        self.log.info('THDN results are {}'.format(thdns))
        for thdn in thdns:
            if thdn >= self.audio_params['thdn_threshold']:
                raise TestFailure('AFH failed')

    def _run_afh_audio_test(self,
                            bt_atten_level,
                            interference_atten_level,
                            interference_target,
                            interference_args,
                            terminate_interference=False):
        """Stream and record audio while an interference process is running.

        Args:
            bt_atten_level: signal level of bt in terms of attenuation
            interference_atten_level: interference level in terms of
                attenuation
            interference_target: callable run in the interference process
            interference_args: positional arguments for interference_target
            terminate_interference: terminate the interference process once
                the audio process has finished; needed when the target never
                returns on its own

        Raises:
            TestFailure: if the recorded audio fails the THDN check
        """
        tag_bt = self._prepare_link(bt_atten_level, interference_atten_level)
        procs_iperf = self._start_iperf_traffic()
        queue = Queue()
        proc_bt_audio = Process(target=self.play_and_record_audio,
                                args=(self.audio_params['duration'], queue))
        proc_interference = Process(target=interference_target,
                                    args=interference_args)
        proc_bt_audio.start()
        proc_interference.start()
        proc_bt_audio.join()
        if terminate_interference:
            proc_interference.terminate()
        proc_interference.join()
        self._finish_iperf_traffic(procs_iperf)
        self._verify_audio_quality(queue, tag_bt)

    def bt_afh_with_dynamic_interference(self, bt_atten_level,
                                         interference_atten_level,
                                         dynamic_channels):
        """Run a2dp audio quality with dynamic interference added.

        Args:
            bt_atten_level: signal level of bt in terms of attenuation
            interference_atten_level: interference level in terms of attenuation
            dynamic_channels: interference channels before and after

        Raises:
            TestFailure: if any THDN value reaches the configured threshold
        """
        self._run_afh_audio_test(
            bt_atten_level, interference_atten_level,
            self.inject_dynamic_wifi_interference,
            (interference_atten_level, dynamic_channels,
             self.wait_for_interference))

    def bt_afh_with_fast_changing_interference(self, bt_atten_level,
                                               interference_atten_level,
                                               interval):
        """Run a2dp audio quality with random channel fast changing interference

        Args:
            bt_atten_level: signal level of bt in terms of attenuation
            interference_atten_level: interference level in terms of attenuation
            interval: interval between channel changes

        Raises:
            TestFailure: if any THDN value reaches the configured threshold
        """
        # The injector loops forever, so it is terminated once audio is done.
        self._run_afh_audio_test(
            bt_atten_level, interference_atten_level,
            self.inject_fast_changing_wifi_interference,
            (interference_atten_level, interval),
            terminate_interference=True)