Coverage for src \ truenex_memory \ discovery \ source_catalog.py: 99%
123 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-19 10:21 +0200
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-19 10:21 +0200
1"""Source catalog domain model for confirmed local-private sources.
3Discovery produces candidates. The source catalog contains only confirmed
4entries with stable deterministic ids.
5"""
7from __future__ import annotations
9from dataclasses import asdict, dataclass, field
10from pathlib import Path
11import hashlib
12import json
14from truenex_memory.discovery.agent_discovery import (
15 AgentRoot,
16 CandidateDocument,
17 CandidateProject,
18 DiscoveryReport,
19 ServerAlias,
20 _split_sources,
21)
23# ── constants ─────────────────────────────────────────────────────────
# Catalog location under the home directory of the user running the process,
# resolved once at import time.
DEFAULT_CATALOG_PATH = Path.home().joinpath(".truenex-memory", "sources.json")
def default_catalog_path(home: Path) -> Path:
    """Build the source catalog location underneath *home*.

    The catalog lives at ``<home>/.truenex-memory/sources.json``. Nothing is
    created on disk; only the path is computed.
    """
    return home.joinpath(".truenex-memory", "sources.json")
33# ── stable id ─────────────────────────────────────────────────────────
def source_id(source_type: str, path_or_alias: str) -> str:
    """Derive the stable catalog id for a source.

    The path/alias is canonicalised before hashing so that cosmetic
    differences map to the same id: surrounding whitespace is removed,
    backslashes become forward slashes, the text is lowercased, and any
    trailing forward slashes are dropped.

    The result is ``"<source_type>:<first 32 hex chars of SHA-256>"``; only
    the canonical path/alias is hashed, the type is carried by the prefix.
    """
    canonical = path_or_alias.strip()
    canonical = canonical.replace("\\", "/")
    canonical = canonical.lower()
    canonical = canonical.rstrip("/")
    digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()
    return f"{source_type}:{digest[:32]}"
46# ── catalog entry ─────────────────────────────────────────────────────
@dataclass
class CatalogEntry:
    """One source recorded in the source catalog.

    ``id``, ``source_type`` and ``path_or_alias`` are required; every other
    field has a default so partially specified entries can be constructed.
    """

    # Stable identifier, e.g. as produced by ``source_id``.
    id: str
    # One of: agent_root | project_root | document | server_alias
    source_type: str
    # Filesystem path (for roots/documents) or alias (for servers).
    path_or_alias: str
    # Optional human-readable project label.
    project_name: str | None = None
    # Labels/sources that led to this entry; a fresh list per instance.
    discovered_from: list[str] = field(default_factory=list)
    confirmation_status: str = "confirmed"
    privacy_scope: str = "local-private"
    confidence: float = 0.0
    evidence_count: int = 0
63# ── candidate conversion ──────────────────────────────────────────────
def candidate_to_entry(
    candidate: AgentRoot | CandidateProject | CandidateDocument | ServerAlias,
    *,
    confirmation_status: str = "confirmed",
) -> CatalogEntry:
    """Turn a single discovery candidate into a ``CatalogEntry``.

    The candidate's type selects the converter; types are checked in the
    order agent root, project, document, server alias, and the first match
    wins. ``confirmation_status`` is forwarded unchanged to the converter.

    Raises:
        TypeError: if *candidate* is none of the supported candidate types.
    """
    converters = (
        (AgentRoot, _agent_root_to_entry),
        (CandidateProject, _project_to_entry),
        (CandidateDocument, _document_to_entry),
        (ServerAlias, _server_to_entry),
    )
    for candidate_type, convert in converters:
        if isinstance(candidate, candidate_type):
            return convert(candidate, confirmation_status=confirmation_status)
    raise TypeError(f"Unknown candidate type: {type(candidate).__name__}")
def _agent_root_to_entry(root: AgentRoot, *, confirmation_status: str) -> CatalogEntry:
    """Build the ``agent_root`` catalog entry for *root*.

    The root's label becomes the single ``discovered_from`` item and its
    file count is used as the evidence count. Confidence is the file count
    as a float when the root exists, otherwise ``0.0``.
    """
    # NOTE(review): confidence here is a raw file count, not a score —
    # confirm this is intended relative to the other entry types.
    location = str(root.path)
    score = float(root.file_count) if root.exists else 0.0
    return CatalogEntry(
        id=source_id("agent_root", location),
        source_type="agent_root",
        path_or_alias=location,
        discovered_from=[root.label],
        confirmation_status=confirmation_status,
        confidence=score,
        evidence_count=root.file_count,
    )
def _project_to_entry(proj: CandidateProject, *, confirmation_status: str) -> CatalogEntry:
    """Build the ``project_root`` catalog entry for *proj*.

    The project name is inferred from the last component of the project
    root, and the evidence count is the number of discovery sources.
    """
    root_path = proj.root
    inferred_name = _infer_project_name(root_path)
    origins = list(proj.discovered_from)  # copy so the entry owns its list
    return CatalogEntry(
        id=source_id("project_root", root_path),
        source_type="project_root",
        path_or_alias=root_path,
        project_name=inferred_name,
        discovered_from=origins,
        confirmation_status=confirmation_status,
        confidence=proj.confidence,
        evidence_count=len(origins),
    )
def _infer_project_name(path_or_alias: str) -> str | None:
    """Return the last path component of *path_or_alias*, or ``None``.

    Whitespace is trimmed, backslashes are treated as forward slashes and
    trailing slashes are ignored. ``None`` is returned when nothing is left
    after that clean-up.
    """
    normalized = path_or_alias.strip().replace("\\", "/").rstrip("/")
    # rpartition yields ("", "", normalized) when there is no slash, so the
    # tail is the whole string in that case and "" for empty input.
    _, _, tail = normalized.rpartition("/")
    return tail if tail else None
# Lowercased file names that are treated as a project's index document.
_INDEX_DOC_NAMES: frozenset[str] = frozenset({"skill.md", "readme.md", "agents.md", "claude.md"})


def _infer_project_name_from_doc(path_str: str) -> str | None:
    """Infer a project name from the directory holding an index document.

    When the file name of *path_str* is one of the known index documents
    (skill.md, readme.md, agents.md, claude.md; compared case-insensitively)
    the name of its immediate parent directory is returned.

    ``None`` is returned for any other file name, for a bare file name with
    no directory part, and when the parent name is empty, ``"."`` or
    ``".."``.
    """
    normalized = path_str.strip().replace("\\", "/")
    parent_part, separator, filename = normalized.rpartition("/")
    if not separator:
        # Empty input or a bare file name: there is no parent directory.
        return None
    if filename.lower() not in _INDEX_DOC_NAMES:
        return None
    # Last component of the parent path; the whole string when it has no slash.
    parent_name = parent_part.rpartition("/")[2]
    if parent_name in ("", ".", ".."):
        return None
    return parent_name
def _document_to_entry(doc: CandidateDocument, *, confirmation_status: str) -> CatalogEntry:
    """Build the ``document`` catalog entry for *doc*.

    A project name is attached only when the document is a known index
    document inside a named directory; the evidence count is the number of
    discovery sources.
    """
    doc_path = doc.path
    origins = list(doc.discovered_from)  # copy so the entry owns its list
    return CatalogEntry(
        id=source_id("document", doc_path),
        source_type="document",
        path_or_alias=doc_path,
        project_name=_infer_project_name_from_doc(doc_path),
        discovered_from=origins,
        confirmation_status=confirmation_status,
        confidence=doc.confidence,
        evidence_count=len(origins),
    )
def _server_to_entry(srv: ServerAlias, *, confirmation_status: str) -> CatalogEntry:
    """Build the ``server_alias`` catalog entry for *srv*.

    The alias's ``source`` is passed through ``_split_sources``; the result
    is stored as ``discovered_from`` and its length is the evidence count.
    """
    alias = srv.alias
    origins = _split_sources(srv.source)
    return CatalogEntry(
        id=source_id("server_alias", alias),
        source_type="server_alias",
        path_or_alias=alias,
        discovered_from=origins,
        confirmation_status=confirmation_status,
        confidence=srv.confidence,
        evidence_count=len(origins),
    )
def report_to_entries(
    report: DiscoveryReport,
    limit: int | None = None,
    *,
    confirmation_status: str = "confirmed",
) -> list[CatalogEntry]:
    """Flatten a ``DiscoveryReport`` into catalog entries.

    Output order is: existing agent roots, then projects, documents and
    servers. Agent roots that do not exist are skipped and are never
    limited. When *limit* is given it is applied to projects, documents and
    servers separately (as a ``[:limit]`` slice), so one large section
    cannot crowd out the others.
    """

    def _capped(candidates):
        # Per-section cap; ``None`` means "take everything".
        return candidates if limit is None else candidates[:limit]

    selected = [root for root in report.agent_roots if root.exists]
    selected.extend(_capped(report.projects))
    selected.extend(_capped(report.documents))
    selected.extend(_capped(report.servers))
    return [
        candidate_to_entry(candidate, confirmation_status=confirmation_status)
        for candidate in selected
    ]
203# ── catalog persistence ───────────────────────────────────────────────
@dataclass
class SourceCatalog:
    """An ordered collection of catalog entries plus a format version."""

    entries: list[CatalogEntry] = field(default_factory=list)
    version: str = "1"

    def save(self, path: Path) -> None:
        """Serialize the catalog as JSON to *path*.

        Missing parent directories are created. The file is UTF-8 encoded,
        indented by two spaces, with keys sorted.
        """
        path.parent.mkdir(parents=True, exist_ok=True)
        serialized_entries = [asdict(item) for item in self.entries]
        payload: dict[str, object] = {
            "version": self.version,
            "entries": serialized_entries,
        }
        text = json.dumps(payload, indent=2, sort_keys=True)
        path.write_text(text, encoding="utf-8")

    @classmethod
    def load(cls, path: Path) -> SourceCatalog:
        """Read a catalog from *path*.

        A missing file yields an empty catalog. Otherwise every item under
        ``"entries"`` is passed as keyword arguments to ``CatalogEntry`` and
        ``"version"`` (default ``"1"``) is coerced to ``str``.
        """
        if path.exists():
            payload = json.loads(path.read_text(encoding="utf-8"))
            loaded = [CatalogEntry(**raw) for raw in payload.get("entries", [])]
            return cls(entries=loaded, version=str(payload.get("version", "1")))
        return cls()

    def upsert_entry(self, entry: CatalogEntry) -> tuple[str, CatalogEntry]:
        """Insert *entry*, or replace the existing entry with the same id.

        Returns ``("updated", entry)`` when an entry with that id was
        replaced in place, or ``("added", entry)`` when it was appended.
        No other entry is touched.
        """
        position = next(
            (idx for idx, current in enumerate(self.entries) if current.id == entry.id),
            None,
        )
        if position is None:
            self.entries.append(entry)
            return ("added", entry)
        self.entries[position] = entry
        return ("updated", entry)
253# ── formatting ────────────────────────────────────────────────────────
def format_entries(entries: list[CatalogEntry]) -> str:
    """Format a list of CatalogEntry as a human-readable markdown string.

    The output has a title, one ``## <source_type> (<count>)`` section per
    type, and a summary with the total number of entries. The four known
    types (agent_root, project_root, document, server_alias) always get a
    section, in that order, showing ``- (none)`` when empty.

    Entries with any other ``source_type`` are listed in additional
    sections after the known ones, in first-seen order, so that every entry
    counted in the summary is also shown.
    """
    known_order = ("agent_root", "project_root", "document", "server_alias")

    by_type: dict[str, list[CatalogEntry]] = {}
    for entry in entries:
        by_type.setdefault(entry.source_type, []).append(entry)

    # Unknown types used to be counted in the total but never rendered;
    # append them after the fixed sections instead of dropping them.
    extra_types = [name for name in by_type if name not in known_order]

    lines: list[str] = ["# Source Catalog Candidates (review only, not written)"]
    for source_type in (*known_order, *extra_types):
        items = by_type.get(source_type, [])
        lines.append(f"\n## {source_type} ({len(items)})")
        if not items:
            lines.append("- (none)")
            continue
        for item in items:
            extra = f" [{item.project_name}]" if item.project_name else ""
            lines.append(
                f"- {item.path_or_alias}{extra} "
                f"(conf={item.confidence:.1f}, evidence={item.evidence_count}, "
                f"from: {', '.join(item.discovered_from)})"
            )

    lines.append("\n## Summary")
    lines.append(f"- Total entries: {len(entries)}")
    return "\n".join(lines)
def entries_to_dict(entries: list[CatalogEntry]) -> dict[str, object]:
    """Convert catalog entries into a plain, JSON-serializable mapping.

    The mapping has a fixed ``"version"`` of ``"1"`` and an ``"entries"``
    list holding each entry converted with ``dataclasses.asdict``, in input
    order.
    """
    serialized = list(map(asdict, entries))
    return {"version": "1", "entries": serialized}