Coverage for agentos/security/sandbox.py: 48%
52 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
1"""
2安全沙箱 — Docker隔离 + LLM操作级分析。
3基因来源: OpenHands + Claude Code
5v1.2.1: SandboxExecutor 提升为一级导出,提供真正的代码隔离执行。
6"""
8from __future__ import annotations
10from dataclasses import dataclass, field
11from enum import Enum
12from typing import Any
14from agentos.security.sandbox_executor import (
15 SandboxMode,
16 SandboxConfig,
17 SandboxResult,
18 SandboxExecutor,
19 ProcessSandbox,
20 DockerSandbox,
21)
class RiskLevel(str, Enum):
    """Risk tier assigned to an operation that is about to run in a sandbox.

    Members are also ``str`` instances, so they compare equal to their plain
    string values (for example ``RiskLevel.SAFE == "safe"``).
    """

    # No risky pattern was found.
    SAFE = "safe"
    # A write/delete style operation was found.
    MODERATE = "moderate"
    # A destructive or arbitrary-execution pattern was found.
    DANGEROUS = "dangerous"
@dataclass
class SafetyReport:
    """Outcome of analysing one operation before it runs in a sandbox.

    Attributes:
        risk: The risk tier the operation was classified into.
        reason: Human-readable justification for that classification.
    """

    # Classification result.
    risk: RiskLevel
    # Why the operation received that classification.
    reason: str
class Sandbox:
    """Per-session security sandbox that vets tool calls before they run.

    Each agent session gets its own instance. Production deployments are meant
    to use a Docker container; this class is the simplified local
    implementation and only performs static allow/deny checks.

    Attributes:
        session_id: Identifier of the agent session that owns this sandbox.
        workspace: Root directory the session works in.
        allowed_paths: Directory roots under which file access is permitted.
        blocked_commands: Substrings that cause a command or code snippet to
            be rejected.
    """

    # Keys of ``arguments`` that may carry a file path / executable text.
    # Every key that is present is checked, not just the first one.
    _PATH_KEYS = ("file_path", "path")
    _COMMAND_KEYS = ("command", "code")
    # Scratch directory that is always accessible in addition to allowed_paths.
    _TMP_ROOT = "/tmp"

    def __init__(self, session_id: str, workspace: str = "/workspace"):
        self.session_id = session_id
        self.workspace = workspace
        self.allowed_paths: list[str] = [workspace]
        self.blocked_commands: list[str] = [
            "rm -rf /", "mkfs", "dd if=", "> /dev/sda",
            "shutdown", "reboot", "kill -9 1",
        ]

    @staticmethod
    def _is_within(path: str, root: str) -> bool:
        """Return True when *path* is *root* itself or lies beneath it.

        Both values are normalised lexically first, so ``..`` segments cannot
        be used to climb out of *root*, and the comparison is made on whole
        path components so ``/workspace_evil`` is not treated as being inside
        ``/workspace``. Symlinks are NOT resolved: the paths refer to the
        sandbox filesystem, which may not exist on the host.
        """
        import posixpath

        norm_path = posixpath.normpath(path)
        norm_root = posixpath.normpath(root)
        if norm_path == norm_root:
            return True
        if norm_root == ".":
            # Relative root: accept any relative path that does not escape upward.
            return (
                not posixpath.isabs(norm_path)
                and norm_path.split("/", 1)[0] != ".."
            )
        prefix = norm_root if norm_root.endswith("/") else norm_root + "/"
        return norm_path.startswith(prefix)

    def _path_permitted(self, raw_path: Any) -> bool:
        """Return True when *raw_path* falls under an allowed root or /tmp."""
        import os

        try:
            # Accepts str, bytes and os.PathLike; anything else is denied
            # rather than crashing the permission check.
            path = os.fsdecode(raw_path)
        except TypeError:
            return False
        roots = [*self.allowed_paths, self._TMP_ROOT]
        return any(self._is_within(path, root) for root in roots)

    def _command_blocked(self, raw_command: Any) -> bool:
        """Return True when *raw_command* contains a blacklisted substring."""
        if isinstance(raw_command, (list, tuple)):
            # argv-style commands are flattened so the substring rules apply.
            text = " ".join(str(part) for part in raw_command)
        else:
            text = str(raw_command)
        # Also match with whitespace runs collapsed, so "rm   -rf  /" or
        # tab-separated variants cannot slip past the blacklist.
        collapsed = " ".join(text.split())
        return any(
            blocked in text or blocked in collapsed
            for blocked in self.blocked_commands
        )

    def is_allowed(self, tool_name: str, arguments: dict) -> bool:
        """Check whether a tool call is permitted inside this sandbox.

        Args:
            tool_name: Name of the tool being invoked (currently not used by
                the checks).
            arguments: Tool arguments. ``file_path`` / ``path`` values are
                checked against the allowed roots; ``command`` / ``code``
                values are checked against the command blacklist.

        Returns:
            False if any supplied path lies outside the allowed roots (and
            outside ``/tmp``) or any supplied command/code contains a blocked
            substring; True otherwise.
        """
        for key in self._PATH_KEYS:
            candidate = arguments.get(key)
            if candidate and not self._path_permitted(candidate):
                return False

        for key in self._COMMAND_KEYS:
            text = arguments.get(key)
            if text and self._command_blocked(text):
                return False

        return True

    async def execute_code(self, code: str, language: str = "python") -> Any:
        """Execute code inside the sandbox (placeholder).

        This simplified implementation does not run anything and always
        returns None. The intended production behaviour is to execute inside
        the session's container.
        """
        return None
class SandboxManager:
    """Registry that creates sandboxes on demand and discards them by session id."""

    def __init__(self):
        # Maps session id -> the sandbox created for that session.
        self._sandboxes: dict[str, Sandbox] = {}

    def get_sandbox(self, session_id: str) -> Sandbox:
        """Return the sandbox for *session_id*, creating it on first request."""
        registry = self._sandboxes
        if session_id in registry:
            return registry[session_id]
        created = Sandbox(session_id=session_id)
        registry[session_id] = created
        return created

    def destroy(self, session_id: str):
        """Forget the sandbox for *session_id*; unknown ids are ignored."""
        if session_id in self._sandboxes:
            del self._sandboxes[session_id]
class LLMSafetyAnalyzer:
    """Operation-level safety analysis performed before code is executed.

    The intended production behaviour is to have a lightweight model assess
    every code execution; this implementation is a simplified substitute that
    uses case-insensitive substring rules.
    """

    # Substrings that classify a snippet as DANGEROUS.
    DANGEROUS_PATTERNS = [
        "rm -rf /", "os.system", "subprocess", "eval(", "exec(",
        "shutil.rmtree('/'", "os.remove('/",
    ]

    # Substrings that classify a snippet as MODERATE.
    MODERATE_PATTERNS = [
        "rm ", "os.remove", "os.unlink", "shutil.rmtree",
        "write(", "open(", "mkdir(",
    ]

    @staticmethod
    def _variants(code: Any) -> tuple[str, str]:
        """Return the lower-cased text and a whitespace-collapsed copy of it.

        Matching against the collapsed copy means tabs, newlines or repeated
        spaces between tokens (``rm\\t-rf\\t/``) cannot dodge a pattern that
        is written with single spaces. ``None`` is treated as empty text and
        bytes are decoded as UTF-8 with replacement.
        """
        import re

        if code is None:
            text = ""
        elif isinstance(code, (bytes, bytearray)):
            text = bytes(code).decode("utf-8", errors="replace")
        else:
            text = str(code)
        lowered = text.lower()
        return lowered, re.sub(r"\s+", " ", lowered)

    @staticmethod
    def _first_match(patterns, variants):
        """Return the first pattern found in any of *variants*, else None."""
        for pattern in patterns:
            needle = pattern.lower()
            if any(needle in haystack for haystack in variants):
                return pattern
        return None

    async def analyze(self, code: str) -> SafetyReport:
        """Classify *code* by scanning it for known risky substrings.

        Dangerous patterns are checked before moderate ones, so a snippet
        matching both is reported as DANGEROUS.

        Args:
            code: Source text or shell command about to be executed.

        Returns:
            A SafetyReport whose risk is DANGEROUS, MODERATE or SAFE, with a
            reason describing the classification.
        """
        # Normalise once and reuse for both pattern lists.
        variants = self._variants(code)

        hit = self._first_match(self.DANGEROUS_PATTERNS, variants)
        if hit is not None:
            return SafetyReport(
                risk=RiskLevel.DANGEROUS,
                reason=f"Blocked dangerous operation: {hit}",
            )

        if self._first_match(self.MODERATE_PATTERNS, variants) is not None:
            return SafetyReport(
                risk=RiskLevel.MODERATE,
                reason="Requires confirmation: write/delete operation detected",
            )

        return SafetyReport(risk=RiskLevel.SAFE, reason="No risky operations detected")