Coverage for src / mysingle / dsl / parser.py: 0%
47 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-02 00:58 +0900
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-02 00:58 +0900
1"""RestrictedPython 기반 DSL 파서"""
3import hashlib
4import marshal
5from types import CodeType
6from typing import Any
8from RestrictedPython import compile_restricted
9from RestrictedPython.Guards import (
10 guarded_iter_unpack_sequence,
11 safe_builtins,
12 safer_getattr,
13)
15from mysingle.dsl.errors import DSLCompilationError
18class DSLParser:
19 """
20 RestrictedPython 기반 DSL 파서
22 사용자 DSL 코드를 안전하게 컴파일하고 실행 가능한 바이트코드로 변환
23 """
25 # 허용된 builtin 함수 (보안 검증됨)
26 ALLOWED_BUILTINS = {
27 # 수학 함수
28 "abs",
29 "all",
30 "any",
31 "enumerate",
32 "filter",
33 "len",
34 "list",
35 "map",
36 "max",
37 "min",
38 "range",
39 "round",
40 "sorted",
41 "sum",
42 "zip",
43 # 타입 변환
44 "bool",
45 "dict",
46 "float",
47 "int",
48 "str",
49 "tuple",
50 # 예외
51 "ValueError",
52 "TypeError",
53 "IndexError",
54 "KeyError",
55 "AttributeError",
56 # 기타
57 "isinstance",
58 "hasattr",
59 "getattr",
60 }
62 def __init__(self):
63 """DSL 파서 초기화"""
64 self._safe_builtins = self._get_safe_builtins()
66 def parse(self, code: str, filename: str = "<indicator>") -> bytes:
67 """
68 DSL 코드를 바이트코드로 컴파일
70 Args:
71 code: DSL 소스 코드
72 filename: 파일명 (에러 메시지용)
74 Returns:
75 bytes: 컴파일된 바이트코드 (marshal로 직렬화됨)
77 Raises:
78 DSLCompilationError: 컴파일 실패 시
79 """
80 try:
81 # RestrictedPython으로 컴파일
82 result = compile_restricted(code, filename=filename, mode="exec")
84 # compile_restricted는 CompileResult 또는 code object를 반환할 수 있음
85 # CompileResult인 경우 errors/warnings 속성 확인
86 if hasattr(result, "errors") and result.errors:
87 error_msg = "\n".join(result.errors)
88 raise DSLCompilationError(
89 f"Compilation failed with {len(result.errors)} error(s):\n{error_msg}"
90 )
92 if hasattr(result, "warnings") and result.warnings:
93 # 경고는 로깅만 (컴파일 계속)
94 import logging
96 logger = logging.getLogger(__name__)
97 for warning in result.warnings:
98 logger.warning(f"Compilation warning: {warning}")
100 # code object 추출
101 code_object = result.code if hasattr(result, "code") else result
103 if code_object is None:
104 raise DSLCompilationError("Compilation produced no code object")
106 # marshal을 사용하여 바이트코드로 직렬화
107 return marshal.dumps(code_object)
109 except SyntaxError as e:
110 raise DSLCompilationError(f"Syntax error: {e}") from e
111 except Exception as e:
112 raise DSLCompilationError(f"Unexpected compilation error: {e}") from e
114 def load(self, bytecode: bytes) -> CodeType:
115 """
116 직렬화된 바이트코드를 code object로 로드
118 Args:
119 bytecode: marshal로 직렬화된 바이트코드
121 Returns:
122 CodeType: 로드된 code object
124 Raises:
125 DSLCompilationError: 로드 실패 시
126 """
127 try:
128 return marshal.loads(bytecode)
129 except Exception as e:
130 raise DSLCompilationError(f"Failed to load bytecode: {e}") from e
132 def get_code_hash(self, code: str) -> str:
133 """
134 코드 해시 생성 (캐싱용)
136 Args:
137 code: DSL 소스 코드
139 Returns:
140 str: SHA-256 해시 (hex)
141 """
142 return hashlib.sha256(code.encode("utf-8")).hexdigest()
144 def get_safe_globals(self) -> dict[str, Any]:
145 """
146 안전한 글로벌 네임스페이스 반환
148 Returns:
149 dict: 안전한 글로벌 변수 딕셔너리
150 """
152 # 기본 guard 함수들 정의
153 def _getitem_(obj, index, wrap=True):
154 """안전한 인덱스 접근"""
155 return obj[index]
157 def _getiter_(obj):
158 """안전한 iterator 접근"""
159 return iter(obj)
161 def _write_(obj):
162 """쓰기 guard (모든 쓰기 허용)"""
163 return obj
165 return {
166 "__builtins__": self._safe_builtins,
167 "_iter_unpack_sequence_": guarded_iter_unpack_sequence,
168 "_getattr_": safer_getattr,
169 "_getitem_": _getitem_,
170 "_getiter_": _getiter_,
171 "_write_": _write_,
172 # NumPy, Pandas는 executor에서 주입
173 }
175 def _get_safe_builtins(self) -> dict[str, Any]:
176 """
177 허용된 builtin 함수만 포함한 딕셔너리 생성
179 Returns:
180 dict: 안전한 builtin 함수 딕셔너리
181 """
182 return {
183 name: safe_builtins.get(name) or __builtins__[name]
184 for name in self.ALLOWED_BUILTINS
185 if name in safe_builtins or name in __builtins__
186 }