1#!/usr/bin/python
2# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6import errno
7import heapq
8import logging
9import os
10import sys
11import socket
12import threading
13import xmlrpclib
14
15import rpm_logging_config
16from config import rpm_config
17from MultiThreadedXMLRPCServer import MultiThreadedXMLRPCServer
18from rpm_infrastructure_exception import RPMInfrastructureException
19
20import common
21from autotest_lib.server import frontend
22from autotest_lib.site_utils.rpm_control_system import utils
23
# RPM count given to a dispatcher's heap entry when it registers.
DEFAULT_RPM_COUNT = 0
# Sentinel RPM count written into a dispatcher's heap entry when it is
# unregistered. Being lower than any real count it sorts to the head of the
# min heap, where the entry can be popped off.
TERMINATED = -1

# Indexes for accessing heap entries, which are lists of the form
# [rpm_count, dispatcher_uri].
RPM_COUNT = 0
DISPATCHER_URI = 1

# Log file name format handed to rpm_logging_config.set_up_logging().
LOG_FILENAME_FORMAT = rpm_config.get('GENERAL','frontend_logname_format')
# NOTE(review): not referenced in the visible part of this file; presumably
# consumed by other modules -- confirm before removing.
DEFAULT_RPM_ID = rpm_config.get('RPM_INFRASTRUCTURE', 'default_rpm_id')

# Valid state values. Requested states are upper-cased before being checked
# against this list.
VALID_STATE_VALUES = ['ON', 'OFF', 'CYCLE']

# Servo-interface mapping file, resolved relative to this module's directory.
MAPPING_FILE = os.path.join(
        os.path.dirname(__file__),
        rpm_config.get('CiscoPOE', 'servo_interface_mapping_file'))

# Size of the LRU that holds power management unit information related
# to a device, e.g. rpm_hostname, outlet, hydra_hostname, etc.
LRU_SIZE = rpm_config.getint('RPM_INFRASTRUCTURE', 'lru_size')
45
46
47class RPMFrontendServer(object):
48    """
    This class is the frontend server of the RPM Infrastructure. All clients
    will send their power state requests to this central server, which will
    forward the requests to an available or already assigned RPM dispatcher
    server.
53
54    Once the dispatcher processes the request it will return the result
55    to this frontend server who will send the result back to the client.
56
57    All calls to this server are blocking.
58
59    @var _dispatcher_minheap: Min heap that returns a list of format-
60                              [ num_rpm's, dispatcher_uri ]
61                              Used to choose the least loaded dispatcher.
62    @var _entry_dict: Maps dispatcher URI to an entry (list) inside the min
63                     heap. If a dispatcher server shuts down this allows us to
64                     invalidate the entry in the minheap.
65    @var _lock: Used to protect data from multiple running threads all
66                manipulating the same data.
67    @var _rpm_dict: Maps rpm hostname's to an already assigned dispatcher
68                    server.
69    @var _mapping_last_modified: Last-modified time of the servo-interface
70                                 mapping file.
71    @var _servo_interface: Maps servo hostname to (switch_hostname, interface).
72    @var _rpm_info: An LRU cache to hold recently visited rpm information
73                    so that we don't hit AFE too often. The elements in
74                    the cache are instances of PowerUnitInfo indexed by
75                    dut hostnames. POE info is not stored in the cache.
76    @var _afe: AFE instance to talk to autotest. Used to retrieve rpm hostname.
77    @var _email_handler: Email handler to use to control email notifications.
78    """
79
80
81    def __init__(self, email_handler=None):
82        """
83        RPMFrontendServer constructor.
84
85        Initializes instance variables.
86        """
87        self._dispatcher_minheap = []
88        self._entry_dict = {}
89        self._lock = threading.Lock()
90        self._mapping_last_modified = os.path.getmtime(MAPPING_FILE)
91        self._servo_interface = utils.load_servo_interface_mapping()
92        self._rpm_dict = {}
93        self._afe = frontend.AFE()
94        self._rpm_info = utils.LRUCache(size=LRU_SIZE)
95        self._email_handler = email_handler
96
97
98    def queue_request(self, device_hostname, new_state):
99        """
100        Forwards a request to change a device's (a dut or a servo) power state
101        to the appropriate dispatcher server.
102
103        This call will block until the forwarded request returns.
104
105        @param device_hostname: Hostname of the device whose power state we want to
106                             change.
107        @param new_state: [ON, OFF, CYCLE] State to which we want to set the
108                          device's outlet to.
109
110        @return: True if the attempt to change power state was successful,
111                 False otherwise.
112
113        @raise RPMInfrastructureException: No dispatchers are available or can
114                                           be reached.
115        """
116        # Remove any DNS Zone information and simplify down to just the hostname.
117        device_hostname = device_hostname.split('.')[0]
118        new_state = new_state.upper()
119        # Put new_state in all uppercase letters
120        if new_state not in VALID_STATE_VALUES:
121            logging.error('Received request to set device %s to invalid '
122                          'state %s', device_hostname, new_state)
123            return False
124        logging.info('Received request to set device: %s to state: %s',
125                     device_hostname, new_state)
126        powerunit_info = self._get_powerunit_info(device_hostname)
127        dispatcher_uri = self._get_dispatcher(powerunit_info)
128        if not dispatcher_uri:
129            # No dispatchers available.
130            raise RPMInfrastructureException('No dispatchers available.')
131        client = xmlrpclib.ServerProxy(dispatcher_uri, allow_none=True)
132        try:
133            # Block on the request and return the result once it arrives.
134            return client.queue_request(powerunit_info, new_state)
135        except socket.error as er:
136            # Dispatcher Server is not reachable. Unregister it and retry.
137            logging.error("Can't reach Dispatch Server: %s. Error: %s",
138                          dispatcher_uri, errno.errorcode[er.errno])
139            if self.is_network_infrastructure_down():
140                # No dispatchers can handle this request so raise an Exception
141                # to the caller.
142                raise RPMInfrastructureException('No dispatchers can be'
143                                                 'reached.')
144            logging.info('Will attempt forwarding request to other dispatch '
145                         'servers.')
146            logging.error('Unregistering %s due to error. Recommend resetting '
147                          'that dispatch server.', dispatcher_uri)
148            self.unregister_dispatcher(dispatcher_uri)
149            # Retry forwarding the request.
150            return self.queue_request(device_hostname, new_state)
151
152
153    def is_network_infrastructure_down(self):
154        """
155        Check to see if we can communicate with any dispatcher servers.
156
157        Only called in the situation that queuing a request to a dispatcher
158        server failed.
159
160        @return: False if any dispatcher server is up and the rpm infrastructure
161                 can still function. True otherwise.
162        """
163        for dispatcher_entry in self._dispatcher_minheap:
164            dispatcher = xmlrpclib.ServerProxy(
165                    dispatcher_entry[DISPATCHER_URI], allow_none=True)
166            try:
167                if dispatcher.is_up():
168                    # Atleast one dispatcher is alive so our network is fine.
169                    return False
170            except socket.error:
171                # Can't talk to this dispatcher so keep looping.
172                pass
173        logging.error("Can't reach any dispatchers. Check frontend network "
174                      'status or all dispatchers are down.')
175        return True
176
177
178    def _get_powerunit_info(self, device_hostname):
179        """Get the power management unit information for a device.
180
181        A device could be a chromeos dut or a servo.
182        1) ChromeOS dut
183        Chromeos dut is managed by RPM. The related information
184        we need to know include rpm hostname, rpm outlet, hydra hostname.
185        Such information can be retrieved from afe_host_attributes table
186        from afe. A local LRU cache is used avoid hitting afe too often.
187
188        2) Servo
189        Servo is managed by POE. The related information we need to know
190        include poe hostname, poe interface. Such information is
191        stored in a local file and read into memory.
192
193        @param device_hostname: A string representing the device's hostname.
194
195        @returns: A PowerUnitInfo object.
196        @raises RPMInfrastructureException if failed to get the power
197                unit info.
198
199        """
200        with self._lock:
201            if device_hostname.endswith('servo'):
202                # Servos are managed by Cisco POE switches.
203                reload_info = utils.reload_servo_interface_mapping_if_necessary(
204                        self._mapping_last_modified)
205                if reload_info:
206                    self._mapping_last_modified, self._servo_interface = reload_info
207                switch_if_tuple = self._servo_interface.get(device_hostname)
208                if not switch_if_tuple:
209                    raise RPMInfrastructureException(
210                            'Could not determine POE hostname for %s. '
211                            'Please check the servo-interface mapping file.',
212                            device_hostname)
213                else:
214                    return utils.PowerUnitInfo(
215                            device_hostname=device_hostname,
216                            powerunit_type=utils.PowerUnitInfo.POWERUNIT_TYPES.POE,
217                            powerunit_hostname=switch_if_tuple[0],
218                            outlet=switch_if_tuple[1],
219                            hydra_hostname=None)
220            else:
221                # Regular DUTs are managed by RPMs.
222                if device_hostname in self._rpm_info:
223                    return self._rpm_info[device_hostname]
224                else:
225                    hosts = self._afe.get_hosts(hostname=device_hostname)
226                    if not hosts:
227                        raise RPMInfrastructureException(
228                                'Can not retrieve rpm information '
229                                'from AFE for %s, no host found.' % device_hostname)
230                    else:
231                        info = utils.PowerUnitInfo.get_powerunit_info(hosts[0])
232                        self._rpm_info[device_hostname] = info
233                        return info
234
235
236    def _get_dispatcher(self, powerunit_info):
237        """
238        Private method that looks up or assigns a dispatcher server
239        responsible for communicating with the given RPM/POE.
240
241        Will also call _check_dispatcher to make sure it is up before returning
242        it.
243
244        @param powerunit_info: A PowerUnitInfo instance.
245
246        @return: URI of dispatcher server responsible for the rpm/poe.
247                 None if no dispatcher servers are available.
248        """
249        powerunit_type = powerunit_info.powerunit_type
250        powerunit_hostname = powerunit_info.powerunit_hostname
251        with self._lock:
252            if self._rpm_dict.get(powerunit_hostname):
253                return self._rpm_dict[powerunit_hostname]
254            logging.info('No Dispatcher assigned for %s %s.',
255                         powerunit_type, powerunit_hostname)
256            # Choose the least loaded dispatcher to communicate with the RPM.
257            try:
258                heap_entry = heapq.heappop(self._dispatcher_minheap)
259            except IndexError:
260                logging.error('Infrastructure Error: Frontend has no'
261                              'registered dispatchers to field out this '
262                              'request!')
263                return None
264            dispatcher_uri = heap_entry[DISPATCHER_URI]
265            # Put this entry back in the heap with an RPM Count + 1.
266            heap_entry[RPM_COUNT] = heap_entry[RPM_COUNT] + 1
267            heapq.heappush(self._dispatcher_minheap, heap_entry)
268            logging.info('Assigning %s for %s %s', dispatcher_uri,
269                         powerunit_type, powerunit_hostname)
270            self._rpm_dict[powerunit_hostname] = dispatcher_uri
271            return dispatcher_uri
272
273
274    def register_dispatcher(self, dispatcher_uri):
275        """
276        Called by a dispatcher server so that the frontend server knows it is
277        available to field out RPM requests.
278
279        Adds an entry to the min heap and entry map for this dispatcher.
280
281        @param dispatcher_uri: Address of dispatcher server we are registering.
282        """
283        logging.info('Registering uri: %s as a rpm dispatcher.', dispatcher_uri)
284        with self._lock:
285            heap_entry = [DEFAULT_RPM_COUNT, dispatcher_uri]
286            heapq.heappush(self._dispatcher_minheap, heap_entry)
287            self._entry_dict[dispatcher_uri] = heap_entry
288
289
290    def unregister_dispatcher(self, uri_to_unregister):
291        """
292        Called by a dispatcher server as it exits so that the frontend server
293        knows that it is no longer available to field out requests.
294
295        Assigns an rpm count of -1 to this dispatcher so that it will be pushed
296        out of the min heap.
297
298        Removes from _rpm_dict all entries with the value of this dispatcher so
299        that those RPM's can be reassigned to a new dispatcher.
300
301        @param uri_to_unregister: Address of dispatcher server we are
302                                  unregistering.
303        """
304        logging.info('Unregistering uri: %s as a rpm dispatcher.',
305                     uri_to_unregister)
306        with self._lock:
307            heap_entry = self._entry_dict.get(uri_to_unregister)
308            if not heap_entry:
309                logging.warning('%s was not registered.', uri_to_unregister)
310                return
311            # Set this entry's RPM_COUNT to TERMINATED (-1).
312            heap_entry[RPM_COUNT] = TERMINATED
313            # Remove all RPM mappings.
314            for rpm, dispatcher in self._rpm_dict.items():
315                if dispatcher == uri_to_unregister:
316                    self._rpm_dict[rpm] = None
317            self._entry_dict[uri_to_unregister] = None
318            # Re-sort the heap and remove any terminated dispatchers.
319            heapq.heapify(self._dispatcher_minheap)
320            self._remove_terminated_dispatchers()
321
322
323    def _remove_terminated_dispatchers(self):
324        """
325        Peek at the head of the heap and keep popping off values until there is
326        a non-terminated dispatcher at the top.
327        """
328        # Heapq guarantees the head of the heap is in the '0' index.
329        try:
330            # Peek at the next element in the heap.
331            top_of_heap = self._dispatcher_minheap[0]
332            while top_of_heap[RPM_COUNT] is TERMINATED:
333                # Pop off the top element.
334                heapq.heappop(self._dispatcher_minheap)
335                # Peek at the next element in the heap.
336                top_of_heap = self._dispatcher_minheap[0]
337        except IndexError:
338            # No more values in the heap. Can be thrown by both minheap[0]
339            # statements.
340            pass
341
342
343    def suspend_emails(self, hours):
344        """Suspend email notifications.
345
346        @param hours: How many hours to suspend email notifications.
347        """
348        if self._email_handler:
349            self._email_handler.suspend_emails(hours)
350
351
352    def resume_emails(self):
353        """Resume email notifications."""
354        if self._email_handler:
355            self._email_handler.resume_emails()
356
357
if __name__ == '__main__':
    # Script entry point used to launch the frontend server: creates an
    # RPMFrontendServer, registers it with a MultiThreadedXMLRPCServer and
    # serves requests until the process is stopped.
    if sys.argv[1:]:
        # The script takes no command line arguments.
        print('Usage: ./%s, no arguments available.' % sys.argv[0])
        sys.exit(1)
    email_handler = rpm_logging_config.set_up_logging(LOG_FILENAME_FORMAT)
    frontend_server = RPMFrontendServer(email_handler=email_handler)
    address = rpm_config.get('RPM_INFRASTRUCTURE', 'frontend_addr')
    port = rpm_config.getint('RPM_INFRASTRUCTURE', 'frontend_port')
    server = MultiThreadedXMLRPCServer((address, port), allow_none=True)
    server.register_instance(frontend_server)
    logging.info('Listening on %s port %d', address, port)
    server.serve_forever()
374