Commit 8dcdec7b authored by Matthieu Dorier's avatar Matthieu Dorier
Browse files

Merge branch 'master' of xgitlab.cels.anl.gov:sds/HEPnOS

parents bcf785f1 1a3f5f7f
...@@ -7,10 +7,11 @@ ...@@ -7,10 +7,11 @@
# -DCMAKE_PREFIX_PATH='/dir1;/dir2;/dir3' # -DCMAKE_PREFIX_PATH='/dir1;/dir2;/dir3'
# #
cmake_minimum_required (VERSION 3.0) cmake_minimum_required (VERSION 3.14)
project (hepnos CXX) project (hepnos CXX)
add_definitions (-g) add_definitions (-g)
set (CMAKE_CXX_STANDARD 14)
# add our cmake module directory to the path # add our cmake module directory to the path
set (CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} set (CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH}
...@@ -35,13 +36,11 @@ include_directories(${CMAKE_CURRENT_SOURCE_DIR}/include) ...@@ -35,13 +36,11 @@ include_directories(${CMAKE_CURRENT_SOURCE_DIR}/include)
# packages we depend on # packages we depend on
include (xpkg-import) include (xpkg-import)
find_package (mercury CONFIG REQUIRED) find_package (mercury CONFIG REQUIRED)
find_package (Boost REQUIRED COMPONENTS serialization system filesystem) find_package (Boost REQUIRED COMPONENTS serialization)
include_directories(${Boost_INCLUDE_DIRS}) include_directories(${Boost_INCLUDE_DIRS})
xpkg_import_module (margo REQUIRED margo) xpkg_import_module (margo REQUIRED margo)
xpkg_import_module (sdskv-client REQUIRED sdskv-client) xpkg_import_module (sdskv-client REQUIRED sdskv-client)
xpkg_import_module (bake-client REQUIRED bake-client)
xpkg_import_module (sdskv-server REQUIRED sdskv-server) xpkg_import_module (sdskv-server REQUIRED sdskv-server)
xpkg_import_module (bake-server REQUIRED bake-server)
xpkg_import_module (ch-placement REQUIRED ch-placement) xpkg_import_module (ch-placement REQUIRED ch-placement)
find_package (yaml-cpp REQUIRED) find_package (yaml-cpp REQUIRED)
......
add_executable(hepnos-daemon hepnos-daemon.cpp) add_executable(hepnos-daemon hepnos-daemon.cpp)
target_link_libraries(hepnos-daemon hepnos-service yaml-cpp margo bake-server sdskv-server) target_link_libraries(hepnos-daemon hepnos-service yaml-cpp margo sdskv-server)
add_executable(hepnos-shutdown hepnos-shutdown.cpp) add_executable(hepnos-shutdown hepnos-shutdown.cpp)
target_link_libraries(hepnos-shutdown hepnos yaml-cpp margo) target_link_libraries(hepnos-shutdown hepnos yaml-cpp margo)
......
...@@ -9,14 +9,8 @@ ...@@ -9,14 +9,8 @@
#include <vector> #include <vector>
#include <unistd.h> #include <unistd.h>
#include <mpi.h> #include <mpi.h>
#include <margo.h>
#include <bake-server.h>
#include <sdskv-server.h>
#include <yaml-cpp/yaml.h>
#include "hepnos-service.h" #include "hepnos-service.h"
#define ASSERT(__cond, __msg, ...) { if(!(__cond)) { fprintf(stderr, "[%s:%d] " __msg, __FILE__, __LINE__, __VA_ARGS__); exit(-1); } }
void usage(void) void usage(void)
{ {
fprintf(stderr, "Usage: hepnos-daemon <config-file> <connection-file>\n"); fprintf(stderr, "Usage: hepnos-daemon <config-file> <connection-file>\n");
......
...@@ -13,5 +13,6 @@ ...@@ -13,5 +13,6 @@
#include <hepnos/RunSet.hpp> #include <hepnos/RunSet.hpp>
#include <hepnos/SubRun.hpp> #include <hepnos/SubRun.hpp>
#include <hepnos/SubRunNumber.hpp> #include <hepnos/SubRunNumber.hpp>
#include <hepnos/WriteBatch.hpp>
#endif #endif
...@@ -161,7 +161,10 @@ class DataSet : public KeyValueContainer { ...@@ -161,7 +161,10 @@ class DataSet : public KeyValueContainer {
* @return a valid ProductID if the key did not already exist and the write succeeded, * @return a valid ProductID if the key did not already exist and the write succeeded,
* an invalid one otherwise. * an invalid one otherwise.
*/ */
ProductID storeRawData(const std::string& key, const std::vector<char>& buffer) override; ProductID storeRawData(const std::string& key, const std::string& value) override;
ProductID storeRawData(std::string&& key, std::string&& value) override;
ProductID storeRawData(WriteBatch& batch, const std::string& key, const std::string& value) override;
ProductID storeRawData(WriteBatch& batch, std::string&& key, std::string&& value) override;
/** /**
* @brief Loads binary data associated with a particular key from the DataSet. * @brief Loads binary data associated with a particular key from the DataSet.
...@@ -174,7 +177,7 @@ class DataSet : public KeyValueContainer { ...@@ -174,7 +177,7 @@ class DataSet : public KeyValueContainer {
* @return true if the key exists and the read succeeded, * @return true if the key exists and the read succeeded,
* false otherwise. * false otherwise.
*/ */
bool loadRawData(const std::string& key, std::vector<char>& buffer) const override; bool loadRawData(const std::string& key, std::string& value) const override;
/** /**
* @brief Comparison operator. * @brief Comparison operator.
...@@ -222,6 +225,7 @@ class DataSet : public KeyValueContainer { ...@@ -222,6 +225,7 @@ class DataSet : public KeyValueContainer {
* @return A Run instance pointing to the created run. * @return A Run instance pointing to the created run.
*/ */
Run createRun(const RunNumber& runNumber); Run createRun(const RunNumber& runNumber);
Run createRun(WriteBatch& batch, const RunNumber& runNumber);
typedef DataStore::const_iterator const_iterator; typedef DataStore::const_iterator const_iterator;
typedef DataStore::iterator iterator; typedef DataStore::iterator iterator;
......
...@@ -19,6 +19,7 @@ class Run; ...@@ -19,6 +19,7 @@ class Run;
class SubRun; class SubRun;
class Event; class Event;
template<typename T, typename C = std::vector<T>> class Ptr; template<typename T, typename C = std::vector<T>> class Ptr;
class WriteBatch;
/** /**
* The DataStore class is the main handle referencing an HEPnOS service. * The DataStore class is the main handle referencing an HEPnOS service.
...@@ -32,6 +33,7 @@ class DataStore { ...@@ -32,6 +33,7 @@ class DataStore {
friend class Run; friend class Run;
friend class SubRun; friend class SubRun;
friend class Event; friend class Event;
friend class WriteBatch;
public: public:
...@@ -249,6 +251,18 @@ class DataStore { ...@@ -249,6 +251,18 @@ class DataStore {
*/ */
DataSet createDataSet(const std::string& name); DataSet createDataSet(const std::string& name);
/**
* @brief Creates a dataset with a given name inside the data store.
* This function takes a WriteBatch instance, the dataset will be
* actually created when this batch is flushed or destroyed.
*
* @param batch WriteBatch in which to enqueue the creation.
* @param name Name of the dataset.
*
* @return A DataSet instance pointing to the created dataset.
*/
DataSet createDataSet(WriteBatch& batch, const std::string& name);
/** /**
* @brief Create a pointer to a product. The type T used must * @brief Create a pointer to a product. The type T used must
* match the type of the product corresponding to the provided * match the type of the product corresponding to the provided
...@@ -315,7 +329,7 @@ class DataStore { ...@@ -315,7 +329,7 @@ class DataStore {
* *
* @return true if the data was loaded successfuly, false otherwise. * @return true if the data was loaded successfuly, false otherwise.
*/ */
bool loadRawProduct(const ProductID& productID, std::vector<char>& buffer); bool loadRawProduct(const ProductID& productID, std::string& buffer);
}; };
class DataStore::const_iterator { class DataStore::const_iterator {
...@@ -560,7 +574,7 @@ Ptr<T,C> DataStore::makePtr(const ProductID& productID, std::size_t index) { ...@@ -560,7 +574,7 @@ Ptr<T,C> DataStore::makePtr(const ProductID& productID, std::size_t index) {
template<typename T> template<typename T>
bool DataStore::loadProduct(const ProductID& productID, T& t) { bool DataStore::loadProduct(const ProductID& productID, T& t) {
std::vector<char> buffer; std::string buffer;
if(!loadRawProduct(productID, buffer)) { if(!loadRawProduct(productID, buffer)) {
return false; return false;
} }
......
...@@ -105,21 +105,24 @@ class Event : public KeyValueContainer { ...@@ -105,21 +105,24 @@ class Event : public KeyValueContainer {
* @brief Stores raw key/value data in this Event. * @brief Stores raw key/value data in this Event.
* *
* @param key Key * @param key Key
* @param buffer Value * @param value Value
* *
* @return a valid ProductID if the key did not already exist, an invalid one otherwise. * @return a valid ProductID if the key did not already exist, an invalid one otherwise.
*/ */
ProductID storeRawData(const std::string& key, const std::vector<char>& buffer) override; ProductID storeRawData(const std::string& key, const std::string& value) override;
ProductID storeRawData(std::string&& key, std::string&& value) override;
ProductID storeRawData(WriteBatch& batch, const std::string& key, const std::string& value) override;
ProductID storeRawData(WriteBatch& batch, std::string&& key, std::string&& value) override;
/** /**
* @brief Loads raw key/value data from this Event. * @brief Loads raw key/value data from this Event.
* *
* @param key Key * @param key Key
* @param buffer Buffer used to hold the value. * @param value Buffer used to hold the value.
* *
* @return true if the key exists, false otherwise. * @return true if the key exists, false otherwise.
*/ */
bool loadRawData(const std::string& key, std::vector<char>& buffer) const override; bool loadRawData(const std::string& key, std::string& value) const override;
/** /**
* @brief Compares this Event with another Event. The Events must point to * @brief Compares this Event with another Event. The Events must point to
......
...@@ -17,7 +17,10 @@ class Exception : public std::exception ...@@ -17,7 +17,10 @@ class Exception : public std::exception
public: public:
Exception() = default;
Exception(const std::string& msg) : m_msg(msg){} Exception(const std::string& msg) : m_msg(msg){}
Exception(const Exception&) = default;
Exception& operator=(const Exception&) = default;
virtual const char* what() const noexcept override virtual const char* what() const noexcept override
{ {
......
...@@ -19,6 +19,8 @@ ...@@ -19,6 +19,8 @@
namespace hepnos { namespace hepnos {
class WriteBatch;
class KeyValueContainer { class KeyValueContainer {
public: public:
...@@ -69,7 +71,13 @@ class KeyValueContainer { ...@@ -69,7 +71,13 @@ class KeyValueContainer {
* *
* @return A valid ProductID if the key did not already exist, an invalid one otherwise. * @return A valid ProductID if the key did not already exist, an invalid one otherwise.
*/ */
virtual ProductID storeRawData(const std::string& key, const std::vector<char>& buffer) = 0; virtual ProductID storeRawData(const std::string& key, const std::string& value) = 0;
virtual ProductID storeRawData(std::string&& key, std::string&& value) = 0;
virtual ProductID storeRawData(WriteBatch& batch, const std::string& key, const std::string& value) = 0;
virtual ProductID storeRawData(WriteBatch& batch, std::string&& key, std::string&& value) = 0;
/** /**
* @brief Loads raw key/value data from this KeyValueContainer. * @brief Loads raw key/value data from this KeyValueContainer.
...@@ -80,7 +88,7 @@ class KeyValueContainer { ...@@ -80,7 +88,7 @@ class KeyValueContainer {
* *
* @return true if the key exists, false otherwise. * @return true if the key exists, false otherwise.
*/ */
virtual bool loadRawData(const std::string& key, std::vector<char>& buffer) const = 0; virtual bool loadRawData(const std::string& key, std::string& buffer) const = 0;
/** /**
* @brief Stores a key/value pair into the KeyValueContainer. * @brief Stores a key/value pair into the KeyValueContainer.
...@@ -99,18 +107,16 @@ class KeyValueContainer { ...@@ -99,18 +107,16 @@ class KeyValueContainer {
*/ */
template<typename K, typename V> template<typename K, typename V>
ProductID store(const K& key, const V& value) { ProductID store(const K& key, const V& value) {
std::stringstream ss_value; std::string key_str, val_str;
std::stringstream ss_key; serializeKeyValue(key, value, key_str, val_str);
ss_key << key << "#" << demangle<V>(); return storeRawData(std::move(key_str), std::move(val_str));
boost::archive::binary_oarchive oa(ss_value); }
try {
oa << value; template<typename K, typename V>
} catch(...) { ProductID store(WriteBatch& batch, const K& key, const V& value) {
throw Exception("Exception occured during serialization"); std::string key_str, val_str;
} serializeKeyValue(key, value, key_str, val_str);
std::string serialized = ss_value.str(); return storeRawData(batch, std::move(key_str), std::move(val_str));
std::vector<char> buffer(serialized.begin(), serialized.end());
return storeRawData(ss_key.str(), buffer);
} }
/** /**
...@@ -130,15 +136,14 @@ class KeyValueContainer { ...@@ -130,15 +136,14 @@ class KeyValueContainer {
*/ */
template<typename K, typename V> template<typename K, typename V>
bool load(const K& key, V& value) const { bool load(const K& key, V& value) const {
std::vector<char> buffer; std::string buffer;
std::stringstream ss_key; std::stringstream ss_key;
ss_key << key << "#" << demangle<V>(); ss_key << key << "#" << demangle<V>();
if(!loadRawData(ss_key.str(), buffer)) { if(!loadRawData(ss_key.str(), buffer)) {
return false; return false;
} }
try { try {
std::string serialized(buffer.begin(), buffer.end()); std::stringstream ss(buffer);
std::stringstream ss(serialized);
//boost::archive::binary_iarchive ia(ss); //boost::archive::binary_iarchive ia(ss);
InputArchive ia(getDataStore(), ss); InputArchive ia(getDataStore(), ss);
ia >> value; ia >> value;
...@@ -147,6 +152,24 @@ class KeyValueContainer { ...@@ -147,6 +152,24 @@ class KeyValueContainer {
} }
return true; return true;
} }
private:
template<typename K, typename V>
static void serializeKeyValue(const K& key, const V& value,
std::string& key_str, std::string& value_str) {
std::stringstream ss_value;
std::stringstream ss_key;
ss_key << key << "#" << demangle<V>();
key_str = std::move(ss_key.str());
boost::archive::binary_oarchive oa(ss_value);
try {
oa << value;
} catch(...) {
throw Exception("Exception occured during serialization");
}
value_str = ss_value.str();
}
}; };
} }
......
...@@ -11,14 +11,16 @@ ...@@ -11,14 +11,16 @@
#include <boost/serialization/access.hpp> #include <boost/serialization/access.hpp>
#include <boost/serialization/string.hpp> #include <boost/serialization/string.hpp>
#include <hepnos/DataStore.hpp>
namespace hepnos { namespace hepnos {
class WriteBatch;
class DataStore;
class ProductID { class ProductID {
friend class DataStore; friend class DataStore;
friend class DataStore::Impl; friend class WriteBatch;
friend class boost::serialization::access; friend class boost::serialization::access;
private: private:
......
...@@ -115,8 +115,10 @@ class Run : public KeyValueContainer { ...@@ -115,8 +115,10 @@ class Run : public KeyValueContainer {
* *
* @return a valid ProductID if the key did not already exist, an invalid one otherwise. * @return a valid ProductID if the key did not already exist, an invalid one otherwise.
*/ */
ProductID storeRawData(const std::string& key, const std::vector<char>& buffer) override; ProductID storeRawData(const std::string& key, const std::string& value) override;
ProductID storeRawData(std::string&& key, std::string&& value) override;
ProductID storeRawData(WriteBatch& batch, const std::string& key, const std::string& value) override;
ProductID storeRawData(WriteBatch& batch, std::string&& key, std::string&& value) override;
/** /**
* @brief Loads raw key/value data from this Run. * @brief Loads raw key/value data from this Run.
* *
...@@ -125,7 +127,7 @@ class Run : public KeyValueContainer { ...@@ -125,7 +127,7 @@ class Run : public KeyValueContainer {
* *
* @return true if the key exists, false otherwise. * @return true if the key exists, false otherwise.
*/ */
bool loadRawData(const std::string& key, std::vector<char>& buffer) const override; bool loadRawData(const std::string& key, std::string& buffer) const override;
/** /**
* @brief Compares this Run with another Run. The Runs must point to * @brief Compares this Run with another Run. The Runs must point to
...@@ -311,6 +313,7 @@ class Run : public KeyValueContainer { ...@@ -311,6 +313,7 @@ class Run : public KeyValueContainer {
* @return a handle to the created or existing SubRun. * @return a handle to the created or existing SubRun.
*/ */
SubRun createSubRun(const SubRunNumber& subRunNumber); SubRun createSubRun(const SubRunNumber& subRunNumber);
SubRun createSubRun(WriteBatch& batch, const SubRunNumber& subRunNumber);
}; };
class Run::const_iterator { class Run::const_iterator {
......
...@@ -112,7 +112,10 @@ class SubRun : public KeyValueContainer { ...@@ -112,7 +112,10 @@ class SubRun : public KeyValueContainer {
* *
* @return a valid ProductID if the key did not already exist, an invalid one otherwise. * @return a valid ProductID if the key did not already exist, an invalid one otherwise.
*/ */
ProductID storeRawData(const std::string& key, const std::vector<char>& buffer) override; ProductID storeRawData(const std::string& key, const std::string& value) override;
ProductID storeRawData(std::string&& key, std::string&& value) override;
ProductID storeRawData(WriteBatch& batch, const std::string& key, const std::string& value) override;
ProductID storeRawData(WriteBatch& batch, std::string&& key, std::string&& value) override;
/** /**
* @brief Loads raw key/value data from this SubRun. * @brief Loads raw key/value data from this SubRun.
...@@ -122,7 +125,7 @@ class SubRun : public KeyValueContainer { ...@@ -122,7 +125,7 @@ class SubRun : public KeyValueContainer {
* *
* @return true if the key exists, false otherwise. * @return true if the key exists, false otherwise.
*/ */
bool loadRawData(const std::string& key, std::vector<char>& buffer) const override; bool loadRawData(const std::string& key, std::string& buffer) const override;
/** /**
* @brief Compares this SubRun with another SubRun. The SubRuns must point to * @brief Compares this SubRun with another SubRun. The SubRuns must point to
...@@ -300,6 +303,7 @@ class SubRun : public KeyValueContainer { ...@@ -300,6 +303,7 @@ class SubRun : public KeyValueContainer {
* @return a handle to the created or existing Event. * @return a handle to the created or existing Event.
*/ */
Event createEvent(const EventNumber& eventNumber); Event createEvent(const EventNumber& eventNumber);
Event createEvent(WriteBatch& batch, const EventNumber& eventNumber);
}; };
class SubRun::const_iterator { class SubRun::const_iterator {
......
/*
* (C) 2019 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __HEPNOS_WRITE_BATCH_H
#define __HEPNOS_WRITE_BATCH_H
#include <memory>
#include <string>
#include <vector>
#include <hepnos/KeyValueContainer.hpp>
#include <hepnos/ProductID.hpp>
#include <hepnos/DataStore.hpp>
#include <hepnos/Exception.hpp>
namespace hepnos {
class DataStore;
class DataSet;
class Run;
class SubRun;
class Event;
class WriteBatch {
friend class DataStore;
friend class DataSet;
friend class Run;
friend class SubRun;
friend class Event;
friend class KeyValueContainer;
private:
class Impl;
std::unique_ptr<Impl> m_impl;
public:
WriteBatch(DataStore& ds);
~WriteBatch();
WriteBatch(const WriteBatch&) = delete;
WriteBatch& operator=(const WriteBatch&) = delete;
WriteBatch(WriteBatch&&) = delete;
WriteBatch& operator=(WriteBatch&&) = delete;
};
}
#endif
...@@ -4,7 +4,8 @@ set(hepnos-src DataStore.cpp ...@@ -4,7 +4,8 @@ set(hepnos-src DataStore.cpp
RunSet.cpp RunSet.cpp
Run.cpp Run.cpp
SubRun.cpp SubRun.cpp
Event.cpp) Event.cpp
WriteBatch.cpp)
set(hepnos-service-src service/HEPnOSService.cpp set(hepnos-service-src service/HEPnOSService.cpp
service/ServiceConfig.cpp service/ServiceConfig.cpp
...@@ -26,7 +27,7 @@ set (hepnos-vers "${HEPNOS_VERSION_MAJOR}.${HEPNOS_VERSION_MINOR}") ...@@ -26,7 +27,7 @@ set (hepnos-vers "${HEPNOS_VERSION_MAJOR}.${HEPNOS_VERSION_MINOR}")
set (HEPNOS_VERSION "${hepnos-vers}.${HEPNOS_VERSION_PATCH}") set (HEPNOS_VERSION "${hepnos-vers}.${HEPNOS_VERSION_PATCH}")
add_library(hepnos ${hepnos-src}) add_library(hepnos ${hepnos-src})
target_link_libraries (hepnos mercury margo yaml-cpp sdskv-client bake-client ch-placement) target_link_libraries (hepnos mercury margo yaml-cpp sdskv-client ch-placement)
target_include_directories (hepnos PUBLIC $<INSTALL_INTERFACE:include>) target_include_directories (hepnos PUBLIC $<INSTALL_INTERFACE:include>)
# local include's BEFORE, in case old incompatable .h files in prefix/include # local include's BEFORE, in case old incompatable .h files in prefix/include
...@@ -40,7 +41,7 @@ set_target_properties (hepnos ...@@ -40,7 +41,7 @@ set_target_properties (hepnos
SOVERSION ${HEPNOS_VERSION_MAJOR}) SOVERSION ${HEPNOS_VERSION_MAJOR})
add_library(hepnos-service ${hepnos-service-src}) add_library(hepnos-service ${hepnos-service-src})
target_link_libraries (hepnos mercury margo yaml-cpp sdskv-client sdskv-server bake-client bake-server ch-placement) target_link_libraries (hepnos mercury margo yaml-cpp sdskv-client sdskv-server ch-placement)
target_include_directories (hepnos-service PUBLIC $<INSTALL_INTERFACE:include>) target_include_directories (hepnos-service PUBLIC $<INSTALL_INTERFACE:include>)
# local include's BEFORE, in case old incompatable .h files in prefix/include # local include's BEFORE, in case old incompatable .h files in prefix/include
......