Commit 3581e140 authored by Shane Snyder's avatar Shane Snyder

implement client-side proxy read call

parent ff058a00
......@@ -124,6 +124,18 @@ int bake_bulk_read(
void *buf,
uint64_t buf_size);
/**
*
*/
int bake_bulk_proxy_read(
bake_target_id_t bti,
bake_bulk_region_id_t rid,
uint64_t region_offset,
hg_bulk_t remote_bulk,
uint64_t remote_offset,
const char* remote_addr,
uint64_t size);
/**
* Release local resources associated with access to a target; does not
* modify the target in any way.
......
......@@ -10,6 +10,8 @@
#include "uthash.h"
#include "bake-bulk-rpc.h"
#define BAKE_BULK_EAGER_LIMIT 2048
/* Refers to a single Margo initialization, for now this is shared by
* all remote targets. In the future we probably need to support multiple in
* case we run atop more than one transport at a time.
......@@ -45,12 +47,6 @@ struct bake_margo_instance g_margo_inst = {
.mid = MARGO_INSTANCE_NULL,
};
static int bake_bulk_eager_read(
bake_target_id_t bti,
bake_bulk_region_id_t rid,
uint64_t region_offset,
void *buf,
uint64_t buf_size);
/* XXX calling this function again just overwrites the previous global mid...
* need to be smarter if we truly want to support multiple client-side mids
......@@ -288,8 +284,6 @@ static int bake_bulk_eager_write(
return(ret);
}
#define BAKE_BULK_EAGER_LIMIT 2048
int bake_bulk_write(
bake_target_id_t bti,
bake_bulk_region_id_t rid,
......@@ -573,6 +567,57 @@ int bake_bulk_noop(
return(0);
}
static int bake_bulk_eager_read(
bake_target_id_t bti,
bake_bulk_region_id_t rid,
uint64_t region_offset,
void *buf,
uint64_t buf_size)
{
hg_return_t hret;
hg_handle_t handle;
bake_bulk_eager_read_in_t in;
bake_bulk_eager_read_out_t out;
int ret;
struct bake_instance *instance = NULL;
HASH_FIND(hh, instance_hash, &bti, sizeof(bti), instance);
if(!instance)
return(-1);
in.bti = bti;
in.rid = rid;
in.region_offset = region_offset;
in.size = buf_size;
hret = margo_create(g_margo_inst.mid, instance->dest,
g_margo_inst.bake_bulk_eager_read_id, &handle);
if(hret != HG_SUCCESS)
return(-1);
hret = margo_forward(handle, &in);
if(hret != HG_SUCCESS)
{
margo_destroy(handle);
return(-1);
}
hret = margo_get_output(handle, &out);
if(hret != HG_SUCCESS)
{
margo_destroy(handle);
return(-1);
}
ret = out.ret;
if(ret == 0)
memcpy(buf, out.buffer, out.size);
margo_free_output(handle, &out);
margo_destroy(handle);
return(ret);
}
int bake_bulk_read(
bake_target_id_t bti,
bake_bulk_region_id_t rid,
......@@ -597,6 +642,9 @@ int bake_bulk_read(
in.bti = bti;
in.rid = rid;
in.region_offset = region_offset;
in.bulk_offset = 0;
in.bulk_size = buf_size;
in.remote_addr_str = NULL; /* set remote_addr to NULL to disable proxy read */
hret = margo_bulk_create(g_margo_inst.mid, 1, (void**)(&buf), &buf_size,
HG_BULK_WRITE_ONLY, &in.bulk_handle);
......@@ -635,20 +683,21 @@ int bake_bulk_read(
return(ret);
}
static int bake_bulk_eager_read(
int bake_bulk_proxy_read(
bake_target_id_t bti,
bake_bulk_region_id_t rid,
uint64_t region_offset,
void *buf,
uint64_t buf_size)
hg_bulk_t remote_bulk,
uint64_t remote_offset,
const char* remote_addr,
uint64_t size)
{
hg_return_t hret;
hg_handle_t handle;
bake_bulk_eager_read_in_t in;
bake_bulk_eager_read_out_t out;
int ret;
bake_bulk_read_in_t in;
bake_bulk_read_out_t out;
struct bake_instance *instance = NULL;
int ret;
HASH_FIND(hh, instance_hash, &bti, sizeof(bti), instance);
if(!instance)
......@@ -657,13 +706,16 @@ static int bake_bulk_eager_read(
in.bti = bti;
in.rid = rid;
in.region_offset = region_offset;
in.size = buf_size;
in.bulk_handle = remote_bulk;
in.bulk_offset = remote_offset;
in.bulk_size = size;
in.remote_addr_str = (char*)remote_addr;
hret = margo_create(g_margo_inst.mid, instance->dest,
g_margo_inst.bake_bulk_eager_read_id, &handle);
g_margo_inst.bake_bulk_read_id, &handle);
if(hret != HG_SUCCESS)
return(-1);
hret = margo_forward(handle, &in);
if(hret != HG_SUCCESS)
{
......@@ -677,10 +729,8 @@ static int bake_bulk_eager_read(
margo_destroy(handle);
return(-1);
}
ret = out.ret;
if(ret == 0)
memcpy(buf, out.buffer, out.size);
margo_free_output(handle, &out);
margo_destroy(handle);
......
......@@ -76,7 +76,12 @@ MERCURY_GEN_PROC(bake_bulk_read_in_t,
((bake_target_id_t)(bti))\
((bake_bulk_region_id_t)(rid))\
((uint64_t)(region_offset))\
((hg_bulk_t)(bulk_handle)))
((hg_bulk_t)(bulk_handle))\
((uint64_t)(bulk_offset))\
((uint64_t)(bulk_size))\
((hg_string_t)(remote_addr_str)))
MERCURY_GEN_PROC(bake_bulk_read_out_t,
((int32_t)(ret)))
DECLARE_MARGO_RPC_HANDLER(bake_bulk_read_ult)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment