#!/usr/bin/env python3 # # Copyright (C) 2019 The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); you may not # use this file except in compliance with the License. You may obtain a copy of # the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations under # the License. """Stream music through connected device from phone across different attenuations.""" import random import time from acts.signals import TestFailure from acts_contrib.test_utils.bt.BtInterferenceBaseTest import BtInterferenceBaseTest from acts_contrib.test_utils.bt.BtInterferenceBaseTest import get_iperf_results from acts_contrib.test_utils.power.PowerBTBaseTest import ramp_attenuation from multiprocessing import Process, Queue DEFAULT_THDN_THRESHOLD = 0.9 MAX_ATTENUATION = 95 TIME_OVERHEAD = 2 class BtInterferenceDynamicTest(BtInterferenceBaseTest): def __init__(self, configs): super().__init__(configs) self.iperf_duration = self.audio_params['duration'] + TIME_OVERHEAD self.wait_for_interference = self.dynamic_wifi_interference[ 'waittime_to_inject_interference'] self.channel_change_interval = self.dynamic_wifi_interference[ 'channel_change_interval_second'] self.interference_channels = self.dynamic_wifi_interference[ 'two_hoppable_channels'] + self.dynamic_wifi_interference[ 'one_hoppable_channel'] self.bt_signal_levels = list( self.dynamic_wifi_interference['bt_signal_level'].keys()) self.wifi_int_levels = list( self.dynamic_wifi_interference['interference_level'].keys()) self.bt_atten_levels = list( self.dynamic_wifi_interference['bt_signal_level'].values()) self.wifi_atten_levels = list( self.dynamic_wifi_interference['interference_level'].values()) for bt_level in self.bt_signal_levels: bt_atten_level = self.dynamic_wifi_interference['bt_signal_level'][ bt_level] for wifi_level in self.wifi_int_levels: interference_atten_level = self.dynamic_wifi_interference[ 'interference_level'][wifi_level] self.generate_test_case_randomchange( bt_atten_level, interference_atten_level, self.channel_change_interval) for channels in self.interference_channels: self.generate_test_case(bt_atten_level, interference_atten_level, channels) def generate_test_case(self, bt_atten_level, interference_atten_level, dynamic_channels): """Function to generate test cases with different parameters. Args: bt_atten_level: bt path attenuation level interference_atten_level: wifi interference path attenuation level channels: wifi interference channel or channel combination """ def test_case_fn(): self.bt_afh_with_dynamic_interference(bt_atten_level, interference_atten_level, dynamic_channels) bt_signal_level = self.bt_signal_levels[self.bt_atten_levels.index( bt_atten_level)] wifi_int_level = self.wifi_int_levels[self.wifi_atten_levels.index( interference_atten_level)] interference_chans_before = dynamic_channels[0] interference_chans_after = dynamic_channels[1] chans_before_str = 'channel_' chans_after_str = 'channel_' if 0 in interference_chans_before: chans_before_str = 'no_interference' else: for i in interference_chans_before: chans_before_str = chans_before_str + str(i) + '_' for i in interference_chans_after: chans_after_str = chans_after_str + str(i) + '_' test_case_name = ('test_bt_afh_from_{}to_{}bt_signal_level_{}_' 'interference_level_{}'.format( chans_before_str, chans_after_str, bt_signal_level, wifi_int_level)) setattr(self, test_case_name, test_case_fn) def generate_test_case_randomchange(self, bt_atten_level, interference_atten_level, interval): def test_case_fn(): self.bt_afh_with_fast_changing_interference( bt_atten_level, interference_atten_level, interval) bt_signal_level = self.bt_signal_levels[self.bt_atten_levels.index( bt_atten_level)] wifi_int_level = self.wifi_atten_levels[self.wifi_atten_levels.index( interference_atten_level)] test_case_name = ('test_bt_afh_with_random_channel_interference_bt' '_signal_level_{}_interference_level_{}'.format( bt_signal_level, wifi_int_level)) setattr(self, test_case_name, test_case_fn) def interference_rssi_mapping_from_attenuation(self, interference_level): """Function to get wifi rssi-to-interference level mapping Args: interference_level: interference level in terms of attenuation """ self.log.info('Get WiFi RSSI at the desired attenuation level') for obj in self.wifi_int_pairs: obj.attenuator.set_atten(interference_level) self.get_interference_rssi() def get_rssi_at_channel(self, channel): """Function to get wifi rssi-to-interference level at each channel Args: channel: the channel to query the rssi Returns: rssi: wifi rssi at the queried channel """ for item in self.interference_rssi: if item['chan'] == channel: rssi = item['rssi'] return rssi def inject_dynamic_wifi_interference(self, interference_level, interference_channels, time_wait): """Function to inject dynamic wifi interference to bt link. Args: interference_level: signal strength of interference, represented by attenuation level interference_channels: interference channel for before and after, e.g. [chans_before, chans_after] time_wait: time wait to inject new interference """ all_pair = range(len(self.wifi_int_pairs)) #List of channels before and after changing the interference interference_chans_before = interference_channels[0] interference_chans_after = interference_channels[1] #Set existing wifi interference attenuation level if 0 not in interference_chans_before: interference_pair_indices_before = self.locate_interference_pair_by_channel( interference_chans_before) inactive_interference_pair_indices_before = [ item for item in all_pair if item not in interference_pair_indices_before ] self.log.info( 'Set pre-existing interference before A2DP streaming') for i in interference_pair_indices_before: self.log.info( 'Set {} dB on attenuator {}, wifi rssi {} dBm at chan {}'. format( interference_level, i + 1, self.get_rssi_at_channel( self.wifi_int_pairs[i].channel), self.wifi_int_pairs[i].channel)) self.wifi_int_pairs[i].attenuator.set_atten(interference_level) for i in inactive_interference_pair_indices_before: self.log.info('Set attenuation {} dB on attenuator {}'.format( MAX_ATTENUATION, i + 1)) self.wifi_int_pairs[i].attenuator.set_atten(MAX_ATTENUATION) ##Debug_purpose for i in self.attenuators: self.log.info(i.get_atten()) #Set after change wifi interference attenuation level interference_pair_indices_after = self.locate_interference_pair_by_channel( interference_chans_after) inactive_interference_pair_indices_after = [ item for item in all_pair if item not in interference_pair_indices_after ] #Wait for time_wait second to inject new interference time.sleep(time_wait) self.log.info('Inject new interference during A2DP streaming') for i in interference_pair_indices_after: self.log.info( 'Set {} dB on attenuator {}, with wifi rssi {} dBm at chan {}'. format( interference_level, i + 1, self.get_rssi_at_channel(self.wifi_int_pairs[i].channel), self.wifi_int_pairs[i].channel)) self.wifi_int_pairs[i].attenuator.set_atten(interference_level) for i in inactive_interference_pair_indices_after: self.log.info('Set attenuation {} dB on attenuator {}'.format( MAX_ATTENUATION, i + 1)) self.wifi_int_pairs[i].attenuator.set_atten(MAX_ATTENUATION) ##Debug_purpose for i in self.attenuators: self.log.info('Attenuator {} set to {} dB'.format( self.attenuators.index(i) + 1, i.get_atten())) self.log.info('Dymanic inteference injected') def inject_fast_changing_wifi_interference(self, interference_level, interval): """Function to inject changing wifi interference one channel a time. Args: interference_level: signal strength of interference, represented by attenuation level interval: interval between channel changes """ all_pair = range(len(self.wifi_int_pairs)) #Set initial WiFi interference at channel 1 self.log.info('Start with interference at channel 1') self.wifi_int_pairs[0].attenuator.set_atten(interference_level) self.wifi_int_pairs[1].attenuator.set_atten(MAX_ATTENUATION) self.wifi_int_pairs[2].attenuator.set_atten(MAX_ATTENUATION) current_int_pair = [0] inactive_int_pairs = [ item for item in all_pair if item not in current_int_pair ] time.sleep(interval) #Inject randomlized channel wifi interference self.log.info( 'Inject random changing channel (1,6,11) wifi interference' 'every {} second'.format(interval)) while True: current_int_pair = [ random.randint(inactive_int_pairs[0], inactive_int_pairs[1]) ] inactive_int_pairs = [ item for item in all_pair if item not in current_int_pair ] self.wifi_int_pairs[current_int_pair[0]].attenuator.set_atten( interference_level) self.log.info( 'Current interference {} at channel {} with rssi {} dBm'. format( interference_level, self.wifi_int_pairs[current_int_pair[0]].channel, self.get_rssi_at_channel( self.wifi_int_pairs[current_int_pair[0]].channel))) for i in inactive_int_pairs: self.wifi_int_pairs[i].attenuator.set_atten(MAX_ATTENUATION) ##Debug_purpose for i in self.attenuators: self.log.info('Attenuator {} set to {} dB'.format( self.attenuators.index(i) + 1, i.get_atten())) time.sleep(interval) def bt_afh_with_dynamic_interference(self, bt_atten_level, interference_atten_level, dynamic_channels): """Run a2dp audio quality with dynamic interference added. Args: bt_atten_level: signal level of bt in terms of attenuation interference_atten_level: interference level in terms of attenuation dynamic_channels: interference channels before and after """ ramp_attenuation(self.attenuator, bt_atten_level) self.interference_rssi_mapping_from_attenuation( interference_atten_level) [rssi_master, pwl_master, rssi_slave] = self._get_bt_link_metrics() tag_bt = 'bt_signal_level_{}_rssi_{}_dBm'.format( bt_atten_level, rssi_master) procs_iperf = [] for obj in self.wifi_int_pairs: obj.iperf_server.start() iperf_args = '-i 1 -t {} -p {} -J -R'.format( self.iperf_duration, obj.iperf_server.port) tag = 'chan_{}'.format(obj.channel) proc_iperf = Process(target=obj.iperf_client.start, args=(obj.server_address, iperf_args, tag)) proc_iperf.start() procs_iperf.append(proc_iperf) self.log.info('Start IPERF on all three channels') queue = Queue() proc_bt_audio = Process(target=self.play_and_record_audio, args=(self.audio_params['duration'], queue)) proc_interference = Process( target=self.inject_dynamic_wifi_interference, args=(interference_atten_level, dynamic_channels, self.wait_for_interference)) proc_bt_audio.start() proc_interference.start() proc_bt_audio.join() proc_interference.join() for proc in procs_iperf: proc.join() for obj in self.wifi_int_pairs: iperf_throughput = get_iperf_results(obj.iperf_server) self.log.info( 'Throughput for channel {} interference is {} Mbps'.format( obj.channel, iperf_throughput)) obj.iperf_server.stop() audio_captured = queue.get() thdns = self.run_thdn_analysis(audio_captured, tag_bt) self.log.info('THDN results are {}'.format(thdns)) for thdn in thdns: if thdn >= self.audio_params['thdn_threshold']: raise TestFailure('AFH failed') def bt_afh_with_fast_changing_interference(self, bt_atten_level, interference_atten_level, interval): """Run a2dp audio quality with random channel fast changing interference Args: bt_signale_level: signal level of bt in terms of attenuation interference_level: interference level in terms of attenuation interval: interval between channel changes """ ramp_attenuation(self.attenuator, bt_atten_level) self.interference_rssi_mapping_from_attenuation( interference_atten_level) [rssi_master, pwl_master, rssi_slave] = self._get_bt_link_metrics() tag_bt = 'bt_signal_level_{}_rssi_{}_dBm'.format( bt_atten_level, rssi_master) procs_iperf = [] #Start IPERF on all three interference pairs for obj in self.wifi_int_pairs: obj.iperf_server.start() iperf_args = '-i 1 -t {} -p {} -J -R'.format( self.iperf_duration, obj.iperf_server.port) tag = 'chan_{}'.format(obj.channel) proc_iperf = Process(target=obj.iperf_client.start, args=(obj.server_address, iperf_args, tag)) proc_iperf.start() procs_iperf.append(proc_iperf) self.log.info('Start IPERF on all three channels') queue = Queue() proc_bt_audio = Process(target=self.play_and_record_audio, args=(self.audio_params['duration'], queue)) proc_interference = Process( target=self.inject_fast_changing_wifi_interference, args=(interference_atten_level, interval)) proc_bt_audio.start() proc_interference.start() proc_bt_audio.join() while proc_bt_audio.is_alive(): continue proc_interference.terminate() proc_interference.join() for proc in procs_iperf: proc.join() for obj in self.wifi_int_pairs: iperf_throughput = get_iperf_results(obj.iperf_server) self.log.info( 'Throughput for channel {} interference is {} Mbps'.format( obj.channel, iperf_throughput)) obj.iperf_server.stop() audio_captured = queue.get() thdns = self.run_thdn_analysis(audio_captured, tag_bt) self.log.info('THDN results are {}'.format(thdns)) for thdn in thdns: if thdn >= self.audio_params['thdn_threshold']: raise TestFailure('AFH failed')