Source code for scitex_todo._store

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Mutation-side Python API for the scitex-todo task store.

Sits on top of ``_model.load_tasks`` / ``_model.save_tasks`` to give agents,
the CLI verbs, the MCP tools, and (later) the web GUI write handlers a
single, well-tested surface for:

    add_task        Append a new task to the store.
    update_task     Mutate fields of an existing task by id.
    complete_task   Convenience: set status='done' + stamp completion meta.
    list_tasks      Filter by scope / assignee / status.
    summary         Count tasks per status (and by scope/assignee).

Every mutation runs through :func:`_model.save_tasks`, which holds an
``fcntl.flock``-based mutex on a sibling lock file so two concurrent writers
cannot interleave (PHASE 1 prereq for the cross-host sync substrate — see
``GITIGNORED/ARCHITECTURE.md`` Req 2).

The filter functions are read-only and do NOT lock — they snapshot the
store via :func:`_model.load_tasks` and apply the filter in memory.

Design constraints
------------------
- **Generic** (Req 8): scope/assignee/status are free-form strings. The
  helpers don't know what an "agent" is.
- **Centralized** (Req 3): the default store is whatever
  :func:`_paths.resolve_tasks_path` returns; callers can override with an
  explicit ``store=`` path. The user-scope default
  (``~/.scitex/todo/tasks.yaml``) covers Req 7.
- **Shared with scopes** (Req 1): ``$SCITEX_TODO_SCOPE`` provides the
  default value for ``list_tasks(scope=...)`` when the caller doesn't pass
  one explicitly. Pass ``scope=""`` (empty string) to ignore the env
  default and see everything.
"""

from __future__ import annotations

import datetime as _dt
import getpass
import os
from pathlib import Path

from ._model import (
    VALID_STATUSES,
    TaskValidationError,
    _save_tasks_unlocked,
    _store_lock,
    load_tasks,
    save_tasks,
)
from ._paths import resolve_tasks_path

#: Env var name an agent sets to scope its default `list_tasks` / `summary`
#: view. The CLI's `--scope` flag overrides this; pass `scope=""` in the
#: Python API to see the unfiltered store.
ENV_SCOPE = "SCITEX_TODO_SCOPE"

#: Env var name carrying the agent's identity. Used as the default
#: `completed_by` when :func:`complete_task` doesn't get an explicit `by=`.
ENV_AGENT = "SCITEX_TODO_AGENT"


[docs] class TaskNotFoundError(KeyError): """Raised when an update/complete target id is not in the store."""
# --------------------------------------------------------------------------- # # Internal helpers # # --------------------------------------------------------------------------- # def _resolved_store(store: str | Path | None) -> Path: """Resolve a store path argument through the precedence chain. ``None`` ⇒ apply the full resolution chain (`_paths.resolve_tasks_path`). Explicit path ⇒ used as-is (must exist for reads; will be created for fresh writes by :func:`_model.save_tasks`). """ return resolve_tasks_path(store) if store is None else Path(store).expanduser() def _default_scope(arg: str | None) -> str | None: """Resolve a scope argument, honoring ``$SCITEX_TODO_SCOPE`` as the default. ``None`` (caller didn't pass anything) → env var if set, else ``None`` (no filter). Empty string ``""`` → caller explicitly opted out of filtering. Non-empty string → used as-is. """ if arg is None: env = os.environ.get(ENV_SCOPE) return env if env else None if arg == "": return None return arg def _default_agent(arg: str | None) -> str: """Resolve a `completed_by` argument with the env→login→unknown chain. Per ``GITIGNORED/QUESTIONS.md`` #2 the precedence is ``$SCITEX_TODO_AGENT`` → ``getpass.getuser()`` → ``"unknown"`` (final fallback handles environments where login info isn't available). """ if arg: return arg env = os.environ.get(ENV_AGENT) if env: return env try: return getpass.getuser() except Exception: # pragma: no cover — extremely rare environments return "unknown" def _utc_now_iso() -> str: """ISO-8601 UTC timestamp with second resolution and the ``Z`` suffix. Trims the microseconds (the operator reads these on the board; second resolution is plenty) and uses the canonical ``Z`` suffix rather than ``+00:00`` so the string round-trips losslessly through YAML. """ return ( _dt.datetime.now(_dt.timezone.utc) .replace(microsecond=0) .isoformat() .replace("+00:00", "Z") ) def _match(task: dict, *, scope: str | None, assignee: str | None, status: str | None) -> bool: """Three-way string-equality filter. Any argument that is None is treated as "no constraint".""" if scope is not None and task.get("scope") != scope: return False if assignee is not None and task.get("assignee") != assignee: return False if status is not None and task.get("status") != status: return False return True # --------------------------------------------------------------------------- # # Public API # # --------------------------------------------------------------------------- #
[docs] def add_task( store: str | Path | None = None, *, id: str, title: str, status: str = "pending", scope: str | None = None, assignee: str | None = None, priority: int | None = None, parent: str | None = None, note: str | None = None, depends_on: list[str] | None = None, blocks: list[str] | None = None, repo: str | None = None, ) -> dict: """Append a new task to ``store`` and persist via :func:`save_tasks`. Returns the inserted task mapping (a fresh dict, not the underlying YAML node) for convenient round-trip use by callers — the CLI prints it, the MCP tools serialize it as the JSON result. Raises ------ TaskValidationError On duplicate id or any other structural fault — `save_tasks` re-runs the full validation gate before touching disk. """ resolved = _resolved_store(store) resolved.parent.mkdir(parents=True, exist_ok=True) new: dict = {"id": id, "title": title, "status": status} if scope is not None: new["scope"] = scope if assignee is not None: new["assignee"] = assignee if priority is not None: new["priority"] = priority if parent is not None: new["parent"] = parent if note is not None: new["note"] = note if depends_on is not None: new["depends_on"] = list(depends_on) if blocks is not None: new["blocks"] = list(blocks) if repo is not None: new["repo"] = repo # Lock for the FULL read-modify-write — without this, two concurrent # writers each load a stale snapshot and the second `save_tasks` call # silently clobbers the first writer's insert. See # tests/scitex_todo/test__store.py::test_two_concurrent_writers... with _store_lock(resolved): tasks = load_tasks(resolved) if resolved.exists() else [] tasks.append(new) _save_tasks_unlocked(tasks, resolved) return dict(new)
[docs] def update_task( store: str | Path | None = None, task_id: str | None = None, **fields, ) -> dict: """Update fields of the task with id ``task_id``; return the merged dict. Any keyword argument becomes a field on the task. Passing ``None`` for a field DELETES it (matches the operator's mental model: "clear the scope" = `update_task(..., scope=None)`). To leave a field untouched, just omit it. Raises ------ TaskNotFoundError If no task matches ``task_id``. TaskValidationError If the resulting mutation is structurally invalid. """ if not task_id: raise TypeError("update_task() requires a non-empty task_id") resolved = _resolved_store(store) with _store_lock(resolved): tasks = load_tasks(resolved) for task in tasks: if task.get("id") == task_id: for key, value in fields.items(): if value is None: task.pop(key, None) else: task[key] = value _save_tasks_unlocked(tasks, resolved) return dict(task) raise TaskNotFoundError(f"task id {task_id!r} not found in {resolved}")
[docs] def complete_task( store: str | Path | None = None, task_id: str | None = None, *, by: str | None = None, ) -> dict: """Mark ``task_id`` as ``done`` and stamp ``_log_meta.completed_{at,by}``. Idempotent per ``GITIGNORED/QUESTIONS.md`` #3: re-completing a ``done`` task is a no-op (timestamps stay frozen from the first completion). Pass ``by=`` to override the ``$SCITEX_TODO_AGENT`` → ``$USER`` → ``"unknown"`` precedence chain. Returns the (post-mutation) task mapping. Raises ------ TaskNotFoundError If no task matches ``task_id``. """ if not task_id: raise TypeError("complete_task() requires a non-empty task_id") resolved = _resolved_store(store) with _store_lock(resolved): tasks = load_tasks(resolved) for task in tasks: if task.get("id") == task_id: if task.get("status") == "done": # Idempotent: don't refresh the stamp, just return. return dict(task) task["status"] = "done" log_meta = task.get("_log_meta") if not isinstance(log_meta, dict): log_meta = {} task["_log_meta"] = log_meta log_meta["completed_at"] = _utc_now_iso() log_meta["completed_by"] = _default_agent(by) _save_tasks_unlocked(tasks, resolved) return dict(task) raise TaskNotFoundError(f"task id {task_id!r} not found in {resolved}")
[docs] def list_tasks( store: str | Path | None = None, *, scope: str | None = None, assignee: str | None = None, status: str | None = None, ) -> list[dict]: """Snapshot the store, then filter by scope / assignee / status. Filter semantics: - ``scope=None`` (default): use ``$SCITEX_TODO_SCOPE`` if set, else no filter. ``scope=""`` opts out of the env default explicitly. - ``assignee`` / ``status``: ``None`` = no filter; any string = exact match. (Generic Req 8 — no fuzzy / glob; callers compose.) The returned list contains fresh dicts, safe to mutate without affecting the on-disk store (no save here). """ resolved = _resolved_store(store) tasks = load_tasks(resolved) scope_eff = _default_scope(scope) return [ dict(t) for t in tasks if _match(t, scope=scope_eff, assignee=assignee, status=status) ]
[docs] def summarize_tasks( store: str | Path | None = None, *, scope: str | None = None, assignee: str | None = None, ) -> dict: """Return numeric progress counts grouped by status, scope, assignee. Output shape (always present keys): :: { "store": "/abs/path/to/tasks.yaml", "total": int, "by_status": {<status>: int, ...}, # one key per VALID_STATUSES "by_scope": {<scope|"">: int, ...}, "by_assignee": {<assignee|"">: int, ...}, } Tasks with no scope / assignee bucket under the empty string ``""``. The ``by_status`` map is densified to all :data:`VALID_STATUSES` so consumers (web UI, progress widgets) don't have to special-case zero-count keys. """ resolved = _resolved_store(store) tasks = load_tasks(resolved) scope_eff = _default_scope(scope) by_status: dict[str, int] = {s: 0 for s in VALID_STATUSES} by_scope: dict[str, int] = {} by_assignee: dict[str, int] = {} total = 0 for task in tasks: if not _match(task, scope=scope_eff, assignee=assignee, status=None): continue total += 1 st = task.get("status") if st in by_status: by_status[st] += 1 sc = task.get("scope") or "" by_scope[sc] = by_scope.get(sc, 0) + 1 asg = task.get("assignee") or "" by_assignee[asg] = by_assignee.get(asg, 0) + 1 return { "store": str(resolved), "total": total, "by_status": by_status, "by_scope": by_scope, "by_assignee": by_assignee, }
[docs] def resolve_store(store: str | Path | None = None) -> dict: """Return the resolved task store path and the precedence chain. Mirrors the data the `scitex-todo resolve-store` CLI verb and the `resolve_store` MCP tool emit. Keeping a Python API by the same name as the MCP tool satisfies audit §6 (Convention A: tool_name == api_name). Output shape:: { "resolved": "/abs/path/to/tasks.yaml", "explicit": <the `store` arg you passed, or None>, "env_tasks": <value of $SCITEX_TODO_TASKS, or None>, "user_store": "/abs/path/to/~/.scitex/todo/tasks.yaml", "bundled_example": "/abs/path/to/bundled/example.yaml", "pkg_short": "scitex_todo", "exists": bool, } """ import os from ._paths import ( ENV_TASKS, PKG_SHORT, _user_root, bundled_example, resolve_tasks_path, ) resolved = resolve_tasks_path(store if isinstance(store, (str, type(None))) else str(store)) return { "resolved": str(resolved), "explicit": str(store) if store is not None else None, "env_tasks": os.environ.get(ENV_TASKS), "user_store": str(_user_root() / "tasks.yaml"), "bundled_example": str(bundled_example()), "pkg_short": PKG_SHORT, "exists": Path(resolved).exists(), }
__all__ = [ "ENV_AGENT", "ENV_SCOPE", "TaskNotFoundError", "TaskValidationError", "add_task", "complete_task", "list_tasks", "resolve_store", "summarize_tasks", "update_task", ] # EOF