Coverage for src \ truenex_memory \ discovery \ agent_discovery.py: 88%
590 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-19 10:21 +0200
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-19 10:21 +0200
1"""Agent discovery: scan local Codex/Claude roots for projects, docs, servers.
3Does NOT scan the whole PC. Only looks under agent client directories.
4Does NOT mutate the memory database. Discovery only.
5"""
7from __future__ import annotations
9from dataclasses import dataclass, field
10from pathlib import Path
11import json
12import re
14# ── data model ────────────────────────────────────────────────────────
@dataclass
class AgentRoot:
    """One agent client location (directory or file) probed by discovery."""

    # Short identifier of the location, e.g. "codex-sessions", "claude-projects".
    label: str
    # Filesystem location that was probed.
    path: Path
    # Whether the location was present when it was probed.
    exists: bool
    # Number of files found at the location (0 when nothing was scanned).
    file_count: int = 0
    # Problems noted while scanning this location.
    warnings: list[str] = field(default_factory=list)
@dataclass
class CandidateProject:
    """A project root inferred from paths mentioned in agent data."""

    # Project root path, kept as a string.
    root: str
    # Labels of the agent roots whose data mentioned this project.
    discovered_from: list[str] = field(default_factory=list)
    # Ranking score; 0.0 until scoring has run.
    confidence: float = 0.0
@dataclass
class CandidateDocument:
    """A document path inferred from agent data."""

    # Document path, kept as a string (may be relative).
    path: str
    # Labels of the agent roots whose data mentioned this document.
    discovered_from: list[str] = field(default_factory=list)
    # Ranking score; 0.0 until scoring has run.
    confidence: float = 0.0
@dataclass
class ServerAlias:
    """An SSH host or alias mentioned in agent data."""

    # Host name, alias or address as it was written.
    alias: str
    # Agent root label(s); several labels are comma-separated after merging.
    source: str = "agent-history"
    # Ranking score; 0.0 until scoring has run.
    confidence: float = 0.0
@dataclass
class DiscoveryReport:
    """Aggregated discovery result: one list per section plus count helpers."""

    agent_roots: list[AgentRoot] = field(default_factory=list)
    projects: list[CandidateProject] = field(default_factory=list)
    documents: list[CandidateDocument] = field(default_factory=list)
    servers: list[ServerAlias] = field(default_factory=list)
    warnings: list[str] = field(default_factory=list)

    @property
    def project_count(self) -> int:
        """Number of candidate projects in the report."""
        return len(self.projects)

    @property
    def document_count(self) -> int:
        """Number of candidate documents in the report."""
        return len(self.documents)

    @property
    def server_count(self) -> int:
        """Number of server aliases in the report."""
        return len(self.servers)

    @property
    def warning_count(self) -> int:
        """Number of report-level warnings."""
        return len(self.warnings)

    def to_dict(self) -> dict[str, object]:
        """Return the report as nested dicts and lists.

        Paths of agent roots are converted with ``str``. Each project,
        document and server row carries an ``evidence_count`` equal to the
        number of sources that mentioned it.
        """
        root_rows = []
        for agent_root in self.agent_roots:
            root_rows.append(
                {
                    "label": agent_root.label,
                    "path": str(agent_root.path),
                    "exists": agent_root.exists,
                    "file_count": agent_root.file_count,
                    "warnings": agent_root.warnings,
                }
            )

        project_rows = []
        for project in self.projects:
            project_rows.append(
                {
                    "root": project.root,
                    "discovered_from": project.discovered_from,
                    "evidence_count": len(project.discovered_from),
                    "confidence": project.confidence,
                }
            )

        document_rows = []
        for document in self.documents:
            document_rows.append(
                {
                    "path": document.path,
                    "discovered_from": document.discovered_from,
                    "evidence_count": len(document.discovered_from),
                    "confidence": document.confidence,
                }
            )

        server_rows = []
        for server in self.servers:
            server_rows.append(
                {
                    "alias": server.alias,
                    "source": server.source,
                    # Server sources are stored comma-merged, so count the parts.
                    "evidence_count": len(_split_sources(server.source)),
                    "confidence": server.confidence,
                }
            )

        return {
            "agent_roots": root_rows,
            "projects": project_rows,
            "documents": document_rows,
            "servers": server_rows,
            "warnings": self.warnings,
        }
118# ── agent root layout ─────────────────────────────────────────────────
# Locations to scan, each as (label, client directory, entry inside it).
# Both parts are joined onto the home directory given to discovery.
AGENT_ROOTS = [
    # Codex client data
    ("codex-sessions", ".codex", "sessions"),
    ("codex-history", ".codex", "history.jsonl"),
    ("codex-memories", ".codex", "memories"),
    # Claude client data
    ("claude-projects", ".claude", "projects"),
    ("claude-commands", ".claude", "commands"),
    ("claude-history", ".claude", "history.jsonl"),
    ("claude-skills", ".claude", "skills"),
]
# File suffixes (lower-case, with the dot) that count as documents.
DOC_EXTENSIONS = frozenset((".md", ".txt", ".json", ".yaml", ".yml", ".toml"))

# Size limits that keep a scan bounded.
MAX_FILE_READ_CHARS = 200_000  # characters read from one non-JSONL file
MAX_TEXT_CHARS = 20_000  # characters kept from one extracted text
MAX_TEXTS_PER_JSONL_FILE = 500  # texts collected from one JSONL file
MAX_PATH_MATCHES_PER_TEXT = 250  # path candidates taken from one text
MAX_DOC_MATCHES_PER_TEXT = 250  # document candidates taken from one text

# JSON keys (lower-case) whose string values are kept as text.
RELEVANT_JSON_KEYS = frozenset(
    (
        "content",
        "cwd",
        "message",
        "path",
        "root",
        "source_path",
        "summary",
        "text",
    )
)
149# Regex patterns
# Drive-letter path: "X:" + slash or backslash + run of path characters.
_RE_ABS_PATH_WIN = re.compile(r'[A-Za-z]:[\\/][^\s"\'<>|*?]+')
# Slash-rooted path with at least one directory level; it must sit at the
# start of the text or follow whitespace, and that whitespace is part of the
# match.
_RE_ABS_PATH_UNIX = re.compile(r'(?:^|\s)/(?:[^\s"\'*?|]+/)+[^\s"\'*?|]*')
# "ssh <name>" with an optional "root@" prefix; group 1 is the name.
_RE_SSH_ALIAS = re.compile(r'\bssh\s+(?:root@)?([\w][\w.-]*)\b', flags=re.IGNORECASE)
# "ssh root@<host>"; group 1 is the host.
_RE_SSH_ROOT_AT = re.compile(r'\bssh\s+root@(\S+)\b', flags=re.IGNORECASE)
# "ssh <user>@<host>"; group 1 is the user, group 2 the host.
_RE_SSH_USER_AT = re.compile(r'\bssh\s+(\w+)@(\S+)\b', flags=re.IGNORECASE)
# Document suffixes in sorted order, used as a cheap substring pre-check
# before the document regex is run over a text.
_DOC_EXTENSION_HINTS: tuple[str, ...] = tuple(sorted(DOC_EXTENSIONS))
# Token ending in a document suffix (.md/.txt/.json/.yaml/.yml/.toml),
# matched case-insensitively.
#
# The trailing negative lookahead requires the suffix to end there. Without
# it a longer extension is cut short: "history.jsonl" would be reported as
# the non-existent document "history.json", and "cmd.mdb" as "cmd.md".
_DOC_PATH_RE = re.compile(
    r'[^\s"\'<>|]*\.(?:md|txt|json|ya?ml|toml)(?![A-Za-z0-9_])',
    re.IGNORECASE,
)
# Lower-case Unix directory prefixes under which projects are expected.
_UNIX_PROJECT_PREFIXES = (
    "/home/",
    "/mnt/",
    "/opt/",
    "/root/",
    "/srv/",
    "/users/",
    "/var/www/",
    "/workspace/",
)

# A path containing any of these characters is not treated as a project path.
_PROJECT_PATH_REJECT_CHARS = frozenset("{}<>[]`")

# Directory names (lower-case) that are never a project themselves; a path is
# cut at the first one so that the parent can still qualify.
EXCLUDED_PROJECT_SEGMENTS = frozenset(
    (
        ".agents",
        ".claude",
        ".codex",
        ".git",
        ".mypy_cache",
        ".pytest_cache",
        ".tox",
        ".venv",
        "__pycache__",
        "node_modules",
        "site-packages",
        "venv",
    )
)

# Windows directory names (lower-case) whose direct child is taken as the
# project root.
WINDOWS_PROJECT_ANCHORS = frozenset(("projectpy", "software", "sofware"))
WINDOWS_USER_PROJECT_ANCHORS = frozenset(("documents", "documenti", "projects", "repos"))

# Windows directory names (lower-case) whose parent is taken as the project
# root when no anchor matched.
WINDOWS_PROJECT_STOP_SEGMENTS = frozenset(("docs", "src", "tests", "memory", "diary"))

# Directory names (lower-case) that disqualify a path from being a project.
WINDOWS_NON_PROJECT_SEGMENTS = frozenset(
    (
        ".cursor",
        ".ssh",
        "appdata",
        "codex_tmp",
        "downloads",
        "system32",
        "tmp",
        "windows",
    )
)
# Bare file names (lower-case) accepted as documents even without a directory.
CANONICAL_RELATIVE_DOCS = frozenset(("agents.md", "claude.md", "readme.md"))

# Lower-case, forward-slash path fragments that mark agent/tool internals;
# a document path containing any of them is dropped.
EXCLUDED_DOCUMENT_PATH_FRAGMENTS = (
    ".agent/",
    ".agents/skills/",
    ".claude/shell-snapshots/",
    ".codex/skills/",
    "/.agent/",
    "/.agents/skills/",
    "/.claude/shell-snapshots/",
    "/.codex/skills/",
)
214# Common English words that can follow "SSH" in prose but aren't aliases
_SSH_NOISE_WORDS = frozenset(
    (
        # articles, prepositions and other filler
        "to", "the", "a", "an", "is", "it", "of", "in", "on", "at", "and",
        "or", "not", "no", "with", "for", "from", "by", "as", "be", "we",
        # SSH vocabulary that is not a host name
        "key", "agent", "add", "config", "user", "root", "host", "server",
        "connection", "using", "via", "into", "references",
        # hyphenated prose that would otherwise pass the alias shape test
        "double-quoted", "read-only",
    )
)
224# ── discovery ─────────────────────────────────────────────────────────
def _is_known_agent_text_preamble(text: str) -> bool:
    """Return True when *text* opens with a known agent-internal tag."""
    internal_openers = (
        "<environment",
        "<system",
        "<developer",
        "<instructions",
        "<agent",
        "<turn_aborted",
    )
    # str.startswith accepts a tuple and tests each prefix in turn.
    return text.startswith(internal_openers)
def _extract_strings(obj: object) -> list[str]:
    """Collect every string value nested in *obj*, depth-first and in order.

    Dict values and list items are descended into; dict keys and any other
    container type are ignored.
    """

    def _walk(node):
        if isinstance(node, str):
            yield node
        elif isinstance(node, dict):
            for child in node.values():
                yield from _walk(child)
        elif isinstance(node, list):
            for child in node:
                yield from _walk(child)

    return list(_walk(obj))
def _extract_relevant_strings(obj: object, *, parent_key: str = "") -> list[str]:
    """Collect trimmed, non-empty strings stored under relevant JSON keys.

    A string is kept only when the key it sits under (*parent_key*, already
    lower-cased) is in ``RELEVANT_JSON_KEYS``. Tool-call style keys are
    skipped together with everything below them. Values under any other key
    are descended into only when they are dicts or lists.
    """
    if isinstance(obj, str):
        if parent_key not in RELEVANT_JSON_KEYS:
            return []
        trimmed = _trim_text(obj)
        return [trimmed] if trimmed else []

    if isinstance(obj, list):
        collected: list[str] = []
        for element in obj:
            # List items inherit the key the list itself was stored under.
            collected.extend(_extract_relevant_strings(element, parent_key=parent_key))
        return collected

    if not isinstance(obj, dict):
        return []

    skipped_keys = ("tool_call", "tool_calls", "tool_result", "tool_use", "input")
    collected = []
    for raw_key, value in obj.items():
        key = str(raw_key).lower()
        if key in skipped_keys:
            continue
        if key in RELEVANT_JSON_KEYS or isinstance(value, (dict, list)):
            collected.extend(_extract_relevant_strings(value, parent_key=key))
    return collected
def _trim_text(text: str) -> str:
    """Strip surrounding whitespace and cap the result at ``MAX_TEXT_CHARS``."""
    # Slicing leaves shorter strings unchanged, so no length test is needed.
    return text.strip()[:MAX_TEXT_CHARS]
def _bounded_read_text(file_path: Path, max_chars: int | None = None) -> tuple[str, bool]:
    """Read at most *max_chars* characters of text from *file_path*.

    The file is decoded as UTF-8 with undecodable bytes replaced.

    Args:
        file_path: File to read.
        max_chars: Upper bound on the number of characters returned. ``None``
            (the default) means ``MAX_FILE_READ_CHARS``; negative values are
            treated as ``0``.

    Returns:
        ``(text, was_truncated)``. *was_truncated* is ``True`` exactly when
        the file holds more characters than the limit. If the file cannot be
        opened or read, ``("", False)`` is returned.
    """
    limit = MAX_FILE_READ_CHARS if max_chars is None else max(max_chars, 0)
    try:
        with open(file_path, encoding="utf-8", errors="replace") as f:
            # Ask for one character beyond the limit: receiving it proves the
            # file is longer. Reading in fixed chunks could overshoot the
            # limit and then report a longer-than-allowed text as complete.
            text = f.read(limit + 1)
    except OSError:
        return ("", False)
    truncated = len(text) > limit
    return (text[:limit], truncated)
def _texts_from_jsonl_record(obj: dict) -> list[str]:
    """Return the human-readable texts carried by one parsed JSONL record."""
    role = str(obj.get("role", "")).lower()
    msg_type = str(obj.get("type", "")).lower()
    # Skip developer/system instructions in older flattened schemas.
    if role in ("developer", "system") or msg_type == "system":
        return []

    texts: list[str] = []
    payload = obj.get("payload")
    if isinstance(payload, dict):
        if str(payload.get("role", "")).lower() in ("developer", "system"):
            return []

        # Extract cwd from session_meta payloads (Codex-style).
        cwd = payload.get("cwd")
        if isinstance(cwd, str) and cwd.strip():
            texts.append(_trim_text(cwd))

        payload_type = str(payload.get("type", "")).lower()
        if msg_type == "response_item" and payload_type == "message":
            content_text = _extract_message_content_text(payload.get("content"))
            if content_text:
                texts.append(content_text)
            return texts

        if msg_type == "event_msg" and payload_type == "user_message":
            message = payload.get("message")
            if isinstance(message, str) and message.strip():
                texts.append(_trim_text(message))
            return texts

    message = obj.get("message")
    if isinstance(message, dict):
        if str(message.get("role", "")).lower() in ("developer", "system"):
            # A cwd collected above is still kept.
            return texts
        content_text = _extract_message_content_text(message.get("content"))
        if content_text:
            texts.append(content_text)
        return texts

    # Fallback for simple JSONL history entries. This is intentionally
    # selective so tool payloads are not treated as prose.
    texts.extend(_extract_relevant_strings(obj))
    return texts


def _extract_text_from_jsonl(file_path: Path) -> tuple[list[str], bool]:
    """Stream a JSONL file line by line and collect its human-readable texts.

    Blank lines, lines that are not valid JSON and JSON values that are not
    objects are skipped. The file is never read into memory as a whole.

    Returns:
        ``(texts, was_truncated)``. *texts* never holds more than
        ``MAX_TEXTS_PER_JSONL_FILE`` entries. *was_truncated* is ``True`` when
        texts were dropped to respect that cap, or when a non-blank line was
        left unprocessed after the cap was reached. If the file cannot be
        opened, ``([], False)`` is returned.
    """
    texts: list[str] = []
    truncated = False
    try:
        f = open(file_path, encoding="utf-8", errors="replace")
    except OSError:
        return (texts, False)

    with f:
        for line in f:
            stripped = line.strip()
            if not stripped:
                continue
            if len(texts) >= MAX_TEXTS_PER_JSONL_FILE:
                # This line will not be processed, so it is itself the
                # evidence that the file had more data.
                truncated = True
                break
            try:
                obj = json.loads(stripped)
            except json.JSONDecodeError:
                continue
            if not isinstance(obj, dict):
                continue

            texts.extend(_texts_from_jsonl_record(obj))
            if len(texts) > MAX_TEXTS_PER_JSONL_FILE:
                # One record can add several texts; enforce the cap on every
                # path and report the texts that had to be dropped.
                del texts[MAX_TEXTS_PER_JSONL_FILE:]
                truncated = True
                break

    return (texts, truncated)
def _extract_message_content_text(content: object) -> str:
    """Return the prose held in a message ``content`` value.

    A plain string is returned trimmed. A list is treated as content blocks:
    tool blocks and non-dict items are skipped, the ``text`` (or, when that
    is absent, ``content``) string of each remaining block is kept, and the
    pieces are joined with blank lines. Any other value yields ``""``.
    """
    if isinstance(content, str):
        return _trim_text(content)
    if not isinstance(content, list):
        return ""

    tool_block_types = ("tool_use", "tool_call", "tool_result", "tool")
    pieces: list[str] = []
    for entry in content:
        if not isinstance(entry, dict):
            continue
        if str(entry.get("type", "")).lower() in tool_block_types:
            continue
        value = entry.get("text")
        value = entry.get("content") if value is None else value
        if isinstance(value, str) and value.strip():
            pieces.append(_trim_text(value))
    return _trim_text("\n\n".join(pieces))
def _find_paths_in_text(text: str) -> list[str]:
    """Return cleaned absolute-path candidates found in *text*.

    Drive-letter paths are collected first, then slash-rooted paths. The scan
    stops as soon as ``MAX_PATH_MATCHES_PER_TEXT`` candidates were collected.
    """
    scanners = (
        (_RE_ABS_PATH_WIN, lambda value: "/" in value or "\\" in value),
        (_RE_ABS_PATH_UNIX, lambda value: value.count("/") >= 1),
    )
    collected: list[str] = []
    for pattern, has_separator in scanners:
        for hit in pattern.finditer(text):
            candidate = _clean_candidate_path(hit.group(0))
            if len(candidate) < 3 or not has_separator(candidate):
                continue
            collected.append(candidate)
            if len(collected) >= MAX_PATH_MATCHES_PER_TEXT:
                return collected
    return collected
def _clean_candidate_path(path: str) -> str:
    """Remove surrounding whitespace, wrapping brackets/backticks and trailing punctuation."""
    unwrapped = path.strip().strip("`[](){}<>")
    return unwrapped.rstrip(".,;:!?\"'")
def _looks_like_project_root_path(path: str) -> bool:
    """Return whether a non-existing path looks like a project/root path.

    Discovery sees a lot of API routes and code snippets in agent sessions.
    They are useful context, but they are not project source roots.

    A path qualifies when it contains no rejected character, its last
    component has no file suffix, and it is deep enough for its kind: a
    drive-letter path with at least three components, or a path under one of
    the known Unix prefixes with the depth that prefix requires.

    The decision is made on a forward-slash normalised copy of the string, so
    a backslash-separated Windows path is judged the same way regardless of
    the operating system this code runs on.
    """
    cleaned = path.strip()
    if any(char in cleaned for char in _PROJECT_PATH_REJECT_CHARS):
        return False

    normalized = cleaned.replace("\\", "/")
    # With forward slashes pathlib splits components identically everywhere,
    # so the suffix belongs to the last component only.
    if Path(normalized).suffix:
        return False

    lower = normalized.lower()
    parts = [part for part in lower.split("/") if part]
    if re.match(r"^[a-z]:/", lower):
        # The drive counts as the first component.
        return len(parts) >= 3
    if lower.startswith(("/home/", "/users/")):
        return len(parts) >= 3
    if lower.startswith(("/opt/", "/srv/", "/workspace/")):
        return len(parts) >= 2
    if lower.startswith("/var/www/"):
        return len(parts) >= 3
    if lower.startswith("/mnt/"):
        return len(parts) >= 4
    return False
def _project_root_from_path(path: str) -> str | None:
    """Infer a useful project/root directory from a discovered path.

    Returns the inferred root as a string, or ``None`` when the path does not
    point into something that looks like a project. A path that runs through
    an excluded segment (``.git``, ``node_modules``, ...) is cut at that
    segment so the parent project can still be reported; the part below the
    segment is never returned.
    """
    cleaned = _clean_candidate_path(path)
    if not cleaned or any(char in cleaned for char in _PROJECT_PATH_REJECT_CHARS):
        return None
    # Two drive specifiers mean several paths were glued together.
    if len(re.findall(r"[A-Za-z]:", cleaned)) > 1:
        return None

    normalized = cleaned.replace("\\", "/")
    parts = [part for part in normalized.split("/") if part]
    lowered_parts = [part.lower() for part in parts]
    if lowered_parts and lowered_parts[0] in ("a:", "b:"):
        return None
    if any(part in {".agents", ".claude", ".codex"} for part in lowered_parts):
        return None
    if any(part in WINDOWS_NON_PROJECT_SEGMENTS for part in lowered_parts):
        return None

    was_trimmed = False
    for index, part in enumerate(lowered_parts):
        if part in EXCLUDED_PROJECT_SEGMENTS:
            # Keep the parent project if the excluded segment appears under it.
            parts = parts[:index]
            lowered_parts = lowered_parts[:index]
            was_trimmed = True
            break
    if not parts:
        return None

    lower = normalized.lower()
    if re.match(r"^[a-z]:", lower):
        if lowered_parts[0] not in ("c:", "d:"):
            return None
        # First choice: the directory directly below a known anchor.
        for index, part in enumerate(lowered_parts):
            is_anchor = part in WINDOWS_PROJECT_ANCHORS or part in WINDOWS_USER_PROJECT_ANCHORS
            if is_anchor and index + 1 < len(parts):
                return _join_windows_parts(parts[: index + 2])
        # Second choice: the directory above a typical project sub-folder.
        for index, part in enumerate(lowered_parts):
            if part in WINDOWS_PROJECT_STOP_SEGMENTS and index >= 1:
                return _join_windows_parts(parts[:index])
        return None

    if lower.startswith(("/home/", "/users/")):
        if len(parts) >= 3:
            return "/" + "/".join(parts[:3])
    if lower.startswith(("/opt/", "/srv/", "/workspace/")):
        if len(parts) >= 2:
            return "/" + "/".join(parts[:2])
    if lower.startswith("/var/www/"):
        if len(parts) >= 3:
            return "/" + "/".join(parts[:3])
    if lower.startswith("/mnt/") and len(parts) >= 4:
        return "/" + "/".join(parts[:4])

    # Fallback. After trimming, judge the trimmed path: testing the original
    # string here would return a location inside the excluded directory
    # (e.g. "/home/user/node_modules/pkg") as a project root.
    if was_trimmed:
        candidate = ("/" if normalized.startswith("/") else "") + "/".join(parts)
    else:
        candidate = cleaned
    return candidate if _looks_like_project_root_path(candidate) else None
def _join_windows_parts(parts: list[str]) -> str:
    """Join path components with backslashes, keeping a leading drive as ``X:\\``."""
    if not parts:
        return ""
    drive, *rest = parts
    if drive.endswith(":"):
        # A drive always gets its separator, even when nothing follows it.
        return drive + "\\" + "\\".join(rest)
    return "\\".join(parts)
def _find_ssh_aliases(text: str) -> list[str]:
    """Find SSH aliases and hosts in a text string.

    Three forms are recognised: ``ssh <alias>``, ``ssh root@<host>`` and
    ``ssh <user>@<host>``. Every candidate is cleaned and must look like a
    server alias. Hosts found by the two ``@`` forms are added only when not
    already present.

    In ``ssh <user>@<host>`` the token after ``ssh`` is the login name, not a
    server. It is skipped, so a name such as ``john.doe`` or ``my-user`` is
    not reported as an alias just because it contains a dot or a hyphen.
    """
    aliases: list[str] = []

    # ssh alias or ssh host
    for match in _RE_SSH_ALIAS.finditer(text):
        if text.startswith("@", match.end(1)):
            # "<name>@..." - this is the user part; the host is picked up by
            # the user@host pass below.
            continue
        alias = _clean_server_alias(match.group(1))
        if _looks_like_server_alias(alias):
            aliases.append(alias)

    # ssh root@host
    for match in _RE_SSH_ROOT_AT.finditer(text):
        host = _clean_server_alias(match.group(1))
        if host and _looks_like_server_alias(host) and host not in aliases:
            aliases.append(host)

    # ssh user@host
    for match in _RE_SSH_USER_AT.finditer(text):
        host = _clean_server_alias(match.group(2))
        if host and _looks_like_server_alias(host) and host not in aliases:
            aliases.append(host)

    return aliases
def _clean_server_alias(alias: str) -> str:
    """Drop literal ``\\n``/``\\r`` escape sequences and surrounding punctuation."""
    # These are two-character backslash sequences, not real line breaks.
    without_escapes = alias.replace("\\n", "").replace("\\r", "")
    trimmed = without_escapes.strip()
    return trimmed.strip("`'\".,;:!?()[]{}<>")
def _looks_like_server_alias(alias: str) -> bool:
    """Return whether *alias* has the shape of a host name, alias or IPv4 address.

    After cleaning, the value must be non-empty and not a known noise word.
    It is then accepted when it is a dotted quad, or contains a dot or a
    hyphen.
    """
    candidate = _clean_server_alias(alias)
    if not candidate:
        return False
    if candidate.lower() in _SSH_NOISE_WORDS:
        return False
    if re.match(r"^\d{1,3}(?:\.\d{1,3}){3}$", candidate):
        return True
    return "." in candidate or "-" in candidate
def _find_doc_paths(text: str) -> list[str]:
    """Return candidate document paths (known doc extensions) found in *text*.

    At most ``MAX_DOC_MATCHES_PER_TEXT`` candidates are returned.
    """
    haystack = text.lower()
    for extension in _DOC_EXTENSION_HINTS:
        if extension in haystack:
            break
    else:
        # No known extension anywhere in the text: skip the regex scan.
        return []

    accepted: list[str] = []
    for hit in _DOC_PATH_RE.finditer(text):
        cleaned = _clean_doc_candidate(hit.group(0))
        if cleaned is None:
            continue
        accepted.append(cleaned)
        if len(accepted) >= MAX_DOC_MATCHES_PER_TEXT:
            break
    return accepted
def _clean_doc_candidate(candidate: str) -> str | None:
    """Clean a raw document match and return it, or ``None`` when it is rejected.

    Rejected are empty results, http(s) URLs, excluded agent/tool paths and
    anything still containing bracket or backtick characters. What remains is
    accepted when it is an absolute path, contains a directory separator, or
    is one of the canonical bare document names.
    """
    cleaned = candidate.strip().strip("`[](){}<>").rstrip(".,;:!?\"'")
    if not cleaned:
        return None

    lower = cleaned.lower().replace("\\", "/")
    if lower.startswith(("http://", "https://")) or _is_excluded_document_path(lower):
        return None
    if any(char in cleaned for char in "<>[]()`{}"):
        return None

    is_absolute = bool(re.match(r"^[a-z]:/", lower)) or lower.startswith("/")
    if is_absolute or "/" in lower or lower in CANONICAL_RELATIVE_DOCS:
        return cleaned
    return None
def _is_excluded_document_path(normalized_lower: str) -> bool:
    """Return True when the path contains an excluded agent/tool fragment.

    The argument is expected lower-cased and with forward slashes, matching
    the form of ``EXCLUDED_DOCUMENT_PATH_FRAGMENTS``.
    """
    for fragment in EXCLUDED_DOCUMENT_PATH_FRAGMENTS:
        if fragment in normalized_lower:
            return True
    return False
def _scan_agent_root(
    label: str, home: Path, relative_dir: str, sub_dir: str
) -> tuple[AgentRoot, list[str]]:
    """Scan one agent root and return its ``AgentRoot`` plus the extracted texts.

    The root is ``home / relative_dir / sub_dir`` and may be a directory
    (walked recursively) or a single file. ``.jsonl`` files are streamed,
    ``.json`` files are parsed (falling back to raw text when invalid), and
    ``.md``/``.txt``/``.yaml``/``.yml``/``.toml`` files are read as text;
    other files are only counted.

    Scanning is best effort: a root that cannot be probed is reported as not
    existing, a directory that cannot be listed produces a warning instead of
    an exception, and a failure on one file is recorded as a warning while the
    scan continues. Only the first 10 warnings are stored on the ``AgentRoot``.
    """
    path = home / relative_dir / sub_dir
    warnings: list[str] = []
    texts: list[str] = []

    if not _safe_exists(path):
        return AgentRoot(label=label, path=path, exists=False, warnings=[]), texts

    # Gather files and count. Listing can fail (e.g. on permissions) and must
    # not abort discovery of the remaining roots.
    files: list[Path] = []
    try:
        if path.is_dir():
            files = sorted(p for p in path.rglob("*") if p.is_file())
        elif path.is_file():
            files = [path]
    except OSError as exc:
        warnings.append(f"{label}/{path.name}: cannot list files: {exc}")

    for file_path in files:
        try:
            suffix = file_path.suffix.lower()
            if suffix == ".jsonl":
                file_texts, was_truncated = _extract_text_from_jsonl(file_path)
                texts.extend(file_texts)
                if was_truncated:
                    warnings.append(
                        f"{label}/{file_path.name}: truncated at "
                        f"{MAX_TEXTS_PER_JSONL_FILE} texts"
                    )
            elif suffix == ".json":
                raw, was_truncated = _bounded_read_text(file_path)
                if was_truncated:
                    warnings.append(
                        f"{label}/{file_path.name}: truncated at "
                        f"{MAX_FILE_READ_CHARS} chars"
                    )
                try:
                    obj = json.loads(raw)
                    texts.extend(_extract_relevant_strings(obj))
                except json.JSONDecodeError:
                    # Not valid JSON (possibly cut off by the read limit):
                    # keep the raw text so paths in it can still be found.
                    texts.append(_trim_text(raw))
            elif suffix in (".md", ".txt", ".yaml", ".yml", ".toml"):
                raw, was_truncated = _bounded_read_text(file_path)
                if was_truncated:
                    warnings.append(
                        f"{label}/{file_path.name}: truncated at "
                        f"{MAX_FILE_READ_CHARS} chars"
                    )
                texts.append(_trim_text(raw))
        except Exception as exc:
            # Deliberately broad: one unreadable or malformed file must not
            # stop the scan; the failure is surfaced as a warning.
            warnings.append(f"{label}/{file_path.name}: {exc}")

    return (
        AgentRoot(label=label, path=path, exists=True, file_count=len(files), warnings=warnings[:10]),
        texts,
    )
def _deduplicate_projects(candidates: list[CandidateProject]) -> list[CandidateProject]:
    """Merge candidates that point at the same normalised root.

    Roots are compared case-insensitively after normalisation. The first
    occurrence determines the stored root; sources of later occurrences are
    appended when not already listed. The result holds fresh objects in
    first-seen order.
    """
    merged: dict[str, CandidateProject] = {}
    for candidate in candidates:
        normalized_root = _normalize_discovered_path(candidate.root)
        lookup_key = normalized_root.lower()
        known = merged.get(lookup_key)
        if known is None:
            merged[lookup_key] = CandidateProject(
                root=normalized_root,
                discovered_from=list(candidate.discovered_from),
            )
            continue
        for source_label in candidate.discovered_from:
            if source_label not in known.discovered_from:
                known.discovered_from.append(source_label)
    return list(merged.values())
def _safe_is_dir(path: Path) -> bool:
    """Report whether *path* is a directory; an ``OSError`` counts as "no"."""
    try:
        is_directory = path.is_dir()
    except OSError:
        return False
    else:
        return is_directory
def _safe_exists(path: Path) -> bool:
    """Report whether *path* exists; an ``OSError`` counts as "no"."""
    try:
        is_present = path.exists()
    except OSError:
        return False
    else:
        return is_present
def _normalize_discovered_path(raw_path: str) -> str:
    """Return a canonical form of *raw_path* without mangling unknown paths.

    A path that exists locally is resolved; anything else (remote paths,
    paths that no longer exist, unparsable input) is returned stripped but
    otherwise untouched.
    """
    stripped = raw_path.strip()
    try:
        local = Path(stripped)
    except (OSError, ValueError):
        return stripped

    if not _safe_exists(local):
        return stripped
    try:
        return str(local.resolve())
    except (OSError, RuntimeError):
        # Resolution failed; fall back to the path as pathlib parsed it.
        return str(local)
def _deduplicate_documents(candidates: list[CandidateDocument]) -> list[CandidateDocument]:
    """Merge candidates whose paths are equal ignoring case.

    The first occurrence determines the stored path; sources of later
    occurrences are appended when not already listed. The result holds fresh
    objects in first-seen order.
    """
    merged: dict[str, CandidateDocument] = {}
    for candidate in candidates:
        lookup_key = candidate.path.lower()
        known = merged.get(lookup_key)
        if known is None:
            merged[lookup_key] = CandidateDocument(
                path=candidate.path,
                discovered_from=list(candidate.discovered_from),
            )
            continue
        for source_label in candidate.discovered_from:
            if source_label not in known.discovered_from:
                known.discovered_from.append(source_label)
    return list(merged.values())
def _deduplicate_servers(servers: list[ServerAlias]) -> list[ServerAlias]:
    """Deduplicate server aliases, merging their sources.

    Aliases are compared case-insensitively; the first occurrence keeps its
    original casing for display. Sources of later occurrences are appended to
    the comma-separated ``source`` of the kept entry when not already listed.

    The result holds fresh ``ServerAlias`` objects in first-seen order, so
    the objects passed in are never modified (the same contract as the
    project and document deduplication helpers).
    """
    merged: dict[str, ServerAlias] = {}
    for srv in servers:
        key = srv.alias.lower()
        known = merged.get(key)
        if known is None:
            # Copy rather than store the caller's object: merging rewrites
            # ``source`` and must not leak into the input list.
            merged[key] = ServerAlias(
                alias=srv.alias,
                source=srv.source,
                confidence=srv.confidence,
            )
            continue
        sources = _split_sources(known.source)
        for source in _split_sources(srv.source):
            if source not in sources:
                sources.append(source)
        known.source = ",".join(sources)
    return list(merged.values())
761# ── confidence scoring ────────────────────────────────────────────────
def _split_sources(source: str) -> list[str]:
    """Split a comma-merged source string into trimmed, non-empty labels, in order."""
    labels: list[str] = []
    for piece in source.split(","):
        label = piece.strip()
        if label:
            labels.append(label)
    return labels
def _score_project(candidate: CandidateProject) -> float:
    """Return the deterministic confidence score of a candidate project.

    The score is 1.0 for every agent root that mentioned the project, plus
    1.0 when the root exists on disk as a directory.
    """
    score = float(len(candidate.discovered_from))
    verified_on_disk = _safe_is_dir(Path(candidate.root))
    return score + 1.0 if verified_on_disk else score
def _score_document(candidate: CandidateDocument) -> float:
    """Compute a deterministic confidence score for a candidate document.

    Signals:
    - 1.0 per distinct agent root that discovered it
    - +1.0 if the path is absolute and exists on disk
    - +0.5 if the filename is a canonical doc name (AGENTS.md, CLAUDE.md, README.md)

    The filename is taken from a forward-slash normalised copy of the path, so
    a backslash-separated Windows path gets the canonical-name bonus on every
    operating system (pathlib would treat the whole string as the name when
    running on a POSIX system).
    """
    confidence = float(len(candidate.discovered_from))
    if _is_absolute_path_string(candidate.path) and _safe_exists(Path(candidate.path)):
        confidence += 1.0
    file_name = candidate.path.replace("\\", "/").rstrip("/").rsplit("/", 1)[-1]
    if file_name.lower() in CANONICAL_RELATIVE_DOCS:
        confidence += 0.5
    return confidence
def _is_absolute_path_string(path: str) -> bool:
    """Return whether *path* is written as an absolute path.

    Both drive-letter paths (``C:\\...`` or ``C:/...``) and slash-rooted paths
    count; surrounding whitespace is ignored.
    """
    candidate = path.strip().replace("\\", "/")
    if candidate.startswith("/"):
        return True
    return re.match(r"^[a-zA-Z]:/", candidate) is not None
def _score_server(server: ServerAlias) -> float:
    """Return the deterministic confidence score of a server alias.

    The score is 1.0 for every source label, plus 0.5 when the alias contains
    a dot and another 0.5 when it contains a hyphen.
    """
    score = float(len(_split_sources(server.source)))
    for marker in (".", "-"):
        if marker in server.alias:
            score += 0.5
    return score
def _collect_skill_documents(skills_dir: Path, label: str) -> list[CandidateDocument]:
    """Return the doc files below *skills_dir* as document candidates.

    Normal agent-root scanning extracts paths from file *content*; this
    instead promotes the files themselves. The directory is walked
    recursively, only files with a known doc extension are kept, and each
    candidate is attributed to *label*. Entries that cannot be inspected are
    skipped; if the directory cannot be listed, nothing is returned.
    """
    found: list[CandidateDocument] = []
    if not _safe_is_dir(skills_dir):
        return found

    try:
        entries = sorted(skills_dir.rglob("*"))
    except OSError:
        return found

    for entry in entries:
        try:
            is_document = entry.is_file() and entry.suffix.lower() in DOC_EXTENSIONS
        except OSError:
            continue
        if is_document:
            found.append(CandidateDocument(path=str(entry), discovered_from=[label]))
    return found
def discover_from_agents(home: Path) -> DiscoveryReport:
    """Scan the agent client directories under *home* and build a report.

    Only the configured agent roots (`.codex/*`, `.claude/*`) are read;
    project directories mentioned in them are never traversed.

    The report lists the scanned roots, the deduplicated project, document
    and server candidates ranked by confidence (highest first, ties broken
    alphabetically), and up to 50 warnings.
    """
    scanned_roots: list[AgentRoot] = []
    project_hits: list[CandidateProject] = []
    document_hits: list[CandidateDocument] = []
    server_hits: list[ServerAlias] = []
    collected_warnings: list[str] = []

    for label, relative_dir, sub_dir in AGENT_ROOTS:
        agent_root, texts = _scan_agent_root(label, home, relative_dir, sub_dir)
        scanned_roots.append(agent_root)
        for note in agent_root.warnings:
            collected_warnings.append(f"{label}: {note}")

        for text in texts:
            if _is_known_agent_text_preamble(text):
                continue

            # Project roots inferred from absolute paths in the text.
            for found_path in _find_paths_in_text(text):
                inferred_root = _project_root_from_path(found_path)
                if inferred_root is not None:
                    project_hits.append(
                        CandidateProject(root=inferred_root, discovered_from=[label])
                    )

            # Document paths mentioned in the text.
            document_hits.extend(
                CandidateDocument(path=doc_path, discovered_from=[label])
                for doc_path in _find_doc_paths(text)
            )

            # SSH/server aliases mentioned in the text.
            server_hits.extend(
                ServerAlias(alias=alias, source=label)
                for alias in _find_ssh_aliases(text)
            )

    # Skill files are documents in their own right, whether or not any
    # session log mentions their paths.
    document_hits.extend(
        _collect_skill_documents(home / ".claude" / "skills", "claude-skills")
    )

    ranked_projects = _deduplicate_projects(project_hits)
    ranked_documents = _deduplicate_documents(document_hits)
    ranked_servers = _deduplicate_servers(server_hits)

    for project in ranked_projects:
        project.confidence = _score_project(project)
    for document in ranked_documents:
        document.confidence = _score_document(document)
    for server in ranked_servers:
        server.confidence = _score_server(server)

    # Highest confidence first; the lower-cased name breaks ties.
    ranked_projects.sort(key=lambda item: (-item.confidence, item.root.lower()))
    ranked_documents.sort(key=lambda item: (-item.confidence, item.path.lower()))
    ranked_servers.sort(key=lambda item: (-item.confidence, item.alias.lower()))

    return DiscoveryReport(
        agent_roots=scanned_roots,
        projects=ranked_projects,
        documents=ranked_documents,
        servers=ranked_servers,
        warnings=collected_warnings[:50],
    )
# Default number of entries shown per section of the text report.
DEFAULT_DISPLAY_LIMIT: int = 20
def _format_header(title: str, count: int) -> str:
    """Return a level-2 heading showing *title* and *count*, preceded by a newline."""
    return "\n## {} ({})".format(title, count)
def _format_project_line(p: CandidateProject) -> str:
    """Return the report bullet for one project candidate."""
    on_disk_marker = " [EXISTS]" if _safe_is_dir(Path(p.root)) else ""
    origin = ", ".join(p.discovered_from)
    return f"- {p.root}{on_disk_marker} (conf={p.confidence:.1f}, from: {origin})"
def _format_document_line(d: CandidateDocument) -> str:
    """Return the report bullet for one document candidate."""
    # Only absolute paths are checked on disk.
    if _is_absolute_path_string(d.path) and _safe_exists(Path(d.path)):
        on_disk_marker = " [EXISTS]"
    else:
        on_disk_marker = ""
    origin = ", ".join(d.discovered_from)
    return f"- {d.path}{on_disk_marker} (conf={d.confidence:.1f}, from: {origin})"
def _format_server_line(s: ServerAlias) -> str:
    """Return the report bullet for one server alias."""
    score_text = f"{s.confidence:.1f}"
    return f"- {s.alias} (conf={score_text}, from: {s.source})"
def _append_candidate_section(
    lines: list[str],
    candidates: list,
    limit: int | None,
    formatter,
) -> None:
    """Append one formatted line per candidate to *lines*, honouring *limit*.

    With ``limit=None`` every candidate is shown. Otherwise only the first
    *limit* are shown, followed by a note giving the number left out when
    there are more.
    """
    if limit is None:
        lines.extend(formatter(candidate) for candidate in candidates)
        return

    lines.extend(formatter(candidate) for candidate in candidates[:limit])
    hidden = len(candidates) - limit
    if hidden > 0:
        lines.append(f" ... and {hidden} more (use --json for full list)")
def format_report(report: DiscoveryReport, limit: int | None = DEFAULT_DISPLAY_LIMIT) -> str:
    """Render a DiscoveryReport as human-readable markdown.

    Candidates appear in the order stored in the report. Each candidate
    section shows at most *limit* entries and adds a note when entries were
    left out; ``limit=None`` shows everything. Empty sections are rendered as
    "- (none)".
    """
    out: list[str] = ["# Agent Discovery Report"]

    # Agent roots, each followed by its own warnings.
    out.append(_format_header("Agent Roots", len(report.agent_roots)))
    for agent_root in report.agent_roots:
        if agent_root.exists:
            status = "exists"
        else:
            status = "NOT FOUND"
        file_note = f" ({agent_root.file_count} files)" if agent_root.file_count else ""
        out.append(f"- [{status}] {agent_root.label}: {agent_root.path}{file_note}")
        out.extend(f" - Warning: {note}" for note in agent_root.warnings)

    # Candidate sections share one layout.
    sections = (
        ("Projects", report.project_count, report.projects, _format_project_line),
        ("Documents", report.document_count, report.documents, _format_document_line),
        ("Servers", report.server_count, report.servers, _format_server_line),
    )
    for title, count, candidates, formatter in sections:
        out.append(_format_header(title, count))
        if candidates:
            _append_candidate_section(out, candidates, limit, formatter)
        else:
            out.append("- (none)")

    # Report-level warnings are always listed in full.
    out.append(_format_header("Warnings/Errors", report.warning_count))
    if report.warnings:
        out.extend(f"- {note}" for note in report.warnings)
    else:
        out.append("- (none)")

    # Summary
    out.extend(
        [
            "\n## Summary",
            f"- Agent roots: {len(report.agent_roots)}",
            f"- Projects discovered: {report.project_count}",
            f"- Documents discovered: {report.document_count}",
            f"- Servers discovered: {report.server_count}",
            f"- Warnings: {report.warning_count}",
        ]
    )

    return "\n".join(out)