Coverage for C:\Users\peaco\OneDrive\Documents\GitHub\mt_metadata\mt_metadata\base\schema.py: 26%

160 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-10 00:11 -0800

1# -*- coding: utf-8 -*- 

2""" 

3Created on Thu Dec 24 12:02:12 2020 

4 

5:copyright: 

6 Jared Peacock (jpeacock@usgs.gov) 

7 

8:license: MIT 

9 

10""" 

11import json 

12from collections import OrderedDict 

13from collections.abc import MutableMapping 

14from copy import deepcopy 

15from operator import itemgetter 

16 

17# ============================================================================= 

18# Imports 

19# ============================================================================= 

20from pathlib import Path 

21 

22from loguru import logger 

23 

24from mt_metadata import REQUIRED_KEYS 

25from mt_metadata.base.helpers import NumpyEncoder 

26from mt_metadata.utils import validators 

27from mt_metadata.utils.exceptions import MTSchemaError 

28 

29 

30# ============================================================================= 

31# base dictionary 

32# ============================================================================= 

class BaseDict(MutableMapping):
    """
    BaseDict is a convenience class that can help the metadata dictionaries
    act like classes so you can access variables by .name or [name]

    .. note:: If the attribute has a . in the name then you will not be able
     to access that attribute by class.name.name  You will get an
     attribute error.  You need to access the attribute like a
     dictionary class['name.name']

    You can add an attribute by:

    >>> b = BaseDict()
    >>> b.update({name: value_dict})

    Or you can add a whole dictionary:

    >>> b.add_dict(ATTR_DICT['run'])

    All attributes have a descriptive dictionary of the form:

    >>> {'type': data type, 'required': [True | False],
    >>> ... 'style': 'string style', 'units': attribute units}

    * **type** --> the data type [ str | int | float | bool ]
    * **required** --> required in the standards [ True | False ]
    * **style** --> style of the string
    * **units** --> units of the attribute, must be a string

    Entries are stored directly in the instance ``__dict__``, which is what
    makes attribute-style access work for keys that are valid identifiers.
    """

    def __init__(self, *args, **kwargs):
        """Build the mapping from anything the ``dict`` constructor accepts."""
        self.update(dict(*args, **kwargs))

    def __setitem__(self, key, value):
        """Store ``value`` under ``key`` after ``validators.validate_value_dict``."""
        self.__dict__[key] = validators.validate_value_dict(value)

    def __getitem__(self, key):
        """
        Return the value dictionary stored under ``key``.

        A missing key does not raise; a debug message is logged and a
        generic "user defined" schema dictionary is returned instead.

        .. note:: Because no ``KeyError`` escapes, the mixin methods inherited
         from ``collections.abc.Mapping`` that rely on it (``in``, ``get``)
         treat every key as present.  Test ``key in obj.keys()`` or iterate
         the mapping when true membership is needed.
        """
        try:
            return self.__dict__[key]
        except KeyError as error:
            msg = (
                f"{error}, {key} is not in dictionary yet. "
                "Returning default schema dictionary."
            )
            logger.debug(msg)
            return {
                "type": "string",
                "required": False,
                "style": "free form",
                "units": None,
                "options": None,
                "description": "user defined",
                "example": None,
                "default": None,
            }

    def __delitem__(self, key):
        """Remove ``key``; a missing key is logged, not raised (best effort)."""
        try:
            del self.__dict__[key]
        except KeyError:
            msg = "Key: {0} does not exist".format(key)
            logger.info(msg)

    def __iter__(self):
        """Iterate over the stored keys in insertion order."""
        return iter(self.__dict__)

    def __len__(self):
        """Return the number of stored keys."""
        return len(self.__dict__)

    # The final two methods aren't required, but nice for demo purposes:
    def __str__(self):
        """returns simple dict representation of the mapping"""
        lines = []
        for key, value in sorted(self.__dict__.items(), key=itemgetter(0)):
            if key == "logger":
                continue
            lines.append("{0}:".format(key))
            lines.extend(
                "\t{0}: {1}".format(name, info) for name, info in value.items()
            )
        return "\n".join(lines)

    def __repr__(self):
        """echoes class, id, & reproducible representation in the REPL"""
        return "{}, BaseDict({})".format(
            super(BaseDict, self).__repr__(), self.__dict__
        )

    @property
    def name(self):
        """First key of the mapping, or ``None`` when the mapping is empty."""
        # The original indexed ``list(self.keys())[0]`` and caught KeyError,
        # but an empty list raises IndexError, so the fallback never ran.
        return next(iter(self), None)

    def add_dict(self, add_dict, name=None, keys=None):
        """
        Add the entries of another dictionary to this one.

        Parameters
        ----------
        add_dict : dict or MutableMapping
            dictionary to add
        name : str, optional
            prefix for the added keys; each key becomes ``"{name}.{key}"``,
            by default None (keys are added unchanged)
        keys : iterable, optional
            if given and non-empty, only entries of ``add_dict`` whose key is
            in ``keys`` are added, by default None (add everything)

        Raises
        ------
        TypeError
            if ``add_dict`` is not a dict or MutableMapping

        Examples
        --------
        >>> s_obj = Standards()
        >>> run_dict = s_obj.run_dict
        >>> run_dict.add_dict(s_obj.declination_dict, 'declination')
        """
        if not isinstance(add_dict, (dict, MutableMapping)):
            msg = "add_dict takes only a dictionary not type {0}".format(type(add_dict))
            logger.error(msg)
            raise TypeError(msg)

        if keys:
            add_dict = {
                key: value for key, value in add_dict.items() if key in keys
            }

        if name:
            add_dict = {
                "{0}.{1}".format(name, key): value
                for key, value in add_dict.items()
            }

        # Pass the mapping itself rather than ``**add_dict`` so that keys are
        # not required to be strings.
        self.update(add_dict)

    def copy(self):
        """Return a deep copy of this object."""
        return deepcopy(self)

    def to_latex(self, max_entries=7, first_table_len=7):
        """
        Convert to LaTeX format

        Entries are sorted by key and rendered as ``\\entry{key}{required}
        {units}{type}{style}{description}{example}`` rows.  When an entry has
        options, they are appended to the rendered description; the stored
        description is left untouched.

        Parameters
        ----------
        max_entries : int, optional
            Maximum number of entries in each table after the first,
            by default 7.  Must be at least 1.
        first_table_len : int, optional
            Length of first table, by default 7

        Returns
        -------
        list of str
            The first table is returned piecewise (preamble lines, header,
            one string holding its newline-joined rows, closing lines); every
            additional table follows as a single newline-joined string.

        Raises
        ------
        ValueError
            if ``max_entries`` is smaller than 1
        """
        if max_entries < 1:
            raise ValueError("max_entries must be a positive integer")

        # Raw strings with single backslashes: the doubled backslashes used
        # previously produced ``\\newpage`` / ``\\begin`` (a LaTeX line break
        # followed by plain text) rather than the intended commands.
        beginning = [
            r"\clearpage",
            r"\newpage",
            r"\begin{table}[h!]",
            r"\caption*{{Attributes for {0} Category}}".format(self.name),
            r"\begin{tabular}{p{.305\textwidth}p{.47\textwidth}p{.2\textwidth}}",
        ]
        end = [r"\end{tabular}", r"\label{tab:}", r"\end{table}"]
        header = [
            " & ".join(
                [
                    r"\textbf{Metadata Key}",
                    r"\textbf{Description}",
                    r"\textbf{Example}",
                ]
            )
            # Raw string: in a normal string ``\t`` is a TAB character, which
            # corrupted ``\toprule``.
            + r" \\ \toprule"
        ]

        columns = (
            "required",
            "units",
            "type",
            "style",
            "description",
            "example",
        )

        lines = []
        for name, v_dict in sorted(self.items(), key=itemgetter(0)):
            cells = {column: v_dict[column] for column in columns}
            if v_dict["options"] not in [None, "none", "None", []]:
                # Work on a local value so repeated calls do not keep
                # appending the options text to the stored description.
                cells["description"] = "{0}. Options: {1}".format(
                    v_dict["description"], v_dict["options"]
                )
            entry = r"\entry{{{0}}}".format(name) + "".join(
                "{{{0}}}".format(cells[column]) for column in columns
            )
            # Underscores are escaped for every table, including the first.
            lines.append(entry.replace("_", r"\_"))

        all_lines = beginning + header + ["\n".join(lines[:first_table_len])] + end

        # One continuation table per remaining chunk; stepping over the
        # remaining rows never yields an empty trailing table.
        for start in range(first_table_len, len(lines), max_entries):
            stable = beginning + header + lines[start : start + max_entries] + end
            all_lines.append("\n".join(stable))

        return all_lines

    def from_csv(self, csv_fn):
        """
        Read a CSV file and add its contents to this dictionary.

        The first line is the header; every following line is split on
        commas into at most ``len(header)`` fields, so commas in the last
        column are kept.  Lines shorter than 2 characters are skipped.

        Parameters
        ----------
        csv_fn : pathlib.Path or str
            csv file to read metadata standards from

        Returns
        -------
        None
            the object is updated in place

        Raises
        ------
        MTSchemaError
            if the file does not exist or is empty

        Examples
        --------

        >>> run_dict = BaseDict()
        >>> run_dict.from_csv(get_level_fn('run'))

        """
        csv_fn = Path(csv_fn)
        if not csv_fn.exists():
            msg = f"Schema file {csv_fn} does not exist."
            logger.error(msg)
            raise MTSchemaError(msg)

        with open(csv_fn, "r") as fid:
            logger.debug(f"Reading schema CSV {csv_fn}")
            lines = fid.readlines()

        if not lines:
            # Without a header line there is nothing to parse.
            msg = f"Schema file {csv_fn} is empty."
            logger.error(msg)
            raise MTSchemaError(msg)

        header = validators.validate_header(
            [ss.strip().lower() for ss in lines[0].strip().split(",")],
            attribute=True,
        )
        max_splits = len(header) - 1

        attribute_dict = {}
        for line in lines[1:]:
            if len(line) < 2:
                continue
            fields = line.strip().split(",", max_splits)
            line_dict = {key: ss.strip() for key, ss in zip(header, fields)}

            key_name = validators.validate_attribute(line_dict.pop("attribute"))
            attribute_dict[key_name] = validators.validate_value_dict(line_dict)

        self.update(attribute_dict)

    def to_csv(self, csv_fn):
        """
        write dictionary to csv file

        The header row is ``REQUIRED_KEYS``; each following row holds a key
        (rows sorted by key) followed by the values for ``REQUIRED_KEYS[1:]``.
        Non-empty list/tuple values are written quoted with ``|`` in place of
        commas; empty ones are written as ``None``.

        Parameters
        ----------
        csv_fn : pathlib.Path or str
            file to write to

        Returns
        -------
        pathlib.Path
            path of the written file
        """
        if not isinstance(csv_fn, Path):
            csv_fn = Path(csv_fn)

        lines = [",".join(REQUIRED_KEYS)]
        for key in sorted(self):
            line = [key]
            for rkey in REQUIRED_KEYS[1:]:
                value = self[key][rkey]
                if isinstance(value, (list, tuple)):
                    if len(value) == 0:
                        line.append("None")
                    else:
                        line.append(
                            '"{0}"'.format(value).replace(",", "|").replace("'", "")
                        )
                else:
                    line.append("{0}".format(value))
            lines.append(",".join(line))

        with csv_fn.open("w") as fid:
            fid.write("\n".join(lines))
        logger.info("Wrote dictionary to {0}".format(csv_fn))
        return csv_fn

    def to_json(self, json_fn, indent=" " * 4):
        """
        Write schema standards to json

        Parameters
        ----------
        json_fn : str or Path
            full path to json file
        indent : str, optional
            indentation string, by default " " * 4

        Returns
        -------
        Path
            full path to json file
        """
        json_fn = Path(json_fn)

        json_dict = {k: v for k, v in self.items() if k != "logger"}
        with open(json_fn, "w") as fid:
            json.dump(json_dict, fid, cls=NumpyEncoder, indent=indent)

        return json_fn

    def from_json(self, json_fn):
        """
        Read schema standards from json

        Every value in the file is passed through
        ``validators.validate_value_dict`` and added to this object.

        Parameters
        ----------
        json_fn : str or Path
            full path to json file

        Returns
        -------
        None
            the object is updated in place

        Raises
        ------
        MTSchemaError
            if the file does not exist
        """
        json_fn = Path(json_fn)
        if not json_fn.exists():
            msg = f"JSON schema file {json_fn} does not exist"
            logger.error(msg)
            # The exception used to be constructed but never raised, so the
            # failure only surfaced later from ``open``.
            raise MTSchemaError(msg)

        with open(json_fn, "r") as fid:
            json_dict = json.load(fid)

        valid_dict = {}
        for k, v in json_dict.items():
            valid_dict[k] = validators.validate_value_dict(v)
        self.update(valid_dict)

393 

394 

def get_schema_fn(schema_element, paths):
    """
    Get the correct file name for the given schema element from the provided
    list of valid file names

    A path matches when its stem (file name without the final suffix) equals
    ``schema_element``; the first match is returned.

    Parameters
    ----------
    schema_element : str
        name of the schema element to get filename for
    paths : list
        list of valid file paths (``pathlib.Path`` or str)

    Returns
    -------
    pathlib.Path
        correct file name for given element

    Raises
    ------
    MTSchemaError
        if no path matches, including when ``paths`` is empty
    """
    candidates = [Path(fn) for fn in paths]
    for fn in candidates:
        if fn.stem == schema_element:
            return fn

    if candidates:
        msg = (
            f"Could not find schema element {schema_element}.json "
            f"in {candidates[0].parent}."
        )
    else:
        # Indexing an empty list used to raise IndexError here instead of the
        # documented schema error.
        msg = (
            f"Could not find schema element {schema_element}.json: "
            "no schema paths were provided."
        )
    raise MTSchemaError(msg)

417 

418 

def get_schema(schema_element, paths):
    """
    Load the schema for an element into a
    :class:`mt_metadata.base.schema.BaseDict`

    The matching file is looked up with :func:`get_schema_fn` and read with
    :meth:`BaseDict.from_json`.

    Parameters
    ----------
    schema_element : str
        name of the schema element to load
    paths : list
        list of valid file paths to search

    Returns
    -------
    mt_metadata.base.schema.BaseDict
        dictionary that describes the standards for the element
    """
    matched_fn = get_schema_fn(schema_element, paths)

    schema = BaseDict()
    schema.from_json(matched_fn)
    return schema