Coverage for src\truenex_memory\ingestion\global_source_health.py: 88%
184 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-19 10:21 +0200
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-19 10:21 +0200
1"""Source catalog and ledger health review/cleanup helpers."""
from __future__ import annotations

from dataclasses import asdict, dataclass, field, replace
from datetime import datetime, timezone
from pathlib import Path
import sqlite3

from truenex_memory.discovery.source_catalog import CatalogEntry, SourceCatalog
from truenex_memory.ingestion.global_refresh import (
    _is_nonlocal_absolute_path,
    _ledger_record_belongs_to_entry,
    _parser_source_type_for_entry,
    _physical_path,
)
from truenex_memory.store.source_ledger import SourceLedgerRecord, list_ledger_entries
from truenex_memory.store.sqlite import connect
# Lower-case substrings marking a "skipped" ledger row as intentional.
# _is_expected_skip() searches for them in the lower-cased error message, so
# every marker here must itself be lower-case.
EXPECTED_SKIP_MARKERS = (
    "server_alias:",
    # The next four are the prefixes _error_message_for_action() writes back
    # into the ledger when a cleanup is applied.
    "non-local path:",
    "stale ledger:",
    "removed local source:",
    "disabled catalog source:",
    "no indexable records",
)
@dataclass(frozen=True)
class SourceHealthAction:
    """One cleanup step that a source-health review planned or carried out.

    Instances are immutable; the same record describes the step whether the
    review ran as a dry run or actually applied it.
    """

    # Kind of cleanup; this module emits "disable_catalog_entry" and
    # "mark_ledger_expected_skip".
    action: str
    # Identifier of the catalog entry or ledger row the step targets.
    source_id: str
    source_type: str
    path_or_alias: str
    # Human-readable justification for the step.
    reason: str
    # Status before and after the step; None when not applicable.
    previous_status: str | None = None
    next_status: str | None = None

    def to_dict(self) -> dict[str, object]:
        """Return every field of this action as a plain dictionary."""
        payload: dict[str, object] = asdict(self)
        return payload
@dataclass
class SourceHealthReport:
    """Source catalog/ledger health report.

    Collects the counters, planned/applied actions and warnings produced by
    one source-health review run.
    """

    catalog_path: str
    db_path: str
    # True when the review only planned changes and wrote nothing.
    dry_run: bool
    catalog_exists: bool = False
    db_exists: bool = False
    catalog_entries: int = 0
    confirmed_entries: int = 0
    disabled_catalog_entries: int = 0
    nonlocal_catalog_entries: int = 0
    missing_catalog_entries: int = 0
    ledger_problem_entries: int = 0
    # Total number of cleanup actions found; may exceed len(actions) because
    # the recorded action list is capped by the caller's limit.
    cleanup_candidates: int = 0
    catalog_changed: int = 0
    ledger_changed: int = 0
    # default_factory gives every report its own fresh lists.
    actions: list[SourceHealthAction] = field(default_factory=list)
    warnings: list[str] = field(default_factory=list)

    def __post_init__(self) -> None:
        """Normalise explicit ``None`` list arguments to empty lists.

        ``None`` used to be the declared default for both list fields, so
        callers that still pass it explicitly keep working.
        """
        if self.actions is None:
            self.actions = []
        if self.warnings is None:
            self.warnings = []

    def to_dict(self) -> dict[str, object]:
        """Return the report as a plain dictionary.

        Actions are converted through their own ``to_dict()``; the warnings
        list is copied so that mutating the result cannot alter the report.
        """
        return {
            "catalog_path": self.catalog_path,
            "db_path": self.db_path,
            "dry_run": self.dry_run,
            "catalog_exists": self.catalog_exists,
            "db_exists": self.db_exists,
            "catalog_entries": self.catalog_entries,
            "confirmed_entries": self.confirmed_entries,
            "disabled_catalog_entries": self.disabled_catalog_entries,
            "nonlocal_catalog_entries": self.nonlocal_catalog_entries,
            "missing_catalog_entries": self.missing_catalog_entries,
            "ledger_problem_entries": self.ledger_problem_entries,
            "cleanup_candidates": self.cleanup_candidates,
            "catalog_changed": self.catalog_changed,
            "ledger_changed": self.ledger_changed,
            "actions": [action.to_dict() for action in self.actions],
            "warnings": list(self.warnings),
        }
def build_source_health(
    catalog_path: Path,
    db_path: Path,
    *,
    apply: bool = False,
    limit: int = 50,
) -> SourceHealthReport:
    """Review and optionally clean source catalog/ledger health.

    Cleanup is conservative:

    - missing local catalog entries are disabled, not deleted;
    - non-local POSIX paths on Windows are kept in the catalog but marked as
      expected skipped in the ledger;
    - stale/problem ledger rows are marked skipped with explanatory provenance;
    - no indexed documents/chunks are deleted.

    Args:
        catalog_path: Location of the source catalog file.
        db_path: Location of the SQLite database holding the source ledger.
        apply: When False (the default) nothing is written; the report only
            lists what would change.
        limit: Maximum number of actions recorded in ``report.actions``;
            ``report.cleanup_candidates`` still counts all of them.

    Returns:
        The populated health report.
    """
    report = SourceHealthReport(
        catalog_path=str(catalog_path),
        db_path=str(db_path),
        dry_run=not apply,
    )
    catalog = _load_catalog(catalog_path, report)

    # Planned catalog state. It is updated in dry-run mode too, so the ledger
    # review below sees entries as they would look after cleanup.
    replacement_entries = list(catalog.entries)
    for index, entry in enumerate(catalog.entries):
        if entry.confirmation_status != "confirmed":
            continue
        if entry.source_type == "server_alias":
            continue
        if _is_nonlocal_absolute_path(entry.path_or_alias):
            report.nonlocal_catalog_entries += 1
            continue

        reason = _disable_reason_for_catalog_path(entry.path_or_alias)
        if reason is None:
            continue

        report.missing_catalog_entries += 1
        _add_action(
            report,
            SourceHealthAction(
                action="disable_catalog_entry",
                source_id=entry.id,
                source_type=entry.source_type,
                path_or_alias=entry.path_or_alias,
                previous_status=entry.confirmation_status,
                next_status="disabled",
                reason=reason,
            ),
            limit,
        )
        replacement_entries[index] = replace(entry, confirmation_status="disabled")
        if apply:
            report.catalog_changed += 1

    # Only rewrite the catalog file when something was actually disabled.
    if apply and report.catalog_changed:
        SourceCatalog(entries=replacement_entries, version=catalog.version).save(catalog_path)

    if db_path.exists():
        report.db_exists = True
        _review_ledger(
            db_path,
            replacement_entries,
            report,
            apply=apply,
            limit=limit,
        )
    else:
        report.warnings.append(f"Database not found: {db_path}")

    return report


def _disable_reason_for_catalog_path(path_or_alias: str) -> str | None:
    """Return why a confirmed filesystem entry must be disabled, else None."""
    path = Path(path_or_alias)
    if not path.is_absolute():
        return "confirmed filesystem catalog path is relative"
    if not path.exists():
        return "confirmed local catalog path does not exist"
    return None
def format_source_health_report(report: SourceHealthReport) -> str:
    """Render a source-health report as human-readable multi-line text.

    The output has a catalog section and a database section, followed by the
    warnings and the recorded actions when there are any.
    """

    def _yes_no(flag: bool) -> str:
        return "yes" if flag else "no"

    run_mode = "dry-run" if report.dry_run else "applied"
    out = [
        f"Source Health ({run_mode})",
        "=" * 60,
        f"Catalog: {report.catalog_path}",
        f" exists: {_yes_no(report.catalog_exists)}",
        f" entries: {report.catalog_entries}",
        f" confirmed: {report.confirmed_entries}",
        f" missing local confirmed entries: {report.missing_catalog_entries}",
        f" non-local confirmed entries: {report.nonlocal_catalog_entries}",
        f"\nDatabase: {report.db_path}",
        f" exists: {_yes_no(report.db_exists)}",
        f" ledger problem entries: {report.ledger_problem_entries}",
        f" cleanup candidates: {report.cleanup_candidates}",
        f" catalog changed: {report.catalog_changed}",
        f" ledger changed: {report.ledger_changed}",
    ]

    if report.warnings:
        out.append("\nWarnings:")
        out.extend(f" - {warning}" for warning in report.warnings)

    if report.actions:
        out.append(f"\nActions ({len(report.actions)} shown):")
        for item in report.actions:
            # The status transition is shown only when at least one side is set.
            if item.previous_status or item.next_status:
                transition = f" [{item.previous_status or '-'} -> {item.next_status or '-'}]"
            else:
                transition = ""
            out.append(
                f" - {item.action}{transition}: "
                f"{item.source_type}:{item.path_or_alias} -- {item.reason}"
            )

    return "\n".join(out)
def _load_catalog(catalog_path: Path, report: SourceHealthReport) -> SourceCatalog:
    """Load the source catalog and record its basic counts on the report.

    When the catalog file does not exist, a warning is appended and an empty
    ``SourceCatalog`` is returned so that the rest of the review can proceed.
    Otherwise the report receives the total number of entries plus how many
    are "confirmed" and how many are already "disabled" at load time.

    Args:
        catalog_path: Location of the catalog file.
        report: Report to update in place.

    Returns:
        The loaded catalog, or an empty one if the file is missing.
    """
    if not catalog_path.exists():
        report.warnings.append(f"Catalog not found: {catalog_path}")
        return SourceCatalog()

    report.catalog_exists = True
    catalog = SourceCatalog.load(catalog_path)
    statuses = [entry.confirmation_status for entry in catalog.entries]
    report.catalog_entries = len(statuses)
    report.confirmed_entries = statuses.count("confirmed")
    # Previously never populated, so the report always claimed zero disabled
    # entries regardless of the catalog contents.
    report.disabled_catalog_entries = statuses.count("disabled")
    return catalog
def _review_ledger(
    db_path: Path,
    catalog_entries: list[CatalogEntry],
    report: SourceHealthReport,
    *,
    apply: bool,
    limit: int,
) -> None:
    """Inspect problem rows in the source ledger and plan/apply their cleanup.

    Rows whose status is "missing", "error" or "skipped" are counted as
    problem entries. For each one that yields a cleanup action, the action is
    recorded on the report and, when ``apply`` is true, written to the ledger.
    Any ``sqlite3.DatabaseError`` is turned into a report warning instead of
    propagating.
    """
    problem_statuses = {"missing", "error", "skipped"}
    try:
        with connect(db_path) as conn:
            flagged = [
                row for row in list_ledger_entries(conn)
                if row.status in problem_statuses
            ]
            report.ledger_problem_entries = len(flagged)
            for row in flagged:
                planned = _cleanup_action_for_ledger_entry(row, catalog_entries)
                if planned is not None:
                    _add_action(report, planned, limit)
                    if apply:
                        _apply_ledger_cleanup(conn, row, planned)
                        report.ledger_changed += 1
    except sqlite3.DatabaseError as exc:
        report.warnings.append(f"Database could not be reviewed: {type(exc).__name__}: {exc}")
def _cleanup_action_for_ledger_entry(
    ledger_entry: SourceLedgerRecord,
    catalog_entries: list[CatalogEntry],
) -> SourceHealthAction | None:
    """Decide whether a problem ledger row should become an expected skip.

    Returns a "mark_ledger_expected_skip" action carrying the reason, or None
    when the row should be left alone. Checks run in this order:

    1. already skipped with a recognised marker -> nothing to do;
    2. non-local absolute path;
    3. matching catalog entry exists but is not confirmed;
    4. no matching catalog entry and the row is "missing" or "error";
    5. "missing" row whose message says the previously indexed file is gone.
    """
    physical = _physical_path(ledger_entry.source_path_or_alias)
    status = ledger_entry.status
    if status == "skipped" and _is_expected_skip(ledger_entry.error_message):
        return None

    def expected_skip(reason: str) -> SourceHealthAction:
        # Every outcome is the same action and differs only in its reason.
        return SourceHealthAction(
            action="mark_ledger_expected_skip",
            source_id=ledger_entry.source_id,
            source_type=ledger_entry.source_type,
            path_or_alias=ledger_entry.source_path_or_alias,
            previous_status=status,
            next_status="skipped",
            reason=reason,
        )

    if _is_nonlocal_absolute_path(physical):
        return expected_skip("non-local path cannot be indexed from this local machine")

    catalog_match = _matching_catalog_entry(ledger_entry, catalog_entries)
    if catalog_match is None:
        if status in {"missing", "error"}:
            return expected_skip("stale ledger row has no confirmed catalog source")
    elif catalog_match.confirmation_status != "confirmed":
        return expected_skip("disabled catalog source should not block readiness")

    message = (ledger_entry.error_message or "").lower()
    if status == "missing" and "previously indexed source file no longer exists" in message:
        return expected_skip("removed local source file is no longer active")

    return None
def _matching_catalog_entry(
    ledger_entry: SourceLedgerRecord,
    catalog_entries: list[CatalogEntry],
) -> CatalogEntry | None:
    """Return the first catalog entry that a ledger row corresponds to.

    An entry matches when its id equals the row's ``source_id``, or when the
    source types are compatible and the row's path belongs to the entry.
    Entries are tried in list order; OSError/ValueError raised by the
    path-membership check counts as "no match" for that entry.
    """
    for candidate in catalog_entries:
        if ledger_entry.source_id == candidate.id:
            return candidate
        if not _ledger_source_type_matches_catalog_entry(ledger_entry.source_type, candidate):
            continue
        try:
            belongs = _ledger_record_belongs_to_entry(
                ledger_entry.source_path_or_alias, candidate
            )
        except (OSError, ValueError):
            belongs = False
        if belongs:
            return candidate
    return None
def _ledger_source_type_matches_catalog_entry(source_type: str, entry: CatalogEntry) -> bool:
    """Tell whether a ledger source type is the one expected for a catalog entry.

    Server-alias entries expect the literal "server_alias"; every other entry
    expects whatever ``_parser_source_type_for_entry`` reports for it.
    """
    expected = (
        "server_alias"
        if entry.source_type == "server_alias"
        else _parser_source_type_for_entry(entry)
    )
    return source_type == expected
def _is_expected_skip(error_message: str | None) -> bool:
    """Return True when the message contains any EXPECTED_SKIP_MARKERS entry.

    Matching is case-insensitive on the message side; a missing or empty
    message never matches.
    """
    text = error_message.lower() if error_message else ""
    for marker in EXPECTED_SKIP_MARKERS:
        if marker in text:
            return True
    return False
def _apply_ledger_cleanup(
    conn: sqlite3.Connection,
    ledger_entry: SourceLedgerRecord,
    action: SourceHealthAction,
) -> None:
    """Mark the ledger rows of one source as skipped and commit immediately.

    The stored error message is the canonical text for the action's reason
    and ``updated_at`` is set to the current UTC time in ISO format. Only rows
    currently in "missing", "error" or "skipped" status are touched.
    """
    # NOTE(review): the WHERE clause keys on source_id alone. If several ledger
    # rows can share a source_id, all of them are rewritten - confirm against
    # the source_ledger schema.
    params = (
        _error_message_for_action(action),
        datetime.now(timezone.utc).isoformat(),
        ledger_entry.source_id,
    )
    conn.execute(
        """
        UPDATE source_ledger
        SET status = 'skipped',
        error_message = ?,
        updated_at = ?
        WHERE source_id = ?
        AND status IN ('missing', 'error', 'skipped')
        """,
        params,
    )
    conn.commit()
def _error_message_for_action(action: SourceHealthAction) -> str:
    """Map an action's reason to the canonical ledger error message.

    The first phrase found in the reason (checked in the order listed below)
    selects the message; a reason matching none of them is returned unchanged.
    """
    reason = action.reason
    canonical_messages = (
        ("non-local path", "non-local path: no local filesystem indexing"),
        ("disabled catalog source", "disabled catalog source: local path not indexed"),
        ("stale ledger", "stale ledger: no confirmed catalog source"),
        ("removed local source", "removed local source: no active local content"),
    )
    for phrase, message in canonical_messages:
        if phrase in reason:
            return message
    return reason
def _add_action(report: SourceHealthReport, action: SourceHealthAction, limit: int) -> None:
    """Count a cleanup candidate and record it while the action list has room.

    The candidate counter always increases; the action itself is appended
    only while fewer than ``limit`` actions have been recorded.
    """
    report.cleanup_candidates += 1
    if len(report.actions) >= limit:
        return
    report.actions.append(action)