Coverage for src / apcore_cli / approval.py: 65%
113 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-04-26 10:23 +0800
« prev ^ index » next coverage.py v7.13.0, created at 2026-04-26 10:23 +0800
1"""Approval Gate — TTY-aware HITL approval (FE-03, FE-11 §3.5)."""
3from __future__ import annotations
5import logging
6import os
7import sys
8import threading
9from typing import Any
11import click
13logger = logging.getLogger("apcore_cli.approval")
16class ApprovalTimeoutError(Exception):
17 """Raised when the approval prompt times out.
19 Carries ``code='APPROVAL_TIMEOUT'`` so the discovery.py exec_cmd
20 ``except Exception`` handler maps it to exit code 46 via _ERROR_CODE_MAP
21 while still flushing the audit log (D11-001).
22 """
24 code = "APPROVAL_TIMEOUT"
27class ApprovalDeniedError(Exception):
28 """Raised when an approval request is denied (rejected or pending).
30 Carries ``code='APPROVAL_DENIED'`` so the discovery.py exec_cmd
31 ``except Exception`` handler maps it to exit code 46 via _ERROR_CODE_MAP
32 while still flushing the audit log (D11-001). Previously the legacy
33 ``check_approval`` wrapper called ``sys.exit(46)`` directly — SystemExit
34 is a BaseException and bypassed the audit-flush handler.
35 """
37 code = "APPROVAL_DENIED"
40def _get_annotation(annotations: Any, key: str, default: Any = None) -> Any:
41 """Get an annotation value from either a dict or a ModuleAnnotations object."""
42 if isinstance(annotations, dict):
43 return annotations.get(key, default)
44 return getattr(annotations, key, default)
47# ---------------------------------------------------------------------------
48# CliApprovalHandler — implements apcore ApprovalHandler protocol (FE-11 §3.5)
49# ---------------------------------------------------------------------------
52class CliApprovalHandler:
53 """ApprovalHandler that prompts in TTY, auto-denies in non-TTY (unless bypassed).
55 Implements the apcore ApprovalHandler protocol:
56 - ``request_approval(request) -> ApprovalResult``
57 - ``check_approval(approval_id) -> ApprovalResult``
59 Pass to Executor via ``executor.set_approval_handler(handler)``.
60 """
62 def __init__(self, auto_approve: bool = False, timeout: int = 60) -> None:
63 self.auto_approve = auto_approve
64 self.timeout = max(1, min(timeout, 3600))
66 async def request_approval(self, request: Any) -> Any:
67 """Request approval for a module invocation.
69 Follows the apcore ApprovalRequest/ApprovalResult protocol.
70 Returns a dict with ``status``, ``approved_by``, ``reason`` fields
71 (duck-type compatible with ApprovalResult dataclass).
72 """
73 module_id = getattr(request, "module_id", "unknown")
75 # Bypass: auto_approve flag
76 if self.auto_approve:
77 logger.info("Approval bypassed via --yes flag for module '%s'.", module_id)
78 return {"status": "approved", "approved_by": "auto_approve"}
80 # Bypass: APCORE_CLI_AUTO_APPROVE env var
81 env_val = os.environ.get("APCORE_CLI_AUTO_APPROVE", "")
82 if env_val == "1":
83 logger.info("Approval bypassed via APCORE_CLI_AUTO_APPROVE for '%s'.", module_id)
84 return {"status": "approved", "approved_by": "env_auto_approve"}
85 if env_val != "" and env_val != "1":
86 logger.warning("APCORE_CLI_AUTO_APPROVE='%s', expected '1'. Ignoring.", env_val)
88 # Non-TTY: reject
89 if not sys.stdin.isatty():
90 return {
91 "status": "rejected",
92 "reason": "Non-interactive session without --yes",
93 }
95 # TTY prompt
96 annotations = getattr(request, "annotations", None) or {}
97 extra = getattr(annotations, "extra", {}) if not isinstance(annotations, dict) else annotations
98 message = extra.get("approval_message") or f"Module '{module_id}' requires approval to execute."
100 click.echo(message, err=True)
101 try:
102 approved = _tty_prompt(module_id, self.timeout)
103 except ApprovalTimeoutError:
104 return {"status": "timeout", "reason": f"Timed out after {self.timeout}s"}
106 if approved:
107 return {"status": "approved", "approved_by": "tty_user"}
108 return {"status": "rejected", "reason": "User rejected"}
110 async def check_approval(self, approval_id: str) -> Any:
111 """Check status of a previously pending approval (Phase B).
113 CLI does not support async approval polling; always returns rejected.
114 """
115 return {
116 "status": "rejected",
117 "reason": "CLI does not support async approval polling",
118 }
121# ---------------------------------------------------------------------------
122# Legacy check_approval() — backward-compatible wrapper
123# ---------------------------------------------------------------------------
126def check_approval(module_def: Any, auto_approve: bool, timeout: int = 60) -> None:
127 """Check if module requires approval and handle accordingly.
129 Returns None if approved (or approval not required).
130 Raises :class:`ApprovalDeniedError` on denial / non-TTY / user reject,
131 or :class:`ApprovalTimeoutError` on prompt timeout. Both errors carry a
132 ``code`` attribute that maps to exit code 46 via the CLI ``_ERROR_CODE_MAP``;
133 the discovery.py exec_cmd ``except Exception`` handler is responsible for
134 flushing the audit log and translating the error to ``sys.exit(46)``
135 (D11-001 — previously this function called ``sys.exit`` directly, which
136 raises BaseException and bypassed the audit-flush handler).
138 Args:
139 module_def: Module descriptor with annotations.
140 auto_approve: If True, bypass approval (--yes flag).
141 timeout: Approval prompt timeout in seconds.
142 """
143 annotations = getattr(module_def, "annotations", None)
144 if annotations is None or (not isinstance(annotations, dict) and not hasattr(annotations, "requires_approval")):
145 return
147 requires = _get_annotation(annotations, "requires_approval", False)
148 if requires is not True:
149 return
151 module_id = getattr(module_def, "module_id", getattr(module_def, "canonical_id", "unknown"))
153 # Bypass: --yes flag (highest priority)
154 if auto_approve is True:
155 logger.info("Approval bypassed via --yes flag for module '%s'.", module_id)
156 return
158 # Bypass: APCORE_CLI_AUTO_APPROVE env var
159 env_val = os.environ.get("APCORE_CLI_AUTO_APPROVE", "")
160 if env_val == "1":
161 logger.info("Approval bypassed via APCORE_CLI_AUTO_APPROVE for '%s'.", module_id)
162 return
163 if env_val != "" and env_val != "1":
164 logger.warning("APCORE_CLI_AUTO_APPROVE='%s', expected '1'. Ignoring.", env_val)
166 # Non-TTY check
167 if not sys.stdin.isatty():
168 raise ApprovalDeniedError(
169 f"Module '{module_id}' requires approval but no interactive "
170 "terminal is available. Use --yes or set APCORE_CLI_AUTO_APPROVE=1 "
171 "to bypass."
172 )
174 # TTY prompt
175 _prompt_with_timeout(module_def, timeout=timeout)
178# ---------------------------------------------------------------------------
179# Internal prompt implementation
180# ---------------------------------------------------------------------------
183def _prompt_with_timeout(module_def: Any, timeout: int = 60) -> None:
184 """Display approval prompt with timeout."""
185 timeout = max(1, min(timeout, 3600))
187 module_id = getattr(module_def, "module_id", getattr(module_def, "canonical_id", "unknown"))
188 annotations = getattr(module_def, "annotations", None) or {}
189 message = _get_annotation(annotations, "approval_message", None)
190 if message is None:
191 message = f"Module '{module_id}' requires approval to execute."
193 click.echo(message, err=True)
195 try:
196 approved = _tty_prompt(module_id, timeout)
197 except ApprovalTimeoutError:
198 raise ApprovalTimeoutError(f"Approval prompt timed out after {timeout} seconds.") from None
200 if approved:
201 logger.info("User approved execution of module '%s'.", module_id)
202 else:
203 raise ApprovalDeniedError("Approval denied.")
206def _tty_prompt(module_id: str, timeout: int) -> bool:
207 """Run the TTY prompt with timeout. Returns True if approved, raises on timeout."""
208 if sys.platform != "win32":
209 return _prompt_unix(module_id, timeout)
210 return _prompt_windows(module_id, timeout)
213def _prompt_unix(module_id: str, timeout: int) -> bool:
214 """Unix approval prompt using SIGALRM."""
215 import signal
217 def _timeout_handler(signum: int, frame: Any) -> None:
218 raise ApprovalTimeoutError()
220 old_handler = signal.signal(signal.SIGALRM, _timeout_handler)
221 signal.alarm(timeout)
223 try:
224 approved = click.confirm("Proceed?", default=False)
225 except ApprovalTimeoutError:
226 logger.warning("Approval timed out after %ds for module '%s'.", timeout, module_id)
227 raise
228 finally:
229 signal.alarm(0)
230 signal.signal(signal.SIGALRM, old_handler)
232 return approved
235def _prompt_windows(module_id: str, timeout: int) -> bool:
236 """Windows approval prompt using threading.Timer + ctypes."""
237 import ctypes
239 def _interrupt_main() -> None:
240 ctypes.pythonapi.PyThreadState_SetAsyncExc(
241 ctypes.c_ulong(threading.main_thread().ident),
242 ctypes.py_object(ApprovalTimeoutError),
243 )
245 timer = threading.Timer(timeout, _interrupt_main)
246 timer.start()
248 try:
249 approved = click.confirm("Proceed?", default=False)
250 timer.cancel()
251 return approved
252 except ApprovalTimeoutError:
253 timer.cancel()
254 logger.warning("Approval timed out after %ds for module '%s'.", timeout, module_id)
255 raise