Parallel API

The following examples show parallel reading and writing of domain-decomposed data with MPI.

Reading

#include <openPMD/openPMD.hpp>

#include <mpi.h>

#include <iostream>
#include <memory>
#include <cstddef>


using std::cout;
using namespace openPMD;

int main(int argc, char *argv[])
{
    MPI_Init(&argc, &argv);

    int mpi_size;
    int mpi_rank;

    MPI_Comm_size(MPI_COMM_WORLD, &mpi_size);
    MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank);


    /* note: this scope is intentional so that the openPMD::Series object
     *       is destructed before MPI_Finalize() is called
     */
    {
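        // open the series with an MPI communicator to enable parallel I/O;
        // all ranks pass the same file name and the same communicator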
        Series series = Series(
            "../samples/git-sample/data%T.h5",
            AccessType::READ_ONLY,
            MPI_COMM_WORLD
        );
        if( 0 == mpi_rank )
            cout << "Read a series in parallel with " << mpi_size << " MPI ranks\n";

        MeshRecordComponent E_x = series.iterations[100].meshes["E"]["x"];

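        // each rank selects its own 2 x 2 x 1 chunk,
        // offset along the first axis by its rank number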
        Offset chunk_offset = {
            static_cast< long unsigned int >(mpi_rank) + 1,
            1,
            1
        };
        Extent chunk_extent = {2, 2, 1};

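        // loadChunk allocates the target buffer and only enqueues the read;
        // the values are available after the next series.flush()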
        auto chunk_data = E_x.loadChunk<double>(chunk_offset, chunk_extent);

        if( 0 == mpi_rank )
            cout << "Queued the loading of a single chunk per MPI rank from disk, "
                    "ready to execute\n";
        series.flush();

        if( 0 == mpi_rank )
            cout << "Chunks have been read from disk\n";

        for( int i = 0; i < mpi_size; ++i )
        {
            if( i == mpi_rank )
            {
                cout << "Rank " << mpi_rank << " - Read chunk contains:\n";
                for( size_t row = 0; row < chunk_extent[0]; ++row )
                {
                    for( size_t col = 0; col < chunk_extent[1]; ++col )
                        cout << "\t"
                             << '(' << row + chunk_offset[0] << '|' << col + chunk_offset[1] << '|' << 1 << ")\t"
                             << chunk_data.get()[row*chunk_extent[1]+col];
                    cout << std::endl;
                }
            }

            // this barrier is not necessary but structures the example output
            MPI_Barrier(MPI_COMM_WORLD);
        }
    }

    // openPMD::Series MUST be destructed at this point
    MPI_Finalize();

    return 0;
}
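
The chunk selection above is hard-coded to a 2 x 2 x 1 block per rank. In practice, the per-rank offset and extent are usually derived from the dataset's global extent. The fragment below is a minimal sketch of such a decomposition along the slowest-varying (first) axis; it is meant to replace the hard-coded chunk_offset and chunk_extent inside the scope of the example above and assumes, for simplicity, that mpi_size evenly divides the first dimension of E_x.

Extent global_extent = E_x.getExtent();

// split the slowest-varying axis evenly across ranks;
// all other axes are read in full by every rank
Extent chunk_extent = global_extent;
chunk_extent[0] = global_extent[0] / mpi_size;

Offset chunk_offset(global_extent.size(), 0);
chunk_offset[0] = chunk_extent[0] * mpi_rank;

auto chunk_data = E_x.loadChunk<double>(chunk_offset, chunk_extent);
series.flush();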

Writing

#include <openPMD/openPMD.hpp>

#include <mpi.h>

#include <iostream>
#include <memory>


using std::cout;
using namespace openPMD;

int main(int argc, char *argv[])
{
    MPI_Init(&argc, &argv);

    int mpi_size;
    int mpi_rank;

    MPI_Comm_size(MPI_COMM_WORLD, &mpi_size);
    MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank);

    /* note: this scope is intentional so that the openPMD::Series object
     *       is destructed before MPI_Finalize() is called
     */
    {
        // allocate a data set to write
        std::shared_ptr< double > global_data(new double[mpi_size], [](double *p) { delete[] p; });
        for( int i = 0; i < mpi_size; ++i )
            global_data.get()[i] = i;
        if( 0 == mpi_rank )
            cout << "Set up a 1D array with one element per MPI rank (" << mpi_size
                 << ") that will be written to disk\n";

        std::shared_ptr< double > local_data{new double};
        *local_data = global_data.get()[mpi_rank];
        if( 0 == mpi_rank )
            cout << "Set up a 1D array with one element, representing the view of the MPI rank\n";

        // open file for writing
        Series series = Series(
            "../samples/5_parallel_write.h5",
            AccessType::CREATE,
            MPI_COMM_WORLD
        );
        if( 0 == mpi_rank )
          cout << "Created an empty series in parallel with "
               << mpi_size << " MPI ranks\n";

        MeshRecordComponent id =
            series
                .iterations[1]
                .meshes["id"][MeshRecordComponent::SCALAR];

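        // every rank declares the same datatype and global extent for the dataset;
        // determineDatatype infers the openPMD Datatype from the pointer's element type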
        Datatype datatype = determineDatatype(local_data);
        Extent dataset_extent = {static_cast< long unsigned int >(mpi_size)};
        Dataset dataset = Dataset(datatype, dataset_extent);

        if( 0 == mpi_rank )
            cout << "Created a Dataset of size " << dataset.extent[0]
                 << " and Datatype " << dataset.dtype << '\n';

        id.resetDataset(dataset);
        if( 0 == mpi_rank )
            cout << "Set the global on-disk Dataset properties for the scalar field id in iteration 1\n";

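        // flush() is called by all ranks and creates the file structure collectively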
        series.flush();
        if( 0 == mpi_rank )
            cout << "File structure has been written to disk\n";

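        // each rank contributes one element at the index given by its rank;
        // the buffer behind local_data must stay valid until the next flush()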
        Offset chunk_offset = {static_cast< long unsigned int >(mpi_rank)};
        Extent chunk_extent = {1};
        id.storeChunk(chunk_offset, chunk_extent, local_data);
        if( 0 == mpi_rank )
            cout << "Stored a single chunk per MPI rank containing its contribution, "
                    "ready to write content to disk\n";

        series.flush();
        if( 0 == mpi_rank )
            cout << "Dataset content has been fully written to disk\n";
    }

    // openPMD::Series MUST be destructed at this point
    MPI_Finalize();

    return 0;
}
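
To verify the output, the file can be read back afterwards, for example serially. The following sketch is not part of the original example; it assumes that ../samples/5_parallel_write.h5 was produced by the program above and uses the same AccessType-based API as the examples on this page.

#include <openPMD/openPMD.hpp>

#include <iostream>
#include <cstddef>


using namespace openPMD;

int main()
{
    Series series = Series(
        "../samples/5_parallel_write.h5",
        AccessType::READ_ONLY
    );

    MeshRecordComponent id =
        series.iterations[1].meshes["id"][MeshRecordComponent::SCALAR];

    Extent extent = id.getExtent();
    auto data = id.loadChunk<double>(Offset{0}, extent);
    series.flush();

    // expect the values 0, 1, ..., N-1: one element per writing rank
    for( size_t row = 0; row < extent[0]; ++row )
        std::cout << "id[" << row << "] = " << data.get()[row] << '\n';

    return 0;
}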