Coverage for C: \ Users \ peaco \ OneDrive \ Documents \ GitHub \ mth5 \ mth5 \ io \ collection.py: 84%

95 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-10 00:01 -0800

1# -*- coding: utf-8 -*- 

2""" 

3Phoenix file collection 

4 

5Created on Thu Aug 4 16:48:47 2022 

6 

7@author: jpeacock 

8""" 

9 

10# ============================================================================= 

11# Imports 

12# ============================================================================= 

13from collections import OrderedDict 

14from pathlib import Path 

15 

16import pandas as pd 

17from loguru import logger 

18 

19 

20# ============================================================================= 

21 

22 

class Collection:
    """
    A general collection class to keep track of files with methods to create
    runs and run ids.

    This base class only knows how to locate files and how to organise a
    summary table; subclasses are expected to override
    :meth:`to_dataframe` and :meth:`assign_run_names`.

    """

    def __init__(self, file_path=None, **kwargs):
        """
        :param file_path: directory that holds the files, defaults to None
        :type file_path: string or Path object, optional
        :param kwargs: any additional attributes to set on the instance

        """
        self.logger = logger
        self.file_path = file_path
        self.file_ext = "*"

        # column names of the summary table built by `to_dataframe`
        self._columns = [
            "survey",
            "station",
            "run",
            "start",
            "end",
            "channel_id",
            "component",
            "fn",
            "sample_rate",
            "file_size",
            "n_samples",
            "sequence_number",
            "dipole",
            "coil_number",
            "latitude",
            "longitude",
            "elevation",
            "instrument_id",
            "calibration_fn",
        ]

        # keyword arguments are applied last so they can override defaults
        for key, value in kwargs.items():
            setattr(self, key, value)

    def __str__(self):
        return f"Collection for file type {self.file_ext} in {self.file_path}"

    def __repr__(self):
        return f"Collection({self.file_path})"

    def get_empty_entry_dict(self):
        """

        :return: an empty dictionary with the proper keys for an entry into
         a dataframe, every value is None
        :rtype: dict

        """
        return dict.fromkeys(self._columns)

    @property
    def file_path(self):
        """
        Path object to file directory
        """
        return self._file_path

    @file_path.setter
    def file_path(self, file_path):
        """
        :param file_path: path to files
        :type file_path: string or Path object
        :raises IOError: if the path does not exist

        sets file_path as a Path object, or None if None is given
        """

        if file_path is None:
            self._file_path = None
            return

        if not isinstance(file_path, Path):
            file_path = Path(file_path)

        # validate before storing so a failed assignment does not leave an
        # invalid path on the instance
        if not file_path.exists():
            raise IOError(f"File path {file_path} does not exist.")

        self._file_path = file_path

    def get_files(self, extension):
        """
        Get files with given extension.  Uses Pathlib.Path.rglob, so it finds
        all files within the `file_path` by searching all sub-directories.

        Each extension is searched as given and in its lower and upper case
        forms so the search also works on case-sensitive file systems.  A
        single leading "." on an extension is ignored, so "txt" and ".txt"
        are equivalent.

        :param extension: file extension(s); an empty string returns every
         file under `file_path`
        :type extension: string or list
        :return: sorted list of files in the `file_path` with the given
         extensions, an empty list if `file_path` is None
        :rtype: list of Path objects

        """

        if self.file_path is None:
            return []

        # an empty extension means "everything"; callers filter as needed
        if extension == "":
            return sorted(p for p in self.file_path.rglob("*") if p.is_file())

        if isinstance(extension, (list, tuple)):
            requested = extension
        else:
            requested = [extension]

        patterns = []
        for ext in requested:
            if not ext:
                continue
            # "*..txt" would never match a normal file, drop the leading dot
            if ext.startswith("."):
                ext = ext[1:]
            for variant in (ext, ext.lower(), ext.upper()):
                if variant and variant not in patterns:
                    patterns.append(variant)

        # a set removes the duplicates produced on case-insensitive file
        # systems where every variant matches the same files
        found = set()
        for pattern in patterns:
            found.update(
                p for p in self.file_path.rglob(f"*.{pattern}") if p.is_file()
            )

        return sorted(found)

    def to_dataframe(self, sample_rates=None, run_name_zeros=4, calibration_path=None):
        """
        Get a data frame of the file summary with column names:

            - **survey**: survey id
            - **station**: station id
            - **run**: run id
            - **start**: start time UTC
            - **end**: end time UTC
            - **channel_id**: channel id or list of channel id's in file
            - **component**: channel component or list of components in file
            - **fn**: path to file
            - **sample_rate**: sample rate in samples per second
            - **file_size**: file size in bytes
            - **n_samples**: number of samples in file
            - **sequence_number**: sequence number of the file
            - **instrument_id**: instrument id
            - **calibration_fn**: calibration file

        The base implementation ignores its arguments and returns an empty
        table; subclasses should override this method.

        :param sample_rates: list of sample rates to process, defaults to None
        :type sample_rates: list, optional
        :param run_name_zeros: number of zeros in run name, defaults to 4
        :type run_name_zeros: int, optional
        :param calibration_path: path to calibration files, defaults to None
        :type calibration_path: str or Path, optional
        :return: summary table of file names,
        :rtype: pandas.DataFrame

        """
        return pd.DataFrame(columns=self._columns)

    def assign_run_names(self, df, zeros=4):
        """
        Assign run names to a dataframe.  This is a base method that should
        be overridden by subclasses.

        If the dataframe has no `run` column one is added with the default
        run name ``sr1_`` followed by the number 1 padded to `zeros` digits.
        The dataframe is modified in place.

        :param df: dataframe with file information
        :type df: pandas.DataFrame
        :param zeros: number of zeros in run name, defaults to 4
        :type zeros: int, optional
        :return: dataframe with run names assigned
        :rtype: pandas.DataFrame
        """
        if "run" not in df.columns:
            # honour the requested padding, zeros=4 gives "sr1_0001"
            df["run"] = f"sr1_{1:0{zeros}d}"
        return df

    def _set_df_dtypes(self, df):
        """
        Set some of the columns in the dataframe to desired types

            - **start**: pandas.datetime
            - **end**: pandas.datetime
            - **instrument_id**: string
            - **calibration_fn**: string

        Values of `start` and `end` that cannot be parsed become NaT.  The
        dataframe is modified in place.

        :param df: summary table
        :type df: :class:`pandas.DataFrame`
        :return: summary table with proper types
        :rtype: :class:`pandas.DataFrame`

        """

        for column in ("start", "end"):
            df[column] = pd.to_datetime(df[column], errors="coerce")
        for column in ("instrument_id", "calibration_fn"):
            df[column] = df[column].astype(str)

        return df

    def _sort_df(self, df, zeros):
        """
        Sort a given dataframe by start date, assign run names, and then sort
        by run name and start date.  The index is reset.  The dataframe is
        sorted in place.

        :param df: summary table
        :type df: :class:`pandas.DataFrame`
        :param zeros: number of zeros in run id
        :type zeros: integer
        :return: summary table sorted by run id and start time
        :rtype: :class:`pandas.DataFrame`

        """

        # run names are assigned on the time ordered table
        df.sort_values(by=["start"], inplace=True)
        df.reset_index(inplace=True, drop=True)

        df = self.assign_run_names(df, zeros=zeros)

        df.sort_values(by=["run", "start"], inplace=True)
        df.reset_index(inplace=True, drop=True)

        return df

    def get_runs(
        self,
        sample_rates,
        run_name_zeros=4,
        calibration_path=None,
    ):
        """
        Get the runs contained within the given folder.  First the
        dataframe is built with :meth:`to_dataframe`, then it is split by
        station and by run id.

        For continuous data all you need is the first file in the sequence.
        The reader will read in the entire sequence.

        For segmented data it will only read in the given segment, which is
        slightly different from the original reader.

        :param sample_rates: list of sample rates to read, for example
         [150, 24000]
        :type sample_rates: list
        :param run_name_zeros: Number of zeros in the run name, defaults to 4
        :type run_name_zeros: integer, optional
        :param calibration_path: path to calibration files, defaults to None
        :type calibration_path: str or Path, optional
        :return: run dataframes keyed by station and then by run id
        :rtype: :class:`collections.OrderedDict`

        :Example:

            >>> from mth5.io.phoenix import PhoenixCollection
            >>> phx_collection = PhoenixCollection(r"/path/to/station")
            >>> run_dict = phx_collection.get_runs(sample_rates=[150, 24000])

        """

        df = self.to_dataframe(
            sample_rates=sample_rates,
            run_name_zeros=run_name_zeros,
            calibration_path=calibration_path,
        )

        run_dict = OrderedDict()

        for station in sorted(df.station.unique()):
            station_df = df[df.station == station]
            # order runs by the trailing characters of the run id
            run_ids = sorted(
                station_df.run.unique(),
                key=lambda x: x[-run_name_zeros:],
            )
            run_dict[station] = OrderedDict(
                (run_id, station_df[station_df.run == run_id]) for run_id in run_ids
            )
        return run_dict

    def get_remote_reference_list(self, df, max_hours=6, min_hours=1.5):
        """
        Get remote reference pairs.

        The first entry of each station is used as its start time.  For
        every station the remote is the first other station (in station
        order) whose start time differs by more than `min_hours` and less
        than `max_hours`.

        :param df: summary table with `station` and `start` columns, where
         `start` holds datetimes
        :type df: :class:`pandas.DataFrame`
        :param max_hours: maximum difference in start time in hours
         (exclusive), defaults to 6
        :type max_hours: float, optional
        :param min_hours: minimum difference in start time in hours
         (exclusive), defaults to 1.5
        :type min_hours: float, optional
        :return: one dictionary per station with keys `local` and `remote`,
         `remote` is None if no station falls inside the time window
        :rtype: list of dict

        """

        first_entries = df.groupby("station", as_index=False).first()
        lower = pd.Timedelta(hours=min_hours)
        upper = pd.Timedelta(hours=max_hours)

        station_list = []
        for row in first_entries.itertuples():
            # keep the time difference as a local Series, assigning it as an
            # attribute of the dataframe does not create a column
            time_diff = (row.start - first_entries["start"]).abs()
            in_window = (time_diff < upper) & (time_diff > lower)
            candidates = first_entries.loc[in_window, "station"]

            # no station in the window is not an error, report None
            remote = candidates.iloc[0] if not candidates.empty else None
            station_list.append({"local": row.station, "remote": remote})
        return station_list