1#!/usr/bin/env python3.4
2#
3#   Copyright 2018 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the 'License');
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an 'AS IS' BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16import acts
17import json
18import logging
19import math
20import os
21import time
22import acts.controllers.iperf_server as ipf
23from acts import asserts
24from acts import base_test
25from acts import utils
26from acts.controllers import monsoon
27from acts.test_utils.wifi import wifi_test_utils as wutils
28from acts.test_utils.wifi import wifi_power_test_utils as wputils
29
30SETTINGS_PAGE = 'am start -n com.android.settings/.Settings'
31SCROLL_BOTTOM = 'input swipe 0 2000 0 0'
32UNLOCK_SCREEN = 'input keyevent 82'
33SCREENON_USB_DISABLE = 'dumpsys battery unplug'
34RESET_BATTERY_STATS = 'dumpsys batterystats --reset'
35AOD_OFF = 'settings put secure doze_always_on 0'
36MUSIC_IQ_OFF = 'pm disable-user com.google.intelligence.sense'
37# Command to disable gestures
38LIFT = 'settings put secure doze_pulse_on_pick_up 0'
39DOUBLE_TAP = 'settings put secure doze_pulse_on_double_tap 0'
40JUMP_TO_CAMERA = 'settings put secure camera_double_tap_power_gesture_disabled 1'
41RAISE_TO_CAMERA = 'settings put secure camera_lift_trigger_enabled 0'
42FLIP_CAMERA = 'settings put secure camera_double_twist_to_flip_enabled 0'
43ASSIST_GESTURE = 'settings put secure assist_gesture_enabled 0'
44ASSIST_GESTURE_ALERT = 'settings put secure assist_gesture_silence_alerts_enabled 0'
45ASSIST_GESTURE_WAKE = 'settings put secure assist_gesture_wake_enabled 0'
46SYSTEM_NAVI = 'settings put secure system_navigation_keys_enabled 0'
47# End of command to disable gestures
48AUTO_TIME_OFF = 'settings put global auto_time 0'
49AUTO_TIMEZONE_OFF = 'settings put global auto_time_zone 0'
50FORCE_YOUTUBE_STOP = 'am force-stop com.google.android.youtube'
51FORCE_DIALER_STOP = 'am force-stop com.google.android.dialer'
52IPERF_TIMEOUT = 180
53THRESHOLD_TOLERANCE = 0.2
54GET_FROM_PHONE = 'get_from_dut'
55GET_FROM_AP = 'get_from_ap'
56PHONE_BATTERY_VOLTAGE = 4.2
57MONSOON_MAX_CURRENT = 8.0
58MONSOON_RETRY_INTERVAL = 300
59MEASUREMENT_RETRY_COUNT = 3
60RECOVER_MONSOON_RETRY_COUNT = 3
61MIN_PERCENT_SAMPLE = 95
62ENABLED_MODULATED_DTIM = 'gEnableModulatedDTIM='
63MAX_MODULATED_DTIM = 'gMaxLIModulatedDTIM='
64TEMP_FILE = '/sdcard/Download/tmp.log'
65IPERF_DURATION = 'iperf_duration'
66INITIAL_ATTEN = [0, 0, 90, 90]
67
68
69class ObjNew():
70    """Create a random obj with unknown attributes and value.
71
72    """
73
74    def __init__(self, **kwargs):
75        self.__dict__.update(kwargs)
76
77    def __contains__(self, item):
78        """Function to check if one attribute is contained in the object.
79
80        Args:
81            item: the item to check
82        Return:
83            True/False
84        """
85        return hasattr(self, item)
86
87
88class PowerBaseTest(base_test.BaseTestClass):
89    """Base class for all wireless power related tests.
90
91    """
92
93    def __init__(self, controllers):
94
95        base_test.BaseTestClass.__init__(self, controllers)
96
97    def setup_class(self):
98
99        self.log = logging.getLogger()
100        self.tests = self._get_all_test_names()
101
102        # Setup the must have controllers, phone and monsoon
103        self.dut = self.android_devices[0]
104        self.mon_data_path = os.path.join(self.log_path, 'Monsoon')
105        self.mon = self.monsoons[0]
106        self.mon.set_max_current(8.0)
107        self.mon.set_voltage(4.2)
108        self.mon.attach_device(self.dut)
109
110        # Unpack the test/device specific parameters
111        TEST_PARAMS = self.TAG + '_params'
112        req_params = [TEST_PARAMS, 'custom_files']
113        self.unpack_userparams(req_params)
114        # Unpack the custom files based on the test configs
115        for file in self.custom_files:
116            if 'pass_fail_threshold_' + self.dut.model in file:
117                self.threshold_file = file
118            elif 'attenuator_setting' in file:
119                self.attenuation_file = file
120            elif 'network_config' in file:
121                self.network_file = file
122
123        # Unpack test specific configs
124        self.unpack_testparams(getattr(self, TEST_PARAMS))
125        if hasattr(self, 'attenuators'):
126            self.num_atten = self.attenuators[0].instrument.num_atten
127            self.atten_level = self.unpack_custom_file(self.attenuation_file)
128        self.set_attenuation(INITIAL_ATTEN)
129        self.threshold = self.unpack_custom_file(self.threshold_file)
130        self.mon_info = self.create_monsoon_info()
131
132        # Onetime task for each test class
133        # Temporary fix for b/77873679
134        self.adb_disable_verity()
135        self.dut.adb.shell('mv /vendor/bin/chre /vendor/bin/chre_renamed')
136        self.dut.adb.shell('pkill chre')
137
138    def setup_test(self):
139        """Set up test specific parameters or configs.
140
141        """
142        # Set the device into rockbottom state
143        self.dut_rockbottom()
144        # Wait for extra time if needed for the first test
145        if hasattr(self, 'extra_wait'):
146            self.more_wait_first_test()
147
148    def teardown_test(self):
149        """Tear down necessary objects after test case is finished.
150
151        """
152        self.log.info('Tearing down the test case')
153        self.mon.usb('on')
154
155    def teardown_class(self):
156        """Clean up the test class after tests finish running
157
158        """
159        self.log.info('Tearing down the test class')
160        self.mon.usb('on')
161
162    def unpack_testparams(self, bulk_params):
163        """Unpack all the test specific parameters.
164
165        Args:
166            bulk_params: dict with all test specific params in the config file
167        """
168        for key in bulk_params.keys():
169            setattr(self, key, bulk_params[key])
170
171    def unpack_custom_file(self, file, test_specific=True):
172        """Unpack the pass_fail_thresholds from a common file.
173
174        Args:
175            file: the common file containing pass fail threshold.
176        """
177        with open(file, 'r') as f:
178            params = json.load(f)
179        if test_specific:
180            try:
181                return params[self.TAG]
182            except KeyError:
183                pass
184        else:
185            return params
186
187    def decode_test_configs(self, attrs, indices):
188        """Decode the test config/params from test name.
189
190        Remove redundant function calls when tests are similar.
191        Args:
192            attrs: a list of the attrs of the test config obj
193            indices: a list of the location indices of keyword in the test name.
194        """
195        # Decode test parameters for the current test
196        test_params = self.current_test_name.split('_')
197        values = [test_params[x] for x in indices]
198        config_dict = dict(zip(attrs, values))
199        self.test_configs = ObjNew(**config_dict)
200
201    def more_wait_first_test(self):
202        # For the first test, increase the offset for longer wait time
203        if self.current_test_name == self.tests[0]:
204            self.mon_info.offset = self.mon_offset + self.extra_wait
205        else:
206            self.mon_info.offset = self.mon_offset
207
208    def set_attenuation(self, atten_list):
209        """Function to set the attenuator to desired attenuations.
210
211        Args:
212            atten_list: list containing the attenuation for each attenuator.
213        """
214        if len(atten_list) != self.num_atten:
215            raise Exception('List given does not have the correct length')
216        for i in range(self.num_atten):
217            self.attenuators[i].set_atten(atten_list[i])
218
219    def dut_rockbottom(self):
220        """Set the phone into Rock-bottom state.
221
222        """
223        self.dut.log.info('Now set the device to Rockbottom State')
224        utils.require_sl4a((self.dut, ))
225        self.dut.droid.connectivityToggleAirplaneMode(False)
226        time.sleep(2)
227        self.dut.droid.connectivityToggleAirplaneMode(True)
228        time.sleep(2)
229        utils.set_ambient_display(self.dut, False)
230        utils.set_auto_rotate(self.dut, False)
231        utils.set_adaptive_brightness(self.dut, False)
232        utils.sync_device_time(self.dut)
233        utils.set_location_service(self.dut, False)
234        utils.set_mobile_data_always_on(self.dut, False)
235        utils.disable_doze_light(self.dut)
236        utils.disable_doze(self.dut)
237        wutils.reset_wifi(self.dut)
238        wutils.wifi_toggle_state(self.dut, False)
239        try:
240            self.dut.droid.nfcDisable()
241        except acts.controllers.sl4a_lib.rpc_client.Sl4aApiError:
242            self.dut.log.info('NFC is not available')
243        self.dut.droid.setScreenBrightness(0)
244        self.dut.adb.shell(AOD_OFF)
245        self.dut.droid.setScreenTimeout(2200)
246        self.dut.droid.wakeUpNow()
247        self.dut.adb.shell(LIFT)
248        self.dut.adb.shell(DOUBLE_TAP)
249        self.dut.adb.shell(JUMP_TO_CAMERA)
250        self.dut.adb.shell(RAISE_TO_CAMERA)
251        self.dut.adb.shell(FLIP_CAMERA)
252        self.dut.adb.shell(ASSIST_GESTURE)
253        self.dut.adb.shell(ASSIST_GESTURE_ALERT)
254        self.dut.adb.shell(ASSIST_GESTURE_WAKE)
255        self.dut.adb.shell(SCREENON_USB_DISABLE)
256        self.dut.adb.shell(UNLOCK_SCREEN)
257        self.dut.adb.shell(SETTINGS_PAGE)
258        self.dut.adb.shell(SCROLL_BOTTOM)
259        self.dut.adb.shell(MUSIC_IQ_OFF)
260        self.dut.adb.shell(AUTO_TIME_OFF)
261        self.dut.adb.shell(AUTO_TIMEZONE_OFF)
262        self.dut.adb.shell(FORCE_YOUTUBE_STOP)
263        self.dut.adb.shell(FORCE_DIALER_STOP)
264        self.dut.droid.wifiSetCountryCode('US')
265        self.dut.droid.wakeUpNow()
266        self.dut.log.info('Device has been set to Rockbottom state')
267        self.dut.log.info('Screen is ON')
268
269    def measure_power_and_validate(self):
270        """The actual test flow and result processing and validate.
271
272        """
273        self.collect_power_data()
274        self.pass_fail_check()
275
276    def collect_power_data(self):
277        """Measure power, plot and take log if needed.
278
279        """
280        tag = ''
281        # Collecting current measurement data and plot
282        begin_time = utils.get_current_epoch_time()
283        self.file_path, self.test_result = self.monsoon_data_collect_save()
284        wputils.monsoon_data_plot(self.mon_info, self.file_path, tag=tag)
285        # Take Bugreport
286        if self.bug_report:
287            self.dut.take_bug_report(self.test_name, begin_time)
288
289    def pass_fail_check(self):
290        """Check the test result and decide if it passed or failed.
291
292        The threshold is provided in the config file. In this class, result is
293        current in mA.
294        """
295        current_threshold = self.threshold[self.test_name]
296        if self.test_result:
297            asserts.assert_true(
298                abs(self.test_result - current_threshold) / current_threshold <
299                THRESHOLD_TOLERANCE,
300                ('Measured average current in [{}]: {}, which is '
301                 'more than {} percent off than acceptable threshold {:.2f}mA'
302                 ).format(self.test_name, self.test_result,
303                          self.pass_fail_tolerance * 100, current_threshold))
304            asserts.explicit_pass('Measurement finished for {}.'.format(
305                self.test_name))
306        else:
307            asserts.fail(
308                'Something happened, measurement is not complete, test failed')
309
310    def create_monsoon_info(self):
311        """Creates the config dictionary for monsoon
312
313        Returns:
314            mon_info: Dictionary with the monsoon packet config
315        """
316        if hasattr(self, IPERF_DURATION):
317            self.mon_duration = self.iperf_duration - 10
318        mon_info = ObjNew(
319            dut=self.mon,
320            freq=self.mon_freq,
321            duration=self.mon_duration,
322            offset=self.mon_offset,
323            data_path=self.mon_data_path)
324        return mon_info
325
326    def monsoon_recover(self):
327        """Test loop to wait for monsoon recover from unexpected error.
328
329        Wait for a certain time duration, then quit.0
330        Args:
331            mon: monsoon object
332        Returns:
333            True/False
334        """
335        try:
336            self.mon.reconnect_monsoon()
337            time.sleep(2)
338            self.mon.usb('on')
339            logging.info('Monsoon recovered from unexpected error')
340            time.sleep(2)
341            return True
342        except monsoon.MonsoonError:
343            logging.info(self.mon.mon.ser.in_waiting)
344            logging.warning('Unable to recover monsoon from unexpected error')
345            return False
346
347    def monsoon_data_collect_save(self):
348        """Current measurement and save the log file.
349
350        Collect current data using Monsoon box and return the path of the
351        log file. Take bug report if requested.
352
353        Returns:
354            data_path: the absolute path to the log file of monsoon current
355                       measurement
356            avg_current: the average current of the test
357        """
358
359        tag = '{}_{}_{}'.format(self.test_name, self.dut.model,
360                                self.dut.build_info['build_id'])
361        data_path = os.path.join(self.mon_info.data_path, '{}.txt'.format(tag))
362        total_expected_samples = self.mon_info.freq * (
363            self.mon_info.duration + self.mon_info.offset)
364        min_required_samples = total_expected_samples * MIN_PERCENT_SAMPLE / 100
365        # Retry counter for monsoon data aquisition
366        retry_measure = 1
367        # Indicator that need to re-collect data
368        need_collect_data = 1
369        result = None
370        while retry_measure <= MEASUREMENT_RETRY_COUNT:
371            try:
372                # If need to retake data
373                if need_collect_data == 1:
374                    #Resets the battery status right before the test started
375                    self.dut.adb.shell(RESET_BATTERY_STATS)
376                    self.log.info(
377                        'Starting power measurement with monsoon box, try #{}'.
378                        format(retry_measure))
379                    #Start the power measurement using monsoon
380                    self.mon_info.dut.monsoon_usb_auto()
381                    result = self.mon_info.dut.measure_power(
382                        self.mon_info.freq,
383                        self.mon_info.duration,
384                        tag=tag,
385                        offset=self.mon_info.offset)
386                    self.mon_info.dut.reconnect_dut()
387                # Reconnect to dut
388                else:
389                    self.mon_info.dut.reconnect_dut()
390                # Reconnect and return measurement results if no error happens
391                avg_current = result.average_current
392                monsoon.MonsoonData.save_to_text_file([result], data_path)
393                self.log.info('Power measurement done within {} try'.format(
394                    retry_measure))
395                return data_path, avg_current
396            # Catch monsoon errors during measurement
397            except monsoon.MonsoonError:
398                self.log.info(self.mon_info.dut.mon.ser.in_waiting)
399                # Break early if it's one count away from limit
400                if retry_measure == MEASUREMENT_RETRY_COUNT:
401                    self.log.error(
402                        'Test failed after maximum measurement retry')
403                    break
404
405                self.log.warning('Monsoon error happened, now try to recover')
406                # Retry loop to recover monsoon from error
407                retry_monsoon = 1
408                while retry_monsoon <= RECOVER_MONSOON_RETRY_COUNT:
409                    mon_status = self.monsoon_recover(self.mon_info.dut)
410                    if mon_status:
411                        break
412                    else:
413                        retry_monsoon += 1
414                        self.log.warning(
415                            'Wait for {} second then try again'.format(
416                                MONSOON_RETRY_INTERVAL))
417                        time.sleep(MONSOON_RETRY_INTERVAL)
418
419                # Break the loop to end test if failed to recover monsoon
420                if not mon_status:
421                    self.log.error(
422                        'Tried our best, still failed to recover monsoon')
423                    break
424                else:
425                    # If there is no data, or captured samples are less than min
426                    # required, re-take
427                    if not result:
428                        self.log.warning('No data taken, need to remeasure')
429                    elif len(result._data_points) <= min_required_samples:
430                        self.log.warning(
431                            'More than {} percent of samples are missing due to monsoon error. Need to remeasure'.
432                            format(100 - MIN_PERCENT_SAMPLE))
433                    else:
434                        need_collect_data = 0
435                        self.log.warning(
436                            'Data collected is valid, try reconnect to DUT to finish test'
437                        )
438                    retry_measure += 1
439
440        if retry_measure > MEASUREMENT_RETRY_COUNT:
441            self.log.error('Test failed after maximum measurement retry')
442
443    def setup_ap_connection(self, network, bandwidth=80, connect=True):
444        """Setup AP and connect DUT to it.
445
446        Args:
447            network: the network config for the AP to be setup
448            bandwidth: bandwidth of the WiFi network to be setup
449            connect: indicator of if connect dut to the network after setup
450        Returns:
451            self.brconfigs: dict for bridge interface configs
452        """
453        wutils.wifi_toggle_state(self.dut, True)
454        self.brconfigs = wputils.ap_setup(
455            self.access_point, network, bandwidth=bandwidth)
456        if connect:
457            wutils.wifi_connect(self.dut, network)
458        return self.brconfigs
459
460    def process_iperf_results(self):
461        """Get the iperf results and process.
462
463        Returns:
464             throughput: the average throughput during tests.
465        """
466        # Get IPERF results and add this to the plot title
467        RESULTS_DESTINATION = os.path.join(self.iperf_server.log_path,
468                                           'iperf_client_output_{}.log'.format(
469                                               self.current_test_name))
470        PULL_FILE = '{} {}'.format(TEMP_FILE, RESULTS_DESTINATION)
471        self.dut.adb.pull(PULL_FILE)
472        # Calculate the average throughput
473        if self.use_client_output:
474            iperf_file = RESULTS_DESTINATION
475        else:
476            iperf_file = self.iperf_server.log_files[-1]
477        try:
478            iperf_result = ipf.IPerfResult(iperf_file)
479            throughput = (math.fsum(iperf_result.instantaneous_rates[:-1]) /
480                          len(iperf_result.instantaneous_rates[:-1])) * 8
481            self.log.info('The average throughput is {}'.format(throughput))
482        except ValueError:
483            self.log.warning('Cannot get iperf result. Setting to 0')
484            throughput = 0
485        return throughput
486
487    # TODO(@qijiang)Merge with tel_test_utils.py
488    def adb_disable_verity(self):
489        """Disable verity on the device.
490
491        """
492        if self.dut.adb.getprop("ro.boot.veritymode") == "enforcing":
493            self.dut.adb.disable_verity()
494            self.dut.reboot()
495            self.dut.adb.root()
496            self.dut.adb.remount()
497