1#!/usr/bin/python
2# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6import atexit
7import errno
8import logging
9import re
10import sys
11import socket
12import threading
13import xmlrpclib
14
15import rpm_controller
16import rpm_logging_config
17
18from config import rpm_config
19from MultiThreadedXMLRPCServer import MultiThreadedXMLRPCServer
20from rpm_infrastructure_exception import RPMInfrastructureException
21
22import common
23from autotest_lib.site_utils.rpm_control_system import utils
24
# Log file name format for the dispatcher, read from the 'GENERAL' section of
# the RPM config. It is passed to the log server when this file runs as a
# script.
LOG_FILENAME_FORMAT = rpm_config.get('GENERAL', 'dispatcher_logname_format')
26
27
class RPMDispatcher(object):
    """
    This class is the RPM dispatcher server and it is responsible for
    communicating directly to the RPM devices to change a DUT's outlet status.

    When an RPMDispatcher is initialized it registers itself with the frontend
    server, which will hand outlet requests to this dispatcher.

    Once a request is received the dispatcher looks up the RPMController
    instance for the given DUT and then queues up the request and blocks until
    it is processed.

    @var _address: IP address or Hostname of this dispatcher server.
    @var _frontend_server: URI of the frontend server.
    @var _lock: Lock used to synchronize access to _worker_dict.
    @var _port: Port assigned to this server instance.
    @var _worker_dict: Dictionary mapping RPM hostnames to RPMController
                       instances.
    """

    # Matches (from the start of the string) the rack element of a hostname
    # that selects the WebPoweredRPMController, e.g. 'rack1a'.
    _WEBPOWERED_RACK_ID_RE = re.compile(r'rack[0-9]+[a-z]+')


    def __init__(self, address, port):
        """
        RPMDispatcher constructor.

        Initializes instance vars and registers this server with the frontend
        server.

        @param address: Address of this dispatcher server.
        @param port: Port assigned to this dispatcher server.

        @raise RPMInfrastructureException: Raised if the dispatch server is
                                           unable to register with the frontend
                                           server.
        """
        self._address = address
        self._port = port
        self._lock = threading.Lock()
        self._worker_dict = {}
        # We assume that the frontend server and dispatchers are running on the
        # same host, and the frontend server is listening for connections from
        # the external world.
        frontend_server_port = rpm_config.getint('RPM_INFRASTRUCTURE',
                                                 'frontend_port')
        self._frontend_server = 'http://%s:%d' % (socket.gethostname(),
                                                  frontend_server_port)
        logging.info('Registering this rpm dispatcher with the frontend '
                     'server at %s.', self._frontend_server)
        client = xmlrpclib.ServerProxy(self._frontend_server)
        # De-register with the frontend when the dispatcher exits.
        atexit.register(self._unregister)
        try:
            client.register_dispatcher(self._get_serveruri())
        except socket.error as er:
            err_msg = ('Unable to register with frontend server. Error: %s.' %
                       self._describe_socket_error(er))
            logging.error(err_msg)
            raise RPMInfrastructureException(err_msg)


    @staticmethod
    def _describe_socket_error(er):
        """
        Builds the short error description used in log/exception messages.

        Not every socket error carries a code that is a key of
        errno.errorcode (e.g. socket.timeout has no errno, and
        socket.gaierror uses EAI_* codes), so indexing the table directly
        could raise KeyError and hide the real failure.

        @param er: The socket.error instance that was caught.

        @return: The symbolic errno name (e.g. 'ECONNREFUSED') when the
                 error's errno is known to errno.errorcode, otherwise
                 str(er).
        """
        return errno.errorcode.get(getattr(er, 'errno', None), str(er))


    def _worker_dict_put(self, key, value):
        """
        Private method used to synchronize access to _worker_dict.

        @param key: key value we are using to access _worker_dict.
        @param value: value we are putting into _worker_dict.
        """
        with self._lock:
            self._worker_dict[key] = value


    def _worker_dict_get(self, key):
        """
        Private method used to synchronize access to _worker_dict.

        @param key: key value we are using to access _worker_dict.
        @return: value found when accessing _worker_dict, or None if the key
                 is not present.
        """
        with self._lock:
            return self._worker_dict.get(key)


    def is_up(self):
        """
        Allows the frontend server to see if the dispatcher server is up before
        attempting to queue requests.

        @return: True. If connection fails, the client proxy will throw a socket
                 error on the client side.
        """
        return True


    def queue_request(self, powerunit_info_dict, new_state):
        """
        Looks up the appropriate RPMController instance for the device and queues
        up the request.

        @param powerunit_info_dict: A dictionary, containing the attribute/values
                                    of an unmarshalled PowerUnitInfo instance.
        @param new_state: [ON, OFF, CYCLE] state we want to the change the
                          outlet to.
        @return: True if the attempt to change power state was successful,
                 False otherwise (including when the request carries no
                 powerunit hostname, so no controller can be looked up).
        """
        powerunit_info = utils.PowerUnitInfo(**powerunit_info_dict)
        logging.info('Received request to set device: %s to state: %s',
                     powerunit_info.device_hostname, new_state)
        controller = self._get_rpm_controller(
                powerunit_info.powerunit_hostname,
                powerunit_info.hydra_hostname)
        if controller is None:
            # _get_rpm_controller returns None for an empty powerunit
            # hostname; report a failed request instead of dereferencing None.
            logging.error('No powerunit hostname for device: %s. Unable to '
                          'set state: %s',
                          powerunit_info.device_hostname, new_state)
            return False
        return controller.queue_request(powerunit_info, new_state)


    def _get_rpm_controller(self, rpm_hostname, hydra_hostname=None):
        """
        Private method that retrieves the appropriate RPMController instance
        for this RPM Hostname or calls _create_rpm_controller if it does not
        already exist.

        @param rpm_hostname: hostname of the RPM whose RPMController we want.
        @param hydra_hostname: passed through to _create_rpm_controller when a
                               new controller has to be created.

        @return: RPMController instance responsible for this RPM, or None if
                 rpm_hostname is empty.
        """
        if not rpm_hostname:
            return None
        # The lookup and the insert happen under a single acquisition of the
        # lock so that two concurrent requests for the same RPM cannot both
        # create a controller. _lock is not reentrant, so the dictionary is
        # accessed directly here rather than via _worker_dict_get/_put.
        with self._lock:
            controller = self._worker_dict.get(rpm_hostname)
            if not controller:
                controller = self._create_rpm_controller(
                        rpm_hostname, hydra_hostname)
                self._worker_dict[rpm_hostname] = controller
        return controller


    def _create_rpm_controller(self, rpm_hostname, hydra_hostname):
        """
        Determines the type of RPMController required and initializes it.

        The decision is made on the second-to-last '-' separated element of
        the hostname.

        @param rpm_hostname: Hostname of the RPM we need to communicate with.
        @param hydra_hostname: Only used for Sentry devices, where it is
                               passed to SentryRPMController.

        @return: RPMController instance responsible for this RPM.

        @raise IndexError: If rpm_hostname has fewer than two '-' separated
                           elements.
        """
        device_kind = rpm_hostname.split('-')[-2]
        if device_kind == 'poe':
            # POE switch hostname looks like 'chromeos2-poe-switch1'.
            logging.info('The controller is a Cisco POE switch.')
            return rpm_controller.CiscoPOEController(rpm_hostname)
        # The device is an RPM; the same element is its rack id.
        if self._WEBPOWERED_RACK_ID_RE.match(device_kind):
            logging.info('RPM is a webpowered device.')
            return rpm_controller.WebPoweredRPMController(rpm_hostname)
        logging.info('RPM is a Sentry CDU device.')
        return rpm_controller.SentryRPMController(
                hostname=rpm_hostname,
                hydra_hostname=hydra_hostname)


    def _get_serveruri(self):
        """
        Formats the _address and _port into a meaningful URI string.

        @return: URI of this dispatch server.
        """
        return 'http://%s:%d' % (self._address, self._port)


    def _unregister(self):
        """
        Tells the frontend server that this dispatch server is shutting down and
        to unregister it.

        Called by atexit.

        @raise RPMInfrastructureException: Raised if the dispatch server is
                                           unable to unregister with the
                                           frontend server.
        """
        logging.info('Dispatch server shutting down. Unregistering with RPM '
                     'frontend server.')
        client = xmlrpclib.ServerProxy(self._frontend_server)
        try:
            client.unregister_dispatcher(self._get_serveruri())
        except socket.error as er:
            err_msg = ('Unable to unregister with frontend server. Error: %s.' %
                       self._describe_socket_error(er))
            logging.error(err_msg)
            raise RPMInfrastructureException(err_msg)
219
220
def launch_server_on_unused_port():
    """
    Looks up an unused port on this host and creates the xmlrpc server for it.

    Useful for testing by running multiple dispatch servers on the same host.

    @return: server,port - server object and the port it was created with.
    """
    address = socket.gethostbyname(socket.gethostname())
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        # Set this socket to allow reuse.
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # Binding to port 0 asks the OS to pick a port; read back which one.
        sock.bind(('', 0))
        port = sock.getsockname()[1]
        # The placeholder socket is still open while the server is created,
        # presumably so the port stays reserved until the server has it.
        server = MultiThreadedXMLRPCServer((address, port),
                                           allow_none=True)
    finally:
        # Always release the placeholder socket, even if one of the steps
        # above raised.
        sock.close()
    return server, port
240
241
if __name__ == '__main__':
    # Main function used to launch the dispatch server. Creates an instance of
    # RPMDispatcher and registers it to a MultiThreadedXMLRPCServer instance.

    # Exactly one argument is expected: the log file name.
    if len(sys.argv) != 2:
        # A single parenthesised argument prints the same text whether print
        # is a statement or a function.
        print('Usage: ./%s <log_file_name>' % sys.argv[0])
        sys.exit(1)

    # Start the log server first, then route this process's logging to it.
    rpm_logging_config.start_log_server(sys.argv[1], LOG_FILENAME_FORMAT)
    rpm_logging_config.set_up_logging_to_server()

    # Get the local ip address and set the server to utilize it.
    address = socket.gethostbyname(socket.gethostname())
    server, port = launch_server_on_unused_port()

    # Expose the dispatcher's methods over xmlrpc and handle requests.
    rpm_dispatcher = RPMDispatcher(address, port)
    server.register_instance(rpm_dispatcher)
    server.serve_forever()
260