Commit 1d43e0dd authored by Philip Carns's avatar Philip Carns

move abt pool creation outside of margo

parent 521d75fa
......@@ -39,6 +39,8 @@ int main(int argc, char **argv)
ABT_xstream xstream;
ABT_pool pool;
margo_instance_id mid;
ABT_xstream progress_xstream;
ABT_pool progress_pool;
ret = ABT_init(argc, argv);
if(ret != 0)
......@@ -69,10 +71,20 @@ int main(int argc, char **argv)
return(-1);
}
/* create a dedicated ES drive Mercury progress */
ret = ABT_snoozer_xstream_create(1, &progress_pool, &progress_xstream);
if(ret != 0)
{
fprintf(stderr, "Error: ABT_snoozer_xstream_create()\n");
return(-1);
}
/* initialize
* note: address here is really just being used to identify transport
* note: the handler_pool is NULL because this is a client and is not
* expected to run rpc handlers.
*/
mid = margo_init(NA_FALSE, "tcp://localhost:1234");
mid = margo_init(NA_FALSE, "tcp://localhost:1234", progress_pool, ABT_POOL_NULL);
/* register RPC */
my_rpc_id = my_rpc_register(mid);
......@@ -114,6 +126,10 @@ int main(int argc, char **argv)
}
margo_finalize(mid);
ABT_xstream_join(progress_xstream);
ABT_xstream_free(&progress_xstream);
ABT_finalize();
return(0);
......
......@@ -23,6 +23,10 @@ int main(int argc, char **argv)
ABT_eventual eventual;
int *shutdown;
margo_instance_id mid;
ABT_xstream handler_xstream;
ABT_pool handler_pool;
ABT_xstream progress_xstream;
ABT_pool progress_pool;
ret = ABT_init(argc, argv);
if(ret != 0)
......@@ -39,7 +43,29 @@ int main(int argc, char **argv)
return(-1);
}
mid = margo_init(NA_TRUE, "tcp://localhost:1234");
/* Find primary pool to use for running rpc handlers */
ret = ABT_xstream_self(&handler_xstream);
if(ret != 0)
{
fprintf(stderr, "Error: ABT_xstream_self()\n");
return(-1);
}
ret = ABT_xstream_get_main_pools(handler_xstream, 1, &handler_pool);
if(ret != 0)
{
fprintf(stderr, "Error: ABT_xstream_get_main_pools()\n");
return(-1);
}
/* create a dedicated ES drive Mercury progress */
ret = ABT_snoozer_xstream_create(1, &progress_pool, &progress_xstream);
if(ret != 0)
{
fprintf(stderr, "Error: ABT_snoozer_xstream_create()\n");
return(-1);
}
mid = margo_init(NA_TRUE, "tcp://localhost:1234", progress_pool, handler_pool);
/* register RPC */
my_rpc_register(mid);
......@@ -55,6 +81,10 @@ int main(int argc, char **argv)
ABT_eventual_wait(eventual, (void**)&shutdown);
margo_finalize(mid);
ABT_xstream_join(progress_xstream);
ABT_xstream_free(&progress_xstream);
ABT_finalize();
return(0);
......
......@@ -35,7 +35,7 @@ typedef struct margo_instance* margo_instance_id;
* @param [in] local_addr address to listen on if listen is set
* @returns margo instance id on success, NULL upon error
*/
margo_instance_id margo_init(na_bool_t listen, const char* local_addr);
margo_instance_id margo_init(na_bool_t listen, const char* local_addr, ABT_pool progress_pool, ABT_pool handler_pool);
/**
* Shuts down margo library and its underlying evfibers and mercury resources
......@@ -52,7 +52,7 @@ hg_class_t* margo_get_class(margo_instance_id mid);
* Retrieve the ABT pool associated with the main caller (whoever invoked the
* init function); this is where margo will execute RPC handlers.
*/
ABT_pool* margo_get_main_pool(margo_instance_id mid);
ABT_pool* margo_get_handler_pool(margo_instance_id mid);
/**
* Lookup the Mercury/NA address associated with the given string
......@@ -126,7 +126,7 @@ static hg_return_t __name##_handler(hg_handle_t handle) { \
*__handle = handle; \
__hgi = HG_Get_info(handle); \
__mid = margo_hg_class_to_instance(__hgi->hg_class); \
__pool = margo_get_main_pool(__mid); \
__pool = margo_get_handler_pool(__mid); \
__ret = ABT_thread_create(*__pool, __name, __handle, ABT_THREAD_ATTR_NULL, NULL); \
if(__ret != 0) { \
return(HG_NOMEM_ERROR); \
......
......@@ -15,15 +15,18 @@
struct margo_instance
{
/* not needed */
na_class_t *network_class;
na_context_t *na_context;
/* provided by caller */
hg_context_t *hg_context;
hg_class_t *hg_class;
ABT_pool handler_pool;
ABT_pool progress_pool;
ABT_thread hg_progress_tid;
int hg_progress_shutdown_flag;
ABT_pool main_pool;
ABT_pool engine_pool;
ABT_xstream engine_xstream;
int table_index;
};
......@@ -47,10 +50,9 @@ struct handler_entry
struct handler_entry *next;
};
margo_instance_id margo_init(na_bool_t listen, const char* local_addr)
margo_instance_id margo_init(na_bool_t listen, const char* local_addr, ABT_pool progress_pool, ABT_pool handler_pool)
{
int ret;
ABT_xstream xstream;
struct margo_instance *mid;
if(handler_mapping_table_size >= MAX_HANDLER_MAPPING)
......@@ -61,6 +63,9 @@ margo_instance_id margo_init(na_bool_t listen, const char* local_addr)
return(NULL);
memset(mid, 0, sizeof(*mid));
mid->progress_pool = progress_pool;
mid->handler_pool = handler_pool;
/* boilerplate HG initialization steps */
mid->network_class = NA_Initialize(local_addr, listen);
if(!mid->network_class)
......@@ -95,34 +100,7 @@ margo_instance_id margo_init(na_bool_t listen, const char* local_addr)
return(NULL);
}
/* get the primary pool for the caller, this is where we will run ULTs to
* handle incoming requests
*/
ret = ABT_xstream_self(&xstream);
if(ret != 0)
{
/* TODO: err handling */
fprintf(stderr, "Error: ABT_xstream_self()\n");
return(NULL);
}
ret = ABT_xstream_get_main_pools(xstream, 1, &mid->main_pool);
if(ret != 0)
{
/* TODO: err handling */
fprintf(stderr, "Error: ABT_xstream_get_main_pools()\n");
return(NULL);
}
/* create an ES and ULT to drive Mercury progress */
ret = ABT_snoozer_xstream_create(1, &mid->engine_pool, &mid->engine_xstream);
if(ret != 0)
{
/* TODO: err handling */
fprintf(stderr, "Error: ABT_snoozer_xstream_create()\n");
return(NULL);
}
ret = ABT_thread_create(mid->engine_pool, hg_progress_fn, mid,
ret = ABT_thread_create(mid->progress_pool, hg_progress_fn, mid,
ABT_THREAD_ATTR_NULL, &mid->hg_progress_tid);
if(ret != 0)
{
......@@ -149,8 +127,6 @@ void margo_finalize(margo_instance_id mid)
/* wait for it to shutdown cleanly */
ABT_thread_join(mid->hg_progress_tid);
ABT_thread_free(&mid->hg_progress_tid);
ABT_xstream_join(mid->engine_xstream);
ABT_xstream_free(&mid->engine_xstream);
HG_Context_destroy(mid->hg_context);
HG_Finalize(mid->hg_class);
......@@ -186,14 +162,20 @@ static void hg_progress_fn(void* foo)
return;
}
/****************************/
hg_class_t* margo_get_class(margo_instance_id mid)
{
return(mid->hg_class);
}
ABT_pool* margo_get_main_pool(margo_instance_id mid)
hg_return_t margo_create_handle(margo_instance_id mid, na_addr_t addr,
hg_id_t id, hg_handle_t *handle)
{
return(&mid->main_pool);
hg_return_t ret;
ret = HG_Create(mid->hg_class, mid->hg_context, addr, id, handle);
return ret;
}
na_return_t margo_addr_lookup(margo_instance_id mid, const char* name, na_addr_t* addr)
......@@ -204,15 +186,11 @@ na_return_t margo_addr_lookup(margo_instance_id mid, const char* name, na_addr_t
return ret;
}
/****************************/
hg_return_t margo_create_handle(margo_instance_id mid, na_addr_t addr,
hg_id_t id, hg_handle_t *handle)
ABT_pool* margo_get_handler_pool(margo_instance_id mid)
{
hg_return_t ret;
ret = HG_Create(mid->hg_class, mid->hg_context, addr, id, handle);
return ret;
return(&mid->handler_pool);
}
static hg_return_t margo_forward_cb(const struct hg_cb_info *info)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment