Creating one mth5 file per station in parallel

In this tutorial, we will demonstrate how to generate one mth5 file per station (with associated mt_metadata) from a survey in parallel on HPC.

For this example, we will be converting Earth Data Logger (ASCII) time series data from 93 stations of the AusLAMP Musgraves Province survey (see https://dx.doi.org/10.25914/5eaa30d63bd17). The ASCII time series were concatenated per run for each station, and the total volume of time series data was 106 GB. 90 of the 93 stations had a single run; stations SA246, SA299 and SA324-2 had multiple runs.

This example was tested on the National Computational Infrastructure's (NCI) Gadi HPC system. Gadi is Australia's most powerful supercomputer, a highly parallel cluster comprising more than 200,000 processor cores across ten different types of compute nodes. The example also makes use of the NCI-geophys/22.06 module, which contains the Python libraries used in this tutorial.

Building our mth5_onefileperstation.py code

In the mth5_in_parallel tutorial, we built a single mth5 file containing all stations in our survey. In that example, we had to construct two scripts:

1. mth5_skeleton.py, which created the mth5 file structure without adding any time series data. This script could not be run in parallel because the operations involved were collective (see Collective versus independent operations in https://docs.h5py.org/en/stable/mpi.html).
2. mth5_muscle.py, which added the time series data for each station in parallel (i.e. one station per node).

For this example, we will be creating a single file per station with associated mt_metadata. As a result, we only need one script, because the generation of each file is independent (i.e., the different MPI ranks do not need to communicate with each other). The authors use mpi4py for this tutorial, but note that other libraries such as Dask or multiprocessing could also be used; a multiprocessing sketch is shown below for comparison.
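
The sketch below is a hypothetical illustration of the same per-station decomposition using the standard library's multiprocessing module; build_station_mth5 is a placeholder for the per-station work defined later in this tutorial (open an MTH5 file, add metadata, add the time series).

from multiprocessing import Pool

def build_station_mth5(station):
    # placeholder for the real per-station conversion shown later
    print(f"processing {station}")

if __name__ == "__main__":
    stations = ["SA227", "SA242", "SA243"]   # example subset of the survey
    with Pool(processes=3) as pool:
        pool.map(build_station_mth5, stations)

Note that multiprocessing is restricted to a single node, whereas mpi4py allows the work to be spread across multiple nodes of the cluster.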

Our mth5_onefileperstation.py code requires the following Python libraries:

[ ]:
from mpi4py import MPI
import h5py
from mth5.mth5 import MTH5
import numpy as np
from os import path
import os, psutil
import glob
import nc_time_axis
import time

from mt_metadata import timeseries as metadata
from mt_metadata.utils.mttime import MTime
import json

startTime = time.time()

As we are using mpi4py, we will need to define the MPI communicator, rank and size:

[ ]:
comm = MPI.COMM_WORLD
rank = MPI.COMM_WORLD.rank
size = MPI.COMM_WORLD.size
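
Each rank will later claim every size-th station: station i is handled by rank i % size, and every other rank skips it. A small illustration of that decomposition (hypothetical values; example_size stands in for the number of MPI ranks launched):

# Illustration only: which rank owns which station under the i % size rule
example_stations = ['SA225-2', 'SA227', 'SA242', 'SA243', 'SA245', 'SA247']
example_size = 4   # pretend the job was launched with 4 MPI ranks
for i, station in enumerate(example_stations):
    print(f"station {station} -> rank {i % example_size}")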

Next, we will define our:

1. working directories and file paths
2. Earth Data Logger channels
3. survey stations
4. survey name and run number (for stations with a single run)

[ ]:
### define working directories and file paths

work_dir = '/g/data/.../.../merged_data_all'
mth5_output_directory = '/g/data/.../.../mth5_outdir_single_file_per_station'
full_path_to_mth5_files = sorted(glob.glob(mth5_output_directory+"/*"))
full_path_to_ascii_files = sorted(glob.glob(work_dir+"/*"))
mt_metadata_dir = '/g/data/.../.../mt_metadata_json'
full_path_to_mt_metadata = sorted(glob.glob(mt_metadata_dir+"/*"))
survey_file_name = 'survey.json'
survey_json = mt_metadata_dir+'/'+survey_file_name

### define raw time series data channels

raw_data_channels = ['EX','EY','BX','BY','BZ','TP','ambientTemperature']


### define the stations in our survey

stations_all = ['SA225-2','SA227',   'SA242',  'SA243',  'SA245',
                'SA247',  'SA248',   'SA249',  'SA250',  'SA251',
                'SA252',  'SA26W-2', 'SA270',  'SA271',  'SA272',
                'SA273',  'SA274-2', 'SA274',  'SA275',  'SA276',
                'SA277',  'SA293-2', 'SA294',  'SA295',  'SA296',
                'SA297',  'SA298',   'SA300',  'SA301',  'SA319',
                'SA320',  'SA320-2', 'SA321',  'SA322',  'SA323',
                'SA324',  'SA325-2', 'SA325',  'SA326N', 'SA326S',
                'SA344',  'SA344-2', 'SA345',  'SA346',  'SA347',
                'SA348',  'SA349',   'SA350',  'SA351',             ### 49 single run SA stations
                'WA10',   'WA13',    'WA14',   'WA15',   'WA26',
                'WA27',   'WA29',    'WA30',   'WA31',   'WA42',
                'WA43',   'WA44',    'WA45',   'WA46',   'WA47',
                'WA54',   'WA55',    'WA56',   'WA57',   'WA58',
                'WA60',   'WA61',    'WA62',   'WA63',   'WA64',
                'WA65',   'WA66',    'WA67',   'WA68',   'WA69',
                'WA70',   'WA71',    'WA72',   'WA73',   'WA74',
                'WA75',   'WANT19',  'WANT38', 'WANT45', 'WASA302',
                'WASA327',                          ### 41 single run WA stations
                'SA246',  'SA299',   'SA324-2']     ### 3 stations with multiple runs


### define the survey name and run number (for stations with a single run)

survey_name = "AusLAMP_Musgraves"
run_number = "001"

We will also be adding mt_metadata into our mth5 files. For this example, a single json file was created per run and the contents of each json file looked something like this:

{
     "electric_ex": {
         "channel_number": 0,
         "component": null,
         "data_quality.rating.value": 0,
         "dipole_length": null,
         "filter.applied": [
             false
         ],
         "filter.name": [],
         "measurement_azimuth": 0.0,
         "measurement_tilt": 0.0,
         "negative.elevation": 0.0,
         "negative.id": null,
         "negative.latitude": 0.0,
         "negative.longitude": 0.0,
         "negative.manufacturer": null,
         "negative.type": null,
         "positive.elevation": 0.0,
         "positive.id": null,
         "positive.latitude": 0.0,
         "positive.longitude": 0.0,
         "positive.manufacturer": null,
         "positive.type": null,
         "sample_rate": 10.0,
         "time_period.end": "1980-01-01T00:00:00+00:00",
         "time_period.start": "1980-01-01T00:00:00+00:00",
         "type": "electric",
         "units": null
     },

     "electric_ey": {
         "channel_number": 0,
         "component": null,
         "data_quality.rating.value": 0,
         "dipole_length": null,
         "filter.applied": [
             false
         ],
         "filter.name": [],
         "measurement_azimuth": 0.0,
         "measurement_tilt": 0.0,
         "negative.elevation": 0.0,
         "negative.id": null,
         "negative.latitude": 0.0,
         "negative.longitude": 0.0,
         "negative.manufacturer": null,
         "negative.type": null,
         "positive.elevation": 0.0,
         "positive.id": null,
         "positive.latitude": 0.0,
         "positive.longitude": 0.0,
         "positive.manufacturer": null,
         "positive.type": null,
         "sample_rate": 10.0,
         "time_period.end": "1980-01-01T00:00:00+00:00",
         "time_period.start": "1980-01-01T00:00:00+00:00",
         "type": "electric",
         "units": null
     },


     "magnetic_bx": {
         "channel_number": 0,
         "component": null,
         "data_quality.rating.value": 0,
         "filter.applied": [
             false
         ],
         "filter.name": [],
         "location.elevation": 0.0,
         "location.latitude": 0.0,
         "location.longitude": 0.0,
         "measurement_azimuth": 0.0,
         "measurement_tilt": 0.0,
         "sample_rate": 10.0,
         "sensor.id": null,
         "sensor.manufacturer": null,
         "sensor.type": null,
         "time_period.end": "1980-01-01T00:00:00+00:00",
         "time_period.start": "1980-01-01T00:00:00+00:00",
         "type": "magnetic",
         "units": null
     },


     "magnetic_by": {
         "channel_number": 0,
         "component": null,
         "data_quality.rating.value": 0,
         "filter.applied": [
             false
         ],
         "filter.name": [],
         "location.elevation": 0.0,
         "location.latitude": 0.0,
         "location.longitude": 0.0,
         "measurement_azimuth": 0.0,
         "measurement_tilt": 0.0,
         "sample_rate": 10.0,
         "sensor.id": null,
         "sensor.manufacturer": null,
         "sensor.type": null,
         "time_period.end": "1980-01-01T00:00:00+00:00",
         "time_period.start": "1980-01-01T00:00:00+00:00",
         "type": "magnetic",
         "units": null
     },


     "magnetic_bz": {
         "channel_number": 0,
         "component": null,
         "data_quality.rating.value": 0,
         "filter.applied": [
             false
         ],
         "filter.name": [],
         "location.elevation": 0.0,
         "location.latitude": 0.0,
         "location.longitude": 0.0,
         "measurement_azimuth": 0.0,
         "measurement_tilt": 0.0,
         "sample_rate": 10.0,
         "sensor.id": null,
         "sensor.manufacturer": null,
         "sensor.type": null,
         "time_period.end": "1980-01-01T00:00:00+00:00",
         "time_period.start": "1980-01-01T00:00:00+00:00",
         "type": "magnetic",
         "units": null
     },


     "run": {
         "channels_recorded_auxiliary": [],
         "channels_recorded_electric": [],
         "channels_recorded_magnetic": [],
         "data_logger.firmware.author": null,
         "data_logger.firmware.name": null,
         "data_logger.firmware.version": null,
         "data_logger.id": null,
         "data_logger.manufacturer": null,
         "data_logger.timing_system.drift": 0.0,
         "data_logger.timing_system.type": "GPS",
         "data_logger.timing_system.uncertainty": 0.0,
         "data_logger.type": null,
         "data_type": "LPMT",
         "id": null,
         "sample_rate": 10.0,
         "time_period.end": "1980-01-01T00:00:00+00:00",
         "time_period.start": "1980-01-01T00:00:00+00:00"
     },

     "station": {
         "acquired_by.name": null,
         "channels_recorded": [],
         "data_type": "LPMT",
         "geographic_name": null,
         "id": null,
         "location.declination.model": "WMM",
         "location.declination.value": 0.0,
         "location.elevation": 0.0,
         "location.latitude": 0.0,
         "location.longitude": 0.0,
         "orientation.method": null,
         "orientation.reference_frame": "geographic",
         "provenance.creation_time": "1980-01-01T00:00:00+00:00",
         "provenance.software.author": "none",
         "provenance.software.name": null,
         "provenance.software.version": null,
         "provenance.submitter.email": null,
         "provenance.submitter.organization": null,
         "run_list": [],
         "time_period.end": "1980-01-01T00:00:00+00:00",
         "time_period.start": "1980-01-01T00:00:00+00:00"
     },

     "survey": {
         "citation_dataset.doi": null,
         "citation_journal.doi": null,
         "country": null,
         "datum": "WGS84",
         "geographic_name": null,
         "id": null,
         "name": null,
         "northwest_corner.latitude": 0.0,
         "northwest_corner.longitude": 0.0,
         "project": null,
         "project_lead.email": null,
         "project_lead.organization": null,
         "release_license": "CC-0",
         "southeast_corner.latitude": 0.0,
         "southeast_corner.longitude": 0.0,
         "summary": null,
         "time_period.end_date": "1980-01-01",
         "time_period.start_date": "1980-01-01"
     }
 }

Note that the mt_metadata values in these json files are synthetic; they are mainly used to show how mt_metadata can be added into our automation.
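
Because create_mth5_group_station_run_channel (defined below) looks up specific top-level keys in each json file, it can be worth checking the files before launching the HPC job. A minimal, hypothetical pre-flight check, reusing the mt_metadata_dir path defined earlier:

import json
import glob

# Hypothetical pre-flight check: confirm each per-run json file contains the
# top-level keys that the conversion code below will look up.
expected_keys = {"electric_ex", "electric_ey", "magnetic_bx",
                 "magnetic_by", "magnetic_bz", "run", "station"}
for json_file in sorted(glob.glob('/g/data/.../.../mt_metadata_json/*.json')):
    if json_file.endswith('survey.json'):
        continue   # the survey-level file has a different layout
    with open(json_file, 'r') as f:
        keys = set(json.load(f))
    missing = expected_keys - keys
    if missing:
        print('{} is missing keys: {}'.format(json_file, sorted(missing)))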

We will now define functions that will be used to create our MTH5 files:

[ ]:
def make_mth5_dir(mth5_output_directory):
### creates the mth5 output directory if it doesn't already exist
    try:
        os.makedirs(mth5_output_directory)
    except FileExistsError:
        # directory already exists
        print('directory already exists!')
        pass


def remove_existing_mth5_files(mth5_files):
### removes existing mth5 files if they exist
    for mth5_file in mth5_files:
        if path.exists(mth5_file):
            os.unlink(mth5_file)
            print("INFO: Removed existing file {}".format(mth5_file))
        else:
            print("File does not exist")


def channel_data_extraction(channels):
### extracts electromagnetic time series data from concatenated (per run) Earth Data Logger
### ASCII time series files
    EX = [file for file in channels if file.endswith('EX')]
    EY = [file for file in channels if file.endswith('EY')]
    BX = [file for file in channels if file.endswith('BX')]
    BY = [file for file in channels if file.endswith('BY')]
    BZ = [file for file in channels if file.endswith('BZ')]
    with open(EX[0], 'r') as file:
        EX1 = file.read().splitlines()
        ex_ts = np.array(EX1).astype(np.int32)
    with open(EY[0], 'r') as file:
        EY1 = file.read().splitlines()
        ey_ts = np.array(EY1).astype(np.int32)
    with open(BX[0], 'r') as file:
        BX1 = file.read().splitlines()
        bx_ts = np.array(BX1).astype(np.int32)
    with open(BY[0], 'r') as file:
        BY1 = file.read().splitlines()
        by_ts = np.array(BY1).astype(np.int32)
    with open(BZ[0], 'r') as file:
        BZ1 = file.read().splitlines()
        bz_ts = np.array(BZ1).astype(np.int32)

    return ex_ts, ey_ts, bx_ts, by_ts, bz_ts


def create_mth5_group_station_run_channel(station,mt_metadata):
### creates the mth5 groups, stations, runs and channels for each mth5 file
### and populates mt_metadata from the relevant json file
    with open(mt_metadata[0], 'r') as json_file:
        json_load = json.load(json_file)

    station_dict = json_load['station']
    add_station = m.add_station(station, survey=survey_name)
    add_station.metadata.from_dict(station_dict)
    add_station.write_metadata()

    channels = []
    for file in full_path_to_ascii_files:
        if station in file:
            channels.append(file)
        else:
            continue

### for stations with a single run:
    if len(channels) == len(raw_data_channels):
        run_dict = json_load['run']
        ex_dict = json_load['electric_ex']
        ey_dict = json_load['electric_ey']
        bx_dict = json_load['magnetic_bx']
        by_dict = json_load['magnetic_by']
        bz_dict = json_load['magnetic_bz']

        add_run = m.add_run(station, run_number, survey=survey_name)
        add_run.metadata.from_dict(run_dict)
        add_run.write_metadata()

        ex_ts,ey_ts,bx_ts,by_ts,bz_ts = channel_data_extraction(channels)

        ex = m.add_channel(station, run_number, "ex", "electric", ex_ts, survey=survey_name)
        ey = m.add_channel(station, run_number, "ey", "electric", ey_ts, survey=survey_name)
        bx = m.add_channel(station, run_number, "bx", "magnetic", bx_ts, survey=survey_name)
        by = m.add_channel(station, run_number, "by", "magnetic", by_ts, survey=survey_name)
        bz = m.add_channel(station, run_number, "bz", "magnetic", bz_ts, survey=survey_name)

        ex.metadata.from_dict(ex_dict)
        ex.write_metadata()
        ey.metadata.from_dict(ey_dict)
        ey.write_metadata()
        bx.metadata.from_dict(bx_dict)
        bx.write_metadata()
        by.metadata.from_dict(by_dict)
        by.write_metadata()
        bz.metadata.from_dict(bz_dict)
        bz.write_metadata()

### for stations with multiple runs:
    elif len(channels) > len(raw_data_channels):
        sort_files = sorted(channels)
        number_of_channels = len(raw_data_channels)
        split_lists = [sort_files[x:x+number_of_channels] for x in range(0, len(sort_files), number_of_channels)]
        mt_metadata_files = sorted(mt_metadata)
        for i, (group,mt_meta) in enumerate(zip(split_lists,mt_metadata_files)):
            mrun_number = i+1
            run = "00%i" % mrun_number
            with open(mt_meta, 'r') as json_file:
                json_load = json.load(json_file)

            run_dict = json_load['run']
            ex_dict = json_load['electric_ex']
            ey_dict = json_load['electric_ey']
            bx_dict = json_load['magnetic_bx']
            by_dict = json_load['magnetic_by']
            bz_dict = json_load['magnetic_bz']

            add_run = m.add_run(station, run, survey=survey_name)
            add_run.metadata.from_dict(run_dict)
            add_run.write_metadata()

            ex_ts,ey_ts,bx_ts,by_ts,bz_ts = channel_data_extraction(group)   # use this run's channel files only

            ex = m.add_channel(station, run, "ex", "electric", ex_ts, survey=survey_name)
            ey = m.add_channel(station, run, "ey", "electric", ey_ts, survey=survey_name)
            bx = m.add_channel(station, run, "bx", "magnetic", bx_ts, survey=survey_name)
            by = m.add_channel(station, run, "by", "magnetic", by_ts, survey=survey_name)
            bz = m.add_channel(station, run, "bz", "magnetic", bz_ts, survey=survey_name)

            ex.metadata.from_dict(ex_dict)
            ex.write_metadata()
            ey.metadata.from_dict(ey_dict)
            ey.write_metadata()
            bx.metadata.from_dict(bx_dict)
            bx.write_metadata()
            by.metadata.from_dict(by_dict)
            by.write_metadata()
            bz.metadata.from_dict(bz_dict)
            bz.write_metadata()

    elif len(channels) < len(raw_data_channels):
        print('you are likely missing some channels')
        print(station)

    else:
        print('something has gone wrong')
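
channel_data_extraction above reads each channel file with read().splitlines() and then converts the strings to int32. For large files, numpy can parse the file directly; a hedged alternative sketch (assuming, as above, one integer sample per line):

import numpy as np

def read_channel(path):
    # Alternative reader: parse one int32 sample per line in a single call
    return np.loadtxt(path, dtype=np.int32)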

The final step is to create our mth5 files. We create one mth5 file per station, with the stations distributed across the MPI ranks (one station per rank at a time). As each process on each rank is independent, all ranks can run in parallel. Additionally, we can add compression to each file because we are not using the Parallel HDF5 library (which, in the version we used, does not support compression) that was required in the mth5_in_parallel tutorial. For this example, we will use “gzip” level 4 compression.

[ ]:
### create mth5 output directory and remove existing files

if rank==0:
    make_mth5_dir(mth5_output_directory)
    remove_existing_mth5_files(full_path_to_mth5_files)

comm.Barrier()

### create a single mth5 file per station with compression and associated mt_metadata

for i,station in enumerate(sorted(stations_all)):
    if i%size!=rank:
        continue
    m = MTH5(file_version='0.2.0',shuffle=None,fletcher32=None,compression="gzip",compression_opts=4)
    hdf5_filename = '{}.h5'.format(station)
    h5_fn = mth5_output_directory+'/'+hdf5_filename
    m.open_mth5(h5_fn, "w")
    survey_group = m.add_survey(survey_name)
    with open(survey_json, 'r') as json_file:
        json_load = json.load(json_file)
    survey_dict = json_load['survey']
    survey_group.metadata.from_dict(survey_dict)
    survey_group.write_metadata()
    mt_metadata_file_name = '{}.json'.format(station)
    mt_metadata = [file for file in full_path_to_mt_metadata if file.endswith(mt_metadata_file_name)]
    create_mth5_group_station_run_channel(station,mt_metadata)
    m.close_mth5()


comm.Barrier()

### print total time to run script
if rank==0:
    print('The script took {0} seconds !'.format(time.time()-startTime))
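
Once the files have been written, the gzip settings can be verified with h5py; each dataset exposes its filter through the .compression and .compression_opts attributes. A small sketch, using SA227.h5 as an example output file:

import h5py

# Walk one of the generated files and report the compression applied to each dataset
with h5py.File('/g/data/.../.../mth5_outdir_single_file_per_station/SA227.h5', 'r') as f:
    def report(name, obj):
        if isinstance(obj, h5py.Dataset):
            print(name, obj.compression, obj.compression_opts)
    f.visititems(report)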

Putting this all together into a Python script (mth5_onefileperstation.py):

[ ]:
from mpi4py import MPI
import h5py
from mth5.mth5 import MTH5
import numpy as np
from os import path
import os, psutil
import glob
import nc_time_axis
import time

from mt_metadata import timeseries as metadata
from mt_metadata.utils.mttime import MTime
import json

startTime = time.time()

### define MPI comm, rank and size

comm = MPI.COMM_WORLD
rank = MPI.COMM_WORLD.rank
size = MPI.COMM_WORLD.size



### define working directories and file paths

work_dir = '/g/data/.../.../merged_data_all'
mth5_output_directory = '/g/data/.../.../mth5_outdir_single_file_per_station'
full_path_to_mth5_files = sorted(glob.glob(mth5_output_directory+"/*"))
full_path_to_ascii_files = sorted(glob.glob(work_dir+"/*"))
mt_metadata_dir = '/g/data/.../.../mt_metadata_json'
full_path_to_mt_metadata = sorted(glob.glob(mt_metadata_dir+"/*"))
survey_file_name = 'survey.json'
survey_json = mt_metadata_dir+'/'+survey_file_name

### define raw time series data channels

raw_data_channels = ['EX','EY','BX','BY','BZ','TP','ambientTemperature']


### define stations to go into mth5 file

stations_all = ['SA225-2','SA227',   'SA242',  'SA243',  'SA245',
                'SA247',  'SA248',   'SA249',  'SA250',  'SA251',
                'SA252',  'SA26W-2', 'SA270',  'SA271',  'SA272',
                'SA273',  'SA274-2', 'SA274',  'SA275',  'SA276',
                'SA277',  'SA293-2', 'SA294',  'SA295',  'SA296',
                'SA297',  'SA298',   'SA300',  'SA301',  'SA319',
                'SA320',  'SA320-2', 'SA321',  'SA322',  'SA323',
                'SA324',  'SA325-2', 'SA325',  'SA326N', 'SA326S',
                'SA344',  'SA344-2', 'SA345',  'SA346',  'SA347',
                'SA348',  'SA349',   'SA350',  'SA351',             ### 49 single run SA stations
                'WA10',   'WA13',    'WA14',   'WA15',   'WA26',
                'WA27',   'WA29',    'WA30',   'WA31',   'WA42',
                'WA43',   'WA44',    'WA45',   'WA46',   'WA47',
                'WA54',   'WA55',    'WA56',   'WA57',   'WA58',
                'WA60',   'WA61',    'WA62',   'WA63',   'WA64',
                'WA65',   'WA66',    'WA67',   'WA68',   'WA69',
                'WA70',   'WA71',    'WA72',   'WA73',   'WA74',
                'WA75',   'WANT19',  'WANT38', 'WANT45', 'WASA302',
                'WASA327',                          ### 41 single run WA stations
                'SA246',  'SA299',   'SA324-2']     ### 3 stations with multiple runs


### define survey name and run number (for stations with a single run)

survey_name = "AusLAMP_Musgraves"
run_number = "001"


### define functions

def make_mth5_dir(mth5_output_directory):
### creates mth5 output directory if it doesn't already exist
    try:
        os.makedirs(mth5_output_directory)
    except FileExistsError:
        # directory already exists
        print('directory already exists!')
        pass


def remove_existing_mth5_files(mth5_files):
### removes existing mth5 files if they exist
    for mth5_file in mth5_files:
        if path.exists(mth5_file):
            os.unlink(mth5_file)
            print("INFO: Removed existing file {}".format(mth5_file))
        else:
            print("File does not exist")


def channel_data_extraction(channels):
### extracts electromagnetic time series data from concatenated (per run) Earth Data Logger ASCII time series files
    EX = [file for file in channels if file.endswith('EX')]
    EY = [file for file in channels if file.endswith('EY')]
    BX = [file for file in channels if file.endswith('BX')]
    BY = [file for file in channels if file.endswith('BY')]
    BZ = [file for file in channels if file.endswith('BZ')]
    with open(EX[0], 'r') as file:
        EX1 = file.read().splitlines()
        ex_ts = np.array(EX1).astype(np.int32)
    with open(EY[0], 'r') as file:
        EY1 = file.read().splitlines()
        ey_ts = np.array(EY1).astype(np.int32)
    with open(BX[0], 'r') as file:
        BX1 = file.read().splitlines()
        bx_ts = np.array(BX1).astype(np.int32)
    with open(BY[0], 'r') as file:
        BY1 = file.read().splitlines()
        by_ts = np.array(BY1).astype(np.int32)
    with open(BZ[0], 'r') as file:
        BZ1 = file.read().splitlines()
        bz_ts = np.array(BZ1).astype(np.int32)

    return ex_ts, ey_ts, bx_ts, by_ts, bz_ts


def create_mth5_group_station_run_channel(station,mt_metadata):
### creates the mth5 groups, stations, runs and channels for each mth5 file
### and populates mt_metadata from the relevant json file
    with open(mt_metadata[0], 'r') as json_file:
        json_load = json.load(json_file)

    station_dict = json_load['station']
    add_station = m.add_station(station, survey=survey_name)
    add_station.metadata.from_dict(station_dict)
    add_station.write_metadata()

    channels = []
    for file in full_path_to_ascii_files:
        if station in file:
            channels.append(file)
        else:
            continue
### for stations with a single run:
    if len(channels) == len(raw_data_channels):
        run_dict = json_load['run']
        ex_dict = json_load['electric_ex']
        ey_dict = json_load['electric_ey']
        bx_dict = json_load['magnetic_bx']
        by_dict = json_load['magnetic_by']
        bz_dict = json_load['magnetic_bz']

        add_run = m.add_run(station, run_number, survey=survey_name)
        add_run.metadata.from_dict(run_dict)
        add_run.write_metadata()

        ex_ts,ey_ts,bx_ts,by_ts,bz_ts = channel_data_extraction(channels)

        ex = m.add_channel(station, run_number, "ex", "electric", ex_ts, survey=survey_name)
        ey = m.add_channel(station, run_number, "ey", "electric", ey_ts, survey=survey_name)
        bx = m.add_channel(station, run_number, "bx", "magnetic", bx_ts, survey=survey_name)
        by = m.add_channel(station, run_number, "by", "magnetic", by_ts, survey=survey_name)
        bz = m.add_channel(station, run_number, "bz", "magnetic", bz_ts, survey=survey_name)

        ex.metadata.from_dict(ex_dict)
        ex.write_metadata()
        ey.metadata.from_dict(ey_dict)
        ey.write_metadata()
        bx.metadata.from_dict(bx_dict)
        bx.write_metadata()
        by.metadata.from_dict(by_dict)
        by.write_metadata()
        bz.metadata.from_dict(bz_dict)
        bz.write_metadata()

 ### for stations with multiple runs:
    elif len(channels) > len(raw_data_channels):
        sort_files = sorted(channels)
        number_of_channels = len(raw_data_channels)
        split_lists = [sort_files[x:x+number_of_channels] for x in range(0, len(sort_files), number_of_channels)]
        mt_metadata_files = sorted(mt_metadata)
        for i, (group,mt_meta) in enumerate(zip(split_lists,mt_metadata_files)):
            mrun_number = i+1
            run = "00%i" % mrun_number
            with open(mt_meta, 'r') as json_file:
                json_load = json.load(json_file)

            run_dict = json_load['run']
            ex_dict = json_load['electric_ex']
            ey_dict = json_load['electric_ey']
            bx_dict = json_load['magnetic_bx']
            by_dict = json_load['magnetic_by']
            bz_dict = json_load['magnetic_bz']

            add_run = m.add_run(station, run, survey=survey_name)
            add_run.metadata.from_dict(run_dict)
            add_run.write_metadata()

            ex_ts,ey_ts,bx_ts,by_ts,bz_ts = channel_data_extraction(group)   # use this run's channel files only

            ex = m.add_channel(station, run, "ex", "electric", ex_ts, survey=survey_name)
            ey = m.add_channel(station, run, "ey", "electric", ey_ts, survey=survey_name)
            bx = m.add_channel(station, run, "bx", "magnetic", bx_ts, survey=survey_name)
            by = m.add_channel(station, run, "by", "magnetic", by_ts, survey=survey_name)
            bz = m.add_channel(station, run, "bz", "magnetic", bz_ts, survey=survey_name)

            ex.metadata.from_dict(ex_dict)
            ex.write_metadata()
            ey.metadata.from_dict(ey_dict)
            ey.write_metadata()
            bx.metadata.from_dict(bx_dict)
            bx.write_metadata()
            by.metadata.from_dict(by_dict)
            by.write_metadata()
            bz.metadata.from_dict(bz_dict)
            bz.write_metadata()

    elif len(channels) < len(raw_data_channels):
        print('you are likely missing some channels')
        print(station)

    else:
        print('something has gone wrong')


### create mth5 output directory and remove existing files

if rank==0:
    make_mth5_dir(mth5_output_directory)
    remove_existing_mth5_files(full_path_to_mth5_files)

comm.Barrier()

### create a single mth5 file per station with compression and associated mt_metadata

for i,station in enumerate(sorted(stations_all)):
    if i%size!=rank:
        continue
    m = MTH5(file_version='0.2.0',shuffle=None,fletcher32=None,compression="gzip",compression_opts=4)
    hdf5_filename = '{}.h5'.format(station)
    h5_fn = mth5_output_directory+'/'+hdf5_filename
    m.open_mth5(h5_fn, "w")
    survey_group = m.add_survey(survey_name)
    with open(survey_json, 'r') as json_file:
        json_load = json.load(json_file)
    survey_dict = json_load['survey']
    survey_group.metadata.from_dict(survey_dict)
    survey_group.write_metadata()
    mt_metadata_file_name = '{}.json'.format(station)
    mt_metadata = [file for file in full_path_to_mt_metadata if file.endswith(mt_metadata_file_name)]
    create_mth5_group_station_run_channel(station,mt_metadata)
    m.close_mth5()


comm.Barrier()

### print total time to run script
if rank==0:
    print('The script took {0} seconds !'.format(time.time()-startTime))

Now that we have created our mth5_onefileperstation.py script, we next need to make a job submission script to submit to the Gadi PBSPro scheduler. The job submission script specifies the queue to use and the walltime and resources needed for the job.

#!/bin/bash

#PBS -N mth5_onefileperstation
#PBS -q hugemem
#PBS -P fp0
#PBS -l walltime=0:05:00
#PBS -l ncpus=96
#PBS -l mem=900GB
#PBS -l jobfs=10GB
#PBS -l storage=gdata/fp0+gdata/my80+gdata/lm70+gdata/up99

module use /g/data/up99/modulefiles
module load NCI-geophys/22.06

cd ${PBS_O_WORKDIR}

mpirun -np $PBS_NCPUS python3 mth5_onefileperstation.py > ./pbs_job_logs/$PBS_JOBID.log

For our jobscript above (mth5_onefileperstation.sh), we have requested:

1. the hugemem queue
2. 96 CPUs (2 nodes)
3. 900 GB of memory
4. 5 minutes of walltime

We also need to list, via the #PBS -l storage directive, the NCI project codes for the /g/data storage areas used by mth5_onefileperstation.py. The project code up99 is required to make use of the NCI-geophys/22.06 module that contains the Python libraries used in this tutorial.

To submit this jobscript (mth5_onefileperstation.sh) on Gadi:

$ qsub mth5_onefileperstation.sh
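
While the job is queued or running, its status can be checked with the standard PBS command:

$ qstat -u $USER

Once the job starts, the script's output is written to ./pbs_job_logs/$PBS_JOBID.log, as specified in the mpirun line of the jobscript.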

This job processes the 93 stations in our survey using 96 MPI ranks (one station per rank) and creates the following 93 mth5 files:

SA225-2.h5  SA252.h5    SA293-2.h5  SA320.h5    SA344.h5  WA15.h5  WA47.h5  WA65.h5  WANT19.h5
SA227.h5    SA26W-2.h5  SA294.h5    SA321.h5    SA345.h5  WA26.h5  WA54.h5  WA66.h5  WANT38.h5
SA242.h5    SA270.h5    SA295.h5    SA322.h5    SA346.h5  WA27.h5  WA55.h5  WA67.h5  WANT45.h5
SA243.h5    SA271.h5    SA296.h5    SA323.h5    SA347.h5  WA29.h5  WA56.h5  WA68.h5  WASA302.h5
SA245.h5    SA272.h5    SA297.h5    SA324-2.h5  SA348.h5  WA30.h5  WA57.h5  WA69.h5  WASA327.h5
SA246.h5    SA273.h5    SA298.h5    SA324.h5    SA349.h5  WA31.h5  WA58.h5  WA70.h5
SA247.h5    SA274-2.h5  SA299.h5    SA325-2.h5  SA350.h5  WA42.h5  WA60.h5  WA71.h5
SA248.h5    SA274.h5    SA300.h5    SA325.h5    SA351.h5  WA43.h5  WA61.h5  WA72.h5
SA249.h5    SA275.h5    SA301.h5    SA326N.h5   WA10.h5   WA44.h5  WA62.h5  WA73.h5
SA250.h5    SA276.h5    SA319.h5    SA326S.h5   WA13.h5   WA45.h5  WA63.h5  WA74.h5
SA251.h5    SA277.h5    SA320-2.h5  SA344-2.h5  WA14.h5   WA46.h5  WA64.h5  WA75.h5

Below is a summary of how long mth5_onefileperstation.py took to run for the AusLAMP Musgraves Province survey:

run on: Gadi
number of stations: 93
number of runs: 101
NCPUs used: 96 (2 nodes)
memory used: 791 GB
walltime used: 215 seconds (3m35s)
CPU time used: 18367 seconds (5h06m07s)
service units: 17.20
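
Assuming the hugemem charge rate of 3 SU per CPU-hour (worth confirming against current NCI documentation), the service unit figure is consistent with the resources used: 96 CPUs × (215 / 3600) hours × 3 SU per CPU-hour ≈ 17.2 SU.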

Concluding remarks

We have generated 93 mth5 files for the 93 stations of the Musgraves survey in 3 minutes and 35 seconds using 2 nodes (96 CPUs) on Gadi. For comparison, on a single CPU it would have taken approximately 5 hours to create these files.

In our mth5_in_parallel example, we generated a single mth5 file containing all stations, which took approximately 35 minutes of walltime (or ~14 hours of CPU time). The “single file per station” model was much quicker because it did not require the creation of an mth5 skeleton: each process (rank) ran independently.

We were also able to introduce compression into our mth5_onefileperstation.py code, and the total volume of the 93 mth5 files was 14 GB. The mth5_in_parallel.ipynb tutorial followed the “all stations in a single mth5 file” model; because our version of Parallel HDF5 did not support compression, the total volume of that single mth5 file was 106 GB.