1#!/usr/bin/env python3
2#
3#   Copyright 2019 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the 'License');
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an 'AS IS' BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import math
18
19from acts.test_utils.instrumentation import instrumentation_proto_parser \
20    as parser
21from acts.test_utils.instrumentation.instrumentation_base_test \
22    import InstrumentationTestError
23
24# Unit type constants
25CURRENT = 'current'
26POWER = 'power'
27TIME = 'time'
28
29# Unit constants
30MILLIAMP = 'mA'
31AMP = 'A'
32AMPERE = AMP
33MILLIWATT = 'mW'
34WATT = 'W'
35MILLISECOND = 'ms'
36SECOND = 's'
37MINUTE = 'm'
38HOUR = 'h'
39
40CONVERSION_TABLES = {
41    CURRENT: {
42        MILLIAMP: 0.001,
43        AMP: 1
44    },
45    POWER: {
46        MILLIWATT: 0.001,
47        WATT: 1
48    },
49    TIME: {
50        MILLISECOND: 0.001,
51        SECOND: 1,
52        MINUTE: 60,
53        HOUR: 3600
54    }
55}
56
57
58class Measurement(object):
59    """Base class for describing power measurement values. Each object contains
60    an value and a unit. Enables some basic arithmetic operations with other
61    measurements of the same unit type.
62
63    Attributes:
64        _value: Numeric value of the measurement
65        _unit_type: Unit type of the measurement (e.g. current, power)
66        _unit: Unit of the measurement (e.g. W, mA)
67    """
68
69    def __init__(self, value, unit_type, unit):
70        if unit_type not in CONVERSION_TABLES:
71            raise TypeError('%s is not a valid unit type' % unit_type)
72        self._value = value
73        self._unit_type = unit_type
74        self._unit = unit
75
76    # Convenience constructor methods
77    @staticmethod
78    def amps(amps):
79        """Create a new current measurement, in amps."""
80        return Measurement(amps, CURRENT, AMP)
81
82    @staticmethod
83    def watts(watts):
84        """Create a new power measurement, in watts."""
85        return Measurement(watts, POWER, WATT)
86
87    @staticmethod
88    def seconds(seconds):
89        """Create a new time measurement, in seconds."""
90        return Measurement(seconds, TIME, SECOND)
91
92    # Comparison methods
93
94    def __eq__(self, other):
95        return self.value == other.to_unit(self._unit).value
96
97    def __lt__(self, other):
98        return self.value < other.to_unit(self._unit).value
99
100    def __le__(self, other):
101        return self == other or self < other
102
103    # Addition and subtraction with other measurements
104
105    def __add__(self, other):
106        """Adds measurements of compatible unit types. The result will be in the
107        same units as self.
108        """
109        return Measurement(self.value + other.to_unit(self._unit).value,
110                           self._unit_type, self._unit)
111
112    def __sub__(self, other):
113        """Subtracts measurements of compatible unit types. The result will be
114        in the same units as self.
115        """
116        return Measurement(self.value - other.to_unit(self._unit).value,
117                           self._unit_type, self._unit)
118
119    # String representation
120
121    def __str__(self):
122        return '%g%s' % (self._value, self._unit)
123
124    def __repr__(self):
125        return str(self)
126
127    @property
128    def unit(self):
129        return self._unit
130
131    @property
132    def value(self):
133        return self._value
134
135    def to_unit(self, new_unit):
136        """Create an equivalent measurement under a different unit.
137        e.g. 0.5W -> 500mW
138
139        Args:
140            new_unit: Target unit. Must be compatible with current unit.
141
142        Returns: A new measurement with the converted value and unit.
143        """
144        try:
145            new_value = self._value * (
146                CONVERSION_TABLES[self._unit_type][self._unit] /
147                CONVERSION_TABLES[self._unit_type][new_unit])
148        except KeyError:
149            raise TypeError('Incompatible units: %s, %s' %
150                            (self._unit, new_unit))
151        return Measurement(new_value, self._unit_type, new_unit)
152
153
154class PowerMetrics(object):
155    """Class for processing raw power metrics generated by Monsoon measurements.
156    Provides useful metrics such as average current, max current, and average
157    power. Can generate individual test metrics.
158
159    See section "Numeric metrics" below for available metrics.
160    """
161
162    def __init__(self, voltage, start_time=0):
163        """Create a PowerMetrics.
164
165        Args:
166            voltage: Voltage of the measurement
167            start_time: Start time of the measurement. Used for generating
168                test-specific metrics.
169        """
170        self._voltage = voltage
171        self._start_time = start_time
172        self._num_samples = 0
173        self._sum_currents = 0
174        self._sum_squares = 0
175        self._max_current = None
176        self._min_current = None
177        self.test_metrics = {}
178
179    @staticmethod
180    def import_raw_data(path):
181        """Create a generator from a Monsoon data file.
182
183        Args:
184            path: path to raw data file
185
186        Returns: generator that yields (timestamp, sample) per line
187        """
188        with open(path, 'r') as f:
189            for line in f:
190                time, sample = line.split()
191                yield float(time[:-1]), float(sample)
192
193    def update_metrics(self, sample):
194        """Update the running metrics with the current sample.
195
196        Args:
197            sample: A current sample in Amps.
198        """
199        self._num_samples += 1
200        self._sum_currents += sample
201        self._sum_squares += sample ** 2
202        if self._max_current is None or sample > self._max_current:
203            self._max_current = sample
204        if self._min_current is None or sample < self._min_current:
205            self._min_current = sample
206
207    def generate_test_metrics(self, raw_data, test_timestamps=None):
208        """Split the data into individual test metrics, based on the timestamps
209        given as a dict.
210
211        Args:
212            raw_data: raw data as list or generator of (timestamp, sample)
213            test_timestamps: dict following the output format of
214                instrumentation_proto_parser.get_test_timestamps()
215        """
216
217        # Initialize metrics for each test
218        if test_timestamps is None:
219            test_timestamps = {}
220        test_starts = {}
221        test_ends = {}
222        for test_name, times in test_timestamps.items():
223            self.test_metrics[test_name] = PowerMetrics(
224                self._voltage, self._start_time)
225            try:
226                test_starts[test_name] = Measurement(
227                    times[parser.START_TIMESTAMP], TIME, MILLISECOND) \
228                                             .to_unit(SECOND).value - self._start_time
229            except KeyError:
230                raise InstrumentationTestError(
231                    'Missing start timestamp for test scenario "%s". Refer to '
232                    'instrumentation_proto.txt for details.' % test_name)
233            try:
234                test_ends[test_name] = Measurement(
235                    times[parser.END_TIMESTAMP], TIME, MILLISECOND) \
236                                           .to_unit(SECOND).value - self._start_time
237            except KeyError:
238                raise InstrumentationTestError(
239                    'Missing end timestamp for test scenario "%s". Test '
240                    'scenario may have terminated with errors. Refer to '
241                    'instrumentation_proto.txt for details.' % test_name)
242
243        # Assign data to tests based on timestamps
244        for timestamp, sample in raw_data:
245            self.update_metrics(sample)
246            for test_name in test_timestamps:
247                if test_starts[test_name] <= timestamp <= test_ends[test_name]:
248                    self.test_metrics[test_name].update_metrics(sample)
249
250    # Numeric metrics
251
252    ALL_METRICS = ('avg_current', 'max_current', 'min_current', 'stdev_current',
253                   'avg_power')
254
255    @property
256    def avg_current(self):
257        """Average current, in milliamps."""
258        if not self._num_samples:
259            return Measurement.amps(0).to_unit(MILLIAMP)
260        return (Measurement.amps(self._sum_currents / self._num_samples)
261                .to_unit(MILLIAMP))
262
263    @property
264    def max_current(self):
265        """Max current, in milliamps."""
266        return Measurement.amps(self._max_current or 0).to_unit(MILLIAMP)
267
268    @property
269    def min_current(self):
270        """Min current, in milliamps."""
271        return Measurement.amps(self._min_current or 0).to_unit(MILLIAMP)
272
273    @property
274    def stdev_current(self):
275        """Standard deviation of current values, in milliamps."""
276        if self._num_samples < 2:
277            return Measurement.amps(0).to_unit(MILLIAMP)
278        stdev = math.sqrt(
279            (self._sum_squares - (
280                self._num_samples * self.avg_current.to_unit(AMP).value ** 2))
281            / (self._num_samples - 1))
282        return Measurement.amps(stdev).to_unit(MILLIAMP)
283
284    def current_to_power(self, current):
285        """Converts a current value to a power value."""
286        return (Measurement.watts(current.to_unit(AMP).value * self._voltage))
287
288    @property
289    def avg_power(self):
290        """Average power, in milliwatts."""
291        return self.current_to_power(self.avg_current).to_unit(MILLIWATT)
292
293    @property
294    def summary(self):
295        """A summary of test metrics"""
296        return {'average_current': str(self.avg_current),
297                'max_current': str(self.max_current),
298                'average_power': str(self.avg_power)}
299