1import os
2
3from . import abc as resources_abc
4from . import _common
5from ._common import as_file
6from contextlib import contextmanager, suppress
7from importlib import import_module
8from importlib.abc import ResourceLoader
9from io import BytesIO, TextIOWrapper
10from pathlib import Path
11from types import ModuleType
12from typing import ContextManager, Iterable, Optional, Union
13from typing import cast
14from typing.io import BinaryIO, TextIO
15
16
# Public names of this module; this list is what ``from <module> import *``
# exports.
__all__ = [
    'Package',
    'Resource',
    'as_file',
    'contents',
    'files',
    'is_resource',
    'open_binary',
    'open_text',
    'path',
    'read_binary',
    'read_text',
    ]
30
31
# A package is given either by name (a string) or as a module object.
Package = Union[str, ModuleType]
# A resource is named by a string or by an os.PathLike object.
Resource = Union[str, os.PathLike]
34
35
def _resolve(name) -> ModuleType:
    """Return the module designated by *name*.

    Anything that carries a ``__spec__`` attribute is treated as an
    already-imported module and handed back untouched; any other value is
    passed to ``import_module()``.
    """
    return name if hasattr(name, '__spec__') else import_module(name)
41
42
def _get_package(package) -> ModuleType:
    """Resolve *package* (a name or a module object) to a package module.

    A name is imported first.  A TypeError is raised when the resolved
    module's spec has no submodule search locations, i.e. when it is not
    a package.
    """
    resolved = _resolve(package)
    search_locations = resolved.__spec__.submodule_search_locations
    if search_locations is not None:
        return resolved
    raise TypeError('{!r} is not a package'.format(package))
53
54
def _normalize_path(path) -> str:
    """Return *path* as a bare file name.

    The path is split with ``os.path.split()``; a ValueError is raised if
    it has any directory component, otherwise the file-name part is
    returned.
    """
    directory, base_name = os.path.split(path)
    if not directory:
        return base_name
    raise ValueError('{!r} must be only a file name'.format(path))
64
65
def _get_resource_reader(
        package: ModuleType) -> Optional[resources_abc.ResourceReader]:
    """Return the resource reader offered by the package's loader, if any.

    Returns None when the loader has no ``get_resource_reader`` attribute.
    """
    # The loader is probed by attribute (duck typing) rather than with an
    # issubclass() check: the ABC's __subclasscheck__() hook wants to create
    # a weak reference to the object, and zipimport.zipimporter does not
    # support weak references, which would result in a TypeError.
    spec = package.__spec__
    loader = spec.loader
    if not hasattr(loader, 'get_resource_reader'):
        return None
    reader = loader.get_resource_reader(spec.name)
    return cast(resources_abc.ResourceReader, reader)
78
79
def _check_location(package):
    """Raise FileNotFoundError unless the package's spec has a location.

    The spec is considered located when its ``origin`` is not None and
    its ``has_location`` flag is true.
    """
    spec = package.__spec__
    if spec.origin is not None and spec.has_location:
        return
    raise FileNotFoundError(f'Package has no location {package!r}')
83
84
def open_binary(package: Package, resource: Resource) -> BinaryIO:
    """Open *resource* inside *package* for binary reading.

    The package's resource reader is used when it has one.  Otherwise the
    file next to the package's origin is opened directly, and if that
    fails the loader's ``get_data()`` is tried.  FileNotFoundError is
    raised when none of these yields the resource.
    """
    resource = _normalize_path(resource)
    package = _get_package(package)
    reader = _get_resource_reader(package)
    if reader is not None:
        return reader.open_resource(resource)
    spec = package.__spec__
    origin = os.path.abspath(spec.origin or 'non-existent file')
    full_path = os.path.join(os.path.dirname(origin), resource)
    try:
        return open(full_path, mode='rb')
    except OSError:
        # The loader is assumed to be a resource loader, as all the
        # relevant importlib.machinery loaders are; get_data() is only
        # attempted when the loader actually provides it.
        loader = cast(ResourceLoader, spec.loader)
        data = None
        if hasattr(loader, 'get_data'):
            with suppress(OSError):
                data = loader.get_data(full_path)
        if data is not None:
            return BytesIO(data)
        raise FileNotFoundError(
            '{!r} resource not found in {!r}'.format(resource, spec.name))
113
114
def open_text(package: Package,
              resource: Resource,
              encoding: str = 'utf-8',
              errors: str = 'strict') -> TextIO:
    """Open *resource* inside *package* for text reading.

    The binary stream from open_binary() is wrapped in a TextIOWrapper
    using the given *encoding* and *errors* settings.
    """
    binary_stream = open_binary(package, resource)
    return TextIOWrapper(binary_stream, encoding=encoding, errors=errors)
122
123
def read_binary(package: Package, resource: Resource) -> bytes:
    """Read *resource* inside *package* and return its contents as bytes."""
    with open_binary(package, resource) as stream:
        payload = stream.read()
    return payload
128
129
def read_text(package: Package,
              resource: Resource,
              encoding: str = 'utf-8',
              errors: str = 'strict') -> str:
    """Read *resource* inside *package* and return it as a decoded string.

    *encoding* and *errors* have the same semantics as the corresponding
    arguments of bytes.decode().
    """
    with open_text(package, resource, encoding, errors) as stream:
        text = stream.read()
    return text
141
142
def files(package: Package) -> resources_abc.Traversable:
    """Return a Traversable for the resources of *package*.

    *package* may be a name or a module object; it must resolve to a
    package.
    """
    module = _get_package(package)
    return _common.from_package(module)
148
149
def path(
        package: Package, resource: Resource,
        ) -> 'ContextManager[Path]':
    """Return a context manager that provides a file path to the resource.

    If the resource does not already exist on its own on the file system,
    a temporary file will be created.  A file created this way is deleted
    when the context manager exits (no exception is raised if it was
    already deleted before then).
    """
    reader = _get_resource_reader(_get_package(package))
    if reader:
        return _path_from_reader(reader, resource)
    # No resource reader available: go through the Traversable API.
    traversable = files(package).joinpath(_normalize_path(resource))
    return _common.as_file(traversable)
167
168
@contextmanager
def _path_from_reader(reader, resource):
    """Yield a file path for *resource* as provided by *reader*.

    The reader's ``resource_path()`` is tried first.  If it raises
    FileNotFoundError, the resource is opened with ``open_resource()``
    and its contents are exposed through a temporary file instead.

    Raises ValueError (from ``_normalize_path``) if *resource* is not a
    bare file name.
    """
    norm_resource = _normalize_path(resource)
    # Only the resource_path() lookup is guarded.  The yield must stay
    # outside the guard: a FileNotFoundError raised by the caller's
    # ``with`` body would otherwise be swallowed here and followed by a
    # second yield, which contextmanager reports as
    # "generator didn't stop after throw()".
    try:
        file_path = Path(reader.resource_path(norm_resource))
    except FileNotFoundError:
        file_path = None
    if file_path is not None:
        yield file_path
        return
    # Keep the stream open for as long as the temporary file may need to
    # read from it, and close it afterwards instead of leaking it.
    with reader.open_resource(norm_resource) as stream:
        with _common._tempfile(stream.read, suffix=norm_resource) as res:
            yield res
178
179
def is_resource(package: Package, name: str) -> bool:
    """Return True if *name* is a resource inside *package*.

    Directories are *not* resources.
    """
    package = _get_package(package)
    # Called for validation only: raises ValueError unless *name* is a
    # bare file name.  The return value is intentionally unused.
    _normalize_path(name)
    reader = _get_resource_reader(package)
    if reader is not None:
        return reader.is_resource(name)
    if name in set(contents(package)):
        return (_common.from_package(package) / name).is_file()
    return False
194
195
def contents(package: Package) -> Iterable[str]:
    """Return an iterable of the names of the entries in *package*.

    Not every entry is a resource; in particular, directories are not
    considered resources.  Use `is_resource()` on each returned entry to
    find out whether it is one.
    """
    package = _get_package(package)
    reader = _get_resource_reader(package)
    if reader is not None:
        return reader.contents()
    spec = package.__spec__
    # Namespace packages cannot have resources, by definition.
    is_namespace = spec.origin is None or spec.origin == 'namespace'
    if is_namespace or not spec.has_location:
        return ()
    return [entry.name for entry in _common.from_package(package).iterdir()]
216