1# Copyright (c) 2011 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5"""Utility classes used by site_server_job.distribute_across_machines(). 6 7test_item: extends the basic test tuple to add include/exclude attributes and 8 pre/post actions. 9 10machine_worker: is a thread that manages running tests on a host. It 11 verifies test are valid for a host using the test attributes from test_item 12 and the host attributes from host_attributes. 13""" 14 15 16import logging, os, Queue 17from autotest_lib.client.common_lib import error, utils 18from autotest_lib.server import autotest, hosts, host_attributes 19 20 21class test_item(object): 22 """Adds machine verification logic to the basic test tuple. 23 24 Tests can either be tuples of the existing form ('testName', {args}) or the 25 extended form ('testname', {args}, {'include': [], 'exclude': [], 26 'attributes': []}) where include and exclude are lists of host attribute 27 labels and attributes is a list of strings. A machine must have all the 28 labels in include and must not have any of the labels in exclude to be valid 29 for the test. Attributes strings can include reboot_before, reboot_after, 30 and server_job. 31 """ 32 33 def __init__(self, test_name, test_args, test_attribs=None): 34 """Creates an instance of test_item. 35 36 Args: 37 test_name: string, name of test to execute. 38 test_args: dictionary, arguments to pass into test. 39 test_attribs: Dictionary of test attributes. Valid keys are: 40 include - labels a machine must have to run a test. 41 exclude - labels preventing a machine from running a test. 42 attributes - reboot before/after test, run test as server job. 43 """ 44 self.test_name = test_name 45 self.test_args = test_args 46 self.tagged_test_name = test_name 47 if test_args.get('tag'): 48 self.tagged_test_name = test_name + '.' + test_args.get('tag') 49 50 if test_attribs is None: 51 test_attribs = {} 52 self.inc_set = set(test_attribs.get('include', [])) 53 self.exc_set = set(test_attribs.get('exclude', [])) 54 self.attributes = test_attribs.get('attributes', []) 55 56 def __str__(self): 57 """Return an info string of this test.""" 58 params = ['%s=%s' % (k, v) for k, v in self.test_args.items()] 59 msg = '%s(%s)' % (self.test_name, params) 60 if self.inc_set: 61 msg += ' include=%s' % [s for s in self.inc_set] 62 if self.exc_set: 63 msg += ' exclude=%s' % [s for s in self.exc_set] 64 if self.attributes: 65 msg += ' attributes=%s' % self.attributes 66 return msg 67 68 def validate(self, machine_attributes): 69 """Check if this test can run on machine with machine_attributes. 70 71 If the test has include attributes, a candidate machine must have all 72 the attributes to be valid. 73 74 If the test has exclude attributes, a candidate machine cannot have any 75 of the attributes to be valid. 76 77 Args: 78 machine_attributes: set, True attributes of candidate machine. 79 80 Returns: 81 True/False if the machine is valid for this test. 82 """ 83 if self.inc_set is not None: 84 if not self.inc_set <= machine_attributes: 85 return False 86 if self.exc_set is not None: 87 if self.exc_set & machine_attributes: 88 return False 89 return True 90 91 def run_test(self, client_at, work_dir='.', server_job=None): 92 """Runs the test on the client using autotest. 93 94 Args: 95 client_at: Autotest instance for this host. 96 work_dir: Directory to use for results and log files. 97 server_job: Server_Job instance to use to runs server tests. 98 """ 99 if 'reboot_before' in self.attributes: 100 client_at.host.reboot() 101 102 try: 103 if 'server_job' in self.attributes: 104 if 'host' in self.test_args: 105 self.test_args['host'] = client_at.host 106 if server_job is not None: 107 logging.info('Running Server_Job=%s', self.test_name) 108 server_job.run_test(self.test_name, **self.test_args) 109 else: 110 logging.error('No Server_Job instance provided for test ' 111 '%s.', self.test_name) 112 else: 113 client_at.run_test(self.test_name, results_dir=work_dir, 114 **self.test_args) 115 finally: 116 if 'reboot_after' in self.attributes: 117 client_at.host.reboot() 118 119 120class machine_worker(object): 121 """Worker that runs tests on a remote host machine.""" 122 123 def __init__(self, server_job, machine, work_dir, test_queue, queue_lock, 124 continuous_parsing=False): 125 """Creates an instance of machine_worker to run tests on a remote host. 126 127 Retrieves that host attributes for this machine and creates the set of 128 True attributes to validate against test include/exclude attributes. 129 130 Creates a directory to hold the log files for tests run and writes the 131 hostname and tko parser version into keyvals file. 132 133 Args: 134 server_job: run tests for this server_job. 135 machine: name of remote host. 136 work_dir: directory server job is using. 137 test_queue: queue of tests. 138 queue_lock: lock protecting test_queue. 139 continuous_parsing: bool, enable continuous parsing. 140 """ 141 self._server_job = server_job 142 self._test_queue = test_queue 143 self._test_queue_lock = queue_lock 144 self._continuous_parsing = continuous_parsing 145 self._tests_run = 0 146 self._machine = machine 147 self._host = hosts.create_host(self._machine) 148 self._client_at = autotest.Autotest(self._host) 149 client_attributes = host_attributes.host_attributes(machine) 150 self.attribute_set = set(client_attributes.get_attributes()) 151 self._results_dir = work_dir 152 if not os.path.exists(self._results_dir): 153 os.makedirs(self._results_dir) 154 machine_data = {'hostname': self._machine, 155 'status_version': str(1)} 156 utils.write_keyval(self._results_dir, machine_data) 157 158 def __str__(self): 159 attributes = [a for a in self.attribute_set] 160 return '%s attributes=%s' % (self._machine, attributes) 161 162 def get_test(self): 163 """Return a test from the queue to run on this host. 164 165 The test queue can be non-empty, but still not contain a test that is 166 valid for this machine. This function will take exclusive access to 167 the queue via _test_queue_lock and repeatedly pop tests off the queue 168 until finding a valid test or depleting the queue. In either case if 169 invalid tests have been popped from the queue, they are pushed back 170 onto the queue before returning. 171 172 Returns: 173 test_item, or None if no more tests exist for this machine. 174 """ 175 good_test = None 176 skipped_tests = [] 177 178 with self._test_queue_lock: 179 while True: 180 try: 181 canidate_test = self._test_queue.get_nowait() 182 # Check if test is valid for this machine. 183 if canidate_test.validate(self.attribute_set): 184 good_test = canidate_test 185 break 186 skipped_tests.append(canidate_test) 187 188 except Queue.Empty: 189 break 190 191 # Return any skipped tests to the queue. 192 for st in skipped_tests: 193 self._test_queue.put(st) 194 195 return good_test 196 197 def run(self): 198 """Executes tests on the host machine. 199 200 If continuous parsing was requested, start the parser before running 201 tests. 202 """ 203 # Modify job.resultdir so that it points to the results directory for 204 # the machine we're working on. Required so that server jobs will write 205 # to the proper location. 206 self._server_job.machines = [self._machine] 207 self._server_job.push_execution_context(self._machine['hostname']) 208 os.chdir(self._server_job.resultdir) 209 if self._continuous_parsing: 210 self._server_job._parse_job += "/" + self._machine['hostname'] 211 self._server_job._using_parser = True 212 self._server_job.init_parser() 213 214 while True: 215 active_test = self.get_test() 216 if active_test is None: 217 break 218 219 logging.info('%s running %s', self._machine, active_test) 220 try: 221 active_test.run_test(self._client_at, self._results_dir, 222 self._server_job) 223 except error.AutoservError: 224 logging.exception('Autoserv error running "%s".', active_test) 225 except error.AutotestError: 226 logging.exception('Autotest error running "%s".', active_test) 227 except Exception: 228 logging.exception('Exception running test "%s".', active_test) 229 raise 230 finally: 231 self._test_queue.task_done() 232 self._tests_run += 1 233 234 if self._continuous_parsing: 235 self._server_job.cleanup_parser() 236 logging.info('%s completed %d tests.', self._machine, self._tests_run) 237