1#!/usr/bin/env python3
2#
3#   Copyright 2019 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the 'License');
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an 'AS IS' BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import math
18
# Keys of the per-test timestamp dicts
START_TIMESTAMP = 'start'
END_TIMESTAMP = 'end'

# Unit types; each one is a top-level key of CONVERSION_TABLES
CURRENT = 'current'
POWER = 'power'
TIME = 'time'
VOLTAGE = 'voltage'

# Unit symbols, grouped by unit type
# -- voltage
MILLIVOLT = 'mV'
VOLT = 'V'
# -- current
MILLIAMP = 'mA'
AMP = 'A'
AMPERE = AMP  # alias of AMP
# -- power
MILLIWATT = 'mW'
WATT = 'W'
# -- time
MILLISECOND = 'ms'
SECOND = 's'
MINUTE = 'm'
HOUR = 'h'

# For every unit type, the factor of each unit relative to that type's base
# unit (the entry whose factor is 1). Converting between two units of the same
# type multiplies by the ratio of their factors.
CONVERSION_TABLES = {
    CURRENT: {MILLIAMP: 0.001, AMP: 1},
    POWER: {MILLIWATT: 0.001, WATT: 1},
    TIME: {MILLISECOND: 0.001, SECOND: 1, MINUTE: 60, HOUR: 3600},
    VOLTAGE: {MILLIVOLT: 0.001, VOLT: 1},
}
62
63
class Metric(object):
    """Base class for describing power measurement values. Each object contains
    a value and a unit. Enables some basic arithmetic operations with other
    measurements of the same unit type.

    Comparison and arithmetic operators convert the right-hand operand into
    the unit of the left-hand operand before working on the raw values. When
    the other operand is not a Metric the operators return NotImplemented, so
    '==' evaluates to False and ordering/arithmetic raise TypeError. Operands
    with incompatible units raise TypeError (see to_unit).

    Note: the class defines __eq__ without __hash__, so on Python 3 instances
    are not hashable.

    Attributes:
        value: Numeric value of the measurement
        _unit_type: Unit type of the measurement (e.g. current, power)
        unit: Unit of the measurement (e.g. W, mA)
        name: Optional label of the measurement, None when not given
    """

    def __init__(self, value, unit_type, unit, name=None):
        """Create a measurement.

        Args:
            value: Numeric value of the measurement.
            unit_type: One of the keys of CONVERSION_TABLES.
            unit: Unit of the value. It is not validated here; an unknown
                unit only surfaces as a TypeError when converting.
            name: Optional label for the measurement.

        Raises:
            TypeError: unit_type is not a key of CONVERSION_TABLES.
        """
        if unit_type not in CONVERSION_TABLES:
            # keys() must be called (and listed) so that the message shows the
            # actual unit type names rather than a method/view repr.
            raise TypeError(
                '%s is not a valid unit type, valid unit types are %s' % (
                    unit_type, str(list(CONVERSION_TABLES.keys()))))
        self.value = value
        self.unit = unit
        self.name = name
        self._unit_type = unit_type

    # Convenience constructor methods
    @staticmethod
    def amps(amps, name=None):
        """Create a new current measurement, in amps."""
        return Metric(amps, CURRENT, AMP, name=name)

    @staticmethod
    def watts(watts, name=None):
        """Create a new power measurement, in watts."""
        return Metric(watts, POWER, WATT, name=name)

    @staticmethod
    def seconds(seconds, name=None):
        """Create a new time measurement, in seconds."""
        return Metric(seconds, TIME, SECOND, name=name)

    # Comparison methods

    def __eq__(self, other):
        """Compare values after converting other into the unit of self."""
        if not isinstance(other, Metric):
            return NotImplemented
        return self.value == other.to_unit(self.unit).value

    def __lt__(self, other):
        """Order by value after converting other into the unit of self."""
        if not isinstance(other, Metric):
            return NotImplemented
        return self.value < other.to_unit(self.unit).value

    def __le__(self, other):
        """Order by value after converting other into the unit of self."""
        if not isinstance(other, Metric):
            return NotImplemented
        # Single conversion; equivalent to (self == other or self < other).
        return self.value <= other.to_unit(self.unit).value

    # Addition and subtraction with other measurements

    def __add__(self, other):
        """Adds measurements of compatible unit types. The result will be in the
        same units as self and keeps the name of self.
        """
        if not isinstance(other, Metric):
            return NotImplemented
        return Metric(self.value + other.to_unit(self.unit).value,
                      self._unit_type, self.unit, name=self.name)

    def __sub__(self, other):
        """Subtracts measurements of compatible unit types. The result will be
        in the same units as self and keeps the name of self.
        """
        if not isinstance(other, Metric):
            return NotImplemented
        return Metric(self.value - other.to_unit(self.unit).value,
                      self._unit_type, self.unit, name=self.name)

    # String representation

    def __str__(self):
        """Format as the value (via %g) immediately followed by the unit."""
        return '%g%s' % (self.value, self.unit)

    def __repr__(self):
        return str(self)

    def to_unit(self, new_unit):
        """Create an equivalent measurement under a different unit.
        e.g. 0.5W -> 500mW

        Args:
            new_unit: Target unit. Must be compatible with current unit.

        Returns: A new measurement with the converted value and unit. The
            name of the measurement is carried over.

        Raises:
            TypeError: the current unit or new_unit is not listed in the
                conversion table of this measurement's unit type.
        """
        try:
            table = CONVERSION_TABLES[self._unit_type]
            # Ratio of the two factors relative to the base unit.
            new_value = self.value * (table[self.unit] / table[new_unit])
        except KeyError:
            raise TypeError('Incompatible units: %s, %s' %
                            (self.unit, new_unit))
        return Metric(new_value, self._unit_type, new_unit, self.name)
153
154
def import_raw_data(path):
    """Create a generator from a Monsoon data file.

    Each non-blank line must consist of exactly two whitespace-separated
    fields: a timestamp followed by a sample. The last character of the
    timestamp field is dropped before parsing (presumably a unit suffix such
    as 's' -- confirm against the tool that writes the file). Blank and
    whitespace-only lines are skipped.

    The file is opened when iteration starts and stays open until the
    generator is exhausted or closed.

    Args:
        path: path to raw data file

    Returns: generator that yields (timestamp, sample) per line, both floats

    Raises:
        ValueError: a non-blank line does not have exactly two fields, or a
            field cannot be parsed as a float.
    """
    with open(path, 'r') as f:
        for line in f:
            fields = line.split()
            if not fields:
                # e.g. a trailing empty line; nothing to parse
                continue
            timestamp, sample = fields
            yield float(timestamp[:-1]), float(sample)
167
168
def generate_test_metrics(raw_data, timestamps=None,
                          voltage=None):
    """Split the data into individual test metrics, based on the timestamps
    given as a dict.

    A sample is counted for every test whose [start, end] window (both ends
    inclusive) contains the sample's timestamp, so overlapping tests each
    receive the sample.

    Args:
        raw_data: raw data as list or generator of (timestamp, sample), with
            timestamps in seconds and samples in amps
        timestamps: dict following the output format of
            instrumentation_proto_parser.get_test_timestamps(). Maps each test
            name to a dict holding the START_TIMESTAMP and END_TIMESTAMP
            values in milliseconds. None is treated as an empty dict.
        voltage: voltage used during measurements. Required whenever
            timestamps is non-empty, because average power is computed for
            every test.

    Returns: dict mapping each test name to a list of Metric objects, in the
        order [avg_current, max_current, min_current, stdev_current,
        avg_power]. Empty when no timestamps are given.

    Raises:
        ValueError: a test is missing its start or end timestamp.
        TypeError: timestamps is non-empty but voltage is None.
    """
    if timestamps is None:
        timestamps = {}

    # Initialize metrics and the time window (in seconds) for each test
    test_metrics = {}
    windows = []
    for seg_name, times in timestamps.items():
        metrics = PowerMetrics(voltage)
        test_metrics[seg_name] = metrics
        try:
            start_ms = times[START_TIMESTAMP]
        except KeyError:
            raise ValueError(
                'Missing start timestamp for test scenario "%s". Refer to '
                'instrumentation_proto.txt for details.' % seg_name)
        start = Metric(start_ms, TIME, MILLISECOND).to_unit(SECOND).value
        try:
            end_ms = times[END_TIMESTAMP]
        except KeyError:
            raise ValueError(
                'Missing end timestamp for test scenario "%s". Test '
                'scenario may have terminated with errors. Refer to '
                'instrumentation_proto.txt for details.' % seg_name)
        end = Metric(end_ms, TIME, MILLISECOND).to_unit(SECOND).value
        windows.append((start, end, metrics))

    # Average power multiplies by the voltage, so a missing voltage would
    # fail with a TypeError anyway. Fail before consuming raw_data (which may
    # be a long, one-shot generator) and with an explicit message.
    if test_metrics and voltage is None:
        raise TypeError(
            'voltage is required to compute average power when timestamps '
            'are given')

    # Assign data to tests based on timestamps
    for timestamp, amps in raw_data:
        for start, end, metrics in windows:
            if start <= timestamp <= end:
                metrics.update_metrics(amps)

    return {
        seg_name: [
            power_metrics.avg_current,
            power_metrics.max_current,
            power_metrics.min_current,
            power_metrics.stdev_current,
            power_metrics.avg_power]
        for seg_name, power_metrics in test_metrics.items()
    }
222
223
class PowerMetrics(object):
    """Class for processing raw power metrics generated by Monsoon measurements.
    Provides useful metrics such as average current, max current, and average
    power. Can generate individual test metrics.

    Samples are folded into running totals by update_metrics(); individual
    samples are not stored. Every numeric metric is returned as a Metric
    object and reports zero while no sample has been recorded.

    See section "Numeric metrics" below for available metrics.
    """

    def __init__(self, voltage):
        """Create a PowerMetrics.

        Args:
            voltage: Voltage of the measurement. It is multiplied with the
                average current in amps to obtain watts, so it must be a
                number for avg_power to work.
        """
        self._voltage = voltage
        self._num_samples = 0
        self._sum_currents = 0
        self._sum_squares = 0
        self._max_current = None
        self._min_current = None
        # Not read or updated by any method of this class.
        self.test_metrics = {}

    def update_metrics(self, sample):
        """Update the running metrics with the current sample.

        Args:
            sample: A current sample in Amps.
        """
        self._num_samples += 1
        self._sum_currents += sample
        self._sum_squares += sample ** 2
        if self._max_current is None or sample > self._max_current:
            self._max_current = sample
        if self._min_current is None or sample < self._min_current:
            self._min_current = sample

    def _stdev_amps(self):
        """Sample standard deviation (n - 1 denominator) of the recorded
        currents as a plain number in amps; 0.0 with fewer than two samples.
        """
        if self._num_samples < 2:
            return 0.0
        mean = self._sum_currents / self._num_samples
        # Mathematically the difference below is never negative, but
        # floating point cancellation can leave it a hair below zero (e.g.
        # for a constant signal), which would make math.sqrt raise
        # "math domain error". Clamp it at zero.
        spread = max(0.0,
                     self._sum_squares - self._num_samples * mean ** 2)
        return math.sqrt(spread / (self._num_samples - 1))

    # Numeric metrics
    @property
    def avg_current(self):
        """Average current, in milliamps."""
        if not self._num_samples:
            return Metric.amps(0, 'avg_current').to_unit(MILLIAMP)
        return (Metric.amps(self._sum_currents / self._num_samples,
                            'avg_current')
                .to_unit(MILLIAMP))

    @property
    def max_current(self):
        """Max current, in milliamps."""
        return Metric.amps(self._max_current or 0, 'max_current').to_unit(
            MILLIAMP)

    @property
    def min_current(self):
        """Min current, in milliamps."""
        return Metric.amps(self._min_current or 0, 'min_current').to_unit(
            MILLIAMP)

    @property
    def stdev_current(self):
        """Standard deviation of current values, in milliamps."""
        return Metric.amps(self._stdev_amps(), 'stdev_current').to_unit(
            MILLIAMP)

    @property
    def avg_power(self):
        """Average power, in milliwatts.

        Computed as the average current in amps times the voltage given at
        construction.
        """
        return Metric.watts(self.avg_current.to_unit(AMP).value * self._voltage,
                            'avg_power').to_unit(MILLIWATT)
298