Commit f6d9632d authored by Philip Carns's avatar Philip Carns
Browse files

convert from pthread to ES for HG engine

- fixes #2
parent d4bb2507
...@@ -6,10 +6,10 @@ ...@@ -6,10 +6,10 @@
*/ */
#include <assert.h> #include <assert.h>
#include <pthread.h>
#include <unistd.h> #include <unistd.h>
#include <errno.h> #include <errno.h>
#include <abt.h> #include <abt.h>
#include <abt-snoozer.h>
#include "margo.h" #include "margo.h"
...@@ -18,11 +18,13 @@ static na_context_t *na_context = NULL; ...@@ -18,11 +18,13 @@ static na_context_t *na_context = NULL;
static hg_context_t *hg_context = NULL; static hg_context_t *hg_context = NULL;
static hg_class_t *hg_class = NULL; static hg_class_t *hg_class = NULL;
static pthread_t hg_progress_tid; static ABT_thread hg_progress_tid;
static int hg_progress_shutdown_flag = 0; static int hg_progress_shutdown_flag = 0;
static void* hg_progress_fn(void* foo); static void hg_progress_fn(void* foo);
static ABT_pool main_pool; static ABT_pool main_pool;
static ABT_pool engine_pool;
static ABT_xstream engine_xstream;
struct handler_entry struct handler_entry
{ {
...@@ -86,14 +88,20 @@ int margo_init(na_bool_t listen, const char* local_addr) ...@@ -86,14 +88,20 @@ int margo_init(na_bool_t listen, const char* local_addr)
return(-1); return(-1);
} }
/* start up thread to drive progress */ /* create an ES and ULT to drive Mercury progress */
ret = pthread_create(&hg_progress_tid, NULL, hg_progress_fn, NULL); ret = ABT_snoozer_xstream_create(&engine_pool, &engine_xstream);
if(ret != 0) if(ret != 0)
{ {
HG_Context_destroy(hg_context); /* TODO: err handling */
HG_Finalize(hg_class); fprintf(stderr, "Error: ABT_snoozer_xstream_create()\n");
NA_Context_destroy(network_class, na_context); return(-1);
NA_Finalize(network_class); }
ret = ABT_thread_create(engine_pool, hg_progress_fn, NULL,
ABT_THREAD_ATTR_NULL, &hg_progress_tid);
if(ret != 0)
{
/* TODO: err handling */
fprintf(stderr, "Error: ABT_thread_create()\n");
return(-1); return(-1);
} }
...@@ -102,14 +110,14 @@ int margo_init(na_bool_t listen, const char* local_addr) ...@@ -102,14 +110,14 @@ int margo_init(na_bool_t listen, const char* local_addr)
void margo_finalize(void) void margo_finalize(void)
{ {
int ret;
/* tell progress thread to wrap things up */ /* tell progress thread to wrap things up */
hg_progress_shutdown_flag = 1; hg_progress_shutdown_flag = 1;
/* wait for it to shutdown cleanly */ /* wait for it to shutdown cleanly */
ret = pthread_join(hg_progress_tid, NULL); ABT_thread_join(hg_progress_tid);
assert(ret == 0); ABT_thread_free(&hg_progress_tid);
ABT_xstream_join(engine_xstream);
ABT_xstream_free(&engine_xstream);
HG_Context_destroy(hg_context); HG_Context_destroy(hg_context);
HG_Finalize(hg_class); HG_Finalize(hg_class);
...@@ -120,7 +128,7 @@ void margo_finalize(void) ...@@ -120,7 +128,7 @@ void margo_finalize(void)
} }
/* dedicated thread function to drive Mercury progress */ /* dedicated thread function to drive Mercury progress */
static void* hg_progress_fn(void* foo) static void hg_progress_fn(void* foo)
{ {
int ret; int ret;
unsigned int actual_count; unsigned int actual_count;
...@@ -135,7 +143,7 @@ static void* hg_progress_fn(void* foo) ...@@ -135,7 +143,7 @@ static void* hg_progress_fn(void* foo)
HG_Progress(hg_class, hg_context, 100); HG_Progress(hg_class, hg_context, 100);
} }
return(NULL); return;
} }
hg_class_t* margo_get_class(void) hg_class_t* margo_get_class(void)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment