Coverage for src \ truenex_memory \ ingestion \ global_auto_status.py: 94%
205 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-19 10:21 +0200
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-19 10:21 +0200
1"""Read-only auto memory status report for Phase 3.2."""
3from __future__ import annotations
5from dataclasses import dataclass, field
6from datetime import datetime, timezone
7from pathlib import Path
8import sqlite3
10from truenex_memory.ingestion.global_auto_memory import analyze_auto_memory_candidates
11from truenex_memory.ingestion.global_status import (
12 GlobalStatusReport,
13 build_global_status,
14)
@dataclass
class AutoStatusReport:
    """Global status report extended with Phase 3 auto-memory readiness data.

    ``to_dict`` returns the wrapped ``global_status.to_dict()`` payload with
    every other field of this class nested under an ``"auto"`` key.
    """

    # Base report this auto report wraps.
    global_status: GlobalStatusReport
    # Readiness summary.
    phase: str = "3.2"
    ready: bool = False
    last_auto_run_at: str | None = None
    # Source ledger counts.
    confirmed_sources: int = 0
    active_sources: int = 0
    missing_sources: int = 0
    error_sources: int = 0
    skipped_sources: int = 0
    actionable_skipped_sources: int = 0
    expected_skipped_sources: int = 0
    unstable_session_sources: int = 0
    transient_unstable_session_sources: int = 0
    stale_unstable_session_sources: int = 0
    # Auto-memory counts.
    unverified_memory_count: int = 0
    auto_memory_candidates: int = 0
    duplicate_skips: int = 0
    duplicate_active_skips: int = 0
    duplicate_unverified_skips: int = 0
    duplicate_rejected_skips: int = 0
    low_confidence_skips: int = 0
    non_document_skips: int = 0
    noisy_session_skips: int = 0
    # Detail rows and warnings.
    skipped_reason_counts: list[dict[str, object]] = field(default_factory=list)
    unstable_session_files: list[dict[str, object]] = field(default_factory=list)
    warnings: list[str] = field(default_factory=list)

    def to_dict(self) -> dict[str, object]:
        """Serialize as the global payload plus an ``"auto"`` section.

        Keys in ``"auto"`` follow the field declaration order (without
        ``global_status``). List values are the report's own list objects,
        not copies.
        """
        auto_keys = (
            "phase",
            "ready",
            "last_auto_run_at",
            "confirmed_sources",
            "active_sources",
            "missing_sources",
            "error_sources",
            "skipped_sources",
            "actionable_skipped_sources",
            "expected_skipped_sources",
            "unstable_session_sources",
            "transient_unstable_session_sources",
            "stale_unstable_session_sources",
            "unverified_memory_count",
            "auto_memory_candidates",
            "duplicate_skips",
            "duplicate_active_skips",
            "duplicate_unverified_skips",
            "duplicate_rejected_skips",
            "low_confidence_skips",
            "non_document_skips",
            "noisy_session_skips",
            "skipped_reason_counts",
            "unstable_session_files",
            "warnings",
        )
        payload = self.global_status.to_dict()
        payload["auto"] = {key: getattr(self, key) for key in auto_keys}
        return payload
def build_auto_status(
    catalog_path: Path,
    db_path: Path,
    *,
    recent_problem_limit: int = 10,
    stability_seconds: int = 120,
) -> AutoStatusReport:
    """Assemble the read-only Phase 3 auto status for a catalog and database.

    The base report comes from ``build_global_status``; its ledger counts
    are copied onto the auto report. Extra detail is read from the database
    only when it exists, and readiness is evaluated last. The function is a
    thin wrapper and never creates directories, databases, catalog files,
    or ledger rows.

    Args:
        catalog_path: Catalog file handed to ``build_global_status``.
        db_path: SQLite database handed to ``build_global_status`` and, when
            it exists, read for auto details.
        recent_problem_limit: Forwarded to ``build_global_status``.
        stability_seconds: Window used to split unstable session skips into
            transient and stale ones.

    Returns:
        The populated ``AutoStatusReport``.
    """
    base = build_global_status(
        catalog_path=catalog_path,
        db_path=db_path,
        recent_problem_limit=recent_problem_limit,
    )
    by_status = base.ledger_by_status
    report = AutoStatusReport(
        global_status=base,
        last_auto_run_at=base.last_indexed_at,
        confirmed_sources=base.catalog_confirmed_entries,
        active_sources=by_status.get("active", 0),
        missing_sources=by_status.get("missing", 0),
        error_sources=by_status.get("error", 0),
    )

    if base.db_exists:
        _read_auto_details(db_path, report, stability_seconds=stability_seconds)

    _evaluate_readiness(report)
    return report
def format_auto_status_report(report: AutoStatusReport) -> str:
    """Render ``report`` as a plain-text, line-oriented status summary.

    Sections: global warnings, catalog, database, readiness, counts, and,
    when non-empty, up to 10 skip reasons, up to 10 unstable session files,
    and all auto warnings.
    """
    base = report.global_status
    out: list[str] = ["Auto Memory Status (Phase 3.2)", "=" * 60]

    if base.warnings:
        out.extend(f"[WARNING] {message}" for message in base.warnings)
        out.append("")

    out.append(f"Catalog: {base.catalog_path}")
    if base.catalog_exists:
        out.append(
            f" entries: {base.catalog_total_entries} total"
            f" / {base.catalog_confirmed_entries} confirmed"
        )
    else:
        out.append(" (not found)")

    out.append(f"\nDatabase: {base.db_path}")
    out.append(" (not found)" if not base.db_exists else " exists: yes")

    ready_text = "yes" if report.ready else "NO"
    out.extend(
        [
            "\nAuto Readiness:",
            f" ready: {ready_text}",
            f" last_auto_run_at: {report.last_auto_run_at or 'never'}",
            "\nAuto Counts:",
        ]
    )

    count_fields = (
        "confirmed_sources",
        "active_sources",
        "missing_sources",
        "error_sources",
        "skipped_sources",
        "actionable_skipped_sources",
        "expected_skipped_sources",
        "unstable_session_sources",
        "transient_unstable_session_sources",
        "stale_unstable_session_sources",
        "unverified_memory_count",
        "auto_memory_candidates",
        "duplicate_skips",
        "duplicate_active_skips",
        "duplicate_unverified_skips",
        "duplicate_rejected_skips",
        "low_confidence_skips",
        "non_document_skips",
        "noisy_session_skips",
    )
    out.extend(f" {name}: {getattr(report, name)}" for name in count_fields)

    breakdown = report.skipped_reason_counts
    if breakdown:
        out.append("\nSkipped Breakdown (all ledger skipped rows):")
        out.extend(
            f" - {row['count']}x {row['source_type']}: {row['reason']}"
            for row in breakdown[:10]
        )

    unstable_files = report.unstable_session_files
    if unstable_files:
        out.append("\nUnstable Session Files:")
        out.extend(
            f" - {row['count']} skipped exchange(s): {row['path']}"
            for row in unstable_files[:10]
        )

    if report.warnings:
        out.append("\nAuto Warnings:")
        out.extend(f" - {message}" for message in report.warnings)

    return "\n".join(out)
184def _evaluate_readiness(report: AutoStatusReport) -> None:
185 base = report.global_status
186 warnings: list[str] = []
188 if not base.catalog_exists:
189 warnings.append("catalog not found; confirm sources before auto status is ready")
190 elif report.confirmed_sources == 0:
191 warnings.append("catalog has no confirmed sources")
193 if not base.db_exists:
194 warnings.append("database not found; run global auto run after confirming sources")
195 elif report.active_sources == 0:
196 warnings.append("no active indexed sources found")
198 if report.missing_sources:
199 warnings.append(f"{report.missing_sources} source(s) are missing")
200 if report.error_sources:
201 warnings.append(f"{report.error_sources} source(s) have indexing errors")
202 other_actionable_skipped = max(
203 0,
204 report.actionable_skipped_sources - report.stale_unstable_session_sources,
205 )
206 if other_actionable_skipped:
207 warnings.append(
208 f"{other_actionable_skipped} actionable non-expected "
209 "source(s) are skipped"
210 )
211 if report.stale_unstable_session_sources:
212 warnings.append(
213 f"{report.stale_unstable_session_sources} agent session source(s) "
214 "remain unstable after the stability window"
215 )
217 report.warnings = warnings
218 report.ready = not warnings
221def _connect_readonly(db_path: Path) -> sqlite3.Connection:
222 uri_path = db_path.resolve().as_posix()
223 conn = sqlite3.connect(f"file:{uri_path}?mode=ro", uri=True)
224 conn.row_factory = sqlite3.Row
225 return conn
228def _table_exists(conn: sqlite3.Connection, table_name: str) -> bool:
229 row = conn.execute(
230 "SELECT name FROM sqlite_master WHERE type='table' AND name = ?",
231 (table_name,),
232 ).fetchone()
233 return row is not None
def _read_auto_details(
    db_path: Path,
    report: AutoStatusReport,
    *,
    stability_seconds: int,
) -> None:
    """Fill ``report`` with skip, unstable-session and auto-memory details.

    Opens ``db_path`` through ``_connect_readonly``. If there is no
    ``source_ledger`` table, it returns without touching the report. In
    that case ``memory_nodes`` and the candidate telemetry are not read
    either. Otherwise it:
    - counts expected and non-expected skipped ledger rows;
    - counts unstable agent-session skips;
    - collects the top 20 skip reasons and the per-file unstable session
      rows;
    - splits unstable sessions into transient and stale using
      ``stability_seconds``;
    - reads the unverified auto-memory count (only when ``memory_nodes``
      exists);
    - copies the ``analyze_auto_memory_candidates`` telemetry.

    Two failures are recorded by appending to ``report.warnings`` instead of
    raising: any exception while opening the database, and a
    ``sqlite3.DatabaseError`` during the queries. Other exception types from
    the queries or the telemetry helper propagate. The connection is always
    closed.
    """
    # Opening is best-effort: any failure becomes a warning and the detail
    # fields keep their defaults.
    try:
        conn = _connect_readonly(db_path)
    except Exception:
        report.warnings.append("database exists but cannot be opened for auto details")
        return

    try:
        if not _table_exists(conn, "source_ledger"):
            return
        # Expected skips: server aliases, or rows whose error message marks a
        # non-local, stale, removed, disabled or empty source. The next query
        # is the exact complement, so keep the two pattern lists in sync.
        expected_row = conn.execute(
            """
            SELECT COUNT(*) AS cnt
            FROM source_ledger
            WHERE status = 'skipped'
              AND (
                source_type = 'server_alias'
                OR lower(coalesce(error_message, '')) LIKE '%non-local path%'
                OR lower(coalesce(error_message, '')) LIKE '%stale ledger%'
                OR lower(coalesce(error_message, '')) LIKE '%removed local source%'
                OR lower(coalesce(error_message, '')) LIKE '%disabled catalog source%'
                OR lower(coalesce(error_message, '')) LIKE '%no indexable records%'
              )
            """
        ).fetchone()
        expected_skipped = int(expected_row["cnt"]) if expected_row else 0
        report.expected_skipped_sources = expected_skipped

        # Every other skipped row, including unstable agent sessions.
        skipped_row = conn.execute(
            """
            SELECT COUNT(*) AS cnt
            FROM source_ledger
            WHERE status = 'skipped'
              AND source_type != 'server_alias'
              AND lower(coalesce(error_message, '')) NOT LIKE '%non-local path%'
              AND lower(coalesce(error_message, '')) NOT LIKE '%stale ledger%'
              AND lower(coalesce(error_message, '')) NOT LIKE '%removed local source%'
              AND lower(coalesce(error_message, '')) NOT LIKE '%disabled catalog source%'
              AND lower(coalesce(error_message, '')) NOT LIKE '%no indexable records%'
            """
        ).fetchone()
        raw_non_expected_skipped = int(skipped_row["cnt"]) if skipped_row else 0

        # Agent-session rows skipped because the session was not yet stable.
        unstable_row = conn.execute(
            """
            SELECT COUNT(*) AS cnt
            FROM source_ledger
            WHERE status = 'skipped'
              AND source_type = 'agent_session'
              AND (
                lower(coalesce(error_message, '')) LIKE '%not yet stable%'
                OR lower(coalesce(error_message, '')) LIKE '%unstable%'
              )
            """
        ).fetchone()
        report.unstable_session_sources = int(unstable_row["cnt"]) if unstable_row else 0

        # Top 20 (source_type, reason) groups over ALL skipped rows, expected
        # ones included. A NULL error_message is grouped as ''.
        reason_rows = conn.execute(
            """
            SELECT
                source_type,
                coalesce(error_message, '') AS reason,
                COUNT(*) AS cnt
            FROM source_ledger
            WHERE status = 'skipped'
            GROUP BY source_type, reason
            ORDER BY cnt DESC, source_type, reason
            LIMIT 20
            """
        ).fetchall()
        report.skipped_reason_counts = [
            {
                "source_type": row["source_type"],
                "reason": row["reason"],
                "count": int(row["cnt"]),
            }
            for row in reason_rows
        ]

        # Same unstable filter as above, grouped per file. The file path is
        # the part of source_path_or_alias before the first '::'. Presumably
        # a per-exchange suffix follows it (TODO confirm).
        unstable_file_rows = conn.execute(
            """
            SELECT
                CASE
                    WHEN instr(source_path_or_alias, '::') > 0
                    THEN substr(source_path_or_alias, 1, instr(source_path_or_alias, '::') - 1)
                    ELSE source_path_or_alias
                END AS path,
                COUNT(*) AS cnt,
                MIN(last_modified_at) AS first_last_modified_at,
                MAX(last_modified_at) AS last_last_modified_at,
                MAX(updated_at) AS last_updated_at
            FROM source_ledger
            WHERE status = 'skipped'
              AND source_type = 'agent_session'
              AND (
                lower(coalesce(error_message, '')) LIKE '%not yet stable%'
                OR lower(coalesce(error_message, '')) LIKE '%unstable%'
              )
            GROUP BY path
            ORDER BY cnt DESC, path
            """
        ).fetchall()
        report.unstable_session_files = [
            {
                "path": row["path"],
                "count": int(row["cnt"]),
                "first_last_modified_at": row["first_last_modified_at"],
                "last_last_modified_at": row["last_last_modified_at"],
                "last_updated_at": row["last_updated_at"],
            }
            for row in unstable_file_rows
        ]
        # Tags each file row with "freshness" (and "age_seconds" when its
        # timestamp parses) and sets the transient/stale totals.
        _classify_unstable_session_freshness(report, stability_seconds)
        report.skipped_sources = raw_non_expected_skipped
        # Unstable sessions still inside the stability window are not counted
        # as actionable.
        report.actionable_skipped_sources = max(
            0,
            raw_non_expected_skipped - report.transient_unstable_session_sources,
        )

        if _table_exists(conn, "memory_nodes"):
            unverified_row = conn.execute(
                """
                SELECT COUNT(*) AS cnt
                FROM memory_nodes
                WHERE status = 'unverified'
                  AND created_by = 'auto'
                  AND source_kind = 'auto'
                """
            ).fetchone()
            report.unverified_memory_count = (
                int(unverified_row["cnt"]) if unverified_row else 0
            )

        # The telemetry helper receives db_path, not this connection.
        telemetry = analyze_auto_memory_candidates(db_path)
        report.auto_memory_candidates = telemetry.candidates
        report.duplicate_skips = telemetry.duplicate_skips
        report.duplicate_active_skips = telemetry.duplicate_active
        report.duplicate_unverified_skips = telemetry.duplicate_unverified
        report.duplicate_rejected_skips = telemetry.duplicate_rejected
        report.low_confidence_skips = telemetry.low_confidence
        report.non_document_skips = telemetry.non_document_skipped
        report.noisy_session_skips = telemetry.noisy_session_skipped
    except sqlite3.DatabaseError:
        report.warnings.append("database readable but auto detail query failed")
    finally:
        conn.close()
389def _classify_unstable_session_freshness(
390 report: AutoStatusReport,
391 stability_seconds: int,
392) -> None:
393 """Split unstable session skips into transient current writes and stale rows."""
394 if stability_seconds <= 0:
395 report.stale_unstable_session_sources = report.unstable_session_sources
396 report.transient_unstable_session_sources = 0
397 for item in report.unstable_session_files:
398 item["freshness"] = "stale"
399 return
401 now = datetime.now(timezone.utc)
402 transient = 0
403 stale = 0
404 for item in report.unstable_session_files:
405 count = int(item.get("count") or 0)
406 last_modified = _parse_iso_datetime(item.get("last_last_modified_at"))
407 if last_modified is None:
408 stale += count
409 item["freshness"] = "stale"
410 continue
411 age_seconds = (now - last_modified).total_seconds()
412 item["age_seconds"] = max(0.0, round(age_seconds, 3))
413 if age_seconds < stability_seconds:
414 transient += count
415 item["freshness"] = "transient"
416 else:
417 stale += count
418 item["freshness"] = "stale"
420 report.transient_unstable_session_sources = transient
421 report.stale_unstable_session_sources = stale
424def _parse_iso_datetime(value: object) -> datetime | None:
425 if not isinstance(value, str) or not value:
426 return None
427 try:
428 parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
429 except ValueError:
430 return None
431 if parsed.tzinfo is None:
432 return parsed.replace(tzinfo=timezone.utc)
433 return parsed.astimezone(timezone.utc)