diff --git a/bin/hepnos-daemon.cpp b/bin/hepnos-daemon.cpp index 0736a7e5e44e5b3f980b63c826600033ac7ec4cf..6b1d5b65e9d7cd622f2e9f794c5dd5a5858db309 100644 --- a/bin/hepnos-daemon.cpp +++ b/bin/hepnos-daemon.cpp @@ -20,14 +20,14 @@ void usage(void) { fprintf(stderr, "Usage: hepnos-daemon \n"); - fprintf(stderr, " the Mercury address to listen on (e.g. tcp://)\n"); - fprintf(stderr, " path to the YAML file to generate for clients\n"); + fprintf(stderr, " path to the YAML file containing the service configuration\n"); + fprintf(stderr, " path to the YAML file to generate for clients\n"); exit(-1); } int main(int argc, char *argv[]) { - char* listen_addr; + char* connection_file; char* config_file; int rank; @@ -42,10 +42,10 @@ int main(int argc, char *argv[]) exit(0); } - listen_addr = argv[1]; - config_file = argv[2]; + config_file = argv[1]; + connection_file = argv[2]; - hepnos_run_service(MPI_COMM_WORLD, listen_addr, config_file); + hepnos_run_service(MPI_COMM_WORLD, config_file, connection_file); MPI_Finalize(); } diff --git a/include/hepnos-service.h b/include/hepnos-service.h index 59093b537a38482182e482338b6415e70823edb1..96c269be4fe84d25ac5b75d801f556e33518580e 100644 --- a/include/hepnos-service.h +++ b/include/hepnos-service.h @@ -7,7 +7,7 @@ extern "C" { #endif -void hepnos_run_service(MPI_Comm comm, const char* listen_addr, const char* config_file); +void hepnos_run_service(MPI_Comm comm, const char* config_file, const char* connection_file); #ifdef __cplusplus } diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index bc2654df81a44fc23b277eb4a7e67059c6127662..a6786b78a030c1e21914a5d4e6ad08abc23045e3 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -6,7 +6,9 @@ set(hepnos-src DataStore.cpp SubRun.cpp Event.cpp) -set(hepnos-service-src service/HEPnOSService.cpp) +set(hepnos-service-src service/HEPnOSService.cpp + service/ServiceConfig.cpp + service/ConnectionInfoGenerator.cpp) # load package helper for generating cmake CONFIG packages include (CMakePackageConfigHelpers) @@ -19,7 +21,7 @@ set (hepnos-pkg "share/cmake/hepnos") # set (HEPNOS_VERSION_MAJOR 0) set (HEPNOS_VERSION_MINOR 1) -set (HEPNOS_VERSION_PATCH 0) +set (HEPNOS_VERSION_PATCH 2) set (hepnos-vers "${HEPNOS_VERSION_MAJOR}.${HEPNOS_VERSION_MINOR}") set (HEPNOS_VERSION "${hepnos-vers}.${HEPNOS_VERSION_PATCH}") @@ -44,6 +46,8 @@ target_include_directories (hepnos-service PUBLIC $) # local include's BEFORE, in case old incompatable .h files in prefix/include target_include_directories (hepnos-service BEFORE PUBLIC $) +target_include_directories (hepnos-service BEFORE PUBLIC + $) # for shared libs, establish the lib version set_target_properties (hepnos-service diff --git a/src/DataStore.cpp b/src/DataStore.cpp index c46c2dc90a3322cdf2f7d80159dc8b3d70d48979..befd3bdfd6d1329c70f27991c593883f8ffb0722 100644 --- a/src/DataStore.cpp +++ b/src/DataStore.cpp @@ -163,7 +163,7 @@ DataSet DataStore::createDataSet(const std::string& name) { void DataStore::shutdown() { for(auto addr : m_impl->m_addrs) { - margo_shutdown_remote_instance(m_impl->m_mid, addr); + margo_shutdown_remote_instance(m_impl->m_mid, addr.second); } } diff --git a/src/private/DataStoreImpl.hpp b/src/private/DataStoreImpl.hpp index 55287ecb04372d052ca7f553d3607f5c47d7d22d..77b5c2297ae93f9b95f1ac48c48acd1109f52a82 100644 --- a/src/private/DataStoreImpl.hpp +++ b/src/private/DataStoreImpl.hpp @@ -8,6 +8,7 @@ #include #include +#include #include #include #include @@ -15,6 +16,7 @@ #include #include #include "KeyTypes.hpp" +#include "ValueTypes.hpp" #include "hepnos/Exception.hpp" #include "hepnos/DataStore.hpp" #include "hepnos/DataSet.hpp" @@ -28,16 +30,17 @@ namespace hepnos { class DataStore::Impl { public: - margo_instance_id m_mid; // Margo instance - std::unordered_set m_addrs; // Addresses used by the service - sdskv_client_t m_sdskv_client; // SDSKV client - bake_client_t m_bake_client; // BAKE client - std::vector m_sdskv_ph; // list of SDSKV provider handlers - std::vector m_sdskv_db; // list of SDSKV database ids - struct ch_placement_instance* m_chi_sdskv; // ch-placement instance for SDSKV - std::vector m_bake_ph; // list of BAKE provider handlers - struct ch_placement_instance* m_chi_bake; // ch-placement instance for BAKE - const DataStore::iterator m_end; // iterator for the end() of the DataStore + margo_instance_id m_mid; // Margo instance + std::unordered_map m_addrs; // Addresses used by the service + sdskv_client_t m_sdskv_client; // SDSKV client + bake_client_t m_bake_client; // BAKE client + std::vector m_sdskv_ph; // list of SDSKV provider handlers + std::vector m_sdskv_db; // list of SDSKV database ids + struct ch_placement_instance* m_chi_sdskv; // ch-placement instance for SDSKV + std::vector m_bake_ph; // list of BAKE provider handlers + std::vector m_bake_targets; // list of BAKE target ids + struct ch_placement_instance* m_chi_bake; // ch-placement instance for BAKE + const DataStore::iterator m_end; // iterator for the end() of the DataStore Impl(DataStore* parent) : m_mid(MARGO_INSTANCE_NULL) @@ -77,18 +80,22 @@ class DataStore::Impl { for(YAML::const_iterator it = sdskv.begin(); it != sdskv.end(); it++) { std::string str_addr = it->first.as(); hg_addr_t addr; - hret = margo_addr_lookup(m_mid, str_addr.c_str(), &addr); - if(hret != HG_SUCCESS) { - margo_addr_free(m_mid,addr); - cleanup(); - throw Exception("margo_addr_lookup failed"); + if(m_addrs.count(str_addr) != 0) { + addr = m_addrs[str_addr]; + } else { + hret = margo_addr_lookup(m_mid, str_addr.c_str(), &addr); + if(hret != HG_SUCCESS) { + margo_addr_free(m_mid,addr); + cleanup(); + throw Exception("margo_addr_lookup failed"); + } + m_addrs[str_addr] = addr; } - m_addrs.insert(addr); + // get the provider id(s) if(it->second.IsScalar()) { uint16_t provider_id = it->second.as(); sdskv_provider_handle_t ph; ret = sdskv_provider_handle_create(m_sdskv_client, addr, provider_id, &ph); - margo_addr_free(m_mid, addr); if(ret != SDSKV_SUCCESS) { cleanup(); throw Exception("sdskv_provider_handle_create failed"); @@ -99,7 +106,6 @@ class DataStore::Impl { uint16_t provider_id = pid->second.as(); sdskv_provider_handle_t ph; ret = sdskv_provider_handle_create(m_sdskv_client, addr, provider_id, &ph); - margo_addr_free(m_mid, addr); if(ret != SDSKV_SUCCESS) { cleanup(); throw Exception("sdskv_provider_handle_create failed"); @@ -120,40 +126,60 @@ class DataStore::Impl { } // initialize ch-placement for the SDSKV providers m_chi_sdskv = ch_placement_initialize("hash_lookup3", m_sdskv_ph.size(), 4, 0); + // get list of bake provider handles YAML::Node bake = config["hepnos"]["providers"]["bake"]; - for(YAML::const_iterator it = bake.begin(); it != bake.end(); it++) { - std::string str_addr = it->first.as(); - hg_addr_t addr; - hret = margo_addr_lookup(m_mid, str_addr.c_str(), &addr); - if(hret != HG_SUCCESS) { - margo_addr_free(m_mid, addr); - cleanup(); - throw Exception("margo_addr_lookup failed"); - } - m_addrs.insert(addr); - if(it->second.IsScalar()) { - uint16_t provider_id = it->second.as(); - bake_provider_handle_t ph; - ret = bake_provider_handle_create(m_bake_client, addr, provider_id, &ph); - margo_addr_free(m_mid, addr); - if(ret != 0) { - cleanup(); - throw Exception("bake_provider_handle_create failed"); + if(bake) { + for(YAML::const_iterator it = bake.begin(); it != bake.end(); it++) { + // get the address of a bake provider + std::string str_addr = it->first.as(); + hg_addr_t addr; + if(m_addrs.count(str_addr) != 0) { + addr = m_addrs[str_addr]; + } else { + // lookup the address + hret = margo_addr_lookup(m_mid, str_addr.c_str(), &addr); + if(hret != HG_SUCCESS) { + margo_addr_free(m_mid, addr); + cleanup(); + throw Exception("margo_addr_lookup failed"); + } + m_addrs[str_addr] = addr; } - m_bake_ph.push_back(ph); - } else if(it->second.IsSequence()) { - for(YAML::const_iterator pid = it->second.begin(); pid != it->second.end(); pid++) { - uint16_t provider_id = pid->second.as(); + if(it->second.IsScalar()) { + uint16_t provider_id = it->second.as(); bake_provider_handle_t ph; ret = bake_provider_handle_create(m_bake_client, addr, provider_id, &ph); - margo_addr_free(m_mid, addr); if(ret != 0) { cleanup(); throw Exception("bake_provider_handle_create failed"); } m_bake_ph.push_back(ph); + } else if(it->second.IsSequence()) { + for(YAML::const_iterator pid = it->second.begin(); pid != it->second.end(); pid++) { + uint16_t provider_id = pid->second.as(); + bake_provider_handle_t ph; + ret = bake_provider_handle_create(m_bake_client, addr, provider_id, &ph); + if(ret != 0) { + cleanup(); + throw Exception("bake_provider_handle_create failed"); + } + m_bake_ph.push_back(ph); + } + } // if(it->second.IsSequence()) + } // for loop + // find out the bake targets at each bake provider + for(auto& bake_ph : m_bake_ph) { + bake_target_id_t bti; + uint64_t num_targets = 0; + ret = bake_probe(bake_ph, 1, &bti, &num_targets); + if(ret != BAKE_SUCCESS) { + throw Exception("bake_probe failed to retrieve targets"); } + if(num_targets != 1) { + throw Exception("bake_prove returned no target"); + } + m_bake_targets.push_back(bti); } } // initialize ch-placement for the bake providers @@ -175,45 +201,63 @@ class DataStore::Impl { ch_placement_finalize(m_chi_sdskv); if(m_chi_bake) ch_placement_finalize(m_chi_bake); + for(auto& addr : m_addrs) { + margo_addr_free(m_mid, addr.second); + } if(m_mid) margo_finalize(m_mid); } private: static void checkConfig(YAML::Node& config) { + // config file starts with hepnos entry auto hepnosNode = config["hepnos"]; if(!hepnosNode) { throw Exception("\"hepnos\" entry not found in YAML file"); } + // hepnos entry has client entry auto clientNode = hepnosNode["client"]; if(!clientNode) { throw Exception("\"client\" entry not found in \"hepnos\" section"); } + // client entry has protocol entry auto protoNode = clientNode["protocol"]; if(!protoNode) { throw Exception("\"protocol\" entry not found in \"client\" section"); } + // hepnos entry has providers entry auto providersNode = hepnosNode["providers"]; if(!providersNode) { throw Exception("\"providers\" entry not found in \"hepnos\" section"); } + // provider entry has sdskv entry auto sdskvNode = providersNode["sdskv"]; if(!sdskvNode) { throw Exception("\"sdskv\" entry not found in \"providers\" section"); } + // sdskv entry is not empty if(sdskvNode.size() == 0) { throw Exception("No provider found in \"sdskv\" section"); } + // for each sdskv entry for(auto it = sdskvNode.begin(); it != sdskvNode.end(); it++) { if(it->second.IsScalar()) continue; // one provider id given if(it->second.IsSequence()) { // array of provider ids given + // the sequence is not empty if(it->second.size() == 0) { throw Exception("Empty array of provider ids encountered in \"sdskv\" section"); } + // all objects in the sequence are scalar and appear only once + std::unordered_set ids; for(auto pid = it->second.begin(); pid != it->second.end(); pid++) { if(!pid->second.IsScalar()) { throw Exception("Non-scalar provider id encountered in \"sdskv\" section"); } + uint16_t pid_int = pid->as(); + if(ids.count(pid_int) != 0) { + throw Exception("Provider id encountered twice in \"sdskv\" section"); + } + ids.insert(pid_int); } } else { throw Exception("Invalid value type for provider in \"sdskv\" section"); @@ -230,10 +274,16 @@ class DataStore::Impl { if(it->second.size() == 0) { throw Exception("No provider found in \"bake\" section"); } + std::unordered_set ids; for(auto pid = it->second.begin(); pid != it->second.end(); pid++) { if(!pid->second.IsScalar()) { throw Exception("Non-scalar provider id encountered in \"bake\" section"); } + uint16_t pid_int = pid->as(); + if(ids.count(pid_int) != 0) { + throw Exception("Provider id encountered twice in \"bake\" section"); + } + ids.insert(pid_int); } } else { throw Exception("Invalid value type for provider in \"bake\" section"); @@ -252,33 +302,68 @@ class DataStore::Impl { ss << containerName << "/"; ss << objectName; // hash the name to get the provider id - long unsigned provider_idx = 0; + long unsigned sdskv_provider_idx = 0; + uint64_t name_hash; if(level != 0) { - uint64_t h = std::hash()(containerName); - ch_placement_find_closest(m_chi_sdskv, h, 1, &provider_idx); + name_hash = std::hash()(containerName); } else { // use the complete name for final objects (level 0) - uint64_t h = std::hash()(ss.str()); - ch_placement_find_closest(m_chi_sdskv, h, 1, &provider_idx); + name_hash = std::hash()(ss.str()); } + ch_placement_find_closest(m_chi_sdskv, name_hash, 1, &sdskv_provider_idx); // make corresponding datastore entry DataStoreEntryPtr entry = make_datastore_entry(level, ss.str()); - auto ph = m_sdskv_ph[provider_idx]; - auto db_id = m_sdskv_db[provider_idx]; - // find the size of the value, as a way to check if the key exists - hg_size_t vsize; - ret = sdskv_length(ph, db_id, entry->raw(), entry->length(), &vsize); - if(ret == SDSKV_ERR_UNKNOWN_KEY) { - return false; - } - if(ret != SDSKV_SUCCESS) { - throw Exception("Error occured when calling sdskv_length"); - } + auto sdskv_ph = m_sdskv_ph[sdskv_provider_idx]; + auto db_id = m_sdskv_db[sdskv_provider_idx]; // read the value - data.resize(vsize); - ret = sdskv_get(ph, db_id, entry->raw(), entry->length(), data.data(), &vsize); - if(ret != SDSKV_SUCCESS) { - throw Exception("Error occured when calling sdskv_get"); + if(level != 0 || m_bake_ph.empty()) { // read directly from sdskv + + // find the size of the value, as a way to check if the key exists + hg_size_t vsize; + ret = sdskv_length(sdskv_ph, db_id, entry->raw(), entry->length(), &vsize); + if(ret == SDSKV_ERR_UNKNOWN_KEY) { + return false; + } + if(ret != SDSKV_SUCCESS) { + throw Exception("Error occured when calling sdskv_length"); + } + + data.resize(vsize); + ret = sdskv_get(sdskv_ph, db_id, entry->raw(), entry->length(), data.data(), &vsize); + if(ret != SDSKV_SUCCESS) { + throw Exception("Error occured when calling sdskv_get"); + } + + } else { // read from BAKE + + // first get the key/val from sdskv + DataStoreValue rid_info; + hg_size_t vsize = sizeof(rid_info); + ret = sdskv_get(sdskv_ph, db_id, entry->raw(), entry->length(), (void*)(&rid_info), &vsize); + if(ret == SDSKV_ERR_UNKNOWN_KEY) { + return false; + } + if(ret != SDSKV_SUCCESS) { + throw Exception("Error occured when calling sdskv_get"); + } + if(vsize != sizeof(rid_info)) { + throw Exception("Call to sdskv_get returned a value of unexpected size"); + } + // now read the data from bake + data.resize(rid_info.getDataSize()); + if(data.size() == 0) return true; + long unsigned bake_provider_idx = 0; + ch_placement_find_closest(m_chi_bake, name_hash, 1, &bake_provider_idx); + auto bake_ph = m_bake_ph[bake_provider_idx]; + auto target = m_bake_targets[bake_provider_idx]; + uint64_t bytes_read = 0; + ret = bake_read(bake_ph, rid_info.getBakeRegionID(), 0, data.data(), data.size(), &bytes_read); + if(ret != BAKE_SUCCESS) { + throw Exception("Couldn't read region from BAKE"); + } + if(bytes_read != rid_info.getDataSize()) { + throw Exception("Bytes read from BAKE did not match expected object size"); + } } return true; } @@ -291,29 +376,52 @@ class DataStore::Impl { ss << containerName << "/"; ss << objectName; // hash the name to get the provider id - long unsigned provider_idx = 0; + long unsigned sdskv_provider_idx = 0; + uint64_t name_hash; if(level != 0) { - uint64_t h = std::hash()(containerName); - ch_placement_find_closest(m_chi_sdskv, h, 1, &provider_idx); + name_hash = std::hash()(containerName); } else { // use the complete name for final objects (level 0) - uint64_t h = std::hash()(ss.str()); - ch_placement_find_closest(m_chi_sdskv, h, 1, &provider_idx); + name_hash = std::hash()(ss.str()); } - // make corresponding datastore entry + ch_placement_find_closest(m_chi_sdskv, name_hash, 1, &sdskv_provider_idx); + // make corresponding datastore entry key DataStoreEntryPtr entry = make_datastore_entry(level, ss.str()); - auto ph = m_sdskv_ph[provider_idx]; - auto db_id = m_sdskv_db[provider_idx]; + auto sdskv_ph = m_sdskv_ph[sdskv_provider_idx]; + auto db_id = m_sdskv_db[sdskv_provider_idx]; // check if the key exists hg_size_t vsize; - int ret = sdskv_length(ph, db_id, entry->raw(), entry->length(), &vsize); + int ret = sdskv_length(sdskv_ph, db_id, entry->raw(), entry->length(), &vsize); if(ret == HG_SUCCESS) return false; // key already exists if(ret != SDSKV_ERR_UNKNOWN_KEY) { // there was a problem with sdskv throw Exception("Could not check if key exists in SDSKV (sdskv_length error)"); } - ret = sdskv_put(ph, db_id, entry->raw(), entry->length(), data.data(), data.size()); - if(ret != SDSKV_SUCCESS) { - throw Exception("Could not put key/value pair in SDSKV (sdskv_put error)"); + // if it's not a last-level data entry (data product), store in sdskeyval + if(level != 0 || m_bake_ph.empty()) { + ret = sdskv_put(sdskv_ph, db_id, entry->raw(), entry->length(), data.data(), data.size()); + if(ret != SDSKV_SUCCESS) { + throw Exception("Could not put key/value pair in SDSKV (sdskv_put error)"); + } + } else { // store data in bake + long unsigned bake_provider_idx = 0; + ch_placement_find_closest(m_chi_bake, name_hash, 1, &bake_provider_idx); + auto bake_ph = m_bake_ph[bake_provider_idx]; + auto target = m_bake_targets[bake_provider_idx]; + bake_region_id_t rid; + ret = bake_create_write_persist(bake_ph, target, data.data(), data.size(), &rid); + if(ret != BAKE_SUCCESS) { + throw Exception("Could not create bake region (bake_create_write_persist error)"); + } + // create Value to put in SDSKV + DataStoreValue value(data.size(), bake_provider_idx, rid); + ret = sdskv_put(sdskv_ph, db_id, entry->raw(), entry->length(), (void*)(&value), sizeof(value)); + if(ret != SDSKV_SUCCESS) { + ret = bake_remove(bake_ph, rid); + if(ret != BAKE_SUCCESS) { + throw Exception("Dude, not only did SDSKV fail to put the key, but I couldn't cleanup BAKE. Is it Friday 13?"); + } + throw Exception("Could not put key/value pair in SDSKV (sdskv_put error)"); + } } return true; } diff --git a/src/private/ValueTypes.hpp b/src/private/ValueTypes.hpp new file mode 100644 index 0000000000000000000000000000000000000000..d19e32fe3080984fe6d35230122f4df101389ec0 --- /dev/null +++ b/src/private/ValueTypes.hpp @@ -0,0 +1,45 @@ +/* + * (C) 2018 The University of Chicago + * + * See COPYRIGHT in top-level directory. + */ +#ifndef __PRIVATE_VALUE_TYPES_H +#define __PRIVATE_VALUE_TYPES_H + +#include +#include +#include +#include + +namespace hepnos { + +class DataStoreValue { + + size_t m_object_size; + uint64_t m_server_id; + bake_region_id_t m_region_id; + + public: + + DataStoreValue() + : m_object_size(0), m_server_id(0) {} + + DataStoreValue(size_t object_size, uint64_t bake_server_id, const bake_region_id_t& region_id) + : m_object_size(object_size), m_server_id(bake_server_id), m_region_id(region_id) {} + + size_t getDataSize() const { + return m_object_size; + } + + const bake_region_id_t& getBakeRegionID() const { + return m_region_id; + } + + const uint64_t& getBakeServerID() const { + return m_server_id; + } +}; + +} + +#endif diff --git a/src/service/ConnectionInfoGenerator.cpp b/src/service/ConnectionInfoGenerator.cpp new file mode 100644 index 0000000000000000000000000000000000000000..f2f801f05d759b9538345f37faa3ff4529a55af9 --- /dev/null +++ b/src/service/ConnectionInfoGenerator.cpp @@ -0,0 +1,81 @@ +#include +#include +#include "ConnectionInfoGenerator.hpp" + +namespace hepnos { + +struct ConnectionInfoGenerator::Impl { + std::string m_addr; // address of this process + uint16_t m_bake_id; // provider ids for BAKE + uint16_t m_sdskv_id; // provider ids for SDSKV +}; + +ConnectionInfoGenerator::ConnectionInfoGenerator( + const std::string& address, + uint16_t sdskv_provider_id, + uint16_t bake_provider_id) +: m_impl(std::make_unique()) { + m_impl->m_addr = address; + m_impl->m_bake_id = bake_provider_id; + m_impl->m_sdskv_id = sdskv_provider_id; +} + +ConnectionInfoGenerator::~ConnectionInfoGenerator() {} + +void ConnectionInfoGenerator::generateFile(MPI_Comm comm, const std::string& filename) const { + int rank, size; + const char* addr = m_impl->m_addr.c_str(); + + MPI_Comm_rank(comm, &rank); + MPI_Comm_size(comm, &size); + + unsigned j=0; + while(addr[j] != '\0' && addr[j] != ':') j++; + std::string proto(addr, j); + + // Exchange addresses + std::vector addresses_buf(128*size); + MPI_Gather(addr, 128, MPI_BYTE, addresses_buf.data(), 128, MPI_BYTE, 0, comm); + + // Exchange bake providers info + std::vector bake_pr_ids_buf(size); + MPI_Gather(&(m_impl->m_bake_id), + 1, MPI_UNSIGNED_SHORT, + bake_pr_ids_buf.data(), + 1, MPI_UNSIGNED_SHORT, + 0, comm); + + // Exchange sdskv providers info + std::vector sdskv_pr_ids_buf(size); + MPI_Gather(&(m_impl->m_sdskv_id), + 1, MPI_UNSIGNED_SHORT, + sdskv_pr_ids_buf.data(), + 1, MPI_UNSIGNED_SHORT, + 0, comm); + + // After this line, the rest is executed only by rank 0 + if(rank != 0) return; + + std::vector addresses; + for(unsigned i=0; i < size; i++) { + addresses.emplace_back(&addresses_buf[128*i]); + } + + YAML::Node config; + config["hepnos"]["client"]["protocol"] = proto; + YAML::Node providers = config["hepnos"]["providers"]; + for(unsigned int i=0; i < size; i++) { + const auto& provider_addr = addresses[i]; + if(sdskv_pr_ids_buf[i]) { + providers["sdskv"][provider_addr] = sdskv_pr_ids_buf[i]; + } + if(bake_pr_ids_buf[i]) { + providers["bake"][provider_addr] = bake_pr_ids_buf[i]; + } + } + + std::ofstream fout(filename); + fout << config; +} + +} diff --git a/src/service/ConnectionInfoGenerator.hpp b/src/service/ConnectionInfoGenerator.hpp new file mode 100644 index 0000000000000000000000000000000000000000..1606de3c42e2738f5aed8e64ccfaea751031908b --- /dev/null +++ b/src/service/ConnectionInfoGenerator.hpp @@ -0,0 +1,33 @@ +#ifndef __HEPNOS_CONNECTION_INFO_GENERATOR_H +#define __HEPNOS_CONNECTION_INFO_GENERATOR_H + +#include +#include +#include + +namespace hepnos { + +class ConnectionInfoGenerator { + +private: + + class Impl; + std::unique_ptr m_impl; + +public: + + ConnectionInfoGenerator(const std::string& address, + uint16_t sdskv_provider_id, + uint16_t bake_provider_id); + ConnectionInfoGenerator(const ConnectionInfoGenerator&) = delete; + ConnectionInfoGenerator(ConnectionInfoGenerator&&) = delete; + ConnectionInfoGenerator& operator=(const ConnectionInfoGenerator&) = delete; + ConnectionInfoGenerator& operator=(ConnectionInfoGenerator&&) = delete; + ~ConnectionInfoGenerator(); + + void generateFile(MPI_Comm comm, const std::string& filename) const; +}; + +} + +#endif diff --git a/src/service/HEPnOSService.cpp b/src/service/HEPnOSService.cpp index b8b20090b01bd8a6b707b4f2841d213d20a7e19f..94c4bc59ecb226871717c1175edb771c5a485965 100644 --- a/src/service/HEPnOSService.cpp +++ b/src/service/HEPnOSService.cpp @@ -12,14 +12,15 @@ #include #include #include -#include +#include "ServiceConfig.hpp" +#include "ConnectionInfoGenerator.hpp" #include "hepnos-service.h" #define ASSERT(__cond, __msg, ...) { if(!(__cond)) { fprintf(stderr, "[%s:%d] " __msg, __FILE__, __LINE__, __VA_ARGS__); exit(-1); } } -static void generate_config_file(MPI_Comm comm, const char* addr, const char* config_file); +//static void generate_connection_file(MPI_Comm comm, const char* addr, const char* filename); -void hepnos_run_service(MPI_Comm comm, const char* listen_addr, const char* config_file) +void hepnos_run_service(MPI_Comm comm, const char* config_file, const char* connection_file) { margo_instance_id mid; int ret; @@ -27,11 +28,24 @@ void hepnos_run_service(MPI_Comm comm, const char* listen_addr, const char* conf MPI_Comm_rank(comm, &rank); + /* load configuration */ + std::unique_ptr config; + try { + config = std::make_unique(config_file, rank); + } catch(const std::exception& e) { + std::cerr << "Error: when reading configuration:" << std::endl; + std::cerr << " " << e.what() << std::endl; + std::cerr << "Aborting." << std::endl; + MPI_Abort(MPI_COMM_WORLD, -1); + return; + } + /* Margo initialization */ - mid = margo_init(listen_addr, MARGO_SERVER_MODE, 0, -1); + mid = margo_init(config->getAddress().c_str(), MARGO_SERVER_MODE, 0, -1); if (mid == MARGO_INSTANCE_NULL) { - fprintf(stderr, "Error: Unable to initialize margo\n"); + std::cerr << "Error: unable to initialize margo" << std::endl; + std::cerr << "Aborting." << std::endl; MPI_Abort(MPI_COMM_WORLD, -1); return; } @@ -44,43 +58,55 @@ void hepnos_run_service(MPI_Comm comm, const char* listen_addr, const char* conf hg_size_t self_addr_str_size = 128; margo_addr_to_string(mid, self_addr_str, &self_addr_str_size, self_addr); - /* Bake provider initialization */ - uint16_t bake_mplex_id = 1; - char bake_target_name[128]; - sprintf(bake_target_name, "/dev/shm/hepnos.%d.dat", rank); - /* create the bake target if it does not exist */ - if(-1 == access(bake_target_name, F_OK)) { - // XXX creating a pool of 10MB - this should come from a config file - ret = bake_makepool(bake_target_name, 10*1024*1024, 0664); - ASSERT(ret == 0, "bake_makepool() failed (ret = %d)\n", ret); + uint16_t bake_provider_id = 0; + if(config->hasStorage()) { + /* Bake provider initialization */ + bake_provider_id = 1; // XXX we can make that come from the config file + const char* bake_target_name = config->getStoragePath().c_str(); + size_t bake_target_size = config->getStorageSize()*(1024*1024); + /* create the bake target if it does not exist */ + if(-1 == access(bake_target_name, F_OK)) { + ret = bake_makepool(bake_target_name, bake_target_size, 0664); + ASSERT(ret == 0, "bake_makepool() failed (ret = %d)\n", ret); + } + bake_provider_t bake_prov; + bake_target_id_t bake_tid; + ret = bake_provider_register(mid, bake_provider_id, BAKE_ABT_POOL_DEFAULT, &bake_prov); + ASSERT(ret == 0, "bake_provider_register() failed (ret = %d)\n", ret); + ret = bake_provider_add_storage_target(bake_prov, bake_target_name, &bake_tid); + ASSERT(ret == 0, "bake_provider_add_storage_target() failed to add target %s (ret = %d)\n", + bake_target_name, ret); + } + + uint8_t sdskv_provider_id = 0; + if(config->hasDatabase()) { + /* SDSKV provider initialization */ + sdskv_provider_id = 1; // XXX we can make that come from the config file + sdskv_provider_t sdskv_prov; + ret = sdskv_provider_register(mid, sdskv_provider_id, SDSKV_ABT_POOL_DEFAULT, &sdskv_prov); + ASSERT(ret == 0, "sdskv_provider_register() failed (ret = %d)\n", ret); + + /* creating the database */ + const char* db_path = config->getDatabasePath().c_str(); + const char* db_name = config->getDatabaseName().c_str(); + sdskv_db_type_t db_type; + if(config->getDatabaseType() == "map") db_type = KVDB_MAP; + if(config->getDatabaseType() == "ldb") db_type = KVDB_LEVELDB; + if(config->getDatabaseType() == "bdb") db_type = KVDB_BERKELEYDB; + sdskv_database_id_t db_id; + ret = sdskv_provider_add_database(sdskv_prov, db_name, db_path, db_type, SDSKV_COMPARE_DEFAULT, &db_id); + ASSERT(ret == 0, "sdskv_provider_add_database() failed (ret = %d)\n", ret); } - bake_provider_t bake_prov; - bake_target_id_t bake_tid; - ret = bake_provider_register(mid, bake_mplex_id, BAKE_ABT_POOL_DEFAULT, &bake_prov); - ASSERT(ret == 0, "bake_provider_register() failed (ret = %d)\n", ret); - ret = bake_provider_add_storage_target(bake_prov, bake_target_name, &bake_tid); - ASSERT(ret == 0, "bake_provider_add_storage_target() failed to add target %s (ret = %d)\n", - bake_target_name, ret); - - /* SDSKV provider initialization */ - uint8_t sdskv_mplex_id = 1; - sdskv_provider_t sdskv_prov; - ret = sdskv_provider_register(mid, sdskv_mplex_id, SDSKV_ABT_POOL_DEFAULT, &sdskv_prov); - ASSERT(ret == 0, "sdskv_provider_register() failed (ret = %d)\n", ret); - - // XXX creating the database - this should come from a config file - sdskv_database_id_t db_id; - ret = sdskv_provider_add_database(sdskv_prov, "hepnosdb", "", KVDB_MAP, SDSKV_COMPARE_DEFAULT, &db_id); - ASSERT(ret == 0, "sdskv_provider_add_database() failed (ret = %d)\n", ret); margo_addr_free(mid, self_addr); - generate_config_file(MPI_COMM_WORLD, self_addr_str, config_file); + hepnos::ConnectionInfoGenerator fileGen(self_addr_str, sdskv_provider_id, bake_provider_id); + fileGen.generateFile(MPI_COMM_WORLD, connection_file); margo_wait_for_finalize(mid); } - -static void generate_config_file(MPI_Comm comm, const char* addr, const char* config_file) +/* +static void generate_connection_file(MPI_Comm comm, const char* addr, const char* filename) { int rank, size; MPI_Comm_rank(comm, &rank); @@ -106,6 +132,7 @@ static void generate_config_file(MPI_Comm comm, const char* addr, const char* co for(auto& s : addresses) config["hepnos"]["providers"]["sdskv"][s] = 1; - std::ofstream fout(config_file); + std::ofstream fout(filename); fout << config; } +*/ diff --git a/src/service/ServiceConfig.cpp b/src/service/ServiceConfig.cpp new file mode 100644 index 0000000000000000000000000000000000000000..e2647d187ad0c059f76408c46c6dec7564f3351c --- /dev/null +++ b/src/service/ServiceConfig.cpp @@ -0,0 +1,136 @@ +#include "ServiceConfig.hpp" +#include "hepnos/Exception.hpp" +#include + +namespace hepnos { + +struct ServiceConfig::Impl { + + std::string m_address; + bool m_hasDatabase; + std::string m_databasePath; + std::string m_databaseName; + std::string m_databaseType; + bool m_hasStorage; + std::string m_storagePath; + size_t m_storageSize; + +}; + +static YAML::Node loadAndValidate(const std::string& filename); +static std::string insertRankIn(const std::string& str, int rank); + +ServiceConfig::ServiceConfig(const std::string& filename, int rank) +: m_impl(std::make_unique()) { + + YAML::Node config = loadAndValidate(filename); + YAML::Node address = config["address"]; + YAML::Node db_node = config["database"]; + YAML::Node storage_node = config["storage"]; + m_impl->m_address = address.as(); + if(!db_node) { + m_impl->m_hasDatabase = false; + } else { + m_impl->m_hasDatabase = true; + m_impl->m_databasePath = insertRankIn(db_node["path"].as(), rank); + m_impl->m_databaseName = db_node["name"].as(); + m_impl->m_databaseType = db_node["type"].as(); + } + if(!storage_node) { + m_impl->m_hasStorage = false; + } else { + m_impl->m_hasStorage = true; + m_impl->m_storagePath = insertRankIn(storage_node["path"].as(), rank); + m_impl->m_storageSize = storage_node["size"].as(); + } +} + +ServiceConfig::~ServiceConfig() {} + +const std::string& ServiceConfig::getAddress() const { + return m_impl->m_address; +} + +bool ServiceConfig::hasDatabase() const { + return m_impl->m_hasDatabase; +} + +const std::string& ServiceConfig::getDatabasePath() const { + return m_impl->m_databasePath; +} + +const std::string& ServiceConfig::getDatabaseName() const { + return m_impl->m_databaseName; +} + +const std::string& ServiceConfig::getDatabaseType() const { + return m_impl->m_databaseType; +} + +bool ServiceConfig::hasStorage() const { + return m_impl->m_hasStorage; +} + +const std::string& ServiceConfig::getStoragePath() const { + return m_impl->m_storagePath; +} + +size_t ServiceConfig::getStorageSize() const { + return m_impl->m_storageSize; +} + +static YAML::Node loadAndValidate(const std::string& filename) { + YAML::Node config = YAML::LoadFile(filename); + if(!config["address"]) { + throw Exception("\"address\" field not found in configuration file."); + } + if(!config["database"]) { + throw Exception("\"database\" field not found in configuration file."); + } + if(!config["database"]["path"]) { + throw Exception("\"database.path\" field not found in configuration file."); + } + if(!config["database"]["name"]) { + throw Exception("\"database.name\" field not found in configuration file."); + } + if(!config["database"]["type"]) { + throw Exception("\"database.type\" field not found in configuration file."); + } + std::string db_type = config["database"]["type"].as(); + if(db_type != "map" + && db_type != "ldb" + && db_type != "bdb") { + throw Exception("\"database.type\" field should be \"map\", \"ldb\", or \"bdb\"."); + } + if(config["storage"]) { + if(!config["storage"]["path"]) { + throw Exception("\"storage.path\" field not found in configuration file."); + } + if(!config["storage"]["size"]) { + throw Exception("\"storage.size\" field not found in configuration file."); + } + } + return config; +} + +static std::string insertRankIn(const std::string& str, int rank) { + size_t index = 0; + std::string result = str; + std::stringstream ssrank; + ssrank << rank; + std::string srank = ssrank.str(); + while (true) { + index = result.find("$RANK", index); + if (index == std::string::npos) break; + if(rank >= 0) { + result.replace(index, 5, srank.c_str()); + } else { + result.replace(index, 5, ""); + } + index += 5; + } + return result; +} + +} + diff --git a/src/service/ServiceConfig.hpp b/src/service/ServiceConfig.hpp new file mode 100644 index 0000000000000000000000000000000000000000..b9a45b5e39eb8fb36b97e4b6a3367c0bdfe4ea85 --- /dev/null +++ b/src/service/ServiceConfig.hpp @@ -0,0 +1,38 @@ +#ifndef __HEPNOS_SERVICE_CONFIG_H +#define __HEPNOS_SERVICE_CONFIG_H + +#include +#include +#include + +namespace hepnos { + +class ServiceConfig { + +private: + class Impl; + std::unique_ptr m_impl; + +public: + + ServiceConfig(const std::string& filename, int rank=-1); + + ServiceConfig(const ServiceConfig&) = delete; + ServiceConfig(ServiceConfig&&) = delete; + ServiceConfig& operator=(const ServiceConfig&) = delete; + ServiceConfig& operator=(ServiceConfig&&) = delete; + ~ServiceConfig(); + + const std::string& getAddress() const; + bool hasDatabase() const; + const std::string& getDatabasePath() const; + const std::string& getDatabaseName() const; + const std::string& getDatabaseType() const; + bool hasStorage() const; + const std::string& getStoragePath() const; + size_t getStorageSize() const; +}; + +} + +#endif diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 36273f2f26f3cd9a8e7c170bfe0ed31d8eff6a5e..7d226284eb9596432ceda17bdca10d12c8f4fb19 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -1,6 +1,7 @@ # Move the configuration file used for the configuration test configure_file(${CMAKE_CURRENT_SOURCE_DIR}/run-test.sh ${CMAKE_CURRENT_BINARY_DIR}/run-test.sh COPYONLY) configure_file(${CMAKE_CURRENT_SOURCE_DIR}/test-util.sh ${CMAKE_CURRENT_BINARY_DIR}/test-util.sh COPYONLY) +configure_file(${CMAKE_CURRENT_SOURCE_DIR}/config.yaml ${CMAKE_CURRENT_BINARY_DIR}/config.yaml COPYONLY) add_executable(example example.cpp) target_link_libraries(example hepnos ${Boost_LIBRARIES}) diff --git a/test/config.yaml b/test/config.yaml new file mode 100644 index 0000000000000000000000000000000000000000..62c90ad8ed10ac6a9e7a8e97d1355e9dfe9829c4 --- /dev/null +++ b/test/config.yaml @@ -0,0 +1,9 @@ +--- +address: tcp:// +database: + name: hepnosdb + path: XXX/$RANK + type: bdb +storage: + path: /dev/shm/hepnos.$RANK.dat + size: 50 diff --git a/test/run-test.sh b/test/run-test.sh index d46c635371cd88615c4b98ef35391af47f80bbd4..e4c33856fffe6378211a0cffacd3e968ad2a9559 100755 --- a/test/run-test.sh +++ b/test/run-test.sh @@ -8,14 +8,17 @@ fi source test-util.sh TEST_DIR=`$MKTEMP -d /tmp/hepnos-XXXXXX` -CFG_FILE=$TEST_DIR/config.yml +CON_FILE=$TEST_DIR/connection.yaml +cp config.yaml $TEST_DIR/config.yaml +CFG_FILE=$TEST_DIR/config.yaml +sed -i -e "s|XXX|${TEST_DIR}/database|g" $CFG_FILE -hepnos_test_start_servers 2 1 20 $CFG_FILE +hepnos_test_start_servers 2 1 20 $CFG_FILE $CON_FILE -export HEPNOS_CONFIG_FILE=$CFG_FILE +export HEPNOS_CONFIG_FILE=$CON_FILE # run a connect test client -run_to 10 $1 $CFG_FILE +run_to 10 $1 $CON_FILE if [ $? -ne 0 ]; then wait exit 1 @@ -27,5 +30,5 @@ wait # cleanup rm -rf $TEST_DIR - +rm -rf /dev/shm/hepnos.dat exit 0 diff --git a/test/test-util.sh b/test/test-util.sh index 71f91791c168158a3d80689558777c5a8db42a9e..47a89ac854575ac8e24cd19aea897e34e2c165dd 100755 --- a/test/test-util.sh +++ b/test/test-util.sh @@ -19,11 +19,11 @@ function hepnos_test_start_servers() nservers=${1:-4} startwait=${2:-15} maxtime=${3:-120} - cfile=${4:-testconfig.yml} - + config=${4:-config.yaml} + cfile=${5:-connection.yaml} rm -rf ${cfile} - run_to $maxtime mpirun -np $nservers ../bin/hepnos-daemon tcp:// $cfile & + run_to $maxtime mpirun -np $nservers ../bin/hepnos-daemon $config $cfile & if [ $? -ne 0 ]; then # TODO: this doesn't actually work; can't check return code of # something executing in background. We have to rely on the