1#!/usr/bin/python
2
3# Copyright (C) 2014 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17import logging
18import os.path
19import select
20import sys
21import time
22import collections
23import socket
24import gflags as flags  # http://code.google.com/p/python-gflags/
25import pkgutil
26import threading
27import Queue
28import traceback
29import math
30import bisect
31from bisect import bisect_left
32
33"""
34scipy, numpy and matplotlib are python packages that can be installed
35from: http://www.scipy.org/
36
37"""
38import scipy
39import matplotlib.pyplot as plt
40
# Let this script know about the power monitor implementations: the
# "power_monitors" package sits next to this file, so it is the directory
# containing the script (not the script's own file name) that has to be
# placed on the module search path.
sys.path = [os.path.dirname(os.path.abspath(__file__))] + sys.path
# Every module found in power_monitors/ is offered as a monitor; names that
# start with an underscore are skipped.
available_monitors = [
    name
    for _, name, _ in pkgutil.iter_modules(
        [os.path.join(os.path.dirname(__file__), "power_monitors")])
    if not name.startswith("_")]
48
# Path of the CtsVerifier APK: one directory above the one holding this script.
APK = os.path.join(os.path.dirname(__file__), os.pardir, "CtsVerifier.apk")

# Parsed command-line flags (python-gflags).
FLAGS = flags.FLAGS

# DELAY_SCREEN_OFF is the number of seconds to wait for the device to reach
# its baseline state.
DELAY_SCREEN_OFF = 20.0

# Whether the data collected during each sensor run is also written to a file.
LOG_DATA_TO_FILE = True

# Only report errors through the root logger.
logging.getLogger().setLevel(logging.ERROR)
60
61
def do_import(name):
    """Import a (possibly dotted) module by name and return that module.

    __import__("a.b.c") hands back the top-level package "a", so the
    remaining path components are looked up as attributes, one after the
    other, to reach the innermost module.
    """
    module = __import__(name)
    for attribute in name.split(".")[1:]:
        module = getattr(module, attribute)
    return module
69
class PowerTestException(Exception):
    """
    Definition of specialized Exception class for CTS power tests.

    Args:
        message: Description of the failure; str() of the exception
            returns its string form.
    """
    def __init__(self, message):
        # Hand the message to the base class as well, so that ``args``,
        # repr() and pickling behave like they do for any other exception.
        super(PowerTestException, self).__init__(message)
        self._error_message = message

    def __str__(self):
        # __str__ must return a string even if a non-string was passed in.
        return str(self._error_message)
78
79class PowerTest:
80    """Class to run a suite of power tests. This has methods for obtaining
81    measurements from the power monitor (through the driver) and then
82    processing it to determine baseline and AP suspend state and
83    measure ampere draw of various sensors.
84    Ctrl+C causes a keyboard interrupt exception which terminates the test."""
85
    # Thresholds for max allowed power usage per sensor tested
    # TODO: Accel, Mag and Gyro have no maximum power specified in the CDD;
    # the following numbers are bogus and will be replaced soon by what
    # the device reports (from Sensor.getPower())
    MAX_ACCEL_AMPS = 0.08  # Amps
    MAX_MAG_AMPS = 0.08  # Amps
    MAX_GYRO_AMPS = 0.08  # Amps
    MAX_SIGMO_AMPS = 0.08  # Amps

    # TODO: The following numbers for step counter, etc must be replaced by
    # the numbers specified in CDD for low-power sensors. The expected current
    # draw must be computed from the specified power and the voltage used to
    # power the device (specified from a config file).
    MAX_STEP_COUNTER_AMPS = 0.08  # Amps
    MAX_STEP_DETECTOR_AMPS = 0.08  # Amps
    # EXPECTED_AMPS_VARIATION_HALF_RANGE denotes the expected variation of
    # the ampere measurements around the mean value at baseline state, i.e.
    # we expect most of the ampere measurements at baseline state to lie
    # within +/- this value of the mean.
    EXPECTED_AMPS_VARIATION_HALF_RANGE = 0.0005
    # THRESHOLD_BASELINE_SAMPLES_FRACTION denotes the minimum fraction of
    # samples that must be in the range of variation defined by
    # EXPECTED_AMPS_VARIATION_HALF_RANGE around the mean baseline for us to
    # decide that the phone has settled into its baseline state
    THRESHOLD_BASELINE_SAMPLES_FRACTION = 0.86
    # The variable MAX_PERCENTILE_AP_SCREEN_OFF_AMPS denotes the maximum ampere
    # draw that the device can consume when it has gone to suspend state with
    # one or more sensors registered and batching samples (screen and AP are
    # off in this case)
    MAX_PERCENTILE_AP_SCREEN_OFF_AMPS = 0.030  # Amps
    # The variable PERCENTILE_MAX_AP_SCREEN_OFF denotes the fraction of ampere
    # measurements that must be below the specified maximum amperes
    # MAX_PERCENTILE_AP_SCREEN_OFF_AMPS for us to decide that the phone has
    # reached suspend state.
    PERCENTILE_MAX_AP_SCREEN_OFF = 0.95
    # Name of the local abstract socket on the device to which adb forwards
    # the host's requests.
    DOMAIN_NAME = "/android/cts/powertest"
    # SAMPLE_COUNT_NOMINAL denotes the typical number of measurements of amperes
    # to collect from the power monitor
    SAMPLE_COUNT_NOMINAL = 1000
    # RATE_NOMINAL denotes the nominal frequency (in Hz) at which ampere
    # measurements are taken from the monsoon power monitor
    RATE_NOMINAL = 100
    # When True, computeBaselineState() plots the measurements it was given.
    ENABLE_PLOTTING = False

    # Request strings sent to the device; the %s placeholders are filled in
    # by the methods that issue the request.
    REQUEST_EXTERNAL_STORAGE = "EXTERNAL STORAGE?"
    REQUEST_EXIT = "EXIT"
    REQUEST_RAISE = "RAISE %s %s"
    REQUEST_USER_RESPONSE = "USER RESPONSE %s"
    REQUEST_SET_TEST_RESULT = "SET TEST RESULT %s %s %s"
    REQUEST_SENSOR_SWITCH = "SENSOR %s %s"
    REQUEST_SENSOR_AVAILABILITY = "SENSOR? %s"
    REQUEST_SCREEN_OFF = "SCREEN OFF"
    REQUEST_SHOW_MESSAGE = "MESSAGE %s"

    # Printed whenever a negative ampere draw is measured.
    NEGATIVE_AMPERE_ERROR_MESSAGE = (
        "Negative ampere draw measured, possibly due to power "
        "supply from USB cable. Check the setup of device and power "
        "monitor to make sure that the device is not connected "
        "to machine via USB directly. The device should be "
        "connected to the USB slot in the power monitor. It is okay "
        "to change the wiring when the test is in progress.")
148
149
150    def __init__(self, max_baseline_amps):
151        """
152        Args:
153            max_baseline_amps: The maximum value of baseline amperes
154                    that we expect the device to consume at baseline state.
155                    This can be different between models of phones.
156        """
157        power_monitors = do_import("power_monitors.%s" % FLAGS.power_monitor)
158        testid = time.strftime("%d_%m_%Y__%H__%M_%S")
159        self._power_monitor = power_monitors.Power_Monitor(log_file_id = testid)
160        self._tcp_connect_port = 0  # any available port
161        print ("Establishing connection to device...")
162        self.setUsbEnabled(True)
163        status = self._power_monitor.GetStatus()
164        self._native_hz = status["sampleRate"] * 1000
165        # the following describes power test being run (i.e on what sensor
166        # and what type of test. This is used for logging.
167        self._current_test = "None"
168        self._external_storage = self.executeOnDevice(PowerTest.REQUEST_EXTERNAL_STORAGE)
169        self._max_baseline_amps = max_baseline_amps
170
    def __del__(self):
        """Runs finalize() when the instance is garbage collected."""
        self.finalize()
173
174    def finalize(self):
175        """To be called upon termination of host connection to device"""
176        if self._tcp_connect_port > 0:
177            # tell device side to exit connection loop, and remove the forwarding
178            # connection
179            self.executeOnDevice(PowerTest.REQUEST_EXIT, reportErrors = False)
180            self.executeLocal("adb forward --remove tcp:%d" % self._tcp_connect_port)
181        self._tcp_connect_port = 0
182        if self._power_monitor:
183            self._power_monitor.Close()
184            self._power_monitor = None
185
186    def _send(self, msg, report_errors = True):
187        """Connect to the device, send the given command, and then disconnect"""
188        if self._tcp_connect_port == 0:
189            # on first attempt to send a command, connect to device via any open port number,
190            # forwarding that port to a local socket on the device via adb
191            logging.debug("Seeking port for communication...")
192            # discover an open port
193            dummysocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
194            dummysocket.bind(("localhost", 0))
195            (_, self._tcp_connect_port) = dummysocket.getsockname()
196            dummysocket.close()
197            assert(self._tcp_connect_port > 0)
198
199            status = self.executeLocal("adb forward tcp:%d localabstract:%s" %
200                                       (self._tcp_connect_port, PowerTest.DOMAIN_NAME))
201            # If the status !=0, then the host machine is unable to
202            # forward requests to client over adb. Ending the test and logging error message
203            # to the console on the host.
204            self.endTestIfLostConnection(
205                status != 0,
206                "Unable to forward requests to client over adb")
207            logging.info("Forwarding requests over local port %d",
208                         self._tcp_connect_port)
209
210        link = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
211
212        try:
213            logging.debug("Connecting to device...")
214            link.connect(("localhost", self._tcp_connect_port))
215            logging.debug("Connected.")
216        except socket.error as serr:
217            print "Socket connection error: ", serr
218            print "Finalizing and exiting the test"
219            self.endTestIfLostConnection(
220                report_errors,
221                "Unable to communicate with device: connection refused")
222        except:
223            print "Non socket-related exception at this block in _send(); re-raising now."
224            raise
225        logging.debug("Sending '%s'", msg)
226        link.sendall(msg)
227        logging.debug("Getting response...")
228        response = link.recv(4096)
229        logging.debug("Got response '%s'", response)
230        link.close()
231        return response
232
233    def queryDevice(self, query):
234        """Post a yes/no query to the device, return True upon successful query, False otherwise"""
235        logging.info("Querying device with '%s'", query)
236        return self._send(query) == "OK"
237
238    # TODO: abstract device communication (and string commands) into its own class
239    def executeOnDevice(self, cmd, reportErrors = True):
240        """Execute a (string) command on the remote device"""
241        return self._send(cmd, reportErrors)
242
243    def executeLocal(self, cmd, check_status = True):
244        """execute a shell command locally (on the host)"""
245        from subprocess import call
246        status = call(cmd.split(" "))
247        if status != 0 and check_status:
248            logging.error("Failed to execute \"%s\"", cmd)
249        else:
250            logging.debug("Executed \"%s\"", cmd)
251        return status
252
253    def reportErrorRaiseExceptionIf(self, condition, msg):
254        """Report an error condition to the device if condition is True.
255        Will raise an exception on the device if condition is True.
256        Args:
257            condition: If true, this reports error
258            msg: Message related to exception
259        Raises:
260            A PowerTestException encapsulating the message provided in msg
261        """
262        if condition:
263            try:
264                logging.error("Exiting on error: %s" % msg)
265                self.executeOnDevice(PowerTest.REQUEST_RAISE % (self._current_test, msg),
266                                     reportErrors = True)
267            except:
268                logging.error("Unable to communicate with device to report "
269                              "error: %s" % msg)
270                self.finalize()
271                sys.exit(msg)
272            raise PowerTestException(msg)
273
274    def endTestIfLostConnection(self, lost_connection, error_message):
275        """
276        This function ends the test if lost_connection was true,
277        which indicates that the connection to the device was lost.
278        Args:
279            lost_connection: boolean variable, if True it indicates that
280                connection to device was lost and the test must be terminated.
281            error_message: String to print to the host console before exiting the test
282                (if lost_connection is True)
283        Returns:
284            None.
285        """
286        if lost_connection:
287            logging.error(error_message)
288            self.finalize()
289            sys.exit(error_message)
290
291    def setUsbEnabled(self, enabled, verbose = True):
292        if enabled:
293            val = 1
294        else:
295            val = 0
296        self._power_monitor.SetUsbPassthrough(val)
297        tries = 0
298
299        # Sometimes command won't go through first time, particularly if immediately after a data
300        # collection, so allow for retries
301        # TODO: Move this retry mechanism to the power monitor driver.
302        status = self._power_monitor.GetStatus()
303        while status is None and tries < 5:
304            tries += 1
305            time.sleep(2.0)
306            logging.error("Retrying get status call...")
307            self._power_monitor.StopDataCollection()
308            self._power_monitor.SetUsbPassthrough(val)
309            status = self._power_monitor.GetStatus()
310
311        if enabled:
312            if verbose:
313                print("...USB enabled, waiting for device")
314            self.executeLocal("adb wait-for-device")
315            if verbose:
316                print("...device online")
317        else:
318            if verbose:
319                logging.info("...USB disabled")
320        # re-establish port forwarding
321        if enabled and self._tcp_connect_port > 0:
322            status = self.executeLocal("adb forward tcp:%d localabstract:%s" %
323                                       (self._tcp_connect_port, PowerTest.DOMAIN_NAME))
324            self.reportErrorRaiseExceptionIf(status != 0, msg = "Unable to forward requests to client over adb")
325
326    def computeBaselineState(self, measurements):
327        """
328        Args:
329            measurements: List of floats containing ampere draw measurements
330                taken from the monsoon power monitor.
331                Must be atleast 100 measurements long
332        Returns:
333            A tuple (isBaseline, mean_current) where isBaseline is a
334            boolean that is True only if the baseline state for the phone is
335            detected. mean_current is an estimate of the average baseline
336            current for the device, which is valid only if baseline state is
337            detected (if not, it is set to -1).
338        """
339
340        # Looks at the measurements to see if it is in baseline state
341        if len(measurements) < 100:
342            print(
343                "Need at least 100 measurements to determine if baseline state has"
344                " been reached")
345            return (False, -1)
346
347        # Assumption: At baseline state, the power profile is Gaussian distributed
348        # with low-variance around the mean current draw.
349        # Ideally we should find the mode from a histogram bin to find an estimated mean.
350        # Assuming here that the median is very close to this value; later we check that the
351        # variance of the samples is low enough to validate baseline.
352        sorted_measurements = sorted(measurements)
353        number_measurements = len(measurements)
354        if not number_measurements % 2:
355            median_measurement = (sorted_measurements[(number_measurements - 1) / 2] +
356                                  sorted_measurements[(number_measurements + 1) / 2]) / 2
357        else:
358            median_measurement = sorted_measurements[number_measurements / 2]
359
360        # Assume that at baseline state, a large fraction of power measurements
361        # are within +/- EXPECTED_AMPS_VARIATION_HALF_RANGE milliAmperes of
362        # the average baseline current. Find all such measurements in the
363        # sorted measurement vector.
364        left_index = (
365            bisect_left(
366                sorted_measurements,
367                median_measurement -
368                PowerTest.EXPECTED_AMPS_VARIATION_HALF_RANGE))
369        right_index = (
370            bisect_left(
371                sorted_measurements,
372                median_measurement +
373                PowerTest.EXPECTED_AMPS_VARIATION_HALF_RANGE))
374
375        average_baseline_amps = scipy.mean(
376            sorted_measurements[left_index: (right_index - 1)])
377
378        detected_baseline = True
379        # We enforce that a fraction of more than 'THRESHOLD_BASELINE_SAMPLES_FRACTION'
380        # of samples must be within +/- EXPECTED_AMPS_VARIATION_HALF_RANGE
381        # milliAmperes of the mean baseline current, which we have estimated as
382        # the median.
383        if ((right_index - left_index) < PowerTest.THRESHOLD_BASELINE_SAMPLES_FRACTION * len(
384                measurements)):
385            detected_baseline = False
386
387        # We check for the maximum limit of the expected baseline
388        if median_measurement > self._max_baseline_amps:
389            detected_baseline = False
390        if average_baseline_amps < 0:
391            print PowerTest.NEGATIVE_AMPERE_ERROR_MESSAGE
392            detected_baseline = False
393
394        print("%s baseline state" % ("Could detect" if detected_baseline else "Could NOT detect"))
395        print(
396            "median amps = %f, avg amps = %f, fraction of good samples = %f" %
397            (median_measurement, average_baseline_amps,
398             float(right_index - left_index) / len(measurements)))
399        if PowerTest.ENABLE_PLOTTING:
400            plt.plot(measurements)
401            plt.show()
402            print("To continue test, please close the plot window manually.")
403        return (detected_baseline, average_baseline_amps)
404
405    def isApInSuspendState(self, measurements_amps, nominal_max_amps, test_percentile):
406        """
407        This function detects AP suspend and display off state of phone
408        after a sensor has been registered.
409
410        Because the power profile can be very different between sensors and
411        even across builds, it is difficult to specify a tight threshold for
412        mean current draw or mandate that the power measurements must have low
413        variance. We use a criteria that allows for a certain fraction of
414        peaks in power spectrum and checks that test_percentile fraction of
415        measurements must be below the specified value nominal_max_amps
416        Args:
417            measurements_amps: amperes draw measurements from power monitor
418            test_percentile: the fraction of measurements we require to be below
419                             a specified amps value
420            nominal_max_amps: the specified value of the max current draw
421        Returns:
422            returns a boolean which is True if and only if the AP suspend and
423            display off state is detected
424        """
425        count_good = len([m for m in measurements_amps if m < nominal_max_amps])
426        count_negative = len([m for m in measurements_amps if m < 0])
427        if count_negative > 0:
428            print PowerTest.NEGATIVE_AMPERE_ERROR_MESSAGE
429            return False;
430        return count_good > test_percentile * len(measurements_amps)
431
432    def getBaselineState(self):
433        """This function first disables all sensors, then collects measurements
434        through the power monitor and continuously evaluates if baseline state
435        is reached. Once baseline state is detected, it returns a tuple with
436        status information. If baseline is not detected in a preset maximum
437        number of trials, it returns as well.
438
439        Returns:
440            Returns a tuple (isBaseline, mean_current) where isBaseline is a
441            boolean that is True only if the baseline state for the phone is
442            detected. mean_current is an estimate of the average baseline current
443            for the device, which is valid only if baseline state is detected
444            (if not, it is set to -1)
445        """
446        self.setPowerOn("ALL", False)
447        self.setUsbEnabled(False)
448        print("Waiting %d seconds for baseline state" % DELAY_SCREEN_OFF)
449        time.sleep(DELAY_SCREEN_OFF)
450
451        MEASUREMENT_DURATION_SECONDS_BASELINE_DETECTION = 5  # seconds
452        NUMBER_MEASUREMENTS_BASELINE_DETECTION = (
453            PowerTest.RATE_NOMINAL *
454            MEASUREMENT_DURATION_SECONDS_BASELINE_DETECTION)
455        NUMBER_MEASUREMENTS_BASELINE_VERIFICATION = (
456            NUMBER_MEASUREMENTS_BASELINE_DETECTION * 5)
457        MAX_TRIALS = 50
458
459        collected_baseline_measurements = False
460
461        for tries in xrange(MAX_TRIALS):
462            print("Trial number %d of %d..." % (tries, MAX_TRIALS))
463            measurements = self.collectMeasurements(
464                NUMBER_MEASUREMENTS_BASELINE_DETECTION, PowerTest.RATE_NOMINAL,
465                verbose = False)
466            if self.computeBaselineState(measurements)[0] is True:
467                collected_baseline_measurements = True
468                break
469
470        if collected_baseline_measurements:
471            print("Verifying baseline state over a longer interval "
472                  "in order to double check baseline state")
473            measurements = self.collectMeasurements(
474                NUMBER_MEASUREMENTS_BASELINE_VERIFICATION, PowerTest.RATE_NOMINAL,
475                verbose = False)
476            self.reportErrorRaiseExceptionIf(
477                not measurements, "No background measurements could be taken")
478            retval = self.computeBaselineState(measurements)
479            if retval[0]:
480                print("Verified baseline.")
481                if measurements and LOG_DATA_TO_FILE:
482                    with open("/tmp/cts-power-tests-background-data.log", "w") as f:
483                        for m in measurements:
484                            f.write("%.4f\n" % m)
485            return retval
486        else:
487            return (False, -1)
488
489    def waitForApSuspendMode(self):
490        """This function repeatedly collects measurements until AP suspend and display off
491        mode is detected. After a maximum number of trials, if this state is not reached, it
492        raises an error.
493        Returns:
494            boolean which is True if device was detected to be in suspend state
495        Raises:
496            Power monitor-related exception
497        """
498        print("waitForApSuspendMode(): Sleeping for %d seconds" % DELAY_SCREEN_OFF)
499        time.sleep(DELAY_SCREEN_OFF)
500
501        NUMBER_MEASUREMENTS = 200
502        # Maximum trials for which to collect measurements to get to Ap suspend
503        # state
504        MAX_TRIALS = 50
505
506        got_to_suspend_state = False
507        for count in xrange(MAX_TRIALS):
508            print ("waitForApSuspendMode(): Trial %d of %d" % (count, MAX_TRIALS))
509            measurements = self.collectMeasurements(NUMBER_MEASUREMENTS,
510                                                    PowerTest.RATE_NOMINAL,
511                                                    verbose = False)
512            if self.isApInSuspendState(
513                    measurements, PowerTest.MAX_PERCENTILE_AP_SCREEN_OFF_AMPS,
514                    PowerTest.PERCENTILE_MAX_AP_SCREEN_OFF):
515                got_to_suspend_state = True
516                break
517        self.reportErrorRaiseExceptionIf(
518            got_to_suspend_state is False,
519            msg = "Unable to determine application processor suspend mode status.")
520        print("Got to AP suspend state")
521        return got_to_suspend_state
522
523    def collectMeasurements(self, measurementCount, rate, verbose = True):
524        """Args:
525            measurementCount: Number of measurements to collect from the power
526                              monitor
527            rate: The integer frequency in Hertz at which to collect measurements from
528                  the power monitor
529        Returns:
530            A list containing measurements from the power monitor; that has the
531            requested count of the number of measurements at the specified rate
532        """
533        assert (measurementCount > 0)
534        decimate_by = self._native_hz / rate or 1
535
536        self._power_monitor.StartDataCollection()
537        sub_measurements = []
538        measurements = []
539        tries = 0
540        if verbose: print("")
541        try:
542            while len(measurements) < measurementCount and tries < 5:
543                if tries:
544                    self._power_monitor.StopDataCollection()
545                    self._power_monitor.StartDataCollection()
546                    time.sleep(1.0)
547                tries += 1
548                additional = self._power_monitor.CollectData()
549                if additional is not None:
550                    tries = 0
551                    sub_measurements.extend(additional)
552                    while len(sub_measurements) >= decimate_by:
553                        sub_avg = sum(sub_measurements[0:decimate_by]) / decimate_by
554                        measurements.append(sub_avg)
555                        sub_measurements = sub_measurements[decimate_by:]
556                        if verbose:
557                            # "\33[1A\33[2K" is a special Linux console control
558                            # sequence for moving to the previous line, and
559                            # erasing it; and reprinting new text on that
560                            # erased line.
561                            sys.stdout.write("\33[1A\33[2K")
562                            print ("MEASURED[%d]: %f" % (len(measurements), measurements[-1]))
563        finally:
564            self._power_monitor.StopDataCollection()
565
566        self.reportErrorRaiseExceptionIf(measurementCount > len(measurements),
567                           "Unable to collect all requested measurements")
568        return measurements
569
570    def requestUserAcknowledgment(self, msg):
571        """Post message to user on screen and wait for acknowledgment"""
572        response = self.executeOnDevice(PowerTest.REQUEST_USER_RESPONSE % msg)
573        self.reportErrorRaiseExceptionIf(
574            response != "OK", "Unable to request user acknowledgment")
575
576    def setTestResult(self, test_name, test_result, test_message):
577        """
578        Reports the result of a test to the device
579        Args:
580            test_name: name of the test
581            test_result: Boolean result of the test (True means Pass)
582            test_message: Relevant message
583        """
584        print ("Test %s : %s" % (test_name, test_result))
585
586        response = (
587            self.executeOnDevice(
588                PowerTest.REQUEST_SET_TEST_RESULT %
589                (test_name, test_result, test_message)))
590        self.reportErrorRaiseExceptionIf(
591            response != "OK", "Unable to send test status to Verifier")
592
593    def setPowerOn(self, sensor, powered_on):
594        response = self.executeOnDevice(PowerTest.REQUEST_SENSOR_SWITCH %
595            (("ON" if powered_on else "OFF"), sensor))
596        self.reportErrorRaiseExceptionIf(
597            response == "ERR", "Unable to set sensor %s state" % sensor)
598        logging.info("Set %s %s", sensor, ("ON" if powered_on else "OFF"))
599        return response
600
601    def runSensorPowerTest(
602            self, sensor, max_amperes_allowed, baseline_amps, user_request = None):
603        """
604        Runs power test for a specific sensor; i.e. measures the amperes draw
605        of the phone using monsoon, with the specified sensor mregistered
606        and the phone in suspend state; and verifies that the incremental
607        consumed amperes is within expected bounds.
608        Args:
609            sensor: The specified sensor for which to run the power test
610            max_amperes_allowed: Maximum ampere draw of the device with the
611                    sensor registered and device in suspend state
612            baseline_amps: The power draw of the device when it is in baseline
613                    state (no sensors registered, display off, AP asleep)
614        """
615        self._current_test = ("%s_Power_Test_While_%s" % (
616            sensor, ("Under_Motion" if user_request is not None else "Still")))
617        try:
618            print ("\n\n---------------------------------")
619            if user_request is not None:
620                print ("Running power test on %s under motion." % sensor)
621            else:
622                print ("Running power test on %s while device is still." % sensor)
623            print ("---------------------------------")
624            response = self.executeOnDevice(
625                PowerTest.REQUEST_SENSOR_AVAILABILITY % sensor)
626            if response == "UNAVAILABLE":
627                self.setTestResult(
628                    self._current_test, test_result = "SKIPPED",
629                    test_message = "Sensor %s not available on this platform" % sensor)
630            self.setPowerOn("ALL", False)
631            if response == "UNAVAILABLE":
632                self.setTestResult(
633                    self._current_test, test_result = "SKIPPED",
634                    test_message = "Sensor %s not available on this device" % sensor)
635                return
636            self.reportErrorRaiseExceptionIf(response != "OK", "Unable to set all sensor off")
637            self.executeOnDevice(PowerTest.REQUEST_SCREEN_OFF)
638            self.setUsbEnabled(False)
639            self.setUsbEnabled(True)
640            self.setPowerOn(sensor, True)
641            if user_request is not None:
642                print("===========================================\n" +
643                      "==> Please follow the instructions presented on the device\n" +
644                      "===========================================")
645                self.requestUserAcknowledgment(user_request)
646            self.executeOnDevice(PowerTest.REQUEST_SCREEN_OFF)
647            self.setUsbEnabled(False)
648            self.reportErrorRaiseExceptionIf(
649                response != "OK", "Unable to set sensor %s ON" % sensor)
650
651            self.waitForApSuspendMode()
652            print ("Collecting sensor %s measurements" % sensor)
653            measurements = self.collectMeasurements(PowerTest.SAMPLE_COUNT_NOMINAL,
654                                                    PowerTest.RATE_NOMINAL)
655
656            if measurements and LOG_DATA_TO_FILE:
657                with open("/tmp/cts-power-tests-%s-%s-sensor-data.log" % (sensor,
658                    ("Under_Motion" if user_request is not None else "Still")), "w") as f:
659                    for m in measurements:
660                        f.write("%.4f\n" % m)
661                    self.setUsbEnabled(True, verbose = False)
662                    print("Saving raw data files to device...")
663                    self.executeLocal("adb shell mkdir -p %s" % self._external_storage, False)
664                    self.executeLocal("adb push %s %s/." % (f.name, self._external_storage))
665                    self.setUsbEnabled(False, verbose = False)
666            self.reportErrorRaiseExceptionIf(
667                not measurements, "No measurements could be taken for %s" % sensor)
668            avg = sum(measurements) / len(measurements)
669            squared = [(m - avg) * (m - avg) for m in measurements]
670
671            stddev = math.sqrt(sum(squared) / len(squared))
672            current_diff = avg - baseline_amps
673            self.setUsbEnabled(True)
674            max_power = max(measurements) - avg
675            if current_diff <= max_amperes_allowed:
                # TODO: fail the test if background > current
677                message = (
678                              "Draw is within limits. Sensor delta:%f mAmp   Baseline:%f "
679                              "mAmp   Sensor: %f mAmp  Stddev : %f mAmp  Peak: %f mAmp") % (
680                              current_diff * 1000.0, baseline_amps * 1000.0, avg * 1000.0,
681                              stddev * 1000.0, max_power * 1000.0)
682            else:
683                message = (
684                              "Draw is too high. Current:%f Background:%f   Measured: %f "
685                              "Stddev: %f  Peak: %f") % (
686                              current_diff * 1000.0, baseline_amps * 1000.0, avg * 1000.0,
687                              stddev * 1000.0, max_power * 1000.0)
688            self.setTestResult(
689                self._current_test,
690                ("PASS" if (current_diff <= max_amperes_allowed) else "FAIL"),
691                message)
692            print("Result: " + message)
693        except:
694            traceback.print_exc()
695            self.setTestResult(self._current_test, test_result = "FAIL",
696                               test_message = "Exception occurred during run of test.")
697            raise
698
699    @staticmethod
700    def runTests(max_baseline_amps):
701        testrunner = None
702        try:
703            GENERIC_MOTION_REQUEST = ("\n===> Please press Next and when the "
704                "screen is off, keep the device under motion with only tiny, "
705                "slow movements until the screen turns on again.\nPlease "
706                "refrain from interacting with the screen or pressing any side "
707                "buttons while measurements are taken.")
708            USER_STEPS_REQUEST = ("\n===> Please press Next and when the "
709                "screen is off, then move the device to simulate step motion "
710                "until the screen turns on again.\nPlease refrain from "
711                "interacting with the screen or pressing any side buttons "
712                "while measurements are taken.")
713            testrunner = PowerTest(max_baseline_amps)
714            testrunner.executeOnDevice(
715                PowerTest.REQUEST_SHOW_MESSAGE % "Connected.  Running tests...")
716            is_baseline_success, baseline_amps = testrunner.getBaselineState()
717
718            if is_baseline_success:
719                testrunner.setUsbEnabled(True)
720                # TODO: Enable testing a single sensor
721                testrunner.runSensorPowerTest(
722                    "SIGNIFICANT_MOTION", PowerTest.MAX_SIGMO_AMPS, baseline_amps,
723                    user_request = GENERIC_MOTION_REQUEST)
724                testrunner.runSensorPowerTest(
725                    "STEP_DETECTOR", PowerTest.MAX_STEP_DETECTOR_AMPS, baseline_amps,
726                    user_request = USER_STEPS_REQUEST)
727                testrunner.runSensorPowerTest(
728                    "STEP_COUNTER", PowerTest.MAX_STEP_COUNTER_AMPS, baseline_amps,
729                    user_request = USER_STEPS_REQUEST)
730                testrunner.runSensorPowerTest(
731                    "ACCELEROMETER", PowerTest.MAX_ACCEL_AMPS, baseline_amps,
732                    user_request = GENERIC_MOTION_REQUEST)
733                testrunner.runSensorPowerTest(
734                    "MAGNETIC_FIELD", PowerTest.MAX_MAG_AMPS, baseline_amps,
735                    user_request = GENERIC_MOTION_REQUEST)
736                testrunner.runSensorPowerTest(
737                    "GYROSCOPE", PowerTest.MAX_GYRO_AMPS, baseline_amps,
738                    user_request = GENERIC_MOTION_REQUEST)
739                testrunner.runSensorPowerTest(
740                    "ACCELEROMETER", PowerTest.MAX_ACCEL_AMPS, baseline_amps,
741                    user_request = None)
742                testrunner.runSensorPowerTest(
743                    "MAGNETIC_FIELD", PowerTest.MAX_MAG_AMPS, baseline_amps,
744                    user_request = None)
745                testrunner.runSensorPowerTest(
746                    "GYROSCOPE", PowerTest.MAX_GYRO_AMPS, baseline_amps,
747                    user_request = None)
748                testrunner.runSensorPowerTest(
749                    "SIGNIFICANT_MOTION", PowerTest.MAX_SIGMO_AMPS, baseline_amps,
750                    user_request = None)
751                testrunner.runSensorPowerTest(
752                    "STEP_DETECTOR", PowerTest.MAX_STEP_DETECTOR_AMPS, baseline_amps,
753                    user_request = None)
754                testrunner.runSensorPowerTest(
755                    "STEP_COUNTER", PowerTest.MAX_STEP_COUNTER_AMPS, baseline_amps,
756                    user_request = None)
757            else:
758                print("Could not get to baseline state. This is either because "
759                      "in several trials, the monitor could not measure a set "
760                      "of power measurements that had the specified low "
761                      "variance or the mean measurements were below the "
762                      "expected value. None of the sensor power measurement "
763                      " tests were performed due to not being able to detect "
764                      "baseline state. Please re-run the power tests.")
765        except KeyboardInterrupt:
766            print "Keyboard interrupt from user."
767            raise
768        except:
769            import traceback
770            traceback.print_exc()
771        finally:
772            logging.info("TESTS COMPLETE")
773            if testrunner:
774                try:
775                    testrunner.finalize()
776                except socket.error:
777                    sys.exit(
778                        "===================================================\n"
779                        "Unable to connect to device under test. Make sure \n"
780                        "the device is connected via the usb pass-through, \n"
781                        "the CtsVerifier app is running the SensorPowerTest on \n"
782                        "the device, and USB pass-through is enabled.\n"
783                        "===================================================")
784
def main(argv):
    """Simple command-line interface for a power test application.

    Acts on the module-level FLAGS in this order: validate --avg, confirm
    --voltage with the user, load the --power_monitor implementation and open
    the monitor, apply --voltage and --current, print --status, set
    --usbpassthrough, stream --samples, and finally --run the power tests.

    Args:
        argv: value handed over by the caller; it is not read by this function.

    Returns:
        0 after the --samples loop finished, 1 if that loop was interrupted
        from the keyboard, and None on every other path.  Several error paths
        terminate the process through sys.exit instead of returning.
    """
    useful_flags = ["voltage", "status", "usbpassthrough",
                    "samples", "current", "log", "power_monitor"]
    # NOTE(review): "log" is defined with a default of False (not None) at the
    # bottom of this file, so this help branch looks unreachable in practice --
    # confirm whether "log" belongs in this list.
    if not any(FLAGS.get(f, None) is not None for f in useful_flags):
        # This file has no module docstring, so __doc__ may be None; fall back
        # to an empty string rather than failing on None.strip().
        print((__doc__ or "").strip())
        print(FLAGS.MainModuleHelp())
        return

    if FLAGS.avg and FLAGS.avg < 0:
        logging.error("--avg must be greater than 0")
        return

    if FLAGS.voltage is not None:
        if FLAGS.voltage > 5.5:
            print("!!WARNING: Voltage higher than typical values!!!")
        try:
            response = raw_input(
                "Voltage of %.3f requested.  Confirm this is correct (Y/N)" %
                FLAGS.voltage)
        except (Exception, KeyboardInterrupt):
            # Any failure to read an answer (EOF, Ctrl-C, ...) counts as "no".
            sys.exit("Aborting.")
        if response.upper() != "Y":
            sys.exit("Aborting.")

    if not FLAGS.power_monitor:
        # The join must apply to the newline separator only; joining on the
        # sentence itself would interleave it between the monitor names.
        sys.exit(
            "You must specify a '--power_monitor' option to specify which power "
            "monitor type you are using.\nOne of:\n  " +
            "\n  ".join(available_monitors))
    power_monitors = do_import("power_monitors.%s" % FLAGS.power_monitor)
    try:
        mon = power_monitors.Power_Monitor(device = FLAGS.device)
    except Exception:
        traceback.print_exc()
        sys.exit("No power monitors found")

    if FLAGS.voltage is not None:
        # --ramp is a boolean flag: test its truth value so that --noramp
        # (False) really selects the direct SetVoltage path.
        if FLAGS.ramp:
            mon.RampVoltage(mon.start_voltage, FLAGS.voltage)
        else:
            mon.SetVoltage(FLAGS.voltage)

    if FLAGS.current is not None:
        mon.SetMaxCurrent(FLAGS.current)

    if FLAGS.status:
        items = sorted(mon.GetStatus().items())
        print("\n".join(["%s: %s" % item for item in items]))

    if FLAGS.usbpassthrough:
        passthrough_modes = {"off": 0, "on": 1, "auto": 2}
        if FLAGS.usbpassthrough not in passthrough_modes:
            mon.Close()
            sys.exit("bad pass-through flag: %s" % FLAGS.usbpassthrough)
        mon.SetUsbPassthrough(passthrough_modes[FLAGS.usbpassthrough])

    if FLAGS.samples:
        # Make sure state is normal
        mon.StopDataCollection()
        status = mon.GetStatus()
        # presumably the monitor reports its rate in kHz -- TODO confirm
        native_hz = status["sampleRate"] * 1000

        # Collect and average samples as specified
        mon.StartDataCollection()

        # In case FLAGS.hz doesn't divide native_hz exactly, use this invariant:
        # 'offset' = (consumed samples) * FLAGS.hz - (emitted samples) * native_hz
        # This is the error accumulator in a variation of Bresenham's algorithm.
        emitted = offset = 0
        collected = []
        history_deque = collections.deque()  # past n samples for rolling average

        # TODO: Complicated lines of code below. Refactoring needed
        try:
            last_flush = time.time()
            # A --samples value of -1 means "run until the monitor stops
            # returning data or the user interrupts".
            while emitted < FLAGS.samples or FLAGS.samples == -1:
                # The number of raw samples to consume before emitting the
                # next output.  Floor division keeps 'need' an integer, which
                # the slices below require.
                need = (native_hz - offset + FLAGS.hz - 1) // FLAGS.hz
                if need > len(collected):  # still need more input samples
                    samples = mon.CollectData()
                    if not samples:
                        break
                    collected.extend(samples)
                else:
                    # Have enough data, generate output samples.
                    # Adjust for consuming 'need' input samples.
                    offset += need * FLAGS.hz
                    # The averaged value is the same for every output line
                    # produced from this batch, so compute it once.
                    this_sample = sum(collected[:need]) / need
                    while offset >= native_hz:  # maybe multiple, if FLAGS.hz > native_hz
                        if FLAGS.timestamp:
                            # Timestamp and sample share one line, separated
                            # by a single space.
                            sys.stdout.write("%d " % int(time.time()))

                        if FLAGS.avg:
                            history_deque.appendleft(this_sample)
                            if len(history_deque) > FLAGS.avg:
                                history_deque.pop()
                            print("%f %f" % (this_sample,
                                             sum(history_deque) / len(history_deque)))
                        else:
                            print("%f" % this_sample)
                        sys.stdout.flush()

                        offset -= native_hz
                        emitted += 1  # adjust for emitting 1 output sample
                    collected = collected[need:]
                    now = time.time()
                    if now - last_flush >= 0.99:  # flush every second
                        sys.stdout.flush()
                        last_flush = now
        except KeyboardInterrupt:
            print("interrupted")
            return 1
        finally:
            mon.Close()
        return 0

    if FLAGS.run:
        # A missing --power_monitor has already ended the process above, so no
        # second check is needed here.
        try:
            PowerTest.runTests(FLAGS.max_baseline_amps)
        except KeyboardInterrupt:
            print("Keyboard interrupt from user")
917
if __name__ == "__main__":
    # Command-line flags as (definer, name, default, help text), registered in
    # this order before sys.argv is handed to FLAGS below.
    _flag_specs = (
        (flags.DEFINE_boolean, "status", None, "Print power meter status"),
        (flags.DEFINE_integer, "avg", None,
         "Also report average over last n data points"),
        (flags.DEFINE_float, "voltage", None, "Set output voltage (0 for off)"),
        (flags.DEFINE_float, "current", None, "Set max output current"),
        (flags.DEFINE_string, "usbpassthrough", None,
         "USB control (on, off, auto)"),
        (flags.DEFINE_integer, "samples", None,
         "Collect and print this many samples"),
        (flags.DEFINE_integer, "hz", 5000, "Print this many samples/sec"),
        (flags.DEFINE_string, "device", None,
         "Path to the device in /dev/... (ex:/dev/ttyACM1)"),
        (flags.DEFINE_boolean, "timestamp", None,
         "Also print integer (seconds) timestamp on each line"),
        (flags.DEFINE_boolean, "ramp", True, "Gradually increase voltage"),
        (flags.DEFINE_boolean, "log", False, "Log progress to a file or not"),
        (flags.DEFINE_boolean, "run", False, "Run the test suite for power"),
        (flags.DEFINE_string, "power_monitor", None,
         "Type of power monitor to use"),
        (flags.DEFINE_float, "max_baseline_amps", 0.005,
         "Set maximum baseline current for device being tested"),
    )
    for _define, _flag_name, _flag_default, _flag_help in _flag_specs:
        _define(_flag_name, _flag_default, _flag_help)

    # Pass whatever FLAGS returns for sys.argv on to main() and use main()'s
    # return value as the process exit status.
    _main_argv = FLAGS(sys.argv)
    sys.exit(main(_main_argv))
938