#
# Copyright 2008 Google Inc. All Rights Reserved.

"""
The job module contains the objects and methods used to
manage jobs in Autotest.

The valid actions are:
list:    lists job(s)
create:  create a job
clone:   clone a job
abort:   abort job(s)
stat:    detailed listing of job(s)

The common options are described in topic_common.py, which also
documents the High Level Design and Algorithm.
"""

import getpass, re, socket, sys
from autotest_lib.cli import topic_common, action_common
from autotest_lib.client.common_lib import control_data


class job(topic_common.atest):
    """Job class
    atest job [create|clone|list|stat|abort] <options>"""
    usage_action = '[create|clone|list|stat|abort]'
    topic = msg_topic = 'job'
    msg_items = '<job_ids>'


    def _convert_status(self, results):
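        """Convert each job's status_counts dict into a display string.

        Illustrative example: {'Completed': 9, 'Failed': 1} becomes
        'Completed=9(90.0%), Failed=1(10.0%)'.
        """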
        for result in results:
            total = sum(result['status_counts'].values())
            status = ['%s=%s(%.1f%%)' % (key, val, 100.0 * val / total)
                      for key, val in result['status_counts'].iteritems()]
            status.sort()
            result['status_counts'] = ', '.join(status)


    def backward_compatibility(self, action, argv):
        """'job create --clone' became 'job clone --id'."""
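        # Illustrative rewrite: ['job', 'create', '--clone', '42'] becomes
        # ['job', 'create', '--id', '42'] in place, and the returned action
        # is 'clone' instead of 'create'.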
        if action == 'create':
            for option in ['-l', '--clone']:
                if option in argv:
                    argv[argv.index(option)] = '--id'
                    action = 'clone'
        return action


class job_help(job):
    """Just here to get the atest logic working.
    Usage is set by its parent."""
    pass


class job_list_stat(action_common.atest_list, job):
    def __init__(self):
        super(job_list_stat, self).__init__()

        self.topic_parse_info = topic_common.item_parse_info(
            attribute_name='jobs',
            use_leftover=True)


    def __split_jobs_between_ids_names(self):
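        """Partition self.jobs into numeric IDs and job names.

        Illustrative example: ['42', 'nightly'] -> (['42'], ['nightly']).
        """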
        job_ids = []
        job_names = []

        # Sort between job IDs and names
        for job_id in self.jobs:
            if job_id.isdigit():
                job_ids.append(job_id)
            else:
                job_names.append(job_id)
        return (job_ids, job_names)


    def execute_on_ids_and_names(self, op, filters={},
                                 check_results={'id__in': 'id',
                                                'name__in': 'id'},
                                 tag_id='id__in', tag_name='name__in'):
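        """Run the RPC op once per kind of job identifier.

        Jobs given on the command line may be numeric IDs or names; each
        group is sent as its own filtered call (tag_id/tag_name select the
        filter key) and the results are concatenated.
        """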
        if not self.jobs:
            # Want everything
            return super(job_list_stat, self).execute(op=op, filters=filters)

        all_jobs = []
        (job_ids, job_names) = self.__split_jobs_between_ids_names()

        for items, tag in [(job_ids, tag_id),
                           (job_names, tag_name)]:
            if items:
                new_filters = filters.copy()
                new_filters[tag] = items
                jobs = super(job_list_stat,
                             self).execute(op=op,
                                           filters=new_filters,
                                           check_results=check_results)
                all_jobs.extend(jobs)

        return all_jobs


class job_list(job_list_stat):
    """atest job list [<jobs>] [--all] [--running] [--user <username>]"""
    def __init__(self):
        super(job_list, self).__init__()
        self.parser.add_option('-a', '--all', help='List jobs for all '
                               'users.', action='store_true', default=False)
        self.parser.add_option('-r', '--running', help='List only running '
                               'jobs', action='store_true')
        self.parser.add_option('-u', '--user', help='List jobs for given '
                               'user', type='string')


    def parse(self):
        options, leftover = super(job_list, self).parse()
        self.all = options.all
        self.data['running'] = options.running
        if options.user:
            if options.all:
                self.invalid_syntax('Only specify --all or --user, not both.')
            else:
                self.data['owner'] = options.user
        elif not options.all and not self.jobs:
            self.data['owner'] = getpass.getuser()

        return options, leftover


    def execute(self):
        return self.execute_on_ids_and_names(op='get_jobs_summary',
                                             filters=self.data)


    def output(self, results):
        keys = ['id', 'owner', 'name', 'status_counts']
        if self.verbose:
            keys.extend(['priority', 'control_type', 'created_on'])
        self._convert_status(results)
        super(job_list, self).output(results, keys)


class job_stat(job_list_stat):
    """atest job stat <job>"""
    usage_action = 'stat'

    def __init__(self):
        super(job_stat, self).__init__()
        self.parser.add_option('-f', '--control-file',
                               help='Display the control file',
                               action='store_true', default=False)
        self.parser.add_option('-N', '--list-hosts',
                               help='Display only a list of hosts',
                               action='store_true')
        self.parser.add_option('-s', '--list-hosts-status',
                               help='Display only the hosts in these statuses '
                               'for a job.', action='store')

    def parse(self):
        status_list = topic_common.item_parse_info(
                attribute_name='status_list',
                inline_option='list_hosts_status')
        options, leftover = super(job_stat, self).parse([status_list],
                                                        req_items='jobs')

        if not self.jobs:
            self.invalid_syntax('Must specify at least one job.')

        self.show_control_file = options.control_file
        self.list_hosts = options.list_hosts

        if self.list_hosts and self.status_list:
            self.invalid_syntax('--list-hosts is implicit when using '
                                '--list-hosts-status.')
        if len(self.jobs) > 1 and (self.list_hosts or self.status_list):
            self.invalid_syntax('--list-hosts and --list-hosts-status should '
                                'only be used on a single job.')

        return options, leftover


    def _merge_results(self, summary, qes):
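        """Annotate each job summary with its hosts, grouped by status.

        hosts_status maps job id -> {status: [hostname, ...]}, e.g.
        (illustrative) {42: {'Running': ['host1', 'host2']}}.
        """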
        hosts_status = {}
        for qe in qes:
            if qe['host']:
                job_id = qe['job']['id']
                hostname = qe['host']['hostname']
                hosts_status.setdefault(job_id,
                                        {}).setdefault(qe['status'],
                                                       []).append(hostname)

        for job in summary:
            job_id = job['id']
            if job_id in hosts_status:
                this_job = hosts_status[job_id]
                job['hosts'] = ' '.join(' '.join(hosts) for hosts in
                                        this_job.itervalues())
                host_per_status = ['%s="%s"' % (status, ' '.join(hosts))
                                   for status, hosts in this_job.iteritems()]
                job['hosts_status'] = ', '.join(host_per_status)
                if self.status_list:
                    statuses = set(s.lower() for s in self.status_list)
                    all_hosts = [s for s in host_per_status
                                 if s.split('=', 1)[0].lower() in statuses]
                    job['hosts_selected_status'] = '\n'.join(all_hosts)
            else:
                job['hosts_status'] = ''

            if not job.get('hosts'):
                self.generic_error('Job has unassigned meta-hosts, '
                                   'try again shortly.')

        return summary


    def execute(self):
        summary = self.execute_on_ids_and_names(op='get_jobs_summary')

        # Get the real hostnames
        qes = self.execute_on_ids_and_names(op='get_host_queue_entries',
                                            check_results={},
                                            tag_id='job__in',
                                            tag_name='job__name__in')

        self._convert_status(summary)

        return self._merge_results(summary, qes)


    def output(self, results):
        if self.list_hosts:
            keys = ['hosts']
        elif self.status_list:
            keys = ['hosts_selected_status']
        elif not self.verbose:
            keys = ['id', 'name', 'priority', 'status_counts', 'hosts_status']
        else:
            keys = ['id', 'name', 'priority', 'status_counts', 'hosts_status',
                    'owner', 'control_type', 'synch_count', 'created_on',
                    'run_verify', 'reboot_before', 'reboot_after',
                    'parse_failed_repair']

        if self.show_control_file:
            keys.append('control_file')

        super(job_stat, self).output(results, keys)


class job_create_or_clone(action_common.atest_create, job):
    """Class containing the code common to the job create and clone actions"""
    msg_items = 'job_name'

    def __init__(self):
        super(job_create_or_clone, self).__init__()
        self.hosts = []
        self.data_item_key = 'name'
        self.parser.add_option('-p', '--priority', help='Job priority (low, '
                               'medium, high, urgent), default=medium',
                               type='choice', choices=('low', 'medium', 'high',
                               'urgent'), default='medium')
        self.parser.add_option('-b', '--labels',
                               help='Comma separated list of labels '
                               'to get machine list from.', default='')
        self.parser.add_option('-m', '--machine', help='List of machines to '
                               'run on')
        self.parser.add_option('-M', '--mlist',
                               help='File listing machines to use',
                               type='string', metavar='MACHINE_FLIST')
        self.parser.add_option('--one-time-hosts',
                               help='List of one-time hosts')
        self.parser.add_option('-e', '--email',
                               help='A comma separated list of '
                               'email addresses to notify of job completion',
                               default='')


    def _parse_hosts(self, args):
        """Parses the arguments to generate a list of hosts and meta_hosts.

        A host is a regular name; a meta_host is n*label or *label.
        These can be mixed on the CLI, and separated by either commas or
        spaces, e.g.: 5*Machine_Label host0 5*Machine_Label2,host2
        """
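
        # Illustrative expansion: ['5*LABEL', 'host0,host0'] yields
        # hosts == ['host0'] (duplicate real hostnames are dropped) and
        # meta_hosts == ['LABEL'] * 5.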
        hosts = []
        meta_hosts = []

        for arg in args:
            for host in arg.split(','):
                if re.match(r'^[0-9]+[*]', host):
                    num, host = host.split('*', 1)
                    meta_hosts += int(num) * [host]
                else:
                    label_match = re.match(r'^[*](\w*)', host)
                    if label_match:
                        meta_hosts.append(label_match.group(1))
                    elif host != '' and host not in hosts:
                        # Real hostname and not a duplicate
                        hosts.append(host)

        return (hosts, meta_hosts)


    def parse(self, parse_info=[]):
        host_info = topic_common.item_parse_info(attribute_name='hosts',
                                                 inline_option='machine',
                                                 filename_option='mlist')
        job_info = topic_common.item_parse_info(attribute_name='jobname',
                                                use_leftover=True)
        oth_info = topic_common.item_parse_info(attribute_name='one_time_hosts',
                                                inline_option='one_time_hosts')
        label_info = topic_common.item_parse_info(attribute_name='labels',
                                                  inline_option='labels')

        options, leftover = super(job_create_or_clone, self).parse(
                [host_info, job_info, oth_info, label_info] + parse_info,
                req_items='jobname')
        self.data = {}
        jobname = self.jobname
        if len(jobname) > 1:
            self.invalid_syntax('Too many arguments specified, only expected '
                                'to receive job name: %s' % jobname)
        self.jobname = jobname[0]

        if options.priority:
            self.data['priority'] = options.priority.capitalize()

        if self.one_time_hosts:
            self.data['one_time_hosts'] = self.one_time_hosts

        if self.labels:
            label_hosts = self.execute_rpc(op='get_hosts',
                                           multiple_labels=self.labels)
            for host in label_hosts:
                self.hosts.append(host['hostname'])

        self.data['name'] = self.jobname

        (self.data['hosts'],
         self.data['meta_hosts']) = self._parse_hosts(self.hosts)

        self.data['email_list'] = options.email

        return options, leftover


    def create_job(self):
        job_id = self.execute_rpc(op='create_job', **self.data)
        return ['%s (id %s)' % (self.jobname, job_id)]


    def get_items(self):
        return [self.jobname]


class job_create(job_create_or_clone):
    """atest job create [--priority <low|medium|high|urgent>]
    [--synch_count] [--control-file </path/to/cfile>]
    [--server] [--test <test1,test2>] [--kernel <http://kernel>]
    [--kernel-cmdline <arguments>]
    [--mlist </path/to/machinelist>] [--machine <host1 host2 host3>]
    [--labels <list of labels of machines to run on>]
    [--reboot_before <option>] [--reboot_after <option>]
    [--noverify] [--timeout_mins <timeout>] [--max_runtime <max runtime>]
    [--one-time-hosts <hosts>] [--email <email>]
    [--dependencies <labels this job is dependent on>]
    [--atomic_group <atomic group name>] [--parse-failed-repair <option>]
    [--image <http://path/to/image>] [--require-ssp]
    job_name

    Creating a job is rather different from the other create operations,
    so it only uses the __init__() and output() from its superclass.
    """
    op_action = 'create'

    def __init__(self):
        super(job_create, self).__init__()
        self.ctrl_file_data = {}
        self.parser.add_option('-y', '--synch_count', type=int,
                               help='Number of machines to use per autoserv '
                                    'execution')
        self.parser.add_option('-f', '--control-file',
                               help='Use this control file', metavar='FILE')
        self.parser.add_option('-s', '--server',
                               help='This is a server-side job',
                               action='store_true', default=False)
        self.parser.add_option('-t', '--test',
                               help='List of tests to run')

        self.parser.add_option('-k', '--kernel', help='A comma separated list'
                               ' of kernel versions/URLs/filenames to run the'
                               ' job on')
        self.parser.add_option('--kernel-cmdline', help='A string that will be'
                               ' given as cmdline to the booted kernel(s)'
                               ' specified by the -k option')

        self.parser.add_option('-d', '--dependencies', help='Comma separated '
                               'list of labels this job is dependent on.',
                               default='')
        self.parser.add_option('-G', '--atomic_group', help='Name of an Atomic '
                               'Group to schedule this job on.',
                               default='')

        self.parser.add_option('-B', '--reboot_before',
                               help='Whether or not to reboot the machine '
                                    'before the job (never/if dirty/always)',
                               type='choice',
                               choices=('never', 'if dirty', 'always'))
        self.parser.add_option('-a', '--reboot_after',
                               help='Whether or not to reboot the machine '
                                    'after the job (never/if all tests passed/'
                                    'always)',
                               type='choice',
                               choices=('never', 'if all tests passed',
                                        'always'))

        self.parser.add_option('--parse-failed-repair',
                               help='Whether or not to parse failed repair '
                                    'results as part of the job',
                               type='choice',
                               choices=('true', 'false'))
        self.parser.add_option('-n', '--noverify',
                               help='Do not run verify for job',
                               default=False, action='store_true')
        self.parser.add_option('-o', '--timeout_mins',
                               help='Job timeout in minutes.',
                               metavar='TIMEOUT')
        self.parser.add_option('--max_runtime',
                               help='Job maximum runtime in minutes')

        self.parser.add_option('-i', '--image',
                               help='OS image to install before running the '
                                    'test.')
        self.parser.add_option('--require-ssp',
                               help='Require server-side packaging',
                               default=False, action='store_true')

    @staticmethod
    def _get_kernel_data(kernel_list, cmdline):
        # The RPC supports a cmdline per kernel version via a dictionary
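        # Illustrative expansion: kernel_list='3.2,3.8' with cmdline='quiet'
        # yields [{'version': '3.2', 'cmdline': 'quiet'},
        #         {'version': '3.8', 'cmdline': 'quiet'}].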
        kernels = []
        for version in re.split(r'[, ]+', kernel_list):
            if not version:
                continue
            kernel_info = {'version': version}
            if cmdline:
                kernel_info['cmdline'] = cmdline
            kernels.append(kernel_info)

        return kernels


    def parse(self):
        deps_info = topic_common.item_parse_info(attribute_name='dependencies',
                                                 inline_option='dependencies')
        options, leftover = super(job_create, self).parse(
                parse_info=[deps_info])

        if (len(self.hosts) == 0 and not self.one_time_hosts
                and not options.labels and not options.atomic_group):
            self.invalid_syntax('Must specify at least one machine '
                                'or an atomic group '
                                '(-m, -M, -b, -G or --one-time-hosts).')
        if not options.control_file and not options.test:
            self.invalid_syntax('Must specify either --test or --control-file'
                                ' to create a job.')
        if options.control_file and options.test:
            self.invalid_syntax('Can only specify one of --control-file or '
                                '--test, not both.')
        if options.kernel:
            self.ctrl_file_data['kernel'] = self._get_kernel_data(
                    options.kernel, options.kernel_cmdline)
        if options.control_file:
            try:
                with open(options.control_file) as control_file_f:
                    control_file_data = control_file_f.read()
            except IOError:
                self.generic_error('Unable to read from specified '
                                   'control-file: %s' % options.control_file)
            if options.kernel:
                # execute() will pass this to the AFE server to wrap this
                # control file up to include the kernel installation steps.
                self.ctrl_file_data['client_control_file'] = control_file_data
            else:
                self.data['control_file'] = control_file_data
        if options.test:
            if options.server:
                self.invalid_syntax('If you specify tests, then the '
                                    'client/server setting is implicit and '
                                    'cannot be overridden.')
            tests = [t.strip() for t in options.test.split(',') if t.strip()]
            self.ctrl_file_data['tests'] = tests

        if options.image:
            self.data['image'] = options.image

        if options.reboot_before:
            self.data['reboot_before'] = options.reboot_before.capitalize()
        if options.reboot_after:
            self.data['reboot_after'] = options.reboot_after.capitalize()
        if options.parse_failed_repair:
            self.data['parse_failed_repair'] = (
                options.parse_failed_repair == 'true')
        if options.noverify:
            self.data['run_verify'] = False
        if options.timeout_mins:
            self.data['timeout_mins'] = options.timeout_mins
        if options.max_runtime:
            self.data['max_runtime_mins'] = options.max_runtime

        if options.atomic_group:
            self.data['atomic_group_name'] = options.atomic_group

        self.data['dependencies'] = self.dependencies

        if options.synch_count:
            self.data['synch_count'] = options.synch_count
        if options.server:
            self.data['control_type'] = control_data.CONTROL_TYPE_NAMES.SERVER
        else:
            self.data['control_type'] = control_data.CONTROL_TYPE_NAMES.CLIENT

        self.data['require_ssp'] = options.require_ssp

        return options, leftover


    def execute(self):
        if self.ctrl_file_data:
            uploading_kernel = 'kernel' in self.ctrl_file_data
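            # Kernel payloads can make the generate_control_file RPC slow,
            # so the socket timeout is temporarily widened while uploading.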
            if uploading_kernel:
                default_timeout = socket.getdefaulttimeout()
                socket.setdefaulttimeout(topic_common.UPLOAD_SOCKET_TIMEOUT)
                print 'Uploading Kernel: this may take a while...',
                sys.stdout.flush()
            try:
                cf_info = self.execute_rpc(op='generate_control_file',
                                           item=self.jobname,
                                           **self.ctrl_file_data)
            finally:
                if uploading_kernel:
                    socket.setdefaulttimeout(default_timeout)

            if uploading_kernel:
                print 'Done'
            self.data['control_file'] = cf_info['control_file']
            if 'synch_count' not in self.data:
                self.data['synch_count'] = cf_info['synch_count']
            if cf_info['is_server']:
                self.data['control_type'] = (
                        control_data.CONTROL_TYPE_NAMES.SERVER)
            else:
                self.data['control_type'] = (
                        control_data.CONTROL_TYPE_NAMES.CLIENT)

            # Get the union of the two sets of dependencies
            deps = set(self.data['dependencies'])
            deps = sorted(deps.union(cf_info['dependencies']))
            self.data['dependencies'] = list(deps)

        if 'synch_count' not in self.data:
            self.data['synch_count'] = 1

        return self.create_job()


class job_clone(job_create_or_clone):
    """atest job clone [--priority <low|medium|high|urgent>]
    [--mlist </path/to/machinelist>] [--machine <host1 host2 host3>]
    [--labels <list of labels of machines to run on>]
    [--one-time-hosts <hosts>] [--email <email>]
    job_name

    Cloning a job is rather different from the other create operations,
    so it only uses the __init__() and output() from its superclass.
    """
    op_action = 'clone'
    usage_action = 'clone'

    def __init__(self):
        super(job_clone, self).__init__()
        self.parser.add_option('-i', '--id', help='Job id to clone',
                               default=False,
                               metavar='JOB_ID')
        self.parser.add_option('-r', '--reuse-hosts',
                               help='Use the exact same hosts as the '
                               'cloned job.',
                               action='store_true', default=False)


    def parse(self):
        options, leftover = super(job_clone, self).parse()

        self.clone_id = options.id
        self.reuse_hosts = options.reuse_hosts

        host_specified = self.hosts or self.one_time_hosts or options.labels
        if self.reuse_hosts and host_specified:
            self.invalid_syntax('Cannot specify hosts and reuse the same '
                                'ones as the cloned job.')

        if not (self.reuse_hosts or host_specified):
            self.invalid_syntax('Must reuse or specify at least one '
                                'machine (-r, -m, -M, -b or '
                                '--one-time-hosts).')

        return options, leftover


    def execute(self):
        clone_info = self.execute_rpc(op='get_info_for_clone',
                                      id=self.clone_id,
                                      preserve_metahosts=self.reuse_hosts)
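
        # clone_info shape, as inferred from the usage below (illustrative):
        #   {'job': {...create_job keyword args...},
        #    'meta_host_counts': {label: count},
        #    'hosts': [{'hostname': ...}, ...]}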
        # Remove fields from the clone data that cannot be reused
        for field in ('name', 'created_on', 'id', 'owner'):
            del clone_info['job'][field]

        # Also remove the parameterized_job field: the feature is still
        # incomplete, this tool does not attempt to support it for now,
        # and it breaks create_job() since it uses a different API function.
        if 'parameterized_job' in clone_info['job']:
            del clone_info['job']['parameterized_job']

        # Keyword args cannot be unicode strings
        self.data.update((str(key), val)
                         for key, val in clone_info['job'].iteritems())

        if self.reuse_hosts:
            # Convert the host list from the clone info into create_job()
            # arguments
            for label, qty in clone_info['meta_host_counts'].iteritems():
                self.data['meta_hosts'].extend([label] * qty)

            self.data['hosts'].extend(host['hostname']
                                      for host in clone_info['hosts'])

        return self.create_job()


class job_abort(job, action_common.atest_delete):
    """atest job abort <job(s)>"""
    usage_action = op_action = 'abort'
    msg_done = 'Aborted'

    def parse(self):
        job_info = topic_common.item_parse_info(attribute_name='jobids',
                                                use_leftover=True)
        options, leftover = super(job_abort, self).parse([job_info],
                                                         req_items='jobids')
        return options, leftover


    def execute(self):
        data = {'job__id__in': self.jobids}
        self.execute_rpc(op='abort_host_queue_entries', **data)
        print 'Aborting jobs: %s' % ', '.join(self.jobids)


    def get_items(self):
        return self.jobids