# Lint as: python2, python3
"""The main job wrapper

This is the core infrastructure.

Copyright Andy Whitcroft, Martin J. Bligh 2006
"""

# pylint: disable=missing-docstring

import copy
from datetime import datetime
import getpass
import glob
import logging
import os
import re
import shutil
import sys
import time
import traceback
import types
import weakref

import six

import common
from autotest_lib.client.bin import client_logging_config
from autotest_lib.client.bin import harness
from autotest_lib.client.bin import local_host
from autotest_lib.client.bin import parallel
from autotest_lib.client.bin import partition as partition_lib
from autotest_lib.client.bin import profilers
from autotest_lib.client.bin import sysinfo
from autotest_lib.client.bin import test
from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib import barrier
from autotest_lib.client.common_lib import base_job
from autotest_lib.client.common_lib import control_data
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import logging_manager
from autotest_lib.client.common_lib import packages
from autotest_lib.client.cros import cros_logging
from autotest_lib.client.tools import html_report

GLOBAL_CONFIG = global_config.global_config

LAST_BOOT_TAG = object()
JOB_PREAMBLE = """
from autotest_lib.client.common_lib.error import *
from autotest_lib.client.bin.utils import *
"""


class StepError(error.AutotestError):
    pass

class NotAvailableError(error.AutotestError):
    pass



def _run_test_complete_on_exit(f):
    """Decorator for job methods that automatically calls
    self.harness.run_test_complete when the method exits, if appropriate."""
    def wrapped(self, *args, **dargs):
        try:
            return f(self, *args, **dargs)
        finally:
            if self._logger.global_filename == 'status':
                self.harness.run_test_complete()
                if self.drop_caches:
                    utils.drop_caches()
    wrapped.__name__ = f.__name__
    wrapped.__doc__ = f.__doc__
    wrapped.__dict__.update(f.__dict__)
    return wrapped


class status_indenter(base_job.status_indenter):
    """Provide a status indenter that is backed by job._record_prefix."""
    def __init__(self, job_):
        self._job = weakref.proxy(job_)  # avoid a circular reference


    @property
    def indent(self):
        return self._job._record_indent


    def increment(self):
        self._job._record_indent += 1


    def decrement(self):
        self._job._record_indent -= 1


class base_client_job(base_job.base_job):
    """The client-side concrete implementation of base_job.

    Optional properties provided by this implementation:
        control
        harness
    """

    _WARNING_DISABLE_DELAY = 5

    # _record_indent is a persistent property, but only on the client
    _job_state = base_job.base_job._job_state
    _record_indent = _job_state.property_factory(
        '_state', '_record_indent', 0, namespace='client')
    _max_disk_usage_rate = _job_state.property_factory(
        '_state', '_max_disk_usage_rate', 0.0, namespace='client')


    def __init__(self, control, options, drop_caches=True):
        """
        Prepare a client side job object.

        @param control: The pathname of the control file.
        @param options: an object which includes:
                jobtag: The job tag string (e.g. "default").
                cont: Whether this is a continuation of a previous job.
                harness_type: An alternative server harness.  [None]
                use_external_logging: If true, the enable_external_logging
                          method will be called during construction.  [False]
        @param drop_caches: If true, utils.drop_caches() is called before and
                between all tests.  [True]
        """
        super(base_client_job, self).__init__(options=options)
        self._pre_record_init(control, options)
        try:
            self._post_record_init(control, options, drop_caches)
        except Exception as err:
            self.record(
                    'ABORT', None, None, 'client.bin.job.__init__ failed: %s' %
                    str(err))
            raise


    @classmethod
    def _get_environ_autodir(cls):
        return os.environ['AUTODIR']


    @classmethod
    def _find_base_directories(cls):
        """
        Determine locations of autodir and clientdir (which are the same)
        using os.environ. Serverdir does not exist in this context.
        """
        autodir = clientdir = cls._get_environ_autodir()
        return autodir, clientdir, None


    @classmethod
    def _parse_args(cls, args):
        return re.findall(r"[^\s]*?['|\"].*?['|\"]|[^\s]+", args)
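
    # Illustrative note (not part of the original code): _parse_args splits
    # the --args option string on whitespace while keeping quoted chunks
    # together, e.g.
    #
    #   base_client_job._parse_args('foo bar "baz qux"')
    #   # -> ['foo', 'bar', '"baz qux"']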


    def _find_resultdir(self, options):
        """
        Determine the directory for storing results. On a client this is
        always <autodir>/results/<tag>, where tag is passed in on the command
        line as an option.
        """
        output_dir_config = GLOBAL_CONFIG.get_config_value('CLIENT',
                                                           'output_dir',
                                                           default="")
        if options.output_dir:
            basedir = options.output_dir
        elif output_dir_config:
            basedir = output_dir_config
        else:
            basedir = self.autodir

        return os.path.join(basedir, 'results', options.tag)


    def _get_status_logger(self):
        """Return a reference to the status logger."""
        return self._logger


    def _pre_record_init(self, control, options):
        """
        Initialization function that should perform ONLY the required
        setup so that the self.record() method works.

        As of now self.record() needs self.resultdir, self._group_level,
        self.harness and of course self._logger.
        """
        if not options.cont:
            self._cleanup_debugdir_files()
            self._cleanup_results_dir()

        logging_manager.configure_logging(
            client_logging_config.ClientLoggingConfig(),
            results_dir=self.resultdir,
            verbose=options.verbose)
        logging.info('Writing results to %s', self.resultdir)

        # init_group_level needs the state
        self.control = os.path.realpath(control)
        self._is_continuation = options.cont
        self._current_step_ancestry = []
        self._next_step_index = 0
        self._load_state()

        _harness = self.handle_persistent_option(options, 'harness')
        _harness_args = self.handle_persistent_option(options, 'harness_args')

        self.harness = harness.select(_harness, self, _harness_args)

        if self.control:
            parsed_control = control_data.parse_control(
                    self.control, raise_warnings=False)
            self.fast = parsed_control.fast

        # set up the status logger
        def client_job_record_hook(entry):
            msg_tag = ''
            if '.' in self._logger.global_filename:
                msg_tag = self._logger.global_filename.split('.', 1)[1]
            # send the entry to the job harness
            message = '\n'.join([entry.message] + entry.extra_message_lines)
            rendered_entry = self._logger.render_entry(entry)
            self.harness.test_status_detail(entry.status_code, entry.subdir,
                                            entry.operation, message, msg_tag,
                                            entry.fields)
            self.harness.test_status(rendered_entry, msg_tag)
            # send the entry to stdout, if it's enabled
            logging.info(rendered_entry)
        self._logger = base_job.status_logger(
            self, status_indenter(self), record_hook=client_job_record_hook)


    def _post_record_init(self, control, options, drop_caches):
        """
        Perform job initialization not required by self.record().
        """
        self._init_drop_caches(drop_caches)

        self._init_packages()

        self.sysinfo = sysinfo.sysinfo(self.resultdir)
        self._load_sysinfo_state()

        if not options.cont:
            download = os.path.join(self.testdir, 'download')
            if not os.path.exists(download):
                os.mkdir(download)

            shutil.copyfile(self.control,
                            os.path.join(self.resultdir, 'control'))

        self.control = control

        self.logging = logging_manager.get_logging_manager(
                manage_stdout_and_stderr=True, redirect_fds=True)
        self.logging.start_logging()

        self.profilers = profilers.profilers(self)

        self.machines = [options.hostname]
        self.machine_dict_list = [{'hostname' : options.hostname}]
        # Client side tests should always run the same whether or not they are
        # running in the lab.
        self.in_lab = False
        self.hosts = set([local_host.LocalHost(hostname=options.hostname)])

        self.args = []
        if options.args:
            self.args = self._parse_args(options.args)

        if options.user:
            self.user = options.user
        else:
            self.user = getpass.getuser()

        self.sysinfo.log_per_reboot_data()

        if not options.cont:
            self.record('START', None, None)

        self.harness.run_start()

        if options.log:
            self.enable_external_logging()

        self.num_tests_run = None
        self.num_tests_failed = None

        self.warning_loggers = None
        self.warning_manager = None


    def _init_drop_caches(self, drop_caches):
        """
        Perform the drop caches initialization.
        """
        self.drop_caches_between_iterations = (
                                    GLOBAL_CONFIG.get_config_value('CLIENT',
                                    'drop_caches_between_iterations',
                                    type=bool, default=True))
        self.drop_caches = drop_caches
        if self.drop_caches:
            utils.drop_caches()


    def _init_packages(self):
        """
        Perform the packages support initialization.
        """
        self.pkgmgr = packages.PackageManager(
            self.autodir, run_function_dargs={'timeout':3600})


    def _cleanup_results_dir(self):
        """Delete everything in resultsdir"""
        assert os.path.exists(self.resultdir)
        list_files = glob.glob('%s/*' % self.resultdir)
        for f in list_files:
            if os.path.isdir(f):
                shutil.rmtree(f)
            elif os.path.isfile(f):
                os.remove(f)


    def _cleanup_debugdir_files(self):
        """
        Delete any leftover debugdir files
        """
        list_files = glob.glob("/tmp/autotest_results_dir.*")
        for f in list_files:
            os.remove(f)


    def disable_warnings(self, warning_type):
        self.record("INFO", None, None,
                    "disabling %s warnings" % warning_type,
                    {"warnings.disable": warning_type})
        time.sleep(self._WARNING_DISABLE_DELAY)


    def enable_warnings(self, warning_type):
        time.sleep(self._WARNING_DISABLE_DELAY)
        self.record("INFO", None, None,
                    "enabling %s warnings" % warning_type,
                    {"warnings.enable": warning_type})


    def monitor_disk_usage(self, max_rate):
        """\
        Signal that the job should monitor disk space usage on /
        and generate a warning if a test uses up disk space at a
        rate exceeding 'max_rate'.

        Parameters:
             max_rate - the maximum allowed rate of disk consumption
                        during a test, in MB/hour, or 0 to indicate
                        no limit.
        """
        self._max_disk_usage_rate = max_rate
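
    # Illustrative control-file usage (not part of the original code): cap a
    # test's disk consumption on / at roughly 100 MB/hour so a WARN record is
    # emitted if the test exceeds it:
    #
    #   job.monitor_disk_usage(100)
    #   job.run_test('sleeptest')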


    def control_get(self):
        return self.control


    def control_set(self, control):
        self.control = os.path.abspath(control)


    def harness_select(self, which, harness_args):
        self.harness = harness.select(which, self, harness_args)


    def setup_dirs(self, results_dir, tmp_dir):
        if not tmp_dir:
            tmp_dir = os.path.join(self.tmpdir, 'build')
        if not os.path.exists(tmp_dir):
            os.mkdir(tmp_dir)
        if not os.path.isdir(tmp_dir):
            e_msg = "Temp dir (%s) is not a dir - args backwards?" % tmp_dir
            raise ValueError(e_msg)

        # We label the first build "build" and then subsequent ones
        # as "build.2", "build.3", etc. Whilst this is a little bit
        # inconsistent, 99.9% of jobs will only have one build
        # (that's not done as kernbench, sparse, or buildtest),
        # so it works out much cleaner. One of life's compromises.
        if not results_dir:
            results_dir = os.path.join(self.resultdir, 'build')
            i = 2
            while os.path.exists(results_dir):
                results_dir = os.path.join(self.resultdir, 'build.%d' % i)
                i += 1
        if not os.path.exists(results_dir):
            os.mkdir(results_dir)

        return (results_dir, tmp_dir)


    def barrier(self, *args, **kwds):
        """Create a barrier object"""
        return barrier.barrier(*args, **kwds)


    def install_pkg(self, name, pkg_type, install_dir):
        '''
        This method is a simple wrapper around the actual package
        installation method in the Packager class. This is used
        internally by the profilers, deps and tests code.
        name : name of the package (ex: sleeptest, dbench etc.)
        pkg_type : Type of the package (ex: test, dep etc.)
        install_dir : The directory into which the source is actually
                      untarred. (ex: client/profilers/<name> for profilers)
        '''
        if self.pkgmgr.repositories:
            self.pkgmgr.install_pkg(name, pkg_type, self.pkgdir, install_dir)
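
    # Illustrative usage (not part of the original code): tests and deps call
    # this indirectly, but a direct call would look like:
    #
    #   bindir = os.path.join(job.testdir, 'dbench')
    #   job.install_pkg('dbench', 'test', bindir)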


    def add_repository(self, repo_urls):
        '''
        Adds the repository locations to the job so that packages
        can be fetched from them when needed. The repository list
        needs to be a string list
        Ex: job.add_repository(['http://blah1','http://blah2'])
        '''
        for repo_url in repo_urls:
            self.pkgmgr.add_repository(repo_url)

        # Fetch the packages' checksum file that contains the checksums
        # of all the packages if it is not already fetched. The checksum
        # is always fetched whenever a job is first started. This
        # is not done in the job's constructor as we don't have the list of
        # the repositories there (and obviously don't care about this file
        # if we are not using the repos)
        try:
            checksum_file_path = os.path.join(self.pkgmgr.pkgmgr_dir,
                                              packages.CHECKSUM_FILE)
            self.pkgmgr.fetch_pkg(packages.CHECKSUM_FILE,
                                  checksum_file_path, use_checksum=False)
        except error.PackageFetchError:
            # packaging system might not be working in this case
            # Silently fall back to the normal case
            pass


    def require_gcc(self):
        """
        Test whether gcc is installed on the machine.
        """
        # check if gcc is installed on the system.
        try:
            utils.system('which gcc')
        except error.CmdError:
            raise NotAvailableError('gcc is required by this job and is '
                                    'not available on the system')


    def setup_dep(self, deps):
        """Set up the dependencies for this test.
        deps is a list of libraries required for this test.
        """
        # Fetch the deps from the repositories and set them up.
        for dep in deps:
            dep_dir = os.path.join(self.autodir, 'deps', dep)
            # Search for the dependency in the repositories if specified,
            # else check locally.
            try:
                self.install_pkg(dep, 'dep', dep_dir)
            except error.PackageInstallError:
                # see if the dep is there locally
                pass

            # dep_dir might not exist if it is not fetched from the repos
            if not os.path.exists(dep_dir):
                raise error.TestError("Dependency %s does not exist" % dep)

            os.chdir(dep_dir)
            # Run the dependency, as it could create more files needed for the
            # tests.
            # In the future this might want to be changed, as the exec always
            # returns None unless dep.py raises an error, in which case it
            # raises rather than returning.
            if eval(compile(open('%s.py' % dep, "rb").read(),
                            '%s.py' % dep, 'exec'), {}) is None:
                logging.info('Dependency %s successfully built', dep)
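
    # Illustrative usage (not part of the original code): a hypothetical
    # test's setup() step might pull in a 'libaio' dep (the dep name is only
    # an example) and then refer to the resulting directory:
    #
    #   def setup(self):
    #       self.job.setup_dep(['libaio'])
    #       libaio_dir = os.path.join(self.autodir, 'deps', 'libaio')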

    def _runtest(self, url, tag, timeout, args, dargs):
        try:
            l = lambda : test.runtest(self, url, tag, args, dargs)
            pid = parallel.fork_start(self.resultdir, l)

            self._forkwait(pid, timeout)

        except error.TestBaseException:
            # These are already classified with an error type (exit_status)
            raise
        except error.JobError:
            raise  # Caught further up and turned into an ABORT.
        except Exception as e:
            # Converts all other exceptions thrown by the test regardless
            # of phase into a TestError(TestBaseException) subclass that
            # reports them with their full stack trace.
            raise error.UnhandledTestError(e)

    def _forkwait(self, pid, timeout=None):
        """Wait for the given pid to complete

        @param pid (int) process id to wait for
        @param timeout (int) seconds to wait before timing out the process"""
        if timeout:
            logging.debug('Waiting for pid %d for %d seconds', pid, timeout)
            parallel.fork_waitfor_timed(self.resultdir, pid, timeout)
        else:
            logging.debug('Waiting for pid %d', pid)
            parallel.fork_waitfor(self.resultdir, pid)
        logging.info('pid %d completed', pid)


    def _run_test_base(self, url, *args, **dargs):
        """
        Prepare arguments and functions for run_test and run_test_detail.

        @param url A url that identifies the test to run.
        @param tag An optional keyword argument that will be added to the
            test and subdir name.
        @param subdir_tag An optional keyword argument that will be added
            to the subdir name.

        @returns:
                subdir: Test subdirectory
                testname: Test name
                group_func: Actual test run function
                timeout: Test timeout
        """
        _group, testname = self.pkgmgr.get_package_name(url, 'test')
        testname, subdir, tag = self._build_tagged_test_name(testname, dargs)
        self._make_test_outputdir(subdir)

        timeout = dargs.pop('timeout', None)
        if timeout:
            logging.debug('Test has timeout: %d sec.', timeout)

        def log_warning(reason):
            self.record("WARN", subdir, testname, reason)
        @disk_usage_monitor.watch(log_warning, "/", self._max_disk_usage_rate)
        def group_func():
            try:
                self._runtest(url, tag, timeout, args, dargs)
            except error.TestBaseException as detail:
                # The error is already classified, record it properly.
                self.record(detail.exit_status, subdir, testname, str(detail))
                raise
            else:
                self.record('GOOD', subdir, testname, 'completed successfully')

        return (subdir, testname, group_func, timeout)


    @_run_test_complete_on_exit
    def run_test(self, url, *args, **dargs):
        """
        Summon a test object and run it.

        @param url A url that identifies the test to run.
        @param tag An optional keyword argument that will be added to the
            test and subdir name.
        @param subdir_tag An optional keyword argument that will be added
            to the subdir name.

        @returns True if the test passes, False otherwise.
        """
        (subdir, testname, group_func, timeout) = self._run_test_base(url,
                                                                      *args,
                                                                      **dargs)
        try:
            self._rungroup(subdir, testname, group_func, timeout)
            return True
        except error.TestBaseException:
            return False
        # Any other exception here will be given to the caller
        #
        # NOTE: The only exception possible from the control file here
        # is error.JobError as _runtest() turns all others into an
        # UnhandledTestError that is caught above.
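
    # Illustrative control-file usage (not part of the original code; the
    # test names and keyword argument are only examples):
    #
    #   if job.run_test('sleeptest', tag='short', seconds=1):
    #       job.run_test('dbench')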


    def stage_control_file(self, url):
        """
        Install the test package and return the control file path.

        @param url The name of the test, e.g. login_LoginSuccess.  This is the
            string passed to run_test in the client test control file:
            job.run_test('login_LoginSuccess')
            This name can also be something like 'camera_HAL3.jea',
            which corresponds to a test package containing multiple
            control files, each with calls to:
            job.run_test('camera_HAL3', **opts)

        @returns Absolute path to the control file for the test.
        """
        testname, _, _tag = url.partition('.')
        bindir = os.path.join(self.testdir, testname)
        self.install_pkg(testname, 'test', bindir)
        return _locate_test_control_file(bindir, url)


    @_run_test_complete_on_exit
    def run_test_detail(self, url, *args, **dargs):
        """
        Summon a test object and run it, returning test status.

        @param url A url that identifies the test to run.
        @param tag An optional keyword argument that will be added to the
            test and subdir name.
        @param subdir_tag An optional keyword argument that will be added
            to the subdir name.

        @returns Test status
        @see: client/common_lib/error.py, exit_status
        """
        (subdir, testname, group_func, timeout) = self._run_test_base(url,
                                                                      *args,
                                                                      **dargs)
        try:
            self._rungroup(subdir, testname, group_func, timeout)
            return 'GOOD'
        except error.TestBaseException as detail:
            return detail.exit_status


    def _rungroup(self, subdir, testname, function, timeout, *args, **dargs):
        """\
        subdir:
                name of the group
        testname:
                name of the test to run, or support step
        function:
                subroutine to run
        *args:
                arguments for the function

        Returns the result of the passed in function
        """

        try:
            optional_fields = None
            if timeout:
                optional_fields = {}
                optional_fields['timeout'] = timeout
            self.record('START', subdir, testname,
                        optional_fields=optional_fields)

            self._state.set('client', 'unexpected_reboot', (subdir, testname))
            try:
                result = function(*args, **dargs)
                self.record('END GOOD', subdir, testname)
                return result
            except error.TestBaseException as e:
                self.record('END %s' % e.exit_status, subdir, testname)
                raise
            except error.JobError as e:
                self.record('END ABORT', subdir, testname)
                raise
            except Exception as e:
                # This should only ever happen due to a bug in the given
                # function's code.  The common case of being called by
                # run_test() will never reach this.  If a control file called
                # run_group() itself, bugs in its function will be caught
                # here.
                err_msg = str(e) + '\n' + traceback.format_exc()
                self.record('END ERROR', subdir, testname, err_msg)
                raise
        finally:
            self._state.discard('client', 'unexpected_reboot')


    def run_group(self, function, tag=None, **dargs):
        """
        Run a function nested within a group level.

        function:
                Callable to run.
        tag:
                An optional tag name for the group.  If None (default)
                function.__name__ will be used.
        **dargs:
                Named arguments for the function.
        """
        if tag:
            name = tag
        else:
            name = function.__name__

        try:
            return self._rungroup(subdir=None, testname=name,
                                  function=function, timeout=None, **dargs)
        except (SystemExit, error.TestBaseException):
            raise
        # If there was a different exception, turn it into a TestError.
        # It will be caught by step_engine or _run_step_fn.
        except Exception as e:
            raise error.UnhandledTestError(e)
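
    # Illustrative control-file usage (not part of the original code): wrap a
    # locally defined helper in its own status group:
    #
    #   def setup_phase():
    #       job.noop('group body runs here')
    #
    #   job.run_group(setup_phase, tag='setup')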


    def cpu_count(self):
        return utils.count_cpus()  # use total system count


    def start_reboot(self):
        self.record('START', None, 'reboot')
        self.record('GOOD', None, 'reboot.start')


    def _record_reboot_failure(self, subdir, operation, status,
                               running_id=None):
        self.record("ABORT", subdir, operation, status)
        if not running_id:
            running_id = utils.running_os_ident()
        kernel = {"kernel": running_id.split("::")[0]}
        self.record("END ABORT", subdir, 'reboot', optional_fields=kernel)


    def _check_post_reboot(self, subdir, running_id=None):
        """
        Perform post-boot checks, such as whether the system configuration
        has changed across reboots (specifically, CPUs and partitions).

        @param subdir: The subdir to use in the job.record call.
        @param running_id: An optional running_id to include in the reboot
            failure log message

        @raise JobError: Raised if the current configuration does not match the
            pre-reboot configuration.
        """
        # check to see if any partitions have changed
        partition_list = partition_lib.get_partition_list(self,
                                                          exclude_swap=False)
        mount_info = partition_lib.get_mount_info(partition_list)
        old_mount_info = self._state.get('client', 'mount_info')
        if mount_info != old_mount_info:
            new_entries = mount_info - old_mount_info
            old_entries = old_mount_info - mount_info
            description = ("mounted partitions are different after reboot "
                           "(old entries: %s, new entries: %s)" %
                           (old_entries, new_entries))
            self._record_reboot_failure(subdir, "reboot.verify_config",
                                        description, running_id=running_id)
            raise error.JobError("Reboot failed: %s" % description)

        # check to see if any CPUs have changed
        cpu_count = utils.count_cpus()
        old_count = self._state.get('client', 'cpu_count')
        if cpu_count != old_count:
            description = ('Number of CPUs changed after reboot '
                           '(old count: %d, new count: %d)' %
                           (old_count, cpu_count))
            self._record_reboot_failure(subdir, 'reboot.verify_config',
                                        description, running_id=running_id)
            raise error.JobError('Reboot failed: %s' % description)


    def partition(self, device, loop_size=0, mountpoint=None):
        """
        Work with a machine partition

            @param device: e.g. /dev/sda2, /dev/sdb1 etc...
            @param mountpoint: Specify a directory to mount to. If not
                               specified, the autotest tmp directory is used.
            @param loop_size: Size of loopback device (in MB). Defaults to 0.

            @return: A L{client.bin.partition.partition} object
        """

        if not mountpoint:
            mountpoint = self.tmpdir
        return partition_lib.partition(self, device, loop_size, mountpoint)
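
    # Illustrative usage (not part of the original code; the device path is
    # only an example):
    #
    #   part = job.partition('/dev/sdb1', mountpoint=job.tmpdir)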

    @utils.deprecated
    def filesystem(self, device, mountpoint=None, loop_size=0):
        """ Same as partition

        @deprecated: Use partition method instead
        """
        return self.partition(device, loop_size, mountpoint)


    def enable_external_logging(self):
        pass


    def disable_external_logging(self):
        pass


    def reboot_setup(self):
        # save the partition list and mount points, as well as the cpu count
        partition_list = partition_lib.get_partition_list(self,
                                                          exclude_swap=False)
        mount_info = partition_lib.get_mount_info(partition_list)
        self._state.set('client', 'mount_info', mount_info)
        self._state.set('client', 'cpu_count', utils.count_cpus())


    def reboot(self):
        self.reboot_setup()
        self.harness.run_reboot()

        # HACK: the netconsole kernel module sometimes hangs shutdown, so
        # unload it first if it is loaded
        utils.system("modprobe -r netconsole", ignore_status=True)

        # sync first, so that a sync during shutdown doesn't time out
        utils.system("sync; sync", ignore_status=True)

        utils.system("(sleep 5; reboot) </dev/null >/dev/null 2>&1 &")
        self.quit()


    def noop(self, text):
        logging.info("job: noop: " + text)


    @_run_test_complete_on_exit
    def parallel(self, *tasklist, **kwargs):
        """Run tasks in parallel"""

        pids = []
        old_log_filename = self._logger.global_filename
        for i, task in enumerate(tasklist):
            assert isinstance(task, (tuple, list))
            self._logger.global_filename = old_log_filename + (".%d" % i)
            def task_func():
                # stub out _record_indent with a process-local one
                base_record_indent = self._record_indent
                proc_local = self._job_state.property_factory(
                    '_state', '_record_indent.%d' % os.getpid(),
                    base_record_indent, namespace='client')
                self.__class__._record_indent = proc_local
                task[0](*task[1:])
            forked_pid = parallel.fork_start(self.resultdir, task_func)
            logging.info('Just forked pid %d', forked_pid)
            pids.append(forked_pid)

        old_log_path = os.path.join(self.resultdir, old_log_filename)
        old_log = open(old_log_path, "a")
        exceptions = []
        for i, pid in enumerate(pids):
            # wait for the task to finish
            try:
                self._forkwait(pid, kwargs.get('timeout'))
            except Exception as e:
                logging.info('pid %d completed with error', pid)
                exceptions.append(e)
            # copy the logs from the subtask into the main log
            new_log_path = old_log_path + (".%d" % i)
            if os.path.exists(new_log_path):
                new_log = open(new_log_path)
                old_log.write(new_log.read())
                new_log.close()
                old_log.flush()
                os.remove(new_log_path)
        old_log.close()

        self._logger.global_filename = old_log_filename

        # handle any exceptions raised by the parallel tasks
        if exceptions:
            msg = "%d task(s) failed in job.parallel" % len(exceptions)
            raise error.JobError(msg)


    def quit(self):
        # XXX: should have a better name.
        self.harness.run_pause()
        raise error.JobContinue("more to come")


    def complete(self, status):
        """Write pending reports, clean up, and exit"""
        # We are about to exit 'complete' so clean up the control file.
        dest = os.path.join(self.resultdir, os.path.basename(self._state_file))
        shutil.move(self._state_file, dest)

        self.harness.run_complete()
        self.disable_external_logging()
        sys.exit(status)


    def _load_state(self):
        # grab any initial state and set up $CONTROL.state as the backing file
        init_state_file = self.control + '.init.state'
        self._state_file = self.control + '.state'
        if os.path.exists(init_state_file):
            shutil.move(init_state_file, self._state_file)
        self._state.set_backing_file(self._state_file)

        # initialize the state engine, if necessary
        has_steps = self._state.has('client', 'steps')
        if not self._is_continuation and has_steps:
            raise RuntimeError('Loaded state can only contain client.steps if '
                               'this is a continuation')

        if not has_steps:
            logging.debug('Initializing the state engine')
            self._state.set('client', 'steps', [])


    def handle_persistent_option(self, options, option_name):
        """
        Select option from command line or persistent state.
        Store selected option to allow standalone client to continue
        after reboot with previously selected options.
        Priority:
        1. explicitly specified via command line
        2. stored in state file (if continuing job '-c')
        3. default == None
        """
        option = None
        cmd_line_option = getattr(options, option_name)
        if cmd_line_option:
            option = cmd_line_option
            self._state.set('client', option_name, option)
        else:
            stored_option = self._state.get('client', option_name, None)
            if stored_option:
                option = stored_option
        logging.debug('Persistent option %s now set to %s', option_name, option)
        return option


    def __create_step_tuple(self, fn, args, dargs):
        # Legacy code passes in an array where the first arg is
        # the function or its name.
        if isinstance(fn, list):
            assert(len(args) == 0)
            assert(len(dargs) == 0)
            args = fn[1:]
            fn = fn[0]
        # Pickling actual functions is hairy, thus we have to call
        # them by name.  Unfortunately, this means only functions
        # defined globally can be used as a next step.
        if callable(fn):
            fn = fn.__name__
        if not isinstance(fn, six.string_types):
            raise StepError("Next steps must be functions or "
                            "strings containing the function name")
        ancestry = copy.copy(self._current_step_ancestry)
        return (ancestry, fn, args, dargs)


    def next_step_append(self, fn, *args, **dargs):
        """Define the next step and place it at the end"""
        steps = self._state.get('client', 'steps')
        steps.append(self.__create_step_tuple(fn, args, dargs))
        self._state.set('client', 'steps', steps)


    def next_step(self, fn, *args, **dargs):
        """Create a new step and place it after any steps added
        while running the current step but before any steps added in
        previous steps"""
        steps = self._state.get('client', 'steps')
        steps.insert(self._next_step_index,
                     self.__create_step_tuple(fn, args, dargs))
        self._next_step_index += 1
        self._state.set('client', 'steps', steps)


    def next_step_prepend(self, fn, *args, **dargs):
        """Insert a new step, executing first"""
        steps = self._state.get('client', 'steps')
        steps.insert(0, self.__create_step_tuple(fn, args, dargs))
        self._next_step_index += 1
        self._state.set('client', 'steps', steps)
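
    # Illustrative control-file usage of the step engine (not part of the
    # original code; the test name and argument are only examples): define
    # step_init() at the top level of a control file and queue further steps
    # that survive a reboot:
    #
    #   def step_init():
    #       job.next_step(step_after_reboot)
    #       job.reboot()
    #
    #   def step_after_reboot():
    #       job.run_test('sleeptest', seconds=1)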



    def _run_step_fn(self, local_vars, fn, args, dargs):
        """Run a (step) function within the given context"""

        local_vars['__args'] = args
        local_vars['__dargs'] = dargs
        try:
            exec('__ret = %s(*__args, **__dargs)' % fn, local_vars, local_vars)
            return local_vars['__ret']
        except SystemExit:
            raise  # Send error.JobContinue and JobComplete on up to runjob.
        except error.TestNAError as detail:
            self.record(detail.exit_status, None, fn, str(detail))
        except Exception as detail:
            raise error.UnhandledJobError(detail)


    def _create_frame(self, global_vars, ancestry, fn_name):
        """Set up the environment like it would have been when this
        function was first defined.

        Child step engine 'implementations' must have 'return locals()'
        at the end of their steps.  Because of this, we can call the
        parent function and get back all child functions (i.e. those
        defined within it).

        Unfortunately, the call stack of the function calling
        job.next_step might have been deeper than the function it
        added.  In order to make sure that the environment is what it
        should be, we need to then pop off the frames we built until
        we find the frame where the function was first defined."""

        # The copies ensure that the parent frames are not modified
        # while building child frames.  This matters if we then
        # pop some frames in the next part of this function.
        current_frame = copy.copy(global_vars)
        frames = [current_frame]
        for steps_fn_name in ancestry:
            ret = self._run_step_fn(current_frame, steps_fn_name, [], {})
            current_frame = copy.copy(ret)
            frames.append(current_frame)

        # Walk up the stack frames until we find the place fn_name was defined.
        while len(frames) > 2:
            if fn_name not in frames[-2]:
                break
            if frames[-2][fn_name] != frames[-1][fn_name]:
                break
            frames.pop()
            ancestry.pop()

        return (frames[-1], ancestry)


    def _add_step_init(self, local_vars, current_function):
        """If the function returned a dictionary that includes a
        function named 'step_init', prepend it to our list of steps.
        This will only get run the first time a function with a nested
        use of the step engine is run."""

        if (isinstance(local_vars, dict) and
            'step_init' in local_vars and
            callable(local_vars['step_init'])):
            # The init step is a child of the function
            # we were just running.
            self._current_step_ancestry.append(current_function)
            self.next_step_prepend('step_init')


    def step_engine(self):
        """The multi-run engine used when the control file defines step_init.

        Does the next step.
        """

        # Set up the environment and then interpret the control file.
        # Some control files will have code outside of functions,
        # which means we need to have our state engine initialized
        # before reading in the file.
        global_control_vars = {'job': self,
                               'args': self.args}
        exec(JOB_PREAMBLE, global_control_vars, global_control_vars)
        try:
            exec(compile(open(self.control, "rb").read(), self.control, 'exec'),
                 global_control_vars, global_control_vars)
        except error.TestNAError as detail:
            self.record(detail.exit_status, None, self.control, str(detail))
        except SystemExit:
            raise  # Send error.JobContinue and JobComplete on up to runjob.
        except Exception as detail:
            # Syntax errors or other general Python exceptions coming out of
            # the top level of the control file itself go through here.
            raise error.UnhandledJobError(detail)

        # If we loaded in a mid-job state file, then we presumably
        # know what steps we have yet to run.
        if not self._is_continuation:
            if 'step_init' in global_control_vars:
                self.next_step(global_control_vars['step_init'])
        else:
            # if last job failed due to unexpected reboot, record it as fail
            # so harness gets called
            last_job = self._state.get('client', 'unexpected_reboot', None)
            if last_job:
                subdir, testname = last_job
                self.record('FAIL', subdir, testname, 'unexpected reboot')
                self.record('END FAIL', subdir, testname)

        # Iterate through the steps.  If we reboot, we'll simply
        # continue iterating on the next step.
        while len(self._state.get('client', 'steps')) > 0:
            steps = self._state.get('client', 'steps')
            (ancestry, fn_name, args, dargs) = steps.pop(0)
            self._state.set('client', 'steps', steps)

            self._next_step_index = 0
            ret = self._create_frame(global_control_vars, ancestry, fn_name)
            local_vars, self._current_step_ancestry = ret
            local_vars = self._run_step_fn(local_vars, fn_name, args, dargs)
            self._add_step_init(local_vars, fn_name)


    def add_sysinfo_command(self, command, logfile=None, on_every_test=False):
        self._add_sysinfo_loggable(sysinfo.command(command, logf=logfile),
                                   on_every_test)


    def add_sysinfo_logfile(self, file, on_every_test=False):
        self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test)
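
    # Illustrative control-file usage (not part of the original code; the
    # command and path are only examples):
    #
    #   job.add_sysinfo_command('lspci -v', logfile='lspci.txt',
    #                           on_every_test=True)
    #   job.add_sysinfo_logfile('/var/log/messages')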


    def _add_sysinfo_loggable(self, loggable, on_every_test):
        if on_every_test:
            self.sysinfo.test_loggables.add(loggable)
        else:
            self.sysinfo.boot_loggables.add(loggable)
        self._save_sysinfo_state()


    def _load_sysinfo_state(self):
        state = self._state.get('client', 'sysinfo', None)
        if state:
            self.sysinfo.deserialize(state)


    def _save_sysinfo_state(self):
        state = self.sysinfo.serialize()
        self._state.set('client', 'sysinfo', state)


class disk_usage_monitor:
    def __init__(self, logging_func, device, max_mb_per_hour):
        self.func = logging_func
        self.device = device
        self.max_mb_per_hour = max_mb_per_hour


    def start(self):
        self.initial_space = utils.freespace(self.device)
        self.start_time = time.time()


    def stop(self):
        # if no maximum usage rate was set, we don't need to
        # generate any warnings
        if not self.max_mb_per_hour:
            return

        final_space = utils.freespace(self.device)
        used_space = self.initial_space - final_space
        stop_time = time.time()
        total_time = stop_time - self.start_time
        # round up the time to one minute, to keep extremely short
        # tests from generating false positives due to short, badly
        # timed bursts of activity
        total_time = max(total_time, 60.0)

        # determine the usage rate
        bytes_per_sec = used_space / total_time
        mb_per_sec = bytes_per_sec / 1024**2
        mb_per_hour = mb_per_sec * 60 * 60

        if mb_per_hour > self.max_mb_per_hour:
            msg = ("disk space on %s was consumed at a rate of %.2f MB/hour")
            msg %= (self.device, mb_per_hour)
            self.func(msg)


    @classmethod
    def watch(cls, *monitor_args, **monitor_dargs):
        """ Generic decorator to wrap a function call with the
        standard create-monitor -> start -> call -> stop idiom."""
        def decorator(func):
            def watched_func(*args, **dargs):
                monitor = cls(*monitor_args, **monitor_dargs)
                monitor.start()
                try:
                    func(*args, **dargs)
                finally:
                    monitor.stop()
            return watched_func
        return decorator
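
# Illustrative sketch (not part of the original code): the same decorator
# idiom that _run_test_base uses for its group function, with example
# arguments (a 500 MB/hour cap on /, logging.warning as the callback):
#
#   @disk_usage_monitor.watch(logging.warning, "/", 500)
#   def do_work():
#       pass  # exceeding the cap triggers the warning callback on exit
#
#   do_work()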


def runjob(control, drop_caches, options):
    """
    Run a job using the given control file.

    This is the main interface to this module.

    @see base_job.__init__ for parameter info.
    """
    control = os.path.abspath(control)
    state = control + '.state'
    # Ensure state file is cleaned up before the job starts to run if autotest
    # is not running with the --continue flag
    if not options.cont and os.path.isfile(state):
        logging.debug('Cleaning up previously found state file')
        os.remove(state)

    # instantiate the job object ready for the control file.
    myjob = None
    try:
        # Check that the control file is valid
        if not os.path.exists(control):
            raise error.JobError(control + ": control file not found")

        # When continuing, the job is complete when there is no
        # state file, ensure we don't try and continue.
        if options.cont and not os.path.exists(state):
            raise error.JobComplete("all done")

        myjob = job(control=control, drop_caches=drop_caches, options=options)

        # Load in the user's control file; it may do any one of:
        #  1) execute in toto
        #  2) define steps, and select the first via next_step()
        myjob.step_engine()

    except error.JobContinue:
        sys.exit(5)

    except error.JobComplete:
        sys.exit(1)

    except error.JobError as instance:
        logging.error("JOB ERROR: " + str(instance))
        if myjob:
            command = None
            if len(instance.args) > 1:
                command = instance.args[1]
                myjob.record('ABORT', None, command, str(instance))
            myjob.record('END ABORT', None, None, str(instance))
            assert myjob._record_indent == 0
            myjob.complete(1)
        else:
            sys.exit(1)

    except Exception as e:
        # NOTE: job._run_step_fn and job.step_engine will turn things into
        # a JobError for us.  If we get here, it's likely an autotest bug.
        msg = str(e) + '\n' + traceback.format_exc()
        logging.critical("JOB ERROR (autotest bug?): " + msg)
        if myjob:
            myjob.record('END ABORT', None, None, msg)
            assert myjob._record_indent == 0
            myjob.complete(1)
        else:
            sys.exit(1)

    # If we get here, then we assume the job is complete and good.
    myjob.record('END GOOD', None, None)
    assert myjob._record_indent == 0

    myjob.complete(0)


class job(base_client_job):

    def __init__(self, *args, **kwargs):
        base_client_job.__init__(self, *args, **kwargs)


    def run_test(self, url, *args, **dargs):
        log_pauser = cros_logging.LogRotationPauser()
        passed = False
        try:
            log_pauser.begin()
            passed = base_client_job.run_test(self, url, *args, **dargs)
            if not passed:
                # Save the VM state immediately after the test failure.
                # This is a NOOP if the test isn't running in a VM or
                # if the VM is not properly configured to save state.
                _group, testname = self.pkgmgr.get_package_name(url, 'test')
                now = datetime.now().strftime('%I:%M:%S.%f')
                checkpoint_name = '%s-%s' % (testname, now)
                utils.save_vm_state(checkpoint_name)
        finally:
            log_pauser.end()
        return passed


    def reboot(self):
        self.reboot_setup()
        self.harness.run_reboot()

        # sync first, so that a sync during shutdown doesn't time out
        utils.system('sync; sync', ignore_status=True)

        utils.system('reboot </dev/null >/dev/null 2>&1 &')
        self.quit()


    def require_gcc(self):
        return False


# TODO(ayatane): This logic should be deduplicated with
# server/cros/dynamic_suite/control_file_getter.py, but the server
# libraries are not available on clients.
def _locate_test_control_file(dirpath, testname):
    """
    Locate the control file for the given test.

    @param dirpath Root directory to search.
    @param testname Name of test.

    @returns Absolute path to the control file.
    @raise JobError: Raised if control file not found.
    """
    for dirpath, _dirnames, filenames in os.walk(dirpath):
        for filename in filenames:
            if 'control' not in filename:
                continue
            path = os.path.join(dirpath, filename)
            if _is_control_file_for_test(path, testname):
                return os.path.abspath(path)
    raise error.JobError(
            'could not find client test control file',
            dirpath, testname)


_NAME_PATTERN = "NAME *= *['\"]([^'\"]+)['\"]"


def _is_control_file_for_test(path, testname):
    with open(path) as f:
        for line in f:
            match = re.match(_NAME_PATTERN, line)
            if match is not None:
                return match.group(1) == testname
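

# Illustrative note (not part of the original code): a control file is
# considered a match when its NAME line names the requested test, e.g. a
# control file beginning with
#
#   NAME = 'login_LoginSuccess'
#
# matches testname 'login_LoginSuccess'. For dotted names such as
# 'camera_HAL3.jea', the NAME value must be that exact dotted string.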