# -*- coding: utf-8 -*-
"""
Module to read and parse native Phoenix Geophysics data formats of the
MTU-5C Family
This module implements Streamed readers for segmented-decimated time series
formats of the MTU-5C family.
:author: Jorge Torres-Solis
Revised 2022 by J. Peacock
"""
# =============================================================================
# Imports
# =============================================================================
import numpy as np
from struct import unpack_from
from mt_metadata.utils.mttime import MTime
from mth5.io.phoenix.readers import TSReaderBase
from mth5.timeseries import ChannelTS
# =============================================================================
[docs]class Segment(SubHeader):
"""
A segment class to hold a single segment
"""
def __init__(self, stream, **kwargs):
super().__init__(**kwargs)
self.stream = stream
self.data = None
[docs] def read_segment(self, metadata_only=False):
"""
Read the whole file in
:return: DESCRIPTION
:rtype: TYPE
"""
self.unpack_header(self.stream)
if not metadata_only:
self.data = np.fromfile(
self.stream, dtype=np.float32, count=self.n_samples
)
@property
def segment_start_time(self):
"""
estimate the segment start time based on sequence number
:return: DESCRIPTION
:rtype: TYPE
"""
return self.gps_time_stamp
@property
def segment_end_time(self):
"""
estimate end time
:return: DESCRIPTION
:rtype: TYPE
"""
return self.segment_start_time + ((self.n_samples) / self.sample_rate)
[docs]class DecimatedSegmentedReader(TSReaderBase):
"""
Class to create a streamer for segmented decimated time series,
i.e. 'td_24k'
These files have a sub header
"""
def __init__(self, path, num_files=1, report_hw_sat=False, **kwargs):
# Init the base class
super().__init__(
path,
num_files=num_files,
header_length=128,
report_hw_sat=report_hw_sat,
**kwargs,
)
self.unpack_header(self.stream)
self.sub_header = SubHeader()
self.subheader = {}
[docs] def read_segment(self, metadata_only=False):
"""
Read in a single segment
:param metadata_only: DESCRIPTION, defaults to False
:type metadata_only: TYPE, optional
:return: DESCRIPTION
:rtype: TYPE
"""
kwargs = {
"instrument_type": self.instrument_type,
"instrument_serial_number": self.instrument_serial_number,
"latitude": self.gps_lat,
"longitude": self.gps_long,
"elevation": self.gps_elevation,
"sample_rate": self.sample_rate,
"channel_id": self.channel_id,
"channel_type": self.channel_type,
"segment": 0,
}
segment = Segment(self.stream, **kwargs)
segment.read_segment(metadata_only=metadata_only)
return segment
[docs] def to_channel_ts(self):
"""
convert to a ChannelTS object
:return: DESCRIPTION
:rtype: TYPE
"""
segment = self.read_segment()
ch_metadata = self.channel_metadata()
ch_metadata.time_period.start = segment.segment_start_time.isoformat()
return ChannelTS(
channel_type=ch_metadata.type,
data=segment.data,
channel_metadata=ch_metadata,
run_metadata=self.run_metadata(),
station_metadata=self.station_metadata(),
)
[docs]class DecimatedSegmentCollection(TSReaderBase):
"""
Class to create a streamer for segmented decimated time series,
i.e. 'td_24k'
These files have a sub header
"""
def __init__(self, path, num_files=1, report_hw_sat=False, **kwargs):
# Init the base class
super().__init__(
path,
num_files=num_files,
header_length=128,
report_hw_sat=report_hw_sat,
**kwargs,
)
self.unpack_header(self.stream)
self.sub_header = SubHeader()
self.subheader = {}
[docs] def read_segments(self, metadata_only=False):
"""
Read the whole file in
:return: DESCRIPTION
:rtype: TYPE
"""
kwargs = {
"instrument_type": self.instrument_type,
"instrument_serial_number": self.instrument_serial_number,
"latitude": self.gps_lat,
"longitude": self.gps_long,
"elevation": self.gps_elevation,
"sample_rate": self.sample_rate,
"channel_id": self.channel_id,
"channel_type": self.channel_type,
"segment": 0,
}
segments = []
count = 1
while True:
try:
kwargs["segment"] = count
segment = Segment(self.stream, **kwargs)
segment.read_segment(metadata_only=metadata_only)
segments.append(segment)
count += 1
except:
break
self.logger.info(f"Read {count - 1} segments")
return segments
[docs] def to_channel_ts(self):
"""
convert to a ChannelTS object
:return: DESCRIPTION
:rtype: TYPE
"""
seq_list = []
for seq in self.read_segments():
ch_metadata = self.channel_metadata()
ch_metadata.time_period.start = seq.gps_time_stamp.isoformat()
seq_list.append(
ChannelTS(
channel_type=ch_metadata.type,
data=seq.data,
channel_metadata=ch_metadata,
run_metadata=self.run_metadata(),
station_metadata=self.station_metadata(),
)
)
return seq_list