Coverage for C: \ Users \ peaco \ OneDrive \ Documents \ GitHub \ mt_metadata \ mt_metadata \ base \ helpers.py: 70%

451 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-10 00:11 -0800

1# -*- coding: utf-8 -*- 

2""" 

3Created on Wed Dec 23 20:37:52 2020 

4 

5:copyright: 

6 Jared Peacock (jpeacock@usgs.gov) 

7 

8:license: MIT 

9 

10""" 

11import hashlib 

12import json 

13import logging 

14import os 

15 

16# ============================================================================= 

17# Imports 

18# ============================================================================= 

19import textwrap 

20from collections import defaultdict, OrderedDict 

21from collections.abc import MutableMapping 

22from operator import itemgetter 

23from pathlib import Path 

24from threading import RLock 

25from typing import Any, Dict 

26from xml.dom import minidom 

27from xml.etree import cElementTree as et 

28 

29import numpy as np 

30from loguru import logger 

31from pydantic import BaseModel 

32from pydantic.fields import FieldInfo 

33from pydantic_core import PydanticUndefined 

34 

# Human-readable description for each filter name used as a key below.
filter_descriptions = {
    "zpk": "poles and zeros filter",
    "coefficient": "coefficient filter",
    "time delay": "time delay filter",
    "fir": "finite impulse response filter",
    "fap": "frequency amplitude phase lookup table",
    "frequency response table": "frequency amplitude phase lookup table",
    "base": "base filter",
}

44 

45# ============================================================================= 

46# write doc strings 

47# ============================================================================= 

48 

49 

def wrap_description(description, column_width):
    """
    Split a description into lines no wider than ``column_width``.

    Parameters
    ----------
    description : str, list, or any
        Text to wrap.  A list is joined with single spaces after each entry
        is converted with ``str``; any other non-string value is converted
        with ``str``.
    column_width : int
        Maximum width of each wrapped line.

    Returns
    -------
    list of str
        The wrapped lines, padded with empty strings so that the list always
        holds at least 11 entries.
    """
    if isinstance(description, list):
        description = " ".join([str(d) for d in description])
    elif not isinstance(description, str):
        description = str(description)

    # Strip once for every input kind; textwrap keeps leading whitespace of
    # the first line, which would otherwise shift the text inside a cell.
    description = description.strip()

    d_lines = textwrap.wrap(description, column_width)
    if len(d_lines) < 11:
        d_lines += [""] * (11 - len(d_lines))
    return d_lines

64 

65 

def validate_c1(attr_dict, c1):
    """
    Widen column 1 when the longest key would not fit.

    Parameters
    ----------
    attr_dict : dict
        Mapping whose keys are written into column 1.
    c1 : int
        Requested width of column 1.

    Returns
    -------
    int
        ``c1`` unchanged when every key is at most ``c1 - 4`` characters long
        (or when there are no keys); otherwise the longest key length plus 6.
    """
    longest_key = max((len(name) for name in attr_dict.keys()), default=None)
    if longest_key is None:
        # no keys at all, nothing to measure
        return c1
    if longest_key > (c1 - 4):
        return longest_key + 6
    return c1

91 

92 

def write_lines(field_dict, c1=45, c2=45, c3=15):
    """
    Render a mapping of field names to FieldInfo objects as a text table.

    Every field becomes one table entry with three columns: the left column
    stacks the key, required flag, units, type, style and default value; the
    middle column holds the wrapped description; the right column holds the
    wrapped examples.  The three columns are filled independently and zipped
    row by row, so each description or example line appears exactly once and
    long descriptions, examples or defaults simply add rows.

    Parameters
    ----------
    field_dict : dict
        dictionary mapping field names to FieldInfo objects
    c1 : int, optional
        column 1 width, by default 45 (widened by ``validate_c1`` if a key
        does not fit)
    c2 : int, optional
        column 2 width, by default 45
    c3 : int, optional
        column 3 width, by default 15

    Returns
    -------
    str
        The table as a single newline-joined string.
    """
    from itertools import zip_longest

    c1 = validate_c1(field_dict, c1)

    # NOTE(review): the leading whitespace of these templates is reproduced
    # as seen in the reviewed listing -- confirm it matches the real file.
    line = " | {0:<{1}}| {2:<{3}} | {4:<{5}}|"
    hline = " +{0}+{1}+{2}+".format(
        "-" * (c1 + 1), "-" * (c2 + 2), "-" * (c3 + 1)
    )
    mline = " +{0}+{1}+{2}+".format(
        "=" * (c1 + 1), "=" * (c2 + 2), "=" * (c3 + 1)
    )

    lines = [
        hline,
        line.format("**Metadata Key**", c1, "**Description**", c2, "**Example**", c3),
        mline,
    ]

    for key, field_info in field_dict.items():
        if isinstance(field_info, logging.Logger):
            continue

        # Only a non-empty dict is consulted; anything else means "no extras".
        extra = field_info.json_schema_extra
        if not (extra and isinstance(extra, dict)):
            extra = {}

        d_lines = wrap_description(field_info.description or "", c2)
        e_lines = wrap_description(extra.get("examples", ""), c3)
        required = str(extra.get("required", False))
        units = str(extra.get("units", ""))
        style = extra.get("style", "free form")
        field_type = str(field_info.annotation) if field_info.annotation else "string"

        # Resolve the default value to something printable.
        default_value = field_info.default
        if default_value is PydanticUndefined:
            factory = field_info.default_factory
            if factory is None or factory is PydanticUndefined:
                default_value = None
            else:
                # Best effort only: a failing factory must not break the
                # documentation build, so any error falls back to None.
                try:
                    if callable(factory):
                        try:
                            default_value = factory()
                        except TypeError:
                            # If it needs arguments, we can't call it
                            default_value = f"<{factory.__name__}>"
                    else:
                        default_value = factory
                    # If it's a complex object, just show the type name
                    if not isinstance(
                        default_value, (str, int, float, bool, type(None))
                    ):
                        default_value = type(default_value).__name__
                except Exception:
                    default_value = None

        default_text = str(default_value)
        if len(default_text) > c1 - 15:
            # Too long to share a row with the "**Default**:" label: leave
            # the label row empty and continue on the rows below it.
            default_cells = [""] + [
                part for part in wrap_description(default_text, c1) if part
            ]
        else:
            default_cells = [default_text]

        left_cells = [
            f"**{key}**",
            "",
            f"Required: {required}",
            "",
            f"Units: {units}",
            "",
            f"Type: {field_type}",
            "",
            f"Style: {style}",
            "",
            f"**Default**: {default_cells[0]}",
            *default_cells[1:],
        ]
        # Every entry spans at least 13 rows (10 fixed rows + 3 default rows).
        left_cells += [""] * (13 - len(left_cells))

        for left_cell, d_cell, e_cell in zip_longest(
            left_cells, d_lines, e_lines, fillvalue=""
        ):
            lines.append(line.format(left_cell, c1, d_cell, c2, e_cell, c3))
        lines.append(hline)

    return "\n".join(lines)

276 

277 

def write_block(key, field_info: FieldInfo, c1=45, c2=45, c3=15):
    """
    Build the section heading and table lines describing a single field.

    The left column stacks the required flag, units, wrapped type annotation
    and default value; the middle column holds the wrapped description and
    the right column the wrapped examples.  The columns are filled
    independently and zipped row by row, so each description, example, type
    or default line appears exactly once and long content adds rows.

    :param key: key to write from attr dict
    :type key: string
    :param field_info: field information object; its ``annotation``,
        ``description``, ``json_schema_extra``, ``default`` and
        ``default_factory`` attributes are read
    :type field_info: FieldInfo
    :param c1: column 1 width, defaults to 45 (widened if ``key`` is longer
        than ``c1 - 4`` characters)
    :type c1: int, optional
    :param c2: column 2 width, defaults to 45
    :type c2: int, optional
    :param c3: column 3 width, defaults to 15
    :type c3: int, optional
    :return: list of lines
    :rtype: list

    """
    from itertools import zip_longest

    if len(key) > c1 - 4:
        c1 = len(key) + 6

    # NOTE(review): the leading whitespace of these templates and of the
    # directive lines below is reproduced as seen in the reviewed listing --
    # confirm it matches the real file.
    line = " | {0:<{1}}| {2:<{3}} | {4:<{5}}|"
    hline = " +{0}+{1}+{2}+".format(
        "-" * (c1 + 1), "-" * (c2 + 2), "-" * (c3 + 1)
    )
    mline = " +{0}+{1}+{2}+".format(
        "=" * (c1 + 1), "=" * (c2 + 2), "=" * (c3 + 1)
    )
    section = f":navy:`{key}`"

    lines = [
        section,
        "~" * len(section),
        "",
        ".. container::",
        "",
        " .. table::",
        " :class: tight-table",
        f" :widths: {c1} {c2} {c3}",
        "",
        hline,
        line.format(f"**{key}**", c1, "**Description**", c2, "**Example**", c3),
        mline,
    ]

    # Only a non-empty dict is consulted; anything else means "no extras".
    extra = field_info.json_schema_extra
    if not (extra and isinstance(extra, dict)):
        extra = {}

    # A missing description renders as an empty cell, as in write_lines.
    d_lines = wrap_description(field_info.description or "", c2)
    e_lines = wrap_description(extra.get("examples", ""), c3)
    required = str(extra.get("required", False))
    units = str(extra.get("units", ""))

    # The "**Type**: " label takes 10 characters of the first row.
    type_cells = [
        part for part in wrap_description(field_info.annotation, c1 - 10) if part
    ] or [""]
    type_cells[0] = f"**Type**: {type_cells[0]}"
    # The type always occupies at least six rows of the left column.
    type_cells += [""] * (6 - len(type_cells))

    # Resolve the default value to something printable.
    default_value = field_info.default
    if default_value is PydanticUndefined:
        factory = field_info.default_factory
        if factory is None or factory is PydanticUndefined:
            default_value = "None"
        else:
            # Best effort only: a failing factory must not break the
            # documentation build, so any error falls back to "None".
            try:
                if callable(factory):
                    try:
                        default_value = factory()
                    except TypeError:
                        # If it needs arguments, we can't call it
                        default_value = f"<{factory.__name__}>"
                else:
                    default_value = factory
                # If it's a complex object, just show the type name
                if not isinstance(default_value, (str, int, float, bool, type(None))):
                    default_value = type(default_value).__name__
            except Exception:
                default_value = "None"

    default_value_str = str(default_value)
    if default_value_str == "":
        default_value_str = '""'  # Show empty string explicitly

    if len(default_value_str) > c1 - 15:
        # Too long to share a row with the "**Default**:" label: leave the
        # label row empty and continue on the rows below it.
        default_cells = [""] + [
            part for part in wrap_description(default_value_str, c1) if part
        ]
    else:
        default_cells = [default_value_str]
    default_cells[0] = f"**Default**: {default_cells[0]}"
    # The default always occupies at least three rows of the left column.
    default_cells += [""] * (3 - len(default_cells))

    left_cells = [
        f"**Required**: {required}",
        "",
        f"**Units**: {units}",
        "",
        *type_cells,
        "",
        *default_cells,
    ]

    for left_cell, d_cell, e_cell in zip_longest(
        left_cells, d_lines, e_lines, fillvalue=""
    ):
        lines.append(line.format(left_cell, c1, d_cell, c2, e_cell, c3))

    lines.append(hline)
    lines.append("")

    return lines

545 

546 

# Convert a nested dictionary (ini_dict) into a flattened dictionary.
# The default separator is '.'.

def flatten_dict(meta_dict, parent_key=None, sep="."):
    """
    Flatten a nested mapping into a single-level dictionary.

    :param meta_dict: mapping to flatten; values that are ``MutableMapping``
        instances are flattened recursively
    :type meta_dict: dict
    :param parent_key: prefix prepended (with ``sep``) to every key when it
        is truthy, defaults to None
    :type parent_key: str, optional
    :param sep: separator placed between nested key parts, defaults to '.'
    :type sep: str, optional
    :return: flat dictionary keyed by the joined key path
    :rtype: dict

    """
    flat = {}
    for name, entry in meta_dict.items():
        full_name = f"{parent_key}{sep}{name}" if parent_key else name
        if isinstance(entry, MutableMapping):
            flat.update(flatten_dict(entry, full_name, sep=sep))
        else:
            flat[full_name] = entry
    return flat

573 

574 

def flatten_list(x_list):
    """
    Flatten a nested list by one level.

    Parameters
    ----------
    x_list : iterable of iterables
        Outer container whose members are themselves iterable.

    Returns
    -------
    list
        All items of every member, in order.

    """
    flat_list = []
    for sublist in x_list:
        flat_list.extend(sublist)
    return flat_list

589 

590 

def recursive_split_dict(key, value, remainder, sep="."):
    """
    Store ``value`` in the nested dictionary ``remainder`` under a split key.

    ``key`` is split on ``sep`` one level at a time; intermediate
    dictionaries are created on demand and the last key part receives
    ``value``.  ``remainder`` is modified in place.

    :param key: key path, e.g. ``"a.b.c"``
    :type key: str
    :param value: value stored under the last key part
    :type value: any
    :param remainder: dictionary that receives the nested entry
    :type remainder: dict
    :param sep: separator between key parts, defaults to '.'
    :type sep: str, optional
    :return: None
    :rtype: NoneType

    """

    key, *other = key.split(sep, 1)
    if other:
        # Forward ``sep`` so a custom separator applies at every depth.
        recursive_split_dict(other[0], value, remainder.setdefault(key, {}), sep=sep)
    else:
        remainder[key] = value

611 

612 

def get_by_alias(model, alias_name):
    """
    Return the value of the first field whose alias equals ``alias_name``.

    Fields are looked up through ``model.__pydantic_fields__`` (used instead
    of ``model_fields``, which is deprecated).  ``None`` is returned when no
    field carries that alias.
    """
    matching_names = (
        field_name
        for field_name, field_info in model.__pydantic_fields__.items()
        if field_info.alias == alias_name
    )
    for field_name in matching_names:
        return getattr(model, field_name)
    return None

620 

621 

622# def get_alias_key(model, key: str) -> str: 

623# """ 

624# Try to find an alias for a field name in a Pydantic BaseModel 

625 

626# Parameters 

627# ---------- 

628# model : BaseModel 

629# The Pydantic model to search for the field 

630# key : str 

631# The field name to find the alias for 

632 

633# Returns 

634# ------- 

635# str or None 

636# The alias name if found, None otherwise 

637# """ 

638# try: 

639# field_info = model.__pydantic_fields__.get(key) 

640# if field_info.validation_alias: 

641 

642# if field_info and field_info.alias: 

643# return field_info.alias 

644# return key # Return the original key if no alias found 

645# except (AttributeError, KeyError): 

646# return key # Return the original key if any errors occur 

647 

648 

def recursive_split_getattr(base_object, name, sep="."):
    """
    Follow a separated attribute path and return the final value.

    :param base_object: object on which the first path part is looked up
    :type base_object: object
    :param name: attribute path, e.g. ``"a.b.c"``
    :type name: str
    :param sep: separator between path parts, defaults to "."
    :type sep: str, optional
    :return: ``(value, prop)`` where ``value`` is the attribute at the end of
        the path (``None`` if the last attribute does not exist) and ``prop``
        is True when that attribute is a ``property`` on the owner's class
    :rtype: tuple
    :raises AttributeError: if an intermediate attribute does not exist

    """
    key, *other = name.split(sep, 1)

    if other:
        base_object = getattr(base_object, key)
        # Forward ``sep`` so a custom separator applies at every depth.
        return recursive_split_getattr(base_object, other[0], sep=sep)

    # A missing final attribute yields None instead of raising.
    # TODO create a get from alias method for attributes that are aliases.
    try:
        value = getattr(base_object, key)
    except AttributeError:
        value = None

    prop = isinstance(getattr(type(base_object), key, None), property)
    return value, prop

670 

671 

def recursive_split_setattr(base_object, name, value, sep=".", skip_validation=False):
    """
    Recursively split a name and set the value of the last key. Recursion splits on the separator present in the name.

    When ``value`` is a non-empty list whose first element is a dict, every
    dict is converted to an object first: the dict's first key selects a
    class from ``base_object._objects_included``, the class is instantiated
    without arguments and filled through its ``from_dict`` method.

    :param base_object: The object having its attribute set, or a "parent" object in the recursive/nested scenario
    :type base_object: object
    :param name: The name of the attribute to set
    :type name: str
    :param value: The value to set the attribute to
    :type value: any
    :param sep: The separator to split the name on, defaults to "."
    :type sep: str, optional
    :param skip_validation: Whether to skip validation/parse of the attribute, defaults to False.
        It is forwarded through the recursion; this function itself does not act on it.
    :type skip_validation: Optional[bool]

    :return: None
    :rtype: NoneType
    :raises KeyError: if a dict's first key is not in ``base_object._objects_included``

    """
    key, *other = name.split(sep, 1)

    if other:
        # Forward ``sep`` and ``skip_validation`` so they apply at every depth.
        recursive_split_setattr(
            getattr(base_object, key),
            other[0],
            value,
            sep=sep,
            skip_validation=skip_validation,
        )
        return

    # if the value is a list or dict then we need to add accordingly
    if isinstance(value, list):
        if len(value) == 0:
            value = []
        elif isinstance(value[0], dict):
            new_list = []
            for obj_dict in value:
                obj_key = list(obj_dict.keys())[0]
                # Only the class lookup is guarded, so a KeyError raised while
                # building the object is not mislabeled as a missing class.
                try:
                    obj_class = base_object._objects_included[obj_key]
                except KeyError as error:
                    raise KeyError(
                        f"Could not find {obj_key} in {base_object._objects_included}"
                    ) from error
                obj = obj_class()
                obj.from_dict(obj_dict)
                new_list.append(obj)
            value = new_list

    setattr(base_object, key, value)

716 

717 

def structure_dict(meta_dict, sep="."):
    """
    Turn a flat dictionary with separated keys into a nested dictionary.

    :param meta_dict: flat dictionary whose keys are separated paths
    :type meta_dict: dict
    :param sep: separator between key parts, defaults to '.'
    :type sep: str, optional
    :return: nested dictionary built by ``recursive_split_dict``
    :rtype: dict

    """
    nested = {}
    for flat_key, entry in meta_dict.items():
        recursive_split_dict(flat_key, entry, nested, sep=sep)
    return nested

733 

734 

def get_units(name, attr_dict):
    """
    Return the units stored under ``attr_dict["json_schema_extra"]["units"]``.

    Parameters
    ----------
    name : str
        Not used by this function.
        NOTE(review): ``get_type`` indexes ``attr_dict`` by ``name`` first;
        confirm whether the same lookup was intended here.
    attr_dict : dict
        Mapping expected to hold a ``"json_schema_extra"`` mapping.

    Returns
    -------
    str or None
        The units as a string, or None when they are missing, when
        ``"json_schema_extra"`` is absent or not a mapping, or when the value
        is None / ``"None"`` / ``"none"``.
    """
    try:
        units = attr_dict["json_schema_extra"]["units"]
    except (KeyError, TypeError):
        # TypeError covers a "json_schema_extra" entry that is None.
        return None

    if not isinstance(units, str):
        units = "{0}".format(units)
    if units in [None, "None", "none"]:
        return None
    return units

746 

747 

def get_type(name, attr_dict):
    """
    Return the ``"type"`` entry recorded for ``name`` in ``attr_dict``.

    String types (``"string"``, ``"str"``, ``"String"`` or the ``str`` class)
    and missing entries are both reported as None.
    """
    try:
        declared = attr_dict[name]["type"]
    except KeyError:
        return None
    if declared in ["string", str, "str", "String"]:
        return None
    return declared

757 

758 

def recursive_split_xml(element, item, base, name, attr_dict=None):
    """
    Fill an XML element from a nested Python value.

    Parameters
    ----------
    element : xml.etree.ElementTree.Element
        Element that receives the text or sub-elements.
    item : dict, list, tuple, str, int, float, numpy scalar, or None
        Value to write.  A dict adds one sub-element per key, a list or tuple
        adds one ``<item>`` sub-element per entry, a string or number becomes
        the element text and None leaves the element empty.
    base : str
        Dotted attribute path of ``element``; extended with each dict key.
    name : str
        Name returned unchanged alongside the element.
    attr_dict : dict, optional
        When truthy, passed to ``get_units`` to look up a ``units`` XML
        attribute for the element.

    Returns
    -------
    tuple
        ``(element, name)``.

    Raises
    ------
    ValueError
        If ``item`` is of an unsupported type.
    """
    if isinstance(item, dict):
        for key, value in item.items():
            attr_name = ".".join([base, key])

            sub_element = et.SubElement(element, key)
            recursive_split_xml(sub_element, value, attr_name, key, attr_dict)
    elif isinstance(item, (tuple, list)):
        for ii in item:
            sub_element = et.SubElement(element, "item")
            recursive_split_xml(sub_element, ii, base, name, attr_dict)
    elif isinstance(item, str):
        element.text = item
    elif item is None:
        # Leave element.text as None so XML has empty element (no text)
        pass
    elif isinstance(item, (float, int, np.integer, np.floating)):
        # NumPy scalars are written like Python numbers, matching what
        # NumpyEncoder accepts for JSON output.
        element.text = str(item)
    else:
        # if the value is an hdf5 reference make it a string
        if "reference" in str(type(item)).lower():
            element.text = str(item)
        else:
            raise ValueError("Value cannot be {0}".format(type(item)))
    if attr_dict:
        units = get_units(base, attr_dict)
        if units:
            element.set("units", str(units))
    return element, name

793 

794 

def dict_to_xml(meta_dict, attr_dict=None):
    """
    Build an XML tree from a dictionary structured {class:{attribute_dict}}.

    :param meta_dict: dictionary whose first key names the root element and
        whose value maps child tags to their contents
    :type meta_dict: dict
    :param attr_dict: forwarded unchanged to ``recursive_split_xml``,
        defaults to None
    :type attr_dict: dict, optional
    :return: root element named after the first key of ``meta_dict``
    :rtype: xml.etree.ElementTree.Element

    """
    root_tag = list(meta_dict)[0]
    root = et.Element(root_tag)

    for child_tag, child_value in meta_dict[root_tag].items():
        child = et.SubElement(root, child_tag)
        recursive_split_xml(child, child_value, child_tag, child_tag, attr_dict)
    return root

812 

813 

def element_to_dict(element):
    """
    Convert an XML element, and its children recursively, to a dictionary.

    The result has a single key, ``element.tag``.  Child elements are
    converted recursively and grouped by tag: a tag that occurs once maps to
    its value, a repeated tag maps to a list of values.  If one of the child
    tags is ``"item"``, the element's value is replaced by that entry alone.

    Note that ``units`` and ``type`` attributes may be removed from
    ``element.attrib`` as a side effect (see the inline comments).

    .. todo:: Add way to read in attributes like units and validate them.

    :param element: element to convert
    :type element: xml.etree.ElementTree.Element
    :return: single-entry mapping ``{element.tag: value}``
    :rtype: OrderedDict

    """
    # Start with an empty dict when there are attributes to collect.
    meta_dict = {element.tag: {} if element.attrib else None}
    children = list(element)
    if children:
        # Group the converted children by tag.
        child_dict = defaultdict(list)
        for dc in map(element_to_dict, children):
            for k, v in dc.items():
                child_dict[k].append(v)
        meta_dict = {
            element.tag: {k: v[0] if len(v) == 1 else v for k, v in child_dict.items()}
        }
        # <item> children stand for list entries: keep only their value(s).
        if "item" in meta_dict[element.tag].keys():
            meta_dict[element.tag] = meta_dict[element.tag]["item"]
    # going to skip attributes for now, later can check them against
    # standards, need to skip units and type
    if element.attrib:
        pop_units = False
        pop_type = False
        # NOTE(review): the nesting of the two ``continue`` statements was
        # reconstructed from a listing without indentation -- confirm.
        for k, v in element.attrib.items():
            if k in ["units"]:
                if "type" in element.attrib.keys():
                    pop_type = True
                # units alone, or units plus one other attribute: drop it
                if len(element.attrib.keys()) <= 2:
                    pop_units = True
                    continue
            if k in ["type"]:
                # a lone type attribute naming a basic type is dropped
                if len(element.attrib.keys()) <= 1:
                    if v in [
                        "float",
                        "string",
                        "integer",
                        "boolean",
                        "list",
                        "tuple",
                    ]:
                        pop_type = True
                        continue

            # every attribute not skipped above is copied into the result
            meta_dict[element.tag][k] = v
        # Removal happens after the loop so the mapping is not changed while
        # it is being iterated; it also affects the text handling below.
        if pop_units:
            element.attrib.pop("units")
        if pop_type:
            element.attrib.pop("type")
    if element.text:
        text = element.text.strip()
        if children or element.attrib:
            if text:
                # with attributes left, the text is stored next to them
                if len(element.attrib.keys()) > 0:
                    meta_dict[element.tag]["value"] = text
                else:
                    meta_dict[element.tag] = text
        else:
            # leaf element without remaining attributes: value is its text
            meta_dict[element.tag] = text
    return OrderedDict(sorted(meta_dict.items(), key=itemgetter(0)))

878 

879 

def element_to_string(element):
    """
    Return ``element`` as a pretty-printed, UTF-8 declared XML string.
    """
    raw_xml = et.tostring(element).decode()
    document = minidom.parseString(raw_xml)
    pretty_bytes = document.toprettyxml(indent=" ", encoding="UTF-8")
    return pretty_bytes.decode()

889 

890 

891# ============================================================================= 

892# Helper function to be sure everything is encoded properly 

893# ============================================================================= 

class NumpyEncoder(json.JSONEncoder):
    """
    JSON encoder that also understands NumPy scalars and arrays, ``Path``
    objects, objects whose type name contains ``"h5"`` and objects exposing
    a ``unicode_string`` method.
    """

    def default(self, obj):
        """
        Convert an object the standard encoder cannot handle.

        :param obj: object to convert
        :type obj: any
        :return: a JSON-serializable equivalent: ``int``/``float``/``bool``
            for NumPy scalars, a (nested) list for real arrays, a
            ``{"real": ..., "imag": ...}`` dict of lists for complex arrays,
            and a string for the remaining supported objects
        :raises TypeError: from the base encoder for unsupported objects
        """
        if isinstance(obj, np.integer):
            return int(obj)
        elif isinstance(obj, np.floating):
            return float(obj)
        elif isinstance(obj, np.bool_):
            return bool(obj)
        elif isinstance(obj, np.ndarray):
            # iscomplexobj covers every complex dtype, not only complex128
            if np.iscomplexobj(obj):
                return {"real": obj.real.tolist(), "imag": obj.imag.tolist()}
            return obj.tolist()
        # For now turn references into a generic string
        elif "h5" in str(type(obj)):
            return str(obj)
        elif hasattr(obj, "unicode_string"):
            return obj.unicode_string()
        elif isinstance(obj, Path):
            return str(obj)
        return json.JSONEncoder.default(self, obj)

938 

939 

def validate_name(name, pattern=None):
    """
    Normalize a name by replacing spaces with underscores.

    :param name: name to normalize
    :type name: str or None
    :param pattern: accepted but not used by this function, defaults to None
    :type pattern: any, optional
    :return: ``"unknown"`` when ``name`` is None, otherwise ``name`` with
        every space replaced by an underscore
    :rtype: str

    """
    if name is not None:
        return "_".join(name.split(" "))
    return "unknown"

955 

956 

def has_numbers(text):
    """
    Tell whether a string holds at least one digit character.

    Parameters
    ----------
    text : str
        The string to inspect.  Any non-string input yields False.

    Returns
    -------
    bool
        True if ``str.isdigit`` is true for any character, False otherwise.

    Examples
    --------
    >>> has_numbers("abc123")
    True
    >>> has_numbers("hello")
    False
    >>> has_numbers("")
    False
    """
    if isinstance(text, str):
        for char in text:
            if char.isdigit():
                return True
    return False

985 

986 

def is_numeric_string(text):
    """
    Tell whether a string can be converted to a number with ``float``.

    Parameters
    ----------
    text : str
        The string to test.  Non-string, empty and whitespace-only input
        yields False.

    Returns
    -------
    bool
        True if ``float(text)`` succeeds, False otherwise.

    Examples
    --------
    >>> is_numeric_string("123")
    True
    >>> is_numeric_string("-45.6")
    True
    >>> is_numeric_string("1.23e-4")
    True
    >>> is_numeric_string("12abc")
    False
    """
    if not isinstance(text, str) or not text.strip():
        return False

    try:
        float(text)
    except ValueError:
        return False
    return True

1028 

1029 

def extract_numbers(text):
    """
    Collect every numeric value found in a string.

    Parameters
    ----------
    text : str
        The string to scan.  Non-string input yields an empty list.

    Returns
    -------
    list
        The matched numbers as floats, in order of appearance.

    Examples
    --------
    >>> extract_numbers("abc123def45.6")
    [123.0, 45.6]
    >>> extract_numbers("no numbers here")
    []
    >>> extract_numbers("1.5 and -2.3e4")
    [1.5, -23000.0]
    """
    import re

    if not isinstance(text, str):
        return []

    # Integers, floats, and scientific notation, each with an optional sign
    number_pattern = re.compile(r"[-+]?(?:\d*\.?\d+(?:[eE][-+]?\d+)?)")

    numbers = []
    for found in number_pattern.finditer(text):
        try:
            parsed = float(found.group())
        except ValueError:
            continue
        numbers.append(parsed)
    return numbers

1071 

1072 

