Commit 9485e38e authored by Matthieu Dorier's avatar Matthieu Dorier

integrating latest changes from master

parent e791b15a
...@@ -71,6 +71,7 @@ struct margo_instance ...@@ -71,6 +71,7 @@ struct margo_instance
/* internal to margo for this particular instance */ /* internal to margo for this particular instance */
int margo_init; int margo_init;
int abt_init;
ABT_thread hg_progress_tid; ABT_thread hg_progress_tid;
int hg_progress_shutdown_flag; int hg_progress_shutdown_flag;
ABT_xstream progress_xstream; ABT_xstream progress_xstream;
...@@ -134,14 +135,19 @@ margo_instance_id margo_init(const char *addr_str, int mode, ...@@ -134,14 +135,19 @@ margo_instance_id margo_init(const char *addr_str, int mode,
hg_class_t *hg_class = NULL; hg_class_t *hg_class = NULL;
hg_context_t *hg_context = NULL; hg_context_t *hg_context = NULL;
int listen_flag = (mode == MARGO_CLIENT_MODE) ? HG_FALSE : HG_TRUE; int listen_flag = (mode == MARGO_CLIENT_MODE) ? HG_FALSE : HG_TRUE;
int abt_init = 0;
int i; int i;
int ret; int ret;
struct margo_instance *mid = MARGO_INSTANCE_NULL; struct margo_instance *mid = MARGO_INSTANCE_NULL;
if(mode != MARGO_CLIENT_MODE && mode != MARGO_SERVER_MODE) goto err; if(mode != MARGO_CLIENT_MODE && mode != MARGO_SERVER_MODE) goto err;
ret = ABT_init(0, NULL); /* XXX: argc/argv not currently used by ABT ... */ if (ABT_initialized() == ABT_ERR_UNINITIALIZED)
if(ret != 0) goto err; {
ret = ABT_init(0, NULL); /* XXX: argc/argv not currently used by ABT ... */
if(ret != 0) goto err;
abt_init = 1;
}
/* set caller (self) ES to idle without polling */ /* set caller (self) ES to idle without polling */
#ifdef HAVE_ABT_SNOOZER #ifdef HAVE_ABT_SNOOZER
...@@ -209,6 +215,7 @@ margo_instance_id margo_init(const char *addr_str, int mode, ...@@ -209,6 +215,7 @@ margo_instance_id margo_init(const char *addr_str, int mode,
if (mid == MARGO_INSTANCE_NULL) goto err; if (mid == MARGO_INSTANCE_NULL) goto err;
mid->margo_init = 1; mid->margo_init = 1;
mid->abt_init = abt_init;
mid->owns_progress_pool = use_progress_thread; mid->owns_progress_pool = use_progress_thread;
mid->progress_xstream = progress_xstream; mid->progress_xstream = progress_xstream;
mid->num_handler_pool_threads = rpc_thread_count < 0 ? 0 : rpc_thread_count; mid->num_handler_pool_threads = rpc_thread_count < 0 ? 0 : rpc_thread_count;
...@@ -242,7 +249,8 @@ err: ...@@ -242,7 +249,8 @@ err:
HG_Context_destroy(hg_context); HG_Context_destroy(hg_context);
if(hg_class) if(hg_class)
HG_Finalize(hg_class); HG_Finalize(hg_class);
ABT_finalize(); if(abt_init)
ABT_finalize();
return MARGO_INSTANCE_NULL; return MARGO_INSTANCE_NULL;
} }
...@@ -325,7 +333,8 @@ static void margo_cleanup(margo_instance_id mid) ...@@ -325,7 +333,8 @@ static void margo_cleanup(margo_instance_id mid)
HG_Context_destroy(mid->hg_context); HG_Context_destroy(mid->hg_context);
if (mid->hg_class) if (mid->hg_class)
HG_Finalize(mid->hg_class); HG_Finalize(mid->hg_class);
ABT_finalize(); if (mid->abt_init)
ABT_finalize();
} }
free(mid); free(mid);
...@@ -780,22 +789,6 @@ hg_return_t margo_bulk_deserialize( ...@@ -780,22 +789,6 @@ hg_return_t margo_bulk_deserialize(
return(HG_Bulk_deserialize(mid->hg_class, handle, buf, buf_size)); return(HG_Bulk_deserialize(mid->hg_class, handle, buf, buf_size));
} }
/* TODO: currently identical to a vanilla margo_cb -- consider reusing that */
/* Done, we are using margo_cb now */
#if 0
static hg_return_t margo_bulk_transfer_cb(const struct hg_cb_info *info)
{
hg_return_t hret = info->ret;
struct margo_cb_arg* arg = info->arg;
/* propagate return code out through eventual */
ABT_eventual_set(arg->eventual, &hret, sizeof(hret));
return(HG_SUCCESS);
}
#endif
hg_return_t margo_bulk_transfer( hg_return_t margo_bulk_transfer(
margo_instance_id mid, margo_instance_id mid,
hg_bulk_op_t op, hg_bulk_op_t op,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment