1#!/usr/bin/env python3
2#
3#   Copyright 2018 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the 'License');
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an 'AS IS' BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
import json
import logging
import math
import os
import re
import shlex
import time

import acts.controllers.iperf_server as ipf
from acts import asserts
from acts import base_test
from acts import utils
from acts.controllers.monsoon_lib.api.common import MonsoonError
from acts.controllers.monsoon_lib.api.common import PassthroughStates
from acts.metrics.loggers.blackbox import BlackboxMetricLogger
from acts.test_utils.power.loggers.power_metric_logger import PowerMetricLogger
from acts.test_utils.wifi import wifi_power_test_utils as wputils
from acts.test_utils.wifi import wifi_test_utils as wutils
33
# adb shell command issued on the DUT right before every measurement attempt.
RESET_BATTERY_STATS = 'dumpsys batterystats --reset'
# Not referenced in this file; presumably an iperf timeout in seconds -
# TODO confirm against the test classes that use it.
IPERF_TIMEOUT = 180
# Default for the 'pass_fail_tolerance' user parameter: the allowed relative
# deviation of the measured current from the configured threshold.
THRESHOLD_TOLERANCE_DEFAULT = 0.2
# Not referenced in this file; presumably selectors for where a value is read
# from (DUT vs. access point) - TODO confirm.
GET_FROM_PHONE = 'get_from_dut'
GET_FROM_AP = 'get_from_ap'
# Default for the 'mon_voltage' user parameter, which is set on the Monsoon
# and used to convert the average current into the reported power metric.
PHONE_BATTERY_VOLTAGE_DEFAULT = 4.2
# Monsoon current limit; same value that setup_class passes to
# set_max_current. Unit is presumably amps - TODO confirm.
MONSOON_MAX_CURRENT = 8.0
# Not referenced in this file; presumably seconds - TODO confirm.
MONSOON_RETRY_INTERVAL = 300
# Default for the 'mon_freq' user parameter, passed to measure_power as hz.
DEFAULT_MONSOON_FREQUENCY = 500
# Maximum number of measurement attempts in monsoon_data_collect_save.
MEASUREMENT_RETRY_COUNT = 3
# Not referenced in this file.
RECOVER_MONSOON_RETRY_COUNT = 3
# A measurement attempt is only accepted if more than this percentage of the
# expected samples (frequency * duration) was gathered.
MIN_PERCENT_SAMPLE = 95
# Not referenced in this file; the trailing '=' suggests these are 'key='
# prefixes of configuration entries - TODO confirm.
ENABLED_MODULATED_DTIM = 'gEnableModulatedDTIM='
MAX_MODULATED_DTIM = 'gMaxLIModulatedDTIM='
# File on the DUT that process_iperf_results pulls as the iperf client output.
TEMP_FILE = '/sdcard/Download/tmp.log'
49
50
class ObjNew():
    """Lightweight container whose attributes come from keyword arguments.

    Every keyword argument given to the constructor is stored as an instance
    attribute of the same name.
    """
    def __init__(self, **kwargs):
        vars(self).update(kwargs)

    def __contains__(self, item):
        """Support `item in obj` as an attribute-existence check.

        Args:
            item: name of the attribute to look for
        Return:
            True if the object has an attribute named item, False otherwise
        """
        try:
            getattr(self, item)
        except AttributeError:
            return False
        return True
67
68
69class PowerBaseTest(base_test.BaseTestClass):
70    """Base class for all wireless power related tests.
71
72    """
73    def __init__(self, controllers):
74
75        base_test.BaseTestClass.__init__(self, controllers)
76        self.power_result = BlackboxMetricLogger.for_test_case(
77            metric_name='avg_power')
78        self.start_meas_time = 0
79        self.rockbottom_script = None
80        self.img_name = ''
81        self.dut = None
82        self.power_logger = PowerMetricLogger.for_test_case()
83
84    @property
85    def final_test(self):
86        return len(
87            self.results.requested
88        ) > 0 and self.current_test_name == self.results.requested[-1]
89
90    def setup_class(self):
91
92        self.log = logging.getLogger()
93        self.tests = self.get_existing_test_names()
94
95        # Obtain test parameters from user_params
96        TEST_PARAMS = self.TAG + '_params'
97        self.test_params = self.user_params.get(TEST_PARAMS, {})
98        if not self.test_params:
99            self.log.warning(TEST_PARAMS + ' was not found in the user '
100                             'parameters defined in the config file.')
101
102        # Override user_param values with test parameters
103        self.user_params.update(self.test_params)
104
105        # Unpack user_params with default values. All the usages of user_params
106        # as self attributes need to be included either as a required parameter
107        # or as a parameter with a default value.
108        req_params = ['custom_files', 'mon_duration']
109        self.unpack_userparams(req_params,
110                               mon_freq=DEFAULT_MONSOON_FREQUENCY,
111                               mon_offset=0,
112                               bug_report=False,
113                               extra_wait=None,
114                               iperf_duration=None,
115                               pass_fail_tolerance=THRESHOLD_TOLERANCE_DEFAULT,
116                               mon_voltage=PHONE_BATTERY_VOLTAGE_DEFAULT)
117
118        # Setup the must have controllers, phone and monsoon
119        self.dut = self.android_devices[0]
120        self.mon_data_path = os.path.join(self.log_path, 'Monsoon')
121        os.makedirs(self.mon_data_path, exist_ok=True)
122        self.mon = self.monsoons[0]
123        self.mon.set_max_current(8.0)
124        self.mon.set_voltage(self.mon_voltage)
125        self.mon.attach_device(self.dut)
126
127        # Unpack the thresholds file or fail class setup if it can't be found
128        for file in self.custom_files:
129            if 'pass_fail_threshold_' + self.dut.model in file:
130                self.threshold_file = file
131                break
132        else:
133            raise RuntimeError('Required test pass/fail threshold file is '
134                               'missing')
135
136        # Unpack the rockbottom script or fail class setup if it can't be found
137        for file in self.custom_files:
138            if 'rockbottom_' + self.dut.model in file:
139                self.rockbottom_script = file
140                break
141        else:
142            raise RuntimeError('Required rockbottom script is missing.')
143
144        # Unpack optional custom files
145        for file in self.custom_files:
146            if 'attenuator_setting' in file:
147                self.attenuation_file = file
148            elif 'network_config' in file:
149                self.network_file = file
150
151        if hasattr(self, 'attenuators'):
152            self.num_atten = self.attenuators[0].instrument.num_atten
153            self.atten_level = self.unpack_custom_file(self.attenuation_file)
154        self.threshold = self.unpack_custom_file(self.threshold_file)
155        self.mon_info = self.create_monsoon_info()
156
157        # Sync device time, timezone and country code
158        utils.require_sl4a((self.dut, ))
159        utils.sync_device_time(self.dut)
160        wutils.set_wifi_country_code(self.dut, 'US')
161
162        screen_on_img = self.user_params.get('screen_on_img', [])
163        if screen_on_img:
164            img_src = screen_on_img[0]
165            img_dest = '/sdcard/Pictures/'
166            success = self.dut.push_system_file(img_src, img_dest)
167            if success:
168                self.img_name = os.path.basename(img_src)
169
170    def setup_test(self):
171        """Set up test specific parameters or configs.
172
173        """
174        # Reset the power consumption to 0 before each tests
175        self.power_result.metric_value = 0
176        # Set the device into rockbottom state
177        self.dut_rockbottom()
178        wutils.reset_wifi(self.dut)
179        wutils.wifi_toggle_state(self.dut, False)
180
181        # Wait for extra time if needed for the first test
182        if self.extra_wait:
183            self.more_wait_first_test()
184
185    def teardown_test(self):
186        """Tear down necessary objects after test case is finished.
187
188        """
189        self.log.info('Tearing down the test case')
190        self.mon.usb('on')
191        self.power_logger.set_avg_power(self.power_result.metric_value)
192        self.power_logger.set_testbed(self.testbed_name)
193
194        build_id = self.dut.build_info.get('incremental_build_id', '')
195        branch = self.user_params.get('branch', '')
196        target = self.dut.device_info.get('flavor', '')
197
198        self.power_logger.set_branch(branch)
199        self.power_logger.set_build_id(build_id)
200        self.power_logger.set_target(target)
201
202        # Take Bugreport
203        if self.bug_report:
204            begin_time = utils.get_current_epoch_time()
205            self.dut.take_bug_report(self.test_name, begin_time)
206
207        # Allow the device to cooldown before executing the next test
208        cooldown = self.test_params.get('cooldown', None)
209        if cooldown and not self.final_test:
210            time.sleep(cooldown)
211
212    def teardown_class(self):
213        """Clean up the test class after tests finish running
214
215        """
216        self.log.info('Tearing down the test class')
217        if hasattr(self, 'monsoons'):
218            self.monsoons[0].usb('on')
219
220    def dut_rockbottom(self):
221        """Set the dut to rockbottom state
222
223        """
224        # The rockbottom script might include a device reboot, so it is
225        # necessary to stop SL4A during its execution.
226        self.dut.stop_services()
227        self.log.info('Executing rockbottom script for ' + self.dut.model)
228        os.chmod(self.rockbottom_script, 0o777)
229        os.system('{} {} {}'.format(self.rockbottom_script, self.dut.serial,
230                                    self.img_name))
231        # Make sure the DUT is in root mode after coming back
232        self.dut.root_adb()
233        # Restart SL4A
234        self.dut.start_services()
235
236    def unpack_custom_file(self, file, test_specific=True):
237        """Unpack the pass_fail_thresholds from a common file.
238
239        Args:
240            file: the common file containing pass fail threshold.
241            test_specific: if True, returns the JSON element within the file
242                that starts with the test class name.
243        """
244        with open(file, 'r') as f:
245            params = json.load(f)
246        if test_specific:
247            try:
248                return params[self.TAG]
249            except KeyError:
250                pass
251        else:
252            return params
253
254    def decode_test_configs(self, attrs, indices):
255        """Decode the test config/params from test name.
256
257        Remove redundant function calls when tests are similar.
258        Args:
259            attrs: a list of the attrs of the test config obj
260            indices: a list of the location indices of keyword in the test name.
261        """
262        # Decode test parameters for the current test
263        test_params = self.current_test_name.split('_')
264        values = [test_params[x] for x in indices]
265        config_dict = dict(zip(attrs, values))
266        self.test_configs = ObjNew(**config_dict)
267
268    def more_wait_first_test(self):
269        # For the first test, increase the offset for longer wait time
270        if self.current_test_name == self.tests[0]:
271            self.mon_info.offset = self.mon_offset + self.extra_wait
272        else:
273            self.mon_info.offset = self.mon_offset
274
275    def set_attenuation(self, atten_list):
276        """Function to set the attenuator to desired attenuations.
277
278        Args:
279            atten_list: list containing the attenuation for each attenuator.
280        """
281        if len(atten_list) != self.num_atten:
282            raise Exception('List given does not have the correct length')
283        for i in range(self.num_atten):
284            self.attenuators[i].set_atten(atten_list[i])
285
286    def measure_power_and_validate(self):
287        """The actual test flow and result processing and validate.
288
289        """
290        result = self.collect_power_data()
291        self.pass_fail_check(result.average_current)
292
293    def collect_power_data(self):
294        """Measure power, plot and take log if needed.
295
296        Returns:
297            A MonsoonResult object.
298        """
299        # Collecting current measurement data and plot
300        result = self.monsoon_data_collect_save()
301        self.power_result.metric_value = (result.average_current *
302                                          self.mon_voltage)
303        wputils.monsoon_data_plot(self.mon_info, result)
304        return result
305
306    def pass_fail_check(self, average_current=None):
307        """Check the test result and decide if it passed or failed.
308
309        The threshold is provided in the config file. In this class, result is
310        current in mA.
311        """
312
313        if not self.threshold or self.test_name not in self.threshold:
314            self.log.error("No threshold is provided for the test '{}' in "
315                           "the configuration file.".format(self.test_name))
316            return
317
318        current_threshold = self.threshold[self.test_name]
319        if average_current:
320            asserts.assert_true(
321                abs(average_current - current_threshold) / current_threshold <
322                self.pass_fail_tolerance,
323                'Measured average current in [{}]: {:.2f}mA, which is '
324                'out of the acceptable range {:.2f}±{:.2f}mA'.format(
325                    self.test_name, average_current, current_threshold,
326                    self.pass_fail_tolerance * current_threshold))
327            asserts.explicit_pass(
328                'Measurement finished for [{}]: {:.2f}mA, which is '
329                'within the acceptable range {:.2f}±{:.2f}'.format(
330                    self.test_name, average_current, current_threshold,
331                    self.pass_fail_tolerance * current_threshold))
332        else:
333            asserts.fail(
334                'Something happened, measurement is not complete, test failed')
335
336    def create_monsoon_info(self):
337        """Creates the config dictionary for monsoon
338
339        Returns:
340            mon_info: Dictionary with the monsoon packet config
341        """
342        if self.iperf_duration:
343            self.mon_duration = self.iperf_duration - 10
344        mon_info = ObjNew(dut=self.mon,
345                          freq=self.mon_freq,
346                          duration=self.mon_duration,
347                          offset=self.mon_offset,
348                          data_path=self.mon_data_path)
349        return mon_info
350
351    def monsoon_recover(self):
352        """Test loop to wait for monsoon recover from unexpected error.
353
354        Wait for a certain time duration, then quit.0
355        Args:
356            mon: monsoon object
357        Returns:
358            True/False
359        """
360        try:
361            self.mon.reconnect_monsoon()
362            time.sleep(2)
363            self.mon.usb('on')
364            logging.info('Monsoon recovered from unexpected error')
365            time.sleep(2)
366            return True
367        except MonsoonError:
368            try:
369                self.log.info(self.mon_info.dut._mon.ser.in_waiting)
370            except AttributeError:
371                # This attribute does not exist for HVPMs.
372                pass
373            logging.warning('Unable to recover monsoon from unexpected error')
374            return False
375
376    def monsoon_data_collect_save(self):
377        """Current measurement and save the log file.
378
379        Collect current data using Monsoon box and return the path of the
380        log file. Take bug report if requested.
381
382        Returns:
383            A MonsoonResult object containing information about the gathered
384            data.
385        """
386
387        tag = '{}_{}_{}'.format(self.test_name, self.dut.model,
388                                self.dut.build_info['build_id'])
389
390        data_path = os.path.join(self.mon_info.data_path, '{}.txt'.format(tag))
391
392        # If the specified Monsoon data file already exists (e.g., multiple
393        # measurements in a single test), write the results to a new file with
394        # the postfix "_#".
395        if os.path.exists(data_path):
396            highest_value = 1
397            for filename in os.listdir(os.path.dirname(data_path)):
398                match = re.match(r'{}_(\d+).txt'.format(tag), filename)
399                if match:
400                    highest_value = int(match.group(1))
401
402            data_path = os.path.join(self.mon_info.data_path,
403                                     '%s_%s.txt' % (tag, highest_value + 1))
404
405        total_expected_samples = self.mon_info.freq * self.mon_info.duration
406        min_required_samples = (total_expected_samples * MIN_PERCENT_SAMPLE /
407                                100)
408        for retry_measure in range(1, MEASUREMENT_RETRY_COUNT + 1):
409            # Resets the battery status right before the test starts.
410            self.dut.adb.shell(RESET_BATTERY_STATS)
411            self.log.info('Starting power measurement, attempt #{}.'.format(
412                retry_measure))
413            # Start the power measurement using monsoon.
414            self.mon_info.dut.usb(PassthroughStates.AUTO)
415            result = self.mon_info.dut.measure_power(
416                self.mon_info.duration,
417                measure_after_seconds=self.mon_info.offset,
418                hz=self.mon_info.freq,
419                output_path=data_path)
420            self.mon_info.dut.usb(PassthroughStates.ON)
421
422            self.log.debug(result)
423            self.log.debug('Samples Gathered: %s. Max Samples: %s '
424                           'Min Samples Required: %s.' %
425                           (result.num_samples, total_expected_samples,
426                            min_required_samples))
427
428            if result.num_samples <= min_required_samples:
429                retry_measure += 1
430                self.log.warning(
431                    'More than {} percent of samples are missing due to '
432                    'dropped packets. Need to remeasure.'.format(
433                        100 - MIN_PERCENT_SAMPLE))
434                continue
435
436            self.log.info('Measurement successful after {} attempt(s).'.format(
437                retry_measure))
438            return result
439        else:
440            try:
441                self.log.info(self.mon_info.dut._mon.ser.in_waiting)
442            except AttributeError:
443                # This attribute does not exist for HVPMs.
444                pass
445            self.log.error(
446                'Unable to gather enough samples to run validation.')
447
448    def process_iperf_results(self):
449        """Get the iperf results and process.
450
451        Returns:
452             throughput: the average throughput during tests.
453        """
454        # Get IPERF results and add this to the plot title
455        RESULTS_DESTINATION = os.path.join(
456            self.iperf_server.log_path,
457            'iperf_client_output_{}.log'.format(self.current_test_name))
458        self.dut.pull_files(TEMP_FILE, RESULTS_DESTINATION)
459        # Calculate the average throughput
460        if self.use_client_output:
461            iperf_file = RESULTS_DESTINATION
462        else:
463            iperf_file = self.iperf_server.log_files[-1]
464        try:
465            iperf_result = ipf.IPerfResult(iperf_file)
466
467            # Compute the throughput in Mbit/s
468            throughput = (math.fsum(
469                iperf_result.instantaneous_rates[self.start_meas_time:-1]
470            ) / len(iperf_result.instantaneous_rates[self.start_meas_time:-1])
471                          ) * 8 * (1.024**2)
472
473            self.log.info('The average throughput is {}'.format(throughput))
474        except ValueError:
475            self.log.warning('Cannot get iperf result. Setting to 0')
476            throughput = 0
477        return throughput
478