Source code for mth5.io.nims.nims

# -*- coding: utf-8 -*-
"""
===============
NIMS
===============

    * deals with reading in NIMS DATA.BIN files

    This is a translation from Matlab codes written and edited by:
        * Anna Kelbert
        * Paul Bedrosian
        * Esteban Bowles-Martinez
        * Possibly others.

    I've tested it against a version, and it matches.  The data/GPS  gaps I
    still don't understand so for now the time series is just
    made continuous and the number of missing seconds is clipped from the
    end of the time series.

    .. note:: this only works for 8Hz data for now


:copyright:
    Jared Peacock (jpeacock@usgs.gov)

:license:
    MIT
"""

from __future__ import annotations

import datetime
import struct
from pathlib import Path
from typing import Any, Optional, Union

# =============================================================================
# Imports
# =============================================================================
import numpy as np
import pandas as pd
from mt_metadata.common.mttime import MTime
from mt_metadata.timeseries import Auxiliary, Electric, Magnetic, Run, Station

from mth5 import timeseries
from mth5.io.nims.gps import GPS
from mth5.io.nims.header import NIMSHeader
from mth5.io.nims.response_filters import Response


# =============================================================================
# Exceptions
# =============================================================================


[docs] class NIMS(NIMSHeader): """ NIMS Class for reading NIMS DATA.BIN files. A fast way to read the binary files are to first read in the GPS strings, the third byte in each block as a character and parse that into valid GPS stamps. Then read in the entire data set as unsigned 8 bit integers and reshape the data to be n seconds x block size. Then parse that array into the status information and data. Parameters ---------- fn : str or Path, optional Path to the NIMS DATA.BIN file to read, by default None Attributes ---------- block_size : int Size of data blocks (default 131 for 8 Hz data) block_sequence : list of int Sequence pattern to locate [1, 131] sample_rate : int Sample rate in samples/second (default 8) e_conversion_factor : float Electric field conversion factor h_conversion_factor : float Magnetic field conversion factor t_conversion_factor : float Temperature conversion factor t_offset : int Temperature offset value info_array : ndarray or None Structured array of block information stamps : list or None List of valid GPS stamps ts_data : DataFrame or None Time series data as pandas DataFrame gaps : list or None List of timing gaps found in data duplicate_list : list or None List of duplicate blocks found Notes ----- I only have a limited amount of .BIN files to test so this will likely break if there are issues such as data gaps. This has been tested against the matlab program loadNIMS by Anna Kelbert and the match for all the .bin files I have. If something looks weird check it against that program. .. todo:: Deal with timing issues, right now a warning is sent to the user need to figure out a way to find where the gap is and adjust accordingly. .. warning:: Currently Only 8 Hz data is supported Examples -------- >>> from mth5.io.nims import nims >>> n = nims.NIMS(r"/home/mt_data/nims/mt001.bin") >>> n.read_nims() """ def __init__(self, fn: Optional[Union[str, Path]] = None) -> None: super().__init__(fn) # change thes if the sample rate is different
[docs] self.block_size = 131
[docs] self.block_sequence = [1, self.block_size]
[docs] self.sample_rate = 8 ### samples/second
[docs] self.e_conversion_factor = 2.44141221047903e-06
[docs] self.h_conversion_factor = 0.01
[docs] self.t_conversion_factor = 70
[docs] self.t_offset = 18048
self._int_max = 8388608 self._int_factor = 16777216 self._block_dict = { "soh": 0, "block_len": 1, "status": 2, "gps": 3, "sequence": 4, "box_temp": (5, 6), "head_temp": (7, 8), "logic": 81, "end": 130, }
[docs] self.info_array = None
[docs] self.stamps = None
[docs] self.ts_data = None
[docs] self.gaps = None
[docs] self.duplicate_list = None
self._raw_string = None
[docs] self.indices = self._make_index_values()
def __str__(self) -> str: """ Return string representation of NIMS object. Returns ------- str Formatted string with NIMS station information and data summary """ lines = [f"NIMS Station: {self.site_name}", "-" * 20] lines.append(f"NIMS ID: {self.box_id}") lines.append(f"magnetometer ID: {self.mag_id}") lines.append(f"operator: {self.operator}") lines.append(f"location: {self.state_province}, {self.country}") lines.append(f"latitude: {self.latitude} (degrees)") lines.append(f"longitude: {self.longitude} (degrees)") lines.append(f"elevation: {self.elevation} m") lines.append(f"gps stamp: {self.header_gps_stamp}") lines.append(f"EX: length = {self.ex_length} m; azimuth = {self.ex_azimuth}") lines.append(f"EY: length = {self.ey_length} m; azimuth = {self.ey_azimuth}") lines.append(f"comments: {self.comments}") if self.has_data(): lines.append("") lines.append(f"Start: {self.start_time.isoformat()}") lines.append(f"End: {self.end_time.isoformat()}") lines.append(f"Data shape: {self.ts_data.shape}") lines.append(f"Found {len(self.stamps)} GPS stamps") return "\n".join(lines) def __repr__(self) -> str: """ Return string representation for debugging. Returns ------- str String representation of the NIMS object """ return self.__str__()
[docs] def has_data(self) -> bool: """ Check if the NIMS object contains time series data. Returns ------- bool True if ts_data is not None, False otherwise """ if self.ts_data is not None: return True return False
@property
[docs] def n_samples(self) -> Optional[int]: """ Number of samples in the time series. Returns ------- int or None Number of samples if data is loaded, estimated from file size if file exists, None otherwise """ if self.has_data(): return self.ts_data.shape[0] elif self.fn is not None: return int(self.file_size / 16.375)
@property
[docs] def latitude(self) -> Optional[float]: """ Median latitude value from all GPS stamps. Returns ------- float or None Median latitude in decimal degrees (WGS84) from GPRMC stamps, or header GPS latitude if no stamps available Notes ----- Only uses GPRMC stamps as they should be duplicates of GPGGA stamps but include additional validation. """ if self.stamps is not None: latitude = np.zeros(len(self.stamps)) for ii, stamp in enumerate(self.stamps): latitude[ii] = stamp[1][0].latitude return np.median(latitude[np.nonzero(latitude)]) return self.header_gps_latitude
@property
[docs] def longitude(self) -> Optional[float]: """ Median longitude value from all GPS stamps. Returns ------- float or None Median longitude in decimal degrees (WGS84) from GPS stamps, or header GPS longitude if no stamps available Notes ----- Uses the first stamp within each GPS stamp set. """ if self.stamps is not None: longitude = np.zeros(len(self.stamps)) for ii, stamp in enumerate(self.stamps): longitude[ii] = stamp[1][0].longitude return np.median(longitude[np.nonzero(longitude)]) return self.header_gps_longitude
@property
[docs] def elevation(self) -> Optional[float]: """ Median elevation value from all GPS stamps. Returns ------- float or None Median elevation in meters (WGS84) from GPS stamps, or header GPS elevation if no stamps available Notes ----- Uses the first stamp within each GPS stamp set. For paired stamps (GPRMC/GPGGA), uses the GPGGA elevation if available. """ if self.stamps is not None: elevation = np.zeros(len(self.stamps)) for ii, stamp in enumerate(self.stamps): if len(stamp[1]) == 1: elev = stamp[1][0].elevation if len(stamp[1]) == 2: elev = stamp[1][1].elevation if elev is None: continue elevation[ii] = elev return np.median(elevation[np.nonzero(elevation)]) return self.header_gps_elevation
@property
[docs] def declination(self) -> Optional[float]: """ Median magnetic declination value from all GPS stamps. Returns ------- float or None Median magnetic declination in decimal degrees from GPRMC stamps, or None if no declination data available Notes ----- Only uses GPRMC stamps as they contain declination information. """ if self.stamps is not None: declination = np.zeros(len(self.stamps)) for ii, stamp in enumerate(self.stamps): if stamp[1][0].gps_type == "GPRMC": dec = stamp[1][0].declination if dec is None: continue declination[ii] = dec return np.median(declination[np.nonzero(declination)]) return None
@property
[docs] def start_time(self) -> MTime: """ Start time of the time series data. Returns ------- MTime Start time derived from the first GPS time stamp index, or header GPS stamp if no time series data available Notes ----- The start time is calculated from the first good GPS time stamp minus the seconds to the beginning of the time series. """ if self.stamps is not None: return MTime(time_stamp=self.ts_data.index[0]) return self.header_gps_stamp
@property
[docs] def end_time(self) -> MTime: """ End time of the time series data. Returns ------- MTime End time derived from the last time series index, or estimated from start time and number of samples Notes ----- If time series data is available, uses the last timestamp. Otherwise estimates end time from start time plus duration calculated from number of samples and sample rate. """ if self.stamps is not None: return MTime(time_stamp=self.ts_data.index[-1]) self.logger.warning("Estimating end time from n_samples") return self.start_time + int(self.n_samples / self.sample_rate)
@property
[docs] def box_temperature(self) -> Optional[timeseries.ChannelTS]: """ Data logger temperature channel. Returns ------- ChannelTS or None Temperature channel sampled at 1 second, interpolated to match the time series sample rate, or None if no time series data Notes ----- Temperature is measured in Celsius and interpolated onto the same time grid as the magnetic and electric field channels. """ if self.ts_data is not None: auxiliary_metadata = Auxiliary() auxiliary_metadata.from_dict( { "channel_number": 6, "component": "temperature", "measurement_azimuth": 0, "measurement_tilt": 0, "sample_rate": 1, "time_period.start": self.start_time.isoformat(), "time_period.end": self.end_time.isoformat(), "type": "auxiliary", "units": "celsius", } ) temp = timeseries.ChannelTS( channel_type="auxiliary", data=self.info_array["box_temp"], channel_metadata=auxiliary_metadata, run_metadata=self.run_metadata, station_metadata=self.station_metadata, ) # interpolate temperature onto the same sample rate as the channels. temp.data_array = temp.data_array.interp_like(self.hx.data_array) temp.channel_metadata.sample_rate = self.sample_rate temp.channel_metadata.time_period.end = self.end_time.isoformat() return temp return None
[docs] def get_channel_response(self, channel: str, dipole_length: float = 1) -> Any: """ Get the channel response for a given channel. Parameters ---------- channel : str Channel identifier (e.g., 'hx', 'hy', 'hz', 'ex', 'ey') dipole_length : float, optional Dipole length for electric field channels, by default 1 Returns ------- Any Channel response object from the NIMS response filters Notes ----- Uses the NIMS response filters to generate appropriate response functions for magnetic and electric field channels at the current sample rate. """ nims_filters = Response(sample_rate=self.sample_rate) return nims_filters.get_channel_response(channel, dipole_length=dipole_length)
@property
[docs] def hx_metadata(self) -> Optional[Magnetic]: """ Metadata for the HX magnetic field channel. Returns ------- Magnetic or None Magnetic field metadata object for the HX channel, or None if no time series data is loaded """ if self.ts_data is not None: hx_metadata = Magnetic() hx_metadata.from_dict( { "channel_number": 1, "component": "hx", "measurement_azimuth": 0, "measurement_tilt": 0, "sample_rate": self.sample_rate, "time_period.start": self.start_time.isoformat(), "time_period.end": self.end_time.isoformat(), "type": "magnetic", "units": "counts", "sensor.id": self.mag_id, "sensor.manufacturer": "Barry Narod", "sensor.type": "fluxgate triaxial magnetometer", } ) return hx_metadata
@property
[docs] def hx(self) -> Optional[timeseries.ChannelTS]: """ HX magnetic field channel time series. Returns ------- ChannelTS or None Time series data for the HX magnetic field component, or None if no time series data is loaded """ if self.ts_data is not None: return timeseries.ChannelTS( channel_type="magnetic", data=self.ts_data.hx.to_numpy(), channel_metadata=self.hx_metadata, run_metadata=self.run_metadata, station_metadata=self.station_metadata, channel_response=self.get_channel_response("hx"), ) return None
@property
[docs] def hy_metadata(self) -> Optional[Magnetic]: """ Metadata for the HY magnetic field channel. Returns ------- Magnetic or None Magnetic field metadata object for the HY channel, or None if no time series data is loaded """ if self.ts_data is not None: hy_metadata = Magnetic() hy_metadata.from_dict( { "channel_number": 2, "component": "hy", "measurement_azimuth": 90, "measurement_tilt": 0, "sample_rate": self.sample_rate, "time_period.start": self.start_time.isoformat(), "time_period.end": self.end_time.isoformat(), "type": "magnetic", "units": "counts", "sensor.id": self.mag_id, "sensor.manufacturer": "Barry Narod", "sensor.type": "fluxgate triaxial magnetometer", } ) return hy_metadata
@property
[docs] def hy(self) -> Optional[timeseries.ChannelTS]: """ HY magnetic field channel time series. Returns ------- ChannelTS or None Time series data for the HY magnetic field component, or None if no time series data is loaded """ if self.ts_data is not None: return timeseries.ChannelTS( channel_type="magnetic", data=self.ts_data.hy.to_numpy(), channel_metadata=self.hy_metadata, run_metadata=self.run_metadata, station_metadata=self.station_metadata, channel_response=self.get_channel_response("hy"), ) return None
@property
[docs] def hz_metadata(self) -> Optional[Magnetic]: """ Metadata for the HZ magnetic field channel. Returns ------- Magnetic or None Magnetic field metadata object for the HZ channel, or None if no time series data is loaded """ if self.ts_data is not None: hz_metadata = Magnetic() hz_metadata.from_dict( { "channel_number": 3, "component": "hz", "measurement_azimuth": 0, "measurement_tilt": 0, "sample_rate": self.sample_rate, "time_period.start": self.start_time.isoformat(), "time_period.end": self.end_time.isoformat(), "type": "magnetic", "units": "counts", "sensor.id": self.mag_id, "sensor.manufacturer": "Barry Narod", "sensor.type": "fluxgate triaxial magnetometer", } ) return hz_metadata
@property
[docs] def hz(self) -> Optional[timeseries.ChannelTS]: """ HZ magnetic field channel time series. Returns ------- ChannelTS or None Time series data for the HZ magnetic field component, or None if no time series data is loaded """ """HZ""" if self.ts_data is not None: return timeseries.ChannelTS( channel_type="magnetic", data=self.ts_data.hz.to_numpy(), channel_metadata=self.hz_metadata, run_metadata=self.run_metadata, station_metadata=self.station_metadata, channel_response=self.get_channel_response("hz"), ) return None
@property
[docs] def ex_metadata(self) -> Optional[Electric]: """ Metadata for the EX electric field channel. Returns ------- Electric or None Electric field metadata object for the EX channel, or None if no time series data is loaded """ if self.ts_data is not None: ex_metadata = Electric() ex_metadata.from_dict( { "channel_number": 4, "component": "ex", "measurement_azimuth": self.ex_azimuth, "measurement_tilt": 0, "sample_rate": self.sample_rate, "dipole_length": self.ex_length, "time_period.start": self.start_time.isoformat(), "time_period.end": self.end_time.isoformat(), "type": "electric", "units": "counts", "negative.id": self.s_electrode_id, "positive.id": self.n_electrode_id, } ) return ex_metadata
@property
[docs] def ex(self) -> Optional[timeseries.ChannelTS]: """ EX electric field channel time series. Returns ------- ChannelTS or None Time series data for the EX electric field component, or None if no time series data is loaded """ if self.ts_data is not None: return timeseries.ChannelTS( channel_type="electric", data=self.ts_data.ex.to_numpy(), channel_metadata=self.ex_metadata, run_metadata=self.run_metadata, station_metadata=self.station_metadata, channel_response=self.get_channel_response("ex", self.ex_length), ) return None
@property
[docs] def ey_metadata(self) -> Optional[Electric]: """ Metadata for the EY electric field channel. Returns ------- Electric or None Electric field metadata object for the EY channel, or None if no time series data is loaded """ if self.ts_data is not None: ey_metadata = Electric() ey_metadata.from_dict( { "channel_number": 5, "component": "ey", "measurement_azimuth": self.ey_azimuth, "measurement_tilt": 0, "sample_rate": self.sample_rate, "dipole_length": self.ey_length, "time_period.start": self.start_time.isoformat(), "time_period.end": self.end_time.isoformat(), "type": "electric", "units": "counts", "negative.id": self.w_electrode_id, "positive.id": self.e_electrode_id, } ) return ey_metadata
@property
[docs] def ey(self) -> Optional[timeseries.ChannelTS]: """ EY electric field channel time series. Returns ------- ChannelTS or None Time series data for the EY electric field component, or None if no time series data is loaded """ if self.ts_data is not None: return timeseries.ChannelTS( channel_type="electric", data=self.ts_data.ey.to_numpy(), channel_metadata=self.ey_metadata, run_metadata=self.run_metadata, station_metadata=self.station_metadata, channel_response=self.get_channel_response("ey", self.ey_length), ) return None
@property
[docs] def run_metadata(self) -> Optional[Run]: """ Run metadata for the NIMS data collection. Returns ------- Run or None MT run metadata including data logger information, timing, and channel metadata, or None if no time series data is loaded """ if self.ts_data is not None: run_metadata = Run() run_metadata.from_dict( { "Run": { "comments": self.comments, "data_logger.firmware.author": "B. Narod", "data_logger.firmware.name": "nims", "data_logger.firmware.version": "1.0", "data_logger.manufacturer": "Narod", "data_logger.model": self.box_id, "data_logger.id": self.box_id, "data_logger.type": "long period", "id": self.run_id, "data_type": "LPMT", "sample_rate": self.sample_rate, "time_period.end": self.end_time.isoformat(), "time_period.start": self.start_time.isoformat(), } } ) for comp in ["hx", "hy", "hz", "ex", "ey"]: run_metadata.channels.append(getattr(self, f"{comp}_metadata")) return run_metadata return None
@property
[docs] def station_metadata(self) -> Optional[Station]: """ Station metadata from NIMS file. Returns ------- Station or None MT station metadata including geographic information and location data, or None if no time series data is loaded """ if self.ts_data is not None: station_metadata = Station() if self.run_id is None: self.run_id = f"001" station_metadata.from_dict( { "Station": { "geographic_name": f"{self.site_name}, {self.state_province}, {self.country}", "location.declination.value": self.declination, "location.elevation": self.elevation, "location.latitude": self.latitude, "location.longitude": self.longitude, "id": self.run_id[0:-1], "orientation.reference_frame": "geomagnetic", } } ) station_metadata.runs.append(self.run_metadata) return station_metadata return None
[docs] def to_runts(self, calibrate: bool = False) -> Optional[timeseries.RunTS]: """ Get xarray RunTS object for the NIMS data. Parameters ---------- calibrate : bool, optional Whether to apply calibration to the data, by default False Returns ------- RunTS or None Time series run object containing all channels and metadata, or None if no time series data is loaded Notes ----- Includes all magnetic field channels (hx, hy, hz), electric field channels (ex, ey), and box temperature data. """ if self.ts_data is not None: run = timeseries.RunTS( array_list=[ self.hx, self.hy, self.hz, self.ex, self.ey, self.box_temperature, ], run_metadata=self.run_metadata, station_metadata=self.station_metadata, ) if calibrate: return run.calibrate() else: return run return None
def _make_index_values(self) -> np.ndarray: """ Create index values for the recorded channels. Returns ------- ndarray Array of shape (8, 5) containing byte indices for extracting magnetic (first 3 columns) and electric (last 2 columns) channel data from each 8-sample block Notes ----- Creates an array of index values for magnetics and electrics. Each row corresponds to one sample within an 8-sample block. Columns 0-2 are for magnetic channels (hx, hy, hz). Columns 3-4 are for electric channels (ex, ey). """ ### make an array of index values for magnetics and electrics indices = np.zeros((8, 5), dtype=int) for kk in range(8): ### magnetic blocks for ii in range(3): indices[kk, ii] = 9 + (kk) * 9 + (ii) * 3 ### electric blocks for ii in range(2): indices[kk, 3 + ii] = 82 + (kk) * 6 + (ii) * 3 return indices def _get_gps_string_list( self, nims_string: bytes ) -> tuple[list[float], list[bytes]]: """ Extract GPS strings from raw NIMS binary data. This method takes the 3rd byte in each block, concatenates into a long string and creates a list by splitting by '$'. The index values of where the '$' characters are found are also calculated. Parameters ---------- nims_string : bytes Raw binary string output by NIMS Returns ------- list of float List of index values associated with the location of the '$' characters list of bytes List of possible raw GPS strings split by '$' Notes ----- This assumes that there are an even amount of data blocks. This might be a bad assumption in some cases. """ ### get index values of $ and gps_strings index_values = [] gps_str_list = [] for ii in range(int(len(nims_string) / self.block_size)): index = ii * self.block_size + 3 g_char = struct.unpack("c", nims_string[index : index + 1])[0] if g_char == b"$": index_values.append((index - 3) / self.block_size) gps_str_list.append(g_char) gps_raw_stamp_list = b"".join(gps_str_list).split(b"$") return index_values, gps_raw_stamp_list
[docs] def get_stamps(self, nims_string: bytes) -> list[tuple[Any, list[GPS]]]: """ Extract and parse valid GPS strings, matching GPRMC with GPGGA stamps. Parameters ---------- nims_string : bytes Raw GPS binary string output by NIMS Returns ------- list of tuple List of matched GPS stamps where each element is a tuple containing index and list of GPS objects [GPRMC, GPGGA] (or just [GPRMC]) Notes ----- Skips the first entry as it tends to be incomplete. Attempts to match synchronous GPRMC with GPGGA stamps when possible. """ ### read in GPS strings into a list to be parsed later index_list, gps_raw_stamp_list = self._get_gps_string_list(nims_string) gprmc_list = [] gpgga_list = [] ### note we are skipping the first entry, it tends to be not ### complete anyway for ii, index, raw_stamp in zip( range(len(index_list)), index_list, gps_raw_stamp_list[1:] ): gps_obj = GPS(raw_stamp, index) if gps_obj.valid: if gps_obj.gps_type == "GPRMC": gprmc_list.append(gps_obj) elif gps_obj.gps_type == "GPGGA": gpgga_list.append(gps_obj) else: self.logger.debug(f"GPS Error: file index {index}, stamp number {ii}") max_len = min([len(raw_stamp), 15]) self.logger.debug(f"GPS Raw Stamp: {raw_stamp[0:max_len]}") return self._gps_match_gprmc_gpgga_strings(gprmc_list, gpgga_list)
def _gps_match_gprmc_gpgga_strings(self, gprmc_list, gpgga_list): """ match GPRMC and GPGGA strings together into a list [[GPRMC, GPGGA], ...] :param list gprmc_list: list of GPS objects for the GPRMC stamps :param list gpgga_list: list of GPS objects for the GPGGA stamps :returns: list of matched GPRMC and GPGGA stamps """ ### match up the GPRMC and GPGGA together gps_match_list = [] for gprmc in gprmc_list: find = False for ii, gpgga in enumerate(gpgga_list): if gprmc.time_stamp.time() == gpgga.time_stamp.time(): gps_match_list.append([gprmc, gpgga]) find = True del gpgga_list[ii] break if not find: gps_match_list.append([gprmc]) return gps_match_list def _get_gps_stamp_indices_from_status(self, status_array): """ get the index location of the stamps from the status array assuming that 0 indicates GPS lock. :param :class:`np.ndarray` status_array: an array of status values from data blocks :returns: array of index values where GPS lock was acquired ignoring sequential locks. """ index_values = np.where(status_array == 0)[0] status_index = np.zeros_like(index_values) for ii in range(index_values.size): if index_values[ii] - index_values[ii - 1] == 1: continue else: status_index[ii] = index_values[ii] status_index = status_index[np.nonzero(status_index)] return status_index
[docs] def match_status_with_gps_stamps(self, status_array, gps_list): """ Match the index values from the status array with the index values of the GPS stamps. There appears to be a bit of wiggle room between when the lock is recorded and the stamp was actually recorded. This is typically 1 second and sometimes 2. :param array status_array: array of status values from each data block :param list gps_list: list of valid GPS stamps [[GPRMC, GPGGA], ...] .. note:: I think there is a 2 second gap between the lock and the first stamp character. """ stamp_indices = self._get_gps_stamp_indices_from_status(status_array) gps_stamps = [] for index in stamp_indices: stamp_find = False for ii, stamps in enumerate(gps_list): index_diff = stamps[0].index - index ### check the index value, should be 2 or 74, if it is off by ### a value left or right apply a correction. if index_diff == 1 or index_diff == 73: index += 1 stamps[0].index += 1 elif index_diff == 2 or index_diff == 74: index = index elif index_diff == 3 or index_diff == 75: index -= 1 stamps[0].index -= 1 elif index_diff == 4 or index_diff == 76: index -= 2 stamps[0].index -= 2 if stamps[0].gps_type in ["GPRMC", "gprmc"]: if index_diff in [1, 2, 3, 4]: gps_stamps.append((index, stamps)) stamp_find = True del gps_list[ii] break elif stamps[0].gps_type in ["GPGGA", "gpgga"]: if index_diff in [73, 74, 75, 76]: gps_stamps.append((index, stamps)) stamp_find = True del gps_list[ii] break if not stamp_find: self.logger.debug(f"GPS Error: No good GPS stamp at {index} seconds") return gps_stamps
[docs] def find_sequence( self, data_array: np.ndarray, block_sequence: Optional[list[int]] = None ) -> np.ndarray: """ Find a sequence pattern in the data array. Parameters ---------- data_array : ndarray Array of the data with shape [n, m] where n is the number of seconds recorded and m is the block length for a given sampling rate block_sequence : list of int, optional Sequence pattern to locate, by default [1, 131] (start of data block) Returns ------- ndarray Array of index locations where the sequence is found Notes ----- Uses numpy rolling and comparison to find all occurrences of the specified sequence pattern in the data array. """ if block_sequence is not None: self.block_sequence = block_sequence # want to find the index there the test data is equal to the test sequence t = np.vstack( [ np.roll(data_array, shift) for shift in -np.arange(len(self.block_sequence)) ] ).T return np.where(np.all(t == self.block_sequence, axis=1))[0]
[docs] def unwrap_sequence(self, sequence: np.ndarray) -> np.ndarray: """ Unwrap sequence to sequential numbers instead of modulo 256. Parameters ---------- sequence : ndarray Sequence of byte numbers (0-255) to unwrap Returns ------- ndarray Unwrapped sequence with first number set to 0 and subsequent values forming a continuous count Notes ----- Handles the fact that sequence numbers are stored as single bytes (0-255) but represent a continuous count. When a value of 255 is encountered, the next rollover is anticipated. """ count = 0 unwrapped = np.zeros_like(sequence) for ii, seq in enumerate(sequence): unwrapped[ii] = seq + count * 256 if seq == 255: count += 1 unwrapped -= unwrapped[0] return unwrapped
def _locate_duplicate_blocks(self, sequence): """ locate the sequence number where the duplicates exist :param list sequence: sequence to match duplicate numbers. :returns: list of duplicate index values. """ duplicates = np.where(np.abs(np.diff(sequence)) == 0)[0] if len(duplicates) == 0: return None duplicate_list = [] for dup in duplicates: dup_dict = {} dup_dict["sequence_index"] = dup dup_dict["ts_index_0"] = dup * self.sample_rate dup_dict["ts_index_1"] = dup * self.sample_rate + self.sample_rate dup_dict["ts_index_2"] = (dup + 1) * self.sample_rate dup_dict["ts_index_3"] = (dup + 1) * self.sample_rate + self.sample_rate duplicate_list.append(dup_dict) return duplicate_list def _check_duplicate_blocks(self, block_01, block_02, info_01, info_02): """ make sure the blocks are truly duplicates :param np.array block_01: block of data to compare :param np.array block_02: block of data to compare :param np.array info_01: information array from info_array[sequence_index] :param np.array info_02: information array from info_array[sequence_index] :returns: boolean if the blocks and information match """ if np.array_equal(block_01, block_02): if np.array_equal(info_01, info_02): return True else: return False else: return False
[docs] def remove_duplicates(self, info_array, data_array): """ remove duplicate blocks, removing the first duplicate as suggested by Paul and Anna. Checks to make sure that the mag data are identical for the duplicate blocks. Removes the blocks from the information and data arrays and returns the reduced arrays. This should sync up the timing of GPS stamps and index values. :param np.array info_array: structured array of block information :param np.array data_array: structured array of the data :returns: reduced information array :returns: reduced data array :returns: index of duplicates in raw data """ ### locate duplicate_test_list = self._locate_duplicate_blocks(self.info_array["sequence"]) if duplicate_test_list is None: return info_array, data_array, None duplicate_list = [] for d in duplicate_test_list: if self._check_duplicate_blocks( data_array[d["ts_index_0"] : d["ts_index_1"]], data_array[d["ts_index_2"] : d["ts_index_3"]], info_array[d["sequence_index"]], info_array[d["sequence_index"] + 1], ): duplicate_list.append(d) if len(duplicate_list) == 0: self.logger.info(f"Duplicate block count is zero") return info_array, data_array, None self.logger.debug(f"Deleting {len(duplicate_list)} duplicate blocks") ### get the index of the blocks to be removed, namely the 1st duplicate ### block remove_sequence_index = [d["sequence_index"] for d in duplicate_list] remove_data_index = np.concatenate( [np.arange(d["ts_index_0"], d["ts_index_1"]) for d in duplicate_list] ).astype(int) ### remove the data return_info_array = np.delete(info_array, remove_sequence_index) return_data_array = np.delete(data_array, remove_data_index) ### set sequence to be monotonic return_info_array["sequence"][:] = np.arange(return_info_array.shape[0]) return return_info_array, return_data_array, duplicate_list
[docs] def read_nims(self, fn: Optional[Union[str, Path]] = None) -> None: """ Read NIMS DATA.BIN file and parse all data. This method performs the complete data reading and processing workflow: 1. Read header information and store as attributes 2. Locate data block beginning by finding first [1, 131, ...] sequence 3. Ensure data is multiple of block length, trim excess bits 4. Extract GPS data (3rd byte of each block) and parse GPS stamps 5. Read data as unsigned 8-bit integers, reshape to [N, block_length] 6. Remove duplicate blocks (first of each duplicate pair) 7. Match GPS status locks with valid GPS stamps 8. Verify timing between first/last GPS stamps, trim excess seconds Parameters ---------- fn : str or Path, optional Path to NIMS DATA.BIN file. Uses self.fn if not provided. Notes ----- The data and information arrays returned have duplicates removed and sequence reset to be monotonic. Extra seconds due to timing gaps are trimmed from the end of the time series. Examples -------- >>> from mth5.io import nims >>> n = nims.NIMS(r"/home/mt_data/nims/mt001.bin") >>> n.read_nims() """ if fn is not None: self.fn = fn st = datetime.datetime.now() ### read in header information and get the location of end of header self.read_header(self.fn) ### load in the entire file, its not too big, start from the ### end of the header information. with open(self.fn, "rb") as fid: fid.seek(self.data_start_seek) self._raw_string = fid.read() ### read in full string as unsigned integers data = np.frombuffer(self._raw_string, dtype=np.uint8) ### need to make sure that the data starts with a full block find_first = self.find_sequence(data[0 : self.block_size * 10])[0] data = data[find_first:] ### get GPS stamps from the binary string first self.gps_list = self.get_stamps(self._raw_string[find_first:]) ### check the size of the data, should have an equal amount of blocks if (data.size % self.block_size) != 0: self.logger.warning( f"odd number of bytes {data.size}, not even blocks " f"cutting down the data by {data.size % self.block_size} bits" ) end_data = data.size - (data.size % self.block_size) data = data[0:end_data] # resized the data into an even amount of blocks data = data.reshape((int(data.size / self.block_size), self.block_size)) ### need to parse the data ### first get the status information self.info_array = np.zeros( data.shape[0], dtype=[ ("soh", int), ("block_len", int), ("status", int), ("gps", int), ("sequence", int), ("box_temp", float), ("head_temp", float), ("logic", int), ("end", int), ], ) for key, index in self._block_dict.items(): if "temp" in key: # compute temperature - cast to int32 to avoid uint8 overflow t_value = data[:, index[0]].astype(np.int32) * 256 + data[ :, index[1] ].astype(np.int32) # something to do with the bits where you have to subtract t_value[np.where(t_value > 32768)] -= 65536 value = (t_value - self.t_offset) / self.t_conversion_factor else: value = data[:, index] self.info_array[key][:] = value ### unwrap sequence self.info_array["sequence"] = self.unwrap_sequence(self.info_array["sequence"]) ### get data data_array = np.zeros( data.shape[0] * self.sample_rate, dtype=[ ("hx", float), ("hy", float), ("hz", float), ("ex", float), ("ey", float), ], ) ### fill the data for cc, comp in enumerate(["hx", "hy", "hz", "ex", "ey"]): channel_arr = np.zeros((data.shape[0], 8), dtype=float) for kk in range(self.sample_rate): index = self.indices[kk, cc] # Cast to int32 to avoid uint8 overflow value = ( data[:, index].astype(np.int32) * 256 + data[:, index + 1].astype(np.int32) ) * np.array([256]) + data[:, index + 2].astype(np.int32) value[np.where(value > self._int_max)] -= self._int_factor channel_arr[:, kk] = value data_array[comp][:] = channel_arr.flatten() ### clean things up ### I guess that the E channels are opposite phase? for comp in ["ex", "ey"]: data_array[comp] *= -1 ### remove duplicates ( self.info_array, data_array, self.duplicate_list, ) = self.remove_duplicates(self.info_array, data_array) ### get GPS stamps with index values self.stamps = self.match_status_with_gps_stamps( self.info_array["status"], self.gps_list ) ### align data checking for timing gaps self.ts_data = self.align_data(data_array, self.stamps) et = datetime.datetime.now() read_time = (et - st).total_seconds() self.logger.debug(f"Reading took {read_time:.2f} seconds")
def _get_first_gps_stamp(self, stamps): """ get the first GPRMC stamp """ for stamp in stamps: if stamp[1][0].gps_type in ["gprmc", "GPRMC"]: return stamp return None def _get_last_gps_stamp(self, stamps): """ get the last gprmc stamp """ for stamp in stamps[::-1]: if stamp[1][0].gps_type in ["gprmc", "GPRMC"]: return stamp return None def _locate_timing_gaps(self, stamps): """ locate timing gaps in the data by comparing the stamp index with the GPS time stamp. The number of points and seconds should be the same :param list stamps: list of GPS stamps [[status_index, [GPRMC, GPGGA]]] :returns: list of gap index values """ stamp_01 = self._get_first_gps_stamp(stamps)[1][0] # current_gap = 0 current_stamp = stamp_01 gap_beginning = [] total_gap = 0 for ii, stamp in enumerate(stamps[1:], 1): stamp = stamp[1][0] # can only compare those with a date and time. if stamp.gps_type == "GPGGA": continue time_diff = (stamp.time_stamp - current_stamp.time_stamp).total_seconds() index_diff = stamp.index - current_stamp.index time_gap = index_diff - time_diff if time_gap == 0: continue elif time_gap > 0: total_gap += time_gap current_stamp = stamp gap_beginning.append(stamp.index) self.logger.debug( f"GPS tamp at {stamp.time_stamp.isoformat()} is off " f"from previous time by { time_gap} seconds" ) self.logger.warning(f"Timing is off by {total_gap} seconds") return gap_beginning
[docs] def check_timing(self, stamps): """ make sure that there are the correct number of seconds in between the first and last GPS GPRMC stamps :param list stamps: list of GPS stamps [[status_index, [GPRMC, GPGGA]]] :returns: [ True | False ] if data is valid or not. :returns: gap index locations .. note:: currently it is assumed that if a data gap occurs the data can be squeezed to remove them. Probably a more elegant way of doing it. """ gaps = None first_stamp = self._get_first_gps_stamp(stamps)[1][0] last_stamp = self._get_last_gps_stamp(stamps)[1][0] time_diff = last_stamp.time_stamp - first_stamp.time_stamp index_diff = last_stamp.index - first_stamp.index difference = index_diff - time_diff.total_seconds() if difference != 0: gaps = self._locate_timing_gaps(stamps) return False, gaps, difference return True, gaps, difference
[docs] def align_data(self, data_array, stamps): """ Need to match up the first good GPS stamp with the data Do this by using the first GPS stamp and assuming that the time from the first time stamp to the start is the index value. put the data into a pandas data frame that is indexed by time :param array data_array: structure array with columns for each component [hx, hy, hz, ex, ey] :param list stamps: list of GPS stamps [[status_index, [GPRMC, GPGGA]]] :returns: pandas DataFrame with colums of components and indexed by time initialized by the start time. .. note:: Data gaps are squeezed cause not sure what a gap actually means. """ ### check timing first to make sure there is no drift timing_valid, self.gaps, time_difference = self.check_timing(stamps) ### first GPS stamp within the data is at a given index that is ### assumed to be the number of seconds from the start of the run. ### therefore make the start time the first GPS stamp time minus ### the index value for that stamp. ### need to be sure that the first GPS stamp has a date, need GPRMC first_stamp = self._get_first_gps_stamp(stamps) first_index = first_stamp[0] start_time = first_stamp[1][0].time_stamp - datetime.timedelta( seconds=int(first_index) ) dt_index = self.make_dt_index( start_time.isoformat(), self.sample_rate, n_samples=data_array.shape[0], ) return pd.DataFrame(data_array, index=dt_index)
[docs] def make_dt_index( self, start_time: str, sample_rate: float, stop_time: Optional[str] = None, n_samples: Optional[int] = None, ) -> pd.DatetimeIndex: """ Create datetime index array for time series data. Parameters ---------- start_time : str Start time in format YYYY-MM-DDThh:mm:ss.ms UTC sample_rate : float Sample rate in samples/second stop_time : str, optional End time in same format as start_time n_samples : int, optional Number of samples to generate Returns ------- DatetimeIndex Pandas datetime index with UTC timezone Notes ----- Either stop_time or n_samples must be provided. The datetime format should be YYYY-MM-DDThh:mm:ss.ms UTC. Raises ------ ValueError If neither stop_time nor n_samples is provided """ # set the index to be UTC time dt_freq = "{0:.0f}ns".format(1.0 / (sample_rate) * 1e9) if stop_time is not None: dt_index = pd.date_range( start=start_time, end=stop_time, freq=dt_freq, closed="left", tz="UTC", ) elif n_samples is not None: dt_index = pd.date_range( start=start_time, periods=n_samples, freq=dt_freq, tz="UTC" ) else: raise ValueError("Need to input either stop_time or n_samples") return dt_index
# ============================================================================= # convenience read # =============================================================================
[docs] def read_nims(fn: Union[str, Path]) -> Optional[timeseries.RunTS]: """ Convenience function to read a NIMS DATA.BIN file. Parameters ---------- fn : str or Path Path to the NIMS DATA.BIN file Returns ------- RunTS or None Time series run object containing all channels and metadata, or None if reading fails Examples -------- >>> from mth5.io.nims import nims >>> run_ts = nims.read_nims("/path/to/data.bin") """ nims_obj = NIMS(fn) nims_obj.read_nims() return nims_obj.to_runts()