Coverage for src / mysingle / dsl / validator.py: 0%
56 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-02 00:58 +0900
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-02 00:58 +0900
1"""DSL 코드 정적 분석 및 보안 검증"""
3import ast
5from mysingle.dsl.errors import SecurityViolation
8class SecurityValidator:
9 """
10 DSL 코드 보안 검증기
12 AST 기반 정적 분석으로 금지된 연산 탐지
13 """
15 # 금지된 import 모듈
16 FORBIDDEN_IMPORTS = {
17 # 파일 I/O
18 "os",
19 "sys",
20 "io",
21 "pathlib",
22 "shutil",
23 "tempfile",
24 # 네트워크
25 "socket",
26 "urllib",
27 "urllib3",
28 "requests",
29 "httpx",
30 "aiohttp",
31 "websocket",
32 # 시스템
33 "subprocess",
34 "multiprocessing",
35 "threading",
36 "signal",
37 "resource",
38 # 동적 실행
39 "pickle",
40 "marshal",
41 "shelve",
42 "importlib",
43 "__builtin__",
44 "builtins",
45 # 기타 위험
46 "ctypes",
47 "gc",
48 "inspect",
49 "code",
50 "codeop",
51 }
53 # 금지된 builtin 함수
54 FORBIDDEN_BUILTINS = {
55 "open",
56 "input",
57 "print", # 콘솔 출력 금지 (로깅은 별도 제공)
58 "eval",
59 "exec",
60 "compile",
61 "__import__",
62 "globals",
63 "locals",
64 "vars",
65 "dir",
66 "delattr",
67 "setattr",
68 "help",
69 "breakpoint",
70 "exit",
71 "quit",
72 }
74 # 금지된 속성 접근
75 FORBIDDEN_ATTRIBUTES = {
76 "__class__",
77 "__bases__",
78 "__subclasses__",
79 "__globals__",
80 "__code__",
81 "__closure__",
82 "__dict__",
83 "__module__",
84 }
86 def __init__(self):
87 """SecurityValidator 초기화"""
88 pass
90 def analyze(self, code: str) -> list[SecurityViolation]:
91 """
92 코드 정적 분석 수행
94 Args:
95 code: DSL 소스 코드
97 Returns:
98 list[SecurityViolation]: 발견된 보안 위반 사항 목록
99 """
100 violations: list[SecurityViolation] = []
102 try:
103 tree = ast.parse(code)
104 except SyntaxError as e:
105 violations.append(
106 SecurityViolation(
107 level="ERROR", message=f"Syntax error: {e}", line=e.lineno
108 )
109 )
110 return violations
112 # AST 순회하며 검사
113 for node in ast.walk(tree):
114 # Import 검사
115 if isinstance(node, ast.Import):
116 violations.extend(self._check_import(node))
118 elif isinstance(node, ast.ImportFrom):
119 violations.extend(self._check_import_from(node))
121 # 함수 호출 검사
122 elif isinstance(node, ast.Call):
123 violations.extend(self._check_call(node))
125 # 속성 접근 검사
126 elif isinstance(node, ast.Attribute):
127 violations.extend(self._check_attribute(node))
129 # 클래스 정의 검사 (Phase 1에서는 금지)
130 elif isinstance(node, ast.ClassDef):
131 violations.append(
132 SecurityViolation(
133 level="ERROR",
134 message="Class definition is not allowed in Phase 1",
135 line=node.lineno,
136 )
137 )
139 # async/await 검사 (금지)
140 elif isinstance(node, (ast.AsyncFunctionDef, ast.Await)):
141 violations.append(
142 SecurityViolation(
143 level="ERROR",
144 message="Async/await is not allowed",
145 line=node.lineno,
146 )
147 )
149 return violations
151 def _check_import(self, node: ast.Import) -> list[SecurityViolation]:
152 """Import 문 검사"""
153 violations = []
155 for alias in node.names:
156 if alias.name in self.FORBIDDEN_IMPORTS:
157 violations.append(
158 SecurityViolation(
159 level="ERROR",
160 message=f"Forbidden import: {alias.name}",
161 line=node.lineno,
162 )
163 )
165 return violations
167 def _check_import_from(self, node: ast.ImportFrom) -> list[SecurityViolation]:
168 """ImportFrom 문 검사"""
169 violations = []
171 if node.module and node.module in self.FORBIDDEN_IMPORTS:
172 violations.append(
173 SecurityViolation(
174 level="ERROR",
175 message=f"Forbidden import: {node.module}",
176 line=node.lineno,
177 )
178 )
180 return violations
182 def _check_call(self, node: ast.Call) -> list[SecurityViolation]:
183 """함수 호출 검사"""
184 violations = []
186 # builtin 함수 호출 검사
187 if isinstance(node.func, ast.Name):
188 func_name = node.func.id
189 if func_name in self.FORBIDDEN_BUILTINS:
190 violations.append(
191 SecurityViolation(
192 level="ERROR",
193 message=f"Forbidden builtin function: {func_name}",
194 line=node.lineno,
195 )
196 )
198 return violations
200 def _check_attribute(self, node: ast.Attribute) -> list[SecurityViolation]:
201 """속성 접근 검사"""
202 violations = []
204 if node.attr in self.FORBIDDEN_ATTRIBUTES:
205 violations.append(
206 SecurityViolation(
207 level="WARNING",
208 message=f"Suspicious attribute access: {node.attr}",
209 line=node.lineno,
210 )
211 )
213 return violations
215 def validate(self, code: str) -> tuple[bool, list[SecurityViolation]]:
216 """
217 코드 검증 및 결과 반환
219 Args:
220 code: DSL 소스 코드
222 Returns:
223 tuple[bool, list[SecurityViolation]]:
224 - bool: 검증 통과 여부 (ERROR가 없으면 True)
225 - list: 위반 사항 목록
226 """
227 violations = self.analyze(code)
229 # ERROR 레벨 위반이 있으면 실패
230 has_errors = any(v.level == "ERROR" for v in violations)
232 return not has_errors, violations