Commit a6d68278 authored by David Rich's avatar David Rich Committed by Rob Latham
Browse files

Working on test apps. Some issue with the kv_benchmark/bench_handler stuff.

parent a2de1206
......@@ -18,7 +18,7 @@ AbstractDataStore::~AbstractDataStore()
{};
BwTreeDataStore::BwTreeDataStore() :
AbstractDataStore() {
AbstractDataStore(Duplicates::IGNORE, false, false) {
_tree = NULL;
};
......@@ -111,7 +111,7 @@ bool BwTreeDataStore::get(const kv_key_t &key, std::vector<ds_bulk_t> &data) {
LevelDBDataStore::LevelDBDataStore() :
AbstractDataStore() {
AbstractDataStore(Duplicates::IGNORE, false, false) {
_dbm = NULL;
};
......
......@@ -76,9 +76,6 @@ public:
virtual bool get(const kv_key_t &key, std::vector<ds_bulk_t> &data);
protected:
leveldb::DB *_dbm = NULL;
Duplicates _duplicates;
bool _eraseOnGet;
bool _debug;
private:
std::string key2string(const kv_key_t &key);
kv_key_t string2key(std::string &keystr);
......
......@@ -91,9 +91,6 @@ hg_return_t kv_open(kv_context *context, const char *server_addr, const char *db
ret = margo_create(context->mid, context->svr_addr,
context->bulk_get_id, &(context->bulk_get_handle));
assert(ret == HG_SUCCESS);
ret = margo_create(context->mid, context->svr_addr,
context->bench_id, &(context->bench_handle));
assert(ret == HG_SUCCESS);
ret = margo_create(context->mid, context->svr_addr,
context->shutdown_id, &(context->shutdown_handle));
assert(ret == HG_SUCCESS);
......@@ -119,10 +116,11 @@ hg_return_t kv_put(kv_context *context, void *key, void *value) {
ret = margo_get_output(context->put_handle, &put_out);
assert(ret == HG_SUCCESS);
assert(put_out.ret == HG_SUCCESS);
ret = put_out.ret;
margo_free_output(context->put_handle, &put_out);
return HG_SUCCESS;
return ret;
}
hg_return_t kv_bulk_put(kv_context *context, void *key, void *data, size_t *data_size) {
......@@ -142,11 +140,11 @@ hg_return_t kv_bulk_put(kv_context *context, void *key, void *data, size_t *data
ret = margo_get_output(context->bulk_put_handle, &bpout);
assert(ret == HG_SUCCESS);
assert(bpout.ret == HG_SUCCESS); // make sure the server side says all is OK
ret = bpout.ret; // make sure the server side says all is OK
margo_free_output(context->bulk_put_handle, &bpout);
return HG_SUCCESS;
return ret;
}
hg_return_t kv_get(kv_context *context, void *key, void *value)
......@@ -163,39 +161,40 @@ hg_return_t kv_get(kv_context *context, void *key, void *value)
ret = margo_get_output(context->get_handle, &get_out);
assert(ret == HG_SUCCESS);
assert(get_out.ret == HG_SUCCESS);
ret = get_out.ret;
*(kv_value_t*)value = get_out.value;
margo_free_output(context->get_handle, &get_out);
return HG_SUCCESS;
return ret;
}
hg_return_t kv_bulk_get(kv_context *context, void *key, void *data, size_t *data_size)
{
hg_return_t ret;
bulk_get_in_t bgin;
bulk_get_out_t bgout;
hg_return_t ret;
bulk_get_in_t bgin;
bulk_get_out_t bgout;
bgin.key = *(kv_key_t*)key;
bgin.size = *(size_t*)data_size;
bgin.key = *(kv_key_t*)key;
bgin.size = *(size_t*)data_size;
ret = margo_bulk_create(context->mid, 1, &data, data_size,
HG_BULK_WRITE_ONLY, &bgin.bulk_handle);
assert(ret == HG_SUCCESS);
ret = margo_bulk_create(context->mid, 1, &data, data_size,
HG_BULK_WRITE_ONLY, &bgin.bulk_handle);
assert(ret == HG_SUCCESS);
ret = margo_forward(context->bulk_get_handle, &bgin);
assert(ret == HG_SUCCESS);
ret = margo_forward(context->bulk_get_handle, &bgin);
assert(ret == HG_SUCCESS);
ret = margo_get_output(context->bulk_get_handle, &bgout);
assert(ret == HG_SUCCESS);
assert(bgout.ret == HG_SUCCESS); // make sure the server side says all is OK
ret = margo_get_output(context->bulk_get_handle, &bgout);
assert(ret == HG_SUCCESS);
ret = bgout.ret; // make sure the server side says all is OK
*data_size = (size_t)bgout.size; // report actual size of data transferred to caller
*data_size = (size_t)bgout.size; // report actual size of data transferred to caller
margo_free_output(context->bulk_get_handle, &bgout);
margo_free_output(context->bulk_get_handle, &bgout);
return HG_SUCCESS;
return ret;
}
hg_return_t kv_close(kv_context *context)
......@@ -221,39 +220,24 @@ hg_return_t kv_close(kv_context *context)
return HG_SUCCESS;
}
bench_result *kv_benchmark(kv_context *context, int count) {
hg_return_t ret;
hg_handle_t handle;
bench_in_t bench_in;
bench_out_t bench_out;
bench_result *result=NULL;
context->bench_id= MARGO_REGISTER(context->mid, "bench",
bench_in_t, bench_out_t, NULL);
ret = margo_create(context->mid, context->svr_addr,
context->bench_id, &(context->bench_handle) );
assert(ret == HG_SUCCESS);
bench_result *kv_benchmark(kv_context *context, int32_t count) {
ret = margo_create(context->mid, context->svr_addr,
context->bench_id, &handle);
assert(ret == HG_SUCCESS);
bench_in.count = count;
ret = margo_forward(context->bench_handle, &bench_in);
assert(ret == HG_SUCCESS);
ret = margo_get_output(context->bench_handle, &bench_out);
margo_free_output(handle, &bench_out);
margo_destroy(handle);
ret = margo_forward(context->bench_handle, &bench_in);
bench_in.count = count;
ret = margo_forward(handle, &bench_in);
assert(ret == HG_SUCCESS);
ret = margo_get_output(context->bench_handle, &bench_out);
ret = margo_get_output(handle, &bench_out);
assert(ret == HG_SUCCESS);
result = malloc(sizeof(bench_result));
result->nkeys = bench_out.result.nkeys;
result->insert_time = bench_out.result.insert_time;
result->read_time = bench_out.result.read_time;
result->overhead = bench_out.result.overhead;
margo_free_output(handle, &bench_out);
margo_destroy(handle);
......@@ -268,7 +252,6 @@ hg_return_t kv_client_deregister(kv_context *context) {
margo_destroy(context->get_handle);
margo_destroy(context->bulk_put_handle);
margo_destroy(context->bulk_get_handle);
margo_destroy(context->bench_handle);
margo_destroy(context->shutdown_handle);
assert(ret == HG_SUCCESS);
......
......@@ -7,6 +7,9 @@
#include <abt.h>
#include <assert.h>
#include <random>
#include <iostream>
// since this is global, we're assuming this server instance will manage a single DB
AbstractDataStore *datastore = NULL; // created by caller, passed into kv_server_register
std::string db_name;
......@@ -273,8 +276,6 @@ static void shutdown_handler(hg_handle_t handle)
margo_finalize(mid);
std::cout << "SERVER: margo finalized" << std::endl;
return;
}
DEFINE_MARGO_RPC_HANDLER(shutdown_handler)
......@@ -282,17 +283,13 @@ DEFINE_MARGO_RPC_HANDLER(shutdown_handler)
* from BwTree tests:
* RandomInsertSpeedTest() - Tests how fast it is to insert keys randomly
*/
#include "bwtree.h"
#include <random>
#include <iostream>
static void RandomInsertSpeedTest(size_t key_num, bench_result *results)
static void RandomInsertSpeedTest(int32_t key_num, bench_result *results)
{
std::random_device r{};
std::default_random_engine e1(r());
std::random_device rd;
std::uniform_int_distribution<int> uniform_dist(0, key_num - 1);
auto *t = new wangziqi2013::bwtree::BwTree<int, int>;
BwTree<int, int> *t = new BwTree<int, int>();
t->SetDebugLogging(0);
t->UpdateThreadLocal(1);
t->AssignGCID(0);
......@@ -303,8 +300,8 @@ static void RandomInsertSpeedTest(size_t key_num, bench_result *results)
// We loop for keynum * 2 because in average half of the insertion
// will hit an empty slot
for(size_t i = 0;i < key_num * 2;i++) {
int key = uniform_dist(e1);
for(int32_t i = 0;i < key_num * 2;i++) {
int key = uniform_dist(rd);
t->Insert(key, key);
}
......@@ -324,8 +321,8 @@ static void RandomInsertSpeedTest(size_t key_num, bench_result *results)
v.reserve(100);
start = std::chrono::system_clock::now();
for(size_t i = 0;i < key_num * 2;i++) {
int key = uniform_dist(e1);
for(int32_t i = 0;i < key_num * 2;i++) {
int key = uniform_dist(rd);
t->GetValue(key, v);
......@@ -343,8 +340,8 @@ static void RandomInsertSpeedTest(size_t key_num, bench_result *results)
start = std::chrono::system_clock::now();
for(size_t i = 0;i < key_num * 2;i++) {
int key = uniform_dist(e1);
for(int32_t i = 0;i < key_num * 2;i++) {
int key = uniform_dist(rd);
v.push_back(key);
......@@ -358,7 +355,7 @@ static void RandomInsertSpeedTest(size_t key_num, bench_result *results)
std::cout << " Overhead = " << overhead.count() << " seconds" << std::endl;
results->overhead = overhead.count();
return;
delete t;
}
static hg_return_t bench_handler(hg_handle_t handle)
......@@ -380,88 +377,78 @@ static hg_return_t bench_handler(hg_handle_t handle)
bench_out.result.overhead = random_insert.overhead;
ret = margo_respond(handle, &bench_out);
assert(ret == HG_SUCCESS);
margo_free_input(handle, &bench_in);
margo_destroy(handle);
return ret;
return HG_SUCCESS;
}
DEFINE_MARGO_RPC_HANDLER(bench_handler)
#endif
kv_context *kv_server_register(margo_instance_id mid);
{
int ret;
hg_addr_t addr_self;
char addr_self_string[128];
hg_size_t addr_self_string_sz = 128;
kv_context *context;
context->mid = mid;
TREE->SetDebugLogging(0);
TREE->UpdateThreadLocal(1);
TREE->AssignGCID(0);
size_t num_threads = TREE->GetThreadNum();
printf("SERVER: BwTree initialized, using %lu thread(s)\n", num_threads);
/* sds keyval server init */
context = (kv_context *)malloc(sizeof(*context));
if (!addr_str) {
context->mid = margo_init("cci+tcp://localhost:52345",
MARGO_SERVER_MODE, 0, -1);
}
else {
context->mid = margo_init(addr_str,
MARGO_SERVER_MODE, 0, -1);
}
assert(context->mid);
/* figure out what address this server is listening on */
ret = margo_addr_self(context->mid, &addr_self);
if(ret != HG_SUCCESS)
{
fprintf(stderr, "Error: margo_addr_selff()\n");
margo_finalize(context->mid);
return(NULL);
}
ret = margo_addr_to_string(context->mid, addr_self_string,
&addr_self_string_sz, addr_self);
if(ret != HG_SUCCESS)
{
fprintf(stderr, "Error: HG_Addr_self()\n");
margo_finalize(context->mid);
return(NULL);
}
margo_addr_free(context->mid, addr_self);
printf("# accepting RPCs on address \"%s\"\n", addr_self_string);
context->open_id = MARGO_REGISTER(context->mid, "open",
open_in_t, open_out_t, open_handler);
hg_return_t ret;
hg_addr_t addr_self;
char addr_self_string[128];
hg_size_t addr_self_string_sz = 128;
kv_context *context;
/* sds keyval server init */
context = (kv_context *)malloc(sizeof(*context));
if (!addr_str) {
context->mid = margo_init("cci+tcp://",
MARGO_SERVER_MODE, 0, -1);
}
else {
context->mid = margo_init(addr_str,
MARGO_SERVER_MODE, 0, -1);
}
assert(context->mid);
/* figure out what address this server is listening on */
ret = margo_addr_self(context->mid, &addr_self);
if(ret != HG_SUCCESS)
{
std::cerr << "Error: margo_addr_self()" << std::endl;
margo_finalize(context->mid);
return NULL;
}
ret = margo_addr_to_string(context->mid, addr_self_string,
&addr_self_string_sz, addr_self);
if(ret != HG_SUCCESS)
{
std::cerr << "Error: margo_addr_to_string()" << std::endl;
margo_finalize(context->mid);
return NULL;
}
margo_addr_free(context->mid, addr_self);
std::cout << "accepting RPCs on address " << std::string(addr_self_string) << std::endl;
context->close_id = MARGO_REGISTER(context->mid, "close",
close_in_t, close_out_t, close_handler);
context->open_id = MARGO_REGISTER(context->mid, "open",
open_in_t, open_out_t, open_handler);
context->put_id = MARGO_REGISTER(context->mid, "put",
put_in_t, put_out_t, put_handler);
context->close_id = MARGO_REGISTER(context->mid, "close",
void, close_out_t, close_handler);
context->bulk_put_id = MARGO_REGISTER(context->mid, "bulk_put",
bulk_put_in_t, bulk_put_out_t, bulk_put_handler);
context->put_id = MARGO_REGISTER(context->mid, "put",
put_in_t, put_out_t, put_handler);
context->get_id = MARGO_REGISTER(context->mid, "get",
get_in_t, get_out_t, get_handler);
context->bulk_put_id = MARGO_REGISTER(context->mid, "bulk_put",
bulk_put_in_t, bulk_put_out_t, bulk_put_handler);
context->bulk_get_id = MARGO_REGISTER(context->mid, "bulk_get",
bulk_get_in_t, bulk_get_out_t, bulk_get_handler);
context->get_id = MARGO_REGISTER(context->mid, "get",
get_in_t, get_out_t, get_handler);
context->bench_id = MARGO_REGISTER(context->mid, "bench",
bench_in_t, bench_out_t, bench_handler);
context->bulk_get_id = MARGO_REGISTER(context->mid, "bulk_get",
bulk_get_in_t, bulk_get_out_t, bulk_get_handler);
context->shutdown_id = MARGO_REGISTER(context->mid, "shutdown",
void, void, shutdown_handler);
context->bench_id = MARGO_REGISTER(context->mid, "bench",
bench_in_t, bench_out_t, bench_handler);
return context;
context->shutdown_id = MARGO_REGISTER(context->mid, "shutdown",
void, void, shutdown_handler);
}
......
......@@ -51,7 +51,6 @@ typedef struct kv_context_s {
hg_handle_t bulk_put_handle; // necessary?
hg_handle_t get_handle;
hg_handle_t bulk_get_handle; // necessary?
hg_handle_t bench_handle;
hg_handle_t shutdown_handle;
/* some keyval dodad goes here so the server can discriminate
* seems like it should be some universal identifier we can
......@@ -89,10 +88,10 @@ typedef struct {
double overhead;
} bench_result;
static inline hg_return_t hg_proc_bench_result(hg_proc_t proc, bench_result *result)
static inline hg_return_t hg_proc_bench_result(hg_proc_t proc, void *data)
{
/* TODO: needs a portable encoding */
return(hg_proc_memcpy(proc, result, sizeof(*result)));
return(hg_proc_memcpy(proc, data, sizeof(bench_result)));
}
DECLARE_MARGO_RPC_HANDLER(bench_handler)
......
#include <sds-keyval.h>
#include <assert.h>
#include <random>
#include <chrono>
void RandomInsertSpeedTest(kv_context *context,
size_t key_num, bench_result *results)
size_t key_num, bench_result *results)
{
std::random_device r{};
std::default_random_engine e1(r());
std::uniform_int_distribution<int> uniform_dist(0, key_num - 1);
std::uniform_int_distribution<uint64_t> uniform_dist(0, key_num - 1);
std::chrono::time_point<std::chrono::system_clock> start, end;
......@@ -18,7 +19,7 @@ void RandomInsertSpeedTest(kv_context *context,
// We loop for keynum * 2 because in average half of the insertion
// will hit an empty slot
for(size_t i = 0;i < key_num * 2;i++) {
int key = uniform_dist(e1);
uint64_t key = uniform_dist(e1);
kv_put(context, &key, &key);
}
end = std::chrono::system_clock::now();
......@@ -29,11 +30,11 @@ void RandomInsertSpeedTest(kv_context *context,
results->insert_time = elapsed_seconds.count();
// Then test random read after random insert
int v;
uint64_t v;
start = std::chrono::system_clock::now();
for(size_t i = 0;i < key_num * 2;i++) {
int key = uniform_dist(e1);
uint64_t key = uniform_dist(e1);
kv_get(context, &key, &v);
}
......@@ -57,21 +58,34 @@ void print_results(bench_result *r)
int main(int argc, char **argv)
{
bench_result rpc;
bench_result *server;
kv_context *context;
size_t items = atoi(argv[1]);
context = kv_client_register(NULL);
kv_open(context, argv[2], NULL);
RandomInsertSpeedTest(context, items, &rpc);
print_results(&rpc);
kv_close(context);
server = kv_benchmark(context, items);
print_results(server);
free(server);
// kv_client_deregister(context);
return 0;
hg_return_t ret;
bench_result rpc;
bench_result *server;
kv_context *context;
size_t items = atoi(argv[1]);
context = kv_client_register(NULL);
ret = kv_open(context, argv[2], "testdb");
assert(ret == HG_SUCCESS);
RandomInsertSpeedTest(context, items, &rpc);
print_results(&rpc);
#if 0
server = kv_benchmark(context, items);
print_results(server);
free(server);
#endif
/* close */
ret = kv_close(context);
assert(ret == HG_SUCCESS);
/* signal server */
ret = kv_client_signal_shutdown(context);
assert(ret == HG_SUCCESS);
/* cleanup */
ret = kv_client_deregister(context);
assert(ret == HG_SUCCESS);
}
[ethernet]
# use this example for TCP
transport = tcp
interface = eth4 # switch this to eth2 or to an external hostname for non-localhost use
#include "sds-keyval.h"
#include <unistd.h>
#include <assert.h>
int main(int argc, char **argv) {
int ret;
hg_return_t ret;
kv_context * context = kv_client_register(NULL);
/* open */
ret = kv_open(context, argv[1], "booger");
assert(ret == HG_SUCCESS);
/* put */
int key = 10;
uint64_t key = 10;
int val = 10;
ret = kv_put(context, &key, &val);
assert(ret == HG_SUCCESS);
/* get */
int remote_val;
ret = kv_get(context, &key, &remote_val);
assert(ret == HG_SUCCESS);
printf("key: %d in: %d out: %d\n", key, val, remote_val);
/* signal server */
ret = kv_client_signal_shutdown(context);
/* close */
ret = kv_close(context);
/* benchmark doesn't require an open keyval */
#if 0
bench_result *output;
output = kv_benchmark(context, 1000);
printf("insert: %zd keys in %f seconds: %f Million-insert per sec\n",
output->nkeys, output->insert_time,
output->nkeys/(output->insert_time*1024*1024) );
free(output);
#endif
/* close */
ret = kv_close(context);
assert(ret == HG_SUCCESS);
/* signal server */
ret = kv_client_signal_shutdown(context);
assert(ret == HG_SUCCESS);
kv_client_deregister(context);
/* cleanup */
ret = kv_client_deregister(context);
assert(ret == HG_SUCCESS);
}
#include "sds-keyval.h"
#include <assert.h>
int main(int argc, char **argv) {
kv_context * context = kv_server_register(NULL);
kv_context *context = kv_server_register(argv[1]);
margo_wait_for_finalize(context->mid);
hg_return_t ret;
ret = kv_server_wait_for_shutdown(context);
assert(ret == HG_SUCCESS);
kv_server_deregister(context);
ret = kv_server_deregister(context);
assert(ret == HG_SUCCESS);
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment