Parallel Examples
The following examples show parallel reading and writing of domain-decomposed data with MPI.
The Message Passing Interface (MPI) is an open communication standard for scientific computing. It is used on clusters, e.g. large-scale supercomputers, for communication between compute nodes, and it also provides parallel I/O primitives.
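All of the examples below follow the same lifecycle: initialize MPI, construct the Series with an MPI communicator (which selects the backend's MPI-parallel I/O path), register domain-decomposed chunk reads or writes, and close or destruct the Series before MPI_Finalize(). The following is a minimal sketch of that lifecycle in C++; the file name, iteration number and dataset layout are chosen only for illustration and do not come from the examples below.
#include <openPMD/openPMD.hpp>
#include <mpi.h>
#include <cstdint>
#include <vector>

int main(int argc, char *argv[])
{
    MPI_Init(&argc, &argv);
    {
        int mpi_rank, mpi_size;
        MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank);
        MPI_Comm_size(MPI_COMM_WORLD, &mpi_size);

        // passing a communicator selects the MPI-parallel I/O path
        openPMD::Series series(
            "data_%T.h5", openPMD::Access::CREATE, MPI_COMM_WORLD);

        // each rank contributes one element of a 1D dataset of length mpi_size
        series.iterations[0].open();
        auto E_x = series.iterations[0].meshes["E"]["x"];
        E_x.resetDataset(openPMD::Dataset(
            openPMD::Datatype::DOUBLE,
            {static_cast<std::uint64_t>(mpi_size)}));
        std::vector<double> cell{static_cast<double>(mpi_rank)};
        openPMD::Offset offset = {static_cast<std::uint64_t>(mpi_rank)};
        openPMD::Extent extent = {1};
        E_x.storeChunk(cell, offset, extent);
        series.iterations[0].close();

        // the Series must be closed or destructed before MPI_Finalize()
        series.close();
    }
    MPI_Finalize();
    return 0;
}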
Reading
C++
#include <openPMD/openPMD.hpp>
#include <mpi.h>
#include <cstddef>
#include <iostream>
#include <memory>
using std::cout;
using namespace openPMD;
int main(int argc, char *argv[])
{
MPI_Init(&argc, &argv);
int mpi_size;
int mpi_rank;
MPI_Comm_size(MPI_COMM_WORLD, &mpi_size);
MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank);
Series series = Series(
"../samples/git-sample/data%T.h5", Access::READ_ONLY, MPI_COMM_WORLD);
if (0 == mpi_rank)
cout << "Read a series in parallel with " << mpi_size << " MPI ranks\n";
MeshRecordComponent E_x = series.iterations[100].meshes["E"]["x"];
Offset chunk_offset = {static_cast<long unsigned int>(mpi_rank) + 1, 1, 1};
Extent chunk_extent = {2, 2, 1};
// If you know the datatype, use `loadChunk<double>(...)` instead.
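// loadChunkVariant() returns a std::variant over shared pointers of all
// supported datatypes; it is resolved via std::visit() further below.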
auto chunk_data = E_x.loadChunkVariant(chunk_offset, chunk_extent);
if (0 == mpi_rank)
cout << "Queued the loading of a single chunk per MPI rank from "
"disk, "
"ready to execute\n";
// The iteration can be closed in order to help free up resources.
// The iteration's content will be flushed automatically.
// An iteration once closed cannot (yet) be reopened.
series.iterations[100].close();
if (0 == mpi_rank)
cout << "Chunks have been read from disk\n";
for (int i = 0; i < mpi_size; ++i)
{
if (i == mpi_rank)
{
cout << "Rank " << mpi_rank << " - Read chunk contains:\n";
for (size_t row = 0; row < chunk_extent[0]; ++row)
{
for (size_t col = 0; col < chunk_extent[1]; ++col)
{
cout << "\t" << '(' << row + chunk_offset[0] << '|'
<< col + chunk_offset[1] << '|' << 1 << ")\t";
/*
* For hot loops, the std::visit(...) call should be moved
* further up.
*/
std::visit(
[row, col, &chunk_extent](auto &shared_ptr) {
cout << shared_ptr
.get()[row * chunk_extent[1] + col];
},
chunk_data);
}
cout << std::endl;
}
}
// this barrier is not necessary but structures the example output
MPI_Barrier(MPI_COMM_WORLD);
}
// The files in 'series' are still open until the series is closed, at which
// time it cleanly flushes and closes all open file handles.
// One can close the object explicitly to trigger this.
// Alternatively, this happens automatically once the 'Series' object is
// destructed, e.g. when it runs out of scope on return.
// In any case, this must happen before MPI_Finalize() is called.
series.close();
// openPMD::Series MUST be destructed or closed at this point
MPI_Finalize();
return 0;
}
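The comments in the example above point out two possible refinements: if the datatype is known at compile time, the typed loadChunk<double>(...) call avoids the variant entirely, and otherwise the std::visit(...) call can be hoisted out of the inner loops. The following is a sketch of how the corresponding part of the example could be written instead; it is not a complete program and reuses E_x, chunk_offset, chunk_extent and chunk_data from above.
// variant 1: datatype known at compile time, no std::variant involved
auto chunk = E_x.loadChunk<double>(chunk_offset, chunk_extent);
series.iterations[100].close(); // executes the queued read
double const *data = chunk.get();
// print / process data[row * chunk_extent[1] + col] as in the loop above

// variant 2: datatype unknown, but std::visit() hoisted out of the hot loop
std::visit(
    [&chunk_extent](auto &shared_ptr) {
        auto const *raw = shared_ptr.get();
        for (size_t row = 0; row < chunk_extent[0]; ++row)
        {
            for (size_t col = 0; col < chunk_extent[1]; ++col)
                cout << raw[row * chunk_extent[1] + col] << '\t';
            cout << '\n';
        }
    },
    chunk_data);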
Python
# IMPORTANT: include mpi4py FIRST
# https://mpi4py.readthedocs.io/en/stable/mpi4py.run.html
# on import: calls MPI_Init_thread()
# exit hook: calls MPI_Finalize()
from mpi4py import MPI
import openpmd_api as io
if __name__ == "__main__":
# also works with any other MPI communicator
comm = MPI.COMM_WORLD
series = io.Series(
"../samples/git-sample/data%T.h5",
io.Access.read_only,
comm
)
if 0 == comm.rank:
print("Read a series in parallel with {} MPI ranks".format(
comm.size))
E_x = series.iterations[100].meshes["E"]["x"]
chunk_offset = [comm.rank + 1, 1, 1]
chunk_extent = [2, 2, 1]
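# load_chunk() allocates a numpy array of matching shape and dtype and only
# registers the read; the values become valid after a flush, e.g. when the
# iteration is closed below.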
chunk_data = E_x.load_chunk(chunk_offset, chunk_extent)
if 0 == comm.rank:
print("Queued the loading of a single chunk per MPI rank from disk, "
"ready to execute")
# The iteration can be closed in order to help free up resources.
# The iteration's content will be flushed automatically.
# An iteration once closed cannot (yet) be reopened.
series.iterations[100].close()
if 0 == comm.rank:
print("Chunks have been read from disk")
for i in range(comm.size):
if i == comm.rank:
print("Rank {} - Read chunk contains:".format(i))
for row in range(chunk_extent[0]):
for col in range(chunk_extent[1]):
print("\t({}|{}|1)\t{:e}".format(
row + chunk_offset[0],
col + chunk_offset[1],
chunk_data[row, col, 0]
), end='')
print("")
# this barrier is not necessary but structures the example output
comm.Barrier()
# The files in 'series' are still open until the series is closed, at which
# time it cleanly flushes and closes all open file handles.
# One can close the object explicitly to trigger this.
# Alternatively, this will automatically happen once the garbage collector
# claims (every copy of) the series object.
# In any case, this must happen before MPI_Finalize() is called
# (usually in the mpi4py exit hook).
series.close()
Writing
C++
#include <openPMD/openPMD.hpp>
#include <mpi.h>
#include <iostream>
#include <memory>
#include <vector> // std::vector
using std::cout;
using namespace openPMD;
int main(int argc, char *argv[])
{
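// The HDF5 subfiling virtual file driver configured below requires MPI to be
// initialized with full thread support, hence MPI_Init_thread() with
// MPI_THREAD_MULTIPLE instead of plain MPI_Init().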
int provided;
MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
int mpi_size;
int mpi_rank;
MPI_Comm_size(MPI_COMM_WORLD, &mpi_size);
MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank);
// global data set to write: [MPI_Size * 10, 300]
// each rank writes a 10x300 slice with its MPI rank as values
auto const value = float(mpi_rank);
std::vector<float> local_data(10 * 300, value);
if (0 == mpi_rank)
cout << "Set up a 2D array with 10x300 elements per MPI rank ("
<< mpi_size << "x) that will be written to disk\n";
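// Backend-specific options can be passed to the Series as a JSON or TOML
// string; this TOML snippet selects the HDF5 subfiling virtual file driver
// and requests automatic dataset chunking.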
std::string subfiling_config = R"(
[hdf5.vfd]
type = "subfiling"
ioc_selection = "every_nth_rank"
stripe_size = 33554432
stripe_count = -1
[hdf5.dataset]
chunks = "auto"
)";
// open file for writing
Series series = Series(
"../samples/5_parallel_write.h5",
Access::CREATE,
MPI_COMM_WORLD,
subfiling_config);
if (0 == mpi_rank)
cout << "Created an empty series in parallel with " << mpi_size
<< " MPI ranks\n";
// In parallel contexts, it's important to explicitly open iterations.
// You can either explicitly access Series::iterations and use
// Iteration::open() afterwards, or use `Series::writeIterations()`,
// or in read mode `Series::readIterations()` where iterations are opened
// automatically.
// `Series::writeIterations()` and `Series::readIterations()` are
// intentionally restricted APIs that ensure a workflow which also works
// in streaming setups, e.g. an iteration cannot be opened again once
// it has been closed.
series.iterations[1].open();
Mesh mymesh = series.iterations[1].meshes["mymesh"];
// example 1D domain decomposition in first index
Datatype datatype = determineDatatype<float>();
Extent global_extent = {10ul * mpi_size, 300};
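// Options can also be set per dataset; this TOML snippet overrides the
// HDF5 chunking for this particular dataset.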
Dataset dataset = Dataset(datatype, global_extent, R"(
[hdf5.dataset]
chunks = [10, 100]
)");
if (0 == mpi_rank)
cout << "Prepared a Dataset of size " << dataset.extent[0] << "x"
<< dataset.extent[1] << " and Datatype " << dataset.dtype << '\n';
mymesh.resetDataset(dataset);
if (0 == mpi_rank)
cout << "Set the global Dataset properties for the scalar field "
"mymesh in iteration 1\n";
// example shows a 1D domain decomposition in first index
Offset chunk_offset = {10ul * mpi_rank, 0};
Extent chunk_extent = {10, 300};
mymesh.storeChunk(local_data, chunk_offset, chunk_extent);
if (0 == mpi_rank)
cout << "Registered a single chunk per MPI rank containing its "
"contribution, "
"ready to write content to disk\n";
// The iteration can be closed in order to help free up resources.
// The iteration's content will be flushed automatically.
// An iteration once closed cannot (yet) be reopened.
series.iterations[1].close();
if (0 == mpi_rank)
cout << "Dataset content has been fully written to disk\n";
/* The files in 'series' are still open until the object is destroyed, at
 * which point it cleanly flushes and closes all open file handles.
* When running out of scope on return, the 'Series' destructor is called.
* Alternatively, one can call `series.close()` to the same effect as
* calling the destructor, including the release of file handles.
*/
series.close();
// openPMD::Series MUST be destructed or closed at this point
MPI_Finalize();
return 0;
}
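The example uses a uniform decomposition, so each rank's offset is simply 10ul * mpi_rank. When local sizes differ per rank, offsets and the global extent are typically derived with a prefix sum over the local sizes. The following is a hypothetical fragment illustrating this; local_rows, row_offset and global_rows are made-up names, and it reuses mpi_rank and assumes MPI is initialized as in the example above.
// rows owned by this rank: any per-rank value works here
unsigned long local_rows = 10ul + static_cast<unsigned long>(mpi_rank);

// exclusive prefix sum over local sizes gives this rank's starting row
unsigned long row_offset = 0ul;
MPI_Exscan(
    &local_rows, &row_offset, 1, MPI_UNSIGNED_LONG, MPI_SUM, MPI_COMM_WORLD);
if (mpi_rank == 0)
    row_offset = 0ul; // MPI_Exscan leaves the result undefined on rank 0

// the sum of all local sizes defines the global dataset extent
unsigned long global_rows = 0ul;
MPI_Allreduce(
    &local_rows, &global_rows, 1, MPI_UNSIGNED_LONG, MPI_SUM, MPI_COMM_WORLD);

// these values would then replace the hard-coded ones of the example:
// Extent global_extent = {global_rows, 300};
// Offset chunk_offset = {row_offset, 0};
// Extent chunk_extent = {local_rows, 300};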
Python
# IMPORTANT: include mpi4py FIRST
# https://mpi4py.readthedocs.io/en/stable/mpi4py.run.html
# on import: calls MPI_Init_thread()
# exit hook: calls MPI_Finalize()
from mpi4py import MPI
import numpy as np
import openpmd_api as io
try:
import adios2
from packaging import version
USE_JOINED_DIMENSION = \
version.parse(adios2.__version__) >= version.parse('2.9.0')
except ImportError:
USE_JOINED_DIMENSION = False
if __name__ == "__main__":
# also works with any other MPI communicator
comm = MPI.COMM_WORLD
# global data set to write: [MPI_Size * 10, 300]
# each rank writes a 10x300 slice with its MPI rank as values
local_value = comm.rank
local_data = np.ones(10 * 300,
dtype=np.double).reshape(10, 300) * local_value
if 0 == comm.rank:
print("Set up a 2D array with 10x300 elements per MPI rank ({}x) "
"that will be written to disk".format(comm.size))
# open file for writing
series = io.Series(
"../samples/5_parallel_write_py.bp"
if USE_JOINED_DIMENSION
else "../samples/5_parallel_write_py.h5",
io.Access.create,
comm
)
if 0 == comm.rank:
print("Created an empty series in parallel with {} MPI ranks".format(
comm.size))
# In parallel contexts, it's important to explicitly open iterations.
# This is done automatically when using `Series.write_iterations()`,
# or in read mode `Series.read_iterations()`.
#
# `Series.write_iterations()` and `Series.read_iterations()` are
# intentionally restricted APIs that ensure a workflow which also works
# in streaming setups, e.g. an iteration cannot be opened again once
# it has been closed.
# `Series.iterations` can be directly accessed in random-access workflows.
series.iterations[1].open()
mymesh = series.iterations[1]. \
meshes["mymesh"]
# example 1D domain decomposition in first index
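# With ADIOS2 >= 2.9, the first axis can be declared as a "joined" dimension:
# each rank then stores its chunk without specifying a global offset, and the
# backend concatenates all contributions along that axis.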
global_extent = [io.Dataset.JOINED_DIMENSION, 300] \
if USE_JOINED_DIMENSION else [comm.size * 10, 300]
dataset = io.Dataset(local_data.dtype, global_extent)
if 0 == comm.rank:
print("Prepared a Dataset of size {} and Datatype {}".format(
dataset.extent, dataset.dtype))
mymesh.reset_dataset(dataset)
if 0 == comm.rank:
print("Set the global Dataset properties for the scalar field "
"mymesh in iteration 1")
# example shows a 1D domain decomposition in first index
if USE_JOINED_DIMENSION:
# explicit API
# mymesh.store_chunk(local_data, [], [10, 300])
mymesh[:, :] = local_data
# or short:
# mymesh[()] = local_data
else:
mymesh[comm.rank*10:(comm.rank+1)*10, :] = local_data
if 0 == comm.rank:
print("Registered a single chunk per MPI rank containing its "
"contribution, ready to write content to disk")
# The iteration can be closed in order to help free up resources.
# The iteration's content will be flushed automatically.
# An iteration once closed cannot (yet) be reopened.
series.iterations[1].close()
if 0 == comm.rank:
print("Dataset content has been fully written to disk")
# The files in 'series' are still open until the series is closed, at which
# time it cleanly flushes and closes all open file handles.
# One can close the object explicitly to trigger this.
# Alternatively, this will automatically happen once the garbage collector
# claims (every copy of) the series object.
series.close()