1#!/usr/bin/env python3 2# 3# Copyright 2018 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the 'License'); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an 'AS IS' BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16import json 17import logging 18import math 19import os 20import re 21import time 22 23import acts.controllers.power_monitor as power_monitor_lib 24import acts.controllers.iperf_server as ipf 25from acts import asserts 26from acts import base_test 27from acts import utils 28from acts.metrics.loggers.blackbox import BlackboxMetricLogger 29from acts_contrib.test_utils.power.loggers.power_metric_logger import PowerMetricLogger 30from acts_contrib.test_utils.power import plot_utils 31from acts_contrib.test_utils.wifi import wifi_test_utils as wutils 32 33RESET_BATTERY_STATS = 'dumpsys batterystats --reset' 34IPERF_TIMEOUT = 180 35THRESHOLD_TOLERANCE_DEFAULT = 0.2 36GET_FROM_PHONE = 'get_from_dut' 37GET_FROM_AP = 'get_from_ap' 38PHONE_BATTERY_VOLTAGE_DEFAULT = 4.2 39MONSOON_MAX_CURRENT = 8.0 40DEFAULT_MONSOON_FREQUENCY = 500 41ENABLED_MODULATED_DTIM = 'gEnableModulatedDTIM=' 42MAX_MODULATED_DTIM = 'gMaxLIModulatedDTIM=' 43TEMP_FILE = '/sdcard/Download/tmp.log' 44 45 46class ObjNew(object): 47 """Create a random obj with unknown attributes and value. 48 49 """ 50 def __init__(self, **kwargs): 51 self.__dict__.update(kwargs) 52 53 def __contains__(self, item): 54 """Function to check if one attribute is contained in the object. 55 56 Args: 57 item: the item to check 58 Return: 59 True/False 60 """ 61 return hasattr(self, item) 62 63 64class PowerBaseTest(base_test.BaseTestClass): 65 """Base class for all wireless power related tests. 66 67 """ 68 def __init__(self, controllers): 69 70 super().__init__(controllers) 71 self.power_result = BlackboxMetricLogger.for_test_case( 72 metric_name='avg_power') 73 self.start_meas_time = 0 74 self.rockbottom_script = None 75 self.img_name = '' 76 self.dut = None 77 self.power_logger = PowerMetricLogger.for_test_case() 78 self.power_monitor = None 79 80 @property 81 def final_test(self): 82 return len( 83 self.results.requested 84 ) > 0 and self.current_test_name == self.results.requested[-1] 85 86 @property 87 def display_name_test_suite(self): 88 return getattr(self, '_display_name_test_suite', 89 self.__class__.__name__) 90 91 @display_name_test_suite.setter 92 def display_name_test_suite(self, name): 93 self._display_name_test_suite = name 94 95 @property 96 def display_name_test_case(self): 97 default_test_name = getattr(self, 'test_name', None) 98 return getattr(self, '_display_name_test_case', default_test_name) 99 100 @display_name_test_case.setter 101 def display_name_test_case(self, name): 102 self._display_name_test_case = name 103 104 def initialize_power_monitor(self): 105 """ Initializes the power monitor object. 106 107 Raises an exception if there are no controllers available. 108 """ 109 if hasattr(self, 'monsoons'): 110 self.power_monitor = power_monitor_lib.PowerMonitorMonsoonFacade( 111 self.monsoons[0]) 112 self.monsoons[0].set_max_current(8.0) 113 self.monsoons[0].set_voltage(self.mon_voltage) 114 else: 115 raise RuntimeError('No power monitors available.') 116 117 def setup_class(self): 118 119 super().setup_class() 120 121 self.log = logging.getLogger() 122 self.tests = self.get_existing_test_names() 123 124 # Obtain test parameters from user_params 125 TEST_PARAMS = self.TAG + '_params' 126 self.test_params = self.user_params.get(TEST_PARAMS, {}) 127 if not self.test_params: 128 self.log.warning(TEST_PARAMS + ' was not found in the user ' 129 'parameters defined in the config file.') 130 131 # Override user_param values with test parameters 132 self.user_params.update(self.test_params) 133 134 # Unpack user_params with default values. All the usages of user_params 135 # as self attributes need to be included either as a required parameter 136 # or as a parameter with a default value. 137 req_params = ['custom_files', 'mon_duration'] 138 self.unpack_userparams(req_params, 139 mon_freq=DEFAULT_MONSOON_FREQUENCY, 140 mon_offset=0, 141 bug_report=False, 142 extra_wait=None, 143 iperf_duration=None, 144 pass_fail_tolerance=THRESHOLD_TOLERANCE_DEFAULT, 145 mon_voltage=PHONE_BATTERY_VOLTAGE_DEFAULT) 146 147 # Setup the must have controllers, phone and monsoon 148 self.dut = self.android_devices[0] 149 self.mon_data_path = os.path.join(self.log_path, 'Monsoon') 150 os.makedirs(self.mon_data_path, exist_ok=True) 151 152 # Initialize the power monitor object that will be used to measure 153 self.initialize_power_monitor() 154 155 # Unpack the thresholds file or fail class setup if it can't be found 156 for file in self.custom_files: 157 if 'pass_fail_threshold_' + self.dut.model in file: 158 self.threshold_file = file 159 break 160 else: 161 raise RuntimeError('Required test pass/fail threshold file is ' 162 'missing') 163 164 # Unpack the rockbottom script or fail class setup if it can't be found 165 for file in self.custom_files: 166 if 'rockbottom_' + self.dut.model in file: 167 self.rockbottom_script = file 168 break 169 else: 170 raise RuntimeError('Required rockbottom script is missing.') 171 172 # Unpack optional custom files 173 for file in self.custom_files: 174 if 'attenuator_setting' in file: 175 self.attenuation_file = file 176 elif 'network_config' in file: 177 self.network_file = file 178 179 if hasattr(self, 'attenuators'): 180 self.num_atten = self.attenuators[0].instrument.num_atten 181 self.atten_level = self.unpack_custom_file(self.attenuation_file) 182 self.threshold = self.unpack_custom_file(self.threshold_file) 183 self.mon_info = self.create_monsoon_info() 184 185 # Sync device time, timezone and country code 186 utils.require_sl4a((self.dut, )) 187 utils.sync_device_time(self.dut) 188 wutils.set_wifi_country_code(self.dut, 'US') 189 190 screen_on_img = self.user_params.get('screen_on_img', []) 191 if screen_on_img: 192 img_src = screen_on_img[0] 193 img_dest = '/sdcard/Pictures/' 194 success = self.dut.push_system_file(img_src, img_dest) 195 if success: 196 self.img_name = os.path.basename(img_src) 197 198 def setup_test(self): 199 """Set up test specific parameters or configs. 200 201 """ 202 super().setup_test() 203 204 # Reset result variables 205 self.avg_current = 0 206 self.samples = [] 207 self.power_result.metric_value = 0 208 209 # Set the device into rockbottom state 210 self.dut_rockbottom() 211 wutils.reset_wifi(self.dut) 212 wutils.wifi_toggle_state(self.dut, False) 213 214 # Wait for extra time if needed for the first test 215 if self.extra_wait: 216 self.more_wait_first_test() 217 218 def teardown_test(self): 219 """Tear down necessary objects after test case is finished. 220 221 """ 222 self.log.info('Tearing down the test case') 223 self.power_monitor.connect_usb() 224 self.power_logger.set_avg_power(self.power_result.metric_value) 225 self.power_logger.set_avg_current(self.avg_current) 226 self.power_logger.set_voltage(self.mon_voltage) 227 self.power_logger.set_testbed(self.testbed_name) 228 229 # If a threshold was provided, log it in the power proto 230 if self.threshold and self.test_name in self.threshold: 231 avg_current_threshold = self.threshold[self.test_name] 232 self.power_logger.set_avg_current_threshold(avg_current_threshold) 233 234 build_id = self.dut.build_info.get('build_id', '') 235 incr_build_id = self.dut.build_info.get('incremental_build_id', '') 236 branch = self.user_params.get('branch', '') 237 target = self.dut.device_info.get('flavor', '') 238 239 self.power_logger.set_branch(branch) 240 self.power_logger.set_build_id(build_id) 241 self.power_logger.set_incremental_build_id(incr_build_id) 242 self.power_logger.set_target(target) 243 244 # Log the display name of the test suite and test case 245 if self.display_name_test_suite: 246 name = self.display_name_test_suite 247 self.power_logger.set_test_suite_display_name(name) 248 249 if self.display_name_test_case: 250 name = self.display_name_test_case 251 self.power_logger.set_test_case_display_name(name) 252 253 # Take Bugreport 254 if self.bug_report: 255 begin_time = utils.get_current_epoch_time() 256 self.dut.take_bug_report(self.test_name, begin_time) 257 258 # Allow the device to cooldown before executing the next test 259 cooldown = self.test_params.get('cooldown', None) 260 if cooldown and not self.final_test: 261 time.sleep(cooldown) 262 263 def teardown_class(self): 264 """Clean up the test class after tests finish running 265 266 """ 267 self.log.info('Tearing down the test class') 268 if self.power_monitor: 269 self.power_monitor.connect_usb() 270 271 def on_fail(self, test_name, begin_time): 272 self.power_logger.set_pass_fail_status('FAIL') 273 274 def on_pass(self, test_name, begin_time): 275 self.power_logger.set_pass_fail_status('PASS') 276 277 def dut_rockbottom(self): 278 """Set the dut to rockbottom state 279 280 """ 281 # The rockbottom script might include a device reboot, so it is 282 # necessary to stop SL4A during its execution. 283 self.dut.stop_services() 284 self.log.info('Executing rockbottom script for ' + self.dut.model) 285 os.chmod(self.rockbottom_script, 0o777) 286 os.system('{} {} {}'.format(self.rockbottom_script, self.dut.serial, 287 self.img_name)) 288 # Make sure the DUT is in root mode after coming back 289 self.dut.root_adb() 290 # Restart SL4A 291 self.dut.start_services() 292 293 def unpack_custom_file(self, file, test_specific=True): 294 """Unpack the pass_fail_thresholds from a common file. 295 296 Args: 297 file: the common file containing pass fail threshold. 298 test_specific: if True, returns the JSON element within the file 299 that starts with the test class name. 300 """ 301 with open(file, 'r') as f: 302 params = json.load(f) 303 if test_specific: 304 try: 305 return params[self.TAG] 306 except KeyError: 307 pass 308 else: 309 return params 310 311 def decode_test_configs(self, attrs, indices): 312 """Decode the test config/params from test name. 313 314 Remove redundant function calls when tests are similar. 315 Args: 316 attrs: a list of the attrs of the test config obj 317 indices: a list of the location indices of keyword in the test name. 318 """ 319 # Decode test parameters for the current test 320 test_params = self.current_test_name.split('_') 321 values = [test_params[x] for x in indices] 322 config_dict = dict(zip(attrs, values)) 323 self.test_configs = ObjNew(**config_dict) 324 325 def more_wait_first_test(self): 326 # For the first test, increase the offset for longer wait time 327 if self.current_test_name == self.tests[0]: 328 self.mon_info.offset = self.mon_offset + self.extra_wait 329 else: 330 self.mon_info.offset = self.mon_offset 331 332 def set_attenuation(self, atten_list): 333 """Function to set the attenuator to desired attenuations. 334 335 Args: 336 atten_list: list containing the attenuation for each attenuator. 337 """ 338 if len(atten_list) != self.num_atten: 339 raise Exception('List given does not have the correct length') 340 for i in range(self.num_atten): 341 self.attenuators[i].set_atten(atten_list[i]) 342 343 def measure_power_and_validate(self): 344 """The actual test flow and result processing and validate. 345 346 """ 347 self.collect_power_data() 348 self.pass_fail_check(self.avg_current) 349 350 def collect_power_data(self): 351 """Measure power, plot and take log if needed. 352 353 Returns: 354 A MonsoonResult object. 355 """ 356 # Collecting current measurement data and plot 357 samples = self.power_monitor_data_collect_save() 358 359 current = [sample[1] for sample in samples] 360 average_current = sum(current) * 1000 / len(current) 361 362 self.power_result.metric_value = (average_current * self.mon_voltage) 363 self.avg_current = average_current 364 365 plot_title = '{}_{}_{}'.format(self.test_name, self.dut.model, 366 self.dut.build_info['build_id']) 367 plot_utils.current_waveform_plot(samples, self.mon_voltage, 368 self.mon_info.data_path, plot_title) 369 370 return samples 371 372 def pass_fail_check(self, average_current=None): 373 """Check the test result and decide if it passed or failed. 374 375 The threshold is provided in the config file. In this class, result is 376 current in mA. 377 """ 378 379 if not self.threshold or self.test_name not in self.threshold: 380 self.log.error("No threshold is provided for the test '{}' in " 381 "the configuration file.".format(self.test_name)) 382 return 383 384 current_threshold = self.threshold[self.test_name] 385 if average_current: 386 asserts.assert_true( 387 abs(average_current - current_threshold) / current_threshold < 388 self.pass_fail_tolerance, 389 'Measured average current in [{}]: {:.2f}mA, which is ' 390 'out of the acceptable range {:.2f}±{:.2f}mA'.format( 391 self.test_name, average_current, current_threshold, 392 self.pass_fail_tolerance * current_threshold)) 393 asserts.explicit_pass( 394 'Measurement finished for [{}]: {:.2f}mA, which is ' 395 'within the acceptable range {:.2f}±{:.2f}'.format( 396 self.test_name, average_current, current_threshold, 397 self.pass_fail_tolerance * current_threshold)) 398 else: 399 asserts.fail( 400 'Something happened, measurement is not complete, test failed') 401 402 def create_monsoon_info(self): 403 """Creates the config dictionary for monsoon 404 405 Returns: 406 mon_info: Dictionary with the monsoon packet config 407 """ 408 mon_info = ObjNew(freq=self.mon_freq, 409 duration=self.mon_duration, 410 offset=self.mon_offset, 411 data_path=self.mon_data_path) 412 return mon_info 413 414 def power_monitor_data_collect_save(self): 415 """Current measurement and save the log file. 416 417 Collect current data using Monsoon box and return the path of the 418 log file. Take bug report if requested. 419 420 Returns: 421 A list of tuples in which the first element is a timestamp and the 422 second element is the sampled current in Amperes at that time. 423 """ 424 425 tag = '{}_{}_{}'.format(self.test_name, self.dut.model, 426 self.dut.build_info['build_id']) 427 428 data_path = os.path.join(self.mon_info.data_path, '{}.txt'.format(tag)) 429 430 # If the specified Monsoon data file already exists (e.g., multiple 431 # measurements in a single test), write the results to a new file with 432 # the postfix "_#". 433 if os.path.exists(data_path): 434 highest_value = 1 435 for filename in os.listdir(os.path.dirname(data_path)): 436 match = re.match(r'{}_(\d+).txt'.format(tag), filename) 437 if match: 438 highest_value = max(highest_value, int(match.group(1))) 439 440 data_path = os.path.join(self.mon_info.data_path, 441 '%s_%s.txt' % (tag, highest_value + 1)) 442 443 # Resets the battery status right before the test starts. 444 self.dut.adb.shell(RESET_BATTERY_STATS) 445 self.log.info('Starting power measurement. Duration: {}s. Offset: ' 446 '{}s. Voltage: {} V.'.format(self.mon_info.duration, 447 self.mon_info.offset, 448 self.mon_voltage)) 449 450 # TODO(b/155426729): Create an accurate host-to-device time difference 451 # measurement. 452 device_time_cmd = 'echo $EPOCHREALTIME' 453 device_time = self.dut.adb.shell(device_time_cmd) 454 host_time = time.time() 455 self.log.debug('device start time %s, host start time %s', device_time, 456 host_time) 457 device_to_host_offset = float(device_time) - host_time 458 459 # Start the power measurement using monsoon. 460 self.dut.stop_services() 461 time.sleep(1) 462 self.power_monitor.disconnect_usb() 463 measurement_args = dict(duration=self.mon_info.duration, 464 measure_after_seconds=self.mon_info.offset, 465 hz=self.mon_info.freq) 466 self.power_monitor.measure(measurement_args=measurement_args, 467 start_time=device_to_host_offset, 468 monsoon_output_path=data_path) 469 self.power_monitor.release_resources() 470 self.power_monitor.connect_usb() 471 self.dut.wait_for_boot_completion() 472 time.sleep(10) 473 self.dut.start_services() 474 475 return self.power_monitor.get_waveform(file_path=data_path) 476 477 def process_iperf_results(self): 478 """Get the iperf results and process. 479 480 Returns: 481 throughput: the average throughput during tests. 482 """ 483 # Get IPERF results and add this to the plot title 484 RESULTS_DESTINATION = os.path.join( 485 self.iperf_server.log_path, 486 'iperf_client_output_{}.log'.format(self.current_test_name)) 487 self.dut.pull_files(TEMP_FILE, RESULTS_DESTINATION) 488 # Calculate the average throughput 489 if self.use_client_output: 490 iperf_file = RESULTS_DESTINATION 491 else: 492 iperf_file = self.iperf_server.log_files[-1] 493 try: 494 iperf_result = ipf.IPerfResult(iperf_file) 495 496 # Compute the throughput in Mbit/s 497 throughput = (math.fsum( 498 iperf_result.instantaneous_rates[self.start_meas_time:-1] 499 ) / len(iperf_result.instantaneous_rates[self.start_meas_time:-1]) 500 ) * 8 * (1.024**2) 501 502 self.log.info('The average throughput is {}'.format(throughput)) 503 except ValueError: 504 self.log.warning('Cannot get iperf result. Setting to 0') 505 throughput = 0 506 return throughput 507