Coverage for agentos/sandbox/__init__.py: 0%
258 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 16:36 +0800
1"""
2AgentOS v1.14.6 — Self-Evolution Safe Sandbox.
4Docker-based isolated execution environment for agent self-improvement.
5Allows agents to generate, test, and iterate on code/tools without
6risking the host system.
8Features:
9- Docker container isolation (network/filesystem/process)
10- Resource limits (CPU, memory, disk, timeout)
11- Execution result capture (stdout, stderr, exit code)
12- Code safety validation (dangerous import blocklist)
13- Snapshot/rollback (Docker commit-based)
14- Rate limiting to prevent runaway loops
15- Audit trail for all executed code
17Security layers:
18 1. Docker namespace isolation
19 2. Seccomp profile (restricted syscalls)
20 3. Network disabled by default
21 4. Read-only rootfs option
22 5. tmpfs for writable scratch space
23 6. Resource cgroups limits
25Inspired by: E2B, Open Interpreter sandbox, Modal
26"""
28from __future__ import annotations
30import hashlib
31import json
32import logging
33import os
34import re
35import subprocess
36import tempfile
37import time
38import uuid
39from dataclasses import dataclass, field
40from enum import Enum
41from pathlib import Path
42from typing import (
43 Any, Dict, List, Optional, Tuple,
44)
46logger = logging.getLogger(__name__)
49# ── Types ───────────────────────────────────
class SandboxStatus(str, Enum):
    """Lifecycle states recorded on a sandbox execution result.

    The ``str`` mixin makes every member compare equal to its plain
    string value, so a status can be serialized without conversion.
    """

    # Initial state of a freshly constructed result.
    CREATED = "created"
    RUNNING = "running"
    # The process returned; the exit code is reported separately.
    COMPLETED = "completed"
    TIMEOUT = "timeout"
    ERROR = "error"
    KILLED = "killed"
class Language(str, Enum):
    """Source languages a sandbox request can name.

    Members are ``str`` subclasses, so they compare equal to their
    lowercase string values.
    """

    PYTHON = "python"
    BASH = "bash"
    NODE = "node"
@dataclass
class SandboxConfig:
    """Settings that describe how a sandbox container is launched."""

    image: str = "python:3.11-slim"         # Docker image to run
    language: Language = Language.PYTHON    # default language for submitted code
    timeout_s: float = 30.0                 # execution timeout, in seconds
    memory_mb: int = 512                    # memory limit (kept for test compatibility)
    max_cpu_cores: float = 1.0              # CPU limit
    max_disk_mb: int = 100                  # disk limit
    network_enabled: bool = False           # network access (off by default)
    read_only_rootfs: bool = True           # mount the root filesystem read-only
    allow_write: bool = True                # whether writing to the scratch space is allowed

    # Alias of ``memory_mb``. It has no annotation, so it is a plain class
    # attribute rather than a dataclass field / constructor argument.
    max_memory_mb = memory_mb

    # Paths
    work_dir: str = "/sandbox"              # working directory inside the container
    scratch_dir: str = "/tmp/scratch"       # writable temporary space

    # Safety
    dangerous_imports: List[str] = field(default_factory=lambda: [
        "os.system", "subprocess", "shutil.rmtree", "__import__('os')",
        "eval(", "exec(", "compile(", "pty", "ctypes",
    ])

    def __post_init__(self):
        # Copy the per-instance memory limit onto the alias once, at construction.
        self.max_memory_mb = self.memory_mb

    def to_docker_args(self, container_name: str) -> List[str]:
        """Build the ``docker run`` argument prefix for this configuration.

        Args:
            container_name: Value passed to ``--name``.

        Returns:
            The argument list up to, but not including, the image name.
        """
        cmd: List[str] = ["docker", "run", "--rm"]
        cmd += ["--name", container_name]
        cmd += ["--cpus", str(self.max_cpu_cores)]
        cmd.append(f"--memory={self.max_memory_mb}m")
        cmd += ["--storage-opt", f"size={self.max_disk_mb}m"]
        cmd += ["--workdir", self.work_dir]

        if not self.network_enabled:
            cmd.append("--network=none")

        if self.read_only_rootfs:
            cmd.append("--read-only")
            # A read-only root needs tmpfs mounts to provide writable scratch space.
            cmd += ["--tmpfs", "/tmp:exec,size=200m"]
            cmd += ["--tmpfs", f"{self.scratch_dir}:exec,size=100m"]

        return cmd
@dataclass
class SandboxResult:
    """Outcome of one sandbox execution."""

    # Random identifier of the form ``exec-xxxxxxxx``.
    execution_id: str = field(default_factory=lambda: "exec-" + uuid.uuid4().hex[:8])
    status: SandboxStatus = SandboxStatus.CREATED
    stdout: str = ""
    stderr: str = ""
    exit_code: int = -1
    elapsed_s: float = 0.0
    error: str = ""
    truncated: bool = False             # whether the captured output was cut short

    max_output_bytes: int = 1024 * 100  # 100KB max output

    def to_dict(self) -> dict:
        """Return a compact summary; stdout/stderr are cut to 500 characters."""
        preview_len = 500
        summary = {}
        summary["execution_id"] = self.execution_id
        summary["status"] = self.status.value
        summary["stdout_preview"] = self.stdout[:preview_len]
        summary["stderr_preview"] = self.stderr[:preview_len]
        summary["exit_code"] = self.exit_code
        summary["elapsed_s"] = self.elapsed_s
        summary["error"] = self.error
        return summary
149# ── Code Validator ──────────────────────────
class CodeValidator:
    """Regex-based screening of code before it is sent to the sandbox.

    Each pattern list is matched against the raw source text with
    ``re.search`` (case-insensitively). This is a textual heuristic, not a
    parser: it looks at comments and string literals too.

    Patterns whose leading name commonly appears as the tail of an unrelated
    identifier are anchored with ``\\b`` so that, for example,
    ``photos.remove(x)``, ``empty.get(k)``, ``ast.literal_eval(s)`` and the
    shell word ``sync`` are no longer reported as ``os.remove``, ``pty.``,
    ``eval(`` and ``nc``.
    """

    # Dangerous Python patterns
    PY_DANGEROUS_PATTERNS = [
        r"\bos\.system\s*\(",
        r"subprocess\.(call|run|Popen|check_output)\s*\(",
        r"shutil\.rmtree\s*\(",
        r"__import__\s*\(\s*['\"]os['\"]",
        r"\beval\s*\(",
        r"exec\s*\(",
        r"compile\s*\(",
        r"importlib\.import_module\s*\(",
        r"ctypes\.",
        r"\bos\.remove\s*\(",
        r"\bos\.unlink\s*\(",
        r"\bos\.rmdir\s*\(",
        r"socket\.",
        r"requests\.(get|post|put|delete|patch)",
        r"urllib\.",
        r"open\s*\([^)]*['\"]w",
        r"\bpty\.",
        r"multiprocessing\.",
    ]

    # Dangerous Bash patterns
    SH_DANGEROUS_PATTERNS = [
        r"rm\s+-rf\s+/",
        r"mkfs\.",
        r"dd\s+if=",
        r">\s*/dev/",
        r"chmod\s+777",
        r"wget\s+",
        r"curl\s+",
        r"\bnc\s+",
        r"telnet\s+",
    ]

    @staticmethod
    def _scan(code: str, patterns: List[str]) -> Tuple[bool, List[str]]:
        """Return ``(is_clean, violations)`` for *code* against *patterns*."""
        violations = [
            f"Dangerous pattern: {pattern}"
            for pattern in patterns
            if re.search(pattern, code, re.IGNORECASE)
        ]
        return not violations, violations

    @classmethod
    def validate_python(cls, code: str) -> Tuple[bool, List[str]]:
        """Check Python source against ``PY_DANGEROUS_PATTERNS``.

        Returns:
            ``(True, [])`` when nothing matched, otherwise ``(False, msgs)``
            with one message per matching pattern.
        """
        return cls._scan(code, cls.PY_DANGEROUS_PATTERNS)

    @classmethod
    def validate_bash(cls, code: str) -> Tuple[bool, List[str]]:
        """Check a Bash script against ``SH_DANGEROUS_PATTERNS``.

        Returns:
            ``(True, [])`` when nothing matched, otherwise ``(False, msgs)``
            with one message per matching pattern.
        """
        return cls._scan(code, cls.SH_DANGEROUS_PATTERNS)

    @classmethod
    def validate(cls, code: str, language: Language) -> Tuple[bool, List[str]]:
        """Dispatch to the validator for *language*.

        Languages other than Python and Bash have no pattern list and are
        reported as clean.
        """
        if language == Language.PYTHON:
            return cls.validate_python(code)
        if language == Language.BASH:
            return cls.validate_bash(code)
        return True, []
217# ── Docker Sandbox ──────────────────────────
class DockerSandbox:
    """Execute code inside a throw-away Docker container.

    Every call to :meth:`run` is rate limited, screened by the code
    validator, and then handed to ``docker run`` with the limits described
    by the sandbox configuration.

    Usage:
        sandbox = DockerSandbox(SandboxConfig())
        result = sandbox.run("print('hello world')")
        print(result.stdout)  # "hello world"
    """

    def __init__(self, config: Optional[SandboxConfig] = None):
        """Create a sandbox; a default configuration is used when none is given."""
        self._config = config or SandboxConfig()
        self._validator = CodeValidator()
        self._execution_count: int = 0
        self._rate_limit_window: float = 60.0  # 1 minute
        self._max_executions_per_window: int = 100
        self._execution_timestamps: List[float] = []

    def run(self, code: str, language: Optional[Language] = None) -> SandboxResult:
        """Run *code* in the sandbox and return the captured result.

        Args:
            code: Source text to execute.
            language: Language of *code*; defaults to the configured one.

        Returns:
            A result whose ``status`` is ``ERROR`` (with ``error`` set) when
            the request is rate limited, fails validation, Docker is
            unavailable, the language is unsupported, or execution raised.
            Otherwise the status reflects how the container process ended.
        """
        lang = language or self._config.language
        result = SandboxResult()

        # Rate limiting
        if not self._check_rate_limit():
            result.status = SandboxStatus.ERROR
            result.error = (
                f"Rate limit exceeded. Max {self._max_executions_per_window} "
                "executions/minute."
            )
            return result

        # Validate code
        safe, violations = self._validator.validate(code, lang)
        if not safe:
            result.status = SandboxStatus.ERROR
            result.error = f"Code validation failed: {', '.join(violations)}"
            return result

        # Check Docker availability
        if not self._docker_available():
            result.status = SandboxStatus.ERROR
            result.error = "Docker is not available on this system"
            return result

        # Execute
        container_name = f"agentos-sandbox-{result.execution_id}"
        # A monotonic clock keeps elapsed_s correct across wall-clock changes.
        started = time.monotonic()

        try:
            if lang == Language.PYTHON:
                result = self._run_python(code, container_name, result)
            elif lang == Language.BASH:
                result = self._run_bash(code, container_name, result)
            else:
                result.status = SandboxStatus.ERROR
                result.error = f"Unsupported language: {lang}"
        except Exception as e:
            # Boundary handler: callers always get a result, never an exception.
            logger.exception("Sandbox execution failed")
            result.status = SandboxStatus.ERROR
            result.error = str(e)
        finally:
            # Cleanup just in case the container outlived the docker client.
            self._cleanup(container_name)

        result.elapsed_s = time.monotonic() - started
        self._execution_count += 1
        self._execution_timestamps.append(time.time())

        return result

    def run_batch(
        self,
        code_blocks: List[str],
        language: Optional[Language] = None,
    ) -> List[SandboxResult]:
        """Run each code block in order and return the results in the same order."""
        return [self.run(code, language) for code in code_blocks]

    @staticmethod
    def _container_timeout(timeout_s: float) -> str:
        """Return the whole-second duration passed to ``timeout`` in the container.

        The value is rounded up and never below 1: truncating a sub-second
        timeout would yield ``timeout 0``, which disables the limit.
        """
        import math

        return str(max(1, math.ceil(timeout_s)))

    def _execute(
        self,
        args: List[str],
        container_name: str,
        result: SandboxResult,
    ) -> SandboxResult:
        """Run the docker command line and record its outcome on *result*.

        Shared by the Python and Bash runners so both report timeouts,
        kills and truncation identically.
        """
        # Grace period on top of the in-container timeout.
        host_timeout = self._config.timeout_s + 5
        try:
            proc = subprocess.run(
                args,
                capture_output=True,
                timeout=host_timeout,
                text=True,
                # Untrusted programs may emit bytes that are not valid text.
                errors="replace",
            )
        except subprocess.TimeoutExpired:
            result.status = SandboxStatus.TIMEOUT
            result.error = f"Host timeout after {host_timeout}s"
            self._cleanup(container_name)
            return result

        limit = result.max_output_bytes
        result.exit_code = proc.returncode
        result.stdout = proc.stdout[:limit]
        result.stderr = proc.stderr[:limit]
        result.truncated = len(proc.stdout) > limit or len(proc.stderr) > limit

        if proc.returncode == 124:  # exit status used by `timeout` on expiry
            result.status = SandboxStatus.TIMEOUT
        elif proc.returncode == 137:  # 128 + SIGKILL, reported as OOM kill
            result.status = SandboxStatus.KILLED
            result.error = "Out of memory"
        else:
            # Any other exit code, zero or not, means the process ran to an end.
            result.status = SandboxStatus.COMPLETED
        return result

    def _run_python(self, code: str, container_name: str, result: SandboxResult) -> SandboxResult:
        """Write *code* to a temp file, mount it read-only and run it with Python."""
        code_hash = hashlib.sha256(code.encode()).hexdigest()[:12]
        # The execution id keeps concurrent runs of identical code from
        # sharing (and deleting) each other's script file.
        script_path = (
            Path(tempfile.gettempdir())
            / f"agentos_sb_{code_hash}_{result.execution_id}.py"
        )
        script_path.write_text(code, encoding="utf-8")

        try:
            args = self._config.to_docker_args(container_name)
            args.extend([
                "-v", f"{script_path}:/sandbox/script.py:ro",
                self._config.image,
                "timeout", self._container_timeout(self._config.timeout_s),
                "python", "-u", "/sandbox/script.py",
            ])
            return self._execute(args, container_name, result)
        finally:
            # Clean temp file; failure to remove it must not mask the result.
            try:
                script_path.unlink()
            except OSError:
                logger.debug("Could not remove temp script %s", script_path)

    def _run_bash(self, code: str, container_name: str, result: SandboxResult) -> SandboxResult:
        """Run *code* as a ``bash -c`` script inside the container."""
        args = self._config.to_docker_args(container_name)
        args.extend([
            self._config.image,
            "timeout", self._container_timeout(self._config.timeout_s),
            "bash", "-c", code,
        ])
        return self._execute(args, container_name, result)

    def _docker_available(self) -> bool:
        """Return True only when ``docker info`` runs and exits successfully."""
        try:
            proc = subprocess.run(
                ["docker", "info"],
                capture_output=True,
                timeout=5,
            )
        except (OSError, subprocess.SubprocessError):
            # Binary missing, not executable, or the probe timed out.
            logger.warning("Docker is not available")
            return False
        if proc.returncode != 0:
            # The CLI exists but reported a failure.
            logger.warning("Docker is not available")
            return False
        return True

    def _cleanup(self, container_name: str) -> None:
        """Force-remove the container; best effort, never raises."""
        try:
            subprocess.run(
                ["docker", "rm", "-f", container_name],
                capture_output=True,
                timeout=5,
            )
        except (OSError, subprocess.SubprocessError):
            logger.debug("Cleanup of container %s failed", container_name)

    def _check_rate_limit(self) -> bool:
        """Drop timestamps outside the window and report whether a run is allowed."""
        cutoff = time.time() - self._rate_limit_window
        self._execution_timestamps = [
            t for t in self._execution_timestamps if t > cutoff
        ]
        return len(self._execution_timestamps) < self._max_executions_per_window
408# ── Evolution Runner ────────────────────────
@dataclass
class EvolutionStep:
    """Record of a single iteration of the self-evolution loop."""

    # Random identifier of the form ``ev-xxxxxxxx``.
    step_id: str = field(default_factory=lambda: "ev-" + uuid.uuid4().hex[:8])
    iteration: int = 0
    prompt: str = ""                             # prompt guiding the agent's code generation
    generated_code: str = ""
    test_code: str = ""
    result: Optional[SandboxResult] = None       # outcome of running the generated code
    test_result: Optional[SandboxResult] = None  # outcome of running the test harness
    score: float = 0.0
    accepted: bool = False
    error: str = ""
class SelfEvolutionRunner:
    """Iteratively generate, run and score code inside a sandbox.

    Each iteration produces code, runs it, runs it again together with the
    supplied test cases, and scores the outcome. The best-scoring step is
    kept.

    Usage:
        runner = SelfEvolutionRunner(sandbox)
        best = runner.evolve(
            prompt="Write a function that sorts a list with quicksort",
            test_cases=["assert quicksort([3,1,2]) == [1,2,3]"],
            max_iterations=5,
        )
    """

    def __init__(self, sandbox: Optional[DockerSandbox] = None):
        """Create a runner; a default sandbox is built when none is given."""
        self._sandbox = sandbox or DockerSandbox()
        self._evolution_history: List[EvolutionStep] = []
        self._best_step: Optional[EvolutionStep] = None

    def evolve(
        self,
        prompt: str,
        test_cases: List[str],
        max_iterations: int = 5,
        target_score: float = 1.0,
    ) -> EvolutionStep:
        """Run the self-evolution loop.

        Args:
            prompt: Natural-language description of the desired behavior.
            test_cases: Test cases written as Python ``assert`` statements.
            max_iterations: Maximum number of iterations; must be >= 1.
            target_score: Score at which the loop stops early
                (1.0 = all tests passed).

        Returns:
            The best accepted step, or the last step when none was accepted.

        Raises:
            ValueError: If ``max_iterations`` is less than 1, since there
                would be no step to return.
        """
        if max_iterations < 1:
            raise ValueError("max_iterations must be at least 1")

        self._evolution_history = []
        self._best_step = None
        best_score = 0.0

        for i in range(max_iterations):
            step = EvolutionStep(iteration=i, prompt=prompt)

            # Step 1: Generate code (in real use, the agent generates via LLM).
            step.generated_code = self._generate_code_scaffold(prompt, i)

            # Step 2: Run generated code in sandbox
            step.result = self._sandbox.run(step.generated_code)

            if step.result.status != SandboxStatus.COMPLETED:
                # Validation, rate-limit and Docker failures report through
                # `error` and leave stderr empty, so prefer `error`.
                detail = (step.result.error or step.result.stderr)[:200]
                step.error = f"Code execution failed: {detail}"
                self._evolution_history.append(step)
                continue

            # Step 3: Run tests
            step.test_code = self._build_test_code(step.generated_code, test_cases)
            step.test_result = self._sandbox.run(step.test_code)

            # Step 4: Score
            step.score = self._score(step, test_cases)
            step.accepted = step.score > best_score

            if step.accepted:
                best_score = step.score
                self._best_step = step

            self._evolution_history.append(step)

            # Early exit
            if step.score >= target_score:
                break

        return self._best_step or self._evolution_history[-1]

    def _generate_code_scaffold(self, prompt: str, iteration: int) -> str:
        """Return placeholder code (meant to be produced by an agent + LLM).

        The prompt is flattened onto one line so that a multi-line prompt
        stays inside the header comment instead of becoming code.
        """
        prompt_line = " ".join(prompt.splitlines())
        lines = [
            f"# Iteration {iteration}",
            f"# Prompt: {prompt_line}",
            "",
            "def quicksort(arr):",
            "    if len(arr) <= 1:",
            "        return arr",
            "    pivot = arr[len(arr) // 2]",
            "    left = [x for x in arr if x < pivot]",
            "    middle = [x for x in arr if x == pivot]",
            "    right = [x for x in arr if x > pivot]",
            "    return quicksort(left) + middle + quicksort(right)",
            "",
            "# Test the function",
            "print(quicksort([3, 6, 8, 10, 1, 2, 1]))",
        ]
        return "\n".join(lines) + "\n"

    def _build_test_code(self, code: str, test_cases: List[str]) -> str:
        """Append a test harness for *test_cases* to *code*.

        The harness runs every test case in its own ``try`` block, prints
        one ``PASS: ...`` or ``FAIL: ...`` line per case, and ends with an
        ``N/M tests passed`` summary line.
        """
        import textwrap

        chunks = [code, "\n\n# Auto-generated tests\n", "test_results = []\n"]
        for tc in test_cases:
            # repr() yields a valid literal even when the test text contains
            # quotes or backslashes; joining lines keeps the label on one line.
            label = repr(" ".join(tc.splitlines())[:50])
            stripped = textwrap.dedent(tc).strip()
            # Indent every line so multi-line test cases stay inside the try.
            body = textwrap.indent(stripped, "    ") if stripped else "    pass"
            chunks.append(
                "try:\n"
                f"{body}\n"
                f"    test_results.append(('PASS', {label}))\n"
                "except AssertionError as e:\n"
                f"    test_results.append(('FAIL', {label} + ': ' + str(e)))\n"
                # Any other error fails this case only, not the whole run.
                "except Exception as e:\n"
                f"    test_results.append(('FAIL', {label} + ': ' "
                "+ type(e).__name__ + ': ' + str(e)))\n"
            )
        chunks.append(
            "\nfor status, msg in test_results:\n    print(f'{status}: {msg}')\n"
        )
        chunks.append(
            "\nprint(f'\\n{sum(1 for s,_ in test_results if s==\"PASS\")}"
            f"/{len(test_cases)} tests passed')"
        )
        return "".join(chunks)

    def _score(self, step: EvolutionStep, test_cases: List[str]) -> float:
        """Return the fraction of test cases that passed, in ``[0.0, 1.0]``.

        Returns 0.0 when the test run is missing or did not complete, and
        1.0 when there are no test cases.
        """
        if not step.test_result or step.test_result.status != SandboxStatus.COMPLETED:
            return 0.0

        total = len(test_cases)
        if total == 0:
            return 1.0

        # Count harness result lines only, and clamp so that output printed
        # by the code under test cannot push the score above 1.0.
        passed = sum(
            1
            for line in step.test_result.stdout.splitlines()
            if line.startswith("PASS:")
        )
        return min(passed, total) / total

    @property
    def history(self) -> List[EvolutionStep]:
        """A copy of the steps recorded by the most recent ``evolve`` call."""
        return list(self._evolution_history)
554# ── Quick Start ─────────────────────────────
def create_sandbox(
    image: str = "python:3.11-slim",
    timeout_s: float = 30.0,
    network: bool = False,
) -> DockerSandbox:
    """Build a sandbox from the three most commonly adjusted settings.

    Args:
        image: Docker image to run.
        timeout_s: Execution timeout in seconds.
        network: Whether the container gets network access.

    Returns:
        A sandbox using an otherwise default configuration.
    """
    return DockerSandbox(
        SandboxConfig(image=image, timeout_s=timeout_s, network_enabled=network)
    )