Coverage for src / tracekit / dsl / interpreter.py: 100%
129 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""TraceKit DSL Interpreter.
3Executes parsed DSL programs.
4"""
6from pathlib import Path
7from typing import Any
9from tracekit.dsl.parser import (
10 Assignment,
11 Command,
12 Expression,
13 ForLoop,
14 FunctionCall,
15 Literal,
16 Pipeline,
17 Statement,
18 Variable,
19 parse_dsl,
20)
class InterpreterError(Exception):
    """Raised when a DSL program cannot be evaluated or executed."""
class Interpreter:
    """DSL interpreter for TraceKit commands.

    Walks a parsed AST and evaluates it against two tables held on the
    instance:

    * ``variables`` - the variable environment, keyed by DSL variable name.
    * ``commands`` - the command table, keyed by DSL command/function name,
      whose values are the Python callables implementing each command.
    """

    def __init__(self) -> None:
        """Initialize interpreter with empty environment."""
        self.variables: dict[str, Any] = {}
        self.commands: dict[str, Any] = {}
        self._register_builtin_commands()

    def _register_builtin_commands(self) -> None:
        """Register built-in DSL commands in ``self.commands``."""
        # Only bound methods are stored here; nothing is imported at
        # registration time. Handlers that need extra modules (load, glob)
        # import them when they are called.
        self.commands["load"] = self._cmd_load
        self.commands["filter"] = self._cmd_filter
        self.commands["measure"] = self._cmd_measure
        self.commands["plot"] = self._cmd_plot
        self.commands["export"] = self._cmd_export
        self.commands["glob"] = self._cmd_glob

    def _cmd_load(self, *args: Any) -> Any:
        """Load command: load "filename".

        Args:
            *args: Exactly one string, the path of the file to load.

        Returns:
            Nothing at present: every code path raises.

        Raises:
            InterpreterError: If the argument count or type is wrong, if
                ``tracekit.loaders`` cannot be imported, if the file does
                not exist, or - when all checks pass - because loading is
                not implemented yet.
        """
        if len(args) != 1:
            raise InterpreterError("load requires exactly 1 argument: filename")

        filename = args[0]
        if not isinstance(filename, str):
            raise InterpreterError("load filename must be a string")

        # Lazy import to avoid circular dependency. Only the import itself is
        # guarded, and the ImportError is chained so the root cause is kept
        # in the traceback.
        try:
            from tracekit import loaders
        except ImportError as err:
            raise InterpreterError("tracekit.loaders not available") from err

        if not Path(filename).exists():
            raise InterpreterError(f"File not found: {filename}")

        # This would call appropriate loader based on extension.
        # For now, placeholder implementation.
        raise InterpreterError(
            "Load command not yet fully implemented - requires loader integration"
        )

    def _cmd_filter(self, trace: Any, *args: Any) -> Any:
        """Filter command: filter lowpass 1000.

        Args:
            trace: First positional value (the piped input when the command
                runs as a later pipeline stage).
            *args: Filter type (a string) followed by any further arguments.

        Raises:
            InterpreterError: If the filter type is missing or not a string,
                or - when validation passes - because filtering is not
                implemented yet.
        """
        if len(args) < 1:
            raise InterpreterError("filter requires filter type argument")

        filter_type = args[0]
        if not isinstance(filter_type, str):
            raise InterpreterError("filter type must be a string")

        # Placeholder for actual filter implementation
        raise InterpreterError(
            "Filter command not yet fully implemented - requires filter integration"
        )

    def _cmd_measure(self, trace: Any, *args: Any) -> Any:
        """Measure command: measure rise_time.

        Args:
            trace: First positional value (the piped input when the command
                runs as a later pipeline stage).
            *args: Measurement name (a string) followed by any further
                arguments.

        Raises:
            InterpreterError: If the measurement name is missing or not a
                string, or - when validation passes - because measuring is
                not implemented yet.
        """
        if len(args) < 1:
            raise InterpreterError("measure requires measurement name")

        measurement = args[0]
        if not isinstance(measurement, str):
            raise InterpreterError("measurement name must be a string")

        # Placeholder for actual measurement implementation
        raise InterpreterError(
            "Measure command not yet fully implemented - requires measurement integration"
        )

    def _cmd_plot(self, trace: Any, *args: Any) -> Any:
        """Plot command: plot.

        Args:
            trace: First positional value (the piped input when the command
                runs as a later pipeline stage).
            *args: Further arguments; currently ignored.

        Raises:
            InterpreterError: Always, because plotting is not implemented
                yet.
        """
        # Placeholder for actual plot implementation
        raise InterpreterError(
            "Plot command not yet fully implemented - requires visualization integration"
        )

    def _cmd_export(self, data: Any, *args: Any) -> Any:
        """Export command: export json.

        Args:
            data: First positional value (the piped input when the command
                runs as a later pipeline stage).
            *args: Export format (a string) followed by any further
                arguments.

        Raises:
            InterpreterError: If the format is missing or not a string, or -
                when validation passes - because exporting is not
                implemented yet.
        """
        if len(args) < 1:
            raise InterpreterError("export requires format argument")

        format_type = args[0]
        if not isinstance(format_type, str):
            raise InterpreterError("export format must be a string")

        # Placeholder for actual export implementation
        raise InterpreterError(
            "Export command not yet fully implemented - requires export integration"
        )

    def _cmd_glob(self, pattern: str) -> list[str]:
        """Glob command: glob("*.csv").

        Args:
            pattern: Glob pattern passed to :func:`glob.glob`.

        Returns:
            List of matching path strings, in the order ``glob.glob``
            yields them.

        Raises:
            InterpreterError: If ``pattern`` is not a string.
        """
        if not isinstance(pattern, str):
            raise InterpreterError("glob pattern must be a string")

        from glob import glob as glob_func

        return list(glob_func(pattern))  # noqa: PTH207

    def eval_expression(self, expr: Expression) -> Any:
        """Evaluate an expression.

        Args:
            expr: Expression AST node

        Returns:
            Evaluated result

        Raises:
            InterpreterError: On evaluation errors
        """
        # Literal value
        if isinstance(expr, Literal):
            return expr.value

        # Variable reference
        if isinstance(expr, Variable):
            if expr.name not in self.variables:
                raise InterpreterError(f"Undefined variable: {expr.name} at line {expr.line}")
            return self.variables[expr.name]

        # Function call
        if isinstance(expr, FunctionCall):
            return self.eval_function_call(expr)

        # Command evaluated on its own: there is no piped input
        if isinstance(expr, Command):
            return self.eval_command(expr, None)

        # Pipeline
        if isinstance(expr, Pipeline):
            return self.eval_pipeline(expr)

        raise InterpreterError(f"Unknown expression type: {type(expr).__name__}")

    def eval_function_call(self, func: FunctionCall) -> Any:
        """Evaluate function call.

        Args:
            func: FunctionCall AST node

        Returns:
            Whatever the registered callable returns

        Raises:
            InterpreterError: If no command is registered under ``func.name``
        """
        if func.name not in self.commands:
            raise InterpreterError(f"Unknown function: {func.name} at line {func.line}")

        # Evaluate arguments
        args = [self.eval_expression(arg) for arg in func.args]

        # Call command function
        return self.commands[func.name](*args)

    def eval_command(self, cmd: Command, input_data: Any | None) -> Any:
        """Evaluate command with optional piped input.

        Args:
            cmd: Command AST node
            input_data: Input from previous pipeline stage (or None). A value
                of None is treated as "no input" and is not passed on.

        Returns:
            Command result

        Raises:
            InterpreterError: If command is unknown
        """
        if cmd.name not in self.commands:
            raise InterpreterError(f"Unknown command: {cmd.name} at line {cmd.line}")

        # Evaluate arguments
        args = [self.eval_expression(arg) for arg in cmd.args]

        # If there's input data, prepend it as first argument
        if input_data is not None:
            args = [input_data, *args]

        # Call command function
        return self.commands[cmd.name](*args)

    def eval_pipeline(self, pipeline: Pipeline) -> Any:
        """Evaluate pipeline of commands.

        The first stage runs without input; each later stage must be a
        command and receives the previous stage's result.

        Args:
            pipeline: Pipeline AST node

        Returns:
            Final pipeline result (None if the pipeline has no stages)

        Raises:
            InterpreterError: If pipeline stage is invalid
        """
        result = None

        for i, stage in enumerate(pipeline.stages):
            if i == 0:
                # First stage - no input
                if isinstance(stage, Command):
                    result = self.eval_command(stage, None)
                else:
                    result = self.eval_expression(stage)
            # Subsequent stages - pipe input from previous
            elif isinstance(stage, Command):
                result = self.eval_command(stage, result)
            else:
                raise InterpreterError(
                    f"Pipeline stage {i + 1} must be a command, got {type(stage).__name__}"
                )

        return result

    def eval_statement(self, stmt: Statement) -> None:
        """Execute a statement.

        Args:
            stmt: Statement AST node

        Raises:
            InterpreterError: On execution errors
        """
        # Assignment
        if isinstance(stmt, Assignment):
            value = self.eval_expression(stmt.expression)
            self.variables[stmt.variable] = value
            return

        # For loop
        if isinstance(stmt, ForLoop):
            self.eval_for_loop(stmt)
            return

        # Expression statement (pipeline)
        if isinstance(stmt, Pipeline):
            self.eval_pipeline(stmt)
            return

        # Expression statements (function calls, commands)
        # These can appear as statements in for loop bodies
        if isinstance(stmt, FunctionCall | Command):  # type: ignore[unreachable]
            self.eval_expression(stmt)
            return

        raise InterpreterError(f"Unknown statement type: {type(stmt).__name__}")

    def eval_for_loop(self, loop: ForLoop) -> None:
        """Execute for loop.

        The loop variable is written into ``self.variables`` and is left
        bound to the last item after the loop finishes.

        Args:
            loop: ForLoop AST node

        Raises:
            InterpreterError: If iterable is not iterable
        """
        # Evaluate iterable
        iterable = self.eval_expression(loop.iterable)

        # Ask Python for an iterator instead of probing for ``__iter__``:
        # this also accepts sequence-protocol objects (``__getitem__`` only)
        # and reports objects that cannot be iterated as a DSL error.
        try:
            items = iter(iterable)
        except TypeError:
            raise InterpreterError(
                f"For loop iterable is not iterable: {type(iterable).__name__} at line {loop.line}"
            ) from None

        # Execute body for each item
        for item in items:
            # Set loop variable
            self.variables[loop.variable] = item

            # Execute body statements
            for stmt in loop.body:
                self.eval_statement(stmt)

    def execute(self, statements: list[Statement]) -> None:
        """Execute a program (list of statements).

        Args:
            statements: AST (list of statements)
        """
        for stmt in statements:
            self.eval_statement(stmt)

    def execute_source(self, source: str) -> None:
        """Parse and execute DSL source code.

        Args:
            source: DSL source code
        """
        ast = parse_dsl(source)
        self.execute(ast)
def execute_dsl(source: str, variables: dict[str, Any] | None = None) -> dict[str, Any]:
    """Run a TraceKit DSL program in a fresh interpreter.

    Args:
        source: DSL source code
        variables: Optional mapping of names to values used to seed the
            interpreter's environment before execution

    Returns:
        The interpreter's variable environment once the program has run
    """
    runtime = Interpreter()

    # Seed the environment; an absent or empty mapping contributes nothing.
    seed = variables or {}
    runtime.variables.update(seed)

    runtime.execute_source(source)
    return runtime.variables