Commit c9b9a2f0 authored by Matthieu Dorier's avatar Matthieu Dorier
Browse files

switch client to using Thallium instead of Margo

parent 61ff41b3
......@@ -55,7 +55,7 @@ void DataStore::shutdown() {
throw Exception("Calling DataStore member function on an invalid DataStore object");
}
for(auto addr : m_impl->m_addrs) {
margo_shutdown_remote_instance(m_impl->m_mid, addr.second);
m_impl->m_engine.shutdown_remote_engine(addr.second);
}
}
......
......@@ -11,6 +11,7 @@
#include <unordered_map>
#include <functional>
#include <iostream>
#include <thallium.hpp>
#include <yaml-cpp/yaml.h>
#include <sdskv-client.hpp>
#include <ch-placement.h>
......@@ -49,20 +50,23 @@ struct DistributedDBInfo {
struct ch_placement_instance* chi = nullptr;
};
using namespace std::string_literals;
namespace tl = thallium;
class DataStoreImpl {
public:
margo_instance_id m_mid; // Margo instance
std::unordered_map<std::string,hg_addr_t> m_addrs; // Addresses used by the service
sdskv::client m_sdskv_client; // SDSKV client
DistributedDBInfo m_dataset_dbs; // list of SDSKV databases for DataSets
DistributedDBInfo m_run_dbs; // list of SDSKV databases for Runs
DistributedDBInfo m_subrun_dbs; // list of SDSKV databases for Runs
DistributedDBInfo m_event_dbs; // list of SDSKV databases for Runs
DistributedDBInfo m_product_dbs; // list of SDSKV databases for Products
tl::engine m_engine; // Thallium engine
bool m_engine_initialized = false;
std::unordered_map<std::string,tl::endpoint> m_addrs; // Addresses used by the service
sdskv::client m_sdskv_client; // SDSKV client
DistributedDBInfo m_dataset_dbs; // list of SDSKV databases for DataSets
DistributedDBInfo m_run_dbs; // list of SDSKV databases for Runs
DistributedDBInfo m_subrun_dbs; // list of SDSKV databases for Runs
DistributedDBInfo m_event_dbs; // list of SDSKV databases for Runs
DistributedDBInfo m_product_dbs; // list of SDSKV databases for Products
DataStoreImpl()
: m_mid(MARGO_INSTANCE_NULL)
{}
~DataStoreImpl() {
......@@ -76,24 +80,28 @@ class DataStoreImpl {
std::string str_addr = address_it->first.as<std::string>();
YAML::Node providers = address_it->second;
// lookup the address
hg_addr_t addr;
tl::endpoint addr;
if(m_addrs.count(str_addr) != 0) {
addr = m_addrs[str_addr];
} else {
hret = margo_addr_lookup(m_mid, str_addr.c_str(), &addr);
if(hret != HG_SUCCESS) {
margo_addr_free(m_mid,addr);
cleanup();
throw Exception("margo_addr_lookup failed (MARGO error="+std::to_string(hret)+")");
try {
addr = m_engine.lookup(str_addr);
m_addrs[str_addr] = addr;
} catch(const std::exception& ex) {
throw Exception("Address lookup failed: "s + ex.what());
}
m_addrs[str_addr] = addr;
}
// iterate over providers for this address
for(YAML::const_iterator provider_it = providers.begin(); provider_it != providers.end(); provider_it++) {
// get the provider id
uint16_t provider_id = provider_it->first.as<uint16_t>();
// create provider handle
sdskv::provider_handle ph(m_sdskv_client, addr, provider_id);
sdskv::provider_handle ph;
try {
ph = sdskv::provider_handle(m_sdskv_client, addr.get_addr(), provider_id);
} catch(const std::exception& ex) {
throw Exception("Could not create SDSKV provider handle: "s + ex.what());
}
// get the database ids
YAML::Node databases = provider_it->second;
// iterate over databases for this provider
......@@ -120,14 +128,16 @@ class DataStoreImpl {
memset(&hg_opt, 0, sizeof(hg_opt));
if(busySpin)
hg_opt.na_init_info.progress_mode = NA_NO_BLOCK;
m_mid = margo_init_opt(proto.c_str(), MARGO_CLIENT_MODE, &hg_opt, use_progress_thread, 0);
if(!m_mid) {
try {
m_engine = tl::engine(proto, THALLIUM_SERVER_MODE, use_progress_thread, -1, &hg_opt);
m_engine_initialized = true;
} catch(const std::exception& ex) {
cleanup();
throw Exception("Could not initialized Margo");
throw Exception("Could not initialized Thallium: "s + ex.what());
}
// initialize SDSKV client
try {
m_sdskv_client = sdskv::client(m_mid);
m_sdskv_client = sdskv::client(m_engine.get_margo_instance());
} catch(sdskv::exception& ex) {
cleanup();
throw Exception("Could not create SDSKV client (SDSKV error="+std::to_string(ex.error())+")");
......@@ -157,11 +167,10 @@ class DataStoreImpl {
if(m_run_dbs.chi) ch_placement_finalize(m_run_dbs.chi);
if(m_subrun_dbs.chi) ch_placement_finalize(m_subrun_dbs.chi);
if(m_event_dbs.chi) ch_placement_finalize(m_event_dbs.chi);
if(m_product_dbs.chi) ch_placement_finalize(m_product_dbs.chi);
for(auto& addr : m_addrs) {
margo_addr_free(m_mid, addr.second);
}
if(m_mid) margo_finalize(m_mid);
if(m_product_dbs.chi) ch_placement_finalize(m_product_dbs.chi);
m_addrs.clear();
if(m_engine_initialized) m_engine.finalize();
m_engine_initialized = false;
}
size_t numTargets(const ItemType& type) const {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment