1#!/usr/bin/env python3.4
2#
3#   Copyright 2017 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the 'License');
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an 'AS IS' BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import collections
18import itertools
19import json
20import logging
21import os
22import statistics
23from acts import asserts
24from acts import context
25from acts import base_test
26from acts import utils
27from acts.controllers.utils_lib import ssh
28from acts.metrics.loggers.blackbox import BlackboxMappedMetricLogger
29from acts.test_utils.wifi import ota_chamber
30from acts.test_utils.wifi import ota_sniffer
31from acts.test_utils.wifi import wifi_performance_test_utils as wputils
32from acts.test_utils.wifi import wifi_retail_ap as retail_ap
33from acts.test_utils.wifi import wifi_test_utils as wutils
34from functools import partial
35
36
37class WifiPingTest(base_test.BaseTestClass):
38    """Class for ping-based Wifi performance tests.
39
40    This class implements WiFi ping performance tests such as range and RTT.
    The class sets up the AP in the desired configuration, configures
    and connects the phone to the AP, and runs ping tests. For an example
    config file to run this test class see
    example_connectivity_performance_ap_sta.json.
44    """
45
46    TEST_TIMEOUT = 10
47    RSSI_POLL_INTERVAL = 0.2
48    SHORT_SLEEP = 1
49    MED_SLEEP = 5
50    MAX_CONSECUTIVE_ZEROS = 5
51    DISCONNECTED_PING_RESULT = {
52        'connected': 0,
53        'rtt': [],
54        'time_stamp': [],
55        'ping_interarrivals': [],
56        'packet_loss_percentage': 100
57    }
58
59    def __init__(self, controllers):
60        base_test.BaseTestClass.__init__(self, controllers)
61        self.testcase_metric_logger = (
62            BlackboxMappedMetricLogger.for_test_case())
63        self.testclass_metric_logger = (
64            BlackboxMappedMetricLogger.for_test_class())
65        self.publish_testcase_metrics = True
66
67    def setup_class(self):
68        self.dut = self.android_devices[-1]
69        req_params = [
70            'ping_test_params', 'testbed_params', 'main_network',
71            'RetailAccessPoints', 'RemoteServer'
72        ]
73        opt_params = ['golden_files_list', 'OTASniffer']
74        self.unpack_userparams(req_params, opt_params)
75        self.testclass_params = self.ping_test_params
76        self.num_atten = self.attenuators[0].instrument.num_atten
77        self.ping_server = ssh.connection.SshConnection(
78            ssh.settings.from_config(self.RemoteServer[0]['ssh_config']))
79        self.access_point = retail_ap.create(self.RetailAccessPoints)[0]
80        if hasattr(self, 'OTASniffer'):
81            self.sniffer = ota_sniffer.create(self.OTASniffer)[0]
82        self.log.info('Access Point Configuration: {}'.format(
83            self.access_point.ap_settings))
84        self.log_path = os.path.join(logging.log_path, 'results')
85        os.makedirs(self.log_path, exist_ok=True)
86        if not hasattr(self, 'golden_files_list'):
87            self.golden_files_list = [
88                os.path.join(self.testbed_params['golden_results_path'], file)
89                for file in os.listdir(
90                    self.testbed_params['golden_results_path'])
91            ]
92        if hasattr(self, 'bdf'):
93            self.log.info('Pushing WiFi BDF to DUT.')
94            wputils.push_bdf(self.dut, self.bdf)
95        if hasattr(self, 'firmware'):
96            self.log.info('Pushing WiFi firmware to DUT.')
97            wlanmdsp = [
98                file for file in self.firmware if "wlanmdsp.mbn" in file
99            ][0]
100            data_msc = [file for file in self.firmware
101                        if "Data.msc" in file][0]
102            wputils.push_firmware(self.dut, wlanmdsp, data_msc)
103        self.atten_dut_chain_map = {}
104        self.testclass_results = []
105
106        # Turn WiFi ON
107        if self.testclass_params.get('airplane_mode', 1):
108            self.log.info('Turning on airplane mode.')
109            asserts.assert_true(utils.force_airplane_mode(self.dut, True),
110                                "Can not turn on airplane mode.")
111        wutils.wifi_toggle_state(self.dut, True)
112
113    def teardown_class(self):
114        # Turn WiFi OFF
115        for dev in self.android_devices:
116            wutils.wifi_toggle_state(dev, False)
117        self.process_testclass_results()
118
119    def process_testclass_results(self):
120        """Saves all test results to enable comparison."""
121        testclass_summary = {}
122        for test in self.testclass_results:
123            if 'range' in test['test_name']:
124                testclass_summary[test['test_name']] = test['range']
125        # Save results
126        results_file_path = os.path.join(self.log_path,
127                                         'testclass_summary.json')
128        with open(results_file_path, 'w') as results_file:
129            json.dump(testclass_summary, results_file, indent=4)
130
131    def pass_fail_check_ping_rtt(self, result):
132        """Check the test result and decide if it passed or failed.
133
134        The function computes RTT statistics and fails any tests in which the
135        tail of the ping latency results exceeds the threshold defined in the
136        configuration file.
137
138        Args:
139            result: dict containing ping results and other meta data
140        """
141        ignored_fraction = (self.testclass_params['rtt_ignored_interval'] /
142                            self.testclass_params['rtt_ping_duration'])
143        sorted_rtt = [
144            sorted(x['rtt'][round(ignored_fraction * len(x['rtt'])):])
145            for x in result['ping_results']
146        ]
147        disconnected = any([len(x) == 0 for x in sorted_rtt])
148        if disconnected:
149            asserts.fail('Test failed. DUT disconnected at least once.')
150
151        rtt_at_test_percentile = [
152            x[int((1 - self.testclass_params['rtt_test_percentile'] / 100) *
153                  len(x))] for x in sorted_rtt
154        ]
155        # Set blackbox metric
156        if self.publish_testcase_metrics:
157            self.testcase_metric_logger.add_metric('ping_rtt',
158                                                   max(rtt_at_test_percentile))
159        # Evaluate test pass/fail
160        rtt_failed = any([
161            rtt > self.testclass_params['rtt_threshold'] * 1000
162            for rtt in rtt_at_test_percentile
163        ])
164        if rtt_failed:
165            asserts.fail('Test failed. RTTs at test percentile = {}'.format(
166                rtt_at_test_percentile))
167        else:
168            asserts.explicit_pass(
169                'Test Passed. RTTs at test percentile = {}'.format(
170                    rtt_at_test_percentile))
171
172    def pass_fail_check_ping_range(self, result):
173        """Check the test result and decide if it passed or failed.
174
175        Checks whether the attenuation at which ping packet losses begin to
176        exceed the threshold matches the range derived from golden
177        rate-vs-range result files. The test fails is ping range is
178        range_gap_threshold worse than RvR range.
179
180        Args:
181            result: dict containing ping results and meta data
182        """
183        # Get target range
184        rvr_range = self.get_range_from_rvr()
185        # Set Blackbox metric
186        if self.publish_testcase_metrics:
187            self.testcase_metric_logger.add_metric('ping_range',
188                                                   result['range'])
189        # Evaluate test pass/fail
190        test_message = ('Attenuation at range is {}dB. Golden range is {}dB. '
191                        'LLStats at Range: {}'.format(
192                            result['range'], rvr_range,
193                            result['llstats_at_range']))
194        if result['range'] - rvr_range < -self.testclass_params[
195                'range_gap_threshold']:
196            asserts.fail(test_message)
197        else:
198            asserts.explicit_pass(test_message)
199
200    def pass_fail_check(self, result):
201        if 'range' in result['testcase_params']['test_type']:
202            self.pass_fail_check_ping_range(result)
203        else:
204            self.pass_fail_check_ping_rtt(result)
205
206    def process_ping_results(self, testcase_params, ping_range_result):
207        """Saves and plots ping results.
208
209        Args:
210            ping_range_result: dict containing ping results and metadata
211        """
212        # Compute range
213        ping_loss_over_att = [
214            x['packet_loss_percentage']
215            for x in ping_range_result['ping_results']
216        ]
217        ping_loss_above_threshold = [
218            x > self.testclass_params['range_ping_loss_threshold']
219            for x in ping_loss_over_att
220        ]
221        for idx in range(len(ping_loss_above_threshold)):
222            if all(ping_loss_above_threshold[idx:]):
223                range_index = max(idx, 1) - 1
224                break
225        else:
226            range_index = -1
227        ping_range_result['atten_at_range'] = testcase_params['atten_range'][
228            range_index]
229        ping_range_result['peak_throughput_pct'] = 100 - min(
230            ping_loss_over_att)
231        ping_range_result['range'] = (ping_range_result['atten_at_range'] +
232                                      ping_range_result['fixed_attenuation'])
233        ping_range_result['llstats_at_range'] = (
234            'TX MCS = {0} ({1:.1f}%). '
235            'RX MCS = {2} ({3:.1f}%)'.format(
236                ping_range_result['llstats'][range_index]['summary']
237                ['common_tx_mcs'], ping_range_result['llstats'][range_index]
238                ['summary']['common_tx_mcs_freq'] * 100,
239                ping_range_result['llstats'][range_index]['summary']
240                ['common_rx_mcs'], ping_range_result['llstats'][range_index]
241                ['summary']['common_rx_mcs_freq'] * 100))
242
243        # Save results
244        results_file_path = os.path.join(
245            self.log_path, '{}.json'.format(self.current_test_name))
246        with open(results_file_path, 'w') as results_file:
247            json.dump(ping_range_result, results_file, indent=4)
248
249        # Plot results
250        if 'range' not in self.current_test_name:
251            figure = wputils.BokehFigure(
252                self.current_test_name,
253                x_label='Timestamp (s)',
254                primary_y_label='Round Trip Time (ms)')
255            for idx, result in enumerate(ping_range_result['ping_results']):
256                if len(result['rtt']) > 1:
257                    x_data = [
258                        t - result['time_stamp'][0]
259                        for t in result['time_stamp']
260                    ]
261                    figure.add_line(
262                        x_data, result['rtt'], 'RTT @ {}dB'.format(
263                            ping_range_result['attenuation'][idx]))
264
265            output_file_path = os.path.join(
266                self.log_path, '{}.html'.format(self.current_test_name))
267            figure.generate_figure(output_file_path)
268
269    def get_range_from_rvr(self):
270        """Function gets range from RvR golden results
271
272        The function fetches the attenuation at which the RvR throughput goes
273        to zero.
274
275        Returns:
276            rvr_range: range derived from looking at rvr curves
277        """
278        # Fetch the golden RvR results
279        test_name = self.current_test_name
280        rvr_golden_file_name = 'test_rvr_TCP_DL_' + '_'.join(
281            test_name.split('_')[3:])
282        golden_path = [
283            file_name for file_name in self.golden_files_list
284            if rvr_golden_file_name in file_name
285        ]
286        if len(golden_path) == 0:
287            rvr_range = float('nan')
288            return rvr_range
289
290        # Get 0 Mbps attenuation and backoff by low_rssi_backoff_from_range
291        with open(golden_path[0], 'r') as golden_file:
292            golden_results = json.load(golden_file)
293        try:
294            atten_idx = golden_results['throughput_receive'].index(0)
295            rvr_range = (golden_results['attenuation'][atten_idx - 1] +
296                         golden_results['fixed_attenuation'])
297        except ValueError:
298            rvr_range = float('nan')
299        return rvr_range
300
    def run_ping_test(self, testcase_params):
        """Main function to test ping.

        The function sweeps the attenuators over
        testcase_params['atten_range'] and, at each attenuation, collects
        ping statistics, RSSI and incremental link layer stats. The AP and
        DUT are not configured here; self.dut_ip must already be set.

        The sweep stops early once MAX_CONSECUTIVE_ZEROS consecutive points
        were either disconnected or had 100% packet loss. In that case
        'ping_results' is padded with DISCONNECTED_PING_RESULT up to the
        length of the attenuation range; 'rssi_results' and 'llstats' are
        not padded.

        Args:
            testcase_params: dict containing all test parameters
        Returns:
            test_result: dict containing ping results and other meta data
        """
        # Prepare results dict
        llstats_obj = wputils.LinkLayerStats(self.dut)
        test_result = collections.OrderedDict()
        test_result['testcase_params'] = testcase_params.copy()
        test_result['test_name'] = self.current_test_name
        test_result['ap_config'] = self.access_point.ap_settings.copy()
        test_result['attenuation'] = testcase_params['atten_range']
        # Fixed attenuation is looked up per channel, keyed by channel string
        test_result['fixed_attenuation'] = self.testbed_params[
            'fixed_attenuation'][str(testcase_params['channel'])]
        test_result['rssi_results'] = []
        test_result['ping_results'] = []
        test_result['llstats'] = []
        # Setup sniffer
        # Capture duration covers the full sweep plus TEST_TIMEOUT
        if self.testbed_params['sniffer_enable']:
            self.sniffer.start_capture(
                testcase_params['test_network'],
                testcase_params['ping_duration'] *
                len(testcase_params['atten_range']) + self.TEST_TIMEOUT)
        # Run ping and sweep attenuation as needed
        # zero_counter counts consecutive points with no successful pings
        zero_counter = 0
        for atten in testcase_params['atten_range']:
            for attenuator in self.attenuators:
                attenuator.set_atten(atten, strict=False)
            # Start RSSI polling; its result is collected after the ping run
            rssi_future = wputils.get_connected_rssi_nb(
                self.dut,
                int(testcase_params['ping_duration'] / 2 /
                    self.RSSI_POLL_INTERVAL), self.RSSI_POLL_INTERVAL,
                testcase_params['ping_duration'] / 2)
            # Refresh link layer stats
            llstats_obj.update_stats()
            current_ping_stats = wputils.get_ping_stats(
                self.ping_server, self.dut_ip,
                testcase_params['ping_duration'],
                testcase_params['ping_interval'], testcase_params['ping_size'])
            current_rssi = rssi_future.result()
            test_result['rssi_results'].append(current_rssi)
            # Second update so the incremental stats cover this ping run
            llstats_obj.update_stats()
            curr_llstats = llstats_obj.llstats_incremental.copy()
            test_result['llstats'].append(curr_llstats)
            if current_ping_stats['connected']:
                self.log.info(
                    'Attenuation = {0}dB\tPacket Loss = {1}%\t'
                    'Avg RTT = {2:.2f}ms\tRSSI = {3} [{4},{5}]\t'.format(
                        atten, current_ping_stats['packet_loss_percentage'],
                        statistics.mean(current_ping_stats['rtt']),
                        current_rssi['signal_poll_rssi']['mean'],
                        current_rssi['chain_0_rssi']['mean'],
                        current_rssi['chain_1_rssi']['mean']))
                if current_ping_stats['packet_loss_percentage'] == 100:
                    zero_counter = zero_counter + 1
                else:
                    zero_counter = 0
            else:
                self.log.info(
                    'Attenuation = {}dB. Disconnected.'.format(atten))
                zero_counter = zero_counter + 1
            test_result['ping_results'].append(current_ping_stats.as_dict())
            if zero_counter == self.MAX_CONSECUTIVE_ZEROS:
                self.log.info('Ping loss stable at 100%. Stopping test now.')
                # Pad ping results so there is one entry per attenuation.
                # The same DISCONNECTED_PING_RESULT dict object is appended
                # for every remaining point.
                for idx in range(
                        len(testcase_params['atten_range']) -
                        len(test_result['ping_results'])):
                    test_result['ping_results'].append(
                        self.DISCONNECTED_PING_RESULT)
                break
        if self.testbed_params['sniffer_enable']:
            self.sniffer.stop_capture()
        return test_result
380
381    def setup_ap(self, testcase_params):
382        """Sets up the access point in the configuration required by the test.
383
384        Args:
385            testcase_params: dict containing AP and other test params
386        """
387        band = self.access_point.band_lookup_by_channel(
388            testcase_params['channel'])
389        if '2G' in band:
390            frequency = wutils.WifiEnums.channel_2G_to_freq[
391                testcase_params['channel']]
392        else:
393            frequency = wutils.WifiEnums.channel_5G_to_freq[
394                testcase_params['channel']]
395        if frequency in wutils.WifiEnums.DFS_5G_FREQUENCIES:
396            self.access_point.set_region(self.testbed_params['DFS_region'])
397        else:
398            self.access_point.set_region(self.testbed_params['default_region'])
399        self.access_point.set_channel(band, testcase_params['channel'])
400        self.access_point.set_bandwidth(band, testcase_params['mode'])
401        if 'low' in testcase_params['ap_power']:
402            self.log.info('Setting low AP power.')
403            self.access_point.set_power(band, 0)
404        self.log.info('Access Point Configuration: {}'.format(
405            self.access_point.ap_settings))
406
407    def setup_dut(self, testcase_params):
408        """Sets up the DUT in the configuration required by the test.
409
410        Args:
411            testcase_params: dict containing AP and other test params
412        """
413        # Check battery level before test
414        if not wputils.health_check(self.dut, 10):
415            asserts.skip('Battery level too low. Skipping test.')
416        # Turn screen off to preserve battery
417        self.dut.go_to_sleep()
418        if wputils.validate_network(self.dut,
419                                    testcase_params['test_network']['SSID']):
420            self.log.info('Already connected to desired network')
421        else:
422            wutils.reset_wifi(self.dut)
423            wutils.set_wifi_country_code(self.dut,
424                self.testclass_params['country_code'])
425            testcase_params['test_network']['channel'] = testcase_params[
426                'channel']
427            wutils.wifi_connect(self.dut,
428                                testcase_params['test_network'],
429                                num_of_tries=5,
430                                check_connectivity=True)
431        self.dut_ip = self.dut.droid.connectivityGetIPv4Addresses('wlan0')[0]
432        if testcase_params['channel'] not in self.atten_dut_chain_map.keys():
433            self.atten_dut_chain_map[testcase_params[
434                'channel']] = wputils.get_current_atten_dut_chain_map(
435                    self.attenuators, self.dut, self.ping_server)
436        self.log.info("Current Attenuator-DUT Chain Map: {}".format(
437            self.atten_dut_chain_map[testcase_params['channel']]))
438        for idx, atten in enumerate(self.attenuators):
439            if self.atten_dut_chain_map[testcase_params['channel']][
440                    idx] == testcase_params['attenuated_chain']:
441                atten.offset = atten.instrument.max_atten
442            else:
443                atten.offset = 0
444
445    def setup_ping_test(self, testcase_params):
446        """Function that gets devices ready for the test.
447
448        Args:
449            testcase_params: dict containing test-specific parameters
450        """
451        # Configure AP
452        self.setup_ap(testcase_params)
453        # Set attenuator to 0 dB
454        for attenuator in self.attenuators:
455            attenuator.set_atten(0, strict=False)
456        # Reset, configure, and connect DUT
457        self.setup_dut(testcase_params)
458
459    def get_range_start_atten(self, testcase_params):
460        """Gets the starting attenuation for this ping test.
461
462        This function is used to get the starting attenuation for ping range
463        tests. This implementation returns the default starting attenuation,
464        however, defining this function enables a more involved configuration
465        for over-the-air test classes.
466
467        Args:
468            testcase_params: dict containing all test params
469        """
470        return self.testclass_params['range_atten_start']
471
472    def compile_test_params(self, testcase_params):
473        band = self.access_point.band_lookup_by_channel(
474            testcase_params['channel'])
475        testcase_params['test_network'] = self.main_network[band]
476        if testcase_params['chain_mask'] in ['0', '1']:
477            testcase_params['attenuated_chain'] = 'DUT-Chain-{}'.format(
478                1 if testcase_params['chain_mask'] == '0' else 0)
479        else:
480            # Set attenuated chain to -1. Do not set to None as this will be
481            # compared to RF chain map which may include None
482            testcase_params['attenuated_chain'] = -1
483        if testcase_params['test_type'] == 'test_ping_range':
484            testcase_params.update(
485                ping_interval=self.testclass_params['range_ping_interval'],
486                ping_duration=self.testclass_params['range_ping_duration'],
487                ping_size=self.testclass_params['ping_size'],
488            )
489        elif testcase_params['test_type'] == 'test_fast_ping_rtt':
490            testcase_params.update(
491                ping_interval=self.testclass_params['rtt_ping_interval']
492                ['fast'],
493                ping_duration=self.testclass_params['rtt_ping_duration'],
494                ping_size=self.testclass_params['ping_size'],
495            )
496        elif testcase_params['test_type'] == 'test_slow_ping_rtt':
497            testcase_params.update(
498                ping_interval=self.testclass_params['rtt_ping_interval']
499                ['slow'],
500                ping_duration=self.testclass_params['rtt_ping_duration'],
501                ping_size=self.testclass_params['ping_size'])
502
503        if testcase_params['test_type'] == 'test_ping_range':
504            start_atten = self.get_range_start_atten(testcase_params)
505            num_atten_steps = int(
506                (self.testclass_params['range_atten_stop'] - start_atten) /
507                self.testclass_params['range_atten_step'])
508            testcase_params['atten_range'] = [
509                start_atten + x * self.testclass_params['range_atten_step']
510                for x in range(0, num_atten_steps)
511            ]
512        else:
513            testcase_params['atten_range'] = self.testclass_params[
514                'rtt_test_attenuation']
515        return testcase_params
516
517    def _test_ping(self, testcase_params):
518        """ Function that gets called for each range test case
519
520        The function gets called in each range test case. It customizes the
521        range test based on the test name of the test that called it
522
523        Args:
524            testcase_params: dict containing preliminary set of parameters
525        """
526        # Compile test parameters from config and test name
527        testcase_params = self.compile_test_params(testcase_params)
528        # Run ping test
529        self.setup_ping_test(testcase_params)
530        ping_result = self.run_ping_test(testcase_params)
531        # Postprocess results
532        self.testclass_results.append(ping_result)
533        self.process_ping_results(testcase_params, ping_result)
534        self.pass_fail_check(ping_result)
535
536    def generate_test_cases(self, ap_power, channels, modes, chain_mask,
537                            test_types):
538        test_cases = []
539        allowed_configs = {
540            'VHT20': [
541                1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 36, 40, 44, 48, 149, 153,
542                157, 161
543            ],
544            'VHT40': [36, 44, 149, 157],
545            'VHT80': [36, 149]
546        }
547        for channel, mode, chain, test_type in itertools.product(
548                channels, modes, chain_mask, test_types):
549            if channel not in allowed_configs[mode]:
550                continue
551            testcase_name = '{}_ch{}_{}_ch{}'.format(test_type, channel, mode,
552                                                     chain)
553            testcase_params = collections.OrderedDict(test_type=test_type,
554                                                      ap_power=ap_power,
555                                                      channel=channel,
556                                                      mode=mode,
557                                                      chain_mask=chain)
558            setattr(self, testcase_name,
559                    partial(self._test_ping, testcase_params))
560            test_cases.append(testcase_name)
561        return test_cases
562
563
class WifiPing_TwoChain_Test(WifiPingTest):
    """Range and RTT ping tests at standard AP power with chain mask 2x2."""
    def __init__(self, controllers):
        super().__init__(controllers)
        test_channels = [1, 6, 11, 36, 40, 44, 48, 149, 153, 157, 161]
        test_modes = ['VHT20', 'VHT40', 'VHT80']
        ping_test_types = [
            'test_ping_range', 'test_fast_ping_rtt', 'test_slow_ping_rtt'
        ]
        self.tests = self.generate_test_cases(ap_power='standard',
                                              channels=test_channels,
                                              modes=test_modes,
                                              test_types=ping_test_types,
                                              chain_mask=['2x2'])
575
576
class WifiPing_PerChainRange_Test(WifiPingTest):
    """Range tests at standard AP power with chain masks 0, 1 and 2x2."""
    def __init__(self, controllers):
        super().__init__(controllers)
        test_channels = [1, 6, 11, 36, 40, 44, 48, 149, 153, 157, 161]
        test_modes = ['VHT20', 'VHT40', 'VHT80']
        self.tests = self.generate_test_cases(ap_power='standard',
                                              chain_mask=['0', '1', '2x2'],
                                              channels=test_channels,
                                              modes=test_modes,
                                              test_types=['test_ping_range'])
586
587
class WifiPing_LowPowerAP_Test(WifiPingTest):
    """Range tests at low AP power with chain masks 0, 1 and 2x2."""
    def __init__(self, controllers):
        super().__init__(controllers)
        test_channels = [1, 6, 11, 36, 40, 44, 48, 149, 153, 157, 161]
        test_modes = ['VHT20', 'VHT40', 'VHT80']
        self.tests = self.generate_test_cases(ap_power='low_power',
                                              chain_mask=['0', '1', '2x2'],
                                              channels=test_channels,
                                              modes=test_modes,
                                              test_types=['test_ping_range'])
597
598
599# Over-the air version of ping tests
600class WifiOtaPingTest(WifiPingTest):
601    """Class to test over-the-air ping
602
603    This class tests WiFi ping performance in an OTA chamber. It enables
604    setting turntable orientation and other chamber parameters to study
605    performance in varying channel conditions
606    """
607    def __init__(self, controllers):
608        base_test.BaseTestClass.__init__(self, controllers)
609        self.testcase_metric_logger = (
610            BlackboxMappedMetricLogger.for_test_case())
611        self.testclass_metric_logger = (
612            BlackboxMappedMetricLogger.for_test_class())
613        self.publish_testcase_metrics = False
614
615    def setup_class(self):
616        WifiPingTest.setup_class(self)
617        self.ota_chamber = ota_chamber.create(
618            self.user_params['OTAChamber'])[0]
619
620    def teardown_class(self):
621        self.process_testclass_results()
622        self.ota_chamber.reset_chamber()
623
624    def process_testclass_results(self):
625        """Saves all test results to enable comparison."""
626        WifiPingTest.process_testclass_results(self)
627
628        range_vs_angle = collections.OrderedDict()
629        for test in self.testclass_results:
630            curr_params = test['testcase_params']
631            curr_config = curr_params['channel']
632            if curr_config in range_vs_angle:
633                range_vs_angle[curr_config]['position'].append(
634                    curr_params['position'])
635                range_vs_angle[curr_config]['range'].append(test['range'])
636                range_vs_angle[curr_config]['llstats_at_range'].append(
637                    test['llstats_at_range'])
638            else:
639                range_vs_angle[curr_config] = {
640                    'position': [curr_params['position']],
641                    'range': [test['range']],
642                    'llstats_at_range': [test['llstats_at_range']]
643                }
644        chamber_mode = self.testclass_results[0]['testcase_params'][
645            'chamber_mode']
646        if chamber_mode == 'orientation':
647            x_label = 'Angle (deg)'
648        elif chamber_mode == 'stepped stirrers':
649            x_label = 'Position Index'
650        figure = wputils.BokehFigure(
651            title='Range vs. Position',
652            x_label=x_label,
653            primary_y_label='Range (dB)',
654        )
655        for channel, channel_data in range_vs_angle.items():
656            figure.add_line(x_data=channel_data['position'],
657                            y_data=channel_data['range'],
658                            hover_text=channel_data['llstats_at_range'],
659                            legend='Channel {}'.format(channel))
660            average_range = sum(channel_data['range']) / len(
661                channel_data['range'])
662            self.log.info('Average range for Channel {} is: {}dB'.format(
663                channel, average_range))
664            metric_name = 'ota_summary_ch{}.avg_range'.format(channel)
665            self.testclass_metric_logger.add_metric(metric_name, average_range)
666        current_context = context.get_current_context().get_full_output_path()
667        plot_file_path = os.path.join(current_context, 'results.html')
668        figure.generate_figure(plot_file_path)
669
670        # Save results
671        results_file_path = os.path.join(current_context,
672                                         'testclass_summary.json')
673        with open(results_file_path, 'w') as results_file:
674            json.dump(range_vs_angle, results_file, indent=4)
675
676    def setup_ping_test(self, testcase_params):
677        WifiPingTest.setup_ping_test(self, testcase_params)
678        # Setup turntable
679        if testcase_params['chamber_mode'] == 'orientation':
680            self.ota_chamber.set_orientation(testcase_params['position'])
681        elif testcase_params['chamber_mode'] == 'stepped stirrers':
682            self.ota_chamber.step_stirrers(testcase_params['total_positions'])
683
684    def extract_test_id(self, testcase_params, id_fields):
685        test_id = collections.OrderedDict(
686            (param, testcase_params[param]) for param in id_fields)
687        return test_id
688
689    def get_range_start_atten(self, testcase_params):
690        """Gets the starting attenuation for this ping test.
691
692        The function gets the starting attenuation by checking whether a test
693        at the same configuration has executed. If so it sets the starting
694        point a configurable number of dBs below the reference test.
695
696        Returns:
697            start_atten: starting attenuation for current test
698        """
699        # Get the current and reference test config. The reference test is the
700        # one performed at the current MCS+1
701        ref_test_params = self.extract_test_id(testcase_params,
702                                               ['channel', 'mode'])
703        # Check if reference test has been run and set attenuation accordingly
704        previous_params = [
705            self.extract_test_id(result['testcase_params'],
706                                 ['channel', 'mode'])
707            for result in self.testclass_results
708        ]
709        try:
710            ref_index = previous_params[::-1].index(ref_test_params)
711            ref_index = len(previous_params) - 1 - ref_index
712            start_atten = self.testclass_results[ref_index][
713                'atten_at_range'] - (
714                    self.testclass_params['adjacent_range_test_gap'])
715        except ValueError:
716            print('Reference test not found. Starting from {} dB'.format(
717                self.testclass_params['range_atten_start']))
718            start_atten = self.testclass_params['range_atten_start']
719        return start_atten
720
721    def generate_test_cases(self, ap_power, channels, modes, chamber_mode,
722                            positions):
723        test_cases = []
724        allowed_configs = {
725            'VHT20': [
726                1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 36, 40, 44, 48, 149, 153,
727                157, 161
728            ],
729            'VHT40': [36, 44, 149, 157],
730            'VHT80': [36, 149]
731        }
732        for channel, mode, position in itertools.product(
733                channels, modes, positions):
734            if channel not in allowed_configs[mode]:
735                continue
736            testcase_name = 'test_ping_range_ch{}_{}_pos{}'.format(
737                channel, mode, position)
738            testcase_params = collections.OrderedDict(
739                test_type='test_ping_range',
740                ap_power=ap_power,
741                channel=channel,
742                mode=mode,
743                chain_mask='2x2',
744                chamber_mode=chamber_mode,
745                total_positions=len(positions),
746                position=position)
747            setattr(self, testcase_name,
748                    partial(self._test_ping, testcase_params))
749            test_cases.append(testcase_name)
750        return test_cases
751
752
class WifiOtaPing_TenDegree_Test(WifiOtaPingTest):
    """OTA ping range tests swept over orientation in steps of 10.

    Tests are generated for channels 6, 36 and 149 in VHT20 with the
    'standard' AP power setting, at positions 0 through 350.
    """

    def __init__(self, controllers):
        super().__init__(controllers)
        orientations = list(range(0, 360, 10))
        self.tests = self.generate_test_cases(ap_power='standard',
                                              channels=[6, 36, 149],
                                              modes=['VHT20'],
                                              chamber_mode='orientation',
                                              positions=orientations)
762
763
class WifiOtaPing_45Degree_Test(WifiOtaPingTest):
    """OTA ping range tests swept over orientation in steps of 45.

    Tests are generated for eleven 2.4GHz and 5GHz channels in VHT20 with
    the 'standard' AP power setting, at positions 0 through 315.
    """

    def __init__(self, controllers):
        super().__init__(controllers)
        test_channels = [1, 6, 11, 36, 40, 44, 48, 149, 153, 157, 161]
        orientations = list(range(0, 360, 45))
        self.tests = self.generate_test_cases(ap_power='standard',
                                              channels=test_channels,
                                              modes=['VHT20'],
                                              chamber_mode='orientation',
                                              positions=orientations)
773
774
class WifiOtaPing_SteppedStirrers_Test(WifiOtaPingTest):
    """OTA ping range tests using the 'stepped stirrers' chamber mode.

    Tests are generated for channels 6, 36 and 149 in VHT20 with the
    'standard' AP power setting, over 100 positions numbered 0 through 99.
    """

    def __init__(self, controllers):
        super().__init__(controllers)
        stirrer_positions = list(range(100))
        self.tests = self.generate_test_cases(ap_power='standard',
                                              channels=[6, 36, 149],
                                              modes=['VHT20'],
                                              chamber_mode='stepped stirrers',
                                              positions=stirrer_positions)
783
784
class WifiOtaPing_LowPowerAP_TenDegree_Test(WifiOtaPingTest):
    """Low power AP OTA ping range tests over orientation in steps of 10.

    Tests are generated for channels 6, 36 and 149 in VHT20 with the
    'low_power' AP power setting, at positions 0 through 350.
    """

    def __init__(self, controllers):
        super().__init__(controllers)
        orientations = list(range(0, 360, 10))
        self.tests = self.generate_test_cases(ap_power='low_power',
                                              channels=[6, 36, 149],
                                              modes=['VHT20'],
                                              chamber_mode='orientation',
                                              positions=orientations)
794
795
class WifiOtaPing_LowPowerAP_45Degree_Test(WifiOtaPingTest):
    """Low power AP OTA ping range tests over orientation in steps of 45.

    Tests are generated for eleven 2.4GHz and 5GHz channels in VHT20 with
    the 'low_power' AP power setting, at positions 0 through 315.
    """

    def __init__(self, controllers):
        super().__init__(controllers)
        test_channels = [1, 6, 11, 36, 40, 44, 48, 149, 153, 157, 161]
        orientations = list(range(0, 360, 45))
        self.tests = self.generate_test_cases(ap_power='low_power',
                                              channels=test_channels,
                                              modes=['VHT20'],
                                              chamber_mode='orientation',
                                              positions=orientations)
805
806
class WifiOtaPing_LowPowerAP_SteppedStirrers_Test(WifiOtaPingTest):
    """Low power AP OTA ping range tests using 'stepped stirrers' mode.

    Tests are generated for channels 6, 36 and 149 in VHT20 with the
    'low_power' AP power setting, over 100 positions numbered 0 through 99.
    """

    def __init__(self, controllers):
        super().__init__(controllers)
        stirrer_positions = list(range(100))
        self.tests = self.generate_test_cases(ap_power='low_power',
                                              channels=[6, 36, 149],
                                              modes=['VHT20'],
                                              chamber_mode='stepped stirrers',
                                              positions=stirrer_positions)
815