Commit ca74a8b4 authored by Matthieu Dorier's avatar Matthieu Dorier
Browse files

done adapting to C++ interface

parent 837650b2
...@@ -6,9 +6,6 @@ ...@@ -6,9 +6,6 @@
#include <vector> #include <vector>
#include <functional> #include <functional>
#include <iostream> #include <iostream>
#include <yaml-cpp/yaml.h>
#include <sdskv-client.h>
#include <ch-placement.h>
#include "hepnos/Exception.hpp" #include "hepnos/Exception.hpp"
#include "hepnos/DataStore.hpp" #include "hepnos/DataStore.hpp"
#include "hepnos/DataSet.hpp" #include "hepnos/DataSet.hpp"
......
...@@ -12,9 +12,9 @@ ...@@ -12,9 +12,9 @@
#include <functional> #include <functional>
#include <iostream> #include <iostream>
#include <yaml-cpp/yaml.h> #include <yaml-cpp/yaml.h>
#include <sdskv-client.h> #include <sdskv-client.hpp>
#include <ch-placement.h> #include <ch-placement.h>
#include "KeyTypes.hpp" //#include "KeyTypes.hpp"
#include "hepnos/Exception.hpp" #include "hepnos/Exception.hpp"
#include "hepnos/DataStore.hpp" #include "hepnos/DataStore.hpp"
#include "hepnos/DataSet.hpp" #include "hepnos/DataSet.hpp"
...@@ -28,21 +28,15 @@ namespace hepnos { ...@@ -28,21 +28,15 @@ namespace hepnos {
class DataStore::Impl { class DataStore::Impl {
public: public:
struct database {
sdskv_provider_handle_t m_sdskv_ph;
sdskv_database_id_t m_sdskv_db;
};
margo_instance_id m_mid; // Margo instance margo_instance_id m_mid; // Margo instance
std::unordered_map<std::string,hg_addr_t> m_addrs; // Addresses used by the service std::unordered_map<std::string,hg_addr_t> m_addrs; // Addresses used by the service
sdskv_client_t m_sdskv_client; // SDSKV client sdskv::client m_sdskv_client; // SDSKV client
std::vector<database> m_databases; // list of SDSKV databases std::vector<sdskv::database> m_databases; // list of SDSKV databases
struct ch_placement_instance* m_chi_sdskv; // ch-placement instance for SDSKV struct ch_placement_instance* m_chi_sdskv; // ch-placement instance for SDSKV
const DataStore::iterator m_end; // iterator for the end() of the DataStore const DataStore::iterator m_end; // iterator for the end() of the DataStore
Impl(DataStore* parent) Impl(DataStore* parent)
: m_mid(MARGO_INSTANCE_NULL) : m_mid(MARGO_INSTANCE_NULL)
, m_sdskv_client(SDSKV_CLIENT_NULL)
, m_chi_sdskv(nullptr) , m_chi_sdskv(nullptr)
, m_end() {} , m_end() {}
...@@ -60,8 +54,9 @@ class DataStore::Impl { ...@@ -60,8 +54,9 @@ class DataStore::Impl {
throw Exception("Could not initialized Margo"); throw Exception("Could not initialized Margo");
} }
// initialize SDSKV client // initialize SDSKV client
ret = sdskv_client_init(m_mid, &m_sdskv_client); try {
if(ret != SDSKV_SUCCESS) { m_sdskv_client = sdskv::client(m_mid);
} catch(...) {
cleanup(); cleanup();
throw Exception("Could not create SDSKV client"); throw Exception("Could not create SDSKV client");
} }
...@@ -83,44 +78,20 @@ class DataStore::Impl { ...@@ -83,44 +78,20 @@ class DataStore::Impl {
} }
// get the number of providers // get the number of providers
uint16_t num_providers = it->second.as<uint16_t>(); uint16_t num_providers = it->second.as<uint16_t>();
sdskv_provider_handle_t ph;
for(uint16_t provider_id = 0 ; provider_id < num_providers; provider_id++) { for(uint16_t provider_id = 0 ; provider_id < num_providers; provider_id++) {
ret = sdskv_provider_handle_create(m_sdskv_client, addr, provider_id, &ph); std::vector<sdskv::database> dbs;
if(ret != SDSKV_SUCCESS) { try {
cleanup(); sdskv::provider_handle ph(m_sdskv_client, addr, provider_id);
throw Exception("sdskv_provider_handle_create failed"); dbs = m_sdskv_client.open(ph);
} } catch(...) {
size_t db_count = 256;
ret = sdskv_count_databases(ph, &db_count);
if(ret != SDSKV_SUCCESS) {
sdskv_provider_handle_release(ph);
cleanup(); cleanup();
throw Exception("sdskv_count_databases failed"); throw Exception("sdskv_count_databases failed");
} }
std::cerr << "Found " << db_count << " databases" << std::endl; if(dbs.size() == 0) {
if(db_count == 0) {
continue; continue;
} }
std::vector<sdskv_database_id_t> db_ids(db_count); for(auto& db : dbs)
std::vector<char*> db_names(db_count);
ret = sdskv_list_databases(ph, &db_count, db_names.data(), db_ids.data());
if(ret != SDSKV_SUCCESS) {
sdskv_provider_handle_release(ph);
cleanup();
throw Exception("sdskv_list_databases failed");
}
std::cout << "db_count is now " << db_count << std::endl;
unsigned i = 0;
for(auto id : db_ids) {
std::cout << "Database: " << id << " " << db_names[i] << std::endl;
database db;
sdskv_provider_handle_ref_incr(ph);
db.m_sdskv_ph = ph;
db.m_sdskv_db = id;
m_databases.push_back(db); m_databases.push_back(db);
i += 1;
}
sdskv_provider_handle_release(ph);
} }
} }
// initialize ch-placement for the SDSKV providers // initialize ch-placement for the SDSKV providers
...@@ -128,10 +99,8 @@ class DataStore::Impl { ...@@ -128,10 +99,8 @@ class DataStore::Impl {
} }
void cleanup() { void cleanup() {
for(const auto& db : m_databases) { m_databases.clear();
sdskv_provider_handle_release(db.m_sdskv_ph); m_sdskv_client = sdskv::client();
}
sdskv_client_finalize(m_sdskv_client);
if(m_chi_sdskv) if(m_chi_sdskv)
ch_placement_finalize(m_chi_sdskv); ch_placement_finalize(m_chi_sdskv);
for(auto& addr : m_addrs) { for(auto& addr : m_addrs) {
...@@ -181,46 +150,63 @@ class DataStore::Impl { ...@@ -181,46 +150,63 @@ class DataStore::Impl {
} }
} }
static inline std::string buildKey(
uint8_t level,
const std::string& containerName,
const std::string& objectName) {
size_t c = 1 + objectName.size();
if(!containerName.empty()) c += containerName.size() + 1;
std::string result(c,'\0');
result[0] = level;
if(!containerName.empty()) {
std::memcpy(&result[1], containerName.data(), containerName.size());
size_t x = 1+containerName.size();
result[x] = '/';
std::memcpy(&result[x+1], objectName.data(), objectName.size());
} else {
std::memcpy(&result[1], objectName.data(), objectName.size());
}
return result;
}
public: public:
bool load(uint8_t level, const std::string& containerName, bool load(uint8_t level, const std::string& containerName,
const std::string& objectName, std::vector<char>& data) const { const std::string& objectName, std::vector<char>& data) const {
int ret; int ret;
// build full name // build key
std::stringstream ss; auto key = buildKey(level, containerName, objectName);
if(!containerName.empty())
ss << containerName << "/";
ss << objectName;
// hash the name to get the provider id // hash the name to get the provider id
long unsigned sdskv_provider_idx = 0; long unsigned sdskv_db_idx = 0;
uint64_t name_hash; uint64_t name_hash;
if(level != 0) { if(level != 0) {
name_hash = std::hash<std::string>()(containerName); name_hash = std::hash<std::string>()(containerName);
} else { } else {
// use the complete name for final objects (level 0) // use the complete name for final objects (level 0)
name_hash = std::hash<std::string>()(ss.str()); name_hash = std::hash<std::string>()(key);
} }
ch_placement_find_closest(m_chi_sdskv, name_hash, 1, &sdskv_provider_idx); ch_placement_find_closest(m_chi_sdskv, name_hash, 1, &sdskv_db_idx);
// make corresponding datastore entry // make corresponding datastore entry
DataStoreEntryPtr entry = make_datastore_entry(level, ss.str()); auto& db = m_databases[sdskv_db_idx];
auto& db = m_databases[sdskv_provider_idx];
auto sdskv_ph = db.m_sdskv_ph;
auto db_id = db.m_sdskv_db;
// read the value // read the value
// find the size of the value, as a way to check if the key exists // find the size of the value, as a way to check if the key exists
// XXX optimization: instead of getting the size, we could try getting
// the data with a certain size and retry with the right size if it doesn't work
hg_size_t vsize; hg_size_t vsize;
ret = sdskv_length(sdskv_ph, db_id, entry->raw(), entry->length(), &vsize); try {
if(ret == SDSKV_ERR_UNKNOWN_KEY) { vsize = db.length(key);
return false; } catch(sdskv::exception& ex) {
} if(ex.error() == SDSKV_ERR_UNKNOWN_KEY)
if(ret != SDSKV_SUCCESS) { return false;
throw Exception("Error occured when calling sdskv_length"); else
throw Exception("Error occured when calling sdskv::database::length");
} }
data.resize(vsize); data.resize(vsize);
ret = sdskv_get(sdskv_ph, db_id, entry->raw(), entry->length(), data.data(), &vsize); try {
if(ret != SDSKV_SUCCESS) { db.get(key, data);
throw Exception("Error occured when calling sdskv_get"); } catch(sdskv::exception& ex) {
throw Exception("Error occured when calling sdskv::database::get");
} }
return true; return true;
} }
...@@ -228,38 +214,34 @@ class DataStore::Impl { ...@@ -228,38 +214,34 @@ class DataStore::Impl {
ProductID store(uint8_t level, const std::string& containerName, ProductID store(uint8_t level, const std::string& containerName,
const std::string& objectName, const std::vector<char>& data) { const std::string& objectName, const std::vector<char>& data) {
// build full name // build full name
std::stringstream ss; auto key = buildKey(level, containerName, objectName);
if(!containerName.empty())
ss << containerName << "/";
ss << objectName;
// Create the product id // Create the product id
ProductID product_id(level, containerName, objectName); ProductID product_id(level, containerName, objectName);
// hash the name to get the provider id // hash the name to get the provider id
long unsigned sdskv_provider_idx = 0; long unsigned sdskv_db_idx = 0;
uint64_t name_hash; uint64_t name_hash;
if(level != 0) { if(level != 0) {
name_hash = std::hash<std::string>()(containerName); name_hash = std::hash<std::string>()(containerName);
} else { } else {
// use the complete name for final objects (level 0) // use the complete name for final objects (level 0)
name_hash = std::hash<std::string>()(ss.str()); name_hash = std::hash<std::string>()(key);
} }
ch_placement_find_closest(m_chi_sdskv, name_hash, 1, &sdskv_provider_idx); ch_placement_find_closest(m_chi_sdskv, name_hash, 1, &sdskv_db_idx);
// make corresponding datastore entry key const auto& db = m_databases[sdskv_db_idx];
DataStoreEntryPtr entry = make_datastore_entry(level, ss.str());
const auto& sdskv_info = m_databases[sdskv_provider_idx];
auto sdskv_ph = sdskv_info.m_sdskv_ph;
auto db_id = sdskv_info.m_sdskv_db;
// check if the key exists // check if the key exists
hg_size_t vsize; bool key_exists;
int ret = sdskv_length(sdskv_ph, db_id, entry->raw(), entry->length(), &vsize); try {
if(ret == HG_SUCCESS) return ProductID(); // key already exists key_exists = db.exists(key);
if(ret != SDSKV_ERR_UNKNOWN_KEY) { // there was a problem with sdskv std::cerr << "In store(), key_exists = " << key_exists << " for key = " << key << std::endl;
throw Exception("Could not check if key exists in SDSKV (sdskv_length error)"); } catch(sdskv::exception& ex) {
throw Exception("Could not check if key exists in SDSKV (sdskv::database::exists error)");
} }
// if it's not a last-level data entry (data product), store in sdskeyval if(key_exists) return ProductID();
ret = sdskv_put(sdskv_ph, db_id, entry->raw(), entry->length(), data.data(), data.size());
if(ret != SDSKV_SUCCESS) { try {
throw Exception("Could not put key/value pair in SDSKV (sdskv_put error)"); db.put(key, data);
} catch(sdskv::exception& ex) {
throw Exception("Could not put key/value pair in SDSKV (sdskv::database::put error)");
} }
return product_id; return product_id;
} }
...@@ -269,63 +251,35 @@ class DataStore::Impl { ...@@ -269,63 +251,35 @@ class DataStore::Impl {
std::vector<std::string>& keys, size_t maxKeys) const { std::vector<std::string>& keys, size_t maxKeys) const {
int ret; int ret;
if(level == 0) return 0; // cannot iterate at object level if(level == 0) return 0; // cannot iterate at object level
// build full name from lower bound key
std::stringstream ss;
if(!containerName.empty())
ss << containerName << "/";
ss << lower;
// hash the name to get the provider id // hash the name to get the provider id
long unsigned provider_idx = 0; long unsigned db_idx = 0;
uint64_t h = std::hash<std::string>()(containerName); uint64_t h = std::hash<std::string>()(containerName);
ch_placement_find_closest(m_chi_sdskv, h, 1, &provider_idx); ch_placement_find_closest(m_chi_sdskv, h, 1, &db_idx);
// make an entry for the lower bound // make an entry for the lower bound
DataStoreEntryPtr lb_entry = make_datastore_entry(level, ss.str()); auto lb_entry = buildKey(level, containerName, lower);
// create data structures to receive keys
std::vector<DataStoreEntryPtr> keys_ent;
std::vector<void*> keys_ptr(maxKeys);
std::vector<hg_size_t> keys_len(maxKeys);
for(unsigned i=0; i < maxKeys; i++) {
keys_ent.push_back(make_datastore_entry(level, 1024));
keys_ptr[i] = keys_ent[i]->raw();
keys_len[i] = sizeof(DataStoreEntry) + 1024;
}
// get provider and database // get provider and database
const auto& sdskv_info = m_databases[provider_idx]; const auto& db = m_databases[db_idx];
auto ph = sdskv_info.m_sdskv_ph; // ignore keys that don't have the same level or the same prefix
auto db_id = sdskv_info.m_sdskv_db; std::string prefix(2+containerName.size(), '\0');
prefix[0] = level;
if(containerName.size() != 0) {
std::memcpy(&prefix[1], containerName.data(), containerName.size());
prefix[prefix.size()-1] = '/';
} else {
prefix.resize(1);
}
// issue an sdskv_list_keys // issue an sdskv_list_keys
hg_size_t max_keys = maxKeys; std::vector<std::string> entries(maxKeys);
ret = sdskv_list_keys(ph, db_id, lb_entry->raw(), lb_entry->length(), try {
keys_ptr.data(), keys_len.data(), &max_keys); db.list_keys(lb_entry, prefix, entries);
if(ret != HG_SUCCESS) { } catch(...) {
throw Exception("Error occured when calling sdskv_list_keys"); throw Exception("Error occured when calling sdskv::database::list_keys");
} }
unsigned i = max_keys - 1;
if(max_keys == 0) return 0;
// remove keys that don't have the same level or the same prefix
std::string prefix = containerName + "/";
keys.resize(0); keys.resize(0);
for(unsigned i = 0; i < max_keys; i++) { for(const auto& entry : entries) {
if(keys_ent[i]->m_level != level) { keys.emplace_back(&entry[1], entry.size()-1);
max_keys = i;
break;
}
if(!containerName.empty()) {
size_t lenpre = prefix.size();
size_t lenstr = strlen(keys_ent[i]->m_fullname);
if(lenstr < lenpre) {
max_keys = i;
break;
}
if(strncmp(prefix.c_str(), keys_ent[i]->m_fullname, lenpre) != 0) {
max_keys = i;
break;
}
}
keys.push_back(keys_ent[i]->m_fullname);
} }
// set the resulting keys return keys.size();
return max_keys;
} }
}; };
......
...@@ -10,6 +10,7 @@ int main(int argc, char* argv[]) ...@@ -10,6 +10,7 @@ int main(int argc, char* argv[])
{ {
if(argc != 2) return 1; if(argc != 2) return 1;
sleep(1);
// Create the datastore // Create the datastore
datastore = new hepnos::DataStore(argv[1]); datastore = new hepnos::DataStore(argv[1]);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment