Coverage for C: \ Users \ peaco \ OneDrive \ Documents \ GitHub \ mt_metadata \ mt_metadata \ base \ schema.py: 26%
160 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-10 00:11 -0800
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-10 00:11 -0800
1# -*- coding: utf-8 -*-
2"""
3Created on Thu Dec 24 12:02:12 2020
5:copyright:
6 Jared Peacock (jpeacock@usgs.gov)
8:license: MIT
10"""
11import json
12from collections import OrderedDict
13from collections.abc import MutableMapping
14from copy import deepcopy
15from operator import itemgetter
17# =============================================================================
18# Imports
19# =============================================================================
20from pathlib import Path
22from loguru import logger
24from mt_metadata import REQUIRED_KEYS
25from mt_metadata.base.helpers import NumpyEncoder
26from mt_metadata.utils import validators
27from mt_metadata.utils.exceptions import MTSchemaError
30# =============================================================================
31# base dictionary
32# =============================================================================
class BaseDict(MutableMapping):
    """
    BaseDict is a convenience class that can help the metadata dictionaries
    act like classes so you can access variables by .name or [name]

    Entries are stored directly in the instance ``__dict__``, which is what
    makes attribute-style access work.

    .. note:: If the attribute has a . in the name then you will not be able
     to access that attribute by class.name.name  You will get an
     attribute error.  You need to access the attribute like a
     dictionary class['name.name']

    .. note:: Looking up a missing key does not raise ``KeyError``; a default
     schema dictionary is returned instead (see ``__getitem__``).  As a
     consequence the inherited mapping helpers that rely on ``KeyError``
     (``key in obj``, ``obj.get(key, default)``) treat every key as present.
     Use ``key in obj.keys()`` or iterate to test real membership.

    You can add an attribute by:

    >>> b = BaseDict()
    >>> b.update({name: value_dict})

    Or you can add a whole dictionary:

    >>> b.add_dict(ATTR_DICT['run'])

    All attributes have a descriptive dictionary of the form:

    >>> {'type': data type, 'required': [True | False],
    >>> ... 'style': 'string style', 'units': attribute units}

    * **type** --> the data type [ str | int | float | bool ]
    * **required** --> required in the standards [ True | False ]
    * **style** --> style of the string
    * **units** --> units of the attribute, must be a string
    """

    def __init__(self, *args, **kwargs):
        # Route everything through update() so each value passes through
        # __setitem__ and is therefore validated.
        self.update(dict(*args, **kwargs))

    def __setitem__(self, key, value):
        """Validate ``value`` as a schema value-dictionary and store it."""
        self.__dict__[key] = validators.validate_value_dict(value)

    def __getitem__(self, key):
        """
        Return the value dictionary for ``key``.

        A missing key is logged at debug level and a default "user defined"
        schema dictionary is returned instead of raising ``KeyError``.  The
        default is NOT stored in the mapping.
        """
        try:
            return self.__dict__[key]
        except KeyError as error:
            msg = (
                f"{error}, {key} is not in dictionary yet. "
                "Returning default schema dictionary."
            )
            logger.debug(msg)
            return {
                "type": "string",
                "required": False,
                "style": "free form",
                "units": None,
                "options": None,
                "description": "user defined",
                "example": None,
                "default": None,
            }

    def __delitem__(self, key):
        """Delete ``key``; a missing key is logged rather than raised."""
        try:
            del self.__dict__[key]
        except KeyError:
            msg = "Key: {0} does not exist".format(key)
            logger.info(msg)

    def __iter__(self):
        return iter(self.__dict__)

    def __len__(self):
        return len(self.__dict__)

    # The final two methods aren't required, but nice for demo purposes:
    def __str__(self):
        """returns simple dict representation of the mapping"""
        lines = []
        for key, value in sorted(self.__dict__.items(), key=itemgetter(0)):
            if key == "logger":
                continue
            lines.append("{0}:".format(key))
            for name, info in value.items():
                lines.append("\t{0}: {1}".format(name, info))
        return "\n".join(lines)

    def __repr__(self):
        """echoes class, id, & reproducible representation in the REPL"""
        return "{}, BaseDict({})".format(super().__repr__(), self.__dict__)

    @property
    def name(self):
        """First key stored in the mapping, or None when it is empty."""
        # The previous implementation indexed ``list(self.keys())[0]`` and
        # caught KeyError, but an empty list raises IndexError, so an empty
        # mapping crashed instead of returning None.
        return next(iter(self), None)

    def add_dict(self, add_dict, name=None, keys=None):
        """
        Add the entries of another dictionary to this one.  If name is input
        it is prepended (``name.key``) to the keys of the input dictionary.

        Parameters
        ----------
        add_dict : dict or MutableMapping
            dictionary to add
        name : str, optional
            name to add to keys, by default None
        keys : container of str, optional
            if given, only entries of ``add_dict`` whose key is in ``keys``
            are added, by default None (add everything)

        Raises
        ------
        TypeError
            if ``add_dict`` is not a dict or MutableMapping

        Examples
        --------
        >>> s_obj = Standards()
        >>> run_dict = s_obj.run_dict
        >>> run_dict.add_dict(s_obj.declination_dict, 'declination')
        """
        if not isinstance(add_dict, (dict, MutableMapping)):
            msg = "add_dict takes only a dictionary not type {0}".format(type(add_dict))
            logger.error(msg)
            raise TypeError(msg)

        if keys:
            add_dict = {
                key: value for key, value in add_dict.items() if key in keys
            }

        if name:
            add_dict = {
                "{0}.{1}".format(name, key): value
                for key, value in add_dict.items()
            }

        # Pass the mapping itself rather than ``**add_dict`` so keys are not
        # forced through keyword-argument handling.
        self.update(add_dict)

    def copy(self):
        """Return a deep copy of this object."""
        return deepcopy(self)

    def to_latex(self, max_entries=7, first_table_len=7):
        """
        Convert to LaTeX format

        Entries are sorted by key and rendered as ``\\entry{...}`` rows that
        are split over one or more ``table`` environments.

        Parameters
        ----------
        max_entries : int, optional
            Maximum number of entries in each table after the first,
            by default 7
        first_table_len : int, optional
            Number of entries in the first table, by default 7

        Returns
        -------
        list of str
            The first table is returned as individual elements (preamble
            lines, header, the joined entry rows, closing lines).  Every
            additional table is appended as a single newline-joined string.
        """
        # Raw strings hold exactly one backslash per LaTeX command.  The
        # earlier doubled backslashes (r"\\newpage") produced a LaTeX line
        # break followed by literal text instead of the command.
        beginning = [
            r"\clearpage",
            r"\newpage",
            r"\begin{table}[h!]",
            r"\caption*{{Attributes for {0} Category}}".format(self.name),
            r"\begin{tabular}{p{.305\textwidth}p{.47\textwidth}p{.2\textwidth}}",
        ]

        end = [r"\end{tabular}", r"\label{tab:}", r"\end{table}"]
        header = [
            " & ".join(
                [
                    r"\textbf{Metadata Key}",
                    r"\textbf{Description}",
                    r"\textbf{Example}",
                ]
            )
            # Must be raw: in a normal string "\t" of "\toprule" is a TAB.
            + r" \\ \toprule"
        ]

        order = [
            "name",
            "required",
            "units",
            "type",
            "style",
            "description",
            "example",
        ]

        level_dict = OrderedDict(sorted(self.items(), key=itemgetter(0)))

        lines = []
        for name, v_dict in level_dict.items():
            # Build the description locally; appending to
            # v_dict["description"] would permanently modify the stored
            # schema and duplicate the options text on every call.
            description = v_dict["description"]
            if v_dict["options"] not in [None, "none", "None", []]:
                description = "{0}. Options: {1}".format(
                    description, v_dict["options"]
                )
            fields = dict(v_dict, description=description)
            lines.append(
                r"\entry{{{0}}}".format(name)
                + "".join("{{{0}}}".format(fields[ii]) for ii in order[1:])
            )

        # Number of tables needed after the first one (ceiling division).
        # The old count used ``//`` where a remainder test was intended and
        # emitted an empty trailing table for exact multiples.
        remaining = max(0, len(lines) - first_table_len)
        n_extra_tables = -(-remaining // max_entries)

        # NOTE(review): rows of the first table are not underscore-escaped
        # while rows of the following tables are; kept as in the original
        # output -- confirm which is intended.
        all_lines = beginning + header + ["\n".join(lines[0:first_table_len])] + end
        for ii in range(n_extra_tables):
            start = first_table_len + max_entries * ii
            stable = beginning + header
            stable.extend(
                row.replace("_", r"\_") for row in lines[start : start + max_entries]
            )
            stable += end
            all_lines.append("\n".join(stable))

        return all_lines

    def from_csv(self, csv_fn):
        """
        Read in CSV file and add its contents to this dictionary

        The first line is the header.  Every following non-empty line is
        split on commas into at most ``len(header)`` fields, so any commas
        beyond that stay inside the last column.

        Parameters
        ----------
        csv_fn : pathlib.Path or str
            csv file to read metadata standards from

        Returns
        -------
        None
            the object is updated in place

        Raises
        ------
        MTSchemaError
            if the file does not exist or is empty

        Examples
        --------

        >>> run_dict = BaseDict()
        >>> run_dict.from_csv(get_level_fn('run'))

        """
        csv_fn = Path(csv_fn)
        if not csv_fn.exists():
            msg = f"Schema file {csv_fn} does not exist."
            logger.error(msg)
            raise MTSchemaError(msg)

        with open(csv_fn, "r") as fid:
            logger.debug(f"Reading schema CSV {csv_fn}")
            lines = fid.readlines()

        if not lines:
            # Without a header line there is nothing to parse; fail with the
            # schema error instead of an IndexError on lines[0].
            msg = f"Schema file {csv_fn} is empty."
            logger.error(msg)
            raise MTSchemaError(msg)

        header = validators.validate_header(
            [ss.strip().lower() for ss in lines[0].strip().split(",")],
            attribute=True,
        )
        attribute_dict = {}
        for line in lines[1:]:
            # skip blank lines
            if len(line) < 2:
                continue
            line_dict = {
                key: ss.strip()
                for key, ss in zip(header, line.strip().split(",", len(header) - 1))
            }

            key_name = validators.validate_attribute(line_dict.pop("attribute"))
            attribute_dict[key_name] = validators.validate_value_dict(line_dict)

        self.update(attribute_dict)

    def to_csv(self, csv_fn):
        """
        write dictionary to csv file

        One header line built from ``REQUIRED_KEYS`` is written, followed by
        one line per key in sorted order.  The key itself fills the first
        column and ``REQUIRED_KEYS[1:]`` are looked up for the rest.  Non-empty
        list/tuple values are written quoted with ``|`` in place of commas;
        empty ones are written as ``None``.

        Parameters
        ----------
        csv_fn : str or pathlib.Path
            full path of the csv file to write

        Returns
        -------
        pathlib.Path
            path of the file that was written
        """

        if not isinstance(csv_fn, Path):
            csv_fn = Path(csv_fn)

        # sort dictionary first
        lines = [",".join(REQUIRED_KEYS)]
        for key in sorted(self.keys()):
            entry = self[key]
            line = [key]
            for rkey in REQUIRED_KEYS[1:]:
                value = entry[rkey]
                if isinstance(value, (list, tuple)):
                    if len(value) == 0:
                        line.append("None")
                    else:
                        line.append(
                            '"{0}"'.format(value).replace(",", "|").replace("'", "")
                        )
                else:
                    line.append("{0}".format(value))
            lines.append(",".join(line))

        with csv_fn.open("w") as fid:
            fid.write("\n".join(lines))
        logger.info("Wrote dictionary to {0}".format(csv_fn))
        return csv_fn

    def to_json(self, json_fn, indent=" " * 4):
        """
        Write schema standards to json

        Parameters
        ----------
        json_fn : str or Path
            full path to json file
        indent : str, optional
            indentation string, by default " " * 4

        Returns
        -------
        Path
            full path to json file
        """

        json_fn = Path(json_fn)

        json_dict = {k: v for k, v in self.items() if k != "logger"}
        with open(json_fn, "w") as fid:
            json.dump(json_dict, fid, cls=NumpyEncoder, indent=indent)

        return json_fn

    def from_json(self, json_fn):
        """
        Read schema standards from json and add them to this dictionary

        Parameters
        ----------
        json_fn : str or Path
            full path to json file

        Returns
        -------
        None
            the object is updated in place

        Raises
        ------
        MTSchemaError
            if the file does not exist
        """

        json_fn = Path(json_fn)
        if not json_fn.exists():
            msg = f"JSON schema file {json_fn} does not exist"
            logger.error(msg)
            # The exception used to be constructed but never raised, so the
            # failure surfaced later as a FileNotFoundError from open().
            raise MTSchemaError(msg)

        with open(json_fn, "r") as fid:
            json_dict = json.load(fid)

        valid_dict = {
            k: validators.validate_value_dict(v) for k, v in json_dict.items()
        }
        self.update(valid_dict)
def get_schema_fn(schema_element, paths):
    """
    Get the correct file name for the given schema element from the provided
    list of valid file names

    The first path whose stem (file name without suffix) equals
    ``schema_element`` is returned.

    Parameters
    ----------
    schema_element : str
        name of the schema element to get filename for
    paths : list
        list of valid file paths (objects with ``stem`` and ``parent``
        attributes, e.g. :class:`pathlib.Path`)

    Returns
    -------
    pathlib.Path
        correct file name for given element

    Raises
    ------
    MTSchemaError
        if no path matches, including when ``paths`` is empty
    """
    for fn in paths:
        if schema_element == fn.stem:
            return fn

    # Building the message used to index paths[0] unconditionally, so an
    # empty list raised IndexError instead of the intended schema error.
    try:
        search_dir = paths[0].parent
    except IndexError:
        msg = (
            f"Could not find schema element {schema_element}.json: "
            "no schema file paths were provided."
        )
    else:
        msg = f"Could not find schema element {schema_element}.json in {search_dir}."
    raise MTSchemaError(msg)
def get_schema(schema_element, paths):
    """
    Build a :class:`mt_metadata.schema_base.BaseDict` for a schema element

    The JSON file for ``schema_element`` is located among ``paths`` with
    :func:`get_schema_fn` and loaded into a new, empty ``BaseDict``.

    Parameters
    ----------
    schema_element : str
        name of the schema element to load
    paths : list
        list of valid file paths to search

    Returns
    -------
    mt_metadata.schema_base.BaseDict
        dictionary that describes the standards for the element
    """
    schema = BaseDict()
    schema.from_json(get_schema_fn(schema_element, paths))
    return schema