Coverage for src / mysingle / dsl / parser.py: 0%

47 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-02 00:58 +0900

1"""RestrictedPython 기반 DSL 파서""" 

2 

3import hashlib 

4import marshal 

5from types import CodeType 

6from typing import Any 

7 

8from RestrictedPython import compile_restricted 

9from RestrictedPython.Guards import ( 

10 guarded_iter_unpack_sequence, 

11 safe_builtins, 

12 safer_getattr, 

13) 

14 

15from mysingle.dsl.errors import DSLCompilationError 

16 

17 

18class DSLParser: 

19 """ 

20 RestrictedPython 기반 DSL 파서 

21 

22 사용자 DSL 코드를 안전하게 컴파일하고 실행 가능한 바이트코드로 변환 

23 """ 

24 

25 # 허용된 builtin 함수 (보안 검증됨) 

26 ALLOWED_BUILTINS = { 

27 # 수학 함수 

28 "abs", 

29 "all", 

30 "any", 

31 "enumerate", 

32 "filter", 

33 "len", 

34 "list", 

35 "map", 

36 "max", 

37 "min", 

38 "range", 

39 "round", 

40 "sorted", 

41 "sum", 

42 "zip", 

43 # 타입 변환 

44 "bool", 

45 "dict", 

46 "float", 

47 "int", 

48 "str", 

49 "tuple", 

50 # 예외 

51 "ValueError", 

52 "TypeError", 

53 "IndexError", 

54 "KeyError", 

55 "AttributeError", 

56 # 기타 

57 "isinstance", 

58 "hasattr", 

59 "getattr", 

60 } 

61 

62 def __init__(self): 

63 """DSL 파서 초기화""" 

64 self._safe_builtins = self._get_safe_builtins() 

65 

66 def parse(self, code: str, filename: str = "<indicator>") -> bytes: 

67 """ 

68 DSL 코드를 바이트코드로 컴파일 

69 

70 Args: 

71 code: DSL 소스 코드 

72 filename: 파일명 (에러 메시지용) 

73 

74 Returns: 

75 bytes: 컴파일된 바이트코드 (marshal로 직렬화됨) 

76 

77 Raises: 

78 DSLCompilationError: 컴파일 실패 시 

79 """ 

80 try: 

81 # RestrictedPython으로 컴파일 

82 result = compile_restricted(code, filename=filename, mode="exec") 

83 

84 # compile_restricted는 CompileResult 또는 code object를 반환할 수 있음 

85 # CompileResult인 경우 errors/warnings 속성 확인 

86 if hasattr(result, "errors") and result.errors: 

87 error_msg = "\n".join(result.errors) 

88 raise DSLCompilationError( 

89 f"Compilation failed with {len(result.errors)} error(s):\n{error_msg}" 

90 ) 

91 

92 if hasattr(result, "warnings") and result.warnings: 

93 # 경고는 로깅만 (컴파일 계속) 

94 import logging 

95 

96 logger = logging.getLogger(__name__) 

97 for warning in result.warnings: 

98 logger.warning(f"Compilation warning: {warning}") 

99 

100 # code object 추출 

101 code_object = result.code if hasattr(result, "code") else result 

102 

103 if code_object is None: 

104 raise DSLCompilationError("Compilation produced no code object") 

105 

106 # marshal을 사용하여 바이트코드로 직렬화 

107 return marshal.dumps(code_object) 

108 

109 except SyntaxError as e: 

110 raise DSLCompilationError(f"Syntax error: {e}") from e 

111 except Exception as e: 

112 raise DSLCompilationError(f"Unexpected compilation error: {e}") from e 

113 

114 def load(self, bytecode: bytes) -> CodeType: 

115 """ 

116 직렬화된 바이트코드를 code object로 로드 

117 

118 Args: 

119 bytecode: marshal로 직렬화된 바이트코드 

120 

121 Returns: 

122 CodeType: 로드된 code object 

123 

124 Raises: 

125 DSLCompilationError: 로드 실패 시 

126 """ 

127 try: 

128 return marshal.loads(bytecode) 

129 except Exception as e: 

130 raise DSLCompilationError(f"Failed to load bytecode: {e}") from e 

131 

132 def get_code_hash(self, code: str) -> str: 

133 """ 

134 코드 해시 생성 (캐싱용) 

135 

136 Args: 

137 code: DSL 소스 코드 

138 

139 Returns: 

140 str: SHA-256 해시 (hex) 

141 """ 

142 return hashlib.sha256(code.encode("utf-8")).hexdigest() 

143 

144 def get_safe_globals(self) -> dict[str, Any]: 

145 """ 

146 안전한 글로벌 네임스페이스 반환 

147 

148 Returns: 

149 dict: 안전한 글로벌 변수 딕셔너리 

150 """ 

151 

152 # 기본 guard 함수들 정의 

153 def _getitem_(obj, index, wrap=True): 

154 """안전한 인덱스 접근""" 

155 return obj[index] 

156 

157 def _getiter_(obj): 

158 """안전한 iterator 접근""" 

159 return iter(obj) 

160 

161 def _write_(obj): 

162 """쓰기 guard (모든 쓰기 허용)""" 

163 return obj 

164 

165 return { 

166 "__builtins__": self._safe_builtins, 

167 "_iter_unpack_sequence_": guarded_iter_unpack_sequence, 

168 "_getattr_": safer_getattr, 

169 "_getitem_": _getitem_, 

170 "_getiter_": _getiter_, 

171 "_write_": _write_, 

172 # NumPy, Pandas는 executor에서 주입 

173 } 

174 

175 def _get_safe_builtins(self) -> dict[str, Any]: 

176 """ 

177 허용된 builtin 함수만 포함한 딕셔너리 생성 

178 

179 Returns: 

180 dict: 안전한 builtin 함수 딕셔너리 

181 """ 

182 return { 

183 name: safe_builtins.get(name) or __builtins__[name] 

184 for name in self.ALLOWED_BUILTINS 

185 if name in safe_builtins or name in __builtins__ 

186 }