Coverage for src / tracekit / dsl / parser.py: 94%
341 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""TraceKit DSL Parser.
3Implements simple domain-specific language for trace analysis workflows.
4"""
6from dataclasses import dataclass
7from enum import Enum, auto
8from typing import Any, Union
class TokenType(Enum):
    """Kinds of token the DSL lexer can produce.

    Values are assigned with ``auto()``, so they follow declaration order.
    """

    # --- literal values and names ---
    STRING = auto()
    NUMBER = auto()
    VARIABLE = auto()
    IDENTIFIER = auto()

    # --- operators ---
    PIPE = auto()
    ASSIGN = auto()
    COMMA = auto()

    # --- reserved words ---
    LOAD = auto()
    FILTER = auto()
    MEASURE = auto()
    PLOT = auto()
    EXPORT = auto()
    FOR = auto()
    IN = auto()
    GLOB = auto()

    # --- structure / layout ---
    LPAREN = auto()
    RPAREN = auto()
    COLON = auto()
    NEWLINE = auto()
    INDENT = auto()
    DEDENT = auto()
    EOF = auto()
@dataclass
class Token:
    """A single lexical token together with its source position."""

    # Category of the token.
    type: TokenType
    # Payload: decoded string, parsed number, identifier text, ...
    value: Any
    # Line on which the token starts (the lexer counts from 1).
    line: int
    # Column at which the token starts (the lexer counts from 1).
    column: int
class Lexer:
    """Tokenizer for TraceKit DSL.

    Breaks input text into tokens for parsing.
    Supports indentation-based block structure (Python-style): a stack of
    indentation widths is maintained and INDENT/DEDENT tokens are emitted
    whenever the width of a non-blank line differs from the enclosing one.
    """

    # Reserved words; tokenize() matches them case-insensitively.
    KEYWORDS = {  # noqa: RUF012
        "load": TokenType.LOAD,
        "filter": TokenType.FILTER,
        "measure": TokenType.MEASURE,
        "plot": TokenType.PLOT,
        "export": TokenType.EXPORT,
        "for": TokenType.FOR,
        "in": TokenType.IN,
        "glob": TokenType.GLOB,
    }

    # Single-character tokens whose value is the character itself.
    _PUNCTUATION = {  # noqa: RUF012
        "|": TokenType.PIPE,
        "=": TokenType.ASSIGN,
        ",": TokenType.COMMA,
        ":": TokenType.COLON,
        "(": TokenType.LPAREN,
        ")": TokenType.RPAREN,
    }

    # Backslash escapes understood inside string literals.
    _ESCAPES = {  # noqa: RUF012
        "n": "\n",
        "t": "\t",
        "r": "\r",
        "\\": "\\",
        '"': '"',
        "'": "'",
    }

    def __init__(self, text: str):
        """Initialize lexer with input text.

        Args:
            text: DSL source code
        """
        self.text = text
        self.pos = 0
        self.line = 1
        self.column = 1
        self.tokens: list[Token] = []
        # Indentation tracking: widths of the currently open blocks.
        self.indent_stack: list[int] = [0]
        self.at_line_start = True

    def current_char(self) -> str | None:
        """Get current character without advancing (None at end of input)."""
        if self.pos >= len(self.text):
            return None
        return self.text[self.pos]

    def peek_char(self, offset: int = 1) -> str | None:
        """Peek at the character ``offset`` positions ahead (None past the end)."""
        pos = self.pos + offset
        if pos >= len(self.text):
            return None
        return self.text[pos]

    def advance(self) -> None:
        """Advance one character and update line/column bookkeeping.

        Stepping over a newline starts a new line and flags that the next
        token sits at the start of a line.
        """
        if self.current_char() == "\n":
            self.line += 1
            self.column = 1
            self.at_line_start = True
        else:
            self.column += 1
        self.pos += 1

    def skip_whitespace(self) -> None:
        """Skip spaces, tabs and carriage returns (but not newlines)."""
        while self.current_char() in (" ", "\t", "\r"):
            self.advance()

    def skip_comment(self) -> None:
        """Skip a ``#`` comment up to, but not including, the end of line."""
        if self.current_char() == "#":
            while self.current_char() and self.current_char() != "\n":
                self.advance()

    def measure_indent(self) -> int:
        """Measure indentation at current position (start of a line).

        Returns:
            Number of spaces of indentation (tabs count as 4 spaces), or -1
            when the line is blank or holds only a comment. In that case the
            position is rewound and the line must not affect indentation.
        """
        indent = 0
        start_pos = self.pos

        while self.current_char() in (" ", "\t"):
            indent += 1 if self.current_char() == " " else 4  # Tab = 4 spaces
            self.pos += 1
            self.column += 1

        char = self.current_char()
        # A lone "\r" before the newline (CRLF input) is still a blank line;
        # without this check it would count as indentation 0 and close blocks.
        crlf_blank = char == "\r" and self.peek_char() in ("\n", None)
        if char is None or char in "#\n" or crlf_blank:
            self.pos = start_pos
            self.column = 1
            return -1  # Signal to ignore this line for indentation

        return indent

    def read_string(self) -> str:
        """Read quoted string literal starting at the opening quote.

        Recognised escapes are replaced by their character; for any other
        escaped character the backslash is dropped and the character kept.

        Returns:
            The decoded string contents, without the quotes.

        Raises:
            SyntaxError: If the input ends before the closing quote. The
                reported line is the one where the string started.
        """
        quote_char = self.current_char()
        start_line = self.line
        self.advance()  # Skip opening quote

        chars: list[str] = []
        while self.current_char() and self.current_char() != quote_char:
            char = self.current_char() or ""
            if char == "\\":
                self.advance()
                escaped = self.current_char() or ""
                chars.append(self._ESCAPES.get(escaped, escaped))
            else:
                chars.append(char)
            self.advance()

        if not self.current_char():
            raise SyntaxError(f"Unterminated string at line {start_line}")

        self.advance()  # Skip closing quote
        # A newline inside the literal makes advance() flag a line start;
        # the text after the closing quote is not at the start of a line.
        self.at_line_start = False
        return "".join(chars)

    def read_number(self) -> int | float:
        """Read numeric literal.

        Accepts digits, at most one ``.`` (only before any exponent), at most
        one exponent marker, and a sign only directly after that marker.

        Returns:
            ``float`` when the literal has a dot or exponent, else ``int``.

        Raises:
            SyntaxError: If the collected text is not a valid number
                (for example ``1e`` with no exponent digits).
        """
        line, column = self.line, self.column
        chars: list[str] = []
        has_dot = False
        has_exp = False

        while True:
            char = self.current_char()
            if char is None:
                break
            if char == ".":
                if has_dot or has_exp:
                    break
                has_dot = True
            elif char in "eE":
                if has_exp:
                    break
                has_exp = True
            elif char in "+-":
                # A sign belongs to the number only right after the exponent
                # marker; anywhere else it is a separate character.
                if not chars or chars[-1] not in "eE":
                    break
            elif not char.isdigit():
                break
            chars.append(char)
            self.advance()

        num_str = "".join(chars)
        try:
            return float(num_str) if has_dot or has_exp else int(num_str)
        except ValueError:
            raise SyntaxError(
                f"Invalid number '{num_str}' at line {line}, column {column}"
            ) from None

    def read_identifier(self) -> str:
        """Read a run of alphanumeric/underscore characters."""
        chars: list[str] = []
        while True:
            char = self.current_char()
            if char is None or not (char.isalnum() or char == "_"):
                break
            chars.append(char)
            self.advance()
        return "".join(chars)

    def read_variable(self) -> str:
        """Read variable name ($varname); the returned name keeps the ``$``."""
        self.advance()  # Skip $
        return "$" + self.read_identifier()

    def emit_indent_tokens(self, indent: int) -> None:
        """Emit INDENT/DEDENT tokens based on indentation change.

        Args:
            indent: Current line's indentation level

        Raises:
            SyntaxError: If a dedent does not land on an enclosing level.
        """
        current_indent = self.indent_stack[-1]

        if indent > current_indent:
            # Deeper than the enclosing block: open a new one.
            self.indent_stack.append(indent)
            self.tokens.append(Token(TokenType.INDENT, indent, self.line, 1))
        elif indent < current_indent:
            # Shallower: close every block deeper than this line.
            while self.indent_stack and indent < self.indent_stack[-1]:
                self.indent_stack.pop()
                self.tokens.append(Token(TokenType.DEDENT, indent, self.line, 1))

            # The line must now align with an enclosing block.
            if self.indent_stack and indent != self.indent_stack[-1]:
                raise SyntaxError(
                    f"Inconsistent indentation at line {self.line}: "
                    f"got {indent} spaces, expected {self.indent_stack[-1]}"
                )

    def tokenize(self) -> list[Token]:
        """Tokenize entire input.

        Returns:
            List of tokens, ending with pending DEDENTs and one EOF token.

        Raises:
            SyntaxError: On lexical errors
        """
        while self.pos < len(self.text):
            # Handle indentation at line start
            if self.at_line_start:
                self.at_line_start = False
                indent = self.measure_indent()

                if indent == -1:
                    # Blank/comment line: consume it without emitting tokens.
                    self.skip_whitespace()
                    self.skip_comment()
                    if self.current_char() == "\n":
                        self.advance()
                        continue
                    if self.current_char() is None:
                        break
                else:
                    self.emit_indent_tokens(indent)

            self.skip_whitespace()
            self.skip_comment()

            char = self.current_char()
            if char is None:
                break

            line, col = self.line, self.column
            following = self.peek_char()

            if char == "\n":
                self.tokens.append(Token(TokenType.NEWLINE, "\n", line, col))
                self.advance()
            elif char in "\"'":
                self.tokens.append(Token(TokenType.STRING, self.read_string(), line, col))
            elif char.isdigit() or (
                char == "." and following is not None and following.isdigit()
            ):
                self.tokens.append(Token(TokenType.NUMBER, self.read_number(), line, col))
            elif char == "$":
                self.tokens.append(Token(TokenType.VARIABLE, self.read_variable(), line, col))
            elif char in self._PUNCTUATION:
                self.tokens.append(Token(self._PUNCTUATION[char], char, line, col))
                self.advance()
            elif char.isalpha() or char == "_":
                ident = self.read_identifier()
                token_type = self.KEYWORDS.get(ident.lower(), TokenType.IDENTIFIER)
                self.tokens.append(Token(token_type, ident, line, col))
            else:
                raise SyntaxError(f"Unexpected character '{char}' at line {line}, column {col}")

        # Emit remaining DEDENTs at end of file
        while len(self.indent_stack) > 1:
            self.indent_stack.pop()
            self.tokens.append(Token(TokenType.DEDENT, 0, self.line, self.column))

        # Add EOF token
        self.tokens.append(Token(TokenType.EOF, None, self.line, self.column))
        return self.tokens
@dataclass
class ASTNode:
    """Common base for every AST node: records the source position."""

    # Source line of the node.
    line: int
    # Source column of the node.
    column: int
@dataclass
class Assignment(ASTNode):
    """Statement of the form ``$var = expr``."""

    # Name of the variable being assigned.
    variable: str
    # Expression whose value is bound to the variable.
    expression: "Expression"
@dataclass
class Pipeline(ASTNode):
    """Expressions chained with ``|``: ``expr | command | command``."""

    # Pipeline stages in source order.
    stages: list["Expression"]
@dataclass
class Command(ASTNode):
    """Command written without parentheses: ``command arg1 arg2``."""

    # Command name as written in the source.
    name: str
    # Arguments following the command name.
    args: list["Expression"]
@dataclass
class FunctionCall(ASTNode):
    """Call written with parentheses: ``func(arg1, arg2)``."""

    # Function name as written in the source.
    name: str
    # Arguments between the parentheses.
    args: list["Expression"]
@dataclass
class Variable(ASTNode):
    """Reference to a variable: ``$var``."""

    # Variable name.
    name: str
@dataclass
class Literal(ASTNode):
    """Constant value appearing in the source: a string or a number."""

    # The literal's value.
    value: str | int | float
@dataclass
class ForLoop(ASTNode):
    """Loop of the form ``for $var in expr: body``."""

    # Loop variable name.
    variable: str
    # Expression that is iterated over.
    iterable: "Expression"
    # Statements making up the loop body.
    body: list["Statement"]
# Type aliases
# Anything that can appear where a value is expected.
Expression = Union[
    Pipeline,
    Command,
    FunctionCall,
    Variable,
    Literal,
]
# Anything that can appear as a statement.
Statement = Union[
    Assignment,
    Pipeline,
    ForLoop,
]
class Parser:
    """Recursive descent parser for TraceKit DSL.

    Parses token stream into abstract syntax tree.
    Supports indentation-based block structure.
    """

    # Token types that may start a command or function call.
    _CALLABLE_TOKENS = (
        TokenType.IDENTIFIER,
        TokenType.LOAD,
        TokenType.FILTER,
        TokenType.MEASURE,
        TokenType.PLOT,
        TokenType.EXPORT,
        TokenType.GLOB,
    )

    # Token types that end a parenthesis-free command's argument list.
    # COMMA and RPAREN are included so that a bare command name used as a
    # function-call argument (``f(a, b)``) does not try to consume the
    # separator or the closing parenthesis as one of its own arguments.
    _ARG_TERMINATORS = (
        TokenType.PIPE,
        TokenType.NEWLINE,
        TokenType.EOF,
        TokenType.COLON,
        TokenType.INDENT,
        TokenType.DEDENT,
        TokenType.COMMA,
        TokenType.RPAREN,
    )

    # Token types that end an indented block.
    _BLOCK_END = (TokenType.DEDENT, TokenType.EOF)

    def __init__(self, tokens: list[Token]):
        """Initialize parser with token list.

        Args:
            tokens: Token list from lexer
        """
        self.tokens = tokens
        self.pos = 0

    def current_token(self) -> Token:
        """Get current token; past the end, the last token (EOF) is returned."""
        if self.pos >= len(self.tokens):
            return self.tokens[-1]  # EOF
        return self.tokens[self.pos]

    def peek_token(self, offset: int = 1) -> Token:
        """Peek ``offset`` tokens ahead; past the end, the last token (EOF)."""
        pos = self.pos + offset
        if pos >= len(self.tokens):
            return self.tokens[-1]  # EOF
        return self.tokens[pos]

    def advance(self) -> None:
        """Advance to next token (no-op once past the end)."""
        if self.pos < len(self.tokens):
            self.pos += 1

    def expect(self, token_type: TokenType) -> Token:
        """Expect specific token type and advance.

        Args:
            token_type: Expected token type

        Returns:
            The token

        Raises:
            SyntaxError: If token type doesn't match
        """
        token = self.current_token()
        if token.type != token_type:
            raise SyntaxError(
                f"Expected {token_type.name}, got {token.type.name} "
                f"at line {token.line}, column {token.column}"
            )
        self.advance()
        return token

    def skip_newlines(self) -> None:
        """Skip optional newlines."""
        while self.current_token().type == TokenType.NEWLINE:
            self.advance()

    def parse(self) -> list[Statement]:
        """Parse complete program.

        Returns:
            List of statements (AST)

        Note:
            May raise SyntaxError on parse errors via parse_statement().
        """
        statements: list[Statement] = []

        while self.current_token().type != TokenType.EOF:
            self.skip_newlines()
            if self.current_token().type == TokenType.EOF:
                break

            statements.append(self.parse_statement())
            self.skip_newlines()

        return statements

    def parse_statement(self) -> Statement:
        """Parse a single statement: for loop, assignment, or pipeline."""
        current = self.current_token().type

        if current == TokenType.FOR:
            return self.parse_for_loop()

        # ``$var =`` starts an assignment; a bare ``$var`` is an expression.
        if current == TokenType.VARIABLE and self.peek_token().type == TokenType.ASSIGN:
            return self.parse_assignment()

        return self.parse_pipeline()  # type: ignore[return-value]

    def parse_assignment(self) -> Assignment:
        """Parse variable assignment: ``$var = pipeline``."""
        var_token = self.expect(TokenType.VARIABLE)
        self.expect(TokenType.ASSIGN)
        expr = self.parse_pipeline()

        return Assignment(
            variable=var_token.value,
            expression=expr,
            line=var_token.line,
            column=var_token.column,
        )

    def parse_pipeline(self) -> Expression:
        """Parse pipeline expression.

        Returns:
            The single expression when no ``|`` follows it, otherwise a
            Pipeline positioned at its first stage.
        """
        stages = [self.parse_primary()]

        while self.current_token().type == TokenType.PIPE:
            self.advance()  # Skip |
            stages.append(self.parse_primary())

        if len(stages) == 1:
            return stages[0]

        first = stages[0]
        return Pipeline(stages=stages, line=first.line, column=first.column)

    def parse_primary(self) -> Expression:
        """Parse primary expression.

        Returns:
            A Literal, Variable, FunctionCall or Command node.

        Raises:
            SyntaxError: If the current token cannot start an expression.
        """
        token = self.current_token()

        # Literal string or number
        if token.type in (TokenType.STRING, TokenType.NUMBER):
            self.advance()
            return Literal(value=token.value, line=token.line, column=token.column)

        # Variable
        if token.type == TokenType.VARIABLE:
            self.advance()
            return Variable(name=token.value, line=token.line, column=token.column)

        # Function call or command
        if token.type in self._CALLABLE_TOKENS:
            name = token.value
            self.advance()

            # Function call with parens
            if self.current_token().type == TokenType.LPAREN:
                return self.parse_function_call(name, token)

            # Command with args: everything up to the next terminator.
            args = []
            while self.current_token().type not in self._ARG_TERMINATORS:
                args.append(self.parse_primary())

            return Command(name=name, args=args, line=token.line, column=token.column)

        raise SyntaxError(
            f"Unexpected token {token.type.name} at line {token.line}, column {token.column}"
        )

    def parse_function_call(self, name: str, token: Token) -> FunctionCall:
        """Parse function call with parentheses.

        Args:
            name: Function name (already consumed by the caller)
            token: Token of the function name, used for the node position

        Returns:
            FunctionCall AST node.
        """
        self.expect(TokenType.LPAREN)

        args = []
        while self.current_token().type != TokenType.RPAREN:
            args.append(self.parse_primary())
            # Commas between arguments are accepted but not required.
            if self.current_token().type == TokenType.COMMA:
                self.advance()

        self.expect(TokenType.RPAREN)
        return FunctionCall(name=name, args=args, line=token.line, column=token.column)

    def _parse_block(self) -> list[Statement]:
        """Parse statements up to the closing DEDENT (consumed) or EOF."""
        body: list[Statement] = []

        while self.current_token().type not in self._BLOCK_END:
            self.skip_newlines()
            if self.current_token().type in self._BLOCK_END:
                break
            body.append(self.parse_statement())
            self.skip_newlines()

        # Consume DEDENT if present
        if self.current_token().type == TokenType.DEDENT:
            self.advance()

        return body

    def parse_for_loop(self) -> ForLoop:
        """Parse for loop with indented body.

        Supports both single-line body and multi-line indented blocks:

        Single line:
            for $f in glob("*.wfm"): load $f

        Multi-line (indented block):
            for $f in glob("*.wfm"):
                $data = load $f
                measure $data
                plot $data

        Returns:
            ForLoop AST node.
        """
        for_token = self.expect(TokenType.FOR)
        var_token = self.expect(TokenType.VARIABLE)
        self.expect(TokenType.IN)

        iterable = self.parse_primary()
        self.expect(TokenType.COLON)

        body: list[Statement]
        if self.current_token().type != TokenType.NEWLINE:
            # Single-line body (statement on same line as colon)
            body = [self.parse_statement()]
        else:
            self.skip_newlines()
            if self.current_token().type == TokenType.INDENT:
                self.advance()  # Consume INDENT
                body = self._parse_block()
            else:
                # No INDENT after newline - parse single statement
                body = [self.parse_statement()]

        return ForLoop(
            variable=var_token.value,
            iterable=iterable,
            body=body,
            line=for_token.line,
            column=for_token.column,
        )
def parse_dsl(source: str) -> list[Statement]:
    """Turn TraceKit DSL source text into an AST.

    The text is tokenized by ``Lexer`` and the resulting tokens are handed
    to ``Parser``.

    Args:
        source: DSL source code

    Returns:
        Abstract syntax tree (list of statements)

    Example:
        >>> # Single-line for loop
        >>> ast = parse_dsl('for $f in glob("*.wfm"): load $f')

        >>> # Multi-line indented block
        >>> ast = parse_dsl('''
        ... for $f in glob("*.wfm"):
        ...     $data = load $f
        ...     measure $data
        ... ''')

    Note:
        May raise SyntaxError on parse errors via tokenize() or parse().
    """
    return Parser(Lexer(source).tokenize()).parse()