Commit 3d726daf authored by Philip Carns
Browse files

wip: margo progress loop optimizations

- count ULTs in progress pool that are waiting on communication as
  waiters so that we don't busy spin when it isn't necessary
- if a trigger occurs, yield immediately to service ULTs that may have
  been activated as a result
- HG_Progress() is still called too frequently, see TODO note
- outcome: this makes servers significantly faster, but seems to make
  clients a bit slower; optimal performance right now comes from
  building servers with this git commit and building clients without
  it; need to investigate
parent 34fb805b
...@@ -37,7 +37,7 @@ struct margo_instance ...@@ -37,7 +37,7 @@ struct margo_instance
/* control logic for callers waiting on margo to be finalized */ /* control logic for callers waiting on margo to be finalized */
int finalize_flag; int finalize_flag;
int finalize_waiters_in_progress_pool; int waiters_in_progress_pool;
int refcount; int refcount;
ABT_mutex finalize_mutex; ABT_mutex finalize_mutex;
ABT_cond finalize_cond; ABT_cond finalize_cond;
...@@ -51,6 +51,13 @@ struct margo_handler_mapping ...@@ -51,6 +51,13 @@ struct margo_handler_mapping
margo_instance_id mid; margo_instance_id mid;
}; };
/* Argument bundle handed to the Mercury completion callbacks (margo_cb,
 * margo_bulk_transfer_cb, margo_addr_lookup_cb) in place of a bare
 * ABT_eventual pointer. Callers fill one in on their own stack and pass
 * its address as the callback argument.
 */
struct margo_cb_arg
{
/* eventual that the callback sets to propagate the result to the caller */
ABT_eventual *eventual;
/* instance whose waiters_in_progress_pool counter the callback adjusts */
margo_instance_id mid;
/* 1 if the caller was found to be in the progress pool (and therefore
 * incremented waiters_in_progress_pool before issuing the operation),
 * 0 otherwise; when nonzero the callback decrements the counter again
 */
char in_pool;
};
#define MAX_HANDLER_MAPPING 8 #define MAX_HANDLER_MAPPING 8
static int handler_mapping_table_size = 0; static int handler_mapping_table_size = 0;
static struct margo_handler_mapping handler_mapping_table[MAX_HANDLER_MAPPING] = {0}; static struct margo_handler_mapping handler_mapping_table[MAX_HANDLER_MAPPING] = {0};
...@@ -262,7 +269,7 @@ void margo_wait_for_finalize(margo_instance_id mid) ...@@ -262,7 +269,7 @@ void margo_wait_for_finalize(margo_instance_id mid)
ABT_mutex_lock(mid->finalize_mutex); ABT_mutex_lock(mid->finalize_mutex);
mid->finalize_waiters_in_progress_pool += in_pool; mid->waiters_in_progress_pool += in_pool;
mid->refcount++; mid->refcount++;
while(!mid->finalize_flag) while(!mid->finalize_flag)
...@@ -288,16 +295,19 @@ static void hg_progress_fn(void* foo) ...@@ -288,16 +295,19 @@ static void hg_progress_fn(void* foo)
size_t size; size_t size;
unsigned int hg_progress_timeout = MERCURY_PROGRESS_TIMEOUT_UB; unsigned int hg_progress_timeout = MERCURY_PROGRESS_TIMEOUT_UB;
double next_timer_exp; double next_timer_exp;
int trigger_happened;
while(!mid->hg_progress_shutdown_flag) while(!mid->hg_progress_shutdown_flag)
{ {
trigger_happened = 0;
do { do {
ret = HG_Trigger(mid->hg_context, 0, 1, &actual_count); ret = HG_Trigger(mid->hg_context, 0, 1, &actual_count);
if(ret == HG_SUCCESS && actual_count > 0)
trigger_happened = 1;
} while((ret == HG_SUCCESS) && actual_count && !mid->hg_progress_shutdown_flag); } while((ret == HG_SUCCESS) && actual_count && !mid->hg_progress_shutdown_flag);
if(!mid->hg_progress_shutdown_flag) if(trigger_happened)
{ ABT_thread_yield();
ABT_mutex_lock(mid->finalize_mutex);
ABT_pool_get_total_size(mid->progress_pool, &size); ABT_pool_get_total_size(mid->progress_pool, &size);
/* Are there any other threads executing in this pool that are *not* /* Are there any other threads executing in this pool that are *not*
...@@ -305,16 +315,22 @@ static void hg_progress_fn(void* foo) ...@@ -305,16 +315,22 @@ static void hg_progress_fn(void* foo)
* sleep here or else those threads will not get a chance to * sleep here or else those threads will not get a chance to
* execute. * execute.
*/ */
if(size > mid->finalize_waiters_in_progress_pool) if(size > mid->waiters_in_progress_pool)
{ {
ABT_mutex_unlock(mid->finalize_mutex); //printf("DEBUG: Margo progress function running while other ULTs are eligible for execution (size: %d, waiters: %d.\n", size, mid->waiters_in_progress_pool);
/* TODO: this is being executed more than is necessary (i.e.
* in cases where there are other legitimate ULTs eligible
* for execution that are not blocking on any events, Margo
* or otherwise). Maybe we need an abt scheduling tweak here
* to make sure that this ULT is the lowest priority in that
* scenario.
*/
HG_Progress(mid->hg_context, 0); HG_Progress(mid->hg_context, 0);
ABT_thread_yield(); ABT_thread_yield();
} }
else else
{ {
ABT_mutex_unlock(mid->finalize_mutex);
ret = margo_timer_get_next_expiration(mid, &next_timer_exp); ret = margo_timer_get_next_expiration(mid, &next_timer_exp);
if(ret == 0) if(ret == 0)
{ {
...@@ -334,7 +350,6 @@ static void hg_progress_fn(void* foo) ...@@ -334,7 +350,6 @@ static void hg_progress_fn(void* foo)
} }
HG_Progress(mid->hg_context, hg_progress_timeout); HG_Progress(mid->hg_context, hg_progress_timeout);
} }
}
/* check for any expired timers */ /* check for any expired timers */
margo_check_timers(mid); margo_check_timers(mid);
...@@ -362,10 +377,13 @@ hg_class_t* margo_get_class(margo_instance_id mid) ...@@ -362,10 +377,13 @@ hg_class_t* margo_get_class(margo_instance_id mid)
static hg_return_t margo_cb(const struct hg_cb_info *info) static hg_return_t margo_cb(const struct hg_cb_info *info)
{ {
hg_return_t hret = info->ret; hg_return_t hret = info->ret;
struct margo_cb_arg* arg = info->arg;
ABT_eventual *eventual = info->arg;
/* propagate return code out through eventual */ /* propagate return code out through eventual */
ABT_eventual_set(*eventual, &hret, sizeof(hret)); ABT_eventual_set(*(arg->eventual), &hret, sizeof(hret));
if(arg->in_pool)
arg->mid->waiters_in_progress_pool--;
return(HG_SUCCESS); return(HG_SUCCESS);
} }
...@@ -397,6 +415,7 @@ hg_return_t margo_forward_timed( ...@@ -397,6 +415,7 @@ hg_return_t margo_forward_timed(
hg_return_t* waited_hret; hg_return_t* waited_hret;
margo_timer_t forward_timer; margo_timer_t forward_timer;
margo_forward_timeout_cb_dat timeout_cb_dat; margo_forward_timeout_cb_dat timeout_cb_dat;
struct margo_cb_arg arg;
ret = ABT_eventual_create(sizeof(hret), &eventual); ret = ABT_eventual_create(sizeof(hret), &eventual);
if(ret != 0) if(ret != 0)
...@@ -409,7 +428,16 @@ hg_return_t margo_forward_timed( ...@@ -409,7 +428,16 @@ hg_return_t margo_forward_timed(
margo_timer_init(mid, &forward_timer, margo_forward_timeout_cb, margo_timer_init(mid, &forward_timer, margo_forward_timeout_cb,
&timeout_cb_dat, timeout_ms); &timeout_cb_dat, timeout_ms);
hret = HG_Forward(handle, margo_cb, &eventual, in_struct); arg.eventual = &eventual;
arg.mid = mid;
if(margo_xstream_is_in_progress_pool(mid))
{
arg.in_pool = 1;
mid->waiters_in_progress_pool++;
}
else
arg.in_pool = 0;
hret = HG_Forward(handle, margo_cb, &arg, in_struct);
if(hret == 0) if(hret == 0)
{ {
ABT_eventual_wait(eventual, (void**)&waited_hret); ABT_eventual_wait(eventual, (void**)&waited_hret);
...@@ -439,6 +467,7 @@ hg_return_t margo_forward( ...@@ -439,6 +467,7 @@ hg_return_t margo_forward(
ABT_eventual eventual; ABT_eventual eventual;
int ret; int ret;
hg_return_t* waited_hret; hg_return_t* waited_hret;
struct margo_cb_arg arg;
ret = ABT_eventual_create(sizeof(hret), &eventual); ret = ABT_eventual_create(sizeof(hret), &eventual);
if(ret != 0) if(ret != 0)
...@@ -446,7 +475,16 @@ hg_return_t margo_forward( ...@@ -446,7 +475,16 @@ hg_return_t margo_forward(
return(HG_NOMEM_ERROR); return(HG_NOMEM_ERROR);
} }
hret = HG_Forward(handle, margo_cb, &eventual, in_struct); arg.eventual = &eventual;
arg.mid = mid;
if(margo_xstream_is_in_progress_pool(mid))
{
arg.in_pool = 1;
mid->waiters_in_progress_pool++;
}
else
arg.in_pool = 0;
hret = HG_Forward(handle, margo_cb, &arg, in_struct);
if(hret == 0) if(hret == 0)
{ {
ABT_eventual_wait(eventual, (void**)&waited_hret); ABT_eventual_wait(eventual, (void**)&waited_hret);
...@@ -467,6 +505,7 @@ hg_return_t margo_respond( ...@@ -467,6 +505,7 @@ hg_return_t margo_respond(
ABT_eventual eventual; ABT_eventual eventual;
int ret; int ret;
hg_return_t* waited_hret; hg_return_t* waited_hret;
struct margo_cb_arg arg;
ret = ABT_eventual_create(sizeof(hret), &eventual); ret = ABT_eventual_create(sizeof(hret), &eventual);
if(ret != 0) if(ret != 0)
...@@ -474,7 +513,16 @@ hg_return_t margo_respond( ...@@ -474,7 +513,16 @@ hg_return_t margo_respond(
return(HG_NOMEM_ERROR); return(HG_NOMEM_ERROR);
} }
hret = HG_Respond(handle, margo_cb, &eventual, out_struct); arg.eventual = &eventual;
arg.mid = mid;
if(margo_xstream_is_in_progress_pool(mid))
{
arg.in_pool = 1;
mid->waiters_in_progress_pool++;
}
else
arg.in_pool = 0;
hret = HG_Respond(handle, margo_cb, &arg, out_struct);
if(hret == 0) if(hret == 0)
{ {
ABT_eventual_wait(eventual, (void**)&waited_hret); ABT_eventual_wait(eventual, (void**)&waited_hret);
...@@ -490,10 +538,13 @@ hg_return_t margo_respond( ...@@ -490,10 +538,13 @@ hg_return_t margo_respond(
static hg_return_t margo_bulk_transfer_cb(const struct hg_cb_info *info) static hg_return_t margo_bulk_transfer_cb(const struct hg_cb_info *info)
{ {
hg_return_t hret = info->ret; hg_return_t hret = info->ret;
ABT_eventual *eventual = info->arg; struct margo_cb_arg* arg = info->arg;
/* propagate return code out through eventual */ /* propagate return code out through eventual */
ABT_eventual_set(*eventual, &hret, sizeof(hret)); ABT_eventual_set(*(arg->eventual), &hret, sizeof(hret));
if(arg->in_pool)
arg->mid->waiters_in_progress_pool--;
return(HG_SUCCESS); return(HG_SUCCESS);
} }
...@@ -509,11 +560,13 @@ static hg_return_t margo_addr_lookup_cb(const struct hg_cb_info *info) ...@@ -509,11 +560,13 @@ static hg_return_t margo_addr_lookup_cb(const struct hg_cb_info *info)
struct lookup_cb_evt evt; struct lookup_cb_evt evt;
evt.nret = info->ret; evt.nret = info->ret;
evt.addr = info->info.lookup.addr; evt.addr = info->info.lookup.addr;
struct margo_cb_arg* arg = info->arg;
ABT_eventual *eventual = info->arg;
/* propagate return code out through eventual */ /* propagate return code out through eventual */
ABT_eventual_set(*eventual, &evt, sizeof(evt)); ABT_eventual_set(*(arg->eventual), &evt, sizeof(evt));
if(arg->in_pool)
arg->mid->waiters_in_progress_pool--;
return(HG_SUCCESS); return(HG_SUCCESS);
} }
...@@ -528,6 +581,7 @@ hg_return_t margo_addr_lookup( ...@@ -528,6 +581,7 @@ hg_return_t margo_addr_lookup(
struct lookup_cb_evt *evt; struct lookup_cb_evt *evt;
ABT_eventual eventual; ABT_eventual eventual;
int ret; int ret;
struct margo_cb_arg arg;
ret = ABT_eventual_create(sizeof(*evt), &eventual); ret = ABT_eventual_create(sizeof(*evt), &eventual);
if(ret != 0) if(ret != 0)
...@@ -535,8 +589,17 @@ hg_return_t margo_addr_lookup( ...@@ -535,8 +589,17 @@ hg_return_t margo_addr_lookup(
return(HG_NOMEM_ERROR); return(HG_NOMEM_ERROR);
} }
arg.eventual = &eventual;
arg.mid = mid;
if(margo_xstream_is_in_progress_pool(mid))
{
arg.in_pool = 1;
mid->waiters_in_progress_pool++;
}
else
arg.in_pool = 0;
nret = HG_Addr_lookup(mid->hg_context, margo_addr_lookup_cb, nret = HG_Addr_lookup(mid->hg_context, margo_addr_lookup_cb,
&eventual, name, HG_OP_ID_IGNORE); &arg, name, HG_OP_ID_IGNORE);
if(nret == 0) if(nret == 0)
{ {
ABT_eventual_wait(eventual, (void**)&evt); ABT_eventual_wait(eventual, (void**)&evt);
...@@ -563,6 +626,7 @@ hg_return_t margo_bulk_transfer( ...@@ -563,6 +626,7 @@ hg_return_t margo_bulk_transfer(
hg_return_t *waited_hret; hg_return_t *waited_hret;
ABT_eventual eventual; ABT_eventual eventual;
int ret; int ret;
struct margo_cb_arg arg;
ret = ABT_eventual_create(sizeof(hret), &eventual); ret = ABT_eventual_create(sizeof(hret), &eventual);
if(ret != 0) if(ret != 0)
...@@ -570,8 +634,17 @@ hg_return_t margo_bulk_transfer( ...@@ -570,8 +634,17 @@ hg_return_t margo_bulk_transfer(
return(HG_NOMEM_ERROR); return(HG_NOMEM_ERROR);
} }
arg.eventual = &eventual;
arg.mid = mid;
if(margo_xstream_is_in_progress_pool(mid))
{
arg.in_pool = 1;
mid->waiters_in_progress_pool++;
}
else
arg.in_pool = 0;
hret = HG_Bulk_transfer(mid->hg_context, margo_bulk_transfer_cb, hret = HG_Bulk_transfer(mid->hg_context, margo_bulk_transfer_cb,
&eventual, op, origin_addr, origin_handle, origin_offset, local_handle, &arg, op, origin_addr, origin_handle, origin_offset, local_handle,
local_offset, size, HG_OP_ID_IGNORE); local_offset, size, HG_OP_ID_IGNORE);
if(hret == 0) if(hret == 0)
{ {
...@@ -599,10 +672,8 @@ static void margo_thread_sleep_cb(void *arg) ...@@ -599,10 +672,8 @@ static void margo_thread_sleep_cb(void *arg)
(margo_thread_sleep_cb_dat *)arg; (margo_thread_sleep_cb_dat *)arg;
/* decrement number of waiting threads */ /* decrement number of waiting threads */
ABT_mutex_lock(sleep_cb_dat->mid->finalize_mutex); sleep_cb_dat->mid->waiters_in_progress_pool -=
sleep_cb_dat->mid->finalize_waiters_in_progress_pool -=
sleep_cb_dat->in_pool; sleep_cb_dat->in_pool;
ABT_mutex_unlock(sleep_cb_dat->mid->finalize_mutex);
/* wake up the sleeping thread */ /* wake up the sleeping thread */
ABT_mutex_lock(sleep_cb_dat->mutex); ABT_mutex_lock(sleep_cb_dat->mutex);
...@@ -636,9 +707,7 @@ void margo_thread_sleep( ...@@ -636,9 +707,7 @@ void margo_thread_sleep(
&sleep_cb_dat, timeout_ms); &sleep_cb_dat, timeout_ms);
/* increment number of waiting threads */ /* increment number of waiting threads */
ABT_mutex_lock(mid->finalize_mutex); mid->waiters_in_progress_pool += in_pool;
mid->finalize_waiters_in_progress_pool += in_pool;
ABT_mutex_unlock(mid->finalize_mutex);
/* yield thread for specified timeout */ /* yield thread for specified timeout */
ABT_mutex_lock(sleep_cb_dat.mutex); ABT_mutex_lock(sleep_cb_dat.mutex);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment