1# Shell class for a test, inherited by all individual tests
2#
3# Methods:
4#       __init__        initialise
5#       initialize      run once for each job
6#       setup           run once for each new version of the test installed
7#       run             run the test (wrapped by job.run_test())
8#
9# Data:
10#       job             backreference to the job this test instance is part of
11#       outputdir       eg. results/<job>/<testname.tag>
12#       resultsdir      eg. results/<job>/<testname.tag>/results
13#       profdir         eg. results/<job>/<testname.tag>/profiling
14#       debugdir        eg. results/<job>/<testname.tag>/debug
15#       bindir          eg. tests/<test>
#       srcdir          eg. tests/<test>/src
17#       tmpdir          eg. tmp/<tempname>_<testname.tag>
18
19#pylint: disable-msg=C0111
20
21import fcntl, json, os, re, sys, shutil, stat, tempfile, time, traceback
22import logging
23
24from autotest_lib.client.bin import utils
25from autotest_lib.client.common_lib import error
26
27
28class base_test(object):
29    preserve_srcdir = False
30    network_destabilizing = False
31
32    def __init__(self, job, bindir, outputdir):
33        self.job = job
34        self.pkgmgr = job.pkgmgr
35        self.autodir = job.autodir
36        self.outputdir = outputdir
37        self.tagged_testname = os.path.basename(self.outputdir)
38        self.resultsdir = os.path.join(self.outputdir, 'results')
39        os.mkdir(self.resultsdir)
40        self.profdir = os.path.join(self.outputdir, 'profiling')
41        os.mkdir(self.profdir)
42        self.debugdir = os.path.join(self.outputdir, 'debug')
43        os.mkdir(self.debugdir)
44        # TODO(ericli): figure out how autotest crash handler work with cros
45        # Once this is re-enabled import getpass. crosbug.com/31232
46        # crash handler, we should restore it in near term.
47        # if getpass.getuser() == 'root':
48        #     self.configure_crash_handler()
49        # else:
50        self.crash_handling_enabled = False
51        self.bindir = bindir
52        self.srcdir = os.path.join(self.bindir, 'src')
53        self.tmpdir = tempfile.mkdtemp("_" + self.tagged_testname,
54                                       dir=job.tmpdir)
55        self._keyvals = []
56        self._new_keyval = False
57        self.failed_constraints = []
58        self.iteration = 0
59        self.before_iteration_hooks = []
60        self.after_iteration_hooks = []
61
62        # Flag to indicate if the test has succeeded or failed.
63        self.success = False
64
65
66    def configure_crash_handler(self):
67        pass
68
69
70    def crash_handler_report(self):
71        pass
72
73
74    def assert_(self, expr, msg='Assertion failed.'):
75        if not expr:
76            raise error.TestError(msg)
77
78
79    def write_test_keyval(self, attr_dict):
80        utils.write_keyval(self.outputdir, attr_dict,
81                           tap_report=self.job._tap)
82
83
84    @staticmethod
85    def _append_type_to_keys(dictionary, typename):
86        new_dict = {}
87        for key, value in dictionary.iteritems():
88            new_key = "%s{%s}" % (key, typename)
89            new_dict[new_key] = value
90        return new_dict
91
92
93    def output_perf_value(self, description, value, units=None,
94                          higher_is_better=None, graph=None, replacement='_'):
95        """
96        Records a measured performance value in an output file.
97
98        The output file will subsequently be parsed by the TKO parser to have
99        the information inserted into the results database.
100
101        @param description: A string describing the measured perf value. Must
102                be maximum length 256, and may only contain letters, numbers,
103                periods, dashes, and underscores.  For example:
104                "page_load_time", "scrolling-frame-rate".
105        @param value: A number representing the measured perf value, or a list
106                of measured values if a test takes multiple measurements.
107                Measured perf values can be either ints or floats.
108        @param units: A string describing the units associated with the
109                measured perf value. Must be maximum length 32, and may only
110                contain letters, numbers, periods, dashes, and underscores.
111                For example: "msec", "fps", "score", "runs_per_second".
112        @param higher_is_better: A boolean indicating whether or not a "higher"
113                measured perf value is considered to be better. If False, it is
114                assumed that a "lower" measured value is considered to be
115                better. This impacts dashboard plotting and email notification.
116                Pure autotests are expected to specify either True or False!
117                This value can be set to "None" to indicate that the perf
118                dashboard should apply the rules encoded via Chromium
119                unit-info.json. This is only used for tracking Chromium based
120                tests (in particular telemetry).
121        @param graph: A string indicating the name of the graph on which
122                the perf value will be subsequently displayed on the chrome perf
123                dashboard. This allows multiple metrics be grouped together on
124                the same graphs. Defaults to None, indicating that the perf
125                value should be displayed individually on a separate graph.
126        @param replacement: string to replace illegal characters in
127                |description| and |units| with.
128        """
129        if len(description) > 256:
130            raise ValueError('The description must be at most 256 characters.')
131        if units and len(units) > 32:
132            raise ValueError('The units must be at most 32 characters.')
133
134        # If |replacement| is legal replace illegal characters with it.
135        string_regex = re.compile(r'[^-\.\w]')
136        if replacement is None or re.search(string_regex, replacement):
137            raise ValueError('Invalid replacement string to mask illegal '
138                             'characters. May only contain letters, numbers, '
139                             'periods, dashes, and underscores. '
140                             'replacement: %s' % replacement)
141        description = re.sub(string_regex, replacement, description)
142        units = re.sub(string_regex, replacement, units) if units else None
143
144        entry = {
145            'description': description,
146            'value': value,
147            'units': units,
148            'higher_is_better': higher_is_better,
149            'graph': graph
150        }
151
152        output_path = os.path.join(self.resultsdir, 'perf_measurements')
153        with open(output_path, 'a') as fp:
154            fp.write(json.dumps(entry, sort_keys=True) + '\n')
155
156
157    def write_perf_keyval(self, perf_dict):
158        self.write_iteration_keyval({}, perf_dict,
159                                    tap_report=self.job._tap)
160
161
162    def write_attr_keyval(self, attr_dict):
163        self.write_iteration_keyval(attr_dict, {},
164                                    tap_report=self.job._tap)
165
166
167    def write_iteration_keyval(self, attr_dict, perf_dict, tap_report=None):
168        # append the dictionaries before they have the {perf} and {attr} added
169        self._keyvals.append({'attr':attr_dict, 'perf':perf_dict})
170        self._new_keyval = True
171
172        if attr_dict:
173            attr_dict = self._append_type_to_keys(attr_dict, "attr")
174            utils.write_keyval(self.resultsdir, attr_dict, type_tag="attr",
175                               tap_report=tap_report)
176
177        if perf_dict:
178            perf_dict = self._append_type_to_keys(perf_dict, "perf")
179            utils.write_keyval(self.resultsdir, perf_dict, type_tag="perf",
180                               tap_report=tap_report)
181
182        keyval_path = os.path.join(self.resultsdir, "keyval")
183        print >> open(keyval_path, "a"), ""
184
185
186    def analyze_perf_constraints(self, constraints):
187        if not self._new_keyval:
188            return
189
190        # create a dict from the keyvals suitable as an environment for eval
191        keyval_env = self._keyvals[-1]['perf'].copy()
192        keyval_env['__builtins__'] = None
193        self._new_keyval = False
194        failures = []
195
196        # evaluate each constraint using the current keyvals
197        for constraint in constraints:
198            logging.info('___________________ constraint = %s', constraint)
199            logging.info('___________________ keyvals = %s', keyval_env)
200
201            try:
202                if not eval(constraint, keyval_env):
203                    failures.append('%s: constraint was not met' % constraint)
204            except:
205                failures.append('could not evaluate constraint: %s'
206                                % constraint)
207
208        # keep track of the errors for each iteration
209        self.failed_constraints.append(failures)
210
211
212    def process_failed_constraints(self):
213        msg = ''
214        for i, failures in enumerate(self.failed_constraints):
215            if failures:
216                msg += 'iteration %d:%s  ' % (i, ','.join(failures))
217
218        if msg:
219            raise error.TestFail(msg)
220
221
222    def register_before_iteration_hook(self, iteration_hook):
223        """
224        This is how we expect test writers to register a before_iteration_hook.
225        This adds the method to the list of hooks which are executed
226        before each iteration.
227
228        @param iteration_hook: Method to run before each iteration. A valid
229                               hook accepts a single argument which is the
230                               test object.
231        """
232        self.before_iteration_hooks.append(iteration_hook)
233
234
235    def register_after_iteration_hook(self, iteration_hook):
236        """
237        This is how we expect test writers to register an after_iteration_hook.
238        This adds the method to the list of hooks which are executed
239        after each iteration. Hooks are executed starting with the most-
240        recently registered, in stack fashion.
241
242        @param iteration_hook: Method to run after each iteration. A valid
243                               hook accepts a single argument which is the
244                               test object.
245        """
246        self.after_iteration_hooks.append(iteration_hook)
247
248
249    def initialize(self):
250        pass
251
252
253    def setup(self):
254        pass
255
256
257    def warmup(self, *args, **dargs):
258        pass
259
260
261    def drop_caches_between_iterations(self):
262        if self.job.drop_caches_between_iterations:
263            utils.drop_caches()
264
265
266    def _call_run_once_with_retry(self, constraints, profile_only,
267                                  postprocess_profiled_run, args, dargs):
268        """Thin wrapper around _call_run_once that retries unsuccessful tests.
269
270        If the job object's attribute test_retry is > 0 retry any tests that
271        ran unsuccessfully X times.
272        *Note this does not competely re-initialize the test, it only
273            re-executes code once all the initial job set up (packages,
274            sysinfo, etc) is complete.
275        """
276        if self.job.test_retry != 0:
277            logging.info('Test will be retried a maximum of %d times',
278                         self.job.test_retry)
279
280        max_runs = self.job.test_retry
281        for retry_run in xrange(0, max_runs+1):
282            try:
283                self._call_run_once(constraints, profile_only,
284                                    postprocess_profiled_run, args, dargs)
285                break
286            except error.TestFailRetry as err:
287                if retry_run == max_runs:
288                    raise
289                self.job.record('INFO', None, None, 'Run %s failed with %s' % (
290                        retry_run, err))
291        if retry_run > 0:
292            self.write_test_keyval({'test_retries_before_success': retry_run})
293
294
295    def _call_run_once(self, constraints, profile_only,
296                       postprocess_profiled_run, args, dargs):
297        self.drop_caches_between_iterations()
298        # execute iteration hooks
299        for hook in self.before_iteration_hooks:
300            hook(self)
301
302        try:
303            if profile_only:
304                if not self.job.profilers.present():
305                    self.job.record('WARN', None, None,
306                                    'No profilers have been added but '
307                                    'profile_only is set - nothing '
308                                    'will be run')
309                self.run_once_profiling(postprocess_profiled_run,
310                                        *args, **dargs)
311            else:
312                self.before_run_once()
313                self.run_once(*args, **dargs)
314                self.after_run_once()
315
316            self.postprocess_iteration()
317            self.analyze_perf_constraints(constraints)
318        # Catch and re-raise to let after_iteration_hooks see the exception.
319        except:
320            raise
321        finally:
322            for hook in reversed(self.after_iteration_hooks):
323                hook(self)
324
325
326    def execute(self, iterations=None, test_length=None, profile_only=None,
327                _get_time=time.time, postprocess_profiled_run=None,
328                constraints=(), *args, **dargs):
329        """
330        This is the basic execute method for the tests inherited from base_test.
331        If you want to implement a benchmark test, it's better to implement
332        the run_once function, to cope with the profiling infrastructure. For
333        other tests, you can just override the default implementation.
334
335        @param test_length: The minimum test length in seconds. We'll run the
336            run_once function for a number of times large enough to cover the
337            minimum test length.
338
339        @param iterations: A number of iterations that we'll run the run_once
340            function. This parameter is incompatible with test_length and will
341            be silently ignored if you specify both.
342
343        @param profile_only: If true run X iterations with profilers enabled.
344            If false run X iterations and one with profiling if profiles are
345            enabled. If None, default to the value of job.default_profile_only.
346
347        @param _get_time: [time.time] Used for unit test time injection.
348
349        @param postprocess_profiled_run: Run the postprocessing for the
350            profiled run.
351        """
352
353        # For our special class of tests, the benchmarks, we don't want
354        # profilers to run during the test iterations. Let's reserve only
355        # the last iteration for profiling, if needed. So let's stop
356        # all profilers if they are present and active.
357        profilers = self.job.profilers
358        if profilers.active():
359            profilers.stop(self)
360        if profile_only is None:
361            profile_only = self.job.default_profile_only
362        # If the user called this test in an odd way (specified both iterations
363        # and test_length), let's warn them.
364        if iterations and test_length:
365            logging.debug('Iterations parameter ignored (timed execution)')
366        if test_length:
367            test_start = _get_time()
368            time_elapsed = 0
369            timed_counter = 0
370            logging.debug('Test started. Specified %d s as the minimum test '
371                          'length', test_length)
372            while time_elapsed < test_length:
373                timed_counter = timed_counter + 1
374                if time_elapsed == 0:
375                    logging.debug('Executing iteration %d', timed_counter)
376                elif time_elapsed > 0:
377                    logging.debug('Executing iteration %d, time_elapsed %d s',
378                                  timed_counter, time_elapsed)
379                self._call_run_once_with_retry(constraints, profile_only,
380                                               postprocess_profiled_run, args,
381                                               dargs)
382                test_iteration_finish = _get_time()
383                time_elapsed = test_iteration_finish - test_start
384            logging.debug('Test finished after %d iterations, '
385                          'time elapsed: %d s', timed_counter, time_elapsed)
386        else:
387            if iterations is None:
388                iterations = 1
389            if iterations > 1:
390                logging.debug('Test started. Specified %d iterations',
391                              iterations)
392            for self.iteration in xrange(1, iterations + 1):
393                if iterations > 1:
394                    logging.debug('Executing iteration %d of %d',
395                                  self.iteration, iterations)
396                self._call_run_once_with_retry(constraints, profile_only,
397                                               postprocess_profiled_run, args,
398                                               dargs)
399
400        if not profile_only:
401            self.iteration += 1
402            self.run_once_profiling(postprocess_profiled_run, *args, **dargs)
403
404        # Do any postprocessing, normally extracting performance keyvals, etc
405        self.postprocess()
406        self.process_failed_constraints()
407
408
409    def run_once_profiling(self, postprocess_profiled_run, *args, **dargs):
410        profilers = self.job.profilers
411        # Do a profiling run if necessary
412        if profilers.present():
413            self.drop_caches_between_iterations()
414            profilers.before_start(self)
415
416            self.before_run_once()
417            profilers.start(self)
418            logging.debug('Profilers present. Profiling run started')
419
420            try:
421                self.run_once(*args, **dargs)
422
423                # Priority to the run_once() argument over the attribute.
424                postprocess_attribute = getattr(self,
425                                                'postprocess_profiled_run',
426                                                False)
427
428                if (postprocess_profiled_run or
429                    (postprocess_profiled_run is None and
430                     postprocess_attribute)):
431                    self.postprocess_iteration()
432
433            finally:
434                profilers.stop(self)
435                profilers.report(self)
436
437            self.after_run_once()
438
439
440    def postprocess(self):
441        pass
442
443
444    def postprocess_iteration(self):
445        pass
446
447
448    def cleanup(self):
449        pass
450
451
452    def before_run_once(self):
453        """
454        Override in tests that need it, will be called before any run_once()
455        call including the profiling run (when it's called before starting
456        the profilers).
457        """
458        pass
459
460
461    def after_run_once(self):
462        """
463        Called after every run_once (including from a profiled run when it's
464        called after stopping the profilers).
465        """
466        pass
467
468
469    @staticmethod
470    def _make_writable_to_others(directory):
471        mode = os.stat(directory).st_mode
472        mode = mode | stat.S_IROTH | stat.S_IWOTH | stat.S_IXOTH
473        os.chmod(directory, mode)
474
475
476    def _exec(self, args, dargs):
477        self.job.logging.tee_redirect_debug_dir(self.debugdir,
478                                                log_name=self.tagged_testname)
479        try:
480            if self.network_destabilizing:
481                self.job.disable_warnings("NETWORK")
482
483            # write out the test attributes into a keyval
484            dargs   = dargs.copy()
485            run_cleanup = dargs.pop('run_cleanup', self.job.run_test_cleanup)
486            keyvals = dargs.pop('test_attributes', {}).copy()
487            keyvals['version'] = self.version
488            for i, arg in enumerate(args):
489                keyvals['param-%d' % i] = repr(arg)
490            for name, arg in dargs.iteritems():
491                keyvals['param-%s' % name] = repr(arg)
492            self.write_test_keyval(keyvals)
493
494            _validate_args(args, dargs, self.initialize, self.setup,
495                           self.execute, self.cleanup)
496
497            try:
498                # Make resultsdir and tmpdir accessible to everyone. We may
499                # output data to these directories as others, e.g., chronos.
500                self._make_writable_to_others(self.tmpdir)
501                self._make_writable_to_others(self.resultsdir)
502
503                # Initialize:
504                _cherry_pick_call(self.initialize, *args, **dargs)
505
506                lockfile = open(os.path.join(self.job.tmpdir, '.testlock'), 'w')
507                try:
508                    fcntl.flock(lockfile, fcntl.LOCK_EX)
509                    # Setup: (compile and install the test, if needed)
510                    p_args, p_dargs = _cherry_pick_args(self.setup, args, dargs)
511                    utils.update_version(self.srcdir, self.preserve_srcdir,
512                                         self.version, self.setup,
513                                         *p_args, **p_dargs)
514                finally:
515                    fcntl.flock(lockfile, fcntl.LOCK_UN)
516                    lockfile.close()
517
518                # Execute:
519                os.chdir(self.outputdir)
520
521                # call self.warmup cherry picking the arguments it accepts and
522                # translate exceptions if needed
523                _call_test_function(_cherry_pick_call, self.warmup,
524                                    *args, **dargs)
525
526                if hasattr(self, 'run_once'):
527                    p_args, p_dargs = _cherry_pick_args(self.run_once,
528                                                        args, dargs)
529                    # pull in any non-* and non-** args from self.execute
530                    for param in _get_nonstar_args(self.execute):
531                        if param in dargs:
532                            p_dargs[param] = dargs[param]
533                else:
534                    p_args, p_dargs = _cherry_pick_args(self.execute,
535                                                        args, dargs)
536
537                _call_test_function(self.execute, *p_args, **p_dargs)
538            except Exception:
539                # Save the exception while we run our cleanup() before
540                # reraising it, but log it to so actual time of error is known.
541                exc_info = sys.exc_info()
542                logging.warning('Autotest caught exception when running test:',
543                                exc_info=True)
544
545                try:
546                    try:
547                        if run_cleanup:
548                            _cherry_pick_call(self.cleanup, *args, **dargs)
549                    except Exception:
550                        logging.error('Ignoring exception during cleanup() '
551                                      'phase:')
552                        traceback.print_exc()
553                        logging.error('Now raising the earlier %s error',
554                                      exc_info[0])
555                    self.crash_handler_report()
556                finally:
557                    self.job.logging.restore()
558                    try:
559                        raise exc_info[0], exc_info[1], exc_info[2]
560                    finally:
561                        # http://docs.python.org/library/sys.html#sys.exc_info
562                        # Be nice and prevent a circular reference.
563                        del exc_info
564            else:
565                try:
566                    if run_cleanup:
567                        _cherry_pick_call(self.cleanup, *args, **dargs)
568                    self.crash_handler_report()
569                finally:
570                    self.job.logging.restore()
571        except error.AutotestError:
572            if self.network_destabilizing:
573                self.job.enable_warnings("NETWORK")
574            # Pass already-categorized errors on up.
575            raise
576        except Exception, e:
577            if self.network_destabilizing:
578                self.job.enable_warnings("NETWORK")
579            # Anything else is an ERROR in our own code, not execute().
580            raise error.UnhandledTestError(e)
581        else:
582            if self.network_destabilizing:
583                self.job.enable_warnings("NETWORK")
584
585
586    def runsubtest(self, url, *args, **dargs):
587        """
588        Execute another autotest test from inside the current test's scope.
589
590        @param test: Parent test.
591        @param url: Url of new test.
592        @param tag: Tag added to test name.
593        @param args: Args for subtest.
594        @param dargs: Dictionary with args for subtest.
595        @iterations: Number of subtest iterations.
596        @profile_only: If true execute one profiled run.
597        """
598        dargs["profile_only"] = dargs.get("profile_only", False)
599        test_basepath = self.outputdir[len(self.job.resultdir + "/"):]
600        return self.job.run_test(url, master_testpath=test_basepath,
601                                 *args, **dargs)
602
603
604def _get_nonstar_args(func):
605    """Extract all the (normal) function parameter names.
606
607    Given a function, returns a tuple of parameter names, specifically
608    excluding the * and ** parameters, if the function accepts them.
609
610    @param func: A callable that we want to chose arguments for.
611
612    @return: A tuple of parameters accepted by the function.
613    """
614    return func.func_code.co_varnames[:func.func_code.co_argcount]
615
616
617def _cherry_pick_args(func, args, dargs):
618    """Sanitize positional and keyword arguments before calling a function.
619
620    Given a callable (func), an argument tuple and a dictionary of keyword
621    arguments, pick only those arguments which the function is prepared to
622    accept and return a new argument tuple and keyword argument dictionary.
623
624    Args:
625      func: A callable that we want to choose arguments for.
626      args: A tuple of positional arguments to consider passing to func.
627      dargs: A dictionary of keyword arguments to consider passing to func.
628    Returns:
629      A tuple of: (args tuple, keyword arguments dictionary)
630    """
631    # Cherry pick args:
632    if func.func_code.co_flags & 0x04:
633        # func accepts *args, so return the entire args.
634        p_args = args
635    else:
636        p_args = ()
637
638    # Cherry pick dargs:
639    if func.func_code.co_flags & 0x08:
640        # func accepts **dargs, so return the entire dargs.
641        p_dargs = dargs
642    else:
643        # Only return the keyword arguments that func accepts.
644        p_dargs = {}
645        for param in _get_nonstar_args(func):
646            if param in dargs:
647                p_dargs[param] = dargs[param]
648
649    return p_args, p_dargs
650
651
def _cherry_pick_call(func, *args, **dargs):
    """Call func with only those of the given arguments that it accepts.

    The selection is delegated to _cherry_pick_args(); the result of the
    call is returned unchanged.
    """
    accepted = _cherry_pick_args(func, args, dargs)
    accepted_args = accepted[0]
    accepted_dargs = accepted[1]
    return func(*accepted_args, **accepted_dargs)
657
658
659def _validate_args(args, dargs, *funcs):
660    """Verify that arguments are appropriate for at least one callable.
661
662    Given a list of callables as additional parameters, verify that
663    the proposed keyword arguments in dargs will each be accepted by at least
664    one of the callables.
665
666    NOTE: args is currently not supported and must be empty.
667
668    Args:
669      args: A tuple of proposed positional arguments.
670      dargs: A dictionary of proposed keyword arguments.
671      *funcs: Callables to be searched for acceptance of args and dargs.
672    Raises:
673      error.AutotestError: if an arg won't be accepted by any of *funcs.
674    """
675    all_co_flags = 0
676    all_varnames = ()
677    for func in funcs:
678        all_co_flags |= func.func_code.co_flags
679        all_varnames += func.func_code.co_varnames[:func.func_code.co_argcount]
680
681    # Check if given args belongs to at least one of the methods below.
682    if len(args) > 0:
683        # Current implementation doesn't allow the use of args.
684        raise error.TestError('Unnamed arguments not accepted. Please '
685                              'call job.run_test with named args only')
686
687    # Check if given dargs belongs to at least one of the methods below.
688    if len(dargs) > 0:
689        if not all_co_flags & 0x08:
690            # no func accepts *dargs, so:
691            for param in dargs:
692                if not param in all_varnames:
693                    raise error.AutotestError('Unknown parameter: %s' % param)
694
695
def _installtest(job, url):
    """Download and unpack the test package found at url.

    The test ends up in <job.testdir>/download/<group>/<name>. Nothing is
    done if that path already exists.

    @param job: The job; its pkgmgr is used to resolve, fetch and untar the
            package and its testdir is the installation root.
    @param url: Url of the test tarball.

    @return: The tuple (group, name) returned by pkgmgr.get_package_name().
    """
    (group, name) = job.pkgmgr.get_package_name(url, 'test')

    group_dir = os.path.join(job.testdir, "download", group)
    test_dir = os.path.join(group_dir, name)

    # Bail if the test is already installed
    if os.path.exists(test_dir):
        return (group, name)

    # If the group directory is missing create it and add
    # an empty  __init__.py so that sub-directories are
    # considered for import.
    if not os.path.exists(group_dir):
        os.makedirs(group_dir)
        with open(os.path.join(group_dir, '__init__.py'), 'w+'):
            pass

    logging.debug("%s: installing test url=%s", name, url)
    tarball = os.path.basename(url)
    tarball_path = os.path.join(group_dir, tarball)
    job.pkgmgr.fetch_pkg(tarball, tarball_path,
                         repo_url = os.path.dirname(url))

    # Create the directory for the test
    if not os.path.exists(test_dir):
        os.mkdir(test_dir)

    job.pkgmgr.untar_pkg(tarball_path, test_dir)

    os.remove(tarball_path)

    # For this 'sub-object' to be importable via the name
    # 'group.name' we need to provide an __init__.py,
    # so link the main entry point to this.
    os.symlink(name + '.py', os.path.join(test_dir, '__init__.py'))

    # The test is now installed.
    return (group, name)
