Coverage for src \ truenex_memory \ ingestion \ global_status.py: 77%

178 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-19 10:21 +0200

1"""Read-only global status report for the Truenex Memory global store. 

2 

3Never creates directories, databases, catalog files, ledger rows, or runs 

4schema migrations. Only reads what already exists. 

5""" 

6 

7from __future__ import annotations 

8 

9from collections import defaultdict 

10from dataclasses import dataclass, field 

11from pathlib import Path 

12import sqlite3 

13import json 

14 

15 

16# Report dataclass 

17 

@dataclass
class GlobalStatusReport:
    """Snapshot of the catalog, database, ledger and index state.

    The five catalog fields without defaults are required constructor
    arguments; every other field is optional and starts out empty.
    """

    # Catalog file summary.
    catalog_path: str
    catalog_exists: bool
    catalog_version: str | None
    catalog_total_entries: int
    catalog_confirmed_entries: int
    catalog_by_source_type: dict[str, int] = field(default_factory=dict)
    catalog_by_confirmation_status: dict[str, int] = field(default_factory=dict)

    # Database file and source ledger summary.
    db_path: str = ""
    db_exists: bool = False
    ledger_total_rows: int = 0
    ledger_by_status: dict[str, int] = field(default_factory=dict)
    ledger_by_source_type: dict[str, int] = field(default_factory=dict)

    # Index totals.
    indexed_documents: int = 0
    indexed_chunks: int = 0
    last_indexed_at: str | None = None

    # Ledger rows in a problem status, as counts and as recent detail rows.
    problem_counts: dict[str, int] = field(default_factory=dict)
    recent_problems: list[dict[str, object]] = field(default_factory=list)

    # Human-readable notes collected while the report was built.
    warnings: list[str] = field(default_factory=list)

    def to_dict(self) -> dict[str, object]:
        """Return the report as a nested dict grouped by section.

        The nested dict and list values are the report's own objects, not
        copies.
        """
        catalog_section: dict[str, object] = {
            "path": self.catalog_path,
            "exists": self.catalog_exists,
            "version": self.catalog_version,
            "total_entries": self.catalog_total_entries,
            "confirmed_entries": self.catalog_confirmed_entries,
            "by_source_type": self.catalog_by_source_type,
            "by_confirmation_status": self.catalog_by_confirmation_status,
        }
        database_section: dict[str, object] = {
            "path": self.db_path,
            "exists": self.db_exists,
        }
        ledger_section: dict[str, object] = {
            "total_rows": self.ledger_total_rows,
            "by_status": self.ledger_by_status,
            "by_source_type": self.ledger_by_source_type,
        }
        indexed_section: dict[str, object] = {
            "documents": self.indexed_documents,
            "chunks": self.indexed_chunks,
            "last_indexed_at": self.last_indexed_at,
        }
        problems_section: dict[str, object] = {
            "counts": self.problem_counts,
            "recent": self.recent_problems,
        }
        return {
            "catalog": catalog_section,
            "database": database_section,
            "ledger": ledger_section,
            "indexed": indexed_section,
            "problems": problems_section,
            "warnings": self.warnings,
        }

74 

75 

76# Build function 

77 

def build_global_status(
    catalog_path: Path,
    db_path: Path,
    *,
    recent_problem_limit: int = 10,
) -> GlobalStatusReport:
    """Assemble a GlobalStatusReport from what already exists on disk.

    Never creates directories, databases, catalog files, or ledger rows.
    A missing catalog or database, and a database that cannot be opened or
    read, are recorded in the report's ``warnings`` list.

    Args:
        catalog_path: Location of the catalog file.
        db_path: Location of the SQLite database file.
        recent_problem_limit: Passed to the ledger reader as the maximum
            number of recent problem rows to collect.

    Returns:
        The populated report.
    """
    status = GlobalStatusReport(
        catalog_path=str(catalog_path),
        catalog_exists=False,
        catalog_version=None,
        catalog_total_entries=0,
        catalog_confirmed_entries=0,
        db_path=str(db_path),
        db_exists=False,
    )

    _read_catalog(catalog_path, status)

    if not db_path.exists():
        status.warnings.append(f"Database not found: {db_path}")
    else:
        status.db_exists = True
        connection = None
        try:
            connection = _connect_readonly(db_path)
        except Exception:
            # Deliberately broad: any failure to open is reported, not raised.
            status.warnings.append(f"Database exists but cannot be opened: {db_path}")
        if connection is not None:
            try:
                _read_ledger(connection, status, recent_problem_limit)
                _read_indexed(connection, status)
            except sqlite3.DatabaseError:
                status.warnings.append(f"Database exists but cannot be read: {db_path}")
            finally:
                connection.close()

    # Added last so database warnings precede the catalog-not-found warning.
    if not status.catalog_exists:
        status.warnings.append(f"Catalog not found: {catalog_path}")

    return status

121 

122 

123# Internal helpers 

124 

def _connect_readonly(db_path: Path) -> sqlite3.Connection:
    """Open a read-only SQLite connection to an existing database file.

    Does NOT create the file or parent directories.

    Args:
        db_path: Path of the database file; it is resolved to an absolute
            path before the URI is built.

    Returns:
        A connection whose ``row_factory`` is ``sqlite3.Row``.

    Raises:
        sqlite3.OperationalError: If SQLite cannot open the file, for
            example because it does not exist.
    """
    # Path.as_uri() percent-encodes characters such as "#", "?" and "%".
    # Interpolated raw, a "#" or "?" in the path would cut the URI short,
    # dropping "mode=ro" and pointing SQLite at a different file that it may
    # then create.
    uri = db_path.resolve().as_uri()
    conn = sqlite3.connect(f"{uri}?mode=ro", uri=True)
    conn.row_factory = sqlite3.Row
    return conn

132 

133 

def _table_exists(conn: sqlite3.Connection, table_name: str) -> bool:
    """Return True when ``sqlite_master`` lists a table named ``table_name``."""
    lookup_sql = "SELECT name FROM sqlite_master WHERE type='table' AND name = ?"
    cursor = conn.execute(lookup_sql, (table_name,))
    return cursor.fetchone() is not None

140 

141 

def _read_catalog(catalog_path: Path, report: GlobalStatusReport) -> None:
    """Fill the catalog fields of ``report`` from the JSON catalog file.

    Does nothing when ``catalog_path`` does not exist. Otherwise marks the
    catalog as existing and, when the file holds a JSON object with a list
    under ``"entries"``, records the version, the entry totals and the
    per-source-type / per-confirmation-status counts. Any problem with the
    file is appended to ``report.warnings`` instead of being raised.

    Args:
        catalog_path: Location of the catalog file.
        report: Report object updated in place.
    """
    if not catalog_path.exists():
        return

    report.catalog_exists = True
    try:
        data = json.loads(catalog_path.read_text(encoding="utf-8"))
    except (ValueError, OSError):
        # ValueError covers json.JSONDecodeError and also the
        # UnicodeDecodeError raised when the file is not valid UTF-8.
        report.warnings.append(f"Catalog exists but is invalid/unreadable: {catalog_path}")
        return
    if not isinstance(data, dict):
        report.warnings.append(f"Catalog must be a JSON object: {catalog_path}")
        return

    report.catalog_version = str(data.get("version", "1"))

    entries = data.get("entries", [])
    if not isinstance(entries, list):
        report.warnings.append(
            f"Catalog has unexpected structure (entries is not a list): {catalog_path}"
        )
        return

    # The total counts every list item, including ones skipped below.
    report.catalog_total_entries = len(entries)

    by_source_type: dict[str, int] = defaultdict(int)
    by_confirmation_status: dict[str, int] = defaultdict(int)
    confirmed = 0
    saw_non_object = False

    for entry in entries:
        if not isinstance(entry, dict):
            saw_non_object = True
            continue
        source_type = str(entry.get("source_type", "unknown"))
        confirmation = str(entry.get("confirmation_status", "unknown"))
        by_source_type[source_type] += 1
        by_confirmation_status[confirmation] += 1
        if confirmation == "confirmed":
            confirmed += 1

    # Warn once rather than once per malformed entry.
    if saw_non_object:
        report.warnings.append("Catalog contains non-object entries")

    report.catalog_confirmed_entries = confirmed
    report.catalog_by_source_type = dict(by_source_type)
    report.catalog_by_confirmation_status = dict(by_confirmation_status)

185 

186 

def _read_ledger(
    conn: sqlite3.Connection,
    report: GlobalStatusReport,
    recent_problem_limit: int,
) -> None:
    """Fill the ledger and problem fields of ``report`` from ``source_ledger``.

    When the table is absent a warning is appended and nothing else is
    touched. Otherwise records the total row count, counts grouped by status
    and by source type, the counts for the problem statuses, and up to
    ``recent_problem_limit`` problem rows ordered by ``updated_at``, newest
    first. Rows are read by column name.
    """
    if not _table_exists(conn, "source_ledger"):
        report.warnings.append("source_ledger table not found in database")
        return

    # Overall size of the ledger.
    count_row = conn.execute("SELECT COUNT(*) AS cnt FROM source_ledger").fetchone()
    report.ledger_total_rows = count_row["cnt"] if count_row else 0

    # Breakdown per status.
    status_counts: dict[str, int] = {
        row["status"]: row["cnt"]
        for row in conn.execute(
            "SELECT status, COUNT(*) AS cnt FROM source_ledger GROUP BY status"
        )
    }
    report.ledger_by_status = status_counts

    # Breakdown per source type.
    type_counts: dict[str, int] = {
        row["source_type"]: row["cnt"]
        for row in conn.execute(
            "SELECT source_type, COUNT(*) AS cnt FROM source_ledger GROUP BY source_type"
        )
    }
    report.ledger_by_source_type = type_counts

    # Statuses treated as problems; each gets a count even when it is zero.
    problem_statuses = ("missing", "error", "skipped")
    problem_counts: dict[str, int] = {}
    for status in problem_statuses:
        problem_counts[status] = status_counts.get(status, 0)
    report.problem_counts = problem_counts

    # Detail rows for the most recently updated problems.
    columns = (
        "source_id",
        "source_path_or_alias",
        "source_type",
        "status",
        "error_message",
        "last_indexed_at",
        "updated_at",
    )
    column_list = ", ".join(columns)
    marks = ",".join("?" * len(problem_statuses))
    problem_sql = (
        f"SELECT {column_list} "
        f"FROM source_ledger "
        f"WHERE status IN ({marks}) "
        f"ORDER BY updated_at DESC "
        f"LIMIT ?"
    )
    problem_rows = conn.execute(
        problem_sql,
        problem_statuses + (recent_problem_limit,),
    ).fetchall()

    report.recent_problems = [
        {column: record[column] for column in columns} for record in problem_rows
    ]

247 

248 

def _read_indexed(conn: sqlite3.Connection, report: GlobalStatusReport) -> None:
    """Fill the index totals and ``last_indexed_at`` on ``report``.

    Counts the rows of ``documents`` and ``chunks`` when those tables exist
    and appends a warning for each one that does not. ``last_indexed_at`` is
    the maximum non-NULL ``last_indexed_at`` across ``source_ledger`` and
    ``documents``, using whichever of the two tables are present.
    """
    documents_present = _table_exists(conn, "documents")
    chunks_present = _table_exists(conn, "chunks")

    if not documents_present:
        report.warnings.append("documents table not found in database")
    else:
        documents_row = conn.execute("SELECT COUNT(*) AS cnt FROM documents").fetchone()
        report.indexed_documents = documents_row["cnt"] if documents_row else 0

    if not chunks_present:
        report.warnings.append("chunks table not found in database")
    else:
        chunks_row = conn.execute("SELECT COUNT(*) AS cnt FROM chunks").fetchone()
        report.indexed_chunks = chunks_row["cnt"] if chunks_row else 0

    # One sub-select per table that can contribute a timestamp.
    timestamp_sources: list[str] = []
    if _table_exists(conn, "source_ledger"):
        timestamp_sources.append(
            "SELECT last_indexed_at FROM source_ledger WHERE last_indexed_at IS NOT NULL"
        )
    if documents_present:
        timestamp_sources.append(
            "SELECT last_indexed_at FROM documents WHERE last_indexed_at IS NOT NULL"
        )
    if not timestamp_sources:
        return

    union_sql = " UNION ALL ".join(timestamp_sources)
    newest = conn.execute(
        f"SELECT MAX(last_indexed_at) AS val FROM ({union_sql})"
    ).fetchone()
    latest = newest["val"] if newest else None
    # An empty/falsy value is reported as None, same as no value at all.
    report.last_indexed_at = latest or None

279 

280 

281# Text formatting 

282 

# Exact warning text that marks a missing source_ledger table. It must match
# the message appended by the ledger reader in this module.
_LEDGER_TABLE_MISSING_WARNING = "source_ledger table not found in database"


def _format_counts(counts: dict[str, int]) -> str:
    """Render a counts mapping as space-separated ``key=value`` pairs by key."""
    # Sort on str(key): ledger keys come straight from database columns, so a
    # NULL value arrives as None, which cannot be ordered against str keys.
    ordered = sorted(counts.items(), key=lambda item: str(item[0]))
    return " ".join(f"{key}={value}" for key, value in ordered)


def format_status_report(report: GlobalStatusReport) -> str:
    """Format a GlobalStatusReport as concise human-readable text.

    Sections appear in this order: warnings, catalog, database, ledger,
    indexed totals, problem counts, recent problems. When the database does
    not exist, the output stops after the database section.

    Args:
        report: The report to render; it is only read.

    Returns:
        The lines of the report joined with newlines.
    """
    lines: list[str] = ["Global Status", "=" * 60]

    # Warnings first
    if report.warnings:
        lines.extend(f"[WARNING] {warning}" for warning in report.warnings)
        lines.append("")

    # Catalog section
    lines.append(f"Catalog: {report.catalog_path}")
    if not report.catalog_exists:
        lines.append(" (not found)")
    elif report.catalog_version is None:
        # The version is only set once the catalog parsed as a JSON object.
        lines.append(" (invalid/unreadable)")
    else:
        lines.append(f" version: {report.catalog_version}")
        lines.append(
            f" entries: {report.catalog_total_entries} total"
            f" / {report.catalog_confirmed_entries} confirmed"
        )
        if report.catalog_by_source_type:
            lines.append(
                f" by source_type: {_format_counts(report.catalog_by_source_type)}"
            )
        if report.catalog_by_confirmation_status:
            lines.append(
                " by confirmation_status: "
                f"{_format_counts(report.catalog_by_confirmation_status)}"
            )

    # DB section
    lines.append(f"\nDatabase: {report.db_path}")
    if not report.db_exists:
        lines.append(" (not found)")
        return "\n".join(lines)

    lines.append(" exists: yes")

    # Ledger section. Match the exact warning, not a substring: other
    # warnings embed file paths that may themselves contain "source_ledger".
    if _LEDGER_TABLE_MISSING_WARNING in report.warnings:
        lines.append("\nLedger: (source_ledger table missing)")
    else:
        lines.append(f"\nLedger: {report.ledger_total_rows} rows")
        if report.ledger_by_status:
            lines.append(f" by status: {_format_counts(report.ledger_by_status)}")
        if report.ledger_by_source_type:
            lines.append(
                f" by source_type: {_format_counts(report.ledger_by_source_type)}"
            )

    # Indexed section
    lines.append("\nIndexed:")
    lines.append(f" documents: {report.indexed_documents}")
    lines.append(f" chunks: {report.indexed_chunks}")
    if report.last_indexed_at:
        lines.append(f" last_indexed_at: {report.last_indexed_at}")

    # Problems section
    total_problems = sum(report.problem_counts.values())
    lines.append(f"\nProblems: {total_problems}")
    if report.problem_counts:
        lines.append(f" {_format_counts(report.problem_counts)}")

    if report.recent_problems:
        lines.append(f"\nRecent problems ({len(report.recent_problems)}):")
        for problem in report.recent_problems:
            err = problem.get("error_message")
            err_str = f" -- {err}" if err else ""
            lines.append(
                f" [{problem['status']}] "
                f"{problem['source_type']}:{problem['source_path_or_alias']}"
                f"{err_str}"
            )

    return "\n".join(lines)