Coverage for agentos/system/permissions.py: 48%

115 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1""" 

2分层权限系统 — 让 Agent 操作系统的每一步都在受控范围内。 

3 

4权限层级设计理念: 

5- 参考 Android 权限模型的分级思想 

6- 默认最小权限原则 (Principle of Least Privilege) 

7- 高风险操作需二次确认 

8- 支持会话级/全局级权限配置 

9""" 

10 

11from __future__ import annotations 

12 

13from dataclasses import dataclass, field 

14from enum import IntEnum, auto 

15from typing import Optional, Callable 

16 

17# ── 权限层级定义 ────────────────────────────────────────────── 

18 

19 

20class PermissionTier(IntEnum): 

21 """权限层级,数值越大权限越高。""" 

22 READ = 0 # 只读访问 

23 WRITE_SANDBOX = 1 # 沙箱写入 

24 WRITE_ALL = 2 # 全盘写入 

25 SHELL_READONLY = 3 # 只读 Shell 

26 SHELL_STANDARD = 4 # 标准 Shell(超时/目录限制) 

27 SHELL_FULL = 5 # 全权限 Shell 

28 BROWSER = 6 # 浏览器自动化 

29 ADMIN = 7 # 系统管理 

30 

31 @property 

32 def label(self) -> str: 

33 """中文标签,用于 UI 展示。""" 

34 return { 

35 0: "只读访问", 

36 1: "沙箱写入", 

37 2: "全盘写入", 

38 3: "只读Shell", 

39 4: "标准Shell", 

40 5: "全权限Shell", 

41 6: "浏览器自动化", 

42 7: "系统管理", 

43 }.get(self.value, "未知") 

44 

45 

46@dataclass 

47class SystemPermission: 

48 """单个系统权限定义。""" 

49 tier: PermissionTier 

50 resource: str # 资源标识,如 "/home/user/*", "apt:install" 

51 description: str = "" 

52 requires_confirmation: bool = False # 是否需要用户二次确认 

53 rate_limit_per_minute: int = 0 # 0 表示不限 

54 

55 

56# ── 预设权限策略 ────────────────────────────────────────────── 

57 

58# 安全模式(默认): 允许读写但 Shell 受限 

59SAFE_PERMISSIONS: list[SystemPermission] = [ 

60 SystemPermission(PermissionTier.READ, "*", "读取任意文件"), 

61 SystemPermission(PermissionTier.WRITE_SANDBOX, "/tmp/agentos/**", "沙箱写入"), 

62 SystemPermission(PermissionTier.SHELL_READONLY, "ls,cat,head,tail,find,ps,df,du,whoami,pwd,env,echo,date,wc,stat,file,which,uname", "只读 Shell 命令"), 

63 SystemPermission(PermissionTier.BROWSER, "*", "浏览器自动化", requires_confirmation=True), 

64] 

65 

66# 开发模式: 允许写 + 标准 Shell 

67DEV_PERMISSIONS: list[SystemPermission] = SAFE_PERMISSIONS + [ 

68 SystemPermission(PermissionTier.WRITE_ALL, "/home/**", "用户目录读写"), 

69 SystemPermission(PermissionTier.SHELL_STANDARD, "*", "标准 Shell(超时60s,沙箱目录)"), 

70 SystemPermission(PermissionTier.BROWSER, "*", "浏览器自动化"), 

71] 

72 

73# 全权限模式: 无限制(高风险,仅限受信环境) 

74FULL_PERMISSIONS: list[SystemPermission] = [ 

75 SystemPermission(PermissionTier.ADMIN, "*", "完全权限"), 

76] 

77 

78 

79# ── 权限上下文 ───────────────────────────────────────────────── 

80 

81 

82@dataclass 

83class PermissionContext: 

84 """权限检查的上下文信息。""" 

85 session_id: str 

86 agent_id: str = "" 

87 user_id: str = "" 

88 tier: PermissionTier = PermissionTier.READ 

89 granted_permissions: list[SystemPermission] = field(default_factory=lambda: SAFE_PERMISSIONS.copy()) 

90 # 回调:当需要用户确认时触发 

91 on_confirm_needed: Optional[Callable[[SystemPermission, str], bool]] = None 

92 

93 

94class PermissionDenied(Exception): 

95 """权限拒绝异常。""" 

96 def __init__(self, required: PermissionTier, resource: str, detail: str = ""): 

97 self.required = required 

98 self.resource = resource 

99 self.detail = detail 

100 super().__init__(f"权限不足: 需要 {required.name} 访问 {resource}{' — ' + detail if detail else ''}") 

101 

102 

103# ── 权限管理器 ───────────────────────────────────────────────── 

104 

105 

106class SystemPermissionManager: 

107 """系统权限管理器 — 检查、授权、升级、审计。""" 

108 

109 def __init__(self, default_tier: PermissionTier = PermissionTier.READ): 

110 self._contexts: dict[str, PermissionContext] = {} 

111 self._default_tier = default_tier 

112 

113 # ── 会话管理 ── 

114 

115 def create_session( 

116 self, 

117 session_id: str, 

118 tier: PermissionTier | None = None, 

119 permissions: list[SystemPermission] | None = None, 

120 ) -> PermissionContext: 

121 """创建权限会话。""" 

122 ctx = PermissionContext( 

123 session_id=session_id, 

124 tier=tier or self._default_tier, 

125 granted_permissions=permissions or SAFE_PERMISSIONS.copy(), 

126 ) 

127 self._contexts[session_id] = ctx 

128 return ctx 

129 

130 def get_session(self, session_id: str) -> PermissionContext: 

131 """获取会话,不存在则创建默认会话。""" 

132 if session_id not in self._contexts: 

133 return self.create_session(session_id) 

134 return self._contexts[session_id] 

135 

136 def close_session(self, session_id: str) -> None: 

137 self._contexts.pop(session_id, None) 

138 

139 # ── 权限检查 ── 

140 

141 def check(self, session_id: str, required_tier: PermissionTier, resource: str) -> bool: 

142 """检查是否拥有指定资源的权限。""" 

143 ctx = self.get_session(session_id) 

144 

145 # 检查是否有匹配的权限 

146 for perm in ctx.granted_permissions: 

147 if self._tier_covers(perm.tier, required_tier) and self._resource_matches(perm.resource, resource): 

148 # 需要确认则触发回调 

149 if perm.requires_confirmation: 

150 if ctx.on_confirm_needed and not ctx.on_confirm_needed(perm, resource): 

151 return False 

152 return True 

153 return False 

154 

155 def require(self, session_id: str, required_tier: PermissionTier, resource: str) -> None: 

156 """要求权限,不满足则抛出 PermissionDenied。""" 

157 if not self.check(session_id, required_tier, resource): 

158 ctx = self.get_session(session_id) 

159 current_max = max((p.tier for p in ctx.granted_permissions), default=PermissionTier.READ) 

160 raise PermissionDenied( 

161 required_tier, resource, 

162 f"当前最高权限: {current_max.name},需要: {required_tier.name}", 

163 ) 

164 

165 # ── 权限升级 ── 

166 

167 def elevate( 

168 self, 

169 session_id: str, 

170 tier: PermissionTier, 

171 permissions: list[SystemPermission] | None = None, 

172 require_user_approval: bool = True, 

173 ) -> bool: 

174 """升级会话权限级别。""" 

175 ctx = self.get_session(session_id) 

176 

177 if require_user_approval: 

178 # 触发用户确认流程 

179 if ctx.on_confirm_needed: 

180 dummy_perm = SystemPermission(tier, "*", f"升级到 {tier.name}") 

181 if not ctx.on_confirm_needed(dummy_perm, "elevate"): 

182 return False 

183 

184 ctx.tier = tier 

185 if permissions: 

186 ctx.granted_permissions = permissions 

187 return True 

188 

189 # ── 临时提权(供 ApprovalEngine 审批通过后调用)── 

190 

191 def escalate(self, session_id: str, tier: PermissionTier, resource: str) -> None: 

192 """单次临时提权 — 审批通过后临时授予某资源访问权,本会话有效。 

193 

194 与 elevate 不同:escalate 是细粒度的、单资源、可撤销的临时授权; 

195 elevate 是整体层级提升。 

196 """ 

197 ctx = self.get_session(session_id) 

198 # 添加临时权限(不持久化,仅本会话) 

199 temp_perm = SystemPermission( 

200 tier=tier, 

201 resource=resource, 

202 description=f"临时授权: {tier.label} → {resource}", 

203 requires_confirmation=False, # 已经审批过,无需二次确认 

204 ) 

205 ctx.granted_permissions.append(temp_perm) 

206 

207 def revoke_escalation(self, session_id: str, resource: str) -> None: 

208 """撤销某资源的临时提权。""" 

209 ctx = self.get_session(session_id) 

210 ctx.granted_permissions = [ 

211 p for p in ctx.granted_permissions 

212 if not (p.description.startswith("临时授权:") and p.resource == resource) 

213 ] 

214 

215 # ── 预设模式快捷切换 ── 

216 

217 def set_safe_mode(self, session_id: str) -> None: 

218 """切换到安全模式。""" 

219 ctx = self.get_session(session_id) 

220 ctx.granted_permissions = SAFE_PERMISSIONS.copy() 

221 ctx.tier = PermissionTier.WRITE_SANDBOX 

222 

223 def set_dev_mode(self, session_id: str) -> None: 

224 """切换到开发模式。""" 

225 ctx = self.get_session(session_id) 

226 ctx.granted_permissions = DEV_PERMISSIONS.copy() 

227 ctx.tier = PermissionTier.SHELL_STANDARD 

228 

229 def set_full_mode(self, session_id: str) -> None: 

230 """切换到全权限模式(需确认)。""" 

231 ctx = self.get_session(session_id) 

232 ctx.granted_permissions = FULL_PERMISSIONS.copy() 

233 ctx.tier = PermissionTier.ADMIN 

234 

235 # ── 辅助方法 ── 

236 

237 @staticmethod 

238 def _tier_covers(granted: PermissionTier, required: PermissionTier) -> bool: 

239 """检查授权层级是否覆盖需求层级。""" 

240 return granted.value >= required.value 

241 

242 @staticmethod 

243 def _resource_matches(pattern: str, resource: str) -> bool: 

244 """简单的资源匹配(支持 * 通配符)。""" 

245 if pattern == "*": 

246 return True 

247 # 支持 ** 递归匹配 

248 if "**" in pattern: 

249 prefix = pattern.replace("**", "") 

250 return resource.startswith(prefix) 

251 # 支持 * 单层匹配 

252 if "*" in pattern: 

253 import fnmatch 

254 return fnmatch.fnmatch(resource, pattern) 

255 return resource == pattern or resource.startswith(pattern) 

256 

257 

258# ── Auto-generated compat stubs ── 

259 

260class SAFE_PERMISSIONS: pass 

261class DEV_PERMISSIONS: pass 

262class FULL_PERMISSIONS: pass