735
736
737def _call_test_function(func, *args, **dargs):
738    """Calls a test function and translates exceptions so that errors
739    inside test code are considered test failures."""
740    try:
741        return func(*args, **dargs)
742    except error.AutotestError:
743        raise
744    except Exception, e:
745        # Other exceptions must be treated as a FAIL when
746        # raised during the test functions
747        raise error.UnhandledTestFail(e)
748
749
def runtest(job, url, tag, args, dargs,
            local_namespace={}, global_namespace={},
            before_test_hook=None, after_test_hook=None,
            before_iteration_hook=None, after_iteration_hook=None):
    """Locate (installing it if needed), instantiate and run one test.

    @param job: The job the test runs in.
    @param url: Either the url of a '.tar.bz2' test package, which is
            downloaded and installed, or the name of a local test; ':' in a
            local name is turned into '/' to build its path.
    @param tag: Optional tag appended to the output directory as '.<tag>'.
    @param args: Positional arguments for the test, see base_test._exec().
    @param dargs: Keyword arguments for the test. 'master_testpath' is
            popped from it and prefixes the output sub-directory.
    @param local_namespace: Namespace the test module is imported with; the
            names 'job', 'bindir' and 'outputdir' are added to a copy of it.
    @param global_namespace: Namespace that receives the created test
            object under the name 'mytest'; only a copy is written to.
    @param before_test_hook: Called with the test object before it runs.
    @param after_test_hook: Called with the test object after it ran,
            whether it succeeded or not.
    @param before_iteration_hook: Registered on the test object through
            register_before_iteration_hook().
    @param after_iteration_hook: Registered on the test object through
            register_after_iteration_hook().

    @raises error.TestError: if a local test cannot be found.
    """
    # The mutable default arguments are harmless here: only copies are ever
    # modified.
    local_namespace = local_namespace.copy()
    global_namespace = global_namespace.copy()
    # if this is not a plain test name then download and install the
    # specified test
    if url.endswith('.tar.bz2'):
        (testgroup, testname) = _installtest(job, url)
        bindir = os.path.join(job.testdir, 'download', testgroup, testname)
        importdir = os.path.join(job.testdir, 'download')
        modulename = '%s.%s' % (re.sub('/', '.', testgroup), testname)
        classname = '%s.%s' % (modulename, testname)
    else:
        # If the test is local, it may be under either testdir or site_testdir.
        # Tests in site_testdir override tests defined in testdir
        testname = url
        path = re.sub(':', '/', testname)
        modulename = os.path.basename(path)
        classname = '%s.%s' % (modulename, modulename)

        # Try installing the test package
        # The job object may be either a server side job or a client side job.
        # 'install_pkg' method will be present only if it's a client side job.
        if hasattr(job, 'install_pkg'):
            try:
                job.install_pkg(testname, 'test',
                                os.path.join(job.testdir, testname))
            except error.PackageInstallError:
                # continue as a fall back mechanism and see if the test code
                # already exists on the machine
                pass

        # The last existing candidate wins, which is what lets site_testdir
        # override testdir.
        bindir = None
        for testdir in [job.testdir, getattr(job, 'site_testdir', None)]:
            if testdir is not None and os.path.exists(
                    os.path.join(testdir, path)):
                importdir = bindir = os.path.join(testdir, path)
        if not bindir:
            raise error.TestError(testname + ': test does not exist')

    subdir = os.path.join(dargs.pop('master_testpath', ""), testname)
    outputdir = os.path.join(job.resultdir, subdir)
    if tag:
        outputdir += '.' + tag

    local_namespace['job'] = job
    local_namespace['bindir'] = bindir
    local_namespace['outputdir'] = outputdir

    # Import the test module and build the test object with the test's own
    # directory temporarily at the front of sys.path.
    sys.path.insert(0, importdir)
    try:
        exec ('import %s' % modulename, local_namespace, global_namespace)
        exec ("mytest = %s(job, bindir, outputdir)" % classname,
              local_namespace, global_namespace)
    finally:
        sys.path.pop(0)

    # Looked up before the try block below so that its finally clause can
    # always rely on mytest being bound.
    mytest = global_namespace['mytest']

    pwd = os.getcwd()
    os.chdir(outputdir)

    try:
        mytest.success = False
        if before_test_hook:
            before_test_hook(mytest)

        # we use the register iteration hooks methods to register the passed
        # in hooks
        if before_iteration_hook:
            mytest.register_before_iteration_hook(before_iteration_hook)
        if after_iteration_hook:
            mytest.register_after_iteration_hook(after_iteration_hook)
        mytest._exec(args, dargs)
        mytest.success = True
    finally:
        os.chdir(pwd)
        try:
            if after_test_hook:
                after_test_hook(mytest)
        finally:
            # Remove the temporary directory even if the hook raised.
            shutil.rmtree(mytest.tmpdir, ignore_errors=True)
832