Source code for mth5.io.phoenix.readers.mtu.mtu_ts

"""
=======================================================================
original comments from MATLAB script:

read_tsn - reads a (binary) TS file of the legacy Phoenix MTU-5A instrument
(TS2, TS3, TS4, TS5) and the even older V5-2000 system (TSL, TSH), and
output the "ts" array and "tag" metadata dictionary.

=======================================================================
Parameters:
    fpath: path to the TS file
    fname: name of the TS file (including extensions)

Returns:
    ts:    output array of the TS data
    tag:   output dict of the TSn metadata

=======================================================================
definition of the TS tag (or what I guessed after reading the user manual
and fiddling with their files)
0-7   UTC time of first scan in the record.
8-9   instrument serial number (16-bit integer)
10-11 number of scans in the record (16-bit integer)
12    number of channels per scan
13    tag length (TSn) or tag length code (TSH, TSL)
14    status code
15    bit-wise saturation flags (please note that the older TSH/L tag
      ends here )
16    reserved for future indication of different tag and/or sample
      formats
17    sample length in bytes
18-19 sample rate (in units defined by byte 20)
20    units of sample rate
21    clock status
22-25 clock error in seconds
26-32 reserved; must be 0

=======================================================================
notes on the TS format of TSn files:
The binary TS file consists of several data blocks, each contains a data
tag and a number of records in it.
Each time record consists of three bytes (24 bit), let's name them byte1,
byte2, and byte3:
the ts record (int) should be (+/-) (byte3*65536 + byte2*256 + byte1)

Hao
2012.07.04
Beijing
=======================================================================
"""

from __future__ import annotations

from pathlib import Path
from typing import BinaryIO

import numpy as np
from loguru import logger
from mt_metadata.common import MTime

from mth5.timeseries import ChannelTS, RunTS

from .mtu_table import MTUTable


[docs] class MTUTSN: """ Reader for legacy Phoenix MTU-5A instrument time series binary files. Reads time series data from Phoenix MTU-5A (.TS2, .TS3, .TS4, .TS5) and V5-2000 system (.TSL, .TSH) binary files. The data consists of 24-bit signed integers organized in data blocks with headers. Parameters ---------- file_path : str or Path or None, optional Path to the TSN file to read. If None, the reader is created without loading data. Default is None. Attributes ---------- file_path : Path or None Path to the currently loaded TSN file. ts : ndarray or None Time series data array with shape (n_channels, n_samples). tag : dict Metadata dictionary containing file information. Examples -------- Read a TS3 file: >>> from pathlib import Path >>> reader = MTUTSN('data/1690C16C.TS3') >>> print(reader.ts.shape) (3, 86400) >>> print(reader.tag['sample_rate']) 24 Create reader without loading data: >>> reader = MTUTSN() >>> reader.read('data/1690C16C.TS3') Access metadata: >>> reader = MTUTSN('data/1690C16C.TS4') >>> reader.read() >>> print(f"Channels: {reader.tag['n_ch']}") Channels: 4 >>> print(f"Blocks: {reader.tag['n_block']}") Blocks: 48 """ def __init__(self, file_path: str | Path | None = None, **kwargs) -> None: self._p16 = 2**16 self._p8 = 2**8 self._accepted_extensions = ["TS2", "TS3", "TS4", "TS5", "TSL", "TSH"] self._file_path = None self.file_path = file_path
[docs] self.ts = None
[docs] self.ts_metadata = None
for key, value in kwargs.items(): setattr(self, key, value) @property
[docs] def file_path(self) -> Path | None: """Get the TSN file path.""" return self._file_path
@file_path.setter def file_path(self, value: str | Path | None) -> None: """ Set the TSN file path with validation. Parameters ---------- value : str or Path or None Path to the TSN file. Must exist and have a valid extension (.TS2, .TS3, .TS4, .TS5, .TSL, .TSH). Raises ------ FileNotFoundError If the specified file does not exist. ValueError If the file extension is not recognized. Examples -------- >>> reader = MTUTSN() >>> reader.file_path = 'data/1690C16C.TS3' >>> print(reader.file_path.name) 1690C16C.TS3 """ if value is not None: self._file_path = Path(value) if not self._file_path.exists(): msg = f"File path {self._file_path} does not exist." logger.error(msg) raise FileNotFoundError(msg) if ( self._file_path.suffix.upper().lstrip(".") not in self._accepted_extensions ): msg = ( f"File extension {self._file_path.suffix} is not a recognized " f"TSN format. Accepted extensions: {self._accepted_extensions}" ) logger.error(msg) raise ValueError(msg) else: self._file_path = None
[docs] def get_sign24(self, x: np.ndarray | list | int) -> np.ndarray: """ Convert unsigned 24-bit integers to signed integers. Converts unsigned 24-bit values (0 to 16777215) to their signed equivalents (-8388608 to 8388607) by applying two's complement. Parameters ---------- x : ndarray or list or int Unsigned 24-bit integer value(s) to convert. Returns ------- ndarray Signed 24-bit integer value(s) as int32 array. Examples -------- Convert a single positive value: >>> reader = MTUTSN() >>> reader.get_sign24(100) array([100], dtype=int32) Convert a single negative value (unsigned representation): >>> reader.get_sign24(16777215) # -1 in 24-bit signed array([-1], dtype=int32) Convert an array: >>> values = np.array([0, 8388607, 8388608, 16777215]) >>> reader.get_sign24(values) array([ 0, 8388607, -8388608, -1], dtype=int32) """ x = np.array(x, dtype=np.int32) x[x > 2**23 - 1] = x[x > 2**23 - 1] - 2**24 return x
def _read_header( self, ts_fid: BinaryIO ) -> tuple[MTime, int, int, int, int, str, int]: """ Read and parse the 32-byte header from a TSN file. Extracts timestamp, instrument serial number, number of scans, channel count, tag length, instrument type, and sample rate from the binary file header. Parameters ---------- ts_fid : BinaryIO Open binary file handle positioned at the start of the header. Returns ------- start_time : MTime UTC timestamp of the first scan in the file. box_num : int Instrument serial number (16-bit integer). n_scan : int Number of scans per data block (16-bit integer). n_ch : int Number of channels per scan (3, 4, 5, or 6). tag_length : int Length of the tag in bytes (32 for MTU-5, 16 for V5-2000). ts_type : str Instrument type: 'MTU-5' or 'V5-2000'. sample_rate : int Sampling frequency in Hz (0 for V5-2000 files). Examples -------- >>> with open('data/1690C16C.TS3', 'rb') as f: ... reader = MTUTSN() ... start, box, scan, ch, tag_len, ts_type, sr = reader._read_header(f) ... print(f"Type: {ts_type}, Rate: {sr} Hz, Channels: {ch}") Type: MTU-5, Rate: 24 Hz, Channels: 3 """ # Starting time s = ts_fid.read(1)[0] # Starting second m = ts_fid.read(1)[0] # Starting minute h = ts_fid.read(1)[0] # Starting hour d = ts_fid.read(1)[0] # Starting day l = ts_fid.read(1)[0] # Starting month y = ts_fid.read(1)[0] # Starting year ts_fid.read(1) # skip the Starting weekday c = ts_fid.read(1)[0] # Starting century(-1) start = MTime(time_stamp=f"{(c*100)+y}-{l:02d}-{d:02d}T{h:02d}:{m:02d}:{s:02d}") logger.info(f"Start time is: {start.isoformat()} UTC") # Box series number (16-bit integer) box_num = int.from_bytes(ts_fid.read(2), byteorder="little", signed=False) # Number of scans in a data block (16-bit integer) n_scan = int.from_bytes(ts_fid.read(2), byteorder="little", signed=False) # Number of channels in a record n_ch = ts_fid.read(1)[0] # Length of the tag tag_length = ts_fid.read(1)[0] if tag_length != 32: ts_type = "V5-2000" sample_rate = 0 else: ts_type = "MTU-5" ts_fid.seek(4, 1) # skip to sampling frequency # Sampling frequency (16-bit integer, little endian) sample_rate = int.from_bytes( ts_fid.read(2), byteorder="little", signed=False ) ts_fid.seek(12, 1) # skip some (unknown) head info... logger.info(f"Sampling frequency is {sample_rate} Hz") logger.info(f"Number of records is {n_scan} in each data block") logger.info(f"TS type is: {ts_type}") return start, box_num, n_scan, n_ch, tag_length, ts_type, sample_rate def _extract_channel_data( self, data: np.ndarray, byte_indices: list[int] ) -> np.ndarray: """ Extract and convert 24-bit signed channel data from raw bytes. Combines three consecutive bytes (low, middle, high) per sample to reconstruct 24-bit signed integer values for a single channel. Parameters ---------- data : ndarray Raw byte data array with shape (n_ch*3, n_scan), where each row represents one byte position and each column is one time sample. byte_indices : list of int Three byte indices [low_byte, mid_byte, high_byte] indicating which rows of the data array contain the 24-bit value components. Returns ------- ndarray Signed 24-bit integer values for the channel with shape (n_scan,). Examples -------- Extract channel 0 from 3-channel data: >>> data = np.random.randint(0, 256, size=(9, 1000), dtype=np.int32) >>> reader = MTUTSN() >>> ch0_data = reader._extract_channel_data(data, [0, 1, 2]) >>> print(ch0_data.shape) (1000,) Extract channel 1 from 4-channel data: >>> data = np.random.randint(0, 256, size=(12, 500), dtype=np.int32) >>> ch1_data = reader._extract_channel_data(data, [3, 4, 5]) >>> print(ch1_data.shape) (500,) """ return self.get_sign24( data[byte_indices[2], :] * self._p16 + data[byte_indices[1], :] * self._p8 + data[byte_indices[0], :] ) def _process_data_block( self, ts_fid: BinaryIO, n_ch: int, n_scan: int, ts: np.ndarray, start_idx: int, end_idx: int, ) -> bool: """ Read and process a single data block from the TSN file. Reads one data block consisting of a 32-byte header followed by n_scan*n_ch*3 bytes of 24-bit channel data. Extracts and converts the data for all channels and populates the specified slice of the time series array. Parameters ---------- ts_fid : BinaryIO Open binary file handle positioned at the start of a data block. n_ch : int Number of channels (3, 4, 5, or 6). n_scan : int Number of time samples in this block. ts : ndarray Pre-allocated time series array to populate with shape (n_ch, total_samples). start_idx : int Starting index in the second dimension of ts for this block's data. end_idx : int Ending index (exclusive) in the second dimension of ts. Returns ------- bool True if data was read and processed successfully, False if end of file reached or unsupported channel count. Examples -------- Process blocks from a file: >>> with open('data/1690C16C.TS3', 'rb') as f: ... reader = MTUTSN() ... # ... read header first ... ... ts = np.zeros((3, 3600)) ... success = reader._process_data_block(f, 3, 1200, ts, 0, 1200) ... print(f"Block processed: {success}") Block processed: True """ ts_fid.seek(32, 1) # skip the file tag data = np.frombuffer(ts_fid.read(n_ch * 3 * n_scan), dtype=np.uint8) if len(data) == 0: logger.warning("No data read in current block...") return False # Reshape data to [Nch*3, Nscan] (Fortran order to match MATLAB) # Convert to int32 to avoid overflow during multiplication data = data.reshape((n_scan, n_ch * 3)).T.astype(np.int32) # Channel byte index mapping (each channel uses 3 consecutive bytes) # Format: [low_byte, mid_byte, high_byte] channel_map = { 3: {0: [0, 1, 2], 1: [3, 4, 5], 2: [6, 7, 8]}, 4: {0: [0, 1, 2], 1: [3, 4, 5], 2: [6, 7, 8], 3: [12, 13, 14]}, 5: { 0: [0, 1, 2], 1: [3, 4, 5], 2: [6, 7, 8], 3: [9, 10, 11], 4: [12, 13, 14], }, 6: { 0: [0, 1, 2], 1: [3, 4, 5], 2: [6, 7, 8], 3: [9, 10, 11], 4: [12, 13, 14], 5: [15, 16, 17], }, } if n_ch in channel_map: for ch_idx, byte_indices in channel_map[n_ch].items(): ts[ch_idx, start_idx:end_idx] = self._extract_channel_data( data, byte_indices ) else: logger.warning(f"Unsupported number of channels: {n_ch}") return False return True
[docs] def read(self, file_path: str | Path | None = None) -> None: """ Read and parse a Phoenix MTU time series binary file. Reads complete time series data from legacy Phoenix MTU-5A instrument files (.TS2, .TS3, .TS4, .TS5) or V5-2000 system files (.TSL, .TSH). Each file contains multiple data blocks with 24-bit signed integer samples organized by channel. Parameters ---------- file_path : str or Path or None, optional Path to the TSN file to read. If None, uses the current file_path attribute. Default is None. Returns ------- ts : ndarray Time series data array with shape (n_channels, total_samples). Data type is float64. Each row represents one channel, and each column is a time sample. tag : dict Metadata dictionary containing file information with keys: - 'box_number' (int): Instrument serial number - 'ts_type' (str): Instrument type ('MTU-5' or 'V5-2000') - 'sample_rate' (int): Sampling frequency in Hz - 'n_ch' (int): Number of channels - 'n_scan' (int): Number of scans per data block - 'start' (MTime): UTC timestamp of first sample - 'ts_length' (float): Duration of each block in seconds - 'n_block' (int): Total number of data blocks in file Raises ------ EOFError If the file is empty or cannot be read. ValueError If the file has an unsupported extension or channel count. FileNotFoundError If the specified file does not exist. Examples -------- Read a 3-channel TS3 file: >>> reader = MTUTSN() >>> ts, tag = reader.read('data/1690C16C.TS3') >>> print(f"Shape: {ts.shape}") Shape: (3, 86400) >>> print(f"Sample rate: {tag['sample_rate']} Hz") Sample rate: 24 Hz >>> print(f"Duration: {ts.shape[1] / tag['sample_rate']:.1f} seconds") Duration: 3600.0 seconds Read a 4-channel TS4 file: >>> reader = MTUTSN('data/1690C16C.TS4') >>> print(f"Channels: {reader.tag['n_ch']}") Channels: 4 >>> print(f"Start time: {reader.tag['start'].isoformat()}") Start time: 2016-07-16T00:00:00+00:00 Read and process data: >>> ts, tag = MTUTSN().read('data/station.TS5') >>> # Calculate statistics for each channel >>> for i in range(tag['n_ch']): ... print(f"Ch{i} mean: {ts[i].mean():.2f}, std: {ts[i].std():.2f}") Ch0 mean: 123.45, std: 456.78 Ch1 mean: -234.56, std: 567.89 ... """ # try opening the ts data file if file_path is not None: self.file_path = file_path logger.info(f"Opening file: {self.file_path.name} ...") with open(self.file_path, "rb") as ts_fid: # Read the header to check if file is empty s_byte = ts_fid.read(1) # Starting second if len(s_byte) == 0: raise EOFError("TSN file is empty or could not be read.") # Reset to beginning and read full header ts_fid.seek(0, 0) ( start, box_num, n_scan, n_ch, tag_length, ts_type, sample_rate, ) = self._read_header(ts_fid) # Calculate file size and number of blocks ts_fid.seek(0, 2) file_size = ts_fid.tell() n_block = round(file_size / (n_scan * n_ch * 3 + 32)) # Preallocate memory for time series data ts = np.zeros((n_ch, n_scan * n_block), dtype=np.float64) logger.info(f"Total {n_block} block(s) found in current file") # Reset to beginning of file and process each data block ts_fid.seek(0, 0) for iblock in range(n_block): start_idx = iblock * n_scan end_idx = (iblock + 1) * n_scan if not self._process_data_block( ts_fid, n_ch, n_scan, ts, start_idx, end_idx ): break logger.info("# finish reading time series...") # Build the tag dictionary tag = { "box_number": box_num, "ts_type": ts_type, "sample_rate": sample_rate, "n_ch": n_ch, "n_scan": n_scan, "start": start, "ts_length": n_scan / sample_rate if sample_rate > 0 else 0, "n_block": n_block, } self.ts = ts self.ts_metadata = tag
[docs] def to_runts( self, table_filepath: str | Path | None = None, calibrate=True ) -> RunTS: """ Create an MTUTable object from the TSN file and associated TBL file. Parameters ---------- table_filepath : str or Path Path to the corresponding TBL file. Returns ------- MTUTable An MTUTable object containing metadata from the TBL file. Examples -------- >>> reader = MTUTSN('data/1690C16C.TS3') >>> mtu_table = reader.to_runts('data/1690C16C.TBL') >>> print(mtu_table.metadata) {...} """ # Read data if not already loaded if self.ts is None or self.ts_metadata is None: self.read() ts = self.ts ts_metadata = self.ts_metadata if table_filepath is None and self.file_path is not None: table_filepath = self.file_path.with_suffix(".TBL") mtu_table = MTUTable(table_filepath) survey_metadata = mtu_table.survey_metadata # to trigger warning if no data run_metadata = mtu_table.run_metadata.copy() run_metadata.sample_rate = ts_metadata["sample_rate"] # update run id to include sample rate run_metadata.id = f"sr{ts_metadata['sample_rate']}_001" run_ts = RunTS( survey_metadata=mtu_table.survey_metadata, station_metadata=survey_metadata.stations[0], run_metadata=run_metadata, ) for comp, channel_number in mtu_table.channel_keys.items(): channel_metadata = getattr(mtu_table, f"{comp}_metadata") channel_metadata.sample_rate = ts_metadata["sample_rate"] channel_metadata.start_time = ts_metadata["start"] # Channel numbers in TBL are 1-indexed, convert to 0-indexed for numpy channel_index = channel_number - 1 if calibrate: if comp in ["ex", "ey"]: scale_factor = ( mtu_table.ex_calibration if comp == "ex" else mtu_table.ey_calibration ) channel_metadata.units = "mV/km" elif comp in ["hx", "hy", "hz"]: scale_factor = mtu_table.magnetic_calibration channel_metadata.units = "nT" logger.info( f"Applying scale factor of {scale_factor} to channel {comp}" ) ts[channel_index, :] = ts[channel_index, :] * scale_factor # Determine channel type channel_type = "electric" if comp[0] in ["e"] else "magnetic" ch_ts = ChannelTS( channel_type=channel_type, data=ts[channel_index, :], channel_metadata=channel_metadata, ) run_ts.add_channel(ch_ts) return run_ts