Coverage for src\truenex_memory\ingestion\global_auto_review.py: 94%
126 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-19 10:21 +0200
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-19 10:21 +0200
1"""Read-only review report for generated unverified auto memories."""
3from __future__ import annotations
5from dataclasses import dataclass, field
6from pathlib import Path
7import sqlite3
# Default maximum number of review items returned when the caller gives no limit.
DEFAULT_REVIEW_LIMIT = 20
# Default maximum length, in characters, of each item's content excerpt.
DEFAULT_CONTENT_CHARS = 240
@dataclass(frozen=True)
class AutoMemoryReviewItem:
    """Immutable snapshot of a single generated memory node awaiting review."""

    id: str
    type: str
    title: str
    content: str
    content_excerpt: str
    status: str
    source_kind: str
    source_path: str | None
    source_document_id: str | None
    source_chunk_id: str | None
    confidence: float | None
    created_at: str
    updated_at: str

    def to_dict(self) -> dict[str, object]:
        """Return every field as a plain dict, keyed in declaration order."""
        exported_keys = (
            "id",
            "type",
            "title",
            "content",
            "content_excerpt",
            "status",
            "source_kind",
            "source_path",
            "source_document_id",
            "source_chunk_id",
            "confidence",
            "created_at",
            "updated_at",
        )
        return {key: getattr(self, key) for key in exported_keys}
@dataclass(frozen=True)
class AutoMemorySourceSummary:
    """How many generated auto memories share one source path."""

    source_path: str | None
    count: int

    def to_dict(self) -> dict[str, object]:
        """Return the summary as a plain dict with its two fields."""
        payload: dict[str, object] = {}
        payload["source_path"] = self.source_path
        payload["count"] = self.count
        return payload
@dataclass
class AutoMemoryReviewReport:
    """Result of a read-only scan for generated, unverified auto memory nodes."""

    db_path: str
    db_exists: bool
    total: int = 0
    returned: int = 0
    limit: int = DEFAULT_REVIEW_LIMIT
    source_filter: str | None = None
    content_chars: int = DEFAULT_CONTENT_CHARS
    items: list[AutoMemoryReviewItem] = field(default_factory=list)
    by_source_path: list[AutoMemorySourceSummary] = field(default_factory=list)
    warnings: list[str] = field(default_factory=list)

    def to_dict(self) -> dict[str, object]:
        """Return the report as a plain dict, serializing nested entries too."""
        serialized_items = [entry.to_dict() for entry in self.items]
        serialized_sources = [entry.to_dict() for entry in self.by_source_path]
        return dict(
            db_path=self.db_path,
            db_exists=self.db_exists,
            total=self.total,
            returned=self.returned,
            limit=self.limit,
            source_filter=self.source_filter,
            content_chars=self.content_chars,
            items=serialized_items,
            by_source_path=serialized_sources,
            # The warnings list is handed out as-is, not copied.
            warnings=self.warnings,
        )
def build_auto_memory_review(
    db_path: Path,
    *,
    limit: int = DEFAULT_REVIEW_LIMIT,
    source_filter: str | None = None,
    content_chars: int = DEFAULT_CONTENT_CHARS,
) -> AutoMemoryReviewReport:
    """Build a read-only report for generated unverified memory nodes.

    Args:
        db_path: Location of the SQLite database to inspect.
        limit: Maximum number of items to return; must be at least 1.
        source_filter: Optional substring matched against ``source_path``.
        content_chars: Maximum excerpt length per item; must be at least 40.

    Returns:
        A report holding the total match count, per-source counts and up to
        ``limit`` items. Problems with the database (missing file, cannot be
        opened, missing table, failing query) are recorded in
        ``report.warnings`` instead of being raised.

    Raises:
        ValueError: If ``limit`` or ``content_chars`` is below its minimum.
    """
    if limit < 1:
        raise ValueError("limit must be greater than zero")
    if content_chars < 40:
        raise ValueError("content_chars must be at least 40")

    report = AutoMemoryReviewReport(
        db_path=str(db_path),
        db_exists=db_path.exists(),
        limit=limit,
        source_filter=source_filter,
        content_chars=content_chars,
    )
    # Reuse the single existence check so the db_exists flag and the warning
    # can never disagree if the file appears or vanishes in between.
    if not report.db_exists:
        report.warnings.append("database not found")
        return report

    try:
        conn = _connect_readonly(db_path)
    except Exception:
        # Deliberately broad: any failure to open is reported, not raised.
        report.warnings.append("database exists but cannot be opened read-only")
        return report

    try:
        if not _table_exists(conn, "memory_nodes"):
            report.warnings.append("memory_nodes table not found")
            return report

        where_sql, params = _where_clause(source_filter)

        total_row = conn.execute(
            f"SELECT COUNT(*) AS cnt FROM memory_nodes {where_sql}",
            params,
        ).fetchone()
        report.total = int(total_row["cnt"]) if total_row else 0

        summary_rows = conn.execute(
            f"""
            SELECT source_path, COUNT(*) AS cnt
            FROM memory_nodes
            {where_sql}
            GROUP BY source_path
            ORDER BY cnt DESC, coalesce(source_path, '')
            """,
            params,
        ).fetchall()
        report.by_source_path = [
            AutoMemorySourceSummary(
                source_path=_optional_str(row["source_path"]),
                count=int(row["cnt"]),
            )
            for row in summary_rows
        ]

        rows = conn.execute(
            f"""
            SELECT
                id, type, title, content, status, source_kind, source_path,
                source_document_id, source_chunk_id, confidence, created_at, updated_at
            FROM memory_nodes
            {where_sql}
            ORDER BY source_path, title, created_at, id
            LIMIT ?
            """,
            [*params, limit],
        ).fetchall()
        report.items = [_review_item_from_row(row, content_chars) for row in rows]
        report.returned = len(report.items)
    except sqlite3.DatabaseError:
        report.warnings.append("database readable but auto review query failed")
    finally:
        conn.close()

    return report


def _optional_str(value: object) -> str | None:
    """Convert a column value to ``str`` while keeping SQL NULL as ``None``."""
    return None if value is None else str(value)


def _review_item_from_row(row: sqlite3.Row, content_chars: int) -> AutoMemoryReviewItem:
    """Map one ``memory_nodes`` row onto a review item with a short excerpt."""
    content = str(row["content"])
    confidence = row["confidence"]
    return AutoMemoryReviewItem(
        id=str(row["id"]),
        type=str(row["type"]),
        title=str(row["title"]),
        content=content,
        content_excerpt=_excerpt(content, content_chars),
        status=str(row["status"]),
        source_kind=str(row["source_kind"]),
        source_path=_optional_str(row["source_path"]),
        source_document_id=_optional_str(row["source_document_id"]),
        source_chunk_id=_optional_str(row["source_chunk_id"]),
        confidence=None if confidence is None else float(confidence),
        created_at=str(row["created_at"]),
        updated_at=str(row["updated_at"]),
    )
def format_auto_memory_review(report: AutoMemoryReviewReport) -> str:
    """Render a review report as short plain text for users.

    Sections for sources, items and warnings appear only when the report has
    entries for them; at most the first ten source summaries are listed.
    """
    out: list[str] = [
        "Auto Memory Review",
        "=" * 60,
        f"Database: {report.db_path}",
    ]
    if not report.db_exists:
        out.append(" (not found)")
    if report.source_filter:
        out.append(f"Source filter: {report.source_filter}")
    out += [
        f"Total unverified auto memories: {report.total}",
        f"Returned: {report.returned} / limit {report.limit}",
    ]

    if report.by_source_path:
        out += ["", "Sources:"]
        out.extend(
            f" {summary.count} {summary.source_path or '(no source path)'}"
            for summary in report.by_source_path[:10]
        )

    if report.items:
        out += ["", "Items:"]
        for position, entry in enumerate(report.items, start=1):
            if entry.confidence is None:
                shown_confidence = "n/a"
            else:
                shown_confidence = f"{entry.confidence:.2f}"
            out += [
                f"{position}. {entry.id} [{entry.status}/{entry.type}] confidence={shown_confidence}",
                f" title: {entry.title}",
                f" source: {entry.source_path or '(no source path)'}",
                f" content: {entry.content_excerpt}",
            ]

    if report.warnings:
        out += ["", "Warnings:"]
        out.extend(f" - {message}" for message in report.warnings)

    return "\n".join(out)
236def _where_clause(source_filter: str | None) -> tuple[str, list[object]]:
237 where = [
238 "status = 'unverified'",
239 "source_kind = 'auto'",
240 "created_by = 'auto'",
241 ]
242 params: list[object] = []
243 if source_filter:
244 where.append("lower(coalesce(source_path, '')) LIKE ? ESCAPE '\\'")
245 params.append(_like_contains(source_filter))
246 return "WHERE " + " AND ".join(where), params
249def _like_contains(value: str) -> str:
250 escaped = (
251 value.lower()
252 .replace("\\", "\\\\")
253 .replace("%", "\\%")
254 .replace("_", "\\_")
255 )
256 return f"%{escaped}%"
259def _connect_readonly(db_path: Path) -> sqlite3.Connection:
260 uri_path = db_path.resolve().as_posix()
261 conn = sqlite3.connect(f"file:{uri_path}?mode=ro", uri=True)
262 conn.row_factory = sqlite3.Row
263 return conn
266def _table_exists(conn: sqlite3.Connection, table_name: str) -> bool:
267 row = conn.execute(
268 "SELECT name FROM sqlite_master WHERE type='table' AND name = ?",
269 (table_name,),
270 ).fetchone()
271 return row is not None
274def _excerpt(content: str, max_chars: int) -> str:
275 text = " ".join(content.split())
276 if len(text) <= max_chars:
277 return text
278 return text[: max_chars - 3].rstrip() + "..."