Coverage for agentos/hitl/approver.py: 50%
119 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
1"""
2Human-in-the-Loop approval engine — request construction, risk assessment,
3policy evaluation, and decision processing.
4"""
6from dataclasses import dataclass, field
7from enum import Enum, auto
8from typing import Any, Callable, Dict, List, Optional, Set
11class ApprovalStatus(str, Enum):
12 """Status of an approval request."""
14 PENDING = "pending"
15 APPROVED = "approved"
16 REJECTED = "rejected"
17 MODIFIED = "modified"
18 TIMED_OUT = "timed_out"
19 SKIPPED = "skipped"
22class RiskLevel(str, Enum):
23 """Risk classification for approval decisions."""
25 LOW = "low"
26 MEDIUM = "medium"
27 HIGH = "high"
28 CRITICAL = "critical"
31@dataclass
32class ApprovalRequest:
33 """A structured request for human approval."""
35 request_id: str
36 action: str
37 description: str
38 risk_level: RiskLevel = RiskLevel.MEDIUM
39 tool_name: str = ""
40 tool_args: dict[str, Any] = field(default_factory=dict)
41 estimated_cost_usd: float = 0.0
42 data_affected: list[str] = field(default_factory=list)
43 context: dict[str, Any] = field(default_factory=dict)
44 metadata: dict[str, Any] = field(default_factory=dict)
47@dataclass
48class ApprovalDecision:
49 """Human decision on an approval request."""
51 request_id: str
52 status: ApprovalStatus
53 reason: str = ""
54 modified_args: dict[str, Any] | None = None
55 metadata: dict[str, Any] = field(default_factory=dict)
57 @property
58 def is_approved(self) -> bool:
59 return self.status in (ApprovalStatus.APPROVED, ApprovalStatus.MODIFIED)
61 @property
62 def is_rejected(self) -> bool:
63 return self.status == ApprovalStatus.REJECTED
66@dataclass
67class ApprovalPolicy:
68 """Configures which actions require human approval."""
70 require_approval_for_risk: set[RiskLevel] = field(
71 default_factory=lambda: {RiskLevel.HIGH, RiskLevel.CRITICAL}
72 )
73 auto_approve_domains: set[str] = field(default_factory=set)
74 block_domains: set[str] = field(default_factory=set)
75 max_auto_approve_cost_usd: float = 0.01
76 require_approval_for_new_tools: bool = True
77 timeout_seconds: int = 120
78 max_pending_requests: int = 10
79 cache_approval_seconds: int = 300
82ApprovalCallback = Callable[[ApprovalRequest], ApprovalDecision]
85class HumanInTheLoop:
86 """Manages the human approval workflow for tool calls and mutations.
88 Supports synchronous callbacks (CLI prompt, webhook, etc.) and
89 configurable auto-approval rules based on risk and domain.
90 """
92 def __init__(
93 self,
94 policy: Optional[ApprovalPolicy] = None,
95 callback: Optional[ApprovalCallback] = None,
96 ):
97 self.policy = policy or ApprovalPolicy()
98 self.callback = callback
99 self._pending: dict[str, ApprovalRequest] = {}
100 self._decisions: dict[str, ApprovalDecision] = {}
101 self._history: list[tuple[ApprovalRequest, ApprovalDecision]] = []
102 self._approval_cache: dict[str, tuple[float, ApprovalDecision]] = {}
104 def request_approval(
105 self,
106 action: str,
107 description: str = "",
108 risk_level: RiskLevel = RiskLevel.MEDIUM,
109 tool_name: str = "",
110 tool_args: dict[str, Any] | None = None,
111 estimated_cost_usd: float = 0.0,
112 data_affected: list[str] | None = None,
113 ) -> ApprovalRequest:
114 """Create an approval request and submit it for decision."""
115 import time, uuid
117 request_id = uuid.uuid4().hex[:12]
118 req = ApprovalRequest(
119 request_id=request_id,
120 action=action,
121 description=description,
122 risk_level=risk_level,
123 tool_name=tool_name,
124 tool_args=tool_args or {},
125 estimated_cost_usd=estimated_cost_usd,
126 data_affected=data_affected or [],
127 )
129 # Check cache
130 cache_key = f"{tool_name}:{action}"
131 if cache_key in self._approval_cache:
132 ts, decision = self._approval_cache[cache_key]
133 if time.time() - ts < self.policy.cache_approval_seconds:
134 self._decisions[request_id] = decision
135 self._history.append((req, decision))
136 return req
138 # Evaluate auto-approval policy
139 decision = self._evaluate_policy(req)
140 if decision is not None:
141 self._decisions[request_id] = decision
142 self._history.append((req, decision))
143 return req
145 # Needs human input
146 if len(self._pending) >= self.policy.max_pending_requests:
147 decision = ApprovalDecision(
148 request_id=request_id,
149 status=ApprovalStatus.REJECTED,
150 reason="Max pending requests exceeded.",
151 )
152 self._decisions[request_id] = decision
153 self._history.append((req, decision))
154 return req
156 self._pending[request_id] = req
157 return req
159 def decide(self, request_id: str, decision: ApprovalDecision) -> None:
160 """Record a human decision and remove from pending."""
161 self._decisions[request_id] = decision
162 if request_id in self._pending:
163 req = self._pending.pop(request_id)
164 self._history.append((req, decision))
165 # Cache if approved
166 if decision.is_approved:
167 import time
168 cache_key = f"{req.tool_name}:{req.action}"
169 self._approval_cache[cache_key] = (time.time(), decision)
171 def get_decision(self, request_id: str) -> Optional[ApprovalDecision]:
172 return self._decisions.get(request_id)
174 def get_pending(self) -> list[ApprovalRequest]:
175 return list(self._pending.values())
177 def get_history(self) -> list[tuple[ApprovalRequest, ApprovalDecision]]:
178 return self._history.copy()
180 def clear_cache(self) -> None:
181 self._approval_cache.clear()
183 def _evaluate_policy(self, req: ApprovalRequest) -> Optional[ApprovalDecision]:
184 """Determine if the request can be auto-decided without human input."""
186 # Blocked domains always rejected
187 domain = req.tool_name.split(".")[0] if req.tool_name else ""
188 if domain and domain in self.policy.block_domains:
189 return ApprovalDecision(
190 request_id=req.request_id,
191 status=ApprovalStatus.REJECTED,
192 reason=f"Domain '{domain}' is blocked by policy.",
193 )
195 # Auto-approve domains + low risk
196 if domain and domain in self.policy.auto_approve_domains:
197 if req.estimated_cost_usd <= self.policy.max_auto_approve_cost_usd:
198 return ApprovalDecision(
199 request_id=req.request_id,
200 status=ApprovalStatus.APPROVED,
201 reason=f"Auto-approved: domain '{domain}' is trusted.",
202 )
204 # Risk level check
205 if req.risk_level not in self.policy.require_approval_for_risk:
206 return ApprovalDecision(
207 request_id=req.request_id,
208 status=ApprovalStatus.SKIPPED,
209 reason=f"Risk level '{req.risk_level.value}' does not require approval.",
210 )
212 return None # Needs human input
214 def request_and_decide(
215 self,
216 action: str,
217 description: str = "",
218 risk_level: RiskLevel = RiskLevel.MEDIUM,
219 tool_name: str = "",
220 tool_args: dict[str, Any] | None = None,
221 ) -> tuple[ApprovalRequest, ApprovalDecision]:
222 """Create request, attempt auto-decision, invoke callback if needed."""
223 req = self.request_approval(
224 action=action,
225 description=description,
226 risk_level=risk_level,
227 tool_name=tool_name,
228 tool_args=tool_args,
229 )
230 decision = self.get_decision(req.request_id)
231 if decision is not None:
232 return req, decision
234 if self.callback:
235 decision = self.callback(req)
236 self.decide(req.request_id, decision)
237 else:
238 decision = ApprovalDecision(
239 request_id=req.request_id,
240 status=ApprovalStatus.TIMED_OUT,
241 reason="No human callback configured.",
242 )
244 return req, decision