Commit caff79d8 authored by Matthieu Dorier's avatar Matthieu Dorier
Browse files

made list_out_t more generic

parent 3786283d
......@@ -69,7 +69,7 @@ include_HEADERS = include/sdskv-client.h \
include/sdskv-common.h
noinst_HEADERS = src/sds-keyval.h \
src/sds-leyval-group.h \
src/sds-keyval-group.h \
src/sdskv-rpc-types.h \
src/datastore/datastore.h \
src/datastore/map_datastore.h \
......
......@@ -53,7 +53,8 @@ int sdskv_erase(sdskv_provider_handle_t db,
sdskv_database_id_t db_id, const void *key,
hg_size_t ksize);
int sdskv_list_keys(sdskv_provider_handle_t provider,
int sdskv_list_keys(
sdskv_provider_handle_t provider,
sdskv_database_id_t db_id, // db instance
const void *start_key, // we want keys strictly after this start_key
hg_size_t start_ksize, // size of the start_key
......@@ -64,7 +65,8 @@ int sdskv_list_keys(sdskv_provider_handle_t provider,
// keys for each key
hg_size_t* max_keys); // maximum number of keys requested
int sdskv_list_keys_with_prefix(sdskv_provider_handle_t provider,
int sdskv_list_keys_with_prefix(
sdskv_provider_handle_t provider,
sdskv_database_id_t db_id, // db instance
const void *start_key, // we want keys strictly after this start_key
hg_size_t start_ksize, // size of the start_key
......@@ -77,6 +79,31 @@ int sdskv_list_keys_with_prefix(sdskv_provider_handle_t provider,
// keys for each key
hg_size_t* max_keys); // maximum number of keys requested
int sdskv_list_keyvals(
sdskv_provider_handle_t provider,
sdskv_database_id_t db_id,
const void *start_key,
hg_size_t start_ksize,
void **keys,
hg_size_t* ksizes,
void **values,
hg_size_t* vsizes,
hg_size_t* max_items);
int sdskv_list_keyvals_with_prefix(
sdskv_provider_handle_t provider,
sdskv_database_id_t db_id,
const void *start_key,
hg_size_t start_ksize,
const void *prefix,
hg_size_t prefix_size,
void **keys,
hg_size_t* ksizes,
void **values,
hg_size_t* vsizes,
hg_size_t* max_items);
/**
* Shuts down a remote SDSKV service (given an address).
* This will shutdown all the providers on the target address.
......
......@@ -13,7 +13,7 @@ struct sdskv_client {
hg_id_t sdskv_length_id;
hg_id_t sdskv_bulk_get_id;
hg_id_t sdskv_open_id;
hg_id_t sdskv_list_id;
hg_id_t sdskv_list_keys_id;
uint64_t num_provider_handles;
};
......@@ -43,7 +43,7 @@ static int sdskv_client_register(sdskv_client_t client, margo_instance_id mid)
margo_registered_name(mid, "sdskv_length_rpc", &client->sdskv_length_id, &flag);
margo_registered_name(mid, "sdskv_bulk_get_rpc", &client->sdskv_bulk_get_id, &flag);
margo_registered_name(mid, "sdskv_open_rpc", &client->sdskv_open_id, &flag);
margo_registered_name(mid, "sdskv_list_rpc", &client->sdskv_list_id, &flag);
margo_registered_name(mid, "sdskv_list_keys_rpc", &client->sdskv_list_keys_id, &flag);
} else {
......@@ -61,8 +61,8 @@ static int sdskv_client_register(sdskv_client_t client, margo_instance_id mid)
MARGO_REGISTER(mid, "sdskv_bulk_get_rpc", bulk_get_in_t, bulk_get_out_t, NULL);
client->sdskv_open_id =
MARGO_REGISTER(mid, "sdskv_open_rpc", open_in_t, open_out_t, NULL);
client->sdskv_list_id =
MARGO_REGISTER(mid, "sdskv_list_rpc", list_in_t, list_out_t, NULL);
client->sdskv_list_keys_id =
MARGO_REGISTER(mid, "sdskv_list_keys_rpc", list_in_t, list_out_t, NULL);
}
return 0;
......@@ -554,7 +554,7 @@ int sdskv_list_keys(sdskv_provider_handle_t provider,
hret = margo_create(
provider->client->mid,
provider->addr,
provider->client->sdskv_list_id,
provider->client->sdskv_list_keys_id,
&handle);
if(hret != HG_SUCCESS) return -1;
......
......@@ -241,6 +241,9 @@ typedef struct {
hg_size_t nkeys;
kv_data_t *keys;
hg_size_t *ksizes;
hg_size_t nvalues;
kv_data_t *values;
hg_size_t *vsizes;
int32_t ret;
} list_out_t;
......@@ -280,47 +283,109 @@ static inline hg_return_t hg_proc_list_out_t(hg_proc_t proc, void *data)
hg_return_t ret;
unsigned int i;
list_out_t *out = (list_out_t*)data;
/* encode/decode the number of keys */
ret = hg_proc_hg_size_t(proc, &out->nkeys);
if(ret != HG_SUCCESS) return ret;
/* encode/decode the number values */
ret = hg_proc_hg_size_t(proc, &out->nvalues);
if(ret != HG_SUCCESS) return ret;
if (out->nkeys) {
switch(hg_proc_get_op(proc)) {
switch(hg_proc_get_op(proc)) {
case HG_ENCODE:
for (i=0; i<out->nkeys; i++) {
ret = hg_proc_raw(proc, &(out->ksizes[i]),
sizeof(*(out->ksizes)) );
if(ret != HG_SUCCESS) return ret;
}
for (i=0; i<out->nkeys; i++) {
ret = hg_proc_raw(proc, out->keys[i], out->ksizes[i]);
if(ret != HG_SUCCESS) return ret;
}
break;
case HG_DECODE:
out->ksizes =
(hg_size_t*)malloc(out->nkeys*sizeof(*out->ksizes));
for (i=0; i<out->nkeys; i++) {
ret = hg_proc_raw(proc, &(out->ksizes[i]),
sizeof(*out->ksizes));
if(ret != HG_SUCCESS) return ret;
}
out->keys = (kv_data_t *)malloc(out->nkeys*sizeof(kv_data_t));
for (i=0; i<out->nkeys; i++) {
out->keys[i] = (kv_data_t)malloc(out->ksizes[i]);
ret = hg_proc_raw(proc, out->keys[i], out->ksizes[i]);
if(ret != HG_SUCCESS) return ret;
}
break;
/* encode the size of each key */
for (i=0; i<out->nkeys; i++) {
ret = hg_proc_hg_size_t(proc, &(out->ksizes[i]));
if(ret != HG_SUCCESS) return ret;
}
/* encode each key */
for (i=0; i<out->nkeys; i++) {
ret = hg_proc_raw(proc, out->keys[i], out->ksizes[i]);
if(ret != HG_SUCCESS) return ret;
}
/* encode the size of values, if present */
if(out->vsizes) {
for(i=0; i<out->nvalues; i++) {
ret = hg_proc_hg_size_t(proc, &(out->vsizes[i]));
if(ret != HG_SUCCESS) return ret;
}
}
/* encode the values, if present */
if(out->values) {
for(i=0; i<out->nvalues; i++) {
ret = hg_proc_raw(proc, out->values[i], out->vsizes[i]);
if(ret != HG_SUCCESS) return ret;
}
}
break;
case HG_DECODE:
if(out->nkeys) {
/* decode the size of each key */
out->ksizes = (hg_size_t*)malloc(out->nkeys*sizeof(*out->ksizes));
for (i=0; i<out->nkeys; i++) {
ret = hg_proc_hg_size_t(proc, &(out->ksizes[i]));
if(ret != HG_SUCCESS) return ret;
}
/* decode each key */
out->keys = (kv_data_t *)malloc(out->nkeys*sizeof(kv_data_t));
for (i=0; i<out->nkeys; i++) {
if(out->ksizes[i] == 0) {
out->keys[i] = NULL;
continue;
}
out->keys[i] = (kv_data_t)malloc(out->ksizes[i]);
ret = hg_proc_raw(proc, out->keys[i], out->ksizes[i]);
if(ret != HG_SUCCESS) return ret;
}
} else {
out->ksizes = NULL;
out->keys = NULL;
}
if(out->nvalues) {
/* decode the size of each value */
out->vsizes = (hg_size_t*)malloc(out->nvalues*sizeof(*out->vsizes));
for( i=0; i<out->nvalues; i++) {
ret = hg_proc_hg_size_t(proc, &(out->vsizes[i]));
if(ret != HG_SUCCESS) return ret;
}
/* decode each key */
out->values = (kv_data_t *)malloc(out->nvalues*sizeof(kv_data_t));
for(i=0; i<out->nvalues; i++) {
if(out->vsizes[i] == 0) {
out->values[i] = NULL;
continue;
}
out->values[i] = (kv_data_t)malloc(out->vsizes[i]);
ret = hg_proc_raw(proc, out->values[i], out->vsizes[i]);
if(ret != HG_SUCCESS) return ret;
}
} else {
out->vsizes = NULL;
out->values = NULL;
}
break;
case HG_FREE:
for (i=0; i<out->nkeys; i++) {
free(out->keys[i]);
}
free(out->keys);
free(out->ksizes);
break;
for (i=0; i<out->nkeys; i++) {
free(out->keys[i]);
}
for(i=0; i<out->nvalues; i++) {
free(out->values[i]);
}
free(out->keys);
free(out->ksizes);
free(out->values);
free(out->vsizes);
break;
default:
break;
}
break;
}
}
/* encode/decode the return value */
ret = hg_proc_int32_t(proc, &out->ret);
return ret;
}
......
......@@ -25,7 +25,7 @@ DECLARE_MARGO_RPC_HANDLER(sdskv_get_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_open_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_bulk_put_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_bulk_get_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_list_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_list_keys_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_erase_ult)
static void sdskv_server_finalize_cb(void *data);
......@@ -80,9 +80,9 @@ extern "C" int sdskv_provider_register(
open_in_t, open_out_t,
sdskv_open_ult, mplex_id, abt_pool);
margo_register_data_mplex(mid, rpc_id, mplex_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_MPLEX(mid, "sdskv_list_rpc",
rpc_id = MARGO_REGISTER_MPLEX(mid, "sdskv_list_keys_rpc",
list_in_t, list_out_t,
sdskv_list_ult, mplex_id, abt_pool);
sdskv_list_keys_ult, mplex_id, abt_pool);
margo_register_data_mplex(mid, rpc_id, mplex_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_MPLEX(mid, "sdskv_erase_rpc",
erase_in_t, erase_out_t,
......@@ -610,13 +610,21 @@ static void sdskv_erase_ult(hg_handle_t handle)
}
DEFINE_MARGO_RPC_HANDLER(sdskv_erase_ult)
static void sdskv_list_ult(hg_handle_t handle)
static void sdskv_list_keys_ult(hg_handle_t handle)
{
hg_return_t hret;
list_in_t in;
list_out_t out;
out.ret = -1;
out.nkeys = 0;
out.ksizes = nullptr;
out.keys = nullptr;
out.nvalues = 0;
out.vsizes = nullptr;
out.values = nullptr;
margo_instance_id mid = margo_hg_handle_get_instance(handle);
assert(mid);
const struct hg_info* info = margo_get_info(handle);
......@@ -624,10 +632,6 @@ static void sdskv_list_ult(hg_handle_t handle)
(sdskv_provider_t)margo_registered_data_mplex(mid, info->id, info->target_id);
if(!svr_ctx) {
fprintf(stderr, "Error: SDSKV could not find provider\n");
out.ret = -1;
out.nkeys = 0;
out.keys = nullptr;
out.ksizes = nullptr;
margo_respond(handle, &out);
margo_destroy(handle);
return;
......@@ -635,10 +639,6 @@ static void sdskv_list_ult(hg_handle_t handle)
hret = margo_get_input(handle, &in);
if(hret != HG_SUCCESS) {
out.ret = -1;
out.nkeys = 0;
out.keys = nullptr;
out.ksizes = nullptr;
margo_respond(handle, &out);
margo_destroy(handle);
return;
......@@ -646,10 +646,6 @@ static void sdskv_list_ult(hg_handle_t handle)
auto it = svr_ctx->databases.find(in.db_id);
if(it == svr_ctx->databases.end()) {
out.ret = -1;
out.nkeys = 0;
out.keys = nullptr;
out.ksizes = nullptr;
margo_respond(handle, &out);
margo_free_input(handle, &in);
margo_destroy(handle);
......@@ -679,7 +675,7 @@ static void sdskv_list_ult(hg_handle_t handle)
return;
}
DEFINE_MARGO_RPC_HANDLER(sdskv_list_ult)
DEFINE_MARGO_RPC_HANDLER(sdskv_list_keys_ult)
static void sdskv_server_finalize_cb(void *data)
{
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment