Commit 0885eaad authored by Matthieu Dorier's avatar Matthieu Dorier

Merge branch 'dev-put-multi-packed' into 'master'

Added "packed" versions of "put_multi", "get_multi" and "length_multi"

See merge request !10
parents 0eccaf2c 05468e39
......@@ -24,6 +24,7 @@ check_PROGRAMS = test/sdskv-open-test \
test/sdskv-custom-cmp-test \
test/sdskv-migrate-test \
test/sdskv-multi-test \
test/sdskv-packed-test \
test/sdskv-cxx-test \
test/sdskv-custom-server-daemon
......@@ -127,6 +128,7 @@ TESTS = test/basic.sh \
test/migrate-test.sh \
test/custom-cmp-test.sh \
test/multi-test.sh \
test/packed-test.sh \
test/cxx-test.sh
TESTS_ENVIRONMENT = TIMEOUT="$(TIMEOUT)" \
......@@ -185,6 +187,10 @@ test_sdskv_multi_test_SOURCES = test/sdskv-multi-test.cc
test_sdskv_multi_test_DEPENDENCIES = lib/libsdskv-client.la
test_sdskv_multi_test_LDFLAGS = -Llib -lsdskv-client
test_sdskv_packed_test_SOURCES = test/sdskv-packed-test.cc
test_sdskv_packed_test_DEPENDENCIES = lib/libsdskv-client.la
test_sdskv_packed_test_LDFLAGS = -Llib -lsdskv-client
test_sdskv_cxx_test_SOURCES = test/sdskv-cxx-test.cc
test_sdskv_cxx_test_DEPENDENCIES = lib/libsdskv-client.la
test_sdskv_cxx_test_LDFLAGS = -Llib -lsdskv-client
......
......@@ -178,6 +178,28 @@ int sdskv_put_multi(sdskv_provider_handle_t provider,
size_t num, const void* const* keys, const hg_size_t* ksizes,
const void* const* values, const hg_size_t *vsizes);
/**
* @brief Puts multiple key/value pairs into the database.
* This method will send all the key/value pairs in batch,
* thus optimizing transfers by avoiding many RPC round trips.
* This version of put_multi assumes that the keys are packed
* in back to back in a single buffer, and so are the values.
*
* @param provider provider handle managing the database
* @param db_id targeted database id
* @param num number of key/value pairs to put
* @param packed_keys buffer containing the keys
* @param ksizes array of key sizes
* @param packed_values buffer containing the values
* @param vsizes array of value sizes
*
* @return SDSKV_SUCCESS or error code defined in sdskv-common.h
*/
int sdskv_put_packed(sdskv_provider_handle_t provider,
sdskv_database_id_t db_id,
size_t num, const void* packed_keys, const hg_size_t *ksizes,
const void* packed_values, const hg_size_t *vsizes);
/**
* @brief Gets the value associated with a given key.
* vsize needs to be set to the current size of the allocated
......@@ -236,6 +258,25 @@ int sdskv_get_multi(sdskv_provider_handle_t provider,
size_t num, const void* const* keys, const hg_size_t* ksizes,
void** values, hg_size_t *vsizes);
/**
* @brief Get multiple values into a single packed buffer.
*
* @param[in] provider provider handle
* @param[in] db_id database id
* @param[inout] num number of values to retrieve, number of values actually retrieved
* @param[in] keys buffer of packed keys to retrieve
* @param[in] ksizes size of the keys
* @param[in] vbufsize size of the buffer allocated for the values
* @param[out] values buffer allocated to receive packed values
* @param[out] vsizes sizes of the values
*
* @return SDSKV_SUCCESS or error code defined in sdskv-common.h
*/
int sdskv_get_packed(sdskv_provider_handle_t provider,
sdskv_database_id_t db_id,
size_t* num, const void* packed_keys, const hg_size_t* ksizes,
hg_size_t vbufsize, void* packed_values, hg_size_t *vsizes);
/**
* @brief Gets the length of a value associated with a given key.
*
......@@ -271,6 +312,26 @@ int sdskv_length_multi(sdskv_provider_handle_t handle,
const void* const* keys, const hg_size_t* ksizes,
hg_size_t *vsizes);
/**
* @brief Gets the length of values associated with multiple keys.
* If a particular key does not exists, this function will set the length
* of its value to 0 (so the user needs another way to differenciate
* between a key that does not exists and a 0-sized value).
*
* @param[in] handle provider handle
* @param[in] db_id database id
* @param[in] num number of keys
* @param[in] keys buffer of packed keys
* @param[in] ksizes array of key sizes
* @param[out] vsizes array where to put value sizes
*
* @return SDSKV_SUCCESS or error code defined in sdskv-common.h
*/
int sdskv_length_packed(sdskv_provider_handle_t handle,
sdskv_database_id_t db_id, size_t num,
const void* packed_keys, const hg_size_t* ksizes,
hg_size_t *vsizes);
/**
* @brief Checks if the given key exists in the database.
*
......
......@@ -324,6 +324,39 @@ class client {
put_multi(db, kdata, ksizes, vdata, vsizes);
}
//////////////////////////
// PUT_PACKED methods
//////////////////////////
/**
* @brief Equivalent to sdskv_put_packed.
*
* @param db Database instance.
* @param count Number of key/val pairs.
* @param keys Buffer of keys.
* @param ksizes Array of key sizes.
* @param values Buffer of values.
* @param vsizes Array of value sizes.
*/
void put_packed(const database& db,
hg_size_t count, const void* keys, const hg_size_t* ksizes,
const void* values, const hg_size_t *vsizes) const;
/**
* @brief Version of put taking std::strings instead of pointers.
*
* @param db Database instance.
* @param keys Vector of pointers to keys.
* @param ksizes Vector of key sizes.
* @param values Vector of pointers to values.
* @param vsizes Vector of value sizes.
*/
inline void put_packed(const database& db,
const std::string& packed_keys, const std::vector<hg_size_t>& ksizes,
const std::string& packed_values, const std::vector<hg_size_t>& vsizes) const {
put_packed(db, ksizes.size(), packed_keys.data(), ksizes.data(), packed_values.data(), vsizes.data());
}
//////////////////////////
// EXISTS methods
//////////////////////////
......@@ -445,6 +478,38 @@ class client {
return vsizes;
}
//////////////////////////
// LENGTH_PACKED methods
//////////////////////////
/**
* @brief Equivalent to sdskv_length_packed.
*
* @param db Database instance.
* @param num Number of keys.
* @param keys Packed keys.
* @param ksizes Array of key sizes.
* @param vsizes Resulting value sizes.
*/
bool length_packed(const database& db,
hg_size_t num, const void* keys,
const hg_size_t* ksizes, hg_size_t* vsizes) const;
/**
* Version of length_packed that takes an std::string as packed buffer.
*
* @param db Database instance.
* @param keys Packed keys.
* @param vsizes Resulting vector of value sizes.
*/
inline bool length_multi(const database& db,
const std::string& keys,
const std::vector<hg_size_t>& ksizes,
std::vector<hg_size_t>& vsizes) const {
vsizes.resize(ksizes.size());
return length_packed(db, ksizes.size(), keys.data(), ksizes.data(), vsizes.data());
}
//////////////////////////
// GET methods
//////////////////////////
......@@ -652,6 +717,49 @@ class client {
return values;
}
//////////////////////////
// GET_PACKED methods
//////////////////////////
/**
* @brief Equivalent to sdskv_get_packed.
*
* @param db Database instance.
* @param count Number of key/val pairs.
* @param keys Buffer of packed keys.
* @param ksizes Array of key sizes.
* @param valbufsize Size of the value buffer.
* @param values Buffer of packed values.
* @param vsizes Array of sizes of value buffers.
*/
bool get_packed(const database& db,
hg_size_t* count, const void* keys, const hg_size_t* ksizes,
hg_size_t valbufsize, void* values, hg_size_t *vsizes) const;
/**
* @brief Get multiple key/val pairs using std::string packed buffers
* and std::vector<hg_size_t> of sizes. The value buffer must be
* pre-allocated. vsizes will be resized to the right number of retrieved
* values.
*
* @param db Database instance.
* @param keys Vector of key addresses.
* @param ksizes Vector of key sizes.
* @param values Vector of value addresses.
* @param vsizes Vector of value sizes.
*/
inline bool get_packed(const database& db,
const std::string& packed_keys, const std::vector<hg_size_t>& ksizes,
std::string& packed_values, std::vector<hg_size_t>& vsizes) const {
hg_size_t count = ksizes.size();
vsizes.resize(count);
bool b = get_packed(db, &count, packed_keys.data(), ksizes.data(),
packed_values.size(), const_cast<char*>(packed_values.data()), vsizes.data());
vsizes.resize(count);
return b;
}
//////////////////////////
// ERASE methods
//////////////////////////
......@@ -1305,6 +1413,14 @@ class database {
m_ph.m_client->put_multi(*this, std::forward<T>(args)...);
}
/**
* @brief @see client::put_packed.
*/
template<typename ... T>
void put_packed(T&& ... args) const {
m_ph.m_client->put_packed(*this, std::forward<T>(args)...);
}
/**
* @brief @see client::length.
*/
......@@ -1321,6 +1437,14 @@ class database {
return m_ph.m_client->length_multi(*this, std::forward<T>(args)...);
}
/**
* @brief @see client::length_packed.
*/
template<typename ... T>
decltype(auto) length_packed(T&& ... args) const {
return m_ph.m_client->length_packed(*this, std::forward<T>(args)...);
}
/**
* @brief @see client::get.
*/
......@@ -1336,6 +1460,15 @@ class database {
decltype(auto) get_multi(T&& ... args) const {
return m_ph.m_client->get_multi(*this, std::forward<T>(args)...);
}
/**
* @brief @see client::get_packed.
*/
template<typename ... T>
decltype(auto) get_packed(T&& ... args) const {
return m_ph.m_client->get_packed(*this, std::forward<T>(args)...);
}
/**
* @brief @see client::exists
......@@ -1438,6 +1571,14 @@ inline void client::put_multi(const database& db,
_CHECK_RET(ret);
}
inline void client::put_packed(const database& db,
hg_size_t count, const void* keys, const hg_size_t* ksizes,
const void* values, const hg_size_t *vsizes) const {
int ret = sdskv_put_packed(db.m_ph.m_ph, db.m_db_id,
count, keys, ksizes, values, vsizes);
_CHECK_RET(ret);
}
inline hg_size_t client::length(const database& db,
const void* key, hg_size_t ksize) const {
hg_size_t vsize;
......@@ -1463,6 +1604,15 @@ inline bool client::length_multi(const database& db,
return true;
}
inline bool client::length_packed(const database& db,
hg_size_t num, const void* keys,
const hg_size_t* ksizes, hg_size_t* vsizes) const {
int ret = sdskv_length_packed(db.m_ph.m_ph, db.m_db_id,
num, keys, ksizes, vsizes);
_CHECK_RET(ret);
return true;
}
inline bool client::get(const database& db,
const void* key, hg_size_t ksize,
void* value, hg_size_t* vsize) const {
......@@ -1481,6 +1631,15 @@ inline bool client::get_multi(const database& db,
return true;
}
inline bool client::get_packed(const database& db,
hg_size_t* count, const void* keys, const hg_size_t* ksizes,
hg_size_t valbufsize, void* values, hg_size_t *vsizes) const {
int ret = sdskv_get_packed(db.m_ph.m_ph, db.m_db_id,
count, keys, ksizes, valbufsize, values, vsizes);
_CHECK_RET(ret);
return true;
}
inline void client::erase(const database& db,
const void* key,
hg_size_t ksize) const {
......
spack:
specs:
- jsoncpp
- mpich
- mochi-margo
- mochi-abt-io
- berkeley-db
- leveldb
concretization: together
......@@ -41,6 +41,23 @@ class AbstractDataStore {
}
return ret;
}
virtual int put_packed(hg_size_t num_items,
const char* keys,
const hg_size_t* ksizes,
const char* values,
const hg_size_t* vsizes)
{
int ret = 0;
size_t keys_offset = 0;
size_t vals_offset = 0;
for(hg_size_t i=0; i < num_items; i++) {
int r = put(keys+keys_offset, ksizes[i], values+vals_offset, vsizes[i]);
ret = ret == 0 ? r : 0;
keys_offset += ksizes[i];
vals_offset += vsizes[i];
}
return ret;
}
virtual bool get(const ds_bulk_t &key, ds_bulk_t &data)=0;
virtual bool get(const ds_bulk_t &key, std::vector<ds_bulk_t> &data)=0;
virtual bool exists(const void* key, hg_size_t ksize) const = 0;
......
......@@ -14,14 +14,17 @@ struct sdskv_client {
/* accessing database */
hg_id_t sdskv_put_id;
hg_id_t sdskv_put_multi_id;
hg_id_t sdskv_put_packed_id;
hg_id_t sdskv_bulk_put_id;
hg_id_t sdskv_get_id;
hg_id_t sdskv_get_multi_id;
hg_id_t sdskv_get_packed_id;
hg_id_t sdskv_exists_id;
hg_id_t sdskv_erase_id;
hg_id_t sdskv_erase_multi_id;
hg_id_t sdskv_length_id;
hg_id_t sdskv_length_multi_id;
hg_id_t sdskv_length_packed_id;
hg_id_t sdskv_bulk_get_id;
hg_id_t sdskv_list_keys_id;
hg_id_t sdskv_list_keyvals_id;
......@@ -59,14 +62,17 @@ static int sdskv_client_register(sdskv_client_t client, margo_instance_id mid)
margo_registered_name(mid, "sdskv_list_databases_rpc", &client->sdskv_list_databases_id, &flag);
margo_registered_name(mid, "sdskv_put_rpc", &client->sdskv_put_id, &flag);
margo_registered_name(mid, "sdskv_put_multi_rpc", &client->sdskv_put_multi_id, &flag);
margo_registered_name(mid, "sdskv_put_packed_rpc", &client->sdskv_put_packed_id, &flag);
margo_registered_name(mid, "sdskv_bulk_put_rpc", &client->sdskv_bulk_put_id, &flag);
margo_registered_name(mid, "sdskv_get_rpc", &client->sdskv_get_id, &flag);
margo_registered_name(mid, "sdskv_get_multi_rpc", &client->sdskv_get_multi_id, &flag);
margo_registered_name(mid, "sdskv_get_packed_rpc", &client->sdskv_get_packed_id, &flag);
margo_registered_name(mid, "sdskv_erase_rpc", &client->sdskv_erase_id, &flag);
margo_registered_name(mid, "sdskv_erase_multi_rpc", &client->sdskv_erase_multi_id, &flag);
margo_registered_name(mid, "sdskv_exists_rpc", &client->sdskv_exists_id, &flag);
margo_registered_name(mid, "sdskv_length_rpc", &client->sdskv_length_id, &flag);
margo_registered_name(mid, "sdskv_length_multi_rpc", &client->sdskv_length_multi_id, &flag);
margo_registered_name(mid, "sdskv_length_packed_rpc", &client->sdskv_length_packed_id, &flag);
margo_registered_name(mid, "sdskv_bulk_get_rpc", &client->sdskv_bulk_get_id, &flag);
margo_registered_name(mid, "sdskv_list_keys_rpc", &client->sdskv_list_keys_id, &flag);
margo_registered_name(mid, "sdskv_list_keyvals_rpc", &client->sdskv_list_keyvals_id, &flag);
......@@ -88,12 +94,16 @@ static int sdskv_client_register(sdskv_client_t client, margo_instance_id mid)
MARGO_REGISTER(mid, "sdskv_put_rpc", put_in_t, put_out_t, NULL);
client->sdskv_put_multi_id =
MARGO_REGISTER(mid, "sdskv_put_multi_rpc", put_multi_in_t, put_multi_out_t, NULL);
client->sdskv_put_packed_id =
MARGO_REGISTER(mid, "sdskv_put_packed_rpc", put_packed_in_t, put_packed_out_t, NULL);
client->sdskv_bulk_put_id =
MARGO_REGISTER(mid, "sdskv_bulk_put_rpc", bulk_put_in_t, bulk_put_out_t, NULL);
client->sdskv_get_id =
MARGO_REGISTER(mid, "sdskv_get_rpc", get_in_t, get_out_t, NULL);
client->sdskv_get_multi_id =
MARGO_REGISTER(mid, "sdskv_get_multi_rpc", get_multi_in_t, get_multi_out_t, NULL);
client->sdskv_get_packed_id =
MARGO_REGISTER(mid, "sdskv_get_packed_rpc", get_packed_in_t, get_packed_out_t, NULL);
client->sdskv_erase_id =
MARGO_REGISTER(mid, "sdskv_erase_rpc", erase_in_t, erase_out_t, NULL);
client->sdskv_erase_multi_id =
......@@ -104,6 +114,8 @@ static int sdskv_client_register(sdskv_client_t client, margo_instance_id mid)
MARGO_REGISTER(mid, "sdskv_length_rpc", length_in_t, length_out_t, NULL);
client->sdskv_length_multi_id =
MARGO_REGISTER(mid, "sdskv_length_multi_rpc", length_multi_in_t, length_multi_out_t, NULL);
client->sdskv_length_packed_id =
MARGO_REGISTER(mid, "sdskv_length_packed_rpc", length_packed_in_t, length_packed_out_t, NULL);
client->sdskv_bulk_get_id =
MARGO_REGISTER(mid, "sdskv_bulk_get_rpc", bulk_get_in_t, bulk_get_out_t, NULL);
client->sdskv_list_keys_id =
......@@ -582,6 +594,77 @@ finish:
return ret;
}
int sdskv_put_packed(sdskv_provider_handle_t provider,
sdskv_database_id_t db_id,
size_t num, const void* packed_keys, const hg_size_t *ksizes,
const void* packed_values, const hg_size_t *vsizes)
{
hg_return_t hret;
int ret = SDSKV_SUCCESS;
hg_handle_t handle;
put_packed_in_t in;
put_packed_out_t out;
in.db_id = db_id;
in.num_keys = num;
hg_size_t keys_buffer_size = 0;
hg_size_t vals_buffer_size = 0;
unsigned i=0;
for(i=0; i < num; i++) {
keys_buffer_size += ksizes[i];
vals_buffer_size += vsizes[i];
}
in.bulk_size = keys_buffer_size + vals_buffer_size + 2*num*sizeof(size_t);
hg_size_t seg_sizes[4] = { num*sizeof(size_t), num*sizeof(size_t), keys_buffer_size, vals_buffer_size };
void* seg_ptrs[4] = { (void*)ksizes, (void*)vsizes, (void*)packed_keys, (void*)packed_values };
int num_seg = vals_buffer_size == 0 ? 3 : 4;
hret = margo_bulk_create(provider->client->mid, num_seg, seg_ptrs, seg_sizes,
HG_BULK_READ_ONLY, &in.bulk_handle);
if(hret != HG_SUCCESS) {
fprintf(stderr,"[SDSKV] margo_bulk_create() failed in sdskv_put_packed()\n");
return SDSKV_ERR_MERCURY;
}
/* create handle */
hret = margo_create(
provider->client->mid,
provider->addr,
provider->client->sdskv_put_packed_id,
&handle);
if(hret != HG_SUCCESS) {
fprintf(stderr,"[SDSKV] margo_create() failed in sdskv_put_packed()\n");
margo_bulk_free(in.bulk_handle);
return SDSKV_ERR_MERCURY;
}
hret = margo_provider_forward(provider->provider_id, handle, &in);
if(hret != HG_SUCCESS) {
fprintf(stderr,"[SDSKV] margo_forward() failed in sdskv_put_packed()\n");
margo_bulk_free(in.bulk_handle);
margo_destroy(handle);
return SDSKV_ERR_MERCURY;
}
hret = margo_get_output(handle, &out);
if(hret != HG_SUCCESS) {
fprintf(stderr,"[SDSKV] margo_get_output() failed in sdskv_put_packed()\n");
margo_bulk_free(in.bulk_handle);
margo_destroy(handle);
return SDSKV_ERR_MERCURY;
}
ret = out.ret;
margo_free_output(handle, &out);
margo_bulk_free(in.bulk_handle);
margo_destroy(handle);
return ret;
}
int sdskv_get(sdskv_provider_handle_t provider,
sdskv_database_id_t db_id,
const void *key, hg_size_t ksize,
......@@ -1005,6 +1088,187 @@ finish:
return ret;
}
int sdskv_length_packed(sdskv_provider_handle_t provider,
sdskv_database_id_t db_id, size_t num, const void *packed_keys,
const hg_size_t* ksizes, hg_size_t* vsizes)
{
hg_return_t hret;
int ret;
hg_handle_t handle;
length_packed_in_t in;
length_packed_out_t out;
in.db_id = db_id;
in.num_keys = num;
in.in_bulk_size = 0;
in.in_bulk_handle = HG_BULK_NULL;
in.out_bulk_handle = HG_BULK_NULL;
hg_size_t total_ksize = 0;
unsigned i=0;
for(i = 0; i < num; i++) {
total_ksize += ksizes[i];
}
/* create bulk handle to expose the packed_keys and ksizes */
void* seg_ptrs[2] = { (void*)ksizes, (void*)packed_keys };
hg_size_t seg_sizes[2] = { num*sizeof(hg_size_t), total_ksize };
in.in_bulk_size = total_ksize + num*sizeof(hg_size_t);
hret = margo_bulk_create(provider->client->mid, 2, seg_ptrs, seg_sizes,
HG_BULK_READ_ONLY, &in.in_bulk_handle);
if(hret != HG_SUCCESS) {
fprintf(stderr,"[SDSKV] margo_bulk_create() for keys/ksizes failed in sdskv_length_packed()\n");
return SDSKV_ERR_MERCURY;
}
/* create bulk handle to expose the vsizes */
hg_size_t val_size_buf_size = num*sizeof(hg_size_t);
hret = margo_bulk_create(provider->client->mid, 1, (void**)(&vsizes), &val_size_buf_size,
HG_BULK_WRITE_ONLY, &in.out_bulk_handle);
if(hret != HG_SUCCESS) {
fprintf(stderr,"[SDSKV] margo_bulk_create() for vsizes failed in sdskv_length_packed()\n");
margo_bulk_free(in.in_bulk_handle);
return SDSKV_ERR_MERCURY;
}
/* create RPC handle */
hret = margo_create(
provider->client->mid,
provider->addr,
provider->client->sdskv_length_packed_id,
&handle);
if(hret != HG_SUCCESS) {
fprintf(stderr,"[SDSKV] margo_create() failed in sdskv_length_packed()\n");
margo_bulk_free(in.in_bulk_handle);
margo_bulk_free(in.out_bulk_handle);
return SDSKV_ERR_MERCURY;
}
/* forward RPC */
hret = margo_provider_forward(provider->provider_id, handle, &in);
if(hret != HG_SUCCESS) {
fprintf(stderr,"[SDSKV] margo_provider_forward() failed in sdskv_length_packed()\n");
margo_bulk_free(in.in_bulk_handle);
margo_bulk_free(in.out_bulk_handle);
margo_destroy(handle);
return SDSKV_ERR_MERCURY;
}
/* Get output */
hret = margo_get_output(handle, &out);
if(hret != HG_SUCCESS) {
fprintf(stderr,"[SDSKV] margo_get_output() failed in sdskv_length_packed()\n");
margo_bulk_free(in.in_bulk_handle);
margo_bulk_free(in.out_bulk_handle);
margo_destroy(handle);
return SDSKV_ERR_MERCURY;
}
ret = out.ret;
margo_bulk_free(in.in_bulk_handle);
margo_bulk_free(in.out_bulk_handle);
margo_free_output(handle, &out);
margo_destroy(handle);
return ret;
}
int sdskv_get_packed(sdskv_provider_handle_t provider,
sdskv_database_id_t db_id,
size_t* num, const void* packed_keys, const hg_size_t* ksizes,
hg_size_t vbufsize, void* packed_vals, hg_size_t *vsizes)
{
hg_return_t hret;
int ret;
hg_handle_t handle;
get_packed_in_t in;
get_packed_out_t out;
in.db_id = db_id;
in.num_keys = *num;
in.keys_bulk_size = 0;
in.keys_bulk_handle = HG_BULK_NULL;
in.vals_bulk_size = 0;
in.vals_bulk_handle = HG_BULK_NULL;
hg_size_t total_ksize = 0;
unsigned i=0;
for(i = 0; i < *num; i++) {
total_ksize += ksizes[i];
}
/* create bulk handle to expose the packed_keys and ksizes */
void* seg_ptrs[2] = { (void*)ksizes, (void*)packed_keys };
hg_size_t seg_sizes[2] = { (*num)*sizeof(hg_size_t), total_ksize };
in.keys_bulk_size = total_ksize + (*num)*sizeof(hg_size_t);
hret = margo_bulk_create(provider->client->mid, 2, seg_ptrs, seg_sizes,
HG_BULK_READ_ONLY, &in.keys_bulk_handle);
if(hret != HG_SUCCESS) {
fprintf(stderr,"[SDSKV] margo_bulk_create() for keys/ksizes failed in sdskv_get_packed()\n");
return SDSKV_ERR_MERCURY;
}
/* create bulk handle to expose the packed_vals and vsizes */
seg_ptrs[0] = (void*)vsizes;
seg_ptrs[1] = (void*)packed_vals;
seg_sizes[0] = (*num)*sizeof(hg_size_t);
seg_sizes[1] = vbufsize;
in.vals_bulk_size = (*num)*sizeof(hg_size_t) + vbufsize;
hret = margo_bulk_create(provider->client->mid, 2, seg_ptrs, seg_sizes,
HG_BULK_WRITE_ONLY, &in.vals_bulk_handle);
if(hret != HG_SUCCESS) {
fprintf(stderr,"[SDSKV] margo_bulk_create() for vals/vsizes failed in sdskv_get_packed()\n");
margo_bulk_free(in.keys_bulk_handle);
return SDSKV_ERR_MERCURY;
}
/* create RPC handle */
hret = margo_create(
provider->client->mid,
provider->addr,
provider->client->sdskv_get_packed_id,
&handle);
if(hret != HG_SUCCESS) {
fprintf(stderr,"[SDSKV] margo_create() failed in sdskv_get_packed()\n");
margo_bulk_free(in.keys_bulk_handle);
margo_bulk_free(in.vals_bulk_handle);
return SDSKV_ERR_MERCURY;
}
/* forward RPC */
hret = margo_provider_forward(provider->provider_id, handle, &in);
if(hret != HG_SUCCESS) {
fprintf(stderr,"[SDSKV] margo_provider_forward() failed in sdskv_get_packed()\n");
margo_bulk_free(in.keys_bulk_handle);
margo_bulk_free(in.vals_bulk_handle);
margo_destroy(handle);
return SDSKV_ERR_MERCURY;
}
/* Get output */
hret = margo_get_output(handle, &out);
if(hret != HG_SUCCESS) {
fprintf(stderr,"[SDSKV] margo_get_output() failed in sdskv_get_packed()\n");
margo_bulk_free(in.keys_bulk_handle);
margo_bulk_free(in.vals_bulk_handle);
margo_destroy(handle);
return SDSKV_ERR_MERCURY;
}
ret = out.ret;
*num = out.num_keys;
margo_bulk_free(in.keys_bulk_handle);
margo_bulk_free(in.vals_bulk_handle);
margo_free_output(handle, &out);
margo_destroy(handle);
return ret;
}
int sdskv_erase(sdskv_provider_handle_t provider,
sdskv_database_id_t db_id, const void *key,
hg_size_t ksize)
......
......@@ -191,6 +191,14 @@ MERCURY_GEN_PROC(put_multi_in_t, \
((hg_size_t)(vals_bulk_size)))
MERCURY_GEN_PROC(put_multi_out_t, ((int32_t)(ret)))
// ------------- PUT PACKED ------------- //
MERCURY_GEN_PROC(put_packed_in_t, \
((uint64_t)(db_id))\
((hg_size_t)(num_keys))\
((hg_size_t)(bulk_size))\
((hg_bulk_t)(bulk_handle)))
MERCURY_GEN_PROC(put_packed_out_t, ((int32_t)(ret)))
// ------------- GET MULTI ------------- //
MERCURY_GEN_PROC(get_multi_in_t, \
((uint64_t)(db_id))\
......@@ -201,6 +209,18 @@ MERCURY_GEN_PROC(get_multi_in_t, \
((hg_size_t)(vals_bulk_size)))
MERCURY_GEN_PROC(get_multi_out_t, ((int32_t)(ret)))
// ------------- GET PACKED ------------- //
MERCURY_GEN_PROC(get_packed_in_t, \
((uint64_t)(db_id))\
((hg_size_t)(num_keys))\
((hg_size_t)(keys_bulk_size))\
((hg_bulk_t)(keys_bulk_handle))\
((hg_size_t)(vals_bulk_size))\