Coverage for agentos/security/sandbox.py: 48%

52 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1""" 

2安全沙箱 — Docker隔离 + LLM操作级分析。 

3基因来源: OpenHands + Claude Code 

4 

5v1.2.1: SandboxExecutor 提升为一级导出,提供真正的代码隔离执行。 

6""" 

7 

8from __future__ import annotations 

9 

10from dataclasses import dataclass, field 

11from enum import Enum 

12from typing import Any 

13 

14from agentos.security.sandbox_executor import ( 

15 SandboxMode, 

16 SandboxConfig, 

17 SandboxResult, 

18 SandboxExecutor, 

19 ProcessSandbox, 

20 DockerSandbox, 

21) 

22 

23 

24class RiskLevel(str, Enum): 

25 """Safety risk classification for sandboxed operations.""" 

26 SAFE = "safe" 

27 MODERATE = "moderate" 

28 DANGEROUS = "dangerous" 

29 

30 

31@dataclass 

32class SafetyReport: 

33 """Result of a safety analysis for a sandboxed operation. 

34 

35 Attributes: 

36 risk: Classified risk level. 

37 reason: Explanation of the risk assessment. 

38 """ 

39 risk: RiskLevel 

40 reason: str 

41 

42 

43class Sandbox: 

44 """ 

45 安全沙箱 — 每个Agent会话对应一个隔离环境。 

46 生产环境使用Docker容器,当前为本地简化实现。 

47 """ 

48 

49 def __init__(self, session_id: str, workspace: str = "/workspace"): 

50 self.session_id = session_id 

51 self.workspace = workspace 

52 self.allowed_paths: list[str] = [workspace] 

53 self.blocked_commands: list[str] = [ 

54 "rm -rf /", "mkfs", "dd if=", "> /dev/sda", 

55 "shutdown", "reboot", "kill -9 1", 

56 ] 

57 

58 def is_allowed(self, tool_name: str, arguments: dict) -> bool: 

59 """检查工具调用是否被允许。""" 

60 # 检查路径是否在允许范围内 

61 path = arguments.get("file_path") or arguments.get("path") or "" 

62 if path and not any( 

63 path.startswith(allowed) for allowed in self.allowed_paths 

64 ): 

65 if not path.startswith("/tmp/"): 

66 return False 

67 

68 # 检查命令黑名单 

69 command = arguments.get("command") or arguments.get("code") or "" 

70 for blocked in self.blocked_commands: 

71 if blocked in command: 

72 return False 

73 

74 return True 

75 

76 async def execute_code(self, code: str, language: str = "python") -> Any: 

77 """在沙箱中执行代码。当前为简化实现。""" 

78 # 生产环境: docker exec 到容器内执行 

79 # 当前: subprocess(带安全检查) 

80 return None 

81 

82 

83class SandboxManager: 

84 """沙箱管理器 — 创建和销毁沙箱。""" 

85 

86 def __init__(self): 

87 self._sandboxes: dict[str, Sandbox] = {} 

88 

89 def get_sandbox(self, session_id: str) -> Sandbox: 

90 if session_id not in self._sandboxes: 

91 self._sandboxes[session_id] = Sandbox(session_id=session_id) 

92 return self._sandboxes[session_id] 

93 

94 def destroy(self, session_id: str): 

95 self._sandboxes.pop(session_id, None) 

96 

97 

98class LLMSafetyAnalyzer: 

99 """ 

100 操作级LLM安全分析 — 执行前用轻量模型评估风险。 

101 生产环境: 调用轻量模型分析每次代码执行的安全性。 

102 当前为规则匹配简化实现。 

103 """ 

104 

105 DANGEROUS_PATTERNS = [ 

106 "rm -rf /", "os.system", "subprocess", "eval(", "exec(", 

107 "shutil.rmtree('/'", "os.remove('/", 

108 ] 

109 

110 MODERATE_PATTERNS = [ 

111 "rm ", "os.remove", "os.unlink", "shutil.rmtree", 

112 "write(", "open(", "mkdir(", 

113 ] 

114 

115 async def analyze(self, code: str) -> SafetyReport: 

116 code_lower = code.lower() 

117 

118 for pattern in self.DANGEROUS_PATTERNS: 

119 if pattern.lower() in code_lower: 

120 return SafetyReport( 

121 risk=RiskLevel.DANGEROUS, 

122 reason=f"Blocked dangerous operation: {pattern}", 

123 ) 

124 

125 for pattern in self.MODERATE_PATTERNS: 

126 if pattern.lower() in code_lower: 

127 return SafetyReport( 

128 risk=RiskLevel.MODERATE, 

129 reason=f"Requires confirmation: write/delete operation detected", 

130 ) 

131 

132 return SafetyReport(risk=RiskLevel.SAFE, reason="No risky operations detected")