Coverage for src/extratools_core/jsontools.py: 72%
146 statements
« prev ^ index » next — coverage.py v7.8.1, created at 2025-06-26 04:18 -0700
import json
import re
import tomllib
from collections import defaultdict
from csv import DictWriter
from io import StringIO
from pathlib import Path
from re import Match, Pattern
from types import NoneType
from typing import Any, TypedDict

import yaml
from toolz.itertoolz import groupby
# A JSON object: string keys mapping to arbitrary JSON-compatible values.
type JsonDict = dict[str, Any]

# A collection of JSON objects indexed by a string key.
type DictOfJsonDicts = dict[str, JsonDict]
# A sequence of JSON objects.
type ListOfJsonDicts = list[JsonDict]
class DictOfJsonDictsDiffUpdate(TypedDict):
    """An updated entry in a keyed diff: the value before and after."""

    old: JsonDict  # the entry's value in the "old" dict
    new: JsonDict  # the entry's value in the "new" dict
class DictOfJsonDictsDiff(TypedDict):
    """Diff between two dicts of JSON objects, grouped by change type."""

    deletes: dict[str, JsonDict]  # entries whose key exists only in the old dict
    inserts: dict[str, JsonDict]  # entries whose key exists only in the new dict
    updates: dict[str, DictOfJsonDictsDiffUpdate]  # keys in both, values differ
class ListOfJsonDictsDiff(TypedDict):
    """Diff between two lists of JSON objects (unordered, no update notion)."""

    deletes: list[JsonDict]  # records present only in the old list
    inserts: list[JsonDict]  # records present only in the new list
def flatten(data: Any) -> Any:
    """Flatten nested JSON data into a single-level dict keyed by path.

    Dict keys are joined with ``.`` and list positions rendered as
    ``[i]``; a top-level scalar is stored under the key ``"."``.
    """
    result: JsonDict = {}

    def walk(node: Any, prefix: str) -> None:
        if isinstance(node, dict):
            for key, child in node.items():
                walk(child, f"{prefix}.{key}" if prefix else key)
        elif isinstance(node, list):
            for pos, child in enumerate(node):
                walk(child, f"{prefix}[{pos}]")
        else:
            # Leaf value; an empty prefix means data itself was a scalar.
            result[prefix or "."] = node

    walk(data, "")
    return result
def json_to_csv(
    data: DictOfJsonDicts | ListOfJsonDicts,
    /,
    csv_path: Path | str | None = None,
    *,
    key_field_name: str = "_key",
) -> str:
    """Serialize a dict or list of JSON objects to CSV text.

    A dict input is converted to a list of records, with each entry's key
    stored under ``key_field_name``. Columns are the union of all record
    fields; records missing a field get an empty cell.

    Args:
        data: Records to serialize.
        csv_path: Optional path to also write the CSV text to.
        key_field_name: Column name for dict keys when ``data`` is a dict.

    Returns:
        The CSV content as a string.
    """
    if isinstance(data, dict):
        data = [
            {
                # In case there is already a key field in each record,
                # the new key field will be overwritten.
                # It is okay though as the existing key field is likely
                # serving the purpose of containing keys.
                key_field_name: key,
                **value,
            }
            for key, value in data.items()
        ]

    # Collect the union of field names in first-seen order: a plain set
    # would make the column order nondeterministic between runs.
    fields: dict[str, None] = {}
    for record in data:
        fields.update(dict.fromkeys(record))

    sio = StringIO()

    writer = DictWriter(sio, fieldnames=list(fields))
    writer.writeheader()
    writer.writerows(data)

    csv_str: str = sio.getvalue()

    if csv_path:
        Path(csv_path).write_text(csv_str)

    return csv_str
def dict_of_json_dicts_diff(
    old: DictOfJsonDicts,
    new: DictOfJsonDicts,
) -> DictOfJsonDictsDiff:
    """Compute a keyed diff between two dicts of JSON objects.

    Returns:
        A dict with "deletes" (keys only in ``old``), "inserts" (keys
        only in ``new``), and "updates" (keys in both whose values
        differ, each reported as ``{"old": ..., "new": ...}``).
    """
    inserts: dict[str, JsonDict] = {}
    updates: dict[str, DictOfJsonDictsDiffUpdate] = {}

    for new_key, new_value in new.items():
        old_value: JsonDict | None = old.get(new_key)
        if old_value is None:
            inserts[new_key] = new_value
        # Plain dict equality is deep and key-order-insensitive; comparing
        # json.dumps() strings would falsely report records whose keys
        # merely appear in a different insertion order as updates.
        elif old_value != new_value:
            updates[new_key] = {
                "old": old_value,
                "new": new_value,
            }

    deletes: dict[str, JsonDict] = {
        old_key: old_value
        for old_key, old_value in old.items()
        if old_key not in new
    }

    return {
        "deletes": deletes,
        "inserts": inserts,
        "updates": updates,
    }
def list_of_json_dicts_diff(
    old: ListOfJsonDicts,
    new: ListOfJsonDicts,
) -> ListOfJsonDictsDiff:
    """Compute an unordered diff between two lists of JSON objects.

    Each record is keyed by its canonical JSON serialization, so records
    that differ only in key order compare equal, and duplicate records
    within a list collapse to a single entry.

    Returns:
        A dict with "deletes" (records only in ``old``) and
        "inserts" (records only in ``new``).
    """
    def canonical(record: JsonDict) -> str:
        # sort_keys makes the key order-insensitive; without it,
        # {"a": 1, "b": 2} and {"b": 2, "a": 1} would diff as distinct.
        return json.dumps(record, sort_keys=True)

    old_by_key: DictOfJsonDicts = {canonical(d): d for d in old}
    new_by_key: DictOfJsonDicts = {canonical(d): d for d in new}

    inserts: list[JsonDict] = [
        value
        for key, value in new_by_key.items()
        if key not in old_by_key
    ]
    deletes: list[JsonDict] = [
        value
        for key, value in old_by_key.items()
        if key not in new_by_key
    ]

    return {
        "deletes": deletes,
        "inserts": inserts,
    }
150def merge_json(
151 *values: Any,
152 concat_lists: bool = True,
153) -> Any:
154 def merge_json_dicts(*jds: JsonDict) -> JsonDict:
155 groups: dict[str, list[JsonDict]] = groupby(
156 lambda kv_tuple: kv_tuple[0],
157 (
158 kv_tuple
159 for jd in jds
160 for kv_tuple in jd.items()
161 ),
162 )
164 return {
165 key: merge_json(
166 *[value for _, value in kv_tuples],
167 concat_lists=concat_lists,
168 )
169 for key, kv_tuples in groups.items()
170 }
172 first_value_type: type | None = None
174 not_none_values = []
176 for value in values:
177 value_type: type = type(value)
178 if value_type is NoneType:
179 continue
181 if first_value_type is None:
182 first_value_type = value_type
183 elif first_value_type != value_type:
184 raise ValueError
186 not_none_values.append(value)
188 if first_value_type is None or first_value_type is NoneType:
189 return None
191 if first_value_type is dict:
192 return merge_json_dicts(*not_none_values)
194 if first_value_type is list and concat_lists:
195 return [
196 item
197 for value in not_none_values
198 for item in value
199 ]
201 return not_none_values[-1]
# Matches one leading path step — either ".field" or "[index]" — and captures
# everything after it in "remaining" for recursive descent.
__PATH_PATTERN: Pattern = re.compile(r"(?:\.(?P<field>[^\.\[\]]+)|\[(?P<index>[0-9]+)\])(?P<remaining>.*)")  # noqa: E501


def get_by_path(data: Any, path: str) -> Any:
    """Look up the value at ``path`` (e.g. ``".a[0].b"``) within ``data``.

    Raises:
        ValueError: If ``path`` is not a valid path expression.
        LookupError: If the path does not resolve against ``data``.
    """
    step: Match | None = __PATH_PATTERN.fullmatch(path)
    if step is None:
        raise ValueError

    field = step.group("field")
    index = step.group("index")

    child: Any
    try:
        if field:
            if not isinstance(data, dict):
                raise LookupError

            child = data[field]
        elif index:
            if not isinstance(data, list):
                raise LookupError

            child = data[int(index)]
        else:
            # This should be unreachable
            raise NotImplementedError
    except (IndexError, KeyError) as e:
        raise LookupError from e

    rest: str = step.group("remaining")
    return get_by_path(child, rest) if rest else child
def set_by_path(data: Any, path: str, value: Any) -> None:
    """Assign ``value`` at ``path`` (e.g. ``".a[0].b"``) within ``data``.

    Missing intermediate dict fields are created on the way down.

    Raises:
        ValueError: If ``path`` is not a valid path expression.
        LookupError: If the path cannot be resolved against ``data``.
    """
    step: Match | None = __PATH_PATTERN.fullmatch(path)
    if step is None:
        raise ValueError

    rest: str = step.group("remaining")

    try:
        if field := step.group("field"):
            if not isinstance(data, dict):
                raise LookupError

            if rest:
                # Auto-create missing intermediate dicts before descending.
                if field not in data:
                    data[field] = {}
                set_by_path(data[field], rest, value)
            else:
                data[field] = value
        elif raw_index := step.group("index"):
            if not isinstance(data, list):
                raise LookupError

            pos = int(raw_index)
            if rest:
                set_by_path(data[pos], rest, value)
            else:
                data[pos] = value
        else:
            # This should be unreachable
            raise NotImplementedError
    except (IndexError, KeyError) as e:
        raise LookupError from e
273def read_json_from(path: Path | str) -> Any:
274 path = Path(path).expanduser()
276 content: str = path.read_text()
277 match path.suffix.lower():
278 case ".json":
279 return json.loads(content)
280 case ".toml":
281 return tomllib.loads(content)
282 case ".yaml" | ".yml":
283 return yaml.safe_load(content)
284 case _:
285 raise ValueError