1import abc
2import builtins
3import contextlib
4import errno
5import functools
6import importlib
7from importlib import machinery, util, invalidate_caches
8from importlib.abc import ResourceReader
9import io
10import os
11import os.path
12from pathlib import Path, PurePath
13from test import support
14import unittest
15import sys
16import tempfile
17import types
18
19from . import data01
20from . import zipdata01
21
22
# good_name is a module listed in sys.builtin_module_names ('errno', when it
# is built in); bad_name is one that is not ('importlib').  Either stays None
# when its condition does not hold on this interpreter.
BUILTINS = types.SimpleNamespace(
    good_name='errno' if 'errno' in sys.builtin_module_names else None,
    bad_name=('importlib' if 'importlib' not in sys.builtin_module_names
              else None),
)
30
# On-disk details of the extension module named by ``name``.  Every field
# except ``name`` stays None unless _extension_details() finds a match.
EXTENSIONS = types.SimpleNamespace(
    path=None,
    ext=None,
    filename=None,
    file_path=None,
    name='_testcapi',
)

def _extension_details():
    """Fill in EXTENSIONS with the first on-disk match for EXTENSIONS.name.

    Every sys.path entry is tried, in order, with every suffix from
    machinery.EXTENSION_SUFFIXES.  The first candidate file that exists
    populates ``path``, ``ext``, ``filename`` and ``file_path``; if none
    exists those attributes are left untouched.
    """
    for directory in sys.path:
        for suffix in machinery.EXTENSION_SUFFIXES:
            candidate_name = EXTENSIONS.name + suffix
            candidate_path = os.path.join(directory, candidate_name)
            if not os.path.exists(candidate_path):
                continue
            EXTENSIONS.path = directory
            EXTENSIONS.ext = suffix
            EXTENSIONS.filename = candidate_name
            EXTENSIONS.file_path = candidate_path
            return

_extension_details()
52
53
def import_importlib(module_name):
    """Import a module from importlib both w/ and w/o _frozen_importlib.

    Returns a dict with two entries: 'Frozen' is a fresh import of
    *module_name*; 'Source' is a fresh import made while
    '_frozen_importlib' and '_frozen_importlib_external' are blocked.  For
    a dotted *module_name*, 'importlib' itself is also imported fresh for
    the 'Source' variant.
    """
    blocked = ('_frozen_importlib', '_frozen_importlib_external')
    frozen = support.import_fresh_module(module_name)
    if '.' in module_name:
        fresh = ('importlib',)
    else:
        fresh = ()
    source = support.import_fresh_module(module_name, fresh=fresh,
                                         blocked=blocked)
    return dict(Frozen=frozen, Source=source)
61
62
def specialize_class(cls, kind, base=None, **kwargs):
    """Create the *kind*-specific subclass of *cls*.

    The new class is named '<kind>_<cls name>' and inherits from *cls* and
    a second base: unittest.TestCase when *base* is None, *base* itself
    when it is a class, otherwise ``base[kind]``.  It records the original
    class name in ``_NAME`` and *kind* in ``_KIND``.  Each keyword argument
    is a mapping indexed by kind; the entry for *kind* is set as a class
    attribute under the keyword's name.
    """
    # XXX Support passing in submodule names--load (and cache) them?
    # That would clean up the test modules a bit more.
    if base is None:
        parent = unittest.TestCase
    elif isinstance(base, type):
        parent = base
    else:
        parent = base[kind]
    specialized = types.new_class(f'{kind}_{cls.__name__}', (cls, parent))
    specialized.__module__ = cls.__module__
    specialized._NAME = cls.__name__
    specialized._KIND = kind
    for attr, per_kind in kwargs.items():
        setattr(specialized, attr, per_kind[kind])
    return specialized
80
81
def split_frozen(cls, base=None, **kwargs):
    """Return the 'Frozen' and 'Source' specializations of *cls*.

    Both are produced by specialize_class() with the same *base* and
    keyword arguments, and returned as a (frozen, source) tuple.
    """
    return tuple(specialize_class(cls, kind, base, **kwargs)
                 for kind in ('Frozen', 'Source'))
