1"""The main job wrapper
2
3This is the core infrastructure.
4
5Copyright Andy Whitcroft, Martin J. Bligh 2006
6"""
7
8# pylint: disable=missing-docstring
9
10import copy
11from datetime import datetime
12import getpass
13import glob
14import logging
15import os
16import re
17import shutil
18import sys
19import time
20import traceback
21import types
22import weakref
23
24import common
25from autotest_lib.client.bin import client_logging_config
26from autotest_lib.client.bin import harness
27from autotest_lib.client.bin import local_host
28from autotest_lib.client.bin import parallel
29from autotest_lib.client.bin import partition as partition_lib
30from autotest_lib.client.bin import profilers
31from autotest_lib.client.bin import sysinfo
32from autotest_lib.client.bin import test
33from autotest_lib.client.bin import utils
34from autotest_lib.client.common_lib import barrier
35from autotest_lib.client.common_lib import base_job
36from autotest_lib.client.common_lib import control_data
37from autotest_lib.client.common_lib import error
38from autotest_lib.client.common_lib import global_config
39from autotest_lib.client.common_lib import logging_manager
40from autotest_lib.client.common_lib import packages
41from autotest_lib.client.cros import cros_logging
42from autotest_lib.client.tools import html_report
43
44GLOBAL_CONFIG = global_config.global_config
45
46LAST_BOOT_TAG = object()
47JOB_PREAMBLE = """
48from autotest_lib.client.common_lib.error import *
49from autotest_lib.client.bin.utils import *
50"""
51
52
53class StepError(error.AutotestError):
54    pass
55
56class NotAvailableError(error.AutotestError):
57    pass
58
59
60
61def _run_test_complete_on_exit(f):
62    """Decorator for job methods that automatically calls
63    self.harness.run_test_complete when the method exits, if appropriate."""
64    def wrapped(self, *args, **dargs):
65        try:
66            return f(self, *args, **dargs)
67        finally:
68            if self._logger.global_filename == 'status':
69                self.harness.run_test_complete()
70                if self.drop_caches:
71                    utils.drop_caches()
72    wrapped.__name__ = f.__name__
73    wrapped.__doc__ = f.__doc__
74    wrapped.__dict__.update(f.__dict__)
75    return wrapped
76
77
78class status_indenter(base_job.status_indenter):
79    """Provide a status indenter that is backed by job._record_prefix."""
80    def __init__(self, job_):
81        self._job = weakref.proxy(job_)  # avoid a circular reference
82
83
84    @property
85    def indent(self):
86        return self._job._record_indent
87
88
89    def increment(self):
90        self._job._record_indent += 1
91
92
93    def decrement(self):
94        self._job._record_indent -= 1
95
96
97class base_client_job(base_job.base_job):
98    """The client-side concrete implementation of base_job.
99
100    Optional properties provided by this implementation:
101        control
102        harness
103    """
104
105    _WARNING_DISABLE_DELAY = 5
106
107    # _record_indent is a persistent property, but only on the client
108    _job_state = base_job.base_job._job_state
109    _record_indent = _job_state.property_factory(
110        '_state', '_record_indent', 0, namespace='client')
111    _max_disk_usage_rate = _job_state.property_factory(
112        '_state', '_max_disk_usage_rate', 0.0, namespace='client')
113
114
115    def __init__(self, control, options, drop_caches=True):
116        """
117        Prepare a client side job object.
118
119        @param control: The control file (pathname of).
120        @param options: an object which includes:
121                jobtag: The job tag string (eg "default").
122                cont: If this is the continuation of this job.
123                harness_type: An alternative server harness.  [None]
124                use_external_logging: If true, the enable_external_logging
125                          method will be called during construction.  [False]
126        @param drop_caches: If true, utils.drop_caches() is called before and
127                between all tests.  [True]
128        """
129        super(base_client_job, self).__init__(options=options)
130        self._pre_record_init(control, options)
131        try:
132            self._post_record_init(control, options, drop_caches)
133        except Exception, err:
134            self.record(
135                    'ABORT', None, None,'client.bin.job.__init__ failed: %s' %
136                    str(err))
137            raise
138
139
140    @classmethod
141    def _get_environ_autodir(cls):
142        return os.environ['AUTODIR']
143
144
145    @classmethod
146    def _find_base_directories(cls):
147        """
148        Determine locations of autodir and clientdir (which are the same)
149        using os.environ. Serverdir does not exist in this context.
150        """
151        autodir = clientdir = cls._get_environ_autodir()
152        return autodir, clientdir, None
153
154
155    @classmethod
156    def _parse_args(cls, args):
157        return re.findall("[^\s]*?['|\"].*?['|\"]|[^\s]+", args)
158
159
160    def _find_resultdir(self, options):
161        """
162        Determine the directory for storing results. On a client this is
163        always <autodir>/results/<tag>, where tag is passed in on the command
164        line as an option.
165        """
166        output_dir_config = GLOBAL_CONFIG.get_config_value('CLIENT',
167                                                           'output_dir',
168                                                            default="")
169        if options.output_dir:
170            basedir = options.output_dir
171        elif output_dir_config:
172            basedir = output_dir_config
173        else:
174            basedir = self.autodir
175
176        return os.path.join(basedir, 'results', options.tag)
177
178
179    def _get_status_logger(self):
180        """Return a reference to the status logger."""
181        return self._logger
182
183
184    def _pre_record_init(self, control, options):
185        """
186        Initialization function that should peform ONLY the required
187        setup so that the self.record() method works.
188
189        As of now self.record() needs self.resultdir, self._group_level,
190        self.harness and of course self._logger.
191        """
192        if not options.cont:
193            self._cleanup_debugdir_files()
194            self._cleanup_results_dir()
195
196        logging_manager.configure_logging(
197            client_logging_config.ClientLoggingConfig(),
198            results_dir=self.resultdir,
199            verbose=options.verbose)
200        logging.info('Writing results to %s', self.resultdir)
201
202        # init_group_level needs the state
203        self.control = os.path.realpath(control)
204        self._is_continuation = options.cont
205        self._current_step_ancestry = []
206        self._next_step_index = 0
207        self._load_state()
208
209        _harness = self.handle_persistent_option(options, 'harness')
210        _harness_args = self.handle_persistent_option(options, 'harness_args')
211
212        self.harness = harness.select(_harness, self, _harness_args)
213
214        if self.control:
215            parsed_control = control_data.parse_control(
216                    self.control, raise_warnings=False)
217            self.fast = parsed_control.fast
218
219        # set up the status logger
220        def client_job_record_hook(entry):
221            msg_tag = ''
222            if '.' in self._logger.global_filename:
223                msg_tag = self._logger.global_filename.split('.', 1)[1]
224            # send the entry to the job harness
225            message = '\n'.join([entry.message] + entry.extra_message_lines)
226            rendered_entry = self._logger.render_entry(entry)
227            self.harness.test_status_detail(entry.status_code, entry.subdir,
228                                            entry.operation, message, msg_tag,
229                                            entry.fields)
230            self.harness.test_status(rendered_entry, msg_tag)
231            # send the entry to stdout, if it's enabled
232            logging.info(rendered_entry)
233        self._logger = base_job.status_logger(
234            self, status_indenter(self), record_hook=client_job_record_hook)
235
236
237    def _post_record_init(self, control, options, drop_caches):
238        """
239        Perform job initialization not required by self.record().
240        """
241        self._init_drop_caches(drop_caches)
242
243        self._init_packages()
244
245        self.sysinfo = sysinfo.sysinfo(self.resultdir)
246        self._load_sysinfo_state()
247
248        if not options.cont:
249            download = os.path.join(self.testdir, 'download')
250            if not os.path.exists(download):
251                os.mkdir(download)
252
253            shutil.copyfile(self.control,
254                            os.path.join(self.resultdir, 'control'))
255
256        self.control = control
257
258        self.logging = logging_manager.get_logging_manager(
259                manage_stdout_and_stderr=True, redirect_fds=True)
260        self.logging.start_logging()
261
262        self.profilers = profilers.profilers(self)
263
264        self.machines = [options.hostname]
265        self.machine_dict_list = [{'hostname' : options.hostname}]
266        # Client side tests should always run the same whether or not they are
267        # running in the lab.
268        self.in_lab = False
269        self.hosts = set([local_host.LocalHost(hostname=options.hostname)])
270
271        self.args = []
272        if options.args:
273            self.args = self._parse_args(options.args)
274
275        if options.user:
276            self.user = options.user
277        else:
278            self.user = getpass.getuser()
279
280        self.sysinfo.log_per_reboot_data()
281
282        if not options.cont:
283            self.record('START', None, None)
284
285        self.harness.run_start()
286
287        if options.log:
288            self.enable_external_logging()
289
290        self.num_tests_run = None
291        self.num_tests_failed = None
292
293        self.warning_loggers = None
294        self.warning_manager = None
295
296
297    def _init_drop_caches(self, drop_caches):
298        """
299        Perform the drop caches initialization.
300        """
301        self.drop_caches_between_iterations = (
302                                    GLOBAL_CONFIG.get_config_value('CLIENT',
303                                    'drop_caches_between_iterations',
304                                    type=bool, default=True))
305        self.drop_caches = drop_caches
306        if self.drop_caches:
307            utils.drop_caches()
308
309
310    def _init_packages(self):
311        """
312        Perform the packages support initialization.
313        """
314        self.pkgmgr = packages.PackageManager(
315            self.autodir, run_function_dargs={'timeout':3600})
316
317
318    def _cleanup_results_dir(self):
319        """Delete everything in resultsdir"""
320        assert os.path.exists(self.resultdir)
321        list_files = glob.glob('%s/*' % self.resultdir)
322        for f in list_files:
323            if os.path.isdir(f):
324                shutil.rmtree(f)
325            elif os.path.isfile(f):
326                os.remove(f)
327
328
329    def _cleanup_debugdir_files(self):
330        """
331        Delete any leftover debugdir files
332        """
333        list_files = glob.glob("/tmp/autotest_results_dir.*")
334        for f in list_files:
335            os.remove(f)
336
337
338    def disable_warnings(self, warning_type):
339        self.record("INFO", None, None,
340                    "disabling %s warnings" % warning_type,
341                    {"warnings.disable": warning_type})
342        time.sleep(self._WARNING_DISABLE_DELAY)
343
344
345    def enable_warnings(self, warning_type):
346        time.sleep(self._WARNING_DISABLE_DELAY)
347        self.record("INFO", None, None,
348                    "enabling %s warnings" % warning_type,
349                    {"warnings.enable": warning_type})
350
351
352    def monitor_disk_usage(self, max_rate):
353        """\
354        Signal that the job should monitor disk space usage on /
355        and generate a warning if a test uses up disk space at a
356        rate exceeding 'max_rate'.
357
358        Parameters:
359             max_rate - the maximium allowed rate of disk consumption
360                        during a test, in MB/hour, or 0 to indicate
361                        no limit.
362        """
363        self._max_disk_usage_rate = max_rate
364
365
366    def control_get(self):
367        return self.control
368
369
370    def control_set(self, control):
371        self.control = os.path.abspath(control)
372
373
374    def harness_select(self, which, harness_args):
375        self.harness = harness.select(which, self, harness_args)
376
377
378    def setup_dirs(self, results_dir, tmp_dir):
379        if not tmp_dir:
380            tmp_dir = os.path.join(self.tmpdir, 'build')
381        if not os.path.exists(tmp_dir):
382            os.mkdir(tmp_dir)
383        if not os.path.isdir(tmp_dir):
384            e_msg = "Temp dir (%s) is not a dir - args backwards?" % self.tmpdir
385            raise ValueError(e_msg)
386
387        # We label the first build "build" and then subsequent ones
388        # as "build.2", "build.3", etc. Whilst this is a little bit
389        # inconsistent, 99.9% of jobs will only have one build
390        # (that's not done as kernbench, sparse, or buildtest),
391        # so it works out much cleaner. One of life's compromises.
392        if not results_dir:
393            results_dir = os.path.join(self.resultdir, 'build')
394            i = 2
395            while os.path.exists(results_dir):
396                results_dir = os.path.join(self.resultdir, 'build.%d' % i)
397                i += 1
398        if not os.path.exists(results_dir):
399            os.mkdir(results_dir)
400
401        return (results_dir, tmp_dir)
402
403
404    def barrier(self, *args, **kwds):
405        """Create a barrier object"""
406        return barrier.barrier(*args, **kwds)
407
408
409    def install_pkg(self, name, pkg_type, install_dir):
410        '''
411        This method is a simple wrapper around the actual package
412        installation method in the Packager class. This is used
413        internally by the profilers, deps and tests code.
414        name : name of the package (ex: sleeptest, dbench etc.)
415        pkg_type : Type of the package (ex: test, dep etc.)
416        install_dir : The directory in which the source is actually
417                      untarred into. (ex: client/profilers/<name> for profilers)
418        '''
419        if self.pkgmgr.repositories:
420            self.pkgmgr.install_pkg(name, pkg_type, self.pkgdir, install_dir)
421
422
423    def add_repository(self, repo_urls):
424        '''
425        Adds the repository locations to the job so that packages
426        can be fetched from them when needed. The repository list
427        needs to be a string list
428        Ex: job.add_repository(['http://blah1','http://blah2'])
429        '''
430        for repo_url in repo_urls:
431            self.pkgmgr.add_repository(repo_url)
432
433        # Fetch the packages' checksum file that contains the checksums
434        # of all the packages if it is not already fetched. The checksum
435        # is always fetched whenever a job is first started. This
436        # is not done in the job's constructor as we don't have the list of
437        # the repositories there (and obviously don't care about this file
438        # if we are not using the repos)
439        try:
440            checksum_file_path = os.path.join(self.pkgmgr.pkgmgr_dir,
441                                              packages.CHECKSUM_FILE)
442            self.pkgmgr.fetch_pkg(packages.CHECKSUM_FILE,
443                                  checksum_file_path, use_checksum=False)
444        except error.PackageFetchError:
445            # packaging system might not be working in this case
446            # Silently fall back to the normal case
447            pass
448
449
450    def require_gcc(self):
451        """
452        Test whether gcc is installed on the machine.
453        """
454        # check if gcc is installed on the system.
455        try:
456            utils.system('which gcc')
457        except error.CmdError:
458            raise NotAvailableError('gcc is required by this job and is '
459                                    'not available on the system')
460
461
462    def setup_dep(self, deps):
463        """Set up the dependencies for this test.
464        deps is a list of libraries required for this test.
465        """
466        # Fetch the deps from the repositories and set them up.
467        for dep in deps:
468            dep_dir = os.path.join(self.autodir, 'deps', dep)
469            # Search for the dependency in the repositories if specified,
470            # else check locally.
471            try:
472                self.install_pkg(dep, 'dep', dep_dir)
473            except error.PackageInstallError:
474                # see if the dep is there locally
475                pass
476
477            # dep_dir might not exist if it is not fetched from the repos
478            if not os.path.exists(dep_dir):
479                raise error.TestError("Dependency %s does not exist" % dep)
480
481            os.chdir(dep_dir)
482            if execfile('%s.py' % dep, {}) is None:
483                logging.info('Dependency %s successfuly built', dep)
484
485
486    def _runtest(self, url, tag, timeout, args, dargs):
487        try:
488            l = lambda : test.runtest(self, url, tag, args, dargs)
489            pid = parallel.fork_start(self.resultdir, l)
490
491            self._forkwait(pid, timeout)
492
493        except error.TestBaseException:
494            # These are already classified with an error type (exit_status)
495            raise
496        except error.JobError:
497            raise  # Caught further up and turned into an ABORT.
498        except Exception, e:
499            # Converts all other exceptions thrown by the test regardless
500            # of phase into a TestError(TestBaseException) subclass that
501            # reports them with their full stack trace.
502            raise error.UnhandledTestError(e)
503
504    def _forkwait(self, pid, timeout=None):
505        """Wait for the given pid to complete
506
507        @param pid (int) process id to wait for
508        @param timeout (int) seconds to wait before timing out the process"""
509        if timeout:
510            logging.debug('Waiting for pid %d for %d seconds', pid, timeout)
511            parallel.fork_waitfor_timed(self.resultdir, pid, timeout)
512        else:
513            logging.debug('Waiting for pid %d', pid)
514            parallel.fork_waitfor(self.resultdir, pid)
515        logging.info('pid %d completed', pid)
516
517
518    def _run_test_base(self, url, *args, **dargs):
519        """
520        Prepares arguments and run functions to run_test and run_test_detail.
521
522        @param url A url that identifies the test to run.
523        @param tag An optional keyword argument that will be added to the
524            test and subdir name.
525        @param subdir_tag An optional keyword argument that will be added
526            to the subdir name.
527
528        @returns:
529                subdir: Test subdirectory
530                testname: Test name
531                group_func: Actual test run function
532                timeout: Test timeout
533        """
534        _group, testname = self.pkgmgr.get_package_name(url, 'test')
535        testname, subdir, tag = self._build_tagged_test_name(testname, dargs)
536        self._make_test_outputdir(subdir)
537
538        timeout = dargs.pop('timeout', None)
539        if timeout:
540            logging.debug('Test has timeout: %d sec.', timeout)
541
542        def log_warning(reason):
543            self.record("WARN", subdir, testname, reason)
544        @disk_usage_monitor.watch(log_warning, "/", self._max_disk_usage_rate)
545        def group_func():
546            try:
547                self._runtest(url, tag, timeout, args, dargs)
548            except error.TestBaseException, detail:
549                # The error is already classified, record it properly.
550                self.record(detail.exit_status, subdir, testname, str(detail))
551                raise
552            else:
553                self.record('GOOD', subdir, testname, 'completed successfully')
554
555        return (subdir, testname, group_func, timeout)
556
557
558    @_run_test_complete_on_exit
559    def run_test(self, url, *args, **dargs):
560        """
561        Summon a test object and run it.
562
563        @param url A url that identifies the test to run.
564        @param tag An optional keyword argument that will be added to the
565            test and subdir name.
566        @param subdir_tag An optional keyword argument that will be added
567            to the subdir name.
568
569        @returns True if the test passes, False otherwise.
570        """
571        (subdir, testname, group_func, timeout) = self._run_test_base(url,
572                                                                      *args,
573                                                                      **dargs)
574        try:
575            self._rungroup(subdir, testname, group_func, timeout)
576            return True
577        except error.TestBaseException:
578            return False
579        # Any other exception here will be given to the caller
580        #
581        # NOTE: The only exception possible from the control file here
582        # is error.JobError as _runtest() turns all others into an
583        # UnhandledTestError that is caught above.
584
585
586    def stage_control_file(self, url):
587        """
588        Install the test package and return the control file path.
589
590        @param url The name of the test, e.g. dummy_Pass.  This is the
591            string passed to run_test in the client test control file:
592            job.run_test('dummy_Pass')
593            This name can also be something like 'camera_HAL3.jea',
594            which corresponds to a test package containing multiple
595            control files, each with calls to:
596            job.run_test('camera_HAL3', **opts)
597
598        @returns Absolute path to the control file for the test.
599        """
600        testname, _, _tag = url.partition('.')
601        bindir = os.path.join(self.testdir, testname)
602        self.install_pkg(testname, 'test', bindir)
603        return _locate_test_control_file(bindir, url)
604
605
606    @_run_test_complete_on_exit
607    def run_test_detail(self, url, *args, **dargs):
608        """
609        Summon a test object and run it, returning test status.
610
611        @param url A url that identifies the test to run.
612        @param tag An optional keyword argument that will be added to the
613            test and subdir name.
614        @param subdir_tag An optional keyword argument that will be added
615            to the subdir name.
616
617        @returns Test status
618        @see: client/common_lib/error.py, exit_status
619        """
620        (subdir, testname, group_func, timeout) = self._run_test_base(url,
621                                                                      *args,
622                                                                      **dargs)
623        try:
624            self._rungroup(subdir, testname, group_func, timeout)
625            return 'GOOD'
626        except error.TestBaseException, detail:
627            return detail.exit_status
628
629
630    def _rungroup(self, subdir, testname, function, timeout, *args, **dargs):
631        """\
632        subdir:
633                name of the group
634        testname:
635                name of the test to run, or support step
636        function:
637                subroutine to run
638        *args:
639                arguments for the function
640
641        Returns the result of the passed in function
642        """
643
644        try:
645            optional_fields = None
646            if timeout:
647                optional_fields = {}
648                optional_fields['timeout'] = timeout
649            self.record('START', subdir, testname,
650                        optional_fields=optional_fields)
651
652            self._state.set('client', 'unexpected_reboot', (subdir, testname))
653            try:
654                result = function(*args, **dargs)
655                self.record('END GOOD', subdir, testname)
656                return result
657            except error.TestBaseException, e:
658                self.record('END %s' % e.exit_status, subdir, testname)
659                raise
660            except error.JobError, e:
661                self.record('END ABORT', subdir, testname)
662                raise
663            except Exception, e:
664                # This should only ever happen due to a bug in the given
665                # function's code.  The common case of being called by
666                # run_test() will never reach this.  If a control file called
667                # run_group() itself, bugs in its function will be caught
668                # here.
669                err_msg = str(e) + '\n' + traceback.format_exc()
670                self.record('END ERROR', subdir, testname, err_msg)
671                raise
672        finally:
673            self._state.discard('client', 'unexpected_reboot')
674
675
676    def run_group(self, function, tag=None, **dargs):
677        """
678        Run a function nested within a group level.
679
680        function:
681                Callable to run.
682        tag:
683                An optional tag name for the group.  If None (default)
684                function.__name__ will be used.
685        **dargs:
686                Named arguments for the function.
687        """
688        if tag:
689            name = tag
690        else:
691            name = function.__name__
692
693        try:
694            return self._rungroup(subdir=None, testname=name,
695                                  function=function, timeout=None, **dargs)
696        except (SystemExit, error.TestBaseException):
697            raise
698        # If there was a different exception, turn it into a TestError.
699        # It will be caught by step_engine or _run_step_fn.
700        except Exception, e:
701            raise error.UnhandledTestError(e)
702
703
704    def cpu_count(self):
705        return utils.count_cpus()  # use total system count
706
707
708    def start_reboot(self):
709        self.record('START', None, 'reboot')
710        self.record('GOOD', None, 'reboot.start')
711
712
713    def _record_reboot_failure(self, subdir, operation, status,
714                               running_id=None):
715        self.record("ABORT", subdir, operation, status)
716        if not running_id:
717            running_id = utils.running_os_ident()
718        kernel = {"kernel": running_id.split("::")[0]}
719        self.record("END ABORT", subdir, 'reboot', optional_fields=kernel)
720
721
722    def _check_post_reboot(self, subdir, running_id=None):
723        """
724        Function to perform post boot checks such as if the system configuration
725        has changed across reboots (specifically, CPUs and partitions).
726
727        @param subdir: The subdir to use in the job.record call.
728        @param running_id: An optional running_id to include in the reboot
729            failure log message
730
731        @raise JobError: Raised if the current configuration does not match the
732            pre-reboot configuration.
733        """
734        # check to see if any partitions have changed
735        partition_list = partition_lib.get_partition_list(self,
736                                                          exclude_swap=False)
737        mount_info = partition_lib.get_mount_info(partition_list)
738        old_mount_info = self._state.get('client', 'mount_info')
739        if mount_info != old_mount_info:
740            new_entries = mount_info - old_mount_info
741            old_entries = old_mount_info - mount_info
742            description = ("mounted partitions are different after reboot "
743                           "(old entries: %s, new entries: %s)" %
744                           (old_entries, new_entries))
745            self._record_reboot_failure(subdir, "reboot.verify_config",
746                                        description, running_id=running_id)
747            raise error.JobError("Reboot failed: %s" % description)
748
749        # check to see if any CPUs have changed
750        cpu_count = utils.count_cpus()
751        old_count = self._state.get('client', 'cpu_count')
752        if cpu_count != old_count:
753            description = ('Number of CPUs changed after reboot '
754                           '(old count: %d, new count: %d)' %
755                           (old_count, cpu_count))
756            self._record_reboot_failure(subdir, 'reboot.verify_config',
757                                        description, running_id=running_id)
758            raise error.JobError('Reboot failed: %s' % description)
759
760
761    def partition(self, device, loop_size=0, mountpoint=None):
762        """
763        Work with a machine partition
764
765            @param device: e.g. /dev/sda2, /dev/sdb1 etc...
766            @param mountpoint: Specify a directory to mount to. If not specified
767                               autotest tmp directory will be used.
768            @param loop_size: Size of loopback device (in MB). Defaults to 0.
769
770            @return: A L{client.bin.partition.partition} object
771        """
772
773        if not mountpoint:
774            mountpoint = self.tmpdir
775        return partition_lib.partition(self, device, loop_size, mountpoint)
776
777    @utils.deprecated
778    def filesystem(self, device, mountpoint=None, loop_size=0):
779        """ Same as partition
780
781        @deprecated: Use partition method instead
782        """
783        return self.partition(device, loop_size, mountpoint)
784
785
786    def enable_external_logging(self):
787        pass
788
789
790    def disable_external_logging(self):
791        pass
792
793
794    def reboot_setup(self):
795        # save the partition list and mount points, as well as the cpu count
796        partition_list = partition_lib.get_partition_list(self,
797                                                          exclude_swap=False)
798        mount_info = partition_lib.get_mount_info(partition_list)
799        self._state.set('client', 'mount_info', mount_info)
800        self._state.set('client', 'cpu_count', utils.count_cpus())
801
802
803    def reboot(self):
804        self.reboot_setup()
805        self.harness.run_reboot()
806
807        # HACK: using this as a module sometimes hangs shutdown, so if it's
808        # installed unload it first
809        utils.system("modprobe -r netconsole", ignore_status=True)
810
811        # sync first, so that a sync during shutdown doesn't time out
812        utils.system("sync; sync", ignore_status=True)
813
814        utils.system("(sleep 5; reboot) </dev/null >/dev/null 2>&1 &")
815        self.quit()
816
817
818    def noop(self, text):
819        logging.info("job: noop: " + text)
820
821
822    @_run_test_complete_on_exit
823    def parallel(self, *tasklist, **kwargs):
824        """Run tasks in parallel"""
825
826        pids = []
827        old_log_filename = self._logger.global_filename
828        for i, task in enumerate(tasklist):
829            assert isinstance(task, (tuple, list))
830            self._logger.global_filename = old_log_filename + (".%d" % i)
831            def task_func():
832                # stub out _record_indent with a process-local one
833                base_record_indent = self._record_indent
834                proc_local = self._job_state.property_factory(
835                    '_state', '_record_indent.%d' % os.getpid(),
836                    base_record_indent, namespace='client')
837                self.__class__._record_indent = proc_local
838                task[0](*task[1:])
839            forked_pid = parallel.fork_start(self.resultdir, task_func)
840            logging.info('Just forked pid %d', forked_pid)
841            pids.append(forked_pid)
842
843        old_log_path = os.path.join(self.resultdir, old_log_filename)
844        old_log = open(old_log_path, "a")
845        exceptions = []
846        for i, pid in enumerate(pids):
847            # wait for the task to finish
848            try:
849                self._forkwait(pid, kwargs.get('timeout'))
850            except Exception, e:
851                logging.info('pid %d completed with error', pid)
852                exceptions.append(e)
853            # copy the logs from the subtask into the main log
854            new_log_path = old_log_path + (".%d" % i)
855            if os.path.exists(new_log_path):
856                new_log = open(new_log_path)
857                old_log.write(new_log.read())
858                new_log.close()
859                old_log.flush()
860                os.remove(new_log_path)
861        old_log.close()
862
863        self._logger.global_filename = old_log_filename
864
865        # handle any exceptions raised by the parallel tasks
866        if exceptions:
867            msg = "%d task(s) failed in job.parallel" % len(exceptions)
868            raise error.JobError(msg)
869
870
871    def quit(self):
872        # XXX: should have a better name.
873        self.harness.run_pause()
874        raise error.JobContinue("more to come")
875
876
877    def complete(self, status):
878        """Write pending reports, clean up, and exit"""
879        # write out a job HTML report
880        try:
881            html_report.create_report(self.resultdir)
882        except Exception, e:
883            logging.error("Error writing job HTML report: %s", e)
884
885        # We are about to exit 'complete' so clean up the control file.
886        dest = os.path.join(self.resultdir, os.path.basename(self._state_file))
887        shutil.move(self._state_file, dest)
888
889        self.harness.run_complete()
890        self.disable_external_logging()
891        sys.exit(status)
892
893
894    def _load_state(self):
895        # grab any initial state and set up $CONTROL.state as the backing file
896        init_state_file = self.control + '.init.state'
897        self._state_file = self.control + '.state'
898        if os.path.exists(init_state_file):
899            shutil.move(init_state_file, self._state_file)
900        self._state.set_backing_file(self._state_file)
901
902        # initialize the state engine, if necessary
903        has_steps = self._state.has('client', 'steps')
904        if not self._is_continuation and has_steps:
905            raise RuntimeError('Loaded state can only contain client.steps if '
906                               'this is a continuation')
907
908        if not has_steps:
909            logging.debug('Initializing the state engine')
910            self._state.set('client', 'steps', [])
911
912
913    def handle_persistent_option(self, options, option_name):
914        """
915        Select option from command line or persistent state.
916        Store selected option to allow standalone client to continue
917        after reboot with previously selected options.
918        Priority:
919        1. explicitly specified via command line
920        2. stored in state file (if continuing job '-c')
921        3. default == None
922        """
923        option = None
924        cmd_line_option = getattr(options, option_name)
925        if cmd_line_option:
926            option = cmd_line_option
927            self._state.set('client', option_name, option)
928        else:
929            stored_option = self._state.get('client', option_name, None)
930            if stored_option:
931                option = stored_option
932        logging.debug('Persistent option %s now set to %s', option_name, option)
933        return option
934
935
936    def __create_step_tuple(self, fn, args, dargs):
937        # Legacy code passes in an array where the first arg is
938        # the function or its name.
939        if isinstance(fn, list):
940            assert(len(args) == 0)
941            assert(len(dargs) == 0)
942            args = fn[1:]
943            fn = fn[0]
944        # Pickling actual functions is hairy, thus we have to call
945        # them by name.  Unfortunately, this means only functions
946        # defined globally can be used as a next step.
947        if callable(fn):
948            fn = fn.__name__
949        if not isinstance(fn, types.StringTypes):
950            raise StepError("Next steps must be functions or "
951                            "strings containing the function name")
952        ancestry = copy.copy(self._current_step_ancestry)
953        return (ancestry, fn, args, dargs)
954
955
956    def next_step_append(self, fn, *args, **dargs):
957        """Define the next step and place it at the end"""
958        steps = self._state.get('client', 'steps')
959        steps.append(self.__create_step_tuple(fn, args, dargs))
960        self._state.set('client', 'steps', steps)
961
962
963    def next_step(self, fn, *args, **dargs):
964        """Create a new step and place it after any steps added
965        while running the current step but before any steps added in
966        previous steps"""
967        steps = self._state.get('client', 'steps')
968        steps.insert(self._next_step_index,
969                     self.__create_step_tuple(fn, args, dargs))
970        self._next_step_index += 1
971        self._state.set('client', 'steps', steps)
972
973
974    def next_step_prepend(self, fn, *args, **dargs):
975        """Insert a new step, executing first"""
976        steps = self._state.get('client', 'steps')
977        steps.insert(0, self.__create_step_tuple(fn, args, dargs))
978        self._next_step_index += 1
979        self._state.set('client', 'steps', steps)
980
981
982
983    def _run_step_fn(self, local_vars, fn, args, dargs):
984        """Run a (step) function within the given context"""
985
986        local_vars['__args'] = args
987        local_vars['__dargs'] = dargs
988        try:
989            exec('__ret = %s(*__args, **__dargs)' % fn, local_vars, local_vars)
990            return local_vars['__ret']
991        except SystemExit:
992            raise  # Send error.JobContinue and JobComplete on up to runjob.
993        except error.TestNAError, detail:
994            self.record(detail.exit_status, None, fn, str(detail))
995        except Exception, detail:
996            raise error.UnhandledJobError(detail)
997
998
999    def _create_frame(self, global_vars, ancestry, fn_name):
1000        """Set up the environment like it would have been when this
1001        function was first defined.
1002
1003        Child step engine 'implementations' must have 'return locals()'
1004        at end end of their steps.  Because of this, we can call the
1005        parent function and get back all child functions (i.e. those
1006        defined within it).
1007
1008        Unfortunately, the call stack of the function calling
1009        job.next_step might have been deeper than the function it
1010        added.  In order to make sure that the environment is what it
1011        should be, we need to then pop off the frames we built until
1012        we find the frame where the function was first defined."""
1013
1014        # The copies ensure that the parent frames are not modified
1015        # while building child frames.  This matters if we then
1016        # pop some frames in the next part of this function.
1017        current_frame = copy.copy(global_vars)
1018        frames = [current_frame]
1019        for steps_fn_name in ancestry:
1020            ret = self._run_step_fn(current_frame, steps_fn_name, [], {})
1021            current_frame = copy.copy(ret)
1022            frames.append(current_frame)
1023
1024        # Walk up the stack frames until we find the place fn_name was defined.
1025        while len(frames) > 2:
1026            if fn_name not in frames[-2]:
1027                break
1028            if frames[-2][fn_name] != frames[-1][fn_name]:
1029                break
1030            frames.pop()
1031            ancestry.pop()
1032
1033        return (frames[-1], ancestry)
1034
1035
1036    def _add_step_init(self, local_vars, current_function):
1037        """If the function returned a dictionary that includes a
1038        function named 'step_init', prepend it to our list of steps.
1039        This will only get run the first time a function with a nested
1040        use of the step engine is run."""
1041
1042        if (isinstance(local_vars, dict) and
1043            'step_init' in local_vars and
1044            callable(local_vars['step_init'])):
1045            # The init step is a child of the function
1046            # we were just running.
1047            self._current_step_ancestry.append(current_function)
1048            self.next_step_prepend('step_init')
1049
1050
1051    def step_engine(self):
1052        """The multi-run engine used when the control file defines step_init.
1053
1054        Does the next step.
1055        """
1056
1057        # Set up the environment and then interpret the control file.
1058        # Some control files will have code outside of functions,
1059        # which means we need to have our state engine initialized
1060        # before reading in the file.
1061        global_control_vars = {'job': self,
1062                               'args': self.args}
1063        exec(JOB_PREAMBLE, global_control_vars, global_control_vars)
1064        try:
1065            execfile(self.control, global_control_vars, global_control_vars)
1066        except error.TestNAError, detail:
1067            self.record(detail.exit_status, None, self.control, str(detail))
1068        except SystemExit:
1069            raise  # Send error.JobContinue and JobComplete on up to runjob.
1070        except Exception, detail:
1071            # Syntax errors or other general Python exceptions coming out of
1072            # the top level of the control file itself go through here.
1073            raise error.UnhandledJobError(detail)
1074
1075        # If we loaded in a mid-job state file, then we presumably
1076        # know what steps we have yet to run.
1077        if not self._is_continuation:
1078            if 'step_init' in global_control_vars:
1079                self.next_step(global_control_vars['step_init'])
1080        else:
1081            # if last job failed due to unexpected reboot, record it as fail
1082            # so harness gets called
1083            last_job = self._state.get('client', 'unexpected_reboot', None)
1084            if last_job:
1085                subdir, testname = last_job
1086                self.record('FAIL', subdir, testname, 'unexpected reboot')
1087                self.record('END FAIL', subdir, testname)
1088
1089        # Iterate through the steps.  If we reboot, we'll simply
1090        # continue iterating on the next step.
1091        while len(self._state.get('client', 'steps')) > 0:
1092            steps = self._state.get('client', 'steps')
1093            (ancestry, fn_name, args, dargs) = steps.pop(0)
1094            self._state.set('client', 'steps', steps)
1095
1096            self._next_step_index = 0
1097            ret = self._create_frame(global_control_vars, ancestry, fn_name)
1098            local_vars, self._current_step_ancestry = ret
1099            local_vars = self._run_step_fn(local_vars, fn_name, args, dargs)
1100            self._add_step_init(local_vars, fn_name)
1101
1102
1103    def add_sysinfo_command(self, command, logfile=None, on_every_test=False):
1104        self._add_sysinfo_loggable(sysinfo.command(command, logf=logfile),
1105                                   on_every_test)
1106
1107
1108    def add_sysinfo_logfile(self, file, on_every_test=False):
1109        self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test)
1110
1111
1112    def _add_sysinfo_loggable(self, loggable, on_every_test):
1113        if on_every_test:
1114            self.sysinfo.test_loggables.add(loggable)
1115        else:
1116            self.sysinfo.boot_loggables.add(loggable)
1117        self._save_sysinfo_state()
1118
1119
1120    def _load_sysinfo_state(self):
1121        state = self._state.get('client', 'sysinfo', None)
1122        if state:
1123            self.sysinfo.deserialize(state)
1124
1125
1126    def _save_sysinfo_state(self):
1127        state = self.sysinfo.serialize()
1128        self._state.set('client', 'sysinfo', state)
1129
1130
1131class disk_usage_monitor:
1132    def __init__(self, logging_func, device, max_mb_per_hour):
1133        self.func = logging_func
1134        self.device = device
1135        self.max_mb_per_hour = max_mb_per_hour
1136
1137
1138    def start(self):
1139        self.initial_space = utils.freespace(self.device)
1140        self.start_time = time.time()
1141
1142
1143    def stop(self):
1144        # if no maximum usage rate was set, we don't need to
1145        # generate any warnings
1146        if not self.max_mb_per_hour:
1147            return
1148
1149        final_space = utils.freespace(self.device)
1150        used_space = self.initial_space - final_space
1151        stop_time = time.time()
1152        total_time = stop_time - self.start_time
1153        # round up the time to one minute, to keep extremely short
1154        # tests from generating false positives due to short, badly
1155        # timed bursts of activity
1156        total_time = max(total_time, 60.0)
1157
1158        # determine the usage rate
1159        bytes_per_sec = used_space / total_time
1160        mb_per_sec = bytes_per_sec / 1024**2
1161        mb_per_hour = mb_per_sec * 60 * 60
1162
1163        if mb_per_hour > self.max_mb_per_hour:
1164            msg = ("disk space on %s was consumed at a rate of %.2f MB/hour")
1165            msg %= (self.device, mb_per_hour)
1166            self.func(msg)
1167
1168
1169    @classmethod
1170    def watch(cls, *monitor_args, **monitor_dargs):
1171        """ Generic decorator to wrap a function call with the
1172        standard create-monitor -> start -> call -> stop idiom."""
1173        def decorator(func):
1174            def watched_func(*args, **dargs):
1175                monitor = cls(*monitor_args, **monitor_dargs)
1176                monitor.start()
1177                try:
1178                    func(*args, **dargs)
1179                finally:
1180                    monitor.stop()
1181            return watched_func
1182        return decorator
1183
1184
1185def runjob(control, drop_caches, options):
1186    """
1187    Run a job using the given control file.
1188
1189    This is the main interface to this module.
1190
1191    @see base_job.__init__ for parameter info.
1192    """
1193    control = os.path.abspath(control)
1194    state = control + '.state'
1195    # Ensure state file is cleaned up before the job starts to run if autotest
1196    # is not running with the --continue flag
1197    if not options.cont and os.path.isfile(state):
1198        logging.debug('Cleaning up previously found state file')
1199        os.remove(state)
1200
1201    # instantiate the job object ready for the control file.
1202    myjob = None
1203    try:
1204        # Check that the control file is valid
1205        if not os.path.exists(control):
1206            raise error.JobError(control + ": control file not found")
1207
1208        # When continuing, the job is complete when there is no
1209        # state file, ensure we don't try and continue.
1210        if options.cont and not os.path.exists(state):
1211            raise error.JobComplete("all done")
1212
1213        myjob = job(control=control, drop_caches=drop_caches, options=options)
1214
1215        # Load in the users control file, may do any one of:
1216        #  1) execute in toto
1217        #  2) define steps, and select the first via next_step()
1218        myjob.step_engine()
1219
1220    except error.JobContinue:
1221        sys.exit(5)
1222
1223    except error.JobComplete:
1224        sys.exit(1)
1225
1226    except error.JobError, instance:
1227        logging.error("JOB ERROR: " + str(instance))
1228        if myjob:
1229            command = None
1230            if len(instance.args) > 1:
1231                command = instance.args[1]
1232                myjob.record('ABORT', None, command, str(instance))
1233            myjob.record('END ABORT', None, None, str(instance))
1234            assert myjob._record_indent == 0
1235            myjob.complete(1)
1236        else:
1237            sys.exit(1)
1238
1239    except Exception, e:
1240        # NOTE: job._run_step_fn and job.step_engine will turn things into
1241        # a JobError for us.  If we get here, its likely an autotest bug.
1242        msg = str(e) + '\n' + traceback.format_exc()
1243        logging.critical("JOB ERROR (autotest bug?): " + msg)
1244        if myjob:
1245            myjob.record('END ABORT', None, None, msg)
1246            assert myjob._record_indent == 0
1247            myjob.complete(1)
1248        else:
1249            sys.exit(1)
1250
1251    # If we get here, then we assume the job is complete and good.
1252    myjob.record('END GOOD', None, None)
1253    assert myjob._record_indent == 0
1254
1255    myjob.complete(0)
1256
1257
1258class job(base_client_job):
1259
1260    def __init__(self, *args, **kwargs):
1261        base_client_job.__init__(self, *args, **kwargs)
1262
1263
1264    def run_test(self, url, *args, **dargs):
1265        log_pauser = cros_logging.LogRotationPauser()
1266        passed = False
1267        try:
1268            log_pauser.begin()
1269            passed = base_client_job.run_test(self, url, *args, **dargs)
1270            if not passed:
1271                # Save the VM state immediately after the test failure.
1272                # This is a NOOP if the the test isn't running in a VM or
1273                # if the VM is not properly configured to save state.
1274                _group, testname = self.pkgmgr.get_package_name(url, 'test')
1275                now = datetime.now().strftime('%I:%M:%S.%f')
1276                checkpoint_name = '%s-%s' % (testname, now)
1277                utils.save_vm_state(checkpoint_name)
1278        finally:
1279            log_pauser.end()
1280        return passed
1281
1282
1283    def reboot(self):
1284        self.reboot_setup()
1285        self.harness.run_reboot()
1286
1287        # sync first, so that a sync during shutdown doesn't time out
1288        utils.system('sync; sync', ignore_status=True)
1289
1290        utils.system('reboot </dev/null >/dev/null 2>&1 &')
1291        self.quit()
1292
1293
1294    def require_gcc(self):
1295        return False
1296
1297
1298# TODO(ayatane): This logic should be deduplicated with
1299# server/cros/dynamic_suite/control_file_getter.py, but the server
1300# libraries are not available on clients.
1301def _locate_test_control_file(dirpath, testname):
1302    """
1303    Locate the control file for the given test.
1304
1305    @param dirpath Root directory to search.
1306    @param testname Name of test.
1307
1308    @returns Absolute path to the control file.
1309    @raise JobError: Raised if control file not found.
1310    """
1311    for dirpath, _dirnames, filenames in os.walk(dirpath):
1312        for filename in filenames:
1313            if 'control' not in filename:
1314                continue
1315            path = os.path.join(dirpath, filename)
1316            if _is_control_file_for_test(path, testname):
1317                return os.path.abspath(path)
1318    raise error.JobError(
1319            'could not find client test control file',
1320            dirpath, testname)
1321
1322
1323_NAME_PATTERN = "NAME *= *['\"]([^'\"]+)['\"]"
1324
1325
1326def _is_control_file_for_test(path, testname):
1327    with open(path) as f:
1328        for line in f:
1329            match = re.match(_NAME_PATTERN, line)
1330            if match is not None:
1331                return match.group(1) == testname
1332