Commit eea77117 authored by Shane Snyder's avatar Shane Snyder
Browse files

update finalize_waiters when threads sleep/wake

parent 7432a2f0
...@@ -198,6 +198,8 @@ static void hg_progress_fn(void* foo) ...@@ -198,6 +198,8 @@ static void hg_progress_fn(void* foo)
if(!mid->hg_progress_shutdown_flag) if(!mid->hg_progress_shutdown_flag)
{ {
ABT_mutex_lock(mid->finalize_mutex);
ABT_pool_get_total_size(mid->progress_pool, &size); ABT_pool_get_total_size(mid->progress_pool, &size);
/* Are there any other threads executing in this pool that are *not* /* Are there any other threads executing in this pool that are *not*
* blocked on margo_wait_for_finalize()? If so then, we can't * blocked on margo_wait_for_finalize()? If so then, we can't
...@@ -206,12 +208,13 @@ static void hg_progress_fn(void* foo) ...@@ -206,12 +208,13 @@ static void hg_progress_fn(void* foo)
*/ */
if(size > mid->finalize_waiters_in_progress_pool) if(size > mid->finalize_waiters_in_progress_pool)
{ {
ABT_mutex_unlock(mid->finalize_mutex);
HG_Progress(mid->hg_context, 0); HG_Progress(mid->hg_context, 0);
ABT_thread_yield(); ABT_thread_yield();
} }
else else
{ {
printf("sleep\n"); ABT_mutex_unlock(mid->finalize_mutex);
HG_Progress(mid->hg_context, 100); HG_Progress(mid->hg_context, 100);
} }
} }
...@@ -465,6 +468,7 @@ hg_return_t margo_bulk_transfer( ...@@ -465,6 +468,7 @@ hg_return_t margo_bulk_transfer(
typedef struct typedef struct
{ {
margo_instance_id mid;
ABT_mutex mutex; ABT_mutex mutex;
ABT_cond cond; ABT_cond cond;
int is_asleep; int is_asleep;
...@@ -475,6 +479,11 @@ static void margo_thread_sleep_cb(void *arg) ...@@ -475,6 +479,11 @@ static void margo_thread_sleep_cb(void *arg)
margo_thread_sleep_cb_dat *sleep_cb_dat = margo_thread_sleep_cb_dat *sleep_cb_dat =
(margo_thread_sleep_cb_dat *)arg; (margo_thread_sleep_cb_dat *)arg;
/* decrement number of waiting threads */
ABT_mutex_lock(sleep_cb_dat->mid->finalize_mutex);
sleep_cb_dat->mid->finalize_waiters_in_progress_pool--;
ABT_mutex_unlock(sleep_cb_dat->mid->finalize_mutex);
/* wake up the sleeping thread */ /* wake up the sleeping thread */
ABT_mutex_lock(sleep_cb_dat->mutex); ABT_mutex_lock(sleep_cb_dat->mutex);
sleep_cb_dat->is_asleep = 0; sleep_cb_dat->is_asleep = 0;
...@@ -492,6 +501,7 @@ void margo_thread_sleep( ...@@ -492,6 +501,7 @@ void margo_thread_sleep(
margo_thread_sleep_cb_dat sleep_cb_dat; margo_thread_sleep_cb_dat sleep_cb_dat;
/* set data needed for sleep callback */ /* set data needed for sleep callback */
sleep_cb_dat.mid = mid;
ABT_mutex_create(&(sleep_cb_dat.mutex)); ABT_mutex_create(&(sleep_cb_dat.mutex));
ABT_cond_create(&(sleep_cb_dat.cond)); ABT_cond_create(&(sleep_cb_dat.cond));
sleep_cb_dat.is_asleep = 1; sleep_cb_dat.is_asleep = 1;
...@@ -500,6 +510,11 @@ void margo_thread_sleep( ...@@ -500,6 +510,11 @@ void margo_thread_sleep(
margo_timer_init(mid, &sleep_timer, margo_thread_sleep_cb, margo_timer_init(mid, &sleep_timer, margo_thread_sleep_cb,
&sleep_cb_dat, timeout_ms); &sleep_cb_dat, timeout_ms);
/* increment number of waiting threads */
ABT_mutex_lock(mid->finalize_mutex);
mid->finalize_waiters_in_progress_pool++;
ABT_mutex_unlock(mid->finalize_mutex);
/* yield thread for specified timeout */ /* yield thread for specified timeout */
ABT_mutex_lock(sleep_cb_dat.mutex); ABT_mutex_lock(sleep_cb_dat.mutex);
while(sleep_cb_dat.is_asleep) while(sleep_cb_dat.is_asleep)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment