Coverage for src / apcore_cli / system_cmd.py: 89%
276 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-04-26 10:23 +0800
« prev ^ index » next coverage.py v7.13.0, created at 2026-04-26 10:23 +0800
1"""System management commands — health, usage, enable, disable, reload, config (FE-11, FE-13).
3FE-13 §4.9 split the batched ``register_system_commands`` into six
4per-subcommand registrars (``register_health_command`` … ``register_config_command``).
5The per-command shape is required for apcli include/exclude filtering —
6the factory dispatcher decides per-entry whether to attach to the
7``apcli`` group.
9The legacy :func:`register_system_commands` remains as a thin wrapper for
10pre-v0.7 call sites; it probes the executor for ``system.*`` modules and
11skips attachment when they are absent, preserving the old "no-op if system
12modules unavailable" behavior.
13"""
15from __future__ import annotations
17import json
18import logging
19import sys
20from typing import Any
22import click
24import apcore_cli.cli as _cli_module
25from apcore_cli.approval import check_approval
26from apcore_cli.output import format_exec_result, resolve_format
28logger = logging.getLogger("apcore_cli.system_cmd")
def _call_system_module(executor: Any, module_id: str, inputs: dict[str, Any]) -> Any:
    """Invoke ``module_id`` through the executor and hand back whatever it returns.

    Args:
        executor: Object exposing a ``call(module_id, inputs)`` method.
        module_id: Identifier of the system module to run.
        inputs: Input payload passed to the module unchanged.

    Returns:
        The value returned by ``executor.call``; exceptions propagate unchanged.
    """
    invoke = executor.call
    return invoke(module_id, inputs)
def _check_system_approval(executor: Any, module_id: str, auto_approve: bool) -> None:
    """Run the approval gate for a system module when its definition is resolvable.

    The module definition is looked up via :func:`_try_get_module_def`. When
    no definition is found the approval step is skipped entirely; otherwise
    the definition and ``auto_approve`` flag are passed to ``check_approval``.

    Args:
        executor: Executor used for the registry lookup.
        module_id: Identifier of the system module about to be invoked.
        auto_approve: Forwarded to ``check_approval`` as its second argument.
    """
    definition = _try_get_module_def(executor, module_id)
    if definition is None:
        return
    # Only the registry lookup is guarded. check_approval's own exits
    # (sys.exit(46), ApprovalDeniedError, etc.) must reach the caller.
    check_approval(definition, auto_approve)
def _try_get_module_def(executor: Any, module_id: str) -> Any:
    """Look up a module definition through ``executor._registry``, best effort.

    ``_registry`` is a private apcore attribute; keeping the access in this
    one function limits how much code depends on it.

    Args:
        executor: Object that may carry a ``_registry`` attribute.
        module_id: Identifier passed to ``_registry.get_definition``.

    Returns:
        Whatever ``get_definition`` returns, or ``None`` when the executor has
        no ``_registry`` attribute or the lookup raises any exception (the
        failure is logged as a warning).
    """
    try:
        if not hasattr(executor, "_registry"):
            return None
        registry = executor._registry
        return registry.get_definition(module_id)
    except Exception as exc:
        logger.warning("executor._registry.get_definition(%r) failed: %s", module_id, exc)
        return None
def _system_modules_available(executor: Any) -> bool:
    """Report whether ``system.health.summary`` looks usable, without executing it.

    When the executor exposes ``validate`` it is called with an empty input
    dict and a clean return counts as available. Otherwise availability is
    decided by whether :func:`_try_get_module_def` finds a definition.

    Returns:
        ``True`` when the probe succeeds, ``False`` when it raises (the
        failure is logged as a warning) or no definition is found.
    """
    probe_id = "system.health.summary"
    try:
        if not hasattr(executor, "validate"):
            return _try_get_module_def(executor, probe_id) is not None
        executor.validate(probe_id, {})
    except Exception as exc:
        logger.warning("System module probe failed (system commands will be unavailable): %s", exc)
        return False
    return True
def _format_health_summary_tty(result: dict[str, Any]) -> None:
    """Render the health summary as a plain-text table via ``click.echo``.

    Reads ``result["summary"]`` (aggregate counts) and ``result["modules"]``
    (one dict per module). Missing or ``None`` fields in a module row are
    shown as placeholders instead of raising, matching how
    ``_format_health_module_tty`` treats absent fields.

    Args:
        result: Payload returned by the ``system.health.summary`` module.
    """
    # ``or`` also covers keys that are present but explicitly None.
    summary = result.get("summary") or {}
    modules = result.get("modules") or []

    if not modules:
        click.echo("No modules found.")
        return

    click.echo(f"Health Overview ({summary.get('total_modules', len(modules))} modules)\n")
    click.echo(f" {'Module':<28} {'Status':<12} {'Error Rate':<12} Top Error")
    click.echo(f" {'-' * 28} {'-' * 12} {'-' * 12} {'-' * 20}")
    for m in modules:
        top = m.get("top_error")
        top_str = f"{top.get('code', '?')} ({top.get('count', '?')})" if top else "—"
        # A missing/None error rate is displayed as 0.0% rather than crashing.
        rate = f"{(m.get('error_rate') or 0) * 100:.1f}%"
        # ``!s`` keeps the width spec valid even if a value is None.
        click.echo(f" {m.get('module_id', '?')!s:<28} {m.get('status', 'unknown')!s:<12} {rate:<12} {top_str}")

    parts = [f"{summary[key]} {key}" for key in ("healthy", "degraded", "error") if summary.get(key)]
    click.echo(f"\nSummary: {', '.join(parts) or 'no data'}")
def _format_health_module_tty(result: dict[str, Any]) -> None:
    """Print the health detail view for one module via ``click.echo``.

    Outputs the module id and status, a call/error line, a latency line and,
    when ``recent_errors`` is non-empty, one line per recent error. Absent
    fields fall back to ``'?'``, ``'unknown'`` or ``0`` as appropriate.

    Args:
        result: Payload returned by the ``system.health.module`` module.
    """
    click.echo(f"Module: {result.get('module_id', '?')}")
    click.echo(f"Status: {result.get('status', 'unknown')}")

    call_total = result.get("total_calls", 0)
    error_total = result.get("error_count", 0)
    error_rate = result.get("error_rate", 0)
    avg_ms = result.get("avg_latency_ms", 0)
    p99_ms = result.get("p99_latency_ms", 0)
    calls_line = f"Calls: {call_total:,} total | {error_total:,} errors | {error_rate * 100:.1f}% error rate"
    click.echo(calls_line)
    click.echo(f"Latency: {avg_ms:.0f}ms avg | {p99_ms:.0f}ms p99")

    recent_errors = result.get("recent_errors", [])
    if not recent_errors:
        return

    click.echo(f"\nRecent Errors (top {len(recent_errors)}):")
    for entry in recent_errors:
        occurrences = entry.get("count", "?")
        last_seen = entry.get("last_occurred", "?")
        click.echo(f" {entry.get('code', '?'):<24} x{occurrences} (last: {last_seen})")
def _format_usage_summary_tty(result: dict[str, Any]) -> None:
    """Render the usage summary as a plain-text table via ``click.echo``.

    Reads ``result["modules"]`` (one dict per module) and ``result["period"]``.
    Missing or ``None`` fields in a row are shown as placeholders (``'?'``,
    ``0`` or an empty trend) instead of raising. Totals come from
    ``total_calls`` / ``total_errors`` when provided, and are otherwise summed
    from the rows.

    Args:
        result: Payload returned by the ``system.usage.summary`` module.
    """
    modules = result.get("modules") or []
    period = result.get("period", "?")

    if not modules:
        click.echo(f"No usage data for period {period}.")
        return

    click.echo(f"Usage Summary (last {period})\n")
    click.echo(f" {'Module':<24} {'Calls':>8} {'Errors':>8} {'Avg Latency':>12} {'Trend':<10}")
    click.echo(f" {'-' * 24} {'-' * 8} {'-' * 8} {'-' * 12} {'-' * 10}")
    for m in modules:
        # ``or 0`` treats an explicit None the same as a missing key.
        avg = f"{(m.get('avg_latency_ms') or 0):.0f}ms"
        calls = m.get("call_count") or 0
        errors = m.get("error_count") or 0
        trend = m.get("trend")
        if trend is None:
            trend = ""
        click.echo(
            f" {m.get('module_id', '?')!s:<24} {calls:>8,} "
            f"{errors:>8,} {avg:>12} {trend!s:>10}"
        )

    # Compute the fallback sums only when the payload does not supply totals.
    total_calls = result.get("total_calls")
    if total_calls is None:
        total_calls = sum(m.get("call_count") or 0 for m in modules)
    total_errors = result.get("total_errors")
    if total_errors is None:
        total_errors = sum(m.get("error_count") or 0 for m in modules)
    click.echo(f"\nTotal: {total_calls:,} calls | {total_errors:,} errors")
143# ---------------------------------------------------------------------------
144# Per-subcommand registrars (FE-13 §4.9)
145# ---------------------------------------------------------------------------
def register_health_command(apcli_group: click.Group, executor: Any) -> None:
    """Attach the ``health`` subcommand to ``apcli_group``.

    With a ``module_id`` argument the command calls ``system.health.module``;
    without one it calls ``system.health.summary``. Output is JSON when the
    resolved format is ``json`` or stdout is not a TTY, otherwise a text view.
    Any exception is reported on stderr and the process exits with status 1.

    Args:
        apcli_group: Click group that receives the command.
        executor: Executor used to invoke the system modules.
    """

    @apcli_group.command("health")
    @click.argument("module_id", required=False)
    @click.option("--threshold", type=float, default=0.01, help="Error rate threshold (default: 0.01).")
    @click.option("--all", "include_all", is_flag=True, default=False, help="Include healthy modules.")
    @click.option("--errors", type=int, default=10, help="Max recent errors (module detail only).")
    @click.option("--format", "output_format", type=click.Choice(["table", "json"]), default=None)
    def health_cmd(
        module_id: str | None,
        threshold: float,
        include_all: bool,
        errors: int,
        output_format: str | None,
    ) -> None:
        """Show module health status. Optionally specify a module ID for details."""
        fmt = resolve_format(output_format)
        try:
            # Pick the target module, its inputs and the matching TTY renderer.
            if module_id:
                target = "system.health.module"
                payload: dict[str, Any] = {"module_id": module_id, "error_limit": errors}
                render_tty = _format_health_module_tty
            else:
                target = "system.health.summary"
                payload = {"error_rate_threshold": threshold, "include_healthy": include_all}
                render_tty = _format_health_summary_tty

            result = _call_system_module(executor, target, payload)
            if fmt != "json" and sys.stdout.isatty():
                render_tty(result)
            else:
                click.echo(json.dumps(result, indent=2, default=str))
        except Exception as e:
            click.echo(f"Error: {e}", err=True)
            sys.exit(1)

    # Reference the decorated function so it is not flagged as unused.
    _ = health_cmd
def register_usage_command(apcli_group: click.Group, executor: Any) -> None:
    """Attach the ``usage`` subcommand to ``apcli_group``.

    With a ``module_id`` argument the command calls ``system.usage.module``;
    without one it calls ``system.usage.summary``. Output is JSON when the
    resolved format is ``json`` or stdout is not a TTY. Otherwise the
    per-module result goes through ``format_exec_result`` and the summary
    through the usage table renderer. Any exception is reported on stderr and
    the process exits with status 1.

    Args:
        apcli_group: Click group that receives the command.
        executor: Executor used to invoke the system modules.
    """

    @apcli_group.command("usage")
    @click.argument("module_id", required=False)
    @click.option("--period", default="24h", help="Time window: 1h, 24h, 7d, 30d (default: 24h).")
    @click.option("--format", "output_format", type=click.Choice(["table", "json"]), default=None)
    def usage_cmd(module_id: str | None, period: str, output_format: str | None) -> None:
        """Show module usage statistics. Optionally specify a module ID for details."""
        fmt = resolve_format(output_format)
        try:
            if module_id:
                target = "system.usage.module"
                payload = {"module_id": module_id, "period": period}
            else:
                target = "system.usage.summary"
                payload = {"period": period}
            result = _call_system_module(executor, target, payload)

            if fmt == "json" or not sys.stdout.isatty():
                click.echo(json.dumps(result, indent=2, default=str))
                return
            if module_id:
                format_exec_result(result, fmt)
                return
            _format_usage_summary_tty(result)
        except Exception as e:
            click.echo(f"Error: {e}", err=True)
            sys.exit(1)

    # Reference the decorated function so it is not flagged as unused.
    _ = usage_cmd
def register_enable_command(apcli_group: click.Group, executor: Any) -> None:
    """Attach the ``enable`` subcommand to ``apcli_group``.

    The command runs the approval check, calls
    ``system.control.toggle_feature`` with ``enabled=True``, records one
    audit entry (when an audit logger is configured on ``apcore_cli.cli``)
    and prints the result. An "error" audit entry is written only when the
    module call itself fails; a failure while reporting a successful call
    still exits with status 1 but does not add a contradictory second entry.

    Args:
        apcli_group: Click group that receives the command.
        executor: Executor used to invoke the system module.
    """

    @apcli_group.command("enable")
    @click.argument("module_id")
    @click.option("--reason", required=True, help="Reason for enabling (recorded in module audit trail).")
    @click.option("--yes", "-y", is_flag=True, default=False, help="Skip approval prompt.")
    @click.option("--format", "output_format", type=click.Choice(["table", "json"]), default=None)
    def enable_cmd(module_id: str, reason: str, yes: bool, output_format: str | None) -> None:
        """Enable a disabled module at runtime."""
        import time

        target = "system.control.toggle_feature"

        def _audit(status: str, exit_code: int, duration_ms: int) -> None:
            # The logger is read at call time so late configuration is honoured.
            _al = _cli_module._audit_logger
            if _al is not None:
                _al.log_execution(target, {"module_id": module_id, "enabled": True}, status, exit_code, duration_ms)

        _check_system_approval(executor, target, yes)
        fmt = resolve_format(output_format)
        audit_start = time.monotonic()
        try:
            result = _call_system_module(
                executor,
                target,
                {"module_id": module_id, "enabled": True, "reason": reason},
            )
        except Exception as e:
            _audit("error", 1, int((time.monotonic() - audit_start) * 1000))
            click.echo(f"Error: {e}", err=True)
            sys.exit(1)

        duration_ms = int((time.monotonic() - audit_start) * 1000)
        # The call succeeded: failures past this point are reporting problems
        # and must not be audited as a failed toggle.
        try:
            _audit("success", 0, duration_ms)
            if fmt == "json" or not sys.stdout.isatty():
                click.echo(json.dumps(result, indent=2, default=str))
            else:
                click.echo(f"Module '{module_id}' enabled.\n Reason: {reason}")
        except Exception as e:
            click.echo(f"Error: {e}", err=True)
            sys.exit(1)

    # Reference the decorated function so it is not flagged as unused.
    _ = enable_cmd
def register_disable_command(apcli_group: click.Group, executor: Any) -> None:
    """Attach the ``disable`` subcommand to ``apcli_group``.

    The command runs the approval check, calls
    ``system.control.toggle_feature`` with ``enabled=False``, records one
    audit entry (when an audit logger is configured on ``apcore_cli.cli``)
    and prints the result. An "error" audit entry is written only when the
    module call itself fails; a failure while reporting a successful call
    still exits with status 1 but does not add a contradictory second entry.

    Args:
        apcli_group: Click group that receives the command.
        executor: Executor used to invoke the system module.
    """

    @apcli_group.command("disable")
    @click.argument("module_id")
    @click.option("--reason", required=True, help="Reason for disabling (recorded in module audit trail).")
    @click.option("--yes", "-y", is_flag=True, default=False, help="Skip approval prompt.")
    @click.option("--format", "output_format", type=click.Choice(["table", "json"]), default=None)
    def disable_cmd(module_id: str, reason: str, yes: bool, output_format: str | None) -> None:
        """Disable a module at runtime (calls are rejected until re-enabled)."""
        import time

        target = "system.control.toggle_feature"

        def _audit(status: str, exit_code: int, duration_ms: int) -> None:
            # The logger is read at call time so late configuration is honoured.
            _al = _cli_module._audit_logger
            if _al is not None:
                _al.log_execution(target, {"module_id": module_id, "enabled": False}, status, exit_code, duration_ms)

        _check_system_approval(executor, target, yes)
        fmt = resolve_format(output_format)
        audit_start = time.monotonic()
        try:
            result = _call_system_module(
                executor,
                target,
                {"module_id": module_id, "enabled": False, "reason": reason},
            )
        except Exception as e:
            _audit("error", 1, int((time.monotonic() - audit_start) * 1000))
            click.echo(f"Error: {e}", err=True)
            sys.exit(1)

        duration_ms = int((time.monotonic() - audit_start) * 1000)
        # The call succeeded: failures past this point are reporting problems
        # and must not be audited as a failed toggle.
        try:
            _audit("success", 0, duration_ms)
            if fmt == "json" or not sys.stdout.isatty():
                click.echo(json.dumps(result, indent=2, default=str))
            else:
                click.echo(f"Module '{module_id}' disabled.\n Reason: {reason}")
        except Exception as e:
            click.echo(f"Error: {e}", err=True)
            sys.exit(1)

    # Reference the decorated function so it is not flagged as unused.
    _ = disable_cmd
def register_reload_command(apcli_group: click.Group, executor: Any) -> None:
    """Attach the ``reload`` subcommand to ``apcli_group``.

    The command runs the approval check, calls
    ``system.control.reload_module``, records one audit entry (when an audit
    logger is configured on ``apcore_cli.cli``) and prints the result. An
    "error" audit entry is written only when the module call itself fails.
    A failure while reporting a successful reload (for example a result
    without ``.get``) still exits with status 1 but no longer adds a second,
    contradictory audit entry.

    Args:
        apcli_group: Click group that receives the command.
        executor: Executor used to invoke the system module.
    """

    @apcli_group.command("reload")
    @click.argument("module_id")
    @click.option("--reason", required=True, help="Reason for reload (recorded in module audit trail).")
    @click.option("--yes", "-y", is_flag=True, default=False, help="Skip approval prompt.")
    @click.option("--format", "output_format", type=click.Choice(["table", "json"]), default=None)
    def reload_cmd(module_id: str, reason: str, yes: bool, output_format: str | None) -> None:
        """Hot-reload a module from disk."""
        import time

        target = "system.control.reload_module"

        def _audit(status: str, exit_code: int, duration_ms: int) -> None:
            # The logger is read at call time so late configuration is honoured.
            _al = _cli_module._audit_logger
            if _al is not None:
                _al.log_execution(target, {"module_id": module_id}, status, exit_code, duration_ms)

        _check_system_approval(executor, target, yes)
        fmt = resolve_format(output_format)
        audit_start = time.monotonic()
        try:
            result = _call_system_module(
                executor,
                target,
                {"module_id": module_id, "reason": reason},
            )
        except Exception as e:
            _audit("error", 1, int((time.monotonic() - audit_start) * 1000))
            click.echo(f"Error: {e}", err=True)
            sys.exit(1)

        duration_ms = int((time.monotonic() - audit_start) * 1000)
        # The call succeeded: failures past this point are reporting problems
        # and must not be audited as a failed reload.
        try:
            _audit("success", 0, duration_ms)
            if fmt == "json" or not sys.stdout.isatty():
                click.echo(json.dumps(result, indent=2, default=str))
            else:
                prev = result.get("previous_version", "?")
                new = result.get("new_version", "?")
                dur = result.get("reload_duration_ms", "?")
                click.echo(f"Module '{module_id}' reloaded.")
                click.echo(f" Version: {prev} -> {new}")
                click.echo(f" Duration: {dur}ms")
        except Exception as e:
            click.echo(f"Error: {e}", err=True)
            sys.exit(1)

    # Reference the decorated function so it is not flagged as unused.
    _ = reload_cmd
def register_config_command(apcli_group: click.Group, executor: Any) -> None:
    """Attach the ``config`` subgroup (``get`` / ``set``) to ``apcli_group``.

    ``config get`` reads a value through ``apcore.Config().get``. ``config
    set`` runs the approval check, parses the value as JSON when possible
    (falling back to the raw string), calls ``system.control.update_config``,
    records one audit entry (when an audit logger is configured on
    ``apcore_cli.cli``) and prints the result. An "error" audit entry is
    written only when the module call itself fails. A failure while reporting
    a successful update still exits with status 1 but no longer adds a
    second, contradictory audit entry.

    Args:
        apcli_group: Click group that receives the subgroup.
        executor: Executor used to invoke the system module.
    """

    @apcli_group.group("config")
    def config_group() -> None:
        """Read or update runtime configuration."""

    @config_group.command("get")
    @click.argument("key")
    def config_get_cmd(key: str) -> None:
        """Read a configuration value by dot-path key."""
        try:
            # Imported lazily so a missing/broken apcore surfaces as a CLI error.
            from apcore import Config

            value = Config().get(key)
            click.echo(f"{key} = {value!r}")
        except Exception as e:
            click.echo(f"Error: {e}", err=True)
            sys.exit(1)

    @config_group.command("set")
    @click.argument("key")
    @click.argument("value")
    @click.option("--reason", required=True, help="Reason for config change (recorded in module audit trail).")
    @click.option("-y", "--yes", "auto_approve", is_flag=True, default=False, help="Auto-approve.")
    @click.option("--format", "output_format", type=click.Choice(["table", "json"]), default=None)
    def config_set_cmd(key: str, value: str, reason: str, auto_approve: bool, output_format: str | None) -> None:
        """Update a runtime configuration value (requires approval)."""
        import time

        target = "system.control.update_config"

        def _audit(status: str, exit_code: int, duration_ms: int) -> None:
            # The logger is read at call time so late configuration is honoured.
            # Only the key is audited here, not the value.
            _al = _cli_module._audit_logger
            if _al is not None:
                _al.log_execution(target, {"key": key}, status, exit_code, duration_ms)

        _check_system_approval(executor, target, auto_approve)
        fmt = resolve_format(output_format)
        try:
            parsed_value = json.loads(value)
        except ValueError:
            # json.JSONDecodeError is a ValueError subclass; anything that is
            # not valid JSON is passed through as the raw string.
            parsed_value = value

        audit_start = time.monotonic()
        try:
            result = _call_system_module(
                executor,
                target,
                {"key": key, "value": parsed_value, "reason": reason},
            )
        except Exception as e:
            _audit("error", 1, int((time.monotonic() - audit_start) * 1000))
            click.echo(f"Error: {e}", err=True)
            sys.exit(1)

        duration_ms = int((time.monotonic() - audit_start) * 1000)
        # The call succeeded: failures past this point are reporting problems
        # and must not be audited as a failed update.
        try:
            _audit("success", 0, duration_ms)
            if fmt == "json" or not sys.stdout.isatty():
                click.echo(json.dumps(result, indent=2, default=str))
            else:
                old = result.get("old_value", "?")
                new = result.get("new_value", "?")
                click.echo(f"Config updated: {key}")
                click.echo(f" {old!r} -> {new!r}")
                click.echo(f" Reason: {reason}")
        except Exception as e:
            click.echo(f"Error: {e}", err=True)
            sys.exit(1)

    # Reference the decorated functions so they are not flagged as unused.
    _ = config_get_cmd
    _ = config_set_cmd
447# ---------------------------------------------------------------------------
448# Back-compat batched registrar (pre-v0.7 call sites)
449# ---------------------------------------------------------------------------
def register_system_commands(cli: click.Group, executor: Any) -> None:
    """Legacy wrapper that registers all six system subcommands on ``cli``.

    The executor is probed with :func:`_system_modules_available` first; when
    the probe fails nothing is registered and a debug message is logged,
    matching the pre-FE-13 behavior. Post-FE-13 canonical wiring attaches each
    subcommand individually to the ``apcli`` group via the
    :mod:`apcore_cli.factory` dispatcher.

    Args:
        cli: Click group that receives the subcommands.
        executor: Executor passed to the probe and to every registrar.
    """
    if not _system_modules_available(executor):
        logger.debug("System modules not available; skipping system command registration.")
        return

    # Registration order is preserved from the original batched registrar.
    registrars = (
        register_health_command,
        register_usage_command,
        register_enable_command,
        register_disable_command,
        register_reload_command,
        register_config_command,
    )
    for register in registrars:
        register(cli, executor)