1# =====================================================
2# Imports
3# =====================================================
4from typing import Annotated
5
6from loguru import logger
7from pydantic import Field, PrivateAttr
8
9from mt_metadata.base import MetadataBase
10from mt_metadata.base.helpers import validate_name
11
12
13# =====================================================
class DataSection(MetadataBase):
    """
    Metadata for the data section of an EDI file, the small block that maps
    each channel onto its measurement id. A typical block looks like::

        >=MTSECT

            ex=1004.001
            ey=1005.001
            hx=1001.001
            hy=1002.001
            hz=1003.001
            nfreq=14
            sectid=par28ew
            nchan=None
            maxblks=None

    The section is parsed from a list of EDI lines with :meth:`read_data` and
    serialized back to a list of lines with :meth:`write_data`.

    ============ =========================================== ========
    Attribute    Description                                 Default
    ============ =========================================== ========
    nfreq        number of frequencies                       0
    sectid       section id, should be the same as the
                 station name -> Header.dataid               ""
    nchan        number of channels                          0
    maxblocks    maximum number of data blocks               999
    ex           measurement id for EX                       None
    ey           measurement id for EY                       None
    hx           measurement id for HX                       None
    hy           measurement id for HY                       None
    hz           measurement id for HZ                       None
    rrhx         measurement id for RRHX                     None
    rrhy         measurement id for RRHY                     None
    _kw_list     ordered key words written to the file [1]_  see code
    ============ =========================================== ========

    .. [1] Change these values to change what is written to the EDI file.
       :meth:`write_data` and :meth:`read_data` slice this list by position:
       the first four entries are the general keys, the last two are the
       remote reference channels and everything in between is a local
       channel.

    .. note:: NOTE(review): the model field is ``maxblocks`` and
       :meth:`write_data` emits ``MAXBLOCKS``, while the example block above
       uses ``maxblks``. Confirm how a ``MAXBLKS`` key read from a file is
       meant to map onto this model.
    """

    nfreq: Annotated[
        int,
        Field(
            default=0,
            description="Number of frequencies",
            alias=None,
            json_schema_extra={
                "units": None,
                "required": False,
                "examples": [16, 1],
            },
        ),  # type: ignore
    ]

    sectid: Annotated[
        str,
        Field(
            default="",
            description="ID of the station that the data is from. This is important if you have more than one station per file.",
            alias=None,
            json_schema_extra={
                "units": None,
                "required": True,
                "examples": ["mt001"],
            },
        ),
    ]

    nchan: Annotated[
        int,
        Field(
            default=0,
            description="Number of channels in the transfer function",
            alias=None,
            json_schema_extra={
                "units": None,
                "required": True,
                "examples": [7],
            },
        ),
    ]

    maxblocks: Annotated[
        int,
        Field(
            default=999,
            description="Maximum number of data blocks",
            alias=None,
            json_schema_extra={
                "units": None,
                "required": True,
                "examples": [999],
            },
        ),
    ]

    ex: Annotated[
        str | None,
        Field(
            default=None,
            description="Measurement ID for EX",
            alias=None,
            json_schema_extra={
                "units": None,
                "required": True,
                "examples": ["1"],
            },
        ),
    ]

    ey: Annotated[
        str | None,
        Field(
            default=None,
            description="Measurement ID for EY",
            alias=None,
            json_schema_extra={
                "units": None,
                "required": True,
                "examples": ["2"],
            },
        ),
    ]

    hx: Annotated[
        str | None,
        Field(
            default=None,
            description="Measurement ID for HX",
            alias=None,
            json_schema_extra={
                "units": None,
                "required": True,
                "examples": ["3"],
            },
        ),
    ]

    hy: Annotated[
        str | None,
        Field(
            default=None,
            description="Measurement ID for HY",
            alias=None,
            json_schema_extra={
                "units": None,
                "required": True,
                "examples": ["4"],
            },
        ),
    ]

    hz: Annotated[
        str | None,
        Field(
            default=None,
            description="Measurement ID for HZ",
            alias=None,
            json_schema_extra={
                "units": None,
                "required": True,
                "examples": ["5"],
            },
        ),
    ]

    rrhx: Annotated[
        str | None,
        Field(
            default=None,
            description="Measurement ID for RRHX",
            alias=None,
            json_schema_extra={
                "units": None,
                "required": True,
                "examples": ["6"],
            },
        ),
    ]

    rrhy: Annotated[
        str | None,
        Field(
            default=None,
            description="Measurement ID for RRHY",
            alias=None,
            json_schema_extra={
                "units": None,
                "required": True,
                "examples": ["7"],
            },
        ),
    ]

    # Ordered list of keywords for the data section, used for writing
    # metadata. The order matters: entries [0:4] are the general keys,
    # entries [4:-2] the local channels and entries [-2:] the remote
    # reference channels. Modify it to change what is written to the EDI file.
    _kw_list: list[str] = PrivateAttr(
        default_factory=lambda: [
            "nfreq",
            "sectid",
            "nchan",
            "maxblocks",
            "ex",
            "ey",
            "hx",
            "hy",
            "hz",
            "rrhx",
            "rrhy",
        ]
    )

    # Private attributes
    # Index into the EDI lines: set to the section header when it is found and
    # then to the first following line containing ">" (see get_data).
    _line_num: int = PrivateAttr(default=0)

    # Data type for output; write_data supports "z" and "spectra".
    _data_type_out: str = PrivateAttr(default="z")

    # Data type detected on input by get_data, "z" or "spectra".
    _data_type_in: str = PrivateAttr(default="z")

    # List of channel IDs associated with the data section.
    _channel_ids: list[str] = PrivateAttr(default_factory=list)

    def __str__(self) -> str:
        """Return the section as the single string :meth:`write_data` emits."""
        return "".join(self.write_data())

    def __repr__(self) -> str:
        """Return the same text as :meth:`__str__`."""
        return self.__str__()

    def get_data(self, edi_lines: list[str]) -> list[str]:
        """
        Extract the stripped lines that belong to the data section.

        The section starts at the first line containing ``>=`` and ``sect``
        (case-insensitive) and ends at the next line containing ``>``. While
        scanning, ``_data_type_in`` is set to ``"spectra"`` or ``"z"`` from
        the header and ``_line_num`` is updated to the header index and then
        to the index of the terminating line.

        :param edi_lines: lines of the EDI file
        :type edi_lines: list[str]
        :return: stripped section lines longer than two characters
        :rtype: list[str]
        """
        section_lines: list[str] = []
        in_section = False

        for index, line in enumerate(edi_lines):
            lowered = line.lower()
            if ">=" in line and "sect" in lowered:
                in_section = True
                self._line_num = index
                if "spect" in lowered:
                    self._data_type_in = "spectra"
                elif "mt" in lowered:
                    self._data_type_in = "z"
            elif in_section:
                if ">" in line:
                    # the next block marker closes the section
                    self._line_num = index
                    break
                stripped = line.strip()
                if len(stripped) > 2:
                    section_lines.append(stripped)
        return section_lines

    def read_data(self, edi_lines: list[str]) -> None:
        """
        Read the data section and populate the attributes.

        ``key=value`` lines are assigned as attributes (the key is lower-cased
        and stripped; values other than ``sectid`` are converted to ``int``
        when possible, ``sectid`` is passed through ``validate_name``). Lines
        after a ``//`` line are collected as channel ids. If no channel ids
        were listed, they are taken from the channel attributes that are not
        ``None``.

        :param edi_lines: lines of the EDI file
        :type edi_lines: list[str]
        """
        section_lines = self.get_data(edi_lines)

        in_channel_block = False
        self._channel_ids = []
        for d_line in section_lines:
            # partition on the first "=" only, so a value that itself
            # contains "=" is kept whole
            key, separator, raw_value = d_line.partition("=")
            if separator:
                # strip the key so "ex = 1" does not produce the name "ex "
                key = key.strip().lower()
                value = raw_value.strip().replace('"', "")
                if key == "sectid":
                    value = validate_name(value)
                else:
                    try:
                        value = int(value)
                    except ValueError:
                        # not an integer (e.g. "1004.001"), keep the string
                        pass
                setattr(self, key, value)
                continue

            if "//" in d_line:
                # everything after this marker is a list of channel ids
                in_channel_block = True
                continue
            if in_channel_block:
                if len(d_line) > 10:
                    # long lines hold several whitespace separated ids
                    self._channel_ids.extend(d_line.strip().split())
                else:
                    self._channel_ids.append(d_line)

        if not self._channel_ids:
            # no explicit id list: fall back to the channel attributes
            # (everything in _kw_list after the four general keys)
            for comp in self._kw_list[4:]:
                ch_id = getattr(self, comp)
                if ch_id is not None:
                    self._channel_ids.append(ch_id)

    def write_data(
        self, data_list: list[str] | None = None, over_dict: dict | None = None
    ) -> list[str]:
        """
        Write the data section to a list of strings.

        :param data_list: optional EDI lines to read first with
            :meth:`read_data`
        :type data_list: list[str] | None
        :param over_dict: optional attribute overrides applied before writing
            (e.g. ``nfreq`` when re-writing an EDI file)
        :type over_dict: dict | None
        :return: lines of the data section, each ending with a newline
        :rtype: list[str]
        :raises ValueError: if ``_data_type_out`` is not ``"z"`` or
            ``"spectra"``
        """
        # FZ: need to modify the nfreq (number of freqs),
        # when re-writing effective EDI files)
        if over_dict is not None:
            for override_key, override_value in over_dict.items():
                setattr(self, override_key, override_value)

        if data_list is not None:
            self.read_data(data_list)

        logger.debug("Writing out data a impedances")

        if self._data_type_out == "z":
            data_lines = ["\n>=MTSECT\n"]
        elif self._data_type_out == "spectra":
            # the header needs ">=" so that get_data can find the section
            # again when the file is read back
            data_lines = ["\n>=SPECTRASECT\n"]
        else:
            raise ValueError(
                f"Unsupported output data type {self._data_type_out!r}, "
                "must be 'z' or 'spectra'"
            )

        indent = " " * 4
        for key in self._kw_list[0:4]:
            data_lines.append(f"{indent}{key.upper()}={getattr(self, key)}\n")

        # local channels first, then remote reference channels, each group
        # sorted in ascending order of measurement id
        ch_list = [
            (key.upper(), getattr(self, key))
            for key in self._kw_list[4:-2]
            if getattr(self, key) is not None
        ]
        rr_ch_list = [
            (key.upper(), getattr(self, key))
            for key in self._kw_list[-2:]
            if getattr(self, key) is not None
        ]
        ordered_channels = sorted(ch_list, key=lambda item: item[1]) + sorted(
            rr_ch_list, key=lambda item: item[1]
        )

        for name, measurement_id in ordered_channels:
            # an id of zero is not written
            if measurement_id not in [0, "0"]:
                data_lines.append(f"{indent}{name}={measurement_id}\n")

        data_lines.append("\n")

        return data_lines

    def match_channels(self, ch_ids: dict[str, str]) -> None:
        """
        Match the stored channel ids against the provided channel ids.

        Each id in ``_channel_ids`` is reduced to the text after a leading
        ``ch`` (case-insensitive) and compared numerically with every value in
        ``ch_ids``; on a match the attribute named by the lower-cased key is
        set to that value. Ids that are not numeric are reported with a
        warning; values in ``ch_ids`` that are not numeric never match.

        :param ch_ids: mapping of channel name to measurement id
        :type ch_ids: dict[str, str]
        """

        for ch_id in self._channel_ids:
            if isinstance(ch_id, str):
                ch_id = ch_id.lower().split("ch")[-1]
            try:
                ch_number = float(ch_id)
            except (TypeError, ValueError):
                logger.warning(f"Could not match channel {ch_id}")
                continue

            for key, value in ch_ids.items():
                # compare as numbers on both sides so that string ids such as
                # "1004.001" can match as well as numeric ones
                try:
                    is_match = ch_number == float(value)
                except (TypeError, ValueError):
                    continue
                if is_match:
                    setattr(self, key.lower(), value)