Commit 2328f3b5 authored by Matthieu Dorier's avatar Matthieu Dorier

removed dependency on Bake

parent d37ddf02
......@@ -40,9 +40,7 @@ find_package (Boost REQUIRED COMPONENTS serialization)
include_directories(${Boost_INCLUDE_DIRS})
xpkg_import_module (margo REQUIRED margo)
xpkg_import_module (sdskv-client REQUIRED sdskv-client)
xpkg_import_module (bake-client REQUIRED bake-client)
xpkg_import_module (sdskv-server REQUIRED sdskv-server)
xpkg_import_module (bake-server REQUIRED bake-server)
xpkg_import_module (ch-placement REQUIRED ch-placement)
find_package (yaml-cpp REQUIRED)
......
add_executable(hepnos-daemon hepnos-daemon.cpp)
target_link_libraries(hepnos-daemon hepnos-service yaml-cpp margo bake-server sdskv-server)
target_link_libraries(hepnos-daemon hepnos-service yaml-cpp margo sdskv-server)
add_executable(hepnos-shutdown hepnos-shutdown.cpp)
target_link_libraries(hepnos-shutdown hepnos yaml-cpp margo)
......
......@@ -9,14 +9,8 @@
#include <vector>
#include <unistd.h>
#include <mpi.h>
#include <margo.h>
#include <bake-server.h>
#include <sdskv-server.h>
#include <yaml-cpp/yaml.h>
#include "hepnos-service.h"
#define ASSERT(__cond, __msg, ...) { if(!(__cond)) { fprintf(stderr, "[%s:%d] " __msg, __FILE__, __LINE__, __VA_ARGS__); exit(-1); } }
void usage(void)
{
fprintf(stderr, "Usage: hepnos-daemon <config-file> <connection-file>\n");
......
......@@ -26,7 +26,7 @@ set (hepnos-vers "${HEPNOS_VERSION_MAJOR}.${HEPNOS_VERSION_MINOR}")
set (HEPNOS_VERSION "${hepnos-vers}.${HEPNOS_VERSION_PATCH}")
add_library(hepnos ${hepnos-src})
target_link_libraries (hepnos mercury margo yaml-cpp sdskv-client bake-client ch-placement)
target_link_libraries (hepnos mercury margo yaml-cpp sdskv-client ch-placement)
target_include_directories (hepnos PUBLIC $<INSTALL_INTERFACE:include>)
# local include's BEFORE, in case old incompatable .h files in prefix/include
......@@ -40,7 +40,7 @@ set_target_properties (hepnos
SOVERSION ${HEPNOS_VERSION_MAJOR})
add_library(hepnos-service ${hepnos-service-src})
target_link_libraries (hepnos mercury margo yaml-cpp sdskv-client sdskv-server bake-client bake-server ch-placement)
target_link_libraries (hepnos mercury margo yaml-cpp sdskv-client sdskv-server ch-placement)
target_include_directories (hepnos-service PUBLIC $<INSTALL_INTERFACE:include>)
# local include's BEFORE, in case old incompatable .h files in prefix/include
......
......@@ -8,7 +8,6 @@
#include <iostream>
#include <yaml-cpp/yaml.h>
#include <sdskv-client.h>
#include <bake-client.h>
#include <ch-placement.h>
#include "hepnos/Exception.hpp"
#include "hepnos/DataStore.hpp"
......
......@@ -13,10 +13,8 @@
#include <iostream>
#include <yaml-cpp/yaml.h>
#include <sdskv-client.h>
#include <bake-client.h>
#include <ch-placement.h>
#include "KeyTypes.hpp"
#include "ValueTypes.hpp"
#include "hepnos/Exception.hpp"
#include "hepnos/DataStore.hpp"
#include "hepnos/DataSet.hpp"
......@@ -35,27 +33,17 @@ class DataStore::Impl {
sdskv_database_id_t m_sdskv_db;
};
struct storage {
bake_provider_handle_t m_bake_ph;
bake_target_id_t m_bake_target;
};
margo_instance_id m_mid; // Margo instance
std::unordered_map<std::string,hg_addr_t> m_addrs; // Addresses used by the service
sdskv_client_t m_sdskv_client; // SDSKV client
bake_client_t m_bake_client; // BAKE client
std::vector<database> m_databases; // list of SDSKV databases
struct ch_placement_instance* m_chi_sdskv; // ch-placement instance for SDSKV
std::vector<storage> m_storage; // list of BAKE storage targets
struct ch_placement_instance* m_chi_bake; // ch-placement instance for BAKE
const DataStore::iterator m_end; // iterator for the end() of the DataStore
Impl(DataStore* parent)
: m_mid(MARGO_INSTANCE_NULL)
, m_sdskv_client(SDSKV_CLIENT_NULL)
, m_chi_sdskv(nullptr)
, m_bake_client(BAKE_CLIENT_NULL)
, m_chi_bake(nullptr)
, m_end() {}
void init(const std::string& configFile) {
......@@ -77,12 +65,6 @@ class DataStore::Impl {
cleanup();
throw Exception("Could not create SDSKV client");
}
// initialize BAKE client
ret = bake_client_init(m_mid, &m_bake_client);
if(ret != 0) {
cleanup();
throw Exception("Could not create BAKE client");
}
// create list of sdskv provider handles
YAML::Node sdskv = config["hepnos"]["providers"]["sdskv"];
for(YAML::const_iterator it = sdskv.begin(); it != sdskv.end(); it++) {
......@@ -143,74 +125,15 @@ class DataStore::Impl {
}
// initialize ch-placement for the SDSKV providers
m_chi_sdskv = ch_placement_initialize("hash_lookup3", m_databases.size(), 4, 0);
// get list of bake provider handles
YAML::Node bake = config["hepnos"]["providers"]["bake"];
if(bake) {
for(YAML::const_iterator it = bake.begin(); it != bake.end(); it++) {
// get the address of a bake provider
std::string str_addr = it->first.as<std::string>();
hg_addr_t addr;
if(m_addrs.count(str_addr) != 0) {
addr = m_addrs[str_addr];
} else {
// lookup the address
hret = margo_addr_lookup(m_mid, str_addr.c_str(), &addr);
if(hret != HG_SUCCESS) {
margo_addr_free(m_mid, addr);
cleanup();
throw Exception("margo_addr_lookup failed");
}
m_addrs[str_addr] = addr;
}
uint16_t num_providers = it->second.as<uint16_t>();
for(uint16_t provider_id = 0; provider_id < num_providers; provider_id++) {
bake_provider_handle_t ph;
ret = bake_provider_handle_create(m_bake_client, addr, provider_id, &ph);
if(ret != 0) {
cleanup();
throw Exception("bake_provider_handle_create failed");
}
uint64_t num_targets;
std::vector<bake_target_id_t> targets(256);
ret = bake_probe(ph, 256, targets.data(), &num_targets);
if(ret != 0) {
bake_provider_handle_release(ph);
cleanup();
throw Exception("bake_probe failed");
}
targets.resize(num_targets);
for(const auto& id : targets) {
storage tgt;
bake_provider_handle_ref_incr(ph);
tgt.m_bake_ph = ph;
tgt.m_bake_target = id;
m_storage.push_back(tgt);
}
bake_provider_handle_release(ph);
}
} // for loop
// find out the bake targets at each bake provider
}
// initialize ch-placement for the bake providers
if(m_storage.size()) {
m_chi_bake = ch_placement_initialize("hash_lookup3", m_storage.size(), 4, 0);
}
}
void cleanup() {
for(const auto& db : m_databases) {
sdskv_provider_handle_release(db.m_sdskv_ph);
}
for(const auto& tgt : m_storage) {
bake_provider_handle_release(tgt.m_bake_ph);
}
sdskv_client_finalize(m_sdskv_client);
bake_client_finalize(m_bake_client);
if(m_chi_sdskv)
ch_placement_finalize(m_chi_sdskv);
if(m_chi_bake)
ch_placement_finalize(m_chi_bake);
for(auto& addr : m_addrs) {
margo_addr_free(m_mid, addr.second);
}
......@@ -256,17 +179,6 @@ class DataStore::Impl {
throw Exception("Invalid value type for provider in \"sdskv\" section");
}
}
// bake providers are not mandatory. If they are not present,
// objects will be stored in sdskv providers.
auto bakeNode = providersNode["bake"];
if(!bakeNode) return;
if(bakeNode.size() == 0) return;
for(auto it = bakeNode.begin(); it != bakeNode.end(); it++) {
if(it->second.IsScalar()) continue; // one provider id given
else {
throw Exception("Invalid value type for provider in \"bake\" section");
}
}
}
public:
......@@ -295,55 +207,20 @@ class DataStore::Impl {
auto sdskv_ph = db.m_sdskv_ph;
auto db_id = db.m_sdskv_db;
// read the value
if(level != 0 || m_storage.empty()) { // read directly from sdskv
// find the size of the value, as a way to check if the key exists
hg_size_t vsize;
ret = sdskv_length(sdskv_ph, db_id, entry->raw(), entry->length(), &vsize);
if(ret == SDSKV_ERR_UNKNOWN_KEY) {
return false;
}
if(ret != SDSKV_SUCCESS) {
throw Exception("Error occured when calling sdskv_length");
}
data.resize(vsize);
ret = sdskv_get(sdskv_ph, db_id, entry->raw(), entry->length(), data.data(), &vsize);
if(ret != SDSKV_SUCCESS) {
throw Exception("Error occured when calling sdskv_get");
}
} else { // read from BAKE
// find the size of the value, as a way to check if the key exists
hg_size_t vsize;
ret = sdskv_length(sdskv_ph, db_id, entry->raw(), entry->length(), &vsize);
if(ret == SDSKV_ERR_UNKNOWN_KEY) {
return false;
}
if(ret != SDSKV_SUCCESS) {
throw Exception("Error occured when calling sdskv_length");
}
// first get the key/val from sdskv
DataStoreValue rid_info;
hg_size_t vsize = sizeof(rid_info);
ret = sdskv_get(sdskv_ph, db_id, entry->raw(), entry->length(), (void*)(&rid_info), &vsize);
if(ret == SDSKV_ERR_UNKNOWN_KEY) {
return false;
}
if(ret != SDSKV_SUCCESS) {
throw Exception("Error occured when calling sdskv_get");
}
if(vsize != sizeof(rid_info)) {
throw Exception("Call to sdskv_get returned a value of unexpected size");
}
// now read the data from bake
data.resize(rid_info.getDataSize());
if(data.size() == 0) return true;
long unsigned bake_provider_idx = 0;
ch_placement_find_closest(m_chi_bake, name_hash, 1, &bake_provider_idx);
auto& bake_info = m_storage[bake_provider_idx];
auto bake_ph = bake_info.m_bake_ph;
auto target = bake_info.m_bake_target;
uint64_t bytes_read = 0;
ret = bake_read(bake_ph, rid_info.getBakeRegionID(), 0, data.data(), data.size(), &bytes_read);
if(ret != BAKE_SUCCESS) {
throw Exception("Couldn't read region from BAKE");
}
if(bytes_read != rid_info.getDataSize()) {
throw Exception("Bytes read from BAKE did not match expected object size");
}
data.resize(vsize);
ret = sdskv_get(sdskv_ph, db_id, entry->raw(), entry->length(), data.data(), &vsize);
if(ret != SDSKV_SUCCESS) {
throw Exception("Error occured when calling sdskv_get");
}
return true;
}
......@@ -380,32 +257,9 @@ class DataStore::Impl {
throw Exception("Could not check if key exists in SDSKV (sdskv_length error)");
}
// if it's not a last-level data entry (data product), store in sdskeyval
if(level != 0 || m_storage.empty()) {
ret = sdskv_put(sdskv_ph, db_id, entry->raw(), entry->length(), data.data(), data.size());
if(ret != SDSKV_SUCCESS) {
throw Exception("Could not put key/value pair in SDSKV (sdskv_put error)");
}
} else { // store data in bake
long unsigned bake_provider_idx = 0;
ch_placement_find_closest(m_chi_bake, name_hash, 1, &bake_provider_idx);
const auto& bake_info = m_storage[bake_provider_idx];
auto bake_ph = bake_info.m_bake_ph;
auto target = bake_info.m_bake_target;
bake_region_id_t rid;
ret = bake_create_write_persist(bake_ph, target, data.data(), data.size(), &rid);
if(ret != BAKE_SUCCESS) {
throw Exception("Could not create bake region (bake_create_write_persist error)");
}
// create Value to put in SDSKV
DataStoreValue value(data.size(), bake_provider_idx, rid);
ret = sdskv_put(sdskv_ph, db_id, entry->raw(), entry->length(), (void*)(&value), sizeof(value));
if(ret != SDSKV_SUCCESS) {
ret = bake_remove(bake_ph, rid);
if(ret != BAKE_SUCCESS) {
throw Exception("Dude, not only did SDSKV fail to put the key, but I couldn't cleanup BAKE. Is it Friday 13?");
}
throw Exception("Could not put key/value pair in SDSKV (sdskv_put error)");
}
ret = sdskv_put(sdskv_ph, db_id, entry->raw(), entry->length(), data.data(), data.size());
if(ret != SDSKV_SUCCESS) {
throw Exception("Could not put key/value pair in SDSKV (sdskv_put error)");
}
return product_id;
}
......
/*
* (C) 2018 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __PRIVATE_VALUE_TYPES_H
#define __PRIVATE_VALUE_TYPES_H
#include <cstring>
#include <cstdlib>
#include <cstdint>
#include <memory>
namespace hepnos {
class DataStoreValue {
size_t m_object_size;
uint64_t m_server_id;
bake_region_id_t m_region_id;
public:
DataStoreValue()
: m_object_size(0), m_server_id(0) {}
DataStoreValue(size_t object_size, uint64_t bake_server_id, const bake_region_id_t& region_id)
: m_object_size(object_size), m_server_id(bake_server_id), m_region_id(region_id) {}
size_t getDataSize() const {
return m_object_size;
}
const bake_region_id_t& getBakeRegionID() const {
return m_region_id;
}
const uint64_t& getBakeServerID() const {
return m_server_id;
}
};
}
#endif
......@@ -11,7 +11,6 @@
#include <unistd.h>
#include <mpi.h>
#include <margo.h>
#include <bake-server.h>
#include <sdskv-server.h>
#include "ServiceConfig.hpp"
#include "ConnectionInfoGenerator.hpp"
......@@ -57,28 +56,6 @@ void hepnos_run_service(MPI_Comm comm, const char* config_file, const char* conn
hg_size_t self_addr_str_size = 128;
margo_addr_to_string(mid, self_addr_str, &self_addr_str_size, self_addr);
if(config->hasStorage()) {
for(auto bake_provider_id = 0; bake_provider_id < config->getNumStorageProviders(); bake_provider_id++) {
/* create provider */
bake_provider_t bake_prov;
ret = bake_provider_register(mid, bake_provider_id, BAKE_ABT_POOL_DEFAULT, &bake_prov);
ASSERT(ret == 0, "bake_provider_register() failed (ret = %d)\n", ret);
/* create databases */
for(unsigned i=0; i < config->getNumStorageTargets(); i++) {
auto bake_target_name = config->getStoragePath(rank, bake_provider_id, i);
size_t bake_target_size = config->getStorageSize()*(1024*1024);
if(-1 == access(bake_target_name.c_str(), F_OK)) {
ret = bake_makepool(bake_target_name.c_str(), bake_target_size, 0664);
ASSERT(ret == 0, "bake_makepool() failed (ret = %d)\n", ret);
}
bake_target_id_t bake_tid;
ret = bake_provider_add_storage_target(bake_prov, bake_target_name.c_str(), &bake_tid);
ASSERT(ret == 0, "bake_provider_add_storage_target() failed to add target %s (ret = %d)\n",
bake_target_name.c_str(), ret);
}
}
}
if(config->hasDatabase()) {
/* SDSKV provider initialization */
for(auto sdskv_provider_id = 0; sdskv_provider_id < config->getNumDatabaseProviders(); sdskv_provider_id++) {
......
---
address: tcp://
address: na+sm://
threads: 4
database:
name: hepnosdb.$RANK.$PROVIDER.$TARGET
......@@ -7,8 +7,3 @@ database:
type: bdb
providers: 2
targets: 2
storage:
path: /dev/shm/hepnos.$RANK.$PROVIDER.$TARGET.dat
size: 50
providers: 2
targets: 2
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment