Coverage for agentos/security/sandbox_executor.py: 25%

266 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1""" 

2AgentOS v1.2.1 — 沙箱执行器。 

3 

4基因来源: OpenHands Docker Sandbox + Claude Code subprocess isolation 

5 

6提供真正的代码/命令隔离执行能力: 

7- Process模式: 子进程隔离(轻量,零依赖) 

8- Docker模式: 容器隔离(强隔离,需Docker) 

9- 资源限制:内存、CPU、时间、磁盘 

10- 文件桥接:自动复制输入文件到沙箱,提取输出文件 

11- 与 CodeAgent / ToolOrchestrator 集成 

12""" 

13 

14from __future__ import annotations 

15 

16import asyncio 

17import os 

18import shutil 

19import subprocess 

20import tempfile 

21import time 

22import uuid 

23from dataclasses import dataclass, field 

24from enum import Enum 

25from typing import Any, Dict, List, Optional 

26 

27 

28# ── 枚举与配置 ─────────────────────────────────── 

29 

class SandboxMode(str, Enum):
    """Isolation backends a sandbox can be configured to use.

    Members subclass ``str``, so each one compares equal to its raw value.
    """

    DOCKER = "docker"
    PROCESS = "process"
    # Run directly in the current process (unsafe, intended for debugging only).
    NONE = "none"

37 

38 

@dataclass
class SandboxConfig:
    """Settings that control how a sandbox executes code."""

    # Which isolation backend to use.
    mode: SandboxMode = SandboxMode.PROCESS
    # Memory ceiling in megabytes.
    memory_limit_mb: int = 256
    # Upper bound on the number of CPU cores.
    cpu_limit: float = 1.0
    # Wall-clock limit for a single execution.
    timeout_seconds: float = 30.0
    # Size cap applied to captured stdout / stderr.
    max_output_bytes: int = 1_000_000
    # Whether the sandboxed code may use the network.
    network_enabled: bool = False
    # Whether the root filesystem is writable (Docker mode).
    writable_root: bool = False
    # Image used when running in Docker mode.
    docker_image: str = "python:3.11-slim"
    # Prefix for generated container names.
    container_name_prefix: str = "agentos-sandbox-"
    # Extra environment variables for the sandboxed process.
    env_vars: Dict[str, str] = field(default_factory=dict)

52 

53 

54# ── 执行结果 ──────────────────────────────────── 

55 

@dataclass
class SandboxResult:
    """Outcome of a single sandboxed execution."""

    # True when the execution finished successfully.
    success: bool
    # Exit status of the executed process.
    exit_code: int = 0
    # Captured standard output.
    stdout: str = ""
    # Captured standard error.
    stderr: str = ""
    # Mapping of file name -> local path for collected output files.
    output_files: Dict[str, str] = field(default_factory=dict)
    # Elapsed time in milliseconds.
    duration_ms: float = 0.0
    # True when captured output was cut short.
    truncated: bool = False
    # Error description when the run could not complete normally.
    error: Optional[str] = None

67 

68 

69# ── Process 沙箱 ──────────────────────────────── 

70 

class ProcessSandbox:
    """Process-level sandbox: runs work in a child process inside a throwaway directory.

    Isolation is limited to a dedicated working directory and an adjusted
    environment. This class never reads ``memory_limit_mb`` or ``cpu_limit``
    from the config, and blanking the proxy variables does not by itself
    prevent the child process from opening network connections.
    """

    # Files written by the sandbox itself carry this prefix and are never
    # reported as outputs (same rule the Docker sandbox applies).
    _INTERNAL_PREFIX = "_sandbox"

    def __init__(self, config: SandboxConfig | None = None):
        """Store the configuration; the working directory is created lazily."""
        self.config = config or SandboxConfig()
        self._work_dir: Optional[str] = None

    def setup(self) -> str:
        """Create the working directory and return its path.

        Calling this again while the directory still exists returns the same
        path instead of orphaning it behind a fresh one.
        """
        if self._work_dir and os.path.isdir(self._work_dir):
            return self._work_dir
        self._work_dir = tempfile.mkdtemp(prefix="agentos-sandbox-")
        return self._work_dir

    def copy_in(self, src: str, dst_filename: str | None = None) -> str:
        """Copy an external file or directory into the sandbox.

        Args:
            src: Path outside the sandbox. A path that is neither a file nor a
                directory is ignored.
            dst_filename: Name (or relative path) inside the sandbox; defaults
                to the base name of ``src``.

        Returns:
            The destination path inside the sandbox.

        Raises:
            ValueError: If the destination would land outside the sandbox
                directory (for example ``../x`` or an absolute path).
        """
        if not self._work_dir:
            self.setup()
        fname = dst_filename or os.path.basename(src)
        dst = os.path.join(self._work_dir, fname)

        # Refuse destinations that resolve outside the sandbox root.
        root = os.path.realpath(self._work_dir)
        if os.path.commonpath([root, os.path.realpath(dst)]) != root:
            raise ValueError(f"Destination escapes the sandbox: {fname!r}")

        if os.path.isfile(src):
            # Nested destinations need their parent directories first.
            os.makedirs(os.path.dirname(dst), exist_ok=True)
            shutil.copy2(src, dst)
        elif os.path.isdir(src):
            shutil.copytree(src, dst, dirs_exist_ok=True)
        return dst

    def copy_out(self, sandbox_path: str, local_path: str) -> str:
        """Copy a file from the sandbox to ``local_path`` and return ``local_path``.

        Nothing is copied when ``sandbox_path`` is not a regular file.
        """
        os.makedirs(os.path.dirname(local_path) or ".", exist_ok=True)
        if os.path.isfile(sandbox_path):
            shutil.copy2(sandbox_path, local_path)
        return local_path

    def collect_output_files(self, patterns: List[str] | None = None) -> Dict[str, str]:
        """Copy files found in the sandbox into a new local temp directory.

        Args:
            patterns: File-name suffixes such as ``['.json', '.csv']``; a file
                is kept when its name ends with any of them. ``None`` (or an
                empty list) keeps every file.

        Returns:
            ``{name: local path}``. The key is the bare file name; when that
            name is already taken by a file from another directory, the path
            relative to the sandbox root is used so no file is dropped.
        """
        if not self._work_dir:
            return {}
        output_dir = tempfile.mkdtemp(prefix="agentos-output-")
        suffixes = tuple(patterns) if patterns else None
        collected: Dict[str, str] = {}
        for root, _dirs, files in os.walk(self._work_dir):
            for fname in files:
                if fname.startswith(self._INTERNAL_PREFIX):
                    continue  # the sandbox's own script, not an output
                if suffixes is not None and not fname.endswith(suffixes):
                    continue
                src = os.path.join(root, fname)
                rel = os.path.relpath(src, self._work_dir)
                dst = os.path.join(output_dir, rel)
                os.makedirs(os.path.dirname(dst), exist_ok=True)
                shutil.copy2(src, dst)
                key = rel if fname in collected else fname
                collected[key] = dst
        return collected

    def execute_code(
        self,
        code: str,
        language: str = "python",
        input_files: Dict[str, str] | None = None,
    ) -> SandboxResult:
        """Write ``code`` to a script inside the sandbox and run it.

        Args:
            code: Source text to execute.
            language: ``"python"`` or ``"bash"``.
            input_files: ``{name inside sandbox: external path}`` to copy in
                before running.

        Returns:
            The execution result; an unsupported language yields a failed
            result without touching the sandbox.

        Raises:
            ValueError: If an input file name would escape the sandbox.
        """
        if language not in ("python", "bash"):
            return SandboxResult(success=False, error=f"Unsupported language: {language}")

        if not self._work_dir:
            self.setup()

        if input_files:
            for fname, src_path in input_files.items():
                self.copy_in(src_path, fname)

        # Scripts are written as UTF-8 explicitly: Python reads source files as
        # UTF-8 by default, so the locale encoding must not be used here.
        if language == "python":
            script_path = os.path.join(self._work_dir, "_sandbox_script.py")
            with open(script_path, "w", encoding="utf-8") as f:
                f.write(code)
            cmd = [self._get_python(), script_path]
        else:
            script_path = os.path.join(self._work_dir, "_sandbox_script.sh")
            with open(script_path, "w", encoding="utf-8") as f:
                f.write("#!/bin/bash\nset -e\n" + code)
            os.chmod(script_path, 0o755)
            cmd = ["bash", script_path]

        return self._run_subprocess(cmd)

    def execute_command(self, command: str | List[str]) -> SandboxResult:
        """Run a command in the sandbox directory.

        A string is executed through ``bash -c``; a list is executed as-is.
        """
        if not self._work_dir:
            self.setup()
        cmd = ["bash", "-c", command] if isinstance(command, str) else list(command)
        return self._run_subprocess(cmd)

    def _run_subprocess(self, cmd: List[str]) -> SandboxResult:
        """Run ``cmd`` with the configured environment, timeout and output cap."""
        start = time.monotonic()
        env = os.environ.copy()
        env.update(self.config.env_vars)
        if not self.config.network_enabled:
            # Only blanks the proxy settings; this is not real network isolation.
            for name in ("http_proxy", "https_proxy", "HTTP_PROXY", "HTTPS_PROXY"):
                env[name] = ""

        limit = self.config.max_output_bytes
        try:
            proc = subprocess.run(
                cmd,
                cwd=self._work_dir,
                env=env,
                capture_output=True,
                timeout=self.config.timeout_seconds,
                text=True,
            )
        except subprocess.TimeoutExpired as e:
            duration = (time.monotonic() - start) * 1000
            # Partial output on TimeoutExpired is bytes even with text=True.
            stdout, cut_out = self._clip(
                self._to_text(e.stdout), limit, "\n... [stdout truncated]"
            )
            stderr, cut_err = self._clip(
                self._to_text(e.stderr), limit, "\n... [stderr truncated]"
            )
            return SandboxResult(
                success=False,
                exit_code=-1,
                stdout=stdout,
                stderr=stderr or "Timeout: execution exceeded limit",
                duration_ms=duration,
                truncated=cut_out or cut_err,
                error=f"Timeout after {self.config.timeout_seconds}s",
            )
        except Exception as e:
            # Launch failures (missing interpreter, bad cwd, ...) become a result.
            duration = (time.monotonic() - start) * 1000
            return SandboxResult(success=False, duration_ms=duration, error=str(e))

        # The cap is applied per stream and counts characters of decoded text.
        stdout, cut_out = self._clip(proc.stdout or "", limit, "\n... [stdout truncated]")
        stderr, cut_err = self._clip(proc.stderr or "", limit, "\n... [stderr truncated]")
        duration = (time.monotonic() - start) * 1000
        return SandboxResult(
            success=(proc.returncode == 0),
            exit_code=proc.returncode,
            stdout=stdout,
            stderr=stderr,
            duration_ms=duration,
            truncated=cut_out or cut_err,
        )

    @staticmethod
    def _to_text(data) -> str:
        """Return captured output as ``str``, decoding bytes and mapping None to ''."""
        if data is None:
            return ""
        if isinstance(data, bytes):
            return data.decode("utf-8", errors="replace")
        return data

    @staticmethod
    def _clip(text: str, limit: int, marker: str) -> tuple[str, bool]:
        """Cut ``text`` to ``limit`` characters; return the text and whether it was cut."""
        if len(text) > limit:
            return text[:limit] + marker, True
        return text, False

    @staticmethod
    def _get_python() -> str:
        """Locate a Python interpreter on PATH, falling back to the name ``python3``."""
        return shutil.which("python3") or shutil.which("python") or "python3"

    def cleanup(self):
        """Delete the working directory (errors ignored) and forget it."""
        if self._work_dir and os.path.isdir(self._work_dir):
            shutil.rmtree(self._work_dir, ignore_errors=True)
        self._work_dir = None

241 

242 

243# ── Docker 沙箱 ──────────────────────────────── 

244 

class DockerSandbox:
    """Docker-backed sandbox: runs work via ``docker exec`` in a long-lived container.

    A host temp directory is bind-mounted at ``/workspace`` in the container,
    so scripts written on the host are visible inside it and files produced
    there can be collected from the host side.
    """

    # Files written by the sandbox itself carry this prefix and are never
    # reported as outputs.
    _INTERNAL_PREFIX = "_sandbox"

    def __init__(self, config: SandboxConfig | None = None):
        """Store the configuration; the container is started by ``setup``."""
        self.config = config or SandboxConfig(mode=SandboxMode.DOCKER)
        self._container_id: Optional[str] = None
        self._host_work_dir: Optional[str] = None

    def _build_run_command(self, container_name: str, host_dir: str) -> List[str]:
        """Assemble the ``docker run`` argv for the sandbox container.

        The layout is ``docker run [OPTIONS] IMAGE [COMMAND]``: everything
        after the image name is the command run inside the container.
        """
        cfg = self.config
        cmd = [
            "docker", "run", "-d", "--rm",
            "--name", container_name,
            f"--memory={cfg.memory_limit_mb}m",
            f"--cpus={cfg.cpu_limit}",
            "-v", f"{host_dir}:/workspace",
            "-w", "/workspace",
        ]
        if not cfg.network_enabled:
            cmd.append("--network=none")
        if cfg.writable_root:
            cmd.append("--read-only=false")
        else:
            cmd.append("--read-only")
            # A read-only root still needs a writable scratch area.
            cmd.append("--tmpfs=/tmp:exec")
        # Same env_vars the process sandbox applies to its child process.
        for key, value in cfg.env_vars.items():
            cmd.extend(["-e", f"{key}={value}"])

        cmd.append(cfg.docker_image)
        cmd.extend(["sleep", "infinity"])  # keep the container alive for exec
        return cmd

    def setup(self) -> str:
        """Start the sandbox container and return the host workspace path.

        If a container is already tracked, its workspace is returned and no
        second container is started. An existing workspace directory (for
        example after a timeout killed the container) is reused.

        Raises:
            RuntimeError: If ``docker run`` exits with a non-zero status.
        """
        if self._container_id and self._host_work_dir:
            return self._host_work_dir

        reuse = bool(self._host_work_dir) and os.path.isdir(self._host_work_dir)
        host_dir = self._host_work_dir if reuse else tempfile.mkdtemp(prefix="agentos-docker-")
        container_name = f"{self.config.container_name_prefix}{uuid.uuid4().hex[:8]}"
        cmd = self._build_run_command(container_name, host_dir)

        try:
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
            if result.returncode != 0:
                raise RuntimeError(f"Docker setup failed: {result.stderr}")
        except Exception:
            # Do not leave a directory behind that this call created.
            if not reuse:
                shutil.rmtree(host_dir, ignore_errors=True)
            raise

        self._host_work_dir = host_dir
        self._container_id = result.stdout.strip()[:12]
        return host_dir

    def copy_in(self, src: str, dst_filename: str | None = None) -> str:
        """Copy ``src`` into ``/workspace`` in the container via ``docker cp``.

        Returns:
            The corresponding path under the host workspace directory.

        Raises:
            subprocess.CalledProcessError: If ``docker cp`` fails.
        """
        if not self._container_id:
            self.setup()
        fname = dst_filename or os.path.basename(src)
        subprocess.run(
            ["docker", "cp", src, f"{self._container_id}:/workspace/{fname}"],
            check=True, capture_output=True, timeout=10,
        )
        return os.path.join(self._host_work_dir, fname)

    def execute_code(
        self,
        code: str,
        language: str = "python",
        input_files: Dict[str, str] | None = None,
    ) -> SandboxResult:
        """Write ``code`` into the workspace and run it inside the container.

        Args:
            code: Source text to execute.
            language: ``"python"`` or ``"bash"``.
            input_files: ``{name in workspace: external path}`` to copy in
                before running.

        Returns:
            The execution result; an unsupported language yields a failed
            result without starting a container.
        """
        if language not in ("python", "bash"):
            return SandboxResult(success=False, error=f"Unsupported language: {language}")

        if not self._container_id:
            self.setup()

        if input_files:
            for fname, src_path in input_files.items():
                self.copy_in(src_path, fname)

        # Scripts are written as UTF-8 explicitly rather than in the host's
        # locale encoding.
        if language == "python":
            script = "_sandbox_script.py"
            script_path = os.path.join(self._host_work_dir, script)
            with open(script_path, "w", encoding="utf-8") as f:
                f.write(code)
            cmd = ["docker", "exec", self._container_id, "python3", f"/workspace/{script}"]
        else:
            script = "_sandbox_script.sh"
            script_path = os.path.join(self._host_work_dir, script)
            with open(script_path, "w", encoding="utf-8") as f:
                f.write("#!/bin/bash\nset -e\n" + code)
            os.chmod(script_path, 0o755)
            cmd = ["docker", "exec", self._container_id, "bash", f"/workspace/{script}"]

        return self._run_docker(cmd)

    def execute_command(self, command: str | List[str]) -> SandboxResult:
        """Run a command inside the container.

        A string is executed through ``bash -c``; a list is passed to
        ``docker exec`` as-is.
        """
        if not self._container_id:
            self.setup()
        if isinstance(command, str):
            cmd = ["docker", "exec", self._container_id, "bash", "-c", command]
        else:
            cmd = ["docker", "exec", self._container_id] + list(command)
        return self._run_docker(cmd)

    def _run_docker(self, cmd: List[str]) -> SandboxResult:
        """Run a ``docker exec`` argv with the configured timeout and output cap."""
        start = time.monotonic()
        limit = self.config.max_output_bytes
        try:
            proc = subprocess.run(
                cmd, capture_output=True, text=True,
                timeout=self.config.timeout_seconds,
            )
        except subprocess.TimeoutExpired:
            # Only the docker client timed out; kill the container to stop the
            # workload itself.
            if self._container_id:
                try:
                    subprocess.run(
                        ["docker", "kill", self._container_id],
                        capture_output=True, timeout=10,
                    )
                except (subprocess.SubprocessError, OSError):
                    pass  # best effort
                # The container was started with --rm, so it is gone now;
                # forgetting the id lets the next call start a fresh one.
                self._container_id = None
            return SandboxResult(
                success=False, exit_code=-1,
                error=f"Timeout after {self.config.timeout_seconds}s",
                duration_ms=(time.monotonic() - start) * 1000,
            )
        except Exception as e:
            return SandboxResult(
                success=False,
                error=str(e),
                duration_ms=(time.monotonic() - start) * 1000,
            )

        duration = (time.monotonic() - start) * 1000
        stdout = proc.stdout or ""
        stderr = proc.stderr or ""
        truncated = False
        # The cap is applied to each stream and counts characters of decoded text.
        if len(stdout) > limit:
            stdout = stdout[:limit] + "\n... [truncated]"
            truncated = True
        if len(stderr) > limit:
            stderr = stderr[:limit] + "\n... [truncated]"
            truncated = True

        return SandboxResult(
            success=(proc.returncode == 0),
            exit_code=proc.returncode,
            stdout=stdout,
            stderr=stderr,
            duration_ms=duration,
            truncated=truncated,
        )

    def collect_output_files(self, patterns: List[str] | None = None) -> Dict[str, str]:
        """Copy files from the host workspace into a new local temp directory.

        Args:
            patterns: File-name suffixes; a file is kept when its name ends
                with any of them. ``None`` (or an empty list) keeps every file.

        Returns:
            ``{name: local path}``. The key is the bare file name; when that
            name is already taken by a file from another directory, the path
            relative to the workspace root is used so no file is dropped.
        """
        if not self._host_work_dir:
            return {}
        output_dir = tempfile.mkdtemp(prefix="agentos-output-")
        suffixes = tuple(patterns) if patterns else None
        collected: Dict[str, str] = {}
        for root, _dirs, files in os.walk(self._host_work_dir):
            for fname in files:
                if fname.startswith(self._INTERNAL_PREFIX):
                    continue
                if suffixes is not None and not fname.endswith(suffixes):
                    continue
                src = os.path.join(root, fname)
                if os.path.islink(src):
                    # A link created inside the container would be resolved on
                    # the host here and could expose host files.
                    continue
                rel = os.path.relpath(src, self._host_work_dir)
                dst = os.path.join(output_dir, rel)
                # Keep the directory layout so equal names do not overwrite.
                os.makedirs(os.path.dirname(dst), exist_ok=True)
                shutil.copy2(src, dst)
                key = rel if fname in collected else fname
                collected[key] = dst
        return collected

    def cleanup(self):
        """Remove the container and the host workspace; failures are ignored."""
        container_id, self._container_id = self._container_id, None
        if container_id:
            try:
                # Force-remove instead of `docker stop`: stop waits out its
                # grace period for a process that ignores SIGTERM.
                subprocess.run(
                    ["docker", "rm", "-f", container_id],
                    capture_output=True, timeout=30,
                )
            except (subprocess.SubprocessError, OSError):
                pass  # best effort; still remove the workspace below
        if self._host_work_dir and os.path.isdir(self._host_work_dir):
            shutil.rmtree(self._host_work_dir, ignore_errors=True)
        self._host_work_dir = None

399 

400 

401# ── 统一沙箱执行器 ───────────────────────────── 

402 

class SandboxExecutor:
    """Unified sandbox executor that picks Process or Docker from ``SandboxConfig.mode``.

    The blocking sandbox calls are run in the event loop's default executor so
    the async methods do not block the loop.
    """

    def __init__(self, config: SandboxConfig | None = None):
        """Create the backing sandbox.

        In Docker mode the container is started immediately. If that fails,
        the executor falls back to the process sandbox, logs a warning, and
        sets ``self.config.mode`` to ``SandboxMode.PROCESS`` so the downgrade
        is visible to the caller.
        """
        self.config = config or SandboxConfig()
        self._sandbox: ProcessSandbox | DockerSandbox
        if self.config.mode == SandboxMode.DOCKER:
            docker_sandbox = None
            try:
                docker_sandbox = DockerSandbox(self.config)
                docker_sandbox.setup()
                self._sandbox = docker_sandbox
            except Exception as exc:
                import logging

                # Falling back weakens isolation, so it must not be silent.
                logging.getLogger(__name__).warning(
                    "Docker sandbox unavailable (%s); falling back to process sandbox",
                    exc,
                )
                if docker_sandbox is not None:
                    try:
                        # Release whatever the failed setup left behind.
                        docker_sandbox.cleanup()
                    except Exception:
                        pass  # best effort
                self.config.mode = SandboxMode.PROCESS
                self._sandbox = ProcessSandbox(self.config)
        else:
            self._sandbox = ProcessSandbox(self.config)

    def _is_ready(self) -> bool:
        """Return True when the backing sandbox has already been set up."""
        if isinstance(self._sandbox, DockerSandbox):
            return bool(self._sandbox._container_id)
        return bool(self._sandbox._work_dir)

    async def execute_code(
        self,
        code: str,
        language: str = "python",
        input_files: Dict[str, str] | None = None,
    ) -> SandboxResult:
        """Run ``code`` in the sandbox without blocking the event loop.

        Args:
            code: Source text to execute.
            language: Language name passed through to the sandbox.
            input_files: Mapping of sandbox file name to external path.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None, self._sandbox.execute_code, code, language, input_files,
        )

    async def execute_command(self, command: str | List[str]) -> SandboxResult:
        """Run a shell string or argv list in the sandbox without blocking the loop."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self._sandbox.execute_command, command)

    def collect_output_files(self, patterns: List[str] | None = None) -> Dict[str, str]:
        """Delegate to the sandbox's ``collect_output_files``."""
        return self._sandbox.collect_output_files(patterns)

    async def cleanup(self):
        """Release the sandbox's resources without blocking the event loop."""
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, self._sandbox.cleanup)

    def __enter__(self):
        """Ensure the sandbox is set up and return the executor."""
        # Docker mode is already set up by __init__; setting it up again would
        # start a second container and lose track of the first.
        if not self._is_ready():
            self._sandbox.setup()
        return self

    def __exit__(self, *args):
        """Clean up the sandbox when the ``with`` block ends."""
        self._sandbox.cleanup()