Commit f0277d5c authored by Matthieu Dorier's avatar Matthieu Dorier

improved list keys and list keyvals functions

parent 1d251140
......@@ -167,6 +167,9 @@ int sdskv_put_multi(sdskv_provider_handle_t provider,
* will hold the actual size of the key. Note that the size
* of a value can get obtained using sdskv_length.
*
* If value is NULL, the call will be equivalent
* to sdskv_length and put the length of the value in *vsize.
*
* @param[in] provider provider handle
* @param[in] db_id database id of the target database
* @param[in] key key to lookup
......@@ -194,6 +197,9 @@ int sdskv_get(sdskv_provider_handle_t provider,
* another way to distinguish a 0-sized value and a value for
* which there was an error).
*
* If values is set to NULL, the call is equivalent to calling
* sdskv_length_multi to get the size of the values.
*
* @param[in] provider provider handle
* @param[in] db_id database id
* @param[in] num number of keys to retrieve
......@@ -299,9 +305,12 @@ int sdskv_erase_multi(sdskv_provider_handle_t handle,
* being a preallocated buffer of size ksizes[i]. ksizes must be an array
* of sizes of the preallocated buffers. After a successful call, max_keys
* is set to the actual number of keys retrieved, ksizes[i] is set to the
* actual size of the i-th key, and keys[i] contains the i-th key. The call
* will fail if at least one of the preallocated size is too small to hold
* the key.
* actual size of the i-th key, and keys[i] contains the i-th key.
*
* If at least one of the keys cannot fit in its buffer, the call will fail
* and return SDSKV_ERR_SIZE, but the ksizes array will be updated to the
* sizes required to fit each key, and max_keys will be updated to the number
* of keys that can be listed. keys will be left untouched.
*
* @param[in] provider provider handle
* @param[in] db_id database id
......@@ -358,6 +367,11 @@ int sdskv_list_keys_with_prefix(
* After a successful call, values[i] will hold the i-th value
* and vsizes[i] will be set to the actual size of the i-th value.
*
* If a buffer for a key or a value is too small, the call will fail
* but ksizes and vsizes will be updated to the correct sizes needed,
* and max_items will be updated to the number of entries that can be
* listed.
*
* @param[in] provider provider handle
* @param[in] db_id database id
* @param[in] start_key starting key
......
......@@ -551,6 +551,10 @@ int sdskv_get(sdskv_provider_handle_t provider,
int ret;
hg_handle_t handle;
if(value == NULL) {
return sdskv_length(provider, db_id, key, ksize, vsize);
}
size = *(hg_size_t*)vsize;
msize = size + sizeof(hg_size_t) + sizeof(hg_return_t);
......@@ -657,6 +661,10 @@ int sdskv_get_multi(sdskv_provider_handle_t provider,
hg_size_t* key_seg_sizes = NULL;
char* vals_buffer = NULL;
if(values == NULL) {
return sdskv_length_multi(provider, db_id, num, keys, ksizes, vsizes);
}
in.db_id = db_id;
in.num_keys = num;
in.keys_bulk_handle = HG_BULK_NULL;
......
......@@ -1556,16 +1556,33 @@ static void sdskv_list_keys_ult(hg_handle_t handle)
/* create the array of actual sizes */
std::vector<hg_size_t> true_ksizes(num_keys);
hg_size_t keys_bulk_size = 0;
bool size_error = false;
for(unsigned i = 0; i < num_keys; i++) {
true_ksizes[i] = keys[i].size();
if(true_ksizes[i] > ksizes[i]) {
// this key has a size that exceeds the allocated size on client
throw SDSKV_ERR_SIZE;
} else {
ksizes[i] = true_ksizes[i];
keys_bulk_size += ksizes[i];
size_error = true;
}
ksizes[i] = true_ksizes[i];
keys_bulk_size += ksizes[i];
}
for(unsigned i = num_keys; i < in.max_keys; i++) {
ksizes[i] = 0;
}
out.nkeys = num_keys;
/* transfer the ksizes back to the client */
hret = margo_bulk_transfer(mid, HG_BULK_PUSH, origin_addr,
in.ksizes_bulk_handle, 0, ksizes_local_bulk, 0, ksizes_bulk_size);
if(hret != HG_SUCCESS) {
std::cerr << "Error: SDSKV list_keys could not issue bulk transfer "
<< "(push from ksizes_local_bulk to in.ksizes_bulk_handle)" << std::endl;
throw SDSKV_ERR_MERCURY;
}
/* if user provided a size too small for some key, return error (we already set the right key sizes) */
if(size_error)
throw SDSKV_ERR_SIZE;
/* create an array of addresses pointing to keys */
std::vector<void*> keys_addr(num_keys);
......@@ -1581,15 +1598,6 @@ static void sdskv_list_keys_ult(hg_handle_t handle)
throw SDSKV_ERR_MERCURY;
}
/* transfer the ksizes back to the client */
hret = margo_bulk_transfer(mid, HG_BULK_PUSH, origin_addr,
in.ksizes_bulk_handle, 0, ksizes_local_bulk, 0, ksizes_bulk_size);
if(hret != HG_SUCCESS) {
std::cerr << "Error: SDSKV list_keys could not issue bulk transfer "
<< "(push from ksizes_local_bulk to in.ksizes_bulk_handle)" << std::endl;
throw SDSKV_ERR_MERCURY;
}
/* transfer the keys to the client */
uint64_t remote_offset = 0;
uint64_t local_offset = 0;
......@@ -1608,7 +1616,6 @@ static void sdskv_list_keys_ult(hg_handle_t handle)
local_offset += true_ksizes[i];
}
out.nkeys = num_keys;
out.ret = SDSKV_SUCCESS;
} catch(int exc_no) {
......@@ -1729,8 +1736,12 @@ static void sdskv_list_keyvals_ult(hg_handle_t handle)
auto keyvals = db->list_keyvals(start_kdata, in.max_keys, prefix);
hg_size_t num_keys = std::min(keyvals.size(), in.max_keys);
out.nkeys = num_keys;
if(num_keys == 0) throw SDSKV_SUCCESS;
bool size_error = false;
/* create the array of actual key sizes */
std::vector<hg_size_t> true_ksizes(num_keys);
hg_size_t keys_bulk_size = 0;
......@@ -1738,12 +1749,12 @@ static void sdskv_list_keyvals_ult(hg_handle_t handle)
true_ksizes[i] = keyvals[i].first.size();
if(true_ksizes[i] > ksizes[i]) {
// this key has a size that exceeds the allocated size on client
throw SDSKV_ERR_SIZE;
} else {
ksizes[i] = true_ksizes[i];
keys_bulk_size += ksizes[i];
}
size_error = true;
}
ksizes[i] = true_ksizes[i];
keys_bulk_size += ksizes[i];
}
for(unsigned i = num_keys; i < ksizes.size(); i++) ksizes[i] = 0;
/* create the array of actual value sizes */
std::vector<hg_size_t> true_vsizes(num_keys);
......@@ -1752,12 +1763,33 @@ static void sdskv_list_keyvals_ult(hg_handle_t handle)
true_vsizes[i] = keyvals[i].second.size();
if(true_vsizes[i] > vsizes[i]) {
// this value has a size that exceeds the allocated size on client
throw SDSKV_ERR_SIZE;
} else {
vsizes[i] = true_vsizes[i];
vals_bulk_size += vsizes[i];
size_error = true;
}
vsizes[i] = true_vsizes[i];
vals_bulk_size += vsizes[i];
}
for(unsigned i = num_keys; i < vsizes.size(); i++) vsizes[i] = 0;
/* transfer the ksizes back to the client */
hret = margo_bulk_transfer(mid, HG_BULK_PUSH, origin_addr,
in.ksizes_bulk_handle, 0, ksizes_local_bulk, 0, ksizes_bulk_size);
if(hret != HG_SUCCESS) {
std::cerr << "Error: SDSKV list_keyvals could not issue bulk transfer "
<< "(push from ksizes_local_bulk to in.ksizes_bulk_handle)" << std::endl;
throw SDSKV_ERR_MERCURY;
}
/* transfer the vsizes back to the client */
hret = margo_bulk_transfer(mid, HG_BULK_PUSH, origin_addr,
in.vsizes_bulk_handle, 0, vsizes_local_bulk, 0, vsizes_bulk_size);
if(hret != HG_SUCCESS) {
std::cerr << "Error: SDSKV list_keyvals could not issue bulk transfer "
<< "(push from vsizes_local_bulk to in.vsizes_bulk_handle)" << std::endl;
throw SDSKV_ERR_MERCURY;
}
if(size_error)
throw SDSKV_ERR_SIZE;
/* create an array of addresses pointing to keys */
std::vector<void*> keys_addr(num_keys);
......@@ -1787,23 +1819,6 @@ static void sdskv_list_keyvals_ult(hg_handle_t handle)
throw SDSKV_ERR_MERCURY;
}
/* transfer the ksizes back to the client */
hret = margo_bulk_transfer(mid, HG_BULK_PUSH, origin_addr,
in.ksizes_bulk_handle, 0, ksizes_local_bulk, 0, ksizes_bulk_size);
if(hret != HG_SUCCESS) {
std::cerr << "Error: SDSKV list_keyvals could not issue bulk transfer "
<< "(push from ksizes_local_bulk to in.ksizes_bulk_handle)" << std::endl;
throw SDSKV_ERR_MERCURY;
}
/* transfer the vsizes back to the client */
hret = margo_bulk_transfer(mid, HG_BULK_PUSH, origin_addr,
in.vsizes_bulk_handle, 0, vsizes_local_bulk, 0, vsizes_bulk_size);
if(hret != HG_SUCCESS) {
std::cerr << "Error: SDSKV list_keyvals could not issue bulk transfer "
<< "(push from vsizes_local_bulk to in.vsizes_bulk_handle)" << std::endl;
throw SDSKV_ERR_MERCURY;
}
uint64_t remote_offset = 0;
uint64_t local_offset = 0;
......@@ -1839,7 +1854,6 @@ static void sdskv_list_keyvals_ult(hg_handle_t handle)
local_offset += true_vsizes[i];
}
out.nkeys = num_keys;
out.ret = SDSKV_SUCCESS;
} catch(int exc_no) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment