Coverage for src \ truenex_memory \ ingestion \ global_refresh.py: 84%
569 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-19 10:21 +0200
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-19 10:21 +0200
1"""Incremental global refresh using confirmed source catalog + source ledger.
3Loads confirmed catalog entries, maps them to parsers, checks the ledger
4for each parsed record, and indexes only new or changed content.
5"""
7from __future__ import annotations
9import hashlib
10import json
11import os
12import tempfile
13import time as _time
14from collections import Counter
15from dataclasses import dataclass, field
16from pathlib import Path
17from datetime import datetime, timezone
19from truenex_memory.core.chunker import chunk_text, content_hash
20from truenex_memory.discovery.source_catalog import (
21 CatalogEntry,
22 SourceCatalog,
23 source_id,
24)
25from truenex_memory.ingestion.manifest import IngestionRecord
26from truenex_memory.ingestion.parsers import get_parser
27from truenex_memory.store.repository import MemoryRepository
28from truenex_memory.store.source_ledger import (
29 SourceLedgerRecord,
30 list_ledger_entries,
31 update_ledger_status,
32 upsert_ledger_entry,
33)
34from truenex_memory.store.sqlite import connect, initialize_schema
# Chunk-size ceilings (in characters) handed to chunk_text, keyed by the
# record's source_type.
MAX_CHARS_BY_SOURCE_TYPE: dict[str, int] = dict(
    agent_session=600,
    project_docs=1200,
)
# Ceiling applied to any source_type that has no entry in the table above.
DEFAULT_MAX_CHARS = 1200
# Lower-cased file suffixes accepted when scanning for agent session files.
SUPPORTED_AGENT_SESSION_EXTENSIONS = {".jsonl"}
@dataclass
class RefreshReport:
    """Counters and per-source details collected during one global refresh run."""

    new: int = 0
    modified: int = 0
    unchanged: int = 0
    skipped: int = 0
    missing: int = 0
    errors: int = 0
    indexed_records: int = 0
    catalog_entries: int = 0
    refresh_skipped: bool = False
    auto_memory_candidates: int = 0
    auto_memory_created: int = 0
    auto_memory_duplicates: int = 0
    auto_memory_duplicate_active: int = 0
    auto_memory_duplicate_unverified: int = 0
    auto_memory_duplicate_rejected: int = 0
    auto_memory_low_confidence: int = 0
    auto_memory_limit_skipped: int = 0
    auto_memory_source_limit_skipped: int = 0
    auto_memory_non_document_skipped: int = 0
    auto_memory_noisy_session_skipped: int = 0
    details: list[dict[str, object]] = field(default_factory=list)

    def detail_summary(self) -> dict[str, object]:
        """Summarise ``details`` as counts per action, source type and reason.

        Entries without an ``action`` or ``source_type`` are counted under
        ``"unknown"``. The reason of an entry is its ``reason`` value, falling
        back to ``error``; entries with neither are left out of the reason
        tally. Only the ten most frequent reasons are returned.
        """
        entries = self.details
        actions = Counter(str(item.get("action") or "unknown") for item in entries)
        source_types = Counter(
            str(item.get("source_type") or "unknown") for item in entries
        )
        reasons = Counter(
            str(why)
            for why in (item.get("reason") or item.get("error") for item in entries)
            if why
        )

        top_reasons = [
            {"reason": text, "count": hits} for text, hits in reasons.most_common(10)
        ]
        return {
            "total": len(entries),
            "by_action": dict(sorted(actions.items())),
            "by_source_type": dict(sorted(source_types.items())),
            "top_reasons": top_reasons,
        }

    def to_dict(self, *, detail_limit: int | None = None) -> dict[str, object]:
        """Return the report as a plain dictionary.

        Args:
            detail_limit: When it is a non-negative integer, at most that many
                detail entries are included and the limit is echoed back under
                ``"detail_limit"``. ``None`` or a negative value includes every
                detail entry and omits the ``"detail_limit"`` key.
        """
        limited = detail_limit is not None and detail_limit >= 0
        shown = self.details[:detail_limit] if limited else self.details
        truncated = limited and len(self.details) > detail_limit

        payload: dict[str, object] = {
            "new": self.new,
            "modified": self.modified,
            "unchanged": self.unchanged,
            "skipped": self.skipped,
            "missing": self.missing,
            "errors": self.errors,
            "indexed_records": self.indexed_records,
            "catalog_entries": self.catalog_entries,
            "refresh_skipped": self.refresh_skipped,
            "auto_memory_candidates": self.auto_memory_candidates,
            "auto_memory_created": self.auto_memory_created,
            "auto_memory_duplicates": self.auto_memory_duplicates,
            "auto_memory_duplicate_active": self.auto_memory_duplicate_active,
            "auto_memory_duplicate_unverified": self.auto_memory_duplicate_unverified,
            "auto_memory_duplicate_rejected": self.auto_memory_duplicate_rejected,
            "auto_memory_low_confidence": self.auto_memory_low_confidence,
            "auto_memory_limit_skipped": self.auto_memory_limit_skipped,
            "auto_memory_source_limit_skipped": self.auto_memory_source_limit_skipped,
            "auto_memory_non_document_skipped": self.auto_memory_non_document_skipped,
            "auto_memory_noisy_session_skipped": self.auto_memory_noisy_session_skipped,
            "detail_summary": self.detail_summary(),
            "details": shown,
            "details_total": len(self.details),
            "details_truncated": truncated,
        }
        if limited:
            payload["detail_limit"] = detail_limit
        return payload
132@dataclass
133class _RefreshRunCache:
134 """Per-run caches to avoid repeated file reads and ledger queries."""
136 ledger_by_source_id: dict[str, SourceLedgerRecord] = field(default_factory=dict)
137 active_ledger_by_source_type: dict[str, list[SourceLedgerRecord]] = field(default_factory=dict)
138 ledger_by_path_by_source_type: dict[str, dict[str, list[SourceLedgerRecord]]] = field(
139 default_factory=dict
140 )
141 active_ledger_by_path_by_source_type: dict[str, dict[str, list[SourceLedgerRecord]]] = field(
142 default_factory=dict
143 )
144 file_hash_by_path: dict[str, str] = field(default_factory=dict)
145 file_mtime_by_path: dict[str, str] = field(default_factory=dict)
147 def ledger_entry(self, source_id: str) -> SourceLedgerRecord | None:
148 return self.ledger_by_source_id.get(source_id)
150 def active_ledger_entries(self, source_type: str) -> list[SourceLedgerRecord]:
151 return self.active_ledger_by_source_type.get(source_type, [])
153 def ledger_by_physical_path(self, source_type: str) -> dict[str, list[SourceLedgerRecord]]:
154 if source_type not in self.ledger_by_path_by_source_type:
155 grouped: dict[str, list[SourceLedgerRecord]] = {}
156 for row in self.ledger_by_source_id.values():
157 if row.source_type != source_type:
158 continue
159 physical_path = Path(_physical_path(row.source_path_or_alias))
160 grouped.setdefault(_normalized_cache_path_key(physical_path), []).append(row)
161 self.ledger_by_path_by_source_type[source_type] = grouped
162 return self.ledger_by_path_by_source_type[source_type]
164 def active_ledger_by_physical_path(self, source_type: str) -> dict[str, list[SourceLedgerRecord]]:
165 if source_type not in self.active_ledger_by_path_by_source_type:
166 grouped: dict[str, list[SourceLedgerRecord]] = {}
167 for row in self.active_ledger_entries(source_type):
168 physical_path = Path(_physical_path(row.source_path_or_alias))
169 grouped.setdefault(_normalized_cache_path_key(physical_path), []).append(row)
170 self.active_ledger_by_path_by_source_type[source_type] = grouped
171 return self.active_ledger_by_path_by_source_type[source_type]
173 def file_hash(self, path: Path) -> str:
174 key = _cache_path_key(path)
175 if key not in self.file_hash_by_path:
176 self.file_hash_by_path[key] = _file_content_hash(path)
177 return self.file_hash_by_path[key]
179 def file_mtime(self, path: Path) -> str:
180 key = _cache_path_key(path)
181 if key not in self.file_mtime_by_path:
182 self.file_mtime_by_path[key] = _file_mtime_iso(path)
183 return self.file_mtime_by_path[key]
186def _cache_path_key(path: Path) -> str:
187 try:
188 return str(path.resolve())
189 except OSError:
190 return str(path)
def _normalized_cache_path_key(path: Path) -> str:
    """Return the cache key of ``path`` after ``normpath`` and ``normcase``."""
    raw_key = _cache_path_key(path)
    collapsed = os.path.normpath(raw_key)
    return os.path.normcase(collapsed)
def _load_run_cache(db_path: Path) -> _RefreshRunCache:
    """Build the per-run cache from the ledger rows stored at ``db_path``.

    Returns an empty cache when the database file does not exist. Otherwise
    every ledger row is indexed by ``source_id`` and rows whose status is
    ``"active"`` are additionally grouped by ``source_type``.
    """
    cache = _RefreshRunCache()
    if db_path.exists():
        with connect(db_path) as conn:
            rows = list_ledger_entries(conn)

            by_id: dict[str, SourceLedgerRecord] = {}
            for row in rows:
                by_id[row.source_id] = row
            cache.ledger_by_source_id = by_id

            active: dict[str, list[SourceLedgerRecord]] = {}
            for row in rows:
                if row.status != "active":
                    continue
                active.setdefault(row.source_type, []).append(row)
            cache.active_ledger_by_source_type = active
    return cache
212def _now_iso() -> str:
213 return datetime.now(timezone.utc).isoformat()
216def _file_content_hash(path: Path) -> str:
217 """SHA-256 hash of file content for ledger comparison (streaming, no RAM spike)."""
218 try:
219 h = hashlib.sha256()
220 with open(path, "rb") as f:
221 while chunk := f.read(8 * 1024 * 1024):
222 h.update(chunk)
223 return h.hexdigest()
224 except OSError:
225 return ""
228def _file_mtime_iso(path: Path) -> str:
229 try:
230 stat = path.stat()
231 return datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc).isoformat()
232 except OSError:
233 return _now_iso()
236def _is_jsonl_stable(path: Path, stability_seconds: int) -> bool:
237 """Return True if a .jsonl file has not been modified recently."""
238 if stability_seconds <= 0:
239 return True
240 try:
241 mtime = path.stat().st_mtime
242 return (_time.time() - mtime) >= stability_seconds
243 except OSError:
244 return True
247def _record_text(record: IngestionRecord) -> str:
248 """Build index text with metadata preamble, matching engine._record_text."""
249 meta: dict[str, object] = {
250 "project": record.project,
251 "source_type": record.source_type,
252 "source_tool": record.source_tool,
253 "source_path": record.source_path,
254 "session_id": record.session_id,
255 "created_at": record.created_at,
256 "last_modified": record.last_modified,
257 "privacy_scope": record.privacy_scope,
258 **record.metadata,
259 }
260 meta = {k: v for k, v in meta.items() if v not in (None, "")}
261 preamble = json.dumps(meta, ensure_ascii=False, sort_keys=True)
262 return f"TRUENEX_INGESTION_METADATA {preamble}\n\n{record.text}"
def _index_record(record: IngestionRecord, repository: MemoryRepository) -> int:
    """Index a single ingestion record. Returns chunk count or 0 if empty.

    The record text (with metadata preamble) is chunked using the size limit
    configured for its source type, written to a temporary ``.txt`` file and
    handed to ``repository.upsert_document``. The temporary file is removed
    afterwards whether or not writing or the upsert succeeded.

    Raises:
        Whatever writing the temporary file or ``repository.upsert_document``
        raises; callers are expected to handle indexing failures.
    """
    indexed_text = _record_text(record)
    max_chars = MAX_CHARS_BY_SOURCE_TYPE.get(record.source_type, DEFAULT_MAX_CHARS)
    chunks = chunk_text(indexed_text, max_chars=max_chars)
    if not chunks:
        return 0

    # Agent sessions produce N exchange records from the same file — each needs
    # a distinct doc_id so exchanges don't overwrite each other in upsert_document.
    exchange_index = record.metadata.get("exchange_index") if record.metadata else None
    relative_path = (
        f"{record.source_path}::exchange_{exchange_index}"
        if exchange_index is not None
        else record.source_path
    )

    # delete=False so the file outlives the handle and can be passed by path.
    tmp = tempfile.NamedTemporaryFile(
        mode="w", suffix=".txt", encoding="utf-8", delete=False
    )
    tmp_path = Path(tmp.name)
    try:
        # Writing happens inside the try so a failed write (for example an
        # encoding error) cannot leave the temporary file behind. The handle is
        # closed before the upsert runs.
        with tmp:
            tmp.write(indexed_text)
        repository.upsert_document(
            path=tmp_path,
            relative_path=relative_path,
            chunks=chunks,
            source_type=record.source_type,
        )
    finally:
        # Best-effort cleanup: a failure to delete must not mask the outcome.
        try:
            tmp_path.unlink()
        except OSError:
            pass

    return len(chunks)
def _record_source_id(record: IngestionRecord) -> str:
    """Return the deterministic ledger source_id of a parsed record.

    When the record's metadata carries an ``exchange_index``, the index is
    appended to the source path before hashing, so every exchange parsed from
    the same file gets its own ledger entry instead of colliding on one.
    """
    metadata = record.metadata
    exchange_index = metadata.get("exchange_index") if metadata else None
    if exchange_index is None:
        path_key = record.source_path
    else:
        path_key = f"{record.source_path}::exchange_{exchange_index}"
    return source_id(record.source_type, path_key)
316def _add_detail(report: RefreshReport, detail: dict[str, object]) -> None:
317 report.details.append(detail)
320# Main refresh entry point
def refresh(
    catalog_path: Path,
    db_path: Path,
    *,
    dry_run: bool = False,
    stability_seconds: int = 120,
    embedder=None,
    vector_store=None,
) -> RefreshReport:
    """Run one incremental global refresh over the confirmed catalog entries.

    Args:
        catalog_path: Path to the confirmed source catalog JSON.
        db_path: Path to the SQLite database.
        dry_run: If True, report planned actions without mutating DB/ledger/index.
        stability_seconds: Skip recently-modified .jsonl files (agent sessions).
        embedder: Optional embedder for vector indexing.
        vector_store: Optional vector store for vector indexing.

    Returns:
        RefreshReport with counts and per-record details. A missing catalog
        file yields a report with a single error detail; a catalog without
        confirmed entries yields an otherwise empty report.
    """
    report = RefreshReport()

    # Step 1: the catalog must exist before anything else is touched.
    if not catalog_path.exists():
        report.errors += 1
        missing_catalog: dict[str, object] = {
            "source_path": str(catalog_path),
            "source_type": "catalog",
            "action": "error",
            "error": f"Catalog file not found: {catalog_path}",
        }
        _add_detail(report, missing_catalog)
        return report

    catalog = SourceCatalog.load(catalog_path)
    confirmed_entries = [
        candidate
        for candidate in catalog.entries
        if candidate.confirmation_status == "confirmed"
    ]
    report.catalog_entries = len(confirmed_entries)
    if not confirmed_entries:
        return report

    # Step 2: make sure the schema exists (never during a dry run).
    if not dry_run:
        with connect(db_path) as conn:
            initialize_schema(conn)

    repository = MemoryRepository(db_path, embedder=embedder, vector_store=vector_store)
    cache = _load_run_cache(db_path)

    # Step 3: handle the confirmed entries one by one, in catalog order.
    for catalog_entry in confirmed_entries:
        _process_catalog_entry(
            catalog_entry, report, repository, dry_run, stability_seconds, cache
        )

    return report
def _process_catalog_entry(
    entry: CatalogEntry,
    report: RefreshReport,
    repository: MemoryRepository,
    dry_run: bool,
    stability_seconds: int,
    cache: _RefreshRunCache,
) -> None:
    """Process a single confirmed catalog entry.

    Classifies the entry (not indexable, no parser, path missing) or hands it
    to the type-specific handler, updating ``report`` counters and details as
    it goes. Ledger writes are performed only when ``dry_run`` is False.

    Args:
        entry: The confirmed catalog entry to process.
        report: Report that receives counters and detail entries.
        repository: Repository used for indexing; its ``db_path`` is used for
            ledger writes.
        dry_run: When True, only the report is updated.
        stability_seconds: Passed on to record processing for the JSONL
            stability check.
        cache: Per-run cache of ledger rows and file hashes/mtimes.
    """

    # server_alias: always skip
    if entry.source_type == "server_alias":
        report.skipped += 1
        _add_detail(report, {
            "source_id": entry.id,
            "source_path": entry.path_or_alias,
            "source_type": entry.source_type,
            "action": "skipped",
            "reason": "server_alias not indexable",
        })
        if not dry_run:
            _upsert_ledger_for_catalog_entry(
                repository.db_path, entry, status="skipped",
                error_message="server_alias: no network/SSH indexing",
            )
        return

    # Resolve parser
    parser = _parser_for_entry(entry)
    if parser is None:
        report.skipped += 1
        _add_detail(report, {
            "source_id": entry.id,
            "source_path": entry.path_or_alias,
            "source_type": entry.source_type,
            "action": "skipped",
            "reason": "no parser registered for catalog source type",
        })
        if not dry_run:
            _upsert_ledger_for_catalog_entry(
                repository.db_path,
                entry,
                status="skipped",
                error_message="no parser registered for catalog source type",
            )
        return
    source_dir = _resolve_source_dir(entry)

    # Check source exists
    path = Path(entry.path_or_alias)
    if not path.exists():
        # A POSIX-style absolute path on Windows is reported as skipped rather
        # than missing, because it cannot refer to a local file there.
        if _is_nonlocal_absolute_path(entry.path_or_alias):
            report.skipped += 1
            _add_detail(report, {
                "source_id": entry.id,
                "source_path": entry.path_or_alias,
                "source_type": entry.source_type,
                "action": "skipped",
                "reason": "non-local absolute path not indexable on this platform",
            })
            if not dry_run:
                _upsert_ledger_for_catalog_entry(
                    repository.db_path,
                    entry,
                    status="skipped",
                    error_message="non-local path: no local filesystem indexing",
                )
            return

        report.missing += 1
        _add_detail(report, {
            "source_id": entry.id,
            "source_path": entry.path_or_alias,
            "source_type": entry.source_type,
            "action": "missing",
            "reason": "source path not found",
        })
        if not dry_run:
            _upsert_ledger_for_catalog_entry(
                repository.db_path, entry, status="missing",
                error_message="source path not found",
            )
        # An empty "seen" set: nothing under this entry was observed this run.
        _mark_missing_previous_records(
            entry,
            report,
            repository.db_path,
            set(),
            dry_run,
            cache,
        )
        return

    # Run parser
    # Fall back to the last path segment when the catalog has no project name,
    # and to "global-refresh" when no discovering tool was recorded.
    project_name = entry.project_name or _infer_name(entry.path_or_alias)
    source_tool = entry.discovered_from[0] if entry.discovered_from else "global-refresh"
    privacy_scope = entry.privacy_scope

    # Agent roots and project docs have dedicated per-file handlers.
    if entry.source_type == "agent_root":
        _process_agent_root_entry(
            entry,
            report,
            repository,
            dry_run,
            stability_seconds,
            cache,
            source_dir,
            project_name,
            source_tool,
            privacy_scope,
        )
        return

    if entry.source_type in {"project_root", "document"}:
        _process_project_docs_entry(
            entry,
            report,
            repository,
            dry_run,
            stability_seconds,
            cache,
            source_dir,
            project_name,
            source_tool,
            privacy_scope,
        )
        return

    # Generic path: run the parser over the whole source in one call.
    # NOTE(review): with the mapping in _parser_for_entry, every source type
    # that yields a parser is dispatched above, so this path looks unreachable
    # today — confirm before relying on it.
    try:
        records = parser(source_dir, project_name, source_tool, privacy_scope)
    except Exception as exc:
        report.errors += 1
        _add_detail(report, {
            "source_id": entry.id,
            "source_path": entry.path_or_alias,
            "source_type": entry.source_type,
            "action": "error",
            "error": f"{type(exc).__name__}: {exc}",
        })
        if not dry_run:
            _upsert_ledger_for_catalog_entry(
                repository.db_path, entry, status="error",
                error_message=f"{type(exc).__name__}: {exc}",
            )
        return

    # No early return here: with no records the loop below is a no-op and the
    # missing-records pass still runs with an empty "seen" set.
    if not records:
        report.skipped += 1
        _add_detail(report, {
            "source_id": entry.id,
            "source_path": entry.path_or_alias,
            "source_type": entry.source_type,
            "action": "skipped",
            "reason": "no indexable records",
        })
        if not dry_run:
            _upsert_ledger_for_catalog_entry(
                repository.db_path,
                entry,
                status="skipped",
                error_message="no indexable records",
            )

    seen_source_ids: set[str] = set()

    # Process each record from the parser
    for record in records:
        seen_source_ids.add(_record_source_id(record))
        _process_record(record, entry, report, repository, dry_run, stability_seconds, cache)

    # Hand over the ids seen this run so previously known records that were
    # not seen can be handled by the missing-records pass.
    _mark_missing_previous_records(
        entry,
        report,
        repository.db_path,
        seen_source_ids,
        dry_run,
        cache,
    )
558def _parser_for_entry(entry: CatalogEntry):
559 """Map catalog source_type to registered parser name."""
560 if entry.source_type == "project_root":
561 return get_parser("project_docs")
562 if entry.source_type == "document":
563 return get_parser("project_docs")
564 if entry.source_type == "agent_root":
565 return get_parser("agent_session")
566 return None
569def _parser_source_type_for_entry(entry: CatalogEntry) -> str | None:
570 """Return the ledger source_type produced by the parser for a catalog entry."""
571 if entry.source_type in {"project_root", "document"}:
572 return "project_docs"
573 if entry.source_type == "agent_root":
574 return "agent_session"
575 return None
578def _is_nonlocal_absolute_path(path_or_alias: str) -> bool:
579 """Detect POSIX absolute paths that cannot be indexed on Windows as local files."""
580 if os.name != "nt":
581 return False
582 cleaned = path_or_alias.strip()
583 if not cleaned.startswith("/") or cleaned.startswith("//"):
584 return False
585 return not Path(cleaned).exists()
588def _resolve_source_dir(entry: CatalogEntry) -> Path:
589 return Path(entry.path_or_alias)
592def _infer_name(path_or_alias: str) -> str:
593 cleaned = path_or_alias.strip().replace("\\", "/").rstrip("/")
594 if not cleaned:
595 return "unknown"
596 return cleaned.rsplit("/", 1)[-1] or "unknown"
def _process_agent_root_entry(
    entry: CatalogEntry,
    report: RefreshReport,
    repository: MemoryRepository,
    dry_run: bool,
    stability_seconds: int,
    cache: _RefreshRunCache,
    source_dir: Path,
    project_name: str,
    source_tool: str,
    privacy_scope: str,
) -> None:
    """Refresh an ``agent_root`` entry one session file at a time.

    Each session file under ``source_dir`` is first compared against the
    cached ledger rows for that file: by modification time, then by content
    hash. Only files that fail both comparisons are parsed, and each parsed
    record is then handled by ``_process_record``.

    Args:
        entry: The catalog entry being refreshed.
        report: Report that receives counters and detail entries.
        repository: Repository used for indexing and its ``db_path``.
        dry_run: When True, no ledger writes are issued from this function.
        stability_seconds: Passed on to ``_process_record``.
        cache: Per-run cache of ledger rows and file hashes/mtimes.
        source_dir: File or directory that holds the session files.
        project_name: Project name passed to the parser.
        source_tool: Source tool passed to the parser.
        privacy_scope: Privacy scope passed to the parser.
    """
    parser = get_parser("agent_session")
    if parser is None:
        report.skipped += 1
        _add_detail(report, {
            "source_id": entry.id,
            "source_path": entry.path_or_alias,
            "source_type": entry.source_type,
            "action": "skipped",
            "reason": "no parser registered for catalog source type",
        })
        return

    session_files = _iter_agent_session_files(source_dir)
    if not session_files:
        report.skipped += 1
        _add_detail(report, {
            "source_id": entry.id,
            "source_path": entry.path_or_alias,
            "source_type": entry.source_type,
            "action": "skipped",
            "reason": "no indexable records",
        })
        if not dry_run:
            _upsert_ledger_for_catalog_entry(
                repository.db_path,
                entry,
                status="skipped",
                error_message="no indexable records",
            )
        # An empty "seen" set: no session file was observed this run.
        _mark_missing_previous_records(
            entry,
            report,
            repository.db_path,
            set(),
            dry_run,
            cache,
        )
        return

    # Ledger rows grouped by normalized file path: active rows only, and rows
    # of any status.
    active_by_path = cache.active_ledger_by_physical_path("agent_session")
    ledger_by_path = cache.ledger_by_physical_path("agent_session")
    seen_source_ids: set[str] = set()
    processed_records = 0
    parse_errors = 0

    for session_file in session_files:
        mtime = cache.file_mtime(session_file)
        path_key = _normalized_cache_path_key(session_file)
        previous = active_by_path.get(path_key, [])
        all_previous = ledger_by_path.get(path_key, [])
        # The shortcuts below apply only when this file has ledger rows and
        # every one of them is active.
        can_skip_full_parse = _all_previous_records_are_active(all_previous, previous)
        # Shortcut 1: recorded mtime matches, so the file is not even hashed.
        if can_skip_full_parse and _active_records_match_mtime(previous, mtime):
            report.unchanged += len(previous)
            processed_records += len(previous)
            seen_source_ids.update(row.source_id for row in previous)
            _add_detail(report, {
                "source_path": str(session_file.resolve()),
                "source_type": "agent_session",
                "action": "unchanged",
                "records": len(previous),
                "reason": "unchanged session file skipped before hashing",
            })
            continue

        # Shortcut 2: mtime differs but the content hash matches every active
        # row, so only the recorded mtime is refreshed and parsing is skipped.
        if can_skip_full_parse:
            file_hash = cache.file_hash(session_file)
            if all(row.content_hash == file_hash for row in previous):
                report.unchanged += len(previous)
                processed_records += len(previous)
                seen_source_ids.update(row.source_id for row in previous)
                _refresh_unchanged_ledger_mtime(
                    repository.db_path,
                    previous,
                    mtime,
                    dry_run,
                )
                _add_detail(report, {
                    "source_path": str(session_file.resolve()),
                    "source_type": "agent_session",
                    "action": "unchanged",
                    "records": len(previous),
                    "reason": "unchanged session file skipped before parsing",
                })
                continue

        # Full parse. A parser failure is reported for this file only; the
        # remaining session files are still processed.
        try:
            records = parser(session_file, project_name, source_tool, privacy_scope)
        except Exception as exc:
            report.errors += 1
            parse_errors += 1
            _add_detail(report, {
                "source_id": entry.id,
                "source_path": str(session_file.resolve()),
                "source_type": "agent_session",
                "action": "error",
                "error": f"{type(exc).__name__}: {exc}",
            })
            continue

        if not records:
            report.skipped += 1
            _add_detail(report, {
                "source_path": str(session_file.resolve()),
                "source_type": "agent_session",
                "action": "skipped",
                "reason": "no indexable records",
            })
            continue

        for record in records:
            processed_records += 1
            seen_source_ids.add(_record_source_id(record))
            _process_record(record, entry, report, repository, dry_run, stability_seconds, cache)

    # Nothing was processed and nothing failed: record the entry itself as
    # skipped in the ledger (the per-file details were already reported).
    if processed_records == 0 and parse_errors == 0 and not dry_run:
        _upsert_ledger_for_catalog_entry(
            repository.db_path,
            entry,
            status="skipped",
            error_message="no indexable records",
        )

    # Hand over the ids seen this run to the missing-records pass.
    _mark_missing_previous_records(
        entry,
        report,
        repository.db_path,
        seen_source_ids,
        dry_run,
        cache,
    )
def _iter_agent_session_files(source_dir: Path) -> list[Path]:
    """List the agent session files found at ``source_dir``.

    A single file is returned on its own when its suffix is supported. A
    directory is searched recursively and the matching files are returned
    sorted.
    """
    root = source_dir.resolve()

    def _is_session_file(candidate: Path) -> bool:
        return candidate.suffix.lower() in SUPPORTED_AGENT_SESSION_EXTENSIONS

    if root.is_file():
        return [root] if _is_session_file(root) else []

    matches = [
        candidate
        for candidate in root.rglob("*")
        if candidate.is_file() and _is_session_file(candidate)
    ]
    matches.sort()
    return matches
def _process_project_docs_entry(
    entry: CatalogEntry,
    report: RefreshReport,
    repository: MemoryRepository,
    dry_run: bool,
    stability_seconds: int,
    cache: _RefreshRunCache,
    source_dir: Path,
    project_name: str,
    source_tool: str,
    privacy_scope: str,
) -> None:
    """Refresh a ``project_root`` or ``document`` entry one file at a time.

    Each candidate document is compared against its single active ledger row,
    first by modification time and then by content hash. Only files that fail
    both comparisons are parsed, and each parsed record is then handled by
    ``_process_record``.

    Args:
        entry: The catalog entry being refreshed.
        report: Report that receives counters and detail entries.
        repository: Repository used for indexing and its ``db_path``.
        dry_run: When True, no ledger writes are issued from this function.
        stability_seconds: Passed on to ``_process_record``.
        cache: Per-run cache of ledger rows and file hashes/mtimes.
        source_dir: File or directory that holds the documents.
        project_name: Project name passed to the parser.
        source_tool: Source tool passed to the parser.
        privacy_scope: Privacy scope passed to the parser.
    """
    parser = get_parser("project_docs")
    if parser is None:
        report.skipped += 1
        _add_detail(report, {
            "source_id": entry.id,
            "source_path": entry.path_or_alias,
            "source_type": entry.source_type,
            "action": "skipped",
            "reason": "no parser registered for catalog source type",
        })
        return

    candidates = _iter_project_doc_files(source_dir)
    if not candidates:
        report.skipped += 1
        _add_detail(report, {
            "source_id": entry.id,
            "source_path": entry.path_or_alias,
            "source_type": entry.source_type,
            "action": "skipped",
            "reason": "no indexable records",
        })
        if not dry_run:
            _upsert_ledger_for_catalog_entry(
                repository.db_path,
                entry,
                status="skipped",
                error_message="no indexable records",
            )
        # An empty "seen" set: no document was observed this run.
        _mark_missing_previous_records(
            entry,
            report,
            repository.db_path,
            set(),
            dry_run,
            cache,
        )
        return

    # Active ledger rows grouped by normalized file path.
    active_by_path = cache.active_ledger_by_physical_path("project_docs")
    seen_source_ids: set[str] = set()
    processed_records = 0
    parse_errors = 0

    for file_path in candidates:
        mtime = cache.file_mtime(file_path)
        previous = active_by_path.get(_normalized_cache_path_key(file_path), [])
        # Shortcut 1: exactly one active row whose recorded mtime matches, so
        # the file is not even hashed.
        if len(previous) == 1 and _active_records_match_mtime(previous, mtime):
            ledger_record = previous[0]
            report.unchanged += 1
            processed_records += 1
            seen_source_ids.add(ledger_record.source_id)
            _add_detail(report, {
                "source_id": ledger_record.source_id,
                "source_path": ledger_record.source_path_or_alias,
                "source_type": ledger_record.source_type,
                "action": "unchanged",
                "reason": "unchanged document skipped before hashing",
            })
            continue

        # Shortcut 2: mtime differs but the content hash matches, so only the
        # recorded mtime is refreshed and parsing is skipped.
        file_hash = cache.file_hash(file_path)
        if len(previous) == 1 and previous[0].content_hash == file_hash:
            ledger_record = previous[0]
            report.unchanged += 1
            processed_records += 1
            seen_source_ids.add(ledger_record.source_id)
            _refresh_unchanged_ledger_mtime(
                repository.db_path,
                [ledger_record],
                mtime,
                dry_run,
            )
            _add_detail(report, {
                "source_id": ledger_record.source_id,
                "source_path": ledger_record.source_path_or_alias,
                "source_type": ledger_record.source_type,
                "action": "unchanged",
                "reason": "unchanged document skipped before parsing",
            })
            continue

        # Full parse. A parser failure is reported for this file only; the
        # remaining candidates are still processed.
        try:
            records = parser(file_path, project_name, source_tool, privacy_scope)
        except Exception as exc:
            report.errors += 1
            parse_errors += 1
            _add_detail(report, {
                "source_id": entry.id,
                "source_path": str(file_path.resolve()),
                "source_type": "project_docs",
                "action": "error",
                "error": f"{type(exc).__name__}: {exc}",
            })
            continue

        # A file that yields no records is passed over without a detail entry.
        if not records:
            continue

        for record in records:
            processed_records += 1
            seen_source_ids.add(_record_source_id(record))
            _process_record(record, entry, report, repository, dry_run, stability_seconds, cache)

    # Nothing was processed and nothing failed: report the whole entry as
    # skipped.
    if processed_records == 0 and parse_errors == 0:
        report.skipped += 1
        _add_detail(report, {
            "source_id": entry.id,
            "source_path": entry.path_or_alias,
            "source_type": entry.source_type,
            "action": "skipped",
            "reason": "no indexable records",
        })
        if not dry_run:
            _upsert_ledger_for_catalog_entry(
                repository.db_path,
                entry,
                status="skipped",
                error_message="no indexable records",
            )

    # Hand over the ids seen this run to the missing-records pass.
    _mark_missing_previous_records(
        entry,
        report,
        repository.db_path,
        seen_source_ids,
        dry_run,
        cache,
    )
def _iter_project_doc_files(source_dir: Path) -> list[Path]:
    """List the indexable document files found at ``source_dir``.

    A file qualifies when its lower-cased suffix is in ``INDEX_EXTENSIONS``
    and its name is not in ``EXCLUDED_FILENAMES``. A single file is returned
    on its own when it qualifies; a directory is walked through the text_docs
    candidate iterator and filtered with the same rule.
    """
    from truenex_memory.ingestion.parsers.text_docs import (
        EXCLUDED_FILENAMES,
        INDEX_EXTENSIONS,
        _iter_candidate_files,
    )

    def _is_indexable(candidate: Path) -> bool:
        return (
            candidate.suffix.lower() in INDEX_EXTENSIONS
            and candidate.name not in EXCLUDED_FILENAMES
        )

    root = source_dir.resolve()
    if root.is_file():
        return [root] if _is_indexable(root) else []
    return [
        candidate
        for candidate in _iter_candidate_files(root)
        if _is_indexable(candidate)
    ]
920def _active_records_match_mtime(
921 records: list[SourceLedgerRecord],
922 last_modified_at: str,
923) -> bool:
924 return bool(records) and all(
925 row.content_hash and row.last_modified_at == last_modified_at
926 for row in records
927 )
930def _all_previous_records_are_active(
931 all_previous: list[SourceLedgerRecord],
932 active_previous: list[SourceLedgerRecord],
933) -> bool:
934 if not all_previous or len(all_previous) != len(active_previous):
935 return False
936 return all(row.status == "active" for row in all_previous)
939def _refresh_unchanged_ledger_mtime(
940 db_path: Path,
941 records: list[SourceLedgerRecord],
942 last_modified_at: str,
943 dry_run: bool,
944) -> None:
945 if dry_run or not records:
946 return
947 with connect(db_path) as conn:
948 initialize_schema(conn)
949 for row in records:
950 upsert_ledger_entry(
951 conn,
952 row.source_id,
953 row.source_path_or_alias,
954 row.source_type,
955 project_name=row.project_name,
956 parser_version=row.parser_version,
957 content_hash=row.content_hash,
958 last_modified_at=last_modified_at,
959 last_indexed_at=row.last_indexed_at,
960 status=row.status,
961 error_message=row.error_message,
962 chunk_count=row.chunk_count,
963 )
def _process_record(
    record: IngestionRecord,
    entry: CatalogEntry,
    report: RefreshReport,
    repository: MemoryRepository,
    dry_run: bool,
    stability_seconds: int,
    cache: _RefreshRunCache,
) -> None:
    """Check ledger for a single parsed record and index if new/changed.

    The outcome is one of: skipped (unstable JSONL), missing, new, unchanged,
    modified, or an indexing error delegated to ``_record_index_error``. The
    matching report counter and a detail entry are updated in every case;
    indexing and ledger writes happen only when ``dry_run`` is False.

    Args:
        record: The parsed record to check.
        entry: The catalog entry the record came from.
        report: Report that receives counters and detail entries.
        repository: Repository used for indexing and its ``db_path``.
        dry_run: When True, only the report is updated.
        stability_seconds: Minimum age of a ``.jsonl`` session file before it
            is indexed.
        cache: Per-run cache of ledger rows and file hashes/mtimes.
    """

    file_path = Path(record.source_path)
    rec_source_id = _record_source_id(record)
    # Both values come from the per-run cache, so a file shared by several
    # records is hashed and stat'ed once.
    file_hash = cache.file_hash(file_path)
    mtime = cache.file_mtime(file_path)

    # Check ledger before stability handling. If a JSONL session already has a
    # previous active version, an unstable write must leave that version active.
    existing = cache.ledger_entry(rec_source_id)

    # Stability check for .jsonl agent sessions
    if record.source_type == "agent_session" and file_path.suffix.lower() == ".jsonl":
        if not _is_jsonl_stable(file_path, stability_seconds):
            report.skipped += 1
            _add_detail(report, {
                "source_id": rec_source_id,
                "source_path": record.source_path,
                "source_type": record.source_type,
                "action": "skipped",
                "reason": "JSONL modified recently, not yet stable",
            })
            # Only write a "skipped" row when there is no active row to keep.
            if not dry_run and (existing is None or existing.status != "active"):
                _upsert_record_ledger(
                    repository.db_path, rec_source_id, record, entry,
                    status="skipped", content_hash=file_hash,
                    last_modified_at=mtime, chunk_count=0,
                    error_message="JSONL modified recently, not yet stable",
                )
            return

    if existing is None:
        # New record
        if not file_path.exists():
            report.missing += 1
            _add_detail(report, {
                "source_id": rec_source_id,
                "source_path": record.source_path,
                "source_type": record.source_type,
                "action": "missing",
            })
            if not dry_run:
                _upsert_record_ledger(
                    repository.db_path, rec_source_id, record, entry,
                    status="missing", content_hash="",
                    last_modified_at=mtime, chunk_count=0,
                    error_message="source file not found",
                )
        else:
            if not dry_run:
                try:
                    chunk_count = _index_record(record, repository)
                except Exception as exc:
                    # The failure is handed to _record_index_error and the
                    # record is not counted as new.
                    _record_index_error(
                        repository.db_path, rec_source_id, record, entry,
                        report, existing=None, content_hash=file_hash,
                        last_modified_at=mtime, error=exc,
                    )
                    return
                report.indexed_records += 1
                _upsert_record_ledger(
                    repository.db_path, rec_source_id, record, entry,
                    status="active", content_hash=file_hash,
                    last_modified_at=mtime, chunk_count=chunk_count,
                )
            # Counted as new in a dry run too, where nothing was indexed.
            report.new += 1
            _add_detail(report, {
                "source_id": rec_source_id,
                "source_path": record.source_path,
                "source_type": record.source_type,
                "action": "new",
            })
        return

    # Existing ledger record
    if existing.status == "active" and existing.content_hash == file_hash:
        # Unchanged
        report.unchanged += 1
        _add_detail(report, {
            "source_id": rec_source_id,
            "source_path": record.source_path,
            "source_type": record.source_type,
            "action": "unchanged",
        })
        return

    # Existing non-active records were never successfully indexed as an active
    # version. Once they become indexable, report them as new rather than
    # modified.
    action = "modified" if existing.status == "active" else "new"
    # The counter is incremented up front and rolled back below if the record
    # turns out to be missing or fails to index.
    if action == "new":
        report.new += 1
    else:
        report.modified += 1

    # Changed content or retry from a non-active status.
    if not file_path.exists():
        # Roll back the optimistic increment and count the record as missing.
        if action == "new":
            report.new -= 1
        else:
            report.modified -= 1
        report.missing += 1
        _add_detail(report, {
            "source_id": rec_source_id,
            "source_path": record.source_path,
            "source_type": record.source_type,
            "action": "missing",
        })
        if not dry_run:
            _upsert_record_ledger(
                repository.db_path, rec_source_id, record, entry,
                status="missing", content_hash="",
                last_modified_at=mtime, chunk_count=0,
                error_message="source file no longer exists",
            )
    else:
        if not dry_run:
            try:
                chunk_count = _index_record(record, repository)
            except Exception as exc:
                # Roll back the optimistic increment before handing the
                # failure to _record_index_error.
                if action == "new":
                    report.new -= 1
                else:
                    report.modified -= 1
                _record_index_error(
                    repository.db_path, rec_source_id, record, entry,
                    report, existing=existing, content_hash=file_hash,
                    last_modified_at=mtime, error=exc,
                )
                return
            report.indexed_records += 1
            _upsert_record_ledger(
                repository.db_path, rec_source_id, record, entry,
                status="active", content_hash=file_hash,
                last_modified_at=mtime, chunk_count=chunk_count,
            )
        # Reported in a dry run too; both hashes let the reader see what changed.
        _add_detail(report, {
            "source_id": rec_source_id,
            "source_path": record.source_path,
            "source_type": record.source_type,
            "action": action,
            "previous_hash": existing.content_hash,
            "new_hash": file_hash,
        })
def _record_index_error(
    db_path: Path,
    rec_source_id: str,
    record: IngestionRecord,
    entry: CatalogEntry,
    report: RefreshReport,
    *,
    existing: SourceLedgerRecord | None,
    content_hash: str,
    last_modified_at: str,
    error: Exception,
) -> None:
    """Account for a failed indexing attempt in the report and the ledger.

    The failure increments ``report.errors`` and adds an ``error`` detail row
    that carries the previous ledger status (``None`` when there was no row).

    When the ledger already held an ``active`` row for this source, the row is
    only passed to ``update_ledger_status`` with status ``"error"`` instead of
    being upserted, so this function does not write a new hash or chunk count
    over the previously indexed version. In every other case a full ``error``
    row with ``chunk_count=0`` is upserted.
    """
    reason = str(error)
    if not reason:
        # Exceptions raised without a message still need a readable reason.
        reason = type(error).__name__
    report.errors += 1

    previous_status = None if existing is None else existing.status
    had_active_version = previous_status == "active"

    _add_detail(report, {
        "source_id": rec_source_id,
        "source_path": record.source_path,
        "source_type": record.source_type,
        "action": "error",
        "reason": reason,
        "previous_status": previous_status,
        "preserved_previous_active": had_active_version,
    })

    if not had_active_version:
        # Nothing worth preserving: store a complete error row.
        _upsert_record_ledger(
            db_path, rec_source_id, record, entry,
            status="error", content_hash=content_hash,
            last_modified_at=last_modified_at, chunk_count=0,
            error_message=reason,
        )
        return

    with connect(db_path) as conn:
        initialize_schema(conn)
        update_ledger_status(
            conn, rec_source_id, "error", error_message=reason,
        )
def _upsert_ledger_for_catalog_entry(
    db_path: Path,
    entry: CatalogEntry,
    *,
    status: str,
    error_message: str | None = None,
) -> None:
    """Upsert one ledger row that describes a whole catalog entry.

    The row is keyed by ``entry.id`` and records the entry's path/alias,
    source type and project name. Catalog-level rows carry no content hash or
    modification time and always report zero chunks. ``last_indexed_at`` is
    set from ``_now_iso()`` only when ``status`` is ``"active"``.
    """
    with connect(db_path) as conn:
        initialize_schema(conn)
        indexed_at = _now_iso() if status == "active" else None
        upsert_ledger_entry(
            conn,
            entry.id,
            entry.path_or_alias,
            entry.source_type,
            project_name=entry.project_name,
            status=status,
            content_hash=None,
            last_modified_at=None,
            last_indexed_at=indexed_at,
            error_message=error_message,
            chunk_count=0,
        )
def _qualified_source_path(record: IngestionRecord) -> str:
    """Return the value to store in source_ledger.source_path_or_alias.

    Records whose metadata carries an ``exchange_index`` (agent_session
    exchanges) get a ``::exchange_N`` suffix so the stored path matches
    documents.path and the JOIN in _iter_candidates finds rows. All other
    records use their plain ``source_path``.
    """
    metadata = record.metadata
    if not metadata:
        return record.source_path

    index = metadata.get("exchange_index")
    if index is None:
        # Index 0 is a valid exchange, so only a missing/None value opts out.
        return record.source_path
    return f"{record.source_path}::exchange_{index}"
def _upsert_record_ledger(
    db_path: Path,
    rec_source_id: str,
    record: IngestionRecord,
    entry: CatalogEntry,
    *,
    status: str,
    content_hash: str,
    last_modified_at: str,
    chunk_count: int,
    error_message: str | None = None,
) -> None:
    """Upsert the ledger row for a single parsed record.

    The stored path comes from ``_qualified_source_path(record)``. The project
    name prefers the catalog entry's ``project_name`` and falls back to
    ``record.project``. ``last_indexed_at`` is set from ``_now_iso()`` only
    when ``status`` is ``"active"``; every other status stores ``None``.
    """
    with connect(db_path) as conn:
        initialize_schema(conn)
        ledger_path = _qualified_source_path(record)
        project = entry.project_name or record.project
        indexed_at = _now_iso() if status == "active" else None
        upsert_ledger_entry(
            conn,
            rec_source_id,
            ledger_path,
            record.source_type,
            project_name=project,
            status=status,
            content_hash=content_hash,
            last_modified_at=last_modified_at,
            last_indexed_at=indexed_at,
            error_message=error_message,
            chunk_count=chunk_count,
        )
def _mark_missing_previous_records(
    entry: CatalogEntry,
    report: RefreshReport,
    db_path: Path,
    seen_source_ids: set[str],
    dry_run: bool,
    cache: _RefreshRunCache,
) -> None:
    """Mark active ledger records missing when a previously indexed file disappears.

    Looks at the active ledger rows that belong to ``entry`` and flags every
    row that was not seen during this run (``seen_source_ids``) and whose
    physical file no longer exists. Each flagged row increments
    ``report.missing`` and adds a ``missing`` detail row.

    Unless ``dry_run`` is set, the flagged rows are then rewritten with status
    ``"missing"`` while keeping their stored hash, timestamps, parser version
    and chunk count. Nothing happens when the entry has no parser source type
    or the database file does not exist.
    """
    parser_source_type = _parser_source_type_for_entry(entry)
    if parser_source_type is None or not db_path.exists():
        return

    reason = "previously indexed source file no longer exists"

    # Several ledger rows can point at one physical file (``::exchange_N``
    # rows of the same session file), so remember each stat result instead of
    # hitting the filesystem once per row.
    exists_by_path: dict[str, bool] = {}
    vanished: list[SourceLedgerRecord] = []

    for ledger_record in _active_ledger_records_for_entry(
        entry,
        parser_source_type,
        cache,
    ):
        if ledger_record.source_id in seen_source_ids:
            continue

        physical = _physical_path(ledger_record.source_path_or_alias)
        still_there = exists_by_path.get(physical)
        if still_there is None:
            still_there = Path(physical).exists()
            exists_by_path[physical] = still_there
        if still_there:
            continue

        vanished.append(ledger_record)
        report.missing += 1
        _add_detail(report, {
            "source_id": ledger_record.source_id,
            "source_path": ledger_record.source_path_or_alias,
            "source_type": ledger_record.source_type,
            "action": "missing",
            "reason": reason,
        })

    if dry_run or not vanished:
        return

    # One connection (and one schema check) for the whole batch rather than
    # reconnecting for every missing row.
    with connect(db_path) as conn:
        initialize_schema(conn)
        for ledger_record in vanished:
            upsert_ledger_entry(
                conn,
                ledger_record.source_id,
                ledger_record.source_path_or_alias,
                ledger_record.source_type,
                project_name=ledger_record.project_name,
                parser_version=ledger_record.parser_version,
                content_hash=ledger_record.content_hash,
                last_modified_at=ledger_record.last_modified_at,
                last_indexed_at=ledger_record.last_indexed_at,
                status="missing",
                error_message=reason,
                chunk_count=ledger_record.chunk_count,
            )
def _active_ledger_records_for_entry(
    entry: CatalogEntry,
    parser_source_type: str,
    cache: _RefreshRunCache,
) -> list[SourceLedgerRecord]:
    """Return active ledger records physically under a catalog entry path.

    Rows come from ``cache.active_ledger_by_physical_path`` and are matched
    against the normalized key of ``entry.path_or_alias``:

    * ``document`` entries match only rows stored under exactly that key;
    * ``project_root`` / ``agent_root`` entries match that key and every key
      that starts with it followed by a path separator;
    * any other source type yields an empty list.
    """
    by_path = cache.active_ledger_by_physical_path(parser_source_type)
    root_key = _normalized_cache_path_key(Path(entry.path_or_alias))
    kind = entry.source_type

    if kind == "document":
        return by_path.get(root_key, [])
    if kind != "project_root" and kind != "agent_root":
        return []

    # Append the separator so "/a/b" does not also match "/a/bc/...".
    child_prefix = root_key.rstrip("\\/") + os.sep
    return [
        row
        for key, rows in by_path.items()
        if key == root_key or key.startswith(child_prefix)
        for row in rows
    ]
def _physical_path(path_or_alias: str) -> str:
    """Strip ::exchange_N virtual suffix to get the real filesystem path.

    Everything from the first ``::`` onwards is dropped; a value without
    ``::`` is returned unchanged.
    """
    head, _, _ = path_or_alias.partition("::")
    return head
def _ledger_record_belongs_to_entry(record_path: str, entry: CatalogEntry) -> bool:
    """Tell whether a ledger path falls under the given catalog entry.

    Both paths are resolved first; the ledger path has any ``::`` virtual
    suffix removed via ``_physical_path``. A ``document`` entry matches only
    the identical path, a ``project_root`` / ``agent_root`` entry matches any
    path located at or below it, and other source types never match.
    """
    candidate = Path(_physical_path(record_path)).resolve()
    root = Path(entry.path_or_alias).resolve()
    kind = entry.source_type

    if kind == "document":
        return candidate == root
    if kind not in ("project_root", "agent_root"):
        return False

    try:
        # relative_to raises ValueError when candidate is outside root.
        candidate.relative_to(root)
    except ValueError:
        return False
    return True
1321# Text formatting
def format_refresh_report(report: RefreshReport) -> str:
    """Render a RefreshReport as a multi-line, human-readable summary.

    The output always starts with the heading and the core counters, followed
    by the number of detail rows taken from ``report.detail_summary()``.
    Optional lines are added only when they have content: the per-action and
    per-source-type breakdowns, the first five top reasons, the
    "refresh skipped" marker, and the auto-memory section (shown when at
    least one auto-memory counter is non-zero).
    """
    summary = report.detail_summary()

    def _pairs(counts) -> str:
        # "key=value" items separated by single spaces, in mapping order.
        return " ".join(f"{key}={value}" for key, value in counts.items())

    lines: list[str] = ["Refresh completed", ""]
    lines.append(f" New: {report.new}")
    lines.append(f" Modified: {report.modified}")
    lines.append(f" Unchanged: {report.unchanged}")
    lines.append(f" Skipped: {report.skipped}")
    lines.append(f" Missing: {report.missing}")
    lines.append(f" Errors: {report.errors}")
    lines.append(f" Indexed records: {report.indexed_records}")
    lines.append(f" Catalog entries: {report.catalog_entries}")
    lines.append(f" Detail rows: {summary['total']}")

    action_counts = summary.get("by_action", {})
    if action_counts:
        lines.append(" Detail by action: " + _pairs(action_counts))

    type_counts = summary.get("by_source_type", {})
    if type_counts:
        lines.append(" Detail by source_type: " + _pairs(type_counts))

    reasons = summary.get("top_reasons", [])
    if reasons:
        shown = "; ".join(
            f"{item['count']}x {item['reason']}" for item in reasons[:5]
        )
        lines.append(" Top reasons: " + shown)

    if report.refresh_skipped:
        lines.append(" Refresh skipped: yes")

    has_auto_memory_activity = any((
        report.auto_memory_candidates,
        report.auto_memory_created,
        report.auto_memory_duplicates,
        report.auto_memory_duplicate_active,
        report.auto_memory_duplicate_unverified,
        report.auto_memory_duplicate_rejected,
        report.auto_memory_low_confidence,
        report.auto_memory_limit_skipped,
        report.auto_memory_source_limit_skipped,
        report.auto_memory_non_document_skipped,
        report.auto_memory_noisy_session_skipped,
    ))
    if has_auto_memory_activity:
        lines += [
            f" Auto-memory candidates: {report.auto_memory_candidates}",
            f" Auto-memory created: {report.auto_memory_created}",
            f" Auto-memory duplicates skipped: {report.auto_memory_duplicates}",
            f" Auto-memory active duplicates skipped: {report.auto_memory_duplicate_active}",
            f" Auto-memory unverified duplicates skipped: {report.auto_memory_duplicate_unverified}",
            f" Auto-memory rejected duplicates skipped: {report.auto_memory_duplicate_rejected}",
            f" Auto-memory low-confidence skipped: {report.auto_memory_low_confidence}",
            f" Auto-memory limit skipped: {report.auto_memory_limit_skipped}",
            f" Auto-memory source-limit skipped: {report.auto_memory_source_limit_skipped}",
            f" Auto-memory non-document skipped: {report.auto_memory_non_document_skipped}",
            f" Auto-memory noisy-session skipped: {report.auto_memory_noisy_session_skipped}",
        ]

    return "\n".join(lines)