diff --git a/src/margo.c b/src/margo.c index 395b2b3492b14e36767b36227588ff609d028404..657614099c06c7d78cf87cfe857b590141b180fb 100644 --- a/src/margo.c +++ b/src/margo.c @@ -51,7 +51,6 @@ struct margo_instance /* control logic for callers waiting on margo to be finalized */ int finalize_flag; - int waiters_in_progress_pool; int refcount; ABT_mutex finalize_mutex; ABT_cond finalize_cond; @@ -64,7 +63,6 @@ struct margo_cb_arg { ABT_eventual *eventual; margo_instance_id mid; - char in_pool; }; struct margo_rpc_data @@ -257,18 +255,10 @@ void margo_finalize(margo_instance_id mid) void margo_wait_for_finalize(margo_instance_id mid) { - int in_pool = 0; int do_cleanup; - /* Is this waiter in the same pool as the pool running the progress - * thread? - */ - if(margo_xstream_is_in_progress_pool(mid)) - in_pool = 1; - ABT_mutex_lock(mid->finalize_mutex); - mid->waiters_in_progress_pool += in_pool; mid->refcount++; while(!mid->finalize_flag) @@ -308,16 +298,15 @@ static void hg_progress_fn(void* foo) if(trigger_happened) ABT_thread_yield(); - ABT_pool_get_total_size(mid->progress_pool, &size); + ABT_pool_get_size(mid->progress_pool, &size); /* Are there any other threads executing in this pool that are *not* - * blocked on margo_wait_for_finalize()? If so then, we can't - * sleep here or else those threads will not get a chance to - * execute. + * blocked ? If so then, we can't sleep here or else those threads + * will not get a chance to execute. + * TODO: check is ABT_pool_get_size returns the number of ULT/tasks + * that can be executed including this one, or not including this one. */ - if(size > mid->waiters_in_progress_pool) + if(size > 0) { - //printf("DEBUG: Margo progress function running while other ULTs are eligible for execution (size: %d, waiters: %d.\n", size, mid->waiters_in_progress_pool); - /* TODO: this is being executed more than is necessary (i.e. * in cases where there are other legitimate ULTs eligible * for execution that are not blocking on any events, Margo @@ -433,11 +422,6 @@ static hg_return_t margo_cb(const struct hg_cb_info *info) /* propagate return code out through eventual */ ABT_eventual_set(*(arg->eventual), &hret, sizeof(hret)); -#if 0 - if(arg->in_pool) - arg->mid->waiters_in_progress_pool--; -#endif - return(HG_SUCCESS); } @@ -483,15 +467,7 @@ hg_return_t margo_forward_timed( arg.eventual = &eventual; arg.mid = mid; -#if 0 - if(margo_xstream_is_in_progress_pool(mid)) - { - arg.in_pool = 1; - mid->waiters_in_progress_pool++; - } - else - arg.in_pool = 0; -#endif + hret = HG_Forward(handle, margo_cb, &arg, in_struct); if(hret == 0) { @@ -532,15 +508,7 @@ hg_return_t margo_forward( arg.eventual = &eventual; arg.mid = mid; -#if 0 - if(margo_xstream_is_in_progress_pool(mid)) - { - arg.in_pool = 1; - mid->waiters_in_progress_pool++; - } - else - arg.in_pool = 0; -#endif + hret = HG_Forward(handle, margo_cb, &arg, in_struct); if(hret == 0) { @@ -572,15 +540,7 @@ hg_return_t margo_respond( arg.eventual = &eventual; arg.mid = mid; -#if 0 - if(margo_xstream_is_in_progress_pool(mid)) - { - arg.in_pool = 1; - mid->waiters_in_progress_pool++; - } - else - arg.in_pool = 0; -#endif + hret = HG_Respond(handle, margo_cb, &arg, out_struct); if(hret == 0) { @@ -602,9 +562,6 @@ static hg_return_t margo_bulk_transfer_cb(const struct hg_cb_info *info) /* propagate return code out through eventual */ ABT_eventual_set(*(arg->eventual), &hret, sizeof(hret)); - if(arg->in_pool) - arg->mid->waiters_in_progress_pool--; - return(HG_SUCCESS); } @@ -624,11 +581,6 @@ static hg_return_t margo_addr_lookup_cb(const struct hg_cb_info *info) /* propagate return code out through eventual */ ABT_eventual_set(*(arg->eventual), &evt, sizeof(evt)); -#if 0 - if(arg->in_pool) - arg->mid->waiters_in_progress_pool--; -#endif - return(HG_SUCCESS); } @@ -652,15 +604,7 @@ hg_return_t margo_addr_lookup( arg.eventual = &eventual; arg.mid = mid; -#if 0 - if(margo_xstream_is_in_progress_pool(mid)) - { - arg.in_pool = 1; - mid->waiters_in_progress_pool++; - } - else - arg.in_pool = 0; -#endif + nret = HG_Addr_lookup(mid->hg_context, margo_addr_lookup_cb, &arg, name, HG_OP_ID_IGNORE); if(nret == 0) @@ -699,13 +643,7 @@ hg_return_t margo_bulk_transfer( arg.eventual = &eventual; arg.mid = mid; - if(margo_xstream_is_in_progress_pool(mid)) - { - arg.in_pool = 1; - mid->waiters_in_progress_pool++; - } - else - arg.in_pool = 0; + hret = HG_Bulk_transfer(mid->hg_context, margo_bulk_transfer_cb, &arg, op, origin_addr, origin_handle, origin_offset, local_handle, local_offset, size, HG_OP_ID_IGNORE); @@ -726,7 +664,6 @@ typedef struct ABT_mutex mutex; ABT_cond cond; char is_asleep; - char in_pool; } margo_thread_sleep_cb_dat; static void margo_thread_sleep_cb(void *arg) @@ -734,10 +671,6 @@ static void margo_thread_sleep_cb(void *arg) margo_thread_sleep_cb_dat *sleep_cb_dat = (margo_thread_sleep_cb_dat *)arg; - /* decrement number of waiting threads */ - sleep_cb_dat->mid->waiters_in_progress_pool -= - sleep_cb_dat->in_pool; - /* wake up the sleeping thread */ ABT_mutex_lock(sleep_cb_dat->mutex); sleep_cb_dat->is_asleep = 0; @@ -751,27 +684,19 @@ void margo_thread_sleep( margo_instance_id mid, double timeout_ms) { - int in_pool = 0; margo_timer_t sleep_timer; margo_thread_sleep_cb_dat sleep_cb_dat; - if(margo_xstream_is_in_progress_pool(mid)) - in_pool = 1; - /* set data needed for sleep callback */ sleep_cb_dat.mid = mid; ABT_mutex_create(&(sleep_cb_dat.mutex)); ABT_cond_create(&(sleep_cb_dat.cond)); sleep_cb_dat.is_asleep = 1; - sleep_cb_dat.in_pool = in_pool; /* initialize the sleep timer */ margo_timer_init(mid, &sleep_timer, margo_thread_sleep_cb, &sleep_cb_dat, timeout_ms); - /* increment number of waiting threads */ - mid->waiters_in_progress_pool += in_pool; - /* yield thread for specified timeout */ ABT_mutex_lock(sleep_cb_dat.mutex); while(sleep_cb_dat.is_asleep)