1#!/usr/bin/env python3
2#
3#   Copyright 2018 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the 'License');
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an 'AS IS' BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16import json
17import logging
18import math
19import os
20import re
21import time
22
23import acts.controllers.power_monitor as power_monitor_lib
24import acts.controllers.iperf_server as ipf
25from acts import asserts
26from acts import base_test
27from acts import utils
28from acts.metrics.loggers.blackbox import BlackboxMetricLogger
29from acts_contrib.test_utils.power.loggers.power_metric_logger import PowerMetricLogger
30from acts_contrib.test_utils.power import plot_utils
31from acts_contrib.test_utils.wifi import wifi_test_utils as wutils
32
33RESET_BATTERY_STATS = 'dumpsys batterystats --reset'
34IPERF_TIMEOUT = 180
35THRESHOLD_TOLERANCE_DEFAULT = 0.2
36GET_FROM_PHONE = 'get_from_dut'
37GET_FROM_AP = 'get_from_ap'
38PHONE_BATTERY_VOLTAGE_DEFAULT = 4.2
39MONSOON_MAX_CURRENT = 8.0
40DEFAULT_MONSOON_FREQUENCY = 500
41ENABLED_MODULATED_DTIM = 'gEnableModulatedDTIM='
42MAX_MODULATED_DTIM = 'gMaxLIModulatedDTIM='
43TEMP_FILE = '/sdcard/Download/tmp.log'
44
45
46class ObjNew(object):
47    """Create a random obj with unknown attributes and value.
48
49    """
50    def __init__(self, **kwargs):
51        self.__dict__.update(kwargs)
52
53    def __contains__(self, item):
54        """Function to check if one attribute is contained in the object.
55
56        Args:
57            item: the item to check
58        Return:
59            True/False
60        """
61        return hasattr(self, item)
62
63
64class PowerBaseTest(base_test.BaseTestClass):
65    """Base class for all wireless power related tests.
66
67    """
68    def __init__(self, controllers):
69
70        super().__init__(controllers)
71        self.power_result = BlackboxMetricLogger.for_test_case(
72            metric_name='avg_power')
73        self.start_meas_time = 0
74        self.rockbottom_script = None
75        self.img_name = ''
76        self.dut = None
77        self.power_logger = PowerMetricLogger.for_test_case()
78        self.power_monitor = None
79
80    @property
81    def final_test(self):
82        return len(
83            self.results.requested
84        ) > 0 and self.current_test_name == self.results.requested[-1]
85
86    @property
87    def display_name_test_suite(self):
88        return getattr(self, '_display_name_test_suite',
89                       self.__class__.__name__)
90
91    @display_name_test_suite.setter
92    def display_name_test_suite(self, name):
93        self._display_name_test_suite = name
94
95    @property
96    def display_name_test_case(self):
97        default_test_name = getattr(self, 'test_name', None)
98        return getattr(self, '_display_name_test_case', default_test_name)
99
100    @display_name_test_case.setter
101    def display_name_test_case(self, name):
102        self._display_name_test_case = name
103
104    def initialize_power_monitor(self):
105        """ Initializes the power monitor object.
106
107        Raises an exception if there are no controllers available.
108        """
109        if hasattr(self, 'monsoons'):
110            self.power_monitor = power_monitor_lib.PowerMonitorMonsoonFacade(
111                self.monsoons[0])
112            self.monsoons[0].set_max_current(8.0)
113            self.monsoons[0].set_voltage(self.mon_voltage)
114        else:
115            raise RuntimeError('No power monitors available.')
116
117    def setup_class(self):
118
119        super().setup_class()
120
121        self.log = logging.getLogger()
122        self.tests = self.get_existing_test_names()
123
124        # Obtain test parameters from user_params
125        TEST_PARAMS = self.TAG + '_params'
126        self.test_params = self.user_params.get(TEST_PARAMS, {})
127        if not self.test_params:
128            self.log.warning(TEST_PARAMS + ' was not found in the user '
129                             'parameters defined in the config file.')
130
131        # Override user_param values with test parameters
132        self.user_params.update(self.test_params)
133
134        # Unpack user_params with default values. All the usages of user_params
135        # as self attributes need to be included either as a required parameter
136        # or as a parameter with a default value.
137        req_params = ['custom_files', 'mon_duration']
138        self.unpack_userparams(req_params,
139                               mon_freq=DEFAULT_MONSOON_FREQUENCY,
140                               mon_offset=0,
141                               bug_report=False,
142                               extra_wait=None,
143                               iperf_duration=None,
144                               pass_fail_tolerance=THRESHOLD_TOLERANCE_DEFAULT,
145                               mon_voltage=PHONE_BATTERY_VOLTAGE_DEFAULT)
146
147        # Setup the must have controllers, phone and monsoon
148        self.dut = self.android_devices[0]
149        self.mon_data_path = os.path.join(self.log_path, 'Monsoon')
150        os.makedirs(self.mon_data_path, exist_ok=True)
151
152        # Initialize the power monitor object that will be used to measure
153        self.initialize_power_monitor()
154
155        # Unpack the thresholds file or fail class setup if it can't be found
156        for file in self.custom_files:
157            if 'pass_fail_threshold_' + self.dut.model in file:
158                self.threshold_file = file
159                break
160        else:
161            raise RuntimeError('Required test pass/fail threshold file is '
162                               'missing')
163
164        # Unpack the rockbottom script or fail class setup if it can't be found
165        for file in self.custom_files:
166            if 'rockbottom_' + self.dut.model in file:
167                self.rockbottom_script = file
168                break
169        else:
170            raise RuntimeError('Required rockbottom script is missing.')
171
172        # Unpack optional custom files
173        for file in self.custom_files:
174            if 'attenuator_setting' in file:
175                self.attenuation_file = file
176            elif 'network_config' in file:
177                self.network_file = file
178
179        if hasattr(self, 'attenuators'):
180            self.num_atten = self.attenuators[0].instrument.num_atten
181            self.atten_level = self.unpack_custom_file(self.attenuation_file)
182        self.threshold = self.unpack_custom_file(self.threshold_file)
183        self.mon_info = self.create_monsoon_info()
184
185        # Sync device time, timezone and country code
186        utils.require_sl4a((self.dut, ))
187        utils.sync_device_time(self.dut)
188        wutils.set_wifi_country_code(self.dut, 'US')
189
190        screen_on_img = self.user_params.get('screen_on_img', [])
191        if screen_on_img:
192            img_src = screen_on_img[0]
193            img_dest = '/sdcard/Pictures/'
194            success = self.dut.push_system_file(img_src, img_dest)
195            if success:
196                self.img_name = os.path.basename(img_src)
197
198    def setup_test(self):
199        """Set up test specific parameters or configs.
200
201        """
202        super().setup_test()
203
204        # Reset result variables
205        self.avg_current = 0
206        self.samples = []
207        self.power_result.metric_value = 0
208
209        # Set the device into rockbottom state
210        self.dut_rockbottom()
211        wutils.reset_wifi(self.dut)
212        wutils.wifi_toggle_state(self.dut, False)
213
214        # Wait for extra time if needed for the first test
215        if self.extra_wait:
216            self.more_wait_first_test()
217
218    def teardown_test(self):
219        """Tear down necessary objects after test case is finished.
220
221        """
222        self.log.info('Tearing down the test case')
223        self.power_monitor.connect_usb()
224        self.power_logger.set_avg_power(self.power_result.metric_value)
225        self.power_logger.set_avg_current(self.avg_current)
226        self.power_logger.set_voltage(self.mon_voltage)
227        self.power_logger.set_testbed(self.testbed_name)
228
229        # If a threshold was provided, log it in the power proto
230        if self.threshold and self.test_name in self.threshold:
231            avg_current_threshold = self.threshold[self.test_name]
232            self.power_logger.set_avg_current_threshold(avg_current_threshold)
233
234        build_id = self.dut.build_info.get('build_id', '')
235        incr_build_id = self.dut.build_info.get('incremental_build_id', '')
236        branch = self.user_params.get('branch', '')
237        target = self.dut.device_info.get('flavor', '')
238
239        self.power_logger.set_branch(branch)
240        self.power_logger.set_build_id(build_id)
241        self.power_logger.set_incremental_build_id(incr_build_id)
242        self.power_logger.set_target(target)
243
244        # Log the display name of the test suite and test case
245        if self.display_name_test_suite:
246            name = self.display_name_test_suite
247            self.power_logger.set_test_suite_display_name(name)
248
249        if self.display_name_test_case:
250            name = self.display_name_test_case
251            self.power_logger.set_test_case_display_name(name)
252
253        # Take Bugreport
254        if self.bug_report:
255            begin_time = utils.get_current_epoch_time()
256            self.dut.take_bug_report(self.test_name, begin_time)
257
258        # Allow the device to cooldown before executing the next test
259        cooldown = self.test_params.get('cooldown', None)
260        if cooldown and not self.final_test:
261            time.sleep(cooldown)
262
263    def teardown_class(self):
264        """Clean up the test class after tests finish running
265
266        """
267        self.log.info('Tearing down the test class')
268        if self.power_monitor:
269            self.power_monitor.connect_usb()
270
271    def on_fail(self, test_name, begin_time):
272        self.power_logger.set_pass_fail_status('FAIL')
273
274    def on_pass(self, test_name, begin_time):
275        self.power_logger.set_pass_fail_status('PASS')
276
277    def dut_rockbottom(self):
278        """Set the dut to rockbottom state
279
280        """
281        # The rockbottom script might include a device reboot, so it is
282        # necessary to stop SL4A during its execution.
283        self.dut.stop_services()
284        self.log.info('Executing rockbottom script for ' + self.dut.model)
285        os.chmod(self.rockbottom_script, 0o777)
286        os.system('{} {} {}'.format(self.rockbottom_script, self.dut.serial,
287                                    self.img_name))
288        # Make sure the DUT is in root mode after coming back
289        self.dut.root_adb()
290        # Restart SL4A
291        self.dut.start_services()
292
293    def unpack_custom_file(self, file, test_specific=True):
294        """Unpack the pass_fail_thresholds from a common file.
295
296        Args:
297            file: the common file containing pass fail threshold.
298            test_specific: if True, returns the JSON element within the file
299                that starts with the test class name.
300        """
301        with open(file, 'r') as f:
302            params = json.load(f)
303        if test_specific:
304            try:
305                return params[self.TAG]
306            except KeyError:
307                pass
308        else:
309            return params
310
311    def decode_test_configs(self, attrs, indices):
312        """Decode the test config/params from test name.
313
314        Remove redundant function calls when tests are similar.
315        Args:
316            attrs: a list of the attrs of the test config obj
317            indices: a list of the location indices of keyword in the test name.
318        """
319        # Decode test parameters for the current test
320        test_params = self.current_test_name.split('_')
321        values = [test_params[x] for x in indices]
322        config_dict = dict(zip(attrs, values))
323        self.test_configs = ObjNew(**config_dict)
324
325    def more_wait_first_test(self):
326        # For the first test, increase the offset for longer wait time
327        if self.current_test_name == self.tests[0]:
328            self.mon_info.offset = self.mon_offset + self.extra_wait
329        else:
330            self.mon_info.offset = self.mon_offset
331
332    def set_attenuation(self, atten_list):
333        """Function to set the attenuator to desired attenuations.
334
335        Args:
336            atten_list: list containing the attenuation for each attenuator.
337        """
338        if len(atten_list) != self.num_atten:
339            raise Exception('List given does not have the correct length')
340        for i in range(self.num_atten):
341            self.attenuators[i].set_atten(atten_list[i])
342
343    def measure_power_and_validate(self):
344        """The actual test flow and result processing and validate.
345
346        """
347        self.collect_power_data()
348        self.pass_fail_check(self.avg_current)
349
350    def collect_power_data(self):
351        """Measure power, plot and take log if needed.
352
353        Returns:
354            A MonsoonResult object.
355        """
356        # Collecting current measurement data and plot
357        samples = self.power_monitor_data_collect_save()
358
359        current = [sample[1] for sample in samples]
360        average_current = sum(current) * 1000 / len(current)
361
362        self.power_result.metric_value = (average_current * self.mon_voltage)
363        self.avg_current = average_current
364
365        plot_title = '{}_{}_{}'.format(self.test_name, self.dut.model,
366                                       self.dut.build_info['build_id'])
367        plot_utils.current_waveform_plot(samples, self.mon_voltage,
368                                         self.mon_info.data_path, plot_title)
369
370        return samples
371
372    def pass_fail_check(self, average_current=None):
373        """Check the test result and decide if it passed or failed.
374
375        The threshold is provided in the config file. In this class, result is
376        current in mA.
377        """
378
379        if not self.threshold or self.test_name not in self.threshold:
380            self.log.error("No threshold is provided for the test '{}' in "
381                           "the configuration file.".format(self.test_name))
382            return
383
384        current_threshold = self.threshold[self.test_name]
385        if average_current:
386            asserts.assert_true(
387                abs(average_current - current_threshold) / current_threshold <
388                self.pass_fail_tolerance,
389                'Measured average current in [{}]: {:.2f}mA, which is '
390                'out of the acceptable range {:.2f}±{:.2f}mA'.format(
391                    self.test_name, average_current, current_threshold,
392                    self.pass_fail_tolerance * current_threshold))
393            asserts.explicit_pass(
394                'Measurement finished for [{}]: {:.2f}mA, which is '
395                'within the acceptable range {:.2f}±{:.2f}'.format(
396                    self.test_name, average_current, current_threshold,
397                    self.pass_fail_tolerance * current_threshold))
398        else:
399            asserts.fail(
400                'Something happened, measurement is not complete, test failed')
401
402    def create_monsoon_info(self):
403        """Creates the config dictionary for monsoon
404
405        Returns:
406            mon_info: Dictionary with the monsoon packet config
407        """
408        mon_info = ObjNew(freq=self.mon_freq,
409                          duration=self.mon_duration,
410                          offset=self.mon_offset,
411                          data_path=self.mon_data_path)
412        return mon_info
413
414    def power_monitor_data_collect_save(self):
415        """Current measurement and save the log file.
416
417        Collect current data using Monsoon box and return the path of the
418        log file. Take bug report if requested.
419
420        Returns:
421            A list of tuples in which the first element is a timestamp and the
422            second element is the sampled current in Amperes at that time.
423        """
424
425        tag = '{}_{}_{}'.format(self.test_name, self.dut.model,
426                                self.dut.build_info['build_id'])
427
428        data_path = os.path.join(self.mon_info.data_path, '{}.txt'.format(tag))
429
430        # If the specified Monsoon data file already exists (e.g., multiple
431        # measurements in a single test), write the results to a new file with
432        # the postfix "_#".
433        if os.path.exists(data_path):
434            highest_value = 1
435            for filename in os.listdir(os.path.dirname(data_path)):
436                match = re.match(r'{}_(\d+).txt'.format(tag), filename)
437                if match:
438                    highest_value = max(highest_value, int(match.group(1)))
439
440            data_path = os.path.join(self.mon_info.data_path,
441                                     '%s_%s.txt' % (tag, highest_value + 1))
442
443        # Resets the battery status right before the test starts.
444        self.dut.adb.shell(RESET_BATTERY_STATS)
445        self.log.info('Starting power measurement. Duration: {}s. Offset: '
446                      '{}s. Voltage: {} V.'.format(self.mon_info.duration,
447                                                   self.mon_info.offset,
448                                                   self.mon_voltage))
449
450        # TODO(b/155426729): Create an accurate host-to-device time difference
451        # measurement.
452        device_time_cmd = 'echo $EPOCHREALTIME'
453        device_time = self.dut.adb.shell(device_time_cmd)
454        host_time = time.time()
455        self.log.debug('device start time %s, host start time %s', device_time,
456                       host_time)
457        device_to_host_offset = float(device_time) - host_time
458
459        # Start the power measurement using monsoon.
460        self.dut.stop_services()
461        time.sleep(1)
462        self.power_monitor.disconnect_usb()
463        measurement_args = dict(duration=self.mon_info.duration,
464                                measure_after_seconds=self.mon_info.offset,
465                                hz=self.mon_info.freq)
466        self.power_monitor.measure(measurement_args=measurement_args,
467                                   start_time=device_to_host_offset,
468                                   monsoon_output_path=data_path)
469        self.power_monitor.release_resources()
470        self.power_monitor.connect_usb()
471        self.dut.wait_for_boot_completion()
472        time.sleep(10)
473        self.dut.start_services()
474
475        return self.power_monitor.get_waveform(file_path=data_path)
476
477    def process_iperf_results(self):
478        """Get the iperf results and process.
479
480        Returns:
481             throughput: the average throughput during tests.
482        """
483        # Get IPERF results and add this to the plot title
484        RESULTS_DESTINATION = os.path.join(
485            self.iperf_server.log_path,
486            'iperf_client_output_{}.log'.format(self.current_test_name))
487        self.dut.pull_files(TEMP_FILE, RESULTS_DESTINATION)
488        # Calculate the average throughput
489        if self.use_client_output:
490            iperf_file = RESULTS_DESTINATION
491        else:
492            iperf_file = self.iperf_server.log_files[-1]
493        try:
494            iperf_result = ipf.IPerfResult(iperf_file)
495
496            # Compute the throughput in Mbit/s
497            throughput = (math.fsum(
498                iperf_result.instantaneous_rates[self.start_meas_time:-1]
499            ) / len(iperf_result.instantaneous_rates[self.start_meas_time:-1])
500                          ) * 8 * (1.024**2)
501
502            self.log.info('The average throughput is {}'.format(throughput))
503        except ValueError:
504            self.log.warning('Cannot get iperf result. Setting to 0')
505            throughput = 0
506        return throughput
507