Coverage for agentos/swarm/code_sandbox.py: 28%

157 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1""" 

2v1.9.5: Isolated Code Sandbox for safe agent code generation & execution. 

3 

4Runs generated code in subprocess with timeout, memory limits, 

5test case validation, and structured error extraction for feedback loops. 

6""" 

7 

8from __future__ import annotations 

9 

import asyncio
import json
import os
import resource
import signal
import subprocess
import tempfile
import traceback
from dataclasses import dataclass, field
from typing import Any

19 

20 

@dataclass
class SandboxResult:
    """Outcome of a single sandbox run.

    Holds the process status, the captured output streams, timing data and
    the per-test verdicts reported for the run.
    """

    success: bool = False
    exit_code: int = -1
    stdout: str = ""
    stderr: str = ""
    exception: str = ""
    duration: float = 0.0
    max_memory_mb: float = 0.0
    test_results: list[dict] = field(default_factory=list)  # per-test-case results
    all_passed: bool = False

    def to_dict(self) -> dict:
        """Return a compact dict view of the result.

        Text fields are clipped (stdout/stderr to 500 characters, exception
        to 300) and the numeric measurements are rendered as strings.
        """
        summary: dict = {"success": self.success, "exit_code": self.exit_code}

        # Clip the free-text fields so the summary stays small.
        clip_limits = (("stdout", 500), ("stderr", 500), ("exception", 300))
        for attr, limit in clip_limits:
            summary[attr] = getattr(self, attr)[:limit]

        summary["duration"] = f"{self.duration:.2f}s"
        summary["max_memory_mb"] = f"{self.max_memory_mb:.1f}"
        summary["test_results"] = self.test_results
        summary["all_passed"] = self.all_passed
        return summary

47 

48 

@dataclass
class TestCase:
    """One named call specification used to validate generated code."""

    name: str
    input_args: tuple = ()
    input_kwargs: dict = field(default_factory=dict)
    expected_output: Any = None
    expected_type: str = ""  # e.g. "int", "str", "list"
    expected_exception: str = ""  # e.g. "ValueError"
    weight: float = 1.0

    def to_dict(self) -> dict:
        """Return a short, string-only description of this test case.

        ``expected_output`` (clipped to 100 characters) and
        ``expected_exception`` are included only when they are set.
        """
        summary: dict = dict(name=self.name, input_args=str(self.input_args))

        has_expected_value = self.expected_output is not None
        if has_expected_value:
            expected_text = str(self.expected_output)
            summary["expected_output"] = expected_text[:100]

        if self.expected_exception:
            summary["expected_exception"] = self.expected_exception

        return summary

68 

69 

class CodeSandbox:
    """Isolated execution environment for agent-generated code.

    The submitted code is screened with a substring-based security check and
    a syntax check, wrapped in a generated test harness, written to a
    temporary file and executed with ``python3`` in a subprocess under a
    wall-clock timeout. The harness itself requests an address-space limit
    via ``resource.setrlimit``.

    Usage:
        sandbox = CodeSandbox(timeout=30, max_memory_mb=256)
        result = sandbox.run(
            code="def add(a, b): return a + b",
            func_name="add",
            test_cases=[
                TestCase(name="1+2", input_args=(1, 2), expected_output=3)
            ],
        )
    """

    # Markers the generated harness prints on stdout ahead of its JSON payload.
    _OK_MARKER = "__SANDBOX_OK__"
    _ERROR_MARKER = "__SANDBOX_ERR__"
    _RESULTS_MARKER = "__SANDBOX_RESULTS__"

    # (substring, message) pairs rejected by the static screen.
    _DANGEROUS_PATTERNS = (
        ("eval(", "eval() is forbidden"),
        ("exec(", "exec() is forbidden"),
        ("__import__(", "__import__() is forbidden"),
        ("open(", "File I/O blocked in sandbox"),
    )

    def __init__(
        self,
        timeout: float = 30.0,
        max_memory_mb: int = 256,
        allow_imports: bool = True,
        forbidden_modules: list[str] | None = None,
    ):
        """Configure the sandbox limits.

        Args:
            timeout: Wall-clock limit, in seconds, for the subprocess.
            max_memory_mb: Address-space limit requested by the harness.
            allow_imports: Stored on the instance; not consulted by any
                method of this class.
            forbidden_modules: Names whose import (or, for dotted names, any
                textual use) is rejected. A default list is used when empty
                or ``None``.
        """
        self.timeout = timeout
        self.max_memory_mb = max_memory_mb
        self.allow_imports = allow_imports
        self.forbidden_modules = forbidden_modules or [
            "os.system", "subprocess", "shutil", "socket",
            "requests", "urllib", "http", "ftp",
        ]

    def run(
        self,
        code: str,
        func_name: str = "",
        test_cases: list[TestCase] | None = None,
        setup_code: str = "",
    ) -> SandboxResult:
        """Execute code in sandbox with test cases.

        Args:
            code: The code to execute
            func_name: Name of the function to test
            test_cases: List of test cases to run against func_name
            setup_code: Setup code to run before tests (imports, fixtures).
                Only ``code`` goes through the security and syntax checks.

        Returns:
            SandboxResult with execution details and test outcomes
        """
        test_cases = test_cases or []

        # Security check
        security_issues = self._check_security(code)
        if security_issues:
            return SandboxResult(
                success=False,
                stderr=f"SECURITY VIOLATION: {security_issues}",
                exception="Security check failed",
            )

        # Syntactic check
        syntax_ok, syntax_err = self._check_syntax(code)
        if not syntax_ok:
            return SandboxResult(
                success=False,
                stderr=syntax_err,
                exception=f"Syntax error: {syntax_err}",
            )

        # Build the script first so a failure here leaves no file behind.
        script = self._build_script(code, func_name, test_cases, setup_code)

        # Explicit UTF-8: the harness text contains a non-ASCII character and
        # Python reads source files as UTF-8 regardless of the locale.
        handle = tempfile.NamedTemporaryFile(
            mode="w", suffix=".py", delete=False,
            prefix="sandbox_", encoding="utf-8",
        )
        script_path = handle.name
        try:
            with handle:
                handle.write(script)
            return self._execute_script(script_path)
        finally:
            # Best-effort cleanup; the file may already be gone.
            try:
                os.unlink(script_path)
            except OSError:
                pass

    def _check_security(self, code: str) -> str:
        """Check for forbidden patterns.

        This is a case-insensitive substring screen, not a parser, so it can
        also match inside comments, strings and longer identifiers.

        Returns:
            The detected issues joined by ``"; "`` (each reported once), or
            an empty string when nothing was found.
        """
        issues: list[str] = []
        code_lower = code.lower()

        def flag(message: str) -> None:
            if message not in issues:
                issues.append(message)

        # Check forbidden modules
        for mod in self.forbidden_modules:
            needle = mod.lower()
            if not needle:
                continue
            if f"import {needle}" in code_lower or f"from {needle}" in code_lower:
                flag(f"Forbidden import: {mod}")
            elif "." in needle and needle in code_lower:
                # Dotted entries such as "os.system" name an attribute, not a
                # module, so they only ever show up as a use in the code.
                flag(f"Forbidden call: {mod}")

        # Check dangerous builtins
        for pattern, msg in self._DANGEROUS_PATTERNS:
            if pattern in code_lower:
                flag(msg)

        return "; ".join(issues)

    def _check_syntax(self, code: str) -> tuple[bool, str]:
        """Check Python syntax.

        Returns:
            ``(True, "")`` when the code compiles, otherwise ``(False, msg)``.
        """
        try:
            compile(code, "<sandbox>", "exec")
        except SyntaxError as e:
            return False, f"Line {e.lineno}: {e.msg}"
        except ValueError as e:
            # Some Python versions reject NUL bytes with ValueError.
            return False, str(e)
        return True, ""

    def _build_script(
        self,
        code: str,
        func_name: str,
        test_cases: list[TestCase],
        setup_code: str,
    ) -> str:
        """Build the full sandbox execution script.

        Every value taken from the test cases and ``func_name`` is embedded
        with ``repr()``, so quotes or backslashes in names cannot break the
        generated source. Argument and expected values must therefore have a
        ``repr`` that evaluates back to the value. Test cases are emitted as
        an ordered list of ``(name, spec)`` pairs, so duplicate names are
        each executed.
        """
        entries = [
            (
                f"    ({tc.name!r}, {{"
                f'"args": {list(tc.input_args)!r}, '
                f'"kwargs": {tc.input_kwargs!r}, '
                f'"expected": {tc.expected_output!r}, '
                f'"expected_type": {tc.expected_type!r}, '
                f'"expected_exception": {tc.expected_exception!r}'
                f"}}),"
            )
            for tc in test_cases
        ]
        tc_block = "\n".join(entries)

        return f'''#!/usr/bin/env python3
"""Sandbox execution script — auto-generated by CodeSandbox."""
import json
import sys
import time
import traceback
import os

# Limit memory
try:
    import resource
    resource.setrlimit(resource.RLIMIT_AS, ({self.max_memory_mb} * 1024 * 1024, {self.max_memory_mb} * 1024 * 1024))
except Exception:
    pass

# Setup code
{setup_code}

# User code
{code}

# Test runner: ordered (name, spec) pairs
test_cases = [
{tc_block}
]

func_name = {func_name!r}
has_func = func_name and func_name in dir()

results = []
total = 0
passed = 0

if not has_func and len(test_cases) == 0:
    print("__SANDBOX_OK__")
    print(json.dumps({{"status": "executed", "message": "Code executed, no tests"}}))
    sys.exit(0)

if not has_func and len(test_cases) > 0:
    print("__SANDBOX_ERR__")
    print(json.dumps({{"error": f"Function '{{func_name}}' not found in code"}}))
    sys.exit(1)

func = globals()[func_name]

for tc_name, tc in test_cases:
    total += 1
    start = time.time()
    try:
        output = func(*tc["args"], **tc["kwargs"])
        elapsed = time.time() - start

        expected = tc.get("expected")
        expected_type = tc.get("expected_type")
        expected_exc = tc.get("expected_exception")

        if expected_exc:
            passed_ = False
            detail = f"Expected exception {{expected_exc}} but none raised"
        # expected_type comes from the test definition; eval resolves the type name.
        elif expected_type and not isinstance(output, eval(expected_type)):
            passed_ = False
            detail = f"Type mismatch: got {{type(output).__name__}}, expected {{expected_type}}"
        elif expected is not None and output != expected:
            passed_ = False
            detail = f"Expected {{repr(expected)}}, got {{repr(output)}}"
        else:
            passed_ = True
            detail = "OK"

        if passed_:
            passed += 1

        results.append({{
            "name": tc_name,
            "passed": passed_,
            "output": repr(output)[:200],
            "expected": repr(expected)[:200],
            "detail": detail,
            "duration_ms": round(elapsed * 1000, 2),
        }})
    except Exception as e:
        exc_name = type(e).__name__
        expected_exc = tc.get("expected_exception", "")

        if expected_exc and exc_name == expected_exc:
            passed_ = True
            detail = f"Expected exception {{exc_name}} raised"
            passed += 1
        else:
            passed_ = False
            detail = f"{{exc_name}}: {{str(e)[:200]}}"

        results.append({{
            "name": tc_name,
            "passed": passed_,
            "output": "",
            "expected": repr(tc.get("expected"))[:200],
            "detail": detail,
            "duration_ms": 0,
        }})

print("__SANDBOX_RESULTS__")
print(json.dumps({{
    "status": "complete",
    "total": total,
    "passed": passed,
    "failed": total - passed,
    "all_passed": total > 0 and passed == total,
    "test_results": results,
}}))
'''

    @staticmethod
    def _to_text(stream: Any) -> str:
        """Normalise captured output that may be ``None``, ``bytes`` or ``str``."""
        if stream is None:
            return ""
        if isinstance(stream, bytes):
            return stream.decode("utf-8", errors="replace")
        return str(stream)

    @staticmethod
    def _payload_after(stdout: str, marker: str) -> dict | None:
        """Return the JSON object printed on the line after *marker*.

        The last line consisting solely of *marker* is used, because the
        harness prints its marker after all user output. Returns ``None``
        when the marker is absent or the following line is not a JSON object.
        """
        lines = stdout.split("\n")
        for idx in range(len(lines) - 1, -1, -1):
            if lines[idx].strip() != marker:
                continue
            if idx + 1 >= len(lines):
                return None
            try:
                payload = json.loads(lines[idx + 1])
            except json.JSONDecodeError:
                return None
            return payload if isinstance(payload, dict) else None
        return None

    def _execute_script(self, script_path: str) -> SandboxResult:
        """Execute the sandbox script as subprocess.

        Args:
            script_path: Path of the generated harness script.

        Returns:
            SandboxResult describing the run. Failures to run, including a
            timeout, are reported through the result rather than raised.
        """
        start = time_module()
        try:
            proc = subprocess.run(
                ["python3", script_path],
                capture_output=True,
                text=True,
                timeout=self.timeout,
                cwd="/tmp",
                # NOTE(review): the child inherits the parent's full
                # environment, including any secrets stored there.
                env={
                    **os.environ,
                    "PYTHONDONTWRITEBYTECODE": "1",
                    "PYTHONPATH": "",
                    "SANDBOX_MODE": "1",
                },
            )
        except subprocess.TimeoutExpired as e:
            # Partial output on timeout may be None or bytes; keep each
            # stream in its own field.
            return SandboxResult(
                success=False,
                exit_code=-1,
                stdout=self._to_text(e.stdout),
                stderr=self._to_text(e.stderr),
                exception=f"Timeout after {self.timeout}s",
                duration=self.timeout,
            )
        except Exception as e:
            return SandboxResult(
                success=False,
                exception=str(e),
                stderr=traceback.format_exc(),
            )
        duration = time_module() - start

        result = SandboxResult(
            exit_code=proc.returncode,
            stdout=proc.stdout,
            stderr=proc.stderr,
            duration=duration,
        )

        # Parse test results
        if self._RESULTS_MARKER in proc.stdout:
            report = self._payload_after(proc.stdout, self._RESULTS_MARKER)
            if report:
                result.test_results = report.get("test_results", [])
                result.all_passed = report.get("all_passed", False)
                passed = report.get("passed", 0)
                total = report.get("total", 0)
                result.success = total > 0 and passed == total
        elif (
            proc.returncode == 0
            and not proc.stderr
            and self._OK_MARKER in proc.stdout
        ):
            # Only the harness's own "no tests" exit counts as success; a
            # process that left early with status 0 never reached this marker.
            result.success = True

        # Extract meaningful error info for feedback
        if not result.success:
            if result.stderr:
                lines = result.stderr.strip().split("\n")
                # Keep the last 5 lines (most relevant error info)
                result.exception = (
                    "\n".join(lines[-5:]) if len(lines) > 5 else result.stderr
                )
            failure = self._payload_after(proc.stdout, self._ERROR_MARKER)
            if failure and failure.get("error"):
                # The harness's own diagnosis is more specific than stderr.
                result.exception = str(failure["error"])

        return result

404 

405 

def time_module() -> float:
    """Return the current wall-clock time in seconds, as given by ``time.time()``."""
    from time import time as _wall_clock

    now = _wall_clock()
    return now

410 

411 

class CodeFeedbackExtractor:
    """Extracts actionable feedback from sandbox failures for retry loops."""

    # Exception name -> hint shown when that name appears in the failure text.
    ERROR_PATTERNS = {
        "NameError": "Variable or function not defined. Check spelling and scope.",
        "TypeError": "Wrong type passed to function. Check argument types.",
        "ValueError": "Invalid value for operation. Check parameter constraints.",
        "IndexError": "List index out of range. Check bounds.",
        "KeyError": "Dictionary key not found. Check key existence.",
        "AttributeError": "Object has no such attribute. Check method/attribute name.",
        "ImportError": "Missing import. Add 'import X' or 'from X import Y'.",
        "SyntaxError": "Python syntax error. Check indentation, brackets, colons.",
        "ZeroDivisionError": "Division by zero. Add guard for zero denominator.",
        "RecursionError": "Recursion depth exceeded. Add base case or switch to iteration.",
        "TimeoutError": "Code timed out. Check for infinite loops or optimize.",
    }

    # CodeSandbox reports a timeout as "Timeout after <n>s" in ``exception``;
    # that text does not contain "TimeoutError", so it is matched separately.
    _TIMEOUT_PREFIX = "Timeout after"

    @classmethod
    def extract(cls, sandbox_result: SandboxResult) -> list[str]:
        """Extract actionable feedback suggestions from sandbox result.

        Args:
            sandbox_result: Result whose ``stderr``, ``exception``,
                ``test_results`` and ``success`` fields are inspected.

        Returns:
            Suggestions in this order: failed-test details, then hints for
            each recognised error type, then a generic fallback when the run
            failed and nothing else matched. A security violation
            short-circuits to a single suggestion.
        """
        suggestions: list[str] = []
        stderr = sandbox_result.stderr
        exception = sandbox_result.exception

        # Check for security issues
        if "SECURITY" in stderr:
            suggestions.append("Code violates sandbox security rules; avoid system calls and I/O.")
            return suggestions

        # Check test failures
        for tc in sandbox_result.test_results:
            if not tc.get("passed", True):
                detail = tc.get("detail", "")
                name = tc.get("name", "unknown")
                suggestions.append(f"Test '{name}' failed: {detail}")

        # Check error patterns
        timed_out = exception.startswith(cls._TIMEOUT_PREFIX)
        for error_type, suggestion in cls.ERROR_PATTERNS.items():
            matched = error_type in stderr or error_type in exception
            if error_type == "TimeoutError" and timed_out:
                matched = True
            if matched and suggestion not in suggestions:
                suggestions.append(suggestion)

        # Generic fallback when the run failed and nothing specific matched
        if not sandbox_result.success and not suggestions:
            suggestions.append("Code failed to execute. Check logic and edge cases.")
            if stderr:
                suggestions.append(f"Error: {stderr.strip()[:200]}")

        return suggestions