Coverage for src / tracekit / api / dsl.py: 99%
224 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""Domain-Specific Language (DSL) for signal analysis.
3This module provides a simple DSL for expressing signal analysis
4operations in a readable, declarative format.
5"""
7from __future__ import annotations
9import re
10from dataclasses import dataclass, field
11from typing import TYPE_CHECKING, Any
13import numpy as np
15if TYPE_CHECKING:
16 from collections.abc import Callable
18 from numpy.typing import NDArray
# Names exported by a star-import of this module. DSLExecutor is defined
# below but is not part of this list.
__all__ = [
    "DSLExpression",
    "DSLParser",
    "analyze",
    "parse_expression",
]
@dataclass
class DSLExpression:
    """One stage of a parsed DSL pipeline.

    Stages form a singly linked list: ``chain`` points at the stage that
    follows this one, or is ``None`` for the last stage.

    Attributes:
        operation: Operation name
        args: Positional arguments
        kwargs: Keyword arguments
        chain: Chained operation (if any)

    Example:
        >>> expr = DSLExpression(
        ...     operation="fft",
        ...     kwargs={"nfft": 8192}
        ... )

    References:
        API-010: Domain-Specific Language (DSL)
    """

    operation: str
    args: list[Any] = field(default_factory=list)
    kwargs: dict[str, Any] = field(default_factory=dict)
    chain: DSLExpression | None = None

    def to_dict(self) -> dict[str, Any]:
        """Return this stage, and any chained stages, as nested dictionaries.

        The ``"chain"`` key is present only when a following stage exists.
        ``args`` and ``kwargs`` are referenced, not copied.
        """
        # Build the optional tail first so the literal below keeps the
        # key order operation, args, kwargs, chain.
        tail: dict[str, Any] = {"chain": self.chain.to_dict()} if self.chain else {}
        return {
            "operation": self.operation,
            "args": self.args,
            "kwargs": self.kwargs,
            **tail,
        }
class DSLParser:
    """Hand-written recursive-descent parser for the signal analysis DSL.

    Grammar:
        expression := operation | operation '|' expression
        operation := name | name '(' arguments ')'
        arguments := arg | arg ',' arguments
        arg := value | name '=' value
        value := number | string | list | identifier

    Value conversion:
        * Numbers become ``int`` unless they contain ``.`` or an exponent,
          in which case they become ``float``.
        * The identifiers ``True``, ``False`` and ``None`` become the
          corresponding Python constants; any other bare identifier is
          returned as a ``str``.
        * String literals are returned verbatim between their quotes. A
          backslash only prevents the following character from closing the
          string; escape sequences are NOT decoded and the backslash stays
          in the returned text.

    The parser keeps its cursor in instance attributes, so one instance
    must not be used for two parses at the same time.

    Example:
        >>> parser = DSLParser()
        >>> expr = parser.parse("lowpass(cutoff=1e6) | fft(nfft=8192)")
        >>> print(expr.operation)
        lowpass

    References:
        API-010: Domain-Specific Language (DSL)
    """

    # Names accepted as operations by the parser. Kept as a plain mutable
    # set so existing code that extends it keeps working.
    OPERATIONS = {  # noqa: RUF012
        "load",
        "save",
        "export",
        "lowpass",
        "highpass",
        "bandpass",
        "notch",
        "filter",
        "fft",
        "ifft",
        "psd",
        "spectrogram",
        "cwt",
        "mean",
        "std",
        "min",
        "max",
        "rms",
        "peak_to_peak",
        "rise_time",
        "fall_time",
        "frequency",
        "period",
        "threshold",
        "edges",
        "pulses",
        "decode",
        "uart",
        "spi",
        "i2c",
        "can",
        "plot",
        "show",
        "histogram",
        "resample",
        "decimate",
        "interpolate",
        "normalize",
        "zscore",
        "scale",
        "clip",
        "slice",
        "select",
    }

    # Numeric literal, including optional sign and scientific notation.
    # Compiled once instead of on every value.
    _NUMBER_RE = re.compile(r"[-+]?(\d+\.?\d*|\.\d+)([eE][-+]?\d+)?")

    def __init__(self) -> None:
        """Initialize parser with an empty input buffer."""
        self._pos = 0
        self._text = ""

    def parse(self, text: str) -> DSLExpression:
        """Parse DSL expression.

        Args:
            text: DSL expression text

        Returns:
            Parsed expression (head of the operation chain)

        Raises:
            ValueError: If the text is not a valid expression, including
                when unparsed input remains after the last operation.
        """
        self._text = text.strip()
        self._pos = 0
        expr = self._parse_chain()

        # Anything left over means part of the input would be silently
        # dropped (e.g. a missing '|' between two operations).
        self._skip_whitespace()
        if self._pos < len(self._text):
            raise ValueError(
                f"Unexpected input at position {self._pos}: {self._text[self._pos:]!r}"
            )
        return expr

    def _peek(self) -> str:
        """Return the character under the cursor, or '' at end of input."""
        if self._pos < len(self._text):
            return self._text[self._pos]
        return ""

    def _parse_chain(self) -> DSLExpression:
        """Parse one or more operations separated by '|'.

        Implemented as a loop so pipeline length is not bounded by the
        interpreter's recursion limit.
        """
        head = self._parse_operation()
        tail = head

        self._skip_whitespace()
        while self._peek() == "|":
            self._pos += 1  # Skip '|'
            following = self._parse_operation()
            tail.chain = following
            tail = following
            self._skip_whitespace()

        return head

    def _parse_operation(self) -> DSLExpression:
        """Parse a single operation with its optional argument list.

        Raises:
            ValueError: If the name is not in OPERATIONS or the argument
                list is not closed with ')'.
        """
        self._skip_whitespace()

        name = self._parse_identifier()
        if name not in self.OPERATIONS:
            raise ValueError(f"Unknown operation: {name}")

        self._skip_whitespace()

        args: list[Any] = []
        kwargs: dict[str, Any] = {}

        if self._peek() == "(":
            self._pos += 1  # Skip '('
            args, kwargs = self._parse_arguments()

            if self._peek() != ")":
                raise ValueError("Expected ')'")
            self._pos += 1  # Skip ')'

        return DSLExpression(operation=name, args=args, kwargs=kwargs)

    def _parse_arguments(self) -> tuple[list[Any], dict[str, Any]]:
        """Parse a comma-separated argument list.

        Stops with the cursor on the closing ')' (or at end of input); the
        caller is responsible for consuming ')'. A trailing comma is
        accepted. A repeated keyword overwrites the earlier value.

        Raises:
            ValueError: If two arguments are not separated by ','.
        """
        args: list[Any] = []
        kwargs: dict[str, Any] = {}

        while True:
            self._skip_whitespace()
            if self._peek() in ("", ")"):
                break

            # Speculatively read an identifier; it is a keyword only if it
            # is followed by '='. Otherwise rewind and read a value.
            start = self._pos
            name = self._try_parse_identifier()
            self._skip_whitespace()

            if name and self._peek() == "=":
                self._pos += 1  # Skip '='
                self._skip_whitespace()
                kwargs[name] = self._parse_value()
            else:
                self._pos = start
                args.append(self._parse_value())

            self._skip_whitespace()
            following = self._peek()
            if following in ("", ")"):
                break
            if following != ",":
                # Without this check "slice(10-20)" would quietly become
                # two positional arguments, 10 and -20.
                raise ValueError(f"Expected ',' or ')' at position {self._pos}")
            self._pos += 1  # Skip ','

        return args, kwargs

    def _parse_value(self) -> Any:
        """Parse a string, list, number or bare identifier.

        Raises:
            ValueError: If the input ends where a value is required.
        """
        self._skip_whitespace()

        if self._pos >= len(self._text):
            raise ValueError("Unexpected end of expression")

        char = self._text[self._pos]

        if char in "\"'":
            return self._parse_string()

        if char == "[":
            return self._parse_list()

        return self._parse_number_or_identifier()

    def _parse_string(self) -> str:
        """Parse a quoted string literal and return its raw contents.

        Raises:
            ValueError: If the closing quote is missing.
        """
        quote = self._text[self._pos]
        self._pos += 1
        start = self._pos

        while self._pos < len(self._text) and self._text[self._pos] != quote:
            if self._text[self._pos] == "\\":
                # Step over the escaped character so an escaped quote does
                # not terminate the literal.
                self._pos += 1
            self._pos += 1

        if self._pos >= len(self._text):
            raise ValueError("Unterminated string")

        value = self._text[start : self._pos]
        self._pos += 1  # Skip closing quote
        return value

    def _parse_list(self) -> list[Any]:
        """Parse a '[' ... ']' list literal; a trailing comma is accepted.

        Raises:
            ValueError: If the list is not closed or two items are not
                separated by ','.
        """
        self._pos += 1  # Skip '['
        items: list[Any] = []

        while True:
            self._skip_whitespace()

            if self._pos >= len(self._text):
                raise ValueError("Unterminated list")

            if self._text[self._pos] == "]":
                self._pos += 1
                return items

            items.append(self._parse_value())

            self._skip_whitespace()
            following = self._peek()
            if following == ",":
                self._pos += 1
            elif following not in ("", "]"):
                raise ValueError(f"Expected ',' or ']' at position {self._pos}")

    def _parse_number_or_identifier(self) -> Any:
        """Parse a numeric literal, or fall back to a bare identifier.

        Raises:
            ValueError: If neither a number nor an identifier starts at
                the cursor.
        """
        # Match in place at the cursor rather than slicing the input.
        match = self._NUMBER_RE.match(self._text, self._pos)

        if match:
            self._pos = match.end()
            literal = match.group()
            if "." in literal or "e" in literal.lower():
                return float(literal)
            return int(literal)

        ident = self._parse_identifier()
        if ident == "True":
            return True
        if ident == "False":
            return False
        if ident == "None":
            return None
        return ident

    def _parse_identifier(self) -> str:
        """Parse an identifier: a letter or '_' followed by alphanumerics or '_'.

        Raises:
            ValueError: If no identifier starts at the cursor.
        """
        start = self._pos
        text = self._text
        end = len(text)

        if self._pos < end and (text[self._pos].isalpha() or text[self._pos] == "_"):
            self._pos += 1
            while self._pos < end and (text[self._pos].isalnum() or text[self._pos] == "_"):
                self._pos += 1

        if self._pos == start:
            raise ValueError(f"Expected identifier at position {self._pos}")

        return text[start : self._pos]

    def _try_parse_identifier(self) -> str | None:
        """Parse an identifier, or return None and leave the cursor unmoved."""
        start = self._pos
        try:
            return self._parse_identifier()
        except ValueError:
            self._pos = start
            return None

    def _skip_whitespace(self) -> None:
        """Advance the cursor past any whitespace characters."""
        while self._pos < len(self._text) and self._text[self._pos].isspace():
            self._pos += 1
class DSLExecutor:
    """Executes parsed DSL expressions.

    Each operation is a callable stored in a name -> callable table and is
    invoked as ``op(data, *args, **kwargs)`` with the arguments taken from
    the parsed expression. Only the operations registered in
    ``_register_builtins`` can be executed.

    References:
        API-010: Domain-Specific Language (DSL)
    """

    # Default sample rate, as a multiple of the highest critical frequency,
    # used by the filters when the caller passes no ``fs``. It has to be
    # greater than 2, because scipy.signal.butter requires the critical
    # frequency to lie strictly below fs / 2.
    _DEFAULT_FS_RATIO = 10

    def __init__(self) -> None:
        """Initialize executor and register the built-in operations."""
        self._operations: dict[str, Callable] = {}  # type: ignore[type-arg]
        self._register_builtins()

    def _register_builtins(self) -> None:
        """Register built-in operations."""
        # Filter operations
        self._operations["lowpass"] = self._lowpass
        self._operations["highpass"] = self._highpass
        self._operations["bandpass"] = self._bandpass

        # Analysis operations
        self._operations["fft"] = self._fft
        self._operations["psd"] = self._psd

        # Measurement operations. These stay single-argument lambdas so
        # that DSL arguments are rejected rather than forwarded to NumPy.
        self._operations["mean"] = lambda data: np.mean(data)
        self._operations["std"] = lambda data: np.std(data)
        self._operations["min"] = lambda data: np.min(data)
        self._operations["max"] = lambda data: np.max(data)
        # Squared magnitude, so complex input (e.g. FFT output) yields a
        # real RMS; for real input this equals data**2.
        self._operations["rms"] = lambda data: np.sqrt(np.mean(np.abs(data) ** 2))

        # Transform operations
        self._operations["normalize"] = self._normalize
        self._operations["resample"] = self._resample
        self._operations["slice"] = self._slice

    def execute(self, expr: DSLExpression, data: NDArray[np.float64]) -> Any:
        """Execute DSL expression on data.

        Runs ``expr`` and then each chained stage in order, feeding every
        stage the previous stage's result.

        Args:
            expr: Parsed expression
            data: Input data

        Returns:
            Result of the last stage in the chain

        Raises:
            ValueError: If operation is unknown or result cannot be chained.
        """
        stage = expr
        current: Any = data

        while True:
            op = self._operations.get(stage.operation)
            if op is None:
                raise ValueError(f"Unknown operation: {stage.operation}")

            current = op(current, *stage.args, **stage.kwargs)

            if not stage.chain:
                return current

            # Only arrays can be fed to a following stage.
            if not isinstance(current, np.ndarray):
                raise ValueError(f"Cannot chain after {stage.operation}: result is not an array")
            stage = stage.chain

    def _lowpass(
        self, data: NDArray[np.float64], cutoff: float = 1e6, **kwargs: Any
    ) -> NDArray[np.float64]:
        """Apply a zero-phase 4th-order Butterworth low-pass filter.

        Args:
            data: Input samples
            cutoff: Cutoff frequency, in the same units as ``fs``
            **kwargs: ``fs`` sets the sample rate; when omitted it defaults
                to ``_DEFAULT_FS_RATIO * cutoff``. Other keys are ignored.

        Returns:
            Filtered samples
        """
        from scipy import signal

        fs = kwargs.get("fs", self._DEFAULT_FS_RATIO * cutoff)
        b, a = signal.butter(4, cutoff, btype="low", fs=fs)
        result: NDArray[np.float64] = signal.filtfilt(b, a, data)
        return result

    def _highpass(
        self, data: NDArray[np.float64], cutoff: float = 1e3, **kwargs: Any
    ) -> NDArray[np.float64]:
        """Apply a zero-phase 4th-order Butterworth high-pass filter.

        Args:
            data: Input samples
            cutoff: Cutoff frequency, in the same units as ``fs``
            **kwargs: ``fs`` sets the sample rate; when omitted it defaults
                to ``_DEFAULT_FS_RATIO * cutoff``. Other keys are ignored.

        Returns:
            Filtered samples
        """
        from scipy import signal

        fs = kwargs.get("fs", self._DEFAULT_FS_RATIO * cutoff)
        b, a = signal.butter(4, cutoff, btype="high", fs=fs)
        result: NDArray[np.float64] = signal.filtfilt(b, a, data)
        return result

    def _bandpass(
        self,
        data: NDArray[np.float64],
        low: float = 1e3,
        high: float = 1e6,
        **kwargs: Any,
    ) -> NDArray[np.float64]:
        """Apply a zero-phase 4th-order Butterworth band-pass filter.

        Args:
            data: Input samples
            low: Lower band edge, in the same units as ``fs``
            high: Upper band edge, in the same units as ``fs``
            **kwargs: ``fs`` sets the sample rate; when omitted it defaults
                to ``_DEFAULT_FS_RATIO * high``. Other keys are ignored.

        Returns:
            Filtered samples
        """
        from scipy import signal

        fs = kwargs.get("fs", self._DEFAULT_FS_RATIO * high)
        b, a = signal.butter(4, [low, high], btype="band", fs=fs)
        result: NDArray[np.float64] = signal.filtfilt(b, a, data)
        return result

    def _fft(
        self,
        data: NDArray[np.float64],
        nfft: int | None = None,
        **kwargs: Any,
    ) -> NDArray[np.complex128]:
        """Compute the FFT of ``data`` with ``np.fft.fft``.

        Args:
            data: Input samples
            nfft: Transform length passed as ``n``; None uses len(data)
            **kwargs: Accepted and ignored

        Returns:
            Complex spectrum
        """
        return np.fft.fft(data, n=nfft)

    def _psd(
        self,
        data: NDArray[np.float64],
        nperseg: int = 256,
        **kwargs: Any,
    ) -> NDArray[np.float64]:
        """Estimate power spectral density with Welch's method.

        Only the PSD values are returned; the frequency axis produced by
        ``scipy.signal.welch`` is discarded.

        Args:
            data: Input samples
            nperseg: Segment length passed to ``welch``
            **kwargs: Accepted and ignored (``fs`` is NOT forwarded)

        Returns:
            PSD estimate
        """
        from scipy import signal

        _, psd_result = signal.welch(data, nperseg=nperseg)
        result: NDArray[np.float64] = psd_result
        return result

    def _normalize(
        self,
        data: NDArray[np.float64],
        method: str = "minmax",
        **kwargs: Any,
    ) -> NDArray[np.float64]:
        """Normalize data.

        Args:
            data: Input samples
            method: ``"minmax"`` rescales to [0, 1]; ``"zscore"`` subtracts
                the mean and divides by the standard deviation. Any other
                value returns ``data`` unchanged.
            **kwargs: Accepted and ignored

        Returns:
            Normalized samples. When the range is zero, ``"minmax"``
            returns ``data`` unchanged; when the standard deviation is
            zero, ``"zscore"`` returns the mean-removed data.
        """
        if method == "minmax":
            data_min = np.min(data)
            span = np.max(data) - data_min
            if span > 0:
                scaled: NDArray[np.float64] = (data - data_min) / span
                return scaled
            return data

        if method == "zscore":
            centered: NDArray[np.float64] = data - np.mean(data)
            std = np.std(data)
            if std > 0:
                standardized: NDArray[np.float64] = centered / std
                return standardized
            return centered

        return data

    def _resample(
        self,
        data: NDArray[np.float64],
        factor: int = 2,
        **kwargs: Any,
    ) -> NDArray[np.float64]:
        """Resample data to ``len(data) // factor`` samples.

        Args:
            data: Input samples
            factor: Integer reduction factor
            **kwargs: Accepted and ignored

        Returns:
            Resampled samples from ``scipy.signal.resample``
        """
        from scipy import signal

        result: NDArray[np.float64] = signal.resample(data, len(data) // factor)
        return result

    def _slice(
        self,
        data: NDArray[np.float64],
        start: int = 0,
        end: int | None = None,
        **kwargs: Any,
    ) -> NDArray[np.float64]:
        """Return ``data[start:end]``.

        Args:
            data: Input samples
            start: First index
            end: Stop index (exclusive); None means through the end
            **kwargs: Accepted and ignored
        """
        return data[start:end]
# Module-level parser and executor instances used by parse_expression()
# and analyze(). The parser keeps its cursor in instance attributes, so
# this single shared instance is overwritten by every parse.
_parser = DSLParser()
_executor = DSLExecutor()
def parse_expression(text: str) -> DSLExpression:
    """Parse DSL text with the module-level parser.

    Args:
        text: DSL expression text

    Returns:
        Parsed expression

    Example:
        >>> expr = parse_expression("lowpass(cutoff=1e6) | fft(nfft=8192)")

    References:
        API-010: Domain-Specific Language (DSL)
    """
    parsed = _parser.parse(text)
    return parsed
def analyze(data: NDArray[np.float64], expression: str) -> Any:
    """Parse a DSL expression and run it on ``data``.

    Args:
        data: Input data array
        expression: DSL expression string

    Returns:
        Analysis result

    Example:
        >>> result = analyze(data, "lowpass(cutoff=1e6) | fft(nfft=8192)")

    References:
        API-010: Domain-Specific Language (DSL)
    """
    # Parse first, then hand the expression to the shared executor.
    return _executor.execute(parse_expression(expression), data)