Coverage for security.py: 69%
124 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-29 02:55 +0800
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-29 02:55 +0800
1"""
2安全与权限控制模块 (Security & Permissions)
4负责管理 Agent 的权限,实施沙箱隔离,防止越权访问。
5配置存储在 ~/.qrclaw/permissions.yaml 中。
7权限继承规则:
8- 子 agent 继承父 agent 的权限
9- 这样子 agent 可以访问父 agent 能访问的路径
10"""
11import os
12import re
13import yaml
14from pathlib import Path
15from typing import List, Literal, Optional
16from pydantic import BaseModel, Field
18# 权限配置文件路径
19PERMISSIONS_FILE = Path.home() / ".qrclaw" / "permissions.yaml"
21# 系统保护路径:只有 default agent 可访问
22PROTECTED_PATHS = [
23 Path.home() / ".qrclaw" / "permissions.yaml",
24 Path.home() / ".qrclaw" / "config.yaml",
25]
27# 权限拒绝提示后缀
28PERMISSION_DENIED_SUFFIX = "\n\n⚠️ 你的上级不允许操作除了你工作空间以外的任何文件,请立即停止!"
30# LLM 安全检查的 prompt
31SHELL_SAFETY_PROMPT = """你是一个命令行安全审查员。判断以下 shell 命令是否安全。
33安全规则:
341. 命令只能操作当前工作目录内的文件
352. 不能访问、读取、修改工作目录外的任何文件或目录
363. 不能使用绝对路径(如 /etc/passwd, ~/.ssh/id_rsa)
374. 不能使用管道或重定向访问外部文件
385. 不能执行网络操作(curl, wget, nc 等)
396. 不能修改系统配置或环境变量
407. 不能执行提权操作(sudo, su, chmod 777 等)
42工作目录: {workspace}
44命令:
45{command}
47只返回一个词: true 或 false
48true = 安全,允许执行
49false = 危险,拒绝执行
51不要解释,只返回 true 或 false。"""
54class AgentPermission(BaseModel):
55 """单个 Agent 的权限配置"""
56 access: Literal["full", "scoped", "readonly"] = "scoped"
57 allow_paths: List[str] = Field(default_factory=list)
60class PermissionConfig(BaseModel):
61 """全局权限配置"""
62 default_policy: Literal["restricted", "full"] = "restricted"
63 agents: dict[str, AgentPermission] = Field(default_factory=dict)
66# 默认配置:Default Agent 拥有最高权限
67DEFAULT_PERMISSIONS = {
68 "default_policy": "restricted",
69 "agents": {
70 "default": {
71 "access": "full",
72 "allow_paths": []
73 }
74 }
75}
78# 高危命令黑名单(正则匹配)
79DANGEROUS_COMMANDS = [
80 r"rm\s+-rf\s+/",
81 r"rm\s+-rf\s+~",
82 r"rm\s+-rf\s+\*",
83 r">\s*/dev/sd[a-z]",
84 r"mkfs\s+",
85 r"dd\s+if=.*of=/dev/",
86 r":(){ :|:& };:", # fork bomb
87 r"chmod\s+777\s+/",
88 r"chown\s+.*\s+/",
89]
92def _llm_check_command_safety(command: str, workspace: Path) -> bool:
93 """
94 使用 LLM 判断命令是否安全
96 Args:
97 command: 要检查的命令
98 workspace: 工作目录路径
100 Returns:
101 True = 安全,False = 危险
102 """
103 try:
104 from qrclaw.providers import provider
105 from qrclaw.logger import get_logger
107 logger = get_logger("qrclaw.security")
109 prompt = SHELL_SAFETY_PROMPT.format(
110 workspace=str(workspace),
111 command=command
112 )
114 logger.debug(f"LLM 安全检查: {command[:100]}...")
116 response = provider.chat([{"role": "user", "content": prompt}])
117 result = response.content.strip().lower()
119 logger.info(f"LLM 安全检查结果: {result}")
121 return result == "true"
123 except Exception as e:
124 # LLM 调用失败时,默认拒绝(安全优先)
125 from qrclaw.logger import get_logger
126 logger = get_logger("qrclaw.security")
127 logger.error(f"LLM 安全检查失败: {e}")
128 return False
131def _get_effective_agent_id(agent_id: str) -> str:
132 """
133 获取有效的 agent ID(用于权限继承)
135 子 agent 会继承父 agent 的权限。
136 例如:如果 sub-agent "coder" 的父 agent 是 "default",
137 那么 "coder" 会使用 "default" 的权限配置。
139 Args:
140 agent_id: 当前 agent ID(可能是子 agent)
142 Returns:
143 str: 用于权限查找的有效 agent ID
144 """
145 # 尝试导入 spawn_agent 模块获取父 agent 映射
146 try:
147 from qrclaw.tools.spawn_agent import get_parent_agent_id
148 parent_id = get_parent_agent_id(agent_id)
149 if parent_id:
150 # 递归查找,直到找到没有父 agent 的顶层 agent
151 return _get_effective_agent_id(parent_id)
152 except ImportError:
153 pass
155 # 没有父 agent,返回自身
156 return agent_id
159class SecurityManager:
160 _instance = None
161 _config: PermissionConfig = None
163 def __new__(cls):
164 if cls._instance is None:
165 cls._instance = super(SecurityManager, cls).__new__(cls)
166 cls._instance._load_config()
167 return cls._instance
169 def _load_config(self):
170 """加载权限配置,文件不存在则创建默认配置"""
171 if not PERMISSIONS_FILE.exists():
172 self._create_default_config()
174 try:
175 with open(PERMISSIONS_FILE, "r", encoding="utf-8") as f:
176 data = yaml.safe_load(f) or {}
177 # 兼容性处理:确保 default agent 始终存在
178 if "agents" not in data:
179 data["agents"] = {}
180 if "default" not in data["agents"]:
181 data["agents"]["default"] = DEFAULT_PERMISSIONS["agents"]["default"]
183 self._config = PermissionConfig(**data)
184 except Exception as e:
185 print(f"❌ 加载权限配置失败: {e},将使用默认安全策略")
186 self._config = PermissionConfig(**DEFAULT_PERMISSIONS)
188 def _create_default_config(self):
189 """创建默认的 permissions.yaml"""
190 PERMISSIONS_FILE.parent.mkdir(parents=True, exist_ok=True)
191 with open(PERMISSIONS_FILE, "w", encoding="utf-8") as f:
192 yaml.dump(DEFAULT_PERMISSIONS, f, default_flow_style=False, allow_unicode=True)
193 # 添加注释
194 f.write("\n# access: full (无限制) | scoped (仅限workspace+白名单) | readonly (只读)\n")
195 f.write("# allow_paths: 允许访问的外部路径列表\n")
197 def get_permission(self, agent_id: str) -> AgentPermission:
198 """
199 获取指定 Agent 的权限配置(支持子 agent 权限继承)
201 子 agent 会继承父 agent 的权限。
202 例如:如果 "coder" 是 "default" 的子 agent,
203 那么 "coder" 会使用 "default" 的权限配置。
204 """
205 # 获取有效的 agent ID(可能从父 agent 继承)
206 effective_id = _get_effective_agent_id(agent_id)
208 if effective_id != agent_id:
209 from qrclaw.logger import get_logger
210 logger = get_logger("qrclaw.security")
211 logger.debug(f"子 agent '{agent_id}' 继承父 agent '{effective_id}' 的权限")
213 # 1. 优先查明确配置
214 if effective_id in self._config.agents:
215 return self._config.agents[effective_id]
217 # 2. 回退到默认策略
218 # 如果全局策略是 restricted,则新 Agent 默认为 scoped
219 return AgentPermission(access="scoped")
221 def check_access(self, agent_id: str, tool_name: str, args: dict, workspace_root: Path):
222 """
223 核心切面:检查工具调用是否合规
225 Args:
226 agent_id: 当前 Agent ID
227 tool_name: 工具名称
228 args: 工具参数
229 workspace_root: 当前 Agent 的工作区根目录
231 Raises:
232 PermissionError: 如果权限不足
233 """
234 perm = self.get_permission(agent_id)
236 # Level 1: Full Access (Root)
237 if perm.access == "full":
238 return # 放行
240 # Level 2: 检查文件路径
241 # 针对涉及文件操作的工具进行拦截
242 if tool_name in ["read_file", "write_file", "list_directory", "delete_file"]:
243 path_str = args.get("path")
244 if not path_str:
245 return # 无路径参数,跳过
247 self._check_path_safety(path_str, perm, workspace_root, tool_name, agent_id)
249 # Level 3: 检查 Shell 命令
250 if tool_name == "run_shell":
251 command = args.get("command", "")
252 self._check_shell_safety(command, perm, workspace_root, agent_id)
254 def _check_path_safety(self, path_str: str, perm: AgentPermission, workspace_root: Path, tool_name: str, agent_id: str):
255 """检查路径是否在允许范围内"""
256 try:
257 target_path = Path(path_str).expanduser().resolve()
258 ws_root = workspace_root.resolve()
259 except Exception as e:
260 raise PermissionError(f"🚫 [Security] 路径解析失败: {e}{PERMISSION_DENIED_SUFFIX}")
262 # 检查保护路径(只有 default agent 可访问)
263 for protected in PROTECTED_PATHS:
264 if target_path == protected.resolve():
265 if agent_id != "default":
266 raise PermissionError(f"🚫 [Security] 系统配置文件只有 default agent 可访问: {path_str}{PERMISSION_DENIED_SUFFIX}")
268 # 1. 检查是否在 Workspace 内部
269 if self._is_subpath(target_path, ws_root):
270 return # 放行
272 # 2. 检查白名单
273 for allowed in perm.allow_paths:
274 allowed_path = Path(allowed).expanduser().resolve()
275 if self._is_subpath(target_path, allowed_path):
276 # 如果是 readonly 权限,且试图写操作 -> 拦截
277 if perm.access == "readonly" and tool_name in ["write_file", "delete_file"]:
278 raise PermissionError(f"🚫 [Security] 该路径只读,禁止写入: {path_str}{PERMISSION_DENIED_SUFFIX}")
279 return # 放行
281 # 3. 拦截
282 raise PermissionError(f"🚫 [Security] Agent 无权访问 Workspace 外部路径: {path_str}{PERMISSION_DENIED_SUFFIX}")
284 def _check_shell_safety(self, command: str, perm: AgentPermission, workspace_root: Path, agent_id: str):
285 """
286 检查 Shell 命令安全性
288 对于 scoped 权限的 agent:
289 1. 禁止高危命令(rm -rf /, fork bomb 等)- 规则匹配
290 2. 使用 LLM 判断命令是否试图访问工作目录外的文件
291 """
292 # 检查高危命令(规则匹配,快速拦截)
293 for pattern in DANGEROUS_COMMANDS:
294 if re.search(pattern, command, re.IGNORECASE):
295 raise PermissionError(f"🚫 [Security] 禁止执行高危命令: {command}{PERMISSION_DENIED_SUFFIX}")
297 # 使用 LLM 判断命令安全性
298 if not _llm_check_command_safety(command, workspace_root):
299 raise PermissionError(f"🚫 [Security] LLM 判断命令存在安全风险,拒绝执行: {command}{PERMISSION_DENIED_SUFFIX}")
301 def _is_subpath(self, target: Path, parent: Path) -> bool:
302 """判断 target 是否是 parent 的子路径"""
303 try:
304 target.relative_to(parent)
305 return True
306 except ValueError:
307 return False
310# 全局单例
311security_manager = SecurityManager()