# Coverage for agentos/security/sandbox_executor.py: 25% (266 statements)
# Report generated by coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
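"""
AgentOS v1.2.1 — sandbox executor.

Design lineage: OpenHands Docker Sandbox + Claude Code subprocess isolation.

Provides isolated execution of code and commands:
- Process mode: subprocess isolation (lightweight, zero dependencies)
- Docker mode: container isolation (strong isolation, requires Docker)
- Resource limits: memory, CPU, time, disk
- File bridging: copy input files into the sandbox and extract output files
- Integration with CodeAgent / ToolOrchestrator
"""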
from __future__ import annotations

import asyncio
import fnmatch
import logging
import os
import shutil
import subprocess
import tempfile
import time
import uuid
from dataclasses import dataclass, field, replace
from enum import Enum
from typing import Any, Dict, List, Optional
# ── Enums and configuration ──────────────────────
class SandboxMode(str, Enum):
    """Isolation backends a sandbox can be configured to use.

    Members also subclass ``str``, so each one compares equal to its plain
    string value (for example ``SandboxMode.DOCKER == "docker"``).
    """

    # Run inside a Docker container.
    DOCKER = "docker"
    # Run in a child process of the host.
    PROCESS = "process"
    # Documented as direct execution in the current process: unsafe and
    # intended for debugging only.
    NONE = "none"
@dataclass
class SandboxConfig:
    """Settings consumed by the sandbox backends."""

    # Isolation backend to use.
    mode: SandboxMode = SandboxMode.PROCESS
    # Memory ceiling, in megabytes.
    memory_limit_mb: int = 256
    # Upper bound on the number of CPU cores.
    cpu_limit: float = 1.0
    # Wall-clock limit for a single execution, in seconds.
    timeout_seconds: float = 30.0
    # Cap on captured stdout/stderr before truncation.
    max_output_bytes: int = 1_000_000
    # Whether the sandboxed code may use the network.
    network_enabled: bool = False
    # Whether the container root filesystem is writable (Docker mode).
    writable_root: bool = False
    # Image used for Docker-mode containers.
    docker_image: str = "python:3.11-slim"
    # Prefix for generated container names.
    container_name_prefix: str = "agentos-sandbox-"
    # Extra environment variables for the sandboxed process.
    env_vars: Dict[str, str] = field(default_factory=dict)
# ── Execution result ─────────────────────────────
@dataclass
class SandboxResult:
    """Outcome of one sandboxed execution."""

    # True when the run finished with exit code 0.
    success: bool
    # Exit status of the executed process.
    exit_code: int = 0
    # Captured standard output.
    stdout: str = ""
    # Captured standard error.
    stderr: str = ""
    # Mapping of file name -> local path for collected output files.
    output_files: Dict[str, str] = field(default_factory=dict)
    # Elapsed wall-clock time in milliseconds.
    duration_ms: float = 0.0
    # True when captured output was cut to the configured size limit.
    truncated: bool = False
    # Human-readable failure description, if any.
    error: Optional[str] = None
# ── Process sandbox ──────────────────────────────
class ProcessSandbox:
    """Process-level sandbox: run code in a subprocess inside a private temp dir.

    The child process gets a dedicated working directory and a copy of the
    current environment. This class does not apply ``memory_limit_mb`` or
    ``cpu_limit``; only ``timeout_seconds`` and ``max_output_bytes`` are
    enforced here.
    """

    # Scripts written by ``execute_code`` start with this prefix; they are
    # implementation details and are never reported as output files.
    _INTERNAL_PREFIX = "_sandbox"

    def __init__(self, config: SandboxConfig | None = None):
        self.config = config or SandboxConfig()
        self._work_dir: Optional[str] = None

    def setup(self) -> str:
        """Create the working directory (once) and return its path.

        Calling ``setup`` again while the directory still exists returns the
        same path instead of creating, and orphaning, another temp dir.
        """
        if self._work_dir and os.path.isdir(self._work_dir):
            return self._work_dir
        self._work_dir = tempfile.mkdtemp(prefix="agentos-sandbox-")
        return self._work_dir

    def copy_in(self, src: str, dst_filename: str | None = None) -> str:
        """Copy a host file or directory into the sandbox.

        Args:
            src: Path on the host.
            dst_filename: Name to use inside the sandbox; defaults to the
                base name of ``src``.

        Returns:
            The destination path inside the sandbox. Nothing is copied when
            ``src`` is neither a file nor a directory.
        """
        if not self._work_dir:
            self.setup()
        fname = dst_filename or os.path.basename(src)
        dst = os.path.join(self._work_dir, fname)
        if os.path.isfile(src):
            shutil.copy2(src, dst)
        elif os.path.isdir(src):
            shutil.copytree(src, dst, dirs_exist_ok=True)
        return dst

    def copy_out(self, sandbox_path: str, local_path: str) -> str:
        """Copy a regular file from the sandbox to ``local_path``.

        Parent directories of ``local_path`` are created as needed. Returns
        ``local_path`` even when ``sandbox_path`` is not a regular file (in
        which case nothing is copied).
        """
        os.makedirs(os.path.dirname(local_path) or ".", exist_ok=True)
        if os.path.isfile(sandbox_path):
            shutil.copy2(sandbox_path, local_path)
        return local_path

    @staticmethod
    def _matches(fname: str, pattern: str) -> bool:
        """Return True if ``fname`` equals, ends with, or globs ``pattern``."""
        return (
            fname == pattern
            or fname.endswith(pattern)
            or fnmatch.fnmatchcase(fname, pattern)
        )

    def collect_output_files(self, patterns: List[str] | None = None) -> Dict[str, str]:
        """Copy files produced in the sandbox to a fresh local temp directory.

        Args:
            patterns: File suffixes, exact names or glob patterns, e.g.
                ``['.json', 'report.txt', '*.png']``. ``None`` or an empty
                list collects every file.

        Returns:
            ``{file name: local copy path}``. The directory layout is kept in
            the copy, but keys are bare file names, so files with the same
            name in different sub-directories share one key (last one wins).
        """
        if not self._work_dir:
            return {}
        output_dir = tempfile.mkdtemp(prefix="agentos-output-")
        collected: Dict[str, str] = {}
        for root, _dirs, files in os.walk(self._work_dir):
            for fname in files:
                # Skip the scripts this class wrote itself.
                if fname.startswith(self._INTERNAL_PREFIX):
                    continue
                if patterns and not any(self._matches(fname, p) for p in patterns):
                    continue
                src = os.path.join(root, fname)
                rel = os.path.relpath(src, self._work_dir)
                dst = os.path.join(output_dir, rel)
                os.makedirs(os.path.dirname(dst), exist_ok=True)
                shutil.copy2(src, dst)
                collected[fname] = dst
        return collected

    def execute_code(
        self,
        code: str,
        language: str = "python",
        input_files: Dict[str, str] | None = None,
    ) -> SandboxResult:
        """Write ``code`` to a script in the sandbox and run it.

        Args:
            code: Source text to execute.
            language: ``"python"`` or ``"bash"``.
            input_files: ``{name inside sandbox: host path}`` to copy in first.

        Returns:
            The execution result; an unsupported ``language`` yields a failed
            result instead of raising.
        """
        if not self._work_dir:
            self.setup()

        if input_files:
            for fname, src_path in input_files.items():
                self.copy_in(src_path, fname)

        if language == "python":
            script_path = os.path.join(self._work_dir, "_sandbox_script.py")
            # Explicit UTF-8 (Python's default source encoding) so non-ASCII
            # code does not depend on the host locale.
            with open(script_path, "w", encoding="utf-8") as f:
                f.write(code)
            cmd = [self._get_python(), script_path]
        elif language == "bash":
            script_path = os.path.join(self._work_dir, "_sandbox_script.sh")
            with open(script_path, "w", encoding="utf-8") as f:
                f.write("#!/bin/bash\nset -e\n" + code)
            os.chmod(script_path, 0o755)
            cmd = ["bash", script_path]
        else:
            return SandboxResult(success=False, error=f"Unsupported language: {language}")

        return self._run_subprocess(cmd)

    def execute_command(self, command: str | List[str]) -> SandboxResult:
        """Run a command in the sandbox directory.

        A string is executed through ``bash -c``; a list is used as the argv.
        """
        if not self._work_dir:
            self.setup()
        if isinstance(command, str):
            cmd = ["bash", "-c", command]
        else:
            cmd = list(command)
        return self._run_subprocess(cmd)

    @staticmethod
    def _as_text(data) -> str:
        """Normalize captured output (``None``, bytes or str) to ``str``."""
        if data is None:
            return ""
        if isinstance(data, bytes):
            return data.decode(errors="replace")
        return data

    def _clip(self, text: str, label: str) -> tuple[str, bool]:
        """Cut ``text`` to ``max_output_bytes`` characters; report if it was cut."""
        limit = self.config.max_output_bytes
        if len(text) > limit:
            return text[:limit] + f"\n... [{label} truncated]", True
        return text, False

    def _run_subprocess(self, cmd: List[str]) -> SandboxResult:
        """Run ``cmd`` in the working directory and package the outcome.

        Never raises: timeouts and other failures are reported through the
        returned ``SandboxResult``.
        """
        start = time.monotonic()
        env = os.environ.copy()
        env.update(self.config.env_vars)
        # NOTE: this only blanks the proxy variables; it does not actually
        # prevent the child process from opening network connections.
        if not self.config.network_enabled:
            for name in ("http_proxy", "https_proxy", "HTTP_PROXY", "HTTPS_PROXY"):
                env[name] = ""

        try:
            proc = subprocess.run(
                cmd,
                cwd=self._work_dir,
                env=env,
                capture_output=True,
                timeout=self.config.timeout_seconds,
                text=True,
                # Undecodable bytes become U+FFFD instead of failing the run.
                errors="replace",
            )
        except subprocess.TimeoutExpired as exc:
            # The exception carries bytes even with text=True, so normalize.
            stdout, cut_out = self._clip(self._as_text(exc.stdout), "stdout")
            stderr, cut_err = self._clip(
                self._as_text(exc.stderr) or "Timeout: execution exceeded limit",
                "stderr",
            )
            return SandboxResult(
                success=False,
                exit_code=-1,
                stdout=stdout,
                stderr=stderr,
                duration_ms=(time.monotonic() - start) * 1000,
                truncated=cut_out or cut_err,
                error=f"Timeout after {self.config.timeout_seconds}s",
            )
        except Exception as exc:
            return SandboxResult(
                success=False,
                duration_ms=(time.monotonic() - start) * 1000,
                error=str(exc),
            )

        stdout, cut_out = self._clip(proc.stdout or "", "stdout")
        stderr, cut_err = self._clip(proc.stderr or "", "stderr")
        return SandboxResult(
            success=(proc.returncode == 0),
            exit_code=proc.returncode,
            stdout=stdout,
            stderr=stderr,
            duration_ms=(time.monotonic() - start) * 1000,
            truncated=cut_out or cut_err,
        )

    @staticmethod
    def _get_python() -> str:
        """Return the interpreter to use: ``python3``, then ``python`` on PATH."""
        return shutil.which("python3") or shutil.which("python") or "python3"

    def cleanup(self):
        """Delete the working directory and forget it."""
        if self._work_dir and os.path.isdir(self._work_dir):
            shutil.rmtree(self._work_dir, ignore_errors=True)
        self._work_dir = None
# ── Docker sandbox ───────────────────────────────
class DockerSandbox:
    """Docker-container sandbox.

    A long-running container is started with a host temp directory bind-mounted
    at ``/workspace``; code and commands are then run with ``docker exec``.
    """

    def __init__(self, config: SandboxConfig | None = None):
        self.config = config or SandboxConfig(mode=SandboxMode.DOCKER)
        self._container_id: Optional[str] = None
        self._host_work_dir: Optional[str] = None

    def _build_run_command(self, container_name: str, host_dir: str) -> List[str]:
        """Build the ``docker run`` argv for the sandbox container.

        Options come first, then the image, then the container command, which
        is the order ``docker run [OPTIONS] IMAGE [COMMAND]`` requires.
        """
        cfg = self.config
        cmd = [
            "docker", "run", "-d", "--rm",
            "--name", container_name,
            f"--memory={cfg.memory_limit_mb}m",
            f"--cpus={cfg.cpu_limit}",
            "-v", f"{host_dir}:/workspace",
            "-w", "/workspace",
        ]
        if not cfg.network_enabled:
            cmd.append("--network=none")
        if cfg.writable_root:
            cmd.append("--read-only=false")
        else:
            cmd.append("--read-only")
            # A read-only root still needs a writable, executable /tmp.
            cmd.append("--tmpfs=/tmp:exec")
        for key, value in cfg.env_vars.items():
            cmd.extend(["-e", f"{key}={value}"])
        cmd.append(cfg.docker_image)
        cmd.extend(["sleep", "infinity"])
        return cmd

    def setup(self) -> str:
        """Start the sandbox container and return the host work directory.

        Idempotent while a container is running. An existing host directory
        is reused, so files survive a container restart after a timeout.

        Raises:
            RuntimeError: ``docker run`` exited with a non-zero status.
            Exception: whatever ``subprocess.run`` raises (e.g. Docker is not
                installed or the start timed out) is propagated.
        """
        if self._container_id and self._host_work_dir:
            return self._host_work_dir

        created_dir = not (self._host_work_dir and os.path.isdir(self._host_work_dir))
        if created_dir:
            self._host_work_dir = tempfile.mkdtemp(prefix="agentos-docker-")
        container_name = f"{self.config.container_name_prefix}{uuid.uuid4().hex[:8]}"
        cmd = self._build_run_command(container_name, self._host_work_dir)

        try:
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
            if result.returncode != 0:
                raise RuntimeError(f"Docker setup failed: {result.stderr}")
        except Exception:
            # Do not leak the temp directory created for a failed start.
            if created_dir:
                shutil.rmtree(self._host_work_dir, ignore_errors=True)
                self._host_work_dir = None
            raise

        self._container_id = result.stdout.strip()[:12]
        return self._host_work_dir

    def copy_in(self, src: str, dst_filename: str | None = None) -> str:
        """Copy ``src`` into ``/workspace`` of the container via ``docker cp``.

        Returns the corresponding path under the host work directory.
        Raises ``subprocess.CalledProcessError`` when ``docker cp`` fails.
        """
        if not self._container_id:
            self.setup()
        fname = dst_filename or os.path.basename(src)
        subprocess.run(
            ["docker", "cp", src, f"{self._container_id}:/workspace/{fname}"],
            check=True, capture_output=True, timeout=10,
        )
        return os.path.join(self._host_work_dir, fname)

    def execute_code(
        self,
        code: str,
        language: str = "python",
        input_files: Dict[str, str] | None = None,
    ) -> SandboxResult:
        """Write ``code`` to a script in the work directory and run it.

        Args:
            code: Source text to execute.
            language: ``"python"`` or ``"bash"``.
            input_files: ``{name inside sandbox: host path}`` to copy in first.

        Returns:
            The execution result; an unsupported ``language`` yields a failed
            result instead of raising.
        """
        if not self._container_id:
            self.setup()

        if input_files:
            for fname, src_path in input_files.items():
                self.copy_in(src_path, fname)

        if language == "python":
            script = "_sandbox_script.py"
            script_path = os.path.join(self._host_work_dir, script)
            with open(script_path, "w", encoding="utf-8") as f:
                f.write(code)
            cmd = ["docker", "exec", self._container_id, "python3", f"/workspace/{script}"]
        elif language == "bash":
            script = "_sandbox_script.sh"
            script_path = os.path.join(self._host_work_dir, script)
            with open(script_path, "w", encoding="utf-8") as f:
                f.write("#!/bin/bash\nset -e\n" + code)
            os.chmod(script_path, 0o755)
            cmd = ["docker", "exec", self._container_id, "bash", f"/workspace/{script}"]
        else:
            return SandboxResult(success=False, error=f"Unsupported language: {language}")

        return self._run_docker(cmd)

    def execute_command(self, command: str | List[str]) -> SandboxResult:
        """Run a command in the container.

        A string is executed through ``bash -c``; a list is used as the argv.
        """
        if not self._container_id:
            self.setup()
        if isinstance(command, str):
            cmd = ["docker", "exec", self._container_id, "bash", "-c", command]
        else:
            cmd = ["docker", "exec", self._container_id] + list(command)
        return self._run_docker(cmd)

    def _clip(self, text: str) -> tuple[str, bool]:
        """Cut ``text`` to ``max_output_bytes`` characters; report if it was cut."""
        limit = self.config.max_output_bytes
        if len(text) > limit:
            return text[:limit] + "\n... [truncated]", True
        return text, False

    def _kill_container(self) -> None:
        """Best-effort kill of the container; always forgets its id.

        ``docker kill`` is used rather than ``docker stop`` because the
        container's main process is ``sleep``, which typically does not react
        to SIGTERM as PID 1, so a stop would wait out the full grace period.
        The container was started with ``--rm``, so killing it removes it.
        """
        container_id, self._container_id = self._container_id, None
        if not container_id:
            return
        try:
            subprocess.run(
                ["docker", "kill", container_id], capture_output=True, timeout=30,
            )
        except (subprocess.SubprocessError, OSError):
            # Deliberately ignored: nothing more can be done from here.
            pass

    def _run_docker(self, cmd: List[str]) -> SandboxResult:
        """Run a ``docker exec`` argv and package the outcome.

        Never raises. On timeout the container is killed (the process inside
        it would otherwise keep running) and a later call starts a new one.
        """
        start = time.monotonic()
        try:
            proc = subprocess.run(
                cmd, capture_output=True, text=True, errors="replace",
                timeout=self.config.timeout_seconds,
            )
        except subprocess.TimeoutExpired:
            self._kill_container()
            return SandboxResult(
                success=False, exit_code=-1,
                error=f"Timeout after {self.config.timeout_seconds}s",
                duration_ms=(time.monotonic() - start) * 1000,
            )
        except Exception as exc:
            return SandboxResult(
                success=False,
                error=str(exc),
                duration_ms=(time.monotonic() - start) * 1000,
            )

        duration = (time.monotonic() - start) * 1000
        stdout, cut_out = self._clip(proc.stdout or "")
        stderr, cut_err = self._clip(proc.stderr or "")
        return SandboxResult(
            success=(proc.returncode == 0),
            exit_code=proc.returncode,
            stdout=stdout,
            stderr=stderr,
            duration_ms=duration,
            truncated=cut_out or cut_err,
        )

    def collect_output_files(self, patterns: List[str] | None = None) -> Dict[str, str]:
        """Copy files from the work directory to a fresh local temp directory.

        Args:
            patterns: File suffixes, exact names or glob patterns. ``None``
                or an empty list collects every file.

        Returns:
            ``{file name: local copy path}``. Files whose names start with
            ``_sandbox`` are skipped. The directory layout is kept in the
            copy; keys are bare file names, so same-named files in different
            sub-directories share one key (last one wins).
        """
        if not self._host_work_dir:
            return {}
        output_dir = tempfile.mkdtemp(prefix="agentos-output-")
        collected: Dict[str, str] = {}
        for root, _dirs, files in os.walk(self._host_work_dir):
            for fname in files:
                if fname.startswith("_sandbox"):
                    continue
                if patterns and not any(
                    fname == p or fname.endswith(p) or fnmatch.fnmatchcase(fname, p)
                    for p in patterns
                ):
                    continue
                src = os.path.join(root, fname)
                rel = os.path.relpath(src, self._host_work_dir)
                dst = os.path.join(output_dir, rel)
                os.makedirs(os.path.dirname(dst), exist_ok=True)
                shutil.copy2(src, dst)
                collected[fname] = dst
        return collected

    def cleanup(self):
        """Stop the container and delete the host work directory.

        The directory is removed even if stopping the container fails.
        """
        try:
            self._kill_container()
        finally:
            if self._host_work_dir and os.path.isdir(self._host_work_dir):
                shutil.rmtree(self._host_work_dir, ignore_errors=True)
            self._host_work_dir = None
# ── Unified sandbox executor ─────────────────────
class SandboxExecutor:
    """Facade that picks a sandbox backend from ``SandboxConfig.mode``.

    ``SandboxMode.DOCKER`` selects ``DockerSandbox``, falling back to
    ``ProcessSandbox`` when the container cannot be started. Every other mode
    (including ``SandboxMode.NONE``) uses ``ProcessSandbox``.
    """

    def __init__(self, config: SandboxConfig | None = None):
        self.config = config or SandboxConfig()
        if self.config.mode == SandboxMode.DOCKER:
            try:
                docker_sandbox = DockerSandbox(self.config)
                docker_sandbox.setup()
            except Exception as exc:
                # Docker is unavailable: degrade to process isolation. This
                # is a weaker guarantee, so make the downgrade visible.
                logging.getLogger(__name__).warning(
                    "Docker sandbox unavailable (%s); falling back to process mode",
                    exc,
                )
                # Work on a copy so the caller's config object is not mutated.
                self.config = replace(self.config, mode=SandboxMode.PROCESS)
                self._sandbox: ProcessSandbox | DockerSandbox = ProcessSandbox(self.config)
            else:
                self._sandbox = docker_sandbox
        else:
            self._sandbox = ProcessSandbox(self.config)

    async def execute_code(
        self,
        code: str,
        language: str = "python",
        input_files: Dict[str, str] | None = None,
    ) -> SandboxResult:
        """Run ``code`` in the sandbox on the default executor.

        The blocking backend call runs in a worker thread so the event loop
        stays responsive.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None, self._sandbox.execute_code, code, language, input_files,
        )

    async def execute_command(self, command: str | List[str]) -> SandboxResult:
        """Run ``command`` in the sandbox on the default executor."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self._sandbox.execute_command, command)

    def collect_output_files(self, patterns: List[str] | None = None) -> Dict[str, str]:
        """Delegate to the backend's ``collect_output_files``."""
        return self._sandbox.collect_output_files(patterns)

    async def cleanup(self):
        """Release the backend's resources on the default executor."""
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, self._sandbox.cleanup)

    def _is_ready(self) -> bool:
        """Return True when the backend already holds a container or work dir."""
        backend = self._sandbox
        return bool(
            getattr(backend, "_container_id", None)
            or getattr(backend, "_work_dir", None)
        )

    def __enter__(self):
        # ``__init__`` may already have started the backend (Docker mode);
        # setting it up again would leak the first container / temp dir.
        if not self._is_ready():
            self._sandbox.setup()
        return self

    def __exit__(self, *args):
        self._sandbox.cleanup()