Coverage for src/extratools_core/jsontools.py: 72% (146 statements)


import json
import re
import tomllib
from csv import DictWriter
from io import StringIO
from pathlib import Path
from re import Match, Pattern
from types import NoneType
from typing import Any, TypedDict

import yaml
from toolz.itertoolz import groupby

type JsonDict = dict[str, Any]

type DictOfJsonDicts = dict[str, JsonDict]
type ListOfJsonDicts = list[JsonDict]


class DictOfJsonDictsDiffUpdate(TypedDict):
    old: JsonDict
    new: JsonDict


class DictOfJsonDictsDiff(TypedDict):
    deletes: dict[str, JsonDict]
    inserts: dict[str, JsonDict]
    updates: dict[str, DictOfJsonDictsDiffUpdate]


class ListOfJsonDictsDiff(TypedDict):
    deletes: list[JsonDict]
    inserts: list[JsonDict]


def flatten(data: Any) -> Any:
    """Flatten nested JSON data into a flat dict keyed by path."""
    def flatten_rec(data: Any, path: str) -> None:
        if isinstance(data, dict):
            for k, v in data.items():
                flatten_rec(v, path + (f".{k}" if path else k))
        elif isinstance(data, list):
            for i, v in enumerate(data):
                flatten_rec(v, path + f"[{i}]")
        else:
            # Leaf value: record it under its full path,
            # or under "." when the root itself is a scalar.
            flatten_dict[path or "."] = data

    flatten_dict: JsonDict = {}
    flatten_rec(data, "")
    return flatten_dict
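
# Usage sketch (illustrative only, not part of the original module):
# >>> flatten({"a": {"b": 1}, "c": [2, 3]})
# {'a.b': 1, 'c[0]': 2, 'c[1]': 3}
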

def json_to_csv(
    data: DictOfJsonDicts | ListOfJsonDicts,
    /,
    csv_path: Path | str | None = None,
    *,
    key_field_name: str = "_key",
) -> str:
    if isinstance(data, dict):
        data = [
            {
                # If a record already contains the key field,
                # its value overwrites the key injected here.
                # That is acceptable, as the existing field most likely
                # holds the same key anyway.
                key_field_name: key,
                **value,
            }
            for key, value in data.items()
        ]

    fields: set[str] = set()
    for record in data:
        fields.update(record.keys())

    sio = StringIO()

    # Sort the field names so the column order is deterministic;
    # iterating the set directly would vary between runs.
    writer = DictWriter(sio, fieldnames=sorted(fields))
    writer.writeheader()
    writer.writerows(data)

    csv_str: str = sio.getvalue()

    if csv_path:
        Path(csv_path).write_text(csv_str)

    return csv_str
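
# Usage sketch (illustrative only; column order assumes the sorted
# field names above, and missing fields are emitted as empty cells):
# >>> print(json_to_csv({"r1": {"a": 1}, "r2": {"a": 2, "b": 3}}))
# _key,a,b
# r1,1,
# r2,2,3
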

def dict_of_json_dicts_diff(
    old: DictOfJsonDicts,
    new: DictOfJsonDicts,
) -> DictOfJsonDictsDiff:
    inserts: dict[str, JsonDict] = {}
    updates: dict[str, DictOfJsonDictsDiffUpdate] = {}

    for new_key, new_value in new.items():
        old_value: dict[str, Any] | None = old.get(new_key, None)
        if old_value is None:
            inserts[new_key] = new_value
        # Compare canonical serializations; sorting the keys keeps two
        # dicts with the same content but different insertion order equal.
        elif json.dumps(old_value, sort_keys=True) != json.dumps(new_value, sort_keys=True):
            updates[new_key] = {
                "old": old_value,
                "new": new_value,
            }

    deletes: dict[str, JsonDict] = {
        old_key: old_value
        for old_key, old_value in old.items()
        if old_key not in new
    }

    return {
        "deletes": deletes,
        "inserts": inserts,
        "updates": updates,
    }
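
# Usage sketch (illustrative only):
# >>> dict_of_json_dicts_diff(
# ...     {"a": {"x": 1}, "b": {"y": 2}},
# ...     {"a": {"x": 1}, "c": {"z": 3}},
# ... )
# {'deletes': {'b': {'y': 2}}, 'inserts': {'c': {'z': 3}}, 'updates': {}}
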

def list_of_json_dicts_diff(
    old: ListOfJsonDicts,
    new: ListOfJsonDicts,
) -> ListOfJsonDictsDiff:
    # Index each list by a canonical serialization of its dicts,
    # so membership checks are content-based and key-order-insensitive.
    old_dict: DictOfJsonDicts = {
        json.dumps(d, sort_keys=True): d
        for d in old
    }
    new_dict: DictOfJsonDicts = {
        json.dumps(d, sort_keys=True): d
        for d in new
    }

    inserts: list[JsonDict] = [
        new_value
        for new_key, new_value in new_dict.items()
        if new_key not in old_dict
    ]
    deletes: list[JsonDict] = [
        old_value
        for old_key, old_value in old_dict.items()
        if old_key not in new_dict
    ]

    return {
        "deletes": deletes,
        "inserts": inserts,
    }
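
# Usage sketch (illustrative only):
# >>> list_of_json_dicts_diff([{"x": 1}, {"y": 2}], [{"y": 2}, {"z": 3}])
# {'deletes': [{'x': 1}], 'inserts': [{'z': 3}]}
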

def merge_json(
    *values: Any,
    concat_lists: bool = True,
) -> Any:
    def merge_json_dicts(*jds: JsonDict) -> JsonDict:
        # Group all (key, value) tuples across the dicts by key.
        groups: dict[str, list[tuple[str, Any]]] = groupby(
            lambda kv_tuple: kv_tuple[0],
            (
                kv_tuple
                for jd in jds
                for kv_tuple in jd.items()
            ),
        )

        return {
            key: merge_json(
                *[value for _, value in kv_tuples],
                concat_lists=concat_lists,
            )
            for key, kv_tuples in groups.items()
        }

    first_value_type: type | None = None

    not_none_values = []

    for value in values:
        value_type: type = type(value)
        if value_type is NoneType:
            continue

        if first_value_type is None:
            first_value_type = value_type
        elif first_value_type != value_type:
            raise ValueError

        not_none_values.append(value)

    # All values were None (or no values were given).
    if first_value_type is None:
        return None

    if first_value_type is dict:
        return merge_json_dicts(*not_none_values)

    if first_value_type is list and concat_lists:
        return [
            item
            for value in not_none_values
            for item in value
        ]

    # Scalars (and lists when concat_lists is False): last value wins.
    return not_none_values[-1]
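
# Usage sketch (illustrative only):
# >>> merge_json({"a": 1, "b": [1]}, {"a": 2, "b": [2], "c": None})
# {'a': 2, 'b': [1, 2], 'c': None}
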

# A path is a chain of segments: ".field" for dict fields
# and "[index]" for list elements, e.g. ".users[0].name".
__PATH_PATTERN: Pattern = re.compile(r"(?:\.(?P<field>\w+)|\[(?P<index>[0-9]+)\])(?P<remaining>.*)")


def get_by_path(data: Any, path: str) -> Any:
    match: Match | None = __PATH_PATTERN.fullmatch(path)
    if not match:
        raise ValueError

    new_data: Any
    try:
        if field := match.group("field"):
            if not isinstance(data, dict):
                raise LookupError

            new_data = data[field]
        elif index := match.group("index"):
            if not isinstance(data, list):
                raise LookupError

            new_data = data[int(index)]
        else:
            # This should be unreachable
            raise NotImplementedError
    except (IndexError, KeyError) as e:
        raise LookupError from e

    remaining_path: str = match.group("remaining")
    if remaining_path:
        return get_by_path(new_data, remaining_path)

    return new_data
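
# Usage sketch (illustrative only):
# >>> get_by_path({"a": [{"b": 1}]}, ".a[0].b")
# 1
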

def set_by_path(data: Any, path: str, value: Any) -> None:
    match: Match | None = __PATH_PATTERN.fullmatch(path)
    if not match:
        raise ValueError

    remaining_path: str = match.group("remaining")

    try:
        if field := match.group("field"):
            if not isinstance(data, dict):
                raise LookupError

            # Create intermediate dicts for missing fields along the path.
            if field not in data and remaining_path:
                data[field] = {}

            if remaining_path:
                set_by_path(data[field], remaining_path, value)
            else:
                data[field] = value
        elif index := match.group("index"):
            if not isinstance(data, list):
                raise LookupError

            # Bind the int under a new name rather than rebinding the
            # matched str, which keeps the two types distinct.
            i: int = int(index)

            if remaining_path:
                set_by_path(data[i], remaining_path, value)
            else:
                data[i] = value
        else:
            # This should be unreachable
            raise NotImplementedError
    except (IndexError, KeyError) as e:
        raise LookupError from e
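
# Usage sketch (illustrative only):
# >>> d: JsonDict = {}
# >>> set_by_path(d, ".a.b", 1)
# >>> d
# {'a': {'b': 1}}
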

def read_json_from(path: Path | str) -> Any:
    path = Path(path).expanduser()

    content: str = path.read_text()
    match path.suffix.lower():
        case ".json":
            return json.loads(content)
        case ".toml":
            return tomllib.loads(content)
        case ".yaml" | ".yml":
            return yaml.safe_load(content)
        case _:
            raise ValueError
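
# Usage sketch (illustrative only; the path is hypothetical):
# >>> read_json_from("~/settings.toml")  # parsed TOML content as a dict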