#!/usr/bin/python -u

import datetime
import json
import os, sys, optparse, fcntl, errno, traceback, socket

import common
from autotest_lib.client.common_lib import mail, pidfile
from autotest_lib.client.common_lib import utils
from autotest_lib.frontend import setup_django_environment
from autotest_lib.frontend.tko import models as tko_models
from autotest_lib.server.cros.dynamic_suite import constants
from autotest_lib.site_utils import job_overhead
from autotest_lib.tko import db as tko_db, utils as tko_utils
from autotest_lib.tko import models, status_lib
from autotest_lib.tko.perf_upload import perf_uploader


def parse_args():
    """Parse command line arguments.

    @return: A tuple (options, args) as returned by optparse; args holds
             the results directories to parse.  Exits with status 1 if no
             results directory was supplied.
    """
    # build up our options parser and parse sys.argv
    parser = optparse.OptionParser()
    parser.add_option("-m", help="Send mail for FAILED tests",
                      dest="mailit", action="store_true")
    parser.add_option("-r", help="Reparse the results of a job",
                      dest="reparse", action="store_true")
    parser.add_option("-o", help="Parse a single results directory",
                      dest="singledir", action="store_true")
    parser.add_option("-l", help=("Levels of subdirectories to include "
                                  "in the job name"),
                      type="int", dest="level", default=1)
    parser.add_option("-n", help="No blocking on an existing parse",
                      dest="noblock", action="store_true")
    parser.add_option("-s", help="Database server hostname",
                      dest="db_host", action="store")
    parser.add_option("-u", help="Database username", dest="db_user",
                      action="store")
    parser.add_option("-p", help="Database password", dest="db_pass",
                      action="store")
    parser.add_option("-d", help="Database name", dest="db_name",
                      action="store")
    parser.add_option("--write-pidfile",
                      help="write pidfile (.parser_execute)",
                      dest="write_pidfile", action="store_true",
                      default=False)
    parser.add_option("--record-duration",
                      help="Record timing to metadata db",
                      dest="record_duration", action="store_true",
                      default=False)
    options, args = parser.parse_args()

    # we need a results directory
    if not args:
        tko_utils.dprint("ERROR: at least one results directory must "
                         "be provided")
        parser.print_help()
        sys.exit(1)

    # pass the options back
    return options, args


def format_failure_message(jobname, kernel, testname, status, reason):
    """Format failure message with the given information.

    @param jobname: String representing the job name.
    @param kernel: String representing the kernel.
    @param testname: String representing the test name.
    @param status: String representing the test status.
    @param reason: String representing the reason.

    @return: Failure message as a string, with the fields aligned in
             fixed-width columns so rows line up in the email report.
    """
    format_string = "%-12s %-20s %-12s %-10s %s"
    return format_string % (jobname, kernel, testname, status, reason)


def mailfailure(jobname, job, message):
    """Send an email about the failure.

    @param jobname: String representing the job name.
    @param job: A job object.
    @param message: The message to mail.
    """
    message_lines = [""]
    message_lines.append("The following tests FAILED for this job")
    message_lines.append("http://%s/results/%s" %
                         (socket.gethostname(), jobname))
    message_lines.append("")
    message_lines.append(format_failure_message("Job name", "Kernel",
                                                "Test name", "FAIL/WARN",
                                                "Failure reason"))
    message_lines.append(format_failure_message("=" * 8, "=" * 6, "=" * 8,
                                                "=" * 8, "=" * 14))
    message_header = "\n".join(message_lines)

    subject = "AUTOTEST: FAILED tests from job %s" % jobname
    mail.send("", job.user, "", subject, message_header + message)


