Commit 28694799 authored by Shane Snyder's avatar Shane Snyder

add bake-bulk proxy write call + implementation

parent d835155a
......@@ -62,7 +62,19 @@ int bake_bulk_write(
uint64_t region_offset,
void const *buf,
uint64_t buf_size);
/**
*
*/
int bake_bulk_proxy_write(
bake_target_id_t bti,
bake_bulk_region_id_t rid,
uint64_t region_offset,
hg_bulk_t remote_bulk,
uint64_t remote_offset,
hg_addr_t remote_addr,
uint64_t size);
/**
* Persist a bulk region. The region is considered immutable at this point
* and reads may be performed on the region.
......
......@@ -314,6 +314,9 @@ int bake_bulk_write(
in.bti = bti;
in.rid = rid;
in.region_offset = region_offset;
in.bulk_offset = 0;
in.bulk_size = buf_size;
in.remote_addr_str = NULL; /* set remote_addr to NULL to disable proxy write */
hret = margo_bulk_create(g_margo_inst.mid, 1, (void**)(&buf), &buf_size,
HG_BULK_READ_ONLY, &in.bulk_handle);
......@@ -352,6 +355,67 @@ int bake_bulk_write(
return(ret);
}
int bake_bulk_proxy_write(
bake_target_id_t bti,
bake_bulk_region_id_t rid,
uint64_t region_offset,
hg_bulk_t remote_bulk,
uint64_t remote_offset,
hg_addr_t remote_addr,
uint64_t size)
{
hg_return_t hret;
hg_handle_t handle;
bake_bulk_write_in_t in;
bake_bulk_write_out_t out;
struct bake_instance *instance = NULL;
char remote_addr_str[128] = {0};
hg_size_t remote_addr_str_sz = 128;
int ret;
HASH_FIND(hh, instance_hash, &bti, sizeof(bti), instance);
if(!instance)
return(-1);
hret = margo_addr_to_string(g_margo_inst.mid, remote_addr_str,
&remote_addr_str_sz, remote_addr);
if(hret != HG_SUCCESS)
return(-1);
in.bti = bti;
in.rid = rid;
in.region_offset = region_offset;
in.bulk_handle = remote_bulk;
in.bulk_offset = remote_offset;
in.bulk_size = size;
in.remote_addr_str = remote_addr_str; /* enable proxy write to remote source */
hret = margo_create(g_margo_inst.mid, instance->dest,
g_margo_inst.bake_bulk_write_id, &handle);
if(hret != HG_SUCCESS)
return(-1);
hret = margo_forward(handle, &in);
if(hret != HG_SUCCESS)
{
margo_destroy(handle);
return(-1);
}
hret = margo_get_output(handle, &out);
if(hret != HG_SUCCESS)
{
margo_destroy(handle);
return(-1);
}
ret = out.ret;
margo_free_output(handle, &out);
margo_destroy(handle);
return(ret);
}
int bake_bulk_create(
bake_target_id_t bti,
uint64_t region_size,
......
......@@ -8,6 +8,7 @@
#define __BAKE_BULK_RPC
#include <margo.h>
#include <mercury_proc_string.h>
#include <bake-bulk.h>
/* encoders for bake-specific types */
......@@ -30,7 +31,10 @@ MERCURY_GEN_PROC(bake_bulk_write_in_t,
((bake_target_id_t)(bti))\
((bake_bulk_region_id_t)(rid))\
((uint64_t)(region_offset))\
((hg_bulk_t)(bulk_handle)))
((hg_bulk_t)(bulk_handle))\
((uint64_t)(bulk_offset))\
((uint64_t)(bulk_size))\
((hg_string_t)(remote_addr_str)))
MERCURY_GEN_PROC(bake_bulk_write_out_t,
((int32_t)(ret)))
DECLARE_MARGO_RPC_HANDLER(bake_bulk_write_ult)
......
......@@ -161,6 +161,7 @@ static void bake_bulk_write_ult(hg_handle_t handle)
bake_bulk_write_out_t out;
bake_bulk_write_in_t in;
hg_return_t hret;
hg_addr_t src_addr;
char* buffer;
hg_size_t size;
hg_bulk_t bulk_handle;
......@@ -212,11 +213,33 @@ static void bake_bulk_write_ult(hg_handle_t handle)
return;
}
hret = margo_bulk_transfer(mid, HG_BULK_PULL, hgi->addr, in.bulk_handle,
if(in.remote_addr_str)
{
/* a proxy address was provided to send write data to */
hret = margo_addr_lookup(mid, in.remote_addr_str, &src_addr);
if(hret != HG_SUCCESS)
{
out.ret = -1;
margo_bulk_free(bulk_handle);
margo_free_input(handle, &in);
margo_respond(handle, &out);
margo_destroy(handle);
return;
}
}
else
{
/* no proxy write, use the source of this request */
src_addr = hgi->addr;
}
hret = margo_bulk_transfer(mid, HG_BULK_PULL, src_addr, in.bulk_handle,
0, bulk_handle, 0, size);
if(hret != HG_SUCCESS)
{
out.ret = -1;
if(in.remote_addr_str)
margo_addr_free(mid, src_addr);
margo_bulk_free(bulk_handle);
margo_free_input(handle, &in);
margo_respond(handle, &out);
......@@ -226,6 +249,8 @@ static void bake_bulk_write_ult(hg_handle_t handle)
out.ret = 0;
if(in.remote_addr_str)
margo_addr_free(mid, src_addr);
margo_bulk_free(bulk_handle);
margo_free_input(handle, &in);
margo_respond(handle, &out);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment