Coverage for src \ truenex_memory \ ingestion \ global_status.py: 77%
178 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-19 10:21 +0200
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-19 10:21 +0200
1"""Read-only global status report for the Truenex Memory global store.
3Never creates directories, databases, catalog files, ledger rows, or runs
4schema migrations. Only reads what already exists.
5"""
7from __future__ import annotations
9from collections import defaultdict
10from dataclasses import dataclass, field
11from pathlib import Path
12import sqlite3
13import json
16# Report dataclass
@dataclass
class GlobalStatusReport:
    """Snapshot of the catalog, database, ledger and index state of the global store.

    Only the five catalog fields are required; every other field has a neutral
    default so a report can be built incrementally by the reader helpers.
    ``to_dict`` regroups the flat fields into one nested dictionary per section.
    """

    # Catalog (JSON file) section.
    catalog_path: str
    catalog_exists: bool
    catalog_version: str | None
    catalog_total_entries: int
    catalog_confirmed_entries: int
    catalog_by_source_type: dict[str, int] = field(default_factory=dict)
    catalog_by_confirmation_status: dict[str, int] = field(default_factory=dict)

    # SQLite database presence.
    db_path: str = ""
    db_exists: bool = False

    # source_ledger aggregates.
    ledger_total_rows: int = 0
    ledger_by_status: dict[str, int] = field(default_factory=dict)
    ledger_by_source_type: dict[str, int] = field(default_factory=dict)

    # Indexed content counts.
    indexed_documents: int = 0
    indexed_chunks: int = 0
    last_indexed_at: str | None = None

    # Problem rows (per-status counts and the most recent details).
    problem_counts: dict[str, int] = field(default_factory=dict)
    recent_problems: list[dict[str, object]] = field(default_factory=list)

    # Human-readable warnings collected while building the report.
    warnings: list[str] = field(default_factory=list)

    def to_dict(self) -> dict[str, object]:
        """Return the report as nested dictionaries, one entry per section.

        The inner dicts/lists are the report's own objects, not copies.
        """
        catalog_section = {
            "path": self.catalog_path,
            "exists": self.catalog_exists,
            "version": self.catalog_version,
            "total_entries": self.catalog_total_entries,
            "confirmed_entries": self.catalog_confirmed_entries,
            "by_source_type": self.catalog_by_source_type,
            "by_confirmation_status": self.catalog_by_confirmation_status,
        }
        database_section = {"path": self.db_path, "exists": self.db_exists}
        ledger_section = {
            "total_rows": self.ledger_total_rows,
            "by_status": self.ledger_by_status,
            "by_source_type": self.ledger_by_source_type,
        }
        indexed_section = {
            "documents": self.indexed_documents,
            "chunks": self.indexed_chunks,
            "last_indexed_at": self.last_indexed_at,
        }
        problems_section = {
            "counts": self.problem_counts,
            "recent": self.recent_problems,
        }
        return {
            "catalog": catalog_section,
            "database": database_section,
            "ledger": ledger_section,
            "indexed": indexed_section,
            "problems": problems_section,
            "warnings": self.warnings,
        }
76# Build function
def build_global_status(
    catalog_path: Path,
    db_path: Path,
    *,
    recent_problem_limit: int = 10,
) -> GlobalStatusReport:
    """Build a read-only GlobalStatusReport.

    Never creates directories, databases, catalog files, or ledger rows.

    Args:
        catalog_path: JSON catalog file; read only if it already exists.
        db_path: SQLite database; opened read-only only if it already exists.
        recent_problem_limit: Maximum number of recent problem rows to include.

    Returns:
        The populated report. Missing, unopenable or unreadable inputs are
        recorded in ``report.warnings`` instead of being raised.
    """
    report = GlobalStatusReport(
        catalog_path=str(catalog_path),
        catalog_exists=False,
        catalog_version=None,
        catalog_total_entries=0,
        catalog_confirmed_entries=0,
        db_path=str(db_path),
        db_exists=False,
    )

    _read_catalog(catalog_path, report)

    if db_path.exists():
        report.db_exists = True
        _read_database(db_path, report, recent_problem_limit)
    else:
        report.warnings.append(f"Database not found: {db_path}")

    if not report.catalog_exists:
        report.warnings.append(f"Catalog not found: {catalog_path}")

    return report


def _read_database(
    db_path: Path,
    report: GlobalStatusReport,
    recent_problem_limit: int,
) -> None:
    """Fill the ledger and indexed sections of ``report`` from an existing DB.

    Each reader runs in its own ``try`` so that a failure in one (e.g. a
    ledger table from an older schema missing a column) does not hide the
    results of the other. A single "cannot be read" warning is recorded if
    either reader hits a ``sqlite3.DatabaseError``.
    """
    try:
        conn = _connect_readonly(db_path)
    except Exception:
        # Best-effort: a status report should describe a broken database,
        # not crash on it.
        report.warnings.append(f"Database exists but cannot be opened: {db_path}")
        return

    read_failed = False
    try:
        try:
            _read_ledger(conn, report, recent_problem_limit)
        except sqlite3.DatabaseError:
            read_failed = True
        try:
            _read_indexed(conn, report)
        except sqlite3.DatabaseError:
            read_failed = True
    finally:
        conn.close()

    if read_failed:
        report.warnings.append(f"Database exists but cannot be read: {db_path}")
123# Internal helpers
def _connect_readonly(db_path: Path) -> sqlite3.Connection:
    """Open a read-only SQLite connection. Does NOT create the file or
    parent directories.

    The path is turned into a ``file:`` URI with ``Path.as_uri()``, which
    percent-encodes characters that are special in URIs (``#``, ``?``, ``%``,
    spaces). Interpolating the raw path instead lets ``#``/``?`` truncate the
    filename and drop ``mode=ro``. SQLite would then open a different file
    read-write and could create it, breaking the read-only guarantee.

    Returns:
        A connection whose ``row_factory`` is ``sqlite3.Row``.

    Raises:
        sqlite3.OperationalError: if the database cannot be opened read-only.
    """
    uri = f"{db_path.resolve().as_uri()}?mode=ro"
    conn = sqlite3.connect(uri, uri=True)
    conn.row_factory = sqlite3.Row
    return conn
def _table_exists(conn: sqlite3.Connection, table_name: str) -> bool:
    """Report whether ``conn``'s main schema defines a table called ``table_name``.

    Only entries whose ``sqlite_master.type`` is ``'table'`` count, so a view,
    index or trigger with the same name yields False.
    """
    cursor = conn.execute(
        "SELECT name FROM sqlite_master WHERE type='table' AND name = ?",
        (table_name,),
    )
    match = cursor.fetchone()
    return match is not None
def _read_catalog(catalog_path: Path, report: GlobalStatusReport) -> None:
    """Fill the catalog fields of ``report`` from the JSON catalog file.

    Does nothing when the file is absent. Otherwise marks the catalog as
    present, then records the version, entry totals and per-field tallies.
    Unreadable, undecodable or structurally unexpected catalogs are reported
    through ``report.warnings`` instead of raising.
    """
    if not catalog_path.exists():
        return

    report.catalog_exists = True
    try:
        data = json.loads(catalog_path.read_text(encoding="utf-8"))
    except (json.JSONDecodeError, UnicodeDecodeError, OSError):
        # UnicodeDecodeError is neither a JSONDecodeError nor an OSError; without
        # it a non-UTF-8 catalog would crash the whole (read-only) status report.
        report.warnings.append(f"Catalog exists but is invalid/unreadable: {catalog_path}")
        return
    if not isinstance(data, dict):
        report.warnings.append(f"Catalog must be a JSON object: {catalog_path}")
        return

    report.catalog_version = str(data.get("version", "1"))

    entries = data.get("entries", [])
    if not isinstance(entries, list):
        report.warnings.append(
            f"Catalog has unexpected structure (entries is not a list): {catalog_path}"
        )
        return

    # Total includes malformed (non-object) entries; tallies below do not.
    report.catalog_total_entries = len(entries)

    by_source_type: dict[str, int] = defaultdict(int)
    by_confirmation_status: dict[str, int] = defaultdict(int)
    confirmed = 0
    non_object_entries = 0

    for entry in entries:
        if not isinstance(entry, dict):
            non_object_entries += 1
            continue
        st = str(entry.get("source_type", "unknown"))
        cs = str(entry.get("confirmation_status", "unknown"))
        by_source_type[st] += 1
        by_confirmation_status[cs] += 1
        if cs == "confirmed":
            confirmed += 1

    if non_object_entries:
        # Warn once rather than once per malformed entry.
        report.warnings.append("Catalog contains non-object entries")

    report.catalog_confirmed_entries = confirmed
    report.catalog_by_source_type = dict(by_source_type)
    report.catalog_by_confirmation_status = dict(by_confirmation_status)
def _read_ledger(
    conn: sqlite3.Connection,
    report: GlobalStatusReport,
    recent_problem_limit: int,
) -> None:
    """Fill the ledger and problem fields of ``report`` from ``source_ledger``.

    If the table is absent, a warning is appended and nothing else is read.
    SQLite errors from the queries are not caught here; they propagate to
    the caller.

    Args:
        conn: Connection whose ``row_factory`` yields name-indexable rows
            (``sqlite3.Row``), since rows are read by column name.
        report: Report to mutate in place.
        recent_problem_limit: Maximum number of problem rows to include.
            Negative values are treated as 0, because SQLite interprets a
            negative ``LIMIT`` as "no limit".
    """
    if not _table_exists(conn, "source_ledger"):
        report.warnings.append("source_ledger table not found in database")
        return

    total_row = conn.execute("SELECT COUNT(*) AS cnt FROM source_ledger").fetchone()
    report.ledger_total_rows = total_row["cnt"] if total_row else 0

    by_status: dict[str, int] = {
        row["status"]: row["cnt"]
        for row in conn.execute(
            "SELECT status, COUNT(*) AS cnt FROM source_ledger GROUP BY status"
        )
    }
    report.ledger_by_status = by_status

    report.ledger_by_source_type = {
        row["source_type"]: row["cnt"]
        for row in conn.execute(
            "SELECT source_type, COUNT(*) AS cnt FROM source_ledger GROUP BY source_type"
        )
    }

    # Statuses that count as problems; always reported, even when zero.
    problem_statuses = ("missing", "error", "skipped")
    report.problem_counts = {
        status: by_status.get(status, 0) for status in problem_statuses
    }

    limit = max(0, recent_problem_limit)
    placeholders = ",".join("?" for _ in problem_statuses)
    rows = conn.execute(
        f"SELECT source_id, source_path_or_alias, source_type, status, "
        f"error_message, last_indexed_at, updated_at "
        f"FROM source_ledger "
        f"WHERE status IN ({placeholders}) "
        f"ORDER BY updated_at DESC "
        f"LIMIT ?",
        (*problem_statuses, limit),
    ).fetchall()

    columns = (
        "source_id",
        "source_path_or_alias",
        "source_type",
        "status",
        "error_message",
        "last_indexed_at",
        "updated_at",
    )
    report.recent_problems = [{col: r[col] for col in columns} for r in rows]
def _read_indexed(conn: sqlite3.Connection, report: GlobalStatusReport) -> None:
    """Fill the indexed-content fields of ``report``.

    Counts rows in ``documents`` and ``chunks`` when those tables exist and
    warns when they do not. The latest ``last_indexed_at`` is then taken as the
    maximum non-NULL value across ``source_ledger`` and ``documents``, using
    whichever of the two tables exist. Query errors are left to propagate.
    """
    present = {table: _table_exists(conn, table) for table in ("documents", "chunks")}

    for table, attr in (("documents", "indexed_documents"), ("chunks", "indexed_chunks")):
        if not present[table]:
            report.warnings.append(f"{table} table not found in database")
            continue
        count_row = conn.execute(f"SELECT COUNT(*) AS cnt FROM {table}").fetchone()
        setattr(report, attr, count_row["cnt"] if count_row else 0)

    sources: list[str] = []
    if _table_exists(conn, "source_ledger"):
        sources.append(
            "SELECT last_indexed_at FROM source_ledger WHERE last_indexed_at IS NOT NULL"
        )
    if present["documents"]:
        sources.append("SELECT last_indexed_at FROM documents WHERE last_indexed_at IS NOT NULL")
    if not sources:
        return

    union_sql = " UNION ALL ".join(sources)
    newest = conn.execute(f"SELECT MAX(last_indexed_at) AS val FROM ({union_sql})").fetchone()
    # Falsy MAX results (NULL when no rows) are normalised to None.
    report.last_indexed_at = (newest["val"] or None) if newest else None
281# Text formatting
def _format_counts(counts: dict[str, int]) -> str:
    """Render ``counts`` as space-separated ``key=value`` pairs ordered by key.

    Keys are ordered by their string form so that a ``None`` key (e.g. from a
    NULL column value in SQLite) next to str keys cannot make ``sorted`` raise
    ``TypeError``; for all-str keys the order is the plain sorted order.
    """
    return " ".join(
        f"{k}={v}" for k, v in sorted(counts.items(), key=lambda kv: str(kv[0]))
    )


def format_status_report(report: GlobalStatusReport) -> str:
    """Format a GlobalStatusReport as concise human-readable text.

    Sections, in order: warnings, catalog, database, ledger, indexed counts,
    problem counts and recent problems. If the database does not exist the
    text ends after the database section.
    """
    lines: list[str] = ["Global Status", "=" * 60]

    # Warnings first
    if report.warnings:
        lines.extend(f"[WARNING] {w}" for w in report.warnings)
        lines.append("")

    # Catalog section
    lines.append(f"Catalog: {report.catalog_path}")
    if not report.catalog_exists:
        lines.append(" (not found)")
    elif report.catalog_version is None:
        # The catalog file exists but was never parsed successfully.
        lines.append(" (invalid/unreadable)")
    else:
        lines.append(f" version: {report.catalog_version}")
        lines.append(
            f" entries: {report.catalog_total_entries} total"
            f" / {report.catalog_confirmed_entries} confirmed"
        )
        if report.catalog_by_source_type:
            lines.append(f" by source_type: {_format_counts(report.catalog_by_source_type)}")
        if report.catalog_by_confirmation_status:
            lines.append(
                f" by confirmation_status: "
                f"{_format_counts(report.catalog_by_confirmation_status)}"
            )

    # DB section
    lines.append(f"\nDatabase: {report.db_path}")
    if not report.db_exists:
        lines.append(" (not found)")
        return "\n".join(lines)
    lines.append(" exists: yes")

    # Ledger section. Match the exact warning emitted when the table is absent:
    # a substring test for "source_ledger" also fires when a catalog or DB path
    # in another warning happens to contain that text.
    if "source_ledger table not found in database" in report.warnings:
        lines.append("\nLedger: (source_ledger table missing)")
    else:
        lines.append(f"\nLedger: {report.ledger_total_rows} rows")
        if report.ledger_by_status:
            lines.append(f" by status: {_format_counts(report.ledger_by_status)}")
        if report.ledger_by_source_type:
            lines.append(f" by source_type: {_format_counts(report.ledger_by_source_type)}")

    # Indexed section
    lines.append("\nIndexed:")
    lines.append(f" documents: {report.indexed_documents}")
    lines.append(f" chunks: {report.indexed_chunks}")
    if report.last_indexed_at:
        lines.append(f" last_indexed_at: {report.last_indexed_at}")

    # Problems section
    total_problems = sum(report.problem_counts.values())
    lines.append(f"\nProblems: {total_problems}")
    if report.problem_counts:
        lines.append(f" {_format_counts(report.problem_counts)}")

    if report.recent_problems:
        lines.append(f"\nRecent problems ({len(report.recent_problems)}):")
        for p in report.recent_problems:
            err = p.get("error_message")
            err_str = f" -- {err}" if err else ""
            lines.append(
                f" [{p['status']}] {p['source_type']}:{p['source_path_or_alias']}"
                f"{err_str}"
            )

    return "\n".join(lines)