1#!/usr/bin/python 2# 3# Copyright (c) 2014 The Chromium OS Authors. All rights reserved. 4# Use of this source code is governed by a BSD-style license that can be 5# found in the LICENSE file. 6 7 8"""Script to calculate timing stats for suites. 9 10This script measures nine stats for a suite run. 111. Net suite runtime. 122. Suite scheduling overhead. 133. Average scheduling overhead. 144. Average Queuing time. 155. Average Resetting time. 166. Average provisioning time. 177. Average Running time. 188. Average Parsing time. 199. Average Gathering time. 20 21When the cron_mode is enabled, this script throws all stats but the first one 22(Net suite runtime) to Graphite because the first one is already 23being sent to Graphite by Autotest online. 24 25Net suite runtime is end-to-end time for a suite from the beginning 26to the end. 27It is stored in a field, "duration", of a type, "suite_runtime" in 28elasticsearch (ES). 29 30Suite scheduling overhead is defined by the average of DUT overheads. 31Suite is composed of one or more jobs, and those jobs are run on 32one or more DUTs that are available. 33A DUT overhead is defined by: 34 DUT_i overhead = sum(net time for job_k - runtime for job_k 35 - runtime for special tasks of job_k) 36 Job_k are the jobs run on DUT_i. 37 38Net time for a job is the time from job_queued_time to hqe_finished_time. 39job_queued_time is stored in the "queued_time" column of "tko_jobs" table. 40hqe_finished_time is stored in the "finished_on" of "afe_host_queue_entries" 41table. 42We do not use "job_finished_time" of "tko_jobs" as job_finished_time is 43recorded before gathering/parsing/archiving. 44We do not use hqe started time ("started_on" of "afe_host_queue_entries"), 45as it does not account for the lag from a host is assigned to the job till 46the scheduler sees the assignment. 47 48Runtime for job_k is the sum of durations for the records of 49"job_time_breakdown" type in ES that have "Queued" or "Running" status. 50It is possible that a job has multiple "Queued" records when the job's test 51failed and tried again. 52We take into account only the last "Queued" record. 53 54Runtime for special tasks of job_k is the sum of durations for the records 55of "job_time_breakdown" type in ES that have "Resetting", "Provisioning", 56"Gathering", or "Parsing" status. 57We take into account only the records whose timestamp is larger than 58the timestamp of the last "Queued" record. 59""" 60 61import argparse 62from datetime import datetime 63from datetime import timedelta 64 65import common 66from autotest_lib.client.common_lib import host_queue_entry_states 67from autotest_lib.client.common_lib import time_utils 68from autotest_lib.client.common_lib.cros.graphite import autotest_es 69from autotest_lib.client.common_lib.cros.graphite import autotest_stats 70from autotest_lib.frontend import setup_django_environment 71from autotest_lib.frontend.afe import models 72from autotest_lib.frontend.tko import models as tko_models 73from autotest_lib.server import utils 74from autotest_lib.site_utils import job_overhead 75 76 77_options = None 78 79_hqes = host_queue_entry_states.Status 80_states = [ 81 _hqes.QUEUED, _hqes.RESETTING, _hqes.PROVISIONING, 82 _hqes.RUNNING, _hqes.GATHERING, _hqes.PARSING] 83 84 85def mean(l): 86 """ 87 Calculates an Arithmetic Mean for the numbers in a list. 88 89 @param l: A list of numbers. 90 @return: Arithmetic mean if the list is not empty. 91 Otherwise, returns zero. 92 """ 93 return float(sum(l)) / len(l) if l else 0 94 95 96def print_verbose(string, *args): 97 if _options.verbose: 98 print(string % args) 99 100 101def get_nontask_runtime(job_id, dut, job_info_dict): 102 """ 103 Get sum of durations for "Queued", "Running", "Parsing", and "Gathering" 104 status records. 105 job_info_dict will be modified in this function to store the duration 106 for each status. 107 108 @param job_id: The job id of interest. 109 @param dut: Hostname of a DUT that the job ran on. 110 @param job_info_dict: Dictionary that has information for jobs. 111 @return: Tuple of sum of durations and the timestamp for the last 112 Queued record. 113 """ 114 results = autotest_es.query( 115 fields_returned=['status', 'duration', 'time_recorded'], 116 equality_constraints=[('_type', 'job_time_breakdown'), 117 ('job_id', job_id), 118 ('hostname', dut)], 119 sort_specs=[{'time_recorded': 'desc'}]) 120 121 sum = 0 122 last_queued_timestamp = 0 123 # There could be multiple "Queued" records. 124 # Get sum of durations for the records after the last "Queued" records 125 # (including the last "Queued" record). 126 # Exploits the fact that "results" are ordered in the descending order 127 # of time_recorded. 128 for hit in results.hits: 129 job_info_dict[job_id][hit['status']] = float(hit['duration']) 130 if hit['status'] == 'Queued': 131 # The first Queued record is the last one because of the descending 132 # order of "results". 133 last_queued_timestamp = float(hit['time_recorded']) 134 sum += float(hit['duration']) 135 break 136 else: 137 sum += float(hit['duration']) 138 return (sum, last_queued_timestamp) 139 140 141def get_tasks_runtime(task_list, dut, t_start, job_id, job_info_dict): 142 """ 143 Get sum of durations for special tasks. 144 job_info_dict will be modified in this function to store the duration 145 for each special task. 146 147 @param task_list: List of task id. 148 @param dut: Hostname of a DUT that the tasks ran on. 149 @param t_start: Beginning timestamp. 150 @param job_id: The job id that is related to the tasks. 151 This is used only for debugging purpose. 152 @param job_info_dict: Dictionary that has information for jobs. 153 @return: Sum of durations of the tasks. 154 """ 155 t_start_epoch = time_utils.to_epoch_time(t_start) 156 results = autotest_es.query( 157 fields_returned=['status', 'task_id', 'duration'], 158 equality_constraints=[('_type', 'job_time_breakdown'), 159 ('hostname', dut)], 160 range_constraints=[('time_recorded', t_start_epoch, None)], 161 batch_constraints=[('task_id', task_list)]) 162 sum = 0 163 for hit in results.hits: 164 sum += float(hit['duration']) 165 job_info_dict[job_id][hit['status']] = float(hit['duration']) 166 print_verbose('Task %s for Job %s took %s', 167 hit['task_id'], job_id, hit['duration']) 168 return sum 169 170 171def get_job_runtime(job_id, dut, job_info_dict): 172 """ 173 Get sum of durations for the entries that are related to a job. 174 job_info_dict will be modified in this function. 175 176 @param job_id: The job id of interest. 177 @param dut: Hostname of a DUT that the job ran on. 178 @param job_info_dict: Dictionary that has information for jobs. 179 @return: Total duration taken by a job. 180 """ 181 sum, t_last_queued = get_nontask_runtime(job_id, dut, job_info_dict) 182 print_verbose('Job %s took %f, last Queued: %s', 183 job_id, sum, t_last_queued) 184 sum += get_tasks_runtime( 185 list(job_info_dict[job_id]['tasks']), dut, t_last_queued, 186 job_id, job_info_dict) 187 return sum 188 189 190def get_dut_overhead(dut, jobs, job_info_dict): 191 """ 192 Calculates the scheduling overhead of a DUT. 193 194 The scheduling overhead of a DUT is defined by the sum of scheduling 195 overheads for the jobs that ran on the DUT. 196 The scheduling overhead for a job is defined by the difference 197 of net job runtime and real job runtime. 198 job_info_dict will be modified in this function. 199 200 @param dut: Hostname of a DUT. 201 @param jobs: The list of jobs that ran on the DUT. 202 @param job_info_dict: Dictionary that has information for jobs. 203 @return: Scheduling overhead of a DUT in a floating point value. 204 The unit is a second. 205 """ 206 overheads = [] 207 for job_id in jobs: 208 (t_start, t_end) = job_info_dict[job_id]['timestamps'] 209 runtime = get_job_runtime(job_id, dut, job_info_dict) 210 overheads.append(t_end - t_start - runtime) 211 print_verbose('Job: %s, Net runtime: %f, Real runtime: %f, ' 212 'Overhead: %f', job_id, t_end - t_start, runtime, 213 t_end - t_start - runtime) 214 return sum(overheads) 215 216 217def get_child_jobs_info(suite_job_id, num_child_jobs, sanity_check): 218 """ 219 Gets information about child jobs of a suite. 220 221 @param suite_job_id: Job id of a suite. 222 @param num_child_jobs: Number of child jobs of the suite. 223 @param sanity_check: Do sanity check if True. 224 @return: A tuple of (dictionary, list). For dictionary, the key is 225 a DUT's hostname and the value is a list of jobs that ran on 226 the DUT. List is the list of all jobs of the suite. 227 """ 228 results = autotest_es.query( 229 fields_returned=['job_id', 'hostname'], 230 equality_constraints=[('_type', 'host_history'), 231 ('parent_job_id', suite_job_id), 232 ('status', 'Running'),]) 233 234 dut_jobs_dict = {} 235 job_filter = set() 236 for hit in results.hits: 237 job_id = hit['job_id'] 238 dut = hit['hostname'] 239 if job_id in job_filter: 240 continue 241 job_list = dut_jobs_dict.setdefault(dut, []) 242 job_list.append(job_id) 243 job_filter.add(job_id) 244 245 if sanity_check and len(job_filter) != num_child_jobs: 246 print('WARNING: Mismatch number of child jobs of a suite (%d): ' 247 '%d != %d' % (suite_job_id, len(job_filter), num_child_jobs)) 248 return dut_jobs_dict, list(job_filter) 249 250 251def get_job_timestamps(job_list, job_info_dict): 252 """ 253 Get beginning time and ending time for each job. 254 255 The beginning time of a job is "queued_time" of "tko_jobs" table. 256 The ending time of a job is "finished_on" of "afe_host_queue_entries" table. 257 job_info_dict will be modified in this function to store the timestamps. 258 259 @param job_list: List of job ids 260 @param job_info_dict: Dictionary that timestamps for each job will be stored 261 """ 262 tko = tko_models.Job.objects.filter(afe_job_id__in=job_list) 263 hqe = models.HostQueueEntry.objects.filter(job_id__in=job_list) 264 job_start = {} 265 for t in tko: 266 job_start[t.afe_job_id] = time_utils.to_epoch_time(t.queued_time) 267 job_end = {} 268 for h in hqe: 269 job_end[h.job_id] = time_utils.to_epoch_time(h.finished_on) 270 271 for job_id in job_list: 272 info_dict = job_info_dict.setdefault(job_id, {}) 273 info_dict.setdefault('timestamps', (job_start[job_id], job_end[job_id])) 274 275 276def get_job_tasks(job_list, job_info_dict): 277 """ 278 Get task ids for each job. 279 job_info_dict will be modified in this function to store the task ids. 280 281 @param job_list: List of job ids 282 @param job_info_dict: Dictionary that task ids for each job will be stored. 283 """ 284 results = autotest_es.query( 285 fields_returned=['job_id', 'task_id'], 286 equality_constraints=[('_type', 'host_history')], 287 batch_constraints=[('job_id', job_list)]) 288 for hit in results.hits: 289 if 'task_id' in hit: 290 info_dict = job_info_dict.setdefault(hit['job_id'], {}) 291 task_set = info_dict.setdefault('tasks', set()) 292 task_set.add(hit['task_id']) 293 294 295def get_scheduling_overhead(suite_job_id, num_child_jobs, sanity_check=True): 296 """ 297 Calculates a scheduling overhead. 298 299 A scheduling overhead is defined by the average of DUT overheads 300 for the DUTs that the child jobs of a suite ran on. 301 302 @param suite_job_id: Job id of a suite. 303 @param num_child_jobs: Number of child jobs of the suite. 304 @param sanity_check: Do sanity check if True. 305 @return: Dictionary storing stats. 306 """ 307 dut_jobs_dict, job_list = get_child_jobs_info( 308 suite_job_id, num_child_jobs, sanity_check) 309 job_info_dict = {} 310 get_job_timestamps(job_list, job_info_dict) 311 get_job_tasks(job_list, job_info_dict) 312 313 dut_overheads = [] 314 avg_overhead = 0 315 for dut, jobs in dut_jobs_dict.iteritems(): 316 print_verbose('Dut: %s, Jobs: %s', dut, jobs) 317 overhead = get_dut_overhead(dut, jobs, job_info_dict) 318 avg_overhead += overhead 319 print_verbose('Dut overhead: %f', overhead) 320 dut_overheads.append(overhead) 321 322 if job_list: 323 avg_overhead = avg_overhead / len(job_list) 324 325 state_samples_dict = {} 326 for info in job_info_dict.itervalues(): 327 for state in _states: 328 if state in info: 329 samples = state_samples_dict.setdefault(state, []) 330 samples.append(info[state]) 331 332 if state_samples_dict: 333 result = {state: mean(state_samples_dict[state]) 334 if state in state_samples_dict else 0 335 for state in _states} 336 result['suite_overhead'] = mean(dut_overheads) 337 result['overhead'] = avg_overhead 338 result['num_duts'] = len(dut_jobs_dict) 339 return result 340 341 342def print_suite_stats(suite_stats): 343 """Prints out statistics for a suite to standard output.""" 344 print('suite_overhead: %(suite_overhead)f, overhead: %(overhead)f,' % 345 suite_stats), 346 for state in _states: 347 if state in suite_stats: 348 print('%s: %f,' % (state, suite_stats[state])), 349 print('num_duts: %(num_duts)d' % suite_stats) 350 351 352def analyze_suites(start_time, end_time): 353 """ 354 Calculates timing stats (i.e., suite runtime, scheduling overhead) 355 for the suites that finished within the timestamps given by parameters. 356 357 @param start_time: Beginning timestamp. 358 @param end_time: Ending timestamp. 359 """ 360 print('Analyzing suites from %s to %s...' % ( 361 time_utils.epoch_time_to_date_string(start_time), 362 time_utils.epoch_time_to_date_string(end_time))) 363 364 if _options.bvtonly: 365 batch_constraints = [ 366 ('suite_name', ['bvt-inline', 'bvt-cq', 'bvt-perbuild'])] 367 else: 368 batch_constraints = [] 369 370 start_time_epoch = time_utils.to_epoch_time(start_time) 371 end_time_epoch = time_utils.to_epoch_time(end_time) 372 results = autotest_es.query( 373 fields_returned=['suite_name', 'suite_job_id', 'board', 'build', 374 'num_child_jobs', 'duration'], 375 equality_constraints=[('_type', job_overhead.SUITE_RUNTIME_KEY),], 376 range_constraints=[('time_recorded', start_time_epoch, 377 end_time_epoch)], 378 sort_specs=[{'time_recorded': 'asc'}], 379 batch_constraints=batch_constraints) 380 print('Found %d suites' % (results.total)) 381 382 for hit in results.hits: 383 suite_job_id = hit['suite_job_id'] 384 385 try: 386 suite_name = hit['suite_name'] 387 num_child_jobs = int(hit['num_child_jobs']) 388 suite_runtime = float(hit['duration']) 389 390 print('Suite: %s (%s), Board: %s, Build: %s, Num child jobs: %d' % ( 391 suite_name, suite_job_id, hit['board'], hit['build'], 392 num_child_jobs)) 393 394 suite_stats = get_scheduling_overhead(suite_job_id, num_child_jobs) 395 print('Suite: %s (%s) runtime: %f,' % ( 396 suite_name, suite_job_id, suite_runtime)), 397 print_suite_stats(suite_stats) 398 399 if _options.cron_mode: 400 key = utils.get_data_key( 401 'suite_time_stats', suite_name, hit['build'], 402 hit['board']) 403 autotest_stats.Timer(key).send('suite_runtime', suite_runtime) 404 for stat, val in suite_stats.iteritems(): 405 autotest_stats.Timer(key).send(stat, val) 406 except Exception as e: 407 print('ERROR: Exception is raised while processing suite %s' % ( 408 suite_job_id)) 409 print e 410 411 412def analyze_suite(suite_job_id): 413 suite_stats = get_scheduling_overhead(suite_job_id, 0, False) 414 print('Suite (%s)' % suite_job_id), 415 print_suite_stats(suite_stats) 416 417 418def main(): 419 """main script.""" 420 parser = argparse.ArgumentParser( 421 formatter_class=argparse.ArgumentDefaultsHelpFormatter) 422 parser.add_argument('-c', dest='cron_mode', action='store_true', 423 help=('Run in a cron mode. Cron mode ' 424 'sends calculated stat data to Graphite.'), 425 default=False) 426 parser.add_argument('-s', type=int, dest='span', 427 help=('Number of hours that stats should be ' 428 'collected.'), 429 default=1) 430 parser.add_argument('--bvtonly', dest='bvtonly', action='store_true', 431 help=('Gets bvt suites only (i.e., bvt-inline,' 432 'bvt-cq, bvt-perbuild).'), 433 default=False) 434 parser.add_argument('--suite', type=int, dest='suite_job_id', 435 help=('Job id of a suite.')) 436 parser.add_argument('--verbose', dest='verbose', action='store_true', 437 help=('Prints out more info if True.'), 438 default=False) 439 global _options 440 _options = parser.parse_args() 441 442 if _options.suite_job_id: 443 analyze_suite(_options.suite_job_id) 444 else: 445 end_time = time_utils.to_epoch_time(datetime.now()) 446 start_time = end_time - timedelta(hours=_options.span).total_seconds() 447 analyze_suites(start_time, end_time) 448 449 450if __name__ == '__main__': 451 main() 452