Coverage for C:\Users\peaco\OneDrive\Documents\GitHub\mth5\mth5\helpers.py: 80%

319 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-27 20:09 -0800

1# -*- coding: utf-8 -*- 

2""" 

3Helper functions for HDF5 

4 

5Created on Tue Jun 2 12:37:50 2020 

6 

7:copyright: 

8 Jared Peacock (jpeacock@usgs.gov) 

9 

10:license: 

11 MIT 

12 

13""" 

14import gc 

15import inspect 

16 

17# ============================================================================= 

18# Imports 

19# ============================================================================= 

20from collections.abc import Iterable 

21from typing import Any, Type 

22 

23import h5py 

24import numpy as np 

25from loguru import logger 

26from mt_metadata.base import MetadataBase 

27from pydantic.fields import FieldInfo 

28 

29 

30# ============================================================================= 

31# Acceptable compressions 

32# ============================================================================= 

# Allowed ``level`` values for each supported compression filter.  The ``None``
# key stands for "no compression".
COMPRESSION_LEVELS = {
    "lzf": [None],
    "gzip": range(10),
    "szip": ["ec-8", "ee-10", "nn-8", "nn-10"],
    None: [None],
}

# Supported compression filter names, in the insertion order of the mapping above.
COMPRESSION = list(COMPRESSION_LEVELS)

40 

41 

def validate_compression(
    compression: str | None, level: int | str | None
) -> tuple[str | None, int | str | None]:
    """
    Validate a compression filter name together with its level.

    Parameters
    ----------
    compression : str or None
        Type of lossless compression. Options are 'lzf', 'gzip', 'szip', or None.
    level : int, str, or None
        Compression level if supported.
        - int for 'gzip' (0-9)
        - str for 'szip' ('ec-8', 'ee-10', 'nn-8', 'nn-10')
        - ignored for 'lzf' (always replaced by None) and for None compression

    Returns
    -------
    compression : str or None
        Validated compression type
    level : int, str, or None
        Validated compression level

    Raises
    ------
    TypeError
        If compression is not a string or None, or if the level type does not
        match the compression type (int for 'gzip', str for 'szip').
    ValueError
        If the compression name or the level value is not supported.

    """
    # No compression requested: whatever level was passed is discarded.
    if compression is None:
        return None, None

    if not isinstance(compression, str):
        msg = f"compression type must be a string, not {type(compression)}"
        logger.error(msg)
        raise TypeError(msg)

    if compression not in COMPRESSION:
        msg = (
            f"Compression type {compression} not supported. "
            f"Supported options are {COMPRESSION}"
        )
        logger.error(msg)
        raise ValueError(msg)

    if compression == "lzf":
        # lzf has a single allowed level, so the caller's value is overridden.
        level = COMPRESSION_LEVELS["lzf"][0]
    elif compression == "gzip":
        if not isinstance(level, int):
            msg = (
                f"Level type for gzip must be an int, not {type(level)}. "
                f"Options are {COMPRESSION_LEVELS['gzip']}"
            )
            logger.error(msg)
            raise TypeError(msg)
    elif compression == "szip":
        if not isinstance(level, str):
            msg = (
                f"Level type for szip must be an str, not {type(level)}. "
                f"Options are {COMPRESSION_LEVELS['szip']}"
            )
            logger.error(msg)
            raise TypeError(msg)

    if level not in COMPRESSION_LEVELS[compression]:
        msg = (
            f"compression level {level} not supported for {compression}. "
            f"Options are {COMPRESSION_LEVELS[compression]}"
        )
        logger.error(msg)
        raise ValueError(msg)
    return compression, level

114 

115 

def recursive_hdf5_tree(
    group: h5py.Group | h5py.File | h5py.Dataset, lines: list[str] | None = None
) -> str:
    """
    Build a text listing of an HDF5 object by walking it recursively.

    Parameters
    ----------
    group : h5py.Group, h5py.File, or h5py.Dataset
        HDF5 object to traverse
    lines : list of str, optional
        Accumulator for the output lines. It is appended to in place; a new
        list is created when None is given.

    Returns
    -------
    str
        The accumulated lines joined with newlines.

    Notes
    -----
    For a group or file, one ``-name: object`` line is added per member and
    the member is then visited recursively.  For a dataset, one tab-indented
    ``-name: value`` line is added per attribute.  Any other object adds
    nothing.
    """
    collected = [] if lines is None else lines

    if isinstance(group, (h5py._hl.group.Group, h5py._hl.files.File)):
        for member_name, member in group.items():
            collected.append(f"-{member_name}: {member}")
            # The recursive call appends into the shared list; its returned
            # string is only needed at the outermost level.
            recursive_hdf5_tree(member, collected)
    elif isinstance(group, h5py._hl.dataset.Dataset):
        collected.extend(
            f"\t-{attr_name}: {attr_value}"
            for attr_name, attr_value in group.attrs.items()
        )

    return "\n".join(collected)

149 

150 

def close_open_files() -> None:
    """
    Close all open HDF5 files found in memory.

    Every object tracked by the garbage collector is inspected; each
    ``h5py.File`` instance found is flushed and closed.

    Notes
    -----
    This is a best-effort cleanup: failures are logged and never raised.
    Only ``Exception`` subclasses are caught, so ``KeyboardInterrupt`` and
    ``SystemExit`` still propagate.  A file that cannot be flushed or closed
    is reported as already closed.
    """
    for obj in gc.get_objects():
        try:
            is_hdf5_file = isinstance(obj, h5py.File)
        except Exception:
            # The isinstance check itself failed for this object; skip it.
            logger.debug(f"Object {type(obj)} does not have __class__")
            continue
        if not is_hdf5_file:
            continue

        logger.debug("Found HDF5 File object ")
        try:
            # Read the name before closing; it is not re-read in the handler
            # so a failing attribute access cannot raise a second time.
            filename = obj.filename
            obj.flush()
            obj.close()
        except Exception:
            logger.info("HDF5 File object already closed.")
        else:
            logger.info(f"{filename}, Closed File")

181 

182 

def get_tree(parent: h5py.Group | h5py.File) -> str:
    """
    Return a formatted tree listing of the contents of an HDF5 group.

    Parameters
    ----------
    parent : h5py.Group or h5py.File
        HDF5 (sub-)tree to describe

    Returns
    -------
    str
        Formatted string representation of the HDF5 tree structure

    Raises
    ------
    TypeError
        If the provided object is not an h5py.File or h5py.Group object

    Notes
    -----
    The listing starts with ``parent.name`` and a rule of ``=`` characters.
    Each member visited by ``parent.visititems`` then adds two lines: a label
    line and an underline, indented four spaces per path level.  Groups are
    labelled ``|- Group:`` and datasets ``--> Dataset:``.
    """
    # Validate before touching ``parent.name`` so unsupported input raises the
    # documented TypeError rather than an AttributeError.
    if not isinstance(parent, (h5py.File, h5py.Group)):
        raise TypeError("Provided object is not a h5py.File or h5py.Group object")

    lines = [f"{parent.name}:", "=" * 20]

    def fancy_print(name: str, obj: h5py.Group | h5py.Dataset) -> None:
        # ``name`` is the member path relative to ``parent``; its depth is the
        # number of "/" separators it contains.
        spacing = " " * 4 * (name.count("/") + 1)
        group_name = name[name.rfind("/") + 1 :]

        if isinstance(obj, h5py.Group):
            lines.append(f"{spacing}|- Group: {group_name}")
            lines.append(f"{spacing}{(len(group_name) + 10) * '-'}")
        elif isinstance(obj, h5py.Dataset):
            lines.append(f"{spacing}--> Dataset: {group_name}")
            lines.append(f"{spacing}{(len(group_name) + 15) * '.'}")

    parent.visititems(fancy_print)
    return "\n".join(lines)

227 

228 

def to_numpy_type(value: Any) -> Any:
    """
    Convert a Python value into a form that NumPy/HDF5 can store.

    Parameters
    ----------
    value : any
        The value to convert to a numpy/HDF5 compatible type

    Returns
    -------
    various
        The converted value:
        - None becomes the string "none"
        - h5py References become strings
        - Enum members become ``str(member.value)``
        - Enum classes become ``"enum:<module>.<qualname>"``
        - other type objects become ``"<module>.<qualname>"`` (with a warning)
        - dictionaries and lists become JSON strings (``str(value)`` if JSON
          serialization fails)
        - object-dtype arrays become ``str(value)``
        - str, int, float, bool, complex and their listed NumPy counterparts
          are returned as-is
        - iterables containing any str/bytes item become NumPy byte-string arrays
        - anything else becomes a NumPy array, or ``str(value)`` when the
          array would have object dtype or cannot be built

    Notes
    -----
    Lists and dictionaries are stored as JSON strings, which can be
    reconstructed using `from_numpy_type`.
    """
    from enum import Enum

    if value is None:
        return "none"

    # For now references are stored as a generic string.
    if isinstance(value, h5py.h5r.Reference):
        value = str(value)

    # Enum members are stored by the string form of their value.
    if isinstance(value, Enum):
        return str(value.value)

    if isinstance(value, type):
        qualified_name = f"{value.__module__}.{value.__qualname__}"
        if issubclass(value, Enum):
            # Tagged so the enum class can be recognised when read back.
            return f"enum:{qualified_name}"
        logger.warning(
            f"Converting type object {value!r} to its fully qualified name "
            f"{qualified_name!r} for HDF5 metadata storage. "
            "This may indicate that a type object was passed where a value was expected."
        )
        return qualified_name

    if isinstance(value, (dict, list)):
        import json

        try:
            return json.dumps(value)
        except (TypeError, ValueError):
            # Not JSON-serializable: fall back to the plain string form.
            return str(value)

    object_dtype = np.dtype("O")
    if isinstance(value, np.ndarray) and value.dtype == object_dtype:
        return str(value)

    passthrough_types = (
        str,
        np.str_,
        int,
        float,
        bool,
        complex,
        np.int_,
        np.float64,
        np.bool_,
        np.complex128,
    )
    if isinstance(value, passthrough_types):
        return value

    # Any string-like item makes the whole iterable a byte-string array.
    if isinstance(value, Iterable) and np.any(
        [type(item) in [str, bytes, np.str_] for item in value]
    ):
        return np.array(value, dtype="S")

    # Remaining iterables and arbitrary objects share one fallback: use the
    # NumPy array unless it could only be represented with object dtype.
    try:
        as_array = np.array(value)
    except (ValueError, TypeError):
        return str(value)
    if as_array.dtype == object_dtype:
        return str(value)
    return as_array

351 

352 

def validate_name(name: str) -> str:
    """
    Return ``name`` with every space and forward slash turned into an underscore.

    Parameters
    ----------
    name : str
        The name to clean

    Returns
    -------
    str
        The cleaned name

    Notes
    -----
    NOTE(review): another ``validate_name`` is defined further down in this
    module with different replacement rules; being later, it is the one bound
    to the name after import.  Confirm which behavior is intended.
    """
    # A single translation table maps both characters to "_".
    return name.translate(str.maketrans(" /", "__"))

374 

375 

def from_numpy_type(value: Any) -> Any:
    """
    Convert a value read from numpy/HDF5 back to a Python-friendly value.

    This is the counterpart to `to_numpy_type`.

    Parameters
    ----------
    value : any
        The value to convert from numpy/HDF5 format

    Returns
    -------
    various
        The converted value:
        - the string "none" (any letter case) becomes None
        - strings that look like a JSON object or array are parsed with
          ``json.loads``; if parsing fails the string is returned unchanged
        - other strings, Python numbers/bools and NumPy numeric or boolean
          scalars are returned as-is
        - h5py References become strings
        - iterables containing any bytes item become lists of str
        - other iterables become Python lists via ``numpy.ndarray.tolist``

    Raises
    ------
    TypeError
        If the value is none of the above.

    Notes
    -----
    NOTE(review): a ``None`` input is returned as the string "none", the same
    as in `to_numpy_type`; confirm that callers rely on this before changing it.
    """
    if value is None:
        return "none"

    if isinstance(value, str):
        # "none" is how a missing value is stored in HDF5.
        if value.lower() == "none":
            return None
        looks_like_json = (value.startswith("{") and value.endswith("}")) or (
            value.startswith("[") and value.endswith("]")
        )
        if looks_like_json:
            import json

            try:
                return json.loads(value)
            except ValueError:
                # json.JSONDecodeError is a ValueError: keep the raw string.
                pass
        return value

    # ``np.number`` covers every NumPy integer, float and complex scalar, so
    # widths such as float32 or int16 are accepted as well.
    if isinstance(value, (int, float, bool, complex, np.number, np.bool_)):
        return value

    # For now references are turned into a generic string.
    if isinstance(value, h5py.h5r.Reference):
        return str(value)

    if isinstance(value, Iterable):
        if np.any([type(item) in [bytes, np.bytes_] for item in value]):
            return np.array(value, dtype="U").tolist()
        return np.array(value).tolist()

    raise TypeError("Type {0} not understood".format(type(value)))

478 

479 

# Strings (compared lower-cased) that are read as boolean True.
_TRUTHY_STRINGS = ("true", "1", "yes", "y")


def _as_bool(item: Any) -> bool:
    """Interpret a scalar as a boolean; strings are matched against truthy words."""
    if isinstance(item, str):
        return item.lower() in _TRUTHY_STRINGS
    return bool(item)


def _as_str(item: Any) -> str:
    """Return the text of ``item``, decoding bytes as UTF-8 instead of using their repr."""
    if isinstance(item, (bytes, bytearray)):
        return item.decode("utf-8")
    return str(item)


def _coerce_scalar(
    key: str, value: Any, converter: Any, scalar_types: Any, type_name: str
) -> Any:
    """Convert a scalar, or the only element of a one-item list, with ``converter``."""
    if isinstance(value, list):
        # Only a single-element list can be unwrapped unambiguously.
        if len(value) != 1:
            return value
        candidate = value[0]
    elif isinstance(value, scalar_types):
        candidate = value
    else:
        return value
    try:
        return converter(candidate)
    except (ValueError, TypeError):
        logger.debug(f"Could not coerce {key}={value} to {type_name}")
        return value


def _coerce_list(key: str, value: Any) -> Any:
    """Convert a JSON/comma-separated string or another iterable into a list."""
    if isinstance(value, str):
        import json

        try:
            # NOTE(review): valid JSON that is not an array (e.g. "5") is
            # returned as parsed, i.e. not as a list.
            return json.loads(value)
        except ValueError:
            if "," in value:
                return [part.strip() for part in value.split(",")]
            logger.debug(f"Could not coerce {key}={value} to list")
            return value
    try:
        return list(value)
    except (ValueError, TypeError):
        logger.debug(f"Could not coerce {key}={value} to list")
        return value


def coerce_value_to_expected_type(key: str, value: Any, expected_type: Any) -> Any:
    """
    Coerce a value to the expected type, returning it unchanged on failure.

    Intended for metadata read from older files that may have been stored
    with less strict type enforcement.

    Parameters
    ----------
    key : str
        Metadata field name; only used in log messages.
    value : Any
        Value to coerce.
    expected_type : Any
        Expected value type (a type object or a string understood by
        `get_data_type`).  None disables coercion.

    Returns
    -------
    Any
        Coerced value matching expected type, or original value if coercion
        fails or no rule applies.

    Notes
    -----
    - float / int / bool / str: a scalar is converted directly; a list with
      exactly one element is converted from that element; longer lists are
      returned unchanged.
    - bool: strings are True when their lower-cased form is one of
      "true", "1", "yes", "y"; ``numpy.bool_`` values are converted with
      ``bool``.
    - str: bytes are decoded as UTF-8; undecodable bytes are returned
      unchanged.
    - list: strings are parsed as JSON, then as comma-separated values; other
      values are passed to ``list``.
    - No exception is raised: any failure is logged at debug level.

    Examples
    --------
    >>> coerce_value_to_expected_type('sample_rate', '256.0', float)
    256.0

    >>> coerce_value_to_expected_type('channel_number', 1.0, int)
    1
    """
    if value is None or expected_type is None:
        return value

    try:
        if isinstance(expected_type, str):
            try:
                expected_type = get_data_type(expected_type)
            except ValueError:
                # Unknown type name: nothing to coerce to.
                return value

        if isinstance(value, expected_type):
            return value

        if expected_type == float:
            return _coerce_scalar(
                key, value, float, (int, str, np.integer, np.floating), "float"
            )
        if expected_type == int:
            return _coerce_scalar(
                key, value, int, (float, str, np.integer, np.floating), "int"
            )
        if expected_type == str:
            # Every non-list value is a candidate for string conversion.
            return _coerce_scalar(key, value, _as_str, object, "str")
        if expected_type == bool:
            return _coerce_scalar(
                key,
                value,
                _as_bool,
                (int, float, str, np.integer, np.floating, np.bool_),
                "bool",
            )
        if expected_type == list:
            return _coerce_list(key, value)
    except Exception as e:
        # Deliberate best-effort: never let coercion break a metadata read.
        logger.debug(f"Exception during type coercion for {key}: {e}")
        return value

    # No coercion rule for this expected type.
    return value

627 

628 

def get_metadata_type_dict(metadata_class: MetadataBase) -> dict[str, Type[Any]]:
    """
    Map each metadata field name to its expected data type.

    Parameters
    ----------
    metadata_class : MetadataBase
        Metadata object whose ``get_all_fields()`` describes the fields

    Returns
    -------
    dict[str, Type[Any]]
        Dictionary mapping metadata field names to their expected data types.
        A string type description that `get_data_type` cannot resolve is kept
        as the original string; any non-string entry (a type object, or a
        missing "type") is stored unchanged.
    """
    resolved_types = {}
    for field_name, field_info in metadata_class.get_all_fields().items():
        declared = field_info.get("type")
        resolved = declared
        # Only string descriptions need translating into a type object.
        if isinstance(declared, str):
            try:
                resolved = get_data_type(declared)
            except ValueError:
                resolved = declared
        resolved_types[field_name] = resolved
    return resolved_types

659 

660 

# Lower-cased type names recognised by ``get_data_type``.
_BASIC_TYPE_NAMES = {
    "int": int,
    "float": float,
    "str": str,
    "bool": bool,
    "list": list,
    "dict": dict,
    "complex": complex,
    "object": str,  # Treat object type as str for HDF5 storage
}

# Substrings marking types that are stored as strings: old-style enum reprs,
# MTime and EmailStr.
_STRING_LIKE_MARKERS = ("<enum ", "<class 'enum", "MTime", "EmailStr")


def get_data_type(string_representation: str) -> Type[Any]:
    """
    Get the Python data type from its string representation.

    Parameters
    ----------
    string_representation : str
        String representation of the data type (e.g., 'int', 'float',
        "<class 'str'>", 'list[str]', 'float | None').  A type object is
        returned unchanged.

    Returns
    -------
    type
        Corresponding Python data type.

    Raises
    ------
    ValueError
        If the input is neither a type nor a string, or if the string does
        not correspond to a known data type.

    Notes
    -----
    - Unions written with " | " resolve to their first non-None member.
      Members containing ".", "Enum" or "Url", members that cannot be
      resolved, and unions of only None resolve to ``str``.
    - Strings starting with "enum:" or containing "<enum ", "<class 'enum",
      "MTime" or "EmailStr" resolve to ``str``.
    - Otherwise ``<class '...'>`` wrappers and generic parameters in square
      brackets are stripped and the lower-cased name is looked up among
      int, float, str, bool, list, dict, complex and object (mapped to str).
    """
    if isinstance(string_representation, type):
        return string_representation
    if not isinstance(string_representation, str):
        raise ValueError(
            f"Input must be a string representation of a data type, not "
            f"{type(string_representation)}"
        )

    # Union types, e.g. "ChannelOrientationEnum | None" or "HttpUrl | str | None".
    if " | " in string_representation:
        members = [part.strip() for part in string_representation.split(" | ")]
        concrete = [part for part in members if part.lower() != "none"]
        if not concrete:
            return str
        first_type = concrete[0]
        # Dotted paths, enums and URLs are stored as strings.
        if "." in first_type or "Enum" in first_type or "Url" in first_type:
            return str
        try:
            return get_data_type(first_type)
        except (ValueError, KeyError):
            return str

    # New enum format is "enum:module.ClassName"; it is stored as a string.
    if string_representation.startswith("enum:"):
        return str
    if any(marker in string_representation for marker in _STRING_LIKE_MARKERS):
        return str

    # Reduce "<class 'int'>" style reprs to the bare name.
    dtype = string_representation
    for token in ("<class", "'", ">"):
        dtype = dtype.replace(token, "")
    dtype = dtype.split("|")[0].strip()

    # Drop generic parameters: "list[str]" -> "list".
    if "[" in dtype and "]" in dtype:
        dtype = dtype[: dtype.find("[")].strip()

    try:
        return _BASIC_TYPE_NAMES[dtype.lower()]
    except KeyError:
        raise ValueError(
            f"Unknown data type string representation: {string_representation}"
        ) from None

759 

760 

def read_attrs_to_dict(
    attrs_dict: dict[str, Any], metadata_object: MetadataBase
) -> dict[str, Any]:
    """
    Convert a dictionary of HDF5 attribute values to Python values in place.

    Parameters
    ----------
    attrs_dict : dict[str, Any]
        Dictionary of attributes to read and convert.  It is modified in
        place and also returned.
    metadata_object : MetadataBase
        Metadata object to use for type information.

    Returns
    -------
    dict[str, Any]
        The same dictionary, with each value passed through `from_numpy_type`
        and then coerced to the type the metadata declares for that key.
        Keys whose converted value is None are removed.

    Notes
    -----
    The declared type is handed to `coerce_value_to_expected_type` as-is.
    That function returns the value unchanged when the type is missing or is
    a string that cannot be resolved, so one unresolved field type does not
    abort reading the remaining attributes.
    """
    data_types = get_metadata_type_dict(metadata_object)

    # Iterate over a snapshot because entries are deleted during the loop.
    for key, raw_value in list(attrs_dict.items()):
        value = from_numpy_type(raw_value)

        # Drop None values so defaults are used instead; this covers legacy
        # files where some fields were not set.
        if value is None:
            del attrs_dict[key]
            continue

        # Legacy attributes without type information yield None here, which
        # leaves the value as-is.
        attrs_dict[key] = coerce_value_to_expected_type(
            key, value, data_types.get(key)
        )
    return attrs_dict

801 

802 

803# ============================================================================= 

804# 

805# ============================================================================= 

def inherit_doc_string(cls: Type[Any]) -> Type[Any]:
    """
    Class decorator that fills in a docstring from the class hierarchy.

    Parameters
    ----------
    cls : type
        The class to apply docstring inheritance to

    Returns
    -------
    type
        The same class object, with ``__doc__`` set to the first non-None
        docstring found along its method resolution order.

    Notes
    -----
    The method resolution order starts with ``cls`` itself, so a class that
    already has a docstring keeps it.
    """
    inherited = next(
        (klass.__doc__ for klass in inspect.getmro(cls) if klass.__doc__ is not None),
        None,
    )
    if inherited is not None:
        cls.__doc__ = inherited
    return cls

833 

834 

835def validate_name(name: str | None, pattern: str | None = None) -> str: 

836 """ 

837 Validate and clean a name for HDF5 compatibility. 

838 

839 Parameters 

840 ---------- 

841 name : str or None 

842 The name to validate and clean 

843 pattern : str, optional 

844 Pattern for validation (currently not used but reserved for future use) 

845 

846 Returns 

847 ------- 

848 str 

849 The cleaned name with spaces replaced by underscores and commas removed. 

850 Returns "unknown" if input name is None. 

851 

852 Notes 

853 ----- 

854 This function ensures that names are compatible with HDF5 naming conventions 

855 by removing problematic characters. If the input name is None, it returns 

856 "unknown" as a default value. 

857 """ 

858 if name is None: 

859 return "unknown" 

860 return name.replace(" ", "_").replace(",", "") 

861 

862 

def add_attributes_to_metadata_class_pydantic(obj: Type[Any]) -> Type[Any]:
    """
    Add the MTH5-specific fields to a pydantic metadata class.

    Two fields are added through the class's ``add_new_field`` method:

    - ``mth5_type``: a str field whose default is the text of the instance's
      ``_class_name`` before the first "Group"
    - ``hdf5_reference``: an object field defaulting to None, to be set later

    Parameters
    ----------
    obj : type
        A pydantic class to enhance with MTH5 attributes

    Returns
    -------
    object
        The result of calling what ``add_new_field`` returns for the second
        field, i.e. an instance of the enhanced class rather than the class.

    Raises
    ------
    TypeError
        If the input is not a class
    """
    if not inspect.isclass(obj):
        raise TypeError("Input must be a class")

    def _required_field(
        annotation: Any, default: Any, description: str, example: str
    ) -> FieldInfo:
        # Both added fields share the same schema extras apart from the example.
        return FieldInfo(
            annotation=annotation,
            default=default,
            description=description,
            json_schema_extra={
                "required": True,
                "units": None,
                "examples": [example],
            },
        )

    # ``_class_name`` is read from an instance, so the class is instantiated first.
    base_instance = obj()
    mth5_type_field = _required_field(
        str,
        base_instance._class_name.split("Group")[0],
        "type of group",
        "group_name",
    )

    # The object returned by add_new_field is called to obtain a fresh instance.
    with_mth5_type = base_instance.add_new_field("mth5_type", mth5_type_field)()

    # A plain ``object`` annotation is used rather than a typing.Union; the
    # None default still marks the value as optional until it is set.
    hdf5_ref_field = _required_field(
        object,
        None,  # Will be set later
        "hdf5 internal reference",
        "<HDF5 Group Reference>",
    )
    return with_mth5_type.add_new_field("hdf5_reference", hdf5_ref_field)()