Commit 9fa8d431 authored by Matthieu Dorier's avatar Matthieu Dorier
Browse files

added erase_multi function

parent 0b99eddb
......@@ -121,8 +121,8 @@ int sdskv_put(sdskv_provider_handle_t provider,
*/
int sdskv_put_multi(sdskv_provider_handle_t provider,
sdskv_database_id_t db_id,
size_t num, const void** keys, const hg_size_t* ksizes,
const void** values, const hg_size_t *vsizes);
size_t num, const void* const* keys, const hg_size_t* ksizes,
const void* const* values, const hg_size_t *vsizes);
/**
* @brief Gets the value associated with a given key.
......@@ -170,7 +170,7 @@ int sdskv_get(sdskv_provider_handle_t provider,
*/
int sdskv_get_multi(sdskv_provider_handle_t provider,
sdskv_database_id_t db_id,
size_t num, const void** keys, const hg_size_t* ksizes,
size_t num, const void* const* keys, const hg_size_t* ksizes,
void** values, hg_size_t *vsizes);
/**
......@@ -205,7 +205,7 @@ int sdskv_length(sdskv_provider_handle_t handle,
*/
int sdskv_length_multi(sdskv_provider_handle_t handle,
sdskv_database_id_t db_id, size_t num,
const void** keys, const hg_size_t* ksizes,
const void* const* keys, const hg_size_t* ksizes,
hg_size_t *vsizes);
/**
......@@ -236,6 +236,22 @@ int sdskv_erase(sdskv_provider_handle_t handle,
sdskv_database_id_t db_id, const void *key,
hg_size_t ksize);
/**
* @brief Erases all the key/value pairs pointed to by the given keys.
*
* @param handle provider handle
* @param db_id database id
* @param num_keys number of keys
* @param keys array of keys
* @param ksizes array of key sizes
*
* @return SDSKV_SUCCESS or error code defined in sdskv-common.h
*/
int sdskv_erase_multi(sdskv_provider_handle_t handle,
sdskv_database_id_t db_id, size_t num_keys,
const void* const* keys,
const hg_size_t* ksizes);
/**
* Lists at most max_keys keys starting strictly after start_key,
* whether start_key is effectively in the database or not. "strictly after"
......
......@@ -13,6 +13,7 @@ struct sdskv_client {
hg_id_t sdskv_get_multi_id;
hg_id_t sdskv_exists_id;
hg_id_t sdskv_erase_id;
hg_id_t sdskv_erase_multi_id;
hg_id_t sdskv_length_id;
hg_id_t sdskv_length_multi_id;
hg_id_t sdskv_bulk_get_id;
......@@ -52,6 +53,7 @@ static int sdskv_client_register(sdskv_client_t client, margo_instance_id mid)
margo_registered_name(mid, "sdskv_get_rpc", &client->sdskv_get_id, &flag);
margo_registered_name(mid, "sdskv_get_multi_rpc", &client->sdskv_get_multi_id, &flag);
margo_registered_name(mid, "sdskv_erase_rpc", &client->sdskv_erase_id, &flag);
margo_registered_name(mid, "sdskv_erase_multi_rpc", &client->sdskv_erase_multi_id, &flag);
margo_registered_name(mid, "sdskv_exists_rpc", &client->sdskv_exists_id, &flag);
margo_registered_name(mid, "sdskv_length_rpc", &client->sdskv_length_id, &flag);
margo_registered_name(mid, "sdskv_length_multi_rpc", &client->sdskv_length_multi_id, &flag);
......@@ -78,6 +80,8 @@ static int sdskv_client_register(sdskv_client_t client, margo_instance_id mid)
MARGO_REGISTER(mid, "sdskv_get_multi_rpc", get_multi_in_t, get_multi_out_t, NULL);
client->sdskv_erase_id =
MARGO_REGISTER(mid, "sdskv_erase_rpc", erase_in_t, erase_out_t, NULL);
client->sdskv_erase_multi_id =
MARGO_REGISTER(mid, "sdskv_erase_multi_rpc", erase_multi_in_t, erase_multi_out_t, NULL);
client->sdskv_exists_id =
MARGO_REGISTER(mid, "sdskv_exists_rpc", exists_in_t, exists_out_t, NULL);
client->sdskv_length_id =
......@@ -329,8 +333,8 @@ int sdskv_put(sdskv_provider_handle_t provider,
int sdskv_put_multi(sdskv_provider_handle_t provider,
sdskv_database_id_t db_id,
size_t num, const void** keys, const hg_size_t* ksizes,
const void** values, const hg_size_t *vsizes)
size_t num, const void* const* keys, const hg_size_t* ksizes,
const void* const* values, const hg_size_t *vsizes)
{
hg_return_t hret;
int ret;
......@@ -536,7 +540,7 @@ int sdskv_get(sdskv_provider_handle_t provider,
int sdskv_get_multi(sdskv_provider_handle_t provider,
sdskv_database_id_t db_id,
size_t num, const void** keys, const hg_size_t* ksizes,
size_t num, const void* const* keys, const hg_size_t* ksizes,
void** values, hg_size_t *vsizes)
{
hg_return_t hret;
......@@ -737,7 +741,7 @@ int sdskv_length(sdskv_provider_handle_t provider,
int sdskv_length_multi(sdskv_provider_handle_t provider,
sdskv_database_id_t db_id, size_t num,
const void** keys, const hg_size_t* ksizes, hg_size_t *vsizes)
const void* const* keys, const hg_size_t* ksizes, hg_size_t *vsizes)
{
hg_return_t hret;
int ret;
......@@ -870,6 +874,88 @@ int sdskv_erase(sdskv_provider_handle_t provider,
return ret;
}
int sdskv_erase_multi(sdskv_provider_handle_t provider,
sdskv_database_id_t db_id, size_t num,
const void* const* keys,
const hg_size_t* ksizes)
{
hg_return_t hret;
int ret;
hg_handle_t handle = HG_HANDLE_NULL;
erase_multi_in_t in;
erase_multi_out_t out;
void** key_seg_ptrs = NULL;
hg_size_t* key_seg_sizes = NULL;
in.db_id = db_id;
in.num_keys = num;
in.keys_bulk_handle = HG_BULK_NULL;
in.keys_bulk_size = 0;
/* create an array of key sizes and key pointers */
key_seg_sizes = malloc(sizeof(hg_size_t)*(num+1));
key_seg_sizes[0] = num*sizeof(hg_size_t);
memcpy(key_seg_sizes+1, ksizes, num*sizeof(hg_size_t));
key_seg_ptrs = malloc(sizeof(void*)*(num+1));
key_seg_ptrs[0] = (void*)ksizes;
memcpy(key_seg_ptrs+1, keys, num*sizeof(void*));
int i;
for(i=0; i<num+1; i++) {
in.keys_bulk_size += key_seg_sizes[i];
}
/* create the bulk handle to access the keys */
hret = margo_bulk_create(provider->client->mid, num+1, key_seg_ptrs, key_seg_sizes,
HG_BULK_READ_ONLY, &in.keys_bulk_handle);
if(hret != HG_SUCCESS) {
fprintf(stderr,"[SDSKV] margo_bulk_create() failed in sdskv_erase_multi()\n");
out.ret = SDSKV_ERR_MERCURY;
goto finish;
}
/* create a RPC handle */
hret = margo_create(
provider->client->mid,
provider->addr,
provider->client->sdskv_erase_multi_id,
&handle);
if(hret != HG_SUCCESS) {
fprintf(stderr,"[SDSKV] margo_create() failed in sdskv_erase_multi()\n");
out.ret = SDSKV_ERR_MERCURY;
goto finish;
}
/* forward the RPC handle */
hret = margo_provider_forward(provider->provider_id, handle, &in);
if(hret != HG_SUCCESS) {
fprintf(stderr,"[SDSKV] margo_forward() failed in sdskv_erase_multi()\n");
out.ret = SDSKV_ERR_MERCURY;
goto finish;
}
/* get the response */
hret = margo_get_output(handle, &out);
if(hret != HG_SUCCESS) {
fprintf(stderr,"[SDSKV] margo_get_output() failed in sdskv_erase_multi()\n");
out.ret = SDSKV_ERR_MERCURY;
goto finish;
}
ret = out.ret;
if(out.ret != SDSKV_SUCCESS) {
goto finish;
}
finish:
margo_free_output(handle, &out);
margo_bulk_free(in.keys_bulk_handle);
free(key_seg_sizes);
free(key_seg_ptrs);
margo_destroy(handle);
return ret;
}
int sdskv_list_keys(sdskv_provider_handle_t provider,
sdskv_database_id_t db_id, // db instance
const void *start_key, // we want keys strictly after this start_key
......
......@@ -142,6 +142,14 @@ MERCURY_GEN_PROC(length_multi_in_t, \
((hg_bulk_t)(vals_size_bulk_handle)))
MERCURY_GEN_PROC(length_multi_out_t, ((int32_t)(ret)))
// ------------- LENGTH MULTI ------------- //
MERCURY_GEN_PROC(erase_multi_in_t, \
((uint64_t)(db_id))\
((hg_size_t)(num_keys))\
((hg_bulk_t)(keys_bulk_handle))\
((hg_size_t)(keys_bulk_size)))
MERCURY_GEN_PROC(erase_multi_out_t, ((int32_t)(ret)))
// ------------- MIGRATE KEYS ----------- //
MERCURY_GEN_PROC(migrate_keys_in_t,
((uint64_t)(source_db_id))\
......
......@@ -180,15 +180,18 @@ int main(int argc, char **argv)
}
sdskv_database_id_t db_id;
ret = sdskv_provider_add_database(provider,
opts.db_names[i],
"",
opts.db_types[i], SDSKV_COMPARE_DEFAULT,
&db_id);
sdskv_config_t db_config = {
.db_name = opts.db_names[i],
.db_path = "",
.db_type = opts.db_types[i],
.db_comparison_fn = SDSKV_COMPARE_DEFAULT,
.db_no_overwrite = 0
};
ret = sdskv_provider_attach_database(provider, &db_config, &db_id);
if(ret != 0)
{
fprintf(stderr, "Error: bake_provider_add_database()\n");
fprintf(stderr, "Error: bake_provider_attach_database()\n");
margo_finalize(mid);
return(-1);
}
......@@ -213,16 +216,18 @@ int main(int argc, char **argv)
for(i=0; i < opts.num_db; i++) {
sdskv_database_id_t db_id;
ret = sdskv_provider_add_database(provider,
opts.db_names[i],
"",
opts.db_types[i],
SDSKV_COMPARE_DEFAULT,
&db_id);
sdskv_config_t db_config = {
.db_name = opts.db_names[i],
.db_path = "",
.db_type = opts.db_types[i],
.db_comparison_fn = SDSKV_COMPARE_DEFAULT,
.db_no_overwrite = 0
};
ret = sdskv_provider_attach_database(provider, &db_config, &db_id);
if(ret != 0)
{
fprintf(stderr, "Error: sdskv_provider_add_database()\n");
fprintf(stderr, "Error: sdskv_provider_attach_database()\n");
margo_finalize(mid);
return(-1);
}
......
......@@ -25,6 +25,7 @@ struct sdskv_server_context_t
hg_id_t sdskv_get_multi_id;
hg_id_t sdskv_exists_id;
hg_id_t sdskv_erase_id;
hg_id_t sdskv_erase_multi_id;
hg_id_t sdskv_length_id;
hg_id_t sdskv_length_multi_id;
hg_id_t sdskv_bulk_get_id;
......@@ -63,6 +64,7 @@ DECLARE_MARGO_RPC_HANDLER(sdskv_bulk_get_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_list_keys_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_list_keyvals_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_erase_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_erase_multi_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_exists_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_migrate_keys_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_migrate_key_range_ult)
......@@ -162,6 +164,11 @@ extern "C" int sdskv_provider_register(
sdskv_erase_ult, provider_id, abt_pool);
tmp_svr_ctx->sdskv_erase_id = rpc_id;
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_PROVIDER(mid, "sdskv_erase_multi_rpc",
erase_multi_in_t, erase_multi_out_t,
sdskv_erase_multi_ult, provider_id, abt_pool);
tmp_svr_ctx->sdskv_erase_multi_id = rpc_id;
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
/* migration RPC */
rpc_id = MARGO_REGISTER_PROVIDER(mid, "sdskv_migrate_keys_rpc",
migrate_keys_in_t, migrate_keys_out_t,
......@@ -1061,6 +1068,74 @@ static void sdskv_erase_ult(hg_handle_t handle)
}
DEFINE_MARGO_RPC_HANDLER(sdskv_erase_ult)
static void sdskv_erase_multi_ult(hg_handle_t handle)
{
hg_return_t hret;
erase_multi_in_t in;
erase_multi_out_t out;
out.ret = SDSKV_SUCCESS;
std::vector<char> local_keys_buffer;
hg_bulk_t local_keys_bulk_handle;
auto r1 = at_exit([&handle]() { margo_destroy(handle); });
auto r2 = at_exit([&handle,&out]() { margo_respond(handle, &out); });
/* get margo instance and provider */
margo_instance_id mid = margo_hg_handle_get_instance(handle);
const struct hg_info* info = margo_get_info(handle);
sdskv_provider_t svr_ctx =
(sdskv_provider_t)margo_registered_data(mid, info->id);
if(!svr_ctx) {
out.ret = SDSKV_ERR_UNKNOWN_PR;
return;
}
/* deserialize input */
hret = margo_get_input(handle, &in);
if(hret != HG_SUCCESS) {
out.ret = SDSKV_ERR_MERCURY;
return;
}
auto r3 = at_exit([&handle,&in]() { margo_free_input(handle, &in); });
/* find the target database */
auto it = svr_ctx->databases.find(in.db_id);
if(it == svr_ctx->databases.end()) {
out.ret = SDSKV_ERR_UNKNOWN_DB;
return;
}
/* allocate buffers to receive the keys */
local_keys_buffer.resize(in.keys_bulk_size);
std::vector<void*> keys_addr(1);
keys_addr[0] = (void*)local_keys_buffer.data();
/* create bulk handle to receive key sizes and packed keys */
hret = margo_bulk_create(mid, 1, keys_addr.data(), &in.keys_bulk_size,
HG_BULK_WRITE_ONLY, &local_keys_bulk_handle);
if(hret != HG_SUCCESS) {
out.ret = SDSKV_ERR_MERCURY;
return;
}
auto r6 = at_exit([&local_keys_bulk_handle]() { margo_bulk_free(local_keys_bulk_handle); });
/* interpret beginning of the key buffer as a list of key sizes */
hg_size_t* key_sizes = (hg_size_t*)local_keys_buffer.data();
/* find beginning of packed keys */
char* packed_keys = local_keys_buffer.data() + in.num_keys*sizeof(hg_size_t);
/* go through the key/value pairs and erase them */
for(unsigned i=0; i < in.num_keys; i++) {
ds_bulk_t kdata(packed_keys, packed_keys+key_sizes[i]);
it->second->erase(kdata);
packed_keys += key_sizes[i];
}
return;
}
DEFINE_MARGO_RPC_HANDLER(sdskv_erase_multi_ult)
static void sdskv_exists_ult(hg_handle_t handle)
{
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment