Commit 3638f001 authored by Matthieu Dorier's avatar Matthieu Dorier

Revert "improved get-multi"

This reverts commit 450d8832.
parent 7283b499
......@@ -811,8 +811,7 @@ int sdskv_get_multi(sdskv_provider_handle_t provider,
get_multi_out_t out;
void** key_seg_ptrs = NULL;
hg_size_t* key_seg_sizes = NULL;
void** val_seg_ptrs = NULL;
hg_size_t* val_seg_sizes = NULL;
char* vals_buffer = NULL;
int vals_use_poolset = 1;
int keys_use_poolset = 1;
......@@ -840,18 +839,6 @@ int sdskv_get_multi(sdskv_provider_handle_t provider,
in.keys_bulk_size += key_seg_sizes[i];
}
/* create an array of val sizes and val pointers */
val_seg_sizes = malloc(sizeof(hg_size_t)*(num+1));
val_seg_sizes[0] = num*sizeof(hg_size_t);
memcpy(val_seg_sizes+1, vsizes, num*sizeof(hg_size_t));
val_seg_ptrs = malloc(sizeof(void*)*(num+1));
val_seg_ptrs[0] = (void*)vsizes;
memcpy(val_seg_ptrs+1, values, num*sizeof(void*));
for(i=0; i<num+1; i++) {
in.vals_bulk_size += val_seg_sizes[i];
}
/* create the bulk handle to access the keys */
hret = create_bulk(provider->client, num+1, key_seg_ptrs, key_seg_sizes,
HG_BULK_READ_ONLY, &keys_use_poolset, &in.keys_bulk_handle);
......@@ -861,8 +848,19 @@ int sdskv_get_multi(sdskv_provider_handle_t provider,
goto finish;
}
/* allocate memory to send max value sizes and receive values */
for(i=0; i<num; i++) {
in.vals_bulk_size += vsizes[i];
}
in.vals_bulk_size += sizeof(hg_size_t)*num;
vals_buffer = malloc(in.vals_bulk_size);
hg_size_t* value_sizes = (hg_size_t*)vals_buffer; // beginning of the buffer used to hold sizes
for(i=0; i<num; i++) {
value_sizes[i] = vsizes[i];
}
/* create the bulk handle to access the values */
hret = create_bulk(provider->client, num+1, val_seg_ptrs, val_seg_sizes,
hret = create_bulk(provider->client, 1, (void**)&vals_buffer, &in.vals_bulk_size,
HG_BULK_READWRITE, &vals_use_poolset, &in.vals_bulk_handle);
if(hret != HG_SUCCESS) {
fprintf(stderr,"[SDSKV] failed to create bulk in sdskv_get_multi()\n");
......@@ -900,7 +898,7 @@ int sdskv_get_multi(sdskv_provider_handle_t provider,
/* sync the bulk */
hret = sync_bulk(provider->client, in.vals_bulk_handle, 0, in.vals_bulk_size,
num+1, val_seg_ptrs, val_seg_sizes, vals_use_poolset);
1, (void**)&vals_buffer, &in.vals_bulk_size, vals_use_poolset);
if(hret != HG_SUCCESS) {
fprintf(stderr, "[SDSKV] sync_bulk() failed in sdskv_get_multi()\n");
ret = SDSKV_ERR_POOLSET;
......@@ -912,14 +910,23 @@ int sdskv_get_multi(sdskv_provider_handle_t provider,
goto finish;
}
/* copy the values from the buffer into the user-provided buffer */
// XXX we could optimize this if use_poolset == 1, by removing the
// call to sync_bulk and taking data directly out of the bulk handle
char* value_ptr = vals_buffer + num*sizeof(hg_size_t);
for(i=0; i<num; i++) {
memcpy(values[i], value_ptr, value_sizes[i]);
vsizes[i] = value_sizes[i];
value_ptr += value_sizes[i];
}
finish:
margo_free_output(handle, &out);
destroy_bulk(provider->client, keys_use_poolset, in.keys_bulk_handle);
destroy_bulk(provider->client, vals_use_poolset, in.vals_bulk_handle);
free(key_seg_sizes);
free(key_seg_ptrs);
free(val_seg_sizes);
free(val_seg_ptrs);
free(vals_buffer);
margo_destroy(handle);
return ret;
}
......
......@@ -1009,50 +1009,25 @@ static void sdskv_get_multi_ult(hg_handle_t handle)
char* packed_keys = local_keys_buffer + in.num_keys*sizeof(hg_size_t);
/* interpret beginning of the value buffer as a list of value sizes */
hg_size_t* val_sizes = (hg_size_t*)local_vals_buffer;
/* find beginning of region that should contain values */
/* find beginning of region where to pack values */
char* packed_values = local_vals_buffer + in.num_keys*sizeof(hg_size_t);
/* go through the key/value pairs and get the values from the database */
std::vector<margo_request> requests;
requests.reserve(in.num_keys);
hg_size_t remote_offset = in.num_keys*sizeof(hg_size_t);
hg_size_t local_offset = vals_offset + in.num_keys*sizeof(hg_size_t);
for(unsigned i=0; i < in.num_keys; i++) {
hg_size_t client_vsize = val_sizes[i]; // size allocated by client for this value
data_slice kdata(packed_keys, key_sizes[i]);
data_slice vdata(packed_values, val_sizes[i]);
if(db->get(kdata, vdata) == SDSKV_SUCCESS) {
hg_size_t actual_vsize = vdata.size();
/* do a PUSH operation to push back the value to the client */
margo_request req;
hret = margo_bulk_itransfer(mid, HG_BULK_PUSH, info->addr, in.vals_bulk_handle, remote_offset,
local_bulk_handle, local_offset, actual_vsize, &req);
if(hret != HG_SUCCESS) {
val_sizes[i] = 0;
break;
} else {
val_sizes[i] = actual_vsize;
requests.push_back(req);
}
val_sizes[i] = vdata.size();
} else {
val_sizes[i] = 0;
}
packed_keys += key_sizes[i];
packed_values += client_vsize;
remote_offset += client_vsize;
local_offset += client_vsize;
}
for(auto& r : requests)
margo_wait(r);
if(hret != HG_SUCCESS) {
out.ret = SDSKV_ERR_MERCURY;
return;
packed_values += val_sizes[i];
}
/* do a PUSH operation to push back the value sizes to the client */
/* do a PUSH operation to push back the values to the client */
hret = margo_bulk_transfer(mid, HG_BULK_PUSH, info->addr, in.vals_bulk_handle, 0,
local_bulk_handle, vals_offset, in.num_keys*sizeof(hg_size_t));
local_bulk_handle, vals_offset, in.vals_bulk_size);
if(hret != HG_SUCCESS) {
out.ret = SDSKV_ERR_MERCURY;
return;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment