Coverage for C:\Users\peaco\OneDrive\Documents\GitHub\mt_metadata\mt_metadata\transfer_functions\io\edi\metadata\information.py: 91%

274 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-10 00:11 -0800

1# -*- coding: utf-8 -*- 

2""" 

3Created on Sat Dec 4 14:13:37 2021 

4 

5@author: jpeacock 

6""" 

7# ============================================================================= 

8# Imports 

9# ============================================================================= 

10from collections import OrderedDict 

11 

12from pydantic import Field, PrivateAttr 

13 

14from mt_metadata.base import MetadataBase 

15 

16 

17# ============================================================================== 

18# Info object 

19# ============================================================================== 

class Information(MetadataBase):
    """
    Container for the ``>INFO`` section of an .edi file.

    The section is parsed line by line into :attr:`info_dict`.  Keys that are
    recognized are renamed through one of the translation tables declared
    below; which table is used depends on the format detected while reading
    (standard, Phoenix or Empower).  Phoenix files may place two key/value
    pairs side by side on one line; those are split so the entries end up
    sequential.

    """

    # Parsed content of the info section: key -> string, list, or None.
    info_dict: dict[str, str | list | None] = Field(
        default_factory=dict,
        description="Dictionary of information lines from the info section",
    )

    # Phoenix column width; not referenced by the methods in this class body.
    _phoenix_col_width: int = PrivateAttr(default=38)
    # Format flags, reset and set again on every call to ``read_info``.
    _phoenix_file: bool = PrivateAttr(default=False)
    _empower_file: bool = PrivateAttr(default=False)

    # Phoenix key (lower case) -> standardized key, or a list of standardized
    # keys that all receive the same value.
    _phoenix_translation_dict: dict[str, str | list] = PrivateAttr(
        default_factory=lambda: {
            "survey": "survey.id",
            "company": "station.acquired_by.organization",
            "job": "survey.project",
            "hardware": "run.data_logger.model",
            "mtuprog version": "run.data_logger.firmware.version",
            "xpr weighting": "processing_parameter",
            "hx sen": "run.hx.sensor.id",
            "hy sen": "run.hy.sensor.id",
            "hz sen": "run.hz.sensor.id",
            "rx sen": "run.rrhx.sensor.id",
            "ry sen": "run.rrhy.sensor.id",
            "stn number": "station.id",
            "mtu-box serial number": "run.data_logger.id",
            "ex pot resist": "run.ex.contact_resistance.start",
            "ey pot resist": "run.ey.contact_resistance.start",
            "ex voltage": ["run.ex.ac.start", "run.ex.dc.start"],
            "ey voltage": ["run.ey.ac.start", "run.ey.dc.start"],
            "start-up": "station.time_period.start",
            "end-time": "station.time_period.end",
        }
    )

    # Standard-format key (lower case) -> standardized key.  The value
    # "processing_parameter" marks entries that are collected as "key=value"
    # strings under "transfer_function.processing_parameters".
    _translation_dict: dict[str, str] = PrivateAttr(
        default_factory=lambda: {
            "operator": "run.acquired_by.author",
            "adu_serial": "run.data_logger.id",
            "e_azimuth": "run.ex.measurement_azimuth",
            "ex_len": "run.ex.dipole_length",
            "ey_len": "run.ey.dipole_length",
            "ex_resistance": "run.ex.contact_resistance.start",
            "ey_resistance": "run.ey.contact_resistance.start",
            "h_azimuth": "run.hx.measurement_azimuth",
            "hx": "run.hx.sensor.id",
            "hy": "run.hy.sensor.id",
            "hz": "run.hz.sensor.id",
            "hx_resistance": "run.hx.h_field_max.start",
            "hy_resistance": "run.hy.h_field_max.start",
            "hz_resistance": "run.hz.h_field_max.start",
            "algorithmname": "transfer_function.software.name",
            "ndec": "processing_parameter",
            "nfft": "processing_parameter",
            "ntype": "processing_parameter",
            "rrtype": "processing_parameter",
            "removelargelines": "processing_parameter",
            "rotmaxe": "processing_parameter",
            "project": "survey.project",
            "processedby": "transfer_function.processed_by.name",
            "processingsoftware": "transfer_function.software.name",
            "processingtag": "transfer_function.id",
            "signconvention": "transfer_function.sign_convention",
            "sitename": "station.geographic_name",
            "survey": "survey.id",
            "year": "survey.time_period.start_date",
            "runlist": "transfer_function.runs_processed",
            "remotesite": "transfer_function.remote_references",
            "remoteref": "transfer_function.processing_parameters",
        }
    )

    # Empower key (lower case) -> standardized key or attribute fragment.
    # Fragments are prefixed with "run.<component>." or
    # "transfer_function.remote_references." by ``_get_empower_std_key``.
    _empower_translation_dict: dict[str, str] = PrivateAttr(
        default_factory=lambda: {
            "processingsoftware": "transfer_function.software.name",
            "sitename": "station.geographic_name",
            "year": "survey.time_period.start_date",
            "process_date": "transfer_function.processed_date",
            "declination": "station.location.declination.value",
            "tag": "component",
            "length": "dipole_length",
            "ac": "ac.end",
            "dc": "dc.end",
            "negative res": "contact_resistance.start",
            "negative_res": "contact_resistance.start",
            "positive res": "contact_resistance.end",
            "positive_res": "contact_resistance.end",
            "sensor type": "sensor.model",
            "sensor_type": "sensor.model",
            "detected sensor type": "sensor.model",
            "azimuth": "measured_azimuth",
            "sensor serial": "sensor.id",
            "sensor_serial": "sensor.id",
            "cal name": "comments",
            "cal_name": "comments",
            "saturation": "comments",
            "instrument type": "data_logger.model",
            "station name": "geographic_name",
            "operator": "acquired_by.author",
            "recording id": "id",
            "min value": "comments",
            "max value": "comments",
        }
    )

129 

130 def __str__(self): 

131 return "".join(self.write_info()) 

132 

133 def __repr__(self): 

134 return self.__str__() 

135 

136 def read_info(self, edi_lines: list[str]) -> None: 

137 """ 

138 Read information section and parse directly to info_dict. 

139 

140 Parameters 

141 ---------- 

142 edi_lines : list[str] 

143 List of lines from the EDI file. 

144 """ 

145 self.info_dict = OrderedDict() 

146 self._phoenix_file = False 

147 self._empower_file = False 

148 

149 # 1. Identify the info section and detect format in a single pass 

150 info_section = [] 

151 info_started = False 

152 

153 for line in edi_lines: 

154 line = line.strip() 

155 

156 # Check for start/end markers 

157 if ">info" in line.lower(): 

158 info_started = True 

159 continue 

160 elif info_started and line and line[0] == ">": 

161 break 

162 

163 # Collect info lines for processing 

164 if info_started and line: 

165 # Detect format while collecting 

166 if "run information" in line.lower(): 

167 self._phoenix_file = True 

168 elif ( 

169 ("empower" in line.lower() and "v" in line.lower()) 

170 or "electrics" in line.lower() 

171 or "magnetics" in line.lower() 

172 ): 

173 self._empower_file = True 

174 

175 info_section.append(line) 

176 

177 # 2. Parse lines based on detected format 

178 if self._empower_file: 

179 self._parse_empower_info(info_section) 

180 self._comments_to_string() 

181 elif self._phoenix_file: 

182 self._parse_phoenix_info(info_section) 

183 self._comments_to_string() 

184 else: 

185 self._parse_standard_info(info_section) 

186 self._comments_to_string() 

187 

188 def _comments_to_string(self) -> None: 

189 """Convert list comments to a single string.""" 

190 for key, value in self.info_dict.items(): 

191 if "comment" in key and isinstance(value, list): 

192 self.info_dict[key] = ",".join(value) 

193 

194 def _get_separator(self, line: str) -> str | None: 

195 """Find the key-value separator in a line.""" 

196 sep = None 

197 if line.count(":") > 0 and line.count("=") > 0: 

198 if line.find(":") < line.find("="): 

199 sep = ":" 

200 else: 

201 sep = "=" 

202 

203 elif line.count(":") >= 1: 

204 sep = ":" 

205 # colon_find = line.find(":") 

206 elif line.count("=") >= 1: 

207 sep = "=" 

208 

209 return sep 

210 

211 def _parse_standard_info(self, info_lines: list[str]) -> None: 

212 """Parse standard format EDI info lines directly to info_dict.""" 

213 for line in info_lines: 

214 # Skip empty lines and section headers 

215 if not line or "<" in line or ">" in line: 

216 continue 

217 

218 # Get separator and parse key/value 

219 sep = self._get_separator(line) 

220 if not sep: 

221 self.info_dict[line.strip()] = "" 

222 continue 

223 

224 parts = line.split(sep, 1) 

225 if len(parts) != 2: 

226 continue 

227 

228 key = parts[0].strip().lower() 

229 value = parts[1].strip() 

230 

231 # Handle list values 

232 if value.startswith("[") and value.endswith("]"): 

233 value = [ 

234 v.strip() 

235 for v in value[1:-1] 

236 .replace(",", " ") 

237 .replace(";", " ") 

238 .replace(":", " ") 

239 .split() 

240 ] 

241 

242 # Apply translation dictionary 

243 std_key = self._translation_dict.get(key) 

244 

245 if std_key is not None: 

246 # Handle special processing parameters 

247 if std_key == "processing_parameter": 

248 tf_parameters = self.info_dict.get( 

249 "transfer_function.processing_parameters", [] 

250 ) 

251 if not isinstance(tf_parameters, list): 

252 tf_parameters = [tf_parameters] 

253 tf_parameters.append(f"{key}={value}") 

254 self.info_dict["transfer_function.processing_parameters"] = ( 

255 tf_parameters 

256 ) 

257 else: 

258 self.info_dict[std_key] = value 

259 else: 

260 # Store unrecognized keys with original name 

261 self.info_dict[key] = value 

262 

263 def _parse_phoenix_info(self, info_lines: list[str]) -> None: 

264 """Parse Phoenix format EDI info lines efficiently.""" 

265 for line in info_lines: 

266 # Process each line for potential multi-column content 

267 is_multi_column, columns = self._split_phoenix_columns(line) 

268 

269 for column in columns: 

270 sep = self._get_separator(column) 

271 if not sep: 

272 continue 

273 

274 parts = column.split(sep, 1) 

275 if len(parts) != 2: 

276 continue 

277 

278 key = parts[0].strip().lower() 

279 value = parts[1].strip() 

280 if value.count(" ") > 0: 

281 value = value.split(" ")[0].strip() # Apply Phoenix translation 

282 self._apply_phoenix_translation(key, value) 

283 

284 def _parse_empower_info(self, info_lines: list[str]) -> None: 

285 """ 

286 Parse Empower format EDI info lines efficiently. 

287 

288 Empower format has a hierarchical structure with sections for 

289 general info, electrics, magnetics, and reference stations. 

290 """ 

291 section = "general" 

292 component = None 

293 sub_section = None 

294 

295 # Process all lines and handle hierarchical structure 

296 for line in info_lines: 

297 original_line = line 

298 line = line.strip() 

299 

300 # Skip empty lines 

301 if not line: 

302 continue 

303 

304 # Get indentation level to understand hierarchy 

305 indent_level = len(original_line) - len(original_line.lstrip()) 

306 

307 # Check for main section headers (typically at low indentation) 

308 line_lower = line.lower() 

309 if indent_level <= 5: # Main sections are usually at low indentation 

310 if line_lower == "stations": 

311 section = "stations" 

312 continue 

313 elif line_lower == "electrics": 

314 section = "electrics" 

315 sub_section = "electrics" 

316 continue 

317 elif line_lower == "magnetics": 

318 section = "magnetics" 

319 sub_section = "magnetics" 

320 continue 

321 elif line_lower == "reference": 

322 section = "reference" 

323 sub_section = "reference" 

324 continue 

325 

326 # Component-level headers (e.g., "EX", "EY", "HX", "HY", etc.) 

327 if section in ["electrics", "magnetics", "reference"] or sub_section in [ 

328 "electrics", 

329 "magnetics", 

330 "reference", 

331 ]: 

332 # Check if this is a component header (no separator and matches component pattern) 

333 if self._get_separator(line) is None and line_lower in [ 

334 "ex", 

335 "ey", 

336 "hx", 

337 "hy", 

338 "hz", 

339 "rx", 

340 "ry", 

341 "e1", 

342 "e2", 

343 "h1", 

344 "h2", 

345 "h3", 

346 ]: # Components are typically more indented 

347 component = line_lower 

348 continue 

349 

350 # Regular key-value pairs 

351 sep = self._get_separator(line) 

352 if not sep: 

353 # Handle special cases for lines without separators 

354 if line_lower in ["editing workbench", "stations"]: 

355 section = line_lower.replace(" ", "_") 

356 continue 

357 

358 parts = line.split(sep, 1) 

359 if len(parts) != 2: 

360 continue 

361 

362 key = parts[0].strip().lower() 

363 value = parts[1].strip() 

364 

365 # Clean up value (remove units in brackets and degree symbol) 

366 if value.find("[") > 2: # need to avoid values that are lists 

367 value = value.replace("[", "").replace("]", "").split(",") 

368 if len(value) == 1: 

369 value = value[0].strip() 

370 value = value.split(" ")[0] # remove units 

371 else: 

372 value = ",".join(v.strip() for v in value) 

373 

374 value = value.replace("°", "").replace("Â", "").strip() 

375 

376 # Build the key based on section/component context 

377 std_key = self._get_empower_std_key(section, component, key, sub_section) 

378 

379 # special case handling 

380 if std_key: 

381 if "remote_references." in std_key: 

382 # skip these for now 

383 if ( 

384 "acquired_by" in std_key 

385 or "data_logger" in std_key 

386 or "author" in std_key 

387 ): 

388 continue 

389 if "azimuth" in std_key: 

390 # Only skip azimuth if it's in a problematic context, not for measured_azimuth 

391 if "measured_azimuth" not in std_key: 

392 continue 

393 if "component" in std_key: 

394 value = component 

395 if "hx" in std_key or "hy" in std_key or "hz" in std_key: 

396 if "acquired_by" in std_key or "data_logger" in std_key: 

397 # Handle author information for Hx/Hy/Hz 

398 std_key = ( 

399 std_key.replace(".hx.", ".") 

400 .replace(".hy.", ".") 

401 .replace(".hz.", ".") 

402 ) 

403 elif "ac" in std_key or "dc" in std_key: 

404 # Handle AC/DC values for Hx/Hy/Hz 

405 std_key = std_key.replace("ac", "comments").replace( 

406 "dc", "comments" 

407 ) 

408 

409 if "comments" in std_key: 

410 original_value = self.info_dict.get(std_key, []) 

411 if not isinstance(original_value, list): 

412 original_value = [] if not original_value else [original_value] 

413 original_value.append(f"{key}={value}") 

414 value = original_value 

415 elif "data_logger.model" in std_key: 

416 std_key = "run.data_logger.model" 

417 elif std_key.endswith(".id") and "sensor.id" not in std_key: 

418 # Only map recording IDs, not sensor IDs 

419 std_key = "run.id" 

420 elif "geographic_name" in std_key: 

421 if "remote_references" in std_key: 

422 std_key = "transfer_function.remote_references.geographic_name" 

423 else: 

424 std_key = "station.geographic_name" 

425 elif "author" in std_key: 

426 std_key = "run.acquired_by.author" 

427 self.info_dict[std_key] = value 

428 

429 else: 

430 # For unrecognized keys, store with section prefix 

431 if component: 

432 context_key = f"{section}.{component}.{key}" 

433 elif sub_section and sub_section != section: 

434 context_key = f"{sub_section}.{key}" 

435 elif section != "general": 

436 context_key = f"{section}.{key}" 

437 else: 

438 context_key = key 

439 self.info_dict[context_key] = value 

440 

441 def _get_empower_std_key( 

442 self, 

443 section: str, 

444 component: str | None, 

445 key: str, 

446 sub_section: str | None = None, 

447 ) -> str | None: 

448 """ 

449 Get standardized key for Empower format based on section and component context. 

450 

451 Parameters 

452 ---------- 

453 section : str 

454 Current section ("general", "electrics", "magnetics", "reference", etc.) 

455 component : str 

456 Current component (e.g., "ex", "ey", "hx", "hy", "hz", "rx", "ry", None) 

457 key : str 

458 Original key name 

459 sub_section : str, optional 

460 Sub-section for additional context 

461 

462 Returns 

463 ------- 

464 str or None 

465 Standardized key name or None if no mapping found 

466 """ 

467 # Handle general section keys 

468 if section == "general": 

469 mapped_key = self._empower_translation_dict.get(key) 

470 if mapped_key: 

471 return mapped_key 

472 return None 

473 

474 # Handle component-specific keys 

475 if not component: 

476 # Handle section-level keys without component context 

477 mapped_key = self._empower_translation_dict.get(key) 

478 if mapped_key: 

479 if section == "reference": 

480 return f"transfer_function.remote_references.{mapped_key}" 

481 elif sub_section: 

482 return f"run.{mapped_key}" 

483 else: 

484 return mapped_key 

485 return None 

486 

487 # Map component names to standard names 

488 component_map = { 

489 "ex": "ex", 

490 "ey": "ey", 

491 "hx": "hx", 

492 "hy": "hy", 

493 "hz": "hz", 

494 "rx": "rrhx", # Remote reference components 

495 "ry": "rrhy", 

496 "e1": "ex", # Alternative naming 

497 "e2": "ey", 

498 "h1": "hx", 

499 "h2": "hy", 

500 "h3": "hz", 

501 } 

502 

503 std_component = component_map.get(component, component) 

504 

505 # Create run-prefixed attribute key 

506 attribute_key = self._empower_translation_dict.get(key) 

507 if attribute_key: 

508 if section == "reference": 

509 return f"transfer_function.remote_references.{std_component}.{attribute_key}" 

510 else: 

511 return f"run.{std_component}.{attribute_key}" 

512 

513 # Handle special cases for comments field 

514 if key in ["cal name", "cal_name", "saturation", "min value", "max value"]: 

515 # Append to comments field 

516 if section == "reference": 

517 return f"transfer_function.remote_references.{std_component}.comments" 

518 else: 

519 return f"run.{std_component}.comments" 

520 

521 # Default case: use run.component.key format 

522 if section == "reference": 

523 return f"transfer_function.remote_references.{std_component}.{key}" 

524 else: 

525 return f"run.{std_component}.{key}" 

526 

527 def _split_phoenix_columns(self, line: str) -> tuple[bool, list[str]]: 

528 """ 

529 Split Phoenix line into columns based on whitespace gaps and separators. 

530 Returns (is_multi_column, list_of_columns) 

531 """ 

532 import re 

533 

534 # Check for basic indicators first 

535 if not line or len(line) < 10: 

536 return False, [line] 

537 

538 # Look for patterns that indicate multi-column format 

539 parts = [(m.group(), m.start()) for m in re.finditer(r"\S+", line)] 

540 

541 if len(parts) < 4: # Need at least 4 words for two key-value pairs 

542 return False, [line] 

543 

544 # Calculate word gaps 

545 gaps = [ 

546 parts[i + 1][1] - (parts[i][1] + len(parts[i][0])) 

547 for i in range(len(parts) - 1) 

548 ] 

549 

550 # Find the largest gap 

551 if not gaps: 

552 return False, [line] 

553 

554 max_gap = max(gaps) 

555 if max_gap <= 3: # Too small to be a column separator 

556 return False, [line] 

557 

558 max_gap_idx = gaps.index(max_gap) 

559 split_pos = parts[max_gap_idx + 1][1] 

560 

561 # Check if we have key-value pairs on both sides 

562 left_text = line[:split_pos].strip() 

563 right_text = line[split_pos:].strip() 

564 

565 # Verify both columns have separators 

566 left_has_sep = ":" in left_text or "=" in left_text 

567 right_has_sep = ":" in right_text or "=" in right_text 

568 

569 if left_has_sep and right_has_sep: 

570 return True, [left_text, right_text] 

571 

572 return False, [line] 

573 

574 def _apply_phoenix_translation(self, key: str, value: str) -> None: 

575 """Apply Phoenix-specific translations and handle special cases.""" 

576 

577 # Remove units for resistance values 

578 if "Pot Resist".lower() in key.lower() and isinstance(value, str): 

579 value = value.split()[0] 

580 

581 # Handle voltage with AC/DC 

582 if "voltage" in key.lower() and isinstance(value, str): 

583 comps = value.replace(" ", "").split(",") 

584 for comp in comps: 

585 if "=" in comp: 

586 typ, val = comp.split("=") 

587 typ = typ.lower() 

588 val = val.replace("mV", "") 

589 std_key = f"run.{key[0:2].lower()}.{typ}.start" 

590 self.info_dict[std_key] = val 

591 return 

592 

593 std_key = self._phoenix_translation_dict.get(key.lower(), "phoenix_attribute") 

594 if std_key: 

595 if isinstance(std_key, list): 

596 for kk in std_key: 

597 self.info_dict[kk] = value 

598 else: 

599 self.info_dict[std_key] = value 

600 # Add Phoenix sensor metadata for Hx/Hy/Hz 

601 if " sen" in key.lower(): 

602 comp = key.lower().split()[0] 

603 self.info_dict[f"{comp}.sensor.manufacturer"] = "Phoenix Geophysics" 

604 self.info_dict[f"{comp}.sensor.type"] = "Induction Coil" 

605 else: 

606 self.info_dict[key] = value 

607 

608 def write_info(self) -> list[str]: 

609 """ 

610 write out information 

611 """ 

612 

613 info_lines = [">INFO\n"] 

614 

615 for key, value in self.info_dict.items(): 

616 if key is None: 

617 continue 

618 if value in ["", None]: 

619 info_lines.append(f"{' '*4}{key}\n") 

620 continue 

621 if isinstance(value, list): 

622 value = f"[{', '.join(value)}]" 

623 elif isinstance(value, str): 

624 value = value.strip() 

625 info_lines.append(f"{' '*4}{key}={value}\n") 

626 

627 return info_lines