#!/usr/bin/env python3
#
# Copyright 2018 Google, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Helpers for connectivity tests: VPN, tcpdump capture and USB tethering."""

import logging
import os
import re
import socket
import time
import urllib.request

from scapy.all import get_if_list

from acts import asserts
from acts import signals
from acts import utils
from acts.controllers import adb
from acts.controllers.adb import AdbError
from acts.logger import epoch_to_log_line_timestamp
from acts.logger import normalize_log_line_timestamp
from acts.test_utils.net import connectivity_const as cconst
from acts.test_utils.tel.tel_data_utils import wait_for_cell_data_connection
from acts.test_utils.tel.tel_test_utils import get_operator_name
from acts.test_utils.tel.tel_test_utils import verify_http_connection
from acts.test_utils.wifi import wifi_test_utils as wutils
from acts.utils import get_current_epoch_time
from acts.utils import start_standing_subprocess
from acts.utils import stop_standing_subprocess

VPN_CONST = cconst.VpnProfile
VPN_TYPE = cconst.VpnProfileType
VPN_PARAMS = cconst.VpnReqParams
TCPDUMP_PATH = "/data/local/tmp/"
USB_CHARGE_MODE = "svc usb setFunctions"
USB_TETHERING_MODE = "svc usb setFunctions rndis"
DEVICE_IP_ADDRESS = "ip address"

GCE_SSH = "gcloud compute ssh "
GCE_SCP = "gcloud compute scp "


def verify_lte_data_and_tethering_supported(ad):
    """Verify if LTE data is enabled and tethering supported.

    Turns wifi off, enables the cell data connection, asserts that an HTTP
    connection works over it and that the device reports tethering as
    supported, then turns wifi back on.

    Args:
        ad: android device object
    """
    wutils.wifi_toggle_state(ad, False)
    ad.droid.telephonyToggleDataConnection(True)
    wait_for_cell_data_connection(ad.log, ad, True)
    asserts.assert_true(
        verify_http_connection(ad.log, ad),
        "HTTP verification failed on cell data connection")
    asserts.assert_true(
        ad.droid.connectivityIsTetheringSupported(),
        "Tethering is not supported for the provider")
    wutils.wifi_toggle_state(ad, True)


def set_chrome_browser_permissions(ad):
    """Set chrome browser start with no-first-run verification.

    Give permission to read from and write to storage. Each command is
    best-effort: a failing adb command is logged and the rest still run.

    Args:
        ad: android device object
    """
    commands = ["pm grant com.android.chrome "
                "android.permission.READ_EXTERNAL_STORAGE",
                "pm grant com.android.chrome "
                "android.permission.WRITE_EXTERNAL_STORAGE",
                "rm /data/local/chrome-command-line",
                "am set-debug-app --persistent com.android.chrome",
                'echo "chrome --no-default-browser-check --no-first-run '
                '--disable-fre" > /data/local/tmp/chrome-command-line']
    for cmd in commands:
        try:
            ad.adb.shell(cmd)
        except AdbError:
            logging.warning("adb command %s failed on %s", cmd, ad.serial)


def verify_ping_to_vpn_ip(ad, vpn_ping_addr):
    """Verify if IP behind VPN server is pingable.

    Ping should pass, if VPN is connected.
    Ping should fail, if VPN is disconnected.

    Args:
        ad: android device object
        vpn_ping_addr: address to ping from the device

    Returns:
        True if the ping produced output that does not report
        "100% packet loss"; False otherwise (including adb failure).
    """
    pkt_loss = "100% packet loss"
    try:
        ping_result = ad.adb.shell("ping -c 3 -W 2 %s" % vpn_ping_addr)
    except AdbError:
        # A failed adb command is treated the same as an unreachable host.
        return False
    return bool(ping_result) and pkt_loss not in ping_result


def legacy_vpn_connection_test_logic(ad, vpn_profile, vpn_ping_addr):
    """Test logic for each legacy VPN connection.

    Steps:
        1. Flush the xfrm state and start the legacy VPN with the profile
        2. Verify that connection is established using LegacyVpnInfo
        3. Ping the IP behind VPN and log the HMAC from "ip xfrm state"
        4. Stop the VPN connection
        5. Assert the ping (taken while connected) succeeded
        6. Check that no legacy VPN info is reported any more

    Args:
        ad: Android device object
        vpn_profile: VPN profile to connect with
        vpn_ping_addr: address behind the VPN server to ping
    """
    # Wait for sometime so that VPN server flushes all interfaces and
    # connections after graceful termination
    time.sleep(10)

    ad.adb.shell("ip xfrm state flush")
    ad.log.info("Connecting to: %s", vpn_profile)
    ad.droid.vpnStartLegacyVpn(vpn_profile)
    time.sleep(cconst.VPN_TIMEOUT)

    connect_failure_msg = ("Unable to establish VPN connection for %s"
                           % vpn_profile)
    connected_vpn_info = ad.droid.vpnGetLegacyVpnInfo()
    # The info can be falsy when no VPN is up (see the check after stop
    # below); fail the test cleanly instead of raising on the subscript.
    asserts.assert_true(bool(connected_vpn_info), connect_failure_msg)
    asserts.assert_equal(connected_vpn_info["state"],
                         cconst.VPN_STATE_CONNECTED,
                         connect_failure_msg)

    ping_result = verify_ping_to_vpn_ip(ad, vpn_ping_addr)
    ip_xfrm_state = ad.adb.shell("ip xfrm state")
    match_obj = re.search(r'hmac(.*)', "%s" % ip_xfrm_state)
    if match_obj:
        hmac_fields = match_obj.group(0).split()
        ad.log.info("HMAC for ESP is %s ", hmac_fields[0])

    # Stop the VPN before asserting on the ping so the connection is torn
    # down even when the ping check fails.
    ad.droid.vpnStopLegacyVpn()
    asserts.assert_true(ping_result,
                        "Ping to the internal IP failed. "
                        "Expected to pass as VPN is connected")

    connected_vpn_info = ad.droid.vpnGetLegacyVpnInfo()
    asserts.assert_true(not connected_vpn_info,
                        "Unable to terminate VPN connection for %s"
                        % vpn_profile)


def download_load_certs(ad, vpn_params, vpn_type, vpn_server_addr,
                        ipsec_server_type, log_path):
    """Download the certificates from VPN server and push to sdcard of DUT.

    Args:
        ad: android device object
        vpn_params: vpn params from config file
        vpn_type: 1 of the 6 VPN types
        vpn_server_addr: server addr to connect to
        ipsec_server_type: ipsec version - strongswan or openswan
        log_path: log path to download cert

    Returns:
        Client cert file name on DUT's sdcard
    """
    url = "http://%s%s%s" % (vpn_server_addr,
                             vpn_params['cert_path_vpnserver'],
                             vpn_params['client_pkcs_file_name'])
    local_cert_name = "%s_%s_%s" % (vpn_type.name,
                                    ipsec_server_type,
                                    vpn_params['client_pkcs_file_name'])

    local_file_path = os.path.join(log_path, local_cert_name)
    try:
        # Context managers close both the HTTP response and the file.
        with urllib.request.urlopen(url) as response:
            with open(local_file_path, "wb") as cert_file:
                cert_file.write(response.read())
    except Exception:
        # Any download or write problem is reported as a test failure.
        asserts.fail("Unable to download certificate from the server")

    ad.adb.push("%s sdcard/" % local_file_path)
    return local_cert_name


def generate_legacy_vpn_profile(ad,
                                vpn_params,
                                vpn_type,
                                vpn_server_addr,
                                ipsec_server_type,
                                log_path):
    """Generate legacy VPN profile for a VPN.

    PSK types get the pre-shared secret, RSA types get a client certificate
    downloaded and installed on the device, and every other type gets MPPE.

    Args:
        ad: android device object
        vpn_params: vpn params from config file
        vpn_type: 1 of the 6 VPN types
        vpn_server_addr: server addr to connect to
        ipsec_server_type: ipsec version - strongswan or openswan
        log_path: log path to download cert

    Returns:
        Vpn profile
    """
    vpn_profile = {
        VPN_CONST.USER: vpn_params['vpn_username'],
        VPN_CONST.PWD: vpn_params['vpn_password'],
        VPN_CONST.TYPE: vpn_type.value,
        VPN_CONST.SERVER: vpn_server_addr,
    }
    if vpn_type.name == "PPTP":
        vpn_profile[VPN_CONST.NAME] = "test_%s" % vpn_type.name
    else:
        vpn_profile[VPN_CONST.NAME] = "test_%s_%s" % (vpn_type.name,
                                                      ipsec_server_type)

    psk_set = {"L2TP_IPSEC_PSK", "IPSEC_XAUTH_PSK"}
    rsa_set = {"L2TP_IPSEC_RSA", "IPSEC_XAUTH_RSA", "IPSEC_HYBRID_RSA"}

    if vpn_type.name in psk_set:
        vpn_profile[VPN_CONST.IPSEC_SECRET] = vpn_params['psk_secret']
    elif vpn_type.name in rsa_set:
        cert_name = download_load_certs(ad,
                                        vpn_params,
                                        vpn_type,
                                        vpn_server_addr,
                                        ipsec_server_type,
                                        log_path)
        # The same alias (file name up to the first '.') is used for both
        # the user cert and the CA cert.
        cert_alias = cert_name.split('.')[0]
        vpn_profile[VPN_CONST.IPSEC_USER_CERT] = cert_alias
        vpn_profile[VPN_CONST.IPSEC_CA_CERT] = cert_alias
        ad.droid.installCertificate(vpn_profile, cert_name,
                                    vpn_params['cert_password'])
    else:
        vpn_profile[VPN_CONST.MPPE] = "mppe"

    return vpn_profile


def start_tcpdump(ad, test_name):
    """Start tcpdump on all interfaces.

    Kills any running tcpdump, empties (or creates) TCPDUMP_PATH on the
    device and starts a new capture as a standing host subprocess.

    Args:
        ad: android device object.
        test_name: tcpdump file name will have this

    Returns:
        The standing subprocess running tcpdump, or None if it could not
        be started.
    """
    ad.log.info("Starting tcpdump on all interfaces")
    try:
        ad.adb.shell("killall -9 tcpdump")
    except AdbError:
        ad.log.warning("Killing existing tcpdump processes failed")
    out = ad.adb.shell("ls -l %s" % TCPDUMP_PATH)
    if "No such file" in out or not out:
        ad.adb.shell("mkdir %s" % TCPDUMP_PATH)
    else:
        ad.adb.shell("rm -rf %s/*" % TCPDUMP_PATH, ignore_status=True)

    # NOTE(review): begin_time is computed but never used below - confirm
    # whether it can be dropped.
    begin_time = epoch_to_log_line_timestamp(get_current_epoch_time())
    begin_time = normalize_log_line_timestamp(begin_time)

    file_name = "%s/tcpdump_%s_%s.pcap" % (TCPDUMP_PATH, ad.serial, test_name)
    ad.log.info("tcpdump file is %s", file_name)
    cmd = "adb -s {} shell tcpdump -i any -s0 -w {}".format(ad.serial,
                                                            file_name)
    try:
        return start_standing_subprocess(cmd, 5)
    except Exception:
        ad.log.exception('Could not start standing process %s', repr(cmd))

    return None


def stop_tcpdump(ad,
                 proc,
                 test_name,
                 adb_pull_timeout=adb.DEFAULT_ADB_PULL_TIMEOUT):
    """Stops tcpdump on any iface.

    Pulls the tcpdump file in the tcpdump dir and clears that dir on the
    device.

    Args:
        ad: android device object.
        proc: need to know which pid to stop
        test_name: test name to save the tcpdump file
        adb_pull_timeout: timeout for adb_pull

    Returns:
        log_path of the tcpdump file, or None when proc is None
    """
    ad.log.info("Stopping and pulling tcpdump if any")
    if proc is None:
        return None
    try:
        stop_standing_subprocess(proc)
    except Exception as e:
        # Best effort: still try to pull whatever was captured.
        ad.log.warning(e)
    log_path = os.path.join(ad.log_path, test_name)
    os.makedirs(log_path, exist_ok=True)
    ad.adb.pull("%s/. %s" % (TCPDUMP_PATH, log_path), timeout=adb_pull_timeout)
    ad.adb.shell("rm -rf %s/*" % TCPDUMP_PATH, ignore_status=True)
    file_name = "tcpdump_%s_%s.pcap" % (ad.serial, test_name)
    return "%s/%s" % (log_path, file_name)


def start_tcpdump_gce_server(ad, test_name, dest_port, gce):
    """Start tcpdump on gce server.

    Args:
        ad: android device object
        test_name: test case name
        dest_port: port to collect tcpdump
        gce: dictionary of gce instance

    Returns:
        process id and pcap file path from gce server

    Raises:
        signals.TestFailure: if no tcpdump process for the capture file is
            listed on the server.
    """
    ad.log.info("Starting tcpdump on gce server")

    # pcap file name
    fname = "/tmp/%s_%s_%s_%s" % (
        test_name, ad.model, ad.serial,
        time.strftime('%Y-%m-%d_%H-%M-%S', time.localtime(time.time())))

    # start tcpdump
    tcpdump_cmd = ("sudo bash -c 'tcpdump -i %s -w %s.pcap port %s > "
                   "%s.txt 2>&1 & echo $!'"
                   % (gce["interface"], fname, dest_port, fname))
    gcloud_ssh_cmd = "%s --project=%s --zone=%s %s@%s --command " % (
        GCE_SSH, gce["project"], gce["zone"], gce["username"],
        gce["hostname"])
    gce_ssh_cmd = '%s "%s"' % (gcloud_ssh_cmd, tcpdump_cmd)
    utils.exe_cmd(gce_ssh_cmd)

    # get process id
    ps_cmd = '%s "ps aux | grep tcpdump | grep %s"' % (gcloud_ssh_cmd, fname)
    tcpdump_pid = utils.exe_cmd(ps_cmd).decode("utf-8", "ignore").split()
    if not tcpdump_pid:
        raise signals.TestFailure("Failed to start tcpdump on gce server")
    # Second whitespace-separated field of the first "ps aux" line is the pid.
    return tcpdump_pid[1], fname


def stop_tcpdump_gce_server(ad, tcpdump_pid, fname, gce):
    """Stop and pull tcpdump file from gce server.

    Args:
        ad: android device object
        tcpdump_pid: process id for tcpdump file
        fname: tcpdump file path
        gce: dictionary of gce instance

    Returns:
        pcap file from gce server, or None when fname is empty

    Raises:
        signals.TestFailure: if tcpdump is still running after the kill, or
            the pcap file is not present locally after the copy.
    """
    ad.log.info("Stop and pull pcap file from gce server")

    # stop tcpdump
    tcpdump_cmd = "sudo kill %s" % tcpdump_pid
    gcloud_ssh_cmd = "%s --project=%s --zone=%s %s@%s --command " % (
        GCE_SSH, gce["project"], gce["zone"], gce["username"],
        gce["hostname"])
    gce_ssh_cmd = '%s "%s"' % (gcloud_ssh_cmd, tcpdump_cmd)
    utils.exe_cmd(gce_ssh_cmd)

    # verify tcpdump is stopped
    ps_cmd = '%s "ps aux | grep tcpdump"' % gcloud_ssh_cmd
    res = utils.exe_cmd(ps_cmd).decode("utf-8", "ignore")
    if tcpdump_pid in res.split():
        raise signals.TestFailure("Failed to stop tcpdump on gce server")
    if not fname:
        return None

    # pull pcap file
    gcloud_scp_cmd = "%s --project=%s --zone=%s %s@%s:" % (
        GCE_SCP, gce["project"], gce["zone"], gce["username"],
        gce["hostname"])
    pull_file = '%s%s.pcap %s/' % (gcloud_scp_cmd, fname, ad.device_log_path)
    utils.exe_cmd(pull_file)
    pcap_file = "%s/%s.pcap" % (ad.device_log_path, fname.split('/')[-1])
    if not os.path.exists(pcap_file):
        raise signals.TestFailure("Failed to pull tcpdump from gce server")

    # delete pcaps
    utils.exe_cmd('%s "sudo rm %s.*"' % (gcloud_ssh_cmd, fname))

    # return pcap file
    return pcap_file


def is_ipaddress_ipv6(ip_address):
    """Verify if the given string is a valid IPv6 address.

    Args:
        ip_address: string containing the IP address

    Returns:
        True: if valid ipv6 address
        False: if not
    """
    try:
        socket.inet_pton(socket.AF_INET6, ip_address)
    except (OSError, ValueError):
        # OSError (socket.error): not a valid IPv6 literal.
        # ValueError: string contains an embedded null character.
        return False
    return True


def carrier_supports_ipv6(dut):
    """Verify if carrier supports ipv6.

    Args:
        dut: Android device that is being checked

    Returns:
        True: if carrier supports ipv6
        False: if not
    """
    ipv6_carriers = ["vzw", "tmo", "Far EasTone", "Chunghwa Telecom"]
    # Pass the device logger, as the other helpers in this module do,
    # rather than a plain string.
    operator = get_operator_name(dut.log, dut)
    return operator in ipv6_carriers


def supports_ipv6_tethering(self, dut):
    """Check if provider supports IPv6 tethering.

    Args:
        self: object exposing a `log` attribute; kept as the first
            parameter for compatibility with existing callers.
        dut: Android device that is being checked

    Returns:
        True: if provider supports IPv6 tethering
        False: if not
    """
    carrier_supports_tethering = ["vzw", "tmo", "Far EasTone",
                                  "Chunghwa Telecom"]
    operator = get_operator_name(self.log, dut)
    return operator in carrier_supports_tethering


def start_usb_tethering(ad):
    """Start USB tethering.

    Args:
        ad: android device object

    Raises:
        signals.TestFailure: if no "rndis" entry shows up in the device's
            "ip address" output afterwards.
    """
    # TODO: test USB tethering by #startTethering API - b/149116235
    ad.log.info("Starting USB Tethering")
    ad.stop_services()
    ad.adb.shell(USB_TETHERING_MODE, ignore_status=True)
    ad.adb.wait_for_device()
    ad.start_services()
    if "rndis" not in ad.adb.shell(DEVICE_IP_ADDRESS):
        raise signals.TestFailure("Unable to enable USB tethering.")


def stop_usb_tethering(ad):
    """Stop USB tethering.

    Args:
        ad: android device object
    """
    ad.log.info("Stopping USB Tethering")
    ad.stop_services()
    ad.adb.shell(USB_CHARGE_MODE)
    ad.adb.wait_for_device()
    ad.start_services()


def wait_for_new_iface(old_ifaces):
    """Wait for the new interface to come up.

    Args:
        old_ifaces: list of old interfaces

    Returns:
        Name of the single new host interface.
    """
    old_set = set(old_ifaces)
    # Try 10 times to find a new interface with a 1s sleep every time
    # (equivalent to a 9s timeout)
    for _ in range(10):
        new_ifaces = set(get_if_list()) - old_set
        asserts.assert_true(len(new_ifaces) < 2,
                            "Too many new interfaces after turning on "
                            "tethering")
        if len(new_ifaces) == 1:
            return new_ifaces.pop()
        time.sleep(1)
    asserts.fail("Timeout waiting for tethering interface on host")