Coverage for src \ truenex_memory \ ingestion \ global_auto_status.py: 94%

205 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-19 10:21 +0200

1"""Read-only auto memory status report for Phase 3.2.""" 

2 

3from __future__ import annotations 

4 

5from dataclasses import dataclass, field 

6from datetime import datetime, timezone 

7from pathlib import Path 

8import sqlite3 

9 

10from truenex_memory.ingestion.global_auto_memory import analyze_auto_memory_candidates 

11from truenex_memory.ingestion.global_status import ( 

12 GlobalStatusReport, 

13 build_global_status, 

14) 

15 

16 

@dataclass
class AutoStatusReport:
    """Global status report extended with Phase 3 auto-memory readiness data.

    ``global_status`` is the only required field. Every other field has a
    neutral default (zero, ``False``, ``None`` or an empty list).
    """

    # Wrapped base report; its ``to_dict()`` output is the root of ``to_dict``.
    global_status: GlobalStatusReport
    phase: str = "3.2"
    ready: bool = False
    last_auto_run_at: str | None = None

    # Source counters.
    confirmed_sources: int = 0
    active_sources: int = 0
    missing_sources: int = 0
    error_sources: int = 0
    skipped_sources: int = 0
    actionable_skipped_sources: int = 0
    expected_skipped_sources: int = 0

    # Unstable agent-session counters.
    unstable_session_sources: int = 0
    transient_unstable_session_sources: int = 0
    stale_unstable_session_sources: int = 0

    # Auto-memory counters.
    unverified_memory_count: int = 0
    auto_memory_candidates: int = 0
    duplicate_skips: int = 0
    duplicate_active_skips: int = 0
    duplicate_unverified_skips: int = 0
    duplicate_rejected_skips: int = 0
    low_confidence_skips: int = 0
    non_document_skips: int = 0
    noisy_session_skips: int = 0

    # Detail listings and readiness warnings.
    skipped_reason_counts: list[dict[str, object]] = field(default_factory=list)
    unstable_session_files: list[dict[str, object]] = field(default_factory=list)
    warnings: list[str] = field(default_factory=list)

    def to_dict(self) -> dict[str, object]:
        """Return the global status payload with an added ``"auto"`` section.

        The ``"auto"`` mapping holds every field of this report except
        ``global_status``, in declaration order. List values are the same
        objects held by the report, not copies.
        """
        auto_keys = (
            "phase",
            "ready",
            "last_auto_run_at",
            "confirmed_sources",
            "active_sources",
            "missing_sources",
            "error_sources",
            "skipped_sources",
            "actionable_skipped_sources",
            "expected_skipped_sources",
            "unstable_session_sources",
            "transient_unstable_session_sources",
            "stale_unstable_session_sources",
            "unverified_memory_count",
            "auto_memory_candidates",
            "duplicate_skips",
            "duplicate_active_skips",
            "duplicate_unverified_skips",
            "duplicate_rejected_skips",
            "low_confidence_skips",
            "non_document_skips",
            "noisy_session_skips",
            "skipped_reason_counts",
            "unstable_session_files",
            "warnings",
        )
        result = self.global_status.to_dict()
        result["auto"] = {key: getattr(self, key) for key in auto_keys}
        return result

78 

79 

def build_auto_status(
    catalog_path: Path,
    db_path: Path,
    *,
    recent_problem_limit: int = 10,
    stability_seconds: int = 120,
) -> AutoStatusReport:
    """Assemble the read-only Phase 3 auto status report.

    The function layers auto-memory details on top of
    ``build_global_status``. It is meant to never create directories,
    databases, catalog files, or ledger rows.

    Args:
        catalog_path: Catalog location, forwarded to ``build_global_status``.
        db_path: Database location, forwarded to ``build_global_status`` and
            read for auto details when the global report says it exists.
        recent_problem_limit: Forwarded to ``build_global_status``.
        stability_seconds: Age threshold used to split unstable session
            skips into transient and stale.

    Returns:
        The populated report with readiness already evaluated.
    """
    base = build_global_status(
        catalog_path=catalog_path,
        db_path=db_path,
        recent_problem_limit=recent_problem_limit,
    )
    ledger_counts = base.ledger_by_status
    report = AutoStatusReport(
        global_status=base,
        last_auto_run_at=base.last_indexed_at,
        confirmed_sources=base.catalog_confirmed_entries,
        active_sources=ledger_counts.get("active", 0),
        missing_sources=ledger_counts.get("missing", 0),
        error_sources=ledger_counts.get("error", 0),
    )

    # Detail queries only make sense when the database file is present.
    if base.db_exists:
        _read_auto_details(db_path, report, stability_seconds=stability_seconds)

    _evaluate_readiness(report)
    return report

109 

110 

def format_auto_status_report(report: AutoStatusReport) -> str:
    """Render *report* as a concise, newline-joined text summary.

    Sections appear in a fixed order: global warnings, catalog, database,
    readiness, counters, then the optional skipped breakdown, unstable
    session files and auto warnings. The two listings show at most ten
    entries each.
    """
    base = report.global_status
    out: list[str] = ["Auto Memory Status (Phase 3.2)", "=" * 60]

    if base.warnings:
        out.extend(f"[WARNING] {text}" for text in base.warnings)
        out.append("")

    out.append(f"Catalog: {base.catalog_path}")
    if base.catalog_exists:
        out.append(
            f" entries: {base.catalog_total_entries} total"
            f" / {base.catalog_confirmed_entries} confirmed"
        )
    else:
        out.append(" (not found)")

    out.append(f"\nDatabase: {base.db_path}")
    if base.db_exists:
        out.append(" exists: yes")
    else:
        out.append(" (not found)")

    ready_text = "yes" if report.ready else "NO"
    out.append("\nAuto Readiness:")
    out.append(f" ready: {ready_text}")
    out.append(f" last_auto_run_at: {report.last_auto_run_at or 'never'}")

    # Each counter line is labelled with the attribute name it reports.
    counter_names = (
        "confirmed_sources",
        "active_sources",
        "missing_sources",
        "error_sources",
        "skipped_sources",
        "actionable_skipped_sources",
        "expected_skipped_sources",
        "unstable_session_sources",
        "transient_unstable_session_sources",
        "stale_unstable_session_sources",
        "unverified_memory_count",
        "auto_memory_candidates",
        "duplicate_skips",
        "duplicate_active_skips",
        "duplicate_unverified_skips",
        "duplicate_rejected_skips",
        "low_confidence_skips",
        "non_document_skips",
        "noisy_session_skips",
    )
    out.append("\nAuto Counts:")
    out.extend(f" {name}: {getattr(report, name)}" for name in counter_names)

    if report.skipped_reason_counts:
        out.append("\nSkipped Breakdown (all ledger skipped rows):")
        out.extend(
            f" - {entry['count']}x {entry['source_type']}: {entry['reason']}"
            for entry in report.skipped_reason_counts[:10]
        )

    if report.unstable_session_files:
        out.append("\nUnstable Session Files:")
        out.extend(
            f" - {entry['count']} skipped exchange(s): {entry['path']}"
            for entry in report.unstable_session_files[:10]
        )

    if report.warnings:
        out.append("\nAuto Warnings:")
        out.extend(f" - {text}" for text in report.warnings)

    return "\n".join(out)

182 

183 

def _evaluate_readiness(report: AutoStatusReport) -> None:
    """Derive ``report.warnings`` and ``report.ready`` from the collected counters.

    Warnings already present on *report* are kept. ``_read_auto_details``
    appends to ``report.warnings`` when the database cannot be opened or a
    detail query fails, and ``build_auto_status`` runs it before this
    function. Replacing the list would drop those messages and could mark
    the report ready although its details were never read. An identical
    message is added only once, so evaluating an unchanged report again does
    not duplicate entries.

    The report is ready only when the final warning list is empty.
    """
    base = report.global_status
    # Seed with earlier warnings instead of starting from an empty list.
    warnings: list[str] = list(report.warnings)

    def add(message: str) -> None:
        """Append *message* unless the same text is already recorded."""
        if message not in warnings:
            warnings.append(message)

    if not base.catalog_exists:
        add("catalog not found; confirm sources before auto status is ready")
    elif report.confirmed_sources == 0:
        add("catalog has no confirmed sources")

    if not base.db_exists:
        add("database not found; run global auto run after confirming sources")
    elif report.active_sources == 0:
        add("no active indexed sources found")

    if report.missing_sources:
        add(f"{report.missing_sources} source(s) are missing")
    if report.error_sources:
        add(f"{report.error_sources} source(s) have indexing errors")

    # Stale unstable sessions get their own message below, so they are
    # subtracted here to avoid reporting the same rows twice.
    other_actionable_skipped = max(
        0,
        report.actionable_skipped_sources - report.stale_unstable_session_sources,
    )
    if other_actionable_skipped:
        add(
            f"{other_actionable_skipped} actionable non-expected "
            "source(s) are skipped"
        )
    if report.stale_unstable_session_sources:
        add(
            f"{report.stale_unstable_session_sources} agent session source(s) "
            "remain unstable after the stability window"
        )

    report.warnings = warnings
    report.ready = not warnings

219 

220 

def _connect_readonly(db_path: Path) -> sqlite3.Connection:
    """Open *db_path* as a read-only SQLite connection with ``sqlite3.Row`` rows.

    The path is resolved and converted with ``Path.as_uri()`` so characters
    that are special in a URI (``#``, ``?``, ``%``, spaces) are
    percent-encoded. Without that encoding SQLite's URI parser cuts the path
    at ``#`` or ``?``; the ``mode=ro`` parameter is then lost and a different
    file may be opened, or even created, read-write.

    Raises:
        sqlite3.OperationalError: If the database cannot be opened.
    """
    uri = f"{db_path.resolve().as_uri()}?mode=ro"
    conn = sqlite3.connect(uri, uri=True)
    conn.row_factory = sqlite3.Row
    return conn

226 

227 

def _table_exists(conn: sqlite3.Connection, table_name: str) -> bool:
    """Return True when ``sqlite_master`` lists a table named *table_name*."""
    cursor = conn.execute(
        "SELECT name FROM sqlite_master WHERE type='table' AND name = ?",
        (table_name,),
    )
    match = cursor.fetchone()
    if match is None:
        return False
    return True

234 

235 

def _read_auto_details(
    db_path: Path,
    report: AutoStatusReport,
    *,
    stability_seconds: int,
) -> None:
    """Fill *report* with skip, unstable-session and auto-memory counters.

    The database is opened through ``_connect_readonly``. If it cannot be
    opened, a warning is appended and nothing else happens. If the
    ``source_ledger`` table is absent the function returns without touching
    the report. A ``sqlite3.DatabaseError`` raised by any query appends a
    warning and leaves the fields assigned so far in place. The connection
    is always closed.

    Skipped ledger rows count as "expected" when their source type is
    ``server_alias`` or their error message contains one of the known benign
    phrases; the non-expected count uses the negated filter.
    """
    try:
        conn = _connect_readonly(db_path)
    except Exception:
        report.warnings.append("database exists but cannot be opened for auto details")
        return

    def count_rows(sql: str) -> int:
        """Run a ``COUNT(*) AS cnt`` query and return the count (0 if no row)."""
        row = conn.execute(sql).fetchone()
        return int(row["cnt"]) if row else 0

    try:
        if not _table_exists(conn, "source_ledger"):
            return

        report.expected_skipped_sources = count_rows(
            """
            SELECT COUNT(*) AS cnt
            FROM source_ledger
            WHERE status = 'skipped'
            AND (
            source_type = 'server_alias'
            OR lower(coalesce(error_message, '')) LIKE '%non-local path%'
            OR lower(coalesce(error_message, '')) LIKE '%stale ledger%'
            OR lower(coalesce(error_message, '')) LIKE '%removed local source%'
            OR lower(coalesce(error_message, '')) LIKE '%disabled catalog source%'
            OR lower(coalesce(error_message, '')) LIKE '%no indexable records%'
            )
            """
        )

        non_expected_total = count_rows(
            """
            SELECT COUNT(*) AS cnt
            FROM source_ledger
            WHERE status = 'skipped'
            AND source_type != 'server_alias'
            AND lower(coalesce(error_message, '')) NOT LIKE '%non-local path%'
            AND lower(coalesce(error_message, '')) NOT LIKE '%stale ledger%'
            AND lower(coalesce(error_message, '')) NOT LIKE '%removed local source%'
            AND lower(coalesce(error_message, '')) NOT LIKE '%disabled catalog source%'
            AND lower(coalesce(error_message, '')) NOT LIKE '%no indexable records%'
            """
        )

        report.unstable_session_sources = count_rows(
            """
            SELECT COUNT(*) AS cnt
            FROM source_ledger
            WHERE status = 'skipped'
            AND source_type = 'agent_session'
            AND (
            lower(coalesce(error_message, '')) LIKE '%not yet stable%'
            OR lower(coalesce(error_message, '')) LIKE '%unstable%'
            )
            """
        )

        # Most frequent (source type, reason) pairs across all skipped rows.
        breakdown: list[dict[str, object]] = []
        for row in conn.execute(
            """
            SELECT
            source_type,
            coalesce(error_message, '') AS reason,
            COUNT(*) AS cnt
            FROM source_ledger
            WHERE status = 'skipped'
            GROUP BY source_type, reason
            ORDER BY cnt DESC, source_type, reason
            LIMIT 20
            """
        ).fetchall():
            breakdown.append(
                {
                    "source_type": row["source_type"],
                    "reason": row["reason"],
                    "count": int(row["cnt"]),
                }
            )
        report.skipped_reason_counts = breakdown

        # Group unstable session rows by the path part before any '::' suffix.
        per_file: list[dict[str, object]] = []
        for row in conn.execute(
            """
            SELECT
            CASE
            WHEN instr(source_path_or_alias, '::') > 0
            THEN substr(source_path_or_alias, 1, instr(source_path_or_alias, '::') - 1)
            ELSE source_path_or_alias
            END AS path,
            COUNT(*) AS cnt,
            MIN(last_modified_at) AS first_last_modified_at,
            MAX(last_modified_at) AS last_last_modified_at,
            MAX(updated_at) AS last_updated_at
            FROM source_ledger
            WHERE status = 'skipped'
            AND source_type = 'agent_session'
            AND (
            lower(coalesce(error_message, '')) LIKE '%not yet stable%'
            OR lower(coalesce(error_message, '')) LIKE '%unstable%'
            )
            GROUP BY path
            ORDER BY cnt DESC, path
            """
        ).fetchall():
            per_file.append(
                {
                    "path": row["path"],
                    "count": int(row["cnt"]),
                    "first_last_modified_at": row["first_last_modified_at"],
                    "last_last_modified_at": row["last_last_modified_at"],
                    "last_updated_at": row["last_updated_at"],
                }
            )
        report.unstable_session_files = per_file

        _classify_unstable_session_freshness(report, stability_seconds)
        report.skipped_sources = non_expected_total
        # Transient unstable sessions are not counted as actionable.
        report.actionable_skipped_sources = max(
            0,
            non_expected_total - report.transient_unstable_session_sources,
        )

        if _table_exists(conn, "memory_nodes"):
            report.unverified_memory_count = count_rows(
                """
                SELECT COUNT(*) AS cnt
                FROM memory_nodes
                WHERE status = 'unverified'
                AND created_by = 'auto'
                AND source_kind = 'auto'
                """
            )

        telemetry = analyze_auto_memory_candidates(db_path)
        report.auto_memory_candidates = telemetry.candidates
        report.duplicate_skips = telemetry.duplicate_skips
        report.duplicate_active_skips = telemetry.duplicate_active
        report.duplicate_unverified_skips = telemetry.duplicate_unverified
        report.duplicate_rejected_skips = telemetry.duplicate_rejected
        report.low_confidence_skips = telemetry.low_confidence
        report.non_document_skips = telemetry.non_document_skipped
        report.noisy_session_skips = telemetry.noisy_session_skipped
    except sqlite3.DatabaseError:
        report.warnings.append("database readable but auto detail query failed")
    finally:
        conn.close()

387 

388 

def _classify_unstable_session_freshness(
    report: AutoStatusReport,
    stability_seconds: int,
) -> None:
    """Split unstable session skips into transient current writes and stale rows.

    Each entry of ``report.unstable_session_files`` gains a ``"freshness"``
    key (``"transient"`` or ``"stale"``). When its ``last_last_modified_at``
    value parses, it also gains a non-negative ``"age_seconds"`` value. An
    entry is transient when it was modified less than *stability_seconds*
    ago, and stale otherwise or when the timestamp cannot be parsed. With a
    non-positive window every unstable source is treated as stale.
    """
    entries = report.unstable_session_files

    if stability_seconds <= 0:
        report.stale_unstable_session_sources = report.unstable_session_sources
        report.transient_unstable_session_sources = 0
        for entry in entries:
            entry["freshness"] = "stale"
        return

    reference = datetime.now(timezone.utc)
    totals = {"transient": 0, "stale": 0}
    for entry in entries:
        weight = int(entry.get("count") or 0)
        modified = _parse_iso_datetime(entry.get("last_last_modified_at"))
        if modified is None:
            label = "stale"
        else:
            age = (reference - modified).total_seconds()
            # Clamp the reported age so future timestamps show as zero.
            entry["age_seconds"] = max(0.0, round(age, 3))
            label = "transient" if age < stability_seconds else "stale"
        totals[label] += weight
        entry["freshness"] = label

    report.transient_unstable_session_sources = totals["transient"]
    report.stale_unstable_session_sources = totals["stale"]

422 

423 

def _parse_iso_datetime(value: object) -> datetime | None:
    """Parse an ISO-8601 string into a timezone-aware UTC ``datetime``.

    Returns ``None`` for non-strings, empty strings and text that
    ``datetime.fromisoformat`` rejects. ``Z`` is rewritten to ``+00:00``
    before parsing. Naive results are taken to be UTC; aware results are
    converted to UTC.
    """
    if not (isinstance(value, str) and value):
        return None

    normalized = value.replace("Z", "+00:00")
    try:
        moment = datetime.fromisoformat(normalized)
    except ValueError:
        return None

    if moment.tzinfo is not None:
        return moment.astimezone(timezone.utc)
    return moment.replace(tzinfo=timezone.utc)

433 return parsed.astimezone(timezone.utc)