Coverage for C: \ Users \ peaco \ OneDrive \ Documents \ GitHub \ mth5 \ mth5 \ io \ zen \ zen_tools.py: 90%

216 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-27 20:09 -0800

1# -*- coding: utf-8 -*- 

2""" 

3Created on Tue Apr 18 15:40:28 2023 

4 

5@author: jpeacock 

6""" 

7import datetime 

8import shutil 

9import string 

10import time 

11 

12# ============================================================================= 

13# Imports 

14# ============================================================================= 

15from pathlib import Path 

16 

17import dateutil.parser 

18import numpy as np 

19from loguru import logger 

20 

21from mth5.io.zen import Z3D 

22 

23 

24try: 

25 import win32api 

26except ImportError: 

27 print("WARNING: Cannot find win32api, will not be able to detect" " drive names") 

28# ============================================================================= 

29 

30 

31# get the external drives for SD cards 

def get_drives() -> list[str]:
    """
    List the letters of the logical drives reported by the operating system.

    The drive bit mask comes from ``win32api.GetLogicalDrives``, so this
    only works on Windows with ``win32api`` importable.  Bit 0 of the mask
    corresponds to 'A', bit 1 to 'B', and so on.

    Returns
    -------
    list of str
        Upper-case drive letters whose bit is set in the mask.

    Examples
    --------
    >>> get_drives()
    ['C', 'D', 'E']
    """
    drive_mask = win32api.GetLogicalDrives()
    return [
        drive_letter
        for bit_position, drive_letter in enumerate(string.ascii_uppercase)
        if (drive_mask >> bit_position) & 1
    ]

55 

56 

57# get the names of the drives which should correspond to channels 

def get_drive_names() -> dict[str, str] | None:
    """
    Map drive letters to volume names for drives whose name contains 'CH'.

    Every drive letter returned by ``get_drives`` is queried with
    ``win32api.GetVolumeInformation``; only volumes whose label contains
    the substring 'CH' are kept (the cards are assumed to be named by box
    and channel).

    Returns
    -------
    dict of str to str or None
        Keys are the drive letters and values are the drive names.
        Returns None if no matching drives are found.

    Examples
    --------
    >>> get_drive_names()
    {'D': 'CH1', 'E': 'CH2'}
    """
    drive_dict = {}
    for drive in get_drives():
        try:
            drive_name = win32api.GetVolumeInformation(drive + ":\\")[0]
            if "CH" in drive_name:
                drive_dict[drive] = drive_name
        except Exception as error:
            # Best effort: a drive that cannot be queried (e.g. an empty
            # card reader) is skipped, but KeyboardInterrupt/SystemExit are
            # no longer swallowed and the reason is kept for debugging.
            logger.debug(f"Could not read volume information for {drive}: {error}")

    return drive_dict or None

85 

86 

def split_station(station: str) -> tuple[str, str]:
    """
    Split station name into name and number.

    The split happens at the first decimal digit; everything from that
    digit onward is returned as the number part.

    Parameters
    ----------
    station : str
        Full station name.

    Returns
    -------
    tuple of str
        Tuple containing the station name and number.  If the station
        contains no digit, the whole string is returned as the name and the
        number is an empty string.

    Examples
    --------
    >>> split_station('MT01')
    ('MT', '01')
    >>> split_station('MT')
    ('MT', '')
    """
    for index, character in enumerate(station):
        if character.isdecimal():
            return (station[:index], station[index:])

    # No digit found: previously this path raised UnboundLocalError.
    return (station, "")

118 

119 

def copy_from_sd(
    station: str,
    save_path: str | Path = Path(r"d:\\Peacock\\MTData"),
    channel_dict: dict[str, str] = {
        "1": "HX",
        "2": "HY",
        "3": "HZ",
        "4": "EX",
        "5": "EY",
        "6": "HZ",
    },
    copy_date: str | None = None,
    copy_type: str = "all",
) -> tuple[list[Path], Path]:
    """
    Copy Z3D files from SD cards into a station folder under save_path.

    Every drive returned by ``get_drive_names`` is searched recursively for
    ``*.z3d`` files.  A file is copied when it passes the optional date
    filter, is at least 1600 bytes long, and the numeric part of `station`
    occurs in the station recorded in the file's metadata.  Errors while
    processing a single file are logged as warnings and the file is skipped.

    Parameters
    ----------
    station : str
        Full name of station from which data is being saved.
    save_path : str or Path, optional
        Parent folder; files are written to ``save_path / station``, which
        is created if needed.  By default 'd:\\\\Peacock\\\\MTData'.
    channel_dict : dict of str to str, optional
        Channel number to component mapping.  Currently not used by this
        function; the component is read from each file's metadata.
    copy_date : str, optional
        Date to copy from depending on copy_type, in 'YYYY-MM-DD' format.
        The file's modification time is what is compared against it.
    copy_type : {'all', 'before', 'after', 'on'}, optional
        Type of copy operation (only applied when copy_date is given):
        - 'all': Copy all files on the SD card.
        - 'before': Copy files before and on this date.
        - 'after': Copy files on and after this date.
        - 'on': Copy files on this date only.

    Returns
    -------
    tuple of list of Path and Path
        List of files that were actually copied and the station folder
        they were copied to.  The list is empty if no drives were found.

    Examples
    --------
    >>> copy_from_sd('MT01', save_path=r"/home/mt/survey_1")
    ([Path('/home/mt/survey_1/MT01/MT01_20260127_120000_256_HX.Z3D')], Path('/home/mt/survey_1/MT01'))
    """
    save_path = Path(save_path).joinpath(station)

    if not save_path.exists():
        save_path.mkdir(parents=True)

    fn_list = []
    drive_names = get_drive_names()
    if drive_names is None:
        logger.error("No drive names found. No files copied.")
        return [], save_path

    # The filter is documented per calendar day, so compare dates rather
    # than datetimes: comparing against midnight of copy_date made 'before'
    # skip files written later on that same day.
    c_day = None
    if copy_date is not None:
        c_day = dateutil.parser.parse(copy_date).date()

    _, s_int = split_station(station)

    for key in drive_names:
        # NOTE(review): a bare "X:" path is drive-relative on Windows (the
        # current directory of that drive) -- confirm the card root is meant.
        dr = Path(f"{key}:")
        logger.info(f"Reading from drive {key}.")

        for fn in dr.rglob("*.z3d"):
            if c_day is not None:
                file_day = datetime.datetime.fromtimestamp(fn.stat().st_mtime).date()
                if (
                    (copy_type == "after" and file_day < c_day)
                    or (copy_type == "before" and file_day > c_day)
                    or (copy_type == "on" and file_day != c_day)
                ):
                    continue

            try:
                file_size = fn.stat().st_size
                # NOTE(review): 1600 bytes is presumably the smallest file
                # that can hold usable content -- confirm.
                if file_size < 1600:
                    logger.warning(f"Skipped {fn} because file too small {file_size}")
                    continue

                zt = Z3D(fn=fn)
                zt.read_all_info()

                if not (
                    zt.metadata
                    and zt.metadata.station
                    and s_int in zt.metadata.station
                ):
                    continue

                channel = (
                    zt.metadata.ch_cmp.upper() if zt.metadata.ch_cmp else "UNKNOWN"
                )
                st = (
                    zt.schedule.Time.replace(":", "")
                    if zt.schedule and zt.schedule.Time
                    else "000000"
                )
                sd = (
                    zt.schedule.Date.replace("-", "")
                    if zt.schedule and zt.schedule.Date
                    else "000000"
                )

                sv_fn = f"{station}_{sd}_{st}_{int(zt.sample_rate or 0)}_{channel}.Z3D"
                new_fn = save_path / sv_fn

                shutil.copy(fn, new_fn)
                # Record the file only after the copy succeeded so the
                # returned list never contains files that were not written.
                fn_list.append(new_fn)
                logger.info(f"Copied {fn} to {new_fn}")
                logger.info(f"File size is {file_size}")
            except Exception as e:
                logger.warning(f"Error processing file {fn}: {e}")

    return fn_list, save_path

234 

235 

236# ============================================================================== 

237# delete files from sd cards 

238# ============================================================================== 

def delete_files_from_sd(
    delete_date: str | None = None,
    delete_type: str | None = None,
    delete_folder: str | Path = Path().cwd(),
    verbose: bool = True,
) -> list[Path]:
    """
    Remove Z3D files from the top level of every detected SD card.

    Each ``.z3d`` file found directly in a drive returned by
    ``get_drive_names`` is opened to read its schedule date, which is then
    compared with `delete_date` as an integer of the form YYYYMMDD (a file
    without a schedule date counts as 0).  Selected files are moved into
    `delete_folder` when one is given, otherwise they are unlinked.

    Parameters
    ----------
    delete_date : str, optional
        Date to compare against, in 'YYYY-MM-DD' format.  If None, every
        Z3D file is selected regardless of delete_type.
    delete_type : {'all', 'before', 'after', 'on'}, optional
        Selection rule used when delete_date is given:
        - 'all': Select all files on the SD card.
        - 'before': Select files on and before delete_date.
        - 'after': Select files on and after delete_date.
        - 'on': Select files on delete_date.
        Any other value selects nothing.
    delete_folder : str or Path, optional
        Folder the selected files are moved to just in case; created if
        missing.  If None (or otherwise falsy), files are deleted
        permanently.
    verbose : bool, optional
        Accepted for compatibility; not used by this function.

    Returns
    -------
    list of Path
        Original paths of the files that were moved or deleted.

    Raises
    ------
    OSError
        If no drives are found.

    Examples
    --------
    >>> delete_files_from_sd(delete_date='2026-01-27', delete_type='before', delete_folder=None)
    [Path('D:/file1.Z3D'), Path('D:/file2.Z3D')]
    """
    if delete_folder:
        delete_path = Path(delete_folder)
    else:
        delete_path = Path.cwd()
    if not delete_path.exists():
        delete_path.mkdir(parents=True)

    delete_fn_list = []
    drive_names = get_drive_names()
    if drive_names is None:
        logger.error("No drives found.")
        raise OSError("No drives found.")

    for drive_letter, _drive_label in drive_names.items():
        for z3d_fn in Path(f"{drive_letter}:").iterdir():
            if z3d_fn.suffix.lower() != ".z3d":
                continue

            z3d_obj = Z3D(z3d_fn)
            z3d_obj.read_all_info()
            if z3d_obj.schedule and z3d_obj.schedule.Date:
                file_date = int(z3d_obj.schedule.Date.replace("-", ""))
            else:
                file_date = 0

            # delete_date is only parsed in the branch that needs it, so an
            # unparsable value is harmless when everything is selected.
            if delete_type == "all" or delete_date is None:
                selected = True
            elif delete_type == "before":
                selected = file_date <= int(delete_date.replace("-", ""))
            elif delete_type == "after":
                selected = file_date >= int(delete_date.replace("-", ""))
            elif delete_type == "on":
                selected = file_date == int(delete_date.replace("-", ""))
            else:
                selected = False

            if not selected:
                continue

            if delete_folder:
                shutil.move(z3d_fn, delete_path / z3d_fn.name)
            else:
                z3d_fn.unlink()
            delete_fn_list.append(z3d_fn)

    return delete_fn_list

331 

332 

333# ============================================================================== 

334# read and write a zen schedule 

335# ============================================================================== 

336class ZenSchedule: 

337 """ 

338 Deals with reading, writing, and copying schedules. 

339 

340 Creates a repeating schedule based on the master_schedule. It will then change the first scheduling action to coincide with the master schedule, such that all deployed boxes will have the same schedule. 

341 

342 Attributes 

343 ---------- 

344 verbose : bool 

345 If True, print detailed logs. 

346 sr_dict : dict of str to str 

347 Dictionary of sampling rate values. 

348 sa_list : list of dict 

349 List of schedule actions including time and df. 

350 ch_cmp_dict : dict of str to str 

351 Dictionary for channel components with keys being the channel number and values being the channel label. 

352 ch_num_dict : dict of str to str 

353 Dictionary for channel components with keys being the channel label and values being the channel number. 

354 dt_format : str 

355 Date and time format, default is 'YYYY-MM-DD,hh:mm:ss'. 

356 initial_dt : str 

357 Initial date, or dummy zero date for scheduling. 

358 dt_offset : str 

359 Start date and time of schedule in dt_format. 

360 df_list : tuple of int 

361 Sequential list of sampling rates to repeat in schedule. 

362 df_time_list : tuple of str 

363 Sequential list of time intervals to measure for each corresponding sampling rate. 

364 master_schedule : list of dict 

365 The schedule that all data loggers should schedule at. Will tailor the schedule to match the master schedule according to dt_offset. 

366 """ 

367 

368 def __init__(self): 

369 self.verbose = True 

370 self.sr_dict = { 

371 "256": "0", 

372 "512": "1", 

373 "1024": "2", 

374 "2048": "3", 

375 "4096": "4", 

376 } 

377 self.sa_list = [] 

378 self.ch_cmp_dict = { 

379 "1": "hx", 

380 "2": "hy", 

381 "3": "hz", 

382 "4": "ex", 

383 "5": "ey", 

384 "6": "hz", 

385 } 

386 self.ch_num_dict = dict( 

387 [(self.ch_cmp_dict[key], key) for key in self.ch_cmp_dict] 

388 ) 

389 

390 self.dt_format = "%Y-%m-%d,%H:%M:%S" 

391 self.initial_dt = "2000-01-01,00:00:00" 

392 self.dt_offset = time.strftime(self.dt_format, time.gmtime()) 

393 self.df_list = (4096, 256) 

394 self.df_time_list = ("00:10:00", "07:50:00") 

395 self.master_schedule = self.make_schedule( 

396 self.df_list, self.df_time_list, repeat=16 

397 ) 

398 self._resync_pause = 20 

399 

400 # ================================================== 

401 def add_time( 

402 self, date_time, add_minutes=0, add_seconds=0, add_hours=0, add_days=0 

403 ): 

404 """ 

405 add time to a time string 

406 assuming date_time is in the format YYYY-MM-DD,HH:MM:SS 

407 """ 

408 

409 fulldate = datetime.datetime.strptime(date_time, self.dt_format) 

410 

411 fulldate = fulldate + datetime.timedelta( 

412 days=add_days, 

413 hours=add_hours, 

414 minutes=add_minutes, 

415 seconds=add_seconds, 

416 ) 

417 return fulldate 

418 

419 # ================================================== 

420 def make_schedule(self, df_list, df_length_list, repeat=5, t1_dict=None): 

421 """ 

422 make a repeated schedule given list of sampling frequencies and 

423 duration for each. 

424 Arguments: 

425 ----------- 

426 **df_list** : list 

427 list of sampling frequencies in Hz, note needs to be 

428 powers of 2 starting at 256 

429 **df_length_list** : list 

430 list of durations in hh:mm:ss format 

431 **repeat** : int 

432 number of times to repeat the sequence 

433 **t1_dict** : dictionary 

434 dictionary returned from get_schedule_offset 

435 Returns: 

436 -------- 

437 **time_list**: list of dictionaries with keys: 

438 * 'dt' --> date and time of schedule event 

439 * 'df' --> sampling rate for that event 

440 """ 

441 

442 df_list = np.array(df_list) 

443 df_length_list = np.array(df_length_list) 

444 ndf = len(df_list) 

445 

446 if t1_dict is not None: 

447 time_list = [{"dt": self.initial_dt, "df": t1_dict["df"]}] 

448 

449 kk = np.where(np.array(df_list) == t1_dict["df"])[0][0] - ndf + 1 

450 df_list = np.append(df_list[kk:], df_list[:kk]) 

451 df_length_list = np.append(df_length_list[kk:], df_length_list[:kk]) 

452 time_list.append(dict([("dt", t1_dict["dt"]), ("df", df_list[0])])) 

453 ii = 1 

454 else: 

455 time_list = [{"dt": self.initial_dt, "df": df_list[0]}] 

456 ii = 0 

457 for rr in range(1, repeat + 1): 

458 for df, df_length, jj in zip(df_list, df_length_list, range(ndf)): 

459 dtime = time.strptime(df_length, "%H:%M:%S") 

460 ndt = self.add_time( 

461 time_list[ii]["dt"], 

462 add_hours=dtime.tm_hour, 

463 add_minutes=dtime.tm_min, 

464 add_seconds=dtime.tm_sec, 

465 ) 

466 time_list.append( 

467 { 

468 "dt": ndt.strftime(self.dt_format), 

469 "df": df_list[jj - ndf + 1], 

470 } 

471 ) 

472 ii += 1 

473 for nn, ns in enumerate(time_list): 

474 sdate, stime = ns["dt"].split(",") 

475 ns["date"] = sdate 

476 ns["time"] = stime 

477 ns["sr"] = self.sr_dict[str(ns["df"])] 

478 

479 return time_list 

480 

481 # ================================================== 

482 def get_schedule_offset(self, time_offset, schedule_time_list): 

483 """ 

484 gets the offset in time from master schedule list and time_offset so 

485 that all schedules will record at the same time according to master 

486 schedule list schedule_time_list 

487 Attributes: 

488 ----------- 

489 **time_offset** : hh:mm:ss 

490 the time offset given to the zen reciever 

491 **schedule_time_list** : list 

492 list of actual schedule times returned 

493 from make_schedule 

494 Returns: 

495 -------- 

496 **s1** : dictionary 

497 dictionary with keys: 

498 * 'dt' --> date and time of offset from next schedule 

499 event from schedule_time_list 

500 * 'df' --> sampling rate of that event 

501 """ 

502 

503 dt_offset = "{0},{1}".format("2000-01-01", time_offset) 

504 t0 = time.mktime(time.strptime("2000-01-01,00:00:00", self.dt_format)) 

505 

506 for ii, tt in enumerate(schedule_time_list): 

507 ssec = time.mktime(time.strptime(tt["dt"], self.dt_format)) 

508 osec = time.mktime(time.strptime(dt_offset, self.dt_format)) 

509 

510 if ssec > osec: 

511 sdiff = time.localtime(t0 + (ssec - osec)) 

512 t1 = self.add_time( 

513 "2000-01-01,00:00:00", 

514 add_hours=sdiff.tm_hour, 

515 add_minutes=sdiff.tm_min, 

516 add_seconds=sdiff.tm_sec, 

517 ) 

518 s1 = { 

519 "dt": t1.strftime(self.dt_format), 

520 "df": schedule_time_list[ii - 1]["df"], 

521 } 

522 return s1 

523 

524 def write_schedule_for_gui( 

525 self, 

526 zen_start: str | None = None, 

527 df_list: list[int] | None = None, 

528 df_time_list: list[str] | None = None, 

529 repeat: int = 8, 

530 gain: int = 0, 

531 save_path: str | Path | None = None, 

532 schedule_fn: str = "zen_schedule.MTsch", 

533 version: int = 4, 

534 ) -> None: 

535 """ 

536 Write a zen schedule file. 

537 

538 Parameters 

539 ---------- 

540 zen_start : str, optional 

541 Start time you want the zen to start collecting data, in UTC time. If None, current time is used. 

542 df_list : list of int, optional 

543 List of sampling rates in Hz. 

544 df_time_list : list of str, optional 

545 List of time intervals corresponding to df_list in hh:mm:ss format. 

546 repeat : int, optional 

547 Number of times to repeat the cycle of df_list, by default 8. 

548 gain : int, optional 

549 Gain on instrument, 2 raised to this number, by default 0. 

550 save_path : str or Path, optional 

551 Path to save the schedule file, by default current working directory. 

552 schedule_fn : str, optional 

553 Name of the schedule file, by default 'zen_schedule.MTsch'. 

554 version : int, optional 

555 Version of the schedule file format, by default 4. 

556 

557 Returns 

558 ------- 

559 None 

560 """ 

561 if df_list is not None: 

562 self.df_list = df_list 

563 if df_time_list is not None: 

564 self.df_time_list = df_time_list 

565 if save_path is None: 

566 save_path = Path.cwd() 

567 else: 

568 save_path = Path(save_path) 

569 

570 self.master_schedule = self.make_schedule( 

571 self.df_list, self.df_time_list, repeat=repeat * 3 

572 ) 

573 

574 t_offset_dict = self.get_schedule_offset(zen_start, self.master_schedule) 

575 

576 self.sa_list = self.make_schedule( 

577 self.df_list, 

578 self.df_time_list, 

579 t1_dict=t_offset_dict, 

580 repeat=repeat, 

581 ) 

582 

583 zacq_list: list[str] = [] 

584 if version >= 4: 

585 zacq_list = ["$TX=0", "$Type=339"] 

586 

587 for ii, ss in enumerate(self.sa_list[:-1]): 

588 t0 = self._convert_time_to_seconds(ss["time"]) 

589 t1 = self._convert_time_to_seconds(self.sa_list[ii + 1]["time"]) 

590 if ss["date"] != self.sa_list[ii + 1]["date"]: 

591 t1 += 24 * 3600 

592 

593 duration = t1 - t0 - self._resync_pause 

594 sr = int( 

595 self.sr_dict[str(ss["df"])] if str(ss["df"]) in self.sr_dict else 0 

596 ) 

597 

598 if version >= 4: 

599 zacq_list.append(f"$schline{ii+1} = {duration:.0f},{sr:.0f},1,0,0") 

600 else: 

601 zacq_list.append(f"$schline{ii+1} = {duration:.0f},{sr:.0f},1") 

602 

603 if version >= 4: 

604 zacq_list.extend( 

605 [ 

606 "$DayRepeat=0", 

607 "$RelativeOffsetSeconds=0", 

608 "$AutoSleep=0", 

609 ] 

610 ) 

611 

612 fn = save_path / schedule_fn 

613 with open(fn, "w") as fid: 

614 fid.write("\n".join(zacq_list)) 

615 

616 print(f"Wrote schedule file to {fn}") 

617 print("+--------------------------------------+") 

618 print(f"| SET ZEN START TIME TO: {zen_start} |") 

619 print("+--------------------------------------+") 

620 

621 def _convert_time_to_seconds(self, time_string): 

622 """ 

623 convert a time string given as hh:mm:ss into seconds 

624 """ 

625 t_list = [float(tt) for tt in time_string.split(":")] 

626 t_seconds = t_list[0] * 3600 + t_list[1] * 60 + t_list[2] 

627 

628 return t_seconds