Coverage for src / apcore_cli / system_cmd.py: 89%

276 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-04-26 10:23 +0800

1"""System management commands — health, usage, enable, disable, reload, config (FE-11, FE-13). 

2 

3FE-13 §4.9 split the batched ``register_system_commands`` into six 

4per-subcommand registrars (``register_health_command`` … ``register_config_command``). 

5The per-command shape is required for apcli include/exclude filtering — 

6the factory dispatcher decides per-entry whether to attach to the 

7``apcli`` group. 

8 

9The legacy :func:`register_system_commands` remains as a thin wrapper for 

10pre-v0.7 call sites; it probes the executor for ``system.*`` modules and 

11skips attachment when they are absent, preserving the old "no-op if system 

12modules unavailable" behavior. 

13""" 

14 

15from __future__ import annotations 

16 

17import json 

18import logging 

19import sys 

20from typing import Any 

21 

22import click 

23 

24import apcore_cli.cli as _cli_module 

25from apcore_cli.approval import check_approval 

26from apcore_cli.output import format_exec_result, resolve_format 

27 

28logger = logging.getLogger("apcore_cli.system_cmd") 

29 

30 

31def _call_system_module(executor: Any, module_id: str, inputs: dict[str, Any]) -> Any: 

32 """Call a system module and return the result.""" 

33 return executor.call(module_id, inputs) 

34 

35 

def _check_system_approval(executor: Any, module_id: str, auto_approve: bool) -> None:
    """Run the approval gate for ``module_id`` when its definition can be found.

    The definition is resolved with :func:`_try_get_module_def`. When that
    lookup yields ``None`` this function returns without calling
    ``check_approval`` at all, so no approval prompt is shown in that case.
    """
    definition = _try_get_module_def(executor, module_id)
    if definition is None:
        return

    # Deliberately unguarded: check_approval's own exits (sys.exit(46),
    # ApprovalDeniedError, etc.) must reach the caller. Only the registry
    # lookup above is best-effort.
    check_approval(definition, auto_approve)

43 

44 

45def _try_get_module_def(executor: Any, module_id: str) -> Any: 

46 """Best-effort registry lookup via executor._registry (apcore private attr). 

47 

48 Centralises the private-attribute access so the fragility surface is one 

49 function rather than scattered across system_cmd and strategy. Falls back 

50 to None on any AttributeError so callers gracefully degrade. 

51 """ 

52 try: 

53 if hasattr(executor, "_registry"): 

54 return executor._registry.get_definition(module_id) 

55 except Exception as e: 

56 logger.warning("executor._registry.get_definition(%r) failed: %s", module_id, e) 

57 return None 

58 

59 

60def _system_modules_available(executor: Any) -> bool: 

61 """Probe the executor for ``system.health.summary`` without executing it.""" 

62 try: 

63 if hasattr(executor, "validate"): 

64 executor.validate("system.health.summary", {}) 

65 return True 

66 return _try_get_module_def(executor, "system.health.summary") is not None 

67 except Exception as e: 

68 logger.warning("System module probe failed (system commands will be unavailable): %s", e) 

69 return False 

70 

71 

def _format_health_summary_tty(result: dict[str, Any]) -> None:
    """Render the health summary as a plain-text table via ``click.echo``.

    Reads ``result["modules"]`` (a list of per-module dicts) and
    ``result["summary"]`` (a dict of aggregate counts). Prints
    "No modules found." and returns when the module list is missing or empty.

    Row fields are read with fallbacks so one incomplete entry cannot abort
    the table half-way through: a missing ``module_id`` or top-error ``code``
    is shown as ``?`` and a missing ``status`` as ``unknown`` (the same
    placeholders the single-module renderer uses).
    """
    summary = result.get("summary", {})
    modules = result.get("modules", [])

    if not modules:
        click.echo("No modules found.")
        return

    click.echo(f"Health Overview ({summary.get('total_modules', len(modules))} modules)\n")
    click.echo(f" {'Module':<28} {'Status':<12} {'Error Rate':<12} Top Error")
    click.echo(f" {'-' * 28} {'-' * 12} {'-' * 12} {'-' * 20}")
    for m in modules:
        top = m.get("top_error")
        # An em dash marks "no top error" for this module.
        top_str = f"{top.get('code', '?')} ({top.get('count', '?')})" if top else "—"
        # error_rate is multiplied by 100 for display as a percentage.
        rate = f"{m.get('error_rate', 0) * 100:.1f}%"
        click.echo(f" {m.get('module_id', '?'):<28} {m.get('status', 'unknown'):<12} {rate:<12} {top_str}")

    # Only non-zero buckets are listed in the footer.
    parts = [f"{summary.get(key, 0)} {key}" for key in ("healthy", "degraded", "error") if summary.get(key, 0)]
    click.echo(f"\nSummary: {', '.join(parts) or 'no data'}")

96 

97 

def _format_health_module_tty(result: dict[str, Any]) -> None:
    """Print the health detail of a single module via ``click.echo``.

    Emits the module id and status, a call/error line, a latency line and,
    when ``result["recent_errors"]`` is non-empty, one line per recent error.
    Missing numeric fields default to ``0``; missing text fields to ``?``
    (``unknown`` for the status).
    """
    click.echo(f"Module: {result.get('module_id', '?')}")
    click.echo(f"Status: {result.get('status', 'unknown')}")

    call_total = result.get("total_calls", 0)
    error_total = result.get("error_count", 0)
    error_rate = result.get("error_rate", 0)
    latency_avg = result.get("avg_latency_ms", 0)
    latency_p99 = result.get("p99_latency_ms", 0)
    # error_rate is multiplied by 100 for display as a percentage.
    click.echo(f"Calls: {call_total:,} total | {error_total:,} errors | {error_rate * 100:.1f}% error rate")
    click.echo(f"Latency: {latency_avg:.0f}ms avg | {latency_p99:.0f}ms p99")

    recent = result.get("recent_errors", [])
    if not recent:
        return

    click.echo(f"\nRecent Errors (top {len(recent)}):")
    for entry in recent:
        occurrences = entry.get("count", "?")
        last_seen = entry.get("last_occurred", "?")
        click.echo(f" {entry.get('code', '?'):<24} x{occurrences} (last: {last_seen})")

117 

118 

def _format_usage_summary_tty(result: dict[str, Any]) -> None:
    """Render the usage summary as a plain-text table via ``click.echo``.

    Reads ``result["modules"]`` (a list of per-module dicts) and
    ``result["period"]``. Prints a "No usage data" line and returns when the
    module list is missing or empty. The footer uses ``total_calls`` /
    ``total_errors`` from ``result`` when present and otherwise sums the
    per-module counts.
    """
    modules = result.get("modules", [])
    period = result.get("period", "?")

    if not modules:
        click.echo(f"No usage data for period {period}.")
        return

    click.echo(f"Usage Summary (last {period})\n")
    click.echo(f" {'Module':<24} {'Calls':>8} {'Errors':>8} {'Avg Latency':>12} {'Trend':<10}")
    click.echo(f" {'-' * 24} {'-' * 8} {'-' * 8} {'-' * 12} {'-' * 10}")
    for m in modules:
        avg = f"{m.get('avg_latency_ms', 0):.0f}ms"
        # Coerce to str: a None trend would raise TypeError under a width spec.
        trend = str(m.get("trend") or "")
        # Trend is a text column, so it is left-aligned like its header.
        click.echo(
            f" {m.get('module_id', '?'):<24} {m.get('call_count', 0):>8,} "
            f"{m.get('error_count', 0):>8,} {avg:>12} {trend:<10}"
        )

    total_calls = result.get("total_calls", sum(m.get("call_count", 0) for m in modules))
    total_errors = result.get("total_errors", sum(m.get("error_count", 0) for m in modules))
    click.echo(f"\nTotal: {total_calls:,} calls | {total_errors:,} errors")

141 

142 

143# --------------------------------------------------------------------------- 

144# Per-subcommand registrars (FE-13 §4.9) 

145# --------------------------------------------------------------------------- 

146 

147 

def register_health_command(apcli_group: click.Group, executor: Any) -> None:
    """Register the ``health`` subcommand on the given group.

    With a ``module_id`` argument the command calls ``system.health.module``;
    without one it calls ``system.health.summary``. Output is JSON when the
    resolved format is ``json`` or stdout is not a TTY, otherwise a text
    rendering. Any ``Exception`` is reported on stderr and the process exits
    with status 1.
    """

    @apcli_group.command("health")
    @click.argument("module_id", required=False)
    @click.option("--threshold", type=float, default=0.01, help="Error rate threshold (default: 0.01).")
    @click.option("--all", "include_all", is_flag=True, default=False, help="Include healthy modules.")
    @click.option("--errors", type=int, default=10, help="Max recent errors (module detail only).")
    @click.option("--format", "output_format", type=click.Choice(["table", "json"]), default=None)
    def health_cmd(
        module_id: str | None,
        threshold: float,
        include_all: bool,
        errors: int,
        output_format: str | None,
    ) -> None:
        """Show module health status. Optionally specify a module ID for details."""
        fmt = resolve_format(output_format)
        try:
            # Pick the system module, its inputs and the matching TTY renderer
            # up front so the call and output path below is written once.
            if module_id:
                target = "system.health.module"
                payload = {"module_id": module_id, "error_limit": errors}
                render_tty = _format_health_module_tty
            else:
                target = "system.health.summary"
                payload = {"error_rate_threshold": threshold, "include_healthy": include_all}
                render_tty = _format_health_summary_tty

            result = _call_system_module(executor, target, payload)
            if fmt == "json" or not sys.stdout.isatty():
                click.echo(json.dumps(result, indent=2, default=str))
            else:
                render_tty(result)
        except Exception as e:
            click.echo(f"Error: {e}", err=True)
            sys.exit(1)

    # Reference the decorated function so linters do not flag it as unused.
    _ = health_cmd

192 

193 

def register_usage_command(apcli_group: click.Group, executor: Any) -> None:
    """Register the ``usage`` subcommand on the given group.

    With a ``module_id`` argument the command calls ``system.usage.module``;
    without one it calls ``system.usage.summary``. Output is JSON when the
    resolved format is ``json`` or stdout is not a TTY. Otherwise module
    detail goes through ``format_exec_result`` and the summary through the
    usage table renderer. Any ``Exception`` is reported on stderr and the
    process exits with status 1.
    """

    @apcli_group.command("usage")
    @click.argument("module_id", required=False)
    @click.option("--period", default="24h", help="Time window: 1h, 24h, 7d, 30d (default: 24h).")
    @click.option("--format", "output_format", type=click.Choice(["table", "json"]), default=None)
    def usage_cmd(module_id: str | None, period: str, output_format: str | None) -> None:
        """Show module usage statistics. Optionally specify a module ID for details."""
        fmt = resolve_format(output_format)
        try:
            if module_id:
                target = "system.usage.module"
                payload = {"module_id": module_id, "period": period}
            else:
                target = "system.usage.summary"
                payload = {"period": period}
            result = _call_system_module(executor, target, payload)

            wants_json = fmt == "json" or not sys.stdout.isatty()
            if wants_json:
                click.echo(json.dumps(result, indent=2, default=str))
            elif module_id:
                format_exec_result(result, fmt)
            else:
                _format_usage_summary_tty(result)
        except Exception as e:
            click.echo(f"Error: {e}", err=True)
            sys.exit(1)

    # Reference the decorated function so linters do not flag it as unused.
    _ = usage_cmd

228 

229 

def register_enable_command(apcli_group: click.Group, executor: Any) -> None:
    """Register the ``enable`` subcommand on the given group.

    The command runs the approval gate for ``system.control.toggle_feature``,
    calls that module with ``enabled=True`` and, when an audit logger is
    configured on ``apcore_cli.cli``, records the outcome with the elapsed
    time in milliseconds. The ``reason`` is sent to the module but is not part
    of the inputs passed to the audit logger.
    """

    @apcli_group.command("enable")
    @click.argument("module_id")
    @click.option("--reason", required=True, help="Reason for enabling (recorded in module audit trail).")
    @click.option("--yes", "-y", is_flag=True, default=False, help="Skip approval prompt.")
    @click.option("--format", "output_format", type=click.Choice(["table", "json"]), default=None)
    def enable_cmd(module_id: str, reason: str, yes: bool, output_format: str | None) -> None:
        """Enable a disabled module at runtime."""
        import time

        target = "system.control.toggle_feature"

        _check_system_approval(executor, target, yes)
        fmt = resolve_format(output_format)
        audit_start = time.monotonic()

        def _record_audit(status: str, exit_code: int) -> None:
            # Write one audit record for this invocation, if auditing is on.
            duration_ms = int((time.monotonic() - audit_start) * 1000)
            audit_logger = _cli_module._audit_logger
            if audit_logger is not None:
                audit_logger.log_execution(
                    target,
                    {"module_id": module_id, "enabled": True},
                    status,
                    exit_code,
                    duration_ms,
                )

        # Only the executor call decides success vs. error for the audit trail.
        try:
            result = _call_system_module(
                executor,
                target,
                {"module_id": module_id, "enabled": True, "reason": reason},
            )
        except Exception as e:
            _record_audit("error", 1)
            click.echo(f"Error: {e}", err=True)
            sys.exit(1)

        # The toggle has succeeded. A failure while auditing or rendering is
        # still reported with exit status 1, but must not add a contradictory
        # "error" audit record for an operation that went through.
        try:
            _record_audit("success", 0)
            if fmt == "json" or not sys.stdout.isatty():
                click.echo(json.dumps(result, indent=2, default=str))
            else:
                click.echo(f"Module '{module_id}' enabled.\n Reason: {reason}")
        except Exception as e:
            click.echo(f"Error: {e}", err=True)
            sys.exit(1)

    # Reference the decorated function so linters do not flag it as unused.
    _ = enable_cmd

280 

281 

def register_disable_command(apcli_group: click.Group, executor: Any) -> None:
    """Register the ``disable`` subcommand on the given group.

    The command runs the approval gate for ``system.control.toggle_feature``,
    calls that module with ``enabled=False`` and, when an audit logger is
    configured on ``apcore_cli.cli``, records the outcome with the elapsed
    time in milliseconds. The ``reason`` is sent to the module but is not part
    of the inputs passed to the audit logger.
    """

    @apcli_group.command("disable")
    @click.argument("module_id")
    @click.option("--reason", required=True, help="Reason for disabling (recorded in module audit trail).")
    @click.option("--yes", "-y", is_flag=True, default=False, help="Skip approval prompt.")
    @click.option("--format", "output_format", type=click.Choice(["table", "json"]), default=None)
    def disable_cmd(module_id: str, reason: str, yes: bool, output_format: str | None) -> None:
        """Disable a module at runtime (calls are rejected until re-enabled)."""
        import time

        target = "system.control.toggle_feature"

        _check_system_approval(executor, target, yes)
        fmt = resolve_format(output_format)
        audit_start = time.monotonic()

        def _record_audit(status: str, exit_code: int) -> None:
            # Write one audit record for this invocation, if auditing is on.
            duration_ms = int((time.monotonic() - audit_start) * 1000)
            audit_logger = _cli_module._audit_logger
            if audit_logger is not None:
                audit_logger.log_execution(
                    target,
                    {"module_id": module_id, "enabled": False},
                    status,
                    exit_code,
                    duration_ms,
                )

        # Only the executor call decides success vs. error for the audit trail.
        try:
            result = _call_system_module(
                executor,
                target,
                {"module_id": module_id, "enabled": False, "reason": reason},
            )
        except Exception as e:
            _record_audit("error", 1)
            click.echo(f"Error: {e}", err=True)
            sys.exit(1)

        # The toggle has succeeded. A failure while auditing or rendering is
        # still reported with exit status 1, but must not add a contradictory
        # "error" audit record for an operation that went through.
        try:
            _record_audit("success", 0)
            if fmt == "json" or not sys.stdout.isatty():
                click.echo(json.dumps(result, indent=2, default=str))
            else:
                click.echo(f"Module '{module_id}' disabled.\n Reason: {reason}")
        except Exception as e:
            click.echo(f"Error: {e}", err=True)
            sys.exit(1)

    # Reference the decorated function so linters do not flag it as unused.
    _ = disable_cmd

332 

333 

def register_reload_command(apcli_group: click.Group, executor: Any) -> None:
    """Register the ``reload`` subcommand on the given group.

    The command runs the approval gate for ``system.control.reload_module``,
    calls that module and, when an audit logger is configured on
    ``apcore_cli.cli``, records the outcome with the elapsed time in
    milliseconds. The ``reason`` is sent to the module but is not part of the
    inputs passed to the audit logger.
    """

    @apcli_group.command("reload")
    @click.argument("module_id")
    @click.option("--reason", required=True, help="Reason for reload (recorded in module audit trail).")
    @click.option("--yes", "-y", is_flag=True, default=False, help="Skip approval prompt.")
    @click.option("--format", "output_format", type=click.Choice(["table", "json"]), default=None)
    def reload_cmd(module_id: str, reason: str, yes: bool, output_format: str | None) -> None:
        """Hot-reload a module from disk."""
        import time

        target = "system.control.reload_module"

        _check_system_approval(executor, target, yes)
        fmt = resolve_format(output_format)
        audit_start = time.monotonic()

        def _record_audit(status: str, exit_code: int) -> None:
            # Write one audit record for this invocation, if auditing is on.
            duration_ms = int((time.monotonic() - audit_start) * 1000)
            audit_logger = _cli_module._audit_logger
            if audit_logger is not None:
                audit_logger.log_execution(target, {"module_id": module_id}, status, exit_code, duration_ms)

        # Only the executor call decides success vs. error for the audit trail.
        try:
            result = _call_system_module(
                executor,
                target,
                {"module_id": module_id, "reason": reason},
            )
        except Exception as e:
            _record_audit("error", 1)
            click.echo(f"Error: {e}", err=True)
            sys.exit(1)

        # The reload has succeeded. A failure while auditing or rendering
        # (e.g. a result without ``.get``) is still reported with exit status
        # 1, but must not add a contradictory "error" audit record.
        try:
            _record_audit("success", 0)
            if fmt == "json" or not sys.stdout.isatty():
                click.echo(json.dumps(result, indent=2, default=str))
            else:
                prev = result.get("previous_version", "?")
                new = result.get("new_version", "?")
                dur = result.get("reload_duration_ms", "?")
                click.echo(f"Module '{module_id}' reloaded.")
                click.echo(f" Version: {prev} -> {new}")
                click.echo(f" Duration: {dur}ms")
        except Exception as e:
            click.echo(f"Error: {e}", err=True)
            sys.exit(1)

    # Reference the decorated function so linters do not flag it as unused.
    _ = reload_cmd

377 

378 

def register_config_command(apcli_group: click.Group, executor: Any) -> None:
    """Register the ``config`` subgroup (``get`` / ``set``) on the given group.

    ``config get KEY`` reads a value through ``apcore.Config().get`` and
    prints it. ``config set KEY VALUE`` runs the approval gate for
    ``system.control.update_config``, calls that module and, when an audit
    logger is configured on ``apcore_cli.cli``, records the outcome. Only the
    key (not the value or reason) is passed to the audit logger.
    """

    @apcli_group.group("config")
    def config_group() -> None:
        """Read or update runtime configuration."""

    @config_group.command("get")
    @click.argument("key")
    def config_get_cmd(key: str) -> None:
        """Read a configuration value by dot-path key."""
        try:
            # Imported lazily, inside the try, so an import failure is
            # reported like any other error of this command.
            from apcore import Config

            value = Config().get(key)
            click.echo(f"{key} = {value!r}")
        except Exception as e:
            click.echo(f"Error: {e}", err=True)
            sys.exit(1)

    @config_group.command("set")
    @click.argument("key")
    @click.argument("value")
    @click.option("--reason", required=True, help="Reason for config change (recorded in module audit trail).")
    @click.option("-y", "--yes", "auto_approve", is_flag=True, default=False, help="Auto-approve.")
    @click.option("--format", "output_format", type=click.Choice(["table", "json"]), default=None)
    def config_set_cmd(key: str, value: str, reason: str, auto_approve: bool, output_format: str | None) -> None:
        """Update a runtime configuration value (requires approval)."""
        import time

        target = "system.control.update_config"

        _check_system_approval(executor, target, auto_approve)
        fmt = resolve_format(output_format)

        # VALUE is interpreted as JSON when it parses (numbers, booleans,
        # objects, ...) and kept as the raw string otherwise.
        # json.JSONDecodeError is a ValueError subclass, so one clause suffices.
        try:
            parsed_value = json.loads(value)
        except ValueError:
            parsed_value = value

        audit_start = time.monotonic()

        def _record_audit(status: str, exit_code: int) -> None:
            # Write one audit record for this invocation, if auditing is on.
            duration_ms = int((time.monotonic() - audit_start) * 1000)
            audit_logger = _cli_module._audit_logger
            if audit_logger is not None:
                audit_logger.log_execution(target, {"key": key}, status, exit_code, duration_ms)

        # Only the executor call decides success vs. error for the audit trail.
        try:
            result = _call_system_module(
                executor,
                target,
                {"key": key, "value": parsed_value, "reason": reason},
            )
        except Exception as e:
            _record_audit("error", 1)
            click.echo(f"Error: {e}", err=True)
            sys.exit(1)

        # The update has succeeded. A failure while auditing or rendering
        # (e.g. a result without ``.get``) is still reported with exit status
        # 1, but must not add a contradictory "error" audit record.
        try:
            _record_audit("success", 0)
            if fmt == "json" or not sys.stdout.isatty():
                click.echo(json.dumps(result, indent=2, default=str))
            else:
                old = result.get("old_value", "?")
                new = result.get("new_value", "?")
                click.echo(f"Config updated: {key}")
                click.echo(f" {old!r} -> {new!r}")
                click.echo(f" Reason: {reason}")
        except Exception as e:
            click.echo(f"Error: {e}", err=True)
            sys.exit(1)

    # Reference the decorated functions so linters do not flag them as unused.
    _ = config_get_cmd
    _ = config_set_cmd

445 

446 

447# --------------------------------------------------------------------------- 

448# Back-compat batched registrar (pre-v0.7 call sites) 

449# --------------------------------------------------------------------------- 

450 

451 

def register_system_commands(cli: click.Group, executor: Any) -> None:
    """Legacy wrapper that registers all six system subcommands on ``cli``.

    The executor is probed with :func:`_system_modules_available` first. When
    the probe reports the ``system.*`` modules as unavailable, nothing is
    registered and only a debug message is logged here, matching the
    pre-FE-13 behavior. Post-FE-13 canonical wiring attaches each subcommand
    individually to the ``apcli`` group via the :mod:`apcore_cli.factory`
    dispatcher.
    """
    if not _system_modules_available(executor):
        logger.debug("System modules not available; skipping system command registration.")
        return

    # Same registration order as before: health, usage, enable, disable,
    # reload, config.
    registrars = (
        register_health_command,
        register_usage_command,
        register_enable_command,
        register_disable_command,
        register_reload_command,
        register_config_command,
    )
    for attach in registrars:
        attach(cli, executor)