1# -*- coding: utf-8 -*-
2"""
3Created on Sat Dec 4 14:13:37 2021
4
5@author: jpeacock
6"""
7# =============================================================================
8# Imports
9# =============================================================================
10from collections import OrderedDict
11
12from pydantic import Field, PrivateAttr
13
14from mt_metadata.base import MetadataBase
15
16
17# ==============================================================================
18# Info object
19# ==============================================================================
class Information(MetadataBase):
    """
    Contain, read, and write the ``>INFO`` section of an .edi file.

    Parsed entries are kept in :attr:`info_dict` in the order they were read.
    ``read_info`` recognizes three layouts of the info section and picks the
    matching parser and translation table:

    * standard ``key=value`` / ``key: value`` lines,
    * Phoenix blocks (detected by a "run information" line), whose
      two-column lines are split so the entries become sequential,
    * Empower blocks (detected by "empower"/"electrics"/"magnetics" lines),
      which are hierarchical (section -> component -> key).

    Recognized keys are renamed to dotted metadata names such as
    ``run.ex.dipole_length``; ``write_info`` emits the dictionary back as
    ``>INFO`` lines.

    """

    # Parsed content of the info section: key -> string, list of strings, or
    # None/"" for lines that carry no value.
    info_dict: dict[str, str | list | None] = Field(
        default_factory=dict,
        description="Dictionary of information lines from the info section",
    )
    # Presumably the width of one column in Phoenix two-column info blocks; no
    # method visible in this class reads it -- TODO confirm it is still needed.
    _phoenix_col_width: int = PrivateAttr(default=38)
    # Format flags, reset and then set by read_info while scanning the section.
    _phoenix_file: bool = PrivateAttr(default=False)
    _empower_file: bool = PrivateAttr(default=False)
    # Phoenix info key (lower case) -> dotted metadata key, consumed by
    # _apply_phoenix_translation.
    # NOTE(review): that method returns early for any key containing
    # "voltage", so the list-valued "ex voltage"/"ey voltage" entries are not
    # looked up, and "xpr weighting" is stored under the literal key
    # "processing_parameter" (the Phoenix path has no special handling for it).
    _phoenix_translation_dict: dict[str, str | list] = PrivateAttr(
        default_factory=lambda: {
            "survey": "survey.id",
            "company": "station.acquired_by.organization",
            "job": "survey.project",
            "hardware": "run.data_logger.model",
            "mtuprog version": "run.data_logger.firmware.version",
            "xpr weighting": "processing_parameter",
            "hx sen": "run.hx.sensor.id",
            "hy sen": "run.hy.sensor.id",
            "hz sen": "run.hz.sensor.id",
            "rx sen": "run.rrhx.sensor.id",
            "ry sen": "run.rrhy.sensor.id",
            "stn number": "station.id",
            "mtu-box serial number": "run.data_logger.id",
            "ex pot resist": "run.ex.contact_resistance.start",
            "ey pot resist": "run.ey.contact_resistance.start",
            "ex voltage": ["run.ex.ac.start", "run.ex.dc.start"],
            "ey voltage": ["run.ey.ac.start", "run.ey.dc.start"],
            "start-up": "station.time_period.start",
            "end-time": "station.time_period.end",
        }
    )

    # Standard info key (lower case) -> dotted metadata key, consumed by
    # _parse_standard_info.  The value "processing_parameter" is a sentinel:
    # such entries are appended as "key=value" to
    # info_dict["transfer_function.processing_parameters"].  Note that
    # "remoteref" maps to that (plural) key directly rather than to the
    # sentinel.
    _translation_dict: dict[str, str] = PrivateAttr(
        default_factory=lambda: {
            "operator": "run.acquired_by.author",
            "adu_serial": "run.data_logger.id",
            "e_azimuth": "run.ex.measurement_azimuth",
            "ex_len": "run.ex.dipole_length",
            "ey_len": "run.ey.dipole_length",
            "ex_resistance": "run.ex.contact_resistance.start",
            "ey_resistance": "run.ey.contact_resistance.start",
            "h_azimuth": "run.hx.measurement_azimuth",
            "hx": "run.hx.sensor.id",
            "hy": "run.hy.sensor.id",
            "hz": "run.hz.sensor.id",
            "hx_resistance": "run.hx.h_field_max.start",
            "hy_resistance": "run.hy.h_field_max.start",
            "hz_resistance": "run.hz.h_field_max.start",
            "algorithmname": "transfer_function.software.name",
            "ndec": "processing_parameter",
            "nfft": "processing_parameter",
            "ntype": "processing_parameter",
            "rrtype": "processing_parameter",
            "removelargelines": "processing_parameter",
            "rotmaxe": "processing_parameter",
            "project": "survey.project",
            "processedby": "transfer_function.processed_by.name",
            "processingsoftware": "transfer_function.software.name",
            "processingtag": "transfer_function.id",
            "signconvention": "transfer_function.sign_convention",
            "sitename": "station.geographic_name",
            "survey": "survey.id",
            "year": "survey.time_period.start_date",
            "runlist": "transfer_function.runs_processed",
            "remotesite": "transfer_function.remote_references",
            "remoteref": "transfer_function.processing_parameters",
        }
    )
    # Empower info key (lower case) -> metadata name, consumed by
    # _get_empower_std_key.  Values are used as-is in the "general" section;
    # elsewhere they are prefixed with "run.", "run.<component>." or
    # "transfer_function.remote_references." depending on section/component.
    # Keys appear in both "with space" and "with underscore" spellings.
    # NOTE(review): "azimuth" maps to "measured_azimuth" here while the
    # standard table above uses "measurement_azimuth" -- confirm which
    # attribute name the consumers of info_dict expect.
    _empower_translation_dict: dict[str, str] = PrivateAttr(
        default_factory=lambda: {
            "processingsoftware": "transfer_function.software.name",
            "sitename": "station.geographic_name",
            "year": "survey.time_period.start_date",
            "process_date": "transfer_function.processed_date",
            "declination": "station.location.declination.value",
            "tag": "component",
            "length": "dipole_length",
            "ac": "ac.end",
            "dc": "dc.end",
            "negative res": "contact_resistance.start",
            "negative_res": "contact_resistance.start",
            "positive res": "contact_resistance.end",
            "positive_res": "contact_resistance.end",
            "sensor type": "sensor.model",
            "sensor_type": "sensor.model",
            "detected sensor type": "sensor.model",
            "azimuth": "measured_azimuth",
            "sensor serial": "sensor.id",
            "sensor_serial": "sensor.id",
            "cal name": "comments",
            "cal_name": "comments",
            "saturation": "comments",
            "instrument type": "data_logger.model",
            "station name": "geographic_name",
            "operator": "acquired_by.author",
            "recording id": "id",
            "min value": "comments",
            "max value": "comments",
        }
    )
