#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Canonical task model + YAML loader/validator/writer for scitex-todo.
The task store is a YAML document with a top-level ``tasks:`` list. Each
task is a mapping with ``id`` + ``title`` + ``status`` (required) and
optional ``repo`` / ``depends_on`` / ``blocks`` / ``note`` / ``priority`` /
``parent`` fields. ``priority`` is an explicit integer rank (lower = higher
priority); when absent, document order is the implicit ordering. ``parent``
is an optional task-id string that nests this task under another node — a
task's children are tasks whose ``parent`` equals this task's ``id`` (the
board's drill-down view follows this relation).
This module is the single validation gate: ``load_tasks`` raises
``TaskValidationError`` on a malformed store (missing id/title, duplicate
id, invalid status, non-integer priority, non-string parent) so downstream
adapters can assume well-formed input. ``save_tasks`` re-runs the same gate
before writing back and preserves the hand-written YAML comments +
structure via ruamel.yaml.
"""
from __future__ import annotations
from pathlib import Path
import yaml
# Valid task statuses. ``goal`` marks a north-star objective (rendered gold);
# the rest are ordinary execution states.
VALID_STATUSES: tuple[str, ...] = (
"goal",
"pending",
"in_progress",
"blocked",
"done",
"deferred",
"failed",
)
[docs]
class TaskValidationError(ValueError):
"""Raised when a task store fails structural validation."""
[docs]
def load_tasks(path: str | Path) -> list[dict]:
"""Load and validate the task list from a YAML store.
Parameters
----------
path : str or pathlib.Path
Path to the YAML task store. The document must have a top-level
``tasks:`` list.
Returns
-------
list of dict
The validated task mappings, in document order.
Raises
------
FileNotFoundError
If ``path`` does not exist.
TaskValidationError
If the store is structurally invalid: ``tasks`` is not a list, a
task is missing ``id`` or ``title``, an ``id`` is duplicated, a
``status`` is not in :data:`VALID_STATUSES`, or a ``priority`` is
present but not an integer.
Examples
--------
>>> tasks = load_tasks("tasks.yaml") # doctest: +SKIP
>>> tasks[0]["id"] # doctest: +SKIP
'design'
"""
path = Path(path).expanduser()
if not path.exists():
raise FileNotFoundError(f"task store not found: {path}")
with path.open(encoding="utf-8") as handle:
data = yaml.safe_load(handle) or {}
tasks = data.get("tasks")
_validate_tasks(tasks, source=str(path))
return tasks
def _validate_tasks(tasks: object, source: str) -> None:
"""Validate a task list in place, raising on the first structural fault.
The single gate shared by :func:`load_tasks` (read side) and
:func:`save_tasks` (write side) so a bad mutation can never round-trip
through the writer.
Parameters
----------
tasks : object
The candidate ``tasks`` value (must be a list of mappings).
source : str
A label for error messages (the store path or ``"<save_tasks>"``).
Raises
------
TaskValidationError
On any structural fault — see :func:`load_tasks`.
"""
if not isinstance(tasks, list):
raise TaskValidationError(f"{source}: top-level 'tasks' must be a list")
seen: set[str] = set()
for task in tasks:
if not isinstance(task, dict):
raise TaskValidationError(
f"{source}: each task must be a mapping: {task!r}"
)
tid = task.get("id")
if not tid:
raise TaskValidationError(
f"{source}: a task is missing required 'id': {task!r}"
)
if tid in seen:
raise TaskValidationError(f"{source}: duplicate task id {tid!r}")
seen.add(tid)
if not task.get("title"):
raise TaskValidationError(
f"{source}: task {tid!r} is missing required 'title'"
)
status = task.get("status")
if status not in VALID_STATUSES:
raise TaskValidationError(
f"{source}: task {tid!r} has invalid status {status!r}; "
f"must be one of {VALID_STATUSES}"
)
priority = task.get("priority")
# bool is an int subclass — reject it explicitly so `priority: true`
# is a clear error rather than a silent 1.
if priority is not None and (
isinstance(priority, bool) or not isinstance(priority, int)
):
raise TaskValidationError(
f"{source}: task {tid!r} has non-integer priority {priority!r}; "
f"priority must be an integer or absent"
)
# `parent` is the additive-optional nesting field — a task's children
# are tasks whose `parent` equals this id. Validate type only (must be
# a non-empty string id when present); we do NOT require the
# referenced parent to exist or to be acyclic here. Stale/cyclic
# references are gracefully degraded by the consumers (server-side
# graph builder and frontend drill-down) — same lenient stance as
# `depends_on` / `blocks` references to unknown ids, which are dropped
# rather than rejected.
parent = task.get("parent")
if parent is not None and not (isinstance(parent, str) and parent):
raise TaskValidationError(
f"{source}: task {tid!r} has non-string parent {parent!r}; "
f"parent must be a task id string or absent"
)
# `comments` is an append-only thread of user/agent remarks, distinct
# from the descriptive `note`. Each entry must be a mapping with a
# non-empty string `text`; `ts` / `author` are optional strings the
# server fills in (ISO timestamp + commenter). Validate the shape only
# so a malformed comment can't round-trip, staying lenient otherwise.
comments = task.get("comments")
if comments is not None:
if not isinstance(comments, list):
raise TaskValidationError(
f"{source}: task {tid!r} has non-list comments "
f"{comments!r}; comments must be a list or absent"
)
for entry in comments:
if not isinstance(entry, dict) or not (
isinstance(entry.get("text"), str) and entry.get("text")
):
raise TaskValidationError(
f"{source}: task {tid!r} has an invalid comment "
f"{entry!r}; each comment must be a mapping with a "
f"non-empty string 'text'"
)
[docs]
def save_tasks(tasks: list[dict], path: str | Path) -> None:
"""Validate then write a task list back to a YAML store, preserving comments.
Re-runs the same validation gate as :func:`load_tasks` *before* touching
disk, so a malformed mutation can never corrupt the store. Uses
``ruamel.yaml`` round-trip mode so hand-written comments and key layout in
the existing store survive the rewrite.
Parameters
----------
tasks : list of dict
The (already-mutated) task mappings to persist. Validated first.
path : str or pathlib.Path
Destination store. If it already exists, its comments + structure are
preserved and only the ``tasks:`` payload is updated; otherwise a
fresh document is written.
Raises
------
TaskValidationError
If ``tasks`` fails structural validation (nothing is written).
Examples
--------
>>> tasks = load_tasks("tasks.yaml") # doctest: +SKIP
>>> tasks[0]["priority"] = 1 # doctest: +SKIP
>>> save_tasks(tasks, "tasks.yaml") # doctest: +SKIP
"""
from ruamel.yaml import YAML
path = Path(path).expanduser()
_validate_tasks(tasks, source="<save_tasks>")
yaml_rt = YAML()
yaml_rt.preserve_quotes = True
# Match the bundled store's hand layout (two-space block indent, lists
# indented under their key) so a round-trip is a minimal diff.
yaml_rt.indent(mapping=2, sequence=4, offset=2)
existing_doc = None
if path.exists():
with path.open(encoding="utf-8") as handle:
loaded = yaml_rt.load(handle)
if isinstance(loaded, dict):
existing_doc = loaded
if existing_doc is not None:
# Merge the caller's task data into the round-trip-loaded structure by
# id, so per-item and inline comments attached to the original nodes
# survive. New ids are appended; removed ids are dropped.
doc = existing_doc
old_seq = doc.get("tasks") if isinstance(doc.get("tasks"), list) else []
old_by_id = {t["id"]: t for t in old_seq if isinstance(t, dict) and t.get("id")}
merged = _merge_tasks_into_seq(tasks, old_by_id)
doc["tasks"] = merged
else:
# No existing store (or a non-mapping top level): write a fresh doc.
doc = {"tasks": tasks}
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("w", encoding="utf-8") as handle:
yaml_rt.dump(doc, handle)
def _merge_tasks_into_seq(tasks: list[dict], old_by_id: dict) -> list:
"""Build the new task sequence, reusing comment-bearing old nodes by id.
For each task in ``tasks``: if an old node with the same id exists, mutate
that node (so its attached comments survive) by syncing keys to the new
data; otherwise use the new mapping as-is. Order follows ``tasks``.
"""
merged: list = []
for task in tasks:
old = old_by_id.get(task.get("id"))
if old is None:
merged.append(task)
continue
# Sync the old comment-bearing node's keys to the new values.
for key, value in task.items():
old[key] = value
for stale_key in [k for k in list(old.keys()) if k not in task]:
del old[stale_key]
merged.append(old)
return merged
# EOF