1"""The main job wrapper
2
3This is the core infrastructure.
4
5Copyright Andy Whitcroft, Martin J. Bligh 2006
6"""
7
8# pylint: disable=missing-docstring
9
10import copy
11from datetime import datetime
12import getpass
13import glob
14import logging
15import os
16import re
17import shutil
18import sys
19import time
20import traceback
21import types
22import weakref
23
24import common
25from autotest_lib.client.bin import client_logging_config
26from autotest_lib.client.bin import harness
27from autotest_lib.client.bin import local_host
28from autotest_lib.client.bin import parallel
29from autotest_lib.client.bin import partition as partition_lib
30from autotest_lib.client.bin import profilers
31from autotest_lib.client.bin import sysinfo
32from autotest_lib.client.bin import test
33from autotest_lib.client.bin import utils
34from autotest_lib.client.common_lib import barrier
35from autotest_lib.client.common_lib import base_job
36from autotest_lib.client.common_lib import packages
37from autotest_lib.client.common_lib import error
38from autotest_lib.client.common_lib import global_config
39from autotest_lib.client.common_lib import logging_manager
41from autotest_lib.client.cros import cros_logging
42from autotest_lib.client.tools import html_report
43
44GLOBAL_CONFIG = global_config.global_config
45
46LAST_BOOT_TAG = object()
47JOB_PREAMBLE = """
48from autotest_lib.client.common_lib.error import *
49from autotest_lib.client.bin.utils import *
50"""
51
52
53class StepError(error.AutotestError):
54    pass
55
56class NotAvailableError(error.AutotestError):
57    pass
58
59
60
61def _run_test_complete_on_exit(f):
62    """Decorator for job methods that automatically calls
63    self.harness.run_test_complete when the method exits, if appropriate."""
64    def wrapped(self, *args, **dargs):
65        try:
66            return f(self, *args, **dargs)
67        finally:
68            if self._logger.global_filename == 'status':
69                self.harness.run_test_complete()
70                if self.drop_caches:
71                    utils.drop_caches()
72    wrapped.__name__ = f.__name__
73    wrapped.__doc__ = f.__doc__
74    wrapped.__dict__.update(f.__dict__)
75    return wrapped
76
77
78class status_indenter(base_job.status_indenter):
79    """Provide a status indenter that is backed by job._record_prefix."""
80    def __init__(self, job_):
81        self._job = weakref.proxy(job_)  # avoid a circular reference
82
83
84    @property
85    def indent(self):
86        return self._job._record_indent
87
88
89    def increment(self):
90        self._job._record_indent += 1
91
92
93    def decrement(self):
94        self._job._record_indent -= 1
95
96
97class base_client_job(base_job.base_job):
98    """The client-side concrete implementation of base_job.
99
100    Optional properties provided by this implementation:
101        control
102        harness
103    """
104
105    _WARNING_DISABLE_DELAY = 5
106
107    # _record_indent is a persistent property, but only on the client
108    _job_state = base_job.base_job._job_state
109    _record_indent = _job_state.property_factory(
110        '_state', '_record_indent', 0, namespace='client')
111    _max_disk_usage_rate = _job_state.property_factory(
112        '_state', '_max_disk_usage_rate', 0.0, namespace='client')
113
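    # Illustrative note: properties built by property_factory above read and
    # write through to the persistent job state file, so e.g. assigning
    # 'self._record_indent = 2' is stored under the 'client' namespace and
    # survives reboots and job continuations.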
114
115    def __init__(self, control, options, drop_caches=True):
116        """
117        Prepare a client side job object.
118
119        @param control: Pathname of the control file.
120        @param options: An object which includes:
121                jobtag: The job tag string (eg "default").
122                cont: Whether this is a continuation of a previous job.
123                harness_type: An alternative server harness.  [None]
124                use_external_logging: If true, the enable_external_logging
125                          method will be called during construction.  [False]
126        @param drop_caches: If true, utils.drop_caches() is called before and
127                between all tests.  [True]
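
        Example (values are illustrative; see runjob() below for how the job
        is really constructed):
            myjob = job(control='/path/to/control', options=options,
                        drop_caches=True)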
128        """
129        super(base_client_job, self).__init__(options=options)
130        self._pre_record_init(control, options)
131        try:
132            self._post_record_init(control, options, drop_caches)
133        except Exception, err:
134            self.record(
135                    'ABORT', None, None, 'client.bin.job.__init__ failed: %s' %
136                    str(err))
137            raise
138
139
140    @classmethod
141    def _get_environ_autodir(cls):
142        return os.environ['AUTODIR']
143
144
145    @classmethod
146    def _find_base_directories(cls):
147        """
148        Determine locations of autodir and clientdir (which are the same)
149        using os.environ. Serverdir does not exist in this context.
150        """
151        autodir = clientdir = cls._get_environ_autodir()
152        return autodir, clientdir, None
153
154
155    @classmethod
156    def _parse_args(cls, args):
157        return re.findall("[^\s]*?['|\"].*?['|\"]|[^\s]+", args)
158
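    # Example of how _parse_args tokenizes an argument string (quotes are kept
    # as part of the token):
    #     _parse_args('dir=/tmp "a b" c')  ->  ['dir=/tmp', '"a b"', 'c']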
159
160    def _find_resultdir(self, options):
161        """
162        Determine the directory for storing results. On a client this is
163        always <autodir>/results/<tag>, where tag is passed in on the command
164        line as an option.
165        """
166        output_dir_config = GLOBAL_CONFIG.get_config_value('CLIENT',
167                                                           'output_dir',
168                                                            default="")
169        if options.output_dir:
170            basedir = options.output_dir
171        elif output_dir_config:
172            basedir = output_dir_config
173        else:
174            basedir = self.autodir
175
176        return os.path.join(basedir, 'results', options.tag)
177
178
179    def _get_status_logger(self):
180        """Return a reference to the status logger."""
181        return self._logger
182
183
184    def _pre_record_init(self, control, options):
185        """
186        Initialization function that should perform ONLY the required
187        setup so that the self.record() method works.
188
189        As of now self.record() needs self.resultdir, self._group_level,
190        self.harness and of course self._logger.
191        """
192        if not options.cont:
193            self._cleanup_debugdir_files()
194            self._cleanup_results_dir()
195
196        logging_manager.configure_logging(
197            client_logging_config.ClientLoggingConfig(),
198            results_dir=self.resultdir,
199            verbose=options.verbose)
200        logging.info('Writing results to %s', self.resultdir)
201
202        # init_group_level needs the state
203        self.control = os.path.realpath(control)
204        self._is_continuation = options.cont
205        self._current_step_ancestry = []
206        self._next_step_index = 0
207        self._load_state()
208
209        _harness = self.handle_persistent_option(options, 'harness')
210        _harness_args = self.handle_persistent_option(options, 'harness_args')
211
212        self.harness = harness.select(_harness, self, _harness_args)
213
214        # set up the status logger
215        def client_job_record_hook(entry):
216            msg_tag = ''
217            if '.' in self._logger.global_filename:
218                msg_tag = self._logger.global_filename.split('.', 1)[1]
219            # send the entry to the job harness
220            message = '\n'.join([entry.message] + entry.extra_message_lines)
221            rendered_entry = self._logger.render_entry(entry)
222            self.harness.test_status_detail(entry.status_code, entry.subdir,
223                                            entry.operation, message, msg_tag,
224                                            entry.fields)
225            self.harness.test_status(rendered_entry, msg_tag)
226            # send the entry to stdout, if it's enabled
227            logging.info(rendered_entry)
228        self._logger = base_job.status_logger(
229            self, status_indenter(self), record_hook=client_job_record_hook,
230            tap_writer=self._tap)
231
232
233    def _post_record_init(self, control, options, drop_caches):
234        """
235        Perform job initialization not required by self.record().
236        """
237        self._init_drop_caches(drop_caches)
238
239        self._init_packages()
240
241        self.sysinfo = sysinfo.sysinfo(self.resultdir)
242        self._load_sysinfo_state()
243
244        if not options.cont:
245            download = os.path.join(self.testdir, 'download')
246            if not os.path.exists(download):
247                os.mkdir(download)
248
249            shutil.copyfile(self.control,
250                            os.path.join(self.resultdir, 'control'))
251
252        self.control = control
253
254        self.logging = logging_manager.get_logging_manager(
255                manage_stdout_and_stderr=True, redirect_fds=True)
256        self.logging.start_logging()
257
258        self.profilers = profilers.profilers(self)
259
260        self.machines = [options.hostname]
261        self.machine_dict_list = [{'hostname' : options.hostname}]
262        # Client side tests should always run the same whether or not they are
263        # running in the lab.
264        self.in_lab = False
265        self.hosts = set([local_host.LocalHost(hostname=options.hostname)])
266
267        self.args = []
268        if options.args:
269            self.args = self._parse_args(options.args)
270
271        if options.user:
272            self.user = options.user
273        else:
274            self.user = getpass.getuser()
275
276        self.sysinfo.log_per_reboot_data()
277
278        if not options.cont:
279            self.record('START', None, None)
280
281        self.harness.run_start()
282
283        if options.log:
284            self.enable_external_logging()
285
286        self.num_tests_run = None
287        self.num_tests_failed = None
288
289        self.warning_loggers = None
290        self.warning_manager = None
291
292
293    def _init_drop_caches(self, drop_caches):
294        """
295        Perform the drop caches initialization.
296        """
297        self.drop_caches_between_iterations = (
298                                    GLOBAL_CONFIG.get_config_value('CLIENT',
299                                    'drop_caches_between_iterations',
300                                    type=bool, default=True))
301        self.drop_caches = drop_caches
302        if self.drop_caches:
303            utils.drop_caches()
304
305
306    def _init_packages(self):
307        """
308        Perform the packages support initialization.
309        """
310        self.pkgmgr = packages.PackageManager(
311            self.autodir, run_function_dargs={'timeout':3600})
312
313
314    def _cleanup_results_dir(self):
315        """Delete everything in resultsdir"""
316        assert os.path.exists(self.resultdir)
317        list_files = glob.glob('%s/*' % self.resultdir)
318        for f in list_files:
319            if os.path.isdir(f):
320                shutil.rmtree(f)
321            elif os.path.isfile(f):
322                os.remove(f)
323
324
325    def _cleanup_debugdir_files(self):
326        """
327        Delete any leftover debugdir files
328        """
329        list_files = glob.glob("/tmp/autotest_results_dir.*")
330        for f in list_files:
331            os.remove(f)
332
333
334    def disable_warnings(self, warning_type):
335        self.record("INFO", None, None,
336                    "disabling %s warnings" % warning_type,
337                    {"warnings.disable": warning_type})
338        time.sleep(self._WARNING_DISABLE_DELAY)
339
340
341    def enable_warnings(self, warning_type):
342        time.sleep(self._WARNING_DISABLE_DELAY)
343        self.record("INFO", None, None,
344                    "enabling %s warnings" % warning_type,
345                    {"warnings.enable": warning_type})
346
347
348    def monitor_disk_usage(self, max_rate):
349        """\
350        Signal that the job should monitor disk space usage on /
351        and generate a warning if a test uses up disk space at a
352        rate exceeding 'max_rate'.
353
354        Parameters:
355             max_rate - the maximum allowed rate of disk consumption
356                        during a test, in MB/hour, or 0 to indicate
357                        no limit.
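
             Example (hypothetical threshold):
                 job.monitor_disk_usage(1024)   # warn above 1 GB/hour on /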
358        """
359        self._max_disk_usage_rate = max_rate
360
361
362    def control_get(self):
363        return self.control
364
365
366    def control_set(self, control):
367        self.control = os.path.abspath(control)
368
369
370    def harness_select(self, which, harness_args):
371        self.harness = harness.select(which, self, harness_args)
372
373
374    def setup_dirs(self, results_dir, tmp_dir):
375        if not tmp_dir:
376            tmp_dir = os.path.join(self.tmpdir, 'build')
377        if not os.path.exists(tmp_dir):
378            os.mkdir(tmp_dir)
379        if not os.path.isdir(tmp_dir):
380            e_msg = "Temp dir (%s) is not a dir - args backwards?" % tmp_dir
381            raise ValueError(e_msg)
382
383        # We label the first build "build" and then subsequent ones
384        # as "build.2", "build.3", etc. Whilst this is a little bit
385        # inconsistent, 99.9% of jobs will only have one build
386        # (that's not done as kernbench, sparse, or buildtest),
387        # so it works out much cleaner. One of life's compromises.
388        if not results_dir:
389            results_dir = os.path.join(self.resultdir, 'build')
390            i = 2
391            while os.path.exists(results_dir):
392                results_dir = os.path.join(self.resultdir, 'build.%d' % i)
393                i += 1
394        if not os.path.exists(results_dir):
395            os.mkdir(results_dir)
396
397        return (results_dir, tmp_dir)
398
399
400    def barrier(self, *args, **kwds):
401        """Create a barrier object"""
402        return barrier.barrier(*args, **kwds)
403
404
405    def install_pkg(self, name, pkg_type, install_dir):
406        '''
407        This method is a simple wrapper around the actual package
408        installation method in the Packager class. This is used
409        internally by the profilers, deps and tests code.
410        name : name of the package (ex: sleeptest, dbench etc.)
411        pkg_type : Type of the package (ex: test, dep etc.)
412        install_dir : The directory into which the source is
413                      untarred. (ex: client/profilers/<name> for profilers)
414        '''
415        if self.pkgmgr.repositories:
416            self.pkgmgr.install_pkg(name, pkg_type, self.pkgdir, install_dir)
417
418
419    def add_repository(self, repo_urls):
420        '''
421        Adds the repository locations to the job so that packages
422        can be fetched from them when needed. repo_urls must be a
423        list of URL strings.
424        Ex: job.add_repository(['http://blah1','http://blah2'])
425        '''
426        for repo_url in repo_urls:
427            self.pkgmgr.add_repository(repo_url)
428
429        # Fetch the packages' checksum file that contains the checksums
430        # of all the packages if it is not already fetched. The checksum
431        # is always fetched whenever a job is first started. This
432        # is not done in the job's constructor as we don't have the list of
433        # the repositories there (and obviously don't care about this file
434        # if we are not using the repos)
435        try:
436            checksum_file_path = os.path.join(self.pkgmgr.pkgmgr_dir,
437                                              packages.CHECKSUM_FILE)
438            self.pkgmgr.fetch_pkg(packages.CHECKSUM_FILE,
439                                  checksum_file_path, use_checksum=False)
440        except error.PackageFetchError:
441            # packaging system might not be working in this case
442            # Silently fall back to the normal case
443            pass
444
445
446    def require_gcc(self):
447        """
448        Test whether gcc is installed on the machine.
449        """
450        # check if gcc is installed on the system.
451        try:
452            utils.system('which gcc')
453        except error.CmdError:
454            raise NotAvailableError('gcc is required by this job and is '
455                                    'not available on the system')
456
457
458    def setup_dep(self, deps):
459        """Set up the dependencies for this test.
460        deps is a list of libraries required for this test.
461        """
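        # Example (hypothetical dep name): fetch and build the 'libaio' dep,
        # leaving its files under <autodir>/deps/libaio:
        #     job.setup_dep(['libaio'])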
462        # Fetch the deps from the repositories and set them up.
463        for dep in deps:
464            dep_dir = os.path.join(self.autodir, 'deps', dep)
465            # Search for the dependency in the repositories if specified,
466            # else check locally.
467            try:
468                self.install_pkg(dep, 'dep', dep_dir)
469            except error.PackageInstallError:
470                # see if the dep is there locally
471                pass
472
473            # dep_dir might not exist if it is not fetched from the repos
474            if not os.path.exists(dep_dir):
475                raise error.TestError("Dependency %s does not exist" % dep)
476
477            os.chdir(dep_dir)
478            if execfile('%s.py' % dep, {}) is None:
479                logging.info('Dependency %s successfully built', dep)
480
481
482    def _runtest(self, url, tag, timeout, args, dargs):
483        try:
484            l = lambda : test.runtest(self, url, tag, args, dargs)
485            pid = parallel.fork_start(self.resultdir, l)
486
487            if timeout:
488                logging.debug('Waiting for pid %d for %d seconds', pid, timeout)
489                parallel.fork_waitfor_timed(self.resultdir, pid, timeout)
490            else:
491                parallel.fork_waitfor(self.resultdir, pid)
492
493        except error.TestBaseException:
494            # These are already classified with an error type (exit_status)
495            raise
496        except error.JobError:
497            raise  # Caught further up and turned into an ABORT.
498        except Exception, e:
499            # Converts all other exceptions thrown by the test regardless
500            # of phase into a TestError(TestBaseException) subclass that
501            # reports them with their full stack trace.
502            raise error.UnhandledTestError(e)
503
504
505    def _run_test_base(self, url, *args, **dargs):
506        """
507        Prepares the arguments and group function shared by run_test and run_test_detail.
508
509        @param url A url that identifies the test to run.
510        @param tag An optional keyword argument that will be added to the
511            test and subdir name.
512        @param subdir_tag An optional keyword argument that will be added
513            to the subdir name.
514
515        @returns:
516                subdir: Test subdirectory
517                testname: Test name
518                group_func: Actual test run function
519                timeout: Test timeout
520        """
521        _group, testname = self.pkgmgr.get_package_name(url, 'test')
522        testname, subdir, tag = self._build_tagged_test_name(testname, dargs)
523        self._make_test_outputdir(subdir)
524
525        timeout = dargs.pop('timeout', None)
526        if timeout:
527            logging.debug('Test has timeout: %d sec.', timeout)
528
529        def log_warning(reason):
530            self.record("WARN", subdir, testname, reason)
531        @disk_usage_monitor.watch(log_warning, "/", self._max_disk_usage_rate)
532        def group_func():
533            try:
534                self._runtest(url, tag, timeout, args, dargs)
535            except error.TestBaseException, detail:
536                # The error is already classified, record it properly.
537                self.record(detail.exit_status, subdir, testname, str(detail))
538                raise
539            else:
540                self.record('GOOD', subdir, testname, 'completed successfully')
541
542        return (subdir, testname, group_func, timeout)
543
544
545    @_run_test_complete_on_exit
546    def run_test(self, url, *args, **dargs):
547        """
548        Summon a test object and run it.
549
550        @param url A url that identifies the test to run.
551        @param tag An optional keyword argument that will be added to the
552            test and subdir name.
553        @param subdir_tag An optional keyword argument that will be added
554            to the subdir name.
555
556        @returns True if the test passes, False otherwise.
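
        Example (illustrative test name and arguments):
            passed = job.run_test('sleeptest', seconds=1, tag='quick')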
557        """
558        (subdir, testname, group_func, timeout) = self._run_test_base(url,
559                                                                      *args,
560                                                                      **dargs)
561        try:
562            self._rungroup(subdir, testname, group_func, timeout)
563            return True
564        except error.TestBaseException:
565            return False
566        # Any other exception here will be given to the caller
567        #
568        # NOTE: The only exception possible from the control file here
569        # is error.JobError as _runtest() turns all others into an
570        # UnhandledTestError that is caught above.
571
572
573    @_run_test_complete_on_exit
574    def run_test_detail(self, url, *args, **dargs):
575        """
576        Summon a test object and run it, returning test status.
577
578        @param url A url that identifies the test to run.
579        @param tag An optional keyword argument that will be added to the
580            test and subdir name.
581        @param subdir_tag An optional keyword argument that will be added
582            to the subdir name.
583
584        @returns Test status
585        @see: client/common_lib/error.py, exit_status
586        """
587        (subdir, testname, group_func, timeout) = self._run_test_base(url,
588                                                                      *args,
589                                                                      **dargs)
590        try:
591            self._rungroup(subdir, testname, group_func, timeout)
592            return 'GOOD'
593        except error.TestBaseException, detail:
594            return detail.exit_status
595
596
597    def _rungroup(self, subdir, testname, function, timeout, *args, **dargs):
598        """\
599        subdir:
600                name of the group
601        testname:
602                name of the test to run, or support step
603        function:
604                subroutine to run
605        *args:
606                arguments for the function
607
608        Returns the result of the passed in function
609        """
610
611        try:
612            optional_fields = None
613            if timeout:
614                optional_fields = {}
615                optional_fields['timeout'] = timeout
616            self.record('START', subdir, testname,
617                        optional_fields=optional_fields)
618
619            self._state.set('client', 'unexpected_reboot', (subdir, testname))
620            try:
621                result = function(*args, **dargs)
622                self.record('END GOOD', subdir, testname)
623                return result
624            except error.TestBaseException, e:
625                self.record('END %s' % e.exit_status, subdir, testname)
626                raise
627            except error.JobError, e:
628                self.record('END ABORT', subdir, testname)
629                raise
630            except Exception, e:
631                # This should only ever happen due to a bug in the given
632                # function's code.  The common case of being called by
633                # run_test() will never reach this.  If a control file called
634                # run_group() itself, bugs in its function will be caught
635                # here.
636                err_msg = str(e) + '\n' + traceback.format_exc()
637                self.record('END ERROR', subdir, testname, err_msg)
638                raise
639        finally:
640            self._state.discard('client', 'unexpected_reboot')
641
642
643    def run_group(self, function, tag=None, **dargs):
644        """
645        Run a function nested within a group level.
646
647        function:
648                Callable to run.
649        tag:
650                An optional tag name for the group.  If None (default)
651                function.__name__ will be used.
652        **dargs:
653                Named arguments for the function.
654        """
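        # Example (hypothetical helper defined in a control file; 'system' is
        # available there via the control-file preamble):
        #     def sync_disks():
        #         system('sync')
        #     job.run_group(sync_disks, tag='sync')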
655        if tag:
656            name = tag
657        else:
658            name = function.__name__
659
660        try:
661            return self._rungroup(subdir=None, testname=name,
662                                  function=function, timeout=None, **dargs)
663        except (SystemExit, error.TestBaseException):
664            raise
665        # If there was a different exception, turn it into a TestError.
666        # It will be caught by step_engine or _run_step_fn.
667        except Exception, e:
668            raise error.UnhandledTestError(e)
669
670
671    def cpu_count(self):
672        return utils.count_cpus()  # use total system count
673
674
675    def start_reboot(self):
676        self.record('START', None, 'reboot')
677        self.record('GOOD', None, 'reboot.start')
678
679
680    def _record_reboot_failure(self, subdir, operation, status,
681                               running_id=None):
682        self.record("ABORT", subdir, operation, status)
683        if not running_id:
684            running_id = utils.running_os_ident()
685        kernel = {"kernel": running_id.split("::")[0]}
686        self.record("END ABORT", subdir, 'reboot', optional_fields=kernel)
687
688
689    def _check_post_reboot(self, subdir, running_id=None):
690        """
691        Perform post-boot checks, such as whether the system configuration
692        has changed across reboots (specifically, CPUs and partitions).
693
694        @param subdir: The subdir to use in the job.record call.
695        @param running_id: An optional running_id to include in the reboot
696            failure log message
697
698        @raise JobError: Raised if the current configuration does not match the
699            pre-reboot configuration.
700        """
701        # check to see if any partitions have changed
702        partition_list = partition_lib.get_partition_list(self,
703                                                          exclude_swap=False)
704        mount_info = partition_lib.get_mount_info(partition_list)
705        old_mount_info = self._state.get('client', 'mount_info')
706        if mount_info != old_mount_info:
707            new_entries = mount_info - old_mount_info
708            old_entries = old_mount_info - mount_info
709            description = ("mounted partitions are different after reboot "
710                           "(old entries: %s, new entries: %s)" %
711                           (old_entries, new_entries))
712            self._record_reboot_failure(subdir, "reboot.verify_config",
713                                        description, running_id=running_id)
714            raise error.JobError("Reboot failed: %s" % description)
715
716        # check to see if any CPUs have changed
717        cpu_count = utils.count_cpus()
718        old_count = self._state.get('client', 'cpu_count')
719        if cpu_count != old_count:
720            description = ('Number of CPUs changed after reboot '
721                           '(old count: %d, new count: %d)' %
722                           (old_count, cpu_count))
723            self._record_reboot_failure(subdir, 'reboot.verify_config',
724                                        description, running_id=running_id)
725            raise error.JobError('Reboot failed: %s' % description)
726
727
728    def partition(self, device, loop_size=0, mountpoint=None):
729        """
730        Work with a machine partition
731
732            @param device: e.g. /dev/sda2, /dev/sdb1 etc...
733            @param mountpoint: Specify a directory to mount to. If not
734                               specified, the autotest tmp directory is used.
735            @param loop_size: Size of loopback device (in MB). Defaults to 0.
736
737            @return: A L{client.bin.partition.partition} object
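
            Example (hypothetical device):
                part = job.partition('/dev/sdb1')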
738        """
739
740        if not mountpoint:
741            mountpoint = self.tmpdir
742        return partition_lib.partition(self, device, loop_size, mountpoint)
743
744    @utils.deprecated
745    def filesystem(self, device, mountpoint=None, loop_size=0):
746        """ Same as partition
747
748        @deprecated: Use partition method instead
749        """
750        return self.partition(device, loop_size, mountpoint)
751
752
753    def enable_external_logging(self):
754        pass
755
756
757    def disable_external_logging(self):
758        pass
759
760
761    def reboot_setup(self):
762        # save the partition list and mount points, as well as the cpu count
763        partition_list = partition_lib.get_partition_list(self,
764                                                          exclude_swap=False)
765        mount_info = partition_lib.get_mount_info(partition_list)
766        self._state.set('client', 'mount_info', mount_info)
767        self._state.set('client', 'cpu_count', utils.count_cpus())
768
769
770    def reboot(self):
771        self.reboot_setup()
772        self.harness.run_reboot()
773
774        # HACK: using this as a module sometimes hangs shutdown, so if it's
775        # installed unload it first
776        utils.system("modprobe -r netconsole", ignore_status=True)
777
778        # sync first, so that a sync during shutdown doesn't time out
779        utils.system("sync; sync", ignore_status=True)
780
781        utils.system("(sleep 5; reboot) </dev/null >/dev/null 2>&1 &")
782        self.quit()
783
784
785    def noop(self, text):
786        logging.info("job: noop: " + text)
787
788
789    @_run_test_complete_on_exit
790    def parallel(self, *tasklist):
791        """Run tasks in parallel"""
792
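        # Example (illustrative test names): each task is a (callable, args...)
        # tuple run in its own forked subprocess with its own status log:
        #     job.parallel((job.run_test, 'dbench'), (job.run_test, 'tbench'))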
793        pids = []
794        old_log_filename = self._logger.global_filename
795        for i, task in enumerate(tasklist):
796            assert isinstance(task, (tuple, list))
797            self._logger.global_filename = old_log_filename + (".%d" % i)
798            def task_func():
799                # stub out _record_indent with a process-local one
800                base_record_indent = self._record_indent
801                proc_local = self._job_state.property_factory(
802                    '_state', '_record_indent.%d' % os.getpid(),
803                    base_record_indent, namespace='client')
804                self.__class__._record_indent = proc_local
805                task[0](*task[1:])
806            pids.append(parallel.fork_start(self.resultdir, task_func))
807
808        old_log_path = os.path.join(self.resultdir, old_log_filename)
809        old_log = open(old_log_path, "a")
810        exceptions = []
811        for i, pid in enumerate(pids):
812            # wait for the task to finish
813            try:
814                parallel.fork_waitfor(self.resultdir, pid)
815            except Exception, e:
816                exceptions.append(e)
817            # copy the logs from the subtask into the main log
818            new_log_path = old_log_path + (".%d" % i)
819            if os.path.exists(new_log_path):
820                new_log = open(new_log_path)
821                old_log.write(new_log.read())
822                new_log.close()
823                old_log.flush()
824                os.remove(new_log_path)
825        old_log.close()
826
827        self._logger.global_filename = old_log_filename
828
829        # handle any exceptions raised by the parallel tasks
830        if exceptions:
831            msg = "%d task(s) failed in job.parallel" % len(exceptions)
832            raise error.JobError(msg)
833
834
835    def quit(self):
836        # XXX: should have a better name.
837        self.harness.run_pause()
838        raise error.JobContinue("more to come")
839
840
841    def complete(self, status):
842        """Write pending TAP reports, clean up, and exit"""
843        # write out TAP reports
844        if self._tap.do_tap_report:
845            self._tap.write()
846            self._tap._write_tap_archive()
847
848        # write out a job HTML report
849        try:
850            html_report.create_report(self.resultdir)
851        except Exception, e:
852            logging.error("Error writing job HTML report: %s", e)
853
854        # We are about to exit 'complete' so clean up the state file.
855        dest = os.path.join(self.resultdir, os.path.basename(self._state_file))
856        shutil.move(self._state_file, dest)
857
858        self.harness.run_complete()
859        self.disable_external_logging()
860        sys.exit(status)
861
862
863    def _load_state(self):
864        # grab any initial state and set up $CONTROL.state as the backing file
865        init_state_file = self.control + '.init.state'
866        self._state_file = self.control + '.state'
867        if os.path.exists(init_state_file):
868            shutil.move(init_state_file, self._state_file)
869        self._state.set_backing_file(self._state_file)
870
871        # initialize the state engine, if necessary
872        has_steps = self._state.has('client', 'steps')
873        if not self._is_continuation and has_steps:
874            raise RuntimeError('Loaded state can only contain client.steps if '
875                               'this is a continuation')
876
877        if not has_steps:
878            logging.debug('Initializing the state engine')
879            self._state.set('client', 'steps', [])
880
881
882    def handle_persistent_option(self, options, option_name):
883        """
884        Select option from command line or persistent state.
885        Store selected option to allow standalone client to continue
886        after reboot with previously selected options.
887        Priority:
888        1. explicitly specified via command line
889        2. stored in state file (if continuing job '-c')
890        3. default == None
891        """
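        # Example (hypothetical flags): with '--harness standalone' on the
        # command line this returns and stores 'standalone'; a later '-c'
        # continuation run without the flag gets 'standalone' back from the
        # state file.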
892        option = None
893        cmd_line_option = getattr(options, option_name)
894        if cmd_line_option:
895            option = cmd_line_option
896            self._state.set('client', option_name, option)
897        else:
898            stored_option = self._state.get('client', option_name, None)
899            if stored_option:
900                option = stored_option
901        logging.debug('Persistent option %s now set to %s', option_name, option)
902        return option
903
904
905    def __create_step_tuple(self, fn, args, dargs):
906        # Legacy code passes in an array where the first arg is
907        # the function or its name.
908        if isinstance(fn, list):
909            assert(len(args) == 0)
910            assert(len(dargs) == 0)
911            args = fn[1:]
912            fn = fn[0]
913        # Pickling actual functions is hairy, thus we have to call
914        # them by name.  Unfortunately, this means only functions
915        # defined globally can be used as a next step.
916        if callable(fn):
917            fn = fn.__name__
918        if not isinstance(fn, types.StringTypes):
919            raise StepError("Next steps must be functions or "
920                            "strings containing the function name")
921        ancestry = copy.copy(self._current_step_ancestry)
922        return (ancestry, fn, args, dargs)
923
924
925    def next_step_append(self, fn, *args, **dargs):
926        """Define the next step and place it at the end"""
927        steps = self._state.get('client', 'steps')
928        steps.append(self.__create_step_tuple(fn, args, dargs))
929        self._state.set('client', 'steps', steps)
930
931
932    def next_step(self, fn, *args, **dargs):
933        """Create a new step and place it after any steps added
934        while running the current step but before any steps added in
935        previous steps"""
936        steps = self._state.get('client', 'steps')
937        steps.insert(self._next_step_index,
938                     self.__create_step_tuple(fn, args, dargs))
939        self._next_step_index += 1
940        self._state.set('client', 'steps', steps)
941
942
943    def next_step_prepend(self, fn, *args, **dargs):
944        """Insert a new step, executing first"""
945        steps = self._state.get('client', 'steps')
946        steps.insert(0, self.__create_step_tuple(fn, args, dargs))
947        self._next_step_index += 1
948        self._state.set('client', 'steps', steps)
949
950
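    # Example of the step-engine methods above, as used from a control file
    # that must survive a reboot (function names are illustrative):
    #     def step_init():
    #         job.next_step(step_test)
    #         job.reboot()
    #     def step_test():
    #         job.run_test('sleeptest')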
951
952    def _run_step_fn(self, local_vars, fn, args, dargs):
953        """Run a (step) function within the given context"""
954
955        local_vars['__args'] = args
956        local_vars['__dargs'] = dargs
957        try:
958            exec('__ret = %s(*__args, **__dargs)' % fn, local_vars, local_vars)
959            return local_vars['__ret']
960        except SystemExit:
961            raise  # Send error.JobContinue and JobComplete on up to runjob.
962        except error.TestNAError, detail:
963            self.record(detail.exit_status, None, fn, str(detail))
964        except Exception, detail:
965            raise error.UnhandledJobError(detail)
966
967
968    def _create_frame(self, global_vars, ancestry, fn_name):
969        """Set up the environment like it would have been when this
970        function was first defined.
971
972        Child step engine 'implementations' must have 'return locals()'
973        at the end of their steps.  Because of this, we can call the
974        parent function and get back all child functions (i.e. those
975        defined within it).
976
977        Unfortunately, the call stack of the function calling
978        job.next_step might have been deeper than the function it
979        added.  In order to make sure that the environment is what it
980        should be, we need to then pop off the frames we built until
981        we find the frame where the function was first defined."""
982
983        # The copies ensure that the parent frames are not modified
984        # while building child frames.  This matters if we then
985        # pop some frames in the next part of this function.
986        current_frame = copy.copy(global_vars)
987        frames = [current_frame]
988        for steps_fn_name in ancestry:
989            ret = self._run_step_fn(current_frame, steps_fn_name, [], {})
990            current_frame = copy.copy(ret)
991            frames.append(current_frame)
992
993        # Walk up the stack frames until we find the place fn_name was defined.
994        while len(frames) > 2:
995            if fn_name not in frames[-2]:
996                break
997            if frames[-2][fn_name] != frames[-1][fn_name]:
998                break
999            frames.pop()
1000            ancestry.pop()
1001
1002        return (frames[-1], ancestry)
1003
1004
1005    def _add_step_init(self, local_vars, current_function):
1006        """If the function returned a dictionary that includes a
1007        function named 'step_init', prepend it to our list of steps.
1008        This will only get run the first time a function with a nested
1009        use of the step engine is run."""
1010
1011        if (isinstance(local_vars, dict) and
1012            'step_init' in local_vars and
1013            callable(local_vars['step_init'])):
1014            # The init step is a child of the function
1015            # we were just running.
1016            self._current_step_ancestry.append(current_function)
1017            self.next_step_prepend('step_init')
1018
1019
1020    def step_engine(self):
1021        """The multi-run engine used when the control file defines step_init.
1022
1023        Does the next step.
1024        """
1025
1026        # Set up the environment and then interpret the control file.
1027        # Some control files will have code outside of functions,
1028        # which means we need to have our state engine initialized
1029        # before reading in the file.
1030        global_control_vars = {'job': self,
1031                               'args': self.args}
1032        exec(JOB_PREAMBLE, global_control_vars, global_control_vars)
1033        try:
1034            execfile(self.control, global_control_vars, global_control_vars)
1035        except error.TestNAError, detail:
1036            self.record(detail.exit_status, None, self.control, str(detail))
1037        except SystemExit:
1038            raise  # Send error.JobContinue and JobComplete on up to runjob.
1039        except Exception, detail:
1040            # Syntax errors or other general Python exceptions coming out of
1041            # the top level of the control file itself go through here.
1042            raise error.UnhandledJobError(detail)
1043
1044        # If we loaded in a mid-job state file, then we presumably
1045        # know what steps we have yet to run.
1046        if not self._is_continuation:
1047            if 'step_init' in global_control_vars:
1048                self.next_step(global_control_vars['step_init'])
1049        else:
1050            # if last job failed due to unexpected reboot, record it as fail
1051            # so harness gets called
1052            last_job = self._state.get('client', 'unexpected_reboot', None)
1053            if last_job:
1054                subdir, testname = last_job
1055                self.record('FAIL', subdir, testname, 'unexpected reboot')
1056                self.record('END FAIL', subdir, testname)
1057
1058        # Iterate through the steps.  If we reboot, we'll simply
1059        # continue iterating on the next step.
1060        while len(self._state.get('client', 'steps')) > 0:
1061            steps = self._state.get('client', 'steps')
1062            (ancestry, fn_name, args, dargs) = steps.pop(0)
1063            self._state.set('client', 'steps', steps)
1064
1065            self._next_step_index = 0
1066            ret = self._create_frame(global_control_vars, ancestry, fn_name)
1067            local_vars, self._current_step_ancestry = ret
1068            local_vars = self._run_step_fn(local_vars, fn_name, args, dargs)
1069            self._add_step_init(local_vars, fn_name)
1070
1071
1072    def add_sysinfo_command(self, command, logfile=None, on_every_test=False):
1073        self._add_sysinfo_loggable(sysinfo.command(command, logf=logfile),
1074                                   on_every_test)
1075
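    # Example (hypothetical command/logfile): collect 'lspci -v' output into
    # lspci.txt after every test rather than only once per boot:
    #     job.add_sysinfo_command('lspci -v', logfile='lspci.txt',
    #                             on_every_test=True)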
1076
1077    def add_sysinfo_logfile(self, file, on_every_test=False):
1078        self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test)
1079
1080
1081    def _add_sysinfo_loggable(self, loggable, on_every_test):
1082        if on_every_test:
1083            self.sysinfo.test_loggables.add(loggable)
1084        else:
1085            self.sysinfo.boot_loggables.add(loggable)
1086        self._save_sysinfo_state()
1087
1088
1089    def _load_sysinfo_state(self):
1090        state = self._state.get('client', 'sysinfo', None)
1091        if state:
1092            self.sysinfo.deserialize(state)
1093
1094
1095    def _save_sysinfo_state(self):
1096        state = self.sysinfo.serialize()
1097        self._state.set('client', 'sysinfo', state)
1098
1099
1100class disk_usage_monitor:
1101    def __init__(self, logging_func, device, max_mb_per_hour):
1102        self.func = logging_func
1103        self.device = device
1104        self.max_mb_per_hour = max_mb_per_hour
1105
1106
1107    def start(self):
1108        self.initial_space = utils.freespace(self.device)
1109        self.start_time = time.time()
1110
1111
1112    def stop(self):
1113        # if no maximum usage rate was set, we don't need to
1114        # generate any warnings
1115        if not self.max_mb_per_hour:
1116            return
1117
1118        final_space = utils.freespace(self.device)
1119        used_space = self.initial_space - final_space
1120        stop_time = time.time()
1121        total_time = stop_time - self.start_time
1122        # round up the time to one minute, to keep extremely short
1123        # tests from generating false positives due to short, badly
1124        # timed bursts of activity
1125        total_time = max(total_time, 60.0)
1126
1127        # determine the usage rate
1128        bytes_per_sec = used_space / total_time
1129        mb_per_sec = bytes_per_sec / 1024**2
1130        mb_per_hour = mb_per_sec * 60 * 60
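        # e.g. 300 MB consumed over a (clamped) 360 s window is ~0.83 MB/s,
        # i.e. 3000 MB/hour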
1131
1132        if mb_per_hour > self.max_mb_per_hour:
1133            msg = ("disk space on %s was consumed at a rate of %.2f MB/hour")
1134            msg %= (self.device, mb_per_hour)
1135            self.func(msg)
1136
1137
1138    @classmethod
1139    def watch(cls, *monitor_args, **monitor_dargs):
1140        """ Generic decorator to wrap a function call with the
1141        standard create-monitor -> start -> call -> stop idiom."""
1142        def decorator(func):
1143            def watched_func(*args, **dargs):
1144                monitor = cls(*monitor_args, **monitor_dargs)
1145                monitor.start()
1146                try:
1147                    func(*args, **dargs)
1148                finally:
1149                    monitor.stop()
1150            return watched_func
1151        return decorator
1152
1153
1154def runjob(control, drop_caches, options):
1155    """
1156    Run a job using the given control file.
1157
1158    This is the main interface to this module.
1159
1160    @see base_job.__init__ for parameter info.
1161    """
1162    control = os.path.abspath(control)
1163    state = control + '.state'
1164    # Ensure state file is cleaned up before the job starts to run if autotest
1165    # is not running with the --continue flag
1166    if not options.cont and os.path.isfile(state):
1167        logging.debug('Cleaning up previously found state file')
1168        os.remove(state)
1169
1170    # instantiate the job object ready for the control file.
1171    myjob = None
1172    try:
1173        # Check that the control file is valid
1174        if not os.path.exists(control):
1175            raise error.JobError(control + ": control file not found")
1176
1177        # When continuing, the job is complete when there is no
1178        # state file; ensure we don't try to continue.
1179        if options.cont and not os.path.exists(state):
1180            raise error.JobComplete("all done")
1181
1182        myjob = job(control=control, drop_caches=drop_caches, options=options)
1183
1184        # Load in the user's control file; it may do any one of:
1185        #  1) execute in toto
1186        #  2) define steps, and select the first via next_step()
1187        myjob.step_engine()
1188
1189    except error.JobContinue:
1190        sys.exit(5)
1191
1192    except error.JobComplete:
1193        sys.exit(1)
1194
1195    except error.JobError, instance:
1196        logging.error("JOB ERROR: " + str(instance))
1197        if myjob:
1198            command = None
1199            if len(instance.args) > 1:
1200                command = instance.args[1]
1201                myjob.record('ABORT', None, command, str(instance))
1202            myjob.record('END ABORT', None, None, str(instance))
1203            assert myjob._record_indent == 0
1204            myjob.complete(1)
1205        else:
1206            sys.exit(1)
1207
1208    except Exception, e:
1209        # NOTE: job._run_step_fn and job.step_engine will turn things into
1210        # a JobError for us.  If we get here, it's likely an autotest bug.
1211        msg = str(e) + '\n' + traceback.format_exc()
1212        logging.critical("JOB ERROR (autotest bug?): " + msg)
1213        if myjob:
1214            myjob.record('END ABORT', None, None, msg)
1215            assert myjob._record_indent == 0
1216            myjob.complete(1)
1217        else:
1218            sys.exit(1)
1219
1220    # If we get here, then we assume the job is complete and good.
1221    myjob.record('END GOOD', None, None)
1222    assert myjob._record_indent == 0
1223
1224    myjob.complete(0)
1225
1226
1227class job(base_client_job):
1228
1229    def __init__(self, *args, **kwargs):
1230        base_client_job.__init__(self, *args, **kwargs)
1231
1232
1233    def run_test(self, url, *args, **dargs):
1234        log_pauser = cros_logging.LogRotationPauser()
1235        passed = False
1236        try:
1237            log_pauser.begin()
1238            passed = base_client_job.run_test(self, url, *args, **dargs)
1239            if not passed:
1240                # Save the VM state immediately after the test failure.
1241                # This is a NOOP if the test isn't running in a VM or
1242                # if the VM is not properly configured to save state.
1243                _group, testname = self.pkgmgr.get_package_name(url, 'test')
1244                now = datetime.now().strftime('%I:%M:%S.%f')
1245                checkpoint_name = '%s-%s' % (testname, now)
1246                utils.save_vm_state(checkpoint_name)
1247        finally:
1248            log_pauser.end()
1249        return passed
1250
1251
1252    def reboot(self):
1253        self.reboot_setup()
1254        self.harness.run_reboot()
1255
1256        # sync first, so that a sync during shutdown doesn't time out
1257        utils.system('sync; sync', ignore_status=True)
1258
1259        utils.system('reboot </dev/null >/dev/null 2>&1 &')
1260        self.quit()
1261
1262
1263    def require_gcc(self):
1264        return False
1265