1#!/usr/bin/env python3.4 2# 3# Copyright 2018 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the 'License'); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an 'AS IS' BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17import collections 18import itertools 19import json 20import logging 21import math 22import numpy 23import os 24import statistics 25from acts import asserts 26from acts import base_test 27from acts import context 28from acts import utils 29from acts.controllers.utils_lib import ssh 30from acts.controllers import iperf_server as ipf 31from acts.metrics.loggers.blackbox import BlackboxMappedMetricLogger 32from acts.test_utils.wifi import ota_chamber 33from acts.test_utils.wifi import wifi_performance_test_utils as wputils 34from acts.test_utils.wifi import wifi_retail_ap as retail_ap 35from acts.test_utils.wifi import wifi_test_utils as wutils 36from concurrent.futures import ThreadPoolExecutor 37from functools import partial 38 39SHORT_SLEEP = 1 40MED_SLEEP = 6 41CONST_3dB = 3.01029995664 42RSSI_ERROR_VAL = float('nan') 43 44 45class WifiRssiTest(base_test.BaseTestClass): 46 """Class to test WiFi RSSI reporting. 47 48 This class tests RSSI reporting on android devices. The class tests RSSI 49 accuracy by checking RSSI over a large attenuation range, checks for RSSI 50 stability over time when attenuation is fixed, and checks that RSSI quickly 51 and reacts to changes attenuation by checking RSSI trajectories over 52 configurable attenuation waveforms.For an example config file to run this 53 test class see example_connectivity_performance_ap_sta.json. 54 """ 55 def __init__(self, controllers): 56 base_test.BaseTestClass.__init__(self, controllers) 57 self.testcase_metric_logger = ( 58 BlackboxMappedMetricLogger.for_test_case()) 59 self.testclass_metric_logger = ( 60 BlackboxMappedMetricLogger.for_test_class()) 61 self.publish_test_metrics = True 62 63 def setup_class(self): 64 self.dut = self.android_devices[0] 65 req_params = [ 66 'RemoteServer', 'RetailAccessPoints', 'rssi_test_params', 67 'main_network', 'testbed_params' 68 ] 69 self.unpack_userparams(req_params) 70 self.testclass_params = self.rssi_test_params 71 self.num_atten = self.attenuators[0].instrument.num_atten 72 self.iperf_server = self.iperf_servers[0] 73 self.iperf_client = self.iperf_clients[0] 74 self.remote_server = ssh.connection.SshConnection( 75 ssh.settings.from_config(self.RemoteServer[0]['ssh_config'])) 76 self.access_point = retail_ap.create(self.RetailAccessPoints)[0] 77 self.log_path = os.path.join(logging.log_path, 'results') 78 os.makedirs(self.log_path, exist_ok=True) 79 self.log.info('Access Point Configuration: {}'.format( 80 self.access_point.ap_settings)) 81 if hasattr(self, 'bdf'): 82 self.log.info('Pushing WiFi BDF to DUT.') 83 wputils.push_bdf(self.dut, self.bdf) 84 if hasattr(self, 'firmware'): 85 self.log.info('Pushing WiFi firmware to DUT.') 86 wlanmdsp = [ 87 file for file in self.firmware if "wlanmdsp.mbn" in file 88 ][0] 89 data_msc = [file for file in self.firmware 90 if "Data.msc" in file][0] 91 wputils.push_firmware(self.dut, wlanmdsp, data_msc) 92 self.testclass_results = [] 93 94 # Turn WiFi ON 95 if self.testclass_params.get('airplane_mode', 1): 96 self.log.info('Turning on airplane mode.') 97 asserts.assert_true(utils.force_airplane_mode(self.dut, True), 98 "Can not turn on airplane mode.") 99 wutils.wifi_toggle_state(self.dut, True) 100 101 def teardown_test(self): 102 self.iperf_server.stop() 103 104 def pass_fail_check_rssi_stability(self, testcase_params, 105 postprocessed_results): 106 """Check the test result and decide if it passed or failed. 107 108 Checks the RSSI test result and fails the test if the standard 109 deviation of signal_poll_rssi is beyond the threshold defined in the 110 config file. 111 112 Args: 113 testcase_params: dict containing test-specific parameters 114 postprocessed_results: compiled arrays of RSSI measurements 115 """ 116 # Set Blackbox metric values 117 if self.publish_test_metrics: 118 self.testcase_metric_logger.add_metric( 119 'signal_poll_rssi_stdev', 120 max(postprocessed_results['signal_poll_rssi']['stdev'])) 121 self.testcase_metric_logger.add_metric( 122 'chain_0_rssi_stdev', 123 max(postprocessed_results['chain_0_rssi']['stdev'])) 124 self.testcase_metric_logger.add_metric( 125 'chain_1_rssi_stdev', 126 max(postprocessed_results['chain_1_rssi']['stdev'])) 127 128 # Evaluate test pass/fail 129 test_failed = any([ 130 stdev > self.testclass_params['stdev_tolerance'] 131 for stdev in postprocessed_results['signal_poll_rssi']['stdev'] 132 ]) 133 test_message = ( 134 'RSSI stability {0}. Standard deviation was {1} dB ' 135 '(limit {2}), per chain standard deviation [{3}, {4}] dB'.format( 136 'failed' * test_failed + 'passed' * (not test_failed), [ 137 float('{:.2f}'.format(x)) 138 for x in postprocessed_results['signal_poll_rssi']['stdev'] 139 ], self.testclass_params['stdev_tolerance'], [ 140 float('{:.2f}'.format(x)) 141 for x in postprocessed_results['chain_0_rssi']['stdev'] 142 ], [ 143 float('{:.2f}'.format(x)) 144 for x in postprocessed_results['chain_1_rssi']['stdev'] 145 ])) 146 if test_failed: 147 asserts.fail(test_message) 148 asserts.explicit_pass(test_message) 149 150 def pass_fail_check_rssi_accuracy(self, testcase_params, 151 postprocessed_results): 152 """Check the test result and decide if it passed or failed. 153 154 Checks the RSSI test result and compares and compute its deviation from 155 the predicted RSSI. This computation is done for all reported RSSI 156 values. The test fails if any of the RSSI values specified in 157 rssi_under_test have an average error beyond what is specified in the 158 configuration file. 159 160 Args: 161 postprocessed_results: compiled arrays of RSSI measurements 162 testcase_params: dict containing params such as list of RSSIs under 163 test, i.e., can cause test to fail and boolean indicating whether 164 to look at absolute RSSI accuracy, or centered RSSI accuracy. 165 Centered accuracy is computed after systematic RSSI shifts are 166 removed. 167 """ 168 test_failed = False 169 test_message = '' 170 if testcase_params['absolute_accuracy']: 171 error_type = 'absolute' 172 else: 173 error_type = 'centered' 174 175 for key, val in postprocessed_results.items(): 176 # Compute the error metrics ignoring invalid RSSI readings 177 # If all readings invalid, set error to RSSI_ERROR_VAL 178 if 'rssi' in key and 'predicted' not in key: 179 filtered_error = [x for x in val['error'] if not math.isnan(x)] 180 if filtered_error: 181 avg_shift = statistics.mean(filtered_error) 182 if testcase_params['absolute_accuracy']: 183 avg_error = statistics.mean( 184 [abs(x) for x in filtered_error]) 185 else: 186 avg_error = statistics.mean( 187 [abs(x - avg_shift) for x in filtered_error]) 188 else: 189 avg_error = RSSI_ERROR_VAL 190 avg_shift = RSSI_ERROR_VAL 191 # Set Blackbox metric values 192 if self.publish_test_metrics: 193 self.testcase_metric_logger.add_metric( 194 '{}_error'.format(key), avg_error) 195 self.testcase_metric_logger.add_metric( 196 '{}_shift'.format(key), avg_shift) 197 # Evaluate test pass/fail 198 rssi_failure = (avg_error > 199 self.testclass_params['abs_tolerance'] 200 ) or math.isnan(avg_error) 201 if rssi_failure and key in testcase_params['rssi_under_test']: 202 test_message = test_message + ( 203 '{} failed ({} error = {:.2f} dB, ' 204 'shift = {:.2f} dB)\n').format(key, error_type, 205 avg_error, avg_shift) 206 test_failed = True 207 elif rssi_failure: 208 test_message = test_message + ( 209 '{} failed (ignored) ({} error = {:.2f} dB, ' 210 'shift = {:.2f} dB)\n').format(key, error_type, 211 avg_error, avg_shift) 212 else: 213 test_message = test_message + ( 214 '{} passed ({} error = {:.2f} dB, ' 215 'shift = {:.2f} dB)\n').format(key, error_type, 216 avg_error, avg_shift) 217 if test_failed: 218 asserts.fail(test_message) 219 asserts.explicit_pass(test_message) 220 221 def post_process_rssi_sweep(self, rssi_result): 222 """Postprocesses and saves JSON formatted results. 223 224 Args: 225 rssi_result: dict containing attenuation, rssi and other meta 226 data 227 Returns: 228 postprocessed_results: compiled arrays of RSSI data used in 229 pass/fail check 230 """ 231 # Save output as text file 232 results_file_path = os.path.join(self.log_path, self.current_test_name) 233 with open(results_file_path, 'w') as results_file: 234 json.dump(rssi_result, results_file, indent=4) 235 # Compile results into arrays of RSSIs suitable for plotting 236 # yapf: disable 237 postprocessed_results = collections.OrderedDict( 238 [('signal_poll_rssi', {}), 239 ('signal_poll_avg_rssi', {}), 240 ('scan_rssi', {}), 241 ('chain_0_rssi', {}), 242 ('chain_1_rssi', {}), 243 ('total_attenuation', []), 244 ('predicted_rssi', [])]) 245 # yapf: enable 246 for key, val in postprocessed_results.items(): 247 if 'scan_rssi' in key: 248 postprocessed_results[key]['data'] = [ 249 x for data_point in rssi_result['rssi_result'] for x in 250 data_point[key][rssi_result['connected_bssid']]['data'] 251 ] 252 postprocessed_results[key]['mean'] = [ 253 x[key][rssi_result['connected_bssid']]['mean'] 254 for x in rssi_result['rssi_result'] 255 ] 256 postprocessed_results[key]['stdev'] = [ 257 x[key][rssi_result['connected_bssid']]['stdev'] 258 for x in rssi_result['rssi_result'] 259 ] 260 elif 'predicted_rssi' in key: 261 postprocessed_results['total_attenuation'] = [ 262 att + rssi_result['fixed_attenuation'] + 263 rssi_result['dut_front_end_loss'] 264 for att in rssi_result['attenuation'] 265 ] 266 postprocessed_results['predicted_rssi'] = [ 267 rssi_result['ap_tx_power'] - att 268 for att in postprocessed_results['total_attenuation'] 269 ] 270 elif 'rssi' in key: 271 postprocessed_results[key]['data'] = [ 272 x for data_point in rssi_result['rssi_result'] 273 for x in data_point[key]['data'] 274 ] 275 postprocessed_results[key]['mean'] = [ 276 x[key]['mean'] for x in rssi_result['rssi_result'] 277 ] 278 postprocessed_results[key]['stdev'] = [ 279 x[key]['stdev'] for x in rssi_result['rssi_result'] 280 ] 281 # Compute RSSI errors 282 for key, val in postprocessed_results.items(): 283 if 'chain' in key: 284 postprocessed_results[key]['error'] = [ 285 postprocessed_results[key]['mean'][idx] + CONST_3dB - 286 postprocessed_results['predicted_rssi'][idx] 287 for idx in range( 288 len(postprocessed_results['predicted_rssi'])) 289 ] 290 elif 'rssi' in key and 'predicted' not in key: 291 postprocessed_results[key]['error'] = [ 292 postprocessed_results[key]['mean'][idx] - 293 postprocessed_results['predicted_rssi'][idx] 294 for idx in range( 295 len(postprocessed_results['predicted_rssi'])) 296 ] 297 return postprocessed_results 298 299 def plot_rssi_vs_attenuation(self, postprocessed_results): 300 """Function to plot RSSI vs attenuation sweeps 301 302 Args: 303 postprocessed_results: compiled arrays of RSSI data. 304 """ 305 figure = wputils.BokehFigure(self.current_test_name, 306 x_label='Attenuation (dB)', 307 primary_y_label='RSSI (dBm)') 308 figure.add_line(postprocessed_results['total_attenuation'], 309 postprocessed_results['signal_poll_rssi']['mean'], 310 'Signal Poll RSSI', 311 marker='circle') 312 figure.add_line(postprocessed_results['total_attenuation'], 313 postprocessed_results['scan_rssi']['mean'], 314 'Scan RSSI', 315 marker='circle') 316 figure.add_line(postprocessed_results['total_attenuation'], 317 postprocessed_results['chain_0_rssi']['mean'], 318 'Chain 0 RSSI', 319 marker='circle') 320 figure.add_line(postprocessed_results['total_attenuation'], 321 postprocessed_results['chain_1_rssi']['mean'], 322 'Chain 1 RSSI', 323 marker='circle') 324 figure.add_line(postprocessed_results['total_attenuation'], 325 postprocessed_results['predicted_rssi'], 326 'Predicted RSSI', 327 marker='circle') 328 329 output_file_path = os.path.join(self.log_path, 330 self.current_test_name + '.html') 331 figure.generate_figure(output_file_path) 332 333 def plot_rssi_vs_time(self, rssi_result, postprocessed_results, 334 center_curves): 335 """Function to plot RSSI vs time. 336 337 Args: 338 rssi_result: dict containing raw RSSI data 339 postprocessed_results: compiled arrays of RSSI data 340 center_curvers: boolean indicating whether to shift curves to align 341 them with predicted RSSIs 342 """ 343 figure = wputils.BokehFigure( 344 self.current_test_name, 345 x_label='Time (s)', 346 primary_y_label=center_curves * 'Centered' + 'RSSI (dBm)', 347 ) 348 349 # yapf: disable 350 rssi_time_series = collections.OrderedDict( 351 [('signal_poll_rssi', []), 352 ('signal_poll_avg_rssi', []), 353 ('scan_rssi', []), 354 ('chain_0_rssi', []), 355 ('chain_1_rssi', []), 356 ('predicted_rssi', [])]) 357 # yapf: enable 358 for key, val in rssi_time_series.items(): 359 if 'predicted_rssi' in key: 360 rssi_time_series[key] = [ 361 x for x in postprocessed_results[key] for copies in range( 362 len(rssi_result['rssi_result'][0]['signal_poll_rssi'] 363 ['data'])) 364 ] 365 elif 'rssi' in key: 366 if center_curves: 367 filtered_error = [ 368 x for x in postprocessed_results[key]['error'] 369 if not math.isnan(x) 370 ] 371 if filtered_error: 372 avg_shift = statistics.mean(filtered_error) 373 else: 374 avg_shift = 0 375 rssi_time_series[key] = [ 376 x - avg_shift 377 for x in postprocessed_results[key]['data'] 378 ] 379 else: 380 rssi_time_series[key] = postprocessed_results[key]['data'] 381 time_vec = [ 382 self.testclass_params['polling_frequency'] * x 383 for x in range(len(rssi_time_series[key])) 384 ] 385 if len(rssi_time_series[key]) > 0: 386 figure.add_line(time_vec, rssi_time_series[key], key) 387 388 output_file_path = os.path.join(self.log_path, 389 self.current_test_name + '.html') 390 figure.generate_figure(output_file_path) 391 392 def plot_rssi_distribution(self, postprocessed_results): 393 """Function to plot RSSI distributions. 394 395 Args: 396 postprocessed_results: compiled arrays of RSSI data 397 """ 398 monitored_rssis = ['signal_poll_rssi', 'chain_0_rssi', 'chain_1_rssi'] 399 400 rssi_dist = collections.OrderedDict() 401 for rssi_key in monitored_rssis: 402 rssi_data = postprocessed_results[rssi_key] 403 rssi_dist[rssi_key] = collections.OrderedDict() 404 unique_rssi = sorted(set(rssi_data['data'])) 405 rssi_counts = [] 406 for value in unique_rssi: 407 rssi_counts.append(rssi_data['data'].count(value)) 408 total_count = sum(rssi_counts) 409 rssi_dist[rssi_key]['rssi_values'] = unique_rssi 410 rssi_dist[rssi_key]['rssi_pdf'] = [ 411 x / total_count for x in rssi_counts 412 ] 413 rssi_dist[rssi_key]['rssi_cdf'] = [] 414 cum_prob = 0 415 for prob in rssi_dist[rssi_key]['rssi_pdf']: 416 cum_prob += prob 417 rssi_dist[rssi_key]['rssi_cdf'].append(cum_prob) 418 419 figure = wputils.BokehFigure(self.current_test_name, 420 x_label='RSSI (dBm)', 421 primary_y_label='p(RSSI = x)', 422 secondary_y_label='p(RSSI <= x)') 423 for rssi_key, rssi_data in rssi_dist.items(): 424 figure.add_line(x_data=rssi_data['rssi_values'], 425 y_data=rssi_data['rssi_pdf'], 426 legend='{} PDF'.format(rssi_key), 427 y_axis='default') 428 figure.add_line(x_data=rssi_data['rssi_values'], 429 y_data=rssi_data['rssi_cdf'], 430 legend='{} CDF'.format(rssi_key), 431 y_axis='secondary') 432 output_file_path = os.path.join(self.log_path, 433 self.current_test_name + '_dist.html') 434 figure.generate_figure(output_file_path) 435 436 def run_rssi_test(self, testcase_params): 437 """Test function to run RSSI tests. 438 439 The function runs an RSSI test in the current device/AP configuration. 440 Function is called from another wrapper function that sets up the 441 testbed for the RvR test 442 443 Args: 444 testcase_params: dict containing test-specific parameters 445 Returns: 446 rssi_result: dict containing rssi_result and meta data 447 """ 448 # Run test and log result 449 rssi_result = collections.OrderedDict() 450 rssi_result['test_name'] = self.current_test_name 451 rssi_result['testcase_params'] = testcase_params 452 rssi_result['ap_settings'] = self.access_point.ap_settings.copy() 453 rssi_result['attenuation'] = list(testcase_params['rssi_atten_range']) 454 rssi_result['connected_bssid'] = self.main_network[ 455 testcase_params['band']].get('BSSID', '00:00:00:00') 456 channel_mode_combo = '{}_{}'.format(str(testcase_params['channel']), 457 testcase_params['mode']) 458 channel_str = str(testcase_params['channel']) 459 if channel_mode_combo in self.testbed_params['ap_tx_power']: 460 rssi_result['ap_tx_power'] = self.testbed_params['ap_tx_power'][ 461 channel_mode_combo] 462 else: 463 rssi_result['ap_tx_power'] = self.testbed_params['ap_tx_power'][ 464 str(testcase_params['channel'])] 465 rssi_result['fixed_attenuation'] = self.testbed_params[ 466 'fixed_attenuation'][channel_str] 467 rssi_result['dut_front_end_loss'] = self.testbed_params[ 468 'dut_front_end_loss'][channel_str] 469 470 self.log.info('Start running RSSI test.') 471 rssi_result['rssi_result'] = [] 472 rssi_result['llstats'] = [] 473 llstats_obj = wputils.LinkLayerStats(self.dut) 474 # Start iperf traffic if required by test 475 if testcase_params['active_traffic'] and testcase_params[ 476 'traffic_type'] == 'iperf': 477 self.iperf_server.start(tag=0) 478 if isinstance(self.iperf_server, ipf.IPerfServerOverAdb): 479 iperf_server_address = self.dut_ip 480 else: 481 iperf_server_address = wputils.get_server_address( 482 self.remote_server, self.dut_ip, '255.255.255.0') 483 executor = ThreadPoolExecutor(max_workers=1) 484 thread_future = executor.submit( 485 self.iperf_client.start, iperf_server_address, 486 testcase_params['iperf_args'], 0, 487 testcase_params['traffic_timeout'] + SHORT_SLEEP) 488 executor.shutdown(wait=False) 489 elif testcase_params['active_traffic'] and testcase_params[ 490 'traffic_type'] == 'ping': 491 thread_future = wputils.get_ping_stats_nb( 492 self.remote_server, self.dut_ip, 493 testcase_params['traffic_timeout'], 0.02, 64) 494 for atten in testcase_params['rssi_atten_range']: 495 # Set Attenuation 496 self.log.info('Setting attenuation to {} dB'.format(atten)) 497 for attenuator in self.attenuators: 498 attenuator.set_atten(atten) 499 llstats_obj.update_stats() 500 current_rssi = collections.OrderedDict() 501 current_rssi = wputils.get_connected_rssi( 502 self.dut, testcase_params['connected_measurements'], 503 self.testclass_params['polling_frequency'], 504 testcase_params['first_measurement_delay']) 505 current_rssi['scan_rssi'] = wputils.get_scan_rssi( 506 self.dut, testcase_params['tracked_bssid'], 507 testcase_params['scan_measurements']) 508 rssi_result['rssi_result'].append(current_rssi) 509 llstats_obj.update_stats() 510 curr_llstats = llstats_obj.llstats_incremental.copy() 511 rssi_result['llstats'].append(curr_llstats) 512 self.log.info( 513 'Connected RSSI at {0:.2f} dB is {1:.2f} [{2:.2f}, {3:.2f}] dB' 514 .format(atten, current_rssi['signal_poll_rssi']['mean'], 515 current_rssi['chain_0_rssi']['mean'], 516 current_rssi['chain_1_rssi']['mean'])) 517 # Stop iperf traffic if needed 518 for attenuator in self.attenuators: 519 attenuator.set_atten(0) 520 if testcase_params['active_traffic']: 521 thread_future.result() 522 if testcase_params['traffic_type'] == 'iperf': 523 self.iperf_server.stop() 524 return rssi_result 525 526 def setup_ap(self, testcase_params): 527 """Function that gets devices ready for the test. 528 529 Args: 530 testcase_params: dict containing test-specific parameters 531 """ 532 if '2G' in testcase_params['band']: 533 frequency = wutils.WifiEnums.channel_2G_to_freq[ 534 testcase_params['channel']] 535 else: 536 frequency = wutils.WifiEnums.channel_5G_to_freq[ 537 testcase_params['channel']] 538 if frequency in wutils.WifiEnums.DFS_5G_FREQUENCIES: 539 self.access_point.set_region(self.testbed_params['DFS_region']) 540 else: 541 self.access_point.set_region(self.testbed_params['default_region']) 542 self.access_point.set_channel(testcase_params['band'], 543 testcase_params['channel']) 544 self.access_point.set_bandwidth(testcase_params['band'], 545 testcase_params['mode']) 546 self.log.info('Access Point Configuration: {}'.format( 547 self.access_point.ap_settings)) 548 549 def setup_dut(self, testcase_params): 550 """Sets up the DUT in the configuration required by the test.""" 551 # Check battery level before test 552 if not wputils.health_check(self.dut, 10): 553 asserts.skip('Battery level too low. Skipping test.') 554 # Turn screen off to preserve battery 555 self.dut.go_to_sleep() 556 if wputils.validate_network(self.dut, 557 testcase_params['test_network']['SSID']): 558 self.log.info('Already connected to desired network') 559 else: 560 wutils.wifi_toggle_state(self.dut, True) 561 wutils.reset_wifi(self.dut) 562 self.main_network[testcase_params['band']][ 563 'channel'] = testcase_params['channel'] 564 wutils.set_wifi_country_code(self.dut, 565 self.testclass_params['country_code']) 566 wutils.wifi_connect(self.dut, 567 self.main_network[testcase_params['band']], 568 num_of_tries=5) 569 self.dut_ip = self.dut.droid.connectivityGetIPv4Addresses('wlan0')[0] 570 571 def setup_rssi_test(self, testcase_params): 572 """Main function to test RSSI. 573 574 The function sets up the AP in the correct channel and mode 575 configuration and called rssi_test to sweep attenuation and measure 576 RSSI 577 578 Args: 579 testcase_params: dict containing test-specific parameters 580 Returns: 581 rssi_result: dict containing rssi_results and meta data 582 """ 583 # Configure AP 584 self.setup_ap(testcase_params) 585 # Initialize attenuators 586 for attenuator in self.attenuators: 587 attenuator.set_atten(testcase_params['rssi_atten_range'][0]) 588 # Connect DUT to Network 589 self.setup_dut(testcase_params) 590 591 def get_traffic_timeout(self, testcase_params): 592 """Function to comput iperf session length required in RSSI test. 593 594 Args: 595 testcase_params: dict containing test-specific parameters 596 Returns: 597 traffic_timeout: length of iperf session required in rssi test 598 """ 599 atten_step_duration = testcase_params['first_measurement_delay'] + ( 600 testcase_params['connected_measurements'] * 601 self.testclass_params['polling_frequency'] 602 ) + testcase_params['scan_measurements'] * MED_SLEEP 603 timeout = len(testcase_params['rssi_atten_range'] 604 ) * atten_step_duration + MED_SLEEP 605 return timeout 606 607 def compile_rssi_vs_atten_test_params(self, testcase_params): 608 """Function to complete compiling test-specific parameters 609 610 Args: 611 testcase_params: dict containing test-specific parameters 612 """ 613 testcase_params.update( 614 connected_measurements=self. 615 testclass_params['rssi_vs_atten_connected_measurements'], 616 scan_measurements=self. 617 testclass_params['rssi_vs_atten_scan_measurements'], 618 first_measurement_delay=MED_SLEEP, 619 rssi_under_test=self.testclass_params['rssi_vs_atten_metrics'], 620 absolute_accuracy=1) 621 622 testcase_params['band'] = self.access_point.band_lookup_by_channel( 623 testcase_params['channel']) 624 testcase_params['test_network'] = self.main_network[ 625 testcase_params['band']] 626 testcase_params['tracked_bssid'] = [ 627 self.main_network[testcase_params['band']].get( 628 'BSSID', '00:00:00:00') 629 ] 630 631 num_atten_steps = int((self.testclass_params['rssi_vs_atten_stop'] - 632 self.testclass_params['rssi_vs_atten_start']) / 633 self.testclass_params['rssi_vs_atten_step']) 634 testcase_params['rssi_atten_range'] = [ 635 self.testclass_params['rssi_vs_atten_start'] + 636 x * self.testclass_params['rssi_vs_atten_step'] 637 for x in range(0, num_atten_steps) 638 ] 639 testcase_params['traffic_timeout'] = self.get_traffic_timeout( 640 testcase_params) 641 642 if isinstance(self.iperf_server, ipf.IPerfServerOverAdb): 643 testcase_params['iperf_args'] = '-i 1 -t {} -J'.format( 644 testcase_params['traffic_timeout']) 645 else: 646 testcase_params['iperf_args'] = '-i 1 -t {} -J -R'.format( 647 testcase_params['traffic_timeout']) 648 return testcase_params 649 650 def compile_rssi_stability_test_params(self, testcase_params): 651 """Function to complete compiling test-specific parameters 652 653 Args: 654 testcase_params: dict containing test-specific parameters 655 """ 656 testcase_params.update( 657 connected_measurements=int( 658 self.testclass_params['rssi_stability_duration'] / 659 self.testclass_params['polling_frequency']), 660 scan_measurements=0, 661 first_measurement_delay=MED_SLEEP, 662 rssi_atten_range=self.testclass_params['rssi_stability_atten']) 663 testcase_params['band'] = self.access_point.band_lookup_by_channel( 664 testcase_params['channel']) 665 testcase_params['test_network'] = self.main_network[ 666 testcase_params['band']] 667 testcase_params['tracked_bssid'] = [ 668 self.main_network[testcase_params['band']].get( 669 'BSSID', '00:00:00:00') 670 ] 671 672 testcase_params['traffic_timeout'] = self.get_traffic_timeout( 673 testcase_params) 674 if isinstance(self.iperf_server, ipf.IPerfServerOverAdb): 675 testcase_params['iperf_args'] = '-i 1 -t {} -J'.format( 676 testcase_params['traffic_timeout']) 677 else: 678 testcase_params['iperf_args'] = '-i 1 -t {} -J -R'.format( 679 testcase_params['traffic_timeout']) 680 return testcase_params 681 682 def compile_rssi_tracking_test_params(self, testcase_params): 683 """Function to complete compiling test-specific parameters 684 685 Args: 686 testcase_params: dict containing test-specific parameters 687 """ 688 testcase_params.update(connected_measurements=int( 689 1 / self.testclass_params['polling_frequency']), 690 scan_measurements=0, 691 first_measurement_delay=0, 692 rssi_under_test=['signal_poll_rssi'], 693 absolute_accuracy=0) 694 testcase_params['band'] = self.access_point.band_lookup_by_channel( 695 testcase_params['channel']) 696 testcase_params['test_network'] = self.main_network[ 697 testcase_params['band']] 698 testcase_params['tracked_bssid'] = [ 699 self.main_network[testcase_params['band']].get( 700 'BSSID', '00:00:00:00') 701 ] 702 703 rssi_atten_range = [] 704 for waveform in self.testclass_params['rssi_tracking_waveforms']: 705 waveform_vector = [] 706 for section in range(len(waveform['atten_levels']) - 1): 707 section_limits = waveform['atten_levels'][section:section + 2] 708 up_down = (1 - 2 * (section_limits[1] < section_limits[0])) 709 temp_section = list( 710 range(section_limits[0], section_limits[1] + up_down, 711 up_down * waveform['step_size'])) 712 temp_section = [ 713 temp_section[idx] for idx in range(len(temp_section)) 714 for n in range(waveform['step_duration']) 715 ] 716 waveform_vector += temp_section 717 waveform_vector = waveform_vector * waveform['repetitions'] 718 rssi_atten_range = rssi_atten_range + waveform_vector 719 testcase_params['rssi_atten_range'] = rssi_atten_range 720 testcase_params['traffic_timeout'] = self.get_traffic_timeout( 721 testcase_params) 722 723 if isinstance(self.iperf_server, ipf.IPerfServerOverAdb): 724 testcase_params['iperf_args'] = '-i 1 -t {} -J'.format( 725 testcase_params['traffic_timeout']) 726 else: 727 testcase_params['iperf_args'] = '-i 1 -t {} -J -R'.format( 728 testcase_params['traffic_timeout']) 729 return testcase_params 730 731 def _test_rssi_vs_atten(self, testcase_params): 732 """Function that gets called for each test case of rssi_vs_atten 733 734 The function gets called in each rssi test case. The function 735 customizes the test based on the test name of the test that called it 736 737 Args: 738 testcase_params: dict containing test-specific parameters 739 """ 740 testcase_params = self.compile_rssi_vs_atten_test_params( 741 testcase_params) 742 743 self.setup_rssi_test(testcase_params) 744 rssi_result = self.run_rssi_test(testcase_params) 745 rssi_result['postprocessed_results'] = self.post_process_rssi_sweep( 746 rssi_result) 747 self.testclass_results.append(rssi_result) 748 self.plot_rssi_vs_attenuation(rssi_result['postprocessed_results']) 749 self.pass_fail_check_rssi_accuracy( 750 testcase_params, rssi_result['postprocessed_results']) 751 752 def _test_rssi_stability(self, testcase_params): 753 """ Function that gets called for each test case of rssi_stability 754 755 The function gets called in each stability test case. The function 756 customizes test based on the test name of the test that called it 757 """ 758 testcase_params = self.compile_rssi_stability_test_params( 759 testcase_params) 760 761 self.setup_rssi_test(testcase_params) 762 rssi_result = self.run_rssi_test(testcase_params) 763 rssi_result['postprocessed_results'] = self.post_process_rssi_sweep( 764 rssi_result) 765 self.testclass_results.append(rssi_result) 766 self.plot_rssi_vs_time(rssi_result, 767 rssi_result['postprocessed_results'], 1) 768 self.plot_rssi_distribution(rssi_result['postprocessed_results']) 769 self.pass_fail_check_rssi_stability( 770 testcase_params, rssi_result['postprocessed_results']) 771 772 def _test_rssi_tracking(self, testcase_params): 773 """ Function that gets called for each test case of rssi_tracking 774 775 The function gets called in each rssi test case. The function 776 customizes the test based on the test name of the test that called it 777 """ 778 testcase_params = self.compile_rssi_tracking_test_params( 779 testcase_params) 780 781 self.setup_rssi_test(testcase_params) 782 rssi_result = self.run_rssi_test(testcase_params) 783 rssi_result['postprocessed_results'] = self.post_process_rssi_sweep( 784 rssi_result) 785 self.testclass_results.append(rssi_result) 786 self.plot_rssi_vs_time(rssi_result, 787 rssi_result['postprocessed_results'], 1) 788 self.pass_fail_check_rssi_accuracy( 789 testcase_params, rssi_result['postprocessed_results']) 790 791 def generate_test_cases(self, test_types, channels, modes, traffic_modes): 792 """Function that auto-generates test cases for a test class.""" 793 test_cases = [] 794 allowed_configs = { 795 'VHT20': [ 796 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 36, 40, 44, 48, 149, 153, 797 157, 161 798 ], 799 'VHT40': [36, 44, 149, 157], 800 'VHT80': [36, 149] 801 } 802 803 for channel, mode, traffic_mode, test_type in itertools.product( 804 channels, modes, traffic_modes, test_types): 805 if channel not in allowed_configs[mode]: 806 continue 807 test_name = test_type + '_ch{}_{}_{}'.format( 808 channel, mode, traffic_mode) 809 testcase_params = collections.OrderedDict( 810 channel=channel, 811 mode=mode, 812 active_traffic=(traffic_mode == 'ActiveTraffic'), 813 traffic_type=self.user_params['rssi_test_params'] 814 ['traffic_type'], 815 ) 816 test_function = getattr(self, '_{}'.format(test_type)) 817 setattr(self, test_name, partial(test_function, testcase_params)) 818 test_cases.append(test_name) 819 return test_cases 820 821 822class WifiRssi_2GHz_ActiveTraffic_Test(WifiRssiTest): 823 def __init__(self, controllers): 824 super().__init__(controllers) 825 self.tests = self.generate_test_cases( 826 ['test_rssi_stability', 'test_rssi_vs_atten'], [1, 2, 6, 10, 11], 827 ['VHT20'], ['ActiveTraffic']) 828 829 830class WifiRssi_5GHz_ActiveTraffic_Test(WifiRssiTest): 831 def __init__(self, controllers): 832 super().__init__(controllers) 833 self.tests = self.generate_test_cases( 834 ['test_rssi_stability', 'test_rssi_vs_atten'], 835 [36, 40, 44, 48, 149, 153, 157, 161], ['VHT20', 'VHT40', 'VHT80'], 836 ['ActiveTraffic']) 837 838 839class WifiRssi_AllChannels_ActiveTraffic_Test(WifiRssiTest): 840 def __init__(self, controllers): 841 super().__init__(controllers) 842 self.tests = self.generate_test_cases( 843 ['test_rssi_stability', 'test_rssi_vs_atten'], 844 [1, 6, 11, 36, 40, 44, 48, 149, 153, 157, 161], 845 ['VHT20', 'VHT40', 'VHT80'], ['ActiveTraffic']) 846 847 848class WifiRssi_SampleChannels_NoTraffic_Test(WifiRssiTest): 849 def __init__(self, controllers): 850 super().__init__(controllers) 851 self.tests = self.generate_test_cases( 852 ['test_rssi_stability', 'test_rssi_vs_atten'], [6, 36, 149], 853 ['VHT20', 'VHT40', 'VHT80'], ['NoTraffic']) 854 855 856class WifiRssiTrackingTest(WifiRssiTest): 857 def __init__(self, controllers): 858 super().__init__(controllers) 859 self.tests = self.generate_test_cases(['test_rssi_tracking'], 860 [6, 36, 149], 861 ['VHT20', 'VHT40', 'VHT80'], 862 ['ActiveTraffic', 'NoTraffic']) 863 864 865# Over-the air version of RSSI tests 866class WifiOtaRssiTest(WifiRssiTest): 867 """Class to test over-the-air rssi tests. 868 869 This class implements measures WiFi RSSI tests in an OTA chamber. 870 It allows setting orientation and other chamber parameters to study 871 performance in varying channel conditions 872 """ 873 def __init__(self, controllers): 874 base_test.BaseTestClass.__init__(self, controllers) 875 self.testcase_metric_logger = ( 876 BlackboxMappedMetricLogger.for_test_case()) 877 self.testclass_metric_logger = ( 878 BlackboxMappedMetricLogger.for_test_class()) 879 self.publish_test_metrics = False 880 881 def setup_class(self): 882 WifiRssiTest.setup_class(self) 883 self.ota_chamber = ota_chamber.create( 884 self.user_params['OTAChamber'])[0] 885 886 def teardown_class(self): 887 self.ota_chamber.reset_chamber() 888 self.process_testclass_results() 889 890 def teardown_test(self): 891 if self.ota_chamber.current_mode == 'continuous': 892 self.ota_chamber.reset_chamber() 893 894 def extract_test_id(self, testcase_params, id_fields): 895 test_id = collections.OrderedDict( 896 (param, testcase_params[param]) for param in id_fields) 897 return test_id 898 899 def process_testclass_results(self): 900 """Saves all test results to enable comparison.""" 901 testclass_data = collections.OrderedDict() 902 for test_result in self.testclass_results: 903 current_params = test_result['testcase_params'] 904 905 channel = current_params['channel'] 906 channel_data = testclass_data.setdefault( 907 channel, 908 collections.OrderedDict(orientation=[], 909 rssi=collections.OrderedDict( 910 signal_poll_rssi=[], 911 chain_0_rssi=[], 912 chain_1_rssi=[]))) 913 914 channel_data['orientation'].append(current_params['orientation']) 915 channel_data['rssi']['signal_poll_rssi'].append( 916 test_result['postprocessed_results']['signal_poll_rssi'] 917 ['mean'][0]) 918 channel_data['rssi']['chain_0_rssi'].append( 919 test_result['postprocessed_results']['chain_0_rssi']['mean'] 920 [0]) 921 channel_data['rssi']['chain_1_rssi'].append( 922 test_result['postprocessed_results']['chain_1_rssi']['mean'] 923 [0]) 924 925 chamber_mode = self.testclass_results[0]['testcase_params'][ 926 'chamber_mode'] 927 if chamber_mode == 'orientation': 928 x_label = 'Angle (deg)' 929 elif chamber_mode == 'stepped stirrers': 930 x_label = 'Position Index' 931 932 # Publish test class metrics 933 for channel, channel_data in testclass_data.items(): 934 for rssi_metric, rssi_metric_value in channel_data['rssi'].items(): 935 metric_name = 'ota_summary_ch{}.avg_{}'.format( 936 channel, rssi_metric) 937 metric_value = numpy.mean(rssi_metric_value) 938 self.testclass_metric_logger.add_metric( 939 metric_name, metric_value) 940 941 # Plot test class results 942 plots = [] 943 for channel, channel_data in testclass_data.items(): 944 current_plot = wputils.BokehFigure( 945 title='Channel {} - Rssi vs. Position'.format(channel), 946 x_label=x_label, 947 primary_y_label='RSSI (dBm)', 948 ) 949 for rssi_metric, rssi_metric_value in channel_data['rssi'].items(): 950 legend = rssi_metric 951 current_plot.add_line(channel_data['orientation'], 952 rssi_metric_value, legend) 953 current_plot.generate_figure() 954 plots.append(current_plot) 955 current_context = context.get_current_context().get_full_output_path() 956 plot_file_path = os.path.join(current_context, 'results.html') 957 wputils.BokehFigure.save_figures(plots, plot_file_path) 958 959 def setup_rssi_test(self, testcase_params): 960 # Test setup 961 WifiRssiTest.setup_rssi_test(self, testcase_params) 962 if testcase_params['chamber_mode'] == 'StirrersOn': 963 self.ota_chamber.start_continuous_stirrers() 964 else: 965 self.ota_chamber.set_orientation(testcase_params['orientation']) 966 967 def compile_ota_rssi_test_params(self, testcase_params): 968 """Function to complete compiling test-specific parameters 969 970 Args: 971 testcase_params: dict containing test-specific parameters 972 """ 973 if "rssi_over_orientation" in self.test_name: 974 rssi_test_duration = self.testclass_params[ 975 'rssi_over_orientation_duration'] 976 elif "rssi_variation" in self.test_name: 977 rssi_test_duration = self.testclass_params[ 978 'rssi_variation_duration'] 979 980 testcase_params.update( 981 connected_measurements=int( 982 rssi_test_duration / 983 self.testclass_params['polling_frequency']), 984 scan_measurements=0, 985 first_measurement_delay=MED_SLEEP, 986 rssi_atten_range=[ 987 self.testclass_params['rssi_ota_test_attenuation'] 988 ]) 989 testcase_params['band'] = self.access_point.band_lookup_by_channel( 990 testcase_params['channel']) 991 testcase_params['test_network'] = self.main_network[ 992 testcase_params['band']] 993 testcase_params['tracked_bssid'] = [ 994 self.main_network[testcase_params['band']].get( 995 'BSSID', '00:00:00:00') 996 ] 997 998 testcase_params['traffic_timeout'] = self.get_traffic_timeout( 999 testcase_params) 1000 if isinstance(self.iperf_server, ipf.IPerfServerOverAdb): 1001 testcase_params['iperf_args'] = '-i 1 -t {} -J'.format( 1002 testcase_params['traffic_timeout']) 1003 else: 1004 testcase_params['iperf_args'] = '-i 1 -t {} -J -R'.format( 1005 testcase_params['traffic_timeout']) 1006 return testcase_params 1007 1008 def _test_ota_rssi(self, testcase_params): 1009 testcase_params = self.compile_ota_rssi_test_params(testcase_params) 1010 1011 self.setup_rssi_test(testcase_params) 1012 rssi_result = self.run_rssi_test(testcase_params) 1013 rssi_result['postprocessed_results'] = self.post_process_rssi_sweep( 1014 rssi_result) 1015 self.testclass_results.append(rssi_result) 1016 self.plot_rssi_vs_time(rssi_result, 1017 rssi_result['postprocessed_results'], 1) 1018 self.plot_rssi_distribution(rssi_result['postprocessed_results']) 1019 1020 def generate_test_cases(self, test_types, channels, modes, traffic_modes, 1021 chamber_modes, orientations): 1022 test_cases = [] 1023 allowed_configs = { 1024 'VHT20': [ 1025 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 36, 40, 44, 48, 149, 153, 1026 157, 161 1027 ], 1028 'VHT40': [36, 44, 149, 157], 1029 'VHT80': [36, 149] 1030 } 1031 1032 for (channel, mode, traffic, chamber_mode, orientation, 1033 test_type) in itertools.product(channels, modes, traffic_modes, 1034 chamber_modes, orientations, 1035 test_types): 1036 if channel not in allowed_configs[mode]: 1037 continue 1038 test_name = test_type + '_ch{}_{}_{}_{}deg'.format( 1039 channel, mode, traffic, orientation) 1040 testcase_params = collections.OrderedDict( 1041 channel=channel, 1042 mode=mode, 1043 active_traffic=(traffic == 'ActiveTraffic'), 1044 traffic_type=self.user_params['rssi_test_params'] 1045 ['traffic_type'], 1046 chamber_mode=chamber_mode, 1047 orientation=orientation) 1048 test_function = self._test_ota_rssi 1049 setattr(self, test_name, partial(test_function, testcase_params)) 1050 test_cases.append(test_name) 1051 return test_cases 1052 1053 1054class WifiOtaRssi_Accuracy_Test(WifiOtaRssiTest): 1055 def __init__(self, controllers): 1056 super().__init__(controllers) 1057 self.tests = self.generate_test_cases(['test_rssi_vs_atten'], 1058 [6, 36, 149], ['VHT20'], 1059 ['ActiveTraffic'], 1060 ['orientation'], 1061 list(range(0, 360, 45))) 1062 1063 1064class WifiOtaRssi_StirrerVariation_Test(WifiOtaRssiTest): 1065 def __init__(self, controllers): 1066 WifiRssiTest.__init__(self, controllers) 1067 self.tests = self.generate_test_cases(['test_rssi_variation'], 1068 [6, 36, 149], ['VHT20'], 1069 ['ActiveTraffic'], 1070 ['StirrersOn'], [0]) 1071 1072 1073class WifiOtaRssi_TenDegree_Test(WifiOtaRssiTest): 1074 def __init__(self, controllers): 1075 WifiRssiTest.__init__(self, controllers) 1076 self.tests = self.generate_test_cases(['test_rssi_over_orientation'], 1077 [6, 36, 149], ['VHT20'], 1078 ['ActiveTraffic'], 1079 ['orientation'], 1080 list(range(0, 360, 10))) 1081