Coverage for src \ truenex_memory \ ingestion \ global_auto_memory.py: 91%

237 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-19 10:21 +0200

1"""Conservative unverified auto-memory generation for Phase 3.4.""" 

2 

3from __future__ import annotations 

4 

5from dataclasses import dataclass 

6from pathlib import Path 

7import json 

8import re 

9import sqlite3 

10 

11from truenex_memory.core.chunker import content_hash, estimate_tokens 

12from truenex_memory.ingestion.global_refresh import RefreshReport 

13from truenex_memory.store.repository import MemoryRepository 

14 

15 

# Confidence scores attached to generated candidates, chosen by chunk origin.
PROJECT_DOCS_CONFIDENCE = 0.80
AGENT_SESSION_CONFIDENCE = 0.60
COMPACTION_CONFIDENCE = 0.75
DEFAULT_CONFIDENCE = 0.50

# Chunks whose estimated token count is below this are never candidates.
MIN_CANDIDATE_TOKENS = 8

# Default caps on memories created (or planned) per run and per source path.
DEFAULT_AUTO_MEMORY_LIMIT = 300
DEFAULT_AUTO_MEMORY_PER_SOURCE_LIMIT = 8

# File suffixes that make a non-agent-session source eligible for auto memory.
AUTO_MEMORY_EXTENSIONS = frozenset({".md", ".markdown", ".txt", ".rst"})

# Detects the compaction flag in the TRUENEX_INGESTION_METADATA preamble without
# a full JSON parse: "is_compaction": true may appear anywhere in the object.
_RE_IS_COMPACTION = re.compile(r'"is_compaction"\s*:\s*true')

# Captures the JSON object that follows the metadata marker at a line start.
_RE_METADATA_LINE = re.compile(r'^TRUENEX_INGESTION_METADATA\s+(\{.*\})', re.MULTILINE)

# Text that opens with "{" and later contains a "type" key.
_RE_JSON_DUMP = re.compile(r'^\s*\{.*"type"\s*:', re.DOTALL)

# A line beginning with a "[user]:" or "[assistant]:" speaker tag.
_RE_AGENT_TRANSCRIPT_LINE = re.compile(r'(^|\n)\s*\[(?:user|assistant)\]:', re.IGNORECASE)

# A numbered "All user messages:" heading.
_RE_ALL_USER_MESSAGES = re.compile(r'(^|\n)\s*\d+\.\s+all user messages\s*:', re.IGNORECASE)

# A numbered step that starts with an action verb and contains a backticked
# shell-style command.
_RE_NUMBERED_COMMAND_LINE = re.compile(
    r'^\s*\d+\.\s*'
    r'(?:build|check|copy|deploy|execute|find|lancia|mostra|open|restore|run|'
    r'show|start|stop|trova|verifica|verify)\b'
    r'.*`[^`]*(?:cargo|cp|docker|find|git|grep|ls|npm|powershell|ssh)[^`]*`',
    re.IGNORECASE,
)

39 

40 

@dataclass(frozen=True)
class AutoMemoryCandidate:
    """Immutable description of one chunk proposed as an unverified memory node."""

    # Text that would be stored as the memory body.
    content: str
    # Display title for the memory.
    title: str
    # Path of the document the chunk belongs to.
    source_path: str
    # Identifier of the originating document.
    source_document_id: str
    # Identifier of the originating chunk.
    source_chunk_id: str
    # Source category of the originating entry, or None when absent.
    source_type: str | None
    # Score compared against the caller's minimum-confidence threshold.
    confidence: float
    # Whether the chunk was flagged as a compaction record.
    is_compaction: bool

53 

54 

@dataclass(frozen=True)
class AutoMemoryTelemetry:
    """Immutable bundle of Auto Memory candidate counters; every field defaults to 0."""

    candidates: int = 0
    duplicate_skips: int = 0
    duplicate_active: int = 0
    duplicate_unverified: int = 0
    duplicate_rejected: int = 0
    low_confidence: int = 0
    non_document_skipped: int = 0
    noisy_session_skipped: int = 0

    def to_dict(self) -> dict[str, int]:
        """Return the counters as a plain dict keyed by field name."""
        ordered_names = (
            "candidates",
            "duplicate_skips",
            "duplicate_active",
            "duplicate_unverified",
            "duplicate_rejected",
            "low_confidence",
            "non_document_skipped",
            "noisy_session_skipped",
        )
        return {name: getattr(self, name) for name in ordered_names}

79 

80 

def generate_unverified_auto_memories(
    db_path: Path,
    report: RefreshReport,
    *,
    dry_run: bool,
    min_confidence: float = DEFAULT_CONFIDENCE,
    limit: int = DEFAULT_AUTO_MEMORY_LIMIT,
    per_source_limit: int = DEFAULT_AUTO_MEMORY_PER_SOURCE_LIMIT,
) -> None:
    """Generate exact-deduped unverified memory nodes from active chunks.

    Each candidate is filtered in this order: eligibility (document suffix, or
    noise check for agent sessions), exact content-hash duplicate, minimum
    confidence, per-source cap, global cap. The matching ``report.auto_memory_*``
    counter is incremented at each step.

    Args:
        db_path: SQLite database to read candidates from; a missing file is a no-op.
        report: Refresh report whose auto-memory counters are updated in place.
        dry_run: When True, counters are updated but no memory is written.
        min_confidence: Candidates scoring below this are counted as low confidence.
        limit: Maximum memories created or planned per call; values <= 0 disable the cap.
        per_source_limit: Maximum memories per source path; values <= 0 disable the cap.
    """
    if not db_path.exists():
        return

    repository = MemoryRepository(db_path)
    created_or_planned = 0
    created_or_planned_by_source: dict[str, int] = {}
    blocked_hashes = _blocked_auto_memory_content_hashes_by_reason(db_path)
    for candidate in _iter_candidates(db_path):
        # agent_session chunks are always valid candidates regardless of extension.
        if candidate.source_type != "agent_session":
            if Path(candidate.source_path).suffix.lower() not in AUTO_MEMORY_EXTENSIONS:
                report.auto_memory_non_document_skipped += 1
                continue
        elif _is_noisy_agent_session_candidate(candidate.content):
            report.auto_memory_noisy_session_skipped += 1
            continue

        report.auto_memory_candidates += 1
        candidate_hash = content_hash(candidate.content)
        duplicate_reason = blocked_hashes.get(candidate_hash)
        if duplicate_reason:
            report.auto_memory_duplicates += 1
            _count_duplicate_reason(report, duplicate_reason)
            continue
        if candidate.confidence < min_confidence:
            report.auto_memory_low_confidence += 1
            continue

        source_count = created_or_planned_by_source.get(candidate.source_path, 0)
        if per_source_limit > 0 and source_count >= per_source_limit:
            report.auto_memory_source_limit_skipped += 1
            continue
        if limit > 0 and created_or_planned >= limit:
            report.auto_memory_limit_skipped += 1
            continue

        created_or_planned += 1
        created_or_planned_by_source[candidate.source_path] = source_count + 1
        report.auto_memory_created += 1
        if not dry_run:
            repository.add_memory(
                candidate.content,
                memory_type="note",
                title=candidate.title,
                status="unverified",
                source_kind="auto",
                source_document_id=candidate.source_document_id,
                source_chunk_id=candidate.source_chunk_id,
                source_path=candidate.source_path,
                created_by="auto",
                confidence=candidate.confidence,
            )
        # Record the hash in dry-run mode too, so a later chunk with identical
        # text is counted as a duplicate exactly as it would be in a real run.
        blocked_hashes[candidate_hash] = "unverified"

142 

143 

def analyze_auto_memory_candidates(
    db_path: Path,
    *,
    min_confidence: float = DEFAULT_CONFIDENCE,
) -> AutoMemoryTelemetry:
    """Compute Auto Memory candidate quality counters without writing anything.

    Returns an all-zero telemetry object when the database file is missing or
    when reading it raises ``sqlite3.DatabaseError``.
    """
    if not db_path.exists():
        return AutoMemoryTelemetry()

    try:
        blocked = _blocked_auto_memory_content_hashes_by_reason(db_path)
        pool = _iter_candidates(db_path)
    except sqlite3.DatabaseError:
        return AutoMemoryTelemetry()

    tally = dict.fromkeys(
        (
            "candidates",
            "duplicate_skips",
            "duplicate_active",
            "duplicate_unverified",
            "duplicate_rejected",
            "low_confidence",
            "non_document_skipped",
            "noisy_session_skipped",
        ),
        0,
    )

    for item in pool:
        from_session = item.source_type == "agent_session"
        if not from_session:
            # Non-session sources qualify only with a document-like suffix.
            suffix = Path(item.source_path).suffix.lower()
            if suffix not in AUTO_MEMORY_EXTENSIONS:
                tally["non_document_skipped"] += 1
                continue
        elif _is_noisy_agent_session_candidate(item.content):
            tally["noisy_session_skipped"] += 1
            continue

        tally["candidates"] += 1
        reason = blocked.get(content_hash(item.content))
        if reason:
            tally["duplicate_skips"] += 1
            tally[f"duplicate_{reason}"] += 1
        elif item.confidence < min_confidence:
            tally["low_confidence"] += 1

    return AutoMemoryTelemetry(**tally)

189 

190 

def _iter_candidates(db_path: Path) -> list[AutoMemoryCandidate]:
    """Load chunks of active sources and return them as prioritised candidates.

    Rows come from ``chunks`` joined to ``documents`` and to ``source_ledger``
    entries whose status is ``'active'``. Chunks that look like raw JSON dumps
    or fall below ``MIN_CANDIDATE_TOKENS`` are dropped. The result is sorted
    with ``_sort_key_for_candidate``.
    """
    conn = _connect_readonly(db_path)
    try:
        rows = conn.execute(
            """
            SELECT
                c.id AS chunk_id,
                c.document_id,
                c.heading_path,
                c.content,
                d.path AS source_path,
                sl.source_type
            FROM chunks c
            JOIN documents d ON d.id = c.document_id
            JOIN source_ledger sl ON sl.source_path_or_alias = d.path
            WHERE sl.status = 'active'
            ORDER BY d.path, c.chunk_index
            """
        ).fetchall()
    finally:
        # Using a sqlite3 connection as a context manager only ends the
        # transaction; it never closes the connection, so close it explicitly.
        conn.close()

    candidates: list[AutoMemoryCandidate] = []
    for row in rows:
        raw_content = str(row["content"])
        # The flag is searched in the raw chunk, before the preamble is stripped.
        is_compaction = bool(_RE_IS_COMPACTION.search(raw_content))
        text = _candidate_content(raw_content)
        if _is_raw_json_dump(text):
            continue
        if estimate_tokens(text) < MIN_CANDIDATE_TOKENS:
            continue
        source_type = row["source_type"]
        source_path = str(row["source_path"])
        if source_type == "agent_session":
            title = _agent_session_title(source_path, raw_content)
            confidence = COMPACTION_CONFIDENCE if is_compaction else AGENT_SESSION_CONFIDENCE
        else:
            title = str(row["heading_path"] or Path(source_path).name)
            confidence = _confidence_for_source_type(source_type)
            # Only agent-session chunks may be treated as compaction records.
            is_compaction = False
        candidates.append(
            AutoMemoryCandidate(
                content=text,
                title=title,
                source_path=source_path,
                source_document_id=str(row["document_id"]),
                source_chunk_id=str(row["chunk_id"]),
                source_type=str(source_type) if source_type is not None else None,
                confidence=confidence,
                is_compaction=is_compaction,
            )
        )

    # Prioritise compaction records first, then longest exchange text, so that
    # per_source_limit slots are filled with the most informative chunks.
    candidates.sort(key=_sort_key_for_candidate)
    return candidates

245 

246 

def _sort_key_for_candidate(c: AutoMemoryCandidate) -> tuple[int, int, int]:
    """Build the ascending sort key used to prioritise candidates.

    Compaction records sort first, agent-session chunks precede other sources,
    and within each group larger estimated token counts come earlier.
    """
    compaction_rank = int(not c.is_compaction)
    source_rank = int(c.source_type != "agent_session")
    size_rank = -estimate_tokens(c.content)
    return (compaction_rank, source_rank, size_rank)

251 

252 

def _agent_session_title(source_path: str, chunk_content: str) -> str:
    """Build a human-readable title for an agent-session memory candidate.

    Compaction chunks are titled ``"Session Summary: <date>"``. Other chunks
    are titled ``"Session Exchange[ #<index>]: <snippet>"``, where the snippet
    is the first 60 characters of the chunk text after the metadata preamble.
    If there is no snippet, the date is appended in parentheses instead.

    Args:
        source_path: Document path; the part before ``"::"`` supplies the
            fallback date or stem.
        chunk_content: Raw chunk text, optionally with a metadata preamble.
    """
    session_id: str | None = None
    created_at: str | None = None
    is_compaction = False
    exchange_index: int | None = None

    meta_match = _RE_METADATA_LINE.search(chunk_content)
    if meta_match:
        try:
            meta = json.loads(meta_match.group(1))
        except json.JSONDecodeError:
            # Malformed metadata is ignored; the title falls back to the path.
            meta = None
        if isinstance(meta, dict):
            session_id = meta.get("session_id")
            created_at = meta.get("created_at") or meta.get("last_modified")
            is_compaction = bool(meta.get("is_compaction"))
            exchange_index = meta.get("exchange_index")

    # Date portion: prefer the timestamp truncated to its first 10 characters,
    # then a YYYY-MM-DD pattern found anywhere in the session id.
    date_str = ""
    if created_at:
        date_str = str(created_at)[:10]
    elif session_id:
        candidate_date = re.search(r'\d{4}-\d{2}-\d{2}', str(session_id))
        date_str = candidate_date.group(0) if candidate_date else ""

    if not date_str:
        # Last resort: a date embedded in the file stem, else the stem itself.
        stem = Path(source_path.split("::")[0]).stem
        date_match = re.search(r'\d{4}-\d{2}-\d{2}', stem)
        date_str = date_match.group(0) if date_match else stem

    if is_compaction:
        return f"Session Summary: {date_str}" if date_str else "Session Summary"

    # For normal exchanges use first 60 chars of user-visible text as suffix.
    text_after_meta = _candidate_content(chunk_content)
    snippet = text_after_meta[:60].replace("\n", " ").strip()
    label = (
        f"Session Exchange #{exchange_index}"
        if exchange_index is not None
        else "Session Exchange"
    )
    if snippet:
        return f"{label}: {snippet}"
    # Avoid a doubled space when there is no index and "()" when there is no date.
    return f"{label} ({date_str})" if date_str else label

294 

295 

def _connect_readonly(db_path: Path) -> sqlite3.Connection:
    """Open ``db_path`` as a read-only SQLite connection that yields ``sqlite3.Row`` rows.

    The path is converted with ``Path.as_uri()`` so that characters with a
    special meaning in URIs (``#``, ``?``, ``%``, spaces) are percent-encoded.
    Left unescaped, a ``#`` in the path would truncate it and drop ``mode=ro``.

    The caller is responsible for closing the returned connection.
    """
    uri = f"{db_path.resolve().as_uri()}?mode=ro"
    conn = sqlite3.connect(uri, uri=True)
    conn.row_factory = sqlite3.Row
    return conn

301 

302 

def _blocked_auto_memory_content_hashes_by_reason(db_path: Path) -> dict[str, str]:
    """Return content hashes that suppress auto memories, mapped to their reason.

    Considers ``memory_nodes`` rows of project ``'default'`` that are active,
    unverified, or obsolete auto-created nodes. When several rows share a
    hash, the reason with the lowest ``_duplicate_reason_priority`` wins.
    """
    conn = _connect_readonly(db_path)
    try:
        rows = conn.execute(
            """
            SELECT content_hash, status, source_kind, created_by
            FROM memory_nodes
            WHERE project_id = 'default'
              AND (
                status IN ('active', 'unverified')
                OR (status = 'obsolete' AND source_kind = 'auto' AND created_by = 'auto')
              )
            ORDER BY created_at, id
            """
        ).fetchall()
    finally:
        # Using a sqlite3 connection as a context manager only ends the
        # transaction; it never closes the connection, so close it explicitly.
        conn.close()

    blocked: dict[str, str] = {}
    for row in rows:
        row_hash = row["content_hash"]
        if not row_hash:
            # Rows without a stored hash cannot block anything.
            continue
        key = str(row_hash)
        reason = _duplicate_reason_for_row(row)
        existing = blocked.get(key)
        if (
            existing is None
            or _duplicate_reason_priority(reason) < _duplicate_reason_priority(existing)
        ):
            blocked[key] = reason
    return blocked

331 

332 

def _duplicate_reason_for_row(row: sqlite3.Row) -> str:
    """Translate a memory row's status into a duplicate reason.

    "active" and "unverified" map to themselves; any other status is
    reported as "rejected".
    """
    current = row["status"]
    for known in ("active", "unverified"):
        if current == known:
            return known
    return "rejected"

340 

341 

def _duplicate_reason_priority(reason: str) -> int:
    """Rank a duplicate reason; lower wins, and unknown reasons rank 99."""
    ranking = ("active", "unverified", "rejected")
    if reason in ranking:
        return ranking.index(reason)
    return 99

344 

345 

def _count_duplicate_reason(report: RefreshReport, reason: str) -> None:
    """Increment the report counter matching ``reason``; unknown reasons are ignored."""
    counter_by_reason = {
        "active": "auto_memory_duplicate_active",
        "unverified": "auto_memory_duplicate_unverified",
        "rejected": "auto_memory_duplicate_rejected",
    }
    counter_name = counter_by_reason.get(reason)
    if counter_name is None:
        return
    setattr(report, counter_name, getattr(report, counter_name) + 1)

353 

354 

def _candidate_content(chunk_content: str) -> str:
    """Return the chunk text with any leading ingestion-metadata preamble removed.

    The preamble ends at the first blank line. A chunk that is only a preamble
    (no blank line after it) yields an empty string.
    """
    stripped = chunk_content.strip()
    if not stripped.startswith("TRUENEX_INGESTION_METADATA "):
        return stripped
    separator = re.search(r"\n\s*\n", stripped)
    if separator is None:
        return ""
    return stripped[separator.end():].strip()

364 

365 

def _is_raw_json_dump(text: str) -> bool:
    """Report whether the first 300 characters of ``text`` match the JSON-dump pattern."""
    head = text[:300]
    return _RE_JSON_DUMP.match(head) is not None

368 

369 

def _is_noisy_agent_session_candidate(text: str) -> bool:
    """Decide whether a transcript fragment should stay indexed but not be promoted.

    Agent sessions are valuable as source-grounded chunks, yet generated memory
    nodes are meant to hold distilled facts. Resume wrappers, message
    inventories, raw speaker-tagged turns, and command-only snippets are
    treated as noise.
    """
    clean = text.strip()
    if "continue the conversation from where it left off" in clean.lower():
        return True
    return bool(
        _RE_ALL_USER_MESSAGES.search(clean)
        or _RE_AGENT_TRANSCRIPT_LINE.search(clean)
        or _looks_like_command_snippet(clean)
    )

388 

389 

def _looks_like_command_snippet(text: str) -> bool:
    """Report whether ``text`` is mostly a numbered list of command steps.

    True when at least two numbered lines match the command pattern and they
    make up at least 66% of all numbered lines.
    """
    numbered_total = 0
    command_total = 0
    for raw_line in text.splitlines():
        if not re.match(r'^\s*\d+\.', raw_line):
            continue
        numbered_total += 1
        if _RE_NUMBERED_COMMAND_LINE.match(raw_line.strip()):
            command_total += 1

    if numbered_total < 2:
        return False
    return command_total >= 2 and (command_total / numbered_total) >= 0.66

402 

403 

def _confidence_for_source_type(source_type: object) -> float:
    """Map a source type to its confidence, using the default for unknown types."""
    return (
        PROJECT_DOCS_CONFIDENCE
        if source_type == "project_docs"
        else AGENT_SESSION_CONFIDENCE
        if source_type == "agent_session"
        else DEFAULT_CONFIDENCE
    )