#!/usr/bin/env python3.4
#
#   Copyright 2017 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the 'License');
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an 'AS IS' BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import collections
import csv
import itertools
import json
import logging
import numpy
import os
from acts import asserts
from acts import context
from acts import base_test
from acts import utils
from acts.controllers import iperf_client
from acts.controllers.utils_lib import ssh
from acts.metrics.loggers.blackbox import BlackboxMappedMetricLogger
from acts.test_utils.wifi import ota_chamber
from acts.test_utils.wifi import wifi_performance_test_utils as wputils
from acts.test_utils.wifi import wifi_test_utils as wutils
from acts.test_utils.wifi import wifi_retail_ap as retail_ap
from functools import partial
from WifiRvrTest import WifiRvrTest
from WifiPingTest import WifiPingTest


class WifiSensitivityTest(WifiRvrTest, WifiPingTest):
    """Class to test WiFi sensitivity.

    This class measures WiFi sensitivity per rate. It heavily leverages the
    WifiRvrTest class and introduces minor differences to set specific rates
    on the access point, and implements a different pass/fail check. For an
    example config file to run this test class see
    example_connectivity_performance_ap_sta.json.
    """

    RSSI_POLL_INTERVAL = 0.2
    # Modes that may be tested on each channel. Requested modes that are not
    # listed for a channel are silently dropped by generate_test_cases.
    VALID_TEST_CONFIGS = {
        1: ['legacy', 'VHT20'],
        2: ['legacy', 'VHT20'],
        6: ['legacy', 'VHT20'],
        10: ['legacy', 'VHT20'],
        11: ['legacy', 'VHT20'],
        36: ['legacy', 'VHT20', 'VHT40', 'VHT80'],
        40: ['legacy', 'VHT20'],
        44: ['legacy', 'VHT20'],
        48: ['legacy', 'VHT20'],
        149: ['legacy', 'VHT20', 'VHT40', 'VHT80'],
        153: ['legacy', 'VHT20'],
        157: ['legacy', 'VHT20'],
        161: ['legacy', 'VHT20']
    }
    # For legacy rates the mcs field holds the rate itself (see
    # get_start_atten, which looks up RateTuple(rate, 1, rate)).
    RateTuple = collections.namedtuple(('RateTuple'),
                                       ['mcs', 'streams', 'data_rate'])
    #yapf:disable
    VALID_RATES = {
        'legacy_2GHz': [
            RateTuple(54, 1, 54), RateTuple(48, 1, 48),
            RateTuple(36, 1, 36), RateTuple(24, 1, 24),
            RateTuple(18, 1, 18), RateTuple(12, 1, 12),
            RateTuple(11, 1, 11), RateTuple(9, 1, 9),
            RateTuple(6, 1, 6), RateTuple(5.5, 1, 5.5),
            RateTuple(2, 1, 2), RateTuple(1, 1, 1)],
        'legacy_5GHz': [
            RateTuple(54, 1, 54), RateTuple(48, 1, 48),
            RateTuple(36, 1, 36), RateTuple(24, 1, 24),
            RateTuple(18, 1, 18), RateTuple(12, 1, 12),
            RateTuple(9, 1, 9), RateTuple(6, 1, 6)],
        'HT20': [
            RateTuple(7, 1, 72.2), RateTuple(6, 1, 65),
            RateTuple(5, 1, 57.8), RateTuple(4, 1, 43.3),
            RateTuple(3, 1, 26), RateTuple(2, 1, 21.7),
            RateTuple(1, 1, 14.4), RateTuple(0, 1, 7.2),
            RateTuple(15, 2, 144.4), RateTuple(14, 2, 130),
            RateTuple(13, 2, 115.6), RateTuple(12, 2, 86.7),
            RateTuple(11, 2, 57.8), RateTuple(10, 2, 43.4),
            RateTuple(9, 2, 28.9), RateTuple(8, 2, 14.4)],
        'VHT20': [
            RateTuple(9, 1, 96), RateTuple(8, 1, 86.7),
            RateTuple(7, 1, 72.2), RateTuple(6, 1, 65),
            RateTuple(5, 1, 57.8), RateTuple(4, 1, 43.3),
            RateTuple(3, 1, 28.9), RateTuple(2, 1, 21.7),
            RateTuple(1, 1, 14.4), RateTuple(0, 1, 7.2),
            RateTuple(9, 2, 192), RateTuple(8, 2, 173.3),
            RateTuple(7, 2, 144.4), RateTuple(6, 2, 130.3),
            RateTuple(5, 2, 115.6), RateTuple(4, 2, 86.7),
            RateTuple(3, 2, 57.8), RateTuple(2, 2, 43.3),
            RateTuple(1, 2, 28.9), RateTuple(0, 2, 14.4)],
        'VHT40': [
            RateTuple(9, 1, 96), RateTuple(8, 1, 86.7),
            RateTuple(7, 1, 72.2), RateTuple(6, 1, 65),
            RateTuple(5, 1, 57.8), RateTuple(4, 1, 43.3),
            RateTuple(3, 1, 28.9), RateTuple(2, 1, 21.7),
            RateTuple(1, 1, 14.4), RateTuple(0, 1, 7.2),
            RateTuple(9, 2, 192), RateTuple(8, 2, 173.3),
            RateTuple(7, 2, 144.4), RateTuple(6, 2, 130.3),
            RateTuple(5, 2, 115.6), RateTuple(4, 2, 86.7),
            RateTuple(3, 2, 57.8), RateTuple(2, 2, 43.3),
            RateTuple(1, 2, 28.9), RateTuple(0, 2, 14.4)],
        'VHT80': [
            RateTuple(9, 1, 96), RateTuple(8, 1, 86.7),
            RateTuple(7, 1, 72.2), RateTuple(6, 1, 65),
            RateTuple(5, 1, 57.8), RateTuple(4, 1, 43.3),
            RateTuple(3, 1, 28.9), RateTuple(2, 1, 21.7),
            RateTuple(1, 1, 14.4), RateTuple(0, 1, 7.2),
            RateTuple(9, 2, 192), RateTuple(8, 2, 173.3),
            RateTuple(7, 2, 144.4), RateTuple(6, 2, 130.3),
            RateTuple(5, 2, 115.6), RateTuple(4, 2, 86.7),
            RateTuple(3, 2, 57.8), RateTuple(2, 2, 43.3),
            RateTuple(1, 2, 28.9), RateTuple(0, 2, 14.4)],
    }
    #yapf:enable

    def __init__(self, controllers):
        """Initializes the base test class and the metric loggers.

        Args:
            controllers: controller objects forwarded to BaseTestClass
        """
        base_test.BaseTestClass.__init__(self, controllers)
        self.testcase_metric_logger = (
            BlackboxMappedMetricLogger.for_test_case())
        self.testclass_metric_logger = (
            BlackboxMappedMetricLogger.for_test_class())
        self.publish_testcase_metrics = True

    def setup_class(self):
        """Initializes common test hardware and parameters.

        This function initializes hardware and compiles parameters that are
        common to all tests in this class.
        """
        self.dut = self.android_devices[-1]
        req_params = [
            'RetailAccessPoints', 'sensitivity_test_params', 'testbed_params',
            'RemoteServer'
        ]
        opt_params = ['main_network', 'golden_files_list']
        self.unpack_userparams(req_params, opt_params)
        self.testclass_params = self.sensitivity_test_params
        self.num_atten = self.attenuators[0].instrument.num_atten
        self.ping_server = ssh.connection.SshConnection(
            ssh.settings.from_config(self.RemoteServer[0]['ssh_config']))
        self.iperf_server = self.iperf_servers[0]
        self.iperf_client = self.iperf_clients[0]
        self.access_point = retail_ap.create(self.RetailAccessPoints)[0]
        self.log.info('Access Point Configuration: {}'.format(
            self.access_point.ap_settings))
        self.log_path = os.path.join(logging.log_path, 'results')
        os.makedirs(self.log_path, exist_ok=True)
        # Without an explicit list, every file in the golden results
        # directory is a candidate golden file.
        if not hasattr(self, 'golden_files_list'):
            golden_dir = self.testbed_params['golden_results_path']
            self.golden_files_list = [
                os.path.join(golden_dir, file_name)
                for file_name in os.listdir(golden_dir)
            ]
        if hasattr(self, 'bdf'):
            self.log.info('Pushing WiFi BDF to DUT.')
            wputils.push_bdf(self.dut, self.bdf)
        if hasattr(self, 'firmware'):
            self.log.info('Pushing WiFi firmware to DUT.')
            wlanmdsp = [
                file_name for file_name in self.firmware
                if "wlanmdsp.mbn" in file_name
            ][0]
            data_msc = [
                file_name for file_name in self.firmware
                if "Data.msc" in file_name
            ][0]
            wputils.push_firmware(self.dut, wlanmdsp, data_msc)
        # Maps channel -> per-attenuator DUT chain map, filled in lazily by
        # setup_dut the first time a channel is tested.
        self.atten_dut_chain_map = {}
        self.testclass_results = []

        # Turn WiFi ON
        if self.testclass_params.get('airplane_mode', 1):
            self.log.info('Turning on airplane mode.')
            asserts.assert_true(utils.force_airplane_mode(self.dut, True),
                                "Can not turn on airplane mode.")
        wutils.wifi_toggle_state(self.dut, True)

    def teardown_class(self):
        """Turns WiFi off on all devices and post-processes class results."""
        # Turn WiFi OFF
        for dev in self.android_devices:
            wutils.wifi_toggle_state(dev, False)
        self.process_testclass_results()

    def pass_fail_check(self, result):
        """Checks sensitivity against golden results and decides on pass/fail.

        The golden sensitivity is read from the first golden file whose name
        contains 'sensitivity_targets'. If no target can be found the golden
        value is NaN, the comparison below is False, and the test is failed.

        Args:
            result: dict containing attenuation, throughput and other meta
            data
        """
        try:
            golden_path = next(file_name
                               for file_name in self.golden_files_list
                               if 'sensitivity_targets' in file_name)
            with open(golden_path, 'r') as golden_file:
                golden_results = json.load(golden_file)
            golden_sensitivity = golden_results[
                self.current_test_name]['sensitivity']
        except (StopIteration, OSError, ValueError, KeyError, TypeError):
            # No golden file, unreadable/invalid JSON, or no entry for this
            # test. Other exceptions (e.g. KeyboardInterrupt) propagate.
            self.log.warning('Golden sensitivity target not available.')
            golden_sensitivity = float('nan')

        result_string = ('Throughput = {}%, Sensitivity = {}.'
                         'Target Sensitivity = {}'.format(
                             result['peak_throughput_pct'],
                             result['sensitivity'], golden_sensitivity))
        if result['peak_throughput_pct'] < 95:
            self.log.warning('Result unreliable. Peak rate unstable')
        if result['sensitivity'] - golden_sensitivity < self.testclass_params[
                'sensitivity_tolerance']:
            asserts.explicit_pass('Test Passed. {}'.format(result_string))
        else:
            asserts.fail('Test Failed. {}'.format(result_string))

    def process_testclass_results(self):
        """Saves and plots test results from all executed test cases.

        Publishes one average-sensitivity metric per channel/mode/streams/
        chain combination and writes a results.csv file with one row per
        rate and one column per tested channel.
        """
        # Group sensitivity results by rate configuration and channel.
        # Results with an unstable peak rate are recorded as ''.
        testclass_results_dict = collections.OrderedDict()
        id_fields = ['mode', 'rate', 'num_streams', 'chain_mask']
        channels_tested = []
        for result in self.testclass_results:
            testcase_params = result['testcase_params']
            test_id = self.extract_test_id(testcase_params, id_fields)
            test_id = tuple(test_id.items())
            if test_id not in testclass_results_dict:
                testclass_results_dict[test_id] = collections.OrderedDict()
            channel = testcase_params['channel']
            if channel not in channels_tested:
                channels_tested.append(channel)
            if result['peak_throughput_pct'] >= 95:
                testclass_results_dict[test_id][channel] = result[
                    'sensitivity']
            else:
                testclass_results_dict[test_id][channel] = ''

        # calculate average metrics (averaged across rates, since 'rate' is
        # not part of the metric id)
        metrics_dict = collections.OrderedDict()
        id_fields = ['channel', 'mode', 'num_streams', 'chain_mask']
        for test_id, channel_results in testclass_results_dict.items():
            for channel, sensitivity_result in channel_results.items():
                metric_tag = collections.OrderedDict(test_id, channel=channel)
                metric_tag = self.extract_test_id(metric_tag, id_fields)
                metric_tag = tuple(metric_tag.items())
                metrics_dict.setdefault(metric_tag, [])
                if sensitivity_result != '':
                    metrics_dict[metric_tag].append(sensitivity_result)
        for metric_tag_tuple, metric_data in metrics_dict.items():
            metric_tag_dict = collections.OrderedDict(metric_tag_tuple)
            metric_tag = 'ch{}_{}_nss{}_chain{}'.format(
                metric_tag_dict['channel'], metric_tag_dict['mode'],
                metric_tag_dict['num_streams'], metric_tag_dict['chain_mask'])
            metric_key = "{}.avg_sensitivity".format(metric_tag)
            metric_value = numpy.nanmean(metric_data)
            self.testclass_metric_logger.add_metric(metric_key, metric_value)

        # write csv
        csv_header = ['Mode', 'MCS', 'Streams', 'Chain', 'Rate (Mbps)']
        for channel in channels_tested:
            csv_header.append('Ch. ' + str(channel))
        results_file_path = os.path.join(self.log_path, 'results.csv')
        # newline='' is the documented way to open files for the csv module
        with open(results_file_path, mode='w', newline='') as csv_file:
            writer = csv.DictWriter(csv_file, fieldnames=csv_header)
            writer.writeheader()
            for test_id, test_results in testclass_results_dict.items():
                test_id_dict = dict(test_id)
                if 'legacy' in test_id_dict['mode']:
                    # The 2GHz legacy list contains every 5GHz legacy rate
                    rate_list = self.VALID_RATES['legacy_2GHz']
                else:
                    rate_list = self.VALID_RATES[test_id_dict['mode']]
                data_rate = next(rate.data_rate for rate in rate_list
                                 if rate[:-1] == (test_id_dict['rate'],
                                                  test_id_dict['num_streams']))
                row_value = {
                    'Mode': test_id_dict['mode'],
                    'MCS': test_id_dict['rate'],
                    'Streams': test_id_dict['num_streams'],
                    'Chain': test_id_dict['chain_mask'],
                    'Rate (Mbps)': data_rate,
                }
                for channel in channels_tested:
                    row_value['Ch. ' + str(channel)] = test_results.get(
                        channel, ' ')
                writer.writerow(row_value)

        if not self.testclass_params['traffic_type'].lower() == 'ping':
            WifiRvrTest.process_testclass_results(self)

    def process_rvr_test_results(self, testcase_params, rvr_result):
        """Post processes RvR results to compute sensitivity.

        Takes in the results of the RvR tests and computes the sensitivity of
        the current rate by looking at the point at which throughput drops
        below the percentage specified in the config file. The function then
        calls on its parent class process_test_results to plot the result.

        The range point is the last attenuation before throughput stays
        below the threshold for the remainder of the sweep. If throughput is
        below the threshold from the first point, the first attenuation is
        used; if it never stays below the threshold, the last attenuation is
        used.

        Args:
            testcase_params: dict containing test params; 'channel' is read
            rvr_result: dict containing attenuation, throughput and other meta
            data. Updated in place with the computed fields.
        """
        rvr_result['peak_throughput'] = max(rvr_result['throughput_receive'])
        rvr_result['peak_throughput_pct'] = 100
        throughput_floor = rvr_result['peak_throughput'] * (
            self.testclass_params['throughput_pct_at_sensitivity'] / 100)
        throughput_check = [
            throughput < throughput_floor
            for throughput in rvr_result['throughput_receive']
        ]
        # Find the start of the trailing run of below-threshold points
        num_points = len(throughput_check)
        first_below_idx = num_points
        for idx in reversed(range(num_points)):
            if not throughput_check[idx]:
                break
            first_below_idx = idx
        if first_below_idx == num_points:
            self.log.warning('Throughput did not drop below threshold. '
                             'Sensitivity is limited by the sweep range.')
        # Clamp so that a sweep starting below threshold does not wrap
        # around to the last attenuation through a negative index
        range_idx = max(first_below_idx - 1, 0)
        rvr_result['atten_at_range'] = rvr_result['attenuation'][range_idx]
        rvr_result['range'] = rvr_result['fixed_attenuation'] + (
            rvr_result['atten_at_range'])
        rvr_result['sensitivity'] = self.testclass_params['ap_tx_power'] + (
            self.testbed_params['ap_tx_power_offset'][str(
                testcase_params['channel'])] - rvr_result['range'])
        WifiRvrTest.process_test_results(self, rvr_result)

    def process_ping_test_results(self, testcase_params, ping_result):
        """Post processes ping results to compute sensitivity.

        Calls the WifiPingTest result processing and then derives the
        sensitivity from the AP transmit power, the per-channel power offset
        and the range stored in ping_result.

        Args:
            testcase_params: dict containing test params; 'channel' is read
            ping_result: dict containing ping results and other meta data.
            Updated in place with the 'sensitivity' field.
        """
        WifiPingTest.process_ping_results(self, testcase_params, ping_result)
        ping_result['sensitivity'] = self.testclass_params['ap_tx_power'] + (
            self.testbed_params['ap_tx_power_offset'][str(
                testcase_params['channel'])] - ping_result['range'])

    def setup_sensitivity_test(self, testcase_params):
        """Sets up the test and selects the ping or RvR run/process hooks.

        Args:
            testcase_params: dict containing AP and other test params
        """
        if testcase_params['traffic_type'].lower() == 'ping':
            self.setup_ping_test(testcase_params)
            self.run_sensitivity_test = self.run_ping_test
            self.process_sensitivity_test_results = (
                self.process_ping_test_results)
        else:
            self.setup_rvr_test(testcase_params)
            self.run_sensitivity_test = self.run_rvr_test
            self.process_sensitivity_test_results = (
                self.process_rvr_test_results)

    def setup_ap(self, testcase_params):
        """Sets up the AP and attenuator to compensate for AP chain imbalance.

        Args:
            testcase_params: dict containing AP and other test params
        """
        band = self.access_point.band_lookup_by_channel(
            testcase_params['channel'])
        if '2G' in band:
            frequency = wutils.WifiEnums.channel_2G_to_freq[
                testcase_params['channel']]
        else:
            frequency = wutils.WifiEnums.channel_5G_to_freq[
                testcase_params['channel']]
        if frequency in wutils.WifiEnums.DFS_5G_FREQUENCIES:
            self.access_point.set_region(self.testbed_params['DFS_region'])
        else:
            self.access_point.set_region(self.testbed_params['default_region'])
        self.access_point.set_channel(band, testcase_params['channel'])
        self.access_point.set_bandwidth(band, testcase_params['mode'])
        self.access_point.set_power(band, testcase_params['ap_tx_power'])
        self.access_point.set_rate(band, testcase_params['mode'],
                                   testcase_params['num_streams'],
                                   testcase_params['rate'],
                                   testcase_params['short_gi'])
        # Set attenuator offsets and set attenuators to initial condition
        atten_offsets = self.testbed_params['chain_offset'][str(
            testcase_params['channel'])]
        for atten in self.attenuators:
            if 'AP-Chain-0' in atten.path:
                atten.offset = atten_offsets[0]
            elif 'AP-Chain-1' in atten.path:
                atten.offset = atten_offsets[1]
            else:
                atten.offset = 0
        self.log.info('Access Point Configuration: {}'.format(
            self.access_point.ap_settings))

    def _connect_dut_for_sensitivity(self, testcase_params):
        """Checks DUT health and connects it to the test network.

        Skips the test if the battery health check fails, puts the DUT to
        sleep, (re)connects to the test network if needed and stores the DUT
        wlan0 IPv4 address in self.dut_ip.

        Args:
            testcase_params: dict containing AP and other test params
        """
        # Check battery level before test
        if not wputils.health_check(self.dut, 10):
            asserts.skip('Battery level too low. Skipping test.')
        # Turn screen off to preserve battery
        self.dut.go_to_sleep()
        if wputils.validate_network(self.dut,
                                    testcase_params['test_network']['SSID']):
            self.log.info('Already connected to desired network')
        else:
            wutils.reset_wifi(self.dut)
            wutils.set_wifi_country_code(self.dut,
                                         self.testclass_params['country_code'])
            testcase_params['test_network']['channel'] = testcase_params[
                'channel']
            wutils.wifi_connect(self.dut,
                                testcase_params['test_network'],
                                num_of_tries=5,
                                check_connectivity=False)
        self.dut_ip = self.dut.droid.connectivityGetIPv4Addresses('wlan0')[0]

    def setup_dut(self, testcase_params):
        """Sets up the DUT in the configuration required by the test.

        Args:
            testcase_params: dict containing AP and other test params
        """
        self._connect_dut_for_sensitivity(testcase_params)
        # Activate/attenuate the correct chains
        channel = testcase_params['channel']
        if channel not in self.atten_dut_chain_map.keys():
            self.atten_dut_chain_map[
                channel] = wputils.get_current_atten_dut_chain_map(
                    self.attenuators, self.dut, self.ping_server)
        self.log.info("Current Attenuator-DUT Chain Map: {}".format(
            self.atten_dut_chain_map[channel]))
        for idx, atten in enumerate(self.attenuators):
            if self.atten_dut_chain_map[channel][idx] == testcase_params[
                    'attenuated_chain']:
                atten.offset = atten.instrument.max_atten

    def extract_test_id(self, testcase_params, id_fields):
        """Returns an OrderedDict holding only id_fields of testcase_params.

        Args:
            testcase_params: mapping containing at least the id_fields keys
            id_fields: list of keys to extract, in the desired order
        Returns:
            test_id: OrderedDict of the selected params
        """
        test_id = collections.OrderedDict(
            (param, testcase_params[param]) for param in id_fields)
        return test_id

    def get_start_atten(self, testcase_params):
        """Gets the starting attenuation for this sensitivity test.

        The function gets the starting attenuation by checking whether a test
        at the next higher MCS has been executed. If so it sets the starting
        point a configurable number of dBs below the next MCS's sensitivity.

        Args:
            testcase_params: dict containing AP and other test params
        Returns:
            start_atten: starting attenuation for current test
        """
        # Get the current and reference test config. The reference test is the
        # one performed at the current MCS+1
        id_fields = ['channel', 'mode', 'rate', 'num_streams', 'chain_mask']
        current_rate = testcase_params['rate']
        ref_test_params = self.extract_test_id(testcase_params, id_fields)
        if 'legacy' in testcase_params['mode']:
            if testcase_params['channel'] <= 13:
                rate_list = self.VALID_RATES['legacy_2GHz']
            else:
                rate_list = self.VALID_RATES['legacy_5GHz']
            # Rate lists are sorted from highest to lowest rate, so the next
            # higher rate is the previous entry
            ref_index = max(
                0,
                rate_list.index(self.RateTuple(current_rate, 1, current_rate))
                - 1)
            ref_test_params['rate'] = rate_list[ref_index].mcs
        else:
            ref_test_params['rate'] = current_rate + 1

        # Check if reference test has been run and set attenuation accordingly
        previous_params = [
            self.extract_test_id(result['testcase_params'], id_fields)
            for result in self.testclass_results
        ]

        try:
            ref_index = previous_params.index(ref_test_params)
            start_atten = self.testclass_results[ref_index][
                'atten_at_range'] - (
                    self.testclass_params['adjacent_mcs_range_gap'])
        except ValueError:
            self.log.warning(
                'Reference test not found. Starting from {} dB'.format(
                    self.testclass_params['atten_start']))
            start_atten = self.testclass_params['atten_start']
        start_atten = max(start_atten, 0)
        return start_atten

    def compile_test_params(self, testcase_params):
        """Function that generates test params based on the test name.

        Args:
            testcase_params: dict containing channel, mode, chain_mask and
            other test params. Updated in place.
        Returns:
            testcase_params: the same dict with network, attenuated chain
            and traffic parameters added
        """
        band = self.access_point.band_lookup_by_channel(
            testcase_params['channel'])
        testcase_params['test_network'] = self.main_network[band]
        if testcase_params['chain_mask'] in ['0', '1']:
            # Attenuate the chain that is not under test
            testcase_params['attenuated_chain'] = 'DUT-Chain-{}'.format(
                1 if testcase_params['chain_mask'] == '0' else 0)
        else:
            # Set attenuated chain to -1. Do not set to None as this will be
            # compared to RF chain map which may include None
            testcase_params['attenuated_chain'] = -1

        self.testclass_params[
            'range_ping_loss_threshold'] = 100 - self.testclass_params[
                'throughput_pct_at_sensitivity']
        if self.testclass_params['traffic_type'] == 'UDP':
            testcase_params['iperf_args'] = '-i 1 -t {} -J -u -b {}'.format(
                self.testclass_params['iperf_duration'],
                self.testclass_params['UDP_rates'][testcase_params['mode']])
        elif self.testclass_params['traffic_type'] == 'TCP':
            testcase_params['iperf_args'] = '-i 1 -t {} -J'.format(
                self.testclass_params['iperf_duration'])

        if self.testclass_params['traffic_type'] != 'ping' and isinstance(
                self.iperf_client, iperf_client.IPerfClientOverAdb):
            testcase_params['iperf_args'] += ' -R'
            testcase_params['use_client_output'] = True
        else:
            testcase_params['use_client_output'] = False

        return testcase_params

    def _test_sensitivity(self, testcase_params):
        """ Function that gets called for each test case

        The function gets called in each sensitivity test case. It customizes
        the underlying ping/rvr test based on the params bound to the test
        case that called it.

        Args:
            testcase_params: dict of params bound to the generated test case
        """
        # Compile test parameters from config and test name
        testcase_params = self.compile_test_params(testcase_params)
        testcase_params.update(self.testclass_params)
        testcase_params['atten_start'] = self.get_start_atten(testcase_params)
        num_atten_steps = int(
            (testcase_params['atten_stop'] - testcase_params['atten_start']) /
            testcase_params['atten_step'])
        testcase_params['atten_range'] = [
            testcase_params['atten_start'] + x * testcase_params['atten_step']
            for x in range(0, num_atten_steps)
        ]

        # Prepare devices and run test
        self.setup_sensitivity_test(testcase_params)
        result = self.run_sensitivity_test(testcase_params)
        self.process_sensitivity_test_results(testcase_params, result)

        # Post-process results
        self.testclass_results.append(result)
        self.pass_fail_check(result)

    def _get_valid_rates(self, mode, channel):
        """Returns the list of RateTuples valid for a mode on a channel.

        Args:
            mode: mode string, e.g. 'legacy', 'HT20' or 'VHT80'
            channel: WiFi channel number, used to pick the legacy rate list
        Returns:
            list of RateTuples from VALID_RATES
        Raises:
            ValueError: if no rate list applies to the mode/channel
        """
        if 'HT' in mode:
            # Matches both HT and VHT modes, which are keyed by mode name
            return self.VALID_RATES[mode]
        if 'legacy' in mode and channel < 14:
            return self.VALID_RATES['legacy_2GHz']
        if 'legacy' in mode and channel > 14:
            return self.VALID_RATES['legacy_5GHz']
        raise ValueError('Invalid test mode.')

    def generate_test_cases(self, channels, modes, chain_mask):
        """Function that auto-generates test cases for a test class.

        Each generated test case is attached to the instance as an attribute
        bound to _test_sensitivity with its parameters.

        Args:
            channels: list of channels to test
            modes: list of requested modes; modes not valid on a channel are
            dropped
            chain_mask: list of chain masks to test, e.g. ['0', '1', '2x2']
        Returns:
            test_cases: list of generated test case names
        """
        test_cases = []
        for channel in channels:
            requested_modes = [
                mode for mode in modes
                if mode in self.VALID_TEST_CONFIGS[channel]
            ]
            for mode in requested_modes:
                rates = self._get_valid_rates(mode, channel)
                for chain, rate in itertools.product(chain_mask, rates):
                    if chain in ['0', '1'] and rate[1] == 2:
                        # Do not test 2-stream rates in single chain mode
                        continue
                    testcase_params = collections.OrderedDict(
                        channel=channel,
                        mode=mode,
                        rate=rate.mcs,
                        num_streams=rate.streams,
                        short_gi=1,
                        chain_mask=chain)
                    if 'legacy' in mode:
                        testcase_name = ('test_sensitivity_ch{}_{}_{}_nss{}'
                                         '_ch{}'.format(
                                             channel, mode,
                                             str(rate.mcs).replace('.', 'p'),
                                             rate.streams, chain))
                    else:
                        testcase_name = ('test_sensitivity_ch{}_{}_mcs{}_nss{}'
                                         '_ch{}'.format(
                                             channel, mode, rate.mcs,
                                             rate.streams, chain))
                    setattr(self, testcase_name,
                            partial(self._test_sensitivity, testcase_params))
                    test_cases.append(testcase_name)
        return test_cases


