Coverage for agentos/hitl/approver.py: 50%

119 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1""" 

2Human-in-the-Loop approval engine — request construction, risk assessment, 

3policy evaluation, and decision processing. 

4""" 

5 

6from dataclasses import dataclass, field 

7from enum import Enum, auto 

8from typing import Any, Callable, Dict, List, Optional, Set 

9 

10 

class ApprovalStatus(str, Enum):
    """Possible outcomes (or the pending state) of an approval request.

    Members mix in ``str``, so each one compares equal to its lowercase
    string value (for example ``ApprovalStatus.APPROVED == "approved"``).
    """

    # Declaration order is the iteration order of the enum; keep it stable.
    PENDING = "pending"
    APPROVED = "approved"
    REJECTED = "rejected"
    MODIFIED = "modified"
    TIMED_OUT = "timed_out"
    SKIPPED = "skipped"

20 

21 

class RiskLevel(str, Enum):
    """Risk tiers attached to an action that may need approval.

    Members mix in ``str``, so each one compares equal to its lowercase
    string value (for example ``RiskLevel.HIGH == "high"``).
    """

    # Declared from least to most severe; this is also the iteration order.
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"

29 

30 

@dataclass
class ApprovalRequest:
    """One action submitted for approval, with the details needed to judge it.

    ``request_id``, ``action`` and ``description`` are required. Every other
    field has a default; the dict/list fields use ``default_factory`` so each
    instance gets its own empty container instead of sharing one.
    """

    # --- required ---
    request_id: str
    action: str
    description: str

    # --- optional, with defaults ---
    risk_level: RiskLevel = RiskLevel.MEDIUM
    tool_name: str = ""
    tool_args: dict[str, Any] = field(default_factory=dict)
    estimated_cost_usd: float = 0.0
    data_affected: list[str] = field(default_factory=list)
    context: dict[str, Any] = field(default_factory=dict)
    metadata: dict[str, Any] = field(default_factory=dict)

45 

46 

@dataclass
class ApprovalDecision:
    """The verdict recorded for a single approval request.

    Only ``request_id`` and ``status`` are required. ``modified_args``
    defaults to ``None`` and ``metadata`` to a fresh empty dict per instance.
    """

    request_id: str
    status: ApprovalStatus
    reason: str = ""
    modified_args: dict[str, Any] | None = None
    metadata: dict[str, Any] = field(default_factory=dict)

    @property
    def is_approved(self) -> bool:
        """Return True when the status is APPROVED or MODIFIED."""
        if self.status == ApprovalStatus.APPROVED:
            return True
        return self.status == ApprovalStatus.MODIFIED

    @property
    def is_rejected(self) -> bool:
        """Return True only when the status is REJECTED."""
        return self.status == ApprovalStatus.REJECTED

64 

65 

@dataclass
class ApprovalPolicy:
    """Tunable rules deciding which requests need a human and which do not.

    All fields have defaults, so ``ApprovalPolicy()`` yields a usable policy.
    """

    # Risk levels that need a human decision (HIGH and CRITICAL by default).
    require_approval_for_risk: set[RiskLevel] = field(
        default_factory=lambda: {RiskLevel.HIGH, RiskLevel.CRITICAL}
    )
    # Domain names treated as trusted / forbidden. Each instance gets its own
    # empty set.
    auto_approve_domains: set[str] = field(default_factory=set)
    block_domains: set[str] = field(default_factory=set)
    # Cost ceiling (USD) for auto-approving a request in a trusted domain.
    max_auto_approve_cost_usd: float = 0.01
    # NOTE(review): not read anywhere in this module's visible code —
    # presumably consumed by a caller; confirm before relying on it.
    require_approval_for_new_tools: bool = True
    # NOTE(review): not read anywhere in this module's visible code —
    # presumably a wait limit enforced by the caller; confirm.
    timeout_seconds: int = 120
    # Upper bound on requests waiting for a human at the same time.
    max_pending_requests: int = 10
    # How long (seconds) an approved decision may be reused.
    cache_approval_seconds: int = 300

80 

81 

# Signature of the synchronous hook that turns a request into a decision:
# it receives the ApprovalRequest and must return an ApprovalDecision.
ApprovalCallback = Callable[[ApprovalRequest], ApprovalDecision]

83 

84 

class HumanInTheLoop:
    """Manages the human approval workflow for tool calls and mutations.

    Supports synchronous callbacks (CLI prompt, webhook, etc.) and
    configurable auto-approval rules based on risk and domain.

    A request created through :meth:`request_approval` is either decided
    immediately (cache hit, policy rule, or pending-queue overflow) or parked
    in the pending queue until :meth:`decide` is called for it.
    """

    def __init__(
        self,
        policy: Optional[ApprovalPolicy] = None,
        callback: Optional[ApprovalCallback] = None,
    ):
        """Set up an engine with empty pending/decision/history/cache state.

        Args:
            policy: Rules for auto-decisions; a default ``ApprovalPolicy()``
                is created when omitted.
            callback: Optional hook used by :meth:`request_and_decide` to
                obtain a decision for requests that cannot be auto-decided.
        """
        self.policy = policy if policy is not None else ApprovalPolicy()
        self.callback = callback
        # request_id -> request still waiting for a decision
        self._pending: dict[str, ApprovalRequest] = {}
        # request_id -> decision recorded for it
        self._decisions: dict[str, ApprovalDecision] = {}
        # (request, decision) pairs in the order they were recorded
        self._history: list[tuple[ApprovalRequest, ApprovalDecision]] = []
        # "tool_name:action" -> (time.time() at approval, approved decision)
        self._approval_cache: dict[str, tuple[float, ApprovalDecision]] = {}

    @staticmethod
    def _cache_key(tool_name: str, action: str) -> str:
        """Build the approval-cache key for a tool/action pair."""
        return f"{tool_name}:{action}"

    def _record(self, req: ApprovalRequest, decision: ApprovalDecision) -> None:
        """Store ``decision`` for ``req`` and append the pair to the history."""
        self._decisions[req.request_id] = decision
        self._history.append((req, decision))

    def _lookup_cached(self, req: ApprovalRequest) -> Optional[ApprovalDecision]:
        """Return a still-fresh cached approval re-bound to ``req``, else None.

        Expired entries are evicted so the cache does not keep stale
        approvals around indefinitely.
        """
        import time
        from dataclasses import replace

        key = self._cache_key(req.tool_name, req.action)
        entry = self._approval_cache.get(key)
        if entry is None:
            return None

        stamp, cached = entry
        if time.time() - stamp >= self.policy.cache_approval_seconds:
            del self._approval_cache[key]
            return None

        # The cached decision carries the request_id of the request it was
        # originally made for; hand back a copy tagged with the new request's
        # id so the stored decision matches the request it is filed under.
        return replace(cached, request_id=req.request_id)

    def request_approval(
        self,
        action: str,
        description: str = "",
        risk_level: RiskLevel = RiskLevel.MEDIUM,
        tool_name: str = "",
        tool_args: dict[str, Any] | None = None,
        estimated_cost_usd: float = 0.0,
        data_affected: list[str] | None = None,
    ) -> ApprovalRequest:
        """Create an approval request and submit it for decision.

        The request is resolved immediately, in this order, when possible:

        1. a fresh cached approval exists for the same ``tool_name``/``action``;
        2. the policy can auto-decide it (see :meth:`_evaluate_policy`);
        3. the pending queue is full, in which case it is rejected.

        Otherwise it is added to the pending queue and no decision exists yet.

        Args:
            action: Name of the action being requested.
            description: Free-text explanation of the action.
            risk_level: Risk classification used by the policy check.
            tool_name: Tool identifier; the part before the first ``.`` is
                treated as its domain.
            tool_args: Arguments for the tool call (empty dict when omitted).
            estimated_cost_usd: Cost estimate compared against the policy's
                auto-approve ceiling.
            data_affected: Identifiers of affected data (empty list when
                omitted).

        Returns:
            The newly created request. Use :meth:`get_decision` with its
            ``request_id`` to see whether it was decided immediately.
        """
        import uuid

        request_id = uuid.uuid4().hex[:12]
        req = ApprovalRequest(
            request_id=request_id,
            action=action,
            description=description,
            risk_level=risk_level,
            tool_name=tool_name,
            tool_args=tool_args or {},
            estimated_cost_usd=estimated_cost_usd,
            data_affected=data_affected or [],
        )

        # 1. Reuse a recent approval for the same tool/action.
        cached = self._lookup_cached(req)
        if cached is not None:
            self._record(req, cached)
            return req

        # 2. Let the policy auto-decide if it can.
        auto_decision = self._evaluate_policy(req)
        if auto_decision is not None:
            self._record(req, auto_decision)
            return req

        # 3. Needs human input: refuse outright when the queue is full.
        if len(self._pending) >= self.policy.max_pending_requests:
            overflow = ApprovalDecision(
                request_id=request_id,
                status=ApprovalStatus.REJECTED,
                reason="Max pending requests exceeded.",
            )
            self._record(req, overflow)
            return req

        self._pending[request_id] = req
        return req

    def decide(self, request_id: str, decision: ApprovalDecision) -> None:
        """Record a human decision and remove from pending.

        The decision is always stored under ``request_id``. Only when that id
        was pending is the (request, decision) pair appended to the history,
        and only an approved decision (APPROVED or MODIFIED) is cached for
        later requests with the same tool/action.
        """
        import time

        self._decisions[request_id] = decision
        req = self._pending.pop(request_id, None)
        if req is None:
            return

        self._history.append((req, decision))
        if decision.is_approved:
            key = self._cache_key(req.tool_name, req.action)
            self._approval_cache[key] = (time.time(), decision)

    def get_decision(self, request_id: str) -> Optional[ApprovalDecision]:
        """Return the decision recorded for ``request_id``, or None."""
        return self._decisions.get(request_id)

    def get_pending(self) -> list[ApprovalRequest]:
        """Return a new list of the requests still awaiting a decision."""
        return list(self._pending.values())

    def get_history(self) -> list[tuple[ApprovalRequest, ApprovalDecision]]:
        """Return a shallow copy of the recorded (request, decision) pairs."""
        return self._history.copy()

    def clear_cache(self) -> None:
        """Forget all cached approvals."""
        self._approval_cache.clear()

    def _evaluate_policy(self, req: ApprovalRequest) -> Optional[ApprovalDecision]:
        """Determine if the request can be auto-decided without human input.

        Returns:
            A REJECTED decision for a blocked domain, an APPROVED decision
            for a trusted domain within the cost ceiling, a SKIPPED decision
            when the risk level is not in ``require_approval_for_risk``, or
            None when a human must decide.
        """
        policy = self.policy
        # The domain is whatever precedes the first "." in the tool name
        # (empty when the tool name is empty).
        domain = req.tool_name.partition(".")[0]

        # Blocked domains are always rejected.
        if domain and domain in policy.block_domains:
            return ApprovalDecision(
                request_id=req.request_id,
                status=ApprovalStatus.REJECTED,
                reason=f"Domain '{domain}' is blocked by policy.",
            )

        # Trusted domain within the cost ceiling is auto-approved.
        # NOTE(review): the risk level is not consulted here, so a trusted
        # domain is approved at any risk level as long as the cost fits.
        if (
            domain
            and domain in policy.auto_approve_domains
            and req.estimated_cost_usd <= policy.max_auto_approve_cost_usd
        ):
            return ApprovalDecision(
                request_id=req.request_id,
                status=ApprovalStatus.APPROVED,
                reason=f"Auto-approved: domain '{domain}' is trusted.",
            )

        # Risk levels outside the configured set need no approval at all.
        if req.risk_level not in policy.require_approval_for_risk:
            return ApprovalDecision(
                request_id=req.request_id,
                status=ApprovalStatus.SKIPPED,
                reason=f"Risk level '{req.risk_level.value}' does not require approval.",
            )

        return None  # Needs human input

    def request_and_decide(
        self,
        action: str,
        description: str = "",
        risk_level: RiskLevel = RiskLevel.MEDIUM,
        tool_name: str = "",
        tool_args: dict[str, Any] | None = None,
        estimated_cost_usd: float = 0.0,
        data_affected: list[str] | None = None,
    ) -> tuple[ApprovalRequest, ApprovalDecision]:
        """Create request, attempt auto-decision, invoke callback if needed.

        Args:
            action: Name of the action being requested.
            description: Free-text explanation of the action.
            risk_level: Risk classification used by the policy check.
            tool_name: Tool identifier (domain is the part before ``.``).
            tool_args: Arguments for the tool call.
            estimated_cost_usd: Cost estimate, forwarded so the policy's
                auto-approve cost ceiling is actually applied.
            data_affected: Identifiers of affected data.

        Returns:
            The request together with its decision. When no decision could be
            made automatically and no callback is configured, the decision has
            status TIMED_OUT.
        """
        req = self.request_approval(
            action=action,
            description=description,
            risk_level=risk_level,
            tool_name=tool_name,
            tool_args=tool_args,
            estimated_cost_usd=estimated_cost_usd,
            data_affected=data_affected,
        )
        decision = self.get_decision(req.request_id)
        if decision is not None:
            return req, decision

        if self.callback is not None:
            decision = self.callback(req)
        else:
            decision = ApprovalDecision(
                request_id=req.request_id,
                status=ApprovalStatus.TIMED_OUT,
                reason="No human callback configured.",
            )

        # Record the outcome in both branches so the request leaves the
        # pending queue; otherwise every callback-less call would occupy a
        # pending slot forever and eventually trip max_pending_requests.
        self.decide(req.request_id, decision)
        return req, decision