Coverage for src / tracekit / api / dsl.py: 99%

224 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Domain-Specific Language (DSL) for signal analysis. 

2 

3This module provides a simple DSL for expressing signal analysis 

4operations in a readable, declarative format. 

5""" 

6 

7from __future__ import annotations 

8 

9import re 

10from dataclasses import dataclass, field 

11from typing import TYPE_CHECKING, Any 

12 

13import numpy as np 

14 

15if TYPE_CHECKING: 

16 from collections.abc import Callable 

17 

18 from numpy.typing import NDArray 

19 

# Names exported by a star-import of this module.
# NOTE(review): DSLExecutor is defined in this module but is not listed
# here -- confirm whether it is meant to stay out of the public API.
__all__ = [
    "DSLExpression",
    "DSLParser",
    "analyze",
    "parse_expression",
]

26 

27 

@dataclass
class DSLExpression:
    """One operation of a parsed DSL pipeline.

    Expressions form a singly linked list: ``chain`` points at the
    operation that follows this one in the pipeline, or is ``None`` for
    the last operation.

    Attributes:
        operation: Operation name
        args: Positional arguments
        kwargs: Keyword arguments
        chain: Chained operation (if any)

    Example:
        >>> expr = DSLExpression(
        ...     operation="fft",
        ...     kwargs={"nfft": 8192}
        ... )

    References:
        API-010: Domain-Specific Language (DSL)
    """

    operation: str
    args: list[Any] = field(default_factory=list)
    kwargs: dict[str, Any] = field(default_factory=dict)
    chain: DSLExpression | None = None

    def to_dict(self) -> dict[str, Any]:
        """Return a nested ``dict`` view of this expression.

        The ``"args"`` and ``"kwargs"`` entries are the expression's own
        containers (not copies). A ``"chain"`` key is present only when a
        chained operation exists, and holds that operation's own
        ``to_dict()`` result.
        """
        payload = {
            "operation": self.operation,
            "args": self.args,
            "kwargs": self.kwargs,
        }
        if not self.chain:
            return payload
        payload["chain"] = self.chain.to_dict()
        return payload

59 

60 

class DSLParser:
    """Recursive-descent parser for the signal analysis DSL.

    Grammar:
        expression := operation | operation '|' expression
        operation := name | name '(' arguments ')'
        arguments := arg | arg ',' arguments
        arg := value | name '=' value
        value := number | string | list | identifier

    Operation names must be members of ``OPERATIONS``. A bare identifier
    used as a value yields ``True``/``False``/``None`` for those three
    spellings and the identifier text as a ``str`` otherwise. An empty
    argument list and a trailing comma before ``)`` or ``]`` are accepted.

    The parser keeps its cursor (``_pos``) and input (``_text``) on the
    instance; every call to :meth:`parse` resets both.

    Example:
        >>> parser = DSLParser()
        >>> expr = parser.parse("lowpass(cutoff=1e6) | fft(nfft=8192)")
        >>> expr.operation
        'lowpass'

    References:
        API-010: Domain-Specific Language (DSL)
    """

    OPERATIONS = {  # noqa: RUF012
        "load",
        "save",
        "export",
        "lowpass",
        "highpass",
        "bandpass",
        "notch",
        "filter",
        "fft",
        "ifft",
        "psd",
        "spectrogram",
        "cwt",
        "mean",
        "std",
        "min",
        "max",
        "rms",
        "peak_to_peak",
        "rise_time",
        "fall_time",
        "frequency",
        "period",
        "threshold",
        "edges",
        "pulses",
        "decode",
        "uart",
        "spi",
        "i2c",
        "can",
        "plot",
        "show",
        "histogram",
        "resample",
        "decimate",
        "interpolate",
        "normalize",
        "zscore",
        "scale",
        "clip",
        "slice",
        "select",
    }

    # Optional sign, integer or decimal digits, optional exponent. Compiled
    # once so value parsing does not rebuild the pattern or slice the input.
    _NUMBER_RE = re.compile(r"[-+]?(\d+\.?\d*|\.\d+)([eE][-+]?\d+)?")

    def __init__(self) -> None:
        """Initialize parser."""
        self._pos = 0
        self._text = ""

    def parse(self, text: str) -> DSLExpression:
        """Parse DSL expression.

        Args:
            text: DSL expression text

        Returns:
            Parsed expression (head of the operation chain)

        Raises:
            ValueError: If the text is not a valid expression, including
                when unparsed input remains after the last operation.
        """
        self._text = text.strip()
        self._pos = 0
        expr = self._parse_chain()

        # The grammar has no production for anything after the final
        # operation, so leftover input is an error rather than ignored.
        self._skip_whitespace()
        if self._pos < len(self._text):
            raise ValueError(
                f"Unexpected input at position {self._pos}: {self._text[self._pos:]!r}"
            )
        return expr

    def _peek(self) -> str:
        """Return the character under the cursor, or ``""`` at end of input."""
        return self._text[self._pos] if self._pos < len(self._text) else ""

    def _parse_chain(self) -> DSLExpression:
        """Parse one operation plus any '|'-chained operations after it."""
        expr = self._parse_operation()

        self._skip_whitespace()
        if self._peek() == "|":
            self._pos += 1
            self._skip_whitespace()
            expr.chain = self._parse_chain()

        return expr

    def _parse_operation(self) -> DSLExpression:
        """Parse a single operation: a known name with optional '(...)'.

        Raises:
            ValueError: If the name is not in ``OPERATIONS`` or the
                argument list is not closed by ')'.
        """
        self._skip_whitespace()

        # Parse operation name
        name = self._parse_identifier()
        if name not in self.OPERATIONS:
            raise ValueError(f"Unknown operation: {name}")

        self._skip_whitespace()

        # Arguments are optional; a bare name means "no arguments".
        args: list[Any] = []
        kwargs: dict[str, Any] = {}

        if self._peek() == "(":
            self._pos += 1  # Skip '('
            args, kwargs = self._parse_arguments()

            if self._peek() != ")":
                raise ValueError("Expected ')'")
            self._pos += 1  # Skip ')'

        return DSLExpression(operation=name, args=args, kwargs=kwargs)

    def _parse_arguments(self) -> tuple[list[Any], dict[str, Any]]:
        """Parse a comma-separated argument list.

        Stops with the cursor on the closing ')' (or at end of input);
        the caller is responsible for consuming ')'.

        Returns:
            Positional values and keyword values, in that order. A
            repeated keyword keeps the last value given.

        Raises:
            ValueError: If two arguments are not separated by ','.
        """
        args: list[Any] = []
        kwargs: dict[str, Any] = {}

        while True:
            self._skip_whitespace()
            if self._peek() in ("", ")"):
                break

            # Speculatively read an identifier: it is a keyword name only
            # when '=' follows; otherwise rewind and parse it as a value.
            start = self._pos
            name = self._try_parse_identifier()

            self._skip_whitespace()
            if name and self._peek() == "=":
                self._pos += 1  # Skip '='
                self._skip_whitespace()
                kwargs[name] = self._parse_value()
            else:
                self._pos = start
                args.append(self._parse_value())

            # After an argument only ',' (more arguments) or ')' / end of
            # input (list finished) is valid.
            self._skip_whitespace()
            if self._peek() in ("", ")"):
                break
            if self._peek() != ",":
                raise ValueError(f"Expected ',' or ')' at position {self._pos}")
            self._pos += 1  # Skip ','

        return args, kwargs

    def _parse_value(self) -> Any:
        """Parse a single value: string, list, number or identifier.

        Raises:
            ValueError: If the input ends where a value is required.
        """
        self._skip_whitespace()

        if self._pos >= len(self._text):
            raise ValueError("Unexpected end of expression")

        char = self._text[self._pos]

        # String
        if char in "\"'":
            return self._parse_string()

        # List
        if char == "[":
            return self._parse_list()

        # Number or identifier
        return self._parse_number_or_identifier()

    def _parse_string(self) -> str:
        """Parse a single- or double-quoted string literal.

        A backslash protects the following character from ending the
        string. Escapes are not decoded: the returned text still contains
        the backslashes exactly as written.

        Raises:
            ValueError: If the closing quote is missing.
        """
        quote = self._text[self._pos]
        self._pos += 1
        start = self._pos

        while self._pos < len(self._text) and self._text[self._pos] != quote:
            if self._text[self._pos] == "\\":
                self._pos += 1  # Skip escape
            self._pos += 1

        if self._pos >= len(self._text):
            raise ValueError("Unterminated string")

        value = self._text[start : self._pos]
        self._pos += 1  # Skip closing quote
        return value

    def _parse_list(self) -> list[Any]:
        """Parse a '[...]' list literal of comma-separated values.

        Raises:
            ValueError: If the list is not closed by ']' or two items are
                not separated by ','.
        """
        self._pos += 1  # Skip '['
        items: list[Any] = []

        while True:
            self._skip_whitespace()

            if self._pos >= len(self._text):
                raise ValueError("Unterminated list")

            if self._text[self._pos] == "]":
                self._pos += 1
                break

            items.append(self._parse_value())

            self._skip_whitespace()
            nxt = self._peek()
            if nxt == ",":
                self._pos += 1
            elif nxt not in ("", "]"):
                # End of input and ']' are handled at the top of the loop;
                # anything else means a separator is missing.
                raise ValueError(f"Expected ',' or ']' at position {self._pos}")

        return items

    def _parse_number_or_identifier(self) -> Any:
        """Parse a number, or else an identifier value.

        Returns:
            ``float`` when the literal has a '.' or an exponent, ``int``
            for other numbers, ``True``/``False``/``None`` for those
            identifiers, and the identifier text for any other identifier.

        Raises:
            ValueError: If neither a number nor an identifier is present.
        """
        match = self._NUMBER_RE.match(self._text, self._pos)

        if match:
            self._pos = match.end()
            value = match.group()
            if "." in value or "e" in value.lower():
                return float(value)
            return int(value)

        # Try identifier (True, False, None)
        ident = self._parse_identifier()
        if ident == "True":
            return True
        elif ident == "False":
            return False
        elif ident == "None":
            return None
        return ident

    def _parse_identifier(self) -> str:
        """Parse an identifier: a letter or '_' followed by alphanumerics or '_'.

        Raises:
            ValueError: If no identifier starts at the cursor.
        """
        start = self._pos

        if self._pos < len(self._text) and (
            self._text[self._pos].isalpha() or self._text[self._pos] == "_"
        ):
            self._pos += 1
            while self._pos < len(self._text) and (
                self._text[self._pos].isalnum() or self._text[self._pos] == "_"
            ):
                self._pos += 1

        if self._pos == start:
            raise ValueError(f"Expected identifier at position {self._pos}")

        return self._text[start : self._pos]

    def _try_parse_identifier(self) -> str | None:
        """Try to parse identifier; on failure restore the cursor and return None."""
        start = self._pos
        try:
            return self._parse_identifier()
        except ValueError:
            self._pos = start
            return None

    def _skip_whitespace(self) -> None:
        """Advance the cursor past any whitespace characters."""
        while self._pos < len(self._text) and self._text[self._pos].isspace():
            self._pos += 1

330 

331 

class DSLExecutor:
    """Executes parsed DSL expressions on NumPy arrays.

    Each registered operation is a callable invoked as
    ``op(data, *expr.args, **expr.kwargs)``. Only the operations set up
    in ``_register_builtins`` are available.

    References:
        API-010: Domain-Specific Language (DSL)
    """

    # When the caller gives no ``fs``, the filters assume a sample rate of
    # this many times the (upper) cutoff. The ratio must exceed 2: a cutoff
    # at or above fs/2 is rejected by ``scipy.signal.butter``.
    _DEFAULT_FS_RATIO = 10

    def __init__(self) -> None:
        """Initialize executor and register the built-in operations."""
        self._operations: dict[str, Callable] = {}  # type: ignore[type-arg]
        self._register_builtins()

    def _register_builtins(self) -> None:
        """Register built-in operations."""
        # Filter operations
        self._operations["lowpass"] = self._lowpass
        self._operations["highpass"] = self._highpass
        self._operations["bandpass"] = self._bandpass

        # Analysis operations
        self._operations["fft"] = self._fft
        self._operations["psd"] = self._psd

        # Measurement operations (take the data only, no extra arguments)
        self._operations["mean"] = lambda data: np.mean(data)
        self._operations["std"] = lambda data: np.std(data)
        self._operations["min"] = lambda data: np.min(data)
        self._operations["max"] = lambda data: np.max(data)
        # Square the magnitude so complex input (e.g. FFT output) yields a
        # real RMS; for real input this equals data**2.
        self._operations["rms"] = lambda data: np.sqrt(np.mean(np.abs(data) ** 2))

        # Transform operations
        self._operations["normalize"] = self._normalize
        self._operations["resample"] = self._resample
        self._operations["slice"] = self._slice

    def execute(self, expr: DSLExpression, data: NDArray[np.float64]) -> Any:
        """Execute DSL expression on data.

        Runs ``expr`` on ``data`` and, if ``expr`` has a chained
        expression, feeds the result into it recursively.

        Args:
            expr: Parsed expression
            data: Input data

        Returns:
            Result of the last operation in the chain

        Raises:
            ValueError: If operation is unknown or result cannot be chained.
        """
        op = self._operations.get(expr.operation)
        if op is None:
            raise ValueError(f"Unknown operation: {expr.operation}")

        result = op(data, *expr.args, **expr.kwargs)

        if not expr.chain:
            return result

        # Only array results can be fed to a following operation.
        if not isinstance(result, np.ndarray):
            raise ValueError(f"Cannot chain after {expr.operation}: result is not an array")
        return self.execute(expr.chain, result)

    def _lowpass(
        self, data: NDArray[np.float64], cutoff: float = 1e6, **kwargs: Any
    ) -> NDArray[np.float64]:
        """Low-pass filter (4th-order Butterworth applied with ``filtfilt``).

        Args:
            data: Input samples
            cutoff: Cutoff frequency, in the same units as ``fs``
            **kwargs: ``fs`` is the sample rate; when omitted it defaults
                to ``_DEFAULT_FS_RATIO * cutoff``. Other keys are ignored.
        """
        from scipy import signal

        fs = kwargs.get("fs", self._DEFAULT_FS_RATIO * cutoff)
        b, a = signal.butter(4, cutoff, btype="low", fs=fs)
        result: NDArray[np.float64] = signal.filtfilt(b, a, data)
        return result

    def _highpass(
        self, data: NDArray[np.float64], cutoff: float = 1e3, **kwargs: Any
    ) -> NDArray[np.float64]:
        """High-pass filter (4th-order Butterworth applied with ``filtfilt``).

        Args:
            data: Input samples
            cutoff: Cutoff frequency, in the same units as ``fs``
            **kwargs: ``fs`` is the sample rate; when omitted it defaults
                to ``_DEFAULT_FS_RATIO * cutoff``. Other keys are ignored.
        """
        from scipy import signal

        fs = kwargs.get("fs", self._DEFAULT_FS_RATIO * cutoff)
        b, a = signal.butter(4, cutoff, btype="high", fs=fs)
        result: NDArray[np.float64] = signal.filtfilt(b, a, data)
        return result

    def _bandpass(
        self,
        data: NDArray[np.float64],
        low: float = 1e3,
        high: float = 1e6,
        **kwargs: Any,
    ) -> NDArray[np.float64]:
        """Band-pass filter (4th-order Butterworth applied with ``filtfilt``).

        Args:
            data: Input samples
            low: Lower band edge, in the same units as ``fs``
            high: Upper band edge, in the same units as ``fs``
            **kwargs: ``fs`` is the sample rate; when omitted it defaults
                to ``_DEFAULT_FS_RATIO * high``. Other keys are ignored.
        """
        from scipy import signal

        fs = kwargs.get("fs", self._DEFAULT_FS_RATIO * high)
        b, a = signal.butter(4, [low, high], btype="band", fs=fs)
        result: NDArray[np.float64] = signal.filtfilt(b, a, data)
        return result

    def _fft(
        self,
        data: NDArray[np.float64],
        nfft: int | None = None,
        **kwargs: Any,
    ) -> NDArray[np.complex128]:
        """FFT of ``data`` via ``np.fft.fft`` with ``n=nfft``; extra kwargs are ignored."""
        return np.fft.fft(data, n=nfft)

    def _psd(
        self,
        data: NDArray[np.float64],
        nperseg: int = 256,
        **kwargs: Any,
    ) -> NDArray[np.float64]:
        """Power spectral density via ``scipy.signal.welch``.

        Only the PSD values are returned; the frequency axis that
        ``welch`` also produces is discarded. Extra kwargs are ignored.
        """
        from scipy import signal

        _, psd_result = signal.welch(data, nperseg=nperseg)
        result: NDArray[np.float64] = psd_result
        return result

    def _normalize(
        self,
        data: NDArray[np.float64],
        method: str = "minmax",
        **kwargs: Any,
    ) -> NDArray[np.float64]:
        """Normalize data.

        Args:
            data: Input samples
            method: ``"minmax"`` rescales to [0, 1]; ``"zscore"`` subtracts
                the mean and divides by the standard deviation. Any other
                value returns ``data`` unchanged.
            **kwargs: Ignored.

        Returns:
            Normalized array. Constant input is returned unchanged for
            ``"minmax"`` and mean-centred only for ``"zscore"``, which
            avoids dividing by zero.
        """
        if method == "minmax":
            data_min = np.min(data)
            data_max = np.max(data)
            if data_max - data_min > 0:
                result: NDArray[np.float64] = (data - data_min) / (data_max - data_min)
                return result
            return data
        elif method == "zscore":
            std = np.std(data)
            if std > 0:
                result_z: NDArray[np.float64] = (data - np.mean(data)) / std
                return result_z
            result_mean: NDArray[np.float64] = data - np.mean(data)
            return result_mean
        return data

    def _resample(
        self,
        data: NDArray[np.float64],
        factor: int = 2,
        **kwargs: Any,
    ) -> NDArray[np.float64]:
        """Resample data to ``len(data) // factor`` samples via ``scipy.signal.resample``."""
        from scipy import signal

        result: NDArray[np.float64] = signal.resample(data, len(data) // factor)
        return result

    def _slice(
        self,
        data: NDArray[np.float64],
        start: int = 0,
        end: int | None = None,
        **kwargs: Any,
    ) -> NDArray[np.float64]:
        """Return ``data[start:end]``; extra kwargs are ignored."""
        return data[start:end]

496 

497 

# Module-level parser and executor shared by parse_expression() and analyze().
# NOTE(review): DSLParser stores its input text and cursor on the instance,
# so concurrent parse calls through this shared parser would presumably
# interfere with each other -- confirm whether callers are single-threaded.
_parser = DSLParser()
_executor = DSLExecutor()

501 

502 

def parse_expression(text: str) -> DSLExpression:
    """Parse DSL text with the module-level parser.

    Args:
        text: DSL expression text

    Returns:
        Parsed expression (the first operation; later operations hang
        off its ``chain`` attribute)

    Raises:
        ValueError: Propagated from the parser when ``text`` is not a
            valid expression.

    Example:
        >>> expr = parse_expression("lowpass(cutoff=1e6) | fft(nfft=8192)")

    References:
        API-010: Domain-Specific Language (DSL)
    """
    parsed = _parser.parse(text)
    return parsed

519 

520 

def analyze(data: NDArray[np.float64], expression: str) -> Any:
    """Parse a DSL expression and run it on data.

    Args:
        data: Input data array
        expression: DSL expression string

    Returns:
        Analysis result: whatever the last operation in the expression
        produces

    Raises:
        ValueError: If the expression cannot be parsed, names an
            operation the executor has not registered, or chains after
            an operation whose result is not an array.

    Example:
        >>> result = analyze(data, "lowpass(cutoff=1e6, fs=1e7) | fft(nfft=8192)")

    References:
        API-010: Domain-Specific Language (DSL)
    """
    return _executor.execute(parse_expression(expression), data)