Commit fbb7e414 authored by Matthieu Dorier's avatar Matthieu Dorier

greatly simplified the RPC types

parent a7c8ccb6
......@@ -212,10 +212,10 @@ int sdskv_put(sdskv_provider_handle_t provider,
put_out_t out;
in.db_id = db_id;
in.key = (kv_data_t)key;
in.ksize = ksize;
in.value = (kv_data_t)value;
in.vsize = vsize;
in.key.data = (kv_ptr_t)key;
in.key.size = ksize;
in.value.data = (kv_ptr_t)value;
in.value.size = vsize;
/* create handle */
hret = margo_create(
......@@ -259,13 +259,13 @@ int sdskv_put(sdskv_provider_handle_t provider,
bulk_put_in_t in;
bulk_put_out_t out;
in.bulk.db_id = db_id;
in.bulk.key = (kv_data_t)key;
in.bulk.ksize = ksize;
in.bulk.vsize = vsize;
in.db_id = db_id;
in.key.data = (kv_ptr_t)key;
in.key.size = ksize;
in.vsize = vsize;
hret = margo_bulk_create(provider->client->mid, 1, (void**)(&value), &in.bulk.vsize,
HG_BULK_READ_ONLY, &in.bulk.handle);
hret = margo_bulk_create(provider->client->mid, 1, (void**)(&value), &in.vsize,
HG_BULK_READ_ONLY, &in.handle);
if(hret != HG_SUCCESS) {
fprintf(stderr,"[SDSKV] margo_bulk_create() failed in sdskv_put()\n");
return -1;
......@@ -279,7 +279,7 @@ int sdskv_put(sdskv_provider_handle_t provider,
&handle);
if(hret != HG_SUCCESS) {
fprintf(stderr,"[SDSKV] margo_create() failed in sdskv_put()\n");
margo_bulk_free(in.bulk.handle);
margo_bulk_free(in.handle);
return -1;
}
......@@ -287,7 +287,7 @@ int sdskv_put(sdskv_provider_handle_t provider,
if(hret != HG_SUCCESS) {
fprintf(stderr,"[SDSKV] margo_set_target_id() failed in sdskv_put()\n");
margo_bulk_free(in.bulk.handle);
margo_bulk_free(in.handle);
margo_destroy(handle);
return -1;
}
......@@ -295,7 +295,7 @@ int sdskv_put(sdskv_provider_handle_t provider,
hret = margo_forward(handle, &in);
if(hret != HG_SUCCESS) {
fprintf(stderr,"[SDSKV] margo_forward() failed in sdskv_put()\n");
margo_bulk_free(in.bulk.handle);
margo_bulk_free(in.handle);
margo_destroy(handle);
return -1;
}
......@@ -303,14 +303,14 @@ int sdskv_put(sdskv_provider_handle_t provider,
hret = margo_get_output(handle, &out);
if(hret != HG_SUCCESS) {
fprintf(stderr,"[SDSKV] margo_get_output() failed in sdskv_put()\n");
margo_bulk_free(in.bulk.handle);
margo_bulk_free(in.handle);
margo_destroy(handle);
return -1;
}
ret = out.ret;
margo_free_output(handle, &out);
margo_bulk_free(in.bulk.handle);
margo_bulk_free(in.handle);
}
margo_destroy(handle);
......@@ -337,8 +337,8 @@ int sdskv_get(sdskv_provider_handle_t provider,
get_out_t out;
in.db_id = db_id;
in.key = (kv_data_t)key;
in.ksize = ksize;
in.key.data = (kv_ptr_t)key;
in.key.size = ksize;
in.vsize = size;
/* create handle */
......@@ -369,10 +369,10 @@ int sdskv_get(sdskv_provider_handle_t provider,
}
ret = out.ret;
*vsize = (hg_size_t)out.vsize;
*vsize = (hg_size_t)out.value.size;
if (out.vsize > 0) {
memcpy(value, out.value, out.vsize);
if (out.value.size > 0) {
memcpy(value, out.value.data, out.value.size);
}
margo_free_output(handle, &out);
......@@ -382,13 +382,13 @@ int sdskv_get(sdskv_provider_handle_t provider,
bulk_get_in_t in;
bulk_get_out_t out;
in.bulk.db_id = db_id;
in.bulk.key = (kv_data_t)key;
in.bulk.ksize = ksize;
in.bulk.vsize = size;
in.db_id = db_id;
in.key.data = (kv_ptr_t)key;
in.key.size = ksize;
in.vsize = size;
hret = margo_bulk_create(provider->client->mid, 1, &value, &in.bulk.vsize,
HG_BULK_WRITE_ONLY, &in.bulk.handle);
hret = margo_bulk_create(provider->client->mid, 1, &value, &in.vsize,
HG_BULK_WRITE_ONLY, &in.handle);
if(hret != HG_SUCCESS) return -1;
/* create handle */
......@@ -398,27 +398,27 @@ int sdskv_get(sdskv_provider_handle_t provider,
provider->client->sdskv_bulk_get_id,
&handle);
if(hret != HG_SUCCESS) {
margo_bulk_free(in.bulk.handle);
margo_bulk_free(in.handle);
return -1;
}
hret = margo_set_target_id(handle, provider->mplex_id);
if(hret != HG_SUCCESS) {
margo_bulk_free(in.bulk.handle);
margo_bulk_free(in.handle);
margo_destroy(handle);
return -1;
}
hret = margo_forward(handle, &in);
if(hret != HG_SUCCESS) {
margo_bulk_free(in.bulk.handle);
margo_bulk_free(in.handle);
margo_destroy(handle);
return -1;
}
hret = margo_get_output(handle, &out);
if(hret != HG_SUCCESS) {
margo_bulk_free(in.bulk.handle);
margo_bulk_free(in.handle);
margo_destroy(handle);
return -1;
}
......@@ -427,7 +427,7 @@ int sdskv_get(sdskv_provider_handle_t provider,
*vsize = (hg_size_t)out.size;
margo_free_output(handle, &out);
margo_bulk_free(in.bulk.handle);
margo_bulk_free(in.handle);
}
margo_destroy(handle);
......@@ -446,8 +446,8 @@ int sdskv_length(sdskv_provider_handle_t provider,
length_out_t out;
in.db_id = db_id;
in.key = (kv_data_t)key;
in.ksize = ksize;
in.key.data = (kv_ptr_t)key;
in.key.size = ksize;
/* create handle */
hret = margo_create(
......@@ -495,8 +495,8 @@ int sdskv_erase(sdskv_provider_handle_t provider,
erase_out_t out;
in.db_id = db_id;
in.key = (kv_data_t)key;
in.ksize = ksize;
in.key.data = (kv_ptr_t)key;
in.key.size = ksize;
/* create handle */
hret = margo_create(
......@@ -552,8 +552,8 @@ int sdskv_list_keys(sdskv_provider_handle_t provider,
int i;
in.db_id = db_id;
in.start_key = (kv_data_t) start_key;
in.start_ksize = start_ksize;
in.start_key.data = (kv_ptr_t) start_key;
in.start_key.size = start_ksize;
in.max_keys = *max_keys;
/* create bulk handle to expose the segments with key sizes */
......@@ -653,8 +653,8 @@ int sdskv_list_keyvals(sdskv_provider_handle_t provider,
int i;
in.db_id = db_id;
in.start_key = (kv_data_t) start_key;
in.start_ksize = start_ksize;
in.start_key.data = (kv_ptr_t) start_key;
in.start_key.size = start_ksize;
in.max_keys = *max_keys;
/* create bulk handle to expose the segments with key sizes */
......@@ -748,83 +748,7 @@ finish:
return ret;
}
#if 0
int sdskv_list_keyvals(
sdskv_provider_handle_t provider,
sdskv_database_id_t db_id,
const void *start_key,
hg_size_t start_ksize,
void **keys,
hg_size_t* ksizes,
void** values,
hg_size_t* vsizes,
hg_size_t* max_keys)
{
list_in_t in;
list_out_t out;
hg_return_t hret = HG_SUCCESS;
hg_handle_t handle;
int ret;
int i;
in.db_id = db_id;
in.start_key = (kv_data_t) start_key;
in.start_ksize = start_ksize;
in.max_keys = *max_keys;
/* create handle */
hret = margo_create(
provider->client->mid,
provider->addr,
provider->client->sdskv_list_keyvals_id,
&handle);
if(hret != HG_SUCCESS) return -1;
hret = margo_set_target_id(handle, provider->mplex_id);
if(hret != HG_SUCCESS) {
margo_destroy(handle);
return -1;
}
hret = margo_forward(handle, &in);
if(hret != HG_SUCCESS) {
margo_destroy(handle);
return -1;
}
hret = margo_get_output(handle, &out);
if(hret != HG_SUCCESS) {
margo_destroy(handle);
return -1;
}
*max_keys = out.nkeys;
ret = out.ret;
if(ret == 0) {
hg_size_t s;
for (i=0; i < out.nkeys; i++) {
s = ksizes[i] > out.ksizes[i] ? out.ksizes[i] : ksizes[i];
ksizes[i] = s;
memcpy(keys[i], out.keys[i], s);
if(s < out.ksizes[i]) ret = -1; // truncated key
}
for(i=0; i< out.nvalues; i++) {
s = vsizes[i] > out.vsizes[i] ? out.vsizes[i] : vsizes[i];
vsizes[i] = s;
memcpy(values[i], out.values[i], s);
if(s < out.vsizes[i]) return -1; // truncated value
}
}
margo_free_output(handle, &out);
margo_destroy(handle);
return ret;
}
#endif
int sdskv_shutdown_service(sdskv_client_t client, hg_addr_t addr)
{
return margo_shutdown_remote_instance(client->mid, addr);
......
This diff is collapsed.
......@@ -206,8 +206,8 @@ static void sdskv_put_ult(hg_handle_t handle)
return;
}
ds_bulk_t kdata(in.key, in.key+in.ksize);
ds_bulk_t vdata(in.value, in.value+in.vsize);
ds_bulk_t kdata(in.key.data, in.key.data+in.key.size);
ds_bulk_t vdata(in.value.data, in.value.data+in.value.size);
if(it->second->put(kdata, vdata)) {
out.ret = 0;
......@@ -258,7 +258,7 @@ static void sdskv_length_ult(hg_handle_t handle)
return;
}
ds_bulk_t kdata(in.key, in.key+in.ksize);
ds_bulk_t kdata(in.key.data, in.key.data+in.key.size);
ds_bulk_t vdata;
if(it->second->get(kdata, vdata)) {
......@@ -300,8 +300,8 @@ static void sdskv_get_ult(hg_handle_t handle)
hret = margo_get_input(handle, &in);
if(hret != HG_SUCCESS) {
out.ret = -1;
out.value = nullptr;
out.vsize = 0;
out.value.data = nullptr;
out.value.size = 0;
margo_respond(handle, &out);
margo_destroy(handle);
return;
......@@ -310,30 +310,30 @@ static void sdskv_get_ult(hg_handle_t handle)
auto it = svr_ctx->databases.find(in.db_id);
if(it == svr_ctx->databases.end()) {
out.ret = -1;
out.value = nullptr;
out.vsize = 0;
out.value.data = nullptr;
out.value.size = 0;
margo_respond(handle, &out);
margo_free_input(handle, &in);
margo_destroy(handle);
return;
}
ds_bulk_t kdata(in.key, in.key+in.ksize);
ds_bulk_t kdata(in.key.data, in.key.data+in.key.size);
ds_bulk_t vdata;
if(it->second->get(kdata, vdata)) {
if(vdata.size() <= in.vsize) {
out.vsize = vdata.size();
out.value = vdata.data();
out.value.size = vdata.size();
out.value.data = vdata.data();
out.ret = 0;
} else {
out.vsize = 0;
out.value = nullptr;
out.value.size = 0;
out.value.data = nullptr;
out.ret = -1;
}
} else {
out.vsize = 0;
out.value = nullptr;
out.value.size = 0;
out.value.data = nullptr;
out.ret = -1;
}
......@@ -421,7 +421,7 @@ static void sdskv_bulk_put_ult(hg_handle_t handle)
return;
}
auto it = svr_ctx->databases.find(in.bulk.db_id);
auto it = svr_ctx->databases.find(in.db_id);
if(it == svr_ctx->databases.end()) {
out.ret = -1;
margo_respond(handle, &out);
......@@ -430,7 +430,7 @@ static void sdskv_bulk_put_ult(hg_handle_t handle)
return;
}
ds_bulk_t vdata(in.bulk.vsize);
ds_bulk_t vdata(in.vsize);
void *buffer = (void*)vdata.data();
hg_size_t size = vdata.size();
hret = margo_bulk_create(mid, 1, (void**)&buffer, &size,
......@@ -443,7 +443,7 @@ static void sdskv_bulk_put_ult(hg_handle_t handle)
return;
}
hret = margo_bulk_transfer(mid, HG_BULK_PULL, info->addr, in.bulk.handle, 0,
hret = margo_bulk_transfer(mid, HG_BULK_PULL, info->addr, in.handle, 0,
bulk_handle, 0, vdata.size());
if(hret != HG_SUCCESS) {
out.ret = -1;
......@@ -454,7 +454,7 @@ static void sdskv_bulk_put_ult(hg_handle_t handle)
return;
}
ds_bulk_t kdata(in.bulk.key, in.bulk.key+in.bulk.ksize);
ds_bulk_t kdata(in.key.data, in.key.data+in.key.size);
auto b = it->second->put(kdata, vdata);
......@@ -502,7 +502,7 @@ static void sdskv_bulk_get_ult(hg_handle_t handle)
return;
}
auto it = svr_ctx->databases.find(in.bulk.db_id);
auto it = svr_ctx->databases.find(in.db_id);
if(it == svr_ctx->databases.end()) {
out.ret = -1;
margo_respond(handle, &out);
......@@ -511,12 +511,12 @@ static void sdskv_bulk_get_ult(hg_handle_t handle)
return;
}
ds_bulk_t kdata(in.bulk.key, in.bulk.key+in.bulk.ksize);
ds_bulk_t kdata(in.key.data, in.key.data+in.key.size);
ds_bulk_t vdata;
auto b = it->second->get(kdata, vdata);
if(!b || vdata.size() > in.bulk.vsize) {
if(!b || vdata.size() > in.vsize) {
out.size = 0;
out.ret = -1;
margo_respond(handle, &out);
......@@ -538,7 +538,7 @@ static void sdskv_bulk_get_ult(hg_handle_t handle)
return;
}
hret = margo_bulk_transfer(mid, HG_BULK_PUSH, info->addr, in.bulk.handle, 0,
hret = margo_bulk_transfer(mid, HG_BULK_PUSH, info->addr, in.handle, 0,
bulk_handle, 0, vdata.size());
if(hret != HG_SUCCESS) {
out.size = 0;
......@@ -599,7 +599,7 @@ static void sdskv_erase_ult(hg_handle_t handle)
return;
}
ds_bulk_t kdata(in.key, in.key+in.ksize);
ds_bulk_t kdata(in.key.data, in.key.data+in.key.size);
if(it->second->erase(kdata)) {
out.ret = 0;
......@@ -686,7 +686,7 @@ static void sdskv_list_keys_ult(hg_handle_t handle)
std::vector<hg_size_t> remote_ksizes(ksizes.begin(), ksizes.end());
/* get the keys from the underlying database */
ds_bulk_t start_kdata(in.start_key, in.start_key+in.start_ksize);
ds_bulk_t start_kdata(in.start_key.data, in.start_key.data+in.start_key.size);
auto keys = db->list_keys(start_kdata, in.max_keys);
hg_size_t num_keys = std::min(keys.size(), in.max_keys);
......@@ -854,7 +854,7 @@ static void sdskv_list_keyvals_ult(hg_handle_t handle)
std::vector<hg_size_t> remote_vsizes(vsizes.begin(), vsizes.end());
/* get the keys and values from the underlying database */
ds_bulk_t start_kdata(in.start_key, in.start_key+in.start_ksize);
ds_bulk_t start_kdata(in.start_key.data, in.start_key.data+in.start_key.size);
auto keyvals = db->list_keyvals(start_kdata, in.max_keys);
hg_size_t num_keys = std::min(keyvals.size(), in.max_keys);
......@@ -981,89 +981,6 @@ static void sdskv_list_keyvals_ult(hg_handle_t handle)
}
DEFINE_MARGO_RPC_HANDLER(sdskv_list_keyvals_ult)
#if 0
static void sdskv_list_keyvals_ult(hg_handle_t handle)
{
hg_return_t hret;
list_in_t in;
list_out_t out;
out.ret = -1;
out.nkeys = 0;
out.ksizes = nullptr;
out.keys = nullptr;
out.nvalues = 0;
out.vsizes = nullptr;
out.values = nullptr;
margo_instance_id mid = margo_hg_handle_get_instance(handle);
assert(mid);
const struct hg_info* info = margo_get_info(handle);
sdskv_provider_t svr_ctx =
(sdskv_provider_t)margo_registered_data_mplex(mid, info->id, info->target_id);
if(!svr_ctx) {
fprintf(stderr, "Error: SDSKV could not find provider\n");
margo_respond(handle, &out);
margo_destroy(handle);
return;
}
hret = margo_get_input(handle, &in);
if(hret != HG_SUCCESS) {
margo_respond(handle, &out);
margo_destroy(handle);
return;
}
auto it = svr_ctx->databases.find(in.db_id);
if(it == svr_ctx->databases.end()) {
margo_respond(handle, &out);
margo_free_input(handle, &in);
margo_destroy(handle);
return;
}
ds_bulk_t start_kdata(in.start_key, in.start_key+in.start_ksize);
auto keyvals = it->second->list_keyvals(start_kdata, in.max_keys);
out.nkeys = keyvals.size();
out.nvalues = keyvals.size();
/* create the array of sizes */
std::vector<hg_size_t> sizes(out.nkeys);
for(unsigned i = 0; i < out.nkeys; i++) {
sizes[i] = keyvals[i].first.size();
}
out.ksizes = sizes.data();
/* create the packed data */
std::vector<kv_data_t> packed_keys(out.nkeys);
for(unsigned i = 0; i < out.nkeys; i++) {
packed_keys[i] = (char*)(keyvals[i].first.data());
}
out.keys = packed_keys.data();
/* create the array of value sizes */
std::vector<hg_size_t> vsizes(out.nvalues);
for(unsigned i = 0; i < out.nvalues; i++) {
vsizes[i] = keyvals[i].second.size();
}
out.vsizes = vsizes.data();
/* create the packed value data */
std::vector<kv_data_t> packed_vals(out.nvalues);
for(unsigned i = 0; i < out.nvalues; i++) {
packed_vals[i] = (char*)(keyvals[i].second.data());
}
out.values = packed_vals.data();
out.ret = 0;
margo_respond(handle, &out);
margo_free_input(handle, &in);
margo_destroy(handle);
return;
}
DEFINE_MARGO_RPC_HANDLER(sdskv_list_keyvals_ult)
#endif
static void sdskv_server_finalize_cb(void *data)
{
sdskv_provider_t svr_ctx = (sdskv_provider_t)data;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment