1#!/usr/bin/env python3
2#
3#   Copyright 2018 Google, Inc.
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16import logging
17import os
18
19from acts.controllers import adb
20from acts import asserts
21from acts import signals
22from acts import utils
23from acts.controllers.adb import AdbError
24from acts.logger import epoch_to_log_line_timestamp
25from acts.utils import get_current_epoch_time
26from acts.logger import normalize_log_line_timestamp
27from acts.utils import start_standing_subprocess
28from acts.utils import stop_standing_subprocess
29from acts.test_utils.net import connectivity_const as cconst
30from acts.test_utils.tel.tel_test_utils import get_operator_name
31from acts.test_utils.tel.tel_data_utils import wait_for_cell_data_connection
32from acts.test_utils.tel.tel_test_utils import verify_http_connection
33from acts.test_utils.wifi import wifi_test_utils as wutils
34from scapy.all import get_if_list
35
36import os
37import re
38import time
39import urllib.request
40
# Short aliases for the VPN-related definitions in connectivity_const.
VPN_CONST = cconst.VpnProfile
VPN_TYPE = cconst.VpnProfileType
VPN_PARAMS = cconst.VpnReqParams
# On-device directory used by start_tcpdump/stop_tcpdump for capture files.
TCPDUMP_PATH = "/data/local/tmp/"
# adb shell commands that select the USB function: no function argument
# (issued by stop_usb_tethering) and rndis (issued by start_usb_tethering).
USB_CHARGE_MODE = "svc usb setFunctions"
USB_TETHERING_MODE = "svc usb setFunctions rndis"
# adb shell command whose output is searched for "rndis" after tethering
# has been switched on.
DEVICE_IP_ADDRESS = "ip address"

# gcloud command prefixes used by the GCE tcpdump helpers; the trailing
# space is part of the prefix because arguments are appended directly.
GCE_SSH = "gcloud compute ssh "
GCE_SCP = "gcloud compute scp "
51
52
def verify_lte_data_and_tethering_supported(ad):
    """Check that cell data works and that tethering is supported.

    Wifi is switched off and the data connection is toggled on, then the
    function waits for the cell data connection. It asserts that an HTTP
    connection check passes and that the device reports tethering as
    supported. Wifi is switched back on only if both assertions pass.

    Args:
      ad: android device object
    """
    # Force traffic onto the cell data connection.
    wutils.wifi_toggle_state(ad, False)
    ad.droid.telephonyToggleDataConnection(True)
    wait_for_cell_data_connection(ad.log, ad, True)

    http_ok = verify_http_connection(ad.log, ad)
    asserts.assert_true(
        http_ok, "HTTP verification failed on cell data connection")

    tethering_ok = ad.droid.connectivityIsTetheringSupported()
    asserts.assert_true(
        tethering_ok, "Tethering is not supported for the provider")

    wutils.wifi_toggle_state(ad, True)
65
66
def set_chrome_browser_permissions(ad):
    """Grant Chrome storage permissions and write its command-line flags.

    Each shell command is run independently. A command that raises
    AdbError is logged as a warning and the remaining commands still run.

    Args:
      ad: android device object
    """
    grant_prefix = "pm grant com.android.chrome "
    chrome_flags = ("chrome --no-default-browser-check --no-first-run "
                    "--disable-fre")
    commands = (
        grant_prefix + "android.permission.READ_EXTERNAL_STORAGE",
        grant_prefix + "android.permission.WRITE_EXTERNAL_STORAGE",
        "rm /data/local/chrome-command-line",
        "am set-debug-app --persistent com.android.chrome",
        'echo "' + chrome_flags + '" > /data/local/tmp/chrome-command-line',
    )
    for shell_cmd in commands:
        try:
            ad.adb.shell(shell_cmd)
        except AdbError:
            # Best effort: keep going with the remaining commands.
            logging.warning(
                "adb command %s failed on %s", shell_cmd, ad.serial)
84
85
def verify_ping_to_vpn_ip(ad, vpn_ping_addr):
    """Ping an address behind the VPN server from the device.

    The ping is expected to pass while the VPN is connected and to fail
    once it is disconnected.

    Args:
      ad: android device object
      vpn_ping_addr: address to ping

    Returns:
      None if the ping command raised AdbError, the (falsy) command output
      if it was empty, otherwise True when the output does not contain
      "100% packet loss" and False when it does.
    """
    try:
        output = ad.adb.shell("ping -c 3 -W 2 %s" % vpn_ping_addr)
    except AdbError:
        # A failed ping command is reported to the caller as a falsy result.
        output = None

    if not output:
        return output
    return "100% packet loss" not in output
101
102
def legacy_vpn_connection_test_logic(ad, vpn_profile, vpn_ping_addr):
    """ Test logic for each legacy VPN connection

    Steps:
      1. Wait, then flush the device's xfrm state
      2. Start the legacy VPN with the given profile and wait VPN_TIMEOUT
      3. Assert that LegacyVpnInfo reports the connected state
      4. Ping the IP behind the VPN and log the HMAC found in the xfrm state
      5. Stop the VPN connection
      6. Assert that the ping in step 4 passed
      7. Assert that LegacyVpnInfo is now empty

    Args:
      ad: Android device object
      vpn_profile: VPN profile passed to vpnStartLegacyVpn
      vpn_ping_addr: address behind the VPN server to ping
    """
    # Give the VPN server time to flush all interfaces and connections
    # after the previous graceful termination.
    time.sleep(10)

    ad.adb.shell("ip xfrm state flush")
    ad.log.info("Connecting to: %s", vpn_profile)
    ad.droid.vpnStartLegacyVpn(vpn_profile)
    time.sleep(cconst.VPN_TIMEOUT)

    vpn_info = ad.droid.vpnGetLegacyVpnInfo()
    connect_error = "Unable to establish VPN connection for %s" % vpn_profile
    asserts.assert_equal(
        vpn_info["state"], cconst.VPN_STATE_CONNECTED, connect_error)

    ping_passed = verify_ping_to_vpn_ip(ad, vpn_ping_addr)
    xfrm_state = ad.adb.shell("ip xfrm state")
    hmac_match = re.search(r'hmac(.*)', "%s" % xfrm_state)
    if hmac_match:
        hmac_fields = hmac_match.group(0).split()
        ad.log.info("HMAC for ESP is %s ", hmac_fields[0])

    # The VPN is stopped before the ping result is asserted, so a failed
    # ping does not leave the connection up.
    ad.droid.vpnStopLegacyVpn()
    asserts.assert_true(
        ping_passed,
        "Ping to the internal IP failed. Expected to pass as VPN is connected")

    vpn_info = ad.droid.vpnGetLegacyVpnInfo()
    disconnect_error = (
        "Unable to terminate VPN connection for %s" % vpn_profile)
    asserts.assert_true(not vpn_info, disconnect_error)
150
151
def download_load_certs(ad, vpn_params, vpn_type, vpn_server_addr,
                        ipsec_server_type, log_path):
    """ Download the certificates from VPN server and push to sdcard of DUT

    The certificate is fetched over HTTP from the VPN server, saved under
    log_path as "<vpn type name>_<ipsec server type>_<pkcs file name>" and
    then pushed to the device with adb.

    Args:
      ad: android device object
      vpn_params: vpn params from config file; the keys
          'cert_path_vpnserver' and 'client_pkcs_file_name' are read
      vpn_type: 1 of the 6 VPN types
      vpn_server_addr: server addr to connect to
      ipsec_server_type: ipsec version - strongswan or openswan
      log_path: log path to download cert

    Returns:
      Client cert file name on DUT's sdcard
    """
    url = "http://%s%s%s" % (vpn_server_addr,
                             vpn_params['cert_path_vpnserver'],
                             vpn_params['client_pkcs_file_name'])
    local_cert_name = "%s_%s_%s" % (vpn_type.name,
                                    ipsec_server_type,
                                    vpn_params['client_pkcs_file_name'])

    local_file_path = os.path.join(log_path, local_cert_name)
    try:
        # Context managers close both the HTTP response and the local file,
        # even if reading or writing fails part-way through.
        with urllib.request.urlopen(url) as response, \
                open(local_file_path, "wb") as cert_file:
            cert_file.write(response.read())
    except Exception:
        # Record the underlying error before failing the test so the cause
        # (bad URL, unreachable server, unwritable path) is not lost.
        ad.log.exception("Certificate download from %s failed", url)
        asserts.fail("Unable to download certificate from the server")

    ad.adb.push("%s sdcard/" % local_file_path)
    return local_cert_name
184
185
def generate_legacy_vpn_profile(ad,
                                vpn_params,
                                vpn_type,
                                vpn_server_addr,
                                ipsec_server_type,
                                log_path):
    """ Build the legacy VPN profile dictionary for one VPN type

    PSK types get the pre-shared secret from vpn_params. RSA types download
    the client certificate, reference it in the profile and install it on
    the device. Every other type gets the MPPE entry.

    Args:
      ad: android device object
      vpn_params: vpn params from config file
      vpn_type: 1 of the 6 VPN types
      vpn_server_addr: server addr to connect to
      ipsec_server_type: ipsec version - strongswan or openswan
      log_path: log path to download cert

    Returns:
      Vpn profile
    """
    type_name = vpn_type.name
    # PPTP profiles are named without the ipsec server type.
    if type_name == "PPTP":
        profile_name = "test_%s" % type_name
    else:
        profile_name = "test_%s_%s" % (type_name, ipsec_server_type)

    vpn_profile = {
        VPN_CONST.USER: vpn_params['vpn_username'],
        VPN_CONST.PWD: vpn_params['vpn_password'],
        VPN_CONST.TYPE: vpn_type.value,
        VPN_CONST.SERVER: vpn_server_addr,
        VPN_CONST.NAME: profile_name,
    }

    psk_types = {"L2TP_IPSEC_PSK", "IPSEC_XAUTH_PSK"}
    rsa_types = {"L2TP_IPSEC_RSA", "IPSEC_XAUTH_RSA", "IPSEC_HYBRID_RSA"}

    if type_name in psk_types:
        vpn_profile[VPN_CONST.IPSEC_SECRET] = vpn_params['psk_secret']
        return vpn_profile

    if type_name in rsa_types:
        cert_name = download_load_certs(ad, vpn_params, vpn_type,
                                        vpn_server_addr, ipsec_server_type,
                                        log_path)
        # Both cert entries use the file name up to its first '.'.
        cert_alias = cert_name.split('.')[0]
        vpn_profile[VPN_CONST.IPSEC_USER_CERT] = cert_alias
        vpn_profile[VPN_CONST.IPSEC_CA_CERT] = cert_alias
        ad.droid.installCertificate(vpn_profile, cert_name,
                                    vpn_params['cert_password'])
        return vpn_profile

    vpn_profile[VPN_CONST.MPPE] = "mppe"
    return vpn_profile
234
235
def start_tcpdump(ad, test_name):
    """Start tcpdump on all interfaces

    Any running tcpdump is killed first (best effort). The capture
    directory on the device is created if the listing reports it missing,
    otherwise its contents are removed. tcpdump is then started through a
    standing "adb shell" subprocess.

    Args:
        ad: android device object.
        test_name: tcpdump file name will have this

    Returns:
        The process returned by start_standing_subprocess, or None if the
        process could not be started.
    """
    ad.log.info("Starting tcpdump on all interfaces")
    try:
        ad.adb.shell("killall -9 tcpdump")
    except AdbError:
        # Best effort: the kill also fails when no tcpdump is running.
        ad.log.warning("Killing existing tcpdump processes failed")

    out = ad.adb.shell("ls -l %s" % TCPDUMP_PATH)
    # Check for empty output first so a falsy result never reaches the
    # substring test.
    if not out or "No such file" in out:
        ad.adb.shell("mkdir %s" % TCPDUMP_PATH)
    else:
        ad.adb.shell("rm -rf %s/*" % TCPDUMP_PATH, ignore_status=True)

    file_name = "%s/tcpdump_%s_%s.pcap" % (TCPDUMP_PATH, ad.serial, test_name)
    ad.log.info("tcpdump file is %s", file_name)
    cmd = "adb -s {} shell tcpdump -i any -s0 -w {}".format(ad.serial,
                                                            file_name)
    try:
        return start_standing_subprocess(cmd, 5)
    except Exception:
        ad.log.exception('Could not start standing process %s', repr(cmd))

    return None
267
def stop_tcpdump(ad,
                 proc,
                 test_name,
                 adb_pull_timeout=adb.DEFAULT_ADB_PULL_TIMEOUT):
    """Stop tcpdump and pull the capture directory from the device

    Nothing is pulled when proc is None. Otherwise the standing process is
    stopped (an error while stopping is only logged), the device's tcpdump
    directory is pulled into <ad.log_path>/<test_name> and then emptied on
    the device.

    Args:
        ad: android device object.
        proc: standing tcpdump process to stop
        test_name: test name to save the tcpdump file
        adb_pull_timeout: timeout for adb_pull

    Returns:
      host path of the tcpdump file, or None if proc is None
    """
    ad.log.info("Stopping and pulling tcpdump if any")
    if proc is None:
        return None

    try:
        stop_standing_subprocess(proc)
    except Exception as err:
        # Still try to pull whatever was captured.
        ad.log.warning(err)

    dest_dir = os.path.join(ad.log_path, test_name)
    os.makedirs(dest_dir, exist_ok=True)
    pull_args = "%s/. %s" % (TCPDUMP_PATH, dest_dir)
    ad.adb.pull(pull_args, timeout=adb_pull_timeout)
    ad.adb.shell("rm -rf %s/*" % TCPDUMP_PATH, ignore_status=True)
    return "%s/tcpdump_%s_%s.pcap" % (dest_dir, ad.serial, test_name)
297
def start_tcpdump_gce_server(ad, test_name, dest_port, gce):
    """ Start tcpdump on gce server

    tcpdump is launched in the background on the instance over gcloud ssh.
    The process id is then read from the second field of the output of
    "ps aux | grep tcpdump | grep <capture file prefix>".

    Args:
        ad: android device object
        test_name: test case name
        dest_port: port to collect tcpdump
        gce: dictionary of gce instance; the keys "interface", "project",
            "zone", "username" and "hostname" are read

    Returns:
       process id and pcap file path (without extension) from gce server

    Raises:
       signals.TestFailure: if the ps output is empty
    """
    ad.log.info("Starting tcpdump on gce server")

    # Capture file prefix on the server, made unique with a timestamp.
    timestamp = time.strftime('%Y-%m-%d_%H-%M-%S',
                              time.localtime(time.time()))
    fname = "/tmp/%s_%s_%s_%s" % (test_name, ad.model, ad.serial, timestamp)

    # The command literal is kept verbatim, including the whitespace that
    # the in-string line continuation carries into the command.
    tcpdump_cmd = "sudo bash -c \'tcpdump -i %s -w %s.pcap port %s > \
        %s.txt 2>&1 & echo $!\'" % (gce["interface"], fname, dest_port, fname)
    ssh_target = (GCE_SSH, gce["project"], gce["zone"], gce["username"],
                  gce["hostname"])
    gcloud_ssh_cmd = "%s --project=%s --zone=%s %s@%s --command " % ssh_target
    utils.exe_cmd('%s "%s"' % (gcloud_ssh_cmd, tcpdump_cmd))

    # Look the process up by the unique capture file prefix.
    ps_cmd = '%s "ps aux | grep tcpdump | grep %s"' % (gcloud_ssh_cmd, fname)
    ps_output = utils.exe_cmd(ps_cmd).decode("utf-8", "ignore")
    ps_fields = ps_output.split()
    if not ps_fields:
        raise signals.TestFailure("Failed to start tcpdump on gce server")
    return ps_fields[1], fname
331
def stop_tcpdump_gce_server(ad, tcpdump_pid, fname, gce):
    """ Stop and pull tcpdump file from gce server

    The process is killed over gcloud ssh and "ps aux | grep tcpdump" is
    checked for the pid. When fname is set, the pcap is copied into
    ad.device_log_path with gcloud scp and the server-side files matching
    "<fname>.*" are removed.

    Args:
        ad: android device object
        tcpdump_pid: process id for tcpdump file
        fname: tcpdump file path
        gce: dictionary of gce instance

    Returns:
       pcap file from gce server, or None if fname is empty

    Raises:
       signals.TestFailure: if the pid is still listed after the kill, or
           the pcap is not present locally after the copy
    """
    ad.log.info("Stop and pull pcap file from gce server")

    target = (gce["project"], gce["zone"], gce["username"], gce["hostname"])

    # Kill the capture process on the server.
    gcloud_ssh_cmd = (
        "%s --project=%s --zone=%s %s@%s --command " % ((GCE_SSH,) + target))
    kill_cmd = "sudo kill %s" % tcpdump_pid
    utils.exe_cmd('%s "%s"' % (gcloud_ssh_cmd, kill_cmd))

    # The pid must no longer appear in the process listing.
    ps_cmd = '%s "ps aux | grep tcpdump"' % gcloud_ssh_cmd
    ps_fields = utils.exe_cmd(ps_cmd).decode("utf-8", "ignore").split()
    if tcpdump_pid in ps_fields:
        raise signals.TestFailure("Failed to stop tcpdump on gce server")
    if not fname:
        return None

    # Copy the pcap to the host.
    gcloud_scp_cmd = (
        "%s --project=%s --zone=%s %s@%s:" % ((GCE_SCP,) + target))
    utils.exe_cmd(
        '%s%s.pcap %s/' % (gcloud_scp_cmd, fname, ad.device_log_path))
    pcap_file = "%s/%s.pcap" % (ad.device_log_path, fname.split('/')[-1])
    if not os.path.exists(pcap_file):
        raise signals.TestFailure("Failed to pull tcpdump from gce server")

    # Remove the capture files from the server once the copy is verified.
    utils.exe_cmd('%s "sudo rm %s.*"' % (gcloud_ssh_cmd, fname))

    return pcap_file
376
def is_ipaddress_ipv6(ip_address):
    """Verify if the given string is a valid IPv6 address.

    Args:
        ip_address: string containing the IP address

    Returns:
        True: if valid ipv6 address
        False: if not
    """
    # Imported here because the module's import block does not bring in
    # socket; without it every call fails with NameError.
    import socket

    try:
        socket.inet_pton(socket.AF_INET6, ip_address)
    except (OSError, ValueError):
        # OSError (socket.error) means the string is not a valid IPv6
        # address; ValueError covers strings with an embedded null.
        return False
    return True
392
def carrier_supports_ipv6(dut):
    """Verify if carrier supports ipv6.

    The operator name reported for the device is compared against a fixed
    list of carriers.

    Args:
        dut: Android device that is being checked

    Returns:
        True: if carrier supports ipv6
        False: if not
    """
    ipv6_carriers = ("vzw", "tmo", "Far EasTone", "Chunghwa Telecom")
    # Pass the device's logger rather than the literal string "log", which
    # would break on any logging call made with it.
    operator = get_operator_name(dut.log, dut)
    return operator in ipv6_carriers
407
def supports_ipv6_tethering(self, dut):
    """ Check if provider supports IPv6 tethering.

    Args:
        self: object whose log attribute is passed to get_operator_name
        dut: Android device that is being checked

    Returns:
        True: if provider supports IPv6 tethering
        False: if not
    """
    tethering_carriers = ("vzw", "tmo", "Far EasTone", "Chunghwa Telecom")
    return get_operator_name(self.log, dut) in tethering_carriers
418
419
def start_usb_tethering(ad):
    """Switch the device's USB function to rndis and verify it took effect.

    Services are stopped before the USB function is changed and started
    again once adb sees the device.

    Args:
        ad: android device object

    Raises:
        signals.TestFailure: if "rndis" is missing from the device's
            "ip address" output afterwards
    """
    # TODO: test USB tethering by #startTethering API - b/149116235
    ad.log.info("Starting USB Tethering")
    ad.stop_services()
    ad.adb.shell(USB_TETHERING_MODE, ignore_status=True)
    ad.adb.wait_for_device()
    ad.start_services()

    ip_output = ad.adb.shell(DEVICE_IP_ADDRESS)
    if "rndis" in ip_output:
        return
    raise signals.TestFailure("Unable to enable USB tethering.")
434
435
def stop_usb_tethering(ad):
    """Switch the device's USB function back from tethering.

    Services are stopped, the USB_CHARGE_MODE shell command is run, and
    services are started again once adb sees the device.

    Args:
        ad: android device object
    """
    ad.log.info("Stopping USB Tethering")

    # Services are stopped around the USB function switch.
    ad.stop_services()
    usb_cmd = USB_CHARGE_MODE
    ad.adb.shell(usb_cmd)

    # Wait for adb to see the device before restarting services.
    ad.adb.wait_for_device()
    ad.start_services()
447
448
def wait_for_new_iface(old_ifaces):
    """Poll the host until exactly one new network interface appears.

    The interface list is read up to 10 times, with a 1s sleep after each
    read that finds nothing new.

    Args:
        old_ifaces: list of old interfaces

    Returns:
        Name of the single interface that is not in old_ifaces. More than
        one new interface, or none after all attempts, is reported through
        asserts.
    """
    known_ifaces = set(old_ifaces)
    attempts = 10
    for _ in range(attempts):
        added = set(get_if_list()).difference(known_ifaces)
        asserts.assert_true(
            len(added) < 2,
            "Too many new interfaces after turning on tethering")
        if len(added) == 1:
            return added.pop()
        time.sleep(1)
    asserts.fail("Timeout waiting for tethering interface on host")
467