Commit b23286ea authored by Matthieu Dorier's avatar Matthieu Dorier
Browse files

DataStore functionalities (and iterators) done

parent e9c43787
......@@ -87,6 +87,7 @@ int main(int argc, char *argv[])
ret = sdskv_provider_register(mid, sdskv_mplex_id, SDSKV_ABT_POOL_DEFAULT, &sdskv_prov);
ASSERT(ret == 0, "sdskv_provider_register() failed (ret = %d)\n", ret);
// XXX creating the database - this should come from a config file
sdskv_database_id_t db_id;
ret = sdskv_provider_add_database(sdskv_prov, "hepnosdb", KVDB_MAP, SDSKV_COMPARE_DEFAULT, &db_id);
ASSERT(ret == 0, "sdskv_provider_add_database() failed (ret = %d)\n", ret);
......
#ifndef __HEPNOS_DATA_SET_H
#define __HEPNOS_DATA_SET_H
#include <boost/archive/binary_oarchive.hpp>
#include <boost/archive/binary_iarchive.hpp>
#include <boost/archive/text_oarchive.hpp>
#include <boost/archive/text_iarchive.hpp>
#include <boost/serialization/string.hpp>
#include <hepnos/Exception.hpp>
#include <hepnos/DataStore.hpp>
namespace hepnos {
......@@ -13,11 +19,76 @@ class DataSet {
DataSet();
DataSet(DataStore& ds, uint8_t level, const std::string& name);
DataSet(DataStore& ds, uint8_t level, const std::string& fullname);
DataStore* m_datastore;
uint8_t m_level;
std::string m_name;
DataSet(DataStore& ds, uint8_t level, const std::string& container, const std::string& name);
bool storeBuffer(const std::string& key, const std::vector<char>& buffer);
bool loadBuffer(const std::string& key, std::vector<char>& buffer) const;
DataStore* m_datastore;
uint8_t m_level;
std::string m_container;
std::string m_name;
public:
const std::string& name() const {
return m_name;
}
const std::string container() const {
return m_container;
}
std::string fullname() const {
std::stringstream ss;
if(m_container.size() != 0)
ss << m_container << "/";
ss << m_name;
return ss.str();
}
DataSet next() const;
bool valid() const;
template<typename K, typename V>
bool store(const K& key, const V& value) {
std::stringstream ss_key, ss_value;
ss_key << key;
boost::archive::binary_oarchive oa(ss_value);
try {
oa << value;
} catch(...) {
throw Exception("Exception occured during serialization");
}
std::string serialized = ss_value.str();
std::vector<char> buffer(serialized.begin(), serialized.end());
return storeBuffer(ss_key.str(), buffer);
}
template<typename K, typename V>
bool load(const K& key, V& value) const {
std::stringstream ss_key;
ss_key << key;
std::vector<char> buffer;
if(!loadBuffer(key, buffer)) {
return false;
}
try {
std::string serialized(buffer.begin(), buffer.end());
std::stringstream ss(serialized);
boost::archive::binary_iarchive ia(ss);
ia >> value;
} catch(...) {
throw Exception("Exception occured during serialization");
}
return true;
}
bool operator==(const DataSet& other) const;
};
}
......
#ifndef __HEPNOS_DATA_STORE_H
#define __HEPNOS_DATA_STORE_H
#include <vector>
#include <string>
#include <memory>
......@@ -10,6 +11,8 @@ class DataSet;
class DataStore {
friend class DataSet;
public:
DataStore(const std::string& configFile);
......@@ -50,6 +53,16 @@ class DataStore {
class Impl;
std::unique_ptr<Impl> m_impl;
bool load(uint8_t level, const std::string& containerName,
const std::string& objectName, std::vector<char>& data) const;
bool store(uint8_t level, const std::string& containerName,
const std::string& objectName, const std::vector<char>& data);
size_t nextKeys(uint8_t level, const std::string& containerName,
const std::string& lower,
std::vector<std::string>& keys, size_t maxKeys) const;
};
class DataStore::const_iterator {
......
......@@ -5,11 +5,66 @@ namespace hepnos {
DataSet::DataSet()
: m_datastore(nullptr)
, m_level(0)
, m_container("")
, m_name("") {}
DataSet::DataSet(DataStore& ds, uint8_t level, const std::string& name)
DataSet::DataSet(DataStore& ds, uint8_t level, const std::string& fullname)
: m_datastore(&ds)
, m_level(level) {
size_t p = fullname.find_last_of('/');
if(p == std::string::npos) {
m_name = fullname;
} else {
m_name = fullname.substr(p+1);
m_container = fullname.substr(0, p);
}
}
DataSet::DataSet(DataStore& ds, uint8_t level, const std::string& container, const std::string& name)
: m_datastore(&ds)
, m_level(level)
, m_container(container)
, m_name(name) {}
DataSet DataSet::next() const {
if(!valid()) return DataSet();
std::vector<std::string> keys;
size_t s = m_datastore->nextKeys(m_level, m_container, m_name, keys, 1);
if(s == 0) return DataSet();
return DataSet(*m_datastore, m_level, m_container, keys[0]);
}
bool DataSet::valid() const {
return m_datastore != nullptr;
}
bool DataSet::storeBuffer(const std::string& key, const std::vector<char>& buffer) {
if(!valid()) {
throw Exception("Calling store() on invalid DataSet");
}
// forward the call to the datastore's store function
return m_datastore->store(0, fullname(), key, buffer);
}
bool DataSet::loadBuffer(const std::string& key, std::vector<char>& buffer) const {
if(!valid()) {
throw Exception("Calling load() on invalid DataSet");
}
// forward the call to the datastore's load function
std::stringstream ss;
if(m_container.size() != 0)
ss << m_container << "/";
ss << m_name;
return m_datastore->load(0, fullname(), key, buffer);
}
bool DataSet::operator==(const DataSet& other) const {
return m_datastore == other.m_datastore
&& m_level == other.m_level
&& m_container == other.m_container
&& m_name == other.m_name;
}
}
......@@ -12,19 +12,23 @@
namespace hepnos {
////////////////////////////////////////////////////////////////////////////////////////////
// DataStore::Impl implementation
////////////////////////////////////////////////////////////////////////////////////////////
class DataStore::Impl {
public:
DataStore& m_parent;
margo_instance_id m_mid;
sdskv_client_t m_sdskv_client;
bake_client_t m_bake_client;
std::vector<sdskv_provider_handle_t> m_sdskv_ph;
std::vector<sdskv_database_id_t> m_sdskv_db;
struct ch_placement_instance* m_chi_sdskv;
std::vector<bake_provider_handle_t> m_bake_ph;
struct ch_placement_instance* m_chi_bake;
const DataStore::iterator m_end;
DataStore& m_parent; // parent DataStore
margo_instance_id m_mid; // Margo instance
sdskv_client_t m_sdskv_client; // SDSKV client
bake_client_t m_bake_client; // BAKE client
std::vector<sdskv_provider_handle_t> m_sdskv_ph; // list of SDSKV provider handlers
std::vector<sdskv_database_id_t> m_sdskv_db; // list of SDSKV database ids
struct ch_placement_instance* m_chi_sdskv; // ch-placement instance for SDSKV
std::vector<bake_provider_handle_t> m_bake_ph; // list of BAKE provider handlers
struct ch_placement_instance* m_chi_bake; // ch-placement instance for BAKE
const DataStore::iterator m_end; // iterator for the end() of the DataStore
Impl(DataStore& parent)
: m_parent(parent)
......@@ -228,6 +232,10 @@ class DataStore::Impl {
}
};
////////////////////////////////////////////////////////////////////////////////////////////
// DataStore implementation
////////////////////////////////////////////////////////////////////////////////////////////
DataStore::DataStore(const std::string& configFile)
: m_impl(std::make_unique<DataStore::Impl>(*this)) {
m_impl->init(configFile);
......@@ -255,23 +263,12 @@ DataStore::iterator DataStore::find(const std::string& datasetName) {
if(datasetName.find('/') != std::string::npos) {
throw Exception("Invalid character '/' in dataset name");
}
DataStoreEntryPtr entry = make_datastore_entry(0, datasetName);
// find which sdskv provider to contact
uint64_t h = std::hash<std::string>()(datasetName);
unsigned long provider_idx;
ch_placement_find_closest(m_impl->m_chi_sdskv, h, 1, &provider_idx);
// find the size of the value, as a way to check if the key exists
auto ph = m_impl->m_sdskv_ph[provider_idx];
auto db_id = m_impl->m_sdskv_db[provider_idx];
hg_size_t vsize;
ret = sdskv_length(ph, db_id, entry->raw(), entry->length(), &vsize);
if(ret == SDSKV_ERR_UNKNOWN_KEY) {
return end();
}
if(ret != SDSKV_SUCCESS) {
throw Exception("Error occured when calling sdskv_length");
std::vector<char> data;
bool b = load(1, "", datasetName, data);
if(!b) {
return m_impl->m_end;
}
return iterator(*this, DataSet(*this, 0, datasetName));
return iterator(*this, DataSet(*this, 1, datasetName));
}
DataStore::const_iterator DataStore::find(const std::string& datasetName) const {
......@@ -280,7 +277,10 @@ DataStore::const_iterator DataStore::find(const std::string& datasetName) const
}
DataStore::iterator DataStore::begin() {
return iterator(*this);
DataSet ds(*this, 1, "");
ds = ds.next();
if(ds.valid()) return iterator(*this, ds);
else return end();
}
DataStore::iterator DataStore::end() {
......@@ -292,139 +292,304 @@ DataStore::const_iterator DataStore::cbegin() const {
}
DataStore::const_iterator DataStore::cend() const {
return m_impl->m_end;
return const_cast<DataStore*>(this)->end();
}
DataStore::iterator DataStore::lower_bound(const std::string& lb) {
// TODO
std::string lb2 = lb;
size_t s = lb2.size();
lb2[s-1] -= 1; // sdskv_list_keys's start_key is exclusive
iterator it = find(lb2);
if(it != end()) {
// we found something before the specified lower bound
++it;
return it;
}
DataSet ds(*this, 1, lb2);
ds = ds.next();
if(!ds.valid()) return end();
else return iterator(*this, ds);
}
DataStore::const_iterator DataStore::lower_bound(const std::string& lb) const {
// TODO
iterator it = const_cast<DataStore*>(this)->lower_bound(lb);
return it;
}
DataStore::iterator DataStore::upper_bound(const std::string& ub) {
// TODO
DataSet ds(*this, 1, ub);
ds = ds.next();
if(!ds.valid()) return end();
else return iterator(*this, ds);
}
DataStore::const_iterator DataStore::upper_bound(const std::string& ub) const {
// TODO
iterator it = const_cast<DataStore*>(this)->upper_bound(ub);
return it;
}
DataSet DataStore::createDataSet(const std::string& name) {
if(name.find('/') != std::string::npos) {
throw Exception("Invalid character '/' in dataset name");
}
DataStoreEntryPtr entry = make_datastore_entry(0, name);
// find which sdskv provider to contact
uint64_t h = std::hash<std::string>()(name);
unsigned long provider_idx;
ch_placement_find_closest(m_impl->m_chi_sdskv, h, 1, &provider_idx);
// store the key
store(1, "", name, std::vector<char>());
return DataSet(*this, 1, name);
}
bool DataStore::load(uint8_t level, const std::string& containerName,
const std::string& objectName, std::vector<char>& data) const {
int ret;
// build full name
std::stringstream ss;
if(!containerName.empty())
ss << containerName << "/";
ss << objectName;
// hash the name to get the provider id
long unsigned provider_idx = 0;
if(level != 0) {
uint64_t h = std::hash<std::string>()(containerName);
ch_placement_find_closest(m_impl->m_chi_sdskv, h, 1, &provider_idx);
} else {
// use the complete name for final objects (level 255)
uint64_t h = std::hash<std::string>()(ss.str());
ch_placement_find_closest(m_impl->m_chi_sdskv, h, 1, &provider_idx);
}
// make corresponding datastore entry
DataStoreEntryPtr entry = make_datastore_entry(level, ss.str());
auto ph = m_impl->m_sdskv_ph[provider_idx];
auto db_id = m_impl->m_sdskv_db[provider_idx];
int ret = sdskv_put(ph, db_id, entry->raw(), entry->length(), NULL, 0);
// find the size of the value, as a way to check if the key exists
hg_size_t vsize;
std::cerr << "[LOG] load (level=" << (int)level
<< ", container=\"" << containerName << "\", object=\""
<< objectName << "\")" << std::endl;
ret = sdskv_length(ph, db_id, entry->raw(), entry->length(), &vsize);
if(ret == SDSKV_ERR_UNKNOWN_KEY) {
return false;
}
if(ret != SDSKV_SUCCESS) {
throw Exception("Could not create DataSet (sdskv error)");
throw Exception("Error occured when calling sdskv_length");
}
return DataSet(*this, 0, name);
// read the value
data.resize(vsize);
ret = sdskv_get(ph, db_id, entry->raw(), entry->length(), data.data(), &vsize);
if(ret != SDSKV_SUCCESS) {
throw Exception("Error occured when calling sdskv_get");
}
return true;
}
bool DataStore::store(uint8_t level, const std::string& containerName,
const std::string& objectName, const std::vector<char>& data) {
// build full name
std::stringstream ss;
if(!containerName.empty())
ss << containerName << "/";
ss << objectName;
// hash the name to get the provider id
long unsigned provider_idx = 0;
if(level != 0) {
uint64_t h = std::hash<std::string>()(containerName);
ch_placement_find_closest(m_impl->m_chi_sdskv, h, 1, &provider_idx);
} else {
// use the complete name for final objects (level 0)
uint64_t h = std::hash<std::string>()(ss.str());
ch_placement_find_closest(m_impl->m_chi_sdskv, h, 1, &provider_idx);
}
// make corresponding datastore entry
DataStoreEntryPtr entry = make_datastore_entry(level, ss.str());
auto ph = m_impl->m_sdskv_ph[provider_idx];
auto db_id = m_impl->m_sdskv_db[provider_idx];
std::cerr << "[LOG] store (level=" << (int)level
<< ", container=\"" << containerName << "\", object=\""
<< objectName << "\")" << std::endl;
int ret = sdskv_put(ph, db_id, entry->raw(), entry->length(), data.data(), data.size());
if(ret != SDSKV_SUCCESS) {
throw Exception("Could not put key/value pair in SDSKV (sdskv_put error)");
}
return true;
}
size_t DataStore::nextKeys(uint8_t level, const std::string& containerName,
const std::string& lower,
std::vector<std::string>& keys, size_t maxKeys) const {
int ret;
if(level == 0) return 0; // cannot iterate at object level
// build full name from lower bound key
std::stringstream ss;
if(!containerName.empty())
ss << containerName << "/";
ss << lower;
// hash the name to get the provider id
long unsigned provider_idx = 0;
uint64_t h = std::hash<std::string>()(containerName);
ch_placement_find_closest(m_impl->m_chi_sdskv, h, 1, &provider_idx);
// make an entry for the lower bound
DataStoreEntryPtr lb_entry = make_datastore_entry(level, ss.str());
// create data structures to receive keys
std::vector<DataStoreEntryPtr> keys_ent;
std::vector<void*> keys_ptr(maxKeys);
std::vector<hg_size_t> keys_len(maxKeys);
for(unsigned i=0; i < maxKeys; i++) {
keys_ent.push_back(make_datastore_entry(level, 1024));
keys_ptr[i] = keys_ent[i]->raw();
keys_len[i] = sizeof(DataStoreEntry) + 1024;
}
// get provider and database
auto ph = m_impl->m_sdskv_ph[provider_idx];
auto db_id = m_impl->m_sdskv_db[provider_idx];
// issue an sdskv_list_keys
hg_size_t max_keys = maxKeys;
std::cerr << "[LOG] list keys (level=" << (int)level
<< ", container=\"" << containerName << "\", greaterthan=\""
<< lower << "\")" << std::endl;
ret = sdskv_list_keys(ph, db_id, lb_entry->raw(), lb_entry->length(),
keys_ptr.data(), keys_len.data(), &max_keys);
if(ret != HG_SUCCESS) {
throw Exception("Error occured when calling sdskv_list_keys");
}
unsigned i = max_keys - 1;
if(max_keys == 0) return 0;
// remove keys that don't have the same level or the same prefix
std::string prefix = containerName + "/";
keys.resize(0);
for(unsigned i = 0; i < max_keys; i++) {
if(keys_ent[i]->m_level != level) {
max_keys = i;
break;
}
if(!containerName.empty()) {
size_t lenpre = prefix.size();
size_t lenstr = strlen(keys_ent[i]->m_fullname);
if(lenstr < lenpre) {
max_keys = i;
break;
}
if(strncmp(prefix.c_str(), keys_ent[i]->m_fullname, lenpre) != 0) {
max_keys = i;
break;
}
}
keys.push_back(keys_ent[i]->m_fullname);
}
// set the resulting keys
return max_keys;
}
////////////////////////////////////////////////////////////////////////////////////////////
// DataStore::const_iterator::Impl implementation
////////////////////////////////////////////////////////////////////////////////////////////
class DataStore::const_iterator::Impl {
public:
DataStore* m_datastore;
DataSet m_current_dataset;
bool m_is_valid;
Impl(DataStore& ds)
: m_datastore(&ds)
, m_current_dataset()
, m_is_valid(false) {}
{}
Impl(DataStore& ds, const DataSet& dataset)
: m_datastore(&ds)
, m_current_dataset(dataset)
, m_is_valid(true) {}
{}
Impl(const Impl& other)
: m_datastore(other.m_datastore)
, m_current_dataset(other.m_current_dataset)
, m_is_valid(other.m_is_valid) {}
{}
bool operator==(const Impl& other) const {
return m_datastore == other.m_datastore
&& m_current_dataset == other.m_current_dataset;
}
};
////////////////////////////////////////////////////////////////////////////////////////////
// DataStore::const_iterator::Impl implementation
////////////////////////////////////////////////////////////////////////////////////////////
DataStore::const_iterator::const_iterator(DataStore& ds)
: m_impl(std::make_unique<Impl>(ds)) {
}
: m_impl(std::make_unique<Impl>(ds)) {}
DataStore::const_iterator::const_iterator(DataStore& ds, const DataSet& dataset)
: m_impl(std::make_unique<Impl>(ds, dataset)) {
}
: m_impl(std::make_unique<Impl>(ds, dataset)) {}
DataStore::const_iterator::~const_iterator() {
// TODO
}
DataStore::const_iterator::~const_iterator() {}
DataStore::const_iterator::const_iterator(const DataStore::const_iterator& other)
: m_impl(std::make_unique<Impl>(*other.m_impl)) {
}
: m_impl(std::make_unique<Impl>(*other.m_impl)) {}
DataStore::const_iterator::const_iterator(DataStore::const_iterator&& other)
: m_impl(std::move(other.m_impl)) {
// TODO
}
: m_impl(std::move(other.m_impl)) {}
DataStore::const_iterator& DataStore::const_iterator::operator=(const DataStore::const_iterator&) {
// TODO
DataStore::const_iterator& DataStore::const_iterator::operator=(const DataStore::const_iterator& other) {
if(&other == this) return *this;
m_impl = std::make_unique<Impl>(*other.m_impl);
return *this;
}
DataStore::const_iterator& DataStore::const_iterator::operator=(DataStore::const_iterator&&) {
// TODO
DataStore::const_iterator& DataStore::const_iterator::operator=(DataStore::const_iterator&& other) {
if(&other == this) return *this;
m_impl = std::move(other.m_impl);
return *this;
}
DataStore::const_iterator::self_type DataStore::const_iterator::operator++() {
// TODO
if(!m_impl) {
throw Exception("Trying to increment an invalid iterator");
}
m_impl->m_current_dataset = m_impl->m_current_dataset.next();
return *this;
}
DataStore::const_iterator::self_type DataStore::const_iterator::operator++(int) {
// TODO
const_iterator copy = *this;
++(*this);
return copy;
}
const DataStore::const_iterator::reference DataStore::const_iterator::operator*() {
// TODO
if(!m_impl) {
throw Exception("Trying to dereference an invalid iterator");
}
return m_impl->m_current_dataset;
}
const DataStore::const_iterator::pointer DataStore::const_iterator::operator->() {
// TODO
if(!m_impl) return nullptr;
return &(m_impl->m_current_dataset);
}
bool DataStore::const_iterator::operator==(const self_type& rhs) const {
// TODO
if(!m_impl && !rhs.m_impl) return true;
if(m_impl && !rhs.m_impl) return false;
if(!m_impl && rhs.m_impl) return false;
return *m_impl == *