Coverage for C: \ Users \ peaco \ OneDrive \ Documents \ GitHub \ mth5 \ mth5 \ io \ zen \ zen_tools.py: 90%
216 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-27 20:09 -0800
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-27 20:09 -0800
1# -*- coding: utf-8 -*-
2"""
3Created on Tue Apr 18 15:40:28 2023
5@author: jpeacock
6"""
7import datetime
8import shutil
9import string
10import time
12# =============================================================================
13# Imports
14# =============================================================================
15from pathlib import Path
17import dateutil.parser
18import numpy as np
19from loguru import logger
21from mth5.io.zen import Z3D
24try:
25 import win32api
26except ImportError:
27 print("WARNING: Cannot find win32api, will not be able to detect" " drive names")
28# =============================================================================
31# get the external drives for SD cards
32def get_drives() -> list[str]:
33 """
34 Get a list of logical drives detected on the machine.
36 Note: This only works for Windows.
38 Returns
39 -------
40 list of str
41 List of drives as letters.
43 Examples
44 --------
45 >>> get_drives()
46 ['C', 'D', 'E']
47 """
48 drives = []
49 bitmask = win32api.GetLogicalDrives()
50 for letter in string.ascii_uppercase:
51 if bitmask & 1:
52 drives.append(letter)
53 bitmask >>= 1
54 return drives
57# get the names of the drives which should correspond to channels
58def get_drive_names() -> dict[str, str] | None:
59 """
60 Get a list of drive names detected assuming the cards are named by box and channel.
62 Returns
63 -------
64 dict of str to str or None
65 Keys are the drive letters and values are the drive names. Returns None if no drives are found.
67 Examples
68 --------
69 >>> get_drive_names()
70 {'D': 'CH1', 'E': 'CH2'}
71 """
72 drives = get_drives()
74 drive_dict = {}
75 for drive in drives:
76 try:
77 drive_name = win32api.GetVolumeInformation(drive + ":\\")[0]
78 if drive_name.find("CH") >= 0:
79 drive_dict[drive] = drive_name
80 except:
81 pass
82 if not bool(drive_dict):
83 return None
84 return drive_dict
87def split_station(station: str) -> tuple[str, str]:
88 """
89 Split station name into name and number.
91 Parameters
92 ----------
93 station : str
94 Full station name.
96 Returns
97 -------
98 tuple of str
99 Tuple containing the station name and number.
101 Examples
102 --------
103 >>> split_station('MT01')
104 ('MT', '01')
105 """
107 for ii, ss in enumerate(station):
108 try:
109 int(ss)
110 find = ii
111 break
112 except ValueError:
113 continue
114 name = station[0:find]
115 number = station[find:]
117 return (name, number)
120def copy_from_sd(
121 station: str,
122 save_path: str | Path = Path(r"d:\\Peacock\\MTData"),
123 channel_dict: dict[str, str] = {
124 "1": "HX",
125 "2": "HY",
126 "3": "HZ",
127 "4": "EX",
128 "5": "EY",
129 "6": "HZ",
130 },
131 copy_date: str | None = None,
132 copy_type: str = "all",
133) -> tuple[list[Path], Path]:
134 """
135 Copy files from SD cards into a common folder (save_path).
137 Parameters
138 ----------
139 station : str
140 Full name of station from which data is being saved.
141 save_path : str or Path, optional
142 Full path to save data to, by default 'd:\\Peacock\\MTData'.
143 channel_dict : dict of str to str, optional
144 Keys are the channel numbers as strings and the values are the component that corresponds to that channel. Values are placed in upper case in the code.
145 copy_date : str, optional
146 Date to copy from depending on copy_type, in 'YYYY-MM-DD' format.
147 copy_type : {'all', 'before', 'after', 'on'}, optional
148 Type of copy operation:
149 - 'all': Copy all files on the SD card.
150 - 'before': Copy files before and on this date.
151 - 'after': Copy files on and after this date.
152 - 'on': Copy files on this date only.
154 Returns
155 -------
156 tuple of list of Path and Path
157 List of filenames copied to save_path and the save_path itself.
159 Examples
160 --------
161 >>> copy_from_sd('MT01', save_path=r"/home/mt/survey_1")
162 ([Path('/home/mt/survey_1/MT01_2026-01-27_120000_256_HX.Z3D')], Path('/home/mt/survey_1'))
163 """
164 save_path = Path(save_path).joinpath(station)
166 if not save_path.exists():
167 save_path.mkdir(parents=True)
169 fn_list = []
170 drive_names = get_drive_names()
171 if drive_names is None:
172 logger.error("No drive names found. No files copied.")
173 return [], save_path
175 if copy_date is not None:
176 c_date = dateutil.parser.parse(copy_date)
178 s_name, s_int = split_station(station)
180 for key, drive_name in drive_names.items():
181 dr = Path(f"{key}:")
182 logger.info(f"Reading from drive {key}.")
184 for fn in dr.rglob("*.z3d"):
185 if copy_date is not None:
186 file_date = datetime.datetime.fromtimestamp(fn.stat().st_mtime)
187 if (
188 (copy_type == "after" and file_date < c_date)
189 or (copy_type == "before" and file_date > c_date)
190 or (copy_type == "on" and file_date.date() != c_date.date())
191 ):
192 continue
194 try:
195 file_size = fn.stat().st_size
196 if file_size >= 1600:
197 zt = Z3D(fn=fn)
198 zt.read_all_info()
200 if (
201 zt.metadata
202 and zt.metadata.station
203 and s_int in zt.metadata.station
204 ):
205 channel = (
206 zt.metadata.ch_cmp.upper()
207 if zt.metadata.ch_cmp
208 else "UNKNOWN"
209 )
210 st = (
211 zt.schedule.Time.replace(":", "")
212 if zt.schedule and zt.schedule.Time
213 else "000000"
214 )
215 sd = (
216 zt.schedule.Date.replace("-", "")
217 if zt.schedule and zt.schedule.Date
218 else "000000"
219 )
221 sv_fn = f"{station}_{sd}_{st}_{int(zt.sample_rate or 0)}_{channel}.Z3D"
222 new_fn = save_path / sv_fn
223 fn_list.append(new_fn)
225 shutil.copy(fn, new_fn)
226 logger.info(f"Copied {fn} to {new_fn}")
227 logger.info(f"File size is {file_size}")
228 else:
229 logger.warning(f"Skipped {fn} because file too small {file_size}")
230 except Exception as e:
231 logger.warning(f"Error processing file {fn}: {e}")
233 return fn_list, save_path
236# ==============================================================================
237# delete files from sd cards
238# ==============================================================================
239def delete_files_from_sd(
240 delete_date: str | None = None,
241 delete_type: str | None = None,
242 delete_folder: str | Path = Path().cwd(),
243 verbose: bool = True,
244) -> list[Path]:
245 """
246 Delete files from SD card. If delete_date is not None, anything on this date and before will be deleted.
248 Parameters
249 ----------
250 delete_date : str, optional
251 Date to delete files from, in 'YYYY-MM-DD' format.
252 delete_type : {'all', 'before', 'after', 'on'}, optional
253 Type of delete operation:
254 - 'all': Delete all files on SD card.
255 - 'before': Delete files on and before delete_date.
256 - 'after': Delete files on and after delete_date.
257 - 'on': Delete files on delete_date.
258 delete_folder : str or Path, optional
259 Full path to a folder where files will be moved to just in case. If None, files will be deleted permanently.
260 verbose : bool, optional
261 If True, print detailed logs, by default True.
263 Returns
264 -------
265 list of Path
266 List of deleted files.
268 Examples
269 --------
270 >>> delete_files_from_sd(delete_date='2026-01-27', delete_type='before', delete_folder=None)
271 [Path('D:/file1.Z3D'), Path('D:/file2.Z3D')]
272 """
273 delete_path = Path(delete_folder) if delete_folder else Path.cwd()
274 if not delete_path.exists():
275 delete_path.mkdir(parents=True)
277 delete_fn_list = []
278 drive_names = get_drive_names()
279 if drive_names is None:
280 logger.error("No drives found.")
281 raise OSError("No drives found.")
283 for key, drive_name in drive_names.items():
284 dr = Path(f"{key}:")
285 for fn in dr.iterdir():
286 if fn.suffix.lower() == ".z3d":
287 zt = Z3D(fn)
288 zt.read_all_info()
289 zt_date = (
290 int(zt.schedule.Date.replace("-", ""))
291 if zt.schedule and zt.schedule.Date
292 else 0
293 )
295 if delete_type == "all" or delete_date is None:
296 target = delete_path / fn.name if delete_folder else None
297 if target:
298 shutil.move(fn, target)
299 else:
300 fn.unlink()
301 delete_fn_list.append(fn)
302 elif delete_type == "before" and zt_date <= int(
303 delete_date.replace("-", "")
304 ):
305 target = delete_path / fn.name if delete_folder else None
306 if target:
307 shutil.move(fn, target)
308 else:
309 fn.unlink()
310 delete_fn_list.append(fn)
311 elif delete_type == "after" and zt_date >= int(
312 delete_date.replace("-", "")
313 ):
314 target = delete_path / fn.name if delete_folder else None
315 if target:
316 shutil.move(fn, target)
317 else:
318 fn.unlink()
319 delete_fn_list.append(fn)
320 elif delete_type == "on" and zt_date == int(
321 delete_date.replace("-", "")
322 ):
323 target = delete_path / fn.name if delete_folder else None
324 if target:
325 shutil.move(fn, target)
326 else:
327 fn.unlink()
328 delete_fn_list.append(fn)
330 return delete_fn_list
333# ==============================================================================
334# read and write a zen schedule
335# ==============================================================================
336class ZenSchedule:
337 """
338 Deals with reading, writing, and copying schedules.
340 Creates a repeating schedule based on the master_schedule. It will then change the first scheduling action to coincide with the master schedule, such that all deployed boxes will have the same schedule.
342 Attributes
343 ----------
344 verbose : bool
345 If True, print detailed logs.
346 sr_dict : dict of str to str
347 Dictionary of sampling rate values.
348 sa_list : list of dict
349 List of schedule actions including time and df.
350 ch_cmp_dict : dict of str to str
351 Dictionary for channel components with keys being the channel number and values being the channel label.
352 ch_num_dict : dict of str to str
353 Dictionary for channel components with keys being the channel label and values being the channel number.
354 dt_format : str
355 Date and time format, default is 'YYYY-MM-DD,hh:mm:ss'.
356 initial_dt : str
357 Initial date, or dummy zero date for scheduling.
358 dt_offset : str
359 Start date and time of schedule in dt_format.
360 df_list : tuple of int
361 Sequential list of sampling rates to repeat in schedule.
362 df_time_list : tuple of str
363 Sequential list of time intervals to measure for each corresponding sampling rate.
364 master_schedule : list of dict
365 The schedule that all data loggers should schedule at. Will tailor the schedule to match the master schedule according to dt_offset.
366 """
368 def __init__(self):
369 self.verbose = True
370 self.sr_dict = {
371 "256": "0",
372 "512": "1",
373 "1024": "2",
374 "2048": "3",
375 "4096": "4",
376 }
377 self.sa_list = []
378 self.ch_cmp_dict = {
379 "1": "hx",
380 "2": "hy",
381 "3": "hz",
382 "4": "ex",
383 "5": "ey",
384 "6": "hz",
385 }
386 self.ch_num_dict = dict(
387 [(self.ch_cmp_dict[key], key) for key in self.ch_cmp_dict]
388 )
390 self.dt_format = "%Y-%m-%d,%H:%M:%S"
391 self.initial_dt = "2000-01-01,00:00:00"
392 self.dt_offset = time.strftime(self.dt_format, time.gmtime())
393 self.df_list = (4096, 256)
394 self.df_time_list = ("00:10:00", "07:50:00")
395 self.master_schedule = self.make_schedule(
396 self.df_list, self.df_time_list, repeat=16
397 )
398 self._resync_pause = 20
400 # ==================================================
401 def add_time(
402 self, date_time, add_minutes=0, add_seconds=0, add_hours=0, add_days=0
403 ):
404 """
405 add time to a time string
406 assuming date_time is in the format YYYY-MM-DD,HH:MM:SS
407 """
409 fulldate = datetime.datetime.strptime(date_time, self.dt_format)
411 fulldate = fulldate + datetime.timedelta(
412 days=add_days,
413 hours=add_hours,
414 minutes=add_minutes,
415 seconds=add_seconds,
416 )
417 return fulldate
419 # ==================================================
420 def make_schedule(self, df_list, df_length_list, repeat=5, t1_dict=None):
421 """
422 make a repeated schedule given list of sampling frequencies and
423 duration for each.
424 Arguments:
425 -----------
426 **df_list** : list
427 list of sampling frequencies in Hz, note needs to be
428 powers of 2 starting at 256
429 **df_length_list** : list
430 list of durations in hh:mm:ss format
431 **repeat** : int
432 number of times to repeat the sequence
433 **t1_dict** : dictionary
434 dictionary returned from get_schedule_offset
435 Returns:
436 --------
437 **time_list**: list of dictionaries with keys:
438 * 'dt' --> date and time of schedule event
439 * 'df' --> sampling rate for that event
440 """
442 df_list = np.array(df_list)
443 df_length_list = np.array(df_length_list)
444 ndf = len(df_list)
446 if t1_dict is not None:
447 time_list = [{"dt": self.initial_dt, "df": t1_dict["df"]}]
449 kk = np.where(np.array(df_list) == t1_dict["df"])[0][0] - ndf + 1
450 df_list = np.append(df_list[kk:], df_list[:kk])
451 df_length_list = np.append(df_length_list[kk:], df_length_list[:kk])
452 time_list.append(dict([("dt", t1_dict["dt"]), ("df", df_list[0])]))
453 ii = 1
454 else:
455 time_list = [{"dt": self.initial_dt, "df": df_list[0]}]
456 ii = 0
457 for rr in range(1, repeat + 1):
458 for df, df_length, jj in zip(df_list, df_length_list, range(ndf)):
459 dtime = time.strptime(df_length, "%H:%M:%S")
460 ndt = self.add_time(
461 time_list[ii]["dt"],
462 add_hours=dtime.tm_hour,
463 add_minutes=dtime.tm_min,
464 add_seconds=dtime.tm_sec,
465 )
466 time_list.append(
467 {
468 "dt": ndt.strftime(self.dt_format),
469 "df": df_list[jj - ndf + 1],
470 }
471 )
472 ii += 1
473 for nn, ns in enumerate(time_list):
474 sdate, stime = ns["dt"].split(",")
475 ns["date"] = sdate
476 ns["time"] = stime
477 ns["sr"] = self.sr_dict[str(ns["df"])]
479 return time_list
481 # ==================================================
482 def get_schedule_offset(self, time_offset, schedule_time_list):
483 """
484 gets the offset in time from master schedule list and time_offset so
485 that all schedules will record at the same time according to master
486 schedule list schedule_time_list
487 Attributes:
488 -----------
489 **time_offset** : hh:mm:ss
490 the time offset given to the zen reciever
491 **schedule_time_list** : list
492 list of actual schedule times returned
493 from make_schedule
494 Returns:
495 --------
496 **s1** : dictionary
497 dictionary with keys:
498 * 'dt' --> date and time of offset from next schedule
499 event from schedule_time_list
500 * 'df' --> sampling rate of that event
501 """
503 dt_offset = "{0},{1}".format("2000-01-01", time_offset)
504 t0 = time.mktime(time.strptime("2000-01-01,00:00:00", self.dt_format))
506 for ii, tt in enumerate(schedule_time_list):
507 ssec = time.mktime(time.strptime(tt["dt"], self.dt_format))
508 osec = time.mktime(time.strptime(dt_offset, self.dt_format))
510 if ssec > osec:
511 sdiff = time.localtime(t0 + (ssec - osec))
512 t1 = self.add_time(
513 "2000-01-01,00:00:00",
514 add_hours=sdiff.tm_hour,
515 add_minutes=sdiff.tm_min,
516 add_seconds=sdiff.tm_sec,
517 )
518 s1 = {
519 "dt": t1.strftime(self.dt_format),
520 "df": schedule_time_list[ii - 1]["df"],
521 }
522 return s1
524 def write_schedule_for_gui(
525 self,
526 zen_start: str | None = None,
527 df_list: list[int] | None = None,
528 df_time_list: list[str] | None = None,
529 repeat: int = 8,
530 gain: int = 0,
531 save_path: str | Path | None = None,
532 schedule_fn: str = "zen_schedule.MTsch",
533 version: int = 4,
534 ) -> None:
535 """
536 Write a zen schedule file.
538 Parameters
539 ----------
540 zen_start : str, optional
541 Start time you want the zen to start collecting data, in UTC time. If None, current time is used.
542 df_list : list of int, optional
543 List of sampling rates in Hz.
544 df_time_list : list of str, optional
545 List of time intervals corresponding to df_list in hh:mm:ss format.
546 repeat : int, optional
547 Number of times to repeat the cycle of df_list, by default 8.
548 gain : int, optional
549 Gain on instrument, 2 raised to this number, by default 0.
550 save_path : str or Path, optional
551 Path to save the schedule file, by default current working directory.
552 schedule_fn : str, optional
553 Name of the schedule file, by default 'zen_schedule.MTsch'.
554 version : int, optional
555 Version of the schedule file format, by default 4.
557 Returns
558 -------
559 None
560 """
561 if df_list is not None:
562 self.df_list = df_list
563 if df_time_list is not None:
564 self.df_time_list = df_time_list
565 if save_path is None:
566 save_path = Path.cwd()
567 else:
568 save_path = Path(save_path)
570 self.master_schedule = self.make_schedule(
571 self.df_list, self.df_time_list, repeat=repeat * 3
572 )
574 t_offset_dict = self.get_schedule_offset(zen_start, self.master_schedule)
576 self.sa_list = self.make_schedule(
577 self.df_list,
578 self.df_time_list,
579 t1_dict=t_offset_dict,
580 repeat=repeat,
581 )
583 zacq_list: list[str] = []
584 if version >= 4:
585 zacq_list = ["$TX=0", "$Type=339"]
587 for ii, ss in enumerate(self.sa_list[:-1]):
588 t0 = self._convert_time_to_seconds(ss["time"])
589 t1 = self._convert_time_to_seconds(self.sa_list[ii + 1]["time"])
590 if ss["date"] != self.sa_list[ii + 1]["date"]:
591 t1 += 24 * 3600
593 duration = t1 - t0 - self._resync_pause
594 sr = int(
595 self.sr_dict[str(ss["df"])] if str(ss["df"]) in self.sr_dict else 0
596 )
598 if version >= 4:
599 zacq_list.append(f"$schline{ii+1} = {duration:.0f},{sr:.0f},1,0,0")
600 else:
601 zacq_list.append(f"$schline{ii+1} = {duration:.0f},{sr:.0f},1")
603 if version >= 4:
604 zacq_list.extend(
605 [
606 "$DayRepeat=0",
607 "$RelativeOffsetSeconds=0",
608 "$AutoSleep=0",
609 ]
610 )
612 fn = save_path / schedule_fn
613 with open(fn, "w") as fid:
614 fid.write("\n".join(zacq_list))
616 print(f"Wrote schedule file to {fn}")
617 print("+--------------------------------------+")
618 print(f"| SET ZEN START TIME TO: {zen_start} |")
619 print("+--------------------------------------+")
621 def _convert_time_to_seconds(self, time_string):
622 """
623 convert a time string given as hh:mm:ss into seconds
624 """
625 t_list = [float(tt) for tt in time_string.split(":")]
626 t_seconds = t_list[0] * 3600 + t_list[1] * 60 + t_list[2]
628 return t_seconds