1#!/usr/bin/env python3
2#
3#   Copyright 2019 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import re
18import time
19
20from acts import base_test
21from acts.libs.proc import job
22from acts import signals
23from acts.test_decorators import test_tracker_info
24
# Time (in seconds, passed to time.sleep) it takes for the usb tethering IP
# to show up in ifconfig.
IFCONFIG_SETTLE_TIME = 5
# adb shell command 'svc usb setFunctions' with no function argument.
# NOTE(review): presumably this resets the USB port to charge-only mode; it is
# not referenced anywhere in the visible part of this file -- confirm usage.
USB_CHARGE_MODE = 'svc usb setFunctions'
# adb shell command that switches the device USB function to rndis, which is
# how this test turns on USB tethering.
USB_TETHERING_MODE = 'svc usb setFunctions rndis'
# adb shell command whose output is searched for the string 'rndis' to verify
# that USB tethering is enabled.
DEVICE_IP_ADDRESS = 'ip address'
30
31
class UsbTetheringThroughputTest(base_test.BaseTestClass):
    """Tests for usb tethering throughput test.

    Test Bed Requirement:
          * One Android device.
          * Sim card with data call.
    """

    # Matches the tethering address in the host's ifconfig output. Both the
    # 'inet 1.2.42.4' and the older 'inet addr:1.2.42.4' layouts are accepted.
    # The third octet is pinned to 42 -- presumably Android's default USB
    # tethering subnet (192.168.42.0/24); confirm if the subnet is changed.
    _PC_TETHERING_IP_RE = re.compile(r'inet (?:addr:)?(\d+\.\d+\.42\.\d+)')
    # Matches the tethering address in the device's ifconfig output.
    _DUT_TETHERING_IP_RE = re.compile(r'addr:(\d+\.\d+\.42\.\d+)')
    # A literal zero loss rate. The look-behind rejects '100%', '10%' and
    # '50.0%', all of which contain the substring '0% packet loss'.
    _NO_PACKET_LOSS_RE = re.compile(r'(?<![\d.])0(?:\.0+)?% packet loss')
    # Receiver summary line of iperf3, e.g. '94.1 Mbits/sec ... receiver' or
    # '941 Mbits/sec ... receiver' (the fractional part is optional).
    _IPERF_RATE_RE = re.compile(r'(\d+(?:\.\d+)?) (.)bits/sec.*receiver')
    # Multipliers that convert an iperf3 rate with the given unit prefix into
    # Mbits/sec (iperf3 uses powers of 1000 for bit rates).
    _MBITS_PER_UNIT = {'': 1e-6, 'K': 1e-3, 'M': 1.0, 'G': 1e3, 'T': 1e6}
    # Suffix of the usb controller name that is not part of the sysfs node.
    _CONTROLLER_SUFFIX = '.dwc3'

    def setup_class(self):
        """ Setup devices for usb tethering """
        self.dut = self.android_devices[0]
        self.ip_server = self.iperf_servers[0]
        self.port_num = self.ip_server.port
        req_params = [
            'iperf_usb_3_criteria', 'iperf_usb_2_criteria', 'controller_path',
            'iperf_test_sec'
        ]
        self.unpack_userparams(req_param_names=req_params)
        # Devices listed under 'legacy_projects' use a different sysfs path
        # for the usb controller.
        if self.dut.model in self.user_params.get('legacy_projects', []):
            self.usb_controller_path = self.user_params[
                'legacy_controller_path']
        else:
            self.usb_controller_path = self.controller_path

    def setup_test(self):
        """Wait for the device and make sure the screen is on and unlocked."""
        self.dut.adb.wait_for_device()
        self.dut.droid.wakeLockAcquireBright()
        self.dut.droid.wakeUpNow()
        self.dut.unlock_screen()

    def teardown_test(self):
        """Release the wake lock, put the device to sleep and reboot it."""
        self.dut.droid.wakeLockRelease()
        self.dut.droid.goToSleepNow()
        # Reboot device to avoid the side effect that cause adb missing after
        # echo usb speed.
        self.dut.reboot()

    def on_fail(self, test_name, begin_time):
        """Collect a bug report and the adb log of the failed test."""
        self.dut.take_bug_report(test_name, begin_time)
        self.dut.cat_adb_log(test_name, begin_time)

    def _has_no_packet_loss(self, ping_output):
        """Return True only if the ping summary reports exactly 0% loss."""
        return bool(self._NO_PACKET_LOSS_RE.search(ping_output))

    def _rate_in_mbits(self, rate, unit):
        """Convert an iperf rate string plus unit prefix into Mbits/sec.

        An unknown unit prefix leaves the number unscaled.
        """
        return float(rate) * self._MBITS_PER_UNIT.get(unit.upper(), 1.0)

    def enable_usb_tethering(self):
        """Stop SL4A service and enable usb tethering.

        Logic steps are
        1. Stop SL4A service.
        2. Enable usb tethering.
        3. Restart SL4A service.
        4. Check usb tethering enabled.

        Raises:
            signals.TestFailure: if no rndis interface shows up on the device.
        """
        self.dut.stop_services()
        self.dut.adb.shell(USB_TETHERING_MODE, ignore_status=True)
        self.dut.adb.wait_for_device()
        self.dut.start_services()
        if 'rndis' not in self.dut.adb.shell(DEVICE_IP_ADDRESS):
            raise signals.TestFailure('Unable to enable USB tethering.')

    def get_pc_tethering_ip(self):
        """Check usb tethering IP from PC.

        Returns:
            Usb tethering IP from PC (first match in the ifconfig output).

        Raises:
            signals.TestFailure: if no usb tethering IP is found.
        """
        time.sleep(IFCONFIG_SETTLE_TIME)
        ifconfig_output = job.run('ifconfig').stdout
        matches = self._PC_TETHERING_IP_RE.findall(ifconfig_output)
        if not matches:
            raise signals.TestFailure(
                'Unable to find tethering IP. The device may not be tethered.')
        return matches[0]

    def get_dut_tethering_ip(self):
        """Check usb tethering IP from Android device.

        Returns:
            Usb tethering IP from Android device (first match in the
            ifconfig output).

        Raises:
            signals.TestFailure: if no usb tethering IP is found.
        """
        time.sleep(IFCONFIG_SETTLE_TIME)
        ifconfig_output = self.dut.adb.shell('ifconfig')
        matches = self._DUT_TETHERING_IP_RE.findall(ifconfig_output)
        if not matches:
            raise signals.TestFailure(
                'Unable to find tethering IP. The device may not be tethered.')
        return matches[0]

    def pc_can_ping(self, count=3):
        """Run ping traffic from PC side.

        Logic steps are
        1. PC ping the usb server ip.
        2. Check the packet loss rate.

        Args:
            count : count for ping test.

        Returns:
            True: If no packet loss.
            False: Otherwise.
        """
        ip = self.get_dut_tethering_ip()
        ping = job.run(
            'ping -c {} {}'.format(count, ip), ignore_status=True).stdout
        self.log.info(ping)
        return self._has_no_packet_loss(ping)

    def dut_can_ping(self, count=3):
        """Run ping traffic from Android device side.

        Logic steps are
        1. Android device ping the 8.8.8.8.
        2. Check the packet loss rate.

        Args:
            count : count for ping test.

        Returns:
            True: If no packet loss.
            False: Otherwise.
        """
        ip = '8.8.8.8'
        ping = self.dut.adb.shell(
            'ping -c {} {}'.format(count, ip), ignore_status=True)
        self.log.info(ping)
        return self._has_no_packet_loss(ping)

    def run_iperf_client(self, dut_ip, extra_params=''):
        """Run iperf client command after iperf server enabled.

        Args:
            dut_ip: string which contains the ip address.
            extra_params: params to be added to the iperf client.

        Returns:
            A 3-tuple (success, rate, unit):
            success: True if iperf ran and a receiver rate was parsed,
                False otherwise.
            rate: throughput number as a string ('0' on failure).
            unit: single-character unit prefix of 'bits/sec' as printed by
                iperf, e.g. 'M' ('' on failure).
        """
        self.log.info('Run iperf process.')
        cmd = "iperf3 -c {} {} -p {}".format(dut_ip, extra_params,
                                             self.port_num)
        self.log.info(cmd)
        try:
            result = self.dut.adb.shell(cmd, ignore_status=True)
        except Exception:
            # Keep the best-effort behavior, but always hand back the same
            # tuple shape the callers unpack.
            self.log.exception('Fail to execute iperf client.')
            return False, '0', ''
        self.log.info(result)
        rate = self._IPERF_RATE_RE.findall(result)
        if not rate:
            self.log.error('No receiver throughput found in iperf output.')
            return False, '0', ''
        return True, rate[0][0], rate[0][1]

    def run_iperf_tx_rx(self, usb_speed, criteria):
        """Run iperf tx and rx.

        Args:
            usb_speed: string which contains the speed info.
            criteria: usb performance criteria. NOTE(review): compared
                against the measured rate converted to Mbits/sec, i.e. the
                criteria are assumed to be in Mbits/sec -- confirm against
                the test config.

        Raises:
            signals.TestFailure: if iperf failed to run or the usb tethering
                throughput is under criteria.
        """
        self.enable_usb_tethering()
        self.dut.adb.wait_for_device()
        self.ip_server.start()
        try:
            pc_ip = self.get_pc_tethering_ip()
            tx_success, tx_rate, tx_unit = self.run_iperf_client(
                pc_ip, '-t{} -i1 -w2M'.format(self.iperf_test_sec))
            rx_success, rx_rate, rx_unit = self.run_iperf_client(
                pc_ip, '-t{} -i1 -w2M -R'.format(self.iperf_test_sec))
        finally:
            # Never leave the iperf server running, even when the tethering
            # IP lookup or a client run raises.
            self.ip_server.stop()
        self.log.info('TestResult iperf_{}_rx{} {}bits/sec'.format(
            usb_speed, rx_rate, rx_unit))
        self.log.info('TestResult iperf_{}_tx{} {}bits/sec'.format(
            usb_speed, tx_rate, tx_unit))
        results = (('tx', tx_success, tx_rate, tx_unit),
                   ('rx', rx_success, rx_rate, rx_unit))
        for direction, success, rate, unit in results:
            if not success or self._rate_in_mbits(rate, unit) < criteria:
                raise signals.TestFailure('Iperf {}_{} test is {} {}bits/sec, '
                                          'the throughput result failed.'.format(
                                              usb_speed, direction, rate, unit))

    def get_usb_speed(self):
        """Get current usb speed.

        Returns:
            A 2-tuple (usb_speed, usb_controller_address): the content of the
            controller's 'current_speed' sysfs node and the value of the
            'sys.usb.controller' property, both stripped of whitespace.
        """
        usb_controller_address = self.dut.adb.shell(
            'getprop sys.usb.controller', ignore_status=True).strip()
        # Absolute path so the read does not depend on the shell's cwd.
        usb_speed = self.dut.adb.shell(
            'cat /sys/class/udc/{}/current_speed'.format(
                usb_controller_address)).strip()
        return usb_speed, usb_controller_address

    @test_tracker_info(uuid="e7e0dfdc-3d1c-4642-a468-27326c49e4cb")
    def test_tethering_ping(self):
        """Enable usb tethering then executing ping test.

        Steps:
        1. Stop SL4A service.
        2. Enable usb tethering.
        3. Restart SL4A service.
        4. Execute ping test from PC and Android Device.
        5. Check the ping lost rate.
        """
        self.enable_usb_tethering()
        if self.pc_can_ping() and self.dut_can_ping():
            raise signals.TestPass(
                'Ping test is passed. Network is reachable.')
        raise signals.TestFailure(
            'Ping test failed. Maybe network is unreachable.')

    @test_tracker_info(uuid="8263c880-8a7e-4a68-b47f-e7caba3e9968")
    def test_usb_tethering_iperf_super_speed(self):
        """Enable usb tethering then executing iperf test.

        Steps:
        1. Stop SL4A service.
        2. Enable usb tethering.
        3. Restart SL4A service.
        4. Skip test if device not support super-speed.
        5. Execute iperf test for usb tethering and get the throughput result.
        6. Check the iperf throughput result.
        """
        if self.get_usb_speed()[0] != 'super-speed':
            raise signals.TestSkip(
                'USB 3 not available for the device, skip super-speed performance test.'
            )
        self.run_iperf_tx_rx('usb_3', self.iperf_usb_3_criteria)

    @test_tracker_info(uuid="5d8a22fd-1f9b-4758-a6b4-855d134b348a")
    def test_usb_tethering_iperf_high_speed(self):
        """Enable usb tethering then executing iperf test.

        Steps:
        1. Stop SL4A service.
        2. Enable usb tethering.
        3. Restart SL4A service.
        4. Force set usb speed to high-speed if default is super-speed.
        5. Execute iperf test for usb tethering and get the throughput result.
        6. Check the iperf throughput result.
        """
        usb_speed, controller = self.get_usb_speed()
        if usb_speed != 'high-speed':
            self.log.info(
                'Default usb speed is USB 3,Force set usb to high-speed.')
            # Drop only the trailing '.dwc3'. str.strip('.dwc3') would strip
            # any of those characters from both ends and can eat leading
            # address characters such as 'c' or 'd'.
            if controller.endswith(self._CONTROLLER_SUFFIX):
                controller = controller[:-len(self._CONTROLLER_SUFFIX)]
            self.dut.stop_services()
            self.dut.adb.wait_for_device()
            self.dut.adb.shell(
                'echo high > {}{}.ssusb/speed'.format(
                    self.usb_controller_path, controller),
                ignore_status=True)
            self.dut.adb.wait_for_device()
            self.dut.start_services()
        self.run_iperf_tx_rx('usb_2', self.iperf_usb_2_criteria)