"""Data/ML workflow specs: dbt, Quarto, Prefect, Dagster, Kedro, Airflow, Snakemake, Nox, Metaflow, MLFlow."""
import os
import re
import yaml
from projspec.proj.base import ParseFailed, ProjectSpec
from projspec.utils import AttrDict
[docs]
class Dbt(ProjectSpec):
    """dbt (data build tool) project.

    dbt is used for data ingestion, validation, and transformation. The
    ``dbt_project.yml`` spec tells dbt about the context of your project and
    how to transform your data (build your data sets).
    """

    icon = "ποΈ"
    spec_doc = "https://docs.getdbt.com/reference/dbt_project.yml"

    def match(self) -> bool:
        """Match when ``dbt_project.yml`` is among the project's files."""
        return "dbt_project.yml" in self.proj.basenames

    def parse(self) -> None:
        """Read ``dbt_project.yml``; expose its metadata and the standard dbt commands.

        Raises
        ------
        ParseFailed
            If the file cannot be read or parsed, or is not a YAML mapping.
        """
        from projspec.artifact.process import Process
        from projspec.content.executable import Command
        from projspec.content.metadata import DescriptiveMetadata

        try:
            with self.proj.get_file("dbt_project.yml") as f:
                cfg = yaml.safe_load(f)
        except Exception as exc:
            raise ParseFailed(f"Could not read dbt_project.yml: {exc}") from exc
        if not isinstance(cfg, dict):
            raise ParseFailed("dbt_project.yml did not parse to a mapping")

        meta: dict[str, str] = {}
        for key in ("name", "version", "profile"):
            if val := cfg.get(key):
                meta[key] = str(val)

        conts = AttrDict()
        if meta:
            conts["descriptive_metadata"] = DescriptiveMetadata(
                proj=self.proj, meta=meta
            )

        # Standard dbt commands. Each one is exposed twice under the same key:
        # as a Command (content) and as a runnable Process (artifact).
        dbt_cmds = {
            "run": ["dbt", "run"],
            "test": ["dbt", "test"],
            "build": ["dbt", "build"],
            "compile": ["dbt", "compile"],
            "docs_generate": ["dbt", "docs", "generate"],
            "docs_serve": ["dbt", "docs", "serve"],
            "seed": ["dbt", "seed"],
            "snapshot": ["dbt", "snapshot"],
            "source_freshness": ["dbt", "source", "freshness"],
        }
        cmds = AttrDict()
        arts = AttrDict()
        for name, cmd in dbt_cmds.items():
            cmds[name] = Command(proj=self.proj, cmd=cmd)
            arts[name] = Process(proj=self.proj, cmd=cmd)
        conts["command"] = cmds
        self._contents = conts
        self._artifacts = AttrDict(process=arts)

    @staticmethod
    def _create(path: str) -> None:
        """Scaffold a minimal dbt project (``dbt_project.yml`` plus one model) in *path*.

        dbt only accepts project names made of letters, digits and underscores
        that do not start with a digit, so the directory name is sanitised
        first (e.g. ``my-project`` becomes ``my_project``).
        """
        # normpath drops a trailing separator, which would otherwise make
        # basename() return an empty string.
        dirname = os.path.basename(os.path.normpath(path))
        # Replace every non-identifier character; fall back if nothing is left.
        name = re.sub(r"\W", "_", dirname, flags=re.ASCII) or "dbt_project"
        if name[0].isdigit():
            name = f"dbt_{name}"
        with open(os.path.join(path, "dbt_project.yml"), "wt") as f:
            f.write(
                f"name: '{name}'\n"
                "version: '1.0.0'\n"
                "config-version: 2\n"
                "\n"
                "profile: 'default'\n"
                "\n"
                "model-paths: ['models']\n"
                "seed-paths: ['seeds']\n"
                "test-paths: ['tests']\n"
                "snapshot-paths: ['snapshots']\n"
                "\n"
                "models:\n"
                f"  {name}:\n"
                "    +materialized: view\n"
            )
        os.makedirs(os.path.join(path, "models"), exist_ok=True)
        with open(os.path.join(path, "models", "example.sql"), "wt") as f:
            f.write("SELECT 1 AS id, 'hello' AS greeting\n")
[docs]
class Quarto(ProjectSpec):
    """Quarto publishing system project.

    Matched by a ``_quarto.yml``/``_quarto.yaml`` project file, or by any
    ``.qmd`` document among the project's files.
    """

    icon = "π"
    spec_doc = "https://quarto.org/docs/reference/projects/core.html"

    def match(self) -> bool:
        """Match on a Quarto project file or any ``.qmd`` document."""
        if (
            "_quarto.yml" in self.proj.basenames
            or "_quarto.yaml" in self.proj.basenames
        ):
            return True
        return any(n.endswith(".qmd") for n in self.proj.basenames)

    def parse(self) -> None:
        """Collect metadata from the Quarto config and expose render/preview artifacts.

        The config file is optional (a lone ``.qmd`` also matches) and is read
        best-effort. A missing, unreadable or non-mapping config is treated as
        empty, and so is any config section that is null or not a mapping.
        """
        from projspec.artifact.infra import StaticSite
        from projspec.artifact.process import Server
        from projspec.content.metadata import DescriptiveMetadata

        cfg: dict = {}
        for fname in ("_quarto.yml", "_quarto.yaml"):
            if fname in self.proj.basenames:
                try:
                    with self.proj.get_file(fname) as f:
                        loaded = yaml.safe_load(f)
                except Exception:
                    loaded = None  # best-effort: unreadable config == no config
                if isinstance(loaded, dict):
                    cfg = loaded
                break

        def section(key: str) -> dict:
            # A section may be absent, explicitly null, or malformed.
            val = cfg.get(key)
            return val if isinstance(val, dict) else {}

        project = section("project")
        meta: dict[str, str] = {}
        # Later sections win for shared keys (a book title overrides a project
        # title). Website projects, including the scaffold written by
        # ``_create`` below, keep their title under ``website.title``.
        for sect_name, keys in (
            ("project", ("title", "type")),
            ("website", ("title",)),
            ("book", ("title", "author")),
        ):
            sect = section(sect_name)
            for key in keys:
                if val := sect.get(key):
                    meta[key] = str(val)

        conts = AttrDict()
        if meta:
            conts["descriptive_metadata"] = DescriptiveMetadata(
                proj=self.proj, meta=meta
            )

        # Quarto renders books to ``_book`` and websites to ``_site`` unless
        # ``project.output-dir`` overrides it (a null value means "default").
        default_dir = "_book" if project.get("type") == "book" else "_site"
        output_dir = project.get("output-dir") or default_dir
        arts = AttrDict(
            render=StaticSite(
                proj=self.proj,
                cmd=["quarto", "render"],
                fn=f"{self.proj.url}/{output_dir}/index.html",
            ),
            preview=Server(
                proj=self.proj,
                cmd=["quarto", "preview"],
            ),
        )
        self._contents = conts
        self._artifacts = arts

    @staticmethod
    def _create(path: str) -> None:
        """Scaffold a minimal Quarto website (``_quarto.yml`` + ``index.qmd``) in *path*."""
        # normpath drops a trailing separator so basename() is not empty.
        name = os.path.basename(os.path.normpath(path))
        with open(os.path.join(path, "_quarto.yml"), "wt") as f:
            f.write(
                "project:\n"
                "  type: website\n"
                "\n"
                "website:\n"
                f'  title: "{name}"\n'
                "  navbar:\n"
                "    left:\n"
                "      - href: index.qmd\n"
                "        text: Home\n"
                "\n"
                "format:\n"
                "  html:\n"
                "    theme: cosmo\n"
            )
        with open(os.path.join(path, "index.qmd"), "wt") as f:
            f.write(
                "---\n"
                f'title: "{name}"\n'
                "---\n"
                "\n"
                "Welcome to this Quarto project.\n"
            )
[docs]
class Prefect(ProjectSpec):
    """Prefect workflow orchestration project, defined by a ``prefect.yaml`` file."""

    icon = "π§"
    spec_doc = "https://docs.prefect.io/v3/deploy/infrastructure-concepts/prefect-yaml"

    def match(self) -> bool:
        """Match when ``prefect.yaml`` is among the project's files."""
        return "prefect.yaml" in self.proj.basenames

    def parse(self) -> None:
        """Read ``prefect.yaml``; expose one stage/command/process per deployment.

        Raises
        ------
        ParseFailed
            If the file cannot be read or parsed, or is not a YAML mapping.
        """
        from projspec.artifact.process import Process
        from projspec.content.cicd import PipelineStage
        from projspec.content.executable import Command
        from projspec.content.metadata import DescriptiveMetadata

        try:
            with self.proj.get_file("prefect.yaml") as f:
                cfg = yaml.safe_load(f)
        except Exception as exc:
            raise ParseFailed(f"Could not read prefect.yaml: {exc}") from exc
        if not isinstance(cfg, dict):
            raise ParseFailed("prefect.yaml did not parse to a mapping")

        meta: dict[str, str] = {}
        if name := cfg.get("name"):
            meta["name"] = str(name)

        conts = AttrDict()
        if meta:
            conts["descriptive_metadata"] = DescriptiveMetadata(
                proj=self.proj, meta=meta
            )

        # Deployments become pipeline stages. ``deployments:`` may be present
        # but null (or malformed); treat both as "no deployments".
        deployments = cfg.get("deployments") or []
        if not isinstance(deployments, list):
            deployments = []
        stages = AttrDict()
        arts = AttrDict()
        cmds = AttrDict()
        for dep in deployments:
            if not isinstance(dep, dict):
                continue
            # A missing or null name falls back to "default".
            dep_name = str(dep.get("name") or "default")
            # NOTE(review): ``prefect deployment run`` usually expects
            # "<flow-name>/<deployment-name>"; the flow name is not read here.
            stages[dep_name] = PipelineStage(
                proj=self.proj,
                name=dep_name,
                cmd=["prefect", "deployment", "run", dep_name],
            )
            deploy_cmd = ["prefect", "deploy", "--name", dep_name]
            cmds[dep_name] = Command(proj=self.proj, cmd=deploy_cmd)
            arts[dep_name] = Process(proj=self.proj, cmd=deploy_cmd)

        if stages:
            conts["pipeline_stage"] = stages
        if cmds:
            conts["command"] = cmds
        # Generic run command.
        # NOTE(review): confirm ``prefect run`` exists in the targeted Prefect CLI.
        arts["run"] = Process(proj=self.proj, cmd=["prefect", "run"])
        self._contents = conts
        self._artifacts = AttrDict(process=arts)
[docs]
class Dagster(ProjectSpec):
    """Dagster data orchestration project.

    Matched by a non-empty ``[tool.dagster]`` table in ``pyproject.toml`` or by
    a ``dagster.yaml``/``workspace.yaml`` file.
    """

    icon = "πΊοΈ"
    spec_doc = "https://docs.dagster.io/api/python-api/workspace"

    def match(self) -> bool:
        """Match on ``[tool.dagster]`` config or a Dagster YAML file."""
        if self.proj.pyproject.get("tool", {}).get("dagster"):
            return True
        return bool(
            {"dagster.yaml", "workspace.yaml"}.intersection(self.proj.basenames)
        )

    def parse(self) -> None:
        """Expose the ``dagster dev`` server and a materialize-all-assets process."""
        from projspec.artifact.process import Process, Server
        from projspec.content.executable import Command
        from projspec.content.metadata import DescriptiveMetadata

        meta: dict[str, str] = {}
        dagster_cfg = self.proj.pyproject.get("tool", {}).get("dagster", {})
        if isinstance(dagster_cfg, dict):
            if module := dagster_cfg.get("module_name"):
                meta["module"] = str(module)

        conts = AttrDict()
        if meta:
            conts["descriptive_metadata"] = DescriptiveMetadata(
                proj=self.proj, meta=meta
            )

        # Each command line is defined once and shared by the Command content
        # and the matching artifact, so the two cannot drift apart.
        dev_cmd = ["dagster", "dev"]
        materialize_cmd = ["dagster", "asset", "materialize", "--select", "*"]
        conts["command"] = AttrDict(
            dev=Command(proj=self.proj, cmd=dev_cmd),
            materialize=Command(proj=self.proj, cmd=materialize_cmd),
        )
        self._contents = conts
        self._artifacts = AttrDict(
            dev=Server(proj=self.proj, cmd=dev_cmd),
            materialize=Process(proj=self.proj, cmd=materialize_cmd),
        )
[docs]
class Kedro(ProjectSpec):
    """Kedro data science pipeline project.

    Identified by a non-empty ``[tool.kedro]`` table in ``pyproject.toml``.
    Named pipelines are taken from the non-private sub-directories of
    ``src/<package_name>/pipelines``.
    """

    icon = "πΈοΈ"
    spec_doc = "https://docs.kedro.org/en/stable/kedro_project_setup/settings.html"

    def match(self) -> bool:
        """True when ``pyproject.toml`` carries a non-empty ``[tool.kedro]`` table."""
        kedro_table = self.proj.pyproject.get("tool", {}).get("kedro")
        return bool(kedro_table)

    def parse(self) -> None:
        """Expose ``kedro run`` (overall and per pipeline) plus the ``kedro viz`` server."""
        from projspec.artifact.process import Process, Server
        from projspec.content.cicd import PipelineStage
        from projspec.content.executable import Command
        from projspec.content.metadata import DescriptiveMetadata

        settings = self.proj.pyproject.get("tool", {}).get("kedro", {})
        meta: dict[str, str] = {
            field: str(settings.get(field))
            for field in ("package_name", "project_name", "kedro_init_version")
            if settings.get(field)
        }

        contents = AttrDict()
        if meta:
            contents["descriptive_metadata"] = DescriptiveMetadata(
                proj=self.proj, meta=meta
            )

        # Discover pipelines: each sub-directory of src/<package>/pipelines
        # whose name does not start with "_" is one pipeline.
        discovered: list[str] = []
        package = settings.get("package_name", "")
        if package:
            base = f"{self.proj.url}/src/{package}/pipelines"
            try:
                found = []
                for path in self.proj.fs.ls(base, detail=False):
                    if self.proj.fs.isdir(path) and not os.path.basename(
                        path
                    ).startswith("_"):
                        found.append(os.path.basename(path))
                discovered = found
            except Exception:
                pass  # missing/unlistable directory: only the default run remains

        commands = AttrDict()
        artifacts = AttrDict()
        stage_map = AttrDict()

        # The default pipeline.
        commands["run"] = Command(proj=self.proj, cmd=["kedro", "run"])
        artifacts["run"] = Process(proj=self.proj, cmd=["kedro", "run"])
        for pipe in discovered:
            run_pipe = ["kedro", "run", "--pipeline", pipe]
            commands[pipe] = Command(proj=self.proj, cmd=run_pipe)
            artifacts[pipe] = Process(proj=self.proj, cmd=run_pipe)
            stage_map[pipe] = PipelineStage(proj=self.proj, name=pipe, cmd=run_pipe)
        artifacts["viz"] = Server(proj=self.proj, cmd=["kedro", "viz", "run"])

        if stage_map:
            contents["pipeline_stage"] = stage_map
        contents["command"] = commands
        self._contents = contents
        self._artifacts = artifacts
[docs]
class Airflow(ProjectSpec):
    """Apache Airflow workflow orchestration project/DAG spec.

    Detected by a ``dags/`` directory containing at least one ``.py`` file.
    Every DAG id found in those files becomes a pipeline stage that triggers it.
    """

    icon = "π¬οΈ"
    spec_doc = (
        "https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/dags.html"
    )

    def match(self) -> bool:
        """Match when ``dags/`` exists and contains a ``.py`` file."""
        dags_dir = f"{self.proj.url}/dags"
        try:
            if not self.proj.fs.isdir(dags_dir):
                return False
            entries = self.proj.fs.ls(dags_dir, detail=False)
            return any(e.endswith(".py") for e in entries)
        except Exception:
            return False

    def parse(self) -> None:
        """Collect DAG ids from ``dags/*.py`` and expose Airflow server commands.

        Raises
        ------
        ParseFailed
            If ``dags/`` cannot be listed.
        """
        from projspec.artifact.process import Process, Server
        from projspec.content.cicd import PipelineStage
        from projspec.content.executable import Command

        dags_dir = f"{self.proj.url}/dags"
        try:
            entries = self.proj.fs.ls(dags_dir, detail=False)
        except Exception as exc:
            raise ParseFailed(f"Could not list dags/: {exc}") from exc

        dag_id_patterns = (
            # ``\b`` keeps keyword args such as ``trigger_dag_id=``
            # (TriggerDagRunOperator) or ``external_dag_id=`` (ExternalTaskSensor),
            # which name *other* DAGs, from matching.
            re.compile(r'\bdag_id\s*=\s*["\']([^"\']+)["\']'),
            # Positional form: DAG("my_dag", ...)
            re.compile(r'\bDAG\(\s*["\']([^"\']+)["\']'),
        )

        def trigger_stage(dag_id: str):
            return PipelineStage(
                proj=self.proj,
                name=dag_id,
                cmd=["airflow", "dags", "trigger", dag_id],
            )

        stages = AttrDict()
        for entry in entries:
            if not entry.endswith(".py"):
                continue
            # Strip only the trailing extension ("a.py.py" -> "a.py").
            dag_name = os.path.basename(entry)[: -len(".py")]
            if dag_name.startswith("_"):
                continue
            try:
                with self.proj.fs.open(entry, "rt") as f:
                    content = f.read()
            except Exception:
                # Unreadable file: best-effort fallback to the file name.
                stages[dag_name] = trigger_stage(dag_name)
                continue
            dag_ids = [m for pat in dag_id_patterns for m in pat.findall(content)]
            if not dag_ids:
                # No explicit id (e.g. an ``@dag``-decorated function). Fall back
                # to the file name, but only for files that look like DAG files.
                # This mirrors Airflow's safe-mode check that both "airflow" and
                # "dag" appear in the source, so helper modules are skipped.
                lowered = content.lower()
                if "airflow" in lowered and "dag" in lowered:
                    dag_ids = [dag_name]
            for dag_id in dag_ids:
                stages[dag_id] = trigger_stage(dag_id)

        cmds = AttrDict(
            standalone=Command(proj=self.proj, cmd=["airflow", "standalone"]),
            scheduler=Command(proj=self.proj, cmd=["airflow", "scheduler"]),
            webserver=Command(proj=self.proj, cmd=["airflow", "webserver"]),
        )
        arts = AttrDict(
            standalone=Process(proj=self.proj, cmd=["airflow", "standalone"]),
            webserver=Server(
                proj=self.proj, cmd=["airflow", "webserver", "--port", "8080"]
            ),
        )
        conts = AttrDict(command=cmds)
        if stages:
            conts["pipeline_stage"] = stages
        self._contents = conts
        self._artifacts = arts
[docs]
class Snakemake(ProjectSpec):
    """Snakemake workflow management system project.

    Recognised by a ``Snakefile`` at the project root or at
    ``workflow/Snakefile``.
    """

    icon = "π"
    spec_doc = (
        "https://snakemake.readthedocs.io/en/stable/snakefiles/configuration.html"
    )

    def match(self) -> bool:
        """Match a root-level Snakefile or one under ``workflow/``."""
        if "Snakefile" in self.proj.basenames:
            return True
        # Also detect the workflow/Snakefile layout.
        workflow_snakefile = f"{self.proj.url}/workflow/Snakefile"
        try:
            return self.proj.fs.isfile(workflow_snakefile)
        except Exception:
            return False

    def parse(self) -> None:
        """Expose ``snakemake --cores all`` plus one command/stage per named rule.

        The rules named ``all`` and ``clean`` are skipped.
        """
        from projspec.artifact.process import Process
        from projspec.content.cicd import PipelineStage
        from projspec.content.executable import Command

        snakefile_path = (
            "Snakefile" if "Snakefile" in self.proj.basenames else "workflow/Snakefile"
        )

        # Parse top-level ``rule <name>:`` declarations from the Snakefile.
        rule_names: list[str] = []
        try:
            with self.proj.get_file(snakefile_path) as f:
                content = f.read()
            if isinstance(content, bytes):
                # A str regex cannot search bytes; decode in case the handle
                # was opened in binary mode.
                content = content.decode("utf-8", errors="replace")
            rule_names = re.findall(r"^rule\s+(\w+)\s*:", content, re.MULTILINE)
        except Exception:
            pass  # rules are optional - we still expose the run command

        cmds = AttrDict()
        arts = AttrDict()
        stages = AttrDict()

        # Generic run command.
        run_cmd = ["snakemake", "--cores", "all"]
        cmds["run"] = Command(proj=self.proj, cmd=run_cmd)
        arts["run"] = Process(proj=self.proj, cmd=run_cmd)
        for rule in rule_names:
            if rule in ("all", "clean"):
                continue
            cmd = ["snakemake", rule, "--cores", "all"]
            cmds[rule] = Command(proj=self.proj, cmd=cmd)
            stages[rule] = PipelineStage(proj=self.proj, name=rule, cmd=cmd)

        if stages:
            self._contents = AttrDict(command=cmds, pipeline_stage=stages)
        else:
            self._contents = AttrDict(command=cmds)
        self._artifacts = AttrDict(process=arts)
