Commit 490c0c15 authored by Philip Carns

Merge branch 'carns/issue-53' into 'master'

switch to margo_thread_yield in sparkline fn

Closes #53

See merge request !23
parents 02e99ac2 8577f38b
......@@ -43,9 +43,11 @@ struct margo_timer_list* margo_timer_list_create()
return timer_lst;
}
void margo_timer_list_free(struct margo_timer_list* timer_lst)
void margo_timer_list_free(margo_instance_id mid, struct margo_timer_list* timer_lst)
{
margo_timer_t *cur;
ABT_pool handler_pool;
int ret;
ABT_mutex_lock(timer_lst->mutex);
/* delete any remaining timers from the queue */
......@@ -53,6 +55,24 @@ void margo_timer_list_free(struct margo_timer_list* timer_lst)
{
cur = timer_lst->queue_head;
DL_DELETE(timer_lst->queue_head, cur);
cur->prev = cur->next = NULL;
/* we must issue the callback now for any pending timers or else the
* callers will hang indefinitely
*/
margo_get_handler_pool(mid, &handler_pool);
if(handler_pool != ABT_POOL_NULL)
{
/* if handler pool is present, run callback there */
ret = ABT_thread_create(handler_pool, cur->cb_fn, cur->cb_dat,
ABT_THREAD_ATTR_NULL, NULL);
assert(ret == ABT_SUCCESS);
}
else
{
/* else run callback in place */
cur->cb_fn(cur->cb_dat);
}
}
ABT_mutex_unlock(timer_lst->mutex);
ABT_mutex_free(&(timer_lst->mutex));
......
......@@ -32,7 +32,7 @@ struct margo_timer_list* margo_timer_list_create();
* Frees the timer list
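* @param [in] mid Margo instance; its handler pool, if present, is used to run the callbacks of any timers still pending
* @param [in] timer_lst timer list to free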
*/
void margo_timer_list_free(struct margo_timer_list* timer_lst);
void margo_timer_list_free(margo_instance_id mid, struct margo_timer_list* timer_lst);
/**
* Initializes a margo timer object which will perform some action
......
......@@ -426,7 +426,7 @@ margo_instance_id margo_init_opt(const char *addr_str, int mode, const struct hg
err:
if(mid)
{
margo_timer_list_free(mid->timer_list);
margo_timer_list_free(mid, mid->timer_list);
ABT_mutex_free(&mid->finalize_mutex);
ABT_cond_free(&mid->finalize_cond);
free(mid);
......@@ -534,7 +534,7 @@ err:
if(mid)
{
margo_handle_cache_destroy(mid);
margo_timer_list_free(mid->timer_list);
margo_timer_list_free(mid, mid->timer_list);
ABT_mutex_free(&mid->finalize_mutex);
ABT_cond_free(&mid->finalize_cond);
ABT_mutex_free(&mid->pending_operations_mtx);
......@@ -559,8 +559,6 @@ static void margo_cleanup(margo_instance_id mid)
free(tmp);
}
margo_timer_list_free(mid->timer_list);
ABT_mutex_free(&mid->finalize_mutex);
ABT_cond_free(&mid->finalize_cond);
ABT_mutex_free(&mid->pending_operations_mtx);
......@@ -651,6 +649,9 @@ void margo_finalize(margo_instance_id mid)
ABT_thread_join(mid->hg_progress_tid);
ABT_thread_free(&mid->hg_progress_tid);
/* shut down pending timers */
margo_timer_list_free(mid, mid->timer_list);
if(mid->profile_enabled) {
ABT_thread_join(mid->sparkline_data_collection_tid);
ABT_thread_free(&mid->sparkline_data_collection_tid);
......@@ -1614,14 +1615,9 @@ static void margo_rpc_data_free(void* ptr)
}
/* dedicated thread function to collect sparkline data */
/* TODO: Initially, we had used margo_thread_sleep() here to keep the logic simple, but for some reason this ULT was not cleaning itself up properly. It continued to run even after margo_finalize(), but we weren't able to figure out why. So we resorted to this logic of the ULT continuously being scheduled to run, and checking the timer to see if it needs to collect sparkline data. Either way, it checks and yields.
* We need to:
* a. Figure out the performance overhead of this sort of busy waiting for timeout to trigger sparkline data collection
* b. Replace this logic with the correct way of doing it: using margo_thread_sleep() */
static void sparkline_data_collection_fn(void* foo)
{
struct margo_instance *mid = (struct margo_instance *)foo;
double time_passed, end = 0;
struct diag_data *stat, *tmp;
/* double check that profile collection should run, else, close this ULT */
......@@ -1632,11 +1628,7 @@ static void sparkline_data_collection_fn(void* foo)
while(!mid->hg_progress_shutdown_flag)
{
end = ABT_get_wtime();
time_passed = end - mid->previous_sparkline_data_collection_time;
if(time_passed >= MARGO_SPARKLINE_TIMESLICE) {
margo_thread_sleep(mid, MARGO_SPARKLINE_TIMESLICE*1000);
HASH_ITER(hh, mid->diag_rpc, stat, tmp)
{
......@@ -1650,13 +1642,8 @@ static void sparkline_data_collection_fn(void* foo)
//Drop!
}
}
mid->sparkline_index++;
mid->previous_sparkline_data_collection_time = ABT_get_wtime();
ABT_thread_yield();
} else {
ABT_thread_yield();
}
}
return;
......@@ -1728,7 +1715,12 @@ static void hg_progress_fn(void* foo)
pending = mid->pending_operations;
ABT_mutex_unlock(mid->pending_operations_mtx);
if(size > 1 || pending)
/* Note that if profiling is enabled then there will be one extra
* ULT in the progress pool. We don't need to worry about that one;
* a margo timer will wake the progress loop when it needs
* attention.
*/
if(pending || (mid->profile_enabled && size > 2) || (!mid->profile_enabled && size > 1))
{
/* TODO: a custom ABT scheduler could optimize this further by
* delaying Mercury progress until all other runnable ULTs have
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment