Source code for projspec.utils

import contextlib
import enum
import logging
import os
import pathlib
import re
import subprocess
import sys
from collections.abc import Iterable

import toml
import yaml

# Registry of Enum subclasses, keyed by the snake_case form of the class name
# (filled in by Enum.__init_subclass__ and read by get_enum_class).
enum_registry = {}
# Logger used by this module, registered under the name "projspec".
logger = logging.getLogger("projspec")


class Enum(enum.Enum):
    """Named enum values, so that str(x) looks like the label.

    Every subclass is recorded in ``enum_registry`` under the snake_case form
    of its class name, so that it can be found again by that name.
    """

    # TODO: does this need explicit deser for JSON?

    def __repr__(self):
        return self.name

    __str__ = __repr__

    def __init_subclass__(cls, **kwargs):
        # Register the new enum class under its snake_case name.
        enum_registry[camel_to_snake(cls.__name__)] = cls

    @classmethod
    def snake_name(cls):
        """Return the snake_case form of the class name (its registry key)."""
        return camel_to_snake(cls.__name__)

    def to_dict(self, compact=False):
        """Serialise this member.

        With ``compact=True`` only the member name is returned; otherwise a
        dict holding the ``["enum", <snake_name>]`` class tag and the value.
        """
        if compact:
            return self.name
        return {"klass": ["enum", self.snake_name()], "value": self.value}

    def __eq__(self, other):
        """Compare with ints by value, and with anything else by label."""
        if isinstance(other, int):
            return self.value == other
        return str(self) == str(other)

    def __hash__(self):
        # Defining __eq__ alone sets __hash__ to None, which made members
        # unusable as dict keys or set elements. Hash on the label so that the
        # hash agrees with the string-based equality above. (Equality with
        # plain ints cannot be mirrored by the same hash.)
        return hash(self.name)
def get_enum_class(name):
    """Look up a registered enum class by its snake_case registry key.

    Raises ``KeyError`` when nothing is registered under ``name``.
    """
    registered = enum_registry[name]
    return registered
class AttrDict(dict):
    """Contains a dict but allows attribute read access for compliant keys.

    Construction rules:

    - a single dict argument is copied as-is;
    - a single list argument is stored under one key, the snake_case type
      name of its elements (which must all share one type); an empty list
      gives an empty mapping;
    - any other positional values are each stored under the snake_case name
      of their own type.

    Keyword arguments are merged in last.
    """

    def __init__(self, *data, **kw):
        single = data[0] if len(data) == 1 else None
        if isinstance(single, dict):
            super().__init__(single)
        elif isinstance(single, list):
            types = {type(_) for _ in single}
            if len(types) > 1:
                raise TypeError("Multiple types ina list")
            if types:
                # exactly one element type: use it to name the key
                super().__init__({camel_to_snake(types.pop().__name__): single})
            # else: empty list, nothing to name the key after -> stay empty
        else:
            super().__init__({camel_to_snake(type(v).__name__): v for v in data})
        self.update(kw)

    def __getattr__(self, item):
        # Only called when normal attribute lookup fails; fall back to keys.
        if item in self:
            return self[item]
        raise AttributeError(item)

    def to_dict(self, compact=True):
        """Return the serialisable form of this mapping (see module ``to_dict``)."""
        return to_dict(self, compact=compact)

    def __dir__(self):
        # Only string keys can be attribute names; including other key types
        # would also make the sort fail on mixed-type comparison.
        keys = [k for k in self if isinstance(k, str)]
        return sorted([*super().__dir__(), *keys])
def to_dict(obj, compact=True):
    """Make entity into JSON-serialisable dict representation

    - dicts are converted value by value, preferring a value's own
      ``to_dict`` method when it has one;
    - str and bytes are returned unchanged;
    - any other iterable becomes a list of converted items;
    - remaining objects use their ``to_dict`` method if present, and are
      otherwise turned into ``str(obj)``.
    """
    if isinstance(obj, dict):
        return {
            k: (
                v.to_dict(compact=compact)
                if hasattr(v, "to_dict")
                else to_dict(v, compact=compact)
            )
            for k, v in obj.items()
        }
    if isinstance(obj, (bytes, str)):
        return obj
    if isinstance(obj, Iterable):
        return [to_dict(_, compact=compact) for _ in obj]
    if hasattr(obj, "to_dict"):
        return obj.to_dict(compact=compact)
    return str(obj)


def from_dict(dic, proj=None):
    """Rehydrate the result of to_dict into projspec instances

    Dicts carrying a ``"klass"`` tag are turned back into the tagged class
    (``"project"`` is delegated to ``Project.from_dict``); untagged dicts
    become ``AttrDict``; lists are converted element-wise; anything else is
    returned as-is. The input is left unmodified, so the same data can be
    rehydrated more than once.
    """
    from projspec import Project

    if isinstance(dic, dict):
        if "klass" in dic:
            if dic["klass"] == "project":
                return Project.from_dict(dic)
            # Read the tag without popping it: the caller's dict stays intact.
            category, name = dic["klass"]
            cls = get_cls(name, category)
            if category == "enum":
                return cls(dic["value"])
            # Bypass __init__ and restore the saved attributes directly.
            obj = object.__new__(cls)
            obj.proj = proj
            obj.__dict__.update(
                {k: from_dict(v, proj=proj) for k, v in dic.items() if k != "klass"}
            )
            return obj
        # Pass the mapping positionally so that any key is accepted, not only
        # ones usable as keyword-argument names.
        return AttrDict({k: from_dict(v, proj=proj) for k, v in dic.items()})
    elif isinstance(dic, list):
        return [from_dict(_, proj=proj) for _ in dic]
    else:
        return dic


class IndentDumper(yaml.Dumper):
    """Helper class to write YAML output with given prefix indent"""

    def __init__(self, stream, **kw):
        super().__init__(stream, **kw)
        # start one indentation level in
        self.increase_indent()

    def increase_indent(self, flow=False, indentless=False):
        # always pass indentless=False, whatever the caller asked for
        return super().increase_indent(flow, False)


# Zero-width match before every capital letter except at the string start.
cam_patt = re.compile(r"(?<!^)(?=[A-Z])")


def camel_to_snake(camel: str) -> str:
    """CamelCase to snake_case converter"""
    # https://stackoverflow.com/a/1176023/3821154
    return cam_patt.sub("_", camel).lower()


def to_camel_case(snake_str: str) -> str:
    """snake_case to CamelCase converter (every word capitalised, including the first)"""
    # https://stackoverflow.com/a/19053800/3821154
    return "".join(x.capitalize() for x in snake_str.lower().split("_"))


def _linked_local_path(path):
    """Return the resolved form of ``path`` (via ``pathlib.Path.resolve``) as a string."""
    return str(pathlib.Path(path).resolve())
class IsInstalled:
    """Checks if we can call commands, as a function of the current environment.

    Typical usage:

    >>> "python" in IsInstalled()
    True

    Results are cached by command and python executable, so that in the
    future we may be able to persist these for future sessions.

    An instance of this class is created at import: ``projspec.utils.is_installed``.
    """

    # Class-level, so shared by all instances: maps (env, cmd) -> bool
    cache = {}

    def __init__(self):
        # or maybe the value of $PATH
        self.env = _linked_local_path(sys.executable)

    def exists(self, cmd: str, refresh=False):
        """Test if command can be called by starting a subprocess

        This is more costly than some PATH lookup (i.e., what `which()` does),
        but also more rigorous. We cache the result - currently for the
        session, and eventually persistently.

        Parameters
        ----------
        cmd: str
            The executable to try, run with no arguments.
        refresh: bool
            If True, ignore any cached answer and probe again.

        Returns
        -------
        bool: whether a process could be started for ``cmd``.
        """
        key = (self.env, cmd)
        if refresh or key not in self.cache:
            try:
                p = subprocess.Popen(
                    [cmd],
                    stderr=subprocess.DEVNULL,
                    stdout=subprocess.DEVNULL,
                    stdin=subprocess.DEVNULL,
                )
            except (FileNotFoundError, PermissionError, NotADirectoryError):
                # missing, not executable, or not a usable path: cannot call it
                out = False
            else:
                # The process started, so the command exists; we do not care
                # about its outcome, just stop it and reap it.
                p.terminate()
                p.wait()
                out = True
            self.cache[key] = out
        return self.cache[key]

    def __contains__(self, item):
        """Allows syntax shortcut of ``"command" in ...``"""
        # shutil.which?
        return self.exists(item)
# TODO: persist cache
is_installed = IsInstalled()


def run_subprocess(cmd, cwd=None, env=None, output=True, popen=False, **kwargs):
    """Common way to run subprocesses, first checking for command existence

    This is convenient because command existence can be cached, and so it's
    much faster to do the lookup and give a reasonable error message.

    Parameters
    ----------
    cmd: list
        Command and arguments; ``cmd[0]`` is checked against ``is_installed``.
    cwd, env:
        Passed through to ``subprocess``.
    output: bool
        Whether to capture output when the caller has not set stdout/stderr.
    popen: bool
        If True, return a started ``subprocess.Popen``; otherwise run to
        completion with ``check=True`` and return the ``CompletedProcess``.
    kwargs:
        Extra arguments for ``subprocess.Popen`` / ``subprocess.run``.

    Raises
    ------
    RuntimeError: if ``cmd[0]`` cannot be called.
    subprocess.CalledProcessError: non-zero exit status when ``popen`` is False.
    """
    # TODO: we want to swap out direct calls to subprocess
    logger.debug("Running subprocess: %s", cmd)
    if cmd[0] not in is_installed:
        from projspec.tools import suggest

        raise RuntimeError(f"Not installed: {suggest(cmd[0])}")
    if popen:
        # Respect explicit stdout/stderr from the caller; otherwise pipe
        # stdout and fold stderr into it.
        if "stdout" not in kwargs and output:
            kwargs["stdout"] = subprocess.PIPE
        if "stderr" not in kwargs and output:
            kwargs["stderr"] = subprocess.STDOUT
        return subprocess.Popen(cmd, cwd=cwd, env=env, **kwargs)
    # returns CompletedProcess with stdout, stderr as attributes
    if "stdout" not in kwargs and "stderr" not in kwargs:
        # capture_output cannot be combined with explicit stdout/stderr
        kwargs.setdefault("capture_output", output)
    return subprocess.run(cmd, cwd=cwd, env=env, check=True, **kwargs)


# {% set sha256 = "fff" %}
sj = re.compile(r'{%\s+set\s+(\S+)\s+=\s+"(.*)"\s+%}')


def _yaml_no_jinja(fileobj):
    """Read YAML text from the given file, attempting to evaluate jinja2 templates.

    ``{% set name = "value" %}`` lines are collected as template variables and
    all ``{% ... %}`` lines are dropped. Trailing `` # [`` annotations are cut
    off. Lines containing ``{{ ... }}`` are rendered with jinja2 when it is
    installed and the template is valid; otherwise the raw template text is
    kept, wrapped in double quotes so that the line still parses as YAML.

    ``fileobj`` must be open in binary mode.
    """
    try:
        import jinja2
    except ImportError:
        # Optional dependency: fall back to quoting unrendered templates.
        jinja2 = None

    txt = fileobj.read().decode()
    lines = []
    variables = {}
    for line in txt.splitlines():
        if "{%" in line:
            if match := sj.search(line):
                key, var = match.groups()
                variables[key] = var
            continue
        if " # [" in line:
            line = line[: line.index(" # [")]
        if "{{" in line and "}}" in line:
            done = False
            if jinja2 is not None:
                try:
                    line = jinja2.Template(line).render(variables)
                    done = True
                except jinja2.exceptions.TemplateError:
                    logger.debug("Jinja Template Error")
            if not done:
                # include unrendered template
                if line.strip()[0] == "-":
                    # list element
                    sep = "-"
                elif ":" in line:
                    # key element
                    sep = ":"
                else:
                    sep = None
                if sep is not None:
                    # Quote everything after "<sep> ", dropping characters
                    # that would break the quoted string.
                    ind = line.index(sep) + 2
                    end = line[ind:].replace('"', "").replace("\\", "")
                    line = f'{line[:ind]}"{end}"'
        lines.append(line)
    # CSafeLoader only exists when PyYAML was built with LibYAML bindings.
    loader = getattr(yaml, "CSafeLoader", yaml.SafeLoader)
    return yaml.load("\n".join(lines), Loader=loader)


def flatten(x: Iterable, out=None):
    """Descend into dictionaries and iterables to return the list of all leaf values

    ``out`` is the accumulator used by the recursion; leave it unset to get a
    fresh list.
    """
    out = [] if out is None else out
    if isinstance(x, dict):
        x = x.values()
    for item in x:
        if isinstance(item, dict):
            flatten(item.values(), out)
        elif isinstance(item, (str, bytes)):
            # These are iterables whose items are also iterable, i.e.,
            # the first item of "item" is "i", which is also a string.
            out.append(item)
        elif isinstance(item, Iterable):
            flatten(item, out)
        else:
            out.append(item)
    return out


def deep_get(data: dict, path: str | list[str], default=None):
    """Fetch data from a nested dictionary at a given path.

    ``path`` is a list of keys or a dot-separated string. ``default`` is
    returned when a key is missing, or when the path continues past a value
    that cannot be looked into (e.g. a number, string or list).
    """
    if isinstance(path, str):
        path = path.split(".")
    for part in path:
        try:
            if part not in data:
                return default
            data = data[part]
        except TypeError:
            # reached a non-mapping value before the path was exhausted
            return default
    return data


def deep_set(data: dict, path: str | list[str], thing) -> None:
    """Set data in a nested dictionary at a given path.

    Intermediate dictionaries are created as needed.
    """
    if isinstance(path, str):
        path = path.split(".")
    for part in path[:-1]:
        data = data.setdefault(part, {})
    data[path[-1]] = thing


def sort_version_strings(versions: Iterable[str]) -> list[str]:
    """Sort typical python package version strings"""

    def int_or(x):
        # Plain integers sort as themselves; otherwise use the first run of
        # digits in the part, or a small positive number if there is none.
        try:
            return int(x)
        except ValueError:
            ma = re.search(r"(\d+)", x)
            if ma:
                return int(ma.group(1))
            else:
                return 0.001

    return sorted(versions, key=lambda s: [int_or(_) for _ in s.split(".")])


class PickleableTomlDecoder(toml.TomlDecoder):
    """Allows TOML empty tables to be picklable"""

    # https://github.com/uiri/toml/issues/362#issuecomment-842665836
    def get_empty_inline_table(self):
        return {}


def get_get_cls(registry="proj"):
    """Return the class registry (name -> class mapping) for a category.

    ``registry`` is one of "proj", "projspec", "content", "artifact" or
    "enum"; any other value raises ``KeyError``.
    """
    import projspec

    reg_map = {
        "proj": projspec.proj.base.registry,
        "projspec": projspec.proj.base.registry,
        "content": projspec.content.base.registry,
        "artifact": projspec.artifact.base.registry,
        "enum": enum_registry,
    }
    return reg_map[registry]
