Coverage for src / mysingle / dsl / validator.py: 0%

56 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-02 00:58 +0900

1"""DSL 코드 정적 분석 및 보안 검증""" 

2 

3import ast 

4 

5from mysingle.dsl.errors import SecurityViolation 

6 

7 

8class SecurityValidator: 

9 """ 

10 DSL 코드 보안 검증기 

11 

12 AST 기반 정적 분석으로 금지된 연산 탐지 

13 """ 

14 

15 # 금지된 import 모듈 

16 FORBIDDEN_IMPORTS = { 

17 # 파일 I/O 

18 "os", 

19 "sys", 

20 "io", 

21 "pathlib", 

22 "shutil", 

23 "tempfile", 

24 # 네트워크 

25 "socket", 

26 "urllib", 

27 "urllib3", 

28 "requests", 

29 "httpx", 

30 "aiohttp", 

31 "websocket", 

32 # 시스템 

33 "subprocess", 

34 "multiprocessing", 

35 "threading", 

36 "signal", 

37 "resource", 

38 # 동적 실행 

39 "pickle", 

40 "marshal", 

41 "shelve", 

42 "importlib", 

43 "__builtin__", 

44 "builtins", 

45 # 기타 위험 

46 "ctypes", 

47 "gc", 

48 "inspect", 

49 "code", 

50 "codeop", 

51 } 

52 

53 # 금지된 builtin 함수 

54 FORBIDDEN_BUILTINS = { 

55 "open", 

56 "input", 

57 "print", # 콘솔 출력 금지 (로깅은 별도 제공) 

58 "eval", 

59 "exec", 

60 "compile", 

61 "__import__", 

62 "globals", 

63 "locals", 

64 "vars", 

65 "dir", 

66 "delattr", 

67 "setattr", 

68 "help", 

69 "breakpoint", 

70 "exit", 

71 "quit", 

72 } 

73 

74 # 금지된 속성 접근 

75 FORBIDDEN_ATTRIBUTES = { 

76 "__class__", 

77 "__bases__", 

78 "__subclasses__", 

79 "__globals__", 

80 "__code__", 

81 "__closure__", 

82 "__dict__", 

83 "__module__", 

84 } 

85 

86 def __init__(self): 

87 """SecurityValidator 초기화""" 

88 pass 

89 

90 def analyze(self, code: str) -> list[SecurityViolation]: 

91 """ 

92 코드 정적 분석 수행 

93 

94 Args: 

95 code: DSL 소스 코드 

96 

97 Returns: 

98 list[SecurityViolation]: 발견된 보안 위반 사항 목록 

99 """ 

100 violations: list[SecurityViolation] = [] 

101 

102 try: 

103 tree = ast.parse(code) 

104 except SyntaxError as e: 

105 violations.append( 

106 SecurityViolation( 

107 level="ERROR", message=f"Syntax error: {e}", line=e.lineno 

108 ) 

109 ) 

110 return violations 

111 

112 # AST 순회하며 검사 

113 for node in ast.walk(tree): 

114 # Import 검사 

115 if isinstance(node, ast.Import): 

116 violations.extend(self._check_import(node)) 

117 

118 elif isinstance(node, ast.ImportFrom): 

119 violations.extend(self._check_import_from(node)) 

120 

121 # 함수 호출 검사 

122 elif isinstance(node, ast.Call): 

123 violations.extend(self._check_call(node)) 

124 

125 # 속성 접근 검사 

126 elif isinstance(node, ast.Attribute): 

127 violations.extend(self._check_attribute(node)) 

128 

129 # 클래스 정의 검사 (Phase 1에서는 금지) 

130 elif isinstance(node, ast.ClassDef): 

131 violations.append( 

132 SecurityViolation( 

133 level="ERROR", 

134 message="Class definition is not allowed in Phase 1", 

135 line=node.lineno, 

136 ) 

137 ) 

138 

139 # async/await 검사 (금지) 

140 elif isinstance(node, (ast.AsyncFunctionDef, ast.Await)): 

141 violations.append( 

142 SecurityViolation( 

143 level="ERROR", 

144 message="Async/await is not allowed", 

145 line=node.lineno, 

146 ) 

147 ) 

148 

149 return violations 

150 

151 def _check_import(self, node: ast.Import) -> list[SecurityViolation]: 

152 """Import 문 검사""" 

153 violations = [] 

154 

155 for alias in node.names: 

156 if alias.name in self.FORBIDDEN_IMPORTS: 

157 violations.append( 

158 SecurityViolation( 

159 level="ERROR", 

160 message=f"Forbidden import: {alias.name}", 

161 line=node.lineno, 

162 ) 

163 ) 

164 

165 return violations 

166 

167 def _check_import_from(self, node: ast.ImportFrom) -> list[SecurityViolation]: 

168 """ImportFrom 문 검사""" 

169 violations = [] 

170 

171 if node.module and node.module in self.FORBIDDEN_IMPORTS: 

172 violations.append( 

173 SecurityViolation( 

174 level="ERROR", 

175 message=f"Forbidden import: {node.module}", 

176 line=node.lineno, 

177 ) 

178 ) 

179 

180 return violations 

181 

182 def _check_call(self, node: ast.Call) -> list[SecurityViolation]: 

183 """함수 호출 검사""" 

184 violations = [] 

185 

186 # builtin 함수 호출 검사 

187 if isinstance(node.func, ast.Name): 

188 func_name = node.func.id 

189 if func_name in self.FORBIDDEN_BUILTINS: 

190 violations.append( 

191 SecurityViolation( 

192 level="ERROR", 

193 message=f"Forbidden builtin function: {func_name}", 

194 line=node.lineno, 

195 ) 

196 ) 

197 

198 return violations 

199 

200 def _check_attribute(self, node: ast.Attribute) -> list[SecurityViolation]: 

201 """속성 접근 검사""" 

202 violations = [] 

203 

204 if node.attr in self.FORBIDDEN_ATTRIBUTES: 

205 violations.append( 

206 SecurityViolation( 

207 level="WARNING", 

208 message=f"Suspicious attribute access: {node.attr}", 

209 line=node.lineno, 

210 ) 

211 ) 

212 

213 return violations 

214 

215 def validate(self, code: str) -> tuple[bool, list[SecurityViolation]]: 

216 """ 

217 코드 검증 및 결과 반환 

218 

219 Args: 

220 code: DSL 소스 코드 

221 

222 Returns: 

223 tuple[bool, list[SecurityViolation]]: 

224 - bool: 검증 통과 여부 (ERROR가 없으면 True) 

225 - list: 위반 사항 목록 

226 """ 

227 violations = self.analyze(code) 

228 

229 # ERROR 레벨 위반이 있으면 실패 

230 has_errors = any(v.level == "ERROR" for v in violations) 

231 

232 return not has_errors, violations