From 942c1e3ed768303ce8e47f10c6cfbf84d3f10edf Mon Sep 17 00:00:00 2001 From: John Jenkins Date: Mon, 2 May 2016 15:30:09 -0500 Subject: [PATCH] add refcount semantics --- src/margo.c | 70 ++++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 53 insertions(+), 17 deletions(-) diff --git a/src/margo.c b/src/margo.c index 7503695..876c6db 100644 --- a/src/margo.c +++ b/src/margo.c @@ -30,6 +30,7 @@ struct margo_instance /* internal to margo for this particular instance */ ABT_thread hg_progress_tid; int hg_progress_shutdown_flag; + ABT_xstream progress_xstream; int owns_progress_pool; ABT_xstream *rpc_xstreams; int num_handler_pool_threads; @@ -37,6 +38,7 @@ struct margo_instance /* control logic for callers waiting on margo to be finalized */ int finalize_flag; int finalize_waiters_in_progress_pool; + int refcount; ABT_mutex finalize_mutex; ABT_cond finalize_cond; @@ -111,8 +113,9 @@ margo_instance_id margo_init(int use_progress_thread, int rpc_thread_count, mid = margo_init_pool(progress_pool, rpc_pool, hg_context); if (mid == MARGO_INSTANCE_NULL) goto err; - mid->owns_progress_pool = !use_progress_thread; - mid->num_handler_pool_threads = rpc_thread_count; + mid->owns_progress_pool = use_progress_thread; + mid->progress_xstream = progress_xstream; + mid->num_handler_pool_threads = rpc_thread_count < 0 ? 0 : rpc_thread_count; mid->rpc_xstreams = rpc_xstreams; return mid; @@ -155,6 +158,7 @@ margo_instance_id margo_init_pool(ABT_pool progress_pool, ABT_pool handler_pool, mid->handler_pool = handler_pool; mid->hg_class = HG_Context_get_class(hg_context); mid->hg_context = hg_context; + mid->refcount = 1; ret = margo_timer_instance_init(mid); if(ret != 0) @@ -181,9 +185,38 @@ margo_instance_id margo_init_pool(ABT_pool progress_pool, ABT_pool handler_pool, return mid; } +static void margo_cleanup(margo_instance_id mid) +{ + int i; + + margo_timer_instance_finalize(mid); + + ABT_mutex_free(&mid->finalize_mutex); + ABT_cond_free(&mid->finalize_cond); + + if (mid->owns_progress_pool) + { + ABT_xstream_join(mid->progress_xstream); + ABT_xstream_free(&mid->progress_xstream); + } + + if (mid->num_handler_pool_threads > 0) + { + for (i = 0; i < mid->num_handler_pool_threads; i++) + { + ABT_xstream_join(mid->rpc_xstreams[i]); + ABT_xstream_free(&mid->rpc_xstreams[i]); + } + free(mid->rpc_xstreams); + } + + free(mid); +} + void margo_finalize(margo_instance_id mid) { int i; + int do_cleanup; /* tell progress thread to wrap things up */ mid->hg_progress_shutdown_flag = 1; @@ -201,22 +234,17 @@ void margo_finalize(margo_instance_id mid) ABT_mutex_lock(mid->finalize_mutex); mid->finalize_flag = 1; ABT_cond_broadcast(mid->finalize_cond); - ABT_mutex_unlock(mid->finalize_mutex); - /* TODO: yuck, there is a race here if someone was really waiting for - * finalize; we can't destroy the data structures out from under them. - * We could fix this by reference counting so that the last caller - * (whether a finalize() caller or wait_for_finalize() caller) knows it - * is safe to turn off the lights on their way out. For now we just leak - * a small amount of memory. - */ -#if 0 - margo_timer_instance_finalize(mid); + mid->refcount--; + do_cleanup = mid->refcount == 0; - ABT_mutex_free(&mid->finalize_mutex); - ABT_cond_free(&mid->finalize_cond); - free(mid); -#endif + ABT_mutex_unlock(mid->finalize_mutex); + + /* if there was noone waiting on the finalize at the time of the finalize + * broadcast, then we're safe to clean up. Otherwise, let the finalizer do + * it */ + if (do_cleanup) + margo_cleanup(mid); return; } @@ -224,6 +252,7 @@ void margo_finalize(margo_instance_id mid) void margo_wait_for_finalize(margo_instance_id mid) { int in_pool = 0; + int do_cleanup; /* Is this waiter in the same pool as the pool running the progress * thread? @@ -234,12 +263,19 @@ void margo_wait_for_finalize(margo_instance_id mid) ABT_mutex_lock(mid->finalize_mutex); mid->finalize_waiters_in_progress_pool += in_pool; + mid->refcount++; while(!mid->finalize_flag) ABT_cond_wait(mid->finalize_cond, mid->finalize_mutex); + mid->refcount--; + do_cleanup = mid->refcount == 0; + ABT_mutex_unlock(mid->finalize_mutex); - + + if (do_cleanup) + margo_cleanup(mid); + return; } -- 2.26.2