Coverage for C: \ Users \ peaco \ OneDrive \ Documents \ GitHub \ mt_metadata \ mt_metadata \ transfer_functions \ io \ zonge \ metadata \ header.py: 79%

212 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-10 00:11 -0800

1# ===================================================== 

2# Imports 

3# ===================================================== 

4from typing import Annotated, Any, Dict, Optional 

5 

6from pydantic import computed_field, Field, field_validator, PrivateAttr 

7 

8from mt_metadata.base import MetadataBase 

9from mt_metadata.utils.validators import validate_attribute 

10 

11from . import CH, GDP, GPS, Job, Line, MTEdit, MTFT24, Rx, STN, Survey, Tx, Unit 

12 

13 

14# ===================================================== 

15class Header(MetadataBase): 

16 name: Annotated[ 

17 str | None, 

18 Field( 

19 default=None, 

20 description="Station name", 

21 alias=None, 

22 json_schema_extra={ 

23 "units": None, 

24 "required": False, 

25 "examples": ["null"], 

26 }, 

27 ), 

28 ] 

29 

30 survey: Annotated[ 

31 Survey, 

32 Field( 

33 default_factory=Survey, 

34 description="Survey metadata", 

35 alias=None, 

36 json_schema_extra={ 

37 "units": None, 

38 "required": False, 

39 "examples": ["null"], 

40 }, 

41 ), 

42 ] 

43 

44 tx: Annotated[ 

45 Tx, 

46 Field( 

47 default_factory=Tx, 

48 description="Transmitter metadata", 

49 alias=None, 

50 json_schema_extra={ 

51 "units": None, 

52 "required": False, 

53 "examples": ["null"], 

54 }, 

55 ), 

56 ] 

57 

58 rx: Annotated[ 

59 Rx, 

60 Field( 

61 default_factory=Rx, 

62 description="Receiver metadata", 

63 alias=None, 

64 json_schema_extra={ 

65 "units": None, 

66 "required": False, 

67 "examples": ["null"], 

68 }, 

69 ), 

70 ] 

71 

72 m_t_edit: Annotated[ 

73 MTEdit, 

74 Field( 

75 default_factory=MTEdit, 

76 description="MTEdit metadata", 

77 alias=None, 

78 json_schema_extra={ 

79 "units": None, 

80 "required": False, 

81 "examples": ["null"], 

82 }, 

83 ), 

84 ] 

85 

86 m_t_f_t24: Annotated[ 

87 MTFT24, 

88 Field( 

89 default_factory=MTFT24, 

90 description="MTFT24 metadata", 

91 alias=None, 

92 json_schema_extra={ 

93 "units": None, 

94 "required": False, 

95 "examples": ["null"], 

96 }, 

97 ), 

98 ] 

99 

100 gps: Annotated[ 

101 GPS, 

102 Field( 

103 default_factory=GPS, 

104 description="GPS metadata", 

105 alias=None, 

106 json_schema_extra={ 

107 "units": None, 

108 "required": False, 

109 "examples": ["null"], 

110 }, 

111 ), 

112 ] 

113 

114 gdp: Annotated[ 

115 GDP, 

116 Field( 

117 default_factory=GDP, 

118 description="GDP metadata", 

119 alias=None, 

120 json_schema_extra={ 

121 "units": None, 

122 "required": False, 

123 "examples": ["null"], 

124 }, 

125 ), 

126 ] 

127 

128 ch: Annotated[ 

129 CH, 

130 Field( 

131 default_factory=CH, 

132 description="CH metadata", 

133 alias=None, 

134 json_schema_extra={ 

135 "units": None, 

136 "required": False, 

137 "examples": ["null"], 

138 }, 

139 ), 

140 ] 

141 

142 stn: Annotated[ 

143 STN, 

144 Field( 

145 default_factory=STN, 

146 description="STN metadata", 

147 alias=None, 

148 json_schema_extra={ 

149 "units": None, 

150 "required": False, 

151 "examples": ["null"], 

152 }, 

153 ), 

154 ] 

155 

156 line: Annotated[ 

157 Line, 

158 Field( 

159 default_factory=Line, 

160 description="Line metadata", 

161 alias=None, 

162 json_schema_extra={ 

163 "units": None, 

164 "required": False, 

165 "examples": ["null"], 

166 }, 

167 ), 

168 ] 

169 

170 unit: Annotated[ 

171 Unit, 

172 Field( 

173 default_factory=Unit, 

174 description="Unit metadata", 

175 alias=None, 

176 json_schema_extra={ 

177 "units": None, 

178 "required": False, 

179 "examples": ["null"], 

180 }, 

181 ), 

182 ] 

183 

