Coverage for src \ truenex_memory \ ingestion \ global_auto_lifecycle.py: 80%

223 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-19 10:21 +0200

1"""Lifecycle controls for generated auto-memory nodes.""" 

2 

3from __future__ import annotations 

4 

5from dataclasses import dataclass, field 

6from pathlib import Path 

7import sqlite3 

8import uuid 

9 

10from truenex_memory.core.chunker import content_hash 

11 

12 

# Placeholder text that prune_auto_memories() writes over the ``content`` column
# of rows it compacts; rows already holding this text are excluded from pruning.
AUTO_MEMORY_TOMBSTONE_CONTENT = "[pruned auto memory tombstone]"

# Default value of the ``limit`` argument of prune_auto_memories().
DEFAULT_PRUNE_LIMIT = 100

# ``memory_type`` values accepted by promote_auto_memory().
CURATED_AUTO_MEMORY_TYPES = frozenset(("note", "decision", "issue", "pattern"))

16 

17 

@dataclass(frozen=True)
class AutoMemoryLifecycleItem:
    """Immutable snapshot of one auto-memory row selected or changed by a lifecycle command."""

    id: str
    title: str
    # Status the row had when it was read.
    previous_status: str
    # Status reported for the row after the command; None means "no transition".
    new_status: str | None
    source_path: str | None
    content_hash: str | None
    # Length of the row's content as read from the database.
    content_chars: int
    pruned: bool = False
    curated_id: str | None = None

    def to_dict(self) -> dict[str, object]:
        """Return the item as a plain dict keyed by field name, in declaration order."""
        exported_fields = (
            "id",
            "title",
            "previous_status",
            "new_status",
            "source_path",
            "content_hash",
            "content_chars",
            "pruned",
            "curated_id",
        )
        return {name: getattr(self, name) for name in exported_fields}

44 

45 

@dataclass
class AutoMemoryLifecycleReport:
    """Mutable result record for lifecycle commands, convertible to a JSON-friendly dict."""

    action: str
    db_path: str
    db_exists: bool
    dry_run: bool = False
    requested_id: str | None = None
    source_filter: str | None = None
    limit: int | None = None
    matched: int = 0
    changed: int = 0
    items: list[AutoMemoryLifecycleItem] = field(default_factory=list)
    warnings: list[str] = field(default_factory=list)

    def to_dict(self) -> dict[str, object]:
        """Return the report as a dict; every item is converted through its own ``to_dict``."""
        scalar_fields = (
            "action",
            "db_path",
            "db_exists",
            "dry_run",
            "requested_id",
            "source_filter",
            "limit",
            "matched",
            "changed",
        )
        payload: dict[str, object] = {}
        for name in scalar_fields:
            payload[name] = getattr(self, name)
        payload["items"] = [entry.to_dict() for entry in self.items]
        # The warnings list is handed out as-is (not copied).
        payload["warnings"] = self.warnings
        return payload

76 

77 

def approve_auto_memory(db_path: Path, memory_id: str) -> AutoMemoryLifecycleReport:
    """Run the ``approve`` transition for one memory id, targeting status ``active``."""
    outcome = _transition_auto_memory(
        db_path,
        memory_id=memory_id,
        action="approve",
        target_status="active",
    )
    return outcome

86 

87 

def reject_auto_memory(db_path: Path, memory_id: str) -> AutoMemoryLifecycleReport:
    """Run the ``reject`` transition for one memory id, targeting status ``obsolete``."""
    outcome = _transition_auto_memory(
        db_path,
        memory_id=memory_id,
        action="reject",
        target_status="obsolete",
    )
    return outcome

96 

97 

def _find_active_duplicate_id(conn: sqlite3.Connection, curated_hash: str) -> str | None:
    """Return the id of the first active default-project row with this content hash, or None."""
    duplicate = conn.execute(
        """
        SELECT id
        FROM memory_nodes
        WHERE project_id = 'default'
          AND content_hash = ?
          AND status = 'active'
        ORDER BY created_at, id
        LIMIT 1
        """,
        (curated_hash,),
    ).fetchone()
    return None if duplicate is None else str(duplicate["id"])


def promote_auto_memory(
    db_path: Path,
    memory_id: str,
    *,
    title: str,
    content: str,
    memory_type: str = "note",
    dry_run: bool = False,
) -> AutoMemoryLifecycleReport:
    """Create an active curated memory from one noisy unverified auto memory.

    This is intentionally stricter than ``approve``. The original generated row
    is marked obsolete and the curated replacement is inserted in the same
    transaction, preserving source provenance without promoting raw session
    noise as-is.

    Args:
        db_path: Path of the SQLite database; it must already exist.
        memory_id: Id of the generated row to replace.
        title: Title of the curated row; runs of whitespace are collapsed.
        content: Content of the curated row; surrounding whitespace is stripped.
        memory_type: One of ``CURATED_AUTO_MEMORY_TYPES``.
        dry_run: When true, run the checks and report the planned change
            without writing anything.

    Returns:
        A report. Database problems and ineligible rows are reported through
        ``warnings`` rather than raised. On success ``changed`` is 2 (one
        inserted row plus one updated row). The item only shows the
        ``obsolete`` transition and the curated id when the promotion was
        committed, or, in dry-run mode, when it would be attempted.

    Raises:
        ValueError: If the normalized title or content is empty, or
            ``memory_type`` is not an allowed curated type.
    """
    report = AutoMemoryLifecycleReport(
        action="promote",
        db_path=str(db_path),
        db_exists=db_path.exists(),
        dry_run=dry_run,
        requested_id=memory_id,
    )
    clean_title = " ".join(title.split())
    clean_content = content.strip()
    if not clean_title:
        raise ValueError("title cannot be empty")
    if not clean_content:
        raise ValueError("content cannot be empty")
    if memory_type not in CURATED_AUTO_MEMORY_TYPES:
        expected = ", ".join(sorted(CURATED_AUTO_MEMORY_TYPES))
        raise ValueError(f"invalid memory type {memory_type!r}; expected one of {expected}")
    # Reuse the snapshot stored in the report so the reported flag and the
    # decision taken here can never disagree.
    if not report.db_exists:
        report.warnings.append("database not found")
        return report

    try:
        conn = _connect_write_existing(db_path)
    except Exception:
        # Deliberately broad: any failure to open is reported, not raised.
        report.warnings.append("database exists but cannot be opened read/write")
        return report

    try:
        if not _table_exists(conn, "memory_nodes"):
            report.warnings.append("memory_nodes table not found")
            return report

        row = conn.execute(
            """
            SELECT id, project_id, title, status, source_path, source_document_id,
                   source_chunk_id, content_hash, length(content) AS content_chars
            FROM memory_nodes
            WHERE id = ? AND project_id = 'default'
            """,
            (memory_id,),
        ).fetchone()
        if row is None:
            report.warnings.append("memory node not found")
            return report
        report.matched = 1
        # Until the promotion is known to go ahead, the item reports no transition.
        report.items = [_item_from_row(row, new_status=None)]

        eligible = conn.execute(
            """
            SELECT 1
            FROM memory_nodes
            WHERE id = ?
              AND project_id = 'default'
              AND status = 'unverified'
              AND source_kind = 'auto'
              AND created_by = 'auto'
            """,
            (memory_id,),
        ).fetchone()
        if eligible is None:
            report.warnings.append("memory node is not an unverified generated auto memory")
            return report

        curated_id = f"mem_{uuid.uuid4().hex}"
        curated_hash = content_hash(clean_content)
        promoted_items = [
            _item_from_row(row, new_status="obsolete", curated_id=curated_id)
        ]

        if dry_run:
            duplicate_id = _find_active_duplicate_id(conn, curated_hash)
            if duplicate_id is not None:
                report.warnings.append(
                    f"active memory with same curated content already exists: {duplicate_id}"
                )
            else:
                report.items = promoted_items
            return report

        try:
            conn.execute("BEGIN IMMEDIATE")
            # Re-check for duplicates inside the write transaction.
            duplicate_id = _find_active_duplicate_id(conn, curated_hash)
            if duplicate_id is not None:
                conn.rollback()
                report.warnings.append(
                    f"active memory with same curated content already exists: {duplicate_id}"
                )
                return report
            insert_cursor = conn.execute(
                """
                INSERT INTO memory_nodes (
                    id, project_id, type, title, content, status, source_kind,
                    source_document_id, source_chunk_id, source_path,
                    content_hash, created_by, model_name, confidence,
                    created_at, updated_at
                )
                SELECT
                    ?, project_id, ?, ?, ?, 'active', 'curated_auto',
                    source_document_id, source_chunk_id, source_path,
                    ?, 'curated_auto', model_name, confidence,
                    datetime('now'), datetime('now')
                FROM memory_nodes
                WHERE id = ?
                  AND project_id = 'default'
                  AND status = 'unverified'
                  AND source_kind = 'auto'
                  AND created_by = 'auto'
                """,
                (
                    curated_id,
                    memory_type,
                    clean_title,
                    clean_content,
                    curated_hash,
                    memory_id,
                ),
            )
            update_cursor = conn.execute(
                """
                UPDATE memory_nodes
                SET status = 'obsolete', updated_at = datetime('now')
                WHERE id = ?
                  AND project_id = 'default'
                  AND status = 'unverified'
                  AND source_kind = 'auto'
                  AND created_by = 'auto'
                """,
                (memory_id,),
            )
            # Both statements repeat the eligibility filter, so a row that
            # changed after the check above makes one of them touch nothing.
            if insert_cursor.rowcount != 1 or update_cursor.rowcount != 1:
                raise sqlite3.DatabaseError("promote transaction did not touch expected rows")
            conn.commit()
        except sqlite3.DatabaseError:
            conn.rollback()
            raise
        # Only a committed promotion is reported as a transition.
        report.items = promoted_items
        report.changed = 2
    except sqlite3.DatabaseError:
        report.warnings.append("database readable but auto lifecycle query failed")
    finally:
        conn.close()

    return report

276 

277 

def prune_auto_memories(
    db_path: Path,
    *,
    source_filter: str | None = None,
    limit: int = DEFAULT_PRUNE_LIMIT,
    dry_run: bool = True,
) -> AutoMemoryLifecycleReport:
    """Overwrite the content of rejected generated auto memories with a tombstone marker.

    Rows are never hard-deleted: only ``content`` and ``updated_at`` are
    rewritten, so the stored content hash stays behind as a local tombstone
    that lets the generator avoid recreating rejected content later.

    Raises:
        ValueError: If ``limit`` is less than 1.
    """
    if limit < 1:
        raise ValueError("limit must be greater than zero")

    report = AutoMemoryLifecycleReport(
        action="prune",
        db_path=str(db_path),
        db_exists=db_path.exists(),
        dry_run=dry_run,
        source_filter=source_filter,
        limit=limit,
    )
    if not db_path.exists():
        report.warnings.append("database not found")
        return report

    try:
        conn = _connect_write_existing(db_path)
    except Exception:
        report.warnings.append("database exists but cannot be opened read/write")
        return report

    try:
        if not _table_exists(conn, "memory_nodes"):
            report.warnings.append("memory_nodes table not found")
            return report

        where_sql, where_params = _prune_where_clause(source_filter)
        select_sql = f"""
            SELECT id, title, status, source_path, content_hash, length(content) AS content_chars
            FROM memory_nodes
            {where_sql}
            ORDER BY updated_at, created_at, id
            LIMIT ?
            """
        candidates = conn.execute(select_sql, [*where_params, limit]).fetchall()
        report.matched = len(candidates)
        if dry_run or not candidates:
            # Preview only (or nothing to do): list the candidates, write nothing.
            report.items = [
                _item_from_row(candidate, new_status="obsolete") for candidate in candidates
            ]
            return report

        target_ids = [str(candidate["id"]) for candidate in candidates]
        id_marks = ", ".join("?" * len(target_ids))
        update_sql = f"""
            UPDATE memory_nodes
            SET content = ?,
                updated_at = datetime('now')
            WHERE id IN ({id_marks})
              AND project_id = 'default'
              AND status = 'obsolete'
              AND source_kind = 'auto'
              AND created_by = 'auto'
              AND content_hash IS NOT NULL
              AND content != ?
            """
        update_args: list[object] = [AUTO_MEMORY_TOMBSTONE_CONTENT]
        update_args.extend(target_ids)
        update_args.append(AUTO_MEMORY_TOMBSTONE_CONTENT)
        outcome = conn.execute(update_sql, update_args)
        conn.commit()

        report.changed = int(outcome.rowcount)
        every_row_pruned = report.changed == len(target_ids)
        if not every_row_pruned:
            report.warnings.append("some rows changed before prune completed")
        # The pruned flag is all-or-nothing across the listed rows.
        report.items = [
            _item_from_row(candidate, new_status="obsolete", pruned=every_row_pruned)
            for candidate in candidates
        ]
    except sqlite3.DatabaseError:
        report.warnings.append("database readable but auto lifecycle query failed")
    finally:
        conn.close()

    return report

364 

365 

def format_auto_memory_lifecycle_report(report: AutoMemoryLifecycleReport) -> str:
    """Format a lifecycle operation report as multi-line text for CLI output.

    At most 20 items are listed in detail. When the report holds more, a
    final line under the item list states how many were left out, so the
    listing is never silently shorter than the matched count suggests.

    Returns:
        The report text with lines joined by newlines (no trailing newline).
    """
    max_listed_items = 20
    title = f"Auto Memory {report.action.title()}"
    lines: list[str] = [title, "=" * 60]
    lines.append(f"Database: {report.db_path}")
    if not report.db_exists:
        lines.append(" (not found)")
    if report.requested_id:
        lines.append(f"Memory id: {report.requested_id}")
    if report.source_filter:
        lines.append(f"Source filter: {report.source_filter}")
    if report.limit is not None:
        lines.append(f"Limit: {report.limit}")
    if report.dry_run:
        lines.append("Mode: dry-run")
    lines.append(f"Matched: {report.matched}")
    lines.append(f"Changed: {report.changed}")

    if report.items:
        lines.append("")
        lines.append("Items:")
        for item in report.items[:max_listed_items]:
            # Show "old -> new" only when the item reports a transition.
            status = (
                item.previous_status
                if item.new_status is None
                else f"{item.previous_status} -> {item.new_status}"
            )
            suffix = " pruned" if item.pruned else ""
            lines.append(f" {item.id} [{status}]{suffix}")
            if item.curated_id:
                lines.append(f" curated: {item.curated_id}")
            lines.append(f" title: {item.title}")
            lines.append(f" source: {item.source_path or '(no source path)'}")
            lines.append(f" content chars before: {item.content_chars}")
        hidden_items = len(report.items) - max_listed_items
        if hidden_items > 0:
            lines.append(f" ... and {hidden_items} more item(s) not shown")

    if report.warnings:
        lines.append("")
        lines.append("Warnings:")
        lines.extend(f" - {warning}" for warning in report.warnings)

    return "\n".join(lines)

408 

409 

def _transition_auto_memory(
    db_path: Path,
    *,
    memory_id: str,
    action: str,
    target_status: str,
) -> AutoMemoryLifecycleReport:
    """Set ``target_status`` on one unverified generated auto memory and report the outcome.

    The update only applies to a default-project row that is still
    ``unverified`` with ``source_kind`` and ``created_by`` both ``auto``.
    Problems are recorded in the report's ``warnings`` instead of being raised.
    """
    report = AutoMemoryLifecycleReport(
        action=action,
        db_path=str(db_path),
        db_exists=db_path.exists(),
        requested_id=memory_id,
    )
    if not db_path.exists():
        report.warnings.append("database not found")
        return report

    try:
        conn = _connect_write_existing(db_path)
    except Exception:
        report.warnings.append("database exists but cannot be opened read/write")
        return report

    lookup_sql = """
        SELECT id, title, status, source_path, content_hash, length(content) AS content_chars
        FROM memory_nodes
        WHERE id = ? AND project_id = 'default'
        """
    update_sql = """
        UPDATE memory_nodes
        SET status = ?, updated_at = datetime('now')
        WHERE id = ?
          AND project_id = 'default'
          AND status = 'unverified'
          AND source_kind = 'auto'
          AND created_by = 'auto'
        """
    try:
        if not _table_exists(conn, "memory_nodes"):
            report.warnings.append("memory_nodes table not found")
            return report

        snapshot = conn.execute(lookup_sql, (memory_id,)).fetchone()
        if snapshot is None:
            report.warnings.append("memory node not found")
            return report
        report.matched = 1

        outcome = conn.execute(update_sql, (target_status, memory_id))
        conn.commit()
        report.changed = int(outcome.rowcount)
        if report.changed == 0:
            report.warnings.append("memory node is not an unverified generated auto memory")

        # The item shows a transition only when exactly one row was updated.
        if report.changed == 1:
            applied_status = target_status
        else:
            applied_status = None
        report.items = [_item_from_row(snapshot, new_status=applied_status)]
    except sqlite3.DatabaseError:
        report.warnings.append("database readable but auto lifecycle query failed")
    finally:
        conn.close()

    return report

479 

480 

def _item_from_row(
    row: sqlite3.Row,
    *,
    new_status: str | None,
    pruned: bool = False,
    curated_id: str | None = None,
) -> AutoMemoryLifecycleItem:
    """Build a lifecycle item from a row exposing id, title, status, source_path,
    content_hash and content_chars; NULL source_path/content_hash stay None and a
    falsy content_chars becomes 0.
    """
    raw_source_path = row["source_path"]
    raw_content_hash = row["content_hash"]
    return AutoMemoryLifecycleItem(
        id=str(row["id"]),
        title=str(row["title"]),
        previous_status=str(row["status"]),
        new_status=new_status,
        source_path=None if raw_source_path is None else str(raw_source_path),
        content_hash=None if raw_content_hash is None else str(raw_content_hash),
        content_chars=int(row["content_chars"] or 0),
        pruned=pruned,
        curated_id=curated_id,
    )

499 

500 

def _prune_where_clause(source_filter: str | None) -> tuple[str, list[object]]:
    """Return the WHERE clause and bound parameters that select prunable rows.

    A non-empty ``source_filter`` adds a substring match on the lower-cased
    source path.
    """
    conditions = [
        "project_id = 'default'",
        "status = 'obsolete'",
        "source_kind = 'auto'",
        "created_by = 'auto'",
        "content_hash IS NOT NULL",
        # Bound to the tombstone text: rows already compacted are skipped.
        "content != ?",
    ]
    bindings: list[object] = [AUTO_MEMORY_TOMBSTONE_CONTENT]
    if source_filter:
        conditions.append("lower(coalesce(source_path, '')) LIKE ? ESCAPE '\\'")
        bindings.append(_like_contains(source_filter))
    return f"WHERE {' AND '.join(conditions)}", bindings

515 

516 

def _like_contains(value: str) -> str:
    """Return a LIKE pattern matching any text that contains ``value``.

    The pattern is meant for ``LIKE ? ESCAPE '\\'``: backslash, ``%`` and
    ``_`` in ``value`` are escaped so they match literally.

    Only ASCII letters are lower-cased. SQLite's built-in ``lower()`` and
    ``LIKE`` fold case for ASCII characters only, so folding non-ASCII letters
    here (as ``str.lower()`` does) would stop a filter typed exactly as stored,
    e.g. one containing "Ä", from matching the stored text.
    """
    folded = "".join(ch.lower() if ch.isascii() else ch for ch in value)
    escaped = (
        folded
        .replace("\\", "\\\\")
        .replace("%", "\\%")
        .replace("_", "\\_")
    )
    return f"%{escaped}%"

525 

526 

def _connect_write_existing(db_path: Path) -> sqlite3.Connection:
    """Open an existing SQLite database read/write without creating it.

    The connection uses ``sqlite3.Row`` rows and has foreign keys enabled.

    The URI is built with ``Path.as_uri()`` so that characters with a meaning
    in URIs (``#``, ``?``, ``%``, spaces) are percent-encoded. Pasting the raw
    path into a ``file:`` URI would let such a character cut the path short or
    swallow the ``mode=rw`` parameter.

    Raises:
        sqlite3.OperationalError: If the database cannot be opened read/write,
            for example because the file does not exist.
    """
    uri = f"{db_path.resolve().as_uri()}?mode=rw"
    conn = sqlite3.connect(uri, uri=True)
    try:
        conn.row_factory = sqlite3.Row
        conn.execute("PRAGMA foreign_keys = ON")
    except Exception:
        # Do not leak the handle when setup fails after a successful connect.
        conn.close()
        raise
    return conn

533 

534 

def _table_exists(conn: sqlite3.Connection, table_name: str) -> bool:
    """Report whether ``sqlite_master`` lists an entry of type ``table`` with this exact name."""
    lookup = conn.execute(
        "SELECT name FROM sqlite_master WHERE type='table' AND name = ?",
        (table_name,),
    )
    return lookup.fetchone() is not None