Commit 1ae1855d authored by Matthieu Dorier's avatar Matthieu Dorier

should now be possible to call margo_init multiple times

parent 065632ad
......@@ -23,6 +23,15 @@
#define DEFAULT_MERCURY_PROGRESS_TIMEOUT_UB 100 /* 100 milliseconds */
#define DEFAULT_MERCURY_HANDLE_CACHE_SIZE 32
/* If margo is initializing ABT, we need to track how many instances of margo
* are being created, so that the last one can call ABT_finalize.
* If margo initializes ABT, g_num_margo_instances_mtx will be created, so
* in later calls and in margo_cleanup we can check for g_num_margo_instances_mtx != ABT_MUTEX_NULL
* to know if we should do something to cleanup ABT as well.
*/
static int g_num_margo_instances = 0; // how many margo instances exist
static ABT_mutex g_num_margo_instances_mtx = ABT_MUTEX_NULL; // mutex for above global variable
struct diag_data
{
double min;
......@@ -66,7 +75,6 @@ struct margo_instance
/* internal to margo for this particular instance */
int margo_init;
int abt_init;
ABT_thread hg_progress_tid;
int hg_progress_shutdown_flag;
ABT_xstream progress_xstream;
......@@ -229,7 +237,6 @@ margo_instance_id margo_init_opt(const char *addr_str, int mode, const struct hg
hg_class_t *hg_class = NULL;
hg_context_t *hg_context = NULL;
int listen_flag = (mode == MARGO_CLIENT_MODE) ? HG_FALSE : HG_TRUE;
int abt_init = 0;
int i;
int ret;
struct margo_instance *mid = MARGO_INSTANCE_NULL;
......@@ -243,7 +250,8 @@ margo_instance_id margo_init_opt(const char *addr_str, int mode, const struct hg
{
ret = ABT_init(0, NULL); /* XXX: argc/argv not currently used by ABT ... */
if(ret != 0) goto err;
abt_init = 1;
ret = ABT_mutex_create(&g_num_margo_instances_mtx);
if(ret != 0) goto err;
}
/* set caller (self) ES to sleep when idle by using sched_wait */
......@@ -253,7 +261,10 @@ margo_instance_id margo_init_opt(const char *addr_str, int mode, const struct hg
ret = ABT_xstream_self(&self_xstream);
if(ret != ABT_SUCCESS) goto err;
ret = ABT_xstream_set_main_sched(self_xstream, self_sched);
if(ret != ABT_SUCCESS) goto err;
if(ret != ABT_SUCCESS) {
// best effort
ABT_sched_free(&self_sched);
}
if (use_progress_thread)
{
......@@ -312,7 +323,6 @@ margo_instance_id margo_init_opt(const char *addr_str, int mode, const struct hg
if (mid == MARGO_INSTANCE_NULL) goto err;
mid->margo_init = 1;
mid->abt_init = abt_init;
mid->owns_progress_pool = use_progress_thread;
mid->progress_xstream = progress_xstream;
mid->num_handler_pool_threads = rpc_thread_count < 0 ? 0 : rpc_thread_count;
......@@ -347,8 +357,11 @@ err:
HG_Context_destroy(hg_context);
if(hg_class)
HG_Finalize(hg_class);
if(abt_init)
if(g_num_margo_instances_mtx != ABT_MUTEX_NULL && g_num_margo_instances == 0) {
ABT_mutex_free(&g_num_margo_instances_mtx);
g_num_margo_instances_mtx = ABT_MUTEX_NULL;
ABT_finalize();
}
return MARGO_INSTANCE_NULL;
}
......@@ -394,6 +407,13 @@ margo_instance_id margo_init_pool(ABT_pool progress_pool, ABT_pool handler_pool,
mid->shutdown_rpc_id = MARGO_REGISTER(mid, "__shutdown__",
void, margo_shutdown_out_t, remote_shutdown_ult);
/* increment the number of margo instances */
if(g_num_margo_instances_mtx == ABT_MUTEX_NULL)
ABT_mutex_create(&g_num_margo_instances_mtx);
ABT_mutex_lock(g_num_margo_instances_mtx);
g_num_margo_instances += 1;
ABT_mutex_unlock(g_num_margo_instances_mtx);
return mid;
err:
......@@ -453,8 +473,19 @@ static void margo_cleanup(margo_instance_id mid)
HG_Context_destroy(mid->hg_context);
if (mid->hg_class)
HG_Finalize(mid->hg_class);
if (mid->abt_init)
ABT_finalize();
if(g_num_margo_instances_mtx != ABT_MUTEX_NULL) {
ABT_mutex_lock(g_num_margo_instances_mtx);
g_num_margo_instances -= 1;
if(g_num_margo_instances > 0) {
ABT_mutex_unlock(g_num_margo_instances_mtx);
} else {
ABT_mutex_unlock(g_num_margo_instances_mtx);
ABT_mutex_free(&g_num_margo_instances_mtx);
g_num_margo_instances_mtx = ABT_MUTEX_NULL;
ABT_finalize();
}
}
}
free(mid);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment