Coverage for agentos/swarm/code_sandbox.py: 28%
157 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
1"""
2v1.9.5: Isolated Code Sandbox for safe agent code generation & execution.
4Runs generated code in subprocess with timeout, memory limits,
5test case validation, and structured error extraction for feedback loops.
6"""
8from __future__ import annotations
10import asyncio
11import os
12import resource
13import signal
14import subprocess
15import tempfile
16import traceback
17from dataclasses import dataclass, field
18from typing import Any
@dataclass
class SandboxResult:
    """Outcome of one sandbox execution, including per-test-case results."""

    success: bool = False
    exit_code: int = -1
    stdout: str = ""
    stderr: str = ""
    exception: str = ""
    duration: float = 0.0
    max_memory_mb: float = 0.0
    test_results: list[dict] = field(default_factory=list)  # one dict per test case
    all_passed: bool = False

    def to_dict(self) -> dict:
        """Return a compact dict view of the result.

        Text fields are truncated (stdout/stderr to 500 characters, exception
        to 300) and the numeric duration/memory values are rendered as
        formatted strings.
        """
        summary: dict = {"success": self.success, "exit_code": self.exit_code}
        # Truncate the free-text fields so the summary stays small.
        summary["stdout"] = self.stdout[:500]
        summary["stderr"] = self.stderr[:500]
        summary["exception"] = self.exception[:300]
        summary["duration"] = f"{self.duration:.2f}s"
        summary["max_memory_mb"] = f"{self.max_memory_mb:.1f}"
        summary["test_results"] = self.test_results
        summary["all_passed"] = self.all_passed
        return summary
@dataclass
class TestCase:
    """A single test case for code validation."""

    # The class name starts with "Test", so pytest would try to collect it
    # (and warn) in any test module that imports it. Opt out explicitly.
    # Unannotated, so the dataclass machinery does not treat it as a field.
    __test__ = False

    name: str
    input_args: tuple = ()
    input_kwargs: dict = field(default_factory=dict)
    expected_output: Any = None
    expected_type: str = ""  # e.g. "int", "str", "list"
    expected_exception: str = ""  # e.g. "ValueError"
    weight: float = 1.0

    def to_dict(self) -> dict:
        """Return a short, string-only summary of this test case.

        Always contains ``name`` and the stringified ``input_args``.
        ``expected_output`` (stringified, at most 100 characters) is included
        only when it is not None, and ``expected_exception`` only when it is
        non-empty.
        """
        d: dict = {"name": self.name, "input_args": str(self.input_args)}
        if self.expected_output is not None:
            d["expected_output"] = str(self.expected_output)[:100]
        if self.expected_exception:
            d["expected_exception"] = self.expected_exception
        return d
class CodeSandbox:
    """Isolated execution environment for agent-generated code.

    The code is screened with substring checks and a syntax check, wrapped in
    a generated test-runner script, and executed with ``python3`` in a
    subprocess under a wall-clock timeout. The generated script also tries to
    cap its own address space via ``resource.setrlimit``.

    Usage:
        sandbox = CodeSandbox(timeout=30, max_memory_mb=256)
        result = sandbox.run(
            code="def add(a, b): return a + b",
            func_name="add",
            test_cases=[
                TestCase(name="1+2", input_args=(1, 2), expected_output=3),
            ],
        )
    """

    # Line the generated runner prints immediately before its JSON summary.
    _RESULTS_MARKER = "__SANDBOX_RESULTS__"

    def __init__(
        self,
        timeout: float = 30.0,
        max_memory_mb: int = 256,
        allow_imports: bool = True,
        forbidden_modules: list[str] | None = None,
    ):
        """Configure the sandbox.

        Args:
            timeout: Wall-clock limit in seconds for the subprocess.
            max_memory_mb: Address-space limit (MiB) the generated script
                tries to apply to itself.
            allow_imports: Stored on the instance; not consulted by any
                method of this class.
            forbidden_modules: Names rejected by the security screen. A
                falsy value selects the built-in default list.
        """
        self.timeout = timeout
        self.max_memory_mb = max_memory_mb
        self.allow_imports = allow_imports
        self.forbidden_modules = forbidden_modules or [
            "os.system", "subprocess", "shutil", "socket",
            "requests", "urllib", "http", "ftp",
        ]

    def run(
        self,
        code: str,
        func_name: str = "",
        test_cases: list[TestCase] | None = None,
        setup_code: str = "",
    ) -> SandboxResult:
        """Execute code in sandbox with test cases.

        Args:
            code: The code to execute
            func_name: Name of the function to test
            test_cases: List of test cases to run against func_name
            setup_code: Setup code to run before tests (imports, fixtures).
                Only ``code`` goes through the security and syntax checks.

        Returns:
            SandboxResult with execution details and test outcomes
        """
        test_cases = test_cases or []

        # Security check
        security_issues = self._check_security(code)
        if security_issues:
            return SandboxResult(
                success=False,
                stderr=f"SECURITY VIOLATION: {security_issues}",
                exception="Security check failed",
            )

        # Syntactic check
        syntax_ok, syntax_err = self._check_syntax(code)
        if not syntax_ok:
            return SandboxResult(
                success=False,
                stderr=syntax_err,
                exception=f"Syntax error: {syntax_err}",
            )

        # Build the script before creating the file so a failure here cannot
        # leave a stray temp file behind.
        script = self._build_script(code, func_name, test_cases, setup_code)

        # The script contains non-ASCII text and Python reads source files as
        # UTF-8 by default, so write it as UTF-8 regardless of the locale.
        script_file = tempfile.NamedTemporaryFile(
            mode="w", suffix=".py", delete=False,
            prefix="sandbox_", encoding="utf-8",
        )
        script_path = script_file.name
        try:
            with script_file:
                script_file.write(script)
            return self._execute_script(script_path)
        finally:
            # Best-effort cleanup: a file that is already gone is fine.
            try:
                os.unlink(script_path)
            except OSError:
                pass

    def _check_security(self, code: str) -> str:
        """Screen ``code`` for forbidden imports, calls and builtins.

        This is a case-insensitive substring heuristic, not a parser: it can
        both over-match (e.g. ``reopen(`` contains ``open(``) and be evaded.

        Returns:
            The detected issues joined with ``"; "``, or ``""`` if none.
        """
        issues: list[str] = []
        code_lower = code.lower()

        def report(message: str) -> None:
            # Report each distinct issue once.
            if message not in issues:
                issues.append(message)

        # Check forbidden modules
        for mod in self.forbidden_modules:
            # The code was lowercased, so the needle has to be as well.
            needle = mod.lower()
            if f"import {needle}" in code_lower or f"from {needle}" in code_lower:
                report(f"Forbidden import: {mod}")
            # A dotted entry such as "os.system" names a callable, which is
            # used as a call rather than imported under that dotted name.
            if "." in needle and f"{needle}(" in code_lower:
                report(f"Forbidden call: {mod}")

        # Check dangerous builtins
        dangerous = [
            ("eval(", "eval() is forbidden"),
            ("exec(", "exec() is forbidden"),
            ("__import__(", "__import__() is forbidden"),
            ("open(", "File I/O blocked in sandbox"),
        ]
        for pattern, msg in dangerous:
            if pattern in code_lower:
                report(msg)

        return "; ".join(issues)

    def _check_syntax(self, code: str) -> tuple[bool, str]:
        """Check Python syntax.

        Returns:
            ``(True, "")`` when ``code`` compiles, otherwise ``(False, msg)``.
        """
        try:
            compile(code, "<sandbox>", "exec")
        except SyntaxError as e:
            return False, f"Line {e.lineno}: {e.msg}"
        except ValueError as e:
            # Older interpreters raise ValueError (not SyntaxError) for
            # source that contains null bytes.
            return False, str(e)
        return True, ""

    def _build_script(
        self,
        code: str,
        func_name: str,
        test_cases: list[TestCase],
        setup_code: str,
    ) -> str:
        """Build the full sandbox execution script.

        The script applies the memory limit, runs ``setup_code`` and ``code``
        at module level, calls ``func_name`` once per test case and prints
        the results marker followed by a JSON summary on stdout.

        Test-case values are embedded via ``repr``, so they must have a
        ``repr`` that is valid Python source inside the generated script.
        """
        # repr() yields correctly quoted literals, so names containing quotes
        # or backslashes cannot break the generated source.
        tc_defs = [
            f"    {tc.name!r}: {{"
            f'"args": {list(tc.input_args)!r}, '
            f'"kwargs": {tc.input_kwargs!r}, '
            f'"expected": {tc.expected_output!r}, '
            f'"expected_type": {tc.expected_type!r}, '
            f'"expected_exception": {tc.expected_exception!r}'
            f"}}"
            for tc in test_cases
        ]
        tc_dict = ",\n".join(tc_defs)
        tc_list = ", ".join(repr(tc.name) for tc in test_cases)

        # setrlimit needs integers; a float limit would raise inside the
        # script's try/except and silently leave the process unlimited.
        mem_bytes = int(self.max_memory_mb * 1024 * 1024)

        return f'''#!/usr/bin/env python3
"""Sandbox execution script — auto-generated by CodeSandbox."""
import json
import sys
import time
import traceback
import os

# Limit memory
try:
    import resource
    resource.setrlimit(resource.RLIMIT_AS, ({mem_bytes}, {mem_bytes}))
except Exception:
    pass

# Setup code
{setup_code}

# User code
{code}

# Test runner
test_cases = {{
{tc_dict}
}}

func_name = {func_name!r}
has_func = bool(func_name) and func_name in globals()

results = []
total = 0
passed = 0

if not has_func and len(test_cases) == 0:
    print("__SANDBOX_OK__")
    print(json.dumps({{"status": "executed", "message": "Code executed, no tests"}}))
    sys.exit(0)

if not has_func and len(test_cases) > 0:
    print("__SANDBOX_ERR__")
    print(json.dumps({{"error": f"Function '{{func_name}}' not found in code"}}))
    sys.exit(1)

func = globals()[func_name]
all_test_order = [{tc_list}]

for tc_name in all_test_order:
    tc = test_cases[tc_name]
    total += 1
    start = time.time()
    try:
        output = func(*tc["args"], **tc["kwargs"])
        elapsed = time.time() - start

        expected = tc.get("expected")
        expected_type = tc.get("expected_type")
        expected_exc = tc.get("expected_exception")

        if expected_exc:
            passed_ = False
            detail = f"Expected exception {{expected_exc}} but none raised"
        elif expected_type and not isinstance(output, eval(expected_type)):
            passed_ = False
            detail = f"Type mismatch: got {{type(output).__name__}}, expected {{expected_type}}"
        elif expected is not None and output != expected:
            passed_ = False
            detail = f"Expected {{repr(expected)}}, got {{repr(output)}}"
        else:
            passed_ = True
            detail = "OK"

        if passed_:
            passed += 1

        results.append({{
            "name": tc_name,
            "passed": passed_,
            "output": repr(output)[:200],
            "expected": repr(expected)[:200],
            "detail": detail,
            "duration_ms": round(elapsed * 1000, 2),
        }})
    except Exception as e:
        exc_name = type(e).__name__
        expected_exc = tc.get("expected_exception", "")

        if expected_exc and exc_name == expected_exc:
            passed_ = True
            detail = f"Expected exception {{exc_name}} raised"
            passed += 1
        else:
            passed_ = False
            detail = f"{{exc_name}}: {{str(e)[:200]}}"

        results.append({{
            "name": tc_name,
            "passed": passed_,
            "output": "",
            "expected": repr(tc.get("expected"))[:200],
            "detail": detail,
            "duration_ms": 0,
        }})

print("__SANDBOX_RESULTS__")
print(json.dumps({{
    "status": "complete",
    "total": total,
    "passed": passed,
    "failed": total - passed,
    "all_passed": total > 0 and passed == total,
    "test_results": results,
}}))
'''

    @staticmethod
    def _as_text(stream: Any) -> str:
        """Coerce captured process output (str, bytes or None) to ``str``."""
        if stream is None:
            return ""
        if isinstance(stream, bytes):
            return stream.decode("utf-8", errors="replace")
        return str(stream)

    @staticmethod
    def _parse_results(stdout: str) -> dict | None:
        """Extract the runner's JSON summary from captured stdout.

        Returns:
            The decoded summary dict, or None when the marker is missing or
            what follows it is not a JSON object.
        """
        # Imported locally: this helper needs json and should not depend on
        # a module-level import being present.
        import json

        # The runner prints the marker last, so anything the tested code
        # printed earlier (even the marker text itself) is skipped.
        _, marker, tail = stdout.rpartition(CodeSandbox._RESULTS_MARKER)
        if not marker:
            return None
        try:
            data, _ = json.JSONDecoder().raw_decode(tail.lstrip())
        except json.JSONDecodeError:
            return None
        return data if isinstance(data, dict) else None

    def _execute_script(self, script_path: str) -> SandboxResult:
        """Execute the sandbox script as subprocess.

        Args:
            script_path: Path of the generated script to run with ``python3``.

        Returns:
            A SandboxResult with the captured output and, when the runner
            printed a summary, the parsed test outcomes. Timeouts and launch
            failures are reported in the result rather than raised.
        """
        try:
            start = time_module()
            proc = subprocess.run(
                ["python3", script_path],
                capture_output=True,
                text=True,
                timeout=self.timeout,
                cwd="/tmp",
                env={
                    **os.environ,
                    "PYTHONDONTWRITEBYTECODE": "1",
                    "PYTHONPATH": "",
                    "SANDBOX_MODE": "1",
                },
            )
            duration = time_module() - start
        except subprocess.TimeoutExpired as e:
            # Output captured before the kill may be bytes or None here even
            # though text=True was requested, so normalise it.
            return SandboxResult(
                success=False,
                exit_code=-1,
                stdout=self._as_text(e.stdout),
                stderr=self._as_text(e.stderr),
                exception=f"Timeout after {self.timeout}s",
                duration=self.timeout,
            )
        except Exception as e:
            return SandboxResult(
                success=False,
                exception=str(e),
                stderr=traceback.format_exc(),
            )

        result = SandboxResult(
            exit_code=proc.returncode,
            stdout=proc.stdout,
            stderr=proc.stderr,
            duration=duration,
        )

        # Parse test results
        if self._RESULTS_MARKER in proc.stdout:
            data = self._parse_results(proc.stdout)
            if data:
                result.test_results = data.get("test_results", [])
                result.all_passed = data.get("all_passed", False)
                passed = data.get("passed", 0)
                total = data.get("total", 0)
                result.success = total > 0 and passed == total
        elif proc.returncode == 0 and not proc.stderr:
            result.success = True

        # Extract meaningful error info for feedback
        if not result.success and result.stderr:
            lines = result.stderr.strip().split("\n")
            # Keep the last 5 lines (most relevant error info); shorter
            # output is kept whole.
            result.exception = "\n".join(lines[-5:]) if len(lines) > 5 else result.stderr

        return result
def time_module() -> float:
    """Return the current time in seconds, as reported by ``time.time()``."""
    from time import time as _now

    return _now()
class CodeFeedbackExtractor:
    """Extracts actionable feedback from sandbox failures for retry loops."""

    # Error-type name -> advice. A name is matched as a plain substring of
    # the result's stderr or exception text.
    ERROR_PATTERNS = {
        "NameError": "Variable or function not defined. Check spelling and scope.",
        "TypeError": "Wrong type passed to function. Check argument types.",
        "ValueError": "Invalid value for operation. Check parameter constraints.",
        "IndexError": "List index out of range. Check bounds.",
        "KeyError": "Dictionary key not found. Check key existence.",
        "AttributeError": "Object has no such attribute. Check method/attribute name.",
        "ImportError": "Missing import. Add 'import X' or 'from X import Y'.",
        "SyntaxError": "Python syntax error. Check indentation, brackets, colons.",
        "ZeroDivisionError": "Division by zero. Add guard for zero denominator.",
        "RecursionError": "Recursion depth exceeded. Add base case or switch to iteration.",
        "TimeoutError": "Code timed out. Check for infinite loops or optimize.",
    }

    # Extra substrings that count as a hit for an ERROR_PATTERNS key. A
    # result whose exception reads "Timeout after <n>s" never contains the
    # literal "TimeoutError", so without this the timeout advice would never
    # be produced for it.
    _PATTERN_ALIASES = {
        "TimeoutError": ("Timeout after",),
    }

    @classmethod
    def extract(cls, sandbox_result: SandboxResult) -> list[str]:
        """Extract actionable feedback suggestions from sandbox result.

        Args:
            sandbox_result: Result whose ``stderr``, ``exception``,
                ``test_results`` and ``success`` fields are inspected.

        Returns:
            Suggestions in this order: one line per failed test case, then
            the advice for each matching error pattern. A security violation
            short-circuits to a single suggestion, and a failed result with
            no other suggestion gets a generic fallback.
        """
        suggestions: list[str] = []
        stderr = sandbox_result.stderr
        exception = sandbox_result.exception

        # Check for security issues
        if "SECURITY" in stderr:
            suggestions.append("Code violates sandbox security rules; avoid system calls and I/O.")
            return suggestions

        # Check test failures
        for tc in sandbox_result.test_results:
            if not tc.get("passed", True):
                detail = tc.get("detail", "")
                name = tc.get("name", "unknown")
                suggestions.append(f"Test '{name}' failed: {detail}")

        # Check error patterns
        for error_type, suggestion in cls.ERROR_PATTERNS.items():
            needles = (error_type, *cls._PATTERN_ALIASES.get(error_type, ()))
            matched = any(n in stderr or n in exception for n in needles)
            if matched and suggestion not in suggestions:
                suggestions.append(suggestion)

        # Fall back to generic advice when nothing more specific was found
        if not sandbox_result.success and not suggestions:
            suggestions.append("Code failed to execute. Check logic and edge cases.")
            if stderr:
                suggestions.append(f"Error: {stderr.strip()[:200]}")

        return suggestions