#!/usr/bin/env python
#
# Copyright 2016 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from enum import Enum
from time import sleep

from acts.controllers.relay_lib.errors import RelayConfigError


class RelayState(Enum):
    """Enum for possible Relay States."""
    # Pretend this means 'OFF'
    NO = 'NORMALLY_OPEN'
    # Pretend this means 'ON'
    NC = 'NORMALLY_CLOSED'


class SynchronizeRelays(object):
    """A class that allows for relays to change state nearly simultaneously.

    Can be used with the 'with' statement in Python:

    with SynchronizeRelays():
        relay1.set_no()
        relay2.set_nc()

    While inside the 'with' statement, Relay.transition_wait_time is set to 0
    so individual Relay.set() calls do not block. If any relay changed state
    inside the block, the thread waits once for the original
    Relay.transition_wait_time after execution leaves the 'with' statement.
    """
    # Set to True by Relay.set() whenever a relay actually changes state.
    _sync_sleep_flag = False

    def __enter__(self):
        """Saves the current wait time and flag, then disables waiting.

        Returns:
            This SynchronizeRelays instance.
        """
        self.prev_toggle_time = Relay.transition_wait_time
        self.prev_sync_flag = SynchronizeRelays._sync_sleep_flag
        Relay.transition_wait_time = 0
        SynchronizeRelays._sync_sleep_flag = False
        return self

    def __exit__(self, type, value, traceback):
        """Restores the saved wait time and flag, then waits if needed.

        Exceptions raised inside the 'with' block are not suppressed.
        """
        relays_changed = SynchronizeRelays._sync_sleep_flag

        # Restore before sleeping: __enter__ zeroed the wait time, so
        # sleeping on the un-restored value would never wait at all.
        Relay.transition_wait_time = self.prev_toggle_time
        SynchronizeRelays._sync_sleep_flag = self.prev_sync_flag

        if relays_changed:
            sleep(Relay.transition_wait_time)


class Relay(object):
    """A class representing a single relay switch on a RelayBoard.

    References to these relays are stored in both the RelayBoard and the
    RelayDevice classes under the variable "relays". GenericRelayDevice can
    also access these relays through the subscript ([]) operator.

    At the moment, relays only have a valid state of 'ON' or 'OFF'. This may
    be extended in a subclass if needed. Keep in mind that if this is done,
    changes will also need to be made in the RelayRigParser class to
    initialize the relays.
    """
    # How long to wait (in seconds) for relays to transition state.
    transition_wait_time = .2
    # Default hold time (in seconds) used by set_no_for() and set_nc_for().
    button_press_time = .25

    def __init__(self, relay_board, position):
        """Creates a relay bound to a position on a relay board.

        Args:
            relay_board: The board this relay lives on. Must provide 'name',
                'set(position, state)' and 'get_relay_status(position)'.
            position: The position of this relay on the board.
        """
        self.relay_board = relay_board
        self.position = position
        # State of the relay before the first set() call; None until then.
        self._original_state = None
        self.relay_id = "%s/%s" % (self.relay_board.name, self.position)

    def set_no(self):
        """Sets the relay to the 'NO' state. Shorthand for set(RelayState.NO).

        Blocks the thread for Relay.transition_wait_time if the state changes.
        """
        self.set(RelayState.NO)

    def set_nc(self):
        """Sets the relay to the 'NC' state. Shorthand for set(RelayState.NC).

        Blocks the thread for Relay.transition_wait_time if the state changes.
        """
        self.set(RelayState.NC)

    def toggle(self):
        """Swaps the state from 'NO' to 'NC' or 'NC' to 'NO'.

        Blocks the thread for Relay.transition_wait_time.
        """
        if self.get_status() == RelayState.NO:
            self.set(RelayState.NC)
        else:
            self.set(RelayState.NO)

    def set(self, state):
        """Sets the relay to the 'NO' or 'NC' state.

        Blocks the thread for Relay.transition_wait_time if the relay was not
        already in the requested state.

        Args:
            state: either RelayState.NO or RelayState.NC.

        Raises:
            ValueError if state is not RelayState.NO or RelayState.NC.
        """
        # Validate before touching the board so that an invalid request
        # neither queries hardware nor marks this relay as dirty.
        if state is not RelayState.NO and state is not RelayState.NC:
            raise ValueError(
                'Invalid state. Received "%s". Expected any of %s.' %
                (state, list(RelayState)))

        # Remember the state from before the first change for clean_up().
        if self._original_state is None:
            self._original_state = self.relay_board.get_relay_status(
                self.position)

        if self.get_status() != state:
            self.relay_board.set(self.position, state)
            # Lets an enclosing SynchronizeRelays know a wait is required.
            SynchronizeRelays._sync_sleep_flag = True
            sleep(Relay.transition_wait_time)

    def set_no_for(self, seconds=button_press_time):
        """Sets the relay to 'NORMALLY_OPEN' for seconds. Blocks the thread.

        The relay is set to 'NORMALLY_CLOSED' afterwards.

        Args:
            seconds: The number of seconds to sleep for.
        """
        self.set_no()
        sleep(seconds)
        self.set_nc()

    def set_nc_for(self, seconds=button_press_time):
        """Sets the relay to 'NORMALLY_CLOSED' for seconds. Blocks the thread.

        Respects Relay.transition_wait_time for toggling state. The relay is
        set to 'NORMALLY_OPEN' afterwards.

        Args:
            seconds: The number of seconds to sleep for.
        """
        self.set_nc()
        sleep(seconds)
        self.set_no()

    def get_status(self):
        """Returns the status the relay board reports for this position."""
        return self.relay_board.get_relay_status(self.position)

    def clean_up(self):
        """Does any clean up needed to allow the next series of tests to run.

        For now, all this does is switches to its previous state. Inheriting
        from this class and overriding this method would be the best course of
        action to allow a more complex clean up to occur. If you do this, be
        sure to make the necessary modifications in RelayRig.initialize_relay
        and RelayRigParser.parse_json_relays.
        """
        if self._original_state is not None:
            self.set(self._original_state)

    def is_dirty(self):
        """Returns True if set() has recorded an original state."""
        return self._original_state is not None


class RelayDict(object):
    """A wrapped dictionary that gives config errors upon failure.

    Supports lookup, iteration and len() like a dictionary, but when getting
    the key fails, a RelayConfigError is raised, letting the user know that
    the reason the dict failed to return a relay is because the relay was not
    found in the config.

    Also prevents modification of elements, because changing the relays here
    does not change what they are in hardware.
    """
    ERROR_MESSAGE = ('Error: Attempted to get relay "%s" in %s "%s" but the '
                     'relay does not exist.\nExisting relays are: %s.\nMake '
                     'sure the missing relay is added to the config file, and '
                     'is properly setup.')

    def __init__(self, relay_device, input_dict):
        """Wraps input_dict on behalf of relay_device.

        Args:
            relay_device: The owning device; its type and 'name' are used in
                error messages.
            input_dict: The mapping of keys to relays to wrap.
        """
        self.relay_device = relay_device
        self._store = input_dict

    def __getitem__(self, key):
        """Returns the relay stored under key.

        Raises:
            RelayConfigError if key is not in the wrapped dictionary.
        """
        try:
            return self._store[key]
        except KeyError:
            raise RelayConfigError(self.ERROR_MESSAGE %
                                   (key, type(self.relay_device),
                                    self.relay_device.name, self._store))

    def __iter__(self):
        return iter(self._store)

    def __len__(self):
        return len(self._store)

    def __repr__(self):
        return repr(self._store)