Coverage for src \ truenex_memory \ store \ task_store.py: 88%

149 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-19 10:21 +0200

1"""CRUD for the adaptive task pipeline tables.""" 

2from __future__ import annotations 

3from dataclasses import dataclass 

4from datetime import datetime, timezone 

5from pathlib import Path 

6import sqlite3 

7import uuid 

8from truenex_memory.store.sqlite import connect, initialize_schema 

9 

10TASK_TYPES = frozenset({"bugfix", "feature", "refactor", "review", "query"}) 

11BRAIN_JUDGMENTS = frozenset({"ok", "needs_revision", "rejected"}) 

12TASK_STATUSES = frozenset({"open", "closed", "unrated"}) 

13 

14 

15def _now() -> str: 

16 return datetime.now(timezone.utc).isoformat() 

17 

18 

19def _new_id(prefix: str) -> str: 

20 return f"{prefix}_{uuid.uuid4().hex}" 

21 

22 

23@dataclass(frozen=True) 

24class TaskRecord: 

25 task_id: str 

26 title: str 

27 type: str 

28 project: str | None 

29 agent_session_id: str | None 

30 human_outcome: int | None 

31 human_comment: str | None 

32 total_tokens: int | None 

33 total_duration_s: float | None 

34 status: str 

35 created_at: str 

36 closed_at: str | None 

37 

38 

39@dataclass(frozen=True) 

40class TaskStepRecord: 

41 step_id: str 

42 task_id: str 

43 step_index: int 

44 prompt_used: str | None 

45 output: str | None 

46 brain_judgment: str | None 

47 tokens_used: int | None 

48 duration_s: float | None 

49 model_used: str | None 

50 created_at: str 

51 

52 

53@dataclass(frozen=True) 

54class VerifierRoundRecord: 

55 round_id: str 

56 task_id: str 

57 step_id: str | None 

58 suggestion_type: str 

59 brain_accepted: bool 

60 brain_rationale: str | None 

61 created_at: str 

62 

63 

64class TaskStore: 

65 def __init__(self, db_path: Path) -> None: 

66 self.db_path = db_path 

67 

68 def _conn(self) -> sqlite3.Connection: 

69 conn = connect(self.db_path) 

70 initialize_schema(conn) 

71 return conn 

72 

73 def task_open(self, title: str, task_type: str, *, project: str | None = None, agent_session_id: str | None = None) -> str: 

74 if not title.strip(): 

75 raise ValueError("title cannot be empty") 

76 if task_type not in TASK_TYPES: 

77 raise ValueError(f"task_type must be one of {sorted(TASK_TYPES)}") 

78 task_id = _new_id("task") 

79 now = _now() 

80 with self._conn() as conn: 

81 conn.execute( 

82 "INSERT INTO tasks (task_id, title, type, project, agent_session_id, status, created_at) VALUES (?, ?, ?, ?, ?, 'open', ?)", 

83 (task_id, title.strip(), task_type, project, agent_session_id, now), 

84 ) 

85 conn.commit() 

86 return task_id 

87 

88 def task_close(self, task_id: str, *, human_outcome: int | None = None, human_comment: str | None = None) -> TaskRecord: 

89 if human_outcome is not None and human_outcome not in (1, 0, -1): 

90 raise ValueError("human_outcome must be 1, 0, or -1") 

91 status = "closed" if human_outcome is not None else "unrated" 

92 now = _now() 

93 with self._conn() as conn: 

94 row = conn.execute("SELECT SUM(tokens_used), SUM(duration_s) FROM task_steps WHERE task_id = ?", (task_id,)).fetchone() 

95 total_tokens = row[0] 

96 total_duration_s = row[1] 

97 conn.execute( 

98 "UPDATE tasks SET status=?, human_outcome=?, human_comment=?, total_tokens=?, total_duration_s=?, closed_at=? WHERE task_id=?", 

99 (status, human_outcome, human_comment, total_tokens, total_duration_s, now, task_id), 

100 ) 

101 if conn.execute("SELECT changes()").fetchone()[0] == 0: 

102 raise LookupError(f"task not found: {task_id!r}") 

103 conn.commit() 

104 return self._get_task(conn, task_id) 

105 

106 def task_get(self, task_id: str) -> TaskRecord: 

107 with self._conn() as conn: 

108 return self._get_task(conn, task_id) 

109 

110 def task_list(self, *, project: str | None = None, status: str | None = None, limit: int = 20) -> list[TaskRecord]: 

111 if status is not None and status not in TASK_STATUSES: 

112 raise ValueError(f"status must be one of {sorted(TASK_STATUSES)}") 

113 with self._conn() as conn: 

114 clauses: list[str] = [] 

115 args: list[object] = [] 

116 if project is not None: 

117 clauses.append("project = ?") 

118 args.append(project) 

119 if status is not None: 

120 clauses.append("status = ?") 

121 args.append(status) 

122 where = f"WHERE {' AND '.join(clauses)}" if clauses else "" 

123 args.append(limit) 

124 rows = conn.execute(f"SELECT * FROM tasks {where} ORDER BY created_at DESC LIMIT ?", args).fetchall() 

125 return [_task_from_row(row) for row in rows] 

126 

127 def _get_task(self, conn: sqlite3.Connection, task_id: str) -> TaskRecord: 

128 row = conn.execute("SELECT * FROM tasks WHERE task_id = ?", (task_id,)).fetchone() 

129 if row is None: 

130 raise LookupError(f"task not found: {task_id!r}") 

131 return _task_from_row(row) 

132 

133 def step_add(self, task_id: str, *, prompt_used: str | None = None, output: str | None = None, 

134 brain_judgment: str | None = None, tokens_used: int | None = None, 

135 duration_s: float | None = None, model_used: str | None = None) -> str: 

136 if brain_judgment is not None and brain_judgment not in BRAIN_JUDGMENTS: 

137 raise ValueError(f"brain_judgment must be one of {sorted(BRAIN_JUDGMENTS)}") 

138 with self._conn() as conn: 

139 if conn.execute("SELECT task_id FROM tasks WHERE task_id = ?", (task_id,)).fetchone() is None: 

140 raise LookupError(f"task not found: {task_id!r}") 

141 idx = conn.execute("SELECT COALESCE(MAX(step_index), -1) + 1 FROM task_steps WHERE task_id = ?", (task_id,)).fetchone()[0] 

142 step_id = _new_id("step") 

143 conn.execute( 

144 "INSERT INTO task_steps (step_id, task_id, step_index, prompt_used, output, brain_judgment, tokens_used, duration_s, model_used, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", 

145 (step_id, task_id, idx, prompt_used, output, brain_judgment, tokens_used, duration_s, model_used, _now()), 

146 ) 

147 conn.commit() 

148 return step_id 

149 

150 def step_list(self, task_id: str) -> list[TaskStepRecord]: 

151 with self._conn() as conn: 

152 rows = conn.execute("SELECT * FROM task_steps WHERE task_id = ? ORDER BY step_index", (task_id,)).fetchall() 

153 return [_step_from_row(row) for row in rows] 

154 

155 def verifier_add(self, task_id: str, suggestion_type: str, brain_accepted: bool, *, 

156 step_id: str | None = None, brain_rationale: str | None = None) -> str: 

157 if not suggestion_type.strip(): 

158 raise ValueError("suggestion_type cannot be empty") 

159 round_id = _new_id("vround") 

160 with self._conn() as conn: 

161 conn.execute( 

162 "INSERT INTO verifier_rounds (round_id, task_id, step_id, suggestion_type, brain_accepted, brain_rationale, created_at) VALUES (?, ?, ?, ?, ?, ?, ?)", 

163 (round_id, task_id, step_id, suggestion_type.strip(), 1 if brain_accepted else 0, brain_rationale, _now()), 

164 ) 

165 conn.commit() 

166 return round_id 

167 

168 def calibration(self, *, project: str | None = None) -> dict[str, object]: 

169 with self._conn() as conn: 

170 task_filter = "" 

171 args_vr: list[object] = [] 

172 if project is not None: 

173 task_filter = "JOIN tasks t ON t.task_id = vr.task_id WHERE t.project = ?" 

174 args_vr.append(project) 

175 rows_vr = conn.execute( 

176 f"SELECT suggestion_type, COUNT(*) AS total, SUM(brain_accepted) AS accepted FROM verifier_rounds vr {task_filter} GROUP BY suggestion_type ORDER BY suggestion_type", 

177 args_vr, 

178 ).fetchall() 

179 verifier_rates = [ 

180 {"suggestion_type": r["suggestion_type"], "total": r["total"], "accepted": r["accepted"], 

181 "acceptance_rate": round(r["accepted"] / r["total"], 3) if r["total"] else None} 

182 for r in rows_vr 

183 ] 

184 where_align = "WHERE t.human_outcome IS NOT NULL AND ts.brain_judgment IS NOT NULL" 

185 args_al: list[object] = [] 

186 if project is not None: 

187 where_align += " AND t.project = ?" 

188 args_al.append(project) 

189 rows_al = conn.execute( 

190 f"SELECT ts.brain_judgment, t.human_outcome, COUNT(*) AS cnt FROM tasks t JOIN task_steps ts ON ts.task_id = t.task_id {where_align} GROUP BY ts.brain_judgment, t.human_outcome ORDER BY ts.brain_judgment, t.human_outcome", 

191 args_al, 

192 ).fetchall() 

193 alignment = [{"brain_judgment": r["brain_judgment"], "human_outcome": r["human_outcome"], "count": r["cnt"]} for r in rows_al] 

194 return {"verifier_acceptance": verifier_rates, "brain_human_alignment": alignment} 

195 

196 

197def _task_from_row(row: sqlite3.Row) -> TaskRecord: 

198 return TaskRecord( 

199 task_id=row["task_id"], title=row["title"], type=row["type"], 

200 project=row["project"], agent_session_id=row["agent_session_id"], 

201 human_outcome=row["human_outcome"], human_comment=row["human_comment"], 

202 total_tokens=row["total_tokens"], total_duration_s=row["total_duration_s"], 

203 status=row["status"], created_at=row["created_at"], closed_at=row["closed_at"], 

204 ) 

205 

206 

207def _step_from_row(row: sqlite3.Row) -> TaskStepRecord: 

208 return TaskStepRecord( 

209 step_id=row["step_id"], task_id=row["task_id"], step_index=row["step_index"], 

210 prompt_used=row["prompt_used"], output=row["output"], brain_judgment=row["brain_judgment"], 

211 tokens_used=row["tokens_used"], duration_s=row["duration_s"], model_used=row["model_used"], 

212 created_at=row["created_at"], 

213 )