129
130 def __str__(self):
131 return "".join(self.write_info())
132
133 def __repr__(self):
134 return self.__str__()
135
136 def read_info(self, edi_lines: list[str]) -> None:
137 """
138 Read information section and parse directly to info_dict.
139
140 Parameters
141 ----------
142 edi_lines : list[str]
143 List of lines from the EDI file.
144 """
145 self.info_dict = OrderedDict()
146 self._phoenix_file = False
147 self._empower_file = False
148
149 # 1. Identify the info section and detect format in a single pass
150 info_section = []
151 info_started = False
152
153 for line in edi_lines:
154 line = line.strip()
155
156 # Check for start/end markers
157 if ">info" in line.lower():
158 info_started = True
159 continue
160 elif info_started and line and line[0] == ">":
161 break
162
163 # Collect info lines for processing
164 if info_started and line:
165 # Detect format while collecting
166 if "run information" in line.lower():
167 self._phoenix_file = True
168 elif (
169 ("empower" in line.lower() and "v" in line.lower())
170 or "electrics" in line.lower()
171 or "magnetics" in line.lower()
172 ):
173 self._empower_file = True
174
175 info_section.append(line)
176
177 # 2. Parse lines based on detected format
178 if self._empower_file:
179 self._parse_empower_info(info_section)
180 self._comments_to_string()
181 elif self._phoenix_file:
182 self._parse_phoenix_info(info_section)
183 self._comments_to_string()
184 else:
185 self._parse_standard_info(info_section)
186 self._comments_to_string()
187
188 def _comments_to_string(self) -> None:
189 """Convert list comments to a single string."""
190 for key, value in self.info_dict.items():
191 if "comment" in key and isinstance(value, list):
192 self.info_dict[key] = ",".join(value)
193
194 def _get_separator(self, line: str) -> str | None:
195 """Find the key-value separator in a line."""
196 sep = None
197 if line.count(":") > 0 and line.count("=") > 0:
198 if line.find(":") < line.find("="):
199 sep = ":"
200 else:
201 sep = "="
202
203 elif line.count(":") >= 1:
204 sep = ":"
205 # colon_find = line.find(":")
206 elif line.count("=") >= 1:
207 sep = "="
208
209 return sep
210
211 def _parse_standard_info(self, info_lines: list[str]) -> None:
212 """Parse standard format EDI info lines directly to info_dict."""
213 for line in info_lines:
214 # Skip empty lines and section headers
215 if not line or "<" in line or ">" in line:
216 continue
217
218 # Get separator and parse key/value
219 sep = self._get_separator(line)
220 if not sep:
221 self.info_dict[line.strip()] = ""
222 continue
223
224 parts = line.split(sep, 1)
225 if len(parts) != 2:
226 continue
227
228 key = parts[0].strip().lower()
229 value = parts[1].strip()
230
231 # Handle list values
232 if value.startswith("[") and value.endswith("]"):
233 value = [
234 v.strip()
235 for v in value[1:-1]
236 .replace(",", " ")
237 .replace(";", " ")
238 .replace(":", " ")
239 .split()
240 ]
241
242 # Apply translation dictionary
243 std_key = self._translation_dict.get(key)
244
245 if std_key is not None:
246 # Handle special processing parameters
247 if std_key == "processing_parameter":
248 tf_parameters = self.info_dict.get(
249 "transfer_function.processing_parameters", []
250 )
251 if not isinstance(tf_parameters, list):
252 tf_parameters = [tf_parameters]
253 tf_parameters.append(f"{key}={value}")
254 self.info_dict["transfer_function.processing_parameters"] = (
255 tf_parameters
256 )
257 else:
258 self.info_dict[std_key] = value
259 else:
260 # Store unrecognized keys with original name
261 self.info_dict[key] = value
262
263 def _parse_phoenix_info(self, info_lines: list[str]) -> None:
264 """Parse Phoenix format EDI info lines efficiently."""
265 for line in info_lines:
266 # Process each line for potential multi-column content
267 is_multi_column, columns = self._split_phoenix_columns(line)
268
269 for column in columns:
270 sep = self._get_separator(column)
271 if not sep:
272 continue
273
274 parts = column.split(sep, 1)
275 if len(parts) != 2:
276 continue
277
278 key = parts[0].strip().lower()
279 value = parts[1].strip()
280 if value.count(" ") > 0:
281 value = value.split(" ")[0].strip() # Apply Phoenix translation
282 self._apply_phoenix_translation(key, value)
283
    def _parse_empower_info(self, info_lines: list[str]) -> None:
        """
        Parse Empower format EDI info lines into ``info_dict``.

        Empower format has a hierarchical structure with sections for
        general info, electrics, magnetics, and reference stations.  The
        parser walks the lines once and tracks three pieces of state:

        * ``section`` -- "general" until a section header line ("stations",
          "electrics", "magnetics", "reference", "editing workbench") is seen,
        * ``sub_section`` -- set together with ``section`` for electrics,
          magnetics and reference,
        * ``component`` -- the last component header seen ("ex", "hx", "e1",
          "rx", ...).

        Every ``key<sep>value`` line is cleaned (bracketed units and degree
        symbols removed), translated with ``_get_empower_std_key`` and stored.
        Lines that translate to nothing are stored under a key prefixed with
        the current section/component.

        Parameters
        ----------
        info_lines : list[str]
            Lines of the info section.

        Notes
        -----
        ``read_info`` strips every line before passing it in, so on that path
        ``indent_level`` below is always 0 and the ``indent_level <= 5`` test
        is always true.
        """
        section = "general"
        component = None
        sub_section = None

        # Process all lines and handle hierarchical structure
        for line in info_lines:
            original_line = line
            line = line.strip()

            # Skip empty lines
            if not line:
                continue

            # Get indentation level to understand hierarchy
            indent_level = len(original_line) - len(original_line.lstrip())

            # Check for main section headers (typically at low indentation)
            line_lower = line.lower()
            if indent_level <= 5:  # Main sections are usually at low indentation
                # NOTE(review): `component` is not cleared when a new section
                # starts, so key/value lines that follow a section header
                # before its first component header are still attributed to
                # the previous component -- confirm this is intended.
                if line_lower == "stations":
                    section = "stations"
                    continue
                elif line_lower == "electrics":
                    section = "electrics"
                    sub_section = "electrics"
                    continue
                elif line_lower == "magnetics":
                    section = "magnetics"
                    sub_section = "magnetics"
                    continue
                elif line_lower == "reference":
                    section = "reference"
                    sub_section = "reference"
                    continue

            # Component-level headers (e.g., "EX", "EY", "HX", "HY", etc.)
            if section in ["electrics", "magnetics", "reference"] or sub_section in [
                "electrics",
                "magnetics",
                "reference",
            ]:
                # Check if this is a component header (no separator and matches component pattern)
                if self._get_separator(line) is None and line_lower in [
                    "ex",
                    "ey",
                    "hx",
                    "hy",
                    "hz",
                    "rx",
                    "ry",
                    "e1",
                    "e2",
                    "h1",
                    "h2",
                    "h3",
                ]:  # Components are typically more indented
                    component = line_lower
                    continue

            # Regular key-value pairs
            sep = self._get_separator(line)
            if not sep:
                # Handle special cases for lines without separators: these two
                # headers switch the section; any other separator-less line is
                # ignored.
                if line_lower in ["editing workbench", "stations"]:
                    section = line_lower.replace(" ", "_")
                continue

            parts = line.split(sep, 1)
            if len(parts) != 2:
                continue

            key = parts[0].strip().lower()
            value = parts[1].strip()

            # Clean up value (remove units in brackets and degree symbol).
            # A "[" within the first three characters is left alone so that
            # values that are themselves bracketed lists are not mangled.
            if value.find("[") > 2:  # need to avoid values that are lists
                value = value.replace("[", "").replace("]", "").split(",")
                if len(value) == 1:
                    value = value[0].strip()
                    value = value.split(" ")[0]  # remove units
                else:
                    value = ",".join(v.strip() for v in value)

            # Also drops the stray character left by a mis-decoded degree sign
            value = value.replace("°", "").replace("Â", "").strip()

            # Build the key based on section/component context
            std_key = self._get_empower_std_key(section, component, key, sub_section)

            # special case handling
            if std_key:
                if "remote_references." in std_key:
                    # skip these for now
                    if (
                        "acquired_by" in std_key
                        or "data_logger" in std_key
                        or "author" in std_key
                    ):
                        continue
                    if "azimuth" in std_key:
                        # Only skip azimuth if it's in a problematic context, not for measured_azimuth
                        if "measured_azimuth" not in std_key:
                            continue
                if "component" in std_key:
                    # "tag" entries record the current component name rather
                    # than the value written in the file
                    value = component
                # NOTE(review): these are substring tests, so "hx"/"hy" also
                # match the remote components "rrhx"/"rrhy".
                if "hx" in std_key or "hy" in std_key or "hz" in std_key:
                    if "acquired_by" in std_key or "data_logger" in std_key:
                        # Handle author information for Hx/Hy/Hz: drop the
                        # component part so the key refers to the run itself
                        std_key = (
                            std_key.replace(".hx.", ".")
                            .replace(".hy.", ".")
                            .replace(".hz.", ".")
                        )
                    elif "ac" in std_key or "dc" in std_key:
                        # Handle AC/DC values for Hx/Hy/Hz.
                        # NOTE(review): "ac"/"dc" are matched and replaced as
                        # substrings anywhere in the key, e.g. "run.hx.ac.end"
                        # becomes "run.hx.comments.end" and the "ac" inside
                        # "contact_resistance" would be rewritten too --
                        # confirm the resulting keys are the intended ones.
                        std_key = std_key.replace("ac", "comments").replace(
                            "dc", "comments"
                        )

                if "comments" in std_key:
                    # Comment-like entries accumulate as a list of "key=value"
                    # strings; read_info joins them into one string afterwards
                    original_value = self.info_dict.get(std_key, [])
                    if not isinstance(original_value, list):
                        original_value = [] if not original_value else [original_value]
                    original_value.append(f"{key}={value}")
                    value = original_value
                elif "data_logger.model" in std_key:
                    std_key = "run.data_logger.model"
                elif std_key.endswith(".id") and "sensor.id" not in std_key:
                    # Only map recording IDs, not sensor IDs
                    std_key = "run.id"
                elif "geographic_name" in std_key:
                    if "remote_references" in std_key:
                        std_key = "transfer_function.remote_references.geographic_name"
                    else:
                        std_key = "station.geographic_name"
                elif "author" in std_key:
                    std_key = "run.acquired_by.author"
                self.info_dict[std_key] = value

            else:
                # For unrecognized keys, store with section prefix
                if component:
                    context_key = f"{section}.{component}.{key}"
                elif sub_section and sub_section != section:
                    context_key = f"{sub_section}.{key}"
                elif section != "general":
                    context_key = f"{section}.{key}"
                else:
                    context_key = key
                self.info_dict[context_key] = value
440
441 def _get_empower_std_key(
442 self,
443 section: str,
444 component: str | None,
445 key: str,
446 sub_section: str | None = None,
447 ) -> str | None:
448 """
449 Get standardized key for Empower format based on section and component context.
450
451 Parameters
452 ----------
453 section : str
454 Current section ("general", "electrics", "magnetics", "reference", etc.)
455 component : str
456 Current component (e.g., "ex", "ey", "hx", "hy", "hz", "rx", "ry", None)
457 key : str
458 Original key name
459 sub_section : str, optional
460 Sub-section for additional context
461
462 Returns
463 -------
464 str or None
465 Standardized key name or None if no mapping found
466 """
467 # Handle general section keys
468 if section == "general":
469 mapped_key = self._empower_translation_dict.get(key)
470 if mapped_key:
471 return mapped_key
472 return None
473
474 # Handle component-specific keys
475 if not component:
476 # Handle section-level keys without component context
477 mapped_key = self._empower_translation_dict.get(key)
478 if mapped_key:
479 if section == "reference":
480 return f"transfer_function.remote_references.{mapped_key}"
481 elif sub_section:
482 return f"run.{mapped_key}"
483 else:
484 return mapped_key
485 return None
486
487 # Map component names to standard names
488 component_map = {
489 "ex": "ex",
490 "ey": "ey",
491 "hx": "hx",
492 "hy": "hy",
493 "hz": "hz",
494 "rx": "rrhx", # Remote reference components
495 "ry": "rrhy",
496 "e1": "ex", # Alternative naming
497 "e2": "ey",
498 "h1": "hx",
499 "h2": "hy",
500 "h3": "hz",
501 }
502
503 std_component = component_map.get(component, component)
504
505 # Create run-prefixed attribute key
506 attribute_key = self._empower_translation_dict.get(key)
507 if attribute_key:
508 if section == "reference":
509 return f"transfer_function.remote_references.{std_component}.{attribute_key}"
510 else:
511 return f"run.{std_component}.{attribute_key}"
512
513 # Handle special cases for comments field
514 if key in ["cal name", "cal_name", "saturation", "min value", "max value"]:
515 # Append to comments field
516 if section == "reference":
517 return f"transfer_function.remote_references.{std_component}.comments"
518 else:
519 return f"run.{std_component}.comments"
520
521 # Default case: use run.component.key format
522 if section == "reference":
523 return f"transfer_function.remote_references.{std_component}.{key}"
524 else:
525 return f"run.{std_component}.{key}"
526
527 def _split_phoenix_columns(self, line: str) -> tuple[bool, list[str]]:
528 """
529 Split Phoenix line into columns based on whitespace gaps and separators.
530 Returns (is_multi_column, list_of_columns)
531 """
532 import re
533
534 # Check for basic indicators first
535 if not line or len(line) < 10:
536 return False, [line]
537
538 # Look for patterns that indicate multi-column format
539 parts = [(m.group(), m.start()) for m in re.finditer(r"\S+", line)]
540
541 if len(parts) < 4: # Need at least 4 words for two key-value pairs
542 return False, [line]
543
544 # Calculate word gaps
545 gaps = [
546 parts[i + 1][1] - (parts[i][1] + len(parts[i][0]))
547 for i in range(len(parts) - 1)
548 ]
549
550 # Find the largest gap
551 if not gaps:
552 return False, [line]
553
554 max_gap = max(gaps)
555 if max_gap <= 3: # Too small to be a column separator
556 return False, [line]
557
558 max_gap_idx = gaps.index(max_gap)
559 split_pos = parts[max_gap_idx + 1][1]
560
561 # Check if we have key-value pairs on both sides
562 left_text = line[:split_pos].strip()
563 right_text = line[split_pos:].strip()
564
565 # Verify both columns have separators
566 left_has_sep = ":" in left_text or "=" in left_text
567 right_has_sep = ":" in right_text or "=" in right_text
568
569 if left_has_sep and right_has_sep:
570 return True, [left_text, right_text]
571
572 return False, [line]
573
574 def _apply_phoenix_translation(self, key: str, value: str) -> None:
575 """Apply Phoenix-specific translations and handle special cases."""
576
577 # Remove units for resistance values
578 if "Pot Resist".lower() in key.lower() and isinstance(value, str):
579 value = value.split()[0]
580
581 # Handle voltage with AC/DC
582 if "voltage" in key.lower() and isinstance(value, str):
583 comps = value.replace(" ", "").split(",")
584 for comp in comps:
585 if "=" in comp:
586 typ, val = comp.split("=")
587 typ = typ.lower()
588 val = val.replace("mV", "")
589 std_key = f"run.{key[0:2].lower()}.{typ}.start"
590 self.info_dict[std_key] = val
591 return
592
593 std_key = self._phoenix_translation_dict.get(key.lower(), "phoenix_attribute")
594 if std_key:
595 if isinstance(std_key, list):
596 for kk in std_key:
597 self.info_dict[kk] = value
598 else:
599 self.info_dict[std_key] = value
600 # Add Phoenix sensor metadata for Hx/Hy/Hz
601 if " sen" in key.lower():
602 comp = key.lower().split()[0]
603 self.info_dict[f"{comp}.sensor.manufacturer"] = "Phoenix Geophysics"
604 self.info_dict[f"{comp}.sensor.type"] = "Induction Coil"
605 else:
606 self.info_dict[key] = value
607
608 def write_info(self) -> list[str]:
609 """
610 write out information
611 """
612
613 info_lines = [">INFO\n"]
614
615 for key, value in self.info_dict.items():
616 if key is None:
617 continue
618 if value in ["", None]:
619 info_lines.append(f"{' '*4}{key}\n")
620 continue
621 if isinstance(value, list):
622 value = f"[{', '.join(value)}]"
623 elif isinstance(value, str):
624 value = value.strip()
625 info_lines.append(f"{' '*4}{key}={value}\n")
626
627 return info_lines