Commit 3c70631f authored by Shane Snyder's avatar Shane Snyder
Browse files

Merge branch 'margo-registered-data' into dev-wrap-hg-iface

parents 6a345e9e c34e5501
......@@ -111,6 +111,8 @@ int main(int argc, char **argv)
/* actually start margo */
/***************************************/
mid = margo_init(0, 0, hg_context);
assert(mid);
margo_diag_start(mid);
/* register RPC */
MARGO_REGISTER(mid, "my_rpc", my_rpc_in_t, my_rpc_out_t, NULL, &my_rpc_id);
......@@ -168,11 +170,11 @@ int main(int argc, char **argv)
margo_forward(mid, handle, NULL);
HG_Destroy(handle);
HG_Destroy(handle);
HG_Addr_free(hg_class, svr_addr);
/* shut down everything */
margo_diag_dump(mid, "-", 0);
margo_finalize(mid);
ABT_finalize();
......
......@@ -93,6 +93,7 @@ int main(int argc, char **argv)
/* actually start margo */
mid = margo_init(0, 0, hg_context);
assert(mid);
margo_diag_start(mid);
/* register RPC */
MARGO_REGISTER(mid, "my_rpc", my_rpc_in_t, my_rpc_out_t, my_rpc_ult, MARGO_RPC_ID_IGNORE);
......
......@@ -102,6 +102,8 @@ static void my_rpc_shutdown_ult(hg_handle_t handle)
HG_Destroy(handle);
margo_diag_dump(mid, "-", 0);
/* NOTE: we assume that the server daemon is using
* margo_wait_for_finalize() to suspend until this RPC executes, so there
* is no need to send any extra signal to notify it.
......
......@@ -647,6 +647,25 @@ margo_instance_id margo_hg_handle_get_instance(hg_handle_t h);
*/
int margo_lookup_mplex(margo_instance_id mid, hg_id_t id, uint32_t mplex_id, ABT_pool *pool);
/**
* Enables diagnostic collection on specified Margo instance
*
* @param [in] mid Margo instance
* @returns void
*/
void margo_diag_start(margo_instance_id mid);
/**
* Appends diagnostic statistics (enabled via margo_diag_start()) to specified
* output file.
*
* @param [in] mid Margo instance
* @param [in] file output file ("-" for stdout)
* @param [in] uniquify flag indicating if file name should have additional
* information added to it to make output from different processes unique
* @returns void
*/
void margo_diag_dump(margo_instance_id mid, const char* file, int uniquify);
/**
* macro that registers a function as an RPC.
......
......@@ -33,6 +33,22 @@ struct mplex_element
UT_hash_handle hh;
};
struct diag_data
{
double min;
double max;
double cumulative;
int count;
};
#define __DIAG_UPDATE(__data, __time)\
do {\
__data.count++; \
__data.cumulative += (__time); \
if((__time) > __data.max) __data.max = (__time); \
if((__time) < __data.min) __data.min = (__time); \
} while(0)
struct margo_instance
{
/* provided by caller */
......@@ -52,20 +68,30 @@ struct margo_instance
/* control logic for callers waiting on margo to be finalized */
int finalize_flag;
int waiters_in_progress_pool;
int refcount;
ABT_mutex finalize_mutex;
ABT_cond finalize_cond;
/* hash table to track multiplexed rpcs registered with margo */
struct mplex_element *mplex_table;
/* optional diagnostics data tracking */
/* NOTE: technically the following fields are subject to races if they
* are updated from more than one thread at a time. We will be careful
* to only update the counters from the progress_fn,
* which will serialize access.
*/
int diag_enabled;
struct diag_data diag_trigger_elapsed;
struct diag_data diag_progress_elapsed_zero_timeout;
struct diag_data diag_progress_elapsed_nonzero_timeout;
struct diag_data diag_progress_timeout_value;
};
struct margo_cb_arg
{
ABT_eventual *eventual;
margo_instance_id mid;
char in_pool;
};
struct margo_rpc_data
......@@ -76,7 +102,6 @@ struct margo_rpc_data
};
static void hg_progress_fn(void* foo);
static int margo_xstream_is_in_progress_pool(margo_instance_id mid);
static void margo_rpc_data_free(void* ptr);
margo_instance_id margo_init(const char *addr_str, int mode,
......@@ -290,18 +315,10 @@ void margo_finalize(margo_instance_id mid)
void margo_wait_for_finalize(margo_instance_id mid)
{
int in_pool = 0;
int do_cleanup;
/* Is this waiter in the same pool as the pool running the progress
* thread?
*/
if(margo_xstream_is_in_progress_pool(mid))
in_pool = 1;
ABT_mutex_lock(mid->finalize_mutex);
mid->waiters_in_progress_pool += in_pool;
mid->refcount++;
while(!mid->finalize_flag)
......@@ -423,25 +440,20 @@ hg_return_t margo_registered_disable_response(
struct lookup_cb_evt
{
hg_return_t nret;
hg_return_t hret;
hg_addr_t addr;
};
static hg_return_t margo_addr_lookup_cb(const struct hg_cb_info *info)
{
struct lookup_cb_evt evt;
evt.nret = info->ret;
evt.hret = info->ret;
evt.addr = info->info.lookup.addr;
struct margo_cb_arg* arg = info->arg;
/* propagate return code out through eventual */
ABT_eventual_set(*(arg->eventual), &evt, sizeof(evt));
#if 0
if(arg->in_pool)
arg->mid->waiters_in_progress_pool--;
#endif
return(HG_SUCCESS);
}
......@@ -450,7 +462,7 @@ hg_return_t margo_addr_lookup(
const char *name,
hg_addr_t *addr)
{
hg_return_t nret;
hg_return_t hret;
struct lookup_cb_evt *evt;
ABT_eventual eventual;
int ret;
......@@ -464,27 +476,19 @@ hg_return_t margo_addr_lookup(
arg.eventual = &eventual;
arg.mid = mid;
#if 0
if(margo_xstream_is_in_progress_pool(mid))
{
arg.in_pool = 1;
mid->waiters_in_progress_pool++;
}
else
arg.in_pool = 0;
#endif
nret = HG_Addr_lookup(mid->hg_context, margo_addr_lookup_cb,
hret = HG_Addr_lookup(mid->hg_context, margo_addr_lookup_cb,
&arg, name, HG_OP_ID_IGNORE);
if(nret == 0)
if(hret == HG_SUCCESS)
{
ABT_eventual_wait(eventual, (void**)&evt);
*addr = evt->addr;
nret = evt->nret;
hret = evt->hret;
}
ABT_eventual_free(&eventual);
return(nret);
return(hret);
}
hg_return_t margo_addr_free(
......@@ -573,7 +577,6 @@ hg_return_t margo_free_output(
return(HG_Free_output(handle, out_struct));
}
static hg_return_t margo_cb(const struct hg_cb_info *info)
{
hg_return_t hret = info->ret;
......@@ -582,11 +585,6 @@ static hg_return_t margo_cb(const struct hg_cb_info *info)
/* propagate return code out through eventual */
ABT_eventual_set(*(arg->eventual), &hret, sizeof(hret));
#if 0
if(arg->in_pool)
arg->mid->waiters_in_progress_pool--;
#endif
return(HG_SUCCESS);
}
......@@ -609,17 +607,9 @@ hg_return_t margo_forward(
arg.eventual = &eventual;
arg.mid = mid;
#if 0
if(margo_xstream_is_in_progress_pool(mid))
{
arg.in_pool = 1;
mid->waiters_in_progress_pool++;
}
else
arg.in_pool = 0;
#endif
hret = HG_Forward(handle, margo_cb, &arg, in_struct);
if(hret == 0)
if(hret == HG_SUCCESS)
{
ABT_eventual_wait(eventual, (void**)&waited_hret);
hret = *waited_hret;
......@@ -672,17 +662,9 @@ hg_return_t margo_forward_timed(
arg.eventual = &eventual;
arg.mid = mid;
#if 0
if(margo_xstream_is_in_progress_pool(mid))
{
arg.in_pool = 1;
mid->waiters_in_progress_pool++;
}
else
arg.in_pool = 0;
#endif
hret = HG_Forward(handle, margo_cb, &arg, in_struct);
if(hret == 0)
if(hret == HG_SUCCESS)
{
ABT_eventual_wait(eventual, (void**)&waited_hret);
hret = *waited_hret;
......@@ -720,17 +702,9 @@ hg_return_t margo_respond(
arg.eventual = &eventual;
arg.mid = mid;
#if 0
if(margo_xstream_is_in_progress_pool(mid))
{
arg.in_pool = 1;
mid->waiters_in_progress_pool++;
}
else
arg.in_pool = 0;
#endif
hret = HG_Respond(handle, margo_cb, &arg, out_struct);
if(hret == 0)
if(hret == HG_SUCCESS)
{
ABT_eventual_wait(eventual, (void**)&waited_hret);
hret = *waited_hret;
......@@ -834,9 +808,6 @@ static hg_return_t margo_bulk_transfer_cb(const struct hg_cb_info *info)
/* propagate return code out through eventual */
ABT_eventual_set(*(arg->eventual), &hret, sizeof(hret));
if(arg->in_pool)
arg->mid->waiters_in_progress_pool--;
return(HG_SUCCESS);
}
......@@ -865,17 +836,11 @@ hg_return_t margo_bulk_transfer(
arg.eventual = &eventual;
arg.mid = mid;
if(margo_xstream_is_in_progress_pool(mid))
{
arg.in_pool = 1;
mid->waiters_in_progress_pool++;
}
else
arg.in_pool = 0;
hret = HG_Bulk_transfer(mid->hg_context, margo_bulk_transfer_cb,
&arg, op, origin_addr, origin_handle, origin_offset, local_handle,
local_offset, size, op_id);
if(hret == 0)
if(hret == HG_SUCCESS)
{
ABT_eventual_wait(eventual, (void**)&waited_hret);
hret = *waited_hret;
......@@ -892,14 +857,11 @@ hg_return_t margo_bulk_cancel(
return(HG_Bulk_cancel(op_id));
}
/* returns 1 if current xstream is in the progress pool, 0 if not */
typedef struct
{
margo_instance_id mid;
ABT_mutex mutex;
ABT_cond cond;
char is_asleep;
char in_pool;
} margo_thread_sleep_cb_dat;
static void margo_thread_sleep_cb(void *arg)
......@@ -907,10 +869,6 @@ static void margo_thread_sleep_cb(void *arg)
margo_thread_sleep_cb_dat *sleep_cb_dat =
(margo_thread_sleep_cb_dat *)arg;
/* decrement number of waiting threads */
sleep_cb_dat->mid->waiters_in_progress_pool -=
sleep_cb_dat->in_pool;
/* wake up the sleeping thread */
ABT_mutex_lock(sleep_cb_dat->mutex);
sleep_cb_dat->is_asleep = 0;
......@@ -924,27 +882,18 @@ void margo_thread_sleep(
margo_instance_id mid,
double timeout_ms)
{
int in_pool = 0;
margo_timer_t sleep_timer;
margo_thread_sleep_cb_dat sleep_cb_dat;
if(margo_xstream_is_in_progress_pool(mid))
in_pool = 1;
/* set data needed for sleep callback */
sleep_cb_dat.mid = mid;
ABT_mutex_create(&(sleep_cb_dat.mutex));
ABT_cond_create(&(sleep_cb_dat.cond));
sleep_cb_dat.is_asleep = 1;
sleep_cb_dat.in_pool = in_pool;
/* initialize the sleep timer */
margo_timer_init(mid, &sleep_timer, margo_thread_sleep_cb,
&sleep_cb_dat, timeout_ms);
/* increment number of waiting threads */
mid->waiters_in_progress_pool += in_pool;
/* yield thread for specified timeout */
ABT_mutex_lock(sleep_cb_dat.mutex);
while(sleep_cb_dat.is_asleep)
......@@ -1009,23 +958,6 @@ int margo_lookup_mplex(margo_instance_id mid, hg_id_t id, uint32_t mplex_id, ABT
return(0);
}
static int margo_xstream_is_in_progress_pool(margo_instance_id mid)
{
int ret;
ABT_xstream xstream;
ABT_pool pool;
ret = ABT_xstream_self(&xstream);
assert(ret == ABT_SUCCESS);
ret = ABT_xstream_get_main_pools(xstream, 1, &pool);
assert(ret == ABT_SUCCESS);
if(pool == mid->progress_pool)
return(1);
else
return(0);
}
static void margo_rpc_data_free(void* ptr)
{
struct margo_rpc_data* data = (struct margo_rpc_data*) ptr;
......@@ -1045,12 +977,26 @@ static void hg_progress_fn(void* foo)
unsigned int hg_progress_timeout = MERCURY_PROGRESS_TIMEOUT_UB;
double next_timer_exp;
int trigger_happened;
double tm1, tm2;
int diag_enabled = 0;
while(!mid->hg_progress_shutdown_flag)
{
trigger_happened = 0;
do {
/* save value of instance diag variable, in case it is modified
* while we are in loop
*/
diag_enabled = mid->diag_enabled;
if(diag_enabled) tm1 = ABT_get_wtime();
ret = HG_Trigger(mid->hg_context, 0, 1, &actual_count);
if(diag_enabled)
{
tm2 = ABT_get_wtime();
__DIAG_UPDATE(mid->diag_trigger_elapsed, (tm2-tm1));
}
if(ret == HG_SUCCESS && actual_count > 0)
trigger_happened = 1;
} while((ret == HG_SUCCESS) && actual_count && !mid->hg_progress_shutdown_flag);
......@@ -1058,16 +1004,15 @@ static void hg_progress_fn(void* foo)
if(trigger_happened)
ABT_thread_yield();
ABT_pool_get_total_size(mid->progress_pool, &size);
ABT_pool_get_size(mid->progress_pool, &size);
/* Are there any other threads executing in this pool that are *not*
* blocked on margo_wait_for_finalize()? If so then, we can't
* sleep here or else those threads will not get a chance to
* execute.
* blocked ? If so then, we can't sleep here or else those threads
* will not get a chance to execute.
* TODO: check is ABT_pool_get_size returns the number of ULT/tasks
* that can be executed including this one, or not including this one.
*/
if(size > mid->waiters_in_progress_pool)
if(size > 0)
{
//printf("DEBUG: Margo progress function running while other ULTs are eligible for execution (size: %d, waiters: %d.\n", size, mid->waiters_in_progress_pool);
/* TODO: this is being executed more than is necessary (i.e.
* in cases where there are other legitimate ULTs eligible
* for execution that are not blocking on any events, Margo
......@@ -1075,7 +1020,14 @@ static void hg_progress_fn(void* foo)
* to make sure that this ULT is the lowest priority in that
* scenario.
*/
if(diag_enabled) tm1 = ABT_get_wtime();
ret = HG_Progress(mid->hg_context, 0);
if(diag_enabled)
{
tm2 = ABT_get_wtime();
__DIAG_UPDATE(mid->diag_progress_elapsed_zero_timeout, (tm2-tm1));
__DIAG_UPDATE(mid->diag_progress_timeout_value, 0);
}
if(ret == HG_SUCCESS)
{
/* Mercury completed something; loop around to trigger
......@@ -1113,7 +1065,18 @@ static void hg_progress_fn(void* foo)
hg_progress_timeout = 0;
}
}
if(diag_enabled) tm1 = ABT_get_wtime();
ret = HG_Progress(mid->hg_context, hg_progress_timeout);
if(diag_enabled)
{
tm2 = ABT_get_wtime();
if(hg_progress_timeout == 0)
__DIAG_UPDATE(mid->diag_progress_elapsed_zero_timeout, (tm2-tm1));
else
__DIAG_UPDATE(mid->diag_progress_elapsed_nonzero_timeout, (tm2-tm1));
__DIAG_UPDATE(mid->diag_progress_timeout_value, hg_progress_timeout);
}
if(ret != HG_SUCCESS && ret != HG_TIMEOUT)
{
/* TODO: error handling */
......@@ -1127,3 +1090,87 @@ static void hg_progress_fn(void* foo)
return;
}
void margo_diag_start(margo_instance_id mid)
{
mid->diag_enabled = 1;
}
static void print_diag_data(FILE *file, const char* name, const char *description, struct diag_data *data)
{
double avg;
fprintf(file, "# %s\n", description);
if(data->count != 0)
avg = data->cumulative/data->count;
else
avg = 0;
fprintf(file, "%s\t%.9f\t%.9f\t%.9f\t%d\n", name, avg, data->min, data->max, data->count);
return;
}
void margo_diag_dump(margo_instance_id mid, const char* file, int uniquify)
{
FILE *outfile;
time_t ltime;
char revised_file_name[256] = {0};
assert(mid->diag_enabled);
if(uniquify)
{
char hostname[128] = {0};
int pid;
gethostname(hostname, 128);
pid = getpid();
sprintf(revised_file_name, "%s-%s-%d", file, hostname, pid);
}
else
{
sprintf(revised_file_name, "%s", file);
}
if(strcmp("-", file) == 0)
{
outfile = stdout;
}
else
{
outfile = fopen(revised_file_name, "a");
if(!outfile)
{
perror("fopen");
return;
}
}
/* TODO: retrieve self addr and include in output */
/* TODO: support pattern substitution in file name to create unique
* output files per process
*/
time(&ltime);
fprintf(outfile, "# Margo diagnostics\n");
fprintf(outfile, "# %s\n", ctime(&ltime));
fprintf(outfile, "# <stat>\t<avg>\t<min>\t<max>\t<count>\n");
print_diag_data(outfile, "trigger_elapsed",
"Time consumed by HG_Trigger()",
&mid->diag_trigger_elapsed);
print_diag_data(outfile, "progress_elapsed_zero_timeout",
"Time consumed by HG_Progress() when called with timeout==0",
&mid->diag_progress_elapsed_zero_timeout);
print_diag_data(outfile, "progress_elapsed_nonzero_timeout",
"Time consumed by HG_Progress() when called with timeout!=0",
&mid->diag_progress_elapsed_nonzero_timeout);
print_diag_data(outfile, "progress_timeout_value",
"Timeout values passed to HG_Progress()",
&mid->diag_progress_timeout_value);
if(outfile != stdout)
fclose(outfile);
return;
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment