1# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5
6"""Thread library for drone management.
7
8This library contains a threaded task queue capable of starting, monitoring
9and syncing threads across remote and localhost drones asynchronously. It also
10contains a wrapper for standard python threads that records exceptions so they
11can be re-raised in the thread manager. The api exposed by the threaded task
12queue is as follows:
13    1. worker: The staticmethod executed by all worker threads.
14    2. execute: Takes a list of drones and invokes a worker thread per drone.
15        This method assumes that all drones have a queue of pending calls
16        for execution.
17    3. wait_on_drones: Waits for all worker threads started by execute to finish
18        and raises any exceptions as a consolidated DroneTaskQueueException.
19    4. get_results: Returns the results of all threads as a dictionary keyed
20        on the drones.
21"""
22
23import collections
24import Queue
25import threading
26import logging
27
28import common
29from autotest_lib.client.common_lib.cros.graphite import autotest_stats
30from autotest_lib.scheduler import drone_task_queue
31
32
class ExceptionRememberingThread(threading.Thread):
    """A wrapper around regular python threads that records exceptions.

    Once the thread has finished, ``err`` holds the Exception raised by the
    thread's target, or None if the target returned normally, so that the
    thread pool manager can re-raise it.
    """

    # Class-level default so that ``err`` can always be read, even if run()
    # never completed (the thread was never started, or it was terminated by
    # a BaseException such as SystemExit, which is deliberately not caught).
    err = None

    def run(self):
        """Run the thread's target under a stats timer, remembering errors."""
        try:
            with autotest_stats.Timer(self.name):
                super(ExceptionRememberingThread, self).run()
        except Exception as e:
            self.err = e
            # logging.exception also records the traceback. The thread pool
            # manager in this module only re-raises the string form of the
            # exception, so the traceback would otherwise be lost.
            logging.exception('%s raised an exception that will be re-raised '
                              'by the thread pool manager.', self.name)
        else:
            self.err = None
46
47
class PersistentTimer(object):
    """Hold a single autotest_stats timer across separate local scopes.

    A timer started in one method can be stopped from a different one, which
    lets callers time work that spans several calls.
    """

    def __init__(self, name):
        """Create an idle persistent timer.

        @param name: The name/key to insert timings under.
        """
        self.name, self.timer = name, None


    def start(self):
        """Create a fresh timer under self.name and start it.

        Any timer already held is replaced without being stopped.
        """
        fresh = autotest_stats.Timer(self.name)
        self.timer = fresh
        fresh.start()


    def stop(self):
        """Stop the held timer, if any, and forget it.

        An AssertionError or AttributeError raised while stopping is logged
        at info level instead of propagating. An AttributeError is what
        happens when no timer is held, because self.timer is then None.
        self.timer is always None afterwards.
        """
        current, self.timer = self.timer, None
        try:
            current.stop()
        except (AssertionError, AttributeError) as e:
            logging.info('Stopping timer %s failed: %s', self.name, e)
74
75
class ThreadedTaskQueue(drone_task_queue.DroneTaskQueue):
    """Threaded implementation of a drone task queue.

    execute() starts one ExceptionRememberingThread per drone that has
    pending calls. Each worker puts a ThreadedTaskQueue.result onto
    results_queue. get_results() joins the workers, re-raises their
    exceptions (via wait_on_drones) and collects the results per drone.
    """

    # Record placed on results_queue by each worker: the drone, and the value
    # returned by that drone's execute_queued_calls().
    result = collections.namedtuple('task', ['drone', 'results'])

    def __init__(self, name='thread_queue'):
        """Initialize an empty task queue.

        @param name: Prefix for worker thread names, and the key under which
            the persistent timer records timings.
        """
        self.results_queue = Queue.Queue()
        # Maps each drone to the worker thread currently refreshing it.
        self.drone_threads = {}
        self.name = name
        # The persistent timer is used to measure net time spent
        # refreshing all drones across 'execute' and 'get_results'.
        self.timer = PersistentTimer(self.name)


    @staticmethod
    def worker(drone, results_queue):
        """Worker for task execution.

        Execute calls queued against the given drone and place the return value
        in results_queue. Exceptions propagate to the enclosing
        ExceptionRememberingThread, and in that case nothing is queued.

        @param drone: A drone with calls to execute.
        @param results_queue: A queue, into which the worker places
            ThreadedTaskQueue.result from the drone calls.
        """
        logging.info('(Worker.%s) starting.', drone.hostname)
        results_queue.put(ThreadedTaskQueue.result(
            drone, drone.execute_queued_calls()))
        logging.info('(Worker.%s) finished.', drone.hostname)


    def wait_on_drones(self):
        """Wait on all threads that are currently refreshing a drone.

        Every thread is joined and the thread map is cleared before any
        exception is raised.

        @raises DroneTaskQueueException: Consolidated exception for all
            drone thread exceptions.
        """
        if not self.drone_threads:
            return
        # TODO: Make this process more resilient. We can:
        # 1. Timeout the join.
        # 2. Kick out the exception/timeout drone.
        # 3. Selectively retry exceptions.
        # For now, it is compliant with the single threaded drone manager which
        # will raise all drone_utility, ssh and drone_manager exceptions.
        drone_exceptions = []
        for drone, thread in self.drone_threads.iteritems():
            logging.info('(Task Queue) Waiting for %s', thread.getName())
            thread.join()
            if thread.err:
                drone_exceptions.append((drone, thread.err))
        logging.info('(Task Queue) All threads have returned, clearing map.')
        self.drone_threads = {}
        if not drone_exceptions:
            return
        exception_msg = ''.join(
                'Drone %s raised Exception %s\n' % (drone.hostname, err)
                for drone, err in drone_exceptions)
        raise drone_task_queue.DroneTaskQueueException(exception_msg)


    def get_results(self):
        """Get a results dictionary keyed on the drones.

        This method synchronously waits till all drone threads have returned
        before checking for results. It is meant to be invoked in conjunction
        with the 'execute' method, which creates a thread per drone.

        On every exit path the timer is stopped and the thread map is cleared.
        If a drone thread failed, the results queue is emptied as well, so a
        subsequent execute() can run.

        @return: A dictionary mapping each drone to the value returned by its
            execute_queued_calls().

        @raises DroneTaskQueueException: If any drone thread raised (see
            wait_on_drones), or if more than one result was recorded for the
            same drone.
        """
        try:
            self.wait_on_drones()
        except drone_task_queue.DroneTaskQueueException:
            # wait_on_drones has joined every worker before raising, so the
            # queue is no longer being written to. The caller gets the
            # exception instead of these results. Drop them; otherwise they
            # would stay queued and every later execute() would refuse to run.
            self._discard_results()
            raise
        finally:
            # Stop the timer even on failure. Otherwise the timer started by
            # execute() would be abandoned without ever being stopped.
            self.timer.stop()
        results = {}
        while not self.results_queue.empty():
            drone_results = self.results_queue.get()
            if drone_results.drone in results:
                raise drone_task_queue.DroneTaskQueueException(
                        'Task queue has recorded results for drone %s: %s' %
                        (drone_results.drone, results))
            results[drone_results.drone] = drone_results.results
        return results


    def _discard_results(self):
        """Remove and drop every entry currently in results_queue."""
        while not self.results_queue.empty():
            self.results_queue.get()


    def execute(self, drones, wait=True):
        """Invoke a thread per drone, to execute drone_utility in parallel.

        Drones whose get_calls() is empty are skipped.

        @param drones: A list of drones with calls to execute.
        @param wait: If True, this method will only return when all the drones
            have returned the result of their respective invocations of
            drone_utility. The results_queue and drone_threads will be cleared.
            If False, the caller must clear both the queue and the map before
            the next invocation of 'execute', by calling 'get_results'.

        @return: If wait is True, a dictionary mapping each drone to the value
            returned by its execute_queued_calls(); otherwise None.

        @raises DroneTaskQueueException: If the results queue or drone map
            isn't empty at the time of invocation, or (when wait is True) if
            any drone thread raised.
        """
        if not self.results_queue.empty():
            raise drone_task_queue.DroneTaskQueueException(
                    'Cannot clobber results queue: %s, it should be cleared '
                    'through get_results.' % self.results_queue)
        if self.drone_threads:
            raise drone_task_queue.DroneTaskQueueException(
                    'Cannot clobber thread map: %s, it should be cleared '
                    'through wait_on_drones' % self.drone_threads)
        self.timer.start()
        for drone in drones:
            if not drone.get_calls():
                continue
            worker_thread = ExceptionRememberingThread(
                    target=ThreadedTaskQueue.worker,
                    args=(drone, self.results_queue),
                    # The name is only used for debugging.
                    name='%s.%s' % (self.name,
                                    drone.hostname.replace('.', '_')))
            # None of these threads are expected to survive past the tick they
            # were spawned in, and the scheduler won't die mid-tick, so in
            # normal operation they never need to be daemons. They are marked
            # as daemons anyway so that, if the scheduler does die
            # unexpectedly, it is not kept alive waiting on them.
            worker_thread.daemon = True
            worker_thread.start()
            # Only track threads that actually started. join() on a thread
            # that was never started raises, which would break wait_on_drones.
            self.drone_threads[drone] = worker_thread
        return self.get_results() if wait else None
203