# Copyright 2020 QuantumBlack Visual Analytics Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, AND
# NONINFRINGEMENT. IN NO EVENT WILL THE LICENSOR OR OTHER CONTRIBUTORS
# BE LIABLE FOR ANY CLAIM, DAMAGES, OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF, OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#
# The QuantumBlack Visual Analytics Limited ("QuantumBlack") name and logo
# (either separately or in combination, "QuantumBlack Trademarks") are
# trademarks of QuantumBlack. The License does not grant you any right or
# license to the QuantumBlack Trademarks. You may not use the QuantumBlack
# Trademarks or any confusingly similar mark as a trademark for your product,
# or use the QuantumBlack Trademarks in any other manner that might cause
# confusion in the marketplace, including but not limited to in advertising,
# on websites, or on software.
#
# See the License for the specific language governing permissions and
# limitations under the License.
"""A module containing specifications for all callable hooks in the Kedro's execution timeline.
For more information about these specifications, please visit
[Pluggy's documentation](https://pluggy.readthedocs.io/en/stable/#specs)
"""
# pylint: disable=too-many-arguments
from typing import Any, Dict

from kedro.io import DataCatalog
from kedro.pipeline import Pipeline
from kedro.pipeline.node import Node

from .markers import hook_spec
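
# The ``_Example*`` classes further down are illustrative sketches added for
# documentation purposes only; they are never registered with a plugin
# manager. They assume that ``hook_impl`` (pluggy's implementation marker)
# is exported by ``.markers`` alongside ``hook_spec``, and use ``time`` for
# a timing example.
import time

from .markers import hook_impl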


class DataCatalogSpecs:
    """Namespace that defines all specifications for a data catalog's lifecycle hooks."""

    @hook_spec
    def after_catalog_created(
        self,
        catalog: DataCatalog,
        conf_catalog: Dict[str, Any],
        conf_creds: Dict[str, Any],
        feed_dict: Dict[str, Any],
        save_version: str,
        load_versions: Dict[str, str],
        run_id: str,
    ) -> None:
        """Hook to be invoked after a data catalog is created.

        It receives the ``catalog`` as well as
        all the arguments for ``KedroContext._create_catalog``.

        Args:
            catalog: The catalog that was created.
            conf_catalog: The config from which the catalog was created.
            conf_creds: The credentials conf from which the catalog was created.
            feed_dict: The ``feed_dict`` that was added to the catalog after creation.
            save_version: The ``save_version`` used in ``save`` operations
                for all datasets in the catalog.
            load_versions: The ``load_versions`` used in ``load`` operations
                for each dataset in the catalog.
            run_id: The id of the run for which the catalog is loaded.
        """
        pass
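

# --- Illustrative example (not part of the hook specs) ----------------------
# A minimal sketch of a plugin class implementing the spec above. The class
# name and the ``print`` behaviour are assumptions for demonstration; an
# implementation may declare any subset of a spec's parameters, and pluggy
# injects only the arguments it asks for.
class _ExampleCatalogHooks:
    """Illustrative implementation of ``DataCatalogSpecs``; never registered."""

    @hook_impl
    def after_catalog_created(self, catalog: DataCatalog, run_id: str) -> None:
        # List the datasets available once the catalog is ready.
        print(f"Run {run_id}: catalog entries {catalog.list()}")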


class NodeSpecs:
    """Namespace that defines all specifications for a node's lifecycle hooks."""

    @hook_spec
    def before_node_run(
        self,
        node: Node,
        catalog: DataCatalog,
        inputs: Dict[str, Any],
        is_async: bool,
        run_id: str,
    ) -> None:
        """Hook to be invoked before a node runs.

        The arguments received are the same as those used by ``kedro.runner.run_node``.

        Args:
            node: The ``Node`` to run.
            catalog: A ``DataCatalog`` containing the node's inputs and outputs.
            inputs: The dictionary of input datasets.
                The keys are dataset names and the values are the actual loaded input data,
                not the dataset instance.
            is_async: Whether the node will be run in ``async`` mode.
            run_id: The id of the run.
        """
        pass

    @hook_spec
    def after_node_run(  # pylint: disable=too-many-arguments
        self,
        node: Node,
        catalog: DataCatalog,
        inputs: Dict[str, Any],
        outputs: Dict[str, Any],
        is_async: bool,
        run_id: str,
    ) -> None:
        """Hook to be invoked after a node runs.

        The arguments received are the same as those used by ``kedro.runner.run_node``,
        as well as the ``outputs`` of the node run.

        Args:
            node: The ``Node`` that ran.
            catalog: A ``DataCatalog`` containing the node's inputs and outputs.
            inputs: The dictionary of input datasets.
                The keys are dataset names and the values are the actual loaded input data,
                not the dataset instance.
            outputs: The dictionary of output datasets.
                The keys are dataset names and the values are the actual computed output data,
                not the dataset instance.
            is_async: Whether the node was run in ``async`` mode.
            run_id: The id of the run.
        """
        pass

    @hook_spec
    def on_node_error(
        self,
        error: Exception,
        node: Node,
        catalog: DataCatalog,
        inputs: Dict[str, Any],
        is_async: bool,
        run_id: str,
    ):
        """Hook to be invoked if a node run throws an uncaught error.

        The signature of this error hook should match the signature of
        ``before_node_run``, along with the error that was raised.

        Args:
            error: The uncaught exception thrown during the node run.
            node: The ``Node`` that failed to run.
            catalog: A ``DataCatalog`` containing the node's inputs and outputs.
            inputs: The dictionary of input datasets.
                The keys are dataset names and the values are the actual loaded input data,
                not the dataset instance.
            is_async: Whether the node was run in ``async`` mode.
            run_id: The id of the run.
        """
        pass
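

# --- Illustrative example (not part of the hook specs) ----------------------
# A minimal sketch of a plugin implementing the node-level specs above, used
# here to time each node. The class name and the timing/printing behaviour
# are assumptions for demonstration only.
class _ExampleNodeHooks:
    """Illustrative implementation of ``NodeSpecs``; never registered."""

    def __init__(self):
        self._start_times: Dict[str, float] = {}

    @hook_impl
    def before_node_run(self, node: Node, run_id: str) -> None:
        # Record when the node started.
        self._start_times[node.name] = time.time()

    @hook_impl
    def after_node_run(self, node: Node, outputs: Dict[str, Any], run_id: str) -> None:
        # Report the node's outputs and elapsed wall-clock time.
        elapsed = time.time() - self._start_times.pop(node.name, time.time())
        print(f"Run {run_id}: {node.name} produced {list(outputs)} in {elapsed:.2f}s")

    @hook_impl
    def on_node_error(self, error: Exception, node: Node) -> None:
        # Kedro re-raises the error after this hook runs, so just report it.
        print(f"Node {node.name} failed: {error!r}")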


class PipelineSpecs:
    """Namespace that defines all specifications for a pipeline's lifecycle hooks."""

    @hook_spec
    def before_pipeline_run(
        self, run_params: Dict[str, Any], pipeline: Pipeline, catalog: DataCatalog
    ) -> None:
        """Hook to be invoked before a pipeline runs.

        Args:
            run_params: The params used to run the pipeline.
                Should be identical to the data logged by Journal with the following schema::

                   {
                       "run_id": str,
                       "project_path": str,
                       "env": str,
                       "kedro_version": str,
                       "tags": Optional[List[str]],
                       "from_nodes": Optional[List[str]],
                       "to_nodes": Optional[List[str]],
                       "node_names": Optional[List[str]],
                       "from_inputs": Optional[List[str]],
                       "load_versions": Optional[List[str]],
                       "pipeline_name": str,
                       "extra_params": Optional[Dict[str, Any]]
                   }

            pipeline: The ``Pipeline`` that will be run.
            catalog: The ``DataCatalog`` to be used during the run.
        """
        pass

    @hook_spec
    def after_pipeline_run(
        self, run_params: Dict[str, Any], pipeline: Pipeline, catalog: DataCatalog
    ) -> None:
        """Hook to be invoked after a pipeline runs.

        Args:
            run_params: The params used to run the pipeline.
                Should be identical to the data logged by Journal with the following schema::

                   {
                       "run_id": str,
                       "project_path": str,
                       "env": str,
                       "kedro_version": str,
                       "tags": Optional[List[str]],
                       "from_nodes": Optional[List[str]],
                       "to_nodes": Optional[List[str]],
                       "node_names": Optional[List[str]],
                       "from_inputs": Optional[List[str]],
                       "load_versions": Optional[List[str]],
                       "pipeline_name": str,
                       "extra_params": Optional[Dict[str, Any]]
                   }

            pipeline: The ``Pipeline`` that was run.
            catalog: The ``DataCatalog`` used during the run.
        """
        pass

    @hook_spec
    def on_pipeline_error(
        self,
        error: Exception,
        run_params: Dict[str, Any],
        pipeline: Pipeline,
        catalog: DataCatalog,
    ):
        """Hook to be invoked if a pipeline run throws an uncaught Exception.

        The signature of this error hook should match the signature of
        ``before_pipeline_run``, along with the error that was raised.

        Args:
            error: The uncaught exception thrown during the pipeline run.
            run_params: The params used to run the pipeline.
                Should be identical to the data logged by Journal with the following schema::

                   {
                       "run_id": str,
                       "project_path": str,
                       "env": str,
                       "kedro_version": str,
                       "tags": Optional[List[str]],
                       "from_nodes": Optional[List[str]],
                       "to_nodes": Optional[List[str]],
                       "node_names": Optional[List[str]],
                       "from_inputs": Optional[List[str]],
                       "load_versions": Optional[List[str]],
                       "pipeline_name": str,
                       "extra_params": Optional[Dict[str, Any]]
                   }

            pipeline: The ``Pipeline`` that was run.
            catalog: The ``DataCatalog`` used during the run.
        """
        pass
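

# --- Illustrative example (not part of the hook specs) ----------------------
# A minimal sketch of a plugin implementing the pipeline-level specs above,
# reading the ``run_params`` schema documented in their docstrings. The class
# name and messages are assumptions for demonstration only.
class _ExamplePipelineHooks:
    """Illustrative implementation of ``PipelineSpecs``; never registered."""

    @hook_impl
    def before_pipeline_run(self, run_params: Dict[str, Any], pipeline: Pipeline) -> None:
        print(f"Starting run {run_params['run_id']} with {len(pipeline.nodes)} nodes")

    @hook_impl
    def after_pipeline_run(self, run_params: Dict[str, Any]) -> None:
        print(f"Finished run {run_params['run_id']}")

    @hook_impl
    def on_pipeline_error(self, error: Exception, run_params: Dict[str, Any]) -> None:
        print(f"Run {run_params['run_id']} failed: {error!r}")


# To take effect, instances of plugin classes like the sketches above must be
# registered with Kedro, e.g. (depending on the Kedro version) through the
# project context's ``hooks`` attribute or a ``kedro.hooks`` entry point.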