Commit a97c0951 authored by Philip Carns's avatar Philip Carns

Merge branch 'dev-diag-api' into 'master'

Dev diag api

Closes #28

See merge request !1
parents de34c0d5 3bf769f9
...@@ -111,6 +111,8 @@ int main(int argc, char **argv) ...@@ -111,6 +111,8 @@ int main(int argc, char **argv)
/* actually start margo */ /* actually start margo */
/***************************************/ /***************************************/
mid = margo_init(0, 0, hg_context); mid = margo_init(0, 0, hg_context);
assert(mid);
margo_diag_start(mid);
/* register RPC */ /* register RPC */
my_rpc_id = MERCURY_REGISTER(hg_class, "my_rpc", my_rpc_in_t, my_rpc_out_t, my_rpc_id = MERCURY_REGISTER(hg_class, "my_rpc", my_rpc_in_t, my_rpc_out_t,
...@@ -174,6 +176,7 @@ int main(int argc, char **argv) ...@@ -174,6 +176,7 @@ int main(int argc, char **argv)
HG_Addr_free(hg_class, svr_addr); HG_Addr_free(hg_class, svr_addr);
/* shut down everything */ /* shut down everything */
margo_diag_dump(mid, "-");
margo_finalize(mid); margo_finalize(mid);
ABT_finalize(); ABT_finalize();
......
...@@ -93,6 +93,7 @@ int main(int argc, char **argv) ...@@ -93,6 +93,7 @@ int main(int argc, char **argv)
/* actually start margo */ /* actually start margo */
mid = margo_init(0, 0, hg_context); mid = margo_init(0, 0, hg_context);
assert(mid); assert(mid);
margo_diag_start(mid);
/* register RPC */ /* register RPC */
MERCURY_REGISTER(hg_class, "my_rpc", my_rpc_in_t, my_rpc_out_t, MERCURY_REGISTER(hg_class, "my_rpc", my_rpc_in_t, my_rpc_out_t,
......
...@@ -102,6 +102,8 @@ static void my_rpc_shutdown_ult(hg_handle_t handle) ...@@ -102,6 +102,8 @@ static void my_rpc_shutdown_ult(hg_handle_t handle)
HG_Destroy(handle); HG_Destroy(handle);
margo_diag_dump(mid, "-");
/* NOTE: we assume that the server daemon is using /* NOTE: we assume that the server daemon is using
* margo_wait_for_finalize() to suspend until this RPC executes, so there * margo_wait_for_finalize() to suspend until this RPC executes, so there
* is no need to send any extra signal to notify it. * is no need to send any extra signal to notify it.
......
...@@ -209,6 +209,24 @@ int margo_register_mplex(margo_instance_id mid, hg_id_t id, uint32_t mplex_id, A ...@@ -209,6 +209,24 @@ int margo_register_mplex(margo_instance_id mid, hg_id_t id, uint32_t mplex_id, A
*/ */
int margo_lookup_mplex(margo_instance_id mid, hg_id_t id, uint32_t mplex_id, ABT_pool *pool); int margo_lookup_mplex(margo_instance_id mid, hg_id_t id, uint32_t mplex_id, ABT_pool *pool);
/**
* Enables diagnostic collection on specified Margo instance
*
* @param [in] mid Margo instance
* @returns void
*/
void margo_diag_start(margo_instance_id mid);
/**
* Appends diagnostic statistics (enabled via margo_diag_start()) to specified
* output file.
*
* @param [in] mid Margo instance
* @param [in] file output file ("-" for stdout)
* @returns void
*/
void margo_diag_dump(margo_instance_id mid, const char* file);
#define MARGO_REGISTER(__mid, __func_name, __in_t, __out_t, __handler, __mplex_id, __pool) do { \ #define MARGO_REGISTER(__mid, __func_name, __in_t, __out_t, __handler, __mplex_id, __pool) do { \
hg_return_t __hret; \ hg_return_t __hret; \
hg_id_t __id; \ hg_id_t __id; \
......
...@@ -33,6 +33,22 @@ struct mplex_element ...@@ -33,6 +33,22 @@ struct mplex_element
UT_hash_handle hh; UT_hash_handle hh;
}; };
struct diag_data
{
double min;
double max;
double cumulative;
int count;
};
#define __DIAG_UPDATE(__data, __time)\
do {\
__data.count++; \
__data.cumulative += (__time); \
if((__time) > __data.max) __data.max = (__time); \
if((__time) < __data.min) __data.min = (__time); \
} while(0)
struct margo_instance struct margo_instance
{ {
/* provided by caller */ /* provided by caller */
...@@ -60,6 +76,18 @@ struct margo_instance ...@@ -60,6 +76,18 @@ struct margo_instance
/* hash table to track multiplexed rpcs registered with margo */ /* hash table to track multiplexed rpcs registered with margo */
struct mplex_element *mplex_table; struct mplex_element *mplex_table;
/* optional diagnostics data tracking */
/* NOTE: technically the following fields are subject to races if they
* are updated from more than one thread at a time. We will be careful
* to only update the counters from the progress_fn,
* which will serialize access.
*/
int diag_enabled;
struct diag_data diag_trigger_elapsed;
struct diag_data diag_progress_elapsed_zero_timeout;
struct diag_data diag_progress_elapsed_nonzero_timeout;
struct diag_data diag_progress_timeout_value;
}; };
struct margo_handler_mapping struct margo_handler_mapping
...@@ -313,12 +341,26 @@ static void hg_progress_fn(void* foo) ...@@ -313,12 +341,26 @@ static void hg_progress_fn(void* foo)
unsigned int hg_progress_timeout = MERCURY_PROGRESS_TIMEOUT_UB; unsigned int hg_progress_timeout = MERCURY_PROGRESS_TIMEOUT_UB;
double next_timer_exp; double next_timer_exp;
int trigger_happened; int trigger_happened;
double tm1, tm2;
int diag_enabled = 0;
while(!mid->hg_progress_shutdown_flag) while(!mid->hg_progress_shutdown_flag)
{ {
trigger_happened = 0; trigger_happened = 0;
do { do {
/* save value of instance diag variable, in case it is modified
* while we are in loop
*/
diag_enabled = mid->diag_enabled;
if(diag_enabled) tm1 = ABT_get_wtime();
ret = HG_Trigger(mid->hg_context, 0, 1, &actual_count); ret = HG_Trigger(mid->hg_context, 0, 1, &actual_count);
if(diag_enabled)
{
tm2 = ABT_get_wtime();
__DIAG_UPDATE(mid->diag_trigger_elapsed, (tm2-tm1));
}
if(ret == HG_SUCCESS && actual_count > 0) if(ret == HG_SUCCESS && actual_count > 0)
trigger_happened = 1; trigger_happened = 1;
} while((ret == HG_SUCCESS) && actual_count && !mid->hg_progress_shutdown_flag); } while((ret == HG_SUCCESS) && actual_count && !mid->hg_progress_shutdown_flag);
...@@ -343,7 +385,14 @@ static void hg_progress_fn(void* foo) ...@@ -343,7 +385,14 @@ static void hg_progress_fn(void* foo)
* to make sure that this ULT is the lowest priority in that * to make sure that this ULT is the lowest priority in that
* scenario. * scenario.
*/ */
if(diag_enabled) tm1 = ABT_get_wtime();
ret = HG_Progress(mid->hg_context, 0); ret = HG_Progress(mid->hg_context, 0);
if(diag_enabled)
{
tm2 = ABT_get_wtime();
__DIAG_UPDATE(mid->diag_progress_elapsed_zero_timeout, (tm2-tm1));
__DIAG_UPDATE(mid->diag_progress_timeout_value, 0);
}
if(ret == HG_SUCCESS) if(ret == HG_SUCCESS)
{ {
/* Mercury completed something; loop around to trigger /* Mercury completed something; loop around to trigger
...@@ -381,7 +430,18 @@ static void hg_progress_fn(void* foo) ...@@ -381,7 +430,18 @@ static void hg_progress_fn(void* foo)
hg_progress_timeout = 0; hg_progress_timeout = 0;
} }
} }
if(diag_enabled) tm1 = ABT_get_wtime();
ret = HG_Progress(mid->hg_context, hg_progress_timeout); ret = HG_Progress(mid->hg_context, hg_progress_timeout);
if(diag_enabled)
{
tm2 = ABT_get_wtime();
if(hg_progress_timeout == 0)
__DIAG_UPDATE(mid->diag_progress_elapsed_zero_timeout, (tm2-tm1));
else
__DIAG_UPDATE(mid->diag_progress_elapsed_nonzero_timeout, (tm2-tm1));
__DIAG_UPDATE(mid->diag_progress_timeout_value, hg_progress_timeout);
}
if(ret != HG_SUCCESS && ret != HG_TIMEOUT) if(ret != HG_SUCCESS && ret != HG_TIMEOUT)
{ {
/* TODO: error handling */ /* TODO: error handling */
...@@ -855,3 +915,71 @@ int margo_register_mplex(margo_instance_id mid, hg_id_t id, uint32_t mplex_id, A ...@@ -855,3 +915,71 @@ int margo_register_mplex(margo_instance_id mid, hg_id_t id, uint32_t mplex_id, A
return(0); return(0);
} }
void margo_diag_start(margo_instance_id mid)
{
mid->diag_enabled = 1;
}
static void print_diag_data(FILE *file, const char* name, const char *description, struct diag_data *data)
{
double avg;
fprintf(file, "# %s\n", description);
if(data->count != 0)
avg = data->cumulative/data->count;
else
avg = 0;
fprintf(file, "%s\t%.9f\t%.9f\t%.9f\t%d\n", name, avg, data->min, data->max, data->count);
return;
}
void margo_diag_dump(margo_instance_id mid, const char* file)
{
FILE *outfile;
time_t ltime;
assert(mid->diag_enabled);
if(strcmp("-", file) == 0)
{
outfile = stdout;
}
else
{
outfile = fopen(file, "a");
if(!outfile)
{
perror("fopen");
return;
}
}
/* TODO: retrieve self addr and include in output */
/* TODO: support pattern substitution in file name to create unique
* output files per process
*/
time(&ltime);
fprintf(outfile, "# Margo diagnostics\n");
fprintf(outfile, "# %s\n", ctime(&ltime));
fprintf(outfile, "# <stat>\t<avg>\t<min>\t<max>\t<count>\n");
print_diag_data(outfile, "trigger_elapsed",
"Time consumed by HG_Trigger()",
&mid->diag_trigger_elapsed);
print_diag_data(outfile, "progress_elapsed_zero_timeout",
"Time consumed by HG_Progress() when called with timeout==0",
&mid->diag_progress_elapsed_zero_timeout);
print_diag_data(outfile, "progress_elapsed_nonzero_timeout",
"Time consumed by HG_Progress() when called with timeout!=0",
&mid->diag_progress_elapsed_nonzero_timeout);
print_diag_data(outfile, "progress_timeout_value",
"Timeout values passed to HG_Progress()",
&mid->diag_progress_timeout_value);
if(outfile != stdout)
fclose(outfile);
return;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment