#!/usr/bin/python2 # Copyright (c) 2012 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. import atexit import errno import logging import re import sys import socket import threading import xmlrpclib import rpm_controller import rpm_logging_config from config import rpm_config from MultiThreadedXMLRPCServer import MultiThreadedXMLRPCServer from rpm_infrastructure_exception import RPMInfrastructureException import common from autotest_lib.site_utils.rpm_control_system import utils LOG_FILENAME_FORMAT = rpm_config.get('GENERAL','dispatcher_logname_format') class RPMDispatcher(object): """ This class is the RPM dispatcher server and it is responsible for communicating directly to the RPM devices to change a DUT's outlet status. When an RPMDispatcher is initialized it registers itself with the frontend server, who will field out outlet requests to this dispatcher. Once a request is received the dispatcher looks up the RPMController instance for the given DUT and then queues up the request and blocks until it is processed. @var _address: IP address or Hostname of this dispatcher server. @var _frontend_server: URI of the frontend server. @var _lock: Lock used to synchronize access to _worker_dict. @var _port: Port assigned to this server instance. @var _worker_dict: Dictionary mapping RPM hostname's to RPMController instances. """ def __init__(self, address, port): """ RPMDispatcher constructor. Initialized instance vars and registers this server with the frontend server. @param address: Address of this dispatcher server. @param port: Port assigned to this dispatcher server. @raise RPMInfrastructureException: Raised if the dispatch server is unable to register with the frontend server. """ self._address = address self._port = port self._lock = threading.Lock() self._worker_dict = {} # We assume that the frontend server and dispatchers are running on the # same host, and the frontend server is listening for connections from # the external world. frontend_server_port = rpm_config.getint('RPM_INFRASTRUCTURE', 'frontend_port') self._frontend_server = 'http://%s:%d' % (socket.gethostname(), frontend_server_port) logging.info('Registering this rpm dispatcher with the frontend ' 'server at %s.', self._frontend_server) client = xmlrpclib.ServerProxy(self._frontend_server) # De-register with the frontend when the dispatcher exit's. atexit.register(self._unregister) try: client.register_dispatcher(self._get_serveruri()) except socket.error as er: err_msg = ('Unable to register with frontend server. Error: %s.' % errno.errorcode[er.errno]) logging.error(err_msg) raise RPMInfrastructureException(err_msg) def _worker_dict_put(self, key, value): """ Private method used to synchronize access to _worker_dict. @param key: key value we are using to access _worker_dict. @param value: value we are putting into _worker_dict. """ with self._lock: self._worker_dict[key] = value def _worker_dict_get(self, key): """ Private method used to synchronize access to _worker_dict. @param key: key value we are using to access _worker_dict. @return: value found when accessing _worker_dict """ with self._lock: return self._worker_dict.get(key) def is_up(self): """ Allows the frontend server to see if the dispatcher server is up before attempting to queue requests. @return: True. If connection fails, the client proxy will throw a socket error on the client side. """ return True def queue_request(self, powerunit_info_dict, new_state): """ Looks up the appropriate RPMController instance for the device and queues up the request. @param powerunit_info_dict: A dictionary, containing the attribute/values of an unmarshalled PowerUnitInfo instance. @param new_state: [ON, OFF, CYCLE] state we want to the change the outlet to. @return: True if the attempt to change power state was successful, False otherwise. """ powerunit_info = utils.PowerUnitInfo(**powerunit_info_dict) logging.info('Received request to set device: %s to state: %s', powerunit_info.device_hostname, new_state) rpm_controller = self._get_rpm_controller( powerunit_info.powerunit_hostname, powerunit_info.hydra_hostname) return rpm_controller.queue_request(powerunit_info, new_state) def _get_rpm_controller(self, rpm_hostname, hydra_hostname=None): """ Private method that retreives the appropriate RPMController instance for this RPM Hostname or calls _create_rpm_controller it if it does not already exist. @param rpm_hostname: hostname of the RPM whose RPMController we want. @return: RPMController instance responsible for this RPM. """ if not rpm_hostname: return None rpm_controller = self._worker_dict_get(rpm_hostname) if not rpm_controller: rpm_controller = self._create_rpm_controller( rpm_hostname, hydra_hostname) self._worker_dict_put(rpm_hostname, rpm_controller) return rpm_controller def _create_rpm_controller(self, rpm_hostname, hydra_hostname): """ Determines the type of RPMController required and initializes it. @param rpm_hostname: Hostname of the RPM we need to communicate with. @return: RPMController instance responsible for this RPM. """ hostname_elements = rpm_hostname.split('-') if hostname_elements[-2] == 'poe': # POE switch hostname looks like 'chromeos2-poe-switch1'. logging.info('The controller is a Cisco POE switch.') return rpm_controller.CiscoPOEController(rpm_hostname) else: # The device is an RPM. rack_id = hostname_elements[-2] rpm_typechecker = re.compile('rack[0-9]+[a-z]+') if rpm_typechecker.match(rack_id): logging.info('RPM is a webpowered device.') return rpm_controller.WebPoweredRPMController(rpm_hostname) else: logging.info('RPM is a Sentry CDU device.') return rpm_controller.SentryRPMController( hostname=rpm_hostname, hydra_hostname=hydra_hostname) def _get_serveruri(self): """ Formats the _address and _port into a meaningful URI string. @return: URI of this dispatch server. """ return 'http://%s:%d' % (self._address, self._port) def _unregister(self): """ Tells the frontend server that this dispatch server is shutting down and to unregister it. Called by atexit. @raise RPMInfrastructureException: Raised if the dispatch server is unable to unregister with the frontend server. """ logging.info('Dispatch server shutting down. Unregistering with RPM ' 'frontend server.') client = xmlrpclib.ServerProxy(self._frontend_server) try: client.unregister_dispatcher(self._get_serveruri()) except socket.error as er: err_msg = ('Unable to unregister with frontend server. Error: %s.' % errno.errorcode[er.errno]) logging.error(err_msg) raise RPMInfrastructureException(err_msg) def launch_server_on_unused_port(): """ Looks up an unused port on this host and launches the xmlrpc server. Useful for testing by running multiple dispatch servers on the same host. @return: server,port - server object and the port that which it is listening to. """ address = socket.gethostbyname(socket.gethostname()) sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # Set this socket to allow reuse. sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.bind(('', 0)) port = sock.getsockname()[1] server = MultiThreadedXMLRPCServer((address, port), allow_none=True) sock.close() return server, port if __name__ == '__main__': """ Main function used to launch the dispatch server. Creates an instance of RPMDispatcher and registers it to a MultiThreadedXMLRPCServer instance. """ if len(sys.argv) != 2: print 'Usage: ./%s ' % sys.argv[0] sys.exit(1) rpm_logging_config.start_log_server(sys.argv[1], LOG_FILENAME_FORMAT) rpm_logging_config.set_up_logging_to_server() # Get the local ip _address and set the server to utilize it. address = socket.gethostbyname(socket.gethostname()) server, port = launch_server_on_unused_port() rpm_dispatcher = RPMDispatcher(address, port) server.register_instance(rpm_dispatcher) server.serve_forever()