Coverage for C: \ Users \ peaco \ OneDrive \ Documents \ GitHub \ mth5 \ mth5 \ utils \ extract_subset_mth5.py: 74%

72 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-10 00:01 -0800

1import pathlib 

2 

3import pandas 

4from loguru import logger 

5 

6from mth5.data.make_mth5_from_asc import _add_survey 

7from mth5.mth5 import MTH5 

8from mth5.timeseries import ChannelTS, RunTS 

9from mth5.utils.helpers import add_filters, station_in_mth5, survey_in_mth5 

10 

11 

def extract_subset(
    source_file: pathlib.Path,
    target_file: pathlib.Path,
    subset_df: pandas.DataFrame,
    filters: str = "all",
) -> None:
    """
    Copy the runs described by ``subset_df`` from one MTH5 file into another.

    This function is a proof-of-concept of issue 219: exporting a subset.

    The target file is opened with the same ``file_version`` as the source.
    Both files are closed before returning, including when an error is raised
    part-way through the copy.

    TODO: add check that subset_df is a subset of source_file
    TODO: add tests for source/target v0.1.0
    TODO: add tests for source/target v0.2.0
    TODO: Consider add tests for source v0.1.0/target v0.2.0
    TODO: Consider add tests for source v0.2.0/target v0.1.0

    :param source_file: Where the data will be extracted from
    :param target_file: Where the data will be exported to
    :param subset_df: description of the data to extract; it is grouped by the
     columns "survey", "station" and "run", and its "component" column names
     the channels wanted from each run
    :param filters: whether to bring all the filters or only those that are
     needed to describe the data.  Right now only "all" (case-insensitive) is
     acted on; any other value copies no filters.
     TODO: support "required_only" filters, meaning that we only bring the
     filters from the selected channels.

    :return: None

    """
    m_source = MTH5(source_file)
    m_source.open_mth5()
    try:
        m_target = MTH5(target_file, file_version=m_source.file_version)
        m_target.open_mth5()
        try:
            logger.info(f"Testing file_version {m_source.file_version}")
            _copy_subset(m_source, m_target, subset_df, filters)
        finally:
            # Always release the target handle, even if the copy failed.
            m_target.close_mth5()
    finally:
        # Always release the source handle, even if opening the target failed.
        m_source.close_mth5()


def _copy_subset(m_source, m_target, subset_df, filters):
    """Copy every (survey, station, run) group of ``subset_df`` from ``m_source`` into ``m_target``."""
    copy_all_filters = filters.lower() == "all"

    # NOTE(review): pandas groupby drops rows whose key is null by default
    # (dropna=True) -- confirm survey/station/run are always populated in
    # subset_df, otherwise those rows are silently skipped.
    for (survey_id, station_id, run_id), run_df in subset_df.groupby(
        ["survey", "station", "run"]
    ):
        survey = m_source.get_survey(survey_id)

        # TODO: asserting survey.metadata.id == survey_id here would be a
        # nice-to-have, but it is not robust to the case survey_id is None.

        # Check if survey already in mth5, don't add again (it is cleaner but
        # won't actually matter in results).
        if not survey_in_mth5(m_target, survey.metadata.id):
            logger.info(f"Survey {survey_id} not in mth5 -- Adding")
            # Could be done using mth5, but need to handle 0.1.0, 0.2.0.
            _add_survey(m_target, survey.metadata)
        else:
            logger.info(f"Survey {survey_id} already in target mth5")

        # Add filters
        # TODO: make this only get the filters from the relevant channels.
        if copy_all_filters:
            filters_to_add = _get_list_of_filters_to_add_to_target_mth5(
                m_source, m_target, survey_id=survey_id
            )
            if filters_to_add:
                add_filters(m_target, filters_to_add, survey_id=survey_id)

        source_station_obj = m_source.get_station(station_id, survey_id)
        if not station_in_mth5(m_target, station_id, survey_id):
            logger.info(f"Need to make station {station_id}")
            target_station_obj = m_target.add_station(
                station_id,
                station_metadata=source_station_obj.metadata,
                survey=survey_id,
            )
        else:
            logger.info(f"station {station_id} already in target mth5")
            target_station_obj = m_target.get_station(station_id, survey=survey_id)

        source_run_obj = m_source.get_run(station_id, run_id, survey=survey_id)
        logger.info(f"source_run_obj: {source_run_obj}")

        target_runts = _subset_runts(source_run_obj, run_df.component.to_list())

        # TODO: try target_station_obj.get_run(run_id) first and only fall
        # back to add_run(run_id) when that raises MTH5Error.
        target_run_group = target_station_obj.add_run(run_id)
        target_run_group.from_runts(target_runts)


def _subset_runts(source_run_obj, components):
    """Return a RunTS holding the ``components`` channels of ``source_run_obj``."""
    source_channels = source_run_obj.channel_summary.component.to_list()
    if set(source_channels) == set(components):
        logger.info("channels in source and target are same -- just map whole RunTS ")
        return source_run_obj.to_runts()

    # Only some channels were requested.  This path is a proof of concept and
    # does not try to cover the many edge cases.
    logger.info("there are a lot of edge cases to worry about here -- Help Wanted")
    ch_list = []
    for comp in components:
        source_chts = source_run_obj.get_channel(comp).to_channel_ts()
        target_chts_metadata = source_chts.channel_metadata.copy()
        ch_list.append(
            ChannelTS(
                channel_type=target_chts_metadata.type,
                data=source_chts.data_array.data,
                channel_metadata=target_chts_metadata.to_dict(),
            )
        )
    target_runts = RunTS(array_list=ch_list)
    # The rebuilt RunTS is given the source run's id explicitly.
    target_runts.run_metadata.id = source_run_obj.metadata.id
    return target_runts

125 

126 

def _get_list_of_filters_to_add_to_target_mth5(m_source, m_target, survey_id=None):
    """
    List the filter objects present in the source but missing from the target.

    For file version "0.1.0" the filters are read from each file's
    ``filters_group``; ``survey_id`` is not used.  For "0.2.0" they are read
    from the ``filters_group`` of the survey ``survey_id`` in each file, so
    m_target must already have that survey group.

    Parameters
    ----------
    m_source
        Open source MTH5 object; its ``file_version`` selects where the
        filters are looked up.
    m_target
        Open target MTH5 object.
    survey_id
        Survey whose filters are compared (file version "0.2.0" only).

    Returns
    -------
    list
        Filter objects built with the source ``to_filter_object`` for every
        filter name not already in the target, in the order the source lists
        them.  Empty when nothing is missing or when the source file version
        is neither "0.1.0" nor "0.2.0".

    """
    file_version = m_source.file_version
    if file_version == "0.1.0":
        source_filters = m_source.filters_group
        target_filters = m_target.filters_group
    elif file_version == "0.2.0":
        source_filters = m_source.get_survey(survey_id).filters_group
        target_filters = m_target.get_survey(survey_id).filters_group
    else:
        return []

    # Read the target's filter names once instead of once per candidate name.
    existing_names = set(target_filters.filter_dict)
    missing_names = [
        name for name in source_filters.filter_dict if name not in existing_names
    ]
    return [source_filters.to_filter_object(name) for name in missing_names]