Coverage for C: \ Users \ peaco \ OneDrive \ Documents \ GitHub \ mt_metadata \ mt_metadata \ transfer_functions \ io \ zonge \ zonge.py: 79%

312 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-10 00:11 -0800

1# -*- coding: utf-8 -*- 

2""" 

3==================== 

4zonge 

5==================== 

6 * Tools for interfacing with MTFT24 

7 * Tools for interfacing with MTEdit 

8 

9 

10Created on Tue Jul 11 10:53:23 2013 

11@author: jpeacock-pr 

12""" 

13 

14# ============================================================================== 

15from pathlib import Path 

16 

17import numpy as np 

18import pandas as pd 

19from loguru import logger 

20 

21from mt_metadata.timeseries import Electric, Magnetic, Run, Survey 

22from mt_metadata.transfer_functions.io.tools import get_nm_elev 

23from mt_metadata.transfer_functions.tf import Station 

24 

25from .metadata import Header 

26 

27 

28# ============================================================================== 

29# deal with avg files output from mtedit 

30# ============================================================================== 

31class ZongeMTAvg: 

32 """ 

33 deal with avg files output from mtedit 

34 """ 

35 

36 def __init__(self, fn=None, **kwargs): 

37 self.header = Header() 

38 

39 self.info_keys = [ 

40 "skip", 

41 "frequency", 

42 "e_magnitude", 

43 "b_magnitude", 

44 "z_magnitude", 

45 "z_phase", 

46 "apparent_resistivity", 

47 "apparent_resistivity_err", 

48 "z_phase_err", 

49 "coherency", 

50 "fc_use", 

51 "fc_try", 

52 ] 

53 self.info_fmt = [] 

54 

55 self.z = None 

56 self.z_err = None 

57 self.t = None 

58 self.t_err = None 

59 self.components = [] 

60 

61 self._comp_index_down = { 

62 "zxx": (0, 0), 

63 "zxy": (0, 1), 

64 "zyx": (1, 0), 

65 "zyy": (1, 1), 

66 "tzx": (0, 0), 

67 "tzy": (0, 1), 

68 "zxxr": (0, 0), 

69 "zxyr": (0, 1), 

70 "zyxr": (1, 0), 

71 "zyyr": (1, 1), 

72 } 

73 

74 self._comp_index_up = { 

75 "zxx": (1, 1), 

76 "zxy": (1, 0), 

77 "zyx": (0, 1), 

78 "zyy": (0, 0), 

79 "tzx": (0, 1), 

80 "tzy": (0, 0), 

81 "zxxr": (1, 1), 

82 "zxyr": (1, 0), 

83 "zyxr": (0, 1), 

84 "zyyr": (0, 0), 

85 } 

86 

87 self.freq_index_dict = None 

88 self.z_positive = "down" 

89 

90 self.fn = fn 

91 

92 for key, value in kwargs.items(): 

93 setattr(self, key, value) 

94 

95 def _get_comp_index(self) -> dict[str, tuple[int, int]]: 

96 """ 

97 get the correct component index dictionary based on z_positive 

98 

99 Down assumes x is north, y is east 

100 

101 Up assumes x is east, y is north 

102 """ 

103 if self.z_positive == "down": 

104 return self._comp_index_down 

105 elif self.z_positive == "up": 

106 return self._comp_index_up 

107 else: 

108 raise ValueError("z_postiive must be either [ 'up' | 'down' ]") 

109 

110 @property 

111 def fn(self) -> Path | None: 

112 return self._fn 

113 

114 @fn.setter 

115 def fn(self, value: str | Path | None): 

116 if value is not None: 

117 self._fn = Path(value) 

118 else: 

119 self._fn = None 

120 

121 def read(self, fn: str | Path | None = None, get_elevation: bool = False) -> None: 

122 """ 

123 Read data from a file into the object as a pandas DataFrame 

124 

125 Parameters 

126 ---------- 

127 fn : str | Path | None, optional 

128 The file name to read from, by default None 

129 get_elevation : bool, optional 

130 Whether to get elevation data, by default False 

131 """ 

132 

133 if fn is not None: 

134 self.fn = Path(fn) 

135 

136 if self.fn is None or not self.fn.exists(): 

137 raise FileNotFoundError(f"File not found: {self.fn}") 

138 

139 with self.fn.open("r") as fid: 

140 lines = fid.readlines() 

141 

142 # read header 

143 data_lines = self.header.read_header(lines) 

144 

145 data_list = [] 

146 for line in data_lines: 

147 if "$" in line: 

148 key, comp = [ss.strip() for ss in line.split("=")] 

149 elif "skp" in line.lower() or len(line) < 2: 

150 continue 

151 else: 

152 line = line.replace("*", "0.50") 

153 values = [comp.lower()] + [float(ss.strip()) for ss in line.split(",")] 

154 entry = dict( 

155 [ 

156 (key.lower(), value) 

157 for key, value in zip(["comp"] + self.info_keys, values) 

158 ] 

159 ) 

160 data_list.append(entry) 

161 

162 self.df = pd.DataFrame(data_list) 

163 

164 self.frequency = self.df.frequency.unique() 

165 self.frequency.sort() 

166 self.n_freq = self.frequency.size 

167 self.components = self.df.comp.unique() 

168 

169 self.freq_index_dict = dict([(ff, ii) for ii, ff in enumerate(self.frequency)]) 

170 

171 self.z, self.z_err = self._fill_z() 

172 self.t, self.t_err = self._fill_t() 

173 

174 if self.header.elevation == 0 and get_elevation: 

175 if self.header.latitude != 0 and self.header.longitude != 0: 

176 self.header.elevation = get_nm_elev( 

177 self.header.latitude, self.header.longitude 

178 ) 

179 

180 def to_complex( 

181 self, zmag: np.typing.NDArray, zphase: np.typing.NDArray 

182 ) -> tuple[np.typing.NDArray, np.typing.NDArray]: 

183 """ 

184 Convert magnitude and phase to complex representation. 

185 

186 Outputs of mtedit are magnitude and phase of z, convert to real and 

187 imaginary parts, phase is in milliradians. 

188 

189 Parameters 

190 ---------- 

191 zmag : np.typing.NDArray 

192 The magnitude array. 

193 zphase : np.typing.NDArray 

194 The phase array. 

195 

196 Returns 

197 ------- 

198 tuple[np.typing.NDArray, np.typing.NDArray] 

199 The real and imaginary parts of the complex representation. 

200 

201 """ 

202 

203 if isinstance(zmag, np.ndarray): 

204 assert len(zmag) == len(zphase) 

205 zreal = zmag * np.cos((zphase / 1000)) 

206 zimag = zmag * np.sin((zphase / 1000)) 

207 return zreal, zimag 

208 

209 def to_amp_phase( 

210 self, zreal: np.typing.NDArray, zimag: np.typing.NDArray 

211 ) -> tuple[np.typing.NDArray, np.typing.NDArray]: 

212 """ 

213 Convert to amplitude and phase from real and imaginary 

214 

215 Parameters 

216 ---------- 

217 zreal : np.typing.NDArray 

218 The real part of the complex representation. 

219 zimag : np.typing.NDArray 

220 The imaginary part of the complex representation. 

221 

222 Returns 

223 ------- 

224 tuple[np.typing.NDArray, np.typing.NDArray] 

225 The amplitude and phase representation. 

226 

227 """ 

228 

229 if isinstance(zreal, np.ndarray): 

230 assert len(zreal) == len(zimag) 

231 zphase = np.arctan2(zimag, zreal) * 1000 

232 zmag = np.sqrt(zreal**2 + zimag**2) 

233 

234 return zmag, zphase 

235 

236 def _fill_z(self) -> tuple[np.typing.NDArray, np.typing.NDArray]: 

237 """ 

238 create Z array with data, need to take into account when the different 

239 components have different frequencies, sometimes one might get skipped. 

240 

241 Parameters 

242 ---------- 

243 None 

244 

245 Returns 

246 ------- 

247 tuple[np.typing.NDArray, np.typing.NDArray] 

248 The Z and Z error arrays. 

249 """ 

250 

251 z = np.zeros((self.n_freq, 2, 2), dtype=complex) 

252 z_err = np.ones((self.n_freq, 2, 2), dtype=float) 

253 

254 comp_index = self._get_comp_index() 

255 

256 for row in self.df[self.df.comp.str.startswith("z")].itertuples(): 

257 ii, jj = comp_index[row.comp] 

258 f_index = self.freq_index_dict[row.frequency] 

259 z_real, z_imag = self.to_complex(row.z_magnitude, row.z_phase) 

260 z_real_error, z_imag_error = self.to_complex( 

261 ( 

262 np.sqrt( 

263 ( 

264 (row.apparent_resistivity_err / 100) 

265 * row.apparent_resistivity 

266 ) 

267 * 5 

268 * row.frequency 

269 ) 

270 ), 

271 row.z_phase_err, 

272 ) 

273 

274 z[f_index, ii, jj] = z_real + 1j * z_imag 

275 

276 z_err[f_index, ii, jj] = np.sqrt(z_real_error**2 + z_imag_error**2) 

277 

278 return z, z_err 

279 

280 def _fill_t( 

281 self, 

282 ) -> tuple[np.typing.NDArray, np.typing.NDArray] | tuple[None, None]: 

283 """ 

284 fill tipper values 

285 

286 Returns 

287 ------- 

288 tuple[np.typing.NDArray, np.typing.NDArray] 

289 The T and T error arrays. 

290 """ 

291 

292 if "tzx" not in self.df.comp.to_list(): 

293 logger.debug( 

294 "No Tipper found in {self.fn.name}", 

295 ) 

296 return None, None 

297 

298 t = np.zeros((self.n_freq, 1, 2), dtype=complex) 

299 t_err = np.ones((self.n_freq, 1, 2), dtype=float) 

300 

301 comp_index = self._get_comp_index() 

302 

303 for row in self.df[self.df.comp.str.startswith("t")].itertuples(): 

304 t_real, t_imag = self.to_complex(row.z_magnitude, row.z_phase) 

305 ii, jj = comp_index[row.comp] 

306 f_index = self.freq_index_dict[row.frequency] 

307 

308 if self.z_positive == "up": 

309 t[f_index, ii, jj] = -1 * (t_real + t_imag * 1j) 

310 else: 

311 t[f_index, ii, jj] = t_real + t_imag * 1j 

312 # error estimation 

313 t_real_error, t_imag_error = self.to_complex( 

314 ( 

315 np.sqrt( 

316 ( 

317 (row.apparent_resistivity_err / 100) 

318 * row.apparent_resistivity 

319 ) 

320 ) 

321 ), 

322 row.z_phase_err, 

323 ) 

324 t_err[f_index, ii, jj] = np.sqrt(t_real**2 + t_imag**2) 

325 

326 return t, t_err 

327 

328 @property 

329 def run_metadata(self) -> Run: 

330 rm = Run(id="001") 

331 rm.data_logger.id = self.header.instrument_id 

332 rm.data_logger.type = self.header.instrument_type 

333 rm.data_logger.manufacturer = "Zonge International" 

334 if self.header.firmware is not None: 

335 rm.data_logger.firmware.version = self.header.firmware 

336 if self.header.start_time is not None: 

337 rm.time_period.start = self.header.start_time 

338 

339 if "zxy" in self.components or "zxxr" in self.components: 

340 rm.add_channel(self.ex_metadata) 

341 rm.add_channel(self.ey_metadata) 

342 rm.add_channel(self.hx_metadata) 

343 rm.add_channel(self.hy_metadata) 

344 

345 if "tzx" in self.components or "rxxr" in self.components: 

346 rm.add_channel(self.hz_metadata) 

347 

348 return rm 

349 

350 @property 

351 def ex_metadata(self) -> Electric: 

352 ch = Electric(component="ex") 

353 if self.header._has_channel("zxy"): 

354 ch.dipole_length = self.header._comp_dict["zxy"]["rx"].length 

355 ch.measurement_azimuth = self.header._comp_dict["zxy"]["ch"].azimuth[0] 

356 ch.translated_azimuth = self.header._comp_dict["zxy"]["ch"].azimuth[0] 

357 ch.measurement_tilt = self.header._comp_dict["zxy"]["ch"].incl[0] 

358 ch.translated_tilt = self.header._comp_dict["zxy"]["ch"].incl[0] 

359 ch.channel_id = self.header._comp_dict["zxy"]["ch"].number[0] 

360 ch.time_period.start = self.header.start_time 

361 

362 else: 

363 ch.dipole_length = self.header.rx.length 

364 ch.measurement_azimuth = self.header.rx.h_p_r[0] 

365 ch.translated_azimuth = self.header.rx.h_p_r[0] 

366 ch.channel_id = 4 

367 

368 return ch 

369 

370 @property 

371 def ey_metadata(self) -> Electric: 

372 ch = Electric(component="ey") 

373 if self.header._has_channel("zyx"): 

374 ch.dipole_length = self.header._comp_dict["zyx"]["rx"].length 

375 ch.measurement_azimuth = self.header._comp_dict["zyx"]["ch"].azimuth[0] 

376 ch.translated_azimuth = self.header._comp_dict["zyx"]["ch"].azimuth[0] 

377 ch.measurement_tilt = self.header._comp_dict["zyx"]["ch"].incl[0] 

378 ch.translated_tilt = self.header._comp_dict["zyx"]["ch"].incl[0] 

379 ch.channel_id = self.header._comp_dict["zyx"]["ch"].number[0] 

380 ch.time_period.start = self.header.start_time 

381 

382 else: 

383 ch.dipole_length = self.header.rx.length 

384 ch.measurement_azimuth = self.header.rx.h_p_r[0] + 90 

385 ch.translated_azimuth = self.header.rx.h_p_r[0] + 90 

386 ch.channel_id = 5 

387 

388 return ch 

389 

390 @property 

391 def hx_metadata(self) -> Magnetic: 

392 ch = Magnetic(component="hx") 

393 if self.header._has_channel("zyx"): 

394 ch.measurement_azimuth = self.header._comp_dict["zyx"]["ch"].azimuth[1] 

395 ch.translated_azimuth = self.header._comp_dict["zyx"]["ch"].azimuth[1] 

396 ch.measurement_tilt = self.header._comp_dict["zyx"]["ch"].incl[1] 

397 ch.translated_tilt = self.header._comp_dict["zyx"]["ch"].incl[1] 

398 ch.sensor.id = self.header._comp_dict["zyx"]["ch"].number[1] 

399 ch.channel_id = 1 

400 ch.time_period.start = self.header.start_time 

401 

402 else: 

403 ch.measurement_azimuth = self.header.rx.h_p_r[0] 

404 ch.translated_azimuth = self.header.rx.h_p_r[0] 

405 ch.channel_id = 1 

406 

407 return ch 

408 

409 @property 

410 def hy_metadata(self) -> Magnetic: 

411 ch = Magnetic(component="hy") 

412 if self.header._has_channel("zxy"): 

413 ch.measurement_azimuth = self.header._comp_dict["zxy"]["ch"].azimuth[1] 

414 ch.translated_azimuth = self.header._comp_dict["zxy"]["ch"].azimuth[1] 

415 ch.measurement_tilt = self.header._comp_dict["zxy"]["ch"].incl[1] 

416 ch.translated_tilt = self.header._comp_dict["zxy"]["ch"].incl[1] 

417 ch.sensor.id = self.header._comp_dict["zxy"]["ch"].number[1] 

418 ch.channel_id = 2 

419 ch.time_period.start = self.header.start_time 

420 

421 else: 

422 ch.measurement_azimuth = self.header.rx.h_p_r[0] + 90 

423 ch.translated_azimuth = self.header.rx.h_p_r[0] + 90 

424 ch.channel_id = 2 

425 

426 return ch 

427 

428 @property 

429 def hz_metadata(self) -> Magnetic: 

430 ch = Magnetic(component="hz") 

431 if self.header._has_channel("tzx"): 

432 # Parse comma-separated values safely 

433 azimuth_values = self.header._comp_dict["tzx"]["ch"].azimuth 

434 incl_values = self.header._comp_dict["tzx"]["ch"].incl 

435 number_values = self.header._comp_dict["tzx"]["ch"].number 

436 

437 # Extract second value from comma-separated strings 

438 if azimuth_values and "," in azimuth_values: 

439 try: 

440 azimuth_val = float(azimuth_values.split(",")[1].strip()) 

441 ch.measurement_azimuth = azimuth_val 

442 ch.translated_azimuth = azimuth_val 

443 except (IndexError, ValueError): 

444 pass 

445 

446 if incl_values and "," in incl_values: 

447 try: 

448 incl_val = float(incl_values.split(",")[1].strip()) 

449 ch.measurement_tilt = incl_val 

450 ch.translated_tilt = incl_val 

451 except (IndexError, ValueError): 

452 pass 

453 

454 if number_values and "," in number_values: 

455 try: 

456 ch.sensor.id = number_values.split(",")[1].strip() 

457 except IndexError: 

458 pass 

459 

460 ch.channel_id = "3" 

461 if self.header.start_time: 

462 ch.time_period.start = self.header.start_time 

463 

464 else: 

465 ch.measurement_azimuth = self.header.rx.h_p_r[-1] 

466 ch.translated_azimuth = self.header.rx.h_p_r[-1] 

467 ch.channel_id = "3" 

468 

469 return ch 

470 

471 @property 

472 def station_metadata(self) -> Station: 

473 sm = Station() 

474 

475 sm.id = self.header.station 

476 sm.location.latitude = self.header.latitude 

477 sm.location.longitude = self.header.longitude 

478 sm.location.elevation = self.header.elevation 

479 sm.location.datum = self.header.datum.upper() 

480 

481 sm.transfer_function.id = self.header.station 

482 sm.transfer_function.software.author = "Zonge International" 

483 sm.transfer_function.software.name = "MTEdit" 

484 sm.transfer_function.software.version = self.header.m_t_edit.version.split()[0] 

485 sm.transfer_function.software.last_updated = ( 

486 self.header.m_t_edit.version.split()[-1] 

487 ) 

488 

489 for key, value in self.header.m_t_edit.to_dict(single=True).items(): 

490 if "version" in key: 

491 continue 

492 sm.transfer_function.processing_parameters.append(f"mtedit.{key}={value}") 

493 

494 sm.data_type = self.header.survey.type 

495 sm.add_run(self.run_metadata) 

496 sm.transfer_function.runs_processed = [self.run_metadata.id] 

497 if self.header.start_time is not None: 

498 sm.time_period.start = self.header.start_time 

499 

500 return sm 

501 

502 @station_metadata.setter 

503 def station_metadata(self, sm): 

504 self.header.station = sm.id 

505 self.header.latitdude = sm.location.latitude 

506 self.header.longitude = sm.location.longitude 

507 

508 if hasattr(sm.run[0].ex): 

509 self.header.rx.length = sm.run[0].ex.dipole_length 

510 

511 @property 

512 def survey_metadata(self) -> Survey: 

513 sm = Survey() 

514 sm.add_station(self.station_metadata) 

515 sm.update_time_period() 

516 return sm 

517 

518 def _create_dataframe_from_arrays(self) -> pd.DataFrame: 

519 """ 

520 Create a DataFrame from Z and T arrays for writing. 

521 This is used when data is available as arrays but not as a DataFrame. 

522 """ 

523 if self.z is None or self.frequency is None: 

524 raise ValueError("No impedance data or frequency array available") 

525 

526 data_list = [] 

527 comp_index = self._get_comp_index() 

528 

529 # Create data for each impedance component 

530 for comp, (i, j) in comp_index.items(): 

531 if not comp.startswith(("z", "t")): 

532 continue 

533 

534 for f_idx, freq in enumerate(self.frequency): 

535 if comp.startswith("z") and self.z is not None: 

536 z_val = self.z[f_idx, i, j] 

537 z_err_val = ( 

538 self.z_err[f_idx, i, j] if self.z_err is not None else 0.0 

539 ) 

540 

541 # Convert complex impedance to magnitude and phase 

542 z_mag, z_phase = self.to_amp_phase(z_val.real, z_val.imag) 

543 

544 # Calculate apparent resistivity 

545 app_res = 0.2 * (1.0 / freq) * (z_mag**2) 

546 app_res_err = 2.0 * z_err_val / z_mag * 100 if z_mag != 0 else 0.0 

547 

548 entry = { 

549 "comp": comp, 

550 "skip": 2, # Default skip value 

551 "frequency": freq, 

552 "e_magnitude": 1.0, # Default E magnitude 

553 "b_magnitude": 1.0, # Default B magnitude 

554 "z_magnitude": z_mag, 

555 "z_phase": z_phase, 

556 "apparent_resistivity": app_res, 

557 "apparent_resistivity_err": app_res_err, 

558 "z_phase_err": z_err_val * 1000, # Convert to milliradians 

559 "coherency": 0.9, # Default coherency 

560 "fc_use": 100, # Default values 

561 "fc_try": 200, 

562 } 

563 data_list.append(entry) 

564 

565 elif comp.startswith("t") and self.t is not None: 

566 t_val = self.t[f_idx, 0, j] # Tipper is 1x2 

567 t_err_val = ( 

568 self.t_err[f_idx, 0, j] if self.t_err is not None else 0.0 

569 ) 

570 

571 # Convert complex tipper to magnitude and phase 

572 t_mag, t_phase = self.to_amp_phase(t_val.real, t_val.imag) 

573 

574 entry = { 

575 "comp": comp, 

576 "skip": 2, 

577 "frequency": freq, 

578 "e_magnitude": 1.0, 

579 "b_magnitude": 1.0, 

580 "z_magnitude": t_mag, 

581 "z_phase": t_phase, 

582 "apparent_resistivity": 1.0, # Not applicable for tipper 

583 "apparent_resistivity_err": t_err_val * 100, 

584 "z_phase_err": t_err_val * 1000, 

585 "coherency": 0.9, 

586 "fc_use": 100, 

587 "fc_try": 200, 

588 } 

589 data_list.append(entry) 

590 

591 self.df = pd.DataFrame(data_list) 

592 self.components = self.df.comp.unique() 

593 

594 def write(self, fn: str | Path) -> None: 

595 """ 

596 Write an .avg file 

597 

598 Parameters 

599 ---------- 

600 fn : str or Path 

601 Filename to write to 

602 

603 """ 

604 

605 if fn is not None: 

606 self.fn = Path(fn) 

607 

608 # Create DataFrame from arrays if it doesn't exist 

609 if not hasattr(self, "df") or self.df is None or self.df.empty: 

610 self._create_dataframe_from_arrays() 

611 

612 # Write header lines 

613 header_lines = self.header.write_header() 

614 

615 # Group data by component and write each component section 

616 for comp in self.components: 

617 # Get component data 

618 comp_data = self.df[self.df.comp == comp].copy() 

619 if comp_data.empty: 

620 continue 

621 

622 # Add component header line 

623 header_lines.append(f"$Rx.Cmp = {comp.capitalize()}") 

624 

625 # Add component metadata lines if they exist in header 

626 if hasattr(self.header, "_comp_dict") and comp in self.header._comp_dict: 

627 comp_metadata = self.header._comp_dict[comp] 

628 # Add component specific metadata lines based on the structure 

629 # These would include lines like $Rx.Length, $Rx.Center, etc. 

630 # The exact structure depends on the header metadata 

631 for key, value in comp_metadata.items(): 

632 if key.startswith("rx_"): 

633 header_key = key.replace("rx_", "$Rx.").replace("_", ".") 

634 header_lines.append(f"{header_key}={value}") 

635 elif key.startswith("ch_"): 

636 header_key = key.replace("ch_", "$Ch.").replace("_", ".") 

637 header_lines.append(f"{header_key}={value}") 

638 

639 # Add data column header 

640 header_lines.append( 

641 "Skp,Freq, E.mag, B.mag, Z.mag, Z.phz, " 

642 "ARes.mag, ARes.%err,Z.perr, Coher, FC.NUse,FC.NTry" 

643 ) 

644 

645 # Sort data by frequency for this component 

646 comp_data = comp_data.sort_values("frequency") 

647 

648 # Write data lines for this component 

649 for _, row in comp_data.iterrows(): 

650 # Format each value according to the expected format to match the original file 

651 line = ( 

652 f"{int(row.skip)}, " 

653 f"{row.frequency:11.4E}, " 

654 f"{row.e_magnitude:11.4E}, " 

655 f"{row.b_magnitude:11.4E}, " 

656 f"{row.z_magnitude:11.4E}, " 

657 f"{row.z_phase:6.1f}, " 

658 f" {row.apparent_resistivity:11.4E}, " 

659 f"{row.apparent_resistivity_err:4.1f}, " 

660 f" {row.z_phase_err:6.1f}, " 

661 f" {row.coherency:5.3f}, " 

662 f" {int(row.fc_use):2}, " 

663 f" {int(row.fc_try):2}" 

664 ) 

665 

666 header_lines.append(line) 

667 

668 # Write to file 

669 if self.fn is None: 

670 raise ValueError("No filename specified for writing") 

671 

672 with self.fn.open("w") as fid: 

673 fid.write("\n".join(header_lines))