Commit b0f63518 authored by Jonathan Jenkins's avatar Jonathan Jenkins

add non-abt initializer

parent bfc83943
......@@ -122,7 +122,7 @@ int main(int argc, char **argv)
* because this is a pure client that will not be servicing rpc requests.
*/
/***************************************/
mid = margo_init(progress_pool, ABT_POOL_NULL, hg_context);
mid = margo_init_pool(progress_pool, ABT_POOL_NULL, hg_context);
/* register RPC */
my_rpc_id = MERCURY_REGISTER(hg_class, "my_rpc", my_rpc_in_t, my_rpc_out_t,
......
......@@ -122,7 +122,7 @@ int main(int argc, char **argv)
* because this is a pure client that will not be servicing rpc requests.
*/
/***************************************/
mid = margo_init(progress_pool, ABT_POOL_NULL, hg_context);
mid = margo_init_pool(progress_pool, ABT_POOL_NULL, hg_context);
/* register RPC */
my_rpc_id = MERCURY_REGISTER(hg_class, "my_rpc", my_rpc_in_t, my_rpc_out_t,
......
......@@ -89,7 +89,7 @@ int main(int argc, char **argv)
* communication.
*/
/***************************************/
mid = margo_init(handler_pool, handler_pool, hg_context);
mid = margo_init_pool(handler_pool, handler_pool, hg_context);
assert(mid);
/* register RPC */
......
......@@ -113,9 +113,9 @@ int main(int argc, char **argv)
*/
/***************************************/
if(single_pool_mode)
mid = margo_init(handler_pool, handler_pool, hg_context);
mid = margo_init_pool(handler_pool, handler_pool, hg_context);
else
mid = margo_init(progress_pool, handler_pool, hg_context);
mid = margo_init_pool(progress_pool, handler_pool, hg_context);
assert(mid);
/* register RPC */
......
......@@ -103,7 +103,7 @@ int main(int argc, char **argv)
/* use a single pool for progress and sleeper threads */
/* NOTE: we don't use RPC handlers, so no need for an RPC pool */
/***************************************/
mid = margo_init(pool, ABT_POOL_NULL, hg_context);
mid = margo_init_pool(pool, ABT_POOL_NULL, hg_context);
for(i=0; i<4; i++)
{
t_ids[i] = i;
......
......@@ -22,6 +22,31 @@ typedef struct margo_instance* margo_instance_id;
#define MARGO_INSTANCE_NULL ((margo_instance_id)NULL)
/**
* Initializes margo library.
* @param [in] use_progress_thread Boolean flag to use a dedicated thread for
* running Mercury's progress loop. If false,
* will run in the caller's thread context -
* the caller is then expected to call
* margo_wait_for_finalize in order to
* relinquish control to the progress loop
* @param [in] rpc_thread_count Number of threads to use for running RPC
* calls. A value of 0 directs Margo to execute
* RPCs in the caller's thread context - the
* caller is then expected to call
* margo_wait_for_finalize in order to
* relinquish control to the RPC runner.
* Non-RPC users should use a value of 0. A
* value of -1 directs Margo to use the same
* execution context as that used for Mercury
* progress.
* @param [in] hg_context
* @returns margo instance id on success, MARGO_INSTANCE_NULL upon error
*/
margo_instance_id margo_init(int use_progress_thread, int rpc_thread_count,
hg_context_t *hg_context);
/**
* Initializes margo library from given argobots and Mercury instances.
* @param [in] progress_pool Argobots pool to drive communication
......@@ -29,7 +54,7 @@ typedef struct margo_instance* margo_instance_id;
* @param [in] hg_context Mercury context
* @returns margo instance id on success, MARGO_INSTANCE_NULL upon error
*/
margo_instance_id margo_init(ABT_pool progress_pool, ABT_pool handler_pool,
margo_instance_id margo_init_pool(ABT_pool progress_pool, ABT_pool handler_pool,
hg_context_t *hg_context);
/**
......
......@@ -30,6 +30,9 @@ struct margo_instance
/* internal to margo for this particular instance */
ABT_thread hg_progress_tid;
int hg_progress_shutdown_flag;
int owns_progress_pool;
ABT_xstream *rpc_xstreams;
int num_handler_pool_threads;
/* control logic for callers waiting on margo to be finalized */
int finalize_flag;
......@@ -60,7 +63,78 @@ struct handler_entry
struct handler_entry *next;
};
margo_instance_id margo_init(ABT_pool progress_pool, ABT_pool handler_pool,
margo_instance_id margo_init(int use_progress_thread, int rpc_thread_count,
hg_context_t *hg_context)
{
struct margo_instance *mid = MARGO_INSTANCE_NULL;
ABT_xstream progress_xstream = ABT_XSTREAM_NULL;
ABT_pool progress_pool = ABT_POOL_NULL;
ABT_xstream *rpc_xstreams = NULL;
ABT_xstream rpc_xstream = ABT_XSTREAM_NULL;
ABT_pool rpc_pool = ABT_POOL_NULL;
int ret;
int i;
if (use_progress_thread)
{
ret = ABT_snoozer_xstream_create(1, &progress_pool, &progress_xstream);
if (ret != ABT_SUCCESS) goto err;
}
else
{
ret = ABT_xstream_self(&progress_xstream);
if (ret != ABT_SUCCESS) goto err;
ret = ABT_xstream_get_main_pools(progress_xstream, 1, &progress_pool);
if (ret != ABT_SUCCESS) goto err;
}
if (rpc_thread_count > 0)
{
rpc_xstreams = malloc(rpc_thread_count * sizeof(*rpc_xstreams));
if (rpc_xstreams == NULL) goto err;
ret = ABT_snoozer_xstream_create(rpc_thread_count, &rpc_pool,
rpc_xstreams);
if (ret != ABT_SUCCESS) goto err;
}
else if (rpc_thread_count == 0)
{
ret = ABT_xstream_self(&rpc_xstream);
if (ret != ABT_SUCCESS) goto err;
ret = ABT_xstream_get_main_pools(rpc_xstream, 1, &rpc_pool);
if (ret != ABT_SUCCESS) goto err;
}
else
{
rpc_pool = progress_pool;
}
mid = margo_init_pool(progress_pool, rpc_pool, hg_context);
if (mid == MARGO_INSTANCE_NULL) goto err;
mid->owns_progress_pool = !use_progress_thread;
mid->num_handler_pool_threads = rpc_thread_count;
mid->rpc_xstreams = rpc_xstreams;
return mid;
err:
if (use_progress_thread && progress_xstream != ABT_XSTREAM_NULL)
{
ABT_xstream_join(progress_xstream);
ABT_xstream_free(&progress_xstream);
}
if (rpc_thread_count > 0 && rpc_xstreams != NULL)
{
for (i = 0; i < rpc_thread_count; i++)
{
ABT_xstream_join(rpc_xstreams[i]);
ABT_xstream_free(&rpc_xstreams[i]);
}
free(rpc_xstreams);
}
return MARGO_INSTANCE_NULL;
}
margo_instance_id margo_init_pool(ABT_pool progress_pool, ABT_pool handler_pool,
hg_context_t *hg_context)
{
int ret;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment