Commit c34e5501 authored by Philip Carns

Merge remote-tracking branch 'origin/master' into margo-registered-data

parents 4a4109b1 d6973c44
......@@ -111,6 +111,8 @@ int main(int argc, char **argv)
/* actually start margo */
/***************************************/
mid = margo_init(0, 0, hg_context);
assert(mid);
margo_diag_start(mid);
/* register RPC */
MARGO_REGISTER(mid, "my_rpc", my_rpc_in_t, my_rpc_out_t, NULL, &my_rpc_id);
......@@ -168,11 +170,11 @@ int main(int argc, char **argv)
margo_forward(mid, handle, NULL);
HG_Destroy(handle);
HG_Destroy(handle);
HG_Addr_free(hg_class, svr_addr);
/* shut down everything */
margo_diag_dump(mid, "-", 0);
margo_finalize(mid);
ABT_finalize();
......
......@@ -93,6 +93,7 @@ int main(int argc, char **argv)
/* actually start margo */
mid = margo_init(0, 0, hg_context);
assert(mid);
margo_diag_start(mid);
/* register RPC */
MARGO_REGISTER(mid, "my_rpc", my_rpc_in_t, my_rpc_out_t, my_rpc_ult, MARGO_RPC_ID_IGNORE);
......
......@@ -102,6 +102,8 @@ static void my_rpc_shutdown_ult(hg_handle_t handle)
HG_Destroy(handle);
margo_diag_dump(mid, "-", 0);
/* NOTE: we assume that the server daemon is using
* margo_wait_for_finalize() to suspend until this RPC executes, so there
* is no need to send any extra signal to notify it.
......
......@@ -256,6 +256,25 @@ int margo_register_mplex(margo_instance_id mid, hg_id_t id, uint32_t mplex_id, A
*/
int margo_lookup_mplex(margo_instance_id mid, hg_id_t id, uint32_t mplex_id, ABT_pool *pool);
/**
 * Enables diagnostic collection on specified Margo instance.
 *
 * Call this before margo_diag_dump(): the dump routine asserts that
 * diagnostics have been enabled on the instance.
 *
 * @param [in] mid Margo instance
 * @returns void
 */
void margo_diag_start(margo_instance_id mid);
/**
 * Appends diagnostic statistics (enabled via margo_diag_start()) to specified
 * output file.
 *
 * The output consists of '#'-prefixed comment lines (a banner, a timestamp,
 * a column legend and one description per statistic) and one tab-separated
 * record per statistic: <stat> <avg> <min> <max> <count>.
 *
 * @param [in] mid Margo instance; diagnostics must already have been enabled
 * on it with margo_diag_start()
 * @param [in] file output file ("-" for stdout); a named file is opened in
 * append mode
 * @param [in] uniquify flag indicating if file name should have additional
 * information added to it to make output from different processes unique
 * (the name becomes "<file>-<hostname>-<pid>"); ignored when file is "-"
 * @returns void
 */
void margo_diag_dump(margo_instance_id mid, const char* file, int uniquify);
/**
* macro that registers a function as an RPC.
......
......@@ -33,6 +33,22 @@ struct mplex_element
UT_hash_handle hh;
};
/**
 * Running summary of a series of samples: the extremes seen so far, the sum
 * of all samples, and how many samples have been recorded.
 */
struct diag_data
{
    double min;        /* smallest sample recorded so far */
    double max;        /* largest sample recorded so far */
    double cumulative; /* sum of all recorded samples */
    int count;         /* number of recorded samples */
};

/**
 * Folds one sample into a struct diag_data.
 *
 * __data is an lvalue of type struct diag_data (not a pointer); __time is
 * any arithmetic expression and is converted to double.  Both arguments are
 * evaluated exactly once.
 *
 * The first sample (count == 0) seeds both min and max.  Without that seed
 * a zero-filled struct would report min == 0 forever, because no
 * non-negative sample can ever compare below the initial zero.
 * NOTE(review): this assumes count is 0 whenever no sample has been
 * recorded, i.e. that the struct starts out zero-initialized.
 */
#define __DIAG_UPDATE(__data, __time)\
do {\
    struct diag_data *__diag_p = &(__data); \
    const double __diag_v = (double)(__time); \
    if(__diag_p->count == 0) \
    { \
        __diag_p->min = __diag_v; \
        __diag_p->max = __diag_v; \
    } \
    else \
    { \
        if(__diag_v > __diag_p->max) __diag_p->max = __diag_v; \
        if(__diag_v < __diag_p->min) __diag_p->min = __diag_v; \
    } \
    __diag_p->cumulative += __diag_v; \
    __diag_p->count++; \
} while(0)
struct margo_instance
{
/* provided by caller */
......@@ -57,6 +73,18 @@ struct margo_instance
/* hash table to track multiplexed rpcs registered with margo */
struct mplex_element *mplex_table;
/* optional diagnostics data tracking */
/* NOTE: technically the following fields are subject to races if they
* are updated from more than one thread at a time. We will be careful
* to only update the counters from the progress_fn,
* which will serialize access.
*/
int diag_enabled;
struct diag_data diag_trigger_elapsed;
struct diag_data diag_progress_elapsed_zero_timeout;
struct diag_data diag_progress_elapsed_nonzero_timeout;
struct diag_data diag_progress_timeout_value;
};
struct margo_cb_arg
......@@ -285,12 +313,26 @@ static void hg_progress_fn(void* foo)
unsigned int hg_progress_timeout = MERCURY_PROGRESS_TIMEOUT_UB;
double next_timer_exp;
int trigger_happened;
double tm1, tm2;
int diag_enabled = 0;
while(!mid->hg_progress_shutdown_flag)
{
trigger_happened = 0;
do {
/* save value of instance diag variable, in case it is modified
* while we are in loop
*/
diag_enabled = mid->diag_enabled;
if(diag_enabled) tm1 = ABT_get_wtime();
ret = HG_Trigger(mid->hg_context, 0, 1, &actual_count);
if(diag_enabled)
{
tm2 = ABT_get_wtime();
__DIAG_UPDATE(mid->diag_trigger_elapsed, (tm2-tm1));
}
if(ret == HG_SUCCESS && actual_count > 0)
trigger_happened = 1;
} while((ret == HG_SUCCESS) && actual_count && !mid->hg_progress_shutdown_flag);
......@@ -314,7 +356,14 @@ static void hg_progress_fn(void* foo)
* to make sure that this ULT is the lowest priority in that
* scenario.
*/
if(diag_enabled) tm1 = ABT_get_wtime();
ret = HG_Progress(mid->hg_context, 0);
if(diag_enabled)
{
tm2 = ABT_get_wtime();
__DIAG_UPDATE(mid->diag_progress_elapsed_zero_timeout, (tm2-tm1));
__DIAG_UPDATE(mid->diag_progress_timeout_value, 0);
}
if(ret == HG_SUCCESS)
{
/* Mercury completed something; loop around to trigger
......@@ -352,7 +401,18 @@ static void hg_progress_fn(void* foo)
hg_progress_timeout = 0;
}
}
if(diag_enabled) tm1 = ABT_get_wtime();
ret = HG_Progress(mid->hg_context, hg_progress_timeout);
if(diag_enabled)
{
tm2 = ABT_get_wtime();
if(hg_progress_timeout == 0)
__DIAG_UPDATE(mid->diag_progress_elapsed_zero_timeout, (tm2-tm1));
else
__DIAG_UPDATE(mid->diag_progress_elapsed_nonzero_timeout, (tm2-tm1));
__DIAG_UPDATE(mid->diag_progress_timeout_value, hg_progress_timeout);
}
if(ret != HG_SUCCESS && ret != HG_TIMEOUT)
{
/* TODO: error handling */
......@@ -810,3 +870,87 @@ int margo_register_mplex(margo_instance_id mid, hg_id_t id, uint32_t mplex_id, A
return(0);
}
/**
 * Switches on diagnostic collection for a Margo instance by raising its
 * diag_enabled flag.  The flag is never cleared here; this routine only
 * ever turns collection on.
 *
 * @param [in] mid Margo instance to enable diagnostics on
 */
void margo_diag_start(margo_instance_id mid)
{
    /* the progress loop samples this flag at the top of each trigger pass */
    mid->diag_enabled = 1;
    return;
}
/**
 * Writes one statistic to an output stream: a '#' comment line carrying the
 * description, followed by a tab-separated record holding the name, the
 * average, the minimum, the maximum and the sample count.
 *
 * The average is reported as 0 when no samples have been recorded.
 *
 * @param [in] file stream to write to
 * @param [in] name short identifier printed in the first column
 * @param [in] description human-readable text for the comment line
 * @param [in] data statistic to report
 */
static void print_diag_data(FILE *file, const char* name, const char *description, struct diag_data *data)
{
    /* an empty series has no meaningful mean; avoid dividing by zero */
    double mean = (data->count == 0) ? 0 : data->cumulative/data->count;

    fprintf(file, "# %s\n", description);
    fprintf(file, "%s\t%.9f\t%.9f\t%.9f\t%d\n",
        name, mean, data->min, data->max, data->count);
}
/**
 * Appends the diagnostic statistics gathered for a Margo instance to an
 * output file, or writes them to stdout.
 *
 * Diagnostics must have been enabled on the instance beforehand; this is
 * enforced with an assert.  Nothing is written if the output file name does
 * not fit the internal buffer or the file cannot be opened; in both cases a
 * message is printed to stderr.
 *
 * @param [in] mid Margo instance whose statistics are reported
 * @param [in] file output file name, or "-" for stdout; a named file is
 * opened in append mode
 * @param [in] uniquify when non-zero, "-<hostname>-<pid>" is appended to the
 * file name; ignored when file is "-"
 */
void margo_diag_dump(margo_instance_id mid, const char* file, int uniquify)
{
    FILE *outfile;
    time_t ltime;
    char revised_file_name[256] = {0};

    assert(mid->diag_enabled);

    if(strcmp("-", file) == 0)
    {
        /* stdout needs no file name, so none is built */
        outfile = stdout;
    }
    else
    {
        int len;

        if(uniquify)
        {
            char hostname[128] = {0};

            /* Offer one byte less than the buffer holds so that the
             * zero-filled last byte terminates a truncated name.  Naming is
             * best effort: fall back to an empty host name on failure.
             */
            if(gethostname(hostname, sizeof(hostname) - 1) != 0)
                hostname[0] = '\0';

            len = snprintf(revised_file_name, sizeof(revised_file_name),
                "%s-%s-%d", file, hostname, (int)getpid());
        }
        else
        {
            len = snprintf(revised_file_name, sizeof(revised_file_name),
                "%s", file);
        }

        /* refuse to write to a silently truncated (i.e. different) path */
        if(len < 0 || (size_t)len >= sizeof(revised_file_name))
        {
            fprintf(stderr, "margo_diag_dump: output file name too long\n");
            return;
        }

        outfile = fopen(revised_file_name, "a");
        if(!outfile)
        {
            perror("fopen");
            return;
        }
    }

    /* TODO: retrieve self addr and include in output */
    /* TODO: support pattern substitution in file name to create unique
     * output files per process
     */

    time(&ltime);
    fprintf(outfile, "# Margo diagnostics\n");
    /* ctime() text already ends in a newline, so a blank line follows */
    fprintf(outfile, "# %s\n", ctime(&ltime));
    fprintf(outfile, "# <stat>\t<avg>\t<min>\t<max>\t<count>\n");
    print_diag_data(outfile, "trigger_elapsed",
        "Time consumed by HG_Trigger()",
        &mid->diag_trigger_elapsed);
    print_diag_data(outfile, "progress_elapsed_zero_timeout",
        "Time consumed by HG_Progress() when called with timeout==0",
        &mid->diag_progress_elapsed_zero_timeout);
    print_diag_data(outfile, "progress_elapsed_nonzero_timeout",
        "Time consumed by HG_Progress() when called with timeout!=0",
        &mid->diag_progress_elapsed_nonzero_timeout);
    print_diag_data(outfile, "progress_timeout_value",
        "Timeout values passed to HG_Progress()",
        &mid->diag_progress_timeout_value);

    /* fclose() flushes buffered output, so a failure here means lost data */
    if(outfile != stdout && fclose(outfile) != 0)
        perror("fclose");

    return;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment