Coverage for src \ truenex_memory \ ingestion \ global_search.py: 84%

174 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-19 10:21 +0200

1"""Read-only keyword search for the Truenex Memory global store.""" 

2 

3from __future__ import annotations 

4 

5from dataclasses import dataclass, field 

6from pathlib import Path 

7import re 

8import sqlite3 

9 

10 

# Default value of ``top_k``: the maximum number of hits a search returns.
DEFAULT_GLOBAL_SEARCH_LIMIT = 10
# Default maximum length, in characters, of a hit's content excerpt.
DEFAULT_EXCERPT_CHARS = 320
# Memory-node statuses that are searched unless inactive nodes are requested.
ACTIVE_MEMORY_STATUSES = ("active", "unverified")
# Source-ledger statuses whose documents are left out of chunk search.
EXCLUDED_LEDGER_STATUSES = ("missing", "skipped")
# Prefix identifying a metadata preamble at the start of chunk content.
METADATA_MARKER = "TRUENEX_INGESTION_METADATA"
# Accepted values for the ``kind_filter`` argument of ``build_global_search``.
GLOBAL_SEARCH_KINDS = frozenset({"all", "memory", "chunks"})

17 

18 

@dataclass(frozen=True)
class GlobalSearchHit:
    """A single immutable result produced by a global search.

    The same shape describes memory-node hits and document-chunk hits.  The
    provenance fields (``source_kind``, ``source_document_id``,
    ``source_chunk_id``) and ``confidence`` default to ``None``.
    """

    id: str
    kind: str
    title: str
    content: str
    content_excerpt: str
    source_path: str | None
    heading_path: str | None
    memory_type: str
    status: str
    score: float
    source_kind: str | None = None
    source_document_id: str | None = None
    source_chunk_id: str | None = None
    confidence: float | None = None

    def to_dict(self) -> dict[str, object]:
        """Return the hit as a plain dict keyed by field name, in declaration order."""
        exported = (
            "id",
            "kind",
            "title",
            "content",
            "content_excerpt",
            "source_path",
            "heading_path",
            "memory_type",
            "status",
            "score",
            "source_kind",
            "source_document_id",
            "source_chunk_id",
            "confidence",
        )
        return {name: getattr(self, name) for name in exported}

55 

56 

@dataclass
class GlobalSearchReport:
    """Mutable outcome of one global keyword search.

    Holds the echoed search parameters, the ranked hits and any warnings
    collected while the search ran.
    """

    query: str
    db_path: str
    db_exists: bool
    top_k: int = DEFAULT_GLOBAL_SEARCH_LIMIT
    include_inactive: bool = False
    kind_filter: str = "all"
    result_count: int = 0
    results: list[GlobalSearchHit] = field(default_factory=list)
    warnings: list[str] = field(default_factory=list)

    def to_dict(self) -> dict[str, object]:
        """Return the report as a plain dict, converting each hit via its ``to_dict``."""
        payload: dict[str, object] = {
            "query": self.query,
            "db_path": self.db_path,
            "db_exists": self.db_exists,
            "top_k": self.top_k,
            "include_inactive": self.include_inactive,
            "kind_filter": self.kind_filter,
            "result_count": self.result_count,
        }
        payload["results"] = [hit.to_dict() for hit in self.results]
        # The warnings list itself is exposed, not a copy.
        payload["warnings"] = self.warnings
        return payload

83 

84 

def build_global_search(
    db_path: Path,
    query: str,
    *,
    top_k: int = DEFAULT_GLOBAL_SEARCH_LIMIT,
    include_inactive: bool = False,
    kind_filter: str = "all",
    excerpt_chars: int = DEFAULT_EXCERPT_CHARS,
) -> GlobalSearchReport:
    """Run a keyword search over the global SQLite store, opened read-only.

    Args:
        db_path: Location of the SQLite database file.
        query: Free-text query; it is tokenized before matching.
        top_k: Maximum number of hits to keep in the report.
        include_inactive: Passed to the memory-node search to lift its
            status filter.
        kind_filter: One of ``GLOBAL_SEARCH_KINDS``; selects memory nodes,
            document chunks, or both.
        excerpt_chars: Maximum excerpt length for each hit.

    Returns:
        A report with the ranked hits.  Operational problems (no searchable
        tokens, missing database, unopenable database, missing tables, a
        failing query) are recorded in ``report.warnings`` instead of raised.

    Raises:
        ValueError: If ``top_k`` is below 1, ``excerpt_chars`` is below 80,
            or ``kind_filter`` is not an accepted value.
    """
    if top_k < 1:
        raise ValueError("top_k must be greater than zero")
    if excerpt_chars < 80:
        raise ValueError("excerpt_chars must be at least 80")
    if kind_filter not in GLOBAL_SEARCH_KINDS:
        allowed = ", ".join(sorted(GLOBAL_SEARCH_KINDS))
        raise ValueError(f"invalid kind_filter {kind_filter!r}; expected one of {allowed}")

    report = GlobalSearchReport(
        query=query,
        db_path=str(db_path),
        db_exists=db_path.exists(),
        top_k=top_k,
        include_inactive=include_inactive,
        kind_filter=kind_filter,
    )
    warn = report.warnings.append

    query_tokens = tokenize_set(query)
    if not query_tokens:
        warn("query has no searchable tokens")
        return report
    if not db_path.exists():
        warn("database not found")
        return report

    try:
        conn = _connect_readonly(db_path)
    except Exception:
        warn("database exists but cannot be opened read-only")
        return report

    def ranking(hit: GlobalSearchHit) -> tuple[float, int, str, str]:
        # Best score first; ties go to memory nodes, then title, then id.
        return (-hit.score, _kind_rank(hit.kind), hit.title, hit.id)

    wants_memory = kind_filter in ("all", "memory")
    wants_chunks = kind_filter in ("all", "chunks")

    try:
        found: list[GlobalSearchHit] = []

        if wants_memory:
            if _table_exists(conn, "memory_nodes"):
                found.extend(
                    _search_memory_nodes(conn, query_tokens, include_inactive, excerpt_chars)
                )
            else:
                warn("memory_nodes table not found")

        if wants_chunks:
            if _table_exists(conn, "chunks") and _table_exists(conn, "documents"):
                found.extend(_search_chunks(conn, query_tokens, excerpt_chars))
            else:
                warn("documents/chunks tables not found")

        found.sort(key=ranking)
        report.results = found[:top_k]
        report.result_count = len(report.results)
    except sqlite3.DatabaseError:
        # Results are only assigned after both searches succeed, so a failure
        # here leaves the report without hits.
        warn("database readable but global search query failed")
    finally:
        conn.close()

    return report

150 

151 

def format_global_search_report(report: GlobalSearchReport) -> str:
    """Render a global search report as compact, newline-joined terminal text.

    The output has a header (query, database, kind, result count), then one
    entry per hit with its optional source and heading lines, then any
    warnings.  No trailing newline is added.
    """
    out: list[str] = [
        f"Global Search: {report.query}",
        "=" * 60,
        f"Database: {report.db_path}",
    ]
    if not report.db_exists:
        out.append(" (not found)")
    out.append(f"Kind: {report.kind_filter}")
    out.append(f"Results: {report.result_count} / top_k {report.top_k}")
    if report.include_inactive:
        out.append("Inactive memory statuses: included")

    if report.results:
        out.append("")
        for position, hit in enumerate(report.results, start=1):
            if hit.confidence is None:
                confidence_suffix = ""
            else:
                confidence_suffix = f" confidence={hit.confidence:.2f}"
            tags = f"[{hit.kind}/{hit.memory_type}/{hit.status}]"
            out.append(f"{position}. {hit.score:.4f} {hit.title} {tags}{confidence_suffix}")
            if hit.source_path:
                out.append(f" source: {hit.source_path}")
            if hit.heading_path:
                out.append(f" heading: {hit.heading_path}")
            out.append(f" {hit.content_excerpt}")

    if report.warnings:
        out.append("")
        out.append("Warnings:")
        out.extend(f" - {message}" for message in report.warnings)

    return "\n".join(out)

185 

186 

def _search_memory_nodes(
    conn: sqlite3.Connection,
    tokens: set[str],
    include_inactive: bool,
    excerpt_chars: int,
) -> list[GlobalSearchHit]:
    """Score memory nodes by the share of query tokens they contain.

    Each node's title, content and source path are tokenized together; nodes
    sharing no token with ``tokens`` are dropped.  The score is the matched
    fraction of ``tokens`` multiplied by 10 and rounded to 4 decimal places.
    Unless ``include_inactive`` is true, only rows whose status is listed in
    ``ACTIVE_MEMORY_STATUSES`` are read.
    """

    def optional_text(value: object) -> str | None:
        return None if value is None else str(value)

    if include_inactive:
        cursor = conn.execute("SELECT * FROM memory_nodes")
    else:
        cursor = conn.execute(
            "SELECT * FROM memory_nodes WHERE status IN (?, ?)",
            ACTIVE_MEMORY_STATUSES,
        )

    matches: list[GlobalSearchHit] = []
    for record in cursor.fetchall():
        body = str(record["content"] or "")
        heading = str(record["title"] or "")
        haystack = tokenize_set(f"{heading} {body} {record['source_path'] or ''}")
        shared = tokens & haystack
        if not shared:
            continue

        raw_confidence = record["confidence"]
        matches.append(
            GlobalSearchHit(
                id=str(record["id"]),
                kind="memory_node",
                title=heading,
                content=body,
                content_excerpt=_excerpt(body, excerpt_chars),
                source_path=optional_text(record["source_path"]),
                heading_path=None,
                memory_type=str(record["type"]),
                status=str(record["status"]),
                score=round(len(shared) / len(tokens) * 10.0, 4),
                source_kind=optional_text(record["source_kind"]),
                source_document_id=optional_text(record["source_document_id"]),
                source_chunk_id=optional_text(record["source_chunk_id"]),
                confidence=None if raw_confidence is None else float(raw_confidence),
            )
        )
    return matches

236 

237 

def _search_chunks(
    conn: sqlite3.Connection,
    tokens: set[str],
    excerpt_chars: int,
) -> list[GlobalSearchHit]:
    """Rank document chunks against the query tokens with the ``BM25`` scorer.

    Chunks are joined to their documents.  When a ``source_ledger`` table
    exists, chunks whose document path has a ledger row with a status in
    ``EXCLUDED_LEDGER_STATUSES`` are left out.  Chunk content is scored and
    returned with its metadata preamble stripped; chunks with a non-positive
    score are dropped, and the remaining scores are multiplied by
    ``source_boost`` and rounded to 6 decimal places.
    """

    def optional_text(value: object) -> str | None:
        return None if value is None else str(value)

    def source_type_of(record: sqlite3.Row) -> object:
        # A failed lookup of ``source_type`` is treated as "no source type".
        try:
            return record["source_type"]
        except (IndexError, KeyError):
            return None

    if not _table_exists(conn, "source_ledger"):
        cursor = conn.execute(
            """
            SELECT c.*, d.path, d.filename, NULL AS ledger_status
            FROM chunks c
            JOIN documents d ON d.id = c.document_id
            """
        )
    else:
        cursor = conn.execute(
            """
            SELECT c.*, d.path, d.filename, sl.status AS ledger_status
            FROM chunks c
            JOIN documents d ON d.id = c.document_id
            LEFT JOIN source_ledger sl ON sl.source_path_or_alias = d.path
            WHERE sl.source_path_or_alias IS NULL OR sl.status NOT IN (?, ?)
            """,
            EXCLUDED_LEDGER_STATUSES,
        )
    records = cursor.fetchall()
    if not records:
        return []

    bodies = [_strip_metadata_preamble(str(record["content"] or "")) for record in records]
    ranker = BM25([tokenize(body) for body in bodies])
    relevance = ranker.get_scores(list(tokens))

    found: list[GlobalSearchHit] = []
    for record, raw, body in zip(records, relevance, bodies):
        if raw <= 0:
            continue
        boosted = round(raw * source_boost(source_type_of(record)), 6)
        # Title preference: heading path, then filename, then the path's last component.
        label = str(
            record["heading_path"] or record["filename"] or Path(str(record["path"])).name
        )
        found.append(
            GlobalSearchHit(
                id=str(record["id"]),
                kind="document_chunk",
                title=label,
                content=body,
                content_excerpt=_excerpt(body, excerpt_chars),
                source_path=optional_text(record["path"]),
                heading_path=optional_text(record["heading_path"]),
                memory_type="document_chunk",
                status="active",
                score=boosted,
            )
        )
    return found

298 

299 

300def _connect_readonly(db_path: Path) -> sqlite3.Connection: 

301 uri_path = db_path.resolve().as_posix() 

302 conn = sqlite3.connect(f"file:{uri_path}?mode=ro", uri=True) 

303 conn.row_factory = sqlite3.Row 

304 return conn 

305 

306 

307def _table_exists(conn: sqlite3.Connection, table_name: str) -> bool: 

308 row = conn.execute( 

309 "SELECT name FROM sqlite_master WHERE type='table' AND name = ?", 

310 (table_name,), 

311 ).fetchone() 

312 return row is not None 

313 

314 

315from truenex_memory.retrieval.scoring import BM25, tokenize, tokenize_set, source_boost 

316 

317 

318def _excerpt(content: str, max_chars: int) -> str: 

319 text = " ".join(content.split()) 

320 if len(text) <= max_chars: 

321 return text 

322 return text[: max_chars - 3].rstrip() + "..." 

323 

324 

def _strip_metadata_preamble(content: str) -> str:
    """Remove a leading metadata preamble, returning only the text after it.

    ``content`` is returned unchanged when, ignoring leading whitespace, it
    does not start with ``METADATA_MARKER``, or when no blank line ends the
    preamble.  Otherwise everything up to and including the first blank line
    is dropped and the remainder is left-stripped.
    """
    trimmed = content.lstrip()
    if not trimmed.startswith(METADATA_MARKER):
        return content

    # Preferred boundary: the first blank line delimited by LF or CRLF.
    boundary = re.search(r"\r?\n\s*\r?\n", trimmed)
    if boundary is not None:
        return trimmed[boundary.end():].lstrip()

    # Fallback for line separators the pattern does not cover: the first
    # whitespace-only line after the marker line, as seen by ``splitlines``.
    rows = trimmed.splitlines()
    for position in range(1, len(rows)):
        if rows[position].strip() == "":
            return "\n".join(rows[position + 1 :]).lstrip()
    return content

337 

338 

339def _kind_rank(kind: str) -> int: 

340 return 0 if kind == "memory_node" else 1