Commit 2813e7e9 authored by Matthieu Dorier
Browse files

list_keys now uses bulk

parent db9687a7
......@@ -544,9 +544,11 @@ int sdskv_list_keys(sdskv_provider_handle_t provider,
{
list_keys_in_t in;
list_keys_out_t out;
hg_return_t hret = HG_SUCCESS;
hg_handle_t handle;
int ret;
in.keys_bulk_handle = HG_BULK_NULL;
in.ksizes_bulk_handle = HG_BULK_NULL;
hg_return_t hret = HG_SUCCESS;
hg_handle_t handle = HG_HANDLE_NULL;
int ret = 0;
int i;
in.db_id = db_id;
......@@ -554,40 +556,69 @@ int sdskv_list_keys(sdskv_provider_handle_t provider,
in.start_ksize = start_ksize;
in.max_keys = *max_keys;
/* create bulk handle to expose the segments with key sizes */
hg_size_t ksize_bulk_size = (*max_keys)*sizeof(*ksizes);
void* buf_ptr[1] = { ksizes };
hret = margo_bulk_create(provider->client->mid,
1, buf_ptr, &ksize_bulk_size,
HG_BULK_READWRITE,
&in.ksizes_bulk_handle);
if(hret != HG_SUCCESS) {
ret = -1;
goto finish;
}
/* create bulk handle to expose where the keys should be placed */
hret = margo_bulk_create(provider->client->mid,
*max_keys, keys, ksizes,
HG_BULK_WRITE_ONLY,
&in.keys_bulk_handle);
if(hret != HG_SUCCESS) {
ret = -1;
goto finish;
}
/* create handle */
hret = margo_create(
provider->client->mid,
provider->addr,
provider->client->sdskv_list_keys_id,
&handle);
if(hret != HG_SUCCESS) return -1;
if(hret != HG_SUCCESS) {
ret = -1;
goto finish;
}
/* set target id */
hret = margo_set_target_id(handle, provider->mplex_id);
if(hret != HG_SUCCESS) {
margo_destroy(handle);
return -1;
ret = -1;
goto finish;
}
/* forward to provider */
hret = margo_forward(handle, &in);
if(hret != HG_SUCCESS) {
margo_destroy(handle);
return -1;
ret = -1;
goto finish;
}
/* get the output from provider */
hret = margo_get_output(handle, &out);
if(hret != HG_SUCCESS) {
margo_destroy(handle);
return -1;
ret = -1;
goto finish;
}
/* set return values */
*max_keys = out.nkeys;
ret = out.ret;
for (i=0; i < out.nkeys; i++) {
ksizes[i] = out.ksizes[i];
memcpy(keys[i], out.keys[i], out.ksizes[i]);
}
finish:
/* free everything we created */
margo_bulk_free(in.ksizes_bulk_handle);
margo_bulk_free(in.keys_bulk_handle);
margo_free_output(handle, &out);
margo_destroy(handle);
......
......@@ -15,32 +15,23 @@
// setup to support opaque type handling
typedef char* kv_data_t;
/* Argument bundle holding a database id plus a key buffer and a value buffer
 * with their sizes. Judging by the name this is the input of the PUT RPC; its
 * serializer (hg_proc_put_in_t) is not fully visible here, so the field notes
 * below are assumptions to confirm against it. */
typedef struct {
/* identifier of the target database */
uint64_t db_id;
/* key buffer (kv_data_t is a char*) */
kv_data_t key;
/* presumably the length of key in bytes -- TODO confirm */
hg_size_t ksize;
/* value buffer */
kv_data_t value;
/* presumably the length of value in bytes -- TODO confirm */
hg_size_t vsize;
} put_in_t;
// ------------- OPEN ------------- //
MERCURY_GEN_PROC(open_in_t,
((hg_string_t)(name)))
MERCURY_GEN_PROC(open_out_t, ((uint64_t)(db_id)) ((int32_t)(ret)))
// ------------- PUT ------------- //
typedef struct {
uint64_t db_id;
kv_data_t key;
hg_size_t ksize;
hg_size_t vsize;
} get_in_t;
typedef struct {
kv_data_t value;
hg_size_t vsize;
int32_t ret;
} get_out_t;
} put_in_t;
typedef struct {
uint64_t db_id;
kv_data_t key;
hg_size_t ksize;
} length_in_t;
MERCURY_GEN_PROC(put_out_t, ((int32_t)(ret)))
static inline hg_return_t hg_proc_put_in_t(hg_proc_t proc, void *data)
{
......@@ -94,10 +85,25 @@ static inline hg_return_t hg_proc_put_in_t(hg_proc_t proc, void *data)
return HG_SUCCESS;
}
static inline hg_return_t hg_proc_length_in_t(hg_proc_t proc, void *data)
// ------------- GET ------------- //
/* Input of the GET RPC. hg_proc_get_in_t processes the fields in the order
 * db_id, ksize, the raw key bytes (only when ksize is non-zero), then vsize. */
typedef struct {
/* identifier of the target database */
uint64_t db_id;
/* key bytes; allocated with malloc(ksize) on decode and released on HG_FREE */
kv_data_t key;
/* number of bytes in key */
hg_size_t ksize;
/* presumably the capacity of the caller's value buffer -- TODO confirm */
hg_size_t vsize;
} get_in_t;
/* Output of the GET RPC. hg_proc_get_out_t processes vsize first, then the raw
 * value bytes (only when vsize is non-zero), then ret. */
typedef struct {
/* value bytes; allocated with malloc(vsize) on decode and released on HG_FREE */
kv_data_t value;
/* number of bytes in value */
hg_size_t vsize;
/* status code reported by the provider */
int32_t ret;
} get_out_t;
static inline hg_return_t hg_proc_get_in_t(hg_proc_t proc, void *data)
{
hg_return_t ret;
length_in_t *in = (length_in_t*)data;
get_in_t *in = (get_in_t*)data;
ret = hg_proc_uint64_t(proc, &in->db_id);
if(ret != HG_SUCCESS) return ret;
......@@ -122,74 +128,86 @@ static inline hg_return_t hg_proc_length_in_t(hg_proc_t proc, void *data)
break;
}
}
ret = hg_proc_hg_size_t(proc, &in->vsize);
if(ret != HG_SUCCESS) return ret;
return HG_SUCCESS;
}
static inline hg_return_t hg_proc_get_in_t(hg_proc_t proc, void *data)
static inline hg_return_t hg_proc_get_out_t(hg_proc_t proc, void *data)
{
hg_return_t ret;
get_in_t *in = (get_in_t*)data;
ret = hg_proc_uint64_t(proc, &in->db_id);
if(ret != HG_SUCCESS) return ret;
get_out_t *out = (get_out_t*)data;
ret = hg_proc_hg_size_t(proc, &in->ksize);
ret = hg_proc_hg_size_t(proc, &out->vsize);
if(ret != HG_SUCCESS) return ret;
if (in->ksize) {
if (out->vsize) {
switch (hg_proc_get_op(proc)) {
case HG_ENCODE:
ret = hg_proc_raw(proc, in->key, in->ksize);
ret = hg_proc_raw(proc, out->value, out->vsize);
if(ret != HG_SUCCESS) return ret;
break;
case HG_DECODE:
in->key = (kv_data_t)malloc(in->ksize);
ret = hg_proc_raw(proc, in->key, in->ksize);
out->value = (kv_data_t)malloc(out->vsize);
ret = hg_proc_raw(proc, out->value, out->vsize);
if(ret != HG_SUCCESS) return ret;
break;
case HG_FREE:
free(in->key);
free(out->value);
break;
default:
break;
}
}
ret = hg_proc_hg_size_t(proc, &in->vsize);
ret = hg_proc_int32_t(proc, &out->ret);
if(ret != HG_SUCCESS) return ret;
return HG_SUCCESS;
}
static inline hg_return_t hg_proc_get_out_t(hg_proc_t proc, void *data)
// ------------- LENGTH ------------- //
/* Input of the LENGTH RPC. hg_proc_length_in_t processes db_id, then ksize,
 * then the raw key bytes (only when ksize is non-zero). */
typedef struct {
/* identifier of the target database */
uint64_t db_id;
/* key bytes; allocated with malloc(ksize) on decode and released on HG_FREE */
kv_data_t key;
/* number of bytes in key */
hg_size_t ksize;
} length_in_t;
MERCURY_GEN_PROC(length_out_t, ((hg_size_t)(size)) ((int32_t)(ret)))
static inline hg_return_t hg_proc_length_in_t(hg_proc_t proc, void *data)
{
hg_return_t ret;
get_out_t *out = (get_out_t*)data;
length_in_t *in = (length_in_t*)data;
ret = hg_proc_hg_size_t(proc, &out->vsize);
ret = hg_proc_uint64_t(proc, &in->db_id);
if(ret != HG_SUCCESS) return ret;
if (out->vsize) {
ret = hg_proc_hg_size_t(proc, &in->ksize);
if(ret != HG_SUCCESS) return ret;
if (in->ksize) {
switch (hg_proc_get_op(proc)) {
case HG_ENCODE:
ret = hg_proc_raw(proc, out->value, out->vsize);
ret = hg_proc_raw(proc, in->key, in->ksize);
if(ret != HG_SUCCESS) return ret;
break;
case HG_DECODE:
out->value = (kv_data_t)malloc(out->vsize);
ret = hg_proc_raw(proc, out->value, out->vsize);
in->key = (kv_data_t)malloc(in->ksize);
ret = hg_proc_raw(proc, in->key, in->ksize);
if(ret != HG_SUCCESS) return ret;
break;
case HG_FREE:
free(out->value);
free(in->key);
break;
default:
break;
}
}
ret = hg_proc_int32_t(proc, &out->ret);
if(ret != HG_SUCCESS) return ret;
return HG_SUCCESS;
}
// ------------- ERASE ------------- //
typedef struct {
uint64_t db_id;
kv_data_t key;
......@@ -230,22 +248,18 @@ static inline hg_return_t hg_proc_erase_in_t(hg_proc_t proc, void *data)
MERCURY_GEN_PROC(erase_out_t, ((int32_t)(ret)))
// ------------- LIST KEYS ------------- //
/* Input of the LIST KEYS RPC. Besides the start key and the key limit it
 * carries two bulk handles that hg_proc_list_keys_in_t serializes after
 * max_keys (ksizes_bulk_handle first, then keys_bulk_handle). */
typedef struct {
/* identifier of the target database */
uint64_t db_id;
/* start key bytes; allocated with malloc(start_ksize) on decode */
kv_data_t start_key;
/* number of bytes in start_key (the key bytes are skipped when this is 0) */
hg_size_t start_ksize;
/* maximum number of keys requested */
hg_size_t max_keys;
/* bulk handle over the caller's array of key sizes; the client code creates
 * it with HG_BULK_READWRITE */
hg_bulk_t ksizes_bulk_handle;
/* bulk handle over the caller's key buffers; the client code creates it with
 * HG_BULK_WRITE_ONLY */
hg_bulk_t keys_bulk_handle;
} list_keys_in_t;
/* Output of the LIST KEYS RPC carrying the keys (and optionally values) inline.
 * hg_proc_list_keys_out_t processes nkeys and nvalues, then the key sizes and
 * key bytes, then the value sizes and value bytes, then ret.
 * NOTE(review): a MERCURY_GEN_PROC line using the same type name follows this
 * struct -- confirm which of the two definitions is the current one. */
typedef struct {
/* number of entries in keys and ksizes */
hg_size_t nkeys;
/* array of nkeys key buffers; set to NULL on decode when nkeys is 0 */
kv_data_t *keys;
/* array of nkeys key sizes in bytes; set to NULL on decode when nkeys is 0 */
hg_size_t *ksizes;
/* number of entries in values and vsizes */
hg_size_t nvalues;
/* array of nvalues value buffers; set to NULL on decode when nvalues is 0 */
kv_data_t *values;
/* array of nvalues value sizes in bytes; set to NULL on decode when nvalues is 0 */
hg_size_t *vsizes;
/* status code reported by the provider */
int32_t ret;
} list_keys_out_t;
MERCURY_GEN_PROC(list_keys_out_t, ((hg_size_t)(nkeys)) ((int32_t)(ret)))
static inline hg_return_t hg_proc_list_keys_in_t(hg_proc_t proc, void *data)
{
......@@ -258,153 +272,31 @@ static inline hg_return_t hg_proc_list_keys_in_t(hg_proc_t proc, void *data)
ret = hg_proc_hg_size_t(proc, &in->start_ksize);
if(ret != HG_SUCCESS) return ret;
if (in->start_ksize) {
switch(hg_proc_get_op(proc)) {
case HG_ENCODE:
ret = hg_proc_raw(proc, in->start_key, in->start_ksize);
if(ret != HG_SUCCESS) return ret;
break;
case HG_DECODE:
in->start_key = (kv_data_t)malloc(in->start_ksize);
ret = hg_proc_raw(proc, in->start_key, in->start_ksize);
if(ret != HG_SUCCESS) return ret;
break;
case HG_FREE:
free(in->start_key);
default:
break;
}
switch(hg_proc_get_op(proc)) {
case HG_ENCODE:
ret = hg_proc_raw(proc, in->start_key, in->start_ksize);
if(ret != HG_SUCCESS) return ret;
break;
case HG_DECODE:
in->start_key = (kv_data_t)malloc(in->start_ksize);
ret = hg_proc_raw(proc, in->start_key, in->start_ksize);
if(ret != HG_SUCCESS) return ret;
break;
case HG_FREE:
free(in->start_key);
default:
break;
}
}
ret = hg_proc_hg_size_t(proc, &in->max_keys);
return ret;
}
static inline hg_return_t hg_proc_list_keys_out_t(hg_proc_t proc, void *data)
{
hg_return_t ret;
unsigned int i;
list_keys_out_t *out = (list_keys_out_t*)data;
/* encode/decode the number of keys */
ret = hg_proc_hg_size_t(proc, &out->nkeys);
if(ret != HG_SUCCESS) return ret;
/* encode/decode the number of values */
ret = hg_proc_hg_size_t(proc, &out->nvalues);
ret = hg_proc_hg_bulk_t(proc, &in->ksizes_bulk_handle);
if(ret != HG_SUCCESS) return ret;
switch(hg_proc_get_op(proc)) {
case HG_ENCODE:
/* encode the size of each key */
for (i=0; i<out->nkeys; i++) {
ret = hg_proc_hg_size_t(proc, &(out->ksizes[i]));
if(ret != HG_SUCCESS) return ret;
}
/* encode each key */
for (i=0; i<out->nkeys; i++) {
ret = hg_proc_raw(proc, out->keys[i], out->ksizes[i]);
if(ret != HG_SUCCESS) return ret;
}
break;
case HG_DECODE:
if(out->nkeys) {
/* decode the size of each key */
out->ksizes = (hg_size_t*)malloc(out->nkeys*sizeof(*out->ksizes));
for (i=0; i<out->nkeys; i++) {
ret = hg_proc_hg_size_t(proc, &(out->ksizes[i]));
if(ret != HG_SUCCESS) return ret;
}
/* decode each key */
out->keys = (kv_data_t *)malloc(out->nkeys*sizeof(kv_data_t));
for (i=0; i<out->nkeys; i++) {
if(out->ksizes[i] == 0) {
out->keys[i] = NULL;
continue;
}
out->keys[i] = (kv_data_t)malloc(out->ksizes[i]);
ret = hg_proc_raw(proc, out->keys[i], out->ksizes[i]);
if(ret != HG_SUCCESS) return ret;
}
} else {
out->ksizes = NULL;
out->keys = NULL;
}
break;
case HG_FREE:
for (i=0; i<out->nkeys; i++) {
free(out->keys[i]);
}
free(out->keys);
free(out->ksizes);
break;
default:
break;
}
switch(hg_proc_get_op(proc)) {
case HG_ENCODE:
/* encode the size of values, if present */
for(i=0; i <out->nvalues; i++) {
ret = hg_proc_hg_size_t(proc, &(out->vsizes[i]));
if(ret != HG_SUCCESS) return ret;
}
/* encode the values, if present */
for(i=0; i < out->nvalues; i++) {
ret = hg_proc_raw(proc, out->values[i], out->vsizes[i]);
if(ret != HG_SUCCESS) return ret;
}
break;
case HG_DECODE:
if(out->nvalues != 0) {
/* decode the size of each value */
out->vsizes = (hg_size_t*)malloc(out->nvalues*sizeof(*out->vsizes));
for( i=0; i < out->nvalues; i++) {
ret = hg_proc_hg_size_t(proc, &(out->vsizes[i]));
if(ret != HG_SUCCESS) return ret;
}
/* decode each value */
out->values = (kv_data_t *)malloc(out->nvalues*sizeof(kv_data_t));
for(i=0; i < out->nvalues; i++) {
if(out->vsizes[i] == 0) {
out->values[i] = NULL;
continue;
}
out->values[i] = (kv_data_t)malloc(out->vsizes[i]);
ret = hg_proc_raw(proc, out->values[i], out->vsizes[i]);
if(ret != HG_SUCCESS) return ret;
}
} else {
out->vsizes = NULL;
out->values = NULL;
}
break;
case HG_FREE:
for(i=0; i < out->nvalues; i++) {
free(out->values[i]);
}
free(out->values);
free(out->vsizes);
break;
default:
break;
}
/* encode/decode the return value */
ret = hg_proc_int32_t(proc, &out->ret);
ret = hg_proc_hg_bulk_t(proc, &in->keys_bulk_handle);
return ret;
}
MERCURY_GEN_PROC(put_out_t, ((int32_t)(ret)))
MERCURY_GEN_PROC(length_out_t, ((hg_size_t)(size)) ((int32_t)(ret)))
MERCURY_GEN_PROC(open_in_t,
((hg_string_t)(name)))
MERCURY_GEN_PROC(open_out_t, ((uint64_t)(db_id)) ((int32_t)(ret)))
// ------------- BULK ------------- //
// for handling bulk puts/gets (e.g. for ParSplice use case)
typedef struct {
......@@ -451,21 +343,14 @@ static inline hg_return_t hg_proc_kv_bulk_t(hg_proc_t proc, void *data)
return HG_SUCCESS;
}
// ------------- BULK PUT ------------- //
MERCURY_GEN_PROC(bulk_put_in_t, ((kv_bulk_t)(bulk)))
MERCURY_GEN_PROC(bulk_put_out_t, ((int32_t)(ret)))
// ------------- BULK GET ------------- //
MERCURY_GEN_PROC(bulk_get_in_t, ((kv_bulk_t)(bulk)))
MERCURY_GEN_PROC(bulk_get_out_t, ((hg_size_t)(size)) ((int32_t)(ret)))
/**
 * Mercury proc routine for a double.
 *
 * The value is handed to hg_proc_raw as an opaque block of sizeof(double)
 * bytes, so the same call serves every proc operation.
 *
 * @param proc  Mercury proc handle driving the operation.
 * @param data  pointer to the double to process.
 * @return the status returned by hg_proc_raw (HG_SUCCESS on success).
 *
 * The status is propagated to the caller instead of being checked with
 * assert(): with NDEBUG defined the assert vanished and a failed transfer was
 * reported as HG_SUCCESS. This also matches the other proc routines in this
 * file, which return the failing status.
 */
static inline hg_return_t hg_proc_double(hg_proc_t proc, void *data)
{
    return hg_proc_raw(proc, data, sizeof(double));
}
#endif
......@@ -621,59 +621,125 @@ static void sdskv_list_keys_ult(hg_handle_t handle)
hg_return_t hret;
list_keys_in_t in;
list_keys_out_t out;
hg_bulk_t ksizes_local_bulk = HG_BULK_NULL;
hg_bulk_t keys_local_bulk = HG_BULK_NULL;
out.ret = -1;
out.nkeys = 0;
out.ksizes = nullptr;
out.keys = nullptr;
out.nvalues = 0;
out.vsizes = nullptr;
out.values = nullptr;
/* get the provider handling this request */
margo_instance_id mid = margo_hg_handle_get_instance(handle);
assert(mid);
const struct hg_info* info = margo_get_info(handle);
sdskv_provider_t svr_ctx =
(sdskv_provider_t)margo_registered_data_mplex(mid, info->id, info->target_id);
if(!svr_ctx) {
fprintf(stderr, "Error: SDSKV could not find provider\n");
std::cerr << "Error: SDSKV list_keys could not find provider" << std::endl;
margo_respond(handle, &out);
margo_destroy(handle);
return;
}
/* get the input */
hret = margo_get_input(handle, &in);
if(hret != HG_SUCCESS) {
std::cerr << "Error: SDSKV list_keys could not get RPC input" << std::endl;
margo_respond(handle, &out);
margo_destroy(handle);
return;
}
auto it = svr_ctx->databases.find(in.db_id);
if(it == svr_ctx->databases.end()) {
margo_respond(handle, &out);
margo_free_input(handle, &in);
margo_destroy(handle);
return;
}
ds_bulk_t start_kdata(in.start_key, in.start_key+in.start_ksize);
auto keys = it->second->list_keys(start_kdata, in.max_keys);
out.nkeys = keys.size();
/* create the array of sizes */
std::vector<hg_size_t> sizes(out.nkeys);
for(unsigned i = 0; i < out.nkeys; i++) {
sizes[i] = keys[i].size();
}
out.ksizes = sizes.data();
/* create the packed data */
std::vector<kv_data_t> packed_keys(keys.size());
for(unsigned i = 0; i < out.nkeys; i++) {
packed_keys[i] = (char*)(keys[i].data());
try {
/* find the database targeted */
auto it = svr_ctx->databases.find(in.db_id);
if(it == svr_ctx->databases.end()) {
std::cerr << "Error: SDSKV list_keys could not get database with id " << in.db_id << std::endl;
throw -1;
}
auto db = it->second;
/* create a bulk handle to receive and send key sizes from client */
std::vector<hg_size_t> ksizes(in.max_keys);
std::vector<void*> ksizes_addr(1);
ksizes_addr[0] = (void*)ksizes.data();
hg_size_t ksizes_bulk_size = ksizes.size()*sizeof(hg_size_t);
hret = margo_bulk_create(mid, 1, ksizes_addr.data(),
&ksizes_bulk_size, HG_BULK_READWRITE, &ksizes_local_bulk);
if(hret != HG_SUCCESS) {
std::cerr << "Error: SDSKV list_keys could not create bulk handle (ksizes_local_bulk)" << std::endl;
throw -1;
}
/* receive the key sizes from the client */
hg_addr_t origin_addr = info->addr;
hret = margo_bulk_transfer(mid, HG_BULK_PULL, origin_addr,
in.ksizes_bulk_handle, 0, ksizes_local_bulk, 0, ksizes_bulk_size);
if(hret != HG_SUCCESS) {
std::cerr << "Error: SDSKV list_keys could not issue bulk transfer "
<< "(pull from in.ksizes_bulk_handle to ksizes_local_bulk)" << std::endl;
throw -1;
}
/* get the keys from the underlying database */
ds_bulk_t start_kdata(in.start_key, in.start_key+in.start_ksize);
auto keys = db->list_keys(start_kdata, in.max_keys);
hg_size_t num_keys = std::min(keys.size(), in.max_keys);
/* create the array of actual sizes */
std::vector<hg_size_t> true_ksizes(num_keys);
hg_size_t keys_bulk_size = 0;
for(unsigned i = 0; i < num_keys; i++) {
true_ksizes[i] = keys[i].size();
if(true_ksizes[i] > ksizes[i]) {
// this key has a size that exceeds the allocated size on client
throw -1;
} else {
ksizes[i] = true_ksizes[i];
keys_bulk_size += ksizes[i];
}