Coverage for src / apcore_cli / approval.py: 65%

113 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-04-26 10:23 +0800

1"""Approval Gate — TTY-aware HITL approval (FE-03, FE-11 §3.5).""" 

2 

3from __future__ import annotations 

4 

5import logging 

6import os 

7import sys 

8import threading 

9from typing import Any 

10 

11import click 

12 

13logger = logging.getLogger("apcore_cli.approval") 

14 

15 

class ApprovalTimeoutError(Exception):
    """Signal that the interactive approval prompt expired without an answer.

    The class-level ``code`` is ``'APPROVAL_TIMEOUT'``. The discovery.py
    exec_cmd ``except Exception`` handler uses it to map the error to exit
    code 46 via _ERROR_CODE_MAP, and the audit log is still flushed on the
    way out (D11-001).
    """

    # Error code looked up in the CLI's _ERROR_CODE_MAP.
    code = "APPROVAL_TIMEOUT"

25 

26 

class ApprovalDeniedError(Exception):
    """Signal that an approval request was denied (rejected or pending).

    The class-level ``code`` is ``'APPROVAL_DENIED'``. The discovery.py
    exec_cmd ``except Exception`` handler uses it to map the error to exit
    code 46 via _ERROR_CODE_MAP, and the audit log is still flushed on the
    way out (D11-001).

    Historical note: the legacy ``check_approval`` wrapper used to call
    ``sys.exit(46)`` directly. SystemExit derives from BaseException, so it
    skipped the audit-flush handler; raising this exception avoids that.
    """

    # Error code looked up in the CLI's _ERROR_CODE_MAP.
    code = "APPROVAL_DENIED"

38 

39 

def _get_annotation(annotations: Any, key: str, default: Any = None) -> Any:
    """Look up ``key`` on an annotations container, whatever its shape.

    Args:
        annotations: Either a plain ``dict`` or an attribute-style object
            (e.g. a ModuleAnnotations instance).
        key: Annotation name to read.
        default: Value returned when the key/attribute is absent.

    Returns:
        The stored value, or ``default`` when it is missing.
    """
    # Attribute-style containers are read with getattr; dicts by key.
    if not isinstance(annotations, dict):
        return getattr(annotations, key, default)
    return annotations.get(key, default)

45 

46 

47# --------------------------------------------------------------------------- 

48# CliApprovalHandler — implements apcore ApprovalHandler protocol (FE-11 §3.5) 

49# --------------------------------------------------------------------------- 

50 

51 

class CliApprovalHandler:
    """ApprovalHandler that prompts in TTY, auto-denies in non-TTY (unless bypassed).

    Implements the apcore ApprovalHandler protocol:
    - ``request_approval(request) -> ApprovalResult``
    - ``check_approval(approval_id) -> ApprovalResult``

    Pass to Executor via ``executor.set_approval_handler(handler)``.
    """

    def __init__(self, auto_approve: bool = False, timeout: int = 60) -> None:
        """Store the bypass flag and the prompt timeout.

        Args:
            auto_approve: When true, every request is approved without a prompt.
            timeout: Prompt timeout in seconds; clamped to the range 1..3600.
        """
        self.auto_approve = auto_approve
        self.timeout = max(1, min(timeout, 3600))

    @staticmethod
    def _approval_message(request: Any, module_id: str) -> str:
        """Return the prompt text for ``request``.

        A custom ``approval_message`` is read from the annotations dict
        itself, or from the ``extra`` attribute of an annotations object.
        When there is no usable custom message (missing, empty, or ``extra``
        is ``None`` / not mapping-like) the generic message is returned.
        """
        annotations = getattr(request, "annotations", None) or {}
        if isinstance(annotations, dict):
            extra = annotations
        else:
            extra = getattr(annotations, "extra", None)

        # Duck-type the lookup so ``extra=None`` (or any object without
        # ``.get``) falls back to the default instead of raising.
        lookup = getattr(extra, "get", None)
        custom = lookup("approval_message") if callable(lookup) else None
        return custom or f"Module '{module_id}' requires approval to execute."

    async def request_approval(self, request: Any) -> Any:
        """Request approval for a module invocation.

        Follows the apcore ApprovalRequest/ApprovalResult protocol.
        Returns a dict with ``status`` plus ``approved_by`` or ``reason``
        (duck-type compatible with ApprovalResult dataclass).

        Decision order:
        1. ``auto_approve`` set on the handler -> approved.
        2. ``APCORE_CLI_AUTO_APPROVE=1`` in the environment -> approved.
        3. stdin is not a TTY -> rejected.
        4. Otherwise prompt the user; the result is approved, rejected, or
           timeout.
        """
        module_id = getattr(request, "module_id", "unknown")

        # Bypass: auto_approve flag
        if self.auto_approve:
            logger.info("Approval bypassed via --yes flag for module '%s'.", module_id)
            return {"status": "approved", "approved_by": "auto_approve"}

        # Bypass: APCORE_CLI_AUTO_APPROVE env var
        env_val = os.environ.get("APCORE_CLI_AUTO_APPROVE", "")
        if env_val == "1":
            logger.info("Approval bypassed via APCORE_CLI_AUTO_APPROVE for '%s'.", module_id)
            return {"status": "approved", "approved_by": "env_auto_approve"}
        if env_val:
            # Any other non-empty value is ignored, but flagged for the operator.
            logger.warning("APCORE_CLI_AUTO_APPROVE='%s', expected '1'. Ignoring.", env_val)

        # Non-TTY: reject
        if not sys.stdin.isatty():
            return {
                "status": "rejected",
                "reason": "Non-interactive session without --yes",
            }

        # TTY prompt
        click.echo(self._approval_message(request, module_id), err=True)
        try:
            approved = _tty_prompt(module_id, self.timeout)
        except ApprovalTimeoutError:
            return {"status": "timeout", "reason": f"Timed out after {self.timeout}s"}

        if approved:
            return {"status": "approved", "approved_by": "tty_user"}
        return {"status": "rejected", "reason": "User rejected"}

    async def check_approval(self, approval_id: str) -> Any:
        """Check status of a previously pending approval (Phase B).

        CLI does not support async approval polling; always returns rejected.
        """
        return {
            "status": "rejected",
            "reason": "CLI does not support async approval polling",
        }

119 

120 

121# --------------------------------------------------------------------------- 

122# Legacy check_approval() — backward-compatible wrapper 

123# --------------------------------------------------------------------------- 

124 

125 

def check_approval(module_def: Any, auto_approve: bool, timeout: int = 60) -> None:
    """Gate execution of a module behind its approval requirement.

    Returns ``None`` when the module needs no approval or approval is granted
    (via ``--yes``, the ``APCORE_CLI_AUTO_APPROVE=1`` environment variable, or
    an interactive confirmation).

    Denial is reported by raising rather than exiting. Both errors carry a
    ``code`` attribute that maps to exit code 46 via the CLI
    ``_ERROR_CODE_MAP``. The discovery.py exec_cmd ``except Exception``
    handler flushes the audit log and translates the error into
    ``sys.exit(46)`` (D11-001 — this function used to call ``sys.exit``
    itself, which raises BaseException and bypassed that handler).

    Args:
        module_def: Module descriptor with annotations.
        auto_approve: If True, bypass approval (--yes flag).
        timeout: Approval prompt timeout in seconds.

    Raises:
        ApprovalDeniedError: No interactive terminal is available, or the
            user rejected the prompt.
        ApprovalTimeoutError: The prompt timed out.
    """
    annotations = getattr(module_def, "annotations", None)
    if annotations is None:
        return
    # Objects that are neither dicts nor expose ``requires_approval`` carry
    # no approval information at all.
    if not (isinstance(annotations, dict) or hasattr(annotations, "requires_approval")):
        return

    # Only a literal ``True`` demands approval.
    if _get_annotation(annotations, "requires_approval", False) is not True:
        return

    fallback_id = getattr(module_def, "canonical_id", "unknown")
    module_id = getattr(module_def, "module_id", fallback_id)

    # Bypass: --yes flag (highest priority)
    if auto_approve is True:
        logger.info("Approval bypassed via --yes flag for module '%s'.", module_id)
        return

    # Bypass: APCORE_CLI_AUTO_APPROVE env var
    env_val = os.environ.get("APCORE_CLI_AUTO_APPROVE", "")
    if env_val == "1":
        logger.info("Approval bypassed via APCORE_CLI_AUTO_APPROVE for '%s'.", module_id)
        return
    if env_val:
        logger.warning("APCORE_CLI_AUTO_APPROVE='%s', expected '1'. Ignoring.", env_val)

    # Interactive session: ask the user.
    if sys.stdin.isatty():
        _prompt_with_timeout(module_def, timeout=timeout)
        return

    # Non-TTY: nobody can answer a prompt.
    raise ApprovalDeniedError(
        f"Module '{module_id}' requires approval but no interactive "
        "terminal is available. Use --yes or set APCORE_CLI_AUTO_APPROVE=1 "
        "to bypass."
    )

176 

177 

178# --------------------------------------------------------------------------- 

179# Internal prompt implementation 

180# --------------------------------------------------------------------------- 

181 

182 

def _prompt_with_timeout(module_def: Any, timeout: int = 60) -> None:
    """Show the approval message and ask the user to confirm, with a timeout.

    Args:
        module_def: Module descriptor; ``module_id`` (or ``canonical_id``)
            and ``annotations`` are read from it when present.
        timeout: Prompt timeout in seconds; clamped to the range 1..3600.

    Raises:
        ApprovalTimeoutError: The prompt timed out.
        ApprovalDeniedError: The user did not approve.
    """
    timeout = max(1, min(timeout, 3600))

    fallback_id = getattr(module_def, "canonical_id", "unknown")
    module_id = getattr(module_def, "module_id", fallback_id)

    annotations = getattr(module_def, "annotations", None) or {}
    custom = _get_annotation(annotations, "approval_message", None)
    message = custom if custom is not None else f"Module '{module_id}' requires approval to execute."

    # The message goes to stderr.
    click.echo(message, err=True)

    try:
        approved = _tty_prompt(module_id, timeout)
    except ApprovalTimeoutError:
        # Re-raise with a descriptive message and no chained context.
        raise ApprovalTimeoutError(f"Approval prompt timed out after {timeout} seconds.") from None

    if not approved:
        raise ApprovalDeniedError("Approval denied.")
    logger.info("User approved execution of module '%s'.", module_id)

204 

205 

def _tty_prompt(module_id: str, timeout: int) -> bool:
    """Dispatch the confirmation prompt to the platform-specific implementation.

    Uses the Windows implementation when ``sys.platform`` is ``"win32"`` and
    the Unix implementation otherwise. Returns True if approved; the
    implementations raise on timeout.
    """
    prompt = _prompt_windows if sys.platform == "win32" else _prompt_unix
    return prompt(module_id, timeout)

211 

212 

def _prompt_unix(module_id: str, timeout: int) -> bool:
    """Ask for confirmation on Unix, bounded by a SIGALRM-based timeout.

    A temporary SIGALRM handler is installed and an alarm armed for
    ``timeout`` seconds. Whatever the outcome, the alarm is cleared and the
    previous handler is put back before this function exits.

    Returns:
        The user's answer from ``click.confirm`` (default is "no").

    Raises:
        ApprovalTimeoutError: The alarm fired before the user answered.
    """
    import signal

    def _on_alarm(signum: int, frame: Any) -> None:
        # Raised from the signal handler to abort the pending prompt.
        raise ApprovalTimeoutError()

    previous = signal.signal(signal.SIGALRM, _on_alarm)
    signal.alarm(timeout)

    try:
        return click.confirm("Proceed?", default=False)
    except ApprovalTimeoutError:
        logger.warning("Approval timed out after %ds for module '%s'.", timeout, module_id)
        raise
    finally:
        # Disarm first so the alarm cannot fire into the restored handler.
        signal.alarm(0)
        signal.signal(signal.SIGALRM, previous)

233 

234 

def _prompt_windows(module_id: str, timeout: int) -> bool:
    """Windows approval prompt using threading.Timer + ctypes.

    A timer thread is started; when it fires after ``timeout`` seconds it asks
    the interpreter to raise ``ApprovalTimeoutError`` in the main thread via
    ``PyThreadState_SetAsyncExc``.

    The timer is cancelled on every exit path. Cancelling only on success or
    timeout would leave it armed if ``click.confirm`` raised anything else
    (for example an abort on Ctrl-C), and it would later inject a timeout
    error into unrelated code running on the main thread.

    Returns:
        The user's answer from ``click.confirm`` (default is "no").

    Raises:
        ApprovalTimeoutError: The timer fired before the user answered.
    """
    import ctypes

    def _interrupt_main() -> None:
        # NOTE(review): an async exception is delivered when the target thread
        # next runs Python bytecode — confirm that a blocked console read is
        # actually interrupted on Windows.
        ctypes.pythonapi.PyThreadState_SetAsyncExc(
            ctypes.c_ulong(threading.main_thread().ident),
            ctypes.py_object(ApprovalTimeoutError),
        )

    timer = threading.Timer(timeout, _interrupt_main)
    timer.start()

    try:
        return click.confirm("Proceed?", default=False)
    except ApprovalTimeoutError:
        logger.warning("Approval timed out after %ds for module '%s'.", timeout, module_id)
        raise
    finally:
        # Always disarm, including on exceptions other than the timeout.
        timer.cancel()