Commit b3fa964f authored by David Rich's avatar David Rich Committed by Rob Latham
Browse files

Update spack sds-repo and am now using OFI/libfabric by default.

parent 2e1ecae9
......@@ -43,13 +43,13 @@ test_test_client_LDADD = -lkvclient
test_test_server_SOURCES = test/test-server.cc
test_test_server_CXXFLAGS = -pthread -std=c++11 -g -Wall -mcx16 -Wno-invalid-offsetof
test_test_server_CPPFLAGS = ${CPPFLAGS} -I${srcdir}/src -I${srcdir}/src/BwTree/src
test_test_server_LDADD = -lkvserver -lleveldb -lsnappy
test_test_server_LDADD = -lkvserver -lleveldb -lsnappy -lboost_filesystem -lboost_system
test_test_mpi_SOURCES = test/test-mpi.cc
test_test_mpi_CXXFLAGS = -pthread -std=c++11 -g -Wall -mcx16 -Wno-invalid-offsetof
test_test_mpi_CPPFLAGS = ${CPPFLAGS} -I${srcdir}/src -I${srcdir}/src/BwTree/src
test_test_mpi_LDADD = -lkvclient -lkvserver -lleveldb -lsnappy
test_test_mpi_LDADD = -lkvclient -lkvserver -lleveldb -lsnappy -lboost_filesystem -lboost_system
pkgconfigdir = $(libdir)/pkgconfig
pkgconfig_DATA = maint/kv-server.pc \
......
// Copyright (c) 2017, Los Alamos National Security, LLC.
// All rights reserved.
#include "datastore.h"
#include <boost/filesystem.hpp>
AbstractDataStore::AbstractDataStore() {
_duplicates = Duplicates::IGNORE;
......@@ -133,8 +134,8 @@ kv_key_t LevelDBDataStore::string2key(std::string &keystr) {
};
LevelDBDataStore::~LevelDBDataStore() {
// deleting LevelDB can cause core dump
delete _dbm;
leveldb::Env::Shutdown();
};
void LevelDBDataStore::createDatabase(std::string db_name) {
......@@ -142,6 +143,9 @@ void LevelDBDataStore::createDatabase(std::string db_name) {
leveldb::Status status;
// db_name assumed to include the full path (e.g. /var/data/db.dat)
boost::filesystem::path p(db_name);
boost::filesystem::create_directories(p.parent_path().string());
options.create_if_missing = true;
status = leveldb::DB::Open(options, db_name, &_dbm);
......
......@@ -6,6 +6,7 @@
#include <boost/functional/hash.hpp>
#include <vector>
#include <leveldb/db.h>
#include <leveldb/env.h>
#include <db_cxx.h>
#include <dbstl_map.h>
......
......@@ -25,8 +25,8 @@ static hg_return_t open_handler(hg_handle_t handle)
std::cout << "SERVER: OPEN " << in_name << std::endl;
if (!datastore) {
datastore = new BwTreeDataStore(); // testing BwTree
//datastore = new LevelDBDataStore(); // testing LevelDB
//datastore = new BwTreeDataStore(); // testing BwTree
datastore = new LevelDBDataStore(); // testing LevelDB
db_name = in_name;
datastore->createDatabase(db_name);
std::cout << "SERVER OPEN: DataStore initialized and ready for " << db_name << std::endl;
......@@ -64,7 +64,6 @@ static hg_return_t close_handler(hg_handle_t handle)
hg_return_t ret;
close_out_t out;
// there may be cleanup we want to do here
out.ret = HG_SUCCESS;
ret = margo_respond(handle, &out);
......@@ -393,23 +392,23 @@ DEFINE_MARGO_RPC_HANDLER(bench_handler)
kv_context *kv_server_register(margo_instance_id mid);
{
hg_return_t ret;
hg_addr_t addr_self;
char addr_self_string[128];
hg_size_t addr_self_string_sz = 128;
kv_context *context;
/* sds keyval server init */
context = (kv_context *)malloc(sizeof(*context));
if (!addr_str) {
context->mid = margo_init("cci+tcp://",
MARGO_SERVER_MODE, 0, -1);
}
else {
context->mid = margo_init(addr_str,
MARGO_SERVER_MODE, 0, -1);
}
assert(context->mid);
hg_return_t ret;
hg_addr_t addr_self;
char addr_self_string[128];
hg_size_t addr_self_string_sz = 128;
kv_context *context;
/* sds keyval server init */
context = (kv_context *)malloc(sizeof(*context));
if (!addr_str) {
context->mid = margo_init("ofi+tcp://",
MARGO_SERVER_MODE, 0, -1);
}
else {
context->mid = margo_init(addr_str,
MARGO_SERVER_MODE, 0, -1);
}
assert(context->mid);
/* figure out what address this server is listening on */
ret = margo_addr_self(context->mid, &addr_self);
......
......@@ -81,7 +81,7 @@ int main(int argc, char *argv[])
// kv-client
//sprintf(client_addr_str_in, "cci+tcp://534%02d", rank);
sprintf(client_addr_str_in, "cci+tcp://");
sprintf(client_addr_str_in, "ofi+tcp://");
kv_context *context = kv_client_register(client_addr_str_in);
hret = margo_addr_self(context->mid, &client_addr);
DIE_IF(hret != HG_SUCCESS, "margo_addr_self");
......@@ -97,40 +97,42 @@ int main(int argc, char *argv[])
hret = kv_open(context, server_addr_str, (char*)db);
DIE_IF(hret != HG_SUCCESS, "kv_open");
uint64_t key = rank;
// put
int put_val = rank;
std::vector<char> put_data;
put_data.resize(sizeof(put_val));
uint64_t data_size = put_data.size();
memcpy(put_data.data(), &put_val, data_size);
hret = kv_bulk_put(context, (void*)&key, (void*)put_data.data(), &data_size);
printf("(put) key %lu, size=%lu\n", key, data_size);
DIE_IF(hret != HG_SUCCESS, "kv_bulk_put");
for (int i=1; i<rank*10; i++) {
int put_val = rank-i;
uint64_t key = (uint64_t)(rank+i);
std::vector<char> put_data;
put_data.resize(sizeof(put_val));
uint64_t data_size = put_data.size();
memcpy(put_data.data(), &put_val, data_size);
hret = kv_bulk_put(context, (void*)&key, (void*)put_data.data(), &data_size);
printf("(rank %d: put) key %lu, value %d, size=%lu\n", rank, key, put_val, data_size);
DIE_IF(hret != HG_SUCCESS, "kv_bulk_put");
}
sleep(2);
// get
int get_val;
std::vector<char> get_data;
get_data.resize(sizeof(get_val));
data_size = get_data.size();
printf("(get) key %lu, estimated size=%lu\n", key, data_size);
hret = kv_bulk_get(context, (void*)&key, (void*)get_data.data(), &data_size);
DIE_IF(hret != HG_SUCCESS, "kv_bulk_get");
printf("(get) key %lu, actual size=%lu\n", key, data_size);
get_data.resize(data_size);
memcpy(&get_val, get_data.data(), data_size);
printf("key: %lu in: %d out: %d\n", key, put_val, get_val);
for (int i=1; i<rank*10; i++) {
int get_val = rank-i;
uint64_t key = (uint64_t)(rank+i);
std::vector<char> get_data;
get_data.resize(sizeof(get_val));
uint64_t data_size = get_data.size();
hret = kv_bulk_get(context, (void*)&key, (void*)get_data.data(), &data_size);
DIE_IF(hret != HG_SUCCESS, "kv_bulk_get");
get_data.resize(data_size);
memcpy(&get_val, get_data.data(), data_size);
printf("(rank %d: get) key %lu, value %d, actual size=%lu\n", rank, key, get_val, data_size);
}
// close
hret = kv_close(context);
DIE_IF(hret != HG_SUCCESS, "kv_close");
// once all clients are done, one client can signal server
// once all clients are done with the close, one client can signal server
MPI_Barrier(clientComm);
if (rank==1) {
printf("rank %d: sending server a shutdown request\n", rank);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment