Coverage for agentos/system/permissions.py: 48%
115 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
1"""
2分层权限系统 — 让 Agent 操作系统的每一步都在受控范围内。
4权限层级设计理念:
5- 参考 Android 权限模型的分级思想
6- 默认最小权限原则 (Principle of Least Privilege)
7- 高风险操作需二次确认
8- 支持会话级/全局级权限配置
9"""
11from __future__ import annotations
13from dataclasses import dataclass, field
14from enum import IntEnum, auto
15from typing import Optional, Callable
17# ── 权限层级定义 ──────────────────────────────────────────────
20class PermissionTier(IntEnum):
21 """权限层级,数值越大权限越高。"""
22 READ = 0 # 只读访问
23 WRITE_SANDBOX = 1 # 沙箱写入
24 WRITE_ALL = 2 # 全盘写入
25 SHELL_READONLY = 3 # 只读 Shell
26 SHELL_STANDARD = 4 # 标准 Shell(超时/目录限制)
27 SHELL_FULL = 5 # 全权限 Shell
28 BROWSER = 6 # 浏览器自动化
29 ADMIN = 7 # 系统管理
31 @property
32 def label(self) -> str:
33 """中文标签,用于 UI 展示。"""
34 return {
35 0: "只读访问",
36 1: "沙箱写入",
37 2: "全盘写入",
38 3: "只读Shell",
39 4: "标准Shell",
40 5: "全权限Shell",
41 6: "浏览器自动化",
42 7: "系统管理",
43 }.get(self.value, "未知")
46@dataclass
47class SystemPermission:
48 """单个系统权限定义。"""
49 tier: PermissionTier
50 resource: str # 资源标识,如 "/home/user/*", "apt:install"
51 description: str = ""
52 requires_confirmation: bool = False # 是否需要用户二次确认
53 rate_limit_per_minute: int = 0 # 0 表示不限
56# ── 预设权限策略 ──────────────────────────────────────────────
58# 安全模式(默认): 允许读写但 Shell 受限
59SAFE_PERMISSIONS: list[SystemPermission] = [
60 SystemPermission(PermissionTier.READ, "*", "读取任意文件"),
61 SystemPermission(PermissionTier.WRITE_SANDBOX, "/tmp/agentos/**", "沙箱写入"),
62 SystemPermission(PermissionTier.SHELL_READONLY, "ls,cat,head,tail,find,ps,df,du,whoami,pwd,env,echo,date,wc,stat,file,which,uname", "只读 Shell 命令"),
63 SystemPermission(PermissionTier.BROWSER, "*", "浏览器自动化", requires_confirmation=True),
64]
66# 开发模式: 允许写 + 标准 Shell
67DEV_PERMISSIONS: list[SystemPermission] = SAFE_PERMISSIONS + [
68 SystemPermission(PermissionTier.WRITE_ALL, "/home/**", "用户目录读写"),
69 SystemPermission(PermissionTier.SHELL_STANDARD, "*", "标准 Shell(超时60s,沙箱目录)"),
70 SystemPermission(PermissionTier.BROWSER, "*", "浏览器自动化"),
71]
73# 全权限模式: 无限制(高风险,仅限受信环境)
74FULL_PERMISSIONS: list[SystemPermission] = [
75 SystemPermission(PermissionTier.ADMIN, "*", "完全权限"),
76]
79# ── 权限上下文 ─────────────────────────────────────────────────
82@dataclass
83class PermissionContext:
84 """权限检查的上下文信息。"""
85 session_id: str
86 agent_id: str = ""
87 user_id: str = ""
88 tier: PermissionTier = PermissionTier.READ
89 granted_permissions: list[SystemPermission] = field(default_factory=lambda: SAFE_PERMISSIONS.copy())
90 # 回调:当需要用户确认时触发
91 on_confirm_needed: Optional[Callable[[SystemPermission, str], bool]] = None
94class PermissionDenied(Exception):
95 """权限拒绝异常。"""
96 def __init__(self, required: PermissionTier, resource: str, detail: str = ""):
97 self.required = required
98 self.resource = resource
99 self.detail = detail
100 super().__init__(f"权限不足: 需要 {required.name} 访问 {resource}{' — ' + detail if detail else ''}")
103# ── 权限管理器 ─────────────────────────────────────────────────
106class SystemPermissionManager:
107 """系统权限管理器 — 检查、授权、升级、审计。"""
109 def __init__(self, default_tier: PermissionTier = PermissionTier.READ):
110 self._contexts: dict[str, PermissionContext] = {}
111 self._default_tier = default_tier
113 # ── 会话管理 ──
115 def create_session(
116 self,
117 session_id: str,
118 tier: PermissionTier | None = None,
119 permissions: list[SystemPermission] | None = None,
120 ) -> PermissionContext:
121 """创建权限会话。"""
122 ctx = PermissionContext(
123 session_id=session_id,
124 tier=tier or self._default_tier,
125 granted_permissions=permissions or SAFE_PERMISSIONS.copy(),
126 )
127 self._contexts[session_id] = ctx
128 return ctx
130 def get_session(self, session_id: str) -> PermissionContext:
131 """获取会话,不存在则创建默认会话。"""
132 if session_id not in self._contexts:
133 return self.create_session(session_id)
134 return self._contexts[session_id]
136 def close_session(self, session_id: str) -> None:
137 self._contexts.pop(session_id, None)
139 # ── 权限检查 ──
141 def check(self, session_id: str, required_tier: PermissionTier, resource: str) -> bool:
142 """检查是否拥有指定资源的权限。"""
143 ctx = self.get_session(session_id)
145 # 检查是否有匹配的权限
146 for perm in ctx.granted_permissions:
147 if self._tier_covers(perm.tier, required_tier) and self._resource_matches(perm.resource, resource):
148 # 需要确认则触发回调
149 if perm.requires_confirmation:
150 if ctx.on_confirm_needed and not ctx.on_confirm_needed(perm, resource):
151 return False
152 return True
153 return False
155 def require(self, session_id: str, required_tier: PermissionTier, resource: str) -> None:
156 """要求权限,不满足则抛出 PermissionDenied。"""
157 if not self.check(session_id, required_tier, resource):
158 ctx = self.get_session(session_id)
159 current_max = max((p.tier for p in ctx.granted_permissions), default=PermissionTier.READ)
160 raise PermissionDenied(
161 required_tier, resource,
162 f"当前最高权限: {current_max.name},需要: {required_tier.name}",
163 )
165 # ── 权限升级 ──
167 def elevate(
168 self,
169 session_id: str,
170 tier: PermissionTier,
171 permissions: list[SystemPermission] | None = None,
172 require_user_approval: bool = True,
173 ) -> bool:
174 """升级会话权限级别。"""
175 ctx = self.get_session(session_id)
177 if require_user_approval:
178 # 触发用户确认流程
179 if ctx.on_confirm_needed:
180 dummy_perm = SystemPermission(tier, "*", f"升级到 {tier.name}")
181 if not ctx.on_confirm_needed(dummy_perm, "elevate"):
182 return False
184 ctx.tier = tier
185 if permissions:
186 ctx.granted_permissions = permissions
187 return True
189 # ── 临时提权(供 ApprovalEngine 审批通过后调用)──
191 def escalate(self, session_id: str, tier: PermissionTier, resource: str) -> None:
192 """单次临时提权 — 审批通过后临时授予某资源访问权,本会话有效。
194 与 elevate 不同:escalate 是细粒度的、单资源、可撤销的临时授权;
195 elevate 是整体层级提升。
196 """
197 ctx = self.get_session(session_id)
198 # 添加临时权限(不持久化,仅本会话)
199 temp_perm = SystemPermission(
200 tier=tier,
201 resource=resource,
202 description=f"临时授权: {tier.label} → {resource}",
203 requires_confirmation=False, # 已经审批过,无需二次确认
204 )
205 ctx.granted_permissions.append(temp_perm)
207 def revoke_escalation(self, session_id: str, resource: str) -> None:
208 """撤销某资源的临时提权。"""
209 ctx = self.get_session(session_id)
210 ctx.granted_permissions = [
211 p for p in ctx.granted_permissions
212 if not (p.description.startswith("临时授权:") and p.resource == resource)
213 ]
215 # ── 预设模式快捷切换 ──
217 def set_safe_mode(self, session_id: str) -> None:
218 """切换到安全模式。"""
219 ctx = self.get_session(session_id)
220 ctx.granted_permissions = SAFE_PERMISSIONS.copy()
221 ctx.tier = PermissionTier.WRITE_SANDBOX
223 def set_dev_mode(self, session_id: str) -> None:
224 """切换到开发模式。"""
225 ctx = self.get_session(session_id)
226 ctx.granted_permissions = DEV_PERMISSIONS.copy()
227 ctx.tier = PermissionTier.SHELL_STANDARD
229 def set_full_mode(self, session_id: str) -> None:
230 """切换到全权限模式(需确认)。"""
231 ctx = self.get_session(session_id)
232 ctx.granted_permissions = FULL_PERMISSIONS.copy()
233 ctx.tier = PermissionTier.ADMIN
235 # ── 辅助方法 ──
237 @staticmethod
238 def _tier_covers(granted: PermissionTier, required: PermissionTier) -> bool:
239 """检查授权层级是否覆盖需求层级。"""
240 return granted.value >= required.value
242 @staticmethod
243 def _resource_matches(pattern: str, resource: str) -> bool:
244 """简单的资源匹配(支持 * 通配符)。"""
245 if pattern == "*":
246 return True
247 # 支持 ** 递归匹配
248 if "**" in pattern:
249 prefix = pattern.replace("**", "")
250 return resource.startswith(prefix)
251 # 支持 * 单层匹配
252 if "*" in pattern:
253 import fnmatch
254 return fnmatch.fnmatch(resource, pattern)
255 return resource == pattern or resource.startswith(pattern)
258# ── Auto-generated compat stubs ──
260class SAFE_PERMISSIONS: pass
261class DEV_PERMISSIONS: pass
262class FULL_PERMISSIONS: pass