1"""Controller for Open WRT access point."""
2
3import random
4import re
5import time
6from acts import logger
7from acts import signals
8from acts.controllers.ap_lib import hostapd_constants
9from acts.controllers.openwrt_lib import network_settings
10from acts.controllers.openwrt_lib import wireless_config
11from acts.controllers.openwrt_lib import wireless_settings_applier
12from acts.controllers.utils_lib.ssh import connection
13from acts.controllers.utils_lib.ssh import settings
14from acts.controllers.openwrt_lib.openwrt_constants import OpenWrtWifiSetting
15import yaml
16
17MOBLY_CONTROLLER_CONFIG_NAME = "OpenWrtAP"
18ACTS_CONTROLLER_REFERENCE_NAME = "access_points"
19OPEN_SECURITY = "none"
20PSK1_SECURITY = 'psk'
21PSK_SECURITY = "psk2"
22WEP_SECURITY = "wep"
23ENT_SECURITY = "wpa2"
24OWE_SECURITY = "owe"
25SAE_SECURITY = "sae"
26SAEMIXED_SECURITY = "sae-mixed"
27ENABLE_RADIO = "0"
28PMF_ENABLED = 2
29WIFI_2G = "wifi2g"
30WIFI_5G = "wifi5g"
31WAIT_TIME = 20
32DEFAULT_RADIOS = ("radio0", "radio1")
33
34
35def create(configs):
36  """Creates ap controllers from a json config.
37
38  Creates an ap controller from either a list, or a single element. The element
39  can either be just the hostname or a dictionary containing the hostname and
40  username of the AP to connect to over SSH.
41
42  Args:
43    configs: The json configs that represent this controller.
44
45  Returns:
46    AccessPoint object
47
48  Example:
49    Below is the config file entry for OpenWrtAP as a list. A testbed can have
50    1 or more APs to configure. Each AP has a "ssh_config" key to provide SSH
51    login information. OpenWrtAP#__init__() uses this to create SSH object.
52
53      "OpenWrtAP": [
54        {
55          "ssh_config": {
56            "user" : "root",
57            "host" : "192.168.1.1"
58          }
59        },
60        {
61          "ssh_config": {
62            "user" : "root",
63            "host" : "192.168.1.2"
64          }
65        }
66      ]
67  """
68  return [OpenWrtAP(c) for c in configs]
69
70
71def destroy(aps):
72  """Destroys a list of AccessPoints.
73
74  Args:
75    aps: The list of AccessPoints to destroy.
76  """
77  for ap in aps:
78    ap.close()
79    ap.close_ssh()
80
81
82def get_info(aps):
83  """Get information on a list of access points.
84
85  Args:
86    aps: A list of AccessPoints.
87
88  Returns:
89    A list of all aps hostname.
90  """
91  return [ap.ssh_settings.hostname for ap in aps]
92
93
94class OpenWrtAP(object):
95  """An AccessPoint controller.
96
97  Attributes:
98    ssh: The ssh connection to the AP.
99    ssh_settings: The ssh settings being used by the ssh connection.
100    log: Logging object for AccessPoint.
101    wireless_setting: object holding wireless configuration.
102    network_setting: Object for network configuration
103  """
104
105  def __init__(self, config):
106    """Initialize AP."""
107    self.ssh_settings = settings.from_config(config["ssh_config"])
108    self.ssh = connection.SshConnection(self.ssh_settings)
109    self.log = logger.create_logger(
110        lambda msg: "[OpenWrtAP|%s] %s" % (self.ssh_settings.hostname, msg))
111    self.wireless_setting = None
112    self.network_setting = network_settings.NetworkSettings(
113        self.ssh, self.ssh_settings, self.log)
114
115  def configure_ap(self, wifi_configs, channel_2g, channel_5g):
116    """Configure AP with the required settings.
117
118    Each test class inherits WifiBaseTest. Based on the test, we may need to
119    configure PSK, WEP, OPEN, ENT networks on 2G and 5G bands in any
120    combination. We call WifiBaseTest methods get_psk_network(),
121    get_open_network(), get_wep_network() and get_ent_network() to create
122    dictionaries which contains this information. 'wifi_configs' is a list of
123    such dictionaries. Example below configures 2 WiFi networks - 1 PSK 2G and
124    1 Open 5G on one AP. configure_ap() is called from WifiBaseTest to
125    configure the APs.
126
127    wifi_configs = [
128      {
129        '2g': {
130          'SSID': '2g_AkqXWPK4',
131          'security': 'psk2',
132          'password': 'YgYuXqDO9H',
133          'hiddenSSID': False
134        },
135      },
136      {
137        '5g': {
138          'SSID': '5g_8IcMR1Sg',
139          'security': 'none',
140          'hiddenSSID': False
141        },
142      }
143    ]
144
145    Args:
146      wifi_configs: list of network settings for 2G and 5G bands.
147      channel_2g: channel for 2G band.
148      channel_5g: channel for 5G band.
149    """
150    # generate wifi configs to configure
151    wireless_configs = self.generate_wireless_configs(wifi_configs)
152    self.wireless_setting = wireless_settings_applier.WirelessSettingsApplier(
153        self.ssh, wireless_configs, channel_2g, channel_5g)
154    self.wireless_setting.apply_wireless_settings()
155
156  def start_ap(self):
157    """Starts the AP with the settings in /etc/config/wireless."""
158    self.ssh.run("wifi up")
159    curr_time = time.time()
160    while time.time() < curr_time + WAIT_TIME:
161      if self.get_wifi_status():
162        return
163      time.sleep(3)
164    if not self.get_wifi_status():
165      raise ValueError("Failed to turn on WiFi on the AP.")
166
167  def stop_ap(self):
168    """Stops the AP."""
169    self.ssh.run("wifi down")
170    curr_time = time.time()
171    while time.time() < curr_time + WAIT_TIME:
172      if not self.get_wifi_status():
173        return
174      time.sleep(3)
175    if self.get_wifi_status():
176      raise ValueError("Failed to turn off WiFi on the AP.")
177
178  def get_bssids_for_wifi_networks(self):
179    """Get BSSIDs for wifi networks configured.
180
181    Returns:
182      Dictionary of SSID - BSSID map for both bands.
183    """
184    bssid_map = {"2g": {}, "5g": {}}
185    for radio in ["radio0", "radio1"]:
186      ssid_ifname_map = self.get_ifnames_for_ssids(radio)
187      if radio == "radio0":
188        for ssid, ifname in ssid_ifname_map.items():
189          bssid_map["5g"][ssid] = self.get_bssid(ifname)
190      elif radio == "radio1":
191        for ssid, ifname in ssid_ifname_map.items():
192          bssid_map["2g"][ssid] = self.get_bssid(ifname)
193    return bssid_map
194
195  def get_wifi_status(self):
196    """Check if radios are up for both 2G and 5G bands.
197
198    Returns:
199      True if both radios are up. False if not.
200    """
201    radios = ["radio0", "radio1"]
202    status = True
203    for radio in radios:
204      str_output = self.ssh.run("wifi status %s" % radio).stdout
205      wifi_status = yaml.load(str_output.replace("\t", "").replace("\n", ""),
206                              Loader=yaml.FullLoader)
207      status = wifi_status[radio]["up"] and status
208    return status
209
210  def get_ifnames_for_ssids(self, radio):
211    """Get interfaces for wifi networks.
212
213    Args:
214      radio: 2g or 5g radio get the bssids from.
215
216    Returns:
217      dictionary of ssid - ifname mappings.
218    """
219    ssid_ifname_map = {}
220    str_output = self.ssh.run("wifi status %s" % radio).stdout
221    wifi_status = yaml.load(str_output.replace("\t", "").replace("\n", ""),
222                            Loader=yaml.FullLoader)
223    wifi_status = wifi_status[radio]
224    if wifi_status["up"]:
225      interfaces = wifi_status["interfaces"]
226      for config in interfaces:
227        ssid = config["config"]["ssid"]
228        ifname = config["ifname"]
229        ssid_ifname_map[ssid] = ifname
230    return ssid_ifname_map
231
232  def get_bssid(self, ifname):
233    """Get MAC address from an interface.
234
235    Args:
236      ifname: interface name of the corresponding MAC.
237
238    Returns:
239      BSSID of the interface.
240    """
241    ifconfig = self.ssh.run("ifconfig %s" % ifname).stdout
242    mac_addr = ifconfig.split("\n")[0].split()[-1]
243    return mac_addr
244
245  def set_wpa_encryption(self, encryption):
246    """Set different encryptions to wpa or wpa2.
247
248    Args:
249      encryption: ccmp, tkip, or ccmp+tkip.
250    """
251    str_output = self.ssh.run("wifi status").stdout
252    wifi_status = yaml.load(str_output.replace("\t", "").replace("\n", ""),
253                            Loader=yaml.FullLoader)
254
255    # Counting how many interface are enabled.
256    total_interface = 0
257    for radio in ["radio0", "radio1"]:
258      num_interface = len(wifi_status[radio]['interfaces'])
259      total_interface += num_interface
260
261    # Iterates every interface to get and set wpa encryption.
262    default_extra_interface = 2
263    for i in range(total_interface + default_extra_interface):
264      origin_encryption = self.ssh.run(
265          'uci get wireless.@wifi-iface[{}].encryption'.format(i)).stdout
266      origin_psk_pattern = re.match(r'psk\b', origin_encryption)
267      target_psk_pattern = re.match(r'psk\b', encryption)
268      origin_psk2_pattern = re.match(r'psk2\b', origin_encryption)
269      target_psk2_pattern = re.match(r'psk2\b', encryption)
270
271      if origin_psk_pattern == target_psk_pattern:
272        self.ssh.run(
273            'uci set wireless.@wifi-iface[{}].encryption={}'.format(
274                i, encryption))
275
276      if origin_psk2_pattern == target_psk2_pattern:
277        self.ssh.run(
278            'uci set wireless.@wifi-iface[{}].encryption={}'.format(
279                i, encryption))
280
281    self.ssh.run("uci commit wireless")
282    self.ssh.run("wifi")
283
284  def set_password(self, pwd_5g=None, pwd_2g=None):
285    """Set password for individual interface.
286
287    Args:
288        pwd_5g: 8 ~ 63 chars, ascii letters and digits password for 5g network.
289        pwd_2g: 8 ~ 63 chars, ascii letters and digits password for 2g network.
290    """
291    if pwd_5g:
292      if len(pwd_5g) < 8 or len(pwd_5g) > 63:
293        self.log.error("Password must be 8~63 characters long")
294      # Only accept ascii letters and digits
295      elif not re.match("^[A-Za-z0-9]*$", pwd_5g):
296        self.log.error("Password must only contains ascii letters and digits")
297      else:
298        self.ssh.run(
299            'uci set wireless.@wifi-iface[{}].key={}'.format(3, pwd_5g))
300        self.log.info("Set 5G password to :{}".format(pwd_5g))
301
302    if pwd_2g:
303      if len(pwd_2g) < 8 or len(pwd_2g) > 63:
304        self.log.error("Password must be 8~63 characters long")
305      # Only accept ascii letters and digits
306      elif not re.match("^[A-Za-z0-9]*$", pwd_2g):
307        self.log.error("Password must only contains ascii letters and digits")
308      else:
309        self.ssh.run(
310            'uci set wireless.@wifi-iface[{}].key={}'.format(2, pwd_2g))
311        self.log.info("Set 2G password to :{}".format(pwd_2g))
312
313    self.ssh.run("uci commit wireless")
314    self.ssh.run("wifi")
315
316  def set_ssid(self, ssid_5g=None, ssid_2g=None):
317    """Set SSID for individual interface.
318
319    Args:
320        ssid_5g: 8 ~ 63 chars for 5g network.
321        ssid_2g: 8 ~ 63 chars for 2g network.
322    """
323    if ssid_5g:
324      if len(ssid_5g) < 8 or len(ssid_5g) > 63:
325        self.log.error("SSID must be 8~63 characters long")
326      # Only accept ascii letters and digits
327      else:
328        self.ssh.run(
329            'uci set wireless.@wifi-iface[{}].ssid={}'.format(3, ssid_5g))
330        self.log.info("Set 5G SSID to :{}".format(ssid_5g))
331
332    if ssid_2g:
333      if len(ssid_2g) < 8 or len(ssid_2g) > 63:
334        self.log.error("SSID must be 8~63 characters long")
335      # Only accept ascii letters and digits
336      else:
337        self.ssh.run(
338            'uci set wireless.@wifi-iface[{}].ssid={}'.format(2, ssid_2g))
339        self.log.info("Set 2G SSID to :{}".format(ssid_2g))
340
341    self.ssh.run("uci commit wireless")
342    self.ssh.run("wifi")
343
344  def generate_mobility_domain(self):
345      """Generate 4-character hexadecimal ID
346
347      Returns: String; a 4-character hexadecimal ID.
348      """
349      md = "{:04x}".format(random.getrandbits(16))
350      self.log.info("Mobility Domain ID: {}".format(md))
351      return md
352
353  def enable_80211r(self, iface, md):
354    """Enable 802.11r for one single radio.
355
356     Args:
357       iface: index number of wifi-iface.
358              2: radio1
359              3: radio0
360       md: mobility domain. a 4-character hexadecimal ID.
361    Raises: TestSkip if 2g or 5g radio is not up or 802.11r is not enabled.
362     """
363    str_output = self.ssh.run("wifi status").stdout
364    wifi_status = yaml.load(str_output.replace("\t", "").replace("\n", ""),
365                            Loader=yaml.FullLoader)
366    # Check if the radio is up.
367    if iface == OpenWrtWifiSetting.IFACE_2G:
368      if wifi_status['radio1']['up']:
369        self.log.info("2g network is ENABLED")
370      else:
371        raise signals.TestSkip("2g network is NOT ENABLED")
372    elif iface == OpenWrtWifiSetting.IFACE_5G:
373      if wifi_status['radio0']['up']:
374        self.log.info("5g network is ENABLED")
375      else:
376        raise signals.TestSkip("5g network is NOT ENABLED")
377
378    # Setup 802.11r.
379    self.ssh.run(
380        "uci set wireless.@wifi-iface[{}].ieee80211r='1'".format(iface))
381    self.ssh.run(
382        "uci set wireless.@wifi-iface[{}].ft_psk_generate_local='1'"
383          .format(iface))
384    self.ssh.run(
385        "uci set wireless.@wifi-iface[{}].mobility_domain='{}'"
386          .format(iface, md))
387    self.ssh.run(
388        "uci commit wireless")
389    self.ssh.run("wifi")
390
391    # Check if 802.11r is enabled.
392    result = self.ssh.run(
393        "uci get wireless.@wifi-iface[{}].ieee80211r".format(iface)).stdout
394    if result == '1':
395      self.log.info("802.11r is ENABLED")
396    else:
397      raise signals.TestSkip("802.11r is NOT ENABLED")
398
399  def generate_wireless_configs(self, wifi_configs):
400    """Generate wireless configs to configure.
401
402    Converts wifi_configs from configure_ap() to a list of 'WirelessConfig'
403    objects. Each object represents a wifi network to configure on the AP.
404
405    Args:
406      wifi_configs: Network list of different security types and bands.
407
408    Returns:
409      wireless configuration for openwrt AP.
410    """
411    num_2g = 1
412    num_5g = 1
413    wireless_configs = []
414
415    for i in range(len(wifi_configs)):
416      if hostapd_constants.BAND_2G in wifi_configs[i]:
417        config = wifi_configs[i][hostapd_constants.BAND_2G]
418        if config["security"] == PSK_SECURITY:
419          wireless_configs.append(
420              wireless_config.WirelessConfig("%s%s" % (WIFI_2G, num_2g),
421                                             config["SSID"],
422                                             config["security"],
423                                             hostapd_constants.BAND_2G,
424                                             password=config["password"],
425                                             hidden=config["hiddenSSID"],
426                                             ieee80211w=config["ieee80211w"]))
427        elif config["security"] == PSK1_SECURITY:
428          wireless_configs.append(
429              wireless_config.WirelessConfig("%s%s" % (WIFI_2G, num_2g),
430                                             config["SSID"],
431                                             config["security"],
432                                             hostapd_constants.BAND_2G,
433                                             password=config["password"],
434                                             hidden=config["hiddenSSID"],
435                                             ieee80211w=config["ieee80211w"]))
436        elif config["security"] == WEP_SECURITY:
437          wireless_configs.append(
438              wireless_config.WirelessConfig("%s%s" % (WIFI_2G, num_2g),
439                                             config["SSID"],
440                                             config["security"],
441                                             hostapd_constants.BAND_2G,
442                                             wep_key=config["wepKeys"][0],
443                                             hidden=config["hiddenSSID"]))
444        elif config["security"] == OPEN_SECURITY:
445          wireless_configs.append(
446              wireless_config.WirelessConfig("%s%s" % (WIFI_2G, num_2g),
447                                             config["SSID"],
448                                             config["security"],
449                                             hostapd_constants.BAND_2G,
450                                             hidden=config["hiddenSSID"]))
451        elif config["security"] == OWE_SECURITY:
452          wireless_configs.append(
453              wireless_config.WirelessConfig("%s%s" % (WIFI_2G, num_2g),
454                                             config["SSID"],
455                                             config["security"],
456                                             hostapd_constants.BAND_2G,
457                                             hidden=config["hiddenSSID"],
458                                             ieee80211w=PMF_ENABLED))
459        elif config["security"] == SAE_SECURITY:
460          wireless_configs.append(
461              wireless_config.WirelessConfig("%s%s" % (WIFI_2G, num_2g),
462                                             config["SSID"],
463                                             config["security"],
464                                             hostapd_constants.BAND_2G,
465                                             password=config["password"],
466                                             hidden=config["hiddenSSID"],
467                                             ieee80211w=PMF_ENABLED))
468        elif config["security"] == SAEMIXED_SECURITY:
469          wireless_configs.append(
470              wireless_config.WirelessConfig("%s%s" % (WIFI_2G, num_2g),
471                                             config["SSID"],
472                                             config["security"],
473                                             hostapd_constants.BAND_2G,
474                                             password=config["password"],
475                                             hidden=config["hiddenSSID"],
476                                             ieee80211w=config["ieee80211w"]))
477        elif config["security"] == ENT_SECURITY:
478          wireless_configs.append(
479              wireless_config.WirelessConfig(
480                  "%s%s" % (WIFI_2G, num_2g),
481                  config["SSID"],
482                  config["security"],
483                  hostapd_constants.BAND_2G,
484                  radius_server_ip=config["radius_server_ip"],
485                  radius_server_port=config["radius_server_port"],
486                  radius_server_secret=config["radius_server_secret"],
487                  hidden=config["hiddenSSID"]))
488        num_2g += 1
489      if hostapd_constants.BAND_5G in wifi_configs[i]:
490        config = wifi_configs[i][hostapd_constants.BAND_5G]
491        if config["security"] == PSK_SECURITY:
492          wireless_configs.append(
493              wireless_config.WirelessConfig("%s%s" % (WIFI_5G, num_5g),
494                                             config["SSID"],
495                                             config["security"],
496                                             hostapd_constants.BAND_5G,
497                                             password=config["password"],
498                                             hidden=config["hiddenSSID"],
499                                             ieee80211w=config["ieee80211w"]))
500        elif config["security"] == PSK1_SECURITY:
501          wireless_configs.append(
502              wireless_config.WirelessConfig("%s%s" % (WIFI_5G, num_5g),
503                                             config["SSID"],
504                                             config["security"],
505                                             hostapd_constants.BAND_5G,
506                                             password=config["password"],
507                                             hidden=config["hiddenSSID"],
508                                             ieee80211w=config["ieee80211w"]))
509        elif config["security"] == WEP_SECURITY:
510          wireless_configs.append(
511              wireless_config.WirelessConfig("%s%s" % (WIFI_5G, num_5g),
512                                             config["SSID"],
513                                             config["security"],
514                                             hostapd_constants.BAND_5G,
515                                             wep_key=config["wepKeys"][0],
516                                             hidden=config["hiddenSSID"]))
517        elif config["security"] == OPEN_SECURITY:
518          wireless_configs.append(
519              wireless_config.WirelessConfig("%s%s" % (WIFI_5G, num_5g),
520                                             config["SSID"],
521                                             config["security"],
522                                             hostapd_constants.BAND_5G,
523                                             hidden=config["hiddenSSID"]))
524        elif config["security"] == OWE_SECURITY:
525          wireless_configs.append(
526              wireless_config.WirelessConfig("%s%s" % (WIFI_5G, num_5g),
527                                             config["SSID"],
528                                             config["security"],
529                                             hostapd_constants.BAND_5G,
530                                             hidden=config["hiddenSSID"],
531                                             ieee80211w=PMF_ENABLED))
532        elif config["security"] == SAE_SECURITY:
533          wireless_configs.append(
534              wireless_config.WirelessConfig("%s%s" % (WIFI_5G, num_5g),
535                                             config["SSID"],
536                                             config["security"],
537                                             hostapd_constants.BAND_5G,
538                                             password=config["password"],
539                                             hidden=config["hiddenSSID"],
540                                             ieee80211w=PMF_ENABLED))
541        elif config["security"] == SAEMIXED_SECURITY:
542          wireless_configs.append(
543              wireless_config.WirelessConfig("%s%s" % (WIFI_5G, num_5g),
544                                             config["SSID"],
545                                             config["security"],
546                                             hostapd_constants.BAND_5G,
547                                             password=config["password"],
548                                             hidden=config["hiddenSSID"],
549                                             ieee80211w=config["ieee80211w"]))
550        elif config["security"] == ENT_SECURITY:
551          wireless_configs.append(
552              wireless_config.WirelessConfig(
553                  "%s%s" % (WIFI_5G, num_5g),
554                  config["SSID"],
555                  config["security"],
556                  hostapd_constants.BAND_5G,
557                  radius_server_ip=config["radius_server_ip"],
558                  radius_server_port=config["radius_server_port"],
559                  radius_server_secret=config["radius_server_secret"],
560                  hidden=config["hiddenSSID"]))
561        num_5g += 1
562
563    return wireless_configs
564
565  def get_wifi_network(self, security=None, band=None):
566    """Return first match wifi interface's config.
567
568    Args:
569      security: psk2 or none
570      band: '2g' or '5g'
571
572    Returns:
573      A dict contains match wifi interface's config.
574    """
575
576    for wifi_iface in self.wireless_setting.wireless_configs:
577      match_list = []
578      wifi_network = wifi_iface.__dict__
579      if security:
580        match_list.append(security == wifi_network["security"])
581      if band:
582        match_list.append(band == wifi_network["band"])
583
584      if all(match_list):
585        wifi_network["SSID"] = wifi_network["ssid"]
586        if not wifi_network["password"]:
587          del wifi_network["password"]
588        return wifi_network
589    return None
590
591  def get_wifi_status(self, radios=DEFAULT_RADIOS):
592    """Check if radios are up. Default are 2G and 5G bands.
593
594    Args:
595      radios: Wifi interfaces for check status.
596    Returns:
597      True if both radios are up. False if not.
598    """
599    status = True
600    for radio in radios:
601      str_output = self.ssh.run("wifi status %s" % radio).stdout
602      wifi_status = yaml.load(str_output.replace("\t", "").replace("\n", ""),
603                              Loader=yaml.FullLoader)
604      status = wifi_status[radio]["up"] and status
605    return status
606
607  def verify_wifi_status(self, radios=DEFAULT_RADIOS, timeout=20):
608    """Ensure wifi interfaces are ready.
609
610    Args:
611      radios: Wifi interfaces for check status.
612      timeout: An integer that is the number of times to try
613               wait for interface ready.
614    Returns:
615      True if both radios are up. False if not.
616    """
617    start_time = time.time()
618    end_time = start_time + timeout
619    while time.time() < end_time:
620      if self.get_wifi_status(radios):
621        return True
622      time.sleep(1)
623    return False
624
625  def close(self):
626    """Reset wireless and network settings to default and stop AP."""
627    if self.network_setting.config:
628      self.network_setting.cleanup_network_settings()
629    if self.wireless_setting:
630      self.wireless_setting.cleanup_wireless_settings()
631
632  def close_ssh(self):
633    """Close SSH connection to AP."""
634    self.ssh.close()
635