def get_cls(name: str, registry: str = "proj") -> type:
    """Look up a registered class by name within one category.

    name: str
        Class name, either in camel case (as defined) or already in snake case
    registry: str
        Which category of classes to search, as understood by ``get_get_cls``

    Raises ``KeyError`` if the category or the name is unknown.
    """
    classes = get_get_cls(registry)
    key = camel_to_snake(name)
    return classes[key]
def spec_class_qnames(registry="proj"):
    """Print the registered classes of one category, for pasting into docs.

    First the sorted qualified names with the leading ``projspec.`` removed,
    then the sorted ``.. autoclass::`` directives with full qualified names.
    """
    classes = get_get_cls(registry).values()
    qualified = [f"{cls.__module__}.{cls.__name__}" for cls in classes]

    short_names = sorted(qname.removeprefix("projspec.") for qname in qualified)
    for short in short_names:
        print(" ", short)

    directives = sorted(".. autoclass:: " + qname for qname in qualified)
    for directive in directives:
        print(directive)
def class_infos():
    """Collect documentation details for every registered class.

    Returns a dict with the sections "specs", "content", "artifact" and
    "enum", each mapping registry name to a dict of details for that class.
    """
    import projspec

    def doc_and_icon(klass):
        # details shared by content and artifact entries
        return {"doc": klass.__doc__, "icon": getattr(klass, "icon", None)}

    specs = {}
    for name, cls in projspec.proj.base.registry.items():
        specs[name] = {
            "doc": cls.__doc__,
            "link": cls.spec_doc,
            "icon": getattr(cls, "icon", None),
            # True when the class provides its own _create
            "create": cls._create is not projspec.ProjectSpec._create,
        }

    content = {}
    for name, cls in projspec.content.base.registry.items():
        content[name] = doc_and_icon(cls)

    artifact = {}
    for name, cls in projspec.artifact.base.registry.items():
        artifact[name] = doc_and_icon(cls)

    enums = {}
    for name, cls in enum_registry.items():
        enums[name] = {"doc": cls.__doc__}

    return {
        "specs": specs,
        "content": content,
        "artifact": artifact,
        "enum": enums,
    }
@contextlib.contextmanager
def make_and_copy(path, sub=None, mkdir=False):
    """Provide a temporary directory to create and write into, and then copy to destination

    Parameters
    ----------
    path: str
        Destination that the temporary content is copied to on normal exit.
    sub: str | None
        If given, only the contents of this subdirectory of the temporary
        location are copied.
    mkdir: bool
        If True, the temporary location is created before it is yielded;
        otherwise only its name is provided.

    Yields the temporary location as a string with a trailing "/". If the
    ``with`` body raises, nothing is copied. The temporary tree is removed in
    every case.
    """
    # TODO: path could be remote, add optional fs= rather than assume local
    import fsspec
    import tempfile
    import uuid

    fs = fsspec.filesystem("file")
    base = tempfile.mkdtemp()
    tmp = f"{base}/{uuid.uuid4()}/"
    try:
        if mkdir:
            fs.mkdir(tmp)
        yield tmp
        src = f"{tmp}/{sub}/*" if sub else tmp
        fs.copy(src, path, recursive=True)
    finally:
        # Remove the whole mkdtemp() directory, not only the uuid-named
        # child, and do so even when the body or the copy failed.
        fs.rm(base, recursive=True)
def _ipynb_to_py(data: str) -> str:
    """Extract the code cells of a notebook as one block of text.

    ``data`` is the JSON text of the notebook. The sources of all cells whose
    type is "code" are kept, in order, and separated by a ``###`` marker
    line; all other cells are ignored. Roughly an ipynb->py conversion.
    """
    import json

    notebook = json.loads(data)
    assert "nbformat" in notebook, "Not an ipython notebook"

    code_cells = []
    for cell in notebook["cells"]:
        if cell["cell_type"] == "code":
            code_cells.append("".join(cell["source"]))

    separator = "\n\n###\n\n"
    return separator.join(code_cells)