Coverage for agentos/system/shell_exec.py: 33%

156 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1""" 

2Shell 执行模块 — 带权限检查和安全沙箱的命令执行。 

3 

4分层策略: 

5- SHELL_READONLY: 只允许预定义安全命令白名单 

6- SHELL_STANDARD: 允许任意命令,超时+沙箱目录限制 

7- SHELL_FULL: 无限制(需二次确认) 

8""" 

9 

10from __future__ import annotations 

11 

12import os 

13import re 

14import signal 

15import subprocess 

16import tempfile 

17import shlex 

18from dataclasses import dataclass, field 

19from typing import Optional 

20 

21from agentos.system.permissions import ( 

22 SystemPermissionManager, 

23 PermissionTier, 

24 PermissionDenied, 

25) 

26 

27 

28# ── Shell 策略 ───────────────────────────────────────────────── 

29 

30 

@dataclass
class ShellPolicy:
    """Constraints applied to a shell command before and while it runs.

    The list-valued fields default to fresh empty lists; an empty list
    means the corresponding check has nothing to match against.
    """

    # Allow-list of command names.
    allowed_commands: list[str] = field(default_factory=list)
    # Deny-list of command names.
    blocked_commands: list[str] = field(default_factory=list)
    # Regular expressions that must not match the command line.
    blocked_patterns: list[str] = field(default_factory=list)
    # Wall-clock limit for a single command, in seconds.
    timeout_seconds: int = 30
    # Upper bound on captured output, in bytes (100 KiB by default).
    max_output_bytes: int = 100 * 1024
    # Sandbox working directory; empty string by default.
    sandbox_dir: str = ""
    # Whether the pipe character may appear in a command.
    allow_pipes: bool = False
    # Whether redirection characters may appear in a command.
    allow_redirects: bool = False

42 

43 

@dataclass
class ShellResult:
    """Outcome of one shell command invocation.

    ``success``, ``command``, ``stdout`` and ``stderr`` are required; the
    remaining fields carry defaults describing a command that never ran.
    """

    # True only when the command ran to completion successfully.
    success: bool
    # The command line exactly as it was submitted.
    command: str
    # Captured standard output.
    stdout: str
    # Captured standard error.
    stderr: str
    # Process exit status; -1 when no real status is available.
    exit_code: int = -1
    # Elapsed time in milliseconds.
    duration_ms: float = 0
    # True when the command was stopped for exceeding its time limit.
    timeout: bool = False
    # True when the permission layer refused the command.
    permission_denied: bool = False
    # Human-readable failure description; empty when there is none.
    error: str = ""

56 

57 

58# ── 预设策略 ─────────────────────────────────────────────────── 

59 

# Preset for the read-only tier: only allow-listed command names may run,
# pipes are permitted, redirection is not.
# NOTE(review): several entries (e.g. tee, sed, xargs, curl, wget, python,
# pip, git, node, npm) can write files or run arbitrary code — confirm that
# they are really meant to be part of a "read-only" allow-list.
READONLY_POLICY = ShellPolicy(
    allow_redirects=False,
    allow_pipes=True,
    timeout_seconds=30,
    allowed_commands=[
        # Files and system state
        "ls", "cat", "head", "tail",
        "find", "ps", "df", "du",
        "whoami", "pwd", "env", "echo",
        "date", "wc", "stat",
        "file", "which", "uname", "uptime",
        "id", "groups",
        "free", "top",
        # Text processing
        "grep", "awk", "sed", "sort", "uniq",
        "cut", "tr", "tee", "xargs",
        "basename", "dirname",
        "readlink", "realpath",
        "md5sum", "sha256sum", "diff",
        # Network
        "curl", "wget", "ping",
        "hostname", "ip", "ss",
        # Language and developer tooling
        "python3", "python", "pip", "pip3",
        "git", "node", "npm",
    ],
)

75 

# Preset for the standard tier: no allow-list; destructive command names and
# dangerous command-line patterns are rejected. Pipes and redirection are
# permitted, with a 60 s timeout and a 500 KiB output cap.
STANDARD_POLICY = ShellPolicy(
    allow_pipes=True,
    allow_redirects=True,
    timeout_seconds=60,
    max_output_bytes=500 * 1024,
    blocked_commands=[
        # Deletion and power control
        "rm", "shutdown", "reboot", "halt", "poweroff",
        # Disks and filesystems
        "mkfs", "dd", "fdisk", "parted", "mount", "umount",
        # Ownership, modes and accounts
        "chmod", "chown", "useradd", "userdel", "passwd",
        # Firewall and service management
        "iptables", "ufw", "systemctl", "service",
    ],
    blocked_patterns=[
        # Recursive delete starting at the filesystem root
        r"rm\s+(-rf?|--recursive)\s+/",
        # Overwriting a device node
        r">\s*/dev/",
        # Formatting a filesystem
        r"mkfs\.",
        # Raw dd copy
        r"dd\s+if=",
        # Piping a curl download into a shell
        r"curl.*\|.*sh",
        # Piping a wget download into a shell
        r"wget.*\|.*sh",
        # Writing into /etc/
        r">\s*/etc/",
    ],
)

97 

# Preset for the full tier: no allow-list, deny-list or pattern checks;
# only a 300 s timeout and a 10 MiB output cap remain.
FULL_POLICY = ShellPolicy(
    allow_redirects=True,
    allow_pipes=True,
    max_output_bytes=10 * 1024 * 1024,
    timeout_seconds=300,
)

104 

105 

106# ── Shell 沙箱 ───────────────────────────────────────────────── 

107 

108 

class ShellSandbox:
    """Working directory plus a scrubbed environment for running commands.

    The sandbox owns a directory (a fresh temporary one unless the caller
    supplies a path) and can produce a copy of the process environment with
    secret-looking variables removed.
    """

    def __init__(self, work_dir: str | None = None):
        """Use ``work_dir`` when given (and truthy), else create a temp dir."""
        if work_dir:
            self._work_dir = work_dir
        else:
            self._work_dir = tempfile.mkdtemp(prefix="agentos_shell_")

    @property
    def work_dir(self) -> str:
        """Path of the directory commands are run in."""
        return self._work_dir

    def filtered_env(self) -> dict[str, str]:
        """Return a copy of ``os.environ`` without secret-looking variables.

        A variable is dropped when its upper-cased name contains any of the
        sensitive markers below. ``HOME`` is then pointed at the sandbox
        directory and ``PATH`` is set from the current process (with a
        conventional fallback when it is unset).
        """
        sensitive_markers = (
            "AWS_", "SECRET_", "TOKEN", "PASSWORD", "PASSWD",
            "KEY", "CREDENTIAL", "PRIVATE", "CERT", "AUTH",
        )

        def looks_sensitive(name: str) -> bool:
            upper_name = name.upper()
            return any(marker in upper_name for marker in sensitive_markers)

        scrubbed = {
            name: value
            for name, value in os.environ.items()
            if not looks_sensitive(name)
        }
        # Safe defaults that override whatever was inherited.
        scrubbed["HOME"] = self._work_dir
        scrubbed["PATH"] = os.environ.get("PATH", "/usr/local/bin:/usr/bin:/bin")
        return scrubbed

    def cleanup(self) -> None:
        """Delete the sandbox directory tree, ignoring any failure."""
        import shutil

        try:
            shutil.rmtree(self._work_dir, ignore_errors=True)
        except Exception:
            # Best-effort removal: cleanup must never raise.
            pass

146 

147 

148# ── Shell 执行器 ──────────────────────────────────────────────── 

149 

150 

class ShellExecutor:
    """Run shell commands for one session under a permission-derived policy.

    The executor asks the permission manager which shell tier the session
    holds, picks the matching preset policy, validates the command line
    against it and finally runs the command inside a named sandbox.

    The command is handed to a real shell (``shell=True``), so the
    allow/deny-list validation is a best-effort filter, not a security
    boundary: it inspects the head of every simple command it can see after
    shlex tokenisation but does not understand wrappers (``sudo``, ``env``,
    ``xargs``, ``sh -c`` ...), brace groups or shell keywords.
    """

    # Characters that shlex reports as punctuation tokens.
    _PUNCTUATION = frozenset("();<>|&")
    # Punctuation characters whose presence always ends a simple command.
    _COMMAND_BREAKERS = frozenset(";|()")

    def __init__(self, perm_manager: SystemPermissionManager, session_id: str):
        """Bind the executor to a permission manager and a session id."""
        self._pm = perm_manager
        self._sid = session_id
        self._sandboxes: dict[str, ShellSandbox] = {}

    # ── Sandbox management ──

    def create_sandbox(self, name: str = "default") -> ShellSandbox:
        """Create a new sandbox and register it under ``name``.

        Any sandbox previously registered under the same name is replaced
        in the registry without being cleaned up.
        """
        sb = ShellSandbox()
        self._sandboxes[name] = sb
        return sb

    def get_sandbox(self, name: str = "default") -> ShellSandbox:
        """Return the sandbox registered under ``name``, creating it if absent."""
        if name not in self._sandboxes:
            return self.create_sandbox(name)
        return self._sandboxes[name]

    def cleanup_sandbox(self, name: str = "default") -> None:
        """Unregister the named sandbox and delete its directory, if any."""
        sb = self._sandboxes.pop(name, None)
        if sb:
            sb.cleanup()

    def cleanup_all(self) -> None:
        """Clean up every registered sandbox and empty the registry."""
        for sb in list(self._sandboxes.values()):
            sb.cleanup()
        self._sandboxes.clear()

    # ── Command execution ──

    def execute(self, command: str, sandbox_name: str = "default") -> ShellResult:
        """Run ``command`` under the most permissive tier the session holds.

        Tiers are probed from full to standard to read-only; the first one
        the permission manager grants selects the policy. When every tier
        is refused, a ``ShellResult`` with ``permission_denied=True`` is
        returned and ``error`` carries the last refusal message.
        """
        tiers = (
            (PermissionTier.SHELL_FULL, FULL_POLICY),
            (PermissionTier.SHELL_STANDARD, STANDARD_POLICY),
            (PermissionTier.SHELL_READONLY, READONLY_POLICY),
        )
        last_denial = None
        for tier, policy in tiers:
            try:
                self._pm.require(self._sid, tier, command)
            except PermissionDenied as exc:
                last_denial = exc
                continue
            return self._execute_with_policy(command, policy, sandbox_name)

        return ShellResult(
            success=False, command=command,
            stdout="", stderr="", permission_denied=True, error=str(last_denial),
        )

    def execute_checked(self, command: str, required_tier: PermissionTier,
                        sandbox_name: str = "default") -> ShellResult:
        """Run ``command`` under the policy of an explicitly requested tier.

        Unlike :meth:`execute`, a refusal is not converted into a result:
        whatever ``require`` raises (``PermissionDenied`` on refusal)
        propagates to the caller. Any tier other than read-only or standard
        selects the full policy.
        """
        self._pm.require(self._sid, required_tier, command)
        if required_tier == PermissionTier.SHELL_READONLY:
            policy = READONLY_POLICY
        elif required_tier == PermissionTier.SHELL_STANDARD:
            policy = STANDARD_POLICY
        else:
            policy = FULL_POLICY
        return self._execute_with_policy(command, policy, sandbox_name)

    # ── Internals ──

    def _execute_with_policy(self, command: str, policy: ShellPolicy,
                             sandbox_name: str) -> ShellResult:
        """Validate ``command`` against ``policy`` and run it in a sandbox.

        Returns a failed ``ShellResult`` (never raises) when validation
        rejects the command, when the process cannot be started, or when it
        exceeds ``policy.timeout_seconds``. On timeout the whole process
        group receives SIGTERM, then SIGKILL after a 5 second grace period.

        Note that output is collected in full before being truncated, so
        ``max_output_bytes`` bounds the returned text, not memory use.
        """
        import time

        violation = self._safety_check(command, policy)
        if violation:
            return ShellResult(
                success=False, command=command,
                stdout="", stderr=violation, error=violation,
            )

        sandbox = self.get_sandbox(sandbox_name)

        try:
            # Monotonic clock: unaffected by wall-clock adjustments.
            started = time.monotonic()
            proc = subprocess.Popen(
                command,
                shell=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                cwd=sandbox.work_dir,
                env=sandbox.filtered_env(),
                # New session => new process group led by the child, so the
                # whole tree can be signalled on timeout. Preferred over
                # preexec_fn=os.setsid, which is unsafe with threads.
                start_new_session=True,
                text=True,
                # Binary or mis-encoded output must not abort the run.
                errors="replace",
            )

            timed_out = False
            try:
                stdout, stderr = proc.communicate(timeout=policy.timeout_seconds)
            except subprocess.TimeoutExpired:
                timed_out = True
                self._signal_group(proc, signal.SIGTERM)
                try:
                    stdout, stderr = proc.communicate(timeout=5)
                except subprocess.TimeoutExpired:
                    self._signal_group(proc, signal.SIGKILL)
                    stdout, stderr = proc.communicate()
            exit_code = -1 if timed_out else proc.returncode

            duration_ms = (time.monotonic() - started) * 1000

            # Cap the size of what is handed back to the caller.
            stdout = self._truncate(stdout, policy.max_output_bytes)
            stderr = self._truncate(stderr, policy.max_output_bytes)

            return ShellResult(
                success=(exit_code == 0 and not timed_out),
                command=command,
                stdout=stdout,
                stderr=stderr,
                exit_code=exit_code,
                duration_ms=duration_ms,
                timeout=timed_out,
            )

        except Exception as e:
            # Boundary of the executor: report start-up/IO failures as a
            # result instead of letting them escape to the caller.
            return ShellResult(
                success=False, command=command,
                stdout="", stderr="", error=str(e),
            )

    @staticmethod
    def _signal_group(proc: subprocess.Popen, sig: int) -> None:
        """Send ``sig`` to the process group led by ``proc``.

        The child was started in its own session, so its pid is also its
        process-group id. A group that has already vanished is not an error.
        """
        try:
            os.killpg(proc.pid, sig)
        except ProcessLookupError:
            pass

    @classmethod
    def _command_segments(cls, command: str) -> list[list[str]]:
        """Split a command line into its simple commands.

        Tokenises with shlex in shell-punctuation mode and starts a new
        segment at every control operator (``;``, ``&``, ``&&``, ``||``,
        ``|``, parentheses) and at every newline. Redirection operators stay
        inside their segment. Quoted tokens that consist solely of
        punctuation are indistinguishable from operators here and are
        treated as operators, which errs on the strict side.

        Raises:
            ValueError: if the command has unbalanced quotes or a dangling
                escape.
        """
        # A backslash-newline joins lines; any other newline ends a command.
        normalized = command.replace("\\\n", " ").replace("\n", " ; ")
        lexer = shlex.shlex(normalized, posix=True, punctuation_chars=True)
        lexer.whitespace_split = True
        lexer.commenters = ""  # same as shlex.split(): '#' is ordinary text

        segments: list[list[str]] = []
        current: list[str] = []
        for token in lexer:
            is_operator = bool(token) and all(ch in cls._PUNCTUATION for ch in token)
            ends_command = is_operator and (
                any(ch in cls._COMMAND_BREAKERS for ch in token)
                or set(token) == {"&"}
            )
            if ends_command:
                if current:
                    segments.append(current)
                    current = []
            else:
                current.append(token)
        if current:
            segments.append(current)
        return segments

    def _safety_check(self, command: str, policy: ShellPolicy) -> str:
        """Validate ``command`` against ``policy``.

        Returns an empty string when the command is acceptable, otherwise a
        message describing the first violation found. Every simple command
        in the line is checked against the allow/deny lists, not just the
        first word. Malformed quoting is reported as a violation rather than
        raised.
        """
        if not command or not command.strip():
            return "空命令"
        try:
            segments = self._command_segments(command)
        except ValueError as exc:
            return f"命令解析失败: {exc}"
        if not segments:
            return "空命令"

        allowed = policy.allowed_commands
        blocked = policy.blocked_commands

        # Command substitution can hide inside double quotes where the
        # tokeniser cannot see it, so an allow-list forbids it outright.
        if allowed and ("`" in command or "$(" in command):
            return "白名单策略下不允许命令替换"

        for parts in segments:
            head = parts[0]
            main_cmd = os.path.basename(head)

            # Allow-list: the bare name or the token as written must match.
            if allowed and main_cmd not in allowed and head not in allowed:
                return f"命令 '{main_cmd}' 不在允许列表中。允许的命令: {', '.join(allowed[:20])}"

            # Deny-list.
            if blocked and (main_cmd in blocked or head in blocked):
                return f"命令 '{main_cmd}' 已被阻止。被阻止的命令: {', '.join(blocked)}"

        # Dangerous patterns are matched against the raw command line.
        for pattern in policy.blocked_patterns:
            if re.search(pattern, command):
                return f"命令包含危险模式: {pattern}"

        # Pipe check (deliberately coarse: also matches quoted '|').
        if not policy.allow_pipes and "|" in command:
            return "管道操作不被允许"

        # Redirect check (deliberately coarse: also matches quoted '<' / '>').
        if not policy.allow_redirects and re.search(r"[<>]", command):
            return "重定向操作不被允许"

        return ""

    @staticmethod
    def _truncate(text: str, max_bytes: int) -> str:
        """Cut ``text`` to at most ``max_bytes`` of UTF-8 and append a notice.

        Text that already fits is returned unchanged. A multi-byte character
        split by the cut is dropped instead of being replaced, so the kept
        payload never exceeds the limit. Negative limits are treated as zero.
        """
        encoded = text.encode("utf-8")
        if len(encoded) <= max_bytes:
            return text
        limit = max(max_bytes, 0)
        truncated = encoded[:limit].decode("utf-8", errors="ignore")
        return truncated + f"\n... [截断: {len(encoded)} → {max_bytes} 字节]"