Coverage for src \ truenex_memory \ discovery \ agent_discovery.py: 88%

590 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-19 10:21 +0200

1"""Agent discovery: scan local Codex/Claude roots for projects, docs, servers. 

2 

3Does NOT scan the whole PC. Only looks under agent client directories. 

4Does NOT mutate the memory database. Discovery only. 

5""" 

6 

7from __future__ import annotations 

8 

9from dataclasses import dataclass, field 

10from pathlib import Path 

11import json 

12import re 

13 

14# ── data model ──────────────────────────────────────────────────────── 

15 

@dataclass
class AgentRoot:
    """A discovered agent client directory.

    One instance is produced per entry of ``AGENT_ROOTS`` by
    ``_scan_agent_root``, whether or not the location exists on disk.
    """
    # Short identifier of the root.
    label: str  # e.g. "codex-sessions", "claude-projects"
    # Location that was probed (home / client dir / sub path).
    path: Path
    # Whether ``path`` was present when the scan ran.
    exists: bool
    # Number of files found at ``path``; stays 0 for a missing root.
    file_count: int = 0
    # Per-file problems (read errors, truncation notes) collected by the scan.
    warnings: list[str] = field(default_factory=list)

24 

25 

@dataclass
class CandidateProject:
    """A candidate project path discovered from agent data."""
    # Inferred project root directory, kept as a plain string.
    root: str
    # Labels of the agent roots whose texts mentioned this project.
    discovered_from: list[str] = field(default_factory=list)  # agent root labels
    # Score assigned by ``_score_project``; 0.0 until scoring has run.
    confidence: float = 0.0

32 

33 

@dataclass
class CandidateDocument:
    """A candidate document path discovered from agent data."""
    # Document path as found in the text (absolute or relative), or the path
    # of a skill file promoted by ``_collect_skill_documents``.
    path: str
    # Labels of the agent roots that surfaced this document.
    discovered_from: list[str] = field(default_factory=list)
    # Score assigned by ``_score_document``; 0.0 until scoring has run.
    confidence: float = 0.0

40 

41 

@dataclass
class ServerAlias:
    """An SSH/server alias discovered from agent data."""
    # Host name, alias or IP address that followed an ``ssh`` mention.
    alias: str
    # One label, or several joined with "," once ``_deduplicate_servers`` has
    # merged duplicates; ``_split_sources`` splits it back into a list.
    source: str = "agent-history"  # agent root label(s), comma-separated after merge
    # Score assigned by ``_score_server``; 0.0 until scoring has run.
    confidence: float = 0.0

48 

49 

@dataclass
class DiscoveryReport:
    """Aggregated discovery result: scanned roots, candidates and warnings."""
    agent_roots: list[AgentRoot] = field(default_factory=list)
    projects: list[CandidateProject] = field(default_factory=list)
    documents: list[CandidateDocument] = field(default_factory=list)
    servers: list[ServerAlias] = field(default_factory=list)
    warnings: list[str] = field(default_factory=list)

    @property
    def project_count(self) -> int:
        """Number of candidate projects held by the report."""
        return len(self.projects)

    @property
    def document_count(self) -> int:
        """Number of candidate documents held by the report."""
        return len(self.documents)

    @property
    def server_count(self) -> int:
        """Number of server aliases held by the report."""
        return len(self.servers)

    @property
    def warning_count(self) -> int:
        """Number of report-level warnings."""
        return len(self.warnings)

    def to_dict(self) -> dict[str, object]:
        """Return the report as nested dicts/lists of plain values.

        Candidate entries carry an extra ``evidence_count``: the number of
        agent roots (or merged source labels, for servers) behind the entry.
        The list values are the report's own lists, not copies.
        """
        roots_out = []
        for agent_root in self.agent_roots:
            roots_out.append(
                {
                    "label": agent_root.label,
                    "path": str(agent_root.path),
                    "exists": agent_root.exists,
                    "file_count": agent_root.file_count,
                    "warnings": agent_root.warnings,
                }
            )

        projects_out = []
        for project in self.projects:
            projects_out.append(
                {
                    "root": project.root,
                    "discovered_from": project.discovered_from,
                    "evidence_count": len(project.discovered_from),
                    "confidence": project.confidence,
                }
            )

        documents_out = []
        for document in self.documents:
            documents_out.append(
                {
                    "path": document.path,
                    "discovered_from": document.discovered_from,
                    "evidence_count": len(document.discovered_from),
                    "confidence": document.confidence,
                }
            )

        servers_out = []
        for server in self.servers:
            servers_out.append(
                {
                    "alias": server.alias,
                    "source": server.source,
                    "evidence_count": len(_split_sources(server.source)),
                    "confidence": server.confidence,
                }
            )

        return {
            "agent_roots": roots_out,
            "projects": projects_out,
            "documents": documents_out,
            "servers": servers_out,
            "warnings": self.warnings,
        }

116 

117 

118# ── agent root layout ───────────────────────────────────────────────── 

119 

# Locations probed under the user's home directory. Each entry is
# (label, client directory, sub path); discover_from_agents() passes the three
# values to _scan_agent_root(), which joins them as home / client dir / sub path.
# A sub path may name a directory (scanned recursively) or a single file.
AGENT_ROOTS = [
    ("codex-sessions", ".codex", "sessions"),
    ("codex-history", ".codex", "history.jsonl"),
    ("codex-memories", ".codex", "memories"),
    ("claude-projects", ".claude", "projects"),
    ("claude-commands", ".claude", "commands"),
    ("claude-history", ".claude", "history.jsonl"),
    ("claude-skills", ".claude", "skills"),
]

129 

130DOC_EXTENSIONS = frozenset({".md", ".txt", ".json", ".yaml", ".yml", ".toml"}) 

131MAX_FILE_READ_CHARS = 200_000 

132MAX_TEXT_CHARS = 20_000 

133MAX_TEXTS_PER_JSONL_FILE = 500 

134MAX_PATH_MATCHES_PER_TEXT = 250 

135MAX_DOC_MATCHES_PER_TEXT = 250 

136RELEVANT_JSON_KEYS = frozenset( 

137 { 

138 "content", 

139 "cwd", 

140 "message", 

141 "path", 

142 "root", 

143 "source_path", 

144 "summary", 

145 "text", 

146 } 

147) 

148 

149# Regex patterns 

150_RE_ABS_PATH_WIN = re.compile(r'[A-Za-z]:[\\/][^\s"\'<>|*?]+') 

151_RE_ABS_PATH_UNIX = re.compile(r'(?:^|\s)/(?:[^\s"\'*?|]+/)+[^\s"\'*?|]*') 

152_RE_SSH_ALIAS = re.compile(r'\bssh\s+(?:root@)?([\w][\w.-]*)\b', re.IGNORECASE) 

153_RE_SSH_ROOT_AT = re.compile(r'\bssh\s+root@(\S+)\b', re.IGNORECASE) 

154_RE_SSH_USER_AT = re.compile(r'\bssh\s+(\w+)@(\S+)\b', re.IGNORECASE) 

155_DOC_EXTENSION_HINTS = tuple(sorted(DOC_EXTENSIONS)) 

156_DOC_PATH_RE = re.compile( 

157 r'[^\s"\'<>|]*\.(?:md|txt|json|ya?ml|toml)', 

158 re.IGNORECASE, 

159) 

# Unix path prefixes associated with project locations.
# NOTE(review): not referenced by any function visible in this file; the
# prefix checks in the path helpers are spelled out inline — confirm usage.
_UNIX_PROJECT_PREFIXES = (
    "/home/",
    "/mnt/",
    "/opt/",
    "/root/",
    "/srv/",
    "/users/",
    "/var/www/",
    "/workspace/",
)
# A path containing any of these characters is rejected as a project path.
_PROJECT_PATH_REJECT_CHARS = frozenset("{}<>[]`")
# Path segments that never belong to a project root; _project_root_from_path()
# cuts the path at the first such segment and keeps the parent.
EXCLUDED_PROJECT_SEGMENTS = frozenset(
    {
        ".agents",
        ".claude",
        ".codex",
        ".git",
        ".mypy_cache",
        ".pytest_cache",
        ".tox",
        ".venv",
        "__pycache__",
        "node_modules",
        "site-packages",
        "venv",
    }
)
# Windows folder names (lower-cased) whose direct child is taken as the project
# root. NOTE(review): "sofware" looks like a deliberate match for a misspelled
# folder name — confirm before "fixing" it.
WINDOWS_PROJECT_ANCHORS = frozenset({"projectpy", "software", "sofware"})
WINDOWS_USER_PROJECT_ANCHORS = frozenset({"documents", "documenti", "projects", "repos"})
# Segments that mark the inside of a project; everything before the first one
# is taken as the project root when no anchor matched.
WINDOWS_PROJECT_STOP_SEGMENTS = frozenset({"docs", "src", "tests", "memory", "diary"})
# A path containing any of these segments is never reported as a project.
WINDOWS_NON_PROJECT_SEGMENTS = frozenset(
    {
        ".cursor",
        ".ssh",
        "appdata",
        "codex_tmp",
        "downloads",
        "system32",
        "tmp",
        "windows",
    }
)
# Bare file names (lower-cased) accepted as documents even without a directory.
CANONICAL_RELATIVE_DOCS = frozenset({"agents.md", "claude.md", "readme.md"})
# Substrings (matched against a lower-cased, forward-slash path) that mark
# agent/tool internals rather than user documents.
EXCLUDED_DOCUMENT_PATH_FRAGMENTS = (
    ".agent/",
    ".agents/skills/",
    ".claude/shell-snapshots/",
    ".codex/skills/",
    "/.agent/",
    "/.agents/skills/",
    "/.claude/shell-snapshots/",
    "/.codex/skills/",
)

# Common English words that can follow "SSH" in prose but aren't aliases
_SSH_NOISE_WORDS = frozenset({
    "to", "the", "a", "an", "is", "it", "of", "in", "on", "at", "and",
    "or", "not", "no", "with", "for", "from", "by", "as", "be", "we",
    "key", "agent", "add", "config", "user", "root", "host", "server",
    "connection", "using", "via", "into", "references",
    "double-quoted", "read-only",
})

222 

223 

224# ── discovery ───────────────────────────────────────────────────────── 

225 

226def _is_known_agent_text_preamble(text: str) -> bool: 

227 """Skip internal agent instructions that aren't real user paths.""" 

228 return any( 

229 text.startswith(prefix) 

230 for prefix in ( 

231 "<environment", 

232 "<system", 

233 "<developer", 

234 "<instructions", 

235 "<agent", 

236 "<turn_aborted", 

237 ) 

238 ) 

239 

240 

241def _extract_strings(obj: object) -> list[str]: 

242 """Recursively extract all string values from a parsed JSON object.""" 

243 strings: list[str] = [] 

244 if isinstance(obj, str): 

245 strings.append(obj) 

246 elif isinstance(obj, dict): 

247 for v in obj.values(): 

248 strings.extend(_extract_strings(v)) 

249 elif isinstance(obj, list): 

250 for item in obj: 

251 strings.extend(_extract_strings(item)) 

252 return strings 

253 

254 

def _extract_relevant_strings(obj: object, *, parent_key: str = "") -> list[str]:
    """Collect strings stored under relevant keys of a parsed JSON structure.

    A string is kept (trimmed) only when the key it sits under is in
    ``RELEVANT_JSON_KEYS``. Tool-call style keys are skipped with their whole
    subtree. Containers under other keys are still searched. Empty results
    are dropped.
    """
    skipped_keys = ("tool_call", "tool_calls", "tool_result", "tool_use", "input")
    collected: list[str] = []

    if isinstance(obj, str):
        if parent_key in RELEVANT_JSON_KEYS:
            collected.append(_trim_text(obj))
    elif isinstance(obj, dict):
        for raw_key, child in obj.items():
            name = str(raw_key).lower()
            if name in skipped_keys:
                continue
            # Relevant keys are followed whatever they hold; other keys only
            # when they hold a container that may carry relevant keys deeper.
            if name in RELEVANT_JSON_KEYS or isinstance(child, (dict, list)):
                collected += _extract_relevant_strings(child, parent_key=name)
    elif isinstance(obj, list):
        # List items inherit the key of the list itself.
        for element in obj:
            collected += _extract_relevant_strings(element, parent_key=parent_key)

    return [kept for kept in collected if kept]

274 

275 

def _trim_text(text: str) -> str:
    """Strip surrounding whitespace and cap the result at ``MAX_TEXT_CHARS``."""
    # Slicing a string shorter than the cap returns it unchanged.
    return text.strip()[:MAX_TEXT_CHARS]

281 

282 

def _bounded_read_text(file_path: Path) -> tuple[str, bool]:
    """Read at most *MAX_FILE_READ_CHARS* from *file_path*.

    Returns ``(text, was_truncated)``. *was_truncated* is ``True`` when the
    file contains more data beyond the read limit. The returned text never
    exceeds the limit. An unreadable file yields ``("", False)``.
    """
    try:
        with open(file_path, encoding="utf-8", errors="replace") as f:
            # Ask for one character more than the cap: receiving it proves the
            # file is longer than the cap, without a second probing read.
            text = f.read(MAX_FILE_READ_CHARS + 1)
    except OSError:
        return ("", False)

    truncated = len(text) > MAX_FILE_READ_CHARS
    return (text[:MAX_FILE_READ_CHARS], truncated)

310 

311 

def _extract_text_from_jsonl(file_path: Path) -> tuple[list[str], bool]:
    """Stream and parse a JSONL file line-by-line.

    Returns ``(texts, was_truncated)``. Never reads the whole file into
    memory at once. At most *MAX_TEXTS_PER_JSONL_FILE* texts are returned;
    *was_truncated* is ``True`` when a non-blank line was left unprocessed or
    when extracted texts had to be dropped to respect that limit. A file that
    cannot be opened yields ``([], False)``.
    """
    texts: list[str] = []
    truncated = False
    try:
        f = open(file_path, encoding="utf-8", errors="replace")
    except OSError:
        return (texts, False)

    with f:
        for line in f:
            stripped = line.strip()
            if not stripped:
                continue

            if len(texts) >= MAX_TEXTS_PER_JSONL_FILE:
                # The loop has already consumed this non-blank line, so it is
                # itself the data beyond the cap; no look-ahead is needed.
                truncated = True
                break

            try:
                obj = json.loads(stripped)
            except json.JSONDecodeError:
                continue
            if not isinstance(obj, dict):
                continue

            # Skip developer/system instructions in older flattened schemas.
            role = str(obj.get("role", "")).lower()
            msg_type = str(obj.get("type", "")).lower()
            if role == "developer" or role == "system":
                continue
            if msg_type == "system":
                continue

            payload = obj.get("payload")
            if isinstance(payload, dict):
                payload_role = str(payload.get("role", "")).lower()
                if payload_role in ("developer", "system"):
                    continue

                # Extract cwd from session_meta payloads (Codex-style).
                cwd = payload.get("cwd")
                if isinstance(cwd, str) and cwd.strip():
                    texts.append(_trim_text(cwd))

                if msg_type == "response_item" and str(payload.get("type", "")).lower() == "message":
                    content_text = _extract_message_content_text(payload.get("content"))
                    if content_text:
                        texts.append(content_text)
                    continue

                if msg_type == "event_msg" and str(payload.get("type", "")).lower() == "user_message":
                    message = payload.get("message")
                    if isinstance(message, str) and message.strip():
                        texts.append(_trim_text(message))
                    continue

            message = obj.get("message")
            if isinstance(message, dict):
                message_role = str(message.get("role", "")).lower()
                if message_role in ("developer", "system"):
                    continue
                content_text = _extract_message_content_text(message.get("content"))
                if content_text:
                    texts.append(content_text)
                continue

            # Fallback for simple JSONL history entries. This is intentionally
            # selective so tool payloads are not treated as prose.
            texts.extend(_extract_relevant_strings(obj))

    # A single line can contribute several texts and overshoot the cap; clip
    # and report it, since extracted data is being discarded.
    if len(texts) > MAX_TEXTS_PER_JSONL_FILE:
        texts = texts[:MAX_TEXTS_PER_JSONL_FILE]
        truncated = True

    return (texts, truncated)

400 

401 

def _extract_message_content_text(content: object) -> str:
    """Return the prose carried by a message ``content`` field.

    A plain string is returned trimmed. A list is treated as content blocks:
    tool blocks are skipped, and each remaining dict block contributes its
    ``text`` (or ``content`` when ``text`` is absent). The pieces are joined
    with blank lines. Anything else yields an empty string.
    """
    if isinstance(content, str):
        return _trim_text(content)
    if not isinstance(content, list):
        return ""

    tool_block_types = ("tool_use", "tool_call", "tool_result", "tool")
    pieces: list[str] = []
    for entry in content:
        if not isinstance(entry, dict):
            continue
        if str(entry.get("type", "")).lower() in tool_block_types:
            continue
        candidate = entry.get("text")
        if candidate is None:
            candidate = entry.get("content")
        if isinstance(candidate, str) and candidate.strip():
            pieces.append(_trim_text(candidate))
    return _trim_text("\n\n".join(pieces))

422 

423 

def _find_paths_in_text(text: str) -> list[str]:
    """Return candidate absolute paths mentioned in *text*.

    Windows-style matches come first, then Unix-style ones. The combined list
    is capped at ``MAX_PATH_MATCHES_PER_TEXT`` entries.
    """
    def has_any_separator(candidate: str) -> bool:
        return "/" in candidate or "\\" in candidate

    def has_forward_slash(candidate: str) -> bool:
        return "/" in candidate

    hits: list[str] = []
    scans = (
        (_RE_ABS_PATH_WIN, has_any_separator),
        (_RE_ABS_PATH_UNIX, has_forward_slash),
    )
    for pattern, accept in scans:
        for hit in pattern.finditer(text):
            candidate = _clean_candidate_path(hit.group(0))
            if len(candidate) < 3 or not accept(candidate):
                continue
            hits.append(candidate)
            if len(hits) >= MAX_PATH_MATCHES_PER_TEXT:
                return hits
    return hits

440 

441 

442def _clean_candidate_path(path: str) -> str: 

443 return path.strip().strip("`[](){}<>").rstrip(".,;:!?\"'") 

444 

445 

def _looks_like_project_root_path(path: str) -> bool:
    """Return whether a non-existing path looks like a project/root path.

    Discovery sees a lot of API routes and code snippets in agent sessions. They
    are useful context, but they are not project source roots.

    The path is analysed as text, with backslashes treated as separators, so
    the verdict does not depend on the operating system running the scan.
    """
    cleaned = path.strip()
    if any(char in cleaned for char in _PROJECT_PATH_REJECT_CHARS):
        return False

    normalized = cleaned.replace("\\", "/")
    parts = [part for part in normalized.split("/") if part]
    if not parts:
        return False
    # A last segment with an extension denotes a file, not a directory. Only
    # that single segment is handed to Path, so no separator parsing (which
    # differs per platform) is involved.
    if Path(parts[-1]).suffix:
        return False

    lower = normalized.lower()
    if re.match(r"^[a-z]:/", lower):
        # Drive plus at least two directory levels.
        return len(parts) >= 3
    if lower.startswith("/home/") or lower.startswith("/users/"):
        return len(parts) >= 3
    if lower.startswith("/opt/") or lower.startswith("/srv/") or lower.startswith("/workspace/"):
        return len(parts) >= 2
    if lower.startswith("/var/www/"):
        return len(parts) >= 3
    if lower.startswith("/mnt/"):
        return len(parts) >= 4
    return False

471 

472 

def _project_root_from_path(path: str) -> str | None:
    """Infer a useful project/root directory from a discovered path.

    Returns the inferred root, or ``None`` when the path is rejected: it
    contains reject characters, more than one drive marker, an agent or
    non-project segment, an unsupported drive, or matches no known layout.
    """
    cleaned = _clean_candidate_path(path)
    if not cleaned or any(char in cleaned for char in _PROJECT_PATH_REJECT_CHARS):
        return None
    # Two drive markers means several paths were glued together.
    if len(re.findall(r"[A-Za-z]:", cleaned)) > 1:
        return None

    normalized = cleaned.replace("\\", "/")
    parts = [part for part in normalized.split("/") if part]
    lowered_parts = [part.lower() for part in parts]
    if lowered_parts and lowered_parts[0] in ("a:", "b:"):
        return None
    if any(part in {".agents", ".claude", ".codex"} for part in lowered_parts):
        return None
    if any(part in WINDOWS_NON_PROJECT_SEGMENTS for part in lowered_parts):
        return None

    # Keep the parent project if an excluded segment appears under it.
    first_excluded = next(
        (
            index for index, part in enumerate(lowered_parts)
            if part in EXCLUDED_PROJECT_SEGMENTS
        ),
        None,
    )
    trimmed = first_excluded is not None
    if trimmed:
        parts = parts[:first_excluded]
        lowered_parts = lowered_parts[:first_excluded]
    if not parts:
        return None

    if re.match(r"^[a-z]:", normalized.lower()):
        if lowered_parts[0] not in ("c:", "d:"):
            return None
        # An anchor folder names the project as its direct child.
        for index, part in enumerate(lowered_parts):
            if part in WINDOWS_PROJECT_ANCHORS and index + 1 < len(parts):
                return _join_windows_parts(parts[: index + 2])
            if part in WINDOWS_USER_PROJECT_ANCHORS and index + 1 < len(parts):
                return _join_windows_parts(parts[: index + 2])
        # Otherwise everything before the first in-project folder is the root.
        for index, part in enumerate(lowered_parts):
            if part in WINDOWS_PROJECT_STOP_SEGMENTS and index >= 1:
                return _join_windows_parts(parts[:index])
        return None

    lower = normalized.lower()
    if lower.startswith("/home/") or lower.startswith("/users/"):
        if len(parts) >= 3:
            return "/" + "/".join(parts[:3])
    if lower.startswith("/opt/") or lower.startswith("/srv/") or lower.startswith("/workspace/"):
        if len(parts) >= 2:
            return "/" + "/".join(parts[:2])
    if lower.startswith("/var/www/"):
        if len(parts) >= 3:
            return "/" + "/".join(parts[:3])
    if lower.startswith("/mnt/") and len(parts) >= 4:
        return "/" + "/".join(parts[:4])

    # Judge the path that survived trimming. Checking the untrimmed input here
    # would let a path such as /home/user/.git/config through even though its
    # excluded segment was just cut away.
    if trimmed:
        leading = "/" if normalized.startswith("/") else ""
        candidate = leading + "/".join(parts)
    else:
        candidate = cleaned
    return candidate if _looks_like_project_root_path(candidate) else None

527 

528 

529def _join_windows_parts(parts: list[str]) -> str: 

530 if not parts: 

531 return "" 

532 first = parts[0] 

533 if first.endswith(":"): 

534 return first + "\\" + "\\".join(parts[1:]) 

535 return "\\".join(parts) 

536 

537 

def _find_ssh_aliases(text: str) -> list[str]:
    """Return SSH aliases and hosts mentioned in *text*.

    Bare ``ssh <alias>`` hits come first, as found. Hosts from ``ssh root@host``
    and ``ssh user@host`` follow, each added only if not already listed.
    """
    cleaned_aliases = (
        _clean_server_alias(hit.group(1)) for hit in _RE_SSH_ALIAS.finditer(text)
    )
    found: list[str] = [
        alias for alias in cleaned_aliases if _looks_like_server_alias(alias)
    ]

    # (pattern, index of the group holding the host)
    host_patterns = ((_RE_SSH_ROOT_AT, 1), (_RE_SSH_USER_AT, 2))
    for pattern, host_group in host_patterns:
        for hit in pattern.finditer(text):
            host = _clean_server_alias(hit.group(host_group))
            if not host or host in found:
                continue
            if _looks_like_server_alias(host):
                found.append(host)

    return found

561 

562 

563def _clean_server_alias(alias: str) -> str: 

564 cleaned = alias.replace("\\n", "").replace("\\r", "") 

565 return cleaned.strip().strip("`'\".,;:!?()[]{}<>") 

566 

567 

def _looks_like_server_alias(alias: str) -> bool:
    """Return whether *alias* plausibly names a server.

    After cleaning, the value must be non-empty, must not be a known noise
    word, and must be an IPv4-shaped address or contain a dot or a hyphen.
    """
    candidate = _clean_server_alias(alias)
    if not candidate:
        return False
    if candidate.lower() in _SSH_NOISE_WORDS:
        return False
    is_ipv4_shaped = re.match(r"^\d{1,3}(?:\.\d{1,3}){3}$", candidate) is not None
    return is_ipv4_shaped or "." in candidate or "-" in candidate

578 

579 

def _find_doc_paths(text: str) -> list[str]:
    """Return candidate document paths mentioned in *text*.

    Only tokens ending in a known document extension are considered, and at
    most ``MAX_DOC_MATCHES_PER_TEXT`` results are returned.
    """
    haystack = text.lower()
    # Cheap substring test first: most texts mention no document extension.
    if not any(hint in haystack for hint in _DOC_EXTENSION_HINTS):
        return []

    results: list[str] = []
    for hit in _DOC_PATH_RE.finditer(text):
        document = _clean_doc_candidate(hit.group(0))
        if document is None:
            continue
        results.append(document)
        if len(results) >= MAX_DOC_MATCHES_PER_TEXT:
            break
    return results

597 

598 

def _clean_doc_candidate(candidate: str) -> str | None:
    """Clean a raw document match and decide whether to keep it.

    Returns the cleaned path, or ``None`` for URLs, agent/tool internals,
    tokens still containing bracket-like characters, and bare file names that
    are not one of the canonical documents.
    """
    cleaned = candidate.strip().strip("`[](){}<>")
    cleaned = cleaned.rstrip(".,;:!?\"'")
    if not cleaned:
        return None

    lower = cleaned.lower().replace("\\", "/")
    rejected = (
        lower.startswith(("http://", "https://"))
        or _is_excluded_document_path(lower)
        or any(char in cleaned for char in "<>[]()`{}")
    )
    if rejected:
        return None

    accepted = (
        re.match(r"^[a-z]:/", lower) is not None
        or lower.startswith("/")
        or "/" in lower
        or lower in CANONICAL_RELATIVE_DOCS
    )
    return cleaned if accepted else None

617 

618 

def _is_excluded_document_path(normalized_lower: str) -> bool:
    """Return True for paths inside agent/tool internals, not user documents.

    *normalized_lower* is expected lower-cased with forward slashes.
    """
    for fragment in EXCLUDED_DOCUMENT_PATH_FRAGMENTS:
        if fragment in normalized_lower:
            return True
    return False

622 

623 

def _scan_agent_root(
    label: str, home: Path, relative_dir: str, sub_dir: str
) -> tuple[AgentRoot, list[str]]:
    """Scan one agent root and return the AgentRoot and extracted texts.

    The root is ``home / relative_dir / sub_dir`` and may be a directory
    (walked recursively) or a single file. ``.jsonl`` files are streamed,
    ``.json`` files are parsed (falling back to raw text), and other document
    types are read as text; remaining files are only counted. Scanning is
    best-effort: problems become warnings on the returned ``AgentRoot`` (at
    most 10 are kept) instead of aborting the scan.
    """
    path = home / relative_dir / sub_dir
    warnings: list[str] = []
    texts: list[str] = []

    # An inaccessible root is reported as missing rather than raising.
    if not _safe_exists(path):
        return AgentRoot(label=label, path=path, exists=False, warnings=[]), texts

    # Gather files and count
    files: list[Path] = []
    try:
        if path.is_dir():
            files = sorted(p for p in path.rglob("*") if p.is_file())
        elif path.is_file():
            files = [path]
    except OSError as exc:
        # One unreadable directory must not abort the whole discovery.
        warnings.append(f"{label}/{path.name}: cannot list files: {exc}")

    for file_path in files:
        try:
            suffix = file_path.suffix.lower()
            if suffix == ".jsonl":
                file_texts, was_truncated = _extract_text_from_jsonl(file_path)
                texts.extend(file_texts)
                if was_truncated:
                    warnings.append(
                        f"{label}/{file_path.name}: truncated at "
                        f"{MAX_TEXTS_PER_JSONL_FILE} texts"
                    )
            elif suffix == ".json":
                raw, was_truncated = _bounded_read_text(file_path)
                if was_truncated:
                    warnings.append(
                        f"{label}/{file_path.name}: truncated at "
                        f"{MAX_FILE_READ_CHARS} chars"
                    )
                try:
                    obj = json.loads(raw)
                    texts.extend(_extract_relevant_strings(obj))
                except json.JSONDecodeError:
                    # Invalid or truncated JSON is still scanned as plain text.
                    texts.append(_trim_text(raw))
            elif suffix in (".md", ".txt", ".yaml", ".yml", ".toml"):
                raw, was_truncated = _bounded_read_text(file_path)
                if was_truncated:
                    warnings.append(
                        f"{label}/{file_path.name}: truncated at "
                        f"{MAX_FILE_READ_CHARS} chars"
                    )
                texts.append(_trim_text(raw))
        except Exception as exc:
            # Deliberately broad: a single bad file is recorded and skipped.
            warnings.append(f"{label}/{file_path.name}: {exc}")
            continue

    return (
        AgentRoot(label=label, path=path, exists=True, file_count=len(files), warnings=warnings[:10]),
        texts,
    )

681 

682 

def _deduplicate_projects(candidates: list[CandidateProject]) -> list[CandidateProject]:
    """Merge candidates that share a normalized root, compared case-insensitively.

    The first occurrence fixes the reported root and position; later ones
    only add source labels not yet recorded. New objects are returned.
    """
    merged: dict[str, CandidateProject] = {}
    for candidate in candidates:
        normalized = _normalize_discovered_path(candidate.root)
        lookup = normalized.lower()
        entry = merged.get(lookup)
        if entry is None:
            merged[lookup] = CandidateProject(
                root=normalized, discovered_from=list(candidate.discovered_from)
            )
            continue
        for label in candidate.discovered_from:
            if label not in entry.discovered_from:
                entry.discovered_from.append(label)
    return list(merged.values())

697 

698 

699def _safe_is_dir(path: Path) -> bool: 

700 """Return whether a path is a directory without surfacing OS access errors.""" 

701 try: 

702 return path.is_dir() 

703 except OSError: 

704 return False 

705 

706 

707def _safe_exists(path: Path) -> bool: 

708 """Return whether a path exists without surfacing OS access errors.""" 

709 try: 

710 return path.exists() 

711 except OSError: 

712 return False 

713 

714 

def _normalize_discovered_path(raw_path: str) -> str:
    """Normalize a discovered path, leaving remote/nonexistent paths as text.

    A path that exists locally is resolved to its absolute form. Anything
    else is returned stripped but otherwise unchanged.
    """
    stripped = raw_path.strip()
    try:
        as_path = Path(stripped)
    except (OSError, ValueError):
        return stripped

    if not _safe_exists(as_path):
        return stripped

    try:
        return str(as_path.resolve())
    except (OSError, RuntimeError):
        # Resolution failed; fall back to the unresolved path.
        return str(as_path)

728 

729 

def _deduplicate_documents(candidates: list[CandidateDocument]) -> list[CandidateDocument]:
    """Merge candidates whose paths are equal ignoring case.

    The first occurrence fixes the reported path and position; later ones
    only add source labels not yet recorded. New objects are returned.
    """
    merged: dict[str, CandidateDocument] = {}
    for candidate in candidates:
        lookup = candidate.path.lower()
        entry = merged.get(lookup)
        if entry is None:
            merged[lookup] = CandidateDocument(
                path=candidate.path, discovered_from=list(candidate.discovered_from)
            )
            continue
        for label in candidate.discovered_from:
            if label not in entry.discovered_from:
                entry.discovered_from.append(label)
    return list(merged.values())

742 

743 

def _deduplicate_servers(servers: list[ServerAlias]) -> list[ServerAlias]:
    """Deduplicate server aliases, merging sources.

    Aliases are compared case-insensitively. The first occurrence fixes the
    display casing and position. Source labels of later occurrences are
    appended, comma-separated, when not already present. The input objects
    are left untouched: merged entries are new ``ServerAlias`` instances, as
    in the project and document deduplicators.
    """
    seen: dict[str, ServerAlias] = {}
    for srv in servers:
        key = srv.alias.lower()
        existing = seen.get(key)
        if existing is None:
            # Copy so that merging sources below never mutates the caller's object.
            seen[key] = ServerAlias(
                alias=srv.alias, source=srv.source, confidence=srv.confidence
            )
            continue
        sources = _split_sources(existing.source)
        for source in _split_sources(srv.source):
            if source not in sources:
                sources.append(source)
        existing.source = ",".join(sources)
    return list(seen.values())

759 

760 

761# ── confidence scoring ──────────────────────────────────────────────── 

762 

763 

764def _split_sources(source: str) -> list[str]: 

765 """Split comma-merged source labels while preserving order.""" 

766 return [item.strip() for item in source.split(",") if item.strip()] 

767 

768 

def _score_project(candidate: CandidateProject) -> float:
    """Return the confidence score of a candidate project.

    Signals:
    - 1.0 for each agent root that discovered it
    - +1.0 when the root exists on disk as a directory
    """
    evidence = float(len(candidate.discovered_from))
    verified_bonus = 1.0 if _safe_is_dir(Path(candidate.root)) else 0.0
    return evidence + verified_bonus

780 

781 

def _score_document(candidate: CandidateDocument) -> float:
    """Return the confidence score of a candidate document.

    Signals:
    - 1.0 for each agent root that discovered it
    - +1.0 when the path is absolute and exists on disk
    - +0.5 when the file name is a canonical doc (AGENTS.md, CLAUDE.md, README.md)
    """
    as_path = Path(candidate.path)
    score = float(len(candidate.discovered_from))
    # Relative paths are never probed: they would resolve against the cwd.
    on_disk = _is_absolute_path_string(candidate.path) and _safe_exists(as_path)
    if on_disk:
        score += 1.0
    if as_path.name.lower() in CANONICAL_RELATIVE_DOCS:
        score += 0.5
    return score

797 

798 

799def _is_absolute_path_string(path: str) -> bool: 

800 normalized = path.strip().replace("\\", "/") 

801 return bool(re.match(r"^[a-zA-Z]:/", normalized) or normalized.startswith("/")) 

802 

803 

def _score_server(server: ServerAlias) -> float:
    """Return the confidence score of a server alias.

    Signals:
    - 1.0 for each distinct source label merged into ``server.source``
    - +0.5 when the alias contains a dot (qualified name or address)
    - +0.5 when the alias contains a hyphen (common server naming pattern)
    """
    score = float(len(_split_sources(server.source)))
    for marker in (".", "-"):
        if marker in server.alias:
            score += 0.5
    return score

819 

820 

def _collect_skill_documents(skills_dir: Path, label: str) -> list[CandidateDocument]:
    """Emit the doc files inside a skills directory as document candidates.

    Normal agent-root scanning extracts paths from file *content*; here the
    skill files themselves become candidates. The directory is walked
    recursively, so nested layouts such as ``skills/truenex/SKILL.md`` are
    covered. Only files with an extension in ``DOC_EXTENSIONS`` are included.
    OS errors skip the affected entry, or end the walk early, without raising.
    """
    found: list[CandidateDocument] = []
    if not _safe_is_dir(skills_dir):
        return found

    try:
        for entry in sorted(skills_dir.rglob("*")):
            try:
                is_document = entry.is_file() and entry.suffix.lower() in DOC_EXTENSIONS
            except OSError:
                continue
            if is_document:
                found.append(CandidateDocument(path=str(entry), discovered_from=[label]))
    except OSError:
        # Walking failed part-way; keep whatever was collected so far.
        pass
    return found

842 

843 

def discover_from_agents(home: Path) -> DiscoveryReport:
    """Scan the agent client directories under *home* and build a report.

    Only the agent roots listed in ``AGENT_ROOTS`` are read; discovered project
    directories are NOT traversed. Candidates are deduplicated, scored, and
    sorted by descending confidence with a case-insensitive alphabetical
    tie-break. At most 50 warnings are kept on the report.
    """
    roots: list[AgentRoot] = []
    projects: list[CandidateProject] = []
    documents: list[CandidateDocument] = []
    servers: list[ServerAlias] = []
    warnings: list[str] = []

    for label, relative_dir, sub_dir in AGENT_ROOTS:
        agent_root, texts = _scan_agent_root(label, home, relative_dir, sub_dir)
        roots.append(agent_root)
        warnings += [f"{label}: {w}" for w in agent_root.warnings]

        for text in texts:
            if _is_known_agent_text_preamble(text):
                continue

            # Project roots inferred from absolute paths in the text.
            for found_path in _find_paths_in_text(text):
                inferred_root = _project_root_from_path(found_path)
                if inferred_root is not None:
                    projects.append(
                        CandidateProject(root=inferred_root, discovered_from=[label])
                    )

            # Document paths mentioned in the text.
            documents.extend(
                CandidateDocument(path=doc_path, discovered_from=[label])
                for doc_path in _find_doc_paths(text)
            )

            # SSH/server aliases mentioned in the text.
            servers.extend(
                ServerAlias(alias=alias, source=label)
                for alias in _find_ssh_aliases(text)
            )

    # Skill files are promoted directly so they are found even when no session
    # log mentions their paths.
    documents.extend(_collect_skill_documents(home / ".claude" / "skills", "claude-skills"))

    # Deduplicate first so each score sees the merged evidence.
    projects = _deduplicate_projects(projects)
    documents = _deduplicate_documents(documents)
    servers = _deduplicate_servers(servers)

    for project in projects:
        project.confidence = _score_project(project)
    for document in documents:
        document.confidence = _score_document(document)
    for server in servers:
        server.confidence = _score_server(server)

    # Highest confidence first; ties ordered alphabetically.
    projects = sorted(projects, key=lambda p: (-p.confidence, p.root.lower()))
    documents = sorted(documents, key=lambda d: (-d.confidence, d.path.lower()))
    servers = sorted(servers, key=lambda s: (-s.confidence, s.alias.lower()))

    return DiscoveryReport(
        agent_roots=roots,
        projects=projects,
        documents=documents,
        servers=servers,
        warnings=warnings[:50],
    )

918 

919 

# Default number of entries format_report() shows per candidate section.
DEFAULT_DISPLAY_LIMIT = 20

921 

922 

923def _format_header(title: str, count: int) -> str: 

924 return f"\n## {title} ({count})" 

925 

926 

def _format_project_line(p: CandidateProject) -> str:
    """Render one project as a bullet, tagged [EXISTS] when the root is a directory."""
    marker = " [EXISTS]" if _safe_is_dir(Path(p.root)) else ""
    origin = ", ".join(p.discovered_from)
    return f"- {p.root}{marker} (conf={p.confidence:.1f}, from: {origin})"

931 

932 

def _format_document_line(d: CandidateDocument) -> str:
    """Render one document as a bullet, tagged [EXISTS] when an absolute path is on disk."""
    on_disk = _is_absolute_path_string(d.path) and _safe_exists(Path(d.path))
    marker = " [EXISTS]" if on_disk else ""
    origin = ", ".join(d.discovered_from)
    return f"- {d.path}{marker} (conf={d.confidence:.1f}, from: {origin})"

937 

938 

939def _format_server_line(s: ServerAlias) -> str: 

940 return f"- {s.alias} (conf={s.confidence:.1f}, from: {s.source})" 

941 

942 

943def _append_candidate_section( 

944 lines: list[str], 

945 candidates: list, 

946 limit: int | None, 

947 formatter, 

948) -> None: 

949 """Append formatted candidate lines with optional truncation note.""" 

950 visible = candidates[:limit] if limit is not None else candidates 

951 for cand in visible: 

952 lines.append(formatter(cand)) 

953 if limit is not None and len(candidates) > limit: 

954 remaining = len(candidates) - limit 

955 lines.append(f" ... and {remaining} more (use --json for full list)") 

956 

957 

def format_report(report: DiscoveryReport, limit: int | None = DEFAULT_DISPLAY_LIMIT) -> str:
    """Render a DiscoveryReport as human-readable markdown.

    Candidate sections keep the order stored in the report (discovery sorts
    them by confidence, highest first). Each shows at most *limit* entries
    plus a truncation note when more exist. Pass ``limit=None`` to show all.
    """
    lines: list[str] = ["# Agent Discovery Report"]

    # Agent roots, each followed by its own warnings.
    lines.append(_format_header("Agent Roots", len(report.agent_roots)))
    for agent_root in report.agent_roots:
        status = "exists" if agent_root.exists else "NOT FOUND"
        suffix = f" ({agent_root.file_count} files)" if agent_root.file_count else ""
        lines.append(f"- [{status}] {agent_root.label}: {agent_root.path}{suffix}")
        lines.extend(f" - Warning: {w}" for w in agent_root.warnings)

    # Candidate sections share one layout: header, entries or "(none)".
    sections = (
        ("Projects", report.project_count, report.projects, _format_project_line),
        ("Documents", report.document_count, report.documents, _format_document_line),
        ("Servers", report.server_count, report.servers, _format_server_line),
    )
    for title, count, entries, formatter in sections:
        lines.append(_format_header(title, count))
        if entries:
            _append_candidate_section(lines, entries, limit, formatter)
        else:
            lines.append("- (none)")

    # Report-level warnings are never truncated by *limit*.
    lines.append(_format_header("Warnings/Errors", report.warning_count))
    if report.warnings:
        lines.extend(f"- {w}" for w in report.warnings)
    else:
        lines.append("- (none)")

    # Summary
    lines.extend(
        [
            "\n## Summary",
            f"- Agent roots: {len(report.agent_roots)}",
            f"- Projects discovered: {report.project_count}",
            f"- Documents discovered: {report.document_count}",
            f"- Servers discovered: {report.server_count}",
            f"- Warnings: {report.warning_count}",
        ]
    )

    return "\n".join(lines)