Commit ee5a3158 authored by Philip Carns's avatar Philip Carns

add diagnostics API

parent 03137369
......@@ -111,6 +111,8 @@ int main(int argc, char **argv)
/* actually start margo */
/***************************************/
mid = margo_init(0, 0, hg_context);
assert(mid);
margo_diag_start(mid);
/* register RPC */
my_rpc_id = MERCURY_REGISTER(hg_class, "my_rpc", my_rpc_in_t, my_rpc_out_t,
......@@ -173,6 +175,7 @@ int main(int argc, char **argv)
HG_Addr_free(hg_class, svr_addr);
/* shut down everything */
margo_diag_dump(mid, "-");
margo_finalize(mid);
ABT_finalize();
......
......@@ -93,6 +93,7 @@ int main(int argc, char **argv)
/* actually start margo */
mid = margo_init(0, 0, hg_context);
assert(mid);
margo_diag_start(mid);
/* register RPC */
MERCURY_REGISTER(hg_class, "my_rpc", my_rpc_in_t, my_rpc_out_t,
......
......@@ -102,6 +102,8 @@ static void my_rpc_shutdown_ult(hg_handle_t handle)
HG_Destroy(handle);
margo_diag_dump(mid, "-");
/* NOTE: we assume that the server daemon is using
* margo_wait_for_finalize() to suspend until this RPC executes, so there
* is no need to send any extra signal to notify it.
......
......@@ -209,6 +209,24 @@ int margo_register_mplex(margo_instance_id mid, hg_id_t id, uint32_t mplex_id, A
*/
int margo_lookup_mplex(margo_instance_id mid, hg_id_t id, uint32_t mplex_id, ABT_pool *pool);
/**
* Enables diagnostic collection on specified Margo instance
*
* @param [in] mid Margo instance
* @returns void
*/
void margo_diag_start(margo_instance_id mid);
/**
* Appends diagnostic statistics (enabled via margo_diag_start()) to specified
* output file.
*
* @param [in] mid Margo instance
* @param [in] file output file ("-" for stdout)
* @returns void
*/
void margo_diag_dump(margo_instance_id mid, const char* file);
#define MARGO_REGISTER(__mid, __func_name, __in_t, __out_t, __handler, __mplex_id, __pool) do { \
hg_return_t __hret; \
hg_id_t __id; \
......
......@@ -33,6 +33,22 @@ struct mplex_element
UT_hash_handle hh;
};
struct diag_data
{
double min;
double max;
double cumulative;
int count;
};
#define __DIAG_UPDATE(__data, __time)\
do {\
__data.count++; \
__data.cumulative += (__time); \
if((__time) > __data.max) __data.max = (__time); \
if((__time) < __data.min) __data.min = (__time); \
} while(0)
struct margo_instance
{
/* provided by caller */
......@@ -60,6 +76,18 @@ struct margo_instance
/* hash table to track multiplexed rpcs registered with margo */
struct mplex_element *mplex_table;
/* optional diagnostics data tracking */
/* NOTE: technically the following fields are subject to races if they
* are updated from more than one thread at a time. We will be careful
* to only update the counters from the progress_fn,
* which will serialize access.
*/
int diag_enabled;
struct diag_data diag_trigger_elapsed;
struct diag_data diag_progress_elapsed_zero_timeout;
struct diag_data diag_progress_elapsed_nonzero_timeout;
struct diag_data diag_progress_timeout_value;
};
struct margo_handler_mapping
......@@ -313,12 +341,26 @@ static void hg_progress_fn(void* foo)
unsigned int hg_progress_timeout = MERCURY_PROGRESS_TIMEOUT_UB;
double next_timer_exp;
int trigger_happened;
double tm1, tm2;
int diag_enabled = 0;
while(!mid->hg_progress_shutdown_flag)
{
trigger_happened = 0;
do {
/* save value of instance diag variable, in case it is modified
* while we are in loop
*/
diag_enabled = mid->diag_enabled;
if(diag_enabled) tm1 = ABT_get_wtime();
ret = HG_Trigger(mid->hg_context, 0, 1, &actual_count);
if(diag_enabled)
{
tm2 = ABT_get_wtime();
__DIAG_UPDATE(mid->diag_trigger_elapsed, (tm2-tm1));
}
if(ret == HG_SUCCESS && actual_count > 0)
trigger_happened = 1;
} while((ret == HG_SUCCESS) && actual_count && !mid->hg_progress_shutdown_flag);
......@@ -855,3 +897,53 @@ int margo_register_mplex(margo_instance_id mid, hg_id_t id, uint32_t mplex_id, A
return(0);
}
void margo_diag_start(margo_instance_id mid)
{
mid->diag_enabled = 1;
}
static void print_diag_data(FILE *file, const char* name, struct diag_data *data)
{
fprintf(file, "%s\t%.9f\t%.9f\t%.9f\t%d\n", name, data->cumulative/data->count, data->min, data->max, data->count);
return;
}
void margo_diag_dump(margo_instance_id mid, const char* file)
{
FILE *outfile;
time_t ltime;
assert(mid->diag_enabled);
if(strcmp("-", file) == 0)
{
outfile = stdout;
}
else
{
outfile = fopen(file, "a");
if(!outfile)
{
perror("fopen");
return;
}
}
/* TODO: retrieve self addr and include in output */
/* TODO: support pattern substitution in file name to create unique
* output files per process
*/
time(&ltime);
fprintf(outfile, "# Margo diagnostics\n");
fprintf(outfile, "# %s\n", ctime(&ltime));
fprintf(outfile, "# <stat>\t<avg>\t<min>\t<max>\t<count>\n");
print_diag_data(outfile, "trigger_elapsed", &mid->diag_trigger_elapsed);
if(outfile != stdout)
fclose(outfile);
return;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment