Coverage for src\truenex_memory\ingestion\global_source_health.py: 88%

184 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-19 10:21 +0200

1"""Source catalog and ledger health review/cleanup helpers.""" 

2 

3from __future__ import annotations 

4 

5from dataclasses import asdict, dataclass, replace 

6from datetime import datetime, timezone 

7from pathlib import Path 

8import sqlite3 

9 

10from truenex_memory.discovery.source_catalog import CatalogEntry, SourceCatalog 

11from truenex_memory.ingestion.global_refresh import ( 

12 _is_nonlocal_absolute_path, 

13 _ledger_record_belongs_to_entry, 

14 _parser_source_type_for_entry, 

15 _physical_path, 

16) 

17from truenex_memory.store.source_ledger import SourceLedgerRecord, list_ledger_entries 

18from truenex_memory.store.sqlite import connect 

19 

20 

# Substrings that identify a ledger error message as an already-explained,
# expected skip. `_is_expected_skip` lower-cases the message before testing
# for these, so every marker here must itself be lower-case.
EXPECTED_SKIP_MARKERS = (
    "server_alias:",
    "non-local path:",
    "stale ledger:",
    "removed local source:",
    "disabled catalog source:",
    "no indexable records",
)

29 

30 

@dataclass(frozen=True)
class SourceHealthAction:
    """One cleanup step that a source-health review planned or applied.

    The dataclass is frozen, so an action cannot be modified after it has
    been created.
    """

    # Identifier of the cleanup step, e.g. "disable_catalog_entry" or
    # "mark_ledger_expected_skip".
    action: str
    source_id: str
    source_type: str
    path_or_alias: str
    # Human-readable explanation of why the step is proposed.
    reason: str
    # Status before and after the step; either may be left unset.
    previous_status: str | None = None
    next_status: str | None = None

    def to_dict(self) -> dict[str, object]:
        """Return all fields of this action as a plain dictionary."""
        payload: dict[str, object] = asdict(self)
        return payload

45 

46 

@dataclass
class SourceHealthReport:
    """Mutable summary of one source catalog/ledger health review."""

    catalog_path: str
    db_path: str
    dry_run: bool
    catalog_exists: bool = False
    db_exists: bool = False
    catalog_entries: int = 0
    confirmed_entries: int = 0
    disabled_catalog_entries: int = 0
    nonlocal_catalog_entries: int = 0
    missing_catalog_entries: int = 0
    ledger_problem_entries: int = 0
    # Total number of planned actions, including those not kept in `actions`.
    cleanup_candidates: int = 0
    catalog_changed: int = 0
    ledger_changed: int = 0
    # `None` is only a placeholder default; `__post_init__` swaps it for a
    # fresh list so reports never share one list object.
    actions: list[SourceHealthAction] = None  # type: ignore[assignment]
    warnings: list[str] = None  # type: ignore[assignment]

    def __post_init__(self) -> None:
        """Replace the `None` placeholders with new, empty lists."""
        self.actions = [] if self.actions is None else self.actions
        self.warnings = [] if self.warnings is None else self.warnings

    def to_dict(self) -> dict[str, object]:
        """Return the report as a dictionary, with actions converted to dicts.

        The "warnings" value is the report's own list object, not a copy.
        """
        plain_fields = (
            "catalog_path",
            "db_path",
            "dry_run",
            "catalog_exists",
            "db_exists",
            "catalog_entries",
            "confirmed_entries",
            "disabled_catalog_entries",
            "nonlocal_catalog_entries",
            "missing_catalog_entries",
            "ledger_problem_entries",
            "cleanup_candidates",
            "catalog_changed",
            "ledger_changed",
        )
        payload: dict[str, object] = {
            name: getattr(self, name) for name in plain_fields
        }
        payload["actions"] = [item.to_dict() for item in self.actions]
        payload["warnings"] = self.warnings
        return payload

93 

94 

def build_source_health(
    catalog_path: Path,
    db_path: Path,
    *,
    apply: bool = False,
    limit: int = 50,
) -> SourceHealthReport:
    """Review and optionally clean source catalog/ledger health.

    Cleanup is conservative:

    - missing local catalog entries are disabled, not deleted;
    - non-local POSIX paths on Windows are kept in the catalog but marked as
      expected skipped in the ledger;
    - stale/problem ledger rows are marked skipped with explanatory provenance;
    - no indexed documents/chunks are deleted.

    Args:
        catalog_path: Location of the source catalog file.
        db_path: Location of the SQLite database holding the source ledger.
        apply: When false (the default) nothing is written and the report is
            a dry run; when true the catalog is saved and ledger rows are
            updated.
        limit: Maximum number of actions stored in ``report.actions``;
            ``report.cleanup_candidates`` still counts every action.

    Returns:
        The populated :class:`SourceHealthReport`.
    """
    report = SourceHealthReport(
        catalog_path=str(catalog_path),
        db_path=str(db_path),
        dry_run=not apply,
    )
    catalog = _load_catalog(catalog_path, report)

    # Work on a copy so the loaded catalog itself is never mutated.
    replacement_entries = list(catalog.entries)
    for index, entry in enumerate(catalog.entries):
        if entry.confirmation_status != "confirmed":
            continue
        if entry.source_type == "server_alias":
            continue
        if _is_nonlocal_absolute_path(entry.path_or_alias):
            report.nonlocal_catalog_entries += 1
            continue

        reason = _catalog_disable_reason(entry)
        if reason is None:
            continue

        report.missing_catalog_entries += 1
        _add_action(
            report,
            SourceHealthAction(
                action="disable_catalog_entry",
                source_id=entry.id,
                source_type=entry.source_type,
                path_or_alias=entry.path_or_alias,
                previous_status=entry.confirmation_status,
                next_status="disabled",
                reason=reason,
            ),
            limit,
        )
        # The disabled copy is recorded in dry-run mode too, so the ledger
        # review below sees the catalog as it would look after cleanup.
        replacement_entries[index] = replace(entry, confirmation_status="disabled")
        if apply:
            report.catalog_changed += 1

    if apply and report.catalog_changed:
        SourceCatalog(entries=replacement_entries, version=catalog.version).save(catalog_path)

    if db_path.exists():
        report.db_exists = True
        _review_ledger(
            db_path,
            replacement_entries,
            report,
            apply=apply,
            limit=limit,
        )
    else:
        report.warnings.append(f"Database not found: {db_path}")

    return report


def _catalog_disable_reason(entry: CatalogEntry) -> str | None:
    """Return why a confirmed local catalog entry should be disabled, or None."""
    candidate = Path(entry.path_or_alias)
    if not candidate.is_absolute():
        return "confirmed filesystem catalog path is relative"
    if not candidate.exists():
        return "confirmed local catalog path does not exist"
    return None

177 

178 

def format_source_health_report(report: SourceHealthReport) -> str:
    """Render ``report`` as human-readable text joined with newlines.

    The output has a catalog section and a database section, followed by a
    "Warnings" list and an "Actions" list when the report has any.
    """
    heading = "applied"
    if report.dry_run:
        heading = "dry-run"

    out = [
        f"Source Health ({heading})",
        "=" * 60,
        f"Catalog: {report.catalog_path}",
        f" exists: {'yes' if report.catalog_exists else 'no'}",
        f" entries: {report.catalog_entries}",
        f" confirmed: {report.confirmed_entries}",
        f" missing local confirmed entries: {report.missing_catalog_entries}",
        f" non-local confirmed entries: {report.nonlocal_catalog_entries}",
        f"\nDatabase: {report.db_path}",
        f" exists: {'yes' if report.db_exists else 'no'}",
        f" ledger problem entries: {report.ledger_problem_entries}",
        f" cleanup candidates: {report.cleanup_candidates}",
        f" catalog changed: {report.catalog_changed}",
        f" ledger changed: {report.ledger_changed}",
    ]

    if report.warnings:
        out.append("\nWarnings:")
        out.extend(f" - {text}" for text in report.warnings)

    if report.actions:
        out.append(f"\nActions ({len(report.actions)} shown):")
        for item in report.actions:
            # The status transition is shown only when either side is known.
            if item.previous_status or item.next_status:
                transition = f" [{item.previous_status or '-'} -> {item.next_status or '-'}]"
            else:
                transition = ""
            out.append(
                f" - {item.action}{transition}: "
                f"{item.source_type}:{item.path_or_alias} -- {item.reason}"
            )

    return "\n".join(out)

212 

213 

def _load_catalog(catalog_path: Path, report: SourceHealthReport) -> SourceCatalog:
    """Load the source catalog and record its headline counts on ``report``.

    If ``catalog_path`` does not exist, a warning is appended to the report
    and an empty ``SourceCatalog()`` is returned; the report's counters are
    left at their defaults. Otherwise ``catalog_exists``, ``catalog_entries``,
    ``confirmed_entries`` and ``disabled_catalog_entries`` are filled in.

    Args:
        catalog_path: Location of the catalog file.
        report: Report that receives the counts and any warning.

    Returns:
        The loaded catalog, or an empty one when the file is missing.
    """
    if not catalog_path.exists():
        report.warnings.append(f"Catalog not found: {catalog_path}")
        return SourceCatalog()

    report.catalog_exists = True
    catalog = SourceCatalog.load(catalog_path)
    statuses = [entry.confirmation_status for entry in catalog.entries]
    report.catalog_entries = len(statuses)
    report.confirmed_entries = statuses.count("confirmed")
    # This counter was declared and serialized but never assigned, so it was
    # always 0. NOTE(review): assumes it is meant to count entries that are
    # already "disabled" in the catalog as loaded -- confirm intended meaning.
    report.disabled_catalog_entries = statuses.count("disabled")
    return catalog

225 

226 

def _review_ledger(
    db_path: Path,
    catalog_entries: list[CatalogEntry],
    report: SourceHealthReport,
    *,
    apply: bool,
    limit: int,
) -> None:
    """Scan the source ledger for problem rows and plan or apply cleanup.

    Rows whose status is "missing", "error" or "skipped" are counted in
    ``report.ledger_problem_entries``. Each one that yields a cleanup action
    is registered on the report and, when ``apply`` is true, written back to
    the database. A ``sqlite3.DatabaseError`` is recorded as a report warning
    rather than raised.
    """
    problem_statuses = {"missing", "error", "skipped"}
    try:
        with connect(db_path) as conn:
            flagged = [
                row for row in list_ledger_entries(conn)
                if row.status in problem_statuses
            ]
            report.ledger_problem_entries = len(flagged)
            for row in flagged:
                planned = _cleanup_action_for_ledger_entry(row, catalog_entries)
                if planned is not None:
                    _add_action(report, planned, limit)
                    if apply:
                        _apply_ledger_cleanup(conn, row, planned)
                        # Counted once per applied action, not per row updated.
                        report.ledger_changed += 1
    except sqlite3.DatabaseError as exc:
        report.warnings.append(f"Database could not be reviewed: {type(exc).__name__}: {exc}")

253 

254 

def _cleanup_action_for_ledger_entry(
    ledger_entry: SourceLedgerRecord,
    catalog_entries: list[CatalogEntry],
) -> SourceHealthAction | None:
    """Decide whether a problem ledger row should become an expected skip.

    Rules are checked in order and the first match wins:

    1. a "skipped" row whose message already carries an expected-skip marker
       needs nothing;
    2. a non-local absolute path;
    3. a matching catalog entry that is not "confirmed";
    4. no matching catalog entry and a "missing" or "error" status;
    5. a "missing" row whose message says the previously indexed file no
       longer exists.

    Returns:
        A "mark_ledger_expected_skip" action, or None if no rule applies.
    """
    # Resolved before the early return, matching the original statement order.
    physical = _physical_path(ledger_entry.source_path_or_alias)
    if ledger_entry.status == "skipped" and _is_expected_skip(ledger_entry.error_message):
        return None

    reason: str | None = None
    if _is_nonlocal_absolute_path(physical):
        reason = "non-local path cannot be indexed from this local machine"
    else:
        owner = _matching_catalog_entry(ledger_entry, catalog_entries)
        if owner is not None and owner.confirmation_status != "confirmed":
            reason = "disabled catalog source should not block readiness"
        elif owner is None and ledger_entry.status in {"missing", "error"}:
            reason = "stale ledger row has no confirmed catalog source"
        elif (
            ledger_entry.status == "missing"
            and "previously indexed source file no longer exists"
            in (ledger_entry.error_message or "").lower()
        ):
            reason = "removed local source file is no longer active"

    if reason is None:
        return None

    return SourceHealthAction(
        action="mark_ledger_expected_skip",
        source_id=ledger_entry.source_id,
        source_type=ledger_entry.source_type,
        path_or_alias=ledger_entry.source_path_or_alias,
        previous_status=ledger_entry.status,
        next_status="skipped",
        reason=reason,
    )

313 

314 

def _matching_catalog_entry(
    ledger_entry: SourceLedgerRecord,
    catalog_entries: list[CatalogEntry],
) -> CatalogEntry | None:
    """Return the catalog entry a ledger row belongs to, or None.

    An exact ``source_id`` match always wins. Only when no entry has the same
    id does the lookup fall back to the first entry whose source type matches
    and whose path contains the ledger row's path.

    Previously both checks ran together for each entry in turn, so an earlier
    entry that matched only by path shadowed a later entry with the exact id.

    Args:
        ledger_entry: Ledger row to resolve.
        catalog_entries: Catalog entries to search, in catalog order.

    Returns:
        The matching entry, or None if nothing matches.
    """
    # Pass 1: exact identity.
    for entry in catalog_entries:
        if ledger_entry.source_id == entry.id:
            return entry

    # Pass 2: same source type and the path belongs to the entry.
    for entry in catalog_entries:
        if not _ledger_source_type_matches_catalog_entry(ledger_entry.source_type, entry):
            continue
        try:
            if _ledger_record_belongs_to_entry(ledger_entry.source_path_or_alias, entry):
                return entry
        except (OSError, ValueError):
            # An entry whose path cannot be compared is treated as no match.
            continue
    return None

329 

330 

def _ledger_source_type_matches_catalog_entry(source_type: str, entry: CatalogEntry) -> bool:
    """Report whether a ledger source type is the one expected for ``entry``.

    A "server_alias" entry expects the ledger type "server_alias"; any other
    entry expects the value of ``_parser_source_type_for_entry(entry)``.
    """
    if entry.source_type == "server_alias":
        expected = "server_alias"
    else:
        expected = _parser_source_type_for_entry(entry)
    return source_type == expected

335 

336 

def _is_expected_skip(error_message: str | None) -> bool:
    """Report whether ``error_message`` contains an expected-skip marker.

    The message is lower-cased before the search; ``None`` and the empty
    string never match.
    """
    haystack = error_message.lower() if error_message else ""
    for marker in EXPECTED_SKIP_MARKERS:
        if marker in haystack:
            return True
    return False

340 

341 

def _apply_ledger_cleanup(
    conn: sqlite3.Connection,
    ledger_entry: SourceLedgerRecord,
    action: SourceHealthAction,
) -> None:
    """Mark the ledger rows for a source as skipped and commit immediately.

    The UPDATE sets ``status`` to 'skipped', stores the message derived from
    ``action`` and stamps ``updated_at`` with the current UTC time in ISO
    format. It touches every ``source_ledger`` row with the entry's
    ``source_id`` whose status is 'missing', 'error' or 'skipped'.

    NOTE(review): actions are planned per ledger row, but this statement is
    keyed by ``source_id`` alone -- confirm that ``source_id`` identifies a
    single row, otherwise sibling rows receive the same message.
    """
    stamp = datetime.now(timezone.utc).isoformat()
    params = (_error_message_for_action(action), stamp, ledger_entry.source_id)
    conn.execute(
        """
        UPDATE source_ledger
        SET status = 'skipped',
        error_message = ?,
        updated_at = ?
        WHERE source_id = ?
        AND status IN ('missing', 'error', 'skipped')
        """,
        params,
    )
    conn.commit()

361 

362 

def _error_message_for_action(action: SourceHealthAction) -> str:
    """Map an action's reason to the ledger message stored for it.

    The first phrase found in ``action.reason`` selects a fixed message that
    starts with one of the expected-skip markers; if no phrase is found, the
    reason itself is returned unchanged.
    """
    canned_messages = (
        ("non-local path", "non-local path: no local filesystem indexing"),
        ("disabled catalog source", "disabled catalog source: local path not indexed"),
        ("stale ledger", "stale ledger: no confirmed catalog source"),
        ("removed local source", "removed local source: no active local content"),
    )
    for phrase, message in canned_messages:
        if phrase in action.reason:
            return message
    return action.reason

373 

374 

def _add_action(report: SourceHealthReport, action: SourceHealthAction, limit: int) -> None:
    """Count ``action`` as a cleanup candidate and keep it if there is room.

    ``report.cleanup_candidates`` always increases by one; the action is
    stored in ``report.actions`` only while that list holds fewer than
    ``limit`` items.
    """
    report.cleanup_candidates += 1
    if len(report.actions) >= limit:
        return
    report.actions.append(action)