Commit 8d8a6d2e authored by Andrew Gaspar's avatar Andrew Gaspar
Browse files

Connect to sdskv database

parent 35a6b6ef
......@@ -13,7 +13,10 @@ add_library(
tausdskvplugin
SHARED
src/lib.cpp
src/margo.cpp
src/mercury.cpp
src/plugin.cpp
src/sdskv_client.cpp
)
target_include_directories(
......
include(FindPackageHandleStandardArgs)
find_library(argobots_LIBRARY abt)
find_path(argobots_INCLUDE_DIR abt.h)
find_package_handle_standard_args(
argobots
FOUND_VAR argobots_FOUND
REQUIRED_VARS
argobots_LIBRARY
argobots_INCLUDE_DIR)
if (argobots_FOUND)
add_library(argobots::argobots UNKNOWN IMPORTED)
set_target_properties(
argobots::argobots
PROPERTIES
IMPORTED_LOCATION "${argobots_LIBRARY}"
IMPORTED_LINK_INTERFACE_LANGUAGES CXX
INTERFACE_INCLUDE_DIRECTORIES "${argobots_INCLUDE_DIR}"
INTERFACE_COMPILE_FEATURES cxx_std_14)
endif()
\ No newline at end of file
include(FindPackageHandleStandardArgs)
find_package(mercury REQUIRED)
find_package(argobots REQUIRED)
find_library(margo_LIBRARY margo)
find_path(margo_INCLUDE_DIR margo.h)
find_package_handle_standard_args(
margo
FOUND_VAR margo_FOUND
REQUIRED_VARS
margo_LIBRARY
margo_INCLUDE_DIR)
if (margo_FOUND)
add_library(margo::margo UNKNOWN IMPORTED)
set_target_properties(
margo::margo
PROPERTIES
IMPORTED_LOCATION "${margo_LIBRARY}"
IMPORTED_LINK_INTERFACE_LANGUAGES CXX
INTERFACE_INCLUDE_DIRECTORIES "${margo_INCLUDE_DIR}"
INTERFACE_LINK_LIBRARIES "mercury;argobots::argobots"
INTERFACE_COMPILE_FEATURES cxx_std_14)
endif()
\ No newline at end of file
include(FindPackageHandleStandardArgs)
find_package(margo REQUIRED)
find_library(sdskeyval_client_LIBRARY sdskv-client)
find_library(sdskeyval_server_LIBRARY sdskv-server)
find_path(sdskeyval_INCLUDE_DIR sdskv-client.h)
......@@ -20,6 +22,7 @@ if (sdskeyval_FOUND)
IMPORTED_LOCATION "${sdskeyval_client_LIBRARY}"
IMPORTED_LINK_INTERFACE_LANGUAGES CXX
INTERFACE_INCLUDE_DIRECTORIES "${sdskeyval_INCLUDE_DIR}"
INTERFACE_LINK_LIBRARIES "margo::margo"
INTERFACE_COMPILE_FEATURES cxx_std_14)
add_library(sdskeyval::server UNKNOWN IMPORTED)
......@@ -29,5 +32,6 @@ if (sdskeyval_FOUND)
IMPORTED_LOCATION "${sdskeyval_server_LIBRARY}"
IMPORTED_LINK_INTERFACE_LANGUAGES CXX
INTERFACE_INCLUDE_DIRECTORIES "${sdskeyval_INCLUDE_DIR}"
INTERFACE_LINK_LIBRARIES "margo::margo"
INTERFACE_COMPILE_FEATURES cxx_std_14)
endif()
\ No newline at end of file
......@@ -24,7 +24,7 @@ target_compile_definitions(
target_compile_features(
tautest
PRIVATE
cxx_std_14
cxx_std_17
)
find_program(TAUEXEC tau_exec)
......
// STL Includes
#include <atomic>
#include <iostream>
#include <numeric>
#include <vector>
......@@ -18,8 +19,6 @@ void recurse(int n) {
int main(int argc, char **argv) {
Tau_init(argc, argv);
TAU_REGISTER_THREAD();
#pragma omp parallel
{ TAU_REGISTER_THREAD(); }
......@@ -46,7 +45,6 @@ int main(int argc, char **argv) {
std::cout << "Hi from thread " << omp_get_thread_num() << "!" << std::endl;
}
Tau_dump();
#pragma omp parallel
Tau_dump();
......
#ifndef TAUSDSKVPLUGIN_CONTROL_H_
#define TAUSDSKVPLUGIN_CONTROL_H_
#ifndef TAUSDSKV_CONTROL_H_
#define TAUSDSKV_CONTROL_H_
#ifdef __cplusplus
extern "C" {
......@@ -11,4 +11,4 @@ int tausdskv_set_dump_name(char const *dump_name);
}
#endif // __cplusplus
#endif // TAUSDSKVPLUGIN_CONTROL_H_
\ No newline at end of file
#endif // TAUSDSKV_CONTROL_H_
\ No newline at end of file
/**
* @file margo.hpp
* @author Andrew Gaspar (you@domain.com)
* @brief C++ interface to margo
* @date 2019-05-24
*
* @copyright Copyright (c) 2019 Triad National Security, LLC
*
*/
#ifndef TAUSDSKV_MARGO_HPP_
#define TAUSDSKV_MARGO_HPP_
// Third Party Includes
#include <margo.h>
// STL Includes
#include <exception>
#include <string_view>
namespace tausdskv {
class MargoInstance;
enum class MargoMode { Client = MARGO_CLIENT_MODE, Server = MARGO_SERVER_MODE };
class MargoInitException : public std::exception {
public:
char const *what() const noexcept override;
private:
};
class MargoAddress {
public:
explicit MargoAddress(MargoInstance const &mid, hg_addr_t addr);
MargoAddress(MargoAddress const &other);
MargoAddress &operator=(MargoAddress const &other);
MargoAddress(MargoAddress &&other);
MargoAddress &operator=(MargoAddress &&other);
~MargoAddress();
void Reset();
hg_addr_t Address() const { return addr_; }
private:
MargoInstance const *mid_ = nullptr;
hg_addr_t addr_ = HG_ADDR_NULL;
};
class MargoInstance {
public:
MargoInstance(std::string_view addr_str,
MargoMode mode,
bool use_progress_thread,
int rpc_thread_count);
MargoInstance(MargoInstance const &) = delete;
MargoInstance &operator=(MargoInstance const &) = delete;
MargoInstance(MargoInstance &&other);
MargoInstance &operator=(MargoInstance &&other);
~MargoInstance();
void Reset();
margo_instance_id InstanceId() const { return mid_; }
MargoAddress Lookup(std::string_view name);
private:
margo_instance_id mid_ = nullptr;
};
} // namespace tausdskv
#endif // TAUSDSKV_MARGO_HPP_
\ No newline at end of file
/**
* @file margo.hpp
* @author Andrew Gaspar (you@domain.com)
* @brief C++ interface to mercury
* @date 2019-05-24
*
* @copyright Copyright (c) 2019 Triad National Security, LLC
*
*/
#ifndef TAUSDSKV_MERCURY_HPP_
#define TAUSDSKV_MERCURY_HPP_
// Third Party Includes
#include <mercury.h>
// STL Includes
#include <exception>
#include <string>
namespace tausdskv {
class MercuryException {
public:
explicit MercuryException(hg_return_t status);
private:
std::string what_;
hg_return_t status_;
};
inline void MercuryCheck(hg_return_t status) {
if (status != HG_SUCCESS) {
throw MercuryException(status);
}
}
} // namespace tausdskv
#endif // TAUSDSKV_MERCURY_HPP_
\ No newline at end of file
......@@ -8,25 +8,36 @@
*
*/
#ifndef TAUSDSKVPLUGIN_PLUGIN_HPP_
#define TAUSDSKVPLUGIN_PLUGIN_HPP_
#ifndef TAUSDSKV_PLUGIN_HPP_
#define TAUSDSKV_PLUGIN_HPP_
// STL Includes
#include <map>
#include <memory>
#include <optional>
#include <shared_mutex>
#include <string_view>
#include <vector>
// Third Party Includes
#include <Profile/TauPlugin.h>
// Internal Includes
#include <margo.hpp>
#include <sdskv_client.hpp>
enum class PluginStatus { Success = 0, Fail = 1 };
namespace tausdskv {
class Plugin {
public:
static void InitializeInstance(int argc, char **argv);
static Plugin *GetInstance();
static int GlobalEndOfExecution(Tau_plugin_event_end_of_execution_data_t *data);
explicit Plugin(std::string_view addr_str,
std::string_view server_address_str,
std::string_view db_name);
template <typename EventData>
using Callback = int (*)(EventData *);
......@@ -34,7 +45,13 @@ class Plugin {
template <typename EventData, PluginStatus (Plugin::*MemberFunction)(EventData const &)>
static Callback<EventData> Bind() {
return [](EventData *data) -> int {
return static_cast<int>((Plugin::GetInstance()->*MemberFunction)(*data));
auto instance = Plugin::GetInstance();
if (instance) {
return static_cast<int>((instance->*MemberFunction)(*data));
} else {
return 1;
}
};
}
......@@ -82,8 +99,16 @@ class Plugin {
* references are not invalidated.
*/
std::vector<std::unique_ptr<ThreadState>> thread_state_;
// mid_ must come before client_
MargoInstance mid_;
SdskvClient client_;
MargoAddress server_address_;
SdskvProviderHandle kvph_;
sdskv_database_id_t db_id_;
};
} // namespace tausdskv
#endif // TAUSDSKVPLUGIN_PLUGIN_HPP_
\ No newline at end of file
#endif // TAUSDSKV_PLUGIN_HPP_
\ No newline at end of file
/**
* @file sdskeyval_client.hpp
* @author Andrew Gaspar (you@domain.com)
* @brief C++ interface to sdskv
* @date 2019-05-24
*
* @copyright Copyright (c) 2019 Triad National Security, LLC
*/
#ifndef TAUSDSKV_SDSKV_CLIENT_HPP_
#define TAUSDSKV_SDSKV_CLIENT_HPP_
// STL Includes
#include <exception>
#include <string>
// Third Party Includes
#include <sdskv-client.h>
// Internal Includes
#include <margo.hpp>
namespace tausdskv {
class SdskvException : public std::exception {
public:
explicit SdskvException(int status);
int Status() const { return status_; }
const char *what() const noexcept override;
private:
std::string what_;
int status_;
};
inline void SdskvCheck(int status) {
if (status != SDSKV_SUCCESS) {
throw SdskvException(status);
}
}
class SdskvProviderHandle {
public:
explicit SdskvProviderHandle(sdskv_provider_handle_t kvph);
SdskvProviderHandle(SdskvProviderHandle const &other);
SdskvProviderHandle &operator=(SdskvProviderHandle const &other);
SdskvProviderHandle(SdskvProviderHandle &&other);
SdskvProviderHandle &operator=(SdskvProviderHandle &&other);
~SdskvProviderHandle();
void Reset();
sdskv_database_id_t Open(std::string_view db_name) const;
private:
sdskv_provider_handle_t kvph_ = SDSKV_PROVIDER_HANDLE_NULL;
};
class SdskvClient {
public:
SdskvClient(MargoInstance const &mi);
SdskvClient(SdskvClient const &) = delete;
SdskvClient &operator=(SdskvClient const &) = delete;
SdskvClient(SdskvClient &&other);
SdskvClient &operator=(SdskvClient &&other);
~SdskvClient();
void Reset();
SdskvProviderHandle ProviderHandleCreate(MargoAddress const &address,
uint16_t provider_id) const;
private:
sdskv_client_t client_ = SDSKV_CLIENT_NULL;
};
} // namespace tausdskv
#endif // TAUSDSKV_SDSKV_CLIENT_HPP_
\ No newline at end of file
......@@ -16,6 +16,8 @@ using namespace tausdskv;
extern "C" int Tau_plugin_init_func(int argc, char **argv) {
// TAU_SoS
Plugin::InitializeInstance(argc, argv);
Tau_plugin_callbacks_t cb;
Tau_util_init_tau_plugin_callbacks(&cb);
......@@ -26,8 +28,7 @@ extern "C" int Tau_plugin_init_func(int argc, char **argv) {
cb.PostInit = Plugin::Bind<Tau_plugin_event_post_init_data_t, &Plugin::PostInit>();
cb.PreEndOfExecution =
Plugin::Bind<Tau_plugin_event_pre_end_of_execution_data_t, &Plugin::PreEndOfExecution>();
cb.EndOfExecution =
Plugin::Bind<Tau_plugin_event_end_of_execution_data_t, &Plugin::EndOfExecution>();
cb.EndOfExecution = Plugin::GlobalEndOfExecution;
cb.Send = Plugin::Bind<Tau_plugin_event_send_data_t, &Plugin::Send>();
cb.Recv = Plugin::Bind<Tau_plugin_event_recv_data_t, &Plugin::Recv>();
......
/**
* @file margo.cpp
* @author Andrew Gaspar (you@domain.com)
* @brief
* @date 2019-05-24
*
* @copyright Copyright (c) 2019 Triad National Security, LLC
*
*/
// Self Include
#include <margo.hpp>
// STL Includes
#include <iostream>
#include <string>
#include <utility>
// Internal Includes
#include <mercury.hpp>
using namespace tausdskv;
char const *MargoInitException::what() const noexcept { return "Margo failed to initialize!"; }
MargoAddress::MargoAddress(MargoInstance const &mid, hg_addr_t addr) : mid_(&mid), addr_(addr) {}
MargoAddress::MargoAddress(MargoAddress const &other) {
mid_ = other.mid_;
MercuryCheck(margo_addr_dup(other.mid_->InstanceId(), other.addr_, &addr_));
}
MargoAddress &MargoAddress::operator=(MargoAddress const &other) {
Reset();
mid_ = other.mid_;
MercuryCheck(margo_addr_dup(other.mid_->InstanceId(), other.addr_, &addr_));
return *this;
}
MargoAddress::MargoAddress(MargoAddress &&other) {
mid_ = other.mid_;
std::swap(addr_, other.addr_);
}
MargoAddress &MargoAddress::operator=(MargoAddress &&other) {
Reset();
mid_ = other.mid_;
std::swap(addr_, other.addr_);
return *this;
}
MargoAddress::~MargoAddress() { Reset(); }
void MargoAddress::Reset() {
if (addr_ != HG_ADDR_NULL) {
MercuryCheck(margo_addr_free(mid_->InstanceId(), addr_));
addr_ = HG_ADDR_NULL;
}
}
MargoInstance::MargoInstance(std::string_view addr_str,
MargoMode mode,
bool use_progress_thread,
int rpc_thread_count) {
std::string addr_str_z(addr_str);
mid_ = margo_init(
addr_str_z.c_str(), static_cast<int>(mode), use_progress_thread, rpc_thread_count);
if (mid_ == MARGO_INSTANCE_NULL) {
throw MargoInitException();
}
}
MargoInstance::MargoInstance(MargoInstance &&other) { std::swap(mid_, other.mid_); }
MargoInstance &MargoInstance::operator=(MargoInstance &&other) {
Reset();
std::swap(mid_, other.mid_);
return *this;
}
MargoInstance::~MargoInstance() { Reset(); }
void MargoInstance::Reset() {
if (mid_ != MARGO_INSTANCE_NULL) {
margo_finalize(mid_);
mid_ = MARGO_INSTANCE_NULL;
}
}
MargoAddress MargoInstance::Lookup(std::string_view name) {
std::string name_z(name);
hg_addr_t addr;
MercuryCheck(margo_addr_lookup(InstanceId(), name_z.c_str(), &addr));
return MargoAddress(*this, addr);
}
\ No newline at end of file
/**
* @file mercury.cpp
* @author Andrew Gaspar (you@domain.com)
* @brief
* @date 2019-05-24
*
* @copyright Copyright (c) 2019 Triad National Security, LLC
*
*/
// Self Include
#include <mercury.hpp>
// STL Includes
#include <sstream>
using namespace tausdskv;
MercuryException::MercuryException(hg_return_t status) : status_(status) {
std::stringstream ss;
ss << "MercuryException: ";
#define CASE(enum) \
case enum: \
ss << #enum; \
break;
switch (status) {
CASE(HG_NA_ERROR)
CASE(HG_TIMEOUT)
CASE(HG_INVALID_PARAM)
CASE(HG_SIZE_ERROR)
CASE(HG_NOMEM_ERROR)
CASE(HG_PROTOCOL_ERROR)
CASE(HG_NO_MATCH)
CASE(HG_CHECKSUM_ERROR)
CASE(HG_CANCELED)
CASE(HG_OTHER_ERROR)
default:
ss << "Unknown(" << status << ")";
break;
}
#undef CASE
what_ = ss.str();
}
\ No newline at end of file
......@@ -16,10 +16,54 @@
using namespace tausdskv;
Plugin *Plugin::GetInstance() {
static Plugin instance;
namespace {
static Plugin *instance = nullptr;
}
void Plugin::InitializeInstance(int argc, char **argv) {
if (instance) return;
return &instance;
try {
instance = new Plugin("tcp", "tcp://127.0.0.1:1234", "tau");
} catch (SdskvException const &ex) {
if (ex.Status() == SDSKV_ERR_DB_NAME) {
std::cerr << "Database name does not exist!" << std::endl;
std::exit(EXIT_FAILURE);
}
}
}
Plugin *Plugin::GetInstance() { return instance; }
int Plugin::GlobalEndOfExecution(Tau_plugin_event_end_of_execution_data_t *data) {
if (data->tid != 0) {
return 1;
}
auto result = GetInstance()->EndOfExecution(*data);
delete instance;
instance = nullptr;
return static_cast<int>(result);
}
Plugin::Plugin(std::string_view addr_str,
std::string_view server_address_str,
std::string_view db_name)
: mid_(addr_str, MargoMode::Client, false, -1),
client_(mid_),
server_address_(mid_.Lookup(server_address_str)),
kvph_(client_.ProviderHandleCreate(server_address_, 1)),
db_id_(kvph_.Open(db_name)) {
static_assert(offsetof(Plugin, client_) > offsetof(Plugin, mid_),
"This code assumes mid_ is initialized before client_");
static_assert(offsetof(Plugin, server_address_) > offsetof(Plugin, mid_),
"This code assumes mid_ is initialized before server_address_");
static_assert(offsetof(Plugin, kvph_) > offsetof(Plugin, client_),