#!/usr/bin/python
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import errno
import heapq
import logging
import os
import sys
import socket
import threading
import xmlrpclib

import rpm_logging_config
from config import rpm_config
from MultiThreadedXMLRPCServer import MultiThreadedXMLRPCServer
from rpm_infrastructure_exception import RPMInfrastructureException

import common
from autotest_lib.server import frontend
from autotest_lib.site_utils.rpm_control_system import utils

DEFAULT_RPM_COUNT = 0
TERMINATED = -1

# Indexes for accessing heap entries.
RPM_COUNT = 0
DISPATCHER_URI = 1

LOG_FILENAME_FORMAT = rpm_config.get('GENERAL','frontend_logname_format')
DEFAULT_RPM_ID = rpm_config.get('RPM_INFRASTRUCTURE', 'default_rpm_id')

# Valid state values.
VALID_STATE_VALUES = ['ON', 'OFF', 'CYCLE']

# Servo-interface mapping file
MAPPING_FILE = os.path.join(
        os.path.dirname(__file__),
        rpm_config.get('CiscoPOE', 'servo_interface_mapping_file'))

# Size of the LRU that holds power management unit information related
# to a device, e.g. rpm_hostname, outlet, hydra_hostname, etc.
LRU_SIZE = rpm_config.getint('RPM_INFRASTRUCTURE', 'lru_size')


class RPMFrontendServer(object):
    """
    This class is the frontend server of the RPM Infrastructure. All clients
    will send their power state requests to this central server who will
    forward the requests to an available or already assigned RPM dispatcher
    server.

    Once the dispatcher processes the request it will return the result
    to this frontend server who will send the result back to the client.

    All calls to this server are blocking.

    @var _dispatcher_minheap: Min heap that returns a list of format-
                              [ num_rpm's, dispatcher_uri ]
                              Used to choose the least loaded dispatcher.
    @var _entry_dict: Maps dispatcher URI to an entry (list) inside the min
                     heap. If a dispatcher server shuts down this allows us to
                     invalidate the entry in the minheap.
    @var _lock: Used to protect data from multiple running threads all
                manipulating the same data.
    @var _rpm_dict: Maps rpm hostname's to an already assigned dispatcher
                    server.
    @var _mapping_last_modified: Last-modified time of the servo-interface
                                 mapping file.
    @var _servo_interface: Maps servo hostname to (switch_hostname, interface).
    @var _rpm_info: An LRU cache to hold recently visited rpm information
                    so that we don't hit AFE too often. The elements in
                    the cache are instances of PowerUnitInfo indexed by
                    dut hostnames. POE info is not stored in the cache.
    @var _afe: AFE instance to talk to autotest. Used to retrieve rpm hostname.
    @var _email_handler: Email handler to use to control email notifications.
    """


    def __init__(self, email_handler=None):
        """
        RPMFrontendServer constructor.

        Initializes instance variables.

        @param email_handler: Optional handler used by suspend_emails() and
                              resume_emails(); when None those calls are
                              no-ops.
        """
        self._dispatcher_minheap = []
        self._entry_dict = {}
        self._lock = threading.Lock()
        self._mapping_last_modified = os.path.getmtime(MAPPING_FILE)
        self._servo_interface = utils.load_servo_interface_mapping()
        self._rpm_dict = {}
        self._afe = frontend.AFE()
        self._rpm_info = utils.LRUCache(size=LRU_SIZE)
        self._email_handler = email_handler


    def queue_request(self, device_hostname, new_state):
        """
        Forwards a request to change a device's (a dut or a servo) power state
        to the appropriate dispatcher server.

        This call will block until the forwarded request returns.

        @param device_hostname: Hostname of the device whose power state we
                                want to change.
        @param new_state: [ON, OFF, CYCLE] State to which we want to set the
                          device's outlet to.

        @return: True if the attempt to change power state was successful,
                 False otherwise.

        @raise RPMInfrastructureException: No dispatchers are available or can
                                           be reached.
        """
        # Remove any DNS Zone information and simplify down to just the
        # hostname.
        device_hostname = device_hostname.split('.')[0]
        # Put new_state in all uppercase letters.
        new_state = new_state.upper()
        if new_state not in VALID_STATE_VALUES:
            logging.error('Received request to set device %s to invalid '
                          'state %s', device_hostname, new_state)
            return False
        logging.info('Received request to set device: %s to state: %s',
                     device_hostname, new_state)
        powerunit_info = self._get_powerunit_info(device_hostname)
        dispatcher_uri = self._get_dispatcher(powerunit_info)
        if not dispatcher_uri:
            # No dispatchers available.
            raise RPMInfrastructureException('No dispatchers available.')
        client = xmlrpclib.ServerProxy(dispatcher_uri, allow_none=True)
        try:
            # Block on the request and return the result once it arrives.
            return client.queue_request(powerunit_info, new_state)
        except socket.error as er:
            # Dispatcher Server is not reachable. Unregister it and retry.
            # Not every socket.error carries an errno that errno.errorcode
            # knows about (e.g. timeouts or address-resolution failures), so
            # fall back to the exception itself instead of raising KeyError
            # from inside this handler.
            logging.error("Can't reach Dispatch Server: %s. Error: %s",
                          dispatcher_uri,
                          errno.errorcode.get(er.errno, er))
            if self.is_network_infrastructure_down():
                # No dispatchers can handle this request so raise an Exception
                # to the caller.
                raise RPMInfrastructureException('No dispatchers can be '
                                                 'reached.')
            logging.info('Will attempt forwarding request to other dispatch '
                         'servers.')
            logging.error('Unregistering %s due to error. Recommend resetting '
                          'that dispatch server.', dispatcher_uri)
            self.unregister_dispatcher(dispatcher_uri)
            # Retry forwarding the request. Each retry unregisters one
            # dispatcher, so the recursion is bounded by the number of
            # registered dispatchers.
            return self.queue_request(device_hostname, new_state)


    def is_network_infrastructure_down(self):
        """
        Check to see if we can communicate with any dispatcher servers.

        Only called in the situation that queuing a request to a dispatcher
        server failed.

        @return: False if any dispatcher server is up and the rpm infrastructure
                 can still function. True otherwise.
        """
        # Snapshot the URIs under the lock so that concurrent register /
        # unregister calls cannot mutate the heap while we walk it; the
        # (slow) network probes below run without holding the lock.
        with self._lock:
            dispatcher_uris = [entry[DISPATCHER_URI]
                               for entry in self._dispatcher_minheap]
        for dispatcher_uri in dispatcher_uris:
            dispatcher = xmlrpclib.ServerProxy(dispatcher_uri, allow_none=True)
            try:
                if dispatcher.is_up():
                    # At least one dispatcher is alive so our network is fine.
                    return False
            except socket.error:
                # Can't talk to this dispatcher so keep looping.
                pass
        logging.error("Can't reach any dispatchers. Check frontend network "
                      'status or all dispatchers are down.')
        return True


    def _get_powerunit_info(self, device_hostname):
        """Get the power management unit information for a device.

        A device could be a chromeos dut or a servo.
        1) ChromeOS dut
        Chromeos dut is managed by RPM. The related information
        we need to know include rpm hostname, rpm outlet, hydra hostname.
        Such information can be retrieved from afe_host_attributes table
        from afe. A local LRU cache is used avoid hitting afe too often.

        2) Servo
        Servo is managed by POE. The related information we need to know
        include poe hostname, poe interface. Such information is
        stored in a local file and read into memory.

        @param device_hostname: A string representing the device's hostname.

        @returns: A PowerUnitInfo object.
        @raises RPMInfrastructureException if failed to get the power
                unit info.

        """
        with self._lock:
            if device_hostname.endswith('servo'):
                # Servos are managed by Cisco POE switches.
                reload_info = utils.reload_servo_interface_mapping_if_necessary(
                        self._mapping_last_modified)
                if reload_info:
                    self._mapping_last_modified, self._servo_interface = (
                            reload_info)
                switch_if_tuple = self._servo_interface.get(device_hostname)
                if not switch_if_tuple:
                    # Format the hostname into the message; exceptions do not
                    # interpolate extra arguments the way logging calls do.
                    raise RPMInfrastructureException(
                            'Could not determine POE hostname for %s. '
                            'Please check the servo-interface mapping file.'
                            % device_hostname)
                return utils.PowerUnitInfo(
                        device_hostname=device_hostname,
                        powerunit_type=utils.PowerUnitInfo.POWERUNIT_TYPES.POE,
                        powerunit_hostname=switch_if_tuple[0],
                        outlet=switch_if_tuple[1],
                        hydra_hostname=None)

            # Regular DUTs are managed by RPMs.
            if device_hostname in self._rpm_info:
                return self._rpm_info[device_hostname]
            hosts = self._afe.get_hosts(hostname=device_hostname)
            if not hosts:
                raise RPMInfrastructureException(
                        'Can not retrieve rpm information '
                        'from AFE for %s, no host found.' % device_hostname)
            info = utils.PowerUnitInfo.get_powerunit_info(hosts[0])
            self._rpm_info[device_hostname] = info
            return info


    def _get_dispatcher(self, powerunit_info):
        """
        Private method that looks up or assigns a dispatcher server
        responsible for communicating with the given RPM/POE.

        An existing assignment in _rpm_dict is reused; otherwise the least
        loaded dispatcher is taken from the min heap, its count is bumped and
        the assignment is recorded.

        @param powerunit_info: A PowerUnitInfo instance.

        @return: URI of dispatcher server responsible for the rpm/poe.
                 None if no dispatcher servers are available.
        """
        powerunit_type = powerunit_info.powerunit_type
        powerunit_hostname = powerunit_info.powerunit_hostname
        with self._lock:
            # Unregistered dispatchers leave a None value behind, so a
            # truthiness check (not a membership check) is required here.
            if self._rpm_dict.get(powerunit_hostname):
                return self._rpm_dict[powerunit_hostname]
            logging.info('No Dispatcher assigned for %s %s.',
                         powerunit_type, powerunit_hostname)
            # Choose the least loaded dispatcher to communicate with the RPM.
            try:
                heap_entry = heapq.heappop(self._dispatcher_minheap)
            except IndexError:
                logging.error('Infrastructure Error: Frontend has no '
                              'registered dispatchers to field out this '
                              'request!')
                return None
            dispatcher_uri = heap_entry[DISPATCHER_URI]
            # Put this entry back in the heap with an RPM Count + 1.
            heap_entry[RPM_COUNT] = heap_entry[RPM_COUNT] + 1
            heapq.heappush(self._dispatcher_minheap, heap_entry)
            logging.info('Assigning %s for %s %s', dispatcher_uri,
                         powerunit_type, powerunit_hostname)
            self._rpm_dict[powerunit_hostname] = dispatcher_uri
            return dispatcher_uri


    def register_dispatcher(self, dispatcher_uri):
        """
        Called by a dispatcher server so that the frontend server knows it is
        available to field out RPM requests.

        Adds an entry to the min heap and entry map for this dispatcher.

        @param dispatcher_uri: Address of dispatcher server we are registering.
        """
        logging.info('Registering uri: %s as a rpm dispatcher.', dispatcher_uri)
        with self._lock:
            # The same list object is stored in both the heap and the entry
            # map so that unregister_dispatcher can invalidate it in place.
            heap_entry = [DEFAULT_RPM_COUNT, dispatcher_uri]
            heapq.heappush(self._dispatcher_minheap, heap_entry)
            self._entry_dict[dispatcher_uri] = heap_entry


    def unregister_dispatcher(self, uri_to_unregister):
        """
        Called by a dispatcher server as it exits so that the frontend server
        knows that it is no longer available to field out requests.

        Assigns an rpm count of -1 to this dispatcher so that it will be pushed
        out of the min heap.

        Clears (sets to None) all _rpm_dict entries with the value of this
        dispatcher so that those RPM's can be reassigned to a new dispatcher.

        @param uri_to_unregister: Address of dispatcher server we are
                                  unregistering.
        """
        logging.info('Unregistering uri: %s as a rpm dispatcher.',
                     uri_to_unregister)
        with self._lock:
            heap_entry = self._entry_dict.get(uri_to_unregister)
            if not heap_entry:
                logging.warning('%s was not registered.', uri_to_unregister)
                return
            # Set this entry's RPM_COUNT to TERMINATED (-1).
            heap_entry[RPM_COUNT] = TERMINATED
            # Remove all RPM mappings. Only existing keys are reassigned, so
            # the dict does not change size while it is being iterated.
            for rpm, dispatcher in self._rpm_dict.items():
                if dispatcher == uri_to_unregister:
                    self._rpm_dict[rpm] = None
            self._entry_dict[uri_to_unregister] = None
            # Re-sort the heap and remove any terminated dispatchers.
            heapq.heapify(self._dispatcher_minheap)
            self._remove_terminated_dispatchers()


    def _remove_terminated_dispatchers(self):
        """
        Peek at the head of the heap and keep popping off values until there is
        a non-terminated dispatcher at the top (or the heap is empty).

        Callers are expected to hold _lock.
        """
        heap = self._dispatcher_minheap
        # Heapq guarantees the head of the heap is in the '0' index. Compare
        # by value: identity comparison on ints is an implementation detail.
        while heap and heap[0][RPM_COUNT] == TERMINATED:
            heapq.heappop(heap)


    def suspend_emails(self, hours):
        """Suspend email notifications.

        Does nothing when no email handler was supplied.

        @param hours: How many hours to suspend email notifications.
        """
        if self._email_handler:
            self._email_handler.suspend_emails(hours)


    def resume_emails(self):
        """Resume email notifications.

        Does nothing when no email handler was supplied.
        """
        if self._email_handler:
            self._email_handler.resume_emails()


if __name__ == '__main__':
    # Main function used to launch the frontend server. Creates an instance of
    # RPMFrontendServer and registers it to a MultiThreadedXMLRPCServer
    # instance.
    if len(sys.argv) > 1:
        # Parenthesized single-argument form prints identically whether
        # print is a statement or a function.
        print('Usage: ./%s, no arguments available.' % sys.argv[0])
        sys.exit(1)
    email_handler = rpm_logging_config.set_up_logging(LOG_FILENAME_FORMAT)
    frontend_server = RPMFrontendServer(email_handler=email_handler)
    address = rpm_config.get('RPM_INFRASTRUCTURE', 'frontend_addr')
    port = rpm_config.getint('RPM_INFRASTRUCTURE', 'frontend_port')
    server = MultiThreadedXMLRPCServer((address, port), allow_none=True)
    server.register_instance(frontend_server)
    logging.info('Listening on %s port %d', address, port)
    server.serve_forever()