Coverage for src \ truenex_memory \ ingestion \ global_search.py: 84%
174 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-19 10:21 +0200
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-19 10:21 +0200
1"""Read-only keyword search for the Truenex Memory global store."""
3from __future__ import annotations
5from dataclasses import dataclass, field
6from pathlib import Path
7import re
8import sqlite3
# Default cap on the number of hits a search returns (the ``top_k`` default).
DEFAULT_GLOBAL_SEARCH_LIMIT = 10
# Default maximum length, in characters, of a hit's whitespace-normalized excerpt.
DEFAULT_EXCERPT_CHARS = 320
# Memory-node statuses that are searched unless inactive nodes are requested.
ACTIVE_MEMORY_STATUSES = ("active", "unverified")
# Source-ledger statuses whose documents are left out of chunk search.
EXCLUDED_LEDGER_STATUSES = ("missing", "skipped")
# Chunk content starting with this marker carries a metadata preamble that is
# stripped before scoring and display.
METADATA_MARKER = "TRUENEX_INGESTION_METADATA"
# Accepted values for the ``kind_filter`` argument of ``build_global_search``.
GLOBAL_SEARCH_KINDS = frozenset(("all", "memory", "chunks"))
@dataclass(frozen=True)
class GlobalSearchHit:
    """A single immutable result produced by a global search.

    The search helpers in this module create hits for two kinds of rows:
    memory nodes (``kind="memory_node"``) and document chunks
    (``kind="document_chunk"``).
    """

    # Row identifier, stringified.
    id: str
    # "memory_node" or "document_chunk".
    kind: str
    title: str
    # Full text of the hit and its shortened, whitespace-normalized form.
    content: str
    content_excerpt: str
    source_path: str | None
    # Only chunk hits carry a heading path; memory-node hits set it to None.
    heading_path: str | None
    # The node's ``type`` column for memory nodes, "document_chunk" for chunks.
    memory_type: str
    status: str
    score: float
    # Provenance and confidence are filled in for memory-node hits only;
    # chunk hits leave them at their None defaults.
    source_kind: str | None = None
    source_document_id: str | None = None
    source_chunk_id: str | None = None
    confidence: float | None = None

    def to_dict(self) -> dict[str, object]:
        """Return the hit as a plain dict keyed by field name, in field order."""
        ordered_keys = (
            "id",
            "kind",
            "title",
            "content",
            "content_excerpt",
            "source_path",
            "heading_path",
            "memory_type",
            "status",
            "score",
            "source_kind",
            "source_document_id",
            "source_chunk_id",
            "confidence",
        )
        return {key: getattr(self, key) for key in ordered_keys}
@dataclass
class GlobalSearchReport:
    """Outcome of one read-only keyword search over the global store.

    Carries the request parameters, the selected hits, and any warnings
    collected while searching.
    """

    query: str
    db_path: str
    db_exists: bool
    top_k: int = DEFAULT_GLOBAL_SEARCH_LIMIT
    include_inactive: bool = False
    kind_filter: str = "all"
    result_count: int = 0
    results: list[GlobalSearchHit] = field(default_factory=list)
    warnings: list[str] = field(default_factory=list)

    def to_dict(self) -> dict[str, object]:
        """Return the report as a plain dict, with each hit converted via ``to_dict``."""
        serialized_hits = [hit.to_dict() for hit in self.results]
        return dict(
            query=self.query,
            db_path=self.db_path,
            db_exists=self.db_exists,
            top_k=self.top_k,
            include_inactive=self.include_inactive,
            kind_filter=self.kind_filter,
            result_count=self.result_count,
            results=serialized_hits,
            # The warnings list is handed out as-is, not copied.
            warnings=self.warnings,
        )
def build_global_search(
    db_path: Path,
    query: str,
    *,
    top_k: int = DEFAULT_GLOBAL_SEARCH_LIMIT,
    include_inactive: bool = False,
    kind_filter: str = "all",
    excerpt_chars: int = DEFAULT_EXCERPT_CHARS,
) -> GlobalSearchReport:
    """Search the global SQLite store without creating or mutating anything.

    Args:
        db_path: Location of the SQLite database file.
        query: Free-text query; it is tokenized with ``tokenize_set``.
        top_k: Maximum number of hits to keep; must be at least 1.
        include_inactive: When true, memory nodes of every status are searched
            instead of only the active statuses.
        kind_filter: One of ``GLOBAL_SEARCH_KINDS``; restricts the search to
            memory nodes, document chunks, or both.
        excerpt_chars: Maximum excerpt length per hit; must be at least 80.

    Returns:
        A report holding the top hits, ordered by descending score, then
        memory nodes before chunks, then title, then id. Recoverable problems
        (no searchable tokens, missing database, unopenable database, missing
        tables, failed query) are reported through ``report.warnings`` rather
        than raised.

    Raises:
        ValueError: If ``top_k``, ``excerpt_chars`` or ``kind_filter`` is invalid.
    """
    if top_k < 1:
        raise ValueError("top_k must be greater than zero")
    if excerpt_chars < 80:
        raise ValueError("excerpt_chars must be at least 80")
    if kind_filter not in GLOBAL_SEARCH_KINDS:
        expected = ", ".join(sorted(GLOBAL_SEARCH_KINDS))
        raise ValueError(f"invalid kind_filter {kind_filter!r}; expected one of {expected}")

    # Probe the filesystem exactly once so the report's ``db_exists`` flag and
    # the "database not found" warning below can never contradict each other.
    db_exists = db_path.exists()
    report = GlobalSearchReport(
        query=query,
        db_path=str(db_path),
        db_exists=db_exists,
        top_k=top_k,
        include_inactive=include_inactive,
        kind_filter=kind_filter,
    )

    tokens = tokenize_set(query)
    if not tokens:
        report.warnings.append("query has no searchable tokens")
        return report
    if not db_exists:
        report.warnings.append("database not found")
        return report

    try:
        conn = _connect_readonly(db_path)
    except Exception:
        # Deliberately broad: any failure to resolve or open the file is
        # reported as a warning instead of aborting the caller.
        report.warnings.append("database exists but cannot be opened read-only")
        return report

    wants_memory = kind_filter in ("all", "memory")
    wants_chunks = kind_filter in ("all", "chunks")
    try:
        hits: list[GlobalSearchHit] = []
        if wants_memory:
            if _table_exists(conn, "memory_nodes"):
                hits.extend(_search_memory_nodes(conn, tokens, include_inactive, excerpt_chars))
            else:
                report.warnings.append("memory_nodes table not found")

        if wants_chunks:
            # Chunk search joins chunks to documents, so both tables are required.
            if _table_exists(conn, "chunks") and _table_exists(conn, "documents"):
                hits.extend(_search_chunks(conn, tokens, excerpt_chars))
            else:
                report.warnings.append("documents/chunks tables not found")

        hits.sort(key=lambda item: (-item.score, _kind_rank(item.kind), item.title, item.id))
        report.results = hits[:top_k]
        report.result_count = len(report.results)
    except sqlite3.DatabaseError:
        report.warnings.append("database readable but global search query failed")
    finally:
        conn.close()

    return report
def format_global_search_report(report: GlobalSearchReport) -> str:
    """Render *report* as newline-joined text for terminal display.

    The output has a header (query, database, kind filter, result count),
    one numbered entry per hit, and a trailing warnings section when the
    report carries warnings.
    """
    out: list[str] = [
        f"Global Search: {report.query}",
        "=" * 60,
        f"Database: {report.db_path}",
    ]
    if not report.db_exists:
        out.append(" (not found)")
    out.append(f"Kind: {report.kind_filter}")
    out.append(f"Results: {report.result_count} / top_k {report.top_k}")
    if report.include_inactive:
        out.append("Inactive memory statuses: included")

    if report.results:
        out.append("")
        for position, hit in enumerate(report.results, start=1):
            # Confidence is only shown for hits that actually carry one.
            if hit.confidence is None:
                confidence_suffix = ""
            else:
                confidence_suffix = f" confidence={hit.confidence:.2f}"
            headline = (
                f"{position}. {hit.score:.4f} {hit.title} "
                f"[{hit.kind}/{hit.memory_type}/{hit.status}]{confidence_suffix}"
            )
            out.append(headline)
            if hit.source_path:
                out.append(f" source: {hit.source_path}")
            if hit.heading_path:
                out.append(f" heading: {hit.heading_path}")
            out.append(f" {hit.content_excerpt}")

    if report.warnings:
        out.extend(["", "Warnings:"])
        out.extend(f" - {warning}" for warning in report.warnings)

    return "\n".join(out)
def _search_memory_nodes(
    conn: sqlite3.Connection,
    tokens: set[str],
    include_inactive: bool,
    excerpt_chars: int,
) -> list[GlobalSearchHit]:
    """Return a hit for every memory node sharing at least one token with the query.

    A node's searchable text is its title, content and source path. The score
    is the fraction of query tokens found in that text, scaled to 0-10 and
    rounded to four decimals. Unless ``include_inactive`` is set, only nodes
    whose status is in ``ACTIVE_MEMORY_STATUSES`` are considered.
    """

    def _text_or_none(value: object) -> str | None:
        # Keep SQL NULLs as None instead of turning them into the string "None".
        return None if value is None else str(value)

    if include_inactive:
        cursor = conn.execute("SELECT * FROM memory_nodes")
    else:
        cursor = conn.execute(
            "SELECT * FROM memory_nodes WHERE status IN (?, ?)",
            ACTIVE_MEMORY_STATUSES,
        )

    found: list[GlobalSearchHit] = []
    for node in cursor.fetchall():
        body = str(node["content"] or "")
        heading = str(node["title"] or "")
        searchable = f"{heading} {body} {node['source_path'] or ''}"
        matched = tokens & tokenize_set(searchable)
        if not matched:
            continue

        raw_confidence = node["confidence"]
        found.append(
            GlobalSearchHit(
                id=str(node["id"]),
                kind="memory_node",
                title=heading,
                content=body,
                content_excerpt=_excerpt(body, excerpt_chars),
                source_path=_text_or_none(node["source_path"]),
                heading_path=None,
                memory_type=str(node["type"]),
                status=str(node["status"]),
                score=round(len(matched) / len(tokens) * 10.0, 4),
                source_kind=_text_or_none(node["source_kind"]),
                source_document_id=_text_or_none(node["source_document_id"]),
                source_chunk_id=_text_or_none(node["source_chunk_id"]),
                confidence=None if raw_confidence is None else float(raw_confidence),
            )
        )
    return found
def _search_chunks(
    conn: sqlite3.Connection,
    tokens: set[str],
    excerpt_chars: int,
) -> list[GlobalSearchHit]:
    """Return BM25-ranked hits for document chunks matching the query tokens.

    Chunk content is scored after its metadata preamble has been stripped.
    Each positive BM25 score is multiplied by ``source_boost`` for the row's
    ``source_type`` (when that column is present) and rounded to six decimals.
    When a ``source_ledger`` table exists, chunks whose document is recorded
    there with a status in ``EXCLUDED_LEDGER_STATUSES`` are left out.
    """
    if _table_exists(conn, "source_ledger"):
        # NOTE(review): under SQL three-valued logic a ledger row whose status
        # is NULL also fails the NOT IN test, so its chunks are excluded too —
        # confirm that is intended (or that status is declared NOT NULL).
        sql = """
            SELECT c.*, d.path, d.filename, sl.status AS ledger_status
            FROM chunks c
            JOIN documents d ON d.id = c.document_id
            LEFT JOIN source_ledger sl ON sl.source_path_or_alias = d.path
            WHERE sl.source_path_or_alias IS NULL OR sl.status NOT IN (?, ?)
            """
        params: tuple[str, ...] = EXCLUDED_LEDGER_STATUSES
    else:
        sql = """
            SELECT c.*, d.path, d.filename, NULL AS ledger_status
            FROM chunks c
            JOIN documents d ON d.id = c.document_id
            """
        params = ()
    rows = conn.execute(sql, params).fetchall()

    if not rows:
        return []

    bodies = [_strip_metadata_preamble(str(row["content"] or "")) for row in rows]
    ranker = BM25([tokenize(body) for body in bodies])
    raw_scores = ranker.get_scores(list(tokens))

    found: list[GlobalSearchHit] = []
    for row, raw_score, body in zip(rows, raw_scores, bodies):
        if raw_score <= 0:
            continue

        # The chunks table may not have a source_type column; treat a missing
        # column the same as an unset value.
        try:
            source_type = row["source_type"]
        except (IndexError, KeyError):
            source_type = None

        heading = row["heading_path"]
        location = row["path"]
        # Prefer the heading, then the stored filename, then the path's basename.
        label = str(heading or row["filename"] or Path(str(location)).name)
        found.append(
            GlobalSearchHit(
                id=str(row["id"]),
                kind="document_chunk",
                title=label,
                content=body,
                content_excerpt=_excerpt(body, excerpt_chars),
                source_path=None if location is None else str(location),
                heading_path=None if heading is None else str(heading),
                memory_type="document_chunk",
                status="active",
                score=round(raw_score * source_boost(source_type), 6),
            )
        )
    return found
def _connect_readonly(db_path: Path) -> sqlite3.Connection:
    """Open *db_path* as a read-only SQLite connection.

    Args:
        db_path: Database file to open; it is resolved to an absolute path.

    Returns:
        A connection opened through a ``mode=ro`` URI whose rows are
        ``sqlite3.Row`` objects, so columns can be read by name.

    Raises:
        sqlite3.Error: If SQLite cannot open the file.
    """
    # Path.as_uri() percent-encodes the path. Splicing the raw path into the
    # URI would let a "#" start a fragment (silently dropping "?mode=ro" and
    # opening a different file read-write), and "?" or "%HH" sequences in the
    # path would likewise be misread by SQLite's URI parser.
    uri = f"{db_path.resolve().as_uri()}?mode=ro"
    conn = sqlite3.connect(uri, uri=True)
    conn.row_factory = sqlite3.Row
    return conn
def _table_exists(conn: sqlite3.Connection, table_name: str) -> bool:
    """Return True when ``sqlite_master`` lists a table named *table_name*.

    Only entries of type ``table`` count; a view with that name does not.
    """
    lookup = conn.execute(
        "SELECT name FROM sqlite_master WHERE type='table' AND name = ?",
        (table_name,),
    )
    return lookup.fetchone() is not None
315from truenex_memory.retrieval.scoring import BM25, tokenize, tokenize_set, source_boost
def _excerpt(content: str, max_chars: int) -> str:
    """Collapse whitespace in *content* and cap it at *max_chars* characters.

    Text that fits is returned in full. Longer text is cut to leave room for
    a trailing ``...`` and stripped of trailing whitespace before the
    ellipsis is added.
    """
    collapsed = " ".join(content.split())
    if len(collapsed) > max_chars:
        head = collapsed[: max_chars - 3]
        return head.rstrip() + "..."
    return collapsed
def _strip_metadata_preamble(content: str) -> str:
    """Drop a leading ingestion-metadata block from *content*, if present.

    Content that does not start (after leading whitespace) with
    ``METADATA_MARKER`` is returned untouched. Otherwise everything up to and
    including the first blank line is removed. If no blank line follows the
    marker, the original content is returned unchanged.
    """
    stripped = content.lstrip()
    if not stripped.startswith(METADATA_MARKER):
        return content

    head_and_body = re.split(r"\r?\n\s*\r?\n", stripped, maxsplit=1)
    if len(head_and_body) == 2:
        return head_and_body[1].lstrip()

    # Fallback for line terminators that str.splitlines() recognizes but the
    # regex above does not (for example a bare "\r").
    all_lines = stripped.splitlines()
    for position in range(1, len(all_lines)):
        if not all_lines[position].strip():
            return "\n".join(all_lines[position + 1 :]).lstrip()
    return content
def _kind_rank(kind: str) -> int:
    """Return the tie-break rank for a hit kind: memory nodes sort before everything else."""
    if kind == "memory_node":
        return 0
    return 1