#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Canonical task model + YAML loader/validator/writer for scitex-todo.
The task store is a YAML document with a top-level ``tasks:`` list. Each
task is a mapping with ``id`` + ``title`` + ``status`` (required) and
optional ``repo`` / ``depends_on`` / ``blocks`` / ``note`` / ``priority`` /
``parent`` fields. ``priority`` is an explicit integer rank (lower = higher
priority); when absent, document order is the implicit ordering. ``parent``
is an optional task-id string that nests this task under another node — a
task's children are tasks whose ``parent`` equals this task's ``id`` (the
board's drill-down view follows this relation).
This module is the single validation gate: ``load_tasks`` raises
``TaskValidationError`` on a malformed store (missing id/title, duplicate
id, invalid status, non-integer priority, non-string parent) so downstream
adapters can assume well-formed input. ``save_tasks`` re-runs the same gate
before writing back and preserves the hand-written YAML comments +
structure via ruamel.yaml.
"""
from __future__ import annotations
import contextlib
import fcntl
from pathlib import Path
import yaml
# Valid task statuses. ``goal`` marks a north-star objective (rendered gold);
# the rest are ordinary execution states.
VALID_STATUSES: tuple[str, ...] = (
"goal",
"pending",
"in_progress",
"blocked",
"done",
"deferred",
"failed",
)
[docs]
class TaskValidationError(ValueError):
"""Raised when a task store fails structural validation."""
def load_tasks(path: str | Path) -> list[dict]:
"""Load and validate the task list from a YAML store.
Parameters
----------
path : str or pathlib.Path
Path to the YAML task store. The document must have a top-level
``tasks:`` list.
Returns
-------
list of dict
The validated task mappings, in document order.
Raises
------
FileNotFoundError
If ``path`` does not exist.
TaskValidationError
If the store is structurally invalid: ``tasks`` is not a list, a
task is missing ``id`` or ``title``, an ``id`` is duplicated, a
``status`` is not in :data:`VALID_STATUSES`, or a ``priority`` is
present but not an integer.
Examples
--------
>>> tasks = load_tasks("tasks.yaml") # doctest: +SKIP
>>> tasks[0]["id"] # doctest: +SKIP
'design'
"""
path = Path(path).expanduser()
if not path.exists():
raise FileNotFoundError(f"task store not found: {path}")
with path.open(encoding="utf-8") as handle:
data = yaml.safe_load(handle) or {}
tasks = data.get("tasks")
_validate_tasks(tasks, source=str(path))
return tasks
def _validate_tasks(tasks: object, source: str) -> None:
"""Validate a task list in place, raising on the first structural fault.
The single gate shared by :func:`load_tasks` (read side) and
:func:`save_tasks` (write side) so a bad mutation can never round-trip
through the writer.
Parameters
----------
tasks : object
The candidate ``tasks`` value (must be a list of mappings).
source : str
A label for error messages (the store path or ``"<save_tasks>"``).
Raises
------
TaskValidationError
On any structural fault — see :func:`load_tasks`.
"""
if not isinstance(tasks, list):
raise TaskValidationError(f"{source}: top-level 'tasks' must be a list")
seen: set[str] = set()
for task in tasks:
if not isinstance(task, dict):
raise TaskValidationError(
f"{source}: each task must be a mapping: {task!r}"
)
tid = task.get("id")
if not tid:
raise TaskValidationError(
f"{source}: a task is missing required 'id': {task!r}"
)
if tid in seen:
raise TaskValidationError(f"{source}: duplicate task id {tid!r}")
seen.add(tid)
if not task.get("title"):
raise TaskValidationError(
f"{source}: task {tid!r} is missing required 'title'"
)
status = task.get("status")
if status not in VALID_STATUSES:
raise TaskValidationError(
f"{source}: task {tid!r} has invalid status {status!r}; "
f"must be one of {VALID_STATUSES}"
)
priority = task.get("priority")
# bool is an int subclass — reject it explicitly so `priority: true`
# is a clear error rather than a silent 1.
if priority is not None and (
isinstance(priority, bool) or not isinstance(priority, int)
):
raise TaskValidationError(
f"{source}: task {tid!r} has non-integer priority {priority!r}; "
f"priority must be an integer or absent"
)
# `parent` is the additive-optional nesting field — a task's children
# are tasks whose `parent` equals this id. Validate type only (must be
# a non-empty string id when present); we do NOT require the
# referenced parent to exist or to be acyclic here. Stale/cyclic
# references are gracefully degraded by the consumers (server-side
# graph builder and frontend drill-down) — same lenient stance as
# `depends_on` / `blocks` references to unknown ids, which are dropped
# rather than rejected.
parent = task.get("parent")
if parent is not None and not (isinstance(parent, str) and parent):
raise TaskValidationError(
f"{source}: task {tid!r} has non-string parent {parent!r}; "
f"parent must be a task id string or absent"
)
# `comments` is an append-only thread of user/agent remarks, distinct
# from the descriptive `note`. Each entry must be a mapping with a
# non-empty string `text`; `ts` / `author` are optional strings the
# server fills in (ISO timestamp + commenter). Validate the shape only
# so a malformed comment can't round-trip, staying lenient otherwise.
comments = task.get("comments")
if comments is not None:
if not isinstance(comments, list):
raise TaskValidationError(
f"{source}: task {tid!r} has non-list comments "
f"{comments!r}; comments must be a list or absent"
)
for entry in comments:
if not isinstance(entry, dict) or not (
isinstance(entry.get("text"), str) and entry.get("text")
):
raise TaskValidationError(
f"{source}: task {tid!r} has an invalid comment "
f"{entry!r}; each comment must be a mapping with a "
f"non-empty string 'text'"
)
# `scope` and `assignee` are additive-optional shared-fleet fields
# (PHASE 1, Req 1 in GITIGNORED/ARCHITECTURE.md). Both are free-form
# non-empty strings — no enum, no referential integrity. Convention is
# `agent:<name>` / `project:<name>` / `private` but that's a
# docs/skills convention, not enforced here (Req 8: be generic).
for label in ("scope", "assignee"):
value = task.get(label)
if value is not None and not (isinstance(value, str) and value):
raise TaskValidationError(
f"{source}: task {tid!r} has non-string {label} {value!r}; "
f"{label} must be a non-empty string or absent"
)
# `_log_meta` is an opaque event-stamp mapping written by
# `complete_task` etc. Keep it open-shaped — Phase 2 progress-history
# adapter shapes the keys. We only enforce "if present, it's a
# mapping" so a stray scalar can't corrupt downstream readers.
log_meta = task.get("_log_meta")
if log_meta is not None and not isinstance(log_meta, dict):
raise TaskValidationError(
f"{source}: task {tid!r} has non-mapping _log_meta "
f"{log_meta!r}; _log_meta must be a mapping or absent"
)
@contextlib.contextmanager
def _store_lock(path: Path):
"""Hold an exclusive `fcntl.flock` on a sibling `.<name>.lock` file.
Phase 1 prerequisite for the cross-host sync substrate (Req 2): two
concurrent writers — say a CLI verb and the board's `/priority` POST
handler — must serialize so the YAML payload they write is atomic at
the task-list granularity. We hold the lock on a separate `.lock`
sentinel file rather than on the store itself so we don't fight the
ruamel YAML reader/writer that re-opens the path.
The lock file is created if missing, never removed (next caller reuses
it). Empty mode is fine — only the lockf state matters.
Parameters
----------
path : Path
The store path (e.g. ``~/.scitex/todo/tasks.yaml``). The lock
sentinel sits next to it as ``.tasks.yaml.lock``.
Yields
------
None
After the lock is held; released on context exit (even on errors).
"""
path = Path(path)
lock_path = path.parent / f".{path.name}.lock"
lock_path.parent.mkdir(parents=True, exist_ok=True)
# `O_CREAT|O_RDWR` semantics via `open("a+")` — `a+` works even on
# FS that lack `O_EXLOCK` (e.g. WSL2 ext4) because we acquire the
# advisory lock via `fcntl.flock` after the open.
fd = lock_path.open("a+")
try:
fcntl.flock(fd.fileno(), fcntl.LOCK_EX)
yield
finally:
try:
fcntl.flock(fd.fileno(), fcntl.LOCK_UN)
finally:
fd.close()
def save_tasks(tasks: list[dict], path: str | Path) -> None:
"""Validate then write a task list back to a YAML store, preserving comments.
Re-runs the same validation gate as :func:`load_tasks` *before* touching
disk, so a malformed mutation can never corrupt the store. Uses
``ruamel.yaml`` round-trip mode so hand-written comments and key layout in
the existing store survive the rewrite.
Parameters
----------
tasks : list of dict
The (already-mutated) task mappings to persist. Validated first.
path : str or pathlib.Path
Destination store. If it already exists, its comments + structure are
preserved and only the ``tasks:`` payload is updated; otherwise a
fresh document is written.
Raises
------
TaskValidationError
If ``tasks`` fails structural validation (nothing is written).
Examples
--------
>>> tasks = load_tasks("tasks.yaml") # doctest: +SKIP
>>> tasks[0]["priority"] = 1 # doctest: +SKIP
>>> save_tasks(tasks, "tasks.yaml") # doctest: +SKIP
"""
path = Path(path).expanduser()
# Hold the cross-process advisory lock for the FULL read-modify-write
# cycle, not just the write — otherwise two writers could each load
# the file, mutate independently, and the second `dump` would silently
# clobber the first's mutation. The lock IS the at-most-once gate.
path.parent.mkdir(parents=True, exist_ok=True)
with _store_lock(path):
_save_tasks_unlocked(tasks, path)
def _save_tasks_unlocked(tasks: list[dict], path: Path) -> None:
"""Validate-and-write WITHOUT acquiring the store lock.
Used by callers (the `_store.add_task`/`update_task`/`complete_task`
Python API) that hold `_store_lock` for their whole read-modify-write
cycle. Calling `save_tasks` recursively would deadlock — `flock` on
a fresh fd to the same path blocks until the OUTER context releases.
Direct callers must already hold `_store_lock(path)`.
"""
from ruamel.yaml import YAML
_validate_tasks(tasks, source="<save_tasks>")
yaml_rt = YAML()
yaml_rt.preserve_quotes = True
# Match the bundled store's hand layout (two-space block indent,
# lists indented under their key) so a round-trip is a minimal diff.
yaml_rt.indent(mapping=2, sequence=4, offset=2)
existing_doc = None
if path.exists():
with path.open(encoding="utf-8") as handle:
loaded = yaml_rt.load(handle)
if isinstance(loaded, dict):
existing_doc = loaded
if existing_doc is not None:
# Merge the caller's task data into the round-trip-loaded
# structure by id, so per-item and inline comments attached to
# the original nodes survive. New ids are appended; removed
# ids are dropped.
doc = existing_doc
old_seq = doc.get("tasks") if isinstance(doc.get("tasks"), list) else []
old_by_id = {
t["id"]: t for t in old_seq if isinstance(t, dict) and t.get("id")
}
merged = _merge_tasks_into_seq(tasks, old_by_id)
doc["tasks"] = merged
else:
# No existing store (or a non-mapping top level): write fresh.
doc = {"tasks": tasks}
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("w", encoding="utf-8") as handle:
yaml_rt.dump(doc, handle)
def _merge_tasks_into_seq(tasks: list[dict], old_by_id: dict) -> list:
"""Build the new task sequence, reusing comment-bearing old nodes by id.
For each task in ``tasks``: if an old node with the same id exists, mutate
that node (so its attached comments survive) by syncing keys to the new
data; otherwise use the new mapping as-is. Order follows ``tasks``.
"""
merged: list = []
for task in tasks:
old = old_by_id.get(task.get("id"))
if old is None:
merged.append(task)
continue
# Sync the old comment-bearing node's keys to the new values.
for key, value in task.items():
old[key] = value
for stale_key in [k for k in list(old.keys()) if k not in task]:
del old[stale_key]
merged.append(old)
return merged
# EOF