Commit 80aa8dd2 authored by David Rich's avatar David Rich Committed by Rob Latham
Browse files

Checkpoint work so far.

parent 90d6921f
......@@ -8,14 +8,21 @@
kv_context *kv_client_register(int argc, char **argv) {
kv_context *kv_client_register(char *addr_str) {
int ret;
kv_context * context;
context = malloc(sizeof(kv_context));
/* client side: no custom xstreams */
context->mid = margo_init("ofi+tcp://",
MARGO_CLIENT_MODE, 0, -1);
if (!addr_str) {
context->mid = margo_init("ofi+tcp://",
MARGO_CLIENT_MODE, 0, -1);
}
else {
context->mid = margo_init(addr_str,
MARGO_CLIENT_MODE, 0, -1);
}
context->put_id = MARGO_REGISTER(context->mid, "put",
put_in_t, put_out_t, NULL);
......@@ -88,6 +95,26 @@ int kv_put(kv_context *context, void *key, void *value) {
return ret;
}
int kv_bulk_put(kv_context *context, void *key, void *value, uint64_t value_size) {
int ret;
bulk_put_in_t bpin;
bulk_put_out_t bpret;
bpin.key = *(uint64_t*)key;
bpin.size = value_size;
ret = margo_bulk_create(context->mid, 1, value, value_size,
HG_BULK_READ_ONLY, &bpin.bulk_handle);
assert(ret == HG_SUCCESS);
ret = margo_forward(context->bulk_put_handle, &bpin);
assert(ret == HG_SUCCESS);
ret = HG_Get_output(context->bulk_put_handle, &bpret);
assert(ret == HG_SUCCESS);
assert(bpret == HG_SUCCESS); // make sure the server side says all is OK
HG_Free_output(context->bulk_put_handle, &bpret);
return ret;
}
int kv_get(kv_context *context, void *key, void *value)
{
int ret=0;
......@@ -103,6 +130,28 @@ int kv_get(kv_context *context, void *key, void *value)
HG_Free_output(context->get_handle, &get_out);
return ret;
}
int kv_bulk_get(kv_context *context, void *key, void *value, uint64_t value_size)
{
int ret;
bulk_get_in_t bgin;
bulk_get_out_t bgret;
bgin.key = *(uint64_t*)key;
bgin.size = value_size;
ret = margo_bulk_create(context->mid, 1, value, value_size,
HG_BULK_WRITE_ONLY, &bgin.bulk_handle);
assert(ret == HG_SUCCESS);
ret = margo_forward(context->bulk_get_handle, &bgin);
assert(ret == HG_SUCCESS);
ret = HG_Get_output(context->bulk_get_handle, &bgret);
assert(ret == HG_SUCCESS);
assert(bgret == HG_SUCCESS); // make sure the server side says all is OK
HG_Free_output(context->get_handle, &bgret);
return ret;
}
int kv_close(kv_context *context)
{
int ret=0;
......
......@@ -134,7 +134,29 @@ DEFINE_MARGO_RPC_HANDLER(bench_handler)
#include <bwtree.h>
#include <vector>
wangziqi2013::bwtree::BwTree<int, int> TREE;
typedef std::vector<char> vblob_t; // how ParSplice manages its "blobs" (i.e. the Rd type)
struct vblob_hash {
size_t operator()(const vblob_t& vblob) {
size_t hash = 0;
boost::hash_range(hash, vblob_t.begin(), vblob_t.end());
return hash;
}
};
struct vblob_equal_to {
bool operator()(const vblob_t& vb1, const vblob_t& vb2) {
return vblob_hash(vb1) == vblob_hash(vb2);
}
}
wangziqi2013::bwtree::BwTree<uint64_t, vblob_t,
std::less<uint64_t>,
std::equal_to<uint64_t>,
std::hash<uint64_t>,
vblob_equal_to,
vblob_hash> *TREE;
static hg_return_t open_handler(hg_handle_t h)
{
......@@ -144,9 +166,15 @@ static hg_return_t open_handler(hg_handle_t h)
ret = margo_get_input(h, &in);
TREE.SetDebugLogging(0);
TREE.UpdateThreadLocal(1);
TREE.AssignGCID(0);
TREE = new(wangziqi2013::bwtree::BwTree<uint64_t, vblob_t,
std::less<uint64_t>,
std::equal_to<uint64_t>,
std::hash<uint64_t>,
vblob_equal_to,
vblob_hash>);
TREE->SetDebugLogging(0);
TREE->UpdateThreadLocal(1);
TREE->AssignGCID(0);
/* TODO: something with in.keytype and in.valtype. In C I would get
* away with sloppy casting. Not sure how to do the same with a C++
......@@ -197,7 +225,7 @@ static hg_return_t put_handler(hg_handle_t h)
ret = HG_Get_input(h, &in);
TREE.Insert(in.key, in.value);
TREE->Insert(in.key, in.value);
assert(ret == HG_SUCCESS);
ret = HG_Respond(h, NULL, NULL, &out);
......@@ -209,6 +237,48 @@ static hg_return_t put_handler(hg_handle_t h)
}
DEFINE_MARGO_RPC_HANDLER(put_handler)
static hg_return_t bulk_put_handler(hg_handle_t h)
{
hg_return_t ret;
bulk_put_in_t bpin;
bulk_put_out_t bpret;
vblob_t *vblob;
hg_bulk_t bulk_handle;
const struct hg_info *hgi;
margo_instance_id mid;
ret = HG_Get_input(h, &bpin);
printf("SERVER: BULK PUT key = %d size = %d\n", bpin.key, bpin.size);
/* get handle info and margo instance */
hgi = margo_get_info(h);
assert(hgi);
mid = margo_hg_info_get_instance(hgi);
assert(mid != MARGO_INSTANCE_NULL);
vblob = new vblob_t[bpin.size];
ret = margo_bulk_create(mid, 1, &((void*)vblob), &bpin.size, HG_BULK_WRITE_ONLY, &bulk_handle);
assert(ret == HG_SUCCESS);
ret = margo_bulk_transfer(mid, HG_BULK_PULL, hgi->addr, bpin.bulk_handle, 0, bulk_handle, 0, bpin.size);
assert(ret == HG_SUCCESS);
TREE->Insert(bpin.key, &(*vblob)); // handling vblob correctly?
assert(ret == HG_SUCCESS);
bpret = ret;
ret = HG_Respond(h, NULL, NULL, &bpret);
assert(ret == HG_SUCCESS);
HG_Free_input(h, &bpin);
margo_bulk_free(bulk_handle);
HG_Destroy(h);
delete(vblob);
return HG_SUCCESS;
}
DEFINE_MARGO_RPC_HANDLER(bulk_put_handler)
static hg_return_t get_handler(hg_handle_t h)
{
hg_return_t ret;
......@@ -221,7 +291,7 @@ static hg_return_t get_handler(hg_handle_t h)
/*void GetValue (const KeyType &search_key, std::vector< ValueType > &value_list) */
std::vector<int> value;
TREE.GetValue(in.key, value);
TREE->GetValue(in.key, value);
if (value.size() >= 1) {
out.value = value.front();
......@@ -239,7 +309,34 @@ static hg_return_t get_handler(hg_handle_t h)
}
DEFINE_MARGO_RPC_HANDLER(get_handler)
static hg_return_t bulk_get_handler(hg_handle_t h)
{
hg_return_t ret;
get_in_t in;
get_out_t out;
ret = HG_Get_input(h, &in);
assert(ret == HG_SUCCESS);
/*void GetValue (const KeyType &search_key, std::vector< ValueType > &value_list) */
std::vector<int> value;
TREE->GetValue(in.key, value);
if (value.size() >= 1) {
printf("SERVER: GET: key=%d, value=%d\n",
in.key, value.front());
}
out.value = value.front();
ret = HG_Respond(h, NULL, NULL, &out);
assert(ret == HG_SUCCESS);
HG_Free_input(h, &in);
HG_Destroy(h);
return HG_SUCCESS;
}
DEFINE_MARGO_RPC_HANDLER(bulk_get_handler)
/*
* from BwTree tests:
......@@ -404,5 +501,6 @@ int kv_server_deregister(kv_context *context) {
margo_wait_for_finalize(context->mid);
margo_finalize(context->mid);
free(context);
delete(TREE);
return 0;
}
......@@ -35,7 +35,9 @@ typedef struct kv_context_s {
hg_id_t close_id;
hg_id_t bench_id;
hg_handle_t put_handle;
hg_handle_t bulk_put_handle;
hg_handle_t get_handle;
hg_handle_t bulk_get_handle;
hg_handle_t bench_handle;
/* some keyval dodad goes here so the server can discriminate. Seems
* like it should be some universal identifier we can share with other
......@@ -95,10 +97,25 @@ static inline hg_return_t hg_proc_bench_result( hg_proc_t proc, bench_result *re
}
DECLARE_MARGO_RPC_HANDLER(bench_handler)
MERCURY_GEN_PROC(bench_out_t, ((bench_result)(result)) )
kv_context *kv_client_register(int argc, char **argv);
// for handling bulk puts/gets (e.g. for ParSplice use case)
MERCURY_GEN_PROC(bulk_put_in_t,
((uint64_t)(key)) \
((uint64_t)(size)) \
((hg_bulk_t)(bulk_handle)) )
MERCURY_GEN_PROC(bulk_put_out_t, ((int32_t)(ret)))
DECLARE_MARGO_RPC_HANDLER(bulk_put_handler)
MERCURY_GEN_PROC(bulk_get_in_t,
((uint64_t)(key)) \
((uint64_t)(size)) \
((hg_bulk_t)(bulk_handle)) )
MERCURY_GEN_PROC(bulk_get_out_t, ((int32_t)(ret)))
DECLARE_MARGO_RPC_HANDLER(bulk_get_handler)
kv_context *kv_client_register(char *addr_str=0);
kv_context * kv_server_register(margo_instance_id mid);
/* both the same: should probably move to common */
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment