Coverage for src/certus/parsers/struct.py: 100%
69 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-04 15:55 +0100
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-04 15:55 +0100
1"""Module for the JSON (structured output) parser."""
3import json
4import re
5import typing
7from certus import nodes
# Node kinds that parsing a JSON value can produce.
JSONNodeType: typing.TypeAlias = (
    nodes.Object | nodes.Array | nodes.Composite | nodes.Token
)

# Scalar JSON values.
JSONPrimitiveType: typing.TypeAlias = None | bool | int | float | str

# Any JSON value; the string forward references make the alias recursive.
JSONDataType: typing.TypeAlias = (
    JSONPrimitiveType
    | list["JSONDataType"]
    | dict[str, "JSONDataType"]
)

# Keyword arguments forwarded to `json.dumps()`.
KwargsType: typing.TypeAlias = dict[str, typing.Any]

# An ordered run of token nodes.
TokenSpanType: typing.TypeAlias = typing.Sequence[nodes.Token]
def parse_json(
    data: JSONDataType, tokens: TokenSpanType, dumps_kw: KwargsType | None = None
) -> JSONNodeType:
    """
    Build a node tree from JSON data and the tokens that spell it out.

    Parameters
    ----------
    data : JSON-like
        The JSON value to parse.
    tokens : sequence of Token
        Token nodes in which the serialised form of `data` is located.
    dumps_kw : dict, optional
        Keyword arguments forwarded to `json.dumps()`. When omitted (or
        empty), an empty dictionary is used instead.

    Raises
    ------
    ValueError
        If `data`, or a value nested inside it, is not a JSON type.
    RuntimeError
        If the span for `data`, or for one of its elements, cannot be
        located in `tokens`. This usually means `tokens` and `dumps_kw`
        do not correspond to `data`.

    Returns
    -------
    JSONNodeType
        Root node of the parsed tree.
    """
    kwargs = dumps_kw if dumps_kw else {}

    # The second item (index of the first unused token) is not needed here.
    root, _ = _parse_json(data, tokens, kwargs)

    return root
def _parse_json(
    data: JSONDataType, tokens: TokenSpanType, dumps_kw: KwargsType, offset: int = 0
) -> tuple[JSONNodeType, int]:
    """
    Recursively turn JSON data into nodes, tracking an absolute token index.

    Parameters
    ----------
    data : JSON-like
        Data to parse.
    tokens : sequence of Token
        Token nodes.
    dumps_kw : dict
        Keyword arguments for `json.dumps()`.
    offset : int, default 0
        Index in `tokens` from which the span for `data` is searched.

    Raises
    ------
    ValueError
        If `data` is not None, a str, bool, int, float, list or dict.

    Returns
    -------
    JSONNodeType
        Parsed node: an Object for a dict, an Array for a list, the
        token itself for a single-token value, otherwise a Composite.
    int
        Index of first unused token after this subtree.
    """
    is_json = data is None or isinstance(
        data, (str, bool, int, float, list, dict)
    )
    if not is_json:
        raise ValueError(f"Invalid JSON data: {data=}, {type(data)=}")

    span_start, span_end = _find_token_span(data, tokens, dumps_kw, offset)

    if isinstance(data, dict):
        # NOTE(review): keys are neither located nor skipped, so a value
        # whose serialised text also occurs inside a preceding key looks
        # like it could resolve to that key's tokens - confirm with callers.
        cursor = span_start
        fields = {}
        for key, value in data.items():
            # Each child search resumes where the previous child ended.
            fields[key], cursor = _parse_json(value, tokens, dumps_kw, cursor)

        return nodes.Object(fields=fields), span_end

    if isinstance(data, list):
        cursor = span_start
        elements = []
        for item in data:
            child, cursor = _parse_json(item, tokens, dumps_kw, cursor)
            elements.append(child)

        return nodes.Array(elements=elements), span_end

    # Primitive value: one token is returned as-is, several are grouped.
    leaf_tokens = tokens[span_start:span_end]
    if len(leaf_tokens) == 1:
        return leaf_tokens[0], span_end

    return nodes.Composite(children=leaf_tokens), span_end
def _find_token_span(
    data: JSONDataType, tokens: TokenSpanType, dumps_kw: KwargsType, offset: int
) -> tuple[int, int]:
    """
    Locate the absolute token indices that cover some data.

    Parameters
    ----------
    data : JSON-like
        Data whose serialised form is searched for.
    tokens : sequence of Token
        Token nodes.
    dumps_kw : dict
        Keyword arguments for `json.dumps()`.
    offset : int
        Index in `tokens` from which to start searching.

    Raises
    ------
    RuntimeError
        If the pattern built from `data` does not occur in the text of
        `tokens[offset:]`.

    Returns
    -------
    tuple of (int, int)
        Start and end (exclusive) indices of the span.
    """
    pattern = _make_regex_from_json(data, dumps_kw)

    # Only the text from `offset` onwards is eligible for the match.
    remaining_text = "".join(token.value for token in tokens[offset:])
    match = re.search(pattern, remaining_text, re.DOTALL)
    if match is None:
        raise RuntimeError(f"Unable to find span for {data=}")

    first = _find_span_start(tokens, match, offset)
    last = _find_span_end(tokens, pattern, first)

    return first, last
def _make_regex_from_json(data: JSONDataType, dumps_kw: KwargsType) -> str:
    """
    Create a regular expression from a piece of JSON data.

    The resultant pattern allows for flexible (or non-existent)
    whitespace between any two adjacent JSON lexemes: string literals,
    the structural characters ``{ } [ ] , :`` and bare literals such as
    numbers, ``true``, ``false`` and ``null``. Whitespace inside string
    literals is matched exactly, and the pattern never matches leading
    or trailing whitespace around the value.

    Parameters
    ----------
    data : JSON-like
        Data to transform.
    dumps_kw : dict
        Keyword arguments to pass to `json.dumps()` when dumping `data`.
        The dictionary is copied, not modified. If it has no ``indent``
        entry, an indent of 1 is used.

    Returns
    -------
    str
        Regular expression source for `data` with flexible whitespace.
    """
    dumps_kw = dumps_kw.copy()
    indent = dumps_kw.pop("indent", 1)
    dumped = json.dumps(data, indent=indent, **dumps_kw)

    # Split the dump into lexemes: a whole string literal (escaped quotes
    # included), a single structural character, or a run of any other
    # non-whitespace characters (numbers, true/false/null, ...).
    lexemes = re.findall(
        r'"(?:[^"\\]|\\.)*"|[{}\[\],:]|[^\s{}\[\],:"]+', dumped
    )

    # JSON permits whitespace around every structural character, so allow
    # it between each pair of lexemes rather than only where `json.dumps()`
    # happened to emit some (it emits none between a key and its colon, nor
    # inside empty containers).
    return r"\s*".join(re.escape(lexeme) for lexeme in lexemes)
def _find_span_start(tokens: TokenSpanType, search: re.Match, offset: int) -> int:
    """
    Map the start of a regex match back to an absolute token index.

    Parameters
    ----------
    tokens : sequence of Token
        The full token list.
    search : re.Match
        Match object from a regex search over the concatenated values
        of `tokens[offset:]`. Only its start position is used.
    offset : int
        Index of the token where the searched text begins.

    Raises
    ------
    RuntimeError
        If the match start lies beyond the text of `tokens[offset:]`.

    Returns
    -------
    int
        Absolute index of the token containing the first matched
        character.
    """
    target = search.start()
    consumed = 0
    index = offset
    for token in tokens[offset:]:
        consumed += len(token.value)
        # The first token whose cumulative length passes the match start
        # is the one that contains the first matched character.
        if consumed > target:
            return index
        index += 1

    raise RuntimeError(f"Unable to find start index for {search=}")
def _find_span_end(tokens: TokenSpanType, pattern: str, start: int) -> int:
    """
    Find where a span ends by growing the text one token at a time.

    Parameters
    ----------
    tokens : sequence of Token
        The full token list.
    pattern : str
        The regex pattern.
    start : int
        Absolute start index of the span.

    Raises
    ------
    RuntimeError
        If `pattern` never matches the text of `tokens[start:]`.

    Returns
    -------
    int
        Absolute end index (exclusive) in `tokens`.
    """
    pieces = []
    end = start
    for token in tokens[start:]:
        pieces.append(token.value)
        end += 1
        # The span ends at the first token that lets the pattern match.
        if re.search(pattern, "".join(pieces), re.DOTALL) is not None:
            return end

    raise RuntimeError(f"Unable to find end index for {pattern=}")