1#!/usr/bin/env python3
2#
3#   Copyright 2018 Google, Inc.
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16import logging
17import os
18import re
19import time
20import urllib.request
21
22from acts import asserts
23from acts import signals
24from acts import utils
25from acts.controllers import adb
26from acts.controllers.adb_lib.error import AdbError
27from acts.utils import start_standing_subprocess
28from acts.utils import stop_standing_subprocess
29from acts_contrib.test_utils.net import connectivity_const as cconst
30from acts_contrib.test_utils.tel.tel_data_utils import wait_for_cell_data_connection
31from acts_contrib.test_utils.tel.tel_test_utils import get_operator_name
32from acts_contrib.test_utils.tel.tel_test_utils import verify_http_connection
33from acts_contrib.test_utils.wifi import wifi_test_utils as wutils
34from scapy.all import get_if_list
35
36
37VPN_CONST = cconst.VpnProfile
38VPN_TYPE = cconst.VpnProfileType
39VPN_PARAMS = cconst.VpnReqParams
40TCPDUMP_PATH = "/data/local/tmp/"
41USB_CHARGE_MODE = "svc usb setFunctions"
42USB_TETHERING_MODE = "svc usb setFunctions rndis"
43DEVICE_IP_ADDRESS = "ip address"
44LOCALHOST = "192.168.1.1"
45
46GCE_SSH = "gcloud compute ssh "
47GCE_SCP = "gcloud compute scp "
48
49
50def verify_lte_data_and_tethering_supported(ad):
51    """Verify if LTE data is enabled and tethering supported."""
52    wutils.wifi_toggle_state(ad, False)
53    ad.droid.telephonyToggleDataConnection(True)
54    wait_for_cell_data_connection(ad.log, ad, True)
55    asserts.assert_true(
56        verify_http_connection(ad.log, ad),
57        "HTTP verification failed on cell data connection")
58    asserts.assert_true(
59        ad.droid.connectivityIsTetheringSupported(),
60        "Tethering is not supported for the provider")
61    wutils.wifi_toggle_state(ad, True)
62
63
64def set_chrome_browser_permissions(ad):
65    """Set chrome browser start with no-first-run verification.
66
67    Give permission to read from and write to storage
68
69    Args:
70        ad: android device object
71    """
72    commands = ["pm grant com.android.chrome "
73                "android.permission.READ_EXTERNAL_STORAGE",
74                "pm grant com.android.chrome "
75                "android.permission.WRITE_EXTERNAL_STORAGE",
76                "rm /data/local/chrome-command-line",
77                "am set-debug-app --persistent com.android.chrome",
78                'echo "chrome --no-default-browser-check --no-first-run '
79                '--disable-fre" > /data/local/tmp/chrome-command-line']
80    for cmd in commands:
81        try:
82            ad.adb.shell(cmd)
83        except AdbError:
84            logging.warning("adb command %s failed on %s" % (cmd, ad.serial))
85
86
87def verify_ping_to_vpn_ip(ad, vpn_ping_addr):
88    """Verify if IP behind VPN server is pingable.
89
90    Ping should pass, if VPN is connected.
91    Ping should fail, if VPN is disconnected.
92
93    Args:
94        ad: android device object
95        vpn_ping_addr: target ping addr
96    """
97    ping_result = None
98    pkt_loss = "100% packet loss"
99    logging.info("Pinging: %s" % vpn_ping_addr)
100    try:
101        ping_result = ad.adb.shell("ping -c 3 -W 2 %s" % vpn_ping_addr)
102    except AdbError:
103        pass
104    return ping_result and pkt_loss not in ping_result
105
106
107def legacy_vpn_connection_test_logic(ad, vpn_profile, vpn_ping_addr):
108    """Test logic for each legacy VPN connection.
109
110    Steps:
111      1. Generate profile for the VPN type
112      2. Establish connection to the server
113      3. Verify that connection is established using LegacyVpnInfo
114      4. Verify the connection by pinging the IP behind VPN
115      5. Stop the VPN connection
116      6. Check the connection status
117      7. Verify that ping to IP behind VPN fails
118
119    Args:
120        ad: Android device object
121        vpn_profile: object contains attribute for create vpn profile
122        vpn_ping_addr: addr to verify vpn connection
123    """
124    # Wait for sometime so that VPN server flushes all interfaces and
125    # connections after graceful termination
126    time.sleep(10)
127
128    ad.adb.shell("ip xfrm state flush")
129    ad.log.info("Connecting to: %s", vpn_profile)
130    ad.droid.vpnStartLegacyVpn(vpn_profile)
131    time.sleep(cconst.VPN_TIMEOUT)
132
133    connected_vpn_info = ad.droid.vpnGetLegacyVpnInfo()
134    asserts.assert_equal(connected_vpn_info["state"],
135                         cconst.VPN_STATE_CONNECTED,
136                         "Unable to establish VPN connection for %s"
137                         % vpn_profile)
138
139    ping_result = verify_ping_to_vpn_ip(ad, vpn_ping_addr)
140    ip_xfrm_state = ad.adb.shell("ip xfrm state")
141    match_obj = re.search(r'hmac(.*)', "%s" % ip_xfrm_state)
142    if match_obj:
143        ip_xfrm_state = format(match_obj.group(0)).split()
144        ad.log.info("HMAC for ESP is %s " % ip_xfrm_state[0])
145
146    ad.droid.vpnStopLegacyVpn()
147    asserts.assert_true(ping_result,
148                        "Ping to the internal IP failed. "
149                        "Expected to pass as VPN is connected")
150
151    connected_vpn_info = ad.droid.vpnGetLegacyVpnInfo()
152    asserts.assert_true(not connected_vpn_info,
153                        "Unable to terminate VPN connection for %s"
154                        % vpn_profile)
155
156
157def download_load_certs(ad, vpn_params, vpn_type, vpn_server_addr,
158                        ipsec_server_type, log_path):
159    """Download the certificates from VPN server and push to sdcard of DUT.
160
161    Args:
162      ad: android device object
163      vpn_params: vpn params from config file
164      vpn_type: 1 of the 6 VPN types
165      vpn_server_addr: server addr to connect to
166      ipsec_server_type: ipsec version - strongswan or openswan
167      log_path: log path to download cert
168
169    Returns:
170      Client cert file name on DUT's sdcard
171    """
172    url = "http://%s%s%s" % (vpn_server_addr,
173                             vpn_params['cert_path_vpnserver'],
174                             vpn_params['client_pkcs_file_name'])
175    logging.info("URL is: %s" % url)
176    if vpn_server_addr == LOCALHOST:
177        ad.droid.httpDownloadFile(url, "/sdcard/")
178        return vpn_params['client_pkcs_file_name']
179
180    local_cert_name = "%s_%s_%s" % (vpn_type.name,
181                                    ipsec_server_type,
182                                    vpn_params['client_pkcs_file_name'])
183    local_file_path = os.path.join(log_path, local_cert_name)
184    try:
185        ret = urllib.request.urlopen(url)
186        with open(local_file_path, "wb") as f:
187            f.write(ret.read())
188    except Exception:
189        asserts.fail("Unable to download certificate from the server")
190
191    ad.adb.push("%s sdcard/" % local_file_path)
192    return local_cert_name
193
194
195def generate_legacy_vpn_profile(ad,
196                                vpn_params,
197                                vpn_type,
198                                vpn_server_addr,
199                                ipsec_server_type,
200                                log_path):
201    """Generate legacy VPN profile for a VPN.
202
203    Args:
204      ad: android device object
205      vpn_params: vpn params from config file
206      vpn_type: 1 of the 6 VPN types
207      vpn_server_addr: server addr to connect to
208      ipsec_server_type: ipsec version - strongswan or openswan
209      log_path: log path to download cert
210
211    Returns:
212      Vpn profile
213    """
214    vpn_profile = {VPN_CONST.USER: vpn_params['vpn_username'],
215                   VPN_CONST.PWD: vpn_params['vpn_password'],
216                   VPN_CONST.TYPE: vpn_type.value,
217                   VPN_CONST.SERVER: vpn_server_addr, }
218    vpn_profile[VPN_CONST.NAME] = "test_%s_%s" % (vpn_type.name,
219                                                  ipsec_server_type)
220    if vpn_type.name == "PPTP":
221        vpn_profile[VPN_CONST.NAME] = "test_%s" % vpn_type.name
222
223    psk_set = set(["L2TP_IPSEC_PSK", "IPSEC_XAUTH_PSK"])
224    rsa_set = set(["L2TP_IPSEC_RSA", "IPSEC_XAUTH_RSA", "IPSEC_HYBRID_RSA"])
225
226    if vpn_type.name in psk_set:
227        vpn_profile[VPN_CONST.IPSEC_SECRET] = vpn_params['psk_secret']
228    elif vpn_type.name in rsa_set:
229        cert_name = download_load_certs(ad,
230                                        vpn_params,
231                                        vpn_type,
232                                        vpn_server_addr,
233                                        ipsec_server_type,
234                                        log_path)
235        vpn_profile[VPN_CONST.IPSEC_USER_CERT] = cert_name.split('.')[0]
236        ad.droid.installCertificate(vpn_profile, cert_name,
237                                    vpn_params['cert_password'])
238    else:
239        vpn_profile[VPN_CONST.MPPE] = "mppe"
240
241    return vpn_profile
242
243
244def generate_ikev2_vpn_profile(ad, vpn_params, vpn_type, server_addr, log_path):
245    """Generate VPN profile for IKEv2 VPN.
246
247    Args:
248        ad: android device object.
249        vpn_params: vpn params from config file.
250        vpn_type: ikev2 vpn type.
251        server_addr: vpn server addr.
252        log_path: log path to download cert.
253
254    Returns:
255        Vpn profile.
256    """
257    vpn_profile = {
258        VPN_CONST.TYPE: vpn_type.value,
259        VPN_CONST.SERVER: server_addr,
260    }
261
262    if vpn_type.name == "IKEV2_IPSEC_USER_PASS":
263        vpn_profile[VPN_CONST.USER] = vpn_params["vpn_username"]
264        vpn_profile[VPN_CONST.PWD] = vpn_params["vpn_password"]
265        vpn_profile[VPN_CONST.IPSEC_ID] = vpn_params["vpn_identity"]
266        cert_name = download_load_certs(
267            ad, vpn_params, vpn_type, vpn_params["server_addr"],
268            "IKEV2_IPSEC_USER_PASS", log_path)
269        vpn_profile[VPN_CONST.IPSEC_CA_CERT] = cert_name.split('.')[0]
270        ad.droid.installCertificate(
271            vpn_profile, cert_name, vpn_params['cert_password'])
272    elif vpn_type.name == "IKEV2_IPSEC_PSK":
273        vpn_profile[VPN_CONST.IPSEC_ID] = vpn_params["vpn_identity"]
274        vpn_profile[VPN_CONST.IPSEC_SECRET] = vpn_params["psk_secret"]
275    else:
276        vpn_profile[VPN_CONST.IPSEC_ID] = "%s@%s" % (
277            vpn_params["vpn_identity"], server_addr)
278        logging.info("ID: %s@%s" % (vpn_params["vpn_identity"], server_addr))
279        cert_name = download_load_certs(
280            ad, vpn_params, vpn_type, vpn_params["server_addr"],
281            "IKEV2_IPSEC_RSA", log_path)
282        vpn_profile[VPN_CONST.IPSEC_USER_CERT] = cert_name.split('.')[0]
283        vpn_profile[VPN_CONST.IPSEC_CA_CERT] = cert_name.split('.')[0]
284        ad.droid.installCertificate(
285            vpn_profile, cert_name, vpn_params['cert_password'])
286
287    return vpn_profile
288
289
290def start_tcpdump(ad, test_name):
291    """Start tcpdump on all interfaces.
292
293    Args:
294        ad: android device object.
295        test_name: tcpdump file name will have this
296    """
297    ad.log.info("Starting tcpdump on all interfaces")
298    ad.adb.shell("killall -9 tcpdump", ignore_status=True)
299    ad.adb.shell("mkdir %s" % TCPDUMP_PATH, ignore_status=True)
300    ad.adb.shell("rm -rf %s/*" % TCPDUMP_PATH, ignore_status=True)
301
302    file_name = "%s/tcpdump_%s_%s.pcap" % (TCPDUMP_PATH, ad.serial, test_name)
303    ad.log.info("tcpdump file is %s", file_name)
304    cmd = "adb -s {} shell tcpdump -i any -s0 -w {}".format(ad.serial,
305                                                            file_name)
306    try:
307        return start_standing_subprocess(cmd, 5)
308    except Exception:
309        ad.log.exception('Could not start standing process %s' % repr(cmd))
310
311    return None
312
313
314def stop_tcpdump(ad,
315                 proc,
316                 test_name,
317                 pull_dump=True,
318                 adb_pull_timeout=adb.DEFAULT_ADB_PULL_TIMEOUT):
319    """Stops tcpdump on any iface.
320
321       Pulls the tcpdump file in the tcpdump dir if necessary.
322
323    Args:
324        ad: android device object.
325        proc: need to know which pid to stop
326        test_name: test name to save the tcpdump file
327        pull_dump: pull tcpdump file or not
328        adb_pull_timeout: timeout for adb_pull
329
330    Returns:
331      log_path of the tcpdump file
332    """
333    ad.log.info("Stopping and pulling tcpdump if any")
334    if proc is None:
335        return None
336    try:
337        stop_standing_subprocess(proc)
338    except Exception as e:
339        ad.log.warning(e)
340    if pull_dump:
341        log_path = os.path.join(ad.device_log_path, "TCPDUMP_%s" % ad.serial)
342        os.makedirs(log_path, exist_ok=True)
343        ad.adb.pull("%s/. %s" % (TCPDUMP_PATH, log_path),
344                    timeout=adb_pull_timeout)
345        ad.adb.shell("rm -rf %s/*" % TCPDUMP_PATH, ignore_status=True)
346        file_name = "tcpdump_%s_%s.pcap" % (ad.serial, test_name)
347        return "%s/%s" % (log_path, file_name)
348    return None
349
350
351def start_tcpdump_gce_server(ad, test_name, dest_port, gce):
352    """Start tcpdump on gce server.
353
354    Args:
355        ad: android device object
356        test_name: test case name
357        dest_port: port to collect tcpdump
358        gce: dictionary of gce instance
359
360    Returns:
361       process id and pcap file path from gce server
362    """
363    ad.log.info("Starting tcpdump on gce server")
364
365    # pcap file name
366    fname = "/tmp/%s_%s_%s_%s" % \
367        (test_name, ad.model, ad.serial,
368         time.strftime('%Y-%m-%d_%H-%M-%S', time.localtime(time.time())))
369
370    # start tcpdump
371    tcpdump_cmd = "sudo bash -c \'tcpdump -i %s -w %s.pcap port %s > \
372        %s.txt 2>&1 & echo $!\'" % (gce["interface"], fname, dest_port, fname)
373    gcloud_ssh_cmd = "%s --project=%s --zone=%s %s@%s --command " % \
374        (GCE_SSH, gce["project"], gce["zone"], gce["username"], gce["hostname"])
375    gce_ssh_cmd = '%s "%s"' % (gcloud_ssh_cmd, tcpdump_cmd)
376    utils.exe_cmd(gce_ssh_cmd)
377
378    # get process id
379    ps_cmd = '%s "ps aux | grep tcpdump | grep %s"' % (gcloud_ssh_cmd, fname)
380    tcpdump_pid = utils.exe_cmd(ps_cmd).decode("utf-8", "ignore").split()
381    if not tcpdump_pid:
382        raise signals.TestFailure("Failed to start tcpdump on gce server")
383    return tcpdump_pid[1], fname
384
385
386def stop_tcpdump_gce_server(ad, tcpdump_pid, fname, gce):
387    """Stop and pull tcpdump file from gce server.
388
389    Args:
390        ad: android device object
391        tcpdump_pid: process id for tcpdump file
392        fname: tcpdump file path
393        gce: dictionary of gce instance
394
395    Returns:
396       pcap file from gce server
397    """
398    ad.log.info("Stop and pull pcap file from gce server")
399
400    # stop tcpdump
401    tcpdump_cmd = "sudo kill %s" % tcpdump_pid
402    gcloud_ssh_cmd = "%s --project=%s --zone=%s %s@%s --command " % \
403        (GCE_SSH, gce["project"], gce["zone"], gce["username"], gce["hostname"])
404    gce_ssh_cmd = '%s "%s"' % (gcloud_ssh_cmd, tcpdump_cmd)
405    utils.exe_cmd(gce_ssh_cmd)
406
407    # verify tcpdump is stopped
408    ps_cmd = '%s "ps aux | grep tcpdump"' % gcloud_ssh_cmd
409    res = utils.exe_cmd(ps_cmd).decode("utf-8", "ignore")
410    if tcpdump_pid in res.split():
411        raise signals.TestFailure("Failed to stop tcpdump on gce server")
412    if not fname:
413        return None
414
415    # pull pcap file
416    gcloud_scp_cmd = "%s --project=%s --zone=%s %s@%s:" % \
417        (GCE_SCP, gce["project"], gce["zone"], gce["username"], gce["hostname"])
418    pull_file = '%s%s.pcap %s/' % (gcloud_scp_cmd, fname, ad.device_log_path)
419    utils.exe_cmd(pull_file)
420    if not os.path.exists(
421        "%s/%s.pcap" % (ad.device_log_path, fname.split('/')[-1])):
422        raise signals.TestFailure("Failed to pull tcpdump from gce server")
423
424    # delete pcaps
425    utils.exe_cmd('%s "sudo rm %s.*"' % (gcloud_ssh_cmd, fname))
426
427    # return pcap file
428    pcap_file = "%s/%s.pcap" % (ad.device_log_path, fname.split('/')[-1])
429    return pcap_file
430
431
432def is_ipaddress_ipv6(ip_address):
433    """Verify if the given string is a valid IPv6 address.
434
435    Args:
436        ip_address: string containing the IP address
437
438    Returns:
439        True: if valid ipv6 address
440        False: if not
441    """
442    try:
443        socket.inet_pton(socket.AF_INET6, ip_address)
444        return True
445    except socket.error:
446        return False
447
448
449def carrier_supports_ipv6(dut):
450    """Verify if carrier supports ipv6.
451
452    Args:
453        dut: Android device that is being checked
454
455    Returns:
456        True: if carrier supports ipv6
457        False: if not
458    """
459
460    carrier_supports_ipv6 = ["vzw", "tmo", "Far EasTone", "Chunghwa Telecom"]
461    operator = get_operator_name("log", dut)
462    return operator in carrier_supports_ipv6
463
464
465def supports_ipv6_tethering(self, dut):
466    """Check if provider supports IPv6 tethering.
467
468    Args:
469        dut: Android device that is being checked
470
471    Returns:
472        True: if provider supports IPv6 tethering
473        False: if not
474    """
475    carrier_supports_tethering = ["vzw", "tmo", "Far EasTone", "Chunghwa Telecom"]
476    operator = get_operator_name(self.log, dut)
477    return operator in carrier_supports_tethering
478
479
480def start_usb_tethering(ad):
481    """Start USB tethering.
482
483    Args:
484        ad: android device object
485    """
486    # TODO: test USB tethering by #startTethering API - b/149116235
487    ad.log.info("Starting USB Tethering")
488    ad.stop_services()
489    ad.adb.shell(USB_TETHERING_MODE, ignore_status=True)
490    ad.adb.wait_for_device()
491    ad.start_services()
492    if "rndis" not in ad.adb.shell(DEVICE_IP_ADDRESS):
493        raise signals.TestFailure("Unable to enable USB tethering.")
494
495
496def stop_usb_tethering(ad):
497    """Stop USB tethering.
498
499    Args:
500        ad: android device object
501    """
502    ad.log.info("Stopping USB Tethering")
503    ad.stop_services()
504    ad.adb.shell(USB_CHARGE_MODE)
505    ad.adb.wait_for_device()
506    ad.start_services()
507
508
509def wait_for_new_iface(old_ifaces):
510    """Wait for the new interface to come up.
511
512    Args:
513        old_ifaces: list of old interfaces
514    """
515    old_set = set(old_ifaces)
516    # Try 10 times to find a new interface with a 1s sleep every time
517    # (equivalent to a 9s timeout)
518    for _ in range(0, 10):
519        new_ifaces = set(get_if_list()) - old_set
520        asserts.assert_true(len(new_ifaces) < 2,
521                            "Too many new interfaces after turning on "
522                            "tethering")
523        if len(new_ifaces) == 1:
524            return new_ifaces.pop()
525        time.sleep(1)
526    asserts.fail("Timeout waiting for tethering interface on host")
527