class WifiSensitivity_AllChannels_Test(WifiSensitivityTest):
    """Sensitivity tests on the 2.4GHz and 5GHz channels listed below."""
    def __init__(self, controllers):
        super().__init__(controllers)
        self.tests = self.generate_test_cases(
            [6, 36, 40, 44, 48, 149, 153, 157, 161],
            ['VHT20', 'VHT40', 'VHT80'], ['0', '1', '2x2'])


class WifiSensitivity_SampleChannels_Test(WifiSensitivityTest):
    """Sensitivity tests on channels 6, 36 and 149."""
    def __init__(self, controllers):
        super().__init__(controllers)
        self.tests = self.generate_test_cases([6, 36, 149],
                                              ['VHT20', 'VHT40', 'VHT80'],
                                              ['0', '1', '2x2'])


class WifiSensitivity_2GHz_Test(WifiSensitivityTest):
    """Sensitivity tests on channels 1, 2, 6, 10 and 11."""
    def __init__(self, controllers):
        super().__init__(controllers)
        self.tests = self.generate_test_cases([1, 2, 6, 10, 11], ['VHT20'],
                                              ['0', '1', '2x2'])


class WifiSensitivity_5GHz_Test(WifiSensitivityTest):
    """Sensitivity tests on channels 36-48 and 149-161."""
    def __init__(self, controllers):
        super().__init__(controllers)
        self.tests = self.generate_test_cases(
            [36, 40, 44, 48, 149, 153, 157, 161], ['VHT20', 'VHT40', 'VHT80'],
            ['0', '1', '2x2'])


