#
# Copyright 2008 Google Inc. All Rights Reserved.

"""
The job module contains the objects and methods used to
manage jobs in Autotest.

The valid actions are:
list:    lists job(s)
create:  create a job
clone:   clone an existing job
abort:   abort job(s)
stat:    detailed listing of job(s)

The common options are:

See topic_common.py for a High Level Design and Algorithm.
"""

import getpass, os, pwd, re, socket, sys
from autotest_lib.cli import topic_common, action_common
from autotest_lib.client.common_lib import control_data


class job(topic_common.atest):
    """Job class
    atest job [create|clone|list|stat|abort] <options>"""
    usage_action = '[create|clone|list|stat|abort]'
    topic = msg_topic = 'job'
    msg_items = '<job_ids>'


    def _convert_status(self, results):
        for result in results:
            total = sum(result['status_counts'].values())
            status = ['%s=%s(%.1f%%)' % (key, val, 100.0*float(val)/total)
                      for key, val in result['status_counts'].iteritems()]
            status.sort()
            result['status_counts'] = ', '.join(status)


    def backward_compatibility(self, action, argv):
        """ 'job create --clone' became 'job clone --id' """
        if action == 'create':
            for option in ['-l', '--clone']:
                if option in argv:
                    argv[argv.index(option)] = '--id'
                    action = 'clone'
        return action
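
# A minimal sketch of what job._convert_status() does to one RPC result,
# using hypothetical status counts: a result containing
#     {'status_counts': {'Completed': 3, 'Failed': 1}}
# is rewritten in place to the sorted, percentage-annotated string
#     {'status_counts': 'Completed=3(75.0%), Failed=1(25.0%)'}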


class job_help(job):
    """Just here to get the atest logic working.
    Usage is set by its parent"""
    pass


class job_list_stat(action_common.atest_list, job):
    def __init__(self):
        super(job_list_stat, self).__init__()

        self.topic_parse_info = topic_common.item_parse_info(
                attribute_name='jobs',
                use_leftover=True)


    def __split_jobs_between_ids_names(self):
        job_ids = []
        job_names = []

        # Sort between job IDs and names
        for job_id in self.jobs:
            if job_id.isdigit():
                job_ids.append(job_id)
            else:
                job_names.append(job_id)
        return (job_ids, job_names)


    def execute_on_ids_and_names(self, op, filters={},
                                 check_results={'id__in': 'id',
                                                'name__in': 'id'},
                                 tag_id='id__in', tag_name='name__in'):
        if not self.jobs:
            # Want everything
            return super(job_list_stat, self).execute(op=op, filters=filters)

        all_jobs = []
        (job_ids, job_names) = self.__split_jobs_between_ids_names()

        for items, tag in [(job_ids, tag_id),
                           (job_names, tag_name)]:
            if items:
                new_filters = filters.copy()
                new_filters[tag] = items
                jobs = super(job_list_stat,
                             self).execute(op=op,
                                           filters=new_filters,
                                           check_results=check_results)
                all_jobs.extend(jobs)

        return all_jobs


class job_list(job_list_stat):
    """atest job list [<jobs>] [--all] [--running] [--user <username>]"""
    def __init__(self):
        super(job_list, self).__init__()
        self.parser.add_option('-a', '--all', help='List jobs for all '
                               'users.', action='store_true', default=False)
        self.parser.add_option('-r', '--running', help='List only running '
                               'jobs', action='store_true')
        self.parser.add_option('-u', '--user', help='List jobs for given '
                               'user', type='string')


    def parse(self):
        options, leftover = super(job_list, self).parse()
        self.all = options.all
        self.data['running'] = options.running
        if options.user:
            if options.all:
                self.invalid_syntax('Only specify --all or --user, not both.')
            else:
                self.data['owner'] = options.user
        elif not options.all and not self.jobs:
            self.data['owner'] = getpass.getuser()

        return options, leftover


    def execute(self):
        return self.execute_on_ids_and_names(op='get_jobs_summary',
                                             filters=self.data)


    def output(self, results):
        keys = ['id', 'owner', 'name', 'status_counts']
        if self.verbose:
            keys.extend(['priority', 'control_type', 'created_on'])
        self._convert_status(results)
        super(job_list, self).output(results, keys)
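
# Example of the ID/name split, with hypothetical values: `atest job list
# 42 my_job` makes execute_on_ids_and_names() issue two get_jobs_summary
# RPCs, one with the filter {'id__in': ['42']} and one with
# {'name__in': ['my_job']} (each merged into the common filters), and then
# concatenate the two result lists.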


class job_stat(job_list_stat):
    """atest job stat <job>"""
    usage_action = 'stat'

    def __init__(self):
        super(job_stat, self).__init__()
        self.parser.add_option('-f', '--control-file',
                               help='Display the control file',
                               action='store_true', default=False)
        self.parser.add_option('-N', '--list-hosts',
                               help='Display only a list of hosts',
                               action='store_true')
        self.parser.add_option('-s', '--list-hosts-status',
                               help='Display only the hosts in these statuses '
                               'for a job.', action='store')


    def parse(self):
        status_list = topic_common.item_parse_info(
                attribute_name='status_list',
                inline_option='list_hosts_status')
        options, leftover = super(job_stat, self).parse([status_list],
                                                        req_items='jobs')

        if not self.jobs:
            self.invalid_syntax('Must specify at least one job.')

        self.show_control_file = options.control_file
        self.list_hosts = options.list_hosts

        if self.list_hosts and self.status_list:
            self.invalid_syntax('--list-hosts is implicit when using '
                                '--list-hosts-status.')
        if len(self.jobs) > 1 and (self.list_hosts or self.status_list):
            self.invalid_syntax('--list-hosts and --list-hosts-status should '
                                'only be used on a single job.')

        return options, leftover


    def _merge_results(self, summary, qes):
        hosts_status = {}
        for qe in qes:
            if qe['host']:
                job_id = qe['job']['id']
                hostname = qe['host']['hostname']
                hosts_status.setdefault(job_id,
                                        {}).setdefault(qe['status'],
                                                       []).append(hostname)

        for job in summary:
            job_id = job['id']
            if job_id in hosts_status:
                this_job = hosts_status[job_id]
                job['hosts'] = ' '.join(' '.join(host) for host in
                                        this_job.itervalues())
                host_per_status = ['%s="%s"' % (status, ' '.join(host))
                                   for status, host in this_job.iteritems()]
                job['hosts_status'] = ', '.join(host_per_status)
                if self.status_list:
                    statuses = set(s.lower() for s in self.status_list)
                    all_hosts = [s for s in host_per_status
                                 if s.split('=', 1)[0].lower() in statuses]
                    job['hosts_selected_status'] = '\n'.join(all_hosts)
            else:
                job['hosts_status'] = ''

            if not job.get('hosts'):
                self.generic_error('Job has unassigned meta-hosts, '
                                   'try again shortly.')

        return summary


    def execute(self):
        summary = self.execute_on_ids_and_names(op='get_jobs_summary')

        # Get the real hostnames
        qes = self.execute_on_ids_and_names(op='get_host_queue_entries',
                                            check_results={},
                                            tag_id='job__in',
                                            tag_name='job__name__in')

        self._convert_status(summary)

        return self._merge_results(summary, qes)


    def output(self, results):
        if self.list_hosts:
            keys = ['hosts']
        elif self.status_list:
            keys = ['hosts_selected_status']
        elif not self.verbose:
            keys = ['id', 'name', 'priority', 'status_counts', 'hosts_status']
        else:
            keys = ['id', 'name', 'priority', 'status_counts', 'hosts_status',
                    'owner', 'control_type', 'synch_count', 'created_on',
                    'run_verify', 'reboot_before', 'reboot_after',
                    'parse_failed_repair']

        if self.show_control_file:
            keys.append('control_file')

        super(job_stat, self).output(results, keys)
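
# Sketch of the fields _merge_results() computes for one job, assuming
# hypothetical queue entries with host1 and host2 Running and host3
# Completed (grouping order follows dict iteration and is not guaranteed):
#     job['hosts']        = 'host1 host2 host3'
#     job['hosts_status'] = 'Running="host1 host2", Completed="host3"'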


class job_create_or_clone(action_common.atest_create, job):
    """Class containing the code common to the job create and clone actions"""
    msg_items = 'job_name'

    def __init__(self):
        super(job_create_or_clone, self).__init__()
        self.hosts = []
        self.data_item_key = 'name'
        self.parser.add_option('-p', '--priority', help='Job priority (low, '
                               'medium, high, urgent), default=medium',
                               type='choice', choices=('low', 'medium', 'high',
                                                       'urgent'),
                               default='medium')
        self.parser.add_option('-b', '--labels',
                               help='Comma separated list of labels '
                               'to get machine list from.', default='')
        self.parser.add_option('-m', '--machine', help='List of machines to '
                               'run on')
        self.parser.add_option('-M', '--mlist',
                               help='File listing machines to use',
                               type='string', metavar='MACHINE_FLIST')
        self.parser.add_option('--one-time-hosts',
                               help='List of one time hosts')
        self.parser.add_option('-e', '--email',
                               help='A comma separated list of '
                               'email addresses to notify of job completion',
                               default='')


    def _parse_hosts(self, args):
        """ Parses the arguments to generate a list of hosts and meta_hosts
        A host is a regular name, a meta_host is n*label or *label.
        These can be mixed on the CLI, and separated by either commas or
        spaces, e.g.: 5*Machine_Label host0 5*Machine_Label2,host2 """

        hosts = []
        meta_hosts = []

        for arg in args:
            for host in arg.split(','):
                if re.match('^[0-9]+[*]', host):
                    num, host = host.split('*', 1)
                    meta_hosts += int(num) * [host]
                elif re.match('^[*](\w*)', host):
                    meta_hosts += [re.match('^[*](\w*)', host).group(1)]
                elif host != '' and host not in hosts:
                    # Real hostname and not a duplicate
                    hosts.append(host)

        return (hosts, meta_hosts)


    def parse(self, parse_info=[]):
        host_info = topic_common.item_parse_info(attribute_name='hosts',
                                                 inline_option='machine',
                                                 filename_option='mlist')
        job_info = topic_common.item_parse_info(attribute_name='jobname',
                                                use_leftover=True)
        oth_info = topic_common.item_parse_info(
                attribute_name='one_time_hosts',
                inline_option='one_time_hosts')
        label_info = topic_common.item_parse_info(attribute_name='labels',
                                                  inline_option='labels')

        options, leftover = super(job_create_or_clone, self).parse(
                [host_info, job_info, oth_info, label_info] + parse_info,
                req_items='jobname')
        self.data = {}
        jobname = getattr(self, 'jobname')
        if len(jobname) > 1:
            self.invalid_syntax('Too many arguments specified, only expected '
                                'to receive job name: %s' % jobname)
        self.jobname = jobname[0]

        if options.priority:
            self.data['priority'] = options.priority.capitalize()

        if self.one_time_hosts:
            self.data['one_time_hosts'] = self.one_time_hosts

        if self.labels:
            label_hosts = self.execute_rpc(op='get_hosts',
                                           multiple_labels=self.labels)
            for host in label_hosts:
                self.hosts.append(host['hostname'])

        self.data['name'] = self.jobname

        (self.data['hosts'],
         self.data['meta_hosts']) = self._parse_hosts(self.hosts)

        self.data['email_list'] = options.email

        return options, leftover


    def create_job(self):
        job_id = self.execute_rpc(op='create_job', **self.data)
        return ['%s (id %s)' % (self.jobname, job_id)]


    def get_items(self):
        return [self.jobname]
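
# Example of _parse_hosts() on a mixed argument list (hypothetical labels):
#     _parse_hosts(['host0', '2*label1,host1', '*label2'])
# returns
#     (['host0', 'host1'], ['label1', 'label1', 'label2'])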


class job_create(job_create_or_clone):
    """atest job create [--priority <low|medium|high|urgent>]
    [--synch_count <count>] [--control-file </path/to/cfile>]
    [--server] [--test <test1,test2>] [--kernel <http://kernel>]
    [--kernel-cmdline <args>]
    [--mlist </path/to/machinelist>] [--machine <host1 host2 host3>]
    [--labels <list of labels of machines to run on>]
    [--reboot_before <option>] [--reboot_after <option>]
    [--noverify] [--timeout_mins <minutes>] [--max_runtime <minutes>]
    [--one-time-hosts <hosts>] [--email <email>]
    [--dependencies <labels this job is dependent on>]
    [--atomic_group <atomic group name>] [--parse-failed-repair <option>]
    [--image <http://path/to/image>] [--require-ssp]
    job_name

    Creating a job is rather different from the other create operations,
    so it only uses the __init__() and output() from its superclass.
    """
    op_action = 'create'

    def __init__(self):
        super(job_create, self).__init__()
        self.ctrl_file_data = {}
        self.parser.add_option('-y', '--synch_count', type=int,
                               help='Number of machines to use per autoserv '
                                    'execution')
        self.parser.add_option('-f', '--control-file',
                               help='use this control file', metavar='FILE')
        self.parser.add_option('-s', '--server',
                               help='This is server-side job',
                               action='store_true', default=False)
        self.parser.add_option('-t', '--test',
                               help='List of tests to run')

        self.parser.add_option('-k', '--kernel', help='A comma separated list'
                               ' of kernel versions/URLs/filenames to run the'
                               ' job on')
        self.parser.add_option('--kernel-cmdline', help='A string that will be'
                               ' given as cmdline to the booted kernel(s)'
                               ' specified by the -k option')

        self.parser.add_option('-d', '--dependencies', help='Comma separated '
                               'list of labels this job is dependent on.',
                               default='')
        self.parser.add_option('-G', '--atomic_group', help='Name of an Atomic '
                               'Group to schedule this job on.',
                               default='')

        self.parser.add_option('-B', '--reboot_before',
                               help='Whether or not to reboot the machine '
                                    'before the job (never/if dirty/always)',
                               type='choice',
                               choices=('never', 'if dirty', 'always'))
        self.parser.add_option('-a', '--reboot_after',
                               help='Whether or not to reboot the machine '
                                    'after the job (never/if all tests passed/'
                                    'always)',
                               type='choice',
                               choices=('never', 'if all tests passed',
                                        'always'))

        self.parser.add_option('--parse-failed-repair',
                               help='Whether or not to parse failed repair '
                                    'results as part of the job',
                               type='choice',
                               choices=('true', 'false'))
        self.parser.add_option('-n', '--noverify',
                               help='Do not run verify for job',
                               default=False, action='store_true')
        self.parser.add_option('-o', '--timeout_mins',
                               help='Job timeout in minutes.',
                               metavar='TIMEOUT')
        self.parser.add_option('--max_runtime',
                               help='Job maximum runtime in minutes')

        self.parser.add_option('-i', '--image',
                               help='OS image to install before running the '
                                    'test.')
        self.parser.add_option('--require-ssp',
                               help='Require server-side packaging',
                               default=False, action='store_true')


    @staticmethod
    def _get_kernel_data(kernel_list, cmdline):
        # The RPC supports a cmdline per kernel version, in a dictionary.
        kernels = []
        for version in re.split(r'[, ]+', kernel_list):
            if not version:
                continue
            kernel_info = {'version': version}
            if cmdline:
                kernel_info['cmdline'] = cmdline
            kernels.append(kernel_info)

        return kernels
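
    # Example of the structure _get_kernel_data() builds for the
    # generate_control_file RPC, using hypothetical versions and cmdline:
    #     _get_kernel_data('3.2, 3.4', 'console=ttyS0')
    # returns
    #     [{'version': '3.2', 'cmdline': 'console=ttyS0'},
    #      {'version': '3.4', 'cmdline': 'console=ttyS0'}]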


    def parse(self):
        deps_info = topic_common.item_parse_info(attribute_name='dependencies',
                                                 inline_option='dependencies')
        options, leftover = super(job_create, self).parse(
                parse_info=[deps_info])

        if (len(self.hosts) == 0 and not self.one_time_hosts
            and not options.labels and not options.atomic_group):
            self.invalid_syntax('Must specify at least one machine '
                                'or an atomic group '
                                '(-m, -M, -b, -G or --one-time-hosts).')
        if not options.control_file and not options.test:
            self.invalid_syntax('Must specify either --test or --control-file'
                                ' to create a job.')
        if options.control_file and options.test:
            self.invalid_syntax('Can only specify one of --control-file or '
                                '--test, not both.')
        if options.kernel:
            self.ctrl_file_data['kernel'] = self._get_kernel_data(
                    options.kernel, options.kernel_cmdline)
        if options.control_file:
            try:
                control_file_f = open(options.control_file)
                try:
                    control_file_data = control_file_f.read()
                finally:
                    control_file_f.close()
            except IOError:
                self.generic_error('Unable to read from specified '
                                   'control-file: %s' % options.control_file)
            if options.kernel:
                # execute() will pass this to the AFE server to wrap this
                # control file up to include the kernel installation steps.
                self.ctrl_file_data['client_control_file'] = control_file_data
            else:
                self.data['control_file'] = control_file_data
        if options.test:
            if options.server:
                self.invalid_syntax('If you specify tests, then the '
                                    'client/server setting is implicit and '
                                    'cannot be overridden.')
            tests = [t.strip() for t in options.test.split(',') if t.strip()]
            self.ctrl_file_data['tests'] = tests

        if options.image:
            self.data['image'] = options.image

        if options.reboot_before:
            self.data['reboot_before'] = options.reboot_before.capitalize()
        if options.reboot_after:
            self.data['reboot_after'] = options.reboot_after.capitalize()
        if options.parse_failed_repair:
            self.data['parse_failed_repair'] = (
                    options.parse_failed_repair == 'true')
        if options.noverify:
            self.data['run_verify'] = False
        if options.timeout_mins:
            self.data['timeout_mins'] = options.timeout_mins
        if options.max_runtime:
            self.data['max_runtime_mins'] = options.max_runtime

        if options.atomic_group:
            self.data['atomic_group_name'] = options.atomic_group

        self.data['dependencies'] = self.dependencies

        if options.synch_count:
            self.data['synch_count'] = options.synch_count
        if options.server:
            self.data['control_type'] = control_data.CONTROL_TYPE_NAMES.SERVER
        else:
            self.data['control_type'] = control_data.CONTROL_TYPE_NAMES.CLIENT

        self.data['require_ssp'] = options.require_ssp

        return options, leftover


    def execute(self):
        if self.ctrl_file_data:
            uploading_kernel = 'kernel' in self.ctrl_file_data
            if uploading_kernel:
                default_timeout = socket.getdefaulttimeout()
                socket.setdefaulttimeout(topic_common.UPLOAD_SOCKET_TIMEOUT)
                print 'Uploading Kernel: this may take a while...',
                sys.stdout.flush()
            try:
                cf_info = self.execute_rpc(op='generate_control_file',
                                           item=self.jobname,
                                           **self.ctrl_file_data)
            finally:
                if uploading_kernel:
                    socket.setdefaulttimeout(default_timeout)

            if uploading_kernel:
                print 'Done'
            self.data['control_file'] = cf_info['control_file']
            if 'synch_count' not in self.data:
                self.data['synch_count'] = cf_info['synch_count']
            if cf_info['is_server']:
                self.data['control_type'] = (
                        control_data.CONTROL_TYPE_NAMES.SERVER)
            else:
                self.data['control_type'] = (
                        control_data.CONTROL_TYPE_NAMES.CLIENT)

            # Take the union of the dependencies specified on the command
            # line and those returned with the generated control file.
            deps = set(self.data['dependencies'])
            deps = sorted(deps.union(cf_info['dependencies']))
            self.data['dependencies'] = list(deps)

        if 'synch_count' not in self.data:
            self.data['synch_count'] = 1

        return self.create_job()


class job_clone(job_create_or_clone):
    """atest job clone [--id <job_id>] [--reuse-hosts]
    [--priority <low|medium|high|urgent>]
    [--mlist </path/to/machinelist>] [--machine <host1 host2 host3>]
    [--labels <list of labels of machines to run on>]
    [--one-time-hosts <hosts>] [--email <email>]
    job_name

    Cloning a job is rather different from the other create operations,
    so it only uses the __init__() and output() from its superclass.
    """
    op_action = 'clone'
    usage_action = 'clone'

    def __init__(self):
        super(job_clone, self).__init__()
        self.parser.add_option('-i', '--id', help='Job id to clone',
                               default=False,
                               metavar='JOB_ID')
        self.parser.add_option('-r', '--reuse-hosts',
                               help='Use the exact same hosts as the '
                                    'cloned job.',
                               action='store_true', default=False)


    def parse(self):
        options, leftover = super(job_clone, self).parse()

        self.clone_id = options.id
        self.reuse_hosts = options.reuse_hosts

        host_specified = self.hosts or self.one_time_hosts or options.labels
        if self.reuse_hosts and host_specified:
            self.invalid_syntax('Cannot specify hosts and reuse the same '
                                'ones as the cloned job.')

        if not (self.reuse_hosts or host_specified):
            self.invalid_syntax('Must reuse or specify at least one '
                                'machine (-r, -m, -M, -b or '
                                '--one-time-hosts).')

        return options, leftover


    def execute(self):
        clone_info = self.execute_rpc(op='get_info_for_clone',
                                      id=self.clone_id,
                                      preserve_metahosts=self.reuse_hosts)

        # Remove fields from the clone data that cannot be reused.
        for field in ('name', 'created_on', 'id', 'owner'):
            del clone_info['job'][field]

        # Also remove the parameterized_job field: the feature is still
        # incomplete, this tool does not attempt to support it for now,
        # it uses a different API function and it breaks create_job().
        if 'parameterized_job' in clone_info['job']:
            del clone_info['job']['parameterized_job']

        # Keyword args cannot be unicode strings.
        self.data.update((str(key), val)
                         for key, val in clone_info['job'].iteritems())

        if self.reuse_hosts:
            # Convert the host list from the clone info into the form
            # expected by create_job().
            for label, qty in clone_info['meta_host_counts'].iteritems():
                self.data['meta_hosts'].extend([label]*qty)

            self.data['hosts'].extend(host['hostname']
                                      for host in clone_info['hosts'])

        return self.create_job()
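
# Example clone flow, with a hypothetical job id: `atest job clone --id 42
# -r new_name` fetches job 42 via get_info_for_clone, drops the fields that
# cannot be reused (name, created_on, id, owner), keeps its hosts and
# meta-hosts, and resubmits everything through create_job() as new_name.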


class job_abort(job, action_common.atest_delete):
    """atest job abort <job(s)>"""
    usage_action = op_action = 'abort'
    msg_done = 'Aborted'

    def parse(self):
        job_info = topic_common.item_parse_info(attribute_name='jobids',
                                                use_leftover=True)
        options, leftover = super(job_abort, self).parse([job_info],
                                                         req_items='jobids')
        return options, leftover


    def execute(self):
        data = {'job__id__in': self.jobids}
        self.execute_rpc(op='abort_host_queue_entries', **data)
        print 'Aborting jobs: %s' % ', '.join(self.jobids)


    def get_items(self):
        return self.jobids
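
# Example, with hypothetical ids: `atest job abort 42 43` results in a
# single abort_host_queue_entries RPC called with
# {'job__id__in': ['42', '43']}.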