Coverage for C: \ Users \ peaco \ OneDrive \ Documents \ GitHub \ mth5 \ mth5 \ io \ phoenix \ readers \ mtu \ mtu_ts.py: 94%
137 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-10 00:01 -0800
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-10 00:01 -0800
1"""
2=======================================================================
3original comments from MATLAB script:
5read_tsn - reads a (binary) TS file of the legacy Phoenix MTU-5A instrument
6(TS2, TS3, TS4, TS5) and the even older V5-2000 system (TSL, TSH), and
7output the "ts" array and "tag" metadata dictionary.
9=======================================================================
10Parameters:
11 fpath: path to the TS file
12 fname: name of the TS file (including extensions)
14Returns:
15 ts: output array of the TS data
16 tag: output dict of the TSn metadata
18=======================================================================
19definition of the TS tag (or what I guessed after reading the user manual
20and fiddling with their files)
210-7 UTC time of first scan in the record.
228-9 instrument serial number (16-bit integer)
2310-11 number of scans in the record (16-bit integer)
2412 number of channels per scan
2513 tag length (TSn) or tag length code (TSH, TSL)
2614 status code
2715 bit-wise saturation flags (please note that the older TSH/L tag
28 ends here )
2916 reserved for future indication of different tag and/or sample
30 formats
3117 sample length in bytes
3218-19 sample rate (in units defined by byte 20)
3320 units of sample rate
3421 clock status
3522-25 clock error in seconds
3626-32 reserved; must be 0
38=======================================================================
39notes on the TS format of TSn files:
40The binary TS file consists of several data blocks, each contains a data
41tag and a number of records in it.
42Each time record consists of three bytes (24 bit), let's name them byte1,
43byte2, and byte3:
44the ts record (int) should be (+/-) (byte3*65536 + byte2*256 + byte1)
46Hao
472012.07.04
48Beijing
49=======================================================================
50"""
52from __future__ import annotations
54from pathlib import Path
55from typing import BinaryIO
57import numpy as np
58from loguru import logger
59from mt_metadata.common import MTime
61from mth5.timeseries import ChannelTS, RunTS
63from .mtu_table import MTUTable
66class MTUTSN:
67 """
68 Reader for legacy Phoenix MTU-5A instrument time series binary files.
70 Reads time series data from Phoenix MTU-5A (.TS2, .TS3, .TS4, .TS5) and
71 V5-2000 system (.TSL, .TSH) binary files. The data consists of 24-bit
72 signed integers organized in data blocks with headers.
74 Parameters
75 ----------
76 file_path : str or Path or None, optional
77 Path to the TSN file to read. If None, the reader is created without
78 loading data. Default is None.
80 Attributes
81 ----------
82 file_path : Path or None
83 Path to the currently loaded TSN file.
84 ts : ndarray or None
85 Time series data array with shape (n_channels, n_samples).
86 tag : dict
87 Metadata dictionary containing file information.
89 Examples
90 --------
91 Read a TS3 file:
93 >>> from pathlib import Path
94 >>> reader = MTUTSN('data/1690C16C.TS3')
95 >>> print(reader.ts.shape)
96 (3, 86400)
97 >>> print(reader.tag['sample_rate'])
98 24
100 Create reader without loading data:
102 >>> reader = MTUTSN()
103 >>> reader.read('data/1690C16C.TS3')
105 Access metadata:
107 >>> reader = MTUTSN('data/1690C16C.TS4')
108 >>> reader.read()
109 >>> print(f"Channels: {reader.tag['n_ch']}")
110 Channels: 4
111 >>> print(f"Blocks: {reader.tag['n_block']}")
112 Blocks: 48
113 """
115 def __init__(self, file_path: str | Path | None = None, **kwargs) -> None:
116 self._p16 = 2**16
117 self._p8 = 2**8
118 self._accepted_extensions = ["TS2", "TS3", "TS4", "TS5", "TSL", "TSH"]
119 self._file_path = None
121 self.file_path = file_path
122 self.ts = None
123 self.ts_metadata = None
125 for key, value in kwargs.items():
126 setattr(self, key, value)
128 @property
129 def file_path(self) -> Path | None:
130 """Get the TSN file path."""
131 return self._file_path
133 @file_path.setter
134 def file_path(self, value: str | Path | None) -> None:
135 """
136 Set the TSN file path with validation.
138 Parameters
139 ----------
140 value : str or Path or None
141 Path to the TSN file. Must exist and have a valid extension
142 (.TS2, .TS3, .TS4, .TS5, .TSL, .TSH).
144 Raises
145 ------
146 FileNotFoundError
147 If the specified file does not exist.
148 ValueError
149 If the file extension is not recognized.
151 Examples
152 --------
153 >>> reader = MTUTSN()
154 >>> reader.file_path = 'data/1690C16C.TS3'
155 >>> print(reader.file_path.name)
156 1690C16C.TS3
157 """
158 if value is not None:
159 self._file_path = Path(value)
160 if not self._file_path.exists():
161 msg = f"File path {self._file_path} does not exist."
162 logger.error(msg)
163 raise FileNotFoundError(msg)
164 if (
165 self._file_path.suffix.upper().lstrip(".")
166 not in self._accepted_extensions
167 ):
168 msg = (
169 f"File extension {self._file_path.suffix} is not a recognized "
170 f"TSN format. Accepted extensions: {self._accepted_extensions}"
171 )
172 logger.error(msg)
173 raise ValueError(msg)
174 else:
175 self._file_path = None
177 def get_sign24(self, x: np.ndarray | list | int) -> np.ndarray:
178 """
179 Convert unsigned 24-bit integers to signed integers.
181 Converts unsigned 24-bit values (0 to 16777215) to their signed
182 equivalents (-8388608 to 8388607) by applying two's complement.
184 Parameters
185 ----------
186 x : ndarray or list or int
187 Unsigned 24-bit integer value(s) to convert.
189 Returns
190 -------
191 ndarray
192 Signed 24-bit integer value(s) as int32 array.
194 Examples
195 --------
196 Convert a single positive value:
198 >>> reader = MTUTSN()
199 >>> reader.get_sign24(100)
200 array([100], dtype=int32)
202 Convert a single negative value (unsigned representation):
204 >>> reader.get_sign24(16777215) # -1 in 24-bit signed
205 array([-1], dtype=int32)
207 Convert an array:
209 >>> values = np.array([0, 8388607, 8388608, 16777215])
210 >>> reader.get_sign24(values)
211 array([ 0, 8388607, -8388608, -1], dtype=int32)
212 """
213 x = np.array(x, dtype=np.int32)
214 x[x > 2**23 - 1] = x[x > 2**23 - 1] - 2**24
215 return x
217 def _read_header(
218 self, ts_fid: BinaryIO
219 ) -> tuple[MTime, int, int, int, int, str, int]:
220 """
221 Read and parse the 32-byte header from a TSN file.
223 Extracts timestamp, instrument serial number, number of scans,
224 channel count, tag length, instrument type, and sample rate from
225 the binary file header.
227 Parameters
228 ----------
229 ts_fid : BinaryIO
230 Open binary file handle positioned at the start of the header.
232 Returns
233 -------
234 start_time : MTime
235 UTC timestamp of the first scan in the file.
236 box_num : int
237 Instrument serial number (16-bit integer).
238 n_scan : int
239 Number of scans per data block (16-bit integer).
240 n_ch : int
241 Number of channels per scan (3, 4, 5, or 6).
242 tag_length : int
243 Length of the tag in bytes (32 for MTU-5, 16 for V5-2000).
244 ts_type : str
245 Instrument type: 'MTU-5' or 'V5-2000'.
246 sample_rate : int
247 Sampling frequency in Hz (0 for V5-2000 files).
249 Examples
250 --------
251 >>> with open('data/1690C16C.TS3', 'rb') as f:
252 ... reader = MTUTSN()
253 ... start, box, scan, ch, tag_len, ts_type, sr = reader._read_header(f)
254 ... print(f"Type: {ts_type}, Rate: {sr} Hz, Channels: {ch}")
255 Type: MTU-5, Rate: 24 Hz, Channels: 3
256 """
257 # Starting time
258 s = ts_fid.read(1)[0] # Starting second
259 m = ts_fid.read(1)[0] # Starting minute
260 h = ts_fid.read(1)[0] # Starting hour
261 d = ts_fid.read(1)[0] # Starting day
262 l = ts_fid.read(1)[0] # Starting month
263 y = ts_fid.read(1)[0] # Starting year
264 ts_fid.read(1) # skip the Starting weekday
265 c = ts_fid.read(1)[0] # Starting century(-1)
266 start = MTime(time_stamp=f"{(c*100)+y}-{l:02d}-{d:02d}T{h:02d}:{m:02d}:{s:02d}")
267 logger.info(f"Start time is: {start.isoformat()} UTC")
269 # Box series number (16-bit integer)
270 box_num = int.from_bytes(ts_fid.read(2), byteorder="little", signed=False)
271 # Number of scans in a data block (16-bit integer)
272 n_scan = int.from_bytes(ts_fid.read(2), byteorder="little", signed=False)
273 # Number of channels in a record
274 n_ch = ts_fid.read(1)[0]
275 # Length of the tag
276 tag_length = ts_fid.read(1)[0]
278 if tag_length != 32:
279 ts_type = "V5-2000"
280 sample_rate = 0
281 else:
282 ts_type = "MTU-5"
283 ts_fid.seek(4, 1) # skip to sampling frequency
284 # Sampling frequency (16-bit integer, little endian)
285 sample_rate = int.from_bytes(
286 ts_fid.read(2), byteorder="little", signed=False
287 )
288 ts_fid.seek(12, 1) # skip some (unknown) head info...
289 logger.info(f"Sampling frequency is {sample_rate} Hz")
290 logger.info(f"Number of records is {n_scan} in each data block")
292 logger.info(f"TS type is: {ts_type}")
293 return start, box_num, n_scan, n_ch, tag_length, ts_type, sample_rate
295 def _extract_channel_data(
296 self, data: np.ndarray, byte_indices: list[int]
297 ) -> np.ndarray:
298 """
299 Extract and convert 24-bit signed channel data from raw bytes.
301 Combines three consecutive bytes (low, middle, high) per sample to
302 reconstruct 24-bit signed integer values for a single channel.
304 Parameters
305 ----------
306 data : ndarray
307 Raw byte data array with shape (n_ch*3, n_scan), where each row
308 represents one byte position and each column is one time sample.
309 byte_indices : list of int
310 Three byte indices [low_byte, mid_byte, high_byte] indicating which
311 rows of the data array contain the 24-bit value components.
313 Returns
314 -------
315 ndarray
316 Signed 24-bit integer values for the channel with shape (n_scan,).
318 Examples
319 --------
320 Extract channel 0 from 3-channel data:
322 >>> data = np.random.randint(0, 256, size=(9, 1000), dtype=np.int32)
323 >>> reader = MTUTSN()
324 >>> ch0_data = reader._extract_channel_data(data, [0, 1, 2])
325 >>> print(ch0_data.shape)
326 (1000,)
328 Extract channel 1 from 4-channel data:
330 >>> data = np.random.randint(0, 256, size=(12, 500), dtype=np.int32)
331 >>> ch1_data = reader._extract_channel_data(data, [3, 4, 5])
332 >>> print(ch1_data.shape)
333 (500,)
334 """
335 return self.get_sign24(
336 data[byte_indices[2], :] * self._p16
337 + data[byte_indices[1], :] * self._p8
338 + data[byte_indices[0], :]
339 )
341 def _process_data_block(
342 self,
343 ts_fid: BinaryIO,
344 n_ch: int,
345 n_scan: int,
346 ts: np.ndarray,
347 start_idx: int,
348 end_idx: int,
349 ) -> bool:
350 """
351 Read and process a single data block from the TSN file.
353 Reads one data block consisting of a 32-byte header followed by
354 n_scan*n_ch*3 bytes of 24-bit channel data. Extracts and converts
355 the data for all channels and populates the specified slice of the
356 time series array.
358 Parameters
359 ----------
360 ts_fid : BinaryIO
361 Open binary file handle positioned at the start of a data block.
362 n_ch : int
363 Number of channels (3, 4, 5, or 6).
364 n_scan : int
365 Number of time samples in this block.
366 ts : ndarray
367 Pre-allocated time series array to populate with shape
368 (n_ch, total_samples).
369 start_idx : int
370 Starting index in the second dimension of ts for this block's data.
371 end_idx : int
372 Ending index (exclusive) in the second dimension of ts.
374 Returns
375 -------
376 bool
377 True if data was read and processed successfully, False if end of
378 file reached or unsupported channel count.
380 Examples
381 --------
382 Process blocks from a file:
384 >>> with open('data/1690C16C.TS3', 'rb') as f:
385 ... reader = MTUTSN()
386 ... # ... read header first ...
387 ... ts = np.zeros((3, 3600))
388 ... success = reader._process_data_block(f, 3, 1200, ts, 0, 1200)
389 ... print(f"Block processed: {success}")
390 Block processed: True
391 """
392 ts_fid.seek(32, 1) # skip the file tag
393 data = np.frombuffer(ts_fid.read(n_ch * 3 * n_scan), dtype=np.uint8)
395 if len(data) == 0:
396 logger.warning("No data read in current block...")
397 return False
399 # Reshape data to [Nch*3, Nscan] (Fortran order to match MATLAB)
400 # Convert to int32 to avoid overflow during multiplication
401 data = data.reshape((n_scan, n_ch * 3)).T.astype(np.int32)
403 # Channel byte index mapping (each channel uses 3 consecutive bytes)
404 # Format: [low_byte, mid_byte, high_byte]
405 channel_map = {
406 3: {0: [0, 1, 2], 1: [3, 4, 5], 2: [6, 7, 8]},
407 4: {0: [0, 1, 2], 1: [3, 4, 5], 2: [6, 7, 8], 3: [12, 13, 14]},
408 5: {
409 0: [0, 1, 2],
410 1: [3, 4, 5],
411 2: [6, 7, 8],
412 3: [9, 10, 11],
413 4: [12, 13, 14],
414 },
415 6: {
416 0: [0, 1, 2],
417 1: [3, 4, 5],
418 2: [6, 7, 8],
419 3: [9, 10, 11],
420 4: [12, 13, 14],
421 5: [15, 16, 17],
422 },
423 }
425 if n_ch in channel_map:
426 for ch_idx, byte_indices in channel_map[n_ch].items():
427 ts[ch_idx, start_idx:end_idx] = self._extract_channel_data(
428 data, byte_indices
429 )
430 else:
431 logger.warning(f"Unsupported number of channels: {n_ch}")
432 return False
434 return True
436 def read(self, file_path: str | Path | None = None) -> None:
437 """
438 Read and parse a Phoenix MTU time series binary file.
440 Reads complete time series data from legacy Phoenix MTU-5A instrument
441 files (.TS2, .TS3, .TS4, .TS5) or V5-2000 system files (.TSL, .TSH).
442 Each file contains multiple data blocks with 24-bit signed integer
443 samples organized by channel.
445 Parameters
446 ----------
447 file_path : str or Path or None, optional
448 Path to the TSN file to read. If None, uses the current file_path
449 attribute. Default is None.
451 Returns
452 -------
453 ts : ndarray
454 Time series data array with shape (n_channels, total_samples).
455 Data type is float64. Each row represents one channel, and each
456 column is a time sample.
457 tag : dict
458 Metadata dictionary containing file information with keys:
460 - 'box_number' (int): Instrument serial number
461 - 'ts_type' (str): Instrument type ('MTU-5' or 'V5-2000')
462 - 'sample_rate' (int): Sampling frequency in Hz
463 - 'n_ch' (int): Number of channels
464 - 'n_scan' (int): Number of scans per data block
465 - 'start' (MTime): UTC timestamp of first sample
466 - 'ts_length' (float): Duration of each block in seconds
467 - 'n_block' (int): Total number of data blocks in file
469 Raises
470 ------
471 EOFError
472 If the file is empty or cannot be read.
473 ValueError
474 If the file has an unsupported extension or channel count.
475 FileNotFoundError
476 If the specified file does not exist.
478 Examples
479 --------
480 Read a 3-channel TS3 file:
482 >>> reader = MTUTSN()
483 >>> ts, tag = reader.read('data/1690C16C.TS3')
484 >>> print(f"Shape: {ts.shape}")
485 Shape: (3, 86400)
486 >>> print(f"Sample rate: {tag['sample_rate']} Hz")
487 Sample rate: 24 Hz
488 >>> print(f"Duration: {ts.shape[1] / tag['sample_rate']:.1f} seconds")
489 Duration: 3600.0 seconds
491 Read a 4-channel TS4 file:
493 >>> reader = MTUTSN('data/1690C16C.TS4')
494 >>> print(f"Channels: {reader.tag['n_ch']}")
495 Channels: 4
496 >>> print(f"Start time: {reader.tag['start'].isoformat()}")
497 Start time: 2016-07-16T00:00:00+00:00
499 Read and process data:
501 >>> ts, tag = MTUTSN().read('data/station.TS5')
502 >>> # Calculate statistics for each channel
503 >>> for i in range(tag['n_ch']):
504 ... print(f"Ch{i} mean: {ts[i].mean():.2f}, std: {ts[i].std():.2f}")
505 Ch0 mean: 123.45, std: 456.78
506 Ch1 mean: -234.56, std: 567.89
507 ...
508 """
509 # try opening the ts data file
510 if file_path is not None:
511 self.file_path = file_path
513 logger.info(f"Opening file: {self.file_path.name} ...")
515 with open(self.file_path, "rb") as ts_fid:
516 # Read the header to check if file is empty
517 s_byte = ts_fid.read(1) # Starting second
518 if len(s_byte) == 0:
519 raise EOFError("TSN file is empty or could not be read.")
521 # Reset to beginning and read full header
522 ts_fid.seek(0, 0)
523 (
524 start,
525 box_num,
526 n_scan,
527 n_ch,
528 tag_length,
529 ts_type,
530 sample_rate,
531 ) = self._read_header(ts_fid)
533 # Calculate file size and number of blocks
534 ts_fid.seek(0, 2)
535 file_size = ts_fid.tell()
536 n_block = round(file_size / (n_scan * n_ch * 3 + 32))
538 # Preallocate memory for time series data
539 ts = np.zeros((n_ch, n_scan * n_block), dtype=np.float64)
540 logger.info(f"Total {n_block} block(s) found in current file")
542 # Reset to beginning of file and process each data block
543 ts_fid.seek(0, 0)
545 for iblock in range(n_block):
546 start_idx = iblock * n_scan
547 end_idx = (iblock + 1) * n_scan
549 if not self._process_data_block(
550 ts_fid, n_ch, n_scan, ts, start_idx, end_idx
551 ):
552 break
554 logger.info("# finish reading time series...")
556 # Build the tag dictionary
557 tag = {
558 "box_number": box_num,
559 "ts_type": ts_type,
560 "sample_rate": sample_rate,
561 "n_ch": n_ch,
562 "n_scan": n_scan,
563 "start": start,
564 "ts_length": n_scan / sample_rate if sample_rate > 0 else 0,
565 "n_block": n_block,
566 }
568 self.ts = ts
569 self.ts_metadata = tag
571 def to_runts(
572 self, table_filepath: str | Path | None = None, calibrate=True
573 ) -> RunTS:
574 """
575 Create an MTUTable object from the TSN file and associated TBL file.
577 Parameters
578 ----------
579 table_filepath : str or Path
580 Path to the corresponding TBL file.
582 Returns
583 -------
584 MTUTable
585 An MTUTable object containing metadata from the TBL file.
587 Examples
588 --------
589 >>> reader = MTUTSN('data/1690C16C.TS3')
590 >>> mtu_table = reader.to_runts('data/1690C16C.TBL')
591 >>> print(mtu_table.metadata)
592 {...}
593 """
594 # Read data if not already loaded
595 if self.ts is None or self.ts_metadata is None:
596 self.read()
598 ts = self.ts
599 ts_metadata = self.ts_metadata
601 if table_filepath is None and self.file_path is not None:
602 table_filepath = self.file_path.with_suffix(".TBL")
604 mtu_table = MTUTable(table_filepath)
605 survey_metadata = mtu_table.survey_metadata # to trigger warning if no data
606 run_metadata = mtu_table.run_metadata.copy()
607 run_metadata.sample_rate = ts_metadata["sample_rate"]
608 # update run id to include sample rate
609 run_metadata.id = f"sr{ts_metadata['sample_rate']}_001"
610 run_ts = RunTS(
611 survey_metadata=mtu_table.survey_metadata,
612 station_metadata=survey_metadata.stations[0],
613 run_metadata=run_metadata,
614 )
615 for comp, channel_number in mtu_table.channel_keys.items():
616 channel_metadata = getattr(mtu_table, f"{comp}_metadata")
617 channel_metadata.sample_rate = ts_metadata["sample_rate"]
618 channel_metadata.start_time = ts_metadata["start"]
620 # Channel numbers in TBL are 1-indexed, convert to 0-indexed for numpy
621 channel_index = channel_number - 1
623 if calibrate:
624 if comp in ["ex", "ey"]:
625 scale_factor = (
626 mtu_table.ex_calibration
627 if comp == "ex"
628 else mtu_table.ey_calibration
629 )
630 channel_metadata.units = "mV/km"
632 elif comp in ["hx", "hy", "hz"]:
633 scale_factor = mtu_table.magnetic_calibration
634 channel_metadata.units = "nT"
636 logger.info(
637 f"Applying scale factor of {scale_factor} to channel {comp}"
638 )
639 ts[channel_index, :] = ts[channel_index, :] * scale_factor
641 # Determine channel type
642 channel_type = "electric" if comp[0] in ["e"] else "magnetic"
644 ch_ts = ChannelTS(
645 channel_type=channel_type,
646 data=ts[channel_index, :],
647 channel_metadata=channel_metadata,
648 )
649 run_ts.add_channel(ch_ts)
651 return run_ts