diff --git a/CMakeLists.txt b/CMakeLists.txt index 08ced21eac7957066566d92938d7f91581007091..066d109086ffc8907655731365f17fc81f94e083 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -36,6 +36,7 @@ include_directories(${CMAKE_CURRENT_SOURCE_DIR}/include) # packages we depend on include (xpkg-import) find_package (mercury CONFIG REQUIRED) +find_package (thallium CONFIG REQUIRED) find_package (Boost REQUIRED COMPONENTS serialization) include_directories(${Boost_INCLUDE_DIRS}) xpkg_import_module (margo REQUIRED margo) diff --git a/bin/CMakeLists.txt b/bin/CMakeLists.txt index 4ec75c5774bc0c522c68087a48f71e27a0db4076..72ebbd04007f7e61d81d62c4ff827e424ec6b4b4 100644 --- a/bin/CMakeLists.txt +++ b/bin/CMakeLists.txt @@ -1,5 +1,5 @@ add_executable(hepnos-daemon hepnos-daemon.cpp) -target_link_libraries(hepnos-daemon hepnos-service yaml-cpp margo sdskv-server) +target_link_libraries(hepnos-daemon hepnos-service yaml-cpp margo sdskv-server thallium) add_executable(hepnos-shutdown hepnos-shutdown.cpp) target_link_libraries(hepnos-shutdown hepnos yaml-cpp margo) diff --git a/include/hepnos/DataSet.hpp b/include/hepnos/DataSet.hpp index f42172ac71ae1bd3b234b1aaea7acdfc653e9dfc..840423453412c80f7c81b767e0f7c2b28b39f7ee 100644 --- a/include/hepnos/DataSet.hpp +++ b/include/hepnos/DataSet.hpp @@ -7,6 +7,7 @@ #define __HEPNOS_DATA_SET_H #include +#include #include #include #include @@ -47,7 +48,7 @@ class DataSet : public KeyValueContainer { * @param container Full name of the parent DataSet ("" if no parent). * @param name Name of the DataSet. */ - DataSet(DataStore* ds, uint8_t level, const std::string& container, const std::string& name); + DataSet(DataStore* ds, uint8_t level, const std::shared_ptr& container, const std::string& name); /** @@ -393,6 +394,19 @@ class DataSet : public KeyValueContainer { * @return a Run corresponding to the provided run number. */ Run operator[](const RunNumber& runNumber) const; + + /** + * @brief Executes a callback for each event in the hierarchy down + * from this DataSet. The events are dispatched to members of the comm + * MPI communicator. + * + * The callback should not make MPI calls. + * + * @param comm MPI communicator. + * @param callback Callback that will be called on each item. + */ + void foreach(MPI_Comm comm, + const std::function& callback); }; } diff --git a/include/hepnos/Event.hpp b/include/hepnos/Event.hpp index cd8d84a9b8c1d9e60e2cd4941240b22abc7170f3..07ef36fd7bf2409a8e46568c445f326549d1ae64 100644 --- a/include/hepnos/Event.hpp +++ b/include/hepnos/Event.hpp @@ -32,7 +32,7 @@ class Event : public KeyValueContainer { * @param container Full name of the container containing the event. * @param n Event number. */ - Event(DataStore* datastore, uint8_t level, const std::string& container, const EventNumber& n); + Event(DataStore* datastore, uint8_t level, const std::shared_ptr& container, const EventNumber& n); public: diff --git a/include/hepnos/Run.hpp b/include/hepnos/Run.hpp index 2f24138bd5cfa44e61a7c0d4c760f6e17c81c348..6f65e73720db2fb7252b1a8f0e7821c3d4597a1f 100644 --- a/include/hepnos/Run.hpp +++ b/include/hepnos/Run.hpp @@ -36,7 +36,8 @@ class Run : public KeyValueContainer { * @param container Full name of the dataset containing the run. * @param run Run number. */ - Run(DataStore* datastore, uint8_t level, const std::string& container, const RunNumber& run); + Run(DataStore* datastore, uint8_t level, + const std::shared_ptr& container, const RunNumber& run); public: diff --git a/include/hepnos/SubRun.hpp b/include/hepnos/SubRun.hpp index 7f1d739239fe0d8da64005ca6d4ecf8aaf8bce6c..34cd0bb5b50d8881d3ba1cc537d124ffafdce79e 100644 --- a/include/hepnos/SubRun.hpp +++ b/include/hepnos/SubRun.hpp @@ -33,7 +33,7 @@ class SubRun : public KeyValueContainer { * @param container Full name of the dataset containing the run. * @param run SubRun number. */ - SubRun(DataStore* datastore, uint8_t level, const std::string& container, const SubRunNumber& run); + SubRun(DataStore* datastore, uint8_t level, const std::shared_ptr& container, const SubRunNumber& run); public: diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 40ee7be70234ecf442885c207266f7213f29e710..9c4a7d3e849d4f663c3de480a8e90505a0a8b5fe 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -41,7 +41,7 @@ set_target_properties (hepnos SOVERSION ${HEPNOS_VERSION_MAJOR}) add_library(hepnos-service ${hepnos-service-src}) -target_link_libraries (hepnos mercury margo yaml-cpp sdskv-client sdskv-server ch-placement) +target_link_libraries (hepnos thallium mercury margo yaml-cpp sdskv-client sdskv-server ch-placement) target_include_directories (hepnos-service PUBLIC $) # local include's BEFORE, in case old incompatable .h files in prefix/include diff --git a/src/DataSet.cpp b/src/DataSet.cpp index 315024b162d76b63e30c98109258fe4c6426e1a3..fe9babd2376fa3d2cdb6f840a9492683f240e977 100644 --- a/src/DataSet.cpp +++ b/src/DataSet.cpp @@ -15,20 +15,20 @@ namespace hepnos { DataSet::DataSet() -: m_impl(std::make_unique(this, nullptr, 0, "", "")) {} +: m_impl(std::make_unique(this, nullptr, 0, std::make_shared(""), "")) {} DataSet::DataSet(DataStore* ds, uint8_t level, const std::string& fullname) -: m_impl(std::make_unique(this, ds, level, "", "")) { +: m_impl(std::make_unique(this, ds, level, std::make_shared(""), "")) { size_t p = fullname.find_last_of('/'); if(p == std::string::npos) { m_impl->m_name = fullname; } else { m_impl->m_name = fullname.substr(p+1); - m_impl->m_container = fullname.substr(0, p); + m_impl->m_container = std::make_shared(fullname.substr(0, p)); } } -DataSet::DataSet(DataStore* ds, uint8_t level, const std::string& container, const std::string& name) +DataSet::DataSet(DataStore* ds, uint8_t level, const std::shared_ptr& container, const std::string& name) : m_impl(std::make_unique(this, ds, level, container, name)) {} DataSet::DataSet(const DataSet& other) { @@ -85,7 +85,7 @@ DataSet DataSet::next() const { std::vector keys; size_t s = m_impl->m_datastore->m_impl->nextKeys( - m_impl->m_level, m_impl->m_container, m_impl->m_name, keys, 1); + m_impl->m_level, *m_impl->m_container, m_impl->m_name, keys, 1); if(s == 0) return DataSet(); return DataSet(m_impl->m_datastore, m_impl->m_level, keys[0]); } @@ -136,10 +136,10 @@ bool DataSet::operator==(const DataSet& other) const { if(!v1 && !v2) return true; if(v1 && !v2) return false; if(!v2 && v2) return false; - return m_impl->m_datastore == other.m_impl->m_datastore - && m_impl->m_level == other.m_impl->m_level - && m_impl->m_container == other.m_impl->m_container - && m_impl->m_name == other.m_impl->m_name; + return m_impl->m_datastore == other.m_impl->m_datastore + && m_impl->m_level == other.m_impl->m_level + && *m_impl->m_container == *other.m_impl->m_container + && m_impl->m_name == other.m_impl->m_name; } bool DataSet::operator!=(const DataSet& other) const { @@ -157,7 +157,7 @@ const std::string& DataSet::container() const { if(!valid()) { throw Exception("Calling DataSet member function on an invalid DataSet"); } - return m_impl->m_container; + return *m_impl->m_container; } std::string DataSet::fullname() const { @@ -175,7 +175,7 @@ DataSet DataSet::createDataSet(const std::string& name) { } std::string parent = fullname(); m_impl->m_datastore->m_impl->store(m_impl->m_level+1, parent, name, std::string()); - return DataSet(m_impl->m_datastore, m_impl->m_level+1, parent, name); + return DataSet(m_impl->m_datastore, m_impl->m_level+1, std::make_shared(parent), name); } Run DataSet::createRun(const RunNumber& runNumber) { @@ -185,7 +185,8 @@ Run DataSet::createRun(const RunNumber& runNumber) { std::string parent = fullname(); std::string runStr = Run::Impl::makeKeyStringFromRunNumber(runNumber); m_impl->m_datastore->m_impl->store(m_impl->m_level+1, parent, runStr, std::string()); - return Run(m_impl->m_datastore, m_impl->m_level+1, parent, runNumber); + return Run(m_impl->m_datastore, m_impl->m_level+1, + std::make_shared(parent), runNumber); } Run DataSet::createRun(WriteBatch& batch, const RunNumber& runNumber) { @@ -195,7 +196,8 @@ Run DataSet::createRun(WriteBatch& batch, const RunNumber& runNumber) { std::string parent = fullname(); std::string runStr = Run::Impl::makeKeyStringFromRunNumber(runNumber); batch.m_impl->store(m_impl->m_level+1, parent, runStr, std::string()); - return Run(m_impl->m_datastore, m_impl->m_level+1, parent, runNumber); + return Run(m_impl->m_datastore, m_impl->m_level+1, + std::make_shared(parent), runNumber); } DataSet DataSet::operator[](const std::string& datasetName) const { @@ -241,7 +243,7 @@ DataSet::iterator DataSet::find(const std::string& datasetPath) { if(!b) { return m_impl->m_datastore->end(); } - return iterator(DataSet(m_impl->m_datastore, level, containerName, datasetName)); + return iterator(DataSet(m_impl->m_datastore, level, std::make_shared(containerName), datasetName)); } DataSet::const_iterator DataSet::find(const std::string& datasetName) const { @@ -255,7 +257,7 @@ DataSet::iterator DataSet::begin() { } // we use the prefix "&" because we need something that comes after "%" // (which represents runs) and is not going to be in a dataset name - DataSet ds(m_impl->m_datastore, m_impl->m_level+1, fullname(),"&"); + DataSet ds(m_impl->m_datastore, m_impl->m_level+1, std::make_shared(fullname()),"&"); ds = ds.next(); if(ds.valid()) return iterator(ds); else return end(); @@ -303,7 +305,7 @@ DataSet::iterator DataSet::lower_bound(const std::string& lb) { ++it; return it; } - DataSet ds(m_impl->m_datastore, m_impl->m_level+1, fullname(), lb2); + DataSet ds(m_impl->m_datastore, m_impl->m_level+1, std::make_shared(fullname()), lb2); ds = ds.next(); if(!ds.valid()) return end(); else return iterator(ds); @@ -318,7 +320,7 @@ DataSet::iterator DataSet::upper_bound(const std::string& ub) { if(!valid()) { throw Exception("Calling DataSet member function on an invalid DataSet"); } - DataSet ds(m_impl->m_datastore, m_impl->m_level+1, fullname(), ub); + DataSet ds(m_impl->m_datastore, m_impl->m_level+1, std::make_shared(fullname()), ub); ds = ds.next(); if(!ds.valid()) return end(); else return iterator(ds); @@ -340,4 +342,10 @@ const RunSet& DataSet::runs() const { return const_cast(this)->runs(); } +void DataSet::foreach(MPI_Comm comm, + const std::function& callback) +{ + // TODO +} + } diff --git a/src/DataStore.cpp b/src/DataStore.cpp index 7ebfa04f16d7653b8e2664b8ffc6e3012fcce7e7..c24bfc6aaa86c6c5b91a95bb85b7402121a995fd 100644 --- a/src/DataStore.cpp +++ b/src/DataStore.cpp @@ -81,7 +81,7 @@ DataStore::iterator DataStore::find(const std::string& datasetPath) { if(!b) { return m_impl->m_end; } - return iterator(DataSet(this, level, containerName, datasetName)); + return iterator(DataSet(this, level, std::make_shared(containerName), datasetName)); } DataSet DataStore::operator[](const std::string& datasetName) const { @@ -100,7 +100,7 @@ DataStore::iterator DataStore::begin() { if(!m_impl) { throw Exception("Calling DataStore member function on an invalid DataStore object"); } - DataSet ds(this, 1, "", ""); + DataSet ds(this, 1, std::make_shared(""), ""); ds = ds.next(); if(ds.valid()) return iterator(std::move(ds)); else return end(); @@ -143,7 +143,7 @@ DataStore::iterator DataStore::lower_bound(const std::string& lb) { ++it; return it; } - DataSet ds(this, 1, "", lb2); + DataSet ds(this, 1, std::make_shared(""), lb2); ds = ds.next(); if(!ds.valid()) return end(); else return iterator(std::move(ds)); @@ -158,7 +158,7 @@ DataStore::iterator DataStore::upper_bound(const std::string& ub) { if(!m_impl) { throw Exception("Calling DataStore member function on an invalid DataStore object"); } - DataSet ds(this, 1, "", ub); + DataSet ds(this, 1, std::make_shared(""), ub); ds = ds.next(); if(!ds.valid()) return end(); else return iterator(std::move(ds)); @@ -178,7 +178,7 @@ DataSet DataStore::createDataSet(const std::string& name) { throw Exception("Invalid character ('/' or '%') in dataset name"); } m_impl->store(1, "", name, std::string()); - return DataSet(this, 1, "", name); + return DataSet(this, 1, std::make_shared(""), name); } DataSet DataStore::createDataSet(WriteBatch& batch, const std::string& name) { @@ -190,7 +190,7 @@ DataSet DataStore::createDataSet(WriteBatch& batch, const std::string& name) { throw Exception("Invalid character ('/' or '%') in dataset name"); } batch.m_impl->store(1, "", name, std::string()); - return DataSet(this, 1, "", name); + return DataSet(this, 1, std::make_shared(""), name); } void DataStore::shutdown() { diff --git a/src/Event.cpp b/src/Event.cpp index 24518e957fd1fa17e2fbfda158421aba0ed67e2a..28e597e3bd7ddec8d5d4430189767786053a1521 100644 --- a/src/Event.cpp +++ b/src/Event.cpp @@ -11,9 +11,9 @@ namespace hepnos { Event::Event() -: m_impl(std::make_unique(nullptr, 0, "", InvalidEventNumber)) {} +: m_impl(std::make_unique(nullptr, 0, std::make_shared(""), InvalidEventNumber)) {} -Event::Event(DataStore* ds, uint8_t level, const std::string& container, const EventNumber& rn) +Event::Event(DataStore* ds, uint8_t level, const std::shared_ptr& container, const EventNumber& rn) : m_impl(std::make_unique(ds, level, container, rn)) { } Event::Event(const Event& other) { @@ -46,10 +46,10 @@ Event Event::next() const { std::vector keys; size_t s = m_impl->m_datastore->m_impl->nextKeys( - m_impl->m_level, m_impl->m_container, + m_impl->m_level, *m_impl->m_container, m_impl->makeKeyStringFromEventNumber(), keys, 1); if(s == 0) return Event(); - size_t i = m_impl->m_container.size()+1; + size_t i = m_impl->m_container->size()+1; if(keys[0].size() <= i) return Event(); if(keys[0][i] != '%') return Event(); std::stringstream strEventNumber; @@ -107,10 +107,10 @@ bool Event::operator==(const Event& other) const { if(!v1 && !v2) return true; if(!v1 && v2) return false; if(v1 && !v2) return false; - return m_impl->m_datastore == other.m_impl->m_datastore - && m_impl->m_level == other.m_impl->m_level - && m_impl->m_container == other.m_impl->m_container - && m_impl->m_event_nr == other.m_impl->m_event_nr; + return m_impl->m_datastore == other.m_impl->m_datastore + && m_impl->m_level == other.m_impl->m_level + && *m_impl->m_container == *other.m_impl->m_container + && m_impl->m_event_nr == other.m_impl->m_event_nr; } bool Event::operator!=(const Event& other) const { diff --git a/src/Run.cpp b/src/Run.cpp index 482324234f101b5d4412c11becd855c8ba5413ec..9b73c1c98d4e7c81fb1c7d3b117024f6562ca845 100644 --- a/src/Run.cpp +++ b/src/Run.cpp @@ -12,9 +12,9 @@ namespace hepnos { Run::Run() -: m_impl(std::make_unique(nullptr, 0, "", InvalidRunNumber)) {} +: m_impl(std::make_unique(nullptr, 0, std::make_shared(""), InvalidRunNumber)) {} -Run::Run(DataStore* ds, uint8_t level, const std::string& container, const RunNumber& rn) +Run::Run(DataStore* ds, uint8_t level, const std::shared_ptr& container, const RunNumber& rn) : m_impl(std::make_unique(ds, level, container, rn)) { } Run::Run(const Run& other) { @@ -48,10 +48,10 @@ Run Run::next() const { std::vector keys; size_t s = m_impl->m_datastore->m_impl->nextKeys( - m_impl->m_level, m_impl->m_container, + m_impl->m_level, *m_impl->m_container, m_impl->makeKeyStringFromRunNumber(), keys, 1); if(s == 0) return Run(); - size_t i = m_impl->m_container.size()+1; + size_t i = m_impl->m_container->size()+1; if(keys[0].size() <= i) return Run(); if(keys[0][i] != '%') return Run(); std::stringstream strRunNumber; @@ -111,7 +111,7 @@ bool Run::operator==(const Run& other) const { if(!v1 && v2) return false; return m_impl->m_datastore == other.m_impl->m_datastore && m_impl->m_level == other.m_impl->m_level - && m_impl->m_container == other.m_impl->m_container + && *m_impl->m_container == *other.m_impl->m_container && m_impl->m_run_nr == other.m_impl->m_run_nr; } @@ -130,7 +130,7 @@ const std::string& Run::container() const { if(!valid()) { throw Exception("Calling Run member function on an invalid Run object"); } - return m_impl->m_container; + return *m_impl->m_container; } SubRun Run::createSubRun(const SubRunNumber& subRunNumber) { @@ -140,7 +140,8 @@ SubRun Run::createSubRun(const SubRunNumber& subRunNumber) { std::string parent = m_impl->fullpath(); std::string subRunStr = SubRun::Impl::makeKeyStringFromSubRunNumber(subRunNumber); m_impl->m_datastore->m_impl->store(m_impl->m_level+1, parent, subRunStr, std::string()); - return SubRun(m_impl->m_datastore, m_impl->m_level+1, parent, subRunNumber); + return SubRun(m_impl->m_datastore, m_impl->m_level+1, + std::make_shared(parent), subRunNumber); } SubRun Run::createSubRun(WriteBatch& batch, const SubRunNumber& subRunNumber) { @@ -150,7 +151,8 @@ SubRun Run::createSubRun(WriteBatch& batch, const SubRunNumber& subRunNumber) { std::string parent = m_impl->fullpath(); std::string subRunStr = SubRun::Impl::makeKeyStringFromSubRunNumber(subRunNumber); batch.m_impl->store(m_impl->m_level+1, parent, subRunStr, std::string()); - return SubRun(m_impl->m_datastore, m_impl->m_level+1, parent, subRunNumber); + return SubRun(m_impl->m_datastore, m_impl->m_level+1, + std::make_shared(parent), subRunNumber); } SubRun Run::operator[](const SubRunNumber& subRunNumber) const { @@ -171,7 +173,8 @@ Run::iterator Run::find(const SubRunNumber& subRunNumber) { if(!b) { return m_impl->m_end; } - return iterator(SubRun(m_impl->m_datastore, m_impl->m_level+1, parent, subRunNumber)); + return iterator(SubRun(m_impl->m_datastore, m_impl->m_level+1, + std::make_shared(parent), subRunNumber)); } Run::const_iterator Run::find(const SubRunNumber& subRunNumber) const { @@ -186,7 +189,7 @@ Run::iterator Run::begin() { auto level = m_impl->m_level; auto datastore = m_impl->m_datastore; std::string container = m_impl->fullpath(); - SubRun subrun(datastore, level+1, container, 0); + SubRun subrun(datastore, level+1, std::make_shared(container), 0); subrun = subrun.next(); if(subrun.valid()) return iterator(subrun); @@ -230,7 +233,7 @@ Run::iterator Run::lower_bound(const SubRunNumber& lb) { } else { SubRun subrun(m_impl->m_datastore, m_impl->m_level+1, - m_impl->fullpath(), 0); + std::make_shared(m_impl->fullpath()), 0); subrun = subrun.next(); if(!subrun.valid()) return end(); else return iterator(subrun); @@ -243,7 +246,7 @@ Run::iterator Run::lower_bound(const SubRunNumber& lb) { } SubRun subrun(m_impl->m_datastore, m_impl->m_level+1, - m_impl->fullpath(), lb-1); + std::make_shared(m_impl->fullpath()), lb-1); subrun = subrun.next(); if(!subrun.valid()) return end(); else return iterator(subrun); @@ -261,7 +264,7 @@ Run::iterator Run::upper_bound(const SubRunNumber& ub) { } SubRun subrun(m_impl->m_datastore, m_impl->m_level+1, - m_impl->fullpath(), ub); + std::make_shared(m_impl->fullpath()), ub); subrun = subrun.next(); if(!subrun.valid()) return end(); else return iterator(subrun); diff --git a/src/RunSet.cpp b/src/RunSet.cpp index 8abf8fd74d7717b1224318f28e33817671d3665b..f54c5f1c1063cdabff041868bf55c475f533cb30 100644 --- a/src/RunSet.cpp +++ b/src/RunSet.cpp @@ -40,7 +40,7 @@ RunSet::iterator RunSet::find(const RunNumber& runNumber) { auto level = m_impl->m_dataset->m_impl->m_level; bool b = datastore->m_impl->exists(level+1, parent, strNum); if(!b) return end(); - return iterator(Run(datastore, level+1, parent, runNumber)); + return iterator(Run(datastore, level+1, std::make_shared(parent), runNumber)); } RunSet::const_iterator RunSet::find(const RunNumber& runNumber) const { @@ -55,7 +55,7 @@ RunSet::iterator RunSet::begin() { auto ds_level = m_impl->m_dataset->m_impl->m_level; auto datastore = m_impl->m_dataset->m_impl->m_datastore; std::string container = m_impl->m_dataset->fullname(); - Run run(datastore, ds_level+1, container, 0); + Run run(datastore, ds_level+1, std::make_shared(container), 0); run = run.next(); if(run.valid()) return iterator(run); @@ -90,7 +90,7 @@ RunSet::iterator RunSet::lower_bound(const RunNumber& lb) { } else { Run run(m_impl->m_dataset->m_impl->m_datastore, m_impl->m_dataset->m_impl->m_level+1, - m_impl->m_dataset->fullname(), 0); + std::make_shared(m_impl->m_dataset->fullname()), 0); run = run.next(); if(!run.valid()) return end(); else return iterator(run); @@ -103,7 +103,7 @@ RunSet::iterator RunSet::lower_bound(const RunNumber& lb) { } Run run(m_impl->m_dataset->m_impl->m_datastore, m_impl->m_dataset->m_impl->m_level+1, - m_impl->m_dataset->fullname(), lb-1); + std::make_shared(m_impl->m_dataset->fullname()), lb-1); run = run.next(); if(!run.valid()) return end(); else return iterator(run); @@ -118,7 +118,7 @@ RunSet::const_iterator RunSet::lower_bound(const RunNumber& lb) const { RunSet::iterator RunSet::upper_bound(const RunNumber& ub) { Run run(m_impl->m_dataset->m_impl->m_datastore, m_impl->m_dataset->m_impl->m_level+1, - m_impl->m_dataset->fullname(), ub); + std::make_shared(m_impl->m_dataset->fullname()), ub); run = run.next(); if(!run.valid()) return end(); else return iterator(run); diff --git a/src/SubRun.cpp b/src/SubRun.cpp index f99299a029b1f6bf1f985b9aeaa7753733621edb..a3fb4854412de61e500fde065f49d03313a45bb4 100644 --- a/src/SubRun.cpp +++ b/src/SubRun.cpp @@ -4,6 +4,7 @@ * See COPYRIGHT in top-level directory. */ +#include #include "hepnos/SubRun.hpp" #include "private/SubRunImpl.hpp" #include "private/EventImpl.hpp" @@ -13,9 +14,9 @@ namespace hepnos { SubRun::SubRun() -: m_impl(std::make_unique(nullptr, 0, "", InvalidSubRunNumber)) {} +: m_impl(std::make_unique(nullptr, 0, std::make_shared(""), InvalidSubRunNumber)) {} -SubRun::SubRun(DataStore* ds, uint8_t level, const std::string& container, const SubRunNumber& rn) +SubRun::SubRun(DataStore* ds, uint8_t level, const std::shared_ptr& container, const SubRunNumber& rn) : m_impl(std::make_unique(ds, level, container, rn)) { } SubRun::SubRun(const SubRun& other) { @@ -50,10 +51,10 @@ SubRun SubRun::next() const { std::vector keys; size_t s = m_impl->m_datastore->m_impl->nextKeys( - m_impl->m_level, m_impl->m_container, + m_impl->m_level, *m_impl->m_container, m_impl->makeKeyStringFromSubRunNumber(), keys, 1); if(s == 0) return SubRun(); - size_t i = m_impl->m_container.size()+1; + size_t i = m_impl->m_container->size()+1; if(keys[0].size() <= i) return SubRun(); if(keys[0][i] != '%') return SubRun(); std::stringstream strSubRunNumber; @@ -131,7 +132,8 @@ Event SubRun::createEvent(const EventNumber& eventNumber) { std::string parent = m_impl->fullpath(); std::string eventStr = Event::Impl::makeKeyStringFromEventNumber(eventNumber); m_impl->m_datastore->m_impl->store(m_impl->m_level+1, parent, eventStr, std::string()); - return Event(m_impl->m_datastore, m_impl->m_level+1, parent, eventNumber); + return Event(m_impl->m_datastore, m_impl->m_level+1, + std::make_shared(parent), eventNumber); } Event SubRun::createEvent(WriteBatch& batch, const EventNumber& eventNumber) { @@ -141,7 +143,8 @@ Event SubRun::createEvent(WriteBatch& batch, const EventNumber& eventNumber) { std::string parent = m_impl->fullpath(); std::string eventStr = Event::Impl::makeKeyStringFromEventNumber(eventNumber); batch.m_impl->store(m_impl->m_level+1, parent, eventStr, std::string()); - return Event(m_impl->m_datastore, m_impl->m_level+1, parent, eventNumber); + return Event(m_impl->m_datastore, m_impl->m_level+1, + std::make_shared(parent), eventNumber); } Event SubRun::operator[](const EventNumber& eventNumber) const { @@ -162,7 +165,8 @@ SubRun::iterator SubRun::find(const EventNumber& eventNumber) { if(!b) { return m_impl->m_end; } - return iterator(Event(m_impl->m_datastore, m_impl->m_level+1, parent, eventNumber)); + return iterator(Event(m_impl->m_datastore, m_impl->m_level+1, + std::make_shared(parent), eventNumber)); } SubRun::const_iterator SubRun::find(const EventNumber& eventNumber) const { @@ -177,7 +181,7 @@ SubRun::iterator SubRun::begin() { auto level = m_impl->m_level; auto datastore = m_impl->m_datastore; std::string container = m_impl->fullpath(); - Event event(datastore, level+1, container, 0); + Event event(datastore, level+1, std::make_shared(container), 0); event = event.next(); if(event.valid()) return iterator(std::move(event)); @@ -221,7 +225,7 @@ SubRun::iterator SubRun::lower_bound(const EventNumber& lb) { } else { Event event(m_impl->m_datastore, m_impl->m_level+1, - m_impl->fullpath(), 0); + std::make_shared(m_impl->fullpath()), 0); event = event.next(); if(!event.valid()) return end(); else return iterator(event); @@ -234,7 +238,7 @@ SubRun::iterator SubRun::lower_bound(const EventNumber& lb) { } Event event(m_impl->m_datastore, m_impl->m_level+1, - m_impl->fullpath(), lb-1); + std::make_shared(m_impl->fullpath()), lb-1); event = event.next(); if(!event.valid()) return end(); else return iterator(event); @@ -252,7 +256,7 @@ SubRun::iterator SubRun::upper_bound(const EventNumber& ub) { } Event event(m_impl->m_datastore, m_impl->m_level+1, - m_impl->fullpath(), ub); + std::make_shared(m_impl->fullpath()), ub); event = event.next(); if(!event.valid()) return end(); else return iterator(event); diff --git a/src/private/DataSetImpl.hpp b/src/private/DataSetImpl.hpp index fa8ea2c08d0a491b9a0da2f4f3033d5400e25f50..2cbc216de95a469cea5cd555ebd4e5b18caba68e 100644 --- a/src/private/DataSetImpl.hpp +++ b/src/private/DataSetImpl.hpp @@ -16,12 +16,12 @@ class DataSet::Impl { DataStore* m_datastore; uint8_t m_level; - std::string m_container; + std::shared_ptr m_container; std::string m_name; RunSet m_runset; Impl(DataSet* dataset, DataStore* ds, uint8_t level, - const std::string& container, const std::string& name) + const std::shared_ptr& container, const std::string& name) : m_datastore(ds) , m_level(level) , m_container(container) diff --git a/src/private/EventImpl.hpp b/src/private/EventImpl.hpp index f0f038274e2bd1f95741c6a6db044bc947303c75..da837c040ee4bad96561fcf573881632dd1330b4 100644 --- a/src/private/EventImpl.hpp +++ b/src/private/EventImpl.hpp @@ -8,6 +8,7 @@ #include #include +#include #include "hepnos/Event.hpp" namespace hepnos { @@ -16,12 +17,12 @@ class Event::Impl { public: - DataStore* m_datastore; - uint8_t m_level; - std::string m_container; - EventNumber m_event_nr; + DataStore* m_datastore; + uint8_t m_level; + std::shared_ptr m_container; + EventNumber m_event_nr; - Impl(DataStore* ds, uint8_t level, const std::string& container, const EventNumber& n) + Impl(DataStore* ds, uint8_t level, const std::shared_ptr& container, const EventNumber& n) : m_datastore(ds) , m_level(level) , m_container(container) @@ -38,7 +39,7 @@ class Event::Impl { } std::string fullpath() const { - return m_container + std::string("/") + makeKeyStringFromEventNumber(); + return *m_container + std::string("/") + makeKeyStringFromEventNumber(); } }; diff --git a/src/private/RunImpl.hpp b/src/private/RunImpl.hpp index f9773f7d6a0500b37c5017c7e4768db249e17936..85acbbe795988d84b36fd35de7a4df8407816f69 100644 --- a/src/private/RunImpl.hpp +++ b/src/private/RunImpl.hpp @@ -8,6 +8,7 @@ #include #include +#include #include "hepnos/Run.hpp" namespace hepnos { @@ -18,11 +19,11 @@ class Run::Impl { DataStore* m_datastore; uint8_t m_level; - std::string m_container; + std::shared_ptr m_container; RunNumber m_run_nr; iterator m_end; - Impl(DataStore* ds, uint8_t level, const std::string& container, const RunNumber& rn) + Impl(DataStore* ds, uint8_t level, const std::shared_ptr& container, const RunNumber& rn) : m_datastore(ds) , m_level(level) , m_container(container) @@ -39,7 +40,7 @@ class Run::Impl { } std::string fullpath() const { - return m_container + std::string("/") + makeKeyStringFromRunNumber(m_run_nr); + return *m_container + std::string("/") + makeKeyStringFromRunNumber(m_run_nr); } }; diff --git a/src/private/SubRunImpl.hpp b/src/private/SubRunImpl.hpp index 82a79b918da6db8a3900c84a43b1b7321daab8b6..75011d986c6f57141441f60836c006ab90cd9f1d 100644 --- a/src/private/SubRunImpl.hpp +++ b/src/private/SubRunImpl.hpp @@ -7,6 +7,7 @@ #define __HEPNOS_PRIVATE_SUBRUN_IMPL_H #include +#include #include #include "hepnos/SubRun.hpp" @@ -18,11 +19,11 @@ class SubRun::Impl { DataStore* m_datastore; uint8_t m_level; - std::string m_container; + std::shared_ptr m_container; SubRunNumber m_subrun_nr; iterator m_end; - Impl(DataStore* ds, uint8_t level, const std::string& container, const SubRunNumber& rn) + Impl(DataStore* ds, uint8_t level, const std::shared_ptr& container, const SubRunNumber& rn) : m_datastore(ds) , m_level(level) , m_container(container) @@ -39,7 +40,7 @@ class SubRun::Impl { } std::string fullpath() const { - return m_container + std::string("/") + makeKeyStringFromSubRunNumber(); + return *m_container + std::string("/") + makeKeyStringFromSubRunNumber(); } }; diff --git a/src/service/HEPnOSService.cpp b/src/service/HEPnOSService.cpp index f2ad4fb1949ce351c1ba873aa44d79bcfb241f2e..451543c22826d10effbda3b1cdee02c05c32f9ac 100644 --- a/src/service/HEPnOSService.cpp +++ b/src/service/HEPnOSService.cpp @@ -10,17 +10,18 @@ #include #include #include -#include +#include #include #include "ServiceConfig.hpp" #include "ConnectionInfoGenerator.hpp" #include "hepnos-service.h" +namespace tl = thallium; + #define ASSERT(__cond, __msg, ...) { if(!(__cond)) { fprintf(stderr, "[%s:%d] " __msg, __FILE__, __LINE__, __VA_ARGS__); exit(-1); } } void hepnos_run_service(MPI_Comm comm, const char* config_file, const char* connection_file) { - margo_instance_id mid; int ret; int rank; @@ -38,29 +39,29 @@ void hepnos_run_service(MPI_Comm comm, const char* config_file, const char* conn return; } - /* Margo initialization */ - mid = margo_init(config->getAddress().c_str(), MARGO_SERVER_MODE, 0, config->getNumThreads()-1); - if (mid == MARGO_INSTANCE_NULL) - { - std::cerr << "Error: unable to initialize margo" << std::endl; + std::unique_ptr engine; + try { + engine = std::make_unique( + config->getAddress(), + THALLIUM_SERVER_MODE, + false, config->getNumThreads()-1); + + } catch(std::exception& ex) { + std::cerr << "Error: unable to initialize thallium" << std::endl; + std::cerr << "Exception: " << ex.what() << std::endl; std::cerr << "Aborting." << std::endl; MPI_Abort(MPI_COMM_WORLD, -1); return; } - margo_enable_remote_shutdown(mid); - - /* Get self address as string */ - hg_addr_t self_addr; - margo_addr_self(mid, &self_addr); - char self_addr_str[128]; - hg_size_t self_addr_str_size = 128; - margo_addr_to_string(mid, self_addr_str, &self_addr_str_size, self_addr); + engine->enable_remote_shutdown(); + auto self_addr_str = static_cast(engine->self()); if(config->hasDatabase()) { /* SDSKV provider initialization */ for(auto sdskv_provider_id = 0; sdskv_provider_id < config->getNumDatabaseProviders(); sdskv_provider_id++) { - sdskv::provider* provider = sdskv::provider::create(mid, sdskv_provider_id, SDSKV_ABT_POOL_DEFAULT); + sdskv::provider* provider = sdskv::provider::create( + engine->get_margo_instance(), sdskv_provider_id, SDSKV_ABT_POOL_DEFAULT); for(unsigned i=0 ; i < config->getNumDatabaseTargets(); i++) { auto db_path = config->getDatabasePath(rank, sdskv_provider_id, i); @@ -81,12 +82,10 @@ void hepnos_run_service(MPI_Comm comm, const char* config_file, const char* conn } } - margo_addr_free(mid, self_addr); - hepnos::ConnectionInfoGenerator fileGen(self_addr_str, config->getNumDatabaseProviders(), config->getNumStorageProviders()); fileGen.generateFile(MPI_COMM_WORLD, connection_file); - margo_wait_for_finalize(mid); + engine->wait_for_finalize(); } diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 4afb69499402bbeed3340bb4cfd529082115da23..c7bce2fafbd6589cf2ff1b03a110976cba998831 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -45,6 +45,9 @@ target_link_libraries(RestartAndReadTest ${CPPUNIT_LIBRARIES} hepnos ${BOOST_DEP add_executable(WriteBatchTest WriteBatchTest.cpp HEPnOSTestMain.cpp) target_link_libraries(WriteBatchTest ${CPPUNIT_LIBRARIES} hepnos ${BOOST_DEPS}) +#add_executable(ParallelMPITest ParallelMPITest.cpp HEPnOSTestMain.cpp) +#target_link_libraries(ParallelMPITest ${CPPUNIT_LIBRARIES} hepnos ${BOOST_DEPS}) + add_test(NAME DataStoreTest COMMAND run-test.sh ./DataStoreTest) add_test(NAME DataSetTest COMMAND run-test.sh ./DataSetTest) add_test(NAME RunSetTest COMMAND run-test.sh ./RunSetTest) @@ -55,3 +58,4 @@ add_test(NAME LoadStoreTest COMMAND run-test.sh ./LoadStoreTest) add_test(NAME WriteBatchTest COMMAND run-test.sh ./WriteBatchTest) add_test(NAME PtrTest COMMAND run-test.sh ./PtrTest) add_test(NAME RestartTest COMMAND run-two-tests.sh ./WriteAndRestartTest ./RestartAndReadTest) +#add_test(NAME ParallelMPITest COMMAND run-test.sh "mpirun -np 4 ./ParallelMPITest" 30) diff --git a/test/HEPnOSTestMain.cpp b/test/HEPnOSTestMain.cpp index 8bc60e1597aad0480e68c713c79740be8e6db6b5..64bb14c145cceadd3e87577eacc733ad4eccd6b6 100644 --- a/test/HEPnOSTestMain.cpp +++ b/test/HEPnOSTestMain.cpp @@ -1,3 +1,4 @@ +#include #include #include #include @@ -10,6 +11,8 @@ int main(int argc, char* argv[]) { if(argc != 2) return 1; + MPI_Init(&argc, &argv); + sleep(1); // Create the datastore datastore = new hepnos::DataStore(argv[1]); @@ -27,9 +30,14 @@ int main(int argc, char* argv[]) // Run the tests. bool wasSucessful = runner.run(); - datastore->shutdown(); + MPI_Barrier(MPI_COMM_WORLD); + int rank; + MPI_Comm_rank(MPI_COMM_WORLD, &rank); + if(rank == 0) datastore->shutdown(); delete datastore; + MPI_Finalize(); + // Return error code 1 if the one of test failed. return wasSucessful ? 0 : 1; } diff --git a/test/run-test.sh b/test/run-test.sh index c40b66b0951f964f0b401b5b63b1245e1e7fb512..4bb1fe1957dd0924339a9de814a0f87b104579ba 100755 --- a/test/run-test.sh +++ b/test/run-test.sh @@ -5,6 +5,8 @@ if [ -z "$MKTEMP" ] ; then exit 1 fi +timeout_sec=${2:-10} + source test-util.sh TEST_DIR=`$MKTEMP -d /tmp/hepnos-XXXXXX` @@ -18,7 +20,7 @@ hepnos_test_start_servers 2 1 20 $CFG_FILE $CON_FILE export HEPNOS_CONFIG_FILE=$CON_FILE # run a connect test client -run_to 10 $1 $CON_FILE +run_to ${timeout_sec} $1 $CON_FILE if [ $? -ne 0 ]; then wait exit 1