Coverage for C: \ Users \ peaco \ OneDrive \ Documents \ GitHub \ mt_metadata \ mt_metadata \ utils \ summarize.py: 90%
156 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-10 00:11 -0800
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-10 00:11 -0800
1# -*- coding: utf-8 -*-
2"""
3Created on Tue Feb 23 11:52:35 2021
5:copyright:
6 Jared Peacock (jpeacock@usgs.gov)
8:license: MIT
10This module provides functionality to summarize metadata standards from both
11legacy BaseDict-based objects and modern Pydantic v2 MetadataBase objects.
13The main functions are:
14- summarize_timeseries_standards(): Legacy function for BaseDict objects
15- summarize_pydantic_standards(): New function for Pydantic v2 MetadataBase objects
16- extract_metadata_fields_from_pydantic(): Extract fields from individual Pydantic classes
17- summarize_standards(): Unified interface supporting both legacy and Pydantic systems
19Example usage:
20 # For Pydantic v2 objects (recommended)
21 >>> df = summarize_standards(module="timeseries")
23 # Extract fields from individual class
24 >>> from mt_metadata.timeseries import Survey
25 >>> fields = extract_metadata_fields_from_pydantic(Survey)
27 # Get BaseDict-compatible summary
28 >>> summary = summarize_pydantic_standards()
29"""
30from typing import get_args, get_origin, Union
32# =============================================================================
33# Imports
34# =============================================================================
35import numpy as np
36from numpy._typing._array_like import NDArray
37import pandas as pd
38from loguru import logger
40from mt_metadata import __version__
41from mt_metadata.base import BaseDict, MetadataBase
42from mt_metadata.utils.validators import validate_name
45# =============================================================================
# Record layout for one row of the standards summary table: one field per
# piece of information reported for a metadata attribute.  Every column is a
# fixed-width unicode string except ``required``, which is a boolean.
SUMMARIZE_DTYPE = np.dtype(
    [
        ("attribute", np.str_, 72),
        ("type", np.str_, 15),
        ("required", np.bool_),
        ("style", np.str_, 72),
        ("units", np.str_, 32),
        ("description", np.str_, 300),
        ("options", np.str_, 150),
        ("alias", np.str_, 72),
        ("example", np.str_, 72),
        ("default", np.str_, 72),
    ]
)
def extract_metadata_fields_from_pydantic(metadata_class):
    """
    Describe every field declared on a Pydantic v2 MetadataBase class in a
    layout compatible with BaseDict.

    Parameters
    ----------
    metadata_class : type
        A MetadataBase subclass (the class object itself, not an instance)

    Returns
    -------
    dict
        Maps each field name to a dictionary holding the keys ``type``,
        ``required``, ``style``, ``units``, ``description``, ``options``,
        ``alias``, ``example`` and ``default``

    Raises
    ------
    TypeError
        If ``metadata_class`` is not a class or is not derived from
        MetadataBase
    """
    # isinstance() is tested first so issubclass() never sees a non-class.
    if not isinstance(metadata_class, type) or not issubclass(
        metadata_class, MetadataBase
    ):
        raise TypeError(
            f"Object must be a MetadataBase class, got {type(metadata_class)}"
        )

    # Output key -> helper that derives the value from the field definition.
    extractors = (
        ("type", _get_field_type),
        ("required", _get_field_required),
        ("style", _get_field_style),
        ("units", _get_field_units),
        ("description", _get_field_description),
        ("options", _get_field_options),
        ("alias", _get_field_alias),
        ("example", _get_field_example),
        ("default", _get_field_default),
    )

    return {
        field_name: {key: extract(field_info) for key, extract in extractors}
        for field_name, field_info in metadata_class.model_fields.items()
    }
107def _get_field_type(field_info):
108 """Extract type information from Pydantic field"""
109 annotation = field_info.annotation
111 # Handle Union types (e.g., str | int)
112 origin = get_origin(annotation)
113 if origin is not None:
114 args = get_args(annotation)
115 if origin is Union or (
116 hasattr(origin, "__name__") and origin.__name__ == "UnionType"
117 ):
118 # For Union types, get the first non-None type
119 for arg in args:
120 if arg is not type(None):
121 annotation = arg
122 break
124 # Map Python types to string representations that BaseDict now accepts
125 type_mapping = {
126 str: "string",
127 int: "integer",
128 float: "float",
129 bool: "boolean",
130 list: "list", # Now supported by BaseDict
131 dict: "dict", # Now supported by BaseDict
132 }
134 if annotation in type_mapping:
135 return type_mapping[annotation]
136 elif hasattr(annotation, "__name__"):
137 # For custom classes, map to appropriate types that BaseDict accepts
138 name = annotation.__name__.lower()
139 if name.endswith("enum"):
140 return "string" # Enums are essentially strings with options
141 elif "list" in name or "array" in name:
142 return "list" # Use list type
143 elif "dict" in name:
144 return "dict" # Use dict type
145 elif (
146 "comment" in name
147 or "citation" in name
148 or any(x in name for x in ["person", "location", "fdsn"])
149 ):
150 return "object" # Complex objects as object type
151 else:
152 return "object" # Default to object for complex types
153 else:
154 return "string" # Default fallback
157def _get_field_required(field_info):
158 """Extract required status from Pydantic field"""
159 # Check json_schema_extra first
160 if hasattr(field_info, "json_schema_extra") and field_info.json_schema_extra:
161 if "required" in field_info.json_schema_extra:
162 return field_info.json_schema_extra["required"]
164 # Check if field has a default value - if no default, it's required
165 from pydantic_core import PydanticUndefined
167 return field_info.default is PydanticUndefined and (
168 field_info.default_factory is None
169 or field_info.default_factory is PydanticUndefined
170 )
173def _get_field_style(field_info):
174 """Extract style information from Pydantic field"""
175 # Check for pattern in field constraints
176 if hasattr(field_info, "pattern") and field_info.pattern:
177 return f"pattern: {field_info.pattern}"
179 # Check json_schema_extra for style
180 if hasattr(field_info, "json_schema_extra") and field_info.json_schema_extra:
181 if "style" in field_info.json_schema_extra:
182 return field_info.json_schema_extra["style"]
184 return "free form"
187def _get_field_units(field_info):
188 """Extract units information from Pydantic field"""
189 if hasattr(field_info, "json_schema_extra") and field_info.json_schema_extra:
190 if "units" in field_info.json_schema_extra:
191 return field_info.json_schema_extra["units"]
192 return None
195def _get_field_description(field_info):
196 """Extract description from Pydantic field"""
197 return getattr(field_info, "description", "No description available")
200def _get_field_options(field_info):
201 """Extract options/choices from Pydantic field"""
202 # Check for enum or choices in constraints
203 options = []
205 # Check for enum type
206 annotation = field_info.annotation
207 if hasattr(annotation, "__members__"): # Enum type
208 options = list(annotation.__members__.keys())
210 # Check json_schema_extra for options
211 if hasattr(field_info, "json_schema_extra") and field_info.json_schema_extra:
212 if "options" in field_info.json_schema_extra:
213 schema_options = field_info.json_schema_extra["options"]
214 if isinstance(schema_options, (list, tuple)):
215 options.extend(schema_options)
217 return options # Return empty list instead of None to avoid validation errors
220def _get_field_alias(field_info):
221 """Extract alias from Pydantic field"""
222 alias = getattr(field_info, "alias", None)
223 return alias if alias is not None else ""
226def _get_field_example(field_info):
227 """Extract example from Pydantic field"""
228 # First check the direct examples attribute (deprecated but still used)
229 examples = getattr(field_info, "examples", None)
231 # If not found, check json_schema_extra for examples
232 if (
233 examples is None
234 and hasattr(field_info, "json_schema_extra")
235 and field_info.json_schema_extra
236 ):
237 examples = field_info.json_schema_extra.get("examples", None)
239 if hasattr(examples, "__iter__") and not isinstance(examples, str):
240 if examples and len(examples) > 0:
241 return str(examples[0])
242 else:
243 return str(examples) if examples is not None else ""
246def _get_field_default(field_info):
247 """Extract default value from Pydantic field"""
248 from pydantic_core import PydanticUndefined
250 if field_info.default is not PydanticUndefined:
251 # Convert to string for storage
252 default_val = field_info.default
253 if isinstance(default_val, (str, int, float, bool)):
254 return str(default_val)
255 elif default_val is None:
256 return ""
257 else:
258 return str(default_val)
259 elif (
260 field_info.default_factory is not None
261 and field_info.default_factory is not PydanticUndefined
262 ):
263 try:
264 default_val = field_info.default_factory()
265 if isinstance(default_val, (str, int, float, bool)):
266 return str(default_val)
267 else:
268 return str(type(default_val).__name__)
269 except:
270 return ""
271 return ""
def collect_basemodel_objects(module: str) -> dict[type[MetadataBase], str]:
    """
    Gather the MetadataBase subclasses exposed by an ``mt_metadata``
    sub-module.

    Parameters
    ----------
    module : str
        Name of the sub-module relative to ``mt_metadata`` (e.g.
        'timeseries', which imports ``mt_metadata.timeseries``)

    Returns
    -------
    dict[type[MetadataBase], str]
        Each discovered class mapped to its name after ``validate_name``;
        MetadataBase itself is excluded
    """
    import importlib
    import inspect

    package = importlib.import_module(f"mt_metadata.{module}")

    return {
        cls: validate_name(cls_name)
        for cls_name, cls in inspect.getmembers(package, inspect.isclass)
        if cls is not MetadataBase and issubclass(cls, MetadataBase)
    }
def summarize_pydantic_standards(module: str = "timeseries") -> BaseDict:
    """
    Build a BaseDict summarizing the fields of every MetadataBase class found
    in an ``mt_metadata`` sub-module.

    Classes whose fields cannot be extracted or added are logged and skipped,
    so one failing class does not abort the whole summary.

    Parameters
    ----------
    module : str, optional
        The module to inspect, by default "timeseries"

    Returns
    -------
    BaseDict
        Field information of all classes that were processed successfully
    """
    discovered = collect_basemodel_objects(module)
    summary = BaseDict()

    for cls, class_name in discovered.items():
        try:
            summary.add_dict(extract_metadata_fields_from_pydantic(cls), class_name)
        except Exception as e:
            # Log the traceback and a readable warning, then continue with
            # the next class.
            logger.exception(e)
            logger.warning(f"Could not process {class_name} fields: {e}")

    return summary
def summary_to_array(summary_dict, dtype=SUMMARIZE_DTYPE) -> np.ndarray:
    """
    Convert a summarized dictionary of standards into a structured array.

    Row 0 is a header record carrying the metadata standards version; every
    following row describes one entry of ``summary_dict``.

    Parameters
    ----------
    summary_dict : dict
        Dictionary of summarized standards; each value must provide an item
        for every field of ``dtype`` after the first
    dtype : numpy.dtype, optional
        Record layout of the output, by default SUMMARIZE_DTYPE.  Must
        contain the fields "attribute", "description", "type" and "style",
        with the attribute field first

    Returns
    -------
    np.array
        numpy structured array with ``len(summary_dict) + 1`` rows
    """
    table = np.zeros(len(summary_dict.keys()) + 1, dtype=dtype)

    # Indexing one record of a structured array gives a writable view.
    header = table[0]
    header["attribute"] = "mt_metadata.standards.version"
    header["description"] = f"Metadata standards version {__version__}"
    header["type"] = "string"
    header["style"] = "free form"

    # The first dtype field is filled from the dictionary key, the remaining
    # ones from the per-attribute dictionary.
    columns = dtype.names[1:]
    for row_index, (attribute, v_dict) in enumerate(summary_dict.items(), start=1):
        row = table[row_index]
        row["attribute"] = attribute
        for column in columns:
            cell = v_dict[column]
            if isinstance(cell, list):
                # Lists are flattened to comma separated text ("" if empty).
                cell = ",".join(f"{item}" for item in cell)
            elif cell is None:
                cell = ""
            row[column] = cell

    return table
def summary_to_dataframe(summary_dict):
    """
    Build a pandas DataFrame from a summary dictionary.

    The dictionary is first converted with ``summary_to_array`` using its
    default dtype, then wrapped in a DataFrame.

    Parameters
    ----------
    summary_dict : dict
        Dictionary of summarized standards

    Returns
    -------
    pd.DataFrame
        DataFrame containing the summarized standards
    """
    return pd.DataFrame(summary_to_array(summary_dict))
def summarize_standards(
    module="timeseries",
    csv_fn=None,
    output_type="dataframe",
    dtype=SUMMARIZE_DTYPE,
) -> Union[pd.DataFrame, np.ndarray]:
    """
    Summarize the metadata standards of a module and optionally write them
    to a csv file.

    Parameters
    ----------
    module : str, optional
        Module to summarize, by default "timeseries"
    csv_fn : str or Path, optional
        Full path to write a csv file, by default None (nothing is written).
        The file is written for every ``output_type``.
    output_type : str, optional
        "array" returns a numpy structured array; any other value returns a
        pandas DataFrame, by default "dataframe"
    dtype : numpy.dtype, optional
        Record layout of the summary, by default SUMMARIZE_DTYPE.  It is
        applied to both the array and the DataFrame output.

    Returns
    -------
    numpy.ndarray | pd.DataFrame
        If output_type is "array", returns a numpy structured array.
        Otherwise returns a pandas DataFrame.
    """
    summary_dict = summarize_pydantic_standards(module)
    entries = summary_to_array(summary_dict, dtype=dtype)

    want_array = output_type == "array"

    # The DataFrame is needed for the csv file and for every non-array
    # return, so it is built only in those cases.
    summary_df = None
    if csv_fn or not want_array:
        summary_df = pd.DataFrame(entries)

    if csv_fn:
        summary_df.to_csv(csv_fn, index=False)

    # Always return a value: an unrecognised output_type falls back to the
    # DataFrame instead of returning None.
    return entries if want_array else summary_df