Commit 0327461b authored by Matthieu Dorier's avatar Matthieu Dorier

completed proxy_put_packed functionality

parent 160f2917
......@@ -819,6 +819,7 @@ static void sdskv_put_packed_ult(hg_handle_t handle)
out.ret = SDSKV_SUCCESS;
std::vector<char> local_buffer;
hg_bulk_t local_bulk_handle;
hg_addr_t origin_addr = HG_ADDR_NULL;
auto r1 = at_exit([&handle]() { margo_destroy(handle); });
auto r2 = at_exit([&handle,&out]() { margo_respond(handle, &out); });
......@@ -849,6 +850,18 @@ static void sdskv_put_packed_ult(hg_handle_t handle)
auto db = it->second;
ABT_rwlock_unlock(svr_ctx->lock);
// find out the address of the origin
if(in.origin_addr != NULL) {
hret = margo_addr_lookup(mid, in.origin_addr, &origin_addr);
} else {
hret = margo_addr_dup(mid, info->addr, &origin_addr);
}
if(hret != HG_SUCCESS) {
out.ret = SDSKV_ERR_MERCURY;
return;
}
auto r4 = at_exit([&origin_addr,&mid]() { margo_addr_free(mid, origin_addr); });
// allocate a buffer to receive the keys and a buffer to receive the values
local_buffer.resize(in.bulk_size);
void* buf_ptr = local_buffer.data();
......@@ -861,10 +874,10 @@ static void sdskv_put_packed_ult(hg_handle_t handle)
out.ret = SDSKV_ERR_MERCURY;
return;
}
auto r6 = at_exit([&local_bulk_handle]() { margo_bulk_free(local_bulk_handle); });
auto r5 = at_exit([&local_bulk_handle]() { margo_bulk_free(local_bulk_handle); });
/* transfer data */
hret = margo_bulk_transfer(mid, HG_BULK_PULL, info->addr, in.bulk_handle, 0,
hret = margo_bulk_transfer(mid, HG_BULK_PULL, origin_addr, in.bulk_handle, 0,
local_bulk_handle, 0, in.bulk_size);
if(hret != HG_SUCCESS) {
out.ret = SDSKV_ERR_MERCURY;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment