#!/usr/bin/python -u

import collections
import datetime
import errno
import fcntl
import json
import optparse
import os
import socket
import subprocess
import sys
import traceback

import common
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import mail, pidfile
from autotest_lib.client.common_lib import utils
from autotest_lib.client.common_lib.cros.graphite import autotest_es
from autotest_lib.frontend import setup_django_environment
from autotest_lib.frontend.tko import models as tko_models
from autotest_lib.server import site_utils
from autotest_lib.server.cros.dynamic_suite import constants
from autotest_lib.site_utils import job_overhead
from autotest_lib.site_utils.sponge_lib import sponge_utils
from autotest_lib.tko import db as tko_db, utils as tko_utils
from autotest_lib.tko import models, parser_lib
from autotest_lib.tko.perf_upload import perf_uploader

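# Options threaded from main() through parse_path() down to parse_one().
# A sketch of a typical construction (field values are illustrative):
#     _ParseOptions(reparse=False, mail_on_failure=True, dry_run=False,
#                   suite_report=False, datastore_creds=None,
#                   export_to_gcloud_path=None)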
_ParseOptions = collections.namedtuple(
    'ParseOptions', ['reparse', 'mail_on_failure', 'dry_run', 'suite_report',
                     'datastore_creds', 'export_to_gcloud_path'])

def parse_args():
    """Parse args."""
    # build up our options parser and parse sys.argv
    parser = optparse.OptionParser()
    parser.add_option("-m", help="Send mail for FAILED tests",
                      dest="mailit", action="store_true")
    parser.add_option("-r", help="Reparse the results of a job",
                      dest="reparse", action="store_true")
    parser.add_option("-o", help="Parse a single results directory",
                      dest="singledir", action="store_true")
    parser.add_option("-l", help=("Levels of subdirectories to include "
                                  "in the job name"),
                      type="int", dest="level", default=1)
    parser.add_option("-n", help="No blocking on an existing parse",
                      dest="noblock", action="store_true")
    parser.add_option("-s", help="Database server hostname",
                      dest="db_host", action="store")
    parser.add_option("-u", help="Database username", dest="db_user",
                      action="store")
    parser.add_option("-p", help="Database password", dest="db_pass",
                      action="store")
    parser.add_option("-d", help="Database name", dest="db_name",
                      action="store")
    parser.add_option("--dry-run", help="Do not actually commit any results.",
                      dest="dry_run", action="store_true", default=False)
    parser.add_option("--write-pidfile",
                      help="write pidfile (.parser_execute)",
                      dest="write_pidfile", action="store_true",
                      default=False)
    parser.add_option("--record-duration",
                      help="Record timing to metadata db",
                      dest="record_duration", action="store_true",
                      default=False)
    parser.add_option("--suite-report",
                      help=("Allow the parser to attempt to create a suite "
                            "timeline report, if it detects that the job "
                            "being parsed is a suite job."),
                      dest="suite_report", action="store_true",
                      default=False)
    parser.add_option("--datastore-creds",
                      help=("The path to the gcloud datastore credentials "
                            "file, used to upload the suite timeline report "
                            "to gcloud. If not specified, the one defined "
                            "in shadow_config will be used."),
                      dest="datastore_creds", action="store", default=None)
    parser.add_option("--export-to-gcloud-path",
                      help=("The path to the export_to_gcloud script, found "
                            "under chromite/bin/ in the chromite checkout "
                            "on the server."),
                      dest="export_to_gcloud_path", action="store",
                      default=None)
    options, args = parser.parse_args()

    # we need a results directory
    if len(args) == 0:
        tko_utils.dprint("ERROR: at least one results directory must "
                         "be provided")
        parser.print_help()
        sys.exit(1)

    if not options.datastore_creds:
        gcloud_creds = global_config.global_config.get_config_value(
            'GCLOUD', 'cidb_datastore_writer_creds', default=None)
        options.datastore_creds = (site_utils.get_creds_abspath(gcloud_creds)
                                   if gcloud_creds else None)
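    # The shadow_config entry read above is a standard INI-style section;
    # a sketch of what it might look like (the file name is hypothetical):
    #     [GCLOUD]
    #     cidb_datastore_writer_creds = datastore_writer_creds.json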

    if not options.export_to_gcloud_path:
        export_script = 'chromiumos/chromite/bin/export_to_gcloud'
        # If it is a lab server, the script is under ~chromeos-test/
        if os.path.exists(os.path.expanduser('~chromeos-test/%s' %
                                             export_script)):
            path = os.path.expanduser('~chromeos-test/%s' % export_script)
        # If it is a local workstation, it is probably under ~/
        elif os.path.exists(os.path.expanduser('~/%s' % export_script)):
            path = os.path.expanduser('~/%s' % export_script)
        # If it is not found anywhere, the default will be set to None.
        else:
            path = None
        options.export_to_gcloud_path = path

    # pass the options back
    return options, args


def format_failure_message(jobname, kernel, testname, status, reason):
    """Format failure message with the given information.

    @param jobname: String representing the job name.
    @param kernel: String representing the kernel.
    @param testname: String representing the test name.
    @param status: String representing the test status.
    @param reason: String representing the reason.

    @return: Failure message as a string.
    """
    format_string = "%-12s %-20s %-12s %-10s %s"
    return format_string % (jobname, kernel, testname, status, reason)
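
# Example (illustrative): each column above is left-aligned to a minimum
# width (12/20/12/10 characters), so rows printed with the same format line
# up; mailfailure() below uses this to build its header row:
#     format_failure_message("Job name", "Kernel", "Test name", "FAIL/WARN",
#                            "Failure reason")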


def mailfailure(jobname, job, message):
    """Send an email about the failure.

    @param jobname: String representing the job name.
    @param job: A job object.
    @param message: The message to mail.
    """
    message_lines = [""]
    message_lines.append("The following tests FAILED for this job")
    message_lines.append("http://%s/results/%s" %
                         (socket.gethostname(), jobname))
    message_lines.append("")
    message_lines.append(format_failure_message("Job name", "Kernel",
                                                "Test name", "FAIL/WARN",
                                                "Failure reason"))
    message_lines.append(format_failure_message("=" * 8, "=" * 6, "=" * 8,
                                                "=" * 8, "=" * 14))
    message_header = "\n".join(message_lines)

    subject = "AUTOTEST: FAILED tests from job %s" % jobname
    mail.send("", job.user, "", subject, message_header + message)


def _invalidate_original_tests(orig_job_idx, retry_job_idx):
    """Invalidate a retried job's original tests.

    Whenever a retry job completes, we want to invalidate the original
    job's test results, so that consumers of the tko database
    (e.g. the tko frontend, wmatrix) can figure out which results are the
    latest.

    When a retry job is parsed, we retrieve the original job's afe_job_id
    from the retry job's keyvals, which is then converted to a tko job_idx
    and passed into this method as |orig_job_idx|.

    In this method, we are going to invalidate the rows in tko_tests that are
    associated with the original job by flipping their 'invalid' bit to True.
    In addition, in tko_tests, we also maintain a pointer from the retry results
    to the original results, so that later we can always know which rows in
    tko_tests are retries and which are the corresponding original results.
    This is done by setting the field 'invalidates_test_idx' of the tests
    associated with the retry job.

    For example, assume Job(job_idx=105) is retried by Job(job_idx=108); after
    this method runs, their tko_tests rows will look like:
    __________________________________________________________________________
    test_idx| job_idx | test            | ... | invalid | invalidates_test_idx
    10      | 105     | dummy_Fail.Error| ... | 1       | NULL
    11      | 105     | dummy_Fail.Fail | ... | 1       | NULL
    ...
    20      | 108     | dummy_Fail.Error| ... | 0       | 10
    21      | 108     | dummy_Fail.Fail | ... | 0       | 11
    __________________________________________________________________________
    Note the invalid bits of the rows for Job(job_idx=105) are set to '1',
    and the 'invalidates_test_idx' fields of the rows for Job(job_idx=108)
    are set to 10 and 11 (the test_idx of the rows for the original job).

    @param orig_job_idx: An integer representing the original job's
                         tko job_idx. Tests associated with this job will
                         be marked as 'invalid'.
    @param retry_job_idx: An integer representing the retry job's
                          tko job_idx. The field 'invalidates_test_idx'
                          of the tests associated with this job will be updated.

    """
    msg = 'orig_job_idx: %s, retry_job_idx: %s' % (orig_job_idx, retry_job_idx)
    if not orig_job_idx or not retry_job_idx:
        tko_utils.dprint('ERROR: Could not invalidate tests: ' + msg)
        return
    # Using django models here makes things easier, but make sure that
    # before this method is called, all other relevant transactions have been
    # committed to avoid a race condition. In the long run, we might consider
    # making the rest of the parser use django models.
    orig_tests = tko_models.Test.objects.filter(job__job_idx=orig_job_idx)
    retry_tests = tko_models.Test.objects.filter(job__job_idx=retry_job_idx)

    # Invalidate original tests.
    orig_tests.update(invalid=True)

    # Maintain a dictionary that maps (test, subdir) to original tests.
    # Note that within the scope of a job, (test, subdir) uniquely
    # identifies a test run, but 'test' does not.
    # In a control file, one could run the same test with different
    # 'subdir_tag', for example,
    #     job.run_test('dummy_Fail', tag='Error', subdir_tag='subdir_1')
    #     job.run_test('dummy_Fail', tag='Error', subdir_tag='subdir_2')
    # In tko, we will get
    #    (test='dummy_Fail.Error', subdir='dummy_Fail.Error.subdir_1')
    #    (test='dummy_Fail.Error', subdir='dummy_Fail.Error.subdir_2')
    invalidated_tests = {(orig_test.test, orig_test.subdir): orig_test
                         for orig_test in orig_tests}
    for retry in retry_tests:
        # It is possible that (retry.test, retry.subdir) doesn't exist
        # in invalidated_tests. This could happen when the original job
        # didn't run some of its tests. For example, a dut goes offline
        # from the beginning of the job, in which case invalidated_tests
        # will only have one entry for 'SERVER_JOB'.
        orig_test = invalidated_tests.get((retry.test, retry.subdir), None)
        if orig_test:
            retry.invalidates_test = orig_test
            retry.save()
    tko_utils.dprint('DEBUG: Invalidated tests associated with job: ' + msg)


def parse_one(db, jobname, path, parse_options):
    """Parse a single job. Optionally send email on failure.

    @param db: database object.
    @param jobname: the tag used to search for an existing job in the db,
                    e.g. '1234-chromeos-test/host1'
    @param path: The path to the results to be parsed.
    @param parse_options: _ParseOptions instance.
    """
    reparse = parse_options.reparse
    mail_on_failure = parse_options.mail_on_failure
    dry_run = parse_options.dry_run
    suite_report = parse_options.suite_report
    datastore_creds = parse_options.datastore_creds
    export_to_gcloud_path = parse_options.export_to_gcloud_path

    tko_utils.dprint("\nScanning %s (%s)" % (jobname, path))
    old_job_idx = db.find_job(jobname)
    # old_tests is a dict from a (test_name, subdir) tuple to test_idx
    old_tests = {}
    if old_job_idx is not None:
        if not reparse:
            tko_utils.dprint("! Job is already parsed, done")
            return

        raw_old_tests = db.select("test_idx,subdir,test", "tko_tests",
                                  {"job_idx": old_job_idx})
        if raw_old_tests:
            old_tests = dict(((test, subdir), test_idx)
                             for test_idx, subdir, test in raw_old_tests)

    # look up the status version
    job_keyval = models.job.read_keyval(path)
    status_version = job_keyval.get("status_version", 0)
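    # The keyval file read above is plain text with one key=value pair per
    # line, e.g. (value illustrative):
    #     status_version=1
    # Other keys consumed later in this function (the parent job id, the
    # retry original job id, datastore_parent_key) come from the same file.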

    # parse out the job
    parser = parser_lib.parser(status_version)
    job = parser.make_job(path)
    status_log = os.path.join(path, "status.log")
    if not os.path.exists(status_log):
        status_log = os.path.join(path, "status")
    if not os.path.exists(status_log):
        tko_utils.dprint("! Unable to parse job, no status file")
        return

    # parse the status logs
    tko_utils.dprint("+ Parsing dir=%s, jobname=%s" % (path, jobname))
    with open(status_log) as status_file:
        status_lines = status_file.readlines()
    parser.start(job)
    tests = parser.end(status_lines)

    # parser.end can return the same object multiple times, so filter out dups
    job.tests = []
    already_added = set()
    for test in tests:
        if test not in already_added:
            already_added.add(test)
            job.tests.append(test)

    # try to port test_idx over from the old tests, but if old tests stop
    # matching up with new ones, just give up
    if reparse and old_job_idx is not None:
        job.index = old_job_idx
        for test in job.tests:
            test_idx = old_tests.pop((test.testname, test.subdir), None)
            if test_idx is not None:
                test.test_idx = test_idx
            else:
                tko_utils.dprint("! Reparse returned new test "
                                 "testname=%r subdir=%r" %
                                 (test.testname, test.subdir))
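        # Any (testname, subdir) keys still left in old_tests at this point
        # have no counterpart in the reparsed results; the block below purges
        # their rows and the dependent iteration/attribute/label rows, so the
        # reparse fully replaces the old job.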
        if not dry_run:
            for test_idx in old_tests.itervalues():
                where = {'test_idx': test_idx}
                db.delete('tko_iteration_result', where)
                db.delete('tko_iteration_perf_value', where)
                db.delete('tko_iteration_attributes', where)
                db.delete('tko_test_attributes', where)
                db.delete('tko_test_labels_tests', {'test_id': test_idx})
                db.delete('tko_tests', where)

    job.build = None
    job.board = None
    job.build_version = None
    job.suite = None
    if job.label:
        label_info = site_utils.parse_job_name(job.label)
        if label_info:
            job.build = label_info.get('build', None)
            job.build_version = label_info.get('build_version', None)
            job.board = label_info.get('board', None)
            job.suite = label_info.get('suite', None)
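    # Example (illustrative, assuming the usual lab naming scheme): a label
    # such as 'lumpy-release/R28-3993.0.0/bvt/dummy_Pass' would yield
    # board='lumpy', build_version='R28-3993.0.0' and suite='bvt'.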

    # Upload job details to Sponge.
    if not dry_run:
        sponge_url = sponge_utils.upload_results(job, log=tko_utils.dprint)
        if sponge_url:
            job.keyval_dict['sponge_url'] = sponge_url

    # check for failures
    message_lines = [""]
    job_successful = True
    for test in job.tests:
        if not test.subdir:
            continue
        tko_utils.dprint("* testname, status, reason: %s %s %s"
                         % (test.subdir, test.status, test.reason))
        if test.status != 'GOOD':
            job_successful = False
            message_lines.append(format_failure_message(
                jobname, test.kernel.base, test.subdir,
                test.status, test.reason))
    try:
        message = "\n".join(message_lines)

        if not dry_run:
            # send out an email report of the failure
            if len(message) > 2 and mail_on_failure:
                tko_utils.dprint("Sending email report of failure on %s to %s"
                                 % (jobname, job.user))
                mailfailure(jobname, job, message)

            # write the job into the database.
            job_data = db.insert_job(
                jobname, job,
                parent_job_id=job_keyval.get(constants.PARENT_JOB_ID, None))

            # Upload perf values to the perf dashboard, if applicable.
            for test in job.tests:
                perf_uploader.upload_test(job, test, jobname)

            # Although the cursor has autocommit, we still need to force it
            # to commit existing changes before we can use django models;
            # otherwise the parse will deadlock when django models try to
            # start a new transaction while the current one has not finished
            # yet.
            db.commit()

            # Handle retry job.
            orig_afe_job_id = job_keyval.get(constants.RETRY_ORIGINAL_JOB_ID,
                                             None)
            if orig_afe_job_id:
                orig_job_idx = tko_models.Job.objects.get(
                        afe_job_id=orig_afe_job_id).job_idx
                _invalidate_original_tests(orig_job_idx, job.index)
    except Exception as e:
        metadata = {'path': path, 'error': str(e),
                    'details': traceback.format_exc()}
        tko_utils.dprint("Hit exception while uploading to tko db:\n%s" %
                         traceback.format_exc())
        autotest_es.post(use_http=True, type_str='parse_failure',
                         metadata=metadata)
        # Re-raise with a bare raise to preserve the original traceback.
        raise

    # Serialize the job into a binary file.
    try:
        # The tko_pb2 import doubles as a check that the compiled protobuf
        # module exists; if it doesn't, serialization is skipped below.
        from autotest_lib.tko import tko_pb2
        from autotest_lib.tko import job_serializer

        serializer = job_serializer.JobSerializer()
        binary_file_name = os.path.join(path, "job.serialize")
        serializer.serialize_to_binary(job, jobname, binary_file_name)

        if reparse:
            site_export_file = "autotest_lib.tko.site_export"
            site_export = utils.import_site_function(__file__,
                                                     site_export_file,
                                                     "site_export",
                                                     _site_export_dummy)
            site_export(binary_file_name)

    except ImportError:
        tko_utils.dprint("DEBUG: tko_pb2.py doesn't exist; create it by "
                         "compiling tko/tko.proto.")

    if not dry_run:
        db.commit()

    # Generate a suite report.
    # A suite job is a hostless job: its jobname will be
    # <JOB_ID>-<USERNAME>/hostless and its suite field will not be NULL.
    # Only generate the timeline report when datastore_parent_key is given.
    # Skip this entirely on a dry run, since job_data is only set when the
    # job has actually been written to the database.
    try:
        datastore_parent_key = job_keyval.get('datastore_parent_key', None)
        if (not dry_run and suite_report and jobname.endswith('/hostless')
            and job_data['suite'] and datastore_parent_key):
            tko_utils.dprint('Start dumping suite timing report...')
            timing_log = os.path.join(path, 'suite_timing.log')
            dump_cmd = ("%s/site_utils/dump_suite_report.py %s "
                        "--output='%s' --debug" %
                        (common.autotest_dir, job_data['afe_job_id'],
                         timing_log))
            subprocess.check_output(dump_cmd, shell=True)
            tko_utils.dprint('Successfully finished dumping suite timing '
                             'report')

            if (datastore_creds and export_to_gcloud_path
                and os.path.exists(export_to_gcloud_path)):
                upload_cmd = [export_to_gcloud_path, datastore_creds,
                              timing_log, '--parent_key',
                              repr(tuple(datastore_parent_key))]
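                # datastore_parent_key comes from the keyval file (where it
                # is presumably stored as a list); export_to_gcloud appears
                # to expect the parent key serialized as a tuple literal,
                # hence the repr(tuple(...)) above.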
                tko_utils.dprint('Start exporting timeline report to gcloud')
                subprocess.check_output(upload_cmd)
                tko_utils.dprint('Successfully exported timeline report to '
                                 'gcloud')
            else:
                tko_utils.dprint('DEBUG: skipping export of the suite '
                                 'timeline to gcloud, because either the '
                                 'gcloud creds or the export_to_gcloud '
                                 'script was not found.')
    except Exception as e:
        tko_utils.dprint("WARNING: failed to dump/export suite report. "
                         "Error:\n%s" % e)

    # Mark GS_OFFLOADER_NO_OFFLOAD in gs_offloader_instructions at the end of
    # the function, so that any failure, e.g. a db connection error, will
    # prevent gs_offloader_instructions from being updated, and logs can be
    # uploaded for troubleshooting.
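    # The instructions file is a small JSON dict; with the current constants
    # it ends up looking roughly like {"no_offload": true} (the exact key
    # name comes from constants.GS_OFFLOADER_NO_OFFLOAD).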
    if job_successful:
        # Check if we should not offload this test's results.
        if job_keyval.get(constants.JOB_OFFLOAD_FAILURES_KEY, False):
            # Update the gs_offloader_instructions json file.
            gs_instructions_file = os.path.join(
                    path, constants.GS_OFFLOADER_INSTRUCTIONS)
            gs_offloader_instructions = {}
            if os.path.exists(gs_instructions_file):
                with open(gs_instructions_file, 'r') as f:
                    gs_offloader_instructions = json.load(f)

            gs_offloader_instructions[constants.GS_OFFLOADER_NO_OFFLOAD] = True
            with open(gs_instructions_file, 'w') as f:
                json.dump(gs_offloader_instructions, f)


def _site_export_dummy(binary_file_name):
    pass


def _get_job_subdirs(path):
    """
    Returns a set of job subdirectories at path. Returns None if the path
    is itself a job directory. Does not recurse into the subdirs.
    """
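    # A multi-machine job directory carries a ".machines" file naming one
    # results subdirectory per line; a sketch of its contents (hostnames
    # are illustrative):
    #     host1
    #     host2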
    # if there's a .machines file, use it to get the subdirs
    machine_list = os.path.join(path, ".machines")
    if os.path.exists(machine_list):
        subdirs = set(line.strip() for line in open(machine_list))
        existing_subdirs = set(subdir for subdir in subdirs
                               if os.path.exists(os.path.join(path, subdir)))
        if len(existing_subdirs) != 0:
            return existing_subdirs

    # if this dir contains ONLY subdirectories, return them
    contents = set(os.listdir(path))
    contents.discard(".parse.lock")
    subdirs = set(sub for sub in contents if
                  os.path.isdir(os.path.join(path, sub)))
    if len(contents) == len(subdirs) != 0:
        return subdirs

    # this is a job directory, or something else we don't understand
    return None


def parse_leaf_path(db, path, level, parse_options):
    """Parse a leaf path.

    @param db: database handle.
    @param path: The path to the results to be parsed.
    @param level: Integer, level of subdirectories to include in the job name.
    @param parse_options: _ParseOptions instance.

    @returns: The job name of the parsed job, e.g. '123-chromeos-test/host1'
    """
    job_elements = path.split("/")[-level:]
    jobname = "/".join(job_elements)
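    # Example (illustrative): with level=2,
    #     path='/usr/local/autotest/results/123-chromeos-test/host1'
    # yields jobname='123-chromeos-test/host1'.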
    try:
        db.run_with_retry(parse_one, db, jobname, path, parse_options)
    except Exception as e:
        tko_utils.dprint("Error parsing leaf path: %s\nException:\n%s\n%s" %
                         (path, e, traceback.format_exc()))
    return jobname


def parse_path(db, path, level, parse_options):
    """Parse a path.

    @param db: database handle.
    @param path: The path to the results to be parsed.
    @param level: Integer, level of subdirectories to include in the job name.
    @param parse_options: _ParseOptions instance.

    @returns: A set of job names of the parsed jobs, e.g.
              set(['123-chromeos-test/host1', '123-chromeos-test/host2'])
    """
    processed_jobs = set()
    job_subdirs = _get_job_subdirs(path)
    if job_subdirs is not None:
        # parse status.log in current directory, if it exists. multi-machine
        # synchronous server side tests record output in this directory.
        # without this check, we do not parse these results.
        if os.path.exists(os.path.join(path, 'status.log')):
            new_job = parse_leaf_path(db, path, level, parse_options)
            processed_jobs.add(new_job)
        # multi-machine job
        for subdir in job_subdirs:
            jobpath = os.path.join(path, subdir)
            new_jobs = parse_path(db, jobpath, level + 1, parse_options)
            processed_jobs.update(new_jobs)
    else:
        # single machine job
        new_job = parse_leaf_path(db, path, level, parse_options)
        processed_jobs.add(new_job)
    return processed_jobs


def record_parsing(processed_jobs, duration_secs):
    """Record the time spent on parsing to the metadata db.

    @param processed_jobs: A set of job names of the parsed jobs, e.g.
              set(['123-chromeos-test/host1', '123-chromeos-test/host2'])
    @param duration_secs: Total time spent on parsing, in seconds.
    """

    for job_name in processed_jobs:
        job_id, hostname = tko_utils.get_afe_job_id_and_hostname(job_name)
        if not job_id or not hostname:
            tko_utils.dprint('ERROR: cannot parse job name %s; '
                             'will not send duration to metadata db.'
                             % job_name)
            continue
        job_overhead.record_state_duration(
                job_id, hostname, job_overhead.STATUS.PARSING,
                duration_secs)


def main():
    """Main entry point."""
    start_time = datetime.datetime.now()
    # Record the processed jobs so that
    # we can send the duration of parsing to the metadata db.
    processed_jobs = set()

    options, args = parse_args()
    parse_options = _ParseOptions(options.reparse, options.mailit,
                                  options.dry_run, options.suite_report,
                                  options.datastore_creds,
                                  options.export_to_gcloud_path)
    results_dir = os.path.abspath(args[0])
    assert os.path.exists(results_dir)

    pid_file_manager = pidfile.PidFileManager("parser", results_dir)

    if options.write_pidfile:
        pid_file_manager.open_file()

    try:
        # build up the list of job dirs to parse
        if options.singledir:
            jobs_list = [results_dir]
        else:
            jobs_list = [os.path.join(results_dir, subdir)
                         for subdir in os.listdir(results_dir)]

        # build up the database
        db = tko_db.db(autocommit=False, host=options.db_host,
                       user=options.db_user, password=options.db_pass,
                       database=options.db_name)

        # parse all the jobs
        for path in jobs_list:
            lockfile = open(os.path.join(path, ".parse.lock"), "w")
            flags = fcntl.LOCK_EX
            if options.noblock:
                flags |= fcntl.LOCK_NB
            try:
                fcntl.flock(lockfile, flags)
            except IOError as e:
                # lock is not available and nonblocking was requested
                if e.errno == errno.EWOULDBLOCK:
                    lockfile.close()
                    continue
                else:
                    raise  # something unexpected happened
            try:
                new_jobs = parse_path(db, path, options.level, parse_options)
                processed_jobs.update(new_jobs)

            finally:
                fcntl.flock(lockfile, fcntl.LOCK_UN)
                lockfile.close()

    except Exception as e:
        pid_file_manager.close_file(1)

        metadata = {'results_dir': results_dir,
                    'error': str(e),
                    'details': traceback.format_exc()}
        autotest_es.post(use_http=True, type_str='parse_failure_final',
                         metadata=metadata)

        raise
    else:
        pid_file_manager.close_file(0)
    duration_secs = (datetime.datetime.now() - start_time).total_seconds()
    if options.record_duration:
        record_parsing(processed_jobs, duration_secs)


if __name__ == "__main__":
    main()