# Copyright 2021 The Pigweed Authors # # Licensed under the Apache License, Version 2.0 (the "License"); you may not # use this file except in compliance with the License. You may obtain a copy of # the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations under # the License. """Provides general purpose plugin functionality. As used in this module, a plugin is a Python object associated with a name. Plugins are registered in a Registry. The plugin object is typically a function, but can be anything. Plugins may be loaded in a variety of ways: - Listed in a plugins file in the file system (e.g. as "name module target"). - Registered in a Python file using a decorator (@my_registry.plugin). - Registered directly or by name with function calls on a registry object. This functionality can be used to create plugins for command line tools, interactive consoles, or anything else. Pigweed's pw command uses this module for its plugins. """ import collections import collections.abc import importlib import inspect import logging from pathlib import Path import pkgutil import sys from textwrap import TextWrapper import types from typing import Any, Callable, Dict, List, Iterable, Iterator, Optional, Set _LOG = logging.getLogger(__name__) _BUILT_IN = '' class Error(Exception): """Indicates that a plugin is invalid or cannot be registered.""" def __str__(self): """Displays the error as a string, including the __cause__ if present. Adding __cause__ gives useful context without displaying a backtrace. """ if self.__cause__ is None: return super().__str__() return (f'{super().__str__()} ' f'({type(self.__cause__).__name__}: {self.__cause__})') def _get_module(member: object) -> types.ModuleType: """Gets the module or a dummy module if the module isn't found.""" module = inspect.getmodule(member) return module if module else types.ModuleType('') class Plugin: """Represents a Python entity registered as a plugin. Each plugin resolves to a Python object, typically a function. """ @classmethod def from_name(cls, name: str, module_name: str, member_name: str, source: Optional[Path]) -> 'Plugin': """Creates a plugin by module and attribute name. Args: name: the name of the plugin module_name: Python module name (e.g. 'foo_pkg.bar') member_name: the name of the member in the module source: path to the plugins file that declared this plugin, if any """ # Attempt to access the module and member. Catch any errors that might # occur, since a bad plugin shouldn't be a fatal error. try: module = importlib.import_module(module_name) except Exception as err: raise Error(f'Failed to import module "{module_name}"') from err try: member = getattr(module, member_name) except AttributeError as err: raise Error( f'"{module_name}.{member_name}" does not exist') from err return cls(name, member, source) def __init__(self, name: str, target: Any, source: Path = None) -> None: """Creates a plugin for the provided target.""" self.name = name self._module = _get_module(target) self.target = target self.source = source @property def target_name(self) -> str: return (f'{self._module.__name__}.' f'{getattr(self.target, "__name__", self.target)}') @property def source_name(self) -> str: return _BUILT_IN if self.source is None else str(self.source) def run_with_argv(self, argv: Iterable[str]) -> int: """Sets sys.argv and calls the plugin function. This is used to call a plugin as if from the command line. """ original_sys_argv = sys.argv sys.argv = [f'pw {self.name}', *argv] try: return self.target() finally: sys.argv = original_sys_argv def help(self, full: bool = False) -> str: """Returns a description of this plugin from its docstring.""" docstring = self.target.__doc__ or self._module.__doc__ or '' return docstring if full else next(iter(docstring.splitlines()), '') def details(self, full: bool = False) -> Iterator[str]: yield f'help {self.help(full=full)}' yield f'module {self._module.__name__}' yield f'target {getattr(self.target, "__name__", self.target)}' yield f'source {self.source_name}' def __repr__(self) -> str: return (f'{self.__class__.__name__}(name={self.name!r}, ' f'target={self.target_name}' f'{f", source={self.source_name!r}" if self.source else ""})') def callable_with_no_args(plugin: Plugin) -> None: """Checks that a plugin is callable without arguments. May be used for the validator argument to Registry. """ try: params = inspect.signature(plugin.target).parameters except TypeError: raise Error('Plugin functions must be callable, but ' f'{plugin.target_name} is a ' f'{type(plugin.target).__name__}') positional = sum(p.default == p.empty for p in params.values()) if positional: raise Error(f'Plugin functions cannot have any required positional ' f'arguments, but {plugin.target_name} has {positional}') class Registry(collections.abc.Mapping): """Manages a set of plugins from Python modules or plugins files.""" def __init__(self, validator: Callable[[Plugin], Any] = lambda _: None) -> None: """Creates a new, empty plugins registry. Args: validator: Function that checks whether a plugin is valid and should be registered. Must raise plugins.Error is the plugin is invalid. """ self._registry: Dict[str, Plugin] = {} self._sources: Set[Path] = set() # Paths to plugins files self._errors: Dict[str, List[Exception]] = collections.defaultdict(list) self._validate_plugin = validator def __getitem__(self, name: str) -> Plugin: """Accesses a plugin by name; raises KeyError if it does not exist.""" if name in self._registry: return self._registry[name] if name in self._errors: raise KeyError(f'Registration for "{name}" failed: ' + ', '.join(str(e) for e in self._errors[name])) raise KeyError(f'The plugin "{name}" has not been registered') def __iter__(self) -> Iterator[str]: return iter(self._registry) def __len__(self) -> int: return len(self._registry) def errors(self) -> Dict[str, List[Exception]]: return self._errors def run_with_argv(self, name: str, argv: Iterable[str]) -> int: """Runs a plugin by name, setting sys.argv to the provided args. This is used to run a command as if it were executed directly from the command line. The plugin is expected to return an int. Raises: KeyError if plugin is not registered. """ return self[name].run_with_argv(argv) def _should_register(self, plugin: Plugin) -> bool: """Determines and logs if a plugin should be registered or not. Some errors are exceptions, others are not. """ if plugin.name in self._registry and plugin.source is None: raise Error( f'Attempted to register built-in plugin "{plugin.name}", but ' 'a plugin with that name was previously registered ' f'({self[plugin.name]})!') # Run the user-provided validation function, which raises exceptions # if there are errors. self._validate_plugin(plugin) existing = self._registry.get(plugin.name) if existing is None: return True if existing.source is None: _LOG.debug('%s: Overriding built-in plugin "%s" with %s', plugin.source_name, plugin.name, plugin.target_name) return True if plugin.source != existing.source: _LOG.debug( '%s: The plugin "%s" was previously registered in %s; ' 'ignoring registration as %s', plugin.source_name, plugin.name, self._registry[plugin.name].source, plugin.target_name) elif plugin.source not in self._sources: _LOG.warning( '%s: "%s" is registered file multiple times in this file! ' 'Only the first registration takes effect', plugin.source_name, plugin.name) return False def register(self, name: str, target: Any) -> Optional[Plugin]: """Registers an object as a plugin.""" return self._register(Plugin(name, target, None)) def register_by_name(self, name: str, module_name: str, member_name: str, source: Path = None) -> Optional[Plugin]: """Registers an object from its module and name as a plugin.""" return self._register( Plugin.from_name(name, module_name, member_name, source)) def _register(self, plugin: Plugin) -> Optional[Plugin]: # Prohibit functions not from a plugins file from overriding others. if not self._should_register(plugin): return None self._registry[plugin.name] = plugin _LOG.debug('%s: Registered plugin "%s" for %s', plugin.source_name, plugin.name, plugin.target_name) return plugin def register_file(self, path: Path) -> None: """Registers plugins from a plugins file. Any exceptions raised from parsing the file are caught and logged. """ with path.open() as contents: for lineno, line in enumerate(contents, 1): line = line.strip() if not line or line.startswith('#'): continue try: name, module, function = line.split() except ValueError as err: self._errors[line.strip()].append(Error(err)) _LOG.error( '%s:%d: Failed to parse plugin entry "%s": ' 'Expected 3 items (name, module, function), ' 'got %d', path, lineno, line, len(line.split())) continue try: self.register_by_name(name, module, function, path) except Error as err: self._errors[name].append(err) _LOG.error('%s: Failed to register plugin "%s": %s', path, name, err) self._sources.add(path) def register_directory(self, directory: Path, file_name: str, restrict_to: Path = None) -> None: """Finds and registers plugins from plugins files in a directory. Args: directory: The directory from which to start searching up. file_name: The name of plugins files to look for. restrict_to: If provided, do not search higher than this directory. """ for path in find_all_in_parents(file_name, directory): if not path.is_file(): continue if restrict_to is not None and restrict_to not in path.parents: _LOG.debug( "Skipping plugins file %s because it's outside of %s", path, restrict_to) continue _LOG.debug('Found plugins file %s', path) self.register_file(path) def short_help(self) -> str: """Returns a help string for the registered plugins.""" width = max(len(name) for name in self._registry) + 1 if self._registry else 1 help_items = '\n'.join( f' {name:{width}} {plugin.help()}' for name, plugin in sorted(self._registry.items())) return f'supported plugins:\n{help_items}' def detailed_help(self, plugins: Iterable[str] = ()) -> Iterator[str]: """Yields lines of detailed information about commands.""" if not plugins: plugins = list(self._registry) yield '\ndetailed plugin information:' wrapper = TextWrapper(width=80, initial_indent=' ', subsequent_indent=' ' * 11) plugins = sorted(plugins) for plugin in plugins: yield f' [{plugin}]' try: for line in self[plugin].details(full=len(plugins) == 1): yield wrapper.fill(line) except KeyError as err: yield wrapper.fill(f'error {str(err)[1:-1]}') yield '' yield 'Plugins files:' if self._sources: yield from (f' [{i}] {file}' for i, file in enumerate(self._sources, 1)) else: yield ' (none found)' def plugin(self, function: Callable = None, *, name: str = None) -> Callable[[Callable], Callable]: """Decorator that registers a function with this plugin registry.""" def decorator(function: Callable) -> Callable: self.register(function.__name__ if name is None else name, function) return function if function is None: return decorator self.register(function.__name__, function) return function def find_in_parents(name: str, path: Path) -> Optional[Path]: """Searches parent directories of the path for a file or directory.""" path = path.resolve() while not path.joinpath(name).exists(): path = path.parent if path.samefile(path.parent): return None return path.joinpath(name) def find_all_in_parents(name: str, path: Path) -> Iterator[Path]: """Searches all parent directories of the path for files or directories.""" while True: result = find_in_parents(name, path) if result is None: return yield result path = result.parent.parent def import_submodules(module: types.ModuleType, recursive: bool = False) -> None: """Imports the submodules of a package. This can be used to collect plugins registered with a decorator from a directory. """ path = module.__path__ # type: ignore[attr-defined] if recursive: modules = pkgutil.walk_packages(path, module.__name__ + '.') else: modules = pkgutil.iter_modules(path, module.__name__ + '.') for info in modules: importlib.import_module(info.name)