1#!/usr/bin/env python3.4
2#
3#   Copyright 2017 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the 'License');
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an 'AS IS' BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import collections
18import csv
19import itertools
20import json
21import logging
22import numpy
23import os
24from acts import asserts
25from acts import context
26from acts import base_test
27from acts import utils
28from acts.controllers import iperf_client
29from acts.controllers.utils_lib import ssh
30from acts.metrics.loggers.blackbox import BlackboxMappedMetricLogger
31from acts.test_utils.wifi import ota_chamber
32from acts.test_utils.wifi import wifi_performance_test_utils as wputils
33from acts.test_utils.wifi import wifi_test_utils as wutils
34from acts.test_utils.wifi import wifi_retail_ap as retail_ap
35from functools import partial
36from WifiRvrTest import WifiRvrTest
37from WifiPingTest import WifiPingTest
38
39
40class WifiSensitivityTest(WifiRvrTest, WifiPingTest):
41    """Class to test WiFi sensitivity tests.
42
43    This class implements measures WiFi sensitivity per rate. It heavily
44    leverages the WifiRvrTest class and introduced minor differences to set
45    specific rates and the access point, and implements a different pass/fail
46    check. For an example config file to run this test class see
47    example_connectivity_performance_ap_sta.json.
48    """
49
50    RSSI_POLL_INTERVAL = 0.2
51    VALID_TEST_CONFIGS = {
52        1: ['legacy', 'VHT20'],
53        2: ['legacy', 'VHT20'],
54        6: ['legacy', 'VHT20'],
55        10: ['legacy', 'VHT20'],
56        11: ['legacy', 'VHT20'],
57        36: ['legacy', 'VHT20', 'VHT40', 'VHT80'],
58        40: ['legacy', 'VHT20'],
59        44: ['legacy', 'VHT20'],
60        48: ['legacy', 'VHT20'],
61        149: ['legacy', 'VHT20', 'VHT40', 'VHT80'],
62        153: ['legacy', 'VHT20'],
63        157: ['legacy', 'VHT20'],
64        161: ['legacy', 'VHT20']
65    }
66    RateTuple = collections.namedtuple(('RateTuple'),
67                                       ['mcs', 'streams', 'data_rate'])
68    #yapf:disable
69    VALID_RATES = {
70        'legacy_2GHz': [
71            RateTuple(54, 1, 54), RateTuple(48, 1, 48),
72            RateTuple(36, 1, 36), RateTuple(24, 1, 24),
73            RateTuple(18, 1, 18), RateTuple(12, 1, 12),
74            RateTuple(11, 1, 11), RateTuple(9, 1, 9),
75            RateTuple(6, 1, 6), RateTuple(5.5, 1, 5.5),
76            RateTuple(2, 1, 2), RateTuple(1, 1, 1)],
77        'legacy_5GHz': [
78            RateTuple(54, 1, 54), RateTuple(48, 1, 48),
79            RateTuple(36, 1, 36), RateTuple(24, 1, 24),
80            RateTuple(18, 1, 18), RateTuple(12, 1, 12),
81            RateTuple(9, 1, 9), RateTuple(6, 1, 6)],
82        'HT20': [
83            RateTuple(7, 1, 72.2), RateTuple(6, 1, 65),
84            RateTuple(5, 1, 57.8), RateTuple(4, 1, 43.3),
85            RateTuple(3, 1, 26), RateTuple(2, 1, 21.7),
86            RateTuple(1, 1, 14.4), RateTuple(0, 1, 7.2),
87            RateTuple(15, 2, 144.4), RateTuple(14, 2, 130),
88            RateTuple(13, 2, 115.6), RateTuple(12, 2, 86.7),
89            RateTuple(11, 2, 57.8), RateTuple(10, 2, 43.4),
90            RateTuple(9, 2, 28.9), RateTuple(8, 2, 14.4)],
91        'VHT20': [
92            RateTuple(9, 1, 96), RateTuple(8, 1, 86.7),
93            RateTuple(7, 1, 72.2), RateTuple(6, 1, 65),
94            RateTuple(5, 1, 57.8), RateTuple(4, 1, 43.3),
95            RateTuple(3, 1, 28.9), RateTuple(2, 1, 21.7),
96            RateTuple(1, 1, 14.4), RateTuple(0, 1, 7.2),
97            RateTuple(9, 2, 192), RateTuple(8, 2, 173.3),
98            RateTuple(7, 2, 144.4), RateTuple(6, 2, 130.3),
99            RateTuple(5, 2, 115.6), RateTuple(4, 2, 86.7),
100            RateTuple(3, 2, 57.8), RateTuple(2, 2, 43.3),
101            RateTuple(1, 2, 28.9), RateTuple(0, 2, 14.4)],
102        'VHT40': [
103            RateTuple(9, 1, 96), RateTuple(8, 1, 86.7),
104            RateTuple(7, 1, 72.2), RateTuple(6, 1, 65),
105            RateTuple(5, 1, 57.8), RateTuple(4, 1, 43.3),
106            RateTuple(3, 1, 28.9), RateTuple(2, 1, 21.7),
107            RateTuple(1, 1, 14.4), RateTuple(0, 1, 7.2),
108            RateTuple(9, 2, 192), RateTuple(8, 2, 173.3),
109            RateTuple(7, 2, 144.4), RateTuple(6, 2, 130.3),
110            RateTuple(5, 2, 115.6), RateTuple(4, 2, 86.7),
111            RateTuple(3, 2, 57.8), RateTuple(2, 2, 43.3),
112            RateTuple(1, 2, 28.9), RateTuple(0, 2, 14.4)],
113        'VHT80': [
114            RateTuple(9, 1, 96), RateTuple(8, 1, 86.7),
115            RateTuple(7, 1, 72.2), RateTuple(6, 1, 65),
116            RateTuple(5, 1, 57.8), RateTuple(4, 1, 43.3),
117            RateTuple(3, 1, 28.9), RateTuple(2, 1, 21.7),
118            RateTuple(1, 1, 14.4), RateTuple(0, 1, 7.2),
119            RateTuple(9, 2, 192), RateTuple(8, 2, 173.3),
120            RateTuple(7, 2, 144.4), RateTuple(6, 2, 130.3),
121            RateTuple(5, 2, 115.6), RateTuple(4, 2, 86.7),
122            RateTuple(3, 2, 57.8), RateTuple(2, 2, 43.3),
123            RateTuple(1, 2, 28.9), RateTuple(0, 2, 14.4)],
124    }
125    #yapf:enable
126
127    def __init__(self, controllers):
128        base_test.BaseTestClass.__init__(self, controllers)
129        self.testcase_metric_logger = (
130            BlackboxMappedMetricLogger.for_test_case())
131        self.testclass_metric_logger = (
132            BlackboxMappedMetricLogger.for_test_class())
133        self.publish_testcase_metrics = True
134
135    def setup_class(self):
136        """Initializes common test hardware and parameters.
137
138        This function initializes hardwares and compiles parameters that are
139        common to all tests in this class.
140        """
141        self.dut = self.android_devices[-1]
142        req_params = [
143            'RetailAccessPoints', 'sensitivity_test_params', 'testbed_params',
144            'RemoteServer'
145        ]
146        opt_params = ['main_network', 'golden_files_list']
147        self.unpack_userparams(req_params, opt_params)
148        self.testclass_params = self.sensitivity_test_params
149        self.num_atten = self.attenuators[0].instrument.num_atten
150        self.ping_server = ssh.connection.SshConnection(
151            ssh.settings.from_config(self.RemoteServer[0]['ssh_config']))
152        self.iperf_server = self.iperf_servers[0]
153        self.iperf_client = self.iperf_clients[0]
154        self.access_point = retail_ap.create(self.RetailAccessPoints)[0]
155        self.log.info('Access Point Configuration: {}'.format(
156            self.access_point.ap_settings))
157        self.log_path = os.path.join(logging.log_path, 'results')
158        os.makedirs(self.log_path, exist_ok=True)
159        if not hasattr(self, 'golden_files_list'):
160            self.golden_files_list = [
161                os.path.join(self.testbed_params['golden_results_path'], file)
162                for file in os.listdir(
163                    self.testbed_params['golden_results_path'])
164            ]
165        if hasattr(self, 'bdf'):
166            self.log.info('Pushing WiFi BDF to DUT.')
167            wputils.push_bdf(self.dut, self.bdf)
168        if hasattr(self, 'firmware'):
169            self.log.info('Pushing WiFi firmware to DUT.')
170            wlanmdsp = [
171                file for file in self.firmware if "wlanmdsp.mbn" in file
172            ][0]
173            data_msc = [file for file in self.firmware
174                        if "Data.msc" in file][0]
175            wputils.push_firmware(self.dut, wlanmdsp, data_msc)
176        self.atten_dut_chain_map = {}
177        self.testclass_results = []
178
179        # Turn WiFi ON
180        if self.testclass_params.get('airplane_mode', 1):
181            self.log.info('Turning on airplane mode.')
182            asserts.assert_true(utils.force_airplane_mode(self.dut, True),
183                                "Can not turn on airplane mode.")
184        wutils.wifi_toggle_state(self.dut, True)
185
186    def teardown_class(self):
187        # Turn WiFi OFF
188        for dev in self.android_devices:
189            wutils.wifi_toggle_state(dev, False)
190        self.process_testclass_results()
191
192    def pass_fail_check(self, result):
193        """Checks sensitivity against golden results and decides on pass/fail.
194
195        Args:
196            result: dict containing attenuation, throughput and other meta
197                data
198        """
199        try:
200            golden_path = next(file_name
201                               for file_name in self.golden_files_list
202                               if 'sensitivity_targets' in file_name)
203            with open(golden_path, 'r') as golden_file:
204                golden_results = json.load(golden_file)
205            golden_sensitivity = golden_results[
206                self.current_test_name]['sensitivity']
207        except:
208            golden_sensitivity = float('nan')
209
210        result_string = ('Throughput = {}%, Sensitivity = {}.'
211                         'Target Sensitivity = {}'.format(
212                             result['peak_throughput_pct'],
213                             result['sensitivity'], golden_sensitivity))
214        if result['peak_throughput_pct'] < 95:
215            self.log.warning('Result unreliable. Peak rate unstable')
216        if result['sensitivity'] - golden_sensitivity < self.testclass_params[
217                'sensitivity_tolerance']:
218            asserts.explicit_pass('Test Passed. {}'.format(result_string))
219        else:
220            asserts.fail('Test Failed. {}'.format(result_string))
221
222    def process_testclass_results(self):
223        """Saves and plots test results from all executed test cases."""
224        # write json output
225        testclass_results_dict = collections.OrderedDict()
226        id_fields = ['mode', 'rate', 'num_streams', 'chain_mask']
227        channels_tested = []
228        for result in self.testclass_results:
229            testcase_params = result['testcase_params']
230            test_id = self.extract_test_id(testcase_params, id_fields)
231            test_id = tuple(test_id.items())
232            if test_id not in testclass_results_dict:
233                testclass_results_dict[test_id] = collections.OrderedDict()
234            channel = testcase_params['channel']
235            if channel not in channels_tested:
236                channels_tested.append(channel)
237            if result['peak_throughput_pct'] >= 95:
238                testclass_results_dict[test_id][channel] = result[
239                    'sensitivity']
240            else:
241                testclass_results_dict[test_id][channel] = ''
242
243        # calculate average metrics
244        metrics_dict = collections.OrderedDict()
245        id_fields = ['channel', 'mode', 'num_streams', 'chain_mask']
246        for test_id in testclass_results_dict.keys():
247            for channel in testclass_results_dict[test_id].keys():
248                metric_tag = collections.OrderedDict(test_id, channel=channel)
249                metric_tag = self.extract_test_id(metric_tag, id_fields)
250                metric_tag = tuple(metric_tag.items())
251                metrics_dict.setdefault(metric_tag, [])
252                sensitivity_result = testclass_results_dict[test_id][channel]
253                if sensitivity_result != '':
254                    metrics_dict[metric_tag].append(sensitivity_result)
255        for metric_tag_tuple, metric_data in metrics_dict.items():
256            metric_tag_dict = collections.OrderedDict(metric_tag_tuple)
257            metric_tag = 'ch{}_{}_nss{}_chain{}'.format(
258                metric_tag_dict['channel'], metric_tag_dict['mode'],
259                metric_tag_dict['num_streams'], metric_tag_dict['chain_mask'])
260            metric_key = "{}.avg_sensitivity".format(metric_tag)
261            metric_value = numpy.nanmean(metric_data)
262            self.testclass_metric_logger.add_metric(metric_key, metric_value)
263
264        # write csv
265        csv_header = ['Mode', 'MCS', 'Streams', 'Chain', 'Rate (Mbps)']
266        for channel in channels_tested:
267            csv_header.append('Ch. ' + str(channel))
268        results_file_path = os.path.join(self.log_path, 'results.csv')
269        with open(results_file_path, mode='w') as csv_file:
270            writer = csv.DictWriter(csv_file, fieldnames=csv_header)
271            writer.writeheader()
272            for test_id, test_results in testclass_results_dict.items():
273                test_id_dict = dict(test_id)
274                if 'legacy' in test_id_dict['mode']:
275                    rate_list = self.VALID_RATES['legacy_2GHz']
276                else:
277                    rate_list = self.VALID_RATES[test_id_dict['mode']]
278                data_rate = next(rate.data_rate for rate in rate_list
279                                 if rate[:-1] == (test_id_dict['rate'],
280                                                  test_id_dict['num_streams']))
281                row_value = {
282                    'Mode': test_id_dict['mode'],
283                    'MCS': test_id_dict['rate'],
284                    'Streams': test_id_dict['num_streams'],
285                    'Chain': test_id_dict['chain_mask'],
286                    'Rate (Mbps)': data_rate,
287                }
288                for channel in channels_tested:
289                    row_value['Ch. ' + str(channel)] = test_results.pop(
290                        channel, ' ')
291                writer.writerow(row_value)
292
293        if not self.testclass_params['traffic_type'].lower() == 'ping':
294            WifiRvrTest.process_testclass_results(self)
295
296    def process_rvr_test_results(self, testcase_params, rvr_result):
297        """Post processes RvR results to compute sensitivity.
298
299        Takes in the results of the RvR tests and computes the sensitivity of
300        the current rate by looking at the point at which throughput drops
301        below the percentage specified in the config file. The function then
302        calls on its parent class process_test_results to plot the result.
303
304        Args:
305            rvr_result: dict containing attenuation, throughput and other meta
306            data
307        """
308        rvr_result['peak_throughput'] = max(rvr_result['throughput_receive'])
309        rvr_result['peak_throughput_pct'] = 100
310        throughput_check = [
311            throughput < rvr_result['peak_throughput'] *
312            (self.testclass_params['throughput_pct_at_sensitivity'] / 100)
313            for throughput in rvr_result['throughput_receive']
314        ]
315        consistency_check = [
316            idx for idx in range(len(throughput_check))
317            if all(throughput_check[idx:])
318        ]
319        rvr_result['atten_at_range'] = rvr_result['attenuation'][
320            consistency_check[0] - 1]
321        rvr_result['range'] = rvr_result['fixed_attenuation'] + (
322            rvr_result['atten_at_range'])
323        rvr_result['sensitivity'] = self.testclass_params['ap_tx_power'] + (
324            self.testbed_params['ap_tx_power_offset'][str(
325                testcase_params['channel'])] - rvr_result['range'])
326        WifiRvrTest.process_test_results(self, rvr_result)
327
328    def process_ping_test_results(self, testcase_params, ping_result):
329        """Post processes RvR results to compute sensitivity.
330
331        Takes in the results of the RvR tests and computes the sensitivity of
332        the current rate by looking at the point at which throughput drops
333        below the percentage specified in the config file. The function then
334        calls on its parent class process_test_results to plot the result.
335
336        Args:
337            rvr_result: dict containing attenuation, throughput and other meta
338            data
339        """
340        WifiPingTest.process_ping_results(self, testcase_params, ping_result)
341        ping_result['sensitivity'] = self.testclass_params['ap_tx_power'] + (
342            self.testbed_params['ap_tx_power_offset'][str(
343                testcase_params['channel'])] - ping_result['range'])
344
345    def setup_sensitivity_test(self, testcase_params):
346        if testcase_params['traffic_type'].lower() == 'ping':
347            self.setup_ping_test(testcase_params)
348            self.run_sensitivity_test = self.run_ping_test
349            self.process_sensitivity_test_results = (
350                self.process_ping_test_results)
351        else:
352            self.setup_rvr_test(testcase_params)
353            self.run_sensitivity_test = self.run_rvr_test
354            self.process_sensitivity_test_results = (
355                self.process_rvr_test_results)
356
357    def setup_ap(self, testcase_params):
358        """Sets up the AP and attenuator to compensate for AP chain imbalance.
359
360        Args:
361            testcase_params: dict containing AP and other test params
362        """
363        band = self.access_point.band_lookup_by_channel(
364            testcase_params['channel'])
365        if '2G' in band:
366            frequency = wutils.WifiEnums.channel_2G_to_freq[
367                testcase_params['channel']]
368        else:
369            frequency = wutils.WifiEnums.channel_5G_to_freq[
370                testcase_params['channel']]
371        if frequency in wutils.WifiEnums.DFS_5G_FREQUENCIES:
372            self.access_point.set_region(self.testbed_params['DFS_region'])
373        else:
374            self.access_point.set_region(self.testbed_params['default_region'])
375        self.access_point.set_channel(band, testcase_params['channel'])
376        self.access_point.set_bandwidth(band, testcase_params['mode'])
377        self.access_point.set_power(band, testcase_params['ap_tx_power'])
378        self.access_point.set_rate(band, testcase_params['mode'],
379                                   testcase_params['num_streams'],
380                                   testcase_params['rate'],
381                                   testcase_params['short_gi'])
382        # Set attenuator offsets and set attenuators to initial condition
383        atten_offsets = self.testbed_params['chain_offset'][str(
384            testcase_params['channel'])]
385        for atten in self.attenuators:
386            if 'AP-Chain-0' in atten.path:
387                atten.offset = atten_offsets[0]
388            elif 'AP-Chain-1' in atten.path:
389                atten.offset = atten_offsets[1]
390            else:
391                atten.offset = 0
392        self.log.info('Access Point Configuration: {}'.format(
393            self.access_point.ap_settings))
394
395    def setup_dut(self, testcase_params):
396        """Sets up the DUT in the configuration required by the test.
397
398        Args:
399            testcase_params: dict containing AP and other test params
400        """
401        # Check battery level before test
402        if not wputils.health_check(self.dut, 10):
403            asserts.skip('Battery level too low. Skipping test.')
404        # Turn screen off to preserve battery
405        self.dut.go_to_sleep()
406        if wputils.validate_network(self.dut,
407                                    testcase_params['test_network']['SSID']):
408            self.log.info('Already connected to desired network')
409        else:
410            wutils.reset_wifi(self.dut)
411            wutils.set_wifi_country_code(self.dut,
412                                         self.testclass_params['country_code'])
413            testcase_params['test_network']['channel'] = testcase_params[
414                'channel']
415            wutils.wifi_connect(self.dut,
416                                testcase_params['test_network'],
417                                num_of_tries=5,
418                                check_connectivity=False)
419        self.dut_ip = self.dut.droid.connectivityGetIPv4Addresses('wlan0')[0]
420        # Activate/attenuate the correct chains
421        if testcase_params['channel'] not in self.atten_dut_chain_map.keys():
422            self.atten_dut_chain_map[testcase_params[
423                'channel']] = wputils.get_current_atten_dut_chain_map(
424                    self.attenuators, self.dut, self.ping_server)
425        self.log.info("Current Attenuator-DUT Chain Map: {}".format(
426            self.atten_dut_chain_map[testcase_params['channel']]))
427        for idx, atten in enumerate(self.attenuators):
428            if self.atten_dut_chain_map[testcase_params['channel']][
429                    idx] == testcase_params['attenuated_chain']:
430                atten.offset = atten.instrument.max_atten
431
432    def extract_test_id(self, testcase_params, id_fields):
433        test_id = collections.OrderedDict(
434            (param, testcase_params[param]) for param in id_fields)
435        return test_id
436
437    def get_start_atten(self, testcase_params):
438        """Gets the starting attenuation for this sensitivity test.
439
440        The function gets the starting attenuation by checking whether a test
441        as the next higher MCS has been executed. If so it sets the starting
442        point a configurable number of dBs below the next MCS's sensitivity.
443
444        Returns:
445            start_atten: starting attenuation for current test
446        """
447        # Get the current and reference test config. The reference test is the
448        # one performed at the current MCS+1
449        current_rate = testcase_params['rate']
450        ref_test_params = self.extract_test_id(
451            testcase_params,
452            ['channel', 'mode', 'rate', 'num_streams', 'chain_mask'])
453        if 'legacy' in testcase_params['mode']:
454            if testcase_params['channel'] <= 13:
455                rate_list = self.VALID_RATES['legacy_2GHz']
456            else:
457                rate_list = self.VALID_RATES['legacy_5GHz']
458            ref_index = max(
459                0,
460                rate_list.index(self.RateTuple(current_rate, 1, current_rate))
461                - 1)
462            ref_test_params['rate'] = rate_list[ref_index].mcs
463        else:
464            ref_test_params['rate'] = current_rate + 1
465
466        # Check if reference test has been run and set attenuation accordingly
467        previous_params = [
468            self.extract_test_id(
469                result['testcase_params'],
470                ['channel', 'mode', 'rate', 'num_streams', 'chain_mask'])
471            for result in self.testclass_results
472        ]
473
474        try:
475            ref_index = previous_params.index(ref_test_params)
476            start_atten = self.testclass_results[ref_index][
477                'atten_at_range'] - (
478                    self.testclass_params['adjacent_mcs_range_gap'])
479        except ValueError:
480            self.log.warning(
481                'Reference test not found. Starting from {} dB'.format(
482                    self.testclass_params['atten_start']))
483            start_atten = self.testclass_params['atten_start']
484            start_atten = max(start_atten, 0)
485        return start_atten
486
487    def compile_test_params(self, testcase_params):
488        """Function that generates test params based on the test name."""
489        band = self.access_point.band_lookup_by_channel(
490            testcase_params['channel'])
491        testcase_params['test_network'] = self.main_network[band]
492        if testcase_params['chain_mask'] in ['0', '1']:
493            testcase_params['attenuated_chain'] = 'DUT-Chain-{}'.format(
494                1 if testcase_params['chain_mask'] == '0' else 0)
495        else:
496            # Set attenuated chain to -1. Do not set to None as this will be
497            # compared to RF chain map which may include None
498            testcase_params['attenuated_chain'] = -1
499
500        self.testclass_params[
501            'range_ping_loss_threshold'] = 100 - self.testclass_params[
502                'throughput_pct_at_sensitivity']
503        if self.testclass_params['traffic_type'] == 'UDP':
504            testcase_params['iperf_args'] = '-i 1 -t {} -J -u -b {}'.format(
505                self.testclass_params['iperf_duration'],
506                self.testclass_params['UDP_rates'][testcase_params['mode']])
507        elif self.testclass_params['traffic_type'] == 'TCP':
508            testcase_params['iperf_args'] = '-i 1 -t {} -J'.format(
509                self.testclass_params['iperf_duration'])
510
511        if self.testclass_params['traffic_type'] != 'ping' and isinstance(
512                self.iperf_client, iperf_client.IPerfClientOverAdb):
513            testcase_params['iperf_args'] += ' -R'
514            testcase_params['use_client_output'] = True
515        else:
516            testcase_params['use_client_output'] = False
517
518        return testcase_params
519
520    def _test_sensitivity(self, testcase_params):
521        """ Function that gets called for each test case
522
523        The function gets called in each rvr test case. The function customizes
524        the rvr test based on the test name of the test that called it
525        """
526        # Compile test parameters from config and test name
527        testcase_params = self.compile_test_params(testcase_params)
528        testcase_params.update(self.testclass_params)
529        testcase_params['atten_start'] = self.get_start_atten(testcase_params)
530        num_atten_steps = int(
531            (testcase_params['atten_stop'] - testcase_params['atten_start']) /
532            testcase_params['atten_step'])
533        testcase_params['atten_range'] = [
534            testcase_params['atten_start'] + x * testcase_params['atten_step']
535            for x in range(0, num_atten_steps)
536        ]
537
538        # Prepare devices and run test
539        self.setup_sensitivity_test(testcase_params)
540        result = self.run_sensitivity_test(testcase_params)
541        self.process_sensitivity_test_results(testcase_params, result)
542
543        # Post-process results
544        self.testclass_results.append(result)
545        self.pass_fail_check(result)
546
547    def generate_test_cases(self, channels, modes, chain_mask):
548        """Function that auto-generates test cases for a test class."""
549        test_cases = []
550        for channel in channels:
551            requested_modes = [
552                mode for mode in modes
553                if mode in self.VALID_TEST_CONFIGS[channel]
554            ]
555            for mode in requested_modes:
556                if 'VHT' in mode:
557                    rates = self.VALID_RATES[mode]
558                elif 'HT' in mode:
559                    rates = self.VALID_RATES[mode]
560                elif 'legacy' in mode and channel < 14:
561                    rates = self.VALID_RATES['legacy_2GHz']
562                elif 'legacy' in mode and channel > 14:
563                    rates = self.VALID_RATES['legacy_5GHz']
564                else:
565                    raise ValueError('Invalid test mode.')
566                for chain, rate in itertools.product(chain_mask, rates):
567                    testcase_params = collections.OrderedDict(
568                        channel=channel,
569                        mode=mode,
570                        rate=rate.mcs,
571                        num_streams=rate.streams,
572                        short_gi=1,
573                        chain_mask=chain)
574                    if chain in ['0', '1'] and rate[1] == 2:
575                        # Do not test 2-stream rates in single chain mode
576                        continue
577                    if 'legacy' in mode:
578                        testcase_name = ('test_sensitivity_ch{}_{}_{}_nss{}'
579                                         '_ch{}'.format(
580                                             channel, mode,
581                                             str(rate.mcs).replace('.', 'p'),
582                                             rate.streams, chain))
583                    else:
584                        testcase_name = ('test_sensitivity_ch{}_{}_mcs{}_nss{}'
585                                         '_ch{}'.format(
586                                             channel, mode, rate.mcs,
587                                             rate.streams, chain))
588                    setattr(self, testcase_name,
589                            partial(self._test_sensitivity, testcase_params))
590                    test_cases.append(testcase_name)
591        return test_cases
592
593
594class WifiSensitivity_AllChannels_Test(WifiSensitivityTest):
595    def __init__(self, controllers):
596        super().__init__(controllers)
597        self.tests = self.generate_test_cases(
598            [6, 36, 40, 44, 48, 149, 153, 157, 161],
599            ['VHT20', 'VHT40', 'VHT80'], ['0', '1', '2x2'])
600
601
602class WifiSensitivity_SampleChannels_Test(WifiSensitivityTest):
603    def __init__(self, controllers):
604        super().__init__(controllers)
605        self.tests = self.generate_test_cases([6, 36, 149],
606                                              ['VHT20', 'VHT40', 'VHT80'],
607                                              ['0', '1', '2x2'])
608
609
610class WifiSensitivity_2GHz_Test(WifiSensitivityTest):
611    def __init__(self, controllers):
612        super().__init__(controllers)
613        self.tests = self.generate_test_cases([1, 2, 6, 10, 11], ['VHT20'],
614                                              ['0', '1', '2x2'])
615
616
617class WifiSensitivity_5GHz_Test(WifiSensitivityTest):
618    def __init__(self, controllers):
619        super().__init__(controllers)
620        self.tests = self.generate_test_cases(
621            [36, 40, 44, 48, 149, 153, 157, 161], ['VHT20', 'VHT40', 'VHT80'],
622            ['0', '1', '2x2'])
623
624
625class WifiSensitivity_UNII1_Test(WifiSensitivityTest):
626    def __init__(self, controllers):
627        super().__init__(controllers)
628        self.tests = self.generate_test_cases([36, 40, 44, 48],
629                                              ['VHT20', 'VHT40', 'VHT80'],
630                                              ['0', '1', '2x2'])
631
632
633class WifiSensitivity_UNII3_Test(WifiSensitivityTest):
634    def __init__(self, controllers):
635        super().__init__(controllers)
636        self.tests = self.generate_test_cases([149, 153, 157, 161],
637                                              ['VHT20', 'VHT40', 'VHT80'],
638                                              ['0', '1', '2x2'])
639
640
641# Over-the air version of senstivity tests
642class WifiOtaSensitivityTest(WifiSensitivityTest):
643    """Class to test over-the-air senstivity.
644
645    This class implements measures WiFi sensitivity tests in an OTA chamber.
646    It allows setting orientation and other chamber parameters to study
647    performance in varying channel conditions
648    """
649    def __init__(self, controllers):
650        base_test.BaseTestClass.__init__(self, controllers)
651        self.testcase_metric_logger = (
652            BlackboxMappedMetricLogger.for_test_case())
653        self.testclass_metric_logger = (
654            BlackboxMappedMetricLogger.for_test_class())
655        self.publish_testcase_metrics = False
656
657    def setup_class(self):
658        WifiSensitivityTest.setup_class(self)
659        self.current_chain_mask = '2x2'
660        self.ota_chamber = ota_chamber.create(
661            self.user_params['OTAChamber'])[0]
662
663    def teardown_class(self):
664        WifiSensitivityTest.teardown_class(self)
665        self.ota_chamber.reset_chamber()
666
667    def setup_sensitivity_test(self, testcase_params):
668        # Setup turntable
669        self.ota_chamber.set_orientation(testcase_params['orientation'])
670        # Continue test setup
671        WifiSensitivityTest.setup_sensitivity_test(self, testcase_params)
672
673    def setup_dut(self, testcase_params):
674        """Sets up the DUT in the configuration required by the test.
675
676        Args:
677            testcase_params: dict containing AP and other test params
678        """
679        # Configure the right INI settings
680        if testcase_params['chain_mask'] != self.current_chain_mask:
681            self.log.info('Updating WiFi chain mask to: {}'.format(
682                testcase_params['chain_mask']))
683            self.current_chain_mask = testcase_params['chain_mask']
684            if testcase_params['chain_mask'] in ['0', '1']:
685                wputils.set_ini_single_chain_mode(
686                    self.dut, int(testcase_params['chain_mask']))
687            else:
688                wputils.set_ini_two_chain_mode(self.dut)
689        # Check battery level before test
690        if not wputils.health_check(self.dut, 10):
691            asserts.skip('Battery level too low. Skipping test.')
692        # Turn screen off to preserve battery
693        self.dut.go_to_sleep()
694        if wputils.validate_network(self.dut,
695                                    testcase_params['test_network']['SSID']):
696            self.log.info('Already connected to desired network')
697        else:
698            wutils.reset_wifi(self.dut)
699            wutils.set_wifi_country_code(self.dut,
700                                         self.testclass_params['country_code'])
701            testcase_params['test_network']['channel'] = testcase_params[
702                'channel']
703            wutils.wifi_connect(self.dut,
704                                testcase_params['test_network'],
705                                num_of_tries=5,
706                                check_connectivity=False)
707        self.dut_ip = self.dut.droid.connectivityGetIPv4Addresses('wlan0')[0]
708
709    def process_testclass_results(self):
710        """Saves and plots test results from all executed test cases."""
711        testclass_results_dict = collections.OrderedDict()
712        id_fields = ['channel', 'mode', 'rate']
713        plots = []
714        for result in self.testclass_results:
715            test_id = self.extract_test_id(result['testcase_params'],
716                                           id_fields)
717            test_id = tuple(test_id.items())
718            chain_mask = result['testcase_params']['chain_mask']
719            num_streams = result['testcase_params']['num_streams']
720            line_id = (chain_mask, num_streams)
721            if test_id not in testclass_results_dict:
722                testclass_results_dict[test_id] = collections.OrderedDict()
723            if line_id not in testclass_results_dict[test_id]:
724                testclass_results_dict[test_id][line_id] = {
725                    'orientation': [],
726                    'sensitivity': []
727                }
728            testclass_results_dict[test_id][line_id]['orientation'].append(
729                result['testcase_params']['orientation'])
730            if result['peak_throughput_pct'] >= 95:
731                testclass_results_dict[test_id][line_id]['sensitivity'].append(
732                    result['sensitivity'])
733            else:
734                testclass_results_dict[test_id][line_id]['sensitivity'].append(
735                    float('nan'))
736
737        for test_id, test_data in testclass_results_dict.items():
738            test_id_dict = dict(test_id)
739            if 'legacy' in test_id_dict['mode']:
740                test_id_str = 'Channel {} - {} {}Mbps'.format(
741                    test_id_dict['channel'], test_id_dict['mode'],
742                    test_id_dict['rate'])
743            else:
744                test_id_str = 'Channel {} - {} MCS{}'.format(
745                    test_id_dict['channel'], test_id_dict['mode'],
746                    test_id_dict['rate'])
747            curr_plot = wputils.BokehFigure(
748                title=str(test_id_str),
749                x_label='Orientation (deg)',
750                primary_y_label='Sensitivity (dBm)')
751            for line_id, line_results in test_data.items():
752                curr_plot.add_line(line_results['orientation'],
753                                   line_results['sensitivity'],
754                                   legend='Nss{} - Chain Mask {}'.format(
755                                       line_id[1], line_id[0]),
756                                   marker='circle')
757                if 'legacy' in test_id_dict['mode']:
758                    metric_tag = 'ota_summary_ch{}_{}_{}_ch{}'.format(
759                        test_id_dict['channel'], test_id_dict['mode'],
760                        test_id_dict['rate'], line_id[0])
761                else:
762                    metric_tag = 'ota_summary_ch{}_{}_mcs{}_nss{}_ch{}'.format(
763                        test_id_dict['channel'], test_id_dict['mode'],
764                        test_id_dict['rate'], line_id[1], line_id[0])
765
766                metric_name = metric_tag + '.avg_sensitivity'
767                metric_value = numpy.nanmean(line_results['sensitivity'])
768                self.testclass_metric_logger.add_metric(
769                    metric_name, metric_value)
770                self.log.info(("Average Sensitivity for {}: {:.1f}").format(
771                    metric_tag, metric_value))
772            current_context = (
773                context.get_current_context().get_full_output_path())
774            output_file_path = os.path.join(current_context,
775                                            str(test_id_str) + '.html')
776            curr_plot.generate_figure(output_file_path)
777            plots.append(curr_plot)
778        output_file_path = os.path.join(current_context, 'results.html')
779        wputils.BokehFigure.save_figures(plots, output_file_path)
780
781    def get_start_atten(self, testcase_params):
782        """Gets the starting attenuation for this sensitivity test.
783
784        The function gets the starting attenuation by checking whether a test
785        at the same rate configuration has executed. If so it sets the starting
786        point a configurable number of dBs below the reference test.
787
788        Returns:
789            start_atten: starting attenuation for current test
790        """
791        # Get the current and reference test config. The reference test is the
792        # one performed at the current MCS+1
793        ref_test_params = self.extract_test_id(
794            testcase_params,
795            ['channel', 'mode', 'rate', 'num_streams', 'chain_mask'])
796        # Check if reference test has been run and set attenuation accordingly
797        previous_params = [
798            self.extract_test_id(
799                result['testcase_params'],
800                ['channel', 'mode', 'rate', 'num_streams', 'chain_mask'])
801            for result in self.testclass_results
802        ]
803        try:
804            ref_index = previous_params[::-1].index(ref_test_params)
805            ref_index = len(previous_params) - 1 - ref_index
806            start_atten = self.testclass_results[ref_index][
807                'atten_at_range'] - (
808                    self.testclass_params['adjacent_mcs_range_gap'])
809        except ValueError:
810            print('Reference test not found. Starting from {} dB'.format(
811                self.testclass_params['atten_start']))
812            start_atten = self.testclass_params['atten_start']
813        start_atten = max(start_atten, 0)
814        return start_atten
815
816    def generate_test_cases(self, channels, modes, requested_rates, chain_mask,
817                            angles):
818        """Function that auto-generates test cases for a test class."""
819        test_cases = []
820        for channel in channels:
821            requested_modes = [
822                mode for mode in modes
823                if mode in self.VALID_TEST_CONFIGS[channel]
824            ]
825            for chain, mode in itertools.product(chain_mask, requested_modes):
826                if 'VHT' in mode:
827                    valid_rates = self.VALID_RATES[mode]
828                elif 'HT' in mode:
829                    valid_rates = self.VALID_RATES[mode]
830                elif 'legacy' in mode and channel < 14:
831                    valid_rates = self.VALID_RATES['legacy_2GHz']
832                elif 'legacy' in mode and channel > 14:
833                    valid_rates = self.VALID_RATES['legacy_5GHz']
834                else:
835                    raise ValueError('Invalid test mode.')
836                for rate, angle in itertools.product(valid_rates, angles):
837                    testcase_params = collections.OrderedDict(
838                        channel=channel,
839                        mode=mode,
840                        rate=rate.mcs,
841                        num_streams=rate.streams,
842                        short_gi=1,
843                        chain_mask=chain,
844                        orientation=angle)
845                    if rate not in requested_rates:
846                        continue
847                    if str(chain) in ['0', '1'] and rate[1] == 2:
848                        # Do not test 2-stream rates in single chain mode
849                        continue
850                    if 'legacy' in mode:
851                        testcase_name = ('test_sensitivity_ch{}_{}_{}_nss{}'
852                                         '_ch{}_{}deg'.format(
853                                             channel, mode,
854                                             str(rate.mcs).replace('.', 'p'),
855                                             rate.streams, chain, angle))
856                    else:
857                        testcase_name = ('test_sensitivity_ch{}_{}_mcs{}_nss{}'
858                                         '_ch{}_{}deg'.format(
859                                             channel, mode, rate.mcs,
860                                             rate.streams, chain, angle))
861                    setattr(self, testcase_name,
862                            partial(self._test_sensitivity, testcase_params))
863                    test_cases.append(testcase_name)
864        return test_cases
865
866
867class WifiOtaSensitivity_TenDegree_Test(WifiOtaSensitivityTest):
868    def __init__(self, controllers):
869        WifiOtaSensitivityTest.__init__(self, controllers)
870        requested_channels = [6, 36, 149]
871        requested_rates = [
872            self.RateTuple(8, 1, 86.7),
873            self.RateTuple(2, 1, 21.7),
874            self.RateTuple(8, 2, 173.3),
875            self.RateTuple(2, 2, 43.3)
876        ]
877        self.tests = self.generate_test_cases(requested_channels,
878                                              ['VHT20', 'VHT80'],
879                                              requested_rates, ['2x2'],
880                                              list(range(0, 360, 10)))
881
882
883class WifiOtaSensitivity_PerChain_TenDegree_Test(WifiOtaSensitivityTest):
884    def __init__(self, controllers):
885        WifiOtaSensitivityTest.__init__(self, controllers)
886        requested_channels = [6, 36, 149]
887        requested_rates = [
888            self.RateTuple(2, 1, 21.7),
889            self.RateTuple(2, 2, 43.3)
890        ]
891        self.tests = self.generate_test_cases(requested_channels, ['VHT20'],
892                                              requested_rates,
893                                              ['0', '1', '2x2'],
894                                              list(range(0, 360, 10)))
895
896
897class WifiOtaSensitivity_ThirtyDegree_Test(WifiOtaSensitivityTest):
898    def __init__(self, controllers):
899        WifiOtaSensitivityTest.__init__(self, controllers)
900        requested_channels = [6, 36, 149]
901        requested_rates = [
902            self.RateTuple(9, 1, 96),
903            self.RateTuple(8, 1, 86.7),
904            self.RateTuple(7, 1, 72.2),
905            self.RateTuple(4, 1, 43.3),
906            self.RateTuple(2, 1, 21.7),
907            self.RateTuple(0, 1, 7.2),
908            self.RateTuple(9, 2, 192),
909            self.RateTuple(8, 2, 173.3),
910            self.RateTuple(7, 2, 144.4),
911            self.RateTuple(4, 2, 86.7),
912            self.RateTuple(2, 2, 43.3),
913            self.RateTuple(0, 2, 14.4)
914        ]
915        self.tests = self.generate_test_cases(requested_channels,
916                                              ['VHT20', 'VHT80'],
917                                              requested_rates, ['2x2'],
918                                              list(range(0, 360, 30)))
919
920
921class WifiOtaSensitivity_45Degree_Test(WifiOtaSensitivityTest):
922    def __init__(self, controllers):
923        WifiOtaSensitivityTest.__init__(self, controllers)
924        requested_rates = [
925            self.RateTuple(8, 1, 86.7),
926            self.RateTuple(2, 1, 21.7),
927            self.RateTuple(8, 2, 173.3),
928            self.RateTuple(2, 2, 43.3)
929        ]
930        self.tests = self.generate_test_cases(
931            [1, 6, 11, 36, 40, 44, 48, 149, 153, 157, 161], ['VHT20', 'VHT80'],
932            requested_rates, ['2x2'], list(range(0, 360, 45)))
933