Commit 97421e01 authored by Philip Carns's avatar Philip Carns

eager mode in client path

parent b1b4e799
......@@ -44,7 +44,7 @@ typedef struct
uint32_t size;
char * buffer;
} bake_bulk_eager_write_in_t;
static inline hg_return_t hg_proc_bake_bulk_eager_write_in_t(hg_proc_t proc, bake_bulk_eager_write_in_t *in);
static inline hg_return_t hg_proc_bake_bulk_eager_write_in_t(hg_proc_t proc, void *v_out_p);
MERCURY_GEN_PROC(bake_bulk_eager_write_out_t,
((int32_t)(ret)))
DECLARE_MARGO_RPC_HANDLER(bake_bulk_eager_write_ult)
......@@ -122,9 +122,10 @@ static inline hg_return_t hg_proc_bake_target_id_t(hg_proc_t proc, bake_target_i
return(hg_proc_raw(proc, bti->id, sizeof(bti->id)));
}
static inline hg_return_t hg_proc_bake_bulk_eager_write_in_t(hg_proc_t proc, bake_bulk_eager_write_in_t *in)
static inline hg_return_t hg_proc_bake_bulk_eager_write_in_t(hg_proc_t proc, void *v_out_p)
{
/* TODO: error checking */
bake_bulk_eager_write_in_t *in = v_out_p;
hg_proc_bake_target_id_t(proc, &in->bti);
hg_proc_bake_bulk_region_id_t(proc, &in->rid);
......
......@@ -24,6 +24,7 @@ struct hg_instance
hg_id_t bake_bulk_probe_id;
hg_id_t bake_bulk_shutdown_id;
hg_id_t bake_bulk_create_id;
hg_id_t bake_bulk_eager_write_id;
hg_id_t bake_bulk_write_id;
hg_id_t bake_bulk_persist_id;
hg_id_t bake_bulk_get_size_id;
......@@ -102,6 +103,12 @@ static int hg_instance_init(const char *mercury_dest)
bake_bulk_write_in_t,
bake_bulk_write_out_t,
NULL);
g_hginst.bake_bulk_eager_write_id =
MERCURY_REGISTER(g_hginst.hg_class,
"bake_bulk_eager_write_rpc",
bake_bulk_eager_write_in_t,
bake_bulk_eager_write_out_t,
NULL);
g_hginst.bake_bulk_persist_id =
MERCURY_REGISTER(g_hginst.hg_class,
"bake_bulk_persist_rpc",
......@@ -277,6 +284,61 @@ int bake_shutdown_service(bake_target_id_t bti)
return(0);
}
int bake_bulk_eager_write(
bake_target_id_t bti,
bake_bulk_region_id_t rid,
uint64_t region_offset,
void const *buf,
uint64_t buf_size)
{
hg_return_t hret;
hg_handle_t handle;
bake_bulk_eager_write_in_t in;
bake_bulk_eager_write_out_t out;
int ret;
struct bake_instance *instance = NULL;
HASH_FIND(hh, instance_hash, &bti, sizeof(bti), instance);
if(!instance)
return(-1);
in.bti = bti;
in.rid = rid;
in.region_offset = region_offset;
in.size = buf_size;
in.buffer = (char*)buf;
/* create handle */
hret = HG_Create(g_hginst.hg_context, instance->dest,
g_hginst.bake_bulk_eager_write_id, &handle);
if(hret != HG_SUCCESS)
{
return(-1);
}
hret = margo_forward(g_hginst.mid, handle, &in);
if(hret != HG_SUCCESS)
{
HG_Destroy(handle);
return(-1);
}
hret = HG_Get_output(handle, &out);
if(hret != HG_SUCCESS)
{
HG_Destroy(handle);
return(-1);
}
ret = out.ret;
HG_Free_output(handle, &out);
HG_Destroy(handle);
return(ret);
}
#define BAKE_BULK_EAGER_LIMIT 4096
int bake_bulk_write(
bake_target_id_t bti,
bake_bulk_region_id_t rid,
......@@ -291,6 +353,11 @@ int bake_bulk_write(
int ret;
struct bake_instance *instance = NULL;
if(buf_size <= BAKE_BULK_EAGER_LIMIT)
{
return(bake_bulk_eager_write(bti, rid, region_offset, buf, buf_size));
}
HASH_FIND(hh, instance_hash, &bti, sizeof(bti), instance);
if(!instance)
return(-1);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment