Coverage for src \ truenex_memory \ ingestion \ global_auto_lifecycle.py: 80%
223 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-19 10:21 +0200
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-19 10:21 +0200
1"""Lifecycle controls for generated auto-memory nodes."""
3from __future__ import annotations
5from dataclasses import dataclass, field
6from pathlib import Path
7import sqlite3
8import uuid
10from truenex_memory.core.chunker import content_hash
# Placeholder text written over the content of pruned auto-memory rows.
AUTO_MEMORY_TOMBSTONE_CONTENT = "[pruned auto memory tombstone]"
# Default cap on how many rows one prune call selects.
DEFAULT_PRUNE_LIMIT = 100
# Memory types accepted for the curated replacement created by promote.
CURATED_AUTO_MEMORY_TYPES = frozenset(("note", "decision", "issue", "pattern"))
@dataclass(frozen=True)
class AutoMemoryLifecycleItem:
    """Immutable snapshot of one auto-memory row handled by a lifecycle command.

    ``previous_status`` is the status read from the row; ``new_status`` is the
    status the command moved (or would move) it to, or ``None`` when no
    transition applies. ``curated_id`` carries the id of a curated replacement
    when one is associated with the row.
    """

    id: str
    title: str
    previous_status: str
    new_status: str | None
    source_path: str | None
    content_hash: str | None
    content_chars: int
    pruned: bool = False
    curated_id: str | None = None

    def to_dict(self) -> dict[str, object]:
        """Return the item as a plain dict keyed by field name."""
        # Explicit key order keeps the serialized layout stable.
        keys = (
            "id",
            "title",
            "previous_status",
            "new_status",
            "source_path",
            "content_hash",
            "content_chars",
            "pruned",
            "curated_id",
        )
        return {key: getattr(self, key) for key in keys}
@dataclass
class AutoMemoryLifecycleReport:
    """Outcome of one lifecycle operation, convertible to JSON-safe data.

    ``matched`` and ``changed`` are row counts filled in by the operation;
    ``warnings`` collects human-readable reasons why nothing (or less than
    expected) happened.
    """

    action: str
    db_path: str
    db_exists: bool
    dry_run: bool = False
    requested_id: str | None = None
    source_filter: str | None = None
    limit: int | None = None
    matched: int = 0
    changed: int = 0
    items: list[AutoMemoryLifecycleItem] = field(default_factory=list)
    warnings: list[str] = field(default_factory=list)

    def to_dict(self) -> dict[str, object]:
        """Return the report as a plain dict with every item serialized too."""
        serialized_items = [entry.to_dict() for entry in self.items]
        return dict(
            action=self.action,
            db_path=self.db_path,
            db_exists=self.db_exists,
            dry_run=self.dry_run,
            requested_id=self.requested_id,
            source_filter=self.source_filter,
            limit=self.limit,
            matched=self.matched,
            changed=self.changed,
            items=serialized_items,
            warnings=self.warnings,
        )
def approve_auto_memory(db_path: Path, memory_id: str) -> AutoMemoryLifecycleReport:
    """Approve one generated unverified auto-memory node, making it active."""
    outcome = _transition_auto_memory(
        db_path,
        memory_id=memory_id,
        action="approve",
        target_status="active",
    )
    return outcome
def reject_auto_memory(db_path: Path, memory_id: str) -> AutoMemoryLifecycleReport:
    """Reject one generated unverified auto-memory node, making it obsolete."""
    outcome = _transition_auto_memory(
        db_path,
        memory_id=memory_id,
        action="reject",
        target_status="obsolete",
    )
    return outcome
