Coverage for agentos/system/shell_exec.py: 33%
156 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
1"""
2Shell 执行模块 — 带权限检查和安全沙箱的命令执行。
4分层策略:
5- SHELL_READONLY: 只允许预定义安全命令白名单
6- SHELL_STANDARD: 允许任意命令,超时+沙箱目录限制
7- SHELL_FULL: 无限制(需二次确认)
8"""
10from __future__ import annotations
12import os
13import re
14import signal
15import subprocess
16import tempfile
17import shlex
18from dataclasses import dataclass, field
19from typing import Optional
21from agentos.system.permissions import (
22 SystemPermissionManager,
23 PermissionTier,
24 PermissionDenied,
25)
28# ── Shell 策略 ─────────────────────────────────────────────────
31@dataclass
32class ShellPolicy:
33 """Shell 执行策略。"""
34 allowed_commands: list[str] = field(default_factory=list) # 命令白名单
35 blocked_commands: list[str] = field(default_factory=list) # 命令黑名单
36 blocked_patterns: list[str] = field(default_factory=list) # 参数黑名单模式
37 timeout_seconds: int = 30 # 超时时间
38 max_output_bytes: int = 1024 * 100 # 最大输出字节
39 sandbox_dir: str = "" # 沙箱工作目录
40 allow_pipes: bool = False # 是否允许管道
41 allow_redirects: bool = False # 是否允许重定向
44@dataclass
45class ShellResult:
46 """Shell 执行结果。"""
47 success: bool
48 command: str
49 stdout: str
50 stderr: str
51 exit_code: int = -1
52 duration_ms: float = 0
53 timeout: bool = False
54 permission_denied: bool = False
55 error: str = ""
58# ── 预设策略 ───────────────────────────────────────────────────
60READONLY_POLICY = ShellPolicy(
61 allowed_commands=[
62 "ls", "cat", "head", "tail", "find", "ps", "df", "du",
63 "whoami", "pwd", "env", "echo", "date", "wc", "stat",
64 "file", "which", "uname", "uptime", "id", "groups",
65 "free", "top", "grep", "awk", "sed", "sort", "uniq",
66 "cut", "tr", "tee", "xargs", "basename", "dirname",
67 "readlink", "realpath", "md5sum", "sha256sum", "diff",
68 "curl", "wget", "ping", "hostname", "ip", "ss",
69 "python3", "python", "pip", "pip3", "git", "node", "npm",
70 ],
71 timeout_seconds=30,
72 allow_pipes=True,
73 allow_redirects=False,
74)
76STANDARD_POLICY = ShellPolicy(
77 blocked_commands=[
78 "rm", "shutdown", "reboot", "halt", "poweroff",
79 "mkfs", "dd", "fdisk", "parted", "mount", "umount",
80 "chmod", "chown", "useradd", "userdel", "passwd",
81 "iptables", "ufw", "systemctl", "service",
82 ],
83 blocked_patterns=[
84 r"rm\s+(-rf?|--recursive)\s+/", # rm -rf /
85 r">\s*/dev/", # 覆盖设备
86 r"mkfs\.", # 格式化
87 r"dd\s+if=", # dd 操作
88 r"curl.*\|.*sh", # curl pipe sh
89 r"wget.*\|.*sh", # wget pipe sh
90 r">\s*/etc/", # 写入 /etc/
91 ],
92 timeout_seconds=60,
93 max_output_bytes=1024 * 500,
94 allow_pipes=True,
95 allow_redirects=True,
96)
98FULL_POLICY = ShellPolicy(
99 timeout_seconds=300,
100 max_output_bytes=1024 * 1024 * 10,
101 allow_pipes=True,
102 allow_redirects=True,
103)
106# ── Shell 沙箱 ─────────────────────────────────────────────────
109class ShellSandbox:
110 """Shell 沙箱 — 隔离命令执行环境。
112 特性:
113 - 临时工作目录隔离
114 - 环境变量过滤(移除敏感变量)
115 - 资源限制(超时、输出大小)
116 - 进程组管理(确保超时时子进程也被杀死)
117 """
119 def __init__(self, work_dir: str | None = None):
120 self._work_dir = work_dir or tempfile.mkdtemp(prefix="agentos_shell_")
122 @property
123 def work_dir(self) -> str:
124 return self._work_dir
126 def filtered_env(self) -> dict[str, str]:
127 """返回过滤后的环境变量(移除敏感变量)。"""
128 blocked = {"AWS_", "SECRET_", "TOKEN", "PASSWORD", "PASSWD",
129 "KEY", "CREDENTIAL", "PRIVATE", "CERT", "AUTH"}
130 env = {}
131 for k, v in os.environ.items():
132 if not any(b in k.upper() for b in blocked):
133 env[k] = v
134 # 设置安全默认值
135 env["HOME"] = self._work_dir
136 env["PATH"] = os.environ.get("PATH", "/usr/local/bin:/usr/bin:/bin")
137 return env
139 def cleanup(self) -> None:
140 """清理沙箱目录。"""
141 import shutil
142 try:
143 shutil.rmtree(self._work_dir, ignore_errors=True)
144 except Exception:
145 pass
148# ── Shell 执行器 ────────────────────────────────────────────────
151class ShellExecutor:
152 """Shell 执行器 — 带策略和权限检查的命令执行。"""
154 def __init__(self, perm_manager: SystemPermissionManager, session_id: str):
155 self._pm = perm_manager
156 self._sid = session_id
157 self._sandboxes: dict[str, ShellSandbox] = {}
159 # ── 沙箱管理 ──
161 def create_sandbox(self, name: str = "default") -> ShellSandbox:
162 """创建命名沙箱。"""
163 sb = ShellSandbox()
164 self._sandboxes[name] = sb
165 return sb
167 def get_sandbox(self, name: str = "default") -> ShellSandbox:
168 """获取或创建沙箱。"""
169 if name not in self._sandboxes:
170 return self.create_sandbox(name)
171 return self._sandboxes[name]
173 def cleanup_sandbox(self, name: str = "default") -> None:
174 """清理指定沙箱。"""
175 sb = self._sandboxes.pop(name, None)
176 if sb:
177 sb.cleanup()
179 def cleanup_all(self) -> None:
180 for sb in list(self._sandboxes.values()):
181 sb.cleanup()
182 self._sandboxes.clear()
184 # ── 命令执行 ──
186 def execute(self, command: str, sandbox_name: str = "default") -> ShellResult:
187 """执行 Shell 命令,自动选择策略。"""
188 # 选择策略
189 try:
190 self._pm.require(self._sid, PermissionTier.SHELL_FULL, command)
191 policy = FULL_POLICY
192 tier = PermissionTier.SHELL_FULL
193 except PermissionDenied:
194 try:
195 self._pm.require(self._sid, PermissionTier.SHELL_STANDARD, command)
196 policy = STANDARD_POLICY
197 tier = PermissionTier.SHELL_STANDARD
198 except PermissionDenied:
199 try:
200 self._pm.require(self._sid, PermissionTier.SHELL_READONLY, command)
201 policy = READONLY_POLICY
202 tier = PermissionTier.SHELL_READONLY
203 except PermissionDenied as e:
204 return ShellResult(
205 success=False, command=command,
206 stdout="", stderr="", permission_denied=True, error=str(e),
207 )
209 return self._execute_with_policy(command, policy, sandbox_name)
211 def execute_checked(self, command: str, required_tier: PermissionTier,
212 sandbox_name: str = "default") -> ShellResult:
213 """以指定权限级别执行命令。"""
214 self._pm.require(self._sid, required_tier, command)
215 if required_tier == PermissionTier.SHELL_READONLY:
216 policy = READONLY_POLICY
217 elif required_tier == PermissionTier.SHELL_STANDARD:
218 policy = STANDARD_POLICY
219 else:
220 policy = FULL_POLICY
221 return self._execute_with_policy(command, policy, sandbox_name)
223 # ── 内部实现 ──
225 def _execute_with_policy(self, command: str, policy: ShellPolicy,
226 sandbox_name: str) -> ShellResult:
227 """按策略执行命令。"""
228 import time
230 # 安全检查
231 safety_check = self._safety_check(command, policy)
232 if safety_check:
233 return ShellResult(
234 success=False, command=command,
235 stdout="", stderr=safety_check, error=safety_check,
236 )
238 sandbox = self.get_sandbox(sandbox_name)
240 try:
241 t0 = time.time()
242 proc = subprocess.Popen(
243 command,
244 shell=True,
245 stdout=subprocess.PIPE,
246 stderr=subprocess.PIPE,
247 cwd=sandbox.work_dir,
248 env=sandbox.filtered_env(),
249 preexec_fn=os.setsid, # 创建新进程组,便于超时杀子进程
250 text=True,
251 )
253 try:
254 stdout, stderr = proc.communicate(timeout=policy.timeout_seconds)
255 exit_code = proc.returncode
256 timeout = False
257 except subprocess.TimeoutExpired:
258 # 杀死整个进程组
259 os.killpg(os.getpgid(proc.pid), signal.SIGTERM)
260 try:
261 stdout, stderr = proc.communicate(timeout=5)
262 except subprocess.TimeoutExpired:
263 os.killpg(os.getpgid(proc.pid), signal.SIGKILL)
264 stdout, stderr = proc.communicate()
265 exit_code = -1
266 timeout = True
268 duration_ms = (time.time() - t0) * 1000
270 # 截断过大的输出
271 stdout = self._truncate(stdout, policy.max_output_bytes)
272 stderr = self._truncate(stderr, policy.max_output_bytes)
274 return ShellResult(
275 success=(exit_code == 0 and not timeout),
276 command=command,
277 stdout=stdout,
278 stderr=stderr,
279 exit_code=exit_code,
280 duration_ms=duration_ms,
281 timeout=timeout,
282 )
284 except Exception as e:
285 return ShellResult(
286 success=False, command=command,
287 stdout="", stderr="", error=str(e),
288 )
290 def _safety_check(self, command: str, policy: ShellPolicy) -> str:
291 """安全检查,返回错误信息或空字符串。"""
292 # 提取主命令
293 cmd_parts = shlex.split(command) if command else []
294 if not cmd_parts:
295 return "空命令"
297 main_cmd = os.path.basename(cmd_parts[0])
299 # 白名单检查
300 if policy.allowed_commands:
301 if main_cmd not in policy.allowed_commands and cmd_parts[0] not in policy.allowed_commands:
302 return f"命令 '{main_cmd}' 不在允许列表中。允许的命令: {', '.join(policy.allowed_commands[:20])}"
304 # 黑名单检查
305 if policy.blocked_commands:
306 if main_cmd in policy.blocked_commands or cmd_parts[0] in policy.blocked_commands:
307 return f"命令 '{main_cmd}' 已被阻止。被阻止的命令: {', '.join(policy.blocked_commands)}"
309 # 危险模式检查
310 for pattern in policy.blocked_patterns:
311 if re.search(pattern, command):
312 return f"命令包含危险模式: {pattern}"
314 # 管道检查
315 if not policy.allow_pipes and "|" in command:
316 return "管道操作不被允许"
318 # 重定向检查
319 if not policy.allow_redirects and re.search(r"[<>]", command):
320 return "重定向操作不被允许"
322 return ""
324 @staticmethod
325 def _truncate(text: str, max_bytes: int) -> str:
326 """截断文本到指定字节数。"""
327 encoded = text.encode("utf-8")
328 if len(encoded) <= max_bytes:
329 return text
330 truncated = encoded[:max_bytes].decode("utf-8", errors="replace")
331 return truncated + f"\n... [截断: {len(encoded)} → {max_bytes} 字节]"