1#!/usr/bin/python
2# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6import errno
7import heapq
8import logging
9import os
10import sys
11import socket
12import threading
13import xmlrpclib
14
15import rpm_logging_config
16from config import rpm_config
17from MultiThreadedXMLRPCServer import MultiThreadedXMLRPCServer
18from rpm_infrastructure_exception import RPMInfrastructureException
19
20import common
21from autotest_lib.server import frontend
22from autotest_lib.site_utils.rpm_control_system import utils
23
# Load given to a freshly registered dispatcher, and the sentinel load that
# marks an unregistered dispatcher (it sorts below every real load, so it
# surfaces at the top of the min heap where it can be popped off).
DEFAULT_RPM_COUNT, TERMINATED = 0, -1

# Positions inside a [rpm_count, dispatcher_uri] min-heap entry.
RPM_COUNT, DISPATCHER_URI = 0, 1

# Settings read from the RPM infrastructure config when the module is imported.
LOG_FILENAME_FORMAT = rpm_config.get('GENERAL', 'frontend_logname_format')
DEFAULT_RPM_ID = rpm_config.get('RPM_INFRASTRUCTURE', 'default_rpm_id')

# The only power states a client may request.
VALID_STATE_VALUES = ['ON', 'OFF', 'CYCLE']

# Servo-interface mapping file; the configured name is resolved relative to
# the directory containing this module.
MAPPING_FILE = os.path.join(
        os.path.dirname(__file__),
        rpm_config.get('CiscoPOE', 'servo_interface_mapping_file'))

# Capacity of the LRU cache holding per-device power management unit info
# (rpm_hostname, outlet, hydra_hostname, ...).
LRU_SIZE = rpm_config.getint('RPM_INFRASTRUCTURE', 'lru_size')
45
46
class DispatcherDownException(Exception):
    """Signals that one specific RPMDispatcher could not be reached.

    The exception argument is conventionally the dispatcher's URI.
    """
    pass
49
50
class RPMFrontendServer(object):
    """
    This class is the frontend server of the RPM Infrastructure. All clients
    will send their power state requests to this central server who will
    forward the requests to an available or already assigned RPM dispatcher
    server.

    Once the dispatcher processes the request it will return the result
    to this frontend server who will send the result back to the client.

    All calls to this server are blocking.

    @var _dispatcher_minheap: Min heap of entries of format
                              [ num_rpm's, dispatcher_uri ].
                              Used to choose the least loaded dispatcher.
    @var _entry_dict: Maps a registered dispatcher URI to its entry (list)
                      inside the min heap. If a dispatcher server shuts down
                      this allows us to invalidate the entry in the minheap.
    @var _lock: Protects the dispatcher bookkeeping, the servo-interface
                mapping and the rpm info cache from concurrently running
                request threads.
    @var _rpm_dict: Maps power unit (RPM/POE) hostnames to an already
                    assigned dispatcher server URI.
    @var _mapping_last_modified: Last-modified time of the servo-interface
                                 mapping file.
    @var _servo_interface: Maps servo hostname to (switch_hostname, interface).
    @var _rpm_info: An LRU cache to hold recently visited rpm information
                    so that we don't hit AFE too often. The elements in
                    the cache are instances of PowerUnitInfo indexed by
                    dut hostnames. POE info is not stored in the cache.
    @var _afe: AFE instance to talk to autotest. Used to retrieve rpm hostname.
    @var _email_handler: Email handler to use to control email notifications.
    """


    def __init__(self, email_handler=None):
        """
        RPMFrontendServer constructor.

        Initializes instance variables and loads the servo-interface mapping.

        @param email_handler: Optional handler whose suspend_emails /
                              resume_emails methods are forwarded to.
        """
        self._dispatcher_minheap = []
        self._entry_dict = {}
        self._lock = threading.Lock()
        self._mapping_last_modified = os.path.getmtime(MAPPING_FILE)
        self._servo_interface = utils.load_servo_interface_mapping()
        self._rpm_dict = {}
        self._afe = frontend.AFE()
        self._rpm_info = utils.LRUCache(size=LRU_SIZE)
        self._email_handler = email_handler


    def set_power_via_poe(self, device_hostname, new_state):
        """Sets power state of the device to the requested state via POE.

        @param device_hostname: Hostname of the servo to control. Any DNS
                                zone suffix is stripped.
        @param new_state: [ON, OFF, CYCLE] State to which we want to set the
                          device's outlet to (case-insensitive).

        @return: True if the attempt to change power state was successful,
                 False otherwise.

        @raise RPMInfrastructureException: No dispatchers are available or can
                                           be reached, or the servo is missing
                                           from the servo-interface mapping.
        """
        # Remove any DNS Zone information and simplify down to just the hostname.
        device_hostname = device_hostname.split('.')[0]
        new_state = new_state.upper()
        if new_state not in VALID_STATE_VALUES:
            logging.error('Received request to set servo %s to invalid '
                          'state %s', device_hostname, new_state)
            return False
        logging.info('Received request to set servo: %s to state: %s',
                     device_hostname, new_state)
        powerunit_info = self._get_poe_powerunit_info(device_hostname)
        return self._queue_with_retry(powerunit_info, new_state)


    def set_power_via_rpm(self, device_hostname, rpm_hostname,
                          rpm_outlet, hydra_hostname, new_state):
        """Sets power state of a device to the requested state via RPM.

        Unlike the special case of POE, powerunit information is not available
        on the RPM server, so must be provided as arguments.

        @param device_hostname: Hostname of the device to control.
        @param rpm_hostname: Hostname of the RPM to use.
        @param rpm_outlet: The RPM outlet to control.
        @param hydra_hostname: If required, the hydra device to SSH through to
                               get to the RPM.
        @param new_state: [ON, OFF, CYCLE] State to which we want to set the
                          device's outlet to (case-insensitive).

        @return: True if the attempt to change power state was successful,
                 False otherwise.

        @raise RPMInfrastructureException: No dispatchers are available or can
                                           be reached.
        """
        # Normalize and validate the state exactly like the other entry points
        # so an invalid state is rejected here instead of being forwarded.
        new_state = new_state.upper()
        if new_state not in VALID_STATE_VALUES:
            logging.error('Received request to set device %s to invalid '
                          'state %s', device_hostname, new_state)
            return False
        powerunit_info = utils.PowerUnitInfo(
                device_hostname=device_hostname,
                powerunit_type=utils.PowerUnitInfo.POWERUNIT_TYPES.RPM,
                powerunit_hostname=rpm_hostname,
                outlet=rpm_outlet,
                hydra_hostname=hydra_hostname,
        )
        return self._queue_with_retry(powerunit_info, new_state)


    def queue_request(self, device_hostname, new_state):
        """
        Forwards a request to change a device's (a dut or a servo) power state
        to the appropriate dispatcher server.

        This call will block until the forwarded request returns.

        @param device_hostname: Hostname of the device whose power state we want
                                to change. Any DNS zone suffix is stripped.
        @param new_state: [ON, OFF, CYCLE] State to which we want to set the
                          device's outlet to (case-insensitive).

        @return: True if the attempt to change power state was successful,
                 False otherwise.

        @raise RPMInfrastructureException: No dispatchers are available or can
                                           be reached, or the power unit info
                                           for the device cannot be found.
        """
        # Remove any DNS Zone information and simplify down to just the hostname.
        device_hostname = device_hostname.split('.')[0]
        # Put new_state in all uppercase letters.
        new_state = new_state.upper()
        if new_state not in VALID_STATE_VALUES:
            logging.error('Received request to set device %s to invalid '
                          'state %s', device_hostname, new_state)
            return False
        logging.info('Received request to set device: %s to state: %s',
                     device_hostname, new_state)
        powerunit_info = self._get_powerunit_info(device_hostname)
        return self._queue_with_retry(powerunit_info, new_state)


    def _queue_with_retry(self, powerunit_info, new_state):
        """Forward a request, re-routing it while dispatchers turn out down.

        Each DispatcherDownException raised by _queue_once has already
        unregistered the failed dispatcher, so a retry normally goes to a
        different dispatcher; once none remain, _queue_once raises
        RPMInfrastructureException, which ends the loop.

        @param powerunit_info: A PowerUnitInfo instance.
        @param new_state: Validated, upper-case power state.

        @return: The dispatcher's result for the request.
        """
        while True:
            try:
                return self._queue_once(powerunit_info, new_state)
            except DispatcherDownException:
                # Retry forwarding the request to another dispatcher.
                continue


    def _queue_once(self, powerunit_info, new_state):
        """Queue one request to the dispatcher assigned to the power unit.

        @param powerunit_info: A PowerUnitInfo instance.
        @param new_state: Validated, upper-case power state.

        @return: The result returned by the dispatcher's queue_request.

        @raise RPMInfrastructureException: No dispatchers are registered, or
                                           none of them can be reached.
        @raise DispatcherDownException: The chosen dispatcher is unreachable
                                        and has been unregistered; the caller
                                        should retry.
        """
        dispatcher_uri = self._get_dispatcher(powerunit_info)
        if not dispatcher_uri:
            # No dispatchers available.
            raise RPMInfrastructureException('No dispatchers available.')
        client = xmlrpclib.ServerProxy(dispatcher_uri, allow_none=True)
        try:
            # Block on the request and return the result once it arrives.
            return client.queue_request(powerunit_info, new_state)
        except socket.error as er:
            # socket.timeout carries errno None and resolver errors use codes
            # missing from errno.errorcode; fall back to the exception itself
            # rather than raising KeyError from inside this handler.
            error_code = getattr(er, 'errno', None)
            error_name = errno.errorcode.get(error_code, er)
            # Dispatcher Server is not reachable. Unregister it and retry.
            logging.error("Can't reach Dispatch Server: %s. Error: %s",
                          dispatcher_uri, error_name)
            if self.is_network_infrastructure_down():
                # No dispatchers can handle this request so raise an Exception
                # to the caller.
                raise RPMInfrastructureException('No dispatchers can be '
                                                 'reached.')
            logging.info('Will attempt forwarding request to other dispatch '
                         'servers.')
            logging.error('Unregistering %s due to error. Recommend resetting '
                          'that dispatch server.', dispatcher_uri)
            self.unregister_dispatcher(dispatcher_uri)
            raise DispatcherDownException(dispatcher_uri)


    def is_network_infrastructure_down(self):
        """
        Check to see if we can communicate with any dispatcher servers.

        Only called in the situation that queuing a request to a dispatcher
        server failed.

        @return: False if any dispatcher server is up and the rpm infrastructure
                 can still function. True otherwise.
        """
        # Snapshot the registered URIs under the lock: other request threads
        # push/pop the heap concurrently. The XML-RPC probes themselves run
        # outside the lock so a slow dispatcher doesn't stall everyone else.
        with self._lock:
            dispatcher_uris = [entry[DISPATCHER_URI]
                               for entry in self._dispatcher_minheap]
        for uri in dispatcher_uris:
            dispatcher = xmlrpclib.ServerProxy(uri, allow_none=True)
            try:
                if dispatcher.is_up():
                    # At least one dispatcher is alive so our network is fine.
                    return False
            except socket.error:
                # Can't talk to this dispatcher so keep looping.
                pass
        logging.error("Can't reach any dispatchers. Check frontend network "
                      'status or all dispatchers are down.')
        return True


    def _get_powerunit_info(self, device_hostname):
        """Get the power management unit information for a device.

        A device could be a chromeos dut or a servo.
        1) ChromeOS dut
        Chromeos dut is managed by RPM. The related information
        we need to know include rpm hostname, rpm outlet, hydra hostname.
        Such information can be retrieved from afe_host_attributes table
        from afe. A local LRU cache is used avoid hitting afe too often.

        2) Servo
        Servo is managed by POE. The related information we need to know
        include poe hostname, poe interface. Such information is
        stored in a local file and read into memory.

        Hostnames ending in 'servo' are treated as servos.

        @param device_hostname: A string representing the device's hostname.

        @returns: A PowerUnitInfo object.
        @raises RPMInfrastructureException if failed to get the power
                unit info.
        """
        if device_hostname.endswith('servo'):
            return self._get_poe_powerunit_info(device_hostname)
        return self._get_rpm_powerunit_info(device_hostname)


    def _get_poe_powerunit_info(self, device_hostname):
        """Get the power management unit information for a POE controller.

        Servo is managed by POE. The related information we need to know
        include poe hostname, poe interface. Such information is
        stored in a local file and read into memory.

        @param device_hostname: A string representing the device's hostname.

        @returns: A PowerUnitInfo object.
        @raises RPMInfrastructureException if failed to get the power
                unit info.
        """
        with self._lock:
            # Swap in a freshly loaded mapping when utils reports that the
            # file changed since _mapping_last_modified.
            reload_info = utils.reload_servo_interface_mapping_if_necessary(
                    self._mapping_last_modified)
            if reload_info:
                self._mapping_last_modified, self._servo_interface = reload_info
            switch_if_tuple = self._servo_interface.get(device_hostname)
        if not switch_if_tuple:
            # Interpolate here: the exception does not format extra args.
            raise RPMInfrastructureException(
                    'Could not determine POE hostname for %s. '
                    'Please check the servo-interface mapping file.'
                    % device_hostname)
        return utils.PowerUnitInfo(
                device_hostname=device_hostname,
                powerunit_type=utils.PowerUnitInfo.POWERUNIT_TYPES.POE,
                powerunit_hostname=switch_if_tuple[0],
                outlet=switch_if_tuple[1],
                hydra_hostname=None)


    def _get_rpm_powerunit_info(self, device_hostname):
        """Get the power management unit information for an RPM controller.

        Chromeos dut is managed by RPM. The related information
        we need to know include rpm hostname, rpm outlet, hydra hostname.
        Such information can be retrieved from afe_host_attributes table
        from afe. A local LRU cache is used avoid hitting afe too often.

        @param device_hostname: A string representing the device's hostname.

        @returns: A PowerUnitInfo object.
        @raises RPMInfrastructureException if failed to get the power
                unit info.
        """
        # NOTE(review): the AFE query below runs while holding _lock, which
        # serializes all other requests behind it on a cache miss.
        with self._lock:
            # Regular DUTs are managed by RPMs.
            if device_hostname in self._rpm_info:
                return self._rpm_info[device_hostname]
            hosts = self._afe.get_hosts(hostname=device_hostname)
            if not hosts:
                raise RPMInfrastructureException(
                        'Can not retrieve rpm information '
                        'from AFE for %s, no host found.' % device_hostname)
            info = utils.PowerUnitInfo.get_powerunit_info(hosts[0])
            self._rpm_info[device_hostname] = info
            return info


    def _get_dispatcher(self, powerunit_info):
        """
        Private method that looks up or assigns a dispatcher server
        responsible for communicating with the given RPM/POE.

        A power unit already assigned to a dispatcher keeps it; otherwise the
        least loaded registered dispatcher is assigned and its load bumped.

        @param powerunit_info: A PowerUnitInfo instance.

        @return: URI of dispatcher server responsible for the rpm/poe.
                 None if no dispatcher servers are available.
        """
        powerunit_type = powerunit_info.powerunit_type
        powerunit_hostname = powerunit_info.powerunit_hostname
        with self._lock:
            assigned_uri = self._rpm_dict.get(powerunit_hostname)
            if assigned_uri:
                return assigned_uri
            logging.info('No Dispatcher assigned for %s %s.',
                         powerunit_type, powerunit_hostname)
            # Choose the least loaded dispatcher to communicate with the RPM.
            try:
                heap_entry = heapq.heappop(self._dispatcher_minheap)
            except IndexError:
                logging.error('Infrastructure Error: Frontend has no '
                              'registered dispatchers to field out this '
                              'request!')
                return None
            dispatcher_uri = heap_entry[DISPATCHER_URI]
            # Put this entry back in the heap with an RPM Count + 1.
            heap_entry[RPM_COUNT] += 1
            heapq.heappush(self._dispatcher_minheap, heap_entry)
            logging.info('Assigning %s for %s %s', dispatcher_uri,
                         powerunit_type, powerunit_hostname)
            self._rpm_dict[powerunit_hostname] = dispatcher_uri
            return dispatcher_uri


    def register_dispatcher(self, dispatcher_uri):
        """
        Called by a dispatcher server so that the frontend server knows it is
        available to field out RPM requests.

        Adds an entry to the min heap and entry map for this dispatcher. A URI
        that is already registered is left as is.

        @param dispatcher_uri: Address of dispatcher server we are registering.
        """
        logging.info('Registering uri: %s as a rpm dispatcher.', dispatcher_uri)
        with self._lock:
            if self._entry_dict.get(dispatcher_uri) is not None:
                # e.g. a dispatcher that restarted without unregistering. A
                # second heap entry would be orphaned: _entry_dict tracks only
                # one, so unregister_dispatcher could never remove the other.
                logging.info('%s is already registered.', dispatcher_uri)
                return
            heap_entry = [DEFAULT_RPM_COUNT, dispatcher_uri]
            heapq.heappush(self._dispatcher_minheap, heap_entry)
            self._entry_dict[dispatcher_uri] = heap_entry


    def unregister_dispatcher(self, uri_to_unregister):
        """
        Called by a dispatcher server as it exits so that the frontend server
        knows that it is no longer available to field out requests.

        Removes from _rpm_dict all entries with the value of this dispatcher so
        that those RPM's can be reassigned to a new dispatcher, then assigns an
        rpm count of TERMINATED (-1) to its heap entry so that it is pushed out
        of the min heap.

        @param uri_to_unregister: Address of dispatcher server we are
                                  unregistering.
        """
        logging.info('Unregistering uri: %s as a rpm dispatcher.',
                     uri_to_unregister)
        with self._lock:
            # Drop RPM mappings first, even for an unknown URI, so requests
            # are never routed back to a dispatcher we consider gone.
            stale_hosts = [host for host, uri in self._rpm_dict.items()
                           if uri == uri_to_unregister]
            for host in stale_hosts:
                del self._rpm_dict[host]
            heap_entry = self._entry_dict.pop(uri_to_unregister, None)
            if not heap_entry:
                logging.warning('%s was not registered.', uri_to_unregister)
                return
            heap_entry[RPM_COUNT] = TERMINATED
            # Re-sort the heap and remove any terminated dispatchers.
            heapq.heapify(self._dispatcher_minheap)
            self._remove_terminated_dispatchers()


    def _remove_terminated_dispatchers(self):
        """
        Pop entries off the head of the heap until the head is a
        non-terminated dispatcher or the heap is empty.

        Must be called with _lock held.
        """
        heap = self._dispatcher_minheap
        # heapq keeps the smallest entry at index 0, and TERMINATED sorts
        # below every real count. Compare with ==, not identity.
        while heap and heap[0][RPM_COUNT] == TERMINATED:
            heapq.heappop(heap)


    def suspend_emails(self, hours):
        """Suspend email notifications.

        No-op when the server was created without an email handler.

        @param hours: How many hours to suspend email notifications.
        """
        if self._email_handler:
            self._email_handler.suspend_emails(hours)


    def resume_emails(self):
        """Resume email notifications.

        No-op when the server was created without an email handler.
        """
        if self._email_handler:
            self._email_handler.resume_emails()
471
472
if __name__ == '__main__':
    # Main entry point used to launch the frontend server: creates an
    # RPMFrontendServer, registers it with a MultiThreadedXMLRPCServer and
    # blocks in serve_forever().
    if len(sys.argv) != 2:
        # sys.exit with a string prints it to stderr and exits with status 1.
        sys.exit('Usage: %s <log_file_dir>' % sys.argv[0])

    email_handler = rpm_logging_config.set_up_logging_to_file(
            sys.argv[1], LOG_FILENAME_FORMAT)
    frontend_server = RPMFrontendServer(email_handler=email_handler)
    # We assume that external clients will always connect to us via the
    # hostname, so listening on the hostname ensures we pick the right network
    # interface.
    address = socket.gethostname()
    port = rpm_config.getint('RPM_INFRASTRUCTURE', 'frontend_port')
    server = MultiThreadedXMLRPCServer((address, port), allow_none=True)
    server.register_instance(frontend_server)
    logging.info('Listening on %s port %d', address, port)
    server.serve_forever()
494