Coverage for src \ truenex_memory \ ingestion \ global_refresh.py: 84%

569 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-19 10:21 +0200

1"""Incremental global refresh using confirmed source catalog + source ledger. 

2 

3Loads confirmed catalog entries, maps them to parsers, checks the ledger 

4for each parsed record, and indexes only new or changed content. 

5""" 

6 

7from __future__ import annotations 

8 

9import hashlib 

10import json 

11import os 

12import tempfile 

13import time as _time 

14from collections import Counter 

15from dataclasses import dataclass, field 

16from pathlib import Path 

17from datetime import datetime, timezone 

18 

19from truenex_memory.core.chunker import chunk_text, content_hash 

20from truenex_memory.discovery.source_catalog import ( 

21 CatalogEntry, 

22 SourceCatalog, 

23 source_id, 

24) 

25from truenex_memory.ingestion.manifest import IngestionRecord 

26from truenex_memory.ingestion.parsers import get_parser 

27from truenex_memory.store.repository import MemoryRepository 

28from truenex_memory.store.source_ledger import ( 

29 SourceLedgerRecord, 

30 list_ledger_entries, 

31 update_ledger_status, 

32 upsert_ledger_entry, 

33) 

34from truenex_memory.store.sqlite import connect, initialize_schema 

35 

# Chunk-size limits (the ``max_chars`` handed to ``chunk_text``) per ingestion
# source type; source types not listed here use DEFAULT_MAX_CHARS.
MAX_CHARS_BY_SOURCE_TYPE: dict[str, int] = dict(
    agent_session=600,
    project_docs=1200,
)
DEFAULT_MAX_CHARS = 1200
# File suffixes (compared lower-cased) that are collected as agent session logs.
SUPPORTED_AGENT_SESSION_EXTENSIONS = {".jsonl"}

42 

43 

@dataclass
class RefreshReport:
    """Outcome of one global refresh run: outcome counters plus per-source detail rows."""

    # Per-record / per-entry outcome counters.
    new: int = 0
    modified: int = 0
    unchanged: int = 0
    skipped: int = 0
    missing: int = 0
    errors: int = 0
    indexed_records: int = 0
    # Number of confirmed catalog entries considered by the run.
    catalog_entries: int = 0
    refresh_skipped: bool = False
    # Auto-memory counters (serialized as-is by to_dict).
    auto_memory_candidates: int = 0
    auto_memory_created: int = 0
    auto_memory_duplicates: int = 0
    auto_memory_duplicate_active: int = 0
    auto_memory_duplicate_unverified: int = 0
    auto_memory_duplicate_rejected: int = 0
    auto_memory_low_confidence: int = 0
    auto_memory_limit_skipped: int = 0
    auto_memory_source_limit_skipped: int = 0
    auto_memory_non_document_skipped: int = 0
    auto_memory_noisy_session_skipped: int = 0
    # One dict per reported outcome (keys such as source_id, source_path,
    # source_type, action and reason/error).
    details: list[dict[str, object]] = field(default_factory=list)

    def detail_summary(self) -> dict[str, object]:
        """Condense ``details`` into counts by action and source type plus the top 10 reasons.

        A missing or empty ``action``/``source_type`` is counted as ``"unknown"``. A
        detail's reason is its ``reason`` value, falling back to ``error``; details with
        neither do not contribute to ``top_reasons``.
        """
        actions: Counter[str] = Counter(
            str(item.get("action") or "unknown") for item in self.details
        )
        source_types: Counter[str] = Counter(
            str(item.get("source_type") or "unknown") for item in self.details
        )
        reasons: Counter[str] = Counter(
            str(why)
            for why in (item.get("reason") or item.get("error") for item in self.details)
            if why
        )
        return {
            "total": len(self.details),
            "by_action": {name: actions[name] for name in sorted(actions)},
            "by_source_type": {name: source_types[name] for name in sorted(source_types)},
            "top_reasons": [
                {"reason": text, "count": hits} for text, hits in reasons.most_common(10)
            ],
        }

    def to_dict(self, *, detail_limit: int | None = None) -> dict[str, object]:
        """Serialize the report into a plain dict.

        When ``detail_limit`` is a non-negative int only that many detail rows are
        included and the limit is echoed back under ``detail_limit``;
        ``details_total`` and ``details_truncated`` always describe the full list.
        A ``None`` or negative limit includes every row.
        """
        limited = detail_limit is not None and detail_limit >= 0
        counter_names = (
            "new",
            "modified",
            "unchanged",
            "skipped",
            "missing",
            "errors",
            "indexed_records",
            "catalog_entries",
            "refresh_skipped",
            "auto_memory_candidates",
            "auto_memory_created",
            "auto_memory_duplicates",
            "auto_memory_duplicate_active",
            "auto_memory_duplicate_unverified",
            "auto_memory_duplicate_rejected",
            "auto_memory_low_confidence",
            "auto_memory_limit_skipped",
            "auto_memory_source_limit_skipped",
            "auto_memory_non_document_skipped",
            "auto_memory_noisy_session_skipped",
        )
        payload: dict[str, object] = {name: getattr(self, name) for name in counter_names}
        payload["detail_summary"] = self.detail_summary()
        payload["details"] = self.details[:detail_limit] if limited else self.details
        payload["details_total"] = len(self.details)
        payload["details_truncated"] = limited and len(self.details) > detail_limit
        if limited:
            payload["detail_limit"] = detail_limit
        return payload

130 

131 

@dataclass
class _RefreshRunCache:
    """Per-run memo of ledger rows and file fingerprints, so each is read at most once."""

    # Every ledger row, keyed by source_id.
    ledger_by_source_id: dict[str, SourceLedgerRecord] = field(default_factory=dict)
    # Ledger rows whose status is "active", grouped by source_type.
    active_ledger_by_source_type: dict[str, list[SourceLedgerRecord]] = field(default_factory=dict)
    # Lazily built: source_type -> normalized physical path key -> ledger rows.
    ledger_by_path_by_source_type: dict[str, dict[str, list[SourceLedgerRecord]]] = field(
        default_factory=dict
    )
    # Same as above, restricted to the active rows.
    active_ledger_by_path_by_source_type: dict[str, dict[str, list[SourceLedgerRecord]]] = field(
        default_factory=dict
    )
    # File fingerprints keyed by _cache_path_key(path).
    file_hash_by_path: dict[str, str] = field(default_factory=dict)
    file_mtime_by_path: dict[str, str] = field(default_factory=dict)

    def ledger_entry(self, source_id: str) -> SourceLedgerRecord | None:
        """Return the ledger row for *source_id*, or None when it is not in the ledger."""
        return self.ledger_by_source_id.get(source_id)

    def active_ledger_entries(self, source_type: str) -> list[SourceLedgerRecord]:
        """Return the active ledger rows of *source_type* (a fresh empty list if none)."""
        return self.active_ledger_by_source_type.get(source_type, [])

    @staticmethod
    def _group_by_physical_path(
        rows: list[SourceLedgerRecord],
    ) -> dict[str, list[SourceLedgerRecord]]:
        """Bucket *rows* by the normalized key of their physical source path."""
        buckets: dict[str, list[SourceLedgerRecord]] = {}
        for row in rows:
            location = Path(_physical_path(row.source_path_or_alias))
            buckets.setdefault(_normalized_cache_path_key(location), []).append(row)
        return buckets

    def ledger_by_physical_path(self, source_type: str) -> dict[str, list[SourceLedgerRecord]]:
        """Return all ledger rows of *source_type* grouped by physical path (memoized)."""
        try:
            return self.ledger_by_path_by_source_type[source_type]
        except KeyError:
            pass
        matching = [
            row for row in self.ledger_by_source_id.values() if row.source_type == source_type
        ]
        grouped = self._group_by_physical_path(matching)
        self.ledger_by_path_by_source_type[source_type] = grouped
        return grouped

    def active_ledger_by_physical_path(
        self, source_type: str
    ) -> dict[str, list[SourceLedgerRecord]]:
        """Return the active rows of *source_type* grouped by physical path (memoized)."""
        memo = self.active_ledger_by_path_by_source_type
        if source_type not in memo:
            memo[source_type] = self._group_by_physical_path(
                self.active_ledger_entries(source_type)
            )
        return memo[source_type]

    def file_hash(self, path: Path) -> str:
        """Return the content hash of *path*, computing it only on first request."""
        key = _cache_path_key(path)
        try:
            return self.file_hash_by_path[key]
        except KeyError:
            digest = self.file_hash_by_path[key] = _file_content_hash(path)
            return digest

    def file_mtime(self, path: Path) -> str:
        """Return the ISO mtime of *path*, computing it only on first request."""
        key = _cache_path_key(path)
        try:
            return self.file_mtime_by_path[key]
        except KeyError:
            stamp = self.file_mtime_by_path[key] = _file_mtime_iso(path)
            return stamp

184 

185 

def _cache_path_key(path: Path) -> str:
    """Return a string key for *path*, resolved to an absolute path when possible.

    ``Path.resolve`` can fail: ``OSError`` covers permission/IO problems, and on Python
    versions before 3.13 a symlink loop raises ``RuntimeError`` instead. In both cases
    fall back to the unresolved path string, so a single odd filesystem entry cannot
    abort the whole refresh through a cache lookup.
    """
    try:
        return str(path.resolve())
    except (OSError, RuntimeError):
        return str(path)

191 

192 

def _normalized_cache_path_key(path: Path) -> str:
    """Return ``_cache_path_key(path)`` passed through ``os.path.normpath`` and ``normcase``.

    ``normcase`` only changes the string on case-insensitive platforms (Windows), so
    keys compare equal regardless of separator or letter-case differences there.
    """
    raw_key = _cache_path_key(path)
    collapsed = os.path.normpath(raw_key)
    return os.path.normcase(collapsed)

195 

196 

def _load_run_cache(db_path: Path) -> _RefreshRunCache:
    """Snapshot the source ledger at *db_path* into a fresh ``_RefreshRunCache``.

    When the database file does not exist yet, an empty cache is returned without
    opening a connection.
    """
    cache = _RefreshRunCache()
    if db_path.exists():
        with connect(db_path) as conn:
            rows = list_ledger_entries(conn)
            by_id = {row.source_id: row for row in rows}
            cache.ledger_by_source_id = by_id
            active: dict[str, list[SourceLedgerRecord]] = {}
            for row in rows:
                if row.status != "active":
                    continue
                active.setdefault(row.source_type, []).append(row)
            cache.active_ledger_by_source_type = active
    return cache

210 

211 

def _now_iso() -> str:
    """Return the current moment in UTC as an ISO-8601 string (with ``+00:00`` offset)."""
    current = datetime.now(tz=timezone.utc)
    return current.isoformat()

214 

215 

def _file_content_hash(path: Path) -> str:
    """Return the hex SHA-256 of the file at *path*, or ``""`` if it cannot be read.

    The file is read in 8 MiB pieces so large files are hashed without loading them
    into memory at once.
    """
    digest = hashlib.sha256()
    try:
        with open(path, "rb") as stream:
            for piece in iter(lambda: stream.read(8 * 1024 * 1024), b""):
                digest.update(piece)
    except OSError:
        return ""
    return digest.hexdigest()

226 

227 

def _file_mtime_iso(path: Path) -> str:
    """Return *path*'s modification time as a UTC ISO-8601 string.

    If the file cannot be stat'ed (or the timestamp cannot be converted), the current
    time from ``_now_iso()`` is returned instead.
    """
    try:
        modified = path.stat().st_mtime
        stamp = datetime.fromtimestamp(modified, tz=timezone.utc)
    except OSError:
        return _now_iso()
    return stamp.isoformat()

234 

235 

def _is_jsonl_stable(path: Path, stability_seconds: int) -> bool:
    """Tell whether *path* has gone at least ``stability_seconds`` without modification.

    A non-positive window disables the check, and a file that cannot be stat'ed is
    treated as stable; both cases return True.
    """
    if stability_seconds > 0:
        try:
            modified = path.stat().st_mtime
        except OSError:
            return True
        age = _time.time() - modified
        return age >= stability_seconds
    return True

245 

246 

def _record_text(record: IngestionRecord) -> str:
    """Build the text indexed for *record*: a metadata preamble line, a blank line, the body.

    Kept in line with ``engine._record_text``. The preamble is
    ``TRUENEX_INGESTION_METADATA`` followed by a key-sorted JSON object of the record's
    descriptive fields merged with ``record.metadata`` (metadata keys win on clashes).
    Fields whose value is ``None`` or ``""`` are dropped.
    """
    meta: dict[str, object] = {
        "project": record.project,
        "source_type": record.source_type,
        "source_tool": record.source_tool,
        "source_path": record.source_path,
        "session_id": record.session_id,
        "created_at": record.created_at,
        "last_modified": record.last_modified,
        "privacy_scope": record.privacy_scope,
        # metadata may be None/empty (other helpers guard with ``if record.metadata``);
        # ``**None`` would raise TypeError, so fall back to an empty mapping.
        **(record.metadata or {}),
    }
    meta = {key: value for key, value in meta.items() if value not in (None, "")}
    preamble = json.dumps(meta, ensure_ascii=False, sort_keys=True)
    return f"TRUENEX_INGESTION_METADATA {preamble}\n\n{record.text}"

263 

264 

def _index_record(record: IngestionRecord, repository: MemoryRepository) -> int:
    """Chunk one ingestion record and upsert it into *repository*.

    The indexed text is ``_record_text(record)`` split by ``chunk_text`` with the limit
    from ``MAX_CHARS_BY_SOURCE_TYPE`` for the record's source type (``DEFAULT_MAX_CHARS``
    when the type is not listed).

    Returns:
        The number of chunks upserted, or 0 when chunking yields nothing (the repository
        is not called in that case).
    """
    indexed_text = _record_text(record)
    max_chars = MAX_CHARS_BY_SOURCE_TYPE.get(record.source_type, DEFAULT_MAX_CHARS)
    chunks = chunk_text(indexed_text, max_chars=max_chars)
    if not chunks:
        return 0

    # Agent sessions produce N exchange records from the same file — each needs
    # a distinct doc_id so exchanges don't overwrite each other in upsert_document.
    exchange_index = record.metadata.get("exchange_index") if record.metadata else None
    relative_path = (
        f"{record.source_path}::exchange_{exchange_index}"
        if exchange_index is not None
        else record.source_path
    )

    # upsert_document takes a file path, so the indexed text goes through a temporary
    # file. tmp_path is bound before the write so that ``finally`` also removes the
    # file when the write itself fails (e.g. UnicodeEncodeError on lone surrogates).
    tmp_path: Path | None = None
    try:
        with tempfile.NamedTemporaryFile(
            mode="w", suffix=".txt", encoding="utf-8", delete=False
        ) as tmp:
            tmp_path = Path(tmp.name)
            tmp.write(indexed_text)
        repository.upsert_document(
            path=tmp_path,
            relative_path=relative_path,
            chunks=chunks,
            source_type=record.source_type,
        )
    finally:
        if tmp_path is not None:
            try:
                tmp_path.unlink()
            except OSError:
                pass  # best-effort cleanup; failing to delete must not mask the result

    return len(chunks)

302 

303 

def _record_source_id(record: IngestionRecord) -> str:
    """Return the deterministic ledger source_id of a parsed record.

    Records carrying an ``exchange_index`` in their metadata (agent-session exchanges)
    get ``::exchange_<n>`` appended to their source path, so the many exchanges parsed
    from one JSONL file each get their own ledger entry instead of colliding on one.
    """
    location = record.source_path
    exchange_index = (record.metadata or {}).get("exchange_index")
    if exchange_index is not None:
        location = f"{location}::exchange_{exchange_index}"
    return source_id(record.source_type, location)

314 

315 

def _add_detail(report: RefreshReport, detail: dict[str, object]) -> None:
    """Append one detail row (stored as-is, not copied) to ``report.details``."""
    report.details.extend((detail,))

318 

319 

320# Main refresh entry point 

321 

def refresh(
    catalog_path: Path,
    db_path: Path,
    *,
    dry_run: bool = False,
    stability_seconds: int = 120,
    embedder=None,
    vector_store=None,
) -> RefreshReport:
    """Incrementally refresh the index from every confirmed catalog entry.

    Args:
        catalog_path: Source catalog JSON; only entries whose ``confirmation_status`` is
            ``"confirmed"`` are processed.
        db_path: SQLite database holding the index and the source ledger.
        dry_run: Report what would happen without writing the DB, ledger or index.
            The schema is not initialized in this mode either.
        stability_seconds: Agent-session ``.jsonl`` files modified more recently than
            this are skipped as not yet stable.
        embedder: Optional embedder forwarded to ``MemoryRepository``.
        vector_store: Optional vector store forwarded to ``MemoryRepository``.

    Returns:
        A ``RefreshReport``. A missing catalog file yields one error detail and no
        further work; a catalog with no confirmed entries yields an empty report.
    """
    report = RefreshReport()

    if not catalog_path.exists():
        report.errors += 1
        _add_detail(report, {
            "source_path": str(catalog_path),
            "source_type": "catalog",
            "action": "error",
            "error": f"Catalog file not found: {catalog_path}",
        })
        return report

    catalog = SourceCatalog.load(catalog_path)
    confirmed = [
        candidate
        for candidate in catalog.entries
        if candidate.confirmation_status == "confirmed"
    ]
    report.catalog_entries = len(confirmed)
    if not confirmed:
        return report

    # Schema creation is a write, so it is skipped for dry runs.
    if not dry_run:
        with connect(db_path) as conn:
            initialize_schema(conn)

    repository = MemoryRepository(db_path, embedder=embedder, vector_store=vector_store)
    run_cache = _load_run_cache(db_path)
    for catalog_entry in confirmed:
        _process_catalog_entry(
            catalog_entry, report, repository, dry_run, stability_seconds, run_cache
        )
    return report

377 

378 

def _process_catalog_entry(
    entry: CatalogEntry,
    report: RefreshReport,
    repository: MemoryRepository,
    dry_run: bool,
    stability_seconds: int,
    cache: _RefreshRunCache,
) -> None:
    """Refresh a single confirmed catalog entry.

    Entries that cannot be indexed are short-circuited. These are server aliases,
    entries without a parser, non-local absolute paths (only detected on Windows, see
    ``_is_nonlocal_absolute_path``) and missing paths. Each adds one report detail and,
    unless ``dry_run``, a catalog-level ledger row. ``agent_root`` entries are delegated
    to ``_process_agent_root_entry`` and ``project_root``/``document`` entries to
    ``_process_project_docs_entry``. Any other parser-backed entry is parsed as a whole
    and each record goes through ``_process_record``.
    """

    def flag_entry(
        action: str,
        message: str,
        *,
        ledger_status: str,
        ledger_message: str | None = None,
        detail_key: str = "reason",
    ) -> None:
        # Shared tail of the early-exit branches: one detail row for the catalog entry,
        # then (outside dry runs) a catalog-level ledger row whose error_message
        # defaults to the same text. Callers bump the relevant counter first.
        _add_detail(report, {
            "source_id": entry.id,
            "source_path": entry.path_or_alias,
            "source_type": entry.source_type,
            "action": action,
            detail_key: message,
        })
        if not dry_run:
            _upsert_ledger_for_catalog_entry(
                repository.db_path,
                entry,
                status=ledger_status,
                error_message=message if ledger_message is None else ledger_message,
            )

    if entry.source_type == "server_alias":
        report.skipped += 1
        flag_entry(
            "skipped",
            "server_alias not indexable",
            ledger_status="skipped",
            ledger_message="server_alias: no network/SSH indexing",
        )
        return

    parser = _parser_for_entry(entry)
    if parser is None:
        report.skipped += 1
        flag_entry(
            "skipped",
            "no parser registered for catalog source type",
            ledger_status="skipped",
        )
        return
    source_dir = _resolve_source_dir(entry)

    if not Path(entry.path_or_alias).exists():
        if _is_nonlocal_absolute_path(entry.path_or_alias):
            report.skipped += 1
            flag_entry(
                "skipped",
                "non-local absolute path not indexable on this platform",
                ledger_status="skipped",
                ledger_message="non-local path: no local filesystem indexing",
            )
            return
        report.missing += 1
        flag_entry("missing", "source path not found", ledger_status="missing")
        # Nothing was seen, so every previous record of this entry is passed on.
        _mark_missing_previous_records(
            entry, report, repository.db_path, set(), dry_run, cache
        )
        return

    project_name = entry.project_name or _infer_name(entry.path_or_alias)
    source_tool = entry.discovered_from[0] if entry.discovered_from else "global-refresh"
    privacy_scope = entry.privacy_scope

    per_file_handlers = {
        "agent_root": _process_agent_root_entry,
        "project_root": _process_project_docs_entry,
        "document": _process_project_docs_entry,
    }
    handler = per_file_handlers.get(entry.source_type)
    if handler is not None:
        handler(
            entry,
            report,
            repository,
            dry_run,
            stability_seconds,
            cache,
            source_dir,
            project_name,
            source_tool,
            privacy_scope,
        )
        return

    try:
        records = parser(source_dir, project_name, source_tool, privacy_scope)
    except Exception as exc:
        report.errors += 1
        flag_entry(
            "error",
            f"{type(exc).__name__}: {exc}",
            ledger_status="error",
            detail_key="error",
        )
        return

    if not records:
        report.skipped += 1
        flag_entry("skipped", "no indexable records", ledger_status="skipped")
        # No early return: execution continues so _mark_missing_previous_records
        # below still runs (with an empty seen set).

    seen_ids: set[str] = set()
    for record in records:
        seen_ids.add(_record_source_id(record))
        _process_record(record, entry, report, repository, dry_run, stability_seconds, cache)

    _mark_missing_previous_records(
        entry, report, repository.db_path, seen_ids, dry_run, cache
    )

556 

557 

def _parser_for_entry(entry: CatalogEntry):
    """Return the registered parser for the entry's catalog source type, or None.

    ``project_root`` and ``document`` map to the ``project_docs`` parser and
    ``agent_root`` maps to ``agent_session``. Other source types get None without
    consulting the registry.
    """
    parser_name = {
        "project_root": "project_docs",
        "document": "project_docs",
        "agent_root": "agent_session",
    }.get(entry.source_type)
    if parser_name is None:
        return None
    return get_parser(parser_name)

567 

568 

def _parser_source_type_for_entry(entry: CatalogEntry) -> str | None:
    """Return the ledger source_type the entry's parser produces (None when unmapped)."""
    kind = entry.source_type
    if kind == "agent_root":
        return "agent_session"
    return "project_docs" if kind in ("project_root", "document") else None

576 

577 

def _is_nonlocal_absolute_path(path_or_alias: str) -> bool:
    """Tell whether a POSIX-style absolute path cannot be indexed as a local Windows file.

    Only meaningful on Windows (``os.name == "nt"``); elsewhere the answer is always
    False. UNC-style ``//`` prefixes are not treated as POSIX paths, and a path that
    happens to exist locally is not considered non-local.
    """
    if os.name == "nt":
        candidate = path_or_alias.strip()
        looks_posix_absolute = candidate.startswith("/") and not candidate.startswith("//")
        if looks_posix_absolute:
            return not Path(candidate).exists()
    return False

586 

587 

def _resolve_source_dir(entry: CatalogEntry) -> Path:
    """Return the entry's ``path_or_alias`` as a ``Path`` (no resolution is performed)."""
    raw_location = entry.path_or_alias
    return Path(raw_location)

590 

591 

def _infer_name(path_or_alias: str) -> str:
    """Derive a project name from the last path component; ``"unknown"`` if there is none.

    Backslashes are treated as separators and trailing separators are ignored.
    """
    normalized = path_or_alias.strip().replace("\\", "/").rstrip("/")
    last_component = normalized.rpartition("/")[2]
    return last_component or "unknown"

597 

598 

def _process_agent_root_entry(
    entry: CatalogEntry,
    report: RefreshReport,
    repository: MemoryRepository,
    dry_run: bool,
    stability_seconds: int,
    cache: _RefreshRunCache,
    source_dir: Path,
    project_name: str,
    source_tool: str,
    privacy_scope: str,
) -> None:
    """Refresh every agent-session file found under an ``agent_root`` catalog entry.

    A session file whose ledger rows are all active is counted as unchanged without
    parsing, provided the rows match the file's mtime or, failing that, its content
    hash. In the hash case the stored mtime is refreshed. Every other file is parsed,
    and each exchange record goes through ``_process_record``. Finally the set of
    source_ids seen in this run is handed to ``_mark_missing_previous_records``.

    When a file fails to parse, its previously active ledger ids are still counted as
    seen. A transiently unreadable session therefore does not make still-indexed
    records look absent. This matches the generic path in ``_process_catalog_entry``,
    which returns before marking anything missing when its parser raises.
    """
    parser = get_parser("agent_session")
    if parser is None:
        report.skipped += 1
        _add_detail(report, {
            "source_id": entry.id,
            "source_path": entry.path_or_alias,
            "source_type": entry.source_type,
            "action": "skipped",
            "reason": "no parser registered for catalog source type",
        })
        return

    session_files = _iter_agent_session_files(source_dir)
    if not session_files:
        report.skipped += 1
        _add_detail(report, {
            "source_id": entry.id,
            "source_path": entry.path_or_alias,
            "source_type": entry.source_type,
            "action": "skipped",
            "reason": "no indexable records",
        })
        if not dry_run:
            _upsert_ledger_for_catalog_entry(
                repository.db_path,
                entry,
                status="skipped",
                error_message="no indexable records",
            )
        _mark_missing_previous_records(
            entry,
            report,
            repository.db_path,
            set(),
            dry_run,
            cache,
        )
        return

    active_by_path = cache.active_ledger_by_physical_path("agent_session")
    ledger_by_path = cache.ledger_by_physical_path("agent_session")
    seen_source_ids: set[str] = set()
    processed_records = 0
    parse_errors = 0

    for session_file in session_files:
        mtime = cache.file_mtime(session_file)
        path_key = _normalized_cache_path_key(session_file)
        previous = active_by_path.get(path_key, [])
        all_previous = ledger_by_path.get(path_key, [])
        # The shortcuts below are only taken when every ledger row for this file
        # is active.
        can_skip_full_parse = _all_previous_records_are_active(all_previous, previous)
        if can_skip_full_parse and _active_records_match_mtime(previous, mtime):
            report.unchanged += len(previous)
            processed_records += len(previous)
            seen_source_ids.update(row.source_id for row in previous)
            _add_detail(report, {
                "source_path": str(session_file.resolve()),
                "source_type": "agent_session",
                "action": "unchanged",
                "records": len(previous),
                "reason": "unchanged session file skipped before hashing",
            })
            continue

        if can_skip_full_parse:
            file_hash = cache.file_hash(session_file)
            if all(row.content_hash == file_hash for row in previous):
                report.unchanged += len(previous)
                processed_records += len(previous)
                seen_source_ids.update(row.source_id for row in previous)
                # Same content under a new mtime: store the mtime so the next run
                # can take the cheaper mtime shortcut above.
                _refresh_unchanged_ledger_mtime(
                    repository.db_path,
                    previous,
                    mtime,
                    dry_run,
                )
                _add_detail(report, {
                    "source_path": str(session_file.resolve()),
                    "source_type": "agent_session",
                    "action": "unchanged",
                    "records": len(previous),
                    "reason": "unchanged session file skipped before parsing",
                })
                continue

        try:
            records = parser(session_file, project_name, source_tool, privacy_scope)
        except Exception as exc:
            report.errors += 1
            parse_errors += 1
            _add_detail(report, {
                "source_id": entry.id,
                "source_path": str(session_file.resolve()),
                "source_type": "agent_session",
                "action": "error",
                "error": f"{type(exc).__name__}: {exc}",
            })
            # Keep this file's active records out of the "missing" sweep below; a
            # parse failure says nothing about whether the file's content is gone.
            seen_source_ids.update(row.source_id for row in previous)
            continue

        if not records:
            report.skipped += 1
            _add_detail(report, {
                "source_path": str(session_file.resolve()),
                "source_type": "agent_session",
                "action": "skipped",
                "reason": "no indexable records",
            })
            continue

        for record in records:
            processed_records += 1
            seen_source_ids.add(_record_source_id(record))
            _process_record(record, entry, report, repository, dry_run, stability_seconds, cache)

    if processed_records == 0 and parse_errors == 0 and not dry_run:
        _upsert_ledger_for_catalog_entry(
            repository.db_path,
            entry,
            status="skipped",
            error_message="no indexable records",
        )

    _mark_missing_previous_records(
        entry,
        report,
        repository.db_path,
        seen_source_ids,
        dry_run,
        cache,
    )

741 

742 

def _iter_agent_session_files(source_dir: Path) -> list[Path]:
    """List agent-session files at or below *source_dir*, as resolved paths.

    A file argument is returned alone if its lower-cased suffix is in
    ``SUPPORTED_AGENT_SESSION_EXTENSIONS``. A directory is searched recursively and the
    matching files are returned sorted.
    """
    root = source_dir.resolve()
    if not root.is_file():
        found = [
            candidate
            for candidate in root.rglob("*")
            if candidate.is_file()
            and candidate.suffix.lower() in SUPPORTED_AGENT_SESSION_EXTENSIONS
        ]
        found.sort()
        return found
    return [root] if root.suffix.lower() in SUPPORTED_AGENT_SESSION_EXTENSIONS else []

753 

754 

def _process_project_docs_entry(
    entry: CatalogEntry,
    report: RefreshReport,
    repository: MemoryRepository,
    dry_run: bool,
    stability_seconds: int,
    cache: _RefreshRunCache,
    source_dir: Path,
    project_name: str,
    source_tool: str,
    privacy_scope: str,
) -> None:
    """Refresh every indexable document under a ``project_root``/``document`` entry.

    A document with exactly one active ledger row is counted as unchanged without
    parsing when that row matches the file's mtime or, failing that, its content hash.
    In the hash case the stored mtime is refreshed. Other documents are parsed and
    each record goes through ``_process_record``. The source_ids seen in this run are
    then handed to ``_mark_missing_previous_records``.

    When a document fails to parse, its previously active ledger ids still count as
    seen. A transient parse failure therefore does not make still-indexed records look
    absent. This matches the generic path in ``_process_catalog_entry``, which returns
    before marking anything missing when its parser raises.
    """
    parser = get_parser("project_docs")
    if parser is None:
        report.skipped += 1
        _add_detail(report, {
            "source_id": entry.id,
            "source_path": entry.path_or_alias,
            "source_type": entry.source_type,
            "action": "skipped",
            "reason": "no parser registered for catalog source type",
        })
        return

    candidates = _iter_project_doc_files(source_dir)
    if not candidates:
        report.skipped += 1
        _add_detail(report, {
            "source_id": entry.id,
            "source_path": entry.path_or_alias,
            "source_type": entry.source_type,
            "action": "skipped",
            "reason": "no indexable records",
        })
        if not dry_run:
            _upsert_ledger_for_catalog_entry(
                repository.db_path,
                entry,
                status="skipped",
                error_message="no indexable records",
            )
        _mark_missing_previous_records(
            entry,
            report,
            repository.db_path,
            set(),
            dry_run,
            cache,
        )
        return

    active_by_path = cache.active_ledger_by_physical_path("project_docs")
    seen_source_ids: set[str] = set()
    processed_records = 0
    parse_errors = 0

    for file_path in candidates:
        mtime = cache.file_mtime(file_path)
        previous = active_by_path.get(_normalized_cache_path_key(file_path), [])
        if len(previous) == 1 and _active_records_match_mtime(previous, mtime):
            ledger_record = previous[0]
            report.unchanged += 1
            processed_records += 1
            seen_source_ids.add(ledger_record.source_id)
            _add_detail(report, {
                "source_id": ledger_record.source_id,
                "source_path": ledger_record.source_path_or_alias,
                "source_type": ledger_record.source_type,
                "action": "unchanged",
                "reason": "unchanged document skipped before hashing",
            })
            continue

        # Hashed even for new files; the value is cached for _process_record.
        file_hash = cache.file_hash(file_path)
        if len(previous) == 1 and previous[0].content_hash == file_hash:
            ledger_record = previous[0]
            report.unchanged += 1
            processed_records += 1
            seen_source_ids.add(ledger_record.source_id)
            _refresh_unchanged_ledger_mtime(
                repository.db_path,
                [ledger_record],
                mtime,
                dry_run,
            )
            _add_detail(report, {
                "source_id": ledger_record.source_id,
                "source_path": ledger_record.source_path_or_alias,
                "source_type": ledger_record.source_type,
                "action": "unchanged",
                "reason": "unchanged document skipped before parsing",
            })
            continue

        try:
            records = parser(file_path, project_name, source_tool, privacy_scope)
        except Exception as exc:
            report.errors += 1
            parse_errors += 1
            _add_detail(report, {
                "source_id": entry.id,
                "source_path": str(file_path.resolve()),
                "source_type": "project_docs",
                "action": "error",
                "error": f"{type(exc).__name__}: {exc}",
            })
            # Keep this document's active records out of the "missing" sweep below;
            # a parse failure says nothing about whether its content is gone.
            seen_source_ids.update(row.source_id for row in previous)
            continue

        if not records:
            continue

        for record in records:
            processed_records += 1
            seen_source_ids.add(_record_source_id(record))
            _process_record(record, entry, report, repository, dry_run, stability_seconds, cache)

    if processed_records == 0 and parse_errors == 0:
        report.skipped += 1
        _add_detail(report, {
            "source_id": entry.id,
            "source_path": entry.path_or_alias,
            "source_type": entry.source_type,
            "action": "skipped",
            "reason": "no indexable records",
        })
        if not dry_run:
            _upsert_ledger_for_catalog_entry(
                repository.db_path,
                entry,
                status="skipped",
                error_message="no indexable records",
            )

    _mark_missing_previous_records(
        entry,
        report,
        repository.db_path,
        seen_source_ids,
        dry_run,
        cache,
    )

896 

897 

def _iter_project_doc_files(source_dir: Path) -> list[Path]:
    """List indexable documents at or below *source_dir*.

    A file is indexable when its lower-cased suffix is in ``INDEX_EXTENSIONS`` and its
    name is not in ``EXCLUDED_FILENAMES`` (both from the text_docs parser). A file
    argument is checked directly. A directory is walked with the parser's own
    ``_iter_candidate_files``, and the order of that helper is kept.
    """
    from truenex_memory.ingestion.parsers.text_docs import (
        EXCLUDED_FILENAMES,
        INDEX_EXTENSIONS,
        _iter_candidate_files,
    )

    def indexable(candidate: Path) -> bool:
        return (
            candidate.suffix.lower() in INDEX_EXTENSIONS
            and candidate.name not in EXCLUDED_FILENAMES
        )

    root = source_dir.resolve()
    if root.is_file():
        return [root] if indexable(root) else []
    return list(filter(indexable, _iter_candidate_files(root)))

918 

919 

def _active_records_match_mtime(
    records: list[SourceLedgerRecord],
    last_modified_at: str,
) -> bool:
    """True when *records* is non-empty and every row has a content hash and this mtime."""
    if not records:
        return False
    for row in records:
        if not row.content_hash or row.last_modified_at != last_modified_at:
            return False
    return True

928 

929 

def _all_previous_records_are_active(
    all_previous: list[SourceLedgerRecord],
    active_previous: list[SourceLedgerRecord],
) -> bool:
    """True when there are previous rows, as many as the active ones, and all are active."""
    if not all_previous:
        return False
    if len(all_previous) != len(active_previous):
        return False
    return not any(row.status != "active" for row in all_previous)

937 

938 

def _refresh_unchanged_ledger_mtime(
    db_path: Path,
    records: list[SourceLedgerRecord],
    last_modified_at: str,
    dry_run: bool,
) -> None:
    """Rewrite *records* in the ledger with only ``last_modified_at`` changed.

    Every other column is copied from the existing row. Nothing happens (no connection
    is opened) for dry runs or an empty list.
    """
    if not records or dry_run:
        return None
    with connect(db_path) as conn:
        initialize_schema(conn)
        for row in records:
            upsert_ledger_entry(
                conn,
                row.source_id,
                row.source_path_or_alias,
                row.source_type,
                last_modified_at=last_modified_at,
                project_name=row.project_name,
                parser_version=row.parser_version,
                content_hash=row.content_hash,
                last_indexed_at=row.last_indexed_at,
                status=row.status,
                error_message=row.error_message,
                chunk_count=row.chunk_count,
            )

964 

965 

def _process_record(
    record: IngestionRecord,
    entry: CatalogEntry,
    report: RefreshReport,
    repository: MemoryRepository,
    dry_run: bool,
    stability_seconds: int,
    cache: _RefreshRunCache,
) -> None:
    """Check the ledger for one parsed record and index it if new or changed.

    Exactly one outcome is recorded on *report* (counter plus detail row):

    * ``skipped``: an ``agent_session`` ``.jsonl`` file rejected by
      ``_is_jsonl_stable``. An existing active ledger row is left untouched
      so the last good version stays in place.
    * ``unchanged``: the ledger row is active and its content hash matches.
    * ``missing``: the source file does not exist.
    * ``new`` / ``modified``: the record was indexed (or, under *dry_run*,
      would be). Rows that were never active are counted as ``new``.
    * ``error``: indexing raised; handled by ``_record_index_error``.

    The ``new``/``modified`` counters are incremented only after the
    outcome is committed. A failed index or ledger write therefore never
    leaves them inflated.

    Args:
        record: Parsed record to check.
        entry: Catalog entry the record was parsed from.
        report: Aggregate report updated in place.
        repository: Target repository; its ``db_path`` holds the ledger.
        dry_run: When true, neither index nor write the ledger.
        stability_seconds: Passed to ``_is_jsonl_stable``.
        cache: Per-run cache for file hashes, mtimes and ledger lookups.
    """
    file_path = Path(record.source_path)
    rec_source_id = _record_source_id(record)
    file_hash = cache.file_hash(file_path)
    mtime = cache.file_mtime(file_path)

    # Look up the ledger before the stability check: an unstable JSONL write
    # must leave a previously active version active.
    existing = cache.ledger_entry(rec_source_id)
    base_detail = {
        "source_id": rec_source_id,
        "source_path": record.source_path,
        "source_type": record.source_type,
    }

    if (
        record.source_type == "agent_session"
        and file_path.suffix.lower() == ".jsonl"
        and not _is_jsonl_stable(file_path, stability_seconds)
    ):
        reason = "JSONL modified recently, not yet stable"
        report.skipped += 1
        _add_detail(report, {**base_detail, "action": "skipped", "reason": reason})
        if not dry_run and (existing is None or existing.status != "active"):
            _upsert_record_ledger(
                repository.db_path, rec_source_id, record, entry,
                status="skipped", content_hash=file_hash,
                last_modified_at=mtime, chunk_count=0,
                error_message=reason,
            )
        return

    if (
        existing is not None
        and existing.status == "active"
        and existing.content_hash == file_hash
    ):
        report.unchanged += 1
        _add_detail(report, {**base_detail, "action": "unchanged"})
        return

    # Rows that were never successfully indexed as an active version are
    # reported as new rather than modified once they become indexable.
    action = (
        "modified"
        if existing is not None and existing.status == "active"
        else "new"
    )

    if not file_path.exists():
        report.missing += 1
        _add_detail(report, {**base_detail, "action": "missing"})
        if not dry_run:
            _upsert_record_ledger(
                repository.db_path, rec_source_id, record, entry,
                status="missing", content_hash="",
                last_modified_at=mtime, chunk_count=0,
                error_message=(
                    "source file not found"
                    if existing is None
                    else "source file no longer exists"
                ),
            )
        return

    if not dry_run:
        try:
            chunk_count = _index_record(record, repository)
        except Exception as exc:  # recorded in ledger + report, not re-raised
            _record_index_error(
                repository.db_path, rec_source_id, record, entry,
                report, existing=existing, content_hash=file_hash,
                last_modified_at=mtime, error=exc,
            )
            return
        report.indexed_records += 1
        _upsert_record_ledger(
            repository.db_path, rec_source_id, record, entry,
            status="active", content_hash=file_hash,
            last_modified_at=mtime, chunk_count=chunk_count,
        )

    if action == "new":
        report.new += 1
    else:
        report.modified += 1
    detail = {**base_detail, "action": action}
    if existing is not None:
        detail["previous_hash"] = existing.content_hash
        detail["new_hash"] = file_hash
    _add_detail(report, detail)

1119 

1120 

def _record_index_error(
    db_path: Path,
    rec_source_id: str,
    record: IngestionRecord,
    entry: CatalogEntry,
    report: RefreshReport,
    *,
    existing: SourceLedgerRecord | None,
    content_hash: str,
    last_modified_at: str,
    error: Exception,
) -> None:
    """Record an index failure without replacing a previous active version.

    The failure always bumps ``report.errors`` and adds an ``error`` detail
    row.

    * If *existing* was active, only its status and error message are
      updated, via ``update_ledger_status``.
    * Otherwise a fresh ``error`` row is upserted with zero chunks.

    The message is ``str(error)``, falling back to the exception class name
    when that is empty.
    """
    message = str(error) or error.__class__.__name__
    previous_status = None if existing is None else existing.status
    keep_previous_active = previous_status == "active"

    report.errors += 1
    _add_detail(report, {
        "source_id": rec_source_id,
        "source_path": record.source_path,
        "source_type": record.source_type,
        "action": "error",
        "reason": message,
        "previous_status": previous_status,
        "preserved_previous_active": keep_previous_active,
    })

    if not keep_previous_active:
        _upsert_record_ledger(
            db_path, rec_source_id, record, entry,
            status="error", content_hash=content_hash,
            last_modified_at=last_modified_at, chunk_count=0,
            error_message=message,
        )
        return

    # Leave the stored hash/chunks of the active version in place.
    with connect(db_path) as conn:
        initialize_schema(conn)
        update_ledger_status(
            conn, rec_source_id, "error", error_message=message,
        )

1159 

1160 

def _upsert_ledger_for_catalog_entry(
    db_path: Path,
    entry: CatalogEntry,
    *,
    status: str,
    error_message: str | None = None,
) -> None:
    """Write a catalog-level (not file-level) row into the source ledger.

    The row is keyed by ``entry.id`` and carries no content hash, no mtime
    and zero chunks. ``last_indexed_at`` is stamped only when *status* is
    ``"active"``, matching ``_upsert_record_ledger``.
    """
    with connect(db_path) as conn:
        initialize_schema(conn)
        upsert_ledger_entry(
            conn,
            entry.id,
            entry.path_or_alias,
            entry.source_type,
            project_name=entry.project_name,
            status=status,
            content_hash=None,
            last_modified_at=None,
            last_indexed_at=_now_iso() if status == "active" else None,
            error_message=error_message,
            chunk_count=0,
        )

1184 

1185 

1186def _qualified_source_path(record: IngestionRecord) -> str: 

1187 """Return the path stored in source_ledger.source_path_or_alias. 

1188 

1189 For agent_session exchanges this must match documents.path (which uses the 

1190 ::exchange_N suffix) so that the JOIN in _iter_candidates finds rows. 

1191 """ 

1192 exchange_index = record.metadata.get("exchange_index") if record.metadata else None 

1193 if exchange_index is not None: 

1194 return f"{record.source_path}::exchange_{exchange_index}" 

1195 return record.source_path 

1196 

1197 

def _upsert_record_ledger(
    db_path: Path,
    rec_source_id: str,
    record: IngestionRecord,
    entry: CatalogEntry,
    *,
    status: str,
    content_hash: str,
    last_modified_at: str,
    chunk_count: int,
    error_message: str | None = None,
) -> None:
    """Write a file-level row for *record* into the source ledger.

    * The stored path comes from ``_qualified_source_path``, so exchanges
      keep their ``::exchange_N`` suffix.
    * The project name falls back to ``record.project`` when the catalog
      entry has none.
    * ``last_indexed_at`` is stamped only for ``"active"`` rows.
    """
    with connect(db_path) as conn:
        initialize_schema(conn)
        ledger_path = _qualified_source_path(record)
        indexed_at = _now_iso() if status == "active" else None
        upsert_ledger_entry(
            conn,
            rec_source_id,
            ledger_path,
            record.source_type,
            project_name=entry.project_name or record.project,
            status=status,
            content_hash=content_hash,
            last_modified_at=last_modified_at,
            last_indexed_at=indexed_at,
            error_message=error_message,
            chunk_count=chunk_count,
        )

1226 

1227 

def _mark_missing_previous_records(
    entry: CatalogEntry,
    report: RefreshReport,
    db_path: Path,
    seen_source_ids: set[str],
    dry_run: bool,
    cache: _RefreshRunCache,
) -> None:
    """Mark active ledger records missing when a previously indexed file disappears.

    Candidates are the active ledger rows under *entry*'s path. Rows seen
    in this run (*seen_source_ids*) and rows whose physical file still
    exists are ignored. Each remaining row is counted in ``report.missing``
    and gets a ``missing`` detail row. Unless *dry_run* is set, it is then
    re-upserted with status ``"missing"``; all of its other stored columns
    are passed through unchanged.

    Nothing happens when the entry has no parser source type or the
    database file does not exist yet.
    """
    parser_source_type = _parser_source_type_for_entry(entry)
    if parser_source_type is None or not db_path.exists():
        return

    reason = "previously indexed source file no longer exists"
    # Several exchange rows can share one physical file; stat each path once.
    exists_by_path: dict[str, bool] = {}
    vanished: list[SourceLedgerRecord] = []
    for ledger_record in _active_ledger_records_for_entry(
        entry,
        parser_source_type,
        cache,
    ):
        if ledger_record.source_id in seen_source_ids:
            continue
        physical = _physical_path(ledger_record.source_path_or_alias)
        if physical not in exists_by_path:
            exists_by_path[physical] = Path(physical).exists()
        if exists_by_path[physical]:
            continue

        vanished.append(ledger_record)
        report.missing += 1
        _add_detail(report, {
            "source_id": ledger_record.source_id,
            "source_path": ledger_record.source_path_or_alias,
            "source_type": ledger_record.source_type,
            "action": "missing",
            "reason": reason,
        })

    if dry_run or not vanished:
        return

    # One connection and schema check for the whole batch instead of per row.
    with connect(db_path) as conn:
        initialize_schema(conn)
        for ledger_record in vanished:
            upsert_ledger_entry(
                conn,
                ledger_record.source_id,
                ledger_record.source_path_or_alias,
                ledger_record.source_type,
                project_name=ledger_record.project_name,
                parser_version=ledger_record.parser_version,
                content_hash=ledger_record.content_hash,
                last_modified_at=ledger_record.last_modified_at,
                last_indexed_at=ledger_record.last_indexed_at,
                status="missing",
                error_message=reason,
                chunk_count=ledger_record.chunk_count,
            )

1276 

1277 

def _active_ledger_records_for_entry(
    entry: CatalogEntry,
    parser_source_type: str,
    cache: _RefreshRunCache,
) -> list[SourceLedgerRecord]:
    """Return active ledger records physically under a catalog entry path.

    * ``document`` entries match only their exact normalized path.
    * ``project_root`` and ``agent_root`` entries match their own path and
      every path below it.
    * Any other entry type matches nothing.
    """
    by_path = cache.active_ledger_by_physical_path(parser_source_type)
    entry_key = _normalized_cache_path_key(Path(entry.path_or_alias))
    kind = entry.source_type

    if kind == "document":
        return by_path.get(entry_key, [])

    if kind in {"project_root", "agent_root"}:
        # The trailing separator stops "/repo" from also matching "/repo-old/...".
        dir_prefix = entry_key.rstrip("\\/") + os.sep
        return [
            row
            for path_key, rows in by_path.items()
            if path_key == entry_key or path_key.startswith(dir_prefix)
            for row in rows
        ]

    return []

1299 

1300 

1301def _physical_path(path_or_alias: str) -> str: 

1302 """Strip ::exchange_N virtual suffix to get the real filesystem path.""" 

1303 sep = path_or_alias.find("::") 

1304 return path_or_alias[:sep] if sep != -1 else path_or_alias 

1305 

1306 

def _ledger_record_belongs_to_entry(record_path: str, entry: CatalogEntry) -> bool:
    """Return True when a ledger path falls under a catalog entry.

    Both paths are resolved first, and the record path loses any
    ``::exchange_N`` suffix.

    * ``document`` entries require equal paths.
    * ``project_root`` and ``agent_root`` entries accept the root itself or
      anything below it.
    * Other entry types never match.
    """
    resolved_record = Path(_physical_path(record_path)).resolve()
    resolved_source = Path(entry.path_or_alias).resolve()
    kind = entry.source_type

    if kind == "document":
        return resolved_record == resolved_source
    if kind not in {"project_root", "agent_root"}:
        return False
    try:
        resolved_record.relative_to(resolved_source)
    except ValueError:
        return False
    return True

1319 

1320 

1321# Text formatting 

1322 

def format_refresh_report(report: RefreshReport) -> str:
    """Render *report* as a multi-line, human-readable summary.

    * The headline counters are always printed.
    * The detail breakdowns (by action, by source type, and the first five
      top reasons) appear only when ``report.detail_summary()`` supplies
      them.
    * The auto-memory block appears only when at least one auto-memory
      counter is non-zero.
    """
    summary = report.detail_summary()
    out: list[str] = [
        "Refresh completed",
        "",
        f" New: {report.new}",
        f" Modified: {report.modified}",
        f" Unchanged: {report.unchanged}",
        f" Skipped: {report.skipped}",
        f" Missing: {report.missing}",
        f" Errors: {report.errors}",
        f" Indexed records: {report.indexed_records}",
        f" Catalog entries: {report.catalog_entries}",
        f" Detail rows: {summary['total']}",
    ]

    for heading, summary_key in (
        (" Detail by action: ", "by_action"),
        (" Detail by source_type: ", "by_source_type"),
    ):
        counts = summary.get(summary_key, {})
        if counts:
            out.append(
                heading + " ".join(f"{key}={value}" for key, value in counts.items())
            )

    top_reasons = summary.get("top_reasons", [])
    if top_reasons:
        out.append(
            " Top reasons: "
            + "; ".join(
                f"{item['count']}x {item['reason']}" for item in top_reasons[:5]
            )
        )

    if report.refresh_skipped:
        out.append(" Refresh skipped: yes")

    # Each auto-memory counter is listed once here; the whole block is
    # printed only if any of them is truthy.
    auto_memory_rows = (
        (" Auto-memory candidates", report.auto_memory_candidates),
        (" Auto-memory created", report.auto_memory_created),
        (" Auto-memory duplicates skipped", report.auto_memory_duplicates),
        (" Auto-memory active duplicates skipped", report.auto_memory_duplicate_active),
        (" Auto-memory unverified duplicates skipped", report.auto_memory_duplicate_unverified),
        (" Auto-memory rejected duplicates skipped", report.auto_memory_duplicate_rejected),
        (" Auto-memory low-confidence skipped", report.auto_memory_low_confidence),
        (" Auto-memory limit skipped", report.auto_memory_limit_skipped),
        (" Auto-memory source-limit skipped", report.auto_memory_source_limit_skipped),
        (" Auto-memory non-document skipped", report.auto_memory_non_document_skipped),
        (" Auto-memory noisy-session skipped", report.auto_memory_noisy_session_skipped),
    )
    if any(value for _label, value in auto_memory_rows):
        out.extend(f"{label}: {value}" for label, value in auto_memory_rows)

    return "\n".join(out)