class WifiSensitivity_UNII1_Test(WifiSensitivityTest):
    """Sensitivity tests on channels 36, 40, 44 and 48."""
    def __init__(self, controllers):
        super().__init__(controllers)
        self.tests = self.generate_test_cases([36, 40, 44, 48],
                                              ['VHT20', 'VHT40', 'VHT80'],
                                              ['0', '1', '2x2'])


class WifiSensitivity_UNII3_Test(WifiSensitivityTest):
    """Sensitivity tests on channels 149, 153, 157 and 161."""
    def __init__(self, controllers):
        super().__init__(controllers)
        self.tests = self.generate_test_cases([149, 153, 157, 161],
                                              ['VHT20', 'VHT40', 'VHT80'],
                                              ['0', '1', '2x2'])


# Over-the-air version of sensitivity tests
class WifiOtaSensitivityTest(WifiSensitivityTest):
    """Class to test over-the-air sensitivity.

    This class implements WiFi sensitivity tests in an OTA chamber. It allows
    setting orientation and other chamber parameters to study performance in
    varying channel conditions.
    """
    def __init__(self, controllers):
        """Initializes the test class without per-test-case metrics.

        Args:
            controllers: controller objects forwarded to BaseTestClass
        """
        super().__init__(controllers)
        self.publish_testcase_metrics = False

    def setup_class(self):
        """Runs the common class setup and creates the OTA chamber."""
        WifiSensitivityTest.setup_class(self)
        self.current_chain_mask = '2x2'
        self.ota_chamber = ota_chamber.create(
            self.user_params['OTAChamber'])[0]

    def teardown_class(self):
        """Runs the common class teardown and resets the OTA chamber."""
        try:
            WifiSensitivityTest.teardown_class(self)
        finally:
            # Reset the chamber even if result processing fails
            self.ota_chamber.reset_chamber()

    def setup_sensitivity_test(self, testcase_params):
        """Sets the chamber orientation and continues the common test setup.

        Args:
            testcase_params: dict containing AP and other test params
        """
        # Setup turntable
        self.ota_chamber.set_orientation(testcase_params['orientation'])
        # Continue test setup
        WifiSensitivityTest.setup_sensitivity_test(self, testcase_params)

    def setup_dut(self, testcase_params):
        """Sets up the DUT in the configuration required by the test.

        Unlike the conducted setup, chains are selected through the DUT INI
        settings rather than through attenuators.

        Args:
            testcase_params: dict containing AP and other test params
        """
        # Configure the right INI settings
        if testcase_params['chain_mask'] != self.current_chain_mask:
            self.log.info('Updating WiFi chain mask to: {}'.format(
                testcase_params['chain_mask']))
            self.current_chain_mask = testcase_params['chain_mask']
            if testcase_params['chain_mask'] in ['0', '1']:
                wputils.set_ini_single_chain_mode(
                    self.dut, int(testcase_params['chain_mask']))
            else:
                wputils.set_ini_two_chain_mode(self.dut)
        self._connect_dut_for_sensitivity(testcase_params)

    def process_testclass_results(self):
        """Saves and plots test results from all executed test cases.

        Generates one sensitivity vs. orientation plot per channel/mode/rate
        with one line per chain mask and stream count, and publishes the
        average sensitivity of each line as a test class metric.
        """
        if not self.testclass_results:
            self.log.info('No test results to process.')
            return
        testclass_results_dict = collections.OrderedDict()
        id_fields = ['channel', 'mode', 'rate']
        plots = []
        for result in self.testclass_results:
            test_id = self.extract_test_id(result['testcase_params'],
                                           id_fields)
            test_id = tuple(test_id.items())
            chain_mask = result['testcase_params']['chain_mask']
            num_streams = result['testcase_params']['num_streams']
            line_id = (chain_mask, num_streams)
            if test_id not in testclass_results_dict:
                testclass_results_dict[test_id] = collections.OrderedDict()
            if line_id not in testclass_results_dict[test_id]:
                testclass_results_dict[test_id][line_id] = {
                    'orientation': [],
                    'sensitivity': []
                }
            testclass_results_dict[test_id][line_id]['orientation'].append(
                result['testcase_params']['orientation'])
            # Results with an unstable peak rate are plotted as gaps
            if result['peak_throughput_pct'] >= 95:
                testclass_results_dict[test_id][line_id]['sensitivity'].append(
                    result['sensitivity'])
            else:
                testclass_results_dict[test_id][line_id]['sensitivity'].append(
                    float('nan'))

        current_context = (
            context.get_current_context().get_full_output_path())
        for test_id, test_data in testclass_results_dict.items():
            test_id_dict = dict(test_id)
            if 'legacy' in test_id_dict['mode']:
                test_id_str = 'Channel {} - {} {}Mbps'.format(
                    test_id_dict['channel'], test_id_dict['mode'],
                    test_id_dict['rate'])
            else:
                test_id_str = 'Channel {} - {} MCS{}'.format(
                    test_id_dict['channel'], test_id_dict['mode'],
                    test_id_dict['rate'])
            curr_plot = wputils.BokehFigure(
                title=str(test_id_str),
                x_label='Orientation (deg)',
                primary_y_label='Sensitivity (dBm)')
            for line_id, line_results in test_data.items():
                curr_plot.add_line(line_results['orientation'],
                                   line_results['sensitivity'],
                                   legend='Nss{} - Chain Mask {}'.format(
                                       line_id[1], line_id[0]),
                                   marker='circle')
                if 'legacy' in test_id_dict['mode']:
                    metric_tag = 'ota_summary_ch{}_{}_{}_ch{}'.format(
                        test_id_dict['channel'], test_id_dict['mode'],
                        test_id_dict['rate'], line_id[0])
                else:
                    metric_tag = 'ota_summary_ch{}_{}_mcs{}_nss{}_ch{}'.format(
                        test_id_dict['channel'], test_id_dict['mode'],
                        test_id_dict['rate'], line_id[1], line_id[0])

                metric_name = metric_tag + '.avg_sensitivity'
                metric_value = numpy.nanmean(line_results['sensitivity'])
                self.testclass_metric_logger.add_metric(
                    metric_name, metric_value)
                self.log.info(("Average Sensitivity for {}: {:.1f}").format(
                    metric_tag, metric_value))
            output_file_path = os.path.join(current_context,
                                            str(test_id_str) + '.html')
            curr_plot.generate_figure(output_file_path)
            plots.append(curr_plot)
        output_file_path = os.path.join(current_context, 'results.html')
        wputils.BokehFigure.save_figures(plots, output_file_path)

    def get_start_atten(self, testcase_params):
        """Gets the starting attenuation for this sensitivity test.

        The function gets the starting attenuation by checking whether a test
        at the same rate configuration has executed. If so it sets the starting
        point a configurable number of dBs below the reference test.

        Args:
            testcase_params: dict containing AP and other test params
        Returns:
            start_atten: starting attenuation for current test
        """
        # Get the current and reference test config. The reference test is
        # the most recent test with the same channel, mode, rate, streams
        # and chain mask (e.g. the same rate at another orientation)
        id_fields = ['channel', 'mode', 'rate', 'num_streams', 'chain_mask']
        ref_test_params = self.extract_test_id(testcase_params, id_fields)
        # Check if reference test has been run and set attenuation accordingly
        previous_params = [
            self.extract_test_id(result['testcase_params'], id_fields)
            for result in self.testclass_results
        ]
        try:
            # Search from the end to pick the latest matching result
            ref_index = previous_params[::-1].index(ref_test_params)
            ref_index = len(previous_params) - 1 - ref_index
            start_atten = self.testclass_results[ref_index][
                'atten_at_range'] - (
                    self.testclass_params['adjacent_mcs_range_gap'])
        except ValueError:
            self.log.warning(
                'Reference test not found. Starting from {} dB'.format(
                    self.testclass_params['atten_start']))
            start_atten = self.testclass_params['atten_start']
        start_atten = max(start_atten, 0)
        return start_atten

    def generate_test_cases(self, channels, modes, requested_rates, chain_mask,
                            angles):
        """Function that auto-generates test cases for a test class.

        Args:
            channels: list of channels to test
            modes: list of requested modes; modes not valid on a channel are
            dropped
            requested_rates: list of RateTuples to test; valid rates not in
            this list are skipped
            chain_mask: list of chain masks to test, e.g. ['0', '1', '2x2']
            angles: list of chamber orientations to test
        Returns:
            test_cases: list of generated test case names
        """
        test_cases = []
        for channel in channels:
            requested_modes = [
                mode for mode in modes
                if mode in self.VALID_TEST_CONFIGS[channel]
            ]
            for chain, mode in itertools.product(chain_mask, requested_modes):
                valid_rates = self._get_valid_rates(mode, channel)
                for rate, angle in itertools.product(valid_rates, angles):
                    if rate not in requested_rates:
                        continue
                    if str(chain) in ['0', '1'] and rate[1] == 2:
                        # Do not test 2-stream rates in single chain mode
                        continue
                    testcase_params = collections.OrderedDict(
                        channel=channel,
                        mode=mode,
                        rate=rate.mcs,
                        num_streams=rate.streams,
                        short_gi=1,
                        chain_mask=chain,
                        orientation=angle)
                    if 'legacy' in mode:
                        testcase_name = ('test_sensitivity_ch{}_{}_{}_nss{}'
                                         '_ch{}_{}deg'.format(
                                             channel, mode,
                                             str(rate.mcs).replace('.', 'p'),
                                             rate.streams, chain, angle))
                    else:
                        testcase_name = ('test_sensitivity_ch{}_{}_mcs{}_nss{}'
                                         '_ch{}_{}deg'.format(
                                             channel, mode, rate.mcs,
                                             rate.streams, chain, angle))
                    setattr(self, testcase_name,
                            partial(self._test_sensitivity, testcase_params))
                    test_cases.append(testcase_name)
        return test_cases


class WifiOtaSensitivity_TenDegree_Test(WifiOtaSensitivityTest):
    """OTA sensitivity tests in 10 degree steps with both chains active."""
    def __init__(self, controllers):
        WifiOtaSensitivityTest.__init__(self, controllers)
        requested_channels = [6, 36, 149]
        requested_rates = [
            self.RateTuple(8, 1, 86.7),
            self.RateTuple(2, 1, 21.7),
            self.RateTuple(8, 2, 173.3),
            self.RateTuple(2, 2, 43.3)
        ]
        self.tests = self.generate_test_cases(requested_channels,
                                              ['VHT20', 'VHT80'],
                                              requested_rates, ['2x2'],
                                              list(range(0, 360, 10)))


class WifiOtaSensitivity_PerChain_TenDegree_Test(WifiOtaSensitivityTest):
    """OTA sensitivity tests in 10 degree steps for each chain mask."""
    def __init__(self, controllers):
        WifiOtaSensitivityTest.__init__(self, controllers)
        requested_channels = [6, 36, 149]
        requested_rates = [
            self.RateTuple(2, 1, 21.7),
            self.RateTuple(2, 2, 43.3)
        ]
        self.tests = self.generate_test_cases(requested_channels, ['VHT20'],
                                              requested_rates,
                                              ['0', '1', '2x2'],
                                              list(range(0, 360, 10)))


