Commit 7e49b1e1 authored by Matthieu Dorier's avatar Matthieu Dorier

implemented get_packed

parent e2b04965
......@@ -258,6 +258,25 @@ int sdskv_get_multi(sdskv_provider_handle_t provider,
size_t num, const void* const* keys, const hg_size_t* ksizes,
void** values, hg_size_t *vsizes);
/**
* @brief Get multiple values into a single packed buffer.
*
* @param[in] provider provider handle
* @param[in] db_id database id
* @param[inout] num number of values to retrieve, number of values actually retrieved
* @param[in] keys buffer of packed keys to retrieve
* @param[in] ksizes size of the keys
* @param[in] vbufsize size of the buffer allocated for the values
* @param[out] values buffer allocated to receive packed values
* @param[out] vsizes sizes of the values
*
* @return SDSKV_SUCCESS or error code defined in sdskv-common.h
*/
int sdskv_get_packed(sdskv_provider_handle_t provider,
sdskv_database_id_t db_id,
size_t* num, const void* packed_keys, const hg_size_t* ksizes,
hg_size_t vbufsize, void* packed_values, hg_size_t *vsizes);
/**
* @brief Gets the length of a value associated with a given key.
*
......
......@@ -18,6 +18,7 @@ struct sdskv_client {
hg_id_t sdskv_bulk_put_id;
hg_id_t sdskv_get_id;
hg_id_t sdskv_get_multi_id;
hg_id_t sdskv_get_packed_id;
hg_id_t sdskv_exists_id;
hg_id_t sdskv_erase_id;
hg_id_t sdskv_erase_multi_id;
......@@ -61,10 +62,11 @@ static int sdskv_client_register(sdskv_client_t client, margo_instance_id mid)
margo_registered_name(mid, "sdskv_list_databases_rpc", &client->sdskv_list_databases_id, &flag);
margo_registered_name(mid, "sdskv_put_rpc", &client->sdskv_put_id, &flag);
margo_registered_name(mid, "sdskv_put_multi_rpc", &client->sdskv_put_multi_id, &flag);
margo_registered_name(mid, "sdskv_put_packed_rpc", &client->sdskv_put_packed_id, &flag);
margo_registered_name(mid, "sdskv_put_packed_rpc", &client->sdskv_put_packed_id, &flag);
margo_registered_name(mid, "sdskv_bulk_put_rpc", &client->sdskv_bulk_put_id, &flag);
margo_registered_name(mid, "sdskv_get_rpc", &client->sdskv_get_id, &flag);
margo_registered_name(mid, "sdskv_get_multi_rpc", &client->sdskv_get_multi_id, &flag);
margo_registered_name(mid, "sdskv_get_packed_rpc", &client->sdskv_get_packed_id, &flag);
margo_registered_name(mid, "sdskv_erase_rpc", &client->sdskv_erase_id, &flag);
margo_registered_name(mid, "sdskv_erase_multi_rpc", &client->sdskv_erase_multi_id, &flag);
margo_registered_name(mid, "sdskv_exists_rpc", &client->sdskv_exists_id, &flag);
......@@ -100,6 +102,8 @@ static int sdskv_client_register(sdskv_client_t client, margo_instance_id mid)
MARGO_REGISTER(mid, "sdskv_get_rpc", get_in_t, get_out_t, NULL);
client->sdskv_get_multi_id =
MARGO_REGISTER(mid, "sdskv_get_multi_rpc", get_multi_in_t, get_multi_out_t, NULL);
client->sdskv_get_packed_id =
MARGO_REGISTER(mid, "sdskv_get_packed_rpc", get_packed_in_t, get_packed_out_t, NULL);
client->sdskv_erase_id =
MARGO_REGISTER(mid, "sdskv_erase_rpc", erase_in_t, erase_out_t, NULL);
client->sdskv_erase_multi_id =
......@@ -1170,6 +1174,101 @@ int sdskv_length_packed(sdskv_provider_handle_t provider,
return ret;
}
int sdskv_get_packed(sdskv_provider_handle_t provider,
sdskv_database_id_t db_id,
size_t* num, const void* packed_keys, const hg_size_t* ksizes,
hg_size_t vbufsize, void* packed_vals, hg_size_t *vsizes)
{
hg_return_t hret;
int ret;
hg_handle_t handle;
get_packed_in_t in;
get_packed_out_t out;
in.db_id = db_id;
in.num_keys = *num;
in.keys_bulk_size = 0;
in.keys_bulk_handle = HG_BULK_NULL;
in.vals_bulk_size = 0;
in.vals_bulk_handle = HG_BULK_NULL;
hg_size_t total_ksize = 0;
unsigned i=0;
for(i = 0; i < *num; i++) {
total_ksize += ksizes[i];
}
/* create bulk handle to expose the packed_keys and ksizes */
void* seg_ptrs[2] = { (void*)ksizes, (void*)packed_keys };
hg_size_t seg_sizes[2] = { (*num)*sizeof(hg_size_t), total_ksize };
in.keys_bulk_size = total_ksize + (*num)*sizeof(hg_size_t);
hret = margo_bulk_create(provider->client->mid, 2, seg_ptrs, seg_sizes,
HG_BULK_READ_ONLY, &in.keys_bulk_handle);
if(hret != HG_SUCCESS) {
fprintf(stderr,"[SDSKV] margo_bulk_create() for keys/ksizes failed in sdskv_get_packed()\n");
return SDSKV_ERR_MERCURY;
}
/* create bulk handle to expose the packed_vals and vsizes */
seg_ptrs[0] = (void*)vsizes;
seg_ptrs[1] = (void*)packed_vals;
seg_sizes[0] = (*num)*sizeof(hg_size_t);
seg_sizes[1] = vbufsize;
in.vals_bulk_size = (*num)*sizeof(hg_size_t) + vbufsize;
hret = margo_bulk_create(provider->client->mid, 2, seg_ptrs, seg_sizes,
HG_BULK_WRITE_ONLY, &in.vals_bulk_handle);
if(hret != HG_SUCCESS) {
fprintf(stderr,"[SDSKV] margo_bulk_create() for vals/vsizes failed in sdskv_get_packed()\n");
margo_bulk_free(in.keys_bulk_handle);
return SDSKV_ERR_MERCURY;
}
/* create RPC handle */
hret = margo_create(
provider->client->mid,
provider->addr,
provider->client->sdskv_get_packed_id,
&handle);
if(hret != HG_SUCCESS) {
fprintf(stderr,"[SDSKV] margo_create() failed in sdskv_get_packed()\n");
margo_bulk_free(in.keys_bulk_handle);
margo_bulk_free(in.vals_bulk_handle);
return SDSKV_ERR_MERCURY;
}
/* forward RPC */
hret = margo_provider_forward(provider->provider_id, handle, &in);
if(hret != HG_SUCCESS) {
fprintf(stderr,"[SDSKV] margo_provider_forward() failed in sdskv_get_packed()\n");
margo_bulk_free(in.keys_bulk_handle);
margo_bulk_free(in.vals_bulk_handle);
margo_destroy(handle);
return SDSKV_ERR_MERCURY;
}
/* Get output */
hret = margo_get_output(handle, &out);
if(hret != HG_SUCCESS) {
fprintf(stderr,"[SDSKV] margo_get_output() failed in sdskv_get_packed()\n");
margo_bulk_free(in.keys_bulk_handle);
margo_bulk_free(in.vals_bulk_handle);
margo_destroy(handle);
return SDSKV_ERR_MERCURY;
}
ret = out.ret;
*num = out.num_keys;
margo_bulk_free(in.keys_bulk_handle);
margo_bulk_free(in.vals_bulk_handle);
margo_free_output(handle, &out);
margo_destroy(handle);
return ret;
}
int sdskv_erase(sdskv_provider_handle_t provider,
sdskv_database_id_t db_id, const void *key,
hg_size_t ksize)
......
......@@ -209,6 +209,18 @@ MERCURY_GEN_PROC(get_multi_in_t, \
((hg_size_t)(vals_bulk_size)))
MERCURY_GEN_PROC(get_multi_out_t, ((int32_t)(ret)))
// ------------- GET PACKED ------------- //
MERCURY_GEN_PROC(get_packed_in_t, \
((uint64_t)(db_id))\
((hg_size_t)(num_keys))\
((hg_size_t)(keys_bulk_size))\
((hg_bulk_t)(keys_bulk_handle))\
((hg_size_t)(vals_bulk_size))\
((hg_bulk_t)(vals_bulk_handle)))
MERCURY_GEN_PROC(get_packed_out_t, \
((int32_t)(ret))\
((hg_size_t)(num_keys)))
// ------------- LENGTH MULTI ------------- //
MERCURY_GEN_PROC(length_multi_in_t, \
((uint64_t)(db_id))\
......
......@@ -47,6 +47,7 @@ struct sdskv_server_context_t
hg_id_t sdskv_bulk_put_id;
hg_id_t sdskv_get_id;
hg_id_t sdskv_get_multi_id;
hg_id_t sdskv_get_packed_id;
hg_id_t sdskv_exists_id;
hg_id_t sdskv_erase_id;
hg_id_t sdskv_erase_multi_id;
......@@ -87,6 +88,7 @@ DECLARE_MARGO_RPC_HANDLER(sdskv_length_multi_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_length_packed_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_get_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_get_multi_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_get_packed_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_bulk_put_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_bulk_get_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_list_keys_ult)
......@@ -210,6 +212,12 @@ extern "C" int sdskv_provider_register(
tmp_svr_ctx->sdskv_get_multi_id = rpc_id;
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_PROVIDER(mid, "sdskv_get_packed_rpc",
get_packed_in_t, get_packed_out_t,
sdskv_get_packed_ult, provider_id, abt_pool);
tmp_svr_ctx->sdskv_get_packed_id = rpc_id;
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_PROVIDER(mid, "sdskv_length_rpc",
length_in_t, length_out_t,
sdskv_length_ult, provider_id, abt_pool);
......@@ -1147,6 +1155,134 @@ static void sdskv_get_multi_ult(hg_handle_t handle)
}
DEFINE_MARGO_RPC_HANDLER(sdskv_get_multi_ult)
static void sdskv_get_packed_ult(hg_handle_t handle)
{
hg_return_t hret;
get_packed_in_t in;
get_packed_out_t out;
out.ret = SDSKV_SUCCESS;
std::vector<char> local_keys_buffer;
std::vector<char> local_vals_buffer;
hg_bulk_t local_keys_bulk_handle;
hg_bulk_t local_vals_bulk_handle;
auto r1 = at_exit([&handle]() { margo_destroy(handle); });
auto r2 = at_exit([&handle,&out]() { margo_respond(handle, &out); });
/* get margo instance and provider */
margo_instance_id mid = margo_hg_handle_get_instance(handle);
const struct hg_info* info = margo_get_info(handle);
sdskv_provider_t svr_ctx =
(sdskv_provider_t)margo_registered_data(mid, info->id);
if(!svr_ctx) {
out.ret = SDSKV_ERR_UNKNOWN_PR;
return;
}
/* deserialize input */
hret = margo_get_input(handle, &in);
if(hret != HG_SUCCESS) {
out.ret = SDSKV_ERR_MERCURY;
return;
}
auto r3 = at_exit([&handle,&in]() { margo_free_input(handle, &in); });
/* find the target database */
ABT_rwlock_rdlock(svr_ctx->lock);
auto it = svr_ctx->databases.find(in.db_id);
if(it == svr_ctx->databases.end()) {
ABT_rwlock_unlock(svr_ctx->lock);
out.ret = SDSKV_ERR_UNKNOWN_DB;
return;
}
auto db = it->second;
ABT_rwlock_unlock(svr_ctx->lock);
/* allocate buffers to receive the keys */
local_keys_buffer.resize(in.keys_bulk_size);
std::vector<void*> keys_addr(1);
keys_addr[0] = (void*)local_keys_buffer.data();
/* create bulk handle to receive key sizes and packed keys */
hret = margo_bulk_create(mid, 1, keys_addr.data(), &in.keys_bulk_size,
HG_BULK_WRITE_ONLY, &local_keys_bulk_handle);
if(hret != HG_SUCCESS) {
out.ret = SDSKV_ERR_MERCURY;
return;
}
auto r6 = at_exit([&local_keys_bulk_handle]() { margo_bulk_free(local_keys_bulk_handle); });
/* allocate buffer to send the values */
local_vals_buffer.resize(in.vals_bulk_size);
std::vector<void*> vals_addr(1);
vals_addr[0] = (void*)local_vals_buffer.data();
/* create bulk handle to receive max value sizes and to send values */
hret = margo_bulk_create(mid, 1, vals_addr.data(), &in.vals_bulk_size,
HG_BULK_READ_ONLY, &local_vals_bulk_handle);
if(hret != HG_SUCCESS) {
out.ret = SDSKV_ERR_MERCURY;
return;
}
auto r7 = at_exit([&local_vals_bulk_handle]() { margo_bulk_free(local_vals_bulk_handle); });
/* transfer keys and key sizes */
hret = margo_bulk_transfer(mid, HG_BULK_PULL, info->addr, in.keys_bulk_handle, 0,
local_keys_bulk_handle, 0, in.keys_bulk_size);
if(hret != HG_SUCCESS) {
out.ret = SDSKV_ERR_MERCURY;
return;
}
/* interpret beginning of the key buffer as a list of key sizes */
hg_size_t* key_sizes = (hg_size_t*)local_keys_buffer.data();
/* find beginning of packed keys */
char* packed_keys = local_keys_buffer.data() + in.num_keys*sizeof(hg_size_t);
/* interpret beginning of the value buffer as a list of value sizes */
hg_size_t* val_sizes = (hg_size_t*)local_vals_buffer.data();
/* find beginning of region where to pack values */
char* packed_values = local_vals_buffer.data() + in.num_keys*sizeof(hg_size_t);
/* go through the key/value pairs and get the values from the database */
size_t available_client_memory = in.vals_bulk_size - in.num_keys*sizeof(hg_size_t);
unsigned i = 0;
for(unsigned i=0; i < in.num_keys; i++) {
ds_bulk_t kdata(packed_keys, packed_keys+key_sizes[i]);
ds_bulk_t vdata;
if(available_client_memory == 0) {
val_sizes[i] = 0;
out.ret = SDSKV_ERR_SIZE;
continue;
}
if(db->get(kdata, vdata)) {
if(vdata.size() > available_client_memory) {
available_client_memory = 0;
out.ret = SDSKV_ERR_SIZE;
val_sizes[i] = 0;
} else {
val_sizes[i] = vdata.size();
memcpy(packed_values, vdata.data(), val_sizes[i]);
}
} else {
val_sizes[i] = 0;
}
packed_keys += key_sizes[i];
packed_values += val_sizes[i];
}
/* do a PUSH operation to push back the values to the client */
hret = margo_bulk_transfer(mid, HG_BULK_PUSH, info->addr, in.vals_bulk_handle, 0,
local_vals_bulk_handle, 0, in.vals_bulk_size);
if(hret != HG_SUCCESS) {
out.ret = SDSKV_ERR_MERCURY;
return;
}
return;
}
DEFINE_MARGO_RPC_HANDLER(sdskv_get_packed_ult)
static void sdskv_length_multi_ult(hg_handle_t handle)
{
......
......@@ -138,7 +138,7 @@ int main(int argc, char *argv[])
packed_keys.data(), packed_key_sizes.data(),
rval_len.data());
if(ret != 0) {
fprintf(stderr, "Error: sdskv_length_multi() failed\n");
fprintf(stderr, "Error: sdskv_length_packed() failed\n");
sdskv_shutdown_service(kvcl, svr_addr);
sdskv_provider_handle_release(kvph);
margo_addr_free(mid, svr_addr);
......@@ -161,23 +161,38 @@ int main(int argc, char *argv[])
}
}
#if 0
{
/* **** get keys **** */
std::vector<std::vector<char>> read_values(num_keys);
for(unsigned i=0; i < num_keys; i++) {
read_values[i].resize(rval_len[i]);
}
std::vector<void*> read_values_ptr(num_keys);
for(unsigned i=0; i < num_keys; i++) {
read_values_ptr[i] = read_values[i].data();
}
/* **** get keys **** */
/* figure out the total size needed */
unsigned total_read_size = 0;
for(auto& l : rval_len) total_read_size += l;
ret = sdskv_get_multi(kvph, db_id, num_keys,
keys_ptr.data(), keys_size.data(),
read_values_ptr.data(), rval_len.data());
if(ret != 0) {
fprintf(stderr, "Error: sdskv_get_multi() failed\n");
std::vector<char> read_values(total_read_size);
std::vector<hg_size_t> read_value_sizes(num_keys);
hg_size_t num_keys_read = num_keys;
ret = sdskv_get_packed(kvph, db_id, &num_keys_read,
packed_keys.data(), packed_key_sizes.data(),
total_read_size, read_values.data(), read_value_sizes.data());
if(ret != 0) {
fprintf(stderr, "Error: sdskv_get_packed() failed\n");
sdskv_shutdown_service(kvcl, svr_addr);
sdskv_provider_handle_release(kvph);
margo_addr_free(mid, svr_addr);
sdskv_client_finalize(kvcl);
margo_finalize(mid);
return -1;
}
/* check the keys we received against reference */
size_t koffset = 0, voffset = 0;
for(unsigned i=0; i < num_keys; i++) {
std::string vstring(read_values.data() + voffset, read_value_sizes[i]);
std::string k(packed_keys.data() + koffset, packed_key_sizes[i]);
std::cout << "Got " << k << " ===> " << vstring << "\t(size = " << vstring.size()
<< ") expected: " << reference[k] << " (size = " << reference[k].size() << ")"
<< std::endl;
if(vstring != reference[k]) {
fprintf(stderr, "Error: sdskv_get_packed() returned a value different from the reference\n");
sdskv_shutdown_service(kvcl, svr_addr);
sdskv_provider_handle_release(kvph);
margo_addr_free(mid, svr_addr);
......@@ -185,27 +200,9 @@ int main(int argc, char *argv[])
margo_finalize(mid);
return -1;
}
/* check the keys we received against reference */
for(unsigned i=0; i < num_keys; i++) {
std::string vstring(read_values[i].data());
vstring.resize(rval_len[i]);
auto& k = keys[i];
std::cout << "Got " << k << " ===> " << vstring << "\t(size = " << vstring.size()
<< ") expected: " << reference[k] << " (size = " << reference[k].size() << ")"
<< std::endl;
if(vstring != reference[k]) {
fprintf(stderr, "Error: sdskv_get_multi() returned a value different from the reference\n");
sdskv_shutdown_service(kvcl, svr_addr);
sdskv_provider_handle_release(kvph);
margo_addr_free(mid, svr_addr);
sdskv_client_finalize(kvcl);
margo_finalize(mid);
return -1;
}
}
koffset += packed_key_sizes[i];
voffset += read_value_sizes[i];
}
#endif
/* shutdown the server */
ret = sdskv_shutdown_service(kvcl, svr_addr);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment