1#!/usr/bin/python 2 3# Copyright (C) 2014 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17import logging 18import os.path 19import select 20import sys 21import time 22import collections 23import socket 24import gflags as flags # http://code.google.com/p/python-gflags/ 25import pkgutil 26import threading 27import Queue 28import traceback 29import math 30import bisect 31from bisect import bisect_left 32 33""" 34scipy, numpy and matplotlib are python packages that can be installed 35from: http://www.scipy.org/ 36 37""" 38import scipy 39import matplotlib.pyplot as plt 40 41# let this script know about the power monitor implementations 42sys.path = [os.path.basename(__file__)] + sys.path 43available_monitors = [ 44 name 45 for _, name, _ in pkgutil.iter_modules( 46 [os.path.join(os.path.dirname(__file__), "power_monitors")]) 47 if not name.startswith("_")] 48 49APK = os.path.join(os.path.dirname(__file__), "..", "CtsVerifier.apk") 50 51FLAGS = flags.FLAGS 52 53# DELAY_SCREEN_OFF is the number of seconds to wait for baseline state 54DELAY_SCREEN_OFF = 20.0 55 56# whether to log data collected to a file for each sensor run: 57LOG_DATA_TO_FILE = True 58 59logging.getLogger().setLevel(logging.ERROR) 60 61 62def do_import(name): 63 """import a module by name dynamically""" 64 mod = __import__(name) 65 components = name.split(".") 66 for comp in components[1:]: 67 mod = getattr(mod, comp) 68 return mod 69 70class PowerTestException(Exception): 71 """ 72 Definition of specialized Exception class for CTS power tests 73 """ 74 def __init__(self, message): 75 self._error_message = message 76 def __str__(self): 77 return self._error_message 78 79class PowerTest: 80 """Class to run a suite of power tests. This has methods for obtaining 81 measurements from the power monitor (through the driver) and then 82 processing it to determine baseline and AP suspend state and 83 measure ampere draw of various sensors. 84 Ctrl+C causes a keyboard interrupt exception which terminates the test.""" 85 86 # Thresholds for max allowed power usage per sensor tested 87 # TODO: Accel, Mag and Gyro have no maximum power specified in the CDD; 88 # the following numbers are bogus and will be replaced soon by what 89 # the device reports (from Sensor.getPower()) 90 MAX_ACCEL_AMPS = 0.08 # Amps 91 MAX_MAG_AMPS = 0.08 # Amps 92 MAX_GYRO_AMPS = 0.08 # Amps 93 MAX_SIGMO_AMPS = 0.08 # Amps 94 95 # TODO: The following numbers for step counter, etc must be replaced by 96 # the numbers specified in CDD for low-power sensors. The expected current 97 # draw must be computed from the specified power and the voltage used to 98 # power the device (specified from a config file). 99 MAX_STEP_COUNTER_AMPS = 0.08 # Amps 100 MAX_STEP_DETECTOR_AMPS = 0.08 # Amps 101 # The variable EXPECTED_AMPS_VARIATION_HALF_RANGE denotes the expected 102 # variation of the ampere measurements 103 # around the mean value at baseline state. i.e. we expect most of the 104 # ampere measurements at baseline state to vary around the mean by 105 # between +/- of the number below 106 EXPECTED_AMPS_VARIATION_HALF_RANGE = 0.0005 107 # The variable THRESHOLD_BASELINE_SAMPLES_FRACTION denotes the minimum fraction of samples that must 108 # be in the range of variation defined by EXPECTED_AMPS_VARIATION_HALF_RANGE 109 # around the mean baseline for us to decide that the phone has settled into 110 # its baseline state 111 THRESHOLD_BASELINE_SAMPLES_FRACTION = 0.86 112 # The variable MAX_PERCENTILE_AP_SCREEN_OFF_AMPS denotes the maximum ampere 113 # draw that the device can consume when it has gone to suspend state with 114 # one or more sensors registered and batching samples (screen and AP are 115 # off in this case) 116 MAX_PERCENTILE_AP_SCREEN_OFF_AMPS = 0.030 # Amps 117 # The variable PERCENTILE_MAX_AP_SCREEN_OFF denotes the fraction of ampere 118 # measurements that must be below the specified maximum amperes 119 # MAX_PERCENTILE_AP_SCREEN_OFF_AMPS for us to decide that the phone has 120 # reached suspend state. 121 PERCENTILE_MAX_AP_SCREEN_OFF = 0.95 122 DOMAIN_NAME = "/android/cts/powertest" 123 # SAMPLE_COUNT_NOMINAL denotes the typical number of measurements of amperes 124 # to collect from the power monitor 125 SAMPLE_COUNT_NOMINAL = 1000 126 # RATE_NOMINAL denotes the nominal frequency at which ampere measurements 127 # are taken from the monsoon power monitor 128 RATE_NOMINAL = 100 129 ENABLE_PLOTTING = False 130 131 REQUEST_EXTERNAL_STORAGE = "EXTERNAL STORAGE?" 132 REQUEST_EXIT = "EXIT" 133 REQUEST_RAISE = "RAISE %s %s" 134 REQUEST_USER_RESPONSE = "USER RESPONSE %s" 135 REQUEST_SET_TEST_RESULT = "SET TEST RESULT %s %s %s" 136 REQUEST_SENSOR_SWITCH = "SENSOR %s %s" 137 REQUEST_SENSOR_AVAILABILITY = "SENSOR? %s" 138 REQUEST_SCREEN_OFF = "SCREEN OFF" 139 REQUEST_SHOW_MESSAGE = "MESSAGE %s" 140 141 NEGATIVE_AMPERE_ERROR_MESSAGE = ( 142 "Negative ampere draw measured, possibly due to power " 143 "supply from USB cable. Check the setup of device and power " 144 "monitor to make sure that the device is not connected " 145 "to machine via USB directly. The device should be " 146 "connected to the USB slot in the power monitor. It is okay " 147 "to change the wiring when the test is in progress.") 148 149 150 def __init__(self, max_baseline_amps): 151 """ 152 Args: 153 max_baseline_amps: The maximum value of baseline amperes 154 that we expect the device to consume at baseline state. 155 This can be different between models of phones. 156 """ 157 power_monitors = do_import("power_monitors.%s" % FLAGS.power_monitor) 158 testid = time.strftime("%d_%m_%Y__%H__%M_%S") 159 self._power_monitor = power_monitors.Power_Monitor(log_file_id = testid) 160 self._tcp_connect_port = 0 # any available port 161 print ("Establishing connection to device...") 162 self.setUsbEnabled(True) 163 status = self._power_monitor.GetStatus() 164 self._native_hz = status["sampleRate"] * 1000 165 # the following describes power test being run (i.e on what sensor 166 # and what type of test. This is used for logging. 167 self._current_test = "None" 168 self._external_storage = self.executeOnDevice(PowerTest.REQUEST_EXTERNAL_STORAGE) 169 self._max_baseline_amps = max_baseline_amps 170 171 def __del__(self): 172 self.finalize() 173 174 def finalize(self): 175 """To be called upon termination of host connection to device""" 176 if self._tcp_connect_port > 0: 177 # tell device side to exit connection loop, and remove the forwarding 178 # connection 179 self.executeOnDevice(PowerTest.REQUEST_EXIT, reportErrors = False) 180 self.executeLocal("adb forward --remove tcp:%d" % self._tcp_connect_port) 181 self._tcp_connect_port = 0 182 if self._power_monitor: 183 self._power_monitor.Close() 184 self._power_monitor = None 185 186 def _send(self, msg, report_errors = True): 187 """Connect to the device, send the given command, and then disconnect""" 188 if self._tcp_connect_port == 0: 189 # on first attempt to send a command, connect to device via any open port number, 190 # forwarding that port to a local socket on the device via adb 191 logging.debug("Seeking port for communication...") 192 # discover an open port 193 dummysocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 194 dummysocket.bind(("localhost", 0)) 195 (_, self._tcp_connect_port) = dummysocket.getsockname() 196 dummysocket.close() 197 assert(self._tcp_connect_port > 0) 198 199 status = self.executeLocal("adb forward tcp:%d localabstract:%s" % 200 (self._tcp_connect_port, PowerTest.DOMAIN_NAME)) 201 # If the status !=0, then the host machine is unable to 202 # forward requests to client over adb. Ending the test and logging error message 203 # to the console on the host. 204 self.endTestIfLostConnection( 205 status != 0, 206 "Unable to forward requests to client over adb") 207 logging.info("Forwarding requests over local port %d", 208 self._tcp_connect_port) 209 210 link = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 211 212 try: 213 logging.debug("Connecting to device...") 214 link.connect(("localhost", self._tcp_connect_port)) 215 logging.debug("Connected.") 216 except socket.error as serr: 217 print "Socket connection error: ", serr 218 print "Finalizing and exiting the test" 219 self.endTestIfLostConnection( 220 report_errors, 221 "Unable to communicate with device: connection refused") 222 except: 223 print "Non socket-related exception at this block in _send(); re-raising now." 224 raise 225 logging.debug("Sending '%s'", msg) 226 link.sendall(msg) 227 logging.debug("Getting response...") 228 response = link.recv(4096) 229 logging.debug("Got response '%s'", response) 230 link.close() 231 return response 232 233 def queryDevice(self, query): 234 """Post a yes/no query to the device, return True upon successful query, False otherwise""" 235 logging.info("Querying device with '%s'", query) 236 return self._send(query) == "OK" 237 238 # TODO: abstract device communication (and string commands) into its own class 239 def executeOnDevice(self, cmd, reportErrors = True): 240 """Execute a (string) command on the remote device""" 241 return self._send(cmd, reportErrors) 242 243 def executeLocal(self, cmd, check_status = True): 244 """execute a shell command locally (on the host)""" 245 from subprocess import call 246 status = call(cmd.split(" ")) 247 if status != 0 and check_status: 248 logging.error("Failed to execute \"%s\"", cmd) 249 else: 250 logging.debug("Executed \"%s\"", cmd) 251 return status 252 253 def reportErrorRaiseExceptionIf(self, condition, msg): 254 """Report an error condition to the device if condition is True. 255 Will raise an exception on the device if condition is True. 256 Args: 257 condition: If true, this reports error 258 msg: Message related to exception 259 Raises: 260 A PowerTestException encapsulating the message provided in msg 261 """ 262 if condition: 263 try: 264 logging.error("Exiting on error: %s" % msg) 265 self.executeOnDevice(PowerTest.REQUEST_RAISE % (self._current_test, msg), 266 reportErrors = True) 267 except: 268 logging.error("Unable to communicate with device to report " 269 "error: %s" % msg) 270 self.finalize() 271 sys.exit(msg) 272 raise PowerTestException(msg) 273 274 def endTestIfLostConnection(self, lost_connection, error_message): 275 """ 276 This function ends the test if lost_connection was true, 277 which indicates that the connection to the device was lost. 278 Args: 279 lost_connection: boolean variable, if True it indicates that 280 connection to device was lost and the test must be terminated. 281 error_message: String to print to the host console before exiting the test 282 (if lost_connection is True) 283 Returns: 284 None. 285 """ 286 if lost_connection: 287 logging.error(error_message) 288 self.finalize() 289 sys.exit(error_message) 290 291 def setUsbEnabled(self, enabled, verbose = True): 292 if enabled: 293 val = 1 294 else: 295 val = 0 296 self._power_monitor.SetUsbPassthrough(val) 297 tries = 0 298 299 # Sometimes command won't go through first time, particularly if immediately after a data 300 # collection, so allow for retries 301 # TODO: Move this retry mechanism to the power monitor driver. 302 status = self._power_monitor.GetStatus() 303 while status is None and tries < 5: 304 tries += 1 305 time.sleep(2.0) 306 logging.error("Retrying get status call...") 307 self._power_monitor.StopDataCollection() 308 self._power_monitor.SetUsbPassthrough(val) 309 status = self._power_monitor.GetStatus() 310 311 if enabled: 312 if verbose: 313 print("...USB enabled, waiting for device") 314 self.executeLocal("adb wait-for-device") 315 if verbose: 316 print("...device online") 317 else: 318 if verbose: 319 logging.info("...USB disabled") 320 # re-establish port forwarding 321 if enabled and self._tcp_connect_port > 0: 322 status = self.executeLocal("adb forward tcp:%d localabstract:%s" % 323 (self._tcp_connect_port, PowerTest.DOMAIN_NAME)) 324 self.reportErrorRaiseExceptionIf(status != 0, msg = "Unable to forward requests to client over adb") 325 326 def computeBaselineState(self, measurements): 327 """ 328 Args: 329 measurements: List of floats containing ampere draw measurements 330 taken from the monsoon power monitor. 331 Must be atleast 100 measurements long 332 Returns: 333 A tuple (isBaseline, mean_current) where isBaseline is a 334 boolean that is True only if the baseline state for the phone is 335 detected. mean_current is an estimate of the average baseline 336 current for the device, which is valid only if baseline state is 337 detected (if not, it is set to -1). 338 """ 339 340 # Looks at the measurements to see if it is in baseline state 341 if len(measurements) < 100: 342 print( 343 "Need at least 100 measurements to determine if baseline state has" 344 " been reached") 345 return (False, -1) 346 347 # Assumption: At baseline state, the power profile is Gaussian distributed 348 # with low-variance around the mean current draw. 349 # Ideally we should find the mode from a histogram bin to find an estimated mean. 350 # Assuming here that the median is very close to this value; later we check that the 351 # variance of the samples is low enough to validate baseline. 352 sorted_measurements = sorted(measurements) 353 number_measurements = len(measurements) 354 if not number_measurements % 2: 355 median_measurement = (sorted_measurements[(number_measurements - 1) / 2] + 356 sorted_measurements[(number_measurements + 1) / 2]) / 2 357 else: 358 median_measurement = sorted_measurements[number_measurements / 2] 359 360 # Assume that at baseline state, a large fraction of power measurements 361 # are within +/- EXPECTED_AMPS_VARIATION_HALF_RANGE milliAmperes of 362 # the average baseline current. Find all such measurements in the 363 # sorted measurement vector. 364 left_index = ( 365 bisect_left( 366 sorted_measurements, 367 median_measurement - 368 PowerTest.EXPECTED_AMPS_VARIATION_HALF_RANGE)) 369 right_index = ( 370 bisect_left( 371 sorted_measurements, 372 median_measurement + 373 PowerTest.EXPECTED_AMPS_VARIATION_HALF_RANGE)) 374 375 average_baseline_amps = scipy.mean( 376 sorted_measurements[left_index: (right_index - 1)]) 377 378 detected_baseline = True 379 # We enforce that a fraction of more than 'THRESHOLD_BASELINE_SAMPLES_FRACTION' 380 # of samples must be within +/- EXPECTED_AMPS_VARIATION_HALF_RANGE 381 # milliAmperes of the mean baseline current, which we have estimated as 382 # the median. 383 if ((right_index - left_index) < PowerTest.THRESHOLD_BASELINE_SAMPLES_FRACTION * len( 384 measurements)): 385 detected_baseline = False 386 387 # We check for the maximum limit of the expected baseline 388 if median_measurement > self._max_baseline_amps: 389 detected_baseline = False 390 if average_baseline_amps < 0: 391 print PowerTest.NEGATIVE_AMPERE_ERROR_MESSAGE 392 detected_baseline = False 393 394 print("%s baseline state" % ("Could detect" if detected_baseline else "Could NOT detect")) 395 print( 396 "median amps = %f, avg amps = %f, fraction of good samples = %f" % 397 (median_measurement, average_baseline_amps, 398 float(right_index - left_index) / len(measurements))) 399 if PowerTest.ENABLE_PLOTTING: 400 plt.plot(measurements) 401 plt.show() 402 print("To continue test, please close the plot window manually.") 403 return (detected_baseline, average_baseline_amps) 404 405 def isApInSuspendState(self, measurements_amps, nominal_max_amps, test_percentile): 406 """ 407 This function detects AP suspend and display off state of phone 408 after a sensor has been registered. 409 410 Because the power profile can be very different between sensors and 411 even across builds, it is difficult to specify a tight threshold for 412 mean current draw or mandate that the power measurements must have low 413 variance. We use a criteria that allows for a certain fraction of 414 peaks in power spectrum and checks that test_percentile fraction of 415 measurements must be below the specified value nominal_max_amps 416 Args: 417 measurements_amps: amperes draw measurements from power monitor 418 test_percentile: the fraction of measurements we require to be below 419 a specified amps value 420 nominal_max_amps: the specified value of the max current draw 421 Returns: 422 returns a boolean which is True if and only if the AP suspend and 423 display off state is detected 424 """ 425 count_good = len([m for m in measurements_amps if m < nominal_max_amps]) 426 count_negative = len([m for m in measurements_amps if m < 0]) 427 if count_negative > 0: 428 print PowerTest.NEGATIVE_AMPERE_ERROR_MESSAGE 429 return False; 430 return count_good > test_percentile * len(measurements_amps) 431 432 def getBaselineState(self): 433 """This function first disables all sensors, then collects measurements 434 through the power monitor and continuously evaluates if baseline state 435 is reached. Once baseline state is detected, it returns a tuple with 436 status information. If baseline is not detected in a preset maximum 437 number of trials, it returns as well. 438 439 Returns: 440 Returns a tuple (isBaseline, mean_current) where isBaseline is a 441 boolean that is True only if the baseline state for the phone is 442 detected. mean_current is an estimate of the average baseline current 443 for the device, which is valid only if baseline state is detected 444 (if not, it is set to -1) 445 """ 446 self.setPowerOn("ALL", False) 447 self.setUsbEnabled(False) 448 print("Waiting %d seconds for baseline state" % DELAY_SCREEN_OFF) 449 time.sleep(DELAY_SCREEN_OFF) 450 451 MEASUREMENT_DURATION_SECONDS_BASELINE_DETECTION = 5 # seconds 452 NUMBER_MEASUREMENTS_BASELINE_DETECTION = ( 453 PowerTest.RATE_NOMINAL * 454 MEASUREMENT_DURATION_SECONDS_BASELINE_DETECTION) 455 NUMBER_MEASUREMENTS_BASELINE_VERIFICATION = ( 456 NUMBER_MEASUREMENTS_BASELINE_DETECTION * 5) 457 MAX_TRIALS = 50 458 459 collected_baseline_measurements = False 460 461 for tries in xrange(MAX_TRIALS): 462 print("Trial number %d of %d..." % (tries, MAX_TRIALS)) 463 measurements = self.collectMeasurements( 464 NUMBER_MEASUREMENTS_BASELINE_DETECTION, PowerTest.RATE_NOMINAL, 465 verbose = False) 466 if self.computeBaselineState(measurements)[0] is True: 467 collected_baseline_measurements = True 468 break 469 470 if collected_baseline_measurements: 471 print("Verifying baseline state over a longer interval " 472 "in order to double check baseline state") 473 measurements = self.collectMeasurements( 474 NUMBER_MEASUREMENTS_BASELINE_VERIFICATION, PowerTest.RATE_NOMINAL, 475 verbose = False) 476 self.reportErrorRaiseExceptionIf( 477 not measurements, "No background measurements could be taken") 478 retval = self.computeBaselineState(measurements) 479 if retval[0]: 480 print("Verified baseline.") 481 if measurements and LOG_DATA_TO_FILE: 482 with open("/tmp/cts-power-tests-background-data.log", "w") as f: 483 for m in measurements: 484 f.write("%.4f\n" % m) 485 return retval 486 else: 487 return (False, -1) 488 489 def waitForApSuspendMode(self): 490 """This function repeatedly collects measurements until AP suspend and display off 491 mode is detected. After a maximum number of trials, if this state is not reached, it 492 raises an error. 493 Returns: 494 boolean which is True if device was detected to be in suspend state 495 Raises: 496 Power monitor-related exception 497 """ 498 print("waitForApSuspendMode(): Sleeping for %d seconds" % DELAY_SCREEN_OFF) 499 time.sleep(DELAY_SCREEN_OFF) 500 501 NUMBER_MEASUREMENTS = 200 502 # Maximum trials for which to collect measurements to get to Ap suspend 503 # state 504 MAX_TRIALS = 50 505 506 got_to_suspend_state = False 507 for count in xrange(MAX_TRIALS): 508 print ("waitForApSuspendMode(): Trial %d of %d" % (count, MAX_TRIALS)) 509 measurements = self.collectMeasurements(NUMBER_MEASUREMENTS, 510 PowerTest.RATE_NOMINAL, 511 verbose = False) 512 if self.isApInSuspendState( 513 measurements, PowerTest.MAX_PERCENTILE_AP_SCREEN_OFF_AMPS, 514 PowerTest.PERCENTILE_MAX_AP_SCREEN_OFF): 515 got_to_suspend_state = True 516 break 517 self.reportErrorRaiseExceptionIf( 518 got_to_suspend_state is False, 519 msg = "Unable to determine application processor suspend mode status.") 520 print("Got to AP suspend state") 521 return got_to_suspend_state 522 523 def collectMeasurements(self, measurementCount, rate, verbose = True): 524 """Args: 525 measurementCount: Number of measurements to collect from the power 526 monitor 527 rate: The integer frequency in Hertz at which to collect measurements from 528 the power monitor 529 Returns: 530 A list containing measurements from the power monitor; that has the 531 requested count of the number of measurements at the specified rate 532 """ 533 assert (measurementCount > 0) 534 decimate_by = self._native_hz / rate or 1 535 536 self._power_monitor.StartDataCollection() 537 sub_measurements = [] 538 measurements = [] 539 tries = 0 540 if verbose: print("") 541 try: 542 while len(measurements) < measurementCount and tries < 5: 543 if tries: 544 self._power_monitor.StopDataCollection() 545 self._power_monitor.StartDataCollection() 546 time.sleep(1.0) 547 tries += 1 548 additional = self._power_monitor.CollectData() 549 if additional is not None: 550 tries = 0 551 sub_measurements.extend(additional) 552 while len(sub_measurements) >= decimate_by: 553 sub_avg = sum(sub_measurements[0:decimate_by]) / decimate_by 554 measurements.append(sub_avg) 555 sub_measurements = sub_measurements[decimate_by:] 556 if verbose: 557 # "\33[1A\33[2K" is a special Linux console control 558 # sequence for moving to the previous line, and 559 # erasing it; and reprinting new text on that 560 # erased line. 561 sys.stdout.write("\33[1A\33[2K") 562 print ("MEASURED[%d]: %f" % (len(measurements), measurements[-1])) 563 finally: 564 self._power_monitor.StopDataCollection() 565 566 self.reportErrorRaiseExceptionIf(measurementCount > len(measurements), 567 "Unable to collect all requested measurements") 568 return measurements 569 570 def requestUserAcknowledgment(self, msg): 571 """Post message to user on screen and wait for acknowledgment""" 572 response = self.executeOnDevice(PowerTest.REQUEST_USER_RESPONSE % msg) 573 self.reportErrorRaiseExceptionIf( 574 response != "OK", "Unable to request user acknowledgment") 575 576 def setTestResult(self, test_name, test_result, test_message): 577 """ 578 Reports the result of a test to the device 579 Args: 580 test_name: name of the test 581 test_result: Boolean result of the test (True means Pass) 582 test_message: Relevant message 583 """ 584 print ("Test %s : %s" % (test_name, test_result)) 585 586 response = ( 587 self.executeOnDevice( 588 PowerTest.REQUEST_SET_TEST_RESULT % 589 (test_name, test_result, test_message))) 590 self.reportErrorRaiseExceptionIf( 591 response != "OK", "Unable to send test status to Verifier") 592 593 def setPowerOn(self, sensor, powered_on): 594 response = self.executeOnDevice(PowerTest.REQUEST_SENSOR_SWITCH % 595 (("ON" if powered_on else "OFF"), sensor)) 596 self.reportErrorRaiseExceptionIf( 597 response == "ERR", "Unable to set sensor %s state" % sensor) 598 logging.info("Set %s %s", sensor, ("ON" if powered_on else "OFF")) 599 return response 600 601 def runSensorPowerTest( 602 self, sensor, max_amperes_allowed, baseline_amps, user_request = None): 603 """ 604 Runs power test for a specific sensor; i.e. measures the amperes draw 605 of the phone using monsoon, with the specified sensor mregistered 606 and the phone in suspend state; and verifies that the incremental 607 consumed amperes is within expected bounds. 608 Args: 609 sensor: The specified sensor for which to run the power test 610 max_amperes_allowed: Maximum ampere draw of the device with the 611 sensor registered and device in suspend state 612 baseline_amps: The power draw of the device when it is in baseline 613 state (no sensors registered, display off, AP asleep) 614 """ 615 self._current_test = ("%s_Power_Test_While_%s" % ( 616 sensor, ("Under_Motion" if user_request is not None else "Still"))) 617 try: 618 print ("\n\n---------------------------------") 619 if user_request is not None: 620 print ("Running power test on %s under motion." % sensor) 621 else: 622 print ("Running power test on %s while device is still." % sensor) 623 print ("---------------------------------") 624 response = self.executeOnDevice( 625 PowerTest.REQUEST_SENSOR_AVAILABILITY % sensor) 626 if response == "UNAVAILABLE": 627 self.setTestResult( 628 self._current_test, test_result = "SKIPPED", 629 test_message = "Sensor %s not available on this platform" % sensor) 630 self.setPowerOn("ALL", False) 631 if response == "UNAVAILABLE": 632 self.setTestResult( 633 self._current_test, test_result = "SKIPPED", 634 test_message = "Sensor %s not available on this device" % sensor) 635 return 636 self.reportErrorRaiseExceptionIf(response != "OK", "Unable to set all sensor off") 637 self.executeOnDevice(PowerTest.REQUEST_SCREEN_OFF) 638 self.setUsbEnabled(False) 639 self.setUsbEnabled(True) 640 self.setPowerOn(sensor, True) 641 if user_request is not None: 642 print("===========================================\n" + 643 "==> Please follow the instructions presented on the device\n" + 644 "===========================================") 645 self.requestUserAcknowledgment(user_request) 646 self.executeOnDevice(PowerTest.REQUEST_SCREEN_OFF) 647 self.setUsbEnabled(False) 648 self.reportErrorRaiseExceptionIf( 649 response != "OK", "Unable to set sensor %s ON" % sensor) 650 651 self.waitForApSuspendMode() 652 print ("Collecting sensor %s measurements" % sensor) 653 measurements = self.collectMeasurements(PowerTest.SAMPLE_COUNT_NOMINAL, 654 PowerTest.RATE_NOMINAL) 655 656 if measurements and LOG_DATA_TO_FILE: 657 with open("/tmp/cts-power-tests-%s-%s-sensor-data.log" % (sensor, 658 ("Under_Motion" if user_request is not None else "Still")), "w") as f: 659 for m in measurements: 660 f.write("%.4f\n" % m) 661 self.setUsbEnabled(True, verbose = False) 662 print("Saving raw data files to device...") 663 self.executeLocal("adb shell mkdir -p %s" % self._external_storage, False) 664 self.executeLocal("adb push %s %s/." % (f.name, self._external_storage)) 665 self.setUsbEnabled(False, verbose = False) 666 self.reportErrorRaiseExceptionIf( 667 not measurements, "No measurements could be taken for %s" % sensor) 668 avg = sum(measurements) / len(measurements) 669 squared = [(m - avg) * (m - avg) for m in measurements] 670 671 stddev = math.sqrt(sum(squared) / len(squared)) 672 current_diff = avg - baseline_amps 673 self.setUsbEnabled(True) 674 max_power = max(measurements) - avg 675 if current_diff <= max_amperes_allowed: 676 # TODO: fail the test of background > current 677 message = ( 678 "Draw is within limits. Sensor delta:%f mAmp Baseline:%f " 679 "mAmp Sensor: %f mAmp Stddev : %f mAmp Peak: %f mAmp") % ( 680 current_diff * 1000.0, baseline_amps * 1000.0, avg * 1000.0, 681 stddev * 1000.0, max_power * 1000.0) 682 else: 683 message = ( 684 "Draw is too high. Current:%f Background:%f Measured: %f " 685 "Stddev: %f Peak: %f") % ( 686 current_diff * 1000.0, baseline_amps * 1000.0, avg * 1000.0, 687 stddev * 1000.0, max_power * 1000.0) 688 self.setTestResult( 689 self._current_test, 690 ("PASS" if (current_diff <= max_amperes_allowed) else "FAIL"), 691 message) 692 print("Result: " + message) 693 except: 694 traceback.print_exc() 695 self.setTestResult(self._current_test, test_result = "FAIL", 696 test_message = "Exception occurred during run of test.") 697 raise 698 699 @staticmethod 700 def runTests(max_baseline_amps): 701 testrunner = None 702 try: 703 GENERIC_MOTION_REQUEST = ("\n===> Please press Next and when the " 704 "screen is off, keep the device under motion with only tiny, " 705 "slow movements until the screen turns on again.\nPlease " 706 "refrain from interacting with the screen or pressing any side " 707 "buttons while measurements are taken.") 708 USER_STEPS_REQUEST = ("\n===> Please press Next and when the " 709 "screen is off, then move the device to simulate step motion " 710 "until the screen turns on again.\nPlease refrain from " 711 "interacting with the screen or pressing any side buttons " 712 "while measurements are taken.") 713 testrunner = PowerTest(max_baseline_amps) 714 testrunner.executeOnDevice( 715 PowerTest.REQUEST_SHOW_MESSAGE % "Connected. Running tests...") 716 is_baseline_success, baseline_amps = testrunner.getBaselineState() 717 718 if is_baseline_success: 719 testrunner.setUsbEnabled(True) 720 # TODO: Enable testing a single sensor 721 testrunner.runSensorPowerTest( 722 "SIGNIFICANT_MOTION", PowerTest.MAX_SIGMO_AMPS, baseline_amps, 723 user_request = GENERIC_MOTION_REQUEST) 724 testrunner.runSensorPowerTest( 725 "STEP_DETECTOR", PowerTest.MAX_STEP_DETECTOR_AMPS, baseline_amps, 726 user_request = USER_STEPS_REQUEST) 727 testrunner.runSensorPowerTest( 728 "STEP_COUNTER", PowerTest.MAX_STEP_COUNTER_AMPS, baseline_amps, 729 user_request = USER_STEPS_REQUEST) 730 testrunner.runSensorPowerTest( 731 "ACCELEROMETER", PowerTest.MAX_ACCEL_AMPS, baseline_amps, 732 user_request = GENERIC_MOTION_REQUEST) 733 testrunner.runSensorPowerTest( 734 "MAGNETIC_FIELD", PowerTest.MAX_MAG_AMPS, baseline_amps, 735 user_request = GENERIC_MOTION_REQUEST) 736 testrunner.runSensorPowerTest( 737 "GYROSCOPE", PowerTest.MAX_GYRO_AMPS, baseline_amps, 738 user_request = GENERIC_MOTION_REQUEST) 739 testrunner.runSensorPowerTest( 740 "ACCELEROMETER", PowerTest.MAX_ACCEL_AMPS, baseline_amps, 741 user_request = None) 742 testrunner.runSensorPowerTest( 743 "MAGNETIC_FIELD", PowerTest.MAX_MAG_AMPS, baseline_amps, 744 user_request = None) 745 testrunner.runSensorPowerTest( 746 "GYROSCOPE", PowerTest.MAX_GYRO_AMPS, baseline_amps, 747 user_request = None) 748 testrunner.runSensorPowerTest( 749 "SIGNIFICANT_MOTION", PowerTest.MAX_SIGMO_AMPS, baseline_amps, 750 user_request = None) 751 testrunner.runSensorPowerTest( 752 "STEP_DETECTOR", PowerTest.MAX_STEP_DETECTOR_AMPS, baseline_amps, 753 user_request = None) 754 testrunner.runSensorPowerTest( 755 "STEP_COUNTER", PowerTest.MAX_STEP_COUNTER_AMPS, baseline_amps, 756 user_request = None) 757 else: 758 print("Could not get to baseline state. This is either because " 759 "in several trials, the monitor could not measure a set " 760 "of power measurements that had the specified low " 761 "variance or the mean measurements were below the " 762 "expected value. None of the sensor power measurement " 763 " tests were performed due to not being able to detect " 764 "baseline state. Please re-run the power tests.") 765 except KeyboardInterrupt: 766 print "Keyboard interrupt from user." 767 raise 768 except: 769 import traceback 770 traceback.print_exc() 771 finally: 772 logging.info("TESTS COMPLETE") 773 if testrunner: 774 try: 775 testrunner.finalize() 776 except socket.error: 777 sys.exit( 778 "===================================================\n" 779 "Unable to connect to device under test. Make sure \n" 780 "the device is connected via the usb pass-through, \n" 781 "the CtsVerifier app is running the SensorPowerTest on \n" 782 "the device, and USB pass-through is enabled.\n" 783 "===================================================") 784 785def main(argv): 786 """ Simple command-line interface for a power test application.""" 787 useful_flags = ["voltage", "status", "usbpassthrough", 788 "samples", "current", "log", "power_monitor"] 789 if not [f for f in useful_flags if FLAGS.get(f, None) is not None]: 790 print __doc__.strip() 791 print FLAGS.MainModuleHelp() 792 return 793 794 if FLAGS.avg and FLAGS.avg < 0: 795 logging.error("--avg must be greater than 0") 796 return 797 798 if FLAGS.voltage is not None: 799 if FLAGS.voltage > 5.5: 800 print("!!WARNING: Voltage higher than typical values!!!") 801 try: 802 response = raw_input( 803 "Voltage of %.3f requested. Confirm this is correct (Y/N)" % 804 FLAGS.voltage) 805 if response.upper() != "Y": 806 sys.exit("Aborting") 807 except: 808 sys.exit("Aborting.") 809 810 if not FLAGS.power_monitor: 811 sys.exit( 812 "You must specify a '--power_monitor' option to specify which power " 813 "monitor type " + 814 "you are using.\nOne of:\n \n ".join(available_monitors)) 815 power_monitors = do_import("power_monitors.%s" % FLAGS.power_monitor) 816 try: 817 mon = power_monitors.Power_Monitor(device = FLAGS.device) 818 except: 819 import traceback 820 821 traceback.print_exc() 822 sys.exit("No power monitors found") 823 824 if FLAGS.voltage is not None: 825 826 if FLAGS.ramp is not None: 827 mon.RampVoltage(mon.start_voltage, FLAGS.voltage) 828 else: 829 mon.SetVoltage(FLAGS.voltage) 830 831 if FLAGS.current is not None: 832 mon.SetMaxCurrent(FLAGS.current) 833 834 if FLAGS.status: 835 items = sorted(mon.GetStatus().items()) 836 print "\n".join(["%s: %s" % item for item in items]) 837 838 if FLAGS.usbpassthrough: 839 if FLAGS.usbpassthrough == "off": 840 mon.SetUsbPassthrough(0) 841 elif FLAGS.usbpassthrough == "on": 842 mon.SetUsbPassthrough(1) 843 elif FLAGS.usbpassthrough == "auto": 844 mon.SetUsbPassthrough(2) 845 else: 846 mon.Close() 847 sys.exit("bad pass-through flag: %s" % FLAGS.usbpassthrough) 848 849 if FLAGS.samples: 850 # Make sure state is normal 851 mon.StopDataCollection() 852 status = mon.GetStatus() 853 native_hz = status["sampleRate"] * 1000 854 855 # Collect and average samples as specified 856 mon.StartDataCollection() 857 858 # In case FLAGS.hz doesn't divide native_hz exactly, use this invariant: 859 # 'offset' = (consumed samples) * FLAGS.hz - (emitted samples) * native_hz 860 # This is the error accumulator in a variation of Bresenham's algorithm. 861 emitted = offset = 0 862 collected = [] 863 history_deque = collections.deque() # past n samples for rolling average 864 865 # TODO: Complicated lines of code below. Refactoring needed 866 try: 867 last_flush = time.time() 868 while emitted < FLAGS.samples or FLAGS.samples == -1: 869 # The number of raw samples to consume before emitting the next output 870 need = (native_hz - offset + FLAGS.hz - 1) / FLAGS.hz 871 if need > len(collected): # still need more input samples 872 samples = mon.CollectData() 873 if not samples: break 874 collected.extend(samples) 875 else: 876 # Have enough data, generate output samples. 877 # Adjust for consuming 'need' input samples. 878 offset += need * FLAGS.hz 879 while offset >= native_hz: # maybe multiple, if FLAGS.hz > native_hz 880 this_sample = sum(collected[:need]) / need 881 882 if FLAGS.timestamp: print int(time.time()), 883 884 if FLAGS.avg: 885 history_deque.appendleft(this_sample) 886 if len(history_deque) > FLAGS.avg: history_deque.pop() 887 print "%f %f" % (this_sample, 888 sum(history_deque) / len(history_deque)) 889 else: 890 print "%f" % this_sample 891 sys.stdout.flush() 892 893 offset -= native_hz 894 emitted += 1 # adjust for emitting 1 output sample 895 collected = collected[need:] 896 now = time.time() 897 if now - last_flush >= 0.99: # flush every second 898 sys.stdout.flush() 899 last_flush = now 900 except KeyboardInterrupt: 901 print("interrupted") 902 return 1 903 finally: 904 mon.Close() 905 return 0 906 907 if FLAGS.run: 908 if not FLAGS.power_monitor: 909 sys.exit( 910 "When running power tests, you must specify which type of power " 911 "monitor to use" + 912 " with '--power_monitor <type of power monitor>'") 913 try: 914 PowerTest.runTests(FLAGS.max_baseline_amps) 915 except KeyboardInterrupt: 916 print "Keyboard interrupt from user" 917 918if __name__ == "__main__": 919 flags.DEFINE_boolean("status", None, "Print power meter status") 920 flags.DEFINE_integer("avg", None, 921 "Also report average over last n data points") 922 flags.DEFINE_float("voltage", None, "Set output voltage (0 for off)") 923 flags.DEFINE_float("current", None, "Set max output current") 924 flags.DEFINE_string("usbpassthrough", None, "USB control (on, off, auto)") 925 flags.DEFINE_integer("samples", None, "Collect and print this many samples") 926 flags.DEFINE_integer("hz", 5000, "Print this many samples/sec") 927 flags.DEFINE_string("device", None, 928 "Path to the device in /dev/... (ex:/dev/ttyACM1)") 929 flags.DEFINE_boolean("timestamp", None, 930 "Also print integer (seconds) timestamp on each line") 931 flags.DEFINE_boolean("ramp", True, "Gradually increase voltage") 932 flags.DEFINE_boolean("log", False, "Log progress to a file or not") 933 flags.DEFINE_boolean("run", False, "Run the test suite for power") 934 flags.DEFINE_string("power_monitor", None, "Type of power monitor to use") 935 flags.DEFINE_float("max_baseline_amps", 0.005, 936 "Set maximum baseline current for device being tested") 937 sys.exit(main(FLAGS(sys.argv))) 938