Coverage for agentos/system/approval.py: 43%

84 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1""" 

2可视化授权引擎 — Agent 主动申请权限,用户可视化审批。 

3 

4与 permissions.py 的分工: 

5- permissions.py: 纯程序化权限检查(require/check) 

6- approval.py: HITL 授权流程(Agent 发起申请 → 用户审批 → 回调) 

7 

8流程: 

91. Agent 调用 request_approval(tier, resource, reason) 

102. 引擎生成 ApprovalTicket,挂起等待 

113. 通过 WebSocket 推送给桌面客户端 

124. 用户在 UI 点击"同意" / "拒绝" / "拒绝+记住" 

135. 回调触发,Agent 继续或拒绝 

14""" 

15 

16from __future__ import annotations 

17 

18import asyncio 

19import uuid 

20from dataclasses import dataclass, field 

21from datetime import datetime 

22from enum import Enum 

23from typing import Optional, Callable, Awaitable 

24 

25from agentos.system.permissions import PermissionTier 

26 

27 

28class ApprovalStatus(str, Enum): 

29 PENDING = "pending" 

30 APPROVED = "approved" 

31 DENIED = "denied" 

32 DENIED_REMEMBER = "denied_remember" # 拒绝并记住,本次会话不再询问 

33 TIMEOUT = "timeout" 

34 

35 

36@dataclass 

37class ApprovalTicket: 

38 """授权申请票据。""" 

39 ticket_id: str 

40 tier: PermissionTier 

41 resource: str # 申请访问的资源路径/命令 

42 reason: str # Agent 申请原因(AI 生成的中文说明) 

43 status: ApprovalStatus = ApprovalStatus.PENDING 

44 created_at: str = field(default_factory=lambda: datetime.now().isoformat()) 

45 resolved_at: str = "" 

46 session_id: str = "" 

47 # 回调 

48 _future: asyncio.Future | None = field(default=None, repr=False) 

49 

50 def __post_init__(self): 

51 if self._future is None: 

52 self._future = asyncio.Future() 

53 

54 async def wait(self, timeout: float = 60.0) -> ApprovalStatus: 

55 """等待审批结果。""" 

56 try: 

57 return await asyncio.wait_for(asyncio.shield(self._future), timeout=timeout) 

58 except asyncio.TimeoutError: 

59 self.status = ApprovalStatus.TIMEOUT 

60 self.resolved_at = datetime.now().isoformat() 

61 return ApprovalStatus.TIMEOUT 

62 

63 def approve(self) -> None: 

64 """批准。""" 

65 self.status = ApprovalStatus.APPROVED 

66 self.resolved_at = datetime.now().isoformat() 

67 if self._future and not self._future.done(): 

68 self._future.set_result(ApprovalStatus.APPROVED) 

69 

70 def deny(self, remember: bool = False) -> None: 

71 """拒绝。""" 

72 self.status = ApprovalStatus.DENIED_REMEMBER if remember else ApprovalStatus.DENIED 

73 self.resolved_at = datetime.now().isoformat() 

74 if self._future and not self._future.done(): 

75 self._future.set_result(self.status) 

76 

77 

78class ApprovalEngine: 

79 """可视化授权引擎 — 管理审批流程。 

80 

81 用法: 

82 engine = ApprovalEngine(pm, session_id) 

83 engine.set_push_callback(send_to_ws) # 设置推送回调 

84 

85 # Agent 侧: 

86 approved = await engine.request( 

87 PermissionTier.SHELL_STANDARD, 

88 "rm -rf ./build/", 

89 "需要清理构建缓存以释放磁盘空间", 

90 ) 

91 if approved: 

92 # 执行操作 

93 """ 

94 

95 def __init__(self, perm_manager, session_id: str): 

96 self._pm = perm_manager 

97 self._sid = session_id 

98 self._push_callback: Optional[Callable[[dict], Awaitable[None]]] = None 

99 self._pending: dict[str, ApprovalTicket] = {} 

100 self._denied_remember: set[str] = set() # 本次会话已记住拒绝的 (tier, pattern) 

101 

102 def set_push_callback(self, callback: Callable[[dict], Awaitable[None]]) -> None: 

103 """设置推送回调(发送到 WebSocket 客户端)。""" 

104 self._push_callback = callback 

105 

106 async def request(self, tier: PermissionTier, resource: str, reason: str, 

107 timeout: float = 60.0) -> bool: 

108 """Agent 发起权限申请,返回是否获批。 

109 

110 如果已有 DENIED_REMEMBER 记录,直接返回 False。 

111 """ 

112 # 检查是否已记住拒绝 

113 deny_key = f"{tier.value}:{resource}" 

114 if deny_key in self._denied_remember: 

115 return False 

116 

117 ticket = ApprovalTicket( 

118 ticket_id=f"approval-{uuid.uuid4().hex[:12]}", 

119 tier=tier, 

120 resource=resource, 

121 reason=reason, 

122 session_id=self._sid, 

123 ) 

124 self._pending[ticket.ticket_id] = ticket 

125 

126 # 推送到客户端 

127 if self._push_callback: 

128 try: 

129 await self._push_callback({ 

130 "type": "approval_request", 

131 "data": { 

132 "ticket_id": ticket.ticket_id, 

133 "tier": ticket.tier.value, 

134 "tier_label": ticket.tier.label, 

135 "resource": ticket.resource, 

136 "reason": ticket.reason, 

137 "session_id": self._sid, 

138 "timeout": timeout, 

139 }, 

140 }) 

141 except Exception: 

142 pass # 推送失败不阻塞 

143 

144 # 等待审批 

145 result = await ticket.wait(timeout=timeout) 

146 

147 # 处理记住拒绝 

148 if result == ApprovalStatus.DENIED_REMEMBER: 

149 self._denied_remember.add(deny_key) 

150 

151 # 如果审批通过,临时提升权限 

152 if result == ApprovalStatus.APPROVED: 

153 self._pm.escalate(self._sid, tier, resource) 

154 

155 # 清理 

156 self._pending.pop(ticket.ticket_id, None) 

157 

158 return result == ApprovalStatus.APPROVED 

159 

160 def resolve(self, ticket_id: str, approved: bool, remember: bool = False) -> bool: 

161 """手动审批票据(从 UI 调用)。""" 

162 ticket = self._pending.get(ticket_id) 

163 if not ticket or ticket.status != ApprovalStatus.PENDING: 

164 return False 

165 

166 if approved: 

167 ticket.approve() 

168 else: 

169 ticket.deny(remember=remember) 

170 return True 

171 

172 def get_pending_tickets(self) -> list[dict]: 

173 """获取所有待审批票据(供 UI 轮询)。""" 

174 return [ 

175 { 

176 "ticket_id": t.ticket_id, 

177 "tier": t.tier.value, 

178 "tier_label": t.tier.label, 

179 "resource": t.resource, 

180 "reason": t.reason, 

181 "created_at": t.created_at, 

182 "timeout_remaining": max(0, 60 - ( 

183 datetime.now() - datetime.fromisoformat(t.created_at) 

184 ).total_seconds()), 

185 } 

186 for t in self._pending.values() 

187 if t.status == ApprovalStatus.PENDING 

188 ] 

189 

190 def clear_denied(self) -> None: 

191 """清除本次会话的记住拒绝记录。""" 

192 self._denied_remember.clear()