Coverage for agentos/system/approval.py: 43%
84 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
1"""
2可视化授权引擎 — Agent 主动申请权限,用户可视化审批。
4与 permissions.py 的分工:
5- permissions.py: 纯程序化权限检查(require/check)
6- approval.py: HITL 授权流程(Agent 发起申请 → 用户审批 → 回调)
8流程:
91. Agent 调用 request_approval(tier, resource, reason)
102. 引擎生成 ApprovalTicket,挂起等待
113. 通过 WebSocket 推送给桌面客户端
124. 用户在 UI 点击"同意" / "拒绝" / "拒绝+记住"
135. 回调触发,Agent 继续或拒绝
14"""
16from __future__ import annotations
18import asyncio
19import uuid
20from dataclasses import dataclass, field
21from datetime import datetime
22from enum import Enum
23from typing import Optional, Callable, Awaitable
25from agentos.system.permissions import PermissionTier
28class ApprovalStatus(str, Enum):
29 PENDING = "pending"
30 APPROVED = "approved"
31 DENIED = "denied"
32 DENIED_REMEMBER = "denied_remember" # 拒绝并记住,本次会话不再询问
33 TIMEOUT = "timeout"
36@dataclass
37class ApprovalTicket:
38 """授权申请票据。"""
39 ticket_id: str
40 tier: PermissionTier
41 resource: str # 申请访问的资源路径/命令
42 reason: str # Agent 申请原因(AI 生成的中文说明)
43 status: ApprovalStatus = ApprovalStatus.PENDING
44 created_at: str = field(default_factory=lambda: datetime.now().isoformat())
45 resolved_at: str = ""
46 session_id: str = ""
47 # 回调
48 _future: asyncio.Future | None = field(default=None, repr=False)
50 def __post_init__(self):
51 if self._future is None:
52 self._future = asyncio.Future()
54 async def wait(self, timeout: float = 60.0) -> ApprovalStatus:
55 """等待审批结果。"""
56 try:
57 return await asyncio.wait_for(asyncio.shield(self._future), timeout=timeout)
58 except asyncio.TimeoutError:
59 self.status = ApprovalStatus.TIMEOUT
60 self.resolved_at = datetime.now().isoformat()
61 return ApprovalStatus.TIMEOUT
63 def approve(self) -> None:
64 """批准。"""
65 self.status = ApprovalStatus.APPROVED
66 self.resolved_at = datetime.now().isoformat()
67 if self._future and not self._future.done():
68 self._future.set_result(ApprovalStatus.APPROVED)
70 def deny(self, remember: bool = False) -> None:
71 """拒绝。"""
72 self.status = ApprovalStatus.DENIED_REMEMBER if remember else ApprovalStatus.DENIED
73 self.resolved_at = datetime.now().isoformat()
74 if self._future and not self._future.done():
75 self._future.set_result(self.status)
78class ApprovalEngine:
79 """可视化授权引擎 — 管理审批流程。
81 用法:
82 engine = ApprovalEngine(pm, session_id)
83 engine.set_push_callback(send_to_ws) # 设置推送回调
85 # Agent 侧:
86 approved = await engine.request(
87 PermissionTier.SHELL_STANDARD,
88 "rm -rf ./build/",
89 "需要清理构建缓存以释放磁盘空间",
90 )
91 if approved:
92 # 执行操作
93 """
95 def __init__(self, perm_manager, session_id: str):
96 self._pm = perm_manager
97 self._sid = session_id
98 self._push_callback: Optional[Callable[[dict], Awaitable[None]]] = None
99 self._pending: dict[str, ApprovalTicket] = {}
100 self._denied_remember: set[str] = set() # 本次会话已记住拒绝的 (tier, pattern)
102 def set_push_callback(self, callback: Callable[[dict], Awaitable[None]]) -> None:
103 """设置推送回调(发送到 WebSocket 客户端)。"""
104 self._push_callback = callback
106 async def request(self, tier: PermissionTier, resource: str, reason: str,
107 timeout: float = 60.0) -> bool:
108 """Agent 发起权限申请,返回是否获批。
110 如果已有 DENIED_REMEMBER 记录,直接返回 False。
111 """
112 # 检查是否已记住拒绝
113 deny_key = f"{tier.value}:{resource}"
114 if deny_key in self._denied_remember:
115 return False
117 ticket = ApprovalTicket(
118 ticket_id=f"approval-{uuid.uuid4().hex[:12]}",
119 tier=tier,
120 resource=resource,
121 reason=reason,
122 session_id=self._sid,
123 )
124 self._pending[ticket.ticket_id] = ticket
126 # 推送到客户端
127 if self._push_callback:
128 try:
129 await self._push_callback({
130 "type": "approval_request",
131 "data": {
132 "ticket_id": ticket.ticket_id,
133 "tier": ticket.tier.value,
134 "tier_label": ticket.tier.label,
135 "resource": ticket.resource,
136 "reason": ticket.reason,
137 "session_id": self._sid,
138 "timeout": timeout,
139 },
140 })
141 except Exception:
142 pass # 推送失败不阻塞
144 # 等待审批
145 result = await ticket.wait(timeout=timeout)
147 # 处理记住拒绝
148 if result == ApprovalStatus.DENIED_REMEMBER:
149 self._denied_remember.add(deny_key)
151 # 如果审批通过,临时提升权限
152 if result == ApprovalStatus.APPROVED:
153 self._pm.escalate(self._sid, tier, resource)
155 # 清理
156 self._pending.pop(ticket.ticket_id, None)
158 return result == ApprovalStatus.APPROVED
160 def resolve(self, ticket_id: str, approved: bool, remember: bool = False) -> bool:
161 """手动审批票据(从 UI 调用)。"""
162 ticket = self._pending.get(ticket_id)
163 if not ticket or ticket.status != ApprovalStatus.PENDING:
164 return False
166 if approved:
167 ticket.approve()
168 else:
169 ticket.deny(remember=remember)
170 return True
172 def get_pending_tickets(self) -> list[dict]:
173 """获取所有待审批票据(供 UI 轮询)。"""
174 return [
175 {
176 "ticket_id": t.ticket_id,
177 "tier": t.tier.value,
178 "tier_label": t.tier.label,
179 "resource": t.resource,
180 "reason": t.reason,
181 "created_at": t.created_at,
182 "timeout_remaining": max(0, 60 - (
183 datetime.now() - datetime.fromisoformat(t.created_at)
184 ).total_seconds()),
185 }
186 for t in self._pending.values()
187 if t.status == ApprovalStatus.PENDING
188 ]
190 def clear_denied(self) -> None:
191 """清除本次会话的记住拒绝记录。"""
192 self._denied_remember.clear()