Commit f83d3985 authored by Shane Snyder's avatar Shane Snyder

margo_init now inits abt and hg

parent f7c3d925
......@@ -34,6 +34,8 @@ typedef struct margo_data* margo_data_ptr;
/**
* Initializes margo library.
* @param [in] addr_str Mercury host address with port number
* @param [in] listen_flag Boolean flag to listen for incoming connections
* @param [in] use_progress_thread Boolean flag to use a dedicated thread for
* running Mercury's progress loop. If false,
* it will run in the caller's thread context.
......@@ -45,7 +47,6 @@ typedef struct margo_data* margo_data_ptr;
* of 0. A value of -1 directs Margo to use
* the same execution context as that used
* for Mercury progress.
* @param [in] hg_context
* @returns margo instance id on success, MARGO_INSTANCE_NULL upon error
*
* NOTE: Servers (processes expecting to service incoming RPC requests) must
......@@ -53,8 +54,11 @@ typedef struct margo_data* margo_data_ptr;
* call margo_wait_for_finalize() after margo_init() to relinguish control to
* Margo.
*/
margo_instance_id margo_init(int use_progress_thread, int rpc_thread_count,
hg_context_t *hg_context);
margo_instance_id margo_init(
const char *addr_str,
int listen_flag,
int use_progress_thread,
int rpc_thread_count);
/**
* Initializes margo library from given argobots and Mercury instances.
......@@ -63,7 +67,9 @@ margo_instance_id margo_init(int use_progress_thread, int rpc_thread_count,
* @param [in] hg_context Mercury context
* @returns margo instance id on success, MARGO_INSTANCE_NULL upon error
*/
margo_instance_id margo_init_pool(ABT_pool progress_pool, ABT_pool handler_pool,
margo_instance_id margo_init_pool(
ABT_pool progress_pool,
ABT_pool handler_pool,
hg_context_t *hg_context);
/**
......
......@@ -42,6 +42,7 @@ struct margo_instance
ABT_pool progress_pool;
/* internal to margo for this particular instance */
int margo_init;
ABT_thread hg_progress_tid;
int hg_progress_shutdown_flag;
ABT_xstream progress_xstream;
......@@ -78,24 +79,28 @@ static void hg_progress_fn(void* foo);
static int margo_xstream_is_in_progress_pool(margo_instance_id mid);
static void margo_rpc_data_free(void* ptr);
struct handler_entry
/* XXX: maybe instead of listen_flag, we can specify either CLIENT or SERVER mode? */
margo_instance_id margo_init(const char *addr_str, int listen_flag,
int use_progress_thread, int rpc_thread_count)
{
void* fn;
hg_handle_t handle;
struct handler_entry *next;
};
margo_instance_id margo_init(int use_progress_thread, int rpc_thread_count,
hg_context_t *hg_context)
{
struct margo_instance *mid = MARGO_INSTANCE_NULL;
ABT_xstream progress_xstream = ABT_XSTREAM_NULL;
ABT_pool progress_pool = ABT_POOL_NULL;
ABT_xstream *rpc_xstreams = NULL;
ABT_xstream rpc_xstream = ABT_XSTREAM_NULL;
ABT_pool rpc_pool = ABT_POOL_NULL;
int ret;
hg_class_t *hg_class = NULL;
hg_context_t *hg_context = NULL;
int i;
int ret;
struct margo_instance *mid = MARGO_INSTANCE_NULL;
ret = ABT_init(0, NULL); /* XXX: argc/argv not currently used by ABT ... */
if(ret != 0) goto err;
/* set primary ES to idle without polling */
/* XXX: is this right? always set snoozer scheduler on the calling xstream? */
ret = ABT_snoozer_xstream_self_set();
if(ret != 0) goto err;
if (use_progress_thread)
{
......@@ -110,29 +115,39 @@ margo_instance_id margo_init(int use_progress_thread, int rpc_thread_count,
if (ret != ABT_SUCCESS) goto err;
}
if (rpc_thread_count > 0)
if (listen_flag)
{
rpc_xstreams = malloc(rpc_thread_count * sizeof(*rpc_xstreams));
if (rpc_xstreams == NULL) goto err;
ret = ABT_snoozer_xstream_create(rpc_thread_count, &rpc_pool,
rpc_xstreams);
if (ret != ABT_SUCCESS) goto err;
}
else if (rpc_thread_count == 0)
{
ret = ABT_xstream_self(&rpc_xstream);
if (ret != ABT_SUCCESS) goto err;
ret = ABT_xstream_get_main_pools(rpc_xstream, 1, &rpc_pool);
if (ret != ABT_SUCCESS) goto err;
}
else
{
rpc_pool = progress_pool;
if (rpc_thread_count > 0)
{
rpc_xstreams = malloc(rpc_thread_count * sizeof(*rpc_xstreams));
if (rpc_xstreams == NULL) goto err;
ret = ABT_snoozer_xstream_create(rpc_thread_count, &rpc_pool,
rpc_xstreams);
if (ret != ABT_SUCCESS) goto err;
}
else if (rpc_thread_count == 0)
{
ret = ABT_xstream_self(&rpc_xstream);
if (ret != ABT_SUCCESS) goto err;
ret = ABT_xstream_get_main_pools(rpc_xstream, 1, &rpc_pool);
if (ret != ABT_SUCCESS) goto err;
}
else
{
rpc_pool = progress_pool;
}
}
hg_class = HG_Init(addr_str, listen_flag);
if(!hg_class) goto err;
hg_context = HG_Context_create(hg_class);
if(!hg_context) goto err;
mid = margo_init_pool(progress_pool, rpc_pool, hg_context);
if (mid == MARGO_INSTANCE_NULL) goto err;
mid->margo_init = 1;
mid->owns_progress_pool = use_progress_thread;
mid->progress_xstream = progress_xstream;
mid->num_handler_pool_threads = rpc_thread_count < 0 ? 0 : rpc_thread_count;
......@@ -140,6 +155,13 @@ margo_instance_id margo_init(int use_progress_thread, int rpc_thread_count,
return mid;
err:
if(mid)
{
margo_timer_instance_finalize(mid);
ABT_mutex_free(&mid->finalize_mutex);
ABT_cond_free(&mid->finalize_cond);
free(mid);
}
if (use_progress_thread && progress_xstream != ABT_XSTREAM_NULL)
{
ABT_xstream_join(progress_xstream);
......@@ -154,6 +176,11 @@ err:
}
free(rpc_xstreams);
}
if(hg_context)
HG_Context_destroy(hg_context);
if(hg_class)
HG_Finalize(hg_class);
ABT_finalize();
return MARGO_INSTANCE_NULL;
}
......@@ -164,8 +191,7 @@ margo_instance_id margo_init_pool(ABT_pool progress_pool, ABT_pool handler_pool,
struct margo_instance *mid;
mid = malloc(sizeof(*mid));
if(!mid)
return(MARGO_INSTANCE_NULL);
if(!mid) goto err;
memset(mid, 0, sizeof(*mid));
ABT_mutex_create(&mid->finalize_mutex);
......@@ -178,23 +204,21 @@ margo_instance_id margo_init_pool(ABT_pool progress_pool, ABT_pool handler_pool,
mid->refcount = 1;
ret = margo_timer_instance_init(mid);
if(ret != 0)
{
fprintf(stderr, "Error: margo_timer_instance_init()\n");
free(mid);
return(MARGO_INSTANCE_NULL);
}
if(ret != 0) goto err;
ret = ABT_thread_create(mid->progress_pool, hg_progress_fn, mid,
ABT_THREAD_ATTR_NULL, &mid->hg_progress_tid);
if(ret != 0)
if(ret != 0) goto err;
err:
if(mid)
{
fprintf(stderr, "Error: ABT_thread_create()\n");
margo_timer_instance_finalize(mid);
ABT_mutex_free(&mid->finalize_mutex);
ABT_cond_free(&mid->finalize_cond);
free(mid);
return(MARGO_INSTANCE_NULL);
}
return mid;
return MARGO_INSTANCE_NULL;
}
static void margo_cleanup(margo_instance_id mid)
......@@ -222,12 +246,20 @@ static void margo_cleanup(margo_instance_id mid)
free(mid->rpc_xstreams);
}
if (mid->margo_init)
{
if (mid->hg_context)
HG_Context_destroy(mid->hg_context);
if (mid->hg_class)
HG_Finalize(mid->hg_class);
ABT_finalize();
}
free(mid);
}
void margo_finalize(margo_instance_id mid)
{
int i;
int do_cleanup;
/* tell progress thread to wrap things up */
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment