Commit cb2df8fa authored by Matthieu Dorier's avatar Matthieu Dorier

preventing mercury from doing 0-sized bulk transfers

parent e15179c9
......@@ -433,29 +433,36 @@ static void sdskv_bulk_put_ult(hg_handle_t handle)
margo_destroy(handle);
return;
}
ds_bulk_t vdata(in.vsize);
void *buffer = (void*)vdata.data();
hg_size_t size = vdata.size();
hret = margo_bulk_create(mid, 1, (void**)&buffer, &size,
HG_BULK_WRITE_ONLY, &bulk_handle);
if(hret != HG_SUCCESS) {
out.ret = SDSKV_ERR_MERCURY;
margo_respond(handle, &out);
margo_free_input(handle, &in);
margo_destroy(handle);
return;
}
hret = margo_bulk_transfer(mid, HG_BULK_PULL, info->addr, in.handle, 0,
bulk_handle, 0, vdata.size());
if(hret != HG_SUCCESS) {
out.ret = SDSKV_ERR_MERCURY;
margo_respond(handle, &out);
margo_free_input(handle, &in);
if(in.vsize > 0) {
void *buffer = (void*)vdata.data();
hg_size_t size = vdata.size();
hret = margo_bulk_create(mid, 1, (void**)&buffer, &size,
HG_BULK_WRITE_ONLY, &bulk_handle);
if(hret != HG_SUCCESS) {
out.ret = SDSKV_ERR_MERCURY;
margo_respond(handle, &out);
margo_free_input(handle, &in);
margo_destroy(handle);
return;
}
hret = margo_bulk_transfer(mid, HG_BULK_PULL, info->addr, in.handle, 0,
bulk_handle, 0, vdata.size());
if(hret != HG_SUCCESS) {
out.ret = SDSKV_ERR_MERCURY;
margo_respond(handle, &out);
margo_free_input(handle, &in);
margo_bulk_free(bulk_handle);
margo_destroy(handle);
return;
}
margo_bulk_free(bulk_handle);
margo_destroy(handle);
return;
}
ds_bulk_t kdata(in.key.data, in.key.data+in.key.size);
......@@ -470,7 +477,6 @@ static void sdskv_bulk_put_ult(hg_handle_t handle)
margo_respond(handle, &out);
margo_free_input(handle, &in);
margo_bulk_free(bulk_handle);
margo_destroy(handle);
return;
......@@ -531,35 +537,38 @@ static void sdskv_bulk_get_ult(hg_handle_t handle)
void *buffer = (void*)vdata.data();
hg_size_t size = vdata.size();
hret = margo_bulk_create(mid, 1, (void**)&buffer, &size,
HG_BULK_READ_ONLY, &bulk_handle);
if(hret != HG_SUCCESS) {
out.size = 0;
out.ret = SDSKV_ERR_MERCURY;
margo_respond(handle, &out);
margo_free_input(handle, &in);
margo_destroy(handle);
return;
}
if(size > 0) {
hret = margo_bulk_create(mid, 1, (void**)&buffer, &size,
HG_BULK_READ_ONLY, &bulk_handle);
if(hret != HG_SUCCESS) {
out.size = 0;
out.ret = SDSKV_ERR_MERCURY;
margo_respond(handle, &out);
margo_free_input(handle, &in);
margo_destroy(handle);
return;
}
hret = margo_bulk_transfer(mid, HG_BULK_PUSH, info->addr, in.handle, 0,
bulk_handle, 0, vdata.size());
if(hret != HG_SUCCESS) {
out.size = 0;
out.ret = SDSKV_ERR_MERCURY;
margo_respond(handle, &out);
margo_free_input(handle, &in);
margo_bulk_free(bulk_handle);
margo_destroy(handle);
return;
}
hret = margo_bulk_transfer(mid, HG_BULK_PUSH, info->addr, in.handle, 0,
bulk_handle, 0, vdata.size());
if(hret != HG_SUCCESS) {
out.size = 0;
out.ret = SDSKV_ERR_MERCURY;
margo_respond(handle, &out);
margo_free_input(handle, &in);
margo_bulk_free(bulk_handle);
margo_destroy(handle);
return;
}
out.size = vdata.size();
out.size = size;
out.ret = SDSKV_SUCCESS;
margo_respond(handle, &out);
margo_free_input(handle, &in);
margo_bulk_free(bulk_handle);
margo_destroy(handle);
return;
......@@ -741,11 +750,13 @@ static void sdskv_list_keys_ult(hg_handle_t handle)
uint64_t local_offset = 0;
for(unsigned i = 0; i < num_keys; i++) {
hret = margo_bulk_transfer(mid, HG_BULK_PUSH, origin_addr,
in.keys_bulk_handle, remote_offset, keys_local_bulk, local_offset, true_ksizes[i]);
if(hret != HG_SUCCESS) {
std::cerr << "Error: SDSKV list_keys could not issue bulk transfer (keys_local_bulk)" << std::endl;
throw SDSKV_ERR_MERCURY;
if(true_ksizes[i] > 0) {
hret = margo_bulk_transfer(mid, HG_BULK_PUSH, origin_addr,
in.keys_bulk_handle, remote_offset, keys_local_bulk, local_offset, true_ksizes[i]);
if(hret != HG_SUCCESS) {
std::cerr << "Error: SDSKV list_keys could not issue bulk transfer (keys_local_bulk)" << std::endl;
throw SDSKV_ERR_MERCURY;
}
}
remote_offset += remote_ksizes[i];
......@@ -951,11 +962,13 @@ static void sdskv_list_keyvals_ult(hg_handle_t handle)
/* transfer the keys to the client */
for(unsigned i=0; i < num_keys; i++) {
hret = margo_bulk_transfer(mid, HG_BULK_PUSH, origin_addr,
in.keys_bulk_handle, remote_offset, keys_local_bulk, local_offset, true_ksizes[i]);
if(hret != HG_SUCCESS) {
std::cerr << "Error: SDSKV list_keyvals could not issue bulk transfer (keys_local_bulk)" << std::endl;
throw SDSKV_ERR_MERCURY;
if(true_ksizes[i] > 0) {
hret = margo_bulk_transfer(mid, HG_BULK_PUSH, origin_addr,
in.keys_bulk_handle, remote_offset, keys_local_bulk, local_offset, true_ksizes[i]);
if(hret != HG_SUCCESS) {
std::cerr << "Error: SDSKV list_keyvals could not issue bulk transfer (keys_local_bulk)" << std::endl;
throw SDSKV_ERR_MERCURY;
}
}
remote_offset += remote_ksizes[i];
local_offset += true_ksizes[i];
......@@ -966,11 +979,13 @@ static void sdskv_list_keyvals_ult(hg_handle_t handle)
/* transfer the values to the client */
for(unsigned i=0; i < num_keys; i++) {
hret = margo_bulk_transfer(mid, HG_BULK_PUSH, origin_addr,
in.vals_bulk_handle, remote_offset, vals_local_bulk, local_offset, true_vsizes[i]);
if(hret != HG_SUCCESS) {
std::cerr << "Error: SDSKV list_keyvals could not issue bulk transfer (vals_local_bulk)" << std::endl;
throw SDSKV_ERR_MERCURY;
if(true_vsizes[i] > 0) {
hret = margo_bulk_transfer(mid, HG_BULK_PUSH, origin_addr,
in.vals_bulk_handle, remote_offset, vals_local_bulk, local_offset, true_vsizes[i]);
if(hret != HG_SUCCESS) {
std::cerr << "Error: SDSKV list_keyvals could not issue bulk transfer (vals_local_bulk)" << std::endl;
throw SDSKV_ERR_MERCURY;
}
}
remote_offset += remote_vsizes[i];
local_offset += true_vsizes[i];
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment