Coverage for src/apcore_cli/discovery.py: 99%

189 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-04-26 10:23 +0800

1"""Discovery commands — list, describe, exec, validate (FE-04, FE-11, FE-13). 

2 

3FE-13 §4.9 split the batched ``register_discovery_commands`` into four 

4per-subcommand registrars (``register_list_command``, 

5``register_describe_command``, ``register_exec_command``, 

6``register_validate_command``). The per-command shape is required for 

7apcli include/exclude filtering — the factory dispatcher decides at 

8registration time whether each subcommand should be attached to the 

9``apcli`` group or skipped. 

10 

11The legacy :func:`register_discovery_commands` remains as a thin wrapper 

12so pre-v0.7 test fixtures and call sites keep working. 

13""" 

14 

15from __future__ import annotations 

16 

17import json 

18import logging 

19import re 

20import sys 

21from typing import Any 

22 

23import click 

24 

25import apcore_cli.cli as _cli_module 

26from apcore_cli.cli import ( 

27 _first_failed_exit_code, 

28 collect_input, 

29 format_preflight_result, 

30 validate_module_id, 

31) 

32from apcore_cli.display_helpers import get_cli_display_fields 

33from apcore_cli.output import ( 

34 format_exec_result, 

35 format_grouped_module_list, 

36 format_module_detail, 

37 format_module_list, 

38 resolve_format, 

39) 

40 

41logger = logging.getLogger("apcore_cli.discovery") 

42 

43_TAG_PATTERN = re.compile(r"^[a-z][a-z0-9_-]*$") 

44 

45 

46def _validate_tag(tag: str) -> None: 

47 """Validate tag format.""" 

48 if not _TAG_PATTERN.match(tag): 

49 click.echo( 

50 f"Error: Invalid tag format: '{tag}'. Tags must match [a-z][a-z0-9_-]*.", 

51 err=True, 

52 ) 

53 sys.exit(2) 

54 

55 

def _resolve_group_for_display(descriptor: Any) -> tuple[str | None, str]:
    """Return the display group/command pair for a module descriptor.

    The actual resolution is delegated to
    ``GroupedModuleGroup._resolve_group``; this helper only normalises the
    descriptor's ``module_id`` (missing or falsy values become ``""``).
    """
    # Imported at call time rather than at module import time.
    from apcore_cli.cli import GroupedModuleGroup

    resolved_id = getattr(descriptor, "module_id", "")
    if not resolved_id:
        resolved_id = ""
    return GroupedModuleGroup._resolve_group(resolved_id, descriptor)

62 

63 

64# --------------------------------------------------------------------------- 

65# Per-subcommand registrars (FE-13 §4.9) 

66# --------------------------------------------------------------------------- 

67 

68 

def register_list_command(
    apcli_group: click.Group,
    registry: Any,
    exposure_filter: Any | None = None,
) -> None:
    """Register the ``list`` subcommand on the given group.

    Accepts the apcli Click group (post-FE-13 canonical attachment point)
    or any other Click group for back-compat test fixtures.

    Args:
        apcli_group: Click group that receives the ``list`` command.
        registry: Object providing ``list()`` (module ids) and
            ``get_definition(module_id)``; ids whose definition is ``None``
            are skipped.
        exposure_filter: Fallback filter exposing ``is_exposed(module_id)``.
            A fresh ``ExposureFilter()`` is created when omitted. A filter
            found under ``ctx.obj["exposure_filter"]`` takes precedence at
            invocation time.
    """
    from apcore_cli.exposure import ExposureFilter

    if exposure_filter is None:
        exposure_filter = ExposureFilter()

    @apcli_group.command("list")
    @click.option("--tag", multiple=True, help="Filter modules by tag (AND logic). Repeatable.")
    @click.option("--flat", is_flag=True, default=False, help="Show flat list (no grouping).")
    @click.option(
        "--format",
        "output_format",
        type=click.Choice(["table", "json", "csv", "yaml", "jsonl"]),
        default=None,
        help="Output format. Default: table (TTY) or json (non-TTY).",
    )
    @click.option("--search", "-s", default=None, help="Filter by substring match on ID and description.")
    @click.option(
        "--status",
        type=click.Choice(["enabled", "disabled", "all"]),
        default="enabled",
        help="Filter by module status. Default: enabled.",
    )
    @click.option(
        "--annotation",
        "-a",
        multiple=True,
        type=click.Choice(
            [
                "destructive",
                "requires-approval",
                "readonly",
                "streaming",
                "cacheable",
                "idempotent",
                # apcore >= 0.19.0 ModuleAnnotations additions.
                "paginated",
            ]
        ),
        help="Filter by annotation flag (AND logic). Repeatable.",
    )
    @click.option(
        "--sort",
        type=click.Choice(["id", "calls", "errors", "latency"]),
        default="id",
        help="Sort order. Default: id.",
    )
    @click.option("--reverse", is_flag=True, default=False, help="Reverse sort order.")
    @click.option("--deprecated", is_flag=True, default=False, help="Include deprecated modules.")
    @click.option("--deps", is_flag=True, default=False, help="Show dependency count column.")
    @click.option(
        "--exposure",
        type=click.Choice(["exposed", "hidden", "all"]),
        default="exposed",
        help="Filter by exposure status. Default: exposed.",
    )
    @click.pass_context
    def list_cmd(
        ctx: click.Context,
        tag: tuple[str, ...],
        flat: bool,
        output_format: str | None,
        search: str | None,
        status: str,
        annotation: tuple[str, ...],
        sort: str,
        reverse: bool,
        deprecated: bool,
        deps: bool,
        exposure: str,
    ) -> None:
        """List available modules in the registry."""
        # Reject malformed tags before touching the registry.
        for t in tag:
            _validate_tag(t)

        # Prefer a filter pushed into ctx.obj (factory.py wires it there);
        # fall back to the closure-captured default (mode=all) for tests.
        obj = (ctx.obj or {}) if ctx else {}
        ctx_filter = obj.get("exposure_filter") if isinstance(obj, dict) else None
        active_filter = ctx_filter if ctx_filter is not None else exposure_filter

        # Collect every definition the registry can actually resolve.
        modules = []
        for mid in registry.list():
            mdef = registry.get_definition(mid)
            if mdef is not None:
                modules.append(mdef)

        if tag:
            filter_tags = set(tag)
            # A definition may carry ``tags=None``; treat that as "no tags"
            # instead of letting ``set(None)`` raise TypeError.
            modules = [m for m in modules if filter_tags.issubset(set(getattr(m, "tags", None) or ()))]

        if search:
            query = search.lower()
            modules = [
                m
                for m in modules
                if query in (getattr(m, "module_id", "") or "").lower()
                or query in (getattr(m, "description", "") or "").lower()
            ]

        # Only an explicit ``enabled is False`` counts as disabled; a missing
        # attribute is treated as enabled. ``--status all`` applies no filter.
        if status == "enabled":
            modules = [m for m in modules if getattr(m, "enabled", None) is not False]
        elif status == "disabled":
            modules = [m for m in modules if getattr(m, "enabled", None) is False]

        if not deprecated:
            modules = [m for m in modules if getattr(m, "deprecated", False) is not True]

        if annotation:
            # CLI spelling -> attribute name on the module's ``annotations``.
            _ann_map = {
                "destructive": "destructive",
                "requires-approval": "requires_approval",
                "readonly": "readonly",
                "streaming": "streaming",
                "cacheable": "cacheable",
                "idempotent": "idempotent",
                "paginated": "paginated",
            }
            # AND logic: each requested flag narrows the list further.
            for ann_flag in annotation:
                attr = _ann_map.get(ann_flag, ann_flag)
                modules = [m for m in modules if getattr(getattr(m, "annotations", None), attr, False) is True]

        # ``--exposure all`` applies no filter.
        if exposure == "exposed":
            modules = [m for m in modules if active_filter.is_exposed(getattr(m, "module_id", ""))]
        elif exposure == "hidden":
            modules = [m for m in modules if not active_filter.is_exposed(getattr(m, "module_id", ""))]

        if sort in ("calls", "errors", "latency"):
            logger.warning(
                "Usage data not available; sorting by id. Sort by %s requires system.usage modules.",
                sort,
            )
        # Coalesce a None module_id to "" (as the search filter does) so a
        # single incomplete definition cannot break the str comparison.
        modules.sort(key=lambda m: getattr(m, "module_id", "") or "", reverse=reverse)

        fmt = resolve_format(output_format)
        show_exposure_col = exposure == "all"

        # Machine-readable formats and --flat use the flat renderer; only the
        # remaining (table, non-flat) case is grouped.
        if flat or fmt in ("json", "csv", "yaml", "jsonl"):
            format_module_list(
                modules,
                fmt,
                filter_tags=tag,
                show_deps=deps,
                exposure_filter=active_filter if show_exposure_col else None,
            )
        else:
            grouped: dict[str | None, list[tuple[str, str, list[str]]]] = {}
            for m in modules:
                group_name, cmd_name = _resolve_group_for_display(m)
                _, desc, tags_val = get_cli_display_fields(m)
                grouped.setdefault(group_name, []).append((cmd_name, desc, tags_val))
            format_grouped_module_list(grouped, filter_tags=tag)

    _ = list_cmd  # silence unused-var checker

232 

233 

def register_describe_command(apcli_group: click.Group, registry: Any) -> None:
    """Attach the ``describe`` subcommand to ``apcli_group``.

    The command looks up a single module definition through
    ``registry.get_definition`` and renders it with ``format_module_detail``.
    An unknown module id prints an error to stderr and exits with code 44.
    """

    @apcli_group.command("describe")
    @click.argument("module_id")
    @click.option(
        "--format",
        "output_format",
        type=click.Choice(["table", "json"]),
        default=None,
        help="Output format. Default: table (TTY) or json (non-TTY).",
    )
    def describe_cmd(module_id: str, output_format: str | None) -> None:
        """Show metadata, schema, and annotations for a module."""
        validate_module_id(module_id)

        definition = registry.get_definition(module_id)
        if definition is None:
            click.echo(f"Error: Module '{module_id}' not found.", err=True)
            sys.exit(44)

        # Resolve the effective format first, then render.
        format_module_detail(definition, resolve_format(output_format))

    _ = describe_cmd

259 

260 

def register_exec_command(
    apcli_group: click.Group,
    registry: Any,
    executor: Any,
) -> None:
    """Attach the generic ``exec`` subcommand to the apcli group (FE-13).

    Invocation shape: ``apcli exec <module-id> [--input JSON] [--format fmt]``.
    In contrast to the per-module commands produced by
    :func:`build_module_command`, no options are derived from the module's
    input schema; the caller supplies inputs as one JSON object through
    ``--input`` (or ``-`` to read it from stdin).

    Exit codes visible in this function: 44 for an unknown module, 2 for
    malformed ``--input``, otherwise the code mapped from the raised
    exception's ``code`` attribute via ``_ERROR_CODE_MAP`` (default 1).
    """
    from apcore_cli.approval import check_approval
    from apcore_cli.cli import _ERROR_CODE_MAP, _emit_error_tty

    @apcli_group.command("exec")
    @click.argument("module_id")
    @click.option(
        "--format",
        "output_format",
        type=click.Choice(["json", "table", "csv", "yaml", "jsonl"]),
        default=None,
        help="Output format.",
    )
    @click.option("--fields", default=None, help="Comma-separated dot-paths to select from the result.")
    @click.option(
        "--input",
        "stdin_input",
        default=None,
        help="JSON object passed as input to the module. Use '-' to read JSON from stdin.",
    )
    @click.option("-y", "--yes", "auto_approve", is_flag=True, default=False, help="Auto-approve.")
    @click.option(
        "--approval-timeout",
        type=int,
        default=None,
        help="Seconds to wait for interactive approval.",
    )
    @click.option(
        "--sandbox",
        is_flag=True,
        default=False,
        help="Run module in an isolated subprocess with restricted filesystem and env access.",
    )
    @click.option("--strategy", default=None, help="Execution strategy (standard, parallel, sequential, etc.).")
    @click.option("--trace", is_flag=True, default=False, help="Enable pipeline trace output.")
    @click.option("--dry-run", "dry_run", is_flag=True, default=False, help="Validate inputs without executing.")
    @click.option("--stream", is_flag=True, default=False, help="Stream output as JSONL.")
    def exec_cmd(
        module_id: str,
        output_format: str | None,
        fields: str | None,
        stdin_input: str | None,
        auto_approve: bool,
        approval_timeout: int | None,
        sandbox: bool,
        strategy: str | None,
        trace: bool,
        dry_run: bool,
        stream: bool,
    ) -> None:
        """Execute a module by ID with JSON input."""
        # NOTE(review): ``stream`` is accepted but never read in this body.
        validate_module_id(module_id)

        definition = registry.get_definition(module_id)
        if definition is None:
            click.echo(f"Error: Module '{module_id}' not found.", err=True)
            sys.exit(44)

        # "-" means "read the JSON from stdin"; any other non-None value is
        # treated as an inline JSON literal that must decode to an object.
        payload: dict[str, Any] = {}
        if stdin_input == "-":
            payload = collect_input("-", {}, False)
        elif stdin_input is not None:
            try:
                decoded = json.loads(stdin_input)
            except json.JSONDecodeError as e:
                click.echo(f"Error: --input is not valid JSON: {e}", err=True)
                sys.exit(2)
            if not isinstance(decoded, dict):
                click.echo("Error: --input JSON must be an object.", err=True)
                sys.exit(2)
            payload = decoded

        import time

        from apcore_cli.security.sandbox import Sandbox

        started = time.monotonic()

        def _audit(outcome: str, exit_code: int) -> None:
            """Write one audit record if an audit logger is configured."""
            elapsed_ms = int((time.monotonic() - started) * 1000)
            # Looked up on the module at call time, not captured earlier.
            audit_logger = _cli_module._audit_logger
            if audit_logger is not None:
                audit_logger.log_execution(module_id, payload, outcome, exit_code, elapsed_ms)

        try:
            wait_seconds = 60 if approval_timeout is None else approval_timeout
            check_approval(definition, auto_approve=auto_approve, timeout=wait_seconds)

            if dry_run:
                # Preflight only: report and stop before any execution.
                report = executor.validate(module_id, payload)
                format_preflight_result(report, output_format)
                return

            use_trace_path = (trace or strategy) and hasattr(executor, "call_with_trace")
            if use_trace_path:
                # NOTE(review): the trace data is discarded and ``sandbox``
                # is not consulted on this path.
                outcome_value, _ = executor.call_with_trace(
                    module_id,
                    payload,
                    strategy=strategy,
                )
            else:
                runner = Sandbox(enabled=sandbox)
                outcome_value = runner.execute(module_id, payload, executor)

            # Format output FIRST (canonical order: format → audit on success)
            format_exec_result(outcome_value, resolve_format(output_format), fields)
            _audit("success", 0)
        except Exception as e:
            # Map a string ``code`` attribute to an exit code; anything else is 1.
            err_code = getattr(e, "code", None)
            if isinstance(err_code, str):
                exit_code = _ERROR_CODE_MAP.get(err_code, 1)
            else:
                exit_code = 1
            _audit("error", exit_code)
            _emit_error_tty(e, exit_code)
            sys.exit(exit_code)

    _ = exec_cmd

386 

387 

def register_validate_command(apcli_group: click.Group, registry: Any, executor: Any) -> None:
    """Attach the ``validate`` subcommand to ``apcli_group``.

    The command runs ``executor.validate`` for one module and prints the
    preflight report. It always terminates through ``sys.exit``: 0 when the
    report is valid, otherwise the code from ``_first_failed_exit_code``;
    44 for an unknown module; a mapped error code (default 1) when
    validation or formatting raises.
    """
    from apcore_cli.cli import _ERROR_CODE_MAP, _emit_error_tty

    @apcli_group.command("validate")
    @click.argument("module_id")
    @click.option("--input", "stdin_input", default=None, help="JSON input file or '-' for stdin.")
    @click.option(
        "--format",
        "output_format",
        type=click.Choice(["table", "json"]),
        default=None,
        help="Output format.",
    )
    def validate_cmd(module_id: str, stdin_input: str | None, output_format: str | None) -> None:
        """Run preflight checks without executing a module."""
        validate_module_id(module_id)

        definition = registry.get_definition(module_id)
        if definition is None:
            click.echo(f"Error: Module '{module_id}' not found.", err=True)
            sys.exit(44)

        # An absent or empty --input means "validate with no inputs".
        if stdin_input:
            payload = collect_input(stdin_input, {}, False)
        else:
            payload = {}

        try:
            report = executor.validate(module_id, payload)
            format_preflight_result(report, output_format)
        except Exception as e:
            err_code = getattr(e, "code", None)
            if isinstance(err_code, str):
                exit_code = _ERROR_CODE_MAP.get(err_code, 1)
            else:
                exit_code = 1
            # Failures are audited with a zero duration.
            audit_logger = _cli_module._audit_logger
            if audit_logger is not None:
                audit_logger.log_execution(module_id, payload, "error", exit_code, 0)
            _emit_error_tty(e, exit_code)
            sys.exit(exit_code)

        final_code = 0 if report.valid else _first_failed_exit_code(report)
        sys.exit(final_code)

    _ = validate_cmd

426 

427 

428# --------------------------------------------------------------------------- 

429# Back-compat batched registrar (pre-v0.7 call sites) 

430# --------------------------------------------------------------------------- 

431 

432 

def register_discovery_commands(
    cli: click.Group,
    registry: Any,
    exposure_filter: Any | None = None,
) -> None:
    """Back-compat shim that registers ``list`` and ``describe`` on ``cli``.

    Before FE-13 the batched registrar attached both commands straight to
    the root Click group. The canonical wiring now lives in
    :func:`apcore_cli.factory._register_apcli_subcommands` and targets the
    ``apcli`` group; this wrapper simply forwards to the two per-subcommand
    registrars so older tests and call sites continue to work. Only ``list``
    and ``describe`` are registered here — ``exec`` and ``validate`` are not.
    """
    # ``list`` first, then ``describe``; the exposure filter only concerns
    # the ``list`` command.
    register_list_command(cli, registry, exposure_filter=exposure_filter)
    register_describe_command(cli, registry)