1# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5
6"""Thread library for drone management.
7
8This library contains a threaded task queue capable of starting, monitoring
9and syncing threads across remote and localhost drones asynchronously. It also
10contains a wrapper for standard python threads that records exceptions so they
11can be re-raised in the thread manager. The api exposed by the threaded task
12queue is as follows:
13    1. worker: The staticmethod executed by all worker threads.
14    2. execute: Takes a list of drones and invokes a worker thread per drone.
15        This method assumes that all drones have a queue of pending calls
16        for execution.
17    3. wait_on_drones: Waits for all worker threads started by execute to finish
18        and raises any exceptions as a consolidated DroneTaskQueueException.
19    4. get_results: Returns the results of all threads as a dictionary keyed
20        on the drones.
21"""
22
23import collections
24import Queue
25import threading
26import logging
27
28import common
29from autotest_lib.scheduler import drone_task_queue
30
31
class ExceptionRememberingThread(threading.Thread):
    """A wrapper around regular python threads that records exceptions.

    Any Exception raised by the thread's target is caught and stored on the
    'err' attribute instead of propagating out of the thread, so that the
    thread pool manager can inspect and re-raise it after joining the thread.
    'err' is None when the target returned normally (or has not run yet).
    """

    def __init__(self, *args, **kwargs):
        """Accepts exactly the same arguments as threading.Thread."""
        super(ExceptionRememberingThread, self).__init__(*args, **kwargs)
        # Set up front so 'err' can be read even if the thread never ran.
        self.err = None


    def run(self):
        """Wrapper around the thread's run method."""
        try:
            super(ExceptionRememberingThread, self).run()
        except Exception as e:
            # 'except ... as self.err' is Python 2-only syntax; bind a local
            # name and assign it explicitly so the module parses everywhere.
            self.err = e
            # logging.exception also records the traceback, which would
            # otherwise be lost: only the exception object is kept for the
            # thread pool manager to re-raise.
            logging.exception('%s raised an exception that will be re-raised '
                              'by the thread pool manager.', self.name)
        else:
            self.err = None
44
45
class ThreadedTaskQueue(drone_task_queue.DroneTaskQueue):
    """Threaded implementation of a drone task queue.

    'execute' starts one worker thread per drone that has pending calls. Each
    worker puts a ThreadedTaskQueue.result on results_queue. 'get_results'
    joins the workers and collects those results into a dictionary keyed on
    the drones.
    """

    # Record produced by each worker: the drone and the return value of its
    # execute_queued_calls(). The typename 'task' only appears in reprs.
    result = collections.namedtuple('task', ['drone', 'results'])

    def __init__(self, name='thread_queue'):
        """
        @param name: Prefix for worker thread names; only used for debugging.
        """
        self.results_queue = Queue.Queue()
        # Maps each drone to the worker thread 'execute' started for it.
        self.drone_threads = {}
        self.name = name


    @staticmethod
    def worker(drone, results_queue):
        """Worker for task execution.

        Execute calls queued against the given drone and place the return value
        in results_queue.

        @param drone: A drone with calls to execute.
        @param results_queue: A queue, into which the worker places
            ThreadedTaskQueue.result from the drone calls.
        """
        logging.info('(Worker.%s) starting.', drone.hostname)
        results_queue.put(ThreadedTaskQueue.result(
            drone, drone.execute_queued_calls()))
        logging.info('(Worker.%s) finished.', drone.hostname)


    def wait_on_drones(self):
        """Wait on all threads that are currently refreshing a drone.

        Joins every thread registered by 'execute' and then clears the thread
        map, whether or not any thread raised.

        @raises DroneTaskQueueException: Consolidated exception for all
            drone thread exceptions.
        """
        if not self.drone_threads:
            return
        # TODO: Make this process more resilient. We can:
        # 1. Timeout the join.
        # 2. Kick out the exception/timeout drone.
        # 3. Selectively retry exceptions.
        # For now, it is compliant with the single threaded drone manager which
        # will raise all drone_utility, ssh and drone_manager exceptions.
        drone_exceptions = []
        for drone, thread in self.drone_threads.items():
            logging.info('(Task Queue) Waiting for %s', thread.name)
            thread.join()
            if thread.err is not None:
                drone_exceptions.append((drone, thread.err))
        logging.info('(Task Queue) All threads have returned, clearing map.')
        self.drone_threads = {}
        if not drone_exceptions:
            return
        exception_msg = ''.join(
                'Drone %s raised Exception %s\n' % (drone.hostname, err)
                for drone, err in drone_exceptions)
        raise drone_task_queue.DroneTaskQueueException(exception_msg)


    def _drain_results_queue(self):
        """Remove and return every result currently in results_queue.

        Only meant to be called once all worker threads have been joined, so
        no producer can still be adding to the queue.

        @return: A list of ThreadedTaskQueue.result, in queue order.
        """
        drained = []
        while not self.results_queue.empty():
            drained.append(self.results_queue.get())
        return drained


    def get_results(self):
        """Get a results dictionary keyed on the drones.

        This method synchronously waits till all drone threads have returned
        before checking for results. It is meant to be invoked in conjunction
        with the 'execute' method, which creates a thread per drone. The
        results queue is always emptied, even if a drone thread raised.

        @return: A dictionary of return values from the drones.

        @raises DroneTaskQueueException: If any drone thread raised, or if
            more than one result was recorded for the same drone.
        """
        try:
            self.wait_on_drones()
        finally:
            # wait_on_drones clears the thread map even when it raises. Results
            # from the drones that succeeded must be discarded too. Otherwise
            # every later 'execute' fails with "Cannot clobber results queue".
            drained = self._drain_results_queue()
        results = {}
        for drone_results in drained:
            if drone_results.drone in results:
                raise drone_task_queue.DroneTaskQueueException(
                        'Task queue has recorded results for drone %s: %s' %
                        (drone_results.drone, results))
            results[drone_results.drone] = drone_results.results
        return results


    def execute(self, drones, wait=True):
        """Invoke a thread per drone, to execute drone_utility in parallel.

        Drones without pending calls (empty drone.get_calls()) are skipped.

        @param drones: A list of drones with calls to execute.
        @param wait: If True, this method will only return when all the drones
            have returned the result of their respective invocations of
            drone_utility. The results_queue and drone_threads will be cleared.
            If False, the caller must clear both the queue and the map before
            the next invocation of 'execute', by calling 'get_results'.

        @return: If wait is True, a dictionary keyed on the drones, containing
            the return values from the execution of drone_utility. Otherwise
            None.

        @raises DroneTaskQueueException: If the results queue or drone map isn't
            empty at the time of invocation, or (when wait is True) if any
            drone thread raised.
        """
        if not self.results_queue.empty():
            raise drone_task_queue.DroneTaskQueueException(
                    'Cannot clobber results queue: %s, it should be cleared '
                    'through get_results.' % self.results_queue)
        if self.drone_threads:
            raise drone_task_queue.DroneTaskQueueException(
                    'Cannot clobber thread map: %s, it should be cleared '
                    'through wait_on_drones' % self.drone_threads)
        for drone in drones:
            if not drone.get_calls():
                continue
            worker_thread = ExceptionRememberingThread(
                    target=ThreadedTaskQueue.worker,
                    args=(drone, self.results_queue))
            # The name is only used for debugging.
            worker_thread.name = '%s.%s' % (
                    self.name, drone.hostname.replace('.', '_'))
            # None of these threads are allowed to survive past the tick they
            # were spawned in, and the scheduler won't die mid-tick, so they do
            # not strictly need to be daemons. They are made daemons anyway so
            # that, if the scheduler does die unexpectedly, it can exit without
            # waiting on them.
            worker_thread.daemon = True
            worker_thread.start()
            # Register the thread only after it has started. wait_on_drones
            # would fail to join a thread that never started and would leave
            # drone_threads permanently non-empty.
            self.drone_threads[drone] = worker_thread
        return self.get_results() if wait else None
168