Source code for mth5.io.usgs_ascii.usgs_ascii

# -*- coding: utf-8 -*-
"""
Created on Thu Aug 27 16:54:09 2020

:author: Jared Peacock

:license: MIT

"""

# =============================================================================
# Imports
# =============================================================================

from pathlib import Path
import time
import datetime

import gzip

import numpy as np
import pandas as pd

from mth5.timeseries import ChannelTS, RunTS
from mth5.io.usgs_ascii import AsciiMetadata

# =============================================================================
#  Metadata for usgs ascii file
# =============================================================================


class USGSascii(AsciiMetadata):
    """
    Read and write USGS ascii formatted time series.

    =================== =======================================================
    Attributes          Description
    =================== =======================================================
    ts                  Pandas dataframe holding the time series data
    fn                  Full path to .asc file
    station_dir         Full path to station directory
    meta_notes          Notes of how the station was collected
    =================== =======================================================

    :Example: ::

        >>> asc = USGSascii(fn)
        >>> asc.read()
        >>> run_ts = asc.to_run_ts()
        >>> asc.write(save_dir=save_dir, compress=True, compress_type="gzip")

    """

    def __init__(self, fn=None, **kwargs):
        super().__init__(fn, **kwargs)
        self.ts = None
        self.station_dir = Path().cwd()
        self.meta_notes = None

        # keyword arguments override the defaults set above
        for key, value in kwargs.items():
            setattr(self, key, value)

    def _get_channel(self, component, channel_type):
        """
        Build a :class:`ChannelTS` for one column of :attr:`ts`.

        :param component: column / component name, e.g. ``"hx"``
        :type component: string
        :param channel_type: channel type passed to ``ChannelTS``
            ( ``"magnetic"`` or ``"electric"`` )
        :type channel_type: string
        :return: channel time series, or None if no data have been read
        :rtype: ChannelTS or None
        """
        if self.ts is None:
            return None

        ### need copy in the metadata otherwise the order in the channels
        ### gets messed up.
        channel_metadata = getattr(self, "{0}_metadata".format(component))
        return ChannelTS(
            channel_type,
            data=getattr(self.ts, component).to_numpy(),
            channel_metadata=channel_metadata.copy(),
            run_metadata=self.run_metadata.copy(),
            station_metadata=self.station_metadata.copy(),
            survey_metadata=self.survey_metadata.copy(),
        )

    @property
    def hx(self):
        """HX magnetic channel as a ChannelTS, None if no data are loaded"""
        return self._get_channel("hx", "magnetic")

    @property
    def hy(self):
        """HY magnetic channel as a ChannelTS, None if no data are loaded"""
        return self._get_channel("hy", "magnetic")

    @property
    def hz(self):
        """HZ magnetic channel as a ChannelTS, None if no data are loaded"""
        return self._get_channel("hz", "magnetic")

    @property
    def ex(self):
        """EX electric channel as a ChannelTS, None if no data are loaded"""
        return self._get_channel("ex", "electric")

    @property
    def ey(self):
        """EY electric channel as a ChannelTS, None if no data are loaded"""
        return self._get_channel("ey", "electric")

    def to_run_ts(self):
        """
        Get xarray for run

        :return: run built from hx, hy, hz, ex, ey, or None if no data
            have been read
        :rtype: RunTS or None
        """
        if self.ts is None:
            return None

        return RunTS(
            array_list=[self.hx, self.hy, self.hz, self.ex, self.ey],
            run_metadata=self.run_metadata.copy(),
            station_metadata=self.station_metadata.copy(),
            survey_metadata=self.survey_metadata.copy(),
        )

    def read(self, fn=None):
        """
        Read in a USGS ascii file and fill attributes accordingly.

        The metadata header is parsed with ``read_metadata``, whose return
        value is used as the number of rows to skip before the data table.
        The table is stored in :attr:`ts` with lower-case column names and a
        datetime index spanning ``self.start`` to ``self.end`` with
        ``self.n_samples`` entries.

        :param fn: full path to .asc file to be read in
        :type fn: string
        """
        if fn is not None:
            self.fn = fn

        st = datetime.datetime.now()
        data_line = self.read_metadata()

        # sep=r"\s+" is the supported spelling of the deprecated
        # delim_whitespace=True option.
        self.ts = pd.read_csv(
            self.fn,
            sep=r"\s+",
            skiprows=data_line,
            dtype=np.float32,
        )
        dt_index = pd.date_range(
            start=self.start, periods=self.n_samples, end=self.end
        )
        self.ts.index = dt_index
        self.ts.columns = self.ts.columns.str.lower()

        et = datetime.datetime.now()
        read_time = et - st
        self.logger.info("Reading took {0}".format(read_time.total_seconds()))

    def _make_file_name(
        self, save_path=None, compression=True, compress_type="zip"
    ):
        """
        get the file name to save to

        The name is ``{SiteID}_{YYYY-MM-DD}T{HHMMSS}_{AcqSmpFreq}.asc`` with
        ``.zip`` or ``.gz`` appended when compression is requested.  Any other
        ``compress_type`` leaves the name without a compression suffix.

        :param save_path: full path to directory to save file to, defaults
            to :attr:`station_dir`
        :type save_path: string or pathlib.Path
        :param compression: compress file
        :type compression: [ True | False ]
        :param compress_type: type of compression
        :type compress_type: [ zip | gzip ]
        :return: save_fn
        :rtype: pathlib.Path
        """
        file_name = "{0}_{1}T{2}_{3:.0f}.asc".format(
            self.SiteID,
            self._start_time.strftime("%Y-%m-%d"),
            self._start_time.strftime("%H%M%S"),
            self.AcqSmpFreq,
        )
        base_dir = self.station_dir if save_path is None else save_path
        save_fn = Path(base_dir).joinpath(file_name)

        if compression:
            suffix = {"zip": ".zip", "gzip": ".gz"}.get(compress_type)
            if suffix is not None:
                # Path objects do not support "+" with a string, so the
                # suffix is appended to the file name component instead.
                save_fn = save_fn.with_name(save_fn.name + suffix)

        return save_fn

    def _format_rows(self, rows, str_fmt):
        """
        Format a block of rows of :attr:`ts` as text, one line per row.

        Exact zeros are replaced with ``MissingDataFlag`` before formatting.
        The block is copied first so :attr:`ts` itself is not modified.
        """
        out = np.array(rows)
        out[np.where(out == 0)] = float(self.MissingDataFlag)
        out = np.char.mod(str_fmt, out)
        return "\n".join("".join(row) for row in out) + "\n"

    def _write_data(self, fid, meta_lines, s_num, chunk_size, str_fmt, full):
        """
        Write metadata lines, the column header and the data to an open
        text-mode file object.
        """
        header = "".join(
            "{0:>{1}}".format(c.capitalize(), s_num) for c in self.ts.columns
        )
        fid.write("\n".join(list(meta_lines) + [header]) + "\n")

        n_rows = self.ts.shape[0]
        if not full:
            # only the first block is written, mostly for testing
            n_rows = min(n_rows, chunk_size)

        # step through by chunk_size so the trailing partial block is
        # written as well
        for start in range(0, n_rows, chunk_size):
            stop = min(start + chunk_size, n_rows)
            fid.write(self._format_rows(self.ts.iloc[start:stop], str_fmt))

    def write(
        self,
        save_fn=None,
        chunk_size=1024,
        str_fmt="%15.7e",
        full=True,
        compress=False,
        save_dir=None,
        compress_type="zip",
        convert_electrics=True,
    ):
        """
        Write an ascii file in the USGS ascii format.

        :param save_fn: full path to file name to save the merged ascii to.
            If None the name is built by :meth:`_make_file_name`.
        :type save_fn: string or pathlib.Path
        :param chunk_size: chunk size to write file in blocks, larger
            numbers are typically slower.
        :type chunk_size: int
        :param str_fmt: format of the data as written
        :type str_fmt: string
        :param full: write out the complete file, if False only the first
            ``chunk_size`` rows are written, mostly for testing.
        :type full: boolean [ True | False ]
        :param compress: compress file
        :type compress: boolean [ True | False ]
        :param save_dir: directory to save to when ``save_fn`` is None
        :type save_dir: string or pathlib.Path
        :param compress_type: compress file using zip or gzip.  Only gzip
            is written compressed by this method; for zip the plain ``.asc``
            file is written and has to be zipped externally.
        :type compress_type: string [ zip | gzip ]
        :param convert_electrics: call ``self.convert_electrics()`` before
            writing (convert electric fields into mV/km)
        :type convert_electrics: boolean [ True | False ]
        """
        # get the filename to save to
        if save_fn is None:
            save_fn = self._make_file_name(
                save_path=save_dir,
                compression=compress,
                compress_type=compress_type,
            )
        else:
            save_fn = Path(save_fn)

        # get the number of characters in the desired string
        s_num = int(str_fmt[1 : str_fmt.find(".")])

        # convert electric fields into mV/km
        if convert_electrics:
            self.convert_electrics()

        use_gzip = bool(compress) and compress_type == "gzip"
        if compress and compress_type == "zip":
            # Zipping is not done here, it has to be done externally, so
            # write the plain file without the .zip suffix.
            self.logger.debug("ZIPPING")
            if save_fn.suffix.lower() == ".zip":
                save_fn = save_fn.with_suffix("")

        self.logger.debug("==> {0}".format(save_fn))
        self.logger.debug("START --> {0}".format(time.ctime()))
        st = datetime.datetime.now()

        # write meta data first
        # sort channel information same as columns
        meta_lines = self.write_metadata(
            chn_list=[c.capitalize() for c in self.ts.columns]
        )

        if use_gzip:
            # text mode is required because the content is written as str
            with gzip.open(save_fn, "wt") as fid:
                self._write_data(
                    fid, meta_lines, s_num, chunk_size, str_fmt, full
                )
        else:
            with open(save_fn, "w") as fid:
                self._write_data(
                    fid, meta_lines, s_num, chunk_size, str_fmt, full
                )

        self.logger.debug("END --> {0}".format(time.ctime()))
        et = datetime.datetime.now()
        write_time = et - st
        self.logger.debug(
            "Writing took: {0} seconds".format(write_time.total_seconds())
        )
def read_ascii(fn, **kwargs):
    """
    read USGS ASCII formatted file

    :param fn: full path to the USGS ascii (.asc) file to read
    :type fn: string or pathlib.Path
    :param kwargs: optional keyword arguments passed on to
        :class:`USGSascii`
    :return: the run time series built by :meth:`USGSascii.to_run_ts`,
        None if no data were read
    :rtype: RunTS or None
    """
    asc_obj = USGSascii(fn, **kwargs)
    # USGSascii exposes the reader as ``read``; the previously called
    # ``read_ascii_file`` is not defined on the class.
    asc_obj.read()
    return asc_obj.to_run_ts()