Creating an mth5 file using Parallel HDF5

This tutorial will examine how to build an mth5 file and write MT time series data for each station in a survey in parallel by utilising h5py’s Parallel HDF5 functionality. Parallel HDF5 allows users to open files across multiple parallel processes by utilising the Messaging Passing Interface (MPI) standard in mpi4py.

Note that in order to use Parallel HDF5, both HDF5 and h5py must be compiled with MPI support turned on.

For this example, we will be converting Earth Data Logger (ASCII) time series data from 93 stations of the AusLAMP Musgraves Province survey (see https://dx.doi.org/10.25914/5eaa30d63bd17). The ASCII time series were concatenated per run for each station and the total volume of time series data was 106 GB. 90 of the 93 stations were a single run - stations SA246, SA299 and SA324-2 had multiple runs.

This example was tested on the National Computational Infrastructure’s Gadi HPC system. Gadi is Australia’s most powerful supercomputer, a highly parallel cluster comprising more than 200,000 processor cores on ten different types of compute nodes. The example also makes use of the NCI-geophysics 2022.06 module which contains Parallel HDF5.

Building an mth5 skeleton

To build our mth5 file requires a two step process: 1. create an mth5 skeleton 2. populate the skeleton with the time series data in parallel using Parallel HDF5.

Let’s start by building the mth5 skeleton script which requires the following libraries:

[ ]:
from mth5.mth5 import MTH5
import numpy as np
from os import path
import os
import glob
import time

from mt_metadata.utils.mttime import MTime

startTime = time.time()

Next we can define our working directories and file paths which will need to be changed for your use case:

[ ]:
### directory on Gadi file system that contains merged ASCII time series per station run
work_dir = '/g/data/.../.../merged_data_all'

### full path to the concatenated Earth Data Logger ASCII time series files
full_path_to_files = sorted(glob.glob(work_dir+"/*"))

### directory on Gadi to write final mth5 file to
mth5_test_dir = '/g/data/.../.../mth5_outdir'

### name of the mth5 file
hdf5_filename = 'example_mth5_file.h5'

### full path to the mth5 file
h5_fn = mth5_test_dir+'/'+hdf5_filename

Now we can define: 1. the Earth Data Logger channels 2. the stations in our survey 3. the survey name 4. the run number for stations with a single run

[ ]:
### define raw time series data channels from Earth Data Logger

raw_data_channels = ['EX','EY','BX','BY','BZ','TP','ambientTemperature']


### define stations to go into mth5 file

stations_all = ['SA225-2','SA227',   'SA242',  'SA243',  'SA245',
                'SA247',  'SA248',   'SA249',  'SA250',  'SA251',
                'SA252',  'SA26W-2', 'SA270',  'SA271',  'SA272',
                'SA273',  'SA274-2', 'SA274',  'SA275',  'SA276',
                'SA277',  'SA293-2', 'SA294',  'SA295',  'SA296',
                'SA297',  'SA298',   'SA300',  'SA301',  'SA319',
                'SA320',  'SA320-2', 'SA321',  'SA322',  'SA323',
                'SA324',  'SA325-2', 'SA325',  'SA326N', 'SA326S',
                'SA344',  'SA344-2', 'SA345',  'SA346',  'SA347',
                'SA348',  'SA349',   'SA350',  'SA351',             ### 49 single run SA stations
                'WA10',   'WA13',    'WA14',   'WA15',   'WA26',
                'WA27',   'WA29',    'WA30',   'WA31',   'WA42',
                'WA43',   'WA44',    'WA45',   'WA46',   'WA47',
                'WA54',   'WA55',    'WA56',   'WA57',   'WA58',
                'WA60',   'WA61',    'WA62',   'WA63',   'WA64',
                'WA65',   'WA66',    'WA67',   'WA68',   'WA69',
                'WA70',   'WA71',    'WA72',   'WA73',   'WA74',
                'WA75',   'WANT19',  'WANT38', 'WANT45', 'WASA302',
                'WASA327',                          ### 41 single run WA stations
                'SA246',  'SA299',   'SA324-2']     ### 3 stations with multiple runs


### define survey name and run number (for stations with a single run)

survey_name = "AusLAMP_Musgraves"
run_number = "001"

We will define some functions that will be used to create our MTH5 skeleton:

[ ]:
def make_mth5_dir(mth5_output_directory):
### creates mth5 output directory if it doesn't already exist
    try:
        os.makedirs(mth5_output_directory)
    except FileExistsError:
        # directory already exists
        print('directory already exists!')
        pass


def remove_existing_mth5_file(mth5_file):
### removes existing mth5 file if it exists
    if path.exists(mth5_file):
        os.unlink(mth5_file)
        print("INFO: Removed existing file {mth5_file}")
    else:
        print("File does not exist")


def channel_line_count(channels):
### counts lines in electromagnetic ASCII files
    EX = [file for file in channels if file.endswith('EX')]
    EY = [file for file in channels if file.endswith('EY')]
    BX = [file for file in channels if file.endswith('BX')]
    BY = [file for file in channels if file.endswith('BY')]
    BZ = [file for file in channels if file.endswith('BZ')]

    count_ex = sum(1 for line in open(EX[0]))
    count_ey = sum(1 for line in open(EY[0]))
    count_bx = sum(1 for line in open(BX[0]))
    count_by = sum(1 for line in open(BY[0]))
    count_bz = sum(1 for line in open(BZ[0]))

    return count_ex, count_ey, count_bx, count_by, count_bz


def create_mth5_group_station_run_channel(station):
### creates the mth5 gropus, stations, runs and channels for the mth5 skeleton
    add_station = m.add_station(station, survey=survey_name)
    channels = []
    for file in full_path_to_files:
        if station in file:
            channels.append(file)
        else:
            continue
### for stations with a single run:
    if len(channels) == len(raw_data_channels):
        add_run = m.add_run(station, run_number, survey=survey_name)

        count_ex,count_ey,count_bx,count_by,count_bz = channel_line_count(channels)

        ex_zeros = np.zeros(count_ex,)
        ey_zeros = np.zeros(count_ey,)
        bx_zeros = np.zeros(count_bx,)
        by_zeros = np.zeros(count_by,)
        bz_zeros = np.zeros(count_bz,)

        ex = m.add_channel(station, run_number, "ex", "electric", ex_zeros, survey=survey_name)
        ey = m.add_channel(station, run_number, "ey", "electric", ey_zeros, survey=survey_name)
        bx = m.add_channel(station, run_number, "bx", "magnetic", bx_zeros, survey=survey_name)
        by = m.add_channel(station, run_number, "by", "magnetic", by_zeros, survey=survey_name)
        bz = m.add_channel(station, run_number, "bz", "magnetic", bz_zeros, survey=survey_name)

        #######################################################################################
        # Note: At the time of writing this example, the resizing of datasets caused h5py     #
        # parallel to fail when running using the mpio driver. A workaround was to create     #
        # 'zeros' arrays of size count_<xx> (see above).                                      #                                       #
        #                                                                                     #
        # ex = m.add_channel(station, run_number, "ex", "electric", None, survey=survey_name) #
        # ey = m.add_channel(station, run_number, "ey", "electric", None, survey=survey_name) #
        # bx = m.add_channel(station, run_number, "bx", "magnetic", None, survey=survey_name) #
        # by = m.add_channel(station, run_number, "by", "magnetic", None, survey=survey_name) #
        # bz = m.add_channel(station, run_number, "bz", "magnetic", None, survey=survey_name) #
        #                                                                                     #
        # ex.hdf5_dataset.resize((count_ex,))                                                 #
        # ey.hdf5_dataset.resize((count_ey,))                                                 #
        # bx.hdf5_dataset.resize((count_bx,))                                                 #
        # by.hdf5_dataset.resize((count_by,))                                                 #
        # bz.hdf5_dataset.resize((count_bz,))                                                 #
        #######################################################################################

### for stations with multiple runs:
    elif len(channels) > len(raw_data_channels):
        sort_files = sorted(channels)
        number_of_channels = len(raw_data_channels)
        split_lists = [sort_files[x:x+number_of_channels] for x in range(0, len(sort_files), number_of_channels)]
        for i,group in enumerate(split_lists):
            mrun_number = i+1
            run = "00%i" % mrun_number
            add_run = m.add_run(station, run, survey=survey_name)
            count_ex,count_ey,count_bx,count_by,count_bz = channel_line_count(group)
            ex_zeros = np.zeros(count_ex,)
            ey_zeros = np.zeros(count_ey,)
            bx_zeros = np.zeros(count_bx,)
            by_zeros = np.zeros(count_by,)
            bz_zeros = np.zeros(count_bz,)

            ex = m.add_channel(station, run, "ex", "electric", ex_zeros, survey=survey_name)
            ey = m.add_channel(station, run, "ey", "electric", ey_zeros, survey=survey_name)
            bx = m.add_channel(station, run, "bx", "magnetic", bx_zeros, survey=survey_name)
            by = m.add_channel(station, run, "by", "magnetic", by_zeros, survey=survey_name)
            bz = m.add_channel(station, run, "bz", "magnetic", bz_zeros, survey=survey_name)

    elif len(channels) < len(raw_data_channels):
        print('you are likely missing some channels')
        print(station)

    else:
        print('something has gone wrong')

The final step is to create our mth5 skeleton. Note that the Parallel HDF5 version used in this example does not support compression, so compression was turned off when generating the MTH5 skeleton:

[ ]:
### create mth5 directory (if it doesn't already exist)
make_mth5_dir(mth5_test_dir)

### remove any existing mth5 file in our mth5 directory
remove_existing_mth5_file(h5_fn)

start = MTime()
start.now()

### ensure compression is turned off
m = MTH5(file_version='0.2.0',shuffle=None,fletcher32=None,compression=None,compression_opts=None)

### open mth5 file in write mode
m.open_mth5(h5_fn, "w")

### add survey group
survey_group = m.add_survey(survey_name)

### create station, run and channel groups for all stations in our survey
for station in sorted(stations_all):
    create_mth5_group_station_run_channel(station)
m.close_mth5()

### print total time to run our mth5 skeleton script
print('The script took {0} seconds !'.format(time.time()-startTime))

Putting this all together into a Python script (mth5_skeleton.py):

[ ]:
from mth5.mth5 import MTH5
import numpy as np
from os import path
import os
import glob
import time

from mt_metadata.utils.mttime import MTime


startTime = time.time()


### define working directories and file paths

work_dir = '/g/data/.../.../merged_data_all'
mth5_test_dir = '/g/data/.../.../mth5_outdir'
hdf5_filename = 'example_mth5_file.h5'
h5_fn = mth5_test_dir+'/'+hdf5_filename
full_path_to_files = sorted(glob.glob(work_dir+"/*"))


### define raw time series data channels

raw_data_channels = ['EX','EY','BX','BY','BZ','TP','ambientTemperature']


### define stations to go into mth5 file

stations_all = ['SA225-2','SA227',   'SA242',  'SA243',  'SA245',
                'SA247',  'SA248',   'SA249',  'SA250',  'SA251',
                'SA252',  'SA26W-2', 'SA270',  'SA271',  'SA272',
                'SA273',  'SA274-2', 'SA274',  'SA275',  'SA276',
                'SA277',  'SA293-2', 'SA294',  'SA295',  'SA296',
                'SA297',  'SA298',   'SA300',  'SA301',  'SA319',
                'SA320',  'SA320-2', 'SA321',  'SA322',  'SA323',
                'SA324',  'SA325-2', 'SA325',  'SA326N', 'SA326S',
                'SA344',  'SA344-2', 'SA345',  'SA346',  'SA347',
                'SA348',  'SA349',   'SA350',  'SA351',             ### 49 single run SA stations
                'WA10',   'WA13',    'WA14',   'WA15',   'WA26',
                'WA27',   'WA29',    'WA30',   'WA31',   'WA42',
                'WA43',   'WA44',    'WA45',   'WA46',   'WA47',
                'WA54',   'WA55',    'WA56',   'WA57',   'WA58',
                'WA60',   'WA61',    'WA62',   'WA63',   'WA64',
                'WA65',   'WA66',    'WA67',   'WA68',   'WA69',
                'WA70',   'WA71',    'WA72',   'WA73',   'WA74',
                'WA75',   'WANT19',  'WANT38', 'WANT45', 'WASA302',
                'WASA327',                          ### 41 single run WA stations
                'SA246',  'SA299',   'SA324-2']     ### 3 stations with multiple runs


### define survey name and run number (for stations with a single run)

survey_name = "AusLAMP_Musgraves"
run_number = "001"

### define functions

def make_mth5_dir(mth5_output_directory):
### creates mth5 output directory if it doesn't already exist
    try:
        os.makedirs(mth5_output_directory)
    except FileExistsError:
        # directory already exists
        print('directory already exists!')
        pass


def remove_existing_mth5_file(mth5_file):
### removes existing mth5 file if it exists
    if path.exists(mth5_file):
        os.unlink(mth5_file)
        print("INFO: Removed existing file {mth5_file}")
    else:
        print("File does not exist")


def channel_line_count(channels):
### counts lines in electromagnetic ASCII files
    EX = [file for file in channels if file.endswith('EX')]
    EY = [file for file in channels if file.endswith('EY')]
    BX = [file for file in channels if file.endswith('BX')]
    BY = [file for file in channels if file.endswith('BY')]
    BZ = [file for file in channels if file.endswith('BZ')]

    count_ex = sum(1 for line in open(EX[0]))
    count_ey = sum(1 for line in open(EY[0]))
    count_bx = sum(1 for line in open(BX[0]))
    count_by = sum(1 for line in open(BY[0]))
    count_bz = sum(1 for line in open(BZ[0]))

    return count_ex, count_ey, count_bx, count_by, count_bz


def create_mth5_group_station_run_channel(station):
### creates the mth5 gropus, stations, runs and channels for the mth5 skeleton
    add_station = m.add_station(station, survey=survey_name)
    channels = []
    for file in full_path_to_files:
        if station in file:
            channels.append(file)
        else:
            continue
### for stations with a single run:
    if len(channels) == len(raw_data_channels):
        add_run = m.add_run(station, run_number, survey=survey_name)

        count_ex,count_ey,count_bx,count_by,count_bz = channel_line_count(channels)

        ex_zeros = np.zeros(count_ex,)
        ey_zeros = np.zeros(count_ey,)
        bx_zeros = np.zeros(count_bx,)
        by_zeros = np.zeros(count_by,)
        bz_zeros = np.zeros(count_bz,)

        ex = m.add_channel(station, run_number, "ex", "electric", ex_zeros, survey=survey_name)
        ey = m.add_channel(station, run_number, "ey", "electric", ey_zeros, survey=survey_name)
        bx = m.add_channel(station, run_number, "bx", "magnetic", bx_zeros, survey=survey_name)
        by = m.add_channel(station, run_number, "by", "magnetic", by_zeros, survey=survey_name)
        bz = m.add_channel(station, run_number, "bz", "magnetic", bz_zeros, survey=survey_name)

        #######################################################################################
        # Note: At the time of writing this example, the resizing of datasets caused h5py     #
        # parallel to fail when running using the mpio driver. A workaround was to create     #
        # 'zeros' arrays of size count_<xx> (see above).                                      #                                       #
        #                                                                                     #
        # ex = m.add_channel(station, run_number, "ex", "electric", None, survey=survey_name) #
        # ey = m.add_channel(station, run_number, "ey", "electric", None, survey=survey_name) #
        # bx = m.add_channel(station, run_number, "bx", "magnetic", None, survey=survey_name) #
        # by = m.add_channel(station, run_number, "by", "magnetic", None, survey=survey_name) #
        # bz = m.add_channel(station, run_number, "bz", "magnetic", None, survey=survey_name) #
        #                                                                                     #
        # ex.hdf5_dataset.resize((count_ex,))                                                 #
        # ey.hdf5_dataset.resize((count_ey,))                                                 #
        # bx.hdf5_dataset.resize((count_bx,))                                                 #
        # by.hdf5_dataset.resize((count_by,))                                                 #
        # bz.hdf5_dataset.resize((count_bz,))                                                 #
        #######################################################################################

### for stations with multiple runs:
    elif len(channels) > len(raw_data_channels):
        sort_files = sorted(channels)
        number_of_channels = len(raw_data_channels)
        split_lists = [sort_files[x:x+number_of_channels] for x in range(0, len(sort_files), number_of_channels)]
        for i,group in enumerate(split_lists):
            mrun_number = i+1
            run = "00%i" % mrun_number
            add_run = m.add_run(station, run, survey=survey_name)
            count_ex,count_ey,count_bx,count_by,count_bz = channel_line_count(group)
            ex_zeros = np.zeros(count_ex,)
            ey_zeros = np.zeros(count_ey,)
            bx_zeros = np.zeros(count_bx,)
            by_zeros = np.zeros(count_by,)
            bz_zeros = np.zeros(count_bz,)

            ex = m.add_channel(station, run, "ex", "electric", ex_zeros, survey=survey_name)
            ey = m.add_channel(station, run, "ey", "electric", ey_zeros, survey=survey_name)
            bx = m.add_channel(station, run, "bx", "magnetic", bx_zeros, survey=survey_name)
            by = m.add_channel(station, run, "by", "magnetic", by_zeros, survey=survey_name)
            bz = m.add_channel(station, run, "bz", "magnetic", bz_zeros, survey=survey_name)

    elif len(channels) < len(raw_data_channels):
        print('you are likely missing some channels')
        print(station)

    else:
        print('something has gone wrong')


### create mth5 file skeleton

make_mth5_dir(mth5_test_dir)
remove_existing_mth5_file(h5_fn)
start = MTime()
start.now()

m = MTH5(file_version='0.2.0',shuffle=None,fletcher32=None,compression=None,compression_opts=None)

m.open_mth5(h5_fn, "w")
survey_group = m.add_survey(survey_name)
for station in sorted(stations_all):
    create_mth5_group_station_run_channel(station)
m.close_mth5()

### print total time to run script

print('The script took {0} seconds !'.format(time.time()-startTime))

For the mth5 skeleton, we have populated a single mth5 file with all station groups in our survey. No actual time series data has been added yet (this will happen in parallel in the next steps). It is important to note that this process is a collective process that modifies the structure of an HDF5 file and all processes must participate. It is for this reason that mth5_skeleton.py can not be run in parallel when generating a single mth5 file for all stations (see Collective versus independent operations in https://docs.h5py.org/en/stable/mpi.html).

The following is a summary of how long mth5_skeleton.py took to run for the AusLAMP Musgraves Province survey.

run on: Gadi
number of stations: 93
number of runs: 101
NCPUs used: 1
memory used: 30GB
walltime used: 1264 seconds (21m04s)
CPU time used: 1162 seconds (19m22s)
service units: 5.27

Populating our mth5 skeleton with time series data in parallel

Now that we have created our mth5 skeleton, it’s time to populate the time time series data from each station in our survey using Parallel HDF5.

First we will import the required Python libraries. The important libraries are mpi4py and h5py.

[ ]:
from mpi4py import MPI
import h5py
import os, psutil
import glob
import numpy as np
import shutil
import sys
from os import path
import time

startTime = time.time()

Next we will define the MPI communicator, rank and size:

[ ]:
comm = MPI.COMM_WORLD
rank = MPI.COMM_WORLD.rank
size = MPI.COMM_WORLD.size

Once again, we will define our: 1. working directories and file paths 2. Earth Data Logger channels 3. survey stations

[ ]:
### directory on Gadi file system that contains merged ASCII time series per station per run
work_dir = '/g/data/.../.../merged_data_all'

### directory to write final mth5 file
mth5_test_dir = '/g/data/.../.../mth5_outdir'

### name of mth5 file
hdf5_filename = 'example_mth5_file.h5'

### full path to mth5 file
h5_fn = mth5_test_dir+'/'+hdf5_filename

### full path to concatenated Earth Data Logger ASCII time series files
full_path_files = sorted(glob.glob(work_dir+"/*"))

### raw data channels
raw_data_channels = ['EX','EY','BX','BY','BZ','TP','ambientTemperature']

### MT stations to go into mth5 file
stations_all = ['SA225-2','SA227',   'SA242',  'SA243',  'SA245',
                'SA247',  'SA248',   'SA249',  'SA250',  'SA251',
                'SA252',  'SA26W-2', 'SA270',  'SA271',  'SA272',
                'SA273',  'SA274-2', 'SA274',  'SA275',  'SA276',
                'SA277',  'SA293-2', 'SA294',  'SA295',  'SA296',
                'SA297',  'SA298',   'SA300',  'SA301',  'SA319',
                'SA320',  'SA320-2', 'SA321',  'SA322',  'SA323',
                'SA324',  'SA325-2', 'SA325',  'SA326N', 'SA326S',
                'SA344',  'SA344-2', 'SA345',  'SA346',  'SA347',
                'SA348',  'SA349',   'SA350',  'SA351',             ### 49 single run SA stations
                'WA10',   'WA13',    'WA14',   'WA15',   'WA26',
                'WA27',   'WA29',    'WA30',   'WA31',   'WA42',
                'WA43',   'WA44',    'WA45',   'WA46',   'WA47',
                'WA54',   'WA55',    'WA56',   'WA57',   'WA58',
                'WA60',   'WA61',    'WA62',   'WA63',   'WA64',
                'WA65',   'WA66',    'WA67',   'WA68',   'WA69',
                'WA70',   'WA71',    'WA72',   'WA73',   'WA74',
                'WA75',   'WANT19',  'WANT38', 'WANT45', 'WASA302'
                'WASA327',                          ### 41 single run WA stations
                'SA246',  'SA299',   'SA324-2']     ### 3 stations with multiple runs


Let’s define the functions that will be used to populate our MTH5 skeleton with our Earth Data Logger time series data:

[ ]:
def channel_data_extraction(channels):
### extracts electromagnetic time series data from concatenated (per run) Earth Data Logger ASCII time series files
    EX = [file for file in channels if file.endswith('EX')]
    EY = [file for file in channels if file.endswith('EY')]
    BX = [file for file in channels if file.endswith('BX')]
    BY = [file for file in channels if file.endswith('BY')]
    BZ = [file for file in channels if file.endswith('BZ')]
    with open(EX[0], 'r') as file:
        EX1 = file.read().splitlines()
        ex = np.array(EX1).astype(np.int32)
    with open(EY[0], 'r') as file:
        EY1 = file.read().splitlines()
        ey = np.array(EY1).astype(np.int32)
    with open(BX[0], 'r') as file:
        BX1 = file.read().splitlines()
        bx = np.array(BX1).astype(np.int32)
    with open(BY[0], 'r') as file:
        BY1 = file.read().splitlines()
        by = np.array(BY1).astype(np.int32)
    with open(BZ[0], 'r') as file:
        BZ1 = file.read().splitlines()
        bz = np.array(BZ1).astype(np.int32)

    return ex, ey, bx, by, bz


def write_channels(list_of_stations,full_path_to_files):
### writes time series data to the mth5 skeleton file in an embarrisingly parallel way - that is,
### each rank is dedicated to writing data from a single station.
    for i,station in enumerate(sorted(list_of_stations)):
        if i%size!=rank:
            continue
        channels = []
        for file in full_path_to_files:
            if station in file:
                channels.append(file)
            else:
                continue

### for stations with a single run:
        if len(channels) == len(raw_data_channels):
            run = '001'
            station_group = 'Experiment/Surveys/AusLAMP_Musgraves/Stations/'
            site_run_path = station_group+station+'/'+run
            channel_ex_path = site_run_path+'/'+'ex'
            channel_ey_path = site_run_path+'/'+'ey'
            channel_bx_path = site_run_path+'/'+'bx'
            channel_by_path = site_run_path+'/'+'by'
            channel_bz_path = site_run_path+'/'+'bz'

            channel_ex = f[channel_ex_path]
            channel_ey = f[channel_ey_path]
            channel_bx = f[channel_bx_path]
            channel_by = f[channel_by_path]
            channel_bz = f[channel_bz_path]

            ex,ey,bx,by,bz = channel_data_extraction(channels)

            channel_ex[:] = ex
            channel_ey[:] = ey
            channel_bx[:] = bx
            channel_by[:] = by
            channel_bz[:] = bz

            process = psutil.Process(os.getpid())
            print('this run took %d MB of memory' % (process.memory_info().rss / 1024 ** 2))
            print("Station number %d (%s) being done by processor %d of %d" % (i, station, rank, size))

### for stations with multiple runs:
        elif len(channels) > len(raw_data_channels):
            sort_files = sorted(channels)
            number_of_channels = len(raw_data_channels)
            split_lists = [sort_files[x:x+number_of_channels] for x in range(0, len(sort_files), number_of_channels)]
            for i,group in enumerate(split_lists):
                mrun_number = i+1
                run = "00%i" % mrun_number
                station_group = 'Experiment/Surveys/AusLAMP_Musgraves/Stations/'
                site_run_path = station_group+station+'/'+run
                channel_ex_path = site_run_path+'/'+'ex'
                channel_ey_path = site_run_path+'/'+'ey'
                channel_bx_path = site_run_path+'/'+'bx'
                channel_by_path = site_run_path+'/'+'by'
                channel_bz_path = site_run_path+'/'+'bz'

                channel_ex = f[channel_ex_path]
                channel_ey = f[channel_ey_path]
                channel_bx = f[channel_bx_path]
                channel_by = f[channel_by_path]
                channel_bz = f[channel_bz_path]

                ex,ey,bx,by,bz = channel_data_extraction(group)

                channel_ex[:] = ex
                channel_ey[:] = ey
                channel_bx[:] = bx
                channel_by[:] = by
                channel_bz[:] = bz

                process = psutil.Process(os.getpid())
                print('this run took %d MB of memory' % (process.memory_info().rss / 1024 ** 2))
                print("Station number %d (%s) being done by processor %d of %d" % (i, station, rank, size))

        elif len(channels) < len(raw_data_channels):
            print('you are likely missing some channels')

        else:
            print('something has gone wrong')

Finally, we need to open our MTH5 skeleton in append mode utilising the mpio driver. We can then write our time series channels for all our stations in parallel and close the file:

[ ]:
### write to mth5 file in parallel using the mpio driver
with h5py.File(h5_fn,'a',driver='mpio',comm=MPI.COMM_WORLD) as f:
    write_channels(stations_all,full_path_files)
f.close()

### print total time for script to run
if rank==0:
    print('The script took {0} seconds !'.format(time.time()-startTime))

Putting this all together into a Python script (mth5_muscle.py):

[ ]:
from mpi4py import MPI
import h5py
import os, psutil
import glob
import numpy as np
import shutil
import sys
from os import path
import time

startTime = time.time()


### define MPI comm, rank and size

comm = MPI.COMM_WORLD
rank = MPI.COMM_WORLD.rank
size = MPI.COMM_WORLD.size


### define working directories, hdf5 file name and the full path to the ASCII MT time series files

work_dir = '/g/data/.../.../merged_data_all'
mth5_test_dir = '/g/data/.../.../mth5_outdir'
hdf5_filename = 'example_mth5_file.h5'
h5_fn = mth5_test_dir+'/'+hdf5_filename
full_path_files = sorted(glob.glob(work_dir+"/*"))


### define raw data channels

raw_data_channels = ['EX','EY','BX','BY','BZ','TP','ambientTemperature']


### define MT stations to go into mth5 file

stations_all = ['SA225-2','SA227',   'SA242',  'SA243',  'SA245',
                'SA247',  'SA248',   'SA249',  'SA250',  'SA251',
                'SA252',  'SA26W-2', 'SA270',  'SA271',  'SA272',
                'SA273',  'SA274-2', 'SA274',  'SA275',  'SA276',
                'SA277',  'SA293-2', 'SA294',  'SA295',  'SA296',
                'SA297',  'SA298',   'SA300',  'SA301',  'SA319',
                'SA320',  'SA320-2', 'SA321',  'SA322',  'SA323',
                'SA324',  'SA325-2', 'SA325',  'SA326N', 'SA326S',
                'SA344',  'SA344-2', 'SA345',  'SA346',  'SA347',
                'SA348',  'SA349',   'SA350',  'SA351',             ### 49 single run SA stations
                'WA10',   'WA13',    'WA14',   'WA15',   'WA26',
                'WA27',   'WA29',    'WA30',   'WA31',   'WA42',
                'WA43',   'WA44',    'WA45',   'WA46',   'WA47',
                'WA54',   'WA55',    'WA56',   'WA57',   'WA58',
                'WA60',   'WA61',    'WA62',   'WA63',   'WA64',
                'WA65',   'WA66',    'WA67',   'WA68',   'WA69',
                'WA70',   'WA71',    'WA72',   'WA73',   'WA74',
                'WA75',   'WANT19',  'WANT38', 'WANT45', 'WASA302'
                'WASA327',                          ### 41 single run WA stations
                'SA246',  'SA299',   'SA324-2']     ### 3 stations with multiple runs


### define functions

def channel_data_extraction(channels):
    EX = [file for file in channels if file.endswith('EX')]
    EY = [file for file in channels if file.endswith('EY')]
    BX = [file for file in channels if file.endswith('BX')]
    BY = [file for file in channels if file.endswith('BY')]
    BZ = [file for file in channels if file.endswith('BZ')]
    with open(EX[0], 'r') as file:
        EX1 = file.read().splitlines()
        ex = np.array(EX1).astype(np.int32)
    with open(EY[0], 'r') as file:
        EY1 = file.read().splitlines()
        ey = np.array(EY1).astype(np.int32)
    with open(BX[0], 'r') as file:
        BX1 = file.read().splitlines()
        bx = np.array(BX1).astype(np.int32)
    with open(BY[0], 'r') as file:
        BY1 = file.read().splitlines()
        by = np.array(BY1).astype(np.int32)
    with open(BZ[0], 'r') as file:
        BZ1 = file.read().splitlines()
        bz = np.array(BZ1).astype(np.int32)

    return ex, ey, bx, by, bz

def write_channels(list_of_stations,full_path_to_files):
    for i,station in enumerate(sorted(list_of_stations)):
        if i%size!=rank:
            continue
        channels = []
        for file in full_path_to_files:
            if station in file:
                channels.append(file)
            else:
                continue
        if len(channels) == len(raw_data_channels):
            run = '001'
            station_group = 'Experiment/Surveys/AusLAMP_Musgraves/Stations/'
            site_run_path = station_group+station+'/'+run
            channel_ex_path = site_run_path+'/'+'ex'
            channel_ey_path = site_run_path+'/'+'ey'
            channel_bx_path = site_run_path+'/'+'bx'
            channel_by_path = site_run_path+'/'+'by'
            channel_bz_path = site_run_path+'/'+'bz'

            channel_ex = f[channel_ex_path]
            channel_ey = f[channel_ey_path]
            channel_bx = f[channel_bx_path]
            channel_by = f[channel_by_path]
            channel_bz = f[channel_bz_path]

            ex,ey,bx,by,bz = channel_data_extraction(channels)

            channel_ex[:] = ex
            channel_ey[:] = ey
            channel_bx[:] = bx
            channel_by[:] = by
            channel_bz[:] = bz

            process = psutil.Process(os.getpid())
            print('this run took %d MB of memory' % (process.memory_info().rss / 1024 ** 2))
            print("Station number %d (%s) being done by processor %d of %d" % (i, station, rank, size))


        elif len(channels) > len(raw_data_channels):
            sort_files = sorted(channels)
            number_of_channels = len(raw_data_channels)
            split_lists = [sort_files[x:x+number_of_channels] for x in range(0, len(sort_files), number_of_channels)]
            for i,group in enumerate(split_lists):
                mrun_number = i+1
                run = "00%i" % mrun_number
                station_group = 'Experiment/Surveys/AusLAMP_Musgraves/Stations/'
                site_run_path = station_group+station+'/'+run
                channel_ex_path = site_run_path+'/'+'ex'
                channel_ey_path = site_run_path+'/'+'ey'
                channel_bx_path = site_run_path+'/'+'bx'
                channel_by_path = site_run_path+'/'+'by'
                channel_bz_path = site_run_path+'/'+'bz'

                channel_ex = f[channel_ex_path]
                channel_ey = f[channel_ey_path]
                channel_bx = f[channel_bx_path]
                channel_by = f[channel_by_path]
                channel_bz = f[channel_bz_path]

                ex,ey,bx,by,bz = channel_data_extraction(group)

                channel_ex[:] = ex
                channel_ey[:] = ey
                channel_bx[:] = bx
                channel_by[:] = by
                channel_bz[:] = bz

                process = psutil.Process(os.getpid())
                print('this run took %d MB of memory' % (process.memory_info().rss / 1024 ** 2))
                print("Station number %d (%s) being done by processor %d of %d" % (i, station, rank, size))

        elif len(channels) < len(raw_data_channels):
            print('you are likely missing some channels')

        else:
            print('something has gone wrong')


### write to mth5 file in parallel using the mpio driver

with h5py.File(h5_fn,'a',driver='mpio',comm=MPI.COMM_WORLD) as f:
    write_channels(stations_all,full_path_files)
f.close()

### print total time for script to run

if rank==0:
    print('The script took {0} seconds !'.format(time.time()-startTime))

Now that we have created our mth5_muscle.py script, we next need to create a job submission script to submit to the Gadi PBSPro scheduler. The job submission script specifys the queue to use and the duration/resources needed for the job.

#!/bin/bash

#PBS -N mth5_mpi4py
#PBS -q hugemem
#PBS -P fp0
#PBS -l walltime=0:30:00
#PBS -l ncpus=96
#PBS -l mem=1000GB
#PBS -l jobfs=10GB
#PBS -l storage=gdata/fp0+gdata/my80+gdata/lm70+gdata/up99

module use /g/data/up99/modulefiles
module load NCI-geophys/22.06

cd ${PBS_O_WORKDIR}

mpirun -np $PBS_NCPUS python3 mth5_muscle.py > ./pbs_job_logs/$PBS_JOBID.log

For our jobscript above (mth5_muscle.sh), we have requested: 1. to use the hugemem queue 2. 96 CPUs (2 nodes) 3. 1 TB of memory 4. half an hour of walltime

We also need to define the NCI project codes used in mth5_muscle.py. The project code up99 is required to make use of the NCI-geophys/22.06 module that contains Parallel HDF5.

To submit this jobscript (mth5_muscle.sh) on Gadi:

$ qsub mth5_muscle.sh

This job will process the 93 stations in our survey using 96 MPI ranks (one station per rank).

The following is a summary of how long mth5_muscle.py took to run for the AusLAMP Musgraves Province survey.

run on: Gadi
number of stations: 93
number of runs: 101
NCPUs used: 96 (2 nodes)
memory used: 736 GB
walltime used: 836 seconds (13m56s)
CPU time used: 48570 seconds (13h29m30s)
service units: 66.88

Concluding remarks

Generating one mth5 file for many stations can take a significant amount of time if no parallelism is introduced. For the Musgraves example above, if using the mth5 library alone it would have taken approximately 14 hours to generate our final mth5 file. By utilising Parallel HDF5 we have managed to reduce this time to approximately 35 minutes.

For the “all stations in a single mth5 file” model, the bottleneck lies in generating the mth5 skeleton as this can’t be done in parallel. The authors have created the tutorial mth5_in_parallel_one_file_per_station that shows how to generate a single mth5 file per station in parallel, which yields much quicker results than the “all stations in a single mth5 file” model.

This example only dealt with the writing of MT time series data and did not consider mt_metadata. The mth5_in_parallel_one_file_per_station tutorial demonstrates how one could add mt_metadata into their mth5 automations.