1#!/usr/bin/env python3.4
2#
3#   Copyright 2017 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the 'License');
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an 'AS IS' BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import collections
18import itertools
19import json
20import logging
21import os
22import statistics
23from acts import asserts
24from acts import context
25from acts import base_test
26from acts import utils
27from acts.controllers.utils_lib import ssh
28from acts.metrics.loggers.blackbox import BlackboxMappedMetricLogger
29from acts_contrib.test_utils.wifi import ota_chamber
30from acts_contrib.test_utils.wifi import ota_sniffer
31from acts_contrib.test_utils.wifi import wifi_performance_test_utils as wputils
32from acts_contrib.test_utils.wifi import wifi_retail_ap as retail_ap
33from acts_contrib.test_utils.wifi import wifi_test_utils as wutils
34from functools import partial
35
36
37class WifiPingTest(base_test.BaseTestClass):
38    """Class for ping-based Wifi performance tests.
39
40    This class implements WiFi ping performance tests such as range and RTT.
    The class sets up the AP in the desired configurations, configures
    and connects the phone to the AP, and runs the ping tests. For an example
    config file to run this test class see
    example_connectivity_performance_ap_sta.json.
44    """
45
    # Margin added on top of the total ping time when sizing the sniffer
    # capture duration in run_ping_test.
    TEST_TIMEOUT = 10
    # Polling interval handed to the RSSI poller during each ping run.
    RSSI_POLL_INTERVAL = 0.2
    # Generic sleep durations; not referenced in the visible part of this
    # file (presumably seconds -- TODO confirm usage).
    SHORT_SLEEP = 1
    MED_SLEEP = 5
    # Number of consecutive disconnected / 100%-loss attenuation points after
    # which run_ping_test stops the sweep early.
    MAX_CONSECUTIVE_ZEROS = 5
    # Placeholder entry used to pad ping_results for the attenuation points
    # that are skipped when a sweep stops early.
    DISCONNECTED_PING_RESULT = {
        'connected': 0,
        'rtt': [],
        'time_stamp': [],
        'ping_interarrivals': [],
        'packet_loss_percentage': 100
    }
58
59    def __init__(self, controllers):
60        base_test.BaseTestClass.__init__(self, controllers)
61        self.testcase_metric_logger = (
62            BlackboxMappedMetricLogger.for_test_case())
63        self.testclass_metric_logger = (
64            BlackboxMappedMetricLogger.for_test_class())
65        self.publish_testcase_metrics = True
66
67    def setup_class(self):
68        self.dut = self.android_devices[-1]
69        req_params = [
70            'ping_test_params', 'testbed_params', 'main_network',
71            'RetailAccessPoints', 'RemoteServer'
72        ]
73        opt_params = ['OTASniffer']
74        self.unpack_userparams(req_params, opt_params)
75        self.testclass_params = self.ping_test_params
76        self.num_atten = self.attenuators[0].instrument.num_atten
77        self.ping_server = ssh.connection.SshConnection(
78            ssh.settings.from_config(self.RemoteServer[0]['ssh_config']))
79        self.access_point = retail_ap.create(self.RetailAccessPoints)[0]
80        if hasattr(self,
81                   'OTASniffer') and self.testbed_params['sniffer_enable']:
82            self.sniffer = ota_sniffer.create(self.OTASniffer)[0]
83        self.log.info('Access Point Configuration: {}'.format(
84            self.access_point.ap_settings))
85        self.log_path = os.path.join(logging.log_path, 'results')
86        os.makedirs(self.log_path, exist_ok=True)
87        self.atten_dut_chain_map = {}
88        self.testclass_results = []
89
90        # Turn WiFi ON
91        if self.testclass_params.get('airplane_mode', 1):
92            self.log.info('Turning on airplane mode.')
93            asserts.assert_true(utils.force_airplane_mode(self.dut, True),
94                                'Can not turn on airplane mode.')
95        wutils.wifi_toggle_state(self.dut, True)
96
97        # Configure test retries
98        self.user_params['retry_tests'] = [self.__class__.__name__]
99
100    def teardown_class(self):
101        # Turn WiFi OFF and reset AP
102        for dev in self.android_devices:
103            wutils.wifi_toggle_state(dev, False)
104        self.process_testclass_results()
105
106    def setup_test(self):
107        self.retry_flag = False
108
109    def teardown_test(self):
110        self.retry_flag = False
111
112    def on_retry(self):
113        """Function to control test logic on retried tests.
114
115        This function is automatically executed on tests that are being
116        retried. In this case the function resets wifi, toggles it off and on
117        and sets a retry_flag to enable further tweaking the test logic on
118        second attempts.
119        """
120        self.retry_flag = True
121        for dev in self.android_devices:
122            wutils.reset_wifi(dev)
123            wutils.toggle_wifi_off_and_on(dev)
124
125    def process_testclass_results(self):
126        """Saves all test results to enable comparison."""
127        testclass_summary = {}
128        for test in self.testclass_results:
129            if 'range' in test['test_name']:
130                testclass_summary[test['test_name']] = test['range']
131        # Save results
132        results_file_path = os.path.join(self.log_path,
133                                         'testclass_summary.json')
134        with open(results_file_path, 'w') as results_file:
135            json.dump(testclass_summary, results_file, indent=4)
136
137    def pass_fail_check_ping_rtt(self, result):
138        """Check the test result and decide if it passed or failed.
139
140        The function computes RTT statistics and fails any tests in which the
141        tail of the ping latency results exceeds the threshold defined in the
142        configuration file.
143
144        Args:
145            result: dict containing ping results and other meta data
146        """
147        ignored_fraction = (self.testclass_params['rtt_ignored_interval'] /
148                            self.testclass_params['rtt_ping_duration'])
149        sorted_rtt = [
150            sorted(x['rtt'][round(ignored_fraction * len(x['rtt'])):])
151            for x in result['ping_results']
152        ]
153        disconnected = any([len(x) == 0 for x in sorted_rtt])
154        if disconnected:
155            asserts.fail('Test failed. DUT disconnected at least once.')
156
157        rtt_at_test_percentile = [
158            x[int((1 - self.testclass_params['rtt_test_percentile'] / 100) *
159                  len(x))] for x in sorted_rtt
160        ]
161        # Set blackbox metric
162        if self.publish_testcase_metrics:
163            self.testcase_metric_logger.add_metric('ping_rtt',
164                                                   max(rtt_at_test_percentile))
165        # Evaluate test pass/fail
166        rtt_failed = any([
167            rtt > self.testclass_params['rtt_threshold'] * 1000
168            for rtt in rtt_at_test_percentile
169        ])
170        if rtt_failed:
171            #TODO: figure out how to cleanly exclude RTT tests from retry
172            asserts.explicit_pass(
173                'Test failed. RTTs at test percentile = {}'.format(
174                    rtt_at_test_percentile))
175        else:
176            asserts.explicit_pass(
177                'Test Passed. RTTs at test percentile = {}'.format(
178                    rtt_at_test_percentile))
179
180    def pass_fail_check_ping_range(self, result):
181        """Check the test result and decide if it passed or failed.
182
183        Checks whether the attenuation at which ping packet losses begin to
184        exceed the threshold matches the range derived from golden
185        rate-vs-range result files. The test fails is ping range is
186        range_gap_threshold worse than RvR range.
187
188        Args:
189            result: dict containing ping results and meta data
190        """
191        # Evaluate test pass/fail
192        test_message = ('Attenuation at range is {}dB. '
193                        'LLStats at Range: {}'.format(
194                            result['range'], result['llstats_at_range']))
195        if result['peak_throughput_pct'] < 95:
196            asserts.fail('(RESULT NOT RELIABLE) {}'.format(test_message))
197
198        # If pass, set Blackbox metric
199        if self.publish_testcase_metrics:
200            self.testcase_metric_logger.add_metric('ping_range',
201                                                   result['range'])
202        asserts.explicit_pass(test_message)
203
204    def pass_fail_check(self, result):
205        if 'range' in result['testcase_params']['test_type']:
206            self.pass_fail_check_ping_range(result)
207        else:
208            self.pass_fail_check_ping_rtt(result)
209
210    def process_ping_results(self, testcase_params, ping_range_result):
211        """Saves and plots ping results.
212
213        Args:
214            ping_range_result: dict containing ping results and metadata
215        """
216        # Compute range
217        ping_loss_over_att = [
218            x['packet_loss_percentage']
219            for x in ping_range_result['ping_results']
220        ]
221        ping_loss_above_threshold = [
222            x > self.testclass_params['range_ping_loss_threshold']
223            for x in ping_loss_over_att
224        ]
225        for idx in range(len(ping_loss_above_threshold)):
226            if all(ping_loss_above_threshold[idx:]):
227                range_index = max(idx, 1) - 1
228                break
229        else:
230            range_index = -1
231        ping_range_result['atten_at_range'] = testcase_params['atten_range'][
232            range_index]
233        ping_range_result['peak_throughput_pct'] = 100 - min(
234            ping_loss_over_att)
235        ping_range_result['range'] = (ping_range_result['atten_at_range'] +
236                                      ping_range_result['fixed_attenuation'])
237        ping_range_result['llstats_at_range'] = (
238            'TX MCS = {0} ({1:.1f}%). '
239            'RX MCS = {2} ({3:.1f}%)'.format(
240                ping_range_result['llstats'][range_index]['summary']
241                ['common_tx_mcs'], ping_range_result['llstats'][range_index]
242                ['summary']['common_tx_mcs_freq'] * 100,
243                ping_range_result['llstats'][range_index]['summary']
244                ['common_rx_mcs'], ping_range_result['llstats'][range_index]
245                ['summary']['common_rx_mcs_freq'] * 100))
246
247        # Save results
248        results_file_path = os.path.join(
249            self.log_path, '{}.json'.format(self.current_test_name))
250        with open(results_file_path, 'w') as results_file:
251            json.dump(ping_range_result, results_file, indent=4)
252
253        # Plot results
254        if 'range' not in self.current_test_name:
255            figure = wputils.BokehFigure(
256                self.current_test_name,
257                x_label='Timestamp (s)',
258                primary_y_label='Round Trip Time (ms)')
259            for idx, result in enumerate(ping_range_result['ping_results']):
260                if len(result['rtt']) > 1:
261                    x_data = [
262                        t - result['time_stamp'][0]
263                        for t in result['time_stamp']
264                    ]
265                    figure.add_line(
266                        x_data, result['rtt'], 'RTT @ {}dB'.format(
267                            ping_range_result['attenuation'][idx]))
268
269            output_file_path = os.path.join(
270                self.log_path, '{}.html'.format(self.current_test_name))
271            figure.generate_figure(output_file_path)
272
    def run_ping_test(self, testcase_params):
        """Runs ping at every attenuation of the sweep and collects results.

        For each value in testcase_params['atten_range'] the function sets
        all attenuators, starts RSSI polling in the background, runs
        wputils.get_ping_stats from the ping server towards the DUT and
        records RSSI, ping and incremental link layer stats. The sweep stops
        early once MAX_CONSECUTIVE_ZEROS consecutive points were disconnected
        or had 100% packet loss; in that case ping_results is padded with
        DISCONNECTED_PING_RESULT up to the length of the attenuation range.
        AP and DUT configuration are not done here.

        Args:
            testcase_params: dict containing all test parameters
        Returns:
            test_result: dict containing ping results and other meta data
        """
        # Prepare results dict
        llstats_obj = wputils.LinkLayerStats(
            self.dut, self.testclass_params.get('llstats_enabled', True))
        test_result = collections.OrderedDict()
        test_result['testcase_params'] = testcase_params.copy()
        test_result['test_name'] = self.current_test_name
        test_result['ap_config'] = self.access_point.ap_settings.copy()
        test_result['attenuation'] = testcase_params['atten_range']
        # Fixed attenuation is looked up by the channel number as a string
        test_result['fixed_attenuation'] = self.testbed_params[
            'fixed_attenuation'][str(testcase_params['channel'])]
        test_result['rssi_results'] = []
        test_result['ping_results'] = []
        test_result['llstats'] = []
        # Setup sniffer
        # NOTE(review): self.sniffer is only created in setup_class when the
        # optional 'OTASniffer' param is present -- confirm configs always
        # provide it when sniffer_enable is set.
        if self.testbed_params['sniffer_enable']:
            self.sniffer.start_capture(
                testcase_params['test_network'],
                chan=int(testcase_params['channel']),
                bw=testcase_params['bandwidth'],
                duration=testcase_params['ping_duration'] *
                len(testcase_params['atten_range']) + self.TEST_TIMEOUT)
        # Run ping and sweep attenuation as needed
        zero_counter = 0
        for atten in testcase_params['atten_range']:
            for attenuator in self.attenuators:
                attenuator.set_atten(atten, strict=False)
            # Non-blocking RSSI polling; result is collected after the ping.
            # Arguments are presumably (number of measurements, polling
            # interval, initial delay) -- TODO confirm against wputils.
            rssi_future = wputils.get_connected_rssi_nb(
                self.dut,
                int(testcase_params['ping_duration'] / 2 /
                    self.RSSI_POLL_INTERVAL), self.RSSI_POLL_INTERVAL,
                testcase_params['ping_duration'] / 2)
            # Refresh link layer stats
            llstats_obj.update_stats()
            current_ping_stats = wputils.get_ping_stats(
                self.ping_server, self.dut_ip,
                testcase_params['ping_duration'],
                testcase_params['ping_interval'], testcase_params['ping_size'])
            current_rssi = rssi_future.result()
            test_result['rssi_results'].append(current_rssi)
            # Second refresh so the incremental stats cover this ping run
            llstats_obj.update_stats()
            curr_llstats = llstats_obj.llstats_incremental.copy()
            test_result['llstats'].append(curr_llstats)
            if current_ping_stats['connected']:
                self.log.info(
                    'Attenuation = {0}dB\tPacket Loss = {1}%\t'
                    'Avg RTT = {2:.2f}ms\tRSSI = {3} [{4},{5}]\t'.format(
                        atten, current_ping_stats['packet_loss_percentage'],
                        statistics.mean(current_ping_stats['rtt']),
                        current_rssi['signal_poll_rssi']['mean'],
                        current_rssi['chain_0_rssi']['mean'],
                        current_rssi['chain_1_rssi']['mean']))
                # Count consecutive fully-lost points; any success resets
                if current_ping_stats['packet_loss_percentage'] == 100:
                    zero_counter = zero_counter + 1
                else:
                    zero_counter = 0
            else:
                self.log.info(
                    'Attenuation = {}dB. Disconnected.'.format(atten))
                zero_counter = zero_counter + 1
            test_result['ping_results'].append(current_ping_stats.as_dict())
            if zero_counter == self.MAX_CONSECUTIVE_ZEROS:
                self.log.info('Ping loss stable at 100%. Stopping test now.')
                # Pad only ping_results; rssi_results and llstats keep one
                # entry per attenuation actually measured. Every padded entry
                # is the same class-level dict object, not a copy.
                for idx in range(
                        len(testcase_params['atten_range']) -
                        len(test_result['ping_results'])):
                    test_result['ping_results'].append(
                        self.DISCONNECTED_PING_RESULT)
                break
        if self.testbed_params['sniffer_enable']:
            self.sniffer.stop_capture()
        return test_result
355
356    def setup_ap(self, testcase_params):
357        """Sets up the access point in the configuration required by the test.
358
359        Args:
360            testcase_params: dict containing AP and other test params
361        """
362        band = self.access_point.band_lookup_by_channel(
363            testcase_params['channel'])
364        if '2G' in band:
365            frequency = wutils.WifiEnums.channel_2G_to_freq[
366                testcase_params['channel']]
367        else:
368            frequency = wutils.WifiEnums.channel_5G_to_freq[
369                testcase_params['channel']]
370        if frequency in wutils.WifiEnums.DFS_5G_FREQUENCIES:
371            self.access_point.set_region(self.testbed_params['DFS_region'])
372        else:
373            self.access_point.set_region(self.testbed_params['default_region'])
374        self.access_point.set_channel(band, testcase_params['channel'])
375        self.access_point.set_bandwidth(band, testcase_params['mode'])
376        if 'low' in testcase_params['ap_power']:
377            self.log.info('Setting low AP power.')
378            self.access_point.set_power(
379                band, self.testclass_params['low_ap_tx_power'])
380        self.log.info('Access Point Configuration: {}'.format(
381            self.access_point.ap_settings))
382
383    def setup_dut(self, testcase_params):
384        """Sets up the DUT in the configuration required by the test.
385
386        Args:
387            testcase_params: dict containing AP and other test params
388        """
389        # Check battery level before test
390        if not wputils.health_check(self.dut, 10):
391            asserts.skip('Battery level too low. Skipping test.')
392        # Turn screen off to preserve battery
393        self.dut.go_to_sleep()
394        if wputils.validate_network(self.dut,
395                                    testcase_params['test_network']['SSID']):
396            self.log.info('Already connected to desired network')
397        else:
398            wutils.reset_wifi(self.dut)
399            wutils.set_wifi_country_code(self.dut,
400                                         self.testclass_params['country_code'])
401            testcase_params['test_network']['channel'] = testcase_params[
402                'channel']
403            wutils.wifi_connect(self.dut,
404                                testcase_params['test_network'],
405                                num_of_tries=5,
406                                check_connectivity=True)
407        self.dut_ip = self.dut.droid.connectivityGetIPv4Addresses('wlan0')[0]
408        if testcase_params['channel'] not in self.atten_dut_chain_map.keys():
409            self.atten_dut_chain_map[testcase_params[
410                'channel']] = wputils.get_current_atten_dut_chain_map(
411                    self.attenuators, self.dut, self.ping_server)
412        self.log.info('Current Attenuator-DUT Chain Map: {}'.format(
413            self.atten_dut_chain_map[testcase_params['channel']]))
414        for idx, atten in enumerate(self.attenuators):
415            if self.atten_dut_chain_map[testcase_params['channel']][
416                    idx] == testcase_params['attenuated_chain']:
417                atten.offset = atten.instrument.max_atten
418            else:
419                atten.offset = 0
420
421    def setup_ping_test(self, testcase_params):
422        """Function that gets devices ready for the test.
423
424        Args:
425            testcase_params: dict containing test-specific parameters
426        """
427        # Configure AP
428        self.setup_ap(testcase_params)
429        # Set attenuator to 0 dB
430        for attenuator in self.attenuators:
431            attenuator.set_atten(0, strict=False)
432        # Reset, configure, and connect DUT
433        self.setup_dut(testcase_params)
434
435    def get_range_start_atten(self, testcase_params):
436        """Gets the starting attenuation for this ping test.
437
438        This function is used to get the starting attenuation for ping range
439        tests. This implementation returns the default starting attenuation,
440        however, defining this function enables a more involved configuration
441        for over-the-air test classes.
442
443        Args:
444            testcase_params: dict containing all test params
445        """
446        return self.testclass_params['range_atten_start']
447
448    def compile_test_params(self, testcase_params):
449        band = self.access_point.band_lookup_by_channel(
450            testcase_params['channel'])
451        testcase_params['test_network'] = self.main_network[band]
452        if testcase_params['chain_mask'] in ['0', '1']:
453            testcase_params['attenuated_chain'] = 'DUT-Chain-{}'.format(
454                1 if testcase_params['chain_mask'] == '0' else 0)
455        else:
456            # Set attenuated chain to -1. Do not set to None as this will be
457            # compared to RF chain map which may include None
458            testcase_params['attenuated_chain'] = -1
459        if testcase_params['test_type'] == 'test_ping_range':
460            testcase_params.update(
461                ping_interval=self.testclass_params['range_ping_interval'],
462                ping_duration=self.testclass_params['range_ping_duration'],
463                ping_size=self.testclass_params['ping_size'],
464            )
465        elif testcase_params['test_type'] == 'test_fast_ping_rtt':
466            testcase_params.update(
467                ping_interval=self.testclass_params['rtt_ping_interval']
468                ['fast'],
469                ping_duration=self.testclass_params['rtt_ping_duration'],
470                ping_size=self.testclass_params['ping_size'],
471            )
472        elif testcase_params['test_type'] == 'test_slow_ping_rtt':
473            testcase_params.update(
474                ping_interval=self.testclass_params['rtt_ping_interval']
475                ['slow'],
476                ping_duration=self.testclass_params['rtt_ping_duration'],
477                ping_size=self.testclass_params['ping_size'])
478
479        if testcase_params['test_type'] == 'test_ping_range':
480            start_atten = self.get_range_start_atten(testcase_params)
481            num_atten_steps = int(
482                (self.testclass_params['range_atten_stop'] - start_atten) /
483                self.testclass_params['range_atten_step'])
484            testcase_params['atten_range'] = [
485                start_atten + x * self.testclass_params['range_atten_step']
486                for x in range(0, num_atten_steps)
487            ]
488        else:
489            testcase_params['atten_range'] = self.testclass_params[
490                'rtt_test_attenuation']
491        return testcase_params
492
493    def _test_ping(self, testcase_params):
494        """ Function that gets called for each range test case
495
496        The function gets called in each range test case. It customizes the
497        range test based on the test name of the test that called it
498
499        Args:
500            testcase_params: dict containing preliminary set of parameters
501        """
502        # Compile test parameters from config and test name
503        testcase_params = self.compile_test_params(testcase_params)
504        # Run ping test
505        self.setup_ping_test(testcase_params)
506        ping_result = self.run_ping_test(testcase_params)
507        # Postprocess results
508        self.process_ping_results(testcase_params, ping_result)
509        self.testclass_results.append(ping_result)
510        self.pass_fail_check(ping_result)
511
512    def generate_test_cases(self, ap_power, channels, modes, chain_mask,
513                            test_types):
514        """Function that auto-generates test cases for a test class."""
515        test_cases = []
516        allowed_configs = {
517            20: [
518                1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 36, 40, 44, 48, 64, 100,
519                116, 132, 140, 149, 153, 157, 161
520            ],
521            40: [36, 44, 100, 149, 157],
522            80: [36, 100, 149],
523            160: [36]
524        }
525
526        for channel, mode, chain, test_type in itertools.product(
527                channels, modes, chain_mask, test_types):
528            bandwidth = int(''.join([x for x in mode if x.isdigit()]))
529            if channel not in allowed_configs[bandwidth]:
530                continue
531            testcase_name = '{}_ch{}_{}_ch{}'.format(test_type, channel, mode,
532                                                     chain)
533            testcase_params = collections.OrderedDict(test_type=test_type,
534                                                      ap_power=ap_power,
535                                                      channel=channel,
536                                                      mode=mode,
537                                                      bandwidth=bandwidth,
538                                                      chain_mask=chain)
539            setattr(self, testcase_name,
540                    partial(self._test_ping, testcase_params))
541            test_cases.append(testcase_name)
542        return test_cases
543
544
class WifiPing_TwoChain_Test(WifiPingTest):
    """Range and RTT ping tests using the '2x2' chain mask."""
    def __init__(self, controllers):
        super().__init__(controllers)
        test_channels = [1, 6, 11, 36, 40, 44, 48, 149, 153, 157, 161]
        self.tests = self.generate_test_cases(
            ap_power='standard',
            channels=test_channels,
            modes=['bw20', 'bw40', 'bw80'],
            chain_mask=['2x2'],
            test_types=[
                'test_ping_range', 'test_fast_ping_rtt', 'test_slow_ping_rtt'
            ])
556
557
class WifiPing_PerChainRange_Test(WifiPingTest):
    """Ping range tests for chain masks '0', '1' and '2x2'."""
    def __init__(self, controllers):
        super().__init__(controllers)
        test_channels = [1, 6, 11, 36, 40, 44, 48, 149, 153, 157, 161]
        self.tests = self.generate_test_cases(
            ap_power='standard',
            channels=test_channels,
            modes=['bw20', 'bw40', 'bw80'],
            chain_mask=['0', '1', '2x2'],
            test_types=['test_ping_range'])
567
568
class WifiPing_LowPowerAP_Test(WifiPingTest):
    """Ping range tests run with the 'low_power' AP power setting."""
    def __init__(self, controllers):
        super().__init__(controllers)
        test_channels = [1, 6, 11, 36, 40, 44, 48, 149, 153, 157, 161]
        self.tests = self.generate_test_cases(
            ap_power='low_power',
            channels=test_channels,
            modes=['bw20', 'bw40', 'bw80'],
            chain_mask=['0', '1', '2x2'],
            test_types=['test_ping_range'])
578
579
# Over-the-air version of ping tests
581class WifiOtaPingTest(WifiPingTest):
582    """Class to test over-the-air ping
583
584    This class tests WiFi ping performance in an OTA chamber. It enables
585    setting turntable orientation and other chamber parameters to study
586    performance in varying channel conditions
587    """
588    def __init__(self, controllers):
589        base_test.BaseTestClass.__init__(self, controllers)
590        self.testcase_metric_logger = (
591            BlackboxMappedMetricLogger.for_test_case())
592        self.testclass_metric_logger = (
593            BlackboxMappedMetricLogger.for_test_class())
594        self.publish_testcase_metrics = False
595
596    def setup_class(self):
597        WifiPingTest.setup_class(self)
598        self.ota_chamber = ota_chamber.create(
599            self.user_params['OTAChamber'])[0]
600
601    def teardown_class(self):
602        WifiPingTest.teardown_class(self)
603        self.process_testclass_results()
604        self.ota_chamber.reset_chamber()
605
606    def process_testclass_results(self):
607        """Saves all test results to enable comparison."""
608        WifiPingTest.process_testclass_results(self)
609
610        range_vs_angle = collections.OrderedDict()
611        for test in self.testclass_results:
612            curr_params = test['testcase_params']
613            curr_config = curr_params['channel']
614            if curr_config in range_vs_angle:
615                if curr_params['position'] not in range_vs_angle[curr_config][
616                        'position']:
617                    range_vs_angle[curr_config]['position'].append(
618                        curr_params['position'])
619                    range_vs_angle[curr_config]['range'].append(test['range'])
620                    range_vs_angle[curr_config]['llstats_at_range'].append(
621                        test['llstats_at_range'])
622                else:
623                    range_vs_angle[curr_config]['range'][-1] = test['range']
624                    range_vs_angle[curr_config]['llstats_at_range'][-1] = test[
625                        'llstats_at_range']
626            else:
627                range_vs_angle[curr_config] = {
628                    'position': [curr_params['position']],
629                    'range': [test['range']],
630                    'llstats_at_range': [test['llstats_at_range']]
631                }
632        chamber_mode = self.testclass_results[0]['testcase_params'][
633            'chamber_mode']
634        if chamber_mode == 'orientation':
635            x_label = 'Angle (deg)'
636        elif chamber_mode == 'stepped stirrers':
637            x_label = 'Position Index'
638        figure = wputils.BokehFigure(
639            title='Range vs. Position',
640            x_label=x_label,
641            primary_y_label='Range (dB)',
642        )
643        for channel, channel_data in range_vs_angle.items():
644            figure.add_line(x_data=channel_data['position'],
645                            y_data=channel_data['range'],
646                            hover_text=channel_data['llstats_at_range'],
647                            legend='Channel {}'.format(channel))
648            average_range = sum(channel_data['range']) / len(
649                channel_data['range'])
650            self.log.info('Average range for Channel {} is: {}dB'.format(
651                channel, average_range))
652            metric_name = 'ota_summary_ch{}.avg_range'.format(channel)
653            self.testclass_metric_logger.add_metric(metric_name, average_range)
654        current_context = context.get_current_context().get_full_output_path()
655        plot_file_path = os.path.join(current_context, 'results.html')
656        figure.generate_figure(plot_file_path)
657
658        # Save results
659        results_file_path = os.path.join(current_context,
660                                         'testclass_summary.json')
661        with open(results_file_path, 'w') as results_file:
662            json.dump(range_vs_angle, results_file, indent=4)
663
664    def setup_ping_test(self, testcase_params):
665        WifiPingTest.setup_ping_test(self, testcase_params)
666        # Setup turntable
667        if testcase_params['chamber_mode'] == 'orientation':
668            self.ota_chamber.set_orientation(testcase_params['position'])
669        elif testcase_params['chamber_mode'] == 'stepped stirrers':
670            self.ota_chamber.step_stirrers(testcase_params['total_positions'])
671
672    def extract_test_id(self, testcase_params, id_fields):
673        test_id = collections.OrderedDict(
674            (param, testcase_params[param]) for param in id_fields)
675        return test_id
676
677    def get_range_start_atten(self, testcase_params):
678        """Gets the starting attenuation for this ping test.
679
680        The function gets the starting attenuation by checking whether a test
681        at the same configuration has executed. If so it sets the starting
682        point a configurable number of dBs below the reference test.
683
684        Returns:
685            start_atten: starting attenuation for current test
686        """
687        # If the test is being retried, start from the beginning
688        if self.retry_flag:
689            self.log.info('Retry flag set. Setting attenuation to minimum.')
690            return self.testclass_params['range_atten_start']
691        # Get the current and reference test config. The reference test is the
692        # one performed at the current MCS+1
693        ref_test_params = self.extract_test_id(testcase_params,
694                                               ['channel', 'mode'])
695        # Check if reference test has been run and set attenuation accordingly
696        previous_params = [
697            self.extract_test_id(result['testcase_params'],
698                                 ['channel', 'mode'])
699            for result in self.testclass_results
700        ]
701        try:
702            ref_index = previous_params[::-1].index(ref_test_params)
703            ref_index = len(previous_params) - 1 - ref_index
704            start_atten = self.testclass_results[ref_index][
705                'atten_at_range'] - (
706                    self.testclass_params['adjacent_range_test_gap'])
707        except ValueError:
708            self.log.info(
709                'Reference test not found. Starting from {} dB'.format(
710                    self.testclass_params['range_atten_start']))
711            start_atten = self.testclass_params['range_atten_start']
712        return start_atten
713
714    def generate_test_cases(self, ap_power, channels, modes, chamber_mode,
715                            positions):
716        test_cases = []
717        allowed_configs = {
718            20: [
719                1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 36, 40, 44, 48, 64, 100,
720                116, 132, 140, 149, 153, 157, 161
721            ],
722            40: [36, 44, 100, 149, 157],
723            80: [36, 100, 149],
724            160: [36]
725        }
726        for channel, mode, position in itertools.product(
727                channels, modes, positions):
728            bandwidth = int(''.join([x for x in mode if x.isdigit()]))
729            if channel not in allowed_configs[bandwidth]:
730                continue
731            testcase_name = 'test_ping_range_ch{}_{}_pos{}'.format(
732                channel, mode, position)
733            testcase_params = collections.OrderedDict(
734                test_type='test_ping_range',
735                ap_power=ap_power,
736                channel=channel,
737                mode=mode,
738                bandwidth=bandwidth,
739                chain_mask='2x2',
740                chamber_mode=chamber_mode,
741                total_positions=len(positions),
742                position=position)
743            setattr(self, testcase_name,
744                    partial(self._test_ping, testcase_params))
745            test_cases.append(testcase_name)
746        return test_cases
747
748
class WifiOtaPing_TenDegree_Test(WifiOtaPingTest):
    """OTA ping range tests at standard AP power, orientation step of 10.

    Covers channels 6, 36 and 149 in bw20 mode at orientation positions
    0, 10, ..., 350.
    """

    def __init__(self, controllers):
        super().__init__(controllers)
        orientations = list(range(0, 360, 10))
        self.tests = self.generate_test_cases(ap_power='standard',
                                              channels=[6, 36, 149],
                                              modes=['bw20'],
                                              chamber_mode='orientation',
                                              positions=orientations)
758
759
class WifiOtaPing_45Degree_Test(WifiOtaPingTest):
    """OTA ping range tests at standard AP power, orientation step of 45.

    Covers channels 1, 6, 11, 36, 40, 44, 48, 149, 153, 157 and 161 in bw20
    mode at orientation positions 0, 45, ..., 315.
    """

    def __init__(self, controllers):
        super().__init__(controllers)
        test_channels = [1, 6, 11, 36, 40, 44, 48, 149, 153, 157, 161]
        orientations = list(range(0, 360, 45))
        self.tests = self.generate_test_cases(ap_power='standard',
                                              channels=test_channels,
                                              modes=['bw20'],
                                              chamber_mode='orientation',
                                              positions=orientations)
769
770
class WifiOtaPing_SteppedStirrers_Test(WifiOtaPingTest):
    """OTA ping range tests at standard AP power using stepped stirrers.

    Covers channels 6, 36 and 149 in bw20 mode over 100 positions
    (0 through 99) in 'stepped stirrers' chamber mode.
    """

    def __init__(self, controllers):
        super().__init__(controllers)
        stirrer_positions = list(range(100))
        self.tests = self.generate_test_cases(ap_power='standard',
                                              channels=[6, 36, 149],
                                              modes=['bw20'],
                                              chamber_mode='stepped stirrers',
                                              positions=stirrer_positions)
779
780
class WifiOtaPing_LowPowerAP_TenDegree_Test(WifiOtaPingTest):
    """OTA ping range tests at low AP power, orientation step of 10.

    Covers channels 6, 36 and 149 in bw20 mode at orientation positions
    0, 10, ..., 350.
    """

    def __init__(self, controllers):
        super().__init__(controllers)
        orientations = list(range(0, 360, 10))
        self.tests = self.generate_test_cases(ap_power='low_power',
                                              channels=[6, 36, 149],
                                              modes=['bw20'],
                                              chamber_mode='orientation',
                                              positions=orientations)
790
791
class WifiOtaPing_LowPowerAP_45Degree_Test(WifiOtaPingTest):
    """OTA ping range tests at low AP power, orientation step of 45.

    Covers channels 1, 6, 11, 36, 40, 44, 48, 149, 153, 157 and 161 in bw20
    mode at orientation positions 0, 45, ..., 315.
    """

    def __init__(self, controllers):
        super().__init__(controllers)
        test_channels = [1, 6, 11, 36, 40, 44, 48, 149, 153, 157, 161]
        orientations = list(range(0, 360, 45))
        self.tests = self.generate_test_cases(ap_power='low_power',
                                              channels=test_channels,
                                              modes=['bw20'],
                                              chamber_mode='orientation',
                                              positions=orientations)
801
802
class WifiOtaPing_LowPowerAP_SteppedStirrers_Test(WifiOtaPingTest):
    """OTA ping range tests at low AP power using stepped stirrers.

    Covers channels 6, 36 and 149 in bw20 mode over 100 positions
    (0 through 99) in 'stepped stirrers' chamber mode.
    """

    def __init__(self, controllers):
        super().__init__(controllers)
        stirrer_positions = list(range(100))
        self.tests = self.generate_test_cases(ap_power='low_power',
                                              channels=[6, 36, 149],
                                              modes=['bw20'],
                                              chamber_mode='stepped stirrers',
                                              positions=stirrer_positions)