Coverage for C:\Users\peaco\OneDrive\Documents\GitHub\mth5\mth5\utils\extract_subset_mth5.py: 74%
72 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-10 00:01 -0800
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-10 00:01 -0800
1import pathlib
3import pandas
4from loguru import logger
6from mth5.data.make_mth5_from_asc import _add_survey
7from mth5.mth5 import MTH5
8from mth5.timeseries import ChannelTS, RunTS
9from mth5.utils.helpers import add_filters, station_in_mth5, survey_in_mth5
def extract_subset(
    source_file: pathlib.Path,
    target_file: pathlib.Path,
    subset_df: pandas.DataFrame,
    filters: str = "all",
):
    """
    Copy the runs/channels listed in ``subset_df`` from one MTH5 file to another.

    This function is a proof-of-concept of issue 219: exporting a subset.

    The target file is created/opened with the same ``file_version`` as the
    source.  Both files are always closed before returning, including when an
    exception is raised part-way through the copy.

    TODO: add check that subset_df is a subset of source_file
    TODO: add tests for source/target v0.1.0
    TODO: add tests for source/target v0.2.0
    TODO: Consider add tests for source v0.1.0/target v0.2.0
    TODO: Consider add tests for source v0.2.0/target v0.1.0

    :param source_file: Where the data will be extracted from
    :param target_file: Where the data will be exported to
    :param subset_df: description of the data to extract.  It is grouped by
     its "survey", "station" and "run" columns, and its "component" column
     selects the channels to copy for each run.
    :param filters: whether to bring all the filters or only those that are
     needed to describe the data.  Right now only "all" (case-insensitive)
     triggers any filter copying; any other value copies no filters.
     TODO: support "required_only" filters, meaning that we only bring the
     filters from the selected channels.

    :return: None

    """
    m_source = MTH5(source_file)
    m_source.open_mth5()
    try:
        m_target = MTH5(target_file, file_version=m_source.file_version)
        m_target.open_mth5()
        try:
            _copy_subset_runs(m_source, m_target, subset_df, filters)
        finally:
            m_target.close_mth5()
    finally:
        # Release the source handle even if opening or writing the target failed.
        m_source.close_mth5()
    return


def _copy_subset_runs(m_source, m_target, subset_df, filters):
    """Copy every (survey, station, run) group of ``subset_df`` into ``m_target``."""
    groupby = ["survey", "station", "run"]
    logger.info(f"Testing file_version {m_source.file_version}")
    for (survey_id, station_id, run_id), run_df in subset_df.groupby(groupby):
        survey = m_source.get_survey(survey_id)

        # TODO: The following assert is a nice-to-have, but is not robust to
        #  the case where survey_id is None
        # assert survey.metadata.id == survey_id

        # Check if survey already in mth5, don't add again (it's cleaner but
        # won't actually matter in results)
        if not survey_in_mth5(m_target, survey.metadata.id):
            logger.info(f"Survey {survey_id} not in mth5 -- Adding")
            # could be done using mth5, but need to handle 0.1.0, 0.2.0
            _add_survey(m_target, survey.metadata)
        else:
            logger.info(f"Survey {survey_id} already in target mth5")

        # Add filters
        if filters.lower() == "all":
            filters_to_add = _get_list_of_filters_to_add_to_target_mth5(
                m_source, m_target, survey_id=survey_id
            )
            # TODO: make this only get the filters from the relevant channels
            if filters_to_add:
                add_filters(m_target, filters_to_add, survey_id=survey_id)

        source_station_obj = m_source.get_station(station_id, survey_id)
        if not station_in_mth5(m_target, station_id, survey_id):
            logger.info(f"Need to make station {station_id}")
            target_station_obj = m_target.add_station(
                station_id,
                station_metadata=source_station_obj.metadata,
                survey=survey_id,
            )
        else:
            logger.info(f"station {station_id} already in target mth5")
            target_station_obj = m_target.get_station(station_id, survey=survey_id)

        source_run_obj = m_source.get_run(station_id, run_id, survey=survey_id)
        logger.info(f"source_run_obj: {source_run_obj}")

        target_channels = run_df.component.to_list()
        source_channels = source_run_obj.channel_summary.component.to_list()
        if set(source_channels) == set(target_channels):
            logger.info(
                "channels in source and target are same -- just map whole RunTS "
            )
            target_runts = source_run_obj.to_runts()
        else:
            msg = "there are a lot of edge cases to worry about here -- Help Wanted"
            logger.info(msg)
            target_runts = _subset_runts(source_run_obj, target_channels)

        # TODO: reuse an existing run group in the target (get_run) instead of
        #  always calling add_run.
        target_run_group = target_station_obj.add_run(run_id)
        target_run_group.from_runts(target_runts)


def _subset_runts(source_run_obj, components):
    """Build a RunTS holding only ``components`` of ``source_run_obj``."""
    ch_list = []
    for comp in components:
        source_chts = source_run_obj.get_channel(comp).to_channel_ts()
        target_chts_metadata = source_chts.channel_metadata.copy()
        ch_list.append(
            ChannelTS(
                channel_type=target_chts_metadata.type,
                data=source_chts.data_array.data,
                channel_metadata=target_chts_metadata.to_dict(),
            )
        )
    target_runts = RunTS(array_list=ch_list)
    # Keep the source run id on the rebuilt run.
    target_runts.run_metadata.id = source_run_obj.metadata.id
    return target_runts
def _get_list_of_filters_to_add_to_target_mth5(m_source, m_target, survey_id=None):
    """
    Collect the filters present in the source file but missing from the target.

    For file version "0.1.0" the filters are read from the file-level
    ``filters_group`` of each MTH5 object.  For "0.2.0" they are read from the
    ``filters_group`` of the survey ``survey_id`` in each file, so for v0.2.0
    ``m_target`` must already have that survey group.

    Parameters
    ----------
    m_source
        Source MTH5 object; its ``file_version`` selects where filters live.
    m_target
        Target MTH5 object whose existing filter names are excluded.
    survey_id
        Survey to look up in both files (only used for file version "0.2.0").

    Returns
    -------
    list
        Filter objects built with the source ``filters_group.to_filter_object``
        for every source filter name not already in the target.  Empty when
        nothing is missing or when ``file_version`` is neither "0.1.0" nor
        "0.2.0".

    """
    version = m_source.file_version
    if version == "0.1.0":
        source_filters = m_source.filters_group
        target_filters = m_target.filters_group
    elif version == "0.2.0":
        source_filters = m_source.get_survey(survey_id).filters_group
        target_filters = m_target.get_survey(survey_id).filters_group
    else:
        # Unrecognised file version: nothing to copy.
        return []

    # Read the target's filter names once instead of once per source filter.
    existing_names = set(target_filters.filter_dict.keys())
    return [
        source_filters.to_filter_object(name)
        for name in source_filters.filter_dict.keys()
        if name not in existing_names
    ]