Commit c7066a8d authored by Philip Carns's avatar Philip Carns

implement client-side handle cache

parent 3fa30d81
......@@ -20,7 +20,7 @@ struct hg_instance
hg_class_t *hg_class;
hg_context_t *hg_context;
int refct;
hg_id_t bake_bulk_probe_id;
hg_id_t bake_bulk_shutdown_id;
hg_id_t bake_bulk_create_id;
......@@ -33,15 +33,24 @@ struct hg_instance
hg_id_t bake_bulk_noop_id;
};
struct bake_handle_cache_el
{
hg_id_t id;
hg_handle_t handle;
UT_hash_handle hh;
};
/* Refers to an instance connected to a specific target */
struct bake_instance
{
bake_target_id_t bti; /* persistent identifier for this target */
hg_addr_t dest; /* resolved Mercury address */
struct bake_handle_cache_el *handle_hash;
UT_hash_handle hh;
};
struct bake_instance *instance_hash;
struct bake_instance *instance_hash = NULL;
struct hg_instance g_hginst = {
.mid = MARGO_INSTANCE_NULL,
......@@ -56,6 +65,8 @@ static int bake_bulk_eager_read(
uint64_t region_offset,
void *buf,
uint64_t buf_size);
static struct bake_handle_cache_el *get_handle(struct bake_instance *instance, hg_id_t id);
static void put_handle(struct bake_instance *instance, struct bake_handle_cache_el *el);
static int hg_instance_init(const char *mercury_dest)
{
......@@ -272,29 +283,24 @@ void bake_release_instance(
int bake_shutdown_service(bake_target_id_t bti)
{
hg_return_t hret;
hg_handle_t handle;
struct bake_instance *instance = NULL;
struct bake_handle_cache_el *el = NULL;
HASH_FIND(hh, instance_hash, &bti, sizeof(bti), instance);
if(!instance)
return(-1);
/* create handle */
hret = HG_Create(g_hginst.hg_context, instance->dest,
g_hginst.bake_bulk_shutdown_id, &handle);
if(hret != HG_SUCCESS)
{
return(-1);
}
el = get_handle(instance, g_hginst.bake_bulk_shutdown_id);
assert(el);
hret = margo_forward(g_hginst.mid, handle, NULL);
hret = margo_forward(g_hginst.mid, el->handle, NULL);
if(hret != HG_SUCCESS)
{
HG_Destroy(handle);
put_handle(instance, el);
return(-1);
}
HG_Destroy(handle);
put_handle(instance, el);
return(0);
}
......@@ -306,11 +312,11 @@ static int bake_bulk_eager_write(
uint64_t buf_size)
{
hg_return_t hret;
hg_handle_t handle;
bake_bulk_eager_write_in_t in;
bake_bulk_eager_write_out_t out;
int ret;
struct bake_instance *instance = NULL;
struct bake_handle_cache_el *el = NULL;
HASH_FIND(hh, instance_hash, &bti, sizeof(bti), instance);
if(!instance)
......@@ -321,33 +327,29 @@ static int bake_bulk_eager_write(
in.region_offset = region_offset;
in.size = buf_size;
in.buffer = (char*)buf;
/* create handle */
hret = HG_Create(g_hginst.hg_context, instance->dest,
g_hginst.bake_bulk_eager_write_id, &handle);
if(hret != HG_SUCCESS)
{
return(-1);
}
el = get_handle(instance, g_hginst.bake_bulk_eager_write_id);
assert(el);
hret = margo_forward(g_hginst.mid, handle, &in);
hret = margo_forward(g_hginst.mid, el->handle, &in);
if(hret != HG_SUCCESS)
{
HG_Destroy(handle);
put_handle(instance, el);
return(-1);
}
hret = HG_Get_output(handle, &out);
hret = HG_Get_output(el->handle, &out);
if(hret != HG_SUCCESS)
{
HG_Destroy(handle);
put_handle(instance, el);
return(-1);
}
ret = out.ret;
HG_Free_output(handle, &out);
HG_Destroy(handle);
HG_Free_output(el->handle, &out);
put_handle(instance, el);
return(ret);
}
......@@ -361,11 +363,11 @@ int bake_bulk_write(
uint64_t buf_size)
{
hg_return_t hret;
hg_handle_t handle;
bake_bulk_write_in_t in;
bake_bulk_write_out_t out;
int ret;
struct bake_instance *instance = NULL;
struct bake_handle_cache_el *el = NULL;
if(buf_size <= BAKE_BULK_EAGER_LIMIT)
{
......@@ -387,36 +389,30 @@ int bake_bulk_write(
return(-1);
}
/* create handle */
hret = HG_Create(g_hginst.hg_context, instance->dest,
g_hginst.bake_bulk_write_id, &handle);
if(hret != HG_SUCCESS)
{
HG_Bulk_free(in.bulk_handle);
return(-1);
}
el = get_handle(instance, g_hginst.bake_bulk_write_id);
assert(el);
hret = margo_forward(g_hginst.mid, handle, &in);
hret = margo_forward(g_hginst.mid, el->handle, &in);
if(hret != HG_SUCCESS)
{
HG_Destroy(handle);
HG_Bulk_free(in.bulk_handle);
put_handle(instance, el);
return(-1);
}
hret = HG_Get_output(handle, &out);
hret = HG_Get_output(el->handle, &out);
if(hret != HG_SUCCESS)
{
HG_Destroy(handle);
HG_Bulk_free(in.bulk_handle);
put_handle(instance, el);
return(-1);
}
ret = out.ret;
HG_Free_output(handle, &out);
HG_Destroy(handle);
HG_Free_output(el->handle, &out);
HG_Bulk_free(in.bulk_handle);
put_handle(instance, el);
return(ret);
}
......@@ -426,11 +422,11 @@ int bake_bulk_create(
bake_bulk_region_id_t *rid)
{
hg_return_t hret;
hg_handle_t handle;
bake_bulk_create_in_t in;
bake_bulk_create_out_t out;
int ret;
struct bake_instance *instance = NULL;
struct bake_handle_cache_el *el = NULL;
HASH_FIND(hh, instance_hash, &bti, sizeof(bti), instance);
if(!instance)
......@@ -439,33 +435,28 @@ int bake_bulk_create(
in.bti = bti;
in.region_size = region_size;
/* create handle */
hret = HG_Create(g_hginst.hg_context, instance->dest,
g_hginst.bake_bulk_create_id, &handle);
if(hret != HG_SUCCESS)
{
return(-1);
}
el = get_handle(instance, g_hginst.bake_bulk_create_id);
assert(el);
hret = margo_forward(g_hginst.mid, handle, &in);
hret = margo_forward(g_hginst.mid, el->handle, &in);
if(hret != HG_SUCCESS)
{
HG_Destroy(handle);
put_handle(instance, el);
return(-1);
}
hret = HG_Get_output(handle, &out);
hret = HG_Get_output(el->handle, &out);
if(hret != HG_SUCCESS)
{
HG_Destroy(handle);
put_handle(instance, el);
return(-1);
}
ret = out.ret;
*rid = out.rid;
HG_Free_output(handle, &out);
HG_Destroy(handle);
HG_Free_output(el->handle, &out);
put_handle(instance, el);
return(ret);
}
......@@ -475,11 +466,11 @@ int bake_bulk_persist(
bake_bulk_region_id_t rid)
{
hg_return_t hret;
hg_handle_t handle;
bake_bulk_persist_in_t in;
bake_bulk_persist_out_t out;
int ret;
struct bake_instance *instance = NULL;
struct bake_handle_cache_el *el = NULL;
HASH_FIND(hh, instance_hash, &bti, sizeof(bti), instance);
if(!instance)
......@@ -488,32 +479,27 @@ int bake_bulk_persist(
in.bti = bti;
in.rid = rid;
/* create handle */
hret = HG_Create(g_hginst.hg_context, instance->dest,
g_hginst.bake_bulk_persist_id, &handle);
if(hret != HG_SUCCESS)
{
return(-1);
}
el = get_handle(instance, g_hginst.bake_bulk_persist_id);
assert(el);
hret = margo_forward(g_hginst.mid, handle, &in);
hret = margo_forward(g_hginst.mid, el->handle, &in);
if(hret != HG_SUCCESS)
{
HG_Destroy(handle);
put_handle(instance, el);
return(-1);
}
hret = HG_Get_output(handle, &out);
hret = HG_Get_output(el->handle, &out);
if(hret != HG_SUCCESS)
{
HG_Destroy(handle);
put_handle(instance, el);
return(-1);
}
ret = out.ret;
HG_Free_output(handle, &out);
HG_Destroy(handle);
HG_Free_output(el->handle, &out);
put_handle(instance, el);
return(ret);
}
......@@ -523,11 +509,11 @@ int bake_bulk_get_size(
uint64_t *region_size)
{
hg_return_t hret;
hg_handle_t handle;
bake_bulk_get_size_in_t in;
bake_bulk_get_size_out_t out;
int ret;
struct bake_instance *instance = NULL;
struct bake_handle_cache_el *el = NULL;
HASH_FIND(hh, instance_hash, &bti, sizeof(bti), instance);
if(!instance)
......@@ -536,33 +522,28 @@ int bake_bulk_get_size(
in.bti = bti;
in.rid = rid;
/* create handle */
hret = HG_Create(g_hginst.hg_context, instance->dest,
g_hginst.bake_bulk_get_size_id, &handle);
if(hret != HG_SUCCESS)
{
return(-1);
}
el = get_handle(instance, g_hginst.bake_bulk_get_size_id);
assert(el);
hret = margo_forward(g_hginst.mid, handle, &in);
hret = margo_forward(g_hginst.mid, el->handle, &in);
if(hret != HG_SUCCESS)
{
HG_Destroy(handle);
put_handle(instance, el);
return(-1);
}
hret = HG_Get_output(handle, &out);
hret = HG_Get_output(el->handle, &out);
if(hret != HG_SUCCESS)
{
HG_Destroy(handle);
put_handle(instance, el);
return(-1);
}
ret = out.ret;
*region_size = out.size;
HG_Free_output(handle, &out);
HG_Destroy(handle);
HG_Free_output(el->handle, &out);
put_handle(instance, el);
return(ret);
}
......@@ -570,29 +551,24 @@ int bake_bulk_noop(
bake_target_id_t bti)
{
hg_return_t hret;
hg_handle_t handle;
struct bake_instance *instance = NULL;
struct bake_handle_cache_el *el = NULL;
HASH_FIND(hh, instance_hash, &bti, sizeof(bti), instance);
if(!instance)
return(-1);
/* create handle */
hret = HG_Create(g_hginst.hg_context, instance->dest,
g_hginst.bake_bulk_noop_id, &handle);
if(hret != HG_SUCCESS)
{
return(-1);
}
el = get_handle(instance, g_hginst.bake_bulk_noop_id);
assert(el);
hret = margo_forward(g_hginst.mid, handle, NULL);
hret = margo_forward(g_hginst.mid, el->handle, NULL);
if(hret != HG_SUCCESS)
{
HG_Destroy(handle);
put_handle(instance, el);
return(-1);
}
HG_Destroy(handle);
put_handle(instance, el);
return(0);
}
......@@ -604,11 +580,11 @@ int bake_bulk_read(
uint64_t buf_size)
{
hg_return_t hret;
hg_handle_t handle;
bake_bulk_read_in_t in;
bake_bulk_read_out_t out;
int ret;
struct bake_instance *instance = NULL;
struct bake_handle_cache_el *el = NULL;
if(buf_size <= BAKE_BULK_EAGER_LIMIT)
{
......@@ -630,36 +606,30 @@ int bake_bulk_read(
return(-1);
}
/* create handle */
hret = HG_Create(g_hginst.hg_context, instance->dest,
g_hginst.bake_bulk_read_id, &handle);
if(hret != HG_SUCCESS)
{
HG_Bulk_free(in.bulk_handle);
return(-1);
}
el = get_handle(instance, g_hginst.bake_bulk_read_id);
assert(el);
hret = margo_forward(g_hginst.mid, handle, &in);
hret = margo_forward(g_hginst.mid, el->handle, &in);
if(hret != HG_SUCCESS)
{
HG_Destroy(handle);
HG_Bulk_free(in.bulk_handle);
put_handle(instance, el);
return(-1);
}
hret = HG_Get_output(handle, &out);
hret = HG_Get_output(el->handle, &out);
if(hret != HG_SUCCESS)
{
HG_Destroy(handle);
HG_Bulk_free(in.bulk_handle);
put_handle(instance, el);
return(-1);
}
ret = out.ret;
HG_Free_output(handle, &out);
HG_Destroy(handle);
HG_Free_output(el->handle, &out);
HG_Bulk_free(in.bulk_handle);
put_handle(instance, el);
return(ret);
}
......@@ -672,11 +642,11 @@ static int bake_bulk_eager_read(
uint64_t buf_size)
{
hg_return_t hret;
hg_handle_t handle;
bake_bulk_eager_read_in_t in;
bake_bulk_eager_read_out_t out;
int ret;
struct bake_instance *instance = NULL;
struct bake_handle_cache_el *el = NULL;
HASH_FIND(hh, instance_hash, &bti, sizeof(bti), instance);
if(!instance)
......@@ -687,25 +657,20 @@ static int bake_bulk_eager_read(
in.region_offset = region_offset;
in.size = buf_size;
/* create handle */
hret = HG_Create(g_hginst.hg_context, instance->dest,
g_hginst.bake_bulk_eager_read_id, &handle);
if(hret != HG_SUCCESS)
{
return(-1);
}
el = get_handle(instance, g_hginst.bake_bulk_eager_read_id);
assert(el);
hret = margo_forward(g_hginst.mid, handle, &in);
hret = margo_forward(g_hginst.mid, el->handle, &in);
if(hret != HG_SUCCESS)
{
HG_Destroy(handle);
put_handle(instance, el);
return(-1);
}
hret = HG_Get_output(handle, &out);
hret = HG_Get_output(el->handle, &out);
if(hret != HG_SUCCESS)
{
HG_Destroy(handle);
put_handle(instance, el);
return(-1);
}
......@@ -713,9 +678,42 @@ static int bake_bulk_eager_read(
if(ret == 0)
memcpy(buf, out.buffer, out.size);
HG_Free_output(handle, &out);
HG_Destroy(handle);
HG_Free_output(el->handle, &out);
put_handle(instance, el);
return(ret);
}
static struct bake_handle_cache_el *get_handle(struct bake_instance *instance, hg_id_t id)
{
struct bake_handle_cache_el *el = NULL;
hg_return_t hret;
HASH_FIND(hh, instance->handle_hash, &id, sizeof(id), el);
if(el)
{
HASH_DELETE(hh, instance->handle_hash, el);
return(el);
}
el = malloc(sizeof(*el));
if(!el)
return(NULL);
el->id = id;
/* create handle */
hret = HG_Create(g_hginst.hg_context, instance->dest,
id, &el->handle);
if(hret != HG_SUCCESS)
{
free(el);
return(NULL);
}
return(el);
}
static void put_handle(struct bake_instance *instance, struct bake_handle_cache_el *el)
{
HASH_ADD(hh, instance->handle_hash, id, sizeof(el->id), el);
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment