Commit a5c19eb5 authored by David Rich's avatar David Rich Committed by Rob Latham
Browse files

Bug in SDSKV client code caused puts for the same key to make it to the...

Bug in SDSKV client code caused puts for the same key to make it to the kv-server. Add a possible options framework for handling duplicates, etc.
parent cbeb7eb3
...@@ -162,6 +162,9 @@ BwTree<uint64_t, std::vector<char>, ...@@ -162,6 +162,9 @@ BwTree<uint64_t, std::vector<char>,
const char *my_db = "minima_store"; const char *my_db = "minima_store";
// these should probably be passed in put/get calls
uint16_t db_options = KV_IGNOREDUPKEY;
static hg_return_t open_handler(hg_handle_t handle) static hg_return_t open_handler(hg_handle_t handle)
{ {
hg_return_t ret; hg_return_t ret;
...@@ -229,6 +232,7 @@ static hg_return_t put_handler(hg_handle_t handle) ...@@ -229,6 +232,7 @@ static hg_return_t put_handler(hg_handle_t handle)
std::vector<char> data; std::vector<char> data;
data.resize(sizeof(in.value)); data.resize(sizeof(in.value));
memcpy(data.data(), &in.value, sizeof(in.value)); memcpy(data.data(), &in.value, sizeof(in.value));
// TODO: check return value here (see bulk_put_handler)
TREE->Insert(in.key, data); TREE->Insert(in.key, data);
ret = margo_respond(handle, &out); ret = margo_respond(handle, &out);
...@@ -268,15 +272,36 @@ static hg_return_t bulk_put_handler(hg_handle_t handle) ...@@ -268,15 +272,36 @@ static hg_return_t bulk_put_handler(hg_handle_t handle)
ret = margo_bulk_transfer(mid, HG_BULK_PULL, hgi->addr, bpin.bulk_handle, 0, bulk_handle, 0, bpin.size); ret = margo_bulk_transfer(mid, HG_BULK_PULL, hgi->addr, bpin.bulk_handle, 0, bulk_handle, 0, bpin.size);
assert(ret == HG_SUCCESS); assert(ret == HG_SUCCESS);
if (TREE->Insert(bpin.key, data)) { std::vector<std::vector<char>> values;
printf("SERVER: TREE Insert succeeded for key = %lu\n", bpin.key); TREE->GetValue(bpin.key, values);
bpout.ret = HG_SUCCESS; bool duplicate_key = (values.size() != 0);
// only option currently implemented is to ignore duplicate Insert attempts
if (duplicate_key) {
if ((db_options & KV_IGNOREDUPKEY) == KV_IGNOREDUPKEY) {
printf("SERVER: TREE ignoring duplicate Insert attempt for key = %lu\n", bpin.key);
bpout.ret = HG_SUCCESS;
}
else if ((db_options & KV_ALLOWDUPKEY) == KV_ALLOWDUPKEY) {
printf("SERVER: TREE duplicate Insert support not implemented, key = %lu\n", bpin.key);
bpout.ret = HG_OTHER_ERROR;
}
else {
printf("SERVER: TREE unhandled duplicate Insert attempt for key = %lu\n", bpin.key);
bpout.ret = HG_OTHER_ERROR;
}
} }
else { else {
// BwTree Insert returns False if the key-value pair already if (TREE->Insert(bpin.key, data)) {
// exists in the DB. printf("SERVER: TREE Insert succeeded for key = %lu\n", bpin.key);
printf("SERVER: TREE Insert failed for key = %lu\n", bpin.key); bpout.ret = HG_SUCCESS;
bpout.ret = HG_OTHER_ERROR; }
else {
// BwTree Insert returns False if the key-value pair already
// exists in the DB.
printf("SERVER: TREE unexpected duplicate Insert failed for key = %lu\n", bpin.key);
bpout.ret = HG_OTHER_ERROR; // shouldn't get here, but...
}
} }
bpout.ret = ret; bpout.ret = ret;
...@@ -601,14 +626,14 @@ kv_context *kv_server_register(margo_instance_id mid); ...@@ -601,14 +626,14 @@ kv_context *kv_server_register(margo_instance_id mid);
context->put_id = MARGO_REGISTER(context->mid, "put", context->put_id = MARGO_REGISTER(context->mid, "put",
put_in_t, put_out_t, put_handler); put_in_t, put_out_t, put_handler);
context->put_id = MARGO_REGISTER(context->mid, "bulk_put", context->bulk_put_id = MARGO_REGISTER(context->mid, "bulk_put",
bulk_put_in_t, bulk_put_out_t, bulk_put_handler); bulk_put_in_t, bulk_put_out_t, bulk_put_handler);
context->get_id = MARGO_REGISTER(context->mid, "get", context->get_id = MARGO_REGISTER(context->mid, "get",
get_in_t, get_out_t, get_handler); get_in_t, get_out_t, get_handler);
context->get_id = MARGO_REGISTER(context->mid, "bulk_get", context->bulk_get_id = MARGO_REGISTER(context->mid, "bulk_get",
bulk_get_in_t, bulk_get_out_t, bulk_get_handler); bulk_get_in_t, bulk_get_out_t, bulk_get_handler);
context->bench_id = MARGO_REGISTER(context->mid, "bench", context->bench_id = MARGO_REGISTER(context->mid, "bench",
bench_in_t, bench_out_t, bench_handler); bench_in_t, bench_out_t, bench_handler);
......
...@@ -23,9 +23,15 @@ typedef enum { ...@@ -23,9 +23,15 @@ typedef enum {
KV_FLOAT, KV_FLOAT,
KV_DOUBLE, KV_DOUBLE,
KV_STRING, KV_STRING,
KV_BULK KV_BULK,
} kv_type; } kv_type;
typedef enum {
KV_ALLOWDUPKEY = 0x0001,
KV_IGNOREDUPKEY = 0x0002,
KV_ERASEONGETKEY = 0x0004,
} kv_options;
/* do we need one for server, one for client? */ /* do we need one for server, one for client? */
typedef struct kv_context_s { typedef struct kv_context_s {
margo_instance_id mid; margo_instance_id mid;
...@@ -71,17 +77,18 @@ MERCURY_GEN_PROC(get_out_t, ...@@ -71,17 +77,18 @@ MERCURY_GEN_PROC(get_out_t,
DECLARE_MARGO_RPC_HANDLER(get_handler) DECLARE_MARGO_RPC_HANDLER(get_handler)
MERCURY_GEN_PROC(open_in_t, MERCURY_GEN_PROC(open_in_t,
((hg_string_t)(name))\ ((hg_string_t)(name)) \
((int32_t) (keytype))\ ((uint32_t) (keytype)) \
((int32_t) (valtype)) ) ((uint32_t) (valtype)))
MERCURY_GEN_PROC(open_out_t, ((int32_t)(ret))) MERCURY_GEN_PROC(open_out_t, ((int32_t)(ret)))
KV_ALLOWDUP=0x010,
DECLARE_MARGO_RPC_HANDLER(open_handler) DECLARE_MARGO_RPC_HANDLER(open_handler)
MERCURY_GEN_PROC(close_in_t, MERCURY_GEN_PROC(close_in_t,
((int32_t)(x))\ ((int32_t)(x)) \
((int32_t)(y)) ) ((int32_t)(y)) )
MERCURY_GEN_PROC(close_out_t, ((int32_t)(ret)) ) MERCURY_GEN_PROC(close_out_t, ((int32_t)(ret)) )
DECLARE_MARGO_RPC_HANDLER(close_handler) DECLARE_MARGO_RPC_HANDLER(close_handler)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment