1#!/usr/bin/env python3 2# 3# Copyright 2019 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the 'License'); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an 'AS IS' BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17import math 18 19from acts.test_utils.instrumentation import instrumentation_proto_parser \ 20 as parser 21from acts.test_utils.instrumentation.instrumentation_base_test \ 22 import InstrumentationTestError 23 24# Unit type constants 25CURRENT = 'current' 26POWER = 'power' 27TIME = 'time' 28 29# Unit constants 30MILLIAMP = 'mA' 31AMP = 'A' 32AMPERE = AMP 33MILLIWATT = 'mW' 34WATT = 'W' 35MILLISECOND = 'ms' 36SECOND = 's' 37MINUTE = 'm' 38HOUR = 'h' 39 40CONVERSION_TABLES = { 41 CURRENT: { 42 MILLIAMP: 0.001, 43 AMP: 1 44 }, 45 POWER: { 46 MILLIWATT: 0.001, 47 WATT: 1 48 }, 49 TIME: { 50 MILLISECOND: 0.001, 51 SECOND: 1, 52 MINUTE: 60, 53 HOUR: 3600 54 } 55} 56 57 58class Measurement(object): 59 """Base class for describing power measurement values. Each object contains 60 an value and a unit. Enables some basic arithmetic operations with other 61 measurements of the same unit type. 62 63 Attributes: 64 _value: Numeric value of the measurement 65 _unit_type: Unit type of the measurement (e.g. current, power) 66 _unit: Unit of the measurement (e.g. W, mA) 67 """ 68 69 def __init__(self, value, unit_type, unit): 70 if unit_type not in CONVERSION_TABLES: 71 raise TypeError('%s is not a valid unit type' % unit_type) 72 self._value = value 73 self._unit_type = unit_type 74 self._unit = unit 75 76 # Convenience constructor methods 77 @staticmethod 78 def amps(amps): 79 """Create a new current measurement, in amps.""" 80 return Measurement(amps, CURRENT, AMP) 81 82 @staticmethod 83 def watts(watts): 84 """Create a new power measurement, in watts.""" 85 return Measurement(watts, POWER, WATT) 86 87 @staticmethod 88 def seconds(seconds): 89 """Create a new time measurement, in seconds.""" 90 return Measurement(seconds, TIME, SECOND) 91 92 # Comparison methods 93 94 def __eq__(self, other): 95 return self.value == other.to_unit(self._unit).value 96 97 def __lt__(self, other): 98 return self.value < other.to_unit(self._unit).value 99 100 def __le__(self, other): 101 return self == other or self < other 102 103 # Addition and subtraction with other measurements 104 105 def __add__(self, other): 106 """Adds measurements of compatible unit types. The result will be in the 107 same units as self. 108 """ 109 return Measurement(self.value + other.to_unit(self._unit).value, 110 self._unit_type, self._unit) 111 112 def __sub__(self, other): 113 """Subtracts measurements of compatible unit types. The result will be 114 in the same units as self. 115 """ 116 return Measurement(self.value - other.to_unit(self._unit).value, 117 self._unit_type, self._unit) 118 119 # String representation 120 121 def __str__(self): 122 return '%g%s' % (self._value, self._unit) 123 124 def __repr__(self): 125 return str(self) 126 127 @property 128 def unit(self): 129 return self._unit 130 131 @property 132 def value(self): 133 return self._value 134 135 def to_unit(self, new_unit): 136 """Create an equivalent measurement under a different unit. 137 e.g. 0.5W -> 500mW 138 139 Args: 140 new_unit: Target unit. Must be compatible with current unit. 141 142 Returns: A new measurement with the converted value and unit. 143 """ 144 try: 145 new_value = self._value * ( 146 CONVERSION_TABLES[self._unit_type][self._unit] / 147 CONVERSION_TABLES[self._unit_type][new_unit]) 148 except KeyError: 149 raise TypeError('Incompatible units: %s, %s' % 150 (self._unit, new_unit)) 151 return Measurement(new_value, self._unit_type, new_unit) 152 153 154class PowerMetrics(object): 155 """Class for processing raw power metrics generated by Monsoon measurements. 156 Provides useful metrics such as average current, max current, and average 157 power. Can generate individual test metrics. 158 159 See section "Numeric metrics" below for available metrics. 160 """ 161 162 def __init__(self, voltage, start_time=0): 163 """Create a PowerMetrics. 164 165 Args: 166 voltage: Voltage of the measurement 167 start_time: Start time of the measurement. Used for generating 168 test-specific metrics. 169 """ 170 self._voltage = voltage 171 self._start_time = start_time 172 self._num_samples = 0 173 self._sum_currents = 0 174 self._sum_squares = 0 175 self._max_current = None 176 self._min_current = None 177 self.test_metrics = {} 178 179 @staticmethod 180 def import_raw_data(path): 181 """Create a generator from a Monsoon data file. 182 183 Args: 184 path: path to raw data file 185 186 Returns: generator that yields (timestamp, sample) per line 187 """ 188 with open(path, 'r') as f: 189 for line in f: 190 time, sample = line.split() 191 yield float(time[:-1]), float(sample) 192 193 def update_metrics(self, sample): 194 """Update the running metrics with the current sample. 195 196 Args: 197 sample: A current sample in Amps. 198 """ 199 self._num_samples += 1 200 self._sum_currents += sample 201 self._sum_squares += sample ** 2 202 if self._max_current is None or sample > self._max_current: 203 self._max_current = sample 204 if self._min_current is None or sample < self._min_current: 205 self._min_current = sample 206 207 def generate_test_metrics(self, raw_data, test_timestamps=None): 208 """Split the data into individual test metrics, based on the timestamps 209 given as a dict. 210 211 Args: 212 raw_data: raw data as list or generator of (timestamp, sample) 213 test_timestamps: dict following the output format of 214 instrumentation_proto_parser.get_test_timestamps() 215 """ 216 217 # Initialize metrics for each test 218 if test_timestamps is None: 219 test_timestamps = {} 220 test_starts = {} 221 test_ends = {} 222 for test_name, times in test_timestamps.items(): 223 self.test_metrics[test_name] = PowerMetrics( 224 self._voltage, self._start_time) 225 try: 226 test_starts[test_name] = Measurement( 227 times[parser.START_TIMESTAMP], TIME, MILLISECOND) \ 228 .to_unit(SECOND).value - self._start_time 229 except KeyError: 230 raise InstrumentationTestError( 231 'Missing start timestamp for test scenario "%s". Refer to ' 232 'instrumentation_proto.txt for details.' % test_name) 233 try: 234 test_ends[test_name] = Measurement( 235 times[parser.END_TIMESTAMP], TIME, MILLISECOND) \ 236 .to_unit(SECOND).value - self._start_time 237 except KeyError: 238 raise InstrumentationTestError( 239 'Missing end timestamp for test scenario "%s". Test ' 240 'scenario may have terminated with errors. Refer to ' 241 'instrumentation_proto.txt for details.' % test_name) 242 243 # Assign data to tests based on timestamps 244 for timestamp, sample in raw_data: 245 self.update_metrics(sample) 246 for test_name in test_timestamps: 247 if test_starts[test_name] <= timestamp <= test_ends[test_name]: 248 self.test_metrics[test_name].update_metrics(sample) 249 250 # Numeric metrics 251 252 ALL_METRICS = ('avg_current', 'max_current', 'min_current', 'stdev_current', 253 'avg_power') 254 255 @property 256 def avg_current(self): 257 """Average current, in milliamps.""" 258 if not self._num_samples: 259 return Measurement.amps(0).to_unit(MILLIAMP) 260 return (Measurement.amps(self._sum_currents / self._num_samples) 261 .to_unit(MILLIAMP)) 262 263 @property 264 def max_current(self): 265 """Max current, in milliamps.""" 266 return Measurement.amps(self._max_current or 0).to_unit(MILLIAMP) 267 268 @property 269 def min_current(self): 270 """Min current, in milliamps.""" 271 return Measurement.amps(self._min_current or 0).to_unit(MILLIAMP) 272 273 @property 274 def stdev_current(self): 275 """Standard deviation of current values, in milliamps.""" 276 if self._num_samples < 2: 277 return Measurement.amps(0).to_unit(MILLIAMP) 278 stdev = math.sqrt( 279 (self._sum_squares - ( 280 self._num_samples * self.avg_current.to_unit(AMP).value ** 2)) 281 / (self._num_samples - 1)) 282 return Measurement.amps(stdev).to_unit(MILLIAMP) 283 284 def current_to_power(self, current): 285 """Converts a current value to a power value.""" 286 return (Measurement.watts(current.to_unit(AMP).value * self._voltage)) 287 288 @property 289 def avg_power(self): 290 """Average power, in milliwatts.""" 291 return self.current_to_power(self.avg_current).to_unit(MILLIWATT) 292 293 @property 294 def summary(self): 295 """A summary of test metrics""" 296 return {'average_current': str(self.avg_current), 297 'max_current': str(self.max_current), 298 'average_power': str(self.avg_power)} 299