1# Copyright 2018 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""Helper class for power autotests requiring telemetry devices."""
6
7import logging
8import time
9
10import numpy
11
12CUSTOM_START = 'PowerTelemetryLogger custom start.'
13CUSTOM_END = 'PowerTelemetryLogger custom end.'
14INTERPOLATION_RESOLUTION = 6
15
16
17class TelemetryUtilsError(Exception):
18    """Error class for issues using these utilities."""
19
20
21def interpolate_missing_data(data, max_nan_ratio=None, max_sample_gap=None,
22                             max_sample_time_gap=None, timeline=None):
23    """Interpolate missing power readings in data.
24
25    @param data: array of values
26    @min_nan_ratio: optional, float, max acceptable ratio of NaN to real values
27    @max_sample_gap: optional, int, max acceptable number of NaN in a row
28    @max_sample_time_gap: optional, float, max measurement gap in seconds
29                       Note: supplying max_nan_time_gap requires timeline
30    @timeline: array of same size as |data| with timeline info for each sample
31
32    @returns: list, array |data| with missing values interpolated.
33    @raises: TelemetryUtilsError if
34              - the ratio of NaN is higher than |max_nan_ratio| (if supplied)
35              - no NaN gap is larger than |max_sample_gap| (if supplied)
36              - no NaN gap takes more time in |timeline| than
37                |max_sample_time_gap| (if supplied)
38              - all values in |data| are NaN.
39    """
40    if max_sample_time_gap is not None and timeline is None:
41        # These are mutually required.
42        raise TelemetryUtilsError('Supplying max_sample_time_gap requires a '
43                                  'timeline.')
44    data = numpy.array(data)
45    nan_data = numpy.isnan(data)
46    if max_nan_ratio:
47        # Validate the ratio if a ratio is supplied.
48        nan_ratio = float(sum(nan_data)) / len(data)
49        if nan_ratio > max_nan_ratio:
50            # There are too many errors in this source.
51            # Throw an error so the user has a chance to adjust their power
52            # collection setup.
53            raise TelemetryUtilsError('NaN ratio of %.02f '
54                                      ' - Max is %.02f.' % (nan_ratio,
55                                                            max_nan_ratio))
56    if max_sample_gap is not None or max_sample_time_gap is not None:
57        # Flag to keep track whether the loop is in a measurement gap (NaN).
58        consecutive_nan_start = None
59        # Add a dummy at the end to make sure the iteration covers all real
60        # examples.
61        for i, isnan in enumerate(numpy.append(nan_data, False)):
62            if isnan and consecutive_nan_start is None:
63                consecutive_nan_start = i
64            if not isnan and consecutive_nan_start is not None:
65                consecutive_nans = i - consecutive_nan_start
66                if max_sample_gap and consecutive_nans >= max_sample_gap:
67                    # Reject if there are too many consecutive failures.
68                    raise TelemetryUtilsError('Too many consecutive NaN samples'
69                                              ': %d.' % consecutive_nans)
70                if max_sample_time_gap:
71                    # Checks whether the first valid timestamp before the
72                    # gap exists and whether the first valid timestamp after the
73                    # gap exists.
74                    if consecutive_nan_start == 0 or i == len(data):
75                        # We cannot determine the gap timeline properly here
76                        # as the gap either starts or ends with the time.
77                        # Ignore for now.
78                        continue
79                    sample_time_gap = (timeline[i] -
80                                       timeline[consecutive_nan_start-1])
81                    if sample_time_gap > max_sample_time_gap:
82                        raise TelemetryUtilsError('Excessively long sample gap '
83                                                  'of %.02fs. Longest '
84                                                  'permissible gap is %.02fs.'
85                                                  % (sample_time_gap,
86                                                     max_sample_time_gap))
87
88                # Reset the flag for the next gap.
89                consecutive_nan_start = None
90    # At this point the data passed all validations required.
91    sample_idx = numpy.arange(len(data))[[~nan_data]]
92    sample_vals = data[[~nan_data]]
93    if not len(sample_idx):
94        raise TelemetryUtilsError('Data has no valid readings. Cannot '
95                                  'interpolate.')
96    output = numpy.interp(range(len(data)), sample_idx, sample_vals)
97    return [round(x, INTERPOLATION_RESOLUTION) for x in output]
98
99def log_event_ts(message=None, timestamp=None, offset=0):
100    """Log the event and timestamp for parsing later.
101
102    @param message: description of the event.
103    @param timestamp: timestamp to for the event, if not provided, default to
104           current time. Local seconds since epoch.
105    @param offset: offset in seconds from the provided timestamp, or offset from
106           current time if timestamp is not provided. Can be positive or
107           negative.
108    """
109    if not message:
110        return
111    if timestamp:
112        ts = timestamp + offset
113    else:
114        ts = time.time() + offset
115    logging.debug("%s %s", message, ts)
116
117def start_measurement(timestamp=None, offset=0):
118    """Mark the start of power telemetry measurement.
119
120    Optional. Use only once in the client side test that is wrapped in the
121    power measurement wrapper tests to help pinpoint exactly where power
122    telemetry data should start. PowerTelemetryLogger will trim off excess data
123    before this point. If not used, power telemetry data will start right before
124    the client side test.
125    @param timestamp: timestamp for the start of measurement, if not provided,
126           default to current time. Local seconds since epoch.
127    @param offset: offset in seconds from the provided timestamp, or offset from
128           current time if timestamp is not provided. Can be positive or
129           negative.
130    """
131    log_event_ts(CUSTOM_START, timestamp, offset)
132
133def end_measurement(timestamp=None, offset=0):
134    """Mark the end of power telemetry measurement.
135
136    Optional. Use only once in the client side test that is wrapped in the
137    power measurement wrapper tests to help pinpoint exactly where power
138    telemetry data should end. PowerTelemetryLogger will trim off excess data
139    after this point. If not used, power telemetry data will end right after the
140    client side test.
141    @param timestamp: timestamp for the end of measurement, if not provided,
142           default to current time. Local seconds since epoch.
143    @param offset: offset in seconds from the provided timestamp, or offset from
144           current time if timestamp is not provided. Can be positive or
145           negative.
146    """
147    log_event_ts(CUSTOM_END, timestamp, offset)
148