class WifiOtaSensitivity_ThirtyDegree_Test(WifiOtaSensitivityTest):
    """OTA sensitivity tests in 30 degree steps with both chains active."""
    def __init__(self, controllers):
        WifiOtaSensitivityTest.__init__(self, controllers)
        requested_channels = [6, 36, 149]
        requested_rates = [
            self.RateTuple(9, 1, 96),
            self.RateTuple(8, 1, 86.7),
            self.RateTuple(7, 1, 72.2),
            self.RateTuple(4, 1, 43.3),
            self.RateTuple(2, 1, 21.7),
            self.RateTuple(0, 1, 7.2),
            self.RateTuple(9, 2, 192),
            self.RateTuple(8, 2, 173.3),
            self.RateTuple(7, 2, 144.4),
            self.RateTuple(4, 2, 86.7),
            self.RateTuple(2, 2, 43.3),
            self.RateTuple(0, 2, 14.4)
        ]
        self.tests = self.generate_test_cases(requested_channels,
                                              ['VHT20', 'VHT80'],
                                              requested_rates, ['2x2'],
                                              list(range(0, 360, 30)))


class WifiOtaSensitivity_45Degree_Test(WifiOtaSensitivityTest):
    """OTA sensitivity tests in 45 degree steps with both chains active."""
    def __init__(self, controllers):
        WifiOtaSensitivityTest.__init__(self, controllers)
        requested_rates = [
            self.RateTuple(8, 1, 86.7),
            self.RateTuple(2, 1, 21.7),
            self.RateTuple(8, 2, 173.3),
            self.RateTuple(2, 2, 43.3)
        ]
        self.tests = self.generate_test_cases(
            [1, 6, 11, 36, 40, 44, 48, 149, 153, 157, 161], ['VHT20', 'VHT80'],
            requested_rates, ['2x2'], list(range(0, 360, 45)))