Commit ee5dd535 authored by Matthieu Dorier's avatar Matthieu Dorier

added exists_multi function

parent 1ffff767
......@@ -374,6 +374,24 @@ int sdskv_length_packed(sdskv_provider_handle_t handle,
int sdskv_exists(sdskv_provider_handle_t handle,
sdskv_database_id_t db_id, const void *key,
hg_size_t ksize, int* flag);
/**
* @brief Checks if the given key exists in the database.
*
* @param[in] handle provider handle
* @param[in] db_id database id
* @param[in] count number of keys
* @param[in] keys keys to lookup
* @param[in] ksizes size of the keys
* @param[out] array of count flags containing 1 if the key exists, 0 otherwise
*
* @return SDSKV_SUCCESS or error code defined in sdskv-common.h
*/
int sdskv_exists_multi(sdskv_provider_handle_t handle,
sdskv_database_id_t db_id, size_t count,
const void* const *keys,
const hg_size_t *ksizes, int* flags);
/**
* @brief Erases the key/value pair pointed by the given key.
*
......
......@@ -399,6 +399,43 @@ class client {
return exists(db, object_data(key), object_size(key));
}
//////////////////////////
// EXISTS MULTI methods
//////////////////////////
/**
* @brief Equivalent of sdskv_exists_multi.
*
* @param db Database instance.
* @param num Number of keys.
* @param keys Keys.
* @param ksizes Size of the keys.
*
* @return an std::vector<bool> v where v[i] is true iff key i exists.
*/
std::vector<bool> exists_multi(const database& db, size_t num, const void* const* key, const hg_size_t* ksize) const;
/**
* @brief Templated version of exists_multi method, meant to work with
* std::string and std::vector<X>. X must be a standard layout type.
*
* @tparam K Key type.
* @param db Database instance.
* @param keys Keys.
*
* @return an std::vector<bool> v where v[i] is true iff key i exists.
*/
template<typename K>
inline std::vector<bool> exists_multi(const database& db, const std::vector<K>& keys) const {
std::vector<const void*> key_addr(keys.size());
std::vector<hg_size_t> key_sizes(keys.size());
for(unsigned i=0; i < keys.size(); i++) {
key_addr[i] = object_data(keys[i]);
key_sizes[i] = object_size(keys[i]);
}
return exists_multi(db, keys.size(), key_addr.data(), key_sizes.data());
}
//////////////////////////
// LENGTH methods
//////////////////////////
......@@ -1490,6 +1527,14 @@ class database {
return m_ph.m_client->exists(*this, std::forward<T>(args)...);
}
/**
* @brief @see client::exists_multi
*/
template<typename ... T>
decltype(auto) exists_multi(T&& ... args) const {
return m_ph.m_client->exists_multi(*this, std::forward<T>(args)...);
}
/**
* @brief @see client::erase.
*/
......@@ -1614,6 +1659,17 @@ inline bool client::exists(const database& db, const void* key, hg_size_t ksize)
return flag;
}
inline std::vector<bool> client::exists_multi(const database& db, size_t num,
const void* const* keys, const hg_size_t* ksizes) const {
int flag;
std::vector<int> flags(num);
int ret = sdskv_exists_multi(db.m_ph.m_ph, db.m_db_id, num, keys, ksizes, flags.data());
_CHECK_RET(ret);
std::vector<bool> result(num);
for(unsigned i=0; i < num; i++) result[i] = flags[i];
return result;
}
inline bool client::length_multi(const database& db,
hg_size_t num, const void* const* keys,
const hg_size_t* ksizes, hg_size_t* vsizes) const {
......
......@@ -20,6 +20,7 @@ struct sdskv_client {
hg_id_t sdskv_get_multi_id;
hg_id_t sdskv_get_packed_id;
hg_id_t sdskv_exists_id;
hg_id_t sdskv_exists_multi_id;
hg_id_t sdskv_erase_id;
hg_id_t sdskv_erase_multi_id;
hg_id_t sdskv_length_id;
......@@ -70,6 +71,7 @@ static int sdskv_client_register(sdskv_client_t client, margo_instance_id mid)
margo_registered_name(mid, "sdskv_erase_rpc", &client->sdskv_erase_id, &flag);
margo_registered_name(mid, "sdskv_erase_multi_rpc", &client->sdskv_erase_multi_id, &flag);
margo_registered_name(mid, "sdskv_exists_rpc", &client->sdskv_exists_id, &flag);
margo_registered_name(mid, "sdskv_exists_multi_rpc", &client->sdskv_exists_multi_id, &flag);
margo_registered_name(mid, "sdskv_length_rpc", &client->sdskv_length_id, &flag);
margo_registered_name(mid, "sdskv_length_multi_rpc", &client->sdskv_length_multi_id, &flag);
margo_registered_name(mid, "sdskv_length_packed_rpc", &client->sdskv_length_packed_id, &flag);
......@@ -110,6 +112,8 @@ static int sdskv_client_register(sdskv_client_t client, margo_instance_id mid)
MARGO_REGISTER(mid, "sdskv_erase_multi_rpc", erase_multi_in_t, erase_multi_out_t, NULL);
client->sdskv_exists_id =
MARGO_REGISTER(mid, "sdskv_exists_rpc", exists_in_t, exists_out_t, NULL);
client->sdskv_exists_multi_id =
MARGO_REGISTER(mid, "sdskv_exists_multi_rpc", exists_multi_in_t, exists_multi_out_t, NULL);
client->sdskv_length_id =
MARGO_REGISTER(mid, "sdskv_length_rpc", length_in_t, length_out_t, NULL);
client->sdskv_length_multi_id =
......@@ -968,6 +972,112 @@ int sdskv_exists(sdskv_provider_handle_t provider,
return ret;
}
int sdskv_exists_multi(sdskv_provider_handle_t provider,
sdskv_database_id_t db_id, size_t num,
const void* const* keys, const hg_size_t* ksizes, int *flags)
{
hg_return_t hret;
int ret;
hg_handle_t handle = HG_HANDLE_NULL;
exists_multi_in_t in;
exists_multi_out_t out;
void** key_seg_ptrs = NULL;
hg_size_t* key_seg_sizes = NULL;
in.db_id = db_id;
in.num_keys = num;
in.keys_bulk_handle = HG_BULK_NULL;
in.keys_bulk_size = 0;
in.flags_bulk_handle = HG_BULK_NULL;
/* create an array of key sizes and key pointers */
key_seg_sizes = malloc(sizeof(hg_size_t)*(num+1));
key_seg_sizes[0] = num*sizeof(hg_size_t);
memcpy(key_seg_sizes+1, ksizes, num*sizeof(hg_size_t));
key_seg_ptrs = malloc(sizeof(void*)*(num+1));
key_seg_ptrs[0] = (void*)ksizes;
memcpy(key_seg_ptrs+1, keys, num*sizeof(void*));
int i;
for(i=0; i<num+1; i++) {
in.keys_bulk_size += key_seg_sizes[i];
}
/* create the bulk handle to access the keys */
hret = margo_bulk_create(provider->client->mid, num+1, key_seg_ptrs, key_seg_sizes,
HG_BULK_READ_ONLY, &in.keys_bulk_handle);
if(hret != HG_SUCCESS) {
fprintf(stderr,"[SDSKV] margo_bulk_create() for keys failed in sdskv_exists_multi()\n");
out.ret = SDSKV_ERR_MERCURY;
goto finish;
}
/* create the bulk handle for the server to whether the keys exist */
hg_size_t exist_size = num/8 + (num % 8 == 0 ? 0 : 1);
uint8_t* exist = calloc(exist_size,1);
hret = margo_bulk_create(provider->client->mid, 1, (void**)&exist, &exist_size,
HG_BULK_WRITE_ONLY, &in.flags_bulk_handle);
if(hret != HG_SUCCESS) {
fprintf(stderr,"[SDSKV] margo_bulk_create() for vsizes failed in sdskv_exists_multi()\n");
out.ret = SDSKV_ERR_MERCURY;
goto finish;
}
/* create a RPC handle */
hret = margo_create(
provider->client->mid,
provider->addr,
provider->client->sdskv_exists_multi_id,
&handle);
if(hret != HG_SUCCESS) {
fprintf(stderr,"[SDSKV] margo_create() failed in sdskv_exists_multi()\n");
out.ret = SDSKV_ERR_MERCURY;
goto finish;
}
/* forward the RPC handle */
hret = margo_provider_forward(provider->provider_id, handle, &in);
if(hret != HG_SUCCESS) {
fprintf(stderr,"[SDSKV] margo_forward() failed in sdskv_exists_multi()\n");
out.ret = SDSKV_ERR_MERCURY;
goto finish;
}
/* get the response */
hret = margo_get_output(handle, &out);
if(hret != HG_SUCCESS) {
fprintf(stderr,"[SDSKV] margo_get_output() failed in sdskv_exists_multi()\n");
out.ret = SDSKV_ERR_MERCURY;
goto finish;
}
ret = out.ret;
if(out.ret != SDSKV_SUCCESS) {
goto finish;
}
uint8_t mask = 1;
for(i = 0; i < num; i++) {
uint8_t c = exist[i/8];
*(flags+i) = c & mask ? 1 : 0;
if(i % 8 == 7) {
mask = 1;
} else {
mask = mask << 1;
}
}
finish:
margo_free_output(handle, &out);
margo_bulk_free(in.keys_bulk_handle);
margo_bulk_free(in.flags_bulk_handle);
free(key_seg_sizes);
free(key_seg_ptrs);
free(exist);
margo_destroy(handle);
return ret;
}
int sdskv_length(sdskv_provider_handle_t provider,
sdskv_database_id_t db_id, const void *key,
hg_size_t ksize, hg_size_t* vsize)
......
......@@ -240,6 +240,15 @@ MERCURY_GEN_PROC(length_packed_in_t, \
((hg_bulk_t)(out_bulk_handle)))
MERCURY_GEN_PROC(length_packed_out_t, ((int32_t)(ret)))
// ------------- EXIST MULTI ------------- //
MERCURY_GEN_PROC(exists_multi_in_t, \
((uint64_t)(db_id))\
((hg_size_t)(num_keys))\
((hg_bulk_t)(keys_bulk_handle))\
((hg_size_t)(keys_bulk_size))\
((hg_bulk_t)(flags_bulk_handle)))
MERCURY_GEN_PROC(exists_multi_out_t, ((int32_t)(ret)))
// ------------- ERASE MULTI ------------- //
MERCURY_GEN_PROC(erase_multi_in_t, \
((uint64_t)(db_id))\
......
......@@ -49,6 +49,7 @@ struct sdskv_server_context_t
hg_id_t sdskv_get_multi_id;
hg_id_t sdskv_get_packed_id;
hg_id_t sdskv_exists_id;
hg_id_t sdskv_exists_multi_id;
hg_id_t sdskv_erase_id;
hg_id_t sdskv_erase_multi_id;
hg_id_t sdskv_length_id;
......@@ -96,6 +97,7 @@ DECLARE_MARGO_RPC_HANDLER(sdskv_list_keyvals_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_erase_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_erase_multi_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_exists_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_exists_multi_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_migrate_keys_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_migrate_key_range_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_migrate_keys_prefixed_ult)
......@@ -242,6 +244,12 @@ extern "C" int sdskv_provider_register(
tmp_svr_ctx->sdskv_exists_id = rpc_id;
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_PROVIDER(mid, "sdskv_exists_multi_rpc",
exists_multi_in_t, exists_multi_out_t,
sdskv_exists_multi_ult, provider_id, abt_pool);
tmp_svr_ctx->sdskv_exists_multi_id = rpc_id;
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_PROVIDER(mid, "sdskv_bulk_get_rpc",
bulk_get_in_t, bulk_get_out_t,
sdskv_bulk_get_ult, provider_id, abt_pool);
......@@ -1406,6 +1414,118 @@ static void sdskv_length_multi_ult(hg_handle_t handle)
}
DEFINE_MARGO_RPC_HANDLER(sdskv_length_multi_ult)
static void sdskv_exists_multi_ult(hg_handle_t handle)
{
hg_return_t hret;
exists_multi_in_t in;
exists_multi_out_t out;
out.ret = SDSKV_SUCCESS;
std::vector<char> local_keys_buffer;
std::vector<uint8_t> local_flags_buffer;
hg_bulk_t local_keys_bulk_handle;
hg_bulk_t local_flags_bulk_handle;
auto r1 = at_exit([&handle]() { margo_destroy(handle); });
auto r2 = at_exit([&handle,&out]() { margo_respond(handle, &out); });
/* get margo instance and provider */
margo_instance_id mid = margo_hg_handle_get_instance(handle);
const struct hg_info* info = margo_get_info(handle);
sdskv_provider_t svr_ctx =
(sdskv_provider_t)margo_registered_data(mid, info->id);
if(!svr_ctx) {
out.ret = SDSKV_ERR_UNKNOWN_PR;
return;
}
/* deserialize input */
hret = margo_get_input(handle, &in);
if(hret != HG_SUCCESS) {
out.ret = SDSKV_ERR_MERCURY;
return;
}
auto r3 = at_exit([&handle,&in]() { margo_free_input(handle, &in); });
/* find the target database */
ABT_rwlock_rdlock(svr_ctx->lock);
auto it = svr_ctx->databases.find(in.db_id);
if(it == svr_ctx->databases.end()) {
ABT_rwlock_unlock(svr_ctx->lock);
out.ret = SDSKV_ERR_UNKNOWN_DB;
return;
}
auto db = it->second;
ABT_rwlock_unlock(svr_ctx->lock);
/* allocate buffers to receive the keys */
local_keys_buffer.resize(in.keys_bulk_size);
std::vector<void*> keys_addr(1);
keys_addr[0] = (void*)local_keys_buffer.data();
/* create bulk handle to receive key sizes and packed keys */
hret = margo_bulk_create(mid, 1, keys_addr.data(), &in.keys_bulk_size,
HG_BULK_WRITE_ONLY, &local_keys_bulk_handle);
if(hret != HG_SUCCESS) {
out.ret = SDSKV_ERR_MERCURY;
return;
}
auto r6 = at_exit([&local_keys_bulk_handle]() { margo_bulk_free(local_keys_bulk_handle); });
/* allocate buffer to send the flags */
hg_size_t local_flags_buffer_size = in.num_keys/8 + (in.num_keys % 8 == 0 ? 0 : 1);
local_flags_buffer.resize(local_flags_buffer_size, 0);
std::vector<void*> flags_buffer_addr(1);
flags_buffer_addr[0] = (void*)local_flags_buffer.data();
/* create bulk handle to send flags */
hret = margo_bulk_create(mid, 1, flags_buffer_addr.data(), &local_flags_buffer_size,
HG_BULK_READ_ONLY, &local_flags_bulk_handle);
if(hret != HG_SUCCESS) {
out.ret = SDSKV_ERR_MERCURY;
return;
}
auto r7 = at_exit([&local_flags_bulk_handle]() { margo_bulk_free(local_flags_bulk_handle); });
/* transfer keys */
hret = margo_bulk_transfer(mid, HG_BULK_PULL, info->addr, in.keys_bulk_handle, 0,
local_keys_bulk_handle, 0, in.keys_bulk_size);
if(hret != HG_SUCCESS) {
out.ret = SDSKV_ERR_MERCURY;
return;
}
/* interpret beginning of the key buffer as a list of key sizes */
hg_size_t* key_sizes = (hg_size_t*)local_keys_buffer.data();
/* find beginning of packed keys */
char* packed_keys = local_keys_buffer.data() + in.num_keys*sizeof(hg_size_t);
/* go through the key/value pairs and get the values from the database */
uint8_t mask = 1;
for(unsigned i=0; i < in.num_keys; i++) {
ds_bulk_t kdata(packed_keys, packed_keys+key_sizes[i]);
ds_bulk_t vdata;
if(db->get(kdata, vdata)) {
local_flags_buffer[i/8] |= mask;
}
mask = mask << 1;
if(i % 8 == 7)
mask = 1;
packed_keys += key_sizes[i];
}
/* do a PUSH operation to push back the value sizes to the client */
hret = margo_bulk_transfer(mid, HG_BULK_PUSH, info->addr, in.flags_bulk_handle, 0,
local_flags_bulk_handle, 0, local_flags_buffer_size);
if(hret != HG_SUCCESS) {
out.ret = SDSKV_ERR_MERCURY;
return;
}
return;
}
DEFINE_MARGO_RPC_HANDLER(sdskv_exists_multi_ult)
static void sdskv_length_packed_ult(hg_handle_t handle)
{
......
......@@ -139,6 +139,32 @@ int main(int argc, char *argv[])
}
printf("Successfuly inserted %d keys\n", num_keys);
/* check that the values exist */
std::vector<int> keys_exist(num_keys);
ret = sdskv_exists_multi(kvph, db_id, num_keys,
keys_ptr.data(), keys_size.data(), keys_exist.data());
if(ret != 0) {
fprintf(stderr, "Error: sdskv_length_multi() failed\n");
sdskv_shutdown_service(kvcl, svr_addr);
sdskv_provider_handle_release(kvph);
margo_addr_free(mid, svr_addr);
sdskv_client_finalize(kvcl);
margo_finalize(mid);
return -1;
}
for(auto& k : keys_exist) {
if(k != 1) {
fprintf(stderr, "Error: sdskv_exists_multi() failed (one flag is not 1)\n");
sdskv_shutdown_service(kvcl, svr_addr);
sdskv_provider_handle_release(kvph);
margo_addr_free(mid, svr_addr);
sdskv_client_finalize(kvcl);
margo_finalize(mid);
return -1;
}
}
/* retrieve the length of the values */
std::vector<hg_size_t> rval_len(num_keys);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment