Coverage for src \ truenex_memory \ ingestion \ global_context.py: 82%
379 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-19 10:21 +0200
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-19 10:21 +0200
"""Read-only project context command for the Truenex Memory global store.

Resolves a project from the confirmed source catalog and reads the SQLite
global DB/ledger/index without mutating anything.
"""
7from __future__ import annotations
9from dataclasses import dataclass, field
10from pathlib import Path
11import sqlite3
12import json
15# ── Report dataclass ──────────────────────────────────────────────────
@dataclass
class ProjectContextReport:
    """Outcome of one read-only project context lookup.

    The three required fields echo the inputs of the lookup. Everything
    else starts empty/unset and is filled in by the builder as it resolves
    the project and reads the database.
    """

    # Inputs, echoed back verbatim.
    project_query: str
    catalog_path: str
    db_path: str

    # Resolution outcome.
    resolved: bool = False
    resolution_method: str | None = None
    resolution_notes: str | None = None

    # Catalog entries attached to the resolved project.
    catalog_roots: list[dict[str, object]] = field(default_factory=list)
    catalog_documents: list[dict[str, object]] = field(default_factory=list)
    catalog_server_aliases: list[dict[str, object]] = field(default_factory=list)

    # Rows read from the database.
    ledger_entries: list[dict[str, object]] = field(default_factory=list)
    indexed_documents: list[dict[str, object]] = field(default_factory=list)
    indexed_chunks: list[dict[str, object]] = field(default_factory=list)
    memory_nodes: list[dict[str, object]] = field(default_factory=list)

    # Labels of the competing matches when the query was ambiguous.
    ambiguous_candidates: list[str] = field(default_factory=list)

    # Human-readable problems collected along the way.
    warnings: list[str] = field(default_factory=list)

    def to_dict(self) -> dict[str, object]:
        """Return the report as a nested plain dict.

        Catalog fields are grouped under ``"catalog"`` and the indexed
        documents/chunks under ``"indexed"``. The lists are the report's
        own list objects, not copies.
        """
        catalog_section = dict(
            roots=self.catalog_roots,
            documents=self.catalog_documents,
            server_aliases=self.catalog_server_aliases,
        )
        indexed_section = dict(
            documents=self.indexed_documents,
            chunks=self.indexed_chunks,
        )
        return dict(
            project_query=self.project_query,
            catalog_path=self.catalog_path,
            db_path=self.db_path,
            resolved=self.resolved,
            resolution_method=self.resolution_method,
            resolution_notes=self.resolution_notes,
            catalog=catalog_section,
            ledger=self.ledger_entries,
            indexed=indexed_section,
            memory_nodes=self.memory_nodes,
            ambiguous_candidates=self.ambiguous_candidates,
            warnings=self.warnings,
        )
64# ── Build function ────────────────────────────────────────────────────
def build_project_context(
    project_query: str,
    catalog_path: Path,
    db_path: Path,
    *,
    limit: int = 20,
) -> ProjectContextReport:
    """Build a read-only ProjectContextReport for *project_query*.

    Never creates directories, databases, catalog files, or ledger rows.

    Args:
        project_query: Project name, path, alias or fragment to resolve.
        catalog_path: JSON catalog file; must hold an object whose
            ``"entries"`` value is a list of entry objects.
        db_path: SQLite database to read ledger/index rows from.
        limit: Upper bound handed to the indexed-data and memory-node
            readers. Negative values are treated as 0.

    Returns:
        The report. Every failure mode (missing or invalid catalog, no
        confirmed entries, ambiguous or unknown project, unreadable
        database) is recorded in ``report.warnings`` rather than raised.
    """
    report = ProjectContextReport(
        project_query=project_query,
        catalog_path=str(catalog_path),
        db_path=str(db_path),
    )
    # A negative value means "unlimited" to SQLite's LIMIT but "drop the
    # tail" to a Python slice; clamp so both agree.
    limit = max(limit, 0)

    # 1. Check catalog exists and read it
    if not catalog_path.exists():
        report.warnings.append(f"Catalog not found: {catalog_path}")
        if not db_path.exists():
            report.warnings.append(f"Database not found: {db_path}")
        return report

    try:
        catalog_data = json.loads(catalog_path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError and also the
        # UnicodeDecodeError raised when the file is not valid UTF-8.
        report.warnings.append(f"Catalog exists but is invalid/unreadable: {catalog_path}")
        return report

    if not isinstance(catalog_data, dict):
        report.warnings.append(f"Catalog must be a JSON object: {catalog_path}")
        return report

    raw_entries = catalog_data.get("entries", [])
    if not isinstance(raw_entries, list):
        report.warnings.append("Catalog entries is not a list")
        return report

    # 2. Collect confirmed entries only
    confirmed: list[dict] = []
    for entry in raw_entries:
        if not isinstance(entry, dict):
            report.warnings.append("Catalog contains non-object entries")
            continue
        if entry.get("confirmation_status") == "confirmed":
            confirmed.append(entry)

    if not confirmed:
        report.warnings.append("No confirmed entries in catalog")
        return report

    # 3. Resolve the project query against confirmed entries
    (
        matched_roots,
        matched_docs,
        matched_servers,
        resolution_method,
        resolution_notes,
        ambiguous,
    ) = _resolve_project(project_query, confirmed)

    if ambiguous:
        report.ambiguous_candidates = ambiguous
        report.warnings.append(
            f"Ambiguous project query '{project_query}' matches {len(ambiguous)} "
            f"candidates: {', '.join(ambiguous[:10])}"
        )
        # The report stays unresolved; only the candidates are returned.
        return report

    if not matched_roots:
        report.warnings.append(
            f"Project '{project_query}' not found in confirmed catalog entries"
        )
        return report

    report.resolved = True
    report.resolution_method = resolution_method
    report.resolution_notes = resolution_notes
    report.catalog_roots = matched_roots
    report.catalog_documents = matched_docs
    report.catalog_server_aliases = matched_servers

    # 4. Read ledger and indexed data from DB (if it exists)
    if not db_path.exists():
        report.warnings.append(f"Database not found: {db_path}")
        return report

    try:
        conn = _connect_readonly(db_path)
    except Exception:
        # Deliberately broad: any failure to open is reported as a warning
        # so the catalog part of the report is still returned.
        report.warnings.append(f"Database exists but cannot be opened: {db_path}")
        return report

    try:
        _read_ledger_for_project(conn, report, matched_roots)
        _read_indexed_for_project(conn, report, matched_roots, limit)
        _read_memory_nodes_for_project(conn, report, limit)
    except sqlite3.Error:
        report.warnings.append(f"Database exists but cannot be read: {db_path}")
    finally:
        conn.close()

    return report
166# ── Internal: resolution ──────────────────────────────────────────────
168def _resolve_project(
169 query: str,
170 confirmed_entries: list[dict],
171) -> tuple[
172 list[dict[str, object]],
173 list[dict[str, object]],
174 list[dict[str, object]],
175 str | None,
176 str | None,
177 list[str],
178]:
179 """Resolve *query* against confirmed catalog entries.
181 Returns (roots, docs, servers, method, notes, ambiguous_candidates).
182 """
183 query_lower = query.strip().lower()
185 # Separate entries by source_type
186 roots = [e for e in confirmed_entries if e.get("source_type") == "project_root"]
187 docs = [e for e in confirmed_entries if e.get("source_type") == "document"]
188 servers = [e for e in confirmed_entries if e.get("source_type") == "server_alias"]
190 # Phase 1: exact case-insensitive project_name match
191 name_matches = [
192 r for r in roots
193 if r.get("project_name") and str(r["project_name"]).strip().lower() == query_lower
194 ]
195 if len(name_matches) == 1:
196 root_entry = name_matches[0]
197 project_name_val = root_entry.get("project_name")
198 related_docs = _find_related_docs(root_entry, docs)
199 related_servers = _find_related_servers(root_entry, servers)
200 return (
201 _serialize_entries(name_matches),
202 _serialize_entries(related_docs),
203 _serialize_entries(related_servers),
204 "exact_name",
205 f"matched project_name='{project_name_val}'",
206 [],
207 )
208 if len(name_matches) > 1:
209 candidates = [str(r.get("project_name", r.get("path_or_alias", "?"))) for r in name_matches]
210 return [], [], [], None, None, candidates
212 # Phase 2a: exact full path/alias match
213 path_matches = []
214 query_path = _normalize_path_for_match(query)
215 for r in roots:
216 path_or_alias = str(r.get("path_or_alias", ""))
217 normalized_path = _normalize_path_for_match(path_or_alias)
218 if normalized_path and normalized_path == query_path:
219 path_matches.append(r)
221 if len(path_matches) == 1:
222 root_entry = path_matches[0]
223 related_docs = _find_related_docs(root_entry, docs)
224 related_servers = _find_related_servers(root_entry, servers)
225 return (
226 _serialize_entries(path_matches),
227 _serialize_entries(related_docs),
228 _serialize_entries(related_servers),
229 "path_alias",
230 f"matched path_or_alias='{root_entry.get('path_or_alias', '')}'",
231 [],
232 )
233 if len(path_matches) > 1:
234 candidates = [str(r.get("path_or_alias", "?")) for r in path_matches]
235 return [], [], [], None, None, candidates
237 # Phase 2b: exact basename match
238 basename_matches = []
239 for r in roots:
240 path_or_alias = str(r.get("path_or_alias", ""))
241 basename = _normalize_basename(path_or_alias)
242 if basename and basename.lower() == query_lower:
243 basename_matches.append(r)
245 if len(basename_matches) == 1:
246 root_entry = basename_matches[0]
247 related_docs = _find_related_docs(root_entry, docs)
248 related_servers = _find_related_servers(root_entry, servers)
249 return (
250 _serialize_entries(basename_matches),
251 _serialize_entries(related_docs),
252 _serialize_entries(related_servers),
253 "basename",
254 f"matched path basename='{_normalize_basename(str(root_entry.get('path_or_alias', '')))}'",
255 [],
256 )
257 if len(basename_matches) > 1:
258 candidates = [str(r.get("path_or_alias", "?")) for r in basename_matches]
259 return [], [], [], None, None, candidates
261 # Phase 3: substring fallback on project_name or path_or_alias
262 substring_matches = []
263 for r in roots:
264 project_name = str(r.get("project_name", "")).strip().lower()
265 path_or_alias = str(r.get("path_or_alias", "")).strip().lower()
266 if query_lower in project_name or query_lower in path_or_alias:
267 substring_matches.append(r)
269 if len(substring_matches) == 1:
270 root_entry = substring_matches[0]
271 related_docs = _find_related_docs(root_entry, docs)
272 related_servers = _find_related_servers(root_entry, servers)
273 return (
274 _serialize_entries(substring_matches),
275 _serialize_entries(related_docs),
276 _serialize_entries(related_servers),
277 "substring",
278 f"substring match on '{query}'",
279 [],
280 )
281 if len(substring_matches) > 1:
282 candidates = [str(r.get("path_or_alias", "?")) for r in substring_matches]
283 return [], [], [], None, None, candidates
285 # No match at all
286 return [], [], [], None, None, []
289def _normalize_basename(path_or_alias: str) -> str:
290 """Extract the basename from a path or alias."""
291 cleaned = path_or_alias.strip().replace("\\", "/").rstrip("/")
292 if not cleaned:
293 return ""
294 return cleaned.rsplit("/", 1)[-1]
297def _normalize_path_for_match(path_or_alias: str) -> str:
298 return path_or_alias.strip().replace("\\", "/").rstrip("/").lower()
def _path_equal_or_inside(child: str, parent: str) -> bool:
    """Tell whether *child* is *parent* itself or lies beneath it.

    Both paths are compared in normalized form. If either normalizes to
    the empty string the answer is ``False``.
    """
    inner = _normalize_path_for_match(child)
    outer = _normalize_path_for_match(parent)
    if inner and outer:
        # The "/" suffix stops "/a/bc" from counting as inside "/a/b".
        return inner == outer or inner.startswith(f"{outer}/")
    return False
def _find_related_docs(root_entry: dict, docs: list[dict]) -> list[dict]:
    """Find document entries related to a project root.

    A document is related when its ``path_or_alias`` equals or lies inside
    the root's path, or when the root's ``id`` appears (after stripping
    each item) in the document's ``discovered_from`` list.

    Missing or null fields are treated as empty: a root without an id never
    matches through ``discovered_from``, and a null ``discovered_from`` is
    an empty list instead of raising ``TypeError``.
    """

    def as_items(value: object) -> tuple:
        # JSON may deliver null, a bare string, or a list here.
        if value is None:
            return ()
        if isinstance(value, str):
            return (value,)
        try:
            return tuple(value)  # type: ignore[arg-type]
        except TypeError:
            return ()

    root_path = str(root_entry.get("path_or_alias") or "")
    root_id = str(root_entry.get("id") or "")

    related: list[dict] = []
    for doc in docs:
        doc_path = str(doc.get("path_or_alias") or "")
        if _path_equal_or_inside(doc_path, root_path):
            related.append(doc)
            continue
        if not root_id:
            # An empty id would otherwise match documents listing "".
            continue
        origins = {str(item).strip() for item in as_items(doc.get("discovered_from"))}
        if root_id in origins:
            related.append(doc)
    return related
324def _find_related_servers(root_entry: dict, servers: list[dict]) -> list[dict]:
325 """Find server_alias entries related to a project root by project_name or discovered_from."""
326 project_name = str(root_entry.get("project_name", "")).strip().lower()
327 root_path = str(root_entry.get("path_or_alias", "")).lower()
328 related: list[dict] = []
329 for srv in servers:
330 discovered = srv.get("discovered_from", [])
331 discovered_str = " ".join(str(d) for d in discovered).lower()
332 if project_name and project_name in discovered_str:
333 related.append(srv)
334 elif root_path and root_path in discovered_str:
335 related.append(srv)
336 return related
339def _serialize_entries(entries: list[dict]) -> list[dict[str, object]]:
340 """Serialize catalog entries to stable dicts with citation fields."""
341 result: list[dict[str, object]] = []
342 for e in entries:
343 result.append({
344 "id": e.get("id", ""),
345 "source_type": e.get("source_type", ""),
346 "path_or_alias": e.get("path_or_alias", ""),
347 "project_name": e.get("project_name"),
348 "discovered_from": e.get("discovered_from", []),
349 "confirmation_status": e.get("confirmation_status", ""),
350 })
351 return result
354# ── Internal: DB readers ──────────────────────────────────────────────
356def _connect_readonly(db_path: Path) -> sqlite3.Connection:
357 """Open a read-only SQLite connection. Does NOT create the file or
358 parent directories."""
359 uri_path = db_path.resolve().as_posix()
360 conn = sqlite3.connect(f"file:{uri_path}?mode=ro", uri=True)
361 conn.row_factory = sqlite3.Row
362 return conn
365def _table_exists(conn: sqlite3.Connection, table_name: str) -> bool:
366 row = conn.execute(
367 "SELECT name FROM sqlite_master WHERE type='table' AND name = ?",
368 (table_name,),
369 ).fetchone()
370 return row is not None
def _read_ledger_for_project(
    conn: sqlite3.Connection,
    report: ProjectContextReport,
    matched_roots: list[dict[str, object]],
) -> None:
    """Fill ``report.ledger_entries`` with the project's ledger rows.

    All ``source_ledger`` rows are read, newest ``updated_at`` first, and a
    row is kept when any of these holds:

    * its ``source_id`` is the id of a matched root;
    * its ``project_name`` equals a matched root's name, ignoring case;
    * its ``source_path_or_alias`` equals or lies inside a matched root path.

    When the table is missing a warning is recorded and nothing else
    happens.
    """
    if not _table_exists(conn, "source_ledger"):
        report.warnings.append("source_ledger table not found in database")
        return

    # Lookup keys derived from the matched roots; falsy values are skipped.
    wanted_ids: set[str] = set()
    wanted_names: set[str] = set()
    wanted_paths: set[str] = set()
    for root in matched_roots:
        if root.get("id"):
            wanted_ids.add(str(root["id"]))
        if root.get("project_name"):
            wanted_names.add(str(root["project_name"]).strip().lower())
        if root.get("path_or_alias"):
            wanted_paths.add(str(root["path_or_alias"]).strip().lower())

    ledger_rows = conn.execute(
        "SELECT * FROM source_ledger ORDER BY updated_at DESC"
    ).fetchall()

    selected: list[dict[str, object]] = []
    for ledger_row in ledger_rows:
        source_id = str(ledger_row["source_id"] or "")
        name = str(ledger_row["project_name"] or "").strip().lower()
        location = str(ledger_row["source_path_or_alias"] or "").strip()

        keep = (
            source_id in wanted_ids
            or bool(name and name in wanted_names)
            or any(_path_equal_or_inside(location, base) for base in wanted_paths)
        )
        if keep:
            selected.append(_ledger_row_to_dict(ledger_row))

    report.ledger_entries = selected
428def _ledger_row_to_dict(row: sqlite3.Row) -> dict[str, object]:
429 return {
430 "source_id": row["source_id"],
431 "source_path_or_alias": row["source_path_or_alias"],
432 "project_name": row["project_name"],
433 "source_type": row["source_type"],
434 "parser_version": row["parser_version"],
435 "content_hash": row["content_hash"],
436 "last_modified_at": row["last_modified_at"],
437 "last_indexed_at": row["last_indexed_at"],
438 "status": row["status"],
439 "error_message": row["error_message"],
440 "chunk_count": row["chunk_count"],
441 "created_at": row["created_at"],
442 "updated_at": row["updated_at"],
443 }
def _doc_ids_from_ledger(
    conn: sqlite3.Connection,
    root_project_names: set[str],
    root_ids: set[str],
) -> set[str]:
    """Return ids of documents whose path matches a project ledger entry.

    A document qualifies when an ``active`` ``source_ledger`` row has
    ``source_path_or_alias`` equal to the document's ``path`` and that row
    belongs to the project, either by lower-cased ``project_name`` or by
    ``source_id``. The result is empty when either table is missing or when
    both filter sets are empty.
    """
    if not (_table_exists(conn, "source_ledger") and _table_exists(conn, "documents")):
        return set()

    conditions: list[str] = []
    bound: list[object] = []
    filters = (
        ("LOWER(sl.project_name) IN ({})", root_project_names),
        ("sl.source_id IN ({})", root_ids),
    )
    for template, values in filters:
        if not values:
            continue
        ordered = list(values)
        # One "?" per value keeps the statement fully parameterized.
        conditions.append(template.format(",".join("?" for _ in ordered)))
        bound.extend(ordered)

    if not conditions:
        return set()

    where = " OR ".join(conditions)
    query = f"""
 SELECT DISTINCT d.id
 FROM documents d
 JOIN source_ledger sl ON sl.source_path_or_alias = d.path
 WHERE sl.status = 'active'
 AND ({where})
 """
    return {str(found[0]) for found in conn.execute(query, bound).fetchall()}
def _read_indexed_for_project(
    conn: sqlite3.Connection,
    report: ProjectContextReport,
    matched_roots: list[dict[str, object]],
    limit: int,
) -> None:
    """Fill ``report.indexed_documents`` and ``report.indexed_chunks``.

    Documents are selected in two passes:

    1. rows of ``documents`` (newest ``updated_at`` first) whose
       ``project_id`` is a root's project name, path or path basename, or
       whose ``path`` equals or lies inside a root path;
    2. documents reachable through the source ledger that pass 1 did not
       already pick up.

    At most *limit* documents are reported, and at most *limit* chunks of
    those documents, ordered by ``chunk_index``. A negative *limit* is
    treated as 0. When the ``documents`` table is missing a warning is
    recorded instead.
    """
    if not _table_exists(conn, "documents"):
        report.warnings.append("documents table not found in database")
        return

    # SQLite reads a negative LIMIT as "no limit" while a negative slice
    # bound drops the tail; clamp so both agree.
    limit = max(limit, 0)

    # Build project_id candidates from matched roots. ``or ""`` keeps a JSON
    # null from turning into the literal candidate "None".
    project_ids: set[str] = set()
    root_project_names: set[str] = set()
    root_ids: set[str] = set()
    root_paths: list[str] = []
    for root in matched_roots:
        name = str(root.get("project_name") or "").strip()
        if name:
            project_ids.add(name)
            root_project_names.add(name.lower())
        root_id = str(root.get("id") or "").strip()
        if root_id:
            root_ids.add(root_id)
        path_alias = (
            str(root.get("path_or_alias") or "").strip().replace("\\", "/").rstrip("/")
        )
        if path_alias:
            project_ids.add(path_alias)
            root_paths.append(path_alias)
            basename = _normalize_basename(path_alias)
            if basename:
                project_ids.add(basename)

    all_docs = conn.execute(
        "SELECT * FROM documents ORDER BY updated_at DESC"
    ).fetchall()

    # Pass 1: project_id / path-prefix match.
    seen_ids: set[str] = set()
    matching_docs: list[dict[str, object]] = []
    for row in all_docs:
        doc_path = str(row["path"] or "")
        if str(row["project_id"] or "") in project_ids or any(
            _path_equal_or_inside(doc_path, root_path) for root_path in root_paths
        ):
            matching_docs.append(_doc_row_to_dict(row))
            seen_ids.add(str(row["id"] or ""))

    # Pass 2: ledger-based JOIN picks up session docs not under the repo root.
    extra_ids = _doc_ids_from_ledger(conn, root_project_names, root_ids) - seen_ids
    if extra_ids:
        id_placeholders = ",".join("?" for _ in extra_ids)
        extra_rows = conn.execute(
            f"SELECT * FROM documents WHERE id IN ({id_placeholders})"
            f" ORDER BY updated_at DESC",
            list(extra_ids),
        ).fetchall()
        for row in extra_rows:
            matching_docs.append(_doc_row_to_dict(row))
            seen_ids.add(str(row["id"]))

    report.indexed_documents = matching_docs[:limit]

    # Read chunks for the reported documents only (respect limit).
    doc_ids = [doc["id"] for doc in report.indexed_documents]
    if doc_ids and _table_exists(conn, "chunks"):
        placeholders = ",".join("?" for _ in doc_ids)
        chunk_rows = conn.execute(
            f"SELECT * FROM chunks WHERE document_id IN ({placeholders}) "
            f"ORDER BY chunk_index LIMIT ?",
            (*doc_ids, limit),
        ).fetchall()
        report.indexed_chunks = [
            _chunk_row_to_dict(chunk_row, limit_chars=400)
            for chunk_row in chunk_rows
        ]
565def _doc_row_to_dict(row: sqlite3.Row) -> dict[str, object]:
566 return {
567 "id": row["id"],
568 "project_id": row["project_id"],
569 "path": row["path"],
570 "filename": row["filename"],
571 "content_hash": row["content_hash"],
572 "last_indexed_at": row["last_indexed_at"],
573 "created_at": row["created_at"],
574 "updated_at": row["updated_at"],
575 }
578def _strip_ingestion_metadata(text: str) -> str:
579 if text.startswith("TRUENEX_INGESTION_METADATA "):
580 parts = text.split("\n\n", 2)
581 return parts[-1].strip() if len(parts) >= 2 else ""
582 return text
def _chunk_row_to_dict(row: sqlite3.Row, limit_chars: int = 400) -> dict[str, object]:
    """Convert a ``chunks`` row into a dict carrying a bounded excerpt.

    The chunk content first has any ingestion-metadata header removed. If
    the remainder is longer than *limit_chars* it is cut to that length,
    ``"..."`` is appended and ``truncated`` is set to ``True``.
    """
    excerpt = _strip_ingestion_metadata(str(row["content"] or ""))
    was_cut = len(excerpt) > limit_chars
    if was_cut:
        excerpt = f"{excerpt[:limit_chars]}..."
    return {
        "id": row["id"],
        "document_id": row["document_id"],
        "chunk_index": row["chunk_index"],
        "heading_path": row["heading_path"],
        "content_excerpt": excerpt,
        "content_hash": row["content_hash"],
        "token_count": row["token_count"],
        "truncated": was_cut,
        "created_at": row["created_at"],
    }
def _read_memory_nodes_for_project(
    conn: sqlite3.Connection,
    report: ProjectContextReport,
    limit: int,
    *,
    project_id: str = "default",
) -> None:
    """Fill ``report.memory_nodes`` from the ``memory_nodes`` table.

    Selects nodes of *project_id* whose status is ``active`` or
    ``unverified`` and whose confidence is NULL or at least 0.5. Active
    nodes come first, then higher confidence, then newer ``created_at``.
    At most *limit* nodes are returned and each node's content is cut to
    400 characters. Does nothing when the table is missing.

    Args:
        conn: Open database connection.
        report: Report whose ``memory_nodes`` attribute is replaced.
        limit: Maximum number of nodes to read.
        project_id: Value matched against ``memory_nodes.project_id``. The
            default keeps the previously hard-coded ``'default'`` scope.

    NOTE(review): the call visible in this file passes no *project_id*, so
    the nodes are not narrowed to the resolved project - confirm whether
    that is intended.
    """
    if not _table_exists(conn, "memory_nodes"):
        return
    rows = conn.execute(
        """
 SELECT id, title, type AS memory_type, status, confidence, content, source_path, created_at
 FROM memory_nodes
 WHERE project_id = ?
 AND status IN ('active', 'unverified')
 AND (confidence IS NULL OR confidence >= 0.5)
 ORDER BY
 CASE status WHEN 'active' THEN 0 ELSE 1 END,
 confidence DESC,
 created_at DESC
 LIMIT ?
 """,
        (project_id, limit),
    ).fetchall()
    report.memory_nodes = [
        {
            "id": row["id"],
            "title": row["title"],
            "memory_type": row["memory_type"],
            "status": row["status"],
            "confidence": row["confidence"],
            "content": str(row["content"] or "")[:400],
            "source_path": row["source_path"],
            "created_at": row["created_at"],
        }
        for row in rows
    ]
641# ── Text formatting ───────────────────────────────────────────────────
def format_context_report(report: ProjectContextReport) -> str:
    """Format a ProjectContextReport as concise agent-usable text.

    Layout: title and rule, warnings, then either the list of ambiguous
    candidates (the text ends there) or the resolution line followed by the
    catalog, ledger, indexed and memory-node sections and a footer naming
    the catalog and database paths. List sections are capped (10 ledger
    rows, 10 documents, 5 chunk excerpts of 120 characters).

    A document without a content hash is shown as ``hash=n/a`` and a
    non-numeric confidence is printed as-is, instead of raising.
    """
    lines: list[str] = [f"Global Context: {report.project_query}", "=" * 60]

    # Warnings first
    if report.warnings:
        lines.extend(f"[WARNING] {warning}" for warning in report.warnings)
        lines.append("")

    # Ambiguous candidates end the report: nothing was resolved.
    if report.ambiguous_candidates:
        lines.append(f"Ambiguous: {len(report.ambiguous_candidates)} candidates")
        lines.extend(f" - {candidate}" for candidate in report.ambiguous_candidates[:10])
        return "\n".join(lines)

    # Resolution info
    if report.resolution_method:
        lines.append(f"Resolved: {report.resolution_method} ({report.resolution_notes})")
        lines.append("")

    # Catalog roots
    if report.catalog_roots:
        lines.append("## Project Roots")
        for root in report.catalog_roots:
            name = root.get("project_name")
            name_suffix = f" [{name}]" if name else ""
            lines.append(f"- {root['id']} {root['path_or_alias']}{name_suffix}")
        lines.append("")

    # Catalog documents
    if report.catalog_documents:
        lines.append("## Related Documents (catalog)")
        lines.extend(
            f"- {doc['id']} {doc['path_or_alias']}" for doc in report.catalog_documents
        )
        lines.append("")

    # Server aliases (hints only)
    if report.catalog_server_aliases:
        lines.append("## Server Aliases (hints, not executed)")
        lines.extend(
            f"- {alias['id']} {alias['path_or_alias']}"
            for alias in report.catalog_server_aliases
        )
        lines.append("")

    # Ledger
    lines.append("## Ledger")
    if not report.ledger_entries:
        lines.append("(no ledger entries found for this project)")
    else:
        by_status: dict[str, int] = {}
        for entry in report.ledger_entries:
            status = str(entry.get("status", "?"))
            by_status[status] = by_status.get(status, 0) + 1
        status_str = " ".join(f"{k}={v}" for k, v in sorted(by_status.items()))
        lines.append(f"{len(report.ledger_entries)} entries: {status_str}")
        for entry in report.ledger_entries[:10]:
            err = entry.get("error_message")
            err_str = f" ({err})" if err else ""
            lines.append(
                f"- [{entry.get('status')}] {entry.get('source_type')}:"
                f"{entry.get('source_path_or_alias')} "
                f"chunks={entry.get('chunk_count')}{err_str}"
            )
        if len(report.ledger_entries) > 10:
            lines.append(f" ... and {len(report.ledger_entries) - 10} more")
    lines.append("")

    # Indexed
    lines.append("## Indexed")
    lines.append(f"documents: {len(report.indexed_documents)}")
    lines.append(f"chunks: {len(report.indexed_chunks)}")

    if report.indexed_documents:
        lines.append("")
        lines.append("### Documents")
        for doc in report.indexed_documents[:10]:
            content_hash = doc.get("content_hash")
            # The hash comes straight from a database column and may be NULL.
            hash_str = f"{str(content_hash)[:12]}..." if content_hash else "n/a"
            lines.append(f"- {doc['path']} (hash={hash_str})")

    if report.indexed_chunks:
        lines.append("")
        lines.append("### Chunks (excerpts)")
        for chunk in report.indexed_chunks[:5]:
            heading = chunk.get("heading_path") or "(no heading)"
            excerpt = str(chunk.get("content_excerpt", ""))[:120].replace("\n", " ")
            lines.append(f"- [{chunk['chunk_index']}] {heading}")
            lines.append(f" {excerpt}")

    # Memory nodes
    if report.memory_nodes:
        lines.append("")
        lines.append("## Memory Nodes")
        for node in report.memory_nodes:
            conf = node.get("confidence")
            if conf is None:
                conf_str = ""
            elif isinstance(conf, (int, float)):
                conf_str = f" confidence={conf:.2f}"
            else:
                # A non-numeric value cannot take the ".2f" format.
                conf_str = f" confidence={conf}"
            lines.append(
                f"- [{node.get('status')}/{node.get('memory_type')}]{conf_str}"
                f" {node.get('title')}"
            )

    lines.append("")
    lines.append(f"Catalog: {report.catalog_path}")
    lines.append(f"Database: {report.db_path}")

    return "\n".join(lines)