Coverage for src \ truenex_memory \ ingestion \ global_auto_memory.py: 91%
237 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-19 10:21 +0200
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-19 10:21 +0200
1"""Conservative unverified auto-memory generation for Phase 3.4."""
3from __future__ import annotations
5from dataclasses import dataclass
6from pathlib import Path
7import json
8import re
9import sqlite3
11from truenex_memory.core.chunker import content_hash, estimate_tokens
12from truenex_memory.ingestion.global_refresh import RefreshReport
13from truenex_memory.store.repository import MemoryRepository
# Confidence scores attached to candidates, chosen by where the chunk came from.
PROJECT_DOCS_CONFIDENCE = 0.80
AGENT_SESSION_CONFIDENCE = 0.60
COMPACTION_CONFIDENCE = 0.75
DEFAULT_CONFIDENCE = 0.50

# Chunks whose estimated token count is below this never become candidates.
MIN_CANDIDATE_TOKENS = 8

# Default caps on memories created (or planned) per run and per source path.
DEFAULT_AUTO_MEMORY_LIMIT = 300
DEFAULT_AUTO_MEMORY_PER_SOURCE_LIMIT = 8

# File suffixes accepted for candidates that are not agent-session chunks.
AUTO_MEMORY_EXTENSIONS = frozenset((".md", ".markdown", ".txt", ".rst"))

# Detects the compaction flag of a TRUENEX_INGESTION_METADATA preamble without
# a full JSON parse; "is_compaction": true may sit anywhere in the JSON object.
_RE_IS_COMPACTION = re.compile(r'"is_compaction"\s*:\s*true')

# Captures the JSON object that follows the metadata marker on its own line.
_RE_METADATA_LINE = re.compile(
    r'^TRUENEX_INGESTION_METADATA\s+(\{.*\})',
    flags=re.MULTILINE,
)

# Text that opens with "{" and later contains a "type" key.
_RE_JSON_DUMP = re.compile(r'^\s*\{.*"type"\s*:', flags=re.DOTALL)

# A line that begins with a "[user]:" or "[assistant]:" speaker tag.
_RE_AGENT_TRANSCRIPT_LINE = re.compile(
    r'(^|\n)\s*\[(?:user|assistant)\]:',
    flags=re.IGNORECASE,
)

# A numbered "All user messages:" heading.
_RE_ALL_USER_MESSAGES = re.compile(
    r'(^|\n)\s*\d+\.\s+all user messages\s*:',
    flags=re.IGNORECASE,
)

# A numbered step that opens with an action verb and contains a back-ticked
# span mentioning one of the listed command-line tools.
_RE_NUMBERED_COMMAND_LINE = re.compile(
    r'^\s*\d+\.\s*'
    r'(?:build|check|copy|deploy|execute|find|lancia|mostra|open|restore|run|'
    r'show|start|stop|trova|verifica|verify)\b'
    r'.*`[^`]*(?:cargo|cp|docker|find|git|grep|ls|npm|powershell|ssh)[^`]*`',
    flags=re.IGNORECASE,
)
@dataclass(frozen=True)
class AutoMemoryCandidate:
    """Immutable record describing one chunk proposed as an unverified memory node."""

    # Text proposed as the memory body.
    content: str
    # Display title for the memory node.
    title: str
    # Path of the document the chunk belongs to.
    source_path: str
    # Identifier of the originating document, stored as a string.
    source_document_id: str
    # Identifier of the originating chunk, stored as a string.
    source_chunk_id: str
    # Source type label for the document's source, or None when absent.
    source_type: str | None
    # Score compared against the caller's minimum-confidence threshold.
    confidence: float
    # True when the chunk was flagged as a compaction record.
    is_compaction: bool
@dataclass(frozen=True)
class AutoMemoryTelemetry:
    """Immutable bundle of Auto Memory candidate quality counters."""

    candidates: int = 0
    duplicate_skips: int = 0
    duplicate_active: int = 0
    duplicate_unverified: int = 0
    duplicate_rejected: int = 0
    low_confidence: int = 0
    non_document_skipped: int = 0
    noisy_session_skipped: int = 0

    def to_dict(self) -> dict[str, int]:
        """Return the counters as a plain dict keyed by counter name."""
        counter_names = (
            "candidates",
            "duplicate_skips",
            "duplicate_active",
            "duplicate_unverified",
            "duplicate_rejected",
            "low_confidence",
            "non_document_skipped",
            "noisy_session_skipped",
        )
        return {name: getattr(self, name) for name in counter_names}
def generate_unverified_auto_memories(
    db_path: Path,
    report: RefreshReport,
    *,
    dry_run: bool,
    min_confidence: float = DEFAULT_CONFIDENCE,
    limit: int = DEFAULT_AUTO_MEMORY_LIMIT,
    per_source_limit: int = DEFAULT_AUTO_MEMORY_PER_SOURCE_LIMIT,
) -> None:
    """Generate exact-deduped unverified memory nodes from active chunks.

    Each candidate is filtered, checked against the content hashes that already
    block auto memories, checked against ``min_confidence`` and the two caps,
    and then stored as an ``unverified`` note. Every outcome is tallied on
    ``report`` through its ``auto_memory_*`` counters.

    Args:
        db_path: SQLite database to read from; nothing happens when the file
            does not exist.
        report: Refresh report whose ``auto_memory_*`` counters are incremented
            in place.
        dry_run: When true, count what would be created without writing.
        min_confidence: Candidates scoring below this are counted as
            low-confidence and skipped.
        limit: Maximum memories created (or planned) in this call; values
            ``<= 0`` disable the cap.
        per_source_limit: Maximum memories created (or planned) per source
            path; values ``<= 0`` disable the cap.
    """
    if not db_path.exists():
        return

    repository = MemoryRepository(db_path)
    created_or_planned = 0
    created_or_planned_by_source: dict[str, int] = {}
    blocked_hashes = _blocked_auto_memory_content_hashes_by_reason(db_path)
    for candidate in _iter_candidates(db_path):
        # agent_session chunks are always valid candidates regardless of extension.
        if candidate.source_type != "agent_session":
            if Path(candidate.source_path).suffix.lower() not in AUTO_MEMORY_EXTENSIONS:
                report.auto_memory_non_document_skipped += 1
                continue
        elif _is_noisy_agent_session_candidate(candidate.content):
            report.auto_memory_noisy_session_skipped += 1
            continue

        report.auto_memory_candidates += 1
        candidate_hash = content_hash(candidate.content)
        duplicate_reason = blocked_hashes.get(candidate_hash)
        if duplicate_reason:
            report.auto_memory_duplicates += 1
            _count_duplicate_reason(report, duplicate_reason)
            continue
        if candidate.confidence < min_confidence:
            report.auto_memory_low_confidence += 1
            continue

        source_count = created_or_planned_by_source.get(candidate.source_path, 0)
        if per_source_limit > 0 and source_count >= per_source_limit:
            report.auto_memory_source_limit_skipped += 1
            continue
        if limit > 0 and created_or_planned >= limit:
            report.auto_memory_limit_skipped += 1
            continue

        created_or_planned += 1
        created_or_planned_by_source[candidate.source_path] = source_count + 1
        report.auto_memory_created += 1
        if not dry_run:
            repository.add_memory(
                candidate.content,
                memory_type="note",
                title=candidate.title,
                status="unverified",
                source_kind="auto",
                source_document_id=candidate.source_document_id,
                source_chunk_id=candidate.source_chunk_id,
                source_path=candidate.source_path,
                created_by="auto",
                confidence=candidate.confidence,
            )
        # Register the hash in dry-run mode as well, so a later candidate with
        # identical content is reported as a duplicate exactly as it would be
        # in a real run, instead of being counted as created a second time.
        blocked_hashes[candidate_hash] = "unverified"
def analyze_auto_memory_candidates(
    db_path: Path,
    *,
    min_confidence: float = DEFAULT_CONFIDENCE,
) -> AutoMemoryTelemetry:
    """Compute read-only quality counters for Auto Memory candidates.

    Returns an all-zero telemetry object when the database file is missing or
    when loading the blocked hashes or the candidates raises
    ``sqlite3.DatabaseError``.
    """
    if not db_path.exists():
        return AutoMemoryTelemetry()

    try:
        blocked_hashes = _blocked_auto_memory_content_hashes_by_reason(db_path)
        candidates = _iter_candidates(db_path)
    except sqlite3.DatabaseError:
        return AutoMemoryTelemetry()

    tally = dict.fromkeys(
        (
            "candidates",
            "duplicate_skips",
            "duplicate_active",
            "duplicate_unverified",
            "duplicate_rejected",
            "low_confidence",
            "non_document_skipped",
            "noisy_session_skipped",
        ),
        0,
    )
    for item in candidates:
        if item.source_type == "agent_session":
            # Agent-session chunks skip the extension check but must not be noisy.
            if _is_noisy_agent_session_candidate(item.content):
                tally["noisy_session_skipped"] += 1
                continue
        elif Path(item.source_path).suffix.lower() not in AUTO_MEMORY_EXTENSIONS:
            tally["non_document_skipped"] += 1
            continue

        tally["candidates"] += 1
        reason = blocked_hashes.get(content_hash(item.content))
        if reason:
            tally["duplicate_skips"] += 1
            tally[f"duplicate_{reason}"] += 1
        elif item.confidence < min_confidence:
            tally["low_confidence"] += 1

    return AutoMemoryTelemetry(**tally)
def _iter_candidates(db_path: Path) -> list[AutoMemoryCandidate]:
    """Load candidate memories from chunks whose source ledger entry is active.

    Chunks are skipped when their text (after removing any ingestion-metadata
    preamble) looks like a raw JSON dump or is estimated below
    ``MIN_CANDIDATE_TOKENS``. The returned list is ordered by
    ``_sort_key_for_candidate``.

    Args:
        db_path: SQLite database to read; opened read-only.

    Returns:
        The surviving candidates, already sorted by priority.

    Raises:
        sqlite3.Error: Propagated from opening or querying the database.
    """
    conn = _connect_readonly(db_path)
    try:
        rows = conn.execute(
            """
            SELECT
                c.id AS chunk_id,
                c.document_id,
                c.heading_path,
                c.content,
                d.path AS source_path,
                sl.source_type
            FROM chunks c
            JOIN documents d ON d.id = c.document_id
            JOIN source_ledger sl ON sl.source_path_or_alias = d.path
            WHERE sl.status = 'active'
            ORDER BY d.path, c.chunk_index
            """
        ).fetchall()
    finally:
        # Using a sqlite3 connection as a context manager only commits or rolls
        # back; it never closes, so close explicitly to release the handle.
        conn.close()

    candidates: list[AutoMemoryCandidate] = []
    for row in rows:
        raw_content = str(row["content"])
        is_compaction = bool(_RE_IS_COMPACTION.search(raw_content))
        text = _candidate_content(raw_content)
        if _is_raw_json_dump(text):
            continue
        if estimate_tokens(text) < MIN_CANDIDATE_TOKENS:
            continue

        source_type = row["source_type"]
        source_path = str(row["source_path"])
        if source_type == "agent_session":
            title = _agent_session_title(source_path, raw_content)
            confidence = COMPACTION_CONFIDENCE if is_compaction else AGENT_SESSION_CONFIDENCE
        else:
            title = str(row["heading_path"] or Path(source_path).name)
            confidence = _confidence_for_source_type(source_type)
            # The compaction flag only applies to agent-session chunks.
            is_compaction = False

        candidates.append(
            AutoMemoryCandidate(
                content=text,
                title=title,
                source_path=source_path,
                source_document_id=str(row["document_id"]),
                source_chunk_id=str(row["chunk_id"]),
                source_type=str(source_type) if source_type is not None else None,
                confidence=confidence,
                is_compaction=is_compaction,
            )
        )

    # Prioritise compaction records first, then longest exchange text, so that
    # per_source_limit slots are filled with the most informative chunks.
    candidates.sort(key=_sort_key_for_candidate)
    return candidates
def _sort_key_for_candidate(c: AutoMemoryCandidate) -> tuple[int, int, int]:
    """Return the ordering key used to prioritise candidates.

    Compaction records sort first, agent-session chunks sort ahead of other
    sources, and within those groups larger estimated token counts sort first.
    """
    compaction_rank = int(not c.is_compaction)
    source_rank = int(c.source_type != "agent_session")
    size_rank = -estimate_tokens(c.content)
    return (compaction_rank, source_rank, size_rank)
def _agent_session_title(source_path: str, chunk_content: str) -> str:
    """Build a human-readable title for an agent-session memory candidate.

    Metadata is read from the JSON object on the chunk's
    ``TRUENEX_INGESTION_METADATA`` line when present and parseable; otherwise
    the title falls back to what can be derived from ``source_path``.

    Args:
        source_path: Document path; only the part before the first ``::`` is
            used when deriving a date from the file name.
        chunk_content: Raw chunk text, including any metadata preamble.

    Returns:
        ``"Session Summary: <date>"`` for compaction chunks, otherwise
        ``"Session Exchange[ #<n>]: <snippet>"``, or
        ``"Session Exchange[ #<n>] (<date>)"`` when there is no snippet text.
    """
    meta: dict = {}
    meta_match = _RE_METADATA_LINE.search(chunk_content)
    if meta_match:
        try:
            parsed = json.loads(meta_match.group(1))
        except json.JSONDecodeError:
            parsed = None
        # Valid JSON that is not an object carries no usable fields.
        if isinstance(parsed, dict):
            meta = parsed

    session_id = meta.get("session_id")
    created_at = meta.get("created_at") or meta.get("last_modified")
    is_compaction = bool(meta.get("is_compaction"))
    exchange_index = meta.get("exchange_index")

    # Date portion: prefer the timestamp truncated to its first 10 characters,
    # then a YYYY-MM-DD pattern found anywhere in the session id.
    date_str = ""
    if created_at:
        date_str = str(created_at)[:10]
    elif session_id:
        id_date = re.search(r'\d{4}-\d{2}-\d{2}', str(session_id))
        date_str = id_date.group(0) if id_date else ""

    if not date_str:
        # Last resort: a date embedded in the file stem, or the stem itself.
        stem = Path(source_path.split("::")[0]).stem
        stem_date = re.search(r'\d{4}-\d{2}-\d{2}', stem)
        date_str = stem_date.group(0) if stem_date else stem

    if is_compaction:
        return f"Session Summary: {date_str}" if date_str else "Session Summary"

    # For normal exchanges use the first 60 chars of the text after the
    # metadata preamble as the suffix.
    snippet = _candidate_content(chunk_content)[:60].replace("\n", " ").strip()
    label = (
        "Session Exchange"
        if exchange_index is None
        else f"Session Exchange #{exchange_index}"
    )
    if snippet:
        return f"{label}: {snippet}"
    # Built from the label so a missing exchange index does not leave a
    # double space before the parenthesised date.
    return f"{label} ({date_str})"
def _connect_readonly(db_path: Path) -> sqlite3.Connection:
    """Open ``db_path`` as a read-only SQLite connection yielding ``sqlite3.Row`` rows.

    Args:
        db_path: Location of the database file; resolved to an absolute path.

    Returns:
        An open connection. The caller is responsible for closing it.

    Raises:
        sqlite3.OperationalError: If the database cannot be opened.
    """
    # Path.as_uri() percent-encodes characters such as '#', '?', '%' and
    # spaces. Interpolating the raw path would let '#' or '?' end the path
    # component early, which can also drop the mode=ro parameter.
    uri = f"{db_path.resolve().as_uri()}?mode=ro"
    conn = sqlite3.connect(uri, uri=True)
    conn.row_factory = sqlite3.Row
    return conn
def _blocked_auto_memory_content_hashes_by_reason(db_path: Path) -> dict[str, str]:
    """Return content hashes that suppress auto memories, grouped by reason.

    Only ``memory_nodes`` rows of project ``'default'`` are considered: every
    ``active`` or ``unverified`` node, plus ``obsolete`` nodes that were both
    sourced and created by ``'auto'``. When several rows share a hash, the
    reason with the lowest ``_duplicate_reason_priority`` value wins.

    Args:
        db_path: SQLite database to read; opened read-only.

    Returns:
        Mapping of content hash to ``"active"``, ``"unverified"`` or
        ``"rejected"``.

    Raises:
        sqlite3.Error: Propagated from opening or querying the database.
    """
    conn = _connect_readonly(db_path)
    try:
        rows = conn.execute(
            """
            SELECT content_hash, status, source_kind, created_by
            FROM memory_nodes
            WHERE project_id = 'default'
              AND (
                status IN ('active', 'unverified')
                OR (status = 'obsolete' AND source_kind = 'auto' AND created_by = 'auto')
              )
            ORDER BY created_at, id
            """
        ).fetchall()
    finally:
        # Using a sqlite3 connection as a context manager only commits or rolls
        # back; it never closes, so close explicitly to release the handle.
        conn.close()

    blocked: dict[str, str] = {}
    for row in rows:
        row_hash = row["content_hash"]
        if not row_hash:
            # Rows without a hash cannot block anything.
            continue
        key = str(row_hash)
        reason = _duplicate_reason_for_row(row)
        existing = blocked.get(key)
        if (
            existing is None
            or _duplicate_reason_priority(reason) < _duplicate_reason_priority(existing)
        ):
            blocked[key] = reason
    return blocked
def _duplicate_reason_for_row(row: sqlite3.Row) -> str:
    """Map a row's ``status`` value to a duplicate reason.

    ``active`` and ``unverified`` map to themselves; any other status is
    reported as ``rejected``.
    """
    status = row["status"]
    for known in ("active", "unverified"):
        if status == known:
            return known
    return "rejected"
def _duplicate_reason_priority(reason: str) -> int:
    """Return the rank of a duplicate reason; lower ranks win, unknown reasons get 99."""
    ranking = ("active", "unverified", "rejected")
    if reason in ranking:
        return ranking.index(reason)
    return 99
def _count_duplicate_reason(report: RefreshReport, reason: str) -> None:
    """Increment the report counter that matches ``reason``; other reasons are ignored."""
    counter_by_reason = {
        "active": "auto_memory_duplicate_active",
        "unverified": "auto_memory_duplicate_unverified",
        "rejected": "auto_memory_duplicate_rejected",
    }
    counter_name = counter_by_reason.get(reason)
    if counter_name is None:
        return
    setattr(report, counter_name, getattr(report, counter_name) + 1)
def _candidate_content(chunk_content: str) -> str:
    """Return the chunk text with any leading ingestion-metadata preamble removed.

    Text that does not start with the metadata marker is only stripped of
    surrounding whitespace. When the marker is present, everything up to the
    first blank line is dropped; if no blank line follows, the result is empty.
    """
    stripped = chunk_content.strip()
    if not stripped.startswith("TRUENEX_INGESTION_METADATA "):
        return stripped
    pieces = re.split(r"\n\s*\n", stripped, maxsplit=1)
    return pieces[1].strip() if len(pieces) == 2 else ""
def _is_raw_json_dump(text: str) -> bool:
    """Report whether the first 300 characters of ``text`` match ``_RE_JSON_DUMP``."""
    head = text[:300]
    return _RE_JSON_DUMP.match(head) is not None
def _is_noisy_agent_session_candidate(text: str) -> bool:
    """Return True for transcript fragments that should stay indexed, not promoted.

    Agent sessions are useful as source-grounded chunks, but generated memory
    nodes should capture distilled facts. Resume wrappers, message
    inventories, raw speaker-tagged turns, and command-only snippets are
    treated as noise.
    """
    body = text.strip()
    return (
        "continue the conversation from where it left off" in body.lower()
        or _RE_ALL_USER_MESSAGES.search(body) is not None
        or _RE_AGENT_TRANSCRIPT_LINE.search(body) is not None
        or _looks_like_command_snippet(body)
    )
def _looks_like_command_snippet(text: str) -> bool:
    """Report whether ``text`` is dominated by numbered command steps.

    True when at least two numbered lines match ``_RE_NUMBERED_COMMAND_LINE``
    and those make up at least 66% of all numbered lines.
    """
    numbered_total = 0
    command_total = 0
    for raw_line in text.splitlines():
        if not re.match(r'^\s*\d+\.', raw_line):
            continue
        numbered_total += 1
        if _RE_NUMBERED_COMMAND_LINE.match(raw_line.strip()):
            command_total += 1

    if numbered_total < 2:
        return False
    return command_total >= 2 and (command_total / numbered_total) >= 0.66
def _confidence_for_source_type(source_type: object) -> float:
    """Return the confidence for ``source_type``, or ``DEFAULT_CONFIDENCE`` if unrecognised."""
    known_confidences = (
        ("project_docs", PROJECT_DOCS_CONFIDENCE),
        ("agent_session", AGENT_SESSION_CONFIDENCE),
    )
    for type_name, confidence in known_confidences:
        if source_type == type_name:
            return confidence
    return DEFAULT_CONFIDENCE