Coverage for src / tracekit / dsl / interpreter.py: 100%

129 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""TraceKit DSL Interpreter. 

2 

3Executes parsed DSL programs. 

4""" 

5 

6from pathlib import Path 

7from typing import Any 

8 

9from tracekit.dsl.parser import ( 

10 Assignment, 

11 Command, 

12 Expression, 

13 ForLoop, 

14 FunctionCall, 

15 Literal, 

16 Pipeline, 

17 Statement, 

18 Variable, 

19 parse_dsl, 

20) 

21 

22 

class InterpreterError(Exception):
    """DSL interpreter error.

    Raised for unknown commands, functions, expression or statement types,
    undefined variables, invalid command arguments, and for built-in
    commands that are not implemented yet.
    """

27 

28 

class Interpreter:
    """DSL interpreter for TraceKit commands.

    Executes parsed AST with Python implementations of DSL commands.

    Attributes:
        variables: Variable environment mapping DSL names to their values.
        commands: Command table mapping DSL command names to callables.
    """

    def __init__(self) -> None:
        """Initialize interpreter with empty environment."""
        self.variables: dict[str, Any] = {}
        self.commands: dict[str, Any] = {}
        self._register_builtin_commands()

    def _register_builtin_commands(self) -> None:
        """Register built-in DSL commands."""
        # Only bound methods are stored here; any tracekit import a command
        # needs happens inside the command itself when it is invoked.
        self.commands.update(
            {
                "load": self._cmd_load,
                "filter": self._cmd_filter,
                "measure": self._cmd_measure,
                "plot": self._cmd_plot,
                "export": self._cmd_export,
                "glob": self._cmd_glob,
            }
        )

    def _cmd_load(self, *args: Any) -> Any:
        """Load command: load "filename".

        Placeholder: validates the argument and the file's existence, then
        always raises because loader integration is not implemented yet.

        Args:
            *args: Exactly one positional argument, the filename string

        Raises:
            InterpreterError: Always; the message names the first failed check
        """
        if len(args) != 1:
            raise InterpreterError("load requires exactly 1 argument: filename")

        filename = args[0]
        if not isinstance(filename, str):
            raise InterpreterError("load filename must be a string")

        # Lazy import to avoid circular dependency. Only the import is guarded,
        # since it is the only statement that can raise ImportError.
        try:
            from tracekit import loaders
        except ImportError as err:
            raise InterpreterError("tracekit.loaders not available") from err

        # Try to determine loader from extension
        path = Path(filename)
        if not path.exists():
            raise InterpreterError(f"File not found: {filename}")

        # This would call appropriate loader based on extension
        # For now, placeholder implementation
        raise InterpreterError(
            "Load command not yet fully implemented - requires loader integration"
        )

    def _cmd_filter(self, trace: Any, *args: Any) -> Any:
        """Filter command: filter lowpass 1000.

        Placeholder: validates the filter type, then always raises.

        Args:
            trace: Piped input (currently unused)
            *args: Filter type string followed by any further arguments

        Raises:
            InterpreterError: Always; the message names the first failed check
        """
        if len(args) < 1:
            raise InterpreterError("filter requires filter type argument")

        filter_type = args[0]
        if not isinstance(filter_type, str):
            raise InterpreterError("filter type must be a string")

        # Placeholder for actual filter implementation
        raise InterpreterError(
            "Filter command not yet fully implemented - requires filter integration"
        )

    def _cmd_measure(self, trace: Any, *args: Any) -> Any:
        """Measure command: measure rise_time.

        Placeholder: validates the measurement name, then always raises.

        Args:
            trace: Piped input (currently unused)
            *args: Measurement name string followed by any further arguments

        Raises:
            InterpreterError: Always; the message names the first failed check
        """
        if len(args) < 1:
            raise InterpreterError("measure requires measurement name")

        measurement = args[0]
        if not isinstance(measurement, str):
            raise InterpreterError("measurement name must be a string")

        # Placeholder for actual measurement implementation
        raise InterpreterError(
            "Measure command not yet fully implemented - requires measurement integration"
        )

    def _cmd_plot(self, trace: Any, *args: Any) -> Any:
        """Plot command: plot.

        Placeholder: always raises.

        Args:
            trace: Piped input (currently unused)
            *args: Further arguments (currently unused)

        Raises:
            InterpreterError: Always
        """
        # Placeholder for actual plot implementation
        raise InterpreterError(
            "Plot command not yet fully implemented - requires visualization integration"
        )

    def _cmd_export(self, data: Any, *args: Any) -> Any:
        """Export command: export json.

        Placeholder: validates the format name, then always raises.

        Args:
            data: Piped input (currently unused)
            *args: Format name string followed by any further arguments

        Raises:
            InterpreterError: Always; the message names the first failed check
        """
        if len(args) < 1:
            raise InterpreterError("export requires format argument")

        format_type = args[0]
        if not isinstance(format_type, str):
            raise InterpreterError("export format must be a string")

        # Placeholder for actual export implementation
        raise InterpreterError(
            "Export command not yet fully implemented - requires export integration"
        )

    def _cmd_glob(self, pattern: str) -> list[str]:
        """Glob command: glob("*.csv").

        Args:
            pattern: Glob pattern passed to ``glob.glob``

        Returns:
            Matching paths as a list of strings

        Raises:
            InterpreterError: If pattern is not a string
        """
        if not isinstance(pattern, str):
            raise InterpreterError("glob pattern must be a string")

        from glob import glob as glob_func

        return list(glob_func(pattern))  # noqa: PTH207

    def eval_expression(self, expr: Expression) -> Any:
        """Evaluate an expression.

        Args:
            expr: Expression AST node

        Returns:
            Evaluated result

        Raises:
            InterpreterError: On evaluation errors
        """
        # Literal value
        if isinstance(expr, Literal):
            return expr.value

        # Variable reference
        if isinstance(expr, Variable):
            if expr.name not in self.variables:
                raise InterpreterError(f"Undefined variable: {expr.name} at line {expr.line}")
            return self.variables[expr.name]

        # Function call
        if isinstance(expr, FunctionCall):
            return self.eval_function_call(expr)

        # Command used as an expression has no piped input
        if isinstance(expr, Command):
            return self.eval_command(expr, None)

        # Pipeline
        if isinstance(expr, Pipeline):
            return self.eval_pipeline(expr)

        raise InterpreterError(f"Unknown expression type: {type(expr).__name__}")

    def eval_function_call(self, func: FunctionCall) -> Any:
        """Evaluate function call.

        Args:
            func: FunctionCall AST node

        Returns:
            Result of the callable registered under ``func.name``

        Raises:
            InterpreterError: If the function is unknown
        """
        if func.name not in self.commands:
            raise InterpreterError(f"Unknown function: {func.name} at line {func.line}")

        # Evaluate arguments
        args = [self.eval_expression(arg) for arg in func.args]

        # Call command function
        return self.commands[func.name](*args)

    def eval_command(self, cmd: Command, input_data: Any | None) -> Any:
        """Evaluate command with optional piped input.

        Args:
            cmd: Command AST node
            input_data: Input from previous pipeline stage (or None)

        Returns:
            Command result

        Raises:
            InterpreterError: If command is unknown
        """
        if cmd.name not in self.commands:
            raise InterpreterError(f"Unknown command: {cmd.name} at line {cmd.line}")

        # Evaluate arguments
        args = [self.eval_expression(arg) for arg in cmd.args]

        # If there's input data, prepend it as first argument.
        # NOTE: None doubles as "no input", so a previous stage that produced
        # None passes nothing on to this command.
        if input_data is not None:
            args = [input_data, *args]

        # Call command function
        return self.commands[cmd.name](*args)

    def eval_pipeline(self, pipeline: Pipeline) -> Any:
        """Evaluate pipeline of commands.

        Args:
            pipeline: Pipeline AST node

        Returns:
            Final pipeline result (None when the pipeline has no stages)

        Raises:
            InterpreterError: If pipeline stage is invalid
        """
        result = None

        for i, stage in enumerate(pipeline.stages):
            if i == 0:
                # First stage - no input; it may be any expression
                if isinstance(stage, Command):
                    result = self.eval_command(stage, None)
                else:
                    result = self.eval_expression(stage)
            # Subsequent stages - pipe input from previous
            elif isinstance(stage, Command):
                result = self.eval_command(stage, result)
            else:
                raise InterpreterError(
                    f"Pipeline stage {i + 1} must be a command, got {type(stage).__name__}"
                )

        return result

    def eval_statement(self, stmt: Statement) -> None:
        """Execute a statement.

        Args:
            stmt: Statement AST node

        Raises:
            InterpreterError: On execution errors
        """
        # Assignment
        if isinstance(stmt, Assignment):
            value = self.eval_expression(stmt.expression)
            self.variables[stmt.variable] = value
            return

        # For loop
        if isinstance(stmt, ForLoop):
            self.eval_for_loop(stmt)
            return

        # Expression statement (pipeline)
        if isinstance(stmt, Pipeline):
            self.eval_pipeline(stmt)
            return

        # Expression statements (function calls, commands)
        # These can appear as statements in for loop bodies
        if isinstance(stmt, FunctionCall | Command):  # type: ignore[unreachable]
            self.eval_expression(stmt)
            return

        raise InterpreterError(f"Unknown statement type: {type(stmt).__name__}")

    def eval_for_loop(self, loop: ForLoop) -> None:
        """Execute for loop.

        The loop variable is stored in ``self.variables`` and stays bound to
        the last item after the loop finishes.

        Args:
            loop: ForLoop AST node

        Raises:
            InterpreterError: If iterable is not iterable
        """
        # Evaluate iterable
        iterable = self.eval_expression(loop.iterable)

        # iter() accepts both __iter__ and the __getitem__ sequence protocol,
        # which a bare hasattr(..., "__iter__") check would reject.
        try:
            iterator = iter(iterable)
        except TypeError:
            raise InterpreterError(
                f"For loop iterable is not iterable: {type(iterable).__name__} at line {loop.line}"
            ) from None

        # Execute body for each item
        for item in iterator:
            # Set loop variable
            self.variables[loop.variable] = item

            # Execute body statements
            for stmt in loop.body:
                self.eval_statement(stmt)

    def execute(self, statements: list[Statement]) -> None:
        """Execute a program (list of statements).

        Args:
            statements: AST (list of statements)
        """
        for stmt in statements:
            self.eval_statement(stmt)

    def execute_source(self, source: str) -> None:
        """Parse and execute DSL source code.

        Args:
            source: DSL source code
        """
        ast = parse_dsl(source)
        self.execute(ast)

316 

317 

def execute_dsl(source: str, variables: dict[str, Any] | None = None) -> dict[str, Any]:
    """Run a TraceKit DSL program and hand back the resulting variables.

    Args:
        source: DSL source code
        variables: Optional initial variables; entries are copied into the
            interpreter's environment before execution

    Returns:
        The interpreter's variable environment once execution has finished
    """
    runner = Interpreter()

    # A missing or empty mapping leaves the fresh environment untouched.
    runner.variables.update(variables or {})

    runner.execute_source(source)
    return runner.variables