# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import datetime
import logging
import os
import random
import time


from autotest_lib.client.common_lib import base_job, global_config, log
from autotest_lib.client.common_lib import time_utils

# Base interval between polls of the AFE; the wait_for_* generators below
# multiply it by a random factor in [0.5, 1.5) before each sleep.
_DEFAULT_POLL_INTERVAL_SECONDS = 30.0

HQE_MAXIMUM_ABORT_RATE_FLOAT = global_config.global_config.get_config_value(
            'SCHEDULER', 'hqe_maximum_abort_rate_float', type=float,
            default=0.5)


def view_is_relevant(view):
    """
    Indicates whether the view of a given test is meaningful or not.

    @param view: a detailed test 'view' from the TKO DB to look at.
    @return True if this is a test result worth looking at further, i.e.
            its test name does not start with 'CLIENT_JOB'.
    """
    return not view['test_name'].startswith('CLIENT_JOB')


def view_is_for_suite_job(view):
    """
    Indicates whether the given test view is the view of Suite job.

    @param view: a detailed test 'view' from the TKO DB to look at.
    @return True if this is view of suite job, i.e. its test name is
            exactly 'SERVER_JOB'.
    """
    return view['test_name'] == 'SERVER_JOB'


def view_is_for_infrastructure_fail(view):
    """
    Indicates whether the given test view is from an infra fail.

    @param view: a detailed test 'view' from the TKO DB to look at.
    @return True if this view indicates an infrastructure-side issue during
            a test, i.e. its test name ends with 'SERVER_JOB'.
    """
    return view['test_name'].endswith('SERVER_JOB')


def is_for_infrastructure_fail(status):
    """
    Indicates whether the given Status is from an infra fail.

    @param status: the Status object to look at.
    @return True if this Status indicates an infrastructure-side issue during
            a test.
    """
    # Reuse the view-based check so both predicates always agree.
    return view_is_for_infrastructure_fail({'test_name': status.test_name})


def _abort_jobs_if_timedout(afe, jobs, start_time, timeout_mins):
    """
    Abort all of the jobs in jobs if the running time has passed the timeout.

    @param afe: an instance of AFE as defined in server/frontend.py.
    @param jobs: an iterable of Running frontend.Jobs
    @param start_time: Time to compare to the current time to see if a timeout
                       has occurred. Compared against utcnow().
    @param timeout_mins: Time in minutes to wait before aborting the jobs we
                         are waiting on.

    @returns True if there was a timeout, False if not.
    """
    deadline = start_time + datetime.timedelta(minutes=timeout_mins)
    if datetime.datetime.utcnow() < deadline:
        return False
    for job in jobs:
        logging.debug('Job: %s has timed out after %s minutes. Aborting job.',
                      job.id, timeout_mins)
        afe.run('abort_host_queue_entries', job=job.id)
    return True


def _collate_aborted(current_value, entry):
    """
    reduce() over a list of HostQueueEntries for a job; True if any aborted.

    Functor that can be reduce()ed over a list of
    HostQueueEntries for a job.  If any were aborted
    (|entry.aborted| exists and is True), then the reduce() will
    return True.

    Ex:
      entries = AFE.run('get_host_queue_entries', job=job.id)
      reduce(_collate_aborted, entries, False)

    @param current_value: the current accumulator (a boolean).
    @param entry: the current entry under consideration.
    @return the value of |entry.aborted| if it exists, False if not.
    """
    return current_value or ('aborted' in entry and entry['aborted'])


def _status_for_test(status):
    """
    Indicates whether the status of a given test is meaningful or not.

    @param status: frontend.TestStatus object to look at.
    @return True if this is a test result worth looking at further, i.e.
            it is neither a SERVER_JOB nor a CLIENT_JOB entry.
    """
    return not status.test_name.startswith(('SERVER_JOB', 'CLIENT_JOB'))


def _yield_job_results(afe, tko, job):
    """
    Yields the results of an individual job.

    Yields one Status object per test.

    @param afe: an instance of AFE as defined in server/frontend.py.
    @param tko: an instance of TKO as defined in server/frontend.py.
    @param job: Job object to get results from, as defined in
                server/frontend.py
    @yields an iterator of Statuses, one per test.
    """
    entries = afe.run('get_host_queue_entries', job=job.id)

    # This query uses the job id to search through the tko_test_view_2
    # table, for results of a test with a similar job_tag. The job_tag
    # is used to store results, and takes the form job_id-owner/host.
    # Many times when a job aborts during a test, the job_tag actually
    # exists and the results directory contains valid logs. If the job
    # was aborted prematurely i.e before it had a chance to create the
    # job_tag, this query will return no results. When statuses is not
    # empty it will contain frontend.TestStatus' with fields populated
    # using the results of the db query.
    statuses = tko.get_job_test_statuses_from_db(job.id)
    if not statuses:
        yield Status('ABORT', job.name)

    # We only care about the SERVER and CLIENT job failures when there
    # are no test failures.
    contains_test_failure = any(_status_for_test(s) and s.status != 'GOOD'
                                for s in statuses)
    for s in statuses:
        # TKO parser uniquely identifies a test run by
        # (test_name, subdir). In dynamic suite, we need to emit
        # a subdir for each status and make sure (test_name, subdir)
        # in the suite job's status log is unique.
        # For non-test status (i.e.SERVER_JOB, CLIENT_JOB),
        # we use 'job_tag' from tko_test_view_2, which looks like
        # '1246-owner/172.22.33.44'
        # For normal test status, we use 'job_tag/subdir'
        # which looks like '1246-owner/172.22.33.44/my_DummyTest.tag.subdir_tag'
        if _status_for_test(s):
            yield Status(s.status, s.test_name, s.reason,
                         s.test_started_time, s.test_finished_time,
                         job.id, job.owner, s.hostname, job.name,
                         subdir=os.path.join(s.job_tag, s.subdir))
        elif s.status != 'GOOD' and not contains_test_failure:
            # Prefer the name recorded on the first host queue entry; fall
            # back to the job's own name when the AFE returned no entries
            # instead of failing with an IndexError.
            if entries:
                entry_job_name = entries[0]['job']['name']
            else:
                entry_job_name = job.name
            yield Status(s.status,
                         '%s_%s' % (entry_job_name, s.test_name),
                         s.reason, s.test_started_time,
                         s.test_finished_time, job.id,
                         job.owner, s.hostname, job.name,
                         subdir=s.job_tag)


def wait_for_child_results(afe, tko, parent_job_id):
    """
    Wait for results of all tests in jobs with given parent id.

    New jobs could be added by calling send(new_jobs) on the generator.
    Sleeps between polls for _DEFAULT_POLL_INTERVAL_SECONDS scaled by a
    random factor in [0.5, 1.5). Yields one Status object per test as
    results become available.

    @param afe: an instance of AFE as defined in server/frontend.py.
    @param tko: an instance of TKO as defined in server/frontend.py.
    @param parent_job_id: Parent job id for the jobs to wait on.
    @yields an iterator of Statuses, one per test.
    """
    remaining_child_jobs = set(job.id for job in
                               afe.get_jobs(parent_job_id=parent_job_id))
    while remaining_child_jobs:
        new_finished_jobs = afe.get_jobs(id__in=list(remaining_child_jobs),
                                         finished=True)

        for job in new_finished_jobs:

            remaining_child_jobs.remove(job.id)
            for result in _yield_job_results(afe, tko, job):
                # To figure out what new jobs (like retry jobs) have been
                # created since last iteration, we could re-poll for
                # the set of child jobs in each iteration and
                # calculate the set difference against the set we got in
                # last iteration. As an alternative, we could just make
                # the caller 'send' new jobs to this generator. We go
                # with the latter to avoid unnecessary overhead.
                new_child_jobs = (yield result)
                if new_child_jobs:
                    remaining_child_jobs.update([new_job.id for new_job in
                                                 new_child_jobs])
                    # Return nothing if 'send' is called
                    yield None

        time.sleep(_DEFAULT_POLL_INTERVAL_SECONDS * (random.random() + 0.5))


def wait_for_results(afe, tko, jobs):
    """
    Wait for results of all tests in all jobs in |jobs|.

    New jobs could be added by calling send(new_jobs) on the generator.
    Sleeps between polls for _DEFAULT_POLL_INTERVAL_SECONDS scaled by a
    random factor in [0.5, 1.5). Yields one Status object per test as
    results become available.

    @param afe: an instance of AFE as defined in server/frontend.py.
    @param tko: an instance of TKO as defined in server/frontend.py.
    @param jobs: a list of Job objects, as defined in server/frontend.py.
    @yields an iterator of Statuses, one per test.
    """
    local_jobs = list(jobs)
    while local_jobs:
        # Iterate over a snapshot: local_jobs is mutated inside the loop.
        for job in list(local_jobs):
            if not afe.get_jobs(id=job.id, finished=True):
                continue

            local_jobs.remove(job)
            for result in _yield_job_results(afe, tko, job):
                # The caller can 'send' new jobs (i.e. retry jobs)
                # to this generator at any time.
                new_jobs = (yield result)
                if new_jobs:
                    local_jobs.extend(new_jobs)
                    # Return nothing if 'send' is called
                    yield None

        time.sleep(_DEFAULT_POLL_INTERVAL_SECONDS * (random.random() + 0.5))


class Status(object):
    """
    A class representing a test result.

    Stores all pertinent info about a test result and, given a callable
    to use, can record start, result, and end info appropriately.

    @var _status: status code, e.g. 'INFO', 'FAIL', etc.
    @var _test_name: the name of the test whose result this is.
    @var _reason: message explaining failure, if any.
    @var _begin_timestamp: when test started (int, in seconds since the epoch).
    @var _end_timestamp: when test finished (int, in seconds since the epoch).
    @var _id: the ID of the job that generated this Status.
    @var _owner: the owner of the job that generated this Status.
    @var _hostname: the host the test that generated this Status ran on.
    @var _job_name: the name of the job that generated this Status.
    @var _subdir: the result directory recorded for this Status.
    @var _test_executed: True if a start time was supplied for the test.

    @var STATUS_MAP: a dict mapping host queue entry status strings to canonical
                     status codes; e.g. 'Aborted' -> 'ABORT'
    """
    _status = None
    _test_name = None
    _reason = None
    _begin_timestamp = None
    _end_timestamp = None
    _id = None
    _owner = None
    _hostname = None
    _job_name = ''
    _subdir = None
    _test_executed = False

    # Queued status can occur if the try job just aborted due to not completing
    # reimaging for all machines. The Queued corresponds to an 'ABORT'.
    STATUS_MAP = {'Failed': 'FAIL', 'Aborted': 'ABORT', 'Completed': 'GOOD',
                  'Queued' : 'ABORT'}

    class sle(base_job.status_log_entry):
        """
        Thin wrapper around status_log_entry that supports stringification.
        """
        def __str__(self):
            return self.render()

        def __repr__(self):
            return self.render()


    @staticmethod
    def _has_time(time_str):
        """
        Tell whether |time_str| carries an actual time value.

        @param time_str: a time string, None, '' or the literal 'None'.
        @return True if |time_str| is non-empty and not the string 'None'.
        """
        return bool(time_str) and time_str != 'None'


    @staticmethod
    def _to_timestamp(time_str):
        """
        Convert a time string into integer seconds since the epoch.

        @param time_str: time in time_utils.TIME_FMT, or None/''/'None'.
        @return the parsed time passed through time.mktime(), truncated to
                int; the current time if |time_str| carries no value.
        """
        if not Status._has_time(time_str):
            return int(time.time())
        parsed = datetime.datetime.strptime(time_str, time_utils.TIME_FMT)
        return int(time.mktime(parsed.timetuple()))


    def __init__(self, status, test_name, reason='', begin_time_str=None,
                 end_time_str=None, job_id=None, owner=None, hostname=None,
                 job_name='', subdir=None):
        """
        Constructor

        @param status: status code, e.g. 'INFO', 'FAIL', etc.
        @param test_name: the name of the test whose result this is.
        @param reason: message explaining failure, if any; Optional.
        @param begin_time_str: when test started (in time_utils.TIME_FMT);
                               now() if None or 'None'.
        @param end_time_str: when test finished (in time_utils.TIME_FMT);
                             now() if None or 'None'.
        @param job_id: the ID of the job that generated this Status.
        @param owner: the owner of the job that generated this Status.
        @param hostname: The name of the host the test that generated this
                         result ran on.
        @param job_name: The job name; Contains the test name with/without the
                         experimental prefix, the tag and the build.
        @param subdir: The result directory of the test. It will be recorded
                       as the subdir in the status.log file.
        """
        self._status = status
        self._test_name = test_name
        self._reason = reason
        self._id = job_id
        self._owner = owner
        self._hostname = hostname
        self._job_name = job_name
        self._subdir = subdir
        # Autoserv drops a keyval of the started time which eventually makes its
        # way here.  Therefore, if we have a starting time, we may assume that
        # the test reached Running and actually began execution on a drone.
        # Stored as a real bool so test_executed never leaks None or ''.
        self._test_executed = self._has_time(begin_time_str)

        self._begin_timestamp = self._to_timestamp(begin_time_str)
        self._end_timestamp = self._to_timestamp(end_time_str)


    def is_good(self):
        """ Returns true if status is good. """
        return self._status == 'GOOD'


    def is_warn(self):
        """ Returns true if status is warn. """
        return self._status == 'WARN'


    def is_testna(self):
        """ Returns true if status is TEST_NA """
        return self._status == 'TEST_NA'


    def is_worse_than(self, candidate):
        """
        Return whether |self| represents a "worse" failure than |candidate|.

        "Worse" is defined the same as it is for log message purposes in
        common_lib/log.py.  We also consider status with a specific error
        message to represent a "worse" failure than one without.

        @param candidate: a Status instance to compare to this one.
        @return True if |self| is "worse" than |candidate|.
        """
        if self._status != candidate._status:
            # A lower index in log.job_statuses is treated as worse.
            return (log.job_statuses.index(self._status) <
                    log.job_statuses.index(candidate._status))
        # else, if the statuses are the same...
        if self._reason and not candidate._reason:
            return True
        return False


    def record_start(self, record_entry):
        """
        Use record_entry to log message about start of test.

        @param record_entry: a callable to use for logging; it is invoked
               with the entry and log_in_subdir=False.
               prototype:
                   record_entry(base_job.status_log_entry)
        """
        log_entry = Status.sle('START', self._subdir,
                                self._test_name, '',
                                None, self._begin_timestamp)
        record_entry(log_entry, log_in_subdir=False)


    def record_result(self, record_entry):
        """
        Use record_entry to log message about result of test.

        @param record_entry: a callable to use for logging; it is invoked
               with the entry and log_in_subdir=False.
               prototype:
                   record_entry(base_job.status_log_entry)
        """
        log_entry = Status.sle(self._status, self._subdir,
                                self._test_name, self._reason, None,
                                self._end_timestamp)
        record_entry(log_entry, log_in_subdir=False)


    def record_end(self, record_entry):
        """
        Use record_entry to log message about end of test.

        @param record_entry: a callable to use for logging; it is invoked
               with the entry and log_in_subdir=False.
               prototype:
                   record_entry(base_job.status_log_entry)
        """
        log_entry = Status.sle('END %s' % self._status, self._subdir,
                               self._test_name, '', None, self._end_timestamp)
        record_entry(log_entry, log_in_subdir=False)


    def record_all(self, record_entry):
        """
        Use record_entry to log all messages about test results.

        Emits the start, result and end entries, in that order.

        @param record_entry: a callable to use for logging.
               prototype:
                   record_entry(base_job.status_log_entry)
        """
        self.record_start(record_entry)
        self.record_result(record_entry)
        self.record_end(record_entry)


    def override_status(self, override):
        """
        Override the _status field of this Status.

        @param override: value with which to override _status.
        """
        self._status = override


    @property
    def test_name(self):
        """ Name of the test this status corresponds to. """
        return self._test_name


    @test_name.setter
    def test_name(self, value):
        """
        Test name setter.

        @param value: The test name.
        """
        self._test_name = value


    @property
    def id(self):
        """ Id of the job that corresponds to this status. """
        return self._id


    @property
    def owner(self):
        """ Owner of the job that corresponds to this status. """
        return self._owner


    @property
    def hostname(self):
        """ Host the job corresponding to this status ran on. """
        return self._hostname


    @property
    def reason(self):
        """ Reason the job corresponding to this status failed. """
        return self._reason


    @property
    def test_executed(self):
        """ If the test reached running an autoserv instance or not. """
        return self._test_executed

    @property
    def subdir(self):
        """Subdir of test this status corresponds to."""
        return self._subdir