mth5.processing package

Subpackages

Submodules

mth5.processing.kernel_dataset module

Magnetotelluric kernel dataset processing module.

This module contains a class for representing a dataset that can be processed.

The module provides functionality for:

  • Managing magnetotelluric time series intervals

  • Supporting single station and remote reference processing

  • Handling run combination and time interval restrictions

  • Interfacing with MTH5 data structures

Development Notes

Players on the stage: One or more mth5s.

Each mth5 has a “run_summary” dataframe available. Run_summary provides options for the local and possibly remote reference stations. Candidates for local station are the unique values in the station column.

For any candidate station, there are some integer n runs available. This yields 2**n - 1 possible combinations that can be processed, neglecting any flagging of time intervals within any run, or any joining of runs. (There are 2**n subsets in total; the empty set is excluded, hence the -1.)

Intuition suggests the default ought to be to process n runs in n+1 configurations: {all runs} + each run individually. This will give a bulk answer, and bad runs can be flagged by comparing them. After an initial processing, the transfer functions (TFs) can be reviewed and the problematic runs can be addressed.
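The counting argument above can be illustrated with a short, library-free sketch. The function names and run labels here are hypothetical and are not part of mth5; the sketch only enumerates subsets of run labels.

```python
from itertools import combinations


def all_run_combinations(run_ids):
    """Return every non-empty subset of run_ids (2**n - 1 of them)."""
    subsets = []
    for size in range(1, len(run_ids) + 1):
        subsets.extend(combinations(run_ids, size))
    return subsets


def default_configurations(run_ids):
    """Return the n + 1 default configurations: all runs, then each run alone."""
    configs = [tuple(run_ids)]
    configs.extend((run_id,) for run_id in run_ids)
    return configs


run_ids = ["001", "002", "003"]  # hypothetical run labels
print(len(all_run_combinations(run_ids)))    # 7, i.e. 2**3 - 1
print(len(default_configurations(run_ids)))  # 4, i.e. 3 + 1
```

Note that for a station with a single run, the two default configurations coincide.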

The user can interact with the run_summary_df, selecting sub dataframes via querying, and in future maybe via some GUI (or a spreadsheet).

The intended usage process is as follows:
  1. Start with a list of mth5s

  2. Extract a run_summary

  3. Stare at the run_summary_df, and select a station “S” to process

  4. Select a non-empty set of runs for station “S”

  5. Select a remote reference “RR”, (this is allowed to be None)

  6. Extract the sub-dataframe corresponding to acquisition_runs from “S” and “RR”

  7. If the remote is not None:

  • Drop the runs (rows) associated with RR that do not intersect with S

  • Restrict start/end times of RR runs that intersect with S so overlap is complete.

  • Restrict start/end times of S runs so that they intersect with remote

8. This is now a TFKernel Dataset Definition (ish). Initialize a default processing object and pass it this df.

Examples

>>> cc = ConfigCreator()
>>> p = cc.create_from_kernel_dataset(kernel_dataset)
Optionally pass emtf_band_file=emtf_band_setup_file.

  9. Edit the Processing Config appropriately.

TODO: Consider supporting a default value for ‘channel_scale_factors’ that is None.

TODO: Might need to groupby survey & station; for now consider station_id unique.

class mth5.processing.kernel_dataset.KernelDataset(df: DataFrame | None = None, local_station_id: str = '', remote_station_id: str | None = None, **kwargs: Any)[source]

Bases: object

Magnetotelluric kernel dataset for time series processing.

This class works with mth5-derived channel_summary or run_summary dataframes that specify time series intervals. It manages acquisition “runs” that can be merged into processing runs, with support for both single station and remote reference processing configurations.

Parameters:
  • df (pd.DataFrame | None, optional) – Pre-formed dataframe with dataset configuration. Normally built from a run_summary, by default None

  • local_station_id (str, optional) – Local station identifier for the dataset. Normally passed via from_run_summary method, by default “”

  • remote_station_id (str | None, optional) – Remote reference station identifier. Normally passed via from_run_summary method, by default None

  • **kwargs (dict) – Additional keyword arguments to set as attributes

df[source]

Main dataset dataframe with time series intervals

Type:

pd.DataFrame | None

local_station_id[source]

Local station identifier

Type:

str | None

remote_station_id[source]

Remote reference station identifier

Type:

str | None

survey_metadata[source]

Survey metadata container

Type:

dict

initialized[source]

Whether MTH5 objects have been initialized

Type:

bool

local_mth5_obj[source]

Local station MTH5 object

Type:

Any | None

remote_mth5_obj[source]

Remote station MTH5 object

Type:

Any | None

Notes

The class is closely related to (may actually be an extension of) RunSummary. The main idea is to specify one or two stations, and a list of acquisition “runs” that can be merged into a “processing run”. Each acquisition run can be further divided into non-overlapping chunks by specifying time-intervals associated with that acquisition run.

The time intervals can be used for several purposes but primarily:

  • STFT processing for merged FC data structures

  • Binding together into xarray time series for gap filling

  • Managing and analyzing availability of reference time series

Examples

Create a kernel dataset from run summary:

>>> from mth5.processing.run_summary import RunSummary
>>> run_summary = RunSummary()
>>> dataset = KernelDataset()
>>> dataset.from_run_summary(run_summary, "station01", "station02")

Process single station data:

>>> single_dataset = KernelDataset()
>>> single_dataset.from_run_summary(run_summary, "station01")

See also

RunSummary

Data summary for processing configuration

add_columns_for_processing() None[source]

Add columns to the dataframe used during processing.

Development Notes:

  • This was originally in pipelines.

  • Q: Should mth5_objs be keyed by survey-station?

  • A: Yes, and … since the KernelDataset dataframe will be iterated over, should probably write an iterator method. This can iterate over survey-station tuples for multiple station processing.

  • Currently the model of keeping all these data objects “live” in the df seems to work OK, but is not well suited to HPC or lazy processing.

Parameters:

mth5_objs (dict) – Keys are station_id, values are MTH5 objects.

clone() KernelDataset[source]

Create a deep copy of the dataset.

Returns:

Deep copy of this instance

Return type:

KernelDataset

clone_dataframe() DataFrame | None[source]

Create a deep copy of the dataframe.

Returns:

Deep copy of the dataframe, or None if dataframe is not set

Return type:

pd.DataFrame | None

close_mth5s() None[source]

Loop over all unique mth5_objs in dataset df and make sure they are closed.

property df: DataFrame | None[source]

Main dataset dataframe.

Returns:

Dataset dataframe with time series intervals, or None if not set

Return type:

pd.DataFrame | None

drop_runs_shorter_than(minimum_duration: float, units: str = 's', inplace: bool = True) DataFrame | None[source]

Drop runs from dataframe that are shorter than minimum duration.

Parameters:
  • minimum_duration (float) – The minimum allowed duration for a run (in units of units)

  • units (str, optional) – Time units, by default “s”. Currently only seconds are supported

  • inplace (bool, optional) – Whether to modify dataframe in place, by default True

Returns:

Modified dataframe if inplace=False, None if inplace=True

Return type:

pd.DataFrame | None

Raises:

NotImplementedError – If units other than seconds are specified

Notes

This method needs to have duration refreshed beforehand.
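As a rough illustration of the filtering idea, here is a sketch that works on plain dictionaries instead of a dataframe. The row keys ("run", "start", "end") and the helper name are assumptions for illustration only, and keeping runs that sit exactly at the threshold is also an assumption. The sketch recomputes each duration from start and end, which sidesteps the stale-duration caveat noted above.

```python
from datetime import datetime


def drop_short_runs(rows, minimum_duration):
    """Keep only rows lasting at least minimum_duration seconds."""
    kept = []
    for row in rows:
        # Recompute the duration from the endpoints rather than trusting a cached value.
        duration = (row["end"] - row["start"]).total_seconds()
        if duration >= minimum_duration:  # assumption: threshold-length runs are kept
            kept.append(row)
    return kept


rows = [
    {"run": "001", "start": datetime(2020, 1, 1, 0, 0, 0), "end": datetime(2020, 1, 1, 0, 0, 30)},
    {"run": "002", "start": datetime(2020, 1, 1, 1, 0, 0), "end": datetime(2020, 1, 1, 1, 10, 0)},
]
print([row["run"] for row in drop_short_runs(rows, 60.0)])  # ['002']
```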

from_run_summary(run_summary: RunSummary, local_station_id: str | None = None, remote_station_id: str | None = None, sample_rate: float | int | None = None) None[source]

Initialize the dataframe from a run summary.

Parameters:
  • run_summary (RunSummary) – Summary of available data for processing from one or more stations

  • local_station_id (str | None, optional) – Label of the station for which an estimate will be computed, by default None

  • remote_station_id (str | None, optional) – Label of the remote reference station, by default None

  • sample_rate (float | int | None, optional) – Sample rate to filter data by, by default None

Raises:

ValueError – If restricting to specified stations yields empty dataset or if local and remote stations do not overlap for remote reference

get_metadata_from_df(df: DataFrame) Survey[source]

Extract metadata from the dataframe. The dataframe should only include one station, so use self.local_df or self.remote_df. (Run Summary)

Parameters:

df (pd.DataFrame) – Dataframe to extract metadata from

Returns:

Dictionary containing survey metadata

Return type:

dict[str, Any]

get_run_object(index_or_row: int | Series) Run[source]

Get the run object associated with a row of the dataframe.

Parameters:

index_or_row (int | pd.Series) – Row index or row Series from the dataframe

Returns:

The run object associated with the row

Return type:

mt_metadata.timeseries.Run

Notes

This method may be deprecated in favor of direct calls to run_obj = row.mth5_obj.from_reference(row.run_hdf5_reference) in pipelines.

get_station_metadata(local_station_id: str) Station[source]

Returns the station metadata.

Development Notes: TODO: This appears to be unused. Was probably a precursor to the update_survey_metadata() method. Delete if unused. If used, fill out doc: “Helper function for archiving the TF – returns an object we can use to populate station metadata in the _____”

Parameters:

local_station_id (str) – The name of the local station.

Return type:

mt_metadata.timeseries.Station

has_local_mth5() bool[source]

Check if local MTH5 file exists.

Returns:

True if local MTH5 file exists on filesystem

Return type:

bool

has_remote_mth5() bool[source]

Test if remote mth5 exists.

initialize_dataframe_for_processing() None[source]

Adds extra columns needed for processing to the dataframe.

Populates them with mth5 objects, run_hdf5_reference, and xr.Datasets.

Development Notes:

Note #1: When assigning xarrays to dataframe cells, df dislikes xr.Dataset, so we convert to xr.DataArray before packing df.

Note #2: [OPTIMIZATION] By accessing the run_ts and packing the “run_dataarray” column of the df, we perform a non-lazy operation, essentially forcing the entire decimation_level=0 dataset to be loaded into memory. Seeking a lazy method to handle this may be worthwhile. For example, using a df.apply() approach to initialize only one row at a time would allow us to generate the FCs one row at a time and never ingest more than one run of data at a time …

Note #3: Uncommenting the continue statement here is desirable and will speed things up, but is not yet tested. A nice test would be to have two stations, some runs having FCs built and others not having FCs built. What goes wrong is in update_survey_metadata. Need a way to get the survey metadata from a run, not a run_ts, if possible.

initialize_mth5s(mode: str = 'r')[source]

Return a dictionary of open mth5 objects, keyed by station_id.

Parameters:

mode (str, optional) – File opening mode, by default “r” (read-only)

Returns:

Dictionary keyed by station IDs containing MTH5 objects:

  • local station id: mth5.mth5.MTH5

  • remote station id: mth5.mth5.MTH5 (if present)

Return type:

dict

Notes

Future versions for multiple station processing may need nested dict structure with [survey_id][station].

property input_channels: list[str][source]

Get input channels from dataframe.

Returns:

Input channel identifiers (sources)

Return type:

list[str]

Raises:

AttributeError – If dataframe is not available or local_df has no input_channels

property is_single_station: bool[source]

Returns True if no RR station.

property local_df: DataFrame | None[source]

Get dataframe subset for local station runs.

Returns:

Local station runs data, or None if dataframe not available

Return type:

pd.DataFrame | None

property local_mth5_path: Path | None[source]

Local station MTH5 file path.

Returns:

Path to local station MTH5 file, extracted from dataframe or stored path, or None if not available

Return type:

Path | None

property local_station_id: str | None[source]

Local station identifier.

Returns:

Local station identifier

Return type:

str | None

property local_survey_id: str[source]

Return string label for local survey id.

Returns:

Survey ID for the local station

Return type:

str

property local_survey_metadata: Survey[source]

Return survey metadata for local station.

property mini_summary: DataFrame[source]

Return a dataframe that fits in terminal display.

Returns:

Subset of the main dataframe with key columns for summary display

Return type:

pd.DataFrame

property mth5_objs[source]

Mth5 objs.

Returns:

Dictionary [station_id: mth5_obj]

Return type:

dict

property num_sample_rates: int[source]

Returns the number of unique sample rates in the dataframe.

property output_channels: list[str][source]

Get output channels from dataframe.

Returns:

Output channel identifiers

Return type:

list[str]

Raises:

AttributeError – If dataframe is not available or local_df has no output_channels

property processing_id: str[source]

It's difficult to come up with unique ids without crazy long names, so this is a generic id of local-remote; the station metadata will have run information and the config parameters.

property remote_channels: list[str][source]

Get remote reference channels from dataframe.

Returns:

Remote reference channel identifiers

Return type:

list[str]

Raises:

AttributeError – If dataframe is not available or remote_df has no remote_channels

property remote_df: DataFrame | None[source]

Get dataframe subset for remote station runs.

Returns:

Remote station runs data, or None if dataframe not available or no remote station configured

Return type:

pd.DataFrame | None

property remote_mth5_path: Path[source]

Remote mth5 path.

Returns:

Remote station MTH5 path, a property extracted from the dataframe

Return type:

Path

property remote_station_id: str | None[source]

Remote reference station identifier.

Returns:

Remote station identifier

Return type:

str | None

restrict_run_intervals_to_simultaneous(df: DataFrame) None[source]

For each run in local_station_id, check if it has overlap with other runs.

There is room for optimization here.

Note that you can wind up splitting runs here. For example, in the case where local is running continuously but remote is intermittent, the local run may break into several chunks.

Return type:

None
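The run-splitting behaviour described here can be sketched without pandas. This is an illustrative sketch only, not the library's implementation: runs are represented as hypothetical (run_id, start, end) tuples, times are plain numbers, and discarding zero-length overlaps is an assumption.

```python
def restrict_to_simultaneous(local_runs, remote_runs):
    """Clip every local run against every remote run and keep the overlaps.

    Each run is a (run_id, start, end) tuple. Returns a list of
    (local_run_id, remote_run_id, start, end) tuples.
    """
    chunks = []
    for local_id, l_start, l_end in local_runs:
        for remote_id, r_start, r_end in remote_runs:
            start = max(l_start, r_start)
            end = min(l_end, r_end)
            if start < end:  # assumption: zero-length overlaps are discarded
                chunks.append((local_id, remote_id, start, end))
    return chunks


# One continuous local run, two intermittent remote runs (times in seconds).
local_runs = [("L1", 0, 100)]
remote_runs = [("R1", 10, 30), ("R2", 60, 120)]
print(restrict_to_simultaneous(local_runs, remote_runs))
# [('L1', 'R1', 10, 30), ('L1', 'R2', 60, 100)]
```

Here the single local run L1 breaks into two chunks, one per remote run it intersects.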

property sample_rate: float[source]

Returns the sample rate of the data in the dataframe.

select_station_runs(station_runs_dict: dict, keep_or_drop: bool, inplace: bool = True) DataFrame | None[source]

Partition dataframe based on station_runs_dict and return one partition.

Parameters:
  • station_runs_dict (dict) – Keys are string IDs of stations to keep/drop. Values are lists of string labels for run_ids to keep/drop. Example: {“mt01”: [“0001”, “0003”]}

  • keep_or_drop (bool) – If True, returns df with only the station-runs specified. If False, returns df with station_runs_dict entries removed

  • inplace (bool, optional) – If True, modifies dataframe in place, by default True

Returns:

Modified dataframe if inplace=False, None if inplace=True

Return type:

pd.DataFrame | None
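A minimal sketch of the keep/drop partition, written against a list of dictionaries, not a dataframe. The row keys "station" and "run" and the helper name are assumptions made for illustration; the boolean keep_or_drop follows the parameter description above.

```python
def select_station_runs_sketch(rows, station_runs_dict, keep_or_drop):
    """Return the rows named in station_runs_dict (True) or all other rows (False)."""

    def is_listed(row):
        # A row is listed if its run id appears under its station id.
        return row["run"] in station_runs_dict.get(row["station"], [])

    if keep_or_drop:
        return [row for row in rows if is_listed(row)]
    return [row for row in rows if not is_listed(row)]


rows = [
    {"station": "mt01", "run": "0001"},
    {"station": "mt01", "run": "0002"},
    {"station": "mt01", "run": "0003"},
    {"station": "mt02", "run": "0001"},
]
selection = {"mt01": ["0001", "0003"]}
print(select_station_runs_sketch(rows, selection, True))
print(select_station_runs_sketch(rows, selection, False))
```

The first call keeps the two listed mt01 runs; the second returns the remaining mt01 run together with the mt02 run, so the two results partition the input.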

classmethod set_path(value: str | Path | None) Path | None[source]

Set and validate a file path.

Parameters:

value (str | Path | None) – Path value to set and validate

Returns:

Validated Path object, or None if input is None

Return type:

Path | None

Raises:
  • IOError – If path does not exist on filesystem

  • ValueError – If value cannot be converted to Path

set_run_times(run_time_dict: dict, inplace: bool = True)[source]

Set run times from a dictionary.

Parameters:
  • run_time_dict (dict) – Dictionary formatted as {run_id: {start, end}}

  • inplace (bool, optional) – Whether to modify dataframe in place, by default True

Returns:

Modified dataframe if inplace=False, None if inplace=True

Return type:

pd.DataFrame | None

update_survey_metadata(i: int, row: Series, run_ts: RunTS) None[source]

Wrangle survey_metadata into kernel_dataset.

Development Notes:

  • The survey metadata needs to be passed to TF before exporting data.

  • This was factored out of initialize_dataframe_for_processing.

  • TODO: It looks like we don’t need to pass the whole run_ts, just its metadata. There may be some performance implications to passing the whole object. Consider passing run_ts.survey_metadata, run_ts.run_metadata, run_ts.station_metadata only.

Parameters:
  • i (int) – This would be the index of row, if we were sure that the dataframe was cleanly indexed.

  • row (pd.Series)

  • run_ts (mth5.timeseries.run_ts.RunTS) – Mth5 object having the survey_metadata.

Return type:

None

mth5.processing.kernel_dataset.intervals_overlap(start1: Timestamp, end1: Timestamp, start2: Timestamp, end2: Timestamp) bool[source]

Checks if intervals 1 and 2 overlap.

Interval 1 is (start1, end1), Interval 2 is (start2, end2).

Development Notes: This may work vectorized out of the box but has not been tested. Also, it is intended to work with pd.Timestamp objects, but should work for many objects that have an ordering associated. This website was used as a reference when writing the method: https://stackoverflow.com/questions/3721249/python-date-interval-intersection

Parameters:
  • start1 (pd.Timestamp) – Start of interval 1.

  • end1 (pd.Timestamp) – End of interval 1.

  • start2 (pd.Timestamp) – Start of interval 2.

  • end2 (pd.Timestamp) – End of interval 2.

Returns:

cond – True if the intervals overlap, False if they do not.

Return type:

bool
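One common way to express such a test is shown below as a sketch; it is not necessarily how the library implements it. The function name is hypothetical, and treating the intervals as closed (so that touching endpoints count as overlap) is an assumption. Because only ordering comparisons are used, it works with datetime objects as well as numbers.

```python
from datetime import datetime


def intervals_overlap_sketch(start1, end1, start2, end2):
    """Return True if the closed intervals [start1, end1] and [start2, end2] intersect."""
    # Two intervals intersect exactly when each one starts before the other ends.
    return start1 <= end2 and start2 <= end1


a = (datetime(2020, 1, 1, 0), datetime(2020, 1, 1, 6))
b = (datetime(2020, 1, 1, 4), datetime(2020, 1, 1, 10))
c = (datetime(2020, 1, 1, 8), datetime(2020, 1, 1, 12))
print(intervals_overlap_sketch(*a, *b))  # True
print(intervals_overlap_sketch(*a, *c))  # False
```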

mth5.processing.kernel_dataset.overlap(t1_start: Timestamp, t1_end: Timestamp, t2_start: Timestamp, t2_end: Timestamp) tuple[source]

Get the start and end times of the overlap between two intervals.

Interval 1 is (t1_start, t1_end), Interval 2 is (t2_start, t2_end).

Development Notes:

  • Possibly some nicer syntax in this discussion: https://stackoverflow.com/questions/3721249/python-date-interval-intersection

  • Intended to work with pd.Timestamp objects, but should work for many objects that have an ordering associated.

Parameters:
  • t1_start (pd.Timestamp) – The start of interval 1.

  • t1_end (pd.Timestamp) – The end of interval 1.

  • t2_start (pd.Timestamp) – The start of interval 2.

  • t2_end (pd.Timestamp) – The end of interval 2.

Returns:

start, end – Either the same type as the inputs, or None, None.

Return type:

tuple
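The intersection of two intervals can be written compactly with max and min. The sketch below uses that formulation under a hypothetical name; it is not claimed to match the library's own branching, and returning a zero-length interval when the endpoints merely touch is an assumption.

```python
def overlap_sketch(t1_start, t1_end, t2_start, t2_end):
    """Return (start, end) of the intersection, or (None, None) if there is none."""
    start = max(t1_start, t2_start)  # the later of the two starts
    end = min(t1_end, t2_end)        # the earlier of the two ends
    if start <= end:
        return start, end
    return None, None


print(overlap_sketch(0, 10, 4, 20))  # (4, 10)
print(overlap_sketch(0, 3, 5, 8))    # (None, None)
```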

mth5.processing.kernel_dataset.restrict_to_station_list(df: DataFrame, station_ids: str | list[str], inplace: bool = True) DataFrame[source]

Drop all rows where station_ids are NOT in the provided list.

Operates on a deepcopy of dataframe if inplace=False.

Parameters:
  • df (pd.DataFrame) – A run summary dataframe

  • station_ids (str | list[str]) – Station ids to keep, normally local and remote

  • inplace (bool, optional) – If True, modifies dataframe in place, by default True

Returns:

Filtered dataframe with only specified stations

Return type:

pd.DataFrame

mth5.processing.run_summary module

This module contains the RunSummary class.

This is a helper class that summarizes the Runs in an mth5.

TODO: This class and methods could be replaced by methods in MTH5.

Functionality of RunSummary():

  1. User can get a list of local_station options, which correspond to unique pairs of values: (survey, station)

  2. User can see all possible ways of processing the data: one list per (survey, station) pair in the run_summary

Some of the following functionalities may end up in KernelDataset:

  3. User can select local_station; this can trigger a reduction of runs to only those that are from the local station and simultaneous runs at other stations

  4. Given a local station, a list of possible reference stations can be generated

  5. Given a remote reference station, a list of all relevant runs, truncated to maximize coverage of the local station runs, is generated

  6. Given such a “restricted run list”, runs can be dropped

  7. Time interval endpoints can be changed

Development Notes:
TODO: consider adding methods:
  • “drop_runs_shorter_than”: removes short runs from summary

  • “fill_gaps_by_time_interval”: allows runs to be merged if gaps between are short

  • “fill_gaps_by_run_names”: allows runs to be merged if gaps between are short

TODO: Consider whether this should return a copy or modify in-place when querying the df.

class mth5.processing.run_summary.RunSummary(input_dict: dict | None = None, df: DataFrame | None = None)[source]

Bases: object

Class to contain a run-summary table from one or more mth5s.

WIP: For the full MMT case this may need modification to a channel based summary.

clone()[source]

2022-10-20: Cloning may be causing issues with extra instances of open h5 files …

property df: DataFrame[source]

Run-summary dataframe.

drop_no_data_rows() bool[source]

Drops rows marked has_data = False and resets the index of self.df.

from_mth5s(mth5_list) list[source]

Iterates over mth5s in list and creates one big dataframe summarizing the runs

property mini_summary: DataFrame[source]

Shows the dataframe with only a few columns for readability.

property print_mini_summary: str[source]

Calls mini_summary through the logger so it is formatted.

set_sample_rate(sample_rate: float, inplace: bool = False)[source]

Set the sample rate so that the run summary represents all runs for a single sample rate.

Parameters:
  • sample_rate (float)

  • inplace (bool, optional) – DESCRIPTION. By default, False.

Returns:

DESCRIPTION.

Return type:

TYPE

mth5.processing.run_summary.extract_run_summaries_from_mth5s(mth5_list, summary_type='run', deduplicate=True)[source]

Given a list of mth5’s, iterate over them, extracting run_summaries and merging into one big table.

Development Notes:

  • ToDo: Move this method into mth5? or mth5_helpers?

  • ToDo: Make this a class so that the __repr__ is a nice visual representation of the df, like what channel summary does in mth5.

  • 2022-05-28: Modified to allow this method to accept mth5 objects as well as the already supported types of pathlib.Path or str.

  • In order to drop duplicates I used the solution here: https://stackoverflow.com/questions/43855462/pandas-drop-duplicates-method-not-working-on-dataframe-containing-lists

Parameters:
  • mth5_list (list) – Paths or strings that point to mth5s (documented in the docstring as mth5_paths).

  • summary_type (string, optional) – One of [“channel”, “run”]. “channel” returns the concatenated channel summary, “run” returns the concatenated run summary. By default, “run”.

  • deduplicate (bool, optional) – By default, True.

Returns:

super_summary – Given a list of mth5s, a dataframe of all available runs.

Return type:

pd.DataFrame

Module contents