1import os
2import operator
3import sys
4import contextlib
5import itertools
6import unittest
7from distutils.errors import DistutilsError, DistutilsOptionError
8from distutils import log
9from unittest import TestLoader
10
11from setuptools.extern import six
12from setuptools.extern.six.moves import map, filter
13
14from pkg_resources import (resource_listdir, resource_exists, normalize_path,
15                           working_set, _namespace_packages, evaluate_marker,
16                           add_activation_listener, require, EntryPoint)
17from setuptools import Command
18
19
class ScanningLoader(TestLoader):
    """Test loader that also descends into the modules of a package.

    Each module passed to :meth:`loadTestsFromModule` is recorded in
    ``_visited`` so that a given loader instance loads it at most once.
    """

    def __init__(self):
        TestLoader.__init__(self)
        # Modules this loader instance has already loaded tests from.
        self._visited = set()

    def loadTestsFromModule(self, module, pattern=None):
        """Return a suite of all tests cases contained in the given module

        If the module is a package, load tests from all the modules in it.
        If the module has an ``additional_tests`` function, call it and add
        the return value to the tests.

        ``pattern`` is accepted for signature compatibility only; it is not
        forwarded to ``TestLoader.loadTestsFromModule``.

        Returns ``None`` when ``module`` was already visited by this loader.
        """
        if module in self._visited:
            return None
        self._visited.add(module)

        tests = [TestLoader.loadTestsFromModule(self, module)]

        if hasattr(module, "additional_tests"):
            tests.append(module.additional_tests())

        if hasattr(module, '__path__'):
            package = module.__name__
            for entry in resource_listdir(package, ''):
                if entry.endswith('.py') and entry != '__init__.py':
                    submodule = package + '.' + entry[:-3]
                elif resource_exists(package, entry + '/__init__.py'):
                    submodule = package + '.' + entry
                else:
                    continue
                loaded = self.loadTestsFromName(submodule)
                # An already-visited submodule comes back as None; adding
                # that to a suite would raise TypeError, so leave it out.
                if loaded is not None:
                    tests.append(loaded)

        if len(tests) != 1:
            return self.suiteClass(tests)
        # don't create a nested suite for only one return
        return tests[0]
58
59
60# adapted from jaraco.classes.properties:NonDataProperty
class NonDataProperty(object):
    """Descriptor that computes an attribute by calling ``fget(instance)``.

    Only ``__get__`` is defined, so a value assigned on the instance under
    the same name takes precedence over this descriptor.
    """

    def __init__(self, fget):
        # Callable invoked with the instance on each attribute access.
        self.fget = fget

    def __get__(self, obj, objtype=None):
        # Accessed on the class itself: hand back the descriptor object.
        return self if obj is None else self.fget(obj)
