openPMD-api
ADIOS2File.hpp
1 /* Copyright 2023 Franz Poeschel
2  *
3  * This file is part of openPMD-api.
4  *
5  * openPMD-api is free software: you can redistribute it and/or modify
6  * it under the terms of of either the GNU General Public License or
7  * the GNU Lesser General Public License as published by
8  * the Free Software Foundation, either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * openPMD-api is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License and the GNU Lesser General Public License
15  * for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * and the GNU Lesser General Public License along with openPMD-api.
19  * If not, see <http://www.gnu.org/licenses/>.
20  */
21 #pragma once
22 
23 #include "openPMD/IO/ADIOS/ADIOS2Auxiliary.hpp"
24 #include "openPMD/IO/AbstractIOHandler.hpp"
25 #include "openPMD/IO/IOTask.hpp"
26 #include "openPMD/IO/InvalidatableFile.hpp"
27 #include "openPMD/config.hpp"
28 
29 #if openPMD_HAVE_ADIOS2
30 #include <adios2.h>
31 #endif
32 #if openPMD_HAVE_MPI
33 #include <mpi.h>
34 #endif
35 
36 #include <functional>
37 #include <string>
38 
39 namespace openPMD
40 {
41 class ADIOS2IOHandlerImpl;
42 }
43 
44 #if openPMD_HAVE_ADIOS2
45 namespace openPMD::detail
46 {
47 class ADIOS2File;
48 
49 /*
50  * IO-heavy action to be executed upon flushing.
51  */
53 {
54  explicit BufferedAction() = default;
55  virtual ~BufferedAction() = default;
56 
57  BufferedAction(BufferedAction const &other) = delete;
58  BufferedAction(BufferedAction &&other) = default;
59 
60  BufferedAction &operator=(BufferedAction const &other) = delete;
61  BufferedAction &operator=(BufferedAction &&other) = default;
62 
63  virtual void run(ADIOS2File &) = 0;
64 };
65 
67 {
68  std::string name;
70 
71  void run(ADIOS2File &) override;
72 };
73 
75 {
76  template <typename T>
77  static void call(
78  ADIOS2IOHandlerImpl *impl,
79  BufferedGet &bp,
80  adios2::IO &IO,
81  adios2::Engine &engine,
82  std::string const &fileName);
83 
84  static constexpr char const *errorMsg = "ADIOS2: readDataset()";
85 };
86 
88 {
89  std::string name;
91 
92  void run(ADIOS2File &) override;
93 };
94 
96 {
97  template <typename T>
98  static void call(ADIOS2File &ba, BufferedPut &bp);
99 
100  template <int n, typename... Params>
101  static void call(Params &&...);
102 };
103 
105 {
106  std::string name;
107  Offset offset;
108  Extent extent;
110  Datatype dtype = Datatype::UNDEFINED;
111 
112  void run(ADIOS2File &);
113 };
114 
116 {
117  virtual void *update() = 0;
118  virtual ~I_UpdateSpan() = default;
119 };
120 
121 template <typename T>
123 {
124  adios2::detail::Span<T> span;
125 
126  UpdateSpan(adios2::detail::Span<T>);
127 
128  void *update() override;
129 };
130 
131 /*
132  * Manages per-file information about
133  * (1) the file's IO and Engine objects
134  * (2) the file's deferred IO-heavy actions
135  */
137 {
138  friend struct BufferedGet;
139  friend struct BufferedPut;
140  friend struct RunUniquePtrPut;
141  friend struct WriteDataset;
142 
143  using UseGroupTable = adios_defs::UseGroupTable;
144  using FlushTarget = adios_defs::FlushTarget;
145 
146 public:
147  ADIOS2File(ADIOS2File const &) = delete;
148 
156  std::string m_file;
175  std::string m_IOName;
176  adios2::ADIOS &m_ADIOS;
177  adios2::IO m_IO;
182  std::vector<std::unique_ptr<BufferedAction>> m_buffer;
189  std::vector<BufferedUniquePtrPut> m_uniquePtrPuts;
196  std::vector<std::unique_ptr<BufferedAction>> m_alreadyEnqueued;
197  adios2::Mode m_mode;
206  std::map<unsigned, std::unique_ptr<I_UpdateSpan>> m_updateSpans;
207 
208  /*
209  * We call an attribute committed if the step during which it was
210  * written has been closed.
211  * A committed attribute cannot be modified.
212  */
213  std::set<std::string> uncommittedAttributes;
214 
215  /*
216  * The openPMD API will generally create new attributes for each
217  * iteration. This results in a growing number of attributes over time.
218  * In streaming-based modes, these will be completely sent anew in each
219  * iteration. If the following boolean is true, old attributes will be
220  * removed upon CLOSE_GROUP.
221  * Should not be set to true in persistent backends.
222  * Will be automatically set by ADIOS2File::configure_IO depending
223  * on chosen ADIOS2 engine and can not be explicitly overridden by user.
224  */
225  bool optimizeAttributesStreaming = false;
226 
227  using ParsePreference = Parameter<Operation::OPEN_FILE>::ParsePreference;
228  ParsePreference parsePreference = ParsePreference::UpFront;
229 
230  using AttributeMap_t = std::map<std::string, adios2::Params>;
231 
233 
234  ~ADIOS2File();
235 
240  void finalize();
241 
242  UseGroupTable detectGroupTable();
243 
244  adios2::Engine &getEngine();
245 
246  template <typename BA>
247  void enqueue(BA &&ba)
248  {
249  enqueue<BA>(std::forward<BA>(ba), m_buffer);
250  }
251 
252  template <typename BA>
253  void enqueue(BA &&ba, decltype(m_buffer) &buffer)
254  {
255  using BA_ = typename std::remove_reference<BA>::type;
256  buffer.emplace_back(
257  std::unique_ptr<BufferedAction>(new BA_(std::forward<BA>(ba))));
258  }
259 
260  template <typename... Args>
261  void flush(Args &&...args);
262 
264  {
265  /*
266  * Only execute performPutsGets if UserFlush.
267  */
268  FlushLevel level;
269  FlushTarget flushTarget = FlushTarget::Disk;
270 
271  ADIOS2FlushParams(FlushLevel level_in) : level(level_in)
272  {}
273 
274  ADIOS2FlushParams(FlushLevel level_in, FlushTarget flushTarget_in)
275  : level(level_in), flushTarget(flushTarget_in)
276  {}
277  };
278 
296  void flush_impl(
297  ADIOS2FlushParams flushParams,
298  std::function<void(ADIOS2File &, adios2::Engine &)> const
299  &performPutGets,
300  bool writeLatePuts,
301  bool flushUnconditionally);
302 
308  void flush_impl(ADIOS2FlushParams, bool writeLatePuts = false);
309 
317 
318  /*
319  * Delete all buffered actions without running them.
320  */
321  void drop();
322 
323  AttributeMap_t const &availableAttributes();
324 
325  std::vector<std::string>
326  availableAttributesPrefixed(std::string const &prefix);
327 
328  /*
329  * See description below.
330  */
331  void invalidateAttributesMap();
332 
333  AttributeMap_t const &availableVariables();
334 
335  std::vector<std::string>
336  availableVariablesPrefixed(std::string const &prefix);
337 
338  /*
339  * See description below.
340  */
341  void invalidateVariablesMap();
342 
343  void markActive(Writable *);
344 
345  // bool isActive(std::string const & path);
346 
347  /*
348  * streamStatus is NoStream for file-based ADIOS engines.
349  * This is relevant for the method ADIOS2File::requireActiveStep,
350  * where a step is only opened if the status is OutsideOfStep, but not
351  * if NoStream. The rationale behind this is that parsing a Series
352  * works differently for file-based and for stream-based engines:
353  * * stream-based: Iterations are parsed as they arrive. For parsing an
354  * iteration, the iteration must be awaited.
355  * ADIOS2File::requireActiveStep takes care of this.
356  * * file-based: The Series is parsed up front. If no step has been
357  * opened yet, ADIOS2 gives access to all variables and attributes
358  * from all steps. Upon opening a step, only the variables from that
359  * step are shown which hinders parsing. So, until a step is
360  * explicitly opened via ADIOS2IOHandlerImpl::advance, do not open
361  * one.
362  * This is to enable use of ADIOS files without the Streaming API
363  * (i.e. all iterations should be visible to the user upon opening
364  * the Series.)
365  * @todo Add a workflow without up-front parsing of all iterations
366  * for file-based engines.
367  * (This would merely be an optimization since the streaming
368  * API still works with files as intended.)
369  *
370  */
371  enum class StreamStatus
372  {
376  DuringStep,
384  StreamOver,
407  Undecided
408  };
410 
411  size_t currentStep();
412 
413 private:
414  ADIOS2IOHandlerImpl *m_impl;
415  std::optional<adios2::Engine> m_engine;
419  std::string m_engineType;
420 
421  /*
422  * Not all engines support the CurrentStep() call, so we have to
423  * implement this manually.
424  */
425  size_t m_currentStep = 0;
426 
427  /*
428  * ADIOS2 does not give direct access to its internal attribute and
429  * variable maps, but will instead give access to copies of them.
430  * In order to avoid unnecessary copies, we buffer the returned map.
431  * The downside of this is that we need to pay attention to invalidate
432  * the map whenever an attribute/variable is altered. In that case, we
433  * fetch the map anew.
434  * If empty, the buffered map has been invalidated and needs to be
435  * queried from ADIOS2 again. If full, the buffered map is equivalent to
436  * the map that would be returned by a call to
437  * IO::Available(Attributes|Variables).
438  */
439  std::optional<AttributeMap_t> m_availableAttributes;
440  std::optional<AttributeMap_t> m_availableVariables;
441 
442  std::set<Writable *> m_pathsMarkedAsActive;
443 
444  /*
445  * Cannot write attributes right after opening the engine
446  * https://github.com/ornladios/ADIOS2/issues/3433
447  */
448  bool initializedDefaults = false;
449  /*
450  * finalize() will set this true to avoid running twice.
451  */
452  bool finalized = false;
453 
454  UseGroupTable useGroupTable() const;
455 
456  void create_IO();
457 
458  void configure_IO();
459  void configure_IO_Read();
460  void configure_IO_Write();
461 };
462 
463 template <typename... Args>
464 void ADIOS2File::flush(Args &&...args)
465 {
466  try
467  {
468  flush_impl(std::forward<Args>(args)...);
469  }
470  catch (error::ReadError const &)
471  {
472  /*
473  * We need to take actions out of the buffer, since an exception
474  * should reset everything from the current IOHandler->flush() call.
475  * However, we cannot simply clear the buffer, since tasks may have
476  * been enqueued to ADIOS2 already and we cannot undo that.
477  * So, we need to keep the memory alive for the benefit of ADIOS2.
478  * Luckily, we have m_alreadyEnqueued for exactly that purpose.
479  */
480  for (auto &task : m_buffer)
481  {
482  m_alreadyEnqueued.emplace_back(std::move(task));
483  }
484  m_buffer.clear();
485  throw;
486  }
487 }
488 } // namespace openPMD::detail
489 #endif
Definition: ADIOS2IOHandler.hpp:94
Layer to mirror structure of logical data and persistent data in file.
Definition: Writable.hpp:75
Definition: ADIOS2File.hpp:137
AdvanceStatus advance(AdvanceMode mode)
Begin or end an ADIOS step.
Definition: ADIOS2File.cpp:1095
void finalize()
Implementation of destructor, will only run once.
Definition: ADIOS2File.cpp:198
std::map< unsigned, std::unique_ptr< I_UpdateSpan > > m_updateSpans
The base pointer of an ADIOS2 span might change after reallocations.
Definition: ADIOS2File.hpp:206
std::vector< std::unique_ptr< BufferedAction > > m_buffer
The default queue for deferred actions.
Definition: ADIOS2File.hpp:182
std::vector< std::unique_ptr< BufferedAction > > m_alreadyEnqueued
This contains deferred actions that have already been enqueued into ADIOS2, but not yet performed in ...
Definition: ADIOS2File.hpp:196
std::vector< BufferedUniquePtrPut > m_uniquePtrPuts
When receiving a unique_ptr, we know that the buffer is ours and ours alone.
Definition: ADIOS2File.hpp:189
std::string m_IOName
ADIOS requires giving names to instances of adios2::IO.
Definition: ADIOS2File.hpp:175
void flush_impl(ADIOS2FlushParams flushParams, std::function< void(ADIOS2File &, adios2::Engine &)> const &performPutGets, bool writeLatePuts, bool flushUnconditionally)
Flush deferred IO actions.
Definition: ADIOS2File.cpp:932
std::string m_file
The full path to the file created on disk, including the containing directory and the file extension,...
Definition: ADIOS2File.hpp:156
StreamStatus
Definition: ADIOS2File.hpp:372
@ ReadWithoutStream
File is not written is streaming fashion.
@ DuringStep
A step is currently active.
@ OutsideOfStep
A stream is active, but no step.
@ Undecided
The stream status of a file-based engine will be decided upon opening the engine if in read mode.
Public definitions of openPMD-api.
Definition: Date.cpp:29
AdvanceMode
In step-based mode (i.e.
Definition: Streaming.hpp:35
AdvanceStatus
In step-based mode (i.e.
Definition: Streaming.hpp:21
FlushLevel
Determine what items should be flushed upon Series::flush()
Definition: AbstractIOHandler.hpp:49
Datatype
Concrete datatype of an object available at runtime.
Definition: Datatype.hpp:51
Wrapper around a shared pointer to:
Definition: InvalidatableFile.hpp:44
Typesafe description of all required arguments for a specified Operation.
Definition: IOTask.hpp:105
Definition: ADIOS2File.hpp:53
Definition: ADIOS2File.hpp:67
Definition: ADIOS2File.hpp:88
Definition: ADIOS2File.hpp:105
Definition: ADIOS2File.hpp:75
Definition: ADIOS2File.hpp:116
Definition: ADIOS2File.cpp:140
Definition: ADIOS2File.hpp:123
Definition: ADIOS2File.hpp:96