Coverage for src \ truenex_memory \ discovery \ source_catalog.py: 99%

123 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-19 10:21 +0200

1"""Source catalog domain model for confirmed local-private sources. 

2 

3Discovery produces candidates. The source catalog contains only confirmed 

4entries with stable deterministic ids. 

5""" 

6 

7from __future__ import annotations 

8 

9from dataclasses import asdict, dataclass, field 

10from pathlib import Path 

11import hashlib 

12import json 

13 

14from truenex_memory.discovery.agent_discovery import ( 

15 AgentRoot, 

16 CandidateDocument, 

17 CandidateProject, 

18 DiscoveryReport, 

19 ServerAlias, 

20 _split_sources, 

21) 

22 

23# ── constants ───────────────────────────────────────────────────────── 

24 

# Catalog file location for the current user's home, computed once at import time.
DEFAULT_CATALOG_PATH = Path.home().joinpath(".truenex-memory", "sources.json")

26 

27 

def default_catalog_path(home: Path) -> Path:
    """Build the source catalog file location underneath *home*.

    The result is ``<home>/.truenex-memory/sources.json``. Only the path is
    computed; nothing is created or checked on disk.
    """
    catalog_dir = home / ".truenex-memory"
    return catalog_dir / "sources.json"

31 

32 

33# ── stable id ───────────────────────────────────────────────────────── 

34 

def source_id(source_type: str, path_or_alias: str) -> str:
    """Build the deterministic catalog id for a source.

    The id has the form ``"<source_type>:<digest>"``, where the digest is the
    first 32 hex characters of the SHA-256 of the normalized path/alias.
    Only the path/alias is hashed; *source_type* is used verbatim as the
    prefix.

    Normalization steps, in order: trim surrounding whitespace, convert
    backslashes to forward slashes, lowercase, strip trailing slashes.
    """
    canonical = path_or_alias.strip()
    canonical = canonical.replace("\\", "/")
    canonical = canonical.lower()
    canonical = canonical.rstrip("/")
    digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()
    return f"{source_type}:{digest[:32]}"

44 

45 

46# ── catalog entry ───────────────────────────────────────────────────── 

47 

@dataclass
class CatalogEntry:
    """One source record held in the source catalog.

    Only ``id``, ``source_type`` and ``path_or_alias`` are required. The
    remaining fields default to a confirmed, local-private entry with no
    project name, no provenance, zero confidence and zero evidence.
    """

    # Identifier used to match entries when upserting into a catalog.
    id: str
    # One of: agent_root | project_root | document | server_alias
    source_type: str
    # Filesystem path or server alias, stored as text.
    path_or_alias: str
    project_name: str | None = None
    # Labels/sources the entry was discovered from; a fresh list per instance.
    discovered_from: list[str] = field(default_factory=list)
    confirmation_status: str = "confirmed"
    privacy_scope: str = "local-private"
    confidence: float = 0.0
    evidence_count: int = 0

61 

62 

63# ── candidate conversion ────────────────────────────────────────────── 

64 

def candidate_to_entry(
    candidate: AgentRoot | CandidateProject | CandidateDocument | ServerAlias,
    *,
    confirmation_status: str = "confirmed",
) -> CatalogEntry:
    """Turn a discovery candidate into a CatalogEntry.

    The candidate's class selects the converter. Classes are checked in the
    order agent root, project, document, server alias, and the first match
    wins. *confirmation_status* is passed through to the converter.

    Raises:
        TypeError: if *candidate* is an instance of none of the four
            supported classes.
    """
    converters = (
        (AgentRoot, _agent_root_to_entry),
        (CandidateProject, _project_to_entry),
        (CandidateDocument, _document_to_entry),
        (ServerAlias, _server_to_entry),
    )
    for candidate_type, convert in converters:
        if isinstance(candidate, candidate_type):
            return convert(candidate, confirmation_status=confirmation_status)
    raise TypeError(f"Unknown candidate type: {type(candidate).__name__}")

80 

81 

def _agent_root_to_entry(root: AgentRoot, *, confirmation_status: str) -> CatalogEntry:
    """Build the ``agent_root`` catalog entry for *root*.

    The root's label becomes the single ``discovered_from`` item. The file
    count is stored as the evidence count and, when the root exists, also as
    the confidence; a root that does not exist gets confidence 0.0.
    """
    location = str(root.path)
    if root.exists:
        score = float(root.file_count)
    else:
        score = 0.0
    return CatalogEntry(
        id=source_id("agent_root", location),
        source_type="agent_root",
        path_or_alias=location,
        discovered_from=[root.label],
        confirmation_status=confirmation_status,
        confidence=score,
        evidence_count=root.file_count,
    )

93 

94 

def _project_to_entry(proj: CandidateProject, *, confirmation_status: str) -> CatalogEntry:
    """Build the ``project_root`` catalog entry for *proj*.

    The project name is inferred from the last component of the root path,
    and the evidence count is the number of ``discovered_from`` items.
    """
    root_path = proj.root
    origins = list(proj.discovered_from)
    return CatalogEntry(
        id=source_id("project_root", root_path),
        source_type="project_root",
        path_or_alias=root_path,
        project_name=_infer_project_name(root_path),
        discovered_from=origins,
        confirmation_status=confirmation_status,
        confidence=proj.confidence,
        evidence_count=len(origins),
    )

107 

108 

def _infer_project_name(path_or_alias: str) -> str | None:
    """Return the last path component of *path_or_alias*, or None.

    Surrounding whitespace is trimmed, backslashes are treated as forward
    slashes, and trailing slashes are ignored. None is returned when nothing
    is left after that clean-up.
    """
    normalized = path_or_alias.strip().replace("\\", "/").rstrip("/")
    # rpartition yields the whole string as the tail when there is no slash.
    tail = normalized.rpartition("/")[2]
    return tail or None

114 

115 

# Lowercased file names that are treated as a project's index document.
_INDEX_DOC_NAMES: frozenset[str] = frozenset({"skill.md", "readme.md", "agents.md", "claude.md"})


def _infer_project_name_from_doc(path_str: str) -> str | None:
    """Infer a project name from the directory holding an index document.

    The file name of *path_str* is compared case-insensitively against the
    known index document names (skill.md, readme.md, agents.md, claude.md).
    When it matches, the name of the immediate parent directory is returned.

    None is returned when the file name is not an index document, when the
    path has no directory part, or when the parent directory name is empty,
    ``"."`` or ``".."``.
    """
    normalized = path_str.strip().replace("\\", "/")
    directory, separator, filename = normalized.rpartition("/")
    if filename.lower() not in _INDEX_DOC_NAMES:
        return None
    if not separator:
        # Bare file name: there is no parent directory to name the project.
        return None
    parent_name = directory.rpartition("/")[2]
    if parent_name in ("", ".", ".."):
        return None
    return parent_name

141 

142 

def _document_to_entry(doc: CandidateDocument, *, confirmation_status: str) -> CatalogEntry:
    """Build the ``document`` catalog entry for *doc*.

    A project name is set only when the document path names a known index
    document; otherwise it is None. The evidence count is the number of
    ``discovered_from`` items.
    """
    doc_path = doc.path
    origins = list(doc.discovered_from)
    return CatalogEntry(
        id=source_id("document", doc_path),
        source_type="document",
        path_or_alias=doc_path,
        project_name=_infer_project_name_from_doc(doc_path),
        discovered_from=origins,
        confirmation_status=confirmation_status,
        confidence=doc.confidence,
        evidence_count=len(origins),
    )

154 

155 

def _server_to_entry(srv: ServerAlias, *, confirmation_status: str) -> CatalogEntry:
    """Build the ``server_alias`` catalog entry for *srv*.

    ``discovered_from`` is whatever ``_split_sources`` returns for
    ``srv.source``, and the evidence count is the length of that result.
    """
    alias = srv.alias
    origins = _split_sources(srv.source)
    return CatalogEntry(
        id=source_id("server_alias", alias),
        source_type="server_alias",
        path_or_alias=alias,
        discovered_from=origins,
        confirmation_status=confirmation_status,
        confidence=srv.confidence,
        evidence_count=len(origins),
    )

167 

168 

def report_to_entries(
    report: DiscoveryReport,
    limit: int | None = None,
    *,
    confirmation_status: str = "confirmed",
) -> list[CatalogEntry]:
    """Convert a DiscoveryReport into a flat list of CatalogEntry.

    Output order is: agent roots, then projects, documents and servers.
    Agent roots are included only when they exist and are never limited.
    When *limit* is given, each of the other three sections is sliced to
    ``[:limit]`` on its own, so one noisy section cannot crowd out the rest.
    Every entry receives the same *confirmation_status*.
    """

    def capped(section):
        # Apply the per-section limit; None means "take everything".
        return section if limit is None else section[:limit]

    selected = [root for root in report.agent_roots if root.exists]
    selected.extend(capped(report.projects))
    selected.extend(capped(report.documents))
    selected.extend(capped(report.servers))

    return [
        candidate_to_entry(candidate, confirmation_status=confirmation_status)
        for candidate in selected
    ]

201 

202 

203# ── catalog persistence ─────────────────────────────────────────────── 

204 

@dataclass
class SourceCatalog:
    """An ordered collection of catalog entries plus a format version.

    The catalog can be written to and read from a JSON file with the shape
    ``{"version": ..., "entries": [...]}``.
    """

    entries: list[CatalogEntry] = field(default_factory=list)
    version: str = "1"

    def save(self, path: Path) -> None:
        """Serialize the catalog to *path* as UTF-8 JSON.

        Missing parent directories are created first. The JSON is written
        with a 2-space indent and sorted keys.
        """
        path.parent.mkdir(parents=True, exist_ok=True)
        serialized_entries = [asdict(entry) for entry in self.entries]
        payload: dict[str, object] = {
            "version": self.version,
            "entries": serialized_entries,
        }
        text = json.dumps(payload, indent=2, sort_keys=True)
        path.write_text(text, encoding="utf-8")

    @classmethod
    def load(cls, path: Path) -> SourceCatalog:
        """Read a catalog from *path*.

        A path that does not exist yields an empty catalog. Otherwise each
        item under ``"entries"`` is passed as keyword arguments to
        CatalogEntry, and ``"version"`` (default ``"1"``) is converted to a
        string.
        """
        if not path.exists():
            return cls()
        raw = json.loads(path.read_text(encoding="utf-8"))
        loaded = []
        for record in raw.get("entries", []):
            loaded.append(CatalogEntry(**record))
        return cls(entries=loaded, version=str(raw.get("version", "1")))

    def upsert_entry(self, entry: CatalogEntry) -> tuple[str, CatalogEntry]:
        """Insert *entry*, or replace the stored entry that has the same id.

        Returns ``("updated", entry)`` when the first entry with a matching
        id was replaced in place, or ``("added", entry)`` when no id matched
        and *entry* was appended. Other entries are left untouched.
        """
        position = next(
            (index for index, current in enumerate(self.entries) if current.id == entry.id),
            None,
        )
        if position is None:
            self.entries.append(entry)
            return ("added", entry)
        self.entries[position] = entry
        return ("updated", entry)

251 

252 

253# ── formatting ──────────────────────────────────────────────────────── 

254 

def format_entries(entries: list[CatalogEntry]) -> str:
    """Format a list of CatalogEntry as a human-readable markdown string.

    Entries are grouped by ``source_type``. The four known types
    (agent_root, project_root, document, server_alias) always get a section,
    in that order, showing ``- (none)`` when empty. Any other source type
    present in *entries* gets its own section after the known ones, in
    first-seen order, so every entry counted in the summary is also listed.

    Each entry line shows the path/alias, the project name in brackets when
    set, the confidence to one decimal place, the evidence count and the
    comma-separated ``discovered_from`` values.
    """
    lines: list[str] = ["# Source Catalog Candidates (review only, not written)"]

    known_types = ("agent_root", "project_root", "document", "server_alias")
    by_type: dict[str, list[CatalogEntry]] = {}
    for entry in entries:
        by_type.setdefault(entry.source_type, []).append(entry)

    # Without these, entries of an unrecognized type would be counted in the
    # total below but never shown. Dict order gives first-seen ordering.
    extra_types = [name for name in by_type if name not in known_types]

    for source_type in (*known_types, *extra_types):
        items = by_type.get(source_type, [])
        lines.append(f"\n## {source_type} ({len(items)})")
        if not items:
            lines.append("- (none)")
            continue
        for item in items:
            extra = f" [{item.project_name}]" if item.project_name else ""
            lines.append(
                f"- {item.path_or_alias}{extra} "
                f"(conf={item.confidence:.1f}, evidence={item.evidence_count}, "
                f"from: {', '.join(item.discovered_from)})"
            )

    lines.append("\n## Summary")
    lines.append(f"- Total entries: {len(entries)}")
    return "\n".join(lines)

282 

283 

284def entries_to_dict(entries: list[CatalogEntry]) -> dict[str, object]: 

285 """Serialize a list of CatalogEntry to a JSON-friendly dict.""" 

286 return { 

287 "version": "1", 

288 "entries": [asdict(entry) for entry in entries], 

289 }