def _invalidate_original_tests(orig_job_idx, retry_job_idx):
    """Retry tests invalidates original tests.

    Whenever a retry job is complete, we want to invalidate the original
    job's test results, such that the consumers of the tko database
    (e.g. tko frontend, wmatrix) could figure out which results are the latest.

    When a retry job is parsed, we retrieve the original job's afe_job_id
    from the retry job's keyvals, which is then converted to tko job_idx and
    passed into this method as |orig_job_idx|.

    In this method, we are going to invalidate the rows in tko_tests that are
    associated with the original job by flipping their 'invalid' bit to True.
    In addition, in tko_tests, we also maintain a pointer from the retry results
    to the original results, so that later we can always know which rows in
    tko_tests are retries and which are the corresponding original results.
    This is done by setting the field 'invalidates_test_idx' of the tests
    associated with the retry job.

    For example, assume Job(job_idx=105) are retried by Job(job_idx=108), after
    this method is run, their tko_tests rows will look like:
    __________________________________________________________________________
    test_idx| job_idx | test            | ... | invalid | invalidates_test_idx
    10      | 105     | dummy_Fail.Error| ... | 1       | NULL
    11      | 105     | dummy_Fail.Fail | ... | 1       | NULL
    ...
    20      | 108     | dummy_Fail.Error| ... | 0       | 10
    21      | 108     | dummy_Fail.Fail | ... | 0       | 11
    __________________________________________________________________________
    Note the invalid bits of the rows for Job(job_idx=105) are set to '1'.
    And the 'invalidates_test_idx' fields of the rows for Job(job_idx=108)
    are set to 10 and 11 (the test_idx of the rows for the original job).

    @param orig_job_idx: An integer representing the original job's
                         tko job_idx. Tests associated with this job will
                         be marked as 'invalid'.
    @param retry_job_idx: An integer representing the retry job's
                          tko job_idx. The field 'invalidates_test_idx'
                          of the tests associated with this job will be updated.

    """
    msg = 'orig_job_idx: %s, retry_job_idx: %s' % (orig_job_idx, retry_job_idx)
    if not orig_job_idx or not retry_job_idx:
        tko_utils.dprint('ERROR: Could not invalidate tests: ' + msg)
        # Nothing sensible can be done without both indexes; bail out
        # instead of issuing no-op queries and logging a bogus success.
        return
    # Using django models here makes things easier, but make sure that
    # before this method is called, all other relevant transactions have been
    # committed to avoid race condition. In the long run, we might consider
    # to make the rest of parser use django models.
    orig_tests = tko_models.Test.objects.filter(job__job_idx=orig_job_idx)
    retry_tests = tko_models.Test.objects.filter(job__job_idx=retry_job_idx)

    # Invalidate original tests.
    orig_tests.update(invalid=True)

    # Maintain a dictionary that maps (test, subdir) to original tests.
    # Note that within the scope of a job, (test, subdir) uniquelly
    # identifies a test run, but 'test' does not.
    # In a control file, one could run the same test with different
    # 'subdir_tag', for example,
    #   job.run_test('dummy_Fail', tag='Error', subdir_tag='subdir_1')
    #   job.run_test('dummy_Fail', tag='Error', subdir_tag='subdir_2')
    # In tko, we will get
    #   (test='dummy_Fail.Error', subdir='dummy_Fail.Error.subdir_1')
    #   (test='dummy_Fail.Error', subdir='dummy_Fail.Error.subdir_2')
    invalidated_tests = {(orig_test.test, orig_test.subdir): orig_test
                         for orig_test in orig_tests}
    for retry in retry_tests:
        # It is possible that (retry.test, retry.subdir) doesn't exist
        # in invalidated_tests. This could happen when the original job
        # didn't run some of its tests. For example, a dut goes offline
        # since the beginning of the job, in which case invalidated_tests
        # will only have one entry for 'SERVER_JOB'.
        orig_test = invalidated_tests.get((retry.test, retry.subdir), None)
        if orig_test:
            retry.invalidates_test = orig_test
            retry.save()
    tko_utils.dprint('DEBUG: Invalidated tests associated to job: ' + msg)


def parse_one(db, jobname, path, reparse, mail_on_failure):
    """Parse a single job. Optionally send email on failure.

    @param db: database object.
    @param jobname: the tag used to search for existing job in db,
                    e.g. '1234-chromeos-test/host1'
    @param path: The path to the results to be parsed.
    @param reparse: True/False, whether this is reparsing of the job.
    @param mail_on_failure: whether to send email on FAILED test.

    """
    tko_utils.dprint("\nScanning %s (%s)" % (jobname, path))
    old_job_idx = db.find_job(jobname)
    # old tests is a dict from tuple (test_name, subdir) to test_idx
    old_tests = {}
    if old_job_idx is not None:
        if not reparse:
            tko_utils.dprint("! Job is already parsed, done")
            return

        raw_old_tests = db.select("test_idx,subdir,test", "tko_tests",
                                  {"job_idx": old_job_idx})
        if raw_old_tests:
            old_tests = dict(((test, subdir), test_idx)
                             for test_idx, subdir, test in raw_old_tests)

    # look up the status version
    job_keyval = models.job.read_keyval(path)
    status_version = job_keyval.get("status_version", 0)

    # parse out the job
    parser = status_lib.parser(status_version)
    job = parser.make_job(path)
    status_log = os.path.join(path, "status.log")
    if not os.path.exists(status_log):
        status_log = os.path.join(path, "status")
    if not os.path.exists(status_log):
        tko_utils.dprint("! Unable to parse job, no status file")
        return

    # parse the status logs
    tko_utils.dprint("+ Parsing dir=%s, jobname=%s" % (path, jobname))
    # Use a context manager so the log's file handle is not leaked.
    with open(status_log) as status_file:
        status_lines = status_file.readlines()
    parser.start(job)
    tests = parser.end(status_lines)

    # parser.end can return the same object multiple times, so filter out dups
    job.tests = []
    already_added = set()
    for test in tests:
        if test not in already_added:
            already_added.add(test)
            job.tests.append(test)

    # try and port test_idx over from the old tests, but if old tests stop
    # matching up with new ones just give up
    if reparse and old_job_idx is not None:
        job.index = old_job_idx
        for test in job.tests:
            test_idx = old_tests.pop((test.testname, test.subdir), None)
            if test_idx is not None:
                test.test_idx = test_idx
            else:
                tko_utils.dprint("! Reparse returned new test "
                                 "testname=%r subdir=%r" %
                                 (test.testname, test.subdir))
        # Any test_idx left over belongs to a test that no longer exists;
        # purge all of its dependent rows before deleting the test itself.
        for test_idx in old_tests.values():
            where = {'test_idx' : test_idx}
            db.delete('tko_iteration_result', where)
            db.delete('tko_iteration_perf_value', where)
            db.delete('tko_iteration_attributes', where)
            db.delete('tko_test_attributes', where)
            db.delete('tko_test_labels_tests', {'test_id': test_idx})
            db.delete('tko_tests', where)

    # check for failures
    message_lines = [""]
    job_successful = True
    for test in job.tests:
        if not test.subdir:
            continue
        tko_utils.dprint("* testname, status, reason: %s %s %s"
                         % (test.subdir, test.status, test.reason))
        if test.status != 'GOOD':
            job_successful = False
            message_lines.append(format_failure_message(
                jobname, test.kernel.base, test.subdir,
                test.status, test.reason))

    message = "\n".join(message_lines)

    # send out a email report of failure
    if len(message) > 2 and mail_on_failure:
        tko_utils.dprint("Sending email report of failure on %s to %s"
                         % (jobname, job.user))
        mailfailure(jobname, job, message)

    # write the job into the database.
    db.insert_job(jobname, job,
                  parent_job_id=job_keyval.get(constants.PARENT_JOB_ID, None))

    # Upload perf values to the perf dashboard, if applicable.
    for test in job.tests:
        perf_uploader.upload_test(job, test)

    # Although the cursor has autocommit, we still need to force it to commit
    # existing changes before we can use django models, otherwise it
    # will go into deadlock when django models try to start a new trasaction
    # while the current one has not finished yet.
    db.commit()

    # Handle retry job.
    orig_afe_job_id = job_keyval.get(constants.RETRY_ORIGINAL_JOB_ID, None)
    if orig_afe_job_id:
        orig_job_idx = tko_models.Job.objects.get(
                afe_job_id=orig_afe_job_id).job_idx
        _invalidate_original_tests(orig_job_idx, job.index)

    # Serializing job into a binary file
    try:
        from autotest_lib.tko import tko_pb2
        from autotest_lib.tko import job_serializer

        serializer = job_serializer.JobSerializer()
        binary_file_name = os.path.join(path, "job.serialize")
        serializer.serialize_to_binary(job, jobname, binary_file_name)

        if reparse:
            site_export_file = "autotest_lib.tko.site_export"
            site_export = utils.import_site_function(__file__,
                                                     site_export_file,
                                                     "site_export",
                                                     _site_export_dummy)
            site_export(binary_file_name)

    except ImportError:
        tko_utils.dprint("DEBUG: tko_pb2.py doesn't exist. Create by "
                         "compiling tko/tko.proto.")

    db.commit()

    # Mark GS_OFFLOADER_NO_OFFLOAD in gs_offloader_instructions at the end of
    # the function, so any failure, e.g., db connection error, will stop
    # gs_offloader_instructions being updated, and logs can be uploaded for
    # troubleshooting.
    if job_successful:
        # Check if we should not offload this test's results.
        if job_keyval.get(constants.JOB_OFFLOAD_FAILURES_KEY, False):
            # Update the gs_offloader_instructions json file.
            gs_instructions_file = os.path.join(
                    path, constants.GS_OFFLOADER_INSTRUCTIONS)
            gs_offloader_instructions = {}
            if os.path.exists(gs_instructions_file):
                with open(gs_instructions_file, 'r') as f:
                    gs_offloader_instructions = json.load(f)

            gs_offloader_instructions[constants.GS_OFFLOADER_NO_OFFLOAD] = True
            with open(gs_instructions_file, 'w') as f:
                json.dump(gs_offloader_instructions, f)


def _site_export_dummy(binary_file_name):
    """No-op fallback used when no site_export hook is available."""
    pass


def _get_job_subdirs(path):
    """
    Returns a list of job subdirectories at path. Returns None if the test
    is itself a job directory. Does not recurse into the subdirs.

    @param path: Directory to inspect.
    """
    # if there's a .machines file, use it to get the subdirs
    machine_list = os.path.join(path, ".machines")
    if os.path.exists(machine_list):
        # Close the file promptly instead of relying on GC.
        with open(machine_list) as machines_file:
            subdirs = set(line.strip() for line in machines_file)
        existing_subdirs = set(subdir for subdir in subdirs
                               if os.path.exists(os.path.join(path, subdir)))
        if len(existing_subdirs) != 0:
            return existing_subdirs

    # if this dir contains ONLY subdirectories, return them
    contents = set(os.listdir(path))
    contents.discard(".parse.lock")
    subdirs = set(sub for sub in contents if
                  os.path.isdir(os.path.join(path, sub)))
    if len(contents) == len(subdirs) != 0:
        return subdirs

    # this is a job directory, or something else we don't understand
    return None


def parse_leaf_path(db, path, level, reparse, mail_on_failure):
    """Parse a leaf path.

    @param db: database handle.
    @param path: The path to the results to be parsed.
    @param level: Integer, level of subdirectories to include in the job name.
    @param reparse: True/False, whether this is reparsing of the job.
    @param mail_on_failure: whether to send email on FAILED test.

    @returns: The job name of the parsed job, e.g. '123-chromeos-test/host1'
    """
    job_elements = path.split("/")[-level:]
    jobname = "/".join(job_elements)
    try:
        db.run_with_retry(parse_one, db, jobname, path, reparse,
                          mail_on_failure)
    except Exception:
        # Keep going on a per-job failure; the traceback is enough to debug.
        traceback.print_exc()
    return jobname


def parse_path(db, path, level, reparse, mail_on_failure):
    """Parse a path

    @param db: database handle.
    @param path: The path to the results to be parsed.
    @param level: Integer, level of subdirectories to include in the job name.
    @param reparse: True/False, whether this is reparsing of the job.
    @param mail_on_failure: whether to send email on FAILED test.

    @returns: A set of job names of the parsed jobs.
              set(['123-chromeos-test/host1', '123-chromeos-test/host2'])
    """
    processed_jobs = set()
    job_subdirs = _get_job_subdirs(path)
    if job_subdirs is not None:
        # parse status.log in current directory, if it exists. multi-machine
        # synchronous server side tests record output in this directory. without
        # this check, we do not parse these results.
        if os.path.exists(os.path.join(path, 'status.log')):
            new_job = parse_leaf_path(db, path, level, reparse, mail_on_failure)
            processed_jobs.add(new_job)
        # multi-machine job
        for subdir in job_subdirs:
            jobpath = os.path.join(path, subdir)
            new_jobs = parse_path(db, jobpath, level + 1, reparse, mail_on_failure)
            processed_jobs.update(new_jobs)
    else:
        # single machine job
        new_job = parse_leaf_path(db, path, level, reparse, mail_on_failure)
        processed_jobs.add(new_job)
    return processed_jobs


def record_parsing(processed_jobs, duration_secs):
    """Record the time spent on parsing to metadata db.

    @param processed_jobs: A set of job names of the parsed jobs.
                           set(['123-chromeos-test/host1', '123-chromeos-test/host2'])
    @param duration_secs: Total time spent on parsing, in seconds.
    """

    for job_name in processed_jobs:
        job_id, hostname = tko_utils.get_afe_job_id_and_hostname(job_name)
        if not job_id or not hostname:
            tko_utils.dprint('ERROR: can not parse job name %s, '
                             'will not send duration to metadata db.'
                             % job_name)
            continue
        job_overhead.record_state_duration(
                job_id, hostname, job_overhead.STATUS.PARSING,
                duration_secs)


def main():
    """Main entrance."""
    start_time = datetime.datetime.now()
    # Record the processed jobs so that
    # we can send the duration of parsing to metadata db.
    processed_jobs = set()

    options, args = parse_args()
    results_dir = os.path.abspath(args[0])
    assert os.path.exists(results_dir)

    pid_file_manager = pidfile.PidFileManager("parser", results_dir)

    if options.write_pidfile:
        pid_file_manager.open_file()

    try:
        # build up the list of job dirs to parse
        if options.singledir:
            jobs_list = [results_dir]
        else:
            jobs_list = [os.path.join(results_dir, subdir)
                         for subdir in os.listdir(results_dir)]

        # build up the database
        db = tko_db.db(autocommit=False, host=options.db_host,
                       user=options.db_user, password=options.db_pass,
                       database=options.db_name)

        # parse all the jobs
        for path in jobs_list:
            lockfile = open(os.path.join(path, ".parse.lock"), "w")
            flags = fcntl.LOCK_EX
            if options.noblock:
                flags |= fcntl.LOCK_NB
            try:
                fcntl.flock(lockfile, flags)
            except IOError as e:
                # lock is not available and nonblock has been requested
                if e.errno == errno.EWOULDBLOCK:
                    lockfile.close()
                    continue
                else:
                    raise # something unexpected happened
            try:
                new_jobs = parse_path(db, path, options.level, options.reparse,
                                      options.mailit)
                processed_jobs.update(new_jobs)

            finally:
                fcntl.flock(lockfile, fcntl.LOCK_UN)
                lockfile.close()

    except:
        pid_file_manager.close_file(1)
        raise
    else:
        pid_file_manager.close_file(0)
    duration_secs = (datetime.datetime.now() - start_time).total_seconds()
    if options.record_duration:
        record_parsing(processed_jobs, duration_secs)


if __name__ == "__main__":
    main()