Module seisio.seg2
I/O of seismic files in SEG2 format.
Classes
class Reader (file)-
Class to deal with input of seismic files in SEG2 format.
Initialize class Reader.
Parameters
file:strorpathlib.Path- The name of the SEG2 input file to read.
Expand source code
class Reader(seisio.SeisIO): """Class to deal with input of seismic files in SEG2 format.""" def __init__(self, file): """ Initialize class Reader. Parameters ---------- file : str or pathlib.Path The name of the SEG2 input file to read. """ super().__init__(file) self._sg2 = self._SEG2() self._fp.fixed = True log.info("Input file: %s", self._fp.file) with open(self._fp.file, "rb") as fio: fio.seek(0, 2) self._fp.filesize = fio.tell() fio.seek(0, 0) # file descriptor block and free format section tpb_size = self._read_file_descriptor_block(fio) log.debug("Size of file descriptor block: %d bytes.", tpb_size) self._read_trace_pointer_subblock(fio, tpb_size) free_form_str = fio.read(self._sg2.trcptr[0]-fio.tell()) self._sg2.fheader = self._parse_free_format_section(self._sg2.fheader, free_form_str) self._check_fdb_strings() self._fp.skip = self._sg2.trcptr[0] # first trace header descriptor block fio.seek(self._sg2.trcptr[0], 0) tdb_size = self._read_trace_descriptor_block(fio) log.debug("Size of first trace descriptor block: %d bytes.", tpb_size) free_form_str = fio.read(tdb_size-32) self._sg2.theader = self._parse_free_format_section(self._sg2.theader, free_form_str) self._check_tdb_strings() @property def fheader(self): """ Get information from the free-form file header. Returns ------- dict The file header as dictionary. """ return self._sg2.fheader @property def vsi(self): """ Get the (vertical) sampling interval (of first trace). Returns ------- float Sampling interval. """ return self._dp.si def _read_file_descriptor_block(self, fio): """Read SEG2 file descriptor block.""" fdb = fio.read(32) # Determine the endianness and check validity of block magic = np.frombuffer(fdb, dtype='B', count=2) if magic[0] == 85 and magic[1] == 58: self._fp.endian = "<" elif magic[0] == 58 and magic[1] == 85: self._fp.endian = ">" else: log.error("File descriptor block does not start 0x3a55 or 0x553a.") raise ValueError("Invalid file descriptor block magic number.") log.info("Valid SEG2 file descriptor found. File endianess: '%s'.", self._fp.endian) # Check revision number revno = np.frombuffer(fdb, dtype=f"{self._fp.endian}H", count=1, offset=2) if revno != 1: log.warning("Only SEG2 rev. 1 is supported. File's revision is " "%d, though. Trying to continue.", revno) else: log.info("File standard is SEG2 rev. 1.") # Determine size of Trace Pointer Subblock in bytes tpb_size, ntraces = np.frombuffer(fdb, dtype=f"{self._fp.endian}H", count=2, offset=4) log.debug("Size of trace pointer subblock: %d bytes.", tpb_size) if ntraces*4 > tpb_size: log.error("File indicates %d traces but there are only %d trace " "pointers.", ntraces, tpb_size//4) raise ValueError("Mismatch of number of traces and trace pointer block size.") self._dp.nt = ntraces log.info("Number of data traces in file: %d.", self.nt) # Define string and line terminator st_size, fstchar, sstchar, lt_size, fltchar, sltchar = unpack_from("BccBcc", fdb, 8) log.debug("String terminator size: %d, characters '%s','%s'", st_size, fstchar, sstchar) log.debug("Line terminator size: %d, characters '%s','%s'", lt_size, fltchar, sltchar) # Assemble the string terminator if st_size == 1: self._sg2.sterm = fstchar elif st_size == 2: self._sg2.sterm = fstchar + sstchar else: raise ValueError("String terminator has wrong size.") # Assemble the line terminator if lt_size == 1: self._sg2.lterm = fltchar elif lt_size == 2: self._sg2.lterm = fltchar + sltchar else: raise ValueError("Line terminator has wrong size.") return tpb_size def _read_trace_pointer_subblock(self, fio, tpb_size): """ Read the Trace Pointer Subblock. The Trace Pointer Subblock starts at byte 32, and contains pointers (unsigned long integers) to the start of each Trace Descriptor Block contained in the file. The length of this subblock in bytes (M) is specified in bytes 4 and 5, and the number of pointers contained in the subblock (N) is specified in bytes 6 and 7 (see above). This fixed format allows a file to be built up one trace at a time by upgrading the value N and adding a pointer to the new trace in the Trace Pointer Subblock, but without otherwise changing the file format. """ fio.seek(32, 0) tpb = fio.read(tpb_size) self._sg2.trcptr = np.array(unpack_from(self._fp.endian+("L"*self.nt), tpb), dtype="int64") def _parse_free_format_section(self, header, free_form_str): """ Parse the free form section. Parameter --------- header : dict The header dictionary to update. free_form_str : str The free-form string to parse. """ def cleanup_and_decode_string(value): def is_good_char(c): return c in (b'0123456789' b'abcdefghijklmnopqrstuvwxyz' b'ABCDEFGHIJKLMNOPQRSTUVWXYZ' b'!"#$%&\'()*+,-./:; <=>?@[\\]^_`{|}~ ') return "".join(map(chr, filter(is_good_char, value))).strip() offset = 0 strings = [] while offset+2 < len(free_form_str): strlen = np.frombuffer(free_form_str, dtype=f"{self._fp.endian}H", count=1, offset=offset)[0] if strlen == 0: break curstr = free_form_str[offset+2:offset+strlen] try: curstrlen = curstr.index(self._sg2.sterm) except ValueError: strings.append(curstr) else: strings.append(curstr[:curstrlen]) offset += strlen for string in strings: string = string.strip().split(b" ", 1) key = cleanup_and_decode_string(string[0]) try: value = string[1] except IndexError: value = "" if key == "NOTE": if len(value) > 0: value = [cleanup_and_decode_string(line) for line in value.split(self._sg2.lterm) if line] else: value = "" else: value = cleanup_and_decode_string(value) header[key] = value return header def _check_fdb_strings(self): """Check strings in file descriptor block.""" for key in self._sg2.fheader: if key not in tools._SEG2FILEDESCSTR: log.warning("Unknown string '%s' found in free format section.", key) log.warning("Value associated with this key: %s.", self._sg2.fheader[key]) def _read_trace_descriptor_block(self, fio): """ Read first part of a trace descriptor block. The file pointer must have been positioned correctly. """ tdb = fio.read(32) # Check validity of block magic = np.frombuffer(tdb, dtype=f"{self._fp.endian}H", count=1) if magic != 17442: log.error("Trace descriptor block does not start 0x4422.") raise ValueError("Invalid trace descriptor block magic number.") else: log.info("Valid SEG2 trace descriptor found on first trace.") self._tr.thsize = np.frombuffer(tdb, dtype=f"{self._fp.endian}H", count=1, offset=2)[0] trdsize = np.frombuffer(tdb, dtype=f"{self._fp.endian}I", count=1, offset=4)[0] self._tr.trsize = self._tr.thsize + trdsize self._dp.ns = np.frombuffer(tdb, dtype=f"{self._fp.endian}I", count=1, offset=8)[0] self._fp.datfmt = np.frombuffer(tdb, dtype=f"{self._fp.endian}H", count=1, offset=12)[0] if self._fp.datfmt not in tools._SEG2DATAFORMAT: raise ValueError(f"SEG2 data format '{self._fp.datfmt}' not supported.") log.info("Number of samples on first data trace: %d.", self.ns) log.info("Data sample format: %s.", tools._SEG2DATAFORMAT[self._fp.datfmt]["desc"]) return self._tr.thsize def _check_tdb_strings(self): """Check strings in (first) trace descriptor block.""" for i, key in enumerate(tools._SEG2TRACEDESCSTR): self._tr.thdict[key] = {"byte": i+1, "type": "str", "desc": tools._SEG2TRACEDESCALIAS[i]} i = len(tools._SEG2TRACEDESCSTR) for key in self._sg2.theader: if (key not in tools._SEG2TRACEDESCSTR): i += 1 log.warning("Unknown string '%s' found in trace descriptor block of 1st trace.", key) log.warning("Adding string to trace header table.") self._tr.thdict[key] = {"byte": i, "type": "str", "desc": "...non-standard SEG2 string..."} self._tr.thdict["NSAMPLES"] = {"byte": i+1, "type": "str", "desc": "Number of samples"} self._dp.si = float(self._sg2.theader[tools._SEG2TRACEDESCSTR[25]]) def read_dataset(self, silent=False, history=None): """Get all traces - an alias for read_all_traces().""" return self.read_all_traces(silent=silent, history=history) def read_all_traces(self, silent=False, history=None): """ Get all traces (e.g., read the entire file). Parameters ---------- silent : bool, optional (default: False) Whether to suppress all standard logging (True) or not (False). history : list, optional (default: None) Processing history as list of strings. Returns ------- Numpy array, Pandas dataframe Data and trace headers. """ if not silent: log.info("Reading entire data set (%d traces) from disk...", self.nt) st = time.time() all_headers = [] all_data = [] with open(self._fp.file, "rb") as fio: for trace in np.arange(self.nt): trp = self._sg2.trcptr[trace] fio.seek(trp, 0) d, h = self._parse_trace(fio, trace) all_headers.append(h) all_data.append(d) max_ns = self.ns if not self._fp.fixed: for d in all_data: max_ns = len(d) if len(d) > max_ns else max_ns if max_ns != self.ns: self._dp.ns = max_ns log.warning("Updated number of samples to maximum in data set: %d.", self.ns) data = np.zeros((self.nt, max_ns), dtype=tools._SEG2DATAFORMAT[self._fp.datfmt]["dtype"]) df = pd.DataFrame(columns=self.mnemonics, index=range(self.nt)) if not self._fp.fixed: for i, d in enumerate(all_data): if len(d) < max_ns: data[i, 0:len(d)] = all_data[i] else: data[i, :] = all_data[i] for key in self.mnemonics: df.at[i, key] = all_headers[i].get(key, 0) else: for i in np.arange(self.nt): data[i, :] = all_data[i] for key in self.mnemonics: df.at[i, key] = all_headers[i].get(key, 0) et = time.time() if not silent: diff = et-st if diff < 0.1: log.info("Reading all traces took %.3f seconds.", et-st) else: log.info("Reading all traces took %.1f seconds.", et-st) if history is not None: history.append(f"seisio {__version__}: read entire data set '{self._fp.file.absolute()}', " f"ntraces={self.nt:d}, nsamples={self.ns:d}.") return data, df def read_all_headers(self, silent=False): """ Get all trace headers. Parameters ---------- silent : bool, optional (default: False) Whether to suppress all standard logging (True) or not (False). Returns ------- Pandas dataframe Trace header table. """ if not silent: log.info("Reading all (i.e., %d) trace headers from disk...", self.nt) st = time.time() all_headers = [] with open(self._fp.file, "rb") as fio: for trace in np.arange(self.nt): trp = self._sg2.trcptr[trace] fio.seek(trp, 0) _, h = self._parse_trace(fio, trace, headers_only=True) all_headers.append(h) df = pd.DataFrame(columns=self.mnemonics, index=range(self.nt)) for i in np.arange(len(all_headers)): for key in self.mnemonics: df.at[i, key] = all_headers[i].get(key, 0) et = time.time() if not silent: diff = et-st if diff < 0.1: log.info("Reading all headers took %.3f seconds.", et-st) else: log.info("Reading all headers took %.1f seconds.", et-st) return df def _parse_trace(self, fio, itrc, headers_only=False): """Parse next trace.""" tdb = fio.read(32) # Check validity of block magic = np.frombuffer(tdb, dtype=f"{self._fp.endian}H", count=1) if magic != 17442: log.error("Trace descriptor block %d does not start 0x4422.", itrc) raise ValueError("Invalid trace descriptor block magic number.") tdb_size = np.frombuffer(tdb, dtype=f"{self._fp.endian}H", count=1, offset=2)[0] ns = np.frombuffer(tdb, dtype=f"{self._fp.endian}I", count=1, offset=8)[0] if ns != self.ns: log.warning("Number of samples varies in file %s!", self._fp.file) self._fp.fixed = False if self._fp.datfmt == 3: if ns % 4 != 0: log.error("Data format code 3 requires that the no. of samples is divisible by 4!") log.error("However, ns=%d on trace %d.", ns, itrc) raise RuntimeError("SEG2 file violating data format code 3 specs.") else: nsfmt3 = int((2.5*ns)/2) dtype = np.dtype(f"({nsfmt3},){self._fp.endian}" f"{tools._SEG2DATAFORMAT[self._fp.datfmt]['type']}") else: dtype = np.dtype(f"({ns},){self._fp.endian}" f"{tools._SEG2DATAFORMAT[self._fp.datfmt]['type']}") header = {} free_form_str = fio.read(tdb_size-32) header = self._parse_free_format_section(header, free_form_str) header["NSAMPLES"] = str(ns) if headers_only: data = None else: data = np.fromfile(fio, dtype=dtype, count=1).squeeze() if self._fp.datfmt == 3: # The following conversion code stems from obspy, Lion Krischer, # licensed under LGPL v3 - full credits to the original author # # Convert one's complement to two's complement by adding one to # negative numbers. one_to_two = (data < 0) # The first two bytes (1 word) of every 10 bytes (5 words) contains # a 4-bit exponent for each of the 4 remaining 2-byte (int16) samples. exponents = data[0::5].view(self._fp.endian + 'u2') result = np.empty(ns, dtype=np.int32) # Apply the negative correction, then multiply by correct exponent. result[0::4] = ((data[1::5] + one_to_two[1::5]) * 2**((exponents & 0x000f) >> 0)) result[1::4] = ((data[2::5] + one_to_two[2::5]) * 2**((exponents & 0x00f0) >> 4)) result[2::4] = ((data[3::5] + one_to_two[3::5]) * 2**((exponents & 0x0f00) >> 8)) result[3::4] = ((data[4::5] + one_to_two[4::5]) * 2**((exponents & 0xf000) >> 12)) data = result return data, headerAncestors
- SeisIO
- abc.ABC
Instance variables
prop fheader-
Get information from the free-form file header.
Returns
dict- The file header as dictionary.
Expand source code
@property def fheader(self): """ Get information from the free-form file header. Returns ------- dict The file header as dictionary. """ return self._sg2.fheader prop vsi-
Get the (vertical) sampling interval (of first trace).
Returns
float- Sampling interval.
Expand source code
@property def vsi(self): """ Get the (vertical) sampling interval (of first trace). Returns ------- float Sampling interval. """ return self._dp.si
Methods
def read_all_headers(self, silent=False)-
Get all trace headers.
Parameters
silent:bool, optional(default: False)- Whether to suppress all standard logging (True) or not (False).
Returns
Pandas dataframe- Trace header table.
def read_all_traces(self, silent=False, history=None)-
Get all traces (e.g., read the entire file).
Parameters
silent:bool, optional(default: False)- Whether to suppress all standard logging (True) or not (False).
history:list, optional(default: None)- Processing history as list of strings.
Returns
Numpy array, Pandas dataframe- Data and trace headers.
def read_dataset(self, silent=False, history=None)-
Get all traces - an alias for read_all_traces().
Inherited members