1"""Controller for Open WRT access point.""" 2 3import random 4import re 5import time 6from acts import logger 7from acts import signals 8from acts.controllers.ap_lib import hostapd_constants 9from acts.controllers.openwrt_lib import network_settings 10from acts.controllers.openwrt_lib import wireless_config 11from acts.controllers.openwrt_lib import wireless_settings_applier 12from acts.controllers.utils_lib.ssh import connection 13from acts.controllers.utils_lib.ssh import settings 14from acts.controllers.openwrt_lib.openwrt_constants import OpenWrtWifiSetting 15import yaml 16 17MOBLY_CONTROLLER_CONFIG_NAME = "OpenWrtAP" 18ACTS_CONTROLLER_REFERENCE_NAME = "access_points" 19OPEN_SECURITY = "none" 20PSK1_SECURITY = 'psk' 21PSK_SECURITY = "psk2" 22WEP_SECURITY = "wep" 23ENT_SECURITY = "wpa2" 24OWE_SECURITY = "owe" 25SAE_SECURITY = "sae" 26SAEMIXED_SECURITY = "sae-mixed" 27ENABLE_RADIO = "0" 28PMF_ENABLED = 2 29WIFI_2G = "wifi2g" 30WIFI_5G = "wifi5g" 31WAIT_TIME = 20 32DEFAULT_RADIOS = ("radio0", "radio1") 33 34 35def create(configs): 36 """Creates ap controllers from a json config. 37 38 Creates an ap controller from either a list, or a single element. The element 39 can either be just the hostname or a dictionary containing the hostname and 40 username of the AP to connect to over SSH. 41 42 Args: 43 configs: The json configs that represent this controller. 44 45 Returns: 46 AccessPoint object 47 48 Example: 49 Below is the config file entry for OpenWrtAP as a list. A testbed can have 50 1 or more APs to configure. Each AP has a "ssh_config" key to provide SSH 51 login information. OpenWrtAP#__init__() uses this to create SSH object. 52 53 "OpenWrtAP": [ 54 { 55 "ssh_config": { 56 "user" : "root", 57 "host" : "192.168.1.1" 58 } 59 }, 60 { 61 "ssh_config": { 62 "user" : "root", 63 "host" : "192.168.1.2" 64 } 65 } 66 ] 67 """ 68 return [OpenWrtAP(c) for c in configs] 69 70 71def destroy(aps): 72 """Destroys a list of AccessPoints. 73 74 Args: 75 aps: The list of AccessPoints to destroy. 76 """ 77 for ap in aps: 78 ap.close() 79 ap.close_ssh() 80 81 82def get_info(aps): 83 """Get information on a list of access points. 84 85 Args: 86 aps: A list of AccessPoints. 87 88 Returns: 89 A list of all aps hostname. 90 """ 91 return [ap.ssh_settings.hostname for ap in aps] 92 93 94class OpenWrtAP(object): 95 """An AccessPoint controller. 96 97 Attributes: 98 ssh: The ssh connection to the AP. 99 ssh_settings: The ssh settings being used by the ssh connection. 100 log: Logging object for AccessPoint. 101 wireless_setting: object holding wireless configuration. 102 network_setting: Object for network configuration 103 """ 104 105 def __init__(self, config): 106 """Initialize AP.""" 107 self.ssh_settings = settings.from_config(config["ssh_config"]) 108 self.ssh = connection.SshConnection(self.ssh_settings) 109 self.log = logger.create_logger( 110 lambda msg: "[OpenWrtAP|%s] %s" % (self.ssh_settings.hostname, msg)) 111 self.wireless_setting = None 112 self.network_setting = network_settings.NetworkSettings( 113 self.ssh, self.ssh_settings, self.log) 114 115 def configure_ap(self, wifi_configs, channel_2g, channel_5g): 116 """Configure AP with the required settings. 117 118 Each test class inherits WifiBaseTest. Based on the test, we may need to 119 configure PSK, WEP, OPEN, ENT networks on 2G and 5G bands in any 120 combination. We call WifiBaseTest methods get_psk_network(), 121 get_open_network(), get_wep_network() and get_ent_network() to create 122 dictionaries which contains this information. 'wifi_configs' is a list of 123 such dictionaries. Example below configures 2 WiFi networks - 1 PSK 2G and 124 1 Open 5G on one AP. configure_ap() is called from WifiBaseTest to 125 configure the APs. 126 127 wifi_configs = [ 128 { 129 '2g': { 130 'SSID': '2g_AkqXWPK4', 131 'security': 'psk2', 132 'password': 'YgYuXqDO9H', 133 'hiddenSSID': False 134 }, 135 }, 136 { 137 '5g': { 138 'SSID': '5g_8IcMR1Sg', 139 'security': 'none', 140 'hiddenSSID': False 141 }, 142 } 143 ] 144 145 Args: 146 wifi_configs: list of network settings for 2G and 5G bands. 147 channel_2g: channel for 2G band. 148 channel_5g: channel for 5G band. 149 """ 150 # generate wifi configs to configure 151 wireless_configs = self.generate_wireless_configs(wifi_configs) 152 self.wireless_setting = wireless_settings_applier.WirelessSettingsApplier( 153 self.ssh, wireless_configs, channel_2g, channel_5g) 154 self.wireless_setting.apply_wireless_settings() 155 156 def start_ap(self): 157 """Starts the AP with the settings in /etc/config/wireless.""" 158 self.ssh.run("wifi up") 159 curr_time = time.time() 160 while time.time() < curr_time + WAIT_TIME: 161 if self.get_wifi_status(): 162 return 163 time.sleep(3) 164 if not self.get_wifi_status(): 165 raise ValueError("Failed to turn on WiFi on the AP.") 166 167 def stop_ap(self): 168 """Stops the AP.""" 169 self.ssh.run("wifi down") 170 curr_time = time.time() 171 while time.time() < curr_time + WAIT_TIME: 172 if not self.get_wifi_status(): 173 return 174 time.sleep(3) 175 if self.get_wifi_status(): 176 raise ValueError("Failed to turn off WiFi on the AP.") 177 178 def get_bssids_for_wifi_networks(self): 179 """Get BSSIDs for wifi networks configured. 180 181 Returns: 182 Dictionary of SSID - BSSID map for both bands. 183 """ 184 bssid_map = {"2g": {}, "5g": {}} 185 for radio in ["radio0", "radio1"]: 186 ssid_ifname_map = self.get_ifnames_for_ssids(radio) 187 if radio == "radio0": 188 for ssid, ifname in ssid_ifname_map.items(): 189 bssid_map["5g"][ssid] = self.get_bssid(ifname) 190 elif radio == "radio1": 191 for ssid, ifname in ssid_ifname_map.items(): 192 bssid_map["2g"][ssid] = self.get_bssid(ifname) 193 return bssid_map 194 195 def get_wifi_status(self): 196 """Check if radios are up for both 2G and 5G bands. 197 198 Returns: 199 True if both radios are up. False if not. 200 """ 201 radios = ["radio0", "radio1"] 202 status = True 203 for radio in radios: 204 str_output = self.ssh.run("wifi status %s" % radio).stdout 205 wifi_status = yaml.load(str_output.replace("\t", "").replace("\n", ""), 206 Loader=yaml.FullLoader) 207 status = wifi_status[radio]["up"] and status 208 return status 209 210 def get_ifnames_for_ssids(self, radio): 211 """Get interfaces for wifi networks. 212 213 Args: 214 radio: 2g or 5g radio get the bssids from. 215 216 Returns: 217 dictionary of ssid - ifname mappings. 218 """ 219 ssid_ifname_map = {} 220 str_output = self.ssh.run("wifi status %s" % radio).stdout 221 wifi_status = yaml.load(str_output.replace("\t", "").replace("\n", ""), 222 Loader=yaml.FullLoader) 223 wifi_status = wifi_status[radio] 224 if wifi_status["up"]: 225 interfaces = wifi_status["interfaces"] 226 for config in interfaces: 227 ssid = config["config"]["ssid"] 228 ifname = config["ifname"] 229 ssid_ifname_map[ssid] = ifname 230 return ssid_ifname_map 231 232 def get_bssid(self, ifname): 233 """Get MAC address from an interface. 234 235 Args: 236 ifname: interface name of the corresponding MAC. 237 238 Returns: 239 BSSID of the interface. 240 """ 241 ifconfig = self.ssh.run("ifconfig %s" % ifname).stdout 242 mac_addr = ifconfig.split("\n")[0].split()[-1] 243 return mac_addr 244 245 def set_wpa_encryption(self, encryption): 246 """Set different encryptions to wpa or wpa2. 247 248 Args: 249 encryption: ccmp, tkip, or ccmp+tkip. 250 """ 251 str_output = self.ssh.run("wifi status").stdout 252 wifi_status = yaml.load(str_output.replace("\t", "").replace("\n", ""), 253 Loader=yaml.FullLoader) 254 255 # Counting how many interface are enabled. 256 total_interface = 0 257 for radio in ["radio0", "radio1"]: 258 num_interface = len(wifi_status[radio]['interfaces']) 259 total_interface += num_interface 260 261 # Iterates every interface to get and set wpa encryption. 262 default_extra_interface = 2 263 for i in range(total_interface + default_extra_interface): 264 origin_encryption = self.ssh.run( 265 'uci get wireless.@wifi-iface[{}].encryption'.format(i)).stdout 266 origin_psk_pattern = re.match(r'psk\b', origin_encryption) 267 target_psk_pattern = re.match(r'psk\b', encryption) 268 origin_psk2_pattern = re.match(r'psk2\b', origin_encryption) 269 target_psk2_pattern = re.match(r'psk2\b', encryption) 270 271 if origin_psk_pattern == target_psk_pattern: 272 self.ssh.run( 273 'uci set wireless.@wifi-iface[{}].encryption={}'.format( 274 i, encryption)) 275 276 if origin_psk2_pattern == target_psk2_pattern: 277 self.ssh.run( 278 'uci set wireless.@wifi-iface[{}].encryption={}'.format( 279 i, encryption)) 280 281 self.ssh.run("uci commit wireless") 282 self.ssh.run("wifi") 283 284 def set_password(self, pwd_5g=None, pwd_2g=None): 285 """Set password for individual interface. 286 287 Args: 288 pwd_5g: 8 ~ 63 chars, ascii letters and digits password for 5g network. 289 pwd_2g: 8 ~ 63 chars, ascii letters and digits password for 2g network. 290 """ 291 if pwd_5g: 292 if len(pwd_5g) < 8 or len(pwd_5g) > 63: 293 self.log.error("Password must be 8~63 characters long") 294 # Only accept ascii letters and digits 295 elif not re.match("^[A-Za-z0-9]*$", pwd_5g): 296 self.log.error("Password must only contains ascii letters and digits") 297 else: 298 self.ssh.run( 299 'uci set wireless.@wifi-iface[{}].key={}'.format(3, pwd_5g)) 300 self.log.info("Set 5G password to :{}".format(pwd_5g)) 301 302 if pwd_2g: 303 if len(pwd_2g) < 8 or len(pwd_2g) > 63: 304 self.log.error("Password must be 8~63 characters long") 305 # Only accept ascii letters and digits 306 elif not re.match("^[A-Za-z0-9]*$", pwd_2g): 307 self.log.error("Password must only contains ascii letters and digits") 308 else: 309 self.ssh.run( 310 'uci set wireless.@wifi-iface[{}].key={}'.format(2, pwd_2g)) 311 self.log.info("Set 2G password to :{}".format(pwd_2g)) 312 313 self.ssh.run("uci commit wireless") 314 self.ssh.run("wifi") 315 316 def set_ssid(self, ssid_5g=None, ssid_2g=None): 317 """Set SSID for individual interface. 318 319 Args: 320 ssid_5g: 8 ~ 63 chars for 5g network. 321 ssid_2g: 8 ~ 63 chars for 2g network. 322 """ 323 if ssid_5g: 324 if len(ssid_5g) < 8 or len(ssid_5g) > 63: 325 self.log.error("SSID must be 8~63 characters long") 326 # Only accept ascii letters and digits 327 else: 328 self.ssh.run( 329 'uci set wireless.@wifi-iface[{}].ssid={}'.format(3, ssid_5g)) 330 self.log.info("Set 5G SSID to :{}".format(ssid_5g)) 331 332 if ssid_2g: 333 if len(ssid_2g) < 8 or len(ssid_2g) > 63: 334 self.log.error("SSID must be 8~63 characters long") 335 # Only accept ascii letters and digits 336 else: 337 self.ssh.run( 338 'uci set wireless.@wifi-iface[{}].ssid={}'.format(2, ssid_2g)) 339 self.log.info("Set 2G SSID to :{}".format(ssid_2g)) 340 341 self.ssh.run("uci commit wireless") 342 self.ssh.run("wifi") 343 344 def generate_mobility_domain(self): 345 """Generate 4-character hexadecimal ID 346 347 Returns: String; a 4-character hexadecimal ID. 348 """ 349 md = "{:04x}".format(random.getrandbits(16)) 350 self.log.info("Mobility Domain ID: {}".format(md)) 351 return md 352 353 def enable_80211r(self, iface, md): 354 """Enable 802.11r for one single radio. 355 356 Args: 357 iface: index number of wifi-iface. 358 2: radio1 359 3: radio0 360 md: mobility domain. a 4-character hexadecimal ID. 361 Raises: TestSkip if 2g or 5g radio is not up or 802.11r is not enabled. 362 """ 363 str_output = self.ssh.run("wifi status").stdout 364 wifi_status = yaml.load(str_output.replace("\t", "").replace("\n", ""), 365 Loader=yaml.FullLoader) 366 # Check if the radio is up. 367 if iface == OpenWrtWifiSetting.IFACE_2G: 368 if wifi_status['radio1']['up']: 369 self.log.info("2g network is ENABLED") 370 else: 371 raise signals.TestSkip("2g network is NOT ENABLED") 372 elif iface == OpenWrtWifiSetting.IFACE_5G: 373 if wifi_status['radio0']['up']: 374 self.log.info("5g network is ENABLED") 375 else: 376 raise signals.TestSkip("5g network is NOT ENABLED") 377 378 # Setup 802.11r. 379 self.ssh.run( 380 "uci set wireless.@wifi-iface[{}].ieee80211r='1'".format(iface)) 381 self.ssh.run( 382 "uci set wireless.@wifi-iface[{}].ft_psk_generate_local='1'" 383 .format(iface)) 384 self.ssh.run( 385 "uci set wireless.@wifi-iface[{}].mobility_domain='{}'" 386 .format(iface, md)) 387 self.ssh.run( 388 "uci commit wireless") 389 self.ssh.run("wifi") 390 391 # Check if 802.11r is enabled. 392 result = self.ssh.run( 393 "uci get wireless.@wifi-iface[{}].ieee80211r".format(iface)).stdout 394 if result == '1': 395 self.log.info("802.11r is ENABLED") 396 else: 397 raise signals.TestSkip("802.11r is NOT ENABLED") 398 399 def generate_wireless_configs(self, wifi_configs): 400 """Generate wireless configs to configure. 401 402 Converts wifi_configs from configure_ap() to a list of 'WirelessConfig' 403 objects. Each object represents a wifi network to configure on the AP. 404 405 Args: 406 wifi_configs: Network list of different security types and bands. 407 408 Returns: 409 wireless configuration for openwrt AP. 410 """ 411 num_2g = 1 412 num_5g = 1 413 wireless_configs = [] 414 415 for i in range(len(wifi_configs)): 416 if hostapd_constants.BAND_2G in wifi_configs[i]: 417 config = wifi_configs[i][hostapd_constants.BAND_2G] 418 if config["security"] == PSK_SECURITY: 419 wireless_configs.append( 420 wireless_config.WirelessConfig("%s%s" % (WIFI_2G, num_2g), 421 config["SSID"], 422 config["security"], 423 hostapd_constants.BAND_2G, 424 password=config["password"], 425 hidden=config["hiddenSSID"], 426 ieee80211w=config["ieee80211w"])) 427 elif config["security"] == PSK1_SECURITY: 428 wireless_configs.append( 429 wireless_config.WirelessConfig("%s%s" % (WIFI_2G, num_2g), 430 config["SSID"], 431 config["security"], 432 hostapd_constants.BAND_2G, 433 password=config["password"], 434 hidden=config["hiddenSSID"], 435 ieee80211w=config["ieee80211w"])) 436 elif config["security"] == WEP_SECURITY: 437 wireless_configs.append( 438 wireless_config.WirelessConfig("%s%s" % (WIFI_2G, num_2g), 439 config["SSID"], 440 config["security"], 441 hostapd_constants.BAND_2G, 442 wep_key=config["wepKeys"][0], 443 hidden=config["hiddenSSID"])) 444 elif config["security"] == OPEN_SECURITY: 445 wireless_configs.append( 446 wireless_config.WirelessConfig("%s%s" % (WIFI_2G, num_2g), 447 config["SSID"], 448 config["security"], 449 hostapd_constants.BAND_2G, 450 hidden=config["hiddenSSID"])) 451 elif config["security"] == OWE_SECURITY: 452 wireless_configs.append( 453 wireless_config.WirelessConfig("%s%s" % (WIFI_2G, num_2g), 454 config["SSID"], 455 config["security"], 456 hostapd_constants.BAND_2G, 457 hidden=config["hiddenSSID"], 458 ieee80211w=PMF_ENABLED)) 459 elif config["security"] == SAE_SECURITY: 460 wireless_configs.append( 461 wireless_config.WirelessConfig("%s%s" % (WIFI_2G, num_2g), 462 config["SSID"], 463 config["security"], 464 hostapd_constants.BAND_2G, 465 password=config["password"], 466 hidden=config["hiddenSSID"], 467 ieee80211w=PMF_ENABLED)) 468 elif config["security"] == SAEMIXED_SECURITY: 469 wireless_configs.append( 470 wireless_config.WirelessConfig("%s%s" % (WIFI_2G, num_2g), 471 config["SSID"], 472 config["security"], 473 hostapd_constants.BAND_2G, 474 password=config["password"], 475 hidden=config["hiddenSSID"], 476 ieee80211w=config["ieee80211w"])) 477 elif config["security"] == ENT_SECURITY: 478 wireless_configs.append( 479 wireless_config.WirelessConfig( 480 "%s%s" % (WIFI_2G, num_2g), 481 config["SSID"], 482 config["security"], 483 hostapd_constants.BAND_2G, 484 radius_server_ip=config["radius_server_ip"], 485 radius_server_port=config["radius_server_port"], 486 radius_server_secret=config["radius_server_secret"], 487 hidden=config["hiddenSSID"])) 488 num_2g += 1 489 if hostapd_constants.BAND_5G in wifi_configs[i]: 490 config = wifi_configs[i][hostapd_constants.BAND_5G] 491 if config["security"] == PSK_SECURITY: 492 wireless_configs.append( 493 wireless_config.WirelessConfig("%s%s" % (WIFI_5G, num_5g), 494 config["SSID"], 495 config["security"], 496 hostapd_constants.BAND_5G, 497 password=config["password"], 498 hidden=config["hiddenSSID"], 499 ieee80211w=config["ieee80211w"])) 500 elif config["security"] == PSK1_SECURITY: 501 wireless_configs.append( 502 wireless_config.WirelessConfig("%s%s" % (WIFI_5G, num_5g), 503 config["SSID"], 504 config["security"], 505 hostapd_constants.BAND_5G, 506 password=config["password"], 507 hidden=config["hiddenSSID"], 508 ieee80211w=config["ieee80211w"])) 509 elif config["security"] == WEP_SECURITY: 510 wireless_configs.append( 511 wireless_config.WirelessConfig("%s%s" % (WIFI_5G, num_5g), 512 config["SSID"], 513 config["security"], 514 hostapd_constants.BAND_5G, 515 wep_key=config["wepKeys"][0], 516 hidden=config["hiddenSSID"])) 517 elif config["security"] == OPEN_SECURITY: 518 wireless_configs.append( 519 wireless_config.WirelessConfig("%s%s" % (WIFI_5G, num_5g), 520 config["SSID"], 521 config["security"], 522 hostapd_constants.BAND_5G, 523 hidden=config["hiddenSSID"])) 524 elif config["security"] == OWE_SECURITY: 525 wireless_configs.append( 526 wireless_config.WirelessConfig("%s%s" % (WIFI_5G, num_5g), 527 config["SSID"], 528 config["security"], 529 hostapd_constants.BAND_5G, 530 hidden=config["hiddenSSID"], 531 ieee80211w=PMF_ENABLED)) 532 elif config["security"] == SAE_SECURITY: 533 wireless_configs.append( 534 wireless_config.WirelessConfig("%s%s" % (WIFI_5G, num_5g), 535 config["SSID"], 536 config["security"], 537 hostapd_constants.BAND_5G, 538 password=config["password"], 539 hidden=config["hiddenSSID"], 540 ieee80211w=PMF_ENABLED)) 541 elif config["security"] == SAEMIXED_SECURITY: 542 wireless_configs.append( 543 wireless_config.WirelessConfig("%s%s" % (WIFI_5G, num_5g), 544 config["SSID"], 545 config["security"], 546 hostapd_constants.BAND_5G, 547 password=config["password"], 548 hidden=config["hiddenSSID"], 549 ieee80211w=config["ieee80211w"])) 550 elif config["security"] == ENT_SECURITY: 551 wireless_configs.append( 552 wireless_config.WirelessConfig( 553 "%s%s" % (WIFI_5G, num_5g), 554 config["SSID"], 555 config["security"], 556 hostapd_constants.BAND_5G, 557 radius_server_ip=config["radius_server_ip"], 558 radius_server_port=config["radius_server_port"], 559 radius_server_secret=config["radius_server_secret"], 560 hidden=config["hiddenSSID"])) 561 num_5g += 1 562 563 return wireless_configs 564 565 def get_wifi_network(self, security=None, band=None): 566 """Return first match wifi interface's config. 567 568 Args: 569 security: psk2 or none 570 band: '2g' or '5g' 571 572 Returns: 573 A dict contains match wifi interface's config. 574 """ 575 576 for wifi_iface in self.wireless_setting.wireless_configs: 577 match_list = [] 578 wifi_network = wifi_iface.__dict__ 579 if security: 580 match_list.append(security == wifi_network["security"]) 581 if band: 582 match_list.append(band == wifi_network["band"]) 583 584 if all(match_list): 585 wifi_network["SSID"] = wifi_network["ssid"] 586 if not wifi_network["password"]: 587 del wifi_network["password"] 588 return wifi_network 589 return None 590 591 def get_wifi_status(self, radios=DEFAULT_RADIOS): 592 """Check if radios are up. Default are 2G and 5G bands. 593 594 Args: 595 radios: Wifi interfaces for check status. 596 Returns: 597 True if both radios are up. False if not. 598 """ 599 status = True 600 for radio in radios: 601 str_output = self.ssh.run("wifi status %s" % radio).stdout 602 wifi_status = yaml.load(str_output.replace("\t", "").replace("\n", ""), 603 Loader=yaml.FullLoader) 604 status = wifi_status[radio]["up"] and status 605 return status 606 607 def verify_wifi_status(self, radios=DEFAULT_RADIOS, timeout=20): 608 """Ensure wifi interfaces are ready. 609 610 Args: 611 radios: Wifi interfaces for check status. 612 timeout: An integer that is the number of times to try 613 wait for interface ready. 614 Returns: 615 True if both radios are up. False if not. 616 """ 617 start_time = time.time() 618 end_time = start_time + timeout 619 while time.time() < end_time: 620 if self.get_wifi_status(radios): 621 return True 622 time.sleep(1) 623 return False 624 625 def close(self): 626 """Reset wireless and network settings to default and stop AP.""" 627 if self.network_setting.config: 628 self.network_setting.cleanup_network_settings() 629 if self.wireless_setting: 630 self.wireless_setting.cleanup_wireless_settings() 631 632 def close_ssh(self): 633 """Close SSH connection to AP.""" 634 self.ssh.close() 635