1#!/usr/bin/python
2#
3# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
4# Use of this source code is governed by a BSD-style license that can be
5# found in the LICENSE file.
6
7
8"""Script to calculate timing stats for suites.
9
10This script measures nine stats for a suite run.
111. Net suite runtime.
122. Suite scheduling overhead.
133. Average scheduling overhead.
144. Average Queuing time.
155. Average Resetting time.
166. Average provisioning time.
177. Average Running time.
188. Average Parsing time.
199. Average Gathering time.
20
21When the cron_mode is enabled, this script throws all stats but the first one
22(Net suite runtime) to Graphite because the first one is already
23being sent to Graphite by Autotest online.
24
25Net suite runtime is end-to-end time for a suite from the beginning
26to the end.
27It is stored in a field, "duration", of a type, "suite_runtime" in
28elasticsearch (ES).
29
30Suite scheduling overhead is defined by the average of DUT overheads.
31Suite is composed of one or more jobs, and those jobs are run on
32one or more DUTs that are available.
33A DUT overhead is defined by:
34    DUT_i overhead = sum(net time for job_k - runtime for job_k
35                         - runtime for special tasks of job_k)
36    Job_k are the jobs run on DUT_i.
37
38Net time for a job is the time from job_queued_time to hqe_finished_time.
39job_queued_time is stored in the "queued_time" column of "tko_jobs" table.
40hqe_finished_time is stored in the "finished_on" of "afe_host_queue_entries"
41table.
42We do not use "job_finished_time" of "tko_jobs" as job_finished_time is
43recorded before gathering/parsing/archiving.
44We do not use hqe started time ("started_on" of "afe_host_queue_entries"),
45as it does not account for the lag from a host is assigned to the job till
46the scheduler sees the assignment.
47
48Runtime for job_k is the sum of durations for the records of
49"job_time_breakdown" type in ES that have "Queued" or "Running" status.
50It is possible that a job has multiple "Queued" records when the job's test
51failed and tried again.
52We take into account only the last "Queued" record.
53
54Runtime for special tasks of job_k is the sum of durations for the records
55of "job_time_breakdown" type in ES that have "Resetting", "Provisioning",
56"Gathering", or "Parsing" status.
57We take into account only the records whose timestamp is larger than
58the timestamp of the last "Queued" record.
59"""
60
61import argparse
62from datetime import datetime
63from datetime import timedelta
64
65import common
66from autotest_lib.client.common_lib import host_queue_entry_states
67from autotest_lib.client.common_lib import time_utils
68from autotest_lib.client.common_lib.cros.graphite import autotest_es
69from autotest_lib.client.common_lib.cros.graphite import autotest_stats
70from autotest_lib.frontend import setup_django_environment
71from autotest_lib.frontend.afe import models
72from autotest_lib.frontend.tko import models as tko_models
73from autotest_lib.server import utils
74from autotest_lib.site_utils import job_overhead
75
76
77_options = None
78
79_hqes = host_queue_entry_states.Status
80_states = [
81        _hqes.QUEUED, _hqes.RESETTING, _hqes.PROVISIONING,
82        _hqes.RUNNING, _hqes.GATHERING, _hqes.PARSING]
83
84
85def mean(l):
86    """
87    Calculates an Arithmetic Mean for the numbers in a list.
88
89    @param l: A list of numbers.
90    @return: Arithmetic mean if the list is not empty.
91             Otherwise, returns zero.
92    """
93    return float(sum(l)) / len(l) if l else 0
94
95
96def print_verbose(string, *args):
97    if _options.verbose:
98        print(string % args)
99
100
101def get_nontask_runtime(job_id, dut, job_info_dict):
102    """
103    Get sum of durations for "Queued", "Running", "Parsing", and "Gathering"
104    status records.
105    job_info_dict will be modified in this function to store the duration
106    for each status.
107
108    @param job_id: The job id of interest.
109    @param dut: Hostname of a DUT that the job ran on.
110    @param job_info_dict: Dictionary that has information for jobs.
111    @return: Tuple of sum of durations and the timestamp for the last
112             Queued record.
113    """
114    results = autotest_es.query(
115            fields_returned=['status', 'duration', 'time_recorded'],
116            equality_constraints=[('_type', 'job_time_breakdown'),
117                                  ('job_id', job_id),
118                                  ('hostname', dut)],
119            sort_specs=[{'time_recorded': 'desc'}])
120
121    sum = 0
122    last_queued_timestamp = 0
123    # There could be multiple "Queued" records.
124    # Get sum of durations for the records after the last "Queued" records
125    # (including the last "Queued" record).
126    # Exploits the fact that "results" are ordered in the descending order
127    # of time_recorded.
128    for hit in results.hits:
129        job_info_dict[job_id][hit['status']] = float(hit['duration'])
130        if hit['status'] == 'Queued':
131            # The first Queued record is the last one because of the descending
132            # order of "results".
133            last_queued_timestamp = float(hit['time_recorded'])
134            sum += float(hit['duration'])
135            break
136        else:
137            sum += float(hit['duration'])
138    return (sum, last_queued_timestamp)
139
140
141def get_tasks_runtime(task_list, dut, t_start, job_id, job_info_dict):
142    """
143    Get sum of durations for special tasks.
144    job_info_dict will be modified in this function to store the duration
145    for each special task.
146
147    @param task_list: List of task id.
148    @param dut: Hostname of a DUT that the tasks ran on.
149    @param t_start: Beginning timestamp.
150    @param job_id: The job id that is related to the tasks.
151                   This is used only for debugging purpose.
152    @param job_info_dict: Dictionary that has information for jobs.
153    @return: Sum of durations of the tasks.
154    """
155    t_start_epoch = time_utils.to_epoch_time(t_start)
156    results = autotest_es.query(
157            fields_returned=['status', 'task_id', 'duration'],
158            equality_constraints=[('_type', 'job_time_breakdown'),
159                                  ('hostname', dut)],
160            range_constraints=[('time_recorded', t_start_epoch, None)],
161            batch_constraints=[('task_id', task_list)])
162    sum = 0
163    for hit in results.hits:
164        sum += float(hit['duration'])
165        job_info_dict[job_id][hit['status']] = float(hit['duration'])
166        print_verbose('Task %s for Job %s took %s',
167                      hit['task_id'], job_id, hit['duration'])
168    return sum
169
170
171def get_job_runtime(job_id, dut, job_info_dict):
172    """
173    Get sum of durations for the entries that are related to a job.
174    job_info_dict will be modified in this function.
175
176    @param job_id: The job id of interest.
177    @param dut: Hostname of a DUT that the job ran on.
178    @param job_info_dict: Dictionary that has information for jobs.
179    @return: Total duration taken by a job.
180    """
181    sum, t_last_queued = get_nontask_runtime(job_id, dut, job_info_dict)
182    print_verbose('Job %s took %f, last Queued: %s',
183                  job_id, sum, t_last_queued)
184    sum += get_tasks_runtime(
185            list(job_info_dict[job_id]['tasks']), dut, t_last_queued,
186            job_id, job_info_dict)
187    return sum
188
189
190def get_dut_overhead(dut, jobs, job_info_dict):
191    """
192    Calculates the scheduling overhead of a DUT.
193
194    The scheduling overhead of a DUT is defined by the sum of scheduling
195    overheads for the jobs that ran on the DUT.
196    The scheduling overhead for a job is defined by the difference
197    of net job runtime and real job runtime.
198    job_info_dict will be modified in this function.
199
200    @param dut: Hostname of a DUT.
201    @param jobs: The list of jobs that ran on the DUT.
202    @param job_info_dict: Dictionary that has information for jobs.
203    @return: Scheduling overhead of a DUT in a floating point value.
204             The unit is a second.
205    """
206    overheads = []
207    for job_id in jobs:
208        (t_start, t_end) = job_info_dict[job_id]['timestamps']
209        runtime = get_job_runtime(job_id, dut, job_info_dict)
210        overheads.append(t_end - t_start - runtime)
211        print_verbose('Job: %s, Net runtime: %f, Real runtime: %f, '
212                      'Overhead: %f', job_id, t_end - t_start, runtime,
213                      t_end - t_start - runtime)
214    return sum(overheads)
215
216
217def get_child_jobs_info(suite_job_id, num_child_jobs, sanity_check):
218    """
219    Gets information about child jobs of a suite.
220
221    @param suite_job_id: Job id of a suite.
222    @param num_child_jobs: Number of child jobs of the suite.
223    @param sanity_check: Do sanity check if True.
224    @return: A tuple of (dictionary, list). For dictionary, the key is
225             a DUT's hostname and the value is a list of jobs that ran on
226             the DUT. List is the list of all jobs of the suite.
227    """
228    results = autotest_es.query(
229            fields_returned=['job_id', 'hostname'],
230            equality_constraints=[('_type', 'host_history'),
231                                  ('parent_job_id', suite_job_id),
232                                  ('status', 'Running'),])
233
234    dut_jobs_dict = {}
235    job_filter = set()
236    for hit in results.hits:
237        job_id = hit['job_id']
238        dut = hit['hostname']
239        if job_id in job_filter:
240            continue
241        job_list = dut_jobs_dict.setdefault(dut, [])
242        job_list.append(job_id)
243        job_filter.add(job_id)
244
245    if sanity_check and len(job_filter) != num_child_jobs:
246        print('WARNING: Mismatch number of child jobs of a suite (%d): '
247              '%d != %d' % (suite_job_id, len(job_filter), num_child_jobs))
248    return dut_jobs_dict, list(job_filter)
249
250
251def get_job_timestamps(job_list, job_info_dict):
252    """
253    Get beginning time and ending time for each job.
254
255    The beginning time of a job is "queued_time" of "tko_jobs" table.
256    The ending time of a job is "finished_on" of "afe_host_queue_entries" table.
257    job_info_dict will be modified in this function to store the timestamps.
258
259    @param job_list: List of job ids
260    @param job_info_dict: Dictionary that timestamps for each job will be stored
261    """
262    tko = tko_models.Job.objects.filter(afe_job_id__in=job_list)
263    hqe = models.HostQueueEntry.objects.filter(job_id__in=job_list)
264    job_start = {}
265    for t in tko:
266        job_start[t.afe_job_id] = time_utils.to_epoch_time(t.queued_time)
267    job_end = {}
268    for h in hqe:
269        job_end[h.job_id] = time_utils.to_epoch_time(h.finished_on)
270
271    for job_id in job_list:
272        info_dict = job_info_dict.setdefault(job_id, {})
273        info_dict.setdefault('timestamps', (job_start[job_id], job_end[job_id]))
274
275
276def get_job_tasks(job_list, job_info_dict):
277    """
278    Get task ids for each job.
279    job_info_dict will be modified in this function to store the task ids.
280
281    @param job_list: List of job ids
282    @param job_info_dict: Dictionary that task ids for each job will be stored.
283    """
284    results = autotest_es.query(
285            fields_returned=['job_id', 'task_id'],
286            equality_constraints=[('_type', 'host_history')],
287            batch_constraints=[('job_id', job_list)])
288    for hit in results.hits:
289        if 'task_id' in hit:
290            info_dict = job_info_dict.setdefault(hit['job_id'], {})
291            task_set = info_dict.setdefault('tasks', set())
292            task_set.add(hit['task_id'])
293
294
295def get_scheduling_overhead(suite_job_id, num_child_jobs, sanity_check=True):
296    """
297    Calculates a scheduling overhead.
298
299    A scheduling overhead is defined by the average of DUT overheads
300    for the DUTs that the child jobs of a suite ran on.
301
302    @param suite_job_id: Job id of a suite.
303    @param num_child_jobs: Number of child jobs of the suite.
304    @param sanity_check: Do sanity check if True.
305    @return: Dictionary storing stats.
306    """
307    dut_jobs_dict, job_list = get_child_jobs_info(
308            suite_job_id, num_child_jobs, sanity_check)
309    job_info_dict = {}
310    get_job_timestamps(job_list, job_info_dict)
311    get_job_tasks(job_list, job_info_dict)
312
313    dut_overheads = []
314    avg_overhead = 0
315    for dut, jobs in dut_jobs_dict.iteritems():
316        print_verbose('Dut: %s, Jobs: %s', dut, jobs)
317        overhead = get_dut_overhead(dut, jobs, job_info_dict)
318        avg_overhead += overhead
319        print_verbose('Dut overhead: %f', overhead)
320        dut_overheads.append(overhead)
321
322    if job_list:
323        avg_overhead = avg_overhead / len(job_list)
324
325    state_samples_dict = {}
326    for info in job_info_dict.itervalues():
327        for state in _states:
328            if state in info:
329                samples = state_samples_dict.setdefault(state, [])
330                samples.append(info[state])
331
332    if state_samples_dict:
333        result = {state: mean(state_samples_dict[state])
334                  if state in state_samples_dict else 0
335                  for state in _states}
336    result['suite_overhead'] = mean(dut_overheads)
337    result['overhead'] = avg_overhead
338    result['num_duts'] = len(dut_jobs_dict)
339    return result
340
341
342def print_suite_stats(suite_stats):
343    """Prints out statistics for a suite to standard output."""
344    print('suite_overhead: %(suite_overhead)f, overhead: %(overhead)f,' %
345          suite_stats),
346    for state in _states:
347        if state in suite_stats:
348            print('%s: %f,' % (state, suite_stats[state])),
349    print('num_duts: %(num_duts)d' % suite_stats)
350
351
352def analyze_suites(start_time, end_time):
353    """
354    Calculates timing stats (i.e., suite runtime, scheduling overhead)
355    for the suites that finished within the timestamps given by parameters.
356
357    @param start_time: Beginning timestamp.
358    @param end_time: Ending timestamp.
359    """
360    print('Analyzing suites from %s to %s...' % (
361          time_utils.epoch_time_to_date_string(start_time),
362          time_utils.epoch_time_to_date_string(end_time)))
363
364    if _options.bvtonly:
365        batch_constraints = [
366                ('suite_name', ['bvt-inline', 'bvt-cq', 'bvt-perbuild'])]
367    else:
368        batch_constraints = []
369
370    start_time_epoch = time_utils.to_epoch_time(start_time)
371    end_time_epoch = time_utils.to_epoch_time(end_time)
372    results = autotest_es.query(
373            fields_returned=['suite_name', 'suite_job_id', 'board', 'build',
374                             'num_child_jobs', 'duration'],
375            equality_constraints=[('_type', job_overhead.SUITE_RUNTIME_KEY),],
376            range_constraints=[('time_recorded', start_time_epoch,
377                                end_time_epoch)],
378            sort_specs=[{'time_recorded': 'asc'}],
379            batch_constraints=batch_constraints)
380    print('Found %d suites' % (results.total))
381
382    for hit in results.hits:
383        suite_job_id = hit['suite_job_id']
384
385        try:
386            suite_name = hit['suite_name']
387            num_child_jobs = int(hit['num_child_jobs'])
388            suite_runtime = float(hit['duration'])
389
390            print('Suite: %s (%s), Board: %s, Build: %s, Num child jobs: %d' % (
391                    suite_name, suite_job_id, hit['board'], hit['build'],
392                    num_child_jobs))
393
394            suite_stats = get_scheduling_overhead(suite_job_id, num_child_jobs)
395            print('Suite: %s (%s) runtime: %f,' % (
396                    suite_name, suite_job_id, suite_runtime)),
397            print_suite_stats(suite_stats)
398
399            if _options.cron_mode:
400                key = utils.get_data_key(
401                        'suite_time_stats', suite_name, hit['build'],
402                        hit['board'])
403                autotest_stats.Timer(key).send('suite_runtime', suite_runtime)
404                for stat, val in suite_stats.iteritems():
405                    autotest_stats.Timer(key).send(stat, val)
406        except Exception as e:
407            print('ERROR: Exception is raised while processing suite %s' % (
408                    suite_job_id))
409            print e
410
411
412def analyze_suite(suite_job_id):
413    suite_stats = get_scheduling_overhead(suite_job_id, 0, False)
414    print('Suite (%s)' % suite_job_id),
415    print_suite_stats(suite_stats)
416
417
418def main():
419    """main script."""
420    parser = argparse.ArgumentParser(
421            formatter_class=argparse.ArgumentDefaultsHelpFormatter)
422    parser.add_argument('-c', dest='cron_mode', action='store_true',
423                        help=('Run in a cron mode. Cron mode '
424                              'sends calculated stat data to Graphite.'),
425                        default=False)
426    parser.add_argument('-s', type=int, dest='span',
427                        help=('Number of hours that stats should be '
428                              'collected.'),
429                        default=1)
430    parser.add_argument('--bvtonly', dest='bvtonly', action='store_true',
431                        help=('Gets bvt suites only (i.e., bvt-inline,'
432                              'bvt-cq, bvt-perbuild).'),
433                        default=False)
434    parser.add_argument('--suite', type=int, dest='suite_job_id',
435                        help=('Job id of a suite.'))
436    parser.add_argument('--verbose', dest='verbose', action='store_true',
437                        help=('Prints out more info if True.'),
438                        default=False)
439    global _options
440    _options = parser.parse_args()
441
442    if _options.suite_job_id:
443        analyze_suite(_options.suite_job_id)
444    else:
445        end_time = time_utils.to_epoch_time(datetime.now())
446        start_time = end_time - timedelta(hours=_options.span).total_seconds()
447        analyze_suites(start_time, end_time)
448
449
450if __name__ == '__main__':
451    main()
452