Coverage for src / tracekit / dsl / parser.py: 94%

341 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""TraceKit DSL Parser. 

2 

3Implements a simple domain-specific language for trace analysis workflows.

4""" 

5 

6from dataclasses import dataclass 

7from enum import Enum, auto 

8from typing import Any, Union 

9 

10 

class TokenType(Enum):
    """Token types for DSL lexer.

    Member order is significant: ``auto()`` assigns values in definition
    order, so do not reorder members.
    """

    # Literals
    STRING = auto()  # quoted string literal, e.g. "file.wfm"
    NUMBER = auto()  # int or float literal
    VARIABLE = auto()  # $name reference (value includes the leading "$")
    IDENTIFIER = auto()  # bare non-keyword name

    # Operators
    PIPE = auto()  # "|" pipeline separator
    ASSIGN = auto()  # "=" assignment
    COMMA = auto()  # "," argument separator

    # Keywords (the lexer matches these case-insensitively)
    LOAD = auto()
    FILTER = auto()
    MEASURE = auto()
    PLOT = auto()
    EXPORT = auto()
    FOR = auto()
    IN = auto()
    GLOB = auto()

    # Structural
    LPAREN = auto()
    RPAREN = auto()
    COLON = auto()
    NEWLINE = auto()  # statement terminator (newlines are significant)
    INDENT = auto()  # increase in leading indentation (Python-style blocks)
    DEDENT = auto()  # decrease in leading indentation
    EOF = auto()  # end-of-input sentinel

43 

44 

@dataclass
class Token:
    """Lexical token.

    Pairs a token category with its literal payload and the 1-indexed
    source position where the token starts (used for error messages).
    """

    type: TokenType  # token category
    value: Any  # payload: str, int, float, or None (EOF)
    line: int  # 1-indexed source line where the token starts
    column: int  # 1-indexed source column where the token starts

53 

54 

class Lexer:
    """Tokenizer for TraceKit DSL.

    Breaks input text into tokens for parsing.
    Supports indentation-based block structure (Python-style): leading
    indentation changes are emitted as INDENT/DEDENT tokens.
    """

    KEYWORDS = {  # noqa: RUF012
        "load": TokenType.LOAD,
        "filter": TokenType.FILTER,
        "measure": TokenType.MEASURE,
        "plot": TokenType.PLOT,
        "export": TokenType.EXPORT,
        "for": TokenType.FOR,
        "in": TokenType.IN,
        "glob": TokenType.GLOB,
    }

    def __init__(self, text: str):
        """Initialize lexer with input text.

        Args:
            text: DSL source code
        """
        self.text = text
        self.pos = 0
        self.line = 1
        self.column = 1
        self.tokens: list[Token] = []
        # Indentation tracking: stack of active indent widths, base level 0.
        self.indent_stack: list[int] = [0]
        self.at_line_start = True

    def current_char(self) -> str | None:
        """Get current character without advancing, or None at end of input."""
        if self.pos >= len(self.text):
            return None
        return self.text[self.pos]

    def peek_char(self, offset: int = 1) -> str | None:
        """Peek ahead at a character, or None past end of input."""
        pos = self.pos + offset
        if pos >= len(self.text):
            return None
        return self.text[pos]

    def advance(self) -> None:
        """Advance one character, updating line/column bookkeeping."""
        if self.pos < len(self.text) and self.text[self.pos] == "\n":
            self.line += 1
            self.column = 1
            self.at_line_start = True
        else:
            self.column += 1
        self.pos += 1

    def skip_whitespace(self) -> None:
        """Skip whitespace except newlines (newlines are significant)."""
        while (ch := self.current_char()) and ch in " \t\r":
            self.advance()

    def skip_comment(self) -> None:
        """Skip a # comment up to (but not including) the end of line."""
        if self.current_char() == "#":
            while self.current_char() and self.current_char() != "\n":
                self.advance()

    def measure_indent(self) -> int:
        """Measure indentation at current position (after newline).

        Returns:
            Number of spaces of indentation (tabs count as 4 spaces),
            or -1 for blank/comment-only lines (position is rewound so
            such lines never affect the indentation stack).
        """
        indent = 0
        start_pos = self.pos

        while (ch := self.current_char()) and ch in " \t":
            if ch == " ":
                indent += 1
            else:
                indent += 4  # Tab = 4 spaces
            self.pos += 1
            self.column += 1

        # Check if rest of line is blank or comment
        if self.current_char() == "#" or self.current_char() == "\n" or self.current_char() is None:
            # Blank line or comment-only line - reset position and return -1
            self.pos = start_pos
            self.column = 1
            return -1  # Signal to ignore this line for indentation

        return indent

    def read_string(self) -> str:
        """Read a quoted string literal (single- or double-quoted).

        Supports simple backslash escapes (\\n, \\t, \\r, \\\\, quotes);
        an unknown escape keeps the escaped character as-is.

        Raises:
            SyntaxError: If the string is not terminated before end of input.
        """
        quote_char = self.current_char()
        self.advance()  # Skip opening quote

        chars: list[str] = []
        while (ch := self.current_char()) and ch != quote_char:
            if ch == "\\":
                self.advance()
                # Simple escape sequences
                escape_map = {"n": "\n", "t": "\t", "r": "\r", "\\": "\\", '"': '"', "'": "'"}
                esc = self.current_char()
                chars.append(escape_map.get(esc, esc or ""))
            else:
                chars.append(ch)
            self.advance()

        if not self.current_char():
            raise SyntaxError(f"Unterminated string at line {self.line}")

        self.advance()  # Skip closing quote
        return "".join(chars)

    def read_number(self) -> int | float:
        """Read a numeric literal (integer, float, or scientific notation).

        A sign character is only consumed immediately after an exponent
        marker, so input such as ``1-2`` lexes as NUMBER(1), then "-" is
        reported as an unexpected character, rather than being swallowed
        into one malformed token that crashed int()/float() with an
        uncaught ValueError.

        Returns:
            int for plain integers, float when a dot or exponent is present.

        Raises:
            SyntaxError: If the consumed text is not a valid number
                (e.g. a trailing exponent marker like ``5e``).
        """
        chars: list[str] = []
        has_dot = False
        has_exp = False

        while ch := self.current_char():
            if ch.isdigit():
                pass
            elif ch == ".":
                if has_dot or has_exp:
                    break
                has_dot = True
            elif ch in "eE":
                if has_exp:
                    break
                has_exp = True
            elif ch in "+-":
                # A sign is only part of the number right after e/E.
                if not (chars and chars[-1] in "eE"):
                    break
            else:
                break
            chars.append(ch)
            self.advance()

        num_str = "".join(chars)
        try:
            return float(num_str) if has_dot or has_exp else int(num_str)
        except ValueError as err:
            raise SyntaxError(f"Invalid number '{num_str}' at line {self.line}") from err

    def read_identifier(self) -> str:
        """Read an identifier or keyword (letters, digits, underscore)."""
        chars: list[str] = []
        while (ch := self.current_char()) and (ch.isalnum() or ch == "_"):
            chars.append(ch)
            self.advance()
        return "".join(chars)

    def read_variable(self) -> str:
        """Read variable name ($varname); the "$" is kept in the value."""
        self.advance()  # Skip $
        return "$" + self.read_identifier()

    def emit_indent_tokens(self, indent: int) -> None:
        """Emit INDENT/DEDENT tokens based on indentation change.

        Args:
            indent: Current line's indentation level

        Raises:
            SyntaxError: If indentation is inconsistent (dedent to a level
                that was never on the indentation stack).
        """
        current_indent = self.indent_stack[-1]

        if indent > current_indent:
            # Increased indentation
            self.indent_stack.append(indent)
            self.tokens.append(Token(TokenType.INDENT, indent, self.line, 1))
        elif indent < current_indent:
            # Decreased indentation - may need multiple DEDENTs
            while self.indent_stack and indent < self.indent_stack[-1]:
                self.indent_stack.pop()
                self.tokens.append(Token(TokenType.DEDENT, indent, self.line, 1))

            # Check for inconsistent indentation
            if self.indent_stack and indent != self.indent_stack[-1]:
                raise SyntaxError(
                    f"Inconsistent indentation at line {self.line}: "
                    f"got {indent} spaces, expected {self.indent_stack[-1]}"
                )

    def tokenize(self) -> list[Token]:
        """Tokenize entire input.

        Returns:
            List of tokens, ending with any pending DEDENTs and an EOF token.

        Raises:
            SyntaxError: On lexical errors (unexpected characters, malformed
                numbers, unterminated strings, inconsistent indentation).
        """
        while self.pos < len(self.text):
            # Handle indentation at line start
            if self.at_line_start:
                self.at_line_start = False
                indent = self.measure_indent()

                # Blank/comment-only lines never affect the indent stack.
                if indent == -1:
                    self.skip_whitespace()
                    self.skip_comment()
                    if self.current_char() == "\n":
                        self.advance()
                        continue
                    elif self.current_char() is None:
                        break
                else:
                    self.emit_indent_tokens(indent)

            self.skip_whitespace()
            self.skip_comment()

            if not self.current_char():
                break

            line, col = self.line, self.column
            char = self.current_char()

            # Newline
            if char == "\n":
                self.tokens.append(Token(TokenType.NEWLINE, "\n", line, col))
                self.advance()

            # String literal
            elif char in "\"'":
                self.tokens.append(Token(TokenType.STRING, self.read_string(), line, col))

            # Numeric literal (digit, or a dot immediately followed by a digit)
            elif char.isdigit() or (char == "." and (nxt := self.peek_char()) and nxt.isdigit()):
                self.tokens.append(Token(TokenType.NUMBER, self.read_number(), line, col))

            # Variable reference
            elif char == "$":
                self.tokens.append(Token(TokenType.VARIABLE, self.read_variable(), line, col))

            # Single-character operators and structural tokens
            elif char == "|":
                self.tokens.append(Token(TokenType.PIPE, "|", line, col))
                self.advance()
            elif char == "=":
                self.tokens.append(Token(TokenType.ASSIGN, "=", line, col))
                self.advance()
            elif char == ",":
                self.tokens.append(Token(TokenType.COMMA, ",", line, col))
                self.advance()
            elif char == ":":
                self.tokens.append(Token(TokenType.COLON, ":", line, col))
                self.advance()
            elif char == "(":
                self.tokens.append(Token(TokenType.LPAREN, "(", line, col))
                self.advance()
            elif char == ")":
                self.tokens.append(Token(TokenType.RPAREN, ")", line, col))
                self.advance()

            # Identifier or keyword (keywords match case-insensitively)
            elif char.isalpha() or char == "_":
                ident = self.read_identifier()
                token_type = self.KEYWORDS.get(ident.lower(), TokenType.IDENTIFIER)
                self.tokens.append(Token(token_type, ident, line, col))

            else:
                raise SyntaxError(f"Unexpected character '{char}' at line {line}, column {col}")

        # Emit remaining DEDENTs at end of file
        while len(self.indent_stack) > 1:
            self.indent_stack.pop()
            self.tokens.append(Token(TokenType.DEDENT, 0, self.line, self.column))

        # Add EOF token
        self.tokens.append(Token(TokenType.EOF, None, self.line, self.column))
        return self.tokens

340 

341 

@dataclass
class ASTNode:
    """Base class for AST nodes.

    Carries the source position of the construct's first token for
    error reporting.
    """

    line: int  # 1-indexed source line of the construct's first token
    column: int  # 1-indexed source column of the construct's first token

348 

349 

@dataclass
class Assignment(ASTNode):
    """Variable assignment: $var = expr."""

    variable: str  # variable name, including the leading "$"
    expression: "Expression"  # right-hand side (pipeline or primary)

356 

357 

@dataclass
class Pipeline(ASTNode):
    """Pipeline expression: expr | command | command."""

    # Two or more stages in order; a single-stage pipeline is collapsed
    # to the stage expression itself by the parser.
    stages: list["Expression"]

363 

364 

@dataclass
class Command(ASTNode):
    """Command invocation: command arg1 arg2."""

    name: str  # command name (keyword or identifier) as written
    args: list["Expression"]  # space-separated arguments (may be empty)

371 

372 

@dataclass
class FunctionCall(ASTNode):
    """Function call: func(arg1, arg2)."""

    name: str  # function name as written
    args: list["Expression"]  # comma-separated arguments (may be empty)

379 

380 

@dataclass
class Variable(ASTNode):
    """Variable reference: $var."""

    name: str  # variable name, including the leading "$"

386 

387 

@dataclass
class Literal(ASTNode):
    """Literal value: string, number."""

    value: str | int | float  # decoded string, or parsed numeric value

393 

394 

@dataclass
class ForLoop(ASTNode):
    """For loop: for $var in expr: body."""

    variable: str  # loop variable name, including the leading "$"
    iterable: "Expression"  # expression producing the values to iterate
    body: list["Statement"]  # body (single statement or indented block)

402 

403 

# Type aliases
# Expression: anything usable as a pipeline stage, argument, or the
# right-hand side of an assignment.
Expression = Union[Pipeline, Command, FunctionCall, Variable, Literal]
# Statement: anything usable at program top level or in a for-loop body.
Statement = Union[Assignment, Pipeline, ForLoop]

407 

408 

class Parser:
    """Recursive descent parser for TraceKit DSL.

    Parses token stream into abstract syntax tree.
    Supports indentation-based block structure.
    """

    def __init__(self, tokens: list[Token]):
        """Initialize parser with token list.

        Args:
            tokens: Token list from lexer (must end with an EOF token)
        """
        self.tokens = tokens
        self.pos = 0

    def current_token(self) -> Token:
        """Get current token (the final EOF token once exhausted)."""
        if self.pos >= len(self.tokens):
            return self.tokens[-1]  # EOF
        return self.tokens[self.pos]

    def peek_token(self, offset: int = 1) -> Token:
        """Peek ahead at a token (the final EOF token past the end)."""
        pos = self.pos + offset
        if pos >= len(self.tokens):
            return self.tokens[-1]  # EOF
        return self.tokens[pos]

    def advance(self) -> None:
        """Advance to next token (no-op past the end of the stream)."""
        if self.pos < len(self.tokens):
            self.pos += 1

    def expect(self, token_type: TokenType) -> Token:
        """Expect specific token type and advance.

        Args:
            token_type: Expected token type

        Returns:
            The token

        Raises:
            SyntaxError: If token type doesn't match
        """
        token = self.current_token()
        if token.type != token_type:
            raise SyntaxError(
                f"Expected {token_type.name}, got {token.type.name} "
                f"at line {token.line}, column {token.column}"
            )
        self.advance()
        return token

    def skip_newlines(self) -> None:
        """Skip optional newlines."""
        while self.current_token().type == TokenType.NEWLINE:
            self.advance()

    def parse(self) -> list[Statement]:
        """Parse complete program.

        Returns:
            List of statements (AST)

        Note:
            May raise SyntaxError on parse errors via parse_statement().
        """
        statements = []

        while self.current_token().type != TokenType.EOF:
            self.skip_newlines()
            if self.current_token().type == TokenType.EOF:
                break

            stmt = self.parse_statement()
            statements.append(stmt)
            self.skip_newlines()

        return statements

    def parse_statement(self) -> Statement:
        """Parse a single statement: for-loop, assignment, or pipeline."""
        # For loop
        if self.current_token().type == TokenType.FOR:
            return self.parse_for_loop()

        # Assignment ($var = ...); a bare variable starts a pipeline instead
        if self.current_token().type == TokenType.VARIABLE:
            if self.peek_token().type == TokenType.ASSIGN:
                return self.parse_assignment()

        # Pipeline expression
        return self.parse_pipeline()  # type: ignore[return-value]

    def parse_assignment(self) -> Assignment:
        """Parse variable assignment: $var = expression."""
        token = self.current_token()
        var_token = self.expect(TokenType.VARIABLE)
        self.expect(TokenType.ASSIGN)
        expr = self.parse_pipeline()

        return Assignment(
            variable=var_token.value,
            expression=expr,
            line=token.line,
            column=token.column,
        )

    def parse_pipeline(self) -> Expression:
        """Parse pipeline expression: primary (| primary)*.

        A single-stage pipeline collapses to the stage expression itself.
        """
        stages = [self.parse_primary()]

        while self.current_token().type == TokenType.PIPE:
            self.advance()  # Skip |
            stages.append(self.parse_primary())

        if len(stages) == 1:
            return stages[0]

        return Pipeline(stages=stages, line=stages[0].line, column=stages[0].column)

    def parse_primary(self) -> Expression:
        """Parse primary expression: literal, variable, function call, or command.

        Raises:
            SyntaxError: On an unexpected token.
        """
        token = self.current_token()

        # Literal string
        if token.type == TokenType.STRING:
            self.advance()
            return Literal(value=token.value, line=token.line, column=token.column)

        # Literal number
        if token.type == TokenType.NUMBER:
            self.advance()
            return Literal(value=token.value, line=token.line, column=token.column)

        # Variable
        if token.type == TokenType.VARIABLE:
            self.advance()
            return Variable(name=token.value, line=token.line, column=token.column)

        # Function call or command
        if token.type in (
            TokenType.IDENTIFIER,
            TokenType.LOAD,
            TokenType.FILTER,
            TokenType.MEASURE,
            TokenType.PLOT,
            TokenType.EXPORT,
            TokenType.GLOB,
        ):
            name = token.value
            self.advance()

            # Function call with parens
            if self.current_token().type == TokenType.LPAREN:
                return self.parse_function_call(name, token)

            # Command with space-separated args. Stop at any token that
            # ends the argument list. RPAREN and COMMA are included so an
            # identifier or command may appear inside a function call's
            # argument list (e.g. f(x) or f(load a, 2)) without the
            # argument loop recursing onto ")" or "," and raising.
            args = []
            while self.current_token().type not in (
                TokenType.PIPE,
                TokenType.NEWLINE,
                TokenType.EOF,
                TokenType.COLON,
                TokenType.INDENT,
                TokenType.DEDENT,
                TokenType.RPAREN,
                TokenType.COMMA,
            ):
                args.append(self.parse_primary())

            return Command(name=name, args=args, line=token.line, column=token.column)

        raise SyntaxError(
            f"Unexpected token {token.type.name} at line {token.line}, column {token.column}"
        )

    def parse_function_call(self, name: str, token: Token) -> FunctionCall:
        """Parse function call with parentheses: name(arg, arg, ...)."""
        self.expect(TokenType.LPAREN)

        args = []
        while self.current_token().type != TokenType.RPAREN:
            args.append(self.parse_primary())
            if self.current_token().type == TokenType.COMMA:
                self.advance()

        self.expect(TokenType.RPAREN)
        return FunctionCall(name=name, args=args, line=token.line, column=token.column)

    def parse_for_loop(self) -> ForLoop:
        """Parse for loop with indented body.

        Supports both single-line body and multi-line indented blocks:

        Single line:
            for $f in glob("*.wfm"): load $f

        Multi-line (indented block):
            for $f in glob("*.wfm"):
                $data = load $f
                measure $data
                plot $data

        Returns:
            ForLoop AST node.
        """
        token = self.current_token()
        self.expect(TokenType.FOR)

        var_token = self.expect(TokenType.VARIABLE)
        self.expect(TokenType.IN)

        iterable = self.parse_primary()
        self.expect(TokenType.COLON)

        body: list[Statement] = []

        # Check if body follows on same line or is indented block
        if self.current_token().type == TokenType.NEWLINE:
            # Multi-line block: expect INDENT, statements, DEDENT
            self.skip_newlines()

            if self.current_token().type == TokenType.INDENT:
                self.advance()  # Consume INDENT

                # Parse statements until DEDENT
                while self.current_token().type not in (TokenType.DEDENT, TokenType.EOF):
                    self.skip_newlines()
                    if self.current_token().type in (TokenType.DEDENT, TokenType.EOF):
                        break
                    stmt = self.parse_statement()
                    body.append(stmt)
                    self.skip_newlines()

                # Consume DEDENT if present
                if self.current_token().type == TokenType.DEDENT:
                    self.advance()
            else:
                # No INDENT after newline - parse single statement
                body = [self.parse_statement()]
        else:
            # Single-line body (statement on same line as colon)
            body = [self.parse_statement()]

        return ForLoop(
            variable=var_token.value,
            iterable=iterable,
            body=body,
            line=token.line,
            column=token.column,
        )

661 

662 

def parse_dsl(source: str) -> list[Statement]:
    """Parse TraceKit DSL source code into an AST.

    Tokenizes ``source`` with :class:`Lexer`, then runs :class:`Parser`
    over the resulting token stream.

    Args:
        source: DSL source code

    Returns:
        Abstract syntax tree (list of statements)

    Example:
        >>> # Single-line for loop
        >>> ast = parse_dsl('for $f in glob("*.wfm"): load $f')

        >>> # Multi-line indented block
        >>> ast = parse_dsl('''
        ... for $f in glob("*.wfm"):
        ...     $data = load $f
        ...     measure $data
        ... ''')

    Note:
        May raise SyntaxError on parse errors via tokenize() or parse().
    """
    return Parser(Lexer(source).tokenize()).parse()