Source code for kedro.framework.hooks.manager

"""This module provides a utility function to retrieve the global hook_manager singleton
in a Kedro execution process.
"""
# pylint: disable=global-statement,invalid-name
import logging
from typing import Any, Iterable

from pluggy import PluginManager

from .markers import HOOK_NAMESPACE
from .specs import (
    DataCatalogSpecs,
    DatasetSpecs,
    NodeSpecs,
    PipelineSpecs,
    RegistrationSpecs,
)

# Process-wide hook manager singleton. It stays ``None`` until
# ``get_hook_manager()`` creates it on first use.
_hook_manager = None

_PLUGIN_HOOKS = "kedro.hooks"  # entry-point to load hooks from for installed plugins

# Module-level logger; used to report which installed plugins had their hooks
# registered or disabled.
logger = logging.getLogger(__name__)


def _create_hook_manager() -> PluginManager:
    """Build a fresh ``PluginManager`` with all of Kedro's hook specs attached.

    Returns:
        A new plugin manager created for ``HOOK_NAMESPACE`` on which the node,
        pipeline, data catalog, registration and dataset hook specifications
        have been added, in that order.
    """
    hook_specs = (
        NodeSpecs,
        PipelineSpecs,
        DataCatalogSpecs,
        RegistrationSpecs,
        DatasetSpecs,
    )
    plugin_manager = PluginManager(HOOK_NAMESPACE)
    for spec in hook_specs:
        plugin_manager.add_hookspecs(spec)
    return plugin_manager


def get_hook_manager():
    """Return the global hook manager singleton, building it on the first call.

    Returns:
        The module-level ``_hook_manager`` instance. If it has not been created
        yet, it is built with ``_create_hook_manager()`` and stored before
        being returned.
    """
    global _hook_manager
    if _hook_manager is not None:
        return _hook_manager
    _hook_manager = _create_hook_manager()
    return _hook_manager
def _register_hooks(hook_manager: PluginManager, hooks: Iterable[Any]) -> None:
    """Register each entry of ``hooks`` with ``hook_manager`` unless already known.

    Args:
        hook_manager: Hook manager instance to register the hooks with.
        hooks: Hooks that need to be registered.
    """
    for candidate in hooks:
        # Registration can be requested more than once for the same object.
        # Skipping anything the manager already knows avoids an error being
        # raised that would break the user's workflow.
        if hook_manager.is_registered(candidate):
            continue
        hook_manager.register(candidate)


def _register_hooks_setuptools(
    hook_manager: PluginManager, disabled_plugins: Iterable[str]
) -> None:
    """Load pluggy hooks exposed by installed plugins via setuptools entrypoints.

    Args:
        hook_manager: Hook manager instance to register the hooks with.
        disabled_plugins: An iterable returning the names of plugins whose
            hooks must not be registered; any already registered hooks of
            those plugins will be unregistered.
    """
    # Snapshot taken before loading, so plugins that were registered earlier
    # are not reported as newly registered below.
    previously_known = hook_manager.get_plugins()
    hook_manager.load_setuptools_entrypoints(_PLUGIN_HOOKS)
    disabled_plugins = set(disabled_plugins)

    newly_enabled = set()
    switched_off = set()
    # Walk the plugin/distinfo tuples of all setuptools registered plugins.
    for plugin, dist in hook_manager.list_plugin_distinfo():
        if dist.project_name in disabled_plugins:
            # `unregister()` rather than `set_blocked()`: hooks are disabled
            # by the plugin's project name, not by its `entry_point` name.
            hook_manager.unregister(plugin=plugin)
            switched_off.add(f"{dist.project_name}-{dist.version}")
            continue
        if plugin in previously_known:
            continue
        # Project names are logged together with their version.
        newly_enabled.add(f"{dist.project_name}-{dist.version}")

    if switched_off:
        disabled_listing = ", ".join(sorted(switched_off))
        logger.info("Hooks are disabled for plugin(s): %s", disabled_listing)

    if newly_enabled:
        enabled_listing = ", ".join(sorted(newly_enabled))
        logger.info(
            "Registered hooks from %d installed plugin(s): %s",
            len(newly_enabled),
            enabled_listing,
        )