def requires(**requirements):
    """Decorate a function with optional dependencies.

    Parameters
    ----------
    **requirements : obj
        keywords of package name and the required object for
        a function.  A falsy object marks the package as missing.

    Returns
    -------
    decorated_function : function
        Decorator that returns the original function if all soft
        dependencies are met; otherwise it returns a replacement that
        logs why the function is not running and returns None.  The
        replacement keeps the original function's name and docstring.

    Examples
    --------
    ::

        try:
            import obspy
        except ImportError:
            obspy = None

        @requires(obspy=obspy)
        def obspy_function():
            ...
            # does something using obspy

    """
    import functools

    # Names of the packages whose required object is missing (falsy).
    missing = [key for key, item in requirements.items() if not item]

    def decorated_function(function):
        """Wrap function."""
        if not missing:
            return function

        @functools.wraps(function)
        def passer(*args, **kwargs):
            logger.warning(f"Missing dependencies: {missing}.")
            logger.warning(f"Not running `{function.__name__}`.")

        return passer

    return decorated_function

1121 

1122 

def object_to_array(value, dtype=float):
    """
    Convert a value to a numpy array.

    Parameters
    ----------
    value : any
        The value to convert.  Supported inputs are None, list, tuple,
        ``np.ndarray``, str, Python or NumPy integer/float scalars, and bytes.
    dtype : data-type, optional
        dtype of the returned array, by default float.

    Returns
    -------
    np.ndarray
        The converted numpy array.  None and the strings ``""``, ``"none"``
        and ``"None"`` give an empty array.

    Raises
    ------
    TypeError
        If ``value`` is of an unsupported type, is a string without any
        number in it, or cannot be parsed.

    """
    if value is None:
        return np.empty(0)
    if isinstance(value, (list, tuple)):
        return np.array(value, dtype=dtype)
    if isinstance(value, np.ndarray):
        return value.astype(dtype)
    if isinstance(value, str):
        if value in ["", "none", "None"]:
            return np.empty(0)

        if not is_numeric_string(value):
            if not has_numbers(value):
                msg = f"String input must be a single number or a list of numbers, not '{value}'"
                raise TypeError(msg)
            # Anything that is not one plain number is reduced to the numbers
            # found inside it.
            # NOTE(review): this includes strings such as "1+2j", which come
            # back as their separate real-number tokens -- confirm whether
            # complex-valued strings should be parsed instead.
            return np.array(extract_numbers(value), dtype=dtype)

        # Only a single float-parsable number reaches this point, so the
        # string can contain neither "," nor "j".
        try:
            return np.fromstring(value, sep=" ", dtype=dtype)
        except ValueError as error:
            msg = (
                f"input values must be a list, tuple, or np.ndarray, not {type(value)}"
            )
            raise TypeError(msg) from error
    if isinstance(value, (int, float, np.integer, np.floating)):
        # Handle single numeric input, including NumPy scalar numbers
        return np.array([float(value)], dtype=dtype)
    if isinstance(value, bytes):
        # Handle bytes input (e.g., from binary files)
        try:
            return np.frombuffer(value, dtype=dtype)
        except ValueError as error:
            msg = (
                f"input values must be a list, tuple, or np.ndarray, not {type(value)}"
            )
            raise TypeError(msg) from error

    msg = f"input values must be an list, tuple, or np.ndarray, not {type(value)}"
    raise TypeError(msg)

1187 

1188 

1189def _should_include_coordinate_field(field_name: str) -> bool: 

1190 """ 

1191 Helper function to determine if a coordinate field should be included 

1192 in to_dict output even when it has None/default values. 

1193 

1194 This ensures backward compatibility for coordinate fields that tests expect. 

1195 """ 

1196 coordinate_fields = { 

1197 "negative.x", 

1198 "negative.y", 

1199 "negative.z", 

1200 "positive.x2", 

1201 "positive.y2", 

1202 "positive.z2", 

1203 "location.x", 

1204 "location.y", 

1205 "location.z", 

1206 } 

1207 return field_name in coordinate_fields 

1208 

1209 

1210def _should_convert_none_to_empty_string(field_name: str) -> bool: 

1211 """ 

1212 Helper function to determine if a field should convert None to empty string 

1213 for backward compatibility. 

1214 """ 

1215 string_fields = { 

1216 "data_logger.firmware.author", 

1217 "provenance.software.author", 

1218 "provenance.software.version", 

1219 } 

1220 # Convert external URL None -> "" for backward compatibility in to_dict 

1221 string_fields.add("url") 

1222 return field_name in string_fields