Commit 95de9497 authored by Matthieu Dorier's avatar Matthieu Dorier

added put_packed functionality

parent 0eccaf2c
......@@ -178,6 +178,28 @@ int sdskv_put_multi(sdskv_provider_handle_t provider,
size_t num, const void* const* keys, const hg_size_t* ksizes,
const void* const* values, const hg_size_t *vsizes);
/**
* @brief Puts multiple key/value pairs into the database.
* This method will send all the key/value pairs in batch,
* thus optimizing transfers by avoiding many RPC round trips.
* This version of put_multi assumes that the keys are packed
* in back to back in a single buffer, and so are the values.
*
* @param provider provider handle managing the database
* @param db_id targeted database id
* @param num number of key/value pairs to put
* @param packed_keys buffer containing the keys
* @param ksizes array of key sizes
* @param packed_values buffer containing the values
* @param vsizes array of value sizes
*
* @return SDSKV_SUCCESS or error code defined in sdskv-common.h
*/
int sdskv_put_packed(sdskv_provider_handle_t provider,
sdskv_database_id_t db_id,
size_t num, const void* packed_keys, const hg_size_t *ksizes,
const void* packed_values, const hg_size_t *vsizes);
/**
* @brief Gets the value associated with a given key.
* vsize needs to be set to the current size of the allocated
......
......@@ -324,6 +324,39 @@ class client {
put_multi(db, kdata, ksizes, vdata, vsizes);
}
//////////////////////////
// PUT_PACKED methods
//////////////////////////
/**
* @brief Equivalent to sdskv_put_packed.
*
* @param db Database instance.
* @param count Number of key/val pairs.
* @param keys Buffer of keys.
* @param ksizes Array of key sizes.
* @param values Buffer of values.
* @param vsizes Array of value sizes.
*/
void put_packed(const database& db,
hg_size_t count, const void* keys, const hg_size_t* ksizes,
const void* values, const hg_size_t *vsizes) const;
/**
* @brief Version of put taking std::strings instead of pointers.
*
* @param db Database instance.
* @param keys Vector of pointers to keys.
* @param ksizes Vector of key sizes.
* @param values Vector of pointers to values.
* @param vsizes Vector of value sizes.
*/
inline void put_packed(const database& db,
const std::string& packed_keys, const std::vector<hg_size_t>& ksizes,
const std::string& packed_values, const std::vector<hg_size_t>& vsizes) const {
put_packed(db, ksizes.size(), packed_keys.data(), ksizes.data(), packed_values.data(), vsizes.data());
}
//////////////////////////
// EXISTS methods
//////////////////////////
......@@ -1305,6 +1338,14 @@ class database {
m_ph.m_client->put_multi(*this, std::forward<T>(args)...);
}
/**
* @brief @see client::put_packed.
*/
template<typename ... T>
void put_packed(T&& ... args) const {
m_ph.m_client->put_packed(*this, std::forward<T>(args)...);
}
/**
* @brief @see client::length.
*/
......@@ -1438,6 +1479,14 @@ inline void client::put_multi(const database& db,
_CHECK_RET(ret);
}
inline void client::put_packed(const database& db,
hg_size_t count, const void* keys, const hg_size_t* ksizes,
const void* values, const hg_size_t *vsizes) const {
int ret = sdskv_put_packed(db.m_ph.m_ph, db.m_db_id,
count, keys, ksizes, values, vsizes);
_CHECK_RET(ret);
}
inline hg_size_t client::length(const database& db,
const void* key, hg_size_t ksize) const {
hg_size_t vsize;
......
......@@ -41,6 +41,23 @@ class AbstractDataStore {
}
return ret;
}
virtual int put_packed(hg_size_t num_items,
const char* keys,
const hg_size_t* ksizes,
const char* values,
const hg_size_t* vsizes)
{
int ret = 0;
size_t keys_offset = 0;
size_t vals_offset = 0;
for(hg_size_t i=0; i < num_items; i++) {
int r = put(keys+keys_offset, ksizes[i], values+vals_offset, vsizes[i]);
ret = ret == 0 ? r : 0;
keys_offset += ksizes[i];
vals_offset += vsizes[i];
}
return ret;
}
virtual bool get(const ds_bulk_t &key, ds_bulk_t &data)=0;
virtual bool get(const ds_bulk_t &key, std::vector<ds_bulk_t> &data)=0;
virtual bool exists(const void* key, hg_size_t ksize) const = 0;
......
......@@ -14,6 +14,7 @@ struct sdskv_client {
/* accessing database */
hg_id_t sdskv_put_id;
hg_id_t sdskv_put_multi_id;
hg_id_t sdskv_put_packed_id;
hg_id_t sdskv_bulk_put_id;
hg_id_t sdskv_get_id;
hg_id_t sdskv_get_multi_id;
......@@ -59,6 +60,7 @@ static int sdskv_client_register(sdskv_client_t client, margo_instance_id mid)
margo_registered_name(mid, "sdskv_list_databases_rpc", &client->sdskv_list_databases_id, &flag);
margo_registered_name(mid, "sdskv_put_rpc", &client->sdskv_put_id, &flag);
margo_registered_name(mid, "sdskv_put_multi_rpc", &client->sdskv_put_multi_id, &flag);
margo_registered_name(mid, "sdskv_put_packed_rpc", &client->sdskv_put_packed_id, &flag);
margo_registered_name(mid, "sdskv_bulk_put_rpc", &client->sdskv_bulk_put_id, &flag);
margo_registered_name(mid, "sdskv_get_rpc", &client->sdskv_get_id, &flag);
margo_registered_name(mid, "sdskv_get_multi_rpc", &client->sdskv_get_multi_id, &flag);
......@@ -88,6 +90,8 @@ static int sdskv_client_register(sdskv_client_t client, margo_instance_id mid)
MARGO_REGISTER(mid, "sdskv_put_rpc", put_in_t, put_out_t, NULL);
client->sdskv_put_multi_id =
MARGO_REGISTER(mid, "sdskv_put_multi_rpc", put_multi_in_t, put_multi_out_t, NULL);
client->sdskv_put_packed_id =
MARGO_REGISTER(mid, "sdskv_put_packed_rpc", put_packed_in_t, put_packed_out_t, NULL);
client->sdskv_bulk_put_id =
MARGO_REGISTER(mid, "sdskv_bulk_put_rpc", bulk_put_in_t, bulk_put_out_t, NULL);
client->sdskv_get_id =
......@@ -582,6 +586,77 @@ finish:
return ret;
}
int sdskv_put_packed(sdskv_provider_handle_t provider,
sdskv_database_id_t db_id,
size_t num, const void* packed_keys, const hg_size_t *ksizes,
const void* packed_values, const hg_size_t *vsizes)
{
hg_return_t hret;
int ret = SDSKV_SUCCESS;
hg_handle_t handle;
put_packed_in_t in;
put_packed_out_t out;
in.db_id = db_id;
in.num_keys = num;
hg_size_t keys_buffer_size = 0;
hg_size_t vals_buffer_size = 0;
unsigned i=0;
for(i=0; i < num; i++) {
keys_buffer_size += ksizes[i];
vals_buffer_size += vsizes[i];
}
in.bulk_size = keys_buffer_size + vals_buffer_size + 2*num*sizeof(size_t);
hg_size_t seg_sizes[4] = { num*sizeof(size_t), num*sizeof(size_t), keys_buffer_size, vals_buffer_size };
void* seg_ptrs[4] = { (void*)ksizes, (void*)vsizes, (void*)packed_keys, (void*)packed_values };
int num_seg = vals_buffer_size == 0 ? 3 : 4;
hret = margo_bulk_create(provider->client->mid, num_seg, seg_ptrs, seg_sizes,
HG_BULK_READ_ONLY, &in.bulk_handle);
if(hret != HG_SUCCESS) {
fprintf(stderr,"[SDSKV] margo_bulk_create() failed in sdskv_put_packed()\n");
return SDSKV_ERR_MERCURY;
}
/* create handle */
hret = margo_create(
provider->client->mid,
provider->addr,
provider->client->sdskv_put_packed_id,
&handle);
if(hret != HG_SUCCESS) {
fprintf(stderr,"[SDSKV] margo_create() failed in sdskv_put_packed()\n");
margo_bulk_free(in.bulk_handle);
return SDSKV_ERR_MERCURY;
}
hret = margo_provider_forward(provider->provider_id, handle, &in);
if(hret != HG_SUCCESS) {
fprintf(stderr,"[SDSKV] margo_forward() failed in sdskv_put_packed()\n");
margo_bulk_free(in.bulk_handle);
margo_destroy(handle);
return SDSKV_ERR_MERCURY;
}
hret = margo_get_output(handle, &out);
if(hret != HG_SUCCESS) {
fprintf(stderr,"[SDSKV] margo_get_output() failed in sdskv_put_packed()\n");
margo_bulk_free(in.bulk_handle);
margo_destroy(handle);
return SDSKV_ERR_MERCURY;
}
ret = out.ret;
margo_free_output(handle, &out);
margo_bulk_free(in.bulk_handle);
margo_destroy(handle);
return ret;
}
int sdskv_get(sdskv_provider_handle_t provider,
sdskv_database_id_t db_id,
const void *key, hg_size_t ksize,
......
......@@ -191,6 +191,14 @@ MERCURY_GEN_PROC(put_multi_in_t, \
((hg_size_t)(vals_bulk_size)))
MERCURY_GEN_PROC(put_multi_out_t, ((int32_t)(ret)))
// ------------- PUT PACKED MULTI ------------- //
MERCURY_GEN_PROC(put_packed_in_t, \
((uint64_t)(db_id))\
((hg_size_t)(num_keys))\
((hg_size_t)(bulk_size))\
((hg_bulk_t)(bulk_handle)))
MERCURY_GEN_PROC(put_packed_out_t, ((int32_t)(ret)))
// ------------- GET MULTI ------------- //
MERCURY_GEN_PROC(get_multi_in_t, \
((uint64_t)(db_id))\
......
......@@ -43,6 +43,7 @@ struct sdskv_server_context_t
hg_id_t sdskv_list_databases_id;
hg_id_t sdskv_put_id;
hg_id_t sdskv_put_multi_id;
hg_id_t sdskv_put_packed_id;
hg_id_t sdskv_bulk_put_id;
hg_id_t sdskv_get_id;
hg_id_t sdskv_get_multi_id;
......@@ -79,6 +80,7 @@ DECLARE_MARGO_RPC_HANDLER(sdskv_count_db_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_list_db_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_put_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_put_multi_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_put_packed_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_length_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_length_multi_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_get_ult)
......@@ -182,6 +184,12 @@ extern "C" int sdskv_provider_register(
tmp_svr_ctx->sdskv_put_multi_id = rpc_id;
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_PROVIDER(mid, "sdskv_put_packed_rpc",
put_packed_in_t, put_packed_out_t,
sdskv_put_packed_ult, provider_id, abt_pool);
tmp_svr_ctx->sdskv_put_packed_id = rpc_id;
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_PROVIDER(mid, "sdskv_bulk_put_rpc",
bulk_put_in_t, bulk_put_out_t,
sdskv_bulk_put_ult, provider_id, abt_pool);
......@@ -787,6 +795,85 @@ static void sdskv_put_multi_ult(hg_handle_t handle)
}
DEFINE_MARGO_RPC_HANDLER(sdskv_put_multi_ult)
static void sdskv_put_packed_ult(hg_handle_t handle)
{
hg_return_t hret;
put_packed_in_t in;
put_packed_out_t out;
out.ret = SDSKV_SUCCESS;
std::vector<char> local_buffer;
hg_bulk_t local_bulk_handle;
auto r1 = at_exit([&handle]() { margo_destroy(handle); });
auto r2 = at_exit([&handle,&out]() { margo_respond(handle, &out); });
margo_instance_id mid = margo_hg_handle_get_instance(handle);
const struct hg_info* info = margo_get_info(handle);
sdskv_provider_t svr_ctx =
(sdskv_provider_t)margo_registered_data(mid, info->id);
if(!svr_ctx) {
out.ret = SDSKV_ERR_UNKNOWN_PR;
return;
}
hret = margo_get_input(handle, &in);
if(hret != HG_SUCCESS) {
out.ret = SDSKV_ERR_MERCURY;
return;
}
auto r3 = at_exit([&handle,&in]() { margo_free_input(handle, &in); });
ABT_rwlock_rdlock(svr_ctx->lock);
auto it = svr_ctx->databases.find(in.db_id);
if(it == svr_ctx->databases.end()) {
ABT_rwlock_unlock(svr_ctx->lock);
out.ret = SDSKV_ERR_UNKNOWN_DB;
return;
}
auto db = it->second;
ABT_rwlock_unlock(svr_ctx->lock);
// allocate a buffer to receive the keys and a buffer to receive the values
local_buffer.resize(in.bulk_size);
void* buf_ptr = local_buffer.data();
hg_size_t buf_size = in.bulk_size;
/* create bulk handle to receive keys */
hret = margo_bulk_create(mid, 1, &buf_ptr, &buf_size,
HG_BULK_WRITE_ONLY, &local_bulk_handle);
if(hret != HG_SUCCESS) {
out.ret = SDSKV_ERR_MERCURY;
return;
}
auto r6 = at_exit([&local_bulk_handle]() { margo_bulk_free(local_bulk_handle); });
/* transfer data */
hret = margo_bulk_transfer(mid, HG_BULK_PULL, info->addr, in.bulk_handle, 0,
local_bulk_handle, 0, in.bulk_size);
if(hret != HG_SUCCESS) {
out.ret = SDSKV_ERR_MERCURY;
return;
}
/* interpret buffer as a list of key sizes */
hg_size_t* key_sizes = (hg_size_t*)local_buffer.data();
/* interpret buffer as a list of value sizes */
hg_size_t* val_sizes = key_sizes + in.num_keys;
/* interpret buffer as list of keys */
char* packed_keys = (char*)(val_sizes + in.num_keys);
/* compute the size of part of the buffer that contain keys */
size_t k=0;
for(unsigned i=0; i < in.num_keys; i++) k += key_sizes[i];
/* interpret the rest of the buffer as list of values */
char* packed_vals = packed_keys + k;
/* insert key/vals into the DB */
out.ret = db->put_packed(in.num_keys, packed_keys, key_sizes, packed_vals, val_sizes);
return;
}
DEFINE_MARGO_RPC_HANDLER(sdskv_put_packed_ult)
static void sdskv_length_ult(hg_handle_t handle)
{
hg_return_t hret;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment