Commit 91903f4f authored by Matthieu Dorier

Separated Dataset storage from storage of the rest of the items

parent 9745f40c
......@@ -31,12 +31,10 @@ DataStore DataSet::datastore() const {
DataSet DataSet::next() const {
if(!valid()) return DataSet();
std::vector<std::string> keys;
size_t s = m_impl->m_datastore->nextKeys(
m_impl->m_level, *m_impl->m_container, m_impl->m_name, keys, 1);
std::vector<std::shared_ptr<DataSetImpl>> result;
size_t s = m_impl->m_datastore->nextDataSets(m_impl, result, 1);
if(s == 0) return DataSet();
return DataSet(std::make_shared<DataSetImpl>(m_impl->m_datastore, m_impl->m_level, keys[0]));
else return DataSet(std::move(result[0]));
}
bool DataSet::valid() const {
......@@ -80,10 +78,11 @@ bool DataSet::operator==(const DataSet& other) const {
bool v2 = other.valid();
if(!v1 && !v2) return true;
if(v1 && !v2) return false;
if(!v2 && v2) return false;
if(!v1 && v2) return false;
return m_impl->m_datastore == other.m_impl->m_datastore
&& m_impl->m_level == other.m_impl->m_level
&& *m_impl->m_container == *other.m_impl->m_container
&& (m_impl->m_container == other.m_impl->m_container
|| *m_impl->m_container == *other.m_impl->m_container )
&& m_impl->m_name == other.m_impl->m_name;
}
......@@ -106,11 +105,10 @@ const std::string& DataSet::container() const {
}
std::string DataSet::fullname() const {
std::stringstream ss;
if(container().size() != 0)
ss << container() << "/";
ss << name();
return ss.str();
if(!valid()) {
throw Exception("Calling DataSet member function on an invalid DataSet");
}
return m_impl->fullname();
}
DataSet DataSet::createDataSet(const std::string& name) {
......@@ -119,7 +117,7 @@ DataSet DataSet::createDataSet(const std::string& name) {
throw Exception("Invalid character '/' or '%' in dataset name");
}
std::string parent = fullname();
m_impl->m_datastore->store(m_impl->m_level+1, parent, name);
m_impl->m_datastore->createDataSet(m_impl->m_level+1, parent, name);
return DataSet(std::make_shared<DataSetImpl>(
m_impl->m_datastore, m_impl->m_level+1,
std::make_shared<std::string>(parent), name));
......@@ -190,7 +188,7 @@ DataSet::iterator DataSet::find(const std::string& datasetPath) {
datasetName = datasetPath.substr(c+1);
}
bool b = m_impl->m_datastore->exists(level, containerName, datasetName);
bool b = m_impl->m_datastore->dataSetExists(level, containerName, datasetName);
if(!b) {
return m_impl->m_datastore->m_end;
}
......
......@@ -6,12 +6,13 @@
#ifndef __HEPNOS_PRIVATE_DATASET_IMPL_H
#define __HEPNOS_PRIVATE_DATASET_IMPL_H
#include "DataStoreImpl.hpp"
#include "hepnos/DataSet.hpp"
#include "hepnos/RunSet.hpp"
namespace hepnos {
class DataStoreImpl;
class DataSetImpl {
public:
......
......@@ -71,8 +71,7 @@ DataStore::iterator DataStore::find(const std::string& datasetPath) {
datasetName = datasetPath.substr(c+1);
}
std::string data;
bool b = m_impl->load(level, containerName, datasetName, data);
bool b = m_impl->dataSetExists(level, containerName, datasetName);
if(!b) {
return m_impl->m_end;
}
......@@ -181,7 +180,7 @@ DataSet DataStore::createDataSet(const std::string& name) {
|| name.find('%') != std::string::npos) {
throw Exception("Invalid character ('/' or '%') in dataset name");
}
m_impl->store(1, "", name);
m_impl->createDataSet(1, "", name);
return DataSet(
std::make_shared<DataSetImpl>(
m_impl, 1, std::make_shared<std::string>(""), name));
......
......@@ -18,6 +18,7 @@
#include "hepnos/DataStore.hpp"
#include "hepnos/DataSet.hpp"
#include "StringHash.hpp"
#include "DataSetImpl.hpp"
namespace hepnos {
......@@ -37,36 +38,17 @@ class DataStoreImpl {
std::unordered_map<std::string,hg_addr_t> m_addrs; // Addresses used by the service
sdskv::client m_sdskv_client; // SDSKV client
DistributedDBInfo m_databases; // list of SDSKV databases
DistributedDBInfo m_dataset_dbs; // list of SDSKV databases for DataSets
const DataStore::iterator m_end; // iterator for the end() of the DataStore
DataStoreImpl()
: m_mid(MARGO_INSTANCE_NULL)
, m_end() {}
void init(const std::string& configFile) {
void populateDatabases(DistributedDBInfo& db_info, const YAML::Node& db_config) {
int ret;
hg_return_t hret;
YAML::Node config = YAML::LoadFile(configFile);
checkConfig(config);
// get protocol
std::string proto = config["hepnos"]["client"]["protocol"].as<std::string>();
// initialize Margo
m_mid = margo_init(proto.c_str(), MARGO_CLIENT_MODE, 0, 0);
if(!m_mid) {
cleanup();
throw Exception("Could not initialized Margo");
}
// initialize SDSKV client
try {
m_sdskv_client = sdskv::client(m_mid);
} catch(sdskv::exception& ex) {
cleanup();
throw Exception("Could not create SDSKV client (SDSKV error="+std::to_string(ex.error())+")");
}
// create list of sdskv provider handles
YAML::Node databases = config["hepnos"]["databases"];
YAML::Node dataset_db = databases["datasets"];
for(YAML::const_iterator address_it = dataset_db.begin(); address_it != dataset_db.end(); address_it++) {
for(YAML::const_iterator address_it = db_config.begin(); address_it != db_config.end(); address_it++) {
std::string str_addr = address_it->first.as<std::string>();
YAML::Node providers = address_it->second;
// lookup the address
......@@ -92,12 +74,43 @@ class DataStoreImpl {
YAML::Node databases = provider_it->second;
// iterate over databases for this provider
for(unsigned i=0; i < databases.size(); i++) {
m_databases.dbs.push_back(sdskv::database(ph, databases[i].as<uint64_t>()));
db_info.dbs.push_back(sdskv::database(ph, databases[i].as<uint64_t>()));
}
} // for each provider
} // for each address
// initialize ch-placement for the SDSKV providers
m_databases.chi = ch_placement_initialize("hash_lookup3", m_databases.dbs.size(), 4, 0);
// initialize ch-placement
db_info.chi = ch_placement_initialize("hash_lookup3", db_info.dbs.size(), 4, 0);
}
/**
 * Initializes this DataStoreImpl from a YAML configuration file.
 *
 * Steps, in order: load and check the configuration, initialize Margo
 * in client mode with the configured protocol, create the SDSKV client,
 * then populate the database lists from the "hepnos.databases" section.
 *
 * @param configFile path of the YAML configuration file to load.
 * @throws Exception if Margo or the SDSKV client cannot be initialized
 *         (cleanup() is called first in both cases).
 */
void init(const std::string& configFile) {
// NOTE(review): ret and hret are declared but never used in this function.
int ret;
hg_return_t hret;
YAML::Node config = YAML::LoadFile(configFile);
checkConfig(config);
// get protocol
std::string proto = config["hepnos"]["client"]["protocol"].as<std::string>();
// initialize Margo
m_mid = margo_init(proto.c_str(), MARGO_CLIENT_MODE, 0, 0);
if(!m_mid) {
cleanup();
throw Exception("Could not initialized Margo");
}
// initialize SDSKV client
try {
m_sdskv_client = sdskv::client(m_mid);
} catch(sdskv::exception& ex) {
cleanup();
throw Exception("Could not create SDSKV client (SDSKV error="+std::to_string(ex.error())+")");
}
// populate database info structures for each type of database
YAML::Node databases = config["hepnos"]["databases"];
YAML::Node dataset_db = databases["datasets"];
// NOTE(review): the "runs", "subruns" and "events" nodes are looked up
// but not used yet; only "datasets" and "products" are populated below.
YAML::Node run_db = databases["runs"];
YAML::Node subrun_db = databases["subruns"];
YAML::Node event_db = databases["events"];
YAML::Node product_db = databases["products"];
populateDatabases(m_dataset_dbs, dataset_db);
// m_databases is filled from the "products" section for now (see TODO).
populateDatabases(m_databases, product_db); // TODO change this
}
void cleanup() {
......@@ -330,6 +343,128 @@ class DataStoreImpl {
}
return keys.size();
}
///////////////////////////////////////////////////////////////////////////
// DataSet access functions
///////////////////////////////////////////////////////////////////////////
/**
 * Builds the database key of a particular DataSet.
 *
 * The key starts with one byte holding the level. It is followed by
 * "<containerName>/<objectName>" when containerName is not empty, and
 * by "<objectName>" alone otherwise.
 *
 * @param level         level of the DataSet, stored as the first byte.
 * @param containerName name of the enclosing container (may be empty).
 * @param objectName    name of the DataSet inside its container.
 * @return the key as a byte string; it may contain embedded null bytes.
 */
static inline std::string _buildDataSetKey(uint8_t level, const std::string& containerName, const std::string& objectName) {
    const bool hasContainer = !containerName.empty();
    std::string key;
    // one byte for the level, plus "<container>/" if any, plus the name
    key.reserve(1 + objectName.size() + (hasContainer ? containerName.size() + 1 : 0));
    key.push_back(static_cast<char>(level));
    if(hasContainer) {
        key.append(containerName);
        key.push_back('/');
    }
    key.append(objectName);
    return key;
}
/**
 * Locates and returns the database in charge of the DataSets stored in
 * the given container.
 *
 * Only the container name takes part in the hash; the hash is handed to
 * ch-placement, which yields an index into m_dataset_dbs.dbs.
 *
 * @param containerName name of the container holding the DataSet.
 * @return reference to the selected entry of m_dataset_dbs.dbs.
 */
const sdskv::database& _locateDataSetDb(const std::string& containerName) const {
    const uint64_t containerHash = hashString(containerName);
    long unsigned targetIndex = 0;
    // ask ch-placement for the single closest database
    ch_placement_find_closest(m_dataset_dbs.chi, containerHash, 1, &targetIndex);
    return m_dataset_dbs.dbs[targetIndex];
}
/**
* @brief Fills the result vector with a sequence of up to
* maxDataSets shared_ptr to DataSetImpl coming after the
* current dataset. Returns the number of DataSets read.
*/
size_t nextDataSets(const std::shared_ptr<DataSetImpl>& current,
std::vector<std::shared_ptr<DataSetImpl>>& result,
size_t maxDataSets) const {
int ret;
result.resize(0);
auto& level = current->m_level;
auto& containerName = *current->m_container;
auto& currentName = current->m_name;
if(current->m_level == 0) return 0; // cannot iterate at object level
auto& db = _locateDataSetDb(containerName);
// make an entry for the lower bound
auto lb_entry = _buildDataSetKey(level, containerName, currentName);
// ignore keys that don't have the same level or the same prefix
std::string prefix(2+containerName.size(), '\0');
prefix[0] = level;
if(containerName.size() != 0) {
std::memcpy(&prefix[1], containerName.data(), containerName.size());
prefix[prefix.size()-1] = '/';
} else {
prefix.resize(1);
}
// issue an sdskv_list_keys
std::vector<std::string> entries(maxDataSets, std::string(1024,'\0'));
try {
db.list_keys(lb_entry, prefix, entries);
} catch(sdskv::exception& ex) {
throw Exception("Error occured when calling sdskv::database::list_keys (SDSKV error="+std::string(ex.what()) + ")");
}
result.resize(0);
for(const auto& entry : entries) {
size_t i = entry.find_last_of('/');
if(i == std::string::npos) i = 1;
else i += 1;
result.push_back(
std::make_shared<DataSetImpl>(
current->m_datastore,
level,
current->m_container,
entry.substr(i)
)
);
}
return result.size();
}
/**
* @brief Checks if a particular dataset exists.
*/
bool dataSetExists(uint8_t level, const std::string& containerName, const std::string& objectName) const {
int ret;
// build key
auto key = buildKey(level, containerName, objectName);
// find out which DB to access
auto& db = _locateDataSetDb(containerName);
try {
return db.exists(key);
} catch(sdskv::exception& ex) {
throw Exception("Error occured when calling sdskv::database::exists (SDSKV error="+std::to_string(ex.error())+")");
}
return false;
}
/**
 * Creates a DataSet by storing its key (with an empty value) in the
 * database in charge of the given container.
 *
 * @param level         level of the DataSet.
 * @param containerName name of the enclosing container (may be empty).
 * @param objectName    name of the DataSet inside its container.
 * @return true if the key was stored, false if SDSKV reported that the
 *         key already exists.
 * @throws Exception for any other SDSKV error.
 */
bool createDataSet(uint8_t level, const std::string& containerName, const std::string& objectName) {
    // build full name
    auto key = _buildDataSetKey(level, containerName, objectName);
    // find out which DB to access
    auto& db = _locateDataSetDb(containerName);
    try {
        db.put(key.data(), key.size(), nullptr, 0);
    } catch(sdskv::exception& ex) {
        // Compare the error code itself. The previous "!ex.error() == ..."
        // negated the code first, so this branch could never be taken.
        if(ex.error() == SDSKV_ERR_KEYEXISTS) {
            return false;
        }
        throw Exception("Error occured when calling sdskv::database::put (SDSKV error=" +std::to_string(ex.error()) + ")");
    }
    return true;
}
};
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment