Source code for mth5.utils.extract_subset_mth5
import pathlib
import pandas
from loguru import logger
from mth5.data.make_mth5_from_asc import _add_survey
from mth5.mth5 import MTH5
from mth5.timeseries import ChannelTS, RunTS
from mth5.utils.helpers import add_filters, station_in_mth5, survey_in_mth5
[docs]
def extract_subset(
    source_file: pathlib.Path,
    target_file: pathlib.Path,
    subset_df: pandas.DataFrame,
    filters: str = "all",
):
    """
    Copy the runs described by ``subset_df`` from one MTH5 file into another.

    This function is a proof-of-concept of issue 219: exporting a subset

    The rows of ``subset_df`` are grouped by (survey, station, run).  For each
    group the survey, its filters, the station and the run are created in the
    target file as needed, and the run's time series are written.  Both files
    are closed before returning, including when an error is raised part-way
    through.

    TODO: add check that subset_df is a subset of source_file
    TODO: add tests for source/target v0.1.0
    TODO: add tests for source/target v0.2.0
    TODO: Consider add tests for source v0.1.0/target v0.2.0
    TODO: Consider add tests for source v0.2.0/target v0.1.0

    :param source_file: Where the data will be extracted from
    :param target_file: Where the data will be exported to.  It is opened with
     the same file_version as the source.
    :param subset_df: description of the data to extract.  Must have the
     columns "survey", "station", "run" and "component".
    :param filters: whether to bring all the filters or only those that are
     needed to describe the data.  Right now only "all" (case-insensitive) is
     supported; any other value logs a warning and no filters are copied
     explicitly.
     TODO: support "required_only" filters, meaning that we only bring the
     filters from the selected channels.
    :return: None
    """
    groupby = ["survey", "station", "run"]

    copy_all_filters = filters.lower() == "all"
    if not copy_all_filters:
        logger.warning(
            f"filters='{filters}' is not supported yet -- "
            "no filters will be copied explicitly"
        )

    m_source = MTH5(source_file)
    m_source.open_mth5()
    try:
        m_target = MTH5(target_file, file_version=m_source.file_version)
        m_target.open_mth5()
        try:
            logger.info(f"Testing file_version {m_source.file_version}")
            # NOTE(review): pandas groupby drops rows whose key is null by
            # default -- confirm how a null survey id should be handled.
            for (survey_id, station_id, run_id), run_df in subset_df.groupby(
                groupby
            ):
                survey = m_source.get_survey(survey_id)
                # TODO: The following assert is a nice-to-have, but is not
                #  robust to case survey_id is None
                # assert survey.metadata.id == survey_id

                # Check if survey already in mth5, don't add again (its
                # cleaner but won't actually matter in results)
                if not survey_in_mth5(m_target, survey.metadata.id):
                    logger.info(f"Survey {survey_id} not in mth5 -- Adding")
                    # could be done using mth5, but need to handle 0.1.0, 0.2.0
                    _add_survey(m_target, survey.metadata)
                else:
                    logger.info(f"Survey {survey_id} already in target mth5")

                # Add filters.  Start from an empty list on every iteration so
                # the name is always bound and never carries over from a
                # previous run.
                # TODO: make this only get the filters from the relevant channels
                filters_to_add = []
                if copy_all_filters:
                    filters_to_add = _get_list_of_filters_to_add_to_target_mth5(
                        m_source, m_target, survey_id=survey_id
                    )
                if filters_to_add:
                    add_filters(m_target, filters_to_add, survey_id=survey_id)

                source_station_obj = m_source.get_station(station_id, survey_id)
                if not station_in_mth5(m_target, station_id, survey_id):
                    logger.info(f"Need to make station {station_id}")
                    target_station_obj = m_target.add_station(
                        station_id,
                        station_metadata=source_station_obj.metadata,
                        survey=survey_id,
                    )
                else:
                    logger.info(f"station {station_id} already in target mth5")
                    target_station_obj = m_target.get_station(
                        station_id, survey=survey_id
                    )

                source_run_obj = m_source.get_run(
                    station_id, run_id, survey=survey_id
                )
                logger.info(f"source_run_obj: {source_run_obj}")

                target_channels = run_df.component.to_list()
                source_channels = (
                    source_run_obj.channel_summary.component.to_list()
                )
                if set(source_channels) == set(target_channels):
                    logger.info(
                        "channels in source and target are same -- just map whole RunTS "
                    )
                    target_runts = source_run_obj.to_runts()
                else:
                    msg = "there are a lot of edge cases to worry about here -- Help Wanted"
                    logger.info(msg)
                    # raise NotImplementedError(msg)
                    target_runts = _build_subset_runts(
                        source_run_obj, target_channels
                    )

                # TODO:
                # try:
                #     target_run_group = target_station_obj.get_run(run_id)
                # except MTH5Error:
                #     target_run_group = target_station_obj.add_run(run_id)
                target_run_group = target_station_obj.add_run(run_id)
                target_run_group.from_runts(target_runts)
        finally:
            m_target.close_mth5()
    finally:
        m_source.close_mth5()


def _build_subset_runts(source_run_obj, components):
    """
    Build a RunTS holding only the requested components of a source run.

    :param source_run_obj: run group in the source file to read channels from
    :param components: names of the channels to copy
    :return: RunTS made of the copied channels, with its run id set to the
     id of the source run
    """
    ch_list = []
    for comp in components:
        source_chts = source_run_obj.get_channel(comp).to_channel_ts()
        target_chts_metadata = source_chts.channel_metadata.copy()
        ch_list.append(
            ChannelTS(
                channel_type=target_chts_metadata.type,
                data=source_chts.data_array.data,
                channel_metadata=target_chts_metadata.to_dict(),
            )
        )
    subset_runts = RunTS(array_list=ch_list)
    subset_runts.run_metadata.id = source_run_obj.metadata.id
    return subset_runts
def _get_list_of_filters_to_add_to_target_mth5(m_source, m_target, survey_id=None):
    """
    Collect the filters that exist in the source file but not in the target.

    For file version 0.1.0 the filters are read from the file-level
    ``filters_group``; for 0.2.0 they are read from the ``filters_group`` of
    the survey ``survey_id``.  In the 0.2.0 case ``m_target`` must already
    have the survey group, because it is fetched with ``get_survey``.

    Parameters
    ----------
    m_source
        Open source file; its ``file_version`` selects where filters live.
    m_target
        Open target file that the filters will later be added to.
    survey_id
        Survey to read filters from.  Only used for file version 0.2.0.

    Returns
    -------
    list
        One filter object, as produced by ``to_filter_object``, for every
        filter name in the source that is missing from the target.  Empty
        when nothing is missing or when the source file version is neither
        "0.1.0" nor "0.2.0".
    """
    if m_source.file_version == "0.1.0":
        source_filters = m_source.filters_group
        target_filters = m_target.filters_group
    elif m_source.file_version == "0.2.0":
        source_filters = m_source.get_survey(survey_id).filters_group
        target_filters = m_target.get_survey(survey_id).filters_group
    else:
        return []

    # Read the target's filter names once, rather than once per candidate.
    existing_names = set(target_filters.filter_dict)
    return [
        source_filters.to_filter_object(filter_name)
        for filter_name in source_filters.filter_dict
        if filter_name not in existing_names
    ]