86
87
def test_both(test_class, base=None, **kwargs):
    """Return the (frozen, source) pair that split_frozen() builds."""
    frozen, source = split_frozen(test_class, base, **kwargs)
    return frozen, source
90
91
# Windows is the only OS that is *always* case-insensitive
# (OS X *can* be case-sensitive).
if sys.platform in ('win32', 'cygwin'):
    CASE_INSENSITIVE_FS = True
else:
    # Probe the file system: look this file up under a differently-cased
    # spelling of its own path and see whether it is still found.
    changed_name = __file__.upper()
    if changed_name == __file__:
        changed_name = __file__.lower()
    CASE_INSENSITIVE_FS = os.path.exists(changed_name)
101
# The 'Source' flavour of importlib, i.e. the one import_importlib() loads
# with the frozen bootstrap modules blocked.
source_importlib = import_importlib('importlib')['Source']
# __import__ implementation for each kind.  Wrapped in staticmethod,
# presumably so they are not bound as methods when installed as class
# attributes (e.g. via specialize_class) -- confirm against callers.
__import__ = dict(
    Frozen=staticmethod(builtins.__import__),
    Source=staticmethod(source_importlib.__import__),
)
105
106
def case_insensitive_tests(test):
    """Class decorator that skips tests requiring a case-insensitive
    file system when CASE_INSENSITIVE_FS is false."""
    skip_unless_insensitive = unittest.skipUnless(
        CASE_INSENSITIVE_FS, "requires a case-insensitive filesystem")
    return skip_unless_insensitive(test)
112
113
def submodule(parent, name, pkg_dir, content=''):
    """Write '<name>.py' containing *content* into *pkg_dir*.

    Returns a ('<parent>.<name>', file path) tuple.
    """
    module_path = os.path.join(pkg_dir, name + '.py')
    with open(module_path, 'w') as handle:
        handle.write(content)
    qualified_name = f'{parent}.{name}'
    return qualified_name, module_path
119
120
@contextlib.contextmanager
def uncache(*names):
    """Uncache modules from sys.modules for the duration of the context.

    Each name in *names* is removed from sys.modules (if present) on entry
    and again on exit, so anything imported under those names inside the
    block does not outlive it.

    A basic sanity check is performed to prevent uncaching modules that either
    cannot/shouldn't be uncached: ValueError is raised for 'sys', 'marshal'
    and 'imp'.  The check covers every name before anything is removed, so
    a rejected call leaves sys.modules untouched.

    """
    # Validate first: interleaving the check with the deletions would leave
    # earlier names already evicted when a later name is rejected.
    for name in names:
        if name in ('sys', 'marshal', 'imp'):
            raise ValueError(
                "cannot uncache {0}".format(name))
    for name in names:
        sys.modules.pop(name, None)
    try:
        yield
    finally:
        for name in names:
            sys.modules.pop(name, None)
145
146
@contextlib.contextmanager
def temp_module(name, content='', *, pkg=False):
    """Create an importable module or package called *name* in a temp cwd.

    While the context is active the process runs in a temporary working
    directory that is also placed on sys.path, and *name* plus every
    sys.modules entry whose top-level name is *name* is uncached.

    With pkg=False, '<name>.py' is written with *content* (None is treated
    as an empty file).  With pkg=True, a '<name>' directory is created and
    '__init__.py' is written with *content* -- unless *content* is None, in
    which case no '__init__.py' is written (a namespace package).

    Yields the module location: the temp directory joined with *name*.
    """
    conflicts = [mod for mod in sys.modules if mod.split('.', 1)[0] == name]
    with support.temp_cwd(None) as cwd, \
            uncache(name, *conflicts), \
            support.DirsOnSysPath(cwd):
        invalidate_caches()

        location = os.path.join(cwd, name)
        if pkg:
            # Relative path: the temp directory is the current directory.
            os.mkdir(name)
            modpath = os.path.join(location, '__init__.py')
        else:
            modpath = location + '.py'
            if content is None:
                # Make sure the module file gets created.
                content = ''
        if content is not None:
            # not a namespace package
            with open(modpath, 'w') as modfile:
                modfile.write(content)
        yield location
169
170
@contextlib.contextmanager
def import_state(**kwargs):
    """Context manager that temporarily swaps import-related sys attributes.

    Accepted keywords are 'meta_path', 'path', 'path_hooks' and
    'path_importer_cache'.  Each of those sys attributes is replaced by the
    supplied value, or by a fresh empty list/dict when omitted, and the
    original objects are put back on exit.  Any other keyword raises
    ValueError (after which the originals are still restored).

    The 'modules' attribute is not supported as the interpreter state stores a
    pointer to the dict that the interpreter uses internally;
    reassigning to sys.modules does not have the desired effect.

    """
    defaults = (
        ('meta_path', []),
        ('path', []),
        ('path_hooks', []),
        ('path_importer_cache', {}),
    )
    saved = {}
    try:
        for attr, fallback in defaults:
            saved[attr] = getattr(sys, attr)
            setattr(sys, attr, kwargs.pop(attr, fallback))
        if kwargs:
            # Whatever is left over was not one of the supported names.
            raise ValueError(
                    'unrecognized arguments: {0}'.format(kwargs.keys()))
        yield
    finally:
        for attr, value in saved.items():
            setattr(sys, attr, value)
200
201
class _ImporterMock:

    """Base class to help with creating importer mocks.

    Each positional *name* becomes a module object stored in
    ``self.modules``.  A name ending in '.__init__' denotes a package: the
    suffix is stripped to obtain the import name and the module is given a
    ``__path__``.  *module_code* optionally maps import names to callables;
    entries for the created modules are copied to ``self.module_code``.

    Used as a context manager, the instance uncaches its module names from
    sys.modules on entry and on exit.
    """

    def __init__(self, *names, module_code=None):
        # None instead of a shared ``{}`` default; an explicit None is
        # treated the same as "no code supplied".
        if module_code is None:
            module_code = {}
        self.modules = {}
        self.module_code = {}
        init_suffix = '.__init__'
        for name in names:
            is_package = name.endswith(init_suffix)
            import_name = name[:-len(init_suffix)] if is_package else name
            if '.' not in name:
                package = None
            elif is_package:
                # A package is its own __package__.
                package = import_name
            else:
                package = name.rsplit('.', 1)[0]
            module = types.ModuleType(import_name)
            module.__loader__ = self
            module.__file__ = '<mock __file__>'
            module.__package__ = package
            # ``attr`` keeps the name exactly as it was passed in.
            module.attr = name
            if is_package:
                module.__path__ = ['<mock __path__>']
            self.modules[import_name] = module
            if import_name in module_code:
                self.module_code[import_name] = module_code[import_name]

    def __getitem__(self, name):
        """Return the mocked module registered under *name*."""
        return self.modules[name]

    def __enter__(self):
        self._uncache = uncache(*self.modules)
        self._uncache.__enter__()
        return self

    def __exit__(self, *exc_info):
        # The exception (if any) is deliberately not forwarded; uncache only
        # needs to run its cleanup.
        self._uncache.__exit__(None, None, None)
241
242
class mock_modules(_ImporterMock):

    """Importer mock using PEP 302 APIs."""

    def find_module(self, fullname, path=None):
        """Return self when *fullname* is a mocked module, else None."""
        return self if fullname in self.modules else None

    def load_module(self, fullname):
        """Install the mocked module in sys.modules and run its code, if any.

        Raises ImportError for an unknown *fullname*.  If the module's code
        raises an Exception, the sys.modules entry is removed again before
        the exception propagates.
        """
        if fullname not in self.modules:
            raise ImportError
        module = self.modules[fullname]
        sys.modules[fullname] = module
        if fullname in self.module_code:
            try:
                self.module_code[fullname]()
            except Exception:
                del sys.modules[fullname]
                raise
        return self.modules[fullname]
265
266
class mock_spec(_ImporterMock):

    """Importer mock using PEP 451 APIs."""

    def find_spec(self, fullname, path=None, parent=None):
        """Return a spec for *fullname*, or None if it is not mocked.

        The spec uses this object as loader and takes its origin and
        submodule search locations from the mocked module's ``__file__``
        and (if present) ``__path__``.
        """
        try:
            module = self.modules[fullname]
        except KeyError:
            return None
        spec = util.spec_from_file_location(
                fullname, module.__file__, loader=self,
                submodule_search_locations=getattr(module, '__path__', None))
        return spec

    def create_module(self, spec):
        """Return the pre-built mocked module for *spec*.

        Raises ImportError when ``spec.name`` is not a mocked module.
        """
        if spec.name not in self.modules:
            raise ImportError
        return self.modules[spec.name]

    def exec_module(self, module):
        """Run the code registered for *module*, if there is any."""
        # Only the lookup is guarded: a KeyError raised by the module code
        # itself must propagate instead of being mistaken for "no code".
        try:
            code = self.module_code[module.__spec__.name]
        except KeyError:
            return
        code()
291
292
def writes_bytecode_files(fxn):
    """Decorator to protect sys.dont_write_bytecode from mutation and to skip
    tests that require it to be set to False.

    If sys.dont_write_bytecode is true at decoration time, the returned
    callable raises unittest.SkipTest when called, so the test is reported
    as skipped rather than silently passing.  Otherwise the returned wrapper
    forces sys.dont_write_bytecode to False around the call and restores the
    previous value afterwards, even if *fxn* raises.
    """
    if sys.dont_write_bytecode:
        return unittest.skip("relies on writing bytecode")(fxn)
    @functools.wraps(fxn)
    def wrapper(*args, **kwargs):
        original = sys.dont_write_bytecode
        sys.dont_write_bytecode = False
        try:
            return fxn(*args, **kwargs)
        finally:
            sys.dont_write_bytecode = original
    return wrapper
308
309
def ensure_bytecode_path(bytecode_path):
    """Ensure that the __pycache__ directory for PEP 3147 pyc file exists.

    Only the immediate parent directory of *bytecode_path* is created; an
    already existing one is not an error, any other OSError propagates.

    :param bytecode_path: File system path to PEP 3147 pyc file.
    """
    cache_dir = os.path.dirname(bytecode_path)
    # FileExistsError is the OSError subclass for errno.EEXIST.
    with contextlib.suppress(FileExistsError):
        os.mkdir(cache_dir)
320
321
@contextlib.contextmanager
def create_modules(*names):
    """Temporarily create each named module with an attribute (named 'attr')
    that contains the name passed into the context manager that caused the
    creation of the module.

    All files are created in a temporary directory returned by
    tempfile.mkdtemp().  While the context is active, import_state() makes
    that directory the only entry of sys.path (and resets sys.meta_path,
    sys.path_hooks and sys.path_importer_cache to empty values), and the
    module names are uncached from sys.modules.  When the context manager
    exits the import state is restored and the whole temporary directory
    tree is removed.

    The yielded mapping has one entry per name giving the path of the file
    created for it, plus a '.root' entry holding the temporary directory.

    No magic is performed when creating packages! This means that if you create
    a module within a package you must also create the package's __init__ as
    well.

    """
    source = 'attr = {0!r}'
    init_suffix = '.__init__'
    # Created before the try block so the cleanup below never runs against
    # an unbound name when mkdtemp() itself fails.
    temp_dir = tempfile.mkdtemp()
    try:
        mapping = {'.root': temp_dir}
        import_names = set()
        for name in names:
            # Test for exactly the suffix that gets sliced off.
            if name.endswith(init_suffix):
                import_names.add(name[:-len(init_suffix)])
            else:
                import_names.add(name)
            *packages, leaf = name.split('.')
            directory = temp_dir
            for package in packages:
                directory = os.path.join(directory, package)
                if not os.path.exists(directory):
                    os.mkdir(directory)
            file_path = os.path.join(directory, leaf + '.py')
            with open(file_path, 'w') as module_file:
                module_file.write(source.format(name))
            mapping[name] = file_path
        # uncache() evicts the names from sys.modules (after its sanity
        # check), so no separate deletion is done in the loop above.
        with uncache(*import_names), import_state(path=[temp_dir]):
            yield mapping
    finally:
        support.rmtree(temp_dir)
378
379
def mock_path_hook(*entries, importer):
    """A mock sys.path_hooks entry.

    The returned hook hands back *importer* for any path entry listed in
    *entries* and raises ImportError for everything else.
    """
    def hook(entry):
        if entry in entries:
            return importer
        raise ImportError
    return hook
387
388
class CASEOKTestBase:

    """Mixin with a PYTHONCASEOK guard; expects ``self.importlib`` and
    ``self.skipTest`` to be provided by the concrete test class."""

    def caseok_env_changed(self, *, should_exist):
        """Skip the test unless PYTHONCASEOK's presence matches *should_exist*.

        Presence is checked, under both the bytes and the str spelling of
        the key, in the ``_os.environ`` seen by
        ``self.importlib._bootstrap_external``.
        """
        environ = self.importlib._bootstrap_external._os.environ
        present = b'PYTHONCASEOK' in environ or 'PYTHONCASEOK' in environ
        if present != should_exist:
            self.skipTest('os.environ changes not reflected in _os.environ')
396
397
def create_package(file, path, is_package=True, contents=()):
    """Build an in-memory 'testingpackage' module backed by a fake reader.

    The module's loader is a ResourceReader whose answers are fixed by the
    arguments: open_resource() returns *file*, resource_path() returns
    *path*, and is_resource()/contents() are driven by *contents*.  Passing
    an Exception instance as *file* or *path* makes the corresponding
    methods raise it instead.  Every method that takes a resource name
    records it on the loader as ``_path``.
    """
    class Reader(ResourceReader):
        """ResourceReader returning the canned values given to create_package."""

        def get_resource_reader(self, package):
            return self

        def open_resource(self, path):
            self._path = path
            if isinstance(file, Exception):
                raise file
            return file

        def resource_path(self, path_):
            self._path = path_
            if isinstance(path, Exception):
                raise path
            return path

        def is_resource(self, path_):
            self._path = path_
            if isinstance(path, Exception):
                raise path
            # Only entries without a '/' count as resources of this package.
            return any('/' not in entry and entry == path_
                       for entry in contents)

        def contents(self):
            if isinstance(path, Exception):
                raise path
            yield from contents

    name = 'testingpackage'
    # Unfortunately importlib.util.module_from_spec() was not introduced until
    # Python 3.5.
    loader = Reader()
    module = types.ModuleType(name)
    module.__spec__ = machinery.ModuleSpec(
        name, loader,
        origin='does-not-exist',
        is_package=is_package)
    module.__loader__ = loader
    return module
