Coverage for src / apcore_cli / cli.py: 76%

511 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-04-26 10:23 +0800

1"""Core Dispatcher — CLI entry point and module routing (FE-01).""" 

2 

3from __future__ import annotations 

4 

5import json 

6import logging 

7import re 

8import sys 

9import time 

10from pathlib import Path 

11from typing import TYPE_CHECKING, Any 

12 

13import click 

14import jsonschema 

15 

16from apcore_cli.approval import check_approval 

17from apcore_cli.builtin_group import RESERVED_GROUP_NAMES as RESERVED_GROUP_NAMES # noqa: PLC0414 

18from apcore_cli.display_helpers import get_display as _get_display 

19from apcore_cli.output import format_exec_result 

20from apcore_cli.ref_resolver import RefResolverError, resolve_refs 

21from apcore_cli.schema_parser import reconvert_enum_values, schema_to_click_options 

22from apcore_cli.security.sandbox import Sandbox 

23 

24# FE-13 §11.4: ``BUILTIN_COMMANDS`` was retired in v0.7.0. All apcore-cli-provided 

25# commands now live under the single reserved ``apcli`` group, so the only 

26# collision surface is the group name itself (see ``RESERVED_GROUP_NAMES`` above, 

27# imported from :mod:`apcore_cli.builtin_group`). Stale ``from apcore_cli.cli 

28# import BUILTIN_COMMANDS`` imports will fail at import time rather than drift 

29# silently — intentional per spec §11.4. 

30 

31if TYPE_CHECKING: 

32 from apcore import Executor, Registry 

33 from apcore.registry.types import ModuleDescriptor 

34 

35 from apcore_cli.security.audit import AuditLogger 

36 

# Logger used throughout the CLI dispatcher.
logger = logging.getLogger("apcore_cli.cli")

# Audit logger shared by every generated command. It stays ``None`` until
# ``set_audit_logger`` installs an instance during CLI initialisation.
_audit_logger: AuditLogger | None = None

41 

# Whether built-in options should be visible in help output; set during CLI init.
_verbose_help: bool = False


def set_verbose_help(verbose: bool) -> None:
    """Store the verbose-help flag at module level.

    While the flag is False, the built-in options attached to generated
    commands are created hidden.

    Args:
        verbose: New value for the module-level flag.
    """
    global _verbose_help
    _verbose_help = verbose

50 

51 

# Base URL of the online documentation; downstream projects set it via
# ``set_docs_url``. ``None`` means no documentation links are emitted.
_docs_url: str | None = None


def set_docs_url(url: str | None) -> None:
    """Configure the base URL used for documentation links.

    Command-level help appends ``/commands/{name}`` to this base
    automatically. Passing ``None`` disables the links.

    Args:
        url: Base documentation URL, or ``None`` to turn the links off.

    Example::

        set_docs_url("https://docs.apcore.dev/cli")
    """
    global _docs_url
    _docs_url = url

68 

69 

def set_audit_logger(audit_logger: AuditLogger | None) -> None:
    """Install (or clear) the module-level audit logger.

    Args:
        audit_logger: Logger instance to use for execution audit records,
            or ``None`` to remove the currently installed one.
    """
    global _audit_logger
    _audit_logger = audit_logger

74 

75 

class _LazyGroup(click.Group):
    """Click group for one command group; subcommands are built on first use.

    ``members`` maps each subcommand name to a ``(module_id, descriptor)``
    pair. A Click command is only constructed from the descriptor when the
    subcommand is first requested, and is then kept in a per-group cache.
    """

    def __init__(
        self,
        members: dict[str, tuple[str, Any]],
        executor: Any,
        help_text_max_length: int = 1000,
        extensions_root: str | None = None,
        **kwargs: Any,
    ) -> None:
        """Remember the group's members and the settings used to build them.

        Args:
            members: Mapping of command name to ``(module_id, descriptor)``.
            executor: Executor handed to every built command.
            help_text_max_length: Forwarded to ``build_module_command``.
            extensions_root: Forwarded to ``build_module_command``.
            **kwargs: Passed through to ``click.Group``.
        """
        super().__init__(**kwargs)
        self._members = members
        self._executor = executor
        self._help_text_max_length = help_text_max_length
        self._extensions_root = extensions_root
        # Built commands, keyed by command name.
        self._cmd_cache: dict[str, click.Command] = {}

    def list_commands(self, ctx: click.Context) -> list[str]:
        """Return the member command names in sorted order."""
        return sorted(self._members)

    def get_command(self, ctx: click.Context, cmd_name: str) -> click.Command | None:
        """Return the command for ``cmd_name``, building and caching it if needed.

        Returns ``None`` when ``cmd_name`` is not a member of this group.
        """
        cached = self._cmd_cache.get(cmd_name)
        if cached is not None:
            return cached

        member = self._members.get(cmd_name)
        if member is None:
            return None

        descriptor = member[1]
        built = build_module_command(
            descriptor,
            self._executor,
            help_text_max_length=self._help_text_max_length,
            cmd_name=cmd_name,
            extensions_root=self._extensions_root,
        )
        self._cmd_cache[cmd_name] = built
        return built

113 

114 

class LazyModuleGroup(click.Group):
    """Custom Click Group that lazily loads apcore modules as subcommands.

    Commands registered directly on the group take precedence. Any other
    name is resolved against the registry (optionally through a CLI alias
    taken from the module's display overlay) and turned into a Click command
    on first use.
    """

    def __init__(
        self,
        registry: Registry,
        executor: Executor,
        help_text_max_length: int = 1000,
        extensions_root: str | None = None,
        **kwargs: Any,
    ) -> None:
        """Store the registry/executor and initialise the lazy caches.

        Args:
            registry: Source of module ids and their definitions.
            executor: Executor handed to every built command.
            help_text_max_length: Forwarded to ``build_module_command``.
            extensions_root: Forwarded to ``build_module_command``.
            **kwargs: Passed through to ``click.Group``.
        """
        super().__init__(**kwargs)
        self._registry = registry
        self._executor = executor
        self._help_text_max_length = help_text_max_length
        self._extensions_root = extensions_root
        # Built commands, keyed by the name the user typed.
        self._module_cache: dict[str, click.Command] = {}
        # alias -> canonical module_id (populated lazily)
        self._alias_map: dict[str, str] = {}
        # module_id -> descriptor cache (populated during alias map build)
        self._descriptor_cache: dict[str, Any] = {}
        self._alias_map_built: bool = False

    def _build_alias_map(self) -> None:
        """Build the alias -> module_id map from display overlay metadata.

        Also fills the descriptor cache. Any exception is logged (including
        its message, so the failure can be diagnosed) and leaves the map
        marked as not built, which means the next call tries again.
        """
        if self._alias_map_built:
            return
        try:
            for module_id in self._registry.list():
                descriptor = self._registry.get_definition(module_id)
                if descriptor is None:
                    continue
                self._descriptor_cache[module_id] = descriptor
                display = _get_display(descriptor)
                cli_alias: str | None = (display.get("cli") or {}).get("alias")
                if cli_alias and cli_alias != module_id:
                    self._alias_map[cli_alias] = module_id
        except Exception as exc:
            # Best effort: the CLI keeps working with whatever was collected.
            logger.warning("Failed to build alias map from registry: %s", exc)
        else:
            self._alias_map_built = True

    def list_commands(self, ctx: click.Context) -> list[str]:
        """Return the sorted, de-duplicated names of all visible commands.

        Combines the non-hidden commands registered on the group with the
        registry's modules, each shown under its alias when it has one. If
        the registry cannot be listed, only the registered commands are
        returned and a warning is logged.
        """
        # Root-level commands are whatever the factory has registered
        # (post-FE-13: the `apcli` group + any deprecation shims + caller
        # extras). Module-derived commands live on top of those.
        registered = [name for name, cmd in self.commands.items() if not cmd.hidden]
        try:
            self._build_alias_map()
            alias_for = {module_id: alias for alias, module_id in self._alias_map.items()}
            names = [alias_for.get(mid, mid) for mid in self._registry.list()]
        except Exception as exc:
            logger.warning("Failed to list modules from registry: %s", exc)
            names = []
        return sorted(set(registered + names))

    def get_command(self, ctx: click.Context, cmd_name: str) -> click.Command | None:
        """Resolve ``cmd_name`` to a Click command, or ``None`` if unknown.

        Lookup order: commands registered on the group, previously built
        module commands, then the registry (after alias resolution).
        """
        # Registered (built-in) commands win over modules.
        if cmd_name in self.commands:
            return self.commands[cmd_name]

        if cmd_name in self._module_cache:
            return self._module_cache[cmd_name]

        # Resolve alias -> canonical module_id; unknown names map to themselves.
        self._build_alias_map()
        module_id = self._alias_map.get(cmd_name, cmd_name)

        # Prefer the descriptor cached during the alias map build.
        module_def = self._descriptor_cache.get(module_id)
        if module_def is None:
            module_def = self._registry.get_definition(module_id)
        if module_def is None:
            return None

        cmd = build_module_command(
            module_def,
            self._executor,
            help_text_max_length=self._help_text_max_length,
            cmd_name=cmd_name,
            extensions_root=self._extensions_root,
        )
        self._module_cache[cmd_name] = cmd
        return cmd

200 

201 

class GroupedModuleGroup(LazyModuleGroup):
    """Extended LazyModuleGroup that organises modules into named groups.

    Every exposed module is placed either in a named group (served by a
    ``_LazyGroup``) or at the top level, based on its display overlay and,
    failing that, on a dotted CLI name.
    """

    def __init__(self, **kwargs: Any) -> None:
        """Initialise the group.

        Args:
            **kwargs: Arguments for ``LazyModuleGroup``. An optional
                ``exposure_filter`` entry is removed before delegating; when
                it is missing or falsy a default ``ExposureFilter()`` is used.
        """
        from apcore_cli.exposure import ExposureFilter

        exposure_filter = kwargs.pop("exposure_filter", None)
        super().__init__(**kwargs)
        self._exposure_filter: ExposureFilter = exposure_filter or ExposureFilter()
        # group name -> {command name -> (module_id, descriptor)}
        self._group_map: dict[str, dict[str, tuple[str, Any]]] = {}
        # command name -> (module_id, descriptor) for ungrouped modules
        self._top_level_modules: dict[str, tuple[str, Any]] = {}
        self._group_cache: dict[str, _LazyGroup] = {}
        self._group_map_built: bool = False

    @staticmethod
    def _resolve_group(module_id: str, descriptor: Any) -> tuple[str | None, str]:
        """Determine (group, command_name) for a module from its display overlay.

        Resolution rules, in order:

        * a non-empty string ``display.cli.group`` names the group;
        * an empty-string ``display.cli.group`` forces top-level placement;
        * otherwise a dotted CLI name is split at its first ``.`` into
          ``(group, command)``;
        * anything else is top-level.

        The CLI name is ``display.cli.alias`` when set, else ``module_id``.
        An empty ``module_id`` yields ``(None, "")`` and logs a warning.
        """
        if not module_id:
            logger.warning("Empty module_id encountered in _resolve_group")
            return (None, "")

        cli_display = _get_display(descriptor).get("cli") or {}
        explicit_group = cli_display.get("group")
        cli_name = cli_display.get("alias") or module_id

        if isinstance(explicit_group, str):
            # "" means "explicitly ungrouped"; any other string is the group.
            return (explicit_group or None, cli_name)

        if "." in cli_name:
            group, _, cmd = cli_name.partition(".")
            return (group, cmd)
        return (None, cli_name)

    def _build_group_map(self) -> None:
        """Build the group map from registry modules.

        Applies the exposure filter before building groups, so modules the
        filter hides never appear as CLI commands here (the original design
        note adds that they remain invocable via exec).

        FE-13 §4.10: rejects modules whose explicit group name, dotted-ID
        auto-extracted group, or top-level CLI name is reserved. The
        top-level check also covers modules that were demoted to top-level
        because their group name is not shell-safe; previously that path
        skipped the check, so a name such as ``Bad.apcli`` could register a
        reserved top-level command.

        The map is only marked as built when the alias map (and with it the
        descriptor cache) was built completely. Otherwise a transient
        registry error inside ``_build_alias_map`` — which logs and swallows
        its own exceptions — would permanently hide the affected modules.

        Raises:
            click.UsageError: If a module uses a reserved group or top-level
                name (Click reports usage errors with exit code 2).
        """
        if self._group_map_built:
            return
        try:
            self._build_alias_map()
            for module_id in self._registry.list():
                descriptor = self._descriptor_cache.get(module_id)
                if descriptor is None:
                    continue
                # Skip modules that should not appear as CLI commands (FE-12).
                if not self._exposure_filter.is_exposed(module_id):
                    continue
                group, cmd = self._resolve_group(module_id, descriptor)

                if group is not None and group in RESERVED_GROUP_NAMES:
                    raise click.UsageError(
                        f"Module '{module_id}': group name '{group}' is reserved. "
                        f"Use a different CLI alias or set display.cli.group to another value."
                    )

                if group is not None and not re.fullmatch(r"[a-z][a-z0-9_-]*", group):
                    logger.warning(
                        "Module '%s': group name '%s' is not shell-safe — treating as top-level.",
                        module_id,
                        group,
                    )
                    group = None

                if group is None:
                    # Applies to genuinely top-level modules and to modules
                    # demoted just above.
                    if cmd in RESERVED_GROUP_NAMES:
                        raise click.UsageError(
                            f"Module '{module_id}': top-level CLI name '{cmd}' is reserved. Use a different CLI alias."
                        )
                    self._top_level_modules[cmd] = (module_id, descriptor)
                else:
                    self._group_map.setdefault(group, {})[cmd] = (module_id, descriptor)

            # Re-adding entries on a later retry is harmless: both maps are
            # keyed by name and simply overwritten with the same values.
            self._group_map_built = self._alias_map_built
        except click.UsageError:
            # Reserved-name violations must reach Click; never downgrade them.
            raise
        except Exception as exc:
            # Transient registry errors — allow retry on next call.
            logger.warning("Failed to build group map: %s", exc)

    def list_commands(self, ctx: click.Context) -> list[str]:
        """Return the sorted union of registered commands, groups and top-level modules."""
        self._build_group_map()
        # Registered root commands (post-FE-13: `apcli` group + deprecation
        # shims + caller extras). Hidden entries (e.g. the apcli group when
        # mode=none) are filtered out so tab-completion matches --help.
        visible = {name for name, cmd in self.commands.items() if not cmd.hidden}
        visible.update(self._group_map)
        visible.update(self._top_level_modules)
        return sorted(visible)

    def get_command(self, ctx: click.Context, cmd_name: str) -> click.Command | None:
        """Resolve ``cmd_name`` to a registered command, a group, or a top-level module.

        Returns ``None`` when the name matches none of them.
        """
        # Registered (built-in) commands win over anything module-derived.
        if cmd_name in self.commands:
            return self.commands[cmd_name]

        self._build_group_map()

        if cmd_name in self._group_cache:
            return self._group_cache[cmd_name]

        if cmd_name in self._group_map:
            grp = _LazyGroup(
                members=self._group_map[cmd_name],
                executor=self._executor,
                help_text_max_length=self._help_text_max_length,
                extensions_root=self._extensions_root,
                name=cmd_name,
            )
            self._group_cache[cmd_name] = grp
            return grp

        if cmd_name in self._top_level_modules:
            if cmd_name in self._module_cache:
                return self._module_cache[cmd_name]
            _, descriptor = self._top_level_modules[cmd_name]
            cmd = build_module_command(
                descriptor,
                self._executor,
                help_text_max_length=self._help_text_max_length,
                cmd_name=cmd_name,
                extensions_root=self._extensions_root,
            )
            self._module_cache[cmd_name] = cmd
            return cmd

        return None

    def format_help(self, ctx: click.Context, formatter: click.HelpFormatter) -> None:
        """Write root help: usage, help text, Options, Commands, Modules, Groups, footer."""
        self._build_group_map()
        self.format_usage(ctx, formatter)
        if self.help:
            formatter.write_paragraph()
            formatter.write(self.help)

        # Options section
        opts = []
        for param in self.get_params(ctx):
            record = param.get_help_record(ctx)
            if record is not None:
                opts.append(record)
        if opts:
            with formatter.section("Options"):
                formatter.write_dl(opts)

        # Commands section. Post-FE-13, all apcore-cli built-ins live under
        # the `apcli` group — at root we only show non-hidden registered
        # commands (the apcli group itself when visible, help, and any
        # user-added extras via extra_commands / add_command).
        cmd_records = []
        for name in sorted(self.commands):
            cmd = self.commands[name]
            if cmd is None or cmd.hidden:
                continue
            cmd_records.append((name, cmd.get_short_help_str()))
        if cmd_records:
            with formatter.section("Commands"):
                formatter.write_dl(cmd_records)

        # Modules section (top-level)
        if self._top_level_modules:
            module_records = []
            for name in sorted(self._top_level_modules):
                _, descriptor = self._top_level_modules[name]
                module_records.append((name, getattr(descriptor, "description", "") or ""))
            with formatter.section("Modules"):
                formatter.write_dl(module_records)

        # Groups section (business-module-derived groups only; reserved
        # group names were already rejected in _build_group_map).
        if self._group_map:
            group_records = []
            for group_name in sorted(self._group_map):
                count = len(self._group_map[group_name])
                suffix = "s" if count != 1 else ""
                group_records.append((group_name, f"({count} command{suffix})"))
            if group_records:
                with formatter.section("Groups"):
                    formatter.write_dl(group_records)

        # Footer hints for discoverability
        formatter.write_paragraph()
        formatter.write(
            "Use --help --verbose to show all options (including built-in apcore options).\n"
            "Use --help --man to display a formatted man page."
        )

403 

404 

405# Error code mapping from apcore error codes to CLI exit codes 

406_ERROR_CODE_MAP = { 

407 "MODULE_NOT_FOUND": 44, 

408 "MODULE_LOAD_ERROR": 44, 

409 "MODULE_DISABLED": 44, 

410 "SCHEMA_VALIDATION_ERROR": 45, 

411 "SCHEMA_CIRCULAR_REF": 48, 

412 "APPROVAL_DENIED": 46, 

413 "APPROVAL_TIMEOUT": 46, 

414 "APPROVAL_PENDING": 46, 

415 "CONFIG_NOT_FOUND": 47, 

416 "CONFIG_INVALID": 47, 

417 "MODULE_EXECUTE_ERROR": 1, 

418 "MODULE_TIMEOUT": 1, 

419 "ACL_DENIED": 77, 

420 # Config Bus errors (apcore >= 0.15.0) 

421 "CONFIG_NAMESPACE_RESERVED": 78, 

422 "CONFIG_NAMESPACE_DUPLICATE": 78, 

423 "CONFIG_ENV_PREFIX_CONFLICT": 78, 

424 "CONFIG_ENV_MAP_CONFLICT": 78, 

425 "CONFIG_MOUNT_ERROR": 66, 

426 "CONFIG_BIND_ERROR": 65, 

427 "ERROR_FORMATTER_DUPLICATE": 70, 

428} 

429 

430 

431def _first_failed_exit_code(result: Any) -> int: 

432 """Return the exit code for the first failed check in a PreflightResult.""" 

433 _check_to_exit = { 

434 "module_id": 2, 

435 "module_lookup": 44, 

436 "call_chain": 1, 

437 "acl": 77, 

438 "schema": 45, 

439 "approval": 46, 

440 "module_preflight": 1, 

441 } 

442 for check in getattr(result, "checks", []): 

443 if not check.passed: 

444 return _check_to_exit.get(check.check, 1) 

445 return 1 

446 

447 

def format_preflight_result(result: Any, fmt: str | None = None) -> None:
    """Print a PreflightResult to stdout as JSON or as a human-readable list.

    JSON is used when the resolved format is ``"json"`` or stdout is not a
    TTY; otherwise one line per check is printed, followed by a summary.

    Args:
        result: Preflight result exposing ``valid``, ``requires_approval``
            and ``checks`` (each with ``check``, ``passed``, ``error`` and
            optionally ``warnings``).
        fmt: Requested output format; resolved via ``resolve_format``.
    """
    from apcore_cli.output import resolve_format

    use_json = resolve_format(fmt) == "json" or not sys.stdout.isatty()

    if use_json:
        payload: dict[str, Any] = {
            "valid": result.valid,
            "requires_approval": result.requires_approval,
            "checks": [],
        }
        for chk in result.checks:
            item: dict[str, Any] = {"check": chk.check, "passed": chk.passed}
            # Optional keys are only emitted when they carry information.
            if chk.error is not None:
                item["error"] = chk.error
            if chk.warnings:
                item["warnings"] = chk.warnings
            payload["checks"].append(item)
        click.echo(json.dumps(payload, indent=2, default=str))
        return

    # TTY table format
    for chk in result.checks:
        chk_warnings = getattr(chk, "warnings", [])
        has_warnings = bool(chk_warnings)

        if chk.passed:
            # ⚠ passed with warnings / ✓ passed
            sym = "\u26a0" if has_warnings else "\u2713"
        elif chk.passed is False:
            sym = "\u2717"  # ✗ failed
        else:
            sym = "\u25cb"  # ○ skipped

        line = f"  {sym} {chk.check:<20}"
        if chk.error:
            if isinstance(chk.error, dict):
                line += f" {json.dumps(chk.error, default=str)}"
            else:
                line += f" {str(chk.error)}"
        elif chk.passed and not has_warnings:
            line += " OK"
        elif not chk.passed:
            # NOTE(review): a failed check without an error also lands here
            # and is labelled "Skipped" — confirm that this is intended.
            line += " Skipped"
        click.echo(line)

        for warning in chk_warnings:
            click.echo(f"    Warning: {warning}")

    # NOTE(review): every non-passed check is counted as an error, including
    # the ones rendered with the "skipped" symbol above — confirm.
    error_count = sum(1 for chk in result.checks if not chk.passed)
    warning_count = sum(len(getattr(chk, "warnings", [])) for chk in result.checks)
    verdict = "PASS" if result.valid else "FAIL"
    click.echo(f"\nResult: {verdict} ({error_count} error(s), {warning_count} warning(s))")

494 

495 

def _emit_error_json(e: Exception, exit_code: int) -> None:
    """Write a single-line JSON description of ``e`` to stderr for AI agents.

    The object always contains ``error``, ``code`` (``"UNKNOWN"`` when the
    exception has no truthy ``code``), ``message`` and ``exit_code``. The
    optional guidance attributes are added when present and not ``None``.

    Args:
        e: The exception being reported.
        exit_code: Exit code the process is about to terminate with.
    """
    payload: dict[str, Any] = {
        "error": True,
        "code": getattr(e, "code", None) or "UNKNOWN",
        "message": str(e),
        "exit_code": exit_code,
    }
    optional_fields = ("details", "suggestion", "ai_guidance", "retryable", "user_fixable")
    for name in optional_fields:
        value = getattr(e, name, None)
        if value is None:
            continue
        payload[name] = value
    # default=str keeps non-JSON-serialisable attribute values from raising.
    click.echo(json.dumps(payload, default=str), err=True)

510 

511 

def _emit_error_tty(e: Exception, exit_code: int) -> None:
    """Write a human-readable report of ``e`` to stderr.

    Prints a header (with the error code when the exception has one), then
    the optional details, suggestion and retry hint, and finally the exit
    code.

    Args:
        e: The exception being reported.
        exit_code: Exit code the process is about to terminate with.
    """

    def say(text: str) -> None:
        # Everything in this report goes to stderr.
        click.echo(text, err=True)

    code = getattr(e, "code", None)
    say(f"Error [{code}]: {e}" if code else f"Error: {e}")

    details = getattr(e, "details", None)
    # Only non-empty dict details are rendered, one key per line.
    if details and isinstance(details, dict):
        say("")
        say("  Details:")
        for key, value in details.items():
            say(f"    {key}: {value}")

    suggestion = getattr(e, "suggestion", None)
    if suggestion:
        say(f"\n  Suggestion: {suggestion}")

    retryable = getattr(e, "retryable", None)
    if retryable is not None:
        if retryable:
            label = "Yes"
        else:
            label = "No (same input will fail again)"
        say(f"  Retryable: {label}")

    say(f"\n  Exit code: {exit_code}")

535 

536 

537def _get_module_id(module_def: ModuleDescriptor) -> str: 

538 """Get the canonical module ID, falling back to module_id.""" 

539 cid = getattr(module_def, "canonical_id", None) 

540 if isinstance(cid, str): 

541 return cid 

542 return module_def.module_id 

543 

544 

545def build_module_command( 

546 module_def: ModuleDescriptor, 

547 executor: Executor, 

548 help_text_max_length: int = 1000, 

549 cmd_name: str | None = None, 

550 extensions_root: str | None = None, 

551) -> click.Command: 

552 """Build a Click command from an apcore module definition. 

553 

554 Generates Click options from the module's input_schema, wires up 

555 STDIN input collection, schema validation, approval gating, 

556 execution, audit logging, and output formatting. 

557 """ 

558 # Resolve display overlay fields (§5.13) 

559 display = _get_display(module_def) 

560 cli_display = display.get("cli") or {} 

561 

562 raw_schema = getattr(module_def, "input_schema", None) 

563 module_id = _get_module_id(module_def) 

564 # cmd_name is the user-facing name (alias or module_id) 

565 effective_cmd_name: str = cmd_name or cli_display.get("alias") or module_id 

566 cmd_help: str = cli_display.get("description") or module_def.description 

567 

568 # Defensively convert Pydantic model class to dict 

569 if raw_schema is None: 

570 input_schema: dict = {} 

571 elif isinstance(raw_schema, dict): 

572 input_schema = raw_schema 

573 elif hasattr(raw_schema, "model_json_schema"): 

574 # Pydantic v2 BaseModel class 

575 input_schema = raw_schema.model_json_schema() 

576 elif hasattr(raw_schema, "schema"): 

577 # Pydantic v1 BaseModel class 

578 input_schema = raw_schema.schema() 

579 else: 

580 input_schema = {} 

581 

582 if input_schema.get("properties"): 

583 try: 

584 resolved_schema = resolve_refs(input_schema, max_depth=32, module_id=module_id) 

585 except (SystemExit, RefResolverError): 

586 raise 

587 except Exception as e: 

588 logger.warning("Failed to resolve $refs in schema for '%s', using raw schema: %s", module_id, e) 

589 resolved_schema = input_schema 

590 else: 

591 resolved_schema = input_schema 

592 

593 try: 

594 schema_options = schema_to_click_options(resolved_schema, max_help_length=help_text_max_length) 

595 except ValueError as e: 

596 click.echo(f"Error: Module '{module_id}' schema error: {e}", err=True) 

597 sys.exit(2) 

598 

599 def callback(**kwargs: Any) -> None: 

600 # Separate built-in options from schema-generated kwargs 

601 stdin_input = kwargs.pop("input", None) 

602 auto_approve = kwargs.pop("yes", False) 

603 large_input = kwargs.pop("large_input", False) 

604 output_format = kwargs.pop("format", None) 

605 output_fields = kwargs.pop("fields", None) 

606 sandbox_flag = kwargs.pop("sandbox", False) 

607 dry_run = kwargs.pop("dry_run", False) 

608 trace_flag = kwargs.pop("trace", False) 

609 stream_flag = kwargs.pop("stream", False) 

610 strategy_name = kwargs.pop("strategy", None) 

611 approval_timeout = kwargs.pop("approval_timeout", None) or 60 

612 approval_token = kwargs.pop("approval_token", None) 

613 

614 merged: dict[str, Any] = {} 

615 audit_start = time.monotonic() 

616 try: 

617 # 1. Collect and merge input (STDIN + CLI flags) 

618 merged = collect_input(stdin_input, kwargs, large_input) 

619 

620 # 2. Reconvert enum values to original types 

621 merged = reconvert_enum_values(merged, schema_options) 

622 

623 # -- Dry-run: preflight validation only, no execution -- 

624 if dry_run: 

625 preflight = executor.validate(module_id, merged) 

626 format_preflight_result(preflight, output_format) 

627 # --trace --dry-run: show which pipeline steps would run 

628 if trace_flag and hasattr(preflight, "checks"): 

629 click.echo("\nPipeline preview (dry-run):", err=True) 

630 _pure_steps = { 

631 "context_creation", 

632 "call_chain_guard", 

633 "module_lookup", 

634 "acl_check", 

635 "input_validation", 

636 } 

637 _all_steps = [ 

638 "context_creation", 

639 "call_chain_guard", 

640 "module_lookup", 

641 "acl_check", 

642 "approval_gate", 

643 "middleware_before", 

644 "input_validation", 

645 "execute", 

646 "output_validation", 

647 "middleware_after", 

648 "return_result", 

649 ] 

650 for s in _all_steps: 

651 if s in _pure_steps: 

652 click.echo(f" \u2713 {s:<24} (pure — would execute)", err=True) 

653 else: 

654 click.echo(f" \u25cb {s:<24} (impure — skipped in dry-run)", err=True) 

655 sys.exit(0 if preflight.valid else _first_failed_exit_code(preflight)) 

656 

657 # 3. Validate against schema (if schema has properties) 

658 if resolved_schema.get("properties"): 

659 try: 

660 jsonschema.validate(merged, resolved_schema) 

661 except jsonschema.ValidationError as ve: 

662 click.echo( 

663 f"Error: Validation failed for '{ve.path}': {ve.message}.", 

664 err=True, 

665 ) 

666 sys.exit(45) 

667 

668 # -- Inject approval token if provided -- 

669 if approval_token: 

670 merged["_approval_token"] = approval_token 

671 

672 # 4. Check approval gate 

673 check_approval(module_def, auto_approve, timeout=approval_timeout) 

674 

675 # 5. Execute (optionally sandboxed) 

676 

677 # -- Streaming execution -- 

678 if stream_flag: 

679 import asyncio 

680 

681 # Streaming always outputs JSONL; --format table is ignored (spec §3.6.2) 

682 if output_format == "table": 

683 logger.warning("Streaming mode always outputs JSONL; --format table is ignored.") 

684 

685 annotations = getattr(module_def, "annotations", None) 

686 is_streaming = getattr(annotations, "streaming", False) 

687 if not is_streaming: 

688 logger.warning( 

689 "Module '%s' does not declare streaming support. Falling back to standard execution.", 

690 module_id, 

691 ) 

692 

693 if is_streaming and hasattr(executor, "stream"): 

694 

695 async def _do_stream() -> None: 

696 chunks = 0 

697 async for chunk in executor.stream(module_id, merged): 

698 chunks += 1 

699 click.echo(json.dumps(chunk, default=str)) 

700 sys.stdout.flush() 

701 if sys.stderr.isatty(): 

702 click.echo( 

703 f"\rStreaming {module_id}... ({chunks} chunks)", 

704 err=True, 

705 nl=False, 

706 ) 

707 if sys.stderr.isatty(): 

708 click.echo("", err=True) 

709 

710 asyncio.run(_do_stream()) 

711 duration_ms = int((time.monotonic() - audit_start) * 1000) 

712 if _audit_logger is not None: 

713 _audit_logger.log_execution(module_id, merged, "success", 0, duration_ms) 

714 return 

715 # else: fall through to normal execution 

716 

717 # -- Traced execution -- 

718 if trace_flag and hasattr(executor, "call_with_trace"): 

719 result, trace = executor.call_with_trace( 

720 module_id, 

721 merged, 

722 strategy=strategy_name, 

723 ) 

724 duration_ms = int((time.monotonic() - audit_start) * 1000) 

725 

726 # Print result (before audit so a formatter raise doesn't double-log) 

727 if output_format == "json" or not sys.stdout.isatty(): 

728 # Merge _trace into JSON output 

729 trace_data = { 

730 "strategy": trace.strategy_name, 

731 "total_duration_ms": trace.total_duration_ms, 

732 "success": trace.success, 

733 "steps": [ 

734 { 

735 "name": s.name, 

736 "duration_ms": s.duration_ms, 

737 "skipped": s.skipped, 

738 **({"skip_reason": s.skip_reason} if s.skipped else {}), 

739 } 

740 for s in trace.steps 

741 ], 

742 } 

743 if isinstance(result, dict): 

744 output = {**result, "_trace": trace_data} 

745 else: 

746 output = {"result": result, "_trace": trace_data} 

747 click.echo(json.dumps(output, indent=2, default=str)) 

748 else: 

749 format_exec_result(result, output_format, fields=output_fields) 

750 # Print trace to stderr 

751 step_count = len(trace.steps) 

752 click.echo( 

753 f"\nPipeline Trace (strategy: {trace.strategy_name}, " 

754 f"{step_count} steps, {trace.total_duration_ms:.1f}ms)", 

755 err=True, 

756 ) 

757 for s in trace.steps: 

758 if s.skipped: 

759 sym = "\u25cb" 

760 dur = "\u2014" 

761 reason = f" skipped ({s.skip_reason or 'n/a'})" 

762 else: 

763 sym = "\u2713" 

764 dur = f"{s.duration_ms:.1f}ms" 

765 reason = "" 

766 click.echo(f" {sym} {s.name:<24} {dur:>8}{reason}", err=True) 

767 

768 # Audit after formatting — formatter raise will not produce a 

769 # duplicate success+error pair 

770 if _audit_logger is not None: 

771 _audit_logger.log_execution(module_id, merged, "success", 0, duration_ms) 

772 return 

773 

774 # -- Standard execution (with optional strategy) -- 

775 sandbox = Sandbox(enabled=sandbox_flag, extensions_root=extensions_root) 

776 if strategy_name and hasattr(executor, "call_with_trace"): 

777 if sandbox_flag: 

778 # Sandbox mode: delegate to subprocess (strategy not available in sandbox) 

779 logger.warning("--sandbox ignores --strategy; sandboxed execution uses default strategy.") 

780 result = sandbox.execute(module_id, merged, executor) 

781 else: 

782 # Strategy requires call_with_trace to pass strategy param 

783 result, _trace = executor.call_with_trace( 

784 module_id, 

785 merged, 

786 strategy=strategy_name, 

787 ) 

788 if strategy_name != "standard" and sys.stderr.isatty(): 

789 click.echo( 

790 f"Warning: Using '{strategy_name}' strategy.", 

791 err=True, 

792 ) 

793 else: 

794 if strategy_name and not hasattr(executor, "call_with_trace"): 

795 logger.warning( 

796 "--strategy '%s' requested but executor does not support call_with_trace; " 

797 "using default pipeline.", 

798 strategy_name, 

799 ) 

800 result = sandbox.execute(module_id, merged, executor) 

801 duration_ms = int((time.monotonic() - audit_start) * 1000) 

802 

803 # 6. Format and print result (before audit so a formatter raise doesn't 

804 # produce a spurious success entry followed by an error entry) 

805 format_exec_result(result, output_format, fields=output_fields) 

806 

807 # 7. Audit log (success) — only reached if formatting did not raise 

808 if _audit_logger is not None: 

809 _audit_logger.log_execution(module_id, merged, "success", 0, duration_ms) 

810 

811 except KeyboardInterrupt: 

812 click.echo("Execution cancelled.", err=True) 

813 sys.exit(130) 

814 except SystemExit: 

815 raise 

816 except Exception as e: 

817 error_code = getattr(e, "code", None) 

818 exit_code = _ERROR_CODE_MAP.get(error_code, 1) if isinstance(error_code, str) else 1 

819 

820 # Audit log (error) 

821 if _audit_logger is not None: 

822 duration_ms = int((time.monotonic() - audit_start) * 1000) 

823 _audit_logger.log_execution(module_id, merged, "error", exit_code, duration_ms) 

824 

825 if output_format == "json" or not sys.stderr.isatty(): 

826 _emit_error_json(e, exit_code) 

827 else: 

828 _emit_error_tty(e, exit_code) 

829 sys.exit(exit_code) 

830 

831 # Build the command with schema-generated options + built-in options 

832 _epilog_parts: list[str] = [] 

833 if not _verbose_help: 

834 _epilog_parts.append("Use --verbose to show all options (including built-in apcore options).") 

835 if _docs_url: 

836 _epilog_parts.append(f"Docs: {_docs_url}/commands/{effective_cmd_name}") 

837 _epilog = "\n".join(_epilog_parts) if _epilog_parts else None 

838 cmd = click.Command( 

839 name=effective_cmd_name, 

840 help=cmd_help, 

841 callback=callback, 

842 epilog=_epilog, 

843 ) 

844 

845 # Add built-in options (hidden unless --verbose is passed with --help) 

846 _hide = not _verbose_help 

847 cmd.params.append( 

848 click.Option( 

849 ["--input"], 

850 default=None, 

851 help="Read JSON input from a file path, or use '-' to read from stdin pipe.", 

852 hidden=_hide, 

853 ) 

854 ) 

855 cmd.params.append( 

856 click.Option( 

857 ["--yes", "-y"], 

858 is_flag=True, 

859 default=False, 

860 help="Skip interactive approval prompts (for scripts and CI).", 

861 hidden=_hide, 

862 ) 

863 ) 

864 cmd.params.append( 

865 click.Option( 

866 ["--large-input"], 

867 is_flag=True, 

868 default=False, 

869 help="Allow stdin input larger than 10MB (default limit protects against accidental pipes).", 

870 hidden=_hide, 

871 ) 

872 ) 

873 cmd.params.append( 

874 click.Option( 

875 ["--format"], 

876 type=click.Choice(["json", "table", "csv", "yaml", "jsonl"]), 

877 default=None, 

878 help="Output format: json, table, csv, yaml, jsonl.", 

879 hidden=_hide, 

880 ) 

881 ) 

882 cmd.params.append( 

883 click.Option( 

884 ["--fields"], 

885 default=None, 

886 help="Comma-separated dot-paths to select from the result (e.g., 'status,data.count').", 

887 hidden=_hide, 

888 ) 

889 ) 

890 # --sandbox is always hidden (not yet implemented) 

891 cmd.params.append( 

892 click.Option( 

893 ["--sandbox"], 

894 is_flag=True, 

895 default=False, 

896 help="Run module in an isolated subprocess with restricted filesystem and env access.", 

897 hidden=True, 

898 ) 

899 ) 

900 cmd.params.append( 

901 click.Option( 

902 ["--dry-run"], 

903 is_flag=True, 

904 default=False, 

905 help="Run preflight checks without executing the module. Shows validation results.", 

906 hidden=_hide, 

907 ) 

908 ) 

909 cmd.params.append( 

910 click.Option( 

911 ["--trace"], 

912 is_flag=True, 

913 default=False, 

914 help="Show execution pipeline trace with per-step timing after the result.", 

915 hidden=_hide, 

916 ) 

917 ) 

918 cmd.params.append( 

919 click.Option( 

920 ["--stream"], 

921 is_flag=True, 

922 default=False, 

923 help="Stream module output as JSONL (one JSON object per line, flushed immediately).", 

924 hidden=_hide, 

925 ) 

926 ) 

927 cmd.params.append( 

928 click.Option( 

929 ["--strategy"], 

930 type=click.Choice(["standard", "internal", "testing", "performance", "minimal"]), 

931 default=None, 

932 help="Execution pipeline strategy: standard (default), internal, testing, performance.", 

933 hidden=_hide, 

934 ) 

935 ) 

936 cmd.params.append( 

937 click.Option( 

938 ["--approval-timeout"], 

939 type=int, 

940 default=None, 

941 help="Override approval prompt timeout in seconds (default: 60).", 

942 hidden=_hide, 

943 ) 

944 ) 

945 cmd.params.append( 

946 click.Option( 

947 ["--approval-token"], 

948 default=None, 

949 help="Resume a pending approval with the given token (for async approval flows).", 

950 hidden=_hide, 

951 ) 

952 ) 

953 

954 # Guard: schema property names must not collide with built-in option names. 

955 _reserved = { 

956 "input", 

957 "yes", 

958 "large_input", 

959 "format", 

960 "fields", 

961 "sandbox", 

962 "verbose", 

963 "dry_run", 

964 "trace", 

965 "stream", 

966 "strategy", 

967 "approval_timeout", 

968 "approval_token", 

969 } 

970 for opt in schema_options: 

971 if opt.name in _reserved: 

972 click.echo( 

973 f"Error: Module '{module_id}' schema property '{opt.name}' conflicts " 

974 f"with a reserved CLI option name. Rename the property.", 

975 err=True, 

976 ) 

977 sys.exit(2) 

978 

979 # Add schema-generated options 

980 cmd.params.extend(schema_options) 

981 

982 return cmd 

983 

984 

def validate_module_id(module_id: str) -> None:
    """Reject a module ID that is too long or not in dotted-identifier form.

    A valid ID is one or more dot-separated segments, each starting with a
    lowercase ASCII letter and continuing with lowercase letters, digits, or
    underscores.

    The length limit tracks PROTOCOL_SPEC §2.7 EBNF constraint #1 — bumped
    from 128 to 192 in spec 1.6.0-draft to accommodate Java/.NET
    deep-namespace FQN-derived IDs. Filesystem-safe
    (192 + len('.binding.yaml') = 205 < 255).

    Args:
        module_id: The candidate module identifier.

    Raises:
        SystemExit: With exit code 2, after printing an error to stderr,
            when the ID is longer than 192 characters or malformed.
    """
    # The length check comes first so an over-long ID gets the more specific
    # "Maximum length" message rather than the generic format one.
    if len(module_id) > 192:
        complaint = f"Error: Invalid module ID format: '{module_id}'. Maximum length is 192 characters."
    elif re.fullmatch(r"[a-z][a-z0-9_]*(\.[a-z][a-z0-9_]*)*", module_id) is None:
        complaint = f"Error: Invalid module ID format: '{module_id}'."
    else:
        return

    click.echo(complaint, err=True)
    sys.exit(2)

1004 

1005 

1006def collect_input( 

1007 stdin_flag: str | None, 

1008 cli_kwargs: dict[str, Any], 

1009 large_input: bool = False, 

1010) -> dict[str, Any]: 

1011 """Collect and merge input from STDIN, a file path, and CLI flags. 

1012 

1013 ``stdin_flag`` accepts three forms: 

1014 * ``None`` or empty — use CLI kwargs only. 

1015 * ``"-"`` — read JSON from STDIN (10 MB cap unless ``large_input``). 

1016 * any other string — treat as a file path; open and read JSON. 

1017 

1018 CLI flags override STDIN/file values for duplicate keys. 

1019 """ 

1020 # Remove None values from CLI kwargs 

1021 cli_kwargs_non_none = {k: v for k, v in cli_kwargs.items() if v is not None} 

1022 

1023 if not stdin_flag: 

1024 return cli_kwargs_non_none 

1025 

1026 if stdin_flag == "-": 

1027 raw = sys.stdin.read() 

1028 raw_size = len(raw.encode("utf-8")) 

1029 

1030 if raw_size > 10_485_760 and not large_input: 

1031 click.echo( 

1032 "Error: STDIN input exceeds 10MB limit. Use --large-input to override.", 

1033 err=True, 

1034 ) 

1035 sys.exit(2) 

1036 source_label = "STDIN" 

1037 else: 

1038 # File-path form. 

1039 try: 

1040 raw = Path(stdin_flag).read_text(encoding="utf-8") 

1041 except FileNotFoundError: 

1042 click.echo(f"Error: --input file '{stdin_flag}' does not exist.", err=True) 

1043 sys.exit(2) 

1044 except OSError as e: 

1045 click.echo(f"Error: could not read --input file '{stdin_flag}': {e}.", err=True) 

1046 sys.exit(2) 

1047 raw_size = len(raw.encode("utf-8")) 

1048 if raw_size > 10_485_760 and not large_input: 

1049 click.echo( 

1050 f"Error: --input file '{stdin_flag}' exceeds 10MB limit. Use --large-input to override.", 

1051 err=True, 

1052 ) 

1053 sys.exit(2) 

1054 source_label = f"--input file '{stdin_flag}'" 

1055 

1056 if not raw: 

1057 stdin_data: dict[str, Any] = {} 

1058 else: 

1059 try: 

1060 stdin_data = json.loads(raw) 

1061 except json.JSONDecodeError as e: 

1062 click.echo( 

1063 f"Error: {source_label} does not contain valid JSON: {e.msg}.", 

1064 err=True, 

1065 ) 

1066 sys.exit(2) 

1067 

1068 if not isinstance(stdin_data, dict): 

1069 click.echo( 

1070 f"Error: {source_label} JSON must be an object, got {type(stdin_data).__name__}.", 

1071 err=True, 

1072 ) 

1073 sys.exit(2) 

1074 

1075 # CLI flags override STDIN/file for duplicate keys 

1076 return {**stdin_data, **cli_kwargs_non_none}