
# -*- coding: utf-8 -*-
"""
===============
NIMS
===============

    * deals with reading in NIMS DATA.BIN files

    This is a translation of Matlab codes written and edited by:
        * Anna Kelbert
        * Paul Bedrosian
        * Esteban Bowles-Martinez
        * Possibly others.

    I've tested it against a version of those codes, and it matches.  The
    data/GPS gaps I still don't understand, so for now the time series is
    made continuous and the number of missing seconds is clipped from the
    end of the time series.

    .. note:: this only works for 8 Hz data for now
    
    
:copyright:
    Jared Peacock (jpeacock@usgs.gov)
    
:license: 
    MIT
"""

# =============================================================================
# Imports
# =============================================================================
import struct
import datetime

import numpy as np
import pandas as pd

from mth5.io.nims.gps import GPS
from mth5.io.nims.header import NIMSHeader
from mth5.io.nims.response_filters import Response
from mth5 import timeseries

from mt_metadata.utils.mttime import MTime
from mt_metadata.timeseries import Station, Run, Electric, Magnetic, Auxiliary

# =============================================================================
# Exceptions
# =============================================================================


class NIMS(NIMSHeader):
    """
    NIMS Class will read in a NIMS DATA.BIN file.

    A fast way to read the binary files is to first read in the GPS strings
    (the third byte in each block) as characters and parse those into valid
    GPS stamps.  Then read in the entire data set as unsigned 8-bit integers
    and reshape the data to be n seconds x block size.  Then parse that
    array into the status information and data.

    I only have a limited amount of .BIN files to test, so this will likely
    break if there are issues such as data gaps.  This has been tested
    against the Matlab program loadNIMS by Anna Kelbert and they match for
    all the .bin files I have.  If something looks weird, check it against
    that program.

    .. todo:: deal with timing issues; right now a warning is sent to the
              user, need to figure out a way to find where the gap is and
              adjust accordingly.

    .. warning:: currently only 8 Hz data is supported

    """

    def __init__(self, fn=None):
        super().__init__(fn)

        # change these if the sample rate is different
        self.block_size = 131
        self.block_sequence = [1, self.block_size]
        self.sample_rate = 8  ### samples/second
        self.e_conversion_factor = 2.44141221047903e-06
        self.h_conversion_factor = 0.01
        self.t_conversion_factor = 70
        self.t_offset = 18048
        self._int_max = 8388608
        self._int_factor = 16777216
        self._block_dict = {
            "soh": 0,
            "block_len": 1,
            "status": 2,
            "gps": 3,
            "sequence": 4,
            "box_temp": (5, 6),
            "head_temp": (7, 8),
            "logic": 81,
            "end": 130,
        }
        self.info_array = None
        self.stamps = None
        self.ts_data = None
        self.gaps = None
        self.duplicate_list = None
        self._raw_string = None

        self.indices = self._make_index_values()

    def __str__(self):
        lines = [f"NIMS Station: {self.site_name}", "-" * 20]
        lines.append(f"NIMS ID: {self.box_id}")
        lines.append(f"magnetometer ID: {self.mag_id}")
        lines.append(f"operator: {self.operator}")
        lines.append(f"location: {self.state_province}, {self.country}")
        lines.append(f"latitude: {self.latitude} (degrees)")
        lines.append(f"longitude: {self.longitude} (degrees)")
        lines.append(f"elevation: {self.elevation} m")
        lines.append(f"gps stamp: {self.header_gps_stamp}")
        lines.append(
            f"EX: length = {self.ex_length} m; azimuth = {self.ex_azimuth}"
        )
        lines.append(
            f"EY: length = {self.ey_length} m; azimuth = {self.ey_azimuth}"
        )
        lines.append(f"comments: {self.comments}")
        if self.has_data():
            lines.append("")
            lines.append(f"Start: {self.start_time.isoformat()}")
            lines.append(f"End:   {self.end_time.isoformat()}")
            lines.append(f"Data shape: {self.ts_data.shape}")
            lines.append(f"Found {len(self.stamps)} GPS stamps")

        return "\n".join(lines)

    def __repr__(self):
        return self.__str__()
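
    # A minimal sketch of the fast-read pattern described in the class
    # docstring, assuming ``raw`` holds the bytes after the header (names
    # here are illustrative, not part of the class):
    #
    #     data = np.frombuffer(raw, dtype=np.uint8)
    #     n_blocks = data.size // 131              # 131 bytes per 1-second block
    #     blocks = data[: n_blocks * 131].reshape(n_blocks, 131)
    #     gps_chars = blocks[:, 3].tobytes()       # 3rd byte of each block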

    def has_data(self):
        if self.ts_data is not None:
            return True
        return False

    @property
    def n_samples(self):
        if self.has_data():
            return self.ts_data.shape[0]
        elif self.fn is not None:
            # 131 bytes per 1-second block / 8 samples = 16.375 bytes/sample
            return int(self.file_size / 16.375)

    @property
    def latitude(self):
        """
        median latitude value from all the GPS stamps in decimal degrees
        WGS84

        Only get from the GPRMC stamp as they should be duplicates
        """
        if self.stamps is not None:
            latitude = np.zeros(len(self.stamps))
            for ii, stamp in enumerate(self.stamps):
                latitude[ii] = stamp[1][0].latitude
            return np.median(latitude[np.nonzero(latitude)])
        return self.header_gps_latitude

    @property
    def longitude(self):
        """
        median longitude value from all the GPS stamps in decimal degrees
        WGS84

        Only get from the first stamp within the sets
        """
        if self.stamps is not None:
            longitude = np.zeros(len(self.stamps))
            for ii, stamp in enumerate(self.stamps):
                longitude[ii] = stamp[1][0].longitude
            return np.median(longitude[np.nonzero(longitude)])
        return self.header_gps_longitude

    @property
    def elevation(self):
        """
        median elevation value from all the GPS stamps in meters

        Only get from the first stamp within the sets
        """
        if self.stamps is not None:
            elevation = np.zeros(len(self.stamps))
            for ii, stamp in enumerate(self.stamps):
                if len(stamp[1]) == 1:
                    elev = stamp[1][0].elevation
                if len(stamp[1]) == 2:
                    elev = stamp[1][1].elevation
                if elev is None:
                    continue
                elevation[ii] = elev
            return np.median(elevation[np.nonzero(elevation)])
        return self.header_gps_elevation

    @property
    def declination(self):
        """
        median declination value from all the GPS stamps in decimal degrees

        Only get from the GPRMC stamps as they carry the declination
        """
        if self.stamps is not None:
            declination = np.zeros(len(self.stamps))
            for ii, stamp in enumerate(self.stamps):
                if stamp[1][0].gps_type == "GPRMC":
                    dec = stamp[1][0].declination
                    if dec is None:
                        continue
                    declination[ii] = dec
            return np.median(declination[np.nonzero(declination)])
        return None

    @property
    def start_time(self):
        """
        start time is the first good GPS time stamp minus the seconds to the
        beginning of the time series.
        """
        if self.stamps is not None:
            return MTime(self.ts_data.index[0])
        return self.header_gps_stamp

    @property
    def end_time(self):
        """
        end time is the last good GPS time stamp plus the seconds to the end
        of the time series.
        """
        if self.stamps is not None:
            return MTime(self.ts_data.index[-1])
        self.logger.warning("Estimating end time from n_samples")
        return self.start_time + int(self.n_samples / self.sample_rate)

    @property
    def box_temperature(self):
        """data logger temperature, sampled at 1 second"""

        if self.ts_data is not None:
            auxiliary_metadata = Auxiliary()
            auxiliary_metadata.from_dict(
                {
                    "channel_number": 6,
                    "component": "temperature",
                    "measurement_azimuth": 0,
                    "measurement_tilt": 0,
                    "sample_rate": 1,
                    "time_period.start": self.start_time.isoformat(),
                    "time_period.end": self.end_time.isoformat(),
                    "type": "auxiliary",
                    "units": "celsius",
                }
            )

            temp = timeseries.ChannelTS(
                channel_type="auxiliary",
                data=self.info_array["box_temp"],
                channel_metadata=auxiliary_metadata,
                run_metadata=self.run_metadata,
                station_metadata=self.station_metadata,
            )
            # interpolate temperature onto the same sample rate as the
            # other channels.
            temp._ts = temp._ts.interp_like(self.hx._ts)
            temp.channel_metadata.sample_rate = self.sample_rate
            temp.channel_metadata.time_period.end = self.end_time.isoformat()

            return temp
        return None

    def get_channel_response(self, channel, dipole_length=1):
        """
        Get the channel response filter for a given channel.

        :param channel: channel component, e.g. "hx" or "ex"
        :type channel: string
        :param dipole_length: dipole length in meters for electric channels,
                              defaults to 1
        :type dipole_length: float, optional
        :return: channel response filter for the requested channel
        """
        nims_filters = Response(sample_rate=self.sample_rate)

        return nims_filters.get_channel_response(
            channel, dipole_length=dipole_length
        )

    @property
    def hx_metadata(self):
        if self.ts_data is not None:
            hx_metadata = Magnetic()
            hx_metadata.from_dict(
                {
                    "channel_number": 1,
                    "component": "hx",
                    "measurement_azimuth": 0,
                    "measurement_tilt": 0,
                    "sample_rate": self.sample_rate,
                    "time_period.start": self.start_time.isoformat(),
                    "time_period.end": self.end_time.isoformat(),
                    "type": "magnetic",
                    "units": "counts",
                    "sensor.id": self.mag_id,
                    "sensor.manufacturer": "Barry Narod",
                    "sensor.type": "fluxgate triaxial magnetometer",
                }
            )
            return hx_metadata

    @property
    def hx(self):
        """HX"""
        if self.ts_data is not None:
            return timeseries.ChannelTS(
                channel_type="magnetic",
                data=self.ts_data.hx.to_numpy(),
                channel_metadata=self.hx_metadata,
                run_metadata=self.run_metadata,
                station_metadata=self.station_metadata,
                channel_response_filter=self.get_channel_response("hx"),
            )
        return None

    @property
    def hy_metadata(self):
        if self.ts_data is not None:
            hy_metadata = Magnetic()
            hy_metadata.from_dict(
                {
                    "channel_number": 2,
                    "component": "hy",
                    "measurement_azimuth": 90,
                    "measurement_tilt": 0,
                    "sample_rate": self.sample_rate,
                    "time_period.start": self.start_time.isoformat(),
                    "time_period.end": self.end_time.isoformat(),
                    "type": "magnetic",
                    "units": "counts",
                    "sensor.id": self.mag_id,
                    "sensor.manufacturer": "Barry Narod",
                    "sensor.type": "fluxgate triaxial magnetometer",
                }
            )
            return hy_metadata

    @property
    def hy(self):
        """HY"""
        if self.ts_data is not None:
            return timeseries.ChannelTS(
                channel_type="magnetic",
                data=self.ts_data.hy.to_numpy(),
                channel_metadata=self.hy_metadata,
                run_metadata=self.run_metadata,
                station_metadata=self.station_metadata,
                channel_response_filter=self.get_channel_response("hy"),
            )
        return None

    @property
    def hz_metadata(self):
        if self.ts_data is not None:
            hz_metadata = Magnetic()
            hz_metadata.from_dict(
                {
                    "channel_number": 3,
                    "component": "hz",
                    "measurement_azimuth": 0,
                    "measurement_tilt": 0,
                    "sample_rate": self.sample_rate,
                    "time_period.start": self.start_time.isoformat(),
                    "time_period.end": self.end_time.isoformat(),
                    "type": "magnetic",
                    "units": "counts",
                    "sensor.id": self.mag_id,
                    "sensor.manufacturer": "Barry Narod",
                    "sensor.type": "fluxgate triaxial magnetometer",
                }
            )
            return hz_metadata

    @property
    def hz(self):
        """HZ"""
        if self.ts_data is not None:
            return timeseries.ChannelTS(
                channel_type="magnetic",
                data=self.ts_data.hz.to_numpy(),
                channel_metadata=self.hz_metadata,
                run_metadata=self.run_metadata,
                station_metadata=self.station_metadata,
                channel_response_filter=self.get_channel_response("hz"),
            )
        return None

    @property
    def ex_metadata(self):
        if self.ts_data is not None:
            ex_metadata = Electric()
            ex_metadata.from_dict(
                {
                    "channel_number": 4,
                    "component": "ex",
                    "measurement_azimuth": self.ex_azimuth,
                    "measurement_tilt": 0,
                    "sample_rate": self.sample_rate,
                    "dipole_length": self.ex_length,
                    "time_period.start": self.start_time.isoformat(),
                    "time_period.end": self.end_time.isoformat(),
                    "type": "electric",
                    "units": "counts",
                    "negative.id": self.s_electrode_id,
                    "positive.id": self.n_electrode_id,
                }
            )
            return ex_metadata

    @property
    def ex(self):
        """EX"""
        if self.ts_data is not None:
            return timeseries.ChannelTS(
                channel_type="electric",
                data=self.ts_data.ex.to_numpy(),
                channel_metadata=self.ex_metadata,
                run_metadata=self.run_metadata,
                station_metadata=self.station_metadata,
                channel_response_filter=self.get_channel_response(
                    "ex", self.ex_length
                ),
            )
        return None

    @property
    def ey_metadata(self):
        if self.ts_data is not None:
            ey_metadata = Electric()
            ey_metadata.from_dict(
                {
                    "channel_number": 5,
                    "component": "ey",
                    "measurement_azimuth": self.ey_azimuth,
                    "measurement_tilt": 0,
                    "sample_rate": self.sample_rate,
                    "dipole_length": self.ey_length,
                    "time_period.start": self.start_time.isoformat(),
                    "time_period.end": self.end_time.isoformat(),
                    "type": "electric",
                    "units": "counts",
                    "negative.id": self.w_electrode_id,
                    "positive.id": self.e_electrode_id,
                }
            )
            return ey_metadata

    @property
    def ey(self):
        """EY"""
        if self.ts_data is not None:
            return timeseries.ChannelTS(
                channel_type="electric",
                data=self.ts_data.ey.to_numpy(),
                channel_metadata=self.ey_metadata,
                run_metadata=self.run_metadata,
                station_metadata=self.station_metadata,
                channel_response_filter=self.get_channel_response(
                    "ey", self.ey_length
                ),
            )
        return None

    @property
    def run_metadata(self):
        """Run metadata"""

        if self.ts_data is not None:
            run_metadata = Run()
            run_metadata.from_dict(
                {
                    "Run": {
                        "comments": self.comments,
                        "data_logger.firmware.author": "B. Narod",
                        "data_logger.firmware.name": "nims",
                        "data_logger.firmware.version": "1.0",
                        "data_logger.manufacturer": "Narod",
                        "data_logger.model": self.box_id,
                        "data_logger.id": self.box_id,
                        "data_logger.type": "long period",
                        "id": self.run_id,
                        "data_type": "MTLP",
                        "sample_rate": self.sample_rate,
                        "time_period.end": self.end_time.isoformat(),
                        "time_period.start": self.start_time.isoformat(),
                    }
                }
            )
            for comp in ["hx", "hy", "hz", "ex", "ey"]:
                run_metadata.channels.append(getattr(self, f"{comp}_metadata"))
            return run_metadata
        return None

    @property
    def station_metadata(self):
        """Station metadata from nims file"""
        if self.ts_data is not None:
            station_metadata = Station()
            station_metadata.from_dict(
                {
                    "Station": {
                        "geographic_name": f"{self.site_name}, {self.state_province}, {self.country}",
                        "location.declination.value": self.declination,
                        "location.elevation": self.elevation,
                        "location.latitude": self.latitude,
                        "location.longitude": self.longitude,
                        "id": self.run_id[0:-1],
                        "orientation.reference_frame": "geomagnetic",
                    }
                }
            )
            station_metadata.runs.append(self.run_metadata)
            return station_metadata
        return None

    def to_runts(self, calibrate=False):
        """Get xarray for run"""

        if self.ts_data is not None:
            run = timeseries.RunTS(
                array_list=[
                    self.hx,
                    self.hy,
                    self.hz,
                    self.ex,
                    self.ey,
                    self.box_temperature,
                ],
                run_metadata=self.run_metadata,
                station_metadata=self.station_metadata,
            )
            if calibrate:
                return run.calibrate()
            else:
                return run
        return None

    def _make_index_values(self):
        """
        Index values for the channels recorded
        """
        ### make an array of index values for magnetics and electrics
        indices = np.zeros((8, 5), dtype=int)
        for kk in range(8):
            ### magnetic blocks
            for ii in range(3):
                indices[kk, ii] = 9 + (kk) * 9 + (ii) * 3
            ### electric blocks
            for ii in range(2):
                indices[kk, 3 + ii] = 82 + (kk) * 6 + (ii) * 3
        return indices

    def _get_gps_string_list(self, nims_string):
        """
        get the gps strings from the raw string output by the NIMS.  This
        will take the 3rd byte in each block, concatenate the bytes into a
        long string, and then make a list by splitting on '$'.  The index
        values of where the '$' are found are also calculated.

        :param str nims_string: raw binary string output by NIMS

        :returns: list of index values associated with the location of the
                  '$'
        :returns: list of possible raw GPS strings

        .. note:: This assumes that there are an even amount of data blocks.
                  Might be a bad assumption.
        """
        ### get index values of $ and gps_strings
        index_values = []
        gps_str_list = []
        for ii in range(int(len(nims_string) / self.block_size)):
            index = ii * self.block_size + 3
            g_char = struct.unpack("c", nims_string[index : index + 1])[0]
            if g_char == b"$":
                index_values.append((index - 3) / self.block_size)
            # collect the 3rd byte of every block; GPS sentences accumulate
            # one character per second
            gps_str_list.append(g_char)
        gps_raw_stamp_list = b"".join(gps_str_list).split(b"$")
        return index_values, gps_raw_stamp_list
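
    # Example of the reassembly done above: a GPS sentence is emitted one
    # character per second in byte 3 of successive blocks, so for blocks
    # whose third bytes read  b'$', b'G', b'P', b'R', b'M', b'C', ...  the
    # join/split yields b'GPRMC,...' as one entry of gps_raw_stamp_list,
    # with index_values recording the block number where each '$' occurred.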

    def get_stamps(self, nims_string):
        """
        get a list of valid GPS strings and match synchronous GPRMC with
        GPGGA stamps if possible.

        :param str nims_string: raw GPS string output by NIMS
        """
        ### read in GPS strings into a list to be parsed later
        index_list, gps_raw_stamp_list = self._get_gps_string_list(nims_string)

        gprmc_list = []
        gpgga_list = []
        ### note we are skipping the first entry, it tends to be not
        ### complete anyway
        for ii, index, raw_stamp in zip(
            range(len(index_list)), index_list, gps_raw_stamp_list[1:]
        ):
            gps_obj = GPS(raw_stamp, index)
            if gps_obj.valid:
                if gps_obj.gps_type == "GPRMC":
                    gprmc_list.append(gps_obj)
                elif gps_obj.gps_type == "GPGGA":
                    gpgga_list.append(gps_obj)
            else:
                self.logger.debug(
                    f"GPS Error: file index {index}, stamp number {ii}"
                )
                max_len = min([len(raw_stamp), 15])
                self.logger.debug(f"GPS Raw Stamp: {raw_stamp[0:max_len]}")

        return self._gps_match_gprmc_gpgga_strings(gprmc_list, gpgga_list)

    def _gps_match_gprmc_gpgga_strings(self, gprmc_list, gpgga_list):
        """
        match GPRMC and GPGGA strings together into a list

        [[GPRMC, GPGGA], ...]

        :param list gprmc_list: list of GPS objects for the GPRMC stamps
        :param list gpgga_list: list of GPS objects for the GPGGA stamps

        :returns: list of matched GPRMC and GPGGA stamps
        """
        ### match up the GPRMC and GPGGA together
        gps_match_list = []
        for gprmc in gprmc_list:
            find = False
            for ii, gpgga in enumerate(gpgga_list):
                if gprmc.time_stamp.time() == gpgga.time_stamp.time():
                    gps_match_list.append([gprmc, gpgga])
                    find = True
                    del gpgga_list[ii]
                    break
            if not find:
                gps_match_list.append([gprmc])
        return gps_match_list

    def _get_gps_stamp_indices_from_status(self, status_array):
        """
        get the index location of the stamps from the status array assuming
        that 0 indicates GPS lock.

        :param :class:`np.ndarray` status_array: an array of status values
                                                 from data blocks

        :returns: array of index values where GPS lock was acquired,
                  ignoring sequential locks.
        """
        index_values = np.where(status_array == 0)[0]
        status_index = np.zeros_like(index_values)
        # keep only the first index of each consecutive run of locks
        for ii in range(index_values.size):
            if index_values[ii] - index_values[ii - 1] == 1:
                continue
            else:
                status_index[ii] = index_values[ii]
        status_index = status_index[np.nonzero(status_index)]

        return status_index
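
    # Worked example of the run filtering above (values illustrative): for
    # status_array = [1, 1, 0, 0, 0, 1, 0] the lock indices are [2, 3, 4, 6];
    # indices 3 and 4 follow their predecessor by exactly 1 and are skipped,
    # leaving [2, 6] as the stamp indices.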

    def match_status_with_gps_stamps(self, status_array, gps_list):
        """
        Match the index values from the status array with the index values of
        the GPS stamps.  There appears to be a bit of wiggle room between
        when the lock is recorded and the stamp was actually recorded.  This
        is typically 1 second and sometimes 2.

        :param array status_array: array of status values from each data
                                   block
        :param list gps_list: list of valid GPS stamps [[GPRMC, GPGGA], ...]

        .. note:: I think there is a 2 second gap between the lock and the
                  first stamp character.
        """
        stamp_indices = self._get_gps_stamp_indices_from_status(status_array)
        gps_stamps = []
        for index in stamp_indices:
            stamp_find = False
            for ii, stamps in enumerate(gps_list):
                index_diff = stamps[0].index - index
                ### check the index value, should be 2 or 74, if it is off by
                ### a value left or right apply a correction.
                if index_diff == 1 or index_diff == 73:
                    index += 1
                    stamps[0].index += 1
                elif index_diff == 2 or index_diff == 74:
                    index = index
                elif index_diff == 3 or index_diff == 75:
                    index -= 1
                    stamps[0].index -= 1
                elif index_diff == 4 or index_diff == 76:
                    index -= 2
                    stamps[0].index -= 2
                if stamps[0].gps_type in ["GPRMC", "gprmc"]:
                    if index_diff in [1, 2, 3, 4]:
                        gps_stamps.append((index, stamps))
                        stamp_find = True
                        del gps_list[ii]
                        break
                elif stamps[0].gps_type in ["GPGGA", "gpgga"]:
                    if index_diff in [73, 74, 75, 76]:
                        gps_stamps.append((index, stamps))
                        stamp_find = True
                        del gps_list[ii]
                        break
            if not stamp_find:
                self.logger.debug(
                    f"GPS Error: No good GPS stamp at {index} seconds"
                )

        return gps_stamps

    def find_sequence(self, data_array, block_sequence=None):
        """
        find a sequence in a given array

        :param array data_array: array of the data with shape [n, m]
                                 where n is the number of seconds recorded
                                 and m is the block length for a given
                                 sampling rate.
        :param list block_sequence: sequence pattern to locate,
                                    *default* is [1, 131], the start of a
                                    data block.

        :returns: array of index locations where the sequence is found.
        """
        if block_sequence is not None:
            self.block_sequence = block_sequence

        # want to find the index where the test data is equal to the test
        # sequence
        t = np.vstack(
            [
                np.roll(data_array, shift)
                for shift in -np.arange(len(self.block_sequence))
            ]
        ).T
        return np.where(np.all(t == self.block_sequence, axis=1))[0]
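
    # Worked example of the roll-and-stack trick above (values
    # illustrative): for data_array = [7, 1, 131, 9] and
    # block_sequence = [1, 131], the stacked array t is
    #     [[  7,   1],
    #      [  1, 131],    <- row 1 matches [1, 131]
    #      [131,   9],
    #      [  9,   7]]
    # so np.where(...) returns array([1]).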

    def unwrap_sequence(self, sequence):
        """
        unwrap the sequence to be sequential numbers instead of modulated by
        256.  Sets the first number to 0.

        :param list sequence: sequence of byte numbers
        :return: unwrapped number of counts
        """
        count = 0
        unwrapped = np.zeros_like(sequence)
        for ii, seq in enumerate(sequence):
            unwrapped[ii] = seq + count * 256
            if seq == 255:
                count += 1
        unwrapped -= unwrapped[0]

        return unwrapped
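
    # Worked example: unwrap_sequence([254, 255, 0, 1]) computes
    # [254, 255, 256, 257] (the counter increments after each 255), then
    # subtracts the first value to return [0, 1, 2, 3].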

    def _locate_duplicate_blocks(self, sequence):
        """
        locate the sequence number where the duplicates exist

        :param list sequence: sequence to match duplicate numbers.
        :returns: list of duplicate index values.
        """
        duplicates = np.where(np.abs(np.diff(sequence)) == 0)[0]
        if len(duplicates) == 0:
            return None
        duplicate_list = []
        for dup in duplicates:
            dup_dict = {}
            dup_dict["sequence_index"] = dup
            dup_dict["ts_index_0"] = dup * self.sample_rate
            dup_dict["ts_index_1"] = dup * self.sample_rate + self.sample_rate
            dup_dict["ts_index_2"] = (dup + 1) * self.sample_rate
            dup_dict["ts_index_3"] = (
                dup + 1
            ) * self.sample_rate + self.sample_rate
            duplicate_list.append(dup_dict)
        return duplicate_list

    def _check_duplicate_blocks(self, block_01, block_02, info_01, info_02):
        """
        make sure the blocks are truly duplicates

        :param np.array block_01: block of data to compare
        :param np.array block_02: block of data to compare
        :param np.array info_01: information array from
                                 info_array[sequence_index]
        :param np.array info_02: information array from
                                 info_array[sequence_index]

        :returns: boolean if the blocks and information match
        """
        if np.array_equal(block_01, block_02):
            if np.array_equal(info_01, info_02):
                return True
            else:
                return False
        else:
            return False
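
    # Example of the index bookkeeping above: with sample_rate = 8, a
    # repeated sequence number detected at sequence_index = 5 maps to data
    # rows 40:48 (ts_index_0:ts_index_1) for the first copy and rows 48:56
    # (ts_index_2:ts_index_3) for the second.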

    def remove_duplicates(self, info_array, data_array):
        """
        remove duplicate blocks, removing the first duplicate as suggested by
        Paul and Anna.  Checks to make sure that the mag data are identical
        for the duplicate blocks.  Removes the blocks from the information
        and data arrays and returns the reduced arrays.  This should sync up
        the timing of GPS stamps and index values.

        :param np.array info_array: structured array of block information
        :param np.array data_array: structured array of the data

        :returns: reduced information array
        :returns: reduced data array
        :returns: index of duplicates in raw data
        """
        ### locate
        duplicate_test_list = self._locate_duplicate_blocks(
            self.info_array["sequence"]
        )
        if duplicate_test_list is None:
            return info_array, data_array, None

        duplicate_list = []
        for d in duplicate_test_list:
            if self._check_duplicate_blocks(
                data_array[d["ts_index_0"] : d["ts_index_1"]],
                data_array[d["ts_index_2"] : d["ts_index_3"]],
                info_array[d["sequence_index"]],
                info_array[d["sequence_index"] + 1],
            ):
                duplicate_list.append(d)

        self.logger.debug(f"Deleting {len(duplicate_list)} duplicate blocks")
        ### get the index of the blocks to be removed, namely the 1st
        ### duplicate block
        remove_sequence_index = [d["sequence_index"] for d in duplicate_list]
        remove_data_index = np.array(
            [
                np.arange(d["ts_index_0"], d["ts_index_1"], 1)
                for d in duplicate_list
            ]
        ).flatten()
        ### remove the data
        return_info_array = np.delete(info_array, remove_sequence_index)
        return_data_array = np.delete(data_array, remove_data_index)

        ### set sequence to be monotonic
        return_info_array["sequence"][:] = np.arange(return_info_array.shape[0])

        return return_info_array, return_data_array, duplicate_list

    def read_nims(self, fn=None):
        """
        Read NIMS DATA.BIN file.

        1. Read in the header information and store those as attributes
           with the same names as in the header file.

        2. Locate the beginning of the data blocks by looking for the
           first [1, 131, ...] combo.  Anything before that is cut out.

        3. Make sure the data is a multiple of the block length; if the
           data is longer the extra bytes are cut off.

        4. Read in the GPS data (3rd byte of each block) as characters.
           Parse those into valid GPS stamps with appropriate index
           locations of where the '$' was found.

        5. Read in the data as unsigned 8-bit integers and reshape the
           array into [N, data_block_length].  Parse this array into the
           status information and the data.

        6. Remove duplicate blocks, by removing the first of the
           duplicates as suggested by Anna and Paul.

        7. Match the GPS locks from the status with valid GPS stamps.

        8. Check to make sure that there is the correct number of seconds
           between the first and last GPS stamp.  The extra seconds are
           cut off from the end of the time series.  Not sure if this is
           the best way to accommodate gaps in the data.

        .. note:: The data and information array returned have the
                  duplicates removed and the sequence reset to be
                  monotonic.

        :param str fn: full path to DATA.BIN file

        :Example:

        >>> from mth5.io import nims
        >>> n = nims.NIMS(r"/home/mt_data/nims/mt001.bin")

        """
        if fn is not None:
            self.fn = fn

        st = datetime.datetime.now()
        ### read in header information and get the location of end of header
        self.read_header(self.fn)

        ### load in the entire file, its not too big, start from the
        ### end of the header information.
        with open(self.fn, "rb") as fid:
            fid.seek(self.data_start_seek)
            self._raw_string = fid.read()

        ### read in full string as unsigned integers
        data = np.frombuffer(self._raw_string, dtype=np.uint8)

        ### need to make sure that the data starts with a full block
        find_first = self.find_sequence(data[0 : self.block_size * 5])[0]
        data = data[find_first:]

        ### get GPS stamps from the binary string first
        self.gps_list = self.get_stamps(self._raw_string[find_first:])

        ### check the size of the data, should have an equal amount of blocks
        if (data.size % self.block_size) != 0:
            self.logger.warning(
                f"odd number of bytes {data.size}, not even blocks; "
                f"cutting down the data by {data.size % self.block_size} bytes"
            )
            end_data = data.size - (data.size % self.block_size)
            data = data[0:end_data]

        # resize the data into an even amount of blocks
        data = data.reshape((int(data.size / self.block_size), self.block_size))

        ### need to parse the data
        ### first get the status information
        self.info_array = np.zeros(
            data.shape[0],
            dtype=[
                ("soh", int),
                ("block_len", int),
                ("status", int),
                ("gps", int),
                ("sequence", int),
                ("box_temp", float),
                ("head_temp", float),
                ("logic", int),
                ("end", int),
            ],
        )

        for key, index in self._block_dict.items():
            if "temp" in key:
                # compute temperature from the two-byte value
                t_value = data[:, index[0]] * 256 + data[:, index[1]]

                # values above 32768 wrap negative (16-bit two's complement)
                t_value[np.where(t_value > 32768)] -= 65536
                value = (t_value - self.t_offset) / self.t_conversion_factor
            else:
                value = data[:, index]
            self.info_array[key][:] = value

        ### unwrap sequence
        self.info_array["sequence"] = self.unwrap_sequence(
            self.info_array["sequence"]
        )

        ### get data
        data_array = np.zeros(
            data.shape[0] * self.sample_rate,
            dtype=[
                ("hx", float),
                ("hy", float),
                ("hz", float),
                ("ex", float),
                ("ey", float),
            ],
        )

        ### fill the data
        for cc, comp in enumerate(["hx", "hy", "hz", "ex", "ey"]):
            channel_arr = np.zeros((data.shape[0], 8), dtype=float)
            for kk in range(self.sample_rate):
                index = self.indices[kk, cc]
                # assemble the 24-bit big-endian sample; multiplying by an
                # array forces promotion to a wide integer type
                value = (data[:, index] * 256 + data[:, index + 1]) * np.array(
                    [256]
                ) + data[:, index + 2]
                # convert 24-bit unsigned values to signed
                value[np.where(value > self._int_max)] -= self._int_factor
                channel_arr[:, kk] = value
            data_array[comp][:] = channel_arr.flatten()

        ### clean things up
        ### I guess that the E channels are opposite phase?
        for comp in ["ex", "ey"]:
            data_array[comp] *= -1

        ### remove duplicates
        (
            self.info_array,
            data_array,
            self.duplicate_list,
        ) = self.remove_duplicates(self.info_array, data_array)
        ### get GPS stamps with index values
        self.stamps = self.match_status_with_gps_stamps(
            self.info_array["status"], self.gps_list
        )
        ### align data checking for timing gaps
        self.ts_data = self.align_data(data_array, self.stamps)

        et = datetime.datetime.now()
        read_time = (et - st).total_seconds()
        self.logger.debug(f"Reading took {read_time:.2f} seconds")
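
    # Worked example of the 24-bit decode above: bytes (0xFF, 0xFF, 0xFF)
    # assemble to (255 * 256 + 255) * 256 + 255 = 16777215, which exceeds
    # _int_max (8388608), so _int_factor (16777216) is subtracted to give -1.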

    def _get_first_gps_stamp(self, stamps):
        """
        get the first GPRMC stamp
        """
        for stamp in stamps:
            if stamp[1][0].gps_type in ["gprmc", "GPRMC"]:
                return stamp
        return None

    def _get_last_gps_stamp(self, stamps):
        """
        get the last GPRMC stamp
        """
        for stamp in stamps[::-1]:
            if stamp[1][0].gps_type in ["gprmc", "GPRMC"]:
                return stamp
        return None

    def _locate_timing_gaps(self, stamps):
        """
        locate timing gaps in the data by comparing the stamp index with the
        GPS time stamp.  The number of points and seconds should be the same.

        :param list stamps: list of GPS stamps [[status_index, [GPRMC, GPGGA]]]

        :returns: list of gap index values
        """
        stamp_01 = self._get_first_gps_stamp(stamps)[1][0]
        current_stamp = stamp_01
        gap_beginning = []
        total_gap = 0
        for ii, stamp in enumerate(stamps[1:], 1):
            stamp = stamp[1][0]
            # can only compare those with a date and time.
            if stamp.gps_type == "GPGGA":
                continue
            time_diff = (
                stamp.time_stamp - current_stamp.time_stamp
            ).total_seconds()
            index_diff = stamp.index - current_stamp.index

            time_gap = index_diff - time_diff
            if time_gap == 0:
                continue
            elif time_gap > 0:
                total_gap += time_gap
                current_stamp = stamp
                gap_beginning.append(stamp.index)
                self.logger.debug(
                    f"GPS stamp at {stamp.time_stamp.isoformat()} is off "
                    f"from previous time by {time_gap} seconds"
                )

        self.logger.warning(f"Timing is off by {total_gap} seconds")
        return gap_beginning

    def check_timing(self, stamps):
        """
        make sure that there are the correct number of seconds between the
        first and last GPS GPRMC stamps

        :param list stamps: list of GPS stamps [[status_index, [GPRMC, GPGGA]]]

        :returns: [ True | False ] if data is valid or not.
        :returns: gap index locations

        .. note:: currently it is assumed that if a data gap occurs the data
                  can be squeezed to remove it.  Probably a more elegant way
                  of doing it.
        """
        gaps = None

        first_stamp = self._get_first_gps_stamp(stamps)[1][0]
        last_stamp = self._get_last_gps_stamp(stamps)[1][0]

        time_diff = last_stamp.time_stamp - first_stamp.time_stamp
        index_diff = last_stamp.index - first_stamp.index

        difference = index_diff - time_diff.total_seconds()
        if difference != 0:
            gaps = self._locate_timing_gaps(stamps)
            return False, gaps, difference

        return True, gaps, difference

    def align_data(self, data_array, stamps):
        """
        Need to match up the first good GPS stamp with the data.

        Do this by using the first GPS stamp and assuming that the time from
        the first time stamp to the start is the index value.

        Put the data into a pandas data frame that is indexed by time.

        :param array data_array: structured array with columns for each
                                 component [hx, hy, hz, ex, ey]
        :param list stamps: list of GPS stamps [[status_index, [GPRMC, GPGGA]]]

        :returns: pandas DataFrame with columns of components and indexed by
                  time initialized by the start time.

        .. note:: Data gaps are squeezed because it is not clear what a gap
                  actually means.
        """
        ### check timing first to make sure there is no drift
        timing_valid, self.gaps, time_difference = self.check_timing(stamps)

        ### need to trim off the excess number of points that are present
        ### because of data gaps.  This will be the time difference times
        ### the sample rate
        if time_difference > 0:
            remove_points = int(time_difference * self.sample_rate)
            data_array = data_array[0:-remove_points]
            self.logger.info(
                f"Trimmed {remove_points} points off the end of the time "
                "series because of timing gaps"
            )

        ### first GPS stamp within the data is at a given index that is
        ### assumed to be the number of seconds from the start of the run.
        ### therefore make the start time the first GPS stamp time minus
        ### the index value for that stamp.
        ### need to be sure that the first GPS stamp has a date, need GPRMC
        first_stamp = self._get_first_gps_stamp(stamps)
        first_index = first_stamp[0]
        start_time = first_stamp[1][0].time_stamp - datetime.timedelta(
            seconds=int(first_index)
        )

        dt_index = self.make_dt_index(
            start_time.isoformat(),
            self.sample_rate,
            n_samples=data_array.shape[0],
        )

        return pd.DataFrame(data_array, index=dt_index)
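
    # Example of the start-time arithmetic above (times illustrative): if
    # the first GPRMC stamp occurs at status index 120 with GPS time
    # 2020-01-01T00:02:00, the run start is set 120 seconds earlier, at
    # 2020-01-01T00:00:00.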

    def make_dt_index(
        self, start_time, sample_rate, stop_time=None, n_samples=None
    ):
        """
        make time index array

        .. note:: date-time format should be YYYY-M-DDThh:mm:ss.ms UTC

        :param start_time: start time
        :type start_time: string
        :param sample_rate: sample_rate in samples/second
        :type sample_rate: float
        :param stop_time: stop time, defaults to None
        :type stop_time: string, optional
        :param n_samples: number of samples, defaults to None
        :type n_samples: int, optional
        """

        # set the index to be UTC time
        dt_freq = "{0:.0f}N".format(1.0 / (sample_rate) * 1e9)
        if stop_time is not None:
            dt_index = pd.date_range(
                start=start_time,
                end=stop_time,
                freq=dt_freq,
                closed="left",
                tz="UTC",
            )
        elif n_samples is not None:
            dt_index = pd.date_range(
                start=start_time, periods=n_samples, freq=dt_freq, tz="UTC"
            )
        else:
            raise ValueError("Need to input either stop_time or n_samples")

        return dt_index
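
    # Example: for sample_rate = 8 the frequency string is
    # "{0:.0f}N".format(1.0 / 8 * 1e9) -> "125000000N", i.e. one sample
    # every 125,000,000 nanoseconds (125 ms).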


# =============================================================================
# convenience read
# =============================================================================
def read_nims(fn):
    """
    Convenience function to read a NIMS DATA.BIN file and return the run
    time series.

    :param fn: full path to a NIMS DATA.BIN file
    :type fn: string
    :return: the run time series with metadata filled from the file
    :rtype: :class:`mth5.timeseries.RunTS`
    """

    nims_obj = NIMS(fn)
    nims_obj.read_nims()

    return nims_obj.to_runts()
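
# A minimal usage sketch (the file path is illustrative; whether
# ``read_nims`` is re-exported at a higher level depends on the package's
# __init__ files):
#
#     from mth5.io.nims.nims import read_nims
#
#     run_ts = read_nims(r"/home/mt_data/nims/mt001.bin")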