Coverage for agentos/sandbox/__init__.py: 0%

258 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 16:36 +0800

1""" 

2AgentOS v1.14.6 — Self-Evolution Safe Sandbox. 

3 

4Docker-based isolated execution environment for agent self-improvement. 

5Allows agents to generate, test, and iterate on code/tools without 

6risking the host system. 

7 

8Features: 

9- Docker container isolation (network/filesystem/process) 

10- Resource limits (CPU, memory, disk, timeout) 

11- Execution result capture (stdout, stderr, exit code) 

12- Code safety validation (dangerous import blocklist) 

13- Snapshot/rollback (Docker commit-based) 

14- Rate limiting to prevent runaway loops 

15- Audit trail for all executed code 

16 

17Security layers: 

18 1. Docker namespace isolation 

19 2. Seccomp profile (restricted syscalls) 

20 3. Network disabled by default 

21 4. Read-only rootfs option 

22 5. tmpfs for writable scratch space 

23 6. Resource cgroups limits 

24 

25Inspired by: E2B, Open Interpreter sandbox, Modal 

26""" 

27 

28from __future__ import annotations 

29 

30import hashlib 

31import json 

32import logging 

33import os 

34import re 

35import subprocess 

36import tempfile 

37import time 

38import uuid 

39from dataclasses import dataclass, field 

40from enum import Enum 

41from pathlib import Path 

42from typing import ( 

43 Any, Dict, List, Optional, Tuple, 

44) 

45 

46logger = logging.getLogger(__name__) 

47 

48 

49# ── Types ─────────────────────────────────── 

50 

51 

class SandboxStatus(str, Enum):
    """Lifecycle state recorded on a sandbox execution result.

    Members are also ``str`` instances, so each one compares equal to its
    plain-string value.
    """

    CREATED = "created"      # initial state of a fresh result
    RUNNING = "running"
    COMPLETED = "completed"  # the sandboxed process ran and exited
    TIMEOUT = "timeout"      # the time limit was exceeded
    ERROR = "error"          # rejected, or failed outside the sandboxed code
    KILLED = "killed"        # the process was killed (exit code 137)

59 

60 

class Language(str, Enum):
    """Languages a code snippet can be declared as.

    Members are also ``str`` instances, so each one compares equal to its
    plain-string value.
    """

    PYTHON = "python"
    BASH = "bash"
    NODE = "node"

65 

66 

@dataclass
class SandboxConfig:
    """Settings for one sandbox: image, resource limits and isolation flags.

    ``max_memory_mb`` is a read/write alias of ``memory_mb``. It is not a
    dataclass field, so it cannot be passed to the constructor, but reading
    or assigning either name always affects the same value.
    """

    image: str = "python:3.11-slim"    # Docker image to run
    language: Language = Language.PYTHON
    timeout_s: float = 30.0            # execution timeout, in seconds
    memory_mb: int = 512               # memory limit, in MiB
    max_cpu_cores: float = 1.0         # CPU limit
    max_disk_mb: int = 100             # disk limit, in MiB
    network_enabled: bool = False      # network access (off by default)
    read_only_rootfs: bool = True      # mount the root filesystem read-only
    allow_write: bool = True           # provide writable scratch space

    # Paths
    work_dir: str = "/sandbox"         # working directory in the container
    scratch_dir: str = "/tmp/scratch"  # writable temporary space

    # Safety
    dangerous_imports: List[str] = field(default_factory=lambda: [
        "os.system", "subprocess", "shutil.rmtree", "__import__('os')",
        "eval(", "exec(", "compile(", "pty", "ctypes",
    ])

    @property
    def max_memory_mb(self) -> int:
        """Alias of ``memory_mb``, kept for backward compatibility."""
        return self.memory_mb

    @max_memory_mb.setter
    def max_memory_mb(self, value: int) -> None:
        # Writing through the alias updates the real field, so the two
        # names can never disagree.
        self.memory_mb = value

    def to_docker_args(self, container_name: str) -> List[str]:
        """Build the ``docker run`` argument prefix for this configuration.

        The returned list stops before the image name; the caller appends
        volume mounts, the image and the command to execute.

        Args:
            container_name: Value passed to ``--name``.

        Returns:
            The argument list, starting with ``"docker", "run"``.
        """
        args = [
            "docker", "run",
            "--rm",
            "--name", container_name,
            "--cpus", str(self.max_cpu_cores),
            f"--memory={self.memory_mb}m",
            # NOTE(review): Docker honours ``--storage-opt size=`` only on
            # some storage drivers (e.g. overlay2 on xfs with pquota);
            # on other setups ``docker run`` rejects the flag.
            "--storage-opt", f"size={self.max_disk_mb}m",
            "--workdir", self.work_dir,
        ]

        if not self.network_enabled:
            args.append("--network=none")

        if self.read_only_rootfs:
            args.append("--read-only")
            if self.allow_write:
                # tmpfs mounts are the only writable locations once the
                # root filesystem is read-only.
                args.extend([
                    "--tmpfs", "/tmp:exec,size=200m",
                    "--tmpfs", f"{self.scratch_dir}:exec,size=100m",
                ])

        return args

120 

121 

@dataclass
class SandboxResult:
    """Outcome of a single sandbox execution."""

    execution_id: str = field(default_factory=lambda: f"exec-{uuid.uuid4().hex[:8]}")
    status: SandboxStatus = SandboxStatus.CREATED
    stdout: str = ""
    stderr: str = ""
    exit_code: int = -1
    elapsed_s: float = 0.0
    error: str = ""
    truncated: bool = False  # whether captured output was cut short

    max_output_bytes: int = 1024 * 100  # 100KB max output

    def to_dict(self) -> dict:
        """Return a compact summary of this result.

        Only the first 500 characters of stdout and stderr are included,
        under the ``*_preview`` keys.
        """
        preview_len = 500
        summary = dict(
            execution_id=self.execution_id,
            status=self.status.value,
            stdout_preview=self.stdout[:preview_len],
            stderr_preview=self.stderr[:preview_len],
            exit_code=self.exit_code,
            elapsed_s=self.elapsed_s,
            error=self.error,
        )
        return summary

147 

148 

149# ── Code Validator ────────────────────────── 

150 

151 

class CodeValidator:
    """Regex-based blocklist check for code submitted to the sandbox.

    This is a best-effort filter, not a security boundary: aliasing an
    import (``import os as o``) is enough to slip past it.
    """

    # Dangerous Python constructs. The bare builtin names are anchored with
    # ``\b`` so that identifiers merely ending in them (``retrieval(``,
    # ``my_exec(``) are not rejected.
    PY_DANGEROUS_PATTERNS = [
        r"os\.system\s*\(",
        r"subprocess\.(call|run|Popen|check_output)\s*\(",
        r"shutil\.rmtree\s*\(",
        r"__import__\s*\(\s*['\"]os['\"]",
        r"\beval\s*\(",
        r"\bexec\s*\(",
        r"\bcompile\s*\(",
        r"importlib\.import_module\s*\(",
        r"ctypes\.",
        r"os\.remove\s*\(",
        r"os\.unlink\s*\(",
        r"os\.rmdir\s*\(",
        r"socket\.",
        r"requests\.(get|post|put|delete|patch)",
        r"urllib\.",
        r"open\s*\([^)]*['\"]w",
        r"pty\.",
        r"multiprocessing\.",
    ]

    # Dangerous shell constructs. Command names are anchored with ``\b`` so
    # that e.g. ``sync `` is not mistaken for ``nc ``. Redirects to
    # /dev/null, /dev/stdout and /dev/stderr are allowed.
    SH_DANGEROUS_PATTERNS = [
        r"\brm\s+-rf\s+/",
        r"\bmkfs\.",
        r"\bdd\s+if=",
        r">\s*/dev/(?!(?:null|stdout|stderr)\b)",
        r"\bchmod\s+777",
        r"\bwget\s+",
        r"\bcurl\s+",
        r"\bnc\s+",
        r"\btelnet\s+",
    ]

    @staticmethod
    def _scan(code: str, patterns: List[str]) -> List[str]:
        """Return one violation message per pattern that matches ``code``."""
        return [
            f"Dangerous pattern: {pattern}"
            for pattern in patterns
            if re.search(pattern, code, re.IGNORECASE)
        ]

    @classmethod
    def validate_python(cls, code: str) -> Tuple[bool, List[str]]:
        """Check Python source against ``PY_DANGEROUS_PATTERNS``.

        Returns:
            ``(is_safe, violations)``; ``violations`` is empty when safe.
        """
        violations = cls._scan(code, cls.PY_DANGEROUS_PATTERNS)
        return not violations, violations

    @classmethod
    def validate_bash(cls, code: str) -> Tuple[bool, List[str]]:
        """Check a shell script against ``SH_DANGEROUS_PATTERNS``.

        Returns:
            ``(is_safe, violations)``; ``violations`` is empty when safe.
        """
        violations = cls._scan(code, cls.SH_DANGEROUS_PATTERNS)
        return not violations, violations

    @classmethod
    def validate(cls, code: str, language: Language) -> Tuple[bool, List[str]]:
        """Validate ``code`` with the checker matching ``language``.

        Languages other than Python and Bash have no checker and are
        reported as safe.
        """
        if language == Language.PYTHON:
            return cls.validate_python(code)
        if language == Language.BASH:
            return cls.validate_bash(code)
        return True, []

215 

216 

217# ── Docker Sandbox ────────────────────────── 

218 

219 

class DockerSandbox:
    """Execute code snippets inside throw-away Docker containers.

    Usage:
        sandbox = DockerSandbox(SandboxConfig())
        result = sandbox.run("print('hello world')")
        print(result.stdout)  # "hello world"
    """

    def __init__(self, config: Optional[SandboxConfig] = None):
        """Create a sandbox; a default ``SandboxConfig`` is used if omitted."""
        self._config = config or SandboxConfig()
        self._validator = CodeValidator()
        self._execution_count: int = 0
        self._rate_limit_window: float = 60.0  # 1 minute
        self._max_executions_per_window: int = 100
        self._execution_timestamps: List[float] = []

    def run(self, code: str, language: Optional[Language] = None) -> SandboxResult:
        """Validate ``code`` and execute it in a fresh container.

        Args:
            code: Source text to execute.
            language: Language of ``code``; defaults to the configured one.

        Returns:
            A ``SandboxResult``. Failures (rate limit, validation, Docker
            unavailable, unexpected exceptions) are reported through
            ``status``/``error`` rather than raised.
        """
        lang = language or self._config.language
        result = SandboxResult()

        # Rate limiting
        if not self._check_rate_limit():
            result.status = SandboxStatus.ERROR
            result.error = "Rate limit exceeded. Max 100 executions/minute."
            return result

        # Validate code
        safe, violations = self._validator.validate(code, lang)
        if not safe:
            result.status = SandboxStatus.ERROR
            result.error = f"Code validation failed: {', '.join(violations)}"
            return result

        # Check Docker availability
        if not self._docker_available():
            result.status = SandboxStatus.ERROR
            result.error = "Docker is not available on this system"
            return result

        # Execute
        container_name = f"agentos-sandbox-{result.execution_id}"
        start = time.time()

        try:
            if lang == Language.PYTHON:
                result = self._run_python(code, container_name, result)
            elif lang == Language.BASH:
                result = self._run_bash(code, container_name, result)
            else:
                result.status = SandboxStatus.ERROR
                result.error = f"Unsupported language: {lang}"
        except Exception as e:
            # Top-level boundary: callers get a result object, never an
            # exception, so record the failure and keep the traceback in logs.
            logger.exception("Sandbox execution failed")
            result.status = SandboxStatus.ERROR
            result.error = str(e)
        finally:
            # Cleanup just in case the container outlived the client.
            self._cleanup(container_name)

        result.elapsed_s = time.time() - start
        self._execution_count += 1
        self._execution_timestamps.append(time.time())

        return result

    def run_batch(
        self,
        code_blocks: List[str],
        language: Optional[Language] = None,
    ) -> List[SandboxResult]:
        """Run each code block in order and return the results in order."""
        return [self.run(code, language) for code in code_blocks]

    def _timeout_arg(self) -> str:
        """Return the duration passed to the in-container ``timeout`` command.

        The configured value is rounded *up* to at least one whole second:
        truncating a sub-second limit to ``0`` would disable ``timeout``.
        """
        import math

        return str(max(1, math.ceil(self._config.timeout_s)))

    def _execute(
        self,
        args: List[str],
        container_name: str,
        result: SandboxResult,
    ) -> SandboxResult:
        """Run a prepared ``docker run`` command and record its outcome.

        Shared by the Python and Bash runners so both interpret output
        limits and exit codes the same way.
        """
        host_timeout = self._config.timeout_s + 5
        try:
            proc = subprocess.run(
                args,
                capture_output=True,
                timeout=host_timeout,
                text=True,
                # Sandboxed code may print arbitrary bytes; never fail the
                # whole run because output is not decodable.
                errors="replace",
            )
        except subprocess.TimeoutExpired:
            result.status = SandboxStatus.TIMEOUT
            result.error = f"Host timeout after {host_timeout}s"
            self._cleanup(container_name)
            return result

        limit = result.max_output_bytes
        result.exit_code = proc.returncode
        result.stdout = proc.stdout[:limit]
        result.stderr = proc.stderr[:limit]
        if len(proc.stdout) > limit or len(proc.stderr) > limit:
            result.truncated = True

        if proc.returncode == 124:  # timeout kill
            result.status = SandboxStatus.TIMEOUT
        elif proc.returncode == 137:  # OOM kill
            result.status = SandboxStatus.KILLED
            result.error = "Out of memory"
        elif proc.returncode == 125:
            # 125 is what `docker run` exits with when Docker itself failed,
            # i.e. the submitted code most likely never started.
            result.status = SandboxStatus.ERROR
            result.error = "Docker failed to run the container (exit code 125)"
        else:
            # Any other exit code, zero or not, means the process ran to
            # completion; callers inspect ``exit_code`` for success.
            result.status = SandboxStatus.COMPLETED

        return result

    def _run_python(self, code: str, container_name: str, result: SandboxResult) -> SandboxResult:
        """Execute Python code by mounting it read-only into the container."""
        # The execution id keeps the file name unique, so concurrent runs of
        # identical code do not share (and delete) each other's script.
        code_hash = hashlib.sha256(code.encode()).hexdigest()[:12]
        tmp_path = (
            Path(tempfile.gettempdir())
            / f"agentos_sb_{result.execution_id}_{code_hash}.py"
        )
        tmp_path.write_text(code, encoding="utf-8")

        try:
            args = self._config.to_docker_args(container_name)
            args.extend([
                "-v", f"{tmp_path}:/sandbox/script.py:ro",
                self._config.image,
                "timeout", self._timeout_arg(),
                "python", "-u", "/sandbox/script.py",
            ])
            return self._execute(args, container_name, result)
        finally:
            # Clean temp file (best effort).
            try:
                tmp_path.unlink()
            except OSError:
                logger.debug("Could not remove temp script %s", tmp_path)

    def _run_bash(self, code: str, container_name: str, result: SandboxResult) -> SandboxResult:
        """Execute a Bash script via ``bash -c`` inside the container."""
        args = self._config.to_docker_args(container_name)
        args.extend([
            self._config.image,
            "timeout", self._timeout_arg(),
            "bash", "-c", code,
        ])
        return self._execute(args, container_name, result)

    def _docker_available(self) -> bool:
        """Return True only if ``docker info`` runs and exits successfully."""
        try:
            probe = subprocess.run(
                ["docker", "info"],
                capture_output=True,
                timeout=5,
            )
        except (OSError, subprocess.SubprocessError):
            logger.warning("Docker is not available")
            return False

        # A non-zero exit means the CLI exists but the daemon is unreachable.
        if probe.returncode != 0:
            logger.warning("Docker is not available")
            return False
        return True

    def _cleanup(self, container_name: str) -> None:
        """Force-remove the container; failures are ignored (best effort)."""
        try:
            subprocess.run(
                ["docker", "rm", "-f", container_name],
                capture_output=True,
                timeout=5,
            )
        except (OSError, subprocess.SubprocessError):
            logger.debug("Cleanup of container %s failed", container_name)

    def _check_rate_limit(self) -> bool:
        """Drop expired timestamps and report whether another run is allowed."""
        now = time.time()
        cutoff = now - self._rate_limit_window
        self._execution_timestamps = [
            t for t in self._execution_timestamps if t > cutoff
        ]
        return len(self._execution_timestamps) < self._max_executions_per_window

406 

407 

408# ── Evolution Runner ──────────────────────── 

409 

410 

@dataclass
class EvolutionStep:
    """Record of one iteration of the self-evolution loop."""

    step_id: str = field(default_factory=lambda: f"ev-{uuid.uuid4().hex[:8]}")
    iteration: int = 0
    prompt: str = ""  # prompt that guides the agent's code generation
    generated_code: str = ""
    test_code: str = ""
    result: Optional[SandboxResult] = None       # run of the generated code
    test_result: Optional[SandboxResult] = None  # run of the test script
    score: float = 0.0
    accepted: bool = False
    error: str = ""

425 

426 

class SelfEvolutionRunner:
    """Iteratively generate, run and test code inside the sandbox.

    Every iteration is scored from its test output and the best-scoring
    step is kept.

    Usage:
        runner = SelfEvolutionRunner(sandbox)
        result = runner.evolve(
            prompt="Write a function that sorts a list with quicksort",
            test_cases=["assert quicksort([3,1,2]) == [1,2,3]"],
            max_iterations=5,
        )
    """

    def __init__(self, sandbox: Optional[DockerSandbox] = None):
        """Create a runner; a default ``DockerSandbox`` is used if omitted."""
        self._sandbox = sandbox or DockerSandbox()
        self._evolution_history: List[EvolutionStep] = []
        self._best_step: Optional[EvolutionStep] = None

    def evolve(
        self,
        prompt: str,
        test_cases: List[str],
        max_iterations: int = 5,
        target_score: float = 1.0,
    ) -> EvolutionStep:
        """Run the generate / execute / test / score loop.

        Args:
            prompt: Natural-language description of the desired behaviour.
            test_cases: Test cases, written as Python ``assert`` statements.
            max_iterations: Maximum number of iterations.
            target_score: Score at which to stop early (1.0 = all pass).

        Returns:
            The best-scoring step. If no step scored above zero, the last
            step attempted. If no iteration ran at all, an empty step whose
            ``error`` says so.
        """
        self._evolution_history = []
        self._best_step = None
        best_score = 0.0

        for i in range(max_iterations):
            step = EvolutionStep(
                iteration=i,
                prompt=prompt,
            )

            # Step 1: Generate code (in real use, Agent generates via LLM)
            # Here we provide a scaffold; the Agent fills in the implementation
            step.generated_code = self._generate_code_scaffold(prompt, i)

            # Step 2: Run generated code in sandbox
            step.result = self._sandbox.run(step.generated_code)

            if step.result.status != SandboxStatus.COMPLETED:
                step.error = f"Code execution failed: {step.result.stderr[:200]}"
                self._evolution_history.append(step)
                continue

            # Step 3: Run tests
            test_code = self._build_test_code(step.generated_code, test_cases)
            step.test_code = test_code
            step.test_result = self._sandbox.run(test_code)

            # Step 4: Score
            step.score = self._score(step, test_cases)
            step.accepted = step.score > best_score

            if step.accepted:
                best_score = step.score
                self._best_step = step

            self._evolution_history.append(step)

            # Early exit
            if step.score >= target_score:
                break

        if self._best_step is not None:
            return self._best_step
        if self._evolution_history:
            return self._evolution_history[-1]
        # max_iterations <= 0: nothing ran, so there is no step to return.
        return EvolutionStep(
            prompt=prompt,
            error="No iterations were run (max_iterations <= 0)",
        )

    def _generate_code_scaffold(self, prompt: str, iteration: int) -> str:
        """Return placeholder code (really the job of the Agent + LLM)."""
        # Comment out every line of the prompt so a multi-line prompt cannot
        # spill out of the header comment and become executable code.
        prompt_comment = "\n# ".join(prompt.splitlines())
        return f"""# Iteration {iteration}
# Prompt: {prompt_comment}

def quicksort(arr):
    if len(arr) <= 1:
        return arr
    pivot = arr[len(arr) // 2]
    left = [x for x in arr if x < pivot]
    middle = [x for x in arr if x == pivot]
    right = [x for x in arr if x > pivot]
    return quicksort(left) + middle + quicksort(right)

# Test the function
print(quicksort([3, 6, 8, 10, 1, 2, 1]))
"""

    def _build_test_code(self, code: str, test_cases: List[str]) -> str:
        """Append a self-reporting test harness for ``test_cases`` to ``code``.

        The resulting script prints one ``PASS: ...`` or ``FAIL: ...`` line
        per test case, followed by an ``N/M tests passed`` summary.
        """
        import textwrap

        parts = [code, "\n\n# Auto-generated tests\n", "test_results = []\n"]
        for tc in test_cases:
            # repr() yields a valid literal even when the test text contains
            # quotes, backslashes or newlines.
            label = repr(tc[:50])
            # Normalise indentation so multi-line cases sit inside the `try`.
            body = textwrap.indent(textwrap.dedent(tc).strip(), "    ") or "    pass"
            parts.append(
                "try:\n"
                f"{body}\n"
                f"    test_results.append(('PASS', {label}))\n"
                "except AssertionError as e:\n"
                f"    test_results.append(('FAIL', {label} + ': ' + str(e)))\n"
                # Any other exception fails this one case instead of
                # aborting the script and losing every other result.
                "except Exception as e:\n"
                f"    test_results.append(('FAIL', {label} + ': ' + repr(e)))\n"
            )
        parts.append("\nfor status, msg in test_results:\n    print(f'{status}: {msg}')\n")
        parts.append(
            "\nprint(f'\\n{sum(1 for s,_ in test_results if s==\"PASS\")}"
            f"/{len(test_cases)} tests passed')"
        )
        return "".join(parts)

    def _score(self, step: EvolutionStep, test_cases: List[str]) -> float:
        """Return the fraction of test cases reported as passed (0.0 to 1.0)."""
        if not step.test_result or step.test_result.status != SandboxStatus.COMPLETED:
            return 0.0

        total = len(test_cases)
        if total == 0:
            return 1.0

        # Count only harness lines, not stray "PASS:" substrings, and clamp
        # so extra output from the generated code cannot push the score > 1.
        passed = sum(
            1
            for line in step.test_result.stdout.splitlines()
            if line.startswith("PASS: ")
        )
        return min(passed, total) / total

    @property
    def history(self) -> List[EvolutionStep]:
        """A copy of the steps recorded by the most recent ``evolve`` call."""
        return list(self._evolution_history)

552 

553 

554# ── Quick Start ───────────────────────────── 

555 

556 

def create_sandbox(
    image: str = "python:3.11-slim",
    timeout_s: float = 30.0,
    network: bool = False,
) -> DockerSandbox:
    """Build a ``DockerSandbox`` from the most common settings.

    Args:
        image: Docker image to run code in.
        timeout_s: Execution timeout, in seconds.
        network: Whether the container gets network access.

    Returns:
        A sandbox using a ``SandboxConfig`` with these values and defaults
        for everything else.
    """
    settings = {
        "image": image,
        "timeout_s": timeout_s,
        "network_enabled": network,
    }
    return DockerSandbox(SandboxConfig(**settings))