Commit 9d859fdd authored by Rob Latham's avatar Rob Latham

mercury processing routines for list in/out

parent 82f90b24
......@@ -85,6 +85,9 @@ static inline hg_return_t hg_proc_hg_return_t(hg_proc_t proc, void *data)
return hg_proc_hg_int32_t(proc, data);
}
/* the put_in, get_in, put_out, get_out, list_in, list_out code is repetitive. candidate for a template? */
static inline hg_return_t hg_proc_kv_put_in_t(hg_proc_t proc, void *data)
{
hg_return_t ret;
......@@ -196,6 +199,92 @@ static inline hg_return_t hg_proc_kv_get_out_t(hg_proc_t proc, void *data)
return HG_SUCCESS;
}
typedef struct {
kv_data_t start_key;
hg_size_t start_ksize;
hg_size_t max_keys;
} kv_list_in_t;
typedef struct {
hg_size_t nkeys;
kv_data_t *keys;
hg_size_t *ksizes;
hg_return_t ret;
} kv_list_out_t;
static inline hg_return_t hg_proc_kv_list_in_t(hg_proc_t proc, void *data)
{
hg_return_t ret;
kv_list_in_t *in = (kv_list_in_t*)data;
ret = hg_proc_hg_size_t(proc, &in->start_ksize);
assert(ret == HG_SUCCESS);
if (in->start_ksize) {
switch(hg_proc_get_op(proc)) {
case HG_ENCODE:
ret = hg_proc_raw(proc, in->start_key, in->start_ksize);
assert(ret == HG_SUCCESS);
break;
case HG_DECODE:
in->start_key = (kv_data_t)malloc(in->start_ksize);
ret = hg_proc_raw(proc, in->start_key, in->start_ksize);
assert(ret == HG_SUCCESS);
break;
case HG_FREE:
free(in->start_key);
default:
break;
}
}
ret = hg_proc_hg_size_t(proc, &in->max_keys);
assert(ret == HG_SUCCESS);
return ret;
}
static inline hg_return_t hg_proc_kv_list_out_t(hg_proc_t proc, void *data)
{
hg_return_t ret;
int i;
kv_list_out_t *out = (kv_list_out_t*)data;
/* typedef struct {
hg_size_t nkeys;
kv_data_t *keys;
hg_size_t *ksizes;
} kv_list_out_t; */
ret = hg_proc_hg_size_t(proc, &out->nkeys);
assert (ret == HG_SUCCESS);
if (out->nkeys) {
switch(hg_proc_get_op(proc)) {
case HG_ENCODE:
for (i=0; i<out->nkeys; i++) {
ret = hg_proc_raw(proc, out->keys[i], out->ksizes[i]);
assert(ret == HG_SUCCESS);
}
break;
case HG_DECODE:
for (i=0; i<out->nkeys; i++) {
out->keys[i] = (kv_data_t)malloc(out->ksizes[i]);
ret = hg_proc_raw(proc, out->keys[i], out->ksizes[i]);
assert(ret == HG_SUCCESS);
}
break;
case HG_FREE:
for (i=0; i<out->nkeys; i++) {
free(out->keys[i]);
}
break;
default:
break;
}
}
ret = hg_proc_hg_return_t(proc, &out->ret);
assert (ret == HG_SUCCESS);
return ret;
}
MERCURY_GEN_PROC(put_in_t, ((kv_put_in_t)(pi)))
MERCURY_GEN_PROC(put_out_t, ((hg_return_t)(ret)))
DECLARE_MARGO_RPC_HANDLER(put_handler)
......@@ -253,6 +342,10 @@ static inline hg_return_t hg_proc_kv_bulk_t(hg_proc_t proc, void *data)
return HG_SUCCESS;
}
MERCURY_GEN_PROC(list_in_t, ((kv_list_in_t)(list_in)))
MERCURY_GEN_PROC(list_out_t, ((kv_list_out_t)(list_out)))
MERCURY_GEN_PROC(bulk_put_in_t, ((kv_bulk_t)(bulk)))
MERCURY_GEN_PROC(bulk_put_out_t, ((hg_return_t)(ret)))
DECLARE_MARGO_RPC_HANDLER(bulk_put_handler)
......@@ -263,11 +356,6 @@ DECLARE_MARGO_RPC_HANDLER(bulk_get_handler)
DECLARE_MARGO_RPC_HANDLER(shutdown_handler)
MERCURY_GEN_PROC(list_in_t, ((hg_size_t)(start))
((hg_size_t) (max_keys)) )
MERCURY_GEN_PROC(list_out_t, ((hg_size_t)(nkeys))
((hg_bulk_t)(bulk_keys))
((hg_bulk_t)(bulk_sizes)) )
DECLARE_MARGO_RPC_HANDLER(list_keys_handler)
// some setup to support simple benchmarking
......
......@@ -285,13 +285,20 @@ hg_return_t kv_list_keys(kv_database_t *db,
list_in_t list_in;
list_out_t list_out;
int ret = HG_SUCCESS;
int i;
list_in.list_in.start_key = (kv_data_t) start_key;
list_in.list_in.start_ksize = start_ksize;
list_in.list_in.max_keys = *max_keys;
list_in.start = (hg_size_t) start_key;
list_in.max_keys = *max_keys;
ret = margo_forward(db->list_handle, &list_in);
/* TODO: unpack list_out into something client can use */
ret = margo_get_output(db->list_handle, &list_out);
*max_keys = list_out.list_out.nkeys;
for (i=0; i<list_out.list_out.nkeys; i++) {
ksizes[i] = list_out.list_out.ksizes[i];
memcpy(keys[i], list_out.list_out.keys[i], list_out.list_out.ksizes[i]);
}
margo_free_output(db->list_handle, &list_out);
return ret;
......
......@@ -359,8 +359,9 @@ static hg_return_t list_handler(hg_handle_t handle)
std::vector<char> start{};
margo_get_input(handle, &list_in);
auto keys = datastore->list(start, list_in.max_keys);
std::cout << "max_keys: " << list_in.list_in.max_keys;
auto keys = datastore->list(start, list_in.list_in.max_keys);
std::cout << " found " << start.size() << " keys" << std::endl;
for (auto i: *keys) {
std::cout << "as string" << std::string(i.data()) << " ";
std::cout << "as int" << *(int *)(i.data()) << " ";
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment