Coverage for src \ truenex_memory \ store \ task_store.py: 88%
149 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-19 10:21 +0200
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-19 10:21 +0200
1"""CRUD for the adaptive task pipeline tables."""
2from __future__ import annotations
3from dataclasses import dataclass
4from datetime import datetime, timezone
5from pathlib import Path
6import sqlite3
7import uuid
8from truenex_memory.store.sqlite import connect, initialize_schema
# Closed vocabularies used by TaskStore to validate caller-supplied values.

# Accepted values for the ``task_type`` argument of ``TaskStore.task_open``.
TASK_TYPES = frozenset(
    ("bugfix", "feature", "refactor", "review", "query")
)

# Accepted values for the ``brain_judgment`` argument of ``TaskStore.step_add``.
BRAIN_JUDGMENTS = frozenset(
    ("ok", "needs_revision", "rejected")
)

# Accepted values for the ``status`` filter of ``TaskStore.task_list``.
TASK_STATUSES = frozenset(
    ("open", "closed", "unrated")
)
def _now() -> str:
    """Return the current time in UTC as an ISO 8601 string (offset included)."""
    current = datetime.now(tz=timezone.utc)
    return current.isoformat()
def _new_id(prefix: str) -> str:
    """Return ``"<prefix>_<hex>"`` where ``<hex>`` is a fresh random UUID4 in hex form."""
    token = uuid.uuid4().hex
    return f"{prefix}_{token}"
@dataclass(frozen=True)
class TaskRecord:
    """Immutable snapshot of one row of the ``tasks`` table."""

    # Identity and description.
    task_id: str
    title: str
    type: str  # TaskStore.task_open only accepts members of TASK_TYPES
    project: str | None
    agent_session_id: str | None

    # Human feedback; TaskStore.task_close only accepts 1, 0, -1 or None.
    human_outcome: int | None
    human_comment: str | None

    # Totals written by TaskStore.task_close from the task's steps.
    total_tokens: int | None
    total_duration_s: float | None

    # Lifecycle: "open" on creation, "closed" or "unrated" after task_close.
    status: str
    created_at: str
    closed_at: str | None
@dataclass(frozen=True)
class TaskStepRecord:
    """Immutable snapshot of one row of the ``task_steps`` table."""

    # Identity and position; TaskStore.step_add numbers steps from 0 per task.
    step_id: str
    task_id: str
    step_index: int

    # What was asked and what came back.
    prompt_used: str | None
    output: str | None
    brain_judgment: str | None  # step_add only accepts members of BRAIN_JUDGMENTS

    # Cost figures; TaskStore.task_close sums these into the task totals.
    tokens_used: int | None
    duration_s: float | None

    model_used: str | None
    created_at: str
@dataclass(frozen=True)
class VerifierRoundRecord:
    """Immutable record with the same fields as a ``verifier_rounds`` row."""

    # Identity; ``step_id`` is optional when TaskStore.verifier_add is called.
    round_id: str
    task_id: str
    step_id: str | None

    # The suggestion and the verdict on it.
    suggestion_type: str
    brain_accepted: bool  # TaskStore.verifier_add stores this as 1 or 0
    brain_rationale: str | None

    created_at: str
class TaskStore:
    """CRUD access to the ``tasks``, ``task_steps`` and ``verifier_rounds`` tables.

    Each public method opens its own connection via ``_conn`` and closes it
    before returning. ``with conn:`` is used only for transaction handling
    (commit on success, rollback on error); a ``sqlite3.Connection`` context
    manager does not close the connection, so closing is done explicitly in
    ``finally``.
    """

    def __init__(self, db_path: Path) -> None:
        """Remember the database path; no connection is opened here."""
        self.db_path = db_path

    def _conn(self) -> sqlite3.Connection:
        """Open a connection to ``db_path`` and run ``initialize_schema`` on it.

        The caller owns the returned connection and is responsible for
        closing it. If schema initialization fails, the connection is closed
        before the error propagates.
        """
        conn = connect(self.db_path)
        try:
            initialize_schema(conn)
        except BaseException:
            conn.close()
            raise
        return conn

    def task_open(self, title: str, task_type: str, *, project: str | None = None,
                  agent_session_id: str | None = None) -> str:
        """Insert a new task with status ``'open'`` and return its id.

        Args:
            title: Task title; stored with surrounding whitespace stripped.
            task_type: Must be a member of ``TASK_TYPES``.
            project: Optional project name stored with the task.
            agent_session_id: Optional session identifier stored with the task.

        Returns:
            The generated task id (``"task_"`` followed by a UUID4 hex string).

        Raises:
            ValueError: If ``title`` is blank or ``task_type`` is not allowed.
        """
        if not title.strip():
            raise ValueError("title cannot be empty")
        if task_type not in TASK_TYPES:
            raise ValueError(f"task_type must be one of {sorted(TASK_TYPES)}")
        task_id = _new_id("task")
        now = _now()
        conn = self._conn()
        try:
            with conn:  # commits on success, rolls back on error
                conn.execute(
                    "INSERT INTO tasks (task_id, title, type, project, agent_session_id, status, created_at) "
                    "VALUES (?, ?, ?, ?, ?, 'open', ?)",
                    (task_id, title.strip(), task_type, project, agent_session_id, now),
                )
        finally:
            conn.close()
        return task_id

    def task_close(self, task_id: str, *, human_outcome: int | None = None,
                   human_comment: str | None = None) -> TaskRecord:
        """Close a task, store the human feedback and the summed step totals.

        The task's status becomes ``"closed"`` when ``human_outcome`` is given
        and ``"unrated"`` otherwise. ``total_tokens`` and ``total_duration_s``
        are the SQL ``SUM`` over the task's steps (``None`` when there are no
        non-null values to sum).

        Args:
            task_id: Id of the task to close.
            human_outcome: ``1``, ``0``, ``-1`` or ``None``.
            human_comment: Optional free-text comment.

        Returns:
            The updated task record.

        Raises:
            ValueError: If ``human_outcome`` is not one of the allowed values.
            LookupError: If no task with ``task_id`` exists; the update is
                rolled back in that case.
        """
        if human_outcome is not None and human_outcome not in (1, 0, -1):
            raise ValueError("human_outcome must be 1, 0, or -1")
        status = "closed" if human_outcome is not None else "unrated"
        now = _now()
        conn = self._conn()
        try:
            with conn:  # commits on success, rolls back on error
                row = conn.execute(
                    "SELECT SUM(tokens_used), SUM(duration_s) FROM task_steps WHERE task_id = ?",
                    (task_id,),
                ).fetchone()
                total_tokens = row[0]
                total_duration_s = row[1]
                conn.execute(
                    "UPDATE tasks SET status=?, human_outcome=?, human_comment=?, "
                    "total_tokens=?, total_duration_s=?, closed_at=? WHERE task_id=?",
                    (status, human_outcome, human_comment, total_tokens, total_duration_s, now, task_id),
                )
                # changes() reports how many rows the UPDATE touched; zero
                # means the task id does not exist.
                if conn.execute("SELECT changes()").fetchone()[0] == 0:
                    raise LookupError(f"task not found: {task_id!r}")
            return self._get_task(conn, task_id)
        finally:
            conn.close()

    def task_get(self, task_id: str) -> TaskRecord:
        """Return the task with ``task_id``.

        Raises:
            LookupError: If no such task exists.
        """
        conn = self._conn()
        try:
            return self._get_task(conn, task_id)
        finally:
            conn.close()

    def task_list(self, *, project: str | None = None, status: str | None = None,
                  limit: int = 20) -> list[TaskRecord]:
        """Return tasks ordered by ``created_at`` descending.

        Args:
            project: When given, only tasks of this project are returned.
            status: When given, must be a member of ``TASK_STATUSES`` and
                only tasks with this status are returned.
            limit: Value bound to the SQL ``LIMIT`` clause.

        Raises:
            ValueError: If ``status`` is given but not allowed.
        """
        if status is not None and status not in TASK_STATUSES:
            raise ValueError(f"status must be one of {sorted(TASK_STATUSES)}")
        clauses: list[str] = []
        args: list[object] = []
        if project is not None:
            clauses.append("project = ?")
            args.append(project)
        if status is not None:
            clauses.append("status = ?")
            args.append(status)
        # Only fixed clause text is interpolated; all values are bound parameters.
        where = f"WHERE {' AND '.join(clauses)}" if clauses else ""
        args.append(limit)
        conn = self._conn()
        try:
            rows = conn.execute(
                f"SELECT * FROM tasks {where} ORDER BY created_at DESC LIMIT ?", args
            ).fetchall()
        finally:
            conn.close()
        return [_task_from_row(row) for row in rows]

    def _get_task(self, conn: sqlite3.Connection, task_id: str) -> TaskRecord:
        """Fetch one task on an already-open connection.

        Raises:
            LookupError: If no task with ``task_id`` exists.
        """
        row = conn.execute("SELECT * FROM tasks WHERE task_id = ?", (task_id,)).fetchone()
        if row is None:
            raise LookupError(f"task not found: {task_id!r}")
        return _task_from_row(row)

    def step_add(self, task_id: str, *, prompt_used: str | None = None, output: str | None = None,
                 brain_judgment: str | None = None, tokens_used: int | None = None,
                 duration_s: float | None = None, model_used: str | None = None) -> str:
        """Append a step to a task and return the new step id.

        The step index is one more than the task's current maximum index,
        starting at 0 for the first step.

        Raises:
            ValueError: If ``brain_judgment`` is given but not a member of
                ``BRAIN_JUDGMENTS``.
            LookupError: If no task with ``task_id`` exists.
        """
        if brain_judgment is not None and brain_judgment not in BRAIN_JUDGMENTS:
            raise ValueError(f"brain_judgment must be one of {sorted(BRAIN_JUDGMENTS)}")
        conn = self._conn()
        try:
            with conn:  # commits on success, rolls back on error
                task_row = conn.execute(
                    "SELECT task_id FROM tasks WHERE task_id = ?", (task_id,)
                ).fetchone()
                if task_row is None:
                    raise LookupError(f"task not found: {task_id!r}")
                idx = conn.execute(
                    "SELECT COALESCE(MAX(step_index), -1) + 1 FROM task_steps WHERE task_id = ?",
                    (task_id,),
                ).fetchone()[0]
                step_id = _new_id("step")
                conn.execute(
                    "INSERT INTO task_steps (step_id, task_id, step_index, prompt_used, output, "
                    "brain_judgment, tokens_used, duration_s, model_used, created_at) "
                    "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
                    (step_id, task_id, idx, prompt_used, output, brain_judgment,
                     tokens_used, duration_s, model_used, _now()),
                )
        finally:
            conn.close()
        return step_id

    def step_list(self, task_id: str) -> list[TaskStepRecord]:
        """Return the steps of ``task_id`` ordered by ``step_index``.

        An unknown task id yields an empty list; no existence check is made.
        """
        conn = self._conn()
        try:
            rows = conn.execute(
                "SELECT * FROM task_steps WHERE task_id = ? ORDER BY step_index", (task_id,)
            ).fetchall()
        finally:
            conn.close()
        return [_step_from_row(row) for row in rows]

    def verifier_add(self, task_id: str, suggestion_type: str, brain_accepted: bool, *,
                     step_id: str | None = None, brain_rationale: str | None = None) -> str:
        """Record one verifier round and return its id.

        Args:
            task_id: Task the round belongs to (not checked for existence here).
            suggestion_type: Non-blank label; stored stripped.
            brain_accepted: Stored as ``1`` when truthy, ``0`` otherwise.
            step_id: Optional step the round refers to.
            brain_rationale: Optional explanation text.

        Raises:
            ValueError: If ``suggestion_type`` is blank.
        """
        if not suggestion_type.strip():
            raise ValueError("suggestion_type cannot be empty")
        round_id = _new_id("vround")
        conn = self._conn()
        try:
            with conn:  # commits on success, rolls back on error
                conn.execute(
                    "INSERT INTO verifier_rounds (round_id, task_id, step_id, suggestion_type, "
                    "brain_accepted, brain_rationale, created_at) VALUES (?, ?, ?, ?, ?, ?, ?)",
                    (round_id, task_id, step_id, suggestion_type.strip(),
                     1 if brain_accepted else 0, brain_rationale, _now()),
                )
        finally:
            conn.close()
        return round_id

    def calibration(self, *, project: str | None = None) -> dict[str, object]:
        """Summarize verifier acceptance and brain/human agreement.

        Args:
            project: When given, both summaries are restricted to tasks of
                this project.

        Returns:
            A dict with two keys:

            * ``"verifier_acceptance"``: one dict per ``suggestion_type`` with
              ``total``, ``accepted`` and ``acceptance_rate`` (accepted/total
              rounded to 3 decimals, ``None`` when ``total`` is falsy).
            * ``"brain_human_alignment"``: one dict per
              (``brain_judgment``, ``human_outcome``) pair with its ``count``,
              over steps whose task has a human outcome and whose own
              ``brain_judgment`` is set.
        """
        task_filter = ""
        args_vr: list[object] = []
        if project is not None:
            task_filter = "JOIN tasks t ON t.task_id = vr.task_id WHERE t.project = ?"
            args_vr.append(project)

        where_align = "WHERE t.human_outcome IS NOT NULL AND ts.brain_judgment IS NOT NULL"
        args_al: list[object] = []
        if project is not None:
            where_align += " AND t.project = ?"
            args_al.append(project)

        conn = self._conn()
        try:
            # Only fixed SQL fragments are interpolated; the project value is bound.
            rows_vr = conn.execute(
                f"SELECT suggestion_type, COUNT(*) AS total, SUM(brain_accepted) AS accepted FROM verifier_rounds vr {task_filter} GROUP BY suggestion_type ORDER BY suggestion_type",
                args_vr,
            ).fetchall()
            rows_al = conn.execute(
                f"SELECT ts.brain_judgment, t.human_outcome, COUNT(*) AS cnt FROM tasks t JOIN task_steps ts ON ts.task_id = t.task_id {where_align} GROUP BY ts.brain_judgment, t.human_outcome ORDER BY ts.brain_judgment, t.human_outcome",
                args_al,
            ).fetchall()
        finally:
            conn.close()

        verifier_rates = [
            {
                "suggestion_type": r["suggestion_type"],
                "total": r["total"],
                "accepted": r["accepted"],
                "acceptance_rate": round(r["accepted"] / r["total"], 3) if r["total"] else None,
            }
            for r in rows_vr
        ]
        alignment = [
            {"brain_judgment": r["brain_judgment"], "human_outcome": r["human_outcome"], "count": r["cnt"]}
            for r in rows_al
        ]
        return {"verifier_acceptance": verifier_rates, "brain_human_alignment": alignment}
def _task_from_row(row: sqlite3.Row) -> TaskRecord:
    """Build a ``TaskRecord`` from a ``tasks`` row, reading each column by name."""
    columns = (
        "task_id", "title", "type",
        "project", "agent_session_id",
        "human_outcome", "human_comment",
        "total_tokens", "total_duration_s",
        "status", "created_at", "closed_at",
    )
    values = {column: row[column] for column in columns}
    return TaskRecord(**values)
def _step_from_row(row: sqlite3.Row) -> TaskStepRecord:
    """Build a ``TaskStepRecord`` from a ``task_steps`` row, reading each column by name."""
    columns = (
        "step_id", "task_id", "step_index",
        "prompt_used", "output", "brain_judgment",
        "tokens_used", "duration_s", "model_used",
        "created_at",
    )
    values = {column: row[column] for column in columns}
    return TaskStepRecord(**values)