69
70
class test(Command):
    """Command to run unit tests after in-place build"""

    description = "run unit tests after in-place build"

    user_options = [
        ('test-module=', 'm', "Run 'test_suite' in specified module"),
        ('test-suite=', 's',
         "Run single test, case or suite (e.g. 'module.test_suite')"),
        ('test-runner=', 'r', "Test runner to use"),
    ]

    def initialize_options(self):
        """Reset every option of this command to ``None``."""
        self.test_suite = None
        self.test_module = None
        self.test_loader = None
        self.test_runner = None

    def finalize_options(self):
        """
        Fill in unset options, falling back to the distribution's values.

        Raises ``DistutilsOptionError`` when both a test suite and a test
        module were specified.
        """
        if self.test_suite and self.test_module:
            msg = "You may specify a module or a suite, but not both"
            raise DistutilsOptionError(msg)

        if self.test_suite is None:
            if self.test_module is None:
                self.test_suite = self.distribution.test_suite
            else:
                self.test_suite = self.test_module + ".test_suite"

        if self.test_loader is None:
            self.test_loader = getattr(self.distribution, 'test_loader', None)
        if self.test_loader is None:
            self.test_loader = "setuptools.command.test:ScanningLoader"
        if self.test_runner is None:
            self.test_runner = getattr(self.distribution, 'test_runner', None)

    @NonDataProperty
    def test_args(self):
        # Non-data property: assigning ``test_args`` on an instance
        # overrides this computed list.
        return list(self._test_args())

    def _test_args(self):
        """Yield the arguments that follow the program name for unittest."""
        if not self.test_suite and sys.version_info >= (2, 7):
            yield 'discover'
        if self.verbose:
            yield '--verbose'
        if self.test_suite:
            yield self.test_suite

    def with_project_on_sys_path(self, func):
        """
        Backward compatibility for project_on_sys_path context.
        """
        with self.project_on_sys_path():
            func()

    @contextlib.contextmanager
    def project_on_sys_path(self, include_dists=None):
        """
        Context in which the built project is importable.

        Runs the build commands, puts the project's ``egg_base`` at the
        head of ``sys.path`` and of ``PYTHONPATH``, re-initializes the
        working set and requires the project itself. On exit,
        ``sys.path``, ``sys.modules`` and the working set are restored.

        ``include_dists`` is accepted but not used by this method.
        """
        with_2to3 = six.PY3 and getattr(self.distribution, 'use_2to3', False)

        if with_2to3:
            # If we run 2to3 we can not do this inplace:

            # Ensure metadata is up-to-date
            self.reinitialize_command('build_py', inplace=0)
            self.run_command('build_py')
            bpy_cmd = self.get_finalized_command("build_py")
            build_path = normalize_path(bpy_cmd.build_lib)

            # Build extensions
            self.reinitialize_command('egg_info', egg_base=build_path)
            self.run_command('egg_info')

            self.reinitialize_command('build_ext', inplace=0)
            self.run_command('build_ext')
        else:
            # Without 2to3 inplace works fine:
            self.run_command('egg_info')

            # Build extensions in-place
            self.reinitialize_command('build_ext', inplace=1)
            self.run_command('build_ext')

        ei_cmd = self.get_finalized_command("egg_info")

        # Snapshots used to undo the changes made below.
        old_path = sys.path[:]
        old_modules = sys.modules.copy()

        try:
            project_path = normalize_path(ei_cmd.egg_base)
            sys.path.insert(0, project_path)
            working_set.__init__()
            add_activation_listener(lambda dist: dist.activate())
            require('%s==%s' % (ei_cmd.egg_name, ei_cmd.egg_version))
            with self.paths_on_pythonpath([project_path]):
                yield
        finally:
            sys.path[:] = old_path
            sys.modules.clear()
            sys.modules.update(old_modules)
            working_set.__init__()

    @staticmethod
    @contextlib.contextmanager
    def paths_on_pythonpath(paths):
        """
        Add the indicated paths to the head of the PYTHONPATH environment
        variable so that subprocesses will also see the packages at
        these paths.

        Do this in a context that restores the value on exit.
        """
        # Sentinel distinguishing "PYTHONPATH unset" from "set but empty".
        nothing = object()
        orig_pythonpath = os.environ.get('PYTHONPATH', nothing)
        current_pythonpath = os.environ.get('PYTHONPATH', '')
        try:
            prefix = os.pathsep.join(paths)
            to_join = [part for part in (prefix, current_pythonpath) if part]
            new_path = os.pathsep.join(to_join)
            if new_path:
                os.environ['PYTHONPATH'] = new_path
            yield
        finally:
            if orig_pythonpath is nothing:
                os.environ.pop('PYTHONPATH', None)
            else:
                os.environ['PYTHONPATH'] = orig_pythonpath

    @staticmethod
    def install_dists(dist):
        """
        Install the requirements indicated by ``dist`` and
        return an iterable of the dists that were built.

        Covers ``install_requires``, ``tests_require`` and those
        ``extras_require`` entries whose key is a bare environment marker
        (``':<marker>'``) that evaluates true. An unset (``None``) value for
        any of the three is treated as empty.
        """
        ir_d = dist.fetch_build_eggs(dist.install_requires or [])
        tr_d = dist.fetch_build_eggs(dist.tests_require or [])
        er_d = dist.fetch_build_eggs(
            v for k, v in (dist.extras_require or {}).items()
            if k.startswith(':') and evaluate_marker(k[1:])
        )
        return itertools.chain(ir_d, tr_d, er_d)

    def run(self):
        """
        Fetch the required dists, then run the tests with those dists and
        the project itself importable. With ``dry_run`` set, only announce
        the command that would have been run.
        """
        installed_dists = self.install_dists(self.distribution)

        cmd = ' '.join(self._argv)
        if self.dry_run:
            self.announce('skipping "%s" (dry run)' % cmd)
            return

        self.announce('running "%s"' % cmd)

        paths = map(operator.attrgetter('location'), installed_dists)
        with self.paths_on_pythonpath(paths):
            with self.project_on_sys_path():
                self.run_tests()

    def run_tests(self):
        """
        Run the tests through ``unittest.main``.

        Raises ``DistutilsError`` when the test result is not successful.
        """
        # Purge modules under test from sys.modules. The test loader will
        # re-import them from the build location. Required when 2to3 is used
        # with namespace packages.
        with_2to3 = six.PY3 and getattr(self.distribution, 'use_2to3', False)
        # test_suite may be None (discover mode); there is then no top-level
        # package name to purge.
        if with_2to3 and self.test_suite:
            module = self.test_suite.split('.')[0]
            if module in _namespace_packages:
                prefix = module + '.'
                # Collect first: sys.modules must not be mutated while it
                # is being iterated.
                del_modules = [
                    name for name in list(sys.modules)
                    if name == module or name.startswith(prefix)
                ]
                for name in del_modules:
                    del sys.modules[name]

        test = unittest.main(
            None, None, self._argv,
            testLoader=self._resolve_as_ep(self.test_loader),
            testRunner=self._resolve_as_ep(self.test_runner),
            exit=False,
        )
        if not test.result.wasSuccessful():
            msg = 'Test failed: %s' % test.result
            self.announce(msg, log.ERROR)
            raise DistutilsError(msg)

    @property
    def _argv(self):
        # Full argv handed to unittest.main: program name plus test_args.
        return ['unittest'] + self.test_args

    @staticmethod
    def _resolve_as_ep(val):
        """
        Load the indicated attribute value, called, as if it were
        specified as an entry point.

        Returns ``None`` when ``val`` is ``None``.
        """
        if val is None:
            return None
        parsed = EntryPoint.parse("x=" + val)
        return parsed.resolve()()
269