Source code for scitex_agent_container.action_store

"""SQLite-backed attempt log for ``PaneAction`` executions.

Every run of a :class:`PaneAction` (see ``action_base.py``) writes
one ``attempts`` row so the fleet accumulates structured evidence
of what was tried and how it ended. The log is **ad-hoc** — humans
and future tooling read it to debug / revise the package; there is
no runtime post-hook that mutates behavior based on the log.

Scope
-----
- One host-level DB: ``~/.scitex/agent-container/actions.db``
  (``$XDG_DATA_HOME/agent-container/actions.db`` when
  ``XDG_DATA_HOME`` is set).
- One row per attempt; ``agent`` is a column so cross-agent
  queries (e.g. "which hosts see the most ``silent`` probes?") are
  trivial.
- Snapshots (``pane_before`` / ``pane_after``) are stored as JSON
  blobs with a ``format`` discriminator so the on-disk schema can
  later accept diff-encoded variants without breaking readers.

Design rules (follows ``event_log.py`` conventions)
--------------------------------------------------
- **Non-agentic.** Pure SQL + ``json``.
- **Fail-closed.** Any write-path exception is swallowed so a
  logging hiccup never takes an agent down.
- **Stdlib only.** ``sqlite3`` + ``json`` + ``pathlib``.
- **Injectable.** ``root`` override everywhere so tests use a
  temp directory.
"""

from __future__ import annotations

import json
import logging
import os
import sqlite3
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Iterable

logger = logging.getLogger(__name__)

# Directory that holds the database file. Resolved once at import time:
# ``$XDG_DATA_HOME/agent-container`` when ``XDG_DATA_HOME`` is set and
# non-empty, otherwise ``~/.scitex/agent-container``.
DEFAULT_ROOT = (
    Path(os.environ.get("XDG_DATA_HOME") or Path.home() / ".scitex") / "agent-container"
)
# File name of the SQLite database inside the root directory.
DEFAULT_DB_FILENAME = "actions.db"
# Default retention window (days) used by ``purge_old``; read from the
# environment once at import time.
DEFAULT_RETENTION_DAYS = int(os.environ.get("SCITEX_AGENT_ACTION_RETENTION_DAYS", "30"))
# Cap snapshot text at 4096 characters per side by default (overridable
# via the environment, read once at import time). Bigger captures rarely
# carry more signal and the DB grows fast otherwise.
DEFAULT_SNAPSHOT_MAX_CHARS = int(
    os.environ.get("SCITEX_AGENT_ACTION_SNAPSHOT_MAX_CHARS", "4096")
)

# Canonical outcome strings. Declared here so tests and callers can
# import them without dragging in the base-class module.
# NOTE: ``append_attempt`` stores the outcome string it is given; nothing
# in this module validates it against this tuple.
OUTCOMES = (
    "success",
    "precondition_fail",
    "send_error",
    "completion_timeout",
    "skipped_by_policy",
)

# DDL executed by ``_get_conn`` every time a connection is opened; every
# statement uses ``IF NOT EXISTS`` so re-running it is harmless.
# ``pane_before`` / ``pane_after`` / ``extras`` hold JSON text written by
# ``append_attempt``; ``ts`` is an ISO-8601 string compared as text.
_SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS attempts (
    id           INTEGER PRIMARY KEY AUTOINCREMENT,
    ts           TEXT    NOT NULL,
    agent        TEXT    NOT NULL,
    action       TEXT    NOT NULL,
    outcome      TEXT    NOT NULL,
    elapsed_s    REAL    NOT NULL,
    pane_before  TEXT,
    pane_after   TEXT,
    extras       TEXT
);
CREATE INDEX IF NOT EXISTS idx_attempts_ts ON attempts(ts);
CREATE INDEX IF NOT EXISTS idx_attempts_agent_action ON attempts(agent, action);
"""


def _safe_float(value: Any) -> float | None:
    """Return ``float(value)``, or ``None`` when no float can be produced.

    ``None`` input, and any value that ``float()`` rejects with
    ``TypeError`` or ``ValueError``, both map to ``None``.
    """
    if value is not None:
        try:
            return float(value)
        except (TypeError, ValueError):  # stx-allow: fallback (reason: type coercion or format mismatch)
            pass
    return None
def _db_path(root: Path | None = None) -> Path:
    """Return the database file path, creating its directory if missing.

    ``root`` overrides the default data directory (e.g. a temp dir in
    tests); when it is ``None`` the module-level ``DEFAULT_ROOT`` is used.
    """
    directory = DEFAULT_ROOT if root is None else Path(root)
    directory.mkdir(parents=True, exist_ok=True)
    return directory / DEFAULT_DB_FILENAME
def _get_conn(db_path: Path) -> sqlite3.Connection:
    """Open a connection with WAL mode and the schema ensured.

    WAL keeps concurrent reads safe while a single writer inserts.
    The connection is opened with ``isolation_level=None`` (autocommit)
    and ``sqlite3.Row`` as the row factory. The caller owns the returned
    connection and must close it.

    Raises:
        Whatever ``sqlite3`` raises while opening or configuring the
        database. If configuration fails after the connection was opened,
        the connection is closed before the exception propagates so the
        handle is not leaked.
    """
    conn = sqlite3.connect(str(db_path), isolation_level=None)
    try:
        conn.row_factory = sqlite3.Row
        conn.execute("PRAGMA journal_mode=WAL")
        conn.execute("PRAGMA synchronous=NORMAL")
        conn.executescript(_SCHEMA_SQL)
    except Exception:
        # Callers only close connections they actually received, so a
        # failure here must release the handle itself.
        conn.close()
        raise
    return conn
def _truncate_snapshot(
    snap: Any, max_chars: int = DEFAULT_SNAPSHOT_MAX_CHARS
) -> dict[str, Any] | None:
    """Coerce a snapshot into the ``{format, text}`` wrapper.

    Accepts:
    - None -> returns None
    - dict with 'format' key -> shallow copy, with a string ``text``
      truncated if needed
    - plain str -> wraps as {"format": "full", "text": s}
    - anything else -> returns {"format": "json-dump", "text": json-dump}
      (falls back to ``str(snap)`` when JSON encoding fails)

    Truncation keeps the *last* ``max_chars`` characters and adds
    ``"truncated": True`` to the wrapper, in every branch. A
    ``max_chars`` of zero (or less) yields empty text.
    """
    if snap is None:
        return None

    limit = max(0, max_chars)

    def _tail(text: str) -> tuple[str, bool]:
        """Return the last ``limit`` characters and whether any were cut."""
        if len(text) <= limit:
            return text, False
        # Slice from an explicit start index: ``text[-0:]`` would return
        # the whole string when ``limit`` is 0.
        return text[len(text) - limit :], True

    if isinstance(snap, dict) and "format" in snap:
        wrapped = dict(snap)
        text = wrapped.get("text", "")
        if isinstance(text, str):
            tail, cut = _tail(text)
            if cut:
                wrapped["text"] = tail
                wrapped["truncated"] = True
        return wrapped

    if isinstance(snap, str):
        fmt, text = "full", snap
    else:
        # Arbitrary serializable object — dump once then truncate.
        fmt = "json-dump"
        try:
            text = json.dumps(snap, default=str)
        except (TypeError, ValueError):  # stx-allow: fallback (reason: type coercion or format mismatch)
            text = str(snap)

    tail, cut = _tail(text)
    result: dict[str, Any] = {"format": fmt, "text": tail}
    if cut:
        result["truncated"] = True
    return result
def append_attempt(
    record: dict[str, Any],
    *,
    root: Path | None = None,
) -> None:
    """Insert one attempt row. Never raises.

    Required record keys:

    * ``agent`` (str)
    * ``action`` (str) — the action name
    * ``outcome`` (one of :data:`OUTCOMES`)
    * ``elapsed_s`` (float)

    Optional:

    * ``ts`` — ISO-8601 UTC; auto-populated if missing.
    * ``pane_before`` / ``pane_after`` — snapshot dicts / strings.
    * ``extras`` — action-specific fields (dict).

    A record with a missing or uncoercible required key is logged and
    dropped. Any later failure (snapshot coercion, JSON encoding, DB
    write) is logged and swallowed as well. Values inside ``extras`` or
    the snapshots that JSON cannot encode natively are stringified rather
    than causing the whole row to be lost.
    """
    try:
        agent = str(record["agent"])
        action = str(record["action"])
        outcome = str(record["outcome"])
        elapsed_s = float(record["elapsed_s"])
    except (KeyError, TypeError, ValueError, OverflowError) as exc:  # stx-allow: fallback (reason: type coercion or format mismatch)
        logger.warning("action_store.append_attempt: invalid record: %s", exc)
        return

    def _encode(value: Any) -> str:
        """JSON-encode ``value``, stringifying anything JSON cannot represent."""
        return json.dumps(value, default=str)

    # stx-allow: fallback (reason: DB insert can fail due to disk full, file lock, or corrupt schema; swallowing keeps agent alive per fail-closed design)
    try:
        # Payload preparation lives inside the guarded region so that an
        # odd snapshot or extras value cannot break the "never raises"
        # contract.
        ts = str(record.get("ts") or datetime.now(timezone.utc).isoformat())
        pane_before = _truncate_snapshot(record.get("pane_before"))
        pane_after = _truncate_snapshot(record.get("pane_after"))
        extras = record.get("extras") or {}
        params = (
            ts,
            agent,
            action,
            outcome,
            elapsed_s,
            _encode(pane_before) if pane_before is not None else None,
            _encode(pane_after) if pane_after is not None else None,
            _encode(extras),
        )
        conn = _get_conn(_db_path(root))
        try:
            conn.execute(
                "INSERT INTO attempts "
                "(ts, agent, action, outcome, elapsed_s, pane_before, pane_after, extras) "
                "VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
                params,
            )
        finally:
            conn.close()
    except Exception as exc:  # stx-allow: fallback (reason: catch-all safety net — see inline comment for context)
        logger.warning("action_store.append_attempt: insert failed: %s", exc)
def _row_to_dict(row: sqlite3.Row) -> dict[str, Any]:
    """Convert a result row to a plain dict with its JSON columns decoded.

    ``pane_before``, ``pane_after`` and ``extras`` are parsed with
    ``json.loads`` when present and not ``None``. A value that fails to
    parse is left as the raw stored value so the rest of the row stays
    usable.
    """
    decoded = dict(row)
    for column in ("pane_before", "pane_after", "extras"):
        payload = decoded.get(column)
        if payload is not None:
            try:
                decoded[column] = json.loads(payload)
            except (TypeError, ValueError):  # stx-allow: fallback (reason: type coercion or format mismatch)
                # The JSON column is corrupt; keep the raw value in place.
                pass
    return decoded
def _parse_since(since: str | datetime | None) -> str | None:
    """Turn a ``since`` filter into an ISO UTC string for ``ts >= ?``.

    Accepted inputs:

    * ``None`` or a blank string -> ``None`` (no lower bound).
    * ``datetime`` -> converted to UTC (a naive value is interpreted as
      local time by ``astimezone``).
    * Relative string ``<number><unit>`` with unit ``s`` / ``m`` / ``h`` /
      ``d`` (case-insensitive), e.g. ``'24h'``, ``'7d'``, ``'30m'`` ->
      "now minus that span". A span too large to represent reaches back
      before any possible row, so it yields ``None``.
    * Absolute ISO-8601 string. When it carries a UTC offset (or a
      trailing ``Z``) it is normalized to UTC, because ``ts`` values are
      compared as text against UTC timestamps. A string without offset,
      or one that cannot be parsed, is returned exactly as given.

    Raises:
        TypeError: if ``since`` is none of the supported types.
    """
    if since is None:
        return None
    if isinstance(since, datetime):
        return since.astimezone(timezone.utc).isoformat()
    if not isinstance(since, str):
        raise TypeError(f"Unsupported 'since' type: {type(since).__name__}")

    text = since.strip()
    if not text:
        return None

    # Relative form: <number><unit>.
    unit_seconds = {"s": 1, "m": 60, "h": 3600, "d": 86400}
    lowered = text.lower()
    amount, unit = lowered[:-1], lowered[-1]
    digits = amount.replace(".", "", 1)
    # ``isdigit`` alone also accepts characters such as "²" that
    # ``float()`` rejects, hence the ASCII check.
    if unit in unit_seconds and digits.isascii() and digits.isdigit():
        try:
            span = timedelta(seconds=float(amount) * unit_seconds[unit])
            cutoff = datetime.now(timezone.utc) - span
        except OverflowError:
            # The window starts before the earliest representable date.
            return None
        return cutoff.isoformat()

    # Absolute form. ``fromisoformat`` only understands a trailing "Z" on
    # newer Python versions, so spell the offset out explicitly.
    candidate = text[:-1] + "+00:00" if text[-1] in ("z", "Z") else text
    try:
        parsed = datetime.fromisoformat(candidate)
    except ValueError:
        return since
    if parsed.utcoffset() is None:
        # No offset information: keep the caller's string untouched.
        return since
    return parsed.astimezone(timezone.utc).isoformat()
def query(
    *,
    agent: str | None = None,
    action: str | None = None,
    outcome: str | None = None,
    since: str | datetime | None = None,
    limit: int = 50,
    offset: int = 0,
    root: Path | None = None,
) -> list[dict[str, Any]]:
    """Return matching attempts, newest first.

    Each truthy filter (``agent``, ``action``, ``outcome``) adds an
    equality condition; ``since`` (see ``_parse_since``) adds a lower
    bound on ``ts``. Results are ordered by ``ts`` then ``id``, both
    descending, and paged with ``limit`` / ``offset``. A database failure
    is logged and reported as an empty list.
    """
    clauses: list[str] = []
    args: list[Any] = []
    equality_filters = (
        ("agent = ?", agent),
        ("action = ?", action),
        ("outcome = ?", outcome),
    )
    for clause, wanted in equality_filters:
        if wanted:
            clauses.append(clause)
            args.append(wanted)

    cutoff = _parse_since(since)
    if cutoff:
        clauses.append("ts >= ?")
        args.append(cutoff)

    where = ""
    if clauses:
        where = " WHERE " + " AND ".join(clauses)
    select = (
        "SELECT id, ts, agent, action, outcome, elapsed_s, "
        "pane_before, pane_after, extras FROM attempts"
    )
    ordering = " ORDER BY ts DESC, id DESC LIMIT ? OFFSET ?"
    sql = select + where + ordering
    args += [int(limit), int(offset)]

    # stx-allow: fallback (reason: DB read can fail due to missing file, lock contention, or I/O error; empty list is a safe no-results response)
    try:
        conn = _get_conn(_db_path(root))
        try:
            fetched = conn.execute(sql, tuple(args)).fetchall()
        finally:
            conn.close()
    except Exception as exc:  # stx-allow: fallback (reason: catch-all safety net — see inline comment for context)
        logger.warning("action_store.query failed: %s", exc)
        return []

    return [_row_to_dict(item) for item in fetched]
def stats(
    *,
    agent: str | None = None,
    since: str | datetime | None = None,
    root: Path | None = None,
) -> list[dict[str, Any]]:
    """Per-(action, outcome) counts + mean/p95 elapsed.

    Returns a list of rows::

        [{"action": "nonce-probe", "outcome": "success",
          "count": 42, "mean_elapsed_s": 3.1, "p95_elapsed_s": 5.9}, ...]

    Counts and means come from a SQL ``GROUP BY``; p95 is computed in
    Python from the ``elapsed_s`` of every matching row (sqlite has no
    native percentile). A database failure is logged and reported as an
    empty list.
    """
    clauses: list[str] = []
    args: list[Any] = []
    if agent:
        clauses.append("agent = ?")
        args.append(agent)
    cutoff = _parse_since(since)
    if cutoff:
        clauses.append("ts >= ?")
        args.append(cutoff)

    where = ""
    if clauses:
        where = " WHERE " + " AND ".join(clauses)
    sql_counts = (
        "SELECT action, outcome, COUNT(*) AS count, AVG(elapsed_s) AS mean "
        "FROM attempts"
        + where
        + " GROUP BY action, outcome "
        "ORDER BY action ASC, count DESC"
    )
    sql_samples = "SELECT action, outcome, elapsed_s FROM attempts" + where
    bound = tuple(args)

    # stx-allow: fallback (reason: DB read can fail due to missing file, lock contention, or I/O error; empty list means no stats available, which is acceptable for a best-effort summary)
    try:
        conn = _get_conn(_db_path(root))
        try:
            group_rows = conn.execute(sql_counts, bound).fetchall()
            sample_rows = conn.execute(sql_samples, bound).fetchall()
        finally:
            conn.close()
    except Exception as exc:  # stx-allow: fallback (reason: catch-all safety net — see inline comment for context)
        logger.warning("action_store.stats failed: %s", exc)
        return []

    # Collect elapsed times per (action, outcome) pair for the percentile.
    durations: dict[tuple[str, str], list[float]] = {}
    for sample in sample_rows:
        pair = (sample["action"], sample["outcome"])
        durations.setdefault(pair, []).append(float(sample["elapsed_s"]))

    summary: list[dict[str, Any]] = []
    for group in group_rows:
        ordered = sorted(durations.get((group["action"], group["outcome"]), []))
        if ordered:
            rank = max(0, int(0.95 * (len(ordered) - 1)))
            p95 = float(ordered[rank])
        else:
            p95 = None
        mean = group["mean"]
        summary.append(
            {
                "action": group["action"],
                "outcome": group["outcome"],
                "count": int(group["count"]),
                "mean_elapsed_s": None if mean is None else float(mean),
                "p95_elapsed_s": p95,
            }
        )
    return summary
def summarize(
    agent: str,
    *,
    limit: int = 100,
    root: Path | None = None,
) -> dict[str, Any]:
    """Compact summary for the status payload / heartbeat.

    Built from the newest ``limit`` attempts of ``agent`` as returned by
    ``query``. Keys::

        {
          "last_action_at":          ISO ts or ""
          "last_action_name":        action name or ""
                                     (renamed from "last_action" to avoid
                                     collision with the pre-existing
                                     orochi liveness timestamp field)
          "last_action_outcome":     outcome string or ""
          "last_action_elapsed_s":   float or None
          "counts":                  {"<action>:<outcome>": n}
          "p95_elapsed_s_by_action": {"<action>": p95 float}
        }
    """
    rows = query(agent=agent, limit=limit, root=root)
    if not rows:
        return {
            "last_action_at": "",
            "last_action_name": "",
            "last_action_outcome": "",
            "last_action_elapsed_s": None,
            "counts": {},
            "p95_elapsed_s_by_action": {},
        }

    tally: dict[str, int] = {}
    elapsed_by_action: dict[str, list[float]] = {}
    for row in rows:
        label = f"{row['action']}:{row['outcome']}"
        tally[label] = tally.get(label, 0) + 1
        try:
            elapsed_by_action.setdefault(row["action"], []).append(
                float(row["elapsed_s"])
            )
        except (TypeError, ValueError):  # stx-allow: fallback (reason: type coercion or format mismatch)
            pass

    p95_by_action: dict[str, float] = {}
    for name, values in elapsed_by_action.items():
        if values:
            ranked = sorted(values)
            rank = max(0, int(0.95 * (len(ranked) - 1)))
            p95_by_action[name] = float(ranked[rank])

    newest = rows[0]  # query() returns newest first
    return {
        "last_action_at": str(newest.get("ts") or ""),
        "last_action_name": str(newest.get("action") or ""),
        "last_action_outcome": str(newest.get("outcome") or ""),
        "last_action_elapsed_s": _safe_float(newest.get("elapsed_s")),
        "counts": tally,
        "p95_elapsed_s_by_action": p95_by_action,
    }
def purge_old(
    *,
    days: int | None = None,
    root: Path | None = None,
) -> int:
    """Delete rows older than ``days`` (default from env).

    Returns rows deleted. Safe to call periodically from a cron /
    daemon: database failures are logged and swallowed.

    The file is compacted with ``VACUUM`` only when rows were actually
    removed. A failed ``VACUUM`` is logged but does not change the
    returned count, since the delete itself has already taken effect.
    """
    d = int(days) if days is not None else DEFAULT_RETENTION_DAYS
    cutoff = (datetime.now(timezone.utc) - timedelta(days=d)).isoformat()
    deleted = 0
    # stx-allow: fallback (reason: DB delete can fail due to disk error or lock; returning 0 is safe since purge is maintenance-only and a missed run leaves stale rows that will be retried next call)
    try:
        conn = _get_conn(_db_path(root))
        try:
            cur = conn.execute("DELETE FROM attempts WHERE ts < ?", (cutoff,))
            deleted = int(cur.rowcount)
            if deleted > 0:
                # VACUUM rewrites the whole file, so skip it when nothing
                # was freed, and keep its failure separate from the count.
                try:
                    conn.execute("VACUUM")
                except sqlite3.Error as exc:  # stx-allow: fallback (reason: compaction is optional; rows are already deleted)
                    logger.warning("action_store.purge_old: VACUUM failed: %s", exc)
        finally:
            conn.close()
    except Exception as exc:  # stx-allow: fallback (reason: catch-all safety net — see inline comment for context)
        logger.warning("action_store.purge_old failed: %s", exc)
    return deleted
def _all_rows(root: Path | None = None) -> Iterable[dict[str, Any]]:
    """Return every row, oldest first — used by tests / export tooling.

    The rows are fully loaded and returned as a list of dicts with JSON
    columns decoded. A database failure is logged and reported as an
    empty list.
    """
    sql = (
        "SELECT id, ts, agent, action, outcome, elapsed_s, "
        "pane_before, pane_after, extras "
        "FROM attempts ORDER BY ts ASC, id ASC"
    )
    # stx-allow: fallback (reason: DB read can fail due to missing file or I/O error; empty iterable is safe since _all_rows is used by tests/export tooling where no results is a valid outcome)
    try:
        conn = _get_conn(_db_path(root))
        try:
            fetched = conn.execute(sql).fetchall()
        finally:
            conn.close()
    except Exception as exc:  # stx-allow: fallback (reason: catch-all safety net — see inline comment for context)
        logger.warning("action_store._all_rows failed: %s", exc)
        return []
    return [_row_to_dict(item) for item in fetched]