Source code for mth5.io.phoenix.phoenix_collection

# -*- coding: utf-8 -*-
"""
Phoenix file collection module for organizing and processing Phoenix MTU data files.

This module provides the PhoenixCollection class for discovering, organizing,
and managing Phoenix magnetotelluric receiver files within a directory structure.

Created on Thu Aug  4 16:48:47 2022

@author: jpeacock
"""
from __future__ import annotations

from collections import OrderedDict
from pathlib import Path

import numpy as np
import pandas as pd

from mth5.io import Collection
from mth5.io.phoenix import open_phoenix, PhoenixReceiverMetadata


# =============================================================================


[docs] class PhoenixCollection(Collection): """ Collection manager for Phoenix MTU data files. Organizes Phoenix magnetotelluric receiver files into runs based on timing and sample rates. Handles multiple sample rates (30, 150, 2400, 24000, 96000 Hz) and manages receiver metadata. Parameters ---------- file_path : str | Path | None, optional Path to the directory containing Phoenix data files. Can be the station folder or a parent folder containing multiple stations. **kwargs Additional keyword arguments passed to parent Collection class. Attributes ---------- metadata_dict : dict[str, PhoenixReceiverMetadata] Dictionary mapping station IDs to their receiver metadata. Examples -------- Create a collection from a station directory: >>> from mth5.io.phoenix import PhoenixCollection >>> collection = PhoenixCollection(r"/path/to/station") >>> runs = collection.get_runs(sample_rates=[150, 24000]) >>> print(runs.keys()) dict_keys(['MT001']) Process multiple sample rates: >>> df = collection.to_dataframe(sample_rates=[150, 2400, 24000]) >>> print(df.columns) Index(['survey', 'station', 'run', 'start', 'end', ...]) Notes ----- The class automatically discovers station folders by locating 'recmeta.json' files and organizes time series files by sample rate. File extensions are mapped as: - 30 Hz: td_30 - 150 Hz: td_150 - 2400 Hz: td_2400 - 24000 Hz: td_24k - 96000 Hz: td_96k See Also -------- mth5.io.Collection : Base collection class mth5.io.phoenix.PhoenixReceiverMetadata : Receiver metadata handler """ def __init__(self, file_path: str | Path | None = None, **kwargs) -> None: self._file_extension_map = { 30: "td_30", 150: "td_150", 2400: "td_2400", 24000: "td_24k", 96000: "td_96k", } self._default_channel_map = { 0: "E1", 1: "H3", 2: "H2", 3: "H1", 4: "H4", 5: "H5", 6: "H6", 7: "E2", } super().__init__(file_path=file_path, **kwargs)
[docs] self.metadata_dict = {}
self._receiver_metadata_name = "recmeta.json" def _read_receiver_metadata_json( self, rec_fn: str | Path ) -> PhoenixReceiverMetadata | None: """ Read receiver metadata from JSON file. Loads and parses the recmeta.json file containing station and channel configuration information. Parameters ---------- rec_fn : str | Path Path to the recmeta.json metadata file. Returns ------- PhoenixReceiverMetadata | None Receiver metadata object if file exists, None otherwise. Examples -------- >>> metadata = collection._read_receiver_metadata_json( ... Path("/data/station/recmeta.json") ... ) >>> print(metadata.station_metadata.id) 'MT001' """ if Path(rec_fn).is_file(): return PhoenixReceiverMetadata(fn=rec_fn) else: self.logger.warning( f"Could not find {self._receiver_metadata_name} in {self.file_path}" ) return None def _locate_station_folders(self) -> list[Path]: """ Locate all station folders containing recmeta.json files. Recursively searches the collection path for directories containing the receiver metadata file (recmeta.json), which identifies a valid Phoenix station folder. Returns ------- list[Path] List of Path objects pointing to station folders. Examples -------- >>> folders = collection._locate_station_folders() >>> print([f.name for f in folders]) ['MT001', 'MT002', 'MT003'] Notes ----- Each station folder must contain a recmeta.json file to be recognized. The search is recursive, allowing for nested directory structures. """ station_folders = [] for folder in self.file_path.rglob("**/"): rec_fn = folder.joinpath("recmeta.json") if rec_fn.exists(): station_folders.append(folder) return station_folders
[docs] def to_dataframe( self, sample_rates: list[int] | int = [150, 24000], run_name_zeros: int = 4, calibration_path: str | Path | None = None, ) -> pd.DataFrame: """ Create a DataFrame cataloging all Phoenix files in the collection. Scans all station folders for time series files at specified sample rates and creates a comprehensive inventory with metadata for each file. Parameters ---------- sample_rates : list[int] | int, optional Sample rate(s) to include in Hz. Valid values are 30, 150, 2400, 24000, 96000. Can be a single integer or list (default is [150, 24000]). run_name_zeros : int, optional Number of zeros for zero-padding run names (default is 4). For example, 4 produces 'sr150_0001'. calibration_path : str | Path | None, optional Path to calibration files. Currently unused but reserved for future functionality. Returns ------- pd.DataFrame DataFrame with one row per file containing columns: - survey: Survey ID from metadata - station: Station ID from metadata - run: Run ID (assigned by assign_run_names) - start: File start time (ISO format) - end: File end time (ISO format) - channel_id: Numeric channel identifier - component: Channel component name (e.g., 'Ex', 'Hy') - fn: Full file path - sample_rate: Sample rate in Hz - file_size: File size in bytes - n_samples: Number of samples in file - sequence_number: File sequence number for continuous data - instrument_id: Recording/receiver ID - calibration_fn: Path to calibration file (currently None) Examples -------- Get DataFrame for standard sample rates: >>> df = collection.to_dataframe(sample_rates=[150, 24000]) >>> print(df.shape) (245, 14) >>> print(df.station.unique()) ['MT001'] Process single sample rate: >>> df_150 = collection.to_dataframe(sample_rates=150) >>> print(df_150.sample_rate.unique()) [150.] Check file coverage: >>> for comp in df.component.unique(): ... comp_df = df[df.component == comp] ... print(f"{comp}: {len(comp_df)} files") Ex: 35 files Ey: 35 files Hx: 35 files Notes ----- - Calibration files (identified by 'calibration' in filename) are automatically skipped - Files that cannot be opened are logged and skipped - The DataFrame is sorted by station, sample_rate, and start time - Run names must be assigned separately using assign_run_names() See Also -------- assign_run_names : Assign run identifiers based on timing get_runs : Get organized runs directly """ if not isinstance(sample_rates, (list, tuple)): sample_rates = [sample_rates] station_folders = self._locate_station_folders() entries = [] for folder in station_folders: rec_fn = folder.joinpath(self._receiver_metadata_name) receiver_metadata = self._read_receiver_metadata_json(rec_fn) self.metadata_dict[ receiver_metadata.station_metadata.id ] = receiver_metadata for sr in sample_rates: for fn in folder.rglob(f"*{self._file_extension_map[int(sr)]}"): if "calibration" in fn.as_posix().lower(): self.logger.debug(f"skipping calibration time series {fn}") continue try: phx_obj = open_phoenix(fn) except OSError: self.logger.warning(f"Skipping {fn.name}") continue if hasattr(phx_obj, "read_segment"): segment = phx_obj.read_segment(metadata_only=True) try: start = segment.segment_start_time.isoformat() except IOError: self.logger.warning(f"Could not read file {fn}, SKIPPING") continue end = segment.segment_end_time.isoformat() n_samples = segment.n_samples else: start = phx_obj.segment_start_time.isoformat() end = phx_obj.segment_end_time.isoformat() n_samples = phx_obj.max_samples entry = self.get_empty_entry_dict() entry["survey"] = receiver_metadata.survey_metadata.id entry["station"] = receiver_metadata.station_metadata.id entry["run"] = (None,) entry["start"] = start entry["end"] = end entry["channel_id"] = phx_obj.channel_id entry["component"] = receiver_metadata.channel_map[ phx_obj.channel_id ] entry["fn"] = fn entry["sample_rate"] = phx_obj.sample_rate entry["file_size"] = phx_obj.file_size entry["n_samples"] = n_samples entry["sequence_number"] = phx_obj.seq entry["instrument_id"] = phx_obj.recording_id entry["calibration_fn"] = None entries.append(entry) df = self._sort_df(self._set_df_dtypes(pd.DataFrame(entries)), run_name_zeros) return df
[docs] def assign_run_names(self, df: pd.DataFrame, zeros: int = 4) -> pd.DataFrame: """ Assign run names based on temporal continuity. Analyzes file timing to group files into runs. For continuous data (< 1000 Hz), maintains a single run as long as files are contiguous. For segmented data (≥ 1000 Hz), assigns a unique run to each segment. Parameters ---------- df : pd.DataFrame DataFrame returned by `to_dataframe` method with file inventory. zeros : int, optional Number of zeros for zero-padding run names (default is 4). Returns ------- pd.DataFrame DataFrame with 'run' column populated. Run names follow the format 'sr{rate}_{number:0{zeros}}', e.g., 'sr150_0001'. Examples -------- Assign run names to a DataFrame: >>> df = collection.to_dataframe(sample_rates=[150, 24000]) >>> df_with_runs = collection.assign_run_names(df, zeros=4) >>> print(df_with_runs.run.unique()) ['sr150_0001', 'sr24k_0001', 'sr24k_0002', ...] Check for data gaps in continuous data: >>> df_150 = df_with_runs[df_with_runs.sample_rate == 150] >>> print(df_150.run.unique()) ['sr150_0001', 'sr150_0002'] # Gap detected between runs Count segments in high-rate data: >>> df_24k = df_with_runs[df_with_runs.sample_rate == 24000] >>> n_segments = len(df_24k.run.unique()) >>> print(f"Found {n_segments} segments at 24 kHz") Found 43 segments at 24 kHz Notes ----- **Continuous Data (< 1000 Hz):** - Maintains single run ID while files are temporally contiguous - Detects gaps by comparing end time of file N with start time of file N+1 - Increments run counter when gap > 0 seconds detected **Segmented Data (≥ 1000 Hz):** - Each unique start time receives a new run ID - Typically results in one run per segment/file The run naming scheme uses the sample rate in the identifier: - 30 Hz → 'sr30_NNNN' - 150 Hz → 'sr150_NNNN' - 2400 Hz → 'sr2400_NNNN' - 24000 Hz → 'sr24k_NNNN' - 96000 Hz → 'sr96k_NNNN' """ rdf = df.copy() sample_rates = rdf.sample_rate.unique() for station in df.station.unique(): for sr in sample_rates: run_stem = self._file_extension_map[int(sr)].split("_")[-1] # continuous data if sr < 1000: sdf = rdf.loc[ (rdf.station == station) & (rdf.sample_rate == sr) ].sort_values("sequence_number") starts = np.sort(sdf.loc[sdf.sample_rate == sr].start.unique()) ends = np.sort(sdf.loc[sdf.sample_rate == sr].end.unique()) # find any breaks in the data diff = ends[0:-1] - starts[1:] diff = diff.astype("timedelta64[s]").astype(float) breaks = np.nonzero(diff)[0] # this logic probably needs some work. Need to figure # out how to set pandas values count = 1 if len(breaks) > 0: start_breaks = starts[breaks] for ii in range(len(start_breaks)): count += 1 rdf.loc[ (rdf.station == station) & (rdf.start == start_breaks[ii]) & (rdf.sample_rate == sr), "run", ] = f"sr{run_stem}_{count:0{zeros}}" else: rdf.loc[ (rdf.station == station) & (rdf.sample_rate == sr), "run", ] = f"sr{run_stem}_{count:0{zeros}}" # segmented data else: starts = rdf.loc[ (rdf.station == station) & (rdf.sample_rate == sr), "start", ].unique() for ii, s in enumerate(starts, 1): rdf.loc[ (rdf.start == s) & (rdf.sample_rate == sr), "run" ] = f"sr{run_stem}_{ii:0{zeros}}" return rdf
[docs] def get_runs( self, sample_rates: list[int] | int, run_name_zeros: int = 4, calibration_path: str | Path | None = None, ) -> OrderedDict[str, OrderedDict[str, pd.DataFrame]]: """ Organize Phoenix files into runs ready for reading. Creates a nested dictionary structure organizing files by station and run. For each run, returns only the first file(s) needed to initialize reading, as continuous readers will automatically load sequences. Parameters ---------- sample_rates : list[int] | int Sample rate(s) to include in Hz. Valid values are 30, 150, 2400, 24000, 96000. Can be a single integer or list. run_name_zeros : int, optional Number of zeros for zero-padding run names (default is 4). calibration_path : str | Path | None, optional Path to calibration files. Currently unused but reserved for future functionality. Returns ------- OrderedDict[str, OrderedDict[str, pd.DataFrame]] Nested OrderedDict with structure: - Keys: station IDs - Values: OrderedDict of runs - Keys: run IDs (e.g., 'sr150_0001') - Values: DataFrame with first file(s) for each channel Examples -------- Get runs for standard sample rates: >>> from mth5.io.phoenix import PhoenixCollection >>> collection = PhoenixCollection(r"/path/to/station") >>> runs = collection.get_runs(sample_rates=[150, 24000]) >>> print(runs.keys()) odict_keys(['MT001']) Access specific station's runs: >>> station_runs = runs['MT001'] >>> print(list(station_runs.keys())) ['sr150_0001', 'sr24k_0001', 'sr24k_0002', ...] Get first file for a specific run: >>> run_df = runs['MT001']['sr150_0001'] >>> print(run_df[['component', 'fn', 'start']]) component fn start 0 Ex /path/to/8441_2020...td_150 2020-06-02T19:00:00 1 Ey /path/to/8441_2020...td_150 2020-06-02T19:00:00 Iterate over all runs: >>> for station_id, station_runs in runs.items(): ... for run_id, run_df in station_runs.items(): ... print(f"{station_id}/{run_id}: {len(run_df)} channels") MT001/sr150_0001: 5 channels MT001/sr24k_0001: 5 channels Get single sample rate: >>> runs_150 = collection.get_runs(sample_rates=150) >>> run_ids = list(runs_150['MT001'].keys()) >>> print([r for r in run_ids if 'sr150' in r]) ['sr150_0001'] Notes ----- **For Continuous Data (< 1000 Hz):** Returns only the first file in each sequence per channel. The Phoenix reader will automatically load the complete sequence when reading. **For Segmented Data (≥ 1000 Hz):** Returns the first file for each segment. Each segment must be read separately. **DataFrame Content:** Each DataFrame contains one row per channel component with the earliest file for that component in the run. This ensures all channels start from the same time. The method internally: 1. Calls to_dataframe() to inventory all files 2. Calls assign_run_names() to group files into runs 3. Selects first file(s) for each run and component 4. Returns organized structure for easy iteration See Also -------- to_dataframe : Create complete file inventory assign_run_names : Group files into runs mth5.io.phoenix.read_phoenix : Read Phoenix files """ df = self.to_dataframe( sample_rates=sample_rates, run_name_zeros=run_name_zeros, calibration_path=calibration_path, ) run_dict = OrderedDict() for station in sorted(df.station.unique()): run_dict[station] = OrderedDict() for run_id in sorted( df[df.station == station].run.unique(), key=lambda x: x[-run_name_zeros:], ): run_df = df[(df.station == station) & (df.run == run_id)] first_row_list = [] for comp in run_df.component.unique(): comp_df = run_df[run_df.component == comp] comp_df = comp_df[comp_df.start == comp_df.start.min()] first_row_list.append(comp_df) # run_dict[station][run_id] = run_df[ # run_df.start == run_df.start.min() # ] # need to get the earliest file for each component separately run_dict[station][run_id] = pd.concat(first_row_list) return run_dict