/* * (C) 2018 The University of Chicago * * See COPYRIGHT in top-level directory. */ #ifndef __HEPNOS_PRIVATE_DATASTORE_IMPL #define __HEPNOS_PRIVATE_DATASTORE_IMPL #include #include #include #include #include #include #include #include #include #include "KeyTypes.hpp" #include "ValueTypes.hpp" #include "hepnos/Exception.hpp" #include "hepnos/DataStore.hpp" #include "hepnos/DataSet.hpp" namespace hepnos { //////////////////////////////////////////////////////////////////////////////////////////// // DataStore::Impl implementation //////////////////////////////////////////////////////////////////////////////////////////// class DataStore::Impl { public: margo_instance_id m_mid; // Margo instance std::unordered_map m_addrs; // Addresses used by the service sdskv_client_t m_sdskv_client; // SDSKV client bake_client_t m_bake_client; // BAKE client std::vector m_sdskv_ph; // list of SDSKV provider handlers std::vector m_sdskv_db; // list of SDSKV database ids struct ch_placement_instance* m_chi_sdskv; // ch-placement instance for SDSKV std::vector m_bake_ph; // list of BAKE provider handlers std::vector m_bake_targets; // list of BAKE target ids struct ch_placement_instance* m_chi_bake; // ch-placement instance for BAKE const DataStore::iterator m_end; // iterator for the end() of the DataStore Impl(DataStore* parent) : m_mid(MARGO_INSTANCE_NULL) , m_sdskv_client(SDSKV_CLIENT_NULL) , m_chi_sdskv(nullptr) , m_bake_client(BAKE_CLIENT_NULL) , m_chi_bake(nullptr) , m_end() {} void init(const std::string& configFile) { int ret; hg_return_t hret; YAML::Node config = YAML::LoadFile(configFile); checkConfig(config); // get protocol std::string proto = config["hepnos"]["client"]["protocol"].as(); // initialize Margo m_mid = margo_init(proto.c_str(), MARGO_CLIENT_MODE, 0, 0); if(!m_mid) { cleanup(); throw Exception("Could not initialized Margo"); } // initialize SDSKV client ret = sdskv_client_init(m_mid, &m_sdskv_client); if(ret != SDSKV_SUCCESS) { cleanup(); throw Exception("Could not create SDSKV client"); } // initialize BAKE client ret = bake_client_init(m_mid, &m_bake_client); if(ret != 0) { cleanup(); throw Exception("Could not create BAKE client"); } // create list of sdskv provider handles YAML::Node sdskv = config["hepnos"]["providers"]["sdskv"]; for(YAML::const_iterator it = sdskv.begin(); it != sdskv.end(); it++) { std::string str_addr = it->first.as(); hg_addr_t addr; if(m_addrs.count(str_addr) != 0) { addr = m_addrs[str_addr]; } else { hret = margo_addr_lookup(m_mid, str_addr.c_str(), &addr); if(hret != HG_SUCCESS) { margo_addr_free(m_mid,addr); cleanup(); throw Exception("margo_addr_lookup failed"); } m_addrs[str_addr] = addr; } // get the provider id(s) if(it->second.IsScalar()) { uint16_t provider_id = it->second.as(); sdskv_provider_handle_t ph; ret = sdskv_provider_handle_create(m_sdskv_client, addr, provider_id, &ph); if(ret != SDSKV_SUCCESS) { cleanup(); throw Exception("sdskv_provider_handle_create failed"); } m_sdskv_ph.push_back(ph); } else if(it->second.IsSequence()) { for(YAML::const_iterator pid = it->second.begin(); pid != it->second.end(); pid++) { uint16_t provider_id = pid->second.as(); sdskv_provider_handle_t ph; ret = sdskv_provider_handle_create(m_sdskv_client, addr, provider_id, &ph); if(ret != SDSKV_SUCCESS) { cleanup(); throw Exception("sdskv_provider_handle_create failed"); } m_sdskv_ph.push_back(ph); } } } // loop over sdskv providers and get the database id for(auto ph : m_sdskv_ph) { sdskv_database_id_t db_id; ret = sdskv_open(ph, "hepnosdb", &db_id); if(ret != SDSKV_SUCCESS) { cleanup(); throw Exception("sdskv_open failed to open database"); } m_sdskv_db.push_back(db_id); } // initialize ch-placement for the SDSKV providers m_chi_sdskv = ch_placement_initialize("hash_lookup3", m_sdskv_ph.size(), 4, 0); // get list of bake provider handles YAML::Node bake = config["hepnos"]["providers"]["bake"]; if(bake) { for(YAML::const_iterator it = bake.begin(); it != bake.end(); it++) { // get the address of a bake provider std::string str_addr = it->first.as(); hg_addr_t addr; if(m_addrs.count(str_addr) != 0) { addr = m_addrs[str_addr]; } else { // lookup the address hret = margo_addr_lookup(m_mid, str_addr.c_str(), &addr); if(hret != HG_SUCCESS) { margo_addr_free(m_mid, addr); cleanup(); throw Exception("margo_addr_lookup failed"); } m_addrs[str_addr] = addr; } if(it->second.IsScalar()) { uint16_t provider_id = it->second.as(); bake_provider_handle_t ph; ret = bake_provider_handle_create(m_bake_client, addr, provider_id, &ph); if(ret != 0) { cleanup(); throw Exception("bake_provider_handle_create failed"); } m_bake_ph.push_back(ph); } else if(it->second.IsSequence()) { for(YAML::const_iterator pid = it->second.begin(); pid != it->second.end(); pid++) { uint16_t provider_id = pid->second.as(); bake_provider_handle_t ph; ret = bake_provider_handle_create(m_bake_client, addr, provider_id, &ph); if(ret != 0) { cleanup(); throw Exception("bake_provider_handle_create failed"); } m_bake_ph.push_back(ph); } } // if(it->second.IsSequence()) } // for loop // find out the bake targets at each bake provider for(auto& bake_ph : m_bake_ph) { bake_target_id_t bti; uint64_t num_targets = 0; ret = bake_probe(bake_ph, 1, &bti, &num_targets); if(ret != BAKE_SUCCESS) { throw Exception("bake_probe failed to retrieve targets"); } if(num_targets != 1) { throw Exception("bake_prove returned no target"); } m_bake_targets.push_back(bti); } } // initialize ch-placement for the bake providers if(m_bake_ph.size()) { m_chi_bake = ch_placement_initialize("hash_lookup3", m_bake_ph.size(), 4, 0); } } void cleanup() { for(auto ph : m_sdskv_ph) { sdskv_provider_handle_release(ph); } for(auto ph : m_bake_ph) { bake_provider_handle_release(ph); } sdskv_client_finalize(m_sdskv_client); bake_client_finalize(m_bake_client); if(m_chi_sdskv) ch_placement_finalize(m_chi_sdskv); if(m_chi_bake) ch_placement_finalize(m_chi_bake); for(auto& addr : m_addrs) { margo_addr_free(m_mid, addr.second); } if(m_mid) margo_finalize(m_mid); } private: static void checkConfig(YAML::Node& config) { // config file starts with hepnos entry auto hepnosNode = config["hepnos"]; if(!hepnosNode) { throw Exception("\"hepnos\" entry not found in YAML file"); } // hepnos entry has client entry auto clientNode = hepnosNode["client"]; if(!clientNode) { throw Exception("\"client\" entry not found in \"hepnos\" section"); } // client entry has protocol entry auto protoNode = clientNode["protocol"]; if(!protoNode) { throw Exception("\"protocol\" entry not found in \"client\" section"); } // hepnos entry has providers entry auto providersNode = hepnosNode["providers"]; if(!providersNode) { throw Exception("\"providers\" entry not found in \"hepnos\" section"); } // provider entry has sdskv entry auto sdskvNode = providersNode["sdskv"]; if(!sdskvNode) { throw Exception("\"sdskv\" entry not found in \"providers\" section"); } // sdskv entry is not empty if(sdskvNode.size() == 0) { throw Exception("No provider found in \"sdskv\" section"); } // for each sdskv entry for(auto it = sdskvNode.begin(); it != sdskvNode.end(); it++) { if(it->second.IsScalar()) continue; // one provider id given if(it->second.IsSequence()) { // array of provider ids given // the sequence is not empty if(it->second.size() == 0) { throw Exception("Empty array of provider ids encountered in \"sdskv\" section"); } // all objects in the sequence are scalar and appear only once std::unordered_set ids; for(auto pid = it->second.begin(); pid != it->second.end(); pid++) { if(!pid->second.IsScalar()) { throw Exception("Non-scalar provider id encountered in \"sdskv\" section"); } uint16_t pid_int = pid->as(); if(ids.count(pid_int) != 0) { throw Exception("Provider id encountered twice in \"sdskv\" section"); } ids.insert(pid_int); } } else { throw Exception("Invalid value type for provider in \"sdskv\" section"); } } // bake providers are not mandatory. If they are not present, // objects will be stored in sdskv providers. auto bakeNode = providersNode["bake"]; if(!bakeNode) return; if(bakeNode.size() == 0) return; for(auto it = bakeNode.begin(); it != bakeNode.end(); it++) { if(it->second.IsScalar()) continue; // one provider id given if(it->second.IsSequence()) { // array of provider ids given if(it->second.size() == 0) { throw Exception("No provider found in \"bake\" section"); } std::unordered_set ids; for(auto pid = it->second.begin(); pid != it->second.end(); pid++) { if(!pid->second.IsScalar()) { throw Exception("Non-scalar provider id encountered in \"bake\" section"); } uint16_t pid_int = pid->as(); if(ids.count(pid_int) != 0) { throw Exception("Provider id encountered twice in \"bake\" section"); } ids.insert(pid_int); } } else { throw Exception("Invalid value type for provider in \"bake\" section"); } } } public: bool load(uint8_t level, const std::string& containerName, const std::string& objectName, std::vector& data) const { int ret; // build full name std::stringstream ss; if(!containerName.empty()) ss << containerName << "/"; ss << objectName; // hash the name to get the provider id long unsigned sdskv_provider_idx = 0; uint64_t name_hash; if(level != 0) { name_hash = std::hash()(containerName); } else { // use the complete name for final objects (level 0) name_hash = std::hash()(ss.str()); } ch_placement_find_closest(m_chi_sdskv, name_hash, 1, &sdskv_provider_idx); // make corresponding datastore entry DataStoreEntryPtr entry = make_datastore_entry(level, ss.str()); auto sdskv_ph = m_sdskv_ph[sdskv_provider_idx]; auto db_id = m_sdskv_db[sdskv_provider_idx]; // read the value if(level != 0 || m_bake_ph.empty()) { // read directly from sdskv // find the size of the value, as a way to check if the key exists hg_size_t vsize; ret = sdskv_length(sdskv_ph, db_id, entry->raw(), entry->length(), &vsize); if(ret == SDSKV_ERR_UNKNOWN_KEY) { return false; } if(ret != SDSKV_SUCCESS) { throw Exception("Error occured when calling sdskv_length"); } data.resize(vsize); ret = sdskv_get(sdskv_ph, db_id, entry->raw(), entry->length(), data.data(), &vsize); if(ret != SDSKV_SUCCESS) { throw Exception("Error occured when calling sdskv_get"); } } else { // read from BAKE // first get the key/val from sdskv DataStoreValue rid_info; hg_size_t vsize = sizeof(rid_info); ret = sdskv_get(sdskv_ph, db_id, entry->raw(), entry->length(), (void*)(&rid_info), &vsize); if(ret == SDSKV_ERR_UNKNOWN_KEY) { return false; } if(ret != SDSKV_SUCCESS) { throw Exception("Error occured when calling sdskv_get"); } if(vsize != sizeof(rid_info)) { throw Exception("Call to sdskv_get returned a value of unexpected size"); } // now read the data from bake data.resize(rid_info.getDataSize()); if(data.size() == 0) return true; long unsigned bake_provider_idx = 0; ch_placement_find_closest(m_chi_bake, name_hash, 1, &bake_provider_idx); auto bake_ph = m_bake_ph[bake_provider_idx]; auto target = m_bake_targets[bake_provider_idx]; uint64_t bytes_read = 0; ret = bake_read(bake_ph, rid_info.getBakeRegionID(), 0, data.data(), data.size(), &bytes_read); if(ret != BAKE_SUCCESS) { throw Exception("Couldn't read region from BAKE"); } if(bytes_read != rid_info.getDataSize()) { throw Exception("Bytes read from BAKE did not match expected object size"); } } return true; } bool store(uint8_t level, const std::string& containerName, const std::string& objectName, const std::vector& data) { // build full name std::stringstream ss; if(!containerName.empty()) ss << containerName << "/"; ss << objectName; // hash the name to get the provider id long unsigned sdskv_provider_idx = 0; uint64_t name_hash; if(level != 0) { name_hash = std::hash()(containerName); } else { // use the complete name for final objects (level 0) name_hash = std::hash()(ss.str()); } ch_placement_find_closest(m_chi_sdskv, name_hash, 1, &sdskv_provider_idx); // make corresponding datastore entry key DataStoreEntryPtr entry = make_datastore_entry(level, ss.str()); auto sdskv_ph = m_sdskv_ph[sdskv_provider_idx]; auto db_id = m_sdskv_db[sdskv_provider_idx]; // check if the key exists hg_size_t vsize; int ret = sdskv_length(sdskv_ph, db_id, entry->raw(), entry->length(), &vsize); if(ret == HG_SUCCESS) return false; // key already exists if(ret != SDSKV_ERR_UNKNOWN_KEY) { // there was a problem with sdskv throw Exception("Could not check if key exists in SDSKV (sdskv_length error)"); } // if it's not a last-level data entry (data product), store in sdskeyval if(level != 0 || m_bake_ph.empty()) { ret = sdskv_put(sdskv_ph, db_id, entry->raw(), entry->length(), data.data(), data.size()); if(ret != SDSKV_SUCCESS) { throw Exception("Could not put key/value pair in SDSKV (sdskv_put error)"); } } else { // store data in bake long unsigned bake_provider_idx = 0; ch_placement_find_closest(m_chi_bake, name_hash, 1, &bake_provider_idx); auto bake_ph = m_bake_ph[bake_provider_idx]; auto target = m_bake_targets[bake_provider_idx]; bake_region_id_t rid; ret = bake_create_write_persist(bake_ph, target, data.data(), data.size(), &rid); if(ret != BAKE_SUCCESS) { throw Exception("Could not create bake region (bake_create_write_persist error)"); } // create Value to put in SDSKV DataStoreValue value(data.size(), bake_provider_idx, rid); ret = sdskv_put(sdskv_ph, db_id, entry->raw(), entry->length(), (void*)(&value), sizeof(value)); if(ret != SDSKV_SUCCESS) { ret = bake_remove(bake_ph, rid); if(ret != BAKE_SUCCESS) { throw Exception("Dude, not only did SDSKV fail to put the key, but I couldn't cleanup BAKE. Is it Friday 13?"); } throw Exception("Could not put key/value pair in SDSKV (sdskv_put error)"); } } return true; } size_t nextKeys(uint8_t level, const std::string& containerName, const std::string& lower, std::vector& keys, size_t maxKeys) const { int ret; if(level == 0) return 0; // cannot iterate at object level // build full name from lower bound key std::stringstream ss; if(!containerName.empty()) ss << containerName << "/"; ss << lower; // hash the name to get the provider id long unsigned provider_idx = 0; uint64_t h = std::hash()(containerName); ch_placement_find_closest(m_chi_sdskv, h, 1, &provider_idx); // make an entry for the lower bound DataStoreEntryPtr lb_entry = make_datastore_entry(level, ss.str()); // create data structures to receive keys std::vector keys_ent; std::vector keys_ptr(maxKeys); std::vector keys_len(maxKeys); for(unsigned i=0; i < maxKeys; i++) { keys_ent.push_back(make_datastore_entry(level, 1024)); keys_ptr[i] = keys_ent[i]->raw(); keys_len[i] = sizeof(DataStoreEntry) + 1024; } // get provider and database auto ph = m_sdskv_ph[provider_idx]; auto db_id = m_sdskv_db[provider_idx]; // issue an sdskv_list_keys hg_size_t max_keys = maxKeys; ret = sdskv_list_keys(ph, db_id, lb_entry->raw(), lb_entry->length(), keys_ptr.data(), keys_len.data(), &max_keys); if(ret != HG_SUCCESS) { throw Exception("Error occured when calling sdskv_list_keys"); } unsigned i = max_keys - 1; if(max_keys == 0) return 0; // remove keys that don't have the same level or the same prefix std::string prefix = containerName + "/"; keys.resize(0); for(unsigned i = 0; i < max_keys; i++) { if(keys_ent[i]->m_level != level) { max_keys = i; break; } if(!containerName.empty()) { size_t lenpre = prefix.size(); size_t lenstr = strlen(keys_ent[i]->m_fullname); if(lenstr < lenpre) { max_keys = i; break; } if(strncmp(prefix.c_str(), keys_ent[i]->m_fullname, lenpre) != 0) { max_keys = i; break; } } keys.push_back(keys_ent[i]->m_fullname); } // set the resulting keys return max_keys; } }; } #endif