Commit 321c6534 authored by Philip Carns's avatar Philip Carns
Browse files

Merge branch 'carns/dev-rpc-breadcrumb-redo2' into 'master'

Margo RPC instrumentation mechanism

See merge request !16
parents 065632ad d835964d
......@@ -129,6 +129,11 @@ also need to specify
We do not recommend using any BMI methods besides TCP. It's usage is very similar to the CCI/TCP examples above, except that "bmi+" should be substituted for "cci+".
## Instrumentation
See the [Instrumentation documentation](doc/instrumentation.md) for
information on how to extract diagnostic instrumentation from Margo.
## Design details
![Margo architecture](doc/fig/margo-diagram.png)
......
# Margo instrumentation
This file documents instrumentation capabilities that are built into the
margo library. See the [top level README.md](../README.md) for general
information about margo.
Margo includes two forms of instrumentation. The first measures time spent
executing key Mercury functions within the communication progress
loop. The second measures time spent invoking remote procedure calls.
## Usage
Both can be enabled at run time by calling the `margo_diag_start()` any
time after `margo_init()` on the process that you wish to instrument.
Statistics from both can then be emitted at any time prior to
`margo_finalize()` by calling the `margo_diag_dump()` function.
The arguments to `margo_diag_dump()` are as follows:
* `mid`: the margo instance to retrieve instrumentation from
* `file`: name of the file to write the (text) data to. If the "-" string
is used, then data will be written to `STDOUT`.
* `uniquify`: flag indicating that the file name should be suffixed with
additional characters to make it unique from other diagnostic files emited
on the same node.
## Output format
Example output from `margo_diag_dump()` will look like this for a given
processes:
```
# Margo diagnostics
# Wed Jul 31 11:15:13 2019
# RPC breadcrumbs for RPCs that were registered on this process:
# 0x5f22 data_xfer_read
# 0xa1ef delegator_read
# 0x5f22 data_xfer_read
# 0x9245 my_shutdown_rpc
# <stat> <avg> <min> <max> <count>
# Time consumed by HG_Trigger()
trigger_elapsed 0.000000036 0.000000238 0.000114679 3911094
# Time consumed by HG_Progress() when called with timeout==0
progress_elapsed_zero_timeout 0.000004716 0.000000238 0.016073227 3909480
# Time consumed by HG_Progress() when called with timeout!=0
progress_elapsed_nonzero_timeout 0.051754011 0.000023842 0.100308180 411
# Timeout values passed to HG_Progress()
progress_timeout_value 0.010511802 0.000000000 100.000000000 3909891
# RPC statistics
0x5f22 0xa1ef 0x0000 0x0000 0.001448274 0.001207113 0.007883787 100
```
Key components of the output are:
* A table of RPC names registered on that processes. Each has a 16 bit
hexadecimal identifier and a string name. There may be duplicates in the
table if the same RPC is registered more than once on the process.
* A set of statistics for Mercury functions used to drive communication and
completion project. There are counters and elapsed time measurements for
the `HG_Trigger()` function and the `HG_Progress()` function (when called with
or without a timeout value, as Margo varies its pollin strategy). There
is also a category that records statistics about the actual timeout values
used.
* A set of statistics for each RPC that was _issued_ by the process (in the
"RPC statistics" category at the end. Each RPC will be identified by a
set of up to 4 hexidecmial identifiers. The set of identifiers represents a
stack that shows the heritage of up to 4 chained RPCS that lead to this
measurement. Each identifier will match a name in the table at the top.
In the above example, only one RPC was issued by this
process: a "data_xfer_read" RPC that was issed as a side effect of a
"delegator_read" RPC.
## Implementation
## Future directions and use cases
......@@ -60,6 +60,7 @@ int main(int argc, char **argv)
fprintf(stderr, "Error: margo_init()\n");
return(-1);
}
margo_diag_start(mid);
/* register core RPC */
my_rpc_shutdown_id = MARGO_REGISTER(mid, "my_shutdown_rpc",
......@@ -145,6 +146,7 @@ int main(int argc, char **argv)
margo_addr_free(mid, data_xfer_svr_addr);
/* shut down everything */
margo_diag_dump(mid, "-", 0);
margo_finalize(mid);
free(buffer);
......
......@@ -42,6 +42,7 @@ static void my_rpc_shutdown_ult(hg_handle_t handle)
* margo_wait_for_finalize() to suspend until this RPC executes, so there
* is no need to send any extra signal to notify it.
*/
margo_diag_dump(mid, "-", 0);
margo_finalize(mid);
return;
......@@ -80,6 +81,7 @@ int main(int argc, char **argv)
fprintf(stderr, "Error: margo_init()\n");
return(-1);
}
margo_diag_start(mid);
/* figure out what address this server is listening on */
hret = margo_addr_self(mid, &addr_self);
......
......@@ -962,6 +962,20 @@ void __margo_internal_incr_pending(margo_instance_id mid);
*/
void __margo_internal_decr_pending(margo_instance_id mid);
/**
* @private
* Internal function used by DEFINE_MARGO_RPC_HANDLER, not supposed to be
* called by users!
*/
void __margo_internal_pre_wrapper_hooks(margo_instance_id mid, hg_handle_t handle);
/**
* @private
* Internal function used by DEFINE_MARGO_RPC_HANDLER, not supposed to be
* called by users!
*/
void __margo_internal_post_wrapper_hooks(margo_instance_id mid);
/**
* macro that registers a function as an RPC.
*/
......@@ -969,33 +983,31 @@ void __margo_internal_decr_pending(margo_instance_id mid);
margo_provider_register_name(__mid, __func_name, \
BOOST_PP_CAT(hg_proc_, __in_t), \
BOOST_PP_CAT(hg_proc_, __out_t), \
__handler##_handler, \
_handler_for_##__handler, \
MARGO_DEFAULT_PROVIDER_ID, ABT_POOL_NULL);
#define MARGO_REGISTER_PROVIDER(__mid, __func_name, __in_t, __out_t, __handler, __provider_id, __pool) \
margo_provider_register_name(__mid, __func_name, \
BOOST_PP_CAT(hg_proc_, __in_t), \
BOOST_PP_CAT(hg_proc_, __out_t), \
__handler##_handler, \
_handler_for_##__handler, \
__provider_id, __pool);
#define NULL_handler NULL
#define _handler_for_NULL NULL
/**
* macro that defines a function to glue an RPC handler to a ult handler
* @param [in] __name name of handler function
*/
#define DEFINE_MARGO_RPC_HANDLER(__name) \
void __name##_wrapper(hg_handle_t handle) { \
#define __MARGO_INTERNAL_RPC_WRAPPER_BODY(__name) \
margo_instance_id __mid; \
__mid = margo_hg_handle_get_instance(handle); \
__margo_internal_pre_wrapper_hooks(__mid, handle); \
__name(handle); \
__margo_internal_decr_pending(__mid); \
if(__margo_internal_finalize_requested(__mid)) { \
margo_finalize(__mid); \
} \
} \
hg_return_t __name##_handler(hg_handle_t handle) { \
__margo_internal_post_wrapper_hooks(__mid);
#define __MARGO_INTERNAL_RPC_WRAPPER(__name) \
void _wrapper_for_##__name(hg_handle_t handle) { \
__MARGO_INTERNAL_RPC_WRAPPER_BODY(__name) \
}
#define __MARGO_INTERNAL_RPC_HANDLER_BODY(__name) \
int __ret; \
ABT_pool __pool; \
margo_instance_id __mid; \
......@@ -1004,19 +1016,31 @@ hg_return_t __name##_handler(hg_handle_t handle) { \
if(__margo_internal_finalize_requested(__mid)) { return(HG_CANCELED); } \
__pool = margo_hg_handle_get_handler_pool(handle); \
__margo_internal_incr_pending(__mid); \
__ret = ABT_thread_create(__pool, (void (*)(void *))__name##_wrapper, handle, ABT_THREAD_ATTR_NULL, NULL); \
__ret = ABT_thread_create(__pool, (void (*)(void *))_wrapper_for_##__name, handle, ABT_THREAD_ATTR_NULL, NULL); \
if(__ret != 0) { \
return(HG_NOMEM_ERROR); \
} \
return(HG_SUCCESS); \
return(HG_SUCCESS);
#define __MARGO_INTERNAL_RPC_HANDLER(__name) \
hg_return_t _handler_for_##__name(hg_handle_t handle) { \
__MARGO_INTERNAL_RPC_HANDLER_BODY(__name) \
}
/**
* macro that defines a function to glue an RPC handler to a ult handler
* @param [in] __name name of handler function
*/
#define DEFINE_MARGO_RPC_HANDLER(__name) \
__MARGO_INTERNAL_RPC_WRAPPER(__name) \
__MARGO_INTERNAL_RPC_HANDLER(__name)
/**
* macro that declares the prototype for a function to glue an RPC
* handler to a ult
* @param [in] __name name of handler function
*/
#define DECLARE_MARGO_RPC_HANDLER(__name) hg_return_t __name##_handler(hg_handle_t handle);
#define DECLARE_MARGO_RPC_HANDLER(__name) hg_return_t _handler_for_##__name(hg_handle_t handle);
#ifdef __cplusplus
}
......
/*
* (C) 2015 The University of Chicago
*
......@@ -14,6 +13,7 @@
#include <margo-config.h>
#include <time.h>
#include <math.h>
#include <endian.h>
#include "margo.h"
#include "margo-timer.h"
......@@ -23,14 +23,26 @@
#define DEFAULT_MERCURY_PROGRESS_TIMEOUT_UB 100 /* 100 milliseconds */
#define DEFAULT_MERCURY_HANDLE_CACHE_SIZE 32
/* Structure to store timing information */
struct diag_data
{
double min;
double max;
double cumulative;
int count;
/* the following fields are only used when this structure is used to track
* rpc timing information
*/
uint64_t rpc_breadcrumb; /* identifier for rpc and it's ancestors */
UT_hash_handle hh; /* hash table link */
};
/* key for Argobots thread-local storage to track RPC breadcrumbs across thread
* execution
*/
static ABT_key rpc_breadcrumb_key = ABT_KEY_NULL;
#define __DIAG_UPDATE(__data, __time)\
do {\
__data.count++; \
......@@ -56,6 +68,17 @@ struct margo_finalize_cb
struct margo_timer_list; /* defined in margo-timer.c */
/* Stores the name and rpc id of a registered RPC. We track this purely for
* debugging and instrumentation purposes
*/
struct margo_registered_rpc
{
hg_id_t id; /* rpc id */
uint64_t rpc_breadcrumb_fragment; /* fragment id used in rpc tracing */
char func_name[64]; /* string name of rpc */
struct margo_registered_rpc *next; /* pointer to next in list */
};
struct margo_instance
{
/* mercury/argobots state */
......@@ -75,6 +98,9 @@ struct margo_instance
int num_handler_pool_threads;
unsigned int hg_progress_timeout_ub;
/* list of rpcs registered on this instance for debugging and profiling purposes */
struct margo_registered_rpc *registered_rpcs;
/* control logic for callers waiting on margo to be finalized */
int finalize_flag;
int refcount;
......@@ -110,12 +136,16 @@ struct margo_instance
struct diag_data diag_progress_elapsed_zero_timeout;
struct diag_data diag_progress_elapsed_nonzero_timeout;
struct diag_data diag_progress_timeout_value;
struct diag_data *diag_rpc;
ABT_mutex diag_rpc_mutex;
};
struct margo_request_struct {
ABT_eventual eventual;
margo_timer_t* timer;
hg_handle_t handle;
double start_time; /* timestamp of when the operation started */
uint64_t rpc_breadcrumb; /* statistics tracking identifier, if applicable */
};
struct margo_rpc_data
......@@ -130,6 +160,8 @@ MERCURY_GEN_PROC(margo_shutdown_out_t, ((int32_t)(ret)))
static void hg_progress_fn(void* foo);
static void margo_rpc_data_free(void* ptr);
static uint64_t margo_breadcrumb_set(hg_id_t rpc_id);
static void margo_breadcrumb_measure(margo_instance_id mid, uint64_t rpc_breadcrumb, double start);
static void remote_shutdown_ult(hg_handle_t handle);
DECLARE_MARGO_RPC_HANDLER(remote_shutdown_ult);
......@@ -359,6 +391,11 @@ margo_instance_id margo_init_pool(ABT_pool progress_pool, ABT_pool handler_pool,
hg_return_t hret;
struct margo_instance *mid;
/* set input offset to include breadcrumb information in Mercury requests */
hret = HG_Class_set_input_offset(HG_Context_get_class(hg_context), sizeof(uint64_t));
/* this should not ever fail */
assert(hret == HG_SUCCESS);
mid = calloc(1,sizeof(*mid));
if(!mid) goto err;
memset(mid, 0, sizeof(*mid));
......@@ -380,6 +417,8 @@ margo_instance_id margo_init_pool(ABT_pool progress_pool, ABT_pool handler_pool,
ABT_mutex_create(&mid->pending_operations_mtx);
mid->finalize_requested = 0;
ABT_mutex_create(&mid->diag_rpc_mutex);
mid->timer_list = margo_timer_list_create();
if(mid->timer_list == NULL) goto err;
......@@ -387,6 +426,15 @@ margo_instance_id margo_init_pool(ABT_pool progress_pool, ABT_pool handler_pool,
hret = margo_handle_cache_init(mid);
if(hret != HG_SUCCESS) goto err;
/* register thread local key to track RPC breadcrumbs across threads */
/* NOTE: we are registering a global key, even though init could be called
* multiple times for different margo instances. As of May 2019 this doesn't
* seem to be a problem to call ABT_key_create() multiple times.
*/
ret = ABT_key_create(free, &rpc_breadcrumb_key);
if(ret != 0)
goto err;
ret = ABT_thread_create(mid->progress_pool, hg_progress_fn, mid,
ABT_THREAD_ATTR_NULL, &mid->hg_progress_tid);
if(ret != 0) goto err;
......@@ -404,6 +452,7 @@ err:
ABT_mutex_free(&mid->finalize_mutex);
ABT_cond_free(&mid->finalize_cond);
ABT_mutex_free(&mid->pending_operations_mtx);
ABT_mutex_free(&mid->diag_rpc_mutex);
free(mid);
}
return MARGO_INSTANCE_NULL;
......@@ -412,6 +461,7 @@ err:
static void margo_cleanup(margo_instance_id mid)
{
int i;
struct margo_registered_rpc *next_rpc;
/* call finalize callbacks */
struct margo_finalize_cb* fcb = mid->finalize_cb;
......@@ -428,6 +478,7 @@ static void margo_cleanup(margo_instance_id mid)
ABT_mutex_free(&mid->finalize_mutex);
ABT_cond_free(&mid->finalize_cond);
ABT_mutex_free(&mid->pending_operations_mtx);
ABT_mutex_free(&mid->diag_rpc_mutex);
if (mid->owns_progress_pool)
{
......@@ -447,6 +498,11 @@ static void margo_cleanup(margo_instance_id mid)
margo_handle_cache_destroy(mid);
/* TODO: technically we could/should call ABT_key_free() for
* rpc_breadcrumb_key. We can't do that here, though, because the key is
* global, not local to this mid.
*/
if (mid->margo_init)
{
if (mid->hg_context)
......@@ -457,6 +513,13 @@ static void margo_cleanup(margo_instance_id mid)
ABT_finalize();
}
while(mid->registered_rpcs)
{
next_rpc = mid->registered_rpcs->next;
free(mid->registered_rpcs);
mid->registered_rpcs = next_rpc;
}
free(mid);
}
......@@ -628,14 +691,38 @@ hg_id_t margo_provider_register_name(margo_instance_id mid, const char *func_nam
{
hg_id_t id;
int ret;
struct margo_registered_rpc * tmp_rpc;
assert(provider_id <= MARGO_MAX_PROVIDER_ID);
id = gen_id(func_name, provider_id);
if(mid->diag_enabled)
{
/* track information about this rpc registration for debugging and
* profiling
*/
tmp_rpc = calloc(1, sizeof(*tmp_rpc));
if(!tmp_rpc)
return(0);
tmp_rpc->id = id;
tmp_rpc->rpc_breadcrumb_fragment = id >> (__MARGO_PROVIDER_ID_SIZE*8);
tmp_rpc->rpc_breadcrumb_fragment &= 0xffff;
strncpy(tmp_rpc->func_name, func_name, 63);
tmp_rpc->next = mid->registered_rpcs;
mid->registered_rpcs = tmp_rpc;
}
ret = margo_register_internal(mid, id, in_proc_cb, out_proc_cb, rpc_cb, pool);
if(ret == 0)
{
if(mid->diag_enabled)
{
mid->registered_rpcs = tmp_rpc->next;
free(tmp_rpc);
}
return(0);
}
return(id);
}
......@@ -836,6 +923,7 @@ static hg_return_t margo_cb(const struct hg_cb_info *info)
{
hg_return_t hret = info->ret;
margo_request req = (margo_request)(info->arg);
margo_instance_id mid;
if(hret == HG_CANCELED && req->timer) {
hret = HG_TIMEOUT;
......@@ -850,6 +938,16 @@ static hg_return_t margo_cb(const struct hg_cb_info *info)
free(req->timer);
}
if(req->rpc_breadcrumb != 0)
{
/* This is the callback from an HG_Forward call. Track RPC timing
* information.
*/
mid = margo_hg_handle_get_instance(req->handle);
assert(mid);
margo_breadcrumb_measure(mid, req->rpc_breadcrumb, req->start_time);
}
/* propagate return code out through eventual */
ABT_eventual_set(req->eventual, &hret, sizeof(hret));
......@@ -896,6 +994,7 @@ static hg_return_t margo_provider_iforward_internal(
hg_proc_cb_t in_cb, out_cb;
hg_bool_t flag;
margo_instance_id mid = margo_hg_handle_get_instance(handle);
uint64_t *rpc_breadcrumb;
assert(provider_id <= MARGO_MAX_PROVIDER_ID);
......@@ -958,6 +1057,17 @@ static hg_return_t margo_provider_iforward_internal(
req, timeout_ms);
}
/* add rpc breadcrumb to outbound request; this will be used to track
* rpc statistics.
*/
ret = HG_Get_input_buf(handle, (void**)&rpc_breadcrumb, NULL);
if(ret != HG_SUCCESS)
return(ret);
req->rpc_breadcrumb = margo_breadcrumb_set(hgi->id);
/* LE encoding */
*rpc_breadcrumb = htole64(req->rpc_breadcrumb);
req->start_time = ABT_get_wtime();
return HG_Forward(handle, margo_cb, (void*)req, in_struct);
}
......@@ -1038,6 +1148,8 @@ static hg_return_t margo_irespond_internal(
}
req->handle = handle;
req->timer = NULL;
req->start_time = ABT_get_wtime();
req->rpc_breadcrumb = 0;
return HG_Respond(handle, margo_cb, (void*)req, out_struct);
}
......@@ -1121,6 +1233,8 @@ static hg_return_t margo_bulk_itransfer_internal(
}
req->timer = NULL;
req->handle = HG_HANDLE_NULL;
req->start_time = ABT_get_wtime();
req->rpc_breadcrumb = 0;
hret = HG_Bulk_transfer(mid->hg_context, margo_cb,
(void*)req, op, origin_addr, origin_handle, origin_offset, local_handle,
......@@ -1464,6 +1578,9 @@ void margo_diag_dump(margo_instance_id mid, const char* file, int uniquify)
FILE *outfile;
time_t ltime;
char revised_file_name[256] = {0};
struct diag_data *dd, *tmp;
char rpc_breadcrumb_str[24] = {0};
struct margo_registered_rpc *tmp_rpc;
assert(mid->diag_enabled);
......@@ -1504,6 +1621,13 @@ void margo_diag_dump(margo_instance_id mid, const char* file, int uniquify)
time(&ltime);
fprintf(outfile, "# Margo diagnostics\n");
fprintf(outfile, "# %s\n", ctime(&ltime));
fprintf(outfile, "# RPC breadcrumbs for RPCs that were registered on this process:\n");
tmp_rpc = mid->registered_rpcs;
while(tmp_rpc)
{
fprintf(outfile, "# 0x%4lx\t%s\n", tmp_rpc->rpc_breadcrumb_fragment, tmp_rpc->func_name);
tmp_rpc = tmp_rpc->next;
}
fprintf(outfile, "# <stat>\t<avg>\t<min>\t<max>\t<count>\n");
print_diag_data(outfile, "trigger_elapsed",
"Time consumed by HG_Trigger()",
......@@ -1517,6 +1641,28 @@ void margo_diag_dump(margo_instance_id mid, const char* file, int uniquify)
print_diag_data(outfile, "progress_timeout_value",
"Timeout values passed to HG_Progress()",
&mid->diag_progress_timeout_value);
HASH_ITER(hh, mid->diag_rpc, dd, tmp)
{
int i;
uint64_t tmp_breadcrumb;
for(i=0; i<4; i++)
{
tmp_breadcrumb = dd->rpc_breadcrumb;
tmp_breadcrumb >>= (i*16);
tmp_breadcrumb &= 0xffff;
if(i==4)
sprintf(&rpc_breadcrumb_str[i*7], "0x%.4lx", tmp_breadcrumb);
else
sprintf(&rpc_breadcrumb_str[i*7], "0x%.4lx ", tmp_breadcrumb);
#if 0
if(i==0)
sprintf(&rpc_breadcrumb_str[(3-i)*5], "0x%4lx ", tmp_breadcrumb);
else
sprintf(&rpc_breadcrumb_str[(3-i)*5], "0x%4lx", tmp_breadcrumb);
#endif
}
print_diag_data(outfile, rpc_breadcrumb_str, "RPC statistics", dd);
}
if(outfile != stdout)
fclose(outfile);
......@@ -1742,3 +1888,135 @@ void __margo_internal_decr_pending(margo_instance_id mid)
mid->pending_operations -= 1;
ABT_mutex_unlock(mid->pending_operations_mtx);
}
/* sets the value of a breadcrumb, to be called just before issuing an RPC */
static uint64_t margo_breadcrumb_set(hg_id_t rpc_id)
{
uint64_t *val;
uint64_t tmp;
ABT_key_get(rpc_breadcrumb_key, (void**)(&val));
if(val == NULL)
{
/* key not set yet on this ULT; we need to allocate a new one
* with all zeroes for initial value of breadcrumb and idx
*/
/* NOTE: treating this as best effort; just return 0 if it fails */
val = calloc(1, sizeof(*val));
if(!val)
return(0);
}
/* NOTE: an rpc_id (after mux'ing) has provider in low order bits and
* base rpc_id in high order bits. After demuxing, a base_id has zeroed
* out low bits. So regardless of whether the rpc_id is a base_id or a
* mux'd id, either way we need to shift right to get either the
* provider id (or the space reserved for it) out of the way, then mask
* off 16 bits for use as a breadcrumb.
*/
tmp = rpc_id >> (__MARGO_PROVIDER_ID_SIZE*8);
tmp &= 0xffff;
/* clear low 16 bits of breadcrumb */
*val = (*val >> 16) << 16;
/* combine them, so that we have low order 16 of rpc id and high order
* bits of previous breadcrumb */
*val |= tmp;
ABT_key_set(rpc_breadcrumb_key, val);
return *val;
}
/* records statistics for a breadcrumb, to be used after completion of an
* RPC */
static void margo_breadcrumb_measure(margo_instance_id mid, uint64_t rpc_breadcrumb, double start)
{
struct diag_data *stat;
double end, elapsed;
if(!mid->diag_enabled)
return;
end = ABT_get_wtime();
elapsed = end-start;
ABT_mutex_lock(mid->diag_rpc_mutex);
HASH_FIND(hh, mid->diag_rpc, &rpc_breadcrumb,
sizeof(rpc_breadcrumb), stat);
if(!stat)
{
/* we aren't tracking this breadcrumb yet; add it */
stat = calloc(1, sizeof(*stat));
if(!stat)
{
/* best effort; we return gracefully without recording stats if this
* happens.
*/
ABT_mutex_unlock(mid->diag_rpc_mutex);
return;
}
stat->rpc_breadcrumb = rpc_breadcrumb;
HASH_ADD(hh, mid->diag_rpc, rpc_breadcrumb,
sizeof(rpc_breadcrumb), stat);
}
stat->count++;
stat->cumulative += elapsed;
if(elapsed > stat->max)
stat->max = elapsed;
if(stat->min == 0 || elapsed < stat->min)
stat->min = elapsed;
ABT_mutex_unlock(mid->diag_rpc_mutex);
return;
}
static void margo_internal_breadcrumb_handler_set(uint64_t rpc_breadcrumb)
{
uint64_t *val;