1#!/usr/bin/env python3
2#
3#   Copyright 2019 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16import logging
17import time
18
19from acts.controllers.monsoon_lib.api import common
20from acts.controllers.monsoon_lib.api.common import MonsoonError
21from acts.controllers.monsoon_lib.api.common import PassthroughStates
22
23
class BaseMonsoon(object):
    """The base class for all Monsoon interface devices.

    Subclasses are expected to define MIN_VOLTAGE and MAX_VOLTAGE and to
    implement every method in this class that raises NotImplementedError.

    Attributes:
        on_reconnect: The function to call when Monsoon has reconnected USB.
            Raises TimeoutError if the device cannot be found.
        on_disconnect: The function to call when Monsoon has disconnected USB.
    """

    # The minimum non-zero supported voltage for the given Monsoon device.
    MIN_VOLTAGE = NotImplemented

    # The maximum practical voltage for the given Monsoon device.
    MAX_VOLTAGE = NotImplemented

    # When ramping voltage, the rate in volts/second to increase the voltage.
    VOLTAGE_RAMP_RATE = 3

    # The time step between voltage increments. This value does not need to be
    # modified.
    VOLTAGE_RAMP_TIME_STEP = .1

    def __init__(self):
        self._log = logging.getLogger()
        # Default callbacks are no-ops until replaced through attach_device(),
        # set_on_disconnect() or set_on_reconnect().
        self.on_disconnect = lambda: None
        self.on_reconnect = lambda: None

    @classmethod
    def get_closest_valid_voltage(cls, voltage):
        """Returns the nearest valid voltage value.

        Values below half of MIN_VOLTAGE map to 0; every other value is
        clamped into the range [MIN_VOLTAGE, MAX_VOLTAGE].

        Args:
            voltage: The requested voltage.

        Returns:
            0, or the requested voltage clamped to [MIN_VOLTAGE, MAX_VOLTAGE].
        """
        if voltage < cls.MIN_VOLTAGE / 2:
            return 0
        else:
            return max(cls.MIN_VOLTAGE, min(voltage, cls.MAX_VOLTAGE))

    @classmethod
    def is_voltage_valid(cls, voltage):
        """Returns True iff the given voltage can be set on the device.

        Valid voltage values are {x | x ∈ {0} ∪ [MIN_VOLTAGE, MAX_VOLTAGE]}.
        """
        # A voltage is valid exactly when normalizing it leaves it unchanged.
        return cls.get_closest_valid_voltage(voltage) == voltage

    @classmethod
    def validate_voltage(cls, voltage):
        """Raises a MonsoonError if the given voltage cannot be set."""
        if not cls.is_voltage_valid(voltage):
            raise MonsoonError('Invalid voltage %s. Voltage must be zero or '
                               'within range [%s, %s].' %
                               (voltage, cls.MIN_VOLTAGE, cls.MAX_VOLTAGE))

    def set_voltage_safe(self, voltage):
        """Sets the output voltage of monsoon to a safe value.

        This function is effectively:
            self.set_voltage(self.get_closest_valid_voltage(voltage)).

        Args:
            voltage: The voltage to set the output to. If it is not a valid
                voltage, the closest valid voltage is used instead and a
                debug message is logged.
        """
        normalized_voltage = self.get_closest_valid_voltage(voltage)
        if voltage != normalized_voltage:
            # Report the value the caller asked for (not the corrected one),
            # along with the value that is actually applied.
            self._log.debug(
                'Requested voltage %sV is invalid. Using %sV instead.',
                voltage, normalized_voltage)
        self.set_voltage(normalized_voltage)

    def ramp_voltage(self, start, end):
        """Ramps up the voltage to the specified end voltage.

        Increments the voltage by VOLTAGE_RAMP_RATE * VOLTAGE_RAMP_TIME_STEP
        volts every VOLTAGE_RAMP_TIME_STEP seconds. Each intermediate value is
        normalized with get_closest_valid_voltage() before being set; the
        final value `end` is set as given.

        Args:
            start: The starting voltage
            end: the end voltage. Must be higher than the starting voltage.
                If it is not, no ramping occurs and `end` is set directly.
        """
        voltage = start

        while voltage < end:
            self.set_voltage(self.get_closest_valid_voltage(voltage))
            voltage += self.VOLTAGE_RAMP_RATE * self.VOLTAGE_RAMP_TIME_STEP
            time.sleep(self.VOLTAGE_RAMP_TIME_STEP)
        self.set_voltage(end)

    def usb(self, state):
        """Sets the monsoon's USB passthrough mode.

        This is specific to the USB port in front of the monsoon box which
        connects to the powered device, NOT the USB that is used to talk to the
        monsoon itself.

        If the passthrough mode is already the requested one, this returns
        immediately without invoking any connection callbacks.

        Args:
            state: The state to set the USB passthrough to. Can either be the
                string name of the state or the integer value.

                "Off" or 0 means USB always off.
                "On" or 1 means USB always on.
                "Auto" or 2 means USB is automatically turned off during
                    sampling, and turned back on after sampling.

        Raises:
            ValueError if the state given is invalid.
            TimeoutError if unable to set the passthrough mode within a minute,
                or if the device was not found after setting the state to ON.
        """
        expected_state = None
        states_dict = common.PASSTHROUGH_STATES
        if isinstance(state, str):
            # String names are matched case-insensitively.
            normalized_state = state.lower()
            expected_state = states_dict.get(normalized_state, None)
        elif state in states_dict.values():
            expected_state = state

        if expected_state is None:
            raise ValueError(
                'USB passthrough state %s is not a valid state. '
                'Expected any of %s.' % (repr(state), states_dict))
        if self.status.usbPassthroughMode == expected_state:
            return

        # Notify the listener before the mode change is requested.
        if expected_state in [PassthroughStates.OFF, PassthroughStates.AUTO]:
            self.on_disconnect()

        # Re-issue the mode change once per second until the reported status
        # matches, giving up after the time limit.
        start_time = time.time()
        time_limit_seconds = 60
        while self.status.usbPassthroughMode != expected_state:
            current_time = time.time()
            if current_time >= start_time + time_limit_seconds:
                raise TimeoutError('Setting USB mode timed out after %s '
                                   'seconds.' % time_limit_seconds)
            self._set_usb_passthrough_mode(expected_state)
            time.sleep(1)
        self._log.info('Monsoon usbPassthroughMode is now "%s"',
                       state)

        if expected_state in [PassthroughStates.ON]:
            self._on_reconnect()

    def attach_device(self, android_device):
        """Deprecated. Use the connection callbacks instead.

        Installs on_reconnect/on_disconnect callbacks that start and stop the
        given device's services around USB passthrough changes.

        Args:
            android_device: The device whose services are managed by the
                installed callbacks.
        """

        def on_reconnect():
            # Make sure the device is connected and available for commands.
            android_device.wait_for_boot_completion()
            android_device.start_services()
            # Release wake lock to put device into sleep.
            android_device.droid.goToSleepNow()
            self._log.info('Dut reconnected.')

        def on_disconnect():
            android_device.stop_services()
            time.sleep(1)

        self.on_reconnect = on_reconnect
        self.on_disconnect = on_disconnect

    def set_on_disconnect(self, callback):
        """Sets the callback to be called when Monsoon disconnects USB."""
        self.on_disconnect = callback

    def set_on_reconnect(self, callback):
        """Sets the callback to be called when Monsoon reconnects USB."""
        self.on_reconnect = callback

    def take_samples(self, assembly_line):
        """Runs the sampling procedure based on the given assembly line.

        Args:
            assembly_line: The object whose run() method performs the sampling.
        """
        # Sampling is always done in a separate process. Release the Monsoon
        # so the child process can sample from the Monsoon.
        self.release_monsoon_connection()

        try:
            assembly_line.run()
        finally:
            # Always re-establish the connection, even if sampling failed.
            self.establish_monsoon_connection()

    def measure_power(self,
                      duration,
                      measure_after_seconds=0,
                      hz=5000,
                      output_path=None,
                      transformers=None):
        """Measure power consumption of the attached device.

        This function is a default implementation of measuring power consumption
        during gathering measurements. For offline methods, use take_samples()
        with a custom AssemblyLine.

        Args:
            duration: Amount of time to measure power for. Note:
                total_duration = duration + measure_after_seconds
            measure_after_seconds: Number of seconds to wait before beginning
                reading measurement.
            hz: The number of samples to collect per second. Must be a factor
                of 5000.
            output_path: The location to write the gathered data to.
            transformers: A list of Transformer objects that receive passed-in
                          samples. Runs in order sent.

        Returns:
            A MonsoonData object with the measured power data.
        """
        raise NotImplementedError()

    def set_voltage(self, voltage):
        """Sets the output voltage of monsoon.

        Args:
            voltage: The voltage to set the output to.
        """
        raise NotImplementedError()

    def set_max_current(self, amperes):
        """Sets monsoon's max output current.

        Args:
            amperes: The max current in A.
        """
        raise NotImplementedError()

    def set_max_initial_current(self, amperes):
        """Sets the max power-up/initial current.

        Args:
            amperes: The max initial current allowed in amperes.
        """
        raise NotImplementedError()

    @property
    def status(self):
        """Gets the status params of monsoon.

        Returns:
            An object holding the status params. NOTE(review): usb() reads the
            passthrough mode via attribute access (status.usbPassthroughMode),
            so implementations must expose at least that attribute.
        """
        raise NotImplementedError()

    def _on_reconnect(self):
        """Reconnects the DUT over USB.

        Raises:
            TimeoutError upon failure to reconnect over USB.
        """
        self._log.info('Reconnecting dut.')
        # Wait for two seconds to ensure that the device is ready, then
        # attempt to reconnect. If reconnect times out, reset the passthrough
        # state and try again.
        time.sleep(2)
        try:
            self.on_reconnect()
        except TimeoutError as err:
            self._log.info('Toggling USB and trying again. %s', err)
            self.usb(PassthroughStates.OFF)
            time.sleep(1)
            # NOTE(review): usb(ON) itself calls _on_reconnect() when it
            # changes the mode, so the on_reconnect() call below may be a
            # second invocation of the callback -- confirm this is intended.
            self.usb(PassthroughStates.ON)
            self.on_reconnect()

    def _set_usb_passthrough_mode(self, mode):
        """Makes the underlying Monsoon call to set passthrough mode."""
        raise NotImplementedError()

    def reconnect_monsoon(self):
        """Reconnects the Monsoon Serial/USB connection."""
        raise NotImplementedError()

    def release_monsoon_connection(self):
        """Releases the underlying monsoon Serial or USB connection.

        Useful for allowing other processes access to the device.
        """
        raise NotImplementedError()

    def establish_monsoon_connection(self):
        """Establishes the underlying monsoon Serial or USB connection."""
        raise NotImplementedError()
297