Commit 4a4109b1 authored by Matthieu Dorier's avatar Matthieu Dorier

removed waiters_in_progress_pool issue

parent d914cdeb
...@@ -51,7 +51,6 @@ struct margo_instance ...@@ -51,7 +51,6 @@ struct margo_instance
/* control logic for callers waiting on margo to be finalized */ /* control logic for callers waiting on margo to be finalized */
int finalize_flag; int finalize_flag;
int waiters_in_progress_pool;
int refcount; int refcount;
ABT_mutex finalize_mutex; ABT_mutex finalize_mutex;
ABT_cond finalize_cond; ABT_cond finalize_cond;
...@@ -64,7 +63,6 @@ struct margo_cb_arg ...@@ -64,7 +63,6 @@ struct margo_cb_arg
{ {
ABT_eventual *eventual; ABT_eventual *eventual;
margo_instance_id mid; margo_instance_id mid;
char in_pool;
}; };
struct margo_rpc_data struct margo_rpc_data
...@@ -257,18 +255,10 @@ void margo_finalize(margo_instance_id mid) ...@@ -257,18 +255,10 @@ void margo_finalize(margo_instance_id mid)
void margo_wait_for_finalize(margo_instance_id mid) void margo_wait_for_finalize(margo_instance_id mid)
{ {
int in_pool = 0;
int do_cleanup; int do_cleanup;
/* Is this waiter in the same pool as the pool running the progress
* thread?
*/
if(margo_xstream_is_in_progress_pool(mid))
in_pool = 1;
ABT_mutex_lock(mid->finalize_mutex); ABT_mutex_lock(mid->finalize_mutex);
mid->waiters_in_progress_pool += in_pool;
mid->refcount++; mid->refcount++;
while(!mid->finalize_flag) while(!mid->finalize_flag)
...@@ -308,16 +298,15 @@ static void hg_progress_fn(void* foo) ...@@ -308,16 +298,15 @@ static void hg_progress_fn(void* foo)
if(trigger_happened) if(trigger_happened)
ABT_thread_yield(); ABT_thread_yield();
ABT_pool_get_total_size(mid->progress_pool, &size); ABT_pool_get_size(mid->progress_pool, &size);
/* Are there any other threads executing in this pool that are *not* /* Are there any other threads executing in this pool that are *not*
* blocked on margo_wait_for_finalize()? If so then, we can't * blocked ? If so then, we can't sleep here or else those threads
* sleep here or else those threads will not get a chance to * will not get a chance to execute.
* execute. * TODO: check is ABT_pool_get_size returns the number of ULT/tasks
* that can be executed including this one, or not including this one.
*/ */
if(size > mid->waiters_in_progress_pool) if(size > 0)
{ {
//printf("DEBUG: Margo progress function running while other ULTs are eligible for execution (size: %d, waiters: %d.\n", size, mid->waiters_in_progress_pool);
/* TODO: this is being executed more than is necessary (i.e. /* TODO: this is being executed more than is necessary (i.e.
* in cases where there are other legitimate ULTs eligible * in cases where there are other legitimate ULTs eligible
* for execution that are not blocking on any events, Margo * for execution that are not blocking on any events, Margo
...@@ -433,11 +422,6 @@ static hg_return_t margo_cb(const struct hg_cb_info *info) ...@@ -433,11 +422,6 @@ static hg_return_t margo_cb(const struct hg_cb_info *info)
/* propagate return code out through eventual */ /* propagate return code out through eventual */
ABT_eventual_set(*(arg->eventual), &hret, sizeof(hret)); ABT_eventual_set(*(arg->eventual), &hret, sizeof(hret));
#if 0
if(arg->in_pool)
arg->mid->waiters_in_progress_pool--;
#endif
return(HG_SUCCESS); return(HG_SUCCESS);
} }
...@@ -483,15 +467,7 @@ hg_return_t margo_forward_timed( ...@@ -483,15 +467,7 @@ hg_return_t margo_forward_timed(
arg.eventual = &eventual; arg.eventual = &eventual;
arg.mid = mid; arg.mid = mid;
#if 0
if(margo_xstream_is_in_progress_pool(mid))
{
arg.in_pool = 1;
mid->waiters_in_progress_pool++;
}
else
arg.in_pool = 0;
#endif
hret = HG_Forward(handle, margo_cb, &arg, in_struct); hret = HG_Forward(handle, margo_cb, &arg, in_struct);
if(hret == 0) if(hret == 0)
{ {
...@@ -532,15 +508,7 @@ hg_return_t margo_forward( ...@@ -532,15 +508,7 @@ hg_return_t margo_forward(
arg.eventual = &eventual; arg.eventual = &eventual;
arg.mid = mid; arg.mid = mid;
#if 0
if(margo_xstream_is_in_progress_pool(mid))
{
arg.in_pool = 1;
mid->waiters_in_progress_pool++;
}
else
arg.in_pool = 0;
#endif
hret = HG_Forward(handle, margo_cb, &arg, in_struct); hret = HG_Forward(handle, margo_cb, &arg, in_struct);
if(hret == 0) if(hret == 0)
{ {
...@@ -572,15 +540,7 @@ hg_return_t margo_respond( ...@@ -572,15 +540,7 @@ hg_return_t margo_respond(
arg.eventual = &eventual; arg.eventual = &eventual;
arg.mid = mid; arg.mid = mid;
#if 0
if(margo_xstream_is_in_progress_pool(mid))
{
arg.in_pool = 1;
mid->waiters_in_progress_pool++;
}
else
arg.in_pool = 0;
#endif
hret = HG_Respond(handle, margo_cb, &arg, out_struct); hret = HG_Respond(handle, margo_cb, &arg, out_struct);
if(hret == 0) if(hret == 0)
{ {
...@@ -602,9 +562,6 @@ static hg_return_t margo_bulk_transfer_cb(const struct hg_cb_info *info) ...@@ -602,9 +562,6 @@ static hg_return_t margo_bulk_transfer_cb(const struct hg_cb_info *info)
/* propagate return code out through eventual */ /* propagate return code out through eventual */
ABT_eventual_set(*(arg->eventual), &hret, sizeof(hret)); ABT_eventual_set(*(arg->eventual), &hret, sizeof(hret));
if(arg->in_pool)
arg->mid->waiters_in_progress_pool--;
return(HG_SUCCESS); return(HG_SUCCESS);
} }
...@@ -624,11 +581,6 @@ static hg_return_t margo_addr_lookup_cb(const struct hg_cb_info *info) ...@@ -624,11 +581,6 @@ static hg_return_t margo_addr_lookup_cb(const struct hg_cb_info *info)
/* propagate return code out through eventual */ /* propagate return code out through eventual */
ABT_eventual_set(*(arg->eventual), &evt, sizeof(evt)); ABT_eventual_set(*(arg->eventual), &evt, sizeof(evt));
#if 0
if(arg->in_pool)
arg->mid->waiters_in_progress_pool--;
#endif
return(HG_SUCCESS); return(HG_SUCCESS);
} }
...@@ -652,15 +604,7 @@ hg_return_t margo_addr_lookup( ...@@ -652,15 +604,7 @@ hg_return_t margo_addr_lookup(
arg.eventual = &eventual; arg.eventual = &eventual;
arg.mid = mid; arg.mid = mid;
#if 0
if(margo_xstream_is_in_progress_pool(mid))
{
arg.in_pool = 1;
mid->waiters_in_progress_pool++;
}
else
arg.in_pool = 0;
#endif
nret = HG_Addr_lookup(mid->hg_context, margo_addr_lookup_cb, nret = HG_Addr_lookup(mid->hg_context, margo_addr_lookup_cb,
&arg, name, HG_OP_ID_IGNORE); &arg, name, HG_OP_ID_IGNORE);
if(nret == 0) if(nret == 0)
...@@ -699,13 +643,7 @@ hg_return_t margo_bulk_transfer( ...@@ -699,13 +643,7 @@ hg_return_t margo_bulk_transfer(
arg.eventual = &eventual; arg.eventual = &eventual;
arg.mid = mid; arg.mid = mid;
if(margo_xstream_is_in_progress_pool(mid))
{
arg.in_pool = 1;
mid->waiters_in_progress_pool++;
}
else
arg.in_pool = 0;
hret = HG_Bulk_transfer(mid->hg_context, margo_bulk_transfer_cb, hret = HG_Bulk_transfer(mid->hg_context, margo_bulk_transfer_cb,
&arg, op, origin_addr, origin_handle, origin_offset, local_handle, &arg, op, origin_addr, origin_handle, origin_offset, local_handle,
local_offset, size, HG_OP_ID_IGNORE); local_offset, size, HG_OP_ID_IGNORE);
...@@ -726,7 +664,6 @@ typedef struct ...@@ -726,7 +664,6 @@ typedef struct
ABT_mutex mutex; ABT_mutex mutex;
ABT_cond cond; ABT_cond cond;
char is_asleep; char is_asleep;
char in_pool;
} margo_thread_sleep_cb_dat; } margo_thread_sleep_cb_dat;
static void margo_thread_sleep_cb(void *arg) static void margo_thread_sleep_cb(void *arg)
...@@ -734,10 +671,6 @@ static void margo_thread_sleep_cb(void *arg) ...@@ -734,10 +671,6 @@ static void margo_thread_sleep_cb(void *arg)
margo_thread_sleep_cb_dat *sleep_cb_dat = margo_thread_sleep_cb_dat *sleep_cb_dat =
(margo_thread_sleep_cb_dat *)arg; (margo_thread_sleep_cb_dat *)arg;
/* decrement number of waiting threads */
sleep_cb_dat->mid->waiters_in_progress_pool -=
sleep_cb_dat->in_pool;
/* wake up the sleeping thread */ /* wake up the sleeping thread */
ABT_mutex_lock(sleep_cb_dat->mutex); ABT_mutex_lock(sleep_cb_dat->mutex);
sleep_cb_dat->is_asleep = 0; sleep_cb_dat->is_asleep = 0;
...@@ -751,27 +684,19 @@ void margo_thread_sleep( ...@@ -751,27 +684,19 @@ void margo_thread_sleep(
margo_instance_id mid, margo_instance_id mid,
double timeout_ms) double timeout_ms)
{ {
int in_pool = 0;
margo_timer_t sleep_timer; margo_timer_t sleep_timer;
margo_thread_sleep_cb_dat sleep_cb_dat; margo_thread_sleep_cb_dat sleep_cb_dat;
if(margo_xstream_is_in_progress_pool(mid))
in_pool = 1;
/* set data needed for sleep callback */ /* set data needed for sleep callback */
sleep_cb_dat.mid = mid; sleep_cb_dat.mid = mid;
ABT_mutex_create(&(sleep_cb_dat.mutex)); ABT_mutex_create(&(sleep_cb_dat.mutex));
ABT_cond_create(&(sleep_cb_dat.cond)); ABT_cond_create(&(sleep_cb_dat.cond));
sleep_cb_dat.is_asleep = 1; sleep_cb_dat.is_asleep = 1;
sleep_cb_dat.in_pool = in_pool;
/* initialize the sleep timer */ /* initialize the sleep timer */
margo_timer_init(mid, &sleep_timer, margo_thread_sleep_cb, margo_timer_init(mid, &sleep_timer, margo_thread_sleep_cb,
&sleep_cb_dat, timeout_ms); &sleep_cb_dat, timeout_ms);
/* increment number of waiting threads */
mid->waiters_in_progress_pool += in_pool;
/* yield thread for specified timeout */ /* yield thread for specified timeout */
ABT_mutex_lock(sleep_cb_dat.mutex); ABT_mutex_lock(sleep_cb_dat.mutex);
while(sleep_cb_dat.is_asleep) while(sleep_cb_dat.is_asleep)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment