1#
2#   Copyright 2018 - The Android Open Source Project
3#
4#   Licensed under the Apache License, Version 2.0 (the "License");
5#   you may not use this file except in compliance with the License.
6#   You may obtain a copy of the License at
7#
8#       http://www.apache.org/licenses/LICENSE-2.0
9#
10#   Unless required by applicable law or agreed to in writing, software
11#   distributed under the License is distributed on an "AS IS" BASIS,
12#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13#   See the License for the specific language governing permissions and
14#   limitations under the License.
15
16from acts import asserts
17from acts.test_decorators import test_tracker_info
18from acts.test_utils.net.net_test_utils import start_tcpdump
19from acts.test_utils.net.net_test_utils import stop_tcpdump
20from acts.test_utils.wifi.WifiBaseTest import WifiBaseTest
21from acts.test_utils.wifi import wifi_test_utils as wutils
22
23import acts.base_test
24import acts.test_utils.wifi.wifi_test_utils as wutils
25
26import copy
27import os
28import random
29import time
30
31WifiEnums = wutils.WifiEnums
32
33RA_SCRIPT = 'sendra.py'
34SCAPY = 'scapy-2.2.0.tar.gz'
35SCAPY_INSTALL_COMMAND = 'sudo python setup.py install'
36PROC_NET_SNMP6 = '/proc/net/snmp6'
37LIFETIME_FRACTION = 6
38LIFETIME = 180
39INTERVAL = 2
40
41
42class ApfCountersTest(WifiBaseTest):
43
44    def __init__(self, controllers):
45        WifiBaseTest.__init__(self, controllers)
46        self.tests = ("test_IPv6_RA_packets",
47                      "test_IPv6_RA_with_RTT", )
48
49    def setup_class(self):
50        self.dut = self.android_devices[0]
51        wutils.wifi_test_device_init(self.dut)
52        req_params = []
53        opt_param = ["reference_networks", ]
54
55        self.unpack_userparams(
56            req_param_names=req_params, opt_param_names=opt_param)
57
58        if "AccessPoint" in self.user_params:
59            self.legacy_configure_ap_and_start()
60
61        asserts.assert_true(
62            len(self.reference_networks) > 0,
63            "Need at least one reference network with psk.")
64        wutils.wifi_toggle_state(self.dut, True)
65
66        self.wpapsk_2g = self.reference_networks[0]["2g"]
67        self.wpapsk_5g = self.reference_networks[0]["5g"]
68
69        # install scapy
70        current_dir = os.path.dirname(os.path.realpath(__file__))
71        send_ra = os.path.join(current_dir, RA_SCRIPT)
72        send_scapy = os.path.join(current_dir, SCAPY)
73        self.access_points[0].install_scapy(send_scapy, send_ra)
74        self.tcpdump_pid = None
75
76    def setup_test(self):
77        self.tcpdump_pid = start_tcpdump(self.dut, self.test_name)
78
79    def teardown_test(self):
80        stop_tcpdump(self.dut, self.tcpdump_pid, self.test_name)
81
82    def on_fail(self, test_name, begin_time):
83        self.dut.take_bug_report(test_name, begin_time)
84        self.dut.cat_adb_log(test_name, begin_time)
85
86    def teardown_class(self):
87        if "AccessPoint" in self.user_params:
88            del self.user_params["reference_networks"]
89        self.access_points[0].cleanup_scapy()
90        wutils.reset_wifi(self.dut)
91
92    """ Helper methods """
93
94    def _get_icmp6intype134(self):
95        """ Get ICMP6 Type 134 packet count on the DUT
96
97        Returns:
98            Number of ICMP6 type 134 packets
99        """
100        cmd = "grep Icmp6InType134 %s || true" % PROC_NET_SNMP6
101        ra_count = self.dut.adb.shell(cmd)
102        if not ra_count:
103            return 0
104        ra_count = int(ra_count.split()[-1].rstrip())
105        self.dut.log.info("RA Count %s:" % ra_count)
106        return ra_count
107
108    """ Tests """
109
110    @test_tracker_info(uuid="bc8d3f27-582a-464a-be30-556f07b77ee1")
111    def test_IPv6_RA_packets(self):
112        """ Test if the device filters the IPv6 packets
113
114        Steps:
115          1. Send a RA packet to DUT. DUT should accept this
116          2. Send duplicate RA packets. The RA packets should be filtered
117             for the next 30 seconds (1/6th of RA lifetime)
118          3. The next RA packets should be accepted
119        """
120        # get mac address of the dut
121        ap = self.access_points[0]
122        wifi_network = copy.deepcopy(self.wpapsk_5g)
123        wifi_network['meteredOverride'] = 1
124        wutils.connect_to_wifi_network(self.dut, wifi_network)
125        mac_addr = self.dut.droid.wifiGetConnectionInfo()['mac_address']
126        self.log.info("mac_addr %s" % mac_addr)
127        time.sleep(30) # wait 30 sec before sending RAs
128
129        # get the current ra count
130        ra_count = self._get_icmp6intype134()
131
132        # Start scapy to send RA to the phone's MAC
133        ap.send_ra('wlan1', mac_addr, 0, 1)
134
135        # get the latest ra count
136        ra_count_latest = self._get_icmp6intype134()
137        asserts.assert_true(ra_count_latest == ra_count + 1,
138                            "Device dropped the first RA in sequence")
139
140        # Generate and send 'x' number of duplicate RAs, for 1/6th of the the
141        # lifetime of the original RA. Test assumes that the original RA has a
142        # lifetime of 180s. Hence, all RAs received within the next 30s of the
143        # original RA should be filtered.
144        ra_count = ra_count_latest
145        count = LIFETIME / LIFETIME_FRACTION / INTERVAL
146        ap.send_ra('wlan1', mac_addr, interval=INTERVAL, count=count)
147
148        # Fail test if at least 90% of RAs were not dropped.
149        ra_count_latest = self._get_icmp6intype134()
150        pkt_loss = count - (ra_count_latest - ra_count)
151        percentage_loss = float(pkt_loss) / count * 100
152        asserts.assert_true(percentage_loss >= 90, "Device did not filter "
153                            "duplicate RAs correctly. %d Percent of duplicate"
154                            " RAs were accepted" % (100 - percentage_loss))
155
156        # Any new RA after this should be accepted.
157        ap.send_ra('wlan1', mac_addr, interval=INTERVAL, count=1)
158        ra_count_latest = self._get_icmp6intype134()
159        asserts.assert_true(ra_count_latest == ra_count + 1,
160                            "Device did not accept new RA after 1/6th time "
161                            "interval. Device dropped a valid RA in sequence.")
162
163    @test_tracker_info(uuid="d2a0aff0-048c-475f-9bba-d90d8d9ebae3")
164    def test_IPv6_RA_with_RTT(self):
165        """Test if the device filters IPv6 RA packets with different re-trans time
166
167        Steps:
168          1. Get the current RA count
169          2. Send 400 packets with different re-trans time
170          3. Verify that RA count increased by 400
171          4. Verify internet connectivity
172        """
173        pkt_num = 400
174        rtt_list = random.sample(range(10, 10000), pkt_num)
175
176        # get mac address of the dut
177        ap = self.access_points[0]
178        wutils.connect_to_wifi_network(self.dut, self.wpapsk_5g)
179        mac_addr = self.dut.droid.wifiGetConnectionInfo()['mac_address']
180        self.log.info("mac_addr %s" % mac_addr)
181        time.sleep(30) # wait 30 sec before sending RAs
182
183        # get the current ra count
184        ra_count = self._get_icmp6intype134()
185
186        # send RA with differnt re-trans time
187        for rtt in rtt_list:
188            ap.send_ra('wlan1', mac_addr, 0, 1, rtt=rtt)
189
190        # get the new RA count
191        new_ra_count = self._get_icmp6intype134()
192        asserts.assert_true(new_ra_count == ra_count + pkt_num,
193                            "Device did not accept all RAs")
194
195        # verify if internet connectivity works after sending RA packets
196        wutils.validate_connection(self.dut)
197