[docs]
class Nox(ProjectSpec):
    """Nox Python automation project.

    Nox is often used for testing, linting, and packaging. It is a Python
    environment-management tool that lets you define several CI-style runs
    ("sessions") in one ``noxfile.py`` and execute them together.
    """

    icon = "π§ͺ"
    spec_doc = "https://nox.thea.codes/en/stable/config.html"

    def match(self) -> bool:
        """Match when ``noxfile.py`` is among the project's files."""
        return "noxfile.py" in self.proj.basenames

    @staticmethod
    def _session_names(source: str) -> list[str]:
        """Return the nox session names declared in *source*, in file order.

        Parsing with :mod:`ast` handles what a single regex cannot:
        - decorator arguments containing parentheses (``python=("3.11",)``);
        - stacked decorators such as ``@nox.parametrize`` under
          ``@nox.session``;
        - ``@nox.session(name=...)`` overriding the function name.

        If *source* is not valid Python, fall back to a regex that matches
        ``@nox.session``/``@session`` immediately followed by ``def``.
        """
        import ast

        try:
            tree = ast.parse(source)
        except (SyntaxError, ValueError):
            return re.findall(
                r"@(?:nox\.)?session(?:\([^)]*\))?\s+def\s+(\w+)",
                source,
                re.MULTILINE,
            )

        def is_session(node: ast.AST) -> bool:
            # ``@nox.session`` or a bare ``@session`` (from nox import session)
            if isinstance(node, ast.Attribute):
                return (
                    node.attr == "session"
                    and isinstance(node.value, ast.Name)
                    and node.value.id == "nox"
                )
            return isinstance(node, ast.Name) and node.id == "session"

        found: list[tuple[int, str]] = []
        for node in ast.walk(tree):
            if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
                continue
            for deco in node.decorator_list:
                call = deco if isinstance(deco, ast.Call) else None
                if not is_session(call.func if call is not None else deco):
                    continue
                name = node.name
                for kw in call.keywords if call is not None else ():
                    if (
                        kw.arg == "name"
                        and isinstance(kw.value, ast.Constant)
                        and isinstance(kw.value.value, str)
                    ):
                        name = kw.value.value
                found.append((node.lineno, name))
                break
        # ast.walk is breadth-first; sort to restore file order, then dedupe.
        return list(dict.fromkeys(name for _, name in sorted(found)))

    def parse(self) -> None:
        """Expose one ``nox -s <session>`` command/process per discovered session.

        If no session can be found (unreadable file, or none declared), a single
        plain ``nox`` entry is exposed instead.
        """
        from projspec.artifact.process import Process
        from projspec.content.executable import Command

        try:
            with self.proj.get_file("noxfile.py") as f:
                content = f.read()
        except Exception:
            content = ""  # best-effort: fall back to the plain ``nox`` entry
        if isinstance(content, bytes):
            # Decode in case the handle was opened in binary mode.
            content = content.decode("utf-8", errors="replace")
        session_names = self._session_names(content)

        cmds = AttrDict()
        arts = AttrDict()
        if not session_names:
            cmds["nox"] = Command(proj=self.proj, cmd=["nox"])
            arts["nox"] = Process(proj=self.proj, cmd=["nox"])
        else:
            for name in session_names:
                cmd = ["nox", "-s", name]
                cmds[name] = Command(proj=self.proj, cmd=cmd)
                arts[name] = Process(proj=self.proj, cmd=cmd)
        self._contents = AttrDict(command=cmds)
        self._artifacts = AttrDict(process=arts)

    @staticmethod
    def _create(path: str) -> None:
        """Scaffold a minimal ``noxfile.py`` with ``tests`` and ``lint`` sessions."""
        with open(os.path.join(path, "noxfile.py"), "wt") as f:
            f.write(
                "import nox\n"
                "\n"
                "\n"
                "@nox.session\n"
                "def tests(session):\n"
                '    """Run the test suite."""\n'
                "    session.install('pytest')\n"
                "    session.run('pytest')\n"
                "\n"
                "\n"
                "@nox.session\n"
                "def lint(session):\n"
                '    """Lint the code."""\n'
                "    session.install('ruff')\n"
                "    session.run('ruff', 'check', '.')\n"
            )
[docs]
class MLFlow(ProjectSpec):
    """MLflow project, defined by an `MLproject` (or `MLFlow`) file.

    An MLproject file is a YAML document that declares the project name,
    the environment (conda or pip), and one or more named entry points.
    """

    icon = "π"
    spec_doc = (
        "https://mlflow.org/docs/latest/ml/projects/#mlproject-file-configuration"
    )

    def match(self) -> bool:
        """Match when an ``MLproject`` or ``MLFlow`` file is present."""
        return "MLproject" in self.proj.basenames or "MLFlow" in self.proj.basenames

    def parse(self) -> None:
        """Read the MLproject file; expose its environment and entry points.

        Raises
        ------
        ParseFailed
            If the MLproject file does not contain a YAML mapping.
        """
        from projspec.artifact.process import Process
        from projspec.content.environment import Environment, Precision, Stack
        from projspec.content.executable import Command

        # ``_contents`` may still be None here; create the container *before*
        # anything is stored in it (it was previously initialised afterwards).
        conts = self._contents if self._contents is not None else AttrDict()

        fname = "MLproject" if "MLproject" in self.proj.basenames else "MLFlow"
        with self.proj.fs.open(self.proj.basenames[fname], "rt") as f:
            meta = yaml.safe_load(f)
        if not isinstance(meta, dict):
            raise ParseFailed(f"{fname} did not parse to a mapping")

        if "python_env" in meta:
            with self.proj.get_file(meta["python_env"], text=True) as f:
                env = yaml.safe_load(f) or {}
            packages = list(env.get("dependencies") or [])
            python = env.get("python")
            # Avoid a dangling "python " entry when no version is pinned.
            packages.append(f"python {python}" if python else "python")
            conts["environment"] = Environment(
                stack=Stack.PIP,
                precision=Precision.SPEC,
                packages=packages,
                proj=self.proj,
            )
        elif "conda_env" in meta:
            with self.proj.get_file(meta["conda_env"], text=True) as f:
                env = yaml.safe_load(f) or {}
            conts["environment"] = Environment(
                stack=Stack.CONDA,
                precision=Precision.SPEC,
                packages=env.get("dependencies") or [],
                channels=env.get("channels"),
                proj=self.proj,
            )

        cmds = AttrDict()
        arts = AttrDict()
        entry_points = meta.get("entry_points") or {}
        if not isinstance(entry_points, dict):
            entry_points = {}
        for name, ep in entry_points.items():
            # Skip malformed entry points rather than failing the whole parse.
            if not isinstance(ep, dict) or "command" not in ep:
                continue
            cmds[name] = Command(proj=self.proj, cmd=ep["command"])
            arts[name] = Process(
                proj=self.proj, cmd=["mlflow", "run", ".", "-e", str(name)]
            )
        if cmds:
            conts["command"] = cmds
        self._contents = conts

        if arts:
            self._artifacts = AttrDict(process=arts)
        elif self._artifacts is None:
            self._artifacts = AttrDict()

    @staticmethod
    def _create(path: str) -> None:
        """Scaffold an MLproject with a conda environment and a stub ``train.py``."""
        with open(os.path.join(path, "MLproject"), "w") as f:
            f.write(
                "name: tutorial\n"
                "\n"
                "conda_env: conda.yaml\n"
                "\n"
                "entry_points:\n"
                "  main:\n"
                "    parameters:\n"
                "      alpha: {type: float, default: 0.5}\n"
                "      l1_ratio: {type: float, default: 0.1}\n"
                '    command: "python train.py {alpha} {l1_ratio}"\n'
            )
        with open(os.path.join(path, "conda.yaml"), "w") as f:
            f.write(
                "name: ml-project\n"
                "channels:\n"
                "  - conda-forge\n"
                "dependencies:\n"
                "  - python=3.9\n"
            )
        with open(os.path.join(path, "train.py"), "w") as f:
            f.write("# MLFlow training script\n")