Coverage for src / documint_mcp / cli.py: 0%
973 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-29 10:41 -0400
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-29 10:41 -0400
1"""CLI entrypoint for hosted Documint workflows."""
3from __future__ import annotations
5import argparse
6import fnmatch
7import hashlib
8import json
9import os
10import signal
11import sys
12import time
13from datetime import UTC, datetime
14from pathlib import Path
15from typing import Any, cast
16from urllib.parse import parse_qs, urlparse
18import httpx
20from . import cli_ui as ui
21from .config import settings
# Where the CLI persists its auth token and API URL between invocations.
CLI_CONFIG_PATH = Path.home() / ".documint" / "cli.json"

# When True, suppress formatted output and emit raw JSON only.
# Mutated by main()/_handle_natural_language(); reset on every invocation.
_JSON_MODE = False

# Watch state debounce: avoid N disk writes during batch operations.
_watch_state_dirty = False
_watch_last_save: float = 0.0

# Known subcommand names for natural-language detection bypass.
# main() treats any first argument outside this set (and not a flag) as a
# free-form natural-language query.
_KNOWN_COMMANDS = frozenset({
    "init", "signup", "login", "workspaces", "projects", "scan", "drift", "patches",
    "publish", "traces", "db", "worker", "mint-generate", "mint-export",
    "watch", "ci", "coverage", "propose", "preview", "trace", "chat",
})
def _is_tty() -> bool:
    """Return True when stdout is an interactive terminal and JSON mode is off."""
    if _JSON_MODE:
        return False
    stream = sys.stdout
    return bool(getattr(stream, "isatty", None) and stream.isatty())
45def _emit(payload: Any) -> None:
46 """Emit raw JSON to stdout (always, for test compatibility)."""
47 print(json.dumps(payload, indent=2))
def _load_cli_config() -> dict[str, Any]:
    """Load the persisted CLI config from ~/.documint/cli.json.

    Returns an empty dict when the file is missing, unreadable, or holds
    invalid JSON, so callers can always treat config as best-effort.
    """
    try:
        return json.loads(CLI_CONFIG_PATH.read_text(encoding="utf-8"))
    except (OSError, json.JSONDecodeError):
        # OSError covers the missing-file case (FileNotFoundError) plus
        # permission errors and check-then-read races that previously
        # escaped the exists()/JSONDecodeError handling and crashed the CLI.
        return {}
def _save_cli_config(payload: dict[str, Any]) -> None:
    """Persist *payload* as pretty-printed JSON to ~/.documint/cli.json."""
    serialized = json.dumps(payload, indent=2)
    CLI_CONFIG_PATH.parent.mkdir(parents=True, exist_ok=True)
    CLI_CONFIG_PATH.write_text(serialized, encoding="utf-8")
def _api_url(args: argparse.Namespace) -> str:
    """Resolve the API base URL: CLI flag > saved config > env var > settings."""
    config = _load_cli_config()
    candidates = (
        getattr(args, "api_url", None),
        config.get("api_url"),
        os.getenv("DOCUMINT_API_URL"),
    )
    chosen = next((c for c in candidates if c), None) or settings.api_base_url
    return str(chosen).rstrip("/")
def _token(args: argparse.Namespace) -> str | None:
    """Resolve the auth token: CLI flag > saved config > env vars > settings."""
    config = _load_cli_config()
    for candidate in (
        getattr(args, "token", None),
        config.get("token"),
        os.getenv("DOCUMINT_TOKEN"),
        os.getenv("DOCUMINT_AUTH_TOKEN"),
    ):
        if candidate:
            return candidate
    return settings.auth_token
def _headers(args: argparse.Namespace) -> dict[str, str]:
    """Build the Authorization header map; empty when no token is configured."""
    token = _token(args)
    return {"Authorization": f"Bearer {token}"} if token else {}
def _request(
    args: argparse.Namespace,
    method: str,
    path: str,
    *,
    json_body: dict[str, Any] | None = None,
) -> Any:
    """Issue an API request, falling back to in-process handling on failure.

    Any connection, timeout, or HTTP-status error silently degrades to
    _local_request, which serves a subset of routes against the local
    service layer.
    """
    try:
        with httpx.Client(timeout=20.0) as client:
            response = client.request(
                method,
                f"{_api_url(args)}{path}",
                headers=_headers(args),
                json=json_body,
            )
            response.raise_for_status()
            return response.json()
    except (httpx.ConnectError, httpx.TimeoutException, httpx.HTTPStatusError):
        # Hosted API unreachable or rejected the call: try the local fallback.
        return _local_request(method, path, json_body=json_body)
def _local_request(
    method: str,
    path: str,
    *,
    json_body: dict[str, Any] | None = None,
) -> Any:
    """Serve a CLI request in-process when the hosted API is unreachable.

    Mirrors a small subset of the HTTP routes by calling the local service
    layer directly and JSON-encoding the result. Any route without a local
    equivalent fails loudly (SystemExit in TTY mode, RuntimeError otherwise)
    so callers never receive a silent wrong answer.
    """
    from fastapi.encoders import jsonable_encoder

    from .models import DriftJobRequest, ProjectCreateRequest
    from .repository import get_service

    service = get_service()
    parsed = urlparse(path)
    route = parsed.path
    query = parse_qs(parsed.query)
    if method == "GET" and route.startswith("/projects/"):
        project_id = route.rsplit("/", 1)[-1]
        return jsonable_encoder(service.get_project(project_id))
    if method == "GET" and route == "/workspaces":
        return jsonable_encoder(service.list_workspaces())
    if method == "POST" and route == "/admin/bootstrap-self":
        return jsonable_encoder(service.bootstrap_self())
    if method == "GET" and route == "/projects":
        workspace_id = query.get("workspace_id", [None])[0]
        return jsonable_encoder(service.list_projects(workspace_id))
    if (
        method == "GET"
        and route.startswith("/integrations/github/installations/")
        and route.endswith("/repositories")
    ):
        # Route: /integrations/github/installations/<id>/repositories.
        # Bug fix: splitting the leading-slash path put the literal segment
        # "installations" (index 3 of ['', 'integrations', 'github',
        # 'installations', '<id>', ...]) into installation_id. Strip slashes
        # first, matching the findings route below.
        installation_id = route.strip("/").split("/")[3]
        return jsonable_encoder(service.list_installation_repositories(installation_id))
    if (
        method == "POST"
        and route.startswith("/integrations/github/installations/")
        and route.endswith("/sync")
    ):
        # Same indexing fix as the /repositories route above.
        installation_id = route.strip("/").split("/")[3]
        return jsonable_encoder(service.sync_installation(installation_id))
    if method == "POST" and route == "/projects" and json_body is not None:
        return jsonable_encoder(
            service.create_project(ProjectCreateRequest(**json_body))
        )
    if method == "POST" and route == "/jobs/drift" and json_body is not None:
        return jsonable_encoder(service.run_drift(DriftJobRequest(**json_body)))
    if method == "POST" and route == "/jobs/publish" and json_body is not None:
        return jsonable_encoder(service.publish_preview(str(json_body["project_id"])))
    if method == "GET" and route.startswith("/artifacts/") and route.endswith("/trace"):
        # Route: /artifacts/<id>/trace — index 2 is correct here (leading
        # slash makes index 0 the empty string).
        artifact_id = route.split("/")[2]
        project_id = (
            query.get("project_id", [settings.project_id])[0] or settings.project_id
        )
        return jsonable_encoder(service.get_artifact_trace(artifact_id, project_id))
    if method == "POST" and "/findings/" in route and route.endswith("/patch"):
        # Route: /projects/<project_id>/findings/<finding_id>/patch.
        parts = route.strip("/").split("/")
        project_id = parts[1]
        finding_id = parts[3]
        policy = str((json_body or {}).get("policy", "on_demand"))
        return jsonable_encoder(
            service.generate_doc_patch(
                project_id=project_id, finding_id=finding_id, policy=policy
            )
        )
    if method == "POST" and route == "/mcp" and json_body is not None:
        # Minimal MCP tools/call emulation for generate_doc_patch only.
        params = json_body.get("params", {})
        arguments = params.get("arguments", {}) if isinstance(params, dict) else {}
        if params.get("name") == "generate_doc_patch" and isinstance(arguments, dict):
            return {
                "result": {
                    "structuredContent": jsonable_encoder(
                        service.generate_doc_patch(
                            project_id=arguments.get("project_id"),
                            artifact_id=arguments.get("artifact_id"),
                            finding_id=arguments.get("finding_id"),
                            policy=arguments.get("policy", "on_demand"),
                        )
                    )
                }
            }
    if _is_tty():
        ui.friendly_error(
            f"Unsupported local fallback: {method} {path}",
            hint="This command requires the Documint API. Start the server with: documint-server",
        )
        raise SystemExit(1)
    raise RuntimeError(f"Unsupported local CLI fallback request: {method} {path}")
def build_parser() -> argparse.ArgumentParser:
    """Create the top-level Documint CLI parser.

    Declares all subcommands, including the compatibility aliases
    (propose/preview/trace) that mirror the namespaced forms.
    """
    parser = argparse.ArgumentParser(
        prog="documint",
        description="Documint CLI for hosted repo-native docs operations.",
    )
    # Global flags, available before any subcommand.
    parser.add_argument(
        "--api-url",
        help="Override the API base URL used by the CLI.",
    )
    parser.add_argument(
        "--token",
        help="Override the API token used by the CLI.",
    )
    parser.add_argument(
        "--json",
        action="store_true",
        default=False,
        dest="json_mode",
        help="Force raw JSON output (no formatted display).",
    )
    subparsers = parser.add_subparsers(dest="command", required=True)

    # --- onboarding / auth ---
    subparsers.add_parser(
        "init",
        help="Set up Documint in the current repo. Run this first.",
    )

    signup_parser = subparsers.add_parser(
        "signup",
        help="Join the Documint waitlist.",
    )
    signup_parser.add_argument("--email", required=True, help="Your email address.")

    login_parser = subparsers.add_parser(
        "login",
        help="Exchange the current operator token for a scoped CLI token and store it locally.",
    )
    login_parser.add_argument(
        "--workspace-id",
        default=settings.default_workspace_id,
        help="Workspace id to mint the CLI token for.",
    )
    login_parser.add_argument(
        "--label",
        default="CLI token",
        help="Friendly label for the stored CLI token.",
    )

    # --- workspace / project management ---
    workspaces_parser = subparsers.add_parser(
        "workspaces", help="Workspace operations."
    )
    workspaces_subparsers = workspaces_parser.add_subparsers(
        dest="workspaces_command", required=True
    )
    workspaces_subparsers.add_parser("list", help="List available workspaces.")
    workspaces_subparsers.add_parser(
        "bootstrap-self",
        help="Explicitly seed the self-dogfood workspace, project, and installation metadata.",
    )

    projects_parser = subparsers.add_parser("projects", help="Project operations.")
    projects_subparsers = projects_parser.add_subparsers(
        dest="projects_command", required=True
    )
    projects_list = projects_subparsers.add_parser("list", help="List projects.")
    projects_list.add_argument("--workspace-id", help="Optional workspace id filter.")
    projects_create = projects_subparsers.add_parser(
        "create", help="Create a hosted project connected to a GitHub repo."
    )
    projects_create.add_argument("--workspace-id", required=True)
    projects_create.add_argument("--name", required=True)
    projects_create.add_argument("--slug")
    projects_create.add_argument("--description", default="")
    projects_create.add_argument("--owner", required=True)
    projects_create.add_argument("--repo", required=True)
    projects_create.add_argument("--default-branch", default="main")

    # --- core workflows: scan, drift, patches, publish, traces ---
    scan_parser = subparsers.add_parser(
        "scan",
        help="Return the current project snapshot and artifact verification state.",
    )
    scan_parser.add_argument("--project-id", default=settings.project_id)

    drift_parser = subparsers.add_parser(
        "drift",
        help="Run the doc drift detector for a project.",
    )
    drift_parser.add_argument("--project-id", default=settings.project_id)
    drift_parser.add_argument(
        "--signal-type",
        default="manual",
        choices=["manual", "push", "pull_request", "release"],
    )
    drift_parser.add_argument(
        "--changed-file",
        action="append",
        default=[],
        help="Changed file path to scope findings. Repeat for multiple files.",
    )

    patches_parser = subparsers.add_parser("patches", help="Patch drafting operations.")
    patches_subparsers = patches_parser.add_subparsers(
        dest="patches_command", required=True
    )
    patches_propose = patches_subparsers.add_parser(
        "propose",
        help="Generate a reviewable doc patch proposal for a finding or artifact.",
    )
    patches_propose.add_argument("--project-id", default=settings.project_id)
    patches_propose.add_argument("--artifact-id")
    patches_propose.add_argument("--finding-id")
    patches_propose.add_argument("--policy", default="on_demand")

    publish_parser = subparsers.add_parser(
        "publish",
        help="Create or refresh a hosted publish deployment.",
    )
    publish_parser.add_argument("--project-id", default=settings.project_id)

    traces_parser = subparsers.add_parser("traces", help="Artifact trace operations.")
    traces_subparsers = traces_parser.add_subparsers(
        dest="traces_command", required=True
    )
    traces_show = traces_subparsers.add_parser(
        "show", help="Explain which repository paths drive a documentation artifact."
    )
    traces_show.add_argument("artifact_id")
    traces_show.add_argument("--project-id", default=settings.project_id)

    # --- operational commands ---
    db_parser = subparsers.add_parser("db", help="Database and migration operations.")
    db_subparsers = db_parser.add_subparsers(dest="db_command", required=True)
    db_subparsers.add_parser(
        "upgrade", help="Apply Alembic migrations to the configured database."
    )

    subparsers.add_parser(
        "worker",
        help="Run the ARQ worker for queued drift, patch, publish, PR, and installation jobs.",
    )

    # --- .mint tooling ---
    mint_generate_parser = subparsers.add_parser(
        "mint-generate",
        help="Generate .mint files for all artifact traces in a project.",
    )
    mint_generate_parser.add_argument("project_id", help="Project ID or slug")
    mint_generate_parser.add_argument(
        "--output",
        "-o",
        default=".mint",
        help="Output directory (default: .mint)",
    )

    mint_export_parser = subparsers.add_parser(
        "mint-export",
        help="Export a .mint file to CLAUDE.md, llms.txt, or AGENTS.md.",
    )
    mint_export_parser.add_argument("mint_file", help="Path to .mint file")
    mint_export_parser.add_argument(
        "--format",
        "-f",
        dest="fmt",
        default="claude",
        choices=["claude", "llms", "agents", "api-ref"],
        help="Export format: claude|llms|agents|api-ref (default: claude)",
    )
    mint_export_parser.add_argument(
        "--output",
        "-o",
        default=None,
        help="Output file path",
    )

    watch_parser = subparsers.add_parser(
        "watch",
        help="Watch source files for symbol changes and report documentation drift in real time.",
    )
    watch_parser.add_argument(
        "--poll-interval",
        type=float,
        default=2.0,
        help="Polling interval in seconds when watchdog is not available (default: 2.0).",
    )

    ci_parser = subparsers.add_parser(
        "ci",
        help="Non-interactive drift check for CI pipelines.",
    )
    ci_parser.add_argument("--project-id", default=settings.project_id)
    ci_parser.add_argument(
        "--fail-on-high",
        action="store_true",
        default=False,
        help="Exit 1 only if HIGH severity findings are present.",
    )
    ci_parser.add_argument(
        "--fail-on-any",
        action="store_true",
        default=False,
        help="Exit 1 if any findings are found.",
    )
    ci_parser.add_argument(
        "--format",
        dest="ci_format",
        default="text",
        choices=["text", "json", "github"],
        help="Output format: text, json, or github (GitHub Actions step summary).",
    )

    subparsers.add_parser(
        "coverage",
        help="Show documentation coverage — what percentage of symbols are documented.",
    )

    # --- compatibility aliases (keep flags in sync with the namespaced forms) ---
    propose_parser = subparsers.add_parser(
        "propose",
        help="Compatibility alias for `patches propose`.",
    )
    propose_parser.add_argument("--project-id", default=settings.project_id)
    propose_parser.add_argument("--artifact-id")
    propose_parser.add_argument("--finding-id")
    propose_parser.add_argument("--policy", default="on_demand")

    preview_parser = subparsers.add_parser(
        "preview",
        help="Compatibility alias for `publish`.",
    )
    preview_parser.add_argument("--project-id", default=settings.project_id)

    trace_parser = subparsers.add_parser(
        "trace",
        help="Compatibility alias for `traces show`.",
    )
    trace_parser.add_argument("artifact_id")
    trace_parser.add_argument("--project-id", default=settings.project_id)

    subparsers.add_parser("chat", help="Interactive conversation mode.")

    return parser
def _handle_natural_language(query: str) -> int:
    """Route a natural-language query to the appropriate command."""
    global _JSON_MODE  # noqa: PLW0603
    # In NL mode, formatted output is reserved for interactive terminals.
    isatty = getattr(sys.stdout, "isatty", None)
    if not (isatty and isatty()):
        _JSON_MODE = True

    ui.print_banner()
    print(ui.c(ui.MAGENTA, f' "{query}"'), file=sys.stderr)
    print(file=sys.stderr)

    intent = ui.detect_intent(query)
    if intent is None:
        ui.print_intent_help()
        return 0

    print(ui.info(f"Detected intent: {intent}"), file=sys.stderr)
    print(file=sys.stderr)

    # Build a minimal args namespace for the detected intent and dispatch it.
    parser = build_parser()
    try:
        parsed = parser.parse_args([intent])
    except SystemExit:
        # The intent did not map to a parseable command; show guidance instead.
        ui.print_intent_help()
        return 0
    return _dispatch(parsed, parser)
def _run_repl() -> int:
    """Interactive conversation-mode REPL.

    Accepts slash commands (/drift, /scan, ...) and free-form natural
    language; Ctrl+C / Ctrl+D or /quit exits. argparse SystemExit is
    swallowed so a bad command never kills the session.
    """
    ui.print_banner_full()
    print(" Type a question, use /commands, or Ctrl+C to exit.", file=sys.stderr)
    print(file=sys.stderr)

    parser = build_parser()

    while True:
        try:
            line = input(ui.c(ui.MINT_2, "documint > ") if _is_tty() else "documint > ")
        except (KeyboardInterrupt, EOFError):
            print(file=sys.stderr)
            print(ui.info("Goodbye."), file=sys.stderr)
            return 0

        line = line.strip()
        if not line:
            continue

        if line.startswith("/"):
            # Slash commands: only the first token matters; case-insensitive.
            cmd = line.split()[0].lower()
            if cmd in {"/quit", "/exit"}:
                print(ui.info("Goodbye."), file=sys.stderr)
                return 0
            if cmd == "/help":
                ui.print_slash_help()
                continue
            if cmd == "/drift":
                try:
                    args = parser.parse_args(["drift"])
                    _dispatch(args, parser)
                except SystemExit:
                    pass
                continue
            if cmd == "/coverage":
                _run_coverage()
                continue
            if cmd == "/watch":
                try:
                    args = parser.parse_args(["watch"])
                    _dispatch(args, parser)
                except SystemExit:
                    pass
                continue
            if cmd == "/scan":
                try:
                    args = parser.parse_args(["scan"])
                    _dispatch(args, parser)
                except SystemExit:
                    pass
                continue
            if cmd in {"/patch", "/propose"}:
                # Patch generation needs an id, so point at the CLI form.
                print(ui.info("Usage: /patch requires --finding-id or --artifact-id"), file=sys.stderr)
                print(ui.info(" Example: documint propose --artifact-id mcp-reference"), file=sys.stderr)
                continue
            if cmd == "/status":
                # /status is an alias for a scan.
                try:
                    args = parser.parse_args(["scan"])
                    _dispatch(args, parser)
                except SystemExit:
                    pass
                continue
            print(ui.warn(f"Unknown command: {cmd}. Type /help for a list."), file=sys.stderr)
            continue

        # Natural language routing
        intent = ui.detect_intent(line)
        if intent is None:
            ui.print_intent_help()
            continue

        print(ui.info(f"Detected intent: {intent}"), file=sys.stderr)
        try:
            args = parser.parse_args([intent])
            _dispatch(args, parser)
        except SystemExit:
            pass

    # Unreachable: the loop only exits via the return statements above.
    return 0
def _run_init() -> int:
    """Interactive first-run setup: email → trivia → token → saved config.

    Calls the hosted /early-access endpoint; on success stores the granted
    token in ~/.documint/cli.json. Ctrl+C at any prompt exits cleanly (0);
    invalid input or network failure returns 1.
    """
    import urllib.request as _req
    import json as _json

    # Hard-coded production endpoint: init must work before any config exists.
    API_URL = "https://api-production-285b.up.railway.app"

    print("\n Welcome to Documint.\n")
    print(" We'll get you set up in about 60 seconds.\n")

    # Step 1 — email
    try:
        email = input(" Your email: ").strip().lower()
    except (KeyboardInterrupt, EOFError):
        print()
        return 0
    if not email or "@" not in email:
        print("\n ✗ Need a valid email.\n")
        return 1

    # Step 2 — trivia (Look and Say sequence)
    print()
    print(" One quick question to confirm you're an engineer:\n")
    print(" What comes next in this sequence?")
    print()
    print(" 1, 11, 21, 1211, 111221, ___\n")
    try:
        # Strip internal spaces so "31 22 11" and "312211" are equivalent.
        answer = input(" Answer: ").strip().replace(" ", "")
    except (KeyboardInterrupt, EOFError):
        print()
        return 0

    # Step 3 — call /early-access (server validates the answer).
    payload = _json.dumps({"email": email, "answer": answer}).encode()
    request = _req.Request(
        f"{API_URL}/early-access",
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    try:
        with _req.urlopen(request, timeout=10) as resp:
            result = _json.loads(resp.read())
    except Exception:
        print("\n ✗ Could not reach the Documint API. Check your connection.\n")
        return 1

    status = result.get("status")

    if status == "waitlisted":
        # Wrong trivia answer still lands the user on the waitlist.
        print()
        print(" Not quite — but you're on the waitlist.")
        print(f" We'll email {email} when your access is ready.\n")
        return 0

    if status != "access_granted":
        print(f"\n ✗ Unexpected response: {result.get('message', status)}\n")
        return 1

    token = result.get("token", "")

    # Step 4 — detect repo name from cwd
    repo_name = Path.cwd().name

    # Step 5 — save config
    _save_cli_config({
        "api_url": API_URL,
        "token": token,
        "email": email,
        "repo": repo_name,
    })

    print()
    print(" ✓ Access granted. You're in.\n")
    print(f" Token saved to ~/.documint/cli.json")
    print()
    print(" What to do next:\n")
    print(f" documint scan # snapshot {repo_name}")
    print( " documint watch # live drift detection")
    print( " documint drift # find stale docs")
    print( " documint propose # AI patch for stale docs")
    print()
    return 0
def _dispatch(args: argparse.Namespace, parser: argparse.ArgumentParser) -> int:
    """Dispatch parsed args to the appropriate command handler.

    Returns a process exit code. In TTY mode most commands print formatted
    summaries to stderr in addition to the raw JSON that _emit writes to
    stdout, so piped output stays machine-readable.
    """
    if args.command == "init":
        return _run_init()

    if args.command == "signup":
        # Uses urllib (not httpx) so signup works before any config exists.
        import urllib.request as _urllib_request
        import json as _json

        email = args.email.strip().lower()
        api_url = _api_url(args).rstrip("/")
        req = _urllib_request.Request(
            f"{api_url}/waitlist",
            data=_json.dumps({"email": email, "source": "cli"}).encode(),
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        try:
            with _urllib_request.urlopen(req, timeout=10) as resp:
                result = _json.loads(resp.read())
                msg = result.get("message", "You're on the list!")
                print(f"\n\u2713 {msg}")
                print(f" We'll email {email} when your account is ready.\n")
        except Exception:
            # Best-effort UX: a failed signup call still shows an optimistic
            # message rather than a traceback.
            print(f"\n✓ Signed up! We'll reach out to {email} soon.\n")
        return 0

    if args.command == "login":
        # Exchange the operator token for a scoped CLI token and persist it.
        payload = _request(
            args,
            "POST",
            "/auth/cli/exchange",
            json_body={
                "workspace_id": args.workspace_id,
                "label": args.label,
                "scopes": ["projects:read", "projects:write", "mcp:read"],
            },
        )
        _save_cli_config(
            {
                "api_url": _api_url(args),
                "workspace_id": args.workspace_id,
                "token": payload.get("token"),
                "token_prefix": payload.get("token_prefix"),
            }
        )
        _emit(payload)
        return 0

    if args.command == "workspaces" and args.workspaces_command == "list":
        _emit(_request(args, "GET", "/workspaces"))
        return 0
    if args.command == "workspaces" and args.workspaces_command == "bootstrap-self":
        # Bootstrap is intentionally local-only: it seeds the local service.
        _emit(_local_request("POST", "/admin/bootstrap-self"))
        return 0

    if args.command == "projects":
        if args.projects_command == "list":
            suffix = (
                f"/projects?workspace_id={args.workspace_id}"
                if args.workspace_id
                else "/projects"
            )
            _emit(_request(args, "GET", suffix))
            return 0
        if args.projects_command == "create":
            _emit(
                _request(
                    args,
                    "POST",
                    "/projects",
                    json_body={
                        "workspace_id": args.workspace_id,
                        "name": args.name,
                        "slug": args.slug,
                        "description": args.description,
                        "owner": args.owner,
                        "repo": args.repo,
                        "default_branch": args.default_branch,
                    },
                )
            )
            return 0

    if args.command == "scan":
        if _is_tty():
            with ui.Spinner("Scanning project snapshot"):
                result = _request(args, "GET", f"/projects/{args.project_id}")
        else:
            result = _request(args, "GET", f"/projects/{args.project_id}")
        _emit(result)
        if _is_tty():
            # Human-readable summary table goes to stderr, keeping stdout JSON.
            artifacts = result.get("artifacts", [])
            findings = result.get("findings", [])
            print(ui.ok(f"{len(artifacts)} artifacts, {len(findings)} findings"), file=sys.stderr)
            if artifacts:
                rows = []
                for a in artifacts:
                    name = str(a.get("artifact_key", a.get("id", "?")))
                    status = str(a.get("status", "unknown"))
                    verified = str(a.get("last_verified", "never"))
                    status_display = ui.ok(status) if status.lower() in ("fresh", "ok") else ui.warn(status)
                    rows.append([name, status_display, verified])
                print(file=sys.stderr)
                print(ui.table(["Artifact", "Status", "Last Verified"], rows), file=sys.stderr)
            print(file=sys.stderr)
            ui.suggest_next("scan")
        return 0

    if args.command == "drift":
        if _is_tty():
            with ui.Spinner("Scanning for documentation drift"):
                result = _request(
                    args,
                    "POST",
                    "/jobs/drift",
                    json_body={
                        "project_id": args.project_id,
                        "signal_type": args.signal_type,
                        "changed_files": args.changed_file,
                    },
                )
        else:
            result = _request(
                args,
                "POST",
                "/jobs/drift",
                json_body={
                    "project_id": args.project_id,
                    "signal_type": args.signal_type,
                    "changed_files": args.changed_file,
                },
            )
        _emit(result)
        if _is_tty():
            findings = result.get("findings", [])
            if findings:
                print(ui.warn(f"{len(findings)} drift finding(s)"), file=sys.stderr)
                for f in findings:
                    sev = ui.severity_badge(str(f.get("severity", "?")))
                    print(f" {sev} {f.get('artifact_id', '?')}: {f.get('summary', '')}", file=sys.stderr)
            else:
                print(ui.ok("No drift detected -- docs are fresh."), file=sys.stderr)
            print(file=sys.stderr)
            ui.suggest_next("drift")
        return 0

    if args.command == "ci":
        return _run_ci(args)

    if args.command == "coverage":
        return _run_coverage()

    # `propose` alias and `patches propose` share one implementation.
    if args.command == "propose" or (
        args.command == "patches" and args.patches_command == "propose"
    ):
        if not args.artifact_id and not args.finding_id:
            parser.error("patch generation requires --artifact-id or --finding-id")

        if _is_tty():
            print(ui.step(1, 4, "Characterizing code changes..."), file=sys.stderr)

        # Finding-scoped patches use the REST route; artifact-scoped patches
        # go through the MCP tools/call envelope below.
        if args.finding_id:
            if _is_tty():
                print(ui.step(2, 4, "Identifying stale sections..."), file=sys.stderr)
                print(ui.step(3, 4, "Drafting surgical patch..."), file=sys.stderr)
                with ui.Spinner("Generating patch"):
                    result = _request(
                        args,
                        "POST",
                        f"/projects/{args.project_id}/findings/{args.finding_id}/patch",
                        json_body={"policy": args.policy},
                    )
                print(ui.step(4, 4, "Verifying patch accuracy..."), file=sys.stderr)
            else:
                result = _request(
                    args,
                    "POST",
                    f"/projects/{args.project_id}/findings/{args.finding_id}/patch",
                    json_body={"policy": args.policy},
                )
            _emit(result)
            if _is_tty():
                _display_propose_result(result)
            _append_changelog(args, result)
            return 0

        if _is_tty():
            print(ui.step(2, 4, "Identifying stale sections..."), file=sys.stderr)
            print(ui.step(3, 4, "Drafting surgical patch..."), file=sys.stderr)
            with ui.Spinner("Generating patch"):
                result = _request(
                    args,
                    "POST",
                    "/mcp",
                    json_body={
                        "jsonrpc": "2.0",
                        "id": 1,
                        "method": "tools/call",
                        "params": {
                            "name": "generate_doc_patch",
                            "arguments": {
                                "project_id": args.project_id,
                                "artifact_id": args.artifact_id,
                                "policy": args.policy,
                            },
                        },
                    },
                )["result"]["structuredContent"]
            print(ui.step(4, 4, "Verifying patch accuracy..."), file=sys.stderr)
        else:
            result = _request(
                args,
                "POST",
                "/mcp",
                json_body={
                    "jsonrpc": "2.0",
                    "id": 1,
                    "method": "tools/call",
                    "params": {
                        "name": "generate_doc_patch",
                        "arguments": {
                            "project_id": args.project_id,
                            "artifact_id": args.artifact_id,
                            "policy": args.policy,
                        },
                    },
                },
            )["result"]["structuredContent"]
        _emit(result)
        if _is_tty():
            _display_propose_result(result)
        _append_changelog(args, result)
        return 0

    if args.command in {"publish", "preview"}:
        if _is_tty():
            with ui.Spinner("Publishing documentation"):
                result = _request(
                    args,
                    "POST",
                    "/jobs/publish",
                    json_body={"project_id": args.project_id},
                )
        else:
            result = _request(
                args,
                "POST",
                "/jobs/publish",
                json_body={"project_id": args.project_id},
            )
        _emit(result)
        if _is_tty():
            site_url = result.get("site_url", "")
            if site_url:
                print(ui.ok(f"Published: {site_url}"), file=sys.stderr)
            else:
                print(ui.ok("Published successfully."), file=sys.stderr)
        return 0

    if args.command == "trace" or (
        args.command == "traces" and args.traces_command == "show"
    ):
        _emit(
            _request(
                args,
                "GET",
                f"/artifacts/{args.artifact_id}/trace?project_id={args.project_id}",
            )
        )
        return 0

    if args.command == "db" and args.db_command == "upgrade":
        from alembic.config import Config

        from alembic import command

        alembic_config = Config(str(settings.repo_root / "alembic.ini"))
        command.upgrade(alembic_config, "head")
        _emit(
            {"status": "ok", "migration": "head", "database_url": settings.database_url}
        )
        return 0

    if args.command == "worker":
        from arq import run_worker

        from .jobs import WorkerSettings

        # run_worker blocks until the worker process is stopped.
        run_worker(cast(Any, WorkerSettings))
        return 0

    if args.command == "mint-generate":
        from documint_mcp.mint import MintDocument

        output = Path(args.output)
        print(f"Generating .mint files for project {args.project_id} → {output}/")
        doc = MintDocument.from_artifact_trace(
            artifact_key=f"{args.project_id}-overview",
            artifact_type="overview",
            title=f"Project {args.project_id} Overview",
            source_files=[],
            narrative_md=f"# {args.project_id}\n\nGenerated by Documint.\n",
        )
        output.mkdir(parents=True, exist_ok=True)
        out_path = output / f"{args.project_id}-overview.mint"
        doc.to_file(out_path)
        print(f"Written: {out_path}")
        print(f" Compression ratio: {doc.compression_ratio():.2f}")
        return 0

    if args.command == "watch":
        return _run_watch(args)

    if args.command == "mint-export":
        from documint_mcp.mint import MintDocument

        mint_file = Path(args.mint_file)
        doc = MintDocument.from_file(mint_file)
        # Map the export format to renderer + conventional default filename.
        if args.fmt == "claude":
            content = doc.to_claude_md()
            default_name = "CLAUDE.md"
        elif args.fmt == "llms":
            content = doc.to_llms_txt()
            default_name = "llms.txt"
        elif args.fmt == "api-ref":
            content = doc.to_api_reference()
            default_name = "API.md"
        else:
            content = doc.to_agents_md()
            default_name = "AGENTS.md"
        out_path = Path(args.output) if args.output else Path(default_name)
        out_path.write_text(content)
        print(f"Exported to {out_path}")
        return 0

    if args.command == "chat":
        return _run_repl()

    # parser.error raises SystemExit; the return is a defensive fallback.
    parser.error(f"Unsupported command: {args.command}")
    return 2
def main(argv: list[str] | None = None) -> int:
    """Run the Documint CLI."""
    global _JSON_MODE  # noqa: PLW0603
    _JSON_MODE = False  # reset for each invocation (test isolation)

    raw_args = sys.argv[1:] if argv is None else argv

    # Natural-language catch-all: any leading word that is neither a known
    # subcommand nor a flag turns the whole invocation into a free-form query.
    if raw_args:
        first = raw_args[0]
        if first not in _KNOWN_COMMANDS and not first.startswith("-"):
            return _handle_natural_language(" ".join(raw_args))

    parser = build_parser()
    args = parser.parse_args(argv)

    # JSON mode: explicit --json flag, or stdout is not a terminal.
    isatty = getattr(sys.stdout, "isatty", None)
    if getattr(args, "json_mode", False) or not (isatty and isatty()):
        _JSON_MODE = True

    # Banner for interactive sessions, but never for CI output.
    if _is_tty() and args.command != "ci":
        ui.print_banner()

    return _dispatch(args, parser)
1011# ---------------------------------------------------------------------------
1012# Propose display helper
1013# ---------------------------------------------------------------------------
def _display_propose_result(result: dict[str, Any]) -> None:
    """Display a formatted patch proposal when running in TTY mode.

    Renders a summary box to stderr and asks whether to apply the patch;
    Ctrl+C/EOF at the prompt is treated the same as declining.
    """
    summary = result.get("summary", "Patch generated.")
    target = result.get("target_path", "")
    artifact_id = result.get("artifact_id", "?")
    print(file=sys.stderr)
    body = f"Artifact: {artifact_id}\nTarget: {target}\n\n{summary}"
    print(ui.dither_box("Patch Proposal", body), file=sys.stderr)
    print(file=sys.stderr)
    # Interactive apply prompt
    # NOTE(review): no file write happens in this function — "Patch applied."
    # presumably reflects work done elsewhere; confirm against the caller.
    try:
        answer = input(ui.c(ui.CYAN, " Apply this patch? [y/N] "))
        if answer.strip().lower() == "y":
            print(ui.ok("Patch applied."), file=sys.stderr)
        else:
            print(ui.info("Patch saved but not applied."), file=sys.stderr)
    except (KeyboardInterrupt, EOFError):
        print(file=sys.stderr)
        print(ui.info("Patch saved but not applied."), file=sys.stderr)
    print(file=sys.stderr)
    ui.suggest_next("propose")
1039# ---------------------------------------------------------------------------
1040# Watch command implementation
1041# ---------------------------------------------------------------------------
# File extensions treated as source code by watch mode.
_WATCHED_EXTENSIONS: frozenset[str] = frozenset(
    {".py", ".rs", ".go", ".ts", ".tsx", ".js", ".jsx"}
)

# Per-repo location where watch mode persists its state between runs.
_WATCH_STATE_DIR = ".documint"
_WATCH_STATE_FILE = "watch-state.json"

# ANSI color helpers
_GREEN = "\033[32m"
_RED = "\033[31m"
_YELLOW = "\033[33m"
_CYAN = "\033[36m"
_RESET = "\033[0m"
1058def _determine_severity(
1059 removed: list[Any], added: list[Any], changed: list[Any]
1060) -> str:
1061 if removed or changed:
1062 return "HIGH"
1063 if added:
1064 return "MEDIUM"
1065 return "LOW"
1068def _file_sha256(content: str) -> str:
1069 return "sha256:" + hashlib.sha256(content.encode("utf-8")).hexdigest()
1072def _load_watch_state(state_path: Path) -> dict[str, Any]:
1073 if not state_path.exists():
1074 return {"last_updated": None, "files": {}}
1075 try:
1076 return json.loads(state_path.read_text(encoding="utf-8"))
1077 except (json.JSONDecodeError, OSError):
1078 return {"last_updated": None, "files": {}}
1081def _save_watch_state(state_path: Path, state: dict[str, Any]) -> None:
1082 state["last_updated"] = datetime.now(UTC).isoformat()
1083 state_path.parent.mkdir(parents=True, exist_ok=True)
1084 state_path.write_text(json.dumps(state, indent=2), encoding="utf-8")
def _collect_source_files(root: Path) -> dict[str, str]:
    """Walk the project tree and return {relative_path: content} for watched extensions."""
    skip_names = ("node_modules", "__pycache__", "target", "dist", "build", ".git", "venv", ".venv")
    collected: dict[str, str] = {}
    for candidate in root.rglob("*"):
        if not candidate.is_file() or candidate.suffix not in _WATCHED_EXTENSIONS:
            continue
        rel = candidate.relative_to(root)
        # Ignore anything under hidden directories or common build/vendor trees.
        if any(part.startswith(".") or part in skip_names for part in rel.parts):
            continue
        try:
            collected[str(rel)] = candidate.read_text(encoding="utf-8", errors="replace")
        except OSError:
            continue
    return collected
def _extract_file_symbols(content: str, path: str) -> list[dict[str, Any]]:
    """Extract symbols from one file as compact dicts: n=name, k=kind, s=signature."""
    from .symbol_extractor import extract_symbols

    compact: list[dict[str, Any]] = []
    for entry in extract_symbols(content, path=path):
        compact.append({"n": entry.name, "k": entry.kind, "s": entry.signature()})
    return compact
def _build_initial_state(root: Path) -> dict[str, Any]:
    """Scan all source files and build the initial watch state.

    Each file entry stores a content hash (cheap change detection) plus the
    extracted symbol list (for later diffing).
    """
    entries: dict[str, Any] = {
        rel_path: {
            "hash": _file_sha256(text),
            "symbols": _extract_file_symbols(text, rel_path),
        }
        for rel_path, text in _collect_source_files(root).items()
    }
    return {
        "last_updated": datetime.now(UTC).isoformat(),
        "files": entries,
    }
def _load_artifact_specs(root: Path) -> list[dict[str, Any]]:
    """Load artifact specs from config or defaults to match files against.

    Best-effort: any import or attribute failure yields an empty list so the
    watcher can still run without artifact matching.
    """
    try:
        from .repository import DEFAULT_ARTIFACT_SPECS

        specs: list[dict[str, Any]] = []
        for spec in DEFAULT_ARTIFACT_SPECS:
            specs.append(
                {
                    "artifact_key": spec.artifact_key,
                    "source_patterns": list(spec.source_patterns),
                }
            )
        return specs
    except Exception:
        return []
1154def _match_artifacts(rel_path: str, artifact_specs: list[dict[str, Any]]) -> list[str]:
1155 """Return artifact keys whose source_patterns match the given relative path."""
1156 matched: list[str] = []
1157 for spec in artifact_specs:
1158 for pattern in spec["source_patterns"]:
1159 if fnmatch.fnmatch(rel_path, pattern):
1160 matched.append(spec["artifact_key"])
1161 break
1162 return matched
1165def _compute_diff(
1166 old_symbols: list[dict[str, Any]], new_symbols: list[dict[str, Any]]
1167) -> tuple[list[dict[str, Any]], list[dict[str, Any]], list[tuple[dict[str, Any], dict[str, Any]]]]:
1168 """Compare old and new symbol lists.
1170 Returns (removed, added, changed) where changed is a list of (old, new) pairs
1171 for symbols with the same name but different signatures.
1172 """
1173 old_by_name: dict[str, dict[str, Any]] = {s["n"]: s for s in old_symbols}
1174 new_by_name: dict[str, dict[str, Any]] = {s["n"]: s for s in new_symbols}
1176 old_names = set(old_by_name.keys())
1177 new_names = set(new_by_name.keys())
1179 removed = [old_by_name[n] for n in sorted(old_names - new_names)]
1180 added = [new_by_name[n] for n in sorted(new_names - old_names)]
1181 changed: list[tuple[dict[str, Any], dict[str, Any]]] = []
1183 for name in sorted(old_names & new_names):
1184 if old_by_name[name]["s"] != new_by_name[name]["s"]:
1185 changed.append((old_by_name[name], new_by_name[name]))
1187 return removed, added, changed
1190def _format_symbol_display(sym: dict[str, Any]) -> str:
1191 """Format a symbol for display. Uses the signature string."""
1192 return sym["s"]
def _print_diff(
    rel_path: str,
    removed: list[dict[str, Any]],
    added: list[dict[str, Any]],
    changed: list[tuple[dict[str, Any], dict[str, Any]]],
    artifact_specs: list[dict[str, Any]],
) -> None:
    """Print a rich diff block for a changed file.

    In TTY mode the decorated header and artifact lines go to stderr via the
    `ui` helpers; in plain mode everything goes to stdout with raw ANSI codes.
    NOTE(review): the per-symbol +/- lines below always print to stdout even
    in TTY mode, while the header goes to stderr -- confirm this split is
    intended (e.g. for piping diffs while keeping decorations on stderr).
    """
    timestamp = datetime.now().strftime("%H:%M:%S")

    if _is_tty():
        print(ui.dither_divider(), file=sys.stderr)
        print(f" {ui.c(ui.WHITE, f'[{timestamp}]')} {ui.bold(rel_path)}", file=sys.stderr)
    else:
        print(f"[{timestamp}] {rel_path}")

    # Removed symbols render as deletions.
    for sym in removed:
        print(f" {_RED}-{_RESET} {_format_symbol_display(sym)}")

    # Changed symbols render as a deletion of the old signature plus an
    # addition of the new one.
    for old_sym, new_sym in changed:
        print(f" {_RED}-{_RESET} {_format_symbol_display(old_sym)}")
        print(f" {_GREEN}+{_RESET} {_format_symbol_display(new_sym)}")

    for sym in added:
        print(f" {_GREEN}+{_RESET} {_format_symbol_display(sym)}")

    severity = _determine_severity(removed, added, changed)
    matched_artifacts = _match_artifacts(rel_path, artifact_specs)

    # One line per matched artifact, each with a suggested follow-up command.
    for artifact_key in matched_artifacts:
        if _is_tty():
            sev_badge = ui.severity_badge(severity)
            print(f" {ui.c(ui.YELLOW, '*')} Artifact: {artifact_key} ({sev_badge} drift)", file=sys.stderr)
            print(f" {ui.c(ui.CYAN, '->')} Run: documint propose --artifact-id {artifact_key}", file=sys.stderr)
        else:
            print(f" {_YELLOW}*{_RESET} Artifact: {artifact_key} ({severity} drift)")
            print(
                f" {_CYAN}->{_RESET} Run: documint propose --artifact-id {artifact_key}"
            )

    # No artifact matched: still surface the severity so the change is visible.
    if not matched_artifacts:
        if _is_tty():
            sev_badge = ui.severity_badge(severity)
            print(f" {ui.c(ui.YELLOW, '*')} Severity: {sev_badge} drift (no artifact match)", file=sys.stderr)
        else:
            print(f" {_YELLOW}*{_RESET} Severity: {severity} drift (no artifact match)")
def _flush_watch_state_if_dirty(state_path: Path, state: dict[str, Any]) -> None:
    """Write watch state to disk if dirty and the 1-second quiet period has elapsed."""
    global _watch_state_dirty, _watch_last_save
    if not _watch_state_dirty:
        return
    if time.time() - _watch_last_save <= 1.0:
        # Within the debounce window; a later tick will flush.
        return
    _save_watch_state(state_path, state)
    _watch_state_dirty = False
    _watch_last_save = time.time()
def _mark_watch_state_dirty() -> None:
    """Flag the in-memory watch state as needing a flush to disk.

    The actual write is debounced by _flush_watch_state_if_dirty, so batch
    operations cost at most one disk write per quiet period.
    """
    global _watch_state_dirty
    _watch_state_dirty = True
def _process_file_change(
    rel_path: str,
    root: Path,
    state: dict[str, Any],
    state_path: Path,
    artifact_specs: list[dict[str, Any]],
    change_counts: dict[str, Any],
) -> None:
    """Handle a single file change: re-extract, diff, print, update state.

    Mutates ``state["files"]`` and ``change_counts`` in place, then marks the
    watch state dirty for a later debounced flush.
    NOTE(review): ``state_path`` is accepted but not used in this body -- the
    disk write happens in the debounced flusher; confirm the parameter can be
    dropped at the call sites.
    """
    abs_path = root / rel_path
    if not abs_path.exists() or not abs_path.is_file():
        # File was deleted: report every previously-known symbol as removed.
        old_entry = state["files"].get(rel_path)
        if old_entry:
            old_symbols = old_entry.get("symbols", [])
            if old_symbols:
                removed, added, changed = old_symbols, [], []
                _print_diff(rel_path, removed, added, changed, artifact_specs)
                change_counts["changes"] += 1
                matched = _match_artifacts(rel_path, artifact_specs)
                for a in matched:
                    change_counts["artifacts"].add(a)  # type: ignore[union-attr]
            del state["files"][rel_path]
            _mark_watch_state_dirty()
        return

    try:
        content = abs_path.read_text(encoding="utf-8", errors="replace")
    except OSError:
        # Transient read failure (e.g. editor mid-save); skip this event.
        return

    new_hash = _file_sha256(content)
    old_entry = state["files"].get(rel_path, {})
    old_hash = old_entry.get("hash")

    if new_hash == old_hash:
        return  # Content unchanged (e.g., save without edits)

    new_symbols = _extract_file_symbols(content, rel_path)
    old_symbols = old_entry.get("symbols", [])

    removed, added, changed = _compute_diff(old_symbols, new_symbols)

    # Only report when the symbol-level diff is non-empty; whitespace or
    # body-only edits change the hash but not the symbols.
    if removed or added or changed:
        _print_diff(rel_path, removed, added, changed, artifact_specs)
        change_counts["changes"] += 1
        matched = _match_artifacts(rel_path, artifact_specs)
        for a in matched:
            change_counts["artifacts"].add(a)  # type: ignore[union-attr]

    # Update state regardless (hash changed even if symbols didn't)
    state["files"][rel_path] = {
        "hash": new_hash,
        "symbols": new_symbols,
    }
    _mark_watch_state_dirty()
def _run_watch(args: argparse.Namespace) -> int:
    """Main entry point for the watch command.

    Builds (or loads) a symbol snapshot of the project under ``.documint/``,
    then watches for source changes using watchdog when installed, falling
    back to mtime polling otherwise.

    Args:
        args: parsed CLI namespace; ``poll_interval`` is used by the
            polling fallback.

    Returns:
        Process exit code (0 on clean shutdown).
    """
    root = Path.cwd().resolve()
    state_dir = root / _WATCH_STATE_DIR
    state_dir.mkdir(parents=True, exist_ok=True)
    state_path = state_dir / _WATCH_STATE_FILE

    artifact_specs = _load_artifact_specs(root)

    # Build or load initial state; a cached state skips a full re-index.
    existing_state = _load_watch_state(state_path)
    if existing_state.get("files"):
        state = existing_state
        file_count = len(state["files"])
        if _is_tty():
            print(ui.ok(f"Loaded cached watch state ({file_count} files)"), file=sys.stderr)
        else:
            print(f"Loaded cached watch state ({file_count} files).")
    else:
        if _is_tty():
            print(ui.info("Building initial symbol state..."), file=sys.stderr)
        else:
            print("Building initial symbol state...")
        state = _build_initial_state(root)
        _save_watch_state(state_path, state)
        file_count = len(state["files"])
        if _is_tty():
            print(ui.ok(f"Indexed {file_count} source files"), file=sys.stderr)
        else:
            print(f"Indexed {file_count} source files.")

    artifact_count = len(artifact_specs)

    # Session counters; "artifacts" collects distinct artifact keys touched.
    change_counts: dict[str, Any] = {
        "changes": 0,
        "artifacts": set(),
        "files_watched": file_count,
    }

    # Graceful shutdown: flip a flag instead of letting KeyboardInterrupt
    # propagate, so the watch loop can flush state before exiting.
    shutdown_requested = False

    def _handle_sigint(signum: int, frame: Any) -> None:
        nonlocal shutdown_requested
        shutdown_requested = True

    signal.signal(signal.SIGINT, _handle_sigint)

    if _is_tty():
        ui.print_banner()
        print(ui.ok(f"Watching {file_count} files across {artifact_count} artifacts"), file=sys.stderr)
        print(ui.info("Press Ctrl+C to stop"), file=sys.stderr)
        print(file=sys.stderr)
        print(ui.dither_divider(), file=sys.stderr)
    else:
        # BUGFIX: was an f-string with no placeholders (ruff F541).
        print("Watching for changes... (Ctrl+C to stop)")

    # Try watchdog first, fall back to polling when it is not installed.
    try:
        from watchdog.events import FileSystemEventHandler
        from watchdog.observers import Observer

        return _run_watchdog(
            root, state, state_path, artifact_specs, change_counts,
            Observer, FileSystemEventHandler, shutdown_requested,
            lambda: shutdown_requested,
        )
    except ImportError:
        return _run_polling(
            root, state, state_path, artifact_specs, change_counts,
            args.poll_interval, lambda: shutdown_requested,
        )
def _run_watchdog(
    root: Path,
    state: dict[str, Any],
    state_path: Path,
    artifact_specs: list[dict[str, Any]],
    change_counts: dict[str, Any],
    observer_cls: type,
    handler_base: type,
    _shutdown_requested: bool,
    is_shutdown: Any,
) -> int:
    """Run with watchdog-based file watching.

    ``observer_cls`` / ``handler_base`` are the watchdog Observer and
    FileSystemEventHandler classes, injected by the caller so this function
    has no hard watchdog import.  ``is_shutdown`` is a zero-argument callable
    polled every 0.5s.
    NOTE(review): ``_shutdown_requested`` is never read here -- the live flag
    is observed through ``is_shutdown``; confirm the parameter can be dropped
    at the call site.
    """

    class _Handler(handler_base):  # type: ignore[misc]
        # Funnel create/modify/delete file events into one handler path.
        def on_modified(self, event: Any) -> None:
            if event.is_directory:
                return
            self._handle(event.src_path)

        def on_created(self, event: Any) -> None:
            if event.is_directory:
                return
            self._handle(event.src_path)

        def on_deleted(self, event: Any) -> None:
            if event.is_directory:
                return
            self._handle(event.src_path)

        def _handle(self, abs_path_str: str) -> None:
            # Filter to watched extensions and paths inside the project root.
            abs_path = Path(abs_path_str)
            if abs_path.suffix not in _WATCHED_EXTENSIONS:
                return
            try:
                rel_path = str(abs_path.relative_to(root))
            except ValueError:
                # Event for a path outside the watched root; ignore it.
                return
            # Skip hidden/build dirs
            if any(
                p.startswith(".")
                or p in ("node_modules", "__pycache__", "target", "dist", "build", "venv", ".venv")
                for p in Path(rel_path).parts
            ):
                return
            _process_file_change(
                rel_path, root, state, state_path, artifact_specs, change_counts
            )

    observer = observer_cls()
    handler = _Handler()
    observer.schedule(handler, str(root), recursive=True)
    observer.start()

    try:
        # Poll the shutdown flag and periodically flush dirty state while
        # the observer thread delivers events in the background.
        while not is_shutdown():
            time.sleep(0.5)
            _flush_watch_state_if_dirty(state_path, state)
    finally:
        # Final flush before exit to avoid losing state
        if _watch_state_dirty:
            _save_watch_state(state_path, state)
        observer.stop()
        observer.join()

    return _print_summary_and_exit(change_counts)
def _run_polling(
    root: Path,
    state: dict[str, Any],
    state_path: Path,
    artifact_specs: list[dict[str, Any]],
    change_counts: dict[str, Any],
    interval: float,
    is_shutdown: Any,
) -> int:
    """Run with polling-based file watching (fallback when watchdog is unavailable).

    Every *interval* seconds the watched tree is re-scanned; files whose
    mtime advanced, appeared, or disappeared are fed through
    _process_file_change.  ``is_shutdown`` is a zero-argument callable.

    Returns:
        Exit code from the session summary (0).
    """
    print(f" (using polling, interval={interval}s)")

    def _snapshot_mtimes() -> dict[str, float]:
        # Collect {relative_path: mtime} for watched source files, skipping
        # hidden and common build/vendor directories.  (Previously this walk
        # was duplicated verbatim for the initial scan and every poll tick.)
        snapshot: dict[str, float] = {}
        for child in root.rglob("*"):
            if not child.is_file():
                continue
            if child.suffix not in _WATCHED_EXTENSIONS:
                continue
            rel = child.relative_to(root)
            if any(
                p.startswith(".")
                or p in ("node_modules", "__pycache__", "target", "dist", "build", "venv", ".venv")
                for p in rel.parts
            ):
                continue
            try:
                snapshot[str(rel)] = child.stat().st_mtime
            except OSError:
                pass
        return snapshot

    # Baseline mtimes for efficient change detection.
    mtimes = _snapshot_mtimes()

    while not is_shutdown():
        time.sleep(interval)
        if is_shutdown():
            break

        current_files = _snapshot_mtimes()

        # Check for modified or new files.
        for rel_path, mtime in current_files.items():
            old_mtime = mtimes.get(rel_path)
            if old_mtime is None or mtime > old_mtime:
                _process_file_change(
                    rel_path, root, state, state_path, artifact_specs, change_counts
                )

        # Check for deleted files.
        for rel_path in set(mtimes.keys()) - set(current_files.keys()):
            _process_file_change(
                rel_path, root, state, state_path, artifact_specs, change_counts
            )

        mtimes = current_files
        _flush_watch_state_if_dirty(state_path, state)

    # Final flush before exit to avoid losing state
    if _watch_state_dirty:
        _save_watch_state(state_path, state)
    return _print_summary_and_exit(change_counts)
1536def _print_summary_and_exit(change_counts: dict[str, Any]) -> int:
1537 """Print the session summary and return exit code."""
1538 files = change_counts["files_watched"]
1539 changes = change_counts["changes"]
1540 artifacts = change_counts["artifacts"]
1541 artifact_count = len(artifacts) if isinstance(artifacts, set) else artifacts
1542 print(
1543 f"\nWatched {files} files. "
1544 f"Detected {changes} changes in {artifact_count} artifacts."
1545 )
1546 return 0
1549# ---------------------------------------------------------------------------
1550# CI command implementation
1551# ---------------------------------------------------------------------------
def _run_ci(args: argparse.Namespace) -> int:
    """Non-interactive drift check for CI pipelines.

    Triggers a drift job for the project, prints findings in the requested
    format (``text``, ``json``, or ``github``), and exits non-zero according
    to the ``--fail-on-high`` / ``--fail-on-any`` flags.

    Returns:
        0 when no failure condition is met (calls sys.exit(1) otherwise).
    """
    drift_result = _request(
        args,
        "POST",
        "/jobs/drift",
        json_body={
            "project_id": args.project_id,
            "signal_type": "manual",
            "changed_files": [],
        },
    )

    findings: list[dict[str, Any]] = drift_result.get("findings", [])

    fmt = args.ci_format
    if fmt == "json":
        _ci_output_json(findings)
    elif fmt == "github":
        _ci_output_github(findings)
    else:
        _ci_output_text(findings)

    # BUGFIX: previously --fail-on-high returned 0 immediately when no HIGH
    # finding existed, so a combined "--fail-on-high --fail-on-any" run
    # silently ignored --fail-on-any.  Evaluate both flags independently.
    if args.fail_on_high and any(
        str(f.get("severity", "")).lower() == "high" for f in findings
    ):
        sys.exit(1)
    if args.fail_on_any and findings:
        sys.exit(1)

    return 0
1593def _ci_output_text(findings: list[dict[str, Any]]) -> None:
1594 """Print findings as a plain-text table."""
1595 if not findings:
1596 print("No drift findings detected.")
1597 return
1599 # Column widths
1600 id_width = max(len(str(f.get("artifact_id", ""))) for f in findings)
1601 id_width = max(id_width, len("Artifact"))
1602 sev_width = max(len(str(f.get("severity", ""))) for f in findings)
1603 sev_width = max(sev_width, len("Severity"))
1605 header = f"{'Artifact':<{id_width}} {'Severity':<{sev_width}} Summary"
1606 print(header)
1607 print("-" * len(header))
1608 for f in findings:
1609 artifact_id = str(f.get("artifact_id", ""))
1610 severity = str(f.get("severity", "")).upper()
1611 summary = str(f.get("summary", ""))
1612 print(f"{artifact_id:<{id_width}} {severity:<{sev_width}} {summary}")
1614 print(f"\n{len(findings)} finding(s) detected.")
1617def _ci_output_json(findings: list[dict[str, Any]]) -> None:
1618 """Print findings as a JSON array."""
1619 compact = [
1620 {
1621 "artifact_id": f.get("artifact_id", ""),
1622 "severity": str(f.get("severity", "")).upper(),
1623 "summary": f.get("summary", ""),
1624 }
1625 for f in findings
1626 ]
1627 print(json.dumps(compact, indent=2))
1630def _ci_output_github(findings: list[dict[str, Any]]) -> None:
1631 """Print findings as GitHub Actions markdown and optionally write to step summary."""
1632 lines: list[str] = ["## Documint Drift Report", ""]
1634 if findings:
1635 lines.append("| Artifact | Severity | Summary |")
1636 lines.append("|----------|----------|---------|")
1637 for f in findings:
1638 artifact_id = str(f.get("artifact_id", ""))
1639 severity = str(f.get("severity", "")).upper()
1640 summary = str(f.get("summary", ""))
1641 lines.append(f"| {artifact_id} | {severity} | {summary} |")
1642 lines.append("")
1643 lines.append(f"**{len(findings)} finding(s) detected.**")
1644 else:
1645 lines.append("No drift findings detected.")
1647 markdown = "\n".join(lines)
1648 print(markdown)
1650 summary_path = os.environ.get("GITHUB_STEP_SUMMARY")
1651 if summary_path:
1652 try:
1653 with open(summary_path, "a", encoding="utf-8") as fh:
1654 fh.write(markdown + "\n")
1655 except OSError:
1656 pass
1659# ---------------------------------------------------------------------------
1660# Coverage command implementation
1661# ---------------------------------------------------------------------------
def _run_coverage() -> int:
    """Show documentation coverage -- what percentage of symbols are documented.

    For each artifact spec, symbols are extracted from matching source files
    and a symbol counts as "documented" when its name appears anywhere in the
    spec's doc files (a substring check, not a semantic one).

    Returns:
        0 always; the report is informational.
    """
    from .repository import DEFAULT_ARTIFACT_SPECS
    from .symbol_extractor import extract_symbols

    root = Path.cwd().resolve()
    use_color = sys.stdout.isatty()

    total_documented = 0
    total_symbols = 0

    rows: list[tuple[str, int, int]] = []

    for spec in DEFAULT_ARTIFACT_SPECS:
        # Collect source files matching source_patterns
        source_files: dict[str, str] = {}
        for pattern in spec.source_patterns:
            for match_path in root.glob(pattern):
                if match_path.is_file() and match_path.suffix in _WATCHED_EXTENSIONS:
                    try:
                        content = match_path.read_text(encoding="utf-8", errors="replace")
                        rel = str(match_path.relative_to(root))
                        source_files[rel] = content
                    except (OSError, ValueError):
                        continue

        # Extract unique symbol names, preserving first-seen order.
        # PERF: was an O(n^2) "name not in list" membership test.
        seen: set[str] = set()
        symbols: list[str] = []
        for path, content in source_files.items():
            for entry in extract_symbols(content, path=path):
                if entry.name not in seen:
                    seen.add(entry.name)
                    symbols.append(entry.name)

        if not symbols:
            continue

        # Read documentation files and check symbol coverage
        doc_contents: list[str] = []
        for doc_path in spec.doc_paths:
            full_doc = root / doc_path
            try:
                doc_contents.append(full_doc.read_text(encoding="utf-8", errors="replace"))
            except OSError:
                # Missing or unreadable docs simply contribute nothing.
                continue

        merged_docs = "\n".join(doc_contents)

        documented_count = sum(1 for sym_name in symbols if sym_name in merged_docs)

        rows.append((spec.title, documented_count, len(symbols)))
        total_documented += documented_count
        total_symbols += len(symbols)

    if not rows:
        print("No artifacts with extractable symbols found.")
        return 0

    # Print coverage report (decorations go to stderr in TTY mode).
    if _is_tty():
        print(ui.dither_divider(), file=sys.stderr)
        print(file=sys.stderr)

    # Find widest title for alignment
    max_title = max(len(title) for title, _, _ in rows)

    for title, doc_count, sym_count in rows:
        pct = (doc_count / sym_count * 100) if sym_count > 0 else 0.0
        sym_width = len(str(sym_count))
        count_str = f"{doc_count:>{sym_width}}/{sym_count}"

        # Traffic-light indicator: >=80% good, >=50% warning, otherwise bad.
        if pct >= 80.0:
            indicator = _colorize("\u2713", _GREEN, use_color)
        elif pct >= 50.0:
            indicator = _colorize("\u26a0", _YELLOW, use_color)
        else:
            indicator = _colorize("\u2717", _RED, use_color)

        print(
            f"{title + ':':<{max_title + 1}} {count_str} symbols documented "
            f"({pct:5.1f}%) {indicator}"
        )

    overall_pct = (total_documented / total_symbols * 100) if total_symbols > 0 else 0.0
    print(f"\nOverall: {total_documented}/{total_symbols} ({overall_pct:.1f}%)")

    if _is_tty():
        print(ui.progress_bar(total_documented, total_symbols), file=sys.stderr)
        print(file=sys.stderr)
        print(ui.dither_divider(), file=sys.stderr)
        print(file=sys.stderr)
        ui.suggest_next("coverage")

    return 0
def _colorize(text: str, color: str, use_color: bool) -> str:
    """Wrap *text* in ANSI color codes when color output is enabled."""
    return f"{color}{text}{_RESET}" if use_color else text
1774# ---------------------------------------------------------------------------
1775# Changelog hook for propose command
1776# ---------------------------------------------------------------------------
def _append_changelog(args: argparse.Namespace, patch_result: dict[str, Any]) -> None:
    """Append a changelog entry after a successful patch proposal.

    Best-effort: silently does nothing when no CHANGELOG.md exists or the
    write fails, so the propose flow is never interrupted.
    """
    root = Path.cwd().resolve()

    # Prefer a changelog under the configured docs root, then the project root.
    docs_root = root / getattr(settings, "docs_root", "")
    changelog_candidates = [
        docs_root / "CHANGELOG.md",
        root / "CHANGELOG.md",
    ]

    changelog_path: Path | None = None
    for candidate in changelog_candidates:
        if candidate.is_file():
            changelog_path = candidate
            break

    if changelog_path is None:
        return

    # Best available identifier: server response, then CLI args, then "unknown".
    artifact_id = (
        patch_result.get("artifact_id")
        or getattr(args, "artifact_id", None)
        or getattr(args, "finding_id", None)
        or "unknown"
    )
    summary = patch_result.get("summary", "Documentation patch proposed.")
    timestamp = datetime.now(UTC).strftime("%Y-%m-%d %H:%M")

    entry = f"\n### [auto] {artifact_id} \u2014 {timestamp}\n- {summary}\n"

    try:
        with open(changelog_path, "a", encoding="utf-8") as fh:
            fh.write(entry)
        # BUGFIX: was an f-string with no placeholders (ruff F541).
        print("\u2713 Appended to CHANGELOG.md")
    except OSError:
        pass