446
447
class CommonResourceTests(abc.ABC):
    """Tests shared by every test case that funnels its calls through
    execute(package, path)."""

    @abc.abstractmethod
    def execute(self, package, path):
        """Invoke the API under test for *path* within *package*."""
        raise NotImplementedError

    def test_package_name(self):
        # The package may be given by name.
        package_name = data01.__name__
        self.execute(package_name, 'utf-8.file')

    def test_package_object(self):
        # The package may be given as the module object itself.
        self.execute(data01, 'utf-8.file')

    def test_string_path(self):
        # A plain string is accepted for the path.
        self.execute(data01, 'utf-8.file')

    @unittest.skipIf(sys.version_info < (3, 6), 'requires os.PathLike support')
    def test_pathlib_path(self):
        # A pathlib.PurePath object is accepted for the path.
        self.execute(data01, PurePath('utf-8.file'))

    def test_absolute_path(self):
        # An absolute path is a ValueError.
        sibling_path = Path(__file__).parent / 'utf-8.file'
        with self.assertRaises(ValueError):
            self.execute(data01, sibling_path)

    def test_relative_path(self):
        # A relative path is a ValueError.
        with self.assertRaises(ValueError):
            self.execute(data01, '../data01/utf-8.file')

    def test_importing_module_as_side_effect(self):
        # Drop the anchor package from sys.modules first, then refer to it
        # by name only.
        package_name = data01.__name__
        del sys.modules[package_name]
        self.execute(package_name, 'utf-8.file')

    def test_non_package_by_name(self):
        # The anchor package cannot be a module (given by name).
        with self.assertRaises(TypeError):
            self.execute(__name__, 'utf-8.file')

    def test_non_package_by_package(self):
        # The anchor package cannot be a module (given as an object).
        with self.assertRaises(TypeError):
            self.execute(sys.modules['test.test_importlib.util'],
                         'utf-8.file')

    @unittest.skipIf(sys.version_info < (3,), 'No ResourceReader in Python 2')
    def test_resource_opener(self):
        # The loader can open the resource but has no file system path.
        payload = io.BytesIO(b'Hello, world!')
        package = create_package(file=payload, path=FileNotFoundError())
        self.execute(package, 'utf-8.file')
        self.assertEqual(package.__loader__._path, 'utf-8.file')

    @unittest.skipIf(sys.version_info < (3,), 'No ResourceReader in Python 2')
    def test_resource_path(self):
        # The loader can supply both an open file and a path.
        payload = io.BytesIO(b'Hello, world!')
        package = create_package(file=payload, path=__file__)
        self.execute(package, 'utf-8.file')
        self.assertEqual(package.__loader__._path, 'utf-8.file')

    def test_useless_loader(self):
        # A loader that fails for both the file and the path.
        broken = create_package(file=FileNotFoundError(),
                                path=FileNotFoundError())
        with self.assertRaises(FileNotFoundError):
            self.execute(broken, 'utf-8.file')
520
521
class ZipSetupBase:
    """Fixture mixin that imports 'ziptestdata' from a zip file.

    Subclasses set ZIP_MODULE to a module; the archive 'ziptestdata.zip' is
    looked up in the directory containing that module's file.
    """

    # Module whose directory holds 'ziptestdata.zip'; set by subclasses.
    ZIP_MODULE = None

    @classmethod
    def setUpClass(cls):
        """Put the zip file on sys.path and import 'ziptestdata' as cls.data."""
        data_path = Path(cls.ZIP_MODULE.__file__)
        data_dir = data_path.parent
        cls._zip_path = str(data_dir / 'ziptestdata.zip')
        sys.path.append(cls._zip_path)
        cls.data = importlib.import_module('ziptestdata')

    @classmethod
    def tearDownClass(cls):
        """Undo setUpClass: drop the sys.path entry, caches and attributes.

        Every cleanup step is independent, so one piece of state already
        being gone does not prevent the remaining steps from running.
        """
        try:
            sys.path.remove(cls._zip_path)
        except ValueError:
            pass

        # Separate pops: a missing importer-cache entry must not stop the
        # module from being removed from sys.modules.
        sys.path_importer_cache.pop(cls._zip_path, None)
        sys.modules.pop(cls.data.__name__, None)

        for attr in ('data', '_zip_path'):
            try:
                delattr(cls, attr)
            except AttributeError:
                pass

    def setUp(self):
        """Snapshot module state and register its restoration as cleanup."""
        modules = support.modules_setup()
        self.addCleanup(support.modules_cleanup, *modules)
555
556
class ZipSetup(ZipSetupBase):
    """ZipSetupBase bound to the zipdata01 package as its ZIP_MODULE."""
    ZIP_MODULE = zipdata01                          # type: ignore
559