Source code for mth5.io.zen.zen_tools

# -*- coding: utf-8 -*-
"""
Created on Tue Apr 18 15:40:28 2023

@author: jpeacock
"""
import datetime
import shutil
import string
import time

# =============================================================================
# Imports
# =============================================================================
from pathlib import Path

import dateutil.parser
import numpy as np
from loguru import logger

from mth5.io.zen import Z3D


try:
    import win32api
except ImportError:
    print("WARNING: Cannot find win32api, will not be able to detect" " drive names")
# =============================================================================


# get the external drives for SD cards
[docs] def get_drives(): """ get a list of logical drives detected on the machine Note this only works for windows. Outputs: ---------- **drives** : list of drives as letters :Example: :: >>> import mtpy.usgs.zen as zen >>> zen.get_drives() """ drives = [] bitmask = win32api.GetLogicalDrives() for letter in string.ascii_uppercase: if bitmask & 1: drives.append(letter) bitmask >>= 1 return drives
# get the names of the drives which should correspond to channels
[docs] def get_drive_names(): """ get a list of drive names detected assuming the cards are names by box and channel. Outputs: ---------- **drive_dict** : dictionary keys are the drive letters and values are the drive names :Example: :: >>> import mtpy.usgs.zen as zen >>> zen.get_drives_names() """ drives = get_drives() drive_dict = {} for drive in drives: try: drive_name = win32api.GetVolumeInformation(drive + ":\\")[0] if drive_name.find("CH") >= 0: drive_dict[drive] = drive_name except: pass if not bool(drive_dict): return None return drive_dict
[docs] def split_station(station): """ split station name into name and number """ for ii, ss in enumerate(station): try: int(ss) find = ii break except ValueError: continue name = station[0:find] number = station[find:] return (name, number)
[docs] def copy_from_sd( station, save_path=r"d:\Peacock\MTData", channel_dict={ "1": "HX", "2": "HY", "3": "HZ", "4": "EX", "5": "EY", "6": "HZ", }, copy_date=None, copy_type="all", ): """ copy files from sd cards into a common folder (save_path) do not put an underscore in station, causes problems at the moment Arguments: ----------- **station** : string full name of station from which data is being saved **save_path** : string full path to save data to **channel_dict** : dictionary keys are the channel numbers as strings and the values are the component that corresponds to that channel, values are placed in upper case in the code **copy_date** : YYYY-MM-DD date to copy from depending on copy_type **copy_type** : [ 'all' | 'before' | 'after' | 'on' ] * 'all' --> copy all files on the SD card * 'before' --> copy files before and on this date * 'after' --> copy files on and after this date * 'on' --> copy files on this date only Outputs: ----------- **fn_list** : list list of filenames copied to save_path :Example: :: >>> import mtpy.usgs.zen as zen >>> fn_list = zen.copy_from_sd('mt01', save_path=r"/home/mt/survey_1") """ s_name, s_int = split_station(station) drive_names = get_drive_names() save_path = Path(save_path).joinpath(station) if not save_path.exists(): save_path.mkdir() if drive_names is None: logger.error("No drive names found. No files copied.") return [], save_path # make a datetime object from copy date if copy_date is not None: c_date = dateutil.parser.parse(copy_date) st_test = time.ctime() fn_list = [] for key in list(drive_names.keys()): dr = Path(f"{key}:") logger.info(f"Reading from drive {key}.") for fn in list(set(list(dr.rglob("*.z3d")) + list(dr.rglob("*.Z3D")))): # test for copy date if copy_date is not None: file_date = datetime.datetime.fromtimestamp(fn.stat().st_mtime) if copy_type == "after": if file_date < c_date: continue elif copy_type == "before": if file_date > c_date: continue elif copy_type == "on": if file_date.date() != c_date.date(): continue try: file_size = fn.stat().st_size if file_size >= 1600: zt = Z3D(fn=fn) zt.read_all_info() if zt.metadata.station.find(s_int) >= 0: channel = zt.metadata.ch_cmp.upper() st = zt.schedule.Time.replace(":", "") sd = zt.schedule.Date.replace("-", "") sv_fn = ( f"{station}_{sd}_{st}_{int(zt.sample_rate)}_{channel}.Z3D" ) new_fn = save_path.joinpath(sv_fn) fn_list.append(new_fn) shutil.copy(fn, new_fn) logger.info(f"Copied {fn} to {new_fn}") logger.info(f"File size is {file_size}") else: logger.warning(f"Skipped {fn} because file to small {file_size}") except WindowsError: logger.warning(f"Faulty file at {fn}") et_test = time.ctime() logger.info(f"Started copying at: {st_test}") logger.info(f"Ended copying at: {et_test}") return fn_list, save_path
# ============================================================================== # delete files from sd cards # ==============================================================================
[docs] def delete_files_from_sd( delete_date=None, delete_type=None, delete_folder=r"d:\Peacock\MTData\Deleted", verbose=True, ): """ delete files from sd card, if delete_date is not None, anything on this date and before will be deleted. Deletes just .Z3D files, leaves zenini.cfg Agruments: ----------- **delete_date** : YYYY-MM-DD date to delete files from **delete_type** : [ 'all' | 'before' | 'after' | 'on' ] * 'all' --> delete all files on sd card * 'before' --> delete files on and before delete_date * 'after' --> delete files on and after delete_date * 'on' --> delete files on delete_date **delete_folder** : string full path to a folder where files will be moved to just in case. If None, files will be deleted for ever. Returns: --------- **delete_fn_list** : list list of deleted files. :Example: :: >>> import mtpy.usgs.zen as zen >>> # Delete all files before given date, forever. >>> zen.delete_files_from_sd(delete_date='2004/04/20', delete_type='before', delete_folder=None) >>> # Delete all files into a folder just in case >>> zen.delete_files_from_sd(delete_type='all', delete_folder=r"/home/mt/deleted_files") """ drive_names = get_drive_names() if delete_folder is None: delete_path = Path().cwd() else: delete_path = Path(delete_folder) if not delete_path.exists(): delete_path.mkdir() if drive_names is None: logger.error("No drives found.") raise IOError("No drives to copy from.") if delete_date is not None: delete_date = int(delete_date.replace("-", "")) delete_fn_list = [] for key, value in drive_names.items(): dr = Path(f"{key}:") for fn in dr.iterdir(): if fn.suffix in [".Z3D", ".z3d"]: zt = Z3D(fn) zt.read_all_info() zt_date = int(zt.schedule.Date.replace("-", "")) # zt.get_info() if delete_type == "all" or delete_date is None: if delete_folder is None: fn.unlink() delete_fn_list.append(fn) logger.info(f"Deleted {fn}") else: shutil.move(fn, delete_path.joinpath(fn.name)) delete_fn_list.append(fn) logger.info(f"Moved {fn} to {delete_path}") else: if delete_type == "before": if zt_date <= delete_date: if delete_folder is None: fn.unlink() delete_fn_list.append(fn) logger.info(f"Deleted {fn}") else: shutil.move(fn, delete_path.joinpath(fn.name)) delete_fn_list.append(fn) logger.info(f"Moved {fn} to {delete_path}") elif delete_type == "after": if zt_date >= delete_date: if delete_folder is None: fn.unlink() delete_fn_list.append(fn) logger.info(f"Deleted {fn}") else: shutil.move(fn, delete_path.joinpath(fn.name)) delete_fn_list.append(fn) logger.info(f"Moved {fn} to {delete_path}") elif delete_type == "on": if zt_date == delete_date: if delete_folder is None: fn.unlink() delete_fn_list.append(fn) logger.info(f"Deleted {fn}") else: shutil.move(fn, delete_path.joinpath(fn.name)) delete_fn_list.append(fn) logger.info(f"Moved {fn} to {delete_path}") return delete_fn_list
# ============================================================================== # read and write a zen schedule # ==============================================================================
[docs] class ZenSchedule(object): """ deals with reading, writing and copying schedule Creates a repeating schedule based on the master_schedule. It will then change the first scheduling action to coincide with the master schedule, such that all deployed boxes will have the same schedule. :Example: :: >>> import mtpy.usgs.zen as zen >>> zs = zen.ZenSchedule() >>> zs.write_schedule('MT01', dt_offset='2013-06-23,04:00:00') ====================== ==================================================== Attributes Description ====================== ==================================================== ch_cmp_dict dictionary for channel components with keys being the channel number and values being the channel label ch_num_dict dictionary for channel components whith keys being channel label and values being channel number df_list sequential list of sampling rates to repeat in schedule df_time_list sequential list of time intervals to measure for each corresponding sampling rate dt_format date and time format. *default* is YYY-MM-DD,hh:mm:ss dt_offset start date and time of schedule in dt_format gain_dict dictionary of gain values for channel number initial_dt initial date, or dummy zero date for scheduling light_dict dictionary of light color values for schedule master_schedule the schedule that all data loggers should schedule at. Will taylor the schedule to match the master schedule according to dt_offset meta_dict dictionary for meta data meta_keys keys for meta data dictionary sa_keys keys for schedule actions sa_list list of schedule actions including time and df sr_dict dictionary of sampling rate values verbose [ True | False ] True to print information to console ====================== ==================================================== """ def __init__(self):
[docs] self.verbose = True
[docs] self.sr_dict = { "256": "0", "512": "1", "1024": "2", "2048": "3", "4096": "4", }
[docs] self.sa_list = []
[docs] self.ch_cmp_dict = { "1": "hx", "2": "hy", "3": "hz", "4": "ex", "5": "ey", "6": "hz", }
[docs] self.ch_num_dict = dict( [(self.ch_cmp_dict[key], key) for key in self.ch_cmp_dict] )
[docs] self.dt_format = "%Y-%m-%d,%H:%M:%S"
[docs] self.initial_dt = "2000-01-01,00:00:00"
[docs] self.dt_offset = time.strftime(self.dt_format, time.gmtime())
[docs] self.df_list = (4096, 256)
[docs] self.df_time_list = ("00:10:00", "07:50:00")
[docs] self.master_schedule = self.make_schedule( self.df_list, self.df_time_list, repeat=16 )
self._resync_pause = 20 # ==================================================
[docs] def add_time( self, date_time, add_minutes=0, add_seconds=0, add_hours=0, add_days=0 ): """ add time to a time string assuming date_time is in the format YYYY-MM-DD,HH:MM:SS """ fulldate = datetime.datetime.strptime(date_time, self.dt_format) fulldate = fulldate + datetime.timedelta( days=add_days, hours=add_hours, minutes=add_minutes, seconds=add_seconds, ) return fulldate
# ==================================================
[docs] def make_schedule(self, df_list, df_length_list, repeat=5, t1_dict=None): """ make a repeated schedule given list of sampling frequencies and duration for each. Arguments: ----------- **df_list** : list list of sampling frequencies in Hz, note needs to be powers of 2 starting at 256 **df_length_list** : list list of durations in hh:mm:ss format **repeat** : int number of times to repeat the sequence **t1_dict** : dictionary dictionary returned from get_schedule_offset Returns: -------- **time_list**: list of dictionaries with keys: * 'dt' --> date and time of schedule event * 'df' --> sampling rate for that event """ df_list = np.array(df_list) df_length_list = np.array(df_length_list) ndf = len(df_list) if t1_dict is not None: time_list = [{"dt": self.initial_dt, "df": t1_dict["df"]}] kk = np.where(np.array(df_list) == t1_dict["df"])[0][0] - ndf + 1 df_list = np.append(df_list[kk:], df_list[:kk]) df_length_list = np.append(df_length_list[kk:], df_length_list[:kk]) time_list.append(dict([("dt", t1_dict["dt"]), ("df", df_list[0])])) ii = 1 else: time_list = [{"dt": self.initial_dt, "df": df_list[0]}] ii = 0 for rr in range(1, repeat + 1): for df, df_length, jj in zip(df_list, df_length_list, range(ndf)): dtime = time.strptime(df_length, "%H:%M:%S") ndt = self.add_time( time_list[ii]["dt"], add_hours=dtime.tm_hour, add_minutes=dtime.tm_min, add_seconds=dtime.tm_sec, ) time_list.append( { "dt": ndt.strftime(self.dt_format), "df": df_list[jj - ndf + 1], } ) ii += 1 for nn, ns in enumerate(time_list): sdate, stime = ns["dt"].split(",") ns["date"] = sdate ns["time"] = stime ns["sr"] = self.sr_dict[str(ns["df"])] return time_list
# ==================================================
[docs] def get_schedule_offset(self, time_offset, schedule_time_list): """ gets the offset in time from master schedule list and time_offset so that all schedules will record at the same time according to master schedule list schedule_time_list Attributes: ----------- **time_offset** : hh:mm:ss the time offset given to the zen reciever **schedule_time_list** : list list of actual schedule times returned from make_schedule Returns: -------- **s1** : dictionary dictionary with keys: * 'dt' --> date and time of offset from next schedule event from schedule_time_list * 'df' --> sampling rate of that event """ dt_offset = "{0},{1}".format("2000-01-01", time_offset) t0 = time.mktime(time.strptime("2000-01-01,00:00:00", self.dt_format)) for ii, tt in enumerate(schedule_time_list): ssec = time.mktime(time.strptime(tt["dt"], self.dt_format)) osec = time.mktime(time.strptime(dt_offset, self.dt_format)) if ssec > osec: sdiff = time.localtime(t0 + (ssec - osec)) t1 = self.add_time( "2000-01-01,00:00:00", add_hours=sdiff.tm_hour, add_minutes=sdiff.tm_min, add_seconds=sdiff.tm_sec, ) s1 = { "dt": t1.strftime(self.dt_format), "df": schedule_time_list[ii - 1]["df"], } return s1
[docs] def write_schedule_for_gui( self, zen_start=None, df_list=None, df_time_list=None, repeat=8, gain=0, save_path=None, schedule_fn="zen_schedule.MTsch", version=4, ): """ write a zen schedule file **Note**: for the older boxes use 'Zeus3Ini.cfg' for the savename Arguments: ---------- **zen_start** : hh:mm:ss start time you want the zen to start collecting data. if this is none then current time on computer is used. **In UTC Time** **Note**: this will shift the starting point to match the master schedule, so that all stations have the same schedule. **df_list** : list list of sampling rates in Hz **df_time_list** : list list of time intervals corresponding to df_list in hh:mm:ss format **repeat** : int number of time to repeat the cycle of df_list **gain** : int gain on instrument, 2 raised to this number. Returns: -------- * writes a schedule file to input into the ZenAcq Gui """ if df_list is not None: self.df_list = df_list if df_time_list is not None: self.df_time_list = df_time_list if save_path is None: save_path = Path().cwd() else: save_path = Path(save_path) # make a master schedule first self.master_schedule = self.make_schedule( self.df_list, self.df_time_list, repeat=repeat * 3 ) # estimate the first off set time t_offset_dict = self.get_schedule_offset(zen_start, self.master_schedule) # make the schedule with the offset of the first schedule action self.sa_list = self.make_schedule( self.df_list, self.df_time_list, t1_dict=t_offset_dict, repeat=repeat, ) # make a list of lines to write to a file for ZenAcq if version >= 4: zacq_list = ["$TX=0", "$Type=339"] elif version < 4: zacq_list = [] for ii, ss in enumerate(self.sa_list[:-1]): t0 = self._convert_time_to_seconds(ss["time"]) t1 = self._convert_time_to_seconds(self.sa_list[ii + 1]["time"]) if ss["date"] != self.sa_list[ii + 1]["date"]: t1 += 24 * 3600 # subtract 10 seconds for transition between schedule items. duration = t1 - t0 - self._resync_pause sr = int(self.sr_dict[str(ss["df"])]) if version >= 4: zacq_list.append(f"$schline{ii+1} = {duration:.0f},{sr:.0f},1,0,0") elif version < 4: zacq_list.append(f"$schline{ii+1} = {duration:.0f},{sr:.0f},1") if version >= 4: zacq_list += [ "$DayRepeat=0", "$RelativeOffsetSeconds=0", "$AutoSleep=0", ] fn = save_path.joinpath(schedule_fn) with open(fn, "w") as fid: fid.write("\n".join(zacq_list)) print("Wrote schedule file to {0}".format(fn)) print("+--------------------------------------+") print("| SET ZEN START TIME TO: {0} |".format(zen_start)) print("+--------------------------------------+")
def _convert_time_to_seconds(self, time_string): """ convert a time string given as hh:mm:ss into seconds """ t_list = [float(tt) for tt in time_string.split(":")] t_seconds = t_list[0] * 3600 + t_list[1] * 60 + t_list[2] return t_seconds