Commit b656534f authored by Shane Snyder's avatar Shane Snyder

Merge branch 'dev-handle-cache' into margo-registered-data

parents e38ef611 5ae1ce8f
......@@ -124,7 +124,7 @@ int main(int argc, char **argv)
assert(hret == HG_SUCCESS);
hret = margo_forward(mid, handle, NULL);
assert(hret == HG_SUCCESS);
margo_destroy(handle);
margo_destroy(mid, handle);
if(strcmp(argv[1], argv[2]))
{
sleep(3);
......@@ -133,7 +133,7 @@ int main(int argc, char **argv)
assert(hret == HG_SUCCESS);
hret = margo_forward(mid, handle, NULL);
assert(hret == HG_SUCCESS);
margo_destroy(handle);
margo_destroy(mid, handle);
}
margo_addr_free(mid, delegator_svr_addr);
......
......@@ -72,7 +72,7 @@ void composed_read(margo_instance_id mid, hg_addr_t svr_addr, void *buffer, hg_s
/* clean up resources consumed by this rpc */
margo_free_output(handle, &out);
margo_bulk_free(in.bulk_handle);
margo_destroy(handle);
margo_destroy(mid, handle);
return;
}
......@@ -122,7 +122,7 @@ void data_xfer_read(margo_instance_id mid, hg_addr_t svr_addr, void *buffer, hg_
/* clean up resources consumed by this rpc */
margo_free_output(handle, &out);
margo_bulk_free(in.bulk_handle);
margo_destroy(handle);
margo_destroy(mid, handle);
margo_addr_free(mid, addr_self);
return;
......
......@@ -36,7 +36,7 @@ static void my_rpc_shutdown_ult(hg_handle_t handle)
hret = margo_respond(mid, handle, NULL);
assert(hret == HG_SUCCESS);
margo_destroy(handle);
margo_destroy(mid, handle);
/* NOTE: we assume that the server daemon is using
* margo_wait_for_finalize() to suspend until this RPC executes, so there
......
......@@ -66,7 +66,7 @@ static void data_xfer_read_ult(hg_handle_t handle)
hret = margo_respond(mid, handle, &out);
assert(hret == HG_SUCCESS);
margo_destroy(handle);
margo_destroy(mid, handle);
return;
}
......
......@@ -74,8 +74,8 @@ static void delegator_read_ult(hg_handle_t handle)
assert(hret == HG_SUCCESS);
margo_addr_free(mid, data_xfer_svc_addr);
margo_destroy(handle);
margo_destroy(handle_relay);
margo_destroy(mid, handle);
margo_destroy(mid, handle_relay);
return;
}
......
......@@ -143,7 +143,7 @@ int main(int argc, char **argv)
hret = margo_forward(mid, handle, NULL);
assert(hret == HG_SUCCESS);
margo_destroy(handle);
margo_destroy(mid, handle);
margo_addr_free(mid, svr_addr);
/* shut down everything */
......@@ -196,7 +196,7 @@ static void run_my_rpc(void *_arg)
/* clean up resources consumed by this rpc */
margo_bulk_free(in.bulk_handle);
margo_free_output(handle, &out);
margo_destroy(handle);
margo_destroy(arg->mid, handle);
free(buffer);
printf("ULT [%d] done.\n", arg->val);
......
......@@ -76,7 +76,7 @@ int main(int argc, char **argv)
hret = margo_forward(mid, handle, NULL);
assert(hret == HG_SUCCESS);
margo_destroy(handle);
margo_destroy(mid, handle);
margo_addr_free(mid, svr_addr);
/* shut down everything */
......
......@@ -36,7 +36,7 @@ static void my_rpc_shutdown_ult(hg_handle_t handle)
hret = margo_respond(mid, handle, NULL);
assert(hret == HG_SUCCESS);
margo_destroy(handle);
margo_destroy(mid, handle);
/* NOTE: we assume that the server daemon is using
* margo_wait_for_finalize() to suspend until this RPC executes, so there
......
......@@ -70,7 +70,7 @@ void svc1_do_thing(margo_instance_id mid, hg_addr_t svr_addr, uint32_t mplex_id)
/* clean up resources consumed by this rpc */
margo_free_output(handle, &out);
margo_bulk_free(in.bulk_handle);
margo_destroy(handle);
margo_destroy(mid, handle);
free(buffer);
return;
......@@ -116,7 +116,7 @@ void svc1_do_other_thing(margo_instance_id mid, hg_addr_t svr_addr, uint32_t mpl
/* clean up resources consumed by this rpc */
margo_free_output(handle, &out);
margo_bulk_free(in.bulk_handle);
margo_destroy(handle);
margo_destroy(mid, handle);
free(buffer);
return;
......
......@@ -60,7 +60,7 @@ static void svc1_do_thing_ult(hg_handle_t handle)
assert(hret == HG_SUCCESS);
margo_bulk_free(bulk_handle);
margo_destroy(handle);
margo_destroy(mid, handle);
free(buffer);
return;
......@@ -118,7 +118,7 @@ static void svc1_do_other_thing_ult(hg_handle_t handle)
assert(hret == HG_SUCCESS);
margo_bulk_free(bulk_handle);
margo_destroy(handle);
margo_destroy(mid, handle);
free(buffer);
return;
......
......@@ -69,7 +69,7 @@ void svc2_do_thing(margo_instance_id mid, hg_addr_t svr_addr, uint32_t mplex_id)
/* clean up resources consumed by this rpc */
margo_free_output(handle, &out);
margo_bulk_free(in.bulk_handle);
margo_destroy(handle);
margo_destroy(mid, handle);
free(buffer);
return;
......@@ -115,7 +115,7 @@ void svc2_do_other_thing(margo_instance_id mid, hg_addr_t svr_addr, uint32_t mpl
/* clean up resources consumed by this rpc */
margo_free_output(handle, &out);
margo_bulk_free(in.bulk_handle);
margo_destroy(handle);
margo_destroy(mid, handle);
free(buffer);
return;
......
......@@ -60,7 +60,7 @@ static void svc2_do_thing_ult(hg_handle_t handle)
assert(hret == HG_SUCCESS);
margo_bulk_free(bulk_handle);
margo_destroy(handle);
margo_destroy(mid, handle);
free(buffer);
return;
......@@ -118,7 +118,7 @@ static void svc2_do_other_thing_ult(hg_handle_t handle)
assert(hret == HG_SUCCESS);
margo_bulk_free(bulk_handle);
margo_destroy(handle);
margo_destroy(mid, handle);
free(buffer);
return;
......
......@@ -80,7 +80,7 @@ static void my_rpc_ult(hg_handle_t handle)
assert(hret == HG_SUCCESS);
margo_bulk_free(bulk_handle);
margo_destroy(handle);
margo_destroy(mid, handle);
free(buffer);
return;
......@@ -104,7 +104,7 @@ static void my_rpc_shutdown_ult(hg_handle_t handle)
hret = margo_respond(mid, handle, NULL);
assert(hret == HG_SUCCESS);
margo_destroy(handle);
margo_destroy(mid, handle);
margo_diag_dump(mid, "-", 0);
......
......@@ -292,11 +292,13 @@ hg_return_t margo_create(
/**
* Destroy Mercury handle.
*
* \param [in] handle Mercury handle
* \param [in] mid Margo instance
* \param [in] handle Mercury handle
*
* \return HG_SUCCESS or corresponding HG error code
*/
hg_return_t margo_destroy(
margo_instance_id mid,
hg_handle_t handle);
/**
......
......@@ -23,6 +23,7 @@
#include "uthash.h"
#define DEFAULT_MERCURY_PROGRESS_TIMEOUT_UB 100 /* 100 milliseconds */
#define DEFAULT_MERCURY_HANDLE_CACHE_SIZE 32
struct mplex_key
{
......@@ -53,9 +54,16 @@ do {\
if((__time) < __data.min) __data.min = (__time); \
} while(0)
struct margo_handle_cache_el
{
hg_handle_t handle;
UT_hash_handle hh; /* in-use hash link */
struct margo_handle_cache_el *next; /* free list link */
};
struct margo_instance
{
/* provided by caller */
/* mercury/argobots state */
hg_context_t *hg_context;
hg_class_t *hg_class;
ABT_pool handler_pool;
......@@ -80,6 +88,10 @@ struct margo_instance
/* hash table to track multiplexed rpcs registered with margo */
struct mplex_element *mplex_table;
/* linked list of free hg handles and a hash of in-use handles */
struct margo_handle_cache_el *free_handle_list;
struct margo_handle_cache_el *used_handle_hash;
/* optional diagnostics data tracking */
/* NOTE: technically the following fields are subject to races if they
* are updated from more than one thread at a time. We will be careful
......@@ -109,6 +121,13 @@ struct margo_rpc_data
static void hg_progress_fn(void* foo);
static void margo_rpc_data_free(void* ptr);
static hg_return_t margo_handle_cache_init(margo_instance_id mid);
static void margo_handle_cache_destroy(margo_instance_id mid);
static hg_return_t margo_handle_cache_get(margo_instance_id mid,
hg_addr_t addr, hg_id_t id, hg_handle_t *handle);
static hg_return_t margo_handle_cache_put(margo_instance_id mid,
hg_handle_t handle);
margo_instance_id margo_init(const char *addr_str, int mode,
int use_progress_thread, int rpc_thread_count)
{
......@@ -239,6 +258,7 @@ margo_instance_id margo_init_pool(ABT_pool progress_pool, ABT_pool handler_pool,
hg_context_t *hg_context)
{
int ret;
hg_return_t hret;
struct margo_instance *mid;
mid = malloc(sizeof(*mid));
......@@ -258,6 +278,10 @@ margo_instance_id margo_init_pool(ABT_pool progress_pool, ABT_pool handler_pool,
ret = margo_timer_instance_init(mid);
if(ret != 0) goto err;
/* initialize the handle cache */
hret = margo_handle_cache_init(mid);
if(hret != HG_SUCCESS) goto err;
ret = ABT_thread_create(mid->progress_pool, hg_progress_fn, mid,
ABT_THREAD_ATTR_NULL, &mid->hg_progress_tid);
if(ret != 0) goto err;
......@@ -267,6 +291,7 @@ margo_instance_id margo_init_pool(ABT_pool progress_pool, ABT_pool handler_pool,
err:
if(mid)
{
margo_handle_cache_destroy(mid);
margo_timer_instance_finalize(mid);
ABT_mutex_free(&mid->finalize_mutex);
ABT_cond_free(&mid->finalize_cond);
......@@ -300,6 +325,8 @@ static void margo_cleanup(margo_instance_id mid)
free(mid->rpc_xstreams);
}
margo_handle_cache_destroy(mid);
if (mid->margo_init)
{
if (mid->hg_context)
......@@ -553,16 +580,32 @@ hg_return_t margo_addr_to_string(
hg_return_t margo_create(margo_instance_id mid, hg_addr_t addr,
hg_id_t id, hg_handle_t *handle)
{
/* TODO: handle caching logic? */
hg_return_t hret;
/* look for a handle to reuse */
hret = margo_handle_cache_get(mid, addr, id, handle);
if(hret != HG_SUCCESS)
{
/* else try creating a new handle */
hret = HG_Create(mid->hg_context, addr, id, handle);
}
return(HG_Create(mid->hg_context, addr, id, handle));
return hret;
}
hg_return_t margo_destroy(hg_handle_t handle)
hg_return_t margo_destroy(margo_instance_id mid, hg_handle_t handle)
{
/* TODO handle caching logic? */
hg_return_t hret;
/* recycle this handle if it came from the handle cache */
hret = margo_handle_cache_put(mid, handle);
if(hret != HG_SUCCESS)
{
/* else destroy the handle manually */
hret = HG_Destroy(handle);
}
return(HG_Destroy(handle));
return hret;
}
static hg_return_t margo_cb(const struct hg_cb_info *info)
......@@ -1131,3 +1174,105 @@ void margo_get_param(margo_instance_id mid, int option, void *param)
return;
}
static hg_return_t margo_handle_cache_init(margo_instance_id mid)
{
int i;
struct margo_handle_cache_el *el;
hg_return_t hret = HG_SUCCESS;
for(i = 0; i < DEFAULT_MERCURY_HANDLE_CACHE_SIZE; i++)
{
el = malloc(sizeof(*el));
if(!el)
{
hret = HG_NOMEM_ERROR;
margo_handle_cache_destroy(mid);
break;
}
/* create handle with NULL_ADDRs, we will reset later to valid addrs */
hret = HG_Create(mid->hg_context, HG_ADDR_NULL, 0, &el->handle);
if(hret != HG_SUCCESS)
{
free(el);
margo_handle_cache_destroy(mid);
break;
}
/* add to the free list */
LL_PREPEND(mid->free_handle_list, el);
}
return hret;
}
static void margo_handle_cache_destroy(margo_instance_id mid)
{
struct margo_handle_cache_el *el, *tmp;
/* only free handle list elements -- handles in hash are still in use */
LL_FOREACH_SAFE(mid->free_handle_list, el, tmp)
{
LL_DELETE(mid->free_handle_list, el);
HG_Destroy(el->handle);
free(el);
}
return;
}
static hg_return_t margo_handle_cache_get(margo_instance_id mid,
hg_addr_t addr, hg_id_t id, hg_handle_t *handle)
{
struct margo_handle_cache_el *el;
hg_return_t hret;
if(!mid->free_handle_list)
{
/* if no available handles, just fall through */
return HG_OTHER_ERROR;
}
/* pop first element from the free handle list */
el = mid->free_handle_list;
LL_DELETE(mid->free_handle_list, el);
/* reset handle */
hret = HG_Reset(el->handle, addr, id);
if(hret == HG_SUCCESS)
{
/* put on in-use list and pass back handle */
HASH_ADD(hh, mid->used_handle_hash, handle, sizeof(hg_handle_t), el);
*handle = el->handle;
}
else
{
/* reset failed, add handle back to the free list */
LL_APPEND(mid->free_handle_list, el);
}
return hret;
}
static hg_return_t margo_handle_cache_put(margo_instance_id mid,
hg_handle_t handle)
{
struct margo_handle_cache_el *el;
/* look for handle in the in-use hash */
HASH_FIND(hh, mid->used_handle_hash, &handle, sizeof(hg_handle_t), el);
if(!el)
{
/* this handle was manually allocated -- just fall through */
return HG_OTHER_ERROR;
}
/* remove from the in-use hash */
HASH_DELETE(hh, mid->used_handle_hash, el);
/* add to the tail of the free handle list */
LL_APPEND(mid->free_handle_list, el);
return HG_SUCCESS;
}
......@@ -144,7 +144,7 @@ int main(int argc, char **argv)
hret = margo_forward_timed(mid, handle, NULL, 2000.0);
assert(hret == HG_SUCCESS);
margo_destroy(handle);
margo_destroy(mid, handle);
margo_addr_free(mid, svr_addr);
/* shut down everything */
......@@ -202,7 +202,7 @@ static void run_my_rpc(void *_arg)
/* clean up resources consumed by this rpc */
margo_bulk_free(in.bulk_handle);
margo_destroy(handle);
margo_destroy(arg->mid, handle);
free(buffer);
printf("ULT [%d] done.\n", arg->val);
......
......@@ -142,7 +142,7 @@ int main(int argc, char **argv)
hret = margo_forward(mid, handle, NULL);
assert(hret == HG_SUCCESS);
margo_destroy(handle);
margo_destroy(mid, handle);
margo_addr_free(mid, svr_addr);
/* shut down everything */
......@@ -194,7 +194,7 @@ static void run_my_rpc(void *_arg)
/* clean up resources consumed by this rpc */
margo_bulk_free(in.bulk_handle);
margo_free_output(handle, &out);
margo_destroy(handle);
margo_destroy(arg->mid, handle);
free(buffer);
printf("ULT [%d] done.\n", arg->val);
......
......@@ -80,7 +80,7 @@ static void my_rpc_ult(hg_handle_t handle)
assert(hret == HG_SUCCESS);
margo_bulk_free(bulk_handle);
margo_destroy(handle);
margo_destroy(mid, handle);
free(buffer);
return;
......@@ -104,7 +104,7 @@ static void my_rpc_shutdown_ult(hg_handle_t handle)
hret = margo_respond(mid, handle, NULL);
assert(hret == HG_SUCCESS);
margo_destroy(handle);
margo_destroy(mid, handle);
/* NOTE: we assume that the server daemon is using
* margo_wait_for_finalize() to suspend until this RPC executes, so there
......@@ -164,7 +164,7 @@ static void my_rpc_hang_ult(hg_handle_t handle)
assert(hret == HG_SUCCESS);
margo_bulk_free(bulk_handle);
margo_destroy(handle);
margo_destroy(mid, handle);
free(buffer);
return;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment