Coverage for C:\Users\peaco\OneDrive\Documents\GitHub\mt_metadata\mt_metadata\utils\summarize.py: 90%

156 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-10 00:11 -0800

1# -*- coding: utf-8 -*- 

2""" 

3Created on Tue Feb 23 11:52:35 2021 

4 

5:copyright: 

6 Jared Peacock (jpeacock@usgs.gov) 

7 

8:license: MIT 

9 

10This module provides functionality to summarize metadata standards from both 

11legacy BaseDict-based objects and modern Pydantic v2 MetadataBase objects. 

12 

13The main functions are: 

14- summarize_timeseries_standards(): Legacy function for BaseDict objects 

15- summarize_pydantic_standards(): New function for Pydantic v2 MetadataBase objects 

16- extract_metadata_fields_from_pydantic(): Extract fields from individual Pydantic classes 

17- summarize_standards(): Unified interface supporting both legacy and Pydantic systems 

18 

19Example usage: 

20 # For Pydantic v2 objects (recommended) 

21 >>> df = summarize_standards(module="timeseries") 

22 

23 # Extract fields from individual class 

24 >>> from mt_metadata.timeseries import Survey 

25 >>> fields = extract_metadata_fields_from_pydantic(Survey) 

26 

27 # Get BaseDict-compatible summary 

28 >>> summary = summarize_pydantic_standards() 

29""" 

30from typing import get_args, get_origin, Union 

31 

32# ============================================================================= 

33# Imports 

34# ============================================================================= 

35import numpy as np 

36from numpy._typing._array_like import NDArray 

37import pandas as pd 

38from loguru import logger 

39 

40from mt_metadata import __version__ 

41from mt_metadata.base import BaseDict, MetadataBase 

42from mt_metadata.utils.validators import validate_name 

43 

44 

45# ============================================================================= 

46 

# Structured dtype describing one row of the standards summary table.  The
# first field is the attribute name; the remaining fields hold the properties
# recorded for that attribute.  Text fields are fixed-width unicode.
SUMMARIZE_DTYPE = np.dtype(
    {
        "names": [
            "attribute",
            "type",
            "required",
            "style",
            "units",
            "description",
            "options",
            "alias",
            "example",
            "default",
        ],
        "formats": [
            "U72",
            "U15",
            np.bool_,
            "U72",
            "U32",
            "U300",
            "U150",
            "U72",
            "U72",
            "U72",
        ],
    }
)

61 

62 

def extract_metadata_fields_from_pydantic(metadata_class):
    """
    Build a BaseDict-style description of every field of a MetadataBase class.

    Parameters
    ----------
    metadata_class : type
        A MetadataBase subclass (the class itself, not an instance)

    Returns
    -------
    dict
        Maps each field name to a dictionary with the keys ``type``,
        ``required``, ``style``, ``units``, ``description``, ``options``,
        ``alias``, ``example`` and ``default``

    Raises
    ------
    TypeError
        If ``metadata_class`` is not a class derived from MetadataBase
    """
    is_metadata_class = isinstance(metadata_class, type) and issubclass(
        metadata_class, MetadataBase
    )
    if not is_metadata_class:
        raise TypeError(
            f"Object must be a MetadataBase class, got {type(metadata_class)}"
        )

    # (summary key, extractor) pairs, applied to each field in this order
    extractors = (
        ("type", _get_field_type),
        ("required", _get_field_required),
        ("style", _get_field_style),
        ("units", _get_field_units),
        ("description", _get_field_description),
        ("options", _get_field_options),
        ("alias", _get_field_alias),
        ("example", _get_field_example),
        ("default", _get_field_default),
    )

    return {
        field_name: {key: extract(field_info) for key, extract in extractors}
        for field_name, field_info in metadata_class.model_fields.items()
    }

105 

106 

107def _get_field_type(field_info): 

108 """Extract type information from Pydantic field""" 

109 annotation = field_info.annotation 

110 

111 # Handle Union types (e.g., str | int) 

112 origin = get_origin(annotation) 

113 if origin is not None: 

114 args = get_args(annotation) 

115 if origin is Union or ( 

116 hasattr(origin, "__name__") and origin.__name__ == "UnionType" 

117 ): 

118 # For Union types, get the first non-None type 

119 for arg in args: 

120 if arg is not type(None): 

121 annotation = arg 

122 break 

123 

124 # Map Python types to string representations that BaseDict now accepts 

125 type_mapping = { 

126 str: "string", 

127 int: "integer", 

128 float: "float", 

129 bool: "boolean", 

130 list: "list", # Now supported by BaseDict 

131 dict: "dict", # Now supported by BaseDict 

132 } 

133 

134 if annotation in type_mapping: 

135 return type_mapping[annotation] 

136 elif hasattr(annotation, "__name__"): 

137 # For custom classes, map to appropriate types that BaseDict accepts 

138 name = annotation.__name__.lower() 

139 if name.endswith("enum"): 

140 return "string" # Enums are essentially strings with options 

141 elif "list" in name or "array" in name: 

142 return "list" # Use list type 

143 elif "dict" in name: 

144 return "dict" # Use dict type 

145 elif ( 

146 "comment" in name 

147 or "citation" in name 

148 or any(x in name for x in ["person", "location", "fdsn"]) 

149 ): 

150 return "object" # Complex objects as object type 

151 else: 

152 return "object" # Default to object for complex types 

153 else: 

154 return "string" # Default fallback 

155 

156 

157def _get_field_required(field_info): 

158 """Extract required status from Pydantic field""" 

159 # Check json_schema_extra first 

160 if hasattr(field_info, "json_schema_extra") and field_info.json_schema_extra: 

161 if "required" in field_info.json_schema_extra: 

162 return field_info.json_schema_extra["required"] 

163 

164 # Check if field has a default value - if no default, it's required 

165 from pydantic_core import PydanticUndefined 

166 

167 return field_info.default is PydanticUndefined and ( 

168 field_info.default_factory is None 

169 or field_info.default_factory is PydanticUndefined 

170 ) 

171 

172 

173def _get_field_style(field_info): 

174 """Extract style information from Pydantic field""" 

175 # Check for pattern in field constraints 

176 if hasattr(field_info, "pattern") and field_info.pattern: 

177 return f"pattern: {field_info.pattern}" 

178 

179 # Check json_schema_extra for style 

180 if hasattr(field_info, "json_schema_extra") and field_info.json_schema_extra: 

181 if "style" in field_info.json_schema_extra: 

182 return field_info.json_schema_extra["style"] 

183 

184 return "free form" 

185 

186 

187def _get_field_units(field_info): 

188 """Extract units information from Pydantic field""" 

189 if hasattr(field_info, "json_schema_extra") and field_info.json_schema_extra: 

190 if "units" in field_info.json_schema_extra: 

191 return field_info.json_schema_extra["units"] 

192 return None 

193 

194 

195def _get_field_description(field_info): 

196 """Extract description from Pydantic field""" 

197 return getattr(field_info, "description", "No description available") 

198 

199 

200def _get_field_options(field_info): 

201 """Extract options/choices from Pydantic field""" 

202 # Check for enum or choices in constraints 

203 options = [] 

204 

205 # Check for enum type 

206 annotation = field_info.annotation 

207 if hasattr(annotation, "__members__"): # Enum type 

208 options = list(annotation.__members__.keys()) 

209 

210 # Check json_schema_extra for options 

211 if hasattr(field_info, "json_schema_extra") and field_info.json_schema_extra: 

212 if "options" in field_info.json_schema_extra: 

213 schema_options = field_info.json_schema_extra["options"] 

214 if isinstance(schema_options, (list, tuple)): 

215 options.extend(schema_options) 

216 

217 return options # Return empty list instead of None to avoid validation errors 

218 

219 

220def _get_field_alias(field_info): 

221 """Extract alias from Pydantic field""" 

222 alias = getattr(field_info, "alias", None) 

223 return alias if alias is not None else "" 

224 

225 

226def _get_field_example(field_info): 

227 """Extract example from Pydantic field""" 

228 # First check the direct examples attribute (deprecated but still used) 

229 examples = getattr(field_info, "examples", None) 

230 

231 # If not found, check json_schema_extra for examples 

232 if ( 

233 examples is None 

234 and hasattr(field_info, "json_schema_extra") 

235 and field_info.json_schema_extra 

236 ): 

237 examples = field_info.json_schema_extra.get("examples", None) 

238 

239 if hasattr(examples, "__iter__") and not isinstance(examples, str): 

240 if examples and len(examples) > 0: 

241 return str(examples[0]) 

242 else: 

243 return str(examples) if examples is not None else "" 

244 

245 

def _get_field_default(field_info):
    """
    Extract the default value of a Pydantic field as a string.

    Parameters
    ----------
    field_info : object
        Field description exposing ``default`` and ``default_factory``

    Returns
    -------
    str
        * the explicit default converted with ``str`` (empty string when the
          default is None);
        * otherwise, when a default factory exists, the value it produces
          converted with ``str`` if it is a str/int/float/bool, or the name
          of its type for any other object;
        * an empty string when there is no default or the factory fails.
    """
    from pydantic_core import PydanticUndefined

    default_val = field_info.default
    if default_val is not PydanticUndefined:
        return "" if default_val is None else str(default_val)

    factory = field_info.default_factory
    if factory is None or factory is PydanticUndefined:
        return ""

    # Best effort: a factory that cannot be called without arguments, or that
    # fails, simply yields no default.  Only Exception is caught so that
    # KeyboardInterrupt / SystemExit still propagate.
    try:
        produced = factory()
    except Exception:
        return ""

    if isinstance(produced, (str, int, float, bool)):
        return str(produced)
    return type(produced).__name__

272 

273 

def collect_basemodel_objects(module: str) -> dict[type[MetadataBase], str]:
    """
    Gather every MetadataBase subclass exposed by an ``mt_metadata`` submodule.

    Parameters
    ----------
    module : str
        Name of the submodule relative to ``mt_metadata``
        (e.g. ``"timeseries"`` imports ``mt_metadata.timeseries``)

    Returns
    -------
    dict[type[MetadataBase], str]
        Maps each class found (MetadataBase itself excluded) to its name
        after it has been passed through ``validate_name``
    """
    from importlib import import_module
    from inspect import getmembers, isclass

    package = import_module(f"mt_metadata.{module}")
    return {
        cls: validate_name(cls_name)
        for cls_name, cls in getmembers(package, isclass)
        if cls is not MetadataBase and issubclass(cls, MetadataBase)
    }

297 

298 

def summarize_pydantic_standards(module: str = "timeseries") -> BaseDict:
    """
    Collect the field standards of every MetadataBase class in a module.

    Each class found by ``collect_basemodel_objects`` is described with
    ``extract_metadata_fields_from_pydantic`` and added to a single BaseDict
    under the class name.  A class that cannot be processed is logged and
    skipped; it does not abort the summary.

    Parameters
    ----------
    module : str, optional
        The module to inspect, by default "timeseries"

    Returns
    -------
    BaseDict
        BaseDict object containing summarized field information
    """
    discovered = collect_basemodel_objects(module)
    standards = BaseDict()

    for cls, name in discovered.items():
        try:
            standards.add_dict(extract_metadata_fields_from_pydantic(cls), name)
        except Exception as error:
            logger.exception(error)
            logger.warning(f"Could not process {name} fields: {error}")

    return standards

327 

328 

def summary_to_array(summary_dict, dtype=SUMMARIZE_DTYPE) -> np.ndarray:
    """
    Flatten a summary dictionary of standards into a structured array.

    Row 0 is a header row recording the metadata standards version; every
    following row describes one attribute of ``summary_dict``.

    Parameters
    ----------
    summary_dict : dict
        Dictionary of summarized standards; each value must provide an
        entry for every field of ``dtype`` after the first
    dtype : numpy.dtype, optional
        Structured dtype of the rows, by default SUMMARIZE_DTYPE

    Returns
    -------
    np.array
        numpy structured array with ``len(summary_dict) + 1`` rows
    """
    table = np.zeros(len(summary_dict.keys()) + 1, dtype=dtype)

    # header row carrying the standards version
    header = table[0]
    header["attribute"] = "mt_metadata.standards.version"
    header["description"] = f"Metadata standards version {__version__}"
    header["type"] = "string"
    header["style"] = "free form"

    property_names = dtype.names[1:]
    for row_index, (attribute, properties) in enumerate(
        summary_dict.items(), start=1
    ):
        row = table[row_index]
        row["attribute"] = attribute
        for column in property_names:
            cell = properties[column]

            # lists are stored as comma separated text, an empty list as ""
            if isinstance(cell, list):
                cell = ",".join(f"{item}" for item in cell)
            if cell is None:
                cell = ""

            row[column] = cell

    return table

368 

369 

def summary_to_dataframe(summary_dict):
    """
    Build a pandas DataFrame from a summary dictionary of standards.

    Parameters
    ----------
    summary_dict : dict
        Dictionary of summarized standards

    Returns
    -------
    pd.DataFrame
        DataFrame built from the structured array returned by
        ``summary_to_array``
    """
    return pd.DataFrame(summary_to_array(summary_dict))

386 

387 

def summarize_standards(
    module="timeseries",
    csv_fn=None,
    output_type="dataframe",
    dtype=SUMMARIZE_DTYPE,
) -> Union[pd.DataFrame, np.ndarray]:
    """
    Summarize the standards of a module and optionally write them to a csv

    Parameters
    ----------
    module : str, optional
        Module to summarize, by default "timeseries"
    csv_fn : str or Path, optional
        Full path to write a csv file, by default None.  The csv is written
        whenever a path is given, whatever ``output_type`` is.
    output_type : str, optional
        "array" or "dataframe", by default "dataframe".  Any other value is
        reported with a warning and treated as "dataframe".
    dtype : numpy.dtype, optional
        Structured dtype of the summary rows, by default SUMMARIZE_DTYPE.
        It is applied to both output types.

    Returns
    -------
    numpy.ndarray | pd.DataFrame
        If output_type is "array", returns a numpy structured array.
        Otherwise returns a pandas DataFrame.
    """
    summary_dict = summarize_pydantic_standards(module)
    entries = summary_to_array(summary_dict, dtype=dtype)

    want_array = output_type == "array"
    if not want_array and output_type != "dataframe":
        logger.warning(
            f"Unknown output_type {output_type!r}, returning a dataframe"
        )

    # The dataframe is needed for dataframe output and for writing the csv.
    summary_df = None
    if csv_fn or not want_array:
        summary_df = pd.DataFrame(entries)

    if csv_fn:
        summary_df.to_csv(csv_fn, index=False)

    return entries if want_array else summary_df

420 return summary_df