Coverage for C: \ Users \ peaco \ OneDrive \ Documents \ GitHub \ mth5 \ mth5 \ io \ phoenix \ readers \ mtu \ mtu_ts.py: 94%

137 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-10 00:01 -0800

1""" 

2======================================================================= 

3original comments from MATLAB script: 

4 

5read_tsn - reads a (binary) TS file of the legacy Phoenix MTU-5A instrument 

6(TS2, TS3, TS4, TS5) and the even older V5-2000 system (TSL, TSH), and 

7output the "ts" array and "tag" metadata dictionary. 

8 

9======================================================================= 

10Parameters: 

11 fpath: path to the TS file 

12 fname: name of the TS file (including extensions) 

13 

14Returns: 

15 ts: output array of the TS data 

16 tag: output dict of the TSn metadata 

17 

18======================================================================= 

19definition of the TS tag (or what I guessed after reading the user manual 

20and fiddling with their files) 

210-7 UTC time of first scan in the record. 

228-9 instrument serial number (16-bit integer) 

2310-11 number of scans in the record (16-bit integer) 

2412 number of channels per scan 

2513 tag length (TSn) or tag length code (TSH, TSL) 

2614 status code 

2715 bit-wise saturation flags (please note that the older TSH/L tag 

28 ends here ) 

2916 reserved for future indication of different tag and/or sample 

30 formats 

3117 sample length in bytes 

3218-19 sample rate (in units defined by byte 20) 

3320 units of sample rate 

3421 clock status 

3522-25 clock error in seconds 

3626-32 reserved; must be 0 

37 

38======================================================================= 

39notes on the TS format of TSn files: 

40The binary TS file consists of several data blocks, each contains a data 

41tag and a number of records in it. 

42Each time record consists of three bytes (24 bit), let's name them byte1, 

43byte2, and byte3: 

44the ts record (int) should be (+/-) (byte3*65536 + byte2*256 + byte1) 

45 

46Hao 

472012.07.04 

48Beijing 

49======================================================================= 

50""" 

51 

52from __future__ import annotations 

53 

54from pathlib import Path 

55from typing import BinaryIO 

56 

57import numpy as np 

58from loguru import logger 

59from mt_metadata.common import MTime 

60 

61from mth5.timeseries import ChannelTS, RunTS 

62 

63from .mtu_table import MTUTable 

64 

65 

66class MTUTSN: 

67 """ 

68 Reader for legacy Phoenix MTU-5A instrument time series binary files. 

69 

70 Reads time series data from Phoenix MTU-5A (.TS2, .TS3, .TS4, .TS5) and 

71 V5-2000 system (.TSL, .TSH) binary files. The data consists of 24-bit 

72 signed integers organized in data blocks with headers. 

73 

74 Parameters 

75 ---------- 

76 file_path : str or Path or None, optional 

77 Path to the TSN file to read. If None, the reader is created without 

78 loading data. Default is None. 

79 

80 Attributes 

81 ---------- 

82 file_path : Path or None 

83 Path to the currently loaded TSN file. 

84 ts : ndarray or None 

85 Time series data array with shape (n_channels, n_samples). 

86 tag : dict 

87 Metadata dictionary containing file information. 

88 

89 Examples 

90 -------- 

91 Read a TS3 file: 

92 

93 >>> from pathlib import Path 

94 >>> reader = MTUTSN('data/1690C16C.TS3') 

95 >>> print(reader.ts.shape) 

96 (3, 86400) 

97 >>> print(reader.tag['sample_rate']) 

98 24 

99 

100 Create reader without loading data: 

101 

102 >>> reader = MTUTSN() 

103 >>> reader.read('data/1690C16C.TS3') 

104 

105 Access metadata: 

106 

107 >>> reader = MTUTSN('data/1690C16C.TS4') 

108 >>> reader.read() 

109 >>> print(f"Channels: {reader.tag['n_ch']}") 

110 Channels: 4 

111 >>> print(f"Blocks: {reader.tag['n_block']}") 

112 Blocks: 48 

113 """ 

114 

115 def __init__(self, file_path: str | Path | None = None, **kwargs) -> None: 

116 self._p16 = 2**16 

117 self._p8 = 2**8 

118 self._accepted_extensions = ["TS2", "TS3", "TS4", "TS5", "TSL", "TSH"] 

119 self._file_path = None 

120 

121 self.file_path = file_path 

122 self.ts = None 

123 self.ts_metadata = None 

124 

125 for key, value in kwargs.items(): 

126 setattr(self, key, value) 

127 

128 @property 

129 def file_path(self) -> Path | None: 

130 """Get the TSN file path.""" 

131 return self._file_path 

132 

133 @file_path.setter 

134 def file_path(self, value: str | Path | None) -> None: 

135 """ 

136 Set the TSN file path with validation. 

137 

138 Parameters 

139 ---------- 

140 value : str or Path or None 

141 Path to the TSN file. Must exist and have a valid extension 

142 (.TS2, .TS3, .TS4, .TS5, .TSL, .TSH). 

143 

144 Raises 

145 ------ 

146 FileNotFoundError 

147 If the specified file does not exist. 

148 ValueError 

149 If the file extension is not recognized. 

150 

151 Examples 

152 -------- 

153 >>> reader = MTUTSN() 

154 >>> reader.file_path = 'data/1690C16C.TS3' 

155 >>> print(reader.file_path.name) 

156 1690C16C.TS3 

157 """ 

158 if value is not None: 

159 self._file_path = Path(value) 

160 if not self._file_path.exists(): 

161 msg = f"File path {self._file_path} does not exist." 

162 logger.error(msg) 

163 raise FileNotFoundError(msg) 

164 if ( 

165 self._file_path.suffix.upper().lstrip(".") 

166 not in self._accepted_extensions 

167 ): 

168 msg = ( 

169 f"File extension {self._file_path.suffix} is not a recognized " 

170 f"TSN format. Accepted extensions: {self._accepted_extensions}" 

171 ) 

172 logger.error(msg) 

173 raise ValueError(msg) 

174 else: 

175 self._file_path = None 

176 

177 def get_sign24(self, x: np.ndarray | list | int) -> np.ndarray: 

178 """ 

179 Convert unsigned 24-bit integers to signed integers. 

180 

181 Converts unsigned 24-bit values (0 to 16777215) to their signed 

182 equivalents (-8388608 to 8388607) by applying two's complement. 

183 

184 Parameters 

185 ---------- 

186 x : ndarray or list or int 

187 Unsigned 24-bit integer value(s) to convert. 

188 

189 Returns 

190 ------- 

191 ndarray 

192 Signed 24-bit integer value(s) as int32 array. 

193 

194 Examples 

195 -------- 

196 Convert a single positive value: 

197 

198 >>> reader = MTUTSN() 

199 >>> reader.get_sign24(100) 

200 array([100], dtype=int32) 

201 

202 Convert a single negative value (unsigned representation): 

203 

204 >>> reader.get_sign24(16777215) # -1 in 24-bit signed 

205 array([-1], dtype=int32) 

206 

207 Convert an array: 

208 

209 >>> values = np.array([0, 8388607, 8388608, 16777215]) 

210 >>> reader.get_sign24(values) 

211 array([ 0, 8388607, -8388608, -1], dtype=int32) 

212 """ 

213 x = np.array(x, dtype=np.int32) 

214 x[x > 2**23 - 1] = x[x > 2**23 - 1] - 2**24 

215 return x 

216 

217 def _read_header( 

218 self, ts_fid: BinaryIO 

219 ) -> tuple[MTime, int, int, int, int, str, int]: 

220 """ 

221 Read and parse the 32-byte header from a TSN file. 

222 

223 Extracts timestamp, instrument serial number, number of scans, 

224 channel count, tag length, instrument type, and sample rate from 

225 the binary file header. 

226 

227 Parameters 

228 ---------- 

229 ts_fid : BinaryIO 

230 Open binary file handle positioned at the start of the header. 

231 

232 Returns 

233 ------- 

234 start_time : MTime 

235 UTC timestamp of the first scan in the file. 

236 box_num : int 

237 Instrument serial number (16-bit integer). 

238 n_scan : int 

239 Number of scans per data block (16-bit integer). 

240 n_ch : int 

241 Number of channels per scan (3, 4, 5, or 6). 

242 tag_length : int 

243 Length of the tag in bytes (32 for MTU-5, 16 for V5-2000). 

244 ts_type : str 

245 Instrument type: 'MTU-5' or 'V5-2000'. 

246 sample_rate : int 

247 Sampling frequency in Hz (0 for V5-2000 files). 

248 

249 Examples 

250 -------- 

251 >>> with open('data/1690C16C.TS3', 'rb') as f: 

252 ... reader = MTUTSN() 

253 ... start, box, scan, ch, tag_len, ts_type, sr = reader._read_header(f) 

254 ... print(f"Type: {ts_type}, Rate: {sr} Hz, Channels: {ch}") 

255 Type: MTU-5, Rate: 24 Hz, Channels: 3 

256 """ 

257 # Starting time 

258 s = ts_fid.read(1)[0] # Starting second 

259 m = ts_fid.read(1)[0] # Starting minute 

260 h = ts_fid.read(1)[0] # Starting hour 

261 d = ts_fid.read(1)[0] # Starting day 

262 l = ts_fid.read(1)[0] # Starting month 

263 y = ts_fid.read(1)[0] # Starting year 

264 ts_fid.read(1) # skip the Starting weekday 

265 c = ts_fid.read(1)[0] # Starting century(-1) 

266 start = MTime(time_stamp=f"{(c*100)+y}-{l:02d}-{d:02d}T{h:02d}:{m:02d}:{s:02d}") 

267 logger.info(f"Start time is: {start.isoformat()} UTC") 

268 

269 # Box series number (16-bit integer) 

270 box_num = int.from_bytes(ts_fid.read(2), byteorder="little", signed=False) 

271 # Number of scans in a data block (16-bit integer) 

272 n_scan = int.from_bytes(ts_fid.read(2), byteorder="little", signed=False) 

273 # Number of channels in a record 

274 n_ch = ts_fid.read(1)[0] 

275 # Length of the tag 

276 tag_length = ts_fid.read(1)[0] 

277 

278 if tag_length != 32: 

279 ts_type = "V5-2000" 

280 sample_rate = 0 

281 else: 

282 ts_type = "MTU-5" 

283 ts_fid.seek(4, 1) # skip to sampling frequency 

284 # Sampling frequency (16-bit integer, little endian) 

285 sample_rate = int.from_bytes( 

286 ts_fid.read(2), byteorder="little", signed=False 

287 ) 

288 ts_fid.seek(12, 1) # skip some (unknown) head info... 

289 logger.info(f"Sampling frequency is {sample_rate} Hz") 

290 logger.info(f"Number of records is {n_scan} in each data block") 

291 

292 logger.info(f"TS type is: {ts_type}") 

293 return start, box_num, n_scan, n_ch, tag_length, ts_type, sample_rate 

294 

295 def _extract_channel_data( 

296 self, data: np.ndarray, byte_indices: list[int] 

297 ) -> np.ndarray: 

298 """ 

299 Extract and convert 24-bit signed channel data from raw bytes. 

300 

301 Combines three consecutive bytes (low, middle, high) per sample to 

302 reconstruct 24-bit signed integer values for a single channel. 

303 

304 Parameters 

305 ---------- 

306 data : ndarray 

307 Raw byte data array with shape (n_ch*3, n_scan), where each row 

308 represents one byte position and each column is one time sample. 

309 byte_indices : list of int 

310 Three byte indices [low_byte, mid_byte, high_byte] indicating which 

311 rows of the data array contain the 24-bit value components. 

312 

313 Returns 

314 ------- 

315 ndarray 

316 Signed 24-bit integer values for the channel with shape (n_scan,). 

317 

318 Examples 

319 -------- 

320 Extract channel 0 from 3-channel data: 

321 

322 >>> data = np.random.randint(0, 256, size=(9, 1000), dtype=np.int32) 

323 >>> reader = MTUTSN() 

324 >>> ch0_data = reader._extract_channel_data(data, [0, 1, 2]) 

325 >>> print(ch0_data.shape) 

326 (1000,) 

327 

328 Extract channel 1 from 4-channel data: 

329 

330 >>> data = np.random.randint(0, 256, size=(12, 500), dtype=np.int32) 

331 >>> ch1_data = reader._extract_channel_data(data, [3, 4, 5]) 

332 >>> print(ch1_data.shape) 

333 (500,) 

334 """ 

335 return self.get_sign24( 

336 data[byte_indices[2], :] * self._p16 

337 + data[byte_indices[1], :] * self._p8 

338 + data[byte_indices[0], :] 

339 ) 

340 

341 def _process_data_block( 

342 self, 

343 ts_fid: BinaryIO, 

344 n_ch: int, 

345 n_scan: int, 

346 ts: np.ndarray, 

347 start_idx: int, 

348 end_idx: int, 

349 ) -> bool: 

350 """ 

351 Read and process a single data block from the TSN file. 

352 

353 Reads one data block consisting of a 32-byte header followed by 

354 n_scan*n_ch*3 bytes of 24-bit channel data. Extracts and converts 

355 the data for all channels and populates the specified slice of the 

356 time series array. 

357 

358 Parameters 

359 ---------- 

360 ts_fid : BinaryIO 

361 Open binary file handle positioned at the start of a data block. 

362 n_ch : int 

363 Number of channels (3, 4, 5, or 6). 

364 n_scan : int 

365 Number of time samples in this block. 

366 ts : ndarray 

367 Pre-allocated time series array to populate with shape 

368 (n_ch, total_samples). 

369 start_idx : int 

370 Starting index in the second dimension of ts for this block's data. 

371 end_idx : int 

372 Ending index (exclusive) in the second dimension of ts. 

373 

374 Returns 

375 ------- 

376 bool 

377 True if data was read and processed successfully, False if end of 

378 file reached or unsupported channel count. 

379 

380 Examples 

381 -------- 

382 Process blocks from a file: 

383 

384 >>> with open('data/1690C16C.TS3', 'rb') as f: 

385 ... reader = MTUTSN() 

386 ... # ... read header first ... 

387 ... ts = np.zeros((3, 3600)) 

388 ... success = reader._process_data_block(f, 3, 1200, ts, 0, 1200) 

389 ... print(f"Block processed: {success}") 

390 Block processed: True 

391 """ 

392 ts_fid.seek(32, 1) # skip the file tag 

393 data = np.frombuffer(ts_fid.read(n_ch * 3 * n_scan), dtype=np.uint8) 

394 

395 if len(data) == 0: 

396 logger.warning("No data read in current block...") 

397 return False 

398 

399 # Reshape data to [Nch*3, Nscan] (Fortran order to match MATLAB) 

400 # Convert to int32 to avoid overflow during multiplication 

401 data = data.reshape((n_scan, n_ch * 3)).T.astype(np.int32) 

402 

403 # Channel byte index mapping (each channel uses 3 consecutive bytes) 

404 # Format: [low_byte, mid_byte, high_byte] 

405 channel_map = { 

406 3: {0: [0, 1, 2], 1: [3, 4, 5], 2: [6, 7, 8]}, 

407 4: {0: [0, 1, 2], 1: [3, 4, 5], 2: [6, 7, 8], 3: [12, 13, 14]}, 

408 5: { 

409 0: [0, 1, 2], 

410 1: [3, 4, 5], 

411 2: [6, 7, 8], 

412 3: [9, 10, 11], 

413 4: [12, 13, 14], 

414 }, 

415 6: { 

416 0: [0, 1, 2], 

417 1: [3, 4, 5], 

418 2: [6, 7, 8], 

419 3: [9, 10, 11], 

420 4: [12, 13, 14], 

421 5: [15, 16, 17], 

422 }, 

423 } 

424 

425 if n_ch in channel_map: 

426 for ch_idx, byte_indices in channel_map[n_ch].items(): 

427 ts[ch_idx, start_idx:end_idx] = self._extract_channel_data( 

428 data, byte_indices 

429 ) 

430 else: 

431 logger.warning(f"Unsupported number of channels: {n_ch}") 

432 return False 

433 

434 return True 

435 

436 def read(self, file_path: str | Path | None = None) -> None: 

437 """ 

438 Read and parse a Phoenix MTU time series binary file. 

439 

440 Reads complete time series data from legacy Phoenix MTU-5A instrument 

441 files (.TS2, .TS3, .TS4, .TS5) or V5-2000 system files (.TSL, .TSH). 

442 Each file contains multiple data blocks with 24-bit signed integer 

443 samples organized by channel. 

444 

445 Parameters 

446 ---------- 

447 file_path : str or Path or None, optional 

448 Path to the TSN file to read. If None, uses the current file_path 

449 attribute. Default is None. 

450 

451 Returns 

452 ------- 

453 ts : ndarray 

454 Time series data array with shape (n_channels, total_samples). 

455 Data type is float64. Each row represents one channel, and each 

456 column is a time sample. 

457 tag : dict 

458 Metadata dictionary containing file information with keys: 

459 

460 - 'box_number' (int): Instrument serial number 

461 - 'ts_type' (str): Instrument type ('MTU-5' or 'V5-2000') 

462 - 'sample_rate' (int): Sampling frequency in Hz 

463 - 'n_ch' (int): Number of channels 

464 - 'n_scan' (int): Number of scans per data block 

465 - 'start' (MTime): UTC timestamp of first sample 

466 - 'ts_length' (float): Duration of each block in seconds 

467 - 'n_block' (int): Total number of data blocks in file 

468 

469 Raises 

470 ------ 

471 EOFError 

472 If the file is empty or cannot be read. 

473 ValueError 

474 If the file has an unsupported extension or channel count. 

475 FileNotFoundError 

476 If the specified file does not exist. 

477 

478 Examples 

479 -------- 

480 Read a 3-channel TS3 file: 

481 

482 >>> reader = MTUTSN() 

483 >>> ts, tag = reader.read('data/1690C16C.TS3') 

484 >>> print(f"Shape: {ts.shape}") 

485 Shape: (3, 86400) 

486 >>> print(f"Sample rate: {tag['sample_rate']} Hz") 

487 Sample rate: 24 Hz 

488 >>> print(f"Duration: {ts.shape[1] / tag['sample_rate']:.1f} seconds") 

489 Duration: 3600.0 seconds 

490 

491 Read a 4-channel TS4 file: 

492 

493 >>> reader = MTUTSN('data/1690C16C.TS4') 

494 >>> print(f"Channels: {reader.tag['n_ch']}") 

495 Channels: 4 

496 >>> print(f"Start time: {reader.tag['start'].isoformat()}") 

497 Start time: 2016-07-16T00:00:00+00:00 

498 

499 Read and process data: 

500 

501 >>> ts, tag = MTUTSN().read('data/station.TS5') 

502 >>> # Calculate statistics for each channel 

503 >>> for i in range(tag['n_ch']): 

504 ... print(f"Ch{i} mean: {ts[i].mean():.2f}, std: {ts[i].std():.2f}") 

505 Ch0 mean: 123.45, std: 456.78 

506 Ch1 mean: -234.56, std: 567.89 

507 ... 

508 """ 

509 # try opening the ts data file 

510 if file_path is not None: 

511 self.file_path = file_path 

512 

513 logger.info(f"Opening file: {self.file_path.name} ...") 

514 

515 with open(self.file_path, "rb") as ts_fid: 

516 # Read the header to check if file is empty 

517 s_byte = ts_fid.read(1) # Starting second 

518 if len(s_byte) == 0: 

519 raise EOFError("TSN file is empty or could not be read.") 

520 

521 # Reset to beginning and read full header 

522 ts_fid.seek(0, 0) 

523 ( 

524 start, 

525 box_num, 

526 n_scan, 

527 n_ch, 

528 tag_length, 

529 ts_type, 

530 sample_rate, 

531 ) = self._read_header(ts_fid) 

532 

533 # Calculate file size and number of blocks 

534 ts_fid.seek(0, 2) 

535 file_size = ts_fid.tell() 

536 n_block = round(file_size / (n_scan * n_ch * 3 + 32)) 

537 

538 # Preallocate memory for time series data 

539 ts = np.zeros((n_ch, n_scan * n_block), dtype=np.float64) 

540 logger.info(f"Total {n_block} block(s) found in current file") 

541 

542 # Reset to beginning of file and process each data block 

543 ts_fid.seek(0, 0) 

544 

545 for iblock in range(n_block): 

546 start_idx = iblock * n_scan 

547 end_idx = (iblock + 1) * n_scan 

548 

549 if not self._process_data_block( 

550 ts_fid, n_ch, n_scan, ts, start_idx, end_idx 

551 ): 

552 break 

553 

554 logger.info("# finish reading time series...") 

555 

556 # Build the tag dictionary 

557 tag = { 

558 "box_number": box_num, 

559 "ts_type": ts_type, 

560 "sample_rate": sample_rate, 

561 "n_ch": n_ch, 

562 "n_scan": n_scan, 

563 "start": start, 

564 "ts_length": n_scan / sample_rate if sample_rate > 0 else 0, 

565 "n_block": n_block, 

566 } 

567 

568 self.ts = ts 

569 self.ts_metadata = tag 

570 

571 def to_runts( 

572 self, table_filepath: str | Path | None = None, calibrate=True 

573 ) -> RunTS: 

574 """ 

575 Create an MTUTable object from the TSN file and associated TBL file. 

576 

577 Parameters 

578 ---------- 

579 table_filepath : str or Path 

580 Path to the corresponding TBL file. 

581 

582 Returns 

583 ------- 

584 MTUTable 

585 An MTUTable object containing metadata from the TBL file. 

586 

587 Examples 

588 -------- 

589 >>> reader = MTUTSN('data/1690C16C.TS3') 

590 >>> mtu_table = reader.to_runts('data/1690C16C.TBL') 

591 >>> print(mtu_table.metadata) 

592 {...} 

593 """ 

594 # Read data if not already loaded 

595 if self.ts is None or self.ts_metadata is None: 

596 self.read() 

597 

598 ts = self.ts 

599 ts_metadata = self.ts_metadata 

600 

601 if table_filepath is None and self.file_path is not None: 

602 table_filepath = self.file_path.with_suffix(".TBL") 

603 

604 mtu_table = MTUTable(table_filepath) 

605 survey_metadata = mtu_table.survey_metadata # to trigger warning if no data 

606 run_metadata = mtu_table.run_metadata.copy() 

607 run_metadata.sample_rate = ts_metadata["sample_rate"] 

608 # update run id to include sample rate 

609 run_metadata.id = f"sr{ts_metadata['sample_rate']}_001" 

610 run_ts = RunTS( 

611 survey_metadata=mtu_table.survey_metadata, 

612 station_metadata=survey_metadata.stations[0], 

613 run_metadata=run_metadata, 

614 ) 

615 for comp, channel_number in mtu_table.channel_keys.items(): 

616 channel_metadata = getattr(mtu_table, f"{comp}_metadata") 

617 channel_metadata.sample_rate = ts_metadata["sample_rate"] 

618 channel_metadata.start_time = ts_metadata["start"] 

619 

620 # Channel numbers in TBL are 1-indexed, convert to 0-indexed for numpy 

621 channel_index = channel_number - 1 

622 

623 if calibrate: 

624 if comp in ["ex", "ey"]: 

625 scale_factor = ( 

626 mtu_table.ex_calibration 

627 if comp == "ex" 

628 else mtu_table.ey_calibration 

629 ) 

630 channel_metadata.units = "mV/km" 

631 

632 elif comp in ["hx", "hy", "hz"]: 

633 scale_factor = mtu_table.magnetic_calibration 

634 channel_metadata.units = "nT" 

635 

636 logger.info( 

637 f"Applying scale factor of {scale_factor} to channel {comp}" 

638 ) 

639 ts[channel_index, :] = ts[channel_index, :] * scale_factor 

640 

641 # Determine channel type 

642 channel_type = "electric" if comp[0] in ["e"] else "magnetic" 

643 

644 ch_ts = ChannelTS( 

645 channel_type=channel_type, 

646 data=ts[channel_index, :], 

647 channel_metadata=channel_metadata, 

648 ) 

649 run_ts.add_channel(ch_ts) 

650 

651 return run_ts