Commit eeb9b5de authored by Matthieu Dorier's avatar Matthieu Dorier

added sdskv_length_packed function

parent 2c988d1a
...@@ -293,6 +293,26 @@ int sdskv_length_multi(sdskv_provider_handle_t handle, ...@@ -293,6 +293,26 @@ int sdskv_length_multi(sdskv_provider_handle_t handle,
const void* const* keys, const hg_size_t* ksizes, const void* const* keys, const hg_size_t* ksizes,
hg_size_t *vsizes); hg_size_t *vsizes);
/**
* @brief Gets the length of values associated with multiple keys.
* If a particular key does not exists, this function will set the length
* of its value to 0 (so the user needs another way to differenciate
* between a key that does not exists and a 0-sized value).
*
* @param[in] handle provider handle
* @param[in] db_id database id
* @param[in] num number of keys
* @param[in] keys buffer of packed keys
* @param[in] ksizes array of key sizes
* @param[out] vsizes array where to put value sizes
*
* @return SDSKV_SUCCESS or error code defined in sdskv-common.h
*/
int sdskv_length_packed(sdskv_provider_handle_t handle,
sdskv_database_id_t db_id, size_t num,
const void* packed_keys, const hg_size_t* ksizes,
hg_size_t *vsizes);
/** /**
* @brief Checks if the given key exists in the database. * @brief Checks if the given key exists in the database.
* *
......
...@@ -23,6 +23,7 @@ struct sdskv_client { ...@@ -23,6 +23,7 @@ struct sdskv_client {
hg_id_t sdskv_erase_multi_id; hg_id_t sdskv_erase_multi_id;
hg_id_t sdskv_length_id; hg_id_t sdskv_length_id;
hg_id_t sdskv_length_multi_id; hg_id_t sdskv_length_multi_id;
hg_id_t sdskv_length_packed_id;
hg_id_t sdskv_bulk_get_id; hg_id_t sdskv_bulk_get_id;
hg_id_t sdskv_list_keys_id; hg_id_t sdskv_list_keys_id;
hg_id_t sdskv_list_keyvals_id; hg_id_t sdskv_list_keyvals_id;
...@@ -69,6 +70,7 @@ static int sdskv_client_register(sdskv_client_t client, margo_instance_id mid) ...@@ -69,6 +70,7 @@ static int sdskv_client_register(sdskv_client_t client, margo_instance_id mid)
margo_registered_name(mid, "sdskv_exists_rpc", &client->sdskv_exists_id, &flag); margo_registered_name(mid, "sdskv_exists_rpc", &client->sdskv_exists_id, &flag);
margo_registered_name(mid, "sdskv_length_rpc", &client->sdskv_length_id, &flag); margo_registered_name(mid, "sdskv_length_rpc", &client->sdskv_length_id, &flag);
margo_registered_name(mid, "sdskv_length_multi_rpc", &client->sdskv_length_multi_id, &flag); margo_registered_name(mid, "sdskv_length_multi_rpc", &client->sdskv_length_multi_id, &flag);
margo_registered_name(mid, "sdskv_length_packed_rpc", &client->sdskv_length_packed_id, &flag);
margo_registered_name(mid, "sdskv_bulk_get_rpc", &client->sdskv_bulk_get_id, &flag); margo_registered_name(mid, "sdskv_bulk_get_rpc", &client->sdskv_bulk_get_id, &flag);
margo_registered_name(mid, "sdskv_list_keys_rpc", &client->sdskv_list_keys_id, &flag); margo_registered_name(mid, "sdskv_list_keys_rpc", &client->sdskv_list_keys_id, &flag);
margo_registered_name(mid, "sdskv_list_keyvals_rpc", &client->sdskv_list_keyvals_id, &flag); margo_registered_name(mid, "sdskv_list_keyvals_rpc", &client->sdskv_list_keyvals_id, &flag);
...@@ -108,6 +110,8 @@ static int sdskv_client_register(sdskv_client_t client, margo_instance_id mid) ...@@ -108,6 +110,8 @@ static int sdskv_client_register(sdskv_client_t client, margo_instance_id mid)
MARGO_REGISTER(mid, "sdskv_length_rpc", length_in_t, length_out_t, NULL); MARGO_REGISTER(mid, "sdskv_length_rpc", length_in_t, length_out_t, NULL);
client->sdskv_length_multi_id = client->sdskv_length_multi_id =
MARGO_REGISTER(mid, "sdskv_length_multi_rpc", length_multi_in_t, length_multi_out_t, NULL); MARGO_REGISTER(mid, "sdskv_length_multi_rpc", length_multi_in_t, length_multi_out_t, NULL);
client->sdskv_length_packed_id =
MARGO_REGISTER(mid, "sdskv_length_packed_rpc", length_packed_in_t, length_packed_out_t, NULL);
client->sdskv_bulk_get_id = client->sdskv_bulk_get_id =
MARGO_REGISTER(mid, "sdskv_bulk_get_rpc", bulk_get_in_t, bulk_get_out_t, NULL); MARGO_REGISTER(mid, "sdskv_bulk_get_rpc", bulk_get_in_t, bulk_get_out_t, NULL);
client->sdskv_list_keys_id = client->sdskv_list_keys_id =
...@@ -1080,6 +1084,92 @@ finish: ...@@ -1080,6 +1084,92 @@ finish:
return ret; return ret;
} }
int sdskv_length_packed(sdskv_provider_handle_t provider,
sdskv_database_id_t db_id, size_t num, const void *packed_keys,
const hg_size_t* ksizes, hg_size_t* vsizes)
{
hg_return_t hret;
int ret;
hg_handle_t handle;
length_packed_in_t in;
length_packed_out_t out;
in.db_id = db_id;
in.num_keys = num;
in.in_bulk_size = 0;
in.in_bulk_handle = HG_BULK_NULL;
in.out_bulk_handle = HG_BULK_NULL;
hg_size_t total_ksize = 0;
unsigned i=0;
for(i = 0; i < num; i++) {
total_ksize += ksizes[i];
}
/* create bulk handle to expose the packed_keys and ksizes */
void* seg_ptrs[2] = { (void*)ksizes, (void*)packed_keys };
hg_size_t seg_sizes[2] = { num*sizeof(hg_size_t), total_ksize };
in.in_bulk_size = total_ksize + num*sizeof(hg_size_t);
hret = margo_bulk_create(provider->client->mid, 2, seg_ptrs, seg_sizes,
HG_BULK_READ_ONLY, &in.in_bulk_handle);
if(hret != HG_SUCCESS) {
fprintf(stderr,"[SDSKV] margo_bulk_create() for keys/ksizes failed in sdskv_length_packed()\n");
return SDSKV_ERR_MERCURY;
}
/* create bulk handle to expose the vsizes */
hg_size_t val_size_buf_size = num*sizeof(hg_size_t);
hret = margo_bulk_create(provider->client->mid, 1, (void**)(&vsizes), &val_size_buf_size,
HG_BULK_WRITE_ONLY, &in.out_bulk_handle);
if(hret != HG_SUCCESS) {
fprintf(stderr,"[SDSKV] margo_bulk_create() for vsizes failed in sdskv_length_packed()\n");
margo_bulk_free(in.in_bulk_handle);
return SDSKV_ERR_MERCURY;
}
/* create RPC handle */
hret = margo_create(
provider->client->mid,
provider->addr,
provider->client->sdskv_length_packed_id,
&handle);
if(hret != HG_SUCCESS) {
fprintf(stderr,"[SDSKV] margo_create() failed in sdskv_length_packed()\n");
margo_bulk_free(in.in_bulk_handle);
margo_bulk_free(in.out_bulk_handle);
return SDSKV_ERR_MERCURY;
}
/* forward RPC */
hret = margo_provider_forward(provider->provider_id, handle, &in);
if(hret != HG_SUCCESS) {
fprintf(stderr,"[SDSKV] margo_provider_forward() failed in sdskv_length_packed()\n");
margo_bulk_free(in.in_bulk_handle);
margo_bulk_free(in.out_bulk_handle);
margo_destroy(handle);
return SDSKV_ERR_MERCURY;
}
/* Get output */
hret = margo_get_output(handle, &out);
if(hret != HG_SUCCESS) {
fprintf(stderr,"[SDSKV] margo_get_output() failed in sdskv_length_packed()\n");
margo_bulk_free(in.in_bulk_handle);
margo_bulk_free(in.out_bulk_handle);
margo_destroy(handle);
return SDSKV_ERR_MERCURY;
}
ret = out.ret;
margo_bulk_free(in.in_bulk_handle);
margo_bulk_free(in.out_bulk_handle);
margo_free_output(handle, &out);
margo_destroy(handle);
return ret;
}
int sdskv_erase(sdskv_provider_handle_t provider, int sdskv_erase(sdskv_provider_handle_t provider,
sdskv_database_id_t db_id, const void *key, sdskv_database_id_t db_id, const void *key,
hg_size_t ksize) hg_size_t ksize)
......
...@@ -191,7 +191,7 @@ MERCURY_GEN_PROC(put_multi_in_t, \ ...@@ -191,7 +191,7 @@ MERCURY_GEN_PROC(put_multi_in_t, \
((hg_size_t)(vals_bulk_size))) ((hg_size_t)(vals_bulk_size)))
MERCURY_GEN_PROC(put_multi_out_t, ((int32_t)(ret))) MERCURY_GEN_PROC(put_multi_out_t, ((int32_t)(ret)))
// ------------- PUT PACKED MULTI ------------- // // ------------- PUT PACKED ------------- //
MERCURY_GEN_PROC(put_packed_in_t, \ MERCURY_GEN_PROC(put_packed_in_t, \
((uint64_t)(db_id))\ ((uint64_t)(db_id))\
((hg_size_t)(num_keys))\ ((hg_size_t)(num_keys))\
...@@ -218,7 +218,16 @@ MERCURY_GEN_PROC(length_multi_in_t, \ ...@@ -218,7 +218,16 @@ MERCURY_GEN_PROC(length_multi_in_t, \
((hg_bulk_t)(vals_size_bulk_handle))) ((hg_bulk_t)(vals_size_bulk_handle)))
MERCURY_GEN_PROC(length_multi_out_t, ((int32_t)(ret))) MERCURY_GEN_PROC(length_multi_out_t, ((int32_t)(ret)))
// ------------- LENGTH MULTI ------------- // // ------------- LENGTH PACKED ------------- //
MERCURY_GEN_PROC(length_packed_in_t, \
((uint64_t)(db_id))\
((hg_size_t)(num_keys))\
((hg_size_t)(in_bulk_size))\
((hg_bulk_t)(in_bulk_handle))\
((hg_bulk_t)(out_bulk_handle)))
MERCURY_GEN_PROC(length_packed_out_t, ((int32_t)(ret)))
// ------------- ERASE MULTI ------------- //
MERCURY_GEN_PROC(erase_multi_in_t, \ MERCURY_GEN_PROC(erase_multi_in_t, \
((uint64_t)(db_id))\ ((uint64_t)(db_id))\
((hg_size_t)(num_keys))\ ((hg_size_t)(num_keys))\
......
...@@ -52,6 +52,7 @@ struct sdskv_server_context_t ...@@ -52,6 +52,7 @@ struct sdskv_server_context_t
hg_id_t sdskv_erase_multi_id; hg_id_t sdskv_erase_multi_id;
hg_id_t sdskv_length_id; hg_id_t sdskv_length_id;
hg_id_t sdskv_length_multi_id; hg_id_t sdskv_length_multi_id;
hg_id_t sdskv_length_packed_id;
hg_id_t sdskv_bulk_get_id; hg_id_t sdskv_bulk_get_id;
hg_id_t sdskv_list_keys_id; hg_id_t sdskv_list_keys_id;
hg_id_t sdskv_list_keyvals_id; hg_id_t sdskv_list_keyvals_id;
...@@ -83,6 +84,7 @@ DECLARE_MARGO_RPC_HANDLER(sdskv_put_multi_ult) ...@@ -83,6 +84,7 @@ DECLARE_MARGO_RPC_HANDLER(sdskv_put_multi_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_put_packed_ult) DECLARE_MARGO_RPC_HANDLER(sdskv_put_packed_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_length_ult) DECLARE_MARGO_RPC_HANDLER(sdskv_length_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_length_multi_ult) DECLARE_MARGO_RPC_HANDLER(sdskv_length_multi_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_length_packed_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_get_ult) DECLARE_MARGO_RPC_HANDLER(sdskv_get_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_get_multi_ult) DECLARE_MARGO_RPC_HANDLER(sdskv_get_multi_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_bulk_put_ult) DECLARE_MARGO_RPC_HANDLER(sdskv_bulk_put_ult)
...@@ -220,6 +222,12 @@ extern "C" int sdskv_provider_register( ...@@ -220,6 +222,12 @@ extern "C" int sdskv_provider_register(
tmp_svr_ctx->sdskv_length_multi_id = rpc_id; tmp_svr_ctx->sdskv_length_multi_id = rpc_id;
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL); margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_PROVIDER(mid, "sdskv_length_packed_rpc",
length_packed_in_t, length_packed_out_t,
sdskv_length_packed_ult, provider_id, abt_pool);
tmp_svr_ctx->sdskv_length_packed_id = rpc_id;
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_PROVIDER(mid, "sdskv_exists_rpc", rpc_id = MARGO_REGISTER_PROVIDER(mid, "sdskv_exists_rpc",
exists_in_t, exists_out_t, exists_in_t, exists_out_t,
sdskv_exists_ult, provider_id, abt_pool); sdskv_exists_ult, provider_id, abt_pool);
...@@ -1249,6 +1257,115 @@ static void sdskv_length_multi_ult(hg_handle_t handle) ...@@ -1249,6 +1257,115 @@ static void sdskv_length_multi_ult(hg_handle_t handle)
} }
DEFINE_MARGO_RPC_HANDLER(sdskv_length_multi_ult) DEFINE_MARGO_RPC_HANDLER(sdskv_length_multi_ult)
static void sdskv_length_packed_ult(hg_handle_t handle)
{
hg_return_t hret;
length_packed_in_t in;
length_packed_out_t out;
out.ret = SDSKV_SUCCESS;
std::vector<char> local_keys_buffer;
std::vector<hg_size_t> local_vals_size_buffer;
hg_bulk_t local_keys_bulk_handle;
hg_bulk_t local_vals_size_bulk_handle;
auto r1 = at_exit([&handle]() { margo_destroy(handle); });
auto r2 = at_exit([&handle,&out]() { margo_respond(handle, &out); });
/* get margo instance and provider */
margo_instance_id mid = margo_hg_handle_get_instance(handle);
const struct hg_info* info = margo_get_info(handle);
sdskv_provider_t svr_ctx =
(sdskv_provider_t)margo_registered_data(mid, info->id);
if(!svr_ctx) {
out.ret = SDSKV_ERR_UNKNOWN_PR;
return;
}
/* deserialize input */
hret = margo_get_input(handle, &in);
if(hret != HG_SUCCESS) {
out.ret = SDSKV_ERR_MERCURY;
return;
}
auto r3 = at_exit([&handle,&in]() { margo_free_input(handle, &in); });
/* find the target database */
ABT_rwlock_rdlock(svr_ctx->lock);
auto it = svr_ctx->databases.find(in.db_id);
if(it == svr_ctx->databases.end()) {
ABT_rwlock_unlock(svr_ctx->lock);
out.ret = SDSKV_ERR_UNKNOWN_DB;
return;
}
auto db = it->second;
ABT_rwlock_unlock(svr_ctx->lock);
/* allocate buffers to receive the keys and key sizes*/
local_keys_buffer.resize(in.in_bulk_size);
std::vector<void*> keys_addr(1);
keys_addr[0] = (void*)local_keys_buffer.data();
/* create bulk handle to receive key sizes and packed keys */
hret = margo_bulk_create(mid, 1, keys_addr.data(), &in.in_bulk_size,
HG_BULK_WRITE_ONLY, &local_keys_bulk_handle);
if(hret != HG_SUCCESS) {
out.ret = SDSKV_ERR_MERCURY;
return;
}
auto r6 = at_exit([&local_keys_bulk_handle]() { margo_bulk_free(local_keys_bulk_handle); });
/* allocate buffer to send the value sizes */
local_vals_size_buffer.resize(in.num_keys);
std::vector<void*> vals_sizes_addr(1);
hg_size_t local_vals_size_buffer_size = in.num_keys * sizeof(hg_size_t);
vals_sizes_addr[0] = (void*)local_vals_size_buffer.data();
/* create bulk handle to send values sizes */
hret = margo_bulk_create(mid, 1, vals_sizes_addr.data(), &local_vals_size_buffer_size,
HG_BULK_READ_ONLY, &local_vals_size_bulk_handle);
if(hret != HG_SUCCESS) {
out.ret = SDSKV_ERR_MERCURY;
return;
}
auto r7 = at_exit([&local_vals_size_bulk_handle]() { margo_bulk_free(local_vals_size_bulk_handle); });
/* transfer keys and ksizes */
hret = margo_bulk_transfer(mid, HG_BULK_PULL, info->addr, in.in_bulk_handle, 0,
local_keys_bulk_handle, 0, in.in_bulk_size);
if(hret != HG_SUCCESS) {
out.ret = SDSKV_ERR_MERCURY;
return;
}
/* interpret beginning of the key buffer as a list of key sizes */
hg_size_t* key_sizes = (hg_size_t*)local_keys_buffer.data();
/* find beginning of packed keys */
char* packed_keys = local_keys_buffer.data() + in.num_keys*sizeof(hg_size_t);
/* go through the key/value pairs and get the values from the database */
for(unsigned i=0; i < in.num_keys; i++) {
ds_bulk_t kdata(packed_keys, packed_keys+key_sizes[i]);
ds_bulk_t vdata;
if(db->get(kdata, vdata)) {
local_vals_size_buffer[i] = vdata.size();
} else {
local_vals_size_buffer[i] = 0;
}
packed_keys += key_sizes[i];
}
/* do a PUSH operation to push back the value sizes to the client */
hret = margo_bulk_transfer(mid, HG_BULK_PUSH, info->addr, in.out_bulk_handle, 0,
local_vals_size_bulk_handle, 0, local_vals_size_buffer_size);
if(hret != HG_SUCCESS) {
out.ret = SDSKV_ERR_MERCURY;
return;
}
return;
}
DEFINE_MARGO_RPC_HANDLER(sdskv_length_packed_ult)
static void sdskv_bulk_put_ult(hg_handle_t handle) static void sdskv_bulk_put_ult(hg_handle_t handle)
{ {
......
#!/bin/bash -x
if [ -z $srcdir ]; then
echo srcdir variable not set.
exit 1
fi
source $srcdir/test/test-util.sh
find_db_name
# start a server with 2 second wait,
# 20s timeout, and my_test_db as database
test_start_server 2 20 $test_db_full
sleep 1
#####################
run_to 20 test/sdskv-packed-test $svr_addr 1 $test_db_name 100
if [ $? -ne 0 ]; then
wait
exit 1
fi
wait
echo cleaning up $TMPBASE
rm -rf $TMPBASE
exit 0
/*
* (C) 2015 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#include <stdio.h>
#include <assert.h>
#include <unistd.h>
#include <margo.h>
#include <iostream>
#include <string>
#include <vector>
#include <algorithm>
#include <map>
#include "sdskv-client.h"
static std::string gen_random_string(size_t len);
int main(int argc, char *argv[])
{
char cli_addr_prefix[64] = {0};
char *sdskv_svr_addr_str;
char *db_name;
margo_instance_id mid;
hg_addr_t svr_addr;
uint8_t mplex_id;
uint32_t num_keys;
sdskv_client_t kvcl;
sdskv_provider_handle_t kvph;
hg_return_t hret;
int ret;
if(argc != 5)
{
fprintf(stderr, "Usage: %s <sdskv_server_addr> <mplex_id> <db_name> <num_keys>\n", argv[0]);
fprintf(stderr, " Example: %s tcp://localhost:1234 1 foo 1000\n", argv[0]);
return(-1);
}
sdskv_svr_addr_str = argv[1];
mplex_id = atoi(argv[2]);
db_name = argv[3];
num_keys = atoi(argv[4]);
/* initialize Margo using the transport portion of the server
* address (i.e., the part before the first : character if present)
*/
for(unsigned i=0; (i<63 && sdskv_svr_addr_str[i] != '\0' && sdskv_svr_addr_str[i] != ':'); i++)
cli_addr_prefix[i] = sdskv_svr_addr_str[i];
/* start margo */
mid = margo_init(cli_addr_prefix, MARGO_SERVER_MODE, 0, 0);
if(mid == MARGO_INSTANCE_NULL)
{
fprintf(stderr, "Error: margo_init()\n");
return(-1);
}
ret = sdskv_client_init(mid, &kvcl);
if(ret != 0)
{
fprintf(stderr, "Error: sdskv_client_init()\n");
margo_finalize(mid);
return -1;
}
/* look up the SDSKV server address */
hret = margo_addr_lookup(mid, sdskv_svr_addr_str, &svr_addr);
if(hret != HG_SUCCESS)
{
fprintf(stderr, "Error: margo_addr_lookup()\n");
sdskv_client_finalize(kvcl);
margo_finalize(mid);
return(-1);
}
/* create a SDSKV provider handle */
ret = sdskv_provider_handle_create(kvcl, svr_addr, mplex_id, &kvph);
if(ret != 0)
{
fprintf(stderr, "Error: sdskv_provider_handle_create()\n");
margo_addr_free(mid, svr_addr);
sdskv_client_finalize(kvcl);
margo_finalize(mid);
return(-1);
}
/* open the database */
sdskv_database_id_t db_id;
ret = sdskv_open(kvph, db_name, &db_id);
if(ret == 0) {
printf("Successfuly open database %s, id is %ld\n", db_name, db_id);
} else {
fprintf(stderr, "Error: could not open database %s\n", db_name);
sdskv_provider_handle_release(kvph);
margo_addr_free(mid, svr_addr);
sdskv_client_finalize(kvcl);
margo_finalize(mid);
return(-1);
}
/* **** generate packed key/vals ***** */
std::string packed_keys;
std::vector<hg_size_t> packed_key_sizes;
std::string packed_vals;
std::vector<hg_size_t> packed_val_sizes;
std::map<std::string, std::string> reference;
size_t max_value_size = 24;
for(unsigned i=0; i < num_keys; i++) {
auto k = gen_random_string(16);
auto v = gen_random_string(3+i*(max_value_size-3)/num_keys);
reference[k] = v;
packed_keys += k;
packed_vals += v;
packed_key_sizes.push_back(k.size());
packed_val_sizes.push_back(v.size());
}
/* **** issue a put_packed ***** */
ret = sdskv_put_packed(kvph, db_id, num_keys, packed_keys.data(), packed_key_sizes.data(),
packed_vals.data(), packed_val_sizes.data());
if(ret != 0) {
fprintf(stderr, "Error: sdskv_put_packed() failed\n");
sdskv_shutdown_service(kvcl, svr_addr);
sdskv_provider_handle_release(kvph);
margo_addr_free(mid, svr_addr);
sdskv_client_finalize(kvcl);
margo_finalize(mid);
return -1;
}
printf("Successfuly inserted %d keys\n", num_keys);
/* retrieve the length of the values */
std::vector<hg_size_t> rval_len(num_keys);
ret = sdskv_length_packed(kvph, db_id, num_keys,
packed_keys.data(), packed_key_sizes.data(),
rval_len.data());
if(ret != 0) {
fprintf(stderr, "Error: sdskv_length_multi() failed\n");
sdskv_shutdown_service(kvcl, svr_addr);
sdskv_provider_handle_release(kvph);
margo_addr_free(mid, svr_addr);
sdskv_client_finalize(kvcl);
margo_finalize(mid);
return -1;
}
/* check if the lengths are correct */
for(unsigned i=0; i < num_keys; i++) {
if(rval_len[i] != packed_val_sizes[i]) {
fprintf(stderr, "Error: value %d doesn't have the right length (%ld != %ld)\n", i,
rval_len[i], packed_val_sizes[i]);
sdskv_shutdown_service(kvcl, svr_addr);
sdskv_provider_handle_release(kvph);
margo_addr_free(mid, svr_addr);
sdskv_client_finalize(kvcl);
margo_finalize(mid);
return -1;
}
}
#if 0
{
/* **** get keys **** */
std::vector<std::vector<char>> read_values(num_keys);
for(unsigned i=0; i < num_keys; i++) {
read_values[i].resize(rval_len[i]);
}
std::vector<void*> read_values_ptr(num_keys);
for(unsigned i=0; i < num_keys; i++) {
read_values_ptr[i] = read_values[i].data();
}
ret = sdskv_get_multi(kvph, db_id, num_keys,
keys_ptr.data(), keys_size.data(),
read_values_ptr.data(), rval_len.data());
if(ret != 0) {
fprintf(stderr, "Error: sdskv_get_multi() failed\n");
sdskv_shutdown_service(kvcl, svr_addr);
sdskv_provider_handle_release(kvph);
margo_addr_free(mid, svr_addr);
sdskv_client_finalize(kvcl);
margo_finalize(mid);
return -1;
}
/* check the keys we received against reference */
for(unsigned i=0; i < num_keys; i++) {
std::string vstring(read_values[i].data());
vstring.resize(rval_len[i]);
auto& k = keys[i];
std::cout << "Got " << k << " ===> " << vstring << "\t(size = " << vstring.size()
<< ") expected: " << reference[k] << " (size = " << reference[k].size() << ")"
<< std::endl;
if(vstring != reference[k]) {
fprintf(stderr, "Error: sdskv_get_multi() returned a value different from the reference\n");
sdskv_shutdown_service(kvcl, svr_addr);
sdskv_provider_handle_release(kvph);
margo_addr_free(mid, svr_addr);
sdskv_client_finalize(kvcl);
margo_finalize(mid);
return -1;
}
}
}
#endif
/* shutdown the server */
ret = sdskv_shutdown_service(kvcl, svr_addr);
/**** cleanup ****/
sdskv_provider_handle_release(kvph);
margo_addr_free(mid, svr_addr);
sdskv_client_finalize(kvcl);
margo_finalize(mid);
return(ret);
}
static std::string gen_random_string(size_t len) {
static const char alphanum[] =
"0123456789"
"ABCDEFGHIJKLMNOPQRSTUVWXYZ"
"abcdefghijklmnopqrstuvwxyz";
std::string s(len, ' ');
for (unsigned i = 0; i < len; ++i) {
s[i] = alphanum[rand() % (sizeof(alphanum) - 1)];
}
return s;
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment