# Shell class for a test, inherited by all individual tests
#
# Methods:
#       __init__        initialise
#       initialize      run once for each job
#       setup           run once for each new version of the test installed
#       run             run the test (wrapped by job.run_test())
#
# Data:
#       job             backreference to the job this test instance is part of
#       outputdir       eg. results/<job>/<testname.tag>
#       resultsdir      eg. results/<job>/<testname.tag>/results
#       profdir         eg. results/<job>/<testname.tag>/profiling
#       debugdir        eg. results/<job>/<testname.tag>/debug
#       bindir          eg. tests/<test>
#       src             eg. tests/<test>/src
#       tmpdir          eg. tmp/<tempname>_<testname.tag>
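#
# A minimal usage sketch (hypothetical test for illustration; the test name
# and its argument are not part of this module):
#
#     import time
#     from autotest_lib.client.bin import test
#
#     class sleeptest(test.test):
#         version = 1
#
#         def run_once(self, seconds=1):
#             time.sleep(seconds)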

#pylint: disable=C0111

import fcntl, json, os, re, sys, shutil, stat, tempfile, time, traceback
import logging

from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib import error


class base_test(object):
    preserve_srcdir = False
    network_destabilizing = False

    def __init__(self, job, bindir, outputdir):
        self.job = job
        self.pkgmgr = job.pkgmgr
        self.autodir = job.autodir
        self.outputdir = outputdir
        self.tagged_testname = os.path.basename(self.outputdir)
        self.resultsdir = os.path.join(self.outputdir, 'results')
        os.mkdir(self.resultsdir)
        self.profdir = os.path.join(self.outputdir, 'profiling')
        os.mkdir(self.profdir)
        self.debugdir = os.path.join(self.outputdir, 'debug')
        os.mkdir(self.debugdir)
        # TODO(ericli): figure out how the autotest crash handler works with
        # the cros crash handler; we should restore it in the near term.
        # Once this is re-enabled, import getpass. crosbug.com/31232
        # if getpass.getuser() == 'root':
        #     self.configure_crash_handler()
        # else:
        self.crash_handling_enabled = False
        self.bindir = bindir
        self.srcdir = os.path.join(self.bindir, 'src')
        self.tmpdir = tempfile.mkdtemp("_" + self.tagged_testname,
                                       dir=job.tmpdir)
        self._keyvals = []
        self._new_keyval = False
        self.failed_constraints = []
        self.iteration = 0
        self.before_iteration_hooks = []
        self.after_iteration_hooks = []

        # Flag to indicate if the test has succeeded or failed.
        self.success = False


    def configure_crash_handler(self):
        pass


    def crash_handler_report(self):
        pass


    def assert_(self, expr, msg='Assertion failed.'):
        if not expr:
            raise error.TestError(msg)


    def write_test_keyval(self, attr_dict):
        utils.write_keyval(self.outputdir, attr_dict,
                           tap_report=self.job._tap)


    @staticmethod
    def _append_type_to_keys(dictionary, typename):
        new_dict = {}
        for key, value in dictionary.iteritems():
            new_key = "%s{%s}" % (key, typename)
            new_dict[new_key] = value
        return new_dict


    def output_perf_value(self, description, value, units=None,
                          higher_is_better=None, graph=None, replacement='_'):
        """
        Records a measured performance value in an output file.

        The output file will subsequently be parsed by the TKO parser to have
        the information inserted into the results database.

        @param description: A string describing the measured perf value. Must
                be maximum length 256, and may only contain letters, numbers,
                periods, dashes, and underscores.  For example:
                "page_load_time", "scrolling-frame-rate".
        @param value: A number representing the measured perf value, or a list
                of measured values if a test takes multiple measurements.
                Measured perf values can be either ints or floats.
        @param units: A string describing the units associated with the
                measured perf value. Must be maximum length 32, and may only
                contain letters, numbers, periods, dashes, and underscores.
                For example: "msec", "fps", "score", "runs_per_second".
        @param higher_is_better: A boolean indicating whether or not a "higher"
                measured perf value is considered to be better. If False, it is
                assumed that a "lower" measured value is considered to be
                better. This impacts dashboard plotting and email notification.
                Pure autotests are expected to specify either True or False!
                This value can be set to "None" to indicate that the perf
                dashboard should apply the rules encoded via Chromium
                unit-info.json. This is only used for tracking Chromium-based
                tests (in particular telemetry).
        @param graph: A string indicating the name of the graph on which
                the perf value will subsequently be displayed on the Chrome
                perf dashboard. This allows multiple metrics to be grouped
                together on the same graph. Defaults to None, indicating that
                the perf value should be displayed individually on a separate
                graph.
        @param replacement: A string to replace illegal characters in
                |description| and |units| with.
        """
        if len(description) > 256:
            raise ValueError('The description must be at most 256 characters.')
        if units and len(units) > 32:
            raise ValueError('The units must be at most 32 characters.')

        # If |replacement| is legal, replace illegal characters with it.
        string_regex = re.compile(r'[^-\.\w]')
        if replacement is None or re.search(string_regex, replacement):
            raise ValueError('Invalid replacement string to mask illegal '
                             'characters. May only contain letters, numbers, '
                             'periods, dashes, and underscores. '
                             'replacement: %s' % replacement)
        description = re.sub(string_regex, replacement, description)
        units = re.sub(string_regex, replacement, units) if units else None

        charts = {}
        output_file = os.path.join(self.resultsdir, 'results-chart.json')
        if os.path.isfile(output_file):
            with open(output_file, 'r') as fp:
                contents = fp.read()
                if contents:
                    charts = json.loads(contents)

        if graph:
            first_level = graph
            second_level = description
        else:
            first_level = description
            second_level = 'summary'

        direction = 'up' if higher_is_better else 'down'

        # All input should be numeric, but at times strings representing
        # numbers are logged; attempt to convert them to numbers. If a
        # non-numeric string is logged, an exception will be raised.
        if isinstance(value, list):
            value = map(float, value)
        else:
            value = float(value)

        result_type = 'scalar'
        value_key = 'value'
        result_value = value

        # The chart json spec go/telemetry-json differentiates between a single
        # value and a list of values. Lists of values get extra processing in
        # the chromeperf dashboard (mean, standard deviation, etc.).
        # Tests can log one or more values for the same metric; to adhere
        # strictly to the specification, the first value logged is a scalar,
        # but if another value is logged the results become a list of scalars.
        # TODO: Figure out if there would be any difference in always using a
        # list of scalars, even if there is just one item in the list.
        if isinstance(value, list):
            result_type = 'list_of_scalar_values'
            value_key = 'values'
            if first_level in charts and second_level in charts[first_level]:
                if 'values' in charts[first_level][second_level]:
                    result_value = charts[first_level][second_level]['values']
                    result_value.extend(value)
                elif 'value' in charts[first_level][second_level]:
                    result_value = [charts[first_level][second_level]['value']]
                    result_value.extend(value)
            else:
                result_value = value
        elif first_level in charts and second_level in charts[first_level]:
            result_type = 'list_of_scalar_values'
            value_key = 'values'
            if 'values' in charts[first_level][second_level]:
                result_value = charts[first_level][second_level]['values']
                result_value.append(value)
            else:
                result_value = [charts[first_level][second_level]['value'],
                                value]

        test_data = {
            second_level: {
                'type': result_type,
                'units': units,
                value_key: result_value,
                'improvement_direction': direction
            }
        }

        if first_level in charts:
            charts[first_level].update(test_data)
        else:
            charts.update({first_level: test_data})

        with open(output_file, 'w') as fp:
            fp.write(json.dumps(charts, indent=2))
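
    # Example (illustrative only; the metric name, values, and units are
    # hypothetical):
    #
    #     self.output_perf_value(description='scrolling-frame-rate',
    #                            value=[58.2, 59.6, 60.0],
    #                            units='fps',
    #                            higher_is_better=True)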


    def write_perf_keyval(self, perf_dict):
        self.write_iteration_keyval({}, perf_dict,
                                    tap_report=self.job._tap)


    def write_attr_keyval(self, attr_dict):
        self.write_iteration_keyval(attr_dict, {},
                                    tap_report=self.job._tap)


    def write_iteration_keyval(self, attr_dict, perf_dict, tap_report=None):
        # append the dictionaries before they have the {perf} and {attr} added
        self._keyvals.append({'attr':attr_dict, 'perf':perf_dict})
        self._new_keyval = True

        if attr_dict:
            attr_dict = self._append_type_to_keys(attr_dict, "attr")
            utils.write_keyval(self.resultsdir, attr_dict, type_tag="attr",
                               tap_report=tap_report)

        if perf_dict:
            perf_dict = self._append_type_to_keys(perf_dict, "perf")
            utils.write_keyval(self.resultsdir, perf_dict, type_tag="perf",
                               tap_report=tap_report)

        # Append a blank line to separate this iteration's keyvals.
        keyval_path = os.path.join(self.resultsdir, "keyval")
        with open(keyval_path, "a") as keyval_file:
            print >> keyval_file, ""
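
    # Example (hypothetical keys and values, for illustration only):
    #
    #     self.write_perf_keyval({'throughput_mbps': 94.2})
    #     self.write_attr_keyval({'kernel_version': '3.8.11'})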


    def analyze_perf_constraints(self, constraints):
        if not self._new_keyval:
            return

        # create a dict from the keyvals suitable as an environment for eval
        keyval_env = self._keyvals[-1]['perf'].copy()
        keyval_env['__builtins__'] = None
        self._new_keyval = False
        failures = []

        # evaluate each constraint using the current keyvals
        for constraint in constraints:
            logging.info('___________________ constraint = %s', constraint)
            logging.info('___________________ keyvals = %s', keyval_env)

            try:
                if not eval(constraint, keyval_env):
                    failures.append('%s: constraint was not met' % constraint)
            except:
                failures.append('could not evaluate constraint: %s'
                                % constraint)

        # keep track of the errors for each iteration
        self.failed_constraints.append(failures)
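
    # A constraint is a string evaluated against the most recent iteration's
    # perf keyvals, e.g. (hypothetical keyval names):
    #
    #     ['sleep_time <= 1.1', 'throughput_mbps >= 100']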


    def process_failed_constraints(self):
        msg = ''
        for i, failures in enumerate(self.failed_constraints):
            if failures:
                msg += 'iteration %d:%s  ' % (i, ','.join(failures))

        if msg:
            raise error.TestFail(msg)


    def register_before_iteration_hook(self, iteration_hook):
        """
        This is how we expect test writers to register a before_iteration_hook.
        This adds the method to the list of hooks which are executed
        before each iteration.

        @param iteration_hook: Method to run before each iteration. A valid
                               hook accepts a single argument which is the
                               test object.
        """
        self.before_iteration_hooks.append(iteration_hook)


    def register_after_iteration_hook(self, iteration_hook):
        """
        This is how we expect test writers to register an after_iteration_hook.
        This adds the method to the list of hooks which are executed
        after each iteration. Hooks are executed starting with the most-
        recently registered, in stack fashion.

        @param iteration_hook: Method to run after each iteration. A valid
                               hook accepts a single argument which is the
                               test object.
        """
        self.after_iteration_hooks.append(iteration_hook)
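
    # Example (hypothetical hook, for illustration only):
    #
    #     def _log_iteration(test_obj):
    #         logging.info('finished iteration %d', test_obj.iteration)
    #
    #     self.register_after_iteration_hook(_log_iteration)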


    def initialize(self):
        pass


    def setup(self):
        pass


    def warmup(self, *args, **dargs):
        pass


    def drop_caches_between_iterations(self):
        if self.job.drop_caches_between_iterations:
            utils.drop_caches()


    def _call_run_once_with_retry(self, constraints, profile_only,
                                  postprocess_profiled_run, args, dargs):
        """Thin wrapper around _call_run_once that retries unsuccessful tests.

        If the job object's test_retry attribute is > 0, retry any test that
        runs unsuccessfully, up to that many times.
        *Note: this does not completely re-initialize the test; it only
            re-executes code once all the initial job set up (packages,
            sysinfo, etc) is complete.
        """
        if self.job.test_retry != 0:
            logging.info('Test will be retried a maximum of %d times',
                         self.job.test_retry)

        max_runs = self.job.test_retry
        for retry_run in xrange(0, max_runs+1):
            try:
                self._call_run_once(constraints, profile_only,
                                    postprocess_profiled_run, args, dargs)
                break
            except error.TestFailRetry as err:
                if retry_run == max_runs:
                    raise
                self.job.record('INFO', None, None, 'Run %s failed with %s' % (
                        retry_run, err))
        if retry_run > 0:
            self.write_test_keyval({'test_retries_before_success': retry_run})


    def _call_run_once(self, constraints, profile_only,
                       postprocess_profiled_run, args, dargs):
        self.drop_caches_between_iterations()
        # execute iteration hooks
        logging.debug('starting before_iteration_hooks')
        for hook in self.before_iteration_hooks:
            hook(self)
        logging.debug('before_iteration_hooks completed')

        try:
            if profile_only:
                if not self.job.profilers.present():
                    self.job.record('WARN', None, None,
                                    'No profilers have been added but '
                                    'profile_only is set - nothing '
                                    'will be run')
                self.run_once_profiling(postprocess_profiled_run,
                                        *args, **dargs)
            else:
                self.before_run_once()
                logging.debug('starting test(run_once()), test details follow'
                              '\n%r', args)
                self.run_once(*args, **dargs)
                logging.debug('The test has completed successfully')
                self.after_run_once()

            self.postprocess_iteration()
            self.analyze_perf_constraints(constraints)
        # Catch and re-raise to let after_iteration_hooks see the exception.
        except Exception as e:
            logging.debug('Test failed due to %s. Exception log follows the '
                          'after_iteration_hooks.', str(e))
            raise
        finally:
            logging.debug('starting after_iteration_hooks')
            for hook in reversed(self.after_iteration_hooks):
                hook(self)
            logging.debug('after_iteration_hooks completed')


    def execute(self, iterations=None, test_length=None, profile_only=None,
                _get_time=time.time, postprocess_profiled_run=None,
                constraints=(), *args, **dargs):
        """
        This is the basic execute method for the tests inherited from base_test.
        If you want to implement a benchmark test, it's better to implement
        the run_once function, to cope with the profiling infrastructure. For
        other tests, you can just override the default implementation.

        @param test_length: The minimum test length in seconds. We'll run the
            run_once function for a number of times large enough to cover the
            minimum test length.

        @param iterations: A number of iterations that we'll run the run_once
            function. This parameter is incompatible with test_length and will
            be silently ignored if you specify both.

        @param profile_only: If True, run all iterations with profilers
            enabled. If False, run all iterations normally and then one more
            with profiling, if profilers are present. If None, default to the
            value of job.default_profile_only.

        @param _get_time: [time.time] Used for unit test time injection.

        @param postprocess_profiled_run: Run the postprocessing for the
            profiled run.
        """

        # For our special class of tests, the benchmarks, we don't want
        # profilers to run during the test iterations. Let's reserve only
        # the last iteration for profiling, if needed. So let's stop
        # all profilers if they are present and active.
        profilers = self.job.profilers
        if profilers.active():
            profilers.stop(self)
        if profile_only is None:
            profile_only = self.job.default_profile_only
        # If the user called this test in an odd way (specified both iterations
        # and test_length), let's warn them.
        if iterations and test_length:
            logging.debug('Iterations parameter ignored (timed execution)')
        if test_length:
            test_start = _get_time()
            time_elapsed = 0
            timed_counter = 0
            logging.debug('Test started. Specified %d s as the minimum test '
                          'length', test_length)
            while time_elapsed < test_length:
                timed_counter = timed_counter + 1
                if time_elapsed == 0:
                    logging.debug('Executing iteration %d', timed_counter)
                elif time_elapsed > 0:
                    logging.debug('Executing iteration %d, time_elapsed %d s',
                                  timed_counter, time_elapsed)
                self._call_run_once_with_retry(constraints, profile_only,
                                               postprocess_profiled_run, args,
                                               dargs)
                test_iteration_finish = _get_time()
                time_elapsed = test_iteration_finish - test_start
            logging.debug('Test finished after %d iterations, '
                          'time elapsed: %d s', timed_counter, time_elapsed)
        else:
            if iterations is None:
                iterations = 1
            if iterations > 1:
                logging.debug('Test started. Specified %d iterations',
                              iterations)
            for self.iteration in xrange(1, iterations + 1):
                if iterations > 1:
                    logging.debug('Executing iteration %d of %d',
                                  self.iteration, iterations)
                self._call_run_once_with_retry(constraints, profile_only,
                                               postprocess_profiled_run, args,
                                               dargs)

        if not profile_only:
            self.iteration += 1
            self.run_once_profiling(postprocess_profiled_run, *args, **dargs)

        # Do any postprocessing, normally extracting performance keyvals, etc
        self.postprocess()
        self.process_failed_constraints()
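
    # execute() is normally reached through job.run_test(); a hypothetical
    # invocation (test name and keyword values are illustrative only):
    #
    #     job.run_test('sleeptest', iterations=3,
    #                  constraints=['sleep_time <= 1.1'])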


    def run_once_profiling(self, postprocess_profiled_run, *args, **dargs):
        profilers = self.job.profilers
        # Do a profiling run if necessary
        if profilers.present():
            self.drop_caches_between_iterations()
            profilers.before_start(self)

            self.before_run_once()
            profilers.start(self)
            logging.debug('Profilers present. Profiling run started')

            try:
                self.run_once(*args, **dargs)

                # The run_once() argument takes priority over the attribute.
                postprocess_attribute = getattr(self,
                                                'postprocess_profiled_run',
                                                False)

                if (postprocess_profiled_run or
                    (postprocess_profiled_run is None and
                     postprocess_attribute)):
                    self.postprocess_iteration()

            finally:
                profilers.stop(self)
                profilers.report(self)

            self.after_run_once()


    def postprocess(self):
        pass


    def postprocess_iteration(self):
        pass


    def cleanup(self):
        pass


    def before_run_once(self):
        """
        Override in tests that need it; this will be called before any
        run_once() call, including the profiling run (where it is called
        before starting the profilers).
        """
        pass


    def after_run_once(self):
        """
        Called after every run_once (including from a profiled run, when it's
        called after stopping the profilers).
        """
        pass


    @staticmethod
    def _make_writable_to_others(directory):
        mode = os.stat(directory).st_mode
        mode = mode | stat.S_IROTH | stat.S_IWOTH | stat.S_IXOTH
        os.chmod(directory, mode)


    def _exec(self, args, dargs):
        self.job.logging.tee_redirect_debug_dir(self.debugdir,
                                                log_name=self.tagged_testname)
        try:
            if self.network_destabilizing:
                self.job.disable_warnings("NETWORK")

            # write out the test attributes into a keyval
            dargs   = dargs.copy()
            run_cleanup = dargs.pop('run_cleanup', self.job.run_test_cleanup)
            keyvals = dargs.pop('test_attributes', {}).copy()
            keyvals['version'] = self.version
            for i, arg in enumerate(args):
                keyvals['param-%d' % i] = repr(arg)
            for name, arg in dargs.iteritems():
                keyvals['param-%s' % name] = repr(arg)
            self.write_test_keyval(keyvals)

            _validate_args(args, dargs, self.initialize, self.setup,
                           self.execute, self.cleanup)

            try:
                # Make resultsdir and tmpdir accessible to everyone. We may
                # output data to these directories as others, e.g., chronos.
                self._make_writable_to_others(self.tmpdir)
                self._make_writable_to_others(self.resultsdir)

                # Initialize:
                _cherry_pick_call(self.initialize, *args, **dargs)

                lockfile = open(os.path.join(self.job.tmpdir, '.testlock'), 'w')
                try:
                    fcntl.flock(lockfile, fcntl.LOCK_EX)
                    # Setup: (compile and install the test, if needed)
                    p_args, p_dargs = _cherry_pick_args(self.setup, args, dargs)
                    utils.update_version(self.srcdir, self.preserve_srcdir,
                                         self.version, self.setup,
                                         *p_args, **p_dargs)
                finally:
                    fcntl.flock(lockfile, fcntl.LOCK_UN)
                    lockfile.close()

                # Execute:
                os.chdir(self.outputdir)

                # call self.warmup cherry picking the arguments it accepts and
                # translate exceptions if needed
                _call_test_function(_cherry_pick_call, self.warmup,
                                    *args, **dargs)

                if hasattr(self, 'run_once'):
                    p_args, p_dargs = _cherry_pick_args(self.run_once,
                                                        args, dargs)
                    # pull in any non-* and non-** args from self.execute
                    for param in _get_nonstar_args(self.execute):
                        if param in dargs:
                            p_dargs[param] = dargs[param]
                else:
                    p_args, p_dargs = _cherry_pick_args(self.execute,
                                                        args, dargs)

                _call_test_function(self.execute, *p_args, **p_dargs)
            except Exception:
                # Save the exception while we run our cleanup() before
                # re-raising it, but log it so the actual time of the error
                # is known.
                exc_info = sys.exc_info()
                logging.warning('The test failed with the following exception',
                                exc_info=True)

                try:
                    try:
                        if run_cleanup:
                            logging.debug('Running cleanup for test.')
                            _cherry_pick_call(self.cleanup, *args, **dargs)
                    except Exception:
                        logging.error('Ignoring exception during cleanup() '
                                      'phase:')
                        traceback.print_exc()
                        logging.error('Now raising the earlier %s error',
                                      exc_info[0])
                    self.crash_handler_report()
                finally:
                    # Raise exception after running cleanup, reporting crash,
                    # and restoring job's logging, even if the first two
                    # actions fail.
                    self.job.logging.restore()
                    try:
                        raise exc_info[0], exc_info[1], exc_info[2]
                    finally:
                        # http://docs.python.org/library/sys.html#sys.exc_info
                        # Be nice and prevent a circular reference.
                        del exc_info
            else:
                try:
                    if run_cleanup:
                        _cherry_pick_call(self.cleanup, *args, **dargs)
                    self.crash_handler_report()
                finally:
                    self.job.logging.restore()
        except error.AutotestError:
            if self.network_destabilizing:
                self.job.enable_warnings("NETWORK")
            # Pass already-categorized errors on up.
            raise
        except Exception, e:
            if self.network_destabilizing:
                self.job.enable_warnings("NETWORK")
            # Anything else is an ERROR in our own code, not execute().
            raise error.UnhandledTestError(e)
        else:
            if self.network_destabilizing:
                self.job.enable_warnings("NETWORK")


    def runsubtest(self, url, *args, **dargs):
        """
        Execute another autotest test from inside the current test's scope.

        @param url: Url of new test.
        @param tag: Tag added to test name.
        @param args: Args for subtest.
        @param dargs: Dictionary with args for subtest.
        @param iterations: Number of subtest iterations.
        @param profile_only: If true execute one profiled run.
        """
        dargs["profile_only"] = dargs.get("profile_only", False)
        test_basepath = self.outputdir[len(self.job.resultdir + "/"):]
        return self.job.run_test(url, master_testpath=test_basepath,
                                 *args, **dargs)
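
    # Example (hypothetical subtest name, tag, and argument, for illustration
    # only):
    #
    #     def run_once(self):
    #         self.runsubtest('sleeptest', tag='warm', seconds=2)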


def _get_nonstar_args(func):
    """Extract all the (normal) function parameter names.

    Given a function, returns a tuple of parameter names, specifically
    excluding the * and ** parameters, if the function accepts them.

    @param func: A callable that we want to choose arguments for.

    @return: A tuple of parameters accepted by the function.
    """
    return func.func_code.co_varnames[:func.func_code.co_argcount]


def _cherry_pick_args(func, args, dargs):
    """Sanitize positional and keyword arguments before calling a function.

    Given a callable (func), an argument tuple and a dictionary of keyword
    arguments, pick only those arguments which the function is prepared to
    accept and return a new argument tuple and keyword argument dictionary.

    Args:
      func: A callable that we want to choose arguments for.
      args: A tuple of positional arguments to consider passing to func.
      dargs: A dictionary of keyword arguments to consider passing to func.
    Returns:
      A tuple of: (args tuple, keyword arguments dictionary)
    """
    # Cherry pick args:
    if func.func_code.co_flags & 0x04:
        # func accepts *args, so return the entire args.
        p_args = args
    else:
        p_args = ()

    # Cherry pick dargs:
    if func.func_code.co_flags & 0x08:
        # func accepts **dargs, so return the entire dargs.
        p_dargs = dargs
    else:
        # Only return the keyword arguments that func accepts.
        p_dargs = {}
        for param in _get_nonstar_args(func):
            if param in dargs:
                p_dargs[param] = dargs[param]

    return p_args, p_dargs
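
# Example of the picking behavior (hypothetical function, for illustration):
#
#     def run_once(self, seconds=1): ...
#     _cherry_pick_args(run_once, (), {'seconds': 2, 'tag': 'x'})
#     # -> ((), {'seconds': 2})   ('tag' is dropped; run_once has no **dargs)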


def _cherry_pick_call(func, *args, **dargs):
    """Cherry picks arguments from args/dargs based on what "func" accepts
    and calls the function with the picked arguments."""
    p_args, p_dargs = _cherry_pick_args(func, args, dargs)
    return func(*p_args, **p_dargs)


def _validate_args(args, dargs, *funcs):
    """Verify that arguments are appropriate for at least one callable.

    Given a list of callables as additional parameters, verify that
    the proposed keyword arguments in dargs will each be accepted by at least
    one of the callables.

    NOTE: args is currently not supported and must be empty.

    Args:
      args: A tuple of proposed positional arguments.
      dargs: A dictionary of proposed keyword arguments.
      *funcs: Callables to be searched for acceptance of args and dargs.
    Raises:
      error.AutotestError: if an arg won't be accepted by any of *funcs.
    """
    all_co_flags = 0
    all_varnames = ()
    for func in funcs:
        all_co_flags |= func.func_code.co_flags
        all_varnames += func.func_code.co_varnames[:func.func_code.co_argcount]

    # Check if the given args belong to at least one of the given callables.
    if len(args) > 0:
        # Current implementation doesn't allow the use of args.
        raise error.TestError('Unnamed arguments not accepted. Please '
                              'call job.run_test with named args only')

    # Check if the given dargs belong to at least one of the given callables.
    if len(dargs) > 0:
        if not all_co_flags & 0x08:
            # No func accepts **dargs, so:
            for param in dargs:
                if param not in all_varnames:
                    raise error.AutotestError('Unknown parameter: %s' % param)


def _installtest(job, url):
    (group, name) = job.pkgmgr.get_package_name(url, 'test')

    # Bail if the test is already installed
    group_dir = os.path.join(job.testdir, "download", group)
    if os.path.exists(os.path.join(group_dir, name)):
        return (group, name)

    # If the group directory is missing, create it and add
    # an empty __init__.py so that sub-directories are
    # considered for import.
    if not os.path.exists(group_dir):
        os.makedirs(group_dir)
        f = open(os.path.join(group_dir, '__init__.py'), 'w+')
        f.close()

    logging.debug("%s: installing test url=%s", name, url)
    tarball = os.path.basename(url)
    tarball_path = os.path.join(group_dir, tarball)
    test_dir = os.path.join(group_dir, name)
    job.pkgmgr.fetch_pkg(tarball, tarball_path,
                         repo_url=os.path.dirname(url))

    # Create the directory for the test
    if not os.path.exists(test_dir):
        os.mkdir(os.path.join(group_dir, name))

    job.pkgmgr.untar_pkg(tarball_path, test_dir)

    os.remove(tarball_path)

    # For this 'sub-object' to be importable via the name
    # 'group.name' we need to provide an __init__.py,
    # so link the main entry point to this.
    os.symlink(name + '.py', os.path.join(group_dir, name,
                                          '__init__.py'))

    # The test is now installed.
    return (group, name)


def _call_test_function(func, *args, **dargs):
    """Calls a test function and translates exceptions so that errors
    inside test code are considered test failures."""
    try:
        return func(*args, **dargs)
    except error.AutotestError:
        raise
    except Exception, e:
        # Other exceptions must be treated as a FAIL when
        # raised during the test functions
        raise error.UnhandledTestFail(e)


def runtest(job, url, tag, args, dargs,
            local_namespace={}, global_namespace={},
            before_test_hook=None, after_test_hook=None,
            before_iteration_hook=None, after_iteration_hook=None):
    local_namespace = local_namespace.copy()
    global_namespace = global_namespace.copy()
    # If this is not a plain test name, then download and install the
    # specified test.
    if url.endswith('.tar.bz2'):
        (testgroup, testname) = _installtest(job, url)
        bindir = os.path.join(job.testdir, 'download', testgroup, testname)
        importdir = os.path.join(job.testdir, 'download')
        modulename = '%s.%s' % (re.sub('/', '.', testgroup), testname)
        classname = '%s.%s' % (modulename, testname)
        path = testname
    else:
        # If the test is local, it may be under either testdir or site_testdir.
        # Tests in site_testdir override tests defined in testdir.
        testname = path = url
        testgroup = ''
        path = re.sub(':', '/', testname)
        modulename = os.path.basename(path)
        classname = '%s.%s' % (modulename, modulename)

        # Try installing the test package.
        # The job object may be either a server side job or a client side job.
        # The 'install_pkg' method will be present only if it's a client side
        # job.
        if hasattr(job, 'install_pkg'):
            try:
                bindir = os.path.join(job.testdir, testname)
                job.install_pkg(testname, 'test', bindir)
            except error.PackageInstallError:
                # Continue as a fallback mechanism and see if the test code
                # already exists on the machine.
                pass

        bindir = None
        for dir in [job.testdir, getattr(job, 'site_testdir', None)]:
            if dir is not None and os.path.exists(os.path.join(dir, path)):
                importdir = bindir = os.path.join(dir, path)
        if not bindir:
            raise error.TestError(testname + ': test does not exist')

    subdir = os.path.join(dargs.pop('master_testpath', ""), testname)
    outputdir = os.path.join(job.resultdir, subdir)
    if tag:
        outputdir += '.' + tag

    local_namespace['job'] = job
    local_namespace['bindir'] = bindir
    local_namespace['outputdir'] = outputdir

    sys.path.insert(0, importdir)
    try:
        exec ('import %s' % modulename, local_namespace, global_namespace)
        exec ("mytest = %s(job, bindir, outputdir)" % classname,
              local_namespace, global_namespace)
    finally:
        sys.path.pop(0)

    pwd = os.getcwd()
    os.chdir(outputdir)

    try:
        mytest = global_namespace['mytest']
        mytest.success = False
        if before_test_hook:
            before_test_hook(mytest)

        # Use the register_*_iteration_hook methods to register the passed-in
        # hooks.
        if before_iteration_hook:
            mytest.register_before_iteration_hook(before_iteration_hook)
        if after_iteration_hook:
            mytest.register_after_iteration_hook(after_iteration_hook)
        mytest._exec(args, dargs)
        mytest.success = True
    finally:
        os.chdir(pwd)
        if after_test_hook:
            after_test_hook(mytest)
        shutil.rmtree(mytest.tmpdir, ignore_errors=True)