1#!/usr/bin/env python3.4
2#
3#   Copyright 2017 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the 'License');
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an 'AS IS' BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import collections
18import csv
19import itertools
20import logging
21import numpy
22import os
23from acts import asserts
24from acts import context
25from acts import base_test
26from acts import utils
27from acts.controllers import iperf_client
28from acts.controllers.utils_lib import ssh
29from acts.metrics.loggers.blackbox import BlackboxMappedMetricLogger
30from acts_contrib.test_utils.wifi import ota_chamber
31from acts_contrib.test_utils.wifi import wifi_performance_test_utils as wputils
32from acts_contrib.test_utils.wifi import wifi_test_utils as wutils
33from acts_contrib.test_utils.wifi import wifi_retail_ap as retail_ap
34from functools import partial
35from WifiRvrTest import WifiRvrTest
36from WifiPingTest import WifiPingTest
37
38
39class WifiSensitivityTest(WifiRvrTest, WifiPingTest):
40    """Class to test WiFi sensitivity tests.
41
42    This class implements measures WiFi sensitivity per rate. It heavily
43    leverages the WifiRvrTest class and introduced minor differences to set
44    specific rates and the access point, and implements a different pass/fail
45    check. For an example config file to run this test class see
46    example_connectivity_performance_ap_sta.json.
47    """
48
49    RSSI_POLL_INTERVAL = 0.2
50    VALID_TEST_CONFIGS = {
51        1: ['legacy', 'VHT20'],
52        2: ['legacy', 'VHT20'],
53        6: ['legacy', 'VHT20'],
54        10: ['legacy', 'VHT20'],
55        11: ['legacy', 'VHT20'],
56        36: ['legacy', 'VHT20', 'VHT40', 'VHT80'],
57        40: ['legacy', 'VHT20'],
58        44: ['legacy', 'VHT20'],
59        48: ['legacy', 'VHT20'],
60        149: ['legacy', 'VHT20', 'VHT40', 'VHT80'],
61        153: ['legacy', 'VHT20'],
62        157: ['legacy', 'VHT20'],
63        161: ['legacy', 'VHT20']
64    }
65    RateTuple = collections.namedtuple(('RateTuple'),
66                                       ['mcs', 'streams', 'data_rate'])
67    #yapf:disable
68    VALID_RATES = {
69        'legacy_2GHz': [
70            RateTuple(54, 1, 54), RateTuple(48, 1, 48),
71            RateTuple(36, 1, 36), RateTuple(24, 1, 24),
72            RateTuple(18, 1, 18), RateTuple(12, 1, 12),
73            RateTuple(11, 1, 11), RateTuple(9, 1, 9),
74            RateTuple(6, 1, 6), RateTuple(5.5, 1, 5.5),
75            RateTuple(2, 1, 2), RateTuple(1, 1, 1)],
76        'legacy_5GHz': [
77            RateTuple(54, 1, 54), RateTuple(48, 1, 48),
78            RateTuple(36, 1, 36), RateTuple(24, 1, 24),
79            RateTuple(18, 1, 18), RateTuple(12, 1, 12),
80            RateTuple(9, 1, 9), RateTuple(6, 1, 6)],
81        'HT20': [
82            RateTuple(7, 1, 72.2), RateTuple(6, 1, 65),
83            RateTuple(5, 1, 57.8), RateTuple(4, 1, 43.3),
84            RateTuple(3, 1, 26), RateTuple(2, 1, 21.7),
85            RateTuple(1, 1, 14.4), RateTuple(0, 1, 7.2),
86            RateTuple(15, 2, 144.4), RateTuple(14, 2, 130),
87            RateTuple(13, 2, 115.6), RateTuple(12, 2, 86.7),
88            RateTuple(11, 2, 57.8), RateTuple(10, 2, 43.4),
89            RateTuple(9, 2, 28.9), RateTuple(8, 2, 14.4)],
90        'VHT20': [
91            RateTuple(9, 1, 96), RateTuple(8, 1, 86.7),
92            RateTuple(7, 1, 72.2), RateTuple(6, 1, 65),
93            RateTuple(5, 1, 57.8), RateTuple(4, 1, 43.3),
94            RateTuple(3, 1, 28.9), RateTuple(2, 1, 21.7),
95            RateTuple(1, 1, 14.4), RateTuple(0, 1, 7.2),
96            RateTuple(9, 2, 192), RateTuple(8, 2, 173.3),
97            RateTuple(7, 2, 144.4), RateTuple(6, 2, 130.3),
98            RateTuple(5, 2, 115.6), RateTuple(4, 2, 86.7),
99            RateTuple(3, 2, 57.8), RateTuple(2, 2, 43.3),
100            RateTuple(1, 2, 28.9), RateTuple(0, 2, 14.4)],
101        'VHT40': [
102            RateTuple(9, 1, 96), RateTuple(8, 1, 86.7),
103            RateTuple(7, 1, 72.2), RateTuple(6, 1, 65),
104            RateTuple(5, 1, 57.8), RateTuple(4, 1, 43.3),
105            RateTuple(3, 1, 28.9), RateTuple(2, 1, 21.7),
106            RateTuple(1, 1, 14.4), RateTuple(0, 1, 7.2),
107            RateTuple(9, 2, 192), RateTuple(8, 2, 173.3),
108            RateTuple(7, 2, 144.4), RateTuple(6, 2, 130.3),
109            RateTuple(5, 2, 115.6), RateTuple(4, 2, 86.7),
110            RateTuple(3, 2, 57.8), RateTuple(2, 2, 43.3),
111            RateTuple(1, 2, 28.9), RateTuple(0, 2, 14.4)],
112        'VHT80': [
113            RateTuple(9, 1, 96), RateTuple(8, 1, 86.7),
114            RateTuple(7, 1, 72.2), RateTuple(6, 1, 65),
115            RateTuple(5, 1, 57.8), RateTuple(4, 1, 43.3),
116            RateTuple(3, 1, 28.9), RateTuple(2, 1, 21.7),
117            RateTuple(1, 1, 14.4), RateTuple(0, 1, 7.2),
118            RateTuple(9, 2, 192), RateTuple(8, 2, 173.3),
119            RateTuple(7, 2, 144.4), RateTuple(6, 2, 130.3),
120            RateTuple(5, 2, 115.6), RateTuple(4, 2, 86.7),
121            RateTuple(3, 2, 57.8), RateTuple(2, 2, 43.3),
122            RateTuple(1, 2, 28.9), RateTuple(0, 2, 14.4)],
123    }
124    #yapf:enable
125
126    def __init__(self, controllers):
127        base_test.BaseTestClass.__init__(self, controllers)
128        self.testcase_metric_logger = (
129            BlackboxMappedMetricLogger.for_test_case())
130        self.testclass_metric_logger = (
131            BlackboxMappedMetricLogger.for_test_class())
132        self.publish_testcase_metrics = True
133
134    def setup_class(self):
135        """Initializes common test hardware and parameters.
136
137        This function initializes hardwares and compiles parameters that are
138        common to all tests in this class.
139        """
140        self.dut = self.android_devices[-1]
141        req_params = [
142            'RetailAccessPoints', 'sensitivity_test_params', 'testbed_params',
143            'RemoteServer'
144        ]
145        opt_params = ['main_network']
146        self.unpack_userparams(req_params, opt_params)
147        self.testclass_params = self.sensitivity_test_params
148        self.num_atten = self.attenuators[0].instrument.num_atten
149        self.ping_server = ssh.connection.SshConnection(
150            ssh.settings.from_config(self.RemoteServer[0]['ssh_config']))
151        self.iperf_server = self.iperf_servers[0]
152        self.iperf_client = self.iperf_clients[0]
153        self.access_point = retail_ap.create(self.RetailAccessPoints)[0]
154        self.log.info('Access Point Configuration: {}'.format(
155            self.access_point.ap_settings))
156        self.log_path = os.path.join(logging.log_path, 'results')
157        os.makedirs(self.log_path, exist_ok=True)
158        self.atten_dut_chain_map = {}
159        self.testclass_results = []
160
161        # Turn WiFi ON
162        if self.testclass_params.get('airplane_mode', 1):
163            self.log.info('Turning on airplane mode.')
164            asserts.assert_true(utils.force_airplane_mode(self.dut, True),
165                                'Can not turn on airplane mode.')
166        wutils.wifi_toggle_state(self.dut, True)
167
168        # Configure test retries
169        self.user_params['retry_tests'] = [self.__class__.__name__]
170
171    def teardown_class(self):
172        # Turn WiFi OFF
173        for dev in self.android_devices:
174            wutils.wifi_toggle_state(dev, False)
175        self.process_testclass_results()
176
177    def setup_test(self):
178        self.retry_flag = False
179
180    def teardown_test(self):
181        self.retry_flag = False
182
183    def on_retry(self):
184        """Function to control test logic on retried tests.
185
186        This function is automatically executed on tests that are being
187        retried. In this case the function resets wifi, toggles it off and on
188        and sets a retry_flag to enable further tweaking the test logic on
189        second attempts.
190        """
191        self.retry_flag = True
192        for dev in self.android_devices:
193            wutils.reset_wifi(dev)
194            wutils.toggle_wifi_off_and_on(dev)
195
196    def pass_fail_check(self, result):
197        """Checks sensitivity results and decides on pass/fail.
198
199        Args:
200            result: dict containing attenuation, throughput and other meta
201                data
202        """
203        result_string = ('Throughput = {}%, Sensitivity = {}.'.format(
204            result['peak_throughput_pct'], result['sensitivity']))
205        if result['peak_throughput_pct'] < 95:
206            asserts.fail('Result unreliable. {}'.format(result_string))
207        else:
208            asserts.explicit_pass('Test Passed. {}'.format(result_string))
209
210    def process_testclass_results(self):
211        """Saves and plots test results from all executed test cases."""
212        # write json output
213        testclass_results_dict = collections.OrderedDict()
214        id_fields = ['mode', 'rate', 'num_streams', 'chain_mask']
215        channels_tested = []
216        for result in self.testclass_results:
217            testcase_params = result['testcase_params']
218            test_id = self.extract_test_id(testcase_params, id_fields)
219            test_id = tuple(test_id.items())
220            if test_id not in testclass_results_dict:
221                testclass_results_dict[test_id] = collections.OrderedDict()
222            channel = testcase_params['channel']
223            if channel not in channels_tested:
224                channels_tested.append(channel)
225            if result['peak_throughput_pct'] >= 95:
226                testclass_results_dict[test_id][channel] = result[
227                    'sensitivity']
228            else:
229                testclass_results_dict[test_id][channel] = ''
230
231        # calculate average metrics
232        metrics_dict = collections.OrderedDict()
233        id_fields = ['channel', 'mode', 'num_streams', 'chain_mask']
234        for test_id in testclass_results_dict.keys():
235            for channel in testclass_results_dict[test_id].keys():
236                metric_tag = collections.OrderedDict(test_id, channel=channel)
237                metric_tag = self.extract_test_id(metric_tag, id_fields)
238                metric_tag = tuple(metric_tag.items())
239                metrics_dict.setdefault(metric_tag, [])
240                sensitivity_result = testclass_results_dict[test_id][channel]
241                if sensitivity_result != '':
242                    metrics_dict[metric_tag].append(sensitivity_result)
243        for metric_tag_tuple, metric_data in metrics_dict.items():
244            metric_tag_dict = collections.OrderedDict(metric_tag_tuple)
245            metric_tag = 'ch{}_{}_nss{}_chain{}'.format(
246                metric_tag_dict['channel'], metric_tag_dict['mode'],
247                metric_tag_dict['num_streams'], metric_tag_dict['chain_mask'])
248            metric_key = '{}.avg_sensitivity'.format(metric_tag)
249            metric_value = numpy.nanmean(metric_data)
250            self.testclass_metric_logger.add_metric(metric_key, metric_value)
251
252        # write csv
253        csv_header = ['Mode', 'MCS', 'Streams', 'Chain', 'Rate (Mbps)']
254        for channel in channels_tested:
255            csv_header.append('Ch. ' + str(channel))
256        results_file_path = os.path.join(self.log_path, 'results.csv')
257        with open(results_file_path, mode='w') as csv_file:
258            writer = csv.DictWriter(csv_file, fieldnames=csv_header)
259            writer.writeheader()
260            for test_id, test_results in testclass_results_dict.items():
261                test_id_dict = dict(test_id)
262                if 'legacy' in test_id_dict['mode']:
263                    rate_list = self.VALID_RATES['legacy_2GHz']
264                else:
265                    rate_list = self.VALID_RATES[test_id_dict['mode']]
266                data_rate = next(rate.data_rate for rate in rate_list
267                                 if rate[:-1] == (test_id_dict['rate'],
268                                                  test_id_dict['num_streams']))
269                row_value = {
270                    'Mode': test_id_dict['mode'],
271                    'MCS': test_id_dict['rate'],
272                    'Streams': test_id_dict['num_streams'],
273                    'Chain': test_id_dict['chain_mask'],
274                    'Rate (Mbps)': data_rate,
275                }
276                for channel in channels_tested:
277                    row_value['Ch. ' + str(channel)] = test_results.pop(
278                        channel, ' ')
279                writer.writerow(row_value)
280
281        if not self.testclass_params['traffic_type'].lower() == 'ping':
282            WifiRvrTest.process_testclass_results(self)
283
284    def process_rvr_test_results(self, testcase_params, rvr_result):
285        """Post processes RvR results to compute sensitivity.
286
287        Takes in the results of the RvR tests and computes the sensitivity of
288        the current rate by looking at the point at which throughput drops
289        below the percentage specified in the config file. The function then
290        calls on its parent class process_test_results to plot the result.
291
292        Args:
293            rvr_result: dict containing attenuation, throughput and other meta
294            data
295        """
296        rvr_result['peak_throughput'] = max(rvr_result['throughput_receive'])
297        rvr_result['peak_throughput_pct'] = 100
298        throughput_check = [
299            throughput < rvr_result['peak_throughput'] *
300            (self.testclass_params['throughput_pct_at_sensitivity'] / 100)
301            for throughput in rvr_result['throughput_receive']
302        ]
303        consistency_check = [
304            idx for idx in range(len(throughput_check))
305            if all(throughput_check[idx:])
306        ]
307        rvr_result['atten_at_range'] = rvr_result['attenuation'][
308            consistency_check[0] - 1]
309        rvr_result['range'] = rvr_result['fixed_attenuation'] + (
310            rvr_result['atten_at_range'])
311        rvr_result['sensitivity'] = self.testclass_params['ap_tx_power'] + (
312            self.testbed_params['ap_tx_power_offset'][str(
313                testcase_params['channel'])] - rvr_result['range'])
314        WifiRvrTest.process_test_results(self, rvr_result)
315
316    def process_ping_test_results(self, testcase_params, ping_result):
317        """Post processes RvR results to compute sensitivity.
318
319        Takes in the results of the RvR tests and computes the sensitivity of
320        the current rate by looking at the point at which throughput drops
321        below the percentage specified in the config file. The function then
322        calls on its parent class process_test_results to plot the result.
323
324        Args:
325            rvr_result: dict containing attenuation, throughput and other meta
326            data
327        """
328        WifiPingTest.process_ping_results(self, testcase_params, ping_result)
329        ping_result['sensitivity'] = self.testclass_params['ap_tx_power'] + (
330            self.testbed_params['ap_tx_power_offset'][str(
331                testcase_params['channel'])] - ping_result['range'])
332
333    def setup_sensitivity_test(self, testcase_params):
334        if testcase_params['traffic_type'].lower() == 'ping':
335            self.setup_ping_test(testcase_params)
336            self.run_sensitivity_test = self.run_ping_test
337            self.process_sensitivity_test_results = (
338                self.process_ping_test_results)
339        else:
340            self.setup_rvr_test(testcase_params)
341            self.run_sensitivity_test = self.run_rvr_test
342            self.process_sensitivity_test_results = (
343                self.process_rvr_test_results)
344
345    def setup_ap(self, testcase_params):
346        """Sets up the AP and attenuator to compensate for AP chain imbalance.
347
348        Args:
349            testcase_params: dict containing AP and other test params
350        """
351        band = self.access_point.band_lookup_by_channel(
352            testcase_params['channel'])
353        if '2G' in band:
354            frequency = wutils.WifiEnums.channel_2G_to_freq[
355                testcase_params['channel']]
356        else:
357            frequency = wutils.WifiEnums.channel_5G_to_freq[
358                testcase_params['channel']]
359        if frequency in wutils.WifiEnums.DFS_5G_FREQUENCIES:
360            self.access_point.set_region(self.testbed_params['DFS_region'])
361        else:
362            self.access_point.set_region(self.testbed_params['default_region'])
363        self.access_point.set_channel(band, testcase_params['channel'])
364        self.access_point.set_bandwidth(band, testcase_params['mode'])
365        self.access_point.set_power(band, testcase_params['ap_tx_power'])
366        self.access_point.set_rate(band, testcase_params['mode'],
367                                   testcase_params['num_streams'],
368                                   testcase_params['rate'],
369                                   testcase_params['short_gi'])
370        # Set attenuator offsets and set attenuators to initial condition
371        atten_offsets = self.testbed_params['chain_offset'][str(
372            testcase_params['channel'])]
373        for atten in self.attenuators:
374            if 'AP-Chain-0' in atten.path:
375                atten.offset = atten_offsets[0]
376            elif 'AP-Chain-1' in atten.path:
377                atten.offset = atten_offsets[1]
378            else:
379                atten.offset = 0
380        self.log.info('Access Point Configuration: {}'.format(
381            self.access_point.ap_settings))
382
383    def setup_dut(self, testcase_params):
384        """Sets up the DUT in the configuration required by the test.
385
386        Args:
387            testcase_params: dict containing AP and other test params
388        """
389        # Check battery level before test
390        if not wputils.health_check(self.dut, 10):
391            asserts.skip('Battery level too low. Skipping test.')
392        # Turn screen off to preserve battery
393        self.dut.go_to_sleep()
394        if wputils.validate_network(self.dut,
395                                    testcase_params['test_network']['SSID']):
396            self.log.info('Already connected to desired network')
397        else:
398            wutils.reset_wifi(self.dut)
399            wutils.set_wifi_country_code(self.dut,
400                                         self.testclass_params['country_code'])
401            testcase_params['test_network']['channel'] = testcase_params[
402                'channel']
403            wutils.wifi_connect(self.dut,
404                                testcase_params['test_network'],
405                                num_of_tries=5,
406                                check_connectivity=False)
407        self.dut_ip = self.dut.droid.connectivityGetIPv4Addresses('wlan0')[0]
408        # Activate/attenuate the correct chains
409        if testcase_params['channel'] not in self.atten_dut_chain_map.keys():
410            self.atten_dut_chain_map[testcase_params[
411                'channel']] = wputils.get_current_atten_dut_chain_map(
412                    self.attenuators, self.dut, self.ping_server)
413        self.log.info('Current Attenuator-DUT Chain Map: {}'.format(
414            self.atten_dut_chain_map[testcase_params['channel']]))
415        for idx, atten in enumerate(self.attenuators):
416            if self.atten_dut_chain_map[testcase_params['channel']][
417                    idx] == testcase_params['attenuated_chain']:
418                atten.offset = atten.instrument.max_atten
419
420    def extract_test_id(self, testcase_params, id_fields):
421        test_id = collections.OrderedDict(
422            (param, testcase_params[param]) for param in id_fields)
423        return test_id
424
425    def get_start_atten(self, testcase_params):
426        """Gets the starting attenuation for this sensitivity test.
427
428        The function gets the starting attenuation by checking whether a test
429        as the next higher MCS has been executed. If so it sets the starting
430        point a configurable number of dBs below the next MCS's sensitivity.
431
432        Returns:
433            start_atten: starting attenuation for current test
434        """
435        # If the test is being retried, start from the beginning
436        if self.retry_flag:
437            self.log.info('Retry flag set. Setting attenuation to minimum.')
438            return self.testclass_params['atten_start']
439        # Get the current and reference test config. The reference test is the
440        # one performed at the current MCS+1
441        current_rate = testcase_params['rate']
442        ref_test_params = self.extract_test_id(
443            testcase_params,
444            ['channel', 'mode', 'rate', 'num_streams', 'chain_mask'])
445        if 'legacy' in testcase_params['mode']:
446            if testcase_params['channel'] <= 13:
447                rate_list = self.VALID_RATES['legacy_2GHz']
448            else:
449                rate_list = self.VALID_RATES['legacy_5GHz']
450            ref_index = max(
451                0,
452                rate_list.index(self.RateTuple(current_rate, 1, current_rate))
453                - 1)
454            ref_test_params['rate'] = rate_list[ref_index].mcs
455        else:
456            ref_test_params['rate'] = current_rate + 1
457
458        # Check if reference test has been run and set attenuation accordingly
459        previous_params = [
460            self.extract_test_id(
461                result['testcase_params'],
462                ['channel', 'mode', 'rate', 'num_streams', 'chain_mask'])
463            for result in self.testclass_results
464        ]
465
466        try:
467            ref_index = previous_params.index(ref_test_params)
468            start_atten = self.testclass_results[ref_index][
469                'atten_at_range'] - (
470                    self.testclass_params['adjacent_mcs_range_gap'])
471        except ValueError:
472            self.log.warning(
473                'Reference test not found. Starting from {} dB'.format(
474                    self.testclass_params['atten_start']))
475            start_atten = self.testclass_params['atten_start']
476            start_atten = max(start_atten, 0)
477        return start_atten
478
479    def compile_test_params(self, testcase_params):
480        """Function that generates test params based on the test name."""
481        band = self.access_point.band_lookup_by_channel(
482            testcase_params['channel'])
483        testcase_params['test_network'] = self.main_network[band]
484        if testcase_params['chain_mask'] in ['0', '1']:
485            testcase_params['attenuated_chain'] = 'DUT-Chain-{}'.format(
486                1 if testcase_params['chain_mask'] == '0' else 0)
487        else:
488            # Set attenuated chain to -1. Do not set to None as this will be
489            # compared to RF chain map which may include None
490            testcase_params['attenuated_chain'] = -1
491
492        self.testclass_params[
493            'range_ping_loss_threshold'] = 100 - self.testclass_params[
494                'throughput_pct_at_sensitivity']
495        if self.testclass_params['traffic_type'] == 'UDP':
496            testcase_params['iperf_args'] = '-i 1 -t {} -J -u -b {}'.format(
497                self.testclass_params['iperf_duration'],
498                self.testclass_params['UDP_rates'][testcase_params['mode']])
499        elif self.testclass_params['traffic_type'] == 'TCP':
500            testcase_params['iperf_args'] = '-i 1 -t {} -J'.format(
501                self.testclass_params['iperf_duration'])
502
503        if self.testclass_params['traffic_type'] != 'ping' and isinstance(
504                self.iperf_client, iperf_client.IPerfClientOverAdb):
505            testcase_params['iperf_args'] += ' -R'
506            testcase_params['use_client_output'] = True
507        else:
508            testcase_params['use_client_output'] = False
509
510        return testcase_params
511
512    def _test_sensitivity(self, testcase_params):
513        """Function that gets called for each test case
514
515        The function gets called in each rvr test case. The function customizes
516        the rvr test based on the test name of the test that called it
517        """
518        # Compile test parameters from config and test name
519        testcase_params = self.compile_test_params(testcase_params)
520        testcase_params.update(self.testclass_params)
521        testcase_params['atten_start'] = self.get_start_atten(testcase_params)
522        num_atten_steps = int(
523            (testcase_params['atten_stop'] - testcase_params['atten_start']) /
524            testcase_params['atten_step'])
525        testcase_params['atten_range'] = [
526            testcase_params['atten_start'] + x * testcase_params['atten_step']
527            for x in range(0, num_atten_steps)
528        ]
529
530        # Prepare devices and run test
531        self.setup_sensitivity_test(testcase_params)
532        result = self.run_sensitivity_test(testcase_params)
533        self.process_sensitivity_test_results(testcase_params, result)
534
535        # Post-process results
536        self.testclass_results.append(result)
537        self.pass_fail_check(result)
538
539    def generate_test_cases(self, channels, modes, chain_mask):
540        """Function that auto-generates test cases for a test class."""
541        test_cases = []
542        for channel in channels:
543            requested_modes = [
544                mode for mode in modes
545                if mode in self.VALID_TEST_CONFIGS[channel]
546            ]
547            for mode in requested_modes:
548                bandwidth = int(''.join([x for x in mode if x.isdigit()]))
549                if 'VHT' in mode:
550                    rates = self.VALID_RATES[mode]
551                elif 'HT' in mode:
552                    rates = self.VALID_RATES[mode]
553                elif 'legacy' in mode and channel < 14:
554                    rates = self.VALID_RATES['legacy_2GHz']
555                elif 'legacy' in mode and channel > 14:
556                    rates = self.VALID_RATES['legacy_5GHz']
557                else:
558                    raise ValueError('Invalid test mode.')
559                for chain, rate in itertools.product(chain_mask, rates):
560                    testcase_params = collections.OrderedDict(
561                        channel=channel,
562                        mode=mode,
563                        bandwidth=bandwidth,
564                        rate=rate.mcs,
565                        num_streams=rate.streams,
566                        short_gi=1,
567                        chain_mask=chain)
568                    if chain in ['0', '1'] and rate[1] == 2:
569                        # Do not test 2-stream rates in single chain mode
570                        continue
571                    if 'legacy' in mode:
572                        testcase_name = ('test_sensitivity_ch{}_{}_{}_nss{}'
573                                         '_ch{}'.format(
574                                             channel, mode,
575                                             str(rate.mcs).replace('.', 'p'),
576                                             rate.streams, chain))
577                    else:
578                        testcase_name = ('test_sensitivity_ch{}_{}_mcs{}_nss{}'
579                                         '_ch{}'.format(
580                                             channel, mode, rate.mcs,
581                                             rate.streams, chain))
582                    setattr(self, testcase_name,
583                            partial(self._test_sensitivity, testcase_params))
584                    test_cases.append(testcase_name)
585        return test_cases
586
587
588class WifiSensitivity_AllChannels_Test(WifiSensitivityTest):
589    def __init__(self, controllers):
590        super().__init__(controllers)
591        self.tests = self.generate_test_cases(
592            [6, 36, 40, 44, 48, 149, 153, 157, 161],
593            ['VHT20', 'VHT40', 'VHT80'], ['0', '1', '2x2'])
594
595
596class WifiSensitivity_SampleChannels_Test(WifiSensitivityTest):
597    def __init__(self, controllers):
598        super().__init__(controllers)
599        self.tests = self.generate_test_cases([6, 36, 149],
600                                              ['VHT20', 'VHT40', 'VHT80'],
601                                              ['0', '1', '2x2'])
602
603
604class WifiSensitivity_2GHz_Test(WifiSensitivityTest):
605    def __init__(self, controllers):
606        super().__init__(controllers)
607        self.tests = self.generate_test_cases([1, 2, 6, 10, 11], ['VHT20'],
608                                              ['0', '1', '2x2'])
609
610
611class WifiSensitivity_5GHz_Test(WifiSensitivityTest):
612    def __init__(self, controllers):
613        super().__init__(controllers)
614        self.tests = self.generate_test_cases(
615            [36, 40, 44, 48, 149, 153, 157, 161], ['VHT20', 'VHT40', 'VHT80'],
616            ['0', '1', '2x2'])
617
618
619class WifiSensitivity_UNII1_Test(WifiSensitivityTest):
620    def __init__(self, controllers):
621        super().__init__(controllers)
622        self.tests = self.generate_test_cases([36, 40, 44, 48],
623                                              ['VHT20', 'VHT40', 'VHT80'],
624                                              ['0', '1', '2x2'])
625
626
627class WifiSensitivity_UNII3_Test(WifiSensitivityTest):
628    def __init__(self, controllers):
629        super().__init__(controllers)
630        self.tests = self.generate_test_cases([149, 153, 157, 161],
631                                              ['VHT20', 'VHT40', 'VHT80'],
632                                              ['0', '1', '2x2'])
633
634
635# Over-the air version of senstivity tests
636class WifiOtaSensitivityTest(WifiSensitivityTest):
637    """Class to test over-the-air senstivity.
638
639    This class implements measures WiFi sensitivity tests in an OTA chamber.
640    It allows setting orientation and other chamber parameters to study
641    performance in varying channel conditions
642    """
643    def __init__(self, controllers):
644        base_test.BaseTestClass.__init__(self, controllers)
645        self.testcase_metric_logger = (
646            BlackboxMappedMetricLogger.for_test_case())
647        self.testclass_metric_logger = (
648            BlackboxMappedMetricLogger.for_test_class())
649        self.publish_testcase_metrics = False
650
651    def setup_class(self):
652        WifiSensitivityTest.setup_class(self)
653        self.current_chain_mask = '2x2'
654        self.ota_chamber = ota_chamber.create(
655            self.user_params['OTAChamber'])[0]
656
657    def teardown_class(self):
658        WifiSensitivityTest.teardown_class(self)
659        self.ota_chamber.reset_chamber()
660
661    def setup_sensitivity_test(self, testcase_params):
662        # Setup turntable
663        self.ota_chamber.set_orientation(testcase_params['orientation'])
664        # Continue test setup
665        WifiSensitivityTest.setup_sensitivity_test(self, testcase_params)
666
667    def setup_dut(self, testcase_params):
668        """Sets up the DUT in the configuration required by the test.
669
670        Args:
671            testcase_params: dict containing AP and other test params
672        """
673        # Configure the right INI settings
674        if testcase_params['chain_mask'] != self.current_chain_mask:
675            self.log.info('Updating WiFi chain mask to: {}'.format(
676                testcase_params['chain_mask']))
677            self.current_chain_mask = testcase_params['chain_mask']
678            if testcase_params['chain_mask'] in ['0', '1']:
679                wputils.set_ini_single_chain_mode(
680                    self.dut, int(testcase_params['chain_mask']))
681            else:
682                wputils.set_ini_two_chain_mode(self.dut)
683        # Check battery level before test
684        if not wputils.health_check(self.dut, 10):
685            asserts.skip('Battery level too low. Skipping test.')
686        # Turn screen off to preserve battery
687        self.dut.go_to_sleep()
688        if wputils.validate_network(self.dut,
689                                    testcase_params['test_network']['SSID']):
690            self.log.info('Already connected to desired network')
691        else:
692            wutils.reset_wifi(self.dut)
693            wutils.set_wifi_country_code(self.dut,
694                                         self.testclass_params['country_code'])
695            testcase_params['test_network']['channel'] = testcase_params[
696                'channel']
697            wutils.wifi_connect(self.dut,
698                                testcase_params['test_network'],
699                                num_of_tries=5,
700                                check_connectivity=False)
701        self.dut_ip = self.dut.droid.connectivityGetIPv4Addresses('wlan0')[0]
702
703    def process_testclass_results(self):
704        """Saves and plots test results from all executed test cases."""
705        testclass_results_dict = collections.OrderedDict()
706        id_fields = ['channel', 'mode', 'rate']
707        plots = []
708        for result in self.testclass_results:
709            test_id = self.extract_test_id(result['testcase_params'],
710                                           id_fields)
711            test_id = tuple(test_id.items())
712            chain_mask = result['testcase_params']['chain_mask']
713            num_streams = result['testcase_params']['num_streams']
714            line_id = (chain_mask, num_streams)
715            if test_id not in testclass_results_dict:
716                testclass_results_dict[test_id] = collections.OrderedDict()
717            if line_id not in testclass_results_dict[test_id]:
718                testclass_results_dict[test_id][line_id] = {
719                    'orientation': [],
720                    'sensitivity': []
721                }
722            orientation = result['testcase_params']['orientation']
723            if result['peak_throughput_pct'] >= 95:
724                sensitivity = result['sensitivity']
725            else:
726                sensitivity = float('nan')
727            if orientation not in testclass_results_dict[test_id][line_id][
728                    'orientation']:
729                testclass_results_dict[test_id][line_id]['orientation'].append(
730                    orientation)
731                testclass_results_dict[test_id][line_id]['sensitivity'].append(
732                    sensitivity)
733            else:
734                testclass_results_dict[test_id][line_id]['sensitivity'][
735                    -1] = sensitivity
736
737        for test_id, test_data in testclass_results_dict.items():
738            test_id_dict = dict(test_id)
739            if 'legacy' in test_id_dict['mode']:
740                test_id_str = 'Channel {} - {} {}Mbps'.format(
741                    test_id_dict['channel'], test_id_dict['mode'],
742                    test_id_dict['rate'])
743            else:
744                test_id_str = 'Channel {} - {} MCS{}'.format(
745                    test_id_dict['channel'], test_id_dict['mode'],
746                    test_id_dict['rate'])
747            curr_plot = wputils.BokehFigure(
748                title=str(test_id_str),
749                x_label='Orientation (deg)',
750                primary_y_label='Sensitivity (dBm)')
751            for line_id, line_results in test_data.items():
752                curr_plot.add_line(line_results['orientation'],
753                                   line_results['sensitivity'],
754                                   legend='Nss{} - Chain Mask {}'.format(
755                                       line_id[1], line_id[0]),
756                                   marker='circle')
757                if 'legacy' in test_id_dict['mode']:
758                    metric_tag = 'ota_summary_ch{}_{}_{}_ch{}'.format(
759                        test_id_dict['channel'], test_id_dict['mode'],
760                        test_id_dict['rate'], line_id[0])
761                else:
762                    metric_tag = 'ota_summary_ch{}_{}_mcs{}_nss{}_ch{}'.format(
763                        test_id_dict['channel'], test_id_dict['mode'],
764                        test_id_dict['rate'], line_id[1], line_id[0])
765
766                metric_name = metric_tag + '.avg_sensitivity'
767                metric_value = numpy.nanmean(line_results['sensitivity'])
768                self.testclass_metric_logger.add_metric(
769                    metric_name, metric_value)
770                self.log.info(('Average Sensitivity for {}: {:.1f}').format(
771                    metric_tag, metric_value))
772            current_context = (
773                context.get_current_context().get_full_output_path())
774            output_file_path = os.path.join(current_context,
775                                            str(test_id_str) + '.html')
776            curr_plot.generate_figure(output_file_path)
777            plots.append(curr_plot)
778        output_file_path = os.path.join(current_context, 'results.html')
779        wputils.BokehFigure.save_figures(plots, output_file_path)
780
781    def get_start_atten(self, testcase_params):
782        """Gets the starting attenuation for this sensitivity test.
783
784        The function gets the starting attenuation by checking whether a test
785        at the same rate configuration has executed. If so it sets the starting
786        point a configurable number of dBs below the reference test.
787
788        Returns:
789            start_atten: starting attenuation for current test
790        """
791        # If the test is being retried, start from the beginning
792        if self.retry_flag:
793            self.log.info('Retry flag set. Setting attenuation to minimum.')
794            return self.testclass_params['atten_start']
795        # Get the current and reference test config. The reference test is the
796        # one performed at the current MCS+1
797        ref_test_params = self.extract_test_id(
798            testcase_params,
799            ['channel', 'mode', 'rate', 'num_streams', 'chain_mask'])
800        # Check if reference test has been run and set attenuation accordingly
801        previous_params = [
802            self.extract_test_id(
803                result['testcase_params'],
804                ['channel', 'mode', 'rate', 'num_streams', 'chain_mask'])
805            for result in self.testclass_results
806        ]
807        try:
808            ref_index = previous_params[::-1].index(ref_test_params)
809            ref_index = len(previous_params) - 1 - ref_index
810            start_atten = self.testclass_results[ref_index][
811                'atten_at_range'] - (
812                    self.testclass_params['adjacent_mcs_range_gap'])
813        except ValueError:
814            print('Reference test not found. Starting from {} dB'.format(
815                self.testclass_params['atten_start']))
816            start_atten = self.testclass_params['atten_start']
817        start_atten = max(start_atten, 0)
818        return start_atten
819
820    def generate_test_cases(self, channels, modes, requested_rates, chain_mask,
821                            angles):
822        """Function that auto-generates test cases for a test class."""
823        test_cases = []
824        for channel in channels:
825            requested_modes = [
826                mode for mode in modes
827                if mode in self.VALID_TEST_CONFIGS[channel]
828            ]
829            for chain, mode in itertools.product(chain_mask, requested_modes):
830                bandwidth = int(''.join([x for x in mode if x.isdigit()]))
831                if 'VHT' in mode:
832                    valid_rates = self.VALID_RATES[mode]
833                elif 'HT' in mode:
834                    valid_rates = self.VALID_RATES[mode]
835                elif 'legacy' in mode and channel < 14:
836                    valid_rates = self.VALID_RATES['legacy_2GHz']
837                elif 'legacy' in mode and channel > 14:
838                    valid_rates = self.VALID_RATES['legacy_5GHz']
839                else:
840                    raise ValueError('Invalid test mode.')
841                for rate, angle in itertools.product(valid_rates, angles):
842                    testcase_params = collections.OrderedDict(
843                        channel=channel,
844                        mode=mode,
845                        bandwidth=bandwidth,
846                        rate=rate.mcs,
847                        num_streams=rate.streams,
848                        short_gi=1,
849                        chain_mask=chain,
850                        orientation=angle)
851                    if rate not in requested_rates:
852                        continue
853                    if str(chain) in ['0', '1'] and rate[1] == 2:
854                        # Do not test 2-stream rates in single chain mode
855                        continue
856                    if 'legacy' in mode:
857                        testcase_name = ('test_sensitivity_ch{}_{}_{}_nss{}'
858                                         '_ch{}_{}deg'.format(
859                                             channel, mode,
860                                             str(rate.mcs).replace('.', 'p'),
861                                             rate.streams, chain, angle))
862                    else:
863                        testcase_name = ('test_sensitivity_ch{}_{}_mcs{}_nss{}'
864                                         '_ch{}_{}deg'.format(
865                                             channel, mode, rate.mcs,
866                                             rate.streams, chain, angle))
867                    setattr(self, testcase_name,
868                            partial(self._test_sensitivity, testcase_params))
869                    test_cases.append(testcase_name)
870        return test_cases
871
872
873class WifiOtaSensitivity_TenDegree_Test(WifiOtaSensitivityTest):
874    def __init__(self, controllers):
875        WifiOtaSensitivityTest.__init__(self, controllers)
876        requested_channels = [6, 36, 149]
877        requested_rates = [
878            self.RateTuple(8, 1, 86.7),
879            self.RateTuple(2, 1, 21.7),
880            self.RateTuple(8, 2, 173.3),
881            self.RateTuple(2, 2, 43.3)
882        ]
883        self.tests = self.generate_test_cases(requested_channels,
884                                              ['VHT20', 'VHT80'],
885                                              requested_rates, ['2x2'],
886                                              list(range(0, 360, 10)))
887
888
889class WifiOtaSensitivity_PerChain_TenDegree_Test(WifiOtaSensitivityTest):
890    def __init__(self, controllers):
891        WifiOtaSensitivityTest.__init__(self, controllers)
892        requested_channels = [6, 36, 149]
893        requested_rates = [
894            self.RateTuple(2, 1, 21.7),
895            self.RateTuple(2, 2, 43.3)
896        ]
897        self.tests = self.generate_test_cases(requested_channels, ['VHT20'],
898                                              requested_rates,
899                                              ['0', '1', '2x2'],
900                                              list(range(0, 360, 10)))
901
902
903class WifiOtaSensitivity_ThirtyDegree_Test(WifiOtaSensitivityTest):
904    def __init__(self, controllers):
905        WifiOtaSensitivityTest.__init__(self, controllers)
906        requested_channels = [6, 36, 149]
907        requested_rates = [
908            self.RateTuple(9, 1, 96),
909            self.RateTuple(8, 1, 86.7),
910            self.RateTuple(7, 1, 72.2),
911            self.RateTuple(4, 1, 43.3),
912            self.RateTuple(2, 1, 21.7),
913            self.RateTuple(0, 1, 7.2),
914            self.RateTuple(9, 2, 192),
915            self.RateTuple(8, 2, 173.3),
916            self.RateTuple(7, 2, 144.4),
917            self.RateTuple(4, 2, 86.7),
918            self.RateTuple(2, 2, 43.3),
919            self.RateTuple(0, 2, 14.4)
920        ]
921        self.tests = self.generate_test_cases(requested_channels,
922                                              ['VHT20', 'VHT80'],
923                                              requested_rates, ['2x2'],
924                                              list(range(0, 360, 30)))
925
926
927class WifiOtaSensitivity_45Degree_Test(WifiOtaSensitivityTest):
928    def __init__(self, controllers):
929        WifiOtaSensitivityTest.__init__(self, controllers)
930        requested_rates = [
931            self.RateTuple(8, 1, 86.7),
932            self.RateTuple(2, 1, 21.7),
933            self.RateTuple(8, 2, 173.3),
934            self.RateTuple(2, 2, 43.3)
935        ]
936        self.tests = self.generate_test_cases(
937            [1, 6, 11, 36, 40, 44, 48, 149, 153, 157, 161], ['VHT20', 'VHT80'],
938            requested_rates, ['2x2'], list(range(0, 360, 45)))
939