1#!/usr/bin/env python3 2# 3# Copyright 2017 Google, Inc. 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17import logging 18import time 19from acts import utils 20from acts.libs.proc import job 21from acts.controllers.ap_lib import bridge_interface as bi 22from acts_contrib.test_utils.wifi import wifi_test_utils as wutils 23from acts.controllers.ap_lib import hostapd_security 24from acts.controllers.ap_lib import hostapd_ap_preset 25 26# http://www.secdev.org/projects/scapy/ 27# On ubuntu, sudo pip3 install scapy 28import scapy.all as scapy 29 30GET_FROM_PHONE = 'get_from_dut' 31GET_FROM_AP = 'get_from_ap' 32ENABLED_MODULATED_DTIM = 'gEnableModulatedDTIM=' 33MAX_MODULATED_DTIM = 'gMaxLIModulatedDTIM=' 34 35 36def change_dtim(ad, gEnableModulatedDTIM, gMaxLIModulatedDTIM=10): 37 """Function to change the DTIM setting in the phone. 38 39 Args: 40 ad: the target android device, AndroidDevice object 41 gEnableModulatedDTIM: Modulated DTIM, int 42 gMaxLIModulatedDTIM: Maximum modulated DTIM, int 43 """ 44 # First trying to find the ini file with DTIM settings 45 ini_file_phone = ad.adb.shell('ls /vendor/firmware/wlan/*/*.ini') 46 ini_file_local = ini_file_phone.split('/')[-1] 47 48 # Pull the file and change the DTIM to desired value 49 ad.adb.pull('{} {}'.format(ini_file_phone, ini_file_local)) 50 51 with open(ini_file_local, 'r') as fin: 52 for line in fin: 53 if ENABLED_MODULATED_DTIM in line: 54 gE_old = line.strip('\n') 55 gEDTIM_old = line.strip(ENABLED_MODULATED_DTIM).strip('\n') 56 if MAX_MODULATED_DTIM in line: 57 gM_old = line.strip('\n') 58 gMDTIM_old = line.strip(MAX_MODULATED_DTIM).strip('\n') 59 fin.close() 60 if int(gEDTIM_old) == gEnableModulatedDTIM and int( 61 gMDTIM_old) == gMaxLIModulatedDTIM: 62 ad.log.info('Current DTIM is already the desired value,' 63 'no need to reset it') 64 return 0 65 66 gE_new = ENABLED_MODULATED_DTIM + str(gEnableModulatedDTIM) 67 gM_new = MAX_MODULATED_DTIM + str(gMaxLIModulatedDTIM) 68 69 sed_gE = 'sed -i \'s/{}/{}/g\' {}'.format(gE_old, gE_new, ini_file_local) 70 sed_gM = 'sed -i \'s/{}/{}/g\' {}'.format(gM_old, gM_new, ini_file_local) 71 job.run(sed_gE) 72 job.run(sed_gM) 73 74 # Push the file to the phone 75 push_file_to_phone(ad, ini_file_local, ini_file_phone) 76 ad.log.info('DTIM changes checked in and rebooting...') 77 ad.reboot() 78 # Wait for auto-wifi feature to start 79 time.sleep(20) 80 ad.adb.shell('dumpsys battery set level 100') 81 ad.log.info('DTIM updated and device back from reboot') 82 return 1 83 84 85def push_file_to_phone(ad, file_local, file_phone): 86 """Function to push local file to android phone. 87 88 Args: 89 ad: the target android device 90 file_local: the locla file to push 91 file_phone: the file/directory on the phone to be pushed 92 """ 93 ad.adb.root() 94 cmd_out = ad.adb.remount() 95 if 'Permission denied' in cmd_out: 96 ad.log.info('Need to disable verity first and reboot') 97 ad.adb.disable_verity() 98 time.sleep(1) 99 ad.reboot() 100 ad.log.info('Verity disabled and device back from reboot') 101 ad.adb.root() 102 ad.adb.remount() 103 time.sleep(1) 104 ad.adb.push('{} {}'.format(file_local, file_phone)) 105 106 107def ap_setup(ap, network, bandwidth=80): 108 """Set up the whirlwind AP with provided network info. 109 110 Args: 111 ap: access_point object of the AP 112 network: dict with information of the network, including ssid, password 113 bssid, channel etc. 114 bandwidth: the operation bandwidth for the AP, default 80MHz 115 Returns: 116 brconfigs: the bridge interface configs 117 """ 118 log = logging.getLogger() 119 bss_settings = [] 120 ssid = network[wutils.WifiEnums.SSID_KEY] 121 if "password" in network.keys(): 122 password = network["password"] 123 security = hostapd_security.Security( 124 security_mode="wpa", password=password) 125 else: 126 security = hostapd_security.Security(security_mode=None, password=None) 127 channel = network["channel"] 128 config = hostapd_ap_preset.create_ap_preset( 129 channel=channel, 130 ssid=ssid, 131 security=security, 132 bss_settings=bss_settings, 133 vht_bandwidth=bandwidth, 134 profile_name='whirlwind', 135 iface_wlan_2g=ap.wlan_2g, 136 iface_wlan_5g=ap.wlan_5g) 137 config_bridge = ap.generate_bridge_configs(channel) 138 brconfigs = bi.BridgeInterfaceConfigs(config_bridge[0], config_bridge[1], 139 config_bridge[2]) 140 ap.bridge.startup(brconfigs) 141 ap.start_ap(config) 142 log.info("AP started on channel {} with SSID {}".format(channel, ssid)) 143 return brconfigs 144 145 146def run_iperf_client_nonblocking(ad, server_host, extra_args=""): 147 """Start iperf client on the device with nohup. 148 149 Return status as true if iperf client start successfully. 150 And data flow information as results. 151 152 Args: 153 ad: the android device under test 154 server_host: Address of the iperf server. 155 extra_args: A string representing extra arguments for iperf client, 156 e.g. "-i 1 -t 30". 157 158 """ 159 log = logging.getLogger() 160 ad.adb.shell_nb("nohup >/dev/null 2>&1 sh -c 'iperf3 -c {} {} &'".format( 161 server_host, extra_args)) 162 log.info("IPerf client started") 163 164 165def get_wifi_rssi(ad): 166 """Get the RSSI of the device. 167 168 Args: 169 ad: the android device under test 170 Returns: 171 RSSI: the rssi level of the device 172 """ 173 RSSI = ad.droid.wifiGetConnectionInfo()['rssi'] 174 return RSSI 175 176 177def get_phone_ip(ad): 178 """Get the WiFi IP address of the phone. 179 180 Args: 181 ad: the android device under test 182 Returns: 183 IP: IP address of the phone for WiFi, as a string 184 """ 185 IP = ad.droid.connectivityGetIPv4Addresses('wlan0')[0] 186 187 return IP 188 189 190def get_phone_mac(ad): 191 """Get the WiFi MAC address of the phone. 192 193 Args: 194 ad: the android device under test 195 Returns: 196 mac: MAC address of the phone for WiFi, as a string 197 """ 198 mac = ad.droid.wifiGetConnectionInfo()["mac_address"] 199 200 return mac 201 202 203def get_phone_ipv6(ad): 204 """Get the WiFi IPV6 address of the phone. 205 206 Args: 207 ad: the android device under test 208 Returns: 209 IPv6: IPv6 address of the phone for WiFi, as a string 210 """ 211 IPv6 = ad.droid.connectivityGetLinkLocalIpv6Address('wlan0')[:-6] 212 213 return IPv6 214 215 216def wait_for_dhcp(interface_name): 217 """Wait the DHCP address assigned to desired interface. 218 219 Getting DHCP address takes time and the wait time isn't constant. Utilizing 220 utils.timeout to keep trying until success 221 222 Args: 223 interface_name: desired interface name 224 Returns: 225 ip: ip address of the desired interface name 226 Raise: 227 TimeoutError: After timeout, if no DHCP assigned, raise 228 """ 229 log = logging.getLogger() 230 reset_host_interface(interface_name) 231 start_time = time.time() 232 time_limit_seconds = 60 233 ip = '0.0.0.0' 234 while start_time + time_limit_seconds > time.time(): 235 ip = scapy.get_if_addr(interface_name) 236 if ip == '0.0.0.0': 237 time.sleep(1) 238 else: 239 log.info( 240 'DHCP address assigned to %s as %s' % (interface_name, ip)) 241 return ip 242 raise TimeoutError('Timed out while getting if_addr after %s seconds.' % 243 time_limit_seconds) 244 245 246def reset_host_interface(intferface_name): 247 """Reset the host interface. 248 249 Args: 250 intferface_name: the desired interface to reset 251 """ 252 log = logging.getLogger() 253 intf_down_cmd = 'ifconfig %s down' % intferface_name 254 intf_up_cmd = 'ifconfig %s up' % intferface_name 255 try: 256 job.run(intf_down_cmd) 257 time.sleep(10) 258 job.run(intf_up_cmd) 259 log.info('{} has been reset'.format(intferface_name)) 260 except job.Error: 261 raise Exception('No such interface') 262 263 264def bringdown_host_interface(intferface_name): 265 """Reset the host interface. 266 267 Args: 268 intferface_name: the desired interface to reset 269 """ 270 log = logging.getLogger() 271 intf_down_cmd = 'ifconfig %s down' % intferface_name 272 try: 273 job.run(intf_down_cmd) 274 time.sleep(2) 275 log.info('{} has been brought down'.format(intferface_name)) 276 except job.Error: 277 raise Exception('No such interface') 278 279 280def create_pkt_config(test_class): 281 """Creates the config for generating multicast packets 282 283 Args: 284 test_class: object with all networking paramters 285 286 Returns: 287 Dictionary with the multicast packet config 288 """ 289 addr_type = (scapy.IPV6_ADDR_LINKLOCAL 290 if test_class.ipv6_src_type == 'LINK_LOCAL' else 291 scapy.IPV6_ADDR_GLOBAL) 292 293 mac_dst = test_class.mac_dst 294 if GET_FROM_PHONE in test_class.mac_dst: 295 mac_dst = get_phone_mac(test_class.dut) 296 297 ipv4_dst = test_class.ipv4_dst 298 if GET_FROM_PHONE in test_class.ipv4_dst: 299 ipv4_dst = get_phone_ip(test_class.dut) 300 301 ipv6_dst = test_class.ipv6_dst 302 if GET_FROM_PHONE in test_class.ipv6_dst: 303 ipv6_dst = get_phone_ipv6(test_class.dut) 304 305 ipv4_gw = test_class.ipv4_gwt 306 if GET_FROM_AP in test_class.ipv4_gwt: 307 ipv4_gw = test_class.access_point.ssh_settings.hostname 308 309 pkt_gen_config = { 310 'interf': test_class.pkt_sender.interface, 311 'subnet_mask': test_class.sub_mask, 312 'src_mac': test_class.mac_src, 313 'dst_mac': mac_dst, 314 'src_ipv4': test_class.ipv4_src, 315 'dst_ipv4': ipv4_dst, 316 'src_ipv6': test_class.ipv6_src, 317 'src_ipv6_type': addr_type, 318 'dst_ipv6': ipv6_dst, 319 'gw_ipv4': ipv4_gw 320 } 321 return pkt_gen_config 322