# Streaming

Note

Data streaming is a novel backend and under active development. At the moment, the internal data format is still changing rapidly and is likely not compatible between releases of the openPMD-api.

The openPMD API includes a streaming-aware API as well as streaming-enabled backends (currently: ADIOS2).

Unlike in file-based backends, the order in which data is written matters in streaming-based backends. The streaming API publishes each iteration as one atomic step (compare the concept of steps in ADIOS2).
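
The step-wise publication described above can be pictured with a small toy model. This is only a sketch under stated assumptions: `ToyStream` and its methods are hypothetical names invented for illustration and are not part of openPMD-api or ADIOS2. It shows that data staged for an open step stays invisible to a reader until the step ends, and that steps arrive in publication order.

```python
from collections import deque


class ToyStream:
    """Hypothetical stand-in for a streaming engine (not an openPMD-api class)."""

    def __init__(self):
        self._published = deque()  # finished steps, in publication order
        self._staged = None        # (index, data) of the step being written

    def begin_step(self, index):
        if self._staged is not None:
            raise RuntimeError("the previous step is still open")
        self._staged = (index, {})

    def put(self, name, values):
        if self._staged is None:
            raise RuntimeError("no step is open")
        # Data is only staged here; readers cannot see it yet.
        self._staged[1][name] = list(values)

    def end_step(self):
        if self._staged is None:
            raise RuntimeError("no step is open")
        # The whole step becomes visible to readers at once.
        self._published.append(self._staged)
        self._staged = None

    def visible_indices(self):
        return [index for index, _ in self._published]

    def steps(self):
        # Readers consume steps strictly in publication order.
        while self._published:
            yield self._published.popleft()


def demo_steps():
    stream = ToyStream()
    stream.begin_step(0)
    stream.put("x", [1.0, 2.0])
    while_open = stream.visible_indices()   # nothing published yet
    stream.end_step()
    after_close = stream.visible_indices()  # step 0 is now visible

    stream.begin_step(100)
    stream.put("x", [3.0, 4.0])
    stream.end_step()

    received = [(index, data["x"]) for index, data in stream.steps()]
    return while_open, after_close, received


if __name__ == "__main__":
    print(demo_steps())
```

In this model an iteration corresponds to one `begin_step()` / `end_step()` pair, which is why the order of writing becomes observable on the reading side.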

## Reading

The reading end of the streaming API enforces further restrictions that become necessary through the nature of streaming. It can be used to read any kind of openPMD-compatible dataset, stream-based and filesystem-based alike.

### C++

The reading end of the streaming API is activated through use of Series::readIterations() instead of accessing the field Series::iterations directly. The returned object of type ReadIterations can be used in a C++11 range-based for loop to iterate over objects of type IndexedIteration. This class extends the Iteration class with a field IndexedIteration::iterationIndex, denoting this iteration’s index.

Users are encouraged to explicitly .close() the iteration after reading from it. Closing the iteration flushes all pending operations on that iteration. If an iteration has not been closed by the time the next iteration begins, it is closed automatically.

Note that a closed iteration cannot be reopened. This reflects the fact that in streaming mode, the data source may drop an iteration once the data sink has finished reading from it.

    // Set to true to build the example (requires openPMD-api with ADIOS2).
    #define BUILD_STREAMING_EXAMPLE false
    #if BUILD_STREAMING_EXAMPLE
    #include <openPMD/openPMD.hpp>

    #include <array>
    #include <cstddef>
    #include <iostream>
    #include <memory>
    #include <string>

    using std::cout;
    using namespace openPMD;

    int
    main()
    {
    #if openPMD_HAVE_ADIOS2
        using position_t = double;
        Series series = Series( "electrons.sst", Access::READ_ONLY );

        // readIterations() yields one iteration per streaming step
        for( IndexedIteration iteration : series.readIterations() )
        {
            std::cout << "Current iteration: " << iteration.iterationIndex
                      << std::endl;
            Record electronPositions = iteration.particles[ "e" ][ "position" ];
            std::array< std::shared_ptr< position_t >, 3 > loadedChunks;
            std::array< Extent, 3 > extents;
            std::array< std::string, 3 > const dimensions{ { "x", "y", "z" } };

            // enqueue the load operations; the buffers are filled on flush
            for( size_t i = 0; i < 3; ++i )
            {
                std::string dim = dimensions[ i ];
                RecordComponent rc = electronPositions[ dim ];
                loadedChunks[ i ] = rc.loadChunk< position_t >(
                    Offset( rc.getDimensionality(), 0 ), rc.getExtent() );
                extents[ i ] = rc.getExtent();
            }

            // closing flushes the pending loads; the chunks are valid afterwards
            iteration.close();

            for( size_t i = 0; i < 3; ++i )
            {
                std::string dim = dimensions[ i ];
                Extent const & extent = extents[ i ];
                std::cout << "\ndim: " << dim << "\n" << std::endl;
                auto chunk = loadedChunks[ i ];
                for( size_t j = 0; j < extent[ 0 ]; ++j )
                {
                    std::cout << chunk.get()[ j ] << ", ";
                }
                std::cout << "\n----------\n" << std::endl;
            }
        }

        return 0;
    #else
        std::cout << "The streaming example requires that openPMD has been built "
                     "with ADIOS2."
                  << std::endl;
        return 0;
    #endif
    }
    #else
    int main(){ return 0; }
    #endif


### Python

The reading end of the streaming API is activated through use of Series.read_iterations() instead of accessing the field Series.iterations directly. The returned object of type ReadIterations can be used in a Python for loop to iterate over objects of type IndexedIteration. This class extends the Iteration class with a field IndexedIteration.iteration_index, denoting this iteration’s index.

Users are encouraged to explicitly .close() the iteration after reading from it. Closing the iteration flushes all pending operations on that iteration. If an iteration has not been closed by the time the next iteration begins, it is closed automatically.

Note that a closed iteration cannot be reopened. This reflects the fact that in streaming mode, the data source may drop an iteration once the data sink has finished reading from it.

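    #!/usr/bin/env python
    import sys

    import openpmd_api as io

    # Set to True to run the example (requires a running SST writer).
    run_streaming_example = False

    if not run_streaming_example:
        sys.exit(0)

    if __name__ == "__main__":
        if 'adios2' not in io.variants or not io.variants['adios2']:
            print('This example requires ADIOS2')
            sys.exit(0)

        series = io.Series("stream.sst", io.Access_Type.read_only)

        # read_iterations() yields one iteration per streaming step
        for iteration in series.read_iterations():
            print("Current iteration {}".format(iteration.iteration_index))
            electronPositions = iteration.particles["e"]["position"]
            loadedChunks = []
            shapes = []
            dimensions = ["x", "y", "z"]

            # enqueue the load operations; the buffers are filled on flush
            for i in range(3):
                dim = dimensions[i]
                rc = electronPositions[dim]
                loadedChunks.append(rc.load_chunk([0], rc.shape))
                shapes.append(rc.shape)
            # closing flushes the pending loads
            iteration.close()

            for i in range(3):
                dim = dimensions[i]
                shape = shapes[i]
                print("dim: {}".format(dim))
                chunk = loadedChunks[i]
                print(chunk)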
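
The closing rules for reading can also be tried out without a streaming backend. The following is a toy model, not openPMD-api code: `ToyIteration` and the module-level `read_iterations()` function are hypothetical stand-ins written for this sketch. It assumes, as described above, that loads are deferred until the iteration is closed, that an iteration left open is closed when the next one begins, and that a closed iteration rejects further access.

```python
class ToyIteration:
    """Hypothetical model of one iteration (not the openPMD-api class)."""

    def __init__(self, iteration_index, payload):
        self.iteration_index = iteration_index
        self._payload = payload
        self._pending = []  # buffers waiting to be filled on close
        self.closed = False

    def load_chunk(self):
        if self.closed:
            raise RuntimeError(
                "iteration {} is closed".format(self.iteration_index))
        buffer = []  # stays empty until the iteration is closed
        self._pending.append(buffer)
        return buffer

    def close(self):
        if self.closed:
            return
        # Flush: fill every buffer that was requested earlier.
        for buffer in self._pending:
            buffer.extend(self._payload)
        self._pending.clear()
        # Closing is final: the source may now discard the data.
        self._payload = None
        self.closed = True


def read_iterations(source):
    """Yield iterations in order, closing the previous one automatically."""
    previous = None
    for index, payload in source:
        if previous is not None:
            previous.close()
        previous = ToyIteration(index, payload)
        yield previous
    if previous is not None:
        previous.close()


def demo_reading():
    source = [(0, [1.0, 2.0]), (10, [3.0, 4.0])]
    chunks, kept, empty_before_close = [], [], []
    for iteration in read_iterations(source):
        chunk = iteration.load_chunk()
        empty_before_close.append(chunk == [])
        chunks.append(chunk)
        kept.append(iteration)
        if iteration.iteration_index == 0:
            iteration.close()  # explicit close; iteration 10 is left open
    try:
        kept[0].load_chunk()
        error = None
    except RuntimeError as exc:
        error = str(exc)
    return empty_before_close, chunks, [it.closed for it in kept], error


if __name__ == "__main__":
    print(demo_reading())
```

Closing explicitly, as done for iteration 0, makes the point at which the data becomes usable visible in the code instead of leaving it to the start of the next loop pass.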


## Writing

The writing end of the streaming API enforces further restrictions that become necessary through the nature of streaming. It can be used to write any kind of openPMD-compatible dataset, stream-based and filesystem-based alike.

### C++

The writing end of the streaming API is activated through use of Series::writeIterations() instead of accessing the field Series::iterations directly. The returned object of type WriteIterations wraps the field Series::iterations, but exposes only a restricted subset of functionality. Using WriteIterations::operator[]( uint64_t ) will automatically open a streaming step for the corresponding iteration.

Users are encouraged to explicitly .close() the iteration after writing to it. Closing the iteration flushes all pending operations on that iteration. If an iteration has not been closed by the time the next iteration is accessed via WriteIterations::operator[]( uint64_t ), it is closed automatically.

Note that a closed iteration cannot be reopened. This reflects the fact that in streaming mode, an iteration is sent to the sink when it is closed, after which the data source can no longer modify it.

    // Set to true to build the example (requires openPMD-api with ADIOS2).
    #define BUILD_STREAMING_EXAMPLE false
    #if BUILD_STREAMING_EXAMPLE
    #include <openPMD/openPMD.hpp>

    #include <cstddef>
    #include <iostream>
    #include <memory>
    #include <numeric> // std::iota

    using std::cout;
    using namespace openPMD;

    int
    main()
    {
    #if openPMD_HAVE_ADIOS2
        using position_t = double;

        // open the stream for writing
        Series series = Series( "electrons.sst", Access::CREATE );

        Datatype datatype = determineDatatype< position_t >();
        constexpr unsigned long length = 10ul;
        Extent global_extent = { length };
        Dataset dataset = Dataset( datatype, global_extent );
        std::shared_ptr< position_t > local_data(
            new position_t[ length ],
            []( position_t const * ptr ) { delete[] ptr; } );

        WriteIterations iterations = series.writeIterations();
        for( size_t i = 0; i < 100; ++i )
        {
            // accessing the iteration opens a new streaming step
            Iteration iteration = iterations[ i ];
            Record electronPositions = iteration.particles[ "e" ][ "position" ];

            std::iota( local_data.get(), local_data.get() + length, i * length );
            for( auto const & dim : { "x", "y", "z" } )
            {
                RecordComponent pos = electronPositions[ dim ];
                pos.resetDataset( dataset );
                pos.storeChunk( local_data, Offset{ 0 }, global_extent );
            }
            // closing flushes the data and publishes the step
            iteration.close();
        }

        return 0;
    #else
        std::cout << "The streaming example requires that openPMD has been built "
                     "with ADIOS2."
                  << std::endl;
        return 0;
    #endif
    }
    #else
    int main(){ return 0; }
    #endif


### Python

The writing end of the streaming API is activated through use of Series.write_iterations() instead of accessing the field Series.iterations directly. The returned object of type WriteIterations wraps the field Series.iterations, but exposes only a restricted subset of functionality. Using WriteIterations.__getitem__(index) (i.e. the index operator series.write_iterations()[index]) will automatically open a streaming step for the corresponding iteration.

Users are encouraged to explicitly .close() the iteration after writing to it. Closing the iteration flushes all pending operations on that iteration. If an iteration has not been closed by the time the next iteration is accessed via WriteIterations.__getitem__(index), it is closed automatically.

Note that a closed iteration cannot be reopened. This reflects the fact that in streaming mode, an iteration is sent to the sink when it is closed, after which the data source can no longer modify it.

    #!/usr/bin/env python
    import sys

    import numpy as np
    import openpmd_api as io

    # Set to True to run the example (requires a reader on the other end).
    run_streaming_example = False

    if not run_streaming_example:
        sys.exit(0)

    if __name__ == "__main__":
        if 'adios2' not in io.variants or not io.variants['adios2']:
            print('This example requires ADIOS2')
            sys.exit(0)

        series = io.Series("stream.sst", io.Access_Type.create)
        datatype = np.dtype("double")
        length = 10
        global_extent = [length]
        dataset = io.Dataset(datatype, global_extent)

        iterations = series.write_iterations()
        for i in range(100):
            # accessing the iteration opens a new streaming step
            iteration = iterations[i]
            electronPositions = iteration.particles["e"]["position"]

            local_data = np.arange(i * length, (i + 1) * length, dtype=datatype)
            for dim in ["x", "y", "z"]:
                pos = electronPositions[dim]
                pos.reset_dataset(dataset)
                pos[()] = local_data
            # closing flushes the data and publishes the step
            iteration.close()
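
The writing rules above (opening a step on index access, automatic closing when the next index is accessed, no reopening) can be sketched with a toy container. `ToyWriteIterations`, its `sink` list and its `close_current()` method are hypothetical names made up for this illustration. As a simplification, iterations are plain dictionaries here, whereas the real API returns Iteration objects with their own .close() method.

```python
class ToyWriteIterations:
    """Hypothetical model of the writing rules (not the openPMD-api class)."""

    def __init__(self):
        self.sink = []          # (index, data) pairs already sent downstream
        self._open_index = None
        self._open_data = None
        self._closed = set()

    def __getitem__(self, index):
        if index in self._closed:
            raise RuntimeError(
                "iteration {} was closed and cannot be reopened".format(index))
        if self._open_index != index:
            # Accessing a new index closes the iteration that is still open.
            self.close_current()
            self._open_index = index
            self._open_data = {}
        return self._open_data

    def close_current(self):
        if self._open_index is None:
            return
        # On close, the iteration is handed to the sink and becomes final.
        self.sink.append((self._open_index, self._open_data))
        self._closed.add(self._open_index)
        self._open_index = None
        self._open_data = None


def demo_writing():
    iterations = ToyWriteIterations()
    iterations[0]["x"] = [0, 1, 2]
    iterations[1]["x"] = [3, 4, 5]  # implicitly closes iteration 0
    sent_after_switch = [index for index, _ in iterations.sink]
    iterations.close_current()      # explicit close of iteration 1
    try:
        iterations[0]
        reopened = True
    except RuntimeError:
        reopened = False
    sent_at_end = [index for index, _ in iterations.sink]
    return sent_after_switch, sent_at_end, reopened


if __name__ == "__main__":
    print(demo_writing())
```

In this model the sink receives iteration 0 only when iteration 1 is first accessed, which mirrors why an explicit close is recommended: it hands the data to the reader as early as possible.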