def promote_auto_memory(
    db_path: Path,
    memory_id: str,
    *,
    title: str,
    content: str,
    memory_type: str = "note",
    dry_run: bool = False,
) -> AutoMemoryLifecycleReport:
    """Create an active curated memory from one noisy unverified auto memory.

    This is intentionally stricter than ``approve``. The original generated row
    is marked obsolete and the curated replacement is inserted in the same
    transaction, preserving source provenance without promoting raw session
    noise as-is.

    Args:
        db_path: Path of the existing SQLite database.
        memory_id: Id of the generated auto-memory row to replace.
        title: Title of the curated memory; whitespace runs are collapsed.
        content: Body of the curated memory; surrounding whitespace is stripped.
        memory_type: One of ``CURATED_AUTO_MEMORY_TYPES``.
        dry_run: When true, report what would happen without writing.

    Returns:
        A report whose single item describes the source row. ``changed`` is 2
        (one insert, one update) after a successful promote. If nothing was
        written, the item carries ``new_status=None`` and no ``curated_id``,
        except in a dry run without duplicates, where it previews the change.

    Raises:
        ValueError: If the cleaned title or content is empty, or
            ``memory_type`` is not an accepted curated type.
    """
    report = AutoMemoryLifecycleReport(
        action="promote",
        db_path=str(db_path),
        db_exists=db_path.exists(),
        dry_run=dry_run,
        requested_id=memory_id,
    )
    clean_title = " ".join(title.split())
    clean_content = content.strip()
    if not clean_title:
        raise ValueError("title cannot be empty")
    if not clean_content:
        raise ValueError("content cannot be empty")
    if memory_type not in CURATED_AUTO_MEMORY_TYPES:
        expected = ", ".join(sorted(CURATED_AUTO_MEMORY_TYPES))
        raise ValueError(f"invalid memory type {memory_type!r}; expected one of {expected}")
    if not db_path.exists():
        report.warnings.append("database not found")
        return report

    try:
        conn = _connect_write_existing(db_path)
    except Exception:
        report.warnings.append("database exists but cannot be opened read/write")
        return report

    try:
        if not _table_exists(conn, "memory_nodes"):
            report.warnings.append("memory_nodes table not found")
            return report

        row = conn.execute(
            """
            SELECT id, project_id, title, status, source_path, source_document_id,
                   source_chunk_id, content_hash, length(content) AS content_chars
            FROM memory_nodes
            WHERE id = ? AND project_id = 'default'
            """,
            (memory_id,),
        ).fetchone()
        if row is None:
            report.warnings.append("memory node not found")
            return report
        report.matched = 1
        report.items = [_item_from_row(row, new_status=None)]

        eligible = conn.execute(
            """
            SELECT 1
            FROM memory_nodes
            WHERE id = ?
              AND project_id = 'default'
              AND status = 'unverified'
              AND source_kind = 'auto'
              AND created_by = 'auto'
            """,
            (memory_id,),
        ).fetchone()
        if eligible is None:
            report.warnings.append("memory node is not an unverified generated auto memory")
            return report

        curated_id = f"mem_{uuid.uuid4().hex}"
        report.items = [
            _item_from_row(row, new_status="obsolete", curated_id=curated_id)
        ]
        curated_hash = content_hash(clean_content)
        if dry_run:
            duplicate_id = _find_active_duplicate_id(conn, curated_hash)
            if duplicate_id is not None:
                report.items = [_item_from_row(row, new_status=None)]
                report.warnings.append(
                    f"active memory with same curated content already exists: {duplicate_id}"
                )
            return report

        try:
            # Take the write lock first so the duplicate check and both writes
            # see one consistent snapshot.
            conn.execute("BEGIN IMMEDIATE")
            duplicate_id = _find_active_duplicate_id(conn, curated_hash)
            if duplicate_id is not None:
                conn.rollback()
                report.items = [_item_from_row(row, new_status=None)]
                report.warnings.append(
                    f"active memory with same curated content already exists: {duplicate_id}"
                )
                return report
            # The eligibility predicate is repeated in both statements so a row
            # that changed since the checks above is not touched.
            insert_cursor = conn.execute(
                """
                INSERT INTO memory_nodes (
                    id, project_id, type, title, content, status, source_kind,
                    source_document_id, source_chunk_id, source_path,
                    content_hash, created_by, model_name, confidence,
                    created_at, updated_at
                )
                SELECT
                    ?, project_id, ?, ?, ?, 'active', 'curated_auto',
                    source_document_id, source_chunk_id, source_path,
                    ?, 'curated_auto', model_name, confidence,
                    datetime('now'), datetime('now')
                FROM memory_nodes
                WHERE id = ?
                  AND project_id = 'default'
                  AND status = 'unverified'
                  AND source_kind = 'auto'
                  AND created_by = 'auto'
                """,
                (
                    curated_id,
                    memory_type,
                    clean_title,
                    clean_content,
                    curated_hash,
                    memory_id,
                ),
            )
            update_cursor = conn.execute(
                """
                UPDATE memory_nodes
                SET status = 'obsolete', updated_at = datetime('now')
                WHERE id = ?
                  AND project_id = 'default'
                  AND status = 'unverified'
                  AND source_kind = 'auto'
                  AND created_by = 'auto'
                """,
                (memory_id,),
            )
            if insert_cursor.rowcount != 1 or update_cursor.rowcount != 1:
                raise sqlite3.DatabaseError("promote transaction did not touch expected rows")
            conn.commit()
        except sqlite3.DatabaseError:
            # Nothing was persisted, so the report must not claim the row
            # became obsolete or that a curated replacement exists.
            report.items = [_item_from_row(row, new_status=None)]
            conn.rollback()
            raise
        report.changed = 2
    except sqlite3.DatabaseError:
        report.warnings.append("database readable but auto lifecycle query failed")
    finally:
        conn.close()

    return report


def _find_active_duplicate_id(conn: sqlite3.Connection, curated_hash: str) -> str | None:
    """Return the id of the oldest active default-project row with this content hash."""
    duplicate = conn.execute(
        """
        SELECT id
        FROM memory_nodes
        WHERE project_id = 'default'
          AND content_hash = ?
          AND status = 'active'
        ORDER BY created_at, id
        LIMIT 1
        """,
        (curated_hash,),
    ).fetchone()
    return None if duplicate is None else str(duplicate["id"])
def prune_auto_memories(
    db_path: Path,
    *,
    source_filter: str | None = None,
    limit: int = DEFAULT_PRUNE_LIMIT,
    dry_run: bool = True,
) -> AutoMemoryLifecycleReport:
    """Replace the content of rejected generated auto memories with a tombstone.

    Rows are never hard-deleted. Keeping the content hash gives the generator
    a local tombstone so rejected content is not recreated later.

    Raises:
        ValueError: If ``limit`` is smaller than one.
    """
    if limit < 1:
        raise ValueError("limit must be greater than zero")

    report = AutoMemoryLifecycleReport(
        action="prune",
        db_path=str(db_path),
        db_exists=db_path.exists(),
        dry_run=dry_run,
        source_filter=source_filter,
        limit=limit,
    )
    if not db_path.exists():
        report.warnings.append("database not found")
        return report

    try:
        conn = _connect_write_existing(db_path)
    except Exception:
        report.warnings.append("database exists but cannot be opened read/write")
        return report

    try:
        if not _table_exists(conn, "memory_nodes"):
            report.warnings.append("memory_nodes table not found")
            return report

        where_sql, where_params = _prune_where_clause(source_filter)
        select_sql = f"""
            SELECT id, title, status, source_path, content_hash, length(content) AS content_chars
            FROM memory_nodes
            {where_sql}
            ORDER BY updated_at, created_at, id
            LIMIT ?
            """
        candidates = conn.execute(select_sql, [*where_params, limit]).fetchall()
        report.matched = len(candidates)
        if dry_run or not candidates:
            # Preview only (or nothing to do): list candidates, write nothing.
            report.items = [
                _item_from_row(candidate, new_status="obsolete")
                for candidate in candidates
            ]
            return report

        candidate_ids = [str(candidate["id"]) for candidate in candidates]
        id_slots = ", ".join(["?"] * len(candidate_ids))
        # The eligibility predicate is repeated so rows that changed since the
        # SELECT are left alone.
        update_sql = f"""
            UPDATE memory_nodes
            SET content = ?,
                updated_at = datetime('now')
            WHERE id IN ({id_slots})
              AND project_id = 'default'
              AND status = 'obsolete'
              AND source_kind = 'auto'
              AND created_by = 'auto'
              AND content_hash IS NOT NULL
              AND content != ?
            """
        update_params = [
            AUTO_MEMORY_TOMBSTONE_CONTENT,
            *candidate_ids,
            AUTO_MEMORY_TOMBSTONE_CONTENT,
        ]
        cursor = conn.execute(update_sql, update_params)
        conn.commit()
        report.changed = int(cursor.rowcount)
        if report.changed != len(candidate_ids):
            report.warnings.append("some rows changed before prune completed")
        # Items are flagged as pruned only when every selected row was updated.
        everything_pruned = report.changed == len(candidates)
        report.items = [
            _item_from_row(candidate, new_status="obsolete", pruned=everything_pruned)
            for candidate in candidates
        ]
    except sqlite3.DatabaseError:
        report.warnings.append("database readable but auto lifecycle query failed")
    finally:
        conn.close()

    return report
def format_auto_memory_lifecycle_report(
    report: AutoMemoryLifecycleReport,
    *,
    max_items: int | None = 20,
) -> str:
    """Format a lifecycle operation report for CLI output.

    Args:
        report: The report to render.
        max_items: Maximum number of items to list; ``None`` lists them all.
            When items are omitted, a final line in the items section states
            how many, so a truncated listing is never mistaken for a full one.

    Returns:
        The rendered report as one newline-joined string.

    Raises:
        ValueError: If ``max_items`` is negative.
    """
    if max_items is not None and max_items < 0:
        raise ValueError("max_items cannot be negative")

    title = f"Auto Memory {report.action.title()}"
    lines: list[str] = [title, "=" * 60]
    lines.append(f"Database: {report.db_path}")
    if not report.db_exists:
        lines.append(" (not found)")
    if report.requested_id:
        lines.append(f"Memory id: {report.requested_id}")
    if report.source_filter:
        lines.append(f"Source filter: {report.source_filter}")
    if report.limit is not None:
        lines.append(f"Limit: {report.limit}")
    if report.dry_run:
        lines.append("Mode: dry-run")
    lines.append(f"Matched: {report.matched}")
    lines.append(f"Changed: {report.changed}")

    if report.items:
        shown = report.items if max_items is None else report.items[:max_items]
        lines.append("")
        lines.append("Items:")
        for item in shown:
            status = (
                item.previous_status
                if item.new_status is None
                else f"{item.previous_status} -> {item.new_status}"
            )
            suffix = " pruned" if item.pruned else ""
            lines.append(f" {item.id} [{status}]{suffix}")
            if item.curated_id:
                lines.append(f" curated: {item.curated_id}")
            lines.append(f" title: {item.title}")
            lines.append(f" source: {item.source_path or '(no source path)'}")
            lines.append(f" content chars before: {item.content_chars}")
        hidden = len(report.items) - len(shown)
        if hidden > 0:
            lines.append(f" ... {hidden} more item(s) not shown")

    if report.warnings:
        lines.append("")
        lines.append("Warnings:")
        for warning in report.warnings:
            lines.append(f" - {warning}")

    return "\n".join(lines)
def _transition_auto_memory(
    db_path: Path,
    *,
    memory_id: str,
    action: str,
    target_status: str,
) -> AutoMemoryLifecycleReport:
    """Move one unverified generated auto memory to ``target_status``.

    The row is looked up by id in the default project. It is only updated when
    it is still unverified and was generated automatically; otherwise the
    report carries a warning and ``changed`` stays at zero.
    """
    report = AutoMemoryLifecycleReport(
        action=action,
        db_path=str(db_path),
        db_exists=db_path.exists(),
        requested_id=memory_id,
    )
    if not db_path.exists():
        report.warnings.append("database not found")
        return report

    try:
        conn = _connect_write_existing(db_path)
    except Exception:
        report.warnings.append("database exists but cannot be opened read/write")
        return report

    try:
        if not _table_exists(conn, "memory_nodes"):
            report.warnings.append("memory_nodes table not found")
            return report

        lookup_sql = """
            SELECT id, title, status, source_path, content_hash, length(content) AS content_chars
            FROM memory_nodes
            WHERE id = ? AND project_id = 'default'
            """
        found = conn.execute(lookup_sql, (memory_id,)).fetchone()
        if found is None:
            report.warnings.append("memory node not found")
            return report
        report.matched = 1

        # The eligibility predicate lives in the UPDATE itself, so the row
        # count tells whether the node qualified.
        update_sql = """
            UPDATE memory_nodes
            SET status = ?, updated_at = datetime('now')
            WHERE id = ?
              AND project_id = 'default'
              AND status = 'unverified'
              AND source_kind = 'auto'
              AND created_by = 'auto'
            """
        cursor = conn.execute(update_sql, (target_status, memory_id))
        conn.commit()
        report.changed = int(cursor.rowcount)
        if report.changed == 0:
            report.warnings.append("memory node is not an unverified generated auto memory")

        if report.changed == 1:
            resulting_status = target_status
        else:
            resulting_status = None
        report.items = [_item_from_row(found, new_status=resulting_status)]
    except sqlite3.DatabaseError:
        report.warnings.append("database readable but auto lifecycle query failed")
    finally:
        conn.close()

    return report
def _item_from_row(
    row: sqlite3.Row,
    *,
    new_status: str | None,
    pruned: bool = False,
    curated_id: str | None = None,
) -> AutoMemoryLifecycleItem:
    """Build a lifecycle item from a ``memory_nodes`` result row.

    The row must expose ``id``, ``title``, ``status``, ``source_path``,
    ``content_hash`` and ``content_chars``. NULL paths and hashes stay
    ``None``; a NULL character count becomes 0.
    """
    raw_path = row["source_path"]
    raw_hash = row["content_hash"]
    return AutoMemoryLifecycleItem(
        id=str(row["id"]),
        title=str(row["title"]),
        previous_status=str(row["status"]),
        new_status=new_status,
        source_path=None if raw_path is None else str(raw_path),
        content_hash=None if raw_hash is None else str(raw_hash),
        content_chars=int(row["content_chars"] or 0),
        pruned=pruned,
        curated_id=curated_id,
    )
def _prune_where_clause(source_filter: str | None) -> tuple[str, list[object]]:
    """Return the WHERE clause and bound parameters that select prunable rows.

    Prunable rows are obsolete generated auto memories in the default project
    that have a content hash and do not already hold the tombstone content.
    A non-empty ``source_filter`` adds a case-folded substring match on
    ``source_path``.
    """
    conditions = [
        "project_id = 'default'",
        "status = 'obsolete'",
        "source_kind = 'auto'",
        "created_by = 'auto'",
        "content_hash IS NOT NULL",
        "content != ?",
    ]
    bindings: list[object] = [AUTO_MEMORY_TOMBSTONE_CONTENT]
    if source_filter:
        conditions.append("lower(coalesce(source_path, '')) LIKE ? ESCAPE '\\'")
        bindings.append(_like_contains(source_filter))
    return f"WHERE {' AND '.join(conditions)}", bindings
517def _like_contains(value: str) -> str:
518 escaped = (
519 value.lower()
520 .replace("\\", "\\\\")
521 .replace("%", "\\%")
522 .replace("_", "\\_")
523 )
524 return f"%{escaped}%"
527def _connect_write_existing(db_path: Path) -> sqlite3.Connection:
528 uri_path = db_path.resolve().as_posix()
529 conn = sqlite3.connect(f"file:{uri_path}?mode=rw", uri=True)
530 conn.row_factory = sqlite3.Row
531 conn.execute("PRAGMA foreign_keys = ON")
532 return conn
535def _table_exists(conn: sqlite3.Connection, table_name: str) -> bool:
536 row = conn.execute(
537 "SELECT name FROM sqlite_master WHERE type='table' AND name = ?",
538 (table_name,),
539 ).fetchone()
540 return row is not None