Commit 942c1e3e authored by Jonathan Jenkins's avatar Jonathan Jenkins

add refcount semantics

parent b0f63518
...@@ -30,6 +30,7 @@ struct margo_instance ...@@ -30,6 +30,7 @@ struct margo_instance
/* internal to margo for this particular instance */ /* internal to margo for this particular instance */
ABT_thread hg_progress_tid; ABT_thread hg_progress_tid;
int hg_progress_shutdown_flag; int hg_progress_shutdown_flag;
ABT_xstream progress_xstream;
int owns_progress_pool; int owns_progress_pool;
ABT_xstream *rpc_xstreams; ABT_xstream *rpc_xstreams;
int num_handler_pool_threads; int num_handler_pool_threads;
...@@ -37,6 +38,7 @@ struct margo_instance ...@@ -37,6 +38,7 @@ struct margo_instance
/* control logic for callers waiting on margo to be finalized */ /* control logic for callers waiting on margo to be finalized */
int finalize_flag; int finalize_flag;
int finalize_waiters_in_progress_pool; int finalize_waiters_in_progress_pool;
int refcount;
ABT_mutex finalize_mutex; ABT_mutex finalize_mutex;
ABT_cond finalize_cond; ABT_cond finalize_cond;
...@@ -111,8 +113,9 @@ margo_instance_id margo_init(int use_progress_thread, int rpc_thread_count, ...@@ -111,8 +113,9 @@ margo_instance_id margo_init(int use_progress_thread, int rpc_thread_count,
mid = margo_init_pool(progress_pool, rpc_pool, hg_context); mid = margo_init_pool(progress_pool, rpc_pool, hg_context);
if (mid == MARGO_INSTANCE_NULL) goto err; if (mid == MARGO_INSTANCE_NULL) goto err;
mid->owns_progress_pool = !use_progress_thread; mid->owns_progress_pool = use_progress_thread;
mid->num_handler_pool_threads = rpc_thread_count; mid->progress_xstream = progress_xstream;
mid->num_handler_pool_threads = rpc_thread_count < 0 ? 0 : rpc_thread_count;
mid->rpc_xstreams = rpc_xstreams; mid->rpc_xstreams = rpc_xstreams;
return mid; return mid;
...@@ -155,6 +158,7 @@ margo_instance_id margo_init_pool(ABT_pool progress_pool, ABT_pool handler_pool, ...@@ -155,6 +158,7 @@ margo_instance_id margo_init_pool(ABT_pool progress_pool, ABT_pool handler_pool,
mid->handler_pool = handler_pool; mid->handler_pool = handler_pool;
mid->hg_class = HG_Context_get_class(hg_context); mid->hg_class = HG_Context_get_class(hg_context);
mid->hg_context = hg_context; mid->hg_context = hg_context;
mid->refcount = 1;
ret = margo_timer_instance_init(mid); ret = margo_timer_instance_init(mid);
if(ret != 0) if(ret != 0)
...@@ -181,9 +185,38 @@ margo_instance_id margo_init_pool(ABT_pool progress_pool, ABT_pool handler_pool, ...@@ -181,9 +185,38 @@ margo_instance_id margo_init_pool(ABT_pool progress_pool, ABT_pool handler_pool,
return mid; return mid;
} }
static void margo_cleanup(margo_instance_id mid)
{
int i;
margo_timer_instance_finalize(mid);
ABT_mutex_free(&mid->finalize_mutex);
ABT_cond_free(&mid->finalize_cond);
if (mid->owns_progress_pool)
{
ABT_xstream_join(mid->progress_xstream);
ABT_xstream_free(&mid->progress_xstream);
}
if (mid->num_handler_pool_threads > 0)
{
for (i = 0; i < mid->num_handler_pool_threads; i++)
{
ABT_xstream_join(mid->rpc_xstreams[i]);
ABT_xstream_free(&mid->rpc_xstreams[i]);
}
free(mid->rpc_xstreams);
}
free(mid);
}
void margo_finalize(margo_instance_id mid) void margo_finalize(margo_instance_id mid)
{ {
int i; int i;
int do_cleanup;
/* tell progress thread to wrap things up */ /* tell progress thread to wrap things up */
mid->hg_progress_shutdown_flag = 1; mid->hg_progress_shutdown_flag = 1;
...@@ -201,22 +234,17 @@ void margo_finalize(margo_instance_id mid) ...@@ -201,22 +234,17 @@ void margo_finalize(margo_instance_id mid)
ABT_mutex_lock(mid->finalize_mutex); ABT_mutex_lock(mid->finalize_mutex);
mid->finalize_flag = 1; mid->finalize_flag = 1;
ABT_cond_broadcast(mid->finalize_cond); ABT_cond_broadcast(mid->finalize_cond);
ABT_mutex_unlock(mid->finalize_mutex);
/* TODO: yuck, there is a race here if someone was really waiting for mid->refcount--;
* finalize; we can't destroy the data structures out from under them. do_cleanup = mid->refcount == 0;
* We could fix this by reference counting so that the last caller
* (whether a finalize() caller or wait_for_finalize() caller) knows it
* is safe to turn off the lights on their way out. For now we just leak
* a small amount of memory.
*/
#if 0
margo_timer_instance_finalize(mid);
ABT_mutex_free(&mid->finalize_mutex); ABT_mutex_unlock(mid->finalize_mutex);
ABT_cond_free(&mid->finalize_cond);
free(mid); /* if there was noone waiting on the finalize at the time of the finalize
#endif * broadcast, then we're safe to clean up. Otherwise, let the finalizer do
* it */
if (do_cleanup)
margo_cleanup(mid);
return; return;
} }
...@@ -224,6 +252,7 @@ void margo_finalize(margo_instance_id mid) ...@@ -224,6 +252,7 @@ void margo_finalize(margo_instance_id mid)
void margo_wait_for_finalize(margo_instance_id mid) void margo_wait_for_finalize(margo_instance_id mid)
{ {
int in_pool = 0; int in_pool = 0;
int do_cleanup;
/* Is this waiter in the same pool as the pool running the progress /* Is this waiter in the same pool as the pool running the progress
* thread? * thread?
...@@ -234,12 +263,19 @@ void margo_wait_for_finalize(margo_instance_id mid) ...@@ -234,12 +263,19 @@ void margo_wait_for_finalize(margo_instance_id mid)
ABT_mutex_lock(mid->finalize_mutex); ABT_mutex_lock(mid->finalize_mutex);
mid->finalize_waiters_in_progress_pool += in_pool; mid->finalize_waiters_in_progress_pool += in_pool;
mid->refcount++;
while(!mid->finalize_flag) while(!mid->finalize_flag)
ABT_cond_wait(mid->finalize_cond, mid->finalize_mutex); ABT_cond_wait(mid->finalize_cond, mid->finalize_mutex);
mid->refcount--;
do_cleanup = mid->refcount == 0;
ABT_mutex_unlock(mid->finalize_mutex); ABT_mutex_unlock(mid->finalize_mutex);
if (do_cleanup)
margo_cleanup(mid);
return; return;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment