1#!/usr/bin/env python3 2# 3# Copyright 2017 - Google 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16""" 17 Base Class for Defining Common WiFi Test Functionality 18""" 19 20import copy 21import itertools 22import os 23import time 24 25import acts.controllers.access_point as ap 26 27from acts import asserts 28from acts import signals 29from acts import utils 30from acts.base_test import BaseTestClass 31from acts.signals import TestSignal 32from acts.controllers import android_device 33from acts.controllers.access_point import AccessPoint 34from acts.controllers.ap_lib import hostapd_ap_preset 35from acts.controllers.ap_lib import hostapd_bss_settings 36from acts.controllers.ap_lib import hostapd_constants 37from acts.controllers.ap_lib import hostapd_security 38from acts.keys import Config 39from acts_contrib.test_utils.net import net_test_utils as nutils 40from acts_contrib.test_utils.wifi import wifi_test_utils as wutils 41 42AP_1 = 0 43AP_2 = 1 44MAX_AP_COUNT = 2 45 46 47class WifiBaseTest(BaseTestClass): 48 def __init__(self, configs): 49 super().__init__(configs) 50 self.enable_packet_log = False 51 self.packet_log_2g = hostapd_constants.AP_DEFAULT_CHANNEL_2G 52 self.packet_log_5g = hostapd_constants.AP_DEFAULT_CHANNEL_5G 53 54 def setup_class(self): 55 if hasattr(self, 'attenuators') and self.attenuators: 56 for attenuator in self.attenuators: 57 attenuator.set_atten(0) 58 opt_param = ["pixel_models", "cnss_diag_file", "country_code_file"] 59 self.unpack_userparams(opt_param_names=opt_param) 60 if hasattr(self, "cnss_diag_file"): 61 if isinstance(self.cnss_diag_file, list): 62 self.cnss_diag_file = self.cnss_diag_file[0] 63 if not os.path.isfile(self.cnss_diag_file): 64 self.cnss_diag_file = os.path.join( 65 self.user_params[Config.key_config_path.value], 66 self.cnss_diag_file) 67 if self.enable_packet_log and hasattr(self, "packet_capture"): 68 self.packet_logger = self.packet_capture[0] 69 self.packet_logger.configure_monitor_mode("2G", self.packet_log_2g) 70 self.packet_logger.configure_monitor_mode("5G", self.packet_log_5g) 71 if hasattr(self, "android_devices"): 72 for ad in self.android_devices: 73 wutils.wifi_test_device_init(ad) 74 if hasattr(self, "country_code_file"): 75 if isinstance(self.country_code_file, list): 76 self.country_code_file = self.country_code_file[0] 77 if not os.path.isfile(self.country_code_file): 78 self.country_code_file = os.path.join( 79 self.user_params[Config.key_config_path.value], 80 self.country_code_file) 81 self.country_code = utils.load_config( 82 self.country_code_file)["country"] 83 wutils.set_wifi_country_code(ad, self.country_code) 84 85 def setup_test(self): 86 if (hasattr(self, "android_devices") 87 and hasattr(self, "cnss_diag_file") 88 and hasattr(self, "pixel_models")): 89 wutils.start_cnss_diags(self.android_devices, self.cnss_diag_file, 90 self.pixel_models) 91 self.tcpdump_proc = [] 92 if hasattr(self, "android_devices"): 93 for ad in self.android_devices: 94 proc = nutils.start_tcpdump(ad, self.test_name) 95 self.tcpdump_proc.append((ad, proc)) 96 if hasattr(self, "packet_logger"): 97 self.packet_log_pid = wutils.start_pcap(self.packet_logger, 'dual', 98 self.test_name) 99 100 def teardown_test(self): 101 if (hasattr(self, "android_devices") 102 and hasattr(self, "cnss_diag_file") 103 and hasattr(self, "pixel_models")): 104 wutils.stop_cnss_diags(self.android_devices, self.pixel_models) 105 for proc in self.tcpdump_proc: 106 nutils.stop_tcpdump(proc[0], 107 proc[1], 108 self.test_name, 109 pull_dump=False) 110 self.tcpdump_proc = [] 111 if hasattr(self, "packet_logger") and self.packet_log_pid: 112 wutils.stop_pcap(self.packet_logger, 113 self.packet_log_pid, 114 test_status=True) 115 self.packet_log_pid = {} 116 117 def on_fail(self, test_name, begin_time): 118 if hasattr(self, "android_devices"): 119 for ad in self.android_devices: 120 ad.take_bug_report(test_name, begin_time) 121 ad.cat_adb_log(test_name, begin_time) 122 wutils.get_ssrdumps(ad) 123 if (hasattr(self, "cnss_diag_file") 124 and hasattr(self, "pixel_models")): 125 wutils.stop_cnss_diags(self.android_devices, self.pixel_models) 126 for ad in self.android_devices: 127 wutils.get_cnss_diag_log(ad) 128 for proc in self.tcpdump_proc: 129 nutils.stop_tcpdump(proc[0], proc[1], self.test_name) 130 self.tcpdump_proc = [] 131 if hasattr(self, "packet_logger") and self.packet_log_pid: 132 wutils.stop_pcap(self.packet_logger, 133 self.packet_log_pid, 134 test_status=False) 135 self.packet_log_pid = {} 136 137 def get_psk_network( 138 self, 139 mirror_ap, 140 reference_networks, 141 hidden=False, 142 same_ssid=False, 143 security_mode=hostapd_constants.WPA2_STRING, 144 ssid_length_2g=hostapd_constants.AP_SSID_LENGTH_2G, 145 ssid_length_5g=hostapd_constants.AP_SSID_LENGTH_5G, 146 passphrase_length_2g=hostapd_constants.AP_PASSPHRASE_LENGTH_2G, 147 passphrase_length_5g=hostapd_constants.AP_PASSPHRASE_LENGTH_5G): 148 """Generates SSID and passphrase for a WPA2 network using random 149 generator. 150 151 Args: 152 mirror_ap: Boolean, determines if both APs use the same hostapd 153 config or different configs. 154 reference_networks: List of PSK networks. 155 same_ssid: Boolean, determines if both bands on AP use the same 156 SSID. 157 ssid_length_2gecond AP Int, number of characters to use for 2G SSID. 158 ssid_length_5g: Int, number of characters to use for 5G SSID. 159 passphrase_length_2g: Int, length of password for 2G network. 160 passphrase_length_5g: Int, length of password for 5G network. 161 162 Returns: A dict of 2G and 5G network lists for hostapd configuration. 163 164 """ 165 network_dict_2g = {} 166 network_dict_5g = {} 167 ref_5g_security = security_mode 168 ref_2g_security = security_mode 169 170 if same_ssid: 171 ref_2g_ssid = 'xg_%s' % utils.rand_ascii_str(ssid_length_2g) 172 ref_5g_ssid = ref_2g_ssid 173 174 ref_2g_passphrase = utils.rand_ascii_str(passphrase_length_2g) 175 ref_5g_passphrase = ref_2g_passphrase 176 177 else: 178 ref_2g_ssid = '2g_%s' % utils.rand_ascii_str(ssid_length_2g) 179 ref_2g_passphrase = utils.rand_ascii_str(passphrase_length_2g) 180 181 ref_5g_ssid = '5g_%s' % utils.rand_ascii_str(ssid_length_5g) 182 ref_5g_passphrase = utils.rand_ascii_str(passphrase_length_5g) 183 184 network_dict_2g = { 185 "SSID": ref_2g_ssid, 186 "security": ref_2g_security, 187 "password": ref_2g_passphrase, 188 "hiddenSSID": hidden 189 } 190 191 network_dict_5g = { 192 "SSID": ref_5g_ssid, 193 "security": ref_5g_security, 194 "password": ref_5g_passphrase, 195 "hiddenSSID": hidden 196 } 197 198 ap = 0 199 for ap in range(MAX_AP_COUNT): 200 reference_networks.append({ 201 "2g": copy.copy(network_dict_2g), 202 "5g": copy.copy(network_dict_5g) 203 }) 204 if not mirror_ap: 205 break 206 return {"2g": network_dict_2g, "5g": network_dict_5g} 207 208 def get_open_network(self, 209 mirror_ap, 210 open_network, 211 hidden=False, 212 same_ssid=False, 213 ssid_length_2g=hostapd_constants.AP_SSID_LENGTH_2G, 214 ssid_length_5g=hostapd_constants.AP_SSID_LENGTH_5G, 215 security_mode='none'): 216 """Generates SSIDs for a open network using a random generator. 217 218 Args: 219 mirror_ap: Boolean, determines if both APs use the same hostapd 220 config or different configs. 221 open_network: List of open networks. 222 same_ssid: Boolean, determines if both bands on AP use the same 223 SSID. 224 ssid_length_2g: Int, number of characters to use for 2G SSID. 225 ssid_length_5g: Int, number of characters to use for 5G SSID. 226 security_mode: 'none' for open and 'OWE' for WPA3 OWE. 227 228 Returns: A dict of 2G and 5G network lists for hostapd configuration. 229 230 """ 231 network_dict_2g = {} 232 network_dict_5g = {} 233 234 if same_ssid: 235 open_2g_ssid = 'xg_%s' % utils.rand_ascii_str(ssid_length_2g) 236 open_5g_ssid = open_2g_ssid 237 238 else: 239 open_2g_ssid = '2g_%s' % utils.rand_ascii_str(ssid_length_2g) 240 open_5g_ssid = '5g_%s' % utils.rand_ascii_str(ssid_length_5g) 241 242 network_dict_2g = { 243 "SSID": open_2g_ssid, 244 "security": security_mode, 245 "hiddenSSID": hidden 246 } 247 248 network_dict_5g = { 249 "SSID": open_5g_ssid, 250 "security": security_mode, 251 "hiddenSSID": hidden 252 } 253 254 ap = 0 255 for ap in range(MAX_AP_COUNT): 256 open_network.append({ 257 "2g": copy.copy(network_dict_2g), 258 "5g": copy.copy(network_dict_5g) 259 }) 260 if not mirror_ap: 261 break 262 return {"2g": network_dict_2g, "5g": network_dict_5g} 263 264 def get_wep_network( 265 self, 266 mirror_ap, 267 networks, 268 hidden=False, 269 same_ssid=False, 270 ssid_length_2g=hostapd_constants.AP_SSID_LENGTH_2G, 271 ssid_length_5g=hostapd_constants.AP_SSID_LENGTH_5G, 272 passphrase_length_2g=hostapd_constants.AP_PASSPHRASE_LENGTH_2G, 273 passphrase_length_5g=hostapd_constants.AP_PASSPHRASE_LENGTH_5G): 274 """Generates SSID and passphrase for a WEP network using random 275 generator. 276 277 Args: 278 mirror_ap: Boolean, determines if both APs use the same hostapd 279 config or different configs. 280 networks: List of WEP networks. 281 same_ssid: Boolean, determines if both bands on AP use the same 282 SSID. 283 ssid_length_2gecond AP Int, number of characters to use for 2G SSID. 284 ssid_length_5g: Int, number of characters to use for 5G SSID. 285 passphrase_length_2g: Int, length of password for 2G network. 286 passphrase_length_5g: Int, length of password for 5G network. 287 288 Returns: A dict of 2G and 5G network lists for hostapd configuration. 289 290 """ 291 network_dict_2g = {} 292 network_dict_5g = {} 293 ref_5g_security = hostapd_constants.WEP_STRING 294 ref_2g_security = hostapd_constants.WEP_STRING 295 296 if same_ssid: 297 ref_2g_ssid = 'xg_%s' % utils.rand_ascii_str(ssid_length_2g) 298 ref_5g_ssid = ref_2g_ssid 299 300 ref_2g_passphrase = utils.rand_hex_str(passphrase_length_2g) 301 ref_5g_passphrase = ref_2g_passphrase 302 303 else: 304 ref_2g_ssid = '2g_%s' % utils.rand_ascii_str(ssid_length_2g) 305 ref_2g_passphrase = utils.rand_hex_str(passphrase_length_2g) 306 307 ref_5g_ssid = '5g_%s' % utils.rand_ascii_str(ssid_length_5g) 308 ref_5g_passphrase = utils.rand_hex_str(passphrase_length_5g) 309 310 network_dict_2g = { 311 "SSID": ref_2g_ssid, 312 "security": ref_2g_security, 313 "wepKeys": [ref_2g_passphrase] * 4, 314 "hiddenSSID": hidden 315 } 316 317 network_dict_5g = { 318 "SSID": ref_5g_ssid, 319 "security": ref_5g_security, 320 "wepKeys": [ref_2g_passphrase] * 4, 321 "hiddenSSID": hidden 322 } 323 324 ap = 0 325 for ap in range(MAX_AP_COUNT): 326 networks.append({ 327 "2g": copy.copy(network_dict_2g), 328 "5g": copy.copy(network_dict_5g) 329 }) 330 if not mirror_ap: 331 break 332 return {"2g": network_dict_2g, "5g": network_dict_5g} 333 334 def update_bssid(self, ap_instance, ap, network, band): 335 """Get bssid and update network dictionary. 336 337 Args: 338 ap_instance: Accesspoint index that was configured. 339 ap: Accesspoint object corresponding to ap_instance. 340 network: Network dictionary. 341 band: Wifi networks' band. 342 343 """ 344 bssid = ap.get_bssid_from_ssid(network["SSID"], band) 345 346 if network["security"] == hostapd_constants.WPA2_STRING: 347 # TODO:(bamahadev) Change all occurances of reference_networks 348 # to wpa_networks. 349 self.reference_networks[ap_instance][band]["bssid"] = bssid 350 if network["security"] == hostapd_constants.WPA_STRING: 351 self.wpa_networks[ap_instance][band]["bssid"] = bssid 352 if network["security"] == hostapd_constants.WEP_STRING: 353 self.wep_networks[ap_instance][band]["bssid"] = bssid 354 if network["security"] == hostapd_constants.ENT_STRING: 355 if "bssid" not in self.ent_networks[ap_instance][band]: 356 self.ent_networks[ap_instance][band]["bssid"] = bssid 357 else: 358 self.ent_networks_pwd[ap_instance][band]["bssid"] = bssid 359 if network["security"] == 'none': 360 self.open_network[ap_instance][band]["bssid"] = bssid 361 362 def populate_bssid(self, ap_instance, ap, networks_5g, networks_2g): 363 """Get bssid for a given SSID and add it to the network dictionary. 364 365 Args: 366 ap_instance: Accesspoint index that was configured. 367 ap: Accesspoint object corresponding to ap_instance. 368 networks_5g: List of 5g networks configured on the APs. 369 networks_2g: List of 2g networks configured on the APs. 370 371 """ 372 373 if not (networks_5g or networks_2g): 374 return 375 376 for network in networks_5g: 377 if 'channel' in network: 378 continue 379 self.update_bssid(ap_instance, ap, network, 380 hostapd_constants.BAND_5G) 381 382 for network in networks_2g: 383 if 'channel' in network: 384 continue 385 self.update_bssid(ap_instance, ap, network, 386 hostapd_constants.BAND_2G) 387 388 def configure_openwrt_ap_and_start( 389 self, 390 channel_5g=hostapd_constants.AP_DEFAULT_CHANNEL_5G, 391 channel_2g=hostapd_constants.AP_DEFAULT_CHANNEL_2G, 392 channel_5g_ap2=None, 393 channel_2g_ap2=None, 394 ssid_length_2g=hostapd_constants.AP_SSID_LENGTH_2G, 395 passphrase_length_2g=hostapd_constants.AP_PASSPHRASE_LENGTH_2G, 396 ssid_length_5g=hostapd_constants.AP_SSID_LENGTH_5G, 397 passphrase_length_5g=hostapd_constants.AP_PASSPHRASE_LENGTH_5G, 398 mirror_ap=False, 399 hidden=False, 400 same_ssid=False, 401 open_network=False, 402 wpa1_network=False, 403 wpa_network=False, 404 wep_network=False, 405 ent_network=False, 406 ent_network_pwd=False, 407 owe_network=False, 408 sae_network=False, 409 saemixed_network=False, 410 radius_conf_2g=None, 411 radius_conf_5g=None, 412 radius_conf_pwd=None, 413 ap_count=1, 414 ieee80211w=None): 415 """Create, configure and start OpenWrt AP. 416 417 Args: 418 channel_5g: 5G channel to configure. 419 channel_2g: 2G channel to configure. 420 channel_5g_ap2: 5G channel to configure on AP2. 421 channel_2g_ap2: 2G channel to configure on AP2. 422 ssid_length_2g: Int, number of characters to use for 2G SSID. 423 passphrase_length_2g: Int, length of password for 2G network. 424 ssid_length_5g: Int, number of characters to use for 5G SSID. 425 passphrase_length_5g: Int, length of password for 5G network. 426 same_ssid: Boolean, determines if both bands on AP use the same SSID. 427 open_network: Boolean, to check if open network should be configured. 428 wpa_network: Boolean, to check if wpa network should be configured. 429 wep_network: Boolean, to check if wep network should be configured. 430 ent_network: Boolean, to check if ent network should be configured. 431 ent_network_pwd: Boolean, to check if ent pwd network should be configured. 432 owe_network: Boolean, to check if owe network should be configured. 433 sae_network: Boolean, to check if sae network should be configured. 434 saemixed_network: Boolean, to check if saemixed network should be configured. 435 radius_conf_2g: dictionary with enterprise radius server details. 436 radius_conf_5g: dictionary with enterprise radius server details. 437 radius_conf_pwd: dictionary with enterprise radiuse server details. 438 ap_count: APs to configure. 439 ieee80211w:PMF to configure 440 """ 441 if mirror_ap and ap_count == 1: 442 raise ValueError("ap_count cannot be 1 if mirror_ap is True.") 443 if (channel_5g_ap2 or channel_2g_ap2) and ap_count == 1: 444 raise ValueError( 445 "ap_count cannot be 1 if channels of AP2 are provided.") 446 # we are creating a channel list for 2G and 5G bands. The list is of 447 # size 2 and this is based on the assumption that each testbed will have 448 # at most 2 APs. 449 if not channel_5g_ap2: 450 channel_5g_ap2 = channel_5g 451 if not channel_2g_ap2: 452 channel_2g_ap2 = channel_2g 453 channels_2g = [channel_2g, channel_2g_ap2] 454 channels_5g = [channel_5g, channel_5g_ap2] 455 456 self.reference_networks = [] 457 self.wpa1_networks = [] 458 self.wpa_networks = [] 459 self.wep_networks = [] 460 self.ent_networks = [] 461 self.ent_networks_pwd = [] 462 self.open_network = [] 463 self.owe_networks = [] 464 self.sae_networks = [] 465 self.saemixed_networks = [] 466 self.bssid_map = [] 467 for i in range(ap_count): 468 network_list = [] 469 if wpa1_network: 470 wpa1_dict = self.get_psk_network(mirror_ap, 471 self.wpa1_networks, 472 hidden, same_ssid, 473 ssid_length_2g, ssid_length_5g, 474 passphrase_length_2g, 475 passphrase_length_5g) 476 wpa1_dict[hostapd_constants.BAND_2G]["security"] = "psk" 477 wpa1_dict[hostapd_constants.BAND_5G]["security"] = "psk" 478 wpa1_dict[hostapd_constants.BAND_2G]["ieee80211w"] = ieee80211w 479 wpa1_dict[hostapd_constants.BAND_5G]["ieee80211w"] = ieee80211w 480 self.wpa1_networks.append(wpa1_dict) 481 network_list.append(wpa1_dict) 482 if wpa_network: 483 wpa_dict = self.get_psk_network(mirror_ap, 484 self.reference_networks, 485 hidden, same_ssid, 486 ssid_length_2g, ssid_length_5g, 487 passphrase_length_2g, 488 passphrase_length_5g) 489 wpa_dict[hostapd_constants.BAND_2G]["security"] = "psk2" 490 wpa_dict[hostapd_constants.BAND_5G]["security"] = "psk2" 491 wpa_dict[hostapd_constants.BAND_2G]["ieee80211w"] = ieee80211w 492 wpa_dict[hostapd_constants.BAND_5G]["ieee80211w"] = ieee80211w 493 self.wpa_networks.append(wpa_dict) 494 network_list.append(wpa_dict) 495 if wep_network: 496 wep_dict = self.get_wep_network(mirror_ap, self.wep_networks, 497 hidden, same_ssid, 498 ssid_length_2g, ssid_length_5g) 499 network_list.append(wep_dict) 500 if ent_network: 501 ent_dict = self.get_open_network(mirror_ap, self.ent_networks, 502 hidden, same_ssid, 503 ssid_length_2g, 504 ssid_length_5g) 505 ent_dict["2g"]["security"] = "wpa2" 506 ent_dict["2g"].update(radius_conf_2g) 507 ent_dict["5g"]["security"] = "wpa2" 508 ent_dict["5g"].update(radius_conf_5g) 509 network_list.append(ent_dict) 510 if ent_network_pwd: 511 ent_pwd_dict = self.get_open_network(mirror_ap, 512 self.ent_networks_pwd, 513 hidden, same_ssid, 514 ssid_length_2g, 515 ssid_length_5g) 516 ent_pwd_dict["2g"]["security"] = "wpa2" 517 ent_pwd_dict["2g"].update(radius_conf_pwd) 518 ent_pwd_dict["5g"]["security"] = "wpa2" 519 ent_pwd_dict["5g"].update(radius_conf_pwd) 520 network_list.append(ent_pwd_dict) 521 if open_network: 522 open_dict = self.get_open_network(mirror_ap, self.open_network, 523 hidden, same_ssid, 524 ssid_length_2g, 525 ssid_length_5g) 526 network_list.append(open_dict) 527 if owe_network: 528 owe_dict = self.get_open_network(mirror_ap, self.owe_networks, 529 hidden, same_ssid, 530 ssid_length_2g, 531 ssid_length_5g, "OWE") 532 owe_dict[hostapd_constants.BAND_2G]["security"] = "owe" 533 owe_dict[hostapd_constants.BAND_5G]["security"] = "owe" 534 network_list.append(owe_dict) 535 if sae_network: 536 sae_dict = self.get_psk_network(mirror_ap, self.sae_networks, 537 hidden, same_ssid, 538 hostapd_constants.SAE_KEY_MGMT, 539 ssid_length_2g, ssid_length_5g, 540 passphrase_length_2g, 541 passphrase_length_5g) 542 sae_dict[hostapd_constants.BAND_2G]["security"] = "sae" 543 sae_dict[hostapd_constants.BAND_5G]["security"] = "sae" 544 network_list.append(sae_dict) 545 if saemixed_network: 546 saemixed_dict = self.get_psk_network(mirror_ap, self.saemixed_networks, 547 hidden, same_ssid, 548 hostapd_constants.SAE_KEY_MGMT, 549 ssid_length_2g, ssid_length_5g, 550 passphrase_length_2g, 551 passphrase_length_5g) 552 saemixed_dict[hostapd_constants.BAND_2G]["security"] = "sae-mixed" 553 saemixed_dict[hostapd_constants.BAND_5G]["security"] = "sae-mixed" 554 saemixed_dict[hostapd_constants.BAND_2G]["ieee80211w"] = ieee80211w 555 saemixed_dict[hostapd_constants.BAND_5G]["ieee80211w"] = ieee80211w 556 network_list.append(saemixed_dict) 557 self.access_points[i].configure_ap(network_list, channels_2g[i], 558 channels_5g[i]) 559 self.access_points[i].start_ap() 560 self.bssid_map.append( 561 self.access_points[i].get_bssids_for_wifi_networks()) 562 if mirror_ap: 563 self.access_points[i + 1].configure_ap(network_list, 564 channels_2g[i+1], 565 channels_5g[i+1]) 566 self.access_points[i + 1].start_ap() 567 self.bssid_map.append( 568 self.access_points[i + 1].get_bssids_for_wifi_networks()) 569 break 570 571 def legacy_configure_ap_and_start( 572 self, 573 channel_5g=hostapd_constants.AP_DEFAULT_CHANNEL_5G, 574 channel_2g=hostapd_constants.AP_DEFAULT_CHANNEL_2G, 575 max_2g_networks=hostapd_constants.AP_DEFAULT_MAX_SSIDS_2G, 576 max_5g_networks=hostapd_constants.AP_DEFAULT_MAX_SSIDS_5G, 577 ap_ssid_length_2g=hostapd_constants.AP_SSID_LENGTH_2G, 578 ap_passphrase_length_2g=hostapd_constants.AP_PASSPHRASE_LENGTH_2G, 579 ap_ssid_length_5g=hostapd_constants.AP_SSID_LENGTH_5G, 580 ap_passphrase_length_5g=hostapd_constants.AP_PASSPHRASE_LENGTH_5G, 581 hidden=False, 582 same_ssid=False, 583 mirror_ap=True, 584 wpa_network=False, 585 wep_network=False, 586 ent_network=False, 587 radius_conf_2g=None, 588 radius_conf_5g=None, 589 ent_network_pwd=False, 590 radius_conf_pwd=None, 591 ap_count=1): 592 593 config_count = 1 594 count = 0 595 596 # For example, the NetworkSelector tests use 2 APs and require that 597 # both APs are not mirrored. 598 if not mirror_ap and ap_count == 1: 599 raise ValueError("ap_count cannot be 1 if mirror_ap is False.") 600 601 if not mirror_ap: 602 config_count = ap_count 603 604 self.user_params["reference_networks"] = [] 605 self.user_params["open_network"] = [] 606 if wpa_network: 607 self.user_params["wpa_networks"] = [] 608 if wep_network: 609 self.user_params["wep_networks"] = [] 610 if ent_network: 611 self.user_params["ent_networks"] = [] 612 if ent_network_pwd: 613 self.user_params["ent_networks_pwd"] = [] 614 615 # kill hostapd & dhcpd if the cleanup was not successful 616 for i in range(len(self.access_points)): 617 self.log.debug("Check ap state and cleanup") 618 self._cleanup_hostapd_and_dhcpd(i) 619 620 for count in range(config_count): 621 622 network_list_2g = [] 623 network_list_5g = [] 624 625 orig_network_list_2g = [] 626 orig_network_list_5g = [] 627 628 network_list_2g.append({"channel": channel_2g}) 629 network_list_5g.append({"channel": channel_5g}) 630 631 networks_dict = self.get_psk_network( 632 mirror_ap, 633 self.user_params["reference_networks"], 634 hidden=hidden, 635 same_ssid=same_ssid) 636 self.reference_networks = self.user_params["reference_networks"] 637 638 network_list_2g.append(networks_dict["2g"]) 639 network_list_5g.append(networks_dict["5g"]) 640 641 # When same_ssid is set, only configure one set of WPA networks. 642 # We cannot have more than one set because duplicate interface names 643 # are not allowed. 644 # TODO(bmahadev): Provide option to select the type of network, 645 # instead of defaulting to WPA. 646 if not same_ssid: 647 networks_dict = self.get_open_network( 648 mirror_ap, 649 self.user_params["open_network"], 650 hidden=hidden, 651 same_ssid=same_ssid) 652 self.open_network = self.user_params["open_network"] 653 654 network_list_2g.append(networks_dict["2g"]) 655 network_list_5g.append(networks_dict["5g"]) 656 657 if wpa_network: 658 networks_dict = self.get_psk_network( 659 mirror_ap, 660 self.user_params["wpa_networks"], 661 hidden=hidden, 662 same_ssid=same_ssid, 663 security_mode=hostapd_constants.WPA_STRING) 664 self.wpa_networks = self.user_params["wpa_networks"] 665 666 network_list_2g.append(networks_dict["2g"]) 667 network_list_5g.append(networks_dict["5g"]) 668 669 if wep_network: 670 networks_dict = self.get_wep_network( 671 mirror_ap, 672 self.user_params["wep_networks"], 673 hidden=hidden, 674 same_ssid=same_ssid) 675 self.wep_networks = self.user_params["wep_networks"] 676 677 network_list_2g.append(networks_dict["2g"]) 678 network_list_5g.append(networks_dict["5g"]) 679 680 if ent_network: 681 networks_dict = self.get_open_network( 682 mirror_ap, 683 self.user_params["ent_networks"], 684 hidden=hidden, 685 same_ssid=same_ssid) 686 networks_dict["2g"][ 687 "security"] = hostapd_constants.ENT_STRING 688 networks_dict["2g"].update(radius_conf_2g) 689 networks_dict["5g"][ 690 "security"] = hostapd_constants.ENT_STRING 691 networks_dict["5g"].update(radius_conf_5g) 692 self.ent_networks = self.user_params["ent_networks"] 693 694 network_list_2g.append(networks_dict["2g"]) 695 network_list_5g.append(networks_dict["5g"]) 696 697 if ent_network_pwd: 698 networks_dict = self.get_open_network( 699 mirror_ap, 700 self.user_params["ent_networks_pwd"], 701 hidden=hidden, 702 same_ssid=same_ssid) 703 networks_dict["2g"][ 704 "security"] = hostapd_constants.ENT_STRING 705 networks_dict["2g"].update(radius_conf_pwd) 706 networks_dict["5g"][ 707 "security"] = hostapd_constants.ENT_STRING 708 networks_dict["5g"].update(radius_conf_pwd) 709 self.ent_networks_pwd = self.user_params[ 710 "ent_networks_pwd"] 711 712 network_list_2g.append(networks_dict["2g"]) 713 network_list_5g.append(networks_dict["5g"]) 714 715 orig_network_list_5g = copy.copy(network_list_5g) 716 orig_network_list_2g = copy.copy(network_list_2g) 717 718 if len(network_list_5g) > 1: 719 self.config_5g = self._generate_legacy_ap_config( 720 network_list_5g) 721 if len(network_list_2g) > 1: 722 self.config_2g = self._generate_legacy_ap_config( 723 network_list_2g) 724 725 self.access_points[count].start_ap(self.config_2g) 726 self.access_points[count].start_ap(self.config_5g) 727 self.populate_bssid(count, self.access_points[count], 728 orig_network_list_5g, orig_network_list_2g) 729 730 # Repeat configuration on the second router. 731 if mirror_ap and ap_count == 2: 732 self.access_points[AP_2].start_ap(self.config_2g) 733 self.access_points[AP_2].start_ap(self.config_5g) 734 self.populate_bssid(AP_2, self.access_points[AP_2], 735 orig_network_list_5g, orig_network_list_2g) 736 737 def _kill_processes(self, ap, daemon): 738 """ Kill hostapd and dhcpd daemons 739 740 Args: 741 ap: AP to cleanup 742 daemon: process to kill 743 744 Returns: True/False if killing process is successful 745 """ 746 self.log.info("Killing %s" % daemon) 747 pids = ap.ssh.run('pidof %s' % daemon, ignore_status=True) 748 if pids.stdout: 749 ap.ssh.run('kill %s' % pids.stdout, ignore_status=True) 750 time.sleep(3) 751 pids = ap.ssh.run('pidof %s' % daemon, ignore_status=True) 752 if pids.stdout: 753 return False 754 return True 755 756 def _cleanup_hostapd_and_dhcpd(self, count): 757 """ Check if AP was cleaned up properly 758 759 Kill hostapd and dhcpd processes if cleanup was not successful in the 760 last run 761 762 Args: 763 count: AP to check 764 765 Returns: 766 New AccessPoint object if AP required cleanup 767 768 Raises: 769 Error: if the AccessPoint timed out to setup 770 """ 771 ap = self.access_points[count] 772 phy_ifaces = ap.interfaces.get_physical_interface() 773 kill_hostapd = False 774 for iface in phy_ifaces: 775 if '2g_' in iface or '5g_' in iface or 'xg_' in iface: 776 kill_hostapd = True 777 break 778 779 if not kill_hostapd: 780 return 781 782 self.log.debug("Cleanup AP") 783 if not self._kill_processes(ap, 'hostapd') or \ 784 not self._kill_processes(ap, 'dhcpd'): 785 raise ("Failed to cleanup AP") 786 787 ap.__init__(self.user_params['AccessPoint'][count]) 788 789 def _generate_legacy_ap_config(self, network_list): 790 bss_settings = [] 791 wlan_2g = self.access_points[AP_1].wlan_2g 792 wlan_5g = self.access_points[AP_1].wlan_5g 793 ap_settings = network_list.pop(0) 794 # TODO:(bmahadev) This is a bug. We should not have to pop the first 795 # network in the list and treat it as a separate case. Instead, 796 # create_ap_preset() should be able to take NULL ssid and security and 797 # build config based on the bss_Settings alone. 798 hostapd_config_settings = network_list.pop(0) 799 for network in network_list: 800 if "password" in network: 801 bss_settings.append( 802 hostapd_bss_settings.BssSettings( 803 name=network["SSID"], 804 ssid=network["SSID"], 805 hidden=network["hiddenSSID"], 806 security=hostapd_security.Security( 807 security_mode=network["security"], 808 password=network["password"]))) 809 elif "wepKeys" in network: 810 bss_settings.append( 811 hostapd_bss_settings.BssSettings( 812 name=network["SSID"], 813 ssid=network["SSID"], 814 hidden=network["hiddenSSID"], 815 security=hostapd_security.Security( 816 security_mode=network["security"], 817 password=network["wepKeys"][0]))) 818 elif network["security"] == hostapd_constants.ENT_STRING: 819 bss_settings.append( 820 hostapd_bss_settings.BssSettings( 821 name=network["SSID"], 822 ssid=network["SSID"], 823 hidden=network["hiddenSSID"], 824 security=hostapd_security.Security( 825 security_mode=network["security"], 826 radius_server_ip=network["radius_server_ip"], 827 radius_server_port=network["radius_server_port"], 828 radius_server_secret=network[ 829 "radius_server_secret"]))) 830 else: 831 bss_settings.append( 832 hostapd_bss_settings.BssSettings( 833 name=network["SSID"], 834 ssid=network["SSID"], 835 hidden=network["hiddenSSID"])) 836 if "password" in hostapd_config_settings: 837 config = hostapd_ap_preset.create_ap_preset( 838 iface_wlan_2g=wlan_2g, 839 iface_wlan_5g=wlan_5g, 840 channel=ap_settings["channel"], 841 ssid=hostapd_config_settings["SSID"], 842 hidden=hostapd_config_settings["hiddenSSID"], 843 security=hostapd_security.Security( 844 security_mode=hostapd_config_settings["security"], 845 password=hostapd_config_settings["password"]), 846 bss_settings=bss_settings) 847 elif "wepKeys" in hostapd_config_settings: 848 config = hostapd_ap_preset.create_ap_preset( 849 iface_wlan_2g=wlan_2g, 850 iface_wlan_5g=wlan_5g, 851 channel=ap_settings["channel"], 852 ssid=hostapd_config_settings["SSID"], 853 hidden=hostapd_config_settings["hiddenSSID"], 854 security=hostapd_security.Security( 855 security_mode=hostapd_config_settings["security"], 856 password=hostapd_config_settings["wepKeys"][0]), 857 bss_settings=bss_settings) 858 else: 859 config = hostapd_ap_preset.create_ap_preset( 860 iface_wlan_2g=wlan_2g, 861 iface_wlan_5g=wlan_5g, 862 channel=ap_settings["channel"], 863 ssid=hostapd_config_settings["SSID"], 864 hidden=hostapd_config_settings["hiddenSSID"], 865 bss_settings=bss_settings) 866 return config 867 868 def configure_packet_capture( 869 self, 870 channel_5g=hostapd_constants.AP_DEFAULT_CHANNEL_5G, 871 channel_2g=hostapd_constants.AP_DEFAULT_CHANNEL_2G): 872 """Configure packet capture for 2G and 5G bands. 873 874 Args: 875 channel_5g: Channel to set the monitor mode to for 5G band. 876 channel_2g: Channel to set the monitor mode to for 2G band. 877 """ 878 self.packet_capture = self.packet_capture[0] 879 result = self.packet_capture.configure_monitor_mode( 880 hostapd_constants.BAND_2G, channel_2g) 881 if not result: 882 raise ValueError("Failed to configure channel for 2G band") 883 884 result = self.packet_capture.configure_monitor_mode( 885 hostapd_constants.BAND_5G, channel_5g) 886 if not result: 887 raise ValueError("Failed to configure channel for 5G band.") 888 889 @staticmethod 890 def wifi_test_wrap(fn): 891 def _safe_wrap_test_case(self, *args, **kwargs): 892 test_id = "%s:%s:%s" % (self.__class__.__name__, self.test_name, 893 self.log_begin_time.replace(' ', '-')) 894 self.test_id = test_id 895 self.result_detail = "" 896 tries = int(self.user_params.get("wifi_auto_rerun", 3)) 897 for ad in self.android_devices: 898 ad.log_path = self.log_path 899 for i in range(tries + 1): 900 result = True 901 if i > 0: 902 log_string = "[Test Case] RETRY:%s %s" % (i, 903 self.test_name) 904 self.log.info(log_string) 905 self._teardown_test(self.test_name) 906 self._setup_test(self.test_name) 907 try: 908 result = fn(self, *args, **kwargs) 909 except signals.TestFailure as e: 910 self.log.warn("Error msg: %s" % e) 911 if self.result_detail: 912 signal.details = self.result_detail 913 result = False 914 except signals.TestSignal: 915 if self.result_detail: 916 signal.details = self.result_detail 917 raise 918 except Exception as e: 919 self.log.exception(e) 920 asserts.fail(self.result_detail) 921 if result is False: 922 if i < tries: 923 continue 924 else: 925 break 926 if result is not False: 927 asserts.explicit_pass(self.result_detail) 928 else: 929 asserts.fail(self.result_detail) 930 931 return _safe_wrap_test_case 932