Creating one mth5 file per station in parallel
In this tutorial, we will demonstrate how to generate one mth5 file per station (with associated mt_metadata) from a survey in parallel on HPC.
For this example, we will be converting Earth Data Logger (ASCII) time series data from 93 stations of the AusLAMP Musgraves Province survey (see https://dx.doi.org/10.25914/5eaa30d63bd17). The ASCII time series were concatenated per run for each station and the total volume of time series data was 106 GB. 90 of the 93 stations were a single run - stations SA246, SA299 and SA324-2 had multiple runs.
This example was tested on the National Computational Infrastructure’s Gadi HPC system. Gadi is Australia’s most powerful supercomputer, a highly parallel cluster comprising more than 200,000 processor cores on ten different types of compute nodes. The example also makes use of the NCI-geophysics 2022.06 module which contains the Python libraries used in this tutorial.
Building our mth5_onefileperstation.py code
In the mth5_in_parallel tutorial, we built a single mth5 file containing all stations in our survey. In that example, we had to construct two codes: 1. mth5_skeleton.py which created the mth5 file structure without adding in any time series data. This code could not be run in parallel as the processes involved were collective (see Collective versus independent operations
in
https://docs.h5py.org/en/stable/mpi.html). 2. mth5_muscle.py which added in the timeseries data for each station in parallel (i.e. one station per node).
For this example, we will be creating a single file per station with associated mt_metadata. As a result, we only need to create one code as the generation of each file is independent (i.e., the different mpi ranks do not need to communicate with each other). The authors use mpi4py for this tutorial, but note that other libraries such as Dask or multiprocessing could also be used.
Our mth5_onefileperstation.py
code requires the following Python libraries:
[ ]:
from mpi4py import MPI
import h5py
from mth5.mth5 import MTH5
import numpy as np
from os import path
import os, psutil
import glob
import nc_time_axis
import time
from mt_metadata import timeseries as metadata
from mt_metadata.utils.mttime import MTime
import json
startTime = time.time()
As we are using mpi4py, we will need to define the MPI communicator, rank and size:
[ ]:
comm = MPI.COMM_WORLD
rank = MPI.COMM_WORLD.rank
size = MPI.COMM_WORLD.size
Next, we will define our: 1. working directories and file paths 2. Earth Data Logger channels 3. survey stations 4. survey name and run number (for stations with a single run)
[ ]:
### define working directories and file paths
work_dir = '/g/data/.../.../merged_data_all'
mth5_output_directory = '/g/data/.../.../mth5_outdir_single_file_per_station'
full_path_to_mth5_files = sorted(glob.glob(mth5_output_directory+"/*"))
full_path_to_ascii_files = sorted(glob.glob(work_dir+"/*"))
mt_metadata_dir = '/g/data/.../.../mt_metadata_json'
full_path_to_mt_metadata = sorted(glob.glob(mt_metadata_dir+"/*"))
survey_file_name = 'survey.json'
survey_json = mt_metadata_dir+'/'+survey_file_name
### define raw time series data channels
raw_data_channels = ['EX','EY','BX','BY','BZ','TP','ambientTemperature']
### define the stations in our survey
stations_all = ['SA225-2','SA227', 'SA242', 'SA243', 'SA245',
'SA247', 'SA248', 'SA249', 'SA250', 'SA251',
'SA252', 'SA26W-2', 'SA270', 'SA271', 'SA272',
'SA273', 'SA274-2', 'SA274', 'SA275', 'SA276',
'SA277', 'SA293-2', 'SA294', 'SA295', 'SA296',
'SA297', 'SA298', 'SA300', 'SA301', 'SA319',
'SA320', 'SA320-2', 'SA321', 'SA322', 'SA323',
'SA324', 'SA325-2', 'SA325', 'SA326N', 'SA326S',
'SA344', 'SA344-2', 'SA345', 'SA346', 'SA347',
'SA348', 'SA349', 'SA350', 'SA351', ### 49 single run SA stations
'WA10', 'WA13', 'WA14', 'WA15', 'WA26',
'WA27', 'WA29', 'WA30', 'WA31', 'WA42',
'WA43', 'WA44', 'WA45', 'WA46', 'WA47',
'WA54', 'WA55', 'WA56', 'WA57', 'WA58',
'WA60', 'WA61', 'WA62', 'WA63', 'WA64',
'WA65', 'WA66', 'WA67', 'WA68', 'WA69',
'WA70', 'WA71', 'WA72', 'WA73', 'WA74',
'WA75', 'WANT19', 'WANT38', 'WANT45', 'WASA302',
'WASA327', ### 41 single run WA stations
'SA246', 'SA299', 'SA324-2'] ### 3 stations with multiple runs
### define the survey name and run number (for stations with a single run)
survey_name = "AusLAMP_Musgraves"
run_number = "001"
We will also be adding mt_metadata into our mth5 files. For this example, a single json file was created per run and the contents of each json file looked something like this:
{
"electric_ex": {
"channel_number": 0,
"component": null,
"data_quality.rating.value": 0,
"dipole_length": null,
"filter.applied": [
false
],
"filter.name": [],
"measurement_azimuth": 0.0,
"measurement_tilt": 0.0,
"negative.elevation": 0.0,
"negative.id": null,
"negative.latitude": 0.0,
"negative.longitude": 0.0,
"negative.manufacturer": null,
"negative.type": null,
"positive.elevation": 0.0,
"positive.id": null,
"positive.latitude": 0.0,
"positive.longitude": 0.0,
"positive.manufacturer": null,
"positive.type": null,
"sample_rate": 10.0,
"time_period.end": "1980-01-01T00:00:00+00:00",
"time_period.start": "1980-01-01T00:00:00+00:00",
"type": "electric",
"units": null
},
"electric_ey": {
"channel_number": 0,
"component": null,
"data_quality.rating.value": 0,
"dipole_length": null,
"filter.applied": [
false
],
"filter.name": [],
"measurement_azimuth": 0.0,
"measurement_tilt": 0.0,
"negative.elevation": 0.0,
"negative.id": null,
"negative.latitude": 0.0,
"negative.longitude": 0.0,
"negative.manufacturer": null,
"negative.type": null,
"positive.elevation": 0.0,
"positive.id": null,
"positive.latitude": 0.0,
"positive.longitude": 0.0,
"positive.manufacturer": null,
"positive.type": null,
"sample_rate": 10.0,
"time_period.end": "1980-01-01T00:00:00+00:00",
"time_period.start": "1980-01-01T00:00:00+00:00",
"type": "electric",
"units": null
},
"magnetic_bx": {
"channel_number": 0,
"component": null,
"data_quality.rating.value": 0,
"filter.applied": [
false
],
"filter.name": [],
"location.elevation": 0.0,
"location.latitude": 0.0,
"location.longitude": 0.0,
"measurement_azimuth": 0.0,
"measurement_tilt": 0.0,
"sample_rate": 10.0,
"sensor.id": null,
"sensor.manufacturer": null,
"sensor.type": null,
"time_period.end": "1980-01-01T00:00:00+00:00",
"time_period.start": "1980-01-01T00:00:00+00:00",
"type": "magnetic",
"units": null
},
"magnetic_by": {
"channel_number": 0,
"component": null,
"data_quality.rating.value": 0,
"filter.applied": [
false
],
"filter.name": [],
"location.elevation": 0.0,
"location.latitude": 0.0,
"location.longitude": 0.0,
"measurement_azimuth": 0.0,
"measurement_tilt": 0.0,
"sample_rate": 10.0,
"sensor.id": null,
"sensor.manufacturer": null,
"sensor.type": null,
"time_period.end": "1980-01-01T00:00:00+00:00",
"time_period.start": "1980-01-01T00:00:00+00:00",
"type": "magnetic",
"units": null
},
"magnetic_bz": {
"channel_number": 0,
"component": null,
"data_quality.rating.value": 0,
"filter.applied": [
false
],
"filter.name": [],
"location.elevation": 0.0,
"location.latitude": 0.0,
"location.longitude": 0.0,
"measurement_azimuth": 0.0,
"measurement_tilt": 0.0,
"sample_rate": 10.0,
"sensor.id": null,
"sensor.manufacturer": null,
"sensor.type": null,
"time_period.end": "1980-01-01T00:00:00+00:00",
"time_period.start": "1980-01-01T00:00:00+00:00",
"type": "magnetic",
"units": null
},
"run": {
"channels_recorded_auxiliary": [],
"channels_recorded_electric": [],
"channels_recorded_magnetic": [],
"data_logger.firmware.author": null,
"data_logger.firmware.name": null,
"data_logger.firmware.version": null,
"data_logger.id": null,
"data_logger.manufacturer": null,
"data_logger.timing_system.drift": 0.0,
"data_logger.timing_system.type": "GPS",
"data_logger.timing_system.uncertainty": 0.0,
"data_logger.type": null,
"data_type": "LPMT",
"id": null,
"sample_rate": 10.0,
"time_period.end": "1980-01-01T00:00:00+00:00",
"time_period.start": "1980-01-01T00:00:00+00:00"
},
"station": {
"acquired_by.name": null,
"channels_recorded": [],
"data_type": "LPMT",
"geographic_name": null,
"id": null,
"location.declination.model": "WMM",
"location.declination.value": 0.0,
"location.elevation": 0.0,
"location.latitude": 0.0,
"location.longitude": 0.0,
"orientation.method": null,
"orientation.reference_frame": "geographic",
"provenance.creation_time": "1980-01-01T00:00:00+00:00",
"provenance.software.author": "none",
"provenance.software.name": null,
"provenance.software.version": null,
"provenance.submitter.email": null,
"provenance.submitter.organization": null,
"run_list": [],
"time_period.end": "1980-01-01T00:00:00+00:00",
"time_period.start": "1980-01-01T00:00:00+00:00"
},
"survey": {
"citation_dataset.doi": null,
"citation_journal.doi": null,
"country": null,
"datum": "WGS84",
"geographic_name": null,
"id": null,
"name": null,
"northwest_corner.latitude": 0.0,
"northwest_corner.longitude": 0.0,
"project": null,
"project_lead.email": null,
"project_lead.organization": null,
"release_license": "CC-0",
"southeast_corner.latitude": 0.0,
"southeast_corner.longitude": 0.0,
"summary": null,
"time_period.end_date": "1980-01-01",
"time_period.start_date": "1980-01-01"
},
}
Note that the mt_metadata in these json files were synthetic and were mainly used to show how mt_metadata could be added into our automation.
We will now define functions that will be used to create our MTH5 files:
[ ]:
def make_mth5_dir(mth5_output_directory):
### creates the mth5 output directory if it doesn't already exist
try:
os.makedirs(mth5_output_directory)
except FileExistsError:
# directory already exists
print('directory already exists!')
pass
def remove_existing_mth5_files(mth5_files):
### removes existing mth5 files if they exists
for mth5_file in mth5_files:
if path.exists(mth5_file):
os.unlink(mth5_file)
print("INFO: Removed existing file {}".format(mth5_file))
else:
print("File does not exist")
def channel_data_extraction(channels):
### extracts electromagnetic time series data from concatenated (per run) Earth Data Logger
### ASCII time series files
EX = [file for file in channels if file.endswith('EX')]
EY = [file for file in channels if file.endswith('EY')]
BX = [file for file in channels if file.endswith('BX')]
BY = [file for file in channels if file.endswith('BY')]
BZ = [file for file in channels if file.endswith('BZ')]
with open(EX[0], 'r') as file:
EX1 = file.read().splitlines()
ex_ts = np.array(EX1).astype(np.int32)
with open(EY[0], 'r') as file:
EY1 = file.read().splitlines()
ey_ts = np.array(EY1).astype(np.int32)
with open(BX[0], 'r') as file:
BX1 = file.read().splitlines()
bx_ts = np.array(BX1).astype(np.int32)
with open(BY[0], 'r') as file:
BY1 = file.read().splitlines()
by_ts = np.array(BY1).astype(np.int32)
with open(BZ[0], 'r') as file:
BZ1 = file.read().splitlines()
bz_ts = np.array(BZ1).astype(np.int32)
return ex_ts, ey_ts, bx_ts, by_ts, bz_ts
def create_mth5_group_station_run_channel(station,mt_metadata):
### creates the mth5 gropus, stations, runs and channels for each mth5 file
### and populates mt_metadata from the relevant json file
with open(mt_metadata[0], 'r') as json_file:
json_load = json.load(json_file)
station_dict = json_load['station']
add_station = m.add_station(station, survey=survey_name)
add_station.metadata.from_dict(station_dict)
add_station.write_metadata()
channels = []
for file in full_path_to_ascii_files:
if station in file:
channels.append(file)
else:
continue
### for stations with a single run:
if len(channels) == len(raw_data_channels):
run_dict = json_load['run']
ex_dict = json_load['electric_ex']
ey_dict = json_load['electric_ey']
bx_dict = json_load['magnetic_bx']
by_dict = json_load['magnetic_by']
bz_dict = json_load['magnetic_bz']
add_run = m.add_run(station, run_number, survey=survey_name)
add_run.metadata.from_dict(run_dict)
add_run.write_metadata()
ex_ts,ey_ts,bx_ts,by_ts,bz_ts = channel_data_extraction(channels)
ex = m.add_channel(station, run_number, "ex", "electric", ex_ts, survey=survey_name)
ey = m.add_channel(station, run_number, "ey", "electric", ey_ts, survey=survey_name)
bx = m.add_channel(station, run_number, "bx", "magnetic", bx_ts, survey=survey_name)
by = m.add_channel(station, run_number, "by", "magnetic", by_ts, survey=survey_name)
bz = m.add_channel(station, run_number, "bz", "magnetic", bz_ts, survey=survey_name)
ex.metadata.from_dict(ex_dict)
ex.write_metadata()
ey.metadata.from_dict(ey_dict)
ey.write_metadata()
bx.metadata.from_dict(bx_dict)
bx.write_metadata()
by.metadata.from_dict(by_dict)
by.write_metadata()
bz.metadata.from_dict(bz_dict)
bz.write_metadata()
### for stations with multiple runs:
elif len(channels) > len(raw_data_channels):
sort_files = sorted(channels)
number_of_channels = len(raw_data_channels)
split_lists = [sort_files[x:x+number_of_channels] for x in range(0, len(sort_files), number_of_channels)]
mt_metadata_files = sorted(mt_metadata)
for i, (group,mt_meta) in enumerate(zip(split_lists,mt_metadata_files)):
mrun_number = i+1
run = "00%i" % mrun_number
with open(mt_meta, 'r') as json_file:
json_load = json.load(json_file)
run_dict = json_load['run']
ex_dict = json_load['electric_ex']
ey_dict = json_load['electric_ey']
bx_dict = json_load['magnetic_bx']
by_dict = json_load['magnetic_by']
bz_dict = json_load['magnetic_bz']
add_run = m.add_run(station, run, survey=survey_name)
add_run.metadata.from_dict(run_dict)
add_run.write_metadata()
ex_ts,ey_ts,bx_ts,by_ts,bz_ts = channel_data_extraction(channels)
ex = m.add_channel(station, run, "ex", "electric", ex_ts, survey=survey_name)
ey = m.add_channel(station, run, "ey", "electric", ey_ts, survey=survey_name)
bx = m.add_channel(station, run, "bx", "magnetic", bx_ts, survey=survey_name)
by = m.add_channel(station, run, "by", "magnetic", by_ts, survey=survey_name)
bz = m.add_channel(station, run, "bz", "magnetic", bz_ts, survey=survey_name)
ex.metadata.from_dict(ex_dict)
ex.write_metadata()
ey.metadata.from_dict(ey_dict)
ey.write_metadata()
bx.metadata.from_dict(bx_dict)
bx.write_metadata()
by.metadata.from_dict(by_dict)
by.write_metadata()
bz.metadata.from_dict(bz_dict)
bz.write_metadata()
elif len(channels) < len(raw_data_channels):
print('you are likely missing some channels')
print(station)
else:
print('something has gone wrong')
The final step is to create our mth5 files per station. For this we will be creating a single mth5 file per MPI rank for each station in our survey. As each process on each rank is independent, we can run all processes in parallel. Additionally, we can add compression to each file as we are not using the Parallel HDF5 library (which doesn’t support compression) that was used in the mth5_in_parallel tutorial. For this example, we will use “gzip” level 4 compression.
[ ]:
### create mth5 output directory and remove existing files
if rank==0:
make_mth5_dir(mth5_output_directory)
remove_existing_mth5_files(full_path_to_mth5_files)
comm.Barrier()
### create a single mth5 file per station with compression and associated mt_metadata
for i,station in enumerate(sorted(stations_all)):
if i%size!=rank:
continue
m = MTH5(file_version='0.2.0',shuffle=None,fletcher32=None,compression="gzip",compression_opts=4)
hdf5_filename = '{}.h5'.format(station)
h5_fn = mth5_output_directory+'/'+hdf5_filename
m.open_mth5(h5_fn, "w")
survey_group = m.add_survey(survey_name)
with open(survey_json, 'r') as json_file:
json_load = json.load(json_file)
survey_dict = json_load['survey']
survey_group.metadata.from_dict(survey_dict)
survey_group.write_metadata()
mt_metadata_file_name = '{}.json'.format(station)
mt_metadata = [file for file in full_path_to_mt_metadata if file.endswith(mt_metadata_file_name)]
create_mth5_group_station_run_channel(station,mt_metadata)
m.close_mth5()
comm.Barrier()
### print total time to run script
if rank==0:
print('The script took {0} seconds !'.format(time.time()-startTime))
Putting this altogether into a Python script (mth5_onefileperstation.py
):
[ ]:
from mpi4py import MPI
import h5py
from mth5.mth5 import MTH5
import numpy as np
from os import path
import os, psutil
import glob
import nc_time_axis
import time
from mt_metadata import timeseries as metadata
from mt_metadata.utils.mttime import MTime
import json
startTime = time.time()
### define MPI comm, rank and size
comm = MPI.COMM_WORLD
rank = MPI.COMM_WORLD.rank
size = MPI.COMM_WORLD.size
### define working directories and file paths
work_dir = '/g/data/.../.../merged_data_all'
mth5_output_directory = '/g/data/.../.../mth5_outdir_single_file_per_station'
full_path_to_mth5_files = sorted(glob.glob(mth5_output_directory+"/*"))
full_path_to_ascii_files = sorted(glob.glob(work_dir+"/*"))
mt_metadata_dir = '/g/data/.../.../mt_metadata_json'
full_path_to_mt_metadata = sorted(glob.glob(mt_metadata_dir+"/*"))
survey_file_name = 'survey.json'
survey_json = mt_metadata_dir+'/'+survey_file_name
### define raw time series data channels
raw_data_channels = ['EX','EY','BX','BY','BZ','TP','ambientTemperature']
### define stations to go into mth5 file
stations_all = ['SA225-2','SA227', 'SA242', 'SA243', 'SA245',
'SA247', 'SA248', 'SA249', 'SA250', 'SA251',
'SA252', 'SA26W-2', 'SA270', 'SA271', 'SA272',
'SA273', 'SA274-2', 'SA274', 'SA275', 'SA276',
'SA277', 'SA293-2', 'SA294', 'SA295', 'SA296',
'SA297', 'SA298', 'SA300', 'SA301', 'SA319',
'SA320', 'SA320-2', 'SA321', 'SA322', 'SA323',
'SA324', 'SA325-2', 'SA325', 'SA326N', 'SA326S',
'SA344', 'SA344-2', 'SA345', 'SA346', 'SA347',
'SA348', 'SA349', 'SA350', 'SA351', ### 49 single run SA stations
'WA10', 'WA13', 'WA14', 'WA15', 'WA26',
'WA27', 'WA29', 'WA30', 'WA31', 'WA42',
'WA43', 'WA44', 'WA45', 'WA46', 'WA47',
'WA54', 'WA55', 'WA56', 'WA57', 'WA58',
'WA60', 'WA61', 'WA62', 'WA63', 'WA64',
'WA65', 'WA66', 'WA67', 'WA68', 'WA69',
'WA70', 'WA71', 'WA72', 'WA73', 'WA74',
'WA75', 'WANT19', 'WANT38', 'WANT45', 'WASA302',
'WASA327', ### 41 single run WA stations
'SA246', 'SA299', 'SA324-2'] ### 3 stations with multiple runs
### define survey name and run number (for stations with a single run)
survey_name = "AusLAMP_Musgraves"
run_number = "001"
### define functions
def make_mth5_dir(mth5_output_directory):
### creates mth5 output directory if it doesn't already exist
try:
os.makedirs(mth5_output_directory)
except FileExistsError:
# directory already exists
print('directory already exists!')
pass
def remove_existing_mth5_files(mth5_files):
### removes existing mth5 files if they exists
for mth5_file in mth5_files:
if path.exists(mth5_file):
os.unlink(mth5_file)
print("INFO: Removed existing file {}".format(mth5_file))
else:
print("File does not exist")
def channel_data_extraction(channels):
### extracts electromagnetic time series data from concatenated (per run) Earth Data Logger ASCII time series files
EX = [file for file in channels if file.endswith('EX')]
EY = [file for file in channels if file.endswith('EY')]
BX = [file for file in channels if file.endswith('BX')]
BY = [file for file in channels if file.endswith('BY')]
BZ = [file for file in channels if file.endswith('BZ')]
with open(EX[0], 'r') as file:
EX1 = file.read().splitlines()
ex_ts = np.array(EX1).astype(np.int32)
with open(EY[0], 'r') as file:
EY1 = file.read().splitlines()
ey_ts = np.array(EY1).astype(np.int32)
with open(BX[0], 'r') as file:
BX1 = file.read().splitlines()
bx_ts = np.array(BX1).astype(np.int32)
with open(BY[0], 'r') as file:
BY1 = file.read().splitlines()
by_ts = np.array(BY1).astype(np.int32)
with open(BZ[0], 'r') as file:
BZ1 = file.read().splitlines()
bz_ts = np.array(BZ1).astype(np.int32)
return ex_ts, ey_ts, bx_ts, by_ts, bz_ts
def create_mth5_group_station_run_channel(station,mt_metadata):
### creates the mth5 gropus, stations, runs and channels for each mth5 file
### and populates mt_metadata from the relevant json file
with open(mt_metadata[0], 'r') as json_file:
json_load = json.load(json_file)
station_dict = json_load['station']
add_station = m.add_station(station, survey=survey_name)
add_station.metadata.from_dict(station_dict)
add_station.write_metadata()
channels = []
for file in full_path_to_ascii_files:
if station in file:
channels.append(file)
else:
continue
### for stations with a single run:
if len(channels) == len(raw_data_channels):
run_dict = json_load['run']
ex_dict = json_load['electric_ex']
ey_dict = json_load['electric_ey']
bx_dict = json_load['magnetic_bx']
by_dict = json_load['magnetic_by']
bz_dict = json_load['magnetic_bz']
add_run = m.add_run(station, run_number, survey=survey_name)
add_run.metadata.from_dict(run_dict)
add_run.write_metadata()
ex_ts,ey_ts,bx_ts,by_ts,bz_ts = channel_data_extraction(channels)
ex = m.add_channel(station, run_number, "ex", "electric", ex_ts, survey=survey_name)
ey = m.add_channel(station, run_number, "ey", "electric", ey_ts, survey=survey_name)
bx = m.add_channel(station, run_number, "bx", "magnetic", bx_ts, survey=survey_name)
by = m.add_channel(station, run_number, "by", "magnetic", by_ts, survey=survey_name)
bz = m.add_channel(station, run_number, "bz", "magnetic", bz_ts, survey=survey_name)
ex.metadata.from_dict(ex_dict)
ex.write_metadata()
ey.metadata.from_dict(ey_dict)
ey.write_metadata()
bx.metadata.from_dict(bx_dict)
bx.write_metadata()
by.metadata.from_dict(by_dict)
by.write_metadata()
bz.metadata.from_dict(bz_dict)
bz.write_metadata()
### for stations with multiple runs:
elif len(channels) > len(raw_data_channels):
sort_files = sorted(channels)
number_of_channels = len(raw_data_channels)
split_lists = [sort_files[x:x+number_of_channels] for x in range(0, len(sort_files), number_of_channels)]
mt_metadata_files = sorted(mt_metadata)
for i, (group,mt_meta) in enumerate(zip(split_lists,mt_metadata_files)):
mrun_number = i+1
run = "00%i" % mrun_number
with open(mt_meta, 'r') as json_file:
json_load = json.load(json_file)
run_dict = json_load['run']
ex_dict = json_load['electric_ex']
ey_dict = json_load['electric_ey']
bx_dict = json_load['magnetic_bx']
by_dict = json_load['magnetic_by']
bz_dict = json_load['magnetic_bz']
add_run = m.add_run(station, run, survey=survey_name)
add_run.metadata.from_dict(run_dict)
add_run.write_metadata()
ex_ts,ey_ts,bx_ts,by_ts,bz_ts = channel_data_extraction(channels)
ex = m.add_channel(station, run, "ex", "electric", ex_ts, survey=survey_name)
ey = m.add_channel(station, run, "ey", "electric", ey_ts, survey=survey_name)
bx = m.add_channel(station, run, "bx", "magnetic", bx_ts, survey=survey_name)
by = m.add_channel(station, run, "by", "magnetic", by_ts, survey=survey_name)
bz = m.add_channel(station, run, "bz", "magnetic", bz_ts, survey=survey_name)
ex.metadata.from_dict(ex_dict)
ex.write_metadata()
ey.metadata.from_dict(ey_dict)
ey.write_metadata()
bx.metadata.from_dict(bx_dict)
bx.write_metadata()
by.metadata.from_dict(by_dict)
by.write_metadata()
bz.metadata.from_dict(bz_dict)
bz.write_metadata()
elif len(channels) < len(raw_data_channels):
print('you are likely missing some channels')
print(station)
else:
print('something has gone wrong')
### create mth5 output directory and remove existing files
if rank==0:
make_mth5_dir(mth5_output_directory)
remove_existing_mth5_files(full_path_to_mth5_files)
comm.Barrier()
### create a single mth5 file per station with compression and associated mt_metadata
for i,station in enumerate(sorted(stations_all)):
if i%size!=rank:
continue
m = MTH5(file_version='0.2.0',shuffle=None,fletcher32=None,compression="gzip",compression_opts=4)
hdf5_filename = '{}.h5'.format(station)
h5_fn = mth5_output_directory+'/'+hdf5_filename
m.open_mth5(h5_fn, "w")
survey_group = m.add_survey(survey_name)
with open(survey_json, 'r') as json_file:
json_load = json.load(json_file)
survey_dict = json_load['survey']
survey_group.metadata.from_dict(survey_dict)
survey_group.write_metadata()
mt_metadata_file_name = '{}.json'.format(station)
mt_metadata = [file for file in full_path_to_mt_metadata if file.endswith(mt_metadata_file_name)]
create_mth5_group_station_run_channel(station,mt_metadata)
m.close_mth5()
comm.Barrier()
### print total time to run script
if rank==0:
print('The script took {0} seconds !'.format(time.time()-startTime))
Now that we have created our mth5_onefileperstation.py
script, we next need to make a job submission script to submit to the Gadi PBSPro scheduler. The job submission script specifys the queue to use and the duration/resources needed for the job.
#!/bin/bash
#PBS -N mth5_onefileperstation
#PBS -q hugemem
#PBS -P fp0
#PBS -l walltime=0:05:00
#PBS -l ncpus=96
#PBS -l mem=900GB
#PBS -l jobfs=10GB
#PBS -l storage=gdata/fp0+gdata/my80+gdata/lm70+gdata/up99
module use /g/data/up99/modulefiles
module load NCI-geophys/22.06
cd ${PBS_O_WORKDIR}
mpirun -np $PBS_NCPUS python3 mth5_onefileperstation.py > ./pbs_job_logs/$PBS_JOBID.log
For our jobscript above (mth5_onefileperstation.sh
), we have requested: 1. to use the hugemem queue 2. 96 CPUs (2 nodes) 3. 900GB of memory 4. 5 minutes of walltime
We also need to define the NCI project codes used in mth5_onefileperstation.py
. The project code up99
is required to make use of the NCI-geophys/22.06 module that contains the Python libraries used in this tutorial.
To submit this jobscript (mth5_onefileperstation.sh
) on Gadi:
$ qsub mth5_muscle.sh
This job will process the 93 stations in our survey using 96 MPI ranks (one station per rank) and creates the following 93 mth5 files:
SA225-2.h5 SA252.h5 SA293-2.h5 SA320.h5 SA344.h5 WA15.h5 WA47.h5 WA65.h5 WANT19.h5
SA227.h5 SA26W-2.h5 SA294.h5 SA321.h5 SA345.h5 WA26.h5 WA54.h5 WA66.h5 WANT38.h5
SA242.h5 SA270.h5 SA295.h5 SA322.h5 SA346.h5 WA27.h5 WA55.h5 WA67.h5 WANT45.h5
SA243.h5 SA271.h5 SA296.h5 SA323.h5 SA347.h5 WA29.h5 WA56.h5 WA68.h5 WASA302.h5
SA245.h5 SA272.h5 SA297.h5 SA324-2.h5 SA348.h5 WA30.h5 WA57.h5 WA69.h5 WASA327.h5
SA246.h5 SA273.h5 SA298.h5 SA324.h5 SA349.h5 WA31.h5 WA58.h5 WA70.h5
SA247.h5 SA274-2.h5 SA299.h5 SA325-2.h5 SA350.h5 WA42.h5 WA60.h5 WA71.h5
SA248.h5 SA274.h5 SA300.h5 SA325.h5 SA351.h5 WA43.h5 WA61.h5 WA72.h5
SA249.h5 SA275.h5 SA301.h5 SA326N.h5 WA10.h5 WA44.h5 WA62.h5 WA73.h5
SA250.h5 SA276.h5 SA319.h5 SA326S.h5 WA13.h5 WA45.h5 WA63.h5 WA74.h5
SA251.h5 SA277.h5 SA320-2.h5 SA344-2.h5 WA14.h5 WA46.h5 WA64.h5 WA75.h5
Below is a summary of how long mth5_onefileperstation.py
took to run for the AusLAMP Musgraves Province survey:
run on: Gadi
number of stations: 93
number of runs: 101
NCPUs used: 96 (2 nodes)
memory used: 791 GB
walltime used: 215 seconds (3m35s)
CPU time used: 48570 seconds (5h06m07s)
service units: 17.20
Concluding remarks
We have generated 93 mth5 files for the 93 stations from the Musgraves survey in 3 minutes and 35 seconds using 2 nodes (96 CPUs) on Gadi. For comparison, if we only used one CPU it would have taken approximately 5 hours to create these files.
In our mth5_in_parallel example, we generated a single mth5 file with all stations and this took approximately 35 minutes of walltime (or ~14 hours of CPU time). The “single file per station” model was much quicker as it did not require the creation of an mth5 skeleton as each process (rank) was run independently.
We were able to introduce compression into our mth5_onefileperstation.py
code and the total volume of the 93 mth5 files was 14 GB. The mth5_in_parallel.ipynb
tutorial was following the “all stations in a single mth5 file” model and as our version of Parallel HDF5 did not support compression, the total volume of the single mth5 file was 106 GB.