Commit f6dde05d authored by Matthieu Dorier
Browse files

big refactoring

parent 756aeb5e
......@@ -21,9 +21,7 @@ class Event;
class DataStoreImpl;
class DataSetImpl;
class RunSetImpl;
class RunImpl;
class SubRunImpl;
class EventImpl;
class ItemImpl;
template<typename T, typename C = std::vector<T>> class Ptr;
class WriteBatch;
......
......@@ -21,13 +21,13 @@ class Event : public KeyValueContainer {
friend class SubRun;
std::shared_ptr<EventImpl> m_impl;
std::shared_ptr<ItemImpl> m_impl;
/**
* @brief Constructor.
*/
Event(std::shared_ptr<EventImpl>&& impl);
Event(const std::shared_ptr<EventImpl>& impl);
Event(std::shared_ptr<ItemImpl>&& impl);
Event(const std::shared_ptr<ItemImpl>& impl);
public:
......
......@@ -14,27 +14,13 @@
namespace hepnos {
class WriteBatch;
class DataStore;
class ProductID {
friend class DataStore;
friend class DataStoreImpl;
friend class WriteBatch;
friend class WriteBatchImpl;
friend class boost::serialization::access;
private:
std::uint8_t m_level;
std::string m_containerName;
std::string m_objectName;
ProductID(std::uint8_t level, const std::string& containerName, const std::string& objectName)
: m_level(level)
, m_containerName(containerName)
, m_objectName(objectName) {}
public:
ProductID() = default;
......@@ -84,7 +70,7 @@ class ProductID {
* underlying service, false otherwise.
*/
bool valid() const {
return m_objectName.size() != 0;
return m_key.size() != 0;
}
/**
......@@ -105,9 +91,7 @@ class ProductID {
* @return true if the ProductIDs are the same, false otherwise.
*/
bool operator==(const ProductID& other) const {
return m_level == other.m_level
&& m_containerName == other.m_containerName
&& m_objectName == other.m_objectName;
return m_key == other.m_key;
}
/**
......@@ -123,11 +107,14 @@ class ProductID {
private:
std::string m_key;
ProductID(const std::string& key)
: m_key(key) {}
template<typename Archive>
void serialize(Archive& ar, const unsigned int version) {
ar & m_level;
ar & m_containerName;
ar & m_objectName;
ar & m_key;
}
};
......
......@@ -25,13 +25,13 @@ class Run : public KeyValueContainer {
friend class RunSet;
friend class DataSet;
std::shared_ptr<RunImpl> m_impl;
std::shared_ptr<ItemImpl> m_impl;
/**
* @brief Constructor.
*/
Run(const std::shared_ptr<RunImpl>& impl);
Run(std::shared_ptr<RunImpl>&& impl);
Run(const std::shared_ptr<ItemImpl>& impl);
Run(std::shared_ptr<ItemImpl>&& impl);
public:
......
......@@ -22,13 +22,13 @@ class SubRun : public KeyValueContainer {
friend class Run;
std::shared_ptr<SubRunImpl> m_impl;
std::shared_ptr<ItemImpl> m_impl;
/**
* @brief Constructor.
*/
SubRun(std::shared_ptr<SubRunImpl>&& impl);
SubRun(const std::shared_ptr<SubRunImpl>& impl);
SubRun(std::shared_ptr<ItemImpl>&& impl);
SubRun(const std::shared_ptr<ItemImpl>& impl);
public:
......
......@@ -46,6 +46,8 @@ struct UUID {
void randomize();
bool operator==(const UUID& other) const;
uint64_t hash() const;
};
template<typename T>
......
......@@ -22,6 +22,7 @@ class DataSet;
class Run;
class SubRun;
class Event;
class WriteBatchImpl;
class WriteBatch {
......@@ -34,9 +35,7 @@ class WriteBatch {
private:
class Impl;
std::unique_ptr<Impl> m_impl;
std::unique_ptr<WriteBatchImpl> m_impl;
public:
......
......@@ -10,6 +10,7 @@ set(hepnos-src DataStore.cpp
set(hepnos-service-src service/HEPnOSService.cpp
service/ServiceConfig.cpp
UUID.cpp
service/ConnectionInfoGenerator.cpp)
# load package helper for generating cmake CONFIG packages
......@@ -28,7 +29,7 @@ set (hepnos-vers "${HEPNOS_VERSION_MAJOR}.${HEPNOS_VERSION_MINOR}")
set (HEPNOS_VERSION "${hepnos-vers}.${HEPNOS_VERSION_PATCH}")
add_library(hepnos ${hepnos-src})
target_link_libraries (hepnos uuid mercury margo yaml-cpp sdskv-client ch-placement)
target_link_libraries (hepnos uuid thallium mercury margo yaml-cpp sdskv-client ch-placement)
target_include_directories (hepnos PUBLIC $<INSTALL_INTERFACE:include>)
# local includes BEFORE, in case old incompatible .h files in prefix/include
......@@ -42,7 +43,7 @@ set_target_properties (hepnos
SOVERSION ${HEPNOS_VERSION_MAJOR})
add_library(hepnos-service ${hepnos-service-src})
target_link_libraries (hepnos uuid thallium mercury margo yaml-cpp sdskv-client sdskv-server ch-placement)
target_link_libraries (hepnos-service uuid thallium mercury margo yaml-cpp sdskv-client sdskv-server ch-placement)
target_include_directories (hepnos-service PUBLIC $<INSTALL_INTERFACE:include>)
# local includes BEFORE, in case old incompatible .h files in prefix/include
......
......@@ -6,7 +6,7 @@
#include "hepnos/DataSet.hpp"
#include "hepnos/Run.hpp"
#include "hepnos/RunSet.hpp"
#include "RunImpl.hpp"
#include "ItemImpl.hpp"
#include "DataSetImpl.hpp"
#include "DataStoreImpl.hpp"
#include "WriteBatchImpl.hpp"
......@@ -48,7 +48,8 @@ ProductID DataSet::storeRawData(const std::string& key, const char* value, size_
throw Exception("Calling DataSet member function on an invalid DataSet");
}
// forward the call to the datastore's store function
return m_impl->m_datastore->store(0, fullname(), key, value, vsize);
ItemDescriptor id(m_impl->m_uuid);
return m_impl->m_datastore->storeRawProduct(id, key, value, vsize);
}
ProductID DataSet::storeRawData(WriteBatch& batch, const std::string& key, const char* value, size_t vsize) {
......@@ -56,7 +57,8 @@ ProductID DataSet::storeRawData(WriteBatch& batch, const std::string& key, const
throw Exception("Calling DataSet member function on an invalid DataSet");
}
// forward the call to the datastore's store function
return batch.m_impl->store(0, fullname(), key, value, vsize);
ItemDescriptor id(m_impl->m_uuid);
return batch.m_impl->storeRawProduct(id, key, value, vsize);
}
bool DataSet::loadRawData(const std::string& key, std::string& buffer) const {
......@@ -64,7 +66,8 @@ bool DataSet::loadRawData(const std::string& key, std::string& buffer) const {
throw Exception("Calling DataSet member function on an invalid DataSet");
}
// forward the call to the datastore's load function
return m_impl->m_datastore->load(0, fullname(), key, buffer);
ItemDescriptor id(m_impl->m_uuid);
return m_impl->m_datastore->loadRawProduct(id, key, buffer);
}
bool DataSet::loadRawData(const std::string& key, char* value, size_t* vsize) const {
......@@ -72,7 +75,8 @@ bool DataSet::loadRawData(const std::string& key, char* value, size_t* vsize) co
throw Exception("Calling DataSet member function on an invalid DataSet");
}
// forward the call to the datastore's load function
return m_impl->m_datastore->load(0, fullname(), key, value, vsize);
ItemDescriptor id(m_impl->m_uuid);
return m_impl->m_datastore->loadRawProduct(id, key, value, vsize);
}
bool DataSet::operator==(const DataSet& other) const {
......@@ -130,12 +134,9 @@ Run DataSet::createRun(const RunNumber& runNumber) {
if(InvalidRunNumber == runNumber) {
throw Exception("Trying to create a Run with InvalidRunNumber");
}
std::string parent_uuid = m_impl->m_uuid.to_string();
std::string runStr = makeKeyStringFromNumber(runNumber);
m_impl->m_datastore->store(m_impl->m_level+1, parent_uuid, runStr);
return Run(std::make_shared<RunImpl>(
m_impl->m_datastore->createItem(m_impl->m_uuid, runNumber);
return Run(std::make_shared<ItemImpl>(
m_impl->m_datastore,
m_impl->m_level+1,
m_impl->m_uuid,
runNumber));
}
......@@ -144,13 +145,11 @@ Run DataSet::createRun(WriteBatch& batch, const RunNumber& runNumber) {
if(InvalidRunNumber == runNumber) {
throw Exception("Trying to create a Run with InvalidRunNumber");
}
std::string parent_uuid = m_impl->m_uuid.to_string();
std::string runStr = makeKeyStringFromNumber(runNumber);
batch.m_impl->store(m_impl->m_level+1, parent_uuid, runStr);
return Run(
std::make_shared<RunImpl>(
m_impl->m_datastore, m_impl->m_level+1,
m_impl->m_uuid, runNumber));
batch.m_impl->createItem(m_impl->m_uuid, runNumber);
return Run(std::make_shared<ItemImpl>(
m_impl->m_datastore,
m_impl->m_uuid,
runNumber));
}
DataSet DataSet::operator[](const std::string& datasetName) const {
......
......@@ -63,14 +63,14 @@ bool DataStore::loadRawProduct(const ProductID& productID, std::string& buffer)
if(!m_impl) {
throw Exception("Calling DataStore member function on an invalid DataStore object");
}
return m_impl->load(productID.m_level, productID.m_containerName, productID.m_objectName, buffer);
return m_impl->loadRawProduct(productID, buffer);
}
bool DataStore::loadRawProduct(const ProductID& productID, char* data, size_t* size) {
if(!m_impl) {
throw Exception("Calling DataStore member function on an invalid DataStore object");
}
return m_impl->load(productID.m_level, productID.m_containerName, productID.m_objectName, data, size);
return m_impl->loadRawProduct(productID, data, size);
}
}
......
......@@ -19,7 +19,7 @@
#include "hepnos/DataSet.hpp"
#include "StringHash.hpp"
#include "DataSetImpl.hpp"
#include "hepnos/UUID.hpp"
#include "ItemImpl.hpp"
namespace hepnos {
......@@ -55,8 +55,11 @@ class DataStoreImpl {
margo_instance_id m_mid; // Margo instance
std::unordered_map<std::string,hg_addr_t> m_addrs; // Addresses used by the service
sdskv::client m_sdskv_client; // SDSKV client
DistributedDBInfo m_databases; // list of SDSKV databases
DistributedDBInfo m_dataset_dbs; // list of SDSKV databases for DataSets
DistributedDBInfo m_run_dbs; // list of SDSKV databases for Runs
DistributedDBInfo m_subrun_dbs; // list of SDSKV databases for SubRuns
DistributedDBInfo m_event_dbs; // list of SDSKV databases for Events
DistributedDBInfo m_product_dbs; // list of SDSKV databases for Products
DataStoreImpl()
: m_mid(MARGO_INSTANCE_NULL)
......@@ -127,16 +130,24 @@ class DataStoreImpl {
YAML::Node event_db = databases["events"];
YAML::Node product_db = databases["products"];
populateDatabases(m_dataset_dbs, dataset_db);
populateDatabases(m_databases, product_db); // TODO change this
populateDatabases(m_run_dbs, run_db);
populateDatabases(m_subrun_dbs, subrun_db);
populateDatabases(m_event_dbs, event_db);
populateDatabases(m_product_dbs, product_db);
}
void cleanup() {
m_databases.dbs.clear();
m_dataset_dbs.dbs.clear();
m_run_dbs.dbs.clear();
m_subrun_dbs.dbs.clear();
m_event_dbs.dbs.clear();
m_product_dbs.dbs.clear();
m_sdskv_client = sdskv::client();
if(m_databases.chi)
ch_placement_finalize(m_databases.chi);
if(m_dataset_dbs.chi)
ch_placement_finalize(m_dataset_dbs.chi);
if(m_dataset_dbs.chi) ch_placement_finalize(m_dataset_dbs.chi);
if(m_run_dbs.chi) ch_placement_finalize(m_run_dbs.chi);
if(m_subrun_dbs.chi) ch_placement_finalize(m_subrun_dbs.chi);
if(m_event_dbs.chi) ch_placement_finalize(m_event_dbs.chi);
if(m_product_dbs.chi) ch_placement_finalize(m_product_dbs.chi);
for(auto& addr : m_addrs) {
margo_addr_free(m_mid, addr.second);
}
......@@ -198,53 +209,36 @@ class DataStoreImpl {
public:
static inline std::string buildKey(
uint8_t level,
const std::string& containerName,
const std::string& objectName) {
size_t c = 1 + objectName.size();
if(!containerName.empty()) c += containerName.size() + 1;
std::string result(c,'\0');
result[0] = level;
if(!containerName.empty()) {
std::memcpy(&result[1], containerName.data(), containerName.size());
size_t x = 1+containerName.size();
result[x] = '/';
std::memcpy(&result[x+1], objectName.data(), objectName.size());
} else {
std::memcpy(&result[1], objectName.data(), objectName.size());
}
///////////////////////////////////////////////////////////////////////////
// Product access functions
///////////////////////////////////////////////////////////////////////////
static inline ProductID buildProductID(const ItemDescriptor& id, const std::string& productName) {
ProductID result;
result.m_key.resize(sizeof(id)+productName.size());
std::memcpy(const_cast<char*>(result.m_key.data()), &id, sizeof(id));
std::memcpy(const_cast<char*>(result.m_key.data()+sizeof(id)), productName.data(), productName.size());
return result;
}
unsigned long computeDbIndex(uint8_t level, const std::string& containerName, const std::string& key) const {
const sdskv::database& locateProductDb(const ProductID& productID) const {
// hash the name to get the provider id
long unsigned sdskv_db_idx = 0;
uint64_t name_hash;
if(level != 0) {
name_hash = hashString(containerName);
} else {
// use the complete name for final objects (level 0)
name_hash = hashString(key);
}
ch_placement_find_closest(m_databases.chi, name_hash, 1, &sdskv_db_idx);
return sdskv_db_idx;
long unsigned db_idx = 0;
uint64_t hash;
hash = hashString(productID.m_key);
ch_placement_find_closest(m_product_dbs.chi, hash, 1, &db_idx);
return m_product_dbs.dbs[db_idx];
}
bool load(uint8_t level, const std::string& containerName,
const std::string& objectName, std::string& data) const {
int ret;
// build key
auto key = buildKey(level, containerName, objectName);
bool loadRawProduct(const ProductID& key,
std::string& data) const {
// find out which DB to access
long unsigned sdskv_db_idx = computeDbIndex(level, containerName, key);
// make corresponding datastore entry
auto& db = m_databases.dbs[sdskv_db_idx];
auto& db = locateProductDb(key);
// read the value
if(data.size() == 0)
data.resize(2048); // eagerly allocate 2KB
try {
db.get(key, data);
db.get(key.m_key, data);
} catch(sdskv::exception& ex) {
if(ex.error() == SDSKV_ERR_UNKNOWN_KEY)
return false;
......@@ -254,18 +248,20 @@ class DataStoreImpl {
return true;
}
bool load(uint8_t level, const std::string& containerName,
const std::string& objectName, char* value, size_t* vsize) const {
int ret;
// build key
auto key = buildKey(level, containerName, objectName);
/**
 * Loads a raw product addressed by its owning item and product name.
 *
 * Builds the ProductID for (id, productName) with buildProductID and
 * forwards to the ProductID-based overload, returning its result unchanged.
 */
bool loadRawProduct(const ItemDescriptor& id,
                    const std::string& productName,
                    std::string& data) const {
    return loadRawProduct(buildProductID(id, productName), data);
}
bool loadRawProduct(const ProductID& key,
char* value, size_t* vsize) const {
// find out which DB to access
long unsigned sdskv_db_idx = computeDbIndex(level, containerName, key);
// make corresponding datastore entry
auto& db = m_databases.dbs[sdskv_db_idx];
// read the value
auto& db = locateProductDb(key);
try {
db.get(key.data(), key.size(), value, vsize);
db.get(key.m_key.data(), key.m_key.size(), value, vsize);
} catch(sdskv::exception& ex) {
if(ex.error() == SDSKV_ERR_UNKNOWN_KEY)
return false;
......@@ -277,33 +273,44 @@ class DataStoreImpl {
return true;
}
bool exists(uint8_t level, const std::string& containerName,
const std::string& objectName) const {
int ret;
// build key
auto key = buildKey(level, containerName, objectName);
/**
 * Loads a raw product, addressed by its owning item and product name,
 * into a caller-provided buffer.
 *
 * Builds the ProductID for (id, productName) with buildProductID and
 * forwards value and vsize to the ProductID-based overload, returning its
 * result unchanged.
 */
bool loadRawProduct(const ItemDescriptor& id,
                    const std::string& productName,
                    char* value, size_t* vsize) const {
    return loadRawProduct(buildProductID(id, productName), value, vsize);
}
ProductID storeRawProduct(const ItemDescriptor& id,
const std::string& productName,
const char* value, size_t vsize) const {
// build product id
auto key = buildProductID(id, productName);
// find out which DB to access
long unsigned sdskv_db_idx = computeDbIndex(level, containerName, key);
// make corresponding datastore entry
auto& db = m_databases.dbs[sdskv_db_idx];
auto& db = locateProductDb(key);
// write the value
try {
return db.exists(key);
db.put(key.m_key.data(), key.m_key.size(), value, vsize);
} catch(sdskv::exception& ex) {
throw Exception("Error occured when calling sdskv::database::exists (SDSKV error="+std::to_string(ex.error())+")");
if(ex.error() == SDSKV_ERR_KEYEXISTS) {
return ProductID();
} else {
throw Exception("Error occured when calling sdskv::database::put (SDSKV error=" +std::to_string(ex.error()) + ")");
}
}
return false;
return key;
}
ProductID store(uint8_t level, const std::string& containerName,
const std::string& objectName, const char* data=nullptr, size_t data_size=0) {
// build full name
auto key = buildKey(level, containerName, objectName);
ProductID storeRawProduct(const ItemDescriptor& id,
const std::string& productName,
const std::string& data) const {
// build product id
auto key = buildProductID(id, productName);
// find out which DB to access
long unsigned sdskv_db_idx = computeDbIndex(level, containerName, key);
// Create the product id
const auto& db = m_databases.dbs[sdskv_db_idx];
auto& db = locateProductDb(key);
// write the value
try {
db.put(key.data(), key.size(), data, data_size);
db.put(key.m_key, data);
} catch(sdskv::exception& ex) {
if(ex.error() == SDSKV_ERR_KEYEXISTS) {
return ProductID();
......@@ -311,9 +318,9 @@ class DataStoreImpl {
throw Exception("Error occured when calling sdskv::database::put (SDSKV error=" +std::to_string(ex.error()) + ")");
}
}
return ProductID(level, containerName, objectName);
return key;
}
#if 0
void storeMultiple(unsigned long db_index,
const std::vector<std::string>& keys,
const std::vector<std::string>& values) {
......@@ -361,7 +368,7 @@ class DataStoreImpl {
}
return keys.size();
}
#endif
///////////////////////////////////////////////////////////////////////////
// DataSet access functions
///////////////////////////////////////////////////////////////////////////
......@@ -369,7 +376,7 @@ class DataStoreImpl {
/**
* Builds the database key of a particular DataSet.
*/
static inline std::string _buildDataSetKey(uint8_t level, const std::string& containerName, const std::string& objectName) {
static inline std::string buildDataSetKey(uint8_t level, const std::string& containerName, const std::string& objectName) {
size_t c = 1 + objectName.size();
if(!containerName.empty()) c += containerName.size() + 1;
std::string result(c,'\0');
......@@ -388,7 +395,7 @@ class DataStoreImpl {
/**
* Locates and return the database in charge of the provided DataSet info.
*/
const sdskv::database& _locateDataSetDb(const std::string& containerName) const {
const sdskv::database& locateDataSetDb(const std::string& containerName) const {
// hash the name to get the provider id
long unsigned db_idx = 0;
uint64_t hash;
......@@ -411,9 +418,9 @@ class DataStoreImpl {
auto& containerName = *current->m_container;
auto& currentName = current->m_name;
if(current->m_level == 0) return 0; // cannot iterate at object level
auto& db = _locateDataSetDb(containerName);
auto& db = locateDataSetDb(containerName);
// make an entry for the lower bound
auto lb_entry = _buildDataSetKey(level, containerName, currentName);
auto lb_entry = buildDataSetKey(level, containerName, currentName);
// ignore keys that don't have the same level or the same prefix
std::string prefix(2+containerName.size(), '\0');
prefix[0] = level;
......@@ -455,9 +462,9 @@ class DataStoreImpl {
bool dataSetExists(uint8_t level, const std::string& containerName, const std::string& objectName) const {
int ret;
// build key
auto key = buildKey(level, containerName, objectName);
auto key = buildDataSetKey(level, containerName, objectName);
// find out which DB to access
auto& db = _locateDataSetDb(containerName);
auto& db = locateDataSetDb(containerName);
try {
bool b = db.exists(key);
return b;
......@@ -473,9 +480,9 @@ class DataStoreImpl {
bool loadDataSet(uint8_t level, const std::string& containerName, const std::string& objectName, UUID& uuid) const {
int ret;
// build key
auto key = buildKey(level, containerName, objectName);
auto key = buildDataSetKey(level, containerName, objectName);
// find out which DB to access
auto& db = _locateDataSetDb(containerName);
auto& db = locateDataSetDb(containerName);
try {
size_t s = sizeof(uuid);
db.get(static_cast<const void*>(key.data()),
......@@ -497,14 +504,126 @@ class DataStoreImpl {
*/
bool createDataSet(uint8_t level, const std::string& containerName, const std::string& objectName, UUID& uuid) {
// build full name
auto key = _buildDataSetKey(level, containerName, objectName);
auto key = buildDataSetKey(level, containerName, objectName);
// find out which DB to access
auto& db = _locateDataSetDb(containerName);
auto& db = locateDataSetDb(containerName);
uuid.randomize();
try {
db.put(key.data(), key.size(), uuid.data, sizeof(uuid));
} catch(sdskv::exception& ex) {
if(!ex.error() == SDSKV_ERR_KEYEXISTS) {
if(ex.error() == SDSKV_ERR_KEYEXISTS) {
return false;
} else {
throw Exception("Error occured when calling sdskv::database::put (SDSKV error=" +std::to_string(ex.error()) + ")");
}
}
return true;
}
///////////////////////////////////////////////////////////////////////////
// Access functions for numbered items (Runs, SubRuns, and Events)
///////////////////////////////////////////////////////////////////////////
const sdskv::database& locateItemDb(const ItemDescriptor& id) const {
long unsigned db_idx = 0;
uint64_t hash;
size_t prime = 1099511628211ULL;
hash = id