Coverage for C:\Users\peaco\OneDrive\Documents\GitHub\mt_metadata\mt_metadata\base\helpers.py: 70%
451 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-10 00:11 -0800
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-10 00:11 -0800
1# -*- coding: utf-8 -*-
2"""
3Created on Wed Dec 23 20:37:52 2020
5:copyright:
6 Jared Peacock (jpeacock@usgs.gov)
8:license: MIT
10"""
11import hashlib
12import json
13import logging
14import os
16# =============================================================================
17# Imports
18# =============================================================================
19import textwrap
20from collections import defaultdict, OrderedDict
21from collections.abc import MutableMapping
22from operator import itemgetter
23from pathlib import Path
24from threading import RLock
25from typing import Any, Dict
26from xml.dom import minidom
27from xml.etree import cElementTree as et
29import numpy as np
30from loguru import logger
31from pydantic import BaseModel
32from pydantic.fields import FieldInfo
33from pydantic_core import PydanticUndefined
# Human-readable description for each filter type, keyed by filter type name.
filter_descriptions = {
    "zpk": "poles and zeros filter",
    "coefficient": "coefficient filter",
    "time delay": "time delay filter",
    "fir": "finite impulse response filter",
    "fap": "frequency amplitude phase lookup table",
    "frequency response table": "frequency amplitude phase lookup table",
    "base": "base filter",
}
45# =============================================================================
46# write doc strings
47# =============================================================================
def wrap_description(description, column_width):
    """
    Wrap a description into lines no wider than ``column_width``.

    Parameters
    ----------
    description : str, list, or any
        Text to wrap.  A list is joined with single spaces (each entry
        converted with ``str``) and stripped; any other non-string value is
        converted with ``str`` and stripped.  A string is wrapped as given.
    column_width : int
        Maximum width of each wrapped line.

    Returns
    -------
    list of str
        The wrapped lines, padded with empty strings so that the list always
        holds at least 11 entries.
    """
    minimum_rows = 11

    if isinstance(description, list):
        text = " ".join(str(entry) for entry in description).strip()
    elif isinstance(description, str):
        text = description
    else:
        text = str(description).strip()

    wrapped = textwrap.wrap(text, column_width)

    # pad so callers can index the first 11 rows without bounds checks
    shortfall = minimum_rows - len(wrapped)
    if shortfall > 0:
        wrapped.extend([""] * shortfall)
    return wrapped
def validate_c1(attr_dict, c1):
    """
    Widen column 1 when the longest key would not fit.

    Parameters
    ----------
    attr_dict : dict
        Mapping whose keys are written in column 1.
    c1 : int
        Requested width of column 1.

    Returns
    -------
    int
        ``c1`` unchanged when every key is at most ``c1 - 4`` characters long
        (or when ``attr_dict`` has no keys); otherwise the longest key length
        plus 6.
    """
    widest_key = max((len(name) for name in attr_dict.keys()), default=None)

    if widest_key is not None and widest_key > (c1 - 4):
        return widest_key + 6
    return c1
def write_lines(field_dict, c1=45, c2=45, c3=15):
    """
    Render a dictionary of fields as a single grid table.

    Each field contributes one table cell group: the left column lists the
    key, required flag, units, type, style and default value; the middle
    column holds the wrapped description and the right column the wrapped
    examples.  The three columns are laid out row by row, so every
    description and example line appears exactly once regardless of length.

    Parameters
    ----------
    field_dict : dict
        Dictionary mapping field names to FieldInfo objects.  Values that are
        ``logging.Logger`` instances are skipped.
    c1 : int, optional
        Column 1 width, by default 45.  Widened by ``validate_c1`` if a key
        is too long.
    c2 : int, optional
        Column 2 width, by default 45.
    c3 : int, optional
        Column 3 width, by default 15.

    Returns
    -------
    str
        The table as a single newline-joined string.
    """
    from itertools import zip_longest

    c1 = validate_c1(field_dict, c1)

    line = " | {0:<{1}}| {2:<{3}} | {4:<{5}}|"
    hline = " +{0}+{1}+{2}+".format(
        "-" * (c1 + 1), "-" * (c2 + 2), "-" * (c3 + 1)
    )
    mline = " +{0}+{1}+{2}+".format(
        "=" * (c1 + 1), "=" * (c2 + 2), "=" * (c3 + 1)
    )

    lines = [
        hline,
        line.format("**Metadata Key**", c1, "**Description**", c2, "**Example**", c3),
        mline,
    ]

    for key, field_info in field_dict.items():
        if isinstance(field_info, logging.Logger):
            continue

        # json_schema_extra is only usable when it is a non-empty dict
        extra = field_info.json_schema_extra
        if not (extra and isinstance(extra, dict)):
            extra = {}

        d_lines = wrap_description(field_info.description or "", c2)
        e_lines = wrap_description(extra.get("examples", ""), c3)
        required = str(extra.get("required", False))
        units = str(extra.get("units", ""))
        style = extra.get("style", "free form")
        field_type = str(field_info.annotation) if field_info.annotation else "string"

        # Resolve the default value, falling back to the default factory.
        default_value = field_info.default
        if default_value is PydanticUndefined:
            factory = field_info.default_factory
            if factory is not None and factory is not PydanticUndefined:
                try:
                    if callable(factory):
                        try:
                            default_value = factory()
                        except TypeError:
                            # factory needs arguments, so it cannot be called
                            default_value = f"<{factory.__name__}>"
                    else:
                        default_value = factory
                    # complex objects are shown by type name only
                    if not isinstance(
                        default_value, (str, int, float, bool, type(None))
                    ):
                        default_value = type(default_value).__name__
                except Exception:
                    # best effort: documentation must still be generated
                    default_value = None
            else:
                default_value = None

        # A long default starts on the row after the "**Default**:" label;
        # the padding added by wrap_description is dropped so that no blank
        # rows are appended to the table.
        default_text = str(default_value)
        if len(default_text) > c1 - 15:
            default = [""] + [
                row for row in wrap_description(default_text, c1) if row
            ]
        else:
            default = [default_text]
        # always keep the two continuation rows under the default label
        default += [""] * max(0, 3 - len(default))

        left_column = [
            f"**{key}**",
            "",
            f"Required: {required}",
            "",
            f"Units: {units}",
            "",
            f"Type: {field_type}",
            "",
            f"Style: {style}",
            "",
            f"**Default**: {default[0]}",
            *default[1:],
        ]

        # Pair the columns row by row; whichever column is longest sets the
        # number of rows and the shorter ones are filled with blanks.
        for left_cell, d_cell, e_cell in zip_longest(
            left_column, d_lines, e_lines, fillvalue=""
        ):
            lines.append(line.format(left_cell, c1, d_cell, c2, e_cell, c3))

        lines.append(hline)
    return "\n".join(lines)
def write_block(key, field_info: FieldInfo, c1=45, c2=45, c3=15):
    """
    Render one field as a titled reStructuredText table block.

    The left column lists the required flag, units, type (the first six
    wrapped lines of the annotation) and default value; the middle column
    holds the wrapped description and the right column the wrapped examples.
    The three columns are laid out row by row, so every description and
    example line appears exactly once and no line of a long default value is
    dropped.

    Parameters
    ----------
    key : str
        Name of the field; used for the section title and table header.
    field_info : FieldInfo
        Field information providing ``annotation``, ``description``,
        ``json_schema_extra``, ``default`` and ``default_factory``.
    c1 : int, optional
        Column 1 width, by default 45.  Widened to ``len(key) + 6`` when the
        key is longer than ``c1 - 4``.
    c2 : int, optional
        Column 2 width, by default 45.
    c3 : int, optional
        Column 3 width, by default 15.

    Returns
    -------
    list of str
        Lines of the block, ending with the closing rule and an empty line.
    """
    from itertools import zip_longest

    if len(key) > c1 - 4:
        c1 = len(key) + 6

    line = " | {0:<{1}}| {2:<{3}} | {4:<{5}}|"
    hline = " +{0}+{1}+{2}+".format(
        "-" * (c1 + 1), "-" * (c2 + 2), "-" * (c3 + 1)
    )
    mline = " +{0}+{1}+{2}+".format(
        "=" * (c1 + 1), "=" * (c2 + 2), "=" * (c3 + 1)
    )
    section = f":navy:`{key}`"

    lines = [
        section,
        "~" * len(section),
        "",
        ".. container::",
        "",
        " .. table::",
        " :class: tight-table",
        f" :widths: {c1} {c2} {c3}",
        "",
        hline,
        line.format(f"**{key}**", c1, "**Description**", c2, "**Example**", c3),
        mline,
    ]

    t_lines = wrap_description(field_info.annotation, c1 - 10)
    d_lines = wrap_description(field_info.description, c2)

    # json_schema_extra is only usable when it is a non-empty dict
    extra = field_info.json_schema_extra
    if not (extra and isinstance(extra, dict)):
        extra = {}
    e_lines = wrap_description(extra.get("examples", ""), c3)
    required = str(extra.get("required", False))
    units = str(extra.get("units", ""))

    # Resolve the default value, falling back to the default factory.
    default_value = field_info.default
    if default_value is PydanticUndefined:
        factory = field_info.default_factory
        if factory is not None and factory is not PydanticUndefined:
            try:
                if callable(factory):
                    try:
                        default_value = factory()
                    except TypeError:
                        # factory needs arguments, so it cannot be called
                        default_value = f"<{factory.__name__}>"
                else:
                    default_value = factory
                # complex objects are shown by type name only
                if not isinstance(default_value, (str, int, float, bool, type(None))):
                    default_value = type(default_value).__name__
            except Exception:
                # best effort: documentation must still be generated
                default_value = "None"
        else:
            default_value = "None"

    default_text = str(default_value)
    if default_text == "":
        default_text = '""'  # show an empty string explicitly

    # A long default starts on the row after the "**Default**:" label; the
    # padding added by wrap_description is dropped so only real text remains.
    if len(default_text) > c1 - 15:
        default = [""] + [row for row in wrap_description(default_text, c1) if row]
    else:
        default = [default_text]
    # always keep the two continuation rows under the default label
    default += [""] * max(0, 3 - len(default))

    left_column = [
        f"**Required**: {required}",
        "",
        f"**Units**: {units}",
        "",
        f"**Type**: {t_lines[0]}",
        *t_lines[1:6],  # wrap_description guarantees at least 11 entries
        "",
        f"**Default**: {default[0]}",
        *default[1:],
    ]

    # Pair the columns row by row; whichever column is longest sets the number
    # of rows and the shorter ones are filled with blanks.
    for left_cell, d_cell, e_cell in zip_longest(
        left_column, d_lines, e_lines, fillvalue=""
    ):
        lines.append(line.format(left_cell, c1, d_cell, c2, e_cell, c3))

    lines.append(hline)
    lines.append("")

    return lines
# Convert a nested dictionary into a flattened dictionary.
# The default separator is '.'
def flatten_dict(meta_dict, parent_key=None, sep="."):
    """
    Flatten a nested mapping into a single-level dictionary.

    Parameters
    ----------
    meta_dict : mapping
        Possibly nested mapping to flatten.
    parent_key : str, optional
        Prefix prepended (followed by ``sep``) to every key, by default None.
    sep : str, optional
        Separator placed between nesting levels, by default ".".

    Returns
    -------
    dict
        Dictionary whose keys are the joined key paths.  Values that are
        ``MutableMapping`` instances are expanded recursively, so an empty
        nested mapping contributes no entries.
    """
    flat = {}
    for key, value in meta_dict.items():
        full_key = f"{parent_key}{sep}{key}" if parent_key else key
        if isinstance(value, MutableMapping):
            flat.update(flatten_dict(value, full_key, sep=sep))
        else:
            flat[full_key] = value
    return flat
def flatten_list(x_list):
    """
    Flatten one level of nesting from an iterable of iterables.

    Parameters
    ----------
    x_list : iterable of iterables
        The nested sequence to flatten.

    Returns
    -------
    list
        A new list holding the items of every sub-iterable, in order.
    """
    flattened = []
    for group in x_list:
        flattened.extend(group)
    return flattened
def recursive_split_dict(key, value, remainder, sep="."):
    """
    Insert ``value`` into the nested dictionary ``remainder`` along ``key``.

    The key is split on ``sep`` one level at a time; intermediate
    dictionaries are created as needed and the last component is assigned
    ``value``.  ``remainder`` is modified in place.

    Parameters
    ----------
    key : str
        Key path, e.g. ``"a.b.c"``.
    value : any
        Value stored at the end of the path.
    remainder : dict
        Dictionary that receives the nested entry.
    sep : str, optional
        Separator between path components, by default ".".

    Returns
    -------
    None
    """
    head, *tail = key.split(sep, 1)
    if tail:
        # forward sep so that deeper levels are split on the same separator
        recursive_split_dict(tail[0], value, remainder.setdefault(head, {}), sep=sep)
    else:
        remainder[head] = value
def get_by_alias(model, alias_name):
    """
    Return the value of the field whose alias equals ``alias_name``.

    Fields are looked up through ``model.__pydantic_fields__`` (rather than
    the deprecated ``model_fields``); the first field whose ``alias`` matches
    is read from ``model`` with ``getattr``.

    Parameters
    ----------
    model : object
        Object exposing ``__pydantic_fields__``.
    alias_name : str
        Alias to search for.

    Returns
    -------
    any
        The attribute value of the matching field, or None if no field has
        that alias.
    """
    matching_names = (
        name
        for name, info in model.__pydantic_fields__.items()
        if info.alias == alias_name
    )
    for field_name in matching_names:
        return getattr(model, field_name)
    return None
622# def get_alias_key(model, key: str) -> str:
623# """
624# Try to find an alias for a field name in a Pydantic BaseModel
626# Parameters
627# ----------
628# model : BaseModel
629# The Pydantic model to search for the field
630# key : str
631# The field name to find the alias for
633# Returns
634# -------
635# str or None
636# The alias name if found, None otherwise
637# """
638# try:
639# field_info = model.__pydantic_fields__.get(key)
640# if field_info.validation_alias:
642# if field_info and field_info.alias:
643# return field_info.alias
644# return key # Return the original key if no alias found
645# except (AttributeError, KeyError):
646# return key # Return the original key if any errors occur
def recursive_split_getattr(base_object, name, sep="."):
    """
    Get a possibly nested attribute and report whether it is a property.

    ``name`` is split on ``sep`` one level at a time.  Intermediate
    attributes must exist (an ``AttributeError`` propagates otherwise); a
    missing final attribute yields ``None``.

    Parameters
    ----------
    base_object : object
        Object to read from.
    name : str
        Attribute path, e.g. ``"location.latitude"``.
    sep : str, optional
        Separator between path components, by default ".".

    Returns
    -------
    tuple
        ``(value, prop)`` where ``value`` is the attribute value (or None if
        the final attribute is missing) and ``prop`` is True when the final
        attribute is defined as a ``property`` on the owner's class.
    """
    key, *other = name.split(sep, 1)

    if other:
        # forward sep so that deeper levels are split on the same separator
        return recursive_split_getattr(getattr(base_object, key), other[0], sep=sep)

    # An AttributeError is expected when the attribute does not exist; the
    # lookup does not resolve field aliases.
    try:
        value = getattr(base_object, key)
    except AttributeError:
        value = None

    prop = isinstance(getattr(type(base_object), key, None), property)
    return value, prop
def recursive_split_setattr(base_object, name, value, sep=".", skip_validation=False):
    """
    Set a possibly nested attribute, splitting ``name`` on ``sep``.

    When ``value`` is a non-empty list whose first entry is a dictionary,
    each dictionary is converted to an object: the class is looked up in
    ``base_object._objects_included`` under the dictionary's first key,
    instantiated without arguments and filled with ``from_dict``.

    Parameters
    ----------
    base_object : object
        The object having its attribute set, or a parent object in the
        nested case.
    name : str
        Attribute path, e.g. ``"location.latitude"``.
    value : any
        The value to assign.
    sep : str, optional
        Separator between path components, by default ".".
    skip_validation : bool, optional
        Forwarded to nested calls; this function itself does not consult it.
        By default False.

    Returns
    -------
    None

    Raises
    ------
    KeyError
        If a dictionary's first key is not in
        ``base_object._objects_included``.
    """
    key, *other = name.split(sep, 1)

    if other:
        # forward sep and skip_validation so nested levels behave the same
        recursive_split_setattr(
            getattr(base_object, key),
            other[0],
            value,
            sep=sep,
            skip_validation=skip_validation,
        )
        return

    if isinstance(value, list):
        if not value:
            # fresh list so the attribute never aliases the caller's list
            value = []
        elif isinstance(value[0], dict):
            new_list = []
            for obj_dict in value:
                obj_key = list(obj_dict.keys())[0]
                # only the class lookup is guarded, so a KeyError raised
                # inside from_dict is not mislabelled as a missing class
                try:
                    obj_class = base_object._objects_included[obj_key]
                except KeyError as error:
                    raise KeyError(
                        f"Could not find {obj_key} in {base_object._objects_included}"
                    ) from error
                obj = obj_class()
                obj.from_dict(obj_dict)
                new_list.append(obj)
            value = new_list

    setattr(base_object, key, value)
def structure_dict(meta_dict, sep="."):
    """
    Build a nested dictionary from a flat one with separated keys.

    Parameters
    ----------
    meta_dict : dict
        Flat dictionary whose keys are paths joined by ``sep``.
    sep : str, optional
        Separator between path components, by default ".".

    Returns
    -------
    dict
        New nested dictionary filled by calling ``recursive_split_dict`` for
        every entry of ``meta_dict``.
    """
    nested = {}
    for flat_key, flat_value in meta_dict.items():
        recursive_split_dict(flat_key, flat_value, nested, sep=sep)
    return nested
def get_units(name, attr_dict):
    """
    Read the units entry from an attribute dictionary.

    Parameters
    ----------
    name : str
        Attribute name; accepted for interface compatibility but not used.
    attr_dict : dict
        Dictionary expected to hold ``attr_dict["json_schema_extra"]["units"]``.

    Returns
    -------
    str or None
        The units as a string, or None when the entry is missing, when
        ``json_schema_extra`` cannot be indexed (for example it is None), or
        when the units are None, "None" or "none".
    """
    try:
        units = attr_dict["json_schema_extra"]["units"]
    except (KeyError, TypeError):
        # missing entry, or json_schema_extra is None / not a mapping
        return None

    if not isinstance(units, str):
        units = "{0}".format(units)
    if units in [None, "None", "none"]:
        return None
    return units
def get_type(name, attr_dict):
    """
    Read the declared type of an attribute, ignoring string types.

    Parameters
    ----------
    name : str
        Attribute name used as the key into ``attr_dict``.
    attr_dict : dict
        Dictionary holding ``attr_dict[name]["type"]``.

    Returns
    -------
    any or None
        The declared type, or None when the entry is missing or the type is
        one of ``"string"``, ``str``, ``"str"`` or ``"String"``.
    """
    try:
        declared = attr_dict[name]["type"]
    except KeyError:
        return None

    if declared in ("string", str, "str", "String"):
        return None
    return declared
def recursive_split_xml(element, item, base, name, attr_dict=None):
    """
    Fill an XML element from a nested Python value.

    Dictionaries become child elements named after their keys, tuples and
    lists become ``item`` child elements, strings and numbers become the
    element text, and None leaves the element without text.  Any other value
    is written with ``str`` if its type name contains "reference" (for
    example an hdf5 reference) and rejected otherwise.

    Parameters
    ----------
    element : xml element
        Element to fill in place.
    item : any
        Value to write into ``element``.
    base : str
        Dotted attribute path of ``element``; extended with each dictionary
        key and passed to ``get_units``.
    name : str
        Name returned alongside the element.
    attr_dict : dict, optional
        When truthy, units found by ``get_units`` are set as the ``units``
        XML attribute.

    Returns
    -------
    tuple
        ``(element, name)``.

    Raises
    ------
    ValueError
        If ``item`` has an unsupported type.
    """
    if isinstance(item, dict):
        for child_tag, child_value in item.items():
            child_path = ".".join([base, child_tag])
            child = et.SubElement(element, child_tag)
            recursive_split_xml(child, child_value, child_path, child_tag, attr_dict)
    elif isinstance(item, (tuple, list)):
        for entry in item:
            recursive_split_xml(
                et.SubElement(element, "item"), entry, base, name, attr_dict
            )
    elif item is None:
        # leave element.text as None so the XML has an empty element
        pass
    elif isinstance(item, str):
        element.text = item
    elif isinstance(item, (float, int)) or "reference" in str(type(item)).lower():
        # numbers and hdf5-style references are stored as their string form
        element.text = str(item)
    else:
        raise ValueError("Value cannot be {0}".format(type(item)))

    if attr_dict:
        units = get_units(base, attr_dict)
        if units:
            element.set("units", str(units))
    return element, name
def dict_to_xml(meta_dict, attr_dict=None):
    """
    Convert a dictionary structured as ``{class: {attribute_dict}}`` to XML.

    Parameters
    ----------
    meta_dict : dict
        Dictionary whose first key names the root element and whose value
        maps child element names to their contents.
    attr_dict : dict, optional
        Passed through to ``recursive_split_xml`` for each child.

    Returns
    -------
    xml element
        Root element named after the first key of ``meta_dict``.
    """
    root_tag = list(meta_dict.keys())[0]
    root = et.Element(root_tag)

    for child_tag, child_value in meta_dict[root_tag].items():
        child = et.SubElement(root, child_tag)
        recursive_split_xml(child, child_value, child_tag, child_tag, attr_dict)
    return root
def element_to_dict(element):
    """
    Convert an XML element, with its children, to a dictionary.

    Children are converted recursively and grouped by tag; a tag appearing
    once maps to its value and a repeated tag maps to a list.  If the
    children include ``item`` elements, the element's value becomes the
    ``item`` value itself.  Attributes other than ``units`` and ``type`` are
    copied into the element's dictionary.  The element's ``attrib`` may be
    modified: ``units`` and ``type`` are removed under the conditions coded
    below.

    .. todo:: Add a way to read in attributes like units and validate them.

    Parameters
    ----------
    element : xml element
        Element to convert.

    Returns
    -------
    OrderedDict
        Single-entry mapping from the element tag to its converted content.
    """
    tag = element.tag
    result = {tag: {} if element.attrib else None}

    child_elements = list(element)
    if child_elements:
        grouped = defaultdict(list)
        for child in child_elements:
            for child_tag, child_value in element_to_dict(child).items():
                grouped[child_tag].append(child_value)

        collapsed = {}
        for child_tag, values in grouped.items():
            collapsed[child_tag] = values[0] if len(values) == 1 else values
        result = {tag: collapsed}

        if "item" in collapsed:
            result[tag] = collapsed["item"]

    # Attributes are mostly skipped for now; later they can be checked
    # against standards.  Units and type are never copied into the result.
    if element.attrib:
        drop_units = False
        drop_type = False
        attribute_count = len(element.attrib.keys())
        for attr_name, attr_value in element.attrib.items():
            if attr_name == "units":
                if "type" in element.attrib.keys():
                    drop_type = True
                if attribute_count <= 2:
                    drop_units = True
                continue
            if attr_name == "type":
                if attribute_count <= 1 and attr_value in (
                    "float",
                    "string",
                    "integer",
                    "boolean",
                    "list",
                    "tuple",
                ):
                    drop_type = True
                continue

            result[tag][attr_name] = attr_value

        if drop_units:
            element.attrib.pop("units")
        if drop_type:
            element.attrib.pop("type")

    raw_text = element.text
    if raw_text:
        stripped = raw_text.strip()
        if not (child_elements or element.attrib):
            result[tag] = stripped
        elif stripped:
            # attributes remaining after the pops above keep the text under
            # a "value" key; otherwise the text replaces the content
            if len(element.attrib.keys()) > 0:
                result[tag]["value"] = stripped
            else:
                result[tag] = stripped

    return OrderedDict(sorted(result.items(), key=itemgetter(0)))
def element_to_string(element):
    """
    Serialize an XML element to a pretty-printed, UTF-8 declared string.

    Parameters
    ----------
    element : xml element
        Element to serialize.

    Returns
    -------
    str
        The pretty-printed XML document, including the XML declaration.
    """
    raw_xml = et.tostring(element).decode()
    document = minidom.parseString(raw_xml)
    pretty_bytes = document.toprettyxml(
        indent=" ",
        encoding="UTF-8",
    )
    return pretty_bytes.decode()
891# =============================================================================
892# Helper function to be sure everything is encoded properly
893# =============================================================================
class NumpyEncoder(json.JSONEncoder):
    """
    JSON encoder that understands numpy scalars and arrays.

    Also converts ``pathlib.Path`` objects, objects exposing
    ``unicode_string()`` and objects whose type name contains "h5" to
    strings.
    """

    def default(self, obj):
        """
        Convert ``obj`` to something the standard JSON encoder can handle.

        Parameters
        ----------
        obj : any
            Object the standard encoder could not serialize.

        Returns
        -------
        int, float, list, dict, or str
            ``int`` for numpy integers, ``float`` for numpy floats, a
            ``{"real": ..., "imag": ...}`` dictionary for complex arrays of
            any precision, a list for other arrays, and a string for h5
            objects, ``unicode_string`` objects and paths.

        Raises
        ------
        TypeError
            Raised by the base encoder for any other type.
        """
        if isinstance(
            obj,
            (
                np.int_,
                np.intc,
                np.intp,
                np.int8,
                np.int16,
                np.int32,
                np.int64,
                np.uint8,
                np.uint16,
                np.uint32,
                np.uint64,
            ),
        ):
            return int(obj)
        if isinstance(obj, (np.float16, np.float32, np.float64)):
            return float(obj)
        if isinstance(obj, np.ndarray):
            # iscomplexobj covers every complex dtype (complex64 as well as
            # complex128); Python complex numbers are not JSON serializable
            if np.iscomplexobj(obj):
                return {"real": obj.real.tolist(), "imag": obj.imag.tolist()}
            return obj.tolist()
        # For now turn references into a generic string
        if "h5" in str(type(obj)):
            return str(obj)
        if hasattr(obj, "unicode_string"):
            return obj.unicode_string()
        if isinstance(obj, Path):
            return str(obj)
        return json.JSONEncoder.default(self, obj)
def validate_name(name, pattern=None):
    """
    Return a name with spaces replaced by underscores.

    Parameters
    ----------
    name : str or None
        Name to validate.
    pattern : any, optional
        Accepted for interface compatibility; not used by this function.

    Returns
    -------
    str
        ``"unknown"`` when ``name`` is None, otherwise ``name`` with every
        space replaced by an underscore.
    """
    return "unknown" if name is None else name.replace(" ", "_")
def has_numbers(text):
    """
    Tell whether a string contains at least one digit character.

    Parameters
    ----------
    text : str
        The string to inspect.  Non-string input yields False.

    Returns
    -------
    bool
        True if any character satisfies ``str.isdigit``, False otherwise.

    Examples
    --------
    >>> has_numbers("abc123")
    True
    >>> has_numbers("hello")
    False
    >>> has_numbers("test1")
    True
    >>> has_numbers("")
    False
    """
    if isinstance(text, str):
        for character in text:
            if character.isdigit():
                return True
    return False
def is_numeric_string(text):
    """
    Tell whether a string can be converted to a number with ``float``.

    Parameters
    ----------
    text : str
        The string to inspect.  Non-string input and blank strings yield
        False.

    Returns
    -------
    bool
        True if ``float(text)`` succeeds, False otherwise.

    Examples
    --------
    >>> is_numeric_string("123")
    True
    >>> is_numeric_string("12.34")
    True
    >>> is_numeric_string("-45.6")
    True
    >>> is_numeric_string("1.23e-4")
    True
    >>> is_numeric_string("abc")
    False
    >>> is_numeric_string("12abc")
    False
    """
    # reject non-strings and empty / whitespace-only strings up front
    if not isinstance(text, str) or not text.strip():
        return False

    try:
        float(text)
    except ValueError:
        return False
    return True
def extract_numbers(text):
    """
    Pull every number out of a string.

    Parameters
    ----------
    text : str
        The string to search.  Non-string input yields an empty list.

    Returns
    -------
    list
        The numbers found, as floats, in order of appearance.  Integers,
        decimals, optional signs and scientific notation are recognized.

    Examples
    --------
    >>> extract_numbers("abc123def45.6")
    [123.0, 45.6]
    >>> extract_numbers("no numbers here")
    []
    >>> extract_numbers("1.5 and -2.3e4")
    [1.5, -23000.0]
    """
    import re

    found = []
    if not isinstance(text, str):
        return found

    # integers, floats and scientific notation, each with an optional sign
    number_pattern = r"[-+]?(?:\d*\.?\d+(?:[eE][-+]?\d+)?)"

    for hit in re.finditer(number_pattern, text):
        try:
            number = float(hit.group(0))
        except ValueError:
            continue
        found.append(number)

    return found
def requires(**requirements):
    """Decorate a function with optional dependencies.

    Parameters
    ----------
    **requirements : obj
        Keywords of package name and the required object for a function.
        A requirement counts as missing when its object is falsy (for
        example None).

    Returns
    -------
    decorated_function : function
        Decorator returning the original function if all soft dependencies
        are met; otherwise it returns a stand-in with the same name and
        docstring that logs why the function is not running and returns
        None.

    Examples
    --------
    ```
    try:
        import obspy
    except ImportError:
        obspy = None

    @requires(obspy=obspy)
    def obspy_function():
        ...
        # does something using obspy
    ```
    """
    import functools

    # Names of the requirements whose object is missing.
    missing = [name for name, dependency in requirements.items() if not dependency]

    def decorated_function(function):
        """Wrap function."""
        if not missing:
            return function

        # wraps keeps the original name/docstring on the stand-in so that
        # introspection and generated documentation stay meaningful
        @functools.wraps(function)
        def passer(*args, **kwargs):
            logger.warning(f"Missing dependencies: {missing}.")
            logger.warning(f"Not running `{function.__name__}`.")

        return passer

    return decorated_function
def object_to_array(value, dtype=float):
    """
    Convert a value to a numpy array.

    Parameters
    ----------
    value : None, list, tuple, np.ndarray, str, int, float, or bytes
        The value to convert.  None and the strings "", "none" and "None"
        give an empty array.  A string that is not a single number has its
        numbers extracted with ``extract_numbers``.
    dtype : type, optional
        Data type of the returned array, by default float.

    Returns
    -------
    np.ndarray
        The converted numpy array.

    Raises
    ------
    TypeError
        If a string holds no numbers, if a string or bytes value cannot be
        parsed, or if ``value`` has an unsupported type.
    """
    if value is None:
        return np.empty(0)

    if isinstance(value, (list, tuple)):
        return np.array(value, dtype=dtype)

    if isinstance(value, np.ndarray):
        return value.astype(dtype)

    if isinstance(value, str):
        if value in ["", "none", "None"]:
            return np.empty(0)

        contains_digits = has_numbers(value)
        if not is_numeric_string(value):
            if not contains_digits:
                msg = f"String input must be a single number or a list of numbers, not '{value}'"
                raise TypeError(msg)
            # several numbers (or numbers mixed with text) in one string
            return np.array(extract_numbers(value), dtype=dtype)

        if "j" in value and contains_digits:
            dtype = complex

        # use a comma when present, otherwise split on whitespace
        separator = "," if "," in value else " "

        try:
            return np.fromstring(value, sep=separator, dtype=dtype)
        except ValueError:
            msg = (
                f"input values must be a list, tuple, or np.ndarray, not {type(value)}"
            )
            raise TypeError(msg)

    if isinstance(value, (int, float)):
        # single numeric input
        return np.array([float(value)], dtype=dtype)

    if isinstance(value, bytes):
        # bytes input (e.g., from binary files)
        try:
            return np.frombuffer(value, dtype=dtype)
        except ValueError:
            msg = (
                f"input values must be a list, tuple, or np.ndarray, not {type(value)}"
            )
            raise TypeError(msg)

    msg = f"input values must be an list, tuple, or np.ndarray, not {type(value)}"
    raise TypeError(msg)
1189def _should_include_coordinate_field(field_name: str) -> bool:
1190 """
1191 Helper function to determine if a coordinate field should be included
1192 in to_dict output even when it has None/default values.
1194 This ensures backward compatibility for coordinate fields that tests expect.
1195 """
1196 coordinate_fields = {
1197 "negative.x",
1198 "negative.y",
1199 "negative.z",
1200 "positive.x2",
1201 "positive.y2",
1202 "positive.z2",
1203 "location.x",
1204 "location.y",
1205 "location.z",
1206 }
1207 return field_name in coordinate_fields
1210def _should_convert_none_to_empty_string(field_name: str) -> bool:
1211 """
1212 Helper function to determine if a field should convert None to empty string
1213 for backward compatibility.
1214 """
1215 string_fields = {
1216 "data_logger.firmware.author",
1217 "provenance.software.author",
1218 "provenance.software.version",
1219 }
1220 # Convert external URL None -> "" for backward compatibility in to_dict
1221 string_fields.add("url")
1222 return field_name in string_fields