Commit 4f4be87d authored by Philip Carns's avatar Philip Carns

differentiate bulk xfer target for relays

parent 77a7d10a
...@@ -95,6 +95,7 @@ void data_xfer_read(margo_instance_id mid, hg_addr_t svr_addr, void *buffer, hg_ ...@@ -95,6 +95,7 @@ void data_xfer_read(margo_instance_id mid, hg_addr_t svr_addr, void *buffer, hg_
ret = HG_Bulk_create(hgi->hg_class, 1, &buffer, &buffer_sz, ret = HG_Bulk_create(hgi->hg_class, 1, &buffer, &buffer_sz,
HG_BULK_WRITE_ONLY, &in.bulk_handle); HG_BULK_WRITE_ONLY, &in.bulk_handle);
assert(ret == 0); assert(ret == 0);
in.bulk_relay_addr = NULL;
#if 0 #if 0
HG_Set_target_id(handle, mplex_id); HG_Set_target_id(handle, mplex_id);
......
...@@ -8,9 +8,11 @@ ...@@ -8,9 +8,11 @@
#define __DATA_XFER_PROTO #define __DATA_XFER_PROTO
#include <margo.h> #include <margo.h>
#include <mercury_proc_string.h>
MERCURY_GEN_PROC(data_xfer_read_out_t, ((int32_t)(ret))) MERCURY_GEN_PROC(data_xfer_read_out_t, ((int32_t)(ret)))
MERCURY_GEN_PROC(data_xfer_read_in_t, MERCURY_GEN_PROC(data_xfer_read_in_t,
((hg_string_t)(bulk_relay_addr))\
((hg_bulk_t)(bulk_handle))) ((hg_bulk_t)(bulk_handle)))
#endif /* __DATA_XFER_PROTO */ #endif /* __DATA_XFER_PROTO */
...@@ -21,6 +21,7 @@ static void data_xfer_read_ult(hg_handle_t handle) ...@@ -21,6 +21,7 @@ static void data_xfer_read_ult(hg_handle_t handle)
int ret; int ret;
const struct hg_info *hgi; const struct hg_info *hgi;
margo_instance_id mid; margo_instance_id mid;
hg_addr_t bulk_addr;
#if 0 #if 0
ABT_thread my_ult; ABT_thread my_ult;
ABT_xstream my_xstream; ABT_xstream my_xstream;
...@@ -44,9 +45,17 @@ static void data_xfer_read_ult(hg_handle_t handle) ...@@ -44,9 +45,17 @@ static void data_xfer_read_ult(hg_handle_t handle)
mid = margo_hg_class_to_instance(hgi->hg_class); mid = margo_hg_class_to_instance(hgi->hg_class);
if(!in.bulk_relay_addr)
bulk_addr = hgi->addr;
else
{
hret = margo_addr_lookup(mid, in.bulk_relay_addr, &bulk_addr);
assert(hret == HG_SUCCESS);
}
/* do bulk transfer from client to server */ /* do bulk transfer from client to server */
ret = margo_bulk_transfer(mid, HG_BULK_PUSH, ret = margo_bulk_transfer(mid, HG_BULK_PUSH,
hgi->addr, in.bulk_handle, 0, bulk_addr, in.bulk_handle, 0,
g_buffer_bulk_handle, 0, g_buffer_size); g_buffer_bulk_handle, 0, g_buffer_size);
assert(ret == 0); assert(ret == 0);
......
...@@ -24,6 +24,8 @@ static void delegator_read_ult(hg_handle_t handle) ...@@ -24,6 +24,8 @@ static void delegator_read_ult(hg_handle_t handle)
const struct hg_info *hgi; const struct hg_info *hgi;
margo_instance_id mid; margo_instance_id mid;
hg_handle_t handle_relay; hg_handle_t handle_relay;
char relay_addr_string[64];
hg_size_t relay_addr_string_sz = 64;
#if 0 #if 0
ABT_thread my_ult; ABT_thread my_ult;
ABT_xstream my_xstream; ABT_xstream my_xstream;
...@@ -52,6 +54,9 @@ static void delegator_read_ult(hg_handle_t handle) ...@@ -52,6 +54,9 @@ static void delegator_read_ult(hg_handle_t handle)
assert(hret == HG_SUCCESS); assert(hret == HG_SUCCESS);
/* pass through bulk handle */ /* pass through bulk handle */
in_relay.bulk_handle = in.bulk_handle; in_relay.bulk_handle = in.bulk_handle;
hret = HG_Addr_to_string(margo_get_class(mid), relay_addr_string, &relay_addr_string_sz, hgi->addr);
assert(hret == HG_SUCCESS);
in_relay.bulk_relay_addr = relay_addr_string;
margo_forward(mid, handle_relay, &in_relay); margo_forward(mid, handle_relay, &in_relay);
hret = HG_Get_output(handle_relay, &out_relay); hret = HG_Get_output(handle_relay, &out_relay);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment