# pylint: disable=too-many-lines
"""A collection of CLI commands for working with Kedro pipelines."""
import json
import re
import shutil
import sys
import tempfile
from importlib import import_module
from pathlib import Path
from textwrap import indent
from typing import Any, Iterable, List, NamedTuple, Optional, Set, Tuple, Union
from zipfile import ZipFile
import click
import pkg_resources
import yaml
from rope.base.project import Project
from rope.contrib import generate
from rope.refactor.move import MoveModule
from rope.refactor.rename import Rename
from setuptools.dist import Distribution
import kedro
from kedro.framework.cli.utils import (
KedroCliError,
_clean_pycache,
_filter_deprecation_warnings,
_get_requirements_in,
call,
command_with_verbosity,
env_option,
python_call,
)
from kedro.framework.project import pipelines, settings
from kedro.framework.startup import ProjectMetadata
_SETUP_PY_TEMPLATE = """# -*- coding: utf-8 -*-
from setuptools import setup, find_packages
setup(
name="{name}",
version="{version}",
description="Modular pipeline `{name}`",
packages=find_packages(),
include_package_data=True,
package_data={package_data},
install_requires={install_requires},
)
"""
class PipelineArtifacts(NamedTuple):
"""An ordered collection of source_path, tests_path, config_paths"""
pipeline_dir: Path
pipeline_tests: Path
pipeline_conf: Path
def _assert_pkg_name_ok(pkg_name: str):
"""Check that python package name is in line with PEP8 requirements.
Args:
pkg_name: Candidate Python package name.
Raises:
KedroCliError: If package name violates the requirements.
"""
base_message = f"`{pkg_name}` is not a valid Python package name."
if not re.match(r"^[a-zA-Z_]", pkg_name):
message = base_message + " It must start with a letter or underscore."
raise KedroCliError(message)
if len(pkg_name) < 2:
message = base_message + " It must be at least 2 characters long."
raise KedroCliError(message)
if not re.match(r"^\w+$", pkg_name[1:]):
message = (
base_message + " It must contain only letters, digits, and/or underscores."
)
raise KedroCliError(message)
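# For example (illustrative, not executed at import time): `data_science` and
# `_pipeline2` pass the checks above, while `1pipeline` (starts with a digit),
# `x` (too short) and `my-pipeline` (contains a hyphen) each raise KedroCliError.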
def _check_pipeline_name(ctx, param, value): # pylint: disable=unused-argument
if value:
_assert_pkg_name_ok(value)
return value
def _check_module_path(ctx, param, value): # pylint: disable=unused-argument
if value and not re.match(r"^[\w.]+$", value):
message = "The pipeline location you provided is not a valid Python module path"
raise KedroCliError(message)
return value
# pylint: disable=missing-function-docstring
@click.group(name="Kedro")
def pipeline_cli(): # pragma: no cover
pass
@pipeline_cli.group()
def pipeline():
"""Commands for working with pipelines."""
@command_with_verbosity(pipeline, "create")
@click.argument("name", nargs=1, callback=_check_pipeline_name)
@click.option(
"--skip-config",
is_flag=True,
help="Skip creation of config files for the new pipeline(s).",
)
@env_option(help="Environment to create pipeline configuration in. Defaults to `base`.")
@click.pass_obj # this will pass the metadata as first argument
def create_pipeline(
metadata: ProjectMetadata, name, skip_config, env, **kwargs
): # pylint: disable=unused-argument
"""Create a new modular pipeline by providing a name."""
package_dir = metadata.source_dir / metadata.package_name
conf_root = settings.CONF_ROOT
project_conf_path = metadata.project_path / conf_root
env = env or "base"
if not skip_config and not (project_conf_path / env).exists():
raise KedroCliError(
f"Unable to locate environment `{env}`. "
f"Make sure it exists in the project configuration."
)
result_path = _create_pipeline(name, package_dir / "pipelines")
_copy_pipeline_tests(name, result_path, package_dir)
_copy_pipeline_configs(result_path, project_conf_path, skip_config, env=env)
click.secho(f"\nPipeline `{name}` was successfully created.\n", fg="green")
click.secho(
f"To be able to run the pipeline `{name}`, you will need to add it "
f"to `register_pipelines()` in `{package_dir / 'pipeline_registry.py'}`.",
fg="yellow",
)
@command_with_verbosity(pipeline, "delete")
@click.argument("name", nargs=1, callback=_check_pipeline_name)
@env_option(
help="Environment to delete pipeline configuration from. Defaults to `base`."
)
@click.option(
"-y", "--yes", is_flag=True, help="Confirm deletion of pipeline non-interactively."
)
@click.pass_obj # this will pass the metadata as first argument
def delete_pipeline(
metadata: ProjectMetadata, name, env, yes, **kwargs
): # pylint: disable=unused-argument
"""Delete a modular pipeline by providing a name."""
package_dir = metadata.source_dir / metadata.package_name
conf_root = settings.CONF_ROOT
project_conf_path = metadata.project_path / conf_root
env = env or "base"
if not (project_conf_path / env).exists():
raise KedroCliError(
f"Unable to locate environment `{env}`. "
f"Make sure it exists in the project configuration."
)
pipeline_artifacts = _get_pipeline_artifacts(metadata, pipeline_name=name, env=env)
files_to_delete = [
pipeline_artifacts.pipeline_conf / confdir / f"{name}.yml"
for confdir in ("parameters", "catalog")
if (pipeline_artifacts.pipeline_conf / confdir / f"{name}.yml").is_file()
]
dirs_to_delete = [
path
for path in (pipeline_artifacts.pipeline_dir, pipeline_artifacts.pipeline_tests)
if path.is_dir()
]
if not files_to_delete and not dirs_to_delete:
raise KedroCliError(f"Pipeline `{name}` not found.")
if not yes:
_echo_deletion_warning(
"The following paths will be removed:",
directories=dirs_to_delete,
files=files_to_delete,
)
click.echo()
yes = click.confirm(f"Are you sure you want to delete pipeline `{name}`?")
click.echo()
if not yes:
raise KedroCliError("Deletion aborted!")
_delete_artifacts(*files_to_delete, *dirs_to_delete)
click.secho(f"\nPipeline `{name}` was successfully deleted.", fg="green")
click.secho(
f"\nIf you added the pipeline `{name}` to `register_pipelines()` in "
f"`{package_dir / 'pipeline_registry.py'}`, you will need to remove it.",
fg="yellow",
)
@pipeline.command("list")
def list_pipelines():
"""List all pipelines defined in your pipeline_registry.py file. (DEPRECATED)"""
deprecation_message = (
"DeprecationWarning: Command `kedro pipeline list` is deprecated. "
"Please use `kedro registry list` instead."
)
click.secho(deprecation_message, fg="red")
click.echo(yaml.dump(sorted(pipelines)))
@command_with_verbosity(pipeline, "describe")
@click.argument("name", nargs=1, default="__default__")
@click.pass_obj
def describe_pipeline(
metadata: ProjectMetadata, name, **kwargs
): # pylint: disable=unused-argument, protected-access
"""Describe a pipeline by providing a pipeline name.
Defaults to the __default__ pipeline. (DEPRECATED)
"""
deprecation_message = (
"DeprecationWarning: Command `kedro pipeline describe` is deprecated. "
"Please use `kedro registry describe` instead."
)
click.secho(deprecation_message, fg="red")
pipeline_obj = pipelines.get(name)
if not pipeline_obj:
all_pipeline_names = pipelines.keys()
existing_pipelines = ", ".join(sorted(all_pipeline_names))
raise KedroCliError(
f"`{name}` pipeline not found. Existing pipelines: [{existing_pipelines}]"
)
nodes = []
for node in pipeline_obj.nodes:
namespace = f"{node.namespace}." if node.namespace else ""
nodes.append(f"{namespace}{node._name or node._func_name} ({node._func_name})")
result = {"Nodes": nodes}
click.echo(yaml.dump(result))
@command_with_verbosity(pipeline, "pull")
@click.argument("package_path", nargs=1, required=False)
@click.option(
"--all",
"-a",
"all_flag",
is_flag=True,
help="Pull and unpack all pipelines in the `pyproject.toml` package manifest section.",
)
@env_option(
help="Environment to install the pipeline configuration to. Defaults to `base`."
)
@click.option(
"--alias",
type=str,
default="",
callback=_check_pipeline_name,
help="Alternative name to unpackage under.",
)
@click.option(
"--fs-args",
type=click.Path(
exists=True, file_okay=True, dir_okay=False, readable=True, resolve_path=True
),
default=None,
help="Location of a configuration file for the fsspec filesystem used to pull the package.",
)
@click.pass_obj # this will pass the metadata as first argument
def pull_package( # pylint:disable=unused-argument, too-many-arguments
metadata: ProjectMetadata, package_path, env, alias, fs_args, all_flag, **kwargs
) -> None:
"""Pull and unpack a modular pipeline in your project."""
deprecation_message = (
"DeprecationWarning: Command `kedro pipeline pull` will be deprecated in Kedro 0.18.0. "
"In future please use `kedro micropkg pull` instead."
)
click.secho(deprecation_message, fg="red")
if not package_path and not all_flag:
click.secho(
"Please specify a package path or add '--all' to pull all pipelines in the "
"`pyproject.toml` package manifest section."
)
sys.exit(1)
if all_flag:
_pull_packages_from_manifest(metadata)
return
_pull_package(package_path, metadata, env=env, alias=alias, fs_args=fs_args)
as_alias = f" as `{alias}`" if alias else ""
message = f"Pipeline {package_path} pulled and unpacked{as_alias}!"
click.secho(message, fg="green")
def _pull_package(
package_path: str,
metadata: ProjectMetadata,
env: str = None,
alias: str = None,
fs_args: str = None,
):
with tempfile.TemporaryDirectory() as temp_dir:
temp_dir_path = Path(temp_dir).resolve()
_unpack_wheel(package_path, temp_dir_path, fs_args)
dist_info_file = list(temp_dir_path.glob("*.dist-info"))
if len(dist_info_file) != 1:
            raise KedroCliError(
                f"Expected exactly one dist-info directory in {package_path}, "
                f"but found {len(dist_info_file)}."
            )
# Extract package name, based on the naming convention for wheel files
# https://www.python.org/dev/peps/pep-0427/#file-name-convention
package_name = dist_info_file[0].stem.split("-")[0]
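        # Illustrative: for a dist-info directory named
        # `data_science-0.1.dist-info`, `.stem` gives `data_science-0.1`,
        # so `.split("-")[0]` yields the package name `data_science`.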
package_metadata = dist_info_file[0] / "METADATA"
req_pattern = r"Requires-Dist: (.*?)\n"
package_reqs = re.findall(req_pattern, package_metadata.read_text())
if package_reqs:
requirements_in = _get_requirements_in(
metadata.source_dir, create_empty=True
)
_append_package_reqs(requirements_in, package_reqs, package_name)
_clean_pycache(temp_dir_path)
_install_files(metadata, package_name, temp_dir_path, env, alias)
def _pull_packages_from_manifest(metadata: ProjectMetadata) -> None:
# pylint: disable=import-outside-toplevel
import anyconfig # for performance reasons
config_dict = anyconfig.load(metadata.config_file)
config_dict = config_dict["tool"]["kedro"]
build_specs = config_dict.get("pipeline", {}).get("pull")
if not build_specs:
click.secho(
"Nothing to pull. Please update the `pyproject.toml` package manifest section.",
fg="yellow",
)
return
for package_path, specs in build_specs.items():
if "alias" in specs:
_assert_pkg_name_ok(specs["alias"])
_pull_package(package_path, metadata, **specs)
click.secho(f"Pulled and unpacked `{package_path}`!")
click.secho("Pipelines pulled and unpacked!", fg="green")
def _package_pipelines_from_manifest(metadata: ProjectMetadata) -> None:
# pylint: disable=import-outside-toplevel
import anyconfig # for performance reasons
config_dict = anyconfig.load(metadata.config_file)
config_dict = config_dict["tool"]["kedro"]
build_specs = config_dict.get("pipeline", {}).get("package")
if not build_specs:
click.secho(
"Nothing to package. Please update the `pyproject.toml` package manifest section.",
fg="yellow",
)
return
for pipeline_name, specs in build_specs.items():
if "alias" in specs:
_assert_pkg_name_ok(specs["alias"])
_package_pipeline(pipeline_name, metadata, **specs)
click.secho(f"Packaged `{pipeline_name}` pipeline!")
click.secho("Pipelines packaged!", fg="green")
@pipeline.command("package")
@env_option(
help="Environment where the pipeline configuration lives. Defaults to `base`."
)
@click.option(
"--alias",
type=str,
default="",
callback=_check_pipeline_name,
help="Alternative name to package under.",
)
@click.option(
"-d",
"--destination",
type=click.Path(resolve_path=True, file_okay=False),
help="Location where to create the wheel file. Defaults to `src/dist`.",
)
@click.option(
"-v",
"--version",
type=str,
help="Version to package under. "
"Defaults to pipeline package version or, "
"if that is not defined, the project package version.",
)
@click.option(
"--all",
"-a",
"all_flag",
is_flag=True,
help="Package all pipelines in the `pyproject.toml` package manifest section.",
)
@click.argument("name", nargs=1, required=False, callback=_check_module_path)
@click.pass_obj # this will pass the metadata as first argument
def package_pipeline(
metadata: ProjectMetadata, name, env, alias, destination, version, all_flag
): # pylint: disable=too-many-arguments
"""Package up a modular pipeline as a Python .whl."""
deprecation_message = (
"DeprecationWarning: Command `kedro pipeline package` will be deprecated in Kedro 0.18.0. "
"In future please use `kedro micropkg package` instead."
)
click.secho(deprecation_message, fg="red")
if not name and not all_flag:
click.secho(
"Please specify a pipeline name or add '--all' to package all pipelines in "
"the `pyproject.toml` package manifest section."
)
sys.exit(1)
if all_flag:
_package_pipelines_from_manifest(metadata)
return
result_path = _package_pipeline(
name, metadata, alias=alias, destination=destination, env=env, version=version
)
as_alias = f" as `{alias}`" if alias else ""
message = f"Pipeline `{name}` packaged{as_alias}! Location: {result_path}"
click.secho(message, fg="green")
def _echo_deletion_warning(message: str, **paths: List[Path]):
paths = {key: values for key, values in paths.items() if values}
if paths:
click.secho(message, bold=True)
for key, values in paths.items():
click.echo(f"\n{key.capitalize()}:")
paths_str = "\n".join(str(value) for value in values)
click.echo(indent(paths_str, " " * 2))
def _get_fsspec_filesystem(location: str, fs_args: Optional[str]):
# pylint: disable=import-outside-toplevel
import anyconfig
import fsspec
from kedro.io.core import get_protocol_and_path
protocol, _ = get_protocol_and_path(location)
fs_args_config = anyconfig.load(fs_args) if fs_args else {}
try:
return fsspec.filesystem(protocol, **fs_args_config)
except Exception as exc: # pylint: disable=broad-except
# Specified protocol is not supported by `fsspec`
# or requires extra dependencies
click.secho(str(exc), fg="red")
click.secho("Trying to use 'pip download'...", fg="red")
return None
def _unpack_wheel(location: str, destination: Path, fs_args: Optional[str]) -> None:
filesystem = _get_fsspec_filesystem(location, fs_args)
if location.endswith(".whl") and filesystem and filesystem.exists(location):
with filesystem.open(location) as fs_file:
# pylint: disable=consider-using-with
ZipFile(fs_file).extractall(destination)
else:
python_call(
"pip", ["download", "--no-deps", "--dest", str(destination), location]
)
wheel_file = list(destination.glob("*.whl"))
        # `--no-deps` should fetch exactly one wheel file, and the CLI
        # should fail if that's not the case.
        if len(wheel_file) != 1:
            file_names = [wf.name for wf in wheel_file]
            raise KedroCliError(
                f"Expected exactly one wheel file, but found: {file_names}."
            )
# pylint: disable=consider-using-with
ZipFile(wheel_file[0]).extractall(destination)
def _rename_files(conf_source: Path, old_name: str, new_name: str):
config_files_to_rename = (
each
for each in conf_source.rglob("*")
if each.is_file() and old_name in each.name
)
for config_file in config_files_to_rename:
new_config_name = config_file.name.replace(old_name, new_name)
config_file.rename(config_file.parent / new_config_name)
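# Illustrative (hypothetical names): with old_name="data_science" and
# new_name="ds", a file `parameters/data_science.yml` under `conf_source`
# is renamed in place to `parameters/ds.yml`.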
def _refactor_code_for_unpacking(
project: Project,
package_path: Path,
tests_path: Path,
alias: Optional[str],
project_metadata: ProjectMetadata,
) -> Tuple[Path, Path]:
"""This is the reverse operation of `_refactor_code_for_package`, i.e
we go from:
<temp_dir> # also the root of the Rope project
|__ <pipeline_name> # or <alias>
|__ __init__.py
|__ tests # only tests for <pipeline_name>
|__ __init__.py
|__ test_pipeline.py
to:
<temp_dir> # also the root of the Rope project
|__ <package>
|__ __init__.py
|__ pipelines
|__ __init__.py
|__ <pipeline_name>
|__ __init__.py
|__ tests
|__ __init__.py
|__ pipelines
|__ __init__.py
|__ <pipeline_name>
|__ __init__.py
"""
def _move_package_with_conflicting_name(
target: Path, original_name: str, desired_name: str = None
) -> Path:
_rename_package(project, original_name, "tmp_name")
full_path = _create_nested_package(project, target)
_move_package(project, "tmp_name", target.as_posix())
desired_name = desired_name or original_name
_rename_package(project, (target / "tmp_name").as_posix(), desired_name)
return full_path
pipeline_name = package_path.stem
if alias:
_rename_package(project, pipeline_name, alias)
pipeline_name = alias
package_target = Path(project_metadata.package_name) / "pipelines"
if pipeline_name == project_metadata.package_name:
full_path = _move_package_with_conflicting_name(package_target, pipeline_name)
else:
full_path = _create_nested_package(project, package_target)
_move_package(project, pipeline_name, package_target.as_posix())
refactored_package_path = full_path / pipeline_name
if not tests_path.exists():
return refactored_package_path, tests_path
# we can't rename the tests package to <pipeline_name>
# because it will conflict with existing top-level package;
# hence we give it a temp name, create the expected
# nested folder structure, move the contents there,
# then rename the temp name to <pipeline_name>.
tests_target = Path("tests") / "pipelines"
full_path = _move_package_with_conflicting_name(
tests_target, original_name="tests", desired_name=pipeline_name
)
refactored_tests_path = full_path / pipeline_name
return refactored_package_path, refactored_tests_path
def _install_files(
project_metadata: ProjectMetadata,
package_name: str,
source_path: Path,
env: str = None,
alias: str = None,
):
env = env or "base"
package_source, test_source, conf_source = _get_package_artifacts(
source_path, package_name
)
if conf_source.is_dir() and alias:
_rename_files(conf_source, package_name, alias)
pipeline_name = alias or package_name
package_dest, test_dest, conf_dest = _get_pipeline_artifacts(
project_metadata, pipeline_name=pipeline_name, env=env
)
if conf_source.is_dir():
_sync_dirs(conf_source, conf_dest)
# `config` dir was packaged under `package_name` directory with
# `kedro pipeline package`. Since `config` was already synced,
# we don't want to copy it again when syncing the package, so we remove it.
shutil.rmtree(str(conf_source))
project = Project(source_path)
refactored_package_source, refactored_test_source = _refactor_code_for_unpacking(
project, package_source, test_source, alias, project_metadata
)
project.close()
if refactored_test_source.is_dir():
_sync_dirs(refactored_test_source, test_dest)
# Sync everything under package directory, except `config`
# since it has already been copied.
if refactored_package_source.is_dir():
_sync_dirs(refactored_package_source, package_dest)
def _find_config_files(
source_config_dir: Path, glob_patterns: List[str]
) -> List[Tuple[Path, str]]:
config_files = [] # type: List[Tuple[Path, str]]
if source_config_dir.is_dir():
config_files = [
(path, path.parent.relative_to(source_config_dir).as_posix())
for glob_pattern in glob_patterns
for path in source_config_dir.glob(glob_pattern)
if path.is_file()
]
return config_files
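# Illustrative sketch, assuming a default `conf/base` layout: calling
# _find_config_files(Path("conf/base"), ["parameters*/**/ds.yml"]) could return
# [(Path("conf/base/parameters/ds.yml"), "parameters")], i.e. each matched file
# paired with its parent directory relative to the config root.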
def _get_default_version(metadata: ProjectMetadata, pipeline_name: str) -> str:
# default to pipeline package version
try:
pipeline_module = import_module(
f"{metadata.package_name}.pipelines.{pipeline_name}"
)
return pipeline_module.__version__ # type: ignore
except (AttributeError, ModuleNotFoundError):
# if pipeline version doesn't exist, take the project one
project_module = import_module(f"{metadata.package_name}")
return project_module.__version__ # type: ignore
def _package_pipeline( # pylint: disable=too-many-arguments
pipeline_name: str,
metadata: ProjectMetadata,
alias: str = None,
destination: str = None,
env: str = None,
version: str = None,
) -> Path:
package_dir = metadata.source_dir / metadata.package_name
env = env or "base"
artifacts_to_package = _get_pipeline_artifacts(
metadata, pipeline_name=pipeline_name, env=env
)
    # Since the wheel file will only contain parameters, we don't list other
    # config files, so as not to confuse users and to avoid needless file copies.
configs_to_package = _find_config_files(
artifacts_to_package.pipeline_conf,
[f"parameters*/**/{pipeline_name}.yml", f"parameters*/**/{pipeline_name}/**/*"],
)
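    # Illustrative: for pipeline_name="ds" and env="base", the glob patterns
    # above match e.g. `conf/base/parameters/ds.yml` and any file under
    # `conf/base/parameters/ds/` (hypothetical paths).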
source_paths = (
artifacts_to_package.pipeline_dir,
artifacts_to_package.pipeline_tests,
configs_to_package,
)
    # Check that the pipeline directory exists and is not empty
_validate_dir(artifacts_to_package.pipeline_dir)
destination = Path(destination) if destination else package_dir.parent / "dist"
version = version or _get_default_version(metadata, pipeline_name)
_generate_wheel_file(
pipeline_name=pipeline_name,
destination=destination.resolve(),
source_paths=source_paths,
version=version,
metadata=metadata,
alias=alias,
)
_clean_pycache(package_dir)
_clean_pycache(metadata.project_path)
return destination
def _validate_dir(path: Path) -> None:
if not path.is_dir():
raise KedroCliError(f"Directory '{path}' doesn't exist.")
if not list(path.iterdir()):
raise KedroCliError(f"'{path}' is an empty directory.")
def _get_wheel_name(**kwargs: Any) -> str:
# https://stackoverflow.com/q/51939257/3364156
dist = Distribution(attrs=kwargs)
bdist_wheel_cmd = dist.get_command_obj("bdist_wheel")
bdist_wheel_cmd.ensure_finalized()
distname = bdist_wheel_cmd.wheel_dist_name
tag = "-".join(bdist_wheel_cmd.get_tag())
return f"{distname}-{tag}.whl"
def _sync_path_list(source: List[Tuple[Path, str]], target: Path) -> None:
for source_path, suffix in source:
target_with_suffix = (target / suffix).resolve()
_sync_dirs(source_path, target_with_suffix)
def _make_install_requires(requirements_txt: Path) -> List[str]:
"""Parses each line of requirements.txt into a version specifier valid to put in
install_requires."""
if not requirements_txt.exists():
return []
requirements = pkg_resources.parse_requirements(requirements_txt.read_text())
return [str(requirement) for requirement in requirements]
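# Illustrative: if requirements.txt contains the lines "pandas>=1.0" and
# "scikit-learn~=1.0", this returns ["pandas>=1.0", "scikit-learn~=1.0"];
# a missing requirements.txt yields an empty list.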
def _create_nested_package(project: Project, package_path: Path) -> Path:
    # Fails if any part of the path already exists.
packages = package_path.parts
parent = generate.create_package(project, packages[0])
nested_path = Path(project.address) / packages[0]
for package in packages[1:]:
parent = generate.create_package(project, package, sourcefolder=parent)
nested_path = nested_path / package
return nested_path
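# Illustrative: _create_nested_package(project, Path("package/pipelines"))
# creates `package/__init__.py` and `package/pipelines/__init__.py` under the
# Rope project root and returns the path to the innermost package.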
def _move_package(project: Project, source: str, target: str) -> None:
"""
Move a Python package, refactoring relevant imports along the way.
A target of empty string means moving to the root of the `project`.
Args:
project: rope.base.Project holding the scope of the refactoring.
source: Name of the Python package to be moved. Can be a fully
qualified module path relative to the `project` root, e.g.
"package.pipelines.pipeline" or "package/pipelines/pipeline".
target: Destination of the Python package to be moved. Can be a fully
qualified module path relative to the `project` root, e.g.
"package.pipelines.pipeline" or "package/pipelines/pipeline".
"""
src_folder = project.get_module(source).get_resource()
target_folder = project.get_module(target).get_resource()
change = MoveModule(project, src_folder).get_changes(dest=target_folder)
project.do(change)
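# Illustrative usage (hypothetical names): _move_package(project, "ds",
# "package.pipelines") moves the top-level `ds` package to
# `package/pipelines/ds`, with Rope rewriting imports such as
# `from ds.nodes import ...` to `from package.pipelines.ds.nodes import ...`.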
def _rename_package(project: Project, old_name: str, new_name: str) -> None:
"""
Rename a Python package, refactoring relevant imports along the way,
as well as references in comments.
Args:
project: rope.base.Project holding the scope of the refactoring.
old_name: Old module name. Can be a fully qualified module path,
e.g. "package.pipelines.pipeline" or "package/pipelines/pipeline",
relative to the `project` root.
new_name: New module name. Can't be a fully qualified module path.
"""
folder = project.get_folder(old_name)
change = Rename(project, folder).get_changes(new_name, docs=True)
project.do(change)
def _refactor_code_for_package(
project: Project,
package_path: Path,
tests_path: Path,
alias: Optional[str],
project_metadata: ProjectMetadata,
) -> None:
"""In order to refactor the imports properly, we need to recreate
the same nested structure as in the project. Therefore, we create:
<temp_dir> # also the root of the Rope project
|__ <package>
|__ __init__.py
|__ pipelines
|__ __init__.py
|__ <pipeline_name>
|__ __init__.py
|__ tests
|__ __init__.py
|__ pipelines
|__ __init__.py
|__ <pipeline_name>
|__ __init__.py
    We then move <pipeline_name> out of the package source to the top level ("")
    of temp_dir, and rename the folder and its imports if an alias is provided.
    For tests, we need to extract all the contents of <pipeline_name>
    into the top-level `tests` folder. This is not possible in one go with
    the Rope API, so we have to do it in a slightly hacky way.
    We rename <pipeline_name> to a `tmp_name`, move it to the top level ("")
    of temp_dir, remove the old `tests` folder, and rename `tmp_name` to `tests`.
The final structure should be:
<temp_dir> # also the root of the Rope project
|__ <pipeline_name> # or <alias>
|__ __init__.py
|__ tests # only tests for <pipeline_name>
|__ __init__.py
|__ test_pipeline.py
"""
def _move_package_with_conflicting_name(target: Path, conflicting_name: str):
tmp_name = "tmp_name"
tmp_module = target.parent / tmp_name
_rename_package(project, target.as_posix(), tmp_name)
_move_package(project, tmp_module.as_posix(), "")
shutil.rmtree(Path(project.address) / conflicting_name)
_rename_package(project, tmp_name, conflicting_name)
# Copy source in appropriate folder structure
package_target = package_path.relative_to(project_metadata.source_dir)
full_path = _create_nested_package(project, package_target)
# overwrite=True to update the __init__.py files generated by create_package
_sync_dirs(package_path, full_path, overwrite=True)
# Copy tests in appropriate folder structure
if tests_path.exists():
tests_target = tests_path.relative_to(project_metadata.source_dir)
full_path = _create_nested_package(project, tests_target)
# overwrite=True to update the __init__.py files generated by create_package
_sync_dirs(tests_path, full_path, overwrite=True)
# Refactor imports in src/package_name/pipelines/pipeline_name
# and imports of `pipeline_name` in tests.
pipeline_name = package_target.stem
if pipeline_name == project_metadata.package_name:
_move_package_with_conflicting_name(package_target, pipeline_name)
else:
_move_package(project, package_target.as_posix(), "")
shutil.rmtree(Path(project.address) / project_metadata.package_name)
if alias:
_rename_package(project, pipeline_name, alias)
if tests_path.exists():
# we can't move the relevant tests folder as is because
# it will conflict with the top-level package <pipeline_name>;
# we can't rename it "tests" and move it, because it will conflict
# with the existing "tests" folder at top level;
# hence we give it a temp name, move it, delete tests/ and
# rename the temp name to tests.
_move_package_with_conflicting_name(tests_target, "tests")
_SourcePathType = Union[Path, List[Tuple[Path, str]]]
# pylint: disable=too-many-arguments,too-many-locals
def _generate_wheel_file(
pipeline_name: str,
destination: Path,
source_paths: Tuple[_SourcePathType, ...],
version: str,
metadata: ProjectMetadata,
alias: str = None,
) -> None:
package_name = alias or pipeline_name
package_source, tests_source, conf_source = source_paths
with tempfile.TemporaryDirectory() as temp_dir:
temp_dir_path = Path(temp_dir).resolve()
project = Project(temp_dir_path) # project where to do refactoring
_refactor_code_for_package( # type: ignore
project, package_source, tests_source, alias, metadata
)
project.close()
# Copy & "refactor" config
_, _, conf_target = _get_package_artifacts(temp_dir_path, package_name)
_sync_path_list(conf_source, conf_target) # type: ignore
if conf_target.is_dir() and alias:
_rename_files(conf_target, pipeline_name, alias)
# Build a setup.py on the fly
try:
install_requires = _make_install_requires(
package_source / "requirements.txt" # type: ignore
)
except Exception as exc:
click.secho("FAILED", fg="red")
cls = exc.__class__
raise KedroCliError(f"{cls.__module__}.{cls.__qualname__}: {exc}") from exc
config_files = [str(file) for file in conf_target.rglob("*") if file.is_file()]
setup_file = _generate_setup_file(
package_name, version, install_requires, temp_dir_path, config_files
)
package_file = destination / _get_wheel_name(name=package_name, version=version)
if package_file.is_file():
click.secho(
f"Package file {package_file} will be overwritten!", fg="yellow"
)
# python setup.py bdist_wheel --dist-dir <destination>
call(
[
sys.executable,
str(setup_file.resolve()),
"bdist_wheel",
"--dist-dir",
str(destination),
],
cwd=temp_dir,
)
def _generate_setup_file(
package_name: str,
version: str,
install_requires: List[str],
output_dir: Path,
config_files: List[str],
) -> Path:
setup_file = output_dir / "setup.py"
package_data = {package_name: ["README.md"] + config_files}
setup_file_context = dict(
name=package_name,
version=version,
package_data=json.dumps(package_data),
install_requires=install_requires,
)
setup_file.write_text(_SETUP_PY_TEMPLATE.format(**setup_file_context))
return setup_file
def _create_pipeline(name: str, output_dir: Path) -> Path:
with _filter_deprecation_warnings():
# pylint: disable=import-outside-toplevel
from cookiecutter.main import cookiecutter
template_path = Path(kedro.__file__).parent / "templates" / "pipeline"
cookie_context = {"pipeline_name": name, "kedro_version": kedro.__version__}
click.echo(f"Creating the pipeline `{name}`: ", nl=False)
try:
result_path = cookiecutter(
str(template_path),
output_dir=str(output_dir),
no_input=True,
extra_context=cookie_context,
)
except Exception as exc:
click.secho("FAILED", fg="red")
cls = exc.__class__
raise KedroCliError(f"{cls.__module__}.{cls.__qualname__}: {exc}") from exc
click.secho("OK", fg="green")
result_path = Path(result_path)
message = indent(f"Location: `{result_path.resolve()}`", " " * 2)
click.secho(message, bold=True)
_clean_pycache(result_path)
return result_path
# pylint: disable=missing-raises-doc
def _sync_dirs(source: Path, target: Path, prefix: str = "", overwrite: bool = False):
"""Recursively copies `source` directory (or file) into `target` directory without
overwriting any existing files/directories in the target using the following
rules:
1) Skip any files/directories which names match with files in target,
unless overwrite=True.
2) Copy all files from source to target.
3) Recursively copy all directories from source to target.
Args:
source: A local directory to copy from, must exist.
target: A local directory to copy to, will be created if doesn't exist yet.
prefix: Prefix for CLI message indentation.
"""
existing = list(target.iterdir()) if target.is_dir() else []
existing_files = {f.name for f in existing if f.is_file()}
existing_folders = {f.name for f in existing if f.is_dir()}
if source.is_dir():
content = list(source.iterdir())
elif source.is_file():
content = [source]
else:
# nothing to copy
content = [] # pragma: no cover
for source_path in content:
source_name = source_path.name
target_path = target / source_name
click.echo(indent(f"Creating `{target_path}`: ", prefix), nl=False)
        if (  # rule #1
            (not overwrite and source_name in existing_files)
            or (source_path.is_file() and source_name in existing_folders)
        ):
click.secho("SKIPPED (already exists)", fg="yellow")
elif source_path.is_file(): # rule #2
try:
target.mkdir(exist_ok=True, parents=True)
shutil.copyfile(str(source_path), str(target_path))
except Exception:
click.secho("FAILED", fg="red")
raise
click.secho("OK", fg="green")
else: # source_path is a directory, rule #3
click.echo()
new_prefix = (prefix or "") + " " * 2
_sync_dirs(source_path, target_path, prefix=new_prefix)
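# Illustrative: syncing a source tree containing `nodes.py` and `extras/` into
# a target that already has a `nodes.py` leaves the existing file untouched
# (rule #1, overwrite=False) and copies `extras/` recursively (rule #3).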
def _get_pipeline_artifacts(
project_metadata: ProjectMetadata, pipeline_name: str, env: str
) -> PipelineArtifacts:
"""From existing project, returns in order: source_path, tests_path, config_paths"""
package_dir = project_metadata.source_dir / project_metadata.package_name
conf_root = settings.CONF_ROOT
project_conf_path = project_metadata.project_path / conf_root
artifacts = PipelineArtifacts(
package_dir / "pipelines" / pipeline_name,
package_dir.parent / "tests" / "pipelines" / pipeline_name,
project_conf_path / env,
)
return artifacts
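# Illustrative: for pipeline_name="ds" and env="base" in a default project
# layout, this returns roughly
# (src/<package>/pipelines/ds, src/tests/pipelines/ds, conf/base).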
def _get_package_artifacts(
source_path: Path, package_name: str
) -> Tuple[Path, Path, Path]:
"""From existing unpacked wheel, returns in order:
source_path, tests_path, config_path
"""
artifacts = (
source_path / package_name,
source_path / "tests",
        # package_data (non-Python files) needs to live inside one of the packages
source_path / package_name / "config",
)
return artifacts
def _copy_pipeline_tests(pipeline_name: str, result_path: Path, package_dir: Path):
tests_source = result_path / "tests"
tests_target = package_dir.parent / "tests" / "pipelines" / pipeline_name
try:
_sync_dirs(tests_source, tests_target)
finally:
shutil.rmtree(tests_source)
def _copy_pipeline_configs(
result_path: Path, conf_path: Path, skip_config: bool, env: str
):
config_source = result_path / "config"
try:
if not skip_config:
config_target = conf_path / env
_sync_dirs(config_source, config_target)
finally:
shutil.rmtree(config_source)
def _delete_artifacts(*artifacts: Path):
for artifact in artifacts:
click.echo(f"Deleting `{artifact}`: ", nl=False)
try:
if artifact.is_dir():
shutil.rmtree(artifact)
else:
artifact.unlink()
except Exception as exc:
click.secho("FAILED", fg="red")
cls = exc.__class__
raise KedroCliError(f"{cls.__module__}.{cls.__qualname__}: {exc}") from exc
else:
click.secho("OK", fg="green")
def _append_package_reqs(
requirements_in: Path, package_reqs: List[str], pipeline_name: str
) -> None:
"""Appends modular pipeline requirements to project level requirements.in"""
existing_reqs = _safe_parse_requirements(requirements_in.read_text())
incoming_reqs = _safe_parse_requirements(package_reqs)
reqs_to_add = set(incoming_reqs) - set(existing_reqs)
if not reqs_to_add:
return
sorted_reqs = sorted(str(req) for req in reqs_to_add)
sep = "\n"
with open(requirements_in, "a", encoding="utf-8") as file:
file.write(
f"\n\n# Additional requirements from modular pipeline `{pipeline_name}`:\n"
)
file.write(sep.join(sorted_reqs))
click.secho(
f"Added the following requirements from modular pipeline `{pipeline_name}` to "
f"requirements.in:\n{sep.join(sorted_reqs)}"
)
click.secho(
"Use `kedro install --build-reqs` to compile and install the updated list of "
"requirements."
)
def _safe_parse_requirements(
requirements: Union[str, Iterable[str]]
) -> Set[pkg_resources.Requirement]:
"""Safely parse a requirement or set of requirements. This effectively replaces
pkg_resources.parse_requirements, which blows up with a ValueError as soon as it
encounters a requirement it cannot parse (e.g. `-r requirements.txt`). This way
we can still extract all the parseable requirements out of a set containing some
unparseable requirements.
"""
parseable_requirements = set()
for requirement in pkg_resources.yield_lines(requirements):
try:
parseable_requirements.add(pkg_resources.Requirement.parse(requirement))
except ValueError:
continue
return parseable_requirements
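# Illustrative: given ["pandas>=1.0", "-r requirements.txt"], the parseable
# `pandas>=1.0` is returned, while the `-r` include line is silently skipped
# rather than raising ValueError as pkg_resources.parse_requirements would.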