Commit be6df328 authored by David Rich's avatar David Rich Committed by Rob Latham
Browse files

Add some timing code. TODO: go back and ifdef this code (and clean up some of the KV_DEBUG stuff).

parent 9fcc6046
......@@ -2,6 +2,9 @@
// All rights reserved.
#include "datastore.h"
#include <boost/filesystem.hpp>
#include <chrono>
using namespace std::chrono;
AbstractDataStore::AbstractDataStore() {
_duplicates = Duplicates::IGNORE;
......@@ -178,6 +181,7 @@ bool LevelDBDataStore::put(ds_bulk_t &key, ds_bulk_t &data) {
leveldb::Status status;
bool success = false;
high_resolution_clock::time_point start = high_resolution_clock::now();
// IGNORE case deals with redundant puts (where key/value is the same). In LevelDB a
// redundant put simply overwrites previous value which is fine when key/value is the same.
if (_duplicates == Duplicates::IGNORE) {
......@@ -195,6 +199,8 @@ bool LevelDBDataStore::put(ds_bulk_t &key, ds_bulk_t &data) {
else {
std::cerr << "LevelDBDataStore::put: Unexpected Duplicates option = " << int32_t(_duplicates) << std::endl;
}
uint64_t elapsed = duration_cast<microseconds>(high_resolution_clock::now()-start).count();
std::cout << "LevelDBDataStore::put time = " << elapsed << " microseconds" << std::endl;
return success;
};
......@@ -203,6 +209,7 @@ bool LevelDBDataStore::get(ds_bulk_t &key, ds_bulk_t &data) {
leveldb::Status status;
bool success = false;
high_resolution_clock::time_point start = high_resolution_clock::now();
data.clear();
std::string value;
status = _dbm->Get(leveldb::ReadOptions(), toString(key), &value);
......@@ -213,6 +220,8 @@ bool LevelDBDataStore::get(ds_bulk_t &key, ds_bulk_t &data) {
else if (!status.IsNotFound()) {
std::cerr << "LevelDBDataStore::get: LevelDB error on Get = " << status.ToString() << std::endl;
}
uint64_t elapsed = duration_cast<microseconds>(high_resolution_clock::now()-start).count();
std::cout << "LevelDBDataStore::get time = " << elapsed << " microseconds" << std::endl;
return success;
};
......@@ -354,16 +363,17 @@ bool BerkeleyDBDataStore::put(ds_bulk_t &key, ds_bulk_t &data) {
bool success = false;
// IGNORE case deals with redundant puts (where key/value is the same). In BerkeleyDB a
// redundant put simply overwrites previous value which is fine when key/value is the same.
// redundant may overwrite previous value which is fine when key/value is the same.
// ALLOW case deals with actual duplicates (where key is the same but value is different).
// This option might be used when eraseOnGet is set (e.g. ParSplice hotpoint use case).
if (_duplicates == Duplicates::IGNORE || _duplicates == Duplicates::ALLOW) {
ds_bulk_t keydata;
Dbt db_key(&(keydata[0]), uint32_t(keydata.size()));
Dbt put_data(&(data[0]), uint32_t(data.size()));
uint32_t flags = DB_NOOVERWRITE;
uint32_t flags = DB_NOOVERWRITE; // to simply overwrite value, don't use this flag
status = _dbm->put(NULL, &db_key, &put_data, flags);
if (status == 0) { // is this the right test for success?
if (status == 0 ||
(_duplicates == Duplicates::IGNORE && status == DB_KEYEXIST)) {
success = true;
}
else {
......
......@@ -112,10 +112,13 @@ hg_return_t kv_put(kv_context_t *context,
msize = ksize + vsize + 2*sizeof(hg_size_t);
printf("kv_put ksize %lu, vsize %lu, msize %lu\n", ksize, vsize, msize);
/*
* If total payload is large, we'll do our own
* explicit transfer of the value data.
*/
double st1, et1, st2, et2;
st1 = ABT_get_wtime();
if (msize <= MAX_RPC_MESSAGE_SIZE) {
put_in_t pin;
put_out_t pout;
......@@ -125,7 +128,10 @@ hg_return_t kv_put(kv_context_t *context,
pin.pi.value = (kv_data_t)value;
pin.pi.vsize = vsize;
st2 = ABT_get_wtime();
ret = margo_forward(context->put_handle, &pin);
et2 = ABT_get_wtime();
printf("kv_put forward time: %f microseconds\n", (et2-st2)*1000000);
assert(ret == HG_SUCCESS);
ret = margo_get_output(context->put_handle, &pout);
......@@ -149,11 +155,17 @@ hg_return_t kv_put(kv_context_t *context,
bpin.bulk.ksize = ksize;
bpin.bulk.vsize = vsize;
st2 = ABT_get_wtime();
ret = margo_bulk_create(context->mid, 1, &value, &bpin.bulk.vsize,
HG_BULK_READ_ONLY, &bpin.bulk.handle);
et2 = ABT_get_wtime();
printf("kv_put bulk create time: %f microseconds\n", (et2-st2)*1000000);
assert(ret == HG_SUCCESS);
st2 = ABT_get_wtime();
ret = margo_forward(context->bulk_put_handle, &bpin);
et2 = ABT_get_wtime();
printf("kv_put bulk forward time: %f microseconds\n", (et2-st2)*1000000);
assert(ret == HG_SUCCESS);
ret = margo_get_output(context->bulk_put_handle, &bpout);
......@@ -162,6 +174,8 @@ hg_return_t kv_put(kv_context_t *context,
margo_free_output(context->bulk_put_handle, &bpout);
}
et1 = ABT_get_wtime();
printf("kv_put time: %f microseconds\n", (et1-st1)*1000000);
return ret;
}
......@@ -176,12 +190,15 @@ hg_return_t kv_get(kv_context_t *context,
hg_size_t msize;
size = *(hg_size_t*)vsize;
msize = size + sizeof(hg_size_t);
msize = size + sizeof(hg_size_t) + sizeof(hg_return_t);
printf("kv_get ksize %lu, vsize %lu, msize %lu\n", ksize, size, msize);
/*
* If return payload is large, we'll do our own
* explicit transfer of the value data.
*/
double st1, et1, st2, et2;
st1 = ABT_get_wtime();
if (msize <= MAX_RPC_MESSAGE_SIZE) {
get_in_t gin;
get_out_t gout;
......@@ -190,7 +207,10 @@ hg_return_t kv_get(kv_context_t *context,
gin.gi.ksize = ksize;
gin.gi.vsize = size;
st2 = ABT_get_wtime();
ret = margo_forward(context->get_handle, &gin);
et2 = ABT_get_wtime();
printf("kv_get forward time: %f microseconds\n", (et2-st2)*1000000);
assert(ret == HG_SUCCESS);
ret = margo_get_output(context->get_handle, &gout);
......@@ -217,11 +237,17 @@ hg_return_t kv_get(kv_context_t *context,
bgin.bulk.ksize = ksize;
bgin.bulk.vsize = size;
st2 = ABT_get_wtime();
ret = margo_bulk_create(context->mid, 1, &value, &bgin.bulk.vsize,
HG_BULK_WRITE_ONLY, &bgin.bulk.handle);
et2 = ABT_get_wtime();
printf("kv_get bulk create time: %f microseconds\n", (et2-st2)*1000000);
assert(ret == HG_SUCCESS);
st2 = ABT_get_wtime();
ret = margo_forward(context->bulk_get_handle, &bgin);
et2 = ABT_get_wtime();
printf("kv_get bulk forward time: %f microseconds\n", (et2-st2)*1000000);
assert(ret == HG_SUCCESS);
ret = margo_get_output(context->bulk_get_handle, &bgout);
......@@ -237,6 +263,8 @@ hg_return_t kv_get(kv_context_t *context,
margo_free_output(context->bulk_get_handle, &bgout);
}
et1 = ABT_get_wtime();
printf("kv_get time: %f microseconds\n", (et1-st1)*1000000);
return ret;
}
......
......@@ -32,6 +32,7 @@ static hg_return_t open_handler(hg_handle_t handle)
//datastore = new BwTreeDataStore(); // testing BwTree
datastore = new LevelDBDataStore(); // testing LevelDB
//datastore = new BerkeleyDBDataStore(); // testing BerkeleyDB
//datastore->set_in_memory(true); // testing in-memory BerkeleyDB
db_name = in_name;
datastore->createDatabase(db_name);
#ifdef KV_DEBUG
......@@ -85,7 +86,9 @@ static hg_return_t put_handler(hg_handle_t handle)
hg_return_t ret;
put_in_t pin;
put_out_t pout;
double st1, et1, st2, et2;
st1 = ABT_get_wtime();
ret = margo_get_input(handle, &pin);
assert(ret == HG_SUCCESS);
......@@ -112,6 +115,9 @@ static hg_return_t put_handler(hg_handle_t handle)
pout.ret = HG_OTHER_ERROR;
}
et1 = ABT_get_wtime();
std::cout << "put_handler time: " << (et1-st1)*1000000 << " microseconds" << std::endl;
ret = margo_respond(handle, &pout);
assert(ret == HG_SUCCESS);
......@@ -130,7 +136,9 @@ static hg_return_t bulk_put_handler(hg_handle_t handle)
hg_bulk_t bulk_handle;
const struct hg_info *hgi;
margo_instance_id mid;
double st1, et1, st2, et2;
st1 = ABT_get_wtime();
ret = margo_get_input(handle, &bpin);
assert(ret == HG_SUCCESS);
......@@ -174,6 +182,9 @@ static hg_return_t bulk_put_handler(hg_handle_t handle)
#ifdef KV_DEBUG
std::cout << "bulk_put_handler sending response back with ret=" << bpout.ret << std::endl;
#endif
et1 = ABT_get_wtime();
std::cout << "bulk_put_handler time: " << (et1-st1)*1000000 << " microseconds" << std::endl;
ret = margo_respond(handle, &bpout);
assert(ret == HG_SUCCESS);
......@@ -193,7 +204,9 @@ static hg_return_t get_handler(hg_handle_t handle)
hg_return_t ret;
get_in_t gin;
get_out_t gout;
double st1, et1, st2, et2;
st1 = ABT_get_wtime();
ret = margo_get_input(handle, &gin);
assert(ret == HG_SUCCESS);
......@@ -230,6 +243,9 @@ static hg_return_t get_handler(hg_handle_t handle)
gout.go.ret = HG_OTHER_ERROR; // caller should be checking return value
}
et1 = ABT_get_wtime();
std::cout << "get_handler time: " << (et1-st1)*1000000 << " microseconds" << std::endl;
ret = margo_respond(handle, &gout);
assert(ret == HG_SUCCESS);
......@@ -248,7 +264,9 @@ static hg_return_t bulk_get_handler(hg_handle_t handle)
hg_bulk_t bulk_handle;
const struct hg_info *hgi;
margo_instance_id mid;
double st1, et1, st2, et2;
st1 = ABT_get_wtime();
ret = margo_get_input(handle, &bgin);
assert(ret == HG_SUCCESS);
......@@ -300,6 +318,9 @@ static hg_return_t bulk_get_handler(hg_handle_t handle)
std::cout << "bulk_get_handler returning ret = " << bgout.ret
<< ", anad size = " << bgout.size << std::endl;
#endif
et1 = ABT_get_wtime();
std::cout << "bulk_get_handler time: " << (et1-st1)*1000000 << " microseconds" << std::endl;
ret = margo_respond(handle, &bgout);
assert(ret == HG_SUCCESS);
......
......@@ -42,7 +42,7 @@ typedef struct kv_context_s {
} kv_context_t;
#define MAX_RPC_MESSAGE_SIZE 2048 // in bytes
#define MAX_RPC_MESSAGE_SIZE 4096 // in bytes
// setup to support opaque type handling
typedef char* kv_data_t;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment