1# Copyright (c) 2014 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5 6"""Thread library for drone management. 7 8This library contains a threaded task queue capable of starting, monitoring 9and syncing threads across remote and localhost drones asynchronously. It also 10contains a wrapper for standard python threads that records exceptions so they 11can be re-raised in the thread manager. The api exposed by the threaded task 12queue is as follows: 13 1. worker: The staticmethod executed by all worker threads. 14 2. execute: Takes a list of drones and invokes a worker thread per drone. 15 This method assumes that all drones have a queue of pending calls 16 for execution. 17 3. wait_on_drones: Waits for all worker threads started by execute to finish 18 and raises any exceptions as a consolidated DroneTaskQueueException. 19 4. get_results: Returns the results of all threads as a dictionary keyed 20 on the drones. 21""" 22 23import collections 24import Queue 25import threading 26import logging 27 28import common 29from autotest_lib.client.common_lib.cros.graphite import autotest_stats 30from autotest_lib.scheduler import drone_task_queue 31 32 33class ExceptionRememberingThread(threading.Thread): 34 """A wrapper around regular python threads that records exceptions.""" 35 36 def run(self): 37 """Wrapper around the thread's run method.""" 38 try: 39 with autotest_stats.Timer(self.name): 40 super(ExceptionRememberingThread, self).run() 41 except Exception as self.err: 42 logging.error('%s raised an exception that will be re-raised by ' 43 'the thread pool manager.', self.getName()) 44 else: 45 self.err = None 46 47 48class PersistentTimer(object): 49 """A class to handle timers across local scopes.""" 50 51 def __init__(self, name): 52 """Initialize a persistent timer. 53 54 @param name: The name/key to insert timings under. 55 """ 56 self.name = name 57 self.timer = None 58 59 60 def start(self): 61 """Create and start a new timer.""" 62 self.timer = autotest_stats.Timer(self.name) 63 self.timer.start() 64 65 66 def stop(self): 67 """Stop a previously started timer.""" 68 try: 69 self.timer.stop() 70 except (AssertionError, AttributeError) as e: 71 logging.info('Stopping timer %s failed: %s', self.name, e) 72 finally: 73 self.timer = None 74 75 76class ThreadedTaskQueue(drone_task_queue.DroneTaskQueue): 77 """Threaded implementation of a drone task queue.""" 78 79 result = collections.namedtuple('task', ['drone', 'results']) 80 81 def __init__(self, name='thread_queue'): 82 self.results_queue = Queue.Queue() 83 self.drone_threads = {} 84 self.name = name 85 # The persistent timer is used to measure net time spent 86 # refreshing all drones across 'execute' and 'get_results'. 87 self.timer = PersistentTimer(self.name) 88 89 90 @staticmethod 91 def worker(drone, results_queue): 92 """Worker for task execution. 93 94 Execute calls queued against the given drone and place the return value 95 in results_queue. 96 97 @param drone: A drone with calls to execute. 98 @param results_queue: A queue, into which the worker places 99 ThreadedTaskQueue.result from the drone calls. 100 """ 101 logging.info('(Worker.%s) starting.', drone.hostname) 102 results_queue.put(ThreadedTaskQueue.result( 103 drone, drone.execute_queued_calls())) 104 logging.info('(Worker.%s) finished.', drone.hostname) 105 106 107 def wait_on_drones(self): 108 """Wait on all threads that are currently refreshing a drone. 109 110 @raises DroneTaskQueueException: Consolidated exception for all 111 drone thread exceptions. 112 """ 113 if not self.drone_threads: 114 return 115 # TODO: Make this process more resilient. We can: 116 # 1. Timeout the join. 117 # 2. Kick out the exception/timeout drone. 118 # 3. Selectively retry exceptions. 119 # For now, it is compliant with the single threaded drone manager which 120 # will raise all drone_utility, ssh and drone_manager exceptions. 121 drone_exceptions = [] 122 for drone, thread in self.drone_threads.iteritems(): 123 tname = thread.getName() 124 logging.info('(Task Queue) Waiting for %s', tname) 125 thread.join() 126 if thread.err: 127 drone_exceptions.append((drone, thread.err)) 128 logging.info('(Task Queue) All threads have returned, clearing map.') 129 self.drone_threads = {} 130 if not drone_exceptions: 131 return 132 exception_msg = '' 133 for drone, err in drone_exceptions: 134 exception_msg += ('Drone %s raised Exception %s\n' % 135 (drone.hostname, err)) 136 raise drone_task_queue.DroneTaskQueueException(exception_msg) 137 138 139 def get_results(self): 140 """Get a results dictionary keyed on the drones. 141 142 This method synchronously waits till all drone threads have returned 143 before checking for results. It is meant to be invoked in conjunction 144 with the 'execute' method, which creates a thread per drone. 145 146 @return: A dictionary of return values from the drones. 147 """ 148 self.wait_on_drones() 149 self.timer.stop() 150 results = {} 151 while not self.results_queue.empty(): 152 drone_results = self.results_queue.get() 153 if drone_results.drone in results: 154 raise drone_task_queue.DroneTaskQueueException( 155 'Task queue has recorded results for drone %s: %s' % 156 (drone_results.drone, results)) 157 results[drone_results.drone] = drone_results.results 158 return results 159 160 161 def execute(self, drones, wait=True): 162 """Invoke a thread per drone, to execute drone_utility in parallel. 163 164 @param drones: A list of drones with calls to execute. 165 @param wait: If True, this method will only return when all the drones 166 have returned the result of their respective invocations of 167 drone_utility. The results_queue and drone_threads will be cleared. 168 If False, the caller must clear both the queue and the map before 169 the next invocation of 'execute', by calling 'get_results'. 170 171 @return: A dictionary keyed on the drones, containing a list of return 172 values from the execution of drone_utility. 173 174 @raises DroneManagerError: If the results queue or drone map isn't empty 175 at the time of invocation. 176 """ 177 if not self.results_queue.empty(): 178 raise drone_task_queue.DroneTaskQueueException( 179 'Cannot clobber results queue: %s, it should be cleared ' 180 'through get_results.' % self.results_queue) 181 if self.drone_threads: 182 raise drone_task_queue.DroneTaskQueueException( 183 'Cannot clobber thread map: %s, it should be cleared ' 184 'through wait_on_drones' % self.drone_threads) 185 self.timer.start() 186 for drone in drones: 187 if not drone.get_calls(): 188 continue 189 worker_thread = ExceptionRememberingThread( 190 target=ThreadedTaskQueue.worker, 191 args=(drone, self.results_queue)) 192 # None of these threads are allowed to survive past the tick they 193 # were spawned in, and the scheduler won't die mid-tick, so none 194 # of the threads need to be daemons. However, if the scheduler does 195 # die unexpectedly we can just forsake the daemon threads. 196 self.drone_threads[drone] = worker_thread 197 # The name is only used for debugging 198 worker_thread.setName('%s.%s' % 199 (self.name, drone.hostname.replace('.', '_'))) 200 worker_thread.daemon = True 201 worker_thread.start() 202 return self.get_results() if wait else None 203