Coverage for src/documint_mcp/cli.py: 0%

973 statements  

coverage.py v7.13.5, created at 2026-03-29 10:41 -0400

1"""CLI entrypoint for hosted Documint workflows.""" 

2 

3from __future__ import annotations 

4 

5import argparse 

6import fnmatch 

7import hashlib 

8import json 

9import os 

10import signal 

11import sys 

12import time 

13from datetime import UTC, datetime 

14from pathlib import Path 

15from typing import Any, cast 

16from urllib.parse import parse_qs, urlparse 

17 

18import httpx 

19 

20from . import cli_ui as ui 

21from .config import settings 

22 

23CLI_CONFIG_PATH = Path.home() / ".documint" / "cli.json" 

24 

25# When True, suppress formatted output and emit raw JSON only. 

26_JSON_MODE = False 

27 

28# Watch state debounce: avoid N disk writes during batch operations. 

29_watch_state_dirty = False 

30_watch_last_save: float = 0.0 

31 

32# Known subcommand names for natural-language detection bypass. 

33_KNOWN_COMMANDS = frozenset({ 

34 "init", "signup", "login", "workspaces", "projects", "scan", "drift", "patches", 

35 "publish", "traces", "db", "worker", "mint-generate", "mint-export", 

36 "watch", "ci", "coverage", "propose", "preview", "trace", "chat", 

37}) 

38 

39 

40def _is_tty() -> bool: 

41 """Return True when stdout is a TTY and JSON mode is not forced.""" 

42 return not _JSON_MODE and hasattr(sys.stdout, "isatty") and sys.stdout.isatty() 

43 

44 

45def _emit(payload: Any) -> None: 

46 """Emit raw JSON to stdout (always, for test compatibility).""" 

47 print(json.dumps(payload, indent=2)) 

48 

49 

50def _load_cli_config() -> dict[str, Any]: 

51 if not CLI_CONFIG_PATH.exists(): 

52 return {} 

53 try: 

54 return json.loads(CLI_CONFIG_PATH.read_text(encoding="utf-8")) 

55 except json.JSONDecodeError: 

56 return {} 

57 

58 

59def _save_cli_config(payload: dict[str, Any]) -> None: 

60 CLI_CONFIG_PATH.parent.mkdir(parents=True, exist_ok=True) 

61 CLI_CONFIG_PATH.write_text(json.dumps(payload, indent=2), encoding="utf-8") 

62 

63 

64def _api_url(args: argparse.Namespace) -> str: 
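    # Resolution order: explicit --api-url flag, then ~/.documint/cli.json, then the
    # DOCUMINT_API_URL environment variable, then settings.api_base_url.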

65 config = _load_cli_config() 

66 return str( 

67 getattr(args, "api_url", None) 

68 or config.get("api_url") 

69 or os.getenv("DOCUMINT_API_URL") 

70 or settings.api_base_url 

71 ).rstrip("/") 

72 

73 

74def _token(args: argparse.Namespace) -> str | None: 

75 config = _load_cli_config() 

76 explicit = getattr(args, "token", None) 

77 return ( 

78 explicit 

79 or config.get("token") 

80 or os.getenv("DOCUMINT_TOKEN") 

81 or os.getenv("DOCUMINT_AUTH_TOKEN") 

82 or settings.auth_token 

83 ) 

84 

85 

86def _headers(args: argparse.Namespace) -> dict[str, str]: 

87 token = _token(args) 

88 if not token: 

89 return {} 

90 return {"Authorization": f"Bearer {token}"} 

91 

92 

93def _request( 

94 args: argparse.Namespace, 

95 method: str, 

96 path: str, 

97 *, 

98 json_body: dict[str, Any] | None = None, 

99) -> Any: 
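    # On connection, timeout, or HTTP-status errors the request falls back to the
    # in-process service via _local_request, so a subset of commands still works
    # without a reachable API server.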

100 try: 

101 with httpx.Client(timeout=20.0) as client: 

102 response = client.request( 

103 method, 

104 f"{_api_url(args)}{path}", 

105 headers=_headers(args), 

106 json=json_body, 

107 ) 

108 response.raise_for_status() 

109 return response.json() 

110 except (httpx.ConnectError, httpx.TimeoutException, httpx.HTTPStatusError): 

111 return _local_request(method, path, json_body=json_body) 

112 

113 

114def _local_request( 

115 method: str, 

116 path: str, 

117 *, 

118 json_body: dict[str, Any] | None = None, 

119) -> Any: 

120 from fastapi.encoders import jsonable_encoder 

121 

122 from .models import DriftJobRequest, ProjectCreateRequest 

123 from .repository import get_service 

124 

125 service = get_service() 

126 parsed = urlparse(path) 

127 route = parsed.path 

128 query = parse_qs(parsed.query) 

129 if method == "GET" and route.startswith("/projects/"): 

130 project_id = route.rsplit("/", 1)[-1] 

131 return jsonable_encoder(service.get_project(project_id)) 

132 if method == "GET" and route == "/workspaces": 

133 return jsonable_encoder(service.list_workspaces()) 

134 if method == "POST" and route == "/admin/bootstrap-self": 

135 return jsonable_encoder(service.bootstrap_self()) 

136 if method == "GET" and route == "/projects": 

137 workspace_id = query.get("workspace_id", [None])[0] 

138 return jsonable_encoder(service.list_projects(workspace_id)) 

139 if ( 

140 method == "GET" 

141 and route.startswith("/integrations/github/installations/") 

142 and route.endswith("/repositories") 

143 ): 

144 installation_id = route.split("/")[3] 

145 return jsonable_encoder(service.list_installation_repositories(installation_id)) 

146 if ( 

147 method == "POST" 

148 and route.startswith("/integrations/github/installations/") 

149 and route.endswith("/sync") 

150 ): 

151 installation_id = route.split("/")[3] 

152 return jsonable_encoder(service.sync_installation(installation_id)) 

153 if method == "POST" and route == "/projects" and json_body is not None: 

154 return jsonable_encoder( 

155 service.create_project(ProjectCreateRequest(**json_body)) 

156 ) 

157 if method == "POST" and route == "/jobs/drift" and json_body is not None: 

158 return jsonable_encoder(service.run_drift(DriftJobRequest(**json_body))) 

159 if method == "POST" and route == "/jobs/publish" and json_body is not None: 

160 return jsonable_encoder(service.publish_preview(str(json_body["project_id"]))) 

161 if method == "GET" and route.startswith("/artifacts/") and route.endswith("/trace"): 

162 artifact_id = route.split("/")[2] 

163 project_id = ( 

164 query.get("project_id", [settings.project_id])[0] or settings.project_id 

165 ) 

166 return jsonable_encoder(service.get_artifact_trace(artifact_id, project_id)) 

167 if method == "POST" and "/findings/" in route and route.endswith("/patch"): 

168 parts = route.strip("/").split("/") 

169 project_id = parts[1] 

170 finding_id = parts[3] 

171 policy = str((json_body or {}).get("policy", "on_demand")) 

172 return jsonable_encoder( 

173 service.generate_doc_patch( 

174 project_id=project_id, finding_id=finding_id, policy=policy 

175 ) 

176 ) 

177 if method == "POST" and route == "/mcp" and json_body is not None: 

178 params = json_body.get("params", {}) 

179 arguments = params.get("arguments", {}) if isinstance(params, dict) else {} 

180 if params.get("name") == "generate_doc_patch" and isinstance(arguments, dict): 

181 return { 

182 "result": { 

183 "structuredContent": jsonable_encoder( 

184 service.generate_doc_patch( 

185 project_id=arguments.get("project_id"), 

186 artifact_id=arguments.get("artifact_id"), 

187 finding_id=arguments.get("finding_id"), 

188 policy=arguments.get("policy", "on_demand"), 

189 ) 

190 ) 

191 } 

192 } 

193 if _is_tty(): 

194 ui.friendly_error( 

195 f"Unsupported local fallback: {method} {path}", 

196 hint="This command requires the Documint API. Start the server with: documint-server", 

197 ) 

198 raise SystemExit(1) 

199 raise RuntimeError(f"Unsupported local CLI fallback request: {method} {path}") 

200 

201 

202def build_parser() -> argparse.ArgumentParser: 

203 """Create the top-level Documint CLI parser.""" 

204 

205 parser = argparse.ArgumentParser( 

206 prog="documint", 

207 description="Documint CLI for hosted repo-native docs operations.", 

208 ) 

209 parser.add_argument( 

210 "--api-url", 

211 help="Override the API base URL used by the CLI.", 

212 ) 

213 parser.add_argument( 

214 "--token", 

215 help="Override the API token used by the CLI.", 

216 ) 

217 parser.add_argument( 

218 "--json", 

219 action="store_true", 

220 default=False, 

221 dest="json_mode", 

222 help="Force raw JSON output (no formatted display).", 

223 ) 

224 subparsers = parser.add_subparsers(dest="command", required=True) 

225 

226 subparsers.add_parser( 

227 "init", 

228 help="Set up Documint in the current repo. Run this first.", 

229 ) 

230 

231 signup_parser = subparsers.add_parser( 

232 "signup", 

233 help="Join the Documint waitlist.", 

234 ) 

235 signup_parser.add_argument("--email", required=True, help="Your email address.") 

236 

237 login_parser = subparsers.add_parser( 

238 "login", 

239 help="Exchange the current operator token for a scoped CLI token and store it locally.", 

240 ) 

241 login_parser.add_argument( 

242 "--workspace-id", 

243 default=settings.default_workspace_id, 

244 help="Workspace id to mint the CLI token for.", 

245 ) 

246 login_parser.add_argument( 

247 "--label", 

248 default="CLI token", 

249 help="Friendly label for the stored CLI token.", 

250 ) 

251 

252 workspaces_parser = subparsers.add_parser( 

253 "workspaces", help="Workspace operations." 

254 ) 

255 workspaces_subparsers = workspaces_parser.add_subparsers( 

256 dest="workspaces_command", required=True 

257 ) 

258 workspaces_subparsers.add_parser("list", help="List available workspaces.") 

259 workspaces_subparsers.add_parser( 

260 "bootstrap-self", 

261 help="Explicitly seed the self-dogfood workspace, project, and installation metadata.", 

262 ) 

263 

264 projects_parser = subparsers.add_parser("projects", help="Project operations.") 

265 projects_subparsers = projects_parser.add_subparsers( 

266 dest="projects_command", required=True 

267 ) 

268 projects_list = projects_subparsers.add_parser("list", help="List projects.") 

269 projects_list.add_argument("--workspace-id", help="Optional workspace id filter.") 

270 projects_create = projects_subparsers.add_parser( 

271 "create", help="Create a hosted project connected to a GitHub repo." 

272 ) 

273 projects_create.add_argument("--workspace-id", required=True) 

274 projects_create.add_argument("--name", required=True) 

275 projects_create.add_argument("--slug") 

276 projects_create.add_argument("--description", default="") 

277 projects_create.add_argument("--owner", required=True) 

278 projects_create.add_argument("--repo", required=True) 

279 projects_create.add_argument("--default-branch", default="main") 

280 

281 scan_parser = subparsers.add_parser( 

282 "scan", 

283 help="Return the current project snapshot and artifact verification state.", 

284 ) 

285 scan_parser.add_argument("--project-id", default=settings.project_id) 

286 

287 drift_parser = subparsers.add_parser( 

288 "drift", 

289 help="Run the doc drift detector for a project.", 

290 ) 

291 drift_parser.add_argument("--project-id", default=settings.project_id) 

292 drift_parser.add_argument( 

293 "--signal-type", 

294 default="manual", 

295 choices=["manual", "push", "pull_request", "release"], 

296 ) 

297 drift_parser.add_argument( 

298 "--changed-file", 

299 action="append", 

300 default=[], 

301 help="Changed file path to scope findings. Repeat for multiple files.", 

302 ) 

303 

304 patches_parser = subparsers.add_parser("patches", help="Patch drafting operations.") 

305 patches_subparsers = patches_parser.add_subparsers( 

306 dest="patches_command", required=True 

307 ) 

308 patches_propose = patches_subparsers.add_parser( 

309 "propose", 

310 help="Generate a reviewable doc patch proposal for a finding or artifact.", 

311 ) 

312 patches_propose.add_argument("--project-id", default=settings.project_id) 

313 patches_propose.add_argument("--artifact-id") 

314 patches_propose.add_argument("--finding-id") 

315 patches_propose.add_argument("--policy", default="on_demand") 

316 

317 publish_parser = subparsers.add_parser( 

318 "publish", 

319 help="Create or refresh a hosted publish deployment.", 

320 ) 

321 publish_parser.add_argument("--project-id", default=settings.project_id) 

322 

323 traces_parser = subparsers.add_parser("traces", help="Artifact trace operations.") 

324 traces_subparsers = traces_parser.add_subparsers( 

325 dest="traces_command", required=True 

326 ) 

327 traces_show = traces_subparsers.add_parser( 

328 "show", help="Explain which repository paths drive a documentation artifact." 

329 ) 

330 traces_show.add_argument("artifact_id") 

331 traces_show.add_argument("--project-id", default=settings.project_id) 

332 

333 db_parser = subparsers.add_parser("db", help="Database and migration operations.") 

334 db_subparsers = db_parser.add_subparsers(dest="db_command", required=True) 

335 db_subparsers.add_parser( 

336 "upgrade", help="Apply Alembic migrations to the configured database." 

337 ) 

338 

339 subparsers.add_parser( 

340 "worker", 

341 help="Run the ARQ worker for queued drift, patch, publish, PR, and installation jobs.", 

342 ) 

343 

344 mint_generate_parser = subparsers.add_parser( 

345 "mint-generate", 

346 help="Generate .mint files for all artifact traces in a project.", 

347 ) 

348 mint_generate_parser.add_argument("project_id", help="Project ID or slug") 

349 mint_generate_parser.add_argument( 

350 "--output", 

351 "-o", 

352 default=".mint", 

353 help="Output directory (default: .mint)", 

354 ) 

355 

356 mint_export_parser = subparsers.add_parser( 

357 "mint-export", 

358 help="Export a .mint file to CLAUDE.md, llms.txt, or AGENTS.md.", 

359 ) 

360 mint_export_parser.add_argument("mint_file", help="Path to .mint file") 

361 mint_export_parser.add_argument( 

362 "--format", 

363 "-f", 

364 dest="fmt", 

365 default="claude", 

366 choices=["claude", "llms", "agents", "api-ref"], 

367 help="Export format: claude|llms|agents|api-ref (default: claude)", 

368 ) 

369 mint_export_parser.add_argument( 

370 "--output", 

371 "-o", 

372 default=None, 

373 help="Output file path", 

374 ) 

375 

376 watch_parser = subparsers.add_parser( 

377 "watch", 

378 help="Watch source files for symbol changes and report documentation drift in real time.", 

379 ) 

380 watch_parser.add_argument( 

381 "--poll-interval", 

382 type=float, 

383 default=2.0, 

384 help="Polling interval in seconds when watchdog is not available (default: 2.0).", 

385 ) 

386 

387 ci_parser = subparsers.add_parser( 

388 "ci", 

389 help="Non-interactive drift check for CI pipelines.", 

390 ) 

391 ci_parser.add_argument("--project-id", default=settings.project_id) 

392 ci_parser.add_argument( 

393 "--fail-on-high", 

394 action="store_true", 

395 default=False, 

396 help="Exit 1 only if HIGH severity findings are present.", 

397 ) 

398 ci_parser.add_argument( 

399 "--fail-on-any", 

400 action="store_true", 

401 default=False, 

402 help="Exit 1 if any findings are found.", 

403 ) 

404 ci_parser.add_argument( 

405 "--format", 

406 dest="ci_format", 

407 default="text", 

408 choices=["text", "json", "github"], 

409 help="Output format: text, json, or github (GitHub Actions step summary).", 

410 ) 

411 

412 subparsers.add_parser( 

413 "coverage", 

414 help="Show documentation coverage — what percentage of symbols are documented.", 

415 ) 

416 

417 propose_parser = subparsers.add_parser( 

418 "propose", 

419 help="Compatibility alias for `patches propose`.", 

420 ) 

421 propose_parser.add_argument("--project-id", default=settings.project_id) 

422 propose_parser.add_argument("--artifact-id") 

423 propose_parser.add_argument("--finding-id") 

424 propose_parser.add_argument("--policy", default="on_demand") 

425 

426 preview_parser = subparsers.add_parser( 

427 "preview", 

428 help="Compatibility alias for `publish`.", 

429 ) 

430 preview_parser.add_argument("--project-id", default=settings.project_id) 

431 

432 trace_parser = subparsers.add_parser( 

433 "trace", 

434 help="Compatibility alias for `traces show`.", 

435 ) 

436 trace_parser.add_argument("artifact_id") 

437 trace_parser.add_argument("--project-id", default=settings.project_id) 

438 

439 subparsers.add_parser("chat", help="Interactive conversation mode.") 

440 

441 return parser 

442 

443 

444def _handle_natural_language(query: str) -> int: 

445 """Route a natural-language query to the appropriate command.""" 

446 global _JSON_MODE # noqa: PLW0603 

447 # In NL mode, use formatted output only when stdout is a TTY. 

448 if not (hasattr(sys.stdout, "isatty") and sys.stdout.isatty()): 

449 _JSON_MODE = True 

450 

451 ui.print_banner() 

452 print(ui.c(ui.MAGENTA, f' "{query}"'), file=sys.stderr) 

453 print(file=sys.stderr) 

454 

455 intent = ui.detect_intent(query) 

456 if intent is None: 

457 ui.print_intent_help() 

458 return 0 

459 

460 print(ui.info(f"Detected intent: {intent}"), file=sys.stderr) 

461 print(file=sys.stderr) 

462 

463 # Build a minimal args namespace and dispatch 

464 parser = build_parser() 

465 try: 

466 args = parser.parse_args([intent]) 

467 except SystemExit: 

468 ui.print_intent_help() 

469 return 0 

470 

471 return _dispatch(args, parser) 

472 

473 

474def _run_repl() -> int: 

475 """Interactive conversation-mode REPL.""" 

476 ui.print_banner_full() 

477 print(" Type a question, use /commands, or Ctrl+C to exit.", file=sys.stderr) 

478 print(file=sys.stderr) 

479 

480 parser = build_parser() 

481 

482 while True: 

483 try: 

484 line = input(ui.c(ui.MINT_2, "documint > ") if _is_tty() else "documint > ") 

485 except (KeyboardInterrupt, EOFError): 

486 print(file=sys.stderr) 

487 print(ui.info("Goodbye."), file=sys.stderr) 

488 return 0 

489 

490 line = line.strip() 

491 if not line: 

492 continue 

493 

494 if line.startswith("/"): 

495 cmd = line.split()[0].lower() 

496 if cmd in {"/quit", "/exit"}: 

497 print(ui.info("Goodbye."), file=sys.stderr) 

498 return 0 

499 if cmd == "/help": 

500 ui.print_slash_help() 

501 continue 

502 if cmd == "/drift": 

503 try: 

504 args = parser.parse_args(["drift"]) 

505 _dispatch(args, parser) 

506 except SystemExit: 

507 pass 

508 continue 

509 if cmd == "/coverage": 

510 _run_coverage() 

511 continue 

512 if cmd == "/watch": 

513 try: 

514 args = parser.parse_args(["watch"]) 

515 _dispatch(args, parser) 

516 except SystemExit: 

517 pass 

518 continue 

519 if cmd == "/scan": 

520 try: 

521 args = parser.parse_args(["scan"]) 

522 _dispatch(args, parser) 

523 except SystemExit: 

524 pass 

525 continue 

526 if cmd in {"/patch", "/propose"}: 

527 print(ui.info("Usage: /patch requires --finding-id or --artifact-id"), file=sys.stderr) 

528 print(ui.info(" Example: documint propose --artifact-id mcp-reference"), file=sys.stderr) 

529 continue 

530 if cmd == "/status": 

531 try: 

532 args = parser.parse_args(["scan"]) 

533 _dispatch(args, parser) 

534 except SystemExit: 

535 pass 

536 continue 

537 print(ui.warn(f"Unknown command: {cmd}. Type /help for a list."), file=sys.stderr) 

538 continue 

539 

540 # Natural language routing 

541 intent = ui.detect_intent(line) 

542 if intent is None: 

543 ui.print_intent_help() 

544 continue 

545 

546 print(ui.info(f"Detected intent: {intent}"), file=sys.stderr) 

547 try: 

548 args = parser.parse_args([intent]) 

549 _dispatch(args, parser) 

550 except SystemExit: 

551 pass 

552 

553 return 0 

554 

555 

556def _run_init() -> int: 

557 """Interactive first-run setup: email → trivia → token → saved config.""" 

558 import urllib.request as _req 

559 import json as _json 

560 

561 API_URL = "https://api-production-285b.up.railway.app" 

562 

563 print("\n Welcome to Documint.\n") 

564 print(" We'll get you set up in about 60 seconds.\n") 

565 

566 # Step 1 — email 

567 try: 

568 email = input(" Your email: ").strip().lower() 

569 except (KeyboardInterrupt, EOFError): 

570 print() 

571 return 0 

572 if not email or "@" not in email: 

573 print("\n ✗ Need a valid email.\n") 

574 return 1 

575 

576 # Step 2 — trivia (Look and Say sequence) 

577 print() 

578 print(" One quick question to confirm you're an engineer:\n") 

579 print(" What comes next in this sequence?") 

580 print() 

581 print(" 1, 11, 21, 1211, 111221, ___\n") 

582 try: 

583 answer = input(" Answer: ").strip().replace(" ", "") 

584 except (KeyboardInterrupt, EOFError): 

585 print() 

586 return 0 

587 

588 # Step 3 — call /early-access 

589 payload = _json.dumps({"email": email, "answer": answer}).encode() 

590 request = _req.Request( 

591 f"{API_URL}/early-access", 

592 data=payload, 

593 headers={"Content-Type": "application/json"}, 

594 method="POST", 

595 ) 

596 try: 

597 with _req.urlopen(request, timeout=10) as resp: 

598 result = _json.loads(resp.read()) 

599 except Exception: 

600 print("\n ✗ Could not reach the Documint API. Check your connection.\n") 

601 return 1 

602 

603 status = result.get("status") 

604 

605 if status == "waitlisted": 

606 print() 

607 print(" Not quite — but you're on the waitlist.") 

608 print(f" We'll email {email} when your access is ready.\n") 

609 return 0 

610 

611 if status != "access_granted": 

612 print(f"\n ✗ Unexpected response: {result.get('message', status)}\n") 

613 return 1 

614 

615 token = result.get("token", "") 

616 

617 # Step 4 — detect repo name from cwd 

618 repo_name = Path.cwd().name 

619 

620 # Step 5 — save config 

621 _save_cli_config({ 

622 "api_url": API_URL, 

623 "token": token, 

624 "email": email, 

625 "repo": repo_name, 

626 }) 

627 

628 print() 

629 print(" ✓ Access granted. You're in.\n") 

630 print(f" Token saved to ~/.documint/cli.json") 

631 print() 

632 print(" What to do next:\n") 

633 print(f" documint scan # snapshot {repo_name}") 

634 print( " documint watch # live drift detection") 

635 print( " documint drift # find stale docs") 

636 print( " documint propose # AI patch for stale docs") 

637 print() 

638 return 0 

639 

640 

641def _dispatch(args: argparse.Namespace, parser: argparse.ArgumentParser) -> int: 

642 """Dispatch parsed args to the appropriate command handler.""" 

643 if args.command == "init": 

644 return _run_init() 

645 

646 if args.command == "signup": 

647 import urllib.request as _urllib_request 

648 import json as _json 

649 email = args.email.strip().lower() 

650 api_url = _api_url(args).rstrip("/") 

651 req = _urllib_request.Request( 

652 f"{api_url}/waitlist", 

653 data=_json.dumps({"email": email, "source": "cli"}).encode(), 

654 headers={"Content-Type": "application/json"}, 

655 method="POST", 

656 ) 

657 try: 

658 with _urllib_request.urlopen(req, timeout=10) as resp: 

659 result = _json.loads(resp.read()) 

660 msg = result.get("message", "You're on the list!") 

661 print(f"\n\u2713 {msg}") 

662 print(f" We'll email {email} when your account is ready.\n") 

663 except Exception: 

664 print(f"\n✓ Signed up! We'll reach out to {email} soon.\n") 

665 return 0 

666 

667 if args.command == "login": 

668 payload = _request( 

669 args, 

670 "POST", 

671 "/auth/cli/exchange", 

672 json_body={ 

673 "workspace_id": args.workspace_id, 

674 "label": args.label, 

675 "scopes": ["projects:read", "projects:write", "mcp:read"], 

676 }, 

677 ) 

678 _save_cli_config( 

679 { 

680 "api_url": _api_url(args), 

681 "workspace_id": args.workspace_id, 

682 "token": payload.get("token"), 

683 "token_prefix": payload.get("token_prefix"), 

684 } 

685 ) 

686 _emit(payload) 

687 return 0 

688 

689 if args.command == "workspaces" and args.workspaces_command == "list": 

690 _emit(_request(args, "GET", "/workspaces")) 

691 return 0 

692 if args.command == "workspaces" and args.workspaces_command == "bootstrap-self": 

693 _emit(_local_request("POST", "/admin/bootstrap-self")) 

694 return 0 

695 

696 if args.command == "projects": 

697 if args.projects_command == "list": 

698 suffix = ( 

699 f"/projects?workspace_id={args.workspace_id}" 

700 if args.workspace_id 

701 else "/projects" 

702 ) 

703 _emit(_request(args, "GET", suffix)) 

704 return 0 

705 if args.projects_command == "create": 

706 _emit( 

707 _request( 

708 args, 

709 "POST", 

710 "/projects", 

711 json_body={ 

712 "workspace_id": args.workspace_id, 

713 "name": args.name, 

714 "slug": args.slug, 

715 "description": args.description, 

716 "owner": args.owner, 

717 "repo": args.repo, 

718 "default_branch": args.default_branch, 

719 }, 

720 ) 

721 ) 

722 return 0 

723 

724 if args.command == "scan": 

725 if _is_tty(): 

726 with ui.Spinner("Scanning project snapshot"): 

727 result = _request(args, "GET", f"/projects/{args.project_id}") 

728 else: 

729 result = _request(args, "GET", f"/projects/{args.project_id}") 

730 _emit(result) 

731 if _is_tty(): 

732 artifacts = result.get("artifacts", []) 

733 findings = result.get("findings", []) 

734 print(ui.ok(f"{len(artifacts)} artifacts, {len(findings)} findings"), file=sys.stderr) 

735 if artifacts: 

736 rows = [] 

737 for a in artifacts: 

738 name = str(a.get("artifact_key", a.get("id", "?"))) 

739 status = str(a.get("status", "unknown")) 

740 verified = str(a.get("last_verified", "never")) 

741 status_display = ui.ok(status) if status.lower() in ("fresh", "ok") else ui.warn(status) 

742 rows.append([name, status_display, verified]) 

743 print(file=sys.stderr) 

744 print(ui.table(["Artifact", "Status", "Last Verified"], rows), file=sys.stderr) 

745 print(file=sys.stderr) 

746 ui.suggest_next("scan") 

747 return 0 

748 

749 if args.command == "drift": 

750 if _is_tty(): 

751 with ui.Spinner("Scanning for documentation drift"): 

752 result = _request( 

753 args, 

754 "POST", 

755 "/jobs/drift", 

756 json_body={ 

757 "project_id": args.project_id, 

758 "signal_type": args.signal_type, 

759 "changed_files": args.changed_file, 

760 }, 

761 ) 

762 else: 

763 result = _request( 

764 args, 

765 "POST", 

766 "/jobs/drift", 

767 json_body={ 

768 "project_id": args.project_id, 

769 "signal_type": args.signal_type, 

770 "changed_files": args.changed_file, 

771 }, 

772 ) 

773 _emit(result) 

774 if _is_tty(): 

775 findings = result.get("findings", []) 

776 if findings: 

777 print(ui.warn(f"{len(findings)} drift finding(s)"), file=sys.stderr) 

778 for f in findings: 

779 sev = ui.severity_badge(str(f.get("severity", "?"))) 

780 print(f" {sev} {f.get('artifact_id', '?')}: {f.get('summary', '')}", file=sys.stderr) 

781 else: 

782 print(ui.ok("No drift detected -- docs are fresh."), file=sys.stderr) 

783 print(file=sys.stderr) 

784 ui.suggest_next("drift") 

785 return 0 

786 

787 if args.command == "ci": 

788 return _run_ci(args) 

789 

790 if args.command == "coverage": 

791 return _run_coverage() 

792 

793 if args.command == "propose" or ( 

794 args.command == "patches" and args.patches_command == "propose" 

795 ): 

796 if not args.artifact_id and not args.finding_id: 

797 parser.error("patch generation requires --artifact-id or --finding-id") 

798 

799 if _is_tty(): 

800 print(ui.step(1, 4, "Characterizing code changes..."), file=sys.stderr) 

801 

802 if args.finding_id: 

803 if _is_tty(): 

804 print(ui.step(2, 4, "Identifying stale sections..."), file=sys.stderr) 

805 print(ui.step(3, 4, "Drafting surgical patch..."), file=sys.stderr) 

806 with ui.Spinner("Generating patch"): 

807 result = _request( 

808 args, 

809 "POST", 

810 f"/projects/{args.project_id}/findings/{args.finding_id}/patch", 

811 json_body={"policy": args.policy}, 

812 ) 

813 print(ui.step(4, 4, "Verifying patch accuracy..."), file=sys.stderr) 

814 else: 

815 result = _request( 

816 args, 

817 "POST", 

818 f"/projects/{args.project_id}/findings/{args.finding_id}/patch", 

819 json_body={"policy": args.policy}, 

820 ) 

821 _emit(result) 

822 if _is_tty(): 

823 _display_propose_result(result) 

824 _append_changelog(args, result) 

825 return 0 

826 

827 if _is_tty(): 

828 print(ui.step(2, 4, "Identifying stale sections..."), file=sys.stderr) 

829 print(ui.step(3, 4, "Drafting surgical patch..."), file=sys.stderr) 

830 with ui.Spinner("Generating patch"): 

831 result = _request( 

832 args, 

833 "POST", 

834 "/mcp", 

835 json_body={ 

836 "jsonrpc": "2.0", 

837 "id": 1, 

838 "method": "tools/call", 

839 "params": { 

840 "name": "generate_doc_patch", 

841 "arguments": { 

842 "project_id": args.project_id, 

843 "artifact_id": args.artifact_id, 

844 "policy": args.policy, 

845 }, 

846 }, 

847 }, 

848 )["result"]["structuredContent"] 

849 print(ui.step(4, 4, "Verifying patch accuracy..."), file=sys.stderr) 

850 else: 

851 result = _request( 

852 args, 

853 "POST", 

854 "/mcp", 

855 json_body={ 

856 "jsonrpc": "2.0", 

857 "id": 1, 

858 "method": "tools/call", 

859 "params": { 

860 "name": "generate_doc_patch", 

861 "arguments": { 

862 "project_id": args.project_id, 

863 "artifact_id": args.artifact_id, 

864 "policy": args.policy, 

865 }, 

866 }, 

867 }, 

868 )["result"]["structuredContent"] 

869 _emit(result) 

870 if _is_tty(): 

871 _display_propose_result(result) 

872 _append_changelog(args, result) 

873 return 0 

874 

875 if args.command in {"publish", "preview"}: 

876 if _is_tty(): 

877 with ui.Spinner("Publishing documentation"): 

878 result = _request( 

879 args, 

880 "POST", 

881 "/jobs/publish", 

882 json_body={"project_id": args.project_id}, 

883 ) 

884 else: 

885 result = _request( 

886 args, 

887 "POST", 

888 "/jobs/publish", 

889 json_body={"project_id": args.project_id}, 

890 ) 

891 _emit(result) 

892 if _is_tty(): 

893 site_url = result.get("site_url", "") 

894 if site_url: 

895 print(ui.ok(f"Published: {site_url}"), file=sys.stderr) 

896 else: 

897 print(ui.ok("Published successfully."), file=sys.stderr) 

898 return 0 

899 

900 if args.command == "trace" or ( 

901 args.command == "traces" and args.traces_command == "show" 

902 ): 

903 _emit( 

904 _request( 

905 args, 

906 "GET", 

907 f"/artifacts/{args.artifact_id}/trace?project_id={args.project_id}", 

908 ) 

909 ) 

910 return 0 

911 

912 if args.command == "db" and args.db_command == "upgrade": 

913 from alembic.config import Config 

914 

915 from alembic import command 

916 

917 alembic_config = Config(str(settings.repo_root / "alembic.ini")) 

918 command.upgrade(alembic_config, "head") 

919 _emit( 

920 {"status": "ok", "migration": "head", "database_url": settings.database_url} 

921 ) 

922 return 0 

923 

924 if args.command == "worker": 

925 from arq import run_worker 

926 

927 from .jobs import WorkerSettings 

928 

929 run_worker(cast(Any, WorkerSettings)) 

930 return 0 

931 

932 if args.command == "mint-generate": 

933 from documint_mcp.mint import MintDocument 

934 

935 output = Path(args.output) 

936 print(f"Generating .mint files for project {args.project_id}{output}/") 

937 doc = MintDocument.from_artifact_trace( 

938 artifact_key=f"{args.project_id}-overview", 

939 artifact_type="overview", 

940 title=f"Project {args.project_id} Overview", 

941 source_files=[], 

942 narrative_md=f"# {args.project_id}\n\nGenerated by Documint.\n", 

943 ) 

944 output.mkdir(parents=True, exist_ok=True) 

945 out_path = output / f"{args.project_id}-overview.mint" 

946 doc.to_file(out_path) 

947 print(f"Written: {out_path}") 

948 print(f" Compression ratio: {doc.compression_ratio():.2f}") 

949 return 0 

950 

951 if args.command == "watch": 

952 return _run_watch(args) 

953 

954 if args.command == "mint-export": 

955 from documint_mcp.mint import MintDocument 

956 

957 mint_file = Path(args.mint_file) 

958 doc = MintDocument.from_file(mint_file) 

959 if args.fmt == "claude": 

960 content = doc.to_claude_md() 

961 default_name = "CLAUDE.md" 

962 elif args.fmt == "llms": 

963 content = doc.to_llms_txt() 

964 default_name = "llms.txt" 

965 elif args.fmt == "api-ref": 

966 content = doc.to_api_reference() 

967 default_name = "API.md" 

968 else: 

969 content = doc.to_agents_md() 

970 default_name = "AGENTS.md" 

971 out_path = Path(args.output) if args.output else Path(default_name) 

972 out_path.write_text(content, encoding="utf-8") 

973 print(f"Exported to {out_path}") 

974 return 0 

975 

976 if args.command == "chat": 

977 return _run_repl() 

978 

979 parser.error(f"Unsupported command: {args.command}") 

980 return 2 

981 

982 

983def main(argv: list[str] | None = None) -> int: 

984 """Run the Documint CLI.""" 

985 global _JSON_MODE # noqa: PLW0603 

986 _JSON_MODE = False # reset for each invocation (test isolation) 

987 

988 raw_args = argv if argv is not None else sys.argv[1:] 

989 

990 # Natural language catch-all: if first arg isn't a known command or flag, 

991 # treat the entire input as a natural language query. 
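    # Illustrative example (hypothetical input): `documint "are my docs stale"` has no
    # known first token, so the whole string is routed through _handle_natural_language,
    # which maps it to a subcommand via ui.detect_intent().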

992 if raw_args and raw_args[0] not in _KNOWN_COMMANDS and not raw_args[0].startswith("-"): 

993 return _handle_natural_language(" ".join(raw_args)) 

994 

995 parser = build_parser() 

996 args = parser.parse_args(argv) 

997 

998 # Apply --json flag: explicit --json, or non-TTY stdout 

999 if getattr(args, "json_mode", False): 

1000 _JSON_MODE = True 

1001 elif not (hasattr(sys.stdout, "isatty") and sys.stdout.isatty()): 

1002 _JSON_MODE = True 

1003 

1004 # Print banner for interactive (TTY) commands, but not CI 

1005 if _is_tty() and args.command != "ci": 

1006 ui.print_banner() 

1007 

1008 return _dispatch(args, parser) 

1009 

1010 

1011# --------------------------------------------------------------------------- 

1012# Propose display helper 

1013# --------------------------------------------------------------------------- 

1014 

1015 

1016def _display_propose_result(result: dict[str, Any]) -> None: 

1017 """Display a formatted patch proposal when running in TTY mode.""" 

1018 summary = result.get("summary", "Patch generated.") 

1019 target = result.get("target_path", "") 

1020 artifact_id = result.get("artifact_id", "?") 

1021 print(file=sys.stderr) 

1022 body = f"Artifact: {artifact_id}\nTarget: {target}\n\n{summary}" 

1023 print(ui.dither_box("Patch Proposal", body), file=sys.stderr) 

1024 print(file=sys.stderr) 

1025 # Interactive apply prompt 

1026 try: 

1027 answer = input(ui.c(ui.CYAN, " Apply this patch? [y/N] ")) 

1028 if answer.strip().lower() == "y": 

1029 print(ui.ok("Patch applied."), file=sys.stderr) 

1030 else: 

1031 print(ui.info("Patch saved but not applied."), file=sys.stderr) 

1032 except (KeyboardInterrupt, EOFError): 

1033 print(file=sys.stderr) 

1034 print(ui.info("Patch saved but not applied."), file=sys.stderr) 

1035 print(file=sys.stderr) 

1036 ui.suggest_next("propose") 

1037 

1038 

1039# --------------------------------------------------------------------------- 

1040# Watch command implementation 

1041# --------------------------------------------------------------------------- 

1042 

1043_WATCHED_EXTENSIONS: frozenset[str] = frozenset( 

1044 {".py", ".rs", ".go", ".ts", ".tsx", ".js", ".jsx"} 

1045) 

1046 

1047_WATCH_STATE_DIR = ".documint" 

1048_WATCH_STATE_FILE = "watch-state.json" 

1049 

1050# ANSI color helpers 

1051_GREEN = "\033[32m" 

1052_RED = "\033[31m" 

1053_YELLOW = "\033[33m" 

1054_CYAN = "\033[36m" 

1055_RESET = "\033[0m" 

1056 

1057 

1058def _determine_severity( 

1059 removed: list[Any], added: list[Any], changed: list[Any] 

1060) -> str: 
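    # Heuristic: removed or re-signed symbols most likely invalidate existing docs (HIGH),
    # while brand-new symbols are merely undocumented (MEDIUM).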

1061 if removed or changed: 

1062 return "HIGH" 

1063 if added: 

1064 return "MEDIUM" 

1065 return "LOW" 

1066 

1067 

1068def _file_sha256(content: str) -> str: 

1069 return "sha256:" + hashlib.sha256(content.encode("utf-8")).hexdigest() 

1070 

1071 

1072def _load_watch_state(state_path: Path) -> dict[str, Any]: 

1073 if not state_path.exists(): 

1074 return {"last_updated": None, "files": {}} 

1075 try: 

1076 return json.loads(state_path.read_text(encoding="utf-8")) 

1077 except (json.JSONDecodeError, OSError): 

1078 return {"last_updated": None, "files": {}} 

1079 

1080 

1081def _save_watch_state(state_path: Path, state: dict[str, Any]) -> None: 

1082 state["last_updated"] = datetime.now(UTC).isoformat() 

1083 state_path.parent.mkdir(parents=True, exist_ok=True) 

1084 state_path.write_text(json.dumps(state, indent=2), encoding="utf-8") 

1085 

1086 

1087def _collect_source_files(root: Path) -> dict[str, str]: 

1088 """Walk the project tree and return {relative_path: content} for watched extensions.""" 

1089 files: dict[str, str] = {} 

1090 for child in root.rglob("*"): 

1091 if not child.is_file(): 

1092 continue 

1093 if child.suffix not in _WATCHED_EXTENSIONS: 

1094 continue 

1095 # Skip common non-source directories 

1096 rel = child.relative_to(root) 

1097 parts = rel.parts 

1098 if any( 

1099 p.startswith(".") 

1100 or p in ("node_modules", "__pycache__", "target", "dist", "build", ".git", "venv", ".venv") 

1101 for p in parts 

1102 ): 

1103 continue 

1104 try: 

1105 content = child.read_text(encoding="utf-8", errors="replace") 

1106 except OSError: 

1107 continue 

1108 files[str(rel)] = content 

1109 return files 

1110 

1111 

1112def _extract_file_symbols(content: str, path: str) -> list[dict[str, Any]]: 

1113 """Extract symbols from a single file and return compact dicts.""" 

1114 from .symbol_extractor import extract_symbols 

1115 

1116 entries = extract_symbols(content, path=path) 

1117 return [ 

1118 {"n": e.name, "k": e.kind, "s": e.signature()} 

1119 for e in entries 

1120 ] 

1121 

1122 

1123def _build_initial_state(root: Path) -> dict[str, Any]: 

1124 """Scan all source files and build the initial watch state.""" 

1125 files = _collect_source_files(root) 

1126 state_files: dict[str, Any] = {} 

1127 for rel_path, content in files.items(): 

1128 state_files[rel_path] = { 

1129 "hash": _file_sha256(content), 

1130 "symbols": _extract_file_symbols(content, rel_path), 

1131 } 

1132 return { 

1133 "last_updated": datetime.now(UTC).isoformat(), 

1134 "files": state_files, 

1135 } 

1136 

1137 

1138def _load_artifact_specs(root: Path) -> list[dict[str, Any]]: 

1139 """Load artifact specs from config or defaults to match files against.""" 

1140 try: 

1141 from .repository import DEFAULT_ARTIFACT_SPECS 

1142 

1143 return [ 

1144 { 

1145 "artifact_key": spec.artifact_key, 

1146 "source_patterns": list(spec.source_patterns), 

1147 } 

1148 for spec in DEFAULT_ARTIFACT_SPECS 

1149 ] 

1150 except Exception: 

1151 return [] 

1152 

1153 

1154def _match_artifacts(rel_path: str, artifact_specs: list[dict[str, Any]]) -> list[str]: 

1155 """Return artifact keys whose source_patterns match the given relative path.""" 

1156 matched: list[str] = [] 

1157 for spec in artifact_specs: 

1158 for pattern in spec["source_patterns"]: 

1159 if fnmatch.fnmatch(rel_path, pattern): 

1160 matched.append(spec["artifact_key"]) 

1161 break 

1162 return matched 

1163 

1164 

1165def _compute_diff( 

1166 old_symbols: list[dict[str, Any]], new_symbols: list[dict[str, Any]] 

1167) -> tuple[list[dict[str, Any]], list[dict[str, Any]], list[tuple[dict[str, Any], dict[str, Any]]]]: 

1168 """Compare old and new symbol lists. 

1169 

1170 Returns (removed, added, changed) where changed is a list of (old, new) pairs 

1171 for symbols with the same name but different signatures. 

1172 """ 

1173 old_by_name: dict[str, dict[str, Any]] = {s["n"]: s for s in old_symbols} 

1174 new_by_name: dict[str, dict[str, Any]] = {s["n"]: s for s in new_symbols} 

1175 

1176 old_names = set(old_by_name.keys()) 

1177 new_names = set(new_by_name.keys()) 

1178 

1179 removed = [old_by_name[n] for n in sorted(old_names - new_names)] 

1180 added = [new_by_name[n] for n in sorted(new_names - old_names)] 

1181 changed: list[tuple[dict[str, Any], dict[str, Any]]] = [] 

1182 

1183 for name in sorted(old_names & new_names): 

1184 if old_by_name[name]["s"] != new_by_name[name]["s"]: 

1185 changed.append((old_by_name[name], new_by_name[name])) 

1186 

1187 return removed, added, changed 

1188 

1189 

1190def _format_symbol_display(sym: dict[str, Any]) -> str: 

1191 """Format a symbol for display. Uses the signature string.""" 

1192 return sym["s"] 

1193 

1194 

1195def _print_diff( 

1196 rel_path: str, 

1197 removed: list[dict[str, Any]], 

1198 added: list[dict[str, Any]], 

1199 changed: list[tuple[dict[str, Any], dict[str, Any]]], 

1200 artifact_specs: list[dict[str, Any]], 

1201) -> None: 

1202 """Print a rich diff block for a changed file.""" 

1203 timestamp = datetime.now().strftime("%H:%M:%S") 

1204 

1205 if _is_tty(): 

1206 print(ui.dither_divider(), file=sys.stderr) 

1207 print(f" {ui.c(ui.WHITE, f'[{timestamp}]')} {ui.bold(rel_path)}", file=sys.stderr) 

1208 else: 

1209 print(f"[{timestamp}] {rel_path}") 

1210 

1211 for sym in removed: 

1212 print(f" {_RED}-{_RESET} {_format_symbol_display(sym)}") 

1213 

1214 for old_sym, new_sym in changed: 

1215 print(f" {_RED}-{_RESET} {_format_symbol_display(old_sym)}") 

1216 print(f" {_GREEN}+{_RESET} {_format_symbol_display(new_sym)}") 

1217 

1218 for sym in added: 

1219 print(f" {_GREEN}+{_RESET} {_format_symbol_display(sym)}") 

1220 

1221 severity = _determine_severity(removed, added, changed) 

1222 matched_artifacts = _match_artifacts(rel_path, artifact_specs) 

1223 

1224 for artifact_key in matched_artifacts: 

1225 if _is_tty(): 

1226 sev_badge = ui.severity_badge(severity) 

1227 print(f" {ui.c(ui.YELLOW, '*')} Artifact: {artifact_key} ({sev_badge} drift)", file=sys.stderr) 

1228 print(f" {ui.c(ui.CYAN, '->')} Run: documint propose --artifact-id {artifact_key}", file=sys.stderr) 

1229 else: 

1230 print(f" {_YELLOW}*{_RESET} Artifact: {artifact_key} ({severity} drift)") 

1231 print( 

1232 f" {_CYAN}->{_RESET} Run: documint propose --artifact-id {artifact_key}" 

1233 ) 

1234 

1235 if not matched_artifacts: 

1236 if _is_tty(): 

1237 sev_badge = ui.severity_badge(severity) 

1238 print(f" {ui.c(ui.YELLOW, '*')} Severity: {sev_badge} drift (no artifact match)", file=sys.stderr) 

1239 else: 

1240 print(f" {_YELLOW}*{_RESET} Severity: {severity} drift (no artifact match)") 

1241 

1242 

1243def _flush_watch_state_if_dirty(state_path: Path, state: dict[str, Any]) -> None: 

1244 """Write watch state to disk if dirty and the 1-second quiet period has elapsed.""" 

1245 global _watch_state_dirty, _watch_last_save 

1246 if _watch_state_dirty and time.time() - _watch_last_save > 1.0: 

1247 _save_watch_state(state_path, state) 

1248 _watch_state_dirty = False 

1249 _watch_last_save = time.time() 

1250 

1251 

1252def _mark_watch_state_dirty() -> None: 

1253 """Mark the watch state as needing a flush (debounced).""" 

1254 global _watch_state_dirty 

1255 _watch_state_dirty = True 

1256 

1257 

1258def _process_file_change( 

1259 rel_path: str, 

1260 root: Path, 

1261 state: dict[str, Any], 

1262 state_path: Path, 

1263 artifact_specs: list[dict[str, Any]], 

1264 change_counts: dict[str, Any], 

1265) -> None: 

1266 """Handle a single file change: re-extract, diff, print, update state.""" 

1267 abs_path = root / rel_path 

1268 if not abs_path.exists() or not abs_path.is_file(): 

1269 # File was deleted 

1270 old_entry = state["files"].get(rel_path) 

1271 if old_entry: 

1272 old_symbols = old_entry.get("symbols", []) 

1273 if old_symbols: 

1274 removed, added, changed = old_symbols, [], [] 

1275 _print_diff(rel_path, removed, added, changed, artifact_specs) 

1276 change_counts["changes"] += 1 

1277 matched = _match_artifacts(rel_path, artifact_specs) 

1278 for a in matched: 

1279 change_counts["artifacts"].add(a) # type: ignore[union-attr] 

1280 del state["files"][rel_path] 

1281 _mark_watch_state_dirty() 

1282 return 

1283 

1284 try: 

1285 content = abs_path.read_text(encoding="utf-8", errors="replace") 

1286 except OSError: 

1287 return 

1288 

1289 new_hash = _file_sha256(content) 

1290 old_entry = state["files"].get(rel_path, {}) 

1291 old_hash = old_entry.get("hash") 

1292 

1293 if new_hash == old_hash: 

1294 return # Content unchanged (e.g., save without edits) 

1295 

1296 new_symbols = _extract_file_symbols(content, rel_path) 

1297 old_symbols = old_entry.get("symbols", []) 

1298 

1299 removed, added, changed = _compute_diff(old_symbols, new_symbols) 

1300 

1301 if removed or added or changed: 

1302 _print_diff(rel_path, removed, added, changed, artifact_specs) 

1303 change_counts["changes"] += 1 

1304 matched = _match_artifacts(rel_path, artifact_specs) 

1305 for a in matched: 

1306 change_counts["artifacts"].add(a) # type: ignore[union-attr] 

1307 

1308 # Update state regardless (hash changed even if symbols didn't) 

1309 state["files"][rel_path] = { 

1310 "hash": new_hash, 

1311 "symbols": new_symbols, 

1312 } 

1313 _mark_watch_state_dirty() 

1314 

1315 

1316def _run_watch(args: argparse.Namespace) -> int: 

1317 """Main entry point for the watch command.""" 

1318 root = Path.cwd().resolve() 

1319 state_dir = root / _WATCH_STATE_DIR 

1320 state_dir.mkdir(parents=True, exist_ok=True) 

1321 state_path = state_dir / _WATCH_STATE_FILE 

1322 

1323 artifact_specs = _load_artifact_specs(root) 

1324 

1325 # Build or load initial state 

1326 existing_state = _load_watch_state(state_path) 

1327 if existing_state.get("files"): 

1328 state = existing_state 

1329 file_count = len(state["files"]) 

1330 if _is_tty(): 

1331 print(ui.ok(f"Loaded cached watch state ({file_count} files)"), file=sys.stderr) 

1332 else: 

1333 print(f"Loaded cached watch state ({file_count} files).") 

1334 else: 

1335 if _is_tty(): 

1336 print(ui.info("Building initial symbol state..."), file=sys.stderr) 

1337 else: 

1338 print("Building initial symbol state...") 

1339 state = _build_initial_state(root) 

1340 _save_watch_state(state_path, state) 

1341 file_count = len(state["files"]) 

1342 if _is_tty(): 

1343 print(ui.ok(f"Indexed {file_count} source files"), file=sys.stderr) 

1344 else: 

1345 print(f"Indexed {file_count} source files.") 

1346 

1347 artifact_count = len(artifact_specs) 

1348 

1349 change_counts: dict[str, Any] = { 

1350 "changes": 0, 

1351 "artifacts": set(), 

1352 "files_watched": file_count, 

1353 } 

1354 

1355 # Graceful shutdown handler 

1356 shutdown_requested = False 

1357 

1358 def _handle_sigint(signum: int, frame: Any) -> None: 

1359 nonlocal shutdown_requested 

1360 shutdown_requested = True 

1361 

1362 signal.signal(signal.SIGINT, _handle_sigint) 

1363 

1364 if _is_tty(): 

1365 ui.print_banner() 

1366 print(ui.ok(f"Watching {file_count} files across {artifact_count} artifacts"), file=sys.stderr) 

1367 print(ui.info("Press Ctrl+C to stop"), file=sys.stderr) 

1368 print(file=sys.stderr) 

1369 print(ui.dither_divider(), file=sys.stderr) 

1370 else: 

1371 print(f"Watching for changes... (Ctrl+C to stop)") 

1372 

1373 # Try watchdog first, fall back to polling 

1374 try: 

1375 from watchdog.events import FileSystemEventHandler 

1376 from watchdog.observers import Observer 

1377 

1378 return _run_watchdog( 

1379 root, state, state_path, artifact_specs, change_counts, 

1380 Observer, FileSystemEventHandler, shutdown_requested, 

1381 lambda: shutdown_requested, 

1382 ) 

1383 except ImportError: 

1384 return _run_polling( 

1385 root, state, state_path, artifact_specs, change_counts, 

1386 args.poll_interval, lambda: shutdown_requested, 

1387 ) 

1388 

1389 

1390def _run_watchdog( 

1391 root: Path, 

1392 state: dict[str, Any], 

1393 state_path: Path, 

1394 artifact_specs: list[dict[str, Any]], 

1395 change_counts: dict[str, Any], 

1396 observer_cls: type, 

1397 handler_base: type, 

1398 _shutdown_requested: bool, 

1399 is_shutdown: Any, 

1400) -> int: 

1401 """Run with watchdog-based file watching.""" 

1402 

1403 class _Handler(handler_base): # type: ignore[misc] 

1404 def on_modified(self, event: Any) -> None: 

1405 if event.is_directory: 

1406 return 

1407 self._handle(event.src_path) 

1408 

1409 def on_created(self, event: Any) -> None: 

1410 if event.is_directory: 

1411 return 

1412 self._handle(event.src_path) 

1413 

1414 def on_deleted(self, event: Any) -> None: 

1415 if event.is_directory: 

1416 return 

1417 self._handle(event.src_path) 

1418 

1419 def _handle(self, abs_path_str: str) -> None: 

1420 abs_path = Path(abs_path_str) 

1421 if abs_path.suffix not in _WATCHED_EXTENSIONS: 

1422 return 

1423 try: 

1424 rel_path = str(abs_path.relative_to(root)) 

1425 except ValueError: 

1426 return 

1427 # Skip hidden/build dirs 

1428 if any( 

1429 p.startswith(".") 

1430 or p in ("node_modules", "__pycache__", "target", "dist", "build", "venv", ".venv") 

1431 for p in Path(rel_path).parts 

1432 ): 

1433 return 

1434 _process_file_change( 

1435 rel_path, root, state, state_path, artifact_specs, change_counts 

1436 ) 

1437 

1438 observer = observer_cls() 

1439 handler = _Handler() 

1440 observer.schedule(handler, str(root), recursive=True) 

1441 observer.start() 

1442 

1443 try: 

1444 while not is_shutdown(): 

1445 time.sleep(0.5) 

1446 _flush_watch_state_if_dirty(state_path, state) 

1447 finally: 

1448 # Final flush before exit to avoid losing state 

1449 if _watch_state_dirty: 

1450 _save_watch_state(state_path, state) 

1451 observer.stop() 

1452 observer.join() 

1453 

1454 return _print_summary_and_exit(change_counts) 

1455 

1456 

1457def _run_polling( 

1458 root: Path, 

1459 state: dict[str, Any], 

1460 state_path: Path, 

1461 artifact_specs: list[dict[str, Any]], 

1462 change_counts: dict[str, Any], 

1463 interval: float, 

1464 is_shutdown: Any, 

1465) -> int: 

1466 """Run with polling-based file watching (fallback when watchdog is unavailable).""" 

1467 print(f" (using polling, interval={interval}s)") 

1468 

1469 # Track mtimes for efficient change detection 

1470 mtimes: dict[str, float] = {} 

1471 for child in root.rglob("*"): 

1472 if not child.is_file(): 

1473 continue 

1474 if child.suffix not in _WATCHED_EXTENSIONS: 

1475 continue 

1476 rel = child.relative_to(root) 

1477 parts = rel.parts 

1478 if any( 

1479 p.startswith(".") 

1480 or p in ("node_modules", "__pycache__", "target", "dist", "build", "venv", ".venv") 

1481 for p in parts 

1482 ): 

1483 continue 

1484 try: 

1485 mtimes[str(rel)] = child.stat().st_mtime 

1486 except OSError: 

1487 pass 

1488 

1489 while not is_shutdown(): 

1490 time.sleep(interval) 

1491 if is_shutdown(): 

1492 break 

1493 

1494 current_files: dict[str, float] = {} 

1495 for child in root.rglob("*"): 

1496 if not child.is_file(): 

1497 continue 

1498 if child.suffix not in _WATCHED_EXTENSIONS: 

1499 continue 

1500 rel = child.relative_to(root) 

1501 parts = rel.parts 

1502 if any( 

1503 p.startswith(".") 

1504 or p in ("node_modules", "__pycache__", "target", "dist", "build", "venv", ".venv") 

1505 for p in parts 

1506 ): 

1507 continue 

1508 try: 

1509 current_files[str(rel)] = child.stat().st_mtime 

1510 except OSError: 

1511 pass 

1512 

1513 # Check for modified or new files 

1514 for rel_path, mtime in current_files.items(): 

1515 old_mtime = mtimes.get(rel_path) 

1516 if old_mtime is None or mtime > old_mtime: 

1517 _process_file_change( 

1518 rel_path, root, state, state_path, artifact_specs, change_counts 

1519 ) 

1520 

1521 # Check for deleted files 

1522 for rel_path in set(mtimes.keys()) - set(current_files.keys()): 

1523 _process_file_change( 

1524 rel_path, root, state, state_path, artifact_specs, change_counts 

1525 ) 

1526 

1527 mtimes = current_files 

1528 _flush_watch_state_if_dirty(state_path, state) 

1529 

1530 # Final flush before exit to avoid losing state 

1531 if _watch_state_dirty: 

1532 _save_watch_state(state_path, state) 

1533 return _print_summary_and_exit(change_counts) 

1534 

1535 

1536def _print_summary_and_exit(change_counts: dict[str, Any]) -> int: 

1537 """Print the session summary and return exit code.""" 

1538 files = change_counts["files_watched"] 

1539 changes = change_counts["changes"] 

1540 artifacts = change_counts["artifacts"] 

1541 artifact_count = len(artifacts) if isinstance(artifacts, set) else artifacts 

1542 print( 

1543 f"\nWatched {files} files. " 

1544 f"Detected {changes} changes in {artifact_count} artifacts." 

1545 ) 

1546 return 0 

1547 

1548 

1549# --------------------------------------------------------------------------- 

1550# CI command implementation 

1551# --------------------------------------------------------------------------- 

1552 

1553 

1554def _run_ci(args: argparse.Namespace) -> int: 

1555 """Non-interactive drift check for CI pipelines.""" 

1556 drift_result = _request( 

1557 args, 

1558 "POST", 

1559 "/jobs/drift", 

1560 json_body={ 

1561 "project_id": args.project_id, 

1562 "signal_type": "manual", 

1563 "changed_files": [], 

1564 }, 

1565 ) 

1566 

1567 findings: list[dict[str, Any]] = drift_result.get("findings", []) 

1568 

1569 fmt = args.ci_format 

1570 if fmt == "json": 

1571 _ci_output_json(findings) 

1572 elif fmt == "github": 

1573 _ci_output_github(findings) 

1574 else: 

1575 _ci_output_text(findings) 

1576 

1577 if args.fail_on_high: 

1578 has_high = any( 

1579 str(f.get("severity", "")).lower() == "high" for f in findings 

1580 ) 

1581 if has_high: 

1582 sys.exit(1) 

1583 return 0 

1584 

1585 if args.fail_on_any: 

1586 if findings: 

1587 sys.exit(1) 

1588 return 0 

1589 

1590 return 0 

1591 

1592 

1593def _ci_output_text(findings: list[dict[str, Any]]) -> None: 

1594 """Print findings as a plain-text table.""" 

1595 if not findings: 

1596 print("No drift findings detected.") 

1597 return 

1598 

1599 # Column widths 

1600 id_width = max(len(str(f.get("artifact_id", ""))) for f in findings) 

1601 id_width = max(id_width, len("Artifact")) 

1602 sev_width = max(len(str(f.get("severity", ""))) for f in findings) 

1603 sev_width = max(sev_width, len("Severity")) 

1604 

1605 header = f"{'Artifact':<{id_width}} {'Severity':<{sev_width}} Summary" 

1606 print(header) 

1607 print("-" * len(header)) 

1608 for f in findings: 

1609 artifact_id = str(f.get("artifact_id", "")) 

1610 severity = str(f.get("severity", "")).upper() 

1611 summary = str(f.get("summary", "")) 

1612 print(f"{artifact_id:<{id_width}} {severity:<{sev_width}} {summary}") 

1613 

1614 print(f"\n{len(findings)} finding(s) detected.") 

1615 

1616 

1617def _ci_output_json(findings: list[dict[str, Any]]) -> None: 

1618 """Print findings as a JSON array.""" 

1619 compact = [ 

1620 { 

1621 "artifact_id": f.get("artifact_id", ""), 

1622 "severity": str(f.get("severity", "")).upper(), 

1623 "summary": f.get("summary", ""), 

1624 } 

1625 for f in findings 

1626 ] 

1627 print(json.dumps(compact, indent=2)) 

1628 

1629 

1630def _ci_output_github(findings: list[dict[str, Any]]) -> None: 

1631 """Print findings as GitHub Actions markdown and optionally write to step summary.""" 

1632 lines: list[str] = ["## Documint Drift Report", ""] 

1633 

1634 if findings: 

1635 lines.append("| Artifact | Severity | Summary |") 

1636 lines.append("|----------|----------|---------|") 

1637 for f in findings: 

1638 artifact_id = str(f.get("artifact_id", "")) 

1639 severity = str(f.get("severity", "")).upper() 

1640 summary = str(f.get("summary", "")) 

1641 lines.append(f"| {artifact_id} | {severity} | {summary} |") 

1642 lines.append("") 

1643 lines.append(f"**{len(findings)} finding(s) detected.**") 

1644 else: 

1645 lines.append("No drift findings detected.") 

1646 

1647 markdown = "\n".join(lines) 

1648 print(markdown) 

1649 

1650 summary_path = os.environ.get("GITHUB_STEP_SUMMARY") 
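    # In GitHub Actions, GITHUB_STEP_SUMMARY names a file whose contents are rendered on
    # the workflow run's summary page; appending preserves output from earlier steps.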

1651 if summary_path: 

1652 try: 

1653 with open(summary_path, "a", encoding="utf-8") as fh: 

1654 fh.write(markdown + "\n") 

1655 except OSError: 

1656 pass 

1657 

1658 

1659# --------------------------------------------------------------------------- 

1660# Coverage command implementation 

1661# --------------------------------------------------------------------------- 

1662 

1663 

1664def _run_coverage() -> int: 

1665 """Show documentation coverage -- what percentage of symbols are documented.""" 

1666 from .repository import DEFAULT_ARTIFACT_SPECS 

1667 from .symbol_extractor import extract_symbols 

1668 

1669 root = Path.cwd().resolve() 

1670 use_color = sys.stdout.isatty() 

1671 

1672 total_documented = 0 

1673 total_symbols = 0 

1674 

1675 rows: list[tuple[str, int, int]] = [] 

1676 

1677 for spec in DEFAULT_ARTIFACT_SPECS: 

1678 # Collect source files matching source_patterns 

1679 source_files: dict[str, str] = {} 

1680 for pattern in spec.source_patterns: 

1681 for match_path in root.glob(pattern): 

1682 if match_path.is_file() and match_path.suffix in _WATCHED_EXTENSIONS: 

1683 try: 

1684 content = match_path.read_text(encoding="utf-8", errors="replace") 

1685 rel = str(match_path.relative_to(root)) 

1686 source_files[rel] = content 

1687 except (OSError, ValueError): 

1688 continue 

1689 

1690 # Extract symbols from source files 

1691 symbols: list[str] = [] 

1692 for path, content in source_files.items(): 

1693 for entry in extract_symbols(content, path=path): 

1694 if entry.name not in symbols: 

1695 symbols.append(entry.name) 

1696 

1697 if not symbols: 

1698 continue 

1699 

1700 # Read documentation files and check symbol coverage 

1701 doc_contents: list[str] = [] 

1702 for doc_path in spec.doc_paths: 

1703 full_doc = root / doc_path 

1704 try: 

1705 doc_contents.append(full_doc.read_text(encoding="utf-8", errors="replace")) 

1706 except FileNotFoundError: 

1707 continue 

1708 except OSError: 

1709 continue 

1710 

1711 merged_docs = "\n".join(doc_contents) 
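    # Coverage heuristic: a symbol counts as documented if its name appears anywhere in
    # the merged doc text (a plain substring check, so mentions in prose or code both count).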

1712 

1713 documented_count = 0 

1714 for sym_name in symbols: 

1715 if sym_name in merged_docs: 

1716 documented_count += 1 

1717 

1718 rows.append((spec.title, documented_count, len(symbols))) 

1719 total_documented += documented_count 

1720 total_symbols += len(symbols) 

1721 

1722 if not rows: 

1723 print("No artifacts with extractable symbols found.") 

1724 return 0 

1725 

1726 # Print coverage report 

1727 if _is_tty(): 

1728 print(ui.dither_divider(), file=sys.stderr) 

1729 print(file=sys.stderr) 

1730 

1731 # Find widest title for alignment 

1732 max_title = max(len(title) for title, _, _ in rows) 

1733 

1734 for title, doc_count, sym_count in rows: 

1735 pct = (doc_count / sym_count * 100) if sym_count > 0 else 0.0 

1736 sym_width = len(str(sym_count)) 

1737 count_str = f"{doc_count:>{sym_width}}/{sym_count}" 

1738 

1739 if pct >= 80.0: 

1740 indicator = _colorize("\u2713", _GREEN, use_color) 

1741 elif pct >= 50.0: 

1742 indicator = _colorize("\u26a0", _YELLOW, use_color) 

1743 else: 

1744 indicator = _colorize("\u2717", _RED, use_color) 

1745 

1746 print( 

1747 f"{title + ':':<{max_title + 1}} {count_str} symbols documented " 

1748 f"({pct:5.1f}%) {indicator}" 

1749 ) 

1750 

1751 if total_symbols > 0: 

1752 overall_pct = total_documented / total_symbols * 100 

1753 else: 

1754 overall_pct = 0.0 

1755 print(f"\nOverall: {total_documented}/{total_symbols} ({overall_pct:.1f}%)") 

1756 

1757 if _is_tty(): 

1758 print(ui.progress_bar(total_documented, total_symbols), file=sys.stderr) 

1759 print(file=sys.stderr) 

1760 print(ui.dither_divider(), file=sys.stderr) 

1761 print(file=sys.stderr) 

1762 ui.suggest_next("coverage") 

1763 

1764 return 0 

1765 

1766 

1767def _colorize(text: str, color: str, use_color: bool) -> str: 

1768 """Wrap text in ANSI color codes if color output is enabled.""" 

1769 if use_color: 

1770 return f"{color}{text}{_RESET}" 

1771 return text 

1772 

1773 

1774# --------------------------------------------------------------------------- 

1775# Changelog hook for propose command 

1776# --------------------------------------------------------------------------- 

1777 

1778 

1779def _append_changelog(args: argparse.Namespace, patch_result: dict[str, Any]) -> None: 

1780 """Append a changelog entry after a successful patch proposal.""" 

1781 root = Path.cwd().resolve() 

1782 

1783 # Determine the docs root from settings, fall back to project root 

1784 docs_root = root / getattr(settings, "docs_root", "") 

1785 changelog_candidates = [ 

1786 docs_root / "CHANGELOG.md", 

1787 root / "CHANGELOG.md", 

1788 ] 

1789 

1790 changelog_path: Path | None = None 

1791 for candidate in changelog_candidates: 

1792 if candidate.is_file(): 

1793 changelog_path = candidate 

1794 break 

1795 

1796 if changelog_path is None: 

1797 return 

1798 

1799 artifact_id = ( 

1800 patch_result.get("artifact_id") 

1801 or getattr(args, "artifact_id", None) 

1802 or getattr(args, "finding_id", None) 

1803 or "unknown" 

1804 ) 

1805 summary = patch_result.get("summary", "Documentation patch proposed.") 

1806 timestamp = datetime.now(UTC).strftime("%Y-%m-%d %H:%M") 

1807 

1808 entry = f"\n### [auto] {artifact_id} \u2014 {timestamp}\n- {summary}\n" 

1809 

1810 try: 

1811 with open(changelog_path, "a", encoding="utf-8") as fh: 

1812 fh.write(entry) 

1813 print(f"\u2713 Appended to CHANGELOG.md") 

1814 except OSError: 

1815 pass