Coverage for security.py: 69%

124 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-29 02:55 +0800

1""" 

2安全与权限控制模块 (Security & Permissions) 

3 

4负责管理 Agent 的权限,实施沙箱隔离,防止越权访问。 

5配置存储在 ~/.qrclaw/permissions.yaml 中。 

6 

7权限继承规则: 

8- 子 agent 继承父 agent 的权限 

9- 这样子 agent 可以访问父 agent 能访问的路径 

10""" 

11import os 

12import re 

13import yaml 

14from pathlib import Path 

15from typing import List, Literal, Optional 

16from pydantic import BaseModel, Field 

17 

18# 权限配置文件路径 

19PERMISSIONS_FILE = Path.home() / ".qrclaw" / "permissions.yaml" 

20 

21# 系统保护路径:只有 default agent 可访问 

22PROTECTED_PATHS = [ 

23 Path.home() / ".qrclaw" / "permissions.yaml", 

24 Path.home() / ".qrclaw" / "config.yaml", 

25] 

26 

27# 权限拒绝提示后缀 

28PERMISSION_DENIED_SUFFIX = "\n\n⚠️ 你的上级不允许操作除了你工作空间以外的任何文件,请立即停止!" 

29 

30# LLM 安全检查的 prompt 

31SHELL_SAFETY_PROMPT = """你是一个命令行安全审查员。判断以下 shell 命令是否安全。 

32 

33安全规则: 

341. 命令只能操作当前工作目录内的文件 

352. 不能访问、读取、修改工作目录外的任何文件或目录 

363. 不能使用绝对路径(如 /etc/passwd, ~/.ssh/id_rsa) 

374. 不能使用管道或重定向访问外部文件 

385. 不能执行网络操作(curl, wget, nc 等) 

396. 不能修改系统配置或环境变量 

407. 不能执行提权操作(sudo, su, chmod 777 等) 

41 

42工作目录: {workspace} 

43 

44命令: 

45{command} 

46 

47只返回一个词: true 或 false 

48true = 安全,允许执行 

49false = 危险,拒绝执行 

50 

51不要解释,只返回 true 或 false。""" 

52 

53 

54class AgentPermission(BaseModel): 

55 """单个 Agent 的权限配置""" 

56 access: Literal["full", "scoped", "readonly"] = "scoped" 

57 allow_paths: List[str] = Field(default_factory=list) 

58 

59 

60class PermissionConfig(BaseModel): 

61 """全局权限配置""" 

62 default_policy: Literal["restricted", "full"] = "restricted" 

63 agents: dict[str, AgentPermission] = Field(default_factory=dict) 

64 

65 

66# 默认配置:Default Agent 拥有最高权限 

67DEFAULT_PERMISSIONS = { 

68 "default_policy": "restricted", 

69 "agents": { 

70 "default": { 

71 "access": "full", 

72 "allow_paths": [] 

73 } 

74 } 

75} 

76 

77 

78# 高危命令黑名单(正则匹配) 

79DANGEROUS_COMMANDS = [ 

80 r"rm\s+-rf\s+/", 

81 r"rm\s+-rf\s+~", 

82 r"rm\s+-rf\s+\*", 

83 r">\s*/dev/sd[a-z]", 

84 r"mkfs\s+", 

85 r"dd\s+if=.*of=/dev/", 

86 r":(){ :|:& };:", # fork bomb 

87 r"chmod\s+777\s+/", 

88 r"chown\s+.*\s+/", 

89] 

90 

91 

92def _llm_check_command_safety(command: str, workspace: Path) -> bool: 

93 """ 

94 使用 LLM 判断命令是否安全 

95  

96 Args: 

97 command: 要检查的命令 

98 workspace: 工作目录路径 

99  

100 Returns: 

101 True = 安全,False = 危险 

102 """ 

103 try: 

104 from qrclaw.providers import provider 

105 from qrclaw.logger import get_logger 

106 

107 logger = get_logger("qrclaw.security") 

108 

109 prompt = SHELL_SAFETY_PROMPT.format( 

110 workspace=str(workspace), 

111 command=command 

112 ) 

113 

114 logger.debug(f"LLM 安全检查: {command[:100]}...") 

115 

116 response = provider.chat([{"role": "user", "content": prompt}]) 

117 result = response.content.strip().lower() 

118 

119 logger.info(f"LLM 安全检查结果: {result}") 

120 

121 return result == "true" 

122 

123 except Exception as e: 

124 # LLM 调用失败时,默认拒绝(安全优先) 

125 from qrclaw.logger import get_logger 

126 logger = get_logger("qrclaw.security") 

127 logger.error(f"LLM 安全检查失败: {e}") 

128 return False 

129 

130 

131def _get_effective_agent_id(agent_id: str) -> str: 

132 """ 

133 获取有效的 agent ID(用于权限继承) 

134  

135 子 agent 会继承父 agent 的权限。 

136 例如:如果 sub-agent "coder" 的父 agent 是 "default", 

137 那么 "coder" 会使用 "default" 的权限配置。 

138  

139 Args: 

140 agent_id: 当前 agent ID(可能是子 agent) 

141  

142 Returns: 

143 str: 用于权限查找的有效 agent ID 

144 """ 

145 # 尝试导入 spawn_agent 模块获取父 agent 映射 

146 try: 

147 from qrclaw.tools.spawn_agent import get_parent_agent_id 

148 parent_id = get_parent_agent_id(agent_id) 

149 if parent_id: 

150 # 递归查找,直到找到没有父 agent 的顶层 agent 

151 return _get_effective_agent_id(parent_id) 

152 except ImportError: 

153 pass 

154 

155 # 没有父 agent,返回自身 

156 return agent_id 

157 

158 

159class SecurityManager: 

160 _instance = None 

161 _config: PermissionConfig = None 

162 

163 def __new__(cls): 

164 if cls._instance is None: 

165 cls._instance = super(SecurityManager, cls).__new__(cls) 

166 cls._instance._load_config() 

167 return cls._instance 

168 

169 def _load_config(self): 

170 """加载权限配置,文件不存在则创建默认配置""" 

171 if not PERMISSIONS_FILE.exists(): 

172 self._create_default_config() 

173 

174 try: 

175 with open(PERMISSIONS_FILE, "r", encoding="utf-8") as f: 

176 data = yaml.safe_load(f) or {} 

177 # 兼容性处理:确保 default agent 始终存在 

178 if "agents" not in data: 

179 data["agents"] = {} 

180 if "default" not in data["agents"]: 

181 data["agents"]["default"] = DEFAULT_PERMISSIONS["agents"]["default"] 

182 

183 self._config = PermissionConfig(**data) 

184 except Exception as e: 

185 print(f"❌ 加载权限配置失败: {e},将使用默认安全策略") 

186 self._config = PermissionConfig(**DEFAULT_PERMISSIONS) 

187 

188 def _create_default_config(self): 

189 """创建默认的 permissions.yaml""" 

190 PERMISSIONS_FILE.parent.mkdir(parents=True, exist_ok=True) 

191 with open(PERMISSIONS_FILE, "w", encoding="utf-8") as f: 

192 yaml.dump(DEFAULT_PERMISSIONS, f, default_flow_style=False, allow_unicode=True) 

193 # 添加注释 

194 f.write("\n# access: full (无限制) | scoped (仅限workspace+白名单) | readonly (只读)\n") 

195 f.write("# allow_paths: 允许访问的外部路径列表\n") 

196 

197 def get_permission(self, agent_id: str) -> AgentPermission: 

198 """ 

199 获取指定 Agent 的权限配置(支持子 agent 权限继承) 

200  

201 子 agent 会继承父 agent 的权限。 

202 例如:如果 "coder" 是 "default" 的子 agent, 

203 那么 "coder" 会使用 "default" 的权限配置。 

204 """ 

205 # 获取有效的 agent ID(可能从父 agent 继承) 

206 effective_id = _get_effective_agent_id(agent_id) 

207 

208 if effective_id != agent_id: 

209 from qrclaw.logger import get_logger 

210 logger = get_logger("qrclaw.security") 

211 logger.debug(f"子 agent '{agent_id}' 继承父 agent '{effective_id}' 的权限") 

212 

213 # 1. 优先查明确配置 

214 if effective_id in self._config.agents: 

215 return self._config.agents[effective_id] 

216 

217 # 2. 回退到默认策略 

218 # 如果全局策略是 restricted,则新 Agent 默认为 scoped 

219 return AgentPermission(access="scoped") 

220 

221 def check_access(self, agent_id: str, tool_name: str, args: dict, workspace_root: Path): 

222 """ 

223 核心切面:检查工具调用是否合规 

224  

225 Args: 

226 agent_id: 当前 Agent ID 

227 tool_name: 工具名称 

228 args: 工具参数 

229 workspace_root: 当前 Agent 的工作区根目录 

230  

231 Raises: 

232 PermissionError: 如果权限不足 

233 """ 

234 perm = self.get_permission(agent_id) 

235 

236 # Level 1: Full Access (Root) 

237 if perm.access == "full": 

238 return # 放行 

239 

240 # Level 2: 检查文件路径 

241 # 针对涉及文件操作的工具进行拦截 

242 if tool_name in ["read_file", "write_file", "list_directory", "delete_file"]: 

243 path_str = args.get("path") 

244 if not path_str: 

245 return # 无路径参数,跳过 

246 

247 self._check_path_safety(path_str, perm, workspace_root, tool_name, agent_id) 

248 

249 # Level 3: 检查 Shell 命令 

250 if tool_name == "run_shell": 

251 command = args.get("command", "") 

252 self._check_shell_safety(command, perm, workspace_root, agent_id) 

253 

254 def _check_path_safety(self, path_str: str, perm: AgentPermission, workspace_root: Path, tool_name: str, agent_id: str): 

255 """检查路径是否在允许范围内""" 

256 try: 

257 target_path = Path(path_str).expanduser().resolve() 

258 ws_root = workspace_root.resolve() 

259 except Exception as e: 

260 raise PermissionError(f"🚫 [Security] 路径解析失败: {e}{PERMISSION_DENIED_SUFFIX}") 

261 

262 # 检查保护路径(只有 default agent 可访问) 

263 for protected in PROTECTED_PATHS: 

264 if target_path == protected.resolve(): 

265 if agent_id != "default": 

266 raise PermissionError(f"🚫 [Security] 系统配置文件只有 default agent 可访问: {path_str}{PERMISSION_DENIED_SUFFIX}") 

267 

268 # 1. 检查是否在 Workspace 内部 

269 if self._is_subpath(target_path, ws_root): 

270 return # 放行 

271 

272 # 2. 检查白名单 

273 for allowed in perm.allow_paths: 

274 allowed_path = Path(allowed).expanduser().resolve() 

275 if self._is_subpath(target_path, allowed_path): 

276 # 如果是 readonly 权限,且试图写操作 -> 拦截 

277 if perm.access == "readonly" and tool_name in ["write_file", "delete_file"]: 

278 raise PermissionError(f"🚫 [Security] 该路径只读,禁止写入: {path_str}{PERMISSION_DENIED_SUFFIX}") 

279 return # 放行 

280 

281 # 3. 拦截 

282 raise PermissionError(f"🚫 [Security] Agent 无权访问 Workspace 外部路径: {path_str}{PERMISSION_DENIED_SUFFIX}") 

283 

284 def _check_shell_safety(self, command: str, perm: AgentPermission, workspace_root: Path, agent_id: str): 

285 """ 

286 检查 Shell 命令安全性 

287  

288 对于 scoped 权限的 agent: 

289 1. 禁止高危命令(rm -rf /, fork bomb 等)- 规则匹配 

290 2. 使用 LLM 判断命令是否试图访问工作目录外的文件 

291 """ 

292 # 检查高危命令(规则匹配,快速拦截) 

293 for pattern in DANGEROUS_COMMANDS: 

294 if re.search(pattern, command, re.IGNORECASE): 

295 raise PermissionError(f"🚫 [Security] 禁止执行高危命令: {command}{PERMISSION_DENIED_SUFFIX}") 

296 

297 # 使用 LLM 判断命令安全性 

298 if not _llm_check_command_safety(command, workspace_root): 

299 raise PermissionError(f"🚫 [Security] LLM 判断命令存在安全风险,拒绝执行: {command}{PERMISSION_DENIED_SUFFIX}") 

300 

301 def _is_subpath(self, target: Path, parent: Path) -> bool: 

302 """判断 target 是否是 parent 的子路径""" 

303 try: 

304 target.relative_to(parent) 

305 return True 

306 except ValueError: 

307 return False 

308 

309 

310# 全局单例 

311security_manager = SecurityManager()