1#!/usr/bin/env python3
2#
3#   Copyright 2017 - Google
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16"""
17    Base Class for Defining Common WiFi Test Functionality
18"""
19
20import copy
21import itertools
22import os
23import time
24
25import acts.controllers.access_point as ap
26
27from acts import asserts
28from acts import signals
29from acts import utils
30from acts.base_test import BaseTestClass
31from acts.signals import TestSignal
32from acts.controllers import android_device
33from acts.controllers.access_point import AccessPoint
34from acts.controllers.ap_lib import hostapd_ap_preset
35from acts.controllers.ap_lib import hostapd_bss_settings
36from acts.controllers.ap_lib import hostapd_constants
37from acts.controllers.ap_lib import hostapd_security
38from acts.keys import Config
39from acts_contrib.test_utils.net import net_test_utils as nutils
40from acts_contrib.test_utils.wifi import wifi_test_utils as wutils
41
42AP_1 = 0
43AP_2 = 1
44MAX_AP_COUNT = 2
45
46
47class WifiBaseTest(BaseTestClass):
48    def __init__(self, configs):
49        super().__init__(configs)
50        self.enable_packet_log = False
51        self.packet_log_2g = hostapd_constants.AP_DEFAULT_CHANNEL_2G
52        self.packet_log_5g = hostapd_constants.AP_DEFAULT_CHANNEL_5G
53
54    def setup_class(self):
55        if hasattr(self, 'attenuators') and self.attenuators:
56            for attenuator in self.attenuators:
57                attenuator.set_atten(0)
58        opt_param = ["pixel_models", "cnss_diag_file", "country_code_file"]
59        self.unpack_userparams(opt_param_names=opt_param)
60        if hasattr(self, "cnss_diag_file"):
61            if isinstance(self.cnss_diag_file, list):
62                self.cnss_diag_file = self.cnss_diag_file[0]
63            if not os.path.isfile(self.cnss_diag_file):
64                self.cnss_diag_file = os.path.join(
65                    self.user_params[Config.key_config_path.value],
66                    self.cnss_diag_file)
67        if self.enable_packet_log and hasattr(self, "packet_capture"):
68            self.packet_logger = self.packet_capture[0]
69            self.packet_logger.configure_monitor_mode("2G", self.packet_log_2g)
70            self.packet_logger.configure_monitor_mode("5G", self.packet_log_5g)
71        if hasattr(self, "android_devices"):
72            for ad in self.android_devices:
73                wutils.wifi_test_device_init(ad)
74                if hasattr(self, "country_code_file"):
75                    if isinstance(self.country_code_file, list):
76                        self.country_code_file = self.country_code_file[0]
77                    if not os.path.isfile(self.country_code_file):
78                        self.country_code_file = os.path.join(
79                            self.user_params[Config.key_config_path.value],
80                            self.country_code_file)
81                    self.country_code = utils.load_config(
82                        self.country_code_file)["country"]
83                    wutils.set_wifi_country_code(ad, self.country_code)
84
85    def setup_test(self):
86        if (hasattr(self, "android_devices")
87                and hasattr(self, "cnss_diag_file")
88                and hasattr(self, "pixel_models")):
89            wutils.start_cnss_diags(self.android_devices, self.cnss_diag_file,
90                                    self.pixel_models)
91        self.tcpdump_proc = []
92        if hasattr(self, "android_devices"):
93            for ad in self.android_devices:
94                proc = nutils.start_tcpdump(ad, self.test_name)
95                self.tcpdump_proc.append((ad, proc))
96        if hasattr(self, "packet_logger"):
97            self.packet_log_pid = wutils.start_pcap(self.packet_logger, 'dual',
98                                                    self.test_name)
99
100    def teardown_test(self):
101        if (hasattr(self, "android_devices")
102                and hasattr(self, "cnss_diag_file")
103                and hasattr(self, "pixel_models")):
104            wutils.stop_cnss_diags(self.android_devices, self.pixel_models)
105            for proc in self.tcpdump_proc:
106                nutils.stop_tcpdump(proc[0],
107                                    proc[1],
108                                    self.test_name,
109                                    pull_dump=False)
110            self.tcpdump_proc = []
111        if hasattr(self, "packet_logger") and self.packet_log_pid:
112            wutils.stop_pcap(self.packet_logger,
113                             self.packet_log_pid,
114                             test_status=True)
115            self.packet_log_pid = {}
116
117    def on_fail(self, test_name, begin_time):
118        if hasattr(self, "android_devices"):
119            for ad in self.android_devices:
120                ad.take_bug_report(test_name, begin_time)
121                ad.cat_adb_log(test_name, begin_time)
122                wutils.get_ssrdumps(ad)
123            if (hasattr(self, "cnss_diag_file")
124                    and hasattr(self, "pixel_models")):
125                wutils.stop_cnss_diags(self.android_devices, self.pixel_models)
126                for ad in self.android_devices:
127                    wutils.get_cnss_diag_log(ad)
128            for proc in self.tcpdump_proc:
129                nutils.stop_tcpdump(proc[0], proc[1], self.test_name)
130            self.tcpdump_proc = []
131        if hasattr(self, "packet_logger") and self.packet_log_pid:
132            wutils.stop_pcap(self.packet_logger,
133                             self.packet_log_pid,
134                             test_status=False)
135            self.packet_log_pid = {}
136
137    def get_psk_network(
138            self,
139            mirror_ap,
140            reference_networks,
141            hidden=False,
142            same_ssid=False,
143            security_mode=hostapd_constants.WPA2_STRING,
144            ssid_length_2g=hostapd_constants.AP_SSID_LENGTH_2G,
145            ssid_length_5g=hostapd_constants.AP_SSID_LENGTH_5G,
146            passphrase_length_2g=hostapd_constants.AP_PASSPHRASE_LENGTH_2G,
147            passphrase_length_5g=hostapd_constants.AP_PASSPHRASE_LENGTH_5G):
148        """Generates SSID and passphrase for a WPA2 network using random
149           generator.
150
151           Args:
152               mirror_ap: Boolean, determines if both APs use the same hostapd
153                          config or different configs.
154               reference_networks: List of PSK networks.
155               same_ssid: Boolean, determines if both bands on AP use the same
156                          SSID.
157               ssid_length_2gecond AP Int, number of characters to use for 2G SSID.
158               ssid_length_5g: Int, number of characters to use for 5G SSID.
159               passphrase_length_2g: Int, length of password for 2G network.
160               passphrase_length_5g: Int, length of password for 5G network.
161
162           Returns: A dict of 2G and 5G network lists for hostapd configuration.
163
164        """
165        network_dict_2g = {}
166        network_dict_5g = {}
167        ref_5g_security = security_mode
168        ref_2g_security = security_mode
169
170        if same_ssid:
171            ref_2g_ssid = 'xg_%s' % utils.rand_ascii_str(ssid_length_2g)
172            ref_5g_ssid = ref_2g_ssid
173
174            ref_2g_passphrase = utils.rand_ascii_str(passphrase_length_2g)
175            ref_5g_passphrase = ref_2g_passphrase
176
177        else:
178            ref_2g_ssid = '2g_%s' % utils.rand_ascii_str(ssid_length_2g)
179            ref_2g_passphrase = utils.rand_ascii_str(passphrase_length_2g)
180
181            ref_5g_ssid = '5g_%s' % utils.rand_ascii_str(ssid_length_5g)
182            ref_5g_passphrase = utils.rand_ascii_str(passphrase_length_5g)
183
184        network_dict_2g = {
185            "SSID": ref_2g_ssid,
186            "security": ref_2g_security,
187            "password": ref_2g_passphrase,
188            "hiddenSSID": hidden
189        }
190
191        network_dict_5g = {
192            "SSID": ref_5g_ssid,
193            "security": ref_5g_security,
194            "password": ref_5g_passphrase,
195            "hiddenSSID": hidden
196        }
197
198        ap = 0
199        for ap in range(MAX_AP_COUNT):
200            reference_networks.append({
201                "2g": copy.copy(network_dict_2g),
202                "5g": copy.copy(network_dict_5g)
203            })
204            if not mirror_ap:
205                break
206        return {"2g": network_dict_2g, "5g": network_dict_5g}
207
208    def get_open_network(self,
209                         mirror_ap,
210                         open_network,
211                         hidden=False,
212                         same_ssid=False,
213                         ssid_length_2g=hostapd_constants.AP_SSID_LENGTH_2G,
214                         ssid_length_5g=hostapd_constants.AP_SSID_LENGTH_5G,
215                         security_mode='none'):
216        """Generates SSIDs for a open network using a random generator.
217
218        Args:
219            mirror_ap: Boolean, determines if both APs use the same hostapd
220                       config or different configs.
221            open_network: List of open networks.
222            same_ssid: Boolean, determines if both bands on AP use the same
223                       SSID.
224            ssid_length_2g: Int, number of characters to use for 2G SSID.
225            ssid_length_5g: Int, number of characters to use for 5G SSID.
226            security_mode: 'none' for open and 'OWE' for WPA3 OWE.
227
228        Returns: A dict of 2G and 5G network lists for hostapd configuration.
229
230        """
231        network_dict_2g = {}
232        network_dict_5g = {}
233
234        if same_ssid:
235            open_2g_ssid = 'xg_%s' % utils.rand_ascii_str(ssid_length_2g)
236            open_5g_ssid = open_2g_ssid
237
238        else:
239            open_2g_ssid = '2g_%s' % utils.rand_ascii_str(ssid_length_2g)
240            open_5g_ssid = '5g_%s' % utils.rand_ascii_str(ssid_length_5g)
241
242        network_dict_2g = {
243            "SSID": open_2g_ssid,
244            "security": security_mode,
245            "hiddenSSID": hidden
246        }
247
248        network_dict_5g = {
249            "SSID": open_5g_ssid,
250            "security": security_mode,
251            "hiddenSSID": hidden
252        }
253
254        ap = 0
255        for ap in range(MAX_AP_COUNT):
256            open_network.append({
257                "2g": copy.copy(network_dict_2g),
258                "5g": copy.copy(network_dict_5g)
259            })
260            if not mirror_ap:
261                break
262        return {"2g": network_dict_2g, "5g": network_dict_5g}
263
264    def get_wep_network(
265            self,
266            mirror_ap,
267            networks,
268            hidden=False,
269            same_ssid=False,
270            ssid_length_2g=hostapd_constants.AP_SSID_LENGTH_2G,
271            ssid_length_5g=hostapd_constants.AP_SSID_LENGTH_5G,
272            passphrase_length_2g=hostapd_constants.AP_PASSPHRASE_LENGTH_2G,
273            passphrase_length_5g=hostapd_constants.AP_PASSPHRASE_LENGTH_5G):
274        """Generates SSID and passphrase for a WEP network using random
275           generator.
276
277           Args:
278               mirror_ap: Boolean, determines if both APs use the same hostapd
279                          config or different configs.
280               networks: List of WEP networks.
281               same_ssid: Boolean, determines if both bands on AP use the same
282                          SSID.
283               ssid_length_2gecond AP Int, number of characters to use for 2G SSID.
284               ssid_length_5g: Int, number of characters to use for 5G SSID.
285               passphrase_length_2g: Int, length of password for 2G network.
286               passphrase_length_5g: Int, length of password for 5G network.
287
288           Returns: A dict of 2G and 5G network lists for hostapd configuration.
289
290        """
291        network_dict_2g = {}
292        network_dict_5g = {}
293        ref_5g_security = hostapd_constants.WEP_STRING
294        ref_2g_security = hostapd_constants.WEP_STRING
295
296        if same_ssid:
297            ref_2g_ssid = 'xg_%s' % utils.rand_ascii_str(ssid_length_2g)
298            ref_5g_ssid = ref_2g_ssid
299
300            ref_2g_passphrase = utils.rand_hex_str(passphrase_length_2g)
301            ref_5g_passphrase = ref_2g_passphrase
302
303        else:
304            ref_2g_ssid = '2g_%s' % utils.rand_ascii_str(ssid_length_2g)
305            ref_2g_passphrase = utils.rand_hex_str(passphrase_length_2g)
306
307            ref_5g_ssid = '5g_%s' % utils.rand_ascii_str(ssid_length_5g)
308            ref_5g_passphrase = utils.rand_hex_str(passphrase_length_5g)
309
310        network_dict_2g = {
311            "SSID": ref_2g_ssid,
312            "security": ref_2g_security,
313            "wepKeys": [ref_2g_passphrase] * 4,
314            "hiddenSSID": hidden
315        }
316
317        network_dict_5g = {
318            "SSID": ref_5g_ssid,
319            "security": ref_5g_security,
320            "wepKeys": [ref_2g_passphrase] * 4,
321            "hiddenSSID": hidden
322        }
323
324        ap = 0
325        for ap in range(MAX_AP_COUNT):
326            networks.append({
327                "2g": copy.copy(network_dict_2g),
328                "5g": copy.copy(network_dict_5g)
329            })
330            if not mirror_ap:
331                break
332        return {"2g": network_dict_2g, "5g": network_dict_5g}
333
334    def update_bssid(self, ap_instance, ap, network, band):
335        """Get bssid and update network dictionary.
336
337        Args:
338            ap_instance: Accesspoint index that was configured.
339            ap: Accesspoint object corresponding to ap_instance.
340            network: Network dictionary.
341            band: Wifi networks' band.
342
343        """
344        bssid = ap.get_bssid_from_ssid(network["SSID"], band)
345
346        if network["security"] == hostapd_constants.WPA2_STRING:
347            # TODO:(bamahadev) Change all occurances of reference_networks
348            # to wpa_networks.
349            self.reference_networks[ap_instance][band]["bssid"] = bssid
350        if network["security"] == hostapd_constants.WPA_STRING:
351            self.wpa_networks[ap_instance][band]["bssid"] = bssid
352        if network["security"] == hostapd_constants.WEP_STRING:
353            self.wep_networks[ap_instance][band]["bssid"] = bssid
354        if network["security"] == hostapd_constants.ENT_STRING:
355            if "bssid" not in self.ent_networks[ap_instance][band]:
356                self.ent_networks[ap_instance][band]["bssid"] = bssid
357            else:
358                self.ent_networks_pwd[ap_instance][band]["bssid"] = bssid
359        if network["security"] == 'none':
360            self.open_network[ap_instance][band]["bssid"] = bssid
361
362    def populate_bssid(self, ap_instance, ap, networks_5g, networks_2g):
363        """Get bssid for a given SSID and add it to the network dictionary.
364
365        Args:
366            ap_instance: Accesspoint index that was configured.
367            ap: Accesspoint object corresponding to ap_instance.
368            networks_5g: List of 5g networks configured on the APs.
369            networks_2g: List of 2g networks configured on the APs.
370
371        """
372
373        if not (networks_5g or networks_2g):
374            return
375
376        for network in networks_5g:
377            if 'channel' in network:
378                continue
379            self.update_bssid(ap_instance, ap, network,
380                              hostapd_constants.BAND_5G)
381
382        for network in networks_2g:
383            if 'channel' in network:
384                continue
385            self.update_bssid(ap_instance, ap, network,
386                              hostapd_constants.BAND_2G)
387
388    def configure_openwrt_ap_and_start(
389            self,
390            channel_5g=hostapd_constants.AP_DEFAULT_CHANNEL_5G,
391            channel_2g=hostapd_constants.AP_DEFAULT_CHANNEL_2G,
392            channel_5g_ap2=None,
393            channel_2g_ap2=None,
394            ssid_length_2g=hostapd_constants.AP_SSID_LENGTH_2G,
395            passphrase_length_2g=hostapd_constants.AP_PASSPHRASE_LENGTH_2G,
396            ssid_length_5g=hostapd_constants.AP_SSID_LENGTH_5G,
397            passphrase_length_5g=hostapd_constants.AP_PASSPHRASE_LENGTH_5G,
398            mirror_ap=False,
399            hidden=False,
400            same_ssid=False,
401            open_network=False,
402            wpa1_network=False,
403            wpa_network=False,
404            wep_network=False,
405            ent_network=False,
406            ent_network_pwd=False,
407            owe_network=False,
408            sae_network=False,
409            saemixed_network=False,
410            radius_conf_2g=None,
411            radius_conf_5g=None,
412            radius_conf_pwd=None,
413            ap_count=1,
414            ieee80211w=None):
415        """Create, configure and start OpenWrt AP.
416
417        Args:
418            channel_5g: 5G channel to configure.
419            channel_2g: 2G channel to configure.
420            channel_5g_ap2: 5G channel to configure on AP2.
421            channel_2g_ap2: 2G channel to configure on AP2.
422            ssid_length_2g: Int, number of characters to use for 2G SSID.
423            passphrase_length_2g: Int, length of password for 2G network.
424            ssid_length_5g: Int, number of characters to use for 5G SSID.
425            passphrase_length_5g: Int, length of password for 5G network.
426            same_ssid: Boolean, determines if both bands on AP use the same SSID.
427            open_network: Boolean, to check if open network should be configured.
428            wpa_network: Boolean, to check if wpa network should be configured.
429            wep_network: Boolean, to check if wep network should be configured.
430            ent_network: Boolean, to check if ent network should be configured.
431            ent_network_pwd: Boolean, to check if ent pwd network should be configured.
432            owe_network: Boolean, to check if owe network should be configured.
433            sae_network: Boolean, to check if sae network should be configured.
434            saemixed_network: Boolean, to check if saemixed network should be configured.
435            radius_conf_2g: dictionary with enterprise radius server details.
436            radius_conf_5g: dictionary with enterprise radius server details.
437            radius_conf_pwd: dictionary with enterprise radiuse server details.
438            ap_count: APs to configure.
439            ieee80211w:PMF to configure
440        """
441        if mirror_ap and ap_count == 1:
442            raise ValueError("ap_count cannot be 1 if mirror_ap is True.")
443        if (channel_5g_ap2 or channel_2g_ap2) and ap_count == 1:
444            raise ValueError(
445                "ap_count cannot be 1 if channels of AP2 are provided.")
446        # we are creating a channel list for 2G and 5G bands. The list is of
447        # size 2 and this is based on the assumption that each testbed will have
448        # at most 2 APs.
449        if not channel_5g_ap2:
450            channel_5g_ap2 = channel_5g
451        if not channel_2g_ap2:
452            channel_2g_ap2 = channel_2g
453        channels_2g = [channel_2g, channel_2g_ap2]
454        channels_5g = [channel_5g, channel_5g_ap2]
455
456        self.reference_networks = []
457        self.wpa1_networks = []
458        self.wpa_networks = []
459        self.wep_networks = []
460        self.ent_networks = []
461        self.ent_networks_pwd = []
462        self.open_network = []
463        self.owe_networks = []
464        self.sae_networks = []
465        self.saemixed_networks = []
466        self.bssid_map = []
467        for i in range(ap_count):
468            network_list = []
469            if wpa1_network:
470                wpa1_dict = self.get_psk_network(mirror_ap,
471                                                 self.wpa1_networks,
472                                                 hidden, same_ssid,
473                                                 ssid_length_2g, ssid_length_5g,
474                                                 passphrase_length_2g,
475                                                 passphrase_length_5g)
476                wpa1_dict[hostapd_constants.BAND_2G]["security"] = "psk"
477                wpa1_dict[hostapd_constants.BAND_5G]["security"] = "psk"
478                wpa1_dict[hostapd_constants.BAND_2G]["ieee80211w"] = ieee80211w
479                wpa1_dict[hostapd_constants.BAND_5G]["ieee80211w"] = ieee80211w
480                self.wpa1_networks.append(wpa1_dict)
481                network_list.append(wpa1_dict)
482            if wpa_network:
483                wpa_dict = self.get_psk_network(mirror_ap,
484                                                self.reference_networks,
485                                                hidden, same_ssid,
486                                                ssid_length_2g, ssid_length_5g,
487                                                passphrase_length_2g,
488                                                passphrase_length_5g)
489                wpa_dict[hostapd_constants.BAND_2G]["security"] = "psk2"
490                wpa_dict[hostapd_constants.BAND_5G]["security"] = "psk2"
491                wpa_dict[hostapd_constants.BAND_2G]["ieee80211w"] = ieee80211w
492                wpa_dict[hostapd_constants.BAND_5G]["ieee80211w"] = ieee80211w
493                self.wpa_networks.append(wpa_dict)
494                network_list.append(wpa_dict)
495            if wep_network:
496                wep_dict = self.get_wep_network(mirror_ap, self.wep_networks,
497                                                hidden, same_ssid,
498                                                ssid_length_2g, ssid_length_5g)
499                network_list.append(wep_dict)
500            if ent_network:
501                ent_dict = self.get_open_network(mirror_ap, self.ent_networks,
502                                                 hidden, same_ssid,
503                                                 ssid_length_2g,
504                                                 ssid_length_5g)
505                ent_dict["2g"]["security"] = "wpa2"
506                ent_dict["2g"].update(radius_conf_2g)
507                ent_dict["5g"]["security"] = "wpa2"
508                ent_dict["5g"].update(radius_conf_5g)
509                network_list.append(ent_dict)
510            if ent_network_pwd:
511                ent_pwd_dict = self.get_open_network(mirror_ap,
512                                                     self.ent_networks_pwd,
513                                                     hidden, same_ssid,
514                                                     ssid_length_2g,
515                                                     ssid_length_5g)
516                ent_pwd_dict["2g"]["security"] = "wpa2"
517                ent_pwd_dict["2g"].update(radius_conf_pwd)
518                ent_pwd_dict["5g"]["security"] = "wpa2"
519                ent_pwd_dict["5g"].update(radius_conf_pwd)
520                network_list.append(ent_pwd_dict)
521            if open_network:
522                open_dict = self.get_open_network(mirror_ap, self.open_network,
523                                                  hidden, same_ssid,
524                                                  ssid_length_2g,
525                                                  ssid_length_5g)
526                network_list.append(open_dict)
527            if owe_network:
528                owe_dict = self.get_open_network(mirror_ap, self.owe_networks,
529                                                 hidden, same_ssid,
530                                                 ssid_length_2g,
531                                                 ssid_length_5g, "OWE")
532                owe_dict[hostapd_constants.BAND_2G]["security"] = "owe"
533                owe_dict[hostapd_constants.BAND_5G]["security"] = "owe"
534                network_list.append(owe_dict)
535            if sae_network:
536                sae_dict = self.get_psk_network(mirror_ap, self.sae_networks,
537                                                hidden, same_ssid,
538                                                hostapd_constants.SAE_KEY_MGMT,
539                                                ssid_length_2g, ssid_length_5g,
540                                                passphrase_length_2g,
541                                                passphrase_length_5g)
542                sae_dict[hostapd_constants.BAND_2G]["security"] = "sae"
543                sae_dict[hostapd_constants.BAND_5G]["security"] = "sae"
544                network_list.append(sae_dict)
545            if saemixed_network:
546                saemixed_dict = self.get_psk_network(mirror_ap, self.saemixed_networks,
547                                                hidden, same_ssid,
548                                                hostapd_constants.SAE_KEY_MGMT,
549                                                ssid_length_2g, ssid_length_5g,
550                                                passphrase_length_2g,
551                                                passphrase_length_5g)
552                saemixed_dict[hostapd_constants.BAND_2G]["security"] = "sae-mixed"
553                saemixed_dict[hostapd_constants.BAND_5G]["security"] = "sae-mixed"
554                saemixed_dict[hostapd_constants.BAND_2G]["ieee80211w"] = ieee80211w
555                saemixed_dict[hostapd_constants.BAND_5G]["ieee80211w"] = ieee80211w
556                network_list.append(saemixed_dict)
557            self.access_points[i].configure_ap(network_list, channels_2g[i],
558                                               channels_5g[i])
559            self.access_points[i].start_ap()
560            self.bssid_map.append(
561                self.access_points[i].get_bssids_for_wifi_networks())
562            if mirror_ap:
563                self.access_points[i + 1].configure_ap(network_list,
564                                                       channels_2g[i+1],
565                                                       channels_5g[i+1])
566                self.access_points[i + 1].start_ap()
567                self.bssid_map.append(
568                    self.access_points[i + 1].get_bssids_for_wifi_networks())
569                break
570
571    def legacy_configure_ap_and_start(
572            self,
573            channel_5g=hostapd_constants.AP_DEFAULT_CHANNEL_5G,
574            channel_2g=hostapd_constants.AP_DEFAULT_CHANNEL_2G,
575            max_2g_networks=hostapd_constants.AP_DEFAULT_MAX_SSIDS_2G,
576            max_5g_networks=hostapd_constants.AP_DEFAULT_MAX_SSIDS_5G,
577            ap_ssid_length_2g=hostapd_constants.AP_SSID_LENGTH_2G,
578            ap_passphrase_length_2g=hostapd_constants.AP_PASSPHRASE_LENGTH_2G,
579            ap_ssid_length_5g=hostapd_constants.AP_SSID_LENGTH_5G,
580            ap_passphrase_length_5g=hostapd_constants.AP_PASSPHRASE_LENGTH_5G,
581            hidden=False,
582            same_ssid=False,
583            mirror_ap=True,
584            wpa_network=False,
585            wep_network=False,
586            ent_network=False,
587            radius_conf_2g=None,
588            radius_conf_5g=None,
589            ent_network_pwd=False,
590            radius_conf_pwd=None,
591            ap_count=1):
592
593        config_count = 1
594        count = 0
595
596        # For example, the NetworkSelector tests use 2 APs and require that
597        # both APs are not mirrored.
598        if not mirror_ap and ap_count == 1:
599            raise ValueError("ap_count cannot be 1 if mirror_ap is False.")
600
601        if not mirror_ap:
602            config_count = ap_count
603
604        self.user_params["reference_networks"] = []
605        self.user_params["open_network"] = []
606        if wpa_network:
607            self.user_params["wpa_networks"] = []
608        if wep_network:
609            self.user_params["wep_networks"] = []
610        if ent_network:
611            self.user_params["ent_networks"] = []
612        if ent_network_pwd:
613            self.user_params["ent_networks_pwd"] = []
614
615        # kill hostapd & dhcpd if the cleanup was not successful
616        for i in range(len(self.access_points)):
617            self.log.debug("Check ap state and cleanup")
618            self._cleanup_hostapd_and_dhcpd(i)
619
620        for count in range(config_count):
621
622            network_list_2g = []
623            network_list_5g = []
624
625            orig_network_list_2g = []
626            orig_network_list_5g = []
627
628            network_list_2g.append({"channel": channel_2g})
629            network_list_5g.append({"channel": channel_5g})
630
631            networks_dict = self.get_psk_network(
632                mirror_ap,
633                self.user_params["reference_networks"],
634                hidden=hidden,
635                same_ssid=same_ssid)
636            self.reference_networks = self.user_params["reference_networks"]
637
638            network_list_2g.append(networks_dict["2g"])
639            network_list_5g.append(networks_dict["5g"])
640
641            # When same_ssid is set, only configure one set of WPA networks.
642            # We cannot have more than one set because duplicate interface names
643            # are not allowed.
644            # TODO(bmahadev): Provide option to select the type of network,
645            # instead of defaulting to WPA.
646            if not same_ssid:
647                networks_dict = self.get_open_network(
648                    mirror_ap,
649                    self.user_params["open_network"],
650                    hidden=hidden,
651                    same_ssid=same_ssid)
652                self.open_network = self.user_params["open_network"]
653
654                network_list_2g.append(networks_dict["2g"])
655                network_list_5g.append(networks_dict["5g"])
656
657                if wpa_network:
658                    networks_dict = self.get_psk_network(
659                        mirror_ap,
660                        self.user_params["wpa_networks"],
661                        hidden=hidden,
662                        same_ssid=same_ssid,
663                        security_mode=hostapd_constants.WPA_STRING)
664                    self.wpa_networks = self.user_params["wpa_networks"]
665
666                    network_list_2g.append(networks_dict["2g"])
667                    network_list_5g.append(networks_dict["5g"])
668
669                if wep_network:
670                    networks_dict = self.get_wep_network(
671                        mirror_ap,
672                        self.user_params["wep_networks"],
673                        hidden=hidden,
674                        same_ssid=same_ssid)
675                    self.wep_networks = self.user_params["wep_networks"]
676
677                    network_list_2g.append(networks_dict["2g"])
678                    network_list_5g.append(networks_dict["5g"])
679
680                if ent_network:
681                    networks_dict = self.get_open_network(
682                        mirror_ap,
683                        self.user_params["ent_networks"],
684                        hidden=hidden,
685                        same_ssid=same_ssid)
686                    networks_dict["2g"][
687                        "security"] = hostapd_constants.ENT_STRING
688                    networks_dict["2g"].update(radius_conf_2g)
689                    networks_dict["5g"][
690                        "security"] = hostapd_constants.ENT_STRING
691                    networks_dict["5g"].update(radius_conf_5g)
692                    self.ent_networks = self.user_params["ent_networks"]
693
694                    network_list_2g.append(networks_dict["2g"])
695                    network_list_5g.append(networks_dict["5g"])
696
697                if ent_network_pwd:
698                    networks_dict = self.get_open_network(
699                        mirror_ap,
700                        self.user_params["ent_networks_pwd"],
701                        hidden=hidden,
702                        same_ssid=same_ssid)
703                    networks_dict["2g"][
704                        "security"] = hostapd_constants.ENT_STRING
705                    networks_dict["2g"].update(radius_conf_pwd)
706                    networks_dict["5g"][
707                        "security"] = hostapd_constants.ENT_STRING
708                    networks_dict["5g"].update(radius_conf_pwd)
709                    self.ent_networks_pwd = self.user_params[
710                        "ent_networks_pwd"]
711
712                    network_list_2g.append(networks_dict["2g"])
713                    network_list_5g.append(networks_dict["5g"])
714
715            orig_network_list_5g = copy.copy(network_list_5g)
716            orig_network_list_2g = copy.copy(network_list_2g)
717
718            if len(network_list_5g) > 1:
719                self.config_5g = self._generate_legacy_ap_config(
720                    network_list_5g)
721            if len(network_list_2g) > 1:
722                self.config_2g = self._generate_legacy_ap_config(
723                    network_list_2g)
724
725            self.access_points[count].start_ap(self.config_2g)
726            self.access_points[count].start_ap(self.config_5g)
727            self.populate_bssid(count, self.access_points[count],
728                                orig_network_list_5g, orig_network_list_2g)
729
730        # Repeat configuration on the second router.
731        if mirror_ap and ap_count == 2:
732            self.access_points[AP_2].start_ap(self.config_2g)
733            self.access_points[AP_2].start_ap(self.config_5g)
734            self.populate_bssid(AP_2, self.access_points[AP_2],
735                                orig_network_list_5g, orig_network_list_2g)
736
737    def _kill_processes(self, ap, daemon):
738        """ Kill hostapd and dhcpd daemons
739
740        Args:
741            ap: AP to cleanup
742            daemon: process to kill
743
744        Returns: True/False if killing process is successful
745        """
746        self.log.info("Killing %s" % daemon)
747        pids = ap.ssh.run('pidof %s' % daemon, ignore_status=True)
748        if pids.stdout:
749            ap.ssh.run('kill %s' % pids.stdout, ignore_status=True)
750        time.sleep(3)
751        pids = ap.ssh.run('pidof %s' % daemon, ignore_status=True)
752        if pids.stdout:
753            return False
754        return True
755
756    def _cleanup_hostapd_and_dhcpd(self, count):
757        """ Check if AP was cleaned up properly
758
759        Kill hostapd and dhcpd processes if cleanup was not successful in the
760        last run
761
762        Args:
763            count: AP to check
764
765        Returns:
766            New AccessPoint object if AP required cleanup
767
768        Raises:
769            Error: if the AccessPoint timed out to setup
770        """
771        ap = self.access_points[count]
772        phy_ifaces = ap.interfaces.get_physical_interface()
773        kill_hostapd = False
774        for iface in phy_ifaces:
775            if '2g_' in iface or '5g_' in iface or 'xg_' in iface:
776                kill_hostapd = True
777                break
778
779        if not kill_hostapd:
780            return
781
782        self.log.debug("Cleanup AP")
783        if not self._kill_processes(ap, 'hostapd') or \
784            not self._kill_processes(ap, 'dhcpd'):
785            raise ("Failed to cleanup AP")
786
787        ap.__init__(self.user_params['AccessPoint'][count])
788
789    def _generate_legacy_ap_config(self, network_list):
790        bss_settings = []
791        wlan_2g = self.access_points[AP_1].wlan_2g
792        wlan_5g = self.access_points[AP_1].wlan_5g
793        ap_settings = network_list.pop(0)
794        # TODO:(bmahadev) This is a bug. We should not have to pop the first
795        # network in the list and treat it as a separate case. Instead,
796        # create_ap_preset() should be able to take NULL ssid and security and
797        # build config based on the bss_Settings alone.
798        hostapd_config_settings = network_list.pop(0)
799        for network in network_list:
800            if "password" in network:
801                bss_settings.append(
802                    hostapd_bss_settings.BssSettings(
803                        name=network["SSID"],
804                        ssid=network["SSID"],
805                        hidden=network["hiddenSSID"],
806                        security=hostapd_security.Security(
807                            security_mode=network["security"],
808                            password=network["password"])))
809            elif "wepKeys" in network:
810                bss_settings.append(
811                    hostapd_bss_settings.BssSettings(
812                        name=network["SSID"],
813                        ssid=network["SSID"],
814                        hidden=network["hiddenSSID"],
815                        security=hostapd_security.Security(
816                            security_mode=network["security"],
817                            password=network["wepKeys"][0])))
818            elif network["security"] == hostapd_constants.ENT_STRING:
819                bss_settings.append(
820                    hostapd_bss_settings.BssSettings(
821                        name=network["SSID"],
822                        ssid=network["SSID"],
823                        hidden=network["hiddenSSID"],
824                        security=hostapd_security.Security(
825                            security_mode=network["security"],
826                            radius_server_ip=network["radius_server_ip"],
827                            radius_server_port=network["radius_server_port"],
828                            radius_server_secret=network[
829                                "radius_server_secret"])))
830            else:
831                bss_settings.append(
832                    hostapd_bss_settings.BssSettings(
833                        name=network["SSID"],
834                        ssid=network["SSID"],
835                        hidden=network["hiddenSSID"]))
836        if "password" in hostapd_config_settings:
837            config = hostapd_ap_preset.create_ap_preset(
838                iface_wlan_2g=wlan_2g,
839                iface_wlan_5g=wlan_5g,
840                channel=ap_settings["channel"],
841                ssid=hostapd_config_settings["SSID"],
842                hidden=hostapd_config_settings["hiddenSSID"],
843                security=hostapd_security.Security(
844                    security_mode=hostapd_config_settings["security"],
845                    password=hostapd_config_settings["password"]),
846                bss_settings=bss_settings)
847        elif "wepKeys" in hostapd_config_settings:
848            config = hostapd_ap_preset.create_ap_preset(
849                iface_wlan_2g=wlan_2g,
850                iface_wlan_5g=wlan_5g,
851                channel=ap_settings["channel"],
852                ssid=hostapd_config_settings["SSID"],
853                hidden=hostapd_config_settings["hiddenSSID"],
854                security=hostapd_security.Security(
855                    security_mode=hostapd_config_settings["security"],
856                    password=hostapd_config_settings["wepKeys"][0]),
857                bss_settings=bss_settings)
858        else:
859            config = hostapd_ap_preset.create_ap_preset(
860                iface_wlan_2g=wlan_2g,
861                iface_wlan_5g=wlan_5g,
862                channel=ap_settings["channel"],
863                ssid=hostapd_config_settings["SSID"],
864                hidden=hostapd_config_settings["hiddenSSID"],
865                bss_settings=bss_settings)
866        return config
867
868    def configure_packet_capture(
869            self,
870            channel_5g=hostapd_constants.AP_DEFAULT_CHANNEL_5G,
871            channel_2g=hostapd_constants.AP_DEFAULT_CHANNEL_2G):
872        """Configure packet capture for 2G and 5G bands.
873
874        Args:
875            channel_5g: Channel to set the monitor mode to for 5G band.
876            channel_2g: Channel to set the monitor mode to for 2G band.
877        """
878        self.packet_capture = self.packet_capture[0]
879        result = self.packet_capture.configure_monitor_mode(
880            hostapd_constants.BAND_2G, channel_2g)
881        if not result:
882            raise ValueError("Failed to configure channel for 2G band")
883
884        result = self.packet_capture.configure_monitor_mode(
885            hostapd_constants.BAND_5G, channel_5g)
886        if not result:
887            raise ValueError("Failed to configure channel for 5G band.")
888
889    @staticmethod
890    def wifi_test_wrap(fn):
891        def _safe_wrap_test_case(self, *args, **kwargs):
892            test_id = "%s:%s:%s" % (self.__class__.__name__, self.test_name,
893                                    self.log_begin_time.replace(' ', '-'))
894            self.test_id = test_id
895            self.result_detail = ""
896            tries = int(self.user_params.get("wifi_auto_rerun", 3))
897            for ad in self.android_devices:
898                ad.log_path = self.log_path
899            for i in range(tries + 1):
900                result = True
901                if i > 0:
902                    log_string = "[Test Case] RETRY:%s %s" % (i,
903                                                              self.test_name)
904                    self.log.info(log_string)
905                    self._teardown_test(self.test_name)
906                    self._setup_test(self.test_name)
907                try:
908                    result = fn(self, *args, **kwargs)
909                except signals.TestFailure as e:
910                    self.log.warn("Error msg: %s" % e)
911                    if self.result_detail:
912                        signal.details = self.result_detail
913                    result = False
914                except signals.TestSignal:
915                    if self.result_detail:
916                        signal.details = self.result_detail
917                    raise
918                except Exception as e:
919                    self.log.exception(e)
920                    asserts.fail(self.result_detail)
921                if result is False:
922                    if i < tries:
923                        continue
924                else:
925                    break
926            if result is not False:
927                asserts.explicit_pass(self.result_detail)
928            else:
929                asserts.fail(self.result_detail)
930
931        return _safe_wrap_test_case
932