1#!/usr/bin/python2
2# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6import errno
7import heapq
8import logging
9import os
10import sys
11import socket
12import threading
13import xmlrpclib
14
15import rpm_logging_config
16from config import rpm_config
17from MultiThreadedXMLRPCServer import MultiThreadedXMLRPCServer
18from rpm_infrastructure_exception import RPMInfrastructureException
19
20import common
21from autotest_lib.site_utils.rpm_control_system import utils
22
# Load (number of assigned power units) given to a newly registered dispatcher.
DEFAULT_RPM_COUNT = 0
# Load value stamped onto the heap entry of an unregistered dispatcher. It is
# below every real count, so such entries rise to the top of the min heap
# where they can be popped off.
TERMINATED = -1

# Positions inside a [rpm_count, dispatcher_uri] min-heap entry.
RPM_COUNT, DISPATCHER_URI = 0, 1

# Settings taken from the RPM configuration.
LOG_FILENAME_FORMAT = rpm_config.get('GENERAL','frontend_logname_format')
DEFAULT_RPM_ID = rpm_config.get('RPM_INFRASTRUCTURE', 'default_rpm_id')

# Power states a client is allowed to request.
VALID_STATE_VALUES = ['ON', 'OFF', 'CYCLE']

# Servo-interface mapping file; the configured name is joined onto the
# directory containing this module.
MAPPING_FILE = os.path.join(os.path.dirname(__file__),
                            rpm_config.get('CiscoPOE',
                                           'servo_interface_mapping_file'))

# Capacity of the LRU cache meant to hold per-device power management unit
# information (rpm_hostname, outlet, hydra_hostname, ...).
LRU_SIZE = rpm_config.getint('RPM_INFRASTRUCTURE', 'lru_size')
44
45
class DispatcherDownException(Exception):
    """Signals that the RPMDispatcher chosen for a request is unreachable.

    The exception's single argument is conventionally the dispatcher URI.
    """
    pass
48
49
class RPMFrontendServer(object):
    """
    This class is the frontend server of the RPM Infrastructure. All clients
    will send their power state requests to this central server, which will
    forward the requests to an available or already assigned RPM dispatcher
    server.

    Once the dispatcher processes the request it will return the result
    to this frontend server, which will send the result back to the client.

    All calls to this server are blocking.

    @var _dispatcher_minheap: Min heap of entries of format
                              [ num_rpms, dispatcher_uri ].
                              Used to choose the least loaded dispatcher.
    @var _entry_dict: Maps a registered dispatcher URI to its entry (list)
                      inside the min heap. If a dispatcher server shuts down
                      this allows us to invalidate the entry in the min heap.
    @var _lock: Protects the heap, the dicts and the servo-interface mapping
                from concurrent modification by request threads.
    @var _rpm_dict: Maps power unit (RPM/POE) hostnames to the URI of their
                    already assigned dispatcher server.
    @var _mapping_last_modified: Last-modified time of the servo-interface
                                 mapping file.
    @var _servo_interface: Maps servo hostname to (switch_hostname, interface).
    @var _rpm_info: An LRU cache of size LRU_SIZE, intended for recently
                    visited power unit information.
                    NOTE(review): no method of this class reads or writes it.
    @var _email_handler: Email handler to use to control email notifications.
    """


    def __init__(self, email_handler=None):
        """
        RPMFrontendServer constructor.

        Initializes instance variables and loads the servo-interface mapping
        via utils.load_servo_interface_mapping().

        @param email_handler: Optional email handler. suspend_emails and
                              resume_emails do nothing when it is None.
        """
        self._dispatcher_minheap = []
        self._entry_dict = {}
        self._lock = threading.Lock()
        self._mapping_last_modified = os.path.getmtime(MAPPING_FILE)
        self._servo_interface = utils.load_servo_interface_mapping()
        self._rpm_dict = {}
        self._rpm_info = utils.LRUCache(size=LRU_SIZE)
        self._email_handler = email_handler


    def set_power_via_poe(self, device_hostname, new_state):
        """Sets power state of the device to the requested state via POE.

        @param device_hostname: Hostname of the servo to control. Any DNS zone
                                suffix (everything after the first '.') is
                                ignored.
        @param new_state: [ON, OFF, CYCLE] State to which we want to set the
                          device's outlet to (case-insensitive).

        @return: True if the attempt to change power state was successful,
                 False otherwise (including an invalid new_state).

        @raise RPMInfrastructureException: No dispatchers are available or can
                                           be reached, or the servo is missing
                                           from the servo-interface mapping.
        """
        # Remove any DNS Zone information and simplify down to just the hostname.
        device_hostname = device_hostname.split('.')[0]
        new_state = new_state.upper()
        if new_state not in VALID_STATE_VALUES:
            logging.error('Received request to set servo %s to invalid '
                          'state %s', device_hostname, new_state)
            return False
        logging.info('Received request to set servo: %s to state: %s',
                     device_hostname, new_state)
        powerunit_info = self._get_poe_powerunit_info(device_hostname)
        try:
            return self._queue_once(powerunit_info, new_state)
        except DispatcherDownException:
            # _queue_once already unregistered the unreachable dispatcher, so
            # the retry is routed to another one; once none remain,
            # RPMInfrastructureException is raised instead.
            return self.set_power_via_poe(device_hostname, new_state)


    def set_power_via_rpm(self, device_hostname, rpm_hostname,
                          rpm_outlet, hydra_hostname, new_state):
        """Sets power state of a device to the requested state via RPM.

        Unlike the special case of POE, powerunit information is not available
        on the RPM server, so must be provided as arguments.

        @param device_hostname: Hostname of the device to control.
        @param rpm_hostname: Hostname of the RPM to use.
        @param rpm_outlet: The RPM outlet to control.
        @param hydra_hostname: If required, the hydra device to SSH through to
                               get to the RPM.
        @param new_state: [ON, OFF, CYCLE] State to which we want to set the
                          device's outlet to (case-insensitive).

        @return: True if the attempt to change power state was successful,
                 False otherwise (including an invalid new_state).

        @raise RPMInfrastructureException: No dispatchers are available or can
                                           be reached.
        """
        new_state = new_state.upper()
        if new_state not in VALID_STATE_VALUES:
            logging.error('Received request to set device %s to invalid '
                          'state %s', device_hostname, new_state)
            return False
        logging.info('Received request to set device: %s to state: %s',
                     device_hostname, new_state)

        powerunit_info = utils.PowerUnitInfo(
                device_hostname=device_hostname,
                powerunit_type=utils.PowerUnitInfo.POWERUNIT_TYPES.RPM,
                powerunit_hostname=rpm_hostname,
                outlet=rpm_outlet,
                hydra_hostname=hydra_hostname,
        )
        try:
            return self._queue_once(powerunit_info, new_state)
        except DispatcherDownException:
            # _queue_once already unregistered the unreachable dispatcher, so
            # the retry is routed to another one; once none remain,
            # RPMInfrastructureException is raised instead.
            return self.set_power_via_rpm(device_hostname, rpm_hostname,
                                          rpm_outlet, hydra_hostname, new_state)


    def _queue_once(self, powerunit_info, new_state):
        """Forward one request to the dispatcher responsible for the unit.

        @param powerunit_info: A PowerUnitInfo instance.
        @param new_state: Validated, upper-cased target state.

        @return: The dispatcher's result for the request.

        @raise RPMInfrastructureException: No dispatcher is registered, or
                                           none of them can be reached.
        @raise DispatcherDownException: The chosen dispatcher was unreachable
                                        and has been unregistered; the caller
                                        should retry.
        """
        dispatcher_uri = self._get_dispatcher(powerunit_info)
        if not dispatcher_uri:
            # No dispatchers available.
            raise RPMInfrastructureException('No dispatchers available.')
        client = xmlrpclib.ServerProxy(dispatcher_uri, allow_none=True)
        try:
            # Block on the request and return the result once it arrives.
            return client.queue_request(powerunit_info, new_state)
        except socket.error as er:
            # Dispatcher Server is not reachable. Unregister it and retry.
            # Some socket errors (e.g. socket.timeout) carry no errno, so fall
            # back to the exception itself rather than raising KeyError here.
            logging.error("Can't reach Dispatch Server: %s. Error: %s",
                          dispatcher_uri, errno.errorcode.get(er.errno, er))
            if self.is_network_infrastructure_down():
                # No dispatchers can handle this request so raise an Exception
                # to the caller.
                raise RPMInfrastructureException('No dispatchers can be '
                                                 'reached.')
            logging.info('Will attempt forwarding request to other dispatch '
                         'servers.')
            logging.error('Unregistering %s due to error. Recommend resetting '
                          'that dispatch server.', dispatcher_uri)
            self.unregister_dispatcher(dispatcher_uri)
            raise DispatcherDownException(dispatcher_uri)


    def is_network_infrastructure_down(self):
        """
        Check to see if we can communicate with any dispatcher servers.

        Only called in the situation that queuing a request to a dispatcher
        server failed.

        @return: False if any dispatcher server is up and the rpm infrastructure
                 can still function. True otherwise.
        """
        # Snapshot the URIs under the lock: other request threads push/pop the
        # heap concurrently. The network calls happen outside the lock so a
        # slow dispatcher does not block every other request.
        with self._lock:
            dispatcher_uris = [entry[DISPATCHER_URI]
                               for entry in self._dispatcher_minheap]
        for dispatcher_uri in dispatcher_uris:
            dispatcher = xmlrpclib.ServerProxy(dispatcher_uri, allow_none=True)
            try:
                if dispatcher.is_up():
                    # At least one dispatcher is alive so our network is fine.
                    return False
            except socket.error:
                # Can't talk to this dispatcher so keep looping.
                pass
        logging.error("Can't reach any dispatchers. Check frontend network "
                      'status or all dispatchers are down.')
        return True


    def _get_poe_powerunit_info(self, device_hostname):
        """Get the power management unit information for a POE controller.

        Servo is managed by POE. The related information we need to know
        includes the POE hostname and POE interface. That information is
        stored in a local mapping file, which is re-read when
        utils.reload_servo_interface_mapping_if_necessary reports a change.

        @param device_hostname: A string representing the device's hostname.

        @returns: A PowerUnitInfo object.
        @raises RPMInfrastructureException if the device is not present in
                the servo-interface mapping.
        """
        with self._lock:
            reload_info = utils.reload_servo_interface_mapping_if_necessary(
                    self._mapping_last_modified)
            if reload_info:
                self._mapping_last_modified, self._servo_interface = reload_info
            switch_if_tuple = self._servo_interface.get(device_hostname)
        if not switch_if_tuple:
            # Interpolate the hostname; passing it as a second exception
            # argument would leave a literal '%s' in the message.
            raise RPMInfrastructureException(
                    'Could not determine POE hostname for %s. '
                    'Please check the servo-interface mapping file.'
                    % device_hostname)
        return utils.PowerUnitInfo(
                device_hostname=device_hostname,
                powerunit_type=utils.PowerUnitInfo.POWERUNIT_TYPES.POE,
                powerunit_hostname=switch_if_tuple[0],
                outlet=switch_if_tuple[1],
                hydra_hostname=None)


    def _get_dispatcher(self, powerunit_info):
        """
        Private method that looks up or assigns a dispatcher server
        responsible for communicating with the given RPM/POE.

        A unit already mapped in _rpm_dict keeps its dispatcher. Otherwise
        the least loaded dispatcher (top of the min heap) is assigned and its
        load count is incremented. Reachability is not checked here.

        @param powerunit_info: A PowerUnitInfo instance.

        @return: URI of dispatcher server responsible for the rpm/poe.
                 None if no dispatcher servers are available.
        """
        powerunit_type = powerunit_info.powerunit_type
        powerunit_hostname = powerunit_info.powerunit_hostname
        with self._lock:
            assigned_uri = self._rpm_dict.get(powerunit_hostname)
            if assigned_uri:
                return assigned_uri
            logging.info('No Dispatcher assigned for %s %s.',
                         powerunit_type, powerunit_hostname)
            # Choose the least loaded dispatcher to communicate with the RPM.
            try:
                heap_entry = heapq.heappop(self._dispatcher_minheap)
            except IndexError:
                logging.error('Infrastructure Error: Frontend has no '
                              'registered dispatchers to field out this '
                              'request!')
                return None
            dispatcher_uri = heap_entry[DISPATCHER_URI]
            # Put this entry back in the heap with an RPM Count + 1. The same
            # list object is reused because _entry_dict references it.
            heap_entry[RPM_COUNT] += 1
            heapq.heappush(self._dispatcher_minheap, heap_entry)
            logging.info('Assigning %s for %s %s', dispatcher_uri,
                         powerunit_type, powerunit_hostname)
            self._rpm_dict[powerunit_hostname] = dispatcher_uri
            return dispatcher_uri


    def register_dispatcher(self, dispatcher_uri):
        """
        Called by a dispatcher server so that the frontend server knows it is
        available to field out RPM requests.

        Adds an entry to the min heap and entry map for this dispatcher. A
        URI that is already registered (e.g. a dispatcher that restarted
        without unregistering) keeps its existing entry. A second heap entry
        could never be removed by unregister_dispatcher and would keep
        being handed out.

        @param dispatcher_uri: Address of dispatcher server we are registering.
        """
        logging.info('Registering uri: %s as a rpm dispatcher.', dispatcher_uri)
        with self._lock:
            if self._entry_dict.get(dispatcher_uri) is not None:
                logging.info('%s is already registered.', dispatcher_uri)
                return
            heap_entry = [DEFAULT_RPM_COUNT, dispatcher_uri]
            heapq.heappush(self._dispatcher_minheap, heap_entry)
            self._entry_dict[dispatcher_uri] = heap_entry


    def unregister_dispatcher(self, uri_to_unregister):
        """
        Called by a dispatcher server as it exits (or by this server when a
        dispatcher is unreachable) so that the frontend server knows that it
        is no longer available to field out requests.

        Assigns an rpm count of TERMINATED (-1) to this dispatcher so that it
        will be pushed out of the min heap.

        Removes from _rpm_dict all entries mapped to this dispatcher so
        that those RPMs can be reassigned to a new dispatcher.

        @param uri_to_unregister: Address of dispatcher server we are
                                  unregistering.
        """
        logging.info('Unregistering uri: %s as a rpm dispatcher.',
                     uri_to_unregister)
        with self._lock:
            heap_entry = self._entry_dict.pop(uri_to_unregister, None)
            if heap_entry is None:
                logging.warning('%s was not registered.', uri_to_unregister)
                return
            # Set this entry's RPM_COUNT to TERMINATED (-1).
            heap_entry[RPM_COUNT] = TERMINATED
            # Remove all RPM mappings to this dispatcher.
            orphaned_units = [unit for unit, uri in self._rpm_dict.items()
                              if uri == uri_to_unregister]
            for unit in orphaned_units:
                del self._rpm_dict[unit]
            # Re-sort the heap and remove any terminated dispatchers.
            heapq.heapify(self._dispatcher_minheap)
            self._remove_terminated_dispatchers()


    def _remove_terminated_dispatchers(self):
        """
        Pop entries off the head of the heap until the top entry is a
        non-terminated dispatcher or the heap is empty.

        Callers must hold self._lock and have restored the heap invariant
        (TERMINATED sorts below every real count, so terminated entries are
        at the top).
        """
        # Heapq guarantees the head of the heap is in the '0' index.
        heap = self._dispatcher_minheap
        while heap and heap[0][RPM_COUNT] == TERMINATED:
            heapq.heappop(heap)


    def suspend_emails(self, hours):
        """Suspend email notifications.

        @param hours: How many hours to suspend email notifications.
        """
        if self._email_handler:
            self._email_handler.suspend_emails(hours)


    def resume_emails(self):
        """Resume email notifications."""
        if self._email_handler:
            self._email_handler.resume_emails()
381
def main():
    """Launch the RPM frontend server.

    Expects exactly one command-line argument: the directory for log files.
    Sets up file logging, creates an RPMFrontendServer and registers it with
    a MultiThreadedXMLRPCServer listening on this host's hostname and the
    configured frontend port, then serves requests forever.

    Exits with status 1 and a usage message on stderr if the argument count
    is wrong.
    """
    if len(sys.argv) != 2:
        # Usage errors go to stderr; sys.stderr.write also avoids the
        # Python 2-only print statement.
        sys.stderr.write('Usage: ./%s <log_file_dir>.\n' % sys.argv[0])
        sys.exit(1)

    email_handler = rpm_logging_config.set_up_logging_to_file(
            sys.argv[1], LOG_FILENAME_FORMAT)
    frontend_server = RPMFrontendServer(email_handler=email_handler)
    # We assume that external clients will always connect to us via the
    # hostname, so listening on the hostname ensures we pick the right network
    # interface.
    address = socket.gethostname()
    port = rpm_config.getint('RPM_INFRASTRUCTURE', 'frontend_port')
    server = MultiThreadedXMLRPCServer((address, port), allow_none=True)
    server.register_instance(frontend_server)
    logging.info('Listening on %s port %d', address, port)
    server.serve_forever()


if __name__ == '__main__':
    main()
403