184 job: Annotated[ 

185 Job, 

186 Field( 

187 default_factory=Job, 

188 description="Job metadata", 

189 alias=None, 

190 json_schema_extra={ 

191 "units": None, 

192 "required": False, 

193 "examples": ["null"], 

194 }, 

195 ), 

196 ] 

197 

198 elevation: Annotated[ 

199 float, 

200 Field( 

201 default=0.0, 

202 description="Elevation metadata", 

203 alias=None, 

204 json_schema_extra={ 

205 "units": None, 

206 "required": False, 

207 "examples": ["null"], 

208 }, 

209 ), 

210 ] 

211 

212 # Private fields for GPS coordinates (excluded from serialization but used internally) 

213 _gps_lat: float = PrivateAttr(default=0.0) 

214 _gps_lon: float = PrivateAttr(default=0.0) 

215 _elevation: float = PrivateAttr(default=0.0) 

216 

217 _comp_dict: Dict[str, Any] = PrivateAttr(default_factory=dict) 

218 

219 _header_keys = [ 

220 "survey.type", 

221 "survey.array", 

222 "tx.type", 

223 "m_t_edit.version", 

224 "m_t_edit.auto.phase_flip", 

225 "m_t_edit.phase_slope.smooth", 

226 "m_t_edit.phase_slope.to_z_mag", 

227 "m_t_edit.d_plus.use", 

228 "rx.gdp_stn", 

229 "rx.length", 

230 "rx.h_p_r", 

231 "g_p_s.lat", 

232 "g_p_s.lon", 

233 "unit.length", 

234 ] 

235 

def read_header(self, lines: list[str]) -> list[str]:
    """
    Read the header of an AVG file and fill attributes accordingly.

    Lines that start with ``$`` and contain ``=`` are parsed as
    ``$Key=Value`` header entries.  Entries seen before the first
    ``Rx.Cmp`` entry are applied to this object with ``update_attribute``;
    from the first ``Rx.Cmp`` entry onwards, entries are stored on the
    per-component ``Rx``/``CH`` objects kept in ``_comp_dict``.

    Parameters
    ----------
    lines : list[str]
        Lines of the AVG file.

    Returns
    -------
    list[str]
        Every non-header line longer than two characters plus every
        ``Rx.Cmp`` header line, in file order.

    """
    comp = None
    data_lines = []
    for line in lines:
        if not (line.startswith("$") and "=" in line):
            # Not a header entry: keep it as data unless it is (nearly) empty.
            if len(line) > 2:
                data_lines.append(line)
            continue

        # Split on the first "=" only, so a value may itself contain "=".
        key, value = line[1:].split("=", 1)
        key = ".".join(
            validate_attribute(k) for k in key.replace(":", ".").split(".")
        )

        value = value.strip()
        # Don't lowercase enum values that need to maintain case
        if not key.endswith((".proj", ".type", ".datum")):
            value = value.lower()

        # Only h_p_r is turned into a list of floats; other comma-separated
        # fields (xyz1, utm1, center, ...) stay as strings.
        if "," in value and key.endswith(".h_p_r"):
            value = [float(v.strip()) for v in value.split(",")]

        # Length entries keep only their first whitespace-separated token.
        # An empty value is left untouched instead of raising IndexError.
        if "length" in key and isinstance(value, str):
            tokens = value.split()
            if tokens:
                value = tokens[0]

        if "rx.cmp" in key:
            # A new component starts here; the line is also passed through
            # with the data lines.
            comp = str(value)  # Ensure comp is always a string
            data_lines.append(line)
            self._comp_dict[comp] = {"rx": Rx(), "ch": CH()}

        if comp is not None:
            comp_key, comp_attr = key.split(".")
            self._comp_dict[comp][comp_key].update_attribute(comp_attr, value)
        else:
            # Map converted snake_case names back to actual attribute names
            # (GPS -> g_p_s after conversion, but the attribute is 'gps').
            mapped_key = key.replace("g_p_s", "gps").replace("g_d_p", "gdp")
            self.update_attribute(mapped_key, value)

    return data_lines

298 

299 def _has_channel(self, component): 

300 try: 

301 if self._comp_dict["zxx"]["ch"].cmp is None: 

302 return False 

303 except KeyError: 

304 return False 

305 return True 

306 

307 # ===================================================== 

308 # Field validators for input validation 

309 # ===================================================== 

310 

@field_validator("elevation", mode="before")
@classmethod
def validate_elevation(cls, v):
    """Coerce raw elevation input to a float before field validation.

    ``None`` becomes ``0.0``; strings are stripped and parsed, raising
    ``ValueError`` if they are not numeric; anything else goes through
    ``float()``.
    """
    if v is None:
        return 0.0
    if not isinstance(v, str):
        return float(v)
    try:
        parsed = float(v.strip())
    except ValueError:
        raise ValueError(f"Cannot convert '{v}' to float")
    return parsed

323 

@classmethod
def validate_coordinates(cls, v):
    """Coerce raw coordinate input to a float.

    ``None`` becomes ``0.0``; strings are stripped and parsed, raising
    ``ValueError`` if they are not numeric; anything else goes through
    ``float()``.
    """
    if v is None:
        return 0.0
    if not isinstance(v, str):
        return float(v)
    try:
        parsed = float(v.strip())
    except ValueError:
        raise ValueError(f"Cannot convert '{v}' to float")
    return parsed

335 

336 # ===================================================== 

337 # Computed fields (read-only properties) 

338 # ===================================================== 

339 

@computed_field
@property
def latitude(self) -> float:
    """Latitude from the GPS object, or the private fallback if it has no ``lat``."""
    gps = self.gps
    if hasattr(gps, "lat"):
        return gps.lat
    return self._gps_lat

345 

@computed_field
@property
def longitude(self) -> float:
    """Longitude from the GPS object, or the private fallback if it has no ``lon``."""
    gps = self.gps
    if hasattr(gps, "lon"):
        return gps.lon
    return self._gps_lon

351 

@computed_field
@property
def easting(self) -> Optional[float]:
    """First element of ``center_location``, or ``None`` when it is unavailable."""
    location = self.center_location
    if location is None:
        return None
    return location[0]

358 

@computed_field
@property
def northing(self) -> Optional[float]:
    """Second element of ``center_location``, or ``None`` when it is unavailable."""
    location = self.center_location
    if location is None:
        return None
    return location[1]

365 

@computed_field
@property
def center_location(self) -> Optional[list[float]]:
    """Parse the ``zxx`` receiver ``center`` string into a list of floats.

    The string is split on ``":"`` and the first whitespace-separated token
    of each piece is converted to ``float``.  Returns ``None`` when the
    ``zxx`` channel is absent, ``center`` is ``None``, or parsing fails.
    """
    if not self._has_channel("zxx"):
        return None

    raw_center = self._comp_dict["zxx"]["rx"].center
    if raw_center is None:
        return None

    try:
        coordinates = []
        for piece in raw_center.split(":"):
            leading_token = piece.strip().split()[0]
            coordinates.append(float(leading_token))
    except (ValueError, AttributeError, IndexError):
        return None
    return coordinates

379 

@computed_field
@property
def datum(self) -> Optional[str]:
    """Upper-cased GPS datum, or ``None`` when it is missing or empty."""
    gps = self.gps
    if hasattr(gps, "datum") and gps.datum:
        return gps.datum.upper()
    return None

389 

@computed_field
@property
def utm_zone(self) -> Optional[str]:
    """GPS ``u_t_m_zone`` as a string, or ``None`` when it is missing or ``None``."""
    gps = self.gps
    if not hasattr(gps, "u_t_m_zone"):
        return None
    zone = gps.u_t_m_zone
    if zone is None:
        return None
    return str(zone)

396 

@computed_field
@property
def station(self) -> Optional[str]:
    """Receiver ``gdp_stn`` value, or ``None`` when the receiver has no such attribute."""
    receiver = self.rx
    if hasattr(receiver, "gdp_stn"):
        return receiver.gdp_stn
    return None

402 

@computed_field
@property
def instrument_id(self) -> Optional[str]:
    """First element of the ``zxx`` channel's ``gdp_box``.

    Returns ``None`` when the ``zxx`` channel is absent or ``gdp_box`` is
    missing, empty, or not indexable (for example ``None``).
    """
    if not self._has_channel("zxx"):
        return None
    try:
        return self._comp_dict["zxx"]["ch"].gdp_box[0]
    except (KeyError, IndexError, AttributeError, TypeError):
        # TypeError covers an unset (None) gdp_box, which is not subscriptable.
        return None

413 

@computed_field
@property
def instrument_type(self) -> Optional[str]:
    """Upper-cased GDP ``type``, or ``None`` when it is missing or empty."""
    gdp = self.gdp
    if hasattr(gdp, "type") and gdp.type:
        return gdp.type.upper()
    return None

423 

@computed_field
@property
def firmware(self) -> Optional[str]:
    """Portion of GDP ``prog_ver`` before the first ``":"``.

    Returns ``None`` when ``prog_ver`` is missing or empty, or when reading
    it raises ``AttributeError``/``IndexError``.
    """
    try:
        gdp = self.gdp
        if not (hasattr(gdp, "prog_ver") and gdp.prog_ver):
            return None
        leading_part = gdp.prog_ver.split(":")[0]
    except (IndexError, AttributeError):
        return None
    return leading_part

434 

@computed_field
@property
def start_time(self) -> Optional[str]:
    """Combine GDP ``date`` and ``time`` into a ``"<date>T<time>"`` string.

    Returns ``None`` when either attribute is missing or when ``time``
    equals the 1980-01-01 timestamp checked below.
    """
    # NOTE(review): presumably the "unset" placeholder value - confirm.
    unset_time = "1980-01-01T00:00:00+00:00"
    try:
        gdp = self.gdp
        if not (hasattr(gdp, "time") and hasattr(gdp, "date")):
            return None
        if gdp.time != unset_time:
            return f"{gdp.date}T{gdp.time}"
    except AttributeError:
        return None
    return None

446 

@property
def g_p_s(self):
    """Backward-compatible alias that returns the ``gps`` attribute."""
    gps_object = self.gps
    return gps_object

451 

@property
def g_d_p(self):
    """Backward-compatible alias that returns the ``gdp`` attribute."""
    gdp_object = self.gdp
    return gdp_object

456 

457 # ===================================================== 

458 # Custom setters using __setattr__ override 

459 # ===================================================== 

460 

def __setattr__(self, name: str, value: Any) -> None:
    """Route assignments to ``latitude``, ``longitude``, ``elevation`` and
    ``station`` to their backing storage; defer everything else to the
    parent class.

    ``latitude``/``longitude`` update the private ``_gps_lat``/``_gps_lon``
    and, when present, ``gps.lat``/``gps.lon``.  ``elevation`` updates both
    ``_elevation`` and the ``elevation`` field.  ``station`` writes
    ``str(value)`` to ``rx.gdp_stn`` when that attribute exists and the
    value is not ``None``; otherwise the assignment is ignored.

    Raises
    ------
    ValueError
        If a string given for latitude, longitude or elevation cannot be
        parsed as a float.
    """

    def _to_number(label: str, raw: Any) -> float:
        # Strings are stripped and parsed; a parse failure is reported with
        # the attribute name.  Anything else goes straight through float().
        if isinstance(raw, str):
            try:
                raw = float(raw.strip())
            except ValueError:
                raise ValueError(f"Invalid {label}: {raw}")
        return float(raw)

    # computed property -> (private attribute, attribute on the GPS object)
    gps_targets = {
        "latitude": ("_gps_lat", "lat"),
        "longitude": ("_gps_lon", "lon"),
    }

    if name in gps_targets:
        private_name, gps_field = gps_targets[name]
        number = _to_number(name, value)
        # Update both the private field and GPS object if available
        super().__setattr__(private_name, number)
        if hasattr(self, "gps") and hasattr(self.gps, gps_field):
            setattr(self.gps, gps_field, number)
        return

    if name == "elevation":
        number = _to_number(name, value)
        super().__setattr__("_elevation", number)
        # Also update the main elevation field
        super().__setattr__("elevation", number)
        return

    if name == "station":
        if (
            hasattr(self, "rx")
            and hasattr(self.rx, "gdp_stn")
            and value is not None
        ):
            self.rx.gdp_stn = str(value)
        return

    # Default behavior for all other attributes
    super().__setattr__(name, value)

514 

def write_header(self):
    """
    Build the ``$Key=Value`` header lines of an .avg file.

    One line is produced for each entry in ``_header_keys``.  Lists are
    written as comma-separated values with one decimal, floats with seven
    decimals and ints without decimals.  Any other non-string value
    (including ``None``) is written using ``str()``.  The value text is
    passed through ``str.capitalize`` before being written.

    Returns
    -------
    list[str]
        Header lines, starting with an empty string.

    """
    lines = [""]

    for key in self._header_keys:
        # Map g_p_s to gps for attribute access
        actual_key = key.replace("g_p_s", "gps")
        value = self.get_attr_from_name(actual_key)
        if isinstance(value, list):
            value = ",".join(f"{v:.1f}" for v in value)
        elif isinstance(value, float):
            value = f"{value:.7f}"
        elif isinstance(value, int):
            value = f"{value:.0f}"
        elif not isinstance(value, str):
            # None and other objects have no .capitalize(); use their text form.
            value = str(value)

        # snake_case -> CamelCase per segment, e.g. "m_t_edit.d_plus.use"
        # -> "MTEdit:DPlus.Use" (the separator after MTEdit is a colon).
        label = (
            key.replace("_", " ")
            .title()
            .replace(" ", "")
            .replace("MTEdit.", "MTEdit:")
        )

        lines.append(f"${label}={value.capitalize()}")

    return lines