#!/usr/bin/python
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import errno
import heapq
import logging
import os
import sys
import socket
import threading
import xmlrpclib

import rpm_logging_config
from config import rpm_config
from MultiThreadedXMLRPCServer import MultiThreadedXMLRPCServer
from rpm_infrastructure_exception import RPMInfrastructureException

import common
from autotest_lib.server import frontend
from autotest_lib.site_utils.rpm_control_system import utils

DEFAULT_RPM_COUNT = 0
TERMINATED = -1

# Indexes for accessing heap entries.
RPM_COUNT = 0
DISPATCHER_URI = 1

LOG_FILENAME_FORMAT = rpm_config.get('GENERAL','frontend_logname_format')
DEFAULT_RPM_ID = rpm_config.get('RPM_INFRASTRUCTURE', 'default_rpm_id')

# Valid state values.
VALID_STATE_VALUES = ['ON', 'OFF', 'CYCLE']

# Servo-interface mapping file
MAPPING_FILE = os.path.join(
        os.path.dirname(__file__),
        rpm_config.get('CiscoPOE', 'servo_interface_mapping_file'))

# Size of the LRU that holds power management unit information related
# to a device, e.g. rpm_hostname, outlet, hydra_hostname, etc.
LRU_SIZE = rpm_config.getint('RPM_INFRASTRUCTURE', 'lru_size')


class DispatcherDownException(Exception):
    """Raised when a particular RPMDispatcher is down."""


class RPMFrontendServer(object):
    """
    This class is the frontend server of the RPM Infrastructure. All clients
    will send their power state requests to this central server who will
    forward the requests to an available or already assigned RPM dispatcher
    server.

    Once the dispatcher processes the request it will return the result
    to this frontend server who will send the result back to the client.

    All calls to this server are blocking.

    @var _dispatcher_minheap: Min heap that returns a list of format-
                              [ num_rpm's, dispatcher_uri ]
                              Used to choose the least loaded dispatcher.
    @var _entry_dict: Maps dispatcher URI to an entry (list) inside the min
                     heap. If a dispatcher server shuts down this allows us to
                     invalidate the entry in the minheap.
    @var _lock: Used to protect data from multiple running threads all
                manipulating the same data.
    @var _rpm_dict: Maps rpm hostname's to an already assigned dispatcher
                    server.
    @var _mapping_last_modified: Last-modified time of the servo-interface
                                 mapping file.
    @var _servo_interface: Maps servo hostname to (switch_hostname, interface).
    @var _rpm_info: An LRU cache to hold recently visited rpm information
                    so that we don't hit AFE too often. The elements in
                    the cache are instances of PowerUnitInfo indexed by
                    dut hostnames. POE info is not stored in the cache.
    @var _afe: AFE instance to talk to autotest. Used to retrieve rpm hostname.
    @var _email_handler: Email handler to use to control email notifications.
    """


    def __init__(self, email_handler=None):
        """
        RPMFrontendServer constructor.

        Initializes instance variables.

        @param email_handler: Optional handler used by suspend_emails() and
                              resume_emails(); when None those calls are
                              no-ops.
        """
        self._dispatcher_minheap = []
        self._entry_dict = {}
        self._lock = threading.Lock()
        self._mapping_last_modified = os.path.getmtime(MAPPING_FILE)
        self._servo_interface = utils.load_servo_interface_mapping()
        self._rpm_dict = {}
        self._afe = frontend.AFE()
        self._rpm_info = utils.LRUCache(size=LRU_SIZE)
        self._email_handler = email_handler


    def _normalize_state(self, device_label, device_hostname, new_state):
        """Upper-case and validate a requested power state.

        Logs the request (or the rejection) so that every public entry point
        reports requests the same way.

        @param device_label: Word used in the log message to describe the
                             device, e.g. 'servo' or 'device'.
        @param device_hostname: Hostname of the device, used for logging only.
        @param new_state: Requested state in any letter case.

        @return: The upper-cased state if it is in VALID_STATE_VALUES,
                 None otherwise.
        """
        new_state = new_state.upper()
        if new_state not in VALID_STATE_VALUES:
            logging.error('Received request to set %s %s to invalid '
                          'state %s', device_label, device_hostname,
                          new_state)
            return None
        logging.info('Received request to set %s: %s to state: %s',
                     device_label, device_hostname, new_state)
        return new_state


    def _queue_with_retry(self, powerunit_info, new_state):
        """Forward a request, moving on to another dispatcher on failure.

        _queue_once() unregisters a dispatcher before raising
        DispatcherDownException, so every retry is attempted against a
        different dispatcher. The loop ends when a dispatcher answers or
        when _queue_once() raises RPMInfrastructureException.

        @param powerunit_info: A PowerUnitInfo instance.
        @param new_state: Validated state to forward.

        @return: Whatever the dispatcher's queue_request call returned.

        @raise RPMInfrastructureException: No dispatchers are available or can
                                           be reached.
        """
        while True:
            try:
                return self._queue_once(powerunit_info, new_state)
            except DispatcherDownException:
                # The dead dispatcher was already unregistered; try the next.
                continue


    def set_power_via_poe(self, device_hostname, new_state):
        """Sets power state of the device to the requested state via POE.

        @param device_hostname: Hostname of the servo to control.
        @param new_state: [ON, OFF, CYCLE] State to which we want to set the
                          device's outlet to.

        @return: True if the attempt to change power state was successful,
                 False otherwise.

        @raise RPMInfrastructureException: No dispatchers are available or can
                                           be reached.
        """
        # Remove any DNS Zone information and simplify down to just the
        # hostname.
        device_hostname = device_hostname.split('.')[0]
        new_state = self._normalize_state('servo', device_hostname, new_state)
        if new_state is None:
            return False
        powerunit_info = self._get_poe_powerunit_info(device_hostname)
        return self._queue_with_retry(powerunit_info, new_state)


    def set_power_via_rpm(self, device_hostname, rpm_hostname,
                          rpm_outlet, hydra_hostname, new_state):
        """Sets power state of a device to the requested state via RPM.

        Unlike the special case of POE, powerunit information is not available
        on the RPM server, so must be provided as arguments.

        @param device_hostname: Hostname of the device to control.
        @param rpm_hostname: Hostname of the RPM to use.
        @param rpm_outlet: The RPM outlet to control.
        @param hydra_hostname: If required, the hydra device to SSH through to
                               get to the RPM.
        @param new_state: [ON, OFF, CYCLE] State to which we want to set the
                          device's outlet to.

        @return: True if the attempt to change power state was successful,
                 False otherwise.

        @raise RPMInfrastructureException: No dispatchers are available or can
                                           be reached.
        """
        # Validate the state the same way the other entry points do instead
        # of forwarding an unchecked value to a dispatcher.
        new_state = self._normalize_state('device', device_hostname,
                                          new_state)
        if new_state is None:
            return False
        powerunit_info = utils.PowerUnitInfo(
                device_hostname=device_hostname,
                powerunit_type=utils.PowerUnitInfo.POWERUNIT_TYPES.RPM,
                powerunit_hostname=rpm_hostname,
                outlet=rpm_outlet,
                hydra_hostname=hydra_hostname,
        )
        return self._queue_with_retry(powerunit_info, new_state)


    def queue_request(self, device_hostname, new_state):
        """
        Forwards a request to change a device's (a dut or a servo) power state
        to the appropriate dispatcher server.

        This call will block until the forwarded request returns.

        @param device_hostname: Hostname of the device whose power state we want
                                to change.
        @param new_state: [ON, OFF, CYCLE] State to which we want to set the
                          device's outlet to.

        @return: True if the attempt to change power state was successful,
                 False otherwise.

        @raise RPMInfrastructureException: No dispatchers are available or can
                                           be reached.
        """
        # Remove any DNS Zone information and simplify down to just the
        # hostname.
        device_hostname = device_hostname.split('.')[0]
        new_state = self._normalize_state('device', device_hostname,
                                          new_state)
        if new_state is None:
            return False
        powerunit_info = self._get_powerunit_info(device_hostname)
        return self._queue_with_retry(powerunit_info, new_state)


    def _queue_once(self, powerunit_info, new_state):
        """Queue one request to the dispatcher.

        @param powerunit_info: A PowerUnitInfo instance.
        @param new_state: State to forward to the dispatcher.

        @return: The result of the dispatcher's queue_request call.

        @raise RPMInfrastructureException: No dispatcher is registered, or
                                           none of them can be reached.
        @raise DispatcherDownException: The chosen dispatcher could not be
                                        reached and has been unregistered;
                                        the caller may retry.
        """
        dispatcher_uri = self._get_dispatcher(powerunit_info)
        if not dispatcher_uri:
            # No dispatchers available.
            raise RPMInfrastructureException('No dispatchers available.')
        client = xmlrpclib.ServerProxy(dispatcher_uri, allow_none=True)
        try:
            # Block on the request and return the result once it arrives.
            return client.queue_request(powerunit_info, new_state)
        except socket.error as er:
            # Dispatcher Server is not reachable. Unregister it and retry.
            # er.errno can be None or a code missing from errno.errorcode
            # (e.g. timeouts, address lookup failures); fall back to the
            # exception text rather than raising KeyError here.
            logging.error("Can't reach Dispatch Server: %s. Error: %s",
                          dispatcher_uri,
                          errno.errorcode.get(er.errno, str(er)))
            if self.is_network_infrastructure_down():
                # No dispatchers can handle this request so raise an Exception
                # to the caller.
                raise RPMInfrastructureException('No dispatchers can be '
                                                 'reached.')
            logging.info('Will attempt forwarding request to other dispatch '
                         'servers.')
            logging.error('Unregistering %s due to error. Recommend resetting '
                          'that dispatch server.', dispatcher_uri)
            self.unregister_dispatcher(dispatcher_uri)
            raise DispatcherDownException(dispatcher_uri)


    def is_network_infrastructure_down(self):
        """
        Check to see if we can communicate with any dispatcher servers.

        Only called in the situation that queuing a request to a dispatcher
        server failed.

        @return: False if any dispatcher server is up and the rpm infrastructure
                 can still function. True otherwise.
        """
        for dispatcher_entry in self._dispatcher_minheap:
            dispatcher = xmlrpclib.ServerProxy(
                    dispatcher_entry[DISPATCHER_URI], allow_none=True)
            try:
                if dispatcher.is_up():
                    # At least one dispatcher is alive so our network is fine.
                    return False
            except socket.error:
                # Can't talk to this dispatcher so keep looping.
                pass
        logging.error("Can't reach any dispatchers. Check frontend network "
                      'status or all dispatchers are down.')
        return True


    def _get_powerunit_info(self, device_hostname):
        """Get the power management unit information for a device.

        A device could be a chromeos dut or a servo.
        1) ChromeOS dut
        Chromeos dut is managed by RPM. The related information
        we need to know include rpm hostname, rpm outlet, hydra hostname.
        Such information can be retrieved from afe_host_attributes table
        from afe. A local LRU cache is used avoid hitting afe too often.

        2) Servo
        Servo is managed by POE. The related information we need to know
        include poe hostname, poe interface. Such information is
        stored in a local file and read into memory.

        @param device_hostname: A string representing the device's hostname.
                                Hostnames ending in 'servo' are treated as
                                servos, everything else as a dut.

        @returns: A PowerUnitInfo object.
        @raises RPMInfrastructureException if failed to get the power
                unit info.

        """
        if device_hostname.endswith('servo'):
            return self._get_poe_powerunit_info(device_hostname)
        return self._get_rpm_powerunit_info(device_hostname)


    def _get_poe_powerunit_info(self, device_hostname):
        """Get the power management unit information for a POE controller.

        Servo is managed by POE. The related information we need to know
        include poe hostname, poe interface. Such information is
        stored in a local file and read into memory.

        @param device_hostname: A string representing the device's hostname.

        @returns: A PowerUnitInfo object.
        @raises RPMInfrastructureException if failed to get the power
                unit info.

        """
        with self._lock:
            # Pick up edits to the mapping file before looking the servo up.
            reload_info = utils.reload_servo_interface_mapping_if_necessary(
                    self._mapping_last_modified)
            if reload_info:
                self._mapping_last_modified, self._servo_interface = reload_info
            switch_if_tuple = self._servo_interface.get(device_hostname)
            if not switch_if_tuple:
                # Format the message here: the exception does not interpolate
                # extra positional arguments into its first argument.
                raise RPMInfrastructureException(
                        'Could not determine POE hostname for %s. '
                        'Please check the servo-interface mapping file.'
                        % device_hostname)
            return utils.PowerUnitInfo(
                    device_hostname=device_hostname,
                    powerunit_type=utils.PowerUnitInfo.POWERUNIT_TYPES.POE,
                    powerunit_hostname=switch_if_tuple[0],
                    outlet=switch_if_tuple[1],
                    hydra_hostname=None)


    def _get_rpm_powerunit_info(self, device_hostname):
        """Get the power management unit information for an RPM controller.

        Chromeos dut is managed by RPM. The related information
        we need to know include rpm hostname, rpm outlet, hydra hostname.
        Such information can be retrieved from afe_host_attributes table
        from afe. A local LRU cache is used avoid hitting afe too often.

        @param device_hostname: A string representing the device's hostname.

        @returns: A PowerUnitInfo object.
        @raises RPMInfrastructureException if failed to get the power
                unit info.

        """
        with self._lock:
            # Regular DUTs are managed by RPMs.
            if device_hostname in self._rpm_info:
                return self._rpm_info[device_hostname]
            hosts = self._afe.get_hosts(hostname=device_hostname)
            if not hosts:
                raise RPMInfrastructureException(
                        'Can not retrieve rpm information '
                        'from AFE for %s, no host found.' % device_hostname)
            info = utils.PowerUnitInfo.get_powerunit_info(hosts[0])
            self._rpm_info[device_hostname] = info
            return info


    def _get_dispatcher(self, powerunit_info):
        """
        Private method that looks up or assigns a dispatcher server
        responsible for communicating with the given RPM/POE.

        If no dispatcher is assigned yet, the least loaded one is taken from
        the min heap, its RPM count is incremented and the assignment is
        remembered in _rpm_dict.

        @param powerunit_info: A PowerUnitInfo instance.

        @return: URI of dispatcher server responsible for the rpm/poe.
                 None if no dispatcher servers are available.
        """
        powerunit_type = powerunit_info.powerunit_type
        powerunit_hostname = powerunit_info.powerunit_hostname
        with self._lock:
            assigned_uri = self._rpm_dict.get(powerunit_hostname)
            if assigned_uri:
                return assigned_uri
            logging.info('No Dispatcher assigned for %s %s.',
                         powerunit_type, powerunit_hostname)
            # Choose the least loaded dispatcher to communicate with the RPM.
            try:
                heap_entry = heapq.heappop(self._dispatcher_minheap)
            except IndexError:
                logging.error('Infrastructure Error: Frontend has no '
                              'registered dispatchers to field out this '
                              'request!')
                return None
            dispatcher_uri = heap_entry[DISPATCHER_URI]
            # Put this entry back in the heap with an RPM Count + 1.
            heap_entry[RPM_COUNT] += 1
            heapq.heappush(self._dispatcher_minheap, heap_entry)
            logging.info('Assigning %s for %s %s', dispatcher_uri,
                         powerunit_type, powerunit_hostname)
            self._rpm_dict[powerunit_hostname] = dispatcher_uri
            return dispatcher_uri


    def register_dispatcher(self, dispatcher_uri):
        """
        Called by a dispatcher server so that the frontend server knows it is
        available to field out RPM requests.

        Adds an entry to the min heap and entry map for this dispatcher.
        Registering a URI that is already registered is a no-op, so the heap
        never holds two entries for the same dispatcher.

        @param dispatcher_uri: Address of dispatcher server we are registering.
        """
        logging.info('Registering uri: %s as a rpm dispatcher.', dispatcher_uri)
        with self._lock:
            if self._entry_dict.get(dispatcher_uri):
                # A second heap entry would be orphaned from _entry_dict and
                # could never be removed by unregister_dispatcher().
                logging.info('%s is already registered; keeping its existing '
                             'entry.', dispatcher_uri)
                return
            heap_entry = [DEFAULT_RPM_COUNT, dispatcher_uri]
            heapq.heappush(self._dispatcher_minheap, heap_entry)
            self._entry_dict[dispatcher_uri] = heap_entry


    def unregister_dispatcher(self, uri_to_unregister):
        """
        Called by a dispatcher server as it exits so that the frontend server
        knows that it is no longer available to field out requests.

        Assigns an rpm count of -1 to this dispatcher so that it will be pushed
        out of the min heap.

        Removes from _rpm_dict all entries with the value of this dispatcher so
        that those RPM's can be reassigned to a new dispatcher.

        @param uri_to_unregister: Address of dispatcher server we are
                                  unregistering.
        """
        logging.info('Unregistering uri: %s as a rpm dispatcher.',
                     uri_to_unregister)
        with self._lock:
            heap_entry = self._entry_dict.get(uri_to_unregister)
            if not heap_entry:
                logging.warning('%s was not registered.', uri_to_unregister)
                return
            # Set this entry's RPM_COUNT to TERMINATED (-1).
            heap_entry[RPM_COUNT] = TERMINATED
            # Remove all RPM mappings. Collect the keys first so the dict is
            # not resized while it is being iterated.
            orphaned_rpms = [rpm for rpm, dispatcher in self._rpm_dict.items()
                             if dispatcher == uri_to_unregister]
            for rpm in orphaned_rpms:
                del self._rpm_dict[rpm]
            del self._entry_dict[uri_to_unregister]
            # Re-sort the heap and remove any terminated dispatchers.
            heapq.heapify(self._dispatcher_minheap)
            self._remove_terminated_dispatchers()


    def _remove_terminated_dispatchers(self):
        """
        Peek at the head of the heap and keep popping off values until there is
        a non-terminated dispatcher at the top or the heap is empty.
        """
        heap = self._dispatcher_minheap
        # Heapq guarantees the head of the heap is in the '0' index.
        # Compare by value: TERMINATED is a plain int, not a singleton.
        while heap and heap[0][RPM_COUNT] == TERMINATED:
            heapq.heappop(heap)


    def suspend_emails(self, hours):
        """Suspend email notifications.

        Does nothing when no email handler was supplied.

        @param hours: How many hours to suspend email notifications.
        """
        if self._email_handler:
            self._email_handler.suspend_emails(hours)


    def resume_emails(self):
        """Resume email notifications.

        Does nothing when no email handler was supplied.
        """
        if self._email_handler:
            self._email_handler.resume_emails()


def main():
    """
    Main function used to launch the frontend server. Creates an instance of
    RPMFrontendServer and registers it to a MultiThreadedXMLRPCServer instance.
    """
    if len(sys.argv) != 2:
        print('Usage: ./%s <log_file_dir>.' % sys.argv[0])
        sys.exit(1)

    email_handler = rpm_logging_config.set_up_logging_to_file(
            sys.argv[1], LOG_FILENAME_FORMAT)
    frontend_server = RPMFrontendServer(email_handler=email_handler)
    # We assume that external clients will always connect to us via the
    # hostname, so listening on the hostname ensures we pick the right network
    # interface.
    address = socket.gethostname()
    port = rpm_config.getint('RPM_INFRASTRUCTURE', 'frontend_port')
    server = MultiThreadedXMLRPCServer((address, port), allow_none=True)
    server.register_instance(frontend_server)
    logging.info('Listening on %s port %d', address, port)
    server.serve_forever()


if __name__ == '__main__':
    main()