1#!/usr/bin/python -u 2 3import collections 4import datetime 5import errno 6import fcntl 7import json 8import optparse 9import os 10import socket 11import subprocess 12import sys 13import traceback 14 15import common 16from autotest_lib.client.common_lib import global_config 17from autotest_lib.client.common_lib import mail, pidfile 18from autotest_lib.client.common_lib import utils 19from autotest_lib.client.common_lib.cros.graphite import autotest_es 20from autotest_lib.frontend import setup_django_environment 21from autotest_lib.frontend.tko import models as tko_models 22from autotest_lib.server import site_utils 23from autotest_lib.server.cros.dynamic_suite import constants 24from autotest_lib.site_utils import job_overhead 25from autotest_lib.site_utils.sponge_lib import sponge_utils 26from autotest_lib.tko import db as tko_db, utils as tko_utils 27from autotest_lib.tko import models, parser_lib 28from autotest_lib.tko.perf_upload import perf_uploader 29 30_ParseOptions = collections.namedtuple( 31 'ParseOptions', ['reparse', 'mail_on_failure', 'dry_run', 'suite_report', 32 'datastore_creds', 'export_to_gcloud_path']) 33 34def parse_args(): 35 """Parse args.""" 36 # build up our options parser and parse sys.argv 37 parser = optparse.OptionParser() 38 parser.add_option("-m", help="Send mail for FAILED tests", 39 dest="mailit", action="store_true") 40 parser.add_option("-r", help="Reparse the results of a job", 41 dest="reparse", action="store_true") 42 parser.add_option("-o", help="Parse a single results directory", 43 dest="singledir", action="store_true") 44 parser.add_option("-l", help=("Levels of subdirectories to include " 45 "in the job name"), 46 type="int", dest="level", default=1) 47 parser.add_option("-n", help="No blocking on an existing parse", 48 dest="noblock", action="store_true") 49 parser.add_option("-s", help="Database server hostname", 50 dest="db_host", action="store") 51 parser.add_option("-u", help="Database username", dest="db_user", 52 action="store") 53 parser.add_option("-p", help="Database password", dest="db_pass", 54 action="store") 55 parser.add_option("-d", help="Database name", dest="db_name", 56 action="store") 57 parser.add_option("--dry-run", help="Do not actually commit any results.", 58 dest="dry_run", action="store_true", default=False) 59 parser.add_option("--write-pidfile", 60 help="write pidfile (.parser_execute)", 61 dest="write_pidfile", action="store_true", 62 default=False) 63 parser.add_option("--record-duration", 64 help="Record timing to metadata db", 65 dest="record_duration", action="store_true", 66 default=False) 67 parser.add_option("--suite-report", 68 help=("Allows parsing job to attempt to create a suite " 69 "timeline report, if it detects that the job being " 70 "parsed is a suite job."), 71 dest="suite_report", action="store_true", 72 default=False) 73 parser.add_option("--datastore-creds", 74 help=("The path to gcloud datastore credentials file, " 75 "which will be used to upload suite timeline " 76 "report to gcloud. If not specified, the one " 77 "defined in shadow_config will be used."), 78 dest="datastore_creds", action="store", default=None) 79 parser.add_option("--export-to-gcloud-path", 80 help=("The path to export_to_gcloud script. Please find " 81 "chromite path on your server. The script is under " 82 "chromite/bin/."), 83 dest="export_to_gcloud_path", action="store", 84 default=None) 85 options, args = parser.parse_args() 86 87 # we need a results directory 88 if len(args) == 0: 89 tko_utils.dprint("ERROR: at least one results directory must " 90 "be provided") 91 parser.print_help() 92 sys.exit(1) 93 94 if not options.datastore_creds: 95 gcloud_creds = global_config.global_config.get_config_value( 96 'GCLOUD', 'cidb_datastore_writer_creds', default=None) 97 options.datastore_creds = (site_utils.get_creds_abspath(gcloud_creds) 98 if gcloud_creds else None) 99 100 if not options.export_to_gcloud_path: 101 export_script = 'chromiumos/chromite/bin/export_to_gcloud' 102 # If it is a lab server, the script is under ~chromeos-test/ 103 if os.path.exists(os.path.expanduser('~chromeos-test/%s' % 104 export_script)): 105 path = os.path.expanduser('~chromeos-test/%s' % export_script) 106 # If it is a local workstation, it is probably under ~/ 107 elif os.path.exists(os.path.expanduser('~/%s' % export_script)): 108 path = os.path.expanduser('~/%s' % export_script) 109 # If it is not found anywhere, the default will be set to None. 110 else: 111 path = None 112 options.export_to_gcloud_path = path 113 114 # pass the options back 115 return options, args 116 117 118def format_failure_message(jobname, kernel, testname, status, reason): 119 """Format failure message with the given information. 120 121 @param jobname: String representing the job name. 122 @param kernel: String representing the kernel. 123 @param testname: String representing the test name. 124 @param status: String representing the test status. 125 @param reason: String representing the reason. 126 127 @return: Failure message as a string. 128 """ 129 format_string = "%-12s %-20s %-12s %-10s %s" 130 return format_string % (jobname, kernel, testname, status, reason) 131 132 133def mailfailure(jobname, job, message): 134 """Send an email about the failure. 135 136 @param jobname: String representing the job name. 137 @param job: A job object. 138 @param message: The message to mail. 139 """ 140 message_lines = [""] 141 message_lines.append("The following tests FAILED for this job") 142 message_lines.append("http://%s/results/%s" % 143 (socket.gethostname(), jobname)) 144 message_lines.append("") 145 message_lines.append(format_failure_message("Job name", "Kernel", 146 "Test name", "FAIL/WARN", 147 "Failure reason")) 148 message_lines.append(format_failure_message("=" * 8, "=" * 6, "=" * 8, 149 "=" * 8, "=" * 14)) 150 message_header = "\n".join(message_lines) 151 152 subject = "AUTOTEST: FAILED tests from job %s" % jobname 153 mail.send("", job.user, "", subject, message_header + message) 154 155 156def _invalidate_original_tests(orig_job_idx, retry_job_idx): 157 """Retry tests invalidates original tests. 158 159 Whenever a retry job is complete, we want to invalidate the original 160 job's test results, such that the consumers of the tko database 161 (e.g. tko frontend, wmatrix) could figure out which results are the latest. 162 163 When a retry job is parsed, we retrieve the original job's afe_job_id 164 from the retry job's keyvals, which is then converted to tko job_idx and 165 passed into this method as |orig_job_idx|. 166 167 In this method, we are going to invalidate the rows in tko_tests that are 168 associated with the original job by flipping their 'invalid' bit to True. 169 In addition, in tko_tests, we also maintain a pointer from the retry results 170 to the original results, so that later we can always know which rows in 171 tko_tests are retries and which are the corresponding original results. 172 This is done by setting the field 'invalidates_test_idx' of the tests 173 associated with the retry job. 174 175 For example, assume Job(job_idx=105) are retried by Job(job_idx=108), after 176 this method is run, their tko_tests rows will look like: 177 __________________________________________________________________________ 178 test_idx| job_idx | test | ... | invalid | invalidates_test_idx 179 10 | 105 | dummy_Fail.Error| ... | 1 | NULL 180 11 | 105 | dummy_Fail.Fail | ... | 1 | NULL 181 ... 182 20 | 108 | dummy_Fail.Error| ... | 0 | 10 183 21 | 108 | dummy_Fail.Fail | ... | 0 | 11 184 __________________________________________________________________________ 185 Note the invalid bits of the rows for Job(job_idx=105) are set to '1'. 186 And the 'invalidates_test_idx' fields of the rows for Job(job_idx=108) 187 are set to 10 and 11 (the test_idx of the rows for the original job). 188 189 @param orig_job_idx: An integer representing the original job's 190 tko job_idx. Tests associated with this job will 191 be marked as 'invalid'. 192 @param retry_job_idx: An integer representing the retry job's 193 tko job_idx. The field 'invalidates_test_idx' 194 of the tests associated with this job will be updated. 195 196 """ 197 msg = 'orig_job_idx: %s, retry_job_idx: %s' % (orig_job_idx, retry_job_idx) 198 if not orig_job_idx or not retry_job_idx: 199 tko_utils.dprint('ERROR: Could not invalidate tests: ' + msg) 200 # Using django models here makes things easier, but make sure that 201 # before this method is called, all other relevant transactions have been 202 # committed to avoid race condition. In the long run, we might consider 203 # to make the rest of parser use django models. 204 orig_tests = tko_models.Test.objects.filter(job__job_idx=orig_job_idx) 205 retry_tests = tko_models.Test.objects.filter(job__job_idx=retry_job_idx) 206 207 # Invalidate original tests. 208 orig_tests.update(invalid=True) 209 210 # Maintain a dictionary that maps (test, subdir) to original tests. 211 # Note that within the scope of a job, (test, subdir) uniquelly 212 # identifies a test run, but 'test' does not. 213 # In a control file, one could run the same test with different 214 # 'subdir_tag', for example, 215 # job.run_test('dummy_Fail', tag='Error', subdir_tag='subdir_1') 216 # job.run_test('dummy_Fail', tag='Error', subdir_tag='subdir_2') 217 # In tko, we will get 218 # (test='dummy_Fail.Error', subdir='dummy_Fail.Error.subdir_1') 219 # (test='dummy_Fail.Error', subdir='dummy_Fail.Error.subdir_2') 220 invalidated_tests = {(orig_test.test, orig_test.subdir): orig_test 221 for orig_test in orig_tests} 222 for retry in retry_tests: 223 # It is possible that (retry.test, retry.subdir) doesn't exist 224 # in invalidated_tests. This could happen when the original job 225 # didn't run some of its tests. For example, a dut goes offline 226 # since the beginning of the job, in which case invalidated_tests 227 # will only have one entry for 'SERVER_JOB'. 228 orig_test = invalidated_tests.get((retry.test, retry.subdir), None) 229 if orig_test: 230 retry.invalidates_test = orig_test 231 retry.save() 232 tko_utils.dprint('DEBUG: Invalidated tests associated to job: ' + msg) 233 234 235def parse_one(db, jobname, path, parse_options): 236 """Parse a single job. Optionally send email on failure. 237 238 @param db: database object. 239 @param jobname: the tag used to search for existing job in db, 240 e.g. '1234-chromeos-test/host1' 241 @param path: The path to the results to be parsed. 242 @param parse_options: _ParseOptions instance. 243 """ 244 reparse = parse_options.reparse 245 mail_on_failure = parse_options.mail_on_failure 246 dry_run = parse_options.dry_run 247 suite_report = parse_options.suite_report 248 datastore_creds = parse_options.datastore_creds 249 export_to_gcloud_path = parse_options.export_to_gcloud_path 250 251 tko_utils.dprint("\nScanning %s (%s)" % (jobname, path)) 252 old_job_idx = db.find_job(jobname) 253 # old tests is a dict from tuple (test_name, subdir) to test_idx 254 old_tests = {} 255 if old_job_idx is not None: 256 if not reparse: 257 tko_utils.dprint("! Job is already parsed, done") 258 return 259 260 raw_old_tests = db.select("test_idx,subdir,test", "tko_tests", 261 {"job_idx": old_job_idx}) 262 if raw_old_tests: 263 old_tests = dict(((test, subdir), test_idx) 264 for test_idx, subdir, test in raw_old_tests) 265 266 # look up the status version 267 job_keyval = models.job.read_keyval(path) 268 status_version = job_keyval.get("status_version", 0) 269 270 # parse out the job 271 parser = parser_lib.parser(status_version) 272 job = parser.make_job(path) 273 status_log = os.path.join(path, "status.log") 274 if not os.path.exists(status_log): 275 status_log = os.path.join(path, "status") 276 if not os.path.exists(status_log): 277 tko_utils.dprint("! Unable to parse job, no status file") 278 return 279 280 # parse the status logs 281 tko_utils.dprint("+ Parsing dir=%s, jobname=%s" % (path, jobname)) 282 status_lines = open(status_log).readlines() 283 parser.start(job) 284 tests = parser.end(status_lines) 285 286 # parser.end can return the same object multiple times, so filter out dups 287 job.tests = [] 288 already_added = set() 289 for test in tests: 290 if test not in already_added: 291 already_added.add(test) 292 job.tests.append(test) 293 294 # try and port test_idx over from the old tests, but if old tests stop 295 # matching up with new ones just give up 296 if reparse and old_job_idx is not None: 297 job.index = old_job_idx 298 for test in job.tests: 299 test_idx = old_tests.pop((test.testname, test.subdir), None) 300 if test_idx is not None: 301 test.test_idx = test_idx 302 else: 303 tko_utils.dprint("! Reparse returned new test " 304 "testname=%r subdir=%r" % 305 (test.testname, test.subdir)) 306 if not dry_run: 307 for test_idx in old_tests.itervalues(): 308 where = {'test_idx' : test_idx} 309 db.delete('tko_iteration_result', where) 310 db.delete('tko_iteration_perf_value', where) 311 db.delete('tko_iteration_attributes', where) 312 db.delete('tko_test_attributes', where) 313 db.delete('tko_test_labels_tests', {'test_id': test_idx}) 314 db.delete('tko_tests', where) 315 316 job.build = None 317 job.board = None 318 job.build_version = None 319 job.suite = None 320 if job.label: 321 label_info = site_utils.parse_job_name(job.label) 322 if label_info: 323 job.build = label_info.get('build', None) 324 job.build_version = label_info.get('build_version', None) 325 job.board = label_info.get('board', None) 326 job.suite = label_info.get('suite', None) 327 328 # Upload job details to Sponge. 329 if not dry_run: 330 sponge_url = sponge_utils.upload_results(job, log=tko_utils.dprint) 331 if sponge_url: 332 job.keyval_dict['sponge_url'] = sponge_url 333 334 # check for failures 335 message_lines = [""] 336 job_successful = True 337 for test in job.tests: 338 if not test.subdir: 339 continue 340 tko_utils.dprint("* testname, status, reason: %s %s %s" 341 % (test.subdir, test.status, test.reason)) 342 if test.status != 'GOOD': 343 job_successful = False 344 message_lines.append(format_failure_message( 345 jobname, test.kernel.base, test.subdir, 346 test.status, test.reason)) 347 try: 348 message = "\n".join(message_lines) 349 350 if not dry_run: 351 # send out a email report of failure 352 if len(message) > 2 and mail_on_failure: 353 tko_utils.dprint("Sending email report of failure on %s to %s" 354 % (jobname, job.user)) 355 mailfailure(jobname, job, message) 356 357 # write the job into the database. 358 job_data = db.insert_job( 359 jobname, job, 360 parent_job_id=job_keyval.get(constants.PARENT_JOB_ID, None)) 361 362 # Upload perf values to the perf dashboard, if applicable. 363 for test in job.tests: 364 perf_uploader.upload_test(job, test, jobname) 365 366 # Although the cursor has autocommit, we still need to force it to 367 # commit existing changes before we can use django models, otherwise 368 # it will go into deadlock when django models try to start a new 369 # trasaction while the current one has not finished yet. 370 db.commit() 371 372 # Handle retry job. 373 orig_afe_job_id = job_keyval.get(constants.RETRY_ORIGINAL_JOB_ID, 374 None) 375 if orig_afe_job_id: 376 orig_job_idx = tko_models.Job.objects.get( 377 afe_job_id=orig_afe_job_id).job_idx 378 _invalidate_original_tests(orig_job_idx, job.index) 379 except Exception as e: 380 metadata = {'path': path, 'error': str(e), 381 'details': traceback.format_exc()} 382 tko_utils.dprint("Hit exception while uploading to tko db:\n%s" % 383 traceback.format_exc()) 384 autotest_es.post(use_http=True, type_str='parse_failure', 385 metadata=metadata) 386 raise e 387 388 # Serializing job into a binary file 389 try: 390 from autotest_lib.tko import tko_pb2 391 from autotest_lib.tko import job_serializer 392 393 serializer = job_serializer.JobSerializer() 394 binary_file_name = os.path.join(path, "job.serialize") 395 serializer.serialize_to_binary(job, jobname, binary_file_name) 396 397 if reparse: 398 site_export_file = "autotest_lib.tko.site_export" 399 site_export = utils.import_site_function(__file__, 400 site_export_file, 401 "site_export", 402 _site_export_dummy) 403 site_export(binary_file_name) 404 405 except ImportError: 406 tko_utils.dprint("DEBUG: tko_pb2.py doesn't exist. Create by " 407 "compiling tko/tko.proto.") 408 409 if not dry_run: 410 db.commit() 411 412 # Generate a suite report. 413 # Check whether this is a suite job, a suite job will be a hostless job, its 414 # jobname will be <JOB_ID>-<USERNAME>/hostless, the suite field will not be 415 # NULL. Only generate timeline report when datastore_parent_key is given. 416 try: 417 datastore_parent_key = job_keyval.get('datastore_parent_key', None) 418 if (suite_report and jobname.endswith('/hostless') 419 and job_data['suite'] and datastore_parent_key): 420 tko_utils.dprint('Start dumping suite timing report...') 421 timing_log = os.path.join(path, 'suite_timing.log') 422 dump_cmd = ("%s/site_utils/dump_suite_report.py %s " 423 "--output='%s' --debug" % 424 (common.autotest_dir, job_data['afe_job_id'], 425 timing_log)) 426 subprocess.check_output(dump_cmd, shell=True) 427 tko_utils.dprint('Successfully finish dumping suite timing report') 428 429 if (datastore_creds and export_to_gcloud_path 430 and os.path.exists(export_to_gcloud_path)): 431 upload_cmd = [export_to_gcloud_path, datastore_creds, 432 timing_log, '--parent_key', 433 repr(tuple(datastore_parent_key))] 434 tko_utils.dprint('Start exporting timeline report to gcloud') 435 subprocess.check_output(upload_cmd) 436 tko_utils.dprint('Successfully export timeline report to ' 437 'gcloud') 438 else: 439 tko_utils.dprint('DEBUG: skip exporting suite timeline to ' 440 'gcloud, because either gcloud creds or ' 441 'export_to_gcloud script is not found.') 442 except Exception as e: 443 tko_utils.dprint("WARNING: fail to dump/export suite report. " 444 "Error:\n%s" % e) 445 446 # Mark GS_OFFLOADER_NO_OFFLOAD in gs_offloader_instructions at the end of 447 # the function, so any failure, e.g., db connection error, will stop 448 # gs_offloader_instructions being updated, and logs can be uploaded for 449 # troubleshooting. 450 if job_successful: 451 # Check if we should not offload this test's results. 452 if job_keyval.get(constants.JOB_OFFLOAD_FAILURES_KEY, False): 453 # Update the gs_offloader_instructions json file. 454 gs_instructions_file = os.path.join( 455 path, constants.GS_OFFLOADER_INSTRUCTIONS) 456 gs_offloader_instructions = {} 457 if os.path.exists(gs_instructions_file): 458 with open(gs_instructions_file, 'r') as f: 459 gs_offloader_instructions = json.load(f) 460 461 gs_offloader_instructions[constants.GS_OFFLOADER_NO_OFFLOAD] = True 462 with open(gs_instructions_file, 'w') as f: 463 json.dump(gs_offloader_instructions, f) 464 465 466def _site_export_dummy(binary_file_name): 467 pass 468 469 470def _get_job_subdirs(path): 471 """ 472 Returns a list of job subdirectories at path. Returns None if the test 473 is itself a job directory. Does not recurse into the subdirs. 474 """ 475 # if there's a .machines file, use it to get the subdirs 476 machine_list = os.path.join(path, ".machines") 477 if os.path.exists(machine_list): 478 subdirs = set(line.strip() for line in file(machine_list)) 479 existing_subdirs = set(subdir for subdir in subdirs 480 if os.path.exists(os.path.join(path, subdir))) 481 if len(existing_subdirs) != 0: 482 return existing_subdirs 483 484 # if this dir contains ONLY subdirectories, return them 485 contents = set(os.listdir(path)) 486 contents.discard(".parse.lock") 487 subdirs = set(sub for sub in contents if 488 os.path.isdir(os.path.join(path, sub))) 489 if len(contents) == len(subdirs) != 0: 490 return subdirs 491 492 # this is a job directory, or something else we don't understand 493 return None 494 495 496def parse_leaf_path(db, path, level, parse_options): 497 """Parse a leaf path. 498 499 @param db: database handle. 500 @param path: The path to the results to be parsed. 501 @param level: Integer, level of subdirectories to include in the job name. 502 @param parse_options: _ParseOptions instance. 503 504 @returns: The job name of the parsed job, e.g. '123-chromeos-test/host1' 505 """ 506 job_elements = path.split("/")[-level:] 507 jobname = "/".join(job_elements) 508 try: 509 db.run_with_retry(parse_one, db, jobname, path, parse_options) 510 except Exception as e: 511 tko_utils.dprint("Error parsing leaf path: %s\nException:\n%s\n%s" % 512 (path, e, traceback.format_exc())) 513 return jobname 514 515 516def parse_path(db, path, level, parse_options): 517 """Parse a path 518 519 @param db: database handle. 520 @param path: The path to the results to be parsed. 521 @param level: Integer, level of subdirectories to include in the job name. 522 @param parse_options: _ParseOptions instance. 523 524 @returns: A set of job names of the parsed jobs. 525 set(['123-chromeos-test/host1', '123-chromeos-test/host2']) 526 """ 527 processed_jobs = set() 528 job_subdirs = _get_job_subdirs(path) 529 if job_subdirs is not None: 530 # parse status.log in current directory, if it exists. multi-machine 531 # synchronous server side tests record output in this directory. without 532 # this check, we do not parse these results. 533 if os.path.exists(os.path.join(path, 'status.log')): 534 new_job = parse_leaf_path(db, path, level, parse_options) 535 processed_jobs.add(new_job) 536 # multi-machine job 537 for subdir in job_subdirs: 538 jobpath = os.path.join(path, subdir) 539 new_jobs = parse_path(db, jobpath, level + 1, parse_options) 540 processed_jobs.update(new_jobs) 541 else: 542 # single machine job 543 new_job = parse_leaf_path(db, path, level, parse_options) 544 processed_jobs.add(new_job) 545 return processed_jobs 546 547 548def record_parsing(processed_jobs, duration_secs): 549 """Record the time spent on parsing to metadata db. 550 551 @param processed_jobs: A set of job names of the parsed jobs. 552 set(['123-chromeos-test/host1', '123-chromeos-test/host2']) 553 @param duration_secs: Total time spent on parsing, in seconds. 554 """ 555 556 for job_name in processed_jobs: 557 job_id, hostname = tko_utils.get_afe_job_id_and_hostname(job_name) 558 if not job_id or not hostname: 559 tko_utils.dprint('ERROR: can not parse job name %s, ' 560 'will not send duration to metadata db.' 561 % job_name) 562 continue 563 else: 564 job_overhead.record_state_duration( 565 job_id, hostname, job_overhead.STATUS.PARSING, 566 duration_secs) 567 568 569def main(): 570 """Main entrance.""" 571 start_time = datetime.datetime.now() 572 # Record the processed jobs so that 573 # we can send the duration of parsing to metadata db. 574 processed_jobs = set() 575 576 options, args = parse_args() 577 parse_options = _ParseOptions(options.reparse, options.mailit, 578 options.dry_run, options.suite_report, 579 options.datastore_creds, 580 options.export_to_gcloud_path) 581 results_dir = os.path.abspath(args[0]) 582 assert os.path.exists(results_dir) 583 584 pid_file_manager = pidfile.PidFileManager("parser", results_dir) 585 586 if options.write_pidfile: 587 pid_file_manager.open_file() 588 589 try: 590 # build up the list of job dirs to parse 591 if options.singledir: 592 jobs_list = [results_dir] 593 else: 594 jobs_list = [os.path.join(results_dir, subdir) 595 for subdir in os.listdir(results_dir)] 596 597 # build up the database 598 db = tko_db.db(autocommit=False, host=options.db_host, 599 user=options.db_user, password=options.db_pass, 600 database=options.db_name) 601 602 # parse all the jobs 603 for path in jobs_list: 604 lockfile = open(os.path.join(path, ".parse.lock"), "w") 605 flags = fcntl.LOCK_EX 606 if options.noblock: 607 flags |= fcntl.LOCK_NB 608 try: 609 fcntl.flock(lockfile, flags) 610 except IOError, e: 611 # lock is not available and nonblock has been requested 612 if e.errno == errno.EWOULDBLOCK: 613 lockfile.close() 614 continue 615 else: 616 raise # something unexpected happened 617 try: 618 new_jobs = parse_path(db, path, options.level, parse_options) 619 processed_jobs.update(new_jobs) 620 621 finally: 622 fcntl.flock(lockfile, fcntl.LOCK_UN) 623 lockfile.close() 624 625 except Exception as e: 626 pid_file_manager.close_file(1) 627 628 metadata = {'results_dir': results_dir, 629 'error': str(e), 630 'details': traceback.format_exc()} 631 autotest_es.post(use_http=True, type_str='parse_failure_final', 632 metadata=metadata) 633 634 raise 635 else: 636 pid_file_manager.close_file(0) 637 duration_secs = (datetime.datetime.now() - start_time).total_seconds() 638 if options.record_duration: 639 record_parsing(processed_jobs, duration_secs) 640 641 642if __name__ == "__main__": 643 main() 644