resources.py 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. import os
  2. import tempfile
  3. from . import abc as resources_abc
  4. from contextlib import contextmanager, suppress
  5. from importlib import import_module
  6. from importlib.abc import ResourceLoader
  7. from io import BytesIO, TextIOWrapper
  8. from pathlib import Path
  9. from types import ModuleType
  10. from typing import Iterable, Iterator, Optional, Set, Union # noqa: F401
  11. from typing import cast
  12. from typing.io import BinaryIO, TextIO
  13. from zipimport import ZipImportError
  14. __all__ = [
  15. 'Package',
  16. 'Resource',
  17. 'contents',
  18. 'is_resource',
  19. 'open_binary',
  20. 'open_text',
  21. 'path',
  22. 'read_binary',
  23. 'read_text',
  24. ]
  25. Package = Union[str, ModuleType]
  26. Resource = Union[str, os.PathLike]
  27. def _get_package(package) -> ModuleType:
  28. """Take a package name or module object and return the module.
  29. If a name, the module is imported. If the passed or imported module
  30. object is not a package, raise an exception.
  31. """
  32. if hasattr(package, '__spec__'):
  33. if package.__spec__.submodule_search_locations is None:
  34. raise TypeError('{!r} is not a package'.format(
  35. package.__spec__.name))
  36. else:
  37. return package
  38. else:
  39. module = import_module(package)
  40. if module.__spec__.submodule_search_locations is None:
  41. raise TypeError('{!r} is not a package'.format(package))
  42. else:
  43. return module
  44. def _normalize_path(path) -> str:
  45. """Normalize a path by ensuring it is a string.
  46. If the resulting string contains path separators, an exception is raised.
  47. """
  48. parent, file_name = os.path.split(path)
  49. if parent:
  50. raise ValueError('{!r} must be only a file name'.format(path))
  51. else:
  52. return file_name
  53. def _get_resource_reader(
  54. package: ModuleType) -> Optional[resources_abc.ResourceReader]:
  55. # Return the package's loader if it's a ResourceReader. We can't use
  56. # a issubclass() check here because apparently abc.'s __subclasscheck__()
  57. # hook wants to create a weak reference to the object, but
  58. # zipimport.zipimporter does not support weak references, resulting in a
  59. # TypeError. That seems terrible.
  60. spec = package.__spec__
  61. if hasattr(spec.loader, 'get_resource_reader'):
  62. return cast(resources_abc.ResourceReader,
  63. spec.loader.get_resource_reader(spec.name))
  64. return None
  65. def _check_location(package):
  66. if package.__spec__.origin is None or not package.__spec__.has_location:
  67. raise FileNotFoundError(f'Package has no location {package!r}')
  68. def open_binary(package: Package, resource: Resource) -> BinaryIO:
  69. """Return a file-like object opened for binary reading of the resource."""
  70. resource = _normalize_path(resource)
  71. package = _get_package(package)
  72. reader = _get_resource_reader(package)
  73. if reader is not None:
  74. return reader.open_resource(resource)
  75. _check_location(package)
  76. absolute_package_path = os.path.abspath(package.__spec__.origin)
  77. package_path = os.path.dirname(absolute_package_path)
  78. full_path = os.path.join(package_path, resource)
  79. try:
  80. return open(full_path, mode='rb')
  81. except OSError:
  82. # Just assume the loader is a resource loader; all the relevant
  83. # importlib.machinery loaders are and an AttributeError for
  84. # get_data() will make it clear what is needed from the loader.
  85. loader = cast(ResourceLoader, package.__spec__.loader)
  86. data = None
  87. if hasattr(package.__spec__.loader, 'get_data'):
  88. with suppress(OSError):
  89. data = loader.get_data(full_path)
  90. if data is None:
  91. package_name = package.__spec__.name
  92. message = '{!r} resource not found in {!r}'.format(
  93. resource, package_name)
  94. raise FileNotFoundError(message)
  95. else:
  96. return BytesIO(data)
  97. def open_text(package: Package,
  98. resource: Resource,
  99. encoding: str = 'utf-8',
  100. errors: str = 'strict') -> TextIO:
  101. """Return a file-like object opened for text reading of the resource."""
  102. resource = _normalize_path(resource)
  103. package = _get_package(package)
  104. reader = _get_resource_reader(package)
  105. if reader is not None:
  106. return TextIOWrapper(reader.open_resource(resource), encoding, errors)
  107. _check_location(package)
  108. absolute_package_path = os.path.abspath(package.__spec__.origin)
  109. package_path = os.path.dirname(absolute_package_path)
  110. full_path = os.path.join(package_path, resource)
  111. try:
  112. return open(full_path, mode='r', encoding=encoding, errors=errors)
  113. except OSError:
  114. # Just assume the loader is a resource loader; all the relevant
  115. # importlib.machinery loaders are and an AttributeError for
  116. # get_data() will make it clear what is needed from the loader.
  117. loader = cast(ResourceLoader, package.__spec__.loader)
  118. data = None
  119. if hasattr(package.__spec__.loader, 'get_data'):
  120. with suppress(OSError):
  121. data = loader.get_data(full_path)
  122. if data is None:
  123. package_name = package.__spec__.name
  124. message = '{!r} resource not found in {!r}'.format(
  125. resource, package_name)
  126. raise FileNotFoundError(message)
  127. else:
  128. return TextIOWrapper(BytesIO(data), encoding, errors)
  129. def read_binary(package: Package, resource: Resource) -> bytes:
  130. """Return the binary contents of the resource."""
  131. resource = _normalize_path(resource)
  132. package = _get_package(package)
  133. with open_binary(package, resource) as fp:
  134. return fp.read()
  135. def read_text(package: Package,
  136. resource: Resource,
  137. encoding: str = 'utf-8',
  138. errors: str = 'strict') -> str:
  139. """Return the decoded string of the resource.
  140. The decoding-related arguments have the same semantics as those of
  141. bytes.decode().
  142. """
  143. resource = _normalize_path(resource)
  144. package = _get_package(package)
  145. with open_text(package, resource, encoding, errors) as fp:
  146. return fp.read()
  147. @contextmanager
  148. def path(package: Package, resource: Resource) -> Iterator[Path]:
  149. """A context manager providing a file path object to the resource.
  150. If the resource does not already exist on its own on the file system,
  151. a temporary file will be created. If the file was created, the file
  152. will be deleted upon exiting the context manager (no exception is
  153. raised if the file was deleted prior to the context manager
  154. exiting).
  155. """
  156. resource = _normalize_path(resource)
  157. package = _get_package(package)
  158. reader = _get_resource_reader(package)
  159. if reader is not None:
  160. try:
  161. yield Path(reader.resource_path(resource))
  162. return
  163. except FileNotFoundError:
  164. pass
  165. else:
  166. _check_location(package)
  167. # Fall-through for both the lack of resource_path() *and* if
  168. # resource_path() raises FileNotFoundError.
  169. package_directory = Path(package.__spec__.origin).parent
  170. file_path = package_directory / resource
  171. if file_path.exists():
  172. yield file_path
  173. else:
  174. with open_binary(package, resource) as fp:
  175. data = fp.read()
  176. # Not using tempfile.NamedTemporaryFile as it leads to deeper 'try'
  177. # blocks due to the need to close the temporary file to work on
  178. # Windows properly.
  179. fd, raw_path = tempfile.mkstemp()
  180. try:
  181. os.write(fd, data)
  182. os.close(fd)
  183. yield Path(raw_path)
  184. finally:
  185. try:
  186. os.remove(raw_path)
  187. except FileNotFoundError:
  188. pass
  189. def is_resource(package: Package, name: str) -> bool:
  190. """True if 'name' is a resource inside 'package'.
  191. Directories are *not* resources.
  192. """
  193. package = _get_package(package)
  194. _normalize_path(name)
  195. reader = _get_resource_reader(package)
  196. if reader is not None:
  197. return reader.is_resource(name)
  198. try:
  199. package_contents = set(contents(package))
  200. except (NotADirectoryError, FileNotFoundError):
  201. return False
  202. if name not in package_contents:
  203. return False
  204. # Just because the given file_name lives as an entry in the package's
  205. # contents doesn't necessarily mean it's a resource. Directories are not
  206. # resources, so let's try to find out if it's a directory or not.
  207. path = Path(package.__spec__.origin).parent / name
  208. return path.is_file()
  209. def contents(package: Package) -> Iterable[str]:
  210. """Return an iterable of entries in 'package'.
  211. Note that not all entries are resources. Specifically, directories are
  212. not considered resources. Use `is_resource()` on each entry returned here
  213. to check if it is a resource or not.
  214. """
  215. package = _get_package(package)
  216. reader = _get_resource_reader(package)
  217. if reader is not None:
  218. return reader.contents()
  219. # Is the package a namespace package? By definition, namespace packages
  220. # cannot have resources. We could use _check_location() and catch the
  221. # exception, but that's extra work, so just inline the check.
  222. elif package.__spec__.origin is None or not package.__spec__.has_location:
  223. return ()
  224. else:
  225. package_directory = Path(package.__spec__.origin).parent
  226. return os.listdir(package_directory)