Parallel Examples

The following examples show parallel reading and writing of domain-decomposed data with MPI.

The Message Passing Interface (MPI) is an open communication standard for scientific computing. MPI is used on clusters, e.g. large-scale supercomputers, to communicate between nodes and provides parallel I/O primitives.

Reading

C++

#include <openPMD/openPMD.hpp>

#include <mpi.h>

#include <cstddef>
#include <iostream>
#include <memory>

using std::cout;
using namespace openPMD;

int main(int argc, char *argv[])
{
    MPI_Init(&argc, &argv);

    int mpi_size;
    int mpi_rank;

    MPI_Comm_size(MPI_COMM_WORLD, &mpi_size);
    MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank);

    /* note: this scope is intentional to destruct the openPMD::Series object
     *       prior to MPI_Finalize();
     */
    {
        Series series = Series(
            "../samples/git-sample/data%T.h5",
            Access::READ_ONLY,
            MPI_COMM_WORLD);
        if (0 == mpi_rank)
            cout << "Read a series in parallel with " << mpi_size
                 << " MPI ranks\n";

        MeshRecordComponent E_x = series.iterations[100].meshes["E"]["x"];

        Offset chunk_offset = {
            static_cast<long unsigned int>(mpi_rank) + 1, 1, 1};
        Extent chunk_extent = {2, 2, 1};

        auto chunk_data = E_x.loadChunk<double>(chunk_offset, chunk_extent);

        if (0 == mpi_rank)
            cout << "Queued the loading of a single chunk per MPI rank from "
                    "disk, "
                    "ready to execute\n";
        series.flush();

        if (0 == mpi_rank)
            cout << "Chunks have been read from disk\n";

        for (int i = 0; i < mpi_size; ++i)
        {
            if (i == mpi_rank)
            {
                cout << "Rank " << mpi_rank << " - Read chunk contains:\n";
                for (size_t row = 0; row < chunk_extent[0]; ++row)
                {
                    for (size_t col = 0; col < chunk_extent[1]; ++col)
                        cout << "\t" << '(' << row + chunk_offset[0] << '|'
                             << col + chunk_offset[1] << '|' << 1 << ")\t"
                             << chunk_data.get()[row * chunk_extent[1] + col];
                    cout << std::endl;
                }
            }

            // this barrier is not necessary but structures the example output
            MPI_Barrier(MPI_COMM_WORLD);
        }
    }

    // openPMD::Series MUST be destructed at this point
    MPI_Finalize();

    return 0;
}

Python

# IMPORTANT: include mpi4py FIRST
# https://mpi4py.readthedocs.io/en/stable/mpi4py.run.html
# on import: calls MPI_Init_thread()
# exit hook: calls MPI_Finalize()
from mpi4py import MPI
import openpmd_api as io

if __name__ == "__main__":
    # also works with any other MPI communicator
    comm = MPI.COMM_WORLD

    series = io.Series(
        "../samples/git-sample/data%T.h5",
        io.Access.read_only,
        comm
    )
    if 0 == comm.rank:
        print("Read a series in parallel with {} MPI ranks".format(
              comm.size))

    E_x = series.iterations[100].meshes["E"]["x"]

    chunk_offset = [comm.rank + 1, 1, 1]
    chunk_extent = [2, 2, 1]

    chunk_data = E_x.load_chunk(chunk_offset, chunk_extent)

    if 0 == comm.rank:
        print("Queued the loading of a single chunk per MPI rank from disk, "
              "ready to execute")
    series.flush()

    if 0 == comm.rank:
        print("Chunks have been read from disk")

    for i in range(comm.size):
        if i == comm.rank:
            print("Rank {} - Read chunk contains:".format(i))
            for row in range(chunk_extent[0]):
                for col in range(chunk_extent[1]):
                    print("\t({}|{}|1)\t{:e}".format(
                        row + chunk_offset[0],
                        col + chunk_offset[1],
                        chunk_data[row, col, 0]
                    ), end='')
                print("")

        # this barrier is not necessary but structures the example output
        comm.Barrier()

    # The files in 'series' are still open until the object is destroyed, on
    # which it cleanly flushes and closes all open file handles.
    # One can delete the object explicitly (or let it run out of scope) to
    # trigger this.
    # In any case, this must happen before MPI_Finalize() is called
    # (usually in the mpi4py exit hook).
    del series

Writing

C++

#include <openPMD/openPMD.hpp>

#include <mpi.h>

#include <iostream>
#include <memory>
#include <vector> // std::vector

using std::cout;
using namespace openPMD;

int main(int argc, char *argv[])
{
    MPI_Init(&argc, &argv);

    int mpi_size;
    int mpi_rank;

    MPI_Comm_size(MPI_COMM_WORLD, &mpi_size);
    MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank);

    /* note: this scope is intentional to destruct the openPMD::Series object
     *       prior to MPI_Finalize();
     */
    {
        // global data set to write: [MPI_Size * 10, 300]
        // each rank writes a 10x300 slice with its MPI rank as values
        auto const value = float(mpi_size);
        std::vector<float> local_data(10 * 300, value);
        if (0 == mpi_rank)
            cout << "Set up a 2D array with 10x300 elements per MPI rank ("
                 << mpi_size << "x) that will be written to disk\n";

        // open file for writing
        Series series = Series(
            "../samples/5_parallel_write.h5", Access::CREATE, MPI_COMM_WORLD);
        if (0 == mpi_rank)
            cout << "Created an empty series in parallel with " << mpi_size
                 << " MPI ranks\n";

        MeshRecordComponent mymesh =
            series.iterations[1].meshes["mymesh"][MeshRecordComponent::SCALAR];

        // example 1D domain decomposition in first index
        Datatype datatype = determineDatatype<float>();
        Extent global_extent = {10ul * mpi_size, 300};
        Dataset dataset = Dataset(datatype, global_extent);

        if (0 == mpi_rank)
            cout << "Prepared a Dataset of size " << dataset.extent[0] << "x"
                 << dataset.extent[1] << " and Datatype " << dataset.dtype
                 << '\n';

        mymesh.resetDataset(dataset);
        if (0 == mpi_rank)
            cout << "Set the global Dataset properties for the scalar field "
                    "mymesh in iteration 1\n";

        // example shows a 1D domain decomposition in first index
        Offset chunk_offset = {10ul * mpi_rank, 0};
        Extent chunk_extent = {10, 300};
        mymesh.storeChunk(local_data, chunk_offset, chunk_extent);
        if (0 == mpi_rank)
            cout << "Registered a single chunk per MPI rank containing its "
                    "contribution, "
                    "ready to write content to disk\n";

        series.flush();
        if (0 == mpi_rank)
            cout << "Dataset content has been fully written to disk\n";
    }

    // openPMD::Series MUST be destructed at this point
    MPI_Finalize();

    return 0;
}

Python

# IMPORTANT: include mpi4py FIRST
# https://mpi4py.readthedocs.io/en/stable/mpi4py.run.html
# on import: calls MPI_Init_thread()
# exit hook: calls MPI_Finalize()
from mpi4py import MPI
import numpy as np
import openpmd_api as io

if __name__ == "__main__":
    # also works with any other MPI communicator
    comm = MPI.COMM_WORLD

    # global data set to write: [MPI_Size * 10, 300]
    # each rank writes a 10x300 slice with its MPI rank as values
    local_value = comm.size
    local_data = np.ones(10 * 300,
                         dtype=np.double).reshape(10, 300) * local_value
    if 0 == comm.rank:
        print("Set up a 2D array with 10x300 elements per MPI rank ({}x) "
              "that will be written to disk".format(comm.size))

    # open file for writing
    series = io.Series(
        "../samples/5_parallel_write_py.h5",
        io.Access.create,
        comm
    )
    if 0 == comm.rank:
        print("Created an empty series in parallel with {} MPI ranks".format(
              comm.size))

    mymesh = series.iterations[1]. \
        meshes["mymesh"][io.Mesh_Record_Component.SCALAR]

    # example 1D domain decomposition in first index
    global_extent = [comm.size * 10, 300]
    dataset = io.Dataset(local_data.dtype, global_extent)

    if 0 == comm.rank:
        print("Prepared a Dataset of size {} and Datatype {}".format(
              dataset.extent, dataset.dtype))

    mymesh.reset_dataset(dataset)
    if 0 == comm.rank:
        print("Set the global Dataset properties for the scalar field "
              "mymesh in iteration 1")

    # example shows a 1D domain decomposition in first index
    mymesh[comm.rank*10:(comm.rank+1)*10, :] = local_data
    if 0 == comm.rank:
        print("Registered a single chunk per MPI rank containing its "
              "contribution, ready to write content to disk")

    series.flush()
    if 0 == comm.rank:
        print("Dataset content has been fully written to disk")

    # The files in 'series' are still open until the object is destroyed, on
    # which it cleanly flushes and closes all open file handles.
    # One can delete the object explicitly (or let it run out of scope) to
    # trigger this.
    del series