Source code for mth5.io.phoenix.readers.native.native_reader

# -*- coding: utf-8 -*-
"""
Module to read and parse native Phoenix Geophysics data formats of the MTU-5C
Family.

This module implements Streamed readers for segmented-decimated time series 
formats of the MTU-5C family.

:author: Jorge Torres-Solis

Revised 2022 by J. Peacock 

"""

# =============================================================================
# Imports
# =============================================================================

import numpy as np
from numpy.lib.stride_tricks import as_strided

from struct import unpack_from, unpack
from mth5.io.phoenix.readers import TSReaderBase
from mth5.timeseries import ChannelTS

AD_IN_AD_UNITS = 0
AD_INPUT_VOLTS = 1
INSTRUMENT_INPUT_VOLTS = 2

# =============================================================================


[docs]class NativeReader(TSReaderBase): """ Native sampling rate 'Raw' time series reader class, these are the .bin files. They are formatted with a header of 128 bytes then frames of 64. Each frame is 20 x 3 byte (24-bit) data point then a 4 byte footer. """ def __init__( self, path, num_files=1, scale_to=AD_INPUT_VOLTS, header_length=128, last_frame=0, ad_plus_minus_range=5.0, channel_type="E", report_hw_sat=False, **kwargs, ): # Init the base class super().__init__( path, num_files, header_length, report_hw_sat, **kwargs ) self._chunk_size = 4096 # Track the last frame seen by the streamer, to report missing frames self.last_frame = last_frame self.header_length = header_length self.data_scaling = scale_to self.ad_plus_minus_range = ad_plus_minus_range if header_length == 128: self.unpack_header(self.stream) # Now that we have the channel circuit-based gain (either form init # or from the header) # We can calculate the voltage range at the input of the board. self.input_plusminus_range = self._calculate_input_plusminus_range() self.scale_factor = self._calculate_data_scaling() # optimization variables self.footer_idx_samp_mask = int("0x0fffffff", 16) self.footer_sat_mask = int("0x70000000", 16) def _calculate_input_plusminus_range(self): """ set the correct input plusminus range from metadata """ return self.ad_plus_minus_range / self.total_circuitry_gain def _calculate_data_scaling(self): """ Get the correct data scaling for the AD converter """ if self.data_scaling == AD_IN_AD_UNITS: return 256 elif self.data_scaling == AD_INPUT_VOLTS: return self.ad_plus_minus_range / (2**31) elif self.data_scaling == INSTRUMENT_INPUT_VOLTS: return self.input_plusminus_range / (2**31) else: raise LookupError("Invalid scaling requested")
[docs] def read_frames(self, num_frames): """ Read the given amount of frames from the data. .. note:: that seek is not reset so if you iterate this the stream reads from the last tell. :param num_frames: Number of frames to read :type num_frames: integer :return: Scaled data from the given number of frames :rtype: np.ndarray(dtype=float) """ frames_in_buf = 0 _idx_buf = 0 _data_buf = np.empty([num_frames * 20]) # 20 samples packed in a frame while frames_in_buf < num_frames: dataFrame = self.stream.read(64) if not dataFrame: if not self.open_next(): return np.empty([0]) dataFrame = self.stream.read(64) if not dataFrame: return np.empty([0]) dataFooter = unpack_from("I", dataFrame, 60) # Check that there are no skipped frames frameCount = dataFooter[0] & self.footer_idx_samp_mask difCount = frameCount - self.last_frame if difCount != 1: self.logger.warning( "Ch [%s] Missing frames at %d [%d]\n" % (self.channel_id, frameCount, difCount) ) self.last_frame = frameCount for ptrSamp in range(0, 60, 3): # unpack expects 4 bytes, but the frames are only 3? value = unpack( ">i", dataFrame[ptrSamp : ptrSamp + 3] + b"\x00" )[0] _data_buf[_idx_buf] = value * self.scale_factor _idx_buf += 1 frames_in_buf += 1 if self.report_hw_sat: satCount = (dataFooter[0] & self.footer_sat_mask) >> 24 if satCount: self.logger.warning( "Ch [%s] Frame %d has %d saturations" % (self.ch_id, frameCount, satCount) ) return _data_buf
@property def npts_per_frame(self): return int((self.frame_size_bytes - 4) / 3)
[docs] def read(self): """ Read the full data file. .. note:: This uses :class:`numpy.lib.stride_tricks.as_strided` which can be unstable if the bytes are not the correct length. See notes by numpy. Got this solution from: https://stackoverflow.com/questions/12080279/how-do-i-create-a-numpy-dtype-that-includes-24-bit-integers?msclkid=3398046ecd6511ec9a37394f28c5aaba :return: scaled data and footer :rtype: tuple (data, footer) """ # should do a memory map otherwise things can go badly with as_strided raw_data = np.memmap(self.base_path, ">i1", mode="r") raw_data = raw_data[self.header_length :] # trim of any extra bytes extra_bytes = raw_data.size % 64 if extra_bytes != 0: self.warning(f"found {extra_bytes} extra bits in file.") useable_bytes = raw_data.size - extra_bytes raw_data = raw_data[:useable_bytes] # This should now be the exact number of frames after trimming off # extra bytes. n_frames = int(raw_data.size / self.frame_size_bytes) # reshape to (nframes, 64) raw_data = raw_data.reshape((n_frames, self.frame_size_bytes)) # split the data from the footer ts = raw_data[:, 0 : self.npts_per_frame * 3].flatten() footer = raw_data[:, self.npts_per_frame * 3 :].flatten() # get the number of raw byte frames raw_frames = int(ts.size / 12) # stride over bytes making new 4 bytes for a 32bit integer and scale ts_data = ( as_strided( ts.view(np.int32), strides=(12, 3), shape=(raw_frames, 4), ) .flatten() .byteswap() * self.scale_factor ) # somehow the number is off by just a bit ~1E-7 V # view the footer as an int32 footer = footer.view(np.int32) return ts_data, footer
[docs] def read_sequence(self, start=0, end=None): """ Read sequence of files into a single array :param start: sequence start, defaults to 0 :type start: integer, optional :param end: sequence end, defaults to None :type end: integer, optional :return: scaled data :rtype: np.ndarray(dtype=float32) :return: footer :rtype: np.ndarray(dtype=int32) """ data = np.array([]) footer = np.array([]) for fn in self.sequence_list[slice(start, end)]: self._open_file(fn) self.unpack_header(self.stream) ts, foot = self.read() data = np.append(data, ts) footer = np.append(footer, foot) return data, footer
[docs] def skip_frames(self, num_frames): """ Skip frames of the stream :param num_frames: number of frames to skip :type num_frames: integer :return: end of file :rtype: boolean """ bytes_to_skip = int(num_frames * 64) # Python is dumb for seek and tell, it cannot tell us if a seek goes # past EOF so instead we need to do inefficient reads to skip bytes while bytes_to_skip > 0: foo = self.stream.read(bytes_to_skip) local_read_size = len(foo) # If we ran out of data in this file before finishing the skip, # open the next file and return false if there is no next file # to indicate that the skip ran out of # data before completion if local_read_size == 0: more_data = self.open_next() if not more_data: return False else: bytes_to_skip -= local_read_size # If we reached here we managed to skip all the data requested # return true self.last_frame += num_frames return True
[docs] def to_channel_ts(self): """ convert to a ChannelTS object :return: DESCRIPTION :rtype: TYPE """ data, footer = self.read() ch_metadata = self.channel_metadata() return ChannelTS( channel_type=ch_metadata.type, data=data, channel_metadata=ch_metadata, run_metadata=self.run_metadata(), station_